19 #include <ndb_global.h>
21 #include "TransporterRegistry.hpp"
22 #include "TransporterDefinitions.hpp"
23 #include "TransporterCallback.hpp"
24 #include <RefConvert.hpp>
// Base TCP port. createTCPTransporter() adds 0, 1 or 2 to it depending on
// which pair of nodes the transporter connects.
31 int basePortTCP = 17000;
// Configuration templates, one per transporter type. main() passes the address
// of the template matching the <type> argument to the selected
// create*Transporter function.
// NOTE(review): the initializer contents are not visible in this view.
33 SCI_TransporterConfiguration sciTemplate = {
48 TCP_TransporterConfiguration tcpTemplate = {
62 SHM_TransporterConfiguration shmTemplate = {
// Signal handler: re-installs itself for signal number 13 and prints the
// number of the delivered signal to ndbout.
// NOTE(review): 13 is presumably SIGPIPE on POSIX systems — confirm.
// signo: number of the signal that was delivered.
79 signalHandler(
int signo){
80 ::signal(13, signalHandler);
82 sprintf(buf,
"Signal: %d\n", signo);
83 ndbout << buf << endl;
// Prints the command-line usage text for this test program to ndbout.
// progName: program name inserted into the "Usage:" line.
87 usage(
const char * progName){
88 ndbout <<
"Usage: " << progName <<
" <type> localNodeId localHostName"
89 <<
" remoteHostName1 remoteHostName2" << endl;
90 ndbout <<
"  type = shm tcp ose sci" << endl;
91 ndbout <<
"  localNodeId - 1 to 3" << endl;
// Function-pointer type for the create*Transporter factories. main() stores
// the selected factory in a variable of this type and calls it once per peer.
// NOTE(review): two parameters of the typedef are not visible in this view;
// the forward declarations below show them as two NodeId values.
94 typedef void (* CreateTransporterFunc)(
void * conf,
97 const char * localHostName,
98 const char * remoteHostName);
// Forward declarations of the per-type factories selected in main() from the
// <type> command-line argument.
100 void createSCITransporter(
void *, NodeId, NodeId,
const char *,
const char *);
101 void createTCPTransporter(
void *, NodeId, NodeId,
const char *,
const char *);
102 void createSHMTransporter(
void *, NodeId, NodeId,
const char *,
const char *);
// Per-node counters indexed by node id: incremented in execute(), summed in
// main()'s polling loop and read in reportDisconnect().
104 int signalReceived[4];
// Test driver entry point. Reads the transporter type, local node id and host
// names from the command line, selects the matching factory and configuration
// template, creates transporters to the peers, then polls the registry until
// the summed receive counters reach 2*noOfConnections, and finally
// disconnects and deletes the registry.
// NOTE(review): this view of the function is not contiguous; argument-count
// checks, the creation of tReg, the declaration of sum and several guards are
// not visible here.
107 main(
int argc,
const char **argv){
// Reset the per-node receive counters.
111 for(
int i = 0;
i<4;
i++)
112 signalReceived[
i] = 0;
119 Uint32 noOfConnections = 0;
// Positional arguments: <type> localNodeId localHostName remoteHost1 [remoteHost2].
120 const char * progName = argv[0];
121 const char *
type = argv[1];
122 const NodeId localNodeId = atoi(argv[2]);
123 const char * localHostName = argv[3];
124 const char * remoteHost1 = argv[4];
125 const char * remoteHost2 = NULL;
131 remoteHost2 = argv[5];
// Only node ids 1..3 are accepted.
134 if(localNodeId < 1 || localNodeId > 3){
135 ndbout <<
"localNodeId = " << localNodeId << endl << endl;
// Print a summary of the parsed configuration.
140 ndbout <<
"-----------------" << endl;
141 ndbout <<
"localNodeId:           " << localNodeId << endl;
142 ndbout <<
"localHostName:         " << localHostName << endl;
143 ndbout <<
"remoteHost1 (node " << (localNodeId == 1?2:1) <<
"): "
144 << remoteHost1 << endl;
145 if(noOfConnections == 2){
146 ndbout <<
"remoteHost2 (node " << (localNodeId == 3?2:3) <<
"): "
147 << remoteHost2 << endl;
149 ndbout <<
"-----------------" << endl;
151 void * confTemplate = 0;
152 CreateTransporterFunc func = 0;
// Pick the factory and configuration template from the type name
// (compared case-insensitively).
154 if(strcasecmp(type,
"tcp") == 0){
155 func = createTCPTransporter;
156 confTemplate = &tcpTemplate;
157 }
else if(strcasecmp(type,
"sci") == 0){
158 func = createSCITransporter;
159 confTemplate = &sciTemplate;
160 }
else if(strcasecmp(type,
"shm") == 0){
161 func = createSHMTransporter;
162 confTemplate = &shmTemplate;
164 ndbout <<
"Unsupported transporter type" << endl;
168 ndbout <<
"Creating transporter registry" << endl;
170 tReg->init(localNodeId);
// Create one transporter per peer through the selected factory.
// NOTE(review): the three groups below are presumably selected by the value
// of localNodeId (1, 2 or 3); the guards are not visible in this view.
174 (* func)(confTemplate, 1, 2, localHostName, remoteHost1);
175 if(noOfConnections == 2)
176 (* func)(confTemplate, 1, 3, localHostName, remoteHost2);
179 (* func)(confTemplate, 2, 1, localHostName, remoteHost1);
180 if(noOfConnections == 2)
181 (* func)(confTemplate, 2, 3, localHostName, remoteHost2);
184 (* func)(confTemplate, 3, 1, localHostName, remoteHost1);
185 if(noOfConnections == 2)
186 (* func)(confTemplate, 3, 2, localHostName, remoteHost2);
190 ndbout <<
"Doing startSending/startReceiving" << endl;
194 ndbout <<
"Connecting" << endl;
195 tReg->setPerformState(PerformConnect);
196 tReg->checkConnections();
// Polling loop: sum the per-node receive counters, let the registry check its
// connections, sleep 500 ms, and repeat until sum equals 2*noOfConnections.
201 for(
int i = 0;
i<4;
i++)
202 sum += signalReceived[
i];
204 tReg->checkConnections();
207 NdbSleep_MilliSleep(500);
209 ndbout <<
"In main loop" << endl;
210 }
while(sum != 2*noOfConnections);
// Shutdown: request disconnect, check connections once more, wait three
// seconds, then delete the registry.
212 ndbout <<
"Doing setPerformState(Disconnect)" << endl;
213 tReg->setPerformState(PerformDisconnect);
215 ndbout <<
"Doing checkConnections()" << endl;
216 tReg->checkConnections();
218 ndbout <<
"Sleeping 3 secs" << endl;
219 NdbSleep_SecSleep(3);
221 ndbout <<
"Deleting transporter registry" << endl;
222 delete tReg; tReg = 0;
// Validates a received signal against the pattern written by sendSignalTo():
// header fields are compared with fixed offsets from the expected length, the
// data words are walked with the same running XOR sequence, and a section (if
// present) is compared with the signal data.
// NOTE(review): the assignment of expectedLength from prio, the data-word
// comparison and the action taken on each mismatch are not visible in this
// view.
228 checkData(
SignalHeader *
const header, Uint8 prio, Uint32 *
const theData,
230 Uint32 expectedLength = 0;
236 if(header->theLength != expectedLength){
237 ndbout <<
"Unexpected signal length: " << header->theLength
238 <<
" expected: " << expectedLength << endl;
// Header fields are expected at fixed offsets from expectedLength.
242 if(header->theVerId_signalNumber != expectedLength + 1)
245 if(header->theReceiversBlockNumber != expectedLength + 2)
248 if(refToBlock(header->theSendersBlockRef) != expectedLength + 3)
251 if(header->theSendersSignalId != expectedLength + 5)
254 if(header->theTrace != expectedLength + 6)
// Section count and fragment info are derived from prio.
257 if(header->m_noOfSections != (prio == 0 ? 0 : 1))
260 if(header->m_fragmentInfo != (prio + 1))
// Walk the data words; the expected value starts at theLength and is advanced
// with an XOR step on each iteration.
263 Uint32 dataWordStart = header->theLength ;
264 for(
unsigned i = 0;
i<header->theLength;
i++){
266 ndbout <<
"data corrupt!\n" << endl;
269 dataWordStart ^= (~
i*
i);
// Section check: size must equal theLength and the section bytes must equal
// the signal data (sz is multiplied by 4 for the byte count).
273 ndbout_c(
"Found section");
274 if(ptr[0].sz != header->theLength)
277 if(memcmp(ptr[0].p, theData, (ptr[0].sz * 4)) != 0)
// Builds a test signal for the given priority and hands it to the registry via
// tReg->prepareSend(). The length is 17 words for prio 0 and 19 otherwise;
// every other header field is derived from the length or from prio.
// nodeId: destination node passed to prepareSend().
// prio:   signal priority; also selects length, section count and fragment info.
// NOTE(review): the declarations of sh, theData and ptr, the store into
// theData inside the loop and the test on the send status are not visible in
// this view.
283 sendSignalTo(NodeId nodeId,
int prio){
285 sh.theLength = (prio == 0 ? 17 : 19);
286 sh.theVerId_signalNumber = sh.theLength + 1;
287 sh.theReceiversBlockNumber = sh.theLength + 2;
288 sh.theSendersBlockRef = sh.theLength + 3;
289 sh.theSendersSignalId = sh.theLength + 4;
290 sh.theSignalId = sh.theLength + 5;
291 sh.theTrace = sh.theLength + 6;
292 sh.m_noOfSections = (prio == 0 ? 0 : 1);
293 sh.m_fragmentInfo = prio + 1;
// Running value starting at theLength, advanced with an XOR step per word.
297 Uint32 dataWordStart = sh.theLength;
298 for(
unsigned i = 0;
i<sh.theLength;
i++){
300 dataWordStart ^= (~
i*
i);
302 ndbout <<
"Sending prio " << (int)prio <<
" signal to node: "
304 <<
" gsn = " << sh.theVerId_signalNumber << endl;
// The single section points at the signal data itself, with the same length.
307 ptr[0].p = &theData[0];
308 ptr[0].sz = sh.theLength;
310 SendStatus s = tReg->
prepareSend(&sh, prio, theData, nodeId, ptr);
312 ndbout <<
"Send was not ok. Send was: " << s << endl;
// Receive callback: derives the sending node from the sender block reference,
// logs the signal, validates it with checkData() and increments that node's
// entry in signalReceived.
// NOTE(review): the conditions that choose between replying with a prio-1
// signal and requesting a disconnect are not visible in this view.
317 execute(
void* callbackObj,
318 SignalHeader *
const header, Uint8 prio, Uint32 *
const theData,
320 const NodeId nodeId = refToNode(header->theSendersBlockRef);
322 ndbout <<
"Recieved prio " << (int)prio <<
" signal from node: "
324 <<
" gsn = " << header->theVerId_signalNumber << endl;
325 checkData(header, prio, theData, ptr);
326 ndbout <<
" Data is ok!\n" << endl;
328 signalReceived[nodeId]++;
331 sendSignalTo(nodeId, 1);
333 tReg->setPerformState(nodeId, PerformDisconnect);
// copy(): only the first parameter (insertPtr, a reference to a Uint32
// pointer) is visible in this view; the rest of the signature and the body
// are outside it.
337 copy(Uint32 * & insertPtr,
// Error callback: logs the node id and error code (code printed in hex) and,
// when bit 0x8000 of the error code is set, requests a disconnect of that node.
// NOTE(review): the meaning of the 0x8000 bit is not shown in this file —
// confirm against the TransporterError definition.
343 reportError(
void* callbackObj, NodeId nodeId, TransporterError errorCode){
345 sprintf(buf,
"reportError (%d, %x)", nodeId, errorCode);
346 ndbout << buf << endl;
347 if(errorCode & 0x8000){
348 tReg->setPerformState(nodeId, PerformDisconnect);
// Send-statistics callback: logs the node id and bytes/count (integer
// division, truncated to Uint32).
// NOTE(review): divides by count; a count of 0 would be a division by zero —
// confirm callers never pass 0.
357 reportSendLen(
void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
359 sprintf(buf,
"reportSendLen(%d, %d)", nodeId, (Uint32)(bytes/count));
360 ndbout << buf << endl;
// Receive-statistics callback: logs the node id and bytes/count (integer
// division, truncated to Uint32).
// NOTE(review): divides by count; a count of 0 would be a division by zero —
// confirm callers never pass 0.
367 reportReceiveLen(
void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
369 sprintf(buf,
"reportReceiveLen(%d, %d)", nodeId, (Uint32)(bytes/count));
370 ndbout << buf << endl;
// Connect callback: logs the node id, switches that node to PerformIO and
// sends it a prio-0 test signal.
377 reportConnect(
void* callbackObj, NodeId nodeId){
379 sprintf(buf,
"reportConnect(%d)", nodeId);
380 ndbout << buf << endl;
381 tReg->setPerformState(nodeId, PerformIO);
383 sendSignalTo(nodeId, 0);
// Disconnect callback: logs the node id and, while fewer than two signals have
// been counted for that node, puts it back into PerformConnect.
// errNo is not used in the lines visible here.
390 reportDisconnect(
void* callbackObj, NodeId nodeId, Uint32 errNo){
392 sprintf(buf,
"reportDisconnect(%d)", nodeId);
393 ndbout << buf << endl;
394 if(signalReceived[nodeId] < 2)
395 tReg->setPerformState(nodeId, PerformConnect);
// Creates an OSE transporter: logs the endpoints, fills the node ids and host
// names into the configuration passed as _conf, and registers it with
// tReg->createTransporter().
// The configuration object behind _conf is modified in place.
// NOTE(review): the localNodeId/remoteNodeId parameters and the test on res
// that selects the Success/Failure message are not visible in this view.
408 createOSETransporter(
void * _conf,
411 const char * localHostName,
412 const char * remoteHostName){
413 ndbout <<
"Creating OSE transporter from node "
414 << localNodeId <<
"(" << localHostName <<
") to "
415 << remoteNodeId <<
"(" << remoteHostName <<
")..." << endl;;
417 OSE_TransporterConfiguration * conf = (OSE_TransporterConfiguration*)_conf;
419 conf->localNodeId = localNodeId;
420 conf->localHostName = localHostName;
421 conf->remoteNodeId = remoteNodeId;
422 conf->remoteHostName = remoteHostName;
423 bool res = tReg->createTransporter(conf);
425 ndbout <<
"... -- Success " << endl;
427 ndbout <<
"... -- Failure " << endl;
// Creates a TCP transporter: logs the endpoints, picks a port for the node
// pair, fills the node ids and host names into the configuration passed as
// _conf, and registers it with tReg->createTransporter().
// The configuration object behind _conf is modified in place; main() passes
// the address of the global tcpTemplate.
// NOTE(review): the declaration of port, the place where it is stored into
// conf, and the test on res are not visible in this view.
431 createTCPTransporter(
void * _conf,
434 const char * localHostName,
435 const char * remoteHostName){
436 ndbout <<
"Creating TCP transporter from node "
437 << localNodeId <<
"(" << localHostName <<
") to "
438 << remoteNodeId <<
"(" << remoteHostName <<
")..." << endl;;
440 TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
// Port per unordered node pair, so both ends of a link compute the same value:
// {1,2} -> base+0, {1,3} -> base+1, {2,3} -> base+2.
443 if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
444 if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
445 if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
446 if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
447 if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
448 if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
450 conf->localNodeId = localNodeId;
451 conf->localHostName = localHostName;
452 conf->remoteNodeId = remoteNodeId;
453 conf->remoteHostName = remoteHostName;
455 bool res = tReg->createTransporter(conf);
457 ndbout <<
"... -- Success " << endl;
459 ndbout <<
"... -- Failure " << endl;
// Creates an SCI transporter: logs the endpoints, fills the configuration
// passed as _conf and registers it with tReg->createTransporter().
// For this transporter type the two host-name arguments are parsed as integers
// with atoi() and stored as SCI node ids.
// The configuration object behind _conf is modified in place; main() passes
// the address of the global sciTemplate.
// NOTE(review): the test on res that selects the Success/Failure message is
// not visible in this view.
463 createSCITransporter(
void * _conf,
466 const char * localHostName,
467 const char * remoteHostName){
470 ndbout <<
"Creating SCI transporter from node "
471 << localNodeId <<
"(" << localHostName <<
") to "
472 << remoteNodeId <<
"(" << remoteHostName <<
")..." << endl;;
475 SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
// NOTE(review): localHostName is stored into remoteSciNodeId0 and
// remoteHostName into remoteSciNodeId1 — confirm this mapping is intended.
477 conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
478 conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
481 conf->localNodeId = localNodeId;
482 conf->remoteNodeId = remoteNodeId;
484 bool res = tReg->createTransporter(conf);
486 ndbout <<
"... -- Success " << endl;
488 ndbout <<
"... -- Failure " << endl;
// Creates an SHM transporter: logs the endpoints, fills the node ids into the
// configuration passed as _conf and registers it with
// tReg->createTransporter(). In the lines visible here the host names are used
// only for the log message.
// The configuration object behind _conf is modified in place; main() passes
// the address of the global shmTemplate.
// NOTE(review): the test on res that selects the Success/Failure message is
// not visible in this view.
492 createSHMTransporter(
void * _conf,
495 const char * localHostName,
496 const char * remoteHostName){
499 ndbout <<
"Creating SHM transporter from node "
500 << localNodeId <<
"(" << localHostName <<
") to "
501 << remoteNodeId <<
"(" << remoteHostName <<
")..." << endl;;
504 SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
506 conf->localNodeId = localNodeId;
507 conf->remoteNodeId = remoteNodeId;
509 bool res = tReg->createTransporter(conf);
511 ndbout <<
"... -- Success " << endl;
513 ndbout <<
"... -- Failure " << endl;