19 #include <ndb_global.h>
21 #include "SCI_Transporter.hpp"
27 #include "TransporterInternalDefinitions.hpp"
28 #include <TransporterCallback.hpp>
30 #include <InputStream.hpp>
31 #include <OutputStream.hpp>
34 #define DEBUG_TRANSPORTER
// Fragment of the SCI_Transporter constructor. The start of the signature and
// several intermediate lines are not visible in this excerpt.
36 const char *lHostName,
37 const char *rHostName,
43 Uint16 remoteSciNodeId0,
44 Uint16 remoteSciNodeId1,
// Arguments forwarded to what is presumably the base-class constructor; the
// last one is packetSize rounded up to a multiple of 4, plus MAX_MESSAGE_SIZE.
52 lHostName, rHostName, r_port, isMgmConnection, _localNodeId,
53 _remoteNodeId, serverNodeId, 0, false, chksm, signalId,
54 4 * ((packetSize + 3)/4) + MAX_MESSAGE_SIZE)
56 DBUG_ENTER(
"SCI_Transporter::SCI_Transporter");
// m_PacketSize holds packetSize rounded up and expressed in 4-byte units.
57 m_PacketSize = (packetSize + 3)/4 ;
58 m_BufferSize = bufferSize;
60 m_RemoteSciNodeId = remoteSciNodeId0;
// If either remote SCI node id is 0 only one remote node is counted;
// presumably the assignment of 2 below sits in a non-visible else branch.
62 if(remoteSciNodeId0 == 0 || remoteSciNodeId1 == 0)
63 m_numberOfRemoteNodes=1;
65 m_numberOfRemoteNodes=2;
67 m_RemoteSciNodeId1 = remoteSciNodeId1;
72 m_remoteNodes[0]=remoteSciNodeId0;
73 m_remoteNodes[1]=remoteSciNodeId1;
74 m_adapters = nAdapters;
// NOTE(review): each of the three array-new expressions below requests
// nAdapters * sizeof(T) ELEMENTS (not bytes), i.e. far more than nAdapters
// entries. Confirm whether `new T[nAdapters]` was intended.
// NOTE(review): a plain new[] reports failure by throwing rather than
// returning NULL, so the NULL checks are presumably unreachable unless the
// build replaces operator new - confirm.
81 sciAdapters=
new SciAdapter[nAdapters* (
sizeof (SciAdapter))];
82 if(sciAdapters==NULL) {
84 m_SourceSegm=
new sourceSegm[nAdapters* (
sizeof (sourceSegm))];
85 if(m_SourceSegm==NULL) {
87 m_TargetSegm=
new targetSegm[nAdapters* (
sizeof (targetSegm))];
88 if(m_TargetSegm==NULL) {
90 m_reportFreq= reportFreq;
// Debug-only section; its contents are not visible in this excerpt.
93 #ifdef DEBUG_TRANSPORTER
// Fragment: compares the SCI settings in `conf` with this transporter's
// current members (enclosing function header and both branches not visible).
// NOTE(review): the constructor already stores m_PacketSize as
// (packetSize + 3)/4, so applying (.. + 3)/4 again here looks like a double
// rounding - confirm the intended comparison against conf->sci.sendLimit.
// NOTE(review): `m_buffersize` differs in case from the `m_BufferSize` member
// used elsewhere in this file - confirm it names an existing member.
109 if (conf->sci.sendLimit == (m_PacketSize + 3)/4 &&
110 conf->sci.bufferSize == m_buffersize &&
111 conf->sci.nLocalAdapters == m_adapters &&
112 conf->sci.remoteSciNodeId0 == m_remoteNodes[0] &&
113 conf->sci.remoteSciNodeId1 == m_remoteNodes[1])
// Fragment of SCI_Transporter::disconnectImpl (header and several lines are
// not visible in this excerpt).
121 DBUG_ENTER(
"SCI_Transporter::disconnectImpl");
125 DBUG_PRINT(
"info", (
"connect status = %d, remote node = %d",
// Close the SCI descriptor of every adapter. A close that does not return
// SCI_ERR_OK is reported as TE_SCI_UNABLE_TO_CLOSE_CHANNEL.
135 for(Uint32
i=0;
i<m_adapters ;
i++) {
136 SCIClose(sciAdapters[
i].scidesc, FLAGS, &err);
138 if(err != SCI_ERR_OK) {
139 report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL);
141 (
"Cannot close channel to the driver. Error code 0x%x",
// Debug build only: print the send-size counters.
// NOTE(review): the "total" line sums i1024, i10242048, i2048 and i2049, but
// the lines below print i20484096, i4096 and i4097 and never print i2049.
// The total therefore does not appear to match the counters listed - confirm
// which counters exist and which should be summed.
148 #ifdef DEBUG_TRANSPORTER
149 ndbout <<
"total: " << i1024+ i10242048 + i2048+i2049 << endl;
150 ndbout <<
"<1024: " << i1024 << endl;
151 ndbout <<
"1024-2047: " << i10242048 << endl;
152 ndbout <<
"==2048: " << i2048 << endl;
153 ndbout <<
"2049-4096: " << i20484096 << endl;
154 ndbout <<
"==4096: " << i4096 << endl;
155 ndbout <<
">4096: " << i4097 << endl;
// Fragment of SCI_Transporter::initTransporter (header and several lines are
// not visible in this excerpt).
162 DBUG_ENTER(
"SCI_Transporter::initTransporter");
// Enforce a minimum buffer size of 2 * MAX_MESSAGE_SIZE + 4096.
163 if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){
164 m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096;
// m_PacketSize is kept in 4-byte units, hence the * 4 when printing.
167 DBUG_PRINT(
"info", (
"SCI packet size %d", m_PacketSize * 4));
// Link check on the active adapter and, in a partly non-visible condition,
// on the standby adapter; a failure is reported as TE_SCI_LINK_ERROR.
168 if(!getLinkStatus(m_ActiveAdapterId) ||
170 !getLinkStatus(m_StandbyAdapterId))) {
172 (
"The link is not fully operational. Check the cables and the switches"));
174 report_error(TE_SCI_LINK_ERROR);
// Fragment: asks the SCI driver for the node id of adapter `adapterNo`
// (enclosing function header and return path are not visible).
184 sci_query_adapter_t queryAdapter;
188 queryAdapter.subcommand = SCI_Q_ADAPTER_NODEID;
189 queryAdapter.localAdapterNo = adapterNo;
// The query result is written through this pointer into _localNodeId.
190 queryAdapter.data = &_localNodeId;
192 SCIQuery(SCI_Q_ADAPTER,(
void*)(&queryAdapter),(Uint32)NULL,&error);
194 if(error != SCI_ERR_OK)
// Queries the SCI driver for the link-operational state of one adapter.
// @param adapterNo  local adapter number to query
// The declaration of `linkstatus`/`error` and the return statements are not
// visible in this excerpt.
200 bool SCI_Transporter::getLinkStatus(Uint32 adapterNo)
202 sci_query_adapter_t queryAdapter;
205 queryAdapter.subcommand = SCI_Q_ADAPTER_LINK_OPERATIONAL;
207 queryAdapter.localAdapterNo = adapterNo;
// The driver writes the answer into `linkstatus` through this pointer.
208 queryAdapter.data = &linkstatus;
210 SCIQuery(SCI_Q_ADAPTER,(
void*)(&queryAdapter),(Uint32)NULL,&error);
// A failed query is logged; what is returned in that case is not visible.
212 if(error != SCI_ERR_OK) {
213 DBUG_PRINT(
"error", (
"error %d querying adapter", error));
// Fragment of SCI_Transporter::initLocalSegment (header, most call arguments
// and the return paths are not visible in this excerpt). Visible steps: open
// a descriptor per adapter, create one local segment, prepare it on each
// adapter, map it locally, then make it available on each adapter.
224 DBUG_ENTER(
"SCI_Transporter::initLocalSegment");
225 Uint32 segmentSize = m_BufferSize;
// Step 1: open one SCI virtual device descriptor per adapter.
229 for(Uint32
i=0;
i<m_adapters ;
i++) {
230 SCIOpen(&(sciAdapters[
i].scidesc), FLAGS, &err);
232 DBUG_PRINT(
"info", (
"SCInode iD %d adapter %d\n",
233 sciAdapters[
i].localSciNodeId,
i));
234 if(err != SCI_ERR_OK) {
236 (
"Cannot open an SCI virtual device. Error code 0x%x",
// Step 2: create the local segment on adapter 0's descriptor; its handle is
// kept in m_SourceSegm[0].
245 SCICreateSegment(sciAdapters[0].scidesc,
246 &(m_SourceSegm[0].localHandle),
254 if(err != SCI_ERR_OK) {
255 DBUG_PRINT(
"error", (
"Error creating segment, err = 0x%x", err));
258 DBUG_PRINT(
"info", (
"created segment id : %d",
// Step 3: prepare that same segment (always m_SourceSegm[0]) once per
// adapter iteration.
263 for(Uint32
i=0;
i < m_adapters;
i++) {
264 SCIPrepareSegment((m_SourceSegm[0].localHandle),
269 if(err != SCI_ERR_OK) {
271 (
"Local Segment is not accessible by an SCI adapter. Error code 0x%x\n",
// Step 4: map the segment into this process; the address is stored in
// m_SourceSegm[0].mappedMemory.
278 m_SourceSegm[0].mappedMemory =
279 SCIMapLocalSegment((m_SourceSegm[0].localHandle),
280 &(m_SourceSegm[0].lhm[0].map),
289 if(err != SCI_ERR_OK) {
290 DBUG_PRINT(
"error", (
"Cannot map area of size %d. Error code 0x%x",
// Step 5: mark the segment available, once per adapter iteration.
298 for(Uint32
i=0;
i < m_adapters;
i++) {
299 SCISetSegmentAvailable((m_SourceSegm[0].localHandle),
304 if(err != SCI_ERR_OK) {
306 (
"Local Segment is not available for remote connections. Error code 0x%x\n",
// Sends the queued iovec entries to the remote segment via SCIMemCpy.
// Many lines (loop header, returns, several branches) are not visible in
// this excerpt, so the comments below describe only what can be seen.
317 bool SCI_Transporter::doSend() {
// Debug build only: timing variables.
318 #ifdef DEBUG_TRANSPORTER
319 NDB_TICKS startSec=0, stopSec=0;
320 Uint32 startMicro=0, stopMicro=0, totalMicro=0;
328 Uint32 used = m_send_iovec_used;
// Debug build only: total the bytes queued in the first `used` iovec entries
// and classify the total into size buckets.
// NOTE(review): of the visible bucket conditions, a total of exactly 1024
// matches neither `< 1024` nor `> 1024 && < 2048`. The remaining conditions
// are not visible; confirm the boundary value is counted somewhere.
332 #ifdef DEBUG_TRANSPORTER
333 Uint32 sizeToSend = 0;
334 for (Uint32
i = 0;
i < used;
i++)
335 sizeToSend += m_send_iovec[
i].iov_len;
337 if(sizeToSend < 1024 )
339 if(sizeToSend > 1024 && sizeToSend < 2048 )
343 if(sizeToSend>2048 && sizeToSend < 4096)
358 DBUG_PRINT(
"error", (
"SCI Transfer failed"));
359 report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
// Ask the active adapter's writer for a write position that can hold the
// current iovec entry.
363 Uint32 segSize = m_send_iovec[curr].iov_len;
364 Uint32 * insertPtr = (Uint32 *)
365 (m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(segSize);
369 const Uint32 remoteOffset=(Uint32)
371 (
char*)(m_TargetSegm[m_ActiveAdapterId].mappedMemory));
// Copy the entry to the remote map of the active adapter with error checking
// requested.
373 SCIMemCpy(m_TargetSegm[m_ActiveAdapterId].sequence,
374 (
void*)m_send_iovec[curr].iov_base,
375 m_TargetSegm[m_ActiveAdapterId].rhm[m_ActiveAdapterId].map,
378 SCI_FLAG_ERROR_CHECK,
381 if (err != SCI_ERR_OK) {
// Range and alignment errors are reported as unrecoverable.
382 if (err == SCI_ERR_OUT_OF_RANGE ||
383 err == SCI_ERR_SIZE_ALIGNMENT ||
384 err == SCI_ERR_OFFSET_ALIGNMENT) {
385 DBUG_PRINT(
"error", (
"Data transfer error = %d", err));
386 report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
// Transfer failure: the active link is checked first (the branch body is not
// visible).
390 if(err == SCI_ERR_TRANSFER_FAILED) {
391 if(getLinkStatus(m_ActiveAdapterId))
// With a single adapter there is nothing to fail over to.
396 if (m_adapters == 1) {
397 DBUG_PRINT(
"error", (
"SCI Transfer failed"));
398 report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
// Otherwise, if the standby link is up, issue a store barrier on the standby
// sequence and swap the active and standby adapter ids.
403 Uint32
temp=m_ActiveAdapterId;
404 if (getLinkStatus(m_StandbyAdapterId)) {
406 SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0);
407 m_ActiveAdapterId=m_StandbyAdapterId;
408 m_StandbyAdapterId=
temp;
409 DBUG_PRINT(
"error", (
"Swapping from adapter %u to %u",
410 m_StandbyAdapterId, m_ActiveAdapterId));
412 report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
413 DBUG_PRINT(
"error", (
"SCI Transfer failed"));
// Advance the writer's write position by the size of the entry
// (presumably on the success path - the enclosing branch is not visible).
418 SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer);
419 writer->updateWritePtr(segSize);
431 DBUG_PRINT(
"error", (
"the segment is full for some reason"));
// Tell the send-buffer bookkeeping how many bytes were sent.
439 iovec_data_sent(total);
// Calls copyIndexes on the standby adapter's writer.
// NOTE(review): the source and destination are both
// m_TargetSegm[m_StandbyAdapterId].writer, so as written the writer copies
// its indexes from itself. Lines 447 and 450+ are not visible (the call may
// be disabled by a preprocessor guard) - confirm before changing.
446 void SCI_Transporter::failoverShmWriter() {
448 (m_TargetSegm[m_StandbyAdapterId].writer)
449 ->copyIndexes((m_TargetSegm[m_StandbyAdapterId].writer));
// Lays out the control words and data area inside the locally mapped
// segment of the active adapter. Several lines (including whatever adjusts
// sharedSize and creates the reader) are not visible in this excerpt.
454 void SCI_Transporter::setupLocalSegment()
456 DBUG_ENTER(
"SCI_Transporter::setupLocalSegment");
// sharedSize is the size of the control area at the start of the segment;
// it is subtracted from the usable buffer size below.
457 Uint32 sharedSize = 0;
460 Uint32 sizeOfBuffer = m_BufferSize;
462 sizeOfBuffer -= sharedSize;
// Control words, as 32-bit slots from the start of the mapped memory:
// slot 0 = read index, slot 1 = write index, slot 3 = local status flag.
464 Uint32 * localReadIndex =
465 (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;
466 Uint32 * localWriteIndex = (Uint32*)(localReadIndex+ 1);
467 m_localStatusFlag = (Uint32*)(localReadIndex + 3);
// The data area starts sharedSize bytes into the mapped memory.
469 char * localStartOfBuf = (
char*)
470 ((
char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize);
// Both indexes start at 0.
472 * localReadIndex = 0;
473 * localWriteIndex = 0;
475 const Uint32 slack = MAX_MESSAGE_SIZE;
// Lays out the control words and data area inside the mapped remote
// segment(s) and attaches a writer to each. The lines that construct the
// writers and call the sequence-creation helper are not visible.
487 void SCI_Transporter::setupRemoteSegment()
489 DBUG_ENTER(
"SCI_Transporter::setupRemoteSegment");
490 Uint32 sharedSize = 0;
493 Uint32 sizeOfBuffer = m_BufferSize;
494 const Uint32 slack = MAX_MESSAGE_SIZE;
495 sizeOfBuffer -= sharedSize;
// Active adapter: control words as 32-bit slots from the start of the mapped
// remote memory: slot 0 = read index, slot 1 = write index, slot 3 = status.
497 Uint32 *segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ;
499 Uint32 * remoteReadIndex = (Uint32*)segPtr;
500 Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1);
501 m_remoteStatusFlag = (Uint32*)(segPtr + 3);
503 char * remoteStartOfBuf = (
char*)((
char*)segPtr+(sharedSize));
// NOTE(review): the writer is stored in fixed slot 0 although the memory was
// taken from m_TargetSegm[m_ActiveAdapterId] - confirm the two always agree.
513 m_TargetSegm[0].writer=writer;
516 report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
517 DBUG_PRINT(
"error", (
"Unable to create sequence on active"));
// Standby adapter (only when more than one adapter is configured): same
// layout, using the standby adapter's mapped memory.
520 if (m_adapters > 1) {
521 segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ;
523 Uint32 * remoteReadIndex2 = (Uint32*)segPtr;
524 Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1);
525 m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);
527 char * remoteStartOfBuf2 = (
char*)((
char *)segPtr+sharedSize);
// NOTE(review): within what appears to be the standby section, the indexes
// reset here are remoteReadIndex/remoteWriteIndex (the active adapter's),
// not remoteReadIndex2/remoteWriteIndex2 - confirm which was intended.
541 * remoteReadIndex = 0;
542 * remoteWriteIndex = 0;
544 m_TargetSegm[1].writer=writer2;
546 report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
547 DBUG_PRINT(
"error", (
"Unable to create sequence on standby"));
// Fragment of SCI_Transporter::init_local (return type, conditions and
// return paths are not visible). Visible behaviour: a 10 ms sleep and a
// TE_SCI_CANNOT_INIT_LOCALSEGMENT error report, presumably on the failure
// path of local segment initialisation.
555 SCI_Transporter::init_local()
557 DBUG_ENTER(
"SCI_Transporter::init_local");
560 NdbSleep_MilliSleep(10);
562 report_error(TE_SCI_CANNOT_INIT_LOCALSEGMENT);
// Fragment of SCI_Transporter::init_remote (return type, most call
// arguments and return paths are not visible). Visible steps: connect to
// the remote segment per adapter, map each connected segment, then call
// setupRemoteSegment().
571 SCI_Transporter::init_remote()
573 DBUG_ENTER(
"SCI_Transporter::init_remote");
577 DBUG_PRINT(
"info", (
"Map remote segments"));
// Step 1: connect each adapter to the remote segment. The same index i
// selects both the m_TargetSegm entry and its rhm entry.
578 for(Uint32
i=0;
i < m_adapters ;
i++) {
579 m_TargetSegm[
i].rhm[
i].remoteHandle=0;
580 SCIConnectSegment(sciAdapters[
i].scidesc,
581 &(m_TargetSegm[
i].rhm[
i].remoteHandle),
// On a connect error: sleep 10 ms and log (what follows is not visible).
591 if(err != SCI_ERR_OK) {
592 NdbSleep_MilliSleep(10);
593 DBUG_PRINT(
"error", (
"Error connecting segment, err 0x%x", err));
// Step 2: map each connected remote segment into this process.
598 for(Uint32 i=0; i < m_adapters ; i++) {
599 m_TargetSegm[
i].mappedMemory =
600 SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle),
601 &(m_TargetSegm[i].rhm[i].map),
608 if(err!= SCI_ERR_OK) {
610 (
"Cannot map a segment to the remote node %d. Error code 0x%x",
611 m_RemoteSciNodeId, err));
613 report_error(TE_SCI_CANNOT_MAP_REMOTESEGMENT);
// Step 3: lay out control words and writers in the mapped remote memory.
618 setupRemoteSegment();
620 DBUG_PRINT(
"info", (
"connected and mapped to segment, remoteNode: %d",
622 DBUG_PRINT(
"info", (
"remoteSegId: %d",
// Client side of the connection handshake carried over `sockfd`.
// Visible sequence: read the server's first line, send "sci client 1 ok",
// run init_remote(), read the server's second line, send "sci client 2 ok",
// then close the socket. The return type, the return statements and some
// conditions are not visible in this excerpt.
// @param sockfd  socket used only for the handshake; closed on every visible
//                path
631 SCI_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
636 DBUG_ENTER(
"SCI_Transporter::connect_client_impl");
// Wait for the first line from the server (reads at most 256 bytes).
638 if (s_input.gets(buf, 256) == 0) {
639 DBUG_PRINT(
"error", (
"No initial response from server in SCI"));
640 NDB_CLOSE_SOCKET(sockfd);
644 NDB_CLOSE_SOCKET(sockfd);
649 s_output.println(
"sci client 1 ok");
// Connect and map the remote segment; give up if that fails.
651 if (!init_remote()) {
652 NDB_CLOSE_SOCKET(sockfd);
// Wait for the second line from the server.
656 if (s_input.gets(buf, 256) == 0) {
657 DBUG_PRINT(
"error", (
"No second response from server in SCI"));
658 NDB_CLOSE_SOCKET(sockfd);
662 s_output.println(
"sci client 2 ok");
// Handshake finished: the socket is no longer needed.
664 NDB_CLOSE_SOCKET(sockfd);
665 DBUG_PRINT(
"info", (
"Successfully connected client to node %d",
// Fragment of SCI_Transporter::connect_server_impl (header not visible):
// server side of the handshake. Visible sequence: send "sci server 1 ok",
// read the client's reply, run init_remote(), send "sci server 2 ok", read
// the client's second reply, then close the socket.
676 DBUG_ENTER(
"SCI_Transporter::connect_server_impl");
679 NDB_CLOSE_SOCKET(sockfd);
683 s_output.println(
"sci server 1 ok");
// Wait for the client's first reply (reads at most 256 bytes).
686 if (s_input.gets(buf, 256) == 0) {
687 DBUG_PRINT(
"error", (
"No response from client in SCI"));
688 NDB_CLOSE_SOCKET(sockfd);
// Connect and map the remote segment; give up if that fails.
692 if (!init_remote()) {
693 NDB_CLOSE_SOCKET(sockfd);
697 s_output.println(
"sci server 2 ok");
// Wait for the client's second reply.
699 if (s_input.gets(buf, 256) == 0) {
700 DBUG_PRINT(
"error", (
"No second response from client in SCI"));
701 NDB_CLOSE_SOCKET(sockfd);
// Handshake finished: the socket is no longer needed.
705 NDB_CLOSE_SOCKET(sockfd);
706 DBUG_PRINT(
"info", (
"Successfully connected server to node %d",
// Fragment: creates a map sequence for adapter `adapterid` on its remote map
// and stores it in m_TargetSegm[adapterid].sequence, requesting
// SCI_FLAG_FAST_BARRIER (enclosing function header and result handling are
// not visible).
713 SCICreateMapSequence((m_TargetSegm[adapterid].rhm[adapterid].map),
714 &(m_TargetSegm[adapterid].sequence),
715 SCI_FLAG_FAST_BARRIER,
// Unmaps and removes the local memory segment. Return statements and some
// arguments are not visible in this excerpt.
720 bool SCI_Transporter::disconnectLocal()
722 DBUG_ENTER(
"SCI_Transporter::disconnectLocal");
// Unmap the local mapping held in m_SourceSegm[0].
729 SCIUnmapSegment(m_SourceSegm[0].lhm[0].map,0,&err);
730 if(err!=SCI_ERR_OK) {
731 report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
732 DBUG_PRINT(
"error", (
"Unable to unmap segment"));
// Remove the segment.
// NOTE(review): the handle is taken from m_SourceSegm[m_ActiveAdapterId]
// here while the unmap above uses fixed slot 0 - confirm they always refer
// to the same segment.
736 SCIRemoveSegment((m_SourceSegm[m_ActiveAdapterId].localHandle),
740 if(err!=SCI_ERR_OK) {
741 report_error(TE_SCI_UNABLE_TO_REMOVE_SEGMENT);
742 DBUG_PRINT(
"error", (
"Unable to remove segment"));
745 DBUG_PRINT(
"info", (
"Local memory segment is unmapped and removed"));
// For every adapter, unmaps the remote segment mapping and disconnects from
// the remote segment. Return statements are not visible in this excerpt.
750 bool SCI_Transporter::disconnectRemote() {
751 DBUG_ENTER(
"SCI_Transporter::disconnectRemote");
753 for(Uint32 i=0; i<m_adapters; i++) {
// Unmap first; a failure is reported as TE_SCI_UNABLE_TO_UNMAP_SEGMENT.
757 SCIUnmapSegment(m_TargetSegm[i].rhm[i].map,0,&err);
758 if(err!=SCI_ERR_OK) {
759 report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
760 DBUG_PRINT(
"error", (
"Unable to unmap segment"));
// Then disconnect; a failure is reported as
// TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT.
764 SCIDisconnectSegment(m_TargetSegm[i].rhm[i].remoteHandle,
767 if(err!=SCI_ERR_OK) {
768 report_error(TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT);
769 DBUG_PRINT(
"error", (
"Unable to disconnect segment"));
772 DBUG_PRINT(
"info", (
"Remote memory segment is unmapped and disconnected"));
// Destructor; only the debug-trace entry is visible in this excerpt, the
// cleanup statements are not.
778 SCI_Transporter::~SCI_Transporter() {
779 DBUG_ENTER(
"SCI_Transporter::~SCI_Transporter");
// Fragment of SCI_Transporter::closeSCI (header and several lines are not
// visible): closes activeSCIDescriptor and logs if the close does not return
// SCI_ERR_OK.
788 DBUG_ENTER(
"SCI_Transporter::closeSCI");
798 SCIClose(activeSCIDescriptor, FLAGS, &err);
800 if(err != SCI_ERR_OK) {
802 (
"Cannot close SCI channel to the driver. Error code 0x%x",
// Fragment: connection test (enclosing function header and both branches are
// not visible). The local status flag must read SCICONNECTED and so must a
// remote flag: the active adapter's or, in a partly non-visible
// sub-condition, the standby adapter's.
816 if(*m_localStatusFlag == SCICONNECTED &&
817 (*m_remoteStatusFlag == SCICONNECTED ||
819 *m_remoteStatusFlag2 == SCICONNECTED)))
// Writes SCICONNECTED into the remote status flag (and the second remote
// flag when more than one adapter is configured) and then into the local
// status flag. The return type and closing braces are not visible.
826 SCI_Transporter::setConnected() {
827 *m_remoteStatusFlag = SCICONNECTED;
828 if (m_adapters > 1) {
829 *m_remoteStatusFlag2 = SCICONNECTED;
831 *m_localStatusFlag = SCICONNECTED;
// Writes SCIDISCONNECT into each remote status flag, but only for an adapter
// whose link currently reports operational; the second flag is handled only
// when more than one adapter is configured. The return type and closing
// braces are not visible.
835 SCI_Transporter::setDisconnect() {
836 if(getLinkStatus(m_ActiveAdapterId))
837 *m_remoteStatusFlag = SCIDISCONNECT;
838 if (m_adapters > 1) {
839 if(getLinkStatus(m_StandbyAdapterId))
840 *m_remoteStatusFlag2 = SCIDISCONNECT;
// Fragment: tests whether the local status flag reads SCIDISCONNECT
// (enclosing function header and branch body are not visible).
846 if (*m_localStatusFlag == SCIDISCONNECT) {
// File-scope flag, initially false; presumably records whether the SISCI
// library has been initialised - its uses are not visible in this excerpt.
853 static bool init =
false;
// Initialises the SISCI library through SCIInitialize and logs when it does
// not return SCI_ERR_OK. The return type, any guard on the file-scope `init`
// flag and the return paths are not visible in this excerpt.
856 SCI_Transporter::initSCI() {
857 DBUG_ENTER(
"SCI_Transporter::initSCI");
861 SCIInitialize(0, &error);
862 if(error != SCI_ERR_OK) {
863 DBUG_PRINT(
"error", (
"Cannot initialize SISCI library."));
865 (
"Inconsistency between SISCI library and SISCI driver. Error code 0x%x",