49 #include <NdbThread.h>
53 #include <NdbTimer.hpp>
56 #include <NdbTest.hpp>
58 #include <NDBT_Error.hpp>
// Upper bound on worker threads: dimensions the threadLife / ThreadReady /
// ThreadStart arrays and is the maximum accepted for the -t argument.
63 #define NDB_MAXTHREADS 256
// Maximum accepted for the -s argument (attribute size in 32-bit words);
// also the length of the local attrValue buffer used when defining operations.
71 #define MAXATTRSIZE 1000
74 enum StartType { stIdle,
87 unsigned int threadBase;
88 unsigned int transactionCompleted;
// Forward declarations for the benchmark's functions.

// Thread entry point handed to NdbThread_Create for every worker (C linkage).
91 extern "C" void* threadLoop(
void*);
// NOTE(review): definitions of the next two are not identifiable in this
// listing; presumably they fill attrName[] / tableName[] -- confirm.
92 void setAttrNames(
void);
93 void setTableNames(
void);
// Parses the command-line flags into the t* / the* file-scope globals.
94 void readArguments(
int argc,
const char** argv);
// Allocates getAttrValueTable.
95 void createAttributeSpace();
// Creates the table through a schema transaction when theTableCreateFlag == 0.
96 void createTables(
Ndb*);
// Defines one operation on a transaction; called by executeThread once per
// operation with the key and the calling thread's row in getAttrValueTable.
97 void defineOperation(
NdbConnection* aTransObject, StartType aType,
unsigned int key,
int *);
98 void execute(StartType aType);
// Runs tNoOfBatchesInLoop batches of asynchronous transactions for one thread.
99 void executeThread(StartType aType,
Ndb* aNdbObject,
ThreadNdb* threadInfo);
// Completion callback registered with executeAsynchPrepare; the ThreadNdb of
// the issuing thread is passed as aObject.
100 void executeCallback(
int result,
NdbConnection* NdbObject,
void* aObject);
// Prints the NdbError and tests its code against 4008, 721 and 266.
// Callers treat a false return as "unable to recover".
103 bool error_handler(
const NdbError &) ;
// ---- Shared benchmark state -------------------------------------------------

// Printed after each phase as the number of transactions that returned errors.
// Where it is incremented/reset is not visible in this listing.
104 static int failed = 0 ;
// Thread handles returned by NdbThread_Create, indexed by thread number;
// waited for and destroyed at shutdown.
108 static NdbThread* threadLife[NDB_MAXTHREADS];
// Per-thread handshake slots. A worker sets ThreadReady[n] = 1 after finishing
// a command; the coordinator polls for entries that are still 0.
// NOTE(review): these are plain (non-atomic) ints/enums polled across threads.
110 static int ThreadReady[NDB_MAXTHREADS];
// Per-thread command slot: the coordinator writes the StartType to run, the
// worker spins (with a 10 ms sleep) while it is stIdle and resets it to stIdle
// once it has picked the command up.
111 static StartType ThreadStart[NDB_MAXTHREADS];
// Table and column names, filled with sprintf ("TAB%d_%d" / "COL%d").
112 static char tableName[MAXTABLES][MAXSTRLEN+1];
113 static char attrName[MAXATTR][MAXSTRLEN+1];
// Read-result buffer allocated by createAttributeSpace(); each thread uses the
// slice starting at threadId * tNoOfAttributes * tAttributeSize.
114 static int *getAttrValueTable;

// ---- Command-line tunables (defaults shown; see readArguments) ---------------

// -l : number of loops, accepted range 0..100000 (help text: 0 = infinite).
117 static int tNoOfLoops = 1;
// -s : attribute size in 32-bit words, accepted range 1..MAXATTRSIZE.
118 static int tAttributeSize = 1;
// -t : number of worker threads, accepted range 1..NDB_MAXTHREADS.
119 static unsigned int tNoOfThreads = 1;
// -p : transactions per batch per thread, accepted range 1..10000.
120 static unsigned int tNoOfTransInBatch = 32;
// -a : attributes per table, accepted range 2..MAXATTR.
121 static unsigned int tNoOfAttributes = 25;
// -o : batches per loop, must be >= 1.
122 static unsigned int tNoOfBatchesInLoop = 200;
// -c : operations per transaction, must be >= 1.
123 static unsigned int tNoOfOpsPerTrans = 1;
// -i : minimum time between batch starts, in milliseconds.
124 static unsigned int tTimeBetweenBatches = 0;

// ---- Mode flags ---------------------------------------------------------------
// The assignments for -test, -temp, -simple, -dirty and -write are not visible
// in this listing; presumably each option sets its flag -- confirm.
127 static int theTestFlag = 0;
128 static int theTempFlag = 1;
129 static int theSimpleFlag = 0;
130 static int theDirtyFlag = 0;
131 static int theWriteFlag = 0;
// Set to 1 by -n ("Use standard table names").
132 static int theStdTableNameFlag = 0;
// Set to 1 by -no_table_create; createTables() only creates tables when it is 0.
133 static int theTableCreateFlag = 0;
// Timing helpers built on NdbTimer.
//
// START_REAL_TIME / STOP_REAL_TIME declare and drive a local `timer` in the
// current scope.
135 #define START_REAL_TIME NdbTimer timer; timer.doStart();
136 #define STOP_REAL_TIME timer.doStop();
// START_TIMER opens a new brace scope containing its own `timer`; that scope is
// closed by the trailing `};` of PRINT_TIMER, so the two must always be paired
// (START_TIMER ... STOP_TIMER ... PRINT_TIMER) within one enclosing block.
138 #define START_TIMER { NdbTimer timer; timer.doStart();
139 #define STOP_TIMER timer.doStop();
140 #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
145 for (
int i = 0;
i < tNoOfThreads ;
i++) {
147 ThreadStart[
i] = stIdle;
157 NdbSleep_MilliSleep(20);
158 for (
int i = 0;
i < tNoOfThreads ;
i++) {
159 if (ThreadReady[
i] == 0) {
// Broadcasts a command: writes `what` into the ThreadStart slot of each of the
// tNoOfThreads workers, which poll that slot for a value other than stIdle.
// NOTE(review): `int i` is compared against the unsigned tNoOfThreads
// (signed/unsigned comparison); harmless for the accepted range 1..NDB_MAXTHREADS.
167 tellThreads(StartType what)
169 for (
int i = 0;
i < tNoOfThreads ;
i++)
170 ThreadStart[
i] = what;
// Allocates the shared getAttrValueTable buffer with new[].
// The size expression starts with tAttributeSize and continues on lines that
// are not part of this listing. executeThread indexes the buffer at
// threadId * tNoOfAttributes * tAttributeSize, so the allocation must cover at
// least tNoOfThreads such rows -- TODO confirm against the full expression.
173 void createAttributeSpace(){
174 getAttrValueTable =
new int[tAttributeSize*
// Releases the buffer allocated by createAttributeSpace(); uses delete [] to
// match the new[] allocation.
180 void deleteAttributeSpace(){
181 delete [] getAttrValueTable;
// Program entry point (declared through the NDB_COMMAND macro).
//
// Visible flow: allocate the error-data helper and the main Ndb object, parse
// arguments, allocate the read buffer, print the effective configuration, start
// one worker thread per tNoOfThreads, then for each loop time and report the
// insert / read / update / read / delete phases. On exit the workers are joined
// and destroyed, the read buffer is freed, and the result is returned through
// NDBT_ProgramExit. Many intermediate lines are absent from this listing.
184 NDB_COMMAND(flexTimedAsynch,
"flexTimedAsynch",
"flexTimedAsynch [-tpoilcas]",
"flexTimedAsynch", 65535)
192 flexTimedAsynchErrorData =
new ErrorData;
196 pNdb =
new Ndb(
"TEST_DB" );
199 readArguments(argc, argv);
201 createAttributeSpace();
// Banner: echo the effective settings.
203 ndbout << endl <<
"FLEXTIMEDASYNCH - Starting normal mode" << endl;
204 ndbout <<
"Perform benchmark of insert, update and delete transactions" << endl << endl;
207 ndbout <<
" " <<
"Using temporary tables. " << endl;
210 ndbout <<
" " << tNoOfThreads <<
" number of concurrent threads " << endl;
212 ndbout <<
" " << tNoOfOpsPerTrans <<
" operations per transaction " << endl;
214 ndbout <<
" " << tNoOfTransInBatch <<
" number of transactions in a batch per thread " << endl;
216 ndbout <<
" " << tNoOfBatchesInLoop <<
" number of batches per loop " << endl;
218 ndbout <<
" " << tTimeBetweenBatches <<
" milli seconds at least between batch starts " << endl;
220 ndbout <<
" " << tNoOfLoops <<
" loops " << endl;
222 ndbout <<
" " << tNoOfAttributes <<
" attributes per table " << endl;
224 ndbout <<
" " << tAttributeSize <<
" is the number of 32 bit words per attribute " << endl << endl;
// Concurrency hint: the workers plus two more threads.
226 NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
234 ndbout <<
"Waiting for ndb to become ready..." <<endl;
237 ndbout <<
" NdbAPI node with id = " << tNodeId << endl;
// Start the workers; each receives a pointer to its own tabThread entry, whose
// ThreadNo is its index.
244 for (
int i = 0;
i < tNoOfThreads ;
i++) {
245 tabThread[
i].ThreadNo =
i;
247 threadLife[
i] = NdbThread_Create(threadLoop,
248 (
void**)&tabThread[
i],
250 "flexTimedAsynchThread",
251 NDB_THREAD_PRIO_LOW);
253 ndbout << endl <<
"All NDB objects and table created" << endl << endl;
// Total transactions per phase, used only for the timer statistics.
// NOTE(review): the product of three unsigned values is stored in an int;
// tNoOfBatchesInLoop has no upper bound in readArguments, so large -o values
// can exceed INT_MAX here -- confirm whether that matters for reporting.
254 int noOfTransacts = tNoOfTransInBatch*tNoOfBatchesInLoop*tNoOfThreads;
263 int loopCount = tLoops + 1 ;
264 ndbout << endl <<
"Loop # " << loopCount << endl << endl ;
// Each PRINT_TIMER below also closes the scope opened by a START_TIMER that is
// not part of this listing.
275 PRINT_TIMER(
"insert", noOfTransacts, tNoOfOpsPerTrans);
278 ndbout << failed <<
" of the transactions returned errors!, moving on now..."<<endl ;
290 PRINT_TIMER(
"read", noOfTransacts, tNoOfOpsPerTrans);
293 ndbout << failed <<
" of the transactions returned errors!, moving on now..."<<endl ;
307 PRINT_TIMER(
"update", noOfTransacts, tNoOfOpsPerTrans) ;
310 ndbout << failed <<
" of the transactions returned errors!, moving on now..."<<endl ;
322 PRINT_TIMER(
"read", noOfTransacts, tNoOfOpsPerTrans);
325 ndbout << failed <<
" of the transactions returned errors!, moving on now..."<<endl ;
338 PRINT_TIMER(
"delete", noOfTransacts, tNoOfOpsPerTrans);
341 ndbout << failed <<
" of the transactions returned errors!, moving on now..."<<endl ;
345 ndbout <<
"--------------------------------------------------" << endl;
// NOTE(review): the help text documents "-l 0" as infinite, but with
// tNoOfLoops == 0 this condition holds for any non-negative tLoops. The
// statement it guards and the declaration of tLoops are not visible here --
// confirm that a "0 != tNoOfLoops" guard is not missing.
348 if(tNoOfLoops <= tLoops)
353 ndbout << endl <<
"Benchmark completed!" << endl;
354 returnValue = NDBT_OK;
// Join and destroy every worker thread.
358 for(
int i = 0;
i<tNoOfThreads;
i++){
359 NdbThread_WaitFor(threadLife[
i], &tmp);
360 NdbThread_Destroy(&threadLife[
i]);
// Failure path taken when NDB did not become ready.
363 ndbout <<
"NDB is not ready" << endl;
364 ndbout <<
"Benchmark failed!" << endl;
365 returnValue = NDBT_FAILED;
368 deleteAttributeSpace();
374 return NDBT_ProgramExit(returnValue);
379 void execute(StartType aType)
394 int threadNo = threadInfo->ThreadNo;
395 localNdb =
new Ndb(
"TEST_DB");
398 threadInfo->threadBase = (threadNo * 2000000) + (tNodeId * 260000000);
401 while (ThreadStart[threadNo] == stIdle) {
402 NdbSleep_MilliSleep(10);
406 if (ThreadStart[threadNo] == stStop) {
410 tType = ThreadStart[threadNo];
411 ThreadStart[threadNo] = stIdle;
412 executeThread(tType, localNdb, threadInfo);
413 ThreadReady[threadNo] = 1;
417 ThreadReady[threadNo] = 1;
// Runs one phase (aType) for a single worker thread.
//
// For each of the tNoOfBatchesInLoop batches it prepares tNoOfTransInBatch
// asynchronous transactions of tNoOfOpsPerTrans operations each, waits until the
// callback has counted all of them as completed, and then sleeps in 1 ms steps
// until at least tTimeBetweenBatches has elapsed on `timer`.
//
// aType      - operation kind forwarded to defineOperation.
// aNdbObject - this thread's Ndb object; its getNdbError() is consulted when a
//              transaction slot is NULL.
// threadInfo - per-thread record: ThreadNo, threadBase (first key of the
//              thread's key range) and transactionCompleted (callback counter).
422 void executeThread(StartType aType,
Ndb* aNdbObject,
ThreadNdb* threadInfo)
429 int threadId = threadInfo->ThreadNo;
430 int *getValueRowAddress = NULL;
435 for (i = 0; i < tNoOfBatchesInLoop; i++) {
// First key of batch i: every batch consumes
// tNoOfTransInBatch * tNoOfOpsPerTrans consecutive keys.
437 tBase = threadInfo->threadBase + (i * tNoOfTransInBatch * tNoOfOpsPerTrans);
// Reset the counter that executeCallback increments.
439 threadInfo->transactionCompleted = 0;
441 for (j = 0; j < tNoOfTransInBatch; j++) {
// First key of transaction j inside the batch.
442 tBase2 = tBase + (j * tNoOfOpsPerTrans);
// NOTE(review): this branch is only taken when the slot is NULL *and*
// error_handler() returns false. If the slot is NULL but the error is reported
// as recoverable, the visible code below still passes tConArray[j] to
// defineOperation and dereferences it -- confirm against the omitted lines.
444 if ( tConArray[j] == NULL && !error_handler(aNdbObject->
getNdbError())) {
445 ndbout << endl <<
"Unable to recover! Quiting now" << endl ;
450 for (k = 0; k < tNoOfOpsPerTrans; k++) {
// Read destination for this thread: its own slice of the shared buffer
// (the value does not depend on k).
455 getValueRowAddress = getAttrValueTable +
456 threadId * tNoOfAttributes * tAttributeSize;
458 defineOperation(tConArray[j], aType, (tBase2 + k), getValueRowAddress);
// Queue the transaction for asynchronous commit; threadInfo is handed back to
// executeCallback as its aObject argument.
461 tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, threadInfo);
// Wait until every transaction of the batch has been reported by the callback.
471 while (threadInfo->transactionCompleted < tNoOfTransInBatch) {
473 ndbout <<
"threadInfo->transactionCompleted = " <<
474 threadInfo->transactionCompleted << endl;
477 for (j = 0 ; j < tNoOfTransInBatch ; j++) {
// Enforce the minimum spacing between batch starts.
484 while(timer.elapsedTime() < tTimeBetweenBatches){
485 NdbSleep_MilliSleep(1);
// Completion callback registered through executeAsynchPrepare.
//
// result    - completion status supplied by the NDB API (its use is not visible
//             in this listing).
// NdbObject - the transaction that completed; its NdbError is classified via
//             flexTimedAsynchErrorData->handleErrorCommon().
// aObject   - the ThreadNdb passed at registration; presumably cast to
//             threadInfo on a line not shown here -- confirm.
495 executeCallback(
int result,
NdbConnection* NdbObject,
void* aObject)
// Count the transaction as finished; executeThread waits on this counter.
499 threadInfo->transactionCompleted++;
505 int retCode = flexTimedAsynchErrorData->handleErrorCommon(NdbObject->
getNdbError());
510 }
else if (retCode == 2) {
// NOTE(review): the message names "flexAsynch" although this program is
// flexTimedAsynch; left untouched because it is runtime output.
511 ndbout <<
"4115 should not happen in flexAsynch" << endl;
512 }
else if (retCode == 3) {
519 ndbout <<
"executeCallback threadInfo->transactionCompleted = " <<
520 threadInfo->transactionCompleted << endl;
530 unsigned int threadBase,
534 unsigned int loopCountAttributes = tNoOfAttributes;
535 unsigned int countAttributes;
536 int attrValue[MAXATTRSIZE];
541 for (
int k = 0; k < loopCountAttributes; k++) {
542 *(
int *)&attrValue[k] = (
int)threadBase;
545 if (localNdbOperation == NULL) {
551 if (theWriteFlag == 1 && theDirtyFlag == 1) {
553 }
else if (theWriteFlag == 1) {
561 if (theSimpleFlag == 1) {
563 }
else if (theDirtyFlag == 1) {
571 if (theWriteFlag == 1 && theDirtyFlag == 1) {
573 }
else if (theWriteFlag == 1) {
575 }
else if (theDirtyFlag == 1) {
591 localNdbOperation->
equal((
char*)attrName[0],(
char*)&attrValue[0]);
597 for (countAttributes = 1; countAttributes < loopCountAttributes; countAttributes++) {
598 localNdbOperation->
setValue( (
char*)attrName[countAttributes],(
char*)&attrValue[0]);
603 for (countAttributes = 1; countAttributes < loopCountAttributes; countAttributes++) {
605 localNdbOperation->
getValue((
char*)attrName[countAttributes],
606 (
char *) (pRow + countAttributes*tAttributeSize));
// Parses the command line into the t* / the* file-scope globals and, on the
// path that ends this listing, prints the usage text.
//
// Options taking a value read it from argv[i+1] with atoi().
// NOTE(review): no check that i+1 < argc is visible in this listing, and atoi()
// reports no conversion errors -- confirm how a trailing flag or non-numeric
// value is handled.
// NOTE(review): several targets are unsigned int. A negative value from atoi()
// wraps to a large positive number: it is rejected where an upper bound exists
// (-t, -p, -a) but passes the "< 1" checks of -c and -o.
620 void readArguments(
int argc,
const char** argv)
// -t : number of threads, 1..NDB_MAXTHREADS.
625 if (strcmp(argv[i],
"-t") == 0)
627 tNoOfThreads = atoi(argv[i+1]);
629 if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS))
// -i : minimum milliseconds between batch starts.
632 else if (strcmp(argv[i],
"-i") == 0)
634 tTimeBetweenBatches = atoi(argv[i+1]);
// NOTE(review): tTimeBetweenBatches is unsigned, so this comparison can never
// be true; a negative argument is silently accepted as a huge delay.
635 if (tTimeBetweenBatches < 0)
// -p : transactions per batch, 1..10000.
638 else if (strcmp(argv[i],
"-p") == 0)
640 tNoOfTransInBatch = atoi(argv[i+1]);
642 if ((tNoOfTransInBatch < 1) || (tNoOfTransInBatch > 10000))
// -c : operations per transaction, >= 1.
645 else if (strcmp(argv[i],
"-c") == 0)
647 tNoOfOpsPerTrans = atoi(argv[i+1]);
648 if (tNoOfOpsPerTrans < 1)
// -o : batches per loop, >= 1.
651 else if (strcmp(argv[i],
"-o") == 0)
653 tNoOfBatchesInLoop = atoi(argv[i+1]);
654 if (tNoOfBatchesInLoop < 1)
// -a : attributes per table, 2..MAXATTR.
657 else if (strcmp(argv[i],
"-a") == 0)
659 tNoOfAttributes = atoi(argv[i+1]);
660 if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR))
// -n : use standard table names.
663 else if (strcmp(argv[i],
"-n") == 0)
665 theStdTableNameFlag = 1;
// -l : number of loops, 0..100000.
669 else if (strcmp(argv[i],
"-l") == 0)
671 tNoOfLoops = atoi(argv[i+1]);
672 if ((tNoOfLoops < 0) || (tNoOfLoops > 100000))
// -s : attribute size in 32-bit words, 1..MAXATTRSIZE.
675 else if (strcmp(argv[i],
"-s") == 0)
677 tAttributeSize = atoi(argv[i+1]);
678 if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE))
// Flag-only options; the statements they execute are not part of this listing
// except for -no_table_create.
681 else if (strcmp(argv[i],
"-simple") == 0)
687 else if (strcmp(argv[i],
"-write") == 0)
693 else if (strcmp(argv[i],
"-dirty") == 0)
699 else if (strcmp(argv[i],
"-test") == 0)
705 else if (strcmp(argv[i],
"-temp") == 0)
711 else if (strcmp(argv[i],
"-no_table_create") == 0)
713 theTableCreateFlag = 1;
// Usage text.
// NOTE(review): -test is parsed above but not listed here.
719 ndbout <<
"Arguments: " << endl;
720 ndbout <<
"-t Number of threads to start, i.e., number of parallel loops, default 1 " << endl;
721 ndbout <<
"-p Number of transactions in a batch, default 32 " << endl;
722 ndbout <<
"-o Number of batches per loop, default 200 " << endl;
723 ndbout <<
"-i Minimum time between batch starts in milli seconds, default 0 " << endl;
724 ndbout <<
"-l Number of loops to run, default 1, 0=infinite " << endl;
725 ndbout <<
"-a Number of attributes, default 25 " << endl;
726 ndbout <<
"-c Number of operations per transaction, default 1 " << endl;
// NOTE(review): the two adjacent literals below are concatenated without a
// separating space, so the output reads "default 1(Primary Key ...".
727 ndbout <<
"-s Size of each attribute in 32 bit word, default 1"
728 "(Primary Key is always of size 1, independent of this value) " << endl;
729 ndbout <<
"-simple Use simple read to read from database " << endl;
730 ndbout <<
"-dirty Use dirty read to read from database " << endl;
731 ndbout <<
"-write Use writeTuple in insert and update " << endl;
732 ndbout <<
"-n Use standard table names " << endl;
733 ndbout <<
"-no_table_create Don't create tables in db " << endl;
734 ndbout <<
"-temp Use temporary tables, no writing to disk. " << endl;
747 for (i = 0; i < MAXATTR ; i++)
749 sprintf(attrName[i],
"COL%d", i);
759 for (i = 0; i < MAXTABLES ; i++)
761 if (theStdTableNameFlag==1)
763 sprintf(tableName[i],
"TAB%d_%d", tNoOfAttributes,
764 NdbTick_CurrentMillisecond()/1000);
766 sprintf(tableName[i],
"TAB%d_%d", tNoOfAttributes, tAttributeSize*4);
// Creates the benchmark table through an NDB schema transaction.
//
// pMyNdb - Ndb object used to start and close the schema transaction.
//
// Nothing is created unless theTableCreateFlag == 0 (i.e. -no_table_create was
// not given). The loop bound is the literal 1, so only tableName[0] is created.
// Each step's failure is routed through error_handler(); the statements executed
// when it returns false are not part of this listing.
771 void createTables(
Ndb* pMyNdb)
778 if (theTableCreateFlag == 0)
780 for(
int i=0; i < 1 ;i++)
782 ndbout <<
"Creating " << tableName[
i] <<
"..." << endl;
783 MySchemaTransaction = pMyNdb->startSchemaTransaction();
// NOTE(review): when MySchemaTransaction is NULL the right-hand operand is
// evaluated and dereferences that NULL pointer to fetch the error. The error
// most likely has to come from pMyNdb->getNdbError() instead -- confirm and fix.
785 if( MySchemaTransaction ==
786 NULL && (!error_handler(MySchemaTransaction->
getNdbError())))
790 if( MySchemaOp == NULL
791 && (!error_handler(MySchemaTransaction->
getNdbError())))
806 if ( check == -1 && (!error_handler(MySchemaTransaction->
getNdbError())))
818 if ( check == -1 &&(!error_handler(MySchemaTransaction->
getNdbError())))
// Remaining attributes; index 0 is handled separately above this loop
// (presumably the primary key -- the defining lines are not visible).
821 for (
int j = 1; j < tNoOfAttributes ; j++)
831 && (!error_handler(MySchemaTransaction->
getNdbError())))
835 if ( MySchemaTransaction->
execute() == -1
836 &&(!error_handler(MySchemaTransaction->
getNdbError())))
839 pMyNdb->closeSchemaTransaction(MySchemaTransaction);
846 bool error_handler(
const NdbError & err) {
847 ndbout << err << endl ;
848 if ( 4008==err.
code || 721==err.
code || 266==err.
code ){
849 ndbout << endl <<
"Attempting to recover and continue now..." << endl ;