26 #include "rpl_utility.h"
27 #include "transaction.h"
28 #include "sql_parse.h"
30 #include "rpl_rli_pdb.h"
31 #include "rpl_info_factory.h"
32 #include <mysql/plugin.h>
// Table of field-name strings for Relay_log_info. Its element count is what
// Relay_log_info::get_number_info_rli_fields() reports (sizeof of the array
// divided by sizeof of one element).
// NOTE(review): only four entries are visible in this view; the braces and
// any further entries lie outside it.
43 const char* info_rli_fields[]=
46 "group_relay_log_name",
47 "group_relay_log_pos",
48 "group_master_log_name",
49 "group_master_log_pos",
// Constructor. Takes the slave-recovery flag and, when HAVE_PSI_INTERFACE is
// defined, a set of performance-schema keys that are passed on in the
// initializer list. The initializer list gives the position, counter, UNTIL,
// MTS and delay members their start values; the visible part of the body then
// registers the relay log's PSI keys, clears the log-name buffers, zeroes
// cache_buf, invalidates the cached charset and initializes the
// mutex / condition / rwlock objects.
// NOTE(review): the four *_cond keys below are declared as PSI_mutex_key;
// confirm against the receiving class whether PSI_cond_key was intended.
55 Relay_log_info::Relay_log_info(
bool is_slave_recovery
56 #ifdef HAVE_PSI_INTERFACE
57 ,PSI_mutex_key *param_key_info_run_lock,
58 PSI_mutex_key *param_key_info_data_lock,
59 PSI_mutex_key *param_key_info_sleep_lock,
60 PSI_mutex_key *param_key_info_data_cond,
61 PSI_mutex_key *param_key_info_start_cond,
62 PSI_mutex_key *param_key_info_stop_cond,
63 PSI_mutex_key *param_key_info_sleep_cond
68 #ifdef HAVE_PSI_INTERFACE
69 ,param_key_info_run_lock, param_key_info_data_lock,
70 param_key_info_sleep_lock,
71 param_key_info_data_cond, param_key_info_start_cond,
72 param_key_info_stop_cond, param_key_info_sleep_cond
76 replicate_same_server_id(::replicate_same_server_id),
77 cur_log_fd(-1), relay_log(&sync_relaylog_period),
78 is_relay_log_recovery(is_slave_recovery),
79 save_temporary_tables(0),
80 cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
81 group_master_log_pos(0),
82 gtid_set(global_sid_map, global_sid_lock),
83 log_space_total(0), ignore_log_space_limit(0),
84 sql_force_rotate_relay(false),
85 last_master_timestamp(0), slave_skip_counter(0),
86 abort_pos_wait(0), until_condition(UNTIL_NONE),
88 until_sql_gtids(global_sid_map),
89 until_sql_gtids_first_event(true),
91 tables_to_lock(0), tables_to_lock_count(0),
92 rows_query_ev(NULL), last_event_start_time(0), deferred_events(NULL),
93 slave_parallel_workers(0),
94 recovery_parallel_workers(0), checkpoint_seqno(0),
95 checkpoint_group(opt_mts_checkpoint_group),
96 recovery_groups_inited(false), mts_recovery_group_cnt(0),
97 mts_recovery_index(0), mts_recovery_group_seen_begin(0),
98 mts_group_status(MTS_NOT_IN_GROUP), reported_unsafe_warning(false),
99 rli_description_event(NULL),
100 sql_delay(0), sql_delay_end(0), m_flags(0), row_stmt_start_timestamp(0),
101 long_find_row_note_printed(false), error_on_rli_init_info(false)
103 DBUG_ENTER(
"Relay_log_info::Relay_log_info");
105 #ifdef HAVE_PSI_INTERFACE
// Hand the relay-log specific instrumentation keys to the relay log object.
106 relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
107 key_RELAYLOG_LOCK_commit,
108 key_RELAYLOG_LOCK_commit_queue,
109 key_RELAYLOG_LOCK_done,
110 key_RELAYLOG_LOCK_flush_queue,
111 key_RELAYLOG_LOCK_log,
112 key_RELAYLOG_LOCK_sync,
113 key_RELAYLOG_LOCK_sync_queue,
114 key_RELAYLOG_LOCK_xids,
115 key_RELAYLOG_COND_done,
116 key_RELAYLOG_update_cond,
117 key_RELAYLOG_prep_xids_cond,
119 key_file_relaylog_index);
// Every log-name buffer starts out as an empty string.
122 group_relay_log_name[0]= event_relay_log_name[0]=
123 group_master_log_name[0]= 0;
124 until_log_name[0]= ign_master_log_name_end[0]= 0;
125 set_timespec_nsec(last_clock, 0);
// Zero the IO cache structure before first use.
126 memset(&cache_buf, 0,
sizeof(cache_buf));
127 cached_charset_invalidate();
130 &log_space_lock, MY_MUTEX_INIT_FAST);
131 mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
134 mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
135 my_atomic_rwlock_init(&slave_open_temp_tables_lock);
137 relay_log.init_pthread_objects();
152 mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits_cnt= 0;
153 mts_wq_excess_cnt= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
154 mts_last_online_stat= 0;
155 my_init_dynamic_array(&workers,
sizeof(Slave_worker *), n_workers, 4);
163 delete_dynamic(&workers);
// Destructor. Visible steps: frees the recovery_groups bitmap when
// recovery_groups_inited is set, destroys slave_open_temp_tables_lock and
// calls set_rli_description_event(NULL).
166 Relay_log_info::~Relay_log_info()
168 DBUG_ENTER(
"Relay_log_info::~Relay_log_info");
// The bitmap is only released if it was set up earlier.
170 if (recovery_groups_inited)
171 bitmap_free(&recovery_groups);
176 my_atomic_rwlock_destroy(&slave_open_temp_tables_lock);
178 set_rli_description_event(NULL);
194 if (!is_parallel_exec())
196 for (uint
i= 0;
i < workers.elements;
i++)
198 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers,
i);
199 w->relay_log_change_notified= FALSE;
226 if (!is_parallel_exec())
229 for (uint
i= 0;
i < workers.elements;
i++)
231 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers,
i);
241 w->checkpoint_notified= FALSE;
242 w->bitmap_shifted= w->bitmap_shifted + shift;
249 w->master_log_change_notified=
false;
251 DBUG_PRINT(
"mts", (
"reset_notified_checkpoint shift --> %lu, "
252 "worker->bitmap_shifted --> %lu, worker --> %u.",
253 shift, w->bitmap_shifted,
i));
261 DBUG_ASSERT(!(shift == 0 && checkpoint_seqno != 0));
262 checkpoint_seqno= checkpoint_seqno - shift;
263 DBUG_PRINT(
"mts", (
"reset_notified_checkpoint shift --> %lu, "
264 "checkpoint_seqno --> %u.", shift, checkpoint_seqno));
272 last_master_timestamp= new_ts;
// Relay_log_info::mts_finalize_recovery (name taken from the DBUG_ENTER
// below; the signature line is outside this view).
// Visible steps: reset the recovery info of every worker in `workers`; then,
// counting down from recovery_parallel_workers while the index is above
// workers.elements, call Rpl_info_factory::create_worker for index i - 1 and
// remove that worker's stored info. Both loops stop as soon as `ret` becomes
// true. recovery_parallel_workers is then set to slave_parallel_workers
// (any guard around that assignment is not visible here).
288 uint repo_type= get_rpl_info_handler()->get_rpl_info_type();
290 DBUG_ENTER(
"Relay_log_info::mts_finalize_recovery");
292 for (i= 0; !ret && i < workers.elements; i++)
294 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
295 ret= w->reset_recovery_info();
// Debug hook keyed on "mts_debug_recovery_reset_fails": sets ret to true.
296 DBUG_EXECUTE_IF(
"mts_debug_recovery_reset_fails", ret=
true;);
304 for (i= recovery_parallel_workers; i > workers.elements && !
ret; i--)
307 Rpl_info_factory::create_worker(repo_type, i - 1,
this,
true);
308 ret= w->remove_info();
311 recovery_parallel_workers= slave_parallel_workers;
// add_relay_log (name taken from the DBUG_ENTER below; the signature line is
// outside this view). Visible behavior: a call taking linfo->log_file_name
// and &s is checked; on failure an error naming the file is logged
// ("failed to stat"). Otherwise s.st_size is added to rli->log_space_total
// and the new total is traced.
319 DBUG_ENTER(
"add_relay_log");
321 linfo->log_file_name, &s, MYF(0)))
323 sql_print_error(
"log %s listed in the index, but failed to stat.",
324 linfo->log_file_name);
// Account for this file's size in the running relay-log space total.
327 rli->log_space_total += s.st_size;
330 DBUG_PRINT(
"info",(
"log_space_total: %s", llstr(rli->log_space_total,buf)));
// Walks the relay log index and feeds every listed log to add_relay_log().
// Starts from the first index entry (find_log_pos with NullS), logs an error
// if that lookup fails, and keeps going while find_next_log() returns 0.
// Afterwards the relay log's bytes-written counter is reset.
// Return values are not visible in this view.
335 int Relay_log_info::count_relay_log_space()
338 DBUG_ENTER(
"Relay_log_info::count_relay_log_space");
340 if (relay_log.find_log_pos(&flinfo, NullS, 1))
342 sql_print_error(
"Could not find first log while counting relay log space.");
347 if (add_relay_log(
this, &flinfo))
349 }
while (!relay_log.find_next_log(&flinfo, 1));
355 relay_log.reset_bytes_written();
// clear_until_condition (name taken from the DBUG_ENTER below; the signature
// line is outside this view). Resets the UNTIL state: condition back to
// UNTIL_NONE, until_log_name emptied, until_sql_gtids cleared and the
// "first event" flag for the GTID-based UNTIL set to true again.
365 DBUG_ENTER(
"clear_until_condition");
367 until_condition= Relay_log_info::UNTIL_NONE;
368 until_log_name[0]= 0;
370 until_sql_gtids.clear();
371 until_sql_gtids_first_event=
true;
// Relay_log_info::init_relay_log_pos (name taken from the DBUG_ENTER below;
// only the tail of the parameter list is in view).
// Visible steps:
//  - end the IO cache in cache_buf and set both the group and the event
//    relay log position to `pos`;
//  - look up the first log in the index, and when `log` is non-null look up
//    that log; either failure sets *errmsg;
//  - copy the found file name into group_relay_log_name and
//    event_relay_log_name;
//  - if that file is the active relay log, rewind and use the relay log's
//    own file after checking the binlog magic; otherwise read through
//    cache_buf;
//  - when pos > BIN_LOG_HEADER_SIZE and look_for_description_event is set,
//    read events from the start until a Format_description_log_event is
//    found, `pos` is reached, or an event other than a Rotate is seen;
//  - seek to `pos`.
// Returns 1 when *errmsg is set, otherwise 0.
407 ulonglong pos,
bool need_data_lock,
409 bool look_for_description_event)
411 DBUG_ENTER(
"Relay_log_info::init_relay_log_pos");
412 DBUG_PRINT(
"info", (
"pos: %lu", (ulong) pos));
415 const char* errmsg_fmt= 0;
// NOTE(review): errmsg_buff is function-static and *errmsg is pointed at it
// below, so a later or concurrent call overwrites the text a previous caller
// still holds - confirm callers are serialized.
416 static char errmsg_buff[MYSQL_ERRMSG_SIZE + FN_REFLEN];
437 end_io_cache(&cache_buf);
442 group_relay_log_pos= event_relay_log_pos= pos;
448 if (relay_log.find_log_pos(&linfo, NullS, 1))
450 *errmsg=
"Could not find first log during relay log initialization";
454 if (log && relay_log.find_log_pos(&linfo, log, 1))
456 errmsg_fmt=
"Could not find target log file mentioned in "
457 "relay log info in the index file '%s' during "
458 "relay log initialization";
// NOTE(review): unbounded sprintf; presumably safe because the buffer is
// MYSQL_ERRMSG_SIZE + FN_REFLEN bytes - confirm the index file name is
// limited to FN_REFLEN.
459 sprintf(errmsg_buff, errmsg_fmt, relay_log.get_index_fname());
460 *errmsg= errmsg_buff;
464 strmake(group_relay_log_name, linfo.log_file_name,
465 sizeof(group_relay_log_name) - 1);
466 strmake(event_relay_log_name, linfo.log_file_name,
467 sizeof(event_relay_log_name) - 1);
469 if (relay_log.is_active(linfo.log_file_name))
476 my_b_seek((cur_log=relay_log.get_log_file()), (off_t)0);
477 if (check_binlog_magic(cur_log, errmsg))
479 cur_log_old_open_count=relay_log.get_open_count();
487 linfo.log_file_name,errmsg)) < 0)
489 cur_log = &cache_buf;
495 if (pos > BIN_LOG_HEADER_SIZE)
498 while (look_for_description_event)
504 DBUG_PRINT(
"info",(
"looking for a Format_description_log_event"));
// Stop scanning once the read position has reached the requested one.
506 if (my_b_tell(cur_log) >= pos)
513 if (!(ev= Log_event::read_log_event(cur_log, 0,
514 rli_description_event,
515 opt_slave_sql_verify_checksum)))
517 DBUG_PRINT(
"info",(
"could not read event, cur_log->error=%d",
521 *errmsg=
"I/O error reading event at position 4";
526 else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
528 DBUG_PRINT(
"info",(
"found Format_description_log_event"));
553 DBUG_PRINT(
"info",(
"found event of another type=%d",
554 ev->get_type_code()));
// Keep scanning only if the event just read was a Rotate event.
555 look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
559 my_b_seek(cur_log,(off_t)pos);
562 char llbuf1[22], llbuf2[22];
563 DBUG_PRINT(
"info", (
"my_b_tell(cur_log)=%s >event_relay_log_pos=%s",
564 llstr(my_b_tell(cur_log),llbuf1),
565 llstr(get_event_relay_log_pos(),llbuf2)));
576 if (!relay_log_purge)
// Do not overwrite an error message that was already set.
586 if (!rli_description_event->is_valid() && !*errmsg)
587 *errmsg=
"Invalid Format_description log event; could be out of memory";
589 DBUG_RETURN ((*errmsg) ? 1 : 0);
// Relay_log_info::wait_for_pos (name taken from the DBUG_ENTER below; the
// signature line is outside this view).
// Waits, using data_cond / data_lock, for the group master position to reach
// (log_name, log_pos). The loop also ends when the thread is killed or
// abort_pos_wait changes from its value at entry; a timed-out wait
// (ETIMEDOUT / ETIME) is handled separately.
// Visible logic:
//  - log_name is copied into log_name_tmp (at most FN_REFLEN-1 bytes) and the
//    text after its extension separator is parsed with strtoul; a missing
//    extension or trailing non-digits take a branch not visible here;
//  - log_pos is raised to at least BIN_LOG_HEADER_SIZE;
//  - on each pass the basename (up to and including the extension separator)
//    of group_master_log_name is compared with log_name_tmp, then the numeric
//    extensions, then group_master_log_pos against log_pos.
// Returns `error` when it is non-zero, otherwise event_count.
// NOTE(review): `log_pos<0` is always false if log_pos is unsigned; its
// declaration is not visible here - confirm the type.
// NOTE(review): max<ulong>(log_pos, ...) narrows log_pos to ulong - confirm
// this is intended where ulong is 32 bits wide.
// NOTE(review): init_abort_pos_wait is declared ulong but is printed with
// %ld in the trace below.
618 ulong init_abort_pos_wait;
622 DBUG_ENTER(
"Relay_log_info::wait_for_pos");
627 DBUG_PRINT(
"enter",(
"log_name: '%s' log_pos: %lu timeout: %lu",
628 log_name->c_ptr_safe(), (ulong) log_pos, (ulong) timeout));
630 set_timespec(abstime,timeout);
632 thd->ENTER_COND(&data_cond, &data_lock,
633 &stage_waiting_for_the_slave_thread_to_advance_position,
// Remember the abort counter so a change during the wait can be detected.
648 init_abort_pos_wait= abort_pos_wait;
656 ulong log_name_extension;
657 char log_name_tmp[FN_REFLEN];
659 strmake(log_name_tmp, log_name->ptr(), min<uint32>(log_name->length(), FN_REFLEN-1));
661 char *p= fn_ext(log_name_tmp);
663 if (!*p || log_pos<0)
669 log_pos= max<ulong>(log_pos, BIN_LOG_HEADER_SIZE);
671 log_name_extension= strtoul(++p, &p_end, 10);
// Nothing parsed, or characters left over after the number.
677 if (p_end==p || *p_end)
684 while (!thd->killed &&
685 init_abort_pos_wait == abort_pos_wait &&
692 (
"init_abort_pos_wait: %ld abort_pos_wait: %ld",
693 init_abort_pos_wait, abort_pos_wait));
694 DBUG_PRINT(
"info",(
"group_master_log_name: '%s' pos: %lu",
695 group_master_log_name, (ulong) group_master_log_pos));
710 if (*group_master_log_name)
712 char *basename= (group_master_log_name +
713 dirname_length(group_master_log_name));
720 char *q= (
char*)(fn_ext(basename)+1);
721 if (strncmp(basename, log_name_tmp, (
int)(q-basename)))
728 ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
729 if (group_master_log_name_extension < log_name_extension)
732 cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
734 pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
736 if (pos_reached || thd->killed)
742 DBUG_PRINT(
"info",(
"Waiting for master update"));
766 DBUG_PRINT(
"info",(
"Got signal of master update or timed out"));
767 if (error == ETIMEDOUT || error == ETIME)
774 if (DBUG_EVALUATE_IF(
"debug_crash_slave_time_out", 1, 0))
782 DBUG_PRINT(
"info",(
"Testing if killed or SQL thread not running"));
786 thd->EXIT_COND(&old_stage);
787 DBUG_PRINT(
"exit",(
"killed: %d abort: %d slave_running: %d \
788 improper_arguments: %d timed_out: %d",
790 (int) (init_abort_pos_wait != abort_pos_wait),
793 (
int) (error == -1)));
794 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
799 DBUG_RETURN( error ? error : event_count );
// Waits, using data_cond / data_lock, for the GTID set given as text in
// `gtid`. The text is parsed into a local Gtid_set while holding
// global_sid_lock for reading; a parse failure takes an early-exit branch.
// The loop ends when the thread is killed or abort_pos_wait changes from its
// value at entry. On each pass, under the write lock, it tests whether the
// wanted set is a subset of logged_gtids (the second half of that condition
// is not visible in this view). A timed-out wait (ETIMEDOUT / ETIME) is
// handled separately.
// Returns `error` when it is non-zero, otherwise event_count.
// NOTE(review): init_abort_pos_wait is declared ulong but is printed with
// %ld in the trace below.
809 int Relay_log_info::wait_for_gtid_set(THD* thd,
String* gtid,
813 ulong init_abort_pos_wait;
817 DBUG_ENTER(
"Relay_log_info::wait_for_gtid_set");
822 DBUG_PRINT(
"info", (
"Waiting for %s timeout %lld", gtid->c_ptr_safe(),
825 set_timespec(abstime, timeout);
827 thd->ENTER_COND(&data_cond, &data_lock,
828 &stage_waiting_for_the_slave_thread_to_advance_position,
// Remember the abort counter so a change during the wait can be detected.
843 init_abort_pos_wait= abort_pos_wait;
844 Gtid_set wait_gtid_set(global_sid_map);
845 global_sid_lock->
rdlock();
846 if (wait_gtid_set.add_gtid_text(gtid->c_ptr_safe()) != RETURN_STATUS_OK)
848 global_sid_lock->
unlock();
851 global_sid_lock->
unlock();
854 while (!thd->killed &&
855 init_abort_pos_wait == abort_pos_wait &&
859 (
"init_abort_pos_wait: %ld abort_pos_wait: %ld",
860 init_abort_pos_wait, abort_pos_wait));
864 global_sid_lock->
wrlock();
868 DBUG_PRINT(
"info", (
"Waiting for '%s'. is_subset: %d and "
869 "!is_intersection_nonempty: %d",
870 gtid->c_ptr_safe(), wait_gtid_set.is_subset(logged_gtids),
879 if (wait_gtid_set.is_subset(logged_gtids) &&
882 global_sid_lock->
unlock();
885 global_sid_lock->
unlock();
887 DBUG_PRINT(
"info",(
"Waiting for master update"));
912 DBUG_PRINT(
"info",(
"Got signal of master update or timed out"));
913 if (error == ETIMEDOUT || error == ETIME)
920 if (DBUG_EVALUATE_IF(
"debug_crash_slave_time_out", 1, 0))
928 DBUG_PRINT(
"info",(
"Testing if killed or SQL thread not running"));
932 thd->EXIT_COND(&old_stage);
933 DBUG_PRINT(
"exit",(
"killed: %d abort: %d slave_running: %d \
934 improper_arguments: %d timed_out: %d",
936 (int) (init_abort_pos_wait != abort_pos_wait),
939 (
int) (error == -1)));
940 if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
945 DBUG_RETURN( error ? error : event_count );
// Moves the group position forward to the current event position.
// Visible steps: advance the event relay log position, copy it and the event
// relay log name into the group fields, notify the group relay log name
// update, store log_pos as group_master_log_pos, and flush with
// flush_info(FALSE), whose result becomes `error`.
// Guards around these steps (if any) are not visible in this view.
// NOTE(review): the trace below formats with %lu but casts both arguments to
// (long) - confirm the intended cast is (ulong).
948 int Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
952 DBUG_ENTER(
"Relay_log_info::inc_group_relay_log_pos");
959 inc_event_relay_log_pos();
960 group_relay_log_pos= event_relay_log_pos;
961 strmake(group_relay_log_name,event_relay_log_name,
962 sizeof(group_relay_log_name)-1);
964 notify_group_relay_log_name_update();
986 DBUG_PRINT(
"info", (
"log_pos: %lu group_master_log_pos: %lu",
987 (
long) log_pos, (
long) group_master_log_pos));
990 group_master_log_pos= log_pos;
// When running in parallel mode this must not be reached inside a group.
999 DBUG_ASSERT(!is_parallel_exec() ||
1000 mts_group_status != Relay_log_info::MTS_IN_GROUP);
1011 error= flush_info(FALSE);
// Closes every table on the save_temporary_tables list with
// close_temporary(table, 1, 0), then resets the list head and
// slave_open_temp_tables to 0. `next` is assigned on a line not visible in
// this view.
// NOTE(review): the trace casts the table pointer to (long) for %lx; on
// platforms where long is narrower than a pointer the value is truncated.
1020 void Relay_log_info::close_temporary_tables()
1023 DBUG_ENTER(
"Relay_log_info::close_temporary_tables");
1025 for (table=save_temporary_tables ;
table ; table=next)
1032 DBUG_PRINT(
"info", (
"table: 0x%lx", (
long) table));
1033 close_temporary(table, 1, 0);
1035 save_temporary_tables= 0;
1036 slave_open_temp_tables= 0;
// Relay_log_info::purge_relay_logs (name taken from the DBUG_ENTER below;
// only the last parameter is in view).
// Visible steps: clear the group master log name and position; assert that
// neither this object nor mi is marked running; reset slave_skip_counter;
// close the currently open log file (cache and descriptor) if any; reset the
// relay logs; point the group and event relay log names at the relay log's
// current file with position BIN_LOG_HEADER_SIZE; recount the relay log
// space; and re-initialize the read position with init_relay_log_pos().
// Failures of reset_logs() and count_relay_log_space() set *errmsg.
1054 const char** errmsg)
1057 DBUG_ENTER(
"Relay_log_info::purge_relay_logs");
1075 group_master_log_name[0]= 0;
1076 group_master_log_pos= 0;
1080 DBUG_PRINT(
"info", (
"inited == 0"));
1084 DBUG_ASSERT(slave_running == 0);
1085 DBUG_ASSERT(mi->slave_running == 0);
1087 slave_skip_counter= 0;
// A non-negative descriptor means a log file is currently open.
1096 if (cur_log_fd >= 0)
1098 end_io_cache(&cache_buf);
1099 my_close(cur_log_fd, MYF(MY_WME));
1103 if (relay_log.reset_logs(thd))
1105 *errmsg =
"Failed during log reset";
1110 strmake(group_relay_log_name, relay_log.get_log_fname(),
1111 sizeof(group_relay_log_name)-1);
1112 strmake(event_relay_log_name, relay_log.get_log_fname(),
1113 sizeof(event_relay_log_name)-1);
1114 group_relay_log_pos= event_relay_log_pos= BIN_LOG_HEADER_SIZE;
1115 if (count_relay_log_space())
1117 *errmsg=
"Error counting relay log space";
1122 error= init_relay_log_pos(group_relay_log_name,
1123 group_relay_log_pos,
1130 DBUG_PRINT(
"info",(
"log_space_total: %s",llstr(log_space_total,buf)));
// Relay_log_info::is_until_satisfied (name taken from the DBUG_ENTER below;
// the signature line is outside this view). Dispatches on until_condition:
//  - UNTIL_MASTER_POS / UNTIL_RELAY_POS: pick the master or relay group log
//    name and position; if the comparison of log names is still unknown,
//    compare the basename with until_log_name and the numeric extension with
//    until_log_name_extension and store the outcome in
//    until_log_names_cmp_result; the stop is reported when the names are
//    equal and log_pos >= until_log_pos, or the current log is greater.
//  - UNTIL_SQL_BEFORE_GTIDS / UNTIL_SQL_AFTER_GTIDS: test until_sql_gtids
//    against logged_gtids while holding global_sid_lock and log the reason
//    for stopping.
//  - UNTIL_SQL_AFTER_MTS_GAPS: when mts_recovery_group_cnt is 0, log that all
//    gap transactions were processed and set until_condition to UNTIL_DONE.
1168 char error_msg[]=
"Slave SQL thread is stopped because UNTIL "
1169 "condition is bad.";
1170 DBUG_ENTER(
"Relay_log_info::is_until_satisfied");
1172 switch (until_condition)
1174 case UNTIL_MASTER_POS:
1175 case UNTIL_RELAY_POS:
1177 const char *log_name= NULL;
1178 ulonglong log_pos= 0;
1180 if (until_condition == UNTIL_MASTER_POS)
1182 if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
// Without an event, inside OPTION_BEGIN, or when the event has no log_pos,
// the group master position is used; otherwise the event's start offset
// (log_pos - data_written).
1184 log_name= group_master_log_name;
1185 log_pos= (!ev)? group_master_log_pos :
1186 ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ?
1187 group_master_log_pos : ev->log_pos - ev->data_written);
1191 log_name= group_relay_log_name;
1192 log_pos= group_relay_log_pos;
1198 DBUG_PRINT(
"info", (
"group_master_log_name='%s', group_master_log_pos=%s",
1199 group_master_log_name, llstr(group_master_log_pos, buf)));
1200 DBUG_PRINT(
"info", (
"group_relay_log_name='%s', group_relay_log_pos=%s",
1201 group_relay_log_name, llstr(group_relay_log_pos, buf)));
1202 DBUG_PRINT(
"info", (
"(%s) log_name='%s', log_pos=%s",
1203 until_condition == UNTIL_MASTER_POS ?
"master" :
"relay",
1204 log_name, llstr(log_pos, buf)));
1205 DBUG_PRINT(
"info", (
"(%s) until_log_name='%s', until_log_pos=%s",
1206 until_condition == UNTIL_MASTER_POS ?
"master" :
"relay",
1207 until_log_name, llstr(until_log_pos, buf)));
// The name comparison is only computed while its cached result is unknown.
1211 if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
1223 const char *basename= log_name + dirname_length(log_name);
1225 const char *q= (
const char*)(fn_ext(basename)+1);
1226 if (strncmp(basename, until_log_name, (
int)(q-basename)) == 0)
1230 ulong log_name_extension= strtoul(q, &q_end, 10);
1231 if (log_name_extension < until_log_name_extension)
1232 until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
1234 until_log_names_cmp_result=
1235 (log_name_extension > until_log_name_extension) ?
1236 UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
1241 sql_print_error(
"%s", error_msg);
1246 DBUG_RETURN(until_log_pos == 0);
1249 if (((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
1250 log_pos >= until_log_pos) ||
1251 until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER))
1254 sql_print_information(
"Slave SQL thread stopped because it reached its"
1255 " UNTIL position %s", llstr(until_pos(), buf));
1261 case UNTIL_SQL_BEFORE_GTIDS:
// One-time check on the first call: the flag is cleared here.
1263 if (until_sql_gtids_first_event)
1265 until_sql_gtids_first_event=
false;
1266 global_sid_lock->
wrlock();
1269 if (until_sql_gtids.is_intersection_nonempty(logged_gtids))
1271 char *buffer= until_sql_gtids.
to_string();
1272 global_sid_lock->
unlock();
1273 sql_print_information(
"Slave SQL thread stopped because "
1274 "UNTIL SQL_BEFORE_GTIDS %s is already "
1279 global_sid_lock->
unlock();
1281 if (ev != NULL && ev->get_type_code() == GTID_LOG_EVENT)
1284 global_sid_lock->
rdlock();
1287 char *buffer= until_sql_gtids.to_string();
1288 global_sid_lock->
unlock();
1289 sql_print_information(
"Slave SQL thread stopped because it reached "
1290 "UNTIL SQL_BEFORE_GTIDS %s", buffer);
1294 global_sid_lock->
unlock();
1299 case UNTIL_SQL_AFTER_GTIDS:
1301 global_sid_lock->
wrlock();
1303 if (until_sql_gtids.is_subset(logged_gtids))
1305 char *buffer= until_sql_gtids.
to_string();
1306 global_sid_lock->
unlock();
1307 sql_print_information(
"Slave SQL thread stopped because it reached "
1308 "UNTIL SQL_AFTER_GTIDS %s", buffer);
1312 global_sid_lock->
unlock();
1317 case UNTIL_SQL_AFTER_MTS_GAPS:
1329 if (mts_recovery_group_cnt == 0)
1331 sql_print_information(
"Slave SQL thread stopped according to "
1332 "UNTIL SQL_AFTER_MTS_GAPS as it has "
1333 "processed all gap transactions left from "
1334 "the previous slave session.");
1336 until_condition= UNTIL_DONE;
// Relay_log_info::cached_charset_invalidate (name taken from the DBUG_ENTER
// below; the signature line is outside this view). Fills the whole
// cached_charset buffer with zero bytes.
1357 DBUG_ENTER(
"Relay_log_info::cached_charset_invalidate");
1360 memset(cached_charset, 0,
sizeof(cached_charset));
// Compares sizeof(cached_charset) bytes of `charset` with cached_charset.
// When they differ, `charset` is copied into cached_charset. The returned
// values are not visible in this view.
// NOTE(review): the method is const but updates the cache through
// const_cast; this relies on the object never being a genuinely const
// instance - confirm, or consider marking the member mutable.
1365 bool Relay_log_info::cached_charset_compare(
char *charset)
const
1367 DBUG_ENTER(
"Relay_log_info::cached_charset_compare");
1369 if (memcmp(cached_charset, charset,
sizeof(cached_charset)))
1371 memcpy(const_cast<char*>(cached_charset), charset,
sizeof(cached_charset));
1382 clear_flag(IN_STMT);
1384 DBUG_ASSERT(!belongs_to_client());
1386 DBUG_ASSERT(!is_mts_worker(info_thd));
1418 if ((!is_parallel_exec() && is_in_group()) ||
1419 mts_group_status != MTS_NOT_IN_GROUP)
1421 inc_event_relay_log_pos();
1425 if (is_parallel_exec())
1428 DBUG_ASSERT(!is_mts_worker(info_thd));
1436 error= mts_checkpoint_routine(
this, 0,
false,
1440 error= inc_group_relay_log_pos(event_master_log_pos,
1447 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// Cleans up per-statement / per-transaction state for `thd`, which must be
// this object's info_thd (asserted).
// Visible steps: roll back the statement and the transaction (the condition
// guarding the rollback is not visible here), delete the pending
// rows_query_ev, clear the thread's query, clear the table map, close the
// thread's tables, release transactional metadata locks, clear IN_STMT, and
// reset the row-statement start timestamp and the "long find row" note flag.
1448 void Relay_log_info::cleanup_context(THD *thd,
bool error)
1450 DBUG_ENTER(
"Relay_log_info::cleanup_context");
1452 DBUG_ASSERT(info_thd == thd);
1467 trans_rollback_stmt(thd);
1468 trans_rollback(thd);
// Drop the stored rows-query event and forget the pointer.
1472 delete rows_query_ev;
1473 rows_query_ev= NULL;
1474 info_thd->set_query(NULL, 0);
1476 m_table_map.clear_tables();
1477 slave_close_thread_tables(thd);
1479 thd->mdl_context.release_transactional_locks();
1480 clear_flag(IN_STMT);
1492 reset_row_stmt_start_timestamp();
1493 unset_long_find_row_note_printed();
// Empties the tables_to_lock list.
// First (assert only) it walks the list via next_global and checks that its
// length equals tables_to_lock_count. Then, for each head element, it
// explicitly runs the table_def destructor when m_tabledef_valid is set,
// frees the blobs of m_conv_table if present, and decrements the count. The
// lines that advance the list and release `to_free` are not visible in this
// view. On exit both the list and the count must be empty/zero (asserted).
1498 void Relay_log_info::clear_tables_to_lock()
1500 DBUG_ENTER(
"Relay_log_info::clear_tables_to_lock()");
1513 for (
TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
1514 DBUG_ASSERT(i == tables_to_lock_count);
1517 while (tables_to_lock)
1519 uchar* to_free=
reinterpret_cast<uchar*
>(tables_to_lock);
1520 if (tables_to_lock->m_tabledef_valid)
// Explicit destructor call; the valid flag is cleared right after.
1522 tables_to_lock->m_tabledef.table_def::~table_def();
1523 tables_to_lock->m_tabledef_valid= FALSE;
1532 if (tables_to_lock->m_conv_table)
1533 free_blobs(tables_to_lock->m_conv_table);
1537 tables_to_lock_count--;
1540 DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
// Ends the current statement for `thd` and closes its tables.
// With the diagnostics area temporarily set to allow overwriting its status,
// the statement transaction is rolled back if thd is in error, otherwise
// committed. Tables are then closed. Metadata locks: after a transaction
// rollback request an implicit rollback is done and transactional locks are
// released; otherwise transactional locks are released only when not in a
// multi-statement transaction, and statement locks are released in a branch
// whose condition is not visible here. Finally tables_to_lock is cleared.
// NOTE(review): set_overwrite_status(true) runs before DBUG_ENTER - confirm
// this ordering is intentional.
1544 void Relay_log_info::slave_close_thread_tables(THD *thd)
1546 thd->get_stmt_da()->set_overwrite_status(
true);
1547 DBUG_ENTER(
"Relay_log_info::slave_close_thread_tables(THD *thd)");
1548 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
1549 thd->get_stmt_da()->set_overwrite_status(
false);
1551 close_thread_tables(thd);
1564 if (thd->transaction_rollback_request)
1566 trans_rollback_implicit(thd);
1567 thd->mdl_context.release_transactional_locks();
1569 else if (! thd->in_multi_stmt_transaction_mode())
1570 thd->mdl_context.release_transactional_locks();
1572 thd->mdl_context.release_statement_locks();
1574 clear_tables_to_lock();
// Entry point for the SHOW RELAYLOG EVENTS command (asserted via
// thd->lex->sql_command). Prepares the result field list, raises
// ER_SLAVE_CONFIGURATION when active_mi is NULL, and otherwise returns the
// result of show_binlog_events() run against active_mi's relay log.
1586 bool mysql_show_relaylog_events(THD* thd)
1590 DBUG_ENTER(
"mysql_show_relaylog_events");
1592 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
1594 Log_event::init_show_field_list(&field_list);
1596 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1599 if (active_mi == NULL)
1601 my_error(ER_SLAVE_CONFIGURATION, MYF(0));
1605 DBUG_RETURN(show_binlog_events(thd, &active_mi->rli->relay_log));
// Initializes the relay log info and opens the relay log.
// Visible steps:
//  - an early path rewinds cur_log and returns mts_recovery_groups(this)
//    when recovery_parallel_workers is non-zero, else 0 (the condition for
//    taking this path is not visible here);
//  - reset slave_skip_counter, log_space_limit and tables_to_lock_count;
//  - build the load-file pattern from slave_load_tmpdir;
//  - reject --relay-log / --relay-log-index values that end in a directory
//    separator;
//  - generate the relay log name, warn once when neither option was given,
//    open the index file, initialize the GTID sets, and open the relay log;
//  - check the repository; when it does not exist start from
//    BIN_LOG_HEADER_SIZE with an empty master log name, otherwise position
//    at the stored group relay log name/position (after init_recovery() when
//    is_relay_log_recovery is set);
//  - flush the info, count the relay log space, clear is_relay_log_recovery.
// On the error path error_on_rli_init_info is set, `msg` is logged and the
// relay log is closed.
1610 int Relay_log_info::rli_init_info()
1613 enum_return_check check_return= ERROR_CHECKING_REPOSITORY;
1614 const char *
msg= NULL;
1616 DBUG_ENTER(
"Relay_log_info::rli_init_info");
1629 if (error_on_rli_init_info)
1646 bool hot_log= FALSE;
1660 hot_log= relay_log.is_active(linfo.log_file_name);
1665 my_b_seek(cur_log, (my_off_t) 0);
1669 DBUG_RETURN(recovery_parallel_workers ? mts_recovery_groups(
this) : 0);
1673 slave_skip_counter= 0;
1675 log_space_limit= relay_log_space_limit;
1678 tables_to_lock_count= 0;
1680 char pattern[FN_REFLEN];
1681 (void) my_realpath(pattern, slave_load_tmpdir, 0);
1682 if (fn_format(pattern, PREFIX_SQL_LOAD, pattern,
"",
1683 MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
1685 sql_print_error(
"Unable to use slave's temporary directory '%s'.",
1689 unpack_filename(slave_patternload_file, pattern);
1690 slave_patternload_file_size= strlen(slave_patternload_file);
// NOTE(review): only a null pointer is excluded here; for an empty string
// strlen(...) - 1 wraps and the index reads out of bounds - confirm the
// option can never be "".
1709 if (opt_relay_logname &&
1710 opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
1712 sql_print_error(
"Path '%s' is a directory name, please specify \
1713 a file name for --relay-log option.", opt_relay_logname);
// NOTE(review): same empty-string concern as for opt_relay_logname above.
1719 if (opt_relaylog_index_name &&
1720 opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
1723 sql_print_error(
"Path '%s' is a directory name, please specify \
1724 a file name for --relay-log-index option.", opt_relaylog_index_name);
1728 char buf[FN_REFLEN];
// Function-static flag so the naming warning below is emitted only once.
1730 static bool name_warning_sent= 0;
1731 ln= relay_log.generate_name(opt_relay_logname,
"-relay-bin",
1734 if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
1743 sql_print_warning(
"Neither --relay-log nor --relay-log-index were used;"
1745 "may break when this MySQL server acts as a "
1746 "slave and has his hostname changed!! Please "
1747 "use '--relay-log=%s' to avoid this problem.", ln);
1748 name_warning_sent= 1;
1751 relay_log.is_relay_log= TRUE;
1753 if (relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
1755 sql_print_error(
"Failed in open_index_file() called from Relay_log_info::rli_init_info().");
1759 global_sid_lock->
wrlock();
1760 gtid_set.dbug_print(
"set of GTIDs in relay log before initialization");
1761 global_sid_lock->
unlock();
// NOTE(review): ">id_set" here and in set_previous_gtid_set() below looks
// like a garbled "&gtid_set" (gtid_set is the member used in the dbug_print
// calls around it) - confirm against the real file.
1764 relay_log.init_gtid_sets(>id_set, NULL,
1765 opt_slave_sql_verify_checksum,
1768 sql_print_error(
"Failed in init_gtid_sets() called from Relay_log_info::rli_init_info().");
1772 global_sid_lock->
wrlock();
1773 gtid_set.dbug_print(
"set of GTIDs in relay log after initialization");
1774 global_sid_lock->
unlock();
1781 relay_log.set_previous_gtid_set(>id_set);
// max_relay_log_size of 0 falls back to max_binlog_size.
1786 if (relay_log.open_binlog(ln, 0, SEQ_READ_APPEND,
1787 (max_relay_log_size ? max_relay_log_size :
1788 max_binlog_size),
true,
1791 mi->get_mi_description_event()))
1793 sql_print_error(
"Failed in open_log() called from Relay_log_info::rli_init_info().");
1803 if ((check_return= check_info()) == ERROR_CHECKING_REPOSITORY)
1805 msg=
"Error checking relay log repository";
1812 msg=
"Error reading relay log configuration";
1817 if (check_return == REPOSITORY_DOES_NOT_EXIST)
1820 if (init_relay_log_pos(NullS, BIN_LOG_HEADER_SIZE,
1828 group_master_log_name[0]= 0;
1829 group_master_log_pos= 0;
1835 msg=
"Error reading relay log configuration";
1840 if (is_relay_log_recovery && init_recovery(mi, &msg))
1846 if (init_relay_log_pos(group_relay_log_name,
1847 group_relay_log_pos,
1853 sql_print_error(
"Failed to open the relay log '%s' (relay_log_pos %s).",
1854 group_relay_log_name,
1855 llstr(group_relay_log_pos, llbuf));
1862 char llbuf1[22], llbuf2[22];
1863 DBUG_PRINT(
"info", (
"my_b_tell(cur_log)=%s event_relay_log_pos=%s",
1864 llstr(my_b_tell(cur_log),llbuf1),
1865 llstr(event_relay_log_pos,llbuf2)));
1866 DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
1867 DBUG_ASSERT((my_b_tell(cur_log) == event_relay_log_pos));
1873 error_on_rli_init_info=
false;
// NOTE(review): the message set when flush_info(TRUE) fails says "reading"
// although the failing call is a flush - confirm the wording.
1874 if (flush_info(TRUE))
1876 msg=
"Error reading relay log configuration";
1881 if (count_relay_log_space())
1883 msg=
"Error counting relay log space";
1888 is_relay_log_recovery= FALSE;
// Error path: remember the failure, report msg and close the relay log.
1894 error_on_rli_init_info=
true;
1896 sql_print_error(
"%s.", msg);
1897 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
// Shuts down the relay log info: clears error_on_rli_init_info, closes the
// currently open log file (cache and descriptor) if any, closes the relay
// log together with its index, folds the relay log's written-bytes counter
// into log_space_total, and closes the saved temporary tables.
// Any early-return guard is not visible in this view.
1901 void Relay_log_info::end_info()
1903 DBUG_ENTER(
"Relay_log_info::end_info");
1905 error_on_rli_init_info=
false;
// A non-negative descriptor means a log file is currently open.
1911 if (cur_log_fd >= 0)
1913 end_io_cache(&cache_buf);
1914 (void)my_close(cur_log_fd, MYF(MY_WME));
1918 relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
1919 relay_log.harvest_bytes_written(&log_space_total);
1925 close_temporary_tables();
// Flushes the IO cache of the relay log's current file. The handling of a
// flush failure and the returned values are not visible in this view.
1930 int Relay_log_info::flush_current_log()
1932 DBUG_ENTER(
"Relay_log_info::flush_current_log");
1937 IO_CACHE *log_file= relay_log.get_log_file();
1938 if (flush_io_cache(log_file))
1944 void Relay_log_info::set_master_info(Master_info* info)
// Relay_log_info::flush_info (name taken from the DBUG_ENTER below; the
// signature line is outside this view). Visible steps: apply
// sync_relayloginfo_period as the handler's sync period, call
// handler->flush_info(force), and log an error when writing fails.
2004 DBUG_ENTER(
"Relay_log_info::flush_info");
2015 handler->set_sync_period(sync_relayloginfo_period);
2020 if (
handler->flush_info(force))
2026 sql_print_error(
"Error writing relay log configuration.");
// Returns the number of entries in info_rli_fields, computed as the size of
// the array divided by the size of one element.
2030 size_t Relay_log_info::get_number_info_rli_fields()
2032 return sizeof(info_rli_fields)/
sizeof(info_rli_fields[0]);
// Loads the relay log info fields from the handler `from`.
// The first value read lands in group_relay_log_name and is also parsed as a
// number: if it is non-empty, consists only of digits and is at least
// LINES_IN_RELAY_LOG_INFO_WITH_DELAY, it is taken as the line count and the
// real group relay log name is read next; otherwise the record is treated as
// the old format. After that come the group relay log position, the group
// master log name and position, and - depending on `lines` - the SQL delay,
// the number of recovery workers and the internal id. Values are read into
// temporaries and copied to the members at the end.
// NOTE(review): both position temporaries are ulong; where ulong is 32 bits
// wide, larger stored positions would not fit - confirm.
2035 bool Relay_log_info::read_info(Rpl_info_handler *from)
2038 char *first_non_digit= NULL;
2039 ulong temp_group_relay_log_pos= 0;
2040 ulong temp_group_master_log_pos= 0;
2041 int temp_sql_delay= 0;
2042 int temp_internal_id= 0;
2044 DBUG_ENTER(
"Relay_log_info::read_info");
2084 if (from->prepare_info_for_read() ||
2085 from->get_info(group_relay_log_name, (
size_t)
sizeof(group_relay_log_name),
// Try to interpret the first field as a line count.
2089 lines= strtoul(group_relay_log_name, &first_non_digit, 10);
2091 if (group_relay_log_name[0]!=
'\0' &&
2092 *first_non_digit==
'\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2095 if (from->get_info(group_relay_log_name, (
size_t)
sizeof(group_relay_log_name),
2100 DBUG_PRINT(
"info", (
"relay_log_info file is in old format."));
// NOTE(review): the size passed for group_master_log_name below is
// sizeof(group_relay_log_name); harmless only if both buffers have the same
// size - confirm, or use sizeof(group_master_log_name).
2102 if (from->get_info((ulong *) &temp_group_relay_log_pos,
2103 (ulong) BIN_LOG_HEADER_SIZE) ||
2104 from->get_info(group_master_log_name,
2105 (
size_t)
sizeof(group_relay_log_name),
2107 from->get_info((ulong *) &temp_group_master_log_pos,
2111 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
2113 if (from->get_info((
int *) &temp_sql_delay, (
int) 0))
2117 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_WORKERS)
2119 if (from->get_info(&recovery_parallel_workers,(ulong) 0))
2123 if (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID)
2125 if (from->get_info(&temp_internal_id, (
int) 1))
// Publish the values read into temporaries.
2129 group_relay_log_pos= temp_group_relay_log_pos;
2130 group_master_log_pos= temp_group_master_log_pos;
2131 sql_delay= (int32) temp_sql_delay;
2132 internal_id= (uint) temp_internal_id;
2134 DBUG_ASSERT(lines < LINES_IN_RELAY_LOG_INFO_WITH_ID ||
2135 (lines >= LINES_IN_RELAY_LOG_INFO_WITH_ID && internal_id == 1));
// Stores the relay log info fields through the handler `to`, in this order:
// the line count LINES_IN_RELAY_LOG_INFO_WITH_ID, group relay log name and
// position, group master log name and position, SQL delay, number of
// recovery workers, and the internal id. The chain stops at the first call
// that returns non-zero; what is returned in either case is not visible in
// this view.
// NOTE(review): both positions are cast to ulong before being stored; where
// ulong is 32 bits wide, larger positions would be truncated - confirm.
2139 bool Relay_log_info::write_info(Rpl_info_handler *
to)
2141 DBUG_ENTER(
"Relay_log_info::write_info");
2149 if (to->prepare_info_for_write() ||
2150 to->set_info((
int) LINES_IN_RELAY_LOG_INFO_WITH_ID) ||
2151 to->set_info(group_relay_log_name) ||
2152 to->set_info((ulong) group_relay_log_pos) ||
2153 to->set_info(group_master_log_name) ||
2154 to->set_info((ulong) group_master_log_pos) ||
2155 to->set_info((
int) sql_delay) ||
2156 to->set_info(recovery_parallel_workers) ||
2157 to->set_info((
int) internal_id))
2180 DBUG_ASSERT(!info_thd || !is_mts_worker(info_thd) || !fe);
2184 adapt_to_master_version(fe);
2185 if (info_thd && is_parallel_exec())
2187 for (uint i= 0; i < workers.elements; i++)
2189 Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
2191 if (w->running_status == Slave_worker::RUNNING)
2192 w->set_rli_description_event(fe);
2197 delete rli_description_event;
2198 rli_description_event= fe;
2211 WL6292_TIMESTAMP_EXPLICIT_DEFAULT= 0,
2217 uchar version_split[3];
2222 void (*upgrade) (THD*);
2227 void (*downgrade) (THD*);
// Upgrade hook: sets the session's explicit_defaults_for_timestamp to false
// and then to true when the global variable is set. With only the visible
// lines, the session value ends up equal to the global one.
2230 void wl6292_upgrade_func(THD *thd)
2232 thd->variables.explicit_defaults_for_timestamp=
false;
2233 if (global_system_variables.explicit_defaults_for_timestamp)
2234 thd->variables.explicit_defaults_for_timestamp=
true;
// Downgrade hook: when the global explicit_defaults_for_timestamp is set,
// forces the session's explicit_defaults_for_timestamp to false; otherwise
// the session value is left untouched by the visible lines.
2239 void wl6292_downgrade_func(THD *thd)
2241 if (global_system_variables.explicit_defaults_for_timestamp)
2242 thd->variables.explicit_defaults_for_timestamp=
false;
2254 { st_feature_version::WL6292_TIMESTAMP_EXPLICIT_DEFAULT,
2255 {5, 6, 6}, wl6292_upgrade_func, wl6292_downgrade_func },
2256 { st_feature_version::_END_OF_LIST,
2257 {255, 255, 255}, NULL, NULL }
// NOTE(review): the signature line is outside this view; this appears to be
// the body of adapt_to_master_version() - confirm the enclosing function.
// Visible logic: `changed` is 0 when fdle or rli_description_event is null;
// otherwise it is derived from an expression involving the current
// description event's version product (partly outside this view). A negative
// value means a downgrade. The loop over s_features picks the index range
// [i_first, i_last) of features whose version lies between the two versions,
// and the final loop calls each feature's downgrade hook (on downgrade, when
// set) or else its upgrade hook (when set).
2290 ulong master_version, current_version;
2291 int changed= !fdle || ! rli_description_event ? 0 :
2293 (current_version= rli_description_event->get_version_product());
2304 bool downgrade= changed < 0;
2305 long i, i_first= st_feature_version::_END_OF_LIST, i_last= i_first;
2307 for (i= 0; i < st_feature_version::_END_OF_LIST; i++)
2309 ulong ver_f= version_product(s_features[i].version_split);
// The lower of the two versions decides where the affected range starts.
2311 if ((downgrade ? master_version : current_version) < ver_f &&
2312 i_first == st_feature_version::_END_OF_LIST)
2314 if ((downgrade ? current_version : master_version) < ver_f)
2317 DBUG_ASSERT(i_last >= i_first);
2325 for (i= i_first; i < i_last; i++)
2329 version_product(s_features[i - 1].version_split) <=
2330 version_product(s_features[i].version_split));
// NOTE(review): in the second half of the assert below, `>=` binds tighter
// than `?:`, so it evaluates as
//   downgrade ? current_version : (master_version >= version_product(...))
// and the comparison is skipped on downgrade. The first half parenthesizes
// the conditional before comparing; presumably the same was intended here.
2332 DBUG_ASSERT((downgrade ? master_version : current_version) <
2333 version_product(s_features[i].version_split) &&
2334 (downgrade ? current_version : master_version >=
2335 version_product(s_features[i].version_split)));
2337 if (downgrade && s_features[i].downgrade)
2339 s_features[
i].downgrade(thd);
2341 else if (s_features[i].upgrade)
2343 s_features[
i].upgrade(thd);