18 #include <ndb_global.h>
21 #include "AsyncFile.hpp"
24 #include "Win32AsyncFile.hpp"
26 #include "PosixAsyncFile.hpp"
29 #include <signaldata/FsOpenReq.hpp>
30 #include <signaldata/FsCloseReq.hpp>
31 #include <signaldata/FsReadWriteReq.hpp>
32 #include <signaldata/FsAppendReq.hpp>
33 #include <signaldata/FsRemoveReq.hpp>
34 #include <signaldata/FsConf.hpp>
35 #include <signaldata/FsRef.hpp>
36 #include <signaldata/NdbfsContinueB.hpp>
37 #include <signaldata/DumpStateOrd.hpp>
38 #include <signaldata/AllocMem.hpp>
40 #include <RefConvert.hpp>
41 #include <portlib/NdbDir.hpp>
43 #include <Configuration.hpp>
45 #include <EventLogger.hpp>
48 NdbMutex g_active_bound_threads_mutex;
54 int log_qsize = baseAddrRef->bits.q;
55 int log_vsize = baseAddrRef->bits.v;
58 log_psize = log_qsize + log_vsize - 3;
59 return (1 << log_psize);
64 scanningInProgress(false),
68 m_bound_threads_cnt(0),
69 m_unbounds_threads_cnt(0),
70 m_active_bound_threads_cnt(0)
72 BLOCK_CONSTRUCTOR(
Ndbfs);
74 NdbMutex_Init(&g_active_bound_threads_mutex);
78 addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD);
79 addRecSignal(GSN_STTOR, &Ndbfs::execSTTOR);
80 addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
81 addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
82 addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
83 addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
84 addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
85 addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
86 addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
87 addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
88 addRecSignal(GSN_ALLOC_MEM_REQ, &Ndbfs::execALLOC_MEM_REQ);
89 addRecSignal(GSN_SEND_PACKED, &Ndbfs::execSEND_PACKED,
true);
90 addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Ndbfs::execBUILD_INDX_IMPL_REQ);
92 addRecSignal(GSN_FSSUSPENDORD, &Ndbfs::execFSSUSPENDORD);
107 request.action = Request::end;
108 for (
unsigned i = 0;
i < theThreads.size();
i++)
110 theToBoundThreads.writeChannel(&request);
111 theToUnboundThreads.writeChannel(&request);
114 for (
unsigned i = 0;
i < theThreads.size();
i++)
123 for (
unsigned i = 0;
i < theThreads.size();
i++)
134 for (
unsigned i = 0;
i < theFiles.size();
i++){
142 delete theRequestPool;
// Create the directory `path` via NdbDir::create and return that call's
// result. The requested mode is user read/write/execute plus group read and
// group execute.
147 do_mkdir(
const char * path)
149 return NdbDir::create(path,
150 NdbDir::u_rwx() | NdbDir::g_r() | NdbDir::g_x(),
158 const char * tmp = dst.
c_str();
159 unsigned len = dst.
length();
160 unsigned dslen = (unsigned)strlen(DIR_SEPARATOR);
162 if (len > dslen && strcmp(tmp+(len - dslen), DIR_SEPARATOR) != 0)
163 dst.
append(DIR_SEPARATOR);
173 memset(buf2, 0,
sizeof(buf2));
175 CreateDirectory(path, 0);
177 if(!GetFullPathName(path,
sizeof(buf2), buf2, &szFilePart) ||
178 (GetFileAttributes(buf2) & FILE_ATTRIBUTE_READONLY))
181 if (::realpath(path, buf2) == NULL ||
182 ::access(buf2, W_OK) != 0)
// Return the base path stored for index `no`.
// m_base_path[no] is returned only when `no` lies inside the m_base_path
// array and the stored string is non-empty; otherwise the BP_FS entry is
// returned as the fallback.
191 Ndbfs::get_base_path(Uint32 no)
const
193 if (no < NDB_ARRAY_SIZE(m_base_path) &&
194 strlen(m_base_path[no].c_str()) > 0)
197 return m_base_path[no];
200 return m_base_path[FsOpenReq::BP_FS];
208 Uint32 ref = req->senderRef;
209 Uint32 senderData = req->senderData;
212 m_ctx.m_config.getOwnConfigIterator();
215 tmp.
assfmt(
"ndb_%u_fs%s", getOwnNodeId(), DIR_SEPARATOR);
216 m_base_path[FsOpenReq::BP_FS].
assfmt(
"%s%s",
217 m_ctx.m_config.fileSystemPath(),
219 m_base_path[FsOpenReq::BP_BACKUP].
assign(m_ctx.m_config.backupFilePath());
221 const char * ddpath = 0;
222 ndb_mgm_get_string_parameter(p, CFG_DB_DD_FILESYSTEM_PATH, &ddpath);
225 const char * datapath = ddpath;
226 ndb_mgm_get_string_parameter(p, CFG_DB_DD_DATAFILE_PATH, &datapath);
235 add_path(path, datapath);
236 do_mkdir(path.
c_str());
237 add_path(path, tmp.
c_str());
238 do_mkdir(path.
c_str());
239 if (!validate_path(m_base_path[FsOpenReq::BP_DD_DF], path.
c_str()))
241 ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
242 m_base_path[FsOpenReq::BP_DD_DF].c_str(),
243 "FileSystemPathDataFiles");
249 const char * undopath = ddpath;
250 ndb_mgm_get_string_parameter(p, CFG_DB_DD_UNDOFILE_PATH, &undopath);
259 add_path(path, undopath);
260 do_mkdir(path.
c_str());
261 add_path(path, tmp.
c_str());
262 do_mkdir(path.
c_str());
264 if (!validate_path(m_base_path[FsOpenReq::BP_DD_UF], path.
c_str()))
266 ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
267 m_base_path[FsOpenReq::BP_DD_UF].c_str(),
268 "FileSystemPathUndoFiles");
274 ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
275 Uint32 noIdleFiles = 27;
277 ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
279 if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
280 m_maxFiles = noIdleFiles;
283 for (Uint32
i = 0;
i < noIdleFiles;
i++)
285 theIdleFiles.push_back(createAsyncFile());
289 theThreads.push_back(thr);
293 Uint32 threadpool = 2;
294 ndb_mgm_get_int_parameter(p, CFG_DB_THREAD_POOL, &threadpool);
297 for (Uint32
i = 0;
i < threadpool;
i++)
303 theThreads.push_back(thr);
315 conf->senderRef = reference();
316 conf->senderData = senderData;
317 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
318 ReadConfigConf::SignalLength, JBB);
321 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
322 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
// Handler registered for GSN_STTOR.
// Tests theData[1] for 0, creates the BP_FS base directory, requires that no
// file is currently open, and answers NDBCNTR_REF with a 4-word GSN_STTORRY.
336 Ndbfs::execSTTOR(
Signal* signal)
340 if(signal->theData[1] == 0){
// Make sure the file-system base directory exists.
343 do_mkdir(m_base_path[FsOpenReq::BP_FS].c_str());
346 ndbrequire(theOpenFiles.size() == 0);
// NOTE(review): 255 presumably marks "no further start phases" — confirm
// against the STTORRY signal definition.
348 signal->theData[3] = 255;
349 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);
364 else if (request->m_do_bind)
366 theToBoundThreads.writeChannel(request);
370 theToUnboundThreads.writeChannel(request);
// Handler registered for GSN_FSOPENREQ.
// Visible steps: take the file name from the FILENAME section, optionally
// attach a page buffer to the file (OM_INIT or OM_WRITE_BUFFER), then build a
// Request::open and pass it to forward().
376 Ndbfs::execFSOPENREQ(
Signal* signal)
380 const BlockReference userRef = fsOpenReq->userReference;
// `bound` is true when the OM_THREAD_POOL flag is NOT set in fileFlags.
382 bool bound = (fsOpenReq->fileFlags & FsOpenReq::OM_THREAD_POOL) == 0;
384 ndbrequire(file != NULL);
386 Uint32 userPointer = fsOpenReq->userPointer;
394 handle.getSection(ptr, FsOpenReq::FILENAME);
396 file->theFileName.set(
this, userRef, fsOpenReq->fileNumber,
false, ptr);
397 releaseSections(handle);
// OM_INIT: attach pages taken from the RT_DBTUP_PAGE resource group.
399 if (fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
// The last argument (1) is the minimum page count requested.
// NOTE(review): confirm the parameter meaning against alloc_pages().
404 m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &page_ptr.i, &cnt, 1);
// Presumably the allocation-failure path: the file is left without a buffer
// and the requester gets FSOPENREF with fsErrOutOfMemory.
407 file->m_page_ptr.setNull();
408 file->m_page_cnt = 0;
410 FsRef *
const fsRef = (
FsRef *)&signal->theData[0];
412 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
413 fsRef->osErrorCode = ~0;
414 sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
417 m_shared_page_pool.
getPtr(page_ptr);
418 file->set_buffer(RT_DBTUP_PAGE, page_ptr, cnt);
// OM_WRITE_BUFFER: same pattern, using the RT_FILE_BUFFER resource group and
// asking for NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE pages.
420 else if (fsOpenReq->fileFlags & FsOpenReq::OM_WRITE_BUFFER)
423 Uint32 cnt = NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE;
425 m_ctx.m_mm.alloc_pages(RT_FILE_BUFFER, &page_ptr.i, &cnt, 1);
429 file->m_page_ptr.setNull();
430 file->m_page_cnt = 0;
432 FsRef *
const fsRef = (
FsRef *)&signal->theData[0];
434 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
435 fsRef->osErrorCode = ~0;
436 sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
439 m_shared_page_pool.
getPtr(page_ptr);
440 file->set_buffer(RT_FILE_BUFFER, page_ptr, cnt);
// Remaining case: the file is expected to carry no buffer; the fields are
// reset anyway.
444 ndbassert(file->m_page_ptr.isNull());
445 file->m_page_ptr.setNull();
446 file->m_page_cnt = 0;
// Optional tracing, enabled by the NDB_TRACE_OPEN environment variable.
449 if (getenv(
"NDB_TRACE_OPEN"))
450 ndbout_c(
"open(%s) bound: %u", file->theFileName.c_str(), bound);
// Build the open request; newId() supplies the file pointer value.
452 Request* request = theRequestPool->get();
453 request->action = Request::open;
455 request->set(userRef, userPointer, newId() );
456 request->file =
file;
457 request->theTrace = signal->getTrace();
458 request->par.open.flags = fsOpenReq->fileFlags;
459 request->par.open.page_size = fsOpenReq->page_size;
// file_size is assembled from the hi word (shifted left 32) and the lo word.
460 request->par.open.file_size = fsOpenReq->file_size_hi;
461 request->par.open.file_size <<= 32;
462 request->par.open.file_size |= fsOpenReq->file_size_lo;
463 request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
464 request->m_do_bind = bound;
466 ndbrequire(forward(file, request));
// Handler registered for GSN_FSREMOVEREQ.
// Sets the file name from the FILENAME section, builds a Request::rmrf and
// either forwards it or reports it directly.
470 Ndbfs::execFSREMOVEREQ(
Signal* signal)
474 const BlockReference userRef = req->userReference;
477 ndbrequire(file != NULL);
484 handle.getSection(ptr, FsOpenReq::FILENAME);
487 file->theFileName.set(
this, userRef, req->fileNumber, req->directory, ptr);
488 releaseSections(handle);
// `bp` is later used as an index into m_base_path.
490 Uint32 version = FsOpenReq::getVersion(req->fileNumber);
491 Uint32 bp = FsOpenReq::v5_getLcpNo(req->fileNumber);
493 Request* request = theRequestPool->get();
494 request->action = Request::rmrf;
495 request->par.rmrf.directory = req->directory;
496 request->par.rmrf.own_directory = req->ownDirectory;
498 request->set(userRef, req->userPointer, newId() );
499 request->file =
file;
500 request->theTrace = signal->getTrace();
501 request->m_do_bind = bound;
// Guard the array access below against an out-of-range base-path index.
505 ndbrequire(bp < NDB_ARRAY_SIZE(m_base_path));
// NOTE(review): presumably an empty base path answers immediately through
// report() instead of going to a thread — the branch structure is not fully
// visible here; confirm.
506 if (strlen(m_base_path[bp].c_str()) == 0)
512 ndbrequire(forward(file, request));
515 report(request, signal);
// Handler registered for GSN_FSCLOSEREQ.
// Looks the file up in theOpenFiles. An unknown file pointer is answered with
// FSCLOSEREF (fsErrFileDoesNotExist) plus a file dump; otherwise a close or
// closeRemove request is forwarded.
523 Ndbfs::execFSCLOSEREQ(
Signal * signal)
527 const BlockReference userRef = fsCloseReq->userReference;
// The file pointer from the signal is narrowed to 16 bits.
528 const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;
529 const UintR userPointer = fsCloseReq->userPointer;
531 AsyncFile* openFile = theOpenFiles.find(filePointer);
532 if (openFile == NULL) {
536 FsRef *
const fsRef = (
FsRef *)&signal->theData[0];
538 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
539 fsRef->osErrorCode = ~0;
540 sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);
// NOTE(review): the message prints userPointer, not the unknown filePointer.
542 g_eventLogger->
warning(
"Trying to close unknown file!! %u", userPointer);
543 g_eventLogger->
warning(
"Dumping files");
// Reuse the signal to run the dump code 405 handled by execDUMP_STATE_ORD.
544 signal->theData[0] = 405;
545 execDUMP_STATE_ORD(signal);
// Optional tracing, enabled by the NDB_TRACE_OPEN environment variable.
549 if (getenv(
"NDB_TRACE_OPEN"))
550 ndbout_c(
"close(%s)", openFile->theFileName.c_str());
552 Request *request = theRequestPool->get();
// The remove-file flag selects closeRemove; the other visible assignment
// selects a plain close.
553 if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
555 request->action = Request::closeRemove;
558 request->action = Request::close;
560 request->set(userRef, fsCloseReq->userPointer, filePointer);
561 request->file = openFile;
563 request->theTrace = signal->getTrace();
564 request->m_do_bind =
false;
566 ndbrequire(forward(openFile, request));
// Shared implementation used by the read and write request handlers.
// `action` is stored in the Request as a Request::Action; `signal` carries
// the request words, optionally continued in section 0.
// The page list of the Request is filled according to the format flag and
// the Request is forwarded; on a validation error the Request is returned to
// the pool and FSWRITEREF or FSREADREF is sent.
570 Ndbfs::readWriteRequest(
int action,
Signal * signal)
// Local copy of the signal words with room for an appended section.
// NOTE(review): the memcpy length comes from signal->getLength() and is only
// compared against the array size further down, and only when a section is
// present — confirm the signal length can never exceed the array.
572 Uint32 theData[25 + 2 * 32];
573 memcpy(theData, signal->theData, 4 * signal->getLength());
575 if (handle.m_cnt > 0)
578 ndbrequire(handle.getSection(secPtr, 0));
// The section must fit behind the copied signal words.
579 ndbrequire(signal->getLength() + secPtr.sz < NDB_ARRAY_SIZE(theData));
580 copy(theData + signal->getLength(), secPtr);
581 releaseSections(handle);
// The file pointer from the request is narrowed to 16 bits.
585 Uint16 filePointer = (Uint16)fsRWReq->filePointer;
586 const UintR userPointer = fsRWReq->userPointer;
587 const BlockReference userRef = fsRWReq->userReference;
588 const BlockNumber blockNumber = refToMain(userRef);
589 const Uint32 instanceNumber = refToInstance(userRef);
591 AsyncFile* openFile = theOpenFiles.find(filePointer);
// Address of the requester's BAT entry selected by varIndex.
// NOTE(review): varIndex is range-checked against getBatSize() only later,
// and only for the non-page formats.
594 &getBat(blockNumber, instanceNumber)[fsRWReq->varIndex];
596 UintPtr tClusterSize;
600 FsRef::NdbfsErrorCodeType errorCode;
602 Request *request = theRequestPool->get();
604 request->set(userRef, userPointer, filePointer);
605 request->file = openFile;
606 request->action = (Request::Action) action;
607 request->theTrace = signal->getTrace();
608 request->m_do_bind =
false;
610 Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
// A request for zero pages is invalid.
612 if (fsRWReq->numberOfPages == 0) {
614 errorCode = FsRef::fsErrInvalidParameters;
// Formats other than GlobalPage/SharedPage address memory through the BAT
// entry, so the entry and the open file are validated first.
618 if(format != FsReadWriteReq::fsFormatGlobalPage &&
619 format != FsReadWriteReq::fsFormatSharedPage)
621 if (fsRWReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
623 errorCode = FsRef::fsErrInvalidParameters;
626 if (myBaseAddrRef == NULL) {
628 errorCode = FsRef::fsErrInvalidParameters;
631 if (openFile == NULL) {
633 errorCode = FsRef::fsErrFileDoesNotExist;
// Cache the geometry of the BAT entry: page size, cluster size, number of
// records (nrr) and the base address (WA).
636 tPageSize = pageSize(myBaseAddrRef);
637 tClusterSize = myBaseAddrRef->ClusterSize;
638 tNRR = myBaseAddrRef->nrr;
639 tWA = (
char*)myBaseAddrRef->WA;
// List of (varIndex, fileOffset) pairs: one page entry per pair.
644 case FsReadWriteReq::fsFormatListOfPairs: {
646 for (
unsigned int i = 0;
i < fsRWReq->numberOfPages;
i++) {
648 const UintPtr varIndex = fsRWReq->data.listOfPair[
i].varIndex;
649 const UintPtr fileOffset = fsRWReq->data.listOfPair[
i].fileOffset;
// Each memory index must lie below nrr.
650 if (varIndex >= tNRR) {
652 errorCode = FsRef::fsErrInvalidParameters;
655 request->par.readWrite.pages[
i].buf = &tWA[varIndex * tClusterSize];
656 request->par.readWrite.pages[
i].size = tPageSize;
657 request->par.readWrite.pages[
i].offset = (off_t)(fileOffset*tPageSize);
659 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
// Contiguous array of pages: collapsed into one page entry spanning
// numberOfPages * tPageSize bytes.
664 case FsReadWriteReq::fsFormatArrayOfPages: {
665 if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {
667 errorCode = FsRef::fsErrInvalidParameters;
670 const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex;
671 const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset;
673 request->par.readWrite.pages[0].offset = (off_t)(fileOffset * tPageSize);
674 request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;
675 request->par.readWrite.numberOfPages = 1;
// NOTE(review): the buffer stride here is tPageSize, while the two list
// formats use tClusterSize — confirm this difference is intended.
676 request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];
// List of memory pages written to consecutive file pages.
681 case FsReadWriteReq::fsFormatListOfMemPages: {
// The element following the numberOfPages memory indexes holds the starting
// file page; it is converted to a byte offset here.
683 tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];
684 tPageOffset *= tPageSize;
686 for (
unsigned int i = 0;
i < fsRWReq->numberOfPages;
i++) {
688 UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[
i];
690 if (varIndex >= tNRR) {
692 errorCode = FsRef::fsErrInvalidParameters;
695 request->par.readWrite.pages[
i].buf = &tWA[varIndex * tClusterSize];
696 request->par.readWrite.pages[
i].size = tPageSize;
697 request->par.readWrite.pages[
i].offset = (off_t)
698 (tPageOffset + (
i*tPageSize));
700 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
// Presumably the default case for an unrecognised format value.
707 errorCode = FsRef::fsErrInvalidParameters;
// Global page format: one contiguous run taken from m_global_page_pool;
// varIndex is used as the file page number.
712 else if (format == FsReadWriteReq::fsFormatGlobalPage)
715 m_global_page_pool.
getPtr(ptr, fsRWReq->data.pageData[0]);
716 request->par.readWrite.pages[0].buf = (
char*)ptr.p;
717 request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
718 request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
719 request->par.readWrite.numberOfPages = 1;
// Only the shared page format remains; it mirrors the global page case but
// resolves the page through m_shared_page_pool.
723 ndbrequire(format == FsReadWriteReq::fsFormatSharedPage);
725 m_shared_page_pool.
getPtr(ptr, fsRWReq->data.pageData[0]);
726 request->par.readWrite.pages[0].buf = (
char*)ptr.p;
727 request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
728 request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
729 request->par.readWrite.numberOfPages = 1;
732 ndbrequire(forward(openFile, request));
// Error exit: give the Request back to the pool and send the REF signal that
// matches the requested action.
736 theRequestPool->put(request);
737 FsRef *
const fsRef = (
FsRef *)&signal->theData[0];
739 fsRef->setErrorCode(fsRef->errorCode, errorCode);
740 fsRef->osErrorCode = ~0;
742 case Request:: write:
743 case Request:: writeSync: {
745 sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);
748 case Request:: readPartial:
749 case Request:: read: {
751 sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
// Handler registered for GSN_FSWRITEREQ.
// Delegates to readWriteRequest(): Request::writeSync when the sync flag in
// operationFlag is set, Request::write on the other (presumably else) path.
767 Ndbfs::execFSWRITEREQ(
Signal* signal)
772 if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) ==
true){
774 readWriteRequest( Request::writeSync, signal );
777 readWriteRequest( Request::write, signal );
// Handler registered for GSN_FSREADREQ.
// Delegates to readWriteRequest(): Request::readPartial when the partial-read
// flag in operationFlag is set, Request::read on the other (presumably else)
// path.
791 Ndbfs::execFSREADREQ(
Signal* signal)
795 if (FsReadWriteReq::getPartialReadFlag(req->operationFlag))
796 readWriteRequest( Request::readPartial, signal );
798 readWriteRequest( Request::read, signal );
// Handler registered for GSN_FSSYNCREQ.
// Signal layout as read here: theData[0] = file pointer, theData[1] = sender
// reference, theData[2] = user pointer.
// An unknown file pointer is answered with FSSYNCREF; otherwise a
// Request::sync is forwarded for the open file.
805 Ndbfs::execFSSYNCREQ(
Signal * signal)
808 Uint16 filePointer = (Uint16)signal->theData[0];
809 BlockReference userRef = signal->theData[1];
810 const UintR userPointer = signal->theData[2];
811 AsyncFile* openFile = theOpenFiles.find(filePointer);
813 if (openFile == NULL) {
815 FsRef *
const fsRef = (
FsRef *)&signal->theData[0];
817 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
818 fsRef->osErrorCode = ~0;
819 sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);
823 Request *request = theRequestPool->get();
825 request->action = Request::sync;
826 request->set(userRef, userPointer, filePointer);
827 request->file = openFile;
828 request->theTrace = signal->getTrace();
829 request->m_do_bind =
false;
831 ndbrequire(forward(openFile,request));
// Handler registered for GSN_FSSUSPENDORD.
// Signal layout as read here: theData[0] = file pointer, theData[1] =
// suspend time in milliseconds. A Request::suspend is forwarded for the file.
838 Ndbfs::execFSSUSPENDORD(
Signal * signal)
841 Uint16 filePointer = (Uint16)signal->theData[0];
842 Uint32 millis = signal->theData[1];
843 AsyncFile* openFile = theOpenFiles.find(filePointer);
// NOTE(review): the handling of an unknown file pointer is not visible here.
845 if (openFile == NULL)
851 Request *request = theRequestPool->get();
853 request->action = Request::suspend;
// User reference and user pointer are both set to 0 for this request.
854 request->set(0, 0, filePointer);
855 request->file = openFile;
856 request->theTrace = signal->getTrace();
857 request->par.suspend.milliseconds = millis;
858 request->m_do_bind =
false;
860 ndbrequire(forward(openFile,request));
// Handler registered for GSN_FSAPPENDREQ.
// Resolves the source buffer through the requester's BAT entry, validates the
// request, and forwards an append or append_synch Request. On a validation
// error the Request is returned to the pool and FSAPPENDREF is sent.
864 Ndbfs::execFSAPPENDREQ(
Signal * signal)
867 const Uint16 filePointer = (Uint16)fsReq->filePointer;
868 const UintR userPointer = fsReq->userPointer;
869 const BlockReference userRef = fsReq->userReference;
870 const BlockNumber blockNumber = refToMain(userRef);
871 const Uint32 instanceNumber = refToInstance(userRef);
873 FsRef::NdbfsErrorCodeType errorCode;
875 AsyncFile* openFile = theOpenFiles.find(filePointer);
877 &getBat(blockNumber, instanceNumber)[fsReq->varIndex];
// NOTE(review): myBaseAddrRef is dereferenced here, before the NULL test and
// before varIndex is compared with getBatSize() further down — confirm the
// order is safe for malformed requests.
879 const Uint32* tWA = (
const Uint32*)myBaseAddrRef->WA;
880 const Uint32 tSz = myBaseAddrRef->nrr;
881 const Uint32
offset = fsReq->offset;
882 const Uint32
size = fsReq->size;
883 const Uint32 synch_flag = fsReq->synch_flag;
884 Request *request = theRequestPool->get();
886 if (openFile == NULL) {
888 errorCode = FsRef::fsErrFileDoesNotExist;
892 if (myBaseAddrRef == NULL) {
894 errorCode = FsRef::fsErrInvalidParameters;
898 if (fsReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
900 errorCode = FsRef::fsErrInvalidParameters;
// NOTE(review): the condition guarding this assignment is not visible;
// presumably a bounds check of offset/size against tSz.
906 errorCode = FsRef::fsErrInvalidParameters;
911 request->set(userRef, userPointer, filePointer);
912 request->file = openFile;
913 request->theTrace = signal->getTrace();
// offset is applied to a Uint32 pointer; size << 2 multiplies size by 4, so
// both are counted in 32-bit words.
915 request->par.append.buf = (
const char *)(tWA +
offset);
916 request->par.append.size =
size << 2;
// NOTE(review): the selection between these two actions is not visible;
// presumably driven by synch_flag.
919 request->action = Request::append;
921 request->action = Request::append_synch;
922 request->m_do_bind =
false;
923 ndbrequire(forward(openFile, request));
// Error exit: recycle the Request and answer with FSAPPENDREF.
928 theRequestPool->put(request);
929 FsRef *
const fsRef = (
FsRef *)&signal->theData[0];
931 fsRef->setErrorCode(fsRef->errorCode, errorCode);
932 fsRef->osErrorCode = ~0;
935 sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB);
// Handler registered for GSN_ALLOC_MEM_REQ.
// Wraps the request in a Request::allocmem attached to `file` and forwards
// it.
940 Ndbfs::execALLOC_MEM_REQ(
Signal* signal)
948 ndbrequire(file != NULL);
950 Request *request = theRequestPool->get();
// The file pointer argument is 0 for this request.
953 request->set(req->senderRef, req->senderData, 0);
954 request->file =
file;
955 request->theTrace = signal->getTrace();
957 request->par.alloc.ctx = &m_ctx;
958 request->par.alloc.requestInfo = req->requestInfo;
// The 64-bit byte count is rebuilt from its hi and lo 32-bit halves.
959 request->par.alloc.bytes = (Uint64(req->bytes_hi) << 32) + req->bytes_lo;
960 request->action = Request::allocmem;
961 request->m_do_bind = bound;
962 ndbrequire(forward(file, request));
965 #include <signaldata/BuildIndxImpl.hpp>
// Handler registered for GSN_BUILD_INDX_IMPL_REQ.
// Attaches a page buffer sized from req->buffer_size to `file`, copies the
// request into a Request::buildindx and forwards it.
968 Ndbfs::execBUILD_INDX_IMPL_REQ(
Signal* signal)
975 ndbrequire(file != NULL);
977 Request *request = theRequestPool->get();
979 request->set(req->senderRef, req->senderData, 0);
980 request->file =
file;
981 request->theTrace = signal->getTrace();
// Page count = buffer_size rounded up to a multiple of 32768 bytes.
983 Uint32 cnt = (req->buffer_size + 32768 - 1) / 32768;
// `cnt` is passed both as the in/out count and as the last argument.
// NOTE(review): presumably the minimum acceptable count — confirm against
// alloc_pages().
986 m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &page_ptr.i, &cnt, cnt);
// NOTE(review): presumably the allocation-failure path; the rest of that
// branch is not visible.
989 file->m_page_ptr.setNull();
990 file->m_page_cnt = 0;
// The allocation must have returned exactly the saved count.
996 ndbrequire(cnt == save);
998 m_shared_page_pool.
getPtr(page_ptr);
999 file->set_buffer(RT_DBTUP_PAGE, page_ptr, cnt);
1001 memcpy(&request->par.build.m_req, req,
sizeof(* req));
1002 request->action = Request::buildindx;
1003 request->m_do_bind = bound;
1004 ndbrequire(forward(file, request));
1011 for (
int i = 1;
i < SHRT_MAX;
i++)
1013 if (theLastId == SHRT_MAX) {
1021 if(theOpenFiles.find(theLastId) == NULL) {
// Create a new AsyncFile and record it in theFiles.
// When m_maxFiles is non-zero and theFiles already holds that many entries,
// the state of every file is printed and a fatal NDBD_EXIT_AFS_MAXOPEN error
// is raised.
1032 Ndbfs::createAsyncFile()
1035 if (m_maxFiles !=0 && theFiles.size() == m_maxFiles)
1038 for (
unsigned i = 0;
i < theFiles.size();
i++){
// NOTE(review): the pointer is printed through a cast to long, which is
// narrower than a pointer on LLP64 targets such as 64-bit Windows.
1040 ndbout_c(
"%2d (0x%lx): %s",
i, (
long) file, file->isOpen()?
"OPEN":
"CLOSED");
1042 ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,
"",
" Ndbfs::createAsyncFile");
// NOTE(review): the condition for this second fatal error is not visible;
// presumably a failed file construction/initialisation.
1053 ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,
"",
" Ndbfs::createAsyncFile");
1056 theFiles.push_back(file);
1063 assert(file->getThread() == 0);
1064 theIdleFiles.push_back(file);
// Create and start one I/O thread.
// The thread is started with doStart(), registered with the global
// configuration as an NdbfsThread, and one of the bound/unbound thread
// counters is incremented (presumably selected by `bound`).
1068 Ndbfs::createIoThread(
bool bound)
1074 ndbout_c(
"NDBFS: Created new file thread %d", theThreads.size());
1077 struct NdbThread* thrptr = thr->doStart();
1078 globalEmulatorData.theConfiguration->
addThread(thrptr, NdbfsThread);
1081 m_bound_threads_cnt++;
1083 m_unbounds_threads_cnt++;
// Obtain an AsyncFile for a new request.
// The last entry of theIdleFiles is taken and removed; the other visible
// path (presumably when the idle list is empty) calls createAsyncFile().
1090 Ndbfs::getIdleFile(
bool bound)
1093 Uint32 sz = theIdleFiles.size();
1096 file = theIdleFiles[sz - 1];
1097 theIdleFiles.erase(sz - 1);
1101 file = createAsyncFile();
// When every bound thread is already active, a further thread is appended to
// theThreads. NOTE(review): the enclosing condition on `bound` and the
// thread creation call itself are not visible here.
1109 if (m_active_bound_threads_cnt == m_bound_threads_cnt)
1114 theThreads.push_back(thr);
// Adjust m_active_bound_threads_cnt by `val` while holding
// g_active_bound_threads_mutex through a Guard object.
// The subtracting path asserts that the counter is at least `val` first.
// NOTE(review): the condition choosing between subtract and add is not
// visible; presumably the sign of `val`.
1122 Ndbfs::cnt_active_bound(
int val)
1124 Guard g(&g_active_bound_threads_mutex);
1128 assert(m_active_bound_threads_cnt >= (Uint32)val);
1129 m_active_bound_threads_cnt -= val;
1133 m_active_bound_threads_cnt += val;
1140 const Uint32 orgTrace = signal->getTrace();
1141 signal->setTrace(request->theTrace);
1142 const BlockReference ref = request->theUserReference;
1144 if (request->file->has_buffer())
1146 if ((request->action == Request::open && request->error) ||
1147 request->action == Request::close ||
1148 request->action == Request::closeRemove ||
1149 request->action == Request::buildindx)
1154 request->file->clear_buffer(rg, ptr, cnt);
1155 m_ctx.m_mm.release_pages(rg, ptr.i, cnt);
1159 if (request->error) {
1162 FsRef *
const fsRef = (
FsRef *)&signal->theData[0];
1164 if(request->error & FsRef::FS_ERR_BIT)
1166 fsRef->errorCode = request->error;
1167 fsRef->osErrorCode = 0;
1171 fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error));
1172 fsRef->osErrorCode = request->error;
1174 switch (request->action) {
1175 case Request:: open: {
1178 pushIdleFile(request->file);
1179 sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
1182 case Request:: closeRemove:
1183 case Request:: close: {
1185 sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB);
1187 g_eventLogger->
warning(
"Error closing file: %s %u/%u",
1188 request->file->theFileName.c_str(),
1190 fsRef->osErrorCode);
1191 g_eventLogger->
warning(
"Dumping files");
1192 signal->theData[0] = 405;
1193 execDUMP_STATE_ORD(signal);
1196 case Request:: writeSync:
1197 case Request:: writevSync:
1198 case Request:: write:
1199 case Request:: writev: {
1201 sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB);
1204 case Request:: read:
1205 case Request:: readPartial:
1206 case Request:: readv: {
1208 sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB);
1211 case Request:: sync: {
1213 sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
1216 case Request::append:
1217 case Request::append_synch:
1220 sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
1223 case Request::rmrf: {
1226 pushIdleFile(request->file);
1227 sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
1231 case Request:: end: {
1232 case Request:: suspend:
1236 case Request::allocmem: {
1239 rep->senderRef = reference();
1240 rep->senderData = request->theUserPointer;
1241 rep->errorCode = request->error;
1242 sendSignal(ref, GSN_ALLOC_MEM_REF, signal,
1243 AllocMemRef::SignalLength, JBB);
1244 pushIdleFile(request->file);
1247 case Request::buildindx: {
1250 rep->senderRef = reference();
1251 rep->senderData = request->theUserPointer;
1252 rep->errorCode = (BuildIndxImplRef::ErrorCode)request->error;
1253 sendSignal(ref, GSN_BUILD_INDX_IMPL_REF, signal,
1254 BuildIndxImplRef::SignalLength, JBB);
1255 pushIdleFile(request->file);
1262 fsConf->userPointer = request->theUserPointer;
1263 switch (request->action) {
1264 case Request:: open: {
1266 theOpenFiles.insert(request->file, request->theFilePointer);
1269 if (theOpenFiles.size() > m_maxOpenedFiles)
1270 m_maxOpenedFiles = theOpenFiles.size();
1272 fsConf->filePointer = request->theFilePointer;
1273 sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBA);
1276 case Request:: closeRemove:
1277 case Request:: close: {
1280 theOpenFiles.erase(request->theFilePointer);
1282 pushIdleFile(request->file);
1283 sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBA);
1286 case Request:: writeSync:
1287 case Request:: writevSync:
1288 case Request:: write:
1289 case Request:: writev: {
1291 sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBA);
1294 case Request:: read:
1295 case Request:: readv: {
1297 sendSignal(ref, GSN_FSREADCONF, signal, 1, JBA);
1300 case Request:: readPartial: {
1302 fsConf->bytes_read = Uint32(request->par.readWrite.pages[0].size);
1303 sendSignal(ref, GSN_FSREADCONF, signal, 2, JBA);
1306 case Request:: sync: {
1308 sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBA);
1311 case Request::append:
1312 case Request::append_synch:
1315 signal->theData[1] = Uint32(request->par.append.size);
1316 sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBA);
1319 case Request::rmrf: {
1322 pushIdleFile(request->file);
1323 sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBA);
1326 case Request:: end: {
1327 case Request:: suspend:
1331 case Request::allocmem: {
1334 conf->senderRef = reference();
1335 conf->senderData = request->theUserPointer;
1336 conf->bytes_hi = Uint32(request->par.alloc.bytes >> 32);
1337 conf->bytes_lo = Uint32(request->par.alloc.bytes);
1338 sendSignal(ref, GSN_ALLOC_MEM_CONF, signal,
1339 AllocMemConf::SignalLength, JBB);
1340 pushIdleFile(request->file);
1343 case Request::buildindx: {
1346 rep->senderRef = reference();
1347 rep->senderData = request->theUserPointer;
1348 sendSignal(ref, GSN_BUILD_INDX_IMPL_CONF, signal,
1349 BuildIndxImplConf::SignalLength, JBB);
1350 pushIdleFile(request->file);
1355 signal->setTrace(orgTrace);
// Poll the channel coming back from the I/O threads once.
// A request obtained from theFromThreads is reported to its requester via
// report() and then returned to theRequestPool.
// Callers use the result as a boolean (see execCONTINUEB, execSEND_PACKED).
1360 Ndbfs::scanIPC(
Signal* signal)
// tryReadChannel() is the non-blocking read by name. NOTE(review): confirm.
1362 Request* request = theFromThreads.tryReadChannel();
1366 report(request, signal);
1367 theRequestPool->put(request);
1373 #if defined NDB_WIN32
// Win32 variant (compiled under NDB_WIN32): map a Windows error code to the
// corresponding FsRef error code. Codes without an explicit case map to
// FsRef::fsErrUnknown.
1374 Uint32 Ndbfs::translateErrno(
int aErrno)
// Permission denied.
1379 case ERROR_ACCESS_DENIED:
1381 return FsRef::fsErrPermissionDenied;
// Temporarily not accessible.
1383 case ERROR_PATH_BUSY:
1384 case ERROR_NO_MORE_SEARCH_HANDLES:
1386 return FsRef::fsErrTemporaryNotAccessible;
// No space left on the device.
1388 case ERROR_HANDLE_DISK_FULL:
1389 case ERROR_DISK_FULL:
1391 return FsRef::fsErrNoSpaceLeftOnDevice;
// Invalid parameters.
1393 case ERROR_INVALID_HANDLE:
1394 case ERROR_INVALID_DRIVE:
1395 case ERROR_INVALID_ACCESS:
1396 case ERROR_HANDLE_EOF:
1397 case ERROR_BUFFER_OVERFLOW:
1399 return FsRef::fsErrInvalidParameters;
// Environment errors.
1402 case ERROR_ARENA_TRASHED:
1403 case ERROR_BAD_ENVIRONMENT:
1404 case ERROR_INVALID_BLOCK:
1405 case ERROR_WRITE_FAULT:
1406 case ERROR_READ_FAULT:
1407 case ERROR_OPEN_FAILED:
1409 return FsRef::fsErrEnvironmentError;
// Resource exhaustion.
1412 case ERROR_TOO_MANY_OPEN_FILES:
1413 case ERROR_NOT_ENOUGH_MEMORY:
1414 case ERROR_OUTOFMEMORY:
1415 return FsRef::fsErrNoMoreResources;
1417 case ERROR_FILE_NOT_FOUND:
1418 return FsRef::fsErrFileDoesNotExist;
// ERR_ReadUnderflow is not a Win32 ERROR_* constant; it is passed through as
// fsErrReadUnderflow.
1420 case ERR_ReadUnderflow:
1421 return FsRef::fsErrReadUnderflow;
1424 return FsRef::fsErrUnknown;
// Second variant of translateErrno (presumably the non-Win32 build): map an
// error number to an FsRef error code. It returns the same set of FsRef codes
// as the Win32 variant, ending with FsRef::fsErrUnknown.
// NOTE(review): apart from ERR_ReadUnderflow, the case labels selecting each
// return are not visible here.
1428 Uint32 Ndbfs::translateErrno(
int aErrno)
1436 return FsRef::fsErrPermissionDenied;
1443 return FsRef::fsErrTemporaryNotAccessible;
1452 return FsRef::fsErrNoSpaceLeftOnDevice;
1462 return FsRef::fsErrInvalidParameters;
1478 return FsRef::fsErrEnvironmentError;
1483 return FsRef::fsErrNoMoreResources;
1486 return FsRef::fsErrFileDoesNotExist;
1488 case ERR_ReadUnderflow:
1489 return FsRef::fsErrReadUnderflow;
1492 return FsRef::fsErrUnknown;
// Handler registered for GSN_CONTINUEB; drives the polling of completed
// requests.
// On ZSCAN_MEMORYCHANNEL_10MS_DELAY the delayed signal is re-armed with a
// delay argument of 10. A successful scanIPC() sets scanningInProgress and
// sends an immediate ZSCAN_MEMORYCHANNEL_NO_DELAY signal; otherwise the flag
// is cleared.
1500 Ndbfs::execCONTINUEB(
Signal* signal)
1503 if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) {
// Re-arm the periodic tick before doing any work.
1508 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
1509 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
// NOTE(review): presumably returns here when a no-delay scan chain is already
// running — the branch body is not visible.
1510 if (scanningInProgress ==
true) {
1515 if (scanIPC(signal)) {
1517 scanningInProgress =
true;
1518 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1519 sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1522 scanningInProgress =
false;
// Handler registered for GSN_SEND_PACKED.
// If no scan chain is in progress and scanIPC() handles a request, the flag
// is set and an immediate ZSCAN_MEMORYCHANNEL_NO_DELAY CONTINUEB is sent.
// scanIPC() is not called at all while scanningInProgress is true, because
// the && short-circuits.
1528 Ndbfs::execSEND_PACKED(
Signal* signal)
1531 if (scanningInProgress ==
false && scanIPC(signal))
1534 scanningInProgress =
true;
1535 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1536 sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
// Handler registered for GSN_DUMP_STATE_ORD; also called directly by other
// handlers with theData[0] = 405.
// theData[0] selects the dump: 19, NdbfsDumpFileStat, NdbfsDumpOpenFiles,
// NdbfsDumpAllFiles, NdbfsDumpIdleFiles, 404 (details of one open file) and
// 405 (name and state of every file).
// NOTE(review): several dumps print a pointer through a cast to long with
// %lx, which is narrower than a pointer on LLP64 targets such as 64-bit
// Windows.
1541 Ndbfs::execDUMP_STATE_ORD(
Signal* signal)
// NOTE(review): the action for code 19 is not visible here.
1543 if(signal->theData[0] == 19){
// File and thread counters.
1546 if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){
1547 infoEvent(
"NDBFS: Files: %d Open files: %d",
1549 theOpenFiles.size());
1550 infoEvent(
" Idle files: %u Max opened files: %d",
1551 theIdleFiles.size(),
1553 infoEvent(
" Bound Threads: %u (active %u) Unbound threads: %u",
1554 m_bound_threads_cnt,
1555 m_active_bound_threads_cnt,
1556 m_unbounds_threads_cnt);
1560 theRequestPool->size());
// One entry per open file.
1564 if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){
1565 infoEvent(
"NDBFS: Dump open files: %d", theOpenFiles.size());
1567 for (
unsigned i = 0;
i < theOpenFiles.size();
i++){
1571 file->theFileName.c_str(),
1572 (long)file->getThread());
// One entry per known file with its OPEN/CLOSED state.
1576 if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){
1577 infoEvent(
"NDBFS: Dump all files: %d", theFiles.size());
1579 for (
unsigned i = 0;
i < theFiles.size();
i++){
1581 infoEvent(
"%2d (0x%lx): %s",
i, (
long)file, file->isOpen()?
"OPEN":
"CLOSED");
// One entry per idle file with its OPEN/CLOSED state.
1585 if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
1587 theIdleFiles.size());
1589 for (
unsigned i = 0;
i < theIdleFiles.size();
i++){
1591 infoEvent(
"%2d (0x%lx): %s",
i, (
long)file, file->isOpen()?
"OPEN":
"CLOSED");
// Code 404: detailed state of the open file whose pointer is in theData[1].
// The signal must be exactly 2 words long and the file must be open.
1597 if(signal->theData[0] == 404)
1600 ndbrequire(signal->getLength() == 2);
1601 Uint32 file= signal->theData[1];
1602 AsyncFile* openFile = theOpenFiles.find(file);
1603 ndbrequire(openFile != 0);
1604 ndbout_c(
"File: %s %p", openFile->theFileName.c_str(), openFile);
1605 Request* curr = openFile->m_current_request;
1606 Request* last = openFile->m_last_request;
// NOTE(review): curr and last are dereferenced below; any NULL guards are not
// visible here.
1608 ndbout <<
"Current request: " << *curr << endl;
1610 ndbout <<
"Last request: " << *last << endl;
1612 ndbout <<
"theReportTo " << *openFile->theReportTo << endl;
1613 ndbout <<
"theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl;
1615 ndbout <<
"All files: " << endl;
1616 for (
unsigned i = 0;
i < theFiles.size();
i++){
1618 ndbout_c(
"%2d (0x%lx): %s",
i, (
long) file, file->isOpen()?
"OPEN":
"CLOSED");
// Code 405: index, file name (empty string when c_str() is NULL) and state
// of every file.
1623 if(signal->theData[0] == 405)
1625 for (
unsigned i = 0;
i < theFiles.size();
i++)
1630 ndbout_c(
"%u : %s %s",
i,
1631 file->theFileName.c_str() ? file->theFileName.c_str() :
"",
1632 file->isOpen() ?
"OPEN" :
"CLOSED");
// Return the base name of the open file registered under `fd` in
// theOpenFiles.
// NOTE(review): handling of an unknown `fd` (find() returning NULL) is not
// visible here.
1638 Ndbfs::get_filename(Uint32 fd)
const
1641 const AsyncFile* openFile = theOpenFiles.find(fd);
1643 return openFile->theFileName.get_base_name();
1648 BLOCK_FUNCTIONS(
Ndbfs)
// Explicit template instantiations for the Request type: the Pool class and
// the NdbOut stream-insertion operator for MemoryChannel<Request>.
1654 template class
Pool<Request>;
1655 template NdbOut& operator<<(NdbOut&, const
MemoryChannel<Request>&);