18 #include <ndb_global.h>
20 #include "AsyncFile.hpp"
21 #include <ErrorHandlingMacros.hpp>
22 #include <kernel_types.h>
23 #include <ndbd_malloc.hpp>
24 #include <NdbThread.h>
25 #include <signaldata/FsRef.hpp>
26 #include <signaldata/FsOpenReq.hpp>
27 #include <signaldata/FsReadWriteReq.hpp>
28 #include <Configuration.hpp>
36 m_resource_group = RNIL;
40 theWriteBufferSize = 0;
47 ndbout_c(
"%p:%s attach to %p (m_thread: %p)",
this, theFileName.c_str(),
thr,
50 assert(m_thread == 0);
58 ndbout_c(
"%p:%s detach from %p",
this, theFileName.c_str(),
thr);
60 assert(m_thread == thr);
67 for(
int i = 0;
i < request->par.readWrite.numberOfPages ;
i++)
69 off_t
offset = request->par.readWrite.pages[
i].offset;
70 size_t size = request->par.readWrite.pages[
i].size;
71 char *
buf = request->par.readWrite.pages[
i].buf;
73 int err = readBuffer(request, buf, size, offset);
91 const Uint32 cnt = request->par.readWrite.numberOfPages;
92 if (theWriteBuffer == 0 || cnt == 1)
94 for (Uint32
i = 0;
i<cnt;
i++)
96 int err = writeBuffer(request->par.readWrite.pages[
i].buf,
97 request->par.readWrite.pages[
i].size,
98 request->par.readWrite.pages[
i].offset);
101 request->error = err;
110 bool write_not_complete =
true;
112 while(write_not_complete) {
114 off_t
offset = request->par.readWrite.pages[page_num].offset;
115 char* bufptr = theWriteBuffer;
117 write_not_complete =
false;
118 if (request->par.readWrite.numberOfPages > 1) {
122 for(
int i=page_num;
i < request->par.readWrite.numberOfPages;
i++) {
124 request->par.readWrite.pages[
i].buf,
125 request->par.readWrite.pages[
i].size);
126 bufptr += request->par.readWrite.pages[
i].size;
127 totsize += request->par.readWrite.pages[
i].size;
128 if (((
i + 1) < request->par.readWrite.numberOfPages)) {
131 off_t tmp = page_offset + request->par.readWrite.pages[
i].size;
132 if (tmp != request->par.readWrite.pages[
i+1].offset) {
134 DEBUG(ndbout_c(
"Page offsets are not aligned"));
135 request->error = EINVAL;
138 if ((
unsigned)(totsize + request->par.readWrite.pages[
i+1].size) > (
unsigned)theWriteBufferSize) {
140 write_not_complete =
true;
146 page_offset += request->par.readWrite.pages[
i].size;
148 bufptr = theWriteBuffer;
151 bufptr = request->par.readWrite.pages[0].buf;
152 totsize = request->par.readWrite.pages[0].size;
154 int err = writeBuffer(bufptr, totsize, offset);
156 request->error = err;
162 if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq)
174 #ifdef DEBUG_ASYNCFILE
175 void printErrorAndFlags(Uint32 used_flags) {
177 sprintf(buf,
"PEAF: errno=%d \"", errno);
179 strcat(buf, strerror(errno));
182 strcat(buf,
" flags: ");
183 switch(used_flags & 3){
185 strcat(buf,
"O_RDONLY, ");
188 strcat(buf,
"O_WRONLY, ");
191 strcat(buf,
"O_RDWR, ");
194 strcat(buf,
"Unknown!!, ");
197 if((used_flags & O_APPEND)==O_APPEND)
198 strcat(buf,
"O_APPEND, ");
199 if((used_flags & O_CREAT)==O_CREAT)
200 strcat(buf,
"O_CREAT, ");
201 if((used_flags & O_EXCL)==O_EXCL)
202 strcat(buf,
"O_EXCL, ");
203 if((used_flags & O_NOCTTY) == O_NOCTTY)
204 strcat(buf,
"O_NOCTTY, ");
205 if((used_flags & O_NONBLOCK)==O_NONBLOCK)
206 strcat(buf,
"O_NONBLOCK, ");
207 if((used_flags & O_TRUNC)==O_TRUNC)
208 strcat(buf,
"O_TRUNC, ");
210 if((used_flags & O_DSYNC)==O_DSYNC)
211 strcat(buf,
"O_DSYNC, ");
213 if((used_flags & O_NDELAY)==O_NDELAY)
214 strcat(buf,
"O_NDELAY, ");
216 if((used_flags & O_RSYNC)==O_RSYNC)
217 strcat(buf,
"O_RSYNC, ");
220 if((used_flags & O_SYNC)==O_SYNC)
221 strcat(buf,
"O_SYNC, ");
223 DEBUG(ndbout_c(buf));
229 operator<<(NdbOut& out,
const Request& req)
231 out <<
"[ Request: file: " << hex << req.file
232 <<
" userRef: " << hex << req.theUserReference
233 <<
" userData: " << dec << req.theUserPointer
234 <<
" theFilePointer: " << req.theFilePointer
243 case Request::closeRemove:
244 out <<
"closeRemove";
255 case Request::writev:
258 case Request::writeSync:
262 case Request::writevSync:
271 case Request::append:
278 out << (Uint32)req.action;