1 #ifndef BINLOG_H_INCLUDED
17 #define BINLOG_H_INCLUDED
37 : m_first(NULL), m_last(&m_first)
42 #ifdef HAVE_PSI_INTERFACE
43 PSI_mutex_key key_LOCK_queue
53 bool is_empty()
const {
54 return m_first == NULL;
67 std::pair<bool,THD*> pop_front();
89 } __attribute__((aligned(CPU_LEVEL1_DCACHE_LINESIZE)));
111 #ifdef HAVE_PSI_INTERFACE
112 PSI_mutex_key key_LOCK_flush_queue,
113 PSI_mutex_key key_LOCK_sync_queue,
114 PSI_mutex_key key_LOCK_commit_queue,
115 PSI_mutex_key key_LOCK_done,
116 PSI_cond_key key_COND_done
126 m_queue[FLUSH_STAGE].init(
127 #ifdef HAVE_PSI_INTERFACE
131 m_queue[SYNC_STAGE].init(
132 #ifdef HAVE_PSI_INTERFACE
136 m_queue[COMMIT_STAGE].init(
137 #ifdef HAVE_PSI_INTERFACE
138 key_LOCK_commit_queue
145 for (
size_t i = 0 ;
i < STAGE_COUNTER ; ++
i)
175 std::pair<bool,THD*> pop_front(
StageID stage)
177 return m_queue[stage].pop_front();
198 DBUG_PRINT(
"debug", (
"Fetching queue for stage %d", stage));
202 void signal_done(THD *queue) {
204 for (THD *thd= queue ; thd ; thd = thd->next_to_commit)
205 thd->transaction.flags.pending=
false;
218 Mutex_queue m_queue[STAGE_COUNTER];
227 bool leader_await_preempt_status;
238 #ifdef HAVE_PSI_INTERFACE
240 PSI_mutex_key m_key_LOCK_index;
242 PSI_mutex_key m_key_COND_done;
244 PSI_mutex_key m_key_LOCK_commit_queue;
245 PSI_mutex_key m_key_LOCK_done;
246 PSI_mutex_key m_key_LOCK_flush_queue;
247 PSI_mutex_key m_key_LOCK_sync_queue;
249 PSI_mutex_key m_key_LOCK_commit;
251 PSI_mutex_key m_key_LOCK_sync;
253 PSI_mutex_key m_key_LOCK_xids;
255 PSI_cond_key m_key_update_cond;
257 PSI_cond_key m_key_prep_xids_cond;
259 PSI_file_key m_key_file_log;
261 PSI_file_key m_key_file_log_index;
269 ulonglong bytes_written;
271 char index_file_name[FN_REFLEN];
277 char crash_safe_index_file_name[FN_REFLEN];
285 char purge_index_file_name[FN_REFLEN];
307 uint *sync_period_ptr;
312 volatile int32 m_prep_xids;
317 void inc_prep_xids(THD *thd) {
318 DBUG_ENTER(
"MYSQL_BIN_LOG::inc_prep_xids");
319 my_atomic_rwlock_wrlock(&m_prep_xids_lock);
321 int result= my_atomic_add32(&m_prep_xids, 1);
323 (void) my_atomic_add32(&m_prep_xids, 1);
325 DBUG_PRINT(
"debug", (
"m_prep_xids: %d", result + 1));
326 my_atomic_rwlock_wrunlock(&m_prep_xids_lock);
327 thd->transaction.flags.xid_written=
true;
336 void dec_prep_xids(THD *thd) {
337 DBUG_ENTER(
"MYSQL_BIN_LOG::dec_prep_xids");
338 my_atomic_rwlock_wrlock(&m_prep_xids_lock);
339 int32 result= my_atomic_add32(&m_prep_xids, -1);
340 DBUG_PRINT(
"debug", (
"m_prep_xids: %d", result - 1));
341 my_atomic_rwlock_wrunlock(&m_prep_xids_lock);
342 thd->transaction.flags.xid_written=
false;
353 int32 get_prep_xids() {
354 my_atomic_rwlock_rdlock(&m_prep_xids_lock);
355 int32 result= my_atomic_load32(&m_prep_xids);
356 my_atomic_rwlock_rdunlock(&m_prep_xids_lock);
360 inline uint get_sync_period()
362 return *sync_period_ptr;
376 void do_flush(THD *thd);
380 using MYSQL_LOG::is_open;
385 uint8 checksum_alg_reset;
419 uint8 relay_log_checksum_alg;
428 #ifdef HAVE_PSI_INTERFACE
429 void set_psi_keys(PSI_mutex_key key_LOCK_index,
430 PSI_mutex_key key_LOCK_commit,
431 PSI_mutex_key key_LOCK_commit_queue,
432 PSI_mutex_key key_LOCK_done,
433 PSI_mutex_key key_LOCK_flush_queue,
434 PSI_mutex_key key_LOCK_log,
435 PSI_mutex_key key_LOCK_sync,
436 PSI_mutex_key key_LOCK_sync_queue,
437 PSI_mutex_key key_LOCK_xids,
438 PSI_cond_key key_COND_done,
439 PSI_cond_key key_update_cond,
440 PSI_cond_key key_prep_xids_cond,
441 PSI_file_key key_file_log,
442 PSI_file_key key_file_log_index)
444 m_key_COND_done= key_COND_done;
446 m_key_LOCK_commit_queue= key_LOCK_commit_queue;
447 m_key_LOCK_done= key_LOCK_done;
448 m_key_LOCK_flush_queue= key_LOCK_flush_queue;
449 m_key_LOCK_sync_queue= key_LOCK_sync_queue;
451 m_key_LOCK_index= key_LOCK_index;
452 m_key_LOCK_log= key_LOCK_log;
453 m_key_LOCK_commit= key_LOCK_commit;
454 m_key_LOCK_sync= key_LOCK_sync;
455 m_key_LOCK_xids= key_LOCK_xids;
456 m_key_update_cond= key_update_cond;
457 m_key_prep_xids_cond= key_prep_xids_cond;
458 m_key_file_log= key_file_log;
459 m_key_file_log_index= key_file_log_index;
474 const char **errmsg);
492 bool verify_checksum,
bool need_lock);
494 void set_previous_gtid_set(
Gtid_set *previous_gtid_set_param)
496 previous_gtid_set= previous_gtid_set_param;
501 int open(
const char *opt_name) {
return open_binlog(opt_name); }
505 std::pair<int,my_off_t> flush_thread_caches(THD *thd);
506 int flush_cache_to_file(my_off_t *flush_end_pos);
507 int finish_commit(THD *thd);
508 std::pair<bool, bool> sync_binlog_file(
bool force);
509 void process_commit_stage_queue(THD *thd, THD *queue);
510 void process_after_commit_stage_queue(THD *thd, THD *first);
511 int process_flush_stage_queue(my_off_t *total_bytes_var,
bool *rotate_var,
512 THD **out_queue_var);
513 int ordered_commit(THD *thd,
bool all,
bool skip_commit =
false);
517 enum_result
commit(THD *thd,
bool all);
519 int prepare(THD *thd,
bool all);
521 my_off_t *valid_pos);
523 #if !defined(MYSQL_CLIENT)
525 void update_thd_next_event_pos(THD *thd);
527 bool is_transactional);
530 void add_bytes_written(ulonglong inc)
532 bytes_written += inc;
534 void reset_bytes_written()
538 void harvest_bytes_written(ulonglong* counter)
541 char buf1[22],buf2[22];
543 DBUG_ENTER(
"harvest_bytes_written");
544 (*counter)+=bytes_written;
545 DBUG_PRINT(
"info",(
"counter: %s bytes_written: %s", llstr(*counter,buf1),
546 llstr(bytes_written,buf2)));
550 void set_max_size(ulong max_size_arg);
551 void signal_update();
/**
  Declaration only; the definition is not visible in this excerpt.
  NOTE(review): presumably initialises the mutexes/conditions owned by
  this object - confirm against the definition.
*/
void init_pthread_objects();
576 const char *new_name,
577 enum cache_type io_cache_type_arg,
580 bool need_lock_index,
bool need_sid_lock,
/**
  Declaration only; the definition is not visible in this excerpt.

  @param index_file_name_arg  index file name supplied by the caller
  @param log_name             log name supplied by the caller
  @param need_lock_index      NOTE(review): presumably whether this call
                              must take the index lock itself - confirm.
*/
bool open_index_file(const char *index_file_name_arg,
                     const char *log_name,
                     bool need_lock_index);
589 int do_write_cache(
IO_CACHE *cache);
591 void set_write_error(THD *thd,
bool is_transactional);
592 bool check_write_error(THD *thd);
594 bool do_flush_and_sync=
true);
596 bool do_flush_and_sync=
true);
598 void start_union_events(THD *thd, query_id_t query_id_param);
599 void stop_union_events(THD *thd);
600 bool is_query_in_union(THD *thd, query_id_t query_id_param);
602 #ifdef HAVE_REPLICATION
603 bool append_buffer(
const char*
buf, uint len, Master_info *mi);
604 bool append_event(
Log_event* ev, Master_info *mi);
606 bool after_append_to_relay_log(Master_info *mi);
607 #endif // ifdef HAVE_REPLICATION
611 bool is_active(
const char* log_file_name);
612 int remove_logs_from_index(
LOG_INFO* linfo,
bool need_update_threads);
613 int rotate(
bool force_rotate,
bool* check_purge);
631 int purge_logs(
const char *to_log,
bool included,
632 bool need_lock_index,
bool need_update_threads,
633 ulonglong *decrease_log_space,
bool auto_purge);
634 int purge_logs_before_date(time_t purge_time,
bool auto_purge);
640 bool need_lock_index);
642 int set_purge_index_file_name(
const char *base_file_name);
643 int open_purge_index_file(
bool destroy);
644 bool is_inited_purge_index_file();
645 int close_purge_index_file();
646 int clean_purge_index_file();
647 int sync_purge_index_file();
648 int register_purge_index_entry(
const char*
entry);
649 int register_create_index_entry(
const char* entry);
650 int purge_index_entry(THD *thd, ulonglong *decrease_log_space,
651 bool need_lock_index);
653 void close(uint exiting);
657 bool need_lock_index);
659 int get_current_log(
LOG_INFO* linfo);
660 int raw_get_current_log(
LOG_INFO* linfo);
662 inline char* get_index_fname() {
return index_file_name;}
663 inline char* get_log_fname() {
return log_file_name; }
664 inline char* get_name() {
return name; }
666 inline mysql_cond_t* get_log_cond() {
return &update_cond; }
667 inline IO_CACHE* get_log_file() {
return &log_file; }
671 inline IO_CACHE *get_index_file() {
return &index_file;}
672 inline uint32 get_open_count() {
return open_count; }
678 my_off_t last_pos_in_file;
679 bool wrote_create_file, log_delayed;
697 const char **errmsg);
698 int check_binlog_magic(
IO_CACHE* log,
const char** errmsg);
701 bool show_binlog_events(THD *thd,
MYSQL_BIN_LOG *binary_log);
702 bool mysql_show_binlog_events(THD* thd);
706 void register_binlog_handler(THD *thd,
bool trx);
/*
  Globals defined in another translation unit.
  NOTE(review): the names suggest server option values for the binary
  log index name, base name and commit ordering - confirm against the
  option definitions.
*/
extern const char *log_bin_index;

extern const char *log_bin_basename;

extern bool opt_binlog_order_commits;
727 inline bool normalize_binlog_name(
char *
to,
const char *from,
bool is_relay_log)
729 DBUG_ENTER(
"normalize_binlog_name");
731 char buff[FN_REFLEN];
732 char *ptr= (
char*) from;
733 char *opt_name= is_relay_log ? opt_relay_logname : opt_bin_logname;
738 if (opt_name && opt_name[0] && from && !test_if_hard_path(from))
742 char log_dirpart[FN_REFLEN], log_dirname[FN_REFLEN];
743 size_t log_dirpart_len, log_dirname_len;
744 dirname_part(log_dirpart, opt_name, &log_dirpart_len);
745 dirname_part(log_dirname, from, &log_dirname_len);
749 if (log_dirpart_len > 0)
752 if(fn_format(buff, from+log_dirname_len, log_dirpart,
"",
753 MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH)) == NULL)
766 strmake(to, ptr, strlen(ptr));