18 #include <ndb_global.h>
22 #include <NdbThread.h>
26 #include <NdbTest.hpp>
28 #include "userInterface.h"
29 #include "dbGenerator.h"
30 #include "ndb_schema.hpp"
33 static int numProcesses;
34 static int numSeconds;
35 static int numWarmSeconds;
36 static int parallellism;
37 static int millisSendPoll;
38 static int minEventSendPoll;
39 static int forceSendPoll;
40 static bool useNdbRecord;
41 static bool useCombUpd;
47 static void usage(
const char *prog)
54 progname = strrchr(prog,
'/');
62 "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>]"
63 "[-t <num> ] [ -e <num> ] [ -f <num>] [ -ndbrecord ]\n"
64 " -proc <num> Specifies that <num> is the number of\n"
65 " threads. The default is 1.\n"
66 " -time <num> Specifies that the test will run for <num> sec.\n"
67 " The default is 10 sec\n"
68 " -warm <num> Specifies the warm-up/cooldown period of <num> "
70 " The default is 10 sec\n"
71 " -p <num> The no of parallell transactions started by "
73 " -e <num> Minimum no of events before wake up in call to "
76 " -f <num> force parameter to sendPoll\n"
78 " -ndbrecord Use NdbRecord Api.\n"
79 " Default is to use old Api\n"
80 " -combupdread Use update pre-read operation where possible\n"
81 " Default is to use separate read+update ops\n",
87 parse_args(
int argc,
const char **argv)
95 millisSendPoll = 10000;
103 if (strcmp(
"-proc",argv[i]) == 0) {
107 if (sscanf(argv[i+1],
"%d", &numProcesses) == -1 ||
108 numProcesses <= 0 || numProcesses > 127) {
109 ndbout_c(
"-proc flag requires a positive integer argument [1..127]");
113 }
else if (strcmp(
"-p", argv[i]) == 0){
118 if (sscanf(argv[i+1],
"%d", ¶llellism) == -1 ||
120 ndbout_c(
"-p flag requires a positive integer argument");
125 else if (strcmp(
"-time",argv[i]) == 0) {
129 if (sscanf(argv[i+1],
"%d", &numSeconds) == -1 ||
131 ndbout_c(
"-time flag requires a positive integer argument");
136 else if (strcmp(
"-warm",argv[i]) == 0) {
140 if (sscanf(argv[i+1],
"%d", &numWarmSeconds) == -1 ||
141 numWarmSeconds < 0) {
142 ndbout_c(
"-warm flag requires a positive integer argument");
147 else if (strcmp(
"-e",argv[i]) == 0) {
151 if (sscanf(argv[i+1],
"%d", &minEventSendPoll) == -1 ||
152 minEventSendPoll < 0) {
153 ndbout_c(
"-e flag requires a positive integer argument");
158 else if (strcmp(
"-f",argv[i]) == 0) {
163 if (sscanf(argv[i+1],
"%d", &forceSendPoll) == -1 ||
165 ndbout_c(
"-f flag requires a positive integer argument");
170 else if (strcmp(
"-ndbrecord",argv[i]) == 0) {
174 else if (strcmp(
"-combupdread",argv[i]) == 0) {
184 if(minEventSendPoll > parallellism){
185 ndbout_c(
"minEventSendPoll(%d) > parallellism(%d)",
186 minEventSendPoll, parallellism);
187 ndbout_c(
"not very good...");
188 ndbout_c(
"very bad...");
189 ndbout_c(
"exiting...");
192 if (useNdbRecord && useCombUpd){
193 ndbout_c(
"NdbRecord does not currently support combined update "
194 "and read. Using separate read and update ops");
201 print_transaction(
const char *header,
202 unsigned long totalCount,
204 unsigned int printBranch,
205 unsigned int printRollback)
209 ndbout_c(
" %s: %d (%.2f%%) "
210 "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
213 (
double)trans->count / (
double)totalCount * 100.0,
214 (
int)trans->latency.getMean(),
215 (int)trans->latency.getMin(),
216 (int)trans->latency.getMax(),
217 (int)trans->latency.getStddev(),
218 (int)trans->latency.getCount()
222 if( trans->count == 0 )
225 f = (double)trans->branchExecuted / (
double)trans->count * 100.0;
226 ndbout_c(
" Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
230 if( trans->count == 0 )
233 f = (double)trans->rollbackExecuted / (
double)trans->count * 100.0;
234 ndbout_c(
" Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
239 print_stats(
const char *title,
241 unsigned int transactionFlag,
243 int numProc,
int parallellism)
247 char name[MAXHOSTNAMELEN];
250 NdbHost_GetHostName(name);
252 ndbout_c(
"\n------ %s ------",title);
253 ndbout_c(
"Length : %d %s",
255 transactionFlag ?
"Transactions" :
"sec");
256 ndbout_c(
"Processor : %s", name);
257 ndbout_c(
"Number of Proc: %d",numProc);
258 ndbout_c(
"Parallellism : %d", parallellism);
259 ndbout_c(
"UseNdbRecord : %u", useNdbRecord);
262 if( gen->totalTransactions == 0 ) {
263 ndbout_c(
" No Transactions for this test");
266 for(i = 0; i < 5; i++) {
267 sprintf(buf,
"T%d",i+1);
268 print_transaction(buf,
269 gen->totalTransactions,
270 &gen->transactions[i],
276 ndbout_c(
" Overall Statistics:");
277 ndbout_c(
" Transactions: %d", gen->totalTransactions);
278 ndbout_c(
" Outer : %.0f TPS",gen->outerTps);
280 ndbout_c(
"NDBT_Observation;tps;%.0f", gen->outerTps);
286 threadRoutine(
void *arg)
292 pNDB = asyncDbConnect(parallellism);
295 for(i = 0; i<parallellism; i++){
298 millisSendPoll = 30000;
299 asyncGenerator(data, parallellism,
300 millisSendPoll, minEventSendPoll, forceSendPoll);
302 asyncDbDisconnect(pNDB);
307 NDB_COMMAND(DbAsyncGenerator,
"DbAsyncGenerator",
308 "DbAsyncGenerator",
"DbAsyncGenerator", 65535)
320 if(parse_args(argc,argv) != 0){
322 return NDBT_ProgramExit(NDBT_WRONGARGS);
326 ndbout_c(
"\nStarting Test with %d process(es) for %d %s parallellism %d",
332 ndbout_c(
" WarmUp/coolDown = %d sec", numWarmSeconds);
337 ndbout <<
"Unable to connect to management server." << endl;
342 ndbout <<
"Cluster nodes not ready in 30 seconds." << endl;
348 g_cluster_connection= &con;
363 Ndb* tempNdb= asyncDbConnect(1);
369 cols[0].column= tab->
getColumn((
int) IND_SUBSCRIBER_NUMBER);
371 cols[0].nullbit_byte_offset= 0;
372 cols[0].nullbit_bit_in_byte= 0;
373 cols[1].column= tab->
getColumn((
int) IND_SUBSCRIBER_NAME);
375 cols[1].nullbit_byte_offset= 0;
376 cols[1].nullbit_bit_in_byte= 0;
377 cols[2].column= tab->
getColumn((
int) IND_SUBSCRIBER_GROUP);
379 cols[2].nullbit_byte_offset= 0;
380 cols[2].nullbit_bit_in_byte= 0;
381 cols[3].column= tab->
getColumn((
int) IND_SUBSCRIBER_LOCATION);
383 cols[3].nullbit_byte_offset= 0;
384 cols[3].nullbit_bit_in_byte= 0;
385 cols[4].column= tab->
getColumn((
int) IND_SUBSCRIBER_SESSIONS);
387 cols[4].nullbit_byte_offset= 0;
388 cols[4].nullbit_bit_in_byte= 0;
389 cols[5].column= tab->
getColumn((
int) IND_SUBSCRIBER_CHANGED_BY);
391 cols[5].nullbit_byte_offset= 0;
392 cols[5].nullbit_bit_in_byte= 0;
393 cols[6].column= tab->
getColumn((
int) IND_SUBSCRIBER_CHANGED_TIME);
395 cols[6].nullbit_byte_offset= 0;
396 cols[6].nullbit_bit_in_byte= 0;
398 ndbRecordSharedDataPtr->subscriberTableNdbRecord=
399 dict->createRecord(tab, cols, 7,
sizeof(cols[0]), 0);
401 if (ndbRecordSharedDataPtr->subscriberTableNdbRecord == NULL)
403 ndbout <<
"Error creating record 1 : " << dict->
getNdbError() << endl;
408 cols[0].column= tab->
getColumn((
int) IND_GROUP_ID);
410 cols[0].nullbit_byte_offset= 0;
411 cols[0].nullbit_bit_in_byte= 0;
413 cols[1].column= tab->
getColumn((
int) IND_GROUP_ALLOW_READ);
415 cols[1].nullbit_byte_offset= 0;
416 cols[1].nullbit_bit_in_byte= 0;
418 ndbRecordSharedDataPtr->groupTableAllowReadNdbRecord=
419 dict->createRecord(tab, cols, 2,
sizeof(cols[0]), 0);
421 if (ndbRecordSharedDataPtr->groupTableAllowReadNdbRecord == NULL)
423 ndbout <<
"Error creating record 2.1: " << dict->
getNdbError() << endl;
427 cols[1].column= tab->
getColumn((
int) IND_GROUP_ALLOW_INSERT);
429 cols[1].nullbit_byte_offset= 0;
430 cols[1].nullbit_bit_in_byte= 0;
432 ndbRecordSharedDataPtr->groupTableAllowInsertNdbRecord=
433 dict->createRecord(tab, cols, 2,
sizeof(cols[0]), 0);
435 if (ndbRecordSharedDataPtr->groupTableAllowInsertNdbRecord == NULL)
437 ndbout <<
"Error creating record 2.2: " << dict->
getNdbError() << endl;
441 cols[1].column= tab->
getColumn((
int) IND_GROUP_ALLOW_DELETE);
443 cols[1].nullbit_byte_offset= 0;
444 cols[1].nullbit_bit_in_byte= 0;
446 ndbRecordSharedDataPtr->groupTableAllowDeleteNdbRecord=
447 dict->createRecord(tab, cols, 2,
sizeof(cols[0]), 0);
449 if (ndbRecordSharedDataPtr->groupTableAllowDeleteNdbRecord == NULL)
451 ndbout <<
"Error creating record 2.3: " << dict->
getNdbError() << endl;
456 cols[0].column= tab->
getColumn((
int) IND_SESSION_SUBSCRIBER);
458 cols[0].nullbit_byte_offset= 0;
459 cols[0].nullbit_bit_in_byte= 0;
460 cols[1].column= tab->
getColumn((
int) IND_SESSION_SERVER);
462 cols[1].nullbit_byte_offset= 0;
463 cols[1].nullbit_bit_in_byte= 0;
464 cols[2].column= tab->
getColumn((
int) IND_SESSION_DATA);
466 cols[2].nullbit_byte_offset= 0;
467 cols[2].nullbit_bit_in_byte= 0;
469 ndbRecordSharedDataPtr->sessionTableNdbRecord=
470 dict->createRecord(tab, cols, 3,
sizeof(cols[0]), 0);
472 if (ndbRecordSharedDataPtr->sessionTableNdbRecord == NULL)
474 ndbout <<
"Error creating record 3 : " << dict->
getNdbError() << endl;
479 cols[0].column= tab->
getColumn((
int) IND_SERVER_SUBSCRIBER_SUFFIX);
481 cols[0].nullbit_byte_offset= 0;
482 cols[0].nullbit_bit_in_byte= 0;
483 cols[1].column= tab->
getColumn((
int) IND_SERVER_ID);
485 cols[1].nullbit_byte_offset= 0;
486 cols[1].nullbit_bit_in_byte= 0;
492 ndbRecordSharedDataPtr->serverTableNdbRecord=
493 dict->createRecord(tab, cols, 2,
sizeof(cols[0]), 0);
495 if (ndbRecordSharedDataPtr->serverTableNdbRecord == NULL)
497 ndbout <<
"Error creating record 4 : " << dict->
getNdbError() << endl;
504 if (prog1->add_val(IND_SERVER_READS, (Uint32)1) ||
505 prog1->interpret_exit_ok() ||
508 ndbout <<
"Program 1 definition failed, exiting." << endl;
514 if (prog2->add_val(IND_SERVER_INSERTS, (Uint32)1) ||
515 prog2->interpret_exit_ok() ||
518 ndbout <<
"Program 2 definition failed, exiting." << endl;
524 if (prog3->add_val(IND_SERVER_DELETES, (Uint32)1) ||
525 prog3->interpret_exit_ok() ||
528 ndbout <<
"Program 3 definition failed, exiting." << endl;
532 ndbRecordSharedDataPtr->incrServerReadsProg= prog1;
533 ndbRecordSharedDataPtr->incrServerInsertsProg= prog2;
534 ndbRecordSharedDataPtr->incrServerDeletesProg= prog3;
536 asyncDbDisconnect(tempNdb);
539 for(i = 0; i < numProcesses; i++) {
540 for(j = 0; j<parallellism; j++){
541 int tid= i*parallellism + j;
542 data[tid].warmUpSeconds = numWarmSeconds;
543 data[tid].testSeconds = numSeconds;
544 data[tid].coolDownSeconds = numWarmSeconds;
545 data[tid].randomSeed =
546 (
unsigned long)(NdbTick_CurrentMillisecond()+i+j);
547 data[tid].changedTime = 0;
549 data[tid].ndbRecordSharedData = ndbRecordSharedDataPtr;
550 data[tid].useCombinedUpdate = useCombUpd;
552 sprintf(threadName,
"AsyncThread[%d]", i);
553 pThread = NdbThread_Create(threadRoutine,
554 (
void**)&data[i*parallellism],
557 NDB_THREAD_PRIO_LOW);
558 if(pThread != 0 && pThread != NULL){
559 (&data[i*parallellism])->pThread = pThread;
561 perror(
"Failed to create thread");
571 for(i = 0; i < numProcesses; i++) {
572 NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
573 NdbThread_Destroy(&data[i*parallellism].pThread);
576 ndbout_c(
"All threads have finished");
580 free(ndbRecordSharedDataPtr);
589 stats.totalTransactions = 0;
590 stats.outerTps = 0.0;
592 for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
593 stats.transactions[
i].count = 0;
594 stats.transactions[
i].branchExecuted = 0;
595 stats.transactions[
i].rollbackExecuted = 0;
596 stats.transactions[
i].latency.reset();
602 for(i = 0; i < numProcesses; i++) {
603 for(k = 0; k<parallellism; k++){
604 p = &data[i*parallellism+k].generator;
606 stats.totalTransactions += p->totalTransactions;
607 stats.outerTps += p->outerTps;
609 for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
610 stats.transactions[j].count +=
611 p->transactions[j].count;
612 stats.transactions[j].branchExecuted +=
613 p->transactions[j].branchExecuted;
614 stats.transactions[j].rollbackExecuted +=
615 p->transactions[j].rollbackExecuted;
616 stats.transactions[j].latency +=
617 p->transactions[j].latency;
622 print_stats(
"Test Results",
631 NDBT_ProgramExit(rc);
639 #include <sys/types.h>
642 #include "ndb_error.hpp"
643 #include "userInterface.h"
644 #include <NdbMutex.h>
645 #include <NdbThread.h>
647 #include <NdbApi.hpp>
648 #include <NdbOut.hpp>
667 asyncDbConnect(
int parallellism){
668 Ndb * pNDB =
new Ndb(g_cluster_connection,
"TEST_DB");
670 pNDB->
init(parallellism + 1);
679 asyncDbDisconnect(
Ndb* pNDB)
687 static bool initialized =
false;
688 static NDB_TICKS initSecs = 0;
689 static Uint32 initMicros = 0;
690 double timeValue = 0;
692 if ( !initialized ) {
694 NdbTick_CurrentMicrosecond(&initSecs, &initMicros);
700 NdbTick_CurrentMicrosecond(&secs, µs);
701 double s = (double)secs - (
double)initSecs;
702 double us = (double)micros - (
double)initMicros;
704 timeValue = s + (us / 1000000.0);
714 now = ::time((time_t*)NULL);
715 tm_now = ::gmtime(&now);
718 "%d-%.2d-%.2d %.2d:%.2d:%.2d",
719 tm_now->tm_year + 1900,
726 ndbout_c(
"Time: %s", buf);