17 #include "my_global.h"
21 #include "rpl_filter.h"
23 #include "sql_plugin.h"
24 #include "rpl_handler.h"
25 #include "rpl_info_factory.h"
26 #include "rpl_utility.h"
28 #include "global_threads.h"
30 #include "sql_parse.h"
34 #include <my_stacktrace.h>
41 #define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
48 #define MY_OFF_T_UNDEF (~(my_off_t)0UL)
54 #define LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT 50
56 #define LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT 50
57 #define MAX_SESSION_ATTACH_TRIES 10
59 static ulonglong limit_unsafe_suppression_start_time= 0;
60 static bool unsafe_warning_suppression_is_activated=
false;
61 static int limit_unsafe_warning_count= 0;
64 bool opt_binlog_order_commits=
true;
66 const char *log_bin_index= 0;
67 const char *log_bin_basename= 0;
71 static int binlog_init(
void *p);
72 static int binlog_start_trans_and_stmt(THD *thd,
Log_event *start_event);
73 static int binlog_close_connection(
handlerton *hton, THD *thd);
74 static int binlog_savepoint_set(
handlerton *hton, THD *thd,
void *sv);
75 static int binlog_savepoint_rollback(
handlerton *hton, THD *thd,
void *sv);
76 static int binlog_commit(
handlerton *hton, THD *thd,
bool all);
77 static int binlog_rollback(
handlerton *hton, THD *thd,
bool all);
78 static int binlog_prepare(
handlerton *hton, THD *thd,
bool all);
121 static void print_system_time()
125 GetSystemTime(&utc_time);
126 const long hrs= utc_time.wHour;
127 const long mins= utc_time.wMinute;
128 const long secs= utc_time.wSecond;
131 const time_t curr_time= time(NULL);
133 const long tmins = curr_time / 60;
134 const long thrs = tmins / 60;
135 const long hrs = thrs % 24;
136 const long mins = tmins % 60;
137 const long secs = curr_time % 60;
139 char hrs_buf[3]=
"00";
140 char mins_buf[3]=
"00";
141 char secs_buf[3]=
"00";
143 my_safe_itoa(base, hrs, &hrs_buf[2]);
144 my_safe_itoa(base, mins, &mins_buf[2]);
145 my_safe_itoa(base, secs, &secs_buf[2]);
147 my_safe_printf_stderr(
"---------- %s:%s:%s UTC - ",
148 hrs_buf, mins_buf, secs_buf);
180 : m_original_thd(thd)
181 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
188 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
192 #ifndef EMBEDDED_LIBRARY
193 if (unlikely(setup_thread_globals(m_original_thd)))
214 while (i < MAX_SESSION_ATTACH_TRIES)
225 sql_print_warning(
"Server overcomes the temporary 'out of memory' "
226 "in '%d' tries while attaching to session thread "
227 "during the group commit phase.\n", i + 1);
236 if (MAX_SESSION_ATTACH_TRIES == i)
239 my_safe_printf_stderr(
"%s",
"[Fatal] Out of memory while attaching to "
240 "session thread during the group commit phase. "
241 "Data consistency between master and slave can "
242 "be guaranteed after server restarts.\n");
252 int attach_to(THD *thd)
254 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
258 #ifndef EMBEDDED_LIBRARY
259 if (DBUG_EVALUATE_IF(
"simulate_session_attach_error", 1, 0)
260 || unlikely(setup_thread_globals(thd)))
262 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
271 return ER_OUTOFMEMORY;
277 int setup_thread_globals(THD *thd)
const {
279 THD *original_thd= my_pthread_getspecific(THD*, THR_THD);
281 if ((error= my_pthread_setspecific_ptr(THR_THD, thd)))
283 if ((error= my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root)))
285 if ((error= set_mysys_var(thd->mysys_var)))
289 error= my_pthread_setspecific_ptr(THR_MALLOC, original_mem_root);
291 error= my_pthread_setspecific_ptr(THR_THD, original_thd);
297 PSI_thread *m_saved_psi;
314 my_off_t max_binlog_cache_size_arg,
315 ulong *ptr_binlog_cache_use_arg,
316 ulong *ptr_binlog_cache_disk_use_arg)
317 : m_pending(0), saved_max_binlog_cache_size(max_binlog_cache_size_arg),
318 ptr_binlog_cache_use(ptr_binlog_cache_use_arg),
319 ptr_binlog_cache_disk_use(ptr_binlog_cache_disk_use_arg)
322 flags.transactional= trx_cache_arg;
323 cache_log.end_of_file= saved_max_binlog_cache_size;
327 int flush(THD *thd, my_off_t *bytes,
bool *wrote_xid);
332 DBUG_ASSERT(is_binlog_empty());
333 close_cached_file(&cache_log);
336 bool is_binlog_empty()
const
338 my_off_t pos= my_b_tell(&cache_log);
339 DBUG_PRINT(
"debug", (
"%s_cache - pending: 0x%llx, bytes: %llu",
340 (flags.transactional ?
"trx" :
"stmt"),
341 (ulonglong) pending(), (ulonglong) pos));
342 return pending() == NULL && pos == 0;
345 bool is_group_cache_empty()
const
351 bool dbug_is_finalized()
const {
352 return flags.finalized;
366 void set_incident(
void)
368 flags.incident=
true;
371 bool has_incident(
void)
const
373 return flags.incident;
376 bool has_xid()
const {
378 DBUG_ASSERT((flags.transactional && flags.with_xid) || !flags.with_xid);
379 return flags.with_xid;
382 bool is_trx_cache()
const
384 return flags.transactional;
387 my_off_t get_byte_position()
const
389 return my_b_tell(&cache_log);
394 compute_statistics();
396 flags.incident=
false;
397 flags.with_xid=
false;
398 flags.immediate=
false;
399 flags.finalized=
false;
407 cache_log.disk_writes= 0;
409 DBUG_ASSERT(is_binlog_empty());
428 my_off_t reset_write_pos(my_off_t pos,
bool use_reinit)
430 DBUG_ENTER(
"reset_write_pos");
431 DBUG_ASSERT(cache_log.type == WRITE_CACHE);
433 my_off_t oldpos= get_byte_position();
436 reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0);
438 my_b_seek(&cache_log, pos);
458 void truncate(my_off_t pos)
460 DBUG_PRINT(
"info", (
"truncating to position %lu", (ulong) pos));
462 reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0);
463 cache_log.end_of_file= saved_max_binlog_cache_size;
472 m_pending->set_flags(Rows_log_event::STMT_END_F);
473 if (
int error= write_event(thd, m_pending))
475 thd->clear_binlog_table_maps();
493 bool transactional:1;
528 void compute_statistics()
530 if (!is_binlog_empty())
532 statistic_increment(*ptr_binlog_cache_use, &LOCK_status);
533 if (cache_log.disk_writes != 0)
534 statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status);
543 my_off_t saved_max_binlog_cache_size;
550 ulong *ptr_binlog_cache_use;
557 ulong *ptr_binlog_cache_disk_use;
569 my_off_t max_binlog_cache_size_arg,
570 ulong *ptr_binlog_cache_use_arg,
571 ulong *ptr_binlog_cache_disk_use_arg)
573 max_binlog_cache_size_arg,
574 ptr_binlog_cache_use_arg,
575 ptr_binlog_cache_disk_use_arg)
581 int finalize(THD *thd);
586 binlog_stmt_cache_data::finalize(THD *thd)
590 if (
int error= finalize(thd, NULL))
596 end_evt(thd, STRING_WITH_LEN(
"COMMIT"),
false,
false,
true, 0,
true);
597 if (
int error= finalize(thd, &end_evt))
608 my_off_t max_binlog_cache_size_arg,
609 ulong *ptr_binlog_cache_use_arg,
610 ulong *ptr_binlog_cache_disk_use_arg)
612 max_binlog_cache_size_arg,
613 ptr_binlog_cache_use_arg,
614 ptr_binlog_cache_disk_use_arg),
615 m_cannot_rollback(FALSE), before_stmt_pos(MY_OFF_T_UNDEF)
621 DBUG_PRINT(
"enter", (
"before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
622 m_cannot_rollback= FALSE;
623 before_stmt_pos= MY_OFF_T_UNDEF;
624 binlog_cache_data::reset();
625 DBUG_PRINT(
"return", (
"before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
629 bool cannot_rollback()
const
631 return m_cannot_rollback;
634 void set_cannot_rollback()
636 m_cannot_rollback= TRUE;
639 my_off_t get_prev_position()
const
641 return before_stmt_pos;
644 void set_prev_position(my_off_t pos)
646 DBUG_ENTER(
"set_prev_position");
647 DBUG_PRINT(
"enter", (
"before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
648 before_stmt_pos= pos;
649 DBUG_PRINT(
"return", (
"before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
653 void restore_prev_position()
655 DBUG_ENTER(
"restore_prev_position");
656 DBUG_PRINT(
"enter", (
"before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
658 before_stmt_pos= MY_OFF_T_UNDEF;
659 DBUG_PRINT(
"return", (
"before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
663 void restore_savepoint(my_off_t pos)
665 DBUG_ENTER(
"restore_savepoint");
666 DBUG_PRINT(
"enter", (
"before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
668 if (pos <= before_stmt_pos)
669 before_stmt_pos= MY_OFF_T_UNDEF;
670 DBUG_PRINT(
"return", (
"before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
683 bool m_cannot_rollback;
688 my_off_t before_stmt_pos;
697 ulong *ptr_binlog_stmt_cache_use_arg,
698 ulong *ptr_binlog_stmt_cache_disk_use_arg,
699 my_off_t max_binlog_cache_size_arg,
700 ulong *ptr_binlog_cache_use_arg,
701 ulong *ptr_binlog_cache_disk_use_arg)
702 : stmt_cache(FALSE, max_binlog_stmt_cache_size_arg,
703 ptr_binlog_stmt_cache_use_arg,
704 ptr_binlog_stmt_cache_disk_use_arg),
705 trx_cache(TRUE, max_binlog_cache_size_arg,
706 ptr_binlog_cache_use_arg,
707 ptr_binlog_cache_disk_use_arg)
712 if (is_transactional)
718 IO_CACHE* get_binlog_cache_log(
bool is_transactional)
720 return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log);
727 return stmt_cache.is_binlog_empty() && trx_cache.is_binlog_empty();
731 bool dbug_any_finalized()
const {
732 return stmt_cache.dbug_is_finalized() || trx_cache.dbug_is_finalized();
747 int flush(THD *thd, my_off_t *bytes_written,
bool *wrote_xid)
749 my_off_t stmt_bytes= 0;
750 my_off_t trx_bytes= 0;
751 DBUG_ASSERT(stmt_cache.has_xid() == 0 && trx_cache.has_xid() <= 1);
752 if (
int error= stmt_cache.
flush(thd, &stmt_bytes, wrote_xid))
754 if (
int error= trx_cache.
flush(thd, &trx_bytes, wrote_xid))
756 *bytes_written= stmt_bytes + trx_bytes;
776 DBUG_ASSERT(opt_bin_log);
787 if (binlog_cache_size > max_binlog_cache_size)
791 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
792 ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX,
793 ER(ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX),
794 (ulong) binlog_cache_size,
795 (ulong) max_binlog_cache_size);
799 sql_print_warning(ER_DEFAULT(ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX),
800 (ulong) binlog_cache_size,
801 (ulong) max_binlog_cache_size);
803 binlog_cache_size= max_binlog_cache_size;
813 if (binlog_stmt_cache_size > max_binlog_stmt_cache_size)
817 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
818 ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX,
819 ER(ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX),
820 (ulong) binlog_stmt_cache_size,
821 (ulong) max_binlog_stmt_cache_size);
825 sql_print_warning(ER_DEFAULT(ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX),
826 (ulong) binlog_stmt_cache_size,
827 (ulong) max_binlog_stmt_cache_size);
829 binlog_stmt_cache_size= max_binlog_stmt_cache_size;
838 return(binlog_hton && binlog_hton->slot != HA_SLOT_UNDEF);
857 binlog_trans_log_savepos(THD *thd, my_off_t *pos)
859 DBUG_ENTER(
"binlog_trans_log_savepos");
860 DBUG_ASSERT(pos != NULL);
862 DBUG_ASSERT(mysql_bin_log.is_open());
863 *pos= cache_mngr->trx_cache.get_byte_position();
864 DBUG_PRINT(
"return", (
"position: %lu", (ulong) *pos));
875 static int binlog_init(
void *p)
878 binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
879 binlog_hton->db_type=DB_TYPE_BINLOG;
880 binlog_hton->savepoint_offset=
sizeof(my_off_t);
881 binlog_hton->close_connection= binlog_close_connection;
882 binlog_hton->savepoint_set= binlog_savepoint_set;
883 binlog_hton->savepoint_rollback= binlog_savepoint_rollback;
884 binlog_hton->commit= binlog_commit;
885 binlog_hton->rollback= binlog_rollback;
886 binlog_hton->prepare= binlog_prepare;
887 binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
891 static int binlog_close_connection(
handlerton *hton, THD *thd)
893 DBUG_ENTER(
"binlog_close_connection");
896 DBUG_ASSERT(cache_mngr->trx_cache.is_group_cache_empty() &&
897 cache_mngr->stmt_cache.is_group_cache_empty());
898 DBUG_PRINT(
"debug", (
"Set ha_data slot %d to 0x%llx", binlog_hton->slot, (ulonglong) NULL));
899 thd_set_ha_data(thd, binlog_hton, NULL);
900 cache_mngr->~binlog_cache_mngr();
905 int binlog_cache_data::write_event(THD *thd,
Log_event *ev)
907 DBUG_ENTER(
"binlog_cache_data::write_event");
912 group_cache.add_logged_group(thd, get_byte_position());
913 if (status == Group_cache::ERROR)
915 else if (status == Group_cache::APPEND_NEW_GROUP)
918 if (gtid_ev.write(&cache_log) != 0)
925 DBUG_EXECUTE_IF(
"simulate_disk_full_at_flush_pending",
926 {DBUG_SET(
"+d,simulate_file_write_error");});
927 if (ev->write(&cache_log) != 0)
929 DBUG_EXECUTE_IF(
"simulate_disk_full_at_flush_pending",
931 DBUG_SET(
"-d,simulate_file_write_error");
932 DBUG_SET(
"-d,simulate_disk_full_at_flush_pending");
940 DBUG_SET(
"+d,simulate_do_write_cache_failure");
944 if (ev->get_type_code() == XID_EVENT)
945 flags.with_xid=
true;
946 if (ev->is_using_immediate_logging())
947 flags.immediate=
true;
963 static int write_one_empty_group_to_cache(THD *thd,
967 DBUG_ENTER(
"write_one_empty_group_to_cache");
980 #ifdef NON_ERROR_GTID
981 IO_CACHE *cache= &cache_data->cache_log;
983 if (status == Group_cache::ERROR)
985 DBUG_ASSERT(status == Group_cache::APPEND_NEW_GROUP);
988 if (gtid_ev.write(cache) != 0)
1005 DBUG_ENTER(
"write_empty_groups_to_cache");
1006 if (thd->owned_gtid.sidno == -1)
1008 #ifdef HAVE_GTID_NEXT_LIST
1010 Gtid gtid= git.get();
1011 while (gtid.
sidno != 0)
1013 if (write_one_empty_group_to_cache(thd, cache_data, gtid) != 0)
1022 else if (thd->owned_gtid.sidno > 0)
1023 if (write_one_empty_group_to_cache(thd, cache_data, thd->owned_gtid) != 0)
1036 DBUG_ENTER(
"gtid_before_write_cache");
1039 DBUG_ASSERT(thd->variables.gtid_next.type != UNDEFINED_GROUP);
1046 global_sid_lock->
rdlock();
1048 if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
1053 global_sid_lock->
unlock();
1057 if (write_empty_groups_to_cache(thd, cache_data) != 0)
1059 global_sid_lock->
unlock();
1063 global_sid_lock->
unlock();
1069 if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
1073 DBUG_ASSERT(cached_group->spec.type != AUTOMATIC_GROUP);
1075 &cached_group->spec);
1076 bool using_file= cache_data->cache_log.pos_in_file > 0;
1077 my_off_t saved_position= cache_data->reset_write_pos(0, using_file);
1078 error= gtid_ev.write(&cache_data->cache_log);
1079 cache_data->reset_write_pos(saved_position, using_file);
1103 DBUG_ENTER(
"gtid_empty_group_log_and_cleanup");
1106 FALSE, TRUE, 0, TRUE);
1107 DBUG_ASSERT(!qinfo.is_using_immediate_logging());
1112 if (thd->binlog_setup_trx_data())
1114 cache_data= &thd_get_cache_mngr(thd)->trx_cache;
1115 DBUG_PRINT(
"debug", (
"Writing to trx_cache"));
1116 if (cache_data->write_event(thd, &qinfo) ||
1117 gtid_before_write_cache(thd, cache_data))
1120 ret= mysql_bin_log.
commit(thd,
true);
1145 DBUG_ENTER(
"binlog_cache_data::finalize");
1146 if (!is_binlog_empty())
1148 DBUG_ASSERT(!flags.finalized);
1151 if (
int error= write_event(thd, end_event))
1153 flags.finalized=
true;
1154 DBUG_PRINT(
"debug", (
"flags.finalized: %s", YESNO(flags.finalized)));
1190 DBUG_ENTER(
"binlog_cache_data::flush");
1191 DBUG_PRINT(
"debug", (
"flags.finalized: %s", YESNO(flags.finalized)));
1193 if (flags.finalized)
1195 my_off_t bytes_in_cache= my_b_tell(&cache_log);
1196 DBUG_PRINT(
"debug", (
"bytes_in_cache: %llu", bytes_in_cache));
1202 if (!(error= gtid_before_write_cache(thd,
this)))
1205 if (flags.with_xid && error == 0)
1214 *bytes_written= bytes_in_cache;
1216 DBUG_ASSERT(!flags.finalized);
1235 DBUG_ENTER(
"binlog_trx_cache_data::truncate");
1238 DBUG_PRINT(
"info", (
"thd->options={ %s %s}, transaction: %s",
1239 FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
1240 FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
1241 all ?
"all" :
"stmt"));
1259 else if (get_prev_position() != MY_OFF_T_UNDEF)
1261 restore_prev_position();
1262 if (is_binlog_empty())
1278 group_cache.
clear();
1282 thd->clear_binlog_table_maps();
1287 static int binlog_prepare(
handlerton *hton, THD *thd,
bool all)
1314 static int binlog_commit(
handlerton *hton, THD *thd,
bool all)
1316 DBUG_ENTER(
"binlog_commit");
1342 static int binlog_rollback(
handlerton *hton, THD *thd,
bool all)
1344 DBUG_ENTER(
"binlog_rollback");
1346 if (thd->lex->sql_command == SQLCOM_ROLLBACK_TO_SAVEPOINT)
1347 error= mysql_bin_log.
rollback(thd, all);
1355 DBUG_ENTER(
"Stage_manager::Mutex_queue::append");
1357 DBUG_PRINT(
"enter", (
"first: 0x%llx", (ulonglong) first));
1358 DBUG_PRINT(
"info", (
"m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1359 (ulonglong) m_first, (ulonglong) &m_first,
1360 (ulonglong) m_last));
1361 bool empty= (m_first == NULL);
1363 DBUG_PRINT(
"info", (
"m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1364 (ulonglong) m_first, (ulonglong) &m_first,
1365 (ulonglong) m_last));
1371 while (first->next_to_commit)
1372 first= first->next_to_commit;
1373 m_last= &first->next_to_commit;
1374 DBUG_PRINT(
"info", (
"m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1375 (ulonglong) m_first, (ulonglong) &m_first,
1376 (ulonglong) m_last));
1377 DBUG_ASSERT(m_first || m_last == &m_first);
1378 DBUG_PRINT(
"return", (
"empty: %s", YESNO(empty)));
1384 std::pair<bool, THD*>
1385 Stage_manager::Mutex_queue::pop_front()
1387 DBUG_ENTER(
"Stage_manager::Mutex_queue::pop_front");
1389 THD *result= m_first;
1398 m_first= result->next_to_commit;
1399 if (m_first == NULL)
1404 DBUG_ASSERT(m_first || m_last == &m_first);
1406 DBUG_PRINT(
"return", (
"result: 0x%llx, more: %s",
1407 (ulonglong) result, YESNO(more)));
1408 DBUG_RETURN(std::make_pair(more, result));
1416 DBUG_PRINT(
"debug", (
"Enqueue 0x%llx to queue for stage %d",
1417 (ulonglong) thd, stage));
1418 bool leader= m_queue[stage].
append(thd);
1441 thd->transaction.flags.ready_preempt= 1;
1442 if (leader_await_preempt_status)
1445 while (thd->transaction.flags.pending)
1455 DBUG_ENTER(
"Stage_manager::Mutex_queue::fetch_and_empty");
1457 DBUG_PRINT(
"enter", (
"m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1458 (ulonglong) m_first, (ulonglong) &m_first,
1459 (ulonglong) m_last));
1460 THD *result= m_first;
1463 DBUG_PRINT(
"info", (
"m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
1464 (ulonglong) m_first, (ulonglong) &m_first,
1465 (ulonglong) m_last));
1466 DBUG_ASSERT(m_first || m_last == &m_first);
1467 DBUG_PRINT(
"return", (
"result: 0x%llx", (ulonglong) result));
1469 DBUG_RETURN(result);
1478 while(!head->transaction.flags.ready_preempt)
1480 leader_await_preempt_status=
true;
1483 leader_await_preempt_status=
false;
1523 bool stuff_logged=
false;
1526 DBUG_ENTER(
"MYSQL_BIN_LOG::rollback(THD *thd, bool all)");
1527 DBUG_PRINT(
"enter", (
"all: %s, cache_mngr: 0x%llx, thd->is_error: %s",
1528 YESNO(all), (ulonglong) cache_mngr, YESNO(thd->is_error())));
1538 if (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT)
1539 if ((error= ha_rollback_low(thd, all)))
1546 if (cache_mngr == NULL || cache_mngr->is_binlog_empty())
1550 (
"all.cannot_safely_rollback(): %s, trx_cache_empty: %s",
1551 YESNO(thd->transaction.all.cannot_safely_rollback()),
1552 YESNO(cache_mngr->trx_cache.is_binlog_empty())));
1554 (
"stmt.cannot_safely_rollback(): %s, stmt_cache_empty: %s",
1555 YESNO(thd->transaction.stmt.cannot_safely_rollback()),
1556 YESNO(cache_mngr->stmt_cache.is_binlog_empty())));
1562 if (cache_mngr->stmt_cache.has_incident())
1564 error= write_incident(thd,
true);
1565 cache_mngr->stmt_cache.reset();
1567 else if (!cache_mngr->stmt_cache.is_binlog_empty())
1569 if ((error= cache_mngr->stmt_cache.finalize(thd)))
1583 end_evt(thd, STRING_WITH_LEN(
"ROLLBACK"),
true,
false,
true, 0,
true);
1584 error= cache_mngr->trx_cache.finalize(thd, &end_evt);
1593 error= cache_mngr->trx_cache.truncate(thd, all);
1618 if (thd->transaction.stmt.has_dropped_temp_table() ||
1619 thd->transaction.stmt.has_created_temp_table() ||
1620 (thd->transaction.stmt.has_modified_non_trans_table() &&
1621 thd->variables.binlog_format == BINLOG_FORMAT_STMT))
1629 cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
1637 error= cache_mngr->trx_cache.truncate(thd, all);
1641 DBUG_PRINT(
"debug", (
"error: %d", error));
1642 if (error == 0 && stuff_logged)
1643 error= ordered_commit(thd, all,
true);
1645 if (check_write_error(thd))
1658 error |= cache_mngr->trx_cache.truncate(thd, all);
1666 if (!thd->in_active_multi_stmt_transaction())
1669 DBUG_PRINT(
"return", (
"error: %d", error));
1697 static int binlog_savepoint_set(
handlerton *hton, THD *thd,
void *sv)
1699 DBUG_ENTER(
"binlog_savepoint_set");
1703 if (log_query.append(STRING_WITH_LEN(
"SAVEPOINT ")))
1706 append_identifier(thd, &log_query, thd->lex->ident.str,
1707 thd->lex->ident.length);
1709 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
1710 Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
1711 TRUE, FALSE, TRUE, errcode);
1726 binlog_trans_log_savepos(thd, (my_off_t*) sv);
1731 static int binlog_savepoint_rollback(
handlerton *hton, THD *thd,
void *sv)
1733 DBUG_ENTER(
"binlog_savepoint_rollback");
1735 my_off_t pos= *(my_off_t*) sv;
1736 DBUG_ASSERT(pos != ~(my_off_t) 0);
1746 if (log_query.append(STRING_WITH_LEN(
"ROLLBACK TO ")) ||
1747 log_query.append(
"`") ||
1748 log_query.append(thd->lex->ident.str, thd->lex->ident.length) ||
1749 log_query.append(
"`"))
1751 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
1752 Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
1753 TRUE, FALSE, TRUE, errcode);
1757 cache_mngr->trx_cache.restore_savepoint(pos);
1758 if (cache_mngr->trx_cache.is_binlog_empty())
1763 #ifdef HAVE_REPLICATION
1786 static void adjust_linfo_offsets(my_off_t purge_offset)
1790 Thread_iterator it= global_thread_list_begin();
1791 Thread_iterator end= global_thread_list_end();
1792 for (; it != end; ++it)
1795 if ((linfo = (*it)->current_linfo))
1803 if (linfo->index_file_offset < purge_offset)
1804 linfo->fatal = (linfo->index_file_offset != 0);
1806 linfo->index_file_offset -= purge_offset;
1814 static int log_in_use(
const char* log_name)
1816 size_t log_name_len = strlen(log_name) + 1;
1821 Thread_iterator it= global_thread_list_begin();
1822 Thread_iterator end= global_thread_list_end();
1823 for (; it != end; ++it)
1826 if ((linfo = (*it)->current_linfo))
1829 if(!memcmp(log_name, linfo->log_file_name, log_name_len))
1832 sql_print_warning(
"file %s was not purged because it was being read"
1833 "by thread number %llu", log_name,
1834 (ulonglong)(*it)->thread_id);
1841 return thread_count;
1844 static bool purge_error_message(THD* thd,
int res)
1850 my_message(errcode, ER(errcode), MYF(0));
1859 int check_binlog_magic(
IO_CACHE* log,
const char** errmsg)
1862 DBUG_ASSERT(my_b_tell(log) == 0);
1864 if (my_b_read(log, (uchar*) magic,
sizeof(magic)))
1866 *errmsg =
"I/O error reading the header from the binary log";
1867 sql_print_error(
"%s, errno=%d, io cache code=%d", *errmsg, my_errno,
1871 if (memcmp(magic, BINLOG_MAGIC,
sizeof(magic)))
1873 *errmsg =
"Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL";
1883 DBUG_ENTER(
"open_binlog_file");
1886 log_file_name, O_RDONLY | O_BINARY | O_SHARE,
1889 sql_print_error(
"Failed to open log (file '%s', errno %d)",
1890 log_file_name, my_errno);
1891 *errmsg =
"Could not open log file";
1894 if (init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
1895 MYF(MY_WME|MY_DONT_CHECK_FILESIZE)))
1897 sql_print_error(
"Failed to create a cache on log (file '%s')",
1899 *errmsg =
"Could not open log file";
1902 if (check_binlog_magic(log,errmsg))
1928 return (cache_mngr ? !cache_mngr->trx_cache.is_binlog_empty() : 0);
1942 Ha_trx_info *ha_info;
1944 for (ha_info= thd->transaction.stmt.ha_list; ha_info;
1945 ha_info= ha_info->next())
1947 if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
1981 return (!all && !thd->in_multi_stmt_transaction_mode());
1995 return cache_mngr->trx_cache.cannot_rollback();
2007 return thd->transaction.stmt.cannot_safely_rollback();
2010 #ifndef EMBEDDED_LIBRARY
2024 char search_file_name[FN_REFLEN];
2025 if (!mysql_bin_log.is_open())
2032 return purge_error_message(thd,
2033 mysql_bin_log.purge_logs(search_file_name,
false,
2053 if (!mysql_bin_log.is_open())
2058 return purge_error_message(thd,
2059 mysql_bin_log.purge_logs_before_date(purge_time,
2067 int query_error_code(THD *thd,
bool not_killed)
2071 if (not_killed || (thd->killed == THD::KILL_BAD_DATA))
2073 error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
2080 if (error == ER_SERVER_SHUTDOWN || error == ER_QUERY_INTERRUPTED)
2086 DBUG_ASSERT(!(thd->system_thread & SYSTEM_THREAD_DELAYED_INSERT));
2087 error= thd->killed_errno();
2114 uchar io_buf[IO_SIZE*2];
2115 DBUG_ENTER(
"copy_file");
2120 if ((bytes_read= (
int)
mysql_file_read(from->file, io_buf,
sizeof(io_buf),
2124 if (DBUG_EVALUATE_IF(
"fault_injection_copy_part_file", 1, 0))
2125 bytes_read= bytes_read/2;
2139 #ifdef HAVE_REPLICATION
2152 DBUG_ENTER(
"log_loaded_block");
2156 uchar* buffer= (uchar*) my_b_get_buffer_start(file);
2157 uint max_event_size= current_thd->variables.max_allowed_packet;
2159 if (lf_info->thd->is_current_stmt_binlog_format_row())
2161 if (lf_info->last_pos_in_file != HA_POS_ERROR &&
2162 lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
2165 for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
2166 buffer += min(block_len, max_event_size),
2167 block_len -= min(block_len, max_event_size))
2169 lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
2170 if (lf_info->wrote_create_file)
2173 min(block_len, max_event_size),
2174 lf_info->log_delayed);
2182 min(block_len, max_event_size),
2183 lf_info->log_delayed);
2186 lf_info->wrote_create_file= 1;
2193 bool show_binlog_events(THD *thd,
MYSQL_BIN_LOG *binary_log)
2197 const char *errmsg = 0;
2201 int old_max_allowed_packet= thd->variables.max_allowed_packet;
2204 DBUG_ENTER(
"show_binlog_events");
2206 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
2207 thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
2212 if (binary_log->is_open())
2214 LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
2215 SELECT_LEX_UNIT *unit= &thd->lex->unit;
2216 ha_rows event_count, limit_start, limit_end;
2217 my_off_t pos = max<my_off_t>(BIN_LOG_HEADER_SIZE, lex_mi->pos);
2218 char search_file_name[FN_REFLEN], *
name;
2219 const char *log_file_name = lex_mi->log_file_name;
2223 unit->set_limit(thd->lex->current_select);
2224 limit_start= unit->offset_limit_cnt;
2225 limit_end= unit->select_limit_cnt;
2227 name= search_file_name;
2233 linfo.index_file_offset = 0;
2237 errmsg =
"Could not find target log";
2242 thd->current_linfo = &linfo;
2251 thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
2253 DEBUG_SYNC(thd,
"after_show_binlog_event_found_file");
2265 ev= Log_event::read_log_event(&log, (
mysql_mutex_t*)0, description_event,
2266 opt_master_verify_checksum);
2269 if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
2271 delete description_event;
2278 my_b_seek(&log, pos);
2280 if (!description_event->is_valid())
2282 errmsg=
"Invalid Format_description event; could be out of memory";
2286 for (event_count = 0;
2289 opt_master_verify_checksum)); )
2291 if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
2292 description_event->checksum_alg= ev->checksum_alg;
2294 if (event_count >= limit_start &&
2295 ev->net_send(protocol, linfo.log_file_name, pos))
2297 errmsg =
"Net error";
2303 pos = my_b_tell(&log);
2306 if (++event_count >= limit_end)
2310 if (event_count < limit_end && log.error)
2312 errmsg =
"Wrong offset or I/O error";
2320 DEBUG_SYNC(thd,
"after_show_binlog_events");
2325 delete description_event;
2333 my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
2334 "SHOW BINLOG EVENTS", errmsg);
2339 thd->current_linfo = 0;
2341 thd->variables.max_allowed_packet= old_max_allowed_packet;
2354 bool mysql_show_binlog_events(THD* thd)
2358 DBUG_ENTER(
"mysql_show_binlog_events");
2360 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS);
2362 Log_event::init_show_field_list(&field_list);
2364 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
2372 ha_binlog_wait(thd);
2374 DBUG_RETURN(show_binlog_events(thd, &mysql_bin_log));
2380 MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
2381 :bytes_written(0), file_id(1), open_count(1),
2382 sync_period_ptr(sync_period), sync_counter(0),
2384 is_relay_log(0), signal_cnt(0),
2385 checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
2386 relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
2387 previous_gtid_set(0)
2395 index_file_name[0] = 0;
2396 memset(&index_file, 0,
sizeof(index_file));
2397 memset(&purge_index_file, 0,
sizeof(purge_index_file));
2398 memset(&crash_safe_index_file, 0,
sizeof(crash_safe_index_file));
2404 void MYSQL_BIN_LOG::cleanup()
2406 DBUG_ENTER(
"cleanup");
2410 close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
2417 my_atomic_rwlock_destroy(&m_prep_xids_lock);
2419 stage_manager.deinit();
2425 void MYSQL_BIN_LOG::init_pthread_objects()
2427 MYSQL_LOG::init_pthread_objects();
2433 my_atomic_rwlock_init(&m_prep_xids_lock);
2436 #ifdef HAVE_PSI_INTERFACE
2437 m_key_LOCK_flush_queue,
2438 m_key_LOCK_sync_queue,
2439 m_key_LOCK_commit_queue,
2440 m_key_LOCK_done, m_key_COND_done
2445 bool MYSQL_BIN_LOG::open_index_file(
const char *index_file_name_arg,
2446 const char *log_name,
bool need_lock_index)
2448 File index_file_nr= -1;
2449 DBUG_ASSERT(!my_b_inited(&index_file));
2456 myf opt= MY_UNPACK_FILENAME;
2457 if (!index_file_name_arg)
2459 index_file_name_arg= log_name;
2460 opt= MY_UNPACK_FILENAME | MY_REPLACE_EXT;
2462 fn_format(index_file_name, index_file_name_arg, mysql_data_home,
2467 sql_print_error(
"MYSQL_BIN_LOG::set_crash_safe_index_file_name failed.");
2476 if (my_access(index_file_name, F_OK) &&
2477 !my_access(crash_safe_index_file_name, F_OK) &&
2478 my_rename(crash_safe_index_file_name, index_file_name, MYF(MY_WME)))
2480 sql_print_error(
"MYSQL_BIN_LOG::open_index_file failed to "
2481 "move crash_safe_index_file to index file.");
2487 O_RDWR | O_CREAT | O_BINARY,
2488 MYF(MY_WME))) < 0 ||
2490 init_io_cache(&index_file, index_file_nr,
2491 IO_SIZE, READ_CACHE,
2493 0, MYF(MY_WME | MY_WAIT_IF_FULL)) ||
2494 DBUG_EVALUATE_IF(
"fault_injection_openning_index", 1, 0))
2502 if (index_file_nr >= 0)
2507 #ifdef HAVE_REPLICATION
2516 if (set_purge_index_file_name(index_file_name_arg) ||
2517 open_purge_index_file(FALSE) ||
2518 purge_index_entry(NULL, NULL, need_lock_index) ||
2519 close_purge_index_file() ||
2520 DBUG_EVALUATE_IF(
"fault_injection_recovering_index", 1, 0))
2522 sql_print_error(
"MYSQL_BIN_LOG::open_index_file failed to sync the index "
2556 { GOT_GTIDS, GOT_PREVIOUS_GTIDS, NO_GTIDS, ERROR, TRUNCATED };
2558 read_gtids_from_binlog(
const char *filename,
Gtid_set *all_gtids,
2559 Gtid_set *prev_gtids,
bool verify_checksum)
2561 DBUG_ENTER(
"read_gtids_from_binlog");
2562 DBUG_PRINT(
"info", (
"Opening file %s", filename));
2569 if (!fd_ev.is_valid())
2575 const char *errmsg= NULL;
2578 sql_print_error(
"%s", errmsg);
2584 DBUG_RETURN(TRUNCATED);
2591 my_b_seek(&log, BIN_LOG_HEADER_SIZE);
2596 (ev= Log_event::read_log_event(&log, 0, fd_ev_p, verify_checksum)) !=
2599 DBUG_PRINT(
"info", (
"Read event of type %s", ev->
get_type_str()));
2600 switch (ev->get_type_code())
2602 case FORMAT_DESCRIPTION_EVENT:
2603 if (fd_ev_p != &fd_ev)
2610 case PREVIOUS_GTIDS_LOG_EVENT:
2614 my_error(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF, MYF(0));
2617 ret= GOT_PREVIOUS_GTIDS;
2621 if (all_gtids != NULL && prev_gtids_ev->
add_to_set(all_gtids) != 0)
2622 ret= ERROR, done=
true;
2623 else if (prev_gtids != NULL && prev_gtids_ev->
add_to_set(prev_gtids) != 0)
2624 ret= ERROR, done=
true;
2626 char* prev_buffer= prev_gtids_ev->
get_str(NULL, NULL);
2627 DBUG_PRINT(
"info", (
"Got Previous_gtids from file '%s': Gtid_set='%s'.",
2628 filename, prev_buffer));
2629 my_free(prev_buffer);
2633 case GTID_LOG_EVENT:
2635 DBUG_EXECUTE_IF(
"inject_fault_bug16502579", {
2636 DBUG_PRINT(
"debug", (
"GTID_LOG_EVENT found. Injected ret=NO_GTIDS."));
2639 if (ret != GOT_GTIDS)
2641 if (ret != GOT_PREVIOUS_GTIDS)
2647 const char* msg_fmt= (current_thd != NULL) ?
2648 ER(ER_BINLOG_LOGICAL_CORRUPTION) :
2649 ER_DEFAULT(ER_BINLOG_LOGICAL_CORRUPTION);
2650 my_printf_error(ER_BINLOG_LOGICAL_CORRUPTION,
2653 "The first global transaction identifier was read, but "
2654 "no other information regarding identifiers existing "
2655 "on the previous log files was found.");
2656 ret= ERROR, done=
true;
2668 if (all_gtids == NULL)
2669 ret= GOT_GTIDS, done=
true;
2673 rpl_sidno sidno= gtid_ev->
get_sidno(
false);
2675 ret= ERROR, done=
true;
2677 if (all_gtids->
ensure_sidno(sidno) != RETURN_STATUS_OK)
2678 ret= ERROR, done=
true;
2681 ret= ERROR, done=
true;
2683 DBUG_PRINT(
"info", (
"Got Gtid from file '%s': Gtid(%d, %lld).",
2684 filename, sidno, gtid_ev->
get_gno()));
2688 case ANONYMOUS_GTID_LOG_EVENT:
2693 if (ret != GOT_GTIDS && ret != GOT_PREVIOUS_GTIDS)
2699 DBUG_PRINT(
"info", (
"done=%d", done));
2707 sql_print_warning(
"Error reading GTIDs from binary log: %d", log.error);
2710 if (fd_ev_p != &fd_ev)
2719 DBUG_PRINT(
"info", (
"returning %d", ret));
2725 const char **errmsg)
2727 DBUG_ENTER(
"MYSQL_BIN_LOG::gtid_read_start_binlog");
2731 list<string> filename_list;
2735 list<string>::reverse_iterator rit;
2742 DBUG_PRINT(
"info", (
"read log filename '%s'", linfo.log_file_name));
2743 filename_list.push_back(
string(linfo.log_file_name));
2746 if (error != LOG_INFO_EOF)
2748 *errmsg=
"Failed to read the binary log index file while "
2749 "looking for the oldest binary log that contains any GTID "
2750 "that is not in the given gtid set";
2755 if (filename_list.empty())
2757 *errmsg=
"Could not find first log file name in binary log index file "
2758 "while looking for the oldest binary log that contains any GTID "
2759 "that is not in the given gtid set";
2771 DBUG_PRINT(
"info", (
"Iterating backwards through binary logs, and reading "
2772 "only the Previous_gtids_log_event, to find the first "
2773 "one, that is the subset of the given gtid set."));
2774 rit= filename_list.rbegin();
2776 while (rit != filename_list.rend())
2778 const char *filename= rit->c_str();
2779 DBUG_PRINT(
"info", (
"Read Previous_gtids_log_event from filename='%s'",
2781 switch (read_gtids_from_binlog(filename, NULL, &previous_gtid_set,
2782 opt_master_verify_checksum))
2785 *errmsg=
"Error reading header of binary log while looking for "
2786 "the oldest binary log that contains any GTID that is not in "
2787 "the given gtid set";
2791 *errmsg=
"Found old binary log without GTIDs while looking for "
2792 "the oldest binary log that contains any GTID that is not in "
2793 "the given gtid set";
2797 case GOT_PREVIOUS_GTIDS:
2798 if (previous_gtid_set.
is_subset(gtid_set))
2800 strcpy(binlog_file_name, filename);
2804 DBUG_EXECUTE_IF(
"slave_reconnect_with_gtid_set_executed",
2805 DBUG_ASSERT(strcmp(filename_list.begin()->c_str(),
2806 binlog_file_name) != 0););
2812 previous_gtid_set.
clear();
2817 if (rit == filename_list.rend())
2819 *errmsg= ER(ER_MASTER_HAS_PURGED_REQUIRED_GTIDS);
2825 DBUG_PRINT(
"error", (
"'%s'", *errmsg));
2826 filename_list.clear();
2827 DBUG_PRINT(
"info", (
"returning %d", error));
2828 DBUG_RETURN(error != 0 ?
true :
false);
2832 bool verify_checksum,
bool need_lock)
2834 DBUG_ENTER(
"MYSQL_BIN_LOG::init_gtid_sets");
2835 DBUG_PRINT(
"info", (
"lost_gtids=%p; so we are recovering a %s log",
2836 lost_gtids, lost_gtids == NULL ?
"relay" :
"binary"));
2846 if (all_gtids != NULL)
2849 global_sid_lock->
wrlock();
2853 if (all_gtids != NULL)
2860 list<string> filename_list;
2864 list<string>::iterator it;
2865 list<string>::reverse_iterator rit;
2866 bool reached_first_file=
false;
2871 DBUG_PRINT(
"info", (
"read log filename '%s'", linfo.log_file_name));
2872 filename_list.push_back(
string(linfo.log_file_name));
2874 if (error != LOG_INFO_EOF)
2876 DBUG_PRINT(
"error", (
"Error reading binlog index"));
2881 if (all_gtids != NULL)
2883 DBUG_PRINT(
"info", (
"Iterating backwards through binary logs, looking for the last binary log that contains a Previous_gtids_log_event."));
2886 rit= filename_list.rbegin();
2887 bool got_gtids=
false;
2888 reached_first_file= (rit == filename_list.rend());
2889 DBUG_PRINT(
"info", (
"filename='%s' reached_first_file=%d",
2890 rit->c_str(), reached_first_file));
2891 while (!got_gtids && !reached_first_file)
2893 const char *filename= rit->c_str();
2895 reached_first_file= (rit == filename_list.rend());
2896 DBUG_PRINT(
"info", (
"filename='%s' got_gtids=%d reached_first_file=%d",
2897 filename, got_gtids, reached_first_file));
2898 switch (read_gtids_from_binlog(filename, all_gtids,
2899 reached_first_file ? lost_gtids : NULL,
2906 case GOT_PREVIOUS_GTIDS:
2915 if (lost_gtids != NULL && !reached_first_file)
2917 DBUG_PRINT(
"info", (
"Iterating forwards through binary logs, looking for the first binary log that contains a Previous_gtids_log_event."));
2918 for (it= filename_list.begin(); it != filename_list.end(); it++)
2920 const char *filename= it->c_str();
2921 DBUG_PRINT(
"info", (
"filename='%s'", filename));
2922 switch (read_gtids_from_binlog(filename, NULL, lost_gtids,
2930 case GOT_PREVIOUS_GTIDS:
2944 global_sid_lock->
unlock();
2946 if (all_gtids != NULL)
2949 filename_list.clear();
2950 DBUG_PRINT(
"info", (
"returning %d", error));
2951 DBUG_RETURN(error != 0 ?
true :
false);
2970 const char *new_name,
2971 enum cache_type io_cache_type_arg,
2973 bool null_created_arg,
2974 bool need_lock_index,
2981 DBUG_ASSERT(need_sid_lock || !need_lock_index);
2982 DBUG_ENTER(
"MYSQL_BIN_LOG::open_binlog(const char *, ...)");
2983 DBUG_PRINT(
"enter",(
"name: %s", log_name));
2985 if (init_and_set_log_file_name(log_name, new_name, LOG_BIN,
2988 sql_print_error(
"MYSQL_BIN_LOG::open failed to generate new file name.");
2992 #ifdef HAVE_REPLICATION
2993 if (open_purge_index_file(TRUE) ||
2994 register_create_index_entry(log_file_name) ||
2995 sync_purge_index_file() ||
2996 DBUG_EVALUATE_IF(
"fault_injection_registering_index", 1, 0))
3007 DBUG_EXECUTE_IF(
"fault_injection_registering_index", {
3008 if (my_b_inited(&purge_index_file))
3010 end_io_cache(&purge_index_file);
3011 my_close(purge_index_file.file, MYF(0));
3015 sql_print_error(
"MYSQL_BIN_LOG::open failed to sync the index file.");
3018 DBUG_EXECUTE_IF(
"crash_create_non_critical_before_update_index", DBUG_SUICIDE(););
3024 if (MYSQL_LOG::open(
3025 #ifdef HAVE_PSI_INTERFACE
3028 log_name, LOG_BIN, new_name, io_cache_type_arg))
3030 #ifdef HAVE_REPLICATION
3031 close_purge_index_file();
3036 max_size= max_size_arg;
3040 bool write_file_name_to_index_file=0;
3045 if (!my_b_filelength(&log_file))
3053 if (my_b_safe_write(&log_file, (uchar*) BINLOG_MAGIC,
3054 BIN_LOG_HEADER_SIZE))
3056 bytes_written+= BIN_LOG_HEADER_SIZE;
3057 write_file_name_to_index_file= 1;
3064 if (io_cache_type == WRITE_CACHE)
3065 s.
flags |= LOG_EVENT_BINLOG_IN_USE_F;
3066 s.checksum_alg= is_relay_log ?
3069 (relay_log_checksum_alg=
3070 (relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
3071 relay_log_checksum_alg :
3073 (opt_slave_sql_verify_checksum == 0) ?
3074 (uint8) BINLOG_CHECKSUM_ALG_OFF : binlog_checksum_options):
3076 binlog_checksum_options;
3077 DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
3080 s.dont_set_created= null_created_arg;
3083 s.set_relay_log_event();
3084 if (s.write(&log_file))
3086 bytes_written+= s.data_written;
3092 if (current_thd && gtid_mode > 0)
3095 global_sid_lock->
wrlock();
3100 global_sid_lock->
unlock();
3101 prev_gtids_ev.checksum_alg= s.checksum_alg;
3102 if (prev_gtids_ev.write(&log_file))
3104 bytes_written+= prev_gtids_ev.data_written;
3106 if (extra_description_event &&
3107 extra_description_event->binlog_version>=4)
3129 extra_description_event->created= 0;
3131 extra_description_event->set_artificial_event();
3133 if (extra_description_event->write(&log_file))
3135 bytes_written+= extra_description_event->data_written;
3137 if (flush_io_cache(&log_file) ||
3141 if (write_file_name_to_index_file)
3143 #ifdef HAVE_REPLICATION
3144 DBUG_EXECUTE_IF(
"crash_create_critical_before_update_index", DBUG_SUICIDE(););
3147 DBUG_ASSERT(my_b_inited(&index_file) != 0);
3154 if (DBUG_EVALUATE_IF(
"fault_injection_updating_index", 1, 0) ||
3159 #ifdef HAVE_REPLICATION
3160 DBUG_EXECUTE_IF(
"crash_create_after_update_index", DBUG_SUICIDE(););
3164 log_state= LOG_OPENED;
3166 #ifdef HAVE_REPLICATION
3167 close_purge_index_file();
3173 #ifdef HAVE_REPLICATION
3174 if (is_inited_purge_index_file())
3175 purge_index_entry(NULL, NULL, need_lock_index);
3176 close_purge_index_file();
3178 sql_print_error(
"Could not use %s for logging (error %d). \
3179 Turning logging off for the whole duration of the MySQL server process. \
3180 To turn it on again: fix the cause, \
3181 shutdown the MySQL server and restart it.", name, errno);
3184 end_io_cache(&log_file);
3185 end_io_cache(&index_file);
3188 log_state= LOG_CLOSED;
3206 DBUG_ENTER(
"MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file");
3208 if (need_lock_index)
3213 if (my_b_inited(&index_file))
3215 end_io_cache(&index_file);
3219 sql_print_error(
"MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file "
3220 "failed to close the index file.");
3226 DBUG_EXECUTE_IF(
"crash_create_before_rename_index_file", DBUG_SUICIDE(););
3227 if (my_rename(crash_safe_index_file_name, index_file_name, MYF(MY_WME)))
3230 sql_print_error(
"MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file "
3231 "failed to move crash_safe_index_file to index file.");
3234 DBUG_EXECUTE_IF(
"crash_create_after_rename_index_file", DBUG_SUICIDE(););
3238 O_RDWR | O_CREAT | O_BINARY,
3239 MYF(MY_WME))) < 0 ||
3241 init_io_cache(&index_file, fd, IO_SIZE, READ_CACHE,
3243 0, MYF(MY_WME | MY_WAIT_IF_FULL)))
3246 sql_print_error(
"MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file "
3247 "failed to open the index file.");
3252 if (need_lock_index)
3272 int log_name_len,
bool need_lock_index)
3274 DBUG_ENTER(
"MYSQL_BIN_LOG::add_log_to_index");
3278 sql_print_error(
"MYSQL_BIN_LOG::add_log_to_index failed to "
3279 "open the crash safe index file.");
3283 if (copy_file(&index_file, &crash_safe_index_file, 0))
3285 sql_print_error(
"MYSQL_BIN_LOG::add_log_to_index failed to "
3286 "copy index file to crash safe index file.");
3290 if (my_b_write(&crash_safe_index_file, log_name, log_name_len) ||
3291 my_b_write(&crash_safe_index_file, (uchar*)
"\n", 1) ||
3292 flush_io_cache(&crash_safe_index_file) ||
3295 sql_print_error(
"MYSQL_BIN_LOG::add_log_to_index failed to "
3296 "append log file name: %s, to crash "
3297 "safe index file.", log_name);
3303 sql_print_error(
"MYSQL_BIN_LOG::add_log_to_index failed to "
3304 "close the crash safe index file.");
3310 sql_print_error(
"MYSQL_BIN_LOG::add_log_to_index failed to "
3311 "move crash safe index file to index file.");
3321 int MYSQL_BIN_LOG::get_current_log(
LOG_INFO* linfo)
3324 int ret = raw_get_current_log(linfo);
3329 int MYSQL_BIN_LOG::raw_get_current_log(
LOG_INFO* linfo)
3331 strmake(linfo->log_file_name, log_file_name,
sizeof(linfo->log_file_name)-1);
3332 linfo->pos = my_b_tell(&log_file);
3336 bool MYSQL_BIN_LOG::check_write_error(THD *thd)
3338 DBUG_ENTER(
"MYSQL_BIN_LOG::check_write_error");
3340 bool checked= FALSE;
3342 if (!thd->is_error())
3343 DBUG_RETURN(checked);
3345 switch (thd->get_stmt_da()->sql_errno())
3347 case ER_TRANS_CACHE_FULL:
3348 case ER_STMT_CACHE_FULL:
3349 case ER_ERROR_ON_WRITE:
3350 case ER_BINLOG_LOGGING_IMPOSSIBLE:
3354 DBUG_PRINT(
"return", (
"checked: %s", YESNO(checked)));
3355 DBUG_RETURN(checked);
3358 void MYSQL_BIN_LOG::set_write_error(THD *thd,
bool is_transactional)
3360 DBUG_ENTER(
"MYSQL_BIN_LOG::set_write_error");
3364 if (check_write_error(thd))
3367 if (my_errno == EFBIG)
3369 if (is_transactional)
3371 my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME));
3375 my_message(ER_STMT_CACHE_FULL, ER(ER_STMT_CACHE_FULL), MYF(MY_WME));
3380 char errbuf[MYSYS_STRERROR_SIZE];
3381 my_error(ER_ERROR_ON_WRITE, MYF(MY_WME), name,
3382 errno, my_strerror(errbuf,
sizeof(errbuf), errno));
3411 bool need_lock_index)
3414 char *full_fname= linfo->log_file_name;
3415 char full_log_name[FN_REFLEN], fname[FN_REFLEN];
3416 uint log_name_len= 0, fname_len= 0;
3417 DBUG_ENTER(
"find_log_pos");
3418 full_log_name[0]= full_fname[0]= 0;
3424 if (need_lock_index)
3432 if(normalize_binlog_name(full_log_name, log_name, is_relay_log))
3434 error= LOG_INFO_EOF;
3439 log_name_len= log_name ? (uint) strlen(full_log_name) : 0;
3440 DBUG_PRINT(
"enter", (
"log_name: %s, full_log_name: %s",
3441 log_name ? log_name :
"NULL", full_log_name));
3444 my_b_seek(&index_file, (my_off_t) 0);
3449 my_off_t offset= my_b_tell(&index_file);
3451 DBUG_EXECUTE_IF(
"simulate_find_log_pos_error",
3452 error= LOG_INFO_EOF;
break;);
3454 if ((length= my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
3457 error= !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
3462 if (normalize_binlog_name(full_fname, fname, is_relay_log))
3464 error= LOG_INFO_EOF;
3467 fname_len= (uint) strlen(full_fname);
3471 (log_name_len == fname_len-1 && full_fname[log_name_len] ==
'\n' &&
3472 !memcmp(full_fname, full_log_name, log_name_len)))
3474 DBUG_PRINT(
"info", (
"Found log file entry"));
3475 full_fname[fname_len-1]= 0;
3476 linfo->index_file_start_offset=
offset;
3477 linfo->index_file_offset = my_b_tell(&index_file);
3480 linfo->entry_index++;
3484 if (need_lock_index)
3513 char fname[FN_REFLEN];
3514 char *full_fname= linfo->log_file_name;
3516 if (need_lock_index)
3522 my_b_seek(&index_file, linfo->index_file_offset);
3524 linfo->index_file_start_offset= linfo->index_file_offset;
3525 if ((length=my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
3527 error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
3533 if(normalize_binlog_name(full_fname, fname, is_relay_log))
3535 error= LOG_INFO_EOF;
3538 length= strlen(full_fname);
3541 full_fname[length-1]= 0;
3542 linfo->index_file_offset= my_b_tell(&index_file);
3545 if (need_lock_index)
3573 const char* save_name;
3574 DBUG_ENTER(
"reset_logs");
3580 if (ha_flush_logs(NULL))
3600 global_sid_lock->
wrlock();
3605 close(LOG_CLOSE_TO_BE_OPENED);
3620 sql_print_error(
"Failed to locate old binlog or relay log files");
3621 my_message(errcode, ER(errcode), MYF(0));
3628 if ((error= my_delete_allow_opened(linfo.log_file_name, MYF(0))) != 0)
3630 if (my_errno == ENOENT)
3632 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
3633 ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
3634 linfo.log_file_name);
3635 sql_print_information(
"Failed to delete file '%s'",
3636 linfo.log_file_name);
3642 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
3643 ER_BINLOG_PURGE_FATAL_ERR,
3644 "a problem with deleting %s; "
3645 "consider examining correspondence "
3646 "of your binlog index file "
3647 "to the actual binlog files",
3648 linfo.log_file_name);
3658 close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED);
3659 if ((error= my_delete_allow_opened(index_file_name, MYF(0))))
3661 if (my_errno == ENOENT)
3663 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
3664 ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
3666 sql_print_information(
"Failed to delete file '%s'",
3673 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
3674 ER_BINLOG_PURGE_FATAL_ERR,
3675 "a problem with deleting %s; "
3676 "consider examining correspondence "
3677 "of your binlog index file "
3678 "to the actual binlog files",
3685 #ifdef HAVE_REPLICATION
3688 DBUG_ASSERT(active_mi != NULL);
3689 DBUG_ASSERT(active_mi->rli != NULL);
3690 (
const_cast<Gtid_set *
>(active_mi->rli->get_gtid_set()))->clear();
3694 gtid_state->
clear();
3696 if (gtid_state->
init() != 0)
3701 if (!open_index_file(index_file_name, 0,
false))
3702 if ((error=
open_binlog(save_name, 0, io_cache_type,
3708 my_free((
void *) save_name);
3712 name=
const_cast<char*
>(save_name);
3713 global_sid_lock->
unlock();
3732 DBUG_ENTER(
"MYSQL_BIN_LOG::set_crash_safe_index_file_name");
3733 if (fn_format(crash_safe_index_file_name, base_file_name, mysql_data_home,
3734 ".index_crash_safe", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH |
3735 MY_REPLACE_EXT)) == NULL)
3738 sql_print_error(
"MYSQL_BIN_LOG::set_crash_safe_index_file_name failed "
3739 "to set file name.");
3761 DBUG_ENTER(
"MYSQL_BIN_LOG::open_crash_safe_index_file");
3763 if (!my_b_inited(&crash_safe_index_file))
3765 if ((file= my_open(crash_safe_index_file_name, O_RDWR | O_CREAT | O_BINARY,
3766 MYF(MY_WME | ME_WAITTANG))) < 0 ||
3767 init_io_cache(&crash_safe_index_file, file, IO_SIZE, WRITE_CACHE,
3768 0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
3771 sql_print_error(
"MYSQL_BIN_LOG::open_crash_safe_index_file failed "
3772 "to open temporary index file.");
3794 DBUG_ENTER(
"MYSQL_BIN_LOG::close_crash_safe_index_file");
3796 if (my_b_inited(&crash_safe_index_file))
3798 end_io_cache(&crash_safe_index_file);
3799 error= my_close(crash_safe_index_file.file, MYF(0));
3801 memset(&crash_safe_index_file, 0,
sizeof(crash_safe_index_file));
3848 #ifdef HAVE_REPLICATION
3850 int MYSQL_BIN_LOG::purge_first_log(
Relay_log_info* rli,
bool included)
3853 char *to_purge_if_included= NULL;
3854 DBUG_ENTER(
"purge_first_log");
3856 DBUG_ASSERT(current_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
3857 DBUG_ASSERT(is_relay_log);
3858 DBUG_ASSERT(is_open());
3859 DBUG_ASSERT(rli->slave_running == 1);
3860 DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->get_event_relay_log_name()));
3865 to_purge_if_included= my_strdup(rli->get_group_relay_log_name(), MYF(0));
3871 if((error=
find_log_pos(&rli->linfo, rli->get_event_relay_log_name(),
3876 sql_print_error(
"next log error: %d offset: %s log: %s included: %d",
3878 llstr(rli->linfo.index_file_offset,buff),
3879 rli->get_event_relay_log_name(),
3887 rli->set_event_relay_log_pos(BIN_LOG_HEADER_SIZE);
3888 rli->set_event_relay_log_name(rli->linfo.log_file_name);
3897 rli->set_group_relay_log_pos(BIN_LOG_HEADER_SIZE);
3898 rli->set_group_relay_log_name(rli->linfo.log_file_name);
3905 DBUG_EXECUTE_IF(
"crash_before_purge_logs", DBUG_SUICIDE(););
3908 rli->relay_log.purge_logs(to_purge_if_included, included,
3911 &rli->log_space_total,
true);
3913 rli->ignore_log_space_limit= 0;
3927 if((error=
find_log_pos(&rli->linfo, rli->get_event_relay_log_name(),
3931 sql_print_error(
"next log error: %d offset: %s log: %s included: %d",
3933 llstr(rli->linfo.index_file_offset,buff),
3934 rli->get_group_relay_log_name(),
3940 DBUG_ASSERT(!included || rli->linfo.index_file_start_offset == 0);
3943 my_free(to_purge_if_included);
3970 int MYSQL_BIN_LOG::remove_logs_from_index(
LOG_INFO* log_info,
bool need_update_threads)
3974 sql_print_error(
"MYSQL_BIN_LOG::remove_logs_from_index failed to "
3975 "open the crash safe index file.");
3979 if (copy_file(&index_file, &crash_safe_index_file,
3980 log_info->index_file_start_offset))
3982 sql_print_error(
"MYSQL_BIN_LOG::remove_logs_from_index failed to "
3983 "copy index file to crash safe index file.");
3989 sql_print_error(
"MYSQL_BIN_LOG::remove_logs_from_index failed to "
3990 "close the crash safe index file.");
3993 DBUG_EXECUTE_IF(
"fault_injection_copy_part_file", DBUG_SUICIDE(););
3997 sql_print_error(
"MYSQL_BIN_LOG::remove_logs_from_index failed to "
3998 "move crash safe index file to index file.");
4003 if (need_update_threads)
4004 adjust_linfo_offsets(log_info->index_file_start_offset);
4036 int MYSQL_BIN_LOG::purge_logs(
const char *to_log,
4038 bool need_lock_index,
4039 bool need_update_threads,
4040 ulonglong *decrease_log_space,
4043 int error= 0, no_of_log_files_to_purge= 0, no_of_log_files_purged= 0;
4044 int no_of_threads_locking_log= 0;
4047 THD *thd= current_thd;
4048 DBUG_ENTER(
"purge_logs");
4049 DBUG_PRINT(
"info",(
"to_log= %s",to_log));
4051 if (need_lock_index)
4057 sql_print_error(
"MYSQL_BIN_LOG::purge_logs was called with file %s not "
4058 "listed in the index.", to_log);
4062 no_of_log_files_to_purge= log_info.entry_index;
4064 if ((error= open_purge_index_file(TRUE)))
4066 sql_print_error(
"MYSQL_BIN_LOG::purge_logs failed to sync the index file.");
4077 while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)))
4082 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4083 ER_WARN_PURGE_LOG_IS_ACTIVE,
4084 ER(ER_WARN_PURGE_LOG_IS_ACTIVE),
4085 log_info.log_file_name);
4089 if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name)))
4092 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4093 ER_WARN_PURGE_LOG_IN_USE,
4094 ER(ER_WARN_PURGE_LOG_IN_USE),
4095 log_info.log_file_name, no_of_threads_locking_log,
4096 no_of_log_files_purged, no_of_log_files_to_purge);
4099 no_of_log_files_purged++;
4101 if ((error= register_purge_index_entry(log_info.log_file_name)))
4103 sql_print_error(
"MYSQL_BIN_LOG::purge_logs failed to copy %s to register file.",
4104 log_info.log_file_name);
4112 DBUG_EXECUTE_IF(
"crash_purge_before_update_index", DBUG_SUICIDE(););
4114 if ((error= sync_purge_index_file()))
4116 sql_print_error(
"MYSQL_BIN_LOG::purge_logs failed to flush register file.");
4121 if ((error=remove_logs_from_index(&log_info, need_update_threads)))
4123 sql_print_error(
"MYSQL_BIN_LOG::purge_logs failed to update the index file");
4128 if (gtid_mode > 0 && !is_relay_log)
4130 global_sid_lock->
wrlock();
4133 opt_master_verify_checksum,
4135 global_sid_lock->
unlock();
4140 DBUG_EXECUTE_IF(
"crash_purge_critical_after_update_index", DBUG_SUICIDE(););
4144 int error_index= 0, close_error_index= 0;
4146 if (is_inited_purge_index_file() &&
4147 (error_index= purge_index_entry(thd, decrease_log_space,
false)))
4148 sql_print_error(
"MYSQL_BIN_LOG::purge_logs failed to process registered files"
4149 " that would be purged.");
4151 close_error_index= close_purge_index_file();
4153 DBUG_EXECUTE_IF(
"crash_purge_non_critical_after_update_index", DBUG_SUICIDE(););
4155 if (need_lock_index)
4163 error= error ? error : (error_index ? error_index :
4169 int MYSQL_BIN_LOG::set_purge_index_file_name(
const char *base_file_name)
4172 DBUG_ENTER(
"MYSQL_BIN_LOG::set_purge_index_file_name");
4173 if (fn_format(purge_index_file_name, base_file_name, mysql_data_home,
4174 ".~rec~", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH |
4175 MY_REPLACE_EXT)) == NULL)
4178 sql_print_error(
"MYSQL_BIN_LOG::set_purge_index_file_name failed to set "
4184 int MYSQL_BIN_LOG::open_purge_index_file(
bool destroy)
4189 DBUG_ENTER(
"MYSQL_BIN_LOG::open_purge_index_file");
4192 close_purge_index_file();
4194 if (!my_b_inited(&purge_index_file))
4196 if ((file= my_open(purge_index_file_name, O_RDWR | O_CREAT | O_BINARY,
4197 MYF(MY_WME | ME_WAITTANG))) < 0 ||
4198 init_io_cache(&purge_index_file, file, IO_SIZE,
4199 (destroy ? WRITE_CACHE : READ_CACHE),
4200 0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
4203 sql_print_error(
"MYSQL_BIN_LOG::open_purge_index_file failed to open register "
4210 int MYSQL_BIN_LOG::close_purge_index_file()
4214 DBUG_ENTER(
"MYSQL_BIN_LOG::close_purge_index_file");
4216 if (my_b_inited(&purge_index_file))
4218 end_io_cache(&purge_index_file);
4219 error= my_close(purge_index_file.file, MYF(0));
4221 my_delete(purge_index_file_name, MYF(0));
4222 memset(&purge_index_file, 0,
sizeof(purge_index_file));
4227 bool MYSQL_BIN_LOG::is_inited_purge_index_file()
4229 DBUG_ENTER(
"MYSQL_BIN_LOG::is_inited_purge_index_file");
4230 DBUG_RETURN (my_b_inited(&purge_index_file));
4233 int MYSQL_BIN_LOG::sync_purge_index_file()
4236 DBUG_ENTER(
"MYSQL_BIN_LOG::sync_purge_index_file");
4238 if ((error= flush_io_cache(&purge_index_file)) ||
4239 (error= my_sync(purge_index_file.file, MYF(MY_WME))))
4245 int MYSQL_BIN_LOG::register_purge_index_entry(
const char *
entry)
4248 DBUG_ENTER(
"MYSQL_BIN_LOG::register_purge_index_entry");
4250 if ((error=my_b_write(&purge_index_file, (
const uchar*)entry, strlen(entry))) ||
4251 (error=my_b_write(&purge_index_file, (
const uchar*)
"\n", 1)))
4252 DBUG_RETURN (error);
4257 int MYSQL_BIN_LOG::register_create_index_entry(
const char *entry)
4259 DBUG_ENTER(
"MYSQL_BIN_LOG::register_create_index_entry");
4260 DBUG_RETURN(register_purge_index_entry(entry));
4263 int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space,
4264 bool need_lock_index)
4271 DBUG_ENTER(
"MYSQL_BIN_LOG:purge_index_entry");
4273 DBUG_ASSERT(my_b_inited(&purge_index_file));
4275 if ((error=reinit_io_cache(&purge_index_file, READ_CACHE, 0, 0, 0)))
4277 sql_print_error(
"MYSQL_BIN_LOG::purge_index_entry failed to reinit register file "
4286 if ((length=my_b_gets(&purge_index_file, log_info.log_file_name,
4289 if (purge_index_file.error)
4291 error= purge_index_file.error;
4292 sql_print_error(
"MYSQL_BIN_LOG::purge_index_entry error %d reading from "
4293 "register file.", error);
4302 log_info.log_file_name[length-1]= 0;
4304 if (!
mysql_file_stat(m_key_file_log, log_info.log_file_name, &s, MYF(0)))
4306 if (my_errno == ENOENT)
4314 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4315 ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
4316 log_info.log_file_name);
4318 sql_print_information(
"Failed to execute mysql_file_stat on file '%s'",
4319 log_info.log_file_name);
4329 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4330 ER_BINLOG_PURGE_FATAL_ERR,
4331 "a problem with getting info on being purged %s; "
4332 "consider examining correspondence "
4333 "of your binlog index file "
4334 "to the actual binlog files",
4335 log_info.log_file_name);
4339 sql_print_information(
"Failed to delete log file '%s'; "
4340 "consider examining correspondence "
4341 "of your binlog index file "
4342 "to the actual binlog files",
4343 log_info.log_file_name);
4345 error= LOG_INFO_FATAL;
4351 if ((error=
find_log_pos(&check_log_info, log_info.log_file_name,
4354 if (error != LOG_INFO_EOF)
4358 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4359 ER_BINLOG_PURGE_FATAL_ERR,
4360 "a problem with deleting %s and "
4361 "reading the binlog index file",
4362 log_info.log_file_name);
4366 sql_print_information(
"Failed to delete file '%s' and "
4367 "read the binlog index file",
4368 log_info.log_file_name);
4374 if (!need_lock_index)
4382 ha_binlog_index_purge_file(current_thd, log_info.log_file_name);
4385 DBUG_PRINT(
"info",(
"purging %s",log_info.log_file_name));
4386 if (!my_delete(log_info.log_file_name, MYF(0)))
4388 if (decrease_log_space)
4389 *decrease_log_space-= s.st_size;
4393 if (my_errno == ENOENT)
4397 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4398 ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
4399 log_info.log_file_name);
4401 sql_print_information(
"Failed to delete file '%s'",
4402 log_info.log_file_name);
4409 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4410 ER_BINLOG_PURGE_FATAL_ERR,
4411 "a problem with deleting %s; "
4412 "consider examining correspondence "
4413 "of your binlog index file "
4414 "to the actual binlog files",
4415 log_info.log_file_name);
4419 sql_print_information(
"Failed to delete file '%s'; "
4420 "consider examining correspondence "
4421 "of your binlog index file "
4422 "to the actual binlog files",
4423 log_info.log_file_name);
4425 if (my_errno == EMFILE)
4428 (
"my_errno: %d, set ret = LOG_INFO_EMFILE", my_errno));
4429 error= LOG_INFO_EMFILE;
4432 error= LOG_INFO_FATAL;
4464 int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time,
bool auto_purge)
4467 int no_of_threads_locking_log= 0, no_of_log_files_purged= 0;
4468 bool log_is_active=
false, log_is_in_use=
false;
4469 char to_log[FN_REFLEN], copy_log_in_use[FN_REFLEN];
4472 THD *thd= current_thd;
4474 DBUG_ENTER(
"purge_logs_before_date");
4482 while (!(log_is_active=
is_active(log_info.log_file_name)))
4484 if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name)))
4488 log_is_in_use=
true;
4489 strcpy(copy_log_in_use, log_info.log_file_name);
4493 no_of_log_files_purged++;
4496 log_info.log_file_name, &stat_area, MYF(0)))
4498 if (my_errno == ENOENT)
4512 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4513 ER_BINLOG_PURGE_FATAL_ERR,
4514 "a problem with getting info on being purged %s; "
4515 "consider examining correspondence "
4516 "of your binlog index file "
4517 "to the actual binlog files",
4518 log_info.log_file_name);
4522 sql_print_information(
"Failed to delete log file '%s'",
4523 log_info.log_file_name);
4525 error= LOG_INFO_FATAL;
4531 if (stat_area.st_mtime < purge_time)
4533 log_info.log_file_name,
4534 sizeof(log_info.log_file_name) - 1);
4545 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4546 ER_WARN_PURGE_LOG_IS_ACTIVE,
4547 ER(ER_WARN_PURGE_LOG_IS_ACTIVE),
4548 log_info.log_file_name);
4554 int no_of_log_files_to_purge= no_of_log_files_purged+1;
4555 while (strcmp(log_file_name, log_info.log_file_name))
4558 &stat_area, MYF(0)))
4560 if (stat_area.st_mtime < purge_time)
4561 no_of_log_files_to_purge++;
4567 no_of_log_files_to_purge++;
4572 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4573 ER_WARN_PURGE_LOG_IN_USE,
4574 ER(ER_WARN_PURGE_LOG_IN_USE),
4575 copy_log_in_use, no_of_threads_locking_log,
4576 no_of_log_files_purged, no_of_log_files_to_purge);
4579 error= (to_log[0] ? purge_logs(to_log,
true,
4582 (ulonglong *) 0, auto_purge) : 0);
4602 uint dir_len = dirname_length(log_file_name);
4603 if (dir_len >= FN_REFLEN)
4604 dir_len=FN_REFLEN-1;
4605 strnmov(buf, log_file_name, dir_len);
4606 strmake(buf+dir_len, log_ident, FN_REFLEN - dir_len -1);
4616 return !strcmp(log_file_name, log_file_name_arg);
4634 return new_file_impl(
true, extra_description_event);
4643 return new_file_impl(
false, extra_description_event);
4660 int error= 0, close_on_error= FALSE;
4661 char new_name[FN_REFLEN], *new_name_ptr, *old_name, *file_to_open;
4663 DBUG_ENTER(
"MYSQL_BIN_LOG::new_file_impl");
4666 DBUG_PRINT(
"info",(
"log is closed"));
4674 DBUG_EXECUTE_IF(
"semi_sync_3-way_deadlock",
4675 DEBUG_SYNC(current_thd,
"before_rotate_binlog"););
4685 while (get_prep_xids() > 0)
4691 if (DBUG_EVALUATE_IF(
"expire_logs_always", 0, 1)
4692 && (error= ha_flush_logs(NULL)))
4706 if ((error= generate_new_name(new_name, name)))
4710 new_name_ptr=new_name;
4716 is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
4722 r.checksum_alg= relay_log_checksum_alg;
4723 DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
4724 if(DBUG_EVALUATE_IF(
"fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
4725 (error= r.write(&log_file)))
4727 char errbuf[MYSYS_STRERROR_SIZE];
4728 DBUG_EXECUTE_IF(
"fault_injection_new_file_rotate_event", errno=2;);
4729 close_on_error= TRUE;
4730 my_printf_error(ER_ERROR_ON_WRITE, ER(ER_CANT_OPEN_FILE),
4731 MYF(ME_FATALERROR), name,
4732 errno, my_strerror(errbuf,
sizeof(errbuf), errno));
4735 bytes_written += r.data_written;
4746 close(LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX);
4748 if (checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF)
4750 DBUG_ASSERT(!is_relay_log);
4751 DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset);
4752 binlog_checksum_options= checksum_alg_reset;
4768 file_to_open= index_file_name;
4769 error= open_index_file(index_file_name, 0,
false);
4773 file_to_open= new_name_ptr;
4774 error=
open_binlog(old_name, new_name_ptr, io_cache_type,
4778 extra_description_event);
4784 char errbuf[MYSYS_STRERROR_SIZE];
4785 my_printf_error(ER_CANT_OPEN_FILE, ER(ER_CANT_OPEN_FILE),
4786 MYF(ME_FATALERROR), file_to_open,
4787 error, my_strerror(errbuf,
sizeof(errbuf), error));
4788 close_on_error= TRUE;
4794 if (error && close_on_error )
4808 close(LOG_CLOSE_INDEX);
4809 sql_print_error(
"Could not open %s for logging (error %d). "
4810 "Turning logging off for the whole duration "
4811 "of the MySQL server process. To turn it on "
4812 "again: fix the cause, shutdown the MySQL "
4813 "server and restart it.",
4814 new_name_ptr, errno);
4825 #ifdef HAVE_REPLICATION
4842 bool MYSQL_BIN_LOG::after_append_to_relay_log(Master_info *mi)
4844 DBUG_ENTER(
"MYSQL_BIN_LOG::after_append_to_relay_log");
4845 DBUG_PRINT(
"info",(
"max_size: %lu",max_size));
4850 DBUG_ASSERT(is_relay_log);
4851 DBUG_ASSERT(current_thd->system_thread == SYSTEM_THREAD_SLAVE_IO);
4858 if ((uint) my_b_append_tell(&log_file) >
4859 DBUG_EVALUATE_IF(
"rotate_slave_debug_group", 500, max_size))
4861 error= new_file_without_locking(mi->get_mi_description_event());
4871 bool MYSQL_BIN_LOG::append_event(
Log_event* ev, Master_info *mi)
4873 DBUG_ENTER(
"MYSQL_BIN_LOG::append");
4876 DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
4877 DBUG_ASSERT(is_relay_log);
4884 if (ev->write(&log_file) == 0)
4886 bytes_written+= ev->data_written;
4887 error= after_append_to_relay_log(mi);
4897 bool MYSQL_BIN_LOG::append_buffer(
const char*
buf, uint len, Master_info *mi)
4899 DBUG_ENTER(
"MYSQL_BIN_LOG::append_buffer");
4902 DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
4903 DBUG_ASSERT(is_relay_log);
4908 if (my_b_append(&log_file,(uchar*) buf,len) == 0)
4910 bytes_written += len;
4911 error= after_append_to_relay_log(mi);
4918 #endif // ifdef HAVE_REPLICATION
4924 if (flush_io_cache(&log_file))
4927 std::pair<bool, bool> result= sync_binlog_file(force);
4929 return result.first;
4932 void MYSQL_BIN_LOG::start_union_events(THD *thd, query_id_t query_id_param)
4934 DBUG_ASSERT(!thd->binlog_evt_union.do_union);
4935 thd->binlog_evt_union.do_union= TRUE;
4936 thd->binlog_evt_union.unioned_events= FALSE;
4937 thd->binlog_evt_union.unioned_events_trans= FALSE;
4938 thd->binlog_evt_union.first_query_id= query_id_param;
4941 void MYSQL_BIN_LOG::stop_union_events(THD *thd)
4943 DBUG_ASSERT(thd->binlog_evt_union.do_union);
4944 thd->binlog_evt_union.do_union= FALSE;
4947 bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param)
4949 return (thd->binlog_evt_union.do_union &&
4950 query_id_param >= thd->binlog_evt_union.first_query_id);
4957 void MYSQL_BIN_LOG::update_thd_next_event_pos(THD* thd)
4959 if (likely(thd != NULL))
4961 thd->set_next_event_pos(log_file_name,
4962 my_b_tell(&log_file));
4977 MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
4979 bool is_transactional)
4981 DBUG_ENTER(
"MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
4982 DBUG_ASSERT(mysql_bin_log.is_open());
4983 DBUG_PRINT(
"enter", (
"event: 0x%lx", (
long) event));
4988 DBUG_ASSERT(cache_mngr);
4991 cache_mngr->get_binlog_cache_data(is_transactional);
4993 DBUG_PRINT(
"info", (
"cache_mngr->pending(): 0x%lx", (
long) cache_data->pending()));
5000 if (cache_data->write_event(thd, pending))
5002 set_write_error(thd, is_transactional);
5003 if (check_write_error(thd) && cache_data &&
5005 cache_data->set_incident();
5007 cache_data->set_pending(NULL);
5014 cache_data->set_pending(event);
5025 THD *thd= event_info->thd;
5027 DBUG_ENTER(
"MYSQL_BIN_LOG::write_event(Log_event *)");
5029 if (thd->binlog_evt_union.do_union)
5035 thd->binlog_evt_union.unioned_events= TRUE;
5036 thd->binlog_evt_union.unioned_events_trans |=
5037 event_info->is_using_trans_cache();
5046 bool const end_stmt=
5047 thd->locked_tables_mode && thd->lex->requires_prelocking();
5048 if (thd->binlog_flush_pending_rows_event(end_stmt,
5049 event_info->is_using_trans_cache()))
5057 if (likely(is_open()))
5059 #ifdef HAVE_REPLICATION
5065 const char *local_db= event_info->get_db();
5066 if ((thd && !(thd->variables.option_bits & OPTION_BIN_LOG)) ||
5067 (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT &&
5068 thd->lex->sql_command != SQLCOM_SAVEPOINT &&
5069 (!event_info->is_no_filter_event() &&
5070 !binlog_filter->db_ok(local_db))))
5074 DBUG_ASSERT(event_info->is_using_trans_cache() || event_info->is_using_stmt_cache());
5076 if (binlog_start_trans_and_stmt(thd, event_info))
5079 bool is_trans_cache= event_info->is_using_trans_cache();
5081 binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(is_trans_cache);
5083 DBUG_PRINT(
"info",(
"event type: %d",event_info->get_type_code()));
5095 if (!thd->is_current_stmt_binlog_format_row())
5097 if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt)
5100 thd->first_successful_insert_id_in_prev_stmt_for_binlog,
5102 if (cache_data->write_event(thd, &e))
5105 if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
5107 DBUG_PRINT(
"info",(
"number of auto_inc intervals: %u",
5108 thd->auto_inc_intervals_in_cur_stmt_for_binlog.
5111 thd->auto_inc_intervals_in_cur_stmt_for_binlog.
5114 if (cache_data->write_event(thd, &e))
5119 Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
5122 if (cache_data->write_event(thd, &e))
5125 if (thd->user_var_events.elements)
5127 for (uint
i= 0;
i < thd->user_var_events.elements;
i++)
5130 get_dynamic(&thd->user_var_events,(uchar*) &user_var_event,
i);
5133 uchar
flags= User_var_log_event::UNDEF_F;
5134 if (user_var_event->unsigned_flag)
5135 flags|= User_var_log_event::UNSIGNED_F;
5138 user_var_event->user_var_event->entry_name.ptr(),
5139 user_var_event->user_var_event->entry_name.length(),
5140 user_var_event->value,
5141 user_var_event->length,
5142 user_var_event->type,
5143 user_var_event->charset_number,
flags,
5146 if (cache_data->write_event(thd, &e))
5156 if (cache_data->write_event(thd, event_info) ||
5157 DBUG_EVALUATE_IF(
"injecting_fault_writing", 1, 0))
5166 cache_mngr->trx_cache.set_cannot_rollback();
5173 set_write_error(thd, is_trans_cache);
5174 if (check_write_error(thd) && cache_data &&
5176 cache_data->set_incident();
5203 DBUG_ENTER(
"MYSQL_BIN_LOG::rotate");
5205 DBUG_ASSERT(!is_relay_log);
5208 *check_purge=
false;
5210 if (force_rotate || (my_b_tell(&log_file) >= (my_off_t) max_size))
5212 if ((error= new_file_without_locking(NULL)))
5230 sql_print_error(
"The server was unable to create a new log file. "
5231 "An incident event has been written to the binary "
5232 "log which will stop the slaves.");
5249 #ifdef HAVE_REPLICATION
5250 if (expire_logs_days)
5252 DEBUG_SYNC(current_thd,
"at_purge_logs_before_date");
5253 time_t purge_time= my_time(0) - expire_logs_days*24*60*60;
5254 DBUG_EXECUTE_IF(
"expire_logs_always",
5255 { purge_time= my_time(0);});
5256 if (purge_time >= 0)
5262 ha_flush_logs(NULL);
5263 purge_logs_before_date(purge_time,
true);
5281 DBUG_ENTER(
"MYSQL_BIN_LOG::rotate_and_purge");
5282 bool check_purge=
false;
5284 DBUG_ASSERT(!is_relay_log);
5286 error=
rotate(force_rotate, &check_purge);
5293 if (!error && check_purge)
5299 uint MYSQL_BIN_LOG::next_file_id()
5325 static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len,
5326 uint length, ha_checksum *crc)
5329 uchar *event_begin= buf + off;
5330 uint16
flags= uint2korr(event_begin + FLAGS_OFFSET);
5332 DBUG_ASSERT(length >= off + LOG_EVENT_HEADER_LEN);
5333 int2store(event_begin + FLAGS_OFFSET, flags);
5334 ret= length >= off + event_len ? 0 : off + event_len - length;
5335 *crc= my_checksum(*crc, event_begin, event_len - ret);
5356 int MYSQL_BIN_LOG::do_write_cache(
IO_CACHE *cache)
5358 DBUG_ENTER(
"MYSQL_BIN_LOG::do_write_cache(IO_CACHE *)");
5360 DBUG_EXECUTE_IF(
"simulate_do_write_cache_failure",
5366 DBUG_SET(
"-d,simulate_do_write_cache_failure");
5367 DBUG_RETURN(ER_ERROR_ON_WRITE);
5370 if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
5371 DBUG_RETURN(ER_ERROR_ON_WRITE);
5372 uint length= my_b_bytes_in_cache(cache),
group, carry, hdr_offs;
5375 ulong end_log_pos_inc= 0;
5376 uchar header[LOG_EVENT_HEADER_LEN];
5377 ha_checksum crc= 0, crc_0= 0;
5378 my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
5382 DBUG_ASSERT(!do_checksum ||
5383 binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);
5399 group= (uint)my_b_tell(&log_file);
5400 DBUG_PRINT(
"debug", (
"length: %llu, group: %llu",
5401 (ulonglong) length, (ulonglong)
group));
5404 crc= crc_0= my_checksum(0L, NULL, 0);
5406 if (DBUG_EVALUATE_IF(
"fault_injection_crc_value", 1, 0))
5415 if (unlikely(carry > 0))
5417 DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
5420 memcpy(&header[carry], (
char *)cache->read_pos,
5421 LOG_EVENT_HEADER_LEN - carry);
5424 val=uint4korr(header + LOG_POS_OFFSET);
5427 int4store(&header[LOG_POS_OFFSET], val);
5431 ulong len= uint4korr(header + EVENT_LEN_OFFSET);
5437 if (my_b_write(&log_file, header, carry))
5438 DBUG_RETURN(ER_ERROR_ON_WRITE);
5444 memcpy((
char *)cache->read_pos, &header[carry],
5445 LOG_EVENT_HEADER_LEN - carry);
5448 hdr_offs= uint4korr(header + EVENT_LEN_OFFSET) - carry -
5453 DBUG_ASSERT(crc == crc_0 && remains == 0);
5454 crc= my_checksum(crc, header, carry);
5455 remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
5463 if (likely(length > 0))
5473 if (do_checksum && hdr_offs >= length)
5476 DBUG_ASSERT(remains != 0 && crc != crc_0);
5478 crc= my_checksum(crc, cache->read_pos, length);
5480 if (my_b_write(&log_file, cache->read_pos, length))
5481 DBUG_RETURN(ER_ERROR_ON_WRITE);
5484 int4store(buf, crc);
5486 DBUG_RETURN(ER_ERROR_ON_WRITE);
5491 while (hdr_offs < length)
5506 DBUG_ASSERT(crc != crc_0);
5507 crc= my_checksum(crc, cache->read_pos, hdr_offs);
5508 int4store(buf, crc);
5509 remains -= hdr_offs;
5510 DBUG_ASSERT(remains == 0);
5511 if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
5513 DBUG_RETURN(ER_ERROR_ON_WRITE);
5518 if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
5520 carry= length - hdr_offs;
5521 memcpy(header, (
char *)cache->read_pos + hdr_offs, carry);
5527 uchar *ev= (uchar *)cache->read_pos + hdr_offs;
5528 uint event_len= uint4korr(ev + EVENT_LEN_OFFSET);
5529 uchar *log_pos= ev + LOG_POS_OFFSET;
5532 val= uint4korr(log_pos) +
group +
5534 int4store(log_pos, val);
5541 remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len,
5543 if (my_b_write(&log_file, ev,
5544 remains == 0 ? event_len : length - hdr_offs))
5545 DBUG_RETURN(ER_ERROR_ON_WRITE);
5548 int4store(buf, crc);
5550 DBUG_RETURN(ER_ERROR_ON_WRITE);
5556 hdr_offs += event_len;
5558 DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length);
5575 if (my_b_write(&log_file, cache->read_pos, length))
5576 DBUG_RETURN(ER_ERROR_ON_WRITE);
5577 cache->read_pos=cache->read_end;
5578 }
while ((length= my_b_fill(cache)));
5580 DBUG_ASSERT(carry == 0);
5581 DBUG_ASSERT(!do_checksum || remains == 0);
5582 DBUG_ASSERT(!do_checksum || crc == crc_0);
5600 bool do_flush_and_sync)
5603 DBUG_ENTER(
"MYSQL_BIN_LOG::write_incident");
5615 error= ev->write(&log_file);
5617 if (do_flush_and_sync)
5621 bool check_purge=
false;
5623 error=
rotate(
true, &check_purge);
5624 if (!error && check_purge)
5647 bool do_flush_and_sync)
5649 DBUG_ENTER(
"MYSQL_BIN_LOG::write_incident");
5655 { C_STRING_WITH_LEN(
"error writing to the binary log") };
5656 Incident incident= INCIDENT_LOST_EVENTS;
5659 DBUG_RETURN(
write_incident(&ev, need_lock_log, do_flush_and_sync));
5682 DBUG_ENTER(
"MYSQL_BIN_LOG::write_cache(THD *, binlog_cache_data *, bool)");
5684 IO_CACHE *cache= &cache_data->cache_log;
5685 bool incident= cache_data->has_incident();
5687 DBUG_EXECUTE_IF(
"simulate_binlog_flush_error",
5689 if (rand() % 3 == 0)
5698 DBUG_ASSERT(is_open());
5699 if (likely(is_open()))
5705 if (my_b_tell(cache) > 0)
5707 DBUG_EXECUTE_IF(
"crash_before_writing_xid",
5709 if ((write_error= do_write_cache(cache)))
5710 DBUG_PRINT(
"info", (
"error writing binlog cache: %d",
5713 DBUG_PRINT(
"info", (
"crashing before writing xid"));
5717 if ((write_error= do_write_cache(cache)))
5724 DBUG_EXECUTE_IF(
"half_binlogged_transaction", DBUG_SUICIDE(););
5727 char errbuf[MYSYS_STRERROR_SIZE];
5728 sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name,
5729 errno, my_strerror(errbuf,
sizeof(errbuf), errno));
5734 global_sid_lock->
rdlock();
5737 global_sid_lock->
unlock();
5740 global_sid_lock->
unlock();
5742 update_thd_next_event_pos(thd);
5750 char errbuf[MYSYS_STRERROR_SIZE];
5752 sql_print_error(ER(ER_ERROR_ON_WRITE), name,
5753 errno, my_strerror(errbuf,
sizeof(errbuf), errno));
5755 thd->commit_error= THD::CE_FLUSH_ERROR;
5779 DBUG_ENTER(
"wait_for_update_relay_log");
5781 thd->ENTER_COND(&update_cond, &LOCK_log,
5782 &stage_slave_has_read_all_relay_log,
5789 const_cast<struct timespec *>(timeout));
5790 thd->EXIT_COND(&old_stage);
5815 DBUG_ENTER(
"wait_for_update_bin_log");
5821 const_cast<struct timespec *>(timeout));
5842 DBUG_ENTER(
"MYSQL_BIN_LOG::close");
5843 DBUG_PRINT(
"enter",(
"exiting: %d", (
int) exiting));
5844 if (log_state == LOG_OPENED)
5846 #ifdef HAVE_REPLICATION
5847 if ((exiting & LOG_CLOSE_STOP_EVENT) != 0)
5851 s.checksum_alg= is_relay_log ?
5852 relay_log_checksum_alg : binlog_checksum_options;
5853 DBUG_ASSERT(!is_relay_log ||
5854 relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
5856 bytes_written+= s.data_written;
5862 if (log_file.type == WRITE_CACHE)
5864 my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
5886 if ((exiting & LOG_CLOSE_INDEX) && my_b_inited(&index_file))
5888 end_io_cache(&index_file);
5891 char errbuf[MYSYS_STRERROR_SIZE];
5893 sql_print_error(ER(ER_ERROR_ON_WRITE), index_file_name,
5894 errno, my_strerror(errbuf,
sizeof(errbuf), errno));
5897 log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
5904 void MYSQL_BIN_LOG::set_max_size(ulong max_size_arg)
5913 DBUG_ENTER(
"MYSQL_BIN_LOG::set_max_size");
5916 max_size= max_size_arg;
5922 void MYSQL_BIN_LOG::signal_update()
5924 DBUG_ENTER(
"MYSQL_BIN_LOG::signal_update");
5949 DBUG_ASSERT(!is_relay_log);
5950 DBUG_ASSERT(total_ha_2pc > 1 || (1 == total_ha_2pc && opt_bin_log));
5951 DBUG_ASSERT(opt_name && opt_name[0]);
5953 if (!my_b_inited(&index_file))
5963 open_binlog(opt_name, 0, WRITE_CACHE, max_binlog_size,
false,
5973 if (error != LOG_INFO_EOF)
5974 sql_print_error(
"find_log_pos() failed (error: %d)", error);
5986 char log_name[FN_REFLEN];
5987 my_off_t valid_pos= 0;
5988 my_off_t binlog_size;
5991 if (! fdle.is_valid())
5996 strmake(log_name, log_info.log_file_name,
sizeof(log_name)-1);
5999 if (error != LOG_INFO_EOF)
6001 sql_print_error(
"find_log_pos() failed (error: %d)", error);
6007 sql_print_error(
"%s", errmsg);
6011 my_stat(log_name, &s, MYF(0));
6012 binlog_size= s.st_size;
6014 if ((ev= Log_event::read_log_event(&log, 0, &fdle,
6015 opt_master_verify_checksum)) &&
6016 ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
6017 ev->
flags & LOG_EVENT_BINLOG_IN_USE_F)
6019 sql_print_information(
"Recovering after a crash using %s", opt_name);
6020 valid_pos= my_b_tell(&log);
6038 O_RDWR | O_BINARY, MYF(MY_WME))) < 0)
6040 sql_print_error(
"Failed to open the crashed binlog file "
6041 "when master server is recovering it.");
6046 if (valid_pos < binlog_size)
6048 if (my_chsize(file, valid_pos, 0, MYF(MY_WME)))
6050 sql_print_error(
"Failed to trim the crashed binlog file "
6051 "when master server is recovering it.");
6057 sql_print_information(
"Crashed binlog file %s size is %llu, "
6058 "but recovered up to %llu. Binlog trimmed to %llu bytes.",
6059 log_name, binlog_size, valid_pos, valid_pos);
6064 my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
6068 sql_print_error(
"Failed to clear LOG_EVENT_BINLOG_IN_USE_F "
6069 "for the crashed binlog file when master "
6070 "server is recovering it.");
6098 int MYSQL_BIN_LOG::prepare(THD *thd,
bool all)
6100 DBUG_ENTER(
"MYSQL_BIN_LOG::prepare");
6102 int error= ha_prepare_low(thd, all);
6135 DBUG_ENTER(
"MYSQL_BIN_LOG::commit");
6138 my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
6139 int error= RESULT_SUCCESS;
6140 bool stuff_logged=
false;
6142 DBUG_PRINT(
"enter", (
"thd: 0x%llx, all: %s, xid: %llu, cache_mngr: 0x%llx",
6143 (ulonglong) thd, YESNO(all), (ulonglong) xid,
6144 (ulonglong) cache_mngr));
6150 if (cache_mngr == NULL)
6153 DBUG_RETURN(RESULT_ABORTED);
6154 DBUG_RETURN(RESULT_SUCCESS);
6157 THD_TRANS *trans= all ? &thd->transaction.all : &thd->transaction.stmt;
6159 DBUG_PRINT(
"debug", (
"in_transaction: %s, no_2pc: %s, rw_ha_count: %d",
6160 YESNO(thd->in_multi_stmt_transaction_mode()),
6161 YESNO(trans->no_2pc),
6162 trans->rw_ha_count));
6164 (
"all.cannot_safely_rollback(): %s, trx_cache_empty: %s",
6165 YESNO(thd->transaction.all.cannot_safely_rollback()),
6166 YESNO(cache_mngr->trx_cache.is_binlog_empty())));
6168 (
"stmt.cannot_safely_rollback(): %s, stmt_cache_empty: %s",
6169 YESNO(thd->transaction.stmt.cannot_safely_rollback()),
6170 YESNO(cache_mngr->stmt_cache.is_binlog_empty())));
6187 if (!all && trans->ha_list == 0 &&
6188 cache_mngr->stmt_cache.is_binlog_empty())
6189 DBUG_RETURN(RESULT_SUCCESS);
6200 if (!cache_mngr->stmt_cache.is_binlog_empty())
6202 error= write_empty_groups_to_cache(thd, &cache_mngr->stmt_cache);
6205 if (cache_mngr->stmt_cache.finalize(thd))
6206 DBUG_RETURN(RESULT_ABORTED);
6217 if (!error && !cache_mngr->trx_cache.is_binlog_empty() &&
6220 const bool real_trans= (all || thd->transaction.all.ha_list == 0);
6232 if (real_trans && xid && trans->rw_ha_count > 1 && !trans->no_2pc)
6235 if (cache_mngr->trx_cache.finalize(thd, &end_evt))
6236 DBUG_RETURN(RESULT_ABORTED);
6241 true, FALSE, TRUE, 0, TRUE);
6242 if (cache_mngr->trx_cache.finalize(thd, &end_evt))
6243 DBUG_RETURN(RESULT_ABORTED);
6252 cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
6254 DBUG_PRINT(
"debug", (
"error: %d", error));
6257 DBUG_RETURN(RESULT_ABORTED);
6271 if (ordered_commit(thd, all))
6272 DBUG_RETURN(RESULT_INCONSISTENT);
6277 DBUG_RETURN(RESULT_INCONSISTENT);
6280 DBUG_RETURN(error ? RESULT_INCONSISTENT : RESULT_SUCCESS);
6298 std::pair<int,my_off_t>
6299 MYSQL_BIN_LOG::flush_thread_caches(THD *thd)
6303 bool wrote_xid=
false;
6304 int error= cache_mngr->flush(thd, &bytes, &wrote_xid);
6305 if (!error && bytes > 0)
6311 thd->set_trans_pos(log_file_name, my_b_tell(&log_file));
6315 DBUG_PRINT(
"debug", (
"bytes: %llu", bytes));
6316 return std::make_pair(error, bytes);
6334 MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
6336 THD **out_queue_var)
6338 DBUG_ASSERT(total_bytes_var && rotate_var && out_queue_var);
6339 my_off_t total_bytes= 0;
6343 my_atomic_rwlock_rdlock(&opt_binlog_max_flush_queue_time_lock);
6344 const ulonglong max_udelay= my_atomic_load32(&opt_binlog_max_flush_queue_time);
6345 my_atomic_rwlock_rdunlock(&opt_binlog_max_flush_queue_time_lock);
6346 const ulonglong start_utime= max_udelay > 0 ? my_micro_time() : 0;
6355 bool has_more=
true;
6356 THD *first_seen= NULL;
6357 while ((max_udelay == 0 || my_micro_time() < start_utime + max_udelay) && has_more)
6359 std::pair<bool,THD*> current= stage_manager.pop_front(Stage_manager::FLUSH_STAGE);
6360 std::pair<int,my_off_t> result= flush_thread_caches(current.second);
6361 has_more= current.first;
6362 total_bytes+= result.second;
6363 if (flush_error == 1)
6364 flush_error= result.first;
6365 if (first_seen == NULL)
6366 first_seen= current.second;
6376 THD *queue= stage_manager.
fetch_queue_for(Stage_manager::FLUSH_STAGE);
6377 for (THD *head= queue ; head ; head = head->next_to_commit)
6379 std::pair<int,my_off_t> result= flush_thread_caches(head);
6380 total_bytes+= result.second;
6381 if (flush_error == 1)
6382 flush_error= result.first;
6384 if (first_seen == NULL)
6388 *out_queue_var= first_seen;
6389 *total_bytes_var= total_bytes;
6390 if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t) max_size)
6411 MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first)
6416 thd->transaction.flags.ready_preempt= 1;
6418 for (THD *head= first ; head ; head = head->next_to_commit)
6420 DBUG_PRINT(
"debug", (
"Thread ID: %lu, commit_error: %d, flags.pending: %s",
6421 head->thread_id, head->commit_error,
6422 YESNO(head->transaction.flags.pending)));
6434 if (head->commit_error == THD::CE_NONE)
6436 excursion.try_to_attach_to(head);
6437 bool all= head->transaction.flags.real_commit;
6438 if (head->transaction.flags.commit_low)
6441 DBUG_ASSERT(head->transaction.flags.ready_preempt);
6446 head->commit_error= THD::CE_COMMIT_ERROR;
6448 DBUG_PRINT(
"debug", (
"commit_error: %d, flags.pending: %s",
6450 YESNO(head->transaction.flags.pending)));
6458 if (head->transaction.flags.xid_written)
6459 dec_prep_xids(head);
6471 MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first)
6474 for (THD *head= first; head; head= head->next_to_commit)
6476 if (head->transaction.flags.run_hooks &&
6477 head->commit_error == THD::CE_NONE)
6479 excursion.try_to_attach_to(head);
6480 bool all= head->transaction.flags.real_commit;
6481 (void) RUN_HOOK(transaction, after_commit, (head, all));
6486 head->transaction.flags.run_hooks=
false;
6493 static const char* g_stage_name[] = {
6530 MYSQL_BIN_LOG::change_stage(THD *thd,
6535 DBUG_ENTER(
"MYSQL_BIN_LOG::change_stage");
6536 DBUG_PRINT(
"enter", (
"thd: 0x%llx, stage: %s, queue: 0x%llx",
6537 (ulonglong) thd, g_stage_name[stage], (ulonglong) queue));
6538 DBUG_ASSERT(0 <= stage && stage < Stage_manager::STAGE_COUNTER);
6539 DBUG_ASSERT(enter_mutex);
6545 if (!stage_manager.
enroll_for(stage, queue, leave_mutex))
6547 DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized());
6565 MYSQL_BIN_LOG::flush_cache_to_file(my_off_t *end_pos_var)
6567 if (flush_io_cache(&log_file))
6568 return ER_ERROR_ON_WRITE;
6569 *end_pos_var= my_b_tell(&log_file);
6577 std::pair<bool, bool>
6578 MYSQL_BIN_LOG::sync_binlog_file(
bool force)
6581 unsigned int sync_period= get_sync_period();
6582 if (force || (sync_period && ++sync_counter >= sync_period))
6586 return std::make_pair(
true, synced);
6589 return std::make_pair(
false, synced);
6614 MYSQL_BIN_LOG::finish_commit(THD *thd)
6616 if (thd->transaction.flags.commit_low)
6618 const bool all= thd->transaction.flags.real_commit;
6622 if (thd->commit_error == THD::CE_NONE &&
6624 thd->commit_error= THD::CE_COMMIT_ERROR;
6628 if (thd->transaction.flags.xid_written)
6633 if (thd->commit_error == THD::CE_NONE)
6635 (void) RUN_HOOK(transaction, after_commit, (thd, all));
6636 thd->transaction.flags.run_hooks=
false;
6639 else if (thd->transaction.flags.xid_written)
6646 global_sid_lock->
rdlock();
6648 global_sid_lock->
unlock();
6650 DBUG_ASSERT(thd->commit_error || !thd->transaction.flags.run_hooks);
6651 DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized());
6652 DBUG_PRINT(
"return", (
"Thread ID: %lu, commit_error: %d",
6653 thd->thread_id, thd->commit_error));
6654 return thd->commit_error;
6707 int MYSQL_BIN_LOG::ordered_commit(THD *thd,
bool all,
bool skip_commit)
6709 DBUG_ENTER(
"MYSQL_BIN_LOG::ordered_commit");
6711 my_off_t total_bytes= 0;
6712 bool do_rotate=
false;
6727 thd->transaction.flags.pending=
true;
6728 thd->commit_error= THD::CE_NONE;
6729 thd->next_to_commit= NULL;
6730 thd->durability_property= HA_IGNORE_DURABILITY;
6731 thd->transaction.flags.real_commit= all;
6732 thd->transaction.flags.xid_written=
false;
6733 thd->transaction.flags.commit_low= !skip_commit;
6734 thd->transaction.flags.run_hooks= !skip_commit;
6744 thd->transaction.flags.ready_preempt= 0;
6747 DBUG_PRINT(
"enter", (
"flags.pending: %s, commit_error: %d, thread_id: %lu",
6748 YESNO(thd->transaction.flags.pending),
6749 thd->commit_error, thd->thread_id));
6759 if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
6761 DBUG_PRINT(
"return", (
"Thread ID: %lu, commit_error: %d",
6762 thd->thread_id, thd->commit_error));
6763 DBUG_RETURN(finish_commit(thd));
6766 THD *wait_queue= NULL;
6767 flush_error= process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);
6769 my_off_t flush_end_pos= 0;
6770 if (flush_error == 0 && total_bytes > 0)
6771 flush_error= flush_cache_to_file(&flush_end_pos);
6779 if (flush_error == 0)
6781 const char *file_name_ptr= log_file_name + dirname_length(log_file_name);
6782 DBUG_ASSERT(flush_end_pos != 0);
6783 if (RUN_HOOK(binlog_storage, after_flush,
6784 (thd, file_name_ptr, flush_end_pos)))
6786 sql_print_error(
"Failed to run 'after_flush' hooks");
6787 flush_error= ER_ERROR_ON_WRITE;
6791 DBUG_EXECUTE_IF(
"crash_commit_after_log", DBUG_SUICIDE(););
6797 if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, &LOCK_sync))
6799 DBUG_PRINT(
"return", (
"Thread ID: %lu, commit_error: %d",
6800 thd->thread_id, thd->commit_error));
6801 DBUG_RETURN(finish_commit(thd));
6803 THD *final_queue= stage_manager.
fetch_queue_for(Stage_manager::SYNC_STAGE);
6804 if (flush_error == 0 && total_bytes > 0)
6806 std::pair<bool, bool> result= sync_binlog_file(
false);
6807 flush_error= result.first;
6819 if (opt_binlog_order_commits)
6821 if (change_stage(thd, Stage_manager::COMMIT_STAGE,
6822 final_queue, &LOCK_sync, &LOCK_commit))
6824 DBUG_PRINT(
"return", (
"Thread ID: %lu, commit_error: %d",
6825 thd->thread_id, thd->commit_error));
6826 DBUG_RETURN(finish_commit(thd));
6828 THD *commit_queue= stage_manager.
fetch_queue_for(Stage_manager::COMMIT_STAGE);
6829 DBUG_EXECUTE_IF(
"semi_sync_3-way_deadlock",
6830 DEBUG_SYNC(thd,
"before_process_commit_stage_queue"););
6831 process_commit_stage_queue(thd, commit_queue);
6837 process_after_commit_stage_queue(thd, commit_queue);
6838 final_queue= commit_queue;
6844 stage_manager.signal_done(final_queue);
6851 (void) finish_commit(thd);
6857 if (do_rotate && thd->commit_error == THD::CE_NONE)
6867 DBUG_EXECUTE_IF(
"crash_commit_before_unlog", DBUG_SUICIDE(););
6868 DEBUG_SYNC(thd,
"ready_to_do_rotation");
6869 bool check_purge=
false;
6871 int error=
rotate(
false, &check_purge);
6875 thd->commit_error= THD::CE_COMMIT_ERROR;
6876 else if (check_purge)
6879 DBUG_RETURN(thd->commit_error);
6897 my_off_t *valid_pos)
6906 bool in_transaction= FALSE;
6908 if (! fdle->is_valid() ||
6909 my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
6910 sizeof(my_xid), 0, 0, MYF(0)))
6913 init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);
6915 while ((ev= Log_event::read_log_event(log, 0, fdle, TRUE))
6918 if (ev->get_type_code() == QUERY_EVENT &&
6920 in_transaction= TRUE;
6922 if (ev->get_type_code() == QUERY_EVENT &&
6925 DBUG_ASSERT(in_transaction == TRUE);
6926 in_transaction= FALSE;
6928 else if (ev->get_type_code() == XID_EVENT)
6930 DBUG_ASSERT(in_transaction == TRUE);
6931 in_transaction= FALSE;
6933 uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid,
6935 if (!x || my_hash_insert(&xids, x))
6974 if (!log->error && !in_transaction &&
6976 *valid_pos= my_b_tell(log);
6981 if (ha_recover(&xids))
6984 free_root(&mem_root, MYF(0));
6985 my_hash_free(&xids);
6989 free_root(&mem_root, MYF(0));
6990 my_hash_free(&xids);
6992 sql_print_error(
"Crash recovery failed. Either correct the problem "
6993 "(if it's, for example, out of memory error) and restart, "
6994 "or delete (or rename) binary log and start mysqld with "
6995 "--tc-heuristic-recover={commit|rollback}");
6999 Group_cache *THD::get_group_cache(
bool is_transactional)
7001 DBUG_ENTER(
"THD::get_group_cache(bool)");
7005 DBUG_ASSERT(opt_bin_log);
7010 DBUG_ASSERT(cache_mngr != NULL);
7013 cache_mngr->get_binlog_cache_data(is_transactional);
7014 DBUG_ASSERT(cache_data != NULL);
7024 int THD::binlog_setup_trx_data()
7026 DBUG_ENTER(
"THD::binlog_setup_trx_data");
7034 open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
7035 LOG_PREFIX, binlog_stmt_cache_size, MYF(MY_WME)) ||
7036 open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir,
7037 LOG_PREFIX, binlog_cache_size, MYF(MY_WME)))
7039 my_free(cache_mngr);
7042 DBUG_PRINT(
"debug", (
"Set ha_data slot %d to 0x%llx", binlog_hton->slot, (ulonglong) cache_mngr));
7043 thd_set_ha_data(
this, binlog_hton, cache_mngr);
7045 cache_mngr=
new (thd_get_cache_mngr(
this))
7047 &binlog_stmt_cache_use,
7048 &binlog_stmt_cache_disk_use,
7049 max_binlog_cache_size,
7051 &binlog_cache_disk_use);
7058 void register_binlog_handler(THD *thd,
bool trx)
7060 DBUG_ENTER(
"register_binlog_handler");
7075 if (cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF)
7081 binlog_trans_log_savepos(thd, &pos);
7082 cache_mngr->trx_cache.set_prev_position(pos);
7094 thd->ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write();
7127 static int binlog_start_trans_and_stmt(THD *thd,
Log_event *start_event)
7129 DBUG_ENTER(
"binlog_start_trans_and_stmt");
7134 if (thd->binlog_setup_trx_data())
7140 bool is_transactional= start_event->is_using_trans_cache();
7142 binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(is_transactional);
7148 if (start_event->is_using_immediate_logging())
7151 register_binlog_handler(thd, thd->in_multi_stmt_transaction_mode());
7158 if (cache_data->is_binlog_empty())
7161 is_transactional, FALSE, TRUE, 0, TRUE);
7162 if (cache_data->write_event(thd, &qinfo))
7188 int THD::binlog_write_table_map(
TABLE *
table,
bool is_transactional,
7189 bool binlog_rows_query)
7192 DBUG_ENTER(
"THD::binlog_write_table_map");
7193 DBUG_PRINT(
"enter", (
"table: 0x%lx (%s: #%llu)",
7194 (
long) table, table->s->table_name.str,
7195 table->s->table_map_id.id()));
7198 DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
7199 DBUG_ASSERT(table->s->table_map_id.is_valid());
7202 the_event(
this, table, table->s->table_map_id, is_transactional);
7204 binlog_start_trans_and_stmt(
this, &the_event);
7209 cache_mngr->get_binlog_cache_data(is_transactional);
7211 if (binlog_rows_query && this->
query())
7215 rows_query_ev(
this, this->
query(), this->query_length());
7216 if ((error= cache_data->write_event(
this, &rows_query_ev)))
7220 if ((error= cache_data->write_event(
this, &the_event)))
7223 binlog_table_maps++;
7239 THD::binlog_get_pending_rows_event(
bool is_transactional)
const
7252 cache_mngr->get_binlog_cache_data(is_transactional);
7254 rows= cache_data->pending();
7269 THD::add_to_binlog_accessed_dbs(
const char *db_param)
7272 MEM_ROOT *db_mem_root= &main_mem_root;
7274 if (!binlog_accessed_db_names)
7275 binlog_accessed_db_names=
new (db_mem_root)
List<char>;
7277 if (binlog_accessed_db_names->elements > MAX_DBS_IN_EVENT_MTS)
7279 push_warning_printf(
this, Sql_condition::WARN_LEVEL_WARN,
7280 ER_MTS_UPDATED_DBS_GREATER_MAX,
7281 ER(ER_MTS_UPDATED_DBS_GREATER_MAX),
7282 MAX_DBS_IN_EVENT_MTS);
7286 after_db= strdup_root(db_mem_root, db_param);
7293 if (binlog_accessed_db_names->elements != 0)
7300 char **ref_cur_db= it.ref();
7301 int cmp= strcmp(after_db, *ref_cur_db);
7303 DBUG_ASSERT(!swap || cmp < 0);
7310 else if (swap || cmp > 0)
7313 *ref_cur_db= after_db;
7319 binlog_accessed_db_names->push_back(after_db, &main_mem_root);
7421 int THD::decide_logging_format(
TABLE_LIST *tables)
7423 DBUG_ENTER(
"THD::decide_logging_format");
7424 DBUG_PRINT(
"info", (
"query: %s",
query()));
7425 DBUG_PRINT(
"info", (
"variables.binlog_format: %lu",
7426 variables.binlog_format));
7427 DBUG_PRINT(
"info", (
"lex->get_stmt_unsafe_flags(): 0x%x",
7428 lex->get_stmt_unsafe_flags()));
7430 reset_binlog_local_stmt_filter();
7437 if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
7438 !(variables.binlog_format == BINLOG_FORMAT_STMT &&
7439 !binlog_filter->db_ok(db)))
7446 handler::Table_flags flags_write_some_set= 0;
7447 handler::Table_flags flags_access_some_set= 0;
7448 handler::Table_flags flags_write_all_set=
7449 HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE;
7455 my_bool multi_write_engine= FALSE;
7461 my_bool multi_access_engine= FALSE;
7465 my_bool is_write= FALSE;
7469 TABLE* prev_write_table= NULL;
7473 TABLE* prev_access_table= NULL;
7477 bool write_to_some_transactional_table=
false;
7481 bool write_to_some_non_transactional_table=
false;
7486 bool write_all_non_transactional_are_tmp_tables=
true;
7491 uint replicated_tables_count= 0;
7508 uint non_replicated_tables_count= 0;
7511 static const char *prelocked_mode_name[] = {
7514 "PRELOCKED_UNDER_LOCK_TABLES",
7516 DBUG_PRINT(
"debug", (
"prelocked_mode: %s",
7517 prelocked_mode_name[locked_tables_mode]));
7527 if (table->placeholder())
7532 DBUG_PRINT(
"info", (
"table: %s; ha_table_flags: 0x%llx",
7533 table->table_name, flags));
7551 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE);
7553 if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
7555 non_replicated_tables_count++;
7560 replicated_tables_count++;
7562 my_bool trans= table->table->file->has_transactions();
7564 if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
7566 write_to_some_transactional_table=
7567 write_to_some_transactional_table || trans;
7569 write_to_some_non_transactional_table=
7570 write_to_some_non_transactional_table || !trans;
7572 if (prev_write_table && prev_write_table->file->ht !=
7573 table->table->file->ht)
7574 multi_write_engine= TRUE;
7576 if (table->table->s->tmp_table)
7577 lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TEMP_TRANS_TABLE :
7578 LEX::STMT_WRITES_TEMP_NON_TRANS_TABLE);
7580 lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TRANS_TABLE :
7581 LEX::STMT_WRITES_NON_TRANS_TABLE);
7589 write_all_non_transactional_are_tmp_tables=
7590 write_all_non_transactional_are_tmp_tables &&
7591 table->table->s->tmp_table;
7593 flags_write_all_set &=
flags;
7594 flags_write_some_set |=
flags;
7597 prev_write_table= table->table;
7599 flags_access_some_set |=
flags;
7601 if (lex->sql_command != SQLCOM_CREATE_TABLE ||
7602 (lex->sql_command == SQLCOM_CREATE_TABLE &&
7603 (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)))
7605 if (table->table->s->tmp_table)
7606 lex->set_stmt_accessed_table(trans ? LEX::STMT_READS_TEMP_TRANS_TABLE :
7607 LEX::STMT_READS_TEMP_NON_TRANS_TABLE);
7609 lex->set_stmt_accessed_table(trans ? LEX::STMT_READS_TRANS_TABLE :
7610 LEX::STMT_READS_NON_TRANS_TABLE);
7613 if (prev_access_table && prev_access_table->file->ht !=
7614 table->table->file->ht)
7615 multi_access_engine= TRUE;
7617 prev_access_table= table->table;
7619 DBUG_ASSERT(!is_write ||
7620 write_to_some_transactional_table ||
7621 write_to_some_non_transactional_table);
7626 write_all_non_transactional_are_tmp_tables=
7627 write_all_non_transactional_are_tmp_tables &&
7628 write_to_some_non_transactional_table;
7630 DBUG_PRINT(
"info", (
"flags_write_all_set: 0x%llx", flags_write_all_set));
7631 DBUG_PRINT(
"info", (
"flags_write_some_set: 0x%llx", flags_write_some_set));
7632 DBUG_PRINT(
"info", (
"flags_access_some_set: 0x%llx", flags_access_some_set));
7633 DBUG_PRINT(
"info", (
"multi_write_engine: %d", multi_write_engine));
7634 DBUG_PRINT(
"info", (
"multi_access_engine: %d", multi_access_engine));
7639 bool multi_stmt_trans= in_multi_stmt_transaction_mode();
7641 bool binlog_direct= variables.binlog_direct_non_trans_update;
7643 if (lex->is_mixed_stmt_unsafe(multi_stmt_trans, binlog_direct,
7644 trans_table, tx_isolation))
7645 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_MIXED_STATEMENT);
7646 else if (multi_stmt_trans && trans_table && !binlog_direct &&
7647 lex->stmt_accessed_table(LEX::STMT_WRITES_NON_TRANS_TABLE))
7648 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS);
7656 if (multi_write_engine &&
7657 (flags_write_some_set & HA_HAS_OWN_BINLOGGING))
7658 my_error((error= ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE),
7660 else if (multi_access_engine && flags_access_some_set & HA_HAS_OWN_BINLOGGING)
7661 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE);
7664 if ((flags_write_all_set & (HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE)) == 0)
7670 my_error((error= ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE), MYF(0));
7673 else if ((flags_write_all_set & HA_BINLOG_ROW_CAPABLE) == 0)
7675 if (lex->is_stmt_row_injection())
7681 my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
7683 else if (variables.binlog_format == BINLOG_FORMAT_ROW &&
7684 sqlcom_can_generate_row_events(
this))
7690 my_error((error= ER_BINLOG_ROW_MODE_AND_STMT_ENGINE), MYF(0));
7692 else if ((unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
7699 for (
int unsafe_type= 0;
7700 unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT;
7702 if (unsafe_flags & (1 << unsafe_type))
7703 my_error((error= ER_BINLOG_UNSAFE_AND_STMT_ENGINE), MYF(0),
7704 ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
7712 if (variables.binlog_format == BINLOG_FORMAT_STMT)
7714 if (lex->is_stmt_row_injection())
7720 my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_MODE), MYF(0));
7722 else if ((flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 &&
7723 sqlcom_can_generate_row_events(
this))
7729 my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0),
"");
7731 else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
7737 binlog_unsafe_warning_flags|= unsafe_flags;
7738 DBUG_PRINT(
"info", (
"Scheduling warning to be issued by "
7739 "binlog_query: '%s'",
7740 ER(ER_BINLOG_UNSAFE_STATEMENT)));
7741 DBUG_PRINT(
"info", (
"binlog_unsafe_warning_flags: 0x%x",
7742 binlog_unsafe_warning_flags));
7750 if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection()
7751 || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0)
7754 set_current_stmt_binlog_format_row_if_mixed();
7759 if (non_replicated_tables_count > 0)
7761 if ((replicated_tables_count == 0) || ! is_write)
7763 DBUG_PRINT(
"info", (
"decision: no logging, no replicated table affected"));
7764 set_binlog_local_stmt_filter();
7768 if (! is_current_stmt_binlog_format_row())
7770 my_error((error= ER_BINLOG_STMT_MODE_AND_NO_REPL_TABLES), MYF(0));
7774 clear_binlog_local_stmt_filter();
7780 clear_binlog_local_stmt_filter();
7783 if (!error && enforce_gtid_consistency &&
7784 !is_dml_gtid_compatible(write_to_some_transactional_table,
7785 write_to_some_non_transactional_table,
7786 write_all_non_transactional_are_tmp_tables))
7790 DBUG_PRINT(
"info", (
"decision: no logging since an error was generated"));
7795 lex->sql_command != SQLCOM_END )
7805 if (table->placeholder())
7808 DBUG_ASSERT(table->table);
7810 if (table->table->file->referenced_by_foreign_key())
7816 binlog_accessed_db_names= NULL;
7817 add_to_binlog_accessed_dbs(
"");
7820 if (!is_current_stmt_binlog_format_row())
7821 add_to_binlog_accessed_dbs(table->db);
7824 DBUG_PRINT(
"info", (
"decision: logging in %s format",
7825 is_current_stmt_binlog_format_row() ?
7826 "ROW" :
"STATEMENT"));
7828 if (variables.binlog_format == BINLOG_FORMAT_ROW &&
7829 (lex->sql_command == SQLCOM_UPDATE ||
7830 lex->sql_command == SQLCOM_UPDATE_MULTI ||
7831 lex->sql_command == SQLCOM_DELETE ||
7832 lex->sql_command == SQLCOM_DELETE_MULTI))
7841 if (table->placeholder())
7843 if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB &&
7844 table->lock_type >= TL_WRITE_ALLOW_WRITE)
7846 table_names.append(table->table_name);
7847 table_names.append(
",");
7850 if (!table_names.is_empty())
7852 bool is_update= (lex->sql_command == SQLCOM_UPDATE ||
7853 lex->sql_command == SQLCOM_UPDATE_MULTI);
7857 table_names.replace(table_names.length()-1, 1,
".", 1);
7858 push_warning_printf(
this, Sql_condition::WARN_LEVEL_WARN,
7859 WARN_ON_BLOCKHOLE_IN_RBR,
7860 ER(WARN_ON_BLOCKHOLE_IN_RBR),
7861 is_update ?
"UPDATE" :
"DELETE",
7862 table_names.c_ptr());
7868 DBUG_PRINT(
"info", (
"decision: no logging since "
7869 "mysql_bin_log.is_open() = %d "
7870 "and (options & OPTION_BIN_LOG) = 0x%llx "
7871 "and binlog_format = %lu "
7872 "and binlog_filter->db_ok(db) = %d",
7873 mysql_bin_log.is_open(),
7874 (variables.option_bits & OPTION_BIN_LOG),
7875 variables.binlog_format,
7876 binlog_filter->db_ok(db)));
7883 bool THD::is_ddl_gtid_compatible()
const
7885 DBUG_ENTER(
"THD::is_ddl_gtid_compatible");
7889 if ((variables.option_bits & OPTION_BIN_LOG) == 0)
7892 if (lex->sql_command == SQLCOM_CREATE_TABLE &&
7893 !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
7894 lex->select_lex.item_list.elements)
7903 my_error(ER_GTID_UNSAFE_CREATE_SELECT, MYF(0));
7906 if ((lex->sql_command == SQLCOM_CREATE_TABLE &&
7907 (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) != 0) ||
7908 (lex->sql_command == SQLCOM_DROP_TABLE && lex->drop_temporary))
7916 if (in_multi_stmt_transaction_mode())
7918 my_error(ER_GTID_UNSAFE_CREATE_DROP_TEMPORARY_TABLE_IN_TRANSACTION,
7928 THD::is_dml_gtid_compatible(
bool transactional_table,
7929 bool non_transactional_table,
7930 bool non_transactional_tmp_tables)
const
7932 DBUG_ENTER(
"THD::is_dml_gtid_compatible(bool, bool, bool)");
7936 if ((variables.option_bits & OPTION_BIN_LOG) == 0)
7956 if (non_transactional_table &&
7958 !(non_transactional_tmp_tables && is_current_stmt_binlog_format_row()) &&
7959 !DBUG_EVALUATE_IF(
"allow_gtid_unsafe_non_transactional_updates", 1, 0))
7961 my_error(ER_GTID_UNSAFE_NON_TRANSACTIONAL_TABLE, MYF(0));
7974 #ifndef MYSQL_CLIENT
7998 THD::binlog_prepare_pending_rows_event(
TABLE* table, uint32 serv_id,
8000 bool is_transactional,
8001 RowsEventT *hint __attribute__((unused)),
8002 const uchar* extra_row_info)
8004 DBUG_ENTER(
"binlog_prepare_pending_rows_event");
8007 int const general_type_code= RowsEventT::TYPE_CODE;
8009 Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional);
8011 if (unlikely(pending && !pending->is_valid()))
8025 pending->server_id != serv_id ||
8026 pending->get_table_id() != table->s->table_map_id ||
8027 pending->get_general_type_code() != general_type_code ||
8028 pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
8029 pending->read_write_bitmaps_cmp(table) == FALSE ||
8030 !binlog_row_event_extra_data_eq(pending->get_extra_row_data(),
8035 ev=
new RowsEventT(
this, table, table->s->table_map_id,
8036 is_transactional, extra_row_info);
8039 ev->server_id= serv_id;
8045 mysql_bin_log.flush_and_set_pending_rows_event(
this, ev,
8054 DBUG_RETURN(pending);
8058 CPP_UNNAMED_NS_START
8091 m_alloc_checked= FALSE;
8093 allocate_memory(table, len1);
8102 m_alloc_checked= FALSE;
8104 allocate_memory(table, len1 + len2);
8106 m_ptr[1]=
has_memory() ? m_memory + len1 : 0;
8111 if (m_memory != 0 && m_release_memory_on_destruction)
8123 m_alloc_checked= TRUE;
8125 return m_memory != 0;
8130 DBUG_ASSERT(s <
sizeof(m_ptr)/
sizeof(*m_ptr));
8131 DBUG_ASSERT(m_ptr[s] != 0);
8132 DBUG_ASSERT(m_alloc_checked == TRUE);
8137 void allocate_memory(
TABLE *
const table,
size_t const total_length)
8139 if (table->s->blob_fields == 0)
8151 size_t const maxlen= table->s->reclength + 2 * table->s->fields;
8158 if (table->write_row_record == 0)
8159 table->write_row_record=
8160 (uchar *) alloc_root(&table->
mem_root, 2 * maxlen);
8161 m_memory= table->write_row_record;
8162 m_release_memory_on_destruction= FALSE;
8166 m_memory= (uchar *) my_malloc(total_length, MYF(MY_WME));
8167 m_release_memory_on_destruction= TRUE;
8172 mutable bool m_alloc_checked;
8174 bool m_release_memory_on_destruction;
8181 int THD::binlog_write_row(
TABLE* table,
bool is_trans,
8183 const uchar* extra_row_info)
8185 DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
8192 if (!memory.has_memory())
8193 return HA_ERR_OUT_OF_MEM;
8195 uchar *row_data= memory.slot(0);
8197 size_t const len= pack_row(table, table->write_set, row_data, record);
8200 binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
8201 static_cast<Write_rows_log_event*>(0),
8204 if (unlikely(ev == 0))
8205 return HA_ERR_OUT_OF_MEM;
8207 return ev->add_row_data(row_data, len);
8210 int THD::binlog_update_row(
TABLE* table,
bool is_trans,
8211 const uchar *before_record,
8212 const uchar *after_record,
8213 const uchar* extra_row_info)
8215 DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
8222 MY_BITMAP *old_read_set= table->read_set;
8223 MY_BITMAP *old_write_set= table->write_set;
8230 binlog_prepare_row_images(table);
8232 size_t const before_maxlen = max_row_length(table, before_record);
8233 size_t const after_maxlen = max_row_length(table, after_record);
8236 if (!row_data.has_memory())
8237 return HA_ERR_OUT_OF_MEM;
8239 uchar *before_row= row_data.slot(0);
8240 uchar *after_row= row_data.slot(1);
8242 size_t const before_size= pack_row(table, table->read_set, before_row,
8244 size_t const after_size= pack_row(table, table->write_set, after_row,
8252 DBUG_DUMP(
"before_record", before_record, table->s->reclength);
8253 DBUG_DUMP(
"after_record", after_record, table->s->reclength);
8254 DBUG_DUMP(
"before_row", before_row, before_size);
8255 DBUG_DUMP(
"after_row", after_row, after_size);
8259 binlog_prepare_pending_rows_event(table, server_id,
8260 before_size + after_size, is_trans,
8261 static_cast<Update_rows_log_event*>(0),
8264 if (unlikely(ev == 0))
8265 return HA_ERR_OUT_OF_MEM;
8267 error= ev->add_row_data(before_row, before_size) ||
8268 ev->add_row_data(after_row, after_size);
8271 table->column_bitmaps_set_no_signal(old_read_set,
8277 int THD::binlog_delete_row(
TABLE* table,
bool is_trans,
8278 uchar
const *record,
8279 const uchar* extra_row_info)
8281 DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
8288 MY_BITMAP *old_read_set= table->read_set;
8289 MY_BITMAP *old_write_set= table->write_set;
8296 binlog_prepare_row_images(table);
8303 if (unlikely(!memory.has_memory()))
8304 return HA_ERR_OUT_OF_MEM;
8306 uchar *row_data= memory.slot(0);
8308 DBUG_DUMP(
"table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8);
8309 size_t const len= pack_row(table, table->read_set, row_data, record);
8312 binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
8313 static_cast<Delete_rows_log_event*>(0),
8316 if (unlikely(ev == 0))
8317 return HA_ERR_OUT_OF_MEM;
8319 error= ev->add_row_data(row_data, len);
8322 table->column_bitmaps_set_no_signal(old_read_set,
8328 void THD::binlog_prepare_row_images(
TABLE *table)
8330 DBUG_ENTER(
"THD::binlog_prepare_row_images");
8336 DBUG_PRINT_BITSET(
"debug",
"table->read_set (before preparing): %s", table->read_set);
8337 THD *thd= table->in_use;
8344 if (table->s->primary_key < MAX_KEY &&
8345 (thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
8346 !ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT))
8352 DBUG_ASSERT(table->read_set != &table->tmp_set);
8354 bitmap_clear_all(&table->tmp_set);
8356 switch(thd->variables.binlog_row_image)
8358 case BINLOG_ROW_IMAGE_MINIMAL:
8360 table->mark_columns_used_by_index_no_reset(table->s->primary_key,
8363 case BINLOG_ROW_IMAGE_NOBLOB:
8368 bitmap_union(&table->tmp_set, table->read_set);
8369 for (
Field **ptr=table->field ; *ptr ; ptr++)
8371 Field *field= (*ptr);
8372 if ((field->type() == MYSQL_TYPE_BLOB) &&
8373 !(field->flags & PRI_KEY_FLAG))
8374 bitmap_clear_bit(&table->tmp_set, field->field_index);
8382 table->column_bitmaps_set_no_signal(&table->tmp_set,
8386 DBUG_PRINT_BITSET(
"debug",
"table->read_set (after preparing): %s", table->read_set);
8391 int THD::binlog_flush_pending_rows_event(
bool stmt_end,
bool is_transactional)
8393 DBUG_ENTER(
"THD::binlog_flush_pending_rows_event");
8399 if (!mysql_bin_log.is_open())
8407 if (
Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional))
8411 pending->set_flags(Rows_log_event::STMT_END_F);
8412 binlog_table_maps= 0;
8415 error= mysql_bin_log.flush_and_set_pending_rows_event(
this, 0,
8443 THD::binlog_row_event_extra_data_eq(
const uchar* a,
8449 (a[EXTRA_ROW_INFO_LEN_OFFSET] ==
8450 b[EXTRA_ROW_INFO_LEN_OFFSET]) &&
8452 a[EXTRA_ROW_INFO_LEN_OFFSET]) == 0)));
8455 #if !defined(DBUG_OFF) && !defined(_lint)
8457 show_query_type(THD::enum_binlog_query_type qtype)
8460 case THD::ROW_QUERY_TYPE:
8462 case THD::STMT_QUERY_TYPE:
8464 case THD::QUERY_TYPE_COUNT:
8466 DBUG_ASSERT(0 <= qtype && qtype < THD::QUERY_TYPE_COUNT);
8468 static char buf[64];
8469 sprintf(buf,
"UNKNOWN#%d", qtype);
8477 static void reset_binlog_unsafe_suppression()
8479 DBUG_ENTER(
"reset_binlog_unsafe_suppression");
8480 unsafe_warning_suppression_is_activated=
false;
8481 limit_unsafe_warning_count= 0;
8482 limit_unsafe_suppression_start_time= my_getsystime()/10000000;
8489 static void print_unsafe_warning_to_log(
int unsafe_type,
char* buf,
8492 DBUG_ENTER(
"print_unsafe_warning_in_log");
8493 sprintf(buf, ER(ER_BINLOG_UNSAFE_STATEMENT),
8494 ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
8495 sql_print_warning(ER(ER_MESSAGE_AND_STATEMENT), buf, query);
8512 static void do_unsafe_limit_checkout(
char* buf,
int unsafe_type,
char* query)
8515 DBUG_ENTER(
"do_unsafe_limit_checkout");
8516 DBUG_ASSERT(unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT);
8517 limit_unsafe_warning_count++;
8523 if (limit_unsafe_suppression_start_time == 0)
8525 limit_unsafe_suppression_start_time= my_getsystime()/10000000;
8526 print_unsafe_warning_to_log(unsafe_type, buf, query);
8530 if (!unsafe_warning_suppression_is_activated)
8531 print_unsafe_warning_to_log(unsafe_type, buf, query);
8533 if (limit_unsafe_warning_count >=
8534 LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT)
8536 now= my_getsystime()/10000000;
8537 if (!unsafe_warning_suppression_is_activated)
8545 if ((now-limit_unsafe_suppression_start_time) <=
8546 LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
8548 unsafe_warning_suppression_is_activated=
true;
8549 DBUG_PRINT(
"info",(
"A warning flood has been detected and the limit \
8550 unsafety warning suppression has been activated."));
8557 limit_unsafe_suppression_start_time= my_getsystime()/10000000;
8558 limit_unsafe_warning_count= 0;
8566 sql_print_information(
"The following warning was suppressed %d times \
8567 during the last %d seconds in the error log",
8568 limit_unsafe_warning_count,
8570 (now-limit_unsafe_suppression_start_time));
8571 print_unsafe_warning_to_log(unsafe_type, buf, query);
8577 if ((now - limit_unsafe_suppression_start_time) >
8578 LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
8580 reset_binlog_unsafe_suppression();
8581 DBUG_PRINT(
"info",(
"The limit unsafety warning supression has been \
8585 limit_unsafe_warning_count= 0;
8597 void THD::issue_unsafe_warnings()
8599 char buf[MYSQL_ERRMSG_SIZE * 2];
8600 DBUG_ENTER(
"issue_unsafe_warnings");
8605 DBUG_ASSERT(LEX::BINLOG_STMT_UNSAFE_COUNT <=
8606 sizeof(binlog_unsafe_warning_flags) * CHAR_BIT);
8608 uint32 unsafe_type_flags= binlog_unsafe_warning_flags;
8614 for (
int unsafe_type=0;
8615 unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT;
8618 if ((unsafe_type_flags & (1 << unsafe_type)) != 0)
8620 push_warning_printf(
this, Sql_condition::WARN_LEVEL_NOTE,
8621 ER_BINLOG_UNSAFE_STATEMENT,
8622 ER(ER_BINLOG_UNSAFE_STATEMENT),
8623 ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
8626 if (unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT)
8627 do_unsafe_limit_checkout( buf, unsafe_type, query());
8629 print_unsafe_warning_to_log(unsafe_type, buf, query());
8661 int THD::binlog_query(THD::enum_binlog_query_type qtype,
char const *query_arg,
8662 ulong query_len,
bool is_trans,
bool direct,
8663 bool suppress_use,
int errcode)
8665 DBUG_ENTER(
"THD::binlog_query");
8666 DBUG_PRINT(
"enter", (
"qtype: %s query: '%s'",
8667 show_query_type(qtype), query_arg));
8668 DBUG_ASSERT(query_arg && mysql_bin_log.is_open());
8670 if (get_binlog_local_stmt_filter() == BINLOG_FILTER_SET)
8688 if (this->locked_tables_mode <= LTM_LOCK_TABLES)
8689 if (
int error= binlog_flush_pending_rows_event(TRUE, is_trans))
8714 if ((variables.option_bits & OPTION_BIN_LOG) &&
8715 sp_runtime_ctx == NULL && !binlog_evt_union.do_union)
8716 issue_unsafe_warnings();
8726 case THD::ROW_QUERY_TYPE:
8728 (
"is_current_stmt_binlog_format_row: %d",
8729 is_current_stmt_binlog_format_row()));
8730 if (is_current_stmt_binlog_format_row())
8744 case THD::STMT_QUERY_TYPE:
8751 suppress_use, errcode);
8759 binlog_table_maps= 0;
8764 case THD::QUERY_TYPE_COUNT:
8766 DBUG_ASSERT(0 <= qtype && qtype < QUERY_TYPE_COUNT);
8774 { MYSQL_HANDLERTON_INTERFACE_VERSION };
8778 mysql_declare_plugin(binlog)
8780 MYSQL_STORAGE_ENGINE_PLUGIN,
8781 &binlog_storage_engine,
8784 "This is a pseudo storage engine to represent the binlog in a transaction",
8794 mysql_declare_plugin_end;