19 #include <ndb_global.h>
22 #include <NdbSchemaCon.hpp>
24 #include <md5_hash.hpp>
26 #include <NdbThread.h>
30 #include <NdbTimer.hpp>
32 #include <NdbTest.hpp>
33 #include <NDBT_Error.hpp>
40 #define NDB_MAXTHREADS 128
49 #define MAXATTRSIZE 1000
/* Stand-in for lrand48(): hands back the next value produced by rand(). */
inline long lrand48(void)
{
  const long next_value = rand();
  return next_value;
}
72 Uint32 threadLoopCounter;
73 Uint32 threadNextStart;
75 Uint32 threadLoopStop;
76 Uint32 threadIncrement;
77 Uint32 threadNoCompleted;
79 StartType threadStartType;
84 char transRecord[128];
86 StartType transStartType;
89 Uint32 transErrorCount;
94 extern "C" {
static void* threadLoop(
void*); }
95 static void setAttrNames(
void);
96 static void setTableNames(
void);
97 static int readArguments(
int argc,
const char** argv);
98 static int createTables(
Ndb*);
100 Uint32 vpn_nb, Uint32 vpn_id);
101 static bool executeTransaction(
TransNdb* transNdbRef);
102 static StartType random_choice();
103 static void execute(StartType aType);
105 static void executeCallback(
int result,
NdbConnection* NdbObject,
107 static bool error_handler(
const NdbError & err) ;
108 static Uint32 getKey(Uint32, Uint32) ;
109 static void input_error();
// Worker thread handles: written by NdbThread_Create() and later handed to
// NdbThread_WaitFor()/NdbThread_Destroy() by the main program.
113 static NdbThread* threadLife[NDB_MAXTHREADS];
// Per-thread flag; a worker stores 1 here and the main thread's polling loop
// tests the entries against 0.
115 static int ThreadReady[NDB_MAXTHREADS];
// Per-thread command slot: tellThreads() writes the requested StartType, the
// worker sleeps in 10 ms steps while the slot is stIdle, copies the value and
// then resets the slot to stIdle.
116 static StartType ThreadStart[NDB_MAXTHREADS];
// Name buffers for one table and five attributes; presumably filled by
// setTableNames()/setAttrNames() (their bodies are not visible here).
117 static char tableName[1][MAXSTRLEN+1];
118 static char attrName[5][MAXSTRLEN+1];
// Phase switches: each one gates a timed phase of the main program
// ("insert", "delete", "update + read").
121 static bool tInsert =
false;
122 static bool tDelete =
false;
123 static bool tReadUpdate =
true;
// -ufreq: update percentage, validated to 0..100 and compared with
// lrand48() % 100.
124 static int tUpdateFreq = 20;
// When true, executeTransaction() remaps the start key through getKey().
125 static bool tLocal =
false;
// -local: partition number that getKey() compares the key hash against.
126 static int tLocalPart = 0;
// -e: passed as the second argument of sendPollNdb().
127 static Uint32 tMinEvents = 0;
// Third argument of sendPollNdb(); the start-up banner describes 0 as the
// adaptive algorithm, 1 as force send and any other value as adaptive disabled.
128 static int tSendForce = 0;
// -l: number of loops (values below 1 are rejected by readArguments()).
129 static int tNoOfLoops = 1;
// -t: number of worker threads, validated to 1..NDB_MAXTHREADS.
130 static Uint32 tNoOfThreads = 1;
// -p: parallel transactions per thread, validated to 1..MAXPAR.
131 static Uint32 tNoOfParallelTrans = 32;
// -o: transactions per round (must be at least 1).
132 static Uint32 tNoOfTransactions = 500;
// -load_factor: validated to 40..99.
133 static Uint32 tLoadFactor = 80;
// true makes the banner report "Tables are without logging".
134 static bool tempTable =
false;
// true: transactions are started with a hint; -no_hint clears it and the
// banner then reports the round robin scheme.
135 static bool startTransGuess =
true;
// Operation-type flags tested when an operation is defined; the options that
// set them are presumably -simple, -dirty and -write (assignments not visible).
138 static int theSimpleFlag = 0;
139 static int theDirtyFlag = 0;
140 static int theWriteFlag = 0;
// Inverted sense: -table_create stores 0 here, and createTables() creates the
// table only when the value is 0.
141 static int theTableCreateFlag = 1;
// START_REAL_TIME and STOP_REAL_TIME expand to nothing.
143 #define START_REAL_TIME
144 #define STOP_REAL_TIME
// START_TIMER opens a brace scope, declares an NdbTimer named `timer` and
// starts it. That scope is closed only by PRINT_TIMER, so START_TIMER,
// STOP_TIMER and PRINT_TIMER have to be used together, in this order, inside
// one function body.
145 #define START_TIMER { NdbTimer timer; timer.doStart();
146 #define STOP_TIMER timer.doStop();
// Prints the statistics for `trans` transactions of `opertrans` operations
// each under the label `text`, then closes the scope opened by START_TIMER.
147 #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
152 for (
int i = 0;
i < (int)tNoOfThreads ;
i++) {
154 ThreadStart[
i] = stIdle;
164 NdbSleep_MilliSleep(20);
165 for (
int i = 0;
i < (int)tNoOfThreads ;
i++) {
166 if (ThreadReady[
i] == 0) {
// Publishes the command `what` to every worker by storing it in the
// ThreadStart slot of each of the first tNoOfThreads threads.
174 tellThreads(StartType what)
176 for (
int i = 0;
i < (int)tNoOfThreads ;
i++)
177 ThreadStart[
i] = what;
// Program entry point. Visible steps: parse the arguments, print the
// configuration banner, create the Ndb object, create the tables, start the
// worker threads, run the timed insert / update+read / delete phases, then
// join and destroy the threads. Several statements between the visible lines
// are missing from this excerpt.
182 NDB_COMMAND(flexTT,
"flexTT",
"flexTT",
"flexTT", 65535)
186 int returnValue = NDBT_OK;
190 if (readArguments(argc, argv) != 0){
192 return NDBT_ProgramExit(NDBT_WRONGARGS);
// Allocated for the maximum thread count, not for tNoOfThreads.
195 pThreadData =
new ThreadNdb[NDB_MAXTHREADS];
197 ndbout << endl <<
"FLEXTT - Starting normal mode" << endl;
198 ndbout <<
"Perform TimesTen benchmark" << endl;
199 ndbout <<
" " << tNoOfThreads <<
" number of concurrent threads " << endl;
200 ndbout <<
" " << tNoOfParallelTrans;
201 ndbout <<
" number of parallel transaction per thread " << endl;
202 ndbout <<
" " << tNoOfTransactions <<
" transaction(s) per round " << endl;
203 ndbout <<
" " << tNoOfLoops <<
" iterations " << endl;
204 ndbout <<
" " <<
"Update Frequency is " << tUpdateFreq <<
"%" << endl;
205 ndbout <<
" " <<
"Load Factor is " << tLoadFactor <<
"%" << endl;
206 if (tLocal ==
true) {
207 ndbout <<
" " <<
"We only use Local Part = ";
208 ndbout << tLocalPart << endl;
210 if (tempTable ==
true) {
211 ndbout <<
" Tables are without logging " << endl;
213 ndbout <<
" Tables are with logging " << endl;
215 if (startTransGuess ==
true) {
216 ndbout <<
" Transactions are executed with hint provided" << endl;
218 ndbout <<
" Transactions are executed with round robin scheme" << endl;
// tSendForce: 0 = adaptive send, 1 = force send, anything else = adaptive
// algorithm disabled.
220 if (tSendForce == 0) {
221 ndbout <<
" No force send is used, adaptive algorithm used" << endl;
222 }
else if (tSendForce == 1) {
223 ndbout <<
" Force send used" << endl;
225 ndbout <<
" No force send is used, adaptive algorithm disabled" << endl;
233 NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
241 return NDBT_ProgramExit(NDBT_FAILED);
243 g_cluster_connection= &con;
245 Ndb * pNdb =
new Ndb(g_cluster_connection,
"TEST_DB");
249 ndbout <<
" NdbAPI node with id = " << pNdb->
getNodeId() << endl;
252 ndbout <<
"Waiting for ndb to become ready..." <<endl;
254 ndbout <<
"NDB is not ready" << endl;
255 ndbout <<
"Benchmark failed!" << endl;
256 returnValue = NDBT_FAILED;
259 if(returnValue == NDBT_OK){
260 if (createTables(pNdb) != 0){
261 returnValue = NDBT_FAILED;
265 if(returnValue == NDBT_OK){
// One worker per configured thread. The address of the thread's ThreadNdb
// entry is passed as the thread argument through a void** cast; presumably
// threadLoop() casts it back to ThreadNdb* (not visible here).
270 for (
int i = 0;
i < (int)tNoOfThreads ;
i++) {
271 pThreadData[
i].threadNo =
i;
272 threadLife[
i] = NdbThread_Create(threadLoop,
273 (
void**)&pThreadData[
i],
276 NDB_THREAD_PRIO_LOW);
278 ndbout << endl <<
"All NDB objects and table created" << endl << endl;
// NOTE(review): the product of three Uint32 values and one int is stored in
// an int; large -p/-o/-t/-l settings can exceed INT_MAX and corrupt the
// transaction count given to PRINT_TIMER.
279 int noOfTransacts = tNoOfParallelTrans * tNoOfTransactions *
280 tNoOfThreads * tNoOfLoops;
288 if (tInsert ==
true) {
294 PRINT_TIMER(
"insert", noOfTransacts, 1);
300 if (tReadUpdate ==
true) {
304 PRINT_TIMER(
"update + read", noOfTransacts, 1);
310 if (tDelete ==
true) {
315 PRINT_TIMER(
"delete", noOfTransacts, 1);
317 ndbout <<
"--------------------------------------------------" << endl;
// Join every worker and release its handle before freeing the thread data.
321 for(
int i = 0;
i<(int)tNoOfThreads;
i++){
322 NdbThread_WaitFor(threadLife[
i], &tmp);
323 NdbThread_Destroy(&threadLife[
i]);
326 delete [] pThreadData;
332 return NDBT_ProgramExit(returnValue);
336 static void execute(StartType aType)
// Worker thread body; presumably threadLoop(), whose header is not visible
// here. It creates the thread's own Ndb object, initialises the per-thread
// and per-transaction records, then serves commands posted in
// ThreadStart[loc_threadNo].
348 int loc_threadNo = tabThread->threadNo;
// One TransNdb record per parallel transaction.
// NOTE(review): no check of the malloc() result is visible in this excerpt.
350 void * mem = malloc(
sizeof(
TransNdb)*tNoOfParallelTrans);
353 localNdb =
new Ndb(g_cluster_connection,
"TEST_DB");
354 localNdb->
init(1024);
// Key stride: 1 when tLocal is false; the MAX_SEEK assignment below is
// presumably the else branch (the branch line itself is not visible).
357 if (tLocal ==
false) {
358 tabThread->threadIncrement = 1;
360 tabThread->threadIncrement = MAX_SEEK;
// threadBase combines the thread number (shifted left by 16) with tNodeId.
362 tabThread->threadBase = (loc_threadNo << 16) + tNodeId;
363 tabThread->threadNdb = localNdb;
// threadStop = parallel transactions * transactions per round * stride; the
// completion callback compares threadNextStart against it.
364 tabThread->threadStop = tNoOfParallelTrans * tNoOfTransactions;
365 tabThread->threadStop *= tabThread->threadIncrement;
366 tabThread->threadLoopStop = tNoOfLoops;
368 for (i = 0; i < tNoOfParallelTrans; i++) {
369 pTransData[
i].transNdb = localNdb;
370 pTransData[
i].transThread = tabThread;
371 pTransData[
i].transOperation = NULL;
372 pTransData[
i].transStartType = stIdle;
373 pTransData[
i].vpn_number = tabThread->threadBase;
374 pTransData[
i].vpn_identity = 0;
375 pTransData[
i].transErrorCount = 0;
// Fill the 128-byte record buffer with 0x30 (ASCII '0').
376 for (j = 0; j < 128; j++) {
377 pTransData[
i].transRecord[j] = 0x30;
// Wait in 10 ms steps until a command other than stIdle is posted.
382 while (ThreadStart[loc_threadNo] == stIdle) {
383 NdbSleep_MilliSleep(10);
387 if (ThreadStart[loc_threadNo] == stStop) {
// Copy the command into the thread record, reset the per-run counters and
// put the command slot back to stIdle before executing.
391 tabThread->threadStartType = ThreadStart[loc_threadNo];
392 tabThread->threadLoopCounter = 0;
393 tabThread->threadCompleted =
false;
394 tabThread->threadNoCompleted = 0;
395 tabThread->threadNextStart = 0;
397 ThreadStart[loc_threadNo] = stIdle;
398 if(!executeThread(tabThread, pTransData)){
401 ThreadReady[loc_threadNo] = 1;
406 ThreadReady[loc_threadNo] = 1;
// Body fragment, presumably of executeThread() (header not visible). It
// starts one transaction per parallel slot, giving slot i the identity
// i * threadIncrement, records the next identity to hand out, and then keeps
// calling sendPollNdb() until threadCompleted becomes true.
415 for (i = 0; i < tNoOfParallelTrans; i++) {
416 TransNdb* transNdbPtr = &atransDataArrayPtr[
i];
417 transNdbPtr->vpn_identity = i * tabThread->threadIncrement;
418 transNdbPtr->transStartType = tabThread->threadStartType;
419 if (executeTransaction(transNdbPtr) ==
false) {
423 tabThread->threadNextStart = tNoOfParallelTrans * tabThread->threadIncrement;
// sendPollNdb arguments: 3000, then tMinEvents and tSendForce.
425 tabThread->threadNdb->
sendPollNdb(3000, tMinEvents, tSendForce);
426 }
while (tabThread->threadCompleted ==
false);
// Starts one transaction for the record described by transNdbRef and defines
// its operation through defineOperation(). The start key is the record's
// vpn_identity, remapped through getKey() when tLocal is set. Statements
// between the visible lines (transaction start, prepare, returns) are missing
// from this excerpt.
431 bool executeTransaction(
TransNdb* transNdbRef)
434 ThreadNdb* tabThread = transNdbRef->transThread;
435 Ndb* aNdbObject = transNdbRef->transNdb;
436 Uint32 threadBase = tabThread->threadBase;
437 Uint32 startKey = transNdbRef->vpn_identity;
438 if (tLocal ==
true) {
439 startKey = getKey(startKey, threadBase);
// With the hint enabled a key buffer (tKey) is prepared; presumably it is
// passed when the transaction is started (the call is not visible).
441 if (startTransGuess ==
true) {
444 tKey[1] = threadBase;
446 (
const char*)&tKey[0],
451 if (MyTrans == NULL) {
453 ndbout << endl <<
"Unable to recover! Quiting now" << endl ;
459 if (!defineOperation(MyTrans, transNdbRef, startKey, threadBase))
// Looks through the candidate keys aBase .. aBase + MAX_SEEK - 1 for one
// whose hash falls in partition tLocalPart. The partition number is
// (md5_hash(...) >> 6) masked with MAX_PARTS - 1. Tfound starts as aBase;
// presumably it is updated on a match and returned (not visible here).
468 getKey(Uint32 aBase, Uint32 aThreadBase) {
469 Uint32 Tfound = aBase;
475 tKey32[0] = aThreadBase;
476 for (Uint32 i = aBase; i < (aBase + MAX_SEEK); i++) {
478 hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
479 hash = (hash >> 6) & (MAX_PARTS - 1);
480 if (hash == (Uint32)tLocalPart) {
// Completion callback for one transaction. Visible behaviour: on the error
// path the NdbError is classified through flexTTErrorData->handleErrorCommon()
// and transErrorCount is incremented; otherwise the record either gets the
// next identity and is restarted with executeTransaction(), or is counted as
// completed. Branch and return lines between the visible statements are
// missing, so the exact conditions joining these steps must be confirmed
// against the full file.
489 executeCallback(
int result,
NdbConnection* NdbObject,
void* aObject)
492 ThreadNdb* tabThread = transNdbRef->transThread;
493 Ndb* tNdb = transNdbRef->transNdb;
494 Uint32 vpn_id = transNdbRef->vpn_identity;
495 Uint32 vpn_nb = tabThread->threadBase;
499 int retCode = flexTTErrorData->handleErrorCommon(NdbObject->
getNdbError());
506 }
else if (retCode == 2) {
507 ndbout <<
"4115 should not happen in flexTT" << endl;
508 }
else if (retCode == 3) {
512 transNdbRef->transErrorCount++;
517 ndbout <<
"Error with vpn_id = " << vpn_id <<
" and vpn_nb = ";
518 ndbout << vpn_nb << endl;
519 ndbout << err << endl;
522 NdbSleep_MilliSleep(10);
// Tested: more than 10 errors on this record, or some record of the thread
// already counted as completed.
530 if ((transNdbRef->transErrorCount > 10) ||
531 (tabThread->threadNoCompleted > 0)) {
// While no record has completed: reset the error count and take the next
// identity from threadNextStart. When that reaches threadStop the loop
// counter is incremented and the identities start again from 0.
535 if (tabThread->threadNoCompleted == 0) {
536 transNdbRef->transErrorCount = 0;
537 transNdbRef->vpn_identity = tabThread->threadNextStart;
538 if (tabThread->threadNextStart == tabThread->threadStop) {
539 tabThread->threadLoopCounter++;
540 transNdbRef->vpn_identity = 0;
541 tabThread->threadNextStart = 0;
542 if (tabThread->threadLoopCounter == (Uint32)tNoOfLoops) {
546 tabThread->threadNextStart += tabThread->threadIncrement;
552 executeTransaction(transNdbRef);
// The thread is marked completed once all tNoOfParallelTrans records have
// been counted.
557 tabThread->threadNoCompleted++;
558 if (tabThread->threadNoCompleted == tNoOfParallelTrans) {
559 tabThread->threadCompleted =
true;
571 UintR random_number = lrand48() % 100;
572 if ((
int)random_number < tUpdateFreq)
// Tail of the parameter list and body fragments, presumably of
// defineOperation() (the name is taken from the call in executeTransaction();
// the header line is not visible). The operation kind is chosen from the
// record's transStartType and the theWriteFlag/theDirtyFlag/theSimpleFlag
// settings; the calls made in each branch are not visible here.
580 unsigned int vpn_id,
unsigned int vpn_nb)
583 StartType TType = transNdbRef->transStartType;
589 if (localNdbOperation == NULL) {
595 if (theWriteFlag == 1 && theDirtyFlag == 1) {
597 }
else if (theWriteFlag == 1) {
// The type is re-drawn with random_choice() on this path.
604 TType = random_choice();
605 if (TType == stRead) {
606 if (theSimpleFlag == 1) {
608 }
else if (theDirtyFlag == 1) {
614 if (theWriteFlag == 1 && theDirtyFlag == 1) {
616 }
else if (theWriteFlag == 1) {
618 }
else if (theDirtyFlag == 1) {
// Attributes 0 and 1 are matched with equal() against vpn_id and vpn_nb.
631 localNdbOperation->
equal((Uint32)0,vpn_id);
632 localNdbOperation->
equal((Uint32)1,vpn_nb);
633 char* attrValue = &transNdbRef->transRecord[0];
// Attributes 2..4 are all written from the record's transRecord buffer.
636 localNdbOperation->
setValue((Uint32)2, attrValue);
637 localNdbOperation->
setValue((Uint32)3, attrValue);
638 localNdbOperation->
setValue((Uint32)4, attrValue);
641 localNdbOperation->
setValue((Uint32)3, attrValue);
// NOTE(review): all three getValue() calls receive the same buffer, so the
// fetched attribute values share (and overwrite) one destination. Presumably
// acceptable because the values are never inspected - confirm.
644 localNdbOperation->
getValue((Uint32)2, attrValue);
645 localNdbOperation->
getValue((Uint32)3, attrValue);
646 localNdbOperation->
getValue((Uint32)4, attrValue);
659 static void setAttrNames()
669 static void setTableNames()
// Creates the benchmark table through an NdbSchemaCon schema transaction.
// The schema-definition statements between the visible lines are missing
// from this excerpt.
676 createTables(
Ndb* pMyNdb){
// The table is created only when theTableCreateFlag is 0.
682 if (theTableCreateFlag == 0) {
683 ndbout <<
"Creating Table: vpn_users " <<
"..." << endl;
684 MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
// NOTE(review): when MySchemaTransaction is NULL, the right-hand operand of
// && still evaluates MySchemaTransaction->getNdbError(), i.e. it dereferences
// the null pointer. The error should come from another source such as the
// Ndb object.
686 if(MySchemaTransaction == NULL &&
687 (!error_handler(MySchemaTransaction->
getNdbError())))
// Same pattern for each later step: the guarded branch is entered only when
// the step failed AND error_handler() returned false.
691 if(MySchemaOp == NULL &&
692 (!error_handler(MySchemaTransaction->
getNdbError())))
708 (!error_handler(MySchemaTransaction->
getNdbError())))
720 (!error_handler(MySchemaTransaction->
getNdbError())))
731 (!error_handler(MySchemaTransaction->
getNdbError())))
741 (!error_handler(MySchemaTransaction->
getNdbError())))
752 (!error_handler(MySchemaTransaction->
getNdbError())))
763 (!error_handler(MySchemaTransaction->
getNdbError())))
766 if (MySchemaTransaction->
execute() == -1 &&
767 (!error_handler(MySchemaTransaction->
getNdbError())))
770 NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
// Prints the given NdbError. On the visible path it announces a recovery
// attempt; the condition guarding that message and the returned values are
// not visible in this excerpt.
776 bool error_handler(
const NdbError& err){
777 ndbout << err << endl ;
782 ndbout << endl <<
"Attempting to recover and continue now..." << endl ;
// Overload taking an error text and a numeric code. Prints the text; for the
// codes 4008, 677, 891, 1221, 721 and 266 it announces a recovery attempt.
// The returned values are not visible in this excerpt.
790 bool error_handler(
const char* error_string,
int error_int) {
791 ndbout << error_string << endl ;
792 if ((4008 == error_int) ||
793 (677 == error_int) ||
794 (891 == error_int) ||
795 (1221 == error_int) ||
796 (721 == error_int) ||
797 (266 == error_int)) {
798 ndbout << endl <<
"Attempting to recover and continue now..." << endl ;
// Parses the command line into the file-scope settings. Options that take a
// value read argv[i+1] and convert it with atoi(), so non-numeric text
// becomes 0 and is rejected only where a lower bound is checked.
// NOTE(review): no check that argv[i+1] exists is visible in this excerpt -
// confirm against the loop bounds in the full file.
807 readArguments(
int argc,
const char** argv){
811 if (strcmp(argv[i],
"-t") == 0){
812 tNoOfThreads = atoi(argv[i+1]);
813 if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS)){
814 ndbout_c(
"Invalid no of threads");
817 }
else if (strcmp(argv[i],
"-p") == 0){
818 tNoOfParallelTrans = atoi(argv[i+1]);
819 if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
820 ndbout_c(
"Invalid no of parallell transactions");
823 }
else if (strcmp(argv[i],
"-o") == 0) {
824 tNoOfTransactions = atoi(argv[i+1]);
825 if (tNoOfTransactions < 1){
826 ndbout_c(
"Invalid no of transactions");
// NOTE(review): values below 1 are rejected here, although the usage text
// advertises "0=infinite" for -l.
829 }
else if (strcmp(argv[i],
"-l") == 0){
830 tNoOfLoops = atoi(argv[i+1]);
831 if (tNoOfLoops < 1) {
832 ndbout_c(
"Invalid no of loops");
// NOTE(review): the message printed for a bad -e value is "Invalid no of
// loops", apparently copied from the -l branch. The upper bound also uses
// whatever -p value has been parsed so far, so the result depends on the
// order of the options.
835 }
else if (strcmp(argv[i],
"-e") == 0){
836 tMinEvents = atoi(argv[i+1]);
837 if ((tMinEvents < 1) || (tMinEvents > tNoOfParallelTrans)) {
838 ndbout_c(
"Invalid no of loops");
// NOTE(review): getKey() masks the partition number with (MAX_PARTS - 1), so
// tLocalPart == MAX_PARTS passes this check but can never match. The upper
// bound looks like it should be >= MAX_PARTS.
841 }
else if (strcmp(argv[i],
"-local") == 0){
842 tLocalPart = atoi(argv[i+1]);
844 startTransGuess =
true;
845 if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
846 ndbout_c(
"Invalid local part");
849 }
else if (strcmp(argv[i],
"-ufreq") == 0){
850 tUpdateFreq = atoi(argv[i+1]);
851 if ((tUpdateFreq < 0) || (tUpdateFreq > 100)){
852 ndbout_c(
"Invalid Update Frequency");
855 }
else if (strcmp(argv[i],
"-load_factor") == 0){
856 tLoadFactor = atoi(argv[i+1]);
857 if ((tLoadFactor < 40) || (tLoadFactor >= 100)){
858 ndbout_c(
"Invalid LoadFactor");
// The bodies of the following flag options are not visible in this excerpt.
861 }
else if (strcmp(argv[i],
"-d") == 0){
865 }
else if (strcmp(argv[i],
"-i") == 0){
869 }
else if (strcmp(argv[i],
"-simple") == 0){
873 }
else if (strcmp(argv[i],
"-adaptive") == 0){
877 }
else if (strcmp(argv[i],
"-force") == 0){
881 }
else if (strcmp(argv[i],
"-non_adaptive") == 0){
885 }
else if (strcmp(argv[i],
"-write") == 0){
889 }
else if (strcmp(argv[i],
"-dirty") == 0){
// -table_create stores 0; createTables() tests for 0 before creating.
893 }
else if (strcmp(argv[i],
"-table_create") == 0){
894 theTableCreateFlag = 0;
898 }
else if (strcmp(argv[i],
"-temp") == 0){
902 }
else if (strcmp(argv[i],
"-no_hint") == 0){
903 startTransGuess =
false;
// Local mode with the hint disabled is reported as an invalid combination.
913 if (tLocal ==
true) {
914 if (startTransGuess ==
false) {
915 ndbout_c(
"Not valid to use no_hint with local");
// Usage text, presumably the body of input_error() (header not visible).
// NOTE(review): "-n" is listed here but no "-n" branch is visible in
// readArguments() in this excerpt. "-l" advertises "0=infinite" although
// readArguments() rejects values below 1.
926 ndbout_c(
" Perform benchmark of insert, update and delete transactions");
928 ndbout_c(
"Arguments:");
929 ndbout_c(
" -t Number of threads to start, default 1");
930 ndbout_c(
" -p Number of parallel transactions per thread, default 32");
931 ndbout_c(
" -o Number of transactions per loop, default 500");
932 ndbout_c(
" -ufreq Number Update Frequency in percent (0 -> 100), rest is read");
933 ndbout_c(
" -load_factor Number Fill level in index in percent (40 -> 99)");
934 ndbout_c(
" -l Number of loops to run, default 1, 0=infinite");
935 ndbout_c(
" -i Start by inserting all records");
936 ndbout_c(
" -d End by deleting all records (only one loop)");
937 ndbout_c(
" -simple Use simple read to read from database");
938 ndbout_c(
" -dirty Use dirty read to read from database");
939 ndbout_c(
" -write Use writeTuple in insert and update");
940 ndbout_c(
" -n Use standard table names");
941 ndbout_c(
" -table_create Create tables in db");
942 ndbout_c(
" -temp Create table(s) without logging");
943 ndbout_c(
" -no_hint Don't give hint on where to execute transaction coordinator");
944 ndbout_c(
" -adaptive Use adaptive send algorithm (default)");
945 ndbout_c(
" -force Force send when communicating");
946 ndbout_c(
" -non_adaptive Send at a 10 millisecond interval");
947 ndbout_c(
" -local Number of part, only use keys in one part out of 16");