16 #include <ndb_global.h>
18 #include "AsyncIoThread.hpp"
19 #include "AsyncFile.hpp"
20 #include <ErrorHandlingMacros.hpp>
21 #include <kernel_types.h>
22 #include <NdbThread.h>
23 #include <signaldata/FsRef.hpp>
24 #include <signaldata/FsOpenReq.hpp>
25 #include <signaldata/FsReadWriteReq.hpp>
26 #include <signaldata/AllocMem.hpp>
30 #include <EventLogger.hpp>
33 AsyncIoThread::AsyncIoThread(
class Ndbfs& fs,
bool bound)
39 theMemoryChannelPtr = &m_fs.theToBoundThreads;
43 theMemoryChannelPtr = &m_fs.theToUnboundThreads;
45 theReportTo = &m_fs.theFromThreads;
48 static int numAsyncFiles = 0;
52 runAsyncIoThread(
void* arg)
60 AsyncIoThread::doStart()
63 #if !defined(DBUG_OFF) && defined (__hpux)
65 const NDB_THREAD_STACKSIZE stackSize = 32768;
68 const NDB_THREAD_STACKSIZE stackSize = 8192;
75 theStartMutexPtr = NdbMutex_Create();
76 theStartConditionPtr = NdbCondition_Create();
77 NdbMutex_Lock(theStartMutexPtr);
80 theThreadPtr = NdbThread_Create(runAsyncIoThread,
84 NDB_THREAD_PRIO_MEAN);
86 if (theThreadPtr == 0)
88 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
89 "",
"Could not allocate file system thread");
94 NdbCondition_Wait(theStartConditionPtr,
97 while (theStartFlag ==
false);
99 NdbMutex_Unlock(theStartMutexPtr);
100 NdbMutex_Destroy(theStartMutexPtr);
101 NdbCondition_Destroy(theStartConditionPtr);
107 AsyncIoThread::shutdown()
111 request.action = Request::end;
112 this->theMemoryChannelPtr->writeChannel( &request );
113 NdbThread_WaitFor(theThreadPtr, &status);
114 NdbThread_Destroy(&theThreadPtr);
120 assert(m_current_file);
121 assert(m_current_file->getThread() ==
this);
122 assert(theMemoryChannelPtr == &theMemoryChannel);
123 theMemoryChannelPtr->writeChannel(request);
132 NdbMutex_Lock(theStartMutexPtr);
134 NdbMutex_Unlock(theStartMutexPtr);
135 NdbCondition_Signal(theStartConditionPtr);
139 request = theMemoryChannelPtr->readChannel();
140 if (!request || request->action == Request::end)
142 DEBUG(ndbout_c(
"Nothing read from Memory Channel in AsyncFile"));
143 theStartFlag =
false;
148 m_current_request= request;
149 switch (request->action) {
151 file->openReq(request);
152 if (request->error == 0 && request->m_do_bind)
156 file->closeReq(request);
159 case Request::closeRemove:
160 file->closeReq(request);
161 file->removeReq(request);
164 case Request::readPartial:
169 file->readvReq(request);
174 case Request::writev:
175 file->writevReq(request);
177 case Request::writeSync:
179 file->syncReq(request);
181 case Request::writevSync:
182 file->writevReq(request);
183 file->syncReq(request);
186 file->syncReq(request);
188 case Request::append:
189 file->appendReq(request);
191 case Request::append_synch:
192 file->appendReq(request);
193 file->syncReq(request);
196 file->rmrfReq(request, file->theFileName.c_str(),
197 request->par.rmrf.own_directory);
200 theStartFlag =
false;
202 case Request::allocmem:
204 allocMemReq(request);
207 case Request::buildindx:
208 buildIndxReq(request);
210 case Request::suspend:
211 if (request->par.suspend.milliseconds)
213 g_eventLogger->
debug(
"Suspend %s %u ms",
214 file->theFileName.c_str(),
215 request->par.suspend.milliseconds);
216 NdbSleep_MilliSleep(request->par.suspend.milliseconds);
221 g_eventLogger->
debug(
"Suspend %s",
222 file->theFileName.c_str());
223 theStartFlag =
false;
227 DEBUG(ndbout_c(
"Invalid Request"));
231 m_last_request = request;
232 m_current_request = 0;
235 theReportTo->writeChannelNoSignal(request);
241 AsyncIoThread::allocMemReq(
Request* request)
244 switch((request->par.alloc.requestInfo & 255)){
245 case AllocMemReq::RT_MAP:{
246 bool memlock = !!(request->par.alloc.requestInfo & AllocMemReq::RT_MEMLOCK);
247 request->par.alloc.ctx->m_mm.map(&watchDog, memlock);
248 request->par.alloc.bytes = 0;
252 case AllocMemReq::RT_EXTEND:
257 request->par.alloc.bytes = 0;
264 AsyncIoThread::buildIndxReq(
Request* request)
267 memcpy(&req, &request->par.build.m_req,
sizeof(req));
268 req.mem_buffer = request->file->m_page_ptr.p;
269 req.buffer_size = request->file->m_page_cnt *
sizeof(
GlobalPage);
270 request->error = (* req.func_ptr)(&req);
276 assert(m_current_file == 0);
277 assert(theMemoryChannelPtr == &m_fs.theToBoundThreads);
278 m_current_file =
file;
279 theMemoryChannelPtr = &theMemoryChannel;
281 m_fs.cnt_active_bound(1);
287 if (m_current_file == 0)
289 assert(file->getThread() == 0);
293 assert(m_current_file == file);
294 assert(theMemoryChannelPtr = &theMemoryChannel);
296 theMemoryChannelPtr = &m_fs.theToBoundThreads;
298 m_fs.cnt_active_bound(-1);