19 #include "sql_parse.h"
20 #include "global_threads.h"
21 #ifdef HAVE_REPLICATION
25 #include "rpl_filter.h"
27 #include "rpl_handler.h"
28 #include "rpl_master.h"
/*
  Debugging knobs for the binlog dump path. mysql_binlog_send() copies
  max_binlog_dump_events into a local event budget and aborts the dump once
  it is exhausted; opt_sporadic_binlog_dump_fail makes the dump fail on
  alternating calls.
*/
31 int max_binlog_dump_events = 0;
32 my_bool opt_sporadic_binlog_dump_fail = 0;
/* Incremented by the sporadic-failure check in mysql_binlog_send(). */
35 static int binlog_dump_count = 0;
/* Passed to my_hash_init() when slave_list is created in init_slave_list(). */
38 #define SLAVE_LIST_CHUNK 128
/* Buffer-size constant; no use of it is visible in this excerpt. */
39 #define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
/* Typelib consulted by get_binlog_checksum_value_at_connect() via find_type(). */
41 extern TYPELIB binlog_checksum_typelib;
/*
  get_object(p, obj, msg): helper macro used by register_slave() to pull one
  field out of the registration packet. Only fragments of the macro are
  visible here: it rejects the field when p + len runs past p_end or when len
  does not fit in obj, it can raise ER_MALFORMED_PACKET, and it copies len
  bytes from p into obj with strmake().
  NOTE(review): how len is obtained and how msg is used is not visible in
  this excerpt - confirm against the full macro.
*/
44 #define get_object(p, obj, msg) \
49 my_error(ER_MALFORMED_PACKET, MYF(0)); \
54 if (p + len > p_end || len >= sizeof(obj)) \
59 strmake(obj,(char*) p,len); \
64 *slave_list_key(SLAVE_INFO* si,
/*
  Key-extraction callback registered for the slave_list hash in
  init_slave_list(): the key of an entry is its server_id field, whose
  address is returned. The third parameter is explicitly unused.
  NOTE(review): the assignment to *len is not visible in this excerpt.
*/
size_t *len,
65 my_bool not_used __attribute__((unused)))
68 return &si->server_id;
/*
  Free callback registered for slave_list entries (see the my_hash_init()
  call in init_slave_list()). Declared with C linkage. The body is not
  visible in this excerpt.
*/
71 extern "C" void slave_info_free(
void *s)
/*
  Performance-schema instrumentation data for LOCK_slave_list, compiled only
  when HAVE_PSI_INTERFACE is defined: the mutex key and the one-element info
  table that names it "LOCK_slave_list" with the PSI_FLAG_GLOBAL flag.
*/
76 #ifdef HAVE_PSI_INTERFACE
77 static PSI_mutex_key key_LOCK_slave_list;
79 static PSI_mutex_info all_slave_list_mutexes[]=
81 { &key_LOCK_slave_list,
"LOCK_slave_list", PSI_FLAG_GLOBAL}
/*
  Called from init_slave_list() when HAVE_PSI_INTERFACE is defined. The
  visible line computes the number of entries in all_slave_list_mutexes.
  NOTE(review): the registration call that consumes count is not visible in
  this excerpt - presumably it registers the table with the performance
  schema; confirm.
*/
84 static void init_all_slave_list_mutexes(
void)
88 count= array_elements(all_slave_list_mutexes);
/*
  Set up the registry of connected slaves: optionally initialise the PSI
  mutex instrumentation, create the slave_list hash (keyed through
  slave_list_key, entries released through slave_info_free) and initialise
  LOCK_slave_list.
*/
93 void init_slave_list()
95 #ifdef HAVE_PSI_INTERFACE
96 init_all_slave_list_mutexes();
99 my_hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0,
100 (my_hash_get_key) slave_list_key,
101 (my_hash_free_key) slave_info_free, 0);
102 mysql_mutex_init(key_LOCK_slave_list, &LOCK_slave_list, MY_MUTEX_INIT_FAST);
/*
  Tear down the slave registry: the hash is freed only if it was actually
  initialised, so calling this without a prior init_slave_list() is guarded.
  NOTE(review): destruction of LOCK_slave_list is not visible in this excerpt.
*/
105 void end_slave_list()
108 if (my_hash_inited(&slave_list))
110 my_hash_free(&slave_list);
/*
  Register the connecting slave in slave_list from a registration packet.

  thd            session of the connecting slave; its server_id is
                 overwritten from the packet
  packet         start of the raw packet payload
  packet_length  number of bytes available at packet

  Visible steps: check the REPL_SLAVE_ACL privilege, allocate a SLAVE_INFO,
  decode server id, report-host/user/password, port and master id from the
  packet, drop any previous registration for the same server id and insert
  the new entry. Failures are reported with ER_MALFORMED_PACKET or with
  errmsg under ER_UNKNOWN_ERROR.
  NOTE(review): the return statements and the locking around slave_list are
  not visible in this excerpt.
*/
124 int register_slave(THD* thd, uchar* packet, uint packet_length)
128 uchar *p= packet, *p_end= packet + packet_length;
129 const char *errmsg=
"Wrong parameters to function register_slave";
/* The caller must hold the replication-slave privilege. */
131 if (
check_access(thd, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
133 if (!(si = (SLAVE_INFO*)my_malloc(
sizeof(SLAVE_INFO), MYF(MY_WME))))
139 my_error(ER_MALFORMED_PACKET, MYF(0));
/* The server id from the packet is stored in both the session and the entry. */
144 thd->server_id= si->server_id= uint4korr(p);
/* Each get_object() bounds-checks the field against p_end and the target buffer. */
146 get_object(p,si->host,
"Failed to register slave: too long 'report-host'");
147 get_object(p,si->user,
"Failed to register slave: too long 'report-user'");
148 get_object(p,si->password,
"Failed to register slave; too long 'report-password'");
151 si->port= uint2korr(p);
/* A zero master id in the packet is replaced by this server's own server_id. */
160 if (!(si->master_id= uint4korr(p)))
161 si->master_id= server_id;
/*
  Remove any existing entry for this server id regardless of owner
  (only_mine=false) and without taking the list lock inside the callee
  (need_lock_slave_list=false), then insert the new entry.
*/
165 unregister_slave(thd,
false,
false);
166 res= my_hash_insert(&slave_list, (uchar*) si);
172 my_message(ER_UNKNOWN_ERROR, errmsg, MYF(0));
/*
  Remove the slave_list entry whose key equals thd->server_id.

  thd                    session whose server_id identifies the entry
  only_mine              when true, delete only if the entry's thd pointer
                         is this same session
  need_lock_slave_list   when true, the function brackets the operation
                         with its own locking (the lock/unlock calls are
                         not visible in this excerpt)
*/
177 void unregister_slave(THD* thd,
bool only_mine,
bool need_lock_slave_list)
181 if (need_lock_slave_list)
/* The hash key is the 4-byte server id. */
187 if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list,
188 (uchar*)&thd->server_id, 4)) &&
189 (!only_mine || old_si->thd == thd))
190 my_hash_delete(&slave_list, (uchar*)old_si);
192 if (need_lock_slave_list)
/*
  Send one result row per entry of slave_list to the client.

  Each row carries server id, host, optionally user and password (only when
  opt_show_slave_auth_info is set), port, master id and the slave's UUID as
  obtained through get_slave_uuid().
  NOTE(review): the result-set metadata, the locking of slave_list and the
  return values are not visible in this excerpt.
*/
207 bool show_slave_hosts(THD* thd)
211 DBUG_ENTER(
"show_slave_hosts");
216 if (opt_show_slave_auth_info)
227 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
/* Walk the hash by element index. */
232 for (uint
i = 0;
i < slave_list.records; ++
i)
234 SLAVE_INFO* si = (SLAVE_INFO*) my_hash_element(&slave_list,
i);
235 protocol->prepare_for_resend();
236 protocol->
store((uint32) si->server_id);
237 protocol->
store(si->host, &my_charset_bin);
/* Credentials are exposed only when explicitly enabled. */
238 if (opt_show_slave_auth_info)
240 protocol->
store(si->user, &my_charset_bin);
241 protocol->
store(si->password, &my_charset_bin);
243 protocol->
store((uint32) si->port);
244 protocol->
store((uint32) si->master_id);
/* The UUID column is filled only when get_slave_uuid() reports a value. */
248 if (get_slave_uuid(si->thd, &slave_uuid))
249 protocol->
store(slave_uuid.c_ptr_safe(), &my_charset_bin);
250 if (protocol->write())
/*
  Recompute the checksum of the event that starts at ev_offset inside packet.
  mysql_binlog_send() calls this after patching header fields of a
  Format_description event, when a checksum algorithm is active.

  packet     buffer holding the event
  ev_offset  byte offset of the event within packet

  The event length is read from the event's own EVENT_LEN_OFFSET field and a
  checksum is computed with my_checksum() over the event bytes.
  NOTE(review): the tail of the my_checksum() call and the store of the
  result are not visible in this excerpt.
*/
266 inline void fix_checksum(
String *packet, ulong ev_offset)
269 uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
/* Seed value obtained from my_checksum() with an empty input. */
270 ha_checksum crc= my_checksum(0L, NULL, 0);
271 DBUG_ASSERT(data_len ==
272 LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
274 crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset, data_len -
/*
  Look up a user variable in thd->user_vars by name and return its entry, as
  returned by my_hash_search().
  NOTE(review): the definition of name (the variable being looked up) and the
  return statement are not visible in this excerpt.
*/
280 static user_var_entry * get_binlog_checksum_uservar(THD * thd)
283 user_var_entry *
entry=
284 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
/*
  Tell whether the slave connection announced a binlog checksum setting.

  Returns true exactly when get_binlog_checksum_uservar() finds an entry for
  this session, false otherwise.
*/
298 static bool is_slave_checksum_aware(THD * thd)
300 DBUG_ENTER(
"is_slave_checksum_aware");
301 user_var_entry *entry= get_binlog_checksum_uservar(thd);
302 DBUG_RETURN(entry?
true :
false);
/*
  Translate the session's checksum user variable into a checksum algorithm
  code.

  The result can be BINLOG_CHECKSUM_ALG_UNDEF (the guarding condition is not
  visible in this excerpt; presumably the no-entry case - confirm).
  Otherwise the variable, asserted to be a string, is copied and looked up
  in binlog_checksum_typelib; find_type()'s result minus one is the value.
*/
319 static uint8 get_binlog_checksum_value_at_connect(THD * thd)
323 DBUG_ENTER(
"get_binlog_checksum_value_at_connect");
324 user_var_entry *entry= get_binlog_checksum_uservar(thd);
327 ret= BINLOG_CHECKSUM_ALG_UNDEF;
331 DBUG_ASSERT(entry->type() == STRING_RESULT);
334 str.copy(entry->ptr(), entry->length(), &my_charset_bin, &my_charset_bin,
336 ret= (uint8) find_type ((
char*) str.ptr(), &binlog_checksum_typelib, 1) - 1;
/* No algorithm code above CRC32 is expected here. */
337 DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32);
/*
  Build a Rotate event in packet and send it to the slave.

  net               connection to write to
  packet            buffer the event is appended to
  log_file_name     binlog file name; only the part after the directory is
                    sent
  position          position stored in the event body at R_POS_OFFSET
  errmsg            set to "failed on my_net_write()" when the write fails
  checksum_alg_arg  checksum algorithm; a checksum is produced unless it is
                    OFF or UNDEF

  NOTE(review): the return statements and part of the header set-up are not
  visible in this excerpt.
*/
360 static int fake_rotate_event(
NET* net,
String* packet,
char* log_file_name,
361 ulonglong position,
const char** errmsg,
362 uint8 checksum_alg_arg)
364 DBUG_ENTER(
"fake_rotate_event");
365 char header[LOG_EVENT_HEADER_LEN],
buf[ROTATE_HEADER_LEN+100];
373 my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
374 checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
/* The first four header bytes are zeroed before the type byte is set. */
380 memset(header, 0, 4);
381 header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
/* Strip the directory part; only the base file name goes on the wire. */
383 char* p = log_file_name+dirname_length(log_file_name);
384 uint ident_len = (uint) strlen(p);
385 ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
387 int4store(header + SERVER_ID_OFFSET, server_id);
388 int4store(header + EVENT_LEN_OFFSET, event_len);
/* The log position field of this event is stored as zero. */
392 int4store(header + LOG_POS_OFFSET, 0);
/* Wire layout: common header, rotate header (position), file name. */
394 packet->append(header,
sizeof(header));
395 int8store(buf+R_POS_OFFSET,position);
396 packet->append(buf, ROTATE_HEADER_LEN);
397 packet->append(p, ident_len);
/* Checksum computed over exactly the three pieces appended above. */
402 ha_checksum crc= my_checksum(0L, NULL, 0);
403 crc= my_checksum(crc, (uchar*)header,
sizeof(header));
404 crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
405 crc= my_checksum(crc, (uchar*)p, ident_len);
407 packet->append(b,
sizeof(b));
410 if (
my_net_write(net, (uchar*) packet->ptr(), packet->length()))
412 *errmsg =
"failed on my_net_write()";
/*
  Reset thd->packet for the next event and let plugins reserve header space.

  thd        session whose packet buffer is reset
  flags      passed through to the reserve_header hook
  ev_offset  out: length of the packet after the reset and hook, i.e. the
             offset at which the event data will start
  errmsg     out: set when the reserve_header hook fails

  On hook failure my_errno is set to ER_UNKNOWN_ERROR.
  NOTE(review): the declaration of ret and the return statement are not
  visible in this excerpt.
*/
424 static int reset_transmit_packet(THD *thd, ushort
flags,
425 ulong *ev_offset,
const char **errmsg)
428 String *packet= &thd->packet;
/* The packet is reset to a single zero byte. */
432 packet->set(
"\0", 1, &my_charset_bin);
434 if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
436 *errmsg=
"Failed to run hook 'reserve_header'";
437 my_errno= ER_UNKNOWN_ERROR;
440 *ev_offset= packet->length();
441 DBUG_PRINT(
"info", (
"rpl_master.cc:reset_transmit_packet returns %d", ret));
/*
  Send the contents of a file named by the peer over thd's connection.
  mysql_binlog_send() calls this after sending a LOAD_EVENT.

  The visible steps: switch the read timeout to net_wait_timeout, take the
  file name from the incoming packet (NUL-terminating it in place and
  normalising it with fn_format()), treat "/dev/null" specially, open the
  file read-only, and restore the original read timeout at the end. Each
  failure stage records a short errmsg which is logged with
  sql_print_error().
  NOTE(review): the read and write loops, the close of fd and the return
  value handling are not visible in this excerpt.
*/
445 static int send_file(THD *thd)
447 NET* net = &thd->net;
448 int fd = -1, error = 1;
450 char fname[FN_REFLEN+1];
451 const char *errmsg = 0;
453 unsigned long packet_len;
455 DBUG_ENTER(
"send_file");
/* Remember the current read timeout so it can be restored before returning. */
461 old_timeout= net->read_timeout;
462 my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
470 errmsg =
"while reading file name";
/* Terminate the received name in the network buffer. */
475 *((
char*)net->read_pos + packet_len) = 0;
/* The name starts one byte into the packet. */
476 fn_format(fname, (
char*) net->read_pos + 1,
"",
"", 4);
478 if (!strcmp(fname,
"/dev/null"))
482 fname, O_RDONLY, MYF(0))) < 0)
484 errmsg =
"on open of file";
492 errmsg =
"while writing data to client";
501 errmsg =
"while negotiating file transfer close";
507 my_net_set_read_timeout(net, old_timeout);
512 sql_print_error(
"Failed in send_file() %s", errmsg);
513 DBUG_PRINT(
"error", (
"%s", errmsg));
/*
  Classify the result of a binlog read.

  error   status returned by Log_event::read_log_event()
  errmsg  out: human-readable description for non-EOF failures

  LOG_READ_EOF is handled separately from the other codes. For the others,
  my_errno is set to ER_MASTER_FATAL_ERROR_READING_BINLOG and *errmsg is
  chosen per error code, with a generic message as fallback.
  NOTE(review): most case labels and the return values are not visible in
  this excerpt.
*/
519 int test_for_non_eof_log_read_errors(
int error,
const char **errmsg)
521 if (error == LOG_READ_EOF)
523 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
526 *errmsg =
"bogus data in log event";
528 case LOG_READ_TOO_LARGE:
529 *errmsg =
"log event entry exceeded max_allowed_packet; \
530 Increase max_allowed_packet on master";
533 *errmsg =
"I/O error reading log event";
536 *errmsg =
"memory allocation failed reading log event";
539 *errmsg =
"binlog truncated in the middle of event; consider out of disk space on master";
541 case LOG_READ_CHECKSUM_FAILURE:
542 *errmsg =
"event read from binlog did not pass crc check";
545 *errmsg =
"unknown error reading log event on the master";
/*
  Return the heartbeat period requested by the slave connection.

  The value is read from the session user variable
  "master_heartbeat_period"; 0 is returned when the variable is not set.
  mysql_binlog_send() passes the result to set_timespec_nsec().
*/
561 static ulonglong get_heartbeat_period(THD * thd)
564 LEX_STRING name= { C_STRING_WITH_LEN(
"master_heartbeat_period")};
565 user_var_entry *entry=
566 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
568 return entry? entry->val_int(&null_value) : 0;
/*
  Build a Heartbeat event in packet and send it to the slave.

  net               connection to write to
  packet            buffer the event is appended to
  coord             (parameter line not visible in this excerpt) supplies
                    the file name and position carried by the event
  checksum_alg_arg  checksum algorithm; a checksum is produced unless it is
                    OFF or UNDEF

  The event consists of the common header followed by the base file name.
  NOTE(review): the remainder of the send condition and the return
  statements are not visible in this excerpt.
*/
585 static int send_heartbeat_event(
NET* net,
String* packet,
587 uint8 checksum_alg_arg)
589 DBUG_ENTER(
"send_heartbeat_event");
590 char header[LOG_EVENT_HEADER_LEN];
591 my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
592 checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
597 memset(header, 0, 4);
599 header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
/* Only the file name without its directory part is sent. */
601 char* p= coord->file_name + dirname_length(coord->file_name);
603 uint ident_len = strlen(p);
604 ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
606 int4store(header + SERVER_ID_OFFSET, server_id);
607 int4store(header + EVENT_LEN_OFFSET, event_len);
608 int2store(header + FLAGS_OFFSET, 0);
/* The header's log position field carries the coordinate's position. */
610 int4store(header + LOG_POS_OFFSET, coord->pos);
612 packet->append(header,
sizeof(header));
613 packet->append(p, ident_len);
/* Checksum computed over the header and the file name appended above. */
618 ha_checksum crc= my_checksum(0L, NULL, 0);
619 crc= my_checksum(crc, (uchar*) header,
sizeof(header));
620 crc= my_checksum(crc, (uchar*) p, ident_len);
622 packet->append(b,
sizeof(b));
625 if (
my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
/*
  Send a heartbeat carrying last_skip_coord without losing the event that is
  currently held in packet.

  The current packet contents and *ev_offset are set aside, a fresh packet is
  prepared with reset_transmit_packet(), the heartbeat is sent, and then the
  saved packet and offset are put back.

  On heartbeat send failure *errmsg is set and my_errno becomes
  ER_UNKNOWN_ERROR.
  NOTE(review): the last_skip_coord and ev_offset parameter lines and the
  return statements are not visible in this excerpt.
*/
653 static int send_last_skip_group_heartbeat(THD *thd,
NET* net,
String *packet,
656 uint8 checksum_alg_arg,
const char **errmsg)
658 DBUG_ENTER(
"send_last_skip_group_heartbeat");
/*
  NOTE(review): reset_transmit_packet() takes ev_offset as ulong*; keeping
  the saved value in an int narrows it - confirm this is intended.
*/
660 int save_offset= *ev_offset;
/* Park the pending event in save_packet while the heartbeat uses packet. */
663 save_packet.swap(*packet);
665 if (reset_transmit_packet(thd, 0, ev_offset, errmsg))
669 if (send_heartbeat_event(net, packet, last_skip_coord, checksum_alg_arg))
671 *errmsg=
"Failed on my_net_write()";
672 my_errno= ER_UNKNOWN_ERROR;
/* Restore the pending event and its offset. */
677 packet->swap(save_packet);
678 *ev_offset= save_offset;
680 DBUG_PRINT(
"info", (
"rpl_master.cc:send_last_skip_group_heartbeat returns 0"));
/*
  Packet-decoding helpers used by com_binlog_dump() and
  com_binlog_dump_gtid(). They operate on the caller's local variables
  packet_position and packet_bytes_todo and jump to the caller's
  error_malformed_packet label on failure.

  CHECK_PACKET_SIZE(BYTES)  fail unless at least BYTES bytes remain
  READ(DECODE, BYTES)       check the size, then advance the position and
                            shrink the remaining byte count by BYTES
                            (the line that evaluates DECODE is not visible
                            in this excerpt)
  SKIP(BYTES)               READ with a no-op decode expression
*/
689 #define CHECK_PACKET_SIZE(BYTES) \
691 if (packet_bytes_todo < BYTES) \
692 goto error_malformed_packet; \
702 #define READ(DECODE, BYTES) \
704 CHECK_PACKET_SIZE(BYTES); \
706 packet_position+= BYTES; \
707 packet_bytes_todo-= BYTES; \
710 #define SKIP(BYTES) READ((void)(0), BYTES)
/*
  READ_INT(VAR, BYTES): decode a BYTES-wide integer with the matching
  uint<BYTES>korr() helper into VAR.
*/
717 #define READ_INT(VAR, BYTES) \
718 READ(VAR= uint ## BYTES ## korr(packet_position), BYTES)
/*
  READ_STRING(VAR, BYTES, BUFFER_SIZE): copy BYTES bytes into VAR, failing
  when BYTES is not strictly smaller than BUFFER_SIZE.
*/
725 #define READ_STRING(VAR, BYTES, BUFFER_SIZE) \
727 if (BUFFER_SIZE <= BYTES) \
728 goto error_malformed_packet; \
729 READ(memcpy(VAR, packet_position, BYTES), BYTES); \
734 bool com_binlog_dump(THD *thd,
/*
  Handle a binlog dump request that names a file and position.

  thd            session of the requesting slave
  packet         raw request payload
  packet_length  number of bytes at packet

  The request is decoded with the READ_* macros; a short packet jumps to
  error_malformed_packet, which raises ER_MALFORMED_PACKET. After decoding,
  other dump threads for the same slave UUID are killed, the request is
  written to the general log, mysql_binlog_send() streams the binlog, and
  the slave's own registration is removed afterwards.
  NOTE(review): the decoding of pos and the return statements are not
  visible in this excerpt; the file name is taken at packet + 10.
*/
char *packet, uint packet_length)
736 DBUG_ENTER(
"com_binlog_dump");
739 const uchar* packet_position= (uchar *) packet;
740 uint packet_bytes_todo= packet_length;
742 status_var_increment(thd->status_var.com_other);
743 thd->enable_slow_log= opt_log_slow_admin_statements;
754 READ_INT(thd->server_id, 4);
756 get_slave_uuid(thd, &slave_uuid);
757 kill_zombie_dump_threads(&slave_uuid);
759 general_log_print(thd, thd->get_command(),
"Log: '%s' Pos: %ld",
760 packet + 10, (long) pos);
/* No GTID set is passed here (last argument NULL). */
761 mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, NULL);
/* Remove only this session's own entry, taking the list lock in the callee. */
763 unregister_slave(thd,
true,
true);
767 error_malformed_packet:
768 my_error(ER_MALFORMED_PACKET, MYF(0));
/*
  Handle a binlog dump request that carries the slave's executed GTID set.

  thd            session of the requesting slave
  packet         raw request payload
  packet_length  number of bytes at packet

  Decodes server id, a length-prefixed file name (bounded by sizeof(name)),
  and a length-prefixed encoded GTID set which is loaded into
  slave_gtid_executed. Any short or oversized field jumps to
  error_malformed_packet, which raises ER_MALFORMED_PACKET. After decoding,
  other dump threads for the same slave UUID are killed, the request is
  written to the general log, mysql_binlog_send() streams the binlog with
  the GTID set, and the slave's own registration is removed afterwards.
  NOTE(review): the decoding of pos and flags, the handling of an
  add_gtid_encoding() failure and the return statements are not visible in
  this excerpt.
*/
773 bool com_binlog_dump_gtid(THD *thd,
char *packet, uint packet_length)
775 DBUG_ENTER(
"com_binlog_dump_gtid");
783 char name[FN_REFLEN + 1];
785 char* gtid_string= NULL;
786 const uchar* packet_position= (uchar *) packet;
787 uint packet_bytes_todo= packet_length;
789 Gtid_set slave_gtid_executed(&sid_map);
791 status_var_increment(thd->status_var.com_other);
792 thd->enable_slow_log= opt_log_slow_admin_statements;
797 READ_INT(thd->server_id, 4);
798 READ_INT(name_size, 4);
799 READ_STRING(name, name_size,
sizeof(name));
801 READ_INT(data_size, 4);
/* Make sure the whole encoded GTID set is present before decoding it. */
802 CHECK_PACKET_SIZE(data_size);
803 if (slave_gtid_executed.add_gtid_encoding(packet_position, data_size) !=
/* Textual form of the set, used for logging only and freed below. */
806 gtid_string= slave_gtid_executed.to_string();
807 DBUG_PRINT(
"info", (
"Slave %d requested to read %s at position %llu gtid set "
808 "'%s'.", thd->server_id, name, pos, gtid_string));
810 get_slave_uuid(thd, &slave_uuid);
811 kill_zombie_dump_threads(&slave_uuid);
812 general_log_print(thd, thd->get_command(),
"Log: '%s' Pos: %llu GTIDs: '%s'",
813 name, pos, gtid_string);
814 my_free(gtid_string);
815 mysql_binlog_send(thd, name, (my_off_t) pos, &slave_gtid_executed);
/* Remove only this session's own entry, taking the list lock in the callee. */
817 unregister_slave(thd,
true,
true);
821 error_malformed_packet:
822 my_error(ER_MALFORMED_PACKET, MYF(0));
/*
  Stream binary log events to a connected slave.

  thd                  session of the dump thread
  log_ident            name of the binlog file the slave asked for
  pos                  position to start from in that file
  slave_gtid_executed  GTID set already executed by the slave, or NULL;
                       a non-NULL value switches on the GTID protocol

  Outline, as far as visible in this excerpt:
    - validate the environment (binlog open, server id configured, requested
      position inside the file) and run the transmit_start hook;
    - send a fake Rotate event, then the Format_description event of the
      starting file when the start position is past the file header;
    - loop reading events, optionally skipping groups the slave already has
      (GTID protocol), and write the rest with my_net_write(), surrounded by
      the before_send_event / after_send_event hooks;
    - at the end of the active binlog, wait for new data and send heartbeats
      on timeout;
    - at the end of a non-active binlog, switch to the next file and send
      another fake Rotate event;
    - on exit restore the diagnostics area, current_linfo and
      max_allowed_packet; on error report my_errno with a descriptive text.

  Errors are signalled through my_errno and errmsg and reported with
  my_message(); the function itself returns void.
  NOTE(review): many statements, labels and braces are not visible in this
  excerpt; the comments below describe only the visible lines.
*/
827 void mysql_binlog_send(THD* thd,
char* log_ident, my_off_t pos,
828 const Gtid_set* slave_gtid_executed)
836 DBUG_PRINT("info", ("mysql_binlog_send fails; goto err from line %d", \
841 char *log_file_name = linfo.log_file_name;
842 char search_file_name[FN_REFLEN], *
name;
/* The GTID protocol is in use exactly when a GTID set was supplied. */
845 bool using_gtid_protocol= slave_gtid_executed != NULL;
846 bool searching_first_gtid= using_gtid_protocol;
847 bool skip_group=
false;
848 bool binlog_has_previous_gtids_log_event=
false;
853 String* packet = &thd->packet;
855 const char *errmsg =
"Unknown error";
856 char error_text[MAX_SLAVE_ERRMSG];
857 NET* net = &thd->net;
860 uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
/* Event budget for the debugging abort controlled by max_binlog_dump_events. */
864 int left_events = max_binlog_dump_events;
/*
  Saved so it can be restored on exit; the limit is raised further down.
  NOTE(review): the declared type of variables.max_allowed_packet is not
  visible here - confirm that int does not narrow it.
*/
866 int old_max_allowed_packet= thd->variables.max_allowed_packet;
/* A temporary diagnostics area is installed; saved_da is put back on exit. */
878 thd->set_stmt_da(&temp_da);
880 DBUG_ENTER(
"mysql_binlog_send");
881 DBUG_PRINT(
"enter",(
"log_ident: '%s' pos: %ld", log_ident, (
long) pos));
883 memset(&log, 0,
sizeof(log));
/* Heartbeat period requested by the slave; 0 means no heartbeats. */
887 ulonglong heartbeat_period= get_heartbeat_period(thd);
889 struct timespec *heartbeat_ts= NULL;
891 *p_start_coord= &start_coord;
/* Coordinates of the last event read; reported in error text and heartbeats. */
892 LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
893 *p_coord= &coord_buf;
900 bool last_skip_group= skip_group;
902 char last_skip_log_name[FN_REFLEN+1];
/* Coordinates of the last skipped event, with their own file-name buffer. */
904 LOG_POS_COORD last_skip_coord_buf= {last_skip_log_name, BIN_LOG_HEADER_SIZE},
905 *p_last_skip_coord= &last_skip_coord_buf;
907 if (heartbeat_period != LL(0))
909 heartbeat_ts= &heartbeat_buf;
910 set_timespec_nsec(*heartbeat_ts, 0);
912 if (log_warnings > 1)
913 sql_print_information(
"Start binlog_dump to master_thread_id(%lu) slave_server(%d), pos(%s, %lu)",
914 thd->thread_id, thd->server_id, log_ident, (ulong)pos);
915 if (RUN_HOOK(binlog_transmit, transmit_start, (thd, 0, log_ident, pos)))
917 errmsg=
"Failed to run hook 'transmit_start'";
918 my_errno= ER_UNKNOWN_ERROR;
/* Test aid: fail on alternating calls when the option is enabled. */
923 if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
925 errmsg =
"Master fails in COM_BINLOG_DUMP because of --opt-sporadic-binlog-dump-fail";
926 my_errno= ER_UNKNOWN_ERROR;
/* Preconditions: binary logging must be on and a server id configured. */
931 if (!mysql_bin_log.is_open())
933 errmsg =
"Binary log is not open";
934 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
937 if (!server_id_supplied)
939 errmsg =
"Misconfigured master - server_id was not set";
940 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
944 name= search_file_name;
949 if (using_gtid_protocol)
955 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
963 linfo.index_file_offset= 0;
967 errmsg =
"Could not find first log file name in binary log index file";
968 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
/* Published on the session while the dump runs; cleared again on exit. */
973 thd->current_linfo = &linfo;
978 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
/* The requested start position must lie within [header size, file length]. */
981 if (pos < BIN_LOG_HEADER_SIZE)
983 errmsg=
"Client requested master to start replication from position < 4";
984 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
987 if (pos > my_b_filelength(&log))
989 errmsg=
"Client requested master to start replication from position > file size";
990 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
995 if (reset_transmit_packet(thd, 0, &ev_offset, &errmsg))
/*
  First event on the wire: a fake Rotate naming the starting file and
  position, using the checksum setting the slave announced at connect.
*/
1026 if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
1027 get_binlog_checksum_value_at_connect(current_thd)))
1034 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
/* Raise the packet limit for the dump; the old value is restored on exit. */
1043 thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
1051 log_lock= mysql_bin_log.get_log_lock();
1052 log_cond= mysql_bin_log.get_log_cond();
/*
  When starting past the file header, the event at the start of the file is
  read first and, if it is a Format_description event, sent to the slave.
*/
1053 if (pos > BIN_LOG_HEADER_SIZE)
1057 if (reset_transmit_packet(thd, 0, &ev_offset, &errmsg))
1064 if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0)))
1066 DBUG_PRINT(
"info", (
"read_log_event returned 0 on line %d", __LINE__));
1073 (
"Looked for a Format_description_log_event, found event type %s",
1075 if ((*packet)[EVENT_TYPE_OFFSET + ev_offset] == FORMAT_DESCRIPTION_EVENT)
1078 packet->length() - ev_offset);
1079 DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
1080 current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
1081 current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
/* Refuse to send checksummed events to a slave that did not announce support. */
1082 if (!is_slave_checksum_aware(thd) &&
1083 current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1084 current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1086 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1087 errmsg=
"Slave can not handle replication events with the checksum "
1088 "that master is configured to log";
1089 sql_print_warning(
"Master is configured to log replication events "
1090 "with checksum, but will not send such events to "
1091 "slaves that cannot process them");
/*
  Patch the event before sending: clear the in-use flag, zero the log
  position field and the created-timestamp field, then recompute the
  checksum if one is in use.
*/
1094 (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1100 int4store((
char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
1105 int4store((
char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
1106 ST_CREATED_OFFSET+ev_offset, (ulong) 0);
1109 if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1110 current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1111 fix_checksum(packet, ev_offset);
1114 if (
my_net_write(net, (uchar*) packet->ptr(), packet->length()))
1116 errmsg =
"Failed on my_net_write()";
1117 my_errno= ER_UNKNOWN_ERROR;
1131 if (test_for_non_eof_log_read_errors(error, &errmsg))
/* Position the log reader at the requested start position. */
1145 my_b_seek(&log, pos);
/* Main loop: runs while the connection is healthy and the thread is not killed. */
1147 while (!net->error && net->vio != 0 && !thd->killed)
1150 bool goto_next_binlog=
false;
1154 if (reset_transmit_packet(thd, 0, &ev_offset, &errmsg))
1156 DBUG_EXECUTE_IF(
"semi_sync_3-way_deadlock",
1158 const char act[]=
"now wait_for signal.rotate_finished";
1159 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1160 STRING_WITH_LEN(act)));
1162 bool is_active_binlog=
false;
/* Inner loop: read events under log_lock until the read reports an error or EOF. */
1163 while (!(error= Log_event::read_log_event(&log, packet, log_lock,
1164 current_checksum_alg,
1166 &is_active_binlog)))
1168 DBUG_PRINT(
"info", (
"read_log_event returned 0 on line %d", __LINE__));
/* Debugging abort once the event budget is used up. */
1170 if (max_binlog_dump_events && !left_events--)
1173 errmsg =
"Debugging binlog dump abort";
1174 my_errno= ER_UNKNOWN_ERROR;
/* Track the position recorded in the event header for error and heartbeat reporting. */
1181 p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
1183 event_type= (
Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
1184 DBUG_EXECUTE_IF(
"dump_thread_wait_before_send_xid",
1186 if (event_type == XID_EVENT)
1191 "wait_for signal.continue";
1192 DBUG_ASSERT(opt_debug_sync_timeout > 0);
1193 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1194 STRING_WITH_LEN(act)));
/* Same checksum-capability check as for the initial Format_description event. */
1200 case FORMAT_DESCRIPTION_EVENT:
1203 packet->length() - ev_offset);
1204 DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
1205 current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
1206 current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
1207 if (!is_slave_checksum_aware(thd) &&
1208 current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1209 current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1211 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1212 errmsg=
"Slave can not handle replication events with the checksum "
1213 "that master is configured to log";
1214 sql_print_warning(
"Master is configured to log replication events "
1215 "with checksum, but will not send such events to "
1216 "slaves that cannot process them");
1219 (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1226 p_fdle->checksum_alg= current_checksum_alg;
1229 case ANONYMOUS_GTID_LOG_EVENT:
/*
  GTID events: under the GTID protocol the group is skipped when the slave's
  executed set already contains this GTID.
*/
1232 case GTID_LOG_EVENT:
1235 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1236 errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1239 if (using_gtid_protocol)
1249 ulonglong checksum_size=
1250 ((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1251 p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
1258 packet->length() - checksum_size,
1260 skip_group= slave_gtid_executed->
contains_gtid(gtid_ev.get_sidno(sid_map),
1262 searching_first_gtid= skip_group;
1263 DBUG_PRINT(
"info", (
"Dumping GTID sidno(%d) gno(%lld) skip group(%d) "
1264 "searching gtid(%d).",
1265 gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(),
1266 skip_group, searching_first_gtid));
1271 case INCIDENT_EVENT:
1272 skip_group= searching_first_gtid;
1275 case PREVIOUS_GTIDS_LOG_EVENT:
1276 binlog_has_previous_gtids_log_event=
true;
1279 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1280 errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1289 if (!binlog_has_previous_gtids_log_event && using_gtid_protocol)
1298 goto_next_binlog=
true;
1303 if (goto_next_binlog)
1307 DBUG_PRINT(
"info", (
"EVENT_TYPE %d SEARCHING %d SKIP_GROUP %d file %s pos %lld\n",
1308 event_type, searching_first_gtid, skip_group, log_file_name,
1310 pos = my_b_tell(&log);
1311 if (RUN_HOOK(binlog_transmit, before_send_event,
1312 (thd, 0, packet, log_file_name, pos)))
1314 my_errno= ER_UNKNOWN_ERROR;
1315 errmsg=
"run 'before_send_event' hook failed";
/* Remember the coordinates of the skipped event (guarding condition not visible). */
1322 p_last_skip_coord->pos= p_coord->pos;
1323 strcpy(p_last_skip_coord->file_name, p_coord->file_name);
/*
  On the transition from skipping to sending, a heartbeat with the last
  skipped coordinates is sent first (not for Format_description events).
*/
1326 if (!skip_group && last_skip_group
1327 && event_type != FORMAT_DESCRIPTION_EVENT)
1340 if (send_last_skip_group_heartbeat(thd, net, packet, p_last_skip_coord,
1341 &ev_offset, current_checksum_alg,
1349 last_skip_group= skip_group;
/* The event is written to the slave only when its group is not skipped. */
1351 if (skip_group ==
false &&
my_net_write(net, (uchar*) packet->ptr(), packet->length()))
1353 errmsg =
"Failed on my_net_write()";
1354 my_errno= ER_UNKNOWN_ERROR;
1359 DBUG_EXECUTE_IF(
"dump_thread_wait_before_send_xid",
1361 if (event_type == XID_EVENT)
1367 DBUG_PRINT(
"info", (
"log event code %d", event_type));
1368 if (skip_group ==
false && event_type == LOAD_EVENT)
1372 errmsg =
"failed in send_file()";
1373 my_errno= ER_UNKNOWN_ERROR;
/* The hook receives pos for skipped events and 0 for sent ones. */
1378 if (RUN_HOOK(binlog_transmit, after_send_event, (thd, 0, packet,
1379 log_file_name, skip_group ? pos : 0)))
1381 errmsg=
"Failed to run hook 'after_send_event'";
1382 my_errno= ER_UNKNOWN_ERROR;
1387 if (reset_transmit_packet(thd, 0, &ev_offset, &errmsg))
1391 DBUG_EXECUTE_IF(
"wait_after_binlog_EOF",
1393 const char act[]=
"now wait_for signal.rotate_finished";
1394 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1395 STRING_WITH_LEN(act)));
/* The inner loop ended: anything other than EOF is classified as an error. */
1405 if (test_for_non_eof_log_read_errors(error, &errmsg))
/* EOF on a file that is not the active binlog means: move to the next file. */
1408 if (!is_active_binlog)
1409 goto_next_binlog=
true;
1411 if (!goto_next_binlog)
1418 errmsg =
"failed on net_flush()";
1419 my_errno= ER_UNKNOWN_ERROR;
/* End of the active binlog: read without log_lock passed in and wait for new data. */
1431 bool read_packet = 0;
1434 if (max_binlog_dump_events && !left_events--)
1436 errmsg =
"Debugging binlog dump abort";
1437 my_errno= ER_UNKNOWN_ERROR;
1444 if (reset_transmit_packet(thd, 0, &ev_offset, &errmsg))
1458 switch (error= Log_event::read_log_event(&log, packet, (
mysql_mutex_t*) 0,
1459 current_checksum_alg)) {
1461 DBUG_PRINT(
"info", (
"read_log_event returned 0 on line %d",
1466 p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
1467 event_type= (
Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
1468 DBUG_ASSERT(event_type != FORMAT_DESCRIPTION_EVENT);
1475 DBUG_PRINT(
"wait",(
"waiting for data in binary log"));
/* A requester with server id 0 gets special treatment here (details not visible). */
1476 if (thd->server_id==0)
1479 DBUG_EXECUTE_IF(
"inject_hb_event_on_mysqlbinlog_dump_thread",
1488 if (reset_transmit_packet(thd, 0, &ev_offset, &errmsg) ||
1489 send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
/* Limits how many heartbeat notices are written to the error log. */
1496 ulong hb_info_counter= 0;
1499 signal_cnt= mysql_bin_log.signal_cnt;
/* With heartbeats enabled the wait below is bounded by heartbeat_period. */
1503 if (heartbeat_period != 0)
1505 DBUG_ASSERT(heartbeat_ts);
1506 set_timespec_nsec(*heartbeat_ts, heartbeat_period);
1508 thd->ENTER_COND(log_cond, log_lock,
1509 &stage_master_has_sent_all_binlog_to_slave,
1521 if (send_last_skip_group_heartbeat(thd, net, packet,
1522 p_coord, &ev_offset,
1523 current_checksum_alg, &errmsg))
1525 thd->EXIT_COND(&old_stage);
1528 last_skip_group=
false;
/* A non-zero wait result is only expected when heartbeats are enabled. */
1532 DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
/* Timed out waiting for new data: send a heartbeat instead. */
1533 if (ret == ETIMEDOUT || ret == ETIME)
1536 if (hb_info_counter < 3)
1538 sql_print_information(
"master sends heartbeat message");
1540 if (hb_info_counter == 3)
1541 sql_print_information(
"the rest of heartbeat info skipped ...");
1545 if (reset_transmit_packet(thd, 0, &ev_offset, &errmsg))
1547 thd->EXIT_COND(&old_stage);
1550 if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
1552 errmsg =
"Failed on my_net_write()";
1553 my_errno= ER_UNKNOWN_ERROR;
1554 thd->EXIT_COND(&old_stage);
1560 DBUG_PRINT(
"wait",(
"binary log received update or a broadcast signal caught"));
/* Keep waiting until the binlog signal counter changes or the thread is killed. */
1562 }
while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
1563 thd->EXIT_COND(&old_stage);
1569 test_for_non_eof_log_read_errors(error, &errmsg);
/* Same GTID skip decision as in the locked read loop above. */
1577 case ANONYMOUS_GTID_LOG_EVENT:
1580 case GTID_LOG_EVENT:
1583 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1584 errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1587 if (using_gtid_protocol)
1589 ulonglong checksum_size=
1590 ((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1591 p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
1594 packet->length() - checksum_size,
1597 slave_gtid_executed->
contains_gtid(gtid_ev.get_sidno(sid_map),
1599 searching_first_gtid= skip_group;
1600 DBUG_PRINT(
"info", (
"Dumping GTID sidno(%d) gno(%lld) "
1601 "skip group(%d) searching gtid(%d).",
1602 gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(),
1603 skip_group, searching_first_gtid));
1608 case INCIDENT_EVENT:
1609 skip_group= searching_first_gtid;
1612 case PREVIOUS_GTIDS_LOG_EVENT:
1613 binlog_has_previous_gtids_log_event=
true;
1616 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1617 errmsg= ER(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF);
1625 if (!binlog_has_previous_gtids_log_event && using_gtid_protocol)
1634 goto_next_binlog=
true;
1642 p_last_skip_coord->pos= p_coord->pos;
1643 strcpy(p_last_skip_coord->file_name, p_coord->file_name);
/* Send path for events read while waiting at the end of the active binlog. */
1646 if (!skip_group && !goto_next_binlog)
1649 if (last_skip_group &&
1650 send_last_skip_group_heartbeat(thd, net, packet,
1651 p_last_skip_coord, &ev_offset,
1652 current_checksum_alg, &errmsg))
1657 THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
1658 pos = my_b_tell(&log);
1659 if (RUN_HOOK(binlog_transmit, before_send_event,
1660 (thd, 0, packet, log_file_name, pos)))
1662 my_errno= ER_UNKNOWN_ERROR;
1663 errmsg=
"run 'before_send_event' hook failed";
1667 if (
my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
1669 errmsg =
"Failed on my_net_write()";
1670 my_errno= ER_UNKNOWN_ERROR;
1674 if (event_type == LOAD_EVENT)
1678 errmsg =
"failed in send_file()";
1679 my_errno= ER_UNKNOWN_ERROR;
1685 if(!goto_next_binlog)
1687 if (RUN_HOOK(binlog_transmit, after_send_event, (thd, 0, packet,
1688 log_file_name, skip_group ? pos : 0)))
1690 my_errno= ER_UNKNOWN_ERROR;
1691 errmsg=
"Failed to run hook 'after_send_event'";
1697 last_skip_group= skip_group;
/*
  Switch to the next binlog file: reset the per-file GTID flag, locate the
  next file and announce it to the slave with a fake Rotate event.
*/
1705 if (goto_next_binlog)
1708 binlog_has_previous_gtids_log_event=
false;
1710 THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog);
1715 errmsg =
"could not find next log";
1716 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1724 if (reset_transmit_packet(thd, 0, &ev_offset, &errmsg))
1744 fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
1745 &errmsg, current_checksum_alg))
1747 my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
1751 p_coord->file_name= log_file_name;
/* Normal termination: restore session state and run the transmit_stop hook. */
1756 thd->set_stmt_da(saved_da);
1760 (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, 0));
1762 THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
1764 thd->current_linfo = 0;
1766 thd->variables.max_allowed_packet= old_max_allowed_packet;
/*
  Error termination: for fatal binlog read errors with an initialised log
  cache, the message is extended with the first, last-read and current
  coordinates; otherwise errmsg is copied as is (explicitly NUL-terminated
  after strncpy).
*/
1770 THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
1771 if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
1777 my_snprintf(error_text,
sizeof(error_text),
1778 "%s; the first event '%s' at %lld, "
1779 "the last event read from '%s' at %lld, "
1780 "the last byte read from '%s' at %lld.",
1782 p_start_coord->file_name, p_start_coord->pos,
1783 p_coord->file_name, p_coord->pos,
1784 log_file_name, my_b_tell(&log));
1788 strncpy(error_text, errmsg,
sizeof(error_text));
1789 error_text[
sizeof(error_text) - 1]=
'\0';
1792 (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, 0));
1801 thd->current_linfo = 0;
1805 thd->variables.max_allowed_packet= old_max_allowed_packet;
/* The original diagnostics area is restored before the error is raised on it. */
1807 thd->set_stmt_da(saved_da);
1808 my_message(my_errno, error_text, MYF(0));
/*
  Fragment: the function header is not visible in this excerpt.
  NOTE(review): presumably the body of get_slave_uuid(thd, value), which the
  rest of this file calls - confirm against the full source.

  Looks up the session user variable "slave_uuid" in thd->user_vars and,
  when it exists and is non-empty, copies its bytes into value.
*/
1823 uchar name[]=
"slave_uuid";
1827 user_var_entry *entry=
/* sizeof(name)-1 excludes the terminating NUL from the key length. */
1828 (user_var_entry*) my_hash_search(&thd->user_vars, name,
sizeof(name)-1);
1829 if (entry && entry->length() > 0)
1831 value->copy(entry->ptr(), entry->length(), NULL);
/*
  Terminate an older dump thread that serves the same slave.

  slave_uuid  UUID of the slave that is connecting now; an empty string is
              handled up front (the action taken is not visible in this
              excerpt)

  Scans the global thread list for another session (not current_thd) that
  is executing COM_BINLOG_DUMP or COM_BINLOG_DUMP_GTID and whose slave UUID
  matches in its first UUID_LENGTH bytes. The thread selected as tmp is
  woken with THD::KILL_QUERY.
  NOTE(review): the locking of the thread list and the assignment of tmp
  are not visible in this excerpt.
*/
1858 void kill_zombie_dump_threads(
String *slave_uuid)
1860 if (slave_uuid->length() == 0)
1862 DBUG_ASSERT(slave_uuid->length() == UUID_LENGTH);
1866 Thread_iterator it= global_thread_list_begin();
1867 Thread_iterator end= global_thread_list_end();
1868 for (; it != end; ++it)
1870 if ((*it) != current_thd && ((*it)->get_command() == COM_BINLOG_DUMP ||
1871 (*it)->get_command() == COM_BINLOG_DUMP_GTID))
1874 if (get_slave_uuid((*it), &tmp_uuid) != NULL &&
1875 !strncmp(slave_uuid->c_ptr(), tmp_uuid.c_ptr(), UUID_LENGTH))
1891 tmp->awake(THD::KILL_QUERY);
/*
  Reset the master's binary log state.

  When the binary log is not open, ER_FLUSH_MASTER_BINLOG_CLOSED is
  reported. The after_reset_master hook is run and its result is
  deliberately ignored.
  NOTE(review): the actual log reset call and the return values are not
  visible in this excerpt.
*/
1906 int reset_master(THD* thd)
1908 if (!mysql_bin_log.is_open())
1910 my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1911 ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1917 (void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 ));
/*
  Send a single-row result describing the current binary log: file name
  (without directory), position, the do-db and ignore-db filter lists and a
  textual GTID set.

  The GTID set text is produced while global_sid_lock is write-locked and
  is released with my_free() on every visible exit path. The row is sent
  only when the binary log is open.
  NOTE(review): the result-set metadata and the return values are not
  visible in this excerpt.
*/
1931 bool show_master_status(THD* thd)
1934 char* gtid_set_buffer= NULL;
1935 int gtid_set_size= 0;
/* NOTE(review): the debug tag differs from the function name - confirm intended. */
1938 DBUG_ENTER(
"show_binlog_info");
1940 global_sid_lock->
wrlock();
/*
  NOTE(review): the argument below reads ">id_set_buffer"; this looks like
  an extraction garble of "&gtid_set_buffer" - confirm against the original
  source.
*/
1942 if ((gtid_set_size= gtid_set->
to_string(>id_set_buffer)) < 0)
1944 global_sid_lock->
unlock();
1946 my_free(gtid_set_buffer);
1949 global_sid_lock->
unlock();
1953 MYSQL_TYPE_LONGLONG));
1960 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1962 my_free(gtid_set_buffer);
1965 protocol->prepare_for_resend();
1967 if (mysql_bin_log.is_open())
1970 mysql_bin_log.get_current_log(&li);
/* Report the file name without its directory part. */
1971 int dir_len = dirname_length(li.log_file_name);
1972 protocol->
store(li.log_file_name + dir_len, &my_charset_bin);
1973 protocol->
store((ulonglong) li.pos);
1974 protocol->
store(binlog_filter->get_do_db());
1975 protocol->
store(binlog_filter->get_ignore_db());
1976 protocol->
store(gtid_set_buffer, &my_charset_bin);
1977 if (protocol->write())
1979 my_free(gtid_set_buffer);
1984 my_free(gtid_set_buffer);
/*
  Send one result row per binary log listed in the index file: the file
  name (without directory) and its size.

  Fails with ER_NO_BINARY_LOGGING when the binary log is not open. The index
  is locked while it is read and unlocked on the visible exit paths. For the
  file that matches the current log the size is the current log position;
  for other files it is obtained by seeking to the end of the opened file.
  NOTE(review): the result-set metadata, the closing of the opened files and
  the return values are not visible in this excerpt.
*/
1998 bool show_binlogs(THD* thd)
2003 char fname[FN_REFLEN];
2008 DBUG_ENTER(
"show_binlogs");
2010 if (!mysql_bin_log.is_open())
2012 my_error(ER_NO_BINARY_LOGGING, MYF(0));
2018 MYSQL_TYPE_LONGLONG));
2020 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2024 mysql_bin_log.lock_index();
2025 index_file=mysql_bin_log.get_index_file();
2027 mysql_bin_log.raw_get_current_log(&cur);
2030 cur_dir_len= dirname_length(cur.log_file_name);
/* Rewind the index file and read it from the start. */
2032 reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
/* Each line read is one file name; a result of 1 or less ends the loop. */
2035 while ((length=my_b_gets(index_file, fname,
sizeof(fname))) > 1)
2038 ulonglong file_length= 0;
/* Overwrite the last character read (presumably the newline) with NUL. */
2039 fname[--length] =
'\0';
2041 protocol->prepare_for_resend();
2042 dir_len= dirname_length(fname);
2044 protocol->
store(fname + dir_len, length, &my_charset_bin);
/* The current log is reported with its live position instead of a file size. */
2046 if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
2047 file_length= cur.pos;
2052 fname, O_RDONLY | O_SHARE | O_BINARY,
2055 file_length= (ulonglong)
mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
2059 protocol->
store(file_length);
2060 if (protocol->write())
2062 DBUG_PRINT(
"info", (
"stopping dump thread because protocol->write failed at line %d", __LINE__));
/* A read error on the index file is checked for after the loop. */
2066 if(index_file->error == -1)
2068 mysql_bin_log.unlock_index();
2073 mysql_bin_log.unlock_index();