19 #include <ndb_global.h>
21 #include "TransporterRegistry.hpp"
22 #include "TransporterDefinitions.hpp"
23 #include "TransporterCallback.hpp"
24 #include <RefConvert.hpp>
26 #include "prioTransporterTest.hpp"
// Base TCP port; createTCPTransporter() picks basePortTCP + 0, + 1 or + 2
// depending on the pair of node ids being connected.
33 int basePortTCP = 17000;
// Template configurations, one per transporter type. prioTransporterTest()
// passes the address of the matching template to the create*Transporter()
// helper it selects. (The initializer contents are not visible in this excerpt.)
35 SCI_TransporterConfiguration sciTemplate = {
51 SHM_TransporterConfiguration shmTemplate = {
62 TCP_TransporterConfiguration tcpTemplate = {
// Signal handler: re-installs itself for signal number 13 and prints the
// number of the signal that was delivered.
82 signalHandler(
int signo){
// NOTE(review): 13 is presumably SIGPIPE on the intended platforms -- confirm;
// the symbolic constant would be clearer than the literal.
83 ::signal(13, signalHandler);
// Format the message into buf (declared on a line not visible here) and print it.
85 sprintf(buf,
"Signal: %d\n", signo);
86 ndbout << buf << endl;
// Prints the command-line synopsis of the test program to ndbout.
// progName: program name shown at the start of the usage line.
90 usage(
const char * progName){
// Synopsis line: mandatory arguments followed by the optional ones.
91 ndbout <<
"Usage: " << progName <<
" localNodeId localHostName"
93 <<
" [<loop count>] [<send buf size>] [<recv buf size>]" << endl;
// Explanation of the accepted localNodeId values.
94 ndbout <<
" localNodeId - {1,2}" << endl;
// Function-pointer type for the create*Transporter() helpers.
// prioTransporterTest() stores one of them in a variable of this type and
// calls it with the configuration template, the two node ids, the two host
// names and the send/receive buffer sizes. (Only part of the parameter list
// is visible in this excerpt.)
97 typedef void (* CreateTransporterFunc)(
void * conf,
100 const char * localHostName,
101 const char * remoteHostName,
// Fills in an SCI transporter configuration and registers it with tReg.
// _conf is treated as a SCI_TransporterConfiguration* and modified in place.
106 createSCITransporter(
void * _conf,
109 const char * localHostName,
110 const char * remoteHostName,
// Progress message naming both endpoints as "<node id>(<host name>)".
115 ndbout <<
"Creating SCI transporter from node "
116 << localNodeId <<
"(" << localHostName <<
") to "
117 << remoteNodeId <<
"(" << remoteHostName <<
")..." << endl;;
120 SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
// The host-name strings are parsed as integers and stored as the two SCI node ids.
// NOTE(review): remoteSciNodeId0 is derived from localHostName -- confirm this is intended.
122 conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
123 conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
126 conf->localNodeId = localNodeId;
127 conf->remoteNodeId = remoteNodeId;
// Register the transporter. The branch choosing between the two messages
// below is not visible here; presumably it tests res.
129 bool res = tReg->createTransporter(conf);
131 ndbout <<
"... -- Success " << endl;
133 ndbout <<
"... -- Failure " << endl;
// Fills in a shared-memory (SHM) transporter configuration and registers it with tReg.
// _conf is treated as a SHM_TransporterConfiguration* and modified in place.
137 createSHMTransporter(
void * _conf,
140 const char * localHostName,
141 const char * remoteHostName,
// Progress message naming both endpoints as "<node id>(<host name>)".
146 ndbout <<
"Creating SHM transporter from node "
147 << localNodeId <<
"(" << localHostName <<
") to "
148 << remoteNodeId <<
"(" << remoteHostName <<
")..." << endl;;
151 SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
// In the visible lines only the node ids are written; the host names are used just for the message.
154 conf->localNodeId = localNodeId;
155 conf->remoteNodeId = remoteNodeId;
// Register the transporter. The branch choosing between the two messages
// below is not visible here; presumably it tests res.
157 bool res = tReg->createTransporter(conf);
159 ndbout <<
"... -- Success " << endl;
161 ndbout <<
"... -- Failure " << endl;
// Fills in a TCP transporter configuration and registers it with tReg.
// _conf is treated as a TCP_TransporterConfiguration* and modified in place.
166 createTCPTransporter(
void * _conf,
169 const char * localHostName,
170 const char * remoteHostName,
// Progress message naming both endpoints as "<node id>(<host name>)".
173 ndbout <<
"Creating TCP transporter from node "
174 << localNodeId <<
"(" << localHostName <<
") to "
175 << remoteNodeId <<
"(" << remoteHostName <<
")..." << endl;;
177 TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
// Port selection is symmetric per node pair, so both ends agree:
// {1,2} -> base + 0, {1,3} -> base + 1, {2,3} -> base + 2.
// (How `port` is stored into the configuration is not visible in this excerpt.)
180 if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
181 if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
182 if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
183 if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
184 if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
185 if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
// Apply the caller-supplied buffer sizes.
// NOTE(review): these assignments may be guarded by conditions on lines not visible here -- confirm.
188 conf->sendBufferSize = sendBuf;
191 conf->maxReceiveSize = recvBuf;
// Echo the buffer sizes now held in the configuration.
194 ndbout <<
"\tSendBufferSize: " << conf->sendBufferSize << endl;
195 ndbout <<
"\tReceiveBufferSize: " << conf->maxReceiveSize << endl;
// The host-name pointers are stored as given, not copied.
197 conf->localNodeId = localNodeId;
198 conf->localHostName = localHostName;
199 conf->remoteNodeId = remoteNodeId;
200 conf->remoteHostName = remoteHostName;
// Register the transporter. The branch choosing between the two messages
// below is not visible here; presumably it tests res.
202 bool res = tReg->createTransporter(conf);
204 ndbout <<
"... -- Success " << endl;
206 ndbout <<
"... -- Failure " << endl;
// Per-phase bookkeeping fields (part of TestPhase; the struct header and the
// remaining fields are not visible in this excerpt).
// Incremented in execute() for each signal that arrives.
213 int noOfSignalReceived;
// Microsecond timestamps around the priority-0 ("PrioA") signal, and the
// accumulated difference between them.
217 NDB_TICKS startTimePrioA;
218 NDB_TICKS stopTimePrioA;
219 NDB_TICKS totTimePrioA;
// Bytes sent at priority 1 before the priority-0 signal; client() adds signalSize << 2 per signal.
220 int bytesSentBeforePrioA;
// Totals accumulated by reportSendLen() / reportReceiveLen().
223 Uint64 sendLenBytes, sendCount;
224 Uint64 recvLenBytes, recvCount;
// Test specification table, one row per phase. The array declaration is not visible.
// NOTE(review): presumably the first two values are signalSize and noOfSignals,
// and the zeros initialise the counters/timers -- confirm against the struct definition.
228 { 1, 10, 0,0, 0,0,0,0,0,0,0 }
229 ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 }
230 ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 }
231 ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 }
233 ,{ 8, 10, 0,0, 0,0,0,0,0,0,0 }
234 ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 }
235 ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 }
236 ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 }
238 ,{ 16, 10, 0,0, 0,0,0,0,0,0,0 }
239 ,{ 16, 100, 0,0, 0,0,0,0,0,0,0 }
240 ,{ 16, 1000, 0,0, 0,0,0,0,0,0,0 }
241 ,{ 16, 10000, 0,0, 0,0,0,0,0,0,0 }
243 ,{ 24, 10, 0,0, 0,0,0,0,0,0,0 }
244 ,{ 24, 100, 0,0, 0,0,0,0,0,0,0 }
245 ,{ 24, 1000, 0,0, 0,0,0,0,0,0,0 }
246 ,{ 24, 10000, 0,0, 0,0,0,0,0,0,0 }
// Rows whose first value is 0: printReport() labels a signalSize of 0 as "rand".
248 ,{ 0, 10, 0,0, 0,0,0,0,0,0,0 }
249 ,{ 0, 100, 0,0, 0,0,0,0,0,0,0 }
250 ,{ 0, 1000, 0,0, 0,0,0,0,0,0,0 }
251 ,{ 0, 10000, 0,0, 0,0,0,0,0,0,0 }
// Number of phases, derived from the table size so it tracks the rows above.
254 const int noOfTests =
sizeof(testSpec)/
sizeof(
TestPhase);
// Builds one signal and hands it to tReg->prepareSend().
// nodeId:     destination node.
// signalSize: payload length in 32-bit words.
// prio:       priority passed through to prepareSend().
// Returns whatever prepareSend() returns (callers compare it with SEND_OK).
257 sendSignalTo(NodeId nodeId,
int signalSize,
int prio){
// Pick a random length in 1..25 words.
// NOTE(review): presumably guarded by a `signalSize == 0` test on a line not visible here -- confirm.
259 signalSize = (rand() % 25) + 1;
262 sh.theLength = signalSize;
// The remaining header fields are filled with arbitrary values.
263 sh.theVerId_signalNumber = rand();
264 sh.theReceiversBlockNumber = rand();
265 sh.theSendersBlockRef = rand();
266 sh.theSendersSignalId = rand();
267 sh.theSignalId = rand();
268 sh.theTrace = rand();
// Fill the payload with values derived from each element's index and address.
// NOTE(review): casting a pointer to Uint32 loses bits (and may be rejected by
// the compiler) where pointers are wider than 32 bits -- verify on 64-bit builds.
271 for(
int i = 0;
i<signalSize;
i++)
272 theData[
i] = (
i+1) * (Uint32)(&theData[
i]);
274 return tReg->
prepareSend(&sh, prio, theData, nodeId);
// Column headings of the result table; the code below produces one row per phase.
// (Presumably these lines belong to a separate header-printing routine whose
// signature is not visible in this excerpt.)
279 ndbout <<
"#Sigs\tSz\tPayload\tTime\tSig/sec\tBps\t"
280 <<
"s len\tr len\tprioAtime\tbytesb4pA" << endl;
// Report for one test phase `p`: average time per loop, signal and byte
// throughput, average send/receive lengths and the priority-A figures.
// Average loop time, split into whole seconds and leftover milliseconds
// (accTime is accumulated from millisecond timestamps in client()).
286 Uint32 secs = (p.accTime/p.loopCount)/1000;
287 Uint32 mill = (p.accTime/p.loopCount)%1000;
// Formatted as seconds with two decimals, or as plain milliseconds.
290 sprintf(st,
"%d.%.2ds", secs, (mill/10));
292 sprintf(st,
"%dms", mill);
// Signals per second and bytes per second over all loops. 4000 = 1000 ms/s
// times 4 bytes per word; the extra 3 words per signal are presumably the header.
// NOTE(review): both divisions fault if accTime is 0 -- confirm it cannot be.
295 Uint32 sps = (1000*p.noOfSignals*p.loopCount)/p.accTime;
296 Uint32 bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(p.signalSize+3));
// signalSize == 0 denotes random-size signals (reported as "rand" below), so
// estimate with 13 payload words. The result must be assigned to bps; as a
// bare expression statement it was computed and thrown away.
297 if(p.signalSize == 0)
298 bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(13+3));
// Render sps with an "M" or "k" suffix, or plain.
303 sprintf(ssps,
"%dM", (
int)sps);
304 }
else if(sps > 1000){
306 sprintf(ssps,
"%dk", (
int)sps);
308 sprintf(ssps,
"%d", (
int)sps);
// Same rendering for bps.
314 sprintf(sbps,
"%dM", bps);
317 sprintf(sbps,
"%dk", bps);
319 sprintf(sbps,
"%d", bps);
// Fixed-size phases print the numeric size and payload columns. The average
// send/receive lengths guard against a zero count.
323 if(p.signalSize != 0){
325 "%d\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d",
332 (
int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
333 (
int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
334 (
int)(p.totTimePrioA / p.loopCount),
335 (
int)(p.bytesSentBeforePrioA));
// Random-size phases print "rand" / "4*rand" in place of the size columns.
338 "%d\trand\t4*rand\t%s\t%s\t%s\t%d\t%d\t%d\t%d",
343 (
int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
344 (
int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
345 (
int)(p.totTimePrioA / p.loopCount),
346 (
int)(p.bytesSentBeforePrioA));
349 ndbout << buf << endl;
// File-level state shared by client(), the server loop and the transporter callbacks.
357 NDB_TICKS startSec=0;
// True when this process acts as the echo client; execute() checks it before
// recording priority-A timings.
364 bool isClient =
false;
365 bool isConnected =
false;
366 bool isStarted =
false;
// Index into allPhases of the phase currently being run.
367 int currentPhase = 0;
// Overall start/stop timestamps; reportConnect() sets startTime.
370 NDB_TICKS startTime, stopTime;
// Client side of the echo test: runs the phases in allPhases against
// remoteNodeId, sending each phase's signals and timing the loops.
// (Loop headers and several branches are on lines not visible in this excerpt.)
373 client(NodeId remoteNodeId){
// Start from a fresh copy of the static test specification.
377 memcpy(allPhases, testSpec,
sizeof(testSpec));
382 TestPhase * current = &allPhases[currentPhase];
// A loop iteration is finished once every signal has been sent and received back.
383 if(current->noOfSignals == current->noOfSignalSent &&
384 current->noOfSignals == current->noOfSignalReceived){
// Accumulate the elapsed milliseconds of this iteration.
389 current->stopTime = NdbTick_CurrentMillisecond();
390 current->accTime += (current->stopTime - current->startTime);
392 NdbSleep_MilliSleep(500 / loopCount);
// Restart the timer and the counters for the next iteration.
394 current->startTime = NdbTick_CurrentMillisecond();
396 current->noOfSignalSent = 0;
397 current->noOfSignalReceived = 0;
399 current->loopCount ++;
// After loopCount iterations the phase is complete: print its report.
400 if(current->loopCount == loopCount){
402 printReport(allPhases[currentPhase]);
// currentPhase reaching noOfTests means every phase has been run
// (the increment itself is on a line not visible here).
405 if(currentPhase == noOfTests){
411 NdbSleep_MilliSleep(500);
412 current = &allPhases[currentPhase];
413 current->startTime = NdbTick_CurrentMillisecond();
416 int signalsLeft = current->noOfSignals - current->noOfSignalSent;
// Send all but the last outstanding signal at priority 1, counting the bytes
// (signalSize words, 4 bytes each) that go out ahead of the priority-0 signal.
418 for(; signalsLeft > 1; signalsLeft--){
419 if(sendSignalTo(remoteNodeId, current->signalSize, 1) == SEND_OK) {
420 current->noOfSignalSent++;
422 current->bytesSentBeforePrioA += (current->signalSize << 2);
// Take a microsecond timestamp, then send the final signal at priority 0;
// execute() uses startTimePrioA when that signal comes back.
// `&micro` restores the argument that had been mis-decoded as the character 'µ'.
433 int ret = NdbTick_CurrentMicrosecond(&sec,&micro);
435 current->startTimePrioA = micro + sec*1000000;
436 if(sendSignalTo(remoteNodeId, current->signalSize, 0) == SEND_OK) {
437 current->noOfSignalSent++;
// Check connection state on every 10th value of counter.
447 if(counter % 10 == 0)
448 tReg->checkConnections();
// Presumably the body of the echo-server loop (its function header is not
// visible in this excerpt).
// Total number of signals expected: the sum over all phases, times loopCount.
459 for(
int i = 0; i<noOfTests; i++)
460 signalToEcho += testSpec[i].noOfSignals;
462 signalToEcho *= loopCount;
// Keep servicing connections until the expected number of signals is handled.
// (The statements that decrement signalToEcho and the body of the inner
// 10-iteration loop are not visible here.)
464 while(signalToEcho > 0){
465 tReg->checkConnections();
466 for(
int i = 0; i<10; i++)
// Entry point of the priority transporter test.
// tt:       transporter type to exercise; selects the create function and template below.
// progName: program name (presumably passed to usage() on bad arguments -- not visible here).
// argc/argv: command line; argv[1] = local node id, argv[2] = local host name,
//            argv[3] = remote host name, argv[4..6] = loop count and send/receive
//            buffer sizes.
472 prioTransporterTest(TestType tt,
const char * progName,
473 int argc,
const char **argv){
491 const NodeId localNodeId = atoi(argv[1]);
492 const char * localHostName = argv[2];
493 const char * remoteHost1 = argv[3];
// Optional arguments.
// NOTE(review): presumably each is guarded by an argc check on a line not visible here -- confirm.
496 loopCount = atoi(argv[4]);
498 sendBufSz = atoi(argv[5]);
500 recvBufSz = atoi(argv[6]);
// Only node ids 1 and 2 are accepted.
502 if(localNodeId < 1 || localNodeId > 2){
503 ndbout <<
"localNodeId = " << localNodeId << endl << endl;
// Banner naming the role of this process (the selecting condition is not visible).
509 ndbout <<
"-- ECHO CLIENT --" << endl;
511 ndbout <<
"-- ECHO SERVER --" << endl;
// Echo the effective settings; the remote node id is "the other one" of {1,2}.
513 ndbout <<
"localNodeId: " << localNodeId << endl;
514 ndbout <<
"localHostName: " << localHostName << endl;
515 ndbout <<
"remoteHost1 (node " << (localNodeId == 1?2:1) <<
"): "
516 << remoteHost1 << endl;
517 ndbout <<
"Loop count: " << loopCount << endl;
518 ndbout <<
"-----------------" << endl;
// Choose the create function and the configuration template for the requested
// transporter type (the switch/case lines are not visible in this excerpt).
520 void * confTemplate = 0;
521 CreateTransporterFunc func = 0;
524 func = createTCPTransporter;
525 confTemplate = &tcpTemplate;
528 func = createSCITransporter;
529 confTemplate = &sciTemplate;
532 func = createSHMTransporter;
533 confTemplate = &shmTemplate;
536 ndbout <<
"Unsupported transporter type" << endl;
540 ndbout <<
"Creating transporter registry" << endl;
542 tReg->init(localNodeId);
// Create the transporter towards the peer: (1 -> 2) or (2 -> 1).
// Presumably selected by localNodeId on lines not visible here.
546 (* func)(confTemplate, 1, 2, localHostName, remoteHost1,
547 sendBufSz, recvBufSz);
550 (* func)(confTemplate, 2, 1, localHostName, remoteHost1,
551 sendBufSz, recvBufSz);
555 ndbout <<
"Doing startSending/startReceiving" << endl;
// Ask the registry to connect and poll once.
559 ndbout <<
"Connecting" << endl;
560 tReg->setPerformState(PerformConnect);
561 tReg->checkConnections();
// Shutdown sequence: wait, request disconnect, poll once more, destroy the registry.
570 ndbout <<
"Sleep 3 secs" << endl;
571 NdbSleep_SecSleep(3);
573 ndbout <<
"Doing setPerformState(Disconnect)" << endl;
574 tReg->setPerformState(PerformDisconnect);
576 ndbout <<
"Doing checkConnections()" << endl;
577 tReg->checkConnections();
579 ndbout <<
"Deleting transporter registry" << endl;
// Null the pointer after deletion so later use is detectable.
580 delete tReg; tReg = 0;
// Writes the fields of signal header `sh` to `out`, one labelled field per line.
// (Presumably the body of a stream-output operator for SignalHeader; its
// signature is not visible in this excerpt.)
586 out <<
"-- Signal Header --" << endl;
587 out <<
"theLength: " << sh.theLength << endl;
588 out <<
"gsn: " << sh.theVerId_signalNumber << endl;
589 out <<
"recBlockNo: " << sh.theReceiversBlockNumber << endl;
590 out <<
"sendBlockRef: " << sh.theSendersBlockRef << endl;
591 out <<
"sendersSig: " << sh.theSendersSignalId << endl;
592 out <<
"theSignalId: " << sh.theSignalId << endl;
// theTrace is cast to int before being streamed.
593 out <<
"trace: " << (int)sh.theTrace << endl;
// Transporter callback invoked for each received signal.
// header:  header of the received signal.
// prio:    priority the signal arrived with.
// theData: signal payload.
// Some branch lines are not visible in this excerpt; presumably the client
// side counts the signal and the server side echoes it back.
598 execute(
SignalHeader *
const header, Uint8 prio, Uint32 *
const theData){
// The sender's node id is extracted from the sender block reference.
599 const NodeId nodeId = refToNode(header->theSendersBlockRef);
// Microsecond timestamp of arrival.
// `&micro` restores the argument that had been mis-decoded as the character 'µ'.
602 int ret = NdbTick_CurrentMicrosecond(&sec,&micro);
// A priority-0 signal returning to the client ends the priority-A measurement:
// add the time since client() recorded startTimePrioA.
603 if(prio == 0 && isClient && ret == 0) {
604 allPhases[currentPhase].stopTimePrioA = micro + sec*1000000;
605 allPhases[currentPhase].totTimePrioA +=
606 allPhases[currentPhase].stopTimePrioA -
607 allPhases[currentPhase].startTimePrioA;
// Marks the measurement invalid. The condition guarding this line is not
// visible here; presumably it is the failed-timestamp case.
610 allPhases[currentPhase].totTimePrioA = -1;
613 allPhases[currentPhase].noOfSignalReceived++;
// Echo the signal to its sender, sleeping and retrying until prepareSend() accepts it.
616 while(tReg->
prepareSend(header, prio, theData, nodeId) != SEND_OK){
617 ndbout <<
"Failed to echo" << sleepTime << endl;
618 NdbSleep_MilliSleep(sleepTime);
// Transporter callback for an error on the connection to nodeId.
// Prints the node id and the error code (in hex).
627 reportError(NodeId nodeId, TransporterError errorCode){
629 sprintf(buf,
"reportError (%d, %x) in perfTest", nodeId, errorCode);
630 ndbout << buf << endl;
// Error codes with bit 0x8000 set cause a disconnect request for that node.
631 if(errorCode & 0x8000){
632 tReg->setPerformState(nodeId, PerformDisconnect);
// Transporter callback reporting send statistics: `count` sends totalling
// `bytes` bytes towards nodeId. The totals are added to the current phase
// and used by printReport() to compute the average send length.
640 reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes){
641 allPhases[currentPhase].sendCount += count;
642 allPhases[currentPhase].sendLenBytes += bytes;
// Prints the average send length.
// NOTE(review): bytes/count divides by zero if count is 0; this output may
// also be conditional on lines not visible here -- confirm.
645 ndbout <<
"reportSendLen(" << nodeId <<
", "
646 << (bytes/count) <<
")" << endl;
// Transporter callback reporting receive statistics: `count` receives
// totalling `bytes` bytes from nodeId. The totals are added to the current
// phase and used by printReport() to compute the average receive length.
654 reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes){
655 allPhases[currentPhase].recvCount += count;
656 allPhases[currentPhase].recvLenBytes += bytes;
// Prints the average receive length.
// NOTE(review): bytes/count divides by zero if count is 0; this output may
// also be conditional on lines not visible here -- confirm.
659 ndbout <<
"reportReceiveLen(" << nodeId <<
", "
660 << (bytes/count) <<
")" << endl;
// Transporter callback invoked when the connection to nodeId is established.
668 reportConnect(NodeId nodeId){
670 sprintf(buf,
"reportConnect(%d)", nodeId);
671 ndbout << buf << endl;
// Switch the node to the I/O state so signals can flow.
672 tReg->setPerformState(nodeId, PerformIO);
// Record the start time globally and for the first phase.
// NOTE(review): presumably done only on the first connect -- the guarding
// condition is not visible here.
676 startTime = NdbTick_CurrentMillisecond();
679 allPhases[0].startTime = startTime;
// Set the sent counter equal to the received counter for the current phase.
// NOTE(review): presumably so that signals lost across a reconnect are sent again -- confirm.
684 TestPhase * current = &allPhases[currentPhase];
685 current->noOfSignalSent = current->noOfSignalReceived;
// Transporter callback invoked when the connection to nodeId is lost.
// errNo is not used in the lines visible here.
693 reportDisconnect(NodeId nodeId, Uint32 errNo){
695 sprintf(buf,
"reportDisconnect(%d)", nodeId);
696 ndbout << buf << endl;
// Ask the registry to re-establish the connection.
// NOTE(review): possibly conditional on a line not visible here -- confirm.
699 tReg->setPerformState(nodeId, PerformConnect);