18 #include <ndb_global.h> 
   20 #include <TransporterRegistry.hpp> 
   21 #include "TransporterInternalDefinitions.hpp" 
   23 #include "Transporter.hpp" 
   24 #include <SocketAuthenticator.hpp> 
   26 #ifdef NDB_TCP_TRANSPORTER 
   27 #include "TCP_Transporter.hpp" 
   28 #include "Loopback_Transporter.hpp" 
   31 #ifdef NDB_SCI_TRANSPORTER 
   32 #include "SCI_Transporter.hpp" 
   35 #ifdef NDB_SHM_TRANSPORTER 
   36 #include "SHM_Transporter.hpp" 
   37 extern int g_ndb_shm_signum;
 
   43 #include <InputStream.hpp> 
   44 #include <OutputStream.hpp> 
   46 #include <mgmapi/mgmapi.h> 
   47 #include <mgmapi_internal.h> 
   48 #include <mgmapi/mgmapi_debug.h> 
   50 #include <EventLogger.hpp> 
   56   return theTransporters[node_id]->m_connect_address;
 
   61   DBUG_ENTER(
"SocketServer::Session * TransporterService::newSession");
 
   62   if (m_auth && !m_auth->server_authenticate(sockfd)){
 
   63     NDB_CLOSE_SOCKET(sockfd);
 
   70     NDB_CLOSE_SOCKET(sockfd);
 
   78                                          bool use_default_send_buffer,
 
   79                                          unsigned _maxTransporters,
 
   80                                          unsigned sizeOfLongSignalMemory) :
 
   84   m_use_default_send_buffer(use_default_send_buffer),
 
   85   m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
 
   86   m_total_max_send_buffer(0)
 
   88   DBUG_ENTER(
"TransporterRegistry::TransporterRegistry");
 
   90   maxTransporters = _maxTransporters;
 
   98   theTransporterTypes = 
new TransporterType   [maxTransporters];
 
   99   theTransporters     = 
new Transporter     * [maxTransporters];
 
  100   performStates       = 
new PerformState      [maxTransporters];
 
  101   ioStates            = 
new IOState           [maxTransporters]; 
 
  102   m_disconnect_errnum = 
new int               [maxTransporters];
 
  103   m_error_states      = 
new ErrorState        [maxTransporters];
 
  105   m_has_extra_wakeup_socket = 
false;
 
  106 #if defined(HAVE_EPOLL_CREATE) 
  108  m_epoll_events       = 
new struct epoll_event[maxTransporters];
 
  109  m_epoll_fd = epoll_create(maxTransporters);
 
  110  if (m_epoll_fd == -1 || !m_epoll_events)
 
  113    perror(
"Failed to alloc epoll-array or calling epoll_create... falling back to select!");
 
  114    if (m_epoll_fd != -1)
 
  121      delete [] m_epoll_events;
 
  127    memset((
char*)m_epoll_events, 0,
 
  128           maxTransporters * 
sizeof(
struct epoll_event));
 
  134   m_blocked_with_data.clear();
 
  135   m_blocked_disconnected.clear();
 
  139   nTCPTransporters = 0;
 
  140   nSCITransporters = 0;
 
  141   nSHMTransporters = 0;
 
  144   ErrorState default_error_state = { TE_NO_ERROR, (
const char *)~(UintPtr)0 };
 
  145   for (
unsigned i=0; 
i<maxTransporters; 
i++) {
 
  146     theTCPTransporters[
i] = NULL;
 
  147     theSCITransporters[
i] = NULL;
 
  148     theSHMTransporters[
i] = NULL;
 
  149     theTransporters[
i]    = NULL;
 
  150     performStates[
i]      = DISCONNECTED;
 
  151     ioStates[
i]           = NoHalt;
 
  152     m_disconnect_errnum[
i]= 0;
 
  153     m_error_states[
i]     = default_error_state;
 
  162   if (!m_use_default_send_buffer)
 
  165   if (total_send_buffer == 0)
 
  171     assert(m_send_buffer_memory);
 
  179   m_send_buffers = 
new SendBuffer[maxTransporters];
 
  180   for (
unsigned i = 0; 
i < maxTransporters; 
i++)
 
  182     SendBuffer &b = m_send_buffers[
i];
 
  183     b.m_first_page = NULL;
 
  184     b.m_last_page = NULL;
 
  189   Uint64 send_buffer_pages =
 
  190     (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
 
  192   send_buffer_pages += nTransporters;
 
  194   m_send_buffer_memory =
 
  195     new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
 
  196   if (m_send_buffer_memory == NULL)
 
  198     ndbout << 
"Unable to allocate " 
  199            << send_buffer_pages * SendBufferPage::PGSIZE
 
  200            << 
" bytes of memory for send buffers, aborting." << endl;
 
  204   m_page_freelist = NULL;
 
  205   for (
unsigned i = 0; 
i < send_buffer_pages; 
i++)
 
  207     SendBufferPage *
page =
 
  208       (SendBufferPage *)(m_send_buffer_memory + 
i * SendBufferPage::PGSIZE);
 
  210     page->m_next = m_page_freelist;
 
  211     m_page_freelist = 
page;
 
  217   DBUG_ENTER(
"TransporterRegistry::set_mgm_handle");
 
  226     DBUG_PRINT(
"info",(
"handle set with connectstring: %s",
 
  231     DBUG_PRINT(
"info",(
"handle set to NULL"));
 
  239   DBUG_ENTER(
"TransporterRegistry::~TransporterRegistry");
 
  243   delete[] theTCPTransporters;
 
  244   delete[] theSCITransporters;
 
  245   delete[] theSHMTransporters;
 
  246   delete[] theTransporterTypes;
 
  247   delete[] theTransporters;
 
  248   delete[] performStates;
 
  250   delete[] m_disconnect_errnum;
 
  251   delete[] m_error_states;
 
  254     delete[] m_send_buffers;
 
  255   m_page_freelist = NULL;
 
  256   if (m_send_buffer_memory)
 
  257     delete[] m_send_buffer_memory;
 
  259 #if defined(HAVE_EPOLL_CREATE) 
  260   if (m_epoll_events) 
delete [] m_epoll_events;
 
  261   if (m_epoll_fd != -1) close(m_epoll_fd);
 
  266   if (m_has_extra_wakeup_socket)
 
  268     my_socket_close(m_extra_wakeup_sockets[0]);
 
  269     my_socket_close(m_extra_wakeup_sockets[1]);
 
  277   for(
unsigned i = 0; 
i<maxTransporters; 
i++){
 
  278     if(theTransporters[
i] != NULL)
 
  279       removeTransporter(theTransporters[
i]->getRemoteNodeId());
 
  285   for(
unsigned i = 0; 
i<maxTransporters; 
i++){
 
  286     if(theTransporters[
i] != NULL)
 
  292 TransporterRegistry::init(NodeId nodeId) {
 
  293   DBUG_ENTER(
"TransporterRegistry::init");
 
  294   assert(localNodeId == 0 ||
 
  295          localNodeId == nodeId);
 
  297   localNodeId = nodeId;
 
  299   DEBUG(
"TransporterRegistry started node: " << localNodeId);
 
  301   if (!m_socket_poller.set_max_count(maxTransporters +
 
  312   DBUG_ENTER(
"TransporterRegistry::connect_server(sockfd)");
 
  318   if (s_input.gets(buf, 
sizeof(buf)) == 0) {
 
  319     msg.
assfmt(
"line: %u : Failed to get nodeid from client", __LINE__);
 
  320     DBUG_PRINT(
"error", (
"Failed to read 'hello' from client"));
 
  324   int nodeId, remote_transporter_type= -1;
 
  325   int r= sscanf(buf, 
"%d %d", &nodeId, &remote_transporter_type);
 
  334     msg.
assfmt(
"line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
 
  335     DBUG_PRINT(
"error", (
"Failed to parse 'hello' from client, buf: '%.*s'",
 
  336                          (
int)
sizeof(buf), buf));
 
  340   DBUG_PRINT(
"info", (
"Client hello, nodeId: %d transporter type: %d",
 
  341                       nodeId, remote_transporter_type));
 
  346       nodeId >= (
int)maxTransporters)
 
  348     msg.
assfmt(
"line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
 
  349     DBUG_PRINT(
"error", (
"Out of range nodeId: %d from client",
 
  358     msg.
assfmt(
"line: %u : Incorrect reply from client: >%s<, node: %u",
 
  359                __LINE__, buf, nodeId);
 
  360     DBUG_PRINT(
"error", (
"No transporter available for node id %d", nodeId));
 
  365   if (performStates[nodeId] != TransporterRegistry::CONNECTING)
 
  367     msg.
assfmt(
"line: %u : Incorrect state for node %u state: %s (%u)",
 
  369                getPerformStateString(performStates[nodeId]),
 
  370                performStates[nodeId]);
 
  372     DBUG_PRINT(
"error", (
"Transporter for node id %d in wrong state",
 
  378   if (remote_transporter_type != -1 &&
 
  379       remote_transporter_type != t->m_type)
 
  381     g_eventLogger->
error(
"Connection from node: %d uses different transporter " 
  382                          "type: %d, expected type: %d",
 
  383                          nodeId, remote_transporter_type, t->m_type);
 
  389   if (s_output.println(
"%d %d", t->
getLocalNodeId(), t->m_type) < 0)
 
  391     msg.
assfmt(
"line: %u : Failed to reply to connecting socket (node: %u)",
 
  393     DBUG_PRINT(
"error", (
"Send of reply failed"));
 
  398   bool res = t->connect_server(sockfd, msg);
 
  400   if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
 
  402     msg.
assfmt(
"line: %u : Incorrect state for node %u state: %s (%u)",
 
  404                getPerformStateString(performStates[nodeId]),
 
  405                performStates[nodeId]);
 
  418   NodeId remoteNodeId = config->remoteNodeId;
 
  421   assert(config->localNodeId == localNodeId);
 
  423   if (remoteNodeId >= maxTransporters)
 
  430     return t->configure(config);
 
  433   DEBUG(
"Configuring transporter from " << localNodeId
 
  434         << 
" to " << remoteNodeId);
 
  436   switch (config->type){
 
  437   case tt_TCP_TRANSPORTER:
 
  438     return createTCPTransporter(config);
 
  439   case tt_SHM_TRANSPORTER:
 
  440     return createSHMTransporter(config);
 
  441   case tt_SCI_TRANSPORTER:
 
  442     return createSCITransporter(config);
 
  453 #ifdef NDB_TCP_TRANSPORTER 
  456   if (config->remoteNodeId == config->localNodeId)
 
  467   else if (!t->initTransporter()) {
 
  473   theTCPTransporters[nTCPTransporters]      = t;
 
  479   m_total_max_send_buffer += t->get_max_send_buffer();
 
  489 #ifdef NDB_SCI_TRANSPORTER 
  491   if(!SCI_Transporter::initSCI())
 
  495                                             config->localHostName,
 
  496                                             config->remoteHostName,
 
  498                                             config->isMgmConnection,
 
  499                                             config->sci.sendLimit, 
 
  500                                             config->sci.bufferSize,
 
  501                                             config->sci.nLocalAdapters,
 
  502                                             config->sci.remoteSciNodeId0,
 
  503                                             config->sci.remoteSciNodeId1,
 
  505                                             config->remoteNodeId,
 
  506                                             config->serverNodeId,
 
  517   theSCITransporters[nSCITransporters]      = t;
 
  523   m_total_max_send_buffer += t->get_max_send_buffer();
 
  533   DBUG_ENTER(
"TransporterRegistry::createTransporter SHM");
 
  534 #ifdef NDB_SHM_TRANSPORTER 
  536   if (!g_ndb_shm_signum) {
 
  537     g_ndb_shm_signum= config->shm.signum;
 
  538     DBUG_PRINT(
"info",(
"Block signum %d",g_ndb_shm_signum));
 
  543     NdbThread_set_shm_sigmask(TRUE);
 
  546   if(config->shm.signum != g_ndb_shm_signum)
 
  550                                             config->localHostName,
 
  551                                             config->remoteHostName,
 
  553                                             config->isMgmConnection,
 
  555                                             config->remoteNodeId,
 
  556                                             config->serverNodeId,
 
  569   theSHMTransporters[nSHMTransporters]      = t;
 
  576   m_total_max_send_buffer += t->get_max_send_buffer();
 
  586 TransporterRegistry::removeTransporter(NodeId nodeId) {
 
  588   DEBUG(
"Removing transporter from " << localNodeId
 
  589         << 
" to " << nodeId);
 
  591   if(theTransporters[nodeId] == NULL)
 
  596   const TransporterType 
type = theTransporterTypes[nodeId];
 
  600   case tt_TCP_TRANSPORTER:
 
  601 #ifdef NDB_TCP_TRANSPORTER 
  602     for(; ind < nTCPTransporters; ind++)
 
  603       if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
 
  606     for(; ind<nTCPTransporters; ind++)
 
  607       theTCPTransporters[ind-1] = theTCPTransporters[ind];
 
  611   case tt_SCI_TRANSPORTER:
 
  612 #ifdef NDB_SCI_TRANSPORTER 
  613     for(; ind < nSCITransporters; ind++)
 
  614       if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
 
  617     for(; ind<nSCITransporters; ind++)
 
  618       theSCITransporters[ind-1] = theSCITransporters[ind];
 
  622   case tt_SHM_TRANSPORTER:
 
  623 #ifdef NDB_SHM_TRANSPORTER 
  624     for(; ind < nSHMTransporters; ind++)
 
  625       if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
 
  628     for(; ind<nSHMTransporters; ind++)
 
  629       theSHMTransporters[ind-1] = theSHMTransporters[ind];
 
  638   delete theTransporters[nodeId];
 
  639   theTransporters[nodeId] = NULL;        
 
  646                                  const Uint32 * 
const signalData,
 
  653      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 
 
  654       ((signalHeader->theReceiversBlockNumber == 252) ||
 
  655        (signalHeader->theReceiversBlockNumber == 4002)))) {
 
  658       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
 
  659       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
 
  660         Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  662           t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, ptr);
 
  663           updateWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  673         for(
int i = 0; 
i<50; 
i++){
 
  674           if((nSHMTransporters+nSCITransporters) == 0)
 
  675             NdbSleep_MilliSleep(sleepTime); 
 
  676           insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  678             t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, ptr);
 
  679             updateWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  692         WARNING(
"Signal to " << nodeId << 
" lost(buffer)");
 
  694         return SEND_BUFFER_FULL;
 
  696         return SEND_MESSAGE_TOO_BIG;
 
  700       if (m_blocked.get(nodeId))
 
  705         WARNING(
"Signal to " << nodeId << 
" discarded as node blocked + disconnected");
 
  709       DEBUG(
"Signal to " << nodeId << 
" lost(disconnect) ");
 
  710       return SEND_DISCONNECTED;
 
  713     DEBUG(
"Discarding message to block: "  
  714           << signalHeader->theReceiversBlockNumber 
 
  715           << 
" node: " << nodeId);
 
  718       return SEND_UNKNOWN_NODE;
 
  728                                  const Uint32 * 
const signalData,
 
  736      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 
 
  737       ((signalHeader->theReceiversBlockNumber == 252)|| 
 
  738        (signalHeader->theReceiversBlockNumber == 4002)))) {
 
  741       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
 
  742       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
 
  743         Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  745           t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
 
  746           updateWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  756         for(
int i = 0; 
i<50; 
i++){
 
  757           if((nSHMTransporters+nSCITransporters) == 0)
 
  758             NdbSleep_MilliSleep(sleepTime); 
 
  759           insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  761             t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
 
  762             updateWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  775         WARNING(
"Signal to " << nodeId << 
" lost(buffer)");
 
  777         return SEND_BUFFER_FULL;
 
  779         return SEND_MESSAGE_TOO_BIG;
 
  783       if (m_blocked.get(nodeId))
 
  788         WARNING(
"Signal to " << nodeId << 
" discarded as node blocked + disconnected");
 
  792       DEBUG(
"Signal to " << nodeId << 
" lost(disconnect) ");
 
  793       return SEND_DISCONNECTED;
 
  796     DEBUG(
"Discarding message to block: "  
  797           << signalHeader->theReceiversBlockNumber 
 
  798           << 
" node: " << nodeId);
 
  801       return SEND_UNKNOWN_NODE;
 
  812                                  const Uint32 * 
const signalData,
 
  819      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 
 
  820       ((signalHeader->theReceiversBlockNumber == 252) ||
 
  821        (signalHeader->theReceiversBlockNumber == 4002)))) {
 
  824       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
 
  825       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
 
  826         Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  828           t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, ptr);
 
  829           updateWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  839         for(
int i = 0; 
i<50; 
i++){
 
  840           if((nSHMTransporters+nSCITransporters) == 0)
 
  841             NdbSleep_MilliSleep(sleepTime); 
 
  842           insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  844             t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, ptr);
 
  845             updateWritePtr(sendHandle, nodeId, lenBytes, prio);
 
  858         WARNING(
"Signal to " << nodeId << 
" lost(buffer)");
 
  860         return SEND_BUFFER_FULL;
 
  862         return SEND_MESSAGE_TOO_BIG;
 
  865       DEBUG(
"Signal to " << nodeId << 
" lost(disconnect) ");
 
  866       return SEND_DISCONNECTED;
 
  869     DEBUG(
"Discarding message to block: "  
  870           << signalHeader->theReceiversBlockNumber 
 
  871           << 
" node: " << nodeId);
 
  874       return SEND_UNKNOWN_NODE;
 
  888   if(pollReceive(timeOutMillis)){
 
  895 TransporterRegistry::setup_wakeup_socket()
 
  897   if (m_has_extra_wakeup_socket)
 
  902   if (my_socketpair(m_extra_wakeup_sockets))
 
  904     perror(
"socketpair failed!");
 
  908   if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
 
  909       !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
 
  914 #if defined(HAVE_EPOLL_CREATE) 
  915   if (m_epoll_fd != -1)
 
  917     int sock = m_extra_wakeup_sockets[0].fd;
 
  918     struct epoll_event event_poll;
 
  919     bzero(&event_poll, 
sizeof(event_poll));
 
  920     event_poll.data.u32 = 0;
 
  921     event_poll.events = EPOLLIN;
 
  922     int ret_val = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, sock, &event_poll);
 
  926       fprintf(stderr, 
"Failed to add extra sock %u to epoll-set: %u\n",
 
  933   m_has_extra_wakeup_socket = 
true;
 
  937   my_socket_close(m_extra_wakeup_sockets[0]);
 
  938   my_socket_close(m_extra_wakeup_sockets[1]);
 
  939   my_socket_invalidate(m_extra_wakeup_sockets+0);
 
  940   my_socket_invalidate(m_extra_wakeup_sockets+1);
 
  945 TransporterRegistry::wakeup()
 
  947   if (m_has_extra_wakeup_socket)
 
  950     my_send(m_extra_wakeup_sockets[1], &c, 1, 0);
 
  955 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
 
  970   if (nSCITransporters > 0)
 
  975 #ifdef NDB_SHM_TRANSPORTER 
  976   if (nSHMTransporters > 0)
 
  978     Uint32 res = poll_SHM(0, mask);
 
  987 #ifdef NDB_TCP_TRANSPORTER 
  988 #if defined(HAVE_EPOLL_CREATE) 
  989   if (likely(m_epoll_fd != -1))
 
  991     Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);
 
  995       tcpReadSelectReply = epoll_wait(m_epoll_fd, m_epoll_events,
 
  996                                       num_trps, timeOutMillis);
 
  997       retVal |= tcpReadSelectReply;
 
 1000     int num_socket_events = tcpReadSelectReply;
 
 1001     if (num_socket_events > 0)
 
 1003       for (
int i = 0; 
i < num_socket_events; 
i++)
 
 1005         const Uint32 trpid = m_epoll_events[
i].data.u32;
 
 1007         if (m_blocked.get(trpid))
 
 1010           m_blocked_with_data.set(trpid);
 
 1017     else if (num_socket_events < 0)
 
 1019       assert(errno == EINTR);
 
 1025     if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
 
 1027       retVal |= poll_TCP(timeOutMillis, mask);
 
 1030       tcpReadSelectReply = 0;
 
 1033 #ifdef NDB_SCI_TRANSPORTER 
 1034   if (nSCITransporters > 0)
 
 1035     retVal |= poll_SCI(timeOutMillis, mask);
 
 1037 #ifdef NDB_SHM_TRANSPORTER 
 1038   if (nSHMTransporters > 0)
 
 1040     int res = poll_SHM(0, mask);
 
 1048 #ifdef NDB_SCI_TRANSPORTER 
 1050 TransporterRegistry::poll_SCI(Uint32 timeOutMillis, 
NodeBitmask& mask)
 
 1053   for (
int i = 0; 
i < nSCITransporters; 
i++)
 
 1059       if (t->hasDataToRead())
 
 1071 #ifdef NDB_SHM_TRANSPORTER 
 1072 static int g_shm_counter = 0;
 
 1074 TransporterRegistry::poll_SHM(Uint32 timeOutMillis, 
NodeBitmask& mask)
 
 1077   for (
int j = 0; j < 100; j++)
 
 1079     for (
int i = 0; 
i<nSHMTransporters; 
i++)
 
 1085         if (t->hasDataToRead())
 
 1098 #ifdef NDB_TCP_TRANSPORTER 
 1108 TransporterRegistry::poll_TCP(Uint32 timeOutMillis, 
NodeBitmask& mask)
 
 1110   m_socket_poller.clear();
 
 1112   if (m_has_extra_wakeup_socket)
 
 1114     const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
 
 1117     m_socket_poller.add(socket, 
true, 
false, 
false);
 
 1120   Uint16 idx[MAX_NODES];
 
 1121   for (
int i = 0; 
i < nTCPTransporters; 
i++)
 
 1124     const NDB_SOCKET_TYPE socket = t->getSocket();
 
 1127     if (is_connected(node_id) && t->
isConnected() && my_socket_valid(socket))
 
 1129       idx[
i] = m_socket_poller.add(socket, 
true, 
false, 
false);
 
 1133       idx[
i] = MAX_NODES + 1;
 
 1137   tcpReadSelectReply = m_socket_poller.poll_unsafe(timeOutMillis);
 
 1139   if (tcpReadSelectReply > 0)
 
 1141     if (m_extra_wakeup_sockets)
 
 1143       if (m_socket_poller.has_read(0))
 
 1144         mask.
set((Uint32)0);
 
 1147     for (
int i = 0; 
i < nTCPTransporters; 
i++)
 
 1150       if (idx[
i] != MAX_NODES + 1)
 
 1154         if (m_blocked.get(
i))
 
 1157           m_blocked_with_data.set(
i);
 
 1161         if (m_socket_poller.has_read(idx[
i]))
 
 1167   return tcpReadSelectReply;
 
 1171 #if defined(HAVE_EPOLL_CREATE) 
 1175   struct epoll_event event_poll;
 
 1176   bzero(&event_poll, 
sizeof(event_poll));
 
 1177   NDB_SOCKET_TYPE sock_fd = t->getSocket();
 
 1179   int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
 
 1182   if (!my_socket_valid(sock_fd))
 
 1186   event_poll.events = EPOLLIN;
 
 1187   ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
 
 1191   if (error == ENOENT && !add)
 
 1199   if (!add || (add && (error != ENOMEM)))
 
 1206     ndbout_c(
"Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
 
 1207              " node %u to epoll-set," 
 1209              add ? 
"ADD" : 
"DEL",
 
 1211              MY_SOCKET_FORMAT_VALUE(sock_fd),
 
 1217   ndbout << 
"We lacked memory to add the socket for node id ";
 
 1218   ndbout << node_id << endl;
 
 1233   bool hasReceived = 
false;
 
 1235   if (m_has_data_transporters.
get(0))
 
 1237     m_has_data_transporters.
clear(Uint32(0));
 
 1238     consume_extra_sockets();
 
 1242   if (!m_blocked.isclear())
 
 1244     if (m_has_data_transporters.
isclear())
 
 1249       NdbSleep_MilliSleep(1);
 
 1254 #ifdef NDB_TCP_TRANSPORTER 
 1256   while ((
id = m_has_data_transporters.
find(
id + 1)) != BitmaskImpl::NotFound)
 
 1258     bool hasdata = 
false;
 
 1260     if (is_connected(
id))
 
 1269         Uint32 sz = t->getReceiveData(&ptr);
 
 1271         Uint32 szUsed = unpack(ptr, sz, 
id, ioStates[
id]);
 
 1272         t->updateReceiveDataPtr(szUsed);
 
 1273         hasdata = t->hasReceiveData();
 
 1277     m_has_data_transporters.
set(
id, hasdata);
 
 1281 #ifdef NDB_SCI_TRANSPORTER 
 1284   for (
int i=0; 
i<nSCITransporters; 
i++) 
 
 1288     if(is_connected(nodeId))
 
 1295         Uint32 * readPtr, * eodPtr;
 
 1296         t->getReceivePtr(&readPtr, &eodPtr);
 
 1298         Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
 
 1299         t->updateReceivePtr(newPtr);
 
 1304 #ifdef NDB_SHM_TRANSPORTER 
 1305   for (
int i=0; 
i<nSHMTransporters; 
i++) 
 
 1309     if(is_connected(nodeId)){
 
 1315         Uint32 * readPtr, * eodPtr;
 
 1316         t->getReceivePtr(&readPtr, &eodPtr);
 
 1318         Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
 
 1319         t->updateReceivePtr(newPtr);
 
 1334   if (t && t->
isConnected() && is_connected(nodeId))
 
 1343 TransporterRegistry::consume_extra_sockets()
 
 1348   NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
 
 1351     ret = my_recv(sock, buf, 
sizeof(buf), 0);
 
 1352     err = my_socket_errno();
 
 1353   } 
while (ret == 
sizeof(buf) || (ret == -1 && err == EINTR));
 
 1362 #ifdef NDB_TCP_TRANSPORTER 
 1363   for (i = m_transp_count; i < nTCPTransporters; i++) 
 
 1366     if (t && t->has_data_to_send() &&
 
 1372   for (i = 0; i < m_transp_count && i < nTCPTransporters; i++) 
 
 1375     if (t && t->has_data_to_send() &&
 
 1382   if (m_transp_count == nTCPTransporters) m_transp_count = 0;
 
 1384 #ifdef NDB_SCI_TRANSPORTER 
 1387   for (i=0; i<nSCITransporters; i++) {
 
 1391     if(is_connected(nodeId))
 
 1401 #ifdef NDB_SHM_TRANSPORTER 
 1402   for (i=0; i<nSHMTransporters; i++) 
 
 1406     if(is_connected(nodeId))
 
 1419   int tSendCounter = sendCounter;
 
 1420   sendCounter = tSendCounter + 1;
 
 1421   if (tSendCounter >= sendLimit) {
 
 1429 #ifdef DEBUG_TRANSPORTER 
 1431 TransporterRegistry::printState(){
 
 1432   ndbout << 
"-- TransporterRegistry -- " << endl << endl
 
 1433          << 
"Transporters = " << nTransporters << endl;
 
 1434   for(
int i = 0; i<maxTransporters; i++)
 
 1435     if(theTransporters[i] != NULL){
 
 1437       ndbout << 
"Transporter: " << remoteNodeId 
 
 1438              << 
" PerformState: " << performStates[remoteNodeId]
 
 1439              << 
" IOState: " << ioStates[remoteNodeId] << endl;
 
 1446 TransporterRegistry::isBlocked(NodeId nodeId)
 
 1448   return m_blocked.get(nodeId);
 
 1452 TransporterRegistry::blockReceive(NodeId nodeId)
 
 1458   assert(!m_blocked.get(nodeId));
 
 1460   m_blocked.set(nodeId);
 
 1462   if (m_has_data_transporters.
get(nodeId))
 
 1464     assert(!m_blocked_with_data.get(nodeId));
 
 1465     m_blocked_with_data.set(nodeId);
 
 1466     m_has_data_transporters.
clear(nodeId);
 
 1471 TransporterRegistry::unblockReceive(NodeId nodeId)
 
 1477   assert(m_blocked.get(nodeId));
 
 1478   assert(!m_has_data_transporters.
get(nodeId));
 
 1480   m_blocked.clear(nodeId);
 
 1482   if (m_blocked_with_data.get(nodeId))
 
 1484     m_has_data_transporters.
set(nodeId);
 
 1487   if (m_blocked_disconnected.get(nodeId))
 
 1490     m_blocked_disconnected.clear(nodeId);
 
 1492     report_disconnect(nodeId, m_disconnect_errors[nodeId]);
 
 1499   return ioStates[nodeId]; 
 
 1503 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
 
 1504   if (ioStates[nodeId] == state)
 
 1507   DEBUG(
"TransporterRegistry::setIOState(" 
 1508         << nodeId << 
", " << state << 
")");
 
 1510   ioStates[nodeId] = state;
 
 1514 run_start_clients_C(
void * me)
 
 1529   PerformState &curr_state = performStates[node_id];
 
 1540   DBUG_ENTER(
"TransporterRegistry::do_connect");
 
 1541   DBUG_PRINT(
"info",(
"performStates[%d]=CONNECTING",node_id));
 
 1549   curr_state= CONNECTING;
 
 1562   PerformState &curr_state = performStates[node_id];
 
 1573   DBUG_ENTER(
"TransporterRegistry::do_disconnect");
 
 1574   DBUG_PRINT(
"info",(
"performStates[%d]=DISCONNECTING",node_id));
 
 1575   curr_state= DISCONNECTING;
 
 1576   m_disconnect_errnum[node_id] = errnum;
 
 1581 TransporterRegistry::report_connect(NodeId node_id)
 
 1583   DBUG_ENTER(
"TransporterRegistry::report_connect");
 
 1584   DBUG_PRINT(
"info",(
"performStates[%d]=CONNECTED",node_id));
 
 1595   performStates[node_id] = CONNECTED;
 
 1596 #if defined(HAVE_EPOLL_CREATE) 
 1597   if (likely(m_epoll_fd != -1))
 
 1602       performStates[node_id] = DISCONNECTING;
 
 1612 TransporterRegistry::report_disconnect(NodeId node_id, 
int errnum)
 
 1614   DBUG_ENTER(
"TransporterRegistry::report_disconnect");
 
 1615   DBUG_PRINT(
"info",(
"performStates[%d]=DISCONNECTED",node_id));
 
 1618   if (m_blocked.get(node_id))
 
 1623     m_blocked_disconnected.set(node_id);
 
 1624     m_disconnect_errors[node_id] = errnum;
 
 1629   performStates[node_id] = DISCONNECTED;
 
 1630   m_has_data_transporters.
clear(node_id);
 
 1644                                   const char *errorInfo)
 
 1646   if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
 
 1647       m_error_states[nodeId].m_info == (
const char *)~(UintPtr)0)
 
 1649     m_error_states[nodeId].m_code = errorCode;
 
 1650     m_error_states[nodeId].m_info = errorInfo;
 
 1662   for (
int i= 0, 
n= 0; 
n < nTransporters; i++){
 
 1670     TransporterError 
code = m_error_states[nodeId].m_code;
 
 1671     const char *info = m_error_states[nodeId].m_info;
 
 1672     if (code != TE_NO_ERROR && info != (
const char *)~(UintPtr)0)
 
 1675       m_error_states[nodeId].m_code = TE_NO_ERROR;
 
 1676       m_error_states[nodeId].m_info = (
const char *)~(UintPtr)0;
 
 1679     switch(performStates[nodeId]){
 
 1685         report_connect(nodeId);
 
 1689         report_disconnect(nodeId, m_disconnect_errnum[nodeId]);
 
 1699   int persist_mgm_count= 0;
 
 1700   DBUG_ENTER(
"TransporterRegistry::start_clients_thread");
 
 1701   while (m_run_start_clients_thread) {
 
 1702     NdbSleep_MilliSleep(100);
 
 1703     persist_mgm_count++;
 
 1704     if(persist_mgm_count==50)
 
 1706       ndb_mgm_check_connection(m_mgm_handle);
 
 1707       persist_mgm_count= 0;
 
 1709     for (
int i= 0, 
n= 0; 
n < nTransporters && m_run_start_clients_thread; i++){
 
 1716       switch(performStates[nodeId]){
 
 1719           bool connected= 
false;
 
 1726             DBUG_PRINT(
"info", (
"connecting to node %d using port %d",
 
 1738             DBUG_PRINT(
"info", (
"connection to node %d should use " 
 1747               DBUG_PRINT(
"info", (
"asking mgmd which port to use for node %d",
 
 1751                 ndb_mgm_get_connection_int_parameter(m_mgm_handle,
 
 1754                                                      CFG_CONNECTION_SERVER_PORT,
 
 1757               DBUG_PRINT(
"info",(
"Got dynamic port %d for %d -> %d (ret: %d)",
 
 1762                 DBUG_PRINT(
"info", (
"got port %d to use for connection to %d",
 
 1763                                     server_port, nodeId));
 
 1773                 DBUG_PRINT(
"info", (
"Failed to get dynamic port, res: %d",
 
 1775                 g_eventLogger->
info(
"Failed to get dynamic port, res: %d",
 
 1781                 DBUG_PRINT(
"info", (
"mgmd close connection early"));
 
 1783                   (
"Management server closed connection early. " 
 1784                    "It is probably being shut down (or has problems). " 
 1785                    "We will retry the connection. %d %s %s line: %d",
 
 1812           g_eventLogger->
warning(
"Found connection to %u in state DISCONNECTED " 
 1813                                  " while being connected, disconnecting!",
 
 1828 TransporterRegistry::start_clients()
 
 1830   m_run_start_clients_thread= 
true;
 
 1831   m_start_clients_thread= NdbThread_Create(run_start_clients_C,
 
 1834                                            "ndb_start_clients",
 
 1835                                            NDB_THREAD_PRIO_LOW);
 
 1836   if (m_start_clients_thread == 0)
 
 1838     m_run_start_clients_thread= 
false;
 
 1840   return m_start_clients_thread;
 
 1844 TransporterRegistry::stop_clients()
 
 1846   if (m_start_clients_thread) {
 
 1847     m_run_start_clients_thread= 
false;
 
 1849     NdbThread_WaitFor(m_start_clients_thread, &status);
 
 1850     NdbThread_Destroy(&m_start_clients_thread);
 
 1856 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
 
 1860   DBUG_ENTER(
"TransporterRegistry::add_transporter_interface");
 
 1861   DBUG_PRINT(
"enter",(
"interface=%s, s_port= %d", interf, s_port));
 
 1862   if (interf && strlen(interf) == 0)
 
 1865   for (
unsigned i= 0; i < m_transporter_interface.size(); i++)
 
 1867     Transporter_interface &tmp= m_transporter_interface[
i];
 
 1868     if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
 
 1870     if (interf != 0 && tmp.m_interface != 0 &&
 
 1871         strcmp(interf, tmp.m_interface) == 0)
 
 1875     if (interf == 0 && tmp.m_interface == 0)
 
 1880   Transporter_interface t;
 
 1881   t.m_remote_nodeId= remoteNodeId;
 
 1882   t.m_s_service_port= s_port;
 
 1883   t.m_interface= interf;
 
 1884   m_transporter_interface.push_back(t);
 
 1885   DBUG_PRINT(
"exit",(
"interface and port added"));
 
 1890 TransporterRegistry::start_service(
SocketServer& socket_server)
 
 1892   DBUG_ENTER(
"TransporterRegistry::start_service");
 
 1893   if (m_transporter_interface.size() > 0 &&
 
 1896     g_eventLogger->
error(
"INTERNAL ERROR: not initialized");
 
 1900   for (
unsigned i= 0; i < m_transporter_interface.size(); i++)
 
 1902     Transporter_interface &t= m_transporter_interface[
i];
 
 1904     unsigned short port= (
unsigned short)t.m_s_service_port;
 
 1905     if(t.m_s_service_port<0)
 
 1906       port= -t.m_s_service_port; 
 
 1909     if(!socket_server.
setup(transporter_service,
 
 1910                             &port, t.m_interface))
 
 1912       DBUG_PRINT(
"info", (
"Trying new port"));
 
 1914       if(t.m_s_service_port>0
 
 1915          || !socket_server.
setup(transporter_service,
 
 1916                                  &port, t.m_interface))
 
 1922         g_eventLogger->
error(
"Unable to setup transporter service port: %s:%d!\n" 
 1923                              "Please check if the port is already used,\n" 
 1924                              "(perhaps the node is already running)",
 
 1925                              t.m_interface ? t.m_interface : 
"*", t.m_s_service_port);
 
 1926         delete transporter_service;
 
 1930     t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; 
 
 1931     DBUG_PRINT(
"info", (
"t.m_s_service_port = %d",t.m_s_service_port));
 
 1932     transporter_service->setTransporterRegistry(
this);
 
 1937 #ifdef NDB_SHM_TRANSPORTER 
 1940 shm_sig_handler(
int signo)
 
 1949   DBUG_ENTER(
"TransporterRegistry::startReceiving");
 
 1951 #ifdef NDB_SHM_TRANSPORTER 
 1952   m_shm_own_pid = getpid();
 
 1953   if (g_ndb_shm_signum)
 
 1955     DBUG_PRINT(
"info",(
"Install signal handler for signum %d",
 
 1958     NdbThread_set_shm_sigmask(FALSE);
 
 1959     sigemptyset(&sa.sa_mask);
 
 1960     sa.sa_handler = shm_sig_handler;
 
 1963     while((ret = 
sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR)
 
 1967       DBUG_PRINT(
"error",(
"Install failed"));
 
 1968       g_eventLogger->
error(
"Failed to install signal handler for" 
 1969                            " SHM transporter, signum %d, errno: %d (%s)",
 
 1970                            g_ndb_shm_signum, errno, strerror(errno));
 
 1973 #endif // NDB_SHM_TRANSPORTER 
 1992 TransporterRegistry::stopSending(){
 
 1996   out << 
"-- Signal Header --" << endl;
 
 1997   out << 
"theLength:    " << sh.theLength << endl;
 
 1998   out << 
"gsn:          " << sh.theVerId_signalNumber << endl;
 
 1999   out << 
"recBlockNo:   " << sh.theReceiversBlockNumber << endl;
 
 2000   out << 
"sendBlockRef: " << sh.theSendersBlockRef << endl;
 
 2001   out << 
"sendersSig:   " << sh.theSendersSignalId << endl;
 
 2002   out << 
"theSignalId:  " << sh.theSignalId << endl;
 
 2003   out << 
"trace:        " << (int)sh.theTrace << endl;
 
 2008 TransporterRegistry::get_transporter(NodeId nodeId) {
 
 2009   assert(nodeId < maxTransporters);
 
 2010   return theTransporters[nodeId];
 
 2014 bool TransporterRegistry::connect_client(
NdbMgmHandle *h)
 
 2016   DBUG_ENTER(
"TransporterRegistry::connect_client(NdbMgmHandle)");
 
 2022     g_eventLogger->
error(
"%s: %d", __FILE__, __LINE__);
 
 2028     g_eventLogger->
error(
"%s: %d", __FILE__, __LINE__);
 
 2035     performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
 
 2049   NDB_SOCKET_TYPE sockfd;
 
 2050   my_socket_invalidate(&sockfd);
 
 2052   DBUG_ENTER(
"TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
 
 2054   if ( h==NULL || *h == NULL )
 
 2056     g_eventLogger->
error(
"Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
 
 2057     DBUG_RETURN(sockfd);
 
 2060   for(
unsigned int i=0;i < m_transporter_interface.size();i++)
 
 2062     if (m_transporter_interface[i].m_s_service_port >= 0)
 
 2065     DBUG_PRINT(
"info", (
"Setting dynamic port %d for connection from node %d",
 
 2066                         m_transporter_interface[i].m_s_service_port,
 
 2067                         m_transporter_interface[i].m_remote_nodeId));
 
 2069     if (ndb_mgm_set_connection_int_parameter(*h,
 
 2071                                    m_transporter_interface[i].m_remote_nodeId,
 
 2072                                    CFG_CONNECTION_SERVER_PORT,
 
 2073                                    m_transporter_interface[i].m_s_service_port,
 
 2076       g_eventLogger->
error(
"Could not set dynamic port for %d->%d (%s:%d)",
 
 2078                            m_transporter_interface[i].m_remote_nodeId,
 
 2079                            __FILE__, __LINE__);
 
 2081       DBUG_RETURN(sockfd);
 
 2089   DBUG_PRINT(
"info", (
"Converting handle to transporter"));
 
 2090   sockfd= ndb_mgm_convert_to_transporter(h);
 
 2091   if (!my_socket_valid(sockfd))
 
 2093     g_eventLogger->
error(
"Failed to convert to transporter (%s: %d)",
 
 2094                          __FILE__, __LINE__);
 
 2097   DBUG_RETURN(sockfd);
 
 2108   my_socket_invalidate(&s);
 
 2110   DBUG_ENTER(
"TransporterRegistry::connect_ndb_mgmd(SocketClient)");
 
 2122     cs.
assfmt(
"%s:%u",sc->get_server_name(),sc->get_port());
 
 2128     DBUG_PRINT(
"info", (
"connection to mgmd failed"));
 
 2142                                  NodeId node, Uint32 lenBytes, Uint32 prio)
 
 2145   Uint32 *insertPtr = handle->
getWritePtr(node, lenBytes, prio,
 
 2146                                           t->get_max_send_buffer());
 
 2148   if (insertPtr == 0) {
 
 2153     if(t->send_is_possible(10)) {
 
 2164         insertPtr = handle->
getWritePtr(node, lenBytes, prio,
 
 2165                                         t->get_max_send_buffer());
 
 2176                                     NodeId node, Uint32 lenBytes, Uint32 prio)
 
 2181   t->update_status_overloaded(used);
 
 2183   if(t->send_limit_reached(used)) {
 
 2192     if(t->send_is_possible(0)) {
 
 2202 TransporterRegistry::get_bytes_to_send_iovec(NodeId node, 
struct iovec *dst,
 
 2205   assert(m_use_default_send_buffer);
 
 2211   SendBuffer *b = m_send_buffers + node;
 
 2212   SendBufferPage *
page = b->m_first_page;
 
 2213   while (page != NULL && count < max)
 
 2215     dst[count].iov_base = page->m_data+page->m_start;
 
 2216     dst[count].iov_len = page->m_bytes;
 
 2217     assert(page->m_start + page->m_bytes <= page->max_data_bytes());
 
 2218     page = page->m_next;
 
 2226 TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
 
 2228   assert(m_use_default_send_buffer);
 
 2230   SendBuffer *b = m_send_buffers + node;
 
 2231   Uint32 used_bytes = b->m_used_bytes;
 
 2236   used_bytes -= bytes;
 
 2237   b->m_used_bytes = used_bytes;
 
 2239   SendBufferPage *page = b->m_first_page;
 
 2240   while (bytes && bytes >= page->m_bytes)
 
 2242     SendBufferPage * tmp = 
page;
 
 2243     bytes -= page->m_bytes;
 
 2244     page = page->m_next;
 
 2248   if (used_bytes == 0)
 
 2250     b->m_first_page = 0;
 
 2255     page->m_start += bytes;
 
 2256     page->m_bytes -= bytes;
 
 2257     assert(page->m_start + page->m_bytes <= page->max_data_bytes());
 
 2258     b->m_first_page = 
page;
 
 2265 TransporterRegistry::has_data_to_send(NodeId node)
 
 2267   assert(m_use_default_send_buffer);
 
 2269   SendBuffer *b = m_send_buffers + node;
 
 2270   return (b->m_first_page != NULL && b->m_first_page->m_bytes);
 
 2274 TransporterRegistry::reset_send_buffer(NodeId node, 
bool should_be_empty)
 
 2276   assert(m_use_default_send_buffer);
 
 2281   if (should_be_empty && !has_data_to_send(node))
 
 2283   assert(!should_be_empty);
 
 2285   SendBuffer *b = m_send_buffers + node;
 
 2286   SendBufferPage *page = b->m_first_page;
 
 2287   while (page != NULL)
 
 2289     SendBufferPage *next = page->m_next;
 
 2293   b->m_first_page = NULL;
 
 2294   b->m_last_page = NULL;
 
 2295   b->m_used_bytes = 0;
 
 2298 TransporterRegistry::SendBufferPage *
 
 2299 TransporterRegistry::alloc_page()
 
 2301   SendBufferPage *page = m_page_freelist;
 
 2304     m_page_freelist = page->m_next;
 
 2308   ndbout << 
"ERROR: out of send buffers in kernel." << endl;
 
 2313 TransporterRegistry::release_page(SendBufferPage *page)
 
 2315   assert(page != NULL);
 
 2316   page->m_next = m_page_freelist;
 
 2317   m_page_freelist = 
page;
 
 2321 TransporterRegistry::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
 
 2324   assert(m_use_default_send_buffer);
 
 2326   SendBuffer *b = m_send_buffers + node;
 
 2329   SendBufferPage *page = b->m_last_page;
 
 2330   if (page != NULL && page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
 
 2332     return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
 
 2335   if (b->m_used_bytes + lenBytes > max_use)
 
 2339   page = alloc_page();
 
 2342   page->m_next = NULL;
 
 2346   if (b->m_last_page == NULL)
 
 2348     b->m_first_page = 
page;
 
 2349     b->m_last_page = 
page;
 
 2353     assert(b->m_first_page != NULL);
 
 2354     b->m_last_page->m_next = 
page;
 
 2355     b->m_last_page = 
page;
 
 2357   return (Uint32 *)(page->m_data);
 
 2361 TransporterRegistry::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
 
 2363   assert(m_use_default_send_buffer);
 
 2365   SendBuffer *b = m_send_buffers + node;
 
 2366   SendBufferPage *page = b->m_last_page;
 
 2367   assert(page != NULL);
 
 2368   assert(page->m_bytes + lenBytes <= page->max_data_bytes());
 
 2369   page->m_bytes += lenBytes;
 
 2370   b->m_used_bytes += lenBytes;
 
 2371   return b->m_used_bytes;
 
 2375 TransporterRegistry::forceSend(NodeId node)
 
 2386 TransporterRegistry::print_transporters(
const char* where, NdbOut& out)
 
 2388   out << where << 
" >>" << endl;
 
 2390   for(
unsigned i = 0; i < maxTransporters; i++){
 
 2391     if(theTransporters[i] == NULL)
 
 2397         << getPerformStateString(remoteNodeId) << 
" to node: " 
 2398         << remoteNodeId << 
" at " 
 2399         << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
 
 2402   out << 
"<<" << endl;
 
 2404   for (
size_t i= 0; i < m_transporter_interface.size(); i++){
 
 2405     Transporter_interface tf= m_transporter_interface[
i];
 
 2408         << 
" remote node: " << tf.m_remote_nodeId
 
 2409         << 
" port: " << tf.m_s_service_port
 
 2410         << 
" interface: " << tf.m_interface << endl;