18 #include <ndb_global.h>
21 #include <TransporterRegistry.hpp>
22 #include <TransporterCallback.hpp>
23 #include <RefConvert.hpp>
// Cap on unpack iterations: the unpack loops in this file only continue while
// loop_count < MAX_RECEIVED_SIGNALS.
// NOTE(review): a Uint32 variable and a macro of the same name both appear
// here; presumably a preprocessor conditional that is not visible in this
// extract selects exactly one of them -- confirm against the full file.
26 Uint32 MAX_RECEIVED_SIGNALS = 1024;
28 #define MAX_RECEIVED_SIGNALS 1024
// TransporterRegistry::unpack, byte-count variant.
// Walks the receive buffer at readPtr one message at a time while at least
// 4 + sizeof(Protocol6) bytes remain in sizeOfData and loop_count is below
// MAX_RECEIVED_SIGNALS. Messages are handed to callbackObj->deliver_signal();
// afterwards readPtr, sizeOfData and usedData are advanced by the message
// length.
// NOTE(review): this extract is missing lines (remaining parameters, closing
// braces, error/return paths, the loop_count update). The comments below
// describe only what the visible lines do.
32 TransporterRegistry::unpack(Uint32 * readPtr,
40 Uint32 loop_count = 0;
// Path taken when state is NoHalt or HaltOutput: no receiver-block filter is
// visible before deliver_signal() in this loop.
42 if(state == NoHalt || state == HaltOutput){
43 while ((sizeOfData >= 4 +
sizeof(
Protocol6)) &&
44 (loop_count < MAX_RECEIVED_SIGNALS)) {
// The first three words are the header words that are later decoded by
// Protocol6::createSignalHeader().
45 Uint32 word1 = readPtr[0];
46 Uint32 word2 = readPtr[1];
47 Uint32 word3 = readPtr[2];
// word1 carries the message length in 32-bit words; << 2 converts it to bytes.
56 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
57 const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
// Zero-length or oversized message (the handling lines are not visible here).
59 if(messageLenBytes == 0 || messageLenBytes > MAX_RECV_MESSAGE_BYTESIZE){
60 DEBUG(
"Message Size = " << messageLenBytes);
// Fewer bytes buffered than the message needs (handling not visible here).
65 if (sizeOfData < messageLenBytes) {
// Optional checksum: the last word of the message is compared with a checksum
// computed over all the words that precede it.
69 if(Protocol6::getCheckSumIncluded(word1)){
70 const Uint32 tmpLen = messageLen32 - 1;
71 const Uint32 checkSumSent = readPtr[tmpLen];
72 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
74 if(checkSumComputed != checkSumSent){
// Compressed flag set in word1 (handling not visible here).
81 if(Protocol6::getCompressed(word1)){
86 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
// Replace the sender block value with numberToRef(value, remoteNodeId).
88 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
89 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
90 signalHeader.theSendersBlockRef = sBlockNum;
92 Uint8 prio = Protocol6::getPrio(word1);
// Payload starts directly after the three header words.
94 Uint32 * signalData = &readPtr[3];
// Without an included signal id the sender's id is set to ~0.
// NOTE(review): the next assignment is presumably the else branch, reading the
// id from the first payload word; the else line and any advance of signalData
// are not visible in this extract.
96 if(Protocol6::getSignalIdIncluded(word1) == 0){
97 signalHeader.theSendersSignalId = ~0;
99 signalHeader.theSendersSignalId = * signalData;
102 signalHeader.theSignalId= ~0;
// Layout after the payload: theLength data words, then one size word per
// section, then the section data itself.
104 Uint32 * sectionPtr = signalData + signalHeader.theLength;
105 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
// Point each section descriptor at its data inside the receive buffer.
// NOTE(review): the lines that store sz and advance sectionPtr/sectionData are
// not visible here.
106 for(Uint32
i = 0;
i<signalHeader.m_noOfSections;
i++){
107 Uint32 sz = * sectionPtr;
109 ptr[
i].p = sectionData;
115 callbackObj->
deliver_signal(&signalHeader, prio, signalData, ptr);
// Step past the consumed message (readPtr in words, the counters in bytes).
117 readPtr += messageLen32;
118 sizeOfData -= messageLenBytes;
119 usedData += messageLenBytes;
// NOTE(review): presumably the else branch of the state check above (the else
// line is not visible). Same parsing as the first loop, but only signals whose
// receiver block number is 252 reach deliver_signal(); the DEBUG line further
// down reports the others as discarded.
126 while ((sizeOfData >= 4 +
sizeof(
Protocol6)) &&
127 (loop_count < MAX_RECEIVED_SIGNALS)) {
128 Uint32 word1 = readPtr[0];
129 Uint32 word2 = readPtr[1];
130 Uint32 word3 = readPtr[2];
139 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
140 const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
141 if(messageLenBytes == 0 || messageLenBytes > MAX_RECV_MESSAGE_BYTESIZE){
142 DEBUG(
"Message Size = " << messageLenBytes);
147 if (sizeOfData < messageLenBytes) {
151 if(Protocol6::getCheckSumIncluded(word1)){
152 const Uint32 tmpLen = messageLen32 - 1;
153 const Uint32 checkSumSent = readPtr[tmpLen];
154 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
156 if(checkSumComputed != checkSumSent){
165 if(Protocol6::getCompressed(word1)){
170 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
172 Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
// Only receiver block 252 is accepted on this path.
174 if(rBlockNum == 252){
175 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
176 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
177 signalHeader.theSendersBlockRef = sBlockNum;
179 Uint8 prio = Protocol6::getPrio(word1);
181 Uint32 * signalData = &readPtr[3];
183 if(Protocol6::getSignalIdIncluded(word1) == 0){
184 signalHeader.theSendersSignalId = ~0;
186 signalHeader.theSendersSignalId = * signalData;
190 Uint32 * sectionPtr = signalData + signalHeader.theLength;
191 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
192 for(Uint32
i = 0;
i<signalHeader.m_noOfSections;
i++){
193 Uint32 sz = * sectionPtr;
195 ptr[
i].p = sectionData;
201 callbackObj->
deliver_signal(&signalHeader, prio, signalData, ptr);
// Presumably the else branch of the block-number test: log the dropped message.
203 DEBUG(
"prepareReceive(...) - Discarding message to block: "
204 << rBlockNum <<
" from Node: " << remoteNodeId);
// The message is stepped over after the delivered/discarded handling above.
207 readPtr += messageLen32;
208 sizeOfData -= messageLenBytes;
209 usedData += messageLenBytes;
// TransporterRegistry::unpack, end-pointer variant.
// Same message walk as the byte-count variant, but the loop runs while
// readPtr < eodPtr (and loop_count < MAX_RECEIVED_SIGNALS), the length check
// is done in words, and only readPtr is advanced after each message.
// NOTE(review): this extract is missing lines (remaining parameters, closing
// braces, error/return paths, the loop_count update). The comments below
// describe only what the visible lines do.
218 TransporterRegistry::unpack(Uint32 * readPtr,
224 Uint32 loop_count = 0;
// Path taken when state is NoHalt or HaltOutput: no receiver-block filter is
// visible before deliver_signal() in this loop.
225 if(state == NoHalt || state == HaltOutput){
226 while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
// Header words later decoded by Protocol6::createSignalHeader().
227 Uint32 word1 = readPtr[0];
228 Uint32 word2 = readPtr[1];
229 Uint32 word3 = readPtr[2];
237 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
// Zero-length or oversized message; the byte limit is shifted right by 2 to
// compare against a length in words (handling lines not visible here).
239 if(messageLen32 == 0 ||
240 messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))
242 DEBUG(
"Message Size(words) = " << messageLen32);
// Optional checksum: last word of the message versus a checksum computed over
// the preceding words.
247 if(Protocol6::getCheckSumIncluded(word1)){
248 const Uint32 tmpLen = messageLen32 - 1;
249 const Uint32 checkSumSent = readPtr[tmpLen];
250 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
252 if(checkSumComputed != checkSumSent){
// Compressed flag set in word1 (handling not visible here).
259 if(Protocol6::getCompressed(word1)){
264 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
// Replace the sender block value with numberToRef(value, remoteNodeId).
266 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
267 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
268 signalHeader.theSendersBlockRef = sBlockNum;
270 Uint8 prio = Protocol6::getPrio(word1);
// Payload starts directly after the three header words.
272 Uint32 * signalData = &readPtr[3];
// Without an included signal id the sender's id is set to ~0.
// NOTE(review): the next assignment is presumably the else branch; the else
// line is not visible in this extract.
274 if(Protocol6::getSignalIdIncluded(word1) == 0){
275 signalHeader.theSendersSignalId = ~0;
277 signalHeader.theSendersSignalId = * signalData;
// theLength data words, then one size word per section, then section data.
281 Uint32 * sectionPtr = signalData + signalHeader.theLength;
282 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
283 for(Uint32
i = 0;
i<signalHeader.m_noOfSections;
i++){
284 Uint32 sz = * sectionPtr;
286 ptr[
i].p = sectionData;
292 callbackObj->
deliver_signal(&signalHeader, prio, signalData, ptr);
// Step to the next message (messageLen32 is a word count).
294 readPtr += messageLen32;
// NOTE(review): presumably the else branch of the state check above (the else
// line is not visible). Only signals whose receiver block number is 252 reach
// deliver_signal(); the DEBUG line further down reports the others as
// discarded.
299 while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
300 Uint32 word1 = readPtr[0];
301 Uint32 word2 = readPtr[1];
302 Uint32 word3 = readPtr[2];
310 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
311 if(messageLen32 == 0 ||
312 messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))
314 DEBUG(
"Message Size(words) = " << messageLen32);
319 if(Protocol6::getCheckSumIncluded(word1)){
320 const Uint32 tmpLen = messageLen32 - 1;
321 const Uint32 checkSumSent = readPtr[tmpLen];
322 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
324 if(checkSumComputed != checkSumSent){
333 if(Protocol6::getCompressed(word1)){
338 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
340 Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
// Only receiver block 252 is accepted on this path.
342 if(rBlockNum == 252){
343 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
344 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
345 signalHeader.theSendersBlockRef = sBlockNum;
347 Uint8 prio = Protocol6::getPrio(word1);
349 Uint32 * signalData = &readPtr[3];
351 if(Protocol6::getSignalIdIncluded(word1) == 0){
352 signalHeader.theSendersSignalId = ~0;
354 signalHeader.theSendersSignalId = * signalData;
358 Uint32 * sectionPtr = signalData + signalHeader.theLength;
359 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
360 for(Uint32
i = 0;
i<signalHeader.m_noOfSections;
i++){
361 Uint32 sz = * sectionPtr;
363 ptr[
i].p = sectionData;
369 callbackObj->
deliver_signal(&signalHeader, prio, signalData, ptr);
// Presumably the else branch of the block-number test: log the dropped message.
371 DEBUG(
"prepareReceive(...) - Discarding message to block: "
372 << rBlockNum <<
" from Node: " << remoteNodeId);
// Step to the next message (messageLen32 is a word count).
375 readPtr += messageLen32;
// Packer constructor: records whether signal ids and checksums are used and
// precomputes the parts of header word 1 that are set here: byte order,
// signal-id flag, checksum flag and compressed = 0.
// NOTE(review): braces and some lines are missing from this extract.
381 Packer::Packer(
bool signalId,
bool checksum) {
// Stored as 0/1 so the pack routines can add them directly into the message
// length.
383 checksumUsed = (checksum ? 1 : 0);
384 signalIdUsed = (signalId ? 1 : 0);
// Template for word 1; the pack routines copy it into their own word1 and then
// set prio and message length on that copy.
388 preComputedWord1 = 0;
389 Protocol6::setByteOrder(preComputedWord1, MY_OWN_BYTE_ORDER);
390 Protocol6::setSignalIdIncluded(preComputedWord1, signalIdUsed);
391 Protocol6::setCheckSumIncluded(preComputedWord1, checksumUsed);
392 Protocol6::setCompressed(preComputedWord1, 0);
// Body fragment of a section-copy helper: copies ptr.sz 32-bit words (4 * sz
// bytes) from ptr.p to insertPtr.
// NOTE(review): the function header is not visible in this extract; presumably
// this is the import() helper called by the first pack routine -- confirm. Any
// advance of insertPtr after the copy is also not visible.
398 const Uint32 sz = ptr.sz;
399 memcpy(insertPtr, ptr.p, 4 * sz);
// Body fragment of an iterator-based section copy: starting with
// remain = ptr.sz words, fetches runs of words from ptr.sectionIter via
// getNextWords(len) and memcpy's each run (4 * len bytes) to insertPtr.
// NOTE(review): the function header, the loop construct and the updates of
// remain/insertPtr are not visible in this extract; presumably this is the
// importGeneric() helper called by the third pack routine -- confirm.
407 Uint32 remain= ptr.sz;
412 const Uint32* next= ptr.sectionIter->getNextWords(len);
// A returned run must be non-null and must not exceed what is still expected.
414 assert(len <= remain);
415 assert(next != NULL);
417 memcpy(insertPtr, next, 4 * len);
// Check that the iterator is exhausted.
// NOTE(review): the getNextWords() call sits inside assert(), so it is not
// executed at all when NDEBUG is defined.
425 assert(ptr.sectionIter->getNextWords(remain) == NULL);
// Start of a declaration of copy(); it takes the insert pointer by reference.
// NOTE(review): the remaining parameters are not visible in this extract. The
// second pack routine calls it as copy(tmpInserPtr, thePool, ptr[i]).
428 void copy(Uint32 * & insertPtr,
// Pack routine that copies sections with import().
// Builds one message at insertPtr: three header words, an optional signal id,
// dataLen32 payload words, one size word per section, the section data, and an
// optional checksum.
// NOTE(review): the function header is not visible in this extract; presumably
// a Packer::pack overload, since it reads checksumUsed, signalIdUsed and
// preComputedWord1, which the Packer constructor sets. Several lines (the
// len32 declaration, loop bodies, conditions, braces) are also missing.
435 const Uint32 * theData,
439 Uint32 dataLen32 = header->theLength;
440 Uint32 no_segs = header->m_noOfSections;
// Message length in words before section data: payload + one size word per
// section + optional checksum/signal-id words + the header (sizeof(Protocol6)/4).
443 dataLen32 + no_segs +
444 checksumUsed + signalIdUsed + (
sizeof(
Protocol6)/4);
// NOTE(review): loop body not visible; presumably adds each section's size to
// the total length -- confirm.
447 for(i = 0; i<no_segs; i++){
// Start from the precomputed flags, then add prio and total length.
454 Uint32 word1 = preComputedWord1;
458 Protocol6::setPrio(word1, prio);
459 Protocol6::setMessageLength(word1, len32);
460 Protocol6::createProtocol6Header(word1, word2, word3, header);
462 insertPtr[0] = word1;
463 insertPtr[1] = word2;
464 insertPtr[2] = word3;
466 Uint32 * tmpInserPtr = &insertPtr[3];
// NOTE(review): presumably guarded by signalIdUsed and followed by a pointer
// advance; neither line is visible here.
469 * tmpInserPtr = header->theSignalId;
473 memcpy(tmpInserPtr, theData, 4 * dataLen32);
// Section size words follow the payload.
475 tmpInserPtr += dataLen32;
476 for(i = 0; i<no_segs; i++){
477 tmpInserPtr[
i] = ptr[
i].sz;
// Section data follows the size words.
480 tmpInserPtr += no_segs;
481 for(i = 0; i<no_segs; i++){
482 import(tmpInserPtr, ptr[
i]);
// Checksum covers every word of the message except the checksum word itself.
// NOTE(review): presumably guarded by checksumUsed; the condition is not
// visible here.
486 * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
// Pack routine that copies sections with copy(..., thePool, ...).
// Same message layout as the import()-based routine: three header words, an
// optional signal id, dataLen32 payload words, one size word per section, the
// section data, and an optional checksum.
// NOTE(review): the function header is not visible in this extract; presumably
// a Packer::pack overload, since it reads checksumUsed, signalIdUsed and
// preComputedWord1, which the Packer constructor sets. Several lines (the
// len32 declaration, loop bodies, conditions, braces) are also missing.
494 const Uint32 * theData,
499 Uint32 dataLen32 = header->theLength;
500 Uint32 no_segs = header->m_noOfSections;
// Message length in words before section data: payload + one size word per
// section + optional checksum/signal-id words + the header (sizeof(Protocol6)/4).
503 dataLen32 + no_segs +
504 checksumUsed + signalIdUsed + (
sizeof(
Protocol6)/4);
// NOTE(review): loop body not visible; presumably adds each section's size to
// the total length -- confirm.
506 for(i = 0; i<no_segs; i++){
// Start from the precomputed flags, then add prio and total length.
513 Uint32 word1 = preComputedWord1;
517 Protocol6::setPrio(word1, prio);
518 Protocol6::setMessageLength(word1, len32);
519 Protocol6::createProtocol6Header(word1, word2, word3, header);
521 insertPtr[0] = word1;
522 insertPtr[1] = word2;
523 insertPtr[2] = word3;
525 Uint32 * tmpInserPtr = &insertPtr[3];
// NOTE(review): presumably guarded by signalIdUsed and followed by a pointer
// advance; neither line is visible here.
528 * tmpInserPtr = header->theSignalId;
532 memcpy(tmpInserPtr, theData, 4 * dataLen32);
// Section size words follow the payload.
534 tmpInserPtr += dataLen32;
535 for(i = 0; i<no_segs; i++){
536 tmpInserPtr[
i] = ptr[
i].sz;
// Section data follows the size words.
539 tmpInserPtr += no_segs;
540 for(i = 0; i<no_segs; i++){
541 copy(tmpInserPtr, thePool, ptr[i]);
// Checksum covers every word of the message except the checksum word itself.
// NOTE(review): presumably guarded by checksumUsed; the condition is not
// visible here.
545 * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
// Pack routine that copies sections with importGeneric().
// Same message layout as the other pack routines: three header words, an
// optional signal id, dataLen32 payload words, one size word per section, the
// section data, and an optional checksum.
// NOTE(review): the function header is not visible in this extract; presumably
// a Packer::pack overload, since it reads checksumUsed, signalIdUsed and
// preComputedWord1, which the Packer constructor sets. Several lines (the
// len32 declaration, loop bodies, conditions, braces) are also missing.
554 const Uint32 * theData,
558 Uint32 dataLen32 = header->theLength;
559 Uint32 no_segs = header->m_noOfSections;
// Message length in words before section data: payload + one size word per
// section + optional checksum/signal-id words + the header (sizeof(Protocol6)/4).
562 dataLen32 + no_segs +
563 checksumUsed + signalIdUsed + (
sizeof(
Protocol6)/4);
// NOTE(review): loop body not visible; presumably adds each section's size to
// the total length -- confirm.
566 for(i = 0; i<no_segs; i++){
// Start from the precomputed flags, then add prio and total length.
573 Uint32 word1 = preComputedWord1;
577 Protocol6::setPrio(word1, prio);
578 Protocol6::setMessageLength(word1, len32);
579 Protocol6::createProtocol6Header(word1, word2, word3, header);
581 insertPtr[0] = word1;
582 insertPtr[1] = word2;
583 insertPtr[2] = word3;
585 Uint32 * tmpInsertPtr = &insertPtr[3];
// NOTE(review): presumably guarded by signalIdUsed and followed by a pointer
// advance; neither line is visible here.
588 * tmpInsertPtr = header->theSignalId;
592 memcpy(tmpInsertPtr, theData, 4 * dataLen32);
// Section size words follow the payload.
594 tmpInsertPtr += dataLen32;
595 for(i = 0; i<no_segs; i++){
596 tmpInsertPtr[
i] = ptr[
i].sz;
// Section data follows the size words.
599 tmpInsertPtr += no_segs;
600 for(i = 0; i<no_segs; i++){
601 importGeneric(tmpInsertPtr, ptr[i]);
// Checksum covers every word of the message except the checksum word itself.
// NOTE(review): presumably guarded by checksumUsed; the condition is not
// visible here.
605 * tmpInsertPtr = computeChecksum(&insertPtr[0], len32-1);
// Scans the messages starting at readPtr and accumulates their lengths (in
// 32-bit words, read from each message's first word) into wordLength, without
// letting the running total exceed maxWords.
// NOTE(review): the return type, braces and the tail of the function
// (presumably a break after the bounds check and a return of wordLength) are
// not visible in this extract.
617 TransporterRegistry::unpack_length_words(
const Uint32 *readPtr, Uint32 maxWords)
619 Uint32 wordLength = 0;
// NOTE(review): wordLength and maxWords are word counts, yet the margin
// 4 + sizeof(Protocol6) is the same expression the byte-count unpack() compares
// against a byte count -- the units look mixed; confirm whether a word-based
// margin was intended.
621 while (wordLength + 4 +
sizeof(
Protocol6) <= maxWords)
623 Uint32 word1 = readPtr[wordLength];
624 Uint16 messageLen32 = Protocol6::getMessageLength(word1);
// The next message would run past maxWords (handling line not visible here).
625 if (wordLength + messageLen32 > maxWords)
627 wordLength += messageLen32;