20 #include <ndb_global.h>
22 #include <NdbSchemaCon.hpp>
24 #include <md5_hash.hpp>
26 #include <NdbThread.h>
30 #include <NdbTimer.hpp>
31 #include <NDBT_Error.hpp>
33 #include <NdbTest.hpp>
34 #include <NDBT_Stats.hpp>
41 #define NDB_MAXTHREADS 128
50 #define MAXATTRSIZE 1000
// Forward declarations of the file-local helpers used by the benchmark.
// threadLoop is declared inside extern "C"; it is the function passed to
// NdbThread_Create as the thread entry point.
69 extern "C" {
static void* threadLoop(
void*); }
70 static void setAttrNames(
void);
71 static void setTableNames(
void);
72 static int readArguments(
int argc,
const char** argv);
73 static int createTables(
Ndb*);
74 static void defineOperation(
NdbConnection* aTransObject, StartType aType,
75 Uint32 base, Uint32 aIndex);
// NOTE(review): the next line is the tail of a second prototype whose first
// line is not visible in this listing (presumably defineNdbRecordOperation).
77 Uint32 base, Uint32 aIndex);
78 static void execute(StartType aType);
79 static bool executeThread(
ThreadNdb*, StartType aType,
Ndb* aNdbObject,
unsigned int);
80 static void executeCallback(
int result,
NdbConnection* NdbObject,
// NOTE(review): the final parameter line of executeCallback is not visible here.
82 static bool error_handler(
const NdbError & err);
83 static Uint32 getKey(Uint32, Uint32) ;
84 static void input_error();
// Upper bound printed as "out of" in the redo-attempt messages.
87 static int retry_opt = 3 ;
// Number of transactions that returned errors; the redo loops run while it is > 0.
88 static int failed = 0 ;
// Per-thread state, sized by NDB_MAXTHREADS and indexed by thread number.
// threadLife holds the handles returned by NdbThread_Create.
92 static NdbThread* threadLife[NDB_MAXTHREADS];
// ThreadReady[i] is set to 1 by the worker; ThreadStart[i] carries the command
// (stIdle, stStop or a work type) that the worker polls for.
94 static int ThreadReady[NDB_MAXTHREADS];
95 static StartType ThreadStart[NDB_MAXTHREADS];
96 static char tableName[MAXTABLES][MAXSTRLEN+1];
97 static char attrName[MAXATTR][MAXSTRLEN+1];
// Benchmark parameters. The values below are the defaults; readArguments
// overwrites the ones for which a command-line option is given.
101 static bool tNdbRecord =
false;
103 static bool tLocal =
false;
// Partition selected with -local; compared against the masked key hash in getKey.
104 static int tLocalPart = 0;
// 0, 1 or another value; main reports these as adaptive, forced and
// non-adaptive send respectively.
105 static int tSendForce = 0;
106 static int tNoOfLoops = 1;
107 static int tAttributeSize = 1;
108 static unsigned int tNoOfThreads = 1;
109 static unsigned int tNoOfParallelTrans = 32;
110 static unsigned int tNoOfAttributes = 25;
111 static unsigned int tNoOfTransactions = 500;
112 static unsigned int tNoOfOpsPerTrans = 1;
113 static unsigned int tLoadFactor = 80;
114 static bool tempTable =
false;
115 static bool startTransGuess =
true;
// Extra read passes: the read phases run 1 + tExtraReadLoop times.
116 static int tExtraReadLoop = 0;
// Integer flags; the visible code tests theSimpleFlag, theDirtyFlag,
// theWriteFlag and theTableCreateFlag against fixed values.
119 static int theTestFlag = 0;
120 static int theSimpleFlag = 0;
121 static int theDirtyFlag = 0;
122 static int theWriteFlag = 0;
123 static int theStdTableNameFlag = 0;
124 static int theTableCreateFlag = 0;
// Number of cluster connections; worker threads pick one with
// threadNo % tConnections.
125 static int tConnections = 1;
// Timing helpers. START_REAL_TIME and STOP_REAL_TIME expand to nothing.
127 #define START_REAL_TIME
128 #define STOP_REAL_TIME
// START_TIMER opens a brace scope that holds a local NdbTimer named `timer`.
// That scope is closed only by the trailing `};` of PRINT_TIMER, so the two
// macros must always be used as a pair inside the same enclosing block, and
// `timer` is only usable between them.
129 #define START_TIMER { NdbTimer timer; timer.doStart();
130 #define STOP_TIMER timer.doStop();
131 #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
// Prints the mean, min, max and the standard deviation expressed as a
// percentage of the mean for the statistics object `s`, each converted to
// unsigned.
// NOTE(review): the enclosing function header and the first printf argument
// are not visible in this listing. The last expression divides by
// s.getMean(); confirm whether a zero mean (no observations) can reach here.
139 printf(
"%s average: %u/s min: %u/s max: %u/s stddev: %u%%\n",
141 (
unsigned)s.getMean(),
142 (unsigned)s.getMin(),
143 (unsigned)s.getMax(),
144 (unsigned)(100*s.getStddev() / s.getMean()));
// Fragment: sets the command slot of every active thread
// (0 .. tNoOfThreads-1) to stIdle.
// NOTE(review): the enclosing function header is not visible in this listing.
150 for (
unsigned i = 0;
i < tNoOfThreads ;
i++) {
152 ThreadStart[
i] = stIdle;
// Fragment: sleeps 20 ms and then checks each active thread's ThreadReady
// slot for 0 (not ready). Presumably part of a polling wait loop; the
// surrounding lines are missing here, so confirm against the full function.
162 NdbSleep_MilliSleep(20);
163 for (
unsigned i = 0;
i < tNoOfThreads ;
i++) {
164 if (ThreadReady[
i] == 0) {
// Broadcasts the command `what` to every active worker by writing it into
// ThreadStart[0 .. tNoOfThreads-1]. The workers poll their own slot and
// leave the idle loop once it is no longer stIdle.
172 tellThreads(StartType what)
174 for (
unsigned i = 0;
i < tNoOfThreads ;
i++)
175 ThreadStart[
i] = what;
// Program entry point of the flexAsynch benchmark, declared via NDB_COMMAND.
// Visible flow: parse the arguments, print the configuration, connect to the
// cluster, create the table(s), start the worker threads, then for each loop
// time the insert / read / update / read / delete phases (redoing a phase
// while `failed` is non-zero), and finally join the threads and print the
// collected statistics. Returns through NDBT_ProgramExit.
// NOTE(review): many lines of this function are not visible in this listing;
// the comments below describe only the lines shown.
180 NDB_COMMAND(flexAsynch,
"flexAsynch",
"flexAsynch",
"flexAsynch", 65535)
185 int returnValue = NDBT_OK;
190 if (readArguments(argc, argv) != 0){
192 return NDBT_ProgramExit(NDBT_WRONGARGS);
// Allocated for the maximum thread count, not just tNoOfThreads.
195 pThreadData =
new ThreadNdb[NDB_MAXTHREADS];
// Report the effective configuration before the run starts.
197 ndbout << endl <<
"FLEXASYNCH - Starting normal mode" << endl;
198 ndbout <<
"Perform benchmark of insert, update and delete transactions";
200 ndbout <<
" " << tNoOfThreads <<
" number of concurrent threads " << endl;
201 ndbout <<
" " << tNoOfParallelTrans;
202 ndbout <<
" number of parallel operation per thread " << endl;
203 ndbout <<
" " << tNoOfTransactions <<
" transaction(s) per round " << endl;
204 ndbout <<
" " << tNoOfLoops <<
" iterations " << endl;
205 ndbout <<
" " <<
"Load Factor is " << tLoadFactor <<
"%" << endl;
206 ndbout <<
" " << tNoOfAttributes <<
" attributes per table " << endl;
207 ndbout <<
" " << tAttributeSize;
208 ndbout <<
" is the number of 32 bit words per attribute " << endl;
209 if (tempTable ==
true) {
210 ndbout <<
" Tables are without logging " << endl;
212 ndbout <<
" Tables are with logging " << endl;
214 if (startTransGuess ==
true) {
215 ndbout <<
" Transactions are executed with hint provided" << endl;
217 ndbout <<
" Transactions are executed with round robin scheme" << endl;
219 if (tSendForce == 0) {
220 ndbout <<
" No force send is used, adaptive algorithm used" << endl;
221 }
else if (tSendForce == 1) {
222 ndbout <<
" Force send used" << endl;
224 ndbout <<
" No force send is used, adaptive algorithm disabled" << endl;
229 NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
238 if (tConnections > 1)
// tConnections is a signed int, so it is printed with %d (was %u).
240 printf(
"Creating %d connections...", tConnections);
// Connect every cluster connection; any failure aborts the program.
243 for (
int i = 0;
i < tConnections;
i++)
245 if(g_cluster_connection[
i].connect(12, 5, 1) != 0)
246 return NDBT_ProgramExit(NDBT_FAILED);
248 if (tConnections > 1)
// The main thread's Ndb object always uses the first connection.
254 Ndb * pNdb =
new Ndb(g_cluster_connection+0,
"TEST_DB");
258 ndbout <<
" NdbAPI node with id = " << pNdb->
getNodeId() << endl;
261 ndbout <<
"Waiting for ndb to become ready..." <<endl;
263 ndbout <<
"NDB is not ready" << endl;
264 ndbout <<
"Benchmark failed!" << endl;
265 returnValue = NDBT_FAILED;
268 if(returnValue == NDBT_OK){
269 if (createTables(pNdb) != 0){
270 returnValue = NDBT_FAILED;
// One zeroed row buffer per thread, sized from the first NdbRecord.
// NOTE(review): the malloc result is used by bzero without a NULL check.
276 Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
278 for (Uint32
i = 0;
i<tNoOfThreads;
i++)
280 pThreadData[
i].record = (
char*)malloc(sz);
281 bzero(pThreadData[
i].
record, sz);
285 if(returnValue == NDBT_OK){
// Start one worker per thread slot, each receiving its own ThreadNdb entry.
290 for (Uint32
i = 0;
i < tNoOfThreads ;
i++) {
291 pThreadData[
i].ThreadNo =
i
293 threadLife[
i] = NdbThread_Create(threadLoop,
294 (
void**)&pThreadData[
i],
297 NDB_THREAD_PRIO_LOW);
299 ndbout << endl <<
"All NDB objects and table created" << endl << endl;
// Total transactions per phase across all threads.
300 int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
307 int loopCount = tLoops + 1 ;
308 ndbout << endl <<
"Loop # " << loopCount << endl << endl ;
// ---- insert phase ----
// NOTE(review): each addObservation below divides by timer.elapsedTime();
// confirm that it cannot be zero for a very short phase.
319 a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
320 PRINT_TIMER(
"insert", noOfTransacts, tNoOfOpsPerTrans);
325 while (0 < failed && 0 < i){
326 ndbout << failed <<
" of the transactions returned errors!"
328 ndbout <<
"Attempting to redo the failed transactions now..."
330 ndbout <<
"Redo attempt " << ci <<
" out of " << retry_opt
336 PRINT_TIMER(
"insert", noOfTransacts, tNoOfOpsPerTrans);
341 ndbout << endl <<
"Redo attempt succeeded" << endl << endl;
343 ndbout << endl <<
"Redo attempt failed, moving on now..." << endl
// ---- first read phase (1 + tExtraReadLoop passes) ----
354 for (
int ll = 0; ll < 1 + tExtraReadLoop; ll++)
359 a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
360 PRINT_TIMER(
"read", noOfTransacts, tNoOfOpsPerTrans);
366 while (0 < failed && 0 < i){
367 ndbout << failed <<
" of the transactions returned errors!"<<endl ;
369 ndbout <<
"Attempting to redo the failed transactions now..." << endl;
371 ndbout <<
"Redo attempt " << cr <<
" out of ";
372 ndbout << retry_opt << endl << endl;
377 PRINT_TIMER(
"read", noOfTransacts, tNoOfOpsPerTrans);
382 ndbout << endl <<
"Redo attempt succeeded" << endl << endl ;
384 ndbout << endl <<
"Redo attempt failed, moving on now..." << endl << endl ;
// ---- update phase ----
398 a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
399 PRINT_TIMER(
"update", noOfTransacts, tNoOfOpsPerTrans) ;
404 while (0 < failed && 0 < i){
405 ndbout << failed <<
" of the transactions returned errors!"<<endl ;
407 ndbout <<
"Attempting to redo the failed transactions now..." << endl;
408 ndbout << endl <<
"Redo attempt " << cu <<
" out of ";
409 ndbout << retry_opt << endl << endl;
414 PRINT_TIMER(
"update", noOfTransacts, tNoOfOpsPerTrans);
419 ndbout << endl <<
"Redo attempt succeeded" << endl << endl;
422 ndbout <<
"Redo attempt failed, moving on now..." << endl << endl;
// ---- second read phase (1 + tExtraReadLoop passes) ----
432 for (
int ll = 0; ll < 1 + tExtraReadLoop; ll++)
437 a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
438 PRINT_TIMER(
"read", noOfTransacts, tNoOfOpsPerTrans);
444 while (0 < failed && 0 < i){
445 ndbout << failed <<
" of the transactions returned errors!"<<endl ;
447 ndbout <<
"Attempting to redo the failed transactions now..." << endl;
448 ndbout << endl <<
"Redo attempt " << cr2 <<
" out of ";
449 ndbout << retry_opt << endl << endl;
454 PRINT_TIMER(
"read", noOfTransacts, tNoOfOpsPerTrans);
459 ndbout << endl <<
"Redo attempt succeeded" << endl << endl;
462 ndbout <<
"Redo attempt failed, moving on now..." << endl << endl;
// ---- delete phase ----
476 a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
477 PRINT_TIMER(
"delete", noOfTransacts, tNoOfOpsPerTrans);
482 while (0 < failed && 0 < i){
483 ndbout << failed <<
" of the transactions returned errors!"<< endl ;
485 ndbout <<
"Attempting to redo the failed transactions now:" << endl ;
486 ndbout << endl <<
"Redo attempt " << cd <<
" out of ";
487 ndbout << retry_opt << endl << endl;
// The redo of the delete phase is reported under "delete"; it was previously
// labelled "read", a copy-paste slip from the read phase.
492 PRINT_TIMER(
"delete", noOfTransacts, tNoOfOpsPerTrans);
497 ndbout << endl <<
"Redo attempt succeeded" << endl << endl ;
500 ndbout <<
"Redo attempt failed, moving on now..." << endl << endl;
505 ndbout <<
"--------------------------------------------------" << endl;
// Loop termination test against the configured number of loops.
508 if(tNoOfLoops <= tLoops)
// Join and destroy every worker thread before releasing shared data.
515 for(Uint32 i = 0; i<tNoOfThreads; i++){
516 NdbThread_WaitFor(threadLife[i], &tmp);
517 NdbThread_Destroy(&threadLife[i]);
520 delete [] pThreadData;
// Summary statistics accumulated over all loops.
526 print(
"insert", a_i);
527 print(
"update", a_u);
528 print(
"delete", a_d);
531 delete [] g_cluster_connection;
533 return NDBT_ProgramExit(returnValue);
// NOTE(review): only the header line of execute() is visible; its body is
// missing from this listing. The lines after it belong to the worker thread
// routine (its own header is not visible either).
537 static void execute(StartType aType)
// Worker side: each thread creates its own Ndb object on the connection
// chosen by threadNo % tConnections.
// NOTE(review): tConnections comes straight from atoi in readArguments; a
// value of 0 would make this modulo divide by zero — confirm it is validated.
550 int threadNo = tabThread->ThreadNo;
551 localNdb =
new Ndb(g_cluster_connection+(threadNo % tConnections),
"TEST_DB");
552 localNdb->
init(1024);
// Key prefix for this thread: thread number in the upper bits, tNodeId below.
554 unsigned int threadBase = (threadNo << 16) + tNodeId ;
// Poll the command slot every 10 ms until the main thread posts work.
557 while (ThreadStart[threadNo] == stIdle) {
558 NdbSleep_MilliSleep(10);
562 if (ThreadStart[threadNo] == stStop) {
// Take the command and reset the slot to idle before running it.
566 tType = ThreadStart[threadNo];
567 ThreadStart[threadNo] = stIdle;
568 if(!executeThread(tabThread, tType, localNdb, threadBase)){
571 ThreadReady[threadNo] = 1;
575 ThreadReady[threadNo] = 1;
// Runs one phase of type aType for a single worker thread: for each of the
// tNoOfTransactions rounds it starts tNoOfParallelTrans transactions, defines
// tNoOfOpsPerTrans operations on each, and then polls the Ndb object until
// all of them have completed.
// NOTE(review): the first parameter line and several body lines are not
// visible in this listing.
583 StartType aType,
Ndb* aNdbObject,
unsigned int threadBase) {
// extraLoops is 0 in the visible code, so the outer loop runs once.
589 unsigned int extraLoops= 0;
591 for (
unsigned int ex= 0; ex < (1 + extraLoops); ex++)
593 for (
unsigned int i = 0; i < tNoOfTransactions; i++) {
// Base index of this round: dense key space normally, MAX_SEEK-wide slots
// when -local is used.
594 if (tLocal ==
false) {
595 tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
597 tBase = i * tNoOfParallelTrans * MAX_SEEK;
600 for (
unsigned int j = 0; j < tNoOfParallelTrans; j++) {
601 if (tLocal ==
false) {
602 tBase2 = tBase + (j * tNoOfOpsPerTrans);
// Local mode: getKey searches the slot for an index that hashes to tLocalPart.
604 tBase2 = tBase + (j * MAX_SEEK);
605 tBase2 = getKey(threadBase, tBase2);
607 if (startTransGuess ==
true) {
612 Tkey32[0] = threadBase;
615 (
const char*)&Tkey64,
620 if (tConArray[j] == NULL &&
622 ndbout << endl <<
"Unable to recover! Quiting now" << endl ;
626 for (
unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
// Two ways of defining the operation: the NdbRecord variant or the
// attribute-by-attribute variant (the selecting condition is not visible).
631 defineNdbRecordOperation(pThread,
632 tConArray[j], aType, threadBase,(tBase2+k));
634 defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
// Poll (3000 passed as the first argument) until every parallel
// transaction of this round has completed.
645 while (
unsigned(Tcomp) < tNoOfParallelTrans) {
646 int TlocalComp = aNdbObject->
pollNdb(3000, 0);
649 for (
unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
// Searches the indices anIndex .. anIndex + MAX_SEEK - 1 for one whose
// (aBase, index) key hashes to the partition selected by tLocalPart.
// Tfound starts out as anIndex.
// NOTE(review): the masked hash is always in 0 .. MAX_PARTS-1 (assuming
// MAX_PARTS is a power of two), so a tLocalPart equal to MAX_PARTS can never
// match. The lines that set Tkey32[0] and return the result are not visible.
659 getKey(Uint32 aBase, Uint32 anIndex) {
660 Uint32 Tfound = anIndex;
667 for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {
668 Tkey32[1] = (Uint32)i;
// Hash the 64-bit key (passed as 2 words) and reduce it to a part number.
669 hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
670 hash = (hash >> 6) & (MAX_PARTS - 1);
671 if (hash ==
unsigned(tLocalPart)) {
// Completion callback for an asynchronously executed transaction.
// result    - completion status supplied by the NDB API
// NdbObject - the transaction that completed
// aObject   - opaque user pointer registered with the callback
// The transaction's error is classified by
// flexAsynchErrorData->handleErrorCommon and the return code selects the
// branch taken.
// NOTE(review): the condition guarding the error handling and the bodies of
// most branches are not visible in this listing.
680 executeCallback(
int result,
NdbConnection* NdbObject,
void* aObject)
686 int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->
getNdbError());
691 }
else if (retCode == 2) {
692 ndbout <<
"4115 should not happen in flexAsynch" << endl;
693 }
else if (retCode == 3) {
// Defines one operation of type aType on the given transaction using the
// attribute-by-attribute API: equal() on attribute 0 for the key, then
// setValue() or getValue() on attributes 1 .. tNoOfAttributes-1.
// localNdbConnection - transaction on which the operation is defined
// aType              - kind of operation to define
// threadBase, aIndex - the two words that form the key
// NOTE(review): the switch/branch lines that pick the operation type are
// mostly not visible in this listing.
709 defineOperation(
NdbConnection* localNdbConnection, StartType aType,
710 Uint32 threadBase, Uint32 aIndex)
713 unsigned int loopCountAttributes = tNoOfAttributes;
714 unsigned int countAttributes;
715 Uint32 attrValue[MAXATTRSIZE];
// Key words first, then every remaining slot up to the attribute count is
// filled with aIndex.
// NOTE(review): the fill loop is bounded by tNoOfAttributes while the buffer
// holds MAXATTRSIZE words; this is safe only if MAXATTR <= MAXATTRSIZE —
// MAXATTR is not defined in this listing, confirm.
720 attrValue[0] = threadBase;
721 attrValue[1] = aIndex;
722 for (
unsigned k = 2; k < loopCountAttributes; k++) {
723 attrValue[k] = aIndex;
726 if (localNdbOperation == NULL) {
// theWriteFlag / theDirtyFlag / theSimpleFlag select the operation variant.
731 if (theWriteFlag == 1 && theDirtyFlag == 1) {
733 }
else if (theWriteFlag == 1) {
741 if (theSimpleFlag == 1) {
743 }
else if (theDirtyFlag == 1) {
751 if (theWriteFlag == 1 && theDirtyFlag == 1) {
753 }
else if (theWriteFlag == 1) {
755 }
else if (theDirtyFlag == 1) {
// Primary key condition on attribute 0, read from the start of attrValue.
770 localNdbOperation->
equal((Uint32)0,(
char*)&attrValue[0]);
// Every non-key attribute is written from (or read into) the same buffer
// start, &attrValue[0].
775 for (countAttributes = 1;
776 countAttributes < loopCountAttributes; countAttributes++) {
777 localNdbOperation->
setValue(countAttributes,
778 (
char*)&attrValue[0]);
783 for (countAttributes = 1;
784 countAttributes < loopCountAttributes; countAttributes++) {
785 localNdbOperation->
getValue(countAttributes,
786 (
char*)&attrValue[0]);
// NdbRecord variant of defineOperation: fills the calling thread's row
// buffer (pThread->record) and defines the operation through the
// *Tuple(g_record[0], ...) calls on the transaction.
// NOTE(review): the parameter line carrying the transaction and aType, and
// the switch lines that select the operation, are not visible here.
803 defineNdbRecordOperation(
ThreadNdb* pThread,
805 Uint32 threadBase, Uint32 aIndex)
807 char *
record = pThread->record;
// The key occupies two consecutive 32-bit words at the offset of column 0.
809 NdbDictionary::getOffset(g_record[0], 0, offset);
810 * (Uint32*)(record + offset) = threadBase;
811 * (Uint32*)(record + offset + 4) = aIndex;
// Only operations that write data (neither read nor delete) fill the
// non-key columns; the first word of each is set to aIndex.
816 if (aType != stRead && aType != stDelete)
818 for (
unsigned k = 1; k < tNoOfAttributes; k++) {
819 NdbDictionary::getOffset(g_record[0], k, offset);
820 * (Uint32*)(record + offset) = aIndex;
// With theWriteFlag set, writeTuple is used.
827 if (theWriteFlag == 1)
829 op = pTrans->writeTuple(g_record[0],record,g_record[0],record);
833 op = pTrans->insertTuple(g_record[0],record,g_record[0],record);
842 op = pTrans->updateTuple(g_record[0],record,g_record[0],record);
846 op = pTrans->deleteTuple(g_record[0],record, g_record[0]);
856 ndbout <<
"Operation is null " << pTrans->
getNdbError() << endl;
// Iterates over all MAXATTR entries; the declared attrName table has that
// many rows.
// NOTE(review): the loop body is not visible in this listing.
863 static void setAttrNames()
867 for (i = 0; i < MAXATTR ; i++){
// Iterates over all MAXTABLES entries; the declared tableName table has that
// many rows. When theStdTableNameFlag is 0 a value derived from the current
// millisecond tick plus rand() is used as one of the formatting arguments.
// NOTE(review): the formatting call itself and the else branch are not
// visible in this listing.
873 static void setTableNames()
878 for (i = 0; i < MAXTABLES ; i++){
879 if (theStdTableNameFlag==0){
881 (
unsigned)(NdbTick_CurrentMillisecond()+rand()));
// Creates the benchmark table through a schema transaction, unless
// theTableCreateFlag is non-zero, and then sets up an NdbRecord for it with
// pDict->createRecord.
// pMyNdb - Ndb object used to start the schema transaction.
// The caller treats a non-zero return value as failure.
// NOTE(review): many lines (attribute definitions, returns, the record spec
// setup) are not visible in this listing.
890 createTables(
Ndb* pMyNdb){
896 if (theTableCreateFlag == 0) {
// Only the first table (index 0) is created.
897 for(
int i=0; i < 1 ;i++) {
898 ndbout <<
"Creating " << tableName[
i] <<
"..." << endl;
899 MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
// NOTE(review): when MySchemaTransaction is NULL, the right-hand side of
// this && dereferences that same NULL pointer to fetch the error. The error
// would have to come from another object (e.g. pMyNdb) — confirm and fix.
901 if(MySchemaTransaction == NULL &&
902 (!error_handler(MySchemaTransaction->
getNdbError())))
906 if(MySchemaOp == NULL &&
907 (!error_handler(MySchemaTransaction->
getNdbError())))
924 (!error_handler(MySchemaTransaction->
getNdbError())))
936 (!error_handler(MySchemaTransaction->
getNdbError())))
// Non-key attributes 1 .. tNoOfAttributes-1.
938 for (
unsigned j = 1; j < tNoOfAttributes ; j++){
947 (!error_handler(MySchemaTransaction->
getNdbError())))
951 if (MySchemaTransaction->
execute() == -1 &&
952 (!error_handler(MySchemaTransaction->
getNdbError())))
955 NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
// Advance the running offset by the column size rounded up to a multiple of 4.
969 off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
973 pDict->createRecord(pTab, spec.getBase(),
// Prints the given NDB error and, on one path, announces that the program
// will try to recover and continue. Callers negate the result: a false
// return makes the enclosing `&& (!error_handler(...))` test succeed.
// NOTE(review): the classification logic and the return statements are not
// visible in this listing.
985 bool error_handler(
const NdbError & err){
986 ndbout << err << endl ;
991 ndbout << endl <<
"Attempting to recover and continue now..." << endl ;
// Parses the command line into the file-scope benchmark parameters.
// argc, argv - the program arguments as received by the entry point.
// The caller treats a non-zero return value as a usage error.
// Options that take a value read it from argv[i+1] with atoi and range-check
// it; on a bad value an "Invalid ..." message is printed.
// NOTE(review): the loop header, the statements following each error
// message and the bodies of the pure flag options are not visible in this
// listing. Confirm that argv[i+1] is never read past argc, and that -r and
// -con are validated (tConnections is used as a modulus elsewhere).
1001 readArguments(
int argc,
const char** argv){
1005 if (strcmp(argv[i],
"-t") == 0){
1006 tNoOfThreads = atoi(argv[i+1]);
1007 if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS)){
1008 ndbout_c(
"Invalid no of threads");
1011 }
else if (strcmp(argv[i],
"-p") == 0){
1012 tNoOfParallelTrans = atoi(argv[i+1]);
1013 if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
1014 ndbout_c(
"Invalid no of parallell transactions");
1017 }
else if (strcmp(argv[i],
"-load_factor") == 0){
1018 tLoadFactor = atoi(argv[i+1]);
1019 if ((tLoadFactor < 40) || (tLoadFactor > 99)){
1020 ndbout_c(
"Invalid load factor");
1023 }
else if (strcmp(argv[i],
"-c") == 0) {
1024 tNoOfOpsPerTrans = atoi(argv[i+1]);
1025 if (tNoOfOpsPerTrans < 1){
1026 ndbout_c(
"Invalid no of operations per transaction");
1029 }
else if (strcmp(argv[i],
"-o") == 0) {
1030 tNoOfTransactions = atoi(argv[i+1]);
1031 if (tNoOfTransactions < 1){
1032 ndbout_c(
"Invalid no of transactions");
1035 }
else if (strcmp(argv[i],
"-a") == 0){
1036 tNoOfAttributes = atoi(argv[i+1]);
1037 if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)){
1038 ndbout_c(
"Invalid no of attributes");
1041 }
else if (strcmp(argv[i],
"-n") == 0){
1042 theStdTableNameFlag = 1;
1045 }
else if (strcmp(argv[i],
"-l") == 0){
1046 tNoOfLoops = atoi(argv[i+1]);
1047 if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)){
1048 ndbout_c(
"Invalid no of loops");
1051 }
else if (strcmp(argv[i],
"-s") == 0){
1052 tAttributeSize = atoi(argv[i+1]);
1053 if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)){
1054 ndbout_c(
"Invalid attributes size");
1057 }
else if (strcmp(argv[i],
"-local") == 0){
1058 tLocalPart = atoi(argv[i+1]);
1060 startTransGuess =
true;
// Valid parts are 0 .. MAX_PARTS-1: getKey masks the hash with
// (MAX_PARTS - 1), so MAX_PARTS itself could never match and must be
// rejected here (the bound used to be '>' and let it through).
1061 if ((tLocalPart < 0) || (tLocalPart >= MAX_PARTS)){
1062 ndbout_c(
"Invalid local part");
1065 }
else if (strcmp(argv[i],
"-simple") == 0){
1069 }
else if (strcmp(argv[i],
"-adaptive") == 0){
1073 }
else if (strcmp(argv[i],
"-force") == 0){
1077 }
else if (strcmp(argv[i],
"-non_adaptive") == 0){
1081 }
else if (strcmp(argv[i],
"-write") == 0){
1085 }
else if (strcmp(argv[i],
"-dirty") == 0){
1089 }
else if (strcmp(argv[i],
"-test") == 0){
1093 }
else if (strcmp(argv[i],
"-no_table_create") == 0){
1094 theTableCreateFlag = 1;
1097 }
else if (strcmp(argv[i],
"-temp") == 0){
1101 }
else if (strcmp(argv[i],
"-no_hint") == 0){
1102 startTransGuess =
false;
1105 }
else if (strcmp(argv[i],
"-ndbrecord") == 0){
1109 }
else if (strcmp(argv[i],
"-r") == 0){
1110 tExtraReadLoop = atoi(argv[i+1]);
1111 }
else if (strcmp(argv[i],
"-con") == 0){
1112 tConnections = atoi(argv[i+1]);
// Cross-option checks: -local requires exactly one operation per
// transaction and is incompatible with -no_hint.
1120 if (tLocal ==
true) {
1121 if (tNoOfOpsPerTrans != 1) {
1122 ndbout_c(
"Not valid to have more than one op per trans with local");
1124 if (startTransGuess ==
false) {
1125 ndbout_c(
"Not valid to use no_hint with local");
// Usage text: prints the program name, a one-line description and one line
// per command-line option.
// NOTE(review): the enclosing function header is not visible in this
// listing (presumably input_error, which is forward-declared near the top of
// the file). The options -r, -con, -test and -load_factor's default are not
// described in the lines shown.
1135 ndbout_c(
"FLEXASYNCH");
1136 ndbout_c(
" Perform benchmark of insert, update and delete transactions");
1138 ndbout_c(
"Arguments:");
1139 ndbout_c(
" -t Number of threads to start, default 1");
1140 ndbout_c(
" -p Number of parallel transactions per thread, default 32");
1141 ndbout_c(
" -o Number of transactions per loop, default 500");
1142 ndbout_c(
" -l Number of loops to run, default 1, 0=infinite");
1143 ndbout_c(
" -load_factor Number Load factor in index in percent (40 -> 99)");
1144 ndbout_c(
" -a Number of attributes, default 25");
1145 ndbout_c(
" -c Number of operations per transaction");
1146 ndbout_c(
" -s Size of each attribute, default 1 ");
1147 ndbout_c(
" (PK is always of size 1, independent of this value)");
1148 ndbout_c(
" -simple Use simple read to read from database");
1149 ndbout_c(
" -dirty Use dirty read to read from database");
1150 ndbout_c(
" -write Use writeTuple in insert and update");
1151 ndbout_c(
" -n Use standard table names");
1152 ndbout_c(
" -no_table_create Don't create tables in db");
1153 ndbout_c(
" -temp Create table(s) without logging");
1154 ndbout_c(
" -no_hint Don't give hint on where to execute transaction coordinator");
1155 ndbout_c(
" -adaptive Use adaptive send algorithm (default)");
1156 ndbout_c(
" -force Force send when communicating");
1157 ndbout_c(
" -non_adaptive Send at a 10 millisecond interval");
1158 ndbout_c(
" -local Number of part, only use keys in one part out of 16");
1159 ndbout_c(
" -ndbrecord");