18 #include <ndb_global.h>
20 #include <TransporterRegistry.hpp>
21 #include "TransporterInternalDefinitions.hpp"
23 #include "Transporter.hpp"
24 #include <SocketAuthenticator.hpp>
26 #ifdef NDB_TCP_TRANSPORTER
27 #include "TCP_Transporter.hpp"
28 #include "Loopback_Transporter.hpp"
31 #ifdef NDB_SCI_TRANSPORTER
32 #include "SCI_Transporter.hpp"
35 #ifdef NDB_SHM_TRANSPORTER
36 #include "SHM_Transporter.hpp"
37 extern int g_ndb_shm_signum;
43 #include <InputStream.hpp>
44 #include <OutputStream.hpp>
46 #include <mgmapi/mgmapi.h>
47 #include <mgmapi_internal.h>
48 #include <mgmapi/mgmapi_debug.h>
50 #include <EventLogger.hpp>
56 return theTransporters[node_id]->m_connect_address;
61 DBUG_ENTER(
"SocketServer::Session * TransporterService::newSession");
62 if (m_auth && !m_auth->server_authenticate(sockfd)){
63 NDB_CLOSE_SOCKET(sockfd);
70 NDB_CLOSE_SOCKET(sockfd);
78 bool use_default_send_buffer,
79 unsigned _maxTransporters,
80 unsigned sizeOfLongSignalMemory) :
84 m_use_default_send_buffer(use_default_send_buffer),
85 m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
86 m_total_max_send_buffer(0)
88 DBUG_ENTER(
"TransporterRegistry::TransporterRegistry");
90 maxTransporters = _maxTransporters;
98 theTransporterTypes =
new TransporterType [maxTransporters];
99 theTransporters =
new Transporter * [maxTransporters];
100 performStates =
new PerformState [maxTransporters];
101 ioStates =
new IOState [maxTransporters];
102 m_disconnect_errnum =
new int [maxTransporters];
103 m_error_states =
new ErrorState [maxTransporters];
105 m_has_extra_wakeup_socket =
false;
106 #if defined(HAVE_EPOLL_CREATE)
108 m_epoll_events =
new struct epoll_event[maxTransporters];
109 m_epoll_fd = epoll_create(maxTransporters);
110 if (m_epoll_fd == -1 || !m_epoll_events)
113 perror(
"Failed to alloc epoll-array or calling epoll_create... falling back to select!");
114 if (m_epoll_fd != -1)
121 delete [] m_epoll_events;
127 memset((
char*)m_epoll_events, 0,
128 maxTransporters *
sizeof(
struct epoll_event));
134 m_blocked_with_data.clear();
135 m_blocked_disconnected.clear();
139 nTCPTransporters = 0;
140 nSCITransporters = 0;
141 nSHMTransporters = 0;
144 ErrorState default_error_state = { TE_NO_ERROR, (
const char *)~(UintPtr)0 };
145 for (
unsigned i=0;
i<maxTransporters;
i++) {
146 theTCPTransporters[
i] = NULL;
147 theSCITransporters[
i] = NULL;
148 theSHMTransporters[
i] = NULL;
149 theTransporters[
i] = NULL;
150 performStates[
i] = DISCONNECTED;
151 ioStates[
i] = NoHalt;
152 m_disconnect_errnum[
i]= 0;
153 m_error_states[
i] = default_error_state;
162 if (!m_use_default_send_buffer)
165 if (total_send_buffer == 0)
171 assert(m_send_buffer_memory);
179 m_send_buffers =
new SendBuffer[maxTransporters];
180 for (
unsigned i = 0;
i < maxTransporters;
i++)
182 SendBuffer &b = m_send_buffers[
i];
183 b.m_first_page = NULL;
184 b.m_last_page = NULL;
189 Uint64 send_buffer_pages =
190 (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
192 send_buffer_pages += nTransporters;
194 m_send_buffer_memory =
195 new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
196 if (m_send_buffer_memory == NULL)
198 ndbout <<
"Unable to allocate "
199 << send_buffer_pages * SendBufferPage::PGSIZE
200 <<
" bytes of memory for send buffers, aborting." << endl;
204 m_page_freelist = NULL;
205 for (
unsigned i = 0;
i < send_buffer_pages;
i++)
207 SendBufferPage *
page =
208 (SendBufferPage *)(m_send_buffer_memory +
i * SendBufferPage::PGSIZE);
210 page->m_next = m_page_freelist;
211 m_page_freelist =
page;
217 DBUG_ENTER(
"TransporterRegistry::set_mgm_handle");
226 DBUG_PRINT(
"info",(
"handle set with connectstring: %s",
231 DBUG_PRINT(
"info",(
"handle set to NULL"));
239 DBUG_ENTER(
"TransporterRegistry::~TransporterRegistry");
243 delete[] theTCPTransporters;
244 delete[] theSCITransporters;
245 delete[] theSHMTransporters;
246 delete[] theTransporterTypes;
247 delete[] theTransporters;
248 delete[] performStates;
250 delete[] m_disconnect_errnum;
251 delete[] m_error_states;
254 delete[] m_send_buffers;
255 m_page_freelist = NULL;
256 if (m_send_buffer_memory)
257 delete[] m_send_buffer_memory;
259 #if defined(HAVE_EPOLL_CREATE)
260 if (m_epoll_events)
delete [] m_epoll_events;
261 if (m_epoll_fd != -1) close(m_epoll_fd);
266 if (m_has_extra_wakeup_socket)
268 my_socket_close(m_extra_wakeup_sockets[0]);
269 my_socket_close(m_extra_wakeup_sockets[1]);
277 for(
unsigned i = 0;
i<maxTransporters;
i++){
278 if(theTransporters[
i] != NULL)
279 removeTransporter(theTransporters[
i]->getRemoteNodeId());
285 for(
unsigned i = 0;
i<maxTransporters;
i++){
286 if(theTransporters[
i] != NULL)
292 TransporterRegistry::init(NodeId nodeId) {
293 DBUG_ENTER(
"TransporterRegistry::init");
294 assert(localNodeId == 0 ||
295 localNodeId == nodeId);
297 localNodeId = nodeId;
299 DEBUG(
"TransporterRegistry started node: " << localNodeId);
301 if (!m_socket_poller.set_max_count(maxTransporters +
312 DBUG_ENTER(
"TransporterRegistry::connect_server(sockfd)");
318 if (s_input.gets(buf,
sizeof(buf)) == 0) {
319 msg.
assfmt(
"line: %u : Failed to get nodeid from client", __LINE__);
320 DBUG_PRINT(
"error", (
"Failed to read 'hello' from client"));
324 int nodeId, remote_transporter_type= -1;
325 int r= sscanf(buf,
"%d %d", &nodeId, &remote_transporter_type);
334 msg.
assfmt(
"line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
335 DBUG_PRINT(
"error", (
"Failed to parse 'hello' from client, buf: '%.*s'",
336 (
int)
sizeof(buf), buf));
340 DBUG_PRINT(
"info", (
"Client hello, nodeId: %d transporter type: %d",
341 nodeId, remote_transporter_type));
346 nodeId >= (
int)maxTransporters)
348 msg.
assfmt(
"line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
349 DBUG_PRINT(
"error", (
"Out of range nodeId: %d from client",
358 msg.
assfmt(
"line: %u : Incorrect reply from client: >%s<, node: %u",
359 __LINE__, buf, nodeId);
360 DBUG_PRINT(
"error", (
"No transporter available for node id %d", nodeId));
365 if (performStates[nodeId] != TransporterRegistry::CONNECTING)
367 msg.
assfmt(
"line: %u : Incorrect state for node %u state: %s (%u)",
369 getPerformStateString(performStates[nodeId]),
370 performStates[nodeId]);
372 DBUG_PRINT(
"error", (
"Transporter for node id %d in wrong state",
378 if (remote_transporter_type != -1 &&
379 remote_transporter_type != t->m_type)
381 g_eventLogger->
error(
"Connection from node: %d uses different transporter "
382 "type: %d, expected type: %d",
383 nodeId, remote_transporter_type, t->m_type);
389 if (s_output.println(
"%d %d", t->
getLocalNodeId(), t->m_type) < 0)
391 msg.
assfmt(
"line: %u : Failed to reply to connecting socket (node: %u)",
393 DBUG_PRINT(
"error", (
"Send of reply failed"));
398 bool res = t->connect_server(sockfd, msg);
400 if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
402 msg.
assfmt(
"line: %u : Incorrect state for node %u state: %s (%u)",
404 getPerformStateString(performStates[nodeId]),
405 performStates[nodeId]);
418 NodeId remoteNodeId = config->remoteNodeId;
421 assert(config->localNodeId == localNodeId);
423 if (remoteNodeId >= maxTransporters)
430 return t->configure(config);
433 DEBUG(
"Configuring transporter from " << localNodeId
434 <<
" to " << remoteNodeId);
436 switch (config->type){
437 case tt_TCP_TRANSPORTER:
438 return createTCPTransporter(config);
439 case tt_SHM_TRANSPORTER:
440 return createSHMTransporter(config);
441 case tt_SCI_TRANSPORTER:
442 return createSCITransporter(config);
453 #ifdef NDB_TCP_TRANSPORTER
456 if (config->remoteNodeId == config->localNodeId)
467 else if (!t->initTransporter()) {
473 theTCPTransporters[nTCPTransporters] = t;
479 m_total_max_send_buffer += t->get_max_send_buffer();
489 #ifdef NDB_SCI_TRANSPORTER
491 if(!SCI_Transporter::initSCI())
495 config->localHostName,
496 config->remoteHostName,
498 config->isMgmConnection,
499 config->sci.sendLimit,
500 config->sci.bufferSize,
501 config->sci.nLocalAdapters,
502 config->sci.remoteSciNodeId0,
503 config->sci.remoteSciNodeId1,
505 config->remoteNodeId,
506 config->serverNodeId,
517 theSCITransporters[nSCITransporters] = t;
523 m_total_max_send_buffer += t->get_max_send_buffer();
533 DBUG_ENTER(
"TransporterRegistry::createTransporter SHM");
534 #ifdef NDB_SHM_TRANSPORTER
536 if (!g_ndb_shm_signum) {
537 g_ndb_shm_signum= config->shm.signum;
538 DBUG_PRINT(
"info",(
"Block signum %d",g_ndb_shm_signum));
543 NdbThread_set_shm_sigmask(TRUE);
546 if(config->shm.signum != g_ndb_shm_signum)
550 config->localHostName,
551 config->remoteHostName,
553 config->isMgmConnection,
555 config->remoteNodeId,
556 config->serverNodeId,
569 theSHMTransporters[nSHMTransporters] = t;
576 m_total_max_send_buffer += t->get_max_send_buffer();
586 TransporterRegistry::removeTransporter(NodeId nodeId) {
588 DEBUG(
"Removing transporter from " << localNodeId
589 <<
" to " << nodeId);
591 if(theTransporters[nodeId] == NULL)
596 const TransporterType
type = theTransporterTypes[nodeId];
600 case tt_TCP_TRANSPORTER:
601 #ifdef NDB_TCP_TRANSPORTER
602 for(; ind < nTCPTransporters; ind++)
603 if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
606 for(; ind<nTCPTransporters; ind++)
607 theTCPTransporters[ind-1] = theTCPTransporters[ind];
611 case tt_SCI_TRANSPORTER:
612 #ifdef NDB_SCI_TRANSPORTER
613 for(; ind < nSCITransporters; ind++)
614 if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
617 for(; ind<nSCITransporters; ind++)
618 theSCITransporters[ind-1] = theSCITransporters[ind];
622 case tt_SHM_TRANSPORTER:
623 #ifdef NDB_SHM_TRANSPORTER
624 for(; ind < nSHMTransporters; ind++)
625 if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
628 for(; ind<nSHMTransporters; ind++)
629 theSHMTransporters[ind-1] = theSHMTransporters[ind];
638 delete theTransporters[nodeId];
639 theTransporters[nodeId] = NULL;
646 const Uint32 *
const signalData,
653 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
654 ((signalHeader->theReceiversBlockNumber == 252) ||
655 (signalHeader->theReceiversBlockNumber == 4002)))) {
658 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
659 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
660 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
662 t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, ptr);
663 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
673 for(
int i = 0;
i<50;
i++){
674 if((nSHMTransporters+nSCITransporters) == 0)
675 NdbSleep_MilliSleep(sleepTime);
676 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
678 t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, ptr);
679 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
692 WARNING(
"Signal to " << nodeId <<
" lost(buffer)");
694 return SEND_BUFFER_FULL;
696 return SEND_MESSAGE_TOO_BIG;
700 if (m_blocked.get(nodeId))
705 WARNING(
"Signal to " << nodeId <<
" discarded as node blocked + disconnected");
709 DEBUG(
"Signal to " << nodeId <<
" lost(disconnect) ");
710 return SEND_DISCONNECTED;
713 DEBUG(
"Discarding message to block: "
714 << signalHeader->theReceiversBlockNumber
715 <<
" node: " << nodeId);
718 return SEND_UNKNOWN_NODE;
728 const Uint32 *
const signalData,
736 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
737 ((signalHeader->theReceiversBlockNumber == 252)||
738 (signalHeader->theReceiversBlockNumber == 4002)))) {
741 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
742 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
743 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
745 t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
746 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
756 for(
int i = 0;
i<50;
i++){
757 if((nSHMTransporters+nSCITransporters) == 0)
758 NdbSleep_MilliSleep(sleepTime);
759 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
761 t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
762 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
775 WARNING(
"Signal to " << nodeId <<
" lost(buffer)");
777 return SEND_BUFFER_FULL;
779 return SEND_MESSAGE_TOO_BIG;
783 if (m_blocked.get(nodeId))
788 WARNING(
"Signal to " << nodeId <<
" discarded as node blocked + disconnected");
792 DEBUG(
"Signal to " << nodeId <<
" lost(disconnect) ");
793 return SEND_DISCONNECTED;
796 DEBUG(
"Discarding message to block: "
797 << signalHeader->theReceiversBlockNumber
798 <<
" node: " << nodeId);
801 return SEND_UNKNOWN_NODE;
812 const Uint32 *
const signalData,
819 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
820 ((signalHeader->theReceiversBlockNumber == 252) ||
821 (signalHeader->theReceiversBlockNumber == 4002)))) {
824 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
825 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
826 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
828 t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, ptr);
829 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
839 for(
int i = 0;
i<50;
i++){
840 if((nSHMTransporters+nSCITransporters) == 0)
841 NdbSleep_MilliSleep(sleepTime);
842 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
844 t->m_packer.
pack(insertPtr, prio, signalHeader, signalData, ptr);
845 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
858 WARNING(
"Signal to " << nodeId <<
" lost(buffer)");
860 return SEND_BUFFER_FULL;
862 return SEND_MESSAGE_TOO_BIG;
865 DEBUG(
"Signal to " << nodeId <<
" lost(disconnect) ");
866 return SEND_DISCONNECTED;
869 DEBUG(
"Discarding message to block: "
870 << signalHeader->theReceiversBlockNumber
871 <<
" node: " << nodeId);
874 return SEND_UNKNOWN_NODE;
888 if(pollReceive(timeOutMillis)){
895 TransporterRegistry::setup_wakeup_socket()
897 if (m_has_extra_wakeup_socket)
902 if (my_socketpair(m_extra_wakeup_sockets))
904 perror(
"socketpair failed!");
908 if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
909 !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
914 #if defined(HAVE_EPOLL_CREATE)
915 if (m_epoll_fd != -1)
917 int sock = m_extra_wakeup_sockets[0].fd;
918 struct epoll_event event_poll;
919 bzero(&event_poll,
sizeof(event_poll));
920 event_poll.data.u32 = 0;
921 event_poll.events = EPOLLIN;
922 int ret_val = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, sock, &event_poll);
926 fprintf(stderr,
"Failed to add extra sock %u to epoll-set: %u\n",
933 m_has_extra_wakeup_socket =
true;
937 my_socket_close(m_extra_wakeup_sockets[0]);
938 my_socket_close(m_extra_wakeup_sockets[1]);
939 my_socket_invalidate(m_extra_wakeup_sockets+0);
940 my_socket_invalidate(m_extra_wakeup_sockets+1);
// Wake up a receiver that is waiting in the poll: one byte is written to
// m_extra_wakeup_sockets[1], which makes m_extra_wakeup_sockets[0] (the
// socket that poll_TCP and the epoll set watch for input) readable.
// Nothing is sent unless the wakeup socket pair has been set up.
945 TransporterRegistry::wakeup()
947 if (m_has_extra_wakeup_socket)
// Exactly one byte is sent; the result of my_send() is not checked here.
950 my_send(m_extra_wakeup_sockets[1], &c, 1, 0);
955 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
970 if (nSCITransporters > 0)
975 #ifdef NDB_SHM_TRANSPORTER
976 if (nSHMTransporters > 0)
978 Uint32 res = poll_SHM(0, mask);
987 #ifdef NDB_TCP_TRANSPORTER
988 #if defined(HAVE_EPOLL_CREATE)
989 if (likely(m_epoll_fd != -1))
991 Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);
995 tcpReadSelectReply = epoll_wait(m_epoll_fd, m_epoll_events,
996 num_trps, timeOutMillis);
997 retVal |= tcpReadSelectReply;
1000 int num_socket_events = tcpReadSelectReply;
1001 if (num_socket_events > 0)
1003 for (
int i = 0;
i < num_socket_events;
i++)
1005 const Uint32 trpid = m_epoll_events[
i].data.u32;
1007 if (m_blocked.get(trpid))
1010 m_blocked_with_data.set(trpid);
1017 else if (num_socket_events < 0)
1019 assert(errno == EINTR);
1025 if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
1027 retVal |= poll_TCP(timeOutMillis, mask);
1030 tcpReadSelectReply = 0;
1033 #ifdef NDB_SCI_TRANSPORTER
1034 if (nSCITransporters > 0)
1035 retVal |= poll_SCI(timeOutMillis, mask);
1037 #ifdef NDB_SHM_TRANSPORTER
1038 if (nSHMTransporters > 0)
1040 int res = poll_SHM(0, mask);
1048 #ifdef NDB_SCI_TRANSPORTER
1050 TransporterRegistry::poll_SCI(Uint32 timeOutMillis,
NodeBitmask& mask)
1053 for (
int i = 0;
i < nSCITransporters;
i++)
1059 if (t->hasDataToRead())
1071 #ifdef NDB_SHM_TRANSPORTER
1072 static int g_shm_counter = 0;
1074 TransporterRegistry::poll_SHM(Uint32 timeOutMillis,
NodeBitmask& mask)
1077 for (
int j = 0; j < 100; j++)
1079 for (
int i = 0;
i<nSHMTransporters;
i++)
1085 if (t->hasDataToRead())
1098 #ifdef NDB_TCP_TRANSPORTER
// Poll the TCP transporter sockets (and the extra wakeup socket, if any) for
// readability using m_socket_poller.
//
// timeOutMillis is passed straight to m_socket_poller.poll_unsafe().
// mask receives bit 0 when the first poller slot is readable (see the review
// note below); the per-transporter results are handled in the second loop.
// The poll result is stored in tcpReadSelectReply and returned.
1108 TransporterRegistry::poll_TCP(Uint32 timeOutMillis,
NodeBitmask& mask)
1110 m_socket_poller.clear();
// The wakeup socket is added first, so when present it occupies poller
// index 0.
1112 if (m_has_extra_wakeup_socket)
1114 const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
1117 m_socket_poller.add(socket,
true,
false,
false);
// idx[i] remembers the poller slot of TCP transporter i, or MAX_NODES + 1
// when that transporter was not added to the poll set.
1120 Uint16 idx[MAX_NODES];
1121 for (
int i = 0;
i < nTCPTransporters;
i++)
1124 const NDB_SOCKET_TYPE socket = t->getSocket();
// Only connected transporters with a valid socket are polled.
1127 if (is_connected(node_id) && t->
isConnected() && my_socket_valid(socket))
1129 idx[
i] = m_socket_poller.add(socket,
true,
false,
false);
1133 idx[
i] = MAX_NODES + 1;
1137 tcpReadSelectReply = m_socket_poller.poll_unsafe(timeOutMillis);
1139 if (tcpReadSelectReply > 0)
// NOTE(review): m_extra_wakeup_sockets is indexed as an array elsewhere in
// this file, so this condition looks always-true; the matching check before
// the poll uses m_has_extra_wakeup_socket. If no wakeup socket was added,
// poller index 0 would belong to the first transporter instead - confirm
// whether m_has_extra_wakeup_socket was intended here.
1141 if (m_extra_wakeup_sockets)
1143 if (m_socket_poller.has_read(0))
1144 mask.
set((Uint32)0);
1147 for (
int i = 0;
i < nTCPTransporters;
i++)
1150 if (idx[
i] != MAX_NODES + 1)
// NOTE(review): m_blocked / m_blocked_with_data are keyed by node id in
// blockReceive()/unblockReceive() and in the epoll path of pollReceive(),
// but here they are indexed with the transporter array index i - confirm
// whether the remote node id was intended.
1154 if (m_blocked.get(
i))
1157 m_blocked_with_data.set(
i);
1161 if (m_socket_poller.has_read(idx[
i]))
1167 return tcpReadSelectReply;
1171 #if defined(HAVE_EPOLL_CREATE)
1175 struct epoll_event event_poll;
1176 bzero(&event_poll,
sizeof(event_poll));
1177 NDB_SOCKET_TYPE sock_fd = t->getSocket();
1179 int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
1182 if (!my_socket_valid(sock_fd))
1186 event_poll.events = EPOLLIN;
1187 ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
1191 if (error == ENOENT && !add)
1199 if (!add || (add && (error != ENOMEM)))
1206 ndbout_c(
"Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
1207 " node %u to epoll-set,"
1209 add ?
"ADD" :
"DEL",
1211 MY_SOCKET_FORMAT_VALUE(sock_fd),
1217 ndbout <<
"We lacked memory to add the socket for node id ";
1218 ndbout << node_id << endl;
1233 bool hasReceived =
false;
1235 if (m_has_data_transporters.
get(0))
1237 m_has_data_transporters.
clear(Uint32(0));
1238 consume_extra_sockets();
1242 if (!m_blocked.isclear())
1244 if (m_has_data_transporters.
isclear())
1249 NdbSleep_MilliSleep(1);
1254 #ifdef NDB_TCP_TRANSPORTER
1256 while ((
id = m_has_data_transporters.
find(
id + 1)) != BitmaskImpl::NotFound)
1258 bool hasdata =
false;
1260 if (is_connected(
id))
1269 Uint32 sz = t->getReceiveData(&ptr);
1271 Uint32 szUsed = unpack(ptr, sz,
id, ioStates[
id]);
1272 t->updateReceiveDataPtr(szUsed);
1273 hasdata = t->hasReceiveData();
1277 m_has_data_transporters.
set(
id, hasdata);
1281 #ifdef NDB_SCI_TRANSPORTER
1284 for (
int i=0;
i<nSCITransporters;
i++)
1288 if(is_connected(nodeId))
1295 Uint32 * readPtr, * eodPtr;
1296 t->getReceivePtr(&readPtr, &eodPtr);
1298 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
1299 t->updateReceivePtr(newPtr);
1304 #ifdef NDB_SHM_TRANSPORTER
1305 for (
int i=0;
i<nSHMTransporters;
i++)
1309 if(is_connected(nodeId)){
1315 Uint32 * readPtr, * eodPtr;
1316 t->getReceivePtr(&readPtr, &eodPtr);
1318 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
1319 t->updateReceivePtr(newPtr);
1334 if (t && t->
isConnected() && is_connected(nodeId))
// Drain the bytes that wakeup() wrote to the wakeup socket pair by reading
// from m_extra_wakeup_sockets[0].
1343 TransporterRegistry::consume_extra_sockets()
1348 NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
1351 ret = my_recv(sock, buf,
sizeof(buf), 0);
1352 err = my_socket_errno();
1353 }
// Keep reading while a read filled the whole buffer (more bytes may be
// pending) or the call failed with EINTR; any other result ends the loop.
while (ret ==
sizeof(buf) || (ret == -1 && err == EINTR));
1362 #ifdef NDB_TCP_TRANSPORTER
1363 for (i = m_transp_count; i < nTCPTransporters; i++)
1366 if (t && t->has_data_to_send() &&
1372 for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
1375 if (t && t->has_data_to_send() &&
1382 if (m_transp_count == nTCPTransporters) m_transp_count = 0;
1384 #ifdef NDB_SCI_TRANSPORTER
1387 for (i=0; i<nSCITransporters; i++) {
1391 if(is_connected(nodeId))
1401 #ifdef NDB_SHM_TRANSPORTER
1402 for (i=0; i<nSHMTransporters; i++)
1406 if(is_connected(nodeId))
1419 int tSendCounter = sendCounter;
1420 sendCounter = tSendCounter + 1;
1421 if (tSendCounter >= sendLimit) {
1429 #ifdef DEBUG_TRANSPORTER
1431 TransporterRegistry::printState(){
1432 ndbout <<
"-- TransporterRegistry -- " << endl << endl
1433 <<
"Transporters = " << nTransporters << endl;
1434 for(
int i = 0; i<maxTransporters; i++)
1435 if(theTransporters[i] != NULL){
1437 ndbout <<
"Transporter: " << remoteNodeId
1438 <<
" PerformState: " << performStates[remoteNodeId]
1439 <<
" IOState: " << ioStates[remoteNodeId] << endl;
// Returns whether the bit for nodeId is set in m_blocked, i.e. whether
// blockReceive() has been called for it without a matching unblockReceive().
1446 TransporterRegistry::isBlocked(NodeId nodeId)
1448 return m_blocked.get(nodeId);
// Mark nodeId as blocked for receiving.
// If the node is currently flagged in m_has_data_transporters, that flag is
// moved to m_blocked_with_data.
1452 TransporterRegistry::blockReceive(NodeId nodeId)
// Blocking a node that is already blocked is treated as a caller error
// (checked only when asserts are enabled).
1458 assert(!m_blocked.get(nodeId));
1460 m_blocked.set(nodeId);
// Park the "has data" flag while blocked; unblockReceive() sets
// m_has_data_transporters again from m_blocked_with_data.
1462 if (m_has_data_transporters.
get(nodeId))
1464 assert(!m_blocked_with_data.get(nodeId));
1465 m_blocked_with_data.set(nodeId);
1466 m_has_data_transporters.
clear(nodeId);
// Undo blockReceive() for nodeId: clear the blocked bit, restore the parked
// "has data" flag, and deliver a disconnect that was deferred while blocked.
1471 TransporterRegistry::unblockReceive(NodeId nodeId)
// Preconditions (asserts only): the node is blocked, and its "has data" flag
// is not set while it is blocked.
1477 assert(m_blocked.get(nodeId));
1478 assert(!m_has_data_transporters.
get(nodeId));
1480 m_blocked.clear(nodeId);
1482 if (m_blocked_with_data.get(nodeId))
1484 m_has_data_transporters.
set(nodeId);
// report_disconnect() records the error number in m_disconnect_errors and
// sets m_blocked_disconnected when it is called for a blocked node. The
// blocked bit was cleared above, so the replayed call below does not take
// that branch again.
1487 if (m_blocked_disconnected.get(nodeId))
1490 m_blocked_disconnected.clear(nodeId);
1492 report_disconnect(nodeId, m_disconnect_errors[nodeId]);
1499 return ioStates[nodeId];
// Store a new IOState for nodeId in ioStates[] and trace the change via
// DEBUG().
1503 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
// State is already the requested one (presumably an early exit - confirm).
1504 if (ioStates[nodeId] == state)
1507 DEBUG(
"TransporterRegistry::setIOState("
1508 << nodeId <<
", " << state <<
")");
1510 ioStates[nodeId] = state;
1514 run_start_clients_C(
void * me)
1529 PerformState &curr_state = performStates[node_id];
1540 DBUG_ENTER(
"TransporterRegistry::do_connect");
1541 DBUG_PRINT(
"info",(
"performStates[%d]=CONNECTING",node_id));
1549 curr_state= CONNECTING;
1562 PerformState &curr_state = performStates[node_id];
1573 DBUG_ENTER(
"TransporterRegistry::do_disconnect");
1574 DBUG_PRINT(
"info",(
"performStates[%d]=DISCONNECTING",node_id));
1575 curr_state= DISCONNECTING;
1576 m_disconnect_errnum[node_id] = errnum;
1581 TransporterRegistry::report_connect(NodeId node_id)
1583 DBUG_ENTER(
"TransporterRegistry::report_connect");
1584 DBUG_PRINT(
"info",(
"performStates[%d]=CONNECTED",node_id));
1595 performStates[node_id] = CONNECTED;
1596 #if defined(HAVE_EPOLL_CREATE)
1597 if (likely(m_epoll_fd != -1))
1602 performStates[node_id] = DISCONNECTING;
1612 TransporterRegistry::report_disconnect(NodeId node_id,
int errnum)
1614 DBUG_ENTER(
"TransporterRegistry::report_disconnect");
1615 DBUG_PRINT(
"info",(
"performStates[%d]=DISCONNECTED",node_id));
1618 if (m_blocked.get(node_id))
1623 m_blocked_disconnected.set(node_id);
1624 m_disconnect_errors[node_id] = errnum;
1629 performStates[node_id] = DISCONNECTED;
1630 m_has_data_transporters.
clear(node_id);
1644 const char *errorInfo)
1646 if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
1647 m_error_states[nodeId].m_info == (
const char *)~(UintPtr)0)
1649 m_error_states[nodeId].m_code = errorCode;
1650 m_error_states[nodeId].m_info = errorInfo;
1662 for (
int i= 0,
n= 0;
n < nTransporters; i++){
1670 TransporterError
code = m_error_states[nodeId].m_code;
1671 const char *info = m_error_states[nodeId].m_info;
1672 if (code != TE_NO_ERROR && info != (
const char *)~(UintPtr)0)
1675 m_error_states[nodeId].m_code = TE_NO_ERROR;
1676 m_error_states[nodeId].m_info = (
const char *)~(UintPtr)0;
1679 switch(performStates[nodeId]){
1685 report_connect(nodeId);
1689 report_disconnect(nodeId, m_disconnect_errnum[nodeId]);
1699 int persist_mgm_count= 0;
1700 DBUG_ENTER(
"TransporterRegistry::start_clients_thread");
1701 while (m_run_start_clients_thread) {
1702 NdbSleep_MilliSleep(100);
1703 persist_mgm_count++;
1704 if(persist_mgm_count==50)
1706 ndb_mgm_check_connection(m_mgm_handle);
1707 persist_mgm_count= 0;
1709 for (
int i= 0,
n= 0;
n < nTransporters && m_run_start_clients_thread; i++){
1716 switch(performStates[nodeId]){
1719 bool connected=
false;
1726 DBUG_PRINT(
"info", (
"connecting to node %d using port %d",
1738 DBUG_PRINT(
"info", (
"connection to node %d should use "
1747 DBUG_PRINT(
"info", (
"asking mgmd which port to use for node %d",
1751 ndb_mgm_get_connection_int_parameter(m_mgm_handle,
1754 CFG_CONNECTION_SERVER_PORT,
1757 DBUG_PRINT(
"info",(
"Got dynamic port %d for %d -> %d (ret: %d)",
1762 DBUG_PRINT(
"info", (
"got port %d to use for connection to %d",
1763 server_port, nodeId));
1773 DBUG_PRINT(
"info", (
"Failed to get dynamic port, res: %d",
1775 g_eventLogger->
info(
"Failed to get dynamic port, res: %d",
1781 DBUG_PRINT(
"info", (
"mgmd close connection early"));
1783 (
"Management server closed connection early. "
1784 "It is probably being shut down (or has problems). "
1785 "We will retry the connection. %d %s %s line: %d",
1812 g_eventLogger->
warning(
"Found connection to %u in state DISCONNECTED "
1813 " while being connected, disconnecting!",
// Start the "ndb_start_clients" thread with entry point run_start_clients_C
// at low priority.
// Returns the thread handle, which is 0 when NdbThread_Create() failed.
1828 TransporterRegistry::start_clients()
// The run flag is raised before the thread is created, because the thread's
// loop (start_clients_thread) tests it.
1830 m_run_start_clients_thread=
true;
1831 m_start_clients_thread= NdbThread_Create(run_start_clients_C,
1834 "ndb_start_clients",
1835 NDB_THREAD_PRIO_LOW);
// Creation failed: lower the run flag again.
1836 if (m_start_clients_thread == 0)
1838 m_run_start_clients_thread=
false;
1840 return m_start_clients_thread;
// Stop the thread started by start_clients(), if there is one: lower the run
// flag that the thread loop tests, wait for the thread to finish, then
// destroy its handle.
1844 TransporterRegistry::stop_clients()
1846 if (m_start_clients_thread) {
1847 m_run_start_clients_thread=
false;
1849 NdbThread_WaitFor(m_start_clients_thread, &status);
1850 NdbThread_Destroy(&m_start_clients_thread);
// Record a (remote node id, interface, service port) entry in
// m_transporter_interface.
// NOTE(review): the scan below appears to look for an existing entry with
// the same port and interface before a new one is appended - confirm.
1856 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
1860 DBUG_ENTER(
"TransporterRegistry::add_transporter_interface");
// NOTE(review): interf is compared against 0 further down, so it can
// apparently be null here while being formatted with %s - confirm that the
// DBUG formatter tolerates a null string argument.
1861 DBUG_PRINT(
"enter",(
"interface=%s, s_port= %d", interf, s_port));
// An empty interface string is singled out here (presumably normalised to
// "no interface" - confirm).
1862 if (interf && strlen(interf) == 0)
1865 for (
unsigned i= 0; i < m_transporter_interface.size(); i++)
1867 Transporter_interface &tmp= m_transporter_interface[
i];
// Entries with a different port, or with port 0, are not candidates.
1868 if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
// Same port: the interfaces match when both are set and equal ...
1870 if (interf != 0 && tmp.m_interface != 0 &&
1871 strcmp(interf, tmp.m_interface) == 0)
// ... or when neither is set.
1875 if (interf == 0 && tmp.m_interface == 0)
1880 Transporter_interface t;
1881 t.m_remote_nodeId= remoteNodeId;
1882 t.m_s_service_port= s_port;
// The interface pointer is stored as-is; no copy is made on this line, so
// the string presumably has to outlive the entry - confirm against callers.
1883 t.m_interface= interf;
1884 m_transporter_interface.push_back(t);
1885 DBUG_PRINT(
"exit",(
"interface and port added"));
1890 TransporterRegistry::start_service(
SocketServer& socket_server)
1892 DBUG_ENTER(
"TransporterRegistry::start_service");
1893 if (m_transporter_interface.size() > 0 &&
1896 g_eventLogger->
error(
"INTERNAL ERROR: not initialized");
1900 for (
unsigned i= 0; i < m_transporter_interface.size(); i++)
1902 Transporter_interface &t= m_transporter_interface[
i];
1904 unsigned short port= (
unsigned short)t.m_s_service_port;
1905 if(t.m_s_service_port<0)
1906 port= -t.m_s_service_port;
1909 if(!socket_server.
setup(transporter_service,
1910 &port, t.m_interface))
1912 DBUG_PRINT(
"info", (
"Trying new port"));
1914 if(t.m_s_service_port>0
1915 || !socket_server.
setup(transporter_service,
1916 &port, t.m_interface))
1922 g_eventLogger->
error(
"Unable to setup transporter service port: %s:%d!\n"
1923 "Please check if the port is already used,\n"
1924 "(perhaps the node is already running)",
1925 t.m_interface ? t.m_interface :
"*", t.m_s_service_port);
1926 delete transporter_service;
1930 t.m_s_service_port= (t.m_s_service_port<=0)?-port:port;
1931 DBUG_PRINT(
"info", (
"t.m_s_service_port = %d",t.m_s_service_port));
1932 transporter_service->setTransporterRegistry(
this);
1937 #ifdef NDB_SHM_TRANSPORTER
1940 shm_sig_handler(
int signo)
1949 DBUG_ENTER(
"TransporterRegistry::startReceiving");
1951 #ifdef NDB_SHM_TRANSPORTER
1952 m_shm_own_pid = getpid();
1953 if (g_ndb_shm_signum)
1955 DBUG_PRINT(
"info",(
"Install signal handler for signum %d",
1958 NdbThread_set_shm_sigmask(FALSE);
1959 sigemptyset(&sa.sa_mask);
1960 sa.sa_handler = shm_sig_handler;
1963 while((ret =
sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR)
1967 DBUG_PRINT(
"error",(
"Install failed"));
1968 g_eventLogger->
error(
"Failed to install signal handler for"
1969 " SHM transporter, signum %d, errno: %d (%s)",
1970 g_ndb_shm_signum, errno, strerror(errno));
1973 #endif // NDB_SHM_TRANSPORTER
1992 TransporterRegistry::stopSending(){
1996 out <<
"-- Signal Header --" << endl;
1997 out <<
"theLength: " << sh.theLength << endl;
1998 out <<
"gsn: " << sh.theVerId_signalNumber << endl;
1999 out <<
"recBlockNo: " << sh.theReceiversBlockNumber << endl;
2000 out <<
"sendBlockRef: " << sh.theSendersBlockRef << endl;
2001 out <<
"sendersSig: " << sh.theSendersSignalId << endl;
2002 out <<
"theSignalId: " << sh.theSignalId << endl;
2003 out <<
"trace: " << (int)sh.theTrace << endl;
// Return the transporter registered for nodeId.
// The result is NULL for a slot with no transporter: the constructor
// initialises every slot to NULL and removeTransporter() resets it.
// The range check is an assert only, so it vanishes when asserts are
// compiled out.
2008 TransporterRegistry::get_transporter(NodeId nodeId) {
2009 assert(nodeId < maxTransporters);
2010 return theTransporters[nodeId];
2014 bool TransporterRegistry::connect_client(
NdbMgmHandle *h)
2016 DBUG_ENTER(
"TransporterRegistry::connect_client(NdbMgmHandle)");
2022 g_eventLogger->
error(
"%s: %d", __FILE__, __LINE__);
2028 g_eventLogger->
error(
"%s: %d", __FILE__, __LINE__);
2035 performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
2049 NDB_SOCKET_TYPE sockfd;
2050 my_socket_invalidate(&sockfd);
2052 DBUG_ENTER(
"TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
2054 if ( h==NULL || *h == NULL )
2056 g_eventLogger->
error(
"Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
2057 DBUG_RETURN(sockfd);
2060 for(
unsigned int i=0;i < m_transporter_interface.size();i++)
2062 if (m_transporter_interface[i].m_s_service_port >= 0)
2065 DBUG_PRINT(
"info", (
"Setting dynamic port %d for connection from node %d",
2066 m_transporter_interface[i].m_s_service_port,
2067 m_transporter_interface[i].m_remote_nodeId));
2069 if (ndb_mgm_set_connection_int_parameter(*h,
2071 m_transporter_interface[i].m_remote_nodeId,
2072 CFG_CONNECTION_SERVER_PORT,
2073 m_transporter_interface[i].m_s_service_port,
2076 g_eventLogger->
error(
"Could not set dynamic port for %d->%d (%s:%d)",
2078 m_transporter_interface[i].m_remote_nodeId,
2079 __FILE__, __LINE__);
2081 DBUG_RETURN(sockfd);
2089 DBUG_PRINT(
"info", (
"Converting handle to transporter"));
2090 sockfd= ndb_mgm_convert_to_transporter(h);
2091 if (!my_socket_valid(sockfd))
2093 g_eventLogger->
error(
"Failed to convert to transporter (%s: %d)",
2094 __FILE__, __LINE__);
2097 DBUG_RETURN(sockfd);
2108 my_socket_invalidate(&s);
2110 DBUG_ENTER(
"TransporterRegistry::connect_ndb_mgmd(SocketClient)");
2122 cs.
assfmt(
"%s:%u",sc->get_server_name(),sc->get_port());
2128 DBUG_PRINT(
"info", (
"connection to mgmd failed"));
2142 NodeId node, Uint32 lenBytes, Uint32 prio)
2145 Uint32 *insertPtr = handle->
getWritePtr(node, lenBytes, prio,
2146 t->get_max_send_buffer());
2148 if (insertPtr == 0) {
2153 if(t->send_is_possible(10)) {
2164 insertPtr = handle->
getWritePtr(node, lenBytes, prio,
2165 t->get_max_send_buffer());
2176 NodeId node, Uint32 lenBytes, Uint32 prio)
2181 t->update_status_overloaded(used);
2183 if(t->send_limit_reached(used)) {
2192 if(t->send_is_possible(0)) {
// Describe the pending send data of `node` as iovec entries in dst[]: one
// entry per page of the node's send buffer, starting at m_first_page.
// Only valid when the registry owns the send buffers
// (m_use_default_send_buffer).
2202 TransporterRegistry::get_bytes_to_send_iovec(NodeId node,
struct iovec *dst,
2205 assert(m_use_default_send_buffer);
2211 SendBuffer *b = m_send_buffers + node;
2212 SendBufferPage *
page = b->m_first_page;
// Stop at the end of the page list or once `max` entries have been filled.
2213 while (page != NULL && count < max)
// The unsent part of a page is m_bytes bytes long and starts m_start bytes
// into m_data.
2215 dst[count].iov_base = page->m_data+page->m_start;
2216 dst[count].iov_len = page->m_bytes;
2217 assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2218 page = page->m_next;
// Account for `bytes` bytes having been sent to `node`: reduce the node's
// m_used_bytes, step past pages that were sent completely, and trim the
// first remaining page.
// NOTE(review): used_bytes is a Uint32, so the subtraction wraps if bytes
// exceeds m_used_bytes; the page walk also dereferences `page` without a
// NULL check. Both rely on the caller never reporting more than is buffered
// - confirm that this is guaranteed or checked.
2226 TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
2228 assert(m_use_default_send_buffer);
2230 SendBuffer *b = m_send_buffers + node;
2231 Uint32 used_bytes = b->m_used_bytes;
2236 used_bytes -= bytes;
2237 b->m_used_bytes = used_bytes;
2239 SendBufferPage *page = b->m_first_page;
// Step past every page whose whole content is covered by `bytes`.
2240 while (bytes && bytes >= page->m_bytes)
2242 SendBufferPage * tmp =
page;
2243 bytes -= page->m_bytes;
2244 page = page->m_next;
// Nothing left buffered for this node.
2248 if (used_bytes == 0)
2250 b->m_first_page = 0;
// Partially sent page: advance its start offset and shrink its length by the
// remaining byte count, then make it the new head of the list.
2255 page->m_start += bytes;
2256 page->m_bytes -= bytes;
2257 assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2258 b->m_first_page =
page;
// Returns true when the send buffer of `node` has a first page and that page
// holds a non-zero number of bytes.
// Only valid when the registry owns the send buffers
// (m_use_default_send_buffer).
2265 TransporterRegistry::has_data_to_send(NodeId node)
2267 assert(m_use_default_send_buffer);
2269 SendBuffer *b = m_send_buffers + node;
2270 return (b->m_first_page != NULL && b->m_first_page->m_bytes);
// Empty the send buffer of `node`: walk its page list and reset the
// first/last page pointers and the used-byte count.
// should_be_empty states the caller's expectation that nothing is buffered.
2274 TransporterRegistry::reset_send_buffer(NodeId node,
bool should_be_empty)
2276 assert(m_use_default_send_buffer);
// Expected-empty and actually empty (presumably nothing to do - confirm).
2281 if (should_be_empty && !has_data_to_send(node))
// Otherwise the caller must not have expected an empty buffer; checked only
// when asserts are enabled.
2283 assert(!should_be_empty);
2285 SendBuffer *b = m_send_buffers + node;
2286 SendBufferPage *page = b->m_first_page;
2287 while (page != NULL)
// Fetch the successor before the current page is handled.
2289 SendBufferPage *next = page->m_next;
2293 b->m_first_page = NULL;
2294 b->m_last_page = NULL;
2295 b->m_used_bytes = 0;
// Take a page from the head of m_page_freelist.
// An "out of send buffers" error is written to ndbout on the failure path
// (presumably when the free list is empty - confirm).
2298 TransporterRegistry::SendBufferPage *
2299 TransporterRegistry::alloc_page()
2301 SendBufferPage *page = m_page_freelist;
// Unlink the head page from the free list.
2304 m_page_freelist = page->m_next;
2308 ndbout <<
"ERROR: out of send buffers in kernel." << endl;
// Return `page` to the free list by pushing it onto the head of
// m_page_freelist. `page` must not be NULL (assert only).
2313 TransporterRegistry::release_page(SendBufferPage *page)
2315 assert(page != NULL);
2316 page->m_next = m_page_freelist;
2317 m_page_freelist =
page;
// Return a pointer to where the next lenBytes bytes for `node` can be
// written: either the free tail of the last page or the start of a newly
// allocated page. The space is not accounted for here; that happens in
// updateWritePtr().
// Only valid when the registry owns the send buffers
// (m_use_default_send_buffer).
2321 TransporterRegistry::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
2324 assert(m_use_default_send_buffer);
2326 SendBuffer *b = m_send_buffers + node;
2329 SendBufferPage *page = b->m_last_page;
// Reuse the last page when the message fits behind its current content.
2330 if (page != NULL && page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
2332 return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
// A new page is needed; this checks the node's buffered bytes against the
// max_use limit first.
2335 if (b->m_used_bytes + lenBytes > max_use)
2339 page = alloc_page();
2342 page->m_next = NULL;
// Append the new page: it becomes the only page of an empty list ...
2346 if (b->m_last_page == NULL)
2348 b->m_first_page =
page;
2349 b->m_last_page =
page;
// ... or the new tail of a non-empty one.
2353 assert(b->m_first_page != NULL);
2354 b->m_last_page->m_next =
page;
2355 b->m_last_page =
page;
2357 return (Uint32 *)(page->m_data);
// Commit lenBytes bytes that were written at the pointer returned by
// getWritePtr(): grow the last page's byte count and the node's total.
// Returns the node's new m_used_bytes.
2361 TransporterRegistry::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2363 assert(m_use_default_send_buffer);
2365 SendBuffer *b = m_send_buffers + node;
2366 SendBufferPage *page = b->m_last_page;
2367 assert(page != NULL);
// NOTE(review): getWritePtr() checks m_bytes + m_start + lenBytes against
// max_data_bytes(), while this assert leaves out m_start, which bytes_sent()
// can make non-zero. The assert is therefore weaker than the real bound -
// confirm whether m_start should be included.
2368 assert(page->m_bytes + lenBytes <= page->max_data_bytes());
2369 page->m_bytes += lenBytes;
2370 b->m_used_bytes += lenBytes;
2371 return b->m_used_bytes;
2375 TransporterRegistry::forceSend(NodeId node)
2386 TransporterRegistry::print_transporters(
const char* where, NdbOut& out)
2388 out << where <<
" >>" << endl;
2390 for(
unsigned i = 0; i < maxTransporters; i++){
2391 if(theTransporters[i] == NULL)
2397 << getPerformStateString(remoteNodeId) <<
" to node: "
2398 << remoteNodeId <<
" at "
2399 << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
2402 out <<
"<<" << endl;
2404 for (
size_t i= 0; i < m_transporter_interface.size(); i++){
2405 Transporter_interface tf= m_transporter_interface[
i];
2408 <<
" remote node: " << tf.m_remote_nodeId
2409 <<
" port: " << tf.m_s_service_port
2410 <<
" interface: " << tf.m_interface << endl;