65 #include <NdbTimer.hpp>
66 #include <NdbThread.h>
67 #include <NdbAutoPtr.hpp>
69 #include <NdbTest.hpp>
// Size limits enforced by readArguments() for the -s, -lkn and -lks options.
74 #define MAXATTRSIZE 1000
75 #define MAXNOLONGKEY 16 // Max number of long keys.
76 #define MAXLONGKEYTOTALSIZE 1023 // words = 4092 bytes
// Forward declarations. flexBenchThread is declared with C linkage; it is the
// routine handed to NdbThread_Create when the worker threads are started.
78 extern "C" {
static void* flexBenchThread(
void*); }
79 static int readArguments(
int argc,
const char** argv);
// createTables is overloaded: one variant issues SQL through a MYSQL handle,
// the other works on an Ndb object.
81 static int createTables(
MYSQL*);
82 static int dropTables(
MYSQL*);
84 static int createTables(
Ndb*);
85 static void sleepBeforeStartingTest(
int seconds);
86 static void input_error();
// NOTE(review): this line is presumably a member of the per-thread data
// structure whose surrounding declaration is not visible in this listing.
105 StartType threadStart;
// File-scope benchmark configuration. The initializers are the defaults;
// readArguments() overwrites them from the command line.
109 static int tNodeId = 0 ;
110 static char tableName[MAXTABLES][MAXSTRLEN+1];
111 static char attrName[MAXATTR][MAXSTRLEN+1];
// Table of long-key attribute names, allocated and filled in the main routine.
112 static char** longKeyAttrName;
115 static int tNoOfLoops = 1;
116 static int tAttributeSize = 1;
117 static unsigned int tNoOfThreads = 1;
118 static unsigned int tNoOfTables = 1;
119 static unsigned int tNoOfAttributes = 25;
120 static unsigned int tNoOfOperations = 500;
121 static unsigned int tSleepTime = 0;
122 static unsigned int tNoOfLongPK = 1;
123 static unsigned int tSizeOfLongPK = 1;
124 static unsigned int t_instances = 1;
// Option flags selecting the operation variants and table handling.
127 static int theSimpleFlag = 0;
128 static int theDirtyFlag = 0;
129 static int theWriteFlag = 0;
130 static int theStdTableNameFlag = 0;
131 static int theTableCreateFlag = 0;
132 static bool theTempTable =
false;
133 static bool VerifyFlag =
true;
134 static bool useLongKeys =
false;
135 static bool verbose =
false;
// NOTE(review): use_ndb is defined twice below (false, then true). The two
// definitions presumably sit in alternative preprocessor branches that are
// not visible in this listing - confirm against the full file.
137 static bool use_ndb =
false;
138 static int engine_id = 0;
// Socket numbers given with -socket; worker threads pick one by thread number.
139 static int sockets[16];
140 static int n_sockets = 0;
// Engine clause appended to CREATE TABLE, indexed by engine_id. Only one entry
// is visible in this listing.
141 static char* engine[] =
143 " ENGINE = NDBCLUSTER ",
149 static bool use_ndb =
true;
// Timing helpers that must be used together and in this order: START_TIMER
// opens a brace scope holding a started NdbTimer, STOP_TIMER stops it, and
// PRINT_TIMER prints the statistics and closes the scope again.
154 #define START_TIMER { NdbTimer timer; timer.doStart();
155 #define STOP_TIMER timer.doStop();
156 #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
161 #include <NdbMutex.h>
// State of the optional statistics reporting used by statReport().
// statHost and statPort identify the receiver, statFreq is the number of
// operations between reports, statSock is the socket descriptor and statState
// tracks whether the connection is closed, open or failed.
162 static bool statEnable =
false;
163 static char statHost[100];
164 static int statFreq = 100;
165 static int statPort = 0;
166 static int statSock = -1;
167 static enum { statError = -1, statClosed, statOpen } statState;
// Serializes access to the socket and state above.
168 static NdbMutex statMutex = NDB_MUTEX_INITIALIZER;
// Sends one text line of the form nodeid, operation name, operation count to
// the statistics receiver over a TCP socket.
//   st  - operation type being reported; it selects the text label (most of
//         that mapping is not visible in this listing).
//   ops - number of operations to report.
// All work is done while holding statMutex. The connection is opened on
// demand whenever statState is not statOpen. Each failure is logged only on
// the first occurrence (statState is then set to statError), the socket is
// closed where one exists, and the mutex is released.
178 statReport(
enum StartType st,
int ops)
182 if (NdbMutex_Lock(&statMutex) < 0) {
183 if (statState != statError) {
184 ndbout_c(
"stat: lock mutex failed: %s", strerror(errno));
185 statState = statError;
// Connect on demand. The node id comes from the NDB_NODEID environment
// variable and defaults to 0 when the variable is not set.
191 if (statState != statOpen) {
192 char *p = getenv(
"NDB_NODEID");
193 nodeid = p == 0 ? 0 : atoi(p);
194 if ((statSock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
195 if (statState != statError) {
196 ndbout_c(
"stat: create socket failed: %s", strerror(errno));
197 statState = statError;
199 (void)NdbMutex_Unlock(&statMutex);
// Resolve statHost and connect to statPort.
202 struct sockaddr_in saddr;
203 memset(&saddr, 0,
sizeof(saddr));
204 saddr.sin_family = AF_INET;
205 saddr.sin_port = htons(statPort);
206 if (Ndb_getInAddr(&saddr.sin_addr, statHost) < 0) {
207 if (statState != statError) {
208 ndbout_c(
"stat: host %s not found", statHost);
209 statState = statError;
211 (void)close(statSock);
212 (void)NdbMutex_Unlock(&statMutex);
215 if (connect(statSock, (
struct sockaddr *)&saddr,
sizeof(saddr)) < 0) {
216 if (statState != statError) {
217 ndbout_c(
"stat: connect failed: %s", strerror(errno));
218 statState = statError;
220 (void)close(statSock);
221 (void)NdbMutex_Unlock(&statMutex);
224 statState = statOpen;
225 ndbout_c(
"stat: connection to %s:%d opened", statHost, (
int)statPort);
245 text =
"verifydelete";
// Format the report line and send it; a short or failed send is treated as an
// error and the socket is closed.
252 sprintf(buf,
"%d %s %d\n", nodeid, text, ops);
253 int len = strlen(buf);
255 if (send(statSock, buf, len, 0) != len) {
256 if (statState != statError) {
257 ndbout_c(
"stat: write failed: %s", strerror(errno));
258 statState = statError;
260 (void)close(statSock);
261 (void)NdbMutex_Unlock(&statMutex);
264 (void)NdbMutex_Unlock(&statMutex);
// NOTE(review): the function header is not visible in this listing; this is
// presumably the body of resetThreads - confirm.
// Clears the ready flag and result of every thread slot and sets its command
// to stIdle.
270 for (
unsigned int i = 0;
i < tNoOfThreads;
i++){
271 pt[
i].threadReady = 0;
272 pt[
i].threadResult = 0;
273 pt[
i].threadStart = stIdle;
// NOTE(review): the function header is not visible in this listing; this is
// presumably the body of checkThreadResults - confirm.
// Scans all thread slots and logs every thread whose threadResult is nonzero.
// The value returned is not visible here.
279 for (
unsigned int i = 0;
i < tNoOfThreads;
i++){
280 if(pt[
i].threadResult != 0){
281 ndbout_c(
"Thread%d reported fatal error %d",
i, pt[
i].threadResult);
// NOTE(review): the function header and the enclosing loop are not visible in
// this listing; this is presumably the body of waitForThreads - confirm.
// Sleeps 100 ms and then checks every thread slot for a threadReady flag that
// is still 0.
294 NdbSleep_MilliSleep(100);
296 for (
unsigned int i = 0;
i < tNoOfThreads;
i++){
297 if (pt[
i].threadReady == 0)
// NOTE(review): the function header is not visible in this listing; this is
// presumably the body of tellThreads - confirm.
// Publishes the command held in what to every thread slot by writing it to
// threadStart.
306 for (
unsigned int i = 0;
i < tNoOfThreads;
i++)
307 pt[
i].threadStart = what;
// Program entry point, declared through the NDB_COMMAND macro.
// Flow visible in this listing:
//   1. parse the arguments (exit with NDBT_WRONGARGS on failure),
//   2. build the table of long-key attribute names,
//   3. print the configuration,
//   4. create the tables, either through a MySQL connection or through an Ndb
//      object borrowed from the pool,
//   5. start tNoOfThreads worker threads running flexBenchThread,
//   6. per loop, drive the threads through insert, verify, read, update,
//      verify, read, delete and verify-delete phases, timing the
//      insert/read/update/delete phases,
//   7. stop and join the threads and exit with returnValue.
310 NDB_COMMAND(flexBench,
"flexBench",
"flexBench",
"flexbench", 65535)
315 int returnValue = NDBT_OK;
316 if (readArguments(argc, argv) != 0){
318 return NDBT_ProgramExit(NDBT_WRONGARGS);
// A single allocation of e1 + e2 bytes backs both the char* table (e1 bytes)
// and the name buffers (e2_1 bytes per name). Presumably the pointer table
// comes first and the name buffers follow; the pointer advance is not visible
// in this listing.
// NOTE(review): e2_1 is 9 bytes, but the format KEYATTR%i needs 10 bytes
// including the terminator once i reaches two digits, and -lkn accepts up to
// MAXNOLONGKEY (16) keys - confirm the buffer size.
322 int e1 =
sizeof(
char*) * tNoOfLongPK;
323 int e2_1 = strlen(
"KEYATTR ") + 1;
324 int e2 = e2_1 * tNoOfLongPK;
325 char *tmp = (
char *) malloc(e1 + e2);
327 longKeyAttrName = (
char **) tmp;
329 for (Uint32
i = 0;
i < tNoOfLongPK;
i++) {
331 longKeyAttrName[
i] = tmp;
333 memset(longKeyAttrName[
i], 0, e2_1);
334 sprintf(longKeyAttrName[
i],
"KEYATTR%i", i);
// One ThreadData slot per worker thread.
339 p12( pThreadsData =
new ThreadData[tNoOfThreads] );
// Print the effective configuration.
342 ndbout << endl <<
"FLEXBENCH - Starting normal mode" << endl;
343 ndbout <<
"Perform benchmark of insert, update and delete transactions"<< endl;
344 ndbout <<
" " << tNoOfThreads <<
" thread(s) " << endl;
345 ndbout <<
" " << tNoOfLoops <<
" iterations " << endl;
346 ndbout <<
" " << tNoOfTables <<
" table(s) and " << 1 <<
" operation(s) per transaction " <<endl;
347 ndbout <<
" " << tNoOfAttributes <<
" attributes per table " << endl;
348 ndbout <<
" " << tNoOfOperations <<
" transaction(s) per thread and round " << endl;
349 ndbout <<
" " << tAttributeSize <<
" is the number of 32 bit words per attribute "<< endl;
350 ndbout <<
" " <<
"Table(s) without logging: " << (Uint32)theTempTable << endl;
// tSizeOfLongPK counts words, hence the factor 4 when printing bytes.
353 ndbout <<
" " <<
"Using long keys with " << tNoOfLongPK <<
" keys a' " <<
354 tSizeOfLongPK * 4 <<
" bytes each." << endl;
356 ndbout <<
" " <<
"Verification is " ;
358 ndbout <<
"enabled" << endl ;
360 ndbout <<
"disabled" << endl ;
363 ndbout <<
"Use NDB API with NdbPool in this test case" << endl;
364 ndbout <<
"Pool size = " << t_instances << endl;
366 ndbout <<
"Use mysql client with " << engine[engine_id];
367 ndbout <<
" as engine" << endl;
371 NdbThread_SetConcurrencyLevel(tNoOfThreads + 2);
// MySQL client path: require a thread-safe client library, connect through
// the first configured socket and create the tables.
376 if ( mysql_thread_safe() == 0 ) {
377 ndbout <<
"Not thread safe mysql library..." << endl;
378 return NDBT_ProgramExit(NDBT_FAILED);
381 ndbout <<
"Connecting to MySQL..." <<endl;
385 int the_socket = sockets[0];
386 char the_socket_name[1024];
387 sprintf(the_socket_name,
"%s%u%s",
"/tmp/mysql.",the_socket,
".sock");
389 ndbout << the_socket_name << endl;
390 if ( mysql_real_connect(&mysql,
398 ndbout <<
"Connect failed" <<endl;
399 returnValue = NDBT_FAILED;
403 if(returnValue == NDBT_OK){
404 mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
405 if (createTables(&mysql) != 0){
406 returnValue = NDBT_FAILED;
// NDB API path: borrow an Ndb object from the pool, create the tables with it
// and hand the object back.
414 ndbout <<
"Creation of the NdbPool failed" << endl;
415 returnValue = NDBT_FAILED;
417 Ndb* pNdb = get_ndb_object(ndb_id,
"test",
"def");
419 ndbout <<
"Failed to get a NDB object" << endl;
420 returnValue = NDBT_FAILED;
423 ndbout <<
" NdbAPI node with id = " << tNodeId << endl;
426 ndbout <<
"Waiting for ndb to become ready..." <<endl;
428 ndbout <<
"NDB is not ready" << endl;
429 ndbout <<
"Benchmark failed!" << endl;
430 returnValue = NDBT_FAILED;
432 if(returnValue == NDBT_OK){
433 if (createTables(pNdb) != 0){
434 returnValue = NDBT_FAILED;
437 return_ndb_object(pNdb, ndb_id);
441 if(returnValue == NDBT_OK){
443 sleepBeforeStartingTest(tSleepTime);
// Start the worker threads. Each thread receives a pointer to its own
// ThreadData slot; the main thread then waits until all report ready.
448 resetThreads(pThreadsData);
450 for (
unsigned int i = 0;
i < tNoOfThreads;
i++){
451 pThreadsData[
i].threadNo =
i;
452 pThreadsData[
i].threadLife = NdbThread_Create(flexBenchThread,
453 (
void**)&pThreadsData[
i],
456 NDB_THREAD_PRIO_LOW);
459 waitForThreads(pThreadsData);
461 ndbout << endl <<
"All threads started" << endl << endl;
469 int loopCount = tLoops + 1;
470 ndbout << endl <<
"Loop # " << loopCount << endl << endl;
// Every phase below uses the same handshake: reset the thread flags, publish
// the command, wait for all threads to report ready, then check the results.
// Insert phase.
478 resetThreads(pThreadsData);
479 tellThreads(pThreadsData, stInsert);
480 waitForThreads(pThreadsData);
481 if (checkThreadResults(pThreadsData) != 0){
482 ndbout <<
"Error: Threads failed in performing insert" << endl;
483 returnValue = NDBT_FAILED;
488 PRINT_TIMER(
"insert", tNoOfOperations*tNoOfThreads, tNoOfTables);
// Verify the inserted rows.
493 resetThreads(pThreadsData);
494 ndbout <<
"Verifying inserts...\t" ;
495 tellThreads(pThreadsData, stVerify);
496 waitForThreads(pThreadsData);
497 if (checkThreadResults(pThreadsData) != 0){
498 ndbout <<
"Error: Threads failed while verifying inserts" << endl;
499 returnValue = NDBT_FAILED;
502 ndbout <<
"\t\tOK" << endl << endl ;
// First read phase.
512 resetThreads(pThreadsData);
513 tellThreads(pThreadsData, stRead);
514 waitForThreads(pThreadsData);
515 if (checkThreadResults(pThreadsData) != 0){
516 ndbout <<
"Error: Threads failed in performing read" << endl;
517 returnValue = NDBT_FAILED;
522 PRINT_TIMER(
"read", tNoOfOperations*tNoOfThreads, tNoOfTables);
// Update phase.
530 resetThreads(pThreadsData);
531 tellThreads(pThreadsData, stUpdate);
532 waitForThreads(pThreadsData);
533 if (checkThreadResults(pThreadsData) != 0){
534 ndbout <<
"Error: Threads failed in performing update" << endl;
535 returnValue = NDBT_FAILED;
540 PRINT_TIMER(
"update", tNoOfOperations*tNoOfThreads, tNoOfTables);
// Verify the updated rows.
546 resetThreads(pThreadsData);
547 ndbout <<
"Verifying updates...\t" ;
548 tellThreads(pThreadsData, stVerify);
549 waitForThreads(pThreadsData);
550 if (checkThreadResults(pThreadsData) != 0){
551 ndbout <<
"Error: Threads failed while verifying updates" << endl;
552 returnValue = NDBT_FAILED;
555 ndbout <<
"\t\tOK" << endl << endl ;
// Second read phase.
565 resetThreads(pThreadsData);
566 tellThreads(pThreadsData, stRead);
567 waitForThreads(pThreadsData);
568 if (checkThreadResults(pThreadsData) != 0){
569 ndbout <<
"Error: Threads failed in performing read" << endl;
570 returnValue = NDBT_FAILED;
575 PRINT_TIMER(
"read", tNoOfOperations*tNoOfThreads, tNoOfTables);
// Delete phase.
583 resetThreads(pThreadsData);
584 tellThreads(pThreadsData, stDelete);
585 waitForThreads(pThreadsData);
586 if (checkThreadResults(pThreadsData) != 0){
587 ndbout <<
"Error: Threads failed in performing delete" << endl;
588 returnValue = NDBT_FAILED;
593 PRINT_TIMER(
"delete", tNoOfOperations*tNoOfThreads, tNoOfTables);
// Verify that the rows are gone.
599 resetThreads(pThreadsData);
600 ndbout <<
"Verifying tuple deletion..." ;
601 tellThreads(pThreadsData, stVerifyDelete);
602 waitForThreads(pThreadsData);
603 if (checkThreadResults(pThreadsData) != 0){
604 ndbout <<
"Error: Threads failed in verifying deletes" << endl;
605 returnValue = NDBT_FAILED;
608 ndbout <<
"\t\tOK" << endl << endl ;
612 ndbout <<
"--------------------------------------------------" << endl;
// Loop-limit test. With tNoOfLoops == 0 the condition is never true, which
// matches the documented meaning of 0 as an infinite run. The action taken
// when the condition holds is not visible in this listing.
616 if ( 0 != tNoOfLoops && tNoOfLoops <= tLoops )
// Shutdown: tell the threads to stop, then join and destroy each handle.
621 resetThreads(pThreadsData);
622 tellThreads(pThreadsData, stStop);
623 waitForThreads(pThreadsData);
626 for(Uint32
i = 0;
i<tNoOfThreads;
i++){
627 NdbThread_WaitFor(pThreadsData[
i].threadLife, &tmp);
628 NdbThread_Destroy(&pThreadsData[
i].threadLife);
641 return NDBT_ProgramExit(returnValue);
// Computes a hash over len iterations, starting from the seed 147.
//   hash_key - input key words (presumably h_key holds hash_key[i]; that
//              assignment is not visible in this listing).
//   len      - number of loop iterations.
// Each iteration folds the four low bytes of h_key into the hash, lowest byte
// first. Every step computes hash_value * 33 + byte, written as a shift by 5
// plus one addition.
646 unsigned long get_hash(
unsigned long * hash_key,
int len)
648 unsigned long hash_value = 147;
651 for (i = 0; i < len; i++)
654 hash_value = (hash_value << 5) + hash_value + (h_key & 255);
655 hash_value = (hash_value << 5) + hash_value + ((h_key >> 8) & 255);
656 hash_value = (hash_value << 5) + hash_value + ((h_key >> 16) & 255);
657 hash_value = (hash_value << 5) + hash_value + ((h_key >> 24) & 255);
// Worker thread body.
//   pArg - pointer passed from NdbThread_Create; presumably cast to the
//          ThreadData slot referenced below as pThreadData (the cast is not
//          visible in this listing).
// The thread prepares its data buffers and, on the MySQL path, its connection
// and prepared statements. It then waits for commands published in
// threadStart and executes each command for every operation index against all
// tables, reporting completion through threadResult and threadReady.
666 static void* flexBenchThread(
void* pArg)
669 unsigned int threadNo, threadBase;
675 StartType tSaveType ;
677 int* attrValue = NULL ;
678 int* attrRefValue = NULL ;
680 int loopCountOps, loopCountTables, loopCountAttributes;
682 int tRetryAttempts = 20;
684 int tSpecialTrans = 0;
685 int nRefLocalOpOffset = 0 ;
687 tNoOfTables * tNoOfAttributes *
sizeof(int) * tAttributeSize ;
689 tNoOfOperations * tNoOfAttributes *
sizeof(int) * tAttributeSize ;
690 unsigned*** longKeyAttrValue = NULL;
693 threadNo = pThreadData->threadNo ;
// Sockets are handed out round-robin by thread number.
697 int the_socket = sockets[threadNo % n_sockets];
698 char the_socket_name[1024];
700 sprintf(the_socket_name,
"%s%u%s",
"/tmp/mysql.",the_socket,
".sock");
702 ndbout << the_socket_name << endl;
703 ndbout <<
"Thread connecting to MySQL... " << endl;
706 if ( mysql_real_connect(&mysql,
714 ndbout <<
"failed" << endl;
718 ndbout <<
"ok" << endl;
// Autocommit is switched either off or on; the condition that selects between
// the two calls is not visible in this listing.
722 r = mysql_autocommit(&mysql, 0);
724 r = mysql_autocommit(&mysql, 1);
727 ndbout <<
"autocommit on/off failed" << endl;
// Give up if a buffer allocation failed; pOps is only required on the NDB path.
740 if( !attrValue || !attrRefValue ||
741 ( use_ndb && ( !pOps) ) ){
743 ndbout <<
"One or more memory allocations failed when starting thread #";
744 ndbout << threadNo << endl ;
745 ndbout <<
"Thread #" << threadNo <<
" will now exit" << endl ;
751 pNdb = get_ndb_object(ndb_id,
"test",
"def");
753 ndbout <<
"Failed to get an NDB object" << endl;
754 ndbout <<
"Thread #" << threadNo <<
" will now exit" << endl ;
759 return_ndb_object(pNdb, ndb_id);
// Base of the key values used by this thread, derived from the thread number
// and the API node id (presumably so that threads and nodes use disjoint key
// ranges - confirm).
765 threadBase = (threadNo * 2000000) + (tNodeId * 260000000);
// Long-key storage lives in one allocation of e1 + e2 + e3 bytes:
//   e1 - one row pointer per operation,
//   e2 - one key pointer per long key and operation,
//   e3 - the key words themselves.
770 int e1 =
sizeof(
unsigned**) * tNoOfOperations;
771 int e2 =
sizeof(
unsigned*) * tNoOfLongPK * tNoOfOperations;
772 int e3 =
sizeof(unsigned) * tSizeOfLongPK * tNoOfLongPK * tNoOfOperations;
774 p22.reset(tmp = (
char*)malloc(e1+e2+e3));
776 longKeyAttrValue = (
unsigned ***) tmp;
// First pass: point every operation row at its block of key pointers.
778 for (Uint32
n = 0;
n < tNoOfOperations;
n++) {
779 longKeyAttrValue[
n] = (
unsigned **) tmp;
780 tmp +=
sizeof(
unsigned*) * tNoOfLongPK;
// Second pass: point every key at its word buffer, clear it and fill each
// word with threadBase + n.
783 for (Uint32
n = 0;
n < tNoOfOperations;
n++){
784 for (Uint32 i = 0; i < tNoOfLongPK ; i++) {
785 longKeyAttrValue[
n][
i] = (
unsigned *) tmp;
786 tmp +=
sizeof(unsigned) * tSizeOfLongPK;
787 memset(longKeyAttrValue[
n][i], 0,
sizeof(
unsigned) * tSizeOfLongPK);
788 for(Uint32 j = 0; j < tSizeOfLongPK; j++) {
790 longKeyAttrValue[
n][
i][j] = threadBase +
n;
// Reference values: attribute a of operation ops is threadBase + ops + a and
// is stored in the slot for index ops - 1. ops starts at 1, so
// tNoOfOperations - 1 operations are prepared.
796 int nRefOpOffset = 0 ;
798 for(Uint32 ops = 1 ; ops < tNoOfOperations ; ops++){
800 nRefOpOffset = tAttributeSize*tNoOfAttributes*(ops-1) ;
801 for(Uint32 a = 0 ; a < tNoOfAttributes ; a++){
802 *(
int*)&attrRefValue[nRefOpOffset + tAttributeSize*a] =
803 (
int)(threadBase + ops + a) ;
// MySQL path: mysql_data is the shared buffer that all statement bindings
// below point into. It is the first e1 bytes of one larger allocation.
823 int* mysql_data = NULL;
830 int e1 =
sizeof(int)*tAttributeSize*tNoOfAttributes;
839 p21.reset(tmp = (
char*)malloc(e1+e2+e3+e4+e5+e6+e7+e8+e9));
841 mysql_data = (
int*)tmp; tmp += e1;
// Four binding tables are filled in turn. Which bind array each loop writes
// is not visible in this listing; presumably insert, update, read and delete
// in that order - confirm.
851 for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
853 bi.buffer_type = MYSQL_TYPE_LONG;
854 bi.buffer = (
char*)&mysql_data[ca*tAttributeSize];
855 bi.buffer_length = 0;
// Shifted binding: position ca uses attribute ca + 1 and the last position
// wraps around to mysql_data[0]. This matches the UPDATE text built below,
// whose key placeholder comes last in the WHERE clause.
860 for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
862 bi.buffer_type = MYSQL_TYPE_LONG;
863 if ( ca == tNoOfAttributes-1 )
864 bi.buffer = (
char*)&mysql_data[0];
866 bi.buffer = (
char*)&mysql_data[(ca+1)*tAttributeSize];
867 bi.buffer_length = 0;
872 for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
874 bi.buffer_type = MYSQL_TYPE_LONG;
875 bi.buffer = (
char*)&mysql_data[ca*tAttributeSize];
876 bi.buffer_length = 4;
// Only a single binding (index 0) is set up here.
881 for (Uint32 ca = 0; ca < 1; ca++){
883 bi.buffer_type = MYSQL_TYPE_LONG;
884 bi.buffer = (
char*)&mysql_data[ca*tAttributeSize];
885 bi.buffer_length = 0;
// Prepare one insert statement per table with one placeholder per attribute.
890 for (Uint32 i = 0; i < tNoOfTables; i++) {
892 pos += sprintf(buf+pos,
"%s%s%s",
896 pos += sprintf(buf+pos,
"%s",
"?");
897 for (Uint32 j = 1; j < tNoOfAttributes; j++) {
898 pos += sprintf(buf+pos,
"%s",
",?");
900 pos += sprintf(buf+pos,
"%s",
")");
902 ndbout << buf << endl;
903 prep_insert[
i] = mysql_prepare(&mysql, buf, pos);
904 if (prep_insert[i] == 0) {
905 ndbout <<
"mysql_prepare: " << mysql_error(&mysql) << endl;
908 if (mysql_bind_param(prep_insert[i], bind_insert)) {
909 ndbout <<
"mysql_bind_param: " << mysql_error(&mysql) << endl;
// Prepare one update statement per table: every non-key attribute gets a
// placeholder and the key attribute attrName[0] is matched in the WHERE clause.
914 for (Uint32 i = 0; i < tNoOfTables; i++) {
916 pos += sprintf(buf+pos,
"%s%s%s",
920 for (Uint32 j = 1; j < tNoOfAttributes; j++) {
922 pos += sprintf(buf+pos,
"%s",
",");
923 pos += sprintf(buf+pos,
"%s%s", attrName[j],
"=?");
925 pos += sprintf(buf+pos,
"%s%s%s",
" WHERE ", attrName[0],
"=?");
928 ndbout << buf << endl;
929 prep_update[
i] = mysql_prepare(&mysql, buf, pos);
930 if (prep_update[i] == 0) {
931 ndbout <<
"mysql_prepare: " << mysql_error(&mysql) << endl;
934 if (mysql_bind_param(prep_update[i], bind_update)) {
935 ndbout <<
"mysql_bind_param: " << mysql_error(&mysql) << endl;
// Prepare one select statement per table listing the non-key attributes.
940 for (Uint32 i = 0; i < tNoOfTables; i++) {
942 pos += sprintf(buf+pos,
"%s",
"SELECT ");
943 for (Uint32 j = 1; j < tNoOfAttributes; j++) {
945 pos += sprintf(buf+pos,
"%s",
",");
946 pos += sprintf(buf+pos,
"%s", attrName[j]);
948 pos += sprintf(buf+pos,
"%s%s%s%s%s",
955 ndbout << buf << endl;
956 prep_read[
i] = mysql_prepare(&mysql, buf, pos);
957 if (prep_read[i] == 0) {
958 ndbout <<
"mysql_prepare: " << mysql_error(&mysql) << endl;
961 if (mysql_bind_param(prep_read[i], bind_read)) {
962 ndbout <<
"mysql_bind_param: " << mysql_error(&mysql) << endl;
// Result columns are bound starting at the second entry of bind_read; the
// first entry serves as the statement parameter.
965 if (mysql_bind_result(prep_read[i], &bind_read[1])) {
966 ndbout <<
"mysql_bind_result: " << mysql_error(&mysql) << endl;
// Prepare the statement stored in prep_delete for each table (its SQL text is
// not visible in this listing).
971 for (Uint32 i = 0; i < tNoOfTables; i++) {
973 pos += sprintf(buf+pos,
"%s%s%s%s%s",
980 ndbout << buf << endl;
981 prep_delete[
i] = mysql_prepare(&mysql, buf, pos);
982 if (prep_delete[i] == 0) {
983 ndbout <<
"mysql_prepare: " << mysql_error(&mysql) << endl;
986 if (mysql_bind_param(prep_delete[i], bind_delete)) {
987 ndbout <<
"mysql_bind_param: " << mysql_error(&mysql) << endl;
// Command handshake with the main thread: publish the result, raise the ready
// flag, then poll every 100 ms until a command other than stIdle arrives.
// stStop is acknowledged by raising the ready flag again. Any other command
// is copied to tType and the slot is reset to stIdle.
995 pThreadData->threadResult = tResult;
997 pThreadData->threadReady = 1;
999 while (pThreadData->threadStart == stIdle){
1000 NdbSleep_MilliSleep(100);
1004 if (pThreadData->threadStart == stStop){
1005 pThreadData->threadReady = 1;
1013 tType = pThreadData->threadStart;
1015 pThreadData->threadStart = stIdle;
// Main work loop. count starts at 1, so operation indices 1 through
// tNoOfOperations - 1 are processed; the loop ends early once tResult is
// nonzero. The increment of count is not part of the for header.
1019 loopCountOps = tNoOfOperations;
1020 loopCountTables = tNoOfTables;
1021 loopCountAttributes = tNoOfAttributes;
1022 for (
int count = 1; count < loopCountOps && tResult == 0;){
1025 pNdb = get_ndb_object(ndb_id,
"test",
"def");
1027 ndbout <<
"Could not get Ndb object in thread" << threadNo;
1033 if (pTrans == NULL) {
1035 ndbout <<
"Could not start transaction in thread" << threadNo;
// Reference values for operation count live in the slot for index count - 1.
1044 nRefLocalOpOffset = tAttributeSize*tNoOfAttributes*(count - 1) ;
1045 int* tmpAttrRefValue = attrRefValue + nRefLocalOpOffset;
// One operation per table; each table has its own region in attrValue.
1047 for (
int countTables = 0;
1048 countTables < loopCountTables && tResult == 0;
1051 int nTableOffset = tAttributeSize *
1052 loopCountAttributes *
1055 int* tmpAttrValue = attrValue + nTableOffset;
1059 if (pOps[countTables] == NULL) {
1061 ndbout <<
"getNdbOperation: " << pTrans->
getNdbError();
// The write, dirty and simple flags select the operation variant; the calls
// made in each branch are not visible in this listing.
1068 if (theWriteFlag == 1 && theDirtyFlag == 1)
1070 else if (theWriteFlag == 1)
1076 if (theSimpleFlag == 1)
1078 else if (theDirtyFlag == 1)
1084 if (theWriteFlag == 1 && theDirtyFlag == 1)
1086 else if (theWriteFlag == 1)
1088 else if (theDirtyFlag == 1)
1099 case stVerifyDelete:
// Primary key condition: either all long-key attributes or the single key
// attrName[0]. The selecting condition is not visible here (presumably
// useLongKeys - confirm).
1108 for(Uint32 i = 0; i < tNoOfLongPK; i++)
1109 pOps[countTables]->equal(longKeyAttrName[i],
1110 (
char *)longKeyAttrValue[count - 1][i],
1114 pOps[countTables]->
equal((
char*)attrName[0],
1115 (
char*)&tmpAttrRefValue[0]);
// NDB path: attach the attribute work for the current command. Inserts and
// updates set the non-key attributes from the reference values, reads and
// verifies fetch them into attrValue, and verify-delete fetches only the key.
1117 if (tType == stInsert) {
1118 for (
int ca = 1; ca < loopCountAttributes; ca++){
1119 pOps[countTables]->
setValue((
char*)attrName[ca],
1120 (
char*)&tmpAttrRefValue[tAttributeSize*ca]);
1122 }
else if (tType == stUpdate) {
1123 for (
int ca = 1; ca < loopCountAttributes; ca++){
1124 int* tmp = (
int*)&tmpAttrRefValue[tAttributeSize*ca];
1125 if (countTables == 0)
1127 pOps[countTables]->
setValue((
char*)attrName[ca],(
char*)tmp);
1129 }
else if (tType == stRead || stVerify == tType) {
1130 for (
int ca = 1; ca < loopCountAttributes; ca++) {
1132 pOps[countTables]->
getValue((
char*)attrName[ca],
1133 (
char*)&tmpAttrValue[tAttributeSize*ca]);
1135 }
else if (stVerifyDelete == tType) {
1137 tTmp = pOps[countTables]->
getValue(longKeyAttrName[0],
1138 (
char*)&tmpAttrValue[0]);
1140 tTmp = pOps[countTables]->
getValue((
char*)attrName[0],
1141 (
char*)&tmpAttrValue[0]);
// MySQL path, insert: copy the reference values into the bound buffer and
// execute the prepared insert.
1151 for (
int ca = 0; ca < loopCountAttributes; ca++){
1152 mysql_data[ca] = tmpAttrRefValue[tAttributeSize*ca];
1154 if (mysql_execute(prep_insert[countTables])) {
1155 ndbout << tableName[countTables];
1156 ndbout <<
" mysql_execute: " << mysql_error(&mysql) << endl;
// MySQL path, update: key in slot 0, remaining attributes from the reference
// values. The statement run when countTables == 0 is not visible here.
1161 mysql_data[0] = tmpAttrRefValue[0];
1162 for (
int ca = 1; ca < loopCountAttributes; ca++){
1163 int* tmp = (
int*)&tmpAttrRefValue[tAttributeSize*ca];
1164 if (countTables == 0)
1166 mysql_data[ca] = *tmp;
1168 if (mysql_execute(prep_update[countTables])) {
1169 ndbout << tableName[countTables];
1170 ndbout <<
" mysql_execute: " << mysql_error(&mysql) << endl;
// MySQL path, read: execute the prepared select for the key, buffer the
// result, fetch the rows and copy the fetched attributes into attrValue.
1176 mysql_data[0] = tmpAttrRefValue[0];
1177 if (mysql_execute(prep_read[countTables])) {
1178 ndbout << tableName[countTables];
1179 ndbout <<
" mysql_execute: " << mysql_error(&mysql) << endl;
1183 if (mysql_stmt_store_result(prep_read[countTables])) {
1184 ndbout << tableName[countTables];
1185 ndbout <<
" mysql_stmt_store_result: "
1186 << mysql_error(&mysql) << endl;
1193 while ( (r= mysql_fetch(prep_read[countTables])) == 0 ){
1197 ndbout << tableName[countTables];
1198 ndbout <<
" mysql_fetch: " << mysql_error(&mysql) << endl;
1203 ndbout << tableName[countTables];
1204 ndbout <<
" mysql_fetch: rows = " << rows << endl;
1210 for (
int ca = 1; ca < loopCountAttributes; ca++) {
1211 tmpAttrValue[tAttributeSize*ca] = mysql_data[ca];
// MySQL path, delete: execute the prepared delete for the key.
1216 mysql_data[0] = tmpAttrRefValue[0];
1217 if (mysql_execute(prep_delete[countTables])) {
1218 ndbout << tableName[countTables];
1219 ndbout <<
" mysql_execute: " << mysql_error(&mysql) << endl;
// MySQL path, verify-delete: run a row count query on the table and expect
// exactly one row with one field whose text starts with the character 0.
1224 case stVerifyDelete:
1226 sprintf(buf,
"%s%s%s",
1227 "SELECT COUNT(*) FROM ",tableName[countTables],
";");
1228 if (mysql_query(&mysql, buf)) {
1229 ndbout << buf << endl;
1230 ndbout <<
"Error: " << mysql_error(&mysql) << endl;
1234 MYSQL_RES *res = mysql_store_result(&mysql);
1235 if ( res == NULL ) {
1236 ndbout <<
"mysql_store_result: "
1237 << mysql_error(&mysql) << endl
1238 <<
"errno: " << mysql_errno(&mysql) << endl;
1242 int num_fields = mysql_num_fields(res);
1243 int num_rows = mysql_num_rows(res);
1244 if ( num_rows != 1 || num_fields != 1 ) {
1245 ndbout << tableName[countTables];
1246 ndbout <<
" mysql_store_result: num_rows = " << num_rows
1247 <<
" num_fields = " << num_fields << endl;
1251 MYSQL_ROW row = mysql_fetch_row(res);
1252 if ( row == NULL ) {
1253 ndbout <<
"mysql_fetch_row: "
1254 << mysql_error(&mysql) << endl;
1258 if ( *(
char*)row[0] !=
'0' ) {
1259 ndbout << tableName[countTables];
1260 ndbout <<
" mysql_fetch_row: value = "
1261 << (
char*)(row[0]) << endl;
1265 mysql_free_result(res);
// Commit. On the MySQL path an explicit commit is issued only when more than
// one table takes part; presumably autocommit covers the single-table case
// (see the mysql_autocommit calls above) - confirm.
1279 check = pTrans->
execute(Commit);
1282 if (tNoOfTables > 1)
1283 if (mysql_commit(&mysql)) {
1284 ndbout <<
" mysql_commit: " << mysql_error(&mysql) << endl;
1293 if ((tSpecialTrans == 1) &&
1303 ndbout <<
"Insert with 4007 was successful" << endl;
1309 ndbout <<
"Delete with 4007 was successful" << endl;
1318 if ((stVerifyDelete == tType) &&
1327 theErrorData.handleErrorCommon(pTrans->
getNdbError());
1329 ndbout_c(
"execute: %d, %d, %s", count, tType,
1333 }
else if (retCode == 2) {
1334 ndbout <<
"4115 should not happen in flexBench" << endl;
1336 }
else if (retCode == 3) {
1344 if ((tType == stInsert) || (tType == stDelete)) {
// A failed execute is retried while the attempt counter stays below
// tRetryAttempts (20); after that the thread reports too many errors.
1351 if (check == -1 && tResult == 0) {
1352 if (tAttemptNo < tRetryAttempts){
1358 ndbout <<
"Thread" << threadNo;
1359 ndbout <<
": too many errors reported" << endl;
// Statistics: accumulate one operation per table and report once statFreq
// operations have been collected.
1373 statOps += loopCountTables;
1374 if (statOps >= statFreq) {
1375 statReport(tType, statOps);
// Verification: compare every fetched non-key attribute of every table with
// the reference value and print both on a mismatch. The comparison line
// itself is not visible in this listing.
1382 if (stVerify == tType && 0 == check){
1383 int nTableOffset = 0 ;
1384 for (
int a = 1 ; a < loopCountAttributes ; a++){
1385 for (
int tables = 0 ; tables < loopCountTables ; tables++){
1386 nTableOffset = tables*loopCountAttributes*tAttributeSize;
1387 int ov =*(
int*)&attrValue[nTableOffset + tAttributeSize*a];
1388 int nv =*(
int*)&tmpAttrRefValue[tAttributeSize*a];
1390 ndbout <<
"Error in verify ";
1391 ndbout <<
"pk = " << tmpAttrRefValue[0] <<
":" << endl;
1392 ndbout <<
"attrValue[" << nTableOffset + tAttributeSize*a <<
"] = " << ov << endl ;
1393 ndbout <<
"attrRefValue[" << nRefLocalOpOffset + tAttributeSize*a <<
"]" << nv << endl ;
1402 return_ndb_object(pNdb, ndb_id);
1410 statReport(tType, statOps);
1417 return_ndb_object(pNdb, ndb_id);
// Teardown of the MySQL resources.
// NOTE(review): the connection is closed before the prepared statements that
// belong to it. Closing statements after their connection may be unsafe -
// verify the required order against the client library documentation.
1424 mysql_close(&mysql);
1425 for (Uint32 i = 0; i < tNoOfTables; i++) {
1426 mysql_stmt_close(prep_insert[i]);
1427 mysql_stmt_close(prep_update[i]);
1428 mysql_stmt_close(prep_delete[i]);
1429 mysql_stmt_close(prep_read[i]);
// Return a still-held Ndb object to the pool on the NDB path.
1433 if (use_ndb && pNdb) {
1434 ndbout <<
"I got here " << endl;
1435 return_ndb_object(pNdb, ndb_id);
// Parses the command line into the file-scope configuration variables.
//   argc, argv - the program arguments.
// Each option is matched with strcmp. Options that take a value read it from
// argv[i+1] with atoi and then apply a range check. What happens when a check
// fails, and how i advances, is not visible in this listing.
// Ranges visible below:
//   -t >= 1, -o >= 1, -a 2..MAXATTR, -c 1..MAXTABLES, -l 0..100000,
//   -pool_size 1..240, -engine 0..3, -socket > 0, -s 1..MAXATTRSIZE,
//   -lkn 1..MAXNOLONGKEY, -lks >= 1, -sleep 1..3600.
// For -lkn and -lks the product tNoOfLongPK * tSizeOfLongPK must also stay
// within MAXLONGKEYTOTALSIZE; each check uses the value of the other option
// as it is at that point of parsing.
// -statserv takes its port from the text after the last colon of its value.
// NOTE(review): argv[i+1] is read without a visible check that it exists, and
// atoi cannot report malformed numbers - confirm the surrounding loop guards.
// NOTE(review): sockets has 16 slots and no bound check on n_sockets is
// visible for repeated -socket options - confirm.
1441 static int readArguments(
int argc,
const char** argv)
1446 if (strcmp(argv[i],
"-t") == 0){
1447 tNoOfThreads = atoi(argv[i+1]);
1448 if ((tNoOfThreads < 1))
1452 }
else if (strcmp(argv[i],
"-o") == 0){
1453 tNoOfOperations = atoi(argv[i+1]);
1454 if (tNoOfOperations < 1)
1458 }
else if (strcmp(argv[i],
"-a") == 0){
1459 tNoOfAttributes = atoi(argv[i+1]);
1460 if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR))
1464 }
else if (strcmp(argv[i],
"-c") == 0){
1465 tNoOfTables = atoi(argv[i+1]);
1466 if ((tNoOfTables < 1) || (tNoOfTables > MAXTABLES))
1470 }
else if (strcmp(argv[i],
"-stdtables") == 0){
1471 theStdTableNameFlag = 1;
1472 }
else if (strcmp(argv[i],
"-l") == 0){
1473 tNoOfLoops = atoi(argv[i+1]);
1474 if ((tNoOfLoops < 0) || (tNoOfLoops > 100000))
1478 }
else if (strcmp(argv[i],
"-pool_size") == 0){
1479 t_instances = atoi(argv[i+1]);
1480 if ((t_instances < 1) || (t_instances > 240))
1485 }
else if (strcmp(argv[i],
"-engine") == 0){
1486 engine_id = atoi(argv[i+1]);
1487 if ((engine_id < 0) || (engine_id > 3))
1491 }
else if (strcmp(argv[i],
"-socket") == 0){
1492 sockets[n_sockets] = atoi(argv[i+1]);
1493 if (sockets[n_sockets] <= 0)
1498 }
else if (strcmp(argv[i],
"-use_ndb") == 0){
1501 }
else if (strcmp(argv[i],
"-s") == 0){
1502 tAttributeSize = atoi(argv[i+1]);
1503 if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE))
1507 }
else if (strcmp(argv[i],
"-lkn") == 0){
1508 tNoOfLongPK = atoi(argv[i+1]);
1510 if ((tNoOfLongPK < 1) || (tNoOfLongPK > MAXNOLONGKEY) ||
1511 (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
1512 ndbout <<
"Argument -lkn is not in the proper range." << endl;
1517 }
else if (strcmp(argv[i],
"-lks") == 0){
1518 tSizeOfLongPK = atoi(argv[i+1]);
1520 if ((tSizeOfLongPK < 1) || (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
1521 ndbout <<
"Argument -lks is not in the proper range 1 to " <<
1522 MAXLONGKEYTOTALSIZE << endl;
1527 }
else if (strcmp(argv[i],
"-simple") == 0){
1529 }
else if (strcmp(argv[i],
"-write") == 0){
1531 }
else if (strcmp(argv[i],
"-dirty") == 0){
1533 }
else if (strcmp(argv[i],
"-sleep") == 0){
1534 tSleepTime = atoi(argv[i+1]);
1535 if ((tSleepTime < 1) || (tSleepTime > 3600))
1539 }
else if (strcmp(argv[i],
"-no_table_create") == 0){
1540 theTableCreateFlag = 1;
1541 }
else if (strcmp(argv[i],
"-temp") == 0){
1542 theTempTable =
true;
1543 }
else if (strcmp(argv[i],
"-noverify") == 0){
1544 VerifyFlag = false ;
1547 }
else if (strcmp(argv[i],
"-verify") == 0){
1550 }
else if (strcmp(argv[i],
"-statserv") == 0){
1553 const char *p = argv[i+1];
1554 const char *q = strrchr(p,
':');
1558 statPort = atoi(q+1);
1562 }
else if (strcmp(argv[i],
"-statfreq") == 0){
1565 statFreq = atoi(argv[i+1]);
1578 if (n_sockets == 0) {
// Announces the pause, sleeps for the given number of seconds and reports
// completion.
//   seconds - length of the pause in seconds.
// NOTE(review): one listing line between the header and the first output is
// missing here; it may hold a guard on seconds - confirm.
1586 static void sleepBeforeStartingTest(
int seconds){
1588 ndbout <<
"Sleeping(" <<seconds <<
")...";
1589 NdbSleep_SecSleep(seconds);
1590 ndbout <<
" done!" << endl;
// Drops every benchmark table through the MySQL connection.
//   mysqlp - open MySQL connection handle.
// For each of the tNoOfTables names in tableName a DROP TABLE statement is
// built in buf, echoed, and run with mysql_query. A failure prints the MySQL
// error text; success prints OK. The return value is not visible in this
// listing.
1597 dropTables(
MYSQL* mysqlp){
1599 for(
unsigned i = 0; i < tNoOfTables; i++){
1601 ndbout <<
"Dropping " << tableName[
i] <<
"... ";
1602 pos += sprintf(buf+pos,
"%s",
"DROP TABLE ");
1603 pos += sprintf(buf+pos,
"%s%s", tableName[i],
";");
1605 ndbout << endl << buf << endl;
1606 if (mysql_query(mysqlp, buf) != 0){
1607 ndbout <<
"Failed!"<<endl
1608 <<mysql_error(mysqlp)<<endl
1611 ndbout <<
"OK!" << endl;
// Creates the benchmark tables through the MySQL connection.
//   mysqlp - open MySQL connection handle.
// The first two loops run over the attributes and the tables; their bodies
// are mostly not visible in this listing and presumably fill attrName and
// tableName - confirm. When theStdTableNameFlag is 0, the current time in
// seconds is passed to the table naming call.
// For each table a CREATE TABLE statement is assembled in buf: attrName[0]
// becomes an unsigned integer primary key, the remaining attributes are
// unsigned integer columns, and the engine clause selected by engine_id is
// appended. The statement is echoed and run with mysql_query.
1620 createTables(
MYSQL* mysqlp){
1622 for (Uint32 i = 0; i < tNoOfAttributes; i++){
1628 for (Uint32 i = 0; i < tNoOfTables; i++){
1629 if (theStdTableNameFlag == 0){
1631 (
int)(NdbTick_CurrentMillisecond() / 1000));
1638 for(
unsigned i = 0; i < tNoOfTables; i++){
1640 ndbout <<
"Creating " << tableName[
i] <<
"... ";
1642 pos += sprintf(buf+pos,
"%s",
"CREATE TABLE ");
1643 pos += sprintf(buf+pos,
"%s%s", tableName[i],
" ");
// The body of this long-key loop is not visible in this listing.
1645 for(Uint32 i = 0; i < tNoOfLongPK; i++) {
1648 pos += sprintf(buf+pos,
"%s%s%s",
1649 "(", attrName[0],
" int unsigned primary key");
1651 for (
unsigned j = 1; j < tNoOfAttributes; j++)
1652 pos += sprintf(buf+pos,
"%s%s%s",
",", attrName[j],
" int unsigned");
1653 pos += sprintf(buf+pos,
"%s%s%s",
")", engine[engine_id],
";");
1655 ndbout << endl << buf << endl;
1656 if (mysql_query(mysqlp, buf) != 0)
1658 ndbout <<
"done" << endl;
// Creates the benchmark tables through the NDB API.
//   pMyNdb - Ndb object used for the table creation.
// The first two loops run over the attributes and the tables; their bodies
// are mostly not visible in this listing and presumably fill attrName and
// tableName - confirm. When theStdTableNameFlag is 0, the current time in
// seconds is passed to the table naming call.
// For each table a table definition is built in tmpTable: it is marked as
// stored unless theTempTable is set, primary key columns are added (one per
// long key with length tSizeOfLongPK, or a single key column; the selecting
// condition is not visible here), followed by one column per remaining
// attribute.
1665 createTables(
Ndb* pMyNdb){
1667 for (Uint32 i = 0; i < tNoOfAttributes; i++){
1673 for (Uint32 i = 0; i < tNoOfTables; i++){
1674 if (theStdTableNameFlag == 0){
1676 (
int)(NdbTick_CurrentMillisecond() / 1000));
1682 for(
unsigned i = 0; i < tNoOfTables; i++){
1683 ndbout <<
"Creating " << tableName[
i] <<
"... ";
// Tables requested with -temp are created as not stored.
1687 tmpTable.setStoredTable(!theTempTable);
1690 for(Uint32 i = 0; i < tNoOfLongPK; i++) {
1693 col.setLength(tSizeOfLongPK);
1694 col.setPrimaryKey(
true);
1695 tmpTable.addColumn(col);
1701 col.setPrimaryKey(
true);
1702 tmpTable.addColumn(col);
1707 for (
unsigned j = 1; j < tNoOfAttributes; j++){
1709 tmpTable.addColumn(col);
1714 ndbout <<
"done" << endl;
// Prints the usage text: the supported options with their defaults, followed
// by the meaning of the program return codes.
// NOTE(review): readArguments also accepts -engine, -socket, -noverify,
// -statserv and -statfreq, none of which appear in the text visible here -
// confirm whether the help should list them.
1721 static void input_error(){
1722 ndbout << endl <<
"Invalid argument!" << endl;
1723 ndbout << endl <<
"Arguments:" << endl;
1724 ndbout <<
" -t Number of threads to start, default 1" << endl;
1725 ndbout <<
" -o Number of operations per loop, default 500" << endl;
1726 ndbout <<
" -l Number of loops to run, default 1, 0=infinite" << endl;
1727 ndbout <<
" -a Number of attributes, default 25" << endl;
1728 ndbout <<
" -c Number of tables, default 1" << endl;
1729 ndbout <<
" -s Size of each attribute, default 1 (Primary Key is always of size 1," << endl;
1730 ndbout <<
" independent of this value)" << endl;
1731 ndbout <<
" -lkn Number of long primary keys, default 1" << endl;
1732 ndbout <<
" -lks Size of each long primary key, default 1" << endl;
1734 ndbout <<
" -simple Use simple read to read from database" << endl;
1735 ndbout <<
" -dirty Use dirty read to read from database" << endl;
1736 ndbout <<
" -write Use writeTuple in insert and update" << endl;
1737 ndbout <<
" -stdtables Use standard table names" << endl;
1738 ndbout <<
" -no_table_create Don't create tables in db" << endl;
1739 ndbout <<
" -sleep Sleep a number of seconds before running the test, this" << endl;
1740 ndbout <<
" can be used so that another flexBench have time to create tables" << endl;
1741 ndbout <<
" -temp Use tables without logging" << endl;
1742 ndbout <<
" -verify Verify inserts, updates and deletes" << endl ;
1743 ndbout <<
" -use_ndb Use NDB API (otherwise use mysql client)" << endl ;
1744 ndbout <<
" -pool_size Number of Ndb objects in pool" << endl ;
// Return codes of the program.
1746 ndbout << endl <<
"Returns:" << endl;
1747 ndbout <<
"\t 0 - Test passed" << endl;
1748 ndbout <<
"\t 1 - Test failed" << endl;
1749 ndbout <<
"\t 2 - Invalid arguments" << endl << endl;