MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Ndbfs.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include "Ndbfs.hpp"
21 #include "AsyncFile.hpp"
22 
23 #ifdef NDB_WIN
24 #include "Win32AsyncFile.hpp"
25 #else
26 #include "PosixAsyncFile.hpp"
27 #endif
28 
29 #include <signaldata/FsOpenReq.hpp>
30 #include <signaldata/FsCloseReq.hpp>
31 #include <signaldata/FsReadWriteReq.hpp>
32 #include <signaldata/FsAppendReq.hpp>
33 #include <signaldata/FsRemoveReq.hpp>
34 #include <signaldata/FsConf.hpp>
35 #include <signaldata/FsRef.hpp>
36 #include <signaldata/NdbfsContinueB.hpp>
37 #include <signaldata/DumpStateOrd.hpp>
38 #include <signaldata/AllocMem.hpp>
39 
40 #include <RefConvert.hpp>
41 #include <portlib/NdbDir.hpp>
42 #include <NdbOut.hpp>
43 #include <Configuration.hpp>
44 
45 #include <EventLogger.hpp>
46 extern EventLogger * g_eventLogger;
47 
48 NdbMutex g_active_bound_threads_mutex;
49 
50 inline
51 int pageSize( const NewVARIABLE* baseAddrRef )
52 {
53  int log_psize;
54  int log_qsize = baseAddrRef->bits.q;
55  int log_vsize = baseAddrRef->bits.v;
56  if (log_vsize < 3)
57  log_vsize = 3;
58  log_psize = log_qsize + log_vsize - 3;
59  return (1 << log_psize);
60 }
61 
62 Ndbfs::Ndbfs(Block_context& ctx) :
63  SimulatedBlock(NDBFS, ctx),
64  scanningInProgress(false),
65  theLastId(0),
66  theRequestPool(0),
67  m_maxOpenedFiles(0),
68  m_bound_threads_cnt(0),
69  m_unbounds_threads_cnt(0),
70  m_active_bound_threads_cnt(0)
71 {
72  BLOCK_CONSTRUCTOR(Ndbfs);
73 
74  NdbMutex_Init(&g_active_bound_threads_mutex);
75 
76  // Set received signals
77  addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
78  addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD);
79  addRecSignal(GSN_STTOR, &Ndbfs::execSTTOR);
80  addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
81  addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
82  addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
83  addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
84  addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
85  addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
86  addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
87  addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
88  addRecSignal(GSN_ALLOC_MEM_REQ, &Ndbfs::execALLOC_MEM_REQ);
89  addRecSignal(GSN_SEND_PACKED, &Ndbfs::execSEND_PACKED, true);
90  addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Ndbfs::execBUILD_INDX_IMPL_REQ);
91  // Set send signals
92  addRecSignal(GSN_FSSUSPENDORD, &Ndbfs::execFSSUSPENDORD);
93 
94  theRequestPool = new Pool<Request>;
95 }
96 
98 {
107  request.action = Request::end;
108  for (unsigned i = 0; i < theThreads.size(); i++)
109  {
110  theToBoundThreads.writeChannel(&request);
111  theToUnboundThreads.writeChannel(&request);
112  }
113 
114  for (unsigned i = 0; i < theThreads.size(); i++)
115  {
116  AsyncIoThread * thr = theThreads[i];
117  thr->shutdown();
118  }
119 
123  for (unsigned i = 0; i < theThreads.size(); i++)
124  {
125  AsyncIoThread * thr = theThreads[i];
126  delete thr;
127  theThreads[i] = 0;
128  }
129  theThreads.clear();
130 
134  for (unsigned i = 0; i < theFiles.size(); i++){
135  AsyncFile* file = theFiles[i];
136  delete file;
137  theFiles[i] = NULL;
138  }//for
139  theFiles.clear();
140 
141  if (theRequestPool)
142  delete theRequestPool;
143 }
144 
145 static
146 bool
147 do_mkdir(const char * path)
148 {
149  return NdbDir::create(path,
150  NdbDir::u_rwx() | NdbDir::g_r() | NdbDir::g_x(),
151  true /* ignore_existing */);
152 }
153 
154 static
155 void
156 add_path(BaseString& dst, const char * add)
157 {
158  const char * tmp = dst.c_str();
159  unsigned len = dst.length();
160  unsigned dslen = (unsigned)strlen(DIR_SEPARATOR);
161 
162  if (len > dslen && strcmp(tmp+(len - dslen), DIR_SEPARATOR) != 0)
163  dst.append(DIR_SEPARATOR);
164  dst.append(add);
165 }
166 
167 static
168 bool
169 validate_path(BaseString & dst,
170  const char * path)
171 {
172  char buf2[PATH_MAX];
173  memset(buf2, 0,sizeof(buf2));
174 #ifdef NDB_WIN32
175  CreateDirectory(path, 0);
176  char* szFilePart;
177  if(!GetFullPathName(path, sizeof(buf2), buf2, &szFilePart) ||
178  (GetFileAttributes(buf2) & FILE_ATTRIBUTE_READONLY))
179  return false;
180 #else
181  if (::realpath(path, buf2) == NULL ||
182  ::access(buf2, W_OK) != 0)
183  return false;
184 #endif
185  dst.assign(buf2);
186  add_path(dst, "");
187  return true;
188 }
189 
190 const BaseString&
191 Ndbfs::get_base_path(Uint32 no) const
192 {
193  if (no < NDB_ARRAY_SIZE(m_base_path) &&
194  strlen(m_base_path[no].c_str()) > 0)
195  {
196  jam();
197  return m_base_path[no];
198  }
199 
200  return m_base_path[FsOpenReq::BP_FS];
201 }
202 
203 void
205 {
206  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
207 
208  Uint32 ref = req->senderRef;
209  Uint32 senderData = req->senderData;
210 
211  const ndb_mgm_configuration_iterator * p =
212  m_ctx.m_config.getOwnConfigIterator();
213  ndbrequire(p != 0);
214  BaseString tmp;
215  tmp.assfmt("ndb_%u_fs%s", getOwnNodeId(), DIR_SEPARATOR);
216  m_base_path[FsOpenReq::BP_FS].assfmt("%s%s",
217  m_ctx.m_config.fileSystemPath(),
218  tmp.c_str());
219  m_base_path[FsOpenReq::BP_BACKUP].assign(m_ctx.m_config.backupFilePath());
220 
221  const char * ddpath = 0;
222  ndb_mgm_get_string_parameter(p, CFG_DB_DD_FILESYSTEM_PATH, &ddpath);
223 
224  {
225  const char * datapath = ddpath;
226  ndb_mgm_get_string_parameter(p, CFG_DB_DD_DATAFILE_PATH, &datapath);
227  if (datapath)
228  {
234  BaseString path;
235  add_path(path, datapath);
236  do_mkdir(path.c_str());
237  add_path(path, tmp.c_str());
238  do_mkdir(path.c_str());
239  if (!validate_path(m_base_path[FsOpenReq::BP_DD_DF], path.c_str()))
240  {
241  ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
242  m_base_path[FsOpenReq::BP_DD_DF].c_str(),
243  "FileSystemPathDataFiles");
244  }
245  }
246  }
247 
248  {
249  const char * undopath = ddpath;
250  ndb_mgm_get_string_parameter(p, CFG_DB_DD_UNDOFILE_PATH, &undopath);
251  if (undopath)
252  {
258  BaseString path;
259  add_path(path, undopath);
260  do_mkdir(path.c_str());
261  add_path(path, tmp.c_str());
262  do_mkdir(path.c_str());
263 
264  if (!validate_path(m_base_path[FsOpenReq::BP_DD_UF], path.c_str()))
265  {
266  ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
267  m_base_path[FsOpenReq::BP_DD_UF].c_str(),
268  "FileSystemPathUndoFiles");
269  }
270  }
271  }
272 
273  m_maxFiles = 0;
274  ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
275  Uint32 noIdleFiles = 27;
276 
277  ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
278  // Make sure at least "noIdleFiles" files can be created
279  if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
280  m_maxFiles = noIdleFiles;
281 
282  // Create idle AsyncFiles
283  for (Uint32 i = 0; i < noIdleFiles; i++)
284  {
285  theIdleFiles.push_back(createAsyncFile());
286  AsyncIoThread * thr = createIoThread(/* bound */ true);
287  if (thr)
288  {
289  theThreads.push_back(thr);
290  }
291  }
292 
293  Uint32 threadpool = 2;
294  ndb_mgm_get_int_parameter(p, CFG_DB_THREAD_POOL, &threadpool);
295 
296  // Create IoThreads
297  for (Uint32 i = 0; i < threadpool; i++)
298  {
299  AsyncIoThread * thr = createIoThread(/* bound */ false);
300  if (thr)
301  {
302  jam();
303  theThreads.push_back(thr);
304  }
305  else
306  {
307  jam();
308  break;
309  }
310  }
311 
312  setup_wakeup();
313 
314  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
315  conf->senderRef = reference();
316  conf->senderData = senderData;
317  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
318  ReadConfigConf::SignalLength, JBB);
319 
320  // start scanning
321  signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
322  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
323 }
324 
325 /* Received a restart signal.
326  * Answer it like any other block
327  * PR0 : StartCase
328  * DR0 : StartPhase
329  * DR1 : ?
330  * DR2 : ?
331  * DR3 : ?
332  * DR4 : ?
333  * DR5 : SignalKey
334  */
335 void
336 Ndbfs::execSTTOR(Signal* signal)
337 {
338  jamEntry();
339 
340  if(signal->theData[1] == 0){ // StartPhase 0
341  jam();
342 
343  do_mkdir(m_base_path[FsOpenReq::BP_FS].c_str());
344 
345  // close all open files
346  ndbrequire(theOpenFiles.size() == 0);
347 
348  signal->theData[3] = 255;
349  sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);
350  return;
351  }
352  ndbrequire(0);
353 }
354 
355 int
356 Ndbfs::forward( AsyncFile * file, Request* request)
357 {
358  jam();
359  AsyncIoThread* thr = file->getThread();
360  if (thr) // bound
361  {
362  thr->dispatch(request);
363  }
364  else if (request->m_do_bind)
365  {
366  theToBoundThreads.writeChannel(request);
367  }
368  else
369  {
370  theToUnboundThreads.writeChannel(request);
371  }
372  return 1;
373 }
374 
375 void
376 Ndbfs::execFSOPENREQ(Signal* signal)
377 {
378  jamEntry();
379  const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
380  const BlockReference userRef = fsOpenReq->userReference;
381 
382  bool bound = (fsOpenReq->fileFlags & FsOpenReq::OM_THREAD_POOL) == 0;
383  AsyncFile* file = getIdleFile(bound);
384  ndbrequire(file != NULL);
385 
386  Uint32 userPointer = fsOpenReq->userPointer;
387 
388 
389  SectionHandle handle(this, signal);
390  SegmentedSectionPtr ptr; ptr.setNull();
391  if (handle.m_cnt)
392  {
393  jam();
394  handle.getSection(ptr, FsOpenReq::FILENAME);
395  }
396  file->theFileName.set(this, userRef, fsOpenReq->fileNumber, false, ptr);
397  releaseSections(handle);
398 
399  if (fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
400  {
401  jam();
402  Uint32 cnt = 16; // 512k
403  Ptr<GlobalPage> page_ptr;
404  m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &page_ptr.i, &cnt, 1);
405  if(cnt == 0)
406  {
407  file->m_page_ptr.setNull();
408  file->m_page_cnt = 0;
409 
410  FsRef * const fsRef = (FsRef *)&signal->theData[0];
411  fsRef->userPointer = userPointer;
412  fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
413  fsRef->osErrorCode = ~0; // Indicate local error
414  sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
415  return;
416  }
417  m_shared_page_pool.getPtr(page_ptr);
418  file->set_buffer(RT_DBTUP_PAGE, page_ptr, cnt);
419  }
420  else if (fsOpenReq->fileFlags & FsOpenReq::OM_WRITE_BUFFER)
421  {
422  jam();
423  Uint32 cnt = NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE; // 256k
424  Ptr<GlobalPage> page_ptr;
425  m_ctx.m_mm.alloc_pages(RT_FILE_BUFFER, &page_ptr.i, &cnt, 1);
426  if (cnt == 0)
427  {
428  jam();
429  file->m_page_ptr.setNull();
430  file->m_page_cnt = 0;
431 
432  FsRef * const fsRef = (FsRef *)&signal->theData[0];
433  fsRef->userPointer = userPointer;
434  fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
435  fsRef->osErrorCode = ~0; // Indicate local error
436  sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
437  return;
438  }
439  m_shared_page_pool.getPtr(page_ptr);
440  file->set_buffer(RT_FILE_BUFFER, page_ptr, cnt);
441  }
442  else
443  {
444  ndbassert(file->m_page_ptr.isNull());
445  file->m_page_ptr.setNull();
446  file->m_page_cnt = 0;
447  }
448 
449  if (getenv("NDB_TRACE_OPEN"))
450  ndbout_c("open(%s) bound: %u", file->theFileName.c_str(), bound);
451 
452  Request* request = theRequestPool->get();
453  request->action = Request::open;
454  request->error = 0;
455  request->set(userRef, userPointer, newId() );
456  request->file = file;
457  request->theTrace = signal->getTrace();
458  request->par.open.flags = fsOpenReq->fileFlags;
459  request->par.open.page_size = fsOpenReq->page_size;
460  request->par.open.file_size = fsOpenReq->file_size_hi;
461  request->par.open.file_size <<= 32;
462  request->par.open.file_size |= fsOpenReq->file_size_lo;
463  request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
464  request->m_do_bind = bound;
465 
466  ndbrequire(forward(file, request));
467 }
468 
469 void
470 Ndbfs::execFSREMOVEREQ(Signal* signal)
471 {
472  jamEntry();
473  const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
474  const BlockReference userRef = req->userReference;
475  bool bound = true;
476  AsyncFile* file = getIdleFile(bound);
477  ndbrequire(file != NULL);
478 
479  SectionHandle handle(this, signal);
480  SegmentedSectionPtr ptr; ptr.setNull();
481  if(handle.m_cnt)
482  {
483  jam();
484  handle.getSection(ptr, FsOpenReq::FILENAME);
485  }
486 
487  file->theFileName.set(this, userRef, req->fileNumber, req->directory, ptr);
488  releaseSections(handle);
489 
490  Uint32 version = FsOpenReq::getVersion(req->fileNumber);
491  Uint32 bp = FsOpenReq::v5_getLcpNo(req->fileNumber);
492 
493  Request* request = theRequestPool->get();
494  request->action = Request::rmrf;
495  request->par.rmrf.directory = req->directory;
496  request->par.rmrf.own_directory = req->ownDirectory;
497  request->error = 0;
498  request->set(userRef, req->userPointer, newId() );
499  request->file = file;
500  request->theTrace = signal->getTrace();
501  request->m_do_bind = bound;
502 
503  if (version == 6)
504  {
505  ndbrequire(bp < NDB_ARRAY_SIZE(m_base_path));
506  if (strlen(m_base_path[bp].c_str()) == 0)
507  {
508  goto ignore;
509  }
510  }
511 
512  ndbrequire(forward(file, request));
513  return;
514 ignore:
515  report(request, signal);
516 }
517 
518 /*
519  * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1
520  * remove file
521  */
522 void
523 Ndbfs::execFSCLOSEREQ(Signal * signal)
524 {
525  jamEntry();
526  const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0];
527  const BlockReference userRef = fsCloseReq->userReference;
528  const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;
529  const UintR userPointer = fsCloseReq->userPointer;
530 
531  AsyncFile* openFile = theOpenFiles.find(filePointer);
532  if (openFile == NULL) {
533  // The file was not open, send error back to sender
534  jam();
535  // Initialise FsRef signal
536  FsRef * const fsRef = (FsRef *)&signal->theData[0];
537  fsRef->userPointer = userPointer;
538  fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
539  fsRef->osErrorCode = ~0; // Indicate local error
540  sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);
541 
542  g_eventLogger->warning("Trying to close unknown file!! %u", userPointer);
543  g_eventLogger->warning("Dumping files");
544  signal->theData[0] = 405;
545  execDUMP_STATE_ORD(signal);
546  return;
547  }
548 
549  if (getenv("NDB_TRACE_OPEN"))
550  ndbout_c("close(%s)", openFile->theFileName.c_str());
551 
552  Request *request = theRequestPool->get();
553  if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
554  jam();
555  request->action = Request::closeRemove;
556  } else {
557  jam();
558  request->action = Request::close;
559  }
560  request->set(userRef, fsCloseReq->userPointer, filePointer);
561  request->file = openFile;
562  request->error = 0;
563  request->theTrace = signal->getTrace();
564  request->m_do_bind = false;
565 
566  ndbrequire(forward(openFile, request));
567 }
568 
569 void
570 Ndbfs::readWriteRequest(int action, Signal * signal)
571 {
572  Uint32 theData[25 + 2 * 32];
573  memcpy(theData, signal->theData, 4 * signal->getLength());
574  SectionHandle handle(this, signal);
575  if (handle.m_cnt > 0)
576  {
577  SegmentedSectionPtr secPtr;
578  ndbrequire(handle.getSection(secPtr, 0));
579  ndbrequire(signal->getLength() + secPtr.sz < NDB_ARRAY_SIZE(theData));
580  copy(theData + signal->getLength(), secPtr);
581  releaseSections(handle);
582  }
583 
584  const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)theData;
585  Uint16 filePointer = (Uint16)fsRWReq->filePointer;
586  const UintR userPointer = fsRWReq->userPointer;
587  const BlockReference userRef = fsRWReq->userReference;
588  const BlockNumber blockNumber = refToMain(userRef);
589  const Uint32 instanceNumber = refToInstance(userRef);
590 
591  AsyncFile* openFile = theOpenFiles.find(filePointer);
592 
593  const NewVARIABLE *myBaseAddrRef =
594  &getBat(blockNumber, instanceNumber)[fsRWReq->varIndex];
595  UintPtr tPageSize;
596  UintPtr tClusterSize;
597  UintPtr tNRR;
598  UintPtr tPageOffset;
599  char* tWA;
600  FsRef::NdbfsErrorCodeType errorCode;
601 
602  Request *request = theRequestPool->get();
603  request->error = 0;
604  request->set(userRef, userPointer, filePointer);
605  request->file = openFile;
606  request->action = (Request::Action) action;
607  request->theTrace = signal->getTrace();
608  request->m_do_bind = false;
609 
610  Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
611 
612  if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed
613  jam();
614  errorCode = FsRef::fsErrInvalidParameters;
615  goto error;
616  }
617 
618  if(format != FsReadWriteReq::fsFormatGlobalPage &&
619  format != FsReadWriteReq::fsFormatSharedPage)
620  {
621  if (fsRWReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
622  jam();// Ensure that a valid variable is used
623  errorCode = FsRef::fsErrInvalidParameters;
624  goto error;
625  }
626  if (myBaseAddrRef == NULL) {
627  jam(); // Ensure that a valid variable is used
628  errorCode = FsRef::fsErrInvalidParameters;
629  goto error;
630  }
631  if (openFile == NULL) {
632  jam(); //file not open
633  errorCode = FsRef::fsErrFileDoesNotExist;
634  goto error;
635  }
636  tPageSize = pageSize(myBaseAddrRef);
637  tClusterSize = myBaseAddrRef->ClusterSize;
638  tNRR = myBaseAddrRef->nrr;
639  tWA = (char*)myBaseAddrRef->WA;
640 
641  switch (format) {
642 
643  // List of memory and file pages pairs
644  case FsReadWriteReq::fsFormatListOfPairs: {
645  jam();
646  for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
647  jam();
648  const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex;
649  const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset;
650  if (varIndex >= tNRR) {
651  jam();
652  errorCode = FsRef::fsErrInvalidParameters;
653  goto error;
654  }//if
655  request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
656  request->par.readWrite.pages[i].size = tPageSize;
657  request->par.readWrite.pages[i].offset = (off_t)(fileOffset*tPageSize);
658  }//for
659  request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
660  break;
661  }//case
662 
663  // Range of memory page with one file page
664  case FsReadWriteReq::fsFormatArrayOfPages: {
665  if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {
666  jam();
667  errorCode = FsRef::fsErrInvalidParameters;
668  goto error;
669  }//if
670  const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex;
671  const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset;
672 
673  request->par.readWrite.pages[0].offset = (off_t)(fileOffset * tPageSize);
674  request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;
675  request->par.readWrite.numberOfPages = 1;
676  request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];
677  break;
678  }//case
679 
680  // List of memory pages followed by one file page
681  case FsReadWriteReq::fsFormatListOfMemPages: {
682 
683  tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];
684  tPageOffset *= tPageSize;
685 
686  for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
687  jam();
688  UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i];
689 
690  if (varIndex >= tNRR) {
691  jam();
692  errorCode = FsRef::fsErrInvalidParameters;
693  goto error;
694  }//if
695  request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
696  request->par.readWrite.pages[i].size = tPageSize;
697  request->par.readWrite.pages[i].offset = (off_t)
698  (tPageOffset + (i*tPageSize));
699  }//for
700  request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
701  break;
702  // make it a writev or readv
703  }//case
704 
705  default: {
706  jam();
707  errorCode = FsRef::fsErrInvalidParameters;
708  goto error;
709  }//default
710  }//switch
711  }
712  else if (format == FsReadWriteReq::fsFormatGlobalPage)
713  {
714  Ptr<GlobalPage> ptr;
715  m_global_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
716  request->par.readWrite.pages[0].buf = (char*)ptr.p;
717  request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
718  request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
719  request->par.readWrite.numberOfPages = 1;
720  }
721  else
722  {
723  ndbrequire(format == FsReadWriteReq::fsFormatSharedPage);
724  Ptr<GlobalPage> ptr;
725  m_shared_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
726  request->par.readWrite.pages[0].buf = (char*)ptr.p;
727  request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
728  request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
729  request->par.readWrite.numberOfPages = 1;
730  }
731 
732  ndbrequire(forward(openFile, request));
733  return;
734 
735 error:
736  theRequestPool->put(request);
737  FsRef * const fsRef = (FsRef *)&signal->theData[0];
738  fsRef->userPointer = userPointer;
739  fsRef->setErrorCode(fsRef->errorCode, errorCode);
740  fsRef->osErrorCode = ~0; // Indicate local error
741  switch (action) {
742  case Request:: write:
743  case Request:: writeSync: {
744  jam();
745  sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);
746  break;
747  }//case
748  case Request:: readPartial:
749  case Request:: read: {
750  jam();
751  sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
752  }//case
753  }//switch
754  return;
755 }
756 
757 /*
758  PR0: File Pointer , theData[0]
759  DR0: User reference, theData[1]
760  DR1: User Pointer, etc.
761  DR2: Flag
762  DR3: Var number
763  DR4: amount of pages
764  DR5->: Memory Page id and File page id according to Flag
765 */
766 void
767 Ndbfs::execFSWRITEREQ(Signal* signal)
768 {
769  jamEntry();
770  const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0];
771 
772  if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){
773  jam();
774  readWriteRequest( Request::writeSync, signal );
775  } else {
776  jam();
777  readWriteRequest( Request::write, signal );
778  }
779 }
780 
781 /*
782  PR0: File Pointer
783  DR0: User reference
784  DR1: User Pointer
785  DR2: Flag
786  DR3: Var number
787  DR4: amount of pages
788  DR5->: Memory Page id and File page id according to Flag
789 */
790 void
791 Ndbfs::execFSREADREQ(Signal* signal)
792 {
793  jamEntry();
794  FsReadWriteReq * req = (FsReadWriteReq *)signal->getDataPtr();
795  if (FsReadWriteReq::getPartialReadFlag(req->operationFlag))
796  readWriteRequest( Request::readPartial, signal );
797  else
798  readWriteRequest( Request::read, signal );
799 }
800 
801 /*
802  * PR0: File Pointer DR0: User reference DR1: User Pointer
803  */
804 void
805 Ndbfs::execFSSYNCREQ(Signal * signal)
806 {
807  jamEntry();
808  Uint16 filePointer = (Uint16)signal->theData[0];
809  BlockReference userRef = signal->theData[1];
810  const UintR userPointer = signal->theData[2];
811  AsyncFile* openFile = theOpenFiles.find(filePointer);
812 
813  if (openFile == NULL) {
814  jam(); //file not open
815  FsRef * const fsRef = (FsRef *)&signal->theData[0];
816  fsRef->userPointer = userPointer;
817  fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
818  fsRef->osErrorCode = ~0; // Indicate local error
819  sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);
820  return;
821  }
822 
823  Request *request = theRequestPool->get();
824  request->error = 0;
825  request->action = Request::sync;
826  request->set(userRef, userPointer, filePointer);
827  request->file = openFile;
828  request->theTrace = signal->getTrace();
829  request->m_do_bind = false;
830 
831  ndbrequire(forward(openFile,request));
832 }
833 
834 /*
835  * PR0: File Pointer DR0: User reference DR1: User Pointer
836  */
837 void
838 Ndbfs::execFSSUSPENDORD(Signal * signal)
839 {
840  jamEntry();
841  Uint16 filePointer = (Uint16)signal->theData[0];
842  Uint32 millis = signal->theData[1];
843  AsyncFile* openFile = theOpenFiles.find(filePointer);
844 
845  if (openFile == NULL)
846  {
847  jam(); //file not open
848  return;
849  }
850 
851  Request *request = theRequestPool->get();
852  request->error = 0;
853  request->action = Request::suspend;
854  request->set(0, 0, filePointer);
855  request->file = openFile;
856  request->theTrace = signal->getTrace();
857  request->par.suspend.milliseconds = millis;
858  request->m_do_bind = false;
859 
860  ndbrequire(forward(openFile,request));
861 }
862 
863 void
864 Ndbfs::execFSAPPENDREQ(Signal * signal)
865 {
866  const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];
867  const Uint16 filePointer = (Uint16)fsReq->filePointer;
868  const UintR userPointer = fsReq->userPointer;
869  const BlockReference userRef = fsReq->userReference;
870  const BlockNumber blockNumber = refToMain(userRef);
871  const Uint32 instanceNumber = refToInstance(userRef);
872 
873  FsRef::NdbfsErrorCodeType errorCode;
874 
875  AsyncFile* openFile = theOpenFiles.find(filePointer);
876  const NewVARIABLE *myBaseAddrRef =
877  &getBat(blockNumber, instanceNumber)[fsReq->varIndex];
878 
879  const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA;
880  const Uint32 tSz = myBaseAddrRef->nrr;
881  const Uint32 offset = fsReq->offset;
882  const Uint32 size = fsReq->size;
883  const Uint32 synch_flag = fsReq->synch_flag;
884  Request *request = theRequestPool->get();
885 
886  if (openFile == NULL) {
887  jam();
888  errorCode = FsRef::fsErrFileDoesNotExist;
889  goto error;
890  }
891 
892  if (myBaseAddrRef == NULL) {
893  jam(); // Ensure that a valid variable is used
894  errorCode = FsRef::fsErrInvalidParameters;
895  goto error;
896  }
897 
898  if (fsReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
899  jam();// Ensure that a valid variable is used
900  errorCode = FsRef::fsErrInvalidParameters;
901  goto error;
902  }
903 
904  if(offset + size > tSz){
905  jam(); // Ensure that a valid variable is used
906  errorCode = FsRef::fsErrInvalidParameters;
907  goto error;
908  }
909 
910  request->error = 0;
911  request->set(userRef, userPointer, filePointer);
912  request->file = openFile;
913  request->theTrace = signal->getTrace();
914 
915  request->par.append.buf = (const char *)(tWA + offset);
916  request->par.append.size = size << 2;
917 
918  if (!synch_flag)
919  request->action = Request::append;
920  else
921  request->action = Request::append_synch;
922  request->m_do_bind = false;
923  ndbrequire(forward(openFile, request));
924  return;
925 
926 error:
927  jam();
928  theRequestPool->put(request);
929  FsRef * const fsRef = (FsRef *)&signal->theData[0];
930  fsRef->userPointer = userPointer;
931  fsRef->setErrorCode(fsRef->errorCode, errorCode);
932  fsRef->osErrorCode = ~0; // Indicate local error
933 
934  jam();
935  sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB);
936  return;
937 }
938 
939 void
940 Ndbfs::execALLOC_MEM_REQ(Signal* signal)
941 {
942  jamEntry();
943 
944  AllocMemReq* req = (AllocMemReq*)signal->getDataPtr();
945 
946  bool bound = true;
947  AsyncFile* file = getIdleFile(bound);
948  ndbrequire(file != NULL);
949 
950  Request *request = theRequestPool->get();
951 
952  request->error = 0;
953  request->set(req->senderRef, req->senderData, 0);
954  request->file = file;
955  request->theTrace = signal->getTrace();
956 
957  request->par.alloc.ctx = &m_ctx;
958  request->par.alloc.requestInfo = req->requestInfo;
959  request->par.alloc.bytes = (Uint64(req->bytes_hi) << 32) + req->bytes_lo;
960  request->action = Request::allocmem;
961  request->m_do_bind = bound;
962  ndbrequire(forward(file, request));
963 }
964 
965 #include <signaldata/BuildIndxImpl.hpp>
966 
967 void
968 Ndbfs::execBUILD_INDX_IMPL_REQ(Signal* signal)
969 {
970  jamEntry();
971  mt_BuildIndxReq * req = (mt_BuildIndxReq*)signal->getDataPtr();
972 
973  bool bound = true;
974  AsyncFile* file = getIdleFile(bound);
975  ndbrequire(file != NULL);
976 
977  Request *request = theRequestPool->get();
978  request->error = 0;
979  request->set(req->senderRef, req->senderData, 0);
980  request->file = file;
981  request->theTrace = signal->getTrace();
982 
983  Uint32 cnt = (req->buffer_size + 32768 - 1) / 32768;
984  Uint32 save = cnt;
985  Ptr<GlobalPage> page_ptr;
986  m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &page_ptr.i, &cnt, cnt);
987  if(cnt == 0)
988  {
989  file->m_page_ptr.setNull();
990  file->m_page_cnt = 0;
991 
992  ndbrequire(false); // TODO
993  return;
994  }
995 
996  ndbrequire(cnt == save);
997 
998  m_shared_page_pool.getPtr(page_ptr);
999  file->set_buffer(RT_DBTUP_PAGE, page_ptr, cnt);
1000 
1001  memcpy(&request->par.build.m_req, req, sizeof(* req));
1002  request->action = Request::buildindx;
1003  request->m_do_bind = bound;
1004  ndbrequire(forward(file, request));
1005 }
1006 
1007 Uint16
1008 Ndbfs::newId()
1009 {
1010  // finds a new key, eg a new filepointer
1011  for (int i = 1; i < SHRT_MAX; i++)
1012  {
1013  if (theLastId == SHRT_MAX) {
1014  jam();
1015  theLastId = 1;
1016  } else {
1017  jam();
1018  theLastId++;
1019  }
1020 
1021  if(theOpenFiles.find(theLastId) == NULL) {
1022  jam();
1023  return theLastId;
1024  }
1025  }
1026  ndbrequire(1 == 0);
1027  // The program will not reach this point
1028  return 0;
1029 }
1030 
1031 AsyncFile*
1032 Ndbfs::createAsyncFile()
1033 {
1034  // Check limit of open files
1035  if (m_maxFiles !=0 && theFiles.size() == m_maxFiles)
1036  {
1037  // Print info about all open files
1038  for (unsigned i = 0; i < theFiles.size(); i++){
1039  AsyncFile* file = theFiles[i];
1040  ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
1041  }
1042  ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
1043  }
1044 
1045 #ifdef NDB_WIN
1046  AsyncFile* file = new Win32AsyncFile(* this);
1047 #else
1048  AsyncFile* file = new PosixAsyncFile(* this);
1049 #endif
1050 
1051  if (file->init())
1052  {
1053  ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
1054  }
1055 
1056  theFiles.push_back(file);
1057  return file;
1058 }
1059 
1060 void
1061 Ndbfs::pushIdleFile(AsyncFile* file)
1062 {
1063  assert(file->getThread() == 0);
1064  theIdleFiles.push_back(file);
1065 }
1066 
1068 Ndbfs::createIoThread(bool bound)
1069 {
1070  AsyncIoThread* thr = new AsyncIoThread(*this, bound);
1071  if (thr)
1072  {
1073 #ifdef VM_TRACE
1074  ndbout_c("NDBFS: Created new file thread %d", theThreads.size());
1075 #endif
1076 
1077  struct NdbThread* thrptr = thr->doStart();
1078  globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
1079 
1080  if (bound)
1081  m_bound_threads_cnt++;
1082  else
1083  m_unbounds_threads_cnt++;
1084  }
1085 
1086  return thr;
1087 }
1088 
1089 AsyncFile*
1090 Ndbfs::getIdleFile(bool bound)
1091 {
1092  AsyncFile* file = 0;
1093  Uint32 sz = theIdleFiles.size();
1094  if (sz)
1095  {
1096  file = theIdleFiles[sz - 1];
1097  theIdleFiles.erase(sz - 1);
1098  }
1099  else
1100  {
1101  file = createAsyncFile();
1102  }
1103 
1104  if (bound)
1105  {
1109  if (m_active_bound_threads_cnt == m_bound_threads_cnt)
1110  {
1111  AsyncIoThread * thr = createIoThread(true);
1112  if (thr)
1113  {
1114  theThreads.push_back(thr);
1115  }
1116  }
1117  }
1118  return file;
1119 }
1120 
1121 void
1122 Ndbfs::cnt_active_bound(int val)
1123 {
1124  Guard g(&g_active_bound_threads_mutex);
1125  if (val < 0)
1126  {
1127  val = -val;
1128  assert(m_active_bound_threads_cnt >= (Uint32)val);
1129  m_active_bound_threads_cnt -= val;
1130  }
1131  else
1132  {
1133  m_active_bound_threads_cnt += val;
1134  }
1135 }
1136 
1137 void
1138 Ndbfs::report(Request * request, Signal* signal)
1139 {
1140  const Uint32 orgTrace = signal->getTrace();
1141  signal->setTrace(request->theTrace);
1142  const BlockReference ref = request->theUserReference;
1143 
1144  if (request->file->has_buffer())
1145  {
1146  if ((request->action == Request::open && request->error) ||
1147  request->action == Request::close ||
1148  request->action == Request::closeRemove ||
1149  request->action == Request::buildindx)
1150  {
1151  Uint32 rg;
1152  Uint32 cnt;
1153  Ptr<GlobalPage> ptr;
1154  request->file->clear_buffer(rg, ptr, cnt);
1155  m_ctx.m_mm.release_pages(rg, ptr.i, cnt);
1156  }
1157  }
1158 
1159  if (request->error) {
1160  jam();
1161  // Initialise FsRef signal
1162  FsRef * const fsRef = (FsRef *)&signal->theData[0];
1163  fsRef->userPointer = request->theUserPointer;
1164  if(request->error & FsRef::FS_ERR_BIT)
1165  {
1166  fsRef->errorCode = request->error;
1167  fsRef->osErrorCode = 0;
1168  }
1169  else
1170  {
1171  fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error));
1172  fsRef->osErrorCode = request->error;
1173  }
1174  switch (request->action) {
1175  case Request:: open: {
1176  jam();
1177  // Put the file back in idle files list
1178  pushIdleFile(request->file);
1179  sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
1180  break;
1181  }
1182  case Request:: closeRemove:
1183  case Request:: close: {
1184  jam();
1185  sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB);
1186 
1187  g_eventLogger->warning("Error closing file: %s %u/%u",
1188  request->file->theFileName.c_str(),
1189  fsRef->errorCode,
1190  fsRef->osErrorCode);
1191  g_eventLogger->warning("Dumping files");
1192  signal->theData[0] = 405;
1193  execDUMP_STATE_ORD(signal);
1194  break;
1195  }
1196  case Request:: writeSync:
1197  case Request:: writevSync:
1198  case Request:: write:
1199  case Request:: writev: {
1200  jam();
1201  sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB);
1202  break;
1203  }
1204  case Request:: read:
1205  case Request:: readPartial:
1206  case Request:: readv: {
1207  jam();
1208  sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB);
1209  break;
1210  }
1211  case Request:: sync: {
1212  jam();
1213  sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
1214  break;
1215  }
1216  case Request::append:
1217  case Request::append_synch:
1218  {
1219  jam();
1220  sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
1221  break;
1222  }
1223  case Request::rmrf: {
1224  jam();
1225  // Put the file back in idle files list
1226  pushIdleFile(request->file);
1227  sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
1228  break;
1229  }
1230 
1231  case Request:: end: {
1232  case Request:: suspend:
1233  // Report nothing
1234  break;
1235  }
1236  case Request::allocmem: {
1237  jam();
1238  AllocMemRef* rep = (AllocMemRef*)signal->getDataPtrSend();
1239  rep->senderRef = reference();
1240  rep->senderData = request->theUserPointer;
1241  rep->errorCode = request->error;
1242  sendSignal(ref, GSN_ALLOC_MEM_REF, signal,
1243  AllocMemRef::SignalLength, JBB);
1244  pushIdleFile(request->file);
1245  break;
1246  }
1247  case Request::buildindx: {
1248  jam();
1249  BuildIndxImplRef* rep = (BuildIndxImplRef*)signal->getDataPtrSend();
1250  rep->senderRef = reference();
1251  rep->senderData = request->theUserPointer;
1252  rep->errorCode = (BuildIndxImplRef::ErrorCode)request->error;
1253  sendSignal(ref, GSN_BUILD_INDX_IMPL_REF, signal,
1254  BuildIndxImplRef::SignalLength, JBB);
1255  pushIdleFile(request->file);
1256  break;
1257  }
1258  }//switch
1259  } else {
1260  jam();
1261  FsConf * const fsConf = (FsConf *)&signal->theData[0];
1262  fsConf->userPointer = request->theUserPointer;
1263  switch (request->action) {
1264  case Request:: open: {
1265  jam();
1266  theOpenFiles.insert(request->file, request->theFilePointer);
1267 
1268  // Keep track on max number of opened files
1269  if (theOpenFiles.size() > m_maxOpenedFiles)
1270  m_maxOpenedFiles = theOpenFiles.size();
1271 
1272  fsConf->filePointer = request->theFilePointer;
1273  sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBA);
1274  break;
1275  }
1276  case Request:: closeRemove:
1277  case Request:: close: {
1278  jam();
1279  // removes the file from OpenFiles list
1280  theOpenFiles.erase(request->theFilePointer);
1281  // Put the file in idle files list
1282  pushIdleFile(request->file);
1283  sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBA);
1284  break;
1285  }
1286  case Request:: writeSync:
1287  case Request:: writevSync:
1288  case Request:: write:
1289  case Request:: writev: {
1290  jam();
1291  sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBA);
1292  break;
1293  }
1294  case Request:: read:
1295  case Request:: readv: {
1296  jam();
1297  sendSignal(ref, GSN_FSREADCONF, signal, 1, JBA);
1298  break;
1299  }
1300  case Request:: readPartial: {
1301  jam();
1302  fsConf->bytes_read = Uint32(request->par.readWrite.pages[0].size);
1303  sendSignal(ref, GSN_FSREADCONF, signal, 2, JBA);
1304  break;
1305  }
1306  case Request:: sync: {
1307  jam();
1308  sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBA);
1309  break;
1310  }//case
1311  case Request::append:
1312  case Request::append_synch:
1313  {
1314  jam();
1315  signal->theData[1] = Uint32(request->par.append.size);
1316  sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBA);
1317  break;
1318  }
1319  case Request::rmrf: {
1320  jam();
1321  // Put the file in idle files list
1322  pushIdleFile(request->file);
1323  sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBA);
1324  break;
1325  }
1326  case Request:: end: {
1327  case Request:: suspend:
1328  // Report nothing
1329  break;
1330  }
1331  case Request::allocmem: {
1332  jam();
1333  AllocMemConf* conf = (AllocMemConf*)signal->getDataPtrSend();
1334  conf->senderRef = reference();
1335  conf->senderData = request->theUserPointer;
1336  conf->bytes_hi = Uint32(request->par.alloc.bytes >> 32);
1337  conf->bytes_lo = Uint32(request->par.alloc.bytes);
1338  sendSignal(ref, GSN_ALLOC_MEM_CONF, signal,
1339  AllocMemConf::SignalLength, JBB);
1340  pushIdleFile(request->file);
1341  break;
1342  }
1343  case Request::buildindx: {
1344  jam();
1345  BuildIndxImplConf* rep = (BuildIndxImplConf*)signal->getDataPtrSend();
1346  rep->senderRef = reference();
1347  rep->senderData = request->theUserPointer;
1348  sendSignal(ref, GSN_BUILD_INDX_IMPL_CONF, signal,
1349  BuildIndxImplConf::SignalLength, JBB);
1350  pushIdleFile(request->file);
1351  break;
1352  }
1353  }
1354  }//if
1355  signal->setTrace(orgTrace);
1356 }
1357 
1358 
1359 bool
1360 Ndbfs::scanIPC(Signal* signal)
1361 {
1362  Request* request = theFromThreads.tryReadChannel();
1363  jam();
1364  if (request) {
1365  jam();
1366  report(request, signal);
1367  theRequestPool->put(request);
1368  return true;
1369  }
1370  return false;
1371 }
1372 
1373 #if defined NDB_WIN32
1374 Uint32 Ndbfs::translateErrno(int aErrno)
1375 {
1376  switch (aErrno)
1377  {
1378  //permission denied
1379  case ERROR_ACCESS_DENIED:
1380 
1381  return FsRef::fsErrPermissionDenied;
1382  //temporary not accessible
1383  case ERROR_PATH_BUSY:
1384  case ERROR_NO_MORE_SEARCH_HANDLES:
1385 
1386  return FsRef::fsErrTemporaryNotAccessible;
1387  //no space left on device
1388  case ERROR_HANDLE_DISK_FULL:
1389  case ERROR_DISK_FULL:
1390 
1391  return FsRef::fsErrNoSpaceLeftOnDevice;
1392  //none valid parameters
1393  case ERROR_INVALID_HANDLE:
1394  case ERROR_INVALID_DRIVE:
1395  case ERROR_INVALID_ACCESS:
1396  case ERROR_HANDLE_EOF:
1397  case ERROR_BUFFER_OVERFLOW:
1398 
1399  return FsRef::fsErrInvalidParameters;
1400  //environment error
1401  case ERROR_CRC:
1402  case ERROR_ARENA_TRASHED:
1403  case ERROR_BAD_ENVIRONMENT:
1404  case ERROR_INVALID_BLOCK:
1405  case ERROR_WRITE_FAULT:
1406  case ERROR_READ_FAULT:
1407  case ERROR_OPEN_FAILED:
1408 
1409  return FsRef::fsErrEnvironmentError;
1410 
1411  //no more process resources
1412  case ERROR_TOO_MANY_OPEN_FILES:
1413  case ERROR_NOT_ENOUGH_MEMORY:
1414  case ERROR_OUTOFMEMORY:
1415  return FsRef::fsErrNoMoreResources;
1416  //no file
1417  case ERROR_FILE_NOT_FOUND:
1418  return FsRef::fsErrFileDoesNotExist;
1419 
1420  case ERR_ReadUnderflow:
1421  return FsRef::fsErrReadUnderflow;
1422 
1423  default:
1424  return FsRef::fsErrUnknown;
1425  }
1426 }
1427 #else
1428 Uint32 Ndbfs::translateErrno(int aErrno)
1429 {
1430  switch (aErrno)
1431  {
1432  //permission denied
1433  case EACCES:
1434  case EROFS:
1435  case ENXIO:
1436  return FsRef::fsErrPermissionDenied;
1437  //temporary not accessible
1438  case EAGAIN:
1439  case ETIMEDOUT:
1440  case ENOLCK:
1441  case EINTR:
1442  case EIO:
1443  return FsRef::fsErrTemporaryNotAccessible;
1444  //no space left on device
1445  case ENFILE:
1446  case EDQUOT:
1447 #ifdef ENOSR
1448  case ENOSR:
1449 #endif
1450  case ENOSPC:
1451  case EFBIG:
1452  return FsRef::fsErrNoSpaceLeftOnDevice;
1453  //none valid parameters
1454  case EINVAL:
1455  case EBADF:
1456  case ENAMETOOLONG:
1457  case EFAULT:
1458  case EISDIR:
1459  case ENOTDIR:
1460  case EEXIST:
1461  case ETXTBSY:
1462  return FsRef::fsErrInvalidParameters;
1463  //environment error
1464  case ELOOP:
1465 #ifdef ENOLINK
1466  case ENOLINK:
1467 #endif
1468 #ifdef EMULTIHOP
1469  case EMULTIHOP:
1470 #endif
1471 #ifdef EOPNOTSUPP
1472  case EOPNOTSUPP:
1473 #endif
1474 #ifdef ESPIPE
1475  case ESPIPE:
1476 #endif
1477  case EPIPE:
1478  return FsRef::fsErrEnvironmentError;
1479 
1480  //no more process resources
1481  case EMFILE:
1482  case ENOMEM:
1483  return FsRef::fsErrNoMoreResources;
1484  //no file
1485  case ENOENT:
1486  return FsRef::fsErrFileDoesNotExist;
1487 
1488  case ERR_ReadUnderflow:
1489  return FsRef::fsErrReadUnderflow;
1490 
1491  default:
1492  return FsRef::fsErrUnknown;
1493  }
1494 }
1495 #endif
1496 
1497 
1498 
1499 void
1500 Ndbfs::execCONTINUEB(Signal* signal)
1501 {
1502  jamEntry();
1503  if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) {
1504  jam();
1505 
1506  // Also send CONTINUEB to ourself in order to scan for
1507  // incoming answers from AsyncFile on MemoryChannel theFromThreads
1508  signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
1509  sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
1510  if (scanningInProgress == true) {
1511  jam();
1512  return;
1513  }
1514  }
1515  if (scanIPC(signal)) {
1516  jam();
1517  scanningInProgress = true;
1518  signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1519  sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1520  } else {
1521  jam();
1522  scanningInProgress = false;
1523  }
1524  return;
1525 }
1526 
1527 void
1528 Ndbfs::execSEND_PACKED(Signal* signal)
1529 {
1530  jamEntry();
1531  if (scanningInProgress == false && scanIPC(signal))
1532  {
1533  jam();
1534  scanningInProgress = true;
1535  signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1536  sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1537  }
1538 }
1539 
1540 void
1541 Ndbfs::execDUMP_STATE_ORD(Signal* signal)
1542 {
1543  if(signal->theData[0] == 19){
1544  return;
1545  }
1546  if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){
1547  infoEvent("NDBFS: Files: %d Open files: %d",
1548  theFiles.size(),
1549  theOpenFiles.size());
1550  infoEvent(" Idle files: %u Max opened files: %d",
1551  theIdleFiles.size(),
1552  m_maxOpenedFiles);
1553  infoEvent(" Bound Threads: %u (active %u) Unbound threads: %u",
1554  m_bound_threads_cnt,
1555  m_active_bound_threads_cnt,
1556  m_unbounds_threads_cnt);
1557  infoEvent(" Max files: %d",
1558  m_maxFiles);
1559  infoEvent(" Requests: %d",
1560  theRequestPool->size());
1561 
1562  return;
1563  }
1564  if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){
1565  infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size());
1566 
1567  for (unsigned i = 0; i < theOpenFiles.size(); i++){
1568  AsyncFile* file = theOpenFiles.getFile(i);
1569  infoEvent("%2d (0x%lx): %s thr: %lx", i,
1570  (long)file,
1571  file->theFileName.c_str(),
1572  (long)file->getThread());
1573  }
1574  return;
1575  }
1576  if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){
1577  infoEvent("NDBFS: Dump all files: %d", theFiles.size());
1578 
1579  for (unsigned i = 0; i < theFiles.size(); i++){
1580  AsyncFile* file = theFiles[i];
1581  infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
1582  }
1583  return;
1584  }
1585  if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
1586  infoEvent("NDBFS: Dump idle files: %u",
1587  theIdleFiles.size());
1588 
1589  for (unsigned i = 0; i < theIdleFiles.size(); i++){
1590  AsyncFile* file = theIdleFiles[i];
1591  infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
1592  }
1593 
1594  return;
1595  }
1596 
1597  if(signal->theData[0] == 404)
1598  {
1599 #if 0
1600  ndbrequire(signal->getLength() == 2);
1601  Uint32 file= signal->theData[1];
1602  AsyncFile* openFile = theOpenFiles.find(file);
1603  ndbrequire(openFile != 0);
1604  ndbout_c("File: %s %p", openFile->theFileName.c_str(), openFile);
1605  Request* curr = openFile->m_current_request;
1606  Request* last = openFile->m_last_request;
1607  if(curr)
1608  ndbout << "Current request: " << *curr << endl;
1609  if(last)
1610  ndbout << "Last request: " << *last << endl;
1611 
1612  ndbout << "theReportTo " << *openFile->theReportTo << endl;
1613  ndbout << "theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl;
1614 
1615  ndbout << "All files: " << endl;
1616  for (unsigned i = 0; i < theFiles.size(); i++){
1617  AsyncFile* file = theFiles[i];
1618  ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
1619  }
1620 #endif
1621  }
1622 
1623  if(signal->theData[0] == 405)
1624  {
1625  for (unsigned i = 0; i < theFiles.size(); i++)
1626  {
1627  AsyncFile* file = theFiles[i];
1628  if (file == 0)
1629  continue;
1630  ndbout_c("%u : %s %s", i,
1631  file->theFileName.c_str() ? file->theFileName.c_str() : "",
1632  file->isOpen() ? "OPEN" : "CLOSED");
1633  }
1634  }
1635 }//Ndbfs::execDUMP_STATE_ORD()
1636 
1637 const char*
1638 Ndbfs::get_filename(Uint32 fd) const
1639 {
1640  jamEntry();
1641  const AsyncFile* openFile = theOpenFiles.find(fd);
1642  if(openFile)
1643  return openFile->theFileName.get_base_name();
1644  return "";
1645 }
1646 
1647 
1648 BLOCK_FUNCTIONS(Ndbfs)
1649 
1650 template class Vector<AsyncFile*>;
1651 template class Vector<AsyncIoThread*>;
1652 template class Vector<OpenFiles::OpenFileItem>;
1653 template class MemoryChannel<Request>;
1654 template class Pool<Request>;
1655 template NdbOut& operator<<(NdbOut&, const MemoryChannel<Request>&);