19 #include <ndb_global.h>
21 #include <SocketServer.hpp>
25 #include <NdbThread.h>
// Member-initializer fragment: stores the maxSessions argument in
// m_maxSessions. NOTE(review): the enclosing constructor signature and the
// other initializers are not part of this excerpt.
32 m_maxSessions(maxSessions),
// Destructor. The visible code walks m_sessions and asserts that no session
// is still referenced, then walks m_services, closing each valid socket and
// deleting the service object.
// NOTE(review): some original lines (loop bodies, closing braces) are missing
// from this excerpt, so whether the sessions themselves are deleted here
// cannot be confirmed from what is shown.
38 SocketServer::~SocketServer() {
40 for(i = 0; i<m_sessions.size(); i++){
41 Session* session= m_sessions[
i].m_session;
// A session must have a zero reference count by the time the server is destroyed.
42 assert(session->m_refCount == 0);
45 for(i = 0; i<m_services.size(); i++){
// Only sockets that pass my_socket_valid are closed.
46 if(my_socket_valid(m_services[i].m_socket))
47 my_socket_close(m_services[i].m_socket);
// The service object stored in the entry is deleted here.
48 delete m_services[
i].m_service;
// Fragment of a function whose signature is not part of this excerpt
// (presumably SocketServer::tryBind(port, intface) -- TODO confirm).
// Visible steps: build an IPv4 address for `port`, create a SOCK_STREAM
// socket, enable address reuse, attempt to bind, and close the socket.
// NOTE(review): return statements and braces are missing from this excerpt,
// so the result reported on each path cannot be stated from here.
54 struct sockaddr_in servaddr;
55 memset(&servaddr, 0,
sizeof(servaddr));
// Default address: IPv4, INADDR_ANY, requested port in network byte order.
56 servaddr.sin_family = AF_INET;
57 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
58 servaddr.sin_port = htons(port);
// Ndb_getInAddr is given intface and writes into sin_addr.
// NOTE(review): any guard around this call and the branch taken when it
// returns non-zero are not visible here.
61 if(Ndb_getInAddr(&servaddr.sin_addr, intface))
65 const NDB_SOCKET_TYPE sock = my_socket_create(AF_INET, SOCK_STREAM, 0);
66 if (!my_socket_valid(sock))
69 DBUG_PRINT(
"info",(
"NDB_SOCKET: " MY_SOCKET_FORMAT,
70 MY_SOCKET_FORMAT_VALUE(sock)));
// Address reuse is requested before binding; on failure the socket is closed.
72 if (my_socket_reuseaddr(sock,
true) == -1)
74 NDB_CLOSE_SOCKET(sock);
// A failed bind closes the socket.
78 if (my_bind_inet(sock, &servaddr) == -1) {
79 NDB_CLOSE_SOCKET(sock);
// The socket is also closed after the bind check, which suggests this code
// only probes whether binding is possible -- confirm against the full source.
83 NDB_CLOSE_SOCKET(sock);
// SocketServer::setup (name taken from the DBUG_ENTER tag below; the first
// line of the signature is not part of this excerpt).
// Visible steps: build an IPv4 address for *port, create a SOCK_STREAM
// socket, enable address reuse, bind, query the bound port through `port`,
// start listening, and append the service to m_services.
// Parameters visible here:
//   port    - pointer to the port number; *port is used for the bind and the
//             pointer itself is handed to my_socket_get_port afterwards.
//   intface - interface name/address passed to Ndb_getInAddr.
// NOTE(review): returns, braces and some guards are missing from this
// excerpt, so the value returned on each path cannot be stated from here.
89 unsigned short * port,
90 const char * intface){
91 DBUG_ENTER(
"SocketServer::setup");
92 DBUG_PRINT(
"enter",(
"interface=%s, port=%u", intface, *port));
// Default address: IPv4, INADDR_ANY, requested port in network byte order.
93 struct sockaddr_in servaddr;
94 memset(&servaddr, 0,
sizeof(servaddr));
95 servaddr.sin_family = AF_INET;
96 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
97 servaddr.sin_port = htons(*port);
// Ndb_getInAddr is given intface and writes into sin_addr.
// NOTE(review): any guard around this call and its failure branch are not
// visible here.
100 if(Ndb_getInAddr(&servaddr.sin_addr, intface))
104 const NDB_SOCKET_TYPE sock = my_socket_create(AF_INET, SOCK_STREAM, 0);
105 if (!my_socket_valid(sock))
107 DBUG_PRINT(
"error",(
"socket() - %d - %s",
108 socket_errno, strerror(socket_errno)));
112 DBUG_PRINT(
"info",(
"NDB_SOCKET: " MY_SOCKET_FORMAT,
113 MY_SOCKET_FORMAT_VALUE(sock)));
// Address reuse is requested before binding.
115 if (my_socket_reuseaddr(sock,
true) == -1)
// NOTE(review): this trace reads errno, while every other error trace in this
// function reads socket_errno -- confirm whether the difference is intended.
117 DBUG_PRINT(
"error",(
"setsockopt() - %d - %s",
118 errno, strerror(errno)));
119 NDB_CLOSE_SOCKET(sock);
123 if (my_bind_inet(sock, &servaddr) == -1) {
124 DBUG_PRINT(
"error",(
"bind() - %d - %s",
125 socket_errno, strerror(socket_errno)));
126 NDB_CLOSE_SOCKET(sock);
// Ask the socket which port it ended up bound to; the answer is delivered
// through `port` (presumably relevant when *port was 0 -- TODO confirm).
131 if(my_socket_get_port(sock, port))
133 ndbout_c(
"An error occurred while trying to find out what"
134 " port we bound to. Error: %d - %s",
135 socket_errno, strerror(socket_errno));
// NOTE(review): the earlier failure paths close with NDB_CLOSE_SOCKET, this
// path and the listen() path use my_socket_close -- confirm they are
// interchangeable.
136 my_socket_close(sock);
140 DBUG_PRINT(
"info",(
"bound to %u", *port));
// The listen backlog is m_maxSessions, capped at 32.
142 if (my_listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1)
144 DBUG_PRINT(
"error",(
"listen() - %d - %s",
145 socket_errno, strerror(socket_errno)));
146 my_socket_close(sock);
// Register the service: store it in entry `i` and append the entry to
// m_services. NOTE(review): the declaration of `i` and any other fields set
// on it are not visible here.
152 i.m_service = service;
153 m_services.push_back(i);
// Resize the poller's capacity to the current number of services.
156 m_services_poller.set_max_count(m_services.size());
// Polls all service sockets once and, for each one examined, accepts a
// connection and turns it into a session.
// Visible steps: rebuild the poller from m_services, poll with a 1000 ms
// timeout, then for the poller entries call my_accept, ask the owning service
// for a new session, and (when one is returned) store and start it while
// holding m_session_mutex.
// NOTE(review): the return type, the handling of `ret`, the use of
// `has_read`, and the failure branches are not part of this excerpt.
163 SocketServer::doAccept()
// Start from an empty poller and add every service socket.
// NOTE(review): the meaning of the three boolean flags passed to add() is not
// shown here (presumably read/write/error interest -- TODO confirm).
167 m_services_poller.clear();
168 for (
unsigned i = 0; i < m_services.size(); i++)
170 m_services_poller.add(m_services[i].m_socket,
true,
false,
true);
// Every service is expected to have been added to the poller.
172 assert(m_services.size() == m_services_poller.count());
// Wait at most one second for activity.
174 const int accept_timeout_ms = 1000;
175 const int ret = m_services_poller.poll(accept_timeout_ms);
191 for (
unsigned i = 0; i < m_services_poller.count(); i++)
193 const bool has_read = m_services_poller.has_read(i);
// Poller index i is expected to refer to the same socket as m_services[i];
// the assert below checks that.
198 ServiceInstance & si = m_services[
i];
199 assert(m_services_poller.is_socket_equal(i, si.m_socket));
201 const NDB_SOCKET_TYPE childSock = my_accept(si.m_socket, 0, 0);
202 if (!my_socket_valid(childSock))
// Let the service build a session object for the accepted socket.
211 s.m_service = si.m_service;
212 s.m_session = si.m_service->newSession(childSock);
// Only a non-null session is recorded and started; m_sessions is modified
// under m_session_mutex.
213 if (s.m_session != 0)
215 m_session_mutex.lock();
216 m_sessions.push_back(s);
217 startSession(m_sessions.back());
218 m_session_mutex.unlock();
// Thread entry function: it is the routine handed to NdbThread_Create when
// the server thread is created. Takes one opaque pointer argument, _ss.
// NOTE(review): the return type and body are not part of this excerpt;
// presumably _ss is the SocketServer instance -- TODO confirm.
228 socketServerThread_C(
void* _ss){
// Fragment of a function whose signature is not part of this excerpt
// (presumably SocketServer::startServer -- TODO confirm).
// A server thread is created only when none exists yet (m_thread == 0) and
// no stop has been requested (m_stopThread == false).
238 if(m_thread == 0 && m_stopThread ==
false)
// The thread runs socketServerThread_C at NDB_THREAD_PRIO_LOW.
// NOTE(review): the intermediate NdbThread_Create arguments are missing here.
240 m_thread = NdbThread_Create(socketServerThread_C,
244 NDB_THREAD_PRIO_LOW);
// Releases m_threadLock; the matching lock() call is not visible here.
246 m_threadLock.unlock();
// Stops the server thread. The visible code waits for m_thread to finish,
// destroys the thread handle, and releases m_threadLock.
// NOTE(review): the lock acquisition, the declaration of `res`, and how the
// thread is told to stop are not part of this excerpt.
251 SocketServer::stopServer(){
// Join the thread first, then destroy its handle.
257 NdbThread_WaitFor(m_thread, &res);
258 NdbThread_Destroy(&m_thread);
261 m_threadLock.unlock();
// Main loop of the server: repeats until m_stopThread becomes true.
// Each iteration takes and releases m_session_mutex (the work done while it
// is held is not visible here), then sleeps 200 ms when the session limit
// has been reached.
// NOTE(review): the statements between the visible lines (presumably session
// cleanup and the accept call) are not part of this excerpt.
265 SocketServer::doRun(){
267 while(!m_stopThread){
268 m_session_mutex.lock();
270 m_session_mutex.unlock();
// NOTE(review): m_sessions.size() is read here after m_session_mutex has been
// released -- confirm this unsynchronized read is acceptable.
272 if(m_sessions.size() >= m_maxSessions){
// At capacity: back off for 200 ms.
274 NdbSleep_MilliSleep(200);
// Second 200 ms back-off; its triggering condition is not visible here.
280 NdbSleep_MilliSleep(200);
// Starts a thread for the given session entry and stores the thread handle
// in si.m_thread. The thread runs sessionThread_C with si.m_session as its
// argument, at NDB_THREAD_PRIO_LOW.
// NOTE(review): the remaining NdbThread_Create arguments are not visible.
286 SocketServer::startSession(SessionInstance & si){
287 si.m_thread = NdbThread_Create(sessionThread_C,
// NOTE(review): the session pointer is cast to void** here, while the thread
// function is declared as taking void* -- confirm this is the cast that
// NdbThread_Create's signature expects.
288 (
void**)si.m_session,
291 NDB_THREAD_PRIO_LOW);
// Fragment of a function whose signature is not part of this excerpt
// (presumably SocketServer::foreachSession(func, data) -- TODO confirm).
// Calls func(session, data) for every session without holding
// m_session_mutex during the calls:
//   1. under the mutex, copy each session pointer and bump its m_refCount;
//   2. with the mutex released, invoke the callback on each copied pointer;
//   3. under the mutex again, drop the references taken in step 1.
// The cleanup code only deletes sessions whose m_refCount is 0, so a
// referenced session is not deleted while the callback runs.
300 m_session_mutex.lock();
302 for(
unsigned i= 0; i < m_sessions.size(); i++){
303 Session* session= m_sessions[
i].m_session;
304 session_pointers.push_back(session);
305 session->m_refCount++;
307 m_session_mutex.unlock();
// Callbacks run outside the lock.
310 for(
unsigned i= 0; i < session_pointers.size(); i++){
311 (*func)(session_pointers[
i], data);
// Release the references taken above.
315 m_session_mutex.lock();
316 for(
unsigned i= 0; i < session_pointers.size(); i++){
317 Session* session= session_pointers[
i];
318 assert(session->m_refCount > 0);
319 session->m_refCount--;
// NOTE(review): any work done between the decrement loop and this unlock is
// not visible in this excerpt.
322 m_session_mutex.unlock();
// Takes m_session_mutex, does its work, and releases the mutex.
// NOTE(review): the statement executed while the mutex is held is missing
// from this excerpt (presumably a call to checkSessionsImpl -- TODO confirm).
326 SocketServer::checkSessions()
328 m_session_mutex.lock();
330 m_session_mutex.unlock();
// Cleans up finished sessions. For every entry whose session thread has
// stopped and whose m_refCount is 0, the visible code joins and destroys the
// thread (when a handle exists), calls stopSession() and deletes the session.
// This function does not lock m_session_mutex itself; the visible callers
// take the mutex around the place where this is presumably invoked.
// NOTE(review): the removal of the entry from m_sessions and the declaration
// of `ret` are not part of this excerpt.
334 SocketServer::checkSessionsImpl()
// Walk from the last entry to the first using a signed index (presumably so
// that entries can be removed during the walk -- TODO confirm).
336 for(
int i = m_sessions.size() - 1; i >= 0; i--)
// Both conditions must hold: thread finished and no outstanding references.
338 if(m_sessions[i].m_session->m_thread_stopped &&
339 (m_sessions[i].m_session->m_refCount == 0))
341 if(m_sessions[i].m_thread != 0)
344 NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
345 NdbThread_Destroy(&m_sessions[i].m_thread);
347 m_sessions[
i].m_session->stopSession();
348 delete m_sessions[
i].m_session;
// Fragment of a function whose signature is not part of this excerpt
// (presumably SocketServer::stopSessions(wait, wait_timeout) -- TODO confirm).
// Phase 1: under m_session_mutex, call stopSession() on every session.
// Phase 2: call stopSessions() on every registered service.
// Phase 3: loop until m_sessions is empty, sleeping 100 ms per round, with a
//          check against wait_timeout when it is greater than 0.
// NOTE(review): returns, the declaration of `i`, and the statement run at the
// top of the wait loop are missing from this excerpt.
357 m_session_mutex.lock();
// `i` counts down to 0 and the loop ends when it goes negative, so it must be
// a signed type; its declaration is not visible here.
358 for(i = m_sessions.size() - 1; i>=0; i--)
360 m_sessions[
i].m_session->stopSession();
362 m_session_mutex.unlock();
364 for(i = m_services.size() - 1; i>=0; i--)
365 m_services[i].m_service->stopSessions();
// Record the start time in milliseconds for the timeout check below.
370 NDB_TICKS start = NdbTick_CurrentMillisecond();
371 m_session_mutex.lock();
// The mutex is held when m_sessions.size() is tested and released while
// sleeping.
372 while(m_sessions.size() > 0){
374 m_session_mutex.unlock();
// A wait_timeout of 0 disables this elapsed-time check.
376 if (wait_timeout > 0 &&
377 (NdbTick_CurrentMillisecond() - start) > wait_timeout)
380 NdbSleep_MilliSleep(100);
381 m_session_mutex.lock();
383 m_session_mutex.unlock();
// Thread entry function for a session: it is the routine that startSession
// passes to NdbThread_Create. Takes one opaque pointer argument, _sc.
// The visible code asserts that the session is not yet marked stopped,
// closes the session socket, and finally sets m_thread_stopped to true; the
// cleanup code tests that flag before joining the thread and deleting the
// session.
// NOTE(review): the conversion of _sc to `si`, the call that runs the
// session, and the condition guarding the socket close are not part of this
// excerpt.
392 sessionThread_C(
void* _sc){
395 assert(si->m_thread_stopped ==
false);
400 NDB_CLOSE_SOCKET(si->m_socket);
404 si->m_thread_stopped =
true;