19 #ifndef SHM_BUFFER_HPP
20 #define SHM_BUFFER_HPP
22 #include <ndb_global.h>
// Tail of a constructor's parameter list followed by its member initializers.
// NOTE(review): apparently the SHM_Reader constructor (the SHM_Reader::
// definitions follow below); the start of the signature is not visible here.
47 Uint32 * _writeIndex) :
48 m_startOfBuffer(_startOfBuffer),
49 m_totalBufferSize(_sizeOfBuffer),
// The usable size is the full size minus the caller-supplied slack.
50 m_bufferSize(_sizeOfBuffer - _slack),
// Both indexes are kept as pointers and dereferenced when needed.
51 m_sharedReadIndex(_readIndex),
52 m_sharedWriteIndex(_writeIndex)
// Emptiness check: the definition below compares the private read index with
// the value behind the shared write index.
63 inline bool empty()
const;
// Output parameters: `ptr` receives the address at the current read index,
// `eod` receives the address where the currently readable data ends.
71 inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod);
// Base address of the buffer; the pointer itself is const, the bytes are not.
79 char *
const m_startOfBuffer;
// Full buffer size as handed to the constructor (slack not subtracted).
80 Uint32 m_totalBufferSize;
// Pointers to the read and write index variables supplied to the constructor.
84 Uint32 * m_sharedReadIndex;
85 Uint32 * m_sharedWriteIndex;
// Compares this reader's private read index with the current value of the
// shared write index; the two being equal is the "empty" condition.
// (The remainder of the body is not visible in this excerpt.)
90 SHM_Reader::empty()
const{
91 bool ret = (m_readIndex == * m_sharedWriteIndex);
// Body fragment that fills the `ptr` / `eod` outputs declared by getReadPtr.
// Take one snapshot of the private read index and the shared write index.
105 Uint32 tReadIndex = m_readIndex;
106 Uint32 tWriteIndex = * m_sharedWriteIndex;
// Indexes are offsets into the char buffer; reading starts at the read index.
108 ptr = (Uint32*)&m_startOfBuffer[tReadIndex];
// Read index at or behind the write index: data ends at the write index.
110 if(tReadIndex <= tWriteIndex){
111 eod = (Uint32*)&m_startOfBuffer[tWriteIndex];
// Presumably the else branch (its header is not visible here): data runs up
// to the usable buffer size.
113 eod = (Uint32*)&m_startOfBuffer[m_bufferSize];
// Fragment of a reader-side routine whose signature is not visible in this
// excerpt (presumably the read-pointer update — confirm against full source).
// Turn the pointer back into an offset from the start of the buffer.
124 Uint32 tReadIndex = ((
char*)ptr) - m_startOfBuffer;
// The offset must stay below the total size; it may still be >= m_bufferSize,
// which is handled by the check that follows.
126 assert(tReadIndex < m_totalBufferSize);
// Body of this branch is not visible here (presumably resets the index).
128 if(tReadIndex >= m_bufferSize){
// Store the new position privately and through the shared read index.
132 m_readIndex = tReadIndex;
133 * m_sharedReadIndex = tReadIndex;
// NOTE(review): not referenced in the visible lines; presumably the slack
// value passed when constructing an SHM_Writer — confirm against callers.
136 #define WRITER_SLACK 4
// Part of a constructor's parameter list followed by its member initializers.
// NOTE(review): apparently the SHM_Writer constructor; several parameter
// lines are not visible in this excerpt.
141 Uint32 _sizeOfBuffer,
144 Uint32 * _writeIndex) :
145 m_startOfBuffer(_startOfBuffer),
146 m_totalBufferSize(_sizeOfBuffer),
// The usable size is the full size minus the caller-supplied slack.
147 m_bufferSize(_sizeOfBuffer - _slack),
// Both indexes are kept as pointers and dereferenced when needed.
148 m_sharedReadIndex(_readIndex),
149 m_sharedWriteIndex(_writeIndex)
// The visible part of the definition takes the address at the current write
// index and computes the free space; how `sz` is checked is not visible here.
157 inline char * getWritePtr(Uint32 sz);
// Stores a new write index privately and through the shared write index; the
// use of `sz` is not visible in this excerpt.
158 inline void updateWritePtr(Uint32 sz);
// Returns the writer's private write index (m_writeIndex).
160 inline Uint32 getWriteIndex()
const {
return m_writeIndex;}
// Returns m_bufferSize, i.e. the buffer size with the slack already subtracted.
161 inline Uint32 getBufferSize()
const {
return m_bufferSize;}
// Free-space query: the definition below derives it from the shared read
// index, the private write index and m_bufferSize.
162 inline Uint32 get_free_buffer()
const;
// Takes a second writer as argument; the definition is not visible in this
// excerpt, so the direction and extent of the copy cannot be stated here.
164 inline void copyIndexes(
SHM_Writer * standbyWriter);
// Walks `count` entries of `vec`, copying from each entry's iov_base into the
// buffer, then stores the advanced write index.
// NOTE(review): the meaning of the Uint32 return value is not visible here.
167 inline Uint32
writev(
const struct iovec *vec,
int count);
// Base address of the buffer; the pointer itself is const, the bytes are not.
170 char *
const m_startOfBuffer;
// Full buffer size as handed to the constructor (slack not subtracted).
171 Uint32 m_totalBufferSize;
// Pointers to the read and write index variables supplied to the constructor.
176 Uint32 * m_sharedReadIndex;
177 Uint32 * m_sharedWriteIndex;
// Visible part: locates the write position and computes the free space.
// The comparison against `sz` and the return are not visible in this excerpt.
182 SHM_Writer::getWritePtr(Uint32 sz){
// Take one snapshot of the shared read index and the private write index.
183 Uint32 tReadIndex = * m_sharedReadIndex;
184 Uint32 tWriteIndex = m_writeIndex;
// Address inside the buffer at the current write index.
186 char * ptr = &m_startOfBuffer[tWriteIndex];
// Read index at or behind the write index: free space is the usable size
// minus the distance between the two indexes.
189 if(tReadIndex <= tWriteIndex){
190 free = m_bufferSize + tReadIndex - tWriteIndex;
// Presumably the else branch (its header is not visible here): free space is
// the gap from the write index up to the read index.
192 free = tReadIndex - tWriteIndex;
// Stores a new write index. The statement that applies `sz` is not visible
// in this excerpt.
205 SHM_Writer::updateWritePtr(Uint32 sz){
// The private and shared write indexes are expected to agree on entry.
207 assert(m_writeIndex == * m_sharedWriteIndex);
209 Uint32 tWriteIndex = m_writeIndex;
// The index must stay below the total size; it may still be >= m_bufferSize,
// which is handled by the check that follows.
212 assert(tWriteIndex < m_totalBufferSize);
// Body of this branch is not visible here (presumably resets the index).
214 if(tWriteIndex >= m_bufferSize){
// Store the new position privately and through the shared write index.
218 m_writeIndex = tWriteIndex;
219 * m_sharedWriteIndex = tWriteIndex;
// Computes the free space from the two indexes, using the same formula as the
// visible part of getWritePtr. The return statement is not visible here.
224 SHM_Writer::get_free_buffer()
const
// Take one snapshot of the shared read index and the private write index.
226 Uint32 tReadIndex = * m_sharedReadIndex;
227 Uint32 tWriteIndex = m_writeIndex;
// Read index at or behind the write index: usable size minus the distance
// between the two indexes.
230 if(tReadIndex <= tWriteIndex){
231 free = m_bufferSize + tReadIndex - tWriteIndex;
// Presumably the else branch (its header is not visible here): the gap from
// the write index up to the read index.
233 free = tReadIndex - tWriteIndex;
// Body fragment matching the writev(vec, count) declaration; its signature and
// several branch lines are not visible in this excerpt.
// Work on local copies of the indexes; the write index is stored back only
// at the end of the visible code.
242 Uint32 tReadIndex = * m_sharedReadIndex;
243 Uint32 tWriteIndex = m_writeIndex;
// Handle each iovec entry in order.
258 for (
int i = 0;
i < count;
i++)
// Source bytes and remaining length of the current entry.
260 unsigned char *ptr = (
unsigned char *)vec[
i].iov_base;
261 Uint32 remain = vec[
i].iov_len;
// Case 1: read index at or behind the write index.
264 if (tReadIndex <= tWriteIndex)
// The entry would run past the usable buffer size, so cap what is taken.
267 if (tWriteIndex + remain > m_bufferSize)
// NOTE(review): only this assignment divides by 4; the other visible
// maxBytes assignments (tReadIndex, tReadIndex - tWriteIndex) do not.
// Confirm the intended unit of maxBytes against the full source.
268 maxBytes = (m_bufferSize - tWriteIndex)/4;
// segment is 4 times the value returned by unpack_length_words (the
// remaining call arguments are not visible here).
271 segment = 4*TransporterRegistry::unpack_length_words((Uint32 *)ptr,
// Copy the segment to the current write position.
274 memcpy(m_startOfBuffer + tWriteIndex, ptr, segment);
// The following lines write at the start of the buffer (presumably the
// wrap-around path); the visible cap here is the read index.
281 if (remain > tReadIndex)
282 maxBytes = tReadIndex;
285 segment = 4*TransporterRegistry::unpack_length_words((Uint32 *)ptr,
288 memcpy(m_startOfBuffer, ptr, segment);
// After writing at offset 0 the write index equals the segment length.
290 tWriteIndex = segment;
// Body of this check is not visible in this excerpt.
291 if (remain > segment)
// Presumably the branch where the write index is behind the read index: the
// visible cap is the gap between the two.
297 if (tWriteIndex + remain > tReadIndex)
298 maxBytes = tReadIndex - tWriteIndex;
301 segment = 4*TransporterRegistry::unpack_length_words((Uint32 *)ptr,
304 memcpy(m_startOfBuffer + tWriteIndex, ptr, segment);
306 tWriteIndex += segment;
// Body of this check is not visible in this excerpt.
307 if (remain > segment)
// Store the advanced write index privately and through the shared index.
312 m_writeIndex = tWriteIndex;
313 *m_sharedWriteIndex = tWriteIndex;