19 #include <ndb_global.h>
21 #include "SHM_Transporter.hpp"
22 #include "TransporterInternalDefinitions.hpp"
23 #include <TransporterCallback.hpp>
27 #include <InputStream.hpp>
28 #include <OutputStream.hpp>
30 extern int g_ndb_shm_signum;
33 const char *lHostName,
34 const char *rHostName,
36 bool isMgmConnection_arg,
45 lHostName, rHostName, r_port, isMgmConnection_arg,
46 lNodeId, rNodeId, serverNodeId,
47 0, false, checksum, signalId,
48 4096 + MAX_SEND_MESSAGE_BYTESIZE),
55 _shmSegCreated =
false;
62 setupBuffersDone=
false;
63 #ifdef DEBUG_TRANSPORTER
64 printf(
"shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
66 m_signal_threshold = 4096;
73 if ((key_t)conf->shm.shmKey == shmKey &&
74 (
int)conf->shm.shmSize == shmSize &&
75 conf->shm.signum == g_ndb_shm_signum)
94 Uint32 sharedSize = 0;
98 const Uint32 slack = MAX(MAX_RECV_MESSAGE_BYTESIZE,
99 MAX_SEND_MESSAGE_BYTESIZE);
104 Uint32 sizeOfBuffer = shmSize;
105 sizeOfBuffer -= 2*sharedSize;
108 Uint32 * base1 = (Uint32*)shmBuf;
110 Uint32 * sharedReadIndex1 = base1;
111 Uint32 * sharedWriteIndex1 = base1 + 1;
112 serverStatusFlag = base1 + 4;
113 char * startOfBuf1 = shmBuf+sharedSize;
115 Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
116 Uint32 * sharedReadIndex2 = base2;
117 Uint32 * sharedWriteIndex2 = base2 + 1;
118 clientStatusFlag = base2 + 4;
119 char * startOfBuf2 = ((
char *)base2)+sharedSize;
122 * serverStatusFlag = 0;
135 * sharedReadIndex1 = 0;
136 * sharedWriteIndex1 = 0;
138 * sharedReadIndex2 = 0;
139 * sharedWriteIndex2 = 0;
144 * serverStatusFlag = 1;
146 #ifdef DEBUG_TRANSPORTER
147 printf(
"-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
148 printf(
"Reader at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
149 printf(
"sharedReadIndex1 at %ld (%p) = %d\n",
150 (
char*)sharedReadIndex1-shmBuf,
151 sharedReadIndex1, *sharedReadIndex1);
152 printf(
"sharedWriteIndex1 at %ld (%p) = %d\n",
153 (
char*)sharedWriteIndex1-shmBuf,
154 sharedWriteIndex1, *sharedWriteIndex1);
156 printf(
"Writer at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
157 printf(
"sharedReadIndex2 at %ld (%p) = %d\n",
158 (
char*)sharedReadIndex2-shmBuf,
159 sharedReadIndex2, *sharedReadIndex2);
160 printf(
"sharedWriteIndex2 at %ld (%p) = %d\n",
161 (
char*)sharedWriteIndex2-shmBuf,
162 sharedWriteIndex2, *sharedWriteIndex2);
164 printf(
"sizeOfBuffer = %d\n", sizeOfBuffer);
167 * clientStatusFlag = 0;
180 * sharedReadIndex2 = 0;
181 * sharedWriteIndex1 = 0;
185 * clientStatusFlag = 1;
186 #ifdef DEBUG_TRANSPORTER
187 printf(
"-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
188 printf(
"Reader at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
189 printf(
"sharedReadIndex2 at %ld (%p) = %d\n",
190 (
char*)sharedReadIndex2-shmBuf,
191 sharedReadIndex2, *sharedReadIndex2);
192 printf(
"sharedWriteIndex2 at %ld (%p) = %d\n",
193 (
char*)sharedWriteIndex2-shmBuf,
194 sharedWriteIndex2, *sharedWriteIndex2);
196 printf(
"Writer at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
197 printf(
"sharedReadIndex1 at %ld (%p) = %d\n",
198 (
char*)sharedReadIndex1-shmBuf,
199 sharedReadIndex1, *sharedReadIndex1);
200 printf(
"sharedWriteIndex1 at %ld (%p) = %d\n",
201 (
char*)sharedWriteIndex1-shmBuf,
202 sharedWriteIndex1, *sharedWriteIndex1);
204 printf(
"sizeOfBuffer = %d\n", sizeOfBuffer);
207 #ifdef DEBUG_TRANSPORTER
208 printf(
"Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);
215 DBUG_ENTER(
"SHM_Transporter::connect_server_impl");
222 if (!ndb_shm_create()) {
223 make_error_info(buf,
sizeof(buf));
224 report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT, buf);
225 NDB_CLOSE_SOCKET(sockfd);
228 _shmSegCreated =
true;
233 if (!ndb_shm_attach()) {
234 make_error_info(buf,
sizeof(buf));
235 report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
236 NDB_CLOSE_SOCKET(sockfd);
243 s_output.println(
"shm server 1 ok: %d",
244 m_transporter_registry.m_shm_own_pid);
247 DBUG_PRINT(
"info", (
"Wait for ok from client"));
248 if (s_input.gets(buf,
sizeof(buf)) == 0)
250 NDB_CLOSE_SOCKET(sockfd);
254 if(sscanf(buf,
"shm client 1 ok: %d", &m_remote_pid) != 1)
256 NDB_CLOSE_SOCKET(sockfd);
260 int r= connect_common(sockfd);
264 s_output.println(
"shm server 2 ok");
266 if (s_input.gets(buf, 256) == 0) {
267 NDB_CLOSE_SOCKET(sockfd);
270 DBUG_PRINT(
"info", (
"Successfully connected server to node %d",
274 NDB_CLOSE_SOCKET(sockfd);
281 DBUG_ENTER(
"SHM_Transporter::connect_client_impl");
287 DBUG_PRINT(
"info", (
"Wait for server to create and attach"));
288 if (s_input.gets(buf, 256) == 0) {
289 NDB_CLOSE_SOCKET(sockfd);
290 DBUG_PRINT(
"error", (
"Server id %d did not attach",
295 if(sscanf(buf,
"shm server 1 ok: %d", &m_remote_pid) != 1)
297 NDB_CLOSE_SOCKET(sockfd);
303 if (!ndb_shm_get()) {
304 NDB_CLOSE_SOCKET(sockfd);
305 DBUG_PRINT(
"error", (
"Failed create of shm seg to node %d",
309 _shmSegCreated =
true;
314 if (!ndb_shm_attach()) {
315 make_error_info(buf,
sizeof(buf));
316 report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
317 NDB_CLOSE_SOCKET(sockfd);
318 DBUG_PRINT(
"error", (
"Failed attach of shm seg to node %d",
326 s_output.println(
"shm client 1 ok: %d",
327 m_transporter_registry.m_shm_own_pid);
329 int r= connect_common(sockfd);
333 DBUG_PRINT(
"info", (
"Wait for ok from server"));
334 if (s_input.gets(buf, 256) == 0) {
335 NDB_CLOSE_SOCKET(sockfd);
336 DBUG_PRINT(
"error", (
"No ok from server node %d",
341 s_output.println(
"shm client 2 ok");
342 DBUG_PRINT(
"info", (
"Successfully connected client to node %d",
346 NDB_CLOSE_SOCKET(sockfd);
351 SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
357 if(!setupBuffersDone)
360 setupBuffersDone=
true;
365 NdbSleep_MilliSleep(m_timeOutMillis);
366 if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
370 DBUG_PRINT(
"error", (
"Failed to set up buffers to node %d",
378 struct iovec iov[64];
387 for(Uint32
i = 0;
i<cnt;
i++)
389 assert(iov[
i].iov_len);
390 sum += iov[
i].iov_len;
393 int nBytesSent = writer->
writev(iov, cnt);
397 kill(m_remote_pid, g_ndb_shm_signum);
398 iovec_data_sent(nBytesSent);
400 if (Uint32(nBytesSent) == sum && (cnt != NDB_ARRAY_SIZE(iov)))