18 #ifndef NDB_SOCKET_POLLER_H
19 #define NDB_SOCKET_POLLER_H
21 #include <portlib/NdbTick.h>
39 struct pollfd m_one_pfd;
40 struct pollfd* m_pfds;
46 set_max_count(fd_set*
set, fd_set* static_set,
unsigned count) {
47 // Size the buffer as one fd_set plus `count` extra SOCKET slots for the
// fd_array that set_fd() indexes. The previous expression,
// `count-1*sizeof(SOCKET)`, parsed as `count - (1 * sizeof(SOCKET))` and so
// grew the buffer by roughly `count` bytes rather than `count` slots.
// Multiplying by `count` (not `count - 1`) also avoids unsigned wrap-around
// should count ever be 0; the cost is at most one spare slot.
void* ptr = malloc(
sizeof(fd_set) + (size_t)count * sizeof(SOCKET));
50 if (
set != static_set)
58 set_fd(fd_set*
set, SOCKET s) {
61 set->fd_array[
set->fd_count++] = s;
69 fd_set m_one_read_set;
70 fd_set m_one_write_set;
71 fd_set m_one_excp_set;
77 ndb_native_socket_t m_one_fd;
78 ndb_native_socket_t* m_fds;
90 , m_read_set(&m_one_read_set)
91 , m_write_set(&m_one_write_set)
92 , m_excp_set(&m_one_excp_set)
103 FD_ZERO(m_write_set);
111 if (m_pfds != &m_one_pfd)
115 if (m_read_set != &m_one_read_set)
117 if (m_write_set != &m_one_write_set)
119 if (m_excp_set != &m_one_excp_set)
122 if (m_fds != &m_one_fd)
127 bool set_max_count(
unsigned count) {
128 if (count <= m_max_count)
134 struct pollfd* pfds =
new struct pollfd[count];
137 if (m_pfds != &m_one_pfd)
142 if (count > FD_SETSIZE)
145 if (!set_max_count(m_read_set, &m_one_read_set, count) ||
146 !set_max_count(m_write_set, &m_one_write_set, count) ||
147 !set_max_count(m_excp_set, &m_one_excp_set, count))
151 ndb_native_socket_t* fds =
new ndb_native_socket_t[count];
154 if (m_fds != &m_one_fd)
162 unsigned add(
ndb_socket_t sock,
bool read,
bool write,
bool error) {
163 const unsigned index = m_count;
165 assert(m_count < m_max_count);
166 struct pollfd &pfd = m_pfds[m_count++];
167 pfd.fd = ndb_socket_get_native(sock);
182 set_fd(m_read_set, ndb_socket_get_native(sock));
184 set_fd(m_write_set, ndb_socket_get_native(sock));
186 set_fd(m_excp_set, ndb_socket_get_native(sock));
190 int fd = ndb_socket_get_native(sock);
191 if (fd < 0 || fd >= FD_SETSIZE)
193 // Adjacent string literals are concatenated by the compiler, so the first
// piece must end with a space; otherwise the message reads "whentrying".
fprintf(stderr,
"Maximum value for FD_SETSIZE: %d exceeded when "
194 "trying to add fd: %d", FD_SETSIZE, fd);
199 FD_SET(fd, m_read_set);
201 FD_SET(fd, m_write_set);
203 FD_SET(fd, m_excp_set);
208 m_fds[m_count++] = ndb_socket_get_native(sock);
210 assert(m_count > index);
214 unsigned count(
void)
const {
219 assert(index < m_count);
220 assert(m_count <= m_max_count);
222 return (m_pfds[index].fd == ndb_socket_get_native(socket));
224 return (m_fds[index] == ndb_socket_get_native(socket));
228 bool has_read(
unsigned index)
const {
229 assert(index < m_count);
230 assert(m_count <= m_max_count);
232 return (m_pfds[index].revents & POLLIN);
234 return FD_ISSET(m_fds[index], m_read_set);
238 bool has_write(
unsigned index)
const {
239 assert(index < m_count);
240 assert(m_count <= m_max_count);
242 return (m_pfds[index].revents & POLLOUT);
244 return FD_ISSET(m_fds[index], m_write_set);
251 int poll_unsafe(
int timeout)
254 return ::poll(m_pfds, m_count, timeout);
267 tv.tv_sec = (timeout / 1000);
268 tv.tv_usec = (timeout % 1000) * 1000;
270 return select(m_nfds+1, m_read_set, m_write_set, m_excp_set,
271 timeout == -1 ? NULL : &tv);
279 int poll(
int timeout)
283 const NDB_TICKS start = NdbTick_CurrentMillisecond();
285 const int res = poll_unsafe(timeout);
286 if (likely(res >= 0))
289 const int error = my_socket_errno();
291 (error == EINTR || error == EAGAIN))
296 timeout -= (int)(NdbTick_CurrentMillisecond() - start);
327 bool read,
bool write,
bool error,
int timeout_millis)
330 (void)poller.add(sock, read, write, error);
332 const int res = poller.poll(timeout_millis);