19 #include <ndb_global.h>
23 #include <NdbThread.h>
27 #include <NdbTest.hpp>
29 #include "userInterface.h"
30 #include "dbGenerator.h"
/*
 * Run-time settings of the benchmark. The visible parts of parse_args()
 * fill them from the command line; the program entry point and
 * threadRoutine() read them.
 */
/* Number of worker threads (-proc); parse_args() rejects values outside 1..127. */
32 static int numProcesses;
/* Length of the test run in seconds (-time). */
33 static int numSeconds;
/* Warm-up period in seconds (-warm); the entry point also uses it as the cool-down period. */
34 static int numWarmSeconds;
/*
 * Number of data[] slots per thread; the help text describes -p as the
 * number of parallel transactions. NOTE(review): the sscanf() argument for
 * -p is garbled in parse_args(), so the link to this variable is presumed.
 */
35 static int parallellism;
/* Passed to asyncGenerator(); assigned 10000 in parse_args() and 30000 in threadRoutine(). */
36 static int millisSendPoll;
/* Minimum number of events before wake-up (-e); parse_args() compares it against parallellism. */
37 static int minEventSendPoll;
/* Force parameter handed on towards sendPoll (-f). */
38 static int forceSendPoll;
/*
 * Prints the command-line help text for the program named by `prog`.
 *
 * NOTE(review): several original lines of this function are missing from
 * this view (the opening brace, the output call that consumes the format
 * string, parts of the option descriptions); only the visible fragments are
 * documented.
 */
42 static void usage(
const char *prog)
/* Locate the last '/' in prog; presumably used to print the bare program name. */
49 progname = strrchr(prog,
'/');
57 "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] "
58 "[-t <num> ] [ -e <num> ] [ -f <num>] \n"
59 " -proc <num> Specifies that <num> is the number of\n"
60 " threads. The default is 1.\n"
61 " -time <num> Specifies that the test will run for <num> sec.\n"
62 " The default is 10 sec\n"
63 " -warm <num> Specifies the warm-up/cooldown period of <num> "
65 " The default is 10 sec\n"
66 " -p <num> The no of parallell transactions started by "
68 " -e <num> Minimum no of events before wake up in call to "
71 " -f <num> force parameter to sendPoll\n"
/*
 * Parses the command line into the file-level settings.
 *
 * Visible option handling: -proc, -p, -time, -warm, -e and -f each read the
 * following argv entry as a decimal integer with sscanf() and print a
 * message through ndbout_c() when the value is rejected.
 *
 * Each conversion is checked with "!= 1": sscanf() returns the number of
 * items it assigned, so a non-numeric argument such as "abc" yields 0, not
 * EOF, and a comparison against -1 would let it through with the previous
 * value still in the variable.
 *
 * NOTE(review): several original lines of this function are missing from
 * this view (return type, argv bounds checks, some range checks, return
 * statements, closing braces). They are left exactly as found.
 */
78 parse_args(
int argc,
const char **argv)
/* Value in effect unless changed later; threadRoutine() assigns 30000 before use. */
86 millisSendPoll = 10000;
93 if (strcmp(
"-proc",argv[i]) == 0) {
/* Exactly one successful conversion is required, then the 1..127 range check. */
97 if (sscanf(argv[i+1],
"%d", &numProcesses) != 1 ||
98 numProcesses <= 0 || numProcesses > 127) {
99 ndbout_c(
"-proc flag requires a positive integer argument [1..127]");
103 }
else if (strcmp(
"-p", argv[i]) == 0){
/* The destination argument is the address of the file-level parallellism setting. */
108 if (sscanf(argv[i+1],
"%d", &parallellism) != 1 ||
110 ndbout_c(
"-p flag requires a positive integer argument");
115 else if (strcmp(
"-time",argv[i]) == 0) {
119 if (sscanf(argv[i+1],
"%d", &numSeconds) != 1 ||
121 ndbout_c(
"-time flag requires a positive integer argument");
126 else if (strcmp(
"-warm",argv[i]) == 0) {
/* Only negative values are rejected here, so 0 is accepted despite the message text. */
130 if (sscanf(argv[i+1],
"%d", &numWarmSeconds) != 1 ||
131 numWarmSeconds < 0) {
132 ndbout_c(
"-warm flag requires a positive integer argument");
137 else if (strcmp(
"-e",argv[i]) == 0) {
/* Only negative values are rejected here, so 0 is accepted despite the message text. */
141 if (sscanf(argv[i+1],
"%d", &minEventSendPoll) != 1 ||
142 minEventSendPoll < 0) {
143 ndbout_c(
"-e flag requires a positive integer argument");
148 else if (strcmp(
"-f",argv[i]) == 0) {
153 if (sscanf(argv[i+1],
"%d", &forceSendPoll) != 1 ||
155 ndbout_c(
"-f flag requires a positive integer argument");
/*
 * Cross-check after the individual options: a wake-up threshold larger than
 * the number of parallel transactions is reported as an error. The message
 * says "exiting..."; the statement that follows it is not visible here.
 */
165 if(minEventSendPoll > parallellism){
166 ndbout_c(
"minEventSendPoll(%d) > parallellism(%d)",
167 minEventSendPoll, parallellism);
168 ndbout_c(
"not very good...");
169 ndbout_c(
"very bad...");
170 ndbout_c(
"exiting...");
/*
 * Prints the statistics of one transaction type through ndbout_c().
 *
 * header        - label printed at the start of the line.
 * totalCount    - denominator for the share-of-all-transactions percentage.
 * printBranch   - NOTE(review): presumably enables the "Branches Executed"
 *                 line; the test on it is not visible in this view.
 * printRollback - NOTE(review): presumably enables the "Rollback Executed"
 *                 line; the test on it is not visible in this view.
 *
 * NOTE(review): the parameter that declares `trans` and several other
 * original lines are missing from this view.
 */
178 print_transaction(
const char *header,
179 unsigned long totalCount,
181 unsigned int printBranch,
182 unsigned int printRollback)
/* Count, its percentage of totalCount, and the latency summary cast to int. */
186 ndbout_c(
" %s: %d (%.2f%%) "
187 "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
190 (
double)trans->count / (
double)totalCount * 100.0,
191 (
int)trans->latency.getMean(),
192 (int)trans->latency.getMin(),
193 (int)trans->latency.getMax(),
194 (int)trans->latency.getStddev(),
195 (int)trans->latency.getCount()
/* count == 0 is tested before the division below; the zero branch itself is not visible here. */
199 if( trans->count == 0 )
202 f = (double)trans->branchExecuted / (
double)trans->count * 100.0;
203 ndbout_c(
" Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
/* Same zero test ahead of the rollback percentage. */
207 if( trans->count == 0 )
210 f = (double)trans->rollbackExecuted / (
double)trans->count * 100.0;
211 ndbout_c(
" Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
/*
 * Prints a titled report: run parameters, per-transaction-type statistics
 * and overall totals, all through ndbout_c().
 *
 * title           - text placed in the "------ ... ------" banner.
 * transactionFlag - selects the unit label of the "Length" line:
 *                   "Transactions" when non-zero, "sec" otherwise.
 * numProc         - value printed as "Number of Proc".
 * parallellism    - value printed as "Parallellism". This parameter shadows
 *                   the file-level variable of the same name.
 *
 * NOTE(review): the parameters that declare the length value and `gen`, the
 * declarations of `buf` and `i`, and several other original lines are
 * missing from this view.
 */
216 print_stats(
const char *title,
218 unsigned int transactionFlag,
220 int numProc,
int parallellism)
/* Host name of this machine, printed below as "Processor". */
224 char name[MAXHOSTNAMELEN];
227 NdbHost_GetHostName(name);
229 ndbout_c(
"\n------ %s ------",title);
230 ndbout_c(
"Length : %d %s",
232 transactionFlag ?
"Transactions" :
"sec");
233 ndbout_c(
"Processor : %s", name);
234 ndbout_c(
"Number of Proc: %d",numProc);
235 ndbout_c(
"Parallellism : %d", parallellism);
/* A run without any transactions gets a dedicated message. */
238 if( gen->totalTransactions == 0 ) {
239 ndbout_c(
" No Transactions for this test");
/*
 * One print_transaction() call per type, labelled "T1".."T5".
 * NOTE(review): the bound is the literal 5 while the entry point iterates
 * up to NUM_TRANSACTION_TYPES; confirm that the two agree.
 */
242 for(i = 0; i < 5; i++) {
243 sprintf(buf,
"T%d",i+1);
244 print_transaction(buf,
245 gen->totalTransactions,
246 &gen->transactions[i],
252 ndbout_c(
" Overall Statistics:");
253 ndbout_c(
" Transactions: %d", gen->totalTransactions);
254 ndbout_c(
" Outer : %.0f TPS",gen->outerTps);
/*
 * Thread entry point handed to NdbThread_Create() by the program entry
 * point, which passes the address of the thread's first data[] slot as arg.
 * Visible steps: connect with asyncDbConnect(), loop over the thread's
 * `parallellism` slots, run asyncGenerator(), then disconnect.
 *
 * NOTE(review): the return type, the local declarations, the loop body and
 * the return statement are missing from this view.
 */
261 threadRoutine(
void *arg)
267 pNDB = asyncDbConnect(parallellism);
/* Per-slot setup; the loop body is not visible here. */
270 for(i = 0; i<parallellism; i++){
/*
 * NOTE(review): every thread overwrites the file-level millisSendPoll with
 * 30000 here, so whatever parse_args() stored (10000 in the visible code)
 * is not the value asyncGenerator() receives. Confirm that this is intended.
 */
273 millisSendPoll = 30000;
274 asyncGenerator(data, parallellism,
275 millisSendPoll, minEventSendPoll, forceSendPoll);
277 asyncDbDisconnect(pNDB);
/*
 * Program entry point, declared through the NDB_COMMAND macro.
 *
 * Visible steps:
 *   1. parse the command line; exit with NDBT_WRONGARGS on failure;
 *   2. print the start banner;
 *   3. initialise data[], laid out as numProcesses slices of `parallellism`
 *      slots (slot j of thread i is data[i*parallellism + j]);
 *   4. start one thread per slice and keep its handle in the slice's first
 *      slot;
 *   5. wait for and destroy every thread;
 *   6. sum the per-slot generator statistics into `stats` and print them;
 *   7. leave through NDBT_ProgramExit(rc).
 *
 * NOTE(review): local declarations, the allocation of data[], several
 * arguments and closing braces are missing from this view.
 */
282 NDB_COMMAND(DbAsyncGenerator,
"DbAsyncGenerator",
283 "DbAsyncGenerator",
"DbAsyncGenerator", 65535)
295 if(parse_args(argc,argv) != 0){
297 return NDBT_ProgramExit(NDBT_WRONGARGS);
301 ndbout_c(
"\nStarting Test with %d process(es) for %d %s parallellism %d",
307 ndbout_c(
" WarmUp/coolDown = %d sec", numWarmSeconds);
/* Fill every slot of every thread's slice before the thread is created. */
311 for(i = 0; i < numProcesses; i++) {
312 for(j = 0; j<parallellism; j++){
/* The warm-up length is also used as the cool-down length. */
313 data[i*parallellism+j].warmUpSeconds = numWarmSeconds;
314 data[i*parallellism+j].testSeconds = numSeconds;
315 data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
/*
 * NOTE(review): slots with the same i+j receive the same seed whenever the
 * millisecond tick has not advanced between them.
 */
316 data[i*parallellism+j].randomSeed =
317 NdbTick_CurrentMillisecond()+i+j;
318 data[i*parallellism+j].changedTime = 0;
319 data[i*parallellism+j].
runState = Runnable;
321 sprintf(threadName,
"AsyncThread[%d]", i);
/* The thread receives the address of the first slot of its slice. */
322 pThread = NdbThread_Create(threadRoutine,
323 (
void**)&data[i*parallellism],
326 NDB_THREAD_PRIO_LOW);
/* NOTE(review): both comparisons test for a null handle; one would suffice. */
327 if(pThread != 0 && pThread != NULL){
328 (&data[i*parallellism])->pThread = pThread;
330 perror(
"Failed to create thread");
/*
 * Join phase. NOTE(review): what is stored in pThread when creation failed
 * is not visible here; confirm that waiting on such a slot is safe.
 */
340 for(i = 0; i < numProcesses; i++) {
341 NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
342 NdbThread_Destroy(&data[i*parallellism].pThread);
345 ndbout_c(
"All threads have finished");
/* Clear the aggregate before summing. */
350 stats.totalTransactions = 0;
351 stats.outerTps = 0.0;
353 for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
354 stats.transactions[
i].count = 0;
355 stats.transactions[
i].branchExecuted = 0;
356 stats.transactions[
i].rollbackExecuted = 0;
357 stats.transactions[
i].latency.reset();
/* Add up the generator statistics of every slot of every thread. */
363 for(i = 0; i < numProcesses; i++) {
364 for(k = 0; k<parallellism; k++){
365 p = &data[i*parallellism+k].generator;
367 stats.totalTransactions += p->totalTransactions;
368 stats.outerTps += p->outerTps;
370 for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
371 stats.transactions[j].count +=
372 p->transactions[j].count;
373 stats.transactions[j].branchExecuted +=
374 p->transactions[j].branchExecuted;
375 stats.transactions[j].rollbackExecuted +=
376 p->transactions[j].rollbackExecuted;
377 stats.transactions[j].latency +=
378 p->transactions[j].latency;
383 print_stats(
"Test Results",
/*
 * NOTE(review): the argument-error path above returns the value of
 * NDBT_ProgramExit(); no `return` is visible on this final call.
 */
392 NDBT_ProgramExit(rc);