28 #include "my_global.h"
30 #include "sql_parse.h"
31 #include "sql_table.h"
34 #include "rpl_filter.h"
35 #include "rpl_info_factory.h"
36 #include "transaction.h"
37 #include <thr_alarm.h>
39 #include <sql_common.h>
41 #include <mysqld_error.h>
42 #include <mysys_err.h>
43 #include "rpl_handler.h"
44 #include "rpl_info_dummy.h"
54 #include "dynamic_ids.h"
55 #include "rpl_rli_pdb.h"
56 #include "global_threads.h"
58 #ifdef HAVE_REPLICATION
60 #include "rpl_tblmap.h"
66 #define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
68 #define MAX_SLAVE_RETRY_PAUSE 5
72 #define SLAVE_WAIT_GROUP_DONE 60
73 bool use_slave_mask = 0;
75 char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
77 static unsigned long stop_wait_timeout;
78 char* slave_load_tmpdir = 0;
79 Master_info *active_mi= 0;
80 my_bool replicate_same_server_id;
81 ulonglong relay_log_space_limit = 0;
83 const char *relay_log_index= 0;
84 const char *relay_log_basename= 0;
92 const ulong mts_slave_worker_queue_len_max= 16384;
97 const long mts_online_stat_period= 60 * 2;
105 const ulong mts_coordinator_basic_nap= 5;
121 const ulong mts_worker_underrun_level= 10;
123 Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *
ret);
124 bool append_item_to_jobs(slave_job_item *job_item,
134 int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
138 enum enum_slave_reconnect_actions
140 SLAVE_RECON_ACT_REG= 0,
141 SLAVE_RECON_ACT_DUMP= 1,
142 SLAVE_RECON_ACT_EVENT= 2,
146 enum enum_slave_reconnect_messages
148 SLAVE_RECON_MSG_WAIT= 0,
149 SLAVE_RECON_MSG_KILLED_WAITING= 1,
150 SLAVE_RECON_MSG_AFTER= 2,
151 SLAVE_RECON_MSG_FAILED= 3,
152 SLAVE_RECON_MSG_COMMAND= 4,
153 SLAVE_RECON_MSG_KILLED_AFTER= 5,
157 static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
160 "Waiting to reconnect after a failed registration on master",
161 "Slave I/O thread killed while waitnig to reconnect after a failed \
162 registration on master",
163 "Reconnecting after a failed registration on master",
164 "failed registering on master, reconnecting to try again, \
165 log '%s' at position %s",
166 "COM_REGISTER_SLAVE",
167 "Slave I/O thread killed during or after reconnect"
170 "Waiting to reconnect after a failed binlog dump request",
171 "Slave I/O thread killed while retrying master dump",
172 "Reconnecting after a failed binlog dump request",
173 "failed dump request, reconnecting to try again, log '%s' at position %s",
175 "Slave I/O thread killed during or after reconnect"
178 "Waiting to reconnect after a failed master event read",
179 "Slave I/O thread killed while waiting to reconnect after a failed read",
180 "Reconnecting after a failed master event read",
181 "Slave I/O thread: Failed reading log event, reconnecting to retry, \
182 log '%s' at position %s",
184 "Slave I/O thread killed during or after a reconnect done to recover from \
189 enum enum_slave_apply_event_and_update_pos_retval
191 SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK= 0,
192 SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPLY_ERROR= 1,
193 SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR= 2,
194 SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR= 3,
195 SLAVE_APPLY_EVENT_AND_UPDATE_POS_MAX
202 static inline bool io_slave_killed(THD* thd,Master_info* mi);
203 static inline bool sql_slave_killed(THD* thd,
Relay_log_info* rli);
204 static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
205 static void print_slave_skip_errors(
void);
206 static int safe_connect(THD* thd,
MYSQL* mysql, Master_info* mi);
207 static int safe_reconnect(THD* thd,
MYSQL* mysql, Master_info* mi,
208 bool suppress_warnings);
209 static int connect_to_master(THD* thd,
MYSQL* mysql, Master_info* mi,
210 bool reconnect,
bool suppress_warnings);
211 static int get_master_version_and_clock(
MYSQL* mysql, Master_info* mi);
212 static int get_master_uuid(
MYSQL *mysql, Master_info *mi);
213 int io_thread_init_commands(
MYSQL *mysql, Master_info *mi);
215 static int queue_event(Master_info* mi,
const char*
buf,ulong event_len);
216 static void set_stop_slave_wait_timeout(
unsigned long wait_timeout);
217 static int terminate_slave_thread(THD *thd,
220 volatile uint *slave_running,
221 bool need_lock_term);
222 static bool check_io_slave_killed(THD *thd, Master_info *mi,
const char *info);
233 static void set_slave_max_allowed_packet(THD *thd,
MYSQL *mysql)
235 DBUG_ENTER(
"set_slave_max_allowed_packet");
237 DBUG_ASSERT(thd && mysql);
239 thd->variables.max_allowed_packet= slave_max_allowed_packet;
240 thd->net.max_packet_size= slave_max_allowed_packet;
247 thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
252 mysql->options.max_allowed_packet=
253 slave_max_allowed_packet+MAX_LOG_EVENT_HEADER;
275 void init_thread_mask(
int* mask, Master_info* mi,
bool inverse)
277 bool set_io = mi->slave_running, set_sql = mi->rli->slave_running;
278 register int tmp_mask=0;
279 DBUG_ENTER(
"init_thread_mask");
282 tmp_mask |= SLAVE_IO;
284 tmp_mask |= SLAVE_SQL;
286 tmp_mask^= (SLAVE_IO | SLAVE_SQL);
296 void lock_slave_threads(Master_info* mi)
298 DBUG_ENTER(
"lock_slave_threads");
311 void unlock_slave_threads(Master_info* mi)
313 DBUG_ENTER(
"unlock_slave_threads");
321 #ifdef HAVE_PSI_INTERFACE
322 static PSI_thread_key key_thread_slave_io, key_thread_slave_sql, key_thread_slave_worker;
324 static PSI_thread_info all_slave_threads[]=
326 { &key_thread_slave_io,
"slave_io", PSI_FLAG_GLOBAL},
327 { &key_thread_slave_sql,
"slave_sql", PSI_FLAG_GLOBAL},
328 { &key_thread_slave_worker,
"slave_worker", PSI_FLAG_GLOBAL}
331 static void init_slave_psi_keys(
void)
333 const char* category=
"sql";
336 count= array_elements(all_slave_threads);
345 DBUG_ENTER(
"init_slave");
347 int thread_mask= SLAVE_SQL | SLAVE_IO;
350 #ifdef HAVE_PSI_INTERFACE
351 init_slave_psi_keys();
361 if (pthread_key_create(&RPL_MASTER_INFO, NULL))
364 if ((error= Rpl_info_factory::create_coordinators(opt_mi_repository_id, &active_mi,
365 opt_rli_repository_id, &rli)))
367 sql_print_error(
"Failed to create or recover replication info repository.");
376 if (global_init_info(active_mi,
true, thread_mask))
378 sql_print_error(
"Failed to initialize the master info structure");
383 DBUG_PRINT(
"info", (
"init group master %s %lu group relay %s %lu event %s %lu\n",
384 rli->get_group_master_log_name(),
385 (ulong) rli->get_group_master_log_pos(),
386 rli->get_group_relay_log_name(),
387 (ulong) rli->get_group_relay_log_pos(),
388 rli->get_event_relay_log_name(),
389 (ulong) rli->get_event_relay_log_pos()));
392 if (active_mi->host[0] && !opt_skip_slave_start)
395 active_mi->rli->opt_slave_parallel_workers= opt_mts_slave_parallel_workers;
396 active_mi->rli->checkpoint_group= opt_mts_checkpoint_group;
397 if (start_slave_threads(
true,
402 sql_print_error(
"Failed to create slave threads");
411 sql_print_information(
"Check error log for additional messages. "
412 "You will not be able to start replication until "
413 "the issue is resolved and the server restarted.");
444 int init_recovery(Master_info* mi,
const char** errmsg)
446 DBUG_ENTER(
"init_recovery");
450 char *group_master_log_name= NULL;
452 if (rli->recovery_parallel_workers)
465 error= mts_recovery_groups(rli);
466 if (rli->mts_recovery_group_cnt)
469 sql_print_error(
"--relay-log-recovery cannot be executed when the slave "
470 "was stopped with an error or killed in MTS mode; "
471 "consider using RESET SLAVE or restart the server "
472 "with --relay-log-recovery = 0 followed by "
473 "START SLAVE UNTIL SQL_AFTER_MTS_GAPS");
477 group_master_log_name=
const_cast<char *
>(rli->get_group_master_log_name());
478 if (!error && group_master_log_name[0])
480 mi->set_master_log_pos(max<ulonglong>(BIN_LOG_HEADER_SIZE,
481 rli->get_group_master_log_pos()));
482 mi->set_master_log_name(rli->get_group_master_log_name());
484 sql_print_warning(
"Recovery from master pos %ld and file %s.",
485 (ulong) mi->get_master_log_pos(), mi->get_master_log_name());
487 rli->set_group_relay_log_name(rli->relay_log.get_log_fname());
488 rli->set_event_relay_log_name(rli->relay_log.get_log_fname());
489 rli->set_group_relay_log_pos(BIN_LOG_HEADER_SIZE);
490 rli->set_event_relay_log_pos(BIN_LOG_HEADER_SIZE);
497 global_sid_lock->
wrlock();
498 (
const_cast<Gtid_set *
>(rli->get_gtid_set()))->clear();
499 global_sid_lock->
unlock();
503 int global_init_info(Master_info* mi,
bool ignore_if_no_info,
int thread_mask)
505 DBUG_ENTER(
"init_info");
506 DBUG_ASSERT(mi != NULL && mi->rli != NULL);
508 enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
509 THD *thd= current_thd;
523 if (thd && thd->in_multi_stmt_transaction_mode() &&
524 (opt_mi_repository_id == INFO_REPOSITORY_TABLE ||
525 opt_rli_repository_id == INFO_REPOSITORY_TABLE))
526 if (trans_begin(thd))
539 check_return= mi->check_info();
540 if (check_return == ERROR_CHECKING_REPOSITORY)
543 if (!(ignore_if_no_info && check_return == REPOSITORY_DOES_NOT_EXIST))
545 if ((thread_mask & SLAVE_IO) != 0 && mi->mi_init_info())
549 check_return= mi->rli->check_info();
550 if (check_return == ERROR_CHECKING_REPOSITORY)
552 if (!(ignore_if_no_info && check_return == REPOSITORY_DOES_NOT_EXIST))
554 if (((thread_mask & SLAVE_SQL) != 0 || !(mi->rli->inited))
555 && mi->rli->rli_init_info())
565 if (thd && thd->in_multi_stmt_transaction_mode() &&
566 (opt_mi_repository_id == INFO_REPOSITORY_TABLE ||
567 opt_rli_repository_id == INFO_REPOSITORY_TABLE))
568 if (trans_commit(thd))
573 DBUG_RETURN(check_return == ERROR_CHECKING_REPOSITORY || init_error);
576 void end_info(Master_info* mi)
578 DBUG_ENTER(
"end_info");
579 DBUG_ASSERT(mi != NULL && mi->rli != NULL);
591 int remove_info(Master_info* mi)
594 DBUG_ENTER(
"remove_info");
595 DBUG_ASSERT(mi != NULL && mi->rli != NULL);
606 mi->rli->clear_error();
607 mi->rli->clear_until_condition();
608 mi->rli->clear_sql_delay();
613 if (mi->remove_info() || Rpl_info_factory::reset_workers(mi->rli) ||
614 mi->rli->remove_info())
623 int flush_master_info(Master_info* mi,
bool force)
625 DBUG_ENTER(
"flush_master_info");
626 DBUG_ASSERT(mi != NULL && mi->rli != NULL);
651 int err= (mi->rli->flush_current_log() ||
652 mi->flush_info(force));
663 static void print_slave_skip_errors(
void)
671 const size_t MIN_ROOM= 10;
672 DBUG_ENTER(
"print_slave_skip_errors");
673 DBUG_ASSERT(
sizeof(slave_skip_error_names) > MIN_ROOM);
674 DBUG_ASSERT(MAX_SLAVE_ERROR <= 999999);
676 if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
679 memcpy(slave_skip_error_names, STRING_WITH_LEN(
"OFF"));
682 else if (bitmap_is_set_all(&slave_error_mask))
685 memcpy(slave_skip_error_names, STRING_WITH_LEN(
"ALL"));
690 char *buff= slave_skip_error_names;
691 char *bend= buff +
sizeof(slave_skip_error_names);
694 for (errnum= 0; errnum < MAX_SLAVE_ERROR; errnum++)
696 if (bitmap_is_set(&slave_error_mask, errnum))
698 if (buff + MIN_ROOM >= bend)
700 buff= int10_to_str(errnum, buff, 10);
704 if (buff != slave_skip_error_names)
706 if (errnum < MAX_SLAVE_ERROR)
709 buff= strmov(buff,
"...");
713 DBUG_PRINT(
"init", (
"error_names: '%s'", slave_skip_error_names));
717 static void set_stop_slave_wait_timeout(
unsigned long wait_timeout) {
718 stop_wait_timeout = wait_timeout;
726 void set_slave_skip_errors(
char** slave_skip_errors_ptr)
728 DBUG_ENTER(
"set_slave_skip_errors");
729 print_slave_skip_errors();
730 *slave_skip_errors_ptr= slave_skip_error_names;
737 static void init_slave_skip_errors()
739 DBUG_ENTER(
"init_slave_skip_errors");
740 DBUG_ASSERT(!use_slave_mask);
742 if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
744 fprintf(stderr,
"Badly out of memory, please check your system status\n");
751 static void add_slave_skip_errors(
const uint*
errors, uint n_errors)
753 DBUG_ENTER(
"add_slave_skip_errors");
755 DBUG_ASSERT(use_slave_mask);
757 for (uint
i = 0;
i < n_errors;
i++)
759 const uint err_code = errors[
i];
760 if (err_code < MAX_SLAVE_ERROR)
761 bitmap_set_bit(&slave_error_mask, err_code);
777 void add_slave_skip_errors(
const char* arg)
783 const uchar SKIP_ALL[]=
"all";
784 size_t SIZE_SKIP_ALL= strlen((
const char *) SKIP_ALL) + 1;
789 const uchar SKIP_DDL_ERRORS[]=
"ddl_exist_errors";
790 size_t SIZE_SKIP_DDL_ERRORS= strlen((
const char *) SKIP_DDL_ERRORS);
791 DBUG_ENTER(
"add_slave_skip_errors");
795 init_slave_skip_errors();
797 for (; my_isspace(system_charset_info,*arg); ++arg)
799 if (!my_strnncoll(system_charset_info, (uchar*)arg, SIZE_SKIP_ALL,
800 SKIP_ALL, SIZE_SKIP_ALL))
802 bitmap_set_all(&slave_error_mask);
805 if (!my_strnncoll(system_charset_info, (uchar*)arg, SIZE_SKIP_DDL_ERRORS,
806 SKIP_DDL_ERRORS, SIZE_SKIP_DDL_ERRORS))
809 const uint ddl_errors[] = {
811 ER_DB_CREATE_EXISTS, ER_TABLE_EXISTS_ERROR, ER_DUP_KEYNAME,
814 ER_BAD_FIELD_ERROR, ER_NO_SUCH_TABLE, ER_DUP_FIELDNAME,
816 ER_DB_DROP_EXISTS, ER_BAD_TABLE_ERROR, ER_CANT_DROP_FIELD_OR_KEY
819 add_slave_skip_errors(ddl_errors,
820 sizeof(ddl_errors)/
sizeof(ddl_errors[0]));
825 if (strlen(arg) > SIZE_SKIP_DDL_ERRORS + 1)
826 arg+= SIZE_SKIP_DDL_ERRORS + 1;
831 if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
833 if (err_code < MAX_SLAVE_ERROR)
834 bitmap_set_bit(&slave_error_mask,(uint)err_code);
835 while (!my_isdigit(system_charset_info,*p) && *p)
845 for (table= rli->save_temporary_tables ; table ; table= table->next)
847 table->in_use= rli->info_thd;
848 if (table->file != NULL)
855 table->file->unbind_psi();
856 table->file->rebind_psi();
861 int terminate_slave_threads(Master_info* mi,
int thread_mask,
bool need_lock_term)
863 DBUG_ENTER(
"terminate_slave_threads");
867 int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
868 mysql_mutex_t *sql_lock = &mi->rli->run_lock, *io_lock = &mi->run_lock;
870 set_stop_slave_wait_timeout(rpl_stop_slave_timeout);
872 if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
874 DBUG_PRINT(
"info",(
"Terminating SQL thread"));
875 mi->rli->abort_slave= 1;
876 if ((error=terminate_slave_thread(mi->rli->info_thd, sql_lock,
878 &mi->rli->slave_running,
884 DBUG_RETURN(ER_STOP_SLAVE_SQL_THREAD_TIMEOUT);
890 DBUG_PRINT(
"info",(
"Flushing relay-log info file."));
892 THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file);
897 if (mi->rli->flush_info(TRUE))
900 DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
905 if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
907 DBUG_PRINT(
"info",(
"Terminating IO thread"));
909 if ((error=terminate_slave_thread(mi->info_thd,io_lock,
917 DBUG_RETURN(ER_STOP_SLAVE_IO_THREAD_TIMEOUT);
923 DBUG_PRINT(
"info",(
"Flushing relay log and master info repository."));
925 THD_STAGE_INFO(current_thd, stage_flushing_relay_log_and_master_info_repository);
930 if (mi->flush_info(TRUE))
933 DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
939 if (mi->rli->relay_log.is_open() &&
940 mi->rli->relay_log.flush_and_sync(
true))
943 DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
987 terminate_slave_thread(THD *thd,
990 volatile uint *slave_running,
993 DBUG_ENTER(
"terminate_slave_thread");
1002 if (!*slave_running)
1015 DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
1018 DBUG_ASSERT(thd != 0);
1019 THD_CHECK_SENTRY(thd);
1026 while (*slave_running)
1029 DBUG_PRINT(
"loop", (
"killing slave thread"));
1032 #ifndef DONT_USE_THR_ALARM
1038 int err __attribute__((unused))= pthread_kill(thd->real_id, thr_client_alarm);
1039 DBUG_ASSERT(err != EINVAL);
1041 thd->awake(THD::NOT_KILLED);
1049 set_timespec(abstime,2);
1051 if (stop_wait_timeout >= 2)
1052 stop_wait_timeout= stop_wait_timeout - 2;
1053 else if (*slave_running)
1059 DBUG_ASSERT(error == ETIMEDOUT || error == 0);
1062 DBUG_ASSERT(*slave_running == 0);
1070 int start_slave_thread(
1071 #ifdef HAVE_PSI_INTERFACE
1072 PSI_thread_key thread_key,
1077 volatile uint *slave_running,
1078 volatile ulong *slave_run_id,
1084 DBUG_ENTER(
"start_slave_thread");
1094 sql_print_error(
"Server id not set, will not start slave");
1095 DBUG_RETURN(ER_BAD_SLAVE);
1104 DBUG_RETURN(ER_SLAVE_MUST_STOP);
1106 start_id= *slave_run_id;
1107 DBUG_PRINT(
"info",(
"Creating new slave thread"));
1109 &th, &connection_attrib, h_func, (
void*)mi)))
1111 sql_print_error(
"Can't create slave thread (errno= %d).", error);
1114 DBUG_RETURN(ER_SLAVE_THREAD);
1116 if (start_cond && cond_lock)
1118 THD* thd = current_thd;
1119 while (start_id == *slave_run_id && thd != NULL)
1121 DBUG_PRINT(
"sleep",(
"Waiting for slave thread to start"));
1123 thd->ENTER_COND(start_cond, cond_lock,
1124 & stage_waiting_for_slave_thread_to_start,
1135 thd->EXIT_COND(& saved_stage);
1141 DBUG_RETURN(thd->killed_errno());
1160 int start_slave_threads(
bool need_lock_slave,
bool wait_for_start,
1161 Master_info* mi,
int thread_mask)
1163 mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0;
1166 DBUG_ENTER(
"start_slave_threads");
1167 DBUG_EXECUTE_IF(
"uninitialized_master-info_structure",
1168 mi->inited= FALSE;);
1170 if (!mi->inited || !mi->rli->inited)
1172 error= !mi->inited ? ER_SLAVE_MI_INIT_REPOSITORY :
1173 ER_SLAVE_RLI_INIT_REPOSITORY;
1175 const char* prefix= current_thd ? ER(error) : ER_DEFAULT(error);
1176 info->
report(ERROR_LEVEL, error, prefix, NULL);
1181 if (need_lock_slave)
1183 lock_io = &mi->run_lock;
1184 lock_sql = &mi->rli->run_lock;
1188 cond_io = &mi->start_cond;
1189 cond_sql = &mi->rli->start_cond;
1190 lock_cond_io = &mi->run_lock;
1191 lock_cond_sql = &mi->rli->run_lock;
1194 if (thread_mask & SLAVE_IO)
1195 error= start_slave_thread(
1196 #ifdef HAVE_PSI_INTERFACE
1197 key_thread_slave_io,
1199 handle_slave_io, lock_io, lock_cond_io,
1201 &mi->slave_running, &mi->slave_run_id,
1203 if (!error && (thread_mask & SLAVE_SQL))
1209 if (mi->rli->recovery_parallel_workers != 0)
1210 error= mts_recovery_groups(mi->rli);
1212 error= start_slave_thread(
1213 #ifdef HAVE_PSI_INTERFACE
1214 key_thread_slave_sql,
1216 handle_slave_sql, lock_sql, lock_cond_sql,
1218 &mi->rli->slave_running, &mi->rli->slave_run_id,
1221 terminate_slave_threads(mi, thread_mask & SLAVE_IO, need_lock_slave);
1235 DBUG_ENTER(
"end_slave");
1252 terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
1267 void close_active_mi()
1272 end_info(active_mi);
1274 delete active_mi->rli;
1281 static bool io_slave_killed(THD* thd, Master_info* mi)
1283 DBUG_ENTER(
"io_slave_killed");
1285 DBUG_ASSERT(mi->info_thd == thd);
1286 DBUG_ASSERT(mi->slave_running);
1287 DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
1312 bool is_parallel_warn= FALSE;
1314 DBUG_ENTER(
"sql_slave_killed");
1316 DBUG_ASSERT(rli->info_thd == thd);
1317 DBUG_ASSERT(rli->slave_running == 1);
1318 if (abort_loop || thd->killed || rli->abort_slave)
1329 if (is_parallel_warn ||
1331 thd->transaction.all.cannot_safely_rollback() && rli->
is_in_group()))
1334 "... Slave SQL Thread stopped with incomplete event group "
1335 "having non-transactional changes. "
1336 "If the group consists solely of row-based events, you can try "
1337 "to restart the slave with --slave-exec-mode=IDEMPOTENT, which "
1338 "ignores duplicate key, key not found, and similar errors (see "
1339 "documentation for details).";
1340 char msg_stopped_mts[]=
1341 "... The slave coordinator and worker threads are stopped, possibly "
1342 "leaving data in inconsistent state. A restart should "
1343 "restore consistency automatically, although using non-transactional "
1344 "storage for data or info tables or DDL queries could lead to problems. "
1345 "In such cases you have to examine your data (see documentation for "
1349 if (rli->abort_slave)
1351 DBUG_PRINT(
"info", (
"Request to stop slave SQL Thread received while "
1352 "applying an MTS group or a group that "
1353 "has non-transactional "
1354 "changes; waiting for completion of the group ... "));
1366 if (rli->last_event_start_time == 0)
1367 rli->last_event_start_time= my_time(0);
1368 ret= difftime(my_time(0), rli->last_event_start_time) <=
1369 SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE;
1371 DBUG_EXECUTE_IF(
"stop_slave_middle_group",
1372 DBUG_EXECUTE_IF(
"incomplete_group_in_relay_log",
1375 if (!ret && !rli->reported_unsafe_warning)
1377 rli->
report(WARNING_LEVEL, 0,
1379 "Request to stop slave SQL Thread received while "
1380 "applying a group that has non-transactional "
1381 "changes; waiting for completion of the group ... "
1383 "Coordinator thread of multi-threaded slave is being "
1384 "stopped in the middle of assigning a group of events; "
1385 "deferring to exit until the group completion ... ");
1386 rli->reported_unsafe_warning=
true;
1391 if (is_parallel_warn)
1392 rli->
report(!rli->is_error() ? ERROR_LEVEL :
1394 ER_MTS_INCONSISTENT_DATA,
1395 ER(ER_MTS_INCONSISTENT_DATA),
1398 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1399 ER(ER_SLAVE_FATAL_ERROR), msg_stopped);
1409 rli->last_event_start_time= 0;
1410 if (rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)
1412 rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
1427 void skip_load_data_infile(
NET *net)
1429 DBUG_ENTER(
"skip_load_data_infile");
1431 (void)net_request_file(net,
"/dev/null");
1438 bool net_request_file(
NET* net,
const char* fname)
1440 DBUG_ENTER(
"net_request_file");
1452 const char *print_slave_db_safe(
const char* db)
1454 DBUG_ENTER(
"*print_slave_db_safe");
1456 DBUG_RETURN((db ? db :
""));
1467 bool is_network_error(uint errorno)
1469 if (errorno == CR_CONNECTION_ERROR ||
1470 errorno == CR_CONN_HOST_ERROR ||
1471 errorno == CR_SERVER_GONE_ERROR ||
1472 errorno == CR_SERVER_LOST ||
1473 errorno == ER_CON_COUNT_ERROR ||
1474 errorno == ER_SERVER_SHUTDOWN)
1503 enum enum_command_status
1504 { COMMAND_STATUS_OK, COMMAND_STATUS_ERROR, COMMAND_STATUS_ALLOWED_ERROR };
1505 static enum_command_status
1506 io_thread_init_command(Master_info *mi,
const char *
query,
int allowed_error,
1508 MYSQL_ROW *master_row= NULL)
1510 DBUG_ENTER(
"io_thread_init_command");
1511 DBUG_PRINT(
"info", (
"IO thread initialization command: '%s'", query));
1512 MYSQL *mysql= mi->mysql;
1513 int ret= mysql_real_query(mysql, query, strlen(query));
1514 if (io_slave_killed(mi->info_thd, mi))
1516 sql_print_information(
"The slave IO thread was killed while executing "
1517 "initialization query '%s'", query);
1518 mysql_free_result(mysql_store_result(mysql));
1519 DBUG_RETURN(COMMAND_STATUS_ERROR);
1523 int err= mysql_errno(mysql);
1524 mysql_free_result(mysql_store_result(mysql));
1525 if (!err || err != allowed_error)
1527 mi->report(is_network_error(err) ? WARNING_LEVEL : ERROR_LEVEL, err,
1528 "The slave IO thread stops because the initialization query "
1529 "'%s' failed with error '%s'.",
1530 query, mysql_error(mysql));
1531 DBUG_RETURN(COMMAND_STATUS_ERROR);
1533 DBUG_RETURN(COMMAND_STATUS_ALLOWED_ERROR);
1535 if (master_res != NULL)
1537 if ((*master_res= mysql_store_result(mysql)) == NULL)
1539 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1540 "The slave IO thread stops because the initialization query "
1541 "'%s' did not return any result.",
1543 DBUG_RETURN(COMMAND_STATUS_ERROR);
1545 if (master_row != NULL)
1547 if ((*master_row= mysql_fetch_row(*master_res)) == NULL)
1549 mysql_free_result(*master_res);
1550 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1551 "The slave IO thread stops because the initialization query "
1552 "'%s' did not return any row.",
1554 DBUG_RETURN(COMMAND_STATUS_ERROR);
1559 DBUG_ASSERT(master_row == NULL);
1560 DBUG_RETURN(COMMAND_STATUS_OK);
1572 int io_thread_init_commands(
MYSQL *mysql, Master_info *mi)
1577 sprintf(query,
"SET @slave_uuid= '%s'", server_uuid);
1578 if (mysql_real_query(mysql, query, strlen(query))
1579 && !check_io_slave_killed(mi->info_thd, mi, NULL))
1582 mysql_free_result(mysql_store_result(mysql));
1586 if (mysql_errno(mysql) && is_network_error(mysql_errno(mysql)))
1588 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1589 "The initialization command '%s' failed with the following"
1590 " error: '%s'.", query, mysql_error(mysql));
1596 const char *errmsg_fmt=
1597 "The slave I/O thread stops because a fatal error is encountered "
1598 "when it tries to send query to master(query: %s).";
1600 sprintf(errmsg, errmsg_fmt, query);
1601 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
1605 mysql_free_result(mysql_store_result(mysql));
1617 static int get_master_uuid(
MYSQL *mysql, Master_info *mi)
1621 MYSQL_ROW master_row= NULL;
1624 DBUG_EXECUTE_IF(
"dbug.before_get_MASTER_UUID",
1626 const char act[]=
"now wait_for signal.get_master_uuid";
1627 DBUG_ASSERT(opt_debug_sync_timeout > 0);
1628 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1629 STRING_WITH_LEN(act)));
1632 DBUG_EXECUTE_IF(
"dbug.simulate_busy_io",
1634 const char act[]=
"now signal Reached wait_for signal.got_stop_slave";
1635 DBUG_ASSERT(opt_debug_sync_timeout > 0);
1636 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1637 STRING_WITH_LEN(act)));
1639 if (!mysql_real_query(mysql,
1640 STRING_WITH_LEN(
"SHOW VARIABLES LIKE 'SERVER_UUID'")) &&
1641 (master_res= mysql_store_result(mysql)) &&
1642 (master_row= mysql_fetch_row(master_res)))
1644 if (!strcmp(::server_uuid, master_row[1]) &&
1645 !mi->rli->replicate_same_server_id)
1647 errmsg=
"The slave I/O thread stops because master and slave have equal "
1648 "MySQL server UUIDs; these UUIDs must be different for "
1649 "replication to work.";
1650 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
1657 if (mi->master_uuid[0] != 0 && strcmp(mi->master_uuid, master_row[1]))
1658 sql_print_warning(
"The master's UUID has changed, although this should"
1659 " not happen unless you have changed it manually."
1660 " The old UUID was %s.",
1662 strncpy(mi->master_uuid, master_row[1], UUID_LENGTH);
1663 mi->master_uuid[UUID_LENGTH]= 0;
1666 else if (mysql_errno(mysql))
1668 if (is_network_error(mysql_errno(mysql)))
1670 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1671 "Get master SERVER_UUID failed with error: %s",
1672 mysql_error(mysql));
1678 errmsg=
"The slave I/O thread stops because a fatal error is encountered "
1679 "when it tries to get the value of SERVER_UUID variable from master.";
1680 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
1685 else if (!master_row && master_res)
1687 mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE,
1688 "Unknown system variable 'SERVER_UUID' on master. "
1689 "A probable cause is that the variable is not supported on the "
1690 "master (version: %s), even though it is on the slave (version: %s)",
1691 mysql->server_version, server_version);
1695 mysql_free_result(master_res);
1714 static int get_master_version_and_clock(
MYSQL* mysql, Master_info* mi)
1716 char err_buff[MAX_SLAVE_ERRMSG];
1717 const char* errmsg= 0;
1719 int version_number=0;
1720 version_number= atoi(mysql->server_version);
1723 MYSQL_ROW master_row;
1724 DBUG_ENTER(
"get_master_version_and_clock");
1730 DBUG_EXECUTE_IF(
"unrecognized_master_version",
1735 mi->set_mi_description_event(NULL);
1737 if (!my_isdigit(&my_charset_bin,*mysql->server_version))
1739 errmsg =
"Master reported unrecognized MySQL version";
1740 err_code= ER_SLAVE_FATAL_ERROR;
1741 sprintf(err_buff, ER(err_code), errmsg);
1748 switch (version_number)
1753 errmsg =
"Master reported unrecognized MySQL version";
1754 err_code= ER_SLAVE_FATAL_ERROR;
1755 sprintf(err_buff, ER(err_code), errmsg);
1758 mi->set_mi_description_event(
new
1762 mi->set_mi_description_event(
new
1774 mi->set_mi_description_event(
new
1795 if (mi->get_mi_description_event() == NULL)
1798 errmsg=
"default Format_description_log_event";
1799 err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
1800 sprintf(err_buff, ER(err_code), errmsg);
1839 mi->get_mi_description_event()->checksum_alg=
1840 mi->rli->relay_log.relay_log_checksum_alg;
1842 DBUG_ASSERT(mi->get_mi_description_event()->checksum_alg !=
1843 BINLOG_CHECKSUM_ALG_UNDEF);
1844 DBUG_ASSERT(mi->rli->relay_log.relay_log_checksum_alg !=
1845 BINLOG_CHECKSUM_ALG_UNDEF);
1854 DBUG_EXECUTE_IF(
"dbug.before_get_UNIX_TIMESTAMP",
1858 "wait_for signal.get_unix_timestamp";
1859 DBUG_ASSERT(opt_debug_sync_timeout > 0);
1860 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1861 STRING_WITH_LEN(act)));
1865 if (!mysql_real_query(mysql, STRING_WITH_LEN(
"SELECT UNIX_TIMESTAMP()")) &&
1866 (master_res= mysql_store_result(mysql)) &&
1867 (master_row= mysql_fetch_row(master_res)))
1870 mi->clock_diff_with_master=
1871 (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
1874 else if (check_io_slave_killed(mi->info_thd, mi, NULL))
1875 goto slave_killed_err;
1876 else if (is_network_error(mysql_errno(mysql)))
1878 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1879 "Get master clock failed with error: %s", mysql_error(mysql));
1885 mi->clock_diff_with_master= 0;
1887 sql_print_warning(
"\"SELECT UNIX_TIMESTAMP()\" failed on master, "
1888 "do not trust column Seconds_Behind_Master of SHOW "
1889 "SLAVE STATUS. Error: %s (%d)",
1890 mysql_error(mysql), mysql_errno(mysql));
1894 mysql_free_result(master_res);
1908 DBUG_EXECUTE_IF(
"dbug.before_get_SERVER_ID",
1912 "wait_for signal.get_server_id";
1913 DBUG_ASSERT(opt_debug_sync_timeout > 0);
1914 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1915 STRING_WITH_LEN(act)));
1919 if (!mysql_real_query(mysql,
1920 STRING_WITH_LEN(
"SHOW VARIABLES LIKE 'SERVER_ID'")) &&
1921 (master_res= mysql_store_result(mysql)) &&
1922 (master_row= mysql_fetch_row(master_res)))
1924 if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) &&
1925 !mi->rli->replicate_same_server_id)
1927 errmsg=
"The slave I/O thread stops because master and slave have equal \
1928 MySQL server ids; these ids must be different for replication to work (or \
1929 the --replicate-same-server-id option must be used on slave but this does \
1930 not always make sense; please check the manual before using it).";
1931 err_code= ER_SLAVE_FATAL_ERROR;
1932 sprintf(err_buff, ER(err_code), errmsg);
1936 else if (mysql_errno(mysql))
1938 if (check_io_slave_killed(mi->info_thd, mi, NULL))
1939 goto slave_killed_err;
1940 else if (is_network_error(mysql_errno(mysql)))
1942 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1943 "Get master SERVER_ID failed with error: %s", mysql_error(mysql));
1947 errmsg=
"The slave I/O thread stops because a fatal error is encountered \
1948 when it try to get the value of SERVER_ID variable from master.";
1949 err_code= mysql_errno(mysql);
1950 sprintf(err_buff,
"%s Error: %s", errmsg, mysql_error(mysql));
1953 else if (!master_row && master_res)
1955 mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE,
1956 "Unknown system variable 'SERVER_ID' on master, \
1957 maybe it is a *VERY OLD MASTER*.");
1961 mysql_free_result(master_res);
1964 if (mi->master_id == 0 && mi->ignore_server_ids->dynamic_ids.elements > 0)
1966 errmsg=
"Slave configured with server id filtering could not detect the master server id.";
1967 err_code= ER_SLAVE_FATAL_ERROR;
1968 sprintf(err_buff, ER(err_code), errmsg);
1990 if (*mysql->server_version ==
'3')
1993 if (*mysql->server_version ==
'4')
1996 if (!mysql_real_query(mysql,
1997 STRING_WITH_LEN(
"SELECT @@GLOBAL.COLLATION_SERVER")) &&
1998 (master_res= mysql_store_result(mysql)) &&
1999 (master_row= mysql_fetch_row(master_res)))
2001 if (strcmp(master_row[0], global_system_variables.collation_server->name))
2003 errmsg=
"The slave I/O thread stops because master and slave have \
2004 different values for the COLLATION_SERVER global variable. The values must \
2005 be equal for the Statement-format replication to work";
2006 err_code= ER_SLAVE_FATAL_ERROR;
2007 sprintf(err_buff, ER(err_code), errmsg);
2011 else if (check_io_slave_killed(mi->info_thd, mi, NULL))
2012 goto slave_killed_err;
2013 else if (is_network_error(mysql_errno(mysql)))
2015 mi->report(WARNING_LEVEL, mysql_errno(mysql),
2016 "Get master COLLATION_SERVER failed with error: %s", mysql_error(mysql));
2019 else if (mysql_errno(mysql) != ER_UNKNOWN_SYSTEM_VARIABLE)
2022 errmsg=
"The slave I/O thread stops because a fatal error is encountered \
2023 when it try to get the value of COLLATION_SERVER global variable from master.";
2024 err_code= mysql_errno(mysql);
2025 sprintf(err_buff,
"%s Error: %s", errmsg, mysql_error(mysql));
2029 mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE,
2030 "Unknown system variable 'COLLATION_SERVER' on master, \
2031 maybe it is a *VERY OLD MASTER*. *NOTE*: slave may experience \
2032 inconsistency if replicated data deals with collation.");
2036 mysql_free_result(master_res);
2056 if (*mysql->server_version ==
'4')
2059 if (!mysql_real_query(mysql, STRING_WITH_LEN(
"SELECT @@GLOBAL.TIME_ZONE")) &&
2060 (master_res= mysql_store_result(mysql)) &&
2061 (master_row= mysql_fetch_row(master_res)))
2063 if (strcmp(master_row[0],
2064 global_system_variables.time_zone->
get_name()->ptr()))
2066 errmsg=
"The slave I/O thread stops because master and slave have \
2067 different values for the TIME_ZONE global variable. The values must \
2068 be equal for the Statement-format replication to work";
2069 err_code= ER_SLAVE_FATAL_ERROR;
2070 sprintf(err_buff, ER(err_code), errmsg);
2074 else if (check_io_slave_killed(mi->info_thd, mi, NULL))
2075 goto slave_killed_err;
2076 else if (is_network_error(mysql_errno(mysql)))
2078 mi->report(WARNING_LEVEL, mysql_errno(mysql),
2079 "Get master TIME_ZONE failed with error: %s", mysql_error(mysql));
2085 errmsg=
"The slave I/O thread stops because a fatal error is encountered \
2086 when it try to get the value of TIME_ZONE global variable from master.";
2087 err_code= mysql_errno(mysql);
2088 sprintf(err_buff,
"%s Error: %s", errmsg, mysql_error(mysql));
2093 mysql_free_result(master_res);
2098 if (mi->heartbeat_period != 0.0)
2101 const char query_format[]=
"SET @master_heartbeat_period= %s";
2102 char query[
sizeof(query_format) - 2 +
sizeof(llbuf)];
2106 llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
2107 sprintf(query, query_format, llbuf);
2109 if (mysql_real_query(mysql, query, strlen(query)))
2111 if (check_io_slave_killed(mi->info_thd, mi, NULL))
2112 goto slave_killed_err;
2114 if (is_network_error(mysql_errno(mysql)))
2116 mi->report(WARNING_LEVEL, mysql_errno(mysql),
2117 "SET @master_heartbeat_period to master failed with error: %s",
2118 mysql_error(mysql));
2119 mysql_free_result(mysql_store_result(mysql));
2125 errmsg=
"The slave I/O thread stops because a fatal error is encountered "
2126 " when it tries to SET @master_heartbeat_period on master.";
2127 err_code= ER_SLAVE_FATAL_ERROR;
2128 sprintf(err_buff,
"%s Error: %s", errmsg, mysql_error(mysql));
2129 mysql_free_result(mysql_store_result(mysql));
2133 mysql_free_result(mysql_store_result(mysql));
2142 if (DBUG_EVALUATE_IF(
"simulate_slave_unaware_checksum", 0, 1))
2145 const char query[]=
"SET @master_binlog_checksum= @@global.binlog_checksum";
2147 mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF;
2154 rc= mysql_real_query(mysql, query, strlen(query));
2157 mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_OFF;
2158 if (check_io_slave_killed(mi->info_thd, mi, NULL))
2159 goto slave_killed_err;
2161 if (mysql_errno(mysql) == ER_UNKNOWN_SYSTEM_VARIABLE)
2164 mi->report(WARNING_LEVEL, mysql_errno(mysql),
2165 "Notifying master by %s failed with "
2166 "error: %s", query, mysql_error(mysql));
2170 if (is_network_error(mysql_errno(mysql)))
2172 mi->report(WARNING_LEVEL, mysql_errno(mysql),
2173 "Notifying master by %s failed with "
2174 "error: %s", query, mysql_error(mysql));
2175 mysql_free_result(mysql_store_result(mysql));
2180 errmsg=
"The slave I/O thread stops because a fatal error is encountered "
2181 "when it tried to SET @master_binlog_checksum on master.";
2182 err_code= ER_SLAVE_FATAL_ERROR;
2183 sprintf(err_buff,
"%s Error: %s", errmsg, mysql_error(mysql));
2184 mysql_free_result(mysql_store_result(mysql));
2191 mysql_free_result(mysql_store_result(mysql));
2192 if (!mysql_real_query(mysql,
2193 STRING_WITH_LEN(
"SELECT @master_binlog_checksum")) &&
2194 (master_res= mysql_store_result(mysql)) &&
2195 (master_row= mysql_fetch_row(master_res)) &&
2196 (master_row[0] != NULL))
2198 mi->checksum_alg_before_fd= (uint8)
2199 find_type(master_row[0], &binlog_checksum_typelib, 1) - 1;
2201 DBUG_EXECUTE_IF(
"undefined_algorithm_on_slave",
2202 mi->checksum_alg_before_fd = BINLOG_CHECKSUM_ALG_UNDEF;);
2203 if(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_UNDEF)
2205 errmsg=
"The slave I/O thread was stopped because a fatal error is encountered "
2206 "The checksum algorithm used by master is unknown to slave.";
2207 err_code= ER_SLAVE_FATAL_ERROR;
2208 sprintf(err_buff,
"%s Error: %s", errmsg, mysql_error(mysql));
2209 mysql_free_result(mysql_store_result(mysql));
2214 DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF ||
2215 mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32);
2217 else if (check_io_slave_killed(mi->info_thd, mi, NULL))
2218 goto slave_killed_err;
2219 else if (is_network_error(mysql_errno(mysql)))
2221 mi->report(WARNING_LEVEL, mysql_errno(mysql),
2222 "Get master BINLOG_CHECKSUM failed with error: %s", mysql_error(mysql));
2227 errmsg=
"The slave I/O thread stops because a fatal error is encountered "
2228 "when it tried to SELECT @master_binlog_checksum.";
2229 err_code= ER_SLAVE_FATAL_ERROR;
2230 sprintf(err_buff,
"%s Error: %s", errmsg, mysql_error(mysql));
2231 mysql_free_result(mysql_store_result(mysql));
2237 mysql_free_result(master_res);
2242 mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_OFF;
2244 if (DBUG_EVALUATE_IF(
"simulate_slave_unaware_gtid", 0, 1))
2246 switch (io_thread_init_command(mi,
"SELECT @@GLOBAL.GTID_MODE",
2247 ER_UNKNOWN_SYSTEM_VARIABLE,
2248 &master_res, &master_row))
2250 case COMMAND_STATUS_ERROR:
2252 case COMMAND_STATUS_ALLOWED_ERROR:
2254 mi->master_gtid_mode= 0;
2256 case COMMAND_STATUS_OK:
2257 int typelib_index= find_type(master_row[0], >id_mode_typelib, 1);
2258 mysql_free_result(master_res);
2259 if (typelib_index == 0)
2261 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2262 "The slave IO thread stops because the master has "
2263 "an unknown @@GLOBAL.GTID_MODE.");
2266 mi->master_gtid_mode= typelib_index - 1;
2269 if (mi->master_gtid_mode > gtid_mode + 1 ||
2270 gtid_mode > mi->master_gtid_mode + 1)
2272 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2273 "The slave IO thread stops because the master has "
2274 "@@GLOBAL.GTID_MODE %s and this server has "
2275 "@@GLOBAL.GTID_MODE %s",
2276 gtid_mode_names[mi->master_gtid_mode],
2277 gtid_mode_names[gtid_mode]);
2280 if (mi->is_auto_position() && mi->master_gtid_mode != 3)
2282 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2283 "The slave IO thread stops because the master has "
2284 "@@GLOBAL.GTID_MODE %s and we are trying to connect "
2285 "using MASTER_AUTO_POSITION.",
2286 gtid_mode_names[mi->master_gtid_mode]);
2295 mysql_free_result(master_res);
2296 DBUG_ASSERT(err_code != 0);
2297 mi->report(ERROR_LEVEL, err_code,
"%s", err_buff);
2305 mysql_free_result(master_res);
2310 mysql_free_result(master_res);
2316 bool slave_killed=0;
2317 Master_info* mi = rli->mi;
2319 THD* thd = mi->info_thd;
2320 DBUG_ENTER(
"wait_for_relay_log_space");
2323 thd->ENTER_COND(&rli->log_space_cond,
2324 &rli->log_space_lock,
2325 &stage_waiting_for_relay_log_space,
2327 while (rli->log_space_limit < rli->log_space_total &&
2328 !(slave_killed=io_slave_killed(thd,mi)) &&
2329 !rli->ignore_log_space_limit)
2355 if (rli->ignore_log_space_limit)
2359 char llbuf1[22], llbuf2[22];
2360 DBUG_PRINT(
"info", (
"log_space_limit=%s "
2361 "log_space_total=%s "
2362 "ignore_log_space_limit=%d "
2363 "sql_force_rotate_relay=%d",
2364 llstr(rli->log_space_limit,llbuf1),
2365 llstr(rli->log_space_total,llbuf2),
2366 (
int) rli->ignore_log_space_limit,
2367 (
int) rli->sql_force_rotate_relay));
2370 if (rli->sql_force_rotate_relay)
2373 rotate_relay_log(mi);
2375 rli->sql_force_rotate_relay=
false;
2378 rli->ignore_log_space_limit=
false;
2381 thd->EXIT_COND(&old_stage);
2382 DBUG_RETURN(slave_killed);
2396 static int write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
2401 DBUG_ENTER(
"write_ignored_events_info_to_relay_log");
2403 DBUG_ASSERT(thd == mi->info_thd);
2406 if (rli->ign_master_log_name_end[0])
2408 DBUG_PRINT(
"info",(
"writing a Rotate event to track down ignored events"));
2410 0, rli->ign_master_log_pos_end,
2411 Rotate_log_event::DUP_NAME);
2412 if (mi->get_mi_description_event() != NULL)
2413 ev->checksum_alg= mi->get_mi_description_event()->checksum_alg;
2415 rli->ign_master_log_name_end[0]= 0;
2418 if (likely((
bool)ev))
2421 if (unlikely(rli->relay_log.append_event(ev, mi) != 0))
2422 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2423 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2424 "failed to write a Rotate event"
2425 " to the relay log, SHOW SLAVE STATUS may be"
2427 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2428 if (flush_master_info(mi, TRUE))
2431 sql_print_error(
"Failed to flush master info file.");
2438 mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
2439 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
2440 "Rotate_event (out of memory?),"
2441 " SHOW SLAVE STATUS may be inaccurate");
2451 int register_slave_on_master(
MYSQL* mysql, Master_info *mi,
2452 bool *suppress_warnings)
2454 uchar
buf[1024], *pos=
buf;
2455 uint report_host_len=0, report_user_len=0, report_password_len=0;
2456 DBUG_ENTER(
"register_slave_on_master");
2458 *suppress_warnings= FALSE;
2460 report_host_len= strlen(report_host);
2461 if (report_host_len > HOSTNAME_LENGTH)
2463 sql_print_warning(
"The length of report_host is %d. "
2464 "It is larger than the max length(%d), so this "
2465 "slave cannot be registered to the master.",
2466 report_host_len, HOSTNAME_LENGTH);
2471 report_user_len= strlen(report_user);
2472 if (report_user_len > USERNAME_LENGTH)
2474 sql_print_warning(
"The length of report_user is %d. "
2475 "It is larger than the max length(%d), so this "
2476 "slave cannot be registered to the master.",
2477 report_user_len, USERNAME_LENGTH);
2481 if (report_password)
2482 report_password_len= strlen(report_password);
2483 if (report_password_len > MAX_PASSWORD_LENGTH)
2485 sql_print_warning(
"The length of report_password is %d. "
2486 "It is larger than the max length(%d), so this "
2487 "slave cannot be registered to the master.",
2488 report_password_len, MAX_PASSWORD_LENGTH);
2492 int4store(pos, server_id); pos+= 4;
2493 pos= net_store_data(pos, (uchar*) report_host, report_host_len);
2494 pos= net_store_data(pos, (uchar*) report_user, report_user_len);
2495 pos= net_store_data(pos, (uchar*) report_password, report_password_len);
2496 int2store(pos, (uint16) report_port); pos+= 2;
2502 int4store(pos, 0); pos+= 4;
2504 int4store(pos, 0); pos+= 4;
2506 if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (
size_t) (pos- buf), 0))
2508 if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
2510 *suppress_warnings= TRUE;
2512 else if (!check_io_slave_killed(mi->info_thd, mi, NULL))
2515 my_snprintf(buf,
sizeof(buf),
"%s (Errno: %d)", mysql_error(mysql),
2516 mysql_errno(mysql));
2517 mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
2518 ER(ER_SLAVE_MASTER_COM_FAILURE),
"COM_REGISTER_SLAVE", buf);
2537 bool show_slave_status(THD* thd, Master_info* mi)
2542 char *slave_sql_running_state= NULL;
2543 char *sql_gtid_set_buffer= NULL, *io_gtid_set_buffer= NULL;
2544 int sql_gtid_set_size= 0, io_gtid_set_size= 0;
2545 DBUG_ENTER(
"show_slave_status");
2549 global_sid_lock->
wrlock();
2551 const Gtid_set* io_gtid_set= mi->rli->get_gtid_set();
2552 if ((sql_gtid_set_size= sql_gtid_set->
to_string(&sql_gtid_set_buffer)) < 0 ||
2553 (io_gtid_set_size= io_gtid_set->
to_string(&io_gtid_set_buffer)) < 0)
2556 my_free(sql_gtid_set_buffer);
2557 my_free(io_gtid_set_buffer);
2558 global_sid_lock->
unlock();
2561 global_sid_lock->
unlock();
2567 sizeof(mi->host) : 0));
2569 mi->get_user_size() : 0));
2577 MYSQL_TYPE_LONGLONG));
2581 MYSQL_TYPE_LONGLONG));
2593 field_list.push_back(
new Item_return_int(
"Last_Errno", 4, MYSQL_TYPE_LONG));
2598 MYSQL_TYPE_LONGLONG));
2600 MYSQL_TYPE_LONGLONG));
2604 MYSQL_TYPE_LONGLONG));
2607 sizeof(mi->ssl_ca) : 0));
2609 sizeof(mi->ssl_capath) : 0));
2611 sizeof(mi->ssl_cert) : 0));
2613 sizeof(mi->ssl_cipher) : 0));
2615 sizeof(mi->ssl_key) : 0));
2617 MYSQL_TYPE_LONGLONG));
2620 field_list.push_back(
new Item_return_int(
"Last_IO_Errno", 4, MYSQL_TYPE_LONG));
2622 field_list.push_back(
new Item_return_int(
"Last_SQL_Errno", 4, MYSQL_TYPE_LONG));
2626 field_list.push_back(
new Item_return_int(
"Master_Server_Id",
sizeof(ulong),
2631 field_list.push_back(
new Item_return_int(
"SQL_Delay", 10, MYSQL_TYPE_LONG));
2632 field_list.push_back(
new Item_return_int(
"SQL_Remaining_Delay", 8, MYSQL_TYPE_LONG));
2635 MYSQL_TYPE_LONGLONG));
2637 sizeof(mi->bind_addr) : 0));
2641 sizeof(mi->ssl_crl) : 0));
2643 sizeof(mi->ssl_crlpath) : 0));
2647 sql_gtid_set_size));
2648 field_list.push_back(
new Item_return_int(
"Auto_Position",
sizeof(ulong),
2652 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2654 my_free(sql_gtid_set_buffer);
2655 my_free(io_gtid_set_buffer);
2659 if (mi != NULL && mi->host[0])
2661 DBUG_PRINT(
"info",(
"host is set: '%s'", mi->host));
2662 String *packet= &thd->packet;
2663 protocol->prepare_for_resend();
2670 protocol->
store(mi->info_thd ? mi->info_thd->get_proc_info() :
"", &my_charset_bin);
2674 slave_sql_running_state=
const_cast<char *
>(mi->rli->info_thd ? mi->rli->info_thd->get_proc_info() :
"");
2682 DEBUG_SYNC(thd,
"wait_after_lock_active_mi_and_rli_data_lock_is_acquired");
2683 protocol->
store(mi->host, &my_charset_bin);
2684 protocol->
store(mi->get_user(), &my_charset_bin);
2685 protocol->
store((uint32) mi->port);
2686 protocol->
store((uint32) mi->connect_retry);
2687 protocol->
store(mi->get_master_log_name(), &my_charset_bin);
2688 protocol->
store((ulonglong) mi->get_master_log_pos());
2689 protocol->
store(mi->rli->get_group_relay_log_name() +
2690 dirname_length(mi->rli->get_group_relay_log_name()),
2692 protocol->
store((ulonglong) mi->rli->get_group_relay_log_pos());
2693 protocol->
store(mi->rli->get_group_master_log_name(), &my_charset_bin);
2694 protocol->
store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
2695 "Yes" : (mi->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT ?
2696 "Connecting" :
"No"), &my_charset_bin);
2697 protocol->
store(mi->rli->slave_running ?
"Yes":
"No", &my_charset_bin);
2698 protocol->
store(rpl_filter->get_do_db());
2699 protocol->
store(rpl_filter->get_ignore_db());
2702 String tmp(buf,
sizeof(buf), &my_charset_bin);
2703 rpl_filter->get_do_table(&tmp);
2704 protocol->
store(&tmp);
2705 rpl_filter->get_ignore_table(&tmp);
2706 protocol->
store(&tmp);
2707 rpl_filter->get_wild_do_table(&tmp);
2708 protocol->
store(&tmp);
2709 rpl_filter->get_wild_ignore_table(&tmp);
2710 protocol->
store(&tmp);
2712 protocol->
store(mi->rli->last_error().number);
2713 protocol->
store(mi->rli->last_error().message, &my_charset_bin);
2714 protocol->
store((uint32) mi->rli->slave_skip_counter);
2715 protocol->
store((ulonglong) mi->rli->get_group_master_log_pos());
2716 protocol->
store((ulonglong) mi->rli->log_space_total);
2718 const char *until_type=
"";
2720 switch (mi->rli->until_condition)
2722 case Relay_log_info::UNTIL_NONE:
2725 case Relay_log_info::UNTIL_MASTER_POS:
2726 until_type=
"Master";
2728 case Relay_log_info::UNTIL_RELAY_POS:
2729 until_type=
"Relay";
2731 case Relay_log_info::UNTIL_SQL_BEFORE_GTIDS:
2732 until_type=
"SQL_BEFORE_GTIDS";
2734 case Relay_log_info::UNTIL_SQL_AFTER_GTIDS:
2735 until_type=
"SQL_AFTER_GTIDS";
2737 case Relay_log_info::UNTIL_SQL_AFTER_MTS_GAPS:
2738 until_type=
"SQL_AFTER_MTS_GAPS";
2740 case Relay_log_info::UNTIL_DONE:
2747 protocol->
store(until_type, &my_charset_bin);
2748 protocol->
store(mi->rli->until_log_name, &my_charset_bin);
2749 protocol->
store((ulonglong) mi->rli->until_log_pos);
2752 protocol->
store(mi->ssl?
"Yes":
"No", &my_charset_bin);
2754 protocol->
store(mi->ssl?
"Ignored":
"No", &my_charset_bin);
2756 protocol->
store(mi->ssl_ca, &my_charset_bin);
2757 protocol->
store(mi->ssl_capath, &my_charset_bin);
2758 protocol->
store(mi->ssl_cert, &my_charset_bin);
2759 protocol->
store(mi->ssl_cipher, &my_charset_bin);
2760 protocol->
store(mi->ssl_key, &my_charset_bin);
2779 if (mi->rli->slave_running)
2786 if ((mi->get_master_log_pos() == mi->rli->get_group_master_log_pos()) &&
2787 (!strcmp(mi->get_master_log_name(), mi->rli->get_group_master_log_name())))
2789 if (mi->slave_running == MYSQL_SLAVE_RUN_CONNECT)
2790 protocol->
store(0LL);
2792 protocol->store_null();
2796 long time_diff= ((long)(time(0) - mi->rli->last_master_timestamp)
2797 - mi->clock_diff_with_master);
2818 protocol->
store((longlong)(mi->rli->last_master_timestamp ?
2819 max(0L, time_diff) : 0));
2824 protocol->store_null();
2826 protocol->
store(mi->ssl_verify_server_cert?
"Yes":
"No", &my_charset_bin);
2829 protocol->
store(mi->last_error().number);
2831 protocol->
store(mi->last_error().message, &my_charset_bin);
2833 protocol->
store(mi->rli->last_error().number);
2835 protocol->
store(mi->rli->last_error().message, &my_charset_bin);
2838 char buff[FN_REFLEN];
2840 for (i= 0, buff[0]= 0, cur_len= 0;
2841 i < mi->ignore_server_ids->dynamic_ids.elements; i++)
2844 char sbuff[FN_REFLEN];
2845 get_dynamic(&(mi->ignore_server_ids->dynamic_ids), (uchar*) &s_id, i);
2846 slen= sprintf(sbuff, (i == 0 ?
"%lu" :
", %lu"), s_id);
2847 if (cur_len + slen + 4 > FN_REFLEN)
2853 sprintf(buff + cur_len,
"...");
2856 cur_len += sprintf(buff + cur_len,
"%s", sbuff);
2858 protocol->
store(buff, &my_charset_bin);
2861 protocol->
store((uint32) mi->master_id);
2862 protocol->
store(mi->master_uuid, &my_charset_bin);
2864 protocol->
store(mi->get_description_info(), &my_charset_bin);
2866 protocol->
store((uint32) mi->rli->get_sql_delay());
2868 if (slave_sql_running_state == stage_sql_thd_waiting_until_delay.
m_name)
2870 time_t t= my_time(0), sql_delay_end= mi->rli->get_sql_delay_end();
2871 protocol->
store((uint32)(t < sql_delay_end ? sql_delay_end - t : 0));
2874 protocol->store_null();
2876 protocol->
store(slave_sql_running_state, &my_charset_bin);
2878 protocol->
store((ulonglong) mi->retry_count);
2880 protocol->
store(mi->bind_addr, &my_charset_bin);
2882 protocol->
store(mi->last_error().timestamp, &my_charset_bin);
2884 protocol->
store(mi->rli->last_error().timestamp, &my_charset_bin);
2886 protocol->
store(mi->ssl_ca, &my_charset_bin);
2888 protocol->
store(mi->ssl_capath, &my_charset_bin);
2890 protocol->
store(io_gtid_set_buffer, &my_charset_bin);
2892 protocol->
store(sql_gtid_set_buffer, &my_charset_bin);
2894 protocol->
store(mi->is_auto_position() ? 1 : 0);
2901 if (
my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
2903 my_free(sql_gtid_set_buffer);
2904 my_free(io_gtid_set_buffer);
2909 my_free(sql_gtid_set_buffer);
2910 my_free(io_gtid_set_buffer);
2915 void set_slave_thread_options(THD* thd)
2917 DBUG_ENTER(
"set_slave_thread_options");
2927 ulonglong options= thd->variables.option_bits | OPTION_BIG_SELECTS;
2928 if (opt_log_slave_updates)
2929 options|= OPTION_BIN_LOG;
2931 options&= ~OPTION_BIN_LOG;
2932 thd->variables.option_bits= options;
2933 thd->variables.completion_type= 0;
2941 if ((thd->variables.option_bits & OPTION_NOT_AUTOCOMMIT) &&
2942 (opt_mi_repository_id == INFO_REPOSITORY_TABLE ||
2943 opt_rli_repository_id == INFO_REPOSITORY_TABLE))
2946 thd->variables.option_bits&= ~OPTION_NOT_AUTOCOMMIT;
2947 thd->server_status|= SERVER_STATUS_AUTOCOMMIT;
2953 void set_slave_thread_default_charset(THD* thd,
Relay_log_info const *rli)
2955 DBUG_ENTER(
"set_slave_thread_default_charset");
2957 thd->variables.character_set_client=
2958 global_system_variables.character_set_client;
2959 thd->variables.collation_connection=
2960 global_system_variables.collation_connection;
2961 thd->variables.collation_server=
2962 global_system_variables.collation_server;
2963 thd->update_charset();
2979 static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
2981 DBUG_ENTER(
"init_slave_thread");
2982 #if !defined(DBUG_OFF)
2983 int simulate_error= 0;
2985 thd->system_thread= (thd_type == SLAVE_THD_WORKER) ?
2986 SYSTEM_THREAD_SLAVE_WORKER : (thd_type == SLAVE_THD_SQL) ?
2987 SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
2988 thd->security_ctx->skip_grants();
2990 thd->slave_thread = 1;
2991 thd->enable_slow_log= opt_log_slow_slave_statements;
2992 set_slave_thread_options(thd);
2993 thd->client_capabilities = CLIENT_LOCAL_FILES;
2995 thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
2998 DBUG_EXECUTE_IF(
"simulate_io_slave_error_on_init",
2999 simulate_error|= (1 << SLAVE_THD_IO););
3000 DBUG_EXECUTE_IF(
"simulate_sql_slave_error_on_init",
3001 simulate_error|= (1 << SLAVE_THD_SQL););
3002 #if !defined(DBUG_OFF)
3003 if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
3005 if (init_thr_lock() || thd->store_globals())
3011 if (thd_type == SLAVE_THD_SQL)
3013 THD_STAGE_INFO(thd, stage_waiting_for_the_next_event_in_relay_log);
3017 THD_STAGE_INFO(thd, stage_waiting_for_master_update);
3021 thd->variables.lock_wait_timeout= LONG_TIMEOUT;
3036 template <
typename killed_func,
typename rpl_info>
3037 static inline bool slave_sleep(THD *thd, time_t seconds,
3038 killed_func func, rpl_info info)
3046 set_timespec(abstime, seconds);
3049 thd->ENTER_COND(cond, lock, NULL, NULL);
3051 while (! (ret= func(thd, info)))
3054 if (error == ETIMEDOUT || error == ETIME)
3059 thd->EXIT_COND(NULL);
3064 static int request_dump(THD *thd,
MYSQL* mysql, Master_info* mi,
3065 bool *suppress_warnings)
3067 DBUG_ENTER(
"request_dump");
3069 const int BINLOG_NAME_INFO_SIZE= strlen(mi->get_master_log_name());
3071 size_t command_size= 0;
3072 enum_server_command command= mi->is_auto_position() ?
3073 COM_BINLOG_DUMP_GTID : COM_BINLOG_DUMP;
3074 uchar* command_buffer= NULL;
3075 ushort binlog_flags= 0;
3077 if (RUN_HOOK(binlog_relay_io,
3078 before_request_transmit,
3079 (thd, mi, binlog_flags)))
3082 *suppress_warnings=
false;
3083 if (command == COM_BINLOG_DUMP_GTID)
3088 global_sid_lock->
wrlock();
3090 if (gtid_executed.add_gtid_set(mi->rli->get_gtid_set()) != RETURN_STATUS_OK ||
3094 global_sid_lock->
unlock();
3097 global_sid_lock->
unlock();
3100 size_t encoded_data_size= gtid_executed.get_encoded_length();
3101 size_t allocation_size=
3102 ::BINLOG_FLAGS_INFO_SIZE + ::BINLOG_SERVER_ID_INFO_SIZE +
3103 ::BINLOG_NAME_SIZE_INFO_SIZE + BINLOG_NAME_INFO_SIZE +
3104 ::BINLOG_POS_INFO_SIZE + ::BINLOG_DATA_SIZE_INFO_SIZE +
3105 encoded_data_size + 1;
3106 if (!(command_buffer= (uchar *) my_malloc(allocation_size, MYF(MY_WME))))
3108 uchar* ptr_buffer= command_buffer;
3110 DBUG_PRINT(
"info", (
"Do I know something about the master? (binary log's name %s - auto position %d).",
3111 mi->get_master_log_name(), mi->is_auto_position()));
3120 int2store(ptr_buffer, binlog_flags);
3121 ptr_buffer+= ::BINLOG_FLAGS_INFO_SIZE;
3122 int4store(ptr_buffer, server_id);
3123 ptr_buffer+= ::BINLOG_SERVER_ID_INFO_SIZE;
3124 int4store(ptr_buffer, BINLOG_NAME_INFO_SIZE);
3125 ptr_buffer+= ::BINLOG_NAME_SIZE_INFO_SIZE;
3126 memset(ptr_buffer, 0, BINLOG_NAME_INFO_SIZE);
3127 ptr_buffer+= BINLOG_NAME_INFO_SIZE;
3128 int8store(ptr_buffer, 4LL);
3129 ptr_buffer+= ::BINLOG_POS_INFO_SIZE;
3131 int4store(ptr_buffer, encoded_data_size);
3132 ptr_buffer+= ::BINLOG_DATA_SIZE_INFO_SIZE;
3133 gtid_executed.encode(ptr_buffer);
3134 ptr_buffer+= encoded_data_size;
3136 command_size= ptr_buffer - command_buffer;
3137 DBUG_ASSERT(command_size == (allocation_size - 1));
3141 size_t allocation_size= ::BINLOG_POS_OLD_INFO_SIZE +
3142 BINLOG_NAME_INFO_SIZE + ::BINLOG_FLAGS_INFO_SIZE +
3143 ::BINLOG_SERVER_ID_INFO_SIZE + 1;
3144 if (!(command_buffer= (uchar *) my_malloc(allocation_size, MYF(MY_WME))))
3146 uchar* ptr_buffer= command_buffer;
3148 int4store(ptr_buffer, mi->get_master_log_pos());
3149 ptr_buffer+= ::BINLOG_POS_OLD_INFO_SIZE;
3151 int2store(ptr_buffer, binlog_flags);
3152 ptr_buffer+= ::BINLOG_FLAGS_INFO_SIZE;
3153 int4store(ptr_buffer, server_id);
3154 ptr_buffer+= ::BINLOG_SERVER_ID_INFO_SIZE;
3155 memcpy(ptr_buffer, mi->get_master_log_name(), BINLOG_NAME_INFO_SIZE);
3156 ptr_buffer+= BINLOG_NAME_INFO_SIZE;
3158 command_size= ptr_buffer - command_buffer;
3159 DBUG_ASSERT(command_size == (allocation_size - 1));
3162 if (simple_command(mysql, command, command_buffer, command_size, 1))
3169 if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
3170 *suppress_warnings=
true;
3172 sql_print_error(
"Error on %s: %d %s, will retry in %d secs",
3173 command_name[command].str,
3174 mysql_errno(mysql), mysql_error(mysql),
3181 my_free(command_buffer);
3203 static ulong read_event(
MYSQL* mysql, Master_info *mi,
bool* suppress_warnings)
3206 DBUG_ENTER(
"read_event");
3208 *suppress_warnings= FALSE;
3214 if (disconnect_slave_event_count && !(mi->events_until_exit--))
3215 DBUG_RETURN(packet_error);
3218 len = cli_safe_read(mysql);
3219 if (len == packet_error || (
long) len < 1)
3221 if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
3228 *suppress_warnings= TRUE;
3231 sql_print_error(
"Error reading packet from server: %s ( server_errno=%d)",
3232 mysql_error(mysql), mysql_errno(mysql));
3233 DBUG_RETURN(packet_error);
3237 if (len < 8 && mysql->net.read_pos[0] == 254)
3239 sql_print_information(
"Slave: received end packet from server, apparent "
3240 "master shutdown: %s",
3241 mysql_error(mysql));
3242 DBUG_RETURN(packet_error);
3245 DBUG_PRINT(
"exit", (
"len: %lu net->read_pos[4]: %d",
3246 len, mysql->net.read_pos[4]));
3247 DBUG_RETURN(len - 1);
3270 long sql_delay= rli->get_sql_delay();
3272 DBUG_ENTER(
"sql_delay_event");
3274 DBUG_ASSERT(!rli->belongs_to_client());
3276 int type= ev->get_type_code();
3277 if (sql_delay && type != ROTATE_EVENT &&
3278 type != FORMAT_DESCRIPTION_EVENT && type != START_EVENT_V3)
3281 time_t sql_delay_end=
3282 ev->when.tv_sec + rli->mi->clock_diff_with_master + sql_delay;
3284 time_t now= my_time(0);
3286 unsigned long nap_time= 0;
3287 if (sql_delay_end > now)
3288 nap_time= sql_delay_end - now;
3290 DBUG_PRINT(
"info", (
"sql_delay= %lu "
3292 "rli->mi->clock_diff_with_master= %lu "
3294 "sql_delay_end= %ld "
3296 sql_delay, (
long) ev->when.tv_sec,
3297 rli->mi->clock_diff_with_master,
3298 (
long)now, (
long)sql_delay_end, (
long)nap_time));
3300 if (sql_delay_end > now)
3302 DBUG_PRINT(
"info", (
"delaying replication event %lu secs",
3306 DBUG_RETURN(slave_sleep(thd, nap_time, sql_slave_killed, rli));
3319 int ulong_cmp(ulong *id1, ulong *id2)
3321 return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
3378 enum enum_slave_apply_event_and_update_pos_retval
3382 bool skip_event= FALSE;
3386 DBUG_ENTER(
"apply_event_and_update_pos");
3388 DBUG_PRINT(
"exec_event",(
"%s(type_code: %d; server_id: %d)",
3391 DBUG_PRINT(
"info", (
"thd->options: %s%s; rli->last_event_start_time: %lu",
3392 FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
3393 FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
3394 (ulong) rli->last_event_start_time));
3422 thd->server_id = ev->server_id;
3423 thd->unmasked_server_id = ev->unmasked_server_id;
3425 thd->lex->current_select= 0;
3426 if (!ev->when.tv_sec)
3427 my_micro_time_to_timeval(my_micro_time(), &ev->when);
3431 rli->mts_recovery_index)))
3433 reason= ev->shall_skip(rli);
3438 DBUG_PRINT(
"mts", (
"Mts is recovering %d, number of bits set %d, "
3439 "bitmap is set %d, index %lu.\n",
3441 bitmap_bits_set(&rli->recovery_groups),
3442 bitmap_is_set(&rli->recovery_groups,
3443 rli->mts_recovery_index),
3444 rli->mts_recovery_index));
3449 sql_slave_skip_counter= --rli->slave_skip_counter;
3455 if (sql_delay_event(ev, thd, rli))
3456 DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK);
3458 exec_res= ev->apply_event(rli);
3460 if (!exec_res && (ev->
worker != rli))
3464 Slave_job_item item= {ev}, *job_item= &item;
3465 Slave_worker *w= (Slave_worker *) ev->
worker;
3467 bool need_sync= ev->is_mts_group_isolated();
3470 DBUG_ASSERT(((Slave_worker*) ev->
worker) == rli->last_assigned_worker);
3472 DBUG_PRINT(
"Log_event::apply_event:",
3473 (
"-> job item data %p to W_%lu", job_item->data, w->id));
3476 if (rli->mts_group_status == Relay_log_info::MTS_END_GROUP)
3479 for (uint i= rli->curr_group_assigned_parts.elements; i > 0; i--)
3480 delete_dynamic_element(&rli->
3481 curr_group_assigned_parts, i - 1);
3483 rli->curr_group_seen_begin= rli->curr_group_seen_gtid=
false;
3484 rli->last_assigned_worker= NULL;
3492 bool append_item_to_jobs_error=
false;
3493 if (rli->curr_group_da.elements > 0)
3499 for (uint i= 0; i < rli->curr_group_da.elements; i++)
3501 Slave_job_item da_item;
3502 get_dynamic(&rli->curr_group_da, (uchar*) &da_item.data, i);
3503 DBUG_PRINT(
"mts", (
"Assigning job %llu to worker %lu",
3504 ((
Log_event* )da_item.data)->log_pos, w->id));
3505 static_cast<Log_event*
>(da_item.data)->mts_group_idx=
3506 rli->gaq->assigned_group_index;
3507 if (!append_item_to_jobs_error)
3508 append_item_to_jobs_error= append_item_to_jobs(&da_item, w, rli);
3509 if (append_item_to_jobs_error)
3510 delete static_cast<Log_event*
>(da_item.data);
3512 if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
3515 rli->curr_group_da.elements= rli->curr_group_da.max_element;
3516 rli->curr_group_da.max_element= 0;
3517 freeze_size(&rli->curr_group_da);
3519 rli->curr_group_da.elements= 0;
3521 if (append_item_to_jobs_error)
3522 DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR);
3524 DBUG_PRINT(
"mts", (
"Assigning job %llu to worker %lu\n",
3525 ((
Log_event* )job_item->data)->log_pos, w->id));
3528 if (append_item_to_jobs(job_item, w, rli))
3529 DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR);
3540 (void) wait_for_workers_to_finish(rli);
3546 if (log_warnings > 1 &&
3549 time_t my_now= my_time(0);
3551 if ((my_now - rli->mts_last_online_stat) >=
3552 mts_online_stat_period)
3554 sql_print_information(
"Multi-threaded slave statistics: "
3555 "seconds elapsed = %lu; "
3556 "events assigned = %llu; "
3557 "worker queues filled over overrun level = %lu; "
3558 "waited due a Worker queue full = %lu; "
3559 "waited due the total size = %lu; "
3560 "slept when Workers occupied = %lu ",
3561 static_cast<unsigned long>
3562 (my_now - rli->mts_last_online_stat),
3563 rli->mts_events_assigned,
3564 rli->mts_wq_overrun_cnt,
3565 rli->mts_wq_overfill_cnt,
3566 rli->wq_size_waits_cnt,
3567 rli->mts_wq_no_underrun_cnt);
3568 rli->mts_last_online_stat= my_now;
3576 DBUG_PRINT(
"info", (
"apply_event error = %d", exec_res));
3591 (ev->get_type_code() != XID_EVENT ||
3593 (ev->ends_group() || !rli->mts_recovery_group_seen_begin) &&
3594 bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index))))
3602 static const char *
const explain[] = {
3606 "skipped because event should be ignored",
3608 "skipped because event skip counter was non-zero"
3610 DBUG_PRINT(
"info", (
"OPTION_BEGIN: %d; IN_STMT: %d",
3611 test(thd->variables.option_bits & OPTION_BEGIN),
3613 DBUG_PRINT(
"skip_event", (
"%s event was %s",
3617 error= ev->update_pos(rli);
3620 DBUG_PRINT(
"info", (
"update_pos error = %d", error));
3621 if (!rli->belongs_to_client())
3624 DBUG_PRINT(
"info", (
"group %s %s",
3625 llstr(rli->get_group_relay_log_pos(),
buf),
3626 rli->get_group_relay_log_name()));
3627 DBUG_PRINT(
"info", (
"event %s %s",
3628 llstr(rli->get_event_relay_log_pos(),
buf),
3629 rli->get_event_relay_log_name()));
3637 (ev->get_type_code() == INTVAR_EVENT ||
3638 ev->get_type_code() == RAND_EVENT ||
3639 ev->get_type_code() == USER_VAR_EVENT)));
3641 rli->inc_event_relay_log_pos();
3645 ev->get_type_code() != ROTATE_EVENT &&
3646 ev->get_type_code() != FORMAT_DESCRIPTION_EVENT &&
3647 ev->get_type_code() != PREVIOUS_GTIDS_LOG_EVENT)
3649 if (ev->starts_group())
3651 rli->mts_recovery_group_seen_begin=
true;
3653 else if ((ev->ends_group() || !rli->mts_recovery_group_seen_begin) &&
3656 rli->mts_recovery_index++;
3657 if (--rli->mts_recovery_group_cnt == 0)
3659 rli->mts_recovery_index= 0;
3660 sql_print_information(
"Slave: MTS Recovery has completed at "
3661 "relay log %s, position %llu "
3662 "master log %s, position %llu.",
3663 rli->get_group_relay_log_name(),
3664 rli->get_group_relay_log_pos(),
3665 rli->get_group_master_log_name(),
3666 rli->get_group_master_log_pos());
3676 if (rli->until_condition == Relay_log_info::UNTIL_SQL_AFTER_MTS_GAPS)
3678 rli->until_condition= Relay_log_info::UNTIL_DONE;
3684 (void) Rpl_info_factory::reset_workers(rli);
3687 rli->mts_recovery_group_seen_begin=
false;
3703 rli->
report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3704 "It was not possible to update the positions"
3705 " of the relay log information: the slave may"
3706 " be in an inconsistent state."
3707 " Stopped in %s position %s",
3708 rli->get_group_relay_log_name(),
3709 llstr(rli->get_group_relay_log_pos(),
buf));
3710 DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR);
3714 DBUG_RETURN(exec_res ? SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPLY_ERROR
3715 : SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK);
3750 DBUG_ENTER(
"exec_relay_log_event");
3767 if (rli->until_condition == Relay_log_info::UNTIL_SQL_AFTER_GTIDS &&
3770 rli->abort_slave= 1;
3775 Log_event *ev = next_event(rli), **ptr_ev;
3777 DBUG_ASSERT(rli->info_thd==thd);
3779 if (sql_slave_killed(thd,rli))
3787 enum enum_slave_apply_event_and_update_pos_retval exec_res;
3800 ev->is_artificial_event() || ev->is_relay_log_event() ||
3801 (ev->when.tv_sec == 0) || ev->get_type_code() == FORMAT_DESCRIPTION_EVENT))
3803 rli->last_master_timestamp= ev->when.tv_sec + (time_t) ev->exec_time;
3804 DBUG_ASSERT(rli->last_master_timestamp >= 0);
3815 if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
3816 rli->until_condition != Relay_log_info::UNTIL_SQL_AFTER_GTIDS &&
3823 rli->abort_slave= 1;
3835 DBUG_EXECUTE_IF(
"incomplete_group_in_relay_log",
3836 if ((ev->get_type_code() == XID_EVENT) ||
3837 ((ev->get_type_code() == QUERY_EVENT) &&
3840 DBUG_ASSERT(thd->transaction.all.cannot_safely_rollback());
3841 rli->abort_slave= 1;
3844 rli->inc_event_relay_log_pos();
3850 exec_res= apply_event_and_update_pos(ptr_ev, thd, rli);
3863 DBUG_ASSERT(*ptr_ev == ev);
3872 if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT &&
3873 ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
3875 DBUG_PRINT(
"info", (
"Deleting the event after it has been executed"));
3889 if (exec_res >= SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR)
3895 if (slave_trans_retries)
3897 int UNINIT_VAR(temp_err);
3899 if (exec_res && !is_mts_worker(thd) &&
3901 !thd->transaction.all.cannot_safely_rollback())
3918 if (rli->trans_retries < slave_trans_retries)
3924 if (global_init_info(rli->mi,
false, SLAVE_SQL))
3925 sql_print_error(
"Failed to initialize the master info structure");
3927 rli->get_group_relay_log_pos(),
3930 sql_print_error(
"Error initializing relay log position: %s",
3934 exec_res= SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK;
3935 rli->cleanup_context(thd, 1);
3937 slave_sleep(thd, min<ulong>(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
3938 sql_slave_killed, rli);
3941 rli->trans_retries++;
3943 rli->retried_trans++;
3945 DBUG_PRINT(
"info", (
"Slave retries transaction "
3946 "rli->trans_retries: %lu", rli->trans_retries));
3951 thd->is_fatal_error= 1;
3952 rli->
report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
3953 "Slave SQL thread retried transaction %lu time(s) "
3954 "in vain, giving up. Consider raising the value of "
3955 "the slave_transaction_retries variable.", rli->trans_retries);
3958 else if ((exec_res && !temp_err) ||
3959 (opt_using_transactions &&
3960 rli->get_group_relay_log_pos() == rli->get_event_relay_log_pos()))
3968 rli->trans_retries= 0;
3969 DBUG_PRINT(
"info", (
"Resetting retry counter, rli->trans_retries: %lu",
3970 rli->trans_retries));
3975 DBUG_RETURN(exec_res);
3978 rli->
report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
3979 ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
"\
3980 Could not parse relay log event entry. The possible reasons are: the master's \
3981 binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
3982 binary log), the slave's relay log is corrupted (you can check this by running \
3983 'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
3984 or slave's MySQL code. If you want to check the master's binary log or slave's \
3985 relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
3991 static bool check_io_slave_killed(THD *thd, Master_info *mi,
const char *info)
3993 if (io_slave_killed(thd, mi))
3995 if (info && log_warnings)
3996 sql_print_information(
"%s", info);
4028 static int try_to_reconnect(THD *thd,
MYSQL *mysql, Master_info *mi,
4029 uint *retry_count,
bool suppress_warnings,
4030 const char *messages[SLAVE_RECON_MSG_MAX])
4032 mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
4033 thd->proc_info= messages[SLAVE_RECON_MSG_WAIT];
4034 #ifdef SIGNAL_WITH_VIO_SHUTDOWN
4035 thd->clear_active_vio();
4038 if ((*retry_count)++)
4040 if (*retry_count > mi->retry_count)
4042 slave_sleep(thd, mi->connect_retry, io_slave_killed, mi);
4044 if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING]))
4046 thd->proc_info = messages[SLAVE_RECON_MSG_AFTER];
4047 if (!suppress_warnings)
4049 char buf[256], llbuff[22];
4050 my_snprintf(buf,
sizeof(buf), messages[SLAVE_RECON_MSG_FAILED],
4051 mi->get_io_rpl_log_name(), llstr(mi->get_master_log_pos(),
4057 if (messages[SLAVE_RECON_MSG_COMMAND][0])
4059 mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
4060 ER(ER_SLAVE_MASTER_COM_FAILURE),
4061 messages[SLAVE_RECON_MSG_COMMAND], buf);
4065 sql_print_information(
"%s", buf);
4068 if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi))
4071 sql_print_information(
"%s", messages[SLAVE_RECON_MSG_KILLED_AFTER]);
4086 pthread_handler_t handle_slave_io(
void *arg)
4089 bool thd_added=
false;
4091 Master_info *mi = (Master_info*)arg;
4095 bool suppress_warnings;
4099 uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
4103 DBUG_ENTER(
"handle_slave_io");
4105 DBUG_ASSERT(mi->inited);
4114 mi->events_until_exit = disconnect_slave_event_count;
4118 THD_CHECK_SENTRY(thd);
4121 pthread_detach_this_thread();
4122 thd->thread_stack= (
char*) &thd;
4124 if (init_slave_thread(thd, SLAVE_THD_IO))
4128 sql_print_error(
"Failed during slave I/O thread initialization");
4133 add_global_thread(thd);
4137 mi->slave_running = 1;
4138 mi->abort_slave = 0;
4142 DBUG_PRINT(
"master_info",(
"log_file_name: '%s' position: %s",
4143 mi->get_master_log_name(),
4144 llstr(mi->get_master_log_pos(), llbuff)));
4147 my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
4149 if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
4151 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4152 ER(ER_SLAVE_FATAL_ERROR),
"Failed to run 'thread_start' hook");
4156 if (!(mi->mysql = mysql = mysql_init(NULL)))
4158 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4159 ER(ER_SLAVE_FATAL_ERROR),
"error in mysql_init()");
4163 THD_STAGE_INFO(thd, stage_connecting_to_master);
4165 if (!safe_connect(thd, mysql, mi))
4167 sql_print_information(
"Slave I/O thread: connected to master '%s@%s:%d',"
4168 "replication started in log '%s' at position %s",
4169 mi->get_user(), mi->host, mi->port,
4170 mi->get_io_rpl_log_name(),
4171 llstr(mi->get_master_log_pos(), llbuff));
4175 sql_print_information(
"Slave I/O thread killed while connecting to master");
4181 DBUG_EXECUTE_IF(
"dbug.before_get_running_status_yes",
4185 "wait_for signal.io_thread_let_running";
4186 DBUG_ASSERT(opt_debug_sync_timeout > 0);
4187 DBUG_ASSERT(!debug_sync_set_action(thd,
4188 STRING_WITH_LEN(act)));
4191 mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
4194 thd->slave_net = &mysql->net;
4195 THD_STAGE_INFO(thd, stage_checking_master_version);
4196 ret= get_master_version_and_clock(mysql, mi);
4198 ret= get_master_uuid(mysql, mi);
4200 ret= io_thread_init_commands(mysql, mi);
4208 if (check_io_slave_killed(mi->info_thd, mi,
"Slave I/O thread killed"
4209 "while calling get_master_version_and_clock(...)"))
4211 suppress_warnings= FALSE;
4213 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
4214 reconnect_messages[SLAVE_RECON_ACT_REG]))
4220 binlog_version= mi->get_mi_description_event()->binlog_version;
4223 if (binlog_version > 1)
4228 THD_STAGE_INFO(thd, stage_registering_slave_on_master);
4229 if (register_slave_on_master(mysql, mi, &suppress_warnings))
4231 if (!check_io_slave_killed(thd, mi,
"Slave I/O thread killed "
4232 "while registering slave on master"))
4234 sql_print_error(
"Slave I/O thread couldn't register on master");
4235 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
4236 reconnect_messages[SLAVE_RECON_ACT_REG]))
4243 DBUG_EXECUTE_IF(
"FORCE_SLAVE_TO_RECONNECT_REG",
4244 if (!retry_count_reg)
4247 sql_print_information(
"Forcing to reconnect slave I/O thread");
4248 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
4249 reconnect_messages[SLAVE_RECON_ACT_REG]))
4255 DBUG_PRINT(
"info",(
"Starting reading binary log from master"));
4256 while (!io_slave_killed(thd,mi))
4258 THD_STAGE_INFO(thd, stage_requesting_binlog_dump);
4259 if (request_dump(thd, mysql, mi, &suppress_warnings))
4261 sql_print_error(
"Failed on request_dump()");
4262 if (check_io_slave_killed(thd, mi,
"Slave I/O thread killed while \
4263 requesting master dump") ||
4264 try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
4265 reconnect_messages[SLAVE_RECON_ACT_DUMP]))
4269 DBUG_EXECUTE_IF(
"FORCE_SLAVE_TO_RECONNECT_DUMP",
4270 if (!retry_count_dump)
4273 sql_print_information(
"Forcing to reconnect slave I/O thread");
4274 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
4275 reconnect_messages[SLAVE_RECON_ACT_DUMP]))
4279 const char *event_buf;
4281 DBUG_ASSERT(mi->last_error().number == 0);
4282 while (!io_slave_killed(thd,mi))
4291 THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event);
4292 event_len= read_event(mysql, mi, &suppress_warnings);
4293 if (check_io_slave_killed(thd, mi,
"Slave I/O thread killed while \
4296 DBUG_EXECUTE_IF(
"FORCE_SLAVE_TO_RECONNECT_EVENT",
4297 if (!retry_count_event)
4299 retry_count_event++;
4300 sql_print_information(
"Forcing to reconnect slave I/O thread");
4301 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
4302 reconnect_messages[SLAVE_RECON_ACT_EVENT]))
4307 if (event_len == packet_error)
4309 uint mysql_error_number= mysql_errno(mysql);
4310 switch (mysql_error_number) {
4311 case CR_NET_PACKET_TOO_LARGE:
4313 Log entry on master is longer than slave_max_allowed_packet (%lu) on \
4314 slave. If the entry is correct, restart the server with a higher value of \
4315 slave_max_allowed_packet",
4316 slave_max_allowed_packet);
4317 mi->report(ERROR_LEVEL, ER_NET_PACKET_TOO_LARGE,
4318 "%s",
"Got a packet bigger than 'slave_max_allowed_packet' bytes");
4320 case ER_MASTER_FATAL_ERROR_READING_BINLOG:
4321 mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG,
4322 ER(ER_MASTER_FATAL_ERROR_READING_BINLOG),
4323 mysql_error_number, mysql_error(mysql));
4325 case ER_OUT_OF_RESOURCES:
4327 Stopping slave I/O thread due to out-of-memory error from master");
4328 mi->report(ERROR_LEVEL, ER_OUT_OF_RESOURCES,
4329 "%s", ER(ER_OUT_OF_RESOURCES));
4332 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
4333 reconnect_messages[SLAVE_RECON_ACT_EVENT]))
4339 THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log);
4340 event_buf= (
const char*)mysql->net.read_pos + 1;
4342 if (RUN_HOOK(binlog_relay_io, after_read_event,
4343 (thd, mi,(
const char*)mysql->net.read_pos + 1,
4344 event_len, &event_buf, &event_len)))
4346 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4347 ER(ER_SLAVE_FATAL_ERROR),
4348 "Failed to run 'after_read_event' hook");
4355 if (queue_event(mi, event_buf, event_len))
4357 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
4358 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
4359 "could not queue event from master");
4363 if (RUN_HOOK(binlog_relay_io, after_queue_event,
4364 (thd, mi, event_buf, event_len, synced)))
4366 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4367 ER(ER_SLAVE_FATAL_ERROR),
4368 "Failed to run 'after_queue_event' hook");
4373 if (flush_master_info(mi, FALSE))
4375 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4376 ER(ER_SLAVE_FATAL_ERROR),
4377 "Failed to flush master info.");
4397 char llbuf1[22], llbuf2[22];
4398 DBUG_PRINT(
"info", (
"log_space_limit=%s log_space_total=%s \
4399 ignore_log_space_limit=%d",
4400 llstr(rli->log_space_limit,llbuf1),
4401 llstr(rli->log_space_total,llbuf2),
4402 (
int) rli->ignore_log_space_limit));
4406 if (rli->log_space_limit && rli->log_space_limit <
4407 rli->log_space_total &&
4408 !rli->ignore_log_space_limit)
4409 if (wait_for_relay_log_space(rli))
4411 sql_print_error(
"Slave I/O thread aborted while waiting for relay \
4421 sql_print_information(
"Slave I/O thread exiting, read up to log '%s', position %s",
4422 mi->get_io_rpl_log_name(), llstr(mi->get_master_log_pos(), llbuff));
4423 (void) RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
4425 thd->reset_db(NULL, 0);
4436 #ifdef SIGNAL_WITH_VIO_SHUTDOWN
4437 thd->clear_active_vio();
4443 write_ignored_events_info_to_relay_log(thd, mi);
4445 THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
4451 mi->reset_start_info();
4454 mi->set_mi_description_event(NULL);
4457 DBUG_ASSERT(thd->net.buff != 0);
4460 thd->release_resources();
4462 THD_CHECK_SENTRY(thd);
4464 remove_global_thread(thd);
4469 mi->slave_running= 0;
4477 DBUG_EXECUTE_IF(
"simulate_slave_delay_at_terminate_bug38694", sleep(5););
4481 ERR_remove_state(0);
4491 int check_temp_dir(
char* tmp_file)
4495 char tmp_dir[FN_REFLEN];
4496 size_t tmp_dir_size;
4498 DBUG_ENTER(
"check_temp_dir");
4503 dirname_part(tmp_dir, tmp_file, &tmp_dir_size);
4508 if (!(dirp=my_dir(tmp_dir,MYF(MY_WME))))
4516 char *unique_tmp_file_name= (
char*)my_malloc((FN_REFLEN+
TEMP_FILE_MAX_LEN)*
sizeof(char), MYF(0));
4517 sprintf(unique_tmp_file_name,
"%s%s", tmp_file, server_uuid);
4519 unique_tmp_file_name, CREATE_MODE,
4520 O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
4530 my_free(unique_tmp_file_name);
4537 pthread_handler_t handle_slave_worker(
void *arg)
4540 bool thd_added=
false;
4542 Slave_worker *w= (Slave_worker *) arg;
4545 ulonglong purge_size= 0;
4546 struct slave_job_item _item, *job_item= &_item;
4549 DBUG_ENTER(
"handle_slave_worker");
4554 sql_print_error(
"Failed during slave worker initialization");
4558 thd->thread_stack = (
char*)&thd;
4560 pthread_detach_this_thread();
4561 if (init_slave_thread(thd, SLAVE_THD_WORKER))
4564 sql_print_error(
"Failed during slave worker initialization");
4567 thd->init_for_queries(w);
4570 add_global_thread(thd);
4574 if (w->update_is_transactional())
4576 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4577 "Error checking if the worker repository is transactional.");
4582 w->running_status= Slave_worker::RUNNING;
4587 DBUG_ASSERT(thd->is_slave_error == 0);
4591 error= slave_worker_exec_job(w, rli);
4599 w->cleanup_context(thd, error);
4603 while(de_queue(&w->jobs, job_item))
4606 purge_size += ((
Log_event*) (job_item->data))->data_written;
4607 DBUG_ASSERT(job_item->data);
4608 delete static_cast<Log_event*
>(job_item->data);
4611 DBUG_ASSERT(w->jobs.len == 0);
4616 rli->pending_jobs -= purge_cnt;
4617 rli->mts_pending_jobs_size -= purge_size;
4618 DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
4628 w->cleanup_after_session();
4629 thd->rli_slave= NULL;
4633 w->running_status= Slave_worker::NOT_RUNNING;
4634 if (log_warnings > 1)
4635 sql_print_information(
"Worker %lu statistics: "
4636 "events processed = %lu "
4637 "hungry waits = %lu "
4638 "priv queue overfills = %llu ",
4639 w->id, w->events_done, w->wq_size_waits_cnt,
4640 w->jobs.waited_overfill);
4656 DBUG_ASSERT(thd->net.buff != 0);
4663 thd->system_thread= NON_SYSTEM_THREAD;
4664 thd->release_resources();
4667 THD_CHECK_SENTRY(thd);
4669 remove_global_thread(thd);
4675 ERR_remove_state(0);
4686 longlong filecmp= strcmp(id1->file_name, id2->file_name);
4687 longlong poscmp= id1->pos - id2->pos;
4688 return (filecmp < 0 ? -1 : (filecmp > 0 ? 1 :
4689 (poscmp < 0 ? -1 : (poscmp > 0 ? 1 : 0))));
4695 const char *errmsg= NULL;
4697 bool flag_group_seen_begin= FALSE;
4698 uint recovery_group_cnt= 0;
4699 bool not_reached_commit=
true;
4701 Slave_job_group job_worker;
4706 MY_BITMAP *groups= &rli->recovery_groups;
4708 DBUG_ENTER(
"mts_recovery_groups");
4710 DBUG_ASSERT(rli->slave_parallel_workers == 0);
4725 (
char *) rli->get_group_master_log_name(),
4726 rli->get_group_master_log_pos()
4731 if (!p_fdle->is_valid())
4738 my_init_dynamic_array(&above_lwm_jobs,
sizeof(Slave_job_group),
4739 rli->recovery_parallel_workers,
4740 rli->recovery_parallel_workers);
4742 for (uint
id= 0;
id < rli->recovery_parallel_workers;
id++)
4744 Slave_worker *worker=
4745 Rpl_info_factory::create_worker(opt_rli_repository_id,
id, rli,
true);
4753 LOG_POS_COORD w_last= {
const_cast<char*
>(worker->get_group_master_log_name()),
4754 worker->get_group_master_log_pos() };
4755 if (mts_event_coord_cmp(&w_last, &cp) > 0)
4762 job_worker.worker= worker;
4763 job_worker.checkpoint_log_pos= worker->checkpoint_master_log_pos;
4764 job_worker.checkpoint_log_name= worker->checkpoint_master_log_name;
4766 insert_dynamic(&above_lwm_jobs, (uchar*) &job_worker);
4795 DBUG_ASSERT(!rli->recovery_groups_inited);
4797 if (above_lwm_jobs.elements != 0)
4799 bitmap_init(groups, NULL, MTS_MAX_BITS_IN_GROUP, FALSE);
4800 rli->recovery_groups_inited=
true;
4801 bitmap_clear_all(groups);
4803 rli->mts_recovery_group_cnt= 0;
4804 for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
4806 Slave_worker *w= ((Slave_job_group *)
4807 dynamic_array_ptr(&above_lwm_jobs, it_job))->worker;
4808 LOG_POS_COORD w_last= {
const_cast<char*
>(w->get_group_master_log_name()),
4809 w->get_group_master_log_pos() };
4810 bool checksum_detected= FALSE;
4812 sql_print_information(
"Slave: MTS group recovery relay log info based on Worker-Id %lu, "
4813 "group_relay_log_name %s, group_relay_log_pos %llu "
4814 "group_master_log_name %s, group_master_log_pos %llu",
4816 w->get_group_relay_log_name(),
4817 w->get_group_relay_log_pos(),
4818 w->get_group_master_log_name(),
4819 w->get_group_master_log_pos());
4821 recovery_group_cnt= 0;
4822 not_reached_commit=
true;
4823 if (rli->relay_log.
find_log_pos(&linfo, rli->get_group_relay_log_name(), 1))
4826 sql_print_error(
"Error looking for %s.", rli->get_group_relay_log_name());
4829 offset= rli->get_group_relay_log_pos();
4830 for (
int checking= 0 ; not_reached_commit; checking++)
4835 sql_print_error(
"%s", errmsg);
4842 if (!checksum_detected)
4845 while (i < 4 && (ev= Log_event::read_log_event(&log,
4848 if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
4850 p_fdle->checksum_alg= ev->checksum_alg;
4851 checksum_detected= TRUE;
4856 if (!checksum_detected)
4859 sql_print_error(
"%s",
"malformed or very old relay log which "
4860 "does not have FormatDescriptor");
4865 my_b_seek(&log, offset);
4867 while (not_reached_commit &&
4868 (ev= Log_event::read_log_event(&log, 0, p_fdle,
4869 opt_slave_sql_verify_checksum)))
4871 DBUG_ASSERT(ev->is_valid());
4873 if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
4874 p_fdle->checksum_alg= ev->checksum_alg;
4876 if (ev->get_type_code() == ROTATE_EVENT ||
4877 ev->get_type_code() == FORMAT_DESCRIPTION_EVENT ||
4878 ev->get_type_code() == PREVIOUS_GTIDS_LOG_EVENT)
4885 DBUG_PRINT(
"mts", (
"Event Recoverying relay log info "
4886 "group_mster_log_name %s, event_master_log_pos %llu type code %u.",
4887 linfo.log_file_name, ev->log_pos, ev->get_type_code()));
4889 if (ev->starts_group())
4891 flag_group_seen_begin=
true;
4893 else if ((ev->ends_group() || !flag_group_seen_begin) &&
4897 LOG_POS_COORD ev_coord= { (
char *) rli->get_group_master_log_name(),
4899 flag_group_seen_begin=
false;
4900 recovery_group_cnt++;
4902 sql_print_information(
"Slave: MTS group recovery relay log info "
4903 "group_master_log_name %s, "
4904 "event_master_log_pos %llu.",
4905 rli->get_group_master_log_name(), ev->log_pos);
4906 if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0)
4909 for (uint i= 0; i <= w->checkpoint_seqno; i++)
4911 if (bitmap_is_set(&w->group_executed, i))
4912 DBUG_PRINT(
"mts", (
"Bit %u is set.", i));
4914 DBUG_PRINT(
"mts", (
"Bit %u is not set.", i));
4918 (
"Doing a shift ini(%lu) end(%lu).",
4919 (w->checkpoint_seqno + 1) - recovery_group_cnt,
4920 w->checkpoint_seqno));
4922 for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt,
4923 j= 0; i <= w->checkpoint_seqno; i++, j++)
4925 if (bitmap_is_set(&w->group_executed, i))
4927 DBUG_PRINT(
"mts", (
"Setting bit %u.", j));
4928 bitmap_fast_test_and_set(groups, j);
4931 not_reached_commit=
false;
4934 DBUG_ASSERT(ret < 0);
4941 offset= BIN_LOG_HEADER_SIZE;
4942 if (not_reached_commit && rli->relay_log.
find_next_log(&linfo, 1))
4945 sql_print_error(
"Error looking for file after %s.", linfo.log_file_name);
4950 rli->mts_recovery_group_cnt= (rli->mts_recovery_group_cnt < recovery_group_cnt ?
4951 recovery_group_cnt : rli->mts_recovery_group_cnt);
4954 DBUG_ASSERT(!rli->recovery_groups_inited ||
4955 rli->mts_recovery_group_cnt <= groups->n_bits);
4959 for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
4961 get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
4962 delete job_worker.worker;
4965 delete_dynamic(&above_lwm_jobs);
4966 if (rli->recovery_groups_inited && rli->mts_recovery_group_cnt == 0)
4968 bitmap_free(groups);
4969 rli->recovery_groups_inited=
false;
4972 DBUG_RETURN(error ? ER_MTS_RECOVERY_FAILURE : 0);
4988 bool mts_checkpoint_routine(
Relay_log_info *rli, ulonglong period,
4989 bool force,
bool need_data_lock)
4995 DBUG_ENTER(
"checkpoint_routine");
4998 if (DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0))
5000 if (!rli->gaq->count_done(rli))
5009 DBUG_ASSERT(!rli->gaq->full() ||
5010 ((rli->checkpoint_seqno == rli->checkpoint_group -1 &&
5011 rli->mts_group_status == Relay_log_info::MTS_IN_GROUP) ||
5012 rli->checkpoint_seqno == rli->checkpoint_group));
5020 set_timespec_nsec(curr_clock, 0);
5021 ulonglong diff= diff_timespec(curr_clock, rli->
last_clock);
5022 if (!force && diff < period)
5033 cnt= rli->gaq->move_queue_head(&rli->workers);
5035 if (DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0) &&
5036 cnt != opt_mts_checkpoint_period)
5037 sql_print_error(
"This an error cnt != mts_checkpoint_period");
5039 }
while (!sql_slave_killed(rli->info_thd, rli) &&
5040 cnt == 0 && force &&
5041 !DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0) &&
5042 (my_sleep(rli->mts_coordinator_basic_nap), 1));
5056 for (uint i= 0; i < rli->workers.elements; i++)
5059 get_dynamic(&rli->workers, (uchar *) &w_i, i);
5060 set_dynamic(&rli->least_occupied_workers, (uchar*) &w_i->jobs.len, w_i->id);
5062 sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
5076 rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
5077 rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
5078 DBUG_PRINT(
"mts", (
"New checkpoint %llu %llu %s",
5079 rli->gaq->lwm.group_master_log_pos,
5080 rli->gaq->lwm.group_relay_log_pos,
5081 rli->gaq->lwm.group_relay_log_name));
5083 if (rli->gaq->lwm.group_relay_log_name[0] != 0)
5084 rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);
5113 if (DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0))
5133 Slave_worker *w= NULL;
5138 Rpl_info_factory::create_worker(opt_rli_repository_id, i, rli,
false)))
5140 sql_print_error(
"Failed during slave worker thread create");
5145 if (w->init_worker(rli, i))
5147 sql_print_error(
"Failed during slave worker thread create");
5151 set_dynamic(&rli->workers, (uchar*) &w, i);
5153 if (DBUG_EVALUATE_IF(
"mts_worker_thread_fails", i == 1, 0) ||
5156 handle_slave_worker, (
void*) w)))
5158 sql_print_error(
"Failed during slave worker thread create (errno= %d)",
5165 if (w->running_status == Slave_worker::NOT_RUNNING)
5169 insert_dynamic(&rli->least_occupied_workers, (uchar*) &w->jobs.len);
5179 if (rli->workers.elements == i + 1)
5180 delete_dynamic_element(&rli->workers, i);
5195 int slave_start_workers(
Relay_log_info *rli, ulong
n,
bool *mts_inited)
5202 if (n == 0 && rli->mts_recovery_group_cnt == 0)
5204 reset_dynamic(&rli->workers);
5215 rli->
init_workers(max(n, rli->recovery_parallel_workers));
5218 my_init_dynamic_array(&rli->curr_group_assigned_parts,
5219 sizeof(db_worker_hash_entry*),
5220 SLAVE_INIT_DBS_IN_GROUP, 1);
5221 rli->last_assigned_worker= NULL;
5222 my_init_dynamic_array(&rli->curr_group_da,
sizeof(
Log_event*), 8, 2);
5224 my_init_dynamic_array(&rli->least_occupied_workers,
sizeof(ulong), n, 0);
5233 rli->gaq=
new Slave_committed_queue(rli->get_group_master_log_name(),
5234 sizeof(Slave_job_group),
5235 rli->checkpoint_group, n);
5236 if (!rli->gaq->inited)
5240 rli->mts_slave_worker_queue_len_max= mts_slave_worker_queue_len_max;
5241 rli->mts_pending_jobs_size= 0;
5242 rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
5243 rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
5244 rli->mts_wq_excess_cnt= 0;
5245 rli->mts_wq_overrun_cnt= 0;
5246 rli->mts_wq_oversize= FALSE;
5247 rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap;
5248 rli->mts_worker_underrun_level= mts_worker_underrun_level;
5249 rli->curr_group_seen_begin= rli->curr_group_seen_gtid=
false;
5250 rli->curr_group_isolated= FALSE;
5251 rli->checkpoint_seqno= 0;
5252 rli->mts_last_online_stat= my_time(0);
5253 rli->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
5257 init_alloc_root(&rli->mts_coor_mem_root, NAME_LEN,
5258 (MAX_DBS_IN_EVENT_MTS / 2) * NAME_LEN);
5260 if (init_hash_workers(n))
5262 sql_print_error(
"Failed to init partitions hash");
5267 for (i= 0; i <
n; i++)
5269 if ((error= slave_start_single_worker(rli, i)))
5274 rli->slave_parallel_workers=
n;
5276 if (!error && rli->mts_recovery_group_cnt == 0)
5279 (void) Rpl_info_factory::reset_workers(rli);
5301 THD *thd= rli->info_thd;
5305 else if (rli->slave_parallel_workers == 0)
5316 if (rli->mts_group_status != Relay_log_info::MTS_KILLED_GROUP &&
5317 thd->killed == THD::NOT_KILLED)
5319 DBUG_ASSERT(rli->mts_group_status != Relay_log_info::MTS_IN_GROUP ||
5323 if (DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0))
5325 sql_print_error(
"This is not supposed to happen at this point...");
5330 (void) wait_for_workers_to_finish(rli);
5335 (void) mts_checkpoint_routine(rli, 0,
false,
true);
5337 for (i= rli->workers.elements - 1; i >= 0; i--)
5344 if (w->running_status != Slave_worker::RUNNING)
5350 w->running_status= Slave_worker::KILLED;
5355 if (log_warnings > 1)
5356 sql_print_information(
"Notifying Worker %lu to exit, thd %p", w->id,
5360 thd_proc_info(thd,
"Waiting for workers to exit");
5362 for (i= rli->workers.elements - 1; i >= 0; i--)
5364 Slave_worker *w= NULL;
5368 while (w->running_status != Slave_worker::NOT_RUNNING)
5371 DBUG_ASSERT(w->running_status == Slave_worker::KILLED ||
5372 w->running_status == Slave_worker::ERROR_LEAVING);
5374 thd->ENTER_COND(&w->jobs_cond, &w->jobs_lock,
5375 &stage_slave_waiting_workers_to_exit, &old_stage);
5377 thd->EXIT_COND(&old_stage);
5382 delete_dynamic_element(&rli->workers, i);
5386 if (log_warnings > 1)
5387 sql_print_information(
"Total MTS session statistics: "
5388 "events processed = %llu; "
5389 "worker queues filled over overrun level = %lu; "
5390 "waited due a Worker queue full = %lu; "
5391 "waited due the total size = %lu; "
5392 "slept when Workers occupied = %lu ",
5393 rli->mts_events_assigned, rli->mts_wq_overrun_cnt,
5394 rli->mts_wq_overfill_cnt, rli->wq_size_waits_cnt,
5395 rli->mts_wq_no_underrun_cnt);
5397 DBUG_ASSERT(rli->pending_jobs == 0);
5398 DBUG_ASSERT(rli->mts_pending_jobs_size == 0);
5401 rli->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
5402 destroy_hash_workers(rli);
5404 delete_dynamic(&rli->least_occupied_workers);
5407 for (uint i= 0; i < rli->curr_group_da.elements; i++)
5408 delete *(
Log_event**) dynamic_array_ptr(&rli->curr_group_da, i);
5409 delete_dynamic(&rli->curr_group_da);
5411 delete_dynamic(&rli->curr_group_assigned_parts);
5413 rli->slave_parallel_workers= 0;
5414 free_root(&rli->mts_coor_mem_root, MYF(0));
5427 pthread_handler_t handle_slave_sql(
void *arg)
5430 bool thd_added=
false;
5431 char llbuff[22],llbuff1[22];
5432 char saved_log_name[FN_REFLEN];
5433 char saved_master_log_name[FN_REFLEN];
5434 my_off_t saved_log_pos= 0;
5435 my_off_t saved_master_log_pos= 0;
5436 my_off_t saved_skip= 0;
5440 bool mts_inited=
false;
5444 DBUG_ENTER(
"handle_slave_sql");
5446 DBUG_ASSERT(rli->inited);
5448 DBUG_ASSERT(!rli->slave_running);
5451 rli->events_until_exit = abort_slave_event_count;
5455 thd->thread_stack = (
char*)&thd;
5459 rli->slave_run_id++;
5460 rli->slave_running = 1;
5461 rli->reported_unsafe_warning=
false;
5463 pthread_detach_this_thread();
5464 if (init_slave_thread(thd, SLAVE_THD_SQL))
5472 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
5473 "Failed during slave thread initialization");
5476 thd->init_for_queries(rli);
5477 thd->temporary_tables = rli->save_temporary_tables;
5478 set_thd_in_use_temporary_tables(rli);
5481 add_global_thread(thd);
5486 if (slave_start_workers(rli, rli->opt_slave_parallel_workers, &mts_inited) != 0)
5490 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
5491 "Failed during slave workers initialization");
5502 rli->abort_slave = 0;
5516 if (rli->update_is_transactional())
5520 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
5521 "Error checking if the relay log repository is transactional.");
5525 if (!rli->is_transactional())
5526 rli->
report(WARNING_LEVEL, 0,
5527 "If a crash happens this configuration does not guarantee that the relay "
5528 "log info will be consistent");
5533 DEBUG_SYNC(thd,
"after_start_slave");
5537 rli->ignore_log_space_limit= 0;
5539 rli->trans_retries= 0;
5540 DBUG_PRINT(
"info", (
"rli->trans_retries: %lu", rli->trans_retries));
5543 rli->get_group_relay_log_pos(),
5547 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
5548 "Error initializing relay log position: %s", errmsg);
5551 THD_CHECK_SENTRY(thd);
5554 char llbuf1[22], llbuf2[22];
5555 DBUG_PRINT(
"info", (
"my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
5556 llstr(my_b_tell(rli->cur_log),llbuf1),
5557 llstr(rli->get_event_relay_log_pos(),llbuf2)));
5558 DBUG_ASSERT(rli->get_event_relay_log_pos() >= BIN_LOG_HEADER_SIZE);
5571 #ifdef SHOULD_BE_CHECKED
5572 DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->get_event_relay_log_pos());
5576 DBUG_ASSERT(rli->info_thd == thd);
5578 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
5580 if (ndb_wait_setup_func && ndb_wait_setup_func(opt_ndb_wait_setup))
5582 sql_print_warning(
"Slave SQL thread : NDB : Tables not available after %lu"
5583 " seconds. Consider increasing --ndb-wait-setup value",
5584 opt_ndb_wait_setup);
5588 DBUG_PRINT(
"master_info",(
"log_file_name: %s position: %s",
5589 rli->get_group_master_log_name(),
5590 llstr(rli->get_group_master_log_pos(),llbuff)));
5592 sql_print_information(
"Slave SQL thread initialized, starting replication in \
5593 log '%s' at position %s, relay log '%s' position: %s", rli->get_rpl_log_name(),
5594 llstr(rli->get_group_master_log_pos(),llbuff),rli->get_group_relay_log_name(),
5595 llstr(rli->get_group_relay_log_pos(),llbuff1));
5597 if (check_temp_dir(rli->slave_patternload_file))
5599 rli->
report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
5600 "Unable to use slave's temporary directory %s - %s",
5601 slave_load_tmpdir, thd->get_stmt_da()->message());
5606 if (opt_init_slave.length)
5608 execute_init_command(thd, &opt_init_slave, &LOCK_sys_init_slave);
5609 if (thd->is_slave_error)
5611 rli->
report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
5612 "Slave SQL thread aborted. Can't execute init_slave query");
5622 if (rli->slave_skip_counter)
5624 strmake(saved_log_name, rli->get_group_relay_log_name(), FN_REFLEN - 1);
5625 strmake(saved_master_log_name, rli->get_group_master_log_name(), FN_REFLEN - 1);
5626 saved_log_pos= rli->get_group_relay_log_pos();
5627 saved_master_log_pos= rli->get_group_master_log_pos();
5628 saved_skip= rli->slave_skip_counter;
5630 if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
5640 while (!sql_slave_killed(thd,rli))
5642 THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log);
5643 DBUG_ASSERT(rli->info_thd == thd);
5644 THD_CHECK_SENTRY(thd);
5646 if (saved_skip && rli->slave_skip_counter == 0)
5648 sql_print_information(
"'SQL_SLAVE_SKIP_COUNTER=%ld' executed at "
5649 "relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', "
5650 "master_log_pos='%ld' and new position at "
5651 "relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', "
5652 "master_log_pos='%ld' ",
5653 (ulong) saved_skip, saved_log_name, (ulong) saved_log_pos,
5654 saved_master_log_name, (ulong) saved_master_log_pos,
5655 rli->get_group_relay_log_name(), (ulong) rli->get_group_relay_log_pos(),
5656 rli->get_group_master_log_name(), (ulong) rli->get_group_master_log_pos());
5660 if (exec_relay_log_event(thd,rli))
5662 DBUG_PRINT(
"info", (
"exec_relay_log_event() failed"));
5664 if (!sql_slave_killed(thd,rli))
5671 uint32
const last_errno= rli->last_error().
number;
5673 if (thd->is_error())
5675 char const *
const errmsg= thd->get_stmt_da()->message();
5678 (
"thd->get_stmt_da()->sql_errno()=%d; "
5679 "rli->last_error.number=%d",
5680 thd->get_stmt_da()->sql_errno(), last_errno));
5681 if (last_errno == 0)
5687 rli->
report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
5690 else if (last_errno != thd->get_stmt_da()->sql_errno())
5698 sql_print_error(
"Slave (additional info): %s Error_code: %d",
5699 errmsg, thd->get_stmt_da()->sql_errno());
5705 thd->get_stmt_da()->sql_conditions();
5711 bool udf_error =
false;
5719 sql_print_error(
"Error loading user-defined library, slave SQL "
5720 "thread aborted. Install the missing library, and restart the "
5721 "slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
5722 "position %s", rli->get_rpl_log_name(),
5723 llstr(rli->get_group_master_log_pos(), llbuff));
5726 Error running query, slave SQL thread aborted. Fix the problem, and restart \
5727 the slave SQL thread with \"SLAVE START\". We stopped at log \
5728 '%s' position %s", rli->get_rpl_log_name(),
5729 llstr(rli->get_group_master_log_pos(), llbuff));
5736 sql_print_information(
"Slave SQL thread exiting, replication stopped in log "
5737 "'%s' at position %s",
5738 rli->get_rpl_log_name(),
5739 llstr(rli->get_group_master_log_pos(), llbuff));
5743 slave_stop_workers(rli, &mts_inited);
5744 if (rli->recovery_groups_inited)
5746 bitmap_free(&rli->recovery_groups);
5747 rli->mts_recovery_group_cnt= 0;
5748 rli->recovery_groups_inited=
false;
5758 rli->cleanup_context(thd, 1);
5766 thd->reset_db(NULL, 0);
5768 THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
5772 DBUG_ASSERT(rli->slave_running == 1);
5774 rli->slave_running= 0;
5779 DBUG_PRINT(
"info",(
"Signaling possibly waiting master_pos_wait() functions"));
5781 rli->ignore_log_space_limit= 0;
5784 rli->save_temporary_tables = thd->temporary_tables;
5790 thd->temporary_tables = 0;
5791 DBUG_ASSERT(thd->net.buff != 0);
5793 DBUG_ASSERT(rli->info_thd == thd);
5794 THD_CHECK_SENTRY(thd);
5796 set_thd_in_use_temporary_tables(rli);
5798 thd->release_resources();
5800 THD_CHECK_SENTRY(thd);
5802 remove_global_thread(thd);
5811 DBUG_EXECUTE_IF(
"simulate_slave_delay_at_terminate_bug38694", sleep(5););
5816 ERR_remove_state(0);
5830 bool cev_not_written;
5831 THD *thd = mi->info_thd;
5832 NET *net = &mi->mysql->net;
5833 DBUG_ENTER(
"process_io_create_file");
5837 if (unlikely(!cev->is_valid()))
5840 if (!rpl_filter->db_ok(cev->db))
5842 skip_load_data_infile(net);
5845 DBUG_ASSERT(cev->inited_from_old);
5846 thd->file_id = cev->file_id = mi->file_id++;
5847 thd->server_id = cev->server_id;
5848 cev_not_written = 1;
5850 if (unlikely(net_request_file(net,cev->fname)))
5852 sql_print_error(
"Slave I/O: failed requesting download of '%s'",
5867 if (unlikely((num_bytes=
my_net_read(net)) == packet_error))
5869 sql_print_error(
"Network read error downloading '%s' from master",
5873 if (unlikely(!num_bytes))
5883 if (unlikely(cev_not_written))
5886 xev.log_pos = cev->log_pos;
5887 if (unlikely(mi->rli->relay_log.append_event(&xev, mi) != 0))
5889 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
5890 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
5891 "error writing Exec_load event to relay log");
5894 mi->rli->relay_log.harvest_bytes_written(&mi->rli->log_space_total);
5897 if (unlikely(cev_not_written))
5899 cev->block = net->read_pos;
5900 cev->block_len = num_bytes;
5901 if (unlikely(mi->rli->relay_log.append_event(cev, mi) != 0))
5903 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
5904 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
5905 "error writing Create_file event to relay log");
5909 mi->rli->relay_log.harvest_bytes_written(&mi->rli->log_space_total);
5913 aev.block = net->read_pos;
5914 aev.block_len = num_bytes;
5915 aev.log_pos = cev->log_pos;
5916 if (unlikely(mi->rli->relay_log.append_event(&aev, mi) != 0))
5918 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
5919 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
5920 "error writing Append_block event to relay log");
5923 mi->rli->relay_log.harvest_bytes_written(&mi->rli->log_space_total);
5951 DBUG_ENTER(
"process_io_rotate");
5954 if (unlikely(!rev->is_valid()))
5958 memcpy(const_cast<char *>(mi->get_master_log_name()),
5959 rev->new_log_ident, rev->ident_len + 1);
5960 mi->set_master_log_pos(rev->pos);
5961 DBUG_PRINT(
"info", (
"new (master_log_name, master_log_pos): ('%s', %lu)",
5962 mi->get_master_log_name(), (ulong) mi->get_master_log_pos()));
5968 if (disconnect_slave_event_count)
5969 mi->events_until_exit++;
5980 if (old_fdle->binlog_version >= 4)
5982 DBUG_ASSERT(old_fdle->checksum_alg ==
5983 mi->rli->relay_log.relay_log_checksum_alg);
5986 new_fdle->checksum_alg= mi->rli->relay_log.relay_log_checksum_alg;
5987 mi->set_mi_description_event(new_fdle);
5993 int ret= rotate_relay_log(mi);
6003 static int queue_binlog_ver_1_event(Master_info *mi,
const char *buf,
6006 const char *errmsg = 0;
6008 bool ignore_event= 0;
6011 DBUG_ENTER(
"queue_binlog_ver_1_event");
6019 if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
6021 if (unlikely(!(tmp_buf=(
char*)my_malloc(event_len+1,MYF(MY_WME)))))
6023 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
6024 ER(ER_SLAVE_FATAL_ERROR),
"Memory allocation failed");
6027 memcpy(tmp_buf,buf,event_len);
6035 tmp_buf[event_len++]=0;
6036 int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
6037 buf = (
const char*)tmp_buf;
6046 Log_event::read_log_event(buf, event_len, &errmsg,
6047 mi->get_mi_description_event(), 0);
6050 sql_print_error(
"Read invalid event from master: '%s',\
6051 master could be corrupt but a more likely cause of this is a bug",
6053 my_free((
char*) tmp_buf);
6057 mi->set_master_log_pos(ev->log_pos);
6058 switch (ev->get_type_code()) {
6071 case CREATE_FILE_EVENT:
6080 DBUG_ASSERT(tmp_buf != 0);
6082 ev->log_pos+= inc_pos;
6085 mi->set_master_log_pos(mi->get_master_log_pos() + inc_pos);
6086 DBUG_PRINT(
"info", (
"master_log_pos: %lu", (ulong) mi->get_master_log_pos()));
6087 my_free((
char*)tmp_buf);
6094 if (likely(!ignore_event))
6101 ev->log_pos+= event_len;
6102 if (unlikely(rli->relay_log.append_event(ev, mi) != 0))
6107 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
6110 mi->set_master_log_pos(mi->get_master_log_pos() + inc_pos);
6111 DBUG_PRINT(
"info", (
"master_log_pos: %lu", (ulong) mi->get_master_log_pos()));
6121 static int queue_binlog_ver_3_event(Master_info *mi,
const char *buf,
6124 const char *errmsg = 0;
6128 DBUG_ENTER(
"queue_binlog_ver_3_event");
6134 Log_event::read_log_event(buf, event_len, &errmsg,
6135 mi->get_mi_description_event(), 0);
6138 sql_print_error(
"Read invalid event from master: '%s',\
6139 master could be corrupt but a more likely cause of this is a bug",
6141 my_free((
char*) tmp_buf);
6144 switch (ev->get_type_code()) {
6160 if (unlikely(rli->relay_log.append_event(ev, mi) != 0))
6165 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
6167 mi->set_master_log_pos(mi->get_master_log_pos() + inc_pos);
6169 DBUG_PRINT(
"info", (
"master_log_pos: %lu", (ulong) mi->get_master_log_pos()));
6185 static int queue_old_event(Master_info *mi,
const char *buf,
6188 DBUG_ENTER(
"queue_old_event");
6192 switch (mi->get_mi_description_event()->binlog_version)
6195 DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));
6197 DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len));
6199 DBUG_PRINT(
"info",(
"unsupported binlog format %d in queue_old_event()",
6200 mi->get_mi_description_event()->binlog_version));
6215 static int queue_event(Master_info* mi,
const char* buf, ulong event_len)
6223 bool unlock_data_lock= TRUE;
6230 uint8 checksum_alg= mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ?
6231 mi->checksum_alg_before_fd :
6232 mi->rli->relay_log.relay_log_checksum_alg;
6234 char *save_buf= NULL;
6235 char rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN];
6236 Gtid gtid= { 0, 0 };
6239 DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
6240 checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
6241 checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
6243 DBUG_ENTER(
"queue_event");
6248 if (event_type == FORMAT_DESCRIPTION_EVENT)
6252 else if (event_type == START_EVENT_V3)
6255 mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF;
6257 mi->get_mi_description_event()->checksum_alg=
6258 mi->rli->relay_log.relay_log_checksum_alg= checksum_alg=
6259 BINLOG_CHECKSUM_ALG_OFF;
6268 DBUG_ASSERT(mi->rli->relay_log.relay_log_checksum_alg !=
6269 BINLOG_CHECKSUM_ALG_UNDEF);
6272 DBUG_EXECUTE_IF(
"corrupt_queue_event",
6273 if (event_type != FORMAT_DESCRIPTION_EVENT)
6275 char *debug_event_buf_c = (
char*) buf;
6277 debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
6278 DBUG_PRINT(
"info", (
"Corrupt the event at queue_event: byte on position %d", debug_cor_pos));
6285 error= ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE;
6286 unlock_data_lock= FALSE;
6292 if (mi->get_mi_description_event()->binlog_version < 4 &&
6293 event_type != FORMAT_DESCRIPTION_EVENT )
6295 int ret= queue_old_event(mi,buf,event_len);
6300 switch (event_type) {
6319 mi->get_mi_description_event());
6321 if (unlikely(process_io_rotate(mi, &rev)))
6323 error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
6339 if (uint4korr(&buf[0]) == 0 && checksum_alg == BINLOG_CHECKSUM_ALG_OFF &&
6340 mi->rli->relay_log.relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
6342 ha_checksum rot_crc= my_checksum(0L, NULL, 0);
6345 int4store(&rot_buf[EVENT_LEN_OFFSET],
6347 rot_crc= my_checksum(rot_crc, (
const uchar *) rot_buf,
6350 DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET]));
6351 DBUG_ASSERT(mi->get_mi_description_event()->checksum_alg ==
6352 mi->rli->relay_log.relay_log_checksum_alg);
6354 DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF);
6355 save_buf= (
char *) buf;
6363 if (uint4korr(&buf[0]) == 0 && checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
6364 mi->rli->relay_log.relay_log_checksum_alg == BINLOG_CHECKSUM_ALG_OFF)
6367 memcpy(rot_buf, buf, event_len);
6368 int4store(&rot_buf[EVENT_LEN_OFFSET],
6370 DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET]));
6371 DBUG_ASSERT(mi->get_mi_description_event()->checksum_alg ==
6372 mi->rli->relay_log.relay_log_checksum_alg);
6374 DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF);
6375 save_buf= (
char *) buf;
6385 case FORMAT_DESCRIPTION_EVENT:
6398 mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF;
6401 Log_event::read_log_event(buf, event_len, &errmsg,
6402 mi->get_mi_description_event(), 1);
6403 if (new_fdle == NULL)
6405 error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
6408 if (new_fdle->checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
6409 new_fdle->checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
6410 mi->set_mi_description_event(new_fdle);
6413 mi->rli->relay_log.relay_log_checksum_alg= new_fdle->checksum_alg;
6423 inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
6424 DBUG_PRINT(
"info",(
"binlog format is now %d",
6425 mi->get_mi_description_event()->binlog_version));
6430 case HEARTBEAT_LOG_EVENT:
6436 Heartbeat_log_event hb(buf,
6437 mi->rli->relay_log.relay_log_checksum_alg
6438 != BINLOG_CHECKSUM_ALG_OFF ?
6440 mi->get_mi_description_event());
6443 error= ER_SLAVE_HEARTBEAT_FAILURE;
6444 error_msg.append(STRING_WITH_LEN(
"inconsistent heartbeat event content;"));
6445 error_msg.append(STRING_WITH_LEN(
"the event's data: log_file_name "));
6446 error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
6447 error_msg.append(STRING_WITH_LEN(
" log_pos "));
6448 llstr(hb.log_pos, llbuf);
6449 error_msg.append(llbuf, strlen(llbuf));
6452 mi->received_heartbeats++;
6453 mi->last_heartbeat= my_time(0);
6470 if (mi->is_auto_position() && mi->get_master_log_pos() < hb.log_pos
6471 && mi->get_master_log_name() != NULL)
6474 DBUG_ASSERT(memcmp(const_cast<char*>(mi->get_master_log_name()),
6475 hb.get_log_ident(), hb.get_ident_len()) == 0);
6477 mi->set_master_log_pos(hb.log_pos);
6483 memcpy(rli->ign_master_log_name_end, mi->get_master_log_name(),
6485 rli->ign_master_log_pos_end = mi->get_master_log_pos();
6487 if (write_ignored_events_info_to_relay_log(mi->info_thd, mi))
6503 if ((memcmp(const_cast<char *>(mi->get_master_log_name()),
6504 hb.get_log_ident(), hb.get_ident_len())
6505 && mi->get_master_log_name() != NULL)
6506 || ((mi->get_master_log_pos() != hb.log_pos && gtid_mode == 0) ||
6512 (mi->get_master_log_pos() > hb.log_pos)))
6515 error= ER_SLAVE_HEARTBEAT_FAILURE;
6516 error_msg.append(STRING_WITH_LEN(
"heartbeat is not compatible with local info;"));
6517 error_msg.append(STRING_WITH_LEN(
"the event's data: log_file_name "));
6518 error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
6519 error_msg.append(STRING_WITH_LEN(
" log_pos "));
6520 llstr(hb.log_pos, llbuf);
6521 error_msg.append(llbuf, strlen(llbuf));
6524 goto skip_relay_logging;
6528 case PREVIOUS_GTIDS_LOG_EVENT:
6532 error= ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF;
6544 mi->set_master_log_pos(mi->get_master_log_pos() + event_len);
6545 memcpy(rli->ign_master_log_name_end, mi->get_master_log_name(), FN_REFLEN);
6546 rli->ign_master_log_pos_end= mi->get_master_log_pos();
6548 if (write_ignored_events_info_to_relay_log(mi->info_thd, mi))
6551 goto skip_relay_logging;
6555 case GTID_LOG_EVENT:
6559 error= ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF;
6562 global_sid_lock->
rdlock();
6563 Gtid_log_event gtid_ev(buf, checksum_alg != BINLOG_CHECKSUM_ALG_OFF ?
6565 mi->get_mi_description_event());
6566 gtid.
sidno= gtid_ev.get_sidno(
false);
6567 global_sid_lock->
unlock();
6570 gtid.
gno= gtid_ev.get_gno();
6575 case ANONYMOUS_GTID_LOG_EVENT:
6596 s_id= uint4korr(buf + SERVER_ID_OFFSET);
6603 s_id&= opt_server_id_mask;
6605 if ((s_id == ::server_id && !mi->rli->replicate_same_server_id) ||
6611 (mi->ignore_server_ids->dynamic_ids.elements > 0 &&
6612 mi->shall_ignore_server_id(s_id) &&
6614 (s_id != mi->master_id ||
6616 (event_type != FORMAT_DESCRIPTION_EVENT &&
6617 event_type != ROTATE_EVENT))))
6636 if (!(s_id == ::server_id && !mi->rli->replicate_same_server_id) ||
6637 (event_type != FORMAT_DESCRIPTION_EVENT &&
6638 event_type != ROTATE_EVENT &&
6639 event_type != STOP_EVENT))
6641 mi->set_master_log_pos(mi->get_master_log_pos() + inc_pos);
6642 memcpy(rli->ign_master_log_name_end, mi->get_master_log_name(), FN_REFLEN);
6643 DBUG_ASSERT(rli->ign_master_log_name_end[0]);
6644 rli->ign_master_log_pos_end= mi->get_master_log_pos();
6646 rli->relay_log.signal_update();
6647 DBUG_PRINT(
"info", (
"master_log_pos: %lu, event originating from %u server, ignored",
6648 (ulong) mi->get_master_log_pos(), uint4korr(buf + SERVER_ID_OFFSET)));
6653 if (likely(rli->relay_log.append_buffer(buf, event_len, mi) == 0))
6655 mi->set_master_log_pos(mi->get_master_log_pos() + inc_pos);
6656 DBUG_PRINT(
"info", (
"master_log_pos: %lu", (ulong) mi->get_master_log_pos()));
6657 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
6659 if (event_type == GTID_LOG_EVENT)
6661 global_sid_lock->
rdlock();
6662 int ret= rli->add_logged_gtid(gtid.
sidno, gtid.
gno);
6663 global_sid_lock->
unlock();
6670 error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
6672 rli->ign_master_log_name_end[0]= 0;
6673 if (save_buf != NULL)
6681 if (unlock_data_lock)
6683 DBUG_PRINT(
"info", (
"error: %d", error));
6685 mi->report(ERROR_LEVEL, error, ER(error),
6686 (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
6687 "could not queue event from master" :
6706 extern "C" void slave_io_thread_detach_vio()
6708 #ifdef SIGNAL_WITH_VIO_SHUTDOWN
6709 THD *thd= current_thd;
6710 if (thd && thd->slave_thread)
6711 thd->clear_active_vio();
6730 static int safe_connect(THD* thd,
MYSQL* mysql, Master_info* mi)
6732 DBUG_ENTER(
"safe_connect");
6734 DBUG_RETURN(connect_to_master(thd, mysql, mi, 0, 0));
6747 static int connect_to_master(THD* thd,
MYSQL* mysql, Master_info* mi,
6748 bool reconnect,
bool suppress_warnings)
6750 int slave_was_killed= 0;
6754 char password[MAX_PASSWORD_LENGTH + 1];
6755 int password_size=
sizeof(password);
6756 DBUG_ENTER(
"connect_to_master");
6757 set_slave_max_allowed_packet(thd, mysql);
6759 mi->events_until_exit = disconnect_slave_event_count;
6761 ulong client_flag= CLIENT_REMEMBER_OPTIONS;
6762 if (opt_slave_compressed_protocol)
6763 client_flag=CLIENT_COMPRESS;
6765 mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (
char *) &slave_net_timeout);
6766 mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (
char *) &slave_net_timeout);
6768 if (mi->bind_addr[0])
6770 DBUG_PRINT(
"info",(
"bind_addr: %s", mi->bind_addr));
6771 mysql_options(mysql, MYSQL_OPT_BIND, mi->bind_addr);
6777 mysql_ssl_set(mysql,
6778 mi->ssl_key[0]?mi->ssl_key:0,
6779 mi->ssl_cert[0]?mi->ssl_cert:0,
6780 mi->ssl_ca[0]?mi->ssl_ca:0,
6781 mi->ssl_capath[0]?mi->ssl_capath:0,
6782 mi->ssl_cipher[0]?mi->ssl_cipher:0);
6783 mysql_options(mysql, MYSQL_OPT_SSL_CRL,
6784 mi->ssl_crl[0] ? mi->ssl_crl : 0);
6785 mysql_options(mysql, MYSQL_OPT_SSL_CRLPATH,
6786 mi->ssl_crlpath[0] ? mi->ssl_crlpath : 0);
6787 mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
6788 &mi->ssl_verify_server_cert);
6792 mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
6794 mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (
char *) charsets_dir);
6796 if (mi->is_start_plugin_auth_configured())
6798 DBUG_PRINT(
"info", (
"Slaving is using MYSQL_DEFAULT_AUTH %s",
6799 mi->get_start_plugin_auth()));
6800 mysql_options(mysql, MYSQL_DEFAULT_AUTH, mi->get_start_plugin_auth());
6803 if (mi->is_start_plugin_dir_configured())
6805 DBUG_PRINT(
"info", (
"Slaving is using MYSQL_PLUGIN_DIR %s",
6806 mi->get_start_plugin_dir()));
6807 mysql_options(mysql, MYSQL_PLUGIN_DIR, mi->get_start_plugin_dir());
6810 else if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr)
6811 mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr);
6813 if (!mi->is_start_user_configured())
6814 sql_print_warning(
"%s", ER(ER_INSECURE_CHANGE_MASTER));
6816 if (mi->get_password(password, &password_size))
6818 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
6819 ER(ER_SLAVE_FATAL_ERROR),
6820 "Unable to configure password when attempting to "
6821 "connect to the master server. Connection attempt "
6826 const char* user= mi->get_user();
6827 if (user == NULL || user[0] == 0)
6829 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
6830 ER(ER_SLAVE_FATAL_ERROR),
6831 "Invalid (empty) username when attempting to "
6832 "connect to the master server. Connection attempt "
6837 while (!(slave_was_killed = io_slave_killed(thd,mi))
6838 && (reconnect ? mysql_reconnect(mysql) != 0 :
6839 mysql_real_connect(mysql, mi->host, user,
6840 password, 0, mi->port, 0, client_flag) == 0))
6847 last_errno=mysql_errno(mysql);
6848 suppress_warnings= 0;
6849 mi->report(ERROR_LEVEL, last_errno,
6850 "error %s to master '%s@%s:%d'"
6851 " - retry-time: %d retries: %lu",
6852 (reconnect ?
"reconnecting" :
"connecting"),
6853 mi->get_user(), mi->host, mi->port,
6854 mi->connect_retry, err_count + 1);
6861 if (++err_count == mi->retry_count)
6866 slave_sleep(thd, mi->connect_retry, io_slave_killed, mi);
6869 if (!slave_was_killed)
6874 if (!suppress_warnings && log_warnings)
6875 sql_print_information(
"Slave: connected to master '%s@%s:%d',\
6876 replication resumed in log '%s' at position %s", mi->get_user(),
6878 mi->get_io_rpl_log_name(),
6879 llstr(mi->get_master_log_pos(),llbuff));
6883 general_log_print(thd, COM_CONNECT_OUT,
"%s@%s:%d",
6884 mi->get_user(), mi->host, mi->port);
6886 #ifdef SIGNAL_WITH_VIO_SHUTDOWN
6887 thd->set_active_vio(mysql->net.vio);
6890 mysql->reconnect= 1;
6891 DBUG_PRINT(
"exit",(
"slave_was_killed: %d", slave_was_killed));
6892 DBUG_RETURN(slave_was_killed);
6904 static int safe_reconnect(THD* thd,
MYSQL* mysql, Master_info* mi,
6905 bool suppress_warnings)
6907 DBUG_ENTER(
"safe_reconnect");
6908 DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
6914 THD *thd= current_thd;
6915 char password[MAX_PASSWORD_LENGTH + 1];
6916 int password_size=
sizeof(password);
6917 Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
6920 sql_print_error(
"'rpl_connect_master' must be called in slave I/O thread context.");
6924 bool allocated=
false;
6928 if(!(mysql= mysql_init(NULL)))
6930 sql_print_error(
"rpl_connect_master: failed in mysql_init()");
6943 mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (
char *) &slave_net_timeout);
6944 mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (
char *) &slave_net_timeout);
6946 if (mi->bind_addr[0])
6948 DBUG_PRINT(
"info",(
"bind_addr: %s", mi->bind_addr));
6949 mysql_options(mysql, MYSQL_OPT_BIND, mi->bind_addr);
6955 mysql_ssl_set(mysql,
6956 mi->ssl_key[0]?mi->ssl_key:0,
6957 mi->ssl_cert[0]?mi->ssl_cert:0,
6958 mi->ssl_ca[0]?mi->ssl_ca:0,
6959 mi->ssl_capath[0]?mi->ssl_capath:0,
6960 mi->ssl_cipher[0]?mi->ssl_cipher:0);
6961 mysql_options(mysql, MYSQL_OPT_SSL_CRL,
6962 mi->ssl_crl[0] ? mi->ssl_crl : 0);
6963 mysql_options(mysql, MYSQL_OPT_SSL_CRLPATH,
6964 mi->ssl_crlpath[0] ? mi->ssl_crlpath : 0);
6965 mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
6966 &mi->ssl_verify_server_cert);
6970 mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
6972 mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (
char *) charsets_dir);
6974 if (mi->is_start_plugin_auth_configured())
6976 DBUG_PRINT(
"info", (
"Slaving is using MYSQL_DEFAULT_AUTH %s",
6977 mi->get_start_plugin_auth()));
6978 mysql_options(mysql, MYSQL_DEFAULT_AUTH, mi->get_start_plugin_auth());
6981 if (mi->is_start_plugin_dir_configured())
6983 DBUG_PRINT(
"info", (
"Slaving is using MYSQL_PLUGIN_DIR %s",
6984 mi->get_start_plugin_dir()));
6985 mysql_options(mysql, MYSQL_PLUGIN_DIR, mi->get_start_plugin_dir());
6988 else if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr)
6989 mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr);
6991 if (!mi->is_start_user_configured())
6992 sql_print_warning(
"%s", ER(ER_INSECURE_CHANGE_MASTER));
6994 const char *user= mi->get_user();
6997 || mi->get_password(password, &password_size)
6998 || io_slave_killed(thd, mi)
6999 || !mysql_real_connect(mysql, mi->host, user,
7000 password, 0, mi->port, 0, 0))
7002 if (!io_slave_killed(thd, mi))
7003 sql_print_error(
"rpl_connect_master: error connecting to master: %s (server_error: %d)",
7004 mysql_error(mysql), mysql_errno(mysql));
7019 DBUG_ENTER(
"reopen_relay_log");
7020 DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
7021 DBUG_ASSERT(rli->cur_log_fd == -1);
7023 IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
7024 if ((rli->cur_log_fd=
open_binlog_file(cur_log,rli->get_event_relay_log_name(),
7032 rli->set_event_relay_log_pos(max<ulonglong>(rli->get_event_relay_log_pos(),
7033 BIN_LOG_HEADER_SIZE));
7034 my_b_seek(cur_log,rli->get_event_relay_log_pos());
7035 DBUG_RETURN(cur_log);
7054 const char* errmsg=0;
7055 THD* thd = rli->info_thd;
7056 DBUG_ENTER(
"next_event");
7058 DBUG_ASSERT(thd != 0);
7061 if (abort_slave_event_count && !rli->events_until_exit--)
7074 while (!sql_slave_killed(thd,rli))
7088 if ((hot_log = (cur_log != &rli->cache_buf)))
7090 DBUG_ASSERT(rli->cur_log_fd == -1);
7097 if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
7100 cur_log=reopen_relay_log(rli, &errmsg);
7112 if (!my_b_inited(cur_log))
7116 DBUG_PRINT(
"info", (
"assertion skip %lu file pos %lu event relay log pos %lu file %s\n",
7117 (ulong) rli->slave_skip_counter, (ulong) my_b_tell(cur_log),
7118 (ulong) rli->get_event_relay_log_pos(),
7119 rli->get_event_relay_log_name()));
7122 char llbuf1[22], llbuf2[22];
7123 DBUG_PRINT(
"info", (
"my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
7124 llstr(my_b_tell(cur_log),llbuf1),
7125 llstr(rli->get_event_relay_log_pos(),llbuf2)));
7127 DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
7128 DBUG_ASSERT(my_b_tell(cur_log) == rli->get_event_relay_log_pos() || rli->
is_parallel_exec());
7130 DBUG_PRINT(
"info", (
"next_event group master %s %lu group relay %s %lu event %s %lu\n",
7131 rli->get_group_master_log_name(),
7132 (ulong) rli->get_group_master_log_pos(),
7133 rli->get_group_relay_log_name(),
7134 (ulong) rli->get_group_relay_log_pos(),
7135 rli->get_event_relay_log_name(),
7136 (ulong) rli->get_event_relay_log_pos()));
7151 if ((ev= Log_event::read_log_event(cur_log, 0,
7153 opt_slave_sql_verify_checksum)))
7155 DBUG_ASSERT(thd==rli->info_thd);
7160 rli->set_future_event_relay_log_pos(my_b_tell(cur_log));
7169 bool force= (rli->checkpoint_seqno > (rli->checkpoint_group - 1));
7172 ulonglong period=
static_cast<ulonglong
>(opt_mts_checkpoint_period * 1000000ULL);
7178 (void) mts_checkpoint_routine(rli, period, force,
true);
7179 DBUG_ASSERT(!force ||
7180 (force && (rli->checkpoint_seqno <= (rli->checkpoint_group - 1))) ||
7181 sql_slave_killed(thd, rli));
7186 DBUG_ASSERT(thd==rli->info_thd);
7187 if (opt_reckless_slave)
7189 if (cur_log->error < 0)
7191 errmsg =
"slave SQL thread aborted because of I/O error";
7192 if (rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)
7197 rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
7202 if (!cur_log->error)
7247 rli->last_master_timestamp= 0;
7249 DBUG_ASSERT(rli->relay_log.get_open_count() ==
7250 rli->cur_log_old_open_count);
7252 if (rli->ign_master_log_name_end[0])
7255 DBUG_PRINT(
"info",(
"seeing an ignored end segment"));
7257 0, rli->ign_master_log_pos_end,
7258 Rotate_log_event::DUP_NAME);
7259 rli->ign_master_log_name_end[0]= 0;
7263 errmsg=
"Slave SQL thread failed to create a Rotate event "
7264 "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
7320 if (rli->log_space_limit &&
7321 rli->log_space_limit < rli->log_space_total)
7330 rli->sql_force_rotate_relay=
7331 (rli->mts_group_status != Relay_log_info::MTS_IN_GROUP);
7334 rli->ignore_log_space_limit=
true;
7348 DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0)))
7352 ulonglong period=
static_cast<ulonglong
>(opt_mts_checkpoint_period * 1000000ULL);
7353 ulong signal_cnt= rli->relay_log.signal_cnt;
7363 (void) mts_checkpoint_routine(rli, period,
false,
true);
7366 if (rli->gaq->empty())
7367 rli->last_master_timestamp= 0;
7369 if (DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0))
7370 period= 10000000ULL;
7372 set_timespec_nsec(waittime, period);
7374 }
while ((ret == ETIMEDOUT || ret == ETIME) &&
7375 signal_cnt == rli->relay_log.signal_cnt && !thd->killed);
7391 end_io_cache(cur_log);
7392 DBUG_ASSERT(rli->cur_log_fd >= 0);
7394 rli->cur_log_fd = -1;
7396 if (relay_log_purge)
7409 if (rli->relay_log.purge_first_log
7411 rli->get_group_relay_log_pos() == rli->get_event_relay_log_pos()
7412 && !strcmp(rli->get_group_relay_log_name(),rli->get_event_relay_log_name())))
7414 errmsg =
"Error purging processed logs";
7417 DBUG_PRINT(
"info", (
"next_event group master %s %lu group relay %s %lu event %s %lu\n",
7418 rli->get_group_master_log_name(),
7419 (ulong) rli->get_group_master_log_pos(),
7420 rli->get_group_relay_log_name(),
7421 (ulong) rli->get_group_relay_log_pos(),
7422 rli->get_event_relay_log_name(),
7423 (ulong) rli->get_event_relay_log_pos()));
7436 errmsg =
"error switching to the next log";
7439 rli->set_event_relay_log_pos(BIN_LOG_HEADER_SIZE);
7440 rli->set_event_relay_log_name(rli->linfo.log_file_name);
7451 DBUG_PRINT(
"info", (
"next_event: MTS group relay log changes to %s %lu\n",
7452 rli->get_group_relay_log_name(),
7453 (ulong) rli->get_group_relay_log_pos()));
7467 DBUG_PRINT(
"info",(
"hot_log: %d",hot_log));
7470 if (rli->relay_log.
is_active(rli->linfo.log_file_name))
7474 sql_print_information(
"next log '%s' is currently active",
7475 rli->linfo.log_file_name);
7477 rli->cur_log= cur_log= rli->relay_log.get_log_file();
7478 rli->cur_log_old_open_count= rli->relay_log.get_open_count();
7479 DBUG_ASSERT(rli->cur_log_fd == -1);
7541 my_b_seek(cur_log, (my_off_t) 0);
7542 if (check_binlog_magic(cur_log,&errmsg))
7561 sql_print_information(
"next log '%s' is not active",
7562 rli->linfo.log_file_name);
7577 sql_print_error(
"Slave SQL thread: I/O error reading \
7578 event(errno: %d cur_log->error: %d)",
7579 my_errno,cur_log->error);
7581 my_b_seek(cur_log,rli->get_event_relay_log_pos());
7583 errmsg =
"Aborting slave SQL thread because of partial event read";
7587 if (!errmsg && log_warnings)
7589 sql_print_information(
"Error reading relay log event: %s",
7590 "slave SQL thread was killed");
7596 sql_print_error(
"Error reading relay log event: %s", errmsg);
7608 int rotate_relay_log(Master_info* mi)
7610 DBUG_ENTER(
"rotate_relay_log");
7613 DBUG_EXECUTE_IF(
"crash_before_rotate_relaylog", DBUG_SUICIDE(););
7624 DBUG_PRINT(
"info", (
"rli->inited == 0"));
7629 error= rli->relay_log.new_file(mi->get_mi_description_event());
7646 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
7667 bool rpl_master_has_bug(
const Relay_log_info *rli, uint bug_id,
bool report,
7668 bool (*pred)(
const void *),
const void *param)
7670 struct st_version_range_for_one_bug {
7672 const uchar introduced_in[3];
7673 const uchar fixed_in[3];
7675 static struct st_version_range_for_one_bug versions_for_all_bugs[]=
7677 {24432, { 5, 0, 24 }, { 5, 0, 38 } },
7678 {24432, { 5, 1, 12 }, { 5, 1, 17 } },
7679 {33029, { 5, 0, 0 }, { 5, 0, 58 } },
7680 {33029, { 5, 1, 0 }, { 5, 1, 12 } },
7681 {37426, { 5, 1, 0 }, { 5, 1, 26 } },
7683 const uchar *master_ver=
7689 i <
sizeof(versions_for_all_bugs)/
sizeof(*versions_for_all_bugs);i++)
7691 const uchar *introduced_in= versions_for_all_bugs[
i].introduced_in,
7692 *fixed_in= versions_for_all_bugs[
i].fixed_in;
7693 if ((versions_for_all_bugs[i].bug_id == bug_id) &&
7694 (memcmp(introduced_in, master_ver, 3) <= 0) &&
7695 (memcmp(fixed_in, master_ver, 3) > 0) &&
7696 (pred == NULL || (*pred)(param)))
7701 my_printf_error(ER_UNKNOWN_ERROR,
"master may suffer from"
7702 " http://bugs.mysql.com/bug.php?id=%u"
7703 " so slave stops; check error log on slave"
7704 " for more info", MYF(0), bug_id);
7706 rli->
report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
7707 "According to the master's version ('%s'),"
7708 " it is probable that master suffers from this bug:"
7709 " http://bugs.mysql.com/bug.php?id=%u"
7710 " and thus replicating the current binary log event"
7711 " may make the slave's data become different from the"
7713 " To take no risk, slave refuses to replicate"
7714 " this event and stops."
7715 " We recommend that all updates be stopped on the"
7716 " master and slave, that the data of both be"
7717 " manually synchronized,"
7718 " that master's binary logs be deleted,"
7719 " that master be upgraded to a version at least"
7720 " equal to '%d.%d.%d'. Then replication can be"
7724 fixed_in[0], fixed_in[1], fixed_in[2]);
7741 bool rpl_master_erroneous_autoinc(THD *thd)
7743 if (active_mi != NULL && active_mi->rli->info_thd == thd)
7746 DBUG_EXECUTE_IF(
"simulate_bug33029",
return TRUE;);
7747 return rpl_master_has_bug(rli, 33029, FALSE, NULL, NULL);
7758 uint sql_slave_skip_counter;
7773 int start_slave(THD* thd , Master_info* mi,
bool net_report)
7777 DBUG_ENTER(
"start_slave");
7779 if (
check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
7782 if (thd->lex->slave_connection.user ||
7783 thd->lex->slave_connection.password)
7785 #if defined(HAVE_OPENSSL) && !defined(EMBEDDED_LIBRARY)
7786 if (thd->vio_ok() && !thd->net.vio->ssl_arg)
7787 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
7788 ER_INSECURE_PLAIN_TEXT,
7789 ER(ER_INSECURE_PLAIN_TEXT));
7791 #if !defined(HAVE_OPENSSL) && !defined(EMBEDDED_LIBRARY)
7792 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
7793 ER_INSECURE_PLAIN_TEXT,
7794 ER(ER_INSECURE_PLAIN_TEXT));
7798 lock_slave_threads(mi);
7800 init_thread_mask(&thread_mask,mi,1 );
7807 if (thd->lex->slave_thd_opt)
7808 thread_mask&= thd->lex->slave_thd_opt;
7811 if (global_init_info(mi,
false, thread_mask))
7812 slave_errno=ER_MASTER_INFO;
7813 else if (server_id_supplied && *mi->host)
7819 if (thread_mask & SLAVE_IO)
7821 if (thd->lex->slave_connection.user)
7823 mi->set_start_user_configured(
true);
7824 mi->set_user(thd->lex->slave_connection.user);
7826 if (thd->lex->slave_connection.password)
7828 mi->set_start_user_configured(
true);
7829 mi->set_password(thd->lex->slave_connection.password,
7830 strlen(thd->lex->slave_connection.password));
7832 if (thd->lex->slave_connection.plugin_auth)
7833 mi->set_plugin_auth(thd->lex->slave_connection.plugin_auth);
7834 if (thd->lex->slave_connection.plugin_dir)
7835 mi->set_plugin_dir(thd->lex->slave_connection.plugin_dir);
7843 if (thread_mask & SLAVE_SQL)
7850 mi->rli->opt_slave_parallel_workers= opt_mts_slave_parallel_workers;
7852 if (!DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0))
7854 mi->rli->checkpoint_group= opt_mts_checkpoint_group;
7858 if (thd->lex->mi.pos)
7860 if (thd->lex->mi.relay_log_pos)
7861 slave_errno= ER_BAD_SLAVE_UNTIL_COND;
7862 mi->rli->until_condition= Relay_log_info::UNTIL_MASTER_POS;
7863 mi->rli->until_log_pos= thd->lex->mi.pos;
7868 strmake(mi->rli->until_log_name, thd->lex->mi.log_file_name,
7869 sizeof(mi->rli->until_log_name)-1);
7871 else if (thd->lex->mi.relay_log_pos)
7873 if (thd->lex->mi.pos)
7874 slave_errno= ER_BAD_SLAVE_UNTIL_COND;
7875 mi->rli->until_condition= Relay_log_info::UNTIL_RELAY_POS;
7876 mi->rli->until_log_pos= thd->lex->mi.relay_log_pos;
7877 strmake(mi->rli->until_log_name, thd->lex->mi.relay_log_name,
7878 sizeof(mi->rli->until_log_name)-1);
7880 else if (thd->lex->mi.gtid)
7882 global_sid_lock->
wrlock();
7883 mi->rli->clear_until_condition();
7884 if (mi->rli->until_sql_gtids.add_gtid_text(thd->lex->mi.gtid)
7885 != RETURN_STATUS_OK)
7886 slave_errno= ER_BAD_SLAVE_UNTIL_COND;
7888 mi->rli->until_condition=
7889 LEX_MASTER_INFO::UNTIL_SQL_BEFORE_GTIDS == thd->lex->mi.gtid_until_condition
7890 ? Relay_log_info::UNTIL_SQL_BEFORE_GTIDS
7891 : Relay_log_info::UNTIL_SQL_AFTER_GTIDS;
7892 if ((mi->rli->until_condition ==
7893 Relay_log_info::UNTIL_SQL_AFTER_GTIDS) &&
7894 mi->rli->opt_slave_parallel_workers != 0)
7896 mi->rli->opt_slave_parallel_workers= 0;
7897 push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
7898 ER_MTS_FEATURE_IS_NOT_SUPPORTED,
7899 ER(ER_MTS_FEATURE_IS_NOT_SUPPORTED),
7901 "Slave is started in the sequential execution mode.");
7904 global_sid_lock->
unlock();
7906 else if (thd->lex->mi.until_after_gaps)
7908 mi->rli->until_condition= Relay_log_info::UNTIL_SQL_AFTER_MTS_GAPS;
7909 mi->rli->opt_slave_parallel_workers=
7910 mi->rli->recovery_parallel_workers;
7913 mi->rli->clear_until_condition();
7915 if (mi->rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
7916 mi->rli->until_condition == Relay_log_info::UNTIL_RELAY_POS)
7919 const char *p= fn_ext(mi->rli->until_log_name);
7924 mi->rli->until_log_name_extension= strtoul(++p,&p_end, 10);
7930 if (p_end==p || *p_end)
7931 slave_errno=ER_BAD_SLAVE_UNTIL_COND;
7934 slave_errno=ER_BAD_SLAVE_UNTIL_COND;
7937 mi->rli->until_log_names_cmp_result=
7938 Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
7941 if (!opt_skip_slave_start)
7942 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
7943 ER_MISSING_SKIP_SLAVE,
7944 ER(ER_MISSING_SKIP_SLAVE));
7945 if (mi->rli->opt_slave_parallel_workers != 0)
7947 mi->rli->opt_slave_parallel_workers= 0;
7948 push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
7949 ER_MTS_FEATURE_IS_NOT_SUPPORTED,
7950 ER(ER_MTS_FEATURE_IS_NOT_SUPPORTED),
7952 "Slave is started in the sequential execution mode.");
7959 if (mi->rli->opt_slave_parallel_workers != 0 && slave_trans_retries != 0)
7961 push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
7962 ER_MTS_FEATURE_IS_NOT_SUPPORTED,
7963 ER(ER_MTS_FEATURE_IS_NOT_SUPPORTED),
7964 "slave_transaction_retries",
7965 "In the event of a transient failure, the slave will "
7966 "not retry the transaction and will stop.");
7969 else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos || thd->lex->mi.gtid)
7970 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
7971 ER(ER_UNTIL_COND_IGNORED));
7974 slave_errno = start_slave_threads(
false,
7980 slave_errno = ER_BAD_SLAVE;
7985 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
7986 ER(ER_SLAVE_WAS_RUNNING));
7994 (thread_mask & SLAVE_IO) == SLAVE_IO)
7995 mi->reset_start_info();
7997 unlock_slave_threads(mi);
8002 my_message(slave_errno, ER(slave_errno), MYF(0));
8005 else if (net_report)
8025 int stop_slave(THD* thd, Master_info* mi,
bool net_report )
8027 DBUG_ENTER(
"stop_slave");
8033 if (
check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
8035 THD_STAGE_INFO(thd, stage_killing_slave);
8037 lock_slave_threads(mi);
8039 init_thread_mask(&thread_mask,mi,0 );
8046 if (thd->lex->slave_thd_opt)
8047 thread_mask &= thd->lex->slave_thd_opt;
8051 slave_errno= terminate_slave_threads(mi,thread_mask,
8058 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
8059 ER(ER_SLAVE_WAS_NOT_RUNNING));
8061 unlock_slave_threads(mi);
8065 if ((slave_errno == ER_STOP_SLAVE_SQL_THREAD_TIMEOUT) ||
8066 (slave_errno == ER_STOP_SLAVE_IO_THREAD_TIMEOUT))
8068 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, slave_errno,
8070 sql_print_warning(
"%s",ER(slave_errno));
8073 my_message(slave_errno, ER(slave_errno), MYF(0));
8076 else if (net_report)
8094 int reset_slave(THD *thd, Master_info* mi)
8096 int thread_mask= 0, error= 0;
8097 uint sql_errno=ER_UNKNOWN_ERROR;
8098 const char* errmsg=
"Unknown error occured while reseting slave";
8099 DBUG_ENTER(
"reset_slave");
8101 lock_slave_threads(mi);
8102 init_thread_mask(&thread_mask,mi,0 );
8105 sql_errno= ER_SLAVE_MUST_STOP;
8110 ha_reset_slave(thd);
8113 if ((error= mi->rli->purge_relay_logs(thd,
8117 sql_errno= ER_RELAY_LOG_FAIL;
8122 DBUG_ASSERT(!mi->rli || !mi->rli->slave_running);
8123 mi->clear_in_memory_info(thd->lex->reset_slave_info.all);
8125 if (remove_info(mi))
8131 (void) RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
8133 unlock_slave_threads(mi);
8135 my_error(sql_errno, MYF(0), errmsg);
8152 bool change_master(THD* thd, Master_info* mi)
8155 const char* errmsg= 0;
8156 bool need_relay_log_purge= 1;
8157 char *var_master_log_name= NULL, *var_group_master_log_name= NULL;
8159 char saved_host[HOSTNAME_LENGTH + 1], saved_bind_addr[HOSTNAME_LENGTH + 1];
8161 char saved_log_name[FN_REFLEN];
8162 my_off_t saved_log_pos= 0;
8163 my_bool save_relay_log_purge= relay_log_purge;
8164 bool mts_remove_workers=
false;
8166 DBUG_ENTER(
"change_master");
8168 lock_slave_threads(mi);
8169 init_thread_mask(&thread_mask,mi,0 );
8170 LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
8173 my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
8177 thread_mask= SLAVE_IO | SLAVE_SQL;
8179 THD_STAGE_INFO(thd, stage_changing_master);
8187 if(lex_mi->host && !*lex_mi->host)
8189 my_error(ER_WRONG_ARGUMENTS, MYF(0),
"MASTER_HOST");
8190 unlock_slave_threads(mi);
8193 if (global_init_info(mi,
false, thread_mask))
8195 my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
8199 if (mi->rli->mts_recovery_group_cnt)
8205 DBUG_ASSERT(mi->rli->recovery_parallel_workers);
8207 my_message(ER_MTS_CHANGE_MASTER_CANT_RUN_WITH_GAPS,
8208 ER(ER_MTS_CHANGE_MASTER_CANT_RUN_WITH_GAPS), MYF(0));
8218 if (mi->rli->recovery_parallel_workers)
8219 mts_remove_workers=
true;
8226 if (lex_mi->log_file_name != NULL || lex_mi->pos != 0 ||
8227 lex_mi->relay_log_name != NULL || lex_mi->relay_log_pos != 0)
8229 if (lex_mi->auto_position == LEX_MASTER_INFO::LEX_MI_ENABLE ||
8230 (lex_mi->auto_position != LEX_MASTER_INFO::LEX_MI_DISABLE &&
8231 mi->is_auto_position()))
8233 my_message(ER_BAD_SLAVE_AUTO_POSITION,
8234 ER(ER_BAD_SLAVE_AUTO_POSITION), MYF(0));
8241 if (lex_mi->auto_position == LEX_MASTER_INFO::LEX_MI_ENABLE && gtid_mode != 3)
8243 my_message(ER_AUTO_POSITION_REQUIRES_GTID_MODE_ON,
8244 ER(ER_AUTO_POSITION_REQUIRES_GTID_MODE_ON), MYF(0));
8258 strmake(saved_host, mi->host, HOSTNAME_LENGTH);
8259 strmake(saved_bind_addr, mi->bind_addr, HOSTNAME_LENGTH);
8260 saved_port= mi->port;
8261 strmake(saved_log_name, mi->get_master_log_name(), FN_REFLEN - 1);
8262 saved_log_pos= mi->get_master_log_pos();
8269 if ((lex_mi->host && strcmp(lex_mi->host, mi->host)) ||
8270 (lex_mi->port && lex_mi->port != mi->port))
8281 if (mi->clean_info())
8286 mi->master_uuid[0]= 0;
8290 if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
8292 var_master_log_name=
const_cast<char*
>(mi->get_master_log_name());
8293 var_master_log_name[0]=
'\0';
8294 mi->set_master_log_pos(BIN_LOG_HEADER_SIZE);
8297 if (lex_mi->log_file_name)
8298 mi->set_master_log_name(lex_mi->log_file_name);
8301 mi->set_master_log_pos(lex_mi->pos);
8303 DBUG_PRINT(
"info", (
"master_log_pos: %lu", (ulong) mi->get_master_log_pos()));
8305 if (lex_mi->user || lex_mi->password)
8307 #if defined(HAVE_OPENSSL) && !defined(EMBEDDED_LIBRARY)
8308 if (thd->vio_ok() && !thd->net.vio->ssl_arg)
8309 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
8310 ER_INSECURE_PLAIN_TEXT,
8311 ER(ER_INSECURE_PLAIN_TEXT));
8313 #if !defined(HAVE_OPENSSL) && !defined(EMBEDDED_LIBRARY)
8314 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
8315 ER_INSECURE_PLAIN_TEXT,
8316 ER(ER_INSECURE_PLAIN_TEXT));
8318 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
8319 ER_INSECURE_CHANGE_MASTER,
8320 ER(ER_INSECURE_CHANGE_MASTER));
8324 mi->set_user(lex_mi->user);
8326 if (lex_mi->password)
8328 if (mi->set_password(lex_mi->password, strlen(lex_mi->password)))
8335 my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
8341 strmake(mi->host, lex_mi->host,
sizeof(mi->host)-1);
8342 if (lex_mi->bind_addr)
8343 strmake(mi->bind_addr, lex_mi->bind_addr,
sizeof(mi->bind_addr)-1);
8345 mi->port = lex_mi->port;
8346 if (lex_mi->connect_retry)
8347 mi->connect_retry = lex_mi->connect_retry;
8348 if (lex_mi->retry_count_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
8349 mi->retry_count = lex_mi->retry_count;
8350 if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
8351 mi->heartbeat_period = lex_mi->heartbeat_period;
8354 (slave_net_timeout/2.0));
8355 mi->received_heartbeats= LL(0);
8360 if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
8361 reset_dynamic(&(mi->ignore_server_ids->dynamic_ids));
8362 for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++)
8365 get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
8366 if (s_id == ::server_id && replicate_same_server_id)
8368 my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id));
8374 if (bsearch((
const ulong *) &s_id,
8375 mi->ignore_server_ids->dynamic_ids.buffer,
8376 mi->ignore_server_ids->dynamic_ids.elements,
sizeof(ulong),
8377 (int (*) (
const void*,
const void*))
8378 change_master_server_id_cmp) == NULL)
8379 insert_dynamic(&(mi->ignore_server_ids->dynamic_ids), (uchar*) &s_id);
8382 sort_dynamic(&(mi->ignore_server_ids->dynamic_ids), (qsort_cmp) change_master_server_id_cmp);
8384 if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
8385 mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
8387 if (lex_mi->sql_delay != -1)
8388 mi->rli->set_sql_delay(lex_mi->sql_delay);
8390 if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
8391 mi->ssl_verify_server_cert=
8392 (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
8395 strmake(mi->ssl_ca, lex_mi->ssl_ca,
sizeof(mi->ssl_ca)-1);
8396 if (lex_mi->ssl_capath)
8397 strmake(mi->ssl_capath, lex_mi->ssl_capath,
sizeof(mi->ssl_capath)-1);
8398 if (lex_mi->ssl_cert)
8399 strmake(mi->ssl_cert, lex_mi->ssl_cert,
sizeof(mi->ssl_cert)-1);
8400 if (lex_mi->ssl_cipher)
8401 strmake(mi->ssl_cipher, lex_mi->ssl_cipher,
sizeof(mi->ssl_cipher)-1);
8402 if (lex_mi->ssl_key)
8403 strmake(mi->ssl_key, lex_mi->ssl_key,
sizeof(mi->ssl_key)-1);
8404 if (lex_mi->ssl_crl)
8405 strmake(mi->ssl_crl, lex_mi->ssl_crl,
sizeof(mi->ssl_crl)-1);
8406 if (lex_mi->ssl_crlpath)
8407 strmake(mi->ssl_crlpath, lex_mi->ssl_crlpath,
sizeof(mi->ssl_crlpath)-1);
8408 #ifndef HAVE_OPENSSL
8409 if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
8410 lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
8411 lex_mi->ssl_verify_server_cert || lex_mi->ssl_crl || lex_mi->ssl_crlpath)
8412 push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
8413 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
8416 if (lex_mi->relay_log_name)
8418 need_relay_log_purge= 0;
8419 char relay_log_name[FN_REFLEN];
8421 mi->rli->relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name);
8422 mi->rli->set_group_relay_log_name(relay_log_name);
8423 mi->rli->set_event_relay_log_name(relay_log_name);
8426 if (lex_mi->relay_log_pos)
8428 need_relay_log_purge= 0;
8429 mi->rli->set_group_relay_log_pos(lex_mi->relay_log_pos);
8430 mi->rli->set_event_relay_log_pos(lex_mi->relay_log_pos);
8447 if (!lex_mi->host && !lex_mi->port &&
8448 !lex_mi->log_file_name && !lex_mi->pos &&
8449 need_relay_log_purge)
8458 mi->set_master_log_pos(max<ulonglong>(BIN_LOG_HEADER_SIZE,
8459 mi->rli->get_group_master_log_pos()));
8460 mi->set_master_log_name(mi->rli->get_group_master_log_name());
8467 if (lex_mi->auto_position != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
8468 mi->set_auto_position(
8469 (lex_mi->auto_position == LEX_MASTER_INFO::LEX_MI_ENABLE));
8475 if (flush_master_info(mi,
true))
8477 my_error(ER_RELAY_LOG_INIT, MYF(0),
"Failed to flush master info file");
8481 if (need_relay_log_purge)
8484 THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
8485 if (mi->rli->purge_relay_logs(thd,
8489 my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
8500 if (mi->rli->init_relay_log_pos(mi->rli->get_group_relay_log_name(),
8501 mi->rli->get_group_relay_log_pos(),
8505 my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
8510 relay_log_purge= save_relay_log_purge;
8522 if (need_relay_log_purge)
8524 mi->rli->set_group_master_log_pos(mi->get_master_log_pos());
8525 DBUG_PRINT(
"info", (
"master_log_pos: %lu", (ulong) mi->get_master_log_pos()));
8526 mi->rli->set_group_master_log_name(mi->get_master_log_name());
8528 var_group_master_log_name=
const_cast<char *
>(mi->rli->get_group_master_log_name());
8529 if (!var_group_master_log_name[0])
8530 mi->rli->set_group_master_log_pos(0);
8533 mi->rli->abort_pos_wait++;
8535 mi->rli->clear_error();
8536 mi->rli->clear_until_condition();
8538 sql_print_information(
"'CHANGE MASTER TO executed'. "
8539 "Previous state master_host='%s', master_port= %u, master_log_file='%s', "
8540 "master_log_pos= %ld, master_bind='%s'. "
8541 "New state master_host='%s', master_port= %u, master_log_file='%s', "
8542 "master_log_pos= %ld, master_bind='%s'.",
8543 saved_host, saved_port, saved_log_name, (ulong) saved_log_pos,
8544 saved_bind_addr, mi->host, mi->port, mi->get_master_log_name(),
8545 (ulong) mi->get_master_log_pos(), mi->bind_addr);
8557 DBUG_ASSERT(!mi->rli->slave_running);
8558 if ((ret= mi->rli->flush_info(
true)))
8559 my_error(ER_RELAY_LOG_INIT, MYF(0),
"Failed to flush relay info file.");
8564 unlock_slave_threads(mi);
8567 if (!mts_remove_workers)
8570 if (!Rpl_info_factory::reset_workers(mi->rli))
8573 my_error(ER_MTS_RESET_WORKERS, MYF(0));