/*
 * Fragment of a query helper; the enclosing signature is not visible here
 * (presumably run_query(), which other code in this file calls with a
 * process and an SQL string -- confirm).
 * It works on the MYSQL handle embedded in the process object.
 */
29 MYSQL* mysql = &proc->m_mysql;
/* Trace the statement, tagged as '<cluster name>@<host name>'. */
30 g_logger.
debug(
"'%s@%s' - Running query '%s'",
31 proc->m_cluster->m_name.
c_str(),
32 proc->m_host->m_hostname.
c_str(),
/* mysql_query() returns non-zero on failure; that branch logs an error
 * carrying the same cluster/host tag. */
35 if (mysql_query(mysql, query))
37 g_logger.
error(
"'%s@%s' - Failed to run query '%s' %d:%s",
38 proc->m_cluster->m_name.
c_str(),
39 proc->m_host->m_hostname.
c_str(),
/*
 * SQL statements that create the "atrt" database and its tables.
 * Adjacent string literals are concatenated by the compiler, so each table
 * definition is a single array element. The loop that consumes this array
 * (`for (int i = 0; create_sql[i]; i++)`) stops at the first null entry, so
 * the array is expected to end with one (terminator not visible here).
 * Every table definition visible here uses the MyISAM engine.
 */
48 static const char* create_sql[] = {
49 "create database atrt",
54 " id int primary key,"
/* One row per cluster. */
60 "create table cluster ("
61 " id int primary key,"
64 " ) engine = myisam;",
/* One row per process, with its host, cluster, node id, type and state. */
66 "create table process ("
67 " id int primary key,"
68 " host_id int not null,"
69 " cluster_id int not null,"
70 " node_id int not null,"
71 " type enum ('ndbd', 'ndbapi', 'ndb_mgmd', 'mysqld', 'mysql') not null,"
72 " state enum ('starting', 'started', 'stopping', 'stopped') not null"
73 " ) engine = myisam;",
/* Name/value option pairs, each tied to a process id. */
75 "create table options ("
76 " id int primary key,"
77 " process_id int not null,"
78 " name varchar(255) not null,"
79 " value varchar(255) not null"
80 " ) engine = myisam;",
/* Master/slave id pairs (the table name line is not visible here). */
83 " id int auto_increment primary key,"
84 " master_id int not null,"
85 " slave_id int not null"
86 " ) engine = myisam;",
/* Commands addressed to a process; state defaults to 'new'. */
88 "create table command ("
89 " id int auto_increment primary key,"
90 " state enum ('new', 'running', 'done') not null default 'new',"
92 " process_id int not null,"
93 " process_args varchar(255) default NULL"
94 " ) engine = myisam;",
/*
 * Fragment of the database setup sequence (enclosing signature not visible;
 * presumably setup_db -- confirm).
 * Visible steps, in order: look for the ".atrt" cluster, look for a client
 * process in it, connect to mysqld processes, run the create_sql statements,
 * populate the tables, and set up replication.
 */
/* Scan the clusters for one named exactly ".atrt" and keep it in `cluster`. */
107 for (
size_t i = 0;
i<config.m_clusters.size();
i++)
109 if (strcmp(config.m_clusters[
i]->m_name.c_str(),
".atrt") == 0)
111 cluster = config.m_clusters[
i];
/* Within that cluster, keep a process of type AP_CLIENT in `atrt_client`. */
113 for (
size_t i = 0;
i<cluster->m_processes.size();
i++)
115 if (cluster->m_processes[
i]->m_type == atrt_process::AP_CLIENT)
117 atrt_client = cluster->m_processes[
i];
/* Connect to the processes of type AP_MYSQLD.
 * `proc` is presumably config.m_processes[i]; its assignment is not visible. */
130 for (
size_t i = 0;
i<config.m_processes.size();
i++)
133 if (proc->m_type == atrt_process::AP_MYSQLD)
135 if (!connect_mysqld(config.m_processes[
i]))
/* Run the schema statements in array order; a null entry ends the loop. */
146 for (
int i = 0; create_sql[
i];
i++)
148 const char* query = create_sql[
i];
149 if (!run_query(atrt_mysqld, query))
/* Fill the freshly created tables from the configuration. */
153 if (!populate_db(config, atrt_mysqld))
/* Finally configure replication for the whole configuration. */
160 if (setup_repl(config) !=
true)
/*
 * Fragment of an option lookup for a process (signature not visible).
 * `res` starts as null; the loaded options are queried first and the
 * generated options are queried as well. Whether the second lookup runs only
 * when the first one fails is not visible here -- confirm.
 * Callers in this file compare the result against 0, so a null result is an
 * expected outcome.
 */
171 const char * res = 0;
172 if (proc->m_options.m_loaded.get(key, &res))
175 proc->m_options.m_generated.get(key, &res);
/*
 * Fragment of the mysqld connect routine (enclosing signature not visible;
 * presumably connect_mysqld -- confirm).
 * Initialises the MYSQL handle stored in the process object, reads the
 * "--port=" and "--socket=" options of the process, and tries to connect up
 * to 20 times, sleeping 3 seconds between attempts.
 */
182 if ( !mysql_init(&proc->m_mysql))
184 g_logger.
error(
"Failed to init mysql");
/* Either option may be missing (null); only the case where both are missing
 * is rejected here. */
188 const char * port = find(proc,
"--port=");
189 const char * socket = find(proc,
"--socket=");
190 if (port == 0 && socket == 0)
192 g_logger.
error(
"Neither socket nor port specified...cant connect to mysql");
/* Bounded retry loop: at most 20 connection attempts. */
196 for (
size_t i = 0;
i<20;
i++)
200 mysql_protocol_type val = MYSQL_PROTOCOL_TCP;
201 mysql_options(&proc->m_mysql, MYSQL_OPT_PROTOCOL, &val);
203 if (mysql_real_connect(&proc->m_mysql,
204 proc->m_host->m_hostname.
c_str(),
206 port ? atoi(port) : 0,
/* `port` is null when only --socket= was given, so atoi() is guarded here
 * in the same way as in the connect call above and the final error message
 * below; passing a null pointer to atoi() is undefined behaviour. */
212 g_logger.
info(
"Retrying connect to %s:%u 3s",
213 proc->m_host->m_hostname.
c_str(), port ? atoi(port) : 0);
214 NdbSleep_SecSleep(3);
/* All attempts failed: report the client error plus host, port and socket. */
217 g_logger.
error(
"Failed to connect to mysqld err: >%s< >%s:%u:%s<",
218 mysql_error(&proc->m_mysql),
219 proc->m_host->m_hostname.
c_str(), port ? atoi(port) : 0,
220 socket ? socket :
"<null>");
/*
 * Fragment of an integer bind helper (signature not visible; presumably
 * BINDI, which other code in this file calls with a MYSQL_BIND and the
 * address of an int -- confirm).
 * Marks the parameter as MYSQL_TYPE_LONG and points the bind buffer at the
 * caller's variable `i`; the value is not copied here.
 */
227 bind.buffer_type= MYSQL_TYPE_LONG;
228 bind.buffer= (
char*)i;
/*
 * Bind a NUL-terminated string as a statement parameter.
 *   bind - bind slot to fill in
 *   s    - string to bind; it is passed to strlen(), so it must not be null
 *   len  - receives strlen(s)
 * The bind buffer points at `s` itself (no copy is made here), and the same
 * length is stored both in *len and in bind.buffer_length.
 */
234 BINDS(
MYSQL_BIND& bind,
const char * s,
unsigned long * len)
236 bind.buffer_type= MYSQL_TYPE_STRING;
237 bind.buffer= (
char*)s;
238 bind.buffer_length= * len = strlen(s);
/*
 * Fragment of a generic helper that iterates over every element of `arr`
 * (signature and loop body not visible).
 * Presumably the find(obj, container) overload whose int result other code in
 * this file uses as host_id / cluster_id -- confirm.
 */
243 template <
typename T>
247 for (
size_t i = 0;
i<arr.size();
i++)
/*
 * Fragment of an option insert loop (signature not visible; presumably
 * populate_options -- confirm).
 * Walks the names produced by the iterator `it`, fetches each value from `p`,
 * and executes the prepared statement `stmt` once per name with four
 * parameters: option id, process id, name and value.
 */
261 const char *
name = it.first();
/* The iterator yields a null name when there are no more entries. */
262 for (;
name; name = it.next())
265 int proc_id = process_id;
/* l0/l1 receive the string lengths written by BINDS. */
266 unsigned long l0, l1;
/* NOTE(review): the result of get() is not checked on this line, and `value`
 * is then handed to BINDS, which calls strlen() on it -- confirm that the
 * lookup cannot fail for a name that came from the iterator. */
268 p->get(name, &value);
/* Clear the bind array before filling in the four parameters. */
270 bzero(bind2,
sizeof(bind2));
271 BINDI(bind2[0], &optid);
272 BINDI(bind2[1], &proc_id);
273 BINDS(bind2[2], name, &l0);
274 BINDS(bind2[3], value, &l1);
/* Both calls return non-zero on failure; each failure is logged. */
276 if (mysql_stmt_bind_param(stmt, bind2))
278 g_logger.
error(
"Failed to bind: %s", mysql_error(mysql));
282 if (mysql_stmt_execute(stmt))
284 g_logger.
error(
"0 Failed to execute: %s", mysql_error(mysql));
/*
 * Fragment of the table population routine (signature not visible;
 * presumably populate_db -- confirm).
 * Three sections follow, each using prepared statements on the connection
 * held by `mysqld`: hosts, clusters, then processes with their options.
 * The leading digit in the "N Failed to execute" messages differs per
 * statement.
 */
/* --- Section 1: one row per configured host (id, name, port). --- */
298 const char * sql =
"INSERT INTO host (id, name, port) values (?, ?, ?)";
/* NOTE(review): the result of mysql_stmt_init() is passed to
 * mysql_stmt_prepare() on the very next source line without a null check. */
299 MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
300 if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
302 g_logger.
error(
"Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
306 for (
size_t i = 0;
i<config.m_hosts.size();
i++)
/* Clear the bind array before filling it in for this row. */
310 bzero(bind,
sizeof(bind));
/* The port stored for a host comes from the host's m_cpcd object. */
312 int port = config.m_hosts[
i]->m_cpcd->getPort();
314 BINDS(bind[1], config.m_hosts[
i]->m_hostname.c_str(), &l0);
315 BINDI(bind[2], &port);
316 if (mysql_stmt_bind_param(stmt, bind))
318 g_logger.
error(
"Failed to bind: %s", mysql_error(&mysqld->m_mysql));
322 if (mysql_stmt_execute(stmt))
324 g_logger.
error(
"1 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
328 mysql_stmt_close(stmt);
/* --- Section 2: one row per configured cluster (id, name). --- */
332 const char * sql =
"INSERT INTO cluster (id, name) values (?, ?)";
333 MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
334 if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
336 g_logger.
error(
"Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
340 for (
size_t i = 0;
i<config.m_clusters.size();
i++)
344 bzero(bind,
sizeof(bind));
347 BINDS(bind[1], config.m_clusters[
i]->m_name.c_str(), &l0);
349 if (mysql_stmt_bind_param(stmt, bind))
351 g_logger.
error(
"Failed to bind: %s", mysql_error(&mysqld->m_mysql));
355 if (mysql_stmt_execute(stmt))
357 g_logger.
error(
"2 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
361 mysql_stmt_close(stmt);
/* --- Section 3: one row per process, plus its option rows. --- */
/* Parameter order: id, host_id, cluster_id, type, state, node_id. */
366 "INSERT INTO process (id, host_id, cluster_id, type, state, node_id) values (?,?,?,?,?,?)";
368 const char * sqlopt =
369 "INSERT INTO options (id, process_id, name, value) values (?,?,?,?)";
/* Two statements are prepared: `stmt` for processes, `stmtopt` for options. */
371 MYSQL_STMT * stmt = mysql_stmt_init(&mysqld->m_mysql);
372 if (mysql_stmt_prepare(stmt, sql, strlen(sql)))
374 g_logger.
error(
"Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
378 MYSQL_STMT * stmtopt = mysql_stmt_init(&mysqld->m_mysql);
379 if (mysql_stmt_prepare(stmtopt, sqlopt, strlen(sqlopt)))
381 g_logger.
error(
"Failed to prepare: %s", mysql_error(&mysqld->m_mysql));
386 for (
size_t i = 0;
i<config.m_processes.size();
i++)
388 unsigned long l0, l1;
390 bzero(bind,
sizeof(bind));
/* host_id and cluster_id are whatever find() returns for the process's host
 * and cluster within the configuration's host and cluster lists. */
393 int host_id = find(proc->m_host, config.m_hosts);
394 int cluster_id = find(proc->m_cluster, config.m_clusters);
395 int node_id= proc->m_nodeid;
/* Map the process type to the string stored in the table. The state is
 * "started" unless the case below overrides it: AP_NDB_API and AP_CLIENT
 * processes are recorded as "stopped". */
397 const char *
type = 0;
398 const char * state =
"started";
399 switch(proc->m_type){
400 case atrt_process::AP_NDBD: type =
"ndbd";
break;
401 case atrt_process::AP_NDB_API: type =
"ndbapi"; state =
"stopped";
break;
402 case atrt_process::AP_NDB_MGMD: type =
"ndb_mgmd";
break;
403 case atrt_process::AP_MYSQLD: type =
"mysqld";
break;
404 case atrt_process::AP_CLIENT: type =
"mysql"; state =
"stopped";
break;
/* NOTE(review): `type` is still null if no case above matched, and BINDS
 * calls strlen() on it; any default branch is not visible here -- confirm. */
410 BINDI(bind[1], &host_id);
411 BINDI(bind[2], &cluster_id);
412 BINDS(bind[3], type, &l0);
413 BINDS(bind[4], state, &l1);
414 BINDI(bind[5], &node_id);
416 if (mysql_stmt_bind_param(stmt, bind))
418 g_logger.
error(
"Failed to bind: %s", mysql_error(&mysqld->m_mysql));
422 if (mysql_stmt_execute(stmt))
424 g_logger.
error(
"3 Failed to execute: %s", mysql_error(&mysqld->m_mysql));
/* Store the options under this process id: first the process's own loaded
 * options, then the loaded options of the cluster it belongs to. */
428 if (populate_options(&mysqld->m_mysql, stmtopt, &option_id,
id,
429 &proc->m_options.m_loaded) ==
false)
432 if (populate_options(&mysqld->m_mysql, stmtopt, &option_id,
id,
433 &proc->m_cluster->m_options.m_loaded) ==
false)
/* Release both prepared statements. */
437 mysql_stmt_close(stmt);
438 mysql_stmt_close(stmtopt);
/*
 * Fragment of the per-pair replication setup (signature not visible; the
 * visible caller invokes it as setup_repl(dst->m_rep_src, dst)).
 * Every statement is executed on `src`; the host name and the "--port="
 * option of `dst` are used as the master address. Sequence: STOP SLAVE,
 * RESET SLAVE, CHANGE MASTER TO ..., START SLAVE.
 */
448 if (!run_query(src,
"STOP SLAVE"))
450 g_logger.
error(
"Failed to stop slave: %s",
451 mysql_error(&src->m_mysql));
455 if (!run_query(src,
"RESET SLAVE"))
457 g_logger.
error(
"Failed to reset slave: %s",
458 mysql_error(&src->m_mysql));
/* Build the CHANGE MASTER statement from dst's host name and port.
 * NOTE(review): find() can yield null (other code in this file compares its
 * result against 0), and that result is passed straight to atoi() here. */
463 tmp.
assfmt(
"CHANGE MASTER TO "
464 " MASTER_HOST='%s', "
466 dst->m_host->m_hostname.
c_str(),
467 atoi(find(dst,
"--port=")));
469 if (!run_query(src, tmp.
c_str()))
471 g_logger.
error(
"Failed to setup repl from %s to %s: %s",
472 src->m_host->m_hostname.
c_str(),
473 dst->m_host->m_hostname.
c_str(),
474 mysql_error(&src->m_mysql));
478 if (!run_query(src,
"START SLAVE"))
480 g_logger.
error(
"Failed to start slave: %s",
481 mysql_error(&src->m_mysql));
/* Report the configured pair as host(cluster) for both ends. */
485 g_logger.
info(
"Replication from %s(%s) to %s(%s) setup",
486 src->m_host->m_hostname.
c_str(),
487 src->m_cluster->m_name.
c_str(),
488 dst->m_host->m_hostname.
c_str(),
489 dst->m_cluster->m_name.
c_str());
/*
 * Fragment of the configuration-wide replication setup (signature not
 * visible; presumably the setup_repl(config) overload -- confirm).
 * Iterates over all configured processes and calls the per-pair setup with
 * (dst->m_rep_src, dst). How `dst` is chosen and which processes are skipped
 * is not visible here.
 */
497 for (
size_t i = 0;
i<config.m_processes.size();
i++)
502 if (setup_repl(dst->m_rep_src, dst) !=
true)