21 #include <ndb_global.h>
23 #include <kernel_types.h>
25 #include "AsyncFile.hpp"
28 #include "NdbThread.h"
// Defaults and limits for the command-line tunables handled in readArguments().
// Value used for the file count when -n is out of range.
40 #define DEFAULT_NUM_FILES 1
// Largest value accepted for -r.
41 #define MAXREQUESTS 256
// Value used for the request count when -r is out of range.
42 #define DEFAULT_NUM_REQUESTS 1
// Largest value accepted for -s (the usage text describes -s as a number of pages).
43 #define MAXFILESIZE 4096
// Value used for the file size when -s is out of range.
44 #define DEFAULT_FILESIZE 2048
// Stored in FileNameArray[3]; openFile() ORs the file number into it.
45 #define FVERSION 0x01000000
// Paired timing macros. TIMER_START opens a `{` scope and stores the value of
// NdbTick_CurrentMillisecond() in `starttick`; TIMER_PRINT takes a second
// reading, prints `ops`, `str` and the elapsed difference, then prints an
// ops-per-second line and closes the scope with `}`. They must therefore be
// used as a matched pair.
48 #define TIMER_START { Uint64 starttick = NdbTick_CurrentMillisecond()
// NOTE(review): `buf` is written with sprintf but its declaration is not among
// the lines visible here -- confirm it exists and is large enough.
// NOTE(review): `(ops*1000)/totaltime` divides by zero whenever stoptick equals
// starttick (i.e. the two tick readings are identical) -- confirm/guard.
49 #define TIMER_PRINT(str, ops) Uint64 stoptick = NdbTick_CurrentMillisecond();\
50 Uint64 totaltime = (stoptick-starttick); \
51 ndbout << ops << " " << str << \
52 " total time " << (int)totaltime << "ms" << endl;\
54 sprintf(buf, "%d %s/sec\n",(int)((ops*1000)/totaltime), str);\
55 ndbout <<buf << endl;}
// Run-time configuration, overwritten by readArguments().
// Number of files to open/write/read/close (-n).
57 static int numberOfFiles = DEFAULT_NUM_FILES;
// Inner-loop count per file-size step (-r); may be adjusted at the end of readArguments().
58 static int numberOfRequests = DEFAULT_NUM_REQUESTS;
// Upper bound of the `while (s < fileSize)` phases and base of the reverse-write offset (-s).
59 static int fileSize = DEFAULT_FILESIZE;
// When 1, closeFile() issues Request::closeRemove instead of Request::close.
// presumably set by the -remove option; that branch body is not visible here.
60 static int removeFiles = 0;
// When 1 (set by -reverse), writeFile() uses offset (fileSize - pagenum - 1) * PAGESIZE.
61 static int writeFilesReverse = 0;
// Number of times the outer loop in the test driver runs (-l).
62 static int numberOfIterations = 1;
// Four words handed to file->fileName().set() in openFile(); the driver sets
// the first three to 27 and the last to FVERSION.
63 Uint32 FileNameArray[4];
// Channel allocated by the test driver; files are pointed at it with reportTo()
// and the wait code obtains finished requests from it with readChannel().
68 MemoryChannelMultipleWriter<Request>* theReportChannel;
// One page per file used as the source buffer for write requests; filled with
// the pattern (64+i+j)%256 by the test driver.
70 char WritePages[MAXFILES][PAGESIZE];
// One page per file used as the destination buffer for read requests and then
// compared against WritePages.
71 char ReadPages[MAXFILES][PAGESIZE];
// Forward declarations for the helpers defined further down in this file.
// Parses the command line into the static configuration variables above.
73 int readArguments(
int argc,
const char** argv);
// Queues an open request for file number fileNum.
74 int openFile(
int fileNum);
// Queues a close (or close-and-remove) request for file number fileNum.
76 int closeFile(
int fileNum);
// Queues a one-page write of WritePages[fileNum] at page position pagenum.
78 int writeFile(
int fileNum,
int pagenum);
// Same as writeFile() but with action Request::writeSync.
80 int writeSyncFile(
int fileNum,
int pagenum);
// Reads one request from theReportChannel and expects it to be a writeSync.
81 int writeSyncFileWait();
// Queues a one-page read into ReadPages[fileNum] from page position pagenum.
82 int readFile(
int fileNum,
int pagenum);
// Test driver, declared through the NDB_COMMAND macro with the command name,
// the usage text, a description and the value 8192.
// NOTE(review): this excerpt is missing lines (braces and nearly all loop
// bodies), so the comments below describe only the statements that are visible.
86 NDB_COMMAND(aftest,
"aftest",
"aftest [-n <Number of files>] [-r <Number of simultaneous requests>] [-s <Filesize, number of pages>] [-l <Number of iterations>] [-remove, remove files after close] [-reverse, write files in reverse order, start with the last page]",
"Test the AsyncFile class of Ndb", 8192)
// s drives the `while (s < fileSize)` phases; numOps is the count passed to
// TIMER_PRINT. The use of numReq is not visible in this excerpt.
88 int s, numReq, numOps;
// Parse options into the static configuration; the return value is discarded.
90 readArguments(argc, argv);
// Allocate the channel on which completed requests are read back.
94 theReportChannel =
new MemoryChannelMultipleWriter<Request>;
// Print the effective configuration.
96 ndbout <<
"AsyncFileTest starting" << endl;
97 ndbout <<
" " << numberOfFiles <<
" files" << endl;
98 ndbout <<
" " << numberOfRequests <<
" requests" << endl;
99 ndbout <<
" " << fileSize <<
" * 8k files" << endl << endl;
100 ndbout <<
" " << numberOfIterations <<
" iterations" << endl << endl;
// Passes numberOfFiles + 2 to NdbThread_SetConcurrencyLevel.
102 NdbThread_SetConcurrencyLevel(numberOfFiles+2);
// Fill every write page with a deterministic byte pattern that depends on both
// the file index and the byte index, so that read-back can be verified.
105 for (
int i = 0;
i < MAXFILES;
i++) {
106 for (
int j = 0; j < PAGESIZE; j++){
107 WritePages[
i][j] = (64+
i+j)%256;
// Base file name words; openFile() later rewrites element 3 per file.
114 FileNameArray[0] = 27;
115 FileNameArray[1] = 27;
116 FileNameArray[2] = 27;
117 FileNameArray[3] = FVERSION;
// Outer loop: one full open/write/read/writeSync/close cycle per iteration.
119 for (
int l = 0; l < numberOfIterations; l++)
122 ndbout <<
"Opening files" << endl;
// Per-file open loop (body not visible in this excerpt).
124 for (
int f = 0; f < numberOfFiles; f++)
133 ndbout <<
"Files opened!" << endl<< endl;
// Write phase: nested loops over s, requests and files (bodies not visible),
// finished by a TIMER_PRINT labelled "writes".
136 ndbout <<
"Started writing" << endl;
141 while ( s < fileSize)
143 for (
int r = 0; r < numberOfRequests; r++)
145 for (
int f = 0; f < numberOfFiles; f++)
163 TIMER_PRINT(
"writes", numOps);
// Read phase: same loop shape, finished by a TIMER_PRINT labelled "reads".
166 ndbout <<
"Started reading" << endl;
173 while ( s < fileSize)
175 for (
int r = 0; r < numberOfRequests; r++)
177 for (
int f = 0; f < numberOfFiles; f++)
195 TIMER_PRINT(
"reads", numOps);
// Synchronous-write phase: same loop shape, finished by a TIMER_PRINT labelled "writeSync".
197 ndbout <<
"Started writing with sync" << endl;
204 while ( s < fileSize)
206 for (
int r = 0; r < numberOfRequests; r++)
208 for (
int f = 0; f < numberOfFiles; f++)
226 TIMER_PRINT(
"writeSync", numOps);
// Per-file close loop (body not visible in this excerpt).
229 ndbout <<
"Closing files" << endl;
230 for (
int f = 0; f < numberOfFiles; f++)
239 ndbout <<
"Files closed!" << endl<< endl;
// Release the report channel and the request pool.
// NOTE(review): the allocation of theRequestPool is not visible here.
244 delete theReportChannel;
245 delete theRequestPool;
255 file->execute(request);
// Builds the file name for file number fileNum, attaches the file to the
// report channel and forwards an open request for it.
// NOTE(review): the lines that obtain `file`, the condition guarding the
// "Failed to set filename" message, and the return statements are not visible
// in this excerpt.
260 int openFile(
int fileNum)
// Encode the file number into the last name word alongside FVERSION.
264 FileNameArray[3] = fileNum | FVERSION;
265 file->fileName().set( NDBFS_REF, &FileNameArray[0] );
266 ndbout <<
"openFile: " << file->fileName().c_str() << endl;
271 ndbout <<
"Failed to set filename" << endl;
// Completions for this file are delivered on theReportChannel.
274 file->reportTo(theReportChannel);
276 Request* request = theRequestPool->get();
277 request->action= Request::open;
// NOTE(review): the meaning of the open flags value 0x302 is not shown here.
279 request->par.open.flags = 0x302;
// Fixed marker 0x23456789 as the second argument and fileNum as the third;
// presumably these come back as theUserPointer / theFilePointer -- TODO confirm.
280 request->set(NDBFS_REF, 0x23456789, fileNum );
281 request->file =
file;
// If the request cannot be forwarded, report it and hand the request back to the pool.
283 if (!forward(file,request)) {
285 ndbout <<
"Could not forward open request" << endl;
286 theRequestPool->put(request);
// Forwards a close request for file number fileNum.
// NOTE(review): the line that obtains `file`, the `else` between the two
// action assignments, and the return statements are not visible in this excerpt.
292 int closeFile(
int fileNum)
297 Request* request = theRequestPool->get();
// removeFiles == 1 selects Request::closeRemove.
298 if (removeFiles == 1)
299 request->action = Request::closeRemove;
// presumably the else-branch: plain close when removeFiles is not 1.
301 request->action= Request::close;
// Same fixed marker value as in openFile(), with fileNum as the third argument.
304 request->set(NDBFS_REF, 0x23456789, fileNum );
305 request->file =
file;
// If the request cannot be forwarded, report it and hand the request back to the pool.
307 if (!forward(file,request)) {
309 ndbout <<
"Could not forward close request" << endl;
310 theRequestPool->put(request);
// Forwards a one-page write request for file number fileNum at page position
// pagenum. The data always comes from the start of WritePages[fileNum],
// whatever pagenum is.
// NOTE(review): the line that obtains `file`, the `else` between the two
// offset assignments, and the return statements are not visible in this excerpt.
316 int writeFile(
int fileNum,
int pagenum)
// Trace line for this request.
320 ndbout <<
"writeFile" << fileNum <<
": "<<pagenum<<
", " << file->fileName().c_str()<< endl;
322 Request *request = theRequestPool->get();
323 request->action = Request::write;
// pagenum is passed as the second argument and fileNum as the third.
325 request->set(NDBFS_REF, pagenum, fileNum);
326 request->file = openFiles[fileNum];
329 request->par.readWrite.pages[0].buf = &WritePages[fileNum][0];
330 request->par.readWrite.pages[0].size = PAGESIZE;
// With -reverse the page position is mirrored, so page 0 maps to the last page slot.
331 if (writeFilesReverse == 1)
335 request->par.readWrite.pages[0].offset = (fileSize - pagenum - 1) * PAGESIZE;
// presumably the else-branch: forward order.
339 request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
341 request->par.readWrite.numberOfPages = 1;
// If the request cannot be forwarded, report it and hand the request back to the pool.
343 if (!forward(file,request)) {
345 ndbout <<
"Could not forward write request" << endl;
346 theRequestPool->put(request);
// Forwards a one-page Request::writeSync request for file number fileNum at
// page position pagenum, using WritePages[fileNum] as the data.
// NOTE(review): the trace text below says "writeFile", identical to
// writeFile()'s trace, so the two cannot be told apart in the output.
// NOTE(review): unlike writeFile(), the visible lines do not consult
// writeFilesReverse -- confirm this is intentional.
// NOTE(review): the line that obtains `file` and the return statements are
// not visible in this excerpt.
353 int writeSyncFile(
int fileNum,
int pagenum)
357 ndbout <<
"writeFile" << fileNum <<
": "<<pagenum<<
", " << file->fileName().c_str() << endl;
359 Request *request = theRequestPool->get();
360 request->action = Request::writeSync;
// pagenum is passed as the second argument and fileNum as the third.
362 request->set(NDBFS_REF, pagenum, fileNum);
363 request->file = openFiles[fileNum];
366 request->par.readWrite.pages[0].buf = &WritePages[fileNum][0];
367 request->par.readWrite.pages[0].size = PAGESIZE;
368 request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
369 request->par.readWrite.numberOfPages = 1;
// If the request cannot be forwarded, report it and hand the request back to the pool.
371 if (!forward(file,request)) {
373 ndbout <<
"Could not forward write request" << endl;
374 theRequestPool->put(request);
// Forwards a one-page read request for file number fileNum at page position
// pagenum. The data lands at the start of ReadPages[fileNum].
// NOTE(review): the line that obtains `file` and the return statements are
// not visible in this excerpt.
381 int readFile(
int fileNum,
int pagenum)
// Trace line for this request.
385 ndbout <<
"readFile" << fileNum <<
": "<<pagenum<<
", " << file->fileName().c_str() << endl;
387 Request *request = theRequestPool->get();
388 request->action = Request::read;
// pagenum is passed as the second argument and fileNum as the third.
390 request->set(NDBFS_REF, pagenum, fileNum);
391 request->file = openFiles[fileNum];
394 request->par.readWrite.pages[0].buf = &ReadPages[fileNum][0];
395 request->par.readWrite.pages[0].size = PAGESIZE;
396 request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
397 request->par.readWrite.numberOfPages = 1;
// If the request cannot be forwarded, report it and hand the request back to the pool.
399 if (!forward(file,request)) {
401 ndbout <<
"Could not forward read request" << endl;
402 theRequestPool->put(request);
// Fragment: the enclosing function's signature, the update of openedFiles and
// the else/brace lines are not visible in this excerpt (presumably the
// open-completion wait routine -- TODO confirm).
// Loops until openedFiles reaches numberOfFiles, taking one request from
// theReportChannel on each pass.
412 while (openedFiles < numberOfFiles)
414 Request* request = theReportChannel->readChannel();
417 if (request->action == Request::open)
419 if (request->error ==0)
422 ndbout <<
"Opened file " << request->file->fileName().c_str() << endl;
// Remember the file under the index carried in the request.
424 openFiles[request->theFilePointer] = request->file;
428 ndbout <<
"error while opening file" << endl;
// Hand the request object back to the pool.
431 theRequestPool->put(request);
436 ndbout <<
"Unexpected request received" << endl;
441 ndbout <<
"Nothing read from theReportChannel" << endl;
// Fragment: the enclosing function's signature, the update of closedFiles and
// the else/brace lines are not visible in this excerpt (presumably the
// close-completion wait routine -- TODO confirm).
// Loops until closedFiles reaches numberOfFiles, taking one request from
// theReportChannel on each pass.
450 while (closedFiles < numberOfFiles)
452 Request* request = theReportChannel->readChannel();
// Both plain close and close-and-remove completions are accepted.
455 if (request->action == Request::close || request->action == Request::closeRemove)
457 if (request->error ==0)
460 ndbout <<
"Closed file " << request->file->fileName().c_str() << endl;
// Clear the table entry and hand the file object to `files`.
462 openFiles[request->theFilePointer] = NULL;
463 files->put(request->file);
467 ndbout <<
"error while closing file" << endl;
// Hand the request object back to the pool.
470 theRequestPool->put(request);
475 ndbout <<
"Unexpected request received" << endl;
480 ndbout <<
"Nothing read from theReportChannel" << endl;
// Fragment: the enclosing function's signature and the else/brace lines are
// not visible in this excerpt (presumably the write-completion wait routine,
// going by the "writeFileWait" trace text -- TODO confirm).
// Takes one request from theReportChannel and checks that it is a write.
488 Request* request = theReportChannel->readChannel();
491 if (request->action == Request::write)
493 if (request->error == 0)
// Success trace: file index, user pointer and file name.
496 ndbout <<
"writeFileWait"<<request->theFilePointer<<
", " << request->theUserPointer<<
" "<< request->file->fileName().c_str() << endl;
// Failure trace includes the error value.
502 ndbout <<
"error while writing file, error=" << request->error << endl;
// Hand the request object back to the pool.
505 theRequestPool->put(request);
509 ndbout <<
"Unexpected request received" << endl;
514 ndbout <<
"Nothing read from theReportChannel" << endl;
// Takes one request from theReportChannel and checks that it is a
// Request::writeSync completion.
// NOTE(review): the success trace prints "writeFileWait", the same text as the
// plain-write wait code, and the failure trace omits request->error, unlike
// the plain-write variant.
// NOTE(review): the else/brace lines and return statements are not visible in
// this excerpt.
519 int writeSyncFileWait()
521 Request* request = theReportChannel->readChannel();
524 if (request->action == Request::writeSync)
526 if (request->error == 0)
// Success trace: file index, user pointer and file name.
529 ndbout <<
"writeFileWait"<<request->theFilePointer<<
", " << request->theUserPointer<<
" "<< request->file->fileName().c_str() << endl;
535 ndbout <<
"error while writing file" << endl;
// Hand the request object back to the pool.
538 theRequestPool->put(request);
542 ndbout <<
"Unexpected request received" << endl;
547 ndbout <<
"Nothing read from theReportChannel" << endl;
// Fragment: the enclosing function's signature, the body of the per-byte
// mismatch `if`, and the else/brace lines are not visible in this excerpt
// (presumably the read-completion wait routine, going by the "readFileWait"
// trace text -- TODO confirm).
// Takes one request from theReportChannel, checks that it is a read and
// verifies the page that was read against the page that was written.
554 Request* request = theReportChannel->readChannel();
557 if (request->action == Request::read)
559 if (request->error == 0)
// Success trace: file index, user pointer and file name.
562 ndbout <<
"readFileWait"<<request->theFilePointer<<
", " << request->theUserPointer<<
" "<< request->file->fileName().c_str() << endl;
// Whole-page comparison of the read buffer with the write buffer for this file.
564 if (memcmp(&(ReadPages[request->theFilePointer][0]), &(WritePages[request->theFilePointer][0]), PAGESIZE)!=0)
566 ndbout <<
"Verification error!" << endl;
// On mismatch, walk the page byte by byte and print both values for each index.
567 for (
int i = 0;
i < PAGESIZE;
i++ ){
568 ndbout <<
" Compare Page " <<
i <<
" : " << ReadPages[request->theFilePointer][
i] <<
", " <<WritePages[request->theFilePointer][
i] << endl;;
// Per-byte mismatch test; what happens on a mismatch is not visible here.
569 if( ReadPages[request->theFilePointer][
i] !=WritePages[request->theFilePointer][
i])
578 ndbout <<
"error while reading file" << endl;
// Hand the request object back to the pool.
581 theRequestPool->put(request);
585 ndbout <<
"Unexpected request received" << endl;
590 ndbout <<
"Nothing read from theReportChannel" << endl;
// Parses the command line into the static configuration variables.
// Recognised options: -n, -r, -s, -l (each followed by a number), -remove and
// -reverse. Out-of-range numeric values are replaced by their defaults with a
// message.
// NOTE(review): the loop over argv, the body of the -remove branch and the
// return statements are not visible in this excerpt.
// NOTE(review): -n/-r/-s/-l read argv[i+1]; no bound check is visible, so
// confirm that a trailing option with no value cannot index past argc.
595 int readArguments(
int argc,
const char** argv)
// -n <files>: accepted range is 1..MAXFILES.
601 if (strcmp(argv[i],
"-n") == 0)
// atoi yields 0 for non-numeric text, which fails the `< 1` check below.
603 numberOfFiles = atoi(argv[i+1]);
604 if ((numberOfFiles < 1) || (numberOfFiles > MAXFILES))
606 ndbout <<
"Wrong number of files, default = "<<DEFAULT_NUM_FILES << endl;
607 numberOfFiles = DEFAULT_NUM_FILES;
// -r <requests>: accepted range is 1..MAXREQUESTS.
610 else if (strcmp(argv[i],
"-r") == 0)
612 numberOfRequests = atoi(argv[i+1]);
613 if ((numberOfRequests < 1) || (numberOfRequests > MAXREQUESTS))
615 ndbout <<
"Wrong number of requests, default = "<<DEFAULT_NUM_REQUESTS << endl;
616 numberOfRequests = DEFAULT_NUM_REQUESTS;
// -s <pages>: accepted range is 1..MAXFILESIZE.
619 else if (strcmp(argv[i],
"-s") == 0)
621 fileSize = atoi(argv[i+1]);
622 if ((fileSize < 1) || (fileSize > MAXFILESIZE))
624 ndbout <<
"Wrong number of 8k pages, default = "<<DEFAULT_FILESIZE << endl;
625 fileSize = DEFAULT_FILESIZE;
// -l <iterations>: must be at least 1; no upper bound is applied.
628 else if (strcmp(argv[i],
"-l") == 0)
630 numberOfIterations = atoi(argv[i+1]);
631 if ((numberOfIterations < 1))
633 ndbout <<
"Wrong number of iterations, default = 1" << endl;
634 numberOfIterations = 1;
// -remove: branch body not visible here.
637 else if (strcmp(argv[i],
"-remove") == 0)
// -reverse: write pages starting from the last page position.
643 else if (strcmp(argv[i],
"-reverse") == 0)
645 ndbout <<
"Writing files reversed" << endl;
646 writeFilesReverse = 1;
// If numberOfRequests does not divide fileSize, reduce it by the remainder.
// NOTE(review): the reduced value is not guaranteed to divide fileSize either
// (e.g. fileSize = 10, numberOfRequests = 7 gives 7 - 3 = 4, and 10 % 4 != 0)
// -- confirm whether a single adjustment is sufficient for the callers.
655 if ((fileSize % numberOfRequests)!= 0)
657 numberOfRequests = numberOfRequests - (fileSize % numberOfRequests);
658 ndbout <<
"numberOfRequest must be modulo of filesize" << endl;
659 ndbout <<
"New numberOfRequest="<<numberOfRequests<<endl;
// Definition of ErrorReporter::handleError supplied by this test file: it
// prints a banner followed by type, messageID, problemData and objRef on
// ndbout. The stype parameter is not used in the visible lines.
// NOTE(review): any statements after the prints (e.g. an exit) are not
// visible in this excerpt.
667 void ErrorReporter::handleError(ErrorCategory
type,
int messageID,
668 const char* problemData,
const char* objRef, NdbShutdownType stype)
671 ndbout <<
"ErrorReporter::handleError activated" << endl;
672 ndbout <<
"type= " << type << endl;
673 ndbout <<
"messageID= " << messageID << endl;
674 ndbout <<
"problemData= " << problemData << endl;
675 ndbout <<
"objRef= " << objRef << endl;
// Definition of ErrorReporter::handleAssert supplied by this test file: it
// prints a banner followed by the assertion message, the source file name and
// the line number on ndbout.
// NOTE(review): any statements after the prints (e.g. an exit) are not
// visible in this excerpt.
680 void ErrorReporter::handleAssert(
const char*
message,
const char* file,
int line)
682 ndbout <<
"ErrorReporter::handleAssert activated" << endl;
683 ndbout <<
"message= " << message << endl;
684 ndbout <<
"file= " << file << endl;
685 ndbout <<
"line= " << line << endl;