5 #ifdef HAVE_REPLICATION
7 #include "sql_string.h"
10 #include <my_bitmap.h>
// Entry type of the database-to-worker hash, exposed under the typedef name
// db_worker_hash_entry. Only one member is visible in this excerpt.
25 typedef struct st_db_worker_hash_entry
// Pointer to TABLE; the pointer itself is volatile-qualified.
// Presumably the temporary tables parked on this entry -- confirm against
// mts_move_temp_table_to_entry().
42 TABLE*
volatile temporary_tables;
50 } db_worker_hash_entry;
// Takes the configured number of parallel workers and returns a bool status.
// NOTE(review): which value means success is not visible here -- check the
// definition before relying on it.
52 bool init_hash_workers(ulong slave_parallel_workers);
// Returns the Slave_worker associated with database `dbname` in the context
// of `rli`. `ptr_entry` is an out-parameter receiving a pointer to a
// db_worker_hash_entry; `need_temp_tables` and `w` are passed through to the
// implementation (their exact effect is defined outside this excerpt).
54 Slave_worker *map_db_to_worker(
const char *dbname,
Relay_log_info *rli,
55 db_worker_hash_entry **ptr_entry,
56 bool need_temp_tables, Slave_worker *w);
// Picks a worker out of the `workers` dynamic array; presumably the one with
// the least load, judging by the name -- confirm the metric in the definition.
57 Slave_worker *get_least_occupied_worker(
DYNAMIC_ARRAY *workers);
// Tail of a separate declaration whose first line is not part of this
// excerpt: an optional Slave_worker pointer defaulting to NULL.
59 Slave_worker *ignore= NULL);
61 #define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
// Count of fields used to identify a worker, per the macro name.
// NOTE(review): which two fields these are is not visible here -- confirm at
// the usage sites.
63 #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
// Opening line of the slave_job_item struct; its members are not part of
// this excerpt.
65 typedef struct slave_job_item
// Queue base class backed by the dynamic array Q. The visible state is the
// capacity `size`, the indexes `avail` and `entry`, the length `len` and the
// `inited_queue` flag.
77 class circular_buffer_queue
// Sizing constructor: capacity is `max`, `avail` starts at 0, `entry` starts
// at `max`, `len` at 0, and the queue is marked not initialised until the
// array allocation below has been evaluated.
88 circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
89 size(max), avail(0),
entry(max), len(0), inited_queue(FALSE)
// (ulong) -1 is not accepted as a capacity.
91 DBUG_ASSERT(
size < (ulong) -1);
// Allocates Q for `size` elements of `el_size` bytes; the branch body taken
// when my_init_dynamic_array() returns zero is not visible in this excerpt.
92 if (!my_init_dynamic_array(&Q, el_size,
size, alloc_inc))
// Default constructor: only marks the queue as not initialised.
95 circular_buffer_queue () : inited_queue(FALSE) {}
96 ~circular_buffer_queue ()
// Queue operations defined elsewhere; each returns a ulong.
// NOTE(review): presumably an index or an error marker -- confirm in the
// definitions.
110 ulong de_queue(uchar *);
114 ulong de_tail(uchar *val);
121 ulong en_queue(
void *item);
// Compares two indexes `i` and `k`; the ordering rule lives in the definition.
126 bool gt(ulong
i, ulong k);
128 bool in(ulong k) {
return !empty() &&
129 (
entry > avail ? (k >=
entry || k < avail) : (k >=
entry && k < avail)); }
131 bool full() {
return avail ==
size; }
// Descriptor of one job group. It carries master-log and relay-log
// coordinates, the owning worker, a sequence number and a set of checkpoint
// fields. Only part of the struct is visible in this excerpt.
134 typedef struct st_slave_job_group
// Group coordinates in the master log: file name and offset.
136 char *group_master_log_name;
140 my_off_t group_master_log_pos;
// Group coordinates in the relay log: file name and offset.
151 char *group_relay_log_name;
152 my_off_t group_relay_log_pos;
154 Slave_worker *worker;
155 ulonglong total_seqno;
157 my_off_t master_log_pos;
// Checkpoint data: sequence number plus master-log and relay-log coordinates.
159 uint checkpoint_seqno;
160 my_off_t checkpoint_log_pos;
161 char* checkpoint_log_name;
162 my_off_t checkpoint_relay_log_pos;
163 char* checkpoint_relay_log_name;
// Re-initialises the descriptor: stores `master_pos`, zeroes the group
// offsets, sets the name pointers to NULL and marks the worker id undefined.
// NOTE(review): `seqno` is not used on any visible line -- presumably it is
// assigned on a line missing from this excerpt; confirm.
// NOTE(review): the name pointers are overwritten without being freed here;
// verify who owns those buffers.
174 void reset(my_off_t master_pos, ulonglong seqno)
176 master_log_pos= master_pos;
177 group_master_log_pos= group_relay_log_pos= 0;
178 group_master_log_name= NULL;
179 group_relay_log_name= NULL;
180 worker_id= MTS_WORKER_UNDEF;
182 checkpoint_log_name= NULL;
183 checkpoint_log_pos= 0;
184 checkpoint_relay_log_name= NULL;
185 checkpoint_relay_log_pos= 0;
// (uint) -1 is stored as the initial checkpoint sequence number.
186 checkpoint_seqno= (uint) -1;
// circular_buffer_queue specialisation whose elements are read back as
// Slave_job_group (see get_job_group()). It additionally keeps the
// `last_done` dynamic array and the `lwm` record.
199 class Slave_committed_queue :
public circular_buffer_queue
206 void update_current_binlog(
const char *post_rotate);
// Set by en_queue() below to the value returned by the base-class en_queue().
217 ulong assigned_group_index;
// Constructor: forwards el_size/max/inc to the base class and starts with
// inited == FALSE. Part of the parameter list is missing from this excerpt.
219 Slave_committed_queue (
const char *log, uint el_size, ulong max, uint
n,
221 : circular_buffer_queue(el_size, max, inc), inited(FALSE)
// Checked condition: `max` is (ulong) -1 or above, or the base class did not
// initialise its array; the branch body is not visible here.
226 if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
// last_done gets `n` slots, each sizeof(lwm.total_seqno) bytes, and every
// slot is filled with the value of `l`.
// NOTE(review): the return value of my_init_dynamic_array() is not checked on
// the visible lines.
230 my_init_dynamic_array(&last_done,
sizeof(lwm.total_seqno), n, 0);
231 for (k= 0; k <
n; k++)
232 insert_dynamic(&last_done, (uchar*) &l);
// NOTE(review): the my_malloc() result is written through on the next line
// with no visible NULL check -- confirm that allocation failure is handled.
233 lwm.group_relay_log_name= (
char *) my_malloc(FN_REFLEN + 1, MYF(0));
234 lwm.group_relay_log_name[0]= 0;
// Destructor: releases last_done, the lwm relay-log name buffer allocated in
// the constructor, then calls free_dynamic_items().
237 ~Slave_committed_queue ()
241 delete_dynamic(&last_done);
242 my_free(lwm.group_relay_log_name);
243 free_dynamic_items();
254 void free_dynamic_items();
// Returns the element of Q at index `ind`, viewed as a Slave_job_group.
259 Slave_job_group* get_job_group(ulong ind)
261 return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
// Enqueues via the base class and records the returned value in
// assigned_group_index before returning it.
268 ulong en_queue(
void *item)
270 return assigned_group_index= circular_buffer_queue::en_queue(item);
// circular_buffer_queue specialisation used as the per-worker `jobs` member
// of Slave_worker. Only one member is visible in this excerpt.
275 class Slave_jobs_queue :
public circular_buffer_queue
// Counter; presumably the number of waits caused by the queue being
// overfilled -- confirm at the increment site.
284 ulonglong waited_overfill;
// Instrumentation-key parameters, compiled in only when HAVE_PSI_INTERFACE is
// defined. They are the tail of a parameter list whose head is not part of
// this excerpt.
// NOTE(review): the four *_cond parameters are declared as PSI_mutex_key*;
// confirm whether a condition-key type was intended.
291 #ifdef HAVE_PSI_INTERFACE
292 ,PSI_mutex_key *param_key_info_run_lock,
293 PSI_mutex_key *param_key_info_data_lock,
294 PSI_mutex_key *param_key_info_sleep_lock,
295 PSI_mutex_key *param_key_info_data_cond,
296 PSI_mutex_key *param_key_info_start_cond,
297 PSI_mutex_key *param_key_info_stop_cond,
298 PSI_mutex_key *param_key_info_sleep_cond
302 virtual ~Slave_worker();
// Per-worker queue of jobs.
304 Slave_jobs_queue jobs;
309 bool curr_group_seen_begin;
// Several of the fields below are volatile-qualified.
// NOTE(review): volatile by itself provides no inter-thread synchronisation;
// verify which lock guards each of them.
316 volatile ulong last_group_done_index;
317 ulong wq_empty_waits;
320 volatile int curr_jobs;
322 long usage_partition;
324 bool end_group_sets_max_dbs;
326 volatile bool relay_log_change_notified;
327 volatile bool checkpoint_notified;
328 volatile bool master_log_change_notified;
329 ulong bitmap_shifted;
342 ulong underrun_level;
// Checkpoint coordinates kept by the worker: relay-log and master-log file
// names (fixed buffers of FN_REFLEN chars) with their offsets, plus a
// checkpoint sequence number.
352 char checkpoint_relay_log_name[FN_REFLEN];
353 ulonglong checkpoint_relay_log_pos;
354 char checkpoint_master_log_name[FN_REFLEN];
355 ulonglong checkpoint_master_log_pos;
358 ulong checkpoint_seqno;
// Worker state enumeration (enumerators not visible here) and the volatile
// field holding the current state.
359 enum en_running_state
370 en_running_state
volatile running_status;
373 int rli_init_info(
bool);
375 static size_t get_number_worker_fields();
376 void slave_worker_ends_group(
Log_event*,
int);
377 const char *get_master_log_name();
378 ulonglong get_master_log_pos() {
return master_log_pos; };
379 ulonglong set_master_log_pos(ulong val) {
return master_log_pos= val; };
// Takes the event, its job-group descriptor and a `force` flag; returns a
// bool status. NOTE(review): result polarity is not visible here.
380 bool commit_positions(
Log_event *evt, Slave_job_group *ptr_g,
bool force);
381 bool reset_recovery_info();
// Fragment of a setter for rli_description_event (its signature is not part
// of this excerpt). A non-NULL `fdle` is only accepted while the worker is in
// the RUNNING state and info_thd is set.
390 DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
398 rli_description_event= fdle;
// The capacity of c_rli->gaq is stored in gaq_index by the reset;
// set_gaq_index() tests gaq_index against that same value (the guarded
// statement is not visible here).
401 inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
402 inline void set_gaq_index(ulong val)
404 if (gaq_index == c_rli->gaq->size)
// Virtual reporting hook taking a log level, an error code, a printf-style
// message and its va_list; const-qualified.
410 virtual void do_report(loglevel
level,
int err_code,
411 const char *
msg, va_list v_args)
const;
// Backing field for get_master_log_pos()/set_master_log_pos().
415 ulonglong master_log_pos;
// Read/write of the worker's info through an Rpl_info_handler.
417 bool read_info(Rpl_info_handler *from);
418 bool write_info(Rpl_info_handler *
to);
// Copy assignment and copy constructor are declared here.
// NOTE(review): presumably private and left undefined to forbid copying --
// the access specifier is not visible in this excerpt.
419 Slave_worker& operator=(
const Slave_worker& info);
420 Slave_worker(
const Slave_worker& info);
// Helpers that move temporary TABLE objects between a THD and a
// db_worker_hash_entry; each returns a TABLE pointer.
423 TABLE* mts_move_temp_table_to_entry(
TABLE*, THD*, db_worker_hash_entry*);
424 TABLE* mts_move_temp_tables_to_thd(THD*,
TABLE*);
425 #endif // HAVE_REPLICATION