29 #ifndef TransporterRegistry_H
30 #define TransporterRegistry_H
32 #if defined(HAVE_EPOLL_CREATE)
33 #include <sys/epoll.h>
35 #include "TransporterDefinitions.hpp"
36 #include "TransporterCallback.hpp"
37 #include <SocketServer.hpp>
38 #include <SocketClient.hpp>
42 #include <mgmapi/mgmapi.h>
44 #include <NodeBitmask.hpp>
57 static const char *performStateString[] =
59 "is trying to connect",
61 "is trying to disconnect" };
78 m_transporter_registry= 0;
82 m_transporter_registry= t;
101 bool use_default_send_buffer =
true,
102 unsigned maxTransporters = MAX_NTRANSPORTERS,
103 unsigned sizeOfLongSignalMemory = 100);
/**
 * Accessor for the NdbMgmHandle stored in m_mgm_handle.
 *
 * @return the current value of m_mgm_handle (returned as-is, no checks).
 */
111 NdbMgmHandle get_mgm_handle(
void) {
return m_mgm_handle; };
113 bool init(NodeId localNodeId);
/**
 * Return the descriptive string for a node's current perform state.
 *
 * Reads performStates[nodeId], casts it to unsigned and uses the result as
 * an index into the performStateString table.
 *
 * @param nodeId  index into performStates
 * @return pointer to the matching entry of performStateString
 *
 * NOTE(review): neither nodeId nor the resulting table index is range-checked
 * here -- confirm that callers guarantee both are valid.
 */
180 const char *getPerformStateString(NodeId nodeId)
const
181 {
return performStateString[(unsigned)performStates[nodeId]]; };
/**
 * Return the PerformState recorded for a node.
 *
 * @param nodeId  index into performStates (not range-checked here)
 * @return performStates[nodeId]
 */
183 PerformState getPerformState(NodeId nodeId)
const {
return performStates[nodeId]; }
/**
 * Tell whether a node's perform state is CONNECTED.
 *
 * @param node_id  index into performStates (not range-checked here)
 * @return true when performStates[node_id] == CONNECTED, false otherwise
 *
 * NOTE(review): unlike getPerformState() this accessor is not declared const
 * even though it only reads performStates -- confirm whether that is intended.
 */
190 bool is_connected(NodeId node_id) {
return performStates[node_id] == CONNECTED; };
191 void report_connect(NodeId node_id);
192 void report_disconnect(NodeId node_id,
int errnum);
/**
 * Report a transporter error for a node (declaration only; the body is
 * defined outside this header section).
 *
 * @param nodeId     node the error refers to
 * @param errorCode  TransporterError value being reported
 * @param errorInfo  optional extra text; defaults to 0 (null pointer) when
 *                   the caller omits it
 */
193 void report_error(NodeId nodeId, TransporterError errorCode,
194 const char *errorInfo = 0);
199 IOState
ioState(NodeId nodeId);
200 void setIOState(NodeId nodeId, IOState state);
/**
 * Accessor for the m_use_default_send_buffer flag.
 *
 * @return the current value of m_use_default_send_buffer
 */
238 bool get_using_default_send_buffer()
const{
return m_use_default_send_buffer;}
260 const Uint32 *
const signalData,
266 const Uint32 *
const signalData,
272 const Uint32 *
const signalData,
279 const Uint32 *
const signalData,
283 return prepareSend(
this, signalHeader, prio, signalData, nodeId, ptr);
286 const Uint32 *
const signalData,
291 return prepareSend(
this, signalHeader, prio, signalData, nodeId, pool, ptr);
294 const Uint32 *
const signalData,
298 return prepareSend(
this, signalHeader, prio, signalData, nodeId, ptr);
309 inline Uint32 pollReceive(Uint32 timeOutMillis) {
310 return pollReceive(timeOutMillis, m_has_data_transporters);
/**
 * pollReceive overload that takes an explicit NodeBitmask (declaration only).
 *
 * The single-argument inline pollReceive(timeOutMillis) forwards to this
 * overload, passing m_has_data_transporters as the mask.
 *
 * @param timeOutMillis  timeout value; milliseconds judging by the name --
 *                       TODO confirm against the definition
 * @param mask           bitmask passed by non-const reference, so the callee
 *                       may modify it
 */
312 Uint32 pollReceive(Uint32 timeOutMillis,
NodeBitmask& mask);
324 #ifdef DEBUG_TRANSPORTER
330 bool isBlocked(NodeId nodeId);
331 void blockReceive(NodeId nodeId);
332 void unblockReceive(NodeId nodeId);
337 NodeId m_remote_nodeId;
338 int m_s_service_port;
339 const char *m_interface;
342 void add_transporter_interface(NodeId remoteNodeId,
const char *interf,
345 struct in_addr get_connect_address(NodeId node_id) const;
353 struct NdbThread *m_start_clients_thread;
354 bool m_run_start_clients_thread;
358 unsigned maxTransporters;
360 int nTCPTransporters;
361 int nSCITransporters;
362 int nSHMTransporters;
365 Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
366 Bitmask<MAX_NTRANSPORTERS/32> m_blocked_with_data;
367 Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
368 int m_disconnect_errors[MAX_NTRANSPORTERS];
376 #if defined(HAVE_EPOLL_CREATE)
378 struct epoll_event *m_epoll_events;
391 TransporterType* theTransporterTypes;
397 PerformState* performStates;
398 int* m_disconnect_errnum;
401 TransporterError m_code;
404 struct ErrorState *m_error_states;
416 Uint32 unpack(Uint32 * readPtr,
421 Uint32 * unpack(Uint32 * readPtr,
/**
 * Static helper (declaration only), callable without a registry instance.
 *
 * @param readPtr   pointer to Uint32 data; const, so the data is only read
 * @param maxWords  presumably an upper bound, in 32-bit words, on what may
 *                  be examined -- TODO confirm against the definition
 * @return a Uint32; presumably a length in words -- TODO confirm
 */
426 static Uint32 unpack_length_words(
const Uint32 *readPtr, Uint32 maxWords);
433 void removeTransporter(NodeId nodeId);
438 int tcpReadSelectReply;
441 Uint32 poll_TCP(Uint32 timeOutMillis,
NodeBitmask&);
442 Uint32 poll_SCI(Uint32 timeOutMillis,
NodeBitmask&);
443 Uint32 poll_SHM(Uint32 timeOutMillis,
NodeBitmask&);
449 bool setup_wakeup_socket();
452 bool m_has_extra_wakeup_socket;
453 NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
454 void consume_extra_sockets();
458 NodeId node, Uint32 lenBytes, Uint32 prio);
460 NodeId node, Uint32 lenBytes, Uint32 prio);
468 virtual Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
470 virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
471 virtual bool forceSend(NodeId node);
475 struct SendBufferPage {
477 static const Uint32 PGSIZE = 32768;
478 static Uint32 max_data_bytes()
480 return PGSIZE - offsetof(SendBufferPage, m_data);
484 struct SendBufferPage *m_next;
500 SendBufferPage *m_first_page;
501 SendBufferPage *m_last_page;
504 SendBufferPage *alloc_page();
505 void release_page(SendBufferPage *
page);
509 bool m_use_default_send_buffer;
511 SendBuffer *m_send_buffers;
513 SendBufferPage *m_page_freelist;
515 unsigned char *m_send_buffer_memory;
520 Uint64 m_total_max_send_buffer;
523 Uint32 get_bytes_to_send_iovec(NodeId node,
struct iovec *dst, Uint32 max);
524 Uint32 bytes_sent(NodeId node, Uint32 bytes);
525 bool has_data_to_send(NodeId node);
527 void reset_send_buffer(NodeId node,
bool should_be_empty);
/**
 * Print information about the transporters (declaration only).
 *
 * @param where  caller-supplied C string; presumably a label identifying the
 *               call site -- TODO confirm against the definition
 * @param out    output stream to write to; defaults to ndbout when omitted
 */
529 void print_transporters(
const char* where, NdbOut& out = ndbout);
536 assert(nodeId < MAX_NODES);
537 m_status_overloaded.
set(nodeId, val);
541 TransporterRegistry::get_status_overloaded()
const
543 return m_status_overloaded;
546 #endif // Define of TransporterRegistry_H