18 #include <ndb_global.h>
21 #include "TCP_Transporter.hpp"
25 #include <EventLogger.hpp>
// Fragment of the ndbstrerror helper class; the class header and its data
// members are not visible in this listing.
// Constructor declaration: takes an integer error code.
33 ndbstrerror(
int iError);
// Conversion operator: hands back the m_szError pointer, so an instance can be
// used where a char* is expected (see the (char*)ndbstrerror(...) casts in the
// logging calls further down).
35 operator char*(void) {
return m_szError; };
// ndbstrerror constructor (only part of its body is visible here).
// The flag and language arguments below belong to a call whose name and
// remaining arguments are cut off in this listing — presumably the Win32
// FormatMessage call that fills m_szError; TODO confirm.
42 ndbstrerror::ndbstrerror(
int iError)
// NOTE(review): FORMAT_MESSAGE_ALLOCATE_BUFFER asks the system to allocate the
// message buffer, which matches the LocalFree(m_szError) in the destructor.
46 FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
49 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
// ndbstrerror destructor: releases the message buffer held in m_szError with
// LocalFree. Any further statements of the body are not visible in this listing.
55 ndbstrerror::~ndbstrerror(
void)
57 LocalFree( m_szError );
// Alternative definition: ndbstrerror is simply an alias for strerror.
// The enclosing preprocessor conditional is not visible here — presumably this
// is the non-Windows branch; TODO confirm.
61 #define ndbstrerror strerror
// setIf(ref, val, def): helper used by the TCP_Transporter constructor to
// initialise the sockOpt* members from configuration values.
// The body is not visible in this listing — presumably it stores val in ref
// when val is non-zero and def otherwise; TODO confirm.
66 setIf(
int& ref, Uint32 val, Uint32 def)
// Return expression of the overload-limit helper (its signature is not visible;
// it is invoked elsewhere in this file as overload_limit(conf)).
// An explicitly configured tcpOverloadLimit wins; when it is zero the limit
// falls back to 4/5 of the configured send buffer size.
// NOTE(review): sendBufferSize*4 is evaluated before the division, so a very
// large buffer size could wrap if the field is 32 bits wide — confirm the type.
78 return (conf->tcp.tcpOverloadLimit ?
79 conf->tcp.tcpOverloadLimit :
80 conf->tcp.sendBufferSize*4/5);
// Visible part of the TCP_Transporter constructor. The signature and most of
// the arguments forwarded to the base class are cut off in this listing.
91 conf->isMgmConnection,
98 conf->tcp.sendBufferSize)
// Upper bound on the byte count requested per receive call (used in doReceive).
100 maxReceiveSize = conf->tcp.maxReceiveSize;
// Mark theSocket as invalid until a connection supplies a real descriptor.
103 my_socket_invalidate(&theSocket);
// Reset the send/receive statistics counters.
105 sendCount = receiveCount = 0;
106 sendSize = receiveSize = 0;
// Socket option members: first argument is the member to fill, second the
// configured value, third the fallback handed to setIf (70080 / 71540 for the
// receive/send buffer sizes, 0 for the TCP max segment size).
110 setIf(sockOptRcvBufSize, conf->tcp.tcpRcvBufSize, 70080);
111 setIf(sockOptSndBufSize, conf->tcp.tcpSndBufSize, 71540);
112 setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
// Cache the overload limit derived from the configuration.
114 m_overload_limit = overload_limit(conf);
// Condition comparing every TCP setting in conf with the value this transporter
// currently holds: send buffer size, max receive size, the three sockOpt*
// members and the derived overload limit. It is true only when all of them match.
// The enclosing function and the branches taken on match/mismatch are not
// visible in this listing.
// The (int) casts convert the configuration fields before comparing them with
// the int sockOpt* members — presumably the fields are unsigned; TODO confirm.
121 if (conf->tcp.sendBufferSize == m_max_send_buffer &&
122 conf->tcp.maxReceiveSize == maxReceiveSize &&
123 (
int)conf->tcp.tcpSndBufSize == sockOptSndBufSize &&
124 (
int)conf->tcp.tcpRcvBufSize == sockOptRcvBufSize &&
125 (
int)conf->tcp.tcpMaxsegSize == sockOptTcpMaxSeg &&
126 overload_limit(conf) == m_overload_limit)
// Destructor (partially visible).
133 TCP_Transporter::~TCP_Transporter() {
// Checks whether theSocket is still valid; the action taken in that case is
// cut off in this listing.
136 if (my_socket_valid(theSocket))
// Destroy the receive buffer.
140 receiveBuffer.destroy();
// Tail of the server-side connect hook (its signature is not visible in this
// listing): it only delegates to connect_common and returns that result.
// NOTE(review): the DBUG_ENTER tag spells the class "TCP_Transpporter" (double
// 'p'), so debug traces will not show the real class name.
145 DBUG_ENTER(
"TCP_Transpporter::connect_server_impl");
146 DBUG_RETURN(connect_common(sockfd));
149 bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
151 DBUG_ENTER(
"TCP_Transpporter::connect_client_impl");
152 DBUG_RETURN(connect_common(sockfd));
// Connect path shared by connect_server_impl and connect_client_impl
// (partially visible in this listing).
155 bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
// Apply the configured socket options, then switch the socket to non-blocking
// mode.
// NOTE(review): setSocketNonBlocking returns bool, but the result is discarded
// on the visible line.
157 setSocketOptions(sockfd);
158 setSocketNonBlocking(sockfd);
// Release the per-node transporter lock; the matching lock call and the
// statements executed under it are not visible here.
162 get_callback_obj()->unlock_transporter(remoteNodeId);
// Debug trace of the successful set-up (remaining arguments are cut off).
164 DBUG_PRINT(
"info", (
"Successfully set-up TCP transporter to node %d",
// initTransporter (partially visible): sizes and initialises the receive buffer.
170 TCP_Transporter::initTransporter() {
// Base size is maxReceiveSize, raised to MAX_RECV_MESSAGE_BYTESIZE if smaller.
175 Uint32 recBufSize = maxReceiveSize;
176 if(recBufSize < MAX_RECV_MESSAGE_BYTESIZE){
177 recBufSize = MAX_RECV_MESSAGE_BYTESIZE;
// The buffer is initialised with one extra MAX_RECV_MESSAGE_BYTESIZE on top of
// the base size; the branch taken when init() fails is cut off in this listing.
180 if(!receiveBuffer.init(recBufSize+MAX_RECV_MESSAGE_BYTESIZE)){
// set_get: sets one socket option and reads it back for diagnostics.
//   fd      - socket to configure
//   level   - option level passed to my_getsockopt/my_setsockopt
//   optval  - option identifier at that level
//   optname - option name used only in log messages
// The final parameter (the value referred to as `val` below) is declared on a
// line that is not visible in this listing.
189 set_get(NDB_SOCKET_TYPE fd,
int level,
int optval,
const char *optname,
// actual: value read back after setting; defval: value read before setting.
192 int actual = 0, defval = 0;
193 SOCKET_SIZE_TYPE len =
sizeof(actual);
// Read the option's current value first; the return code is not checked.
195 my_getsockopt(fd, level, optval, (
char*)&defval, &len);
// Attempt to set the requested value; a failure is only logged, and only in
// DEBUG_TRANSPORTER builds.
197 if (my_setsockopt(fd, level, optval,
198 (
char*)&val,
sizeof(val)) < 0)
200 #ifdef DEBUG_TRANSPORTER
201 g_eventLogger->
error(
"setsockopt(%s, %d) errno: %d %s",
202 optname, val, errno, strerror(errno));
// Read the option back; len is reset because getsockopt updates it.
// The second half of the condition below is cut off in this listing.
206 len =
sizeof(actual);
207 if ((my_getsockopt(fd, level, optval,
208 (
char*)&actual, &len) == 0) &&
211 #ifdef DEBUG_TRANSPORTER
212 g_eventLogger->
error(
"setsockopt(%s, %d) - actual %d default: %d",
213 optname, val, actual, defval);
// pre_connect_options (partially visible): when a max segment size is
// configured (sockOptTcpMaxSeg non-zero), applies it to sockfd as TCP_MAXSEG.
// Any preprocessor guards around the call are not visible in this listing.
219 TCP_Transporter::pre_connect_options(NDB_SOCKET_TYPE sockfd)
221 if (sockOptTcpMaxSeg)
224 set_get(sockfd, IPPROTO_TCP, TCP_MAXSEG,
"TCP_MAXSEG", sockOptTcpMaxSeg);
// setSocketOptions (partially visible): applies this transporter's socket
// options to `socket` through set_get.
231 TCP_Transporter::setSocketOptions(NDB_SOCKET_TYPE socket)
// Receive and send buffer sizes from the sockOpt* members.
233 set_get(socket, SOL_SOCKET, SO_RCVBUF,
"SO_RCVBUF", sockOptRcvBufSize);
234 set_get(socket, SOL_SOCKET, SO_SNDBUF,
"SO_SNDBUF", sockOptSndBufSize);
// TCP_NODELAY takes the sockOptNodelay member; SO_KEEPALIVE is always set to 1.
235 set_get(socket, IPPROTO_TCP, TCP_NODELAY,
"TCP_NODELAY", sockOptNodelay);
236 set_get(socket, SOL_SOCKET, SO_KEEPALIVE,
"SO_KEEPALIVE", 1);
// Max segment size is applied only when one is configured (non-zero).
238 if (sockOptTcpMaxSeg)
241 set_get(socket, IPPROTO_TCP, TCP_MAXSEG,
"TCP_MAXSEG", sockOptTcpMaxSeg);
// setSocketNonBlocking (partially visible): asks my_socket_nonblock to enable
// non-blocking mode on `socket` and tests for a zero result. The values
// returned on each branch are cut off in this listing.
246 bool TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket)
248 if(my_socket_nonblock(socket,
true)==0)
// Convenience overload: performs the send-possible check on this transporter's
// own socket (theSocket) by delegating to the two-argument overload.
//   timeout_millisec - forwarded unchanged to that overload
254 TCP_Transporter::send_is_possible(
int timeout_millisec)
const
256 return send_is_possible(theSocket, timeout_millisec);
// send_is_possible(fd, timeout) (partially visible): polls fd with the given
// timeout. The poller declaration and the return statements are cut off in
// this listing.
260 TCP_Transporter::send_is_possible(NDB_SOCKET_TYPE fd,
int timeout_millisec)
const
// An invalid socket is handled separately (branch body not visible).
264 if (!my_socket_valid(fd))
// Register fd with only the second of the three flags set — presumably the
// "writable" interest, given the function's purpose; TODO confirm argument order.
268 poller.add(fd,
false,
true,
false);
// A result <= 0 from the poll (timeout or error) takes the branch below,
// whose body is not visible.
270 if (poller.poll_unsafe(timeout_millisec) <= 0)
/*
 * DISCONNECT_ERRNO(e, sz): decides whether the outcome of a socket send or
 * receive means the connection should be torn down.
 *
 *   e  - socket error code observed after the call
 *   sz - value returned by the send/receive call
 *
 * Evaluates to true when sz == 0, and also for every other outcome except
 * sz == -1 combined with one of the transient error codes SOCKET_EAGAIN,
 * SOCKET_EWOULDBLOCK or SOCKET_EINTR.
 *
 * Every use of a macro parameter is parenthesised so that argument
 * expressions containing low-precedence operators (for example ?:) are
 * evaluated as a unit.
 *
 * NOTE: both arguments are evaluated more than once; avoid passing
 * expressions with side effects.
 */
#define DISCONNECT_ERRNO(e, sz) (((sz) == 0) || \
  (!(((sz) == -1) && (((e) == SOCKET_EAGAIN) || ((e) == SOCKET_EWOULDBLOCK) || ((e) == SOCKET_EINTR)))))
// doSend (partially visible): writes queued send data to theSocket with a
// vectored write and returns the number of gathered bytes that were NOT sent
// (sum - sum_sent). Several statements, including the call that fills iov and
// the loop structure around the write, are cut off in this listing.
281 TCP_Transporter::doSend() {
// At most 64 buffer segments are gathered per call.
282 struct iovec iov[64];
// Add up the bytes gathered in iov; every segment must be non-empty.
291 for(Uint32
i = 0;
i<cnt;
i++)
293 assert(iov[
i].iov_len);
294 sum += iov[
i].iov_len;
// The iov array was filled completely (branch body not visible).
302 if (cnt == NDB_ARRAY_SIZE(iov))
// Never hand more segments to one write than m_os_max_iovec allows.
312 Uint32 iovcnt = cnt > m_os_max_iovec ? m_os_max_iovec : cnt;
313 int nBytesSent = (int)my_socket_writev(theSocket, iov+pos, iovcnt);
314 assert(nBytesSent <= (
int)remain);
// Case 1: everything that remained was written.
316 if (Uint32(nBytesSent) == remain)
318 sum_sent += nBytesSent;
// Case 2: partial write — account for it, then advance through iov.
321 else if (nBytesSent > 0)
323 sum_sent += nBytesSent;
324 remain -= nBytesSent;
// Skip over segments that were sent in full ...
329 while (Uint32(nBytesSent) >= iov[pos].iov_len)
331 assert(iov[pos].iov_len > 0);
332 nBytesSent -= iov[pos].iov_len;
// ... and trim the partially sent segment so the next write resumes at its
// first unsent byte.
339 assert(iov[pos].iov_len > Uint32(nBytesSent));
340 iov[pos].iov_len -= nBytesSent;
341 iov[pos].iov_base = ((
char*)(iov[pos].iov_base))+nBytesSent;
// Case 3: nothing written / error. The error code is captured once, and
// DISCONNECT_ERRNO separates transient errors from ones that require a
// disconnect (branch bodies not visible).
347 int err = my_socket_errno();
348 if (!(DISCONNECT_ERRNO(err, nBytesSent)))
360 #if defined DEBUG_TRANSPORTER
361 g_eventLogger->
error(
"Send Failure(disconnect==%d) to node = %d "
363 "errno = %d strerror = %s",
364 DISCONNECT_ERRNO(err, nBytesSent),
365 remoteNodeId, nBytesSent, my_socket_errno(),
366 (
char*)ndbstrerror(err));
// Bookkeeping: pass the byte count written to iovec_data_sent and update the
// send statistics.
374 assert(sum >= sum_sent);
375 iovec_data_sent(sum_sent);
376 sendCount += send_cnt;
377 sendSize += sum_sent;
// Once sendCount reaches reportFreq, report the accumulated figures through
// the callback object (any counter reset that follows is not visible here).
378 if(sendCount >= reportFreq)
380 get_callback_obj()->
reportSendLen(remoteNodeId, sendCount, sendSize);
// Bytes gathered but still unsent.
385 return sum - sum_sent;
// doReceive (partially visible): reads from theSocket into receiveBuffer and
// updates the receive statistics. Return statements and several branch bodies
// are cut off in this listing.
389 TCP_Transporter::doReceive() {
// Free space left in the receive buffer.
393 Uint32
size = receiveBuffer.sizeOfBuffer - receiveBuffer.sizeOfData;
// Request at most min(free space, maxReceiveSize) bytes, written at insertPtr.
// The remaining arguments of the call are not visible here.
395 const int nBytesRead = (int)my_recv(theSocket,
396 receiveBuffer.insertPtr,
397 size < maxReceiveSize ? size : maxReceiveSize,
// Data arrived: advance the buffer's fill level and insert position.
400 if (nBytesRead > 0) {
401 receiveBuffer.sizeOfData += nBytesRead;
402 receiveBuffer.insertPtr += nBytesRead;
// Sanity check that the buffer has not been overfilled; reported as
// TE_INVALID_MESSAGE_LENGTH.
// NOTE(review): this check runs after the bytes have already been written
// into the buffer.
404 if(receiveBuffer.sizeOfData > receiveBuffer.sizeOfBuffer){
405 #ifdef DEBUG_TRANSPORTER
406 g_eventLogger->
error(
"receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
407 receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
408 g_eventLogger->
error(
"nBytesRead = %d", nBytesRead);
410 g_eventLogger->
error(
"receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
411 receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
412 report_error(TE_INVALID_MESSAGE_LENGTH);
// Receive statistics; the increment of receiveCount is not visible here.
417 receiveSize += nBytesRead;
// When receiveCount hits reportFreq the figures are reported (the call this
// argument line belongs to is cut off).
419 if(receiveCount == reportFreq){
421 receiveCount, receiveSize);
// Nothing was read (nBytesRead <= 0): log in debug builds, then disconnect
// unless the error is one of the transient codes DISCONNECT_ERRNO filters out.
// NOTE(review): the message labels the value "nBytesSent" although the
// argument printed is nBytesRead.
427 #if defined DEBUG_TRANSPORTER
428 g_eventLogger->
error(
"Receive Failure(disconnect==%d) to node = %d nBytesSent = %d "
429 "errno = %d strerror = %s",
430 DISCONNECT_ERRNO(my_socket_errno(), nBytesRead),
431 remoteNodeId, nBytesRead, my_socket_errno(),
432 (
char*)ndbstrerror(my_socket_errno()));
// NOTE(review): my_socket_errno() is called again here and inside the macro
// expansion rather than being captured once, as doSend does with `err`.
434 if(DISCONNECT_ERRNO(my_socket_errno(), nBytesRead)){
435 do_disconnect(my_socket_errno());
// Disconnect sequence (the enclosing function's signature is not visible in
// this listing — presumably the transporter's disconnect implementation).
// The socket handle is copied to a local first so the member can be
// invalidated before the actual close happens.
448 NDB_SOCKET_TYPE sock = theSocket;
// Drop any buffered receive data and mark the member socket invalid.
449 receiveBuffer.clear();
450 my_socket_invalidate(&theSocket);
// Release the per-node transporter lock; the matching lock call is not
// visible here. The close below therefore runs after the unlock.
452 get_callback_obj()->unlock_transporter(remoteNodeId);
// Close the saved handle if it was valid; a failing close is reported as
// TE_ERROR_CLOSING_SOCKET.
454 if(my_socket_valid(sock))
456 if(my_socket_close(sock) < 0){
457 report_error(TE_ERROR_CLOSING_SOCKET);