18 #include <ndb_global.h> 
   20 #include <TransporterCallback.hpp> 
   21 #include <TransporterRegistry.hpp> 
   22 #include <FastScheduler.hpp> 
   23 #include <Emulator.hpp> 
   24 #include <ErrorHandlingMacros.hpp> 
   26 #include "LongSignal.hpp" 
   27 #include "LongSignalImpl.hpp" 
   29 #include <signaldata/EventReport.hpp> 
   30 #include <signaldata/TestOrd.hpp> 
   31 #include <signaldata/SignalDroppedRep.hpp> 
   32 #include <signaldata/DisconnectRep.hpp> 
   34 #include "VMSignal.hpp" 
   36 #include "TransporterCallbackKernel.hpp" 
   46 int ErrorSignalReceive= 0;
 
   47 int ErrorMaxSegmentsToSeize= 0;
 
   54 extern bool ErrorImportActive;
 
   58   enum TransporterError err;
 
   64   { TE_NO_ERROR, 
"No error"},
 
   65   { TE_SHM_UNABLE_TO_CREATE_SEGMENT, 
"Unable to create shared memory segment"},
 
   66   { (
enum TransporterError) -1, 
"No connection error message available (please report a bug)"}
 
   69 const char *lookupConnectionError(Uint32 err)
 
   72   while ((Uint32)connectionError[i].err != err && 
 
   73          connectionError[i].err != -1)
 
   75   return connectionError[
i].text;
 
   78 #include <DebuggerNames.hpp> 
   80 #ifndef NDBD_MULTITHREADED 
   89   int checkJobBuffer() { 
return globalScheduler.checkDoJob(); }
 
   90   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
 
   91   Uint32 get_bytes_to_send_iovec(NodeId node, 
struct iovec *dst, Uint32 max)
 
   93     return globalTransporterRegistry.get_bytes_to_send_iovec(node, dst, max);
 
   95   Uint32 bytes_sent(NodeId node, Uint32 bytes)
 
   97     return globalTransporterRegistry.bytes_sent(node, bytes);
 
   99   bool has_data_to_send(NodeId node)
 
  101     return globalTransporterRegistry.has_data_to_send(node);
 
  103   void reset_send_buffer(NodeId node, 
bool should_be_empty)
 
  105     globalTransporterRegistry.reset_send_buffer(node, should_be_empty);
 
  112 #ifdef NDBD_MULTITHREADED 
  113 static SectionSegmentPool::Cache cache(1024,1024);
 
  116 mt_set_section_chunk_size()
 
  118   g_sectionSegmentPool.setChunkSize(256);
 
  122 void mt_set_section_chunk_size(){}
 
  128                                           Uint32 * 
const theData,
 
  132   const Uint32 secCount = header->m_noOfSections;
 
  133   const Uint32 length = header->theLength;
 
  137 #ifdef TRACE_DISTRIBUTED 
  138   ndbout_c(
"recv: %s(%d) from (%s, %d)",
 
  139            getSignalName(header->theVerId_signalNumber), 
 
  140            header->theVerId_signalNumber,
 
  141            getBlockName(refToBlock(header->theSendersBlockRef)),
 
  142            refToNode(header->theSendersBlockRef));
 
  147   bzero(secPtr, 
sizeof(secPtr));
 
  148   secPtr[0].p = secPtr[1].p = secPtr[2].p = 0;
 
  150   ErrorImportActive = 
true;
 
  153     ok &= 
import(SPC_CACHE_ARG secPtr[2], ptr[2].p, ptr[2].sz);
 
  155     ok &= 
import(SPC_CACHE_ARG secPtr[1], ptr[1].p, ptr[1].sz);
 
  157     ok &= 
import(SPC_CACHE_ARG secPtr[0], ptr[0].p, ptr[0].sz);
 
  159   ErrorImportActive = 
false;
 
  164   ok &= (length + secCount <= 25);
 
  171     secPtrI[0] = secPtr[0].i;
 
  172     secPtrI[1] = secPtr[1].i;
 
  173     secPtrI[2] = secPtr[2].i;
 
  175 #ifndef NDBD_MULTITHREADED 
  176     globalScheduler.execute(header, prio, theData, secPtrI);  
 
  179       sendlocal(receiverThreadId,
 
  180                 header, theData, secPtrI);
 
  182       sendprioa(receiverThreadId,
 
  183                 header, theData, secPtrI);
 
  192   for(Uint32 i = 0; i<secCount; i++){
 
  193     if(secPtr[i].p != 0){
 
  194       g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
 
  195                                        relSz(secPtr[i].p->m_sz),
 
  197                                        secPtr[i].p->m_lastSegment);
 
  202   Uint32 gsn = header->theVerId_signalNumber;
 
  203   Uint32 len = header->theLength;
 
  204   Uint32 newLen= (len > 22 ? 22 : len);
 
  205   memmove(rep->originalData, theData, (4 * newLen));
 
  206   rep->originalGsn = gsn;
 
  207   rep->originalLength = len;
 
  208   rep->originalSectionCount = secCount;
 
  209   header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
 
  210   header->theLength = newLen + 3;
 
  211   header->m_noOfSections = 0;
 
  212 #ifndef NDBD_MULTITHREADED 
  213   globalScheduler.execute(header, prio, theData, secPtrI);    
 
  216     sendlocal(receiverThreadId,
 
  217               header, theData, NULL);
 
  219     sendprioa(receiverThreadId,
 
  220               header, theData, NULL);
 
  227   out << 
"[ last= " << ss.m_lastSegment << 
" next= " << ss.nextPool << 
" ]";
 
  233                                        TransporterError errorCode,
 
  236 #ifdef DEBUG_TRANSPORTER 
  237   ndbout_c(
"reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : 
"");
 
  240   DBUG_ENTER(
"reportError");
 
  241   DBUG_PRINT(
"info",(
"nodeId %d  errorCode: 0x%x  info: %s",
 
  242                      nodeId, errorCode, info));
 
  246   case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
 
  250              info ? 
" " : 
"", info ? info : 
"");
 
  251     ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
 
  252                                msg, __FILE__, NST_ErrorHandler);
 
  258              info ? 
" " : 
"", info ? info : 
"");
 
  259     ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
 
  260                                msg, __FILE__, NST_ErrorHandler);
 
  262   case TE_SHM_IPC_PERMANENT:
 
  266              "Remote node id %d.%s%s",
 
  267              nodeId, info ? 
" " : 
"", info ? info : 
"");
 
  268     ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
 
  269                                msg, __FILE__, NST_ErrorHandler);
 
  275   if(errorCode & TE_DO_DISCONNECT){
 
  280   memset(&signal.header, 0, 
sizeof(signal.header));
 
  283   if(errorCode & TE_DO_DISCONNECT)
 
  288   signal.theData[1] = nodeId;
 
  289   signal.theData[2] = errorCode;
 
  291   signal.header.theLength = 3;  
 
  292   signal.header.theSendersSignalId = 0;
 
  293   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
 
  294   signal.header.theReceiversBlockNumber = CMVMI;
 
  295   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
 
  296 #ifndef NDBD_MULTITHREADED 
  298   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 
  300   sendprioa(receiverThreadId,
 
  301             &signal.header, signal.theData, NULL);
 
  310 #ifndef NDBD_MULTITHREADED 
  312 TransporterCallbackKernelNonMT::reportSendLen(NodeId nodeId, Uint32 count,
 
  317   memset(&signal.header, 0, 
sizeof(signal.header));
 
  319   signal.header.theLength = 3;
 
  320   signal.header.theSendersSignalId = 0;
 
  321   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
 
  322   signal.header.theReceiversBlockNumber = CMVMI;
 
  323   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
 
  326   signal.theData[1] = nodeId;
 
  327   signal.theData[2] = Uint32(bytes/count);
 
  330   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 
  343   memset(&signal.header, 0, 
sizeof(signal.header));
 
  345   signal.header.theLength = 3;  
 
  346   signal.header.theSendersSignalId = 0;
 
  347   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
 
  348   signal.header.theReceiversBlockNumber = CMVMI;
 
  349   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
 
  352   signal.theData[1] = nodeId;
 
  353   signal.theData[2] = Uint32(bytes/count);
 
  354 #ifndef NDBD_MULTITHREADED 
  356   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 
  358   sendprioa(receiverThreadId,
 
  359             &signal.header, signal.theData, NULL);
 
  372   memset(&signal.header, 0, 
sizeof(signal.header));
 
  374   signal.header.theLength = 1; 
 
  375   signal.header.theSendersSignalId = 0;
 
  376   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
 
  377   signal.header.theReceiversBlockNumber = CMVMI;
 
  378   signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
 
  380   signal.theData[0] = nodeId;
 
  382 #ifndef NDBD_MULTITHREADED 
  384   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 
  386   sendprioa(receiverThreadId,
 
  387             &signal.header, signal.theData, NULL);
 
  398   DBUG_ENTER(
"reportDisconnect");
 
  401   memset(&signal.header, 0, 
sizeof(signal.header));
 
  403   signal.header.theLength = DisconnectRep::SignalLength; 
 
  404   signal.header.theSendersSignalId = 0;
 
  405   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
 
  406   signal.header.theTrace = TestOrd::TraceDisconnect;
 
  407   signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
 
  408   signal.header.theReceiversBlockNumber = CMVMI;
 
  411   rep->nodeId = nodeId;
 
  414 #ifndef NDBD_MULTITHREADED 
  416   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 
  418   sendprioa(receiverThreadId,
 
  419             &signal.header, signal.theData, NULL);
 
  431   fprintf(output, 
"SECTION %u type=segmented", i);
 
  433     fprintf(output, 
" *** invalid ***\n");
 
  436   const Uint32 len = ptr[
i].sz;
 
  439   fprintf(output, 
" size=%u\n", (
unsigned)len);
 
  441     if (pos > 0 && pos % SectionSegment::DataLength == 0) {
 
  442       ssp = g_sectionSegmentPool.
getPtr(ssp->m_nextSegment);
 
  444     printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);