18 #include <ndb_global.h>
20 #include "ndb_cluster_connection_impl.hpp"
21 #include <mgmapi_configuration.hpp>
22 #include <mgmapi_config_parameters.h>
23 #include "TransporterFacade.hpp"
26 #include <NdbThread.h>
27 #include <ndb_limits.h>
28 #include <ConfigRetriever.hpp>
29 #include <ndb_version.h>
30 #include <mgmapi_debug.h>
31 #include <mgmapi_internal.h>
32 #include "NdbImpl.hpp"
33 #include "NdbDictionaryImpl.hpp"
37 NdbMutex *ndb_print_state_mutex= NULL;
40 #include <EventLogger.hpp>
43 static int g_ndb_connection_count = 0;
73 Ndb_cluster_connection::~Ndb_cluster_connection()
80 int Ndb_cluster_connection::get_connected_port()
const
82 if (m_impl.m_config_retriever)
83 return m_impl.m_config_retriever->get_mgmd_port();
87 const char *Ndb_cluster_connection::get_connected_host()
const
89 if (m_impl.m_config_retriever)
90 return m_impl.m_config_retriever->get_mgmd_host();
94 const char *Ndb_cluster_connection::get_connectstring(
char *
buf,
97 if (m_impl.m_config_retriever)
98 return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
104 run_ndb_cluster_connection_connect_thread(
void *me)
107 connection->m_run_connect_thread= 1;
108 connection->connect_thread();
112 int Ndb_cluster_connection::start_connect_thread(
int (*connect_callback)(
void))
115 DBUG_ENTER(
"Ndb_cluster_connection::start_connect_thread");
116 m_impl.m_connect_callback= connect_callback;
119 DBUG_PRINT(
"info",(
"starting thread"));
120 m_impl.m_connect_thread=
121 NdbThread_Create(run_ndb_cluster_connection_connect_thread,
124 "ndb_cluster_connection",
125 NDB_THREAD_PRIO_LOW);
131 else if (m_impl.m_connect_callback)
133 (*m_impl.m_connect_callback)();
138 void Ndb_cluster_connection::set_optimized_node_selection(
int val)
140 m_impl.m_optimized_node_selection= val;
144 Ndb_cluster_connection_impl::init_get_next_node
147 if (iter.scan_state != (Uint8)~0)
148 iter.cur_pos= iter.scan_state;
149 if (iter.cur_pos >= no_db_nodes())
151 iter.init_pos= iter.cur_pos;
160 Uint32 cur_pos= iter.cur_pos;
161 if (cur_pos >= no_db_nodes())
164 Ndb_cluster_connection_impl::Node *nodes= m_all_nodes.getBase();
165 Ndb_cluster_connection_impl::Node &node= nodes[cur_pos];
167 if (iter.scan_state != (Uint8)~0)
169 assert(iter.scan_state < no_db_nodes());
170 if (nodes[iter.scan_state].group == node.group)
173 return nodes[iter.scan_state++].id;
179 Uint32 init_pos= iter.init_pos;
180 if (cur_pos == node.next_group)
182 cur_pos= nodes[init_pos].this_group;
186 if (cur_pos != init_pos)
187 iter.cur_pos= cur_pos;
190 iter.cur_pos= node.next_group;
191 iter.init_pos= node.next_group;
202 if (tp == 0 || tp->ownId() == 0)
205 while ((
id = get_next_node(iter)))
208 if (tp->get_node_alive(
id) != 0)
219 Ndb_cluster_connection::no_db_nodes()
221 return m_impl.m_all_nodes.size();
225 Ndb_cluster_connection::node_id()
227 return m_impl.m_transporter_facade->ownId();
231 Ndb_cluster_connection::max_nodegroup()
234 if (tp == 0 || tp->ownId() == 0)
239 for(
unsigned i= 0;
i < no_db_nodes();
i++)
244 trp_node n = tp->theClusterMgr->getNodeInfo(m_impl.m_all_nodes[
i].id);
245 if (n.is_confirmed() && n.m_state.
nodeGroup <= MAX_NDB_NODES)
258 }
while ((n = ng.
find(n+1)) != ng.NotFound);
263 int Ndb_cluster_connection::get_no_ready()
266 if (tp == 0 || tp->ownId() == 0)
269 unsigned int foundAliveNode = 0;
271 for(
unsigned i= 0;
i < no_db_nodes();
i++)
276 if (tp->get_node_alive(m_impl.m_all_nodes[
i].id) != 0) {
282 return foundAliveNode;
287 int timeout_after_first_alive)
289 DBUG_ENTER(
"Ndb_cluster_connection::wait_until_ready");
295 if (tp->ownId() == 0)
299 int secondsCounter = 0;
300 int milliCounter = 0;
301 int noChecksSinceFirstAliveFound = 0;
303 unsigned int foundAliveNode = get_no_ready();
305 if (foundAliveNode == no_db_nodes())
309 else if (foundAliveNode > 0)
311 noChecksSinceFirstAliveFound++;
313 if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive)
316 else if (secondsCounter >= timeout)
320 NdbSleep_MilliSleep(100);
322 if (milliCounter >= 1000) {
329 unsigned Ndb_cluster_connection::get_connect_count()
const
331 return m_impl.get_connect_count();
334 unsigned Ndb_cluster_connection::get_min_db_version()
const
336 return m_impl.get_min_db_version();
339 int Ndb_cluster_connection::get_latest_error()
const
341 return m_impl.m_latest_error;
344 const char *Ndb_cluster_connection::get_latest_error_msg()
const
346 return m_impl.m_latest_error_msg.
c_str();
353 Ndb_cluster_connection_impl::
354 Ndb_cluster_connection_impl(
const char * connect_string,
356 int force_api_nodeid)
358 m_main_connection(main_connection),
359 m_optimized_node_selection(1),
360 m_run_connect_thread(0),
361 m_latest_trans_gci(0),
362 m_first_ndb_object(0),
363 m_latest_error_msg(),
367 DBUG_ENTER(
"Ndb_cluster_connection");
368 DBUG_PRINT(
"enter",(
"Ndb_cluster_connection this=0x%lx", (
long)
this));
370 NdbMutex_Lock(g_ndb_connection_mutex);
371 if(g_ndb_connection_count++ == 0)
373 NdbColumnImpl::create_pseudo_columns();
376 g_eventLogger->
enable(Logger::LL_ON, Logger::LL_ERROR);
382 g_eventLogger->setRepeatFrequency(0);
384 NdbMutex_Unlock(g_ndb_connection_mutex);
386 m_event_add_drop_mutex= NdbMutex_Create();
387 m_new_delete_ndb_mutex = NdbMutex_Create();
390 m_connect_callback= 0;
393 memset(globalApiStatsBaseline, 0,
sizeof(globalApiStatsBaseline));
396 if (ndb_print_state_mutex == NULL)
397 ndb_print_state_mutex= NdbMutex_Create();
402 if (m_config_retriever->hasError())
405 m_latest_error_msg.assfmt
406 (
"Could not initialize handle to management server: %s",
407 m_config_retriever->getErrorString());
408 printf(
"%s\n", get_latest_error_msg());
410 if (!m_main_connection)
417 assert(m_main_connection->m_impl.m_globalDictCache != NULL);
418 m_globalDictCache = 0;
419 m_transporter_facade=
424 m_config_retriever->setNodeId(0);
431 Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
433 DBUG_ENTER(
"~Ndb_cluster_connection");
435 if (m_first_ndb_object != 0)
437 g_eventLogger->
warning(
"Deleting Ndb_cluster_connection with Ndb-object"
439 Ndb * p = m_first_ndb_object;
444 p = p->theImpl->m_next_ndb_object;
450 if (m_transporter_facade != 0)
454 if (m_globalDictCache)
456 delete m_globalDictCache;
458 if (m_connect_thread)
461 m_run_connect_thread= 0;
462 NdbThread_WaitFor(m_connect_thread, &status);
463 NdbThread_Destroy(&m_connect_thread);
466 if (m_transporter_facade != 0)
468 delete m_transporter_facade;
469 m_transporter_facade = 0;
471 if (m_config_retriever)
473 delete m_config_retriever;
474 m_config_retriever= NULL;
477 if (ndb_print_state_mutex != NULL)
479 NdbMutex_Destroy(ndb_print_state_mutex);
480 ndb_print_state_mutex= NULL;
484 NdbMutex_Lock(g_ndb_connection_mutex);
485 if(--g_ndb_connection_count == 0)
487 NdbColumnImpl::destory_pseudo_columns();
489 NdbMutex_Unlock(g_ndb_connection_mutex);
491 if (m_event_add_drop_mutex)
492 NdbMutex_Destroy(m_event_add_drop_mutex);
493 m_event_add_drop_mutex = 0;
495 if (m_new_delete_ndb_mutex)
496 NdbMutex_Destroy(m_new_delete_ndb_mutex);
497 m_new_delete_ndb_mutex = 0;
505 NdbMutex_Lock(m_impl.m_new_delete_ndb_mutex);
511 NdbMutex_Unlock(m_impl.m_new_delete_ndb_mutex);
518 return m_impl.m_first_ndb_object;
520 return p->theImpl->m_next_ndb_object;
524 Ndb_cluster_connection_impl::link_ndb_object(
Ndb* p)
527 if (m_first_ndb_object != 0)
529 m_first_ndb_object->theImpl->m_prev_ndb_object = p;
532 p->theImpl->m_next_ndb_object = m_first_ndb_object;
533 m_first_ndb_object = p;
535 p->theFirstTransId += m_max_trans_id;
540 Ndb_cluster_connection_impl::unlink_ndb_object(
Ndb* p)
543 Ndb* prev = p->theImpl->m_prev_ndb_object;
544 Ndb* next = p->theImpl->m_next_ndb_object;
548 assert(m_first_ndb_object == p);
549 m_first_ndb_object = next;
553 prev->theImpl->m_next_ndb_object = next;
558 next->theImpl->m_prev_ndb_object = prev;
561 p->theImpl->m_prev_ndb_object = 0;
562 p->theImpl->m_next_ndb_object = 0;
564 Uint32 transId = (Uint32)p->theFirstTransId;
565 if (transId > m_max_trans_id)
567 m_max_trans_id = transId;
574 for (Uint32
i=0;
i<Ndb::NumClientStatistics;
i++)
576 globalApiStatsBaseline[
i] += p->theImpl->clientStats[
i];
583 Ndb_cluster_connection_impl::set_name(
const char *
name)
590 Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
594 DBUG_ENTER(
"Ndb_cluster_connection_impl::init_nodes_vector");
597 for(iter.first(); iter.valid(); iter.next())
599 Uint32 nodeid1, nodeid2, remoteNodeId,
group= 5;
600 const char * remoteHostName= 0, * localHostName= 0;
601 if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1))
continue;
602 if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2))
continue;
604 if(nodeid1 != nodeid && nodeid2 != nodeid)
continue;
605 remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
607 iter.get(CFG_CONNECTION_GROUP, &group);
610 const char * host1= 0, * host2= 0;
611 iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
612 iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
613 localHostName = (nodeid == nodeid1 ? host1 : host2);
614 remoteHostName = (nodeid == nodeid1 ? host2 : host1);
618 if(iter.get(CFG_TYPE_OF_SECTION, &type))
continue;
621 case CONNECTION_TYPE_SHM:{
624 case CONNECTION_TYPE_SCI:{
627 case CONNECTION_TYPE_TCP:{
635 if (m_all_nodes.push_back(Node(group,remoteNodeId)))
639 DBUG_PRINT(
"info",(
"saved %d %d", group,remoteNodeId));
640 for (
int i= m_all_nodes.size()-2;
641 i >= 0 && m_all_nodes[
i].group > m_all_nodes[
i+1].group;
644 Node tmp= m_all_nodes[
i];
645 m_all_nodes[
i]= m_all_nodes[
i+1];
646 m_all_nodes[
i+1]= tmp;
651 Uint32 cur_group, i_group= 0;
653 for (i= (
int)m_all_nodes.size()-1; i >= 0; i--)
655 if (m_all_nodes[i].group != cur_group)
657 cur_group= m_all_nodes[
i].group;
660 m_all_nodes[
i].next_group= i_group;
663 for (i= 0; i < (int)m_all_nodes.size(); i++)
665 if (m_all_nodes[i].group != cur_group)
667 cur_group= m_all_nodes[
i].group;
670 m_all_nodes[
i].this_group= i_group;
673 for (i= 0; i < (int)m_all_nodes.size(); i++)
675 fprintf(stderr,
"[%d] %d %d %d %d\n",
678 m_all_nodes[i].group,
679 m_all_nodes[i].this_group,
680 m_all_nodes[i].next_group);
689 Ndb_cluster_connection_impl::get_db_nodes(Uint8 arr[MAX_NDB_NODES])
const
691 Uint32 cnt = (Uint32)m_all_nodes.size();
692 assert(cnt < MAX_NDB_NODES);
693 const Node *nodes = m_all_nodes.getBase();
694 for (Uint32 i = 0; i<cnt; i++)
695 arr[i] = (Uint8)nodes[
i].id;
700 Ndb_cluster_connection_impl::configure(Uint32 nodeId,
703 DBUG_ENTER(
"Ndb_cluster_connection_impl::configure");
706 if(iter.find(CFG_NODE_ID, nodeId))
710 Uint32 scan_batch_size= 0;
711 if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
712 m_config.m_scan_batch_size= scan_batch_size;
714 Uint32 batch_byte_size= 0;
715 if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
716 m_config.m_batch_byte_size= batch_byte_size;
718 Uint32 batch_size= 0;
719 if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
720 m_config.m_batch_size= batch_size;
724 Uint32 timeout = 120000;
725 for (iter.first(); iter.valid(); iter.next())
727 Uint32 tmp1 = 0, tmp2 = 0;
728 iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
729 iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
734 m_config.m_waitfor_timeout = timeout;
737 if (!iter.get(CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION, &queue))
739 m_config.m_default_queue_option = queue;
742 DBUG_RETURN(init_nodes_vector(nodeId, config));
746 Ndb_cluster_connection_impl::do_test()
749 int n= no_db_nodes()+5;
750 Uint32 *nodes=
new Uint32[n+1];
752 for (
int g= 0; g <
n; g++)
754 for (
int h= 0; h <
n; h++)
759 for (
int j= 0; j < g; j++)
761 nodes[j]= get_next_node(iter2);
765 for (
int i= 0; i <
n; i++)
767 init_get_next_node(iter);
768 fprintf(stderr,
"%d dead:(", g);
772 if ((
id= get_next_node(iter)) == 0)
774 for (
int j= 0; j < g; j++)
778 fprintf(stderr,
" %d",
id);
784 fprintf(stderr,
")");
789 fprintf(stderr,
" %d\n",
id);
791 fprintf(stderr,
"\n");
799 m_impl.set_name(name);
802 int Ndb_cluster_connection_impl::connect(
int no_retries,
803 int retry_delay_in_seconds,
806 DBUG_ENTER(
"Ndb_cluster_connection::connect");
808 if (m_config_retriever == 0)
813 m_latest_error_msg.
assign(
"Ndb_cluster_connection init "
814 "error: m_config_retriever==0");
816 DBUG_PRINT(
"exit", (
"no m_config_retriever, ret: -1"));
819 if (m_config_retriever->do_connect(no_retries,
820 retry_delay_in_seconds,
825 m_latest_error_msg.
assfmt(
"Connect using '%s' timed out",
826 get_connectstring(buf,
sizeof(buf)));
827 DBUG_PRINT(
"exit", (
"mgmt server not up yet, ret: 1"));
839 if (configure(nodeId, *props))
841 ndb_mgm_destroy_configuration(props);
842 DBUG_PRINT(
"exit", (
"malloc failure, ret: -1"));
848 ndb_mgm_destroy_configuration(props);
852 ndb_mgm_destroy_configuration(props);
853 m_transporter_facade->connected();
855 m_latest_error_msg.
assign(
"");
856 DBUG_PRINT(
"exit", (
"connect ok, ret: 0"));
860 const char* erString = m_config_retriever->getErrorString();
862 erString =
"No error specified!";
865 m_latest_error_msg.
assfmt(
"Configuration error: %s", erString);
866 ndbout << get_latest_error_msg() << endl;
867 DBUG_PRINT(
"exit", (
"connect failed, '%s' ret: -1", erString));
874 int retry_delay_in_seconds,
877 return m_impl.connect(no_retries, retry_delay_in_seconds, verbose);
881 void Ndb_cluster_connection_impl::connect_thread()
883 DBUG_ENTER(
"Ndb_cluster_connection_impl::connect_thread");
886 NdbSleep_SecSleep(1);
887 if ((r = connect(0,0,0)) == 0)
890 printf(
"Ndb_cluster_connection::connect_thread error\n");
892 m_run_connect_thread= 0;
895 NdbSleep_SecSleep(1);
897 }
while (m_run_connect_thread);
898 if (m_connect_callback)
899 (*m_connect_callback)();
904 Ndb_cluster_connection::get_latest_trans_gci()
906 return m_impl.get_latest_trans_gci();
912 m_impl.init_get_next_node(iter);
918 return m_impl.get_next_node(iter);
924 return m_impl.get_next_alive_node(iter);
928 Ndb_cluster_connection::get_active_ndb_objects()
const
930 return m_impl.m_transporter_facade->get_active_ndb_objects();
940 Ndb_cluster_connection::get_auto_reconnect()
const
942 return m_impl.m_transporter_facade->get_auto_reconnect();
948 m_impl.m_transporter_facade->set_auto_reconnect(value);
964 const Uint32 relevant = MIN((Uint32)Ndb::NumClientStatistics, sz);
965 const Ndb* ndb = NULL;
968 memcpy(statsArr, &m_impl.globalApiStatsBaseline[0],
sizeof(Uint64)*relevant);
972 for (Uint32 i=0; i<relevant; i++)
974 statsArr[
i] += ndb->theImpl->clientStats[
i];