19 #include "ha_ndbcluster_glue.h"
21 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
22 #include <ndbapi/NdbApi.hpp>
23 #include <portlib/NdbTick.h>
24 #include "ha_ndbcluster_connection.h"
/* Number of entries in g_pool; ndbcluster_connect() sets it from connection_pool_size. */
29 static uint g_pool_alloc= 0;
/* Position within the pool; compared against g_pool_alloc while g_pool_mutex is held. */
30 static uint g_pool_pos= 0;
/* Initialised in ndbcluster_connect() and destroyed in ndbcluster_disconnect(). */
31 static pthread_mutex_t g_pool_mutex;
/* Defined elsewhere; ndbcluster_connect() sets it to 1 before creating the connection. */
40 extern int global_flag_skip_waiting_for_clean_cache;
/*
  Create the global cluster connection, the global Ndb object and the
  pool of cluster connections, then try to connect them.

  Parameters visible in this view:
    connect_callback      - handed to start_connect_thread() for pool
                            entry 0 only; the other entries get NULL
    connection_pool_size  - stored in g_pool_alloc
    optimized_node_select - forwarded to set_optimized_node_selection()
                            on every connection
    connect_string        - only used for debug output in the visible code

  NOTE(review): force_nodeid and wait_connected are used below but their
  declarations are not visible here - presumably further parameters;
  confirm against the full signature.  On the visible allocation
  failures my_errno is set to HA_ERR_OUT_OF_MEM; the actual return
  statements are not visible.
*/
43 ndbcluster_connect(
int (*connect_callback)(
void),
45 uint connection_pool_size,
46 bool optimized_node_select,
47 const char* connect_string,
/*
  Program name used when labelling the connections: "mysqld" unless
  EMBEDDED_LIBRARY is defined.  The "libmysqld" definition is presumably
  the #else branch (the directive itself is not visible here).
*/
52 #ifndef EMBEDDED_LIBRARY
53 const char mysqld_name[]=
"mysqld";
55 const char mysqld_name[]=
"libmysqld";
58 DBUG_ENTER(
"ndbcluster_connect");
59 DBUG_PRINT(
"enter", (
"connect_string: %s, force_nodeid: %d",
60 connect_string, force_nodeid));
62 global_flag_skip_waiting_for_clean_cache= 1;
/* Global connection object; the right-hand side of this assignment is not visible. */
64 g_ndb_cluster_connection=
/* Allocation failed: log it and record an out-of-memory error. */
66 if (!g_ndb_cluster_connection)
68 sql_print_error(
"NDB: failed to allocate global ndb cluster connection");
69 DBUG_PRINT(
"error", (
"Ndb_cluster_connection(%s)", connect_string));
70 my_errno= HA_ERR_OUT_OF_MEM;
/* Label the connection as "<mysqld_name> --server-id=<server_id>". */
75 my_snprintf(buf,
sizeof(buf),
"%s --server-id=%lu",
76 mysqld_name, server_id);
77 g_ndb_cluster_connection->
set_name(buf);
79 g_ndb_cluster_connection->set_optimized_node_selection(optimized_node_select);
/* Global Ndb object on the "sys" database; a null result is treated as out of memory. */
82 if ( (g_ndb=
new Ndb(g_ndb_cluster_connection,
"sys")) == 0 )
84 sql_print_error(
"NDB: failed to allocate global ndb object");
85 DBUG_PRINT(
"error", (
"failed to create global ndb object"));
86 my_errno= HA_ERR_OUT_OF_MEM;
/* A non-zero result from init() is an error; only the debug print is visible. */
89 if (g_ndb->
init() != 0)
91 DBUG_PRINT(
"error", (
"%d message: %s",
/*
  Deadline for the connect attempts: current tick plus 1000 * wait_connected.
  NOTE(review): presumably wait_connected is in seconds and the tick is
  in milliseconds - confirm.
*/
99 end_time= NdbTick_CurrentMillisecond();
100 end_time+= 1000 * wait_connected;
/* Keep calling connect(0,0,0) while it returns 1, checking the deadline each round. */
102 while ((res= g_ndb_cluster_connection->
connect(0,0,0)) == 1)
104 if (NdbTick_CurrentMillisecond() > end_time)
/* Record the pool size. */
112 g_pool_alloc= connection_pool_size;
/* Tail of a call whose start is not visible - presumably the g_pool allocation; confirm. */
115 MYF(MY_WME | MY_ZEROFILL));
116 pthread_mutex_init(&g_pool_mutex,
/* Slot 0 of the pool is the global connection itself. */
118 g_pool[0]= g_ndb_cluster_connection;
/* Remaining pool slots start at index 1. */
119 for (uint
i= 1;
i < g_pool_alloc;
i++)
/* Tail of an allocation check whose start is not visible; a zero result is reported as an error. */
123 g_ndb_cluster_connection)) == 0)
125 sql_print_error(
"NDB[%u]: failed to allocate cluster connect object",
127 DBUG_PRINT(
"error",(
"Ndb_cluster_connection[%u](%s)",
/* The label carries a 1-based connection number (i+1). */
133 my_snprintf(buf,
sizeof(buf),
"%s --server-id=%lu (connection %u)",
134 mysqld_name, server_id,
i+1);
137 g_pool[
i]->set_optimized_node_selection(optimized_node_select);
/* Visit every pool entry, including slot 0. */
144 for (uint
i= 0;
i < g_pool_alloc;
i++)
146 int node_id= g_pool[
i]->node_id();
/* A node id of 0 leads to a warning and to starting that entry's connect thread. */
151 if (g_pool[
i]->node_id() == 0)
153 sql_print_warning(
"NDB[%u]: starting connect thread",
i);
154 g_pool[
i]->start_connect_thread();
/* node_id is read again here; the statements in between are not visible. */
157 node_id= g_pool[
i]->node_id();
160 (
"NDBCLUSTER storage engine (%u) at %s on port %d",
i,
161 g_pool[
i]->get_connected_host(),
162 g_pool[
i]->get_connected_port()));
/* End of a do/while that repeats while res is non-zero and the deadline has not passed. */
168 now_time= NdbTick_CurrentMillisecond();
169 }
while (res != 0 && now_time < end_time);
/* One of three status messages is assigned; the selecting conditions are not visible. */
174 msg=
"all storage nodes connected";
178 msg=
"some storage nodes connected";
182 msg=
"no storage nodes connected (timed out)";
184 sql_print_information(
"NDB[%u]: NodeID: %d, %s",
/* Start a connect thread per pool entry; only entry 0 receives connect_callback. */
190 for (uint
i= 0;
i < g_pool_alloc;
i++)
193 start_connect_thread(
i == 0 ? connect_callback : NULL))
195 sql_print_error(
"NDB[%u]: failed to start connect thread",
i);
196 DBUG_PRINT(
"error", (
"g_ndb_cluster_connection->start_connect_thread()"));
204 (
"NDBCLUSTER storage engine not started, "
205 "will connect using %s",
206 g_ndb_cluster_connection->
207 get_connectstring(buf,
sizeof(buf))));
/* Remaining case: res is asserted to be -1 and the connection's latest error is logged. */
213 DBUG_ASSERT(res == -1);
214 DBUG_PRINT(
"error", (
"permanent error"));
215 sql_print_error(
"NDB: error (%u) %s",
216 g_ndb_cluster_connection->get_latest_error(),
217 g_ndb_cluster_connection->get_latest_error_msg());
/*
  Release the connection pool array, destroy the pool mutex and delete
  the global cluster connection.
*/
223 void ndbcluster_disconnect(
void)
225 DBUG_ENTER(
"ndbcluster_disconnect");
/*
  Starts at index 1: ndbcluster_connect() stores g_ndb_cluster_connection
  in slot 0, and that object is deleted separately below.  The loop body
  is not visible here.
*/
233 for (uint
i= 1;
i < g_pool_alloc;
i++)
/* MY_ALLOW_ZERO_PTR is passed, so g_pool may presumably be NULL here - confirm. */
238 my_free((uchar*) g_pool, MYF(MY_ALLOW_ZERO_PTR));
239 pthread_mutex_destroy(&g_pool_mutex);
/* Delete the global connection and clear the pointer. */
245 if (g_ndb_cluster_connection)
246 delete g_ndb_cluster_connection;
247 g_ndb_cluster_connection= NULL;
/*
  NOTE(review): the enclosing function header and the statements between
  these lines are not visible.  What is visible: g_pool_mutex is held
  while g_pool_pos is compared against g_pool_alloc - presumably a
  wrap-around of the pool position; confirm.
*/
253 pthread_mutex_lock(&g_pool_mutex);
256 if (g_pool_pos == g_pool_alloc)
258 pthread_mutex_unlock(&g_pool_mutex);
/*
  Read the latest-transaction GCI value across the connection pool.
  NOTE(review): how tmp is combined with val, and the return statement,
  are not visible here - presumably the largest value wins; confirm.
*/
262 ulonglong ndb_get_latest_trans_gci()
/* Seed with the global connection's value (the same object as pool slot 0). */
264 ulonglong val= *g_ndb_cluster_connection->get_latest_trans_gci();
/* Slot 0 is already covered by the seed, so start at 1. */
265 for (uint
i= 1;
i < g_pool_alloc;
i++)
267 ulonglong tmp= *g_pool[
i]->get_latest_trans_gci();
/*
  Store val as the latest-transaction GCI of every connection in the pool.
*/
274 void ndb_set_latest_trans_gci(ulonglong val)
276 for (uint
i= 0;
i < g_pool_alloc;
i++)
/* get_latest_trans_gci() yields a pointer; the value is written through it. */
278 *g_pool[
i]->get_latest_trans_gci()= val;
/*
  Compare id against the node id of every connection in the pool.
  NOTE(review): the return statements are not visible here - presumably
  non-zero on a match and zero otherwise; confirm.
*/
282 int ndb_has_node_id(uint
id)
284 for (uint
i= 0;
i < g_pool_alloc;
i++)
286 if (
id == g_pool[
i]->node_id())
/*
  Sum client statistics over all pooled connections into statsArr.

  statsArr is cleared for sizeof(connectionStats) bytes and then indexed
  up to Ndb::NumClientStatistics, so the caller must supply at least
  Ndb::NumClientStatistics Uint64 entries.
*/
292 void ndb_get_connection_stats(Uint64* statsArr)
/* Scratch buffer for one connection's statistics. */
294 Uint64 connectionStats[ Ndb::NumClientStatistics ];
295 memset(statsArr, 0,
sizeof(connectionStats));
/* The call that fills connectionStats for each pool entry is not visible here. */
297 for (uint
i=0;
i < g_pool_alloc;
i++)
/* Add this connection's counters to the running totals. */
301 for (Uint32 s=0; s < Ndb::NumClientStatistics; s++)
302 statsArr[s]+= connectionStats[s];