18 #include <ndb_global.h>
20 #include <TransporterCallback.hpp>
21 #include <TransporterRegistry.hpp>
22 #include <FastScheduler.hpp>
23 #include <Emulator.hpp>
24 #include <ErrorHandlingMacros.hpp>
26 #include "LongSignal.hpp"
27 #include "LongSignalImpl.hpp"
29 #include <signaldata/EventReport.hpp>
30 #include <signaldata/TestOrd.hpp>
31 #include <signaldata/SignalDroppedRep.hpp>
32 #include <signaldata/DisconnectRep.hpp>
34 #include "VMSignal.hpp"
36 #include "TransporterCallbackKernel.hpp"
// Two integer globals, both initialised to 0. Nothing in the visible part of
// this file reads them.
// NOTE(review): the names suggest error-injection knobs used by tests --
// confirm against the code that consumes them.
46 int ErrorSignalReceive= 0;
47 int ErrorMaxSegmentsToSeize= 0;
// Defined in another translation unit. This file sets it to true immediately
// before the section import() calls and back to false right after them.
54 extern bool ErrorImportActive;
// Table pairing transporter error codes with human-readable messages.
// Only the member and initialiser lines are in view here; the struct header
// and the array declaration are not. Each initialiser is { code, text }.
58 enum TransporterError err;
64 { TE_NO_ERROR,
"No error"},
65 { TE_SHM_UNABLE_TO_CREATE_SEGMENT,
"Unable to create shared memory segment"},
// Terminator entry: its code is -1 cast to the enum type.
// lookupConnectionError() stops scanning when it reaches this code, so this
// text is what is returned for any code that has no entry of its own.
66 { (
enum TransporterError) -1,
"No connection error message available (please report a bug)"}
// Returns the message text from connectionError[] for the given error code.
//
// The loop condition keeps scanning while the current entry's code differs
// from `err` and is not the -1 terminator; the text of the entry where the
// scan stops is returned. An unknown code therefore yields the terminator's
// message rather than a null pointer.
//
// NOTE(review): the declaration and increment of `i` are not visible in this
// excerpt -- presumably `i` starts at 0 and advances by one per iteration.
// NOTE(review): the first comparison casts the enum to Uint32 while the second
// compares the enum against -1 without a cast; consider using the same
// explicit cast in both places.
69 const char *lookupConnectionError(Uint32 err)
72 while ((Uint32)connectionError[i].err != err &&
73 connectionError[i].err != -1)
75 return connectionError[
i].text;
78 #include <DebuggerNames.hpp>
// Everything below is compiled only when NDBD_MULTITHREADED is NOT defined.
// NOTE(review): the enclosing class header is not visible; since
// reportSendLen is later defined as TransporterCallbackKernelNonMT::reportSendLen,
// these are presumably inline members of that class -- confirm.
80 #ifndef NDBD_MULTITHREADED
// Returns whatever globalScheduler.checkDoJob() returns.
89 int checkJobBuffer() {
return globalScheduler.checkDoJob(); }
// Declaration only; the definition appears further down in this file.
90 void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
// The four functions below forward their arguments unchanged to the
// same-named member of globalTransporterRegistry and return its result
// (reset_send_buffer returns nothing).
91 Uint32 get_bytes_to_send_iovec(NodeId node,
struct iovec *dst, Uint32 max)
93 return globalTransporterRegistry.get_bytes_to_send_iovec(node, dst, max);
95 Uint32 bytes_sent(NodeId node, Uint32 bytes)
97 return globalTransporterRegistry.bytes_sent(node, bytes);
99 bool has_data_to_send(NodeId node)
101 return globalTransporterRegistry.has_data_to_send(node);
103 void reset_send_buffer(NodeId node,
bool should_be_empty)
105 globalTransporterRegistry.reset_send_buffer(node, should_be_empty);
// Multithreaded build only: a file-local SectionSegmentPool::Cache object
// constructed with the arguments (1024, 1024).
// NOTE(review): the meaning of the two constructor arguments is not visible
// here -- check SectionSegmentPool::Cache.
112 #ifdef NDBD_MULTITHREADED
113 static SectionSegmentPool::Cache cache(1024,1024);
// Multithreaded variant: sets the chunk size of g_sectionSegmentPool to 256.
116 mt_set_section_chunk_size()
118 g_sectionSegmentPool.setChunkSize(256);
// Second definition with an empty body. NOTE(review): the #else line is not
// visible in this excerpt; this is presumably the non-multithreaded variant.
122 void mt_set_section_chunk_size(){}
// Tail of a parameter list; the function's name and the start of its
// signature are not visible in this excerpt. The body uses `header`, `prio`,
// `theData` and `ptr` (up to three section pointers).
//
// Visible behaviour: the signal's sections are imported into section segments
// and, together with the signal, handed to the scheduler / receiving thread.
// The second half of the body releases any imported sections and rewrites the
// signal in place into a GSN_SIGNAL_DROPPED_REP before delivering it.
// NOTE(review): the branch separating the two halves is not visible;
// presumably the second half runs only when `ok` is false -- confirm.
128 Uint32 *
const theData,
132 const Uint32 secCount = header->m_noOfSections;
133 const Uint32 length = header->theLength;
// Optional trace of every received signal: name, number, sender block, node.
137 #ifdef TRACE_DISTRIBUTED
138 ndbout_c(
"recv: %s(%d) from (%s, %d)",
139 getSignalName(header->theVerId_signalNumber),
140 header->theVerId_signalNumber,
141 getBlockName(refToBlock(header->theSendersBlockRef)),
142 refToNode(header->theSendersBlockRef));
// Clear the local section pointers. The explicit assignment of the three `p`
// members repeats what the bzero over the whole array already did.
// NOTE(review): bzero is a legacy interface; memset(secPtr, 0, sizeof(secPtr))
// is the portable equivalent.
147 bzero(secPtr,
sizeof(secPtr));
148 secPtr[0].p = secPtr[1].p = secPtr[2].p = 0;
// ErrorImportActive is held true only for the duration of the imports.
150 ErrorImportActive =
true;
// Import each incoming section; `ok` accumulates the results with &=.
// NOTE(review): the control flow choosing which of these run is not visible;
// the 2, 1, 0 order suggests a fall-through switch on secCount -- confirm.
153 ok &=
import(SPC_CACHE_ARG secPtr[2], ptr[2].p, ptr[2].sz);
155 ok &=
import(SPC_CACHE_ARG secPtr[1], ptr[1].p, ptr[1].sz);
157 ok &=
import(SPC_CACHE_ARG secPtr[0], ptr[0].p, ptr[0].sz);
159 ErrorImportActive =
false;
// The signal is also rejected if data words plus section count exceed 25.
164 ok &= (length + secCount <= 25);
// Collect the section i-values that accompany the signal on delivery.
171 secPtrI[0] = secPtr[0].i;
172 secPtrI[1] = secPtr[1].i;
173 secPtrI[2] = secPtr[2].i;
// Delivery: the non-multithreaded build executes through globalScheduler;
// otherwise the signal goes to receiverThreadId via sendlocal or sendprioa
// (the condition choosing between those two is not visible).
175 #ifndef NDBD_MULTITHREADED
176 globalScheduler.execute(header, prio, theData, secPtrI);
179 sendlocal(receiverThreadId,
180 header, theData, secPtrI);
182 sendprioa(receiverThreadId,
183 header, theData, secPtrI);
// Release every section that was actually imported (non-null p).
192 for(Uint32 i = 0; i<secCount; i++){
193 if(secPtr[i].p != 0){
194 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
195 relSz(secPtr[i].p->m_sz),
197 secPtr[i].p->m_lastSegment);
// Rewrite the signal into a SIGNAL_DROPPED_REP. At most 22 of the original
// data words are kept; the new length is those words plus 3 (the report also
// carries originalGsn, originalLength and originalSectionCount).
// NOTE(review): the declaration of `rep` is not visible; memmove (rather than
// memcpy) suggests rep->originalData may overlap theData -- confirm.
202 Uint32 gsn = header->theVerId_signalNumber;
203 Uint32 len = header->theLength;
204 Uint32 newLen= (len > 22 ? 22 : len);
205 memmove(rep->originalData, theData, (4 * newLen));
206 rep->originalGsn = gsn;
207 rep->originalLength = len;
208 rep->originalSectionCount = secCount;
209 header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
210 header->theLength = newLen + 3;
211 header->m_noOfSections = 0;
// Deliver the report. The section count was just set to 0.
// NOTE(review): the non-multithreaded call still passes secPtrI while the
// other two pass NULL; presumably secPtrI is ignored when m_noOfSections is
// 0 -- confirm, and consider passing the same value in all three calls.
212 #ifndef NDBD_MULTITHREADED
213 globalScheduler.execute(header, prio, theData, secPtrI);
216 sendlocal(receiverThreadId,
217 header, theData, NULL);
219 sendprioa(receiverThreadId,
220 header, theData, NULL);
// Writes the m_lastSegment and nextPool members of `ss` to `out`, wrapped in
// "[ ... ]". The enclosing function header is not visible in this excerpt;
// NOTE(review): presumably a stream-output operator for a section segment.
227 out <<
"[ last= " << ss.m_lastSegment <<
" next= " << ss.nextPool <<
" ]";
// Fragment of the parameter list of reportError (name taken from the
// DBUG_ENTER string below). The body also uses `nodeId` and `info`, whose
// declarations are not visible in this excerpt.
//
// Visible behaviour: some error codes are routed to
// ErrorReporter::handleError; the rest of the body builds a 3-word
// GSN_EVENT_REP for block CMVMI carrying the node id and error code.
// NOTE(review): whether handleError returns, and the default branch of the
// switch, are not visible here.
233 TransporterError errorCode,
236 #ifdef DEBUG_TRANSPORTER
237 ndbout_c(
"reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info :
"");
240 DBUG_ENTER(
"reportError");
// NOTE(review): `info` is passed to %s here without the null guard that every
// other use in this function applies (info ? info : ""); a null `info` would
// reach the formatter unguarded when debug tracing is enabled.
241 DBUG_PRINT(
"info",(
"nodeId %d errorCode: 0x%x info: %s",
242 nodeId, errorCode, info));
// Send buffer full: reported as NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL.
246 case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
250 info ?
" " :
"", info ? info :
"");
251 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
252 msg, __FILE__, NST_ErrorHandler);
// Next branch (its case label is not visible): NDBD_EXIT_SIGNAL_LOST.
258 info ?
" " :
"", info ? info :
"");
259 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
260 msg, __FILE__, NST_ErrorHandler);
// Shared-memory IPC failure: reported as NDBD_EXIT_CONNECTION_SETUP_FAILED.
// The message names the remote node and appends `info` when it is non-null.
262 case TE_SHM_IPC_PERMANENT:
266 "Remote node id %d.%s%s",
267 nodeId, info ?
" " :
"", info ? info :
"");
268 ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
269 msg, __FILE__, NST_ErrorHandler);
// The TE_DO_DISCONNECT bit of errorCode is tested twice; the statements it
// guards are not visible in this excerpt.
275 if(errorCode & TE_DO_DISCONNECT){
280 memset(&signal.header, 0,
sizeof(signal.header));
283 if(errorCode & TE_DO_DISCONNECT)
// Event report payload: word 1 = node id, word 2 = error code.
// (The assignment of word 0 is not visible here.)
288 signal.theData[1] = nodeId;
289 signal.theData[2] = errorCode;
291 signal.header.theLength = 3;
292 signal.header.theSendersSignalId = 0;
293 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
294 signal.header.theReceiversBlockNumber = CMVMI;
295 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
// Deliver at priority JBA through globalScheduler in the non-multithreaded
// build; otherwise hand the signal to receiverThreadId via sendprioa.
296 #ifndef NDBD_MULTITHREADED
298 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
300 sendprioa(receiverThreadId,
301 &signal.header, signal.theData, NULL);
// Non-multithreaded build only.
// Builds a 3-word GSN_EVENT_REP addressed to block CMVMI and executes it at
// priority JBA through globalScheduler. Word 1 is the node id; word 2 is
// bytes / count truncated to 32 bits. The rest of the signature, the
// declaration of `signal`/`secPtr` and the assignment of word 0 are not
// visible in this excerpt.
// NOTE(review): bytes/count divides by the caller-supplied `count`; a count
// of 0 would be a division by zero -- confirm callers never pass 0, or guard.
310 #ifndef NDBD_MULTITHREADED
312 TransporterCallbackKernelNonMT::reportSendLen(NodeId nodeId, Uint32 count,
317 memset(&signal.header, 0,
sizeof(signal.header));
319 signal.header.theLength = 3;
320 signal.header.theSendersSignalId = 0;
321 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
322 signal.header.theReceiversBlockNumber = CMVMI;
323 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
326 signal.theData[1] = nodeId;
327 signal.theData[2] = Uint32(bytes/count);
330 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
// Body of a second report function whose signature is not visible in this
// excerpt. It mirrors reportSendLen: a 3-word GSN_EVENT_REP for block CMVMI
// with word 1 = node id and word 2 = bytes / count truncated to 32 bits.
// NOTE(review): presumably the receive-side counterpart (the event type
// stored in word 0 is not visible) -- confirm.
// NOTE(review): bytes/count divides by `count`; a count of 0 would be a
// division by zero -- confirm callers never pass 0, or guard.
343 memset(&signal.header, 0,
sizeof(signal.header));
345 signal.header.theLength = 3;
346 signal.header.theSendersSignalId = 0;
347 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
348 signal.header.theReceiversBlockNumber = CMVMI;
349 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
352 signal.theData[1] = nodeId;
353 signal.theData[2] = Uint32(bytes/count);
// Deliver at priority JBA through globalScheduler in the non-multithreaded
// build; otherwise hand the signal to receiverThreadId via sendprioa.
354 #ifndef NDBD_MULTITHREADED
356 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
358 sendprioa(receiverThreadId,
359 &signal.header, signal.theData, NULL);
// Body of a function whose signature is not visible in this excerpt.
// Builds a 1-word GSN_CONNECT_REP addressed to block CMVMI; the single data
// word is the node id. The sender block reference is block 0 on this node
// (globalData.ownId).
372 memset(&signal.header, 0,
sizeof(signal.header));
374 signal.header.theLength = 1;
375 signal.header.theSendersSignalId = 0;
376 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
377 signal.header.theReceiversBlockNumber = CMVMI;
378 signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
380 signal.theData[0] = nodeId;
// Deliver at priority JBA through globalScheduler in the non-multithreaded
// build; otherwise hand the signal to receiverThreadId via sendprioa.
382 #ifndef NDBD_MULTITHREADED
384 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
386 sendprioa(receiverThreadId,
387 &signal.header, signal.theData, NULL);
// Body of reportDisconnect (name taken from the DBUG_ENTER string; the
// signature is not visible in this excerpt).
// Builds a GSN_DISCONNECT_REP of DisconnectRep::SignalLength words addressed
// to block CMVMI, with the header's trace field set to
// TestOrd::TraceDisconnect, and stores the node id in the report.
// NOTE(review): the declaration of `rep` and any other DisconnectRep fields
// that are filled in are not visible here.
398 DBUG_ENTER(
"reportDisconnect");
401 memset(&signal.header, 0,
sizeof(signal.header));
403 signal.header.theLength = DisconnectRep::SignalLength;
404 signal.header.theSendersSignalId = 0;
405 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
406 signal.header.theTrace = TestOrd::TraceDisconnect;
407 signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
408 signal.header.theReceiversBlockNumber = CMVMI;
411 rep->nodeId = nodeId;
// Deliver at priority JBA through globalScheduler in the non-multithreaded
// build; otherwise hand the signal to receiverThreadId via sendprioa.
414 #ifndef NDBD_MULTITHREADED
416 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
418 sendprioa(receiverThreadId,
419 &signal.header, signal.theData, NULL);
// Body of a function that prints segmented section number `i` to `output`;
// its signature is not visible in this excerpt.
// Output starts with "SECTION <i> type=segmented", followed by either
// " *** invalid ***" or " size=<len>" and then the section's data words.
// NOTE(review): the condition that selects the "invalid" branch, the loop
// header over `pos` and the initialisation of `ssp` are not visible here.
431 fprintf(output,
"SECTION %u type=segmented", i);
433 fprintf(output,
" *** invalid ***\n");
436 const Uint32 len = ptr[
i].sz;
439 fprintf(output,
" size=%u\n", (
unsigned)len);
// Each segment holds SectionSegment::DataLength words: whenever `pos` reaches
// a non-zero multiple of that, follow m_nextSegment to the next segment.
441 if (pos > 0 && pos % SectionSegment::DataLength == 0) {
442 ssp = g_sectionSegmentPool.
getPtr(ssp->m_nextSegment);
// Print the word at this position within the current segment.
444 printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);