4 #include "rpl_rli_pdb.h"
6 #include "sql_string.h"
/* Debug counter: commit_positions() increments it under the
   "mts_debug_concurrent_access" DBUG keyword. */
11 uint mts_debug_concurrent_access= 0;
/* Initial element count and growth increment handed to
   my_init_dynamic_array() for the temporary `hash_element` array that
   map_db_to_worker() builds when purging hash entries. */
14 #define HASH_DYNAMIC_INIT 4
15 #define HASH_DYNAMIC_INCR 1
/* Table of field names for a Slave_worker info record.
   get_number_worker_fields() derives the field count from this array's
   size, so the count follows the initializer list automatically.
   NOTE(review): presumably these are the repository column names and must
   stay in the order read_info()/write_info() use -- confirm. */
23 const char *info_slave_worker_fields []=
32 "group_relay_log_name",
33 "group_relay_log_pos",
34 "group_master_log_name",
35 "group_master_log_pos",
42 "checkpoint_relay_log_name",
43 "checkpoint_relay_log_pos",
44 "checkpoint_master_log_name",
45 "checkpoint_master_log_pos",
55 "checkpoint_group_size",
59 "checkpoint_group_bitmap"
/* Soft limit on the number of records in the db->worker hash:
   map_db_to_worker() compares mapping_db_to_worker.records against it
   before collecting entries whose usage is 0 for deletion. */
67 ulong mts_partition_hash_soft_max= 16;
/* Slave_worker constructor (parameter tail, initializer list and body).
   When HAVE_PSI_INTERFACE is defined the PSI keys are accepted as extra
   parameters and passed on unchanged (presumably to the base-class
   constructor -- confirm).  The initializer list binds the coordinator
   (c_rli), stores the worker id, zeroes the checkpoint positions and
   sequence number and starts the worker in the NOT_RUNNING state. */
70 #ifdef HAVE_PSI_INTERFACE
71 ,PSI_mutex_key *param_key_info_run_lock,
72 PSI_mutex_key *param_key_info_data_lock,
73 PSI_mutex_key *param_key_info_sleep_lock,
74 PSI_mutex_key *param_key_info_data_cond,
75 PSI_mutex_key *param_key_info_start_cond,
76 PSI_mutex_key *param_key_info_stop_cond,
77 PSI_mutex_key *param_key_info_sleep_cond
82 #ifdef HAVE_PSI_INTERFACE
83 ,param_key_info_run_lock, param_key_info_data_lock,
84 param_key_info_sleep_lock,
85 param_key_info_data_cond, param_key_info_start_cond,
86 param_key_info_stop_cond, param_key_info_sleep_cond
89 ), c_rli(rli),
id(param_id),
90 checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0),
91 checkpoint_seqno(0), running_status(NOT_RUNNING)
/* Body: empty checkpoint log names, plus the array holding pointers to the
   hash entries (partitions) touched by the current group. */
98 checkpoint_relay_log_name[0]= 0;
99 checkpoint_master_log_name[0]= 0;
100 my_init_dynamic_array(&curr_group_exec_parts,
sizeof(db_worker_hash_entry*),
101 SLAVE_INIT_DBS_IN_GROUP, 1);
/* Destructor: releases the job queue array only if init_worker() created
   it (jobs.inited_queue), always releases curr_group_exec_parts, and
   clears the description event via set_rli_description_event(NULL). */
107 Slave_worker::~Slave_worker()
110 if (jobs.inited_queue)
112 DBUG_ASSERT(jobs.Q.elements == jobs.size);
113 delete_dynamic(&jobs.Q);
115 delete_dynamic(&curr_group_exec_parts);
119 set_rli_description_event(NULL);
/* Slave_worker::init_worker body: prepares a worker for a new run.
   It initializes the worker info through rli_init_info(false) (a fault can
   be injected with the "inject_init_worker_init_info_fault" DBUG keyword),
   resets the notification flags and counters, and allocates the job queue
   as a dynamic array pre-filled with jobs.size empty items. */
134 DBUG_ENTER(
"Slave_worker::init_worker");
135 DBUG_ASSERT(!rli->info_thd->is_error());
137 Slave_job_item empty= {NULL};
140 if (rli_init_info(
false) ||
141 DBUG_EVALUATE_IF(
"inject_init_worker_init_info_fault",
true,
false))
145 curr_group_exec_parts.elements= 0;
146 relay_log_change_notified= FALSE;
147 checkpoint_notified= FALSE;
148 master_log_change_notified=
false;
150 workers= c_rli->workers;
151 wq_size_waits_cnt= groups_done= events_done= curr_jobs= 0;
153 end_group_sets_max_dbs=
false;
/* Both group indexes start at gaq->size, the value slave_worker_ends_group()
   later compares last_group_done_index against in its assertion. */
154 gaq_index= last_group_done_index= c_rli->gaq->size;
156 DBUG_ASSERT(!jobs.inited_queue);
159 jobs.overfill= FALSE;
160 jobs.waited_overfill= 0;
/* entry == size is the "queue is empty" marker tested by the queue helpers
   (head_queue()/de_queue() check `jobs->entry == jobs->size`). */
161 jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max;
162 jobs.inited_queue=
true;
163 curr_group_seen_begin= curr_group_seen_gtid=
false;
/* Pre-populate every slot so that set_dynamic()/get_dynamic() can address
   any index below jobs.size. */
165 my_init_dynamic_array(&jobs.Q,
sizeof(Slave_job_item), jobs.size, 0);
166 for (k= 0; k < jobs.size; k++)
167 insert_dynamic(&jobs.Q, (uchar*) &empty);
169 DBUG_ASSERT(jobs.Q.elements == jobs.size);
171 wq_overrun_cnt= excess_cnt= 0;
/* underrun_level is mts_worker_underrun_level percent of the queue size;
   overrun_level is the complementary threshold. */
172 underrun_level= (ulong) ((rli->mts_worker_underrun_level * jobs.size) / 100.0);
174 overrun_level= jobs.size - underrun_level;
/* Initializes the worker's info state.

   @param is_gaps_collecting_phase  selects the bitmap width
          (MTS_MAX_BITS_IN_GROUP when true, c_rli->checkpoint_group
          otherwise) and turns a missing repository into a failure
          condition.

   The group_executed and group_shifted bitmaps are created with that
   width.  On the failure path visible here both bitmaps are freed and
   "Error reading slave worker configuration" is logged. */
199 int Slave_worker::rli_init_info(
bool is_gaps_collecting_phase)
201 enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
203 DBUG_ENTER(
"Slave_worker::rli_init_info");
213 size_t num_bits= is_gaps_collecting_phase ?
214 MTS_MAX_BITS_IN_GROUP : c_rli->checkpoint_group;
220 return_check= check_info();
221 if (return_check == ERROR_CHECKING_REPOSITORY ||
222 (return_check == REPOSITORY_DOES_NOT_EXIST && is_gaps_collecting_phase))
228 bitmap_init(&group_executed, NULL, num_bits, FALSE);
229 bitmap_init(&group_shifted, NULL, num_bits, FALSE);
/* "mts_slave_worker_init_at_gaps_fails" is a DBUG fault-injection keyword
   that only applies in the gaps-collecting phase. */
231 if (is_gaps_collecting_phase &&
232 (DBUG_EVALUATE_IF(
"mts_slave_worker_init_at_gaps_fails",
true,
false) ||
235 bitmap_free(&group_executed);
236 bitmap_free(&group_shifted);
246 sql_print_error(
"Error reading slave worker configuration");
/* Releases the two checkpoint bitmaps (group_executed, group_shifted)
   that rli_init_info() created. */
250 void Slave_worker::end_info()
252 DBUG_ENTER(
"Slave_worker::end_info");
262 bitmap_free(&group_executed);
263 bitmap_free(&group_shifted);
/* Flushes the worker info through the repository handler.

   @param force  passed straight to handler->flush_info().

   The handler's sync period is refreshed from sync_relayloginfo_period
   before flushing.  The failure path logs
   "Error writing slave worker configuration". */
270 int Slave_worker::flush_info(
const bool force)
272 DBUG_ENTER(
"Slave_worker::flush_info");
283 handler->set_sync_period(sync_relayloginfo_period);
288 if (
handler->flush_info(force))
294 sql_print_error(
"Error writing slave worker configuration");
/* Loads the worker's coordinates from a repository handler.

   @param from  handler to read from.

   Fields are read in this order: internal id, group relay log name/pos,
   group master log name/pos, checkpoint relay log name/pos, checkpoint
   master log name/pos, checkpoint sequence number, bitmap byte count and
   the bitmap bytes themselves (written directly into
   group_executed.bitmap).  Numeric fields are read into temporaries and
   copied to the members only after the whole `||` chain has been
   evaluated. */
298 bool Slave_worker::read_info(Rpl_info_handler *from)
300 DBUG_ENTER(
"Slave_worker::read_info");
302 ulong temp_group_relay_log_pos= 0;
303 ulong temp_group_master_log_pos= 0;
304 ulong temp_checkpoint_relay_log_pos= 0;
305 ulong temp_checkpoint_master_log_pos= 0;
306 ulong temp_checkpoint_seqno= 0;
308 uchar *buffer= (uchar *) group_executed.bitmap;
309 int temp_internal_id= 0;
311 if (from->prepare_info_for_read())
314 if (from->get_info((
int *) &temp_internal_id, (
int) 0) ||
315 from->get_info(group_relay_log_name,
316 (
size_t)
sizeof(group_relay_log_name),
318 from->get_info((ulong *) &temp_group_relay_log_pos,
320 from->get_info(group_master_log_name,
321 (
size_t)
sizeof(group_master_log_name),
323 from->get_info((ulong *) &temp_group_master_log_pos,
325 from->get_info(checkpoint_relay_log_name,
326 (
size_t)
sizeof(checkpoint_relay_log_name),
328 from->get_info((ulong *) &temp_checkpoint_relay_log_pos,
330 from->get_info(checkpoint_master_log_name,
331 (
size_t)
sizeof(checkpoint_master_log_name),
333 from->get_info((ulong *) &temp_checkpoint_master_log_pos,
335 from->get_info((ulong *) &temp_checkpoint_seqno,
337 from->get_info(&nbytes, (ulong) 0) ||
338 from->get_info(buffer, (
size_t) nbytes,
/* NOTE(review): this bound on nbytes is a debug-only assertion and sits
   after the get_info() call that already used nbytes as the copy size. */
342 DBUG_ASSERT(nbytes <= no_bytes_in_map(&group_executed));
344 internal_id=(uint) temp_internal_id;
345 group_relay_log_pos= temp_group_relay_log_pos;
346 group_master_log_pos= temp_group_master_log_pos;
347 checkpoint_relay_log_pos= temp_checkpoint_relay_log_pos;
348 checkpoint_master_log_pos= temp_checkpoint_master_log_pos;
349 checkpoint_seqno= temp_checkpoint_seqno;
/* Stores the worker's coordinates into a repository handler.

   @param to  handler to write to.

   Fields are written in the same order read_info() reads them: internal
   id, group relay log name/pos, group master log name/pos, checkpoint
   relay log name/pos, checkpoint master log name/pos, checkpoint sequence
   number, bitmap byte count and the bitmap bytes of group_executed.
   The `||` chain stops at the first call that returns non-zero.

   The DBUG trace tag names this method (it previously read
   "Master_info::write_info", which mislabelled debug traces). */
354 bool Slave_worker::write_info(Rpl_info_handler *
to)
356 DBUG_ENTER(
"Slave_worker::write_info");
358 ulong nbytes= (ulong) no_bytes_in_map(&group_executed);
359 uchar *buffer= (uchar*) group_executed.bitmap;
/* The bitmap must fit in ceil(checkpoint_group / 8) bytes. */
360 DBUG_ASSERT(nbytes <= (c_rli->checkpoint_group + 7) / 8);
362 if (to->prepare_info_for_write() ||
363 to->set_info((
int) internal_id) ||
364 to->set_info(group_relay_log_name) ||
365 to->set_info((ulong) group_relay_log_pos) ||
366 to->set_info(group_master_log_name) ||
367 to->set_info((ulong) group_master_log_pos) ||
368 to->set_info(checkpoint_relay_log_name) ||
369 to->set_info((ulong) checkpoint_relay_log_pos) ||
370 to->set_info(checkpoint_master_log_name) ||
371 to->set_info((ulong) checkpoint_master_log_pos) ||
372 to->set_info((ulong) checkpoint_seqno) ||
373 to->set_info(nbytes) ||
374 to->set_info(buffer, (
size_t) nbytes))
/* Clears the group master log coordinates (empty name, position 0) and
   returns the result of a forced flush_info(). */
388 bool Slave_worker::reset_recovery_info()
390 DBUG_ENTER(
"Slave_worker::reset_recovery_info");
392 set_group_master_log_name(
"");
393 set_group_master_log_pos(0);
395 DBUG_RETURN(flush_info(
true));
/* Returns the number of entries in info_slave_worker_fields, computed at
   compile time from the array size. */
398 size_t Slave_worker::get_number_worker_fields()
400 return sizeof(info_slave_worker_fields)/
sizeof(info_slave_worker_fields[0]);
/* Returns the master log name for the group at gaq_index: the group's own
   checkpoint_log_name when it is set, otherwise the worker's
   checkpoint_master_log_name. */
403 const char* Slave_worker::get_master_log_name()
405 Slave_job_group* ptr_g= c_rli->gaq->get_job_group(gaq_index);
407 return (ptr_g->checkpoint_log_name != NULL) ?
408 ptr_g->checkpoint_log_name : checkpoint_master_log_name;
/* Records the coordinates of a finished group in the worker and flushes
   them.

   @param ev     event that ended the group; its log_pos becomes
                 group_master_log_pos.
   @param ptr_g  job group descriptor; the heap-allocated names it carries
                 are copied into the worker and then freed and reset to
                 NULL here.
   @param force  forwarded to flush_info().
   @return       the result of flush_info(force).

   NOTE(review): the DBUG trace tag says "Slave_worker::checkpoint_positions"
   although the method is commit_positions -- looks like a stale name. */
411 bool Slave_worker::commit_positions(
Log_event *ev, Slave_job_group* ptr_g,
bool force)
413 DBUG_ENTER(
"Slave_worker::checkpoint_positions");
423 if (ptr_g->group_master_log_name != NULL)
425 strmake(group_master_log_name, ptr_g->group_master_log_name,
426 sizeof(group_master_log_name) - 1);
427 my_free(ptr_g->group_master_log_name);
428 ptr_g->group_master_log_name= NULL;
429 strmake(checkpoint_master_log_name, group_master_log_name,
430 sizeof(checkpoint_master_log_name) - 1);
/* A non-NULL checkpoint_log_name signals that a checkpoint was taken: adopt
   the new checkpoint coordinates and rebase the executed-groups bitmap. */
432 if (ptr_g->checkpoint_log_name != NULL)
434 strmake(checkpoint_relay_log_name, ptr_g->checkpoint_relay_log_name,
435 sizeof(checkpoint_relay_log_name) - 1);
436 checkpoint_relay_log_pos= ptr_g->checkpoint_relay_log_pos;
437 strmake(checkpoint_master_log_name, ptr_g->checkpoint_log_name,
438 sizeof(checkpoint_master_log_name) - 1);
439 checkpoint_master_log_pos= ptr_g->checkpoint_log_pos;
441 my_free(ptr_g->checkpoint_log_name);
442 ptr_g->checkpoint_log_name= NULL;
443 my_free(ptr_g->checkpoint_relay_log_name);
444 ptr_g->checkpoint_relay_log_name= NULL;
/* Shift the bitmap left by ptr_g->shifted bits: every bit set at position
   pos >= shifted is re-set at pos - shifted, the rest are dropped. */
446 bitmap_copy(&group_shifted, &group_executed);
447 bitmap_clear_all(&group_executed);
448 for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++)
450 if (bitmap_is_set(&group_shifted, pos))
451 bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
457 if (ptr_g->group_relay_log_name)
459 DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
460 <=
sizeof(group_relay_log_name));
461 strmake(group_relay_log_name, ptr_g->group_relay_log_name,
462 sizeof(group_relay_log_name) - 1);
465 DBUG_ASSERT(ptr_g->checkpoint_seqno <= (c_rli->checkpoint_group - 1));
/* Mark this group as executed relative to the current checkpoint. */
467 bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);
468 checkpoint_seqno= ptr_g->checkpoint_seqno;
470 group_master_log_pos= ev->log_pos;
477 strmake(group_master_log_name, c_rli->get_group_master_log_name(),
478 sizeof(group_master_log_name)-1);
480 DBUG_PRINT(
"mts", (
"Committing worker-id %lu group master log pos %llu "
481 "group master log name %s checkpoint sequence number %lu.",
482 id, group_master_log_pos, group_master_log_name, checkpoint_seqno));
484 DBUG_EXECUTE_IF(
"mts_debug_concurrent_access",
486 mts_debug_concurrent_access++;
490 DBUG_RETURN(flush_info(force));
/* File-local hash mapping a database name to its db_worker_hash_entry, and
   the flag recording whether init_hash_workers() initialized it. */
493 static HASH mapping_db_to_worker;
494 static bool inited_hash_workers= FALSE;
/* Performance-schema instrumentation keys for the hash's mutex and
   condition variable (only with HAVE_PSI_INTERFACE). */
496 #ifdef HAVE_PSI_INTERFACE
497 PSI_mutex_key key_mutex_slave_worker_hash;
498 PSI_cond_key key_cond_slave_worker_hash;
/* Key extractor for a db_worker_hash_entry record (presumably registered
   with my_hash_init() for mapping_db_to_worker -- confirm).

   @param record     pointer to a db_worker_hash_entry.
   @param length     out: strlen() of the entry's database name.
   @param not_used   ignored.
   @return           pointer to the entry's database name. */
505 extern "C" uchar *get_key(
const uchar *
record,
size_t *length,
506 my_bool not_used __attribute__((unused)))
508 DBUG_ENTER(
"get_key");
510 db_worker_hash_entry *
entry=(db_worker_hash_entry *) record;
511 *length= strlen(entry->db);
513 DBUG_PRINT(
"info", (
"get_key %s, %d", entry->db, (
int) *length));
515 DBUG_RETURN((uchar*) entry->db);
/* Element destructor passed to my_hash_init() for mapping_db_to_worker.
   Asserts that it runs on the slave SQL thread, hands the entry's
   temporary tables back to that thread's THD, and frees the entry's
   database name. */
519 static void free_entry(db_worker_hash_entry *entry)
521 THD *c_thd= current_thd;
523 DBUG_ENTER(
"free_entry");
525 DBUG_PRINT(
"info", (
"free_entry %s, %d", entry->db, (
int) strlen(entry->db)));
527 DBUG_ASSERT(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
537 mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
538 entry->temporary_tables= NULL;
540 my_free((
void *) entry->db);
/* Initializes the db->worker hash with free_entry() as element destructor
   and, on success, the associated condition variable.

   @return false on success, true on failure (the value returned is
           !inited_hash_workers, and the flag is true when my_hash_init()
           returned 0). */
546 bool init_hash_workers(ulong slave_parallel_workers)
548 DBUG_ENTER(
"init_hash_workers");
551 (my_hash_init(&mapping_db_to_worker, &my_charset_bin,
553 (my_hash_free_key) free_entry, 0) == 0);
554 if (inited_hash_workers)
556 #ifdef HAVE_PSI_INTERFACE
559 mysql_cond_init(key_cond_slave_worker_hash, &slave_worker_hash_cond, NULL);
567 DBUG_RETURN (!inited_hash_workers);
/* destroy_hash_workers body: frees the db->worker hash if it was
   initialized and clears the inited_hash_workers flag. */
572 DBUG_ENTER(
"destroy_hash_workers");
573 if (inited_hash_workers)
575 my_hash_free(&mapping_db_to_worker);
578 inited_hash_workers=
false;
/* Tail of mts_move_temp_table_to_entry (called from
   move_temp_tables_to_entry()): unlinks `table` from the THD's
   doubly-linked temporary-table list and pushes it onto the head of
   entry->temporary_tables. */
597 db_worker_hash_entry *entry)
/* Unlink: `table` has a predecessor. */
603 table->prev->next= table->next;
604 if (table->prev->next)
605 table->next->prev= table->prev;
/* Unlink: `table` is the list head. */
610 DBUG_ASSERT(table == thd->temporary_tables);
612 thd->temporary_tables= table->next;
613 if (thd->temporary_tables)
614 table->next->prev= 0;
/* Push onto the front of the entry's list. */
616 table->next= entry->temporary_tables;
619 table->next->prev=
table;
620 entry->temporary_tables=
table;
/* Prepends a whole list of temporary tables to thd->temporary_tables.

   @param thd               thread that receives the tables.
   @param temporary_tables  head of the list to move; asserted to have no
                            predecessor.
   @return                  the new head of thd->temporary_tables.

   The loop walks `table` to the last element of the incoming list so that
   the THD's existing list can be linked after it. */
638 TABLE* mts_move_temp_tables_to_thd(THD *thd,
TABLE *temporary_tables)
640 TABLE *table= temporary_tables;
645 DBUG_ASSERT(!temporary_tables->prev);
651 }
while(table->next && (table= table->next));
654 if (thd->temporary_tables)
655 thd->temporary_tables->prev=
table;
656 table->next= thd->temporary_tables;
657 thd->temporary_tables= temporary_tables;
659 return thd->temporary_tables;
/* Moves every temporary table of `thd` whose database name equals
   entry->db over to the hash entry.  The loop has no increment clause:
   `table` is advanced with the value returned by
   mts_move_temp_table_to_entry() for a match. */
671 static void move_temp_tables_to_entry(THD* thd, db_worker_hash_entry* entry)
673 for (
TABLE *table= thd->temporary_tables; table;)
675 if (strcmp(table->s->db.str, entry->db) == 0)
678 table= mts_move_temp_table_to_entry(table, thd, entry);
/* Finds, or creates, the hash entry for `dbname` and returns the worker
   that owns that partition.

   @param dbname            database (partition) name.
   @param rli               coordinator's relay log info.
   @param ptr_entry         out parameter for the entry (its assignment is
                            not visible in this listing -- confirm).
   @param need_temp_tables  when set, temporary tables of the database are
                            handed over to the entry on first use.
   @param last_worker       worker already assigned to the current group,
                            or NULL.
   @return                  entry->worker, or NULL when there is no entry.

   NOTE(review): the DBUG trace tag says "get_slave_worker" although the
   function is map_db_to_worker. */
743 Slave_worker *map_db_to_worker(
const char *dbname,
Relay_log_info *rli,
744 db_worker_hash_entry **ptr_entry,
745 bool need_temp_tables, Slave_worker *last_worker)
756 THD *thd= rli->info_thd;
758 DBUG_ENTER(
"get_slave_worker");
760 DBUG_ASSERT(!rli->last_assigned_worker ||
761 rli->last_assigned_worker == last_worker);
763 if (!inited_hash_workers)
766 db_worker_hash_entry *entry= NULL;
767 my_hash_value_type hash_value;
/* NOTE(review): the length is stored in a uchar, so a name of 256 bytes or
   more would wrap; assumes database names are always shorter -- confirm. */
768 uchar dblength= (uint) strlen(dbname);
/* Fast path: the partition was already assigned earlier in this group, so
   last_worker is returned without touching the hash. */
772 for (i= 0; i < rli->curr_group_assigned_parts.elements; i++)
774 entry= * (db_worker_hash_entry **)
775 dynamic_array_ptr(&rli->curr_group_assigned_parts, i);
776 if ((uchar) entry->db_len != dblength)
779 if (strncmp(entry->db, const_cast<char*>(dbname), dblength) == 0)
782 DBUG_RETURN(last_worker);
786 DBUG_PRINT(
"info", (
"Searching for %s, %d", dbname, dblength));
788 hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
793 entry= (db_worker_hash_entry *)
794 my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
795 (uchar*) dbname, dblength);
/* Not in the hash: allocate a new entry and assign it a worker. */
809 DBUG_PRINT(
"info", (
"Inserting %s, %d", dbname, dblength));
814 if (!(db= (
char *) my_malloc((
size_t) dblength + 1, MYF(0))))
816 if (!(entry= (db_worker_hash_entry *)
817 my_malloc(
sizeof(db_worker_hash_entry), MYF(0))))
824 entry->db_len= strlen(db);
826 entry->temporary_tables= NULL;
833 entry->worker= (!last_worker) ?
834 get_least_occupied_worker(workers) : last_worker;
835 entry->worker->usage_partition++;
/* Above the soft limit: collect the entries with usage == 0, return their
   temporary tables to the THD, then delete them from the hash. */
836 if (mapping_db_to_worker.records > mts_partition_hash_soft_max)
843 my_init_dynamic_array(&hash_element,
sizeof(db_worker_hash_entry *),
844 HASH_DYNAMIC_INIT, HASH_DYNAMIC_INCR);
845 for (uint i= 0; i < mapping_db_to_worker.records; i++)
/* NOTE(review): this assertion reads `entry` before the loop-local `entry`
   is declared below, so it checks the outer (new) entry, not the hash
   element being visited -- confirm that is intended. */
847 DBUG_ASSERT(!entry->temporary_tables || !entry->temporary_tables->prev);
848 DBUG_ASSERT(!thd->temporary_tables || !thd->temporary_tables->prev);
850 db_worker_hash_entry *entry=
851 (db_worker_hash_entry*) my_hash_element(&mapping_db_to_worker, i);
853 if (entry->usage == 0)
855 mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
856 entry->temporary_tables= NULL;
859 push_dynamic(&hash_element, (uchar*) &entry);
864 for (uint i=0; i < hash_element.elements; i++)
866 db_worker_hash_entry *temp_entry= *(db_worker_hash_entry **) dynamic_array_ptr(&hash_element, i);
867 my_hash_delete(&mapping_db_to_worker, (uchar*) temp_entry);
870 delete_dynamic(&hash_element);
873 ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
882 DBUG_PRINT(
"info", (
"Inserted %s, %d", entry->db, (
int) strlen(entry->db)));
/* Entry exists but nobody uses it: it can be (re)assigned freely. */
887 if (entry->usage == 0)
889 entry->worker= (!last_worker) ?
890 get_least_occupied_worker(workers) : last_worker;
891 entry->worker->usage_partition++;
894 else if (entry->worker == last_worker || !last_worker)
897 DBUG_ASSERT(entry->worker);
/* Entry is in use by a different worker: take it over for last_worker and
   wait on slave_worker_hash_cond until its usage drops to 0 or the thread
   is killed. */
908 DBUG_ASSERT(last_worker != NULL &&
909 rli->curr_group_assigned_parts.elements > 0);
912 entry->worker= last_worker;
916 thd->ENTER_COND(&slave_worker_hash_cond,
917 &slave_worker_hash_lock,
918 &stage_slave_waiting_worker_to_release_partition,
921 }
while (entry->usage != 0 && !thd->killed);
923 thd->EXIT_COND(&old_stage);
931 entry->worker->usage_partition++;
/* First user of the entry: hand over the matching temporary tables.  For an
   empty database name (db_len == 0) the THD's whole list is transferred. */
938 if (entry->usage == 1 && need_temp_tables)
940 if (!entry->temporary_tables)
942 if (entry->db_len != 0)
944 move_temp_tables_to_entry(thd, entry);
948 entry->temporary_tables= thd->temporary_tables;
949 thd->temporary_tables= NULL;
957 for (
TABLE *table= thd->temporary_tables; table; table= table->next)
959 DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
972 (
"Updating %s with worker %lu", entry->db, entry->worker->id));
/* Remember the partition as assigned to the current group. */
973 insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) &entry);
976 DBUG_RETURN(entry ? entry->worker : NULL);
/* get_least_occupied_worker body: returns the worker in `ws` with the
   smallest usage_partition.  Because the comparison is `<=`, ties go to
   the worker that appears last in the array.  Under the
   "mts_distribute_round_robin" DBUG keyword a worker is picked by index
   w_rr modulo the array size instead. */
994 long usage= LONG_MAX;
995 Slave_worker **ptr_current_worker= NULL, *worker= NULL;
998 DBUG_ENTER(
"get_least_occupied_worker");
1000 DBUG_EXECUTE_IF(
"mts_distribute_round_robin",
1004 dynamic_array_ptr(ws, w_rr % ws->elements));
1005 sql_print_information(
"Chosing worker id %lu, the following "
1006 "is going to be %lu", worker->id,
1007 w_rr % ws->elements);
1008 DBUG_RETURN(worker);
1011 for (i= 0; i< ws->elements; i++)
1013 ptr_current_worker= (Slave_worker **) dynamic_array_ptr(ws, i);
1014 if ((*ptr_current_worker)->usage_partition <= usage)
1016 worker= *ptr_current_worker;
1017 usage= (*ptr_current_worker)->usage_partition;
1021 DBUG_ASSERT(worker != NULL);
1023 DBUG_RETURN(worker);
/* Finishes the current group on this worker.

   @param ev     the event that ended the group.
   @param error  result of applying the group.

   On the path visible here the positions are committed for non-XID events
   and copied into the group descriptor.  The other visible path sets
   running_status to ERROR_LEAVING and wakes the coordinator thread with
   KILL_QUERY.  Afterwards the partitions used by the group
   (curr_group_exec_parts) are walked to release them, the array is trimmed
   back to max_element if it grew, and the per-group flags are reset. */
1037 void Slave_worker::slave_worker_ends_group(
Log_event* ev,
int error)
1039 DBUG_ENTER(
"Slave_worker::slave_worker_ends_group");
1043 Slave_committed_queue *gaq= c_rli->gaq;
1044 Slave_job_group *ptr_g= gaq->get_job_group(gaq_index);
1049 DBUG_ASSERT(last_group_done_index != c_rli->gaq->size ||
1050 ptr_g->group_relay_log_name != NULL);
1051 DBUG_ASSERT(ptr_g->worker_id ==
id);
1053 if (ev->get_type_code() != XID_EVENT)
1055 commit_positions(ev, ptr_g,
false);
1056 DBUG_EXECUTE_IF(
"crash_after_commit_and_update_pos",
1057 sql_print_information(
"Crashing crash_after_commit_and_update_pos.");
1063 ptr_g->group_master_log_pos= group_master_log_pos;
1064 ptr_g->group_relay_log_pos= group_relay_log_pos;
1068 last_group_done_index= gaq_index;
1076 running_status= ERROR_LEAVING;
1081 c_rli->info_thd->awake(THD::KILL_QUERY);
/* Release every partition this group touched. */
1090 for (uint i= 0; i < ep->elements; i++)
1092 db_worker_hash_entry *entry=
1093 *((db_worker_hash_entry **) dynamic_array_ptr(ep, i));
1101 DBUG_ASSERT(entry->usage >= 0);
1103 if (entry->usage == 0)
1111 DBUG_ASSERT(usage_partition >= 0);
1112 DBUG_ASSERT(this->info_thd->temporary_tables == 0);
1113 DBUG_ASSERT(!entry->temporary_tables ||
1114 !entry->temporary_tables->prev);
1116 if (entry->worker !=
this)
1122 (
"Notifying entry %p release by worker %lu", entry, this->
id));
1128 DBUG_ASSERT(usage_partition != 0);
1133 if (ep->elements > ep->max_element)
1136 ep->elements= ep->max_element;
1142 curr_group_seen_gtid= curr_group_seen_begin=
false;
/* Removes the element at the head (`entry`) of the circular queue.

   @param val  out: receives a copy of the head element.

   The head index advances modulo `size`.
   NOTE(review): in the consistency assertion `==` binds tighter than `?:`,
   so it is evaluated as `(len == (avail >= entry)) ? ... : ...` rather
   than comparing `len` with the selected difference -- confirm intent
   before relying on it. */
1167 ulong circular_buffer_queue::de_queue(uchar *val)
1172 DBUG_ASSERT(len == 0);
1177 get_dynamic(&Q, val, entry);
1183 entry= (entry + 1) %
size;
1189 DBUG_ASSERT(entry ==
size ||
1190 (len == (avail >= entry)? (avail - entry) :
1191 (
size + avail - entry)));
1192 DBUG_ASSERT(avail != entry);
/* Removes the element at the tail of the circular queue.

   @param val  out: receives a copy of the last element.

   `avail` is moved back to the last occupied slot,
   (entry + len - 1) modulo size, and that slot is copied out.
   NOTE(review): the consistency assertion has the same `==` versus `?:`
   precedence pattern as de_queue(). */
1203 ulong circular_buffer_queue::de_tail(uchar *val)
1207 DBUG_ASSERT(len == 0);
1211 avail= (entry + len - 1) %
size;
1212 get_dynamic(&Q, val, avail);
1219 DBUG_ASSERT(entry ==
size ||
1220 (len == (avail >= entry)? (avail - entry) :
1221 (
size + avail - entry)));
1222 DBUG_ASSERT(avail != entry);
/* Appends an item at the first free slot (`avail`) of the circular queue.

   @param item  element to copy into the queue.

   The free index advances modulo `size`.
   NOTE(review): in the consistency assertion `?:` has the lowest
   precedence, so the whole `avail == entry || len == (avail >= entry)`
   expression is the condition of the conditional -- confirm intent. */
1231 ulong circular_buffer_queue::en_queue(
void *item)
1236 DBUG_ASSERT(avail == Q.elements);
1243 set_dynamic(&Q, (uchar*) item, avail);
1250 avail= (avail + 1) %
size;
1257 DBUG_ASSERT(avail == entry ||
1258 len == (avail >= entry) ?
1259 (avail - entry) : (
size + avail - entry));
1260 DBUG_ASSERT(avail != entry);
/* Copies the head element (index `entry`) into `ret` without removing it
   from the queue. */
1265 void* circular_buffer_queue::head_queue()
1270 DBUG_ASSERT(len == 0);
1274 get_dynamic(&Q, (uchar*) ret, entry);
/* Compares two queue indexes i and k; asserts the queue is in a state
   where `avail` differs from `entry`.
   NOTE(review): presumably "greater than" in circular order -- the
   comparison itself is not visible here, confirm. */
1290 bool circular_buffer_queue::gt(ulong i, ulong k)
1293 DBUG_ASSERT(avail != entry);
/* Debug helper body: walks the `len` occupied slots starting at `entry`,
   looking at groups that have a worker assigned (worker_id != -1) and are
   marked done, and returns whether the resulting count `cnt` equals
   slave_parallel_workers * opt_mts_checkpoint_group. */
1312 for (i= entry, k= 0; k < len; i= (i + 1) %
size, k++)
1314 Slave_job_group *ptr_g;
1316 ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
1318 if (ptr_g->worker_id != (ulong) -1 && ptr_g->done)
1322 DBUG_ASSERT(cnt <=
size);
1324 DBUG_PRINT(
"mts", (
"Checking if it can simulate a crash:"
1325 " mts_checkpoint_group %u counter %lu parallel slaves %lu\n",
1326 opt_mts_checkpoint_group, cnt, rli->slave_parallel_workers));
1328 return (cnt == (rli->slave_parallel_workers * opt_mts_checkpoint_group));
/* Advances the queue head over groups that are already done.

   @param ws  dynamic array of workers, indexed by ptr_g->worker_id.

   Iteration starts at `entry` and stops at the first group that has no
   worker assigned (MTS_WORKER_UNDEF) or is not done.  For each finished
   group its relay log name is saved, the group is dequeued, and the
   per-worker last_done sequence number is updated. */
1354 ulong Slave_committed_queue::move_queue_head(
DYNAMIC_ARRAY *ws)
1358 for (i= entry; i != avail && !empty(); cnt++, i= (i + 1) %
size)
1361 Slave_job_group *ptr_g, g;
1362 char grl_name[FN_REFLEN];
/* Test hook: under "check_slave_debug_group" the walk is cut short after
   opt_mts_checkpoint_period groups. */
1366 if (DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0) &&
1367 cnt == opt_mts_checkpoint_period)
1372 ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
1378 if (ptr_g->worker_id == MTS_WORKER_UNDEF || !ptr_g->done)
1382 compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS);
1384 get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
/* NOTE(review): strcpy() has no length bound; this assumes the stored name
   always fits in FN_REFLEN bytes -- confirm at the allocation site. */
1389 if (ptr_g->group_relay_log_name)
1391 strcpy(grl_name, ptr_g->group_relay_log_name);
1392 my_free(ptr_g->group_relay_log_name);
1396 ptr_g->group_relay_log_name= NULL;
1402 ind= de_queue((uchar*) &g);
/* The low-water-mark keeps its own copy of the name; the dequeued copy `g`
   points at that buffer instead of the freed heap string. */
1409 if (grl_name[0] != 0)
1411 strcpy(lwm.group_relay_log_name, grl_name);
1413 g.group_relay_log_name= lwm.group_relay_log_name;
1416 DBUG_ASSERT(ind == i);
1417 DBUG_ASSERT(!ptr_g->group_relay_log_name);
1418 DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
1422 get_dynamic(&last_done, (uchar *) &l, w_i->id);
1427 DBUG_ASSERT(l < ptr_g->total_seqno);
1434 set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
1437 DBUG_ASSERT(cnt <=
size);
/* Frees the heap-allocated log names still attached to the `len` groups
   remaining in the queue: group_relay_log_name, checkpoint_log_name,
   checkpoint_relay_log_name and group_master_log_name. */
1447 void Slave_committed_queue::free_dynamic_items()
1450 for (i= entry, k= 0; k < len; i= (i + 1) %
size, k++)
1452 Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
1453 if (ptr_g->group_relay_log_name)
1455 my_free(ptr_g->group_relay_log_name);
1457 if (ptr_g->checkpoint_log_name)
1459 my_free(ptr_g->checkpoint_log_name);
1461 if (ptr_g->checkpoint_relay_log_name)
1463 my_free(ptr_g->checkpoint_relay_log_name);
1465 if (ptr_g->group_master_log_name)
1467 my_free(ptr_g->group_master_log_name);
1470 DBUG_ASSERT((avail ==
size || entry ==
size ) ||
/* Reports a worker error through the coordinator.

   @param level     severity.
   @param err_code  error code.
   @param msg       printf-style message.

   A coordinates prefix naming the worker id, the transaction's GTID text,
   the master log name and end_log_pos is built in buff_coord and passed,
   with the caller's message and arguments, to c_rli->va_report().
   The const_casts exist only to call non-const getters on `this`. */
1475 void Slave_worker::do_report(loglevel
level,
int err_code,
const char *
msg,
1478 char buff_coord[MAX_SLAVE_ERRMSG];
1480 const char* log_name=
const_cast<Slave_worker*
>(
this)->get_master_log_name();
1481 ulonglong log_pos=
const_cast<Slave_worker*
>(
this)->get_master_log_pos();
/* The GTID is rendered under a read lock on global_sid_lock. */
1484 if (gtid_next->
type == GTID_GROUP)
1486 global_sid_lock->
rdlock();
1487 gtid_next->
to_string(global_sid_map, buff_gtid);
1488 global_sid_lock->
unlock();
1496 "Worker %lu failed executing transaction '%s' at "
1497 "master log %s, end_log_pos %llu",
1498 id, buff_gtid, log_name, log_pos);
1499 c_rli->va_report(level, err_code, buff_coord, msg, args);
/* Makes the coordinator wait until the workers have released every
   partition in the db->worker hash.

   @param rli     coordinator's relay log info.
   @param ignore  optional worker whose in-use entries are special-cased
                  (the branch body is not visible here).
   @return        `ret` when synchronization succeeded, -1 when cant_sync
                  was set.

   For every entry still in use, the coordinator waits on
   slave_worker_hash_cond until its usage reaches 0 or the thread is
   killed; entry->worker is set to NULL for the duration of the wait and
   restored afterwards.  The entry's temporary tables are then moved back
   to the coordinator THD.  Finally mts_group_status is reset to
   MTS_NOT_IN_GROUP. */
1527 int wait_for_workers_to_finish(
Relay_log_info const *rli, Slave_worker *ignore)
1530 HASH *hash= &mapping_db_to_worker;
1531 THD *thd= rli->info_thd;
1532 bool cant_sync= FALSE;
1535 DBUG_ENTER(
"wait_for_workers_to_finish");
1537 llstr(const_cast<Relay_log_info*>(rli)->get_event_relay_log_pos(), llbuf);
1538 if (log_warnings > 1)
1539 sql_print_information(
"Coordinator and workers enter synchronization procedure "
1540 "when scheduling event relay-log: %s pos: %s",
1541 const_cast<Relay_log_info*>(rli)->get_event_relay_log_name(),
/* NOTE(review): `ret` in this for-init is a new loop-scoped uint.  The `ret`
   returned after the loop must therefore be a separate outer variable;
   confirm that a count accumulated inside the loop is not lost to
   shadowing. */
1544 for (uint i= 0, ret= 0; i < hash->records; i++)
1546 db_worker_hash_entry *
entry;
1550 entry= (db_worker_hash_entry*) my_hash_element(hash, i);
1555 if (ignore && entry->worker == ignore && entry->usage > 0)
1561 if (entry->usage > 0 && !thd->killed)
1564 Slave_worker *w_entry= entry->worker;
1566 entry->worker= NULL;
1567 thd->ENTER_COND(&slave_worker_hash_cond,
1568 &slave_worker_hash_lock,
1569 &stage_slave_waiting_worker_to_release_partition,
1575 (
"Either got awakened of notified: "
1576 "entry %p, usage %lu, worker %lu",
1577 entry, entry->usage, w_entry->id));
1578 }
while (entry->usage != 0 && !thd->killed);
1579 entry->worker= w_entry;
1580 thd->EXIT_COND(&old_stage);
1588 mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
1589 entry->temporary_tables= NULL;
1590 if (entry->worker->running_status != Slave_worker::RUNNING)
1596 if (log_warnings > 1)
1597 sql_print_information(
"Coordinator synchronized with Workers, "
1598 "waited entries: %d, cant_sync: %d",
1601 const_cast<Relay_log_info*
>(rli)->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
1604 DBUG_RETURN(!cant_sync ? ret : -1);
/* Appends `item` to a worker's job queue.

   @param jobs  the queue.
   @param item  element to copy in.

   Sentinels: avail == size means "queue full" (tested first), and
   entry == size means "queue empty".  When the queue was empty, `entry` is
   pointed at the slot being filled; when advancing `avail` makes it meet
   `entry`, `avail` is set to `size` to mark the queue full.
   append_item_to_jobs() treats a return value of -1 as "queue full".
   NOTE(review): the trailing assertion is subject to `?:` having lower
   precedence than `||` and `==`. */
1609 static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
1611 if (jobs->avail == jobs->size)
1613 DBUG_ASSERT(jobs->avail == jobs->Q.elements);
1619 set_dynamic(&jobs->Q, (uchar*) item, jobs->avail);
1622 if (jobs->entry == jobs->size)
1623 jobs->entry= jobs->avail;
1625 jobs->avail= (jobs->avail + 1) % jobs->size;
1629 if (jobs->avail == jobs->entry)
1630 jobs->avail= jobs->size;
1631 DBUG_ASSERT(jobs->avail == jobs->entry ||
1632 jobs->len == (jobs->avail >= jobs->entry) ?
1633 (jobs->avail - jobs->entry) : (jobs->size + jobs->avail - jobs->entry));
/* Copies the head of a worker's job queue into `ret` without removing it.
   entry == size marks an empty queue (len is then asserted to be 0);
   otherwise the copied item is asserted to carry data. */
1640 static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
1642 if (jobs->entry == jobs->size)
1644 DBUG_ASSERT(jobs->len == 0);
1648 get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry);
1650 DBUG_ASSERT(ret->data);
/* Removes the head of a worker's job queue, copying it into `ret`.

   If the queue was full (avail == size), the freed slot becomes the next
   free position.  `entry` advances modulo size; when it meets `avail` the
   queue has become empty and `entry` is set to the `size` sentinel.
   NOTE(review): the trailing assertion has the `==` versus `?:` precedence
   pattern seen in the other queue helpers. */
1659 Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
1661 if (jobs->entry == jobs->size)
1663 DBUG_ASSERT(jobs->len == 0);
1666 get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry);
1670 if (jobs->avail == jobs->size)
1671 jobs->avail= jobs->entry;
1672 jobs->entry= (jobs->entry + 1) % jobs->size;
1675 if (jobs->avail == jobs->entry)
1676 jobs->entry= jobs->size;
1678 DBUG_ASSERT(jobs->entry == jobs->size ||
1679 (jobs->len == (jobs->avail >= jobs->entry) ?
1680 (jobs->avail - jobs->entry) :
1681 (jobs->size + jobs->avail - jobs->entry)));
/* Coordinator side: hands one job item (an event) to a worker's queue.

   @param job_item  item whose data points at a Log_event.
   @return          false when the item was enqueued, true when en_queue()
                    last returned -1.

   Steps visible here:
   1. reject an event larger than mts_pending_jobs_size_max with
      ER_MTS_EVENT_BIGGER_PENDING_JOBS_SIZE_MAX and mark the group killed;
   2. wait on pending_jobs_cond while adding the event would exceed the
      pending-size limit;
   3. account for the event in the coordinator's counters;
   4. optionally nap when no worker is under-running and this worker's
      queue is above its underrun level;
   5. retry en_queue() while the worker runs and the queue is full,
      waiting on the worker's jobs_cond between attempts;
   6. on the path shown at the end, undo the pending accounting. */
1697 bool append_item_to_jobs(slave_job_item *job_item,
1700 THD *thd= rli->info_thd;
1702 ulong ev_size= ((
Log_event*) (job_item->data))->data_written;
1703 ulonglong new_pend_size;
1707 DBUG_ASSERT(thd == current_thd);
1709 if (ev_size > rli->mts_pending_jobs_size_max)
1712 llstr(rli->get_event_relay_log_pos(), llbuff);
1713 my_error(ER_MTS_EVENT_BIGGER_PENDING_JOBS_SIZE_MAX, MYF(0),
1714 ((
Log_event*) (job_item->data))->get_type_str(),
1715 rli->get_event_relay_log_name(), llbuff, ev_size,
1716 rli->mts_pending_jobs_size_max);
1718 rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
1723 new_pend_size= rli->mts_pending_jobs_size + ev_size;
1725 while (new_pend_size > rli->mts_pending_jobs_size_max)
1727 rli->mts_wq_oversize= TRUE;
1728 rli->wq_size_waits_cnt++;
1729 thd->ENTER_COND(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
1730 &stage_slave_waiting_worker_to_free_events, &old_stage);
1732 thd->EXIT_COND(&old_stage);
/* Log the first wait and every tenth one after it. */
1735 if (log_warnings > 1 && (rli->wq_size_waits_cnt % 10 == 1))
1736 sql_print_information(
"Multi-threaded slave: Coordinator has waited "
1737 "%lu times hitting slave_pending_jobs_size_max; "
1738 "current event size = %lu.",
1739 rli->wq_size_waits_cnt, ev_size);
/* Recompute: the pending size may have changed while waiting. */
1742 new_pend_size= rli->mts_pending_jobs_size + ev_size;
1744 rli->pending_jobs++;
1745 rli->mts_pending_jobs_size= new_pend_size;
1746 rli->mts_events_assigned++;
1754 if (rli->mts_wq_underrun_w_id == MTS_WORKER_UNDEF &&
1755 worker->jobs.len > worker->underrun_level)
1762 ulong nap_weight= rli->mts_wq_excess_cnt + 1;
/* Nap length scales with the excess count and is capped at 1000. */
1776 my_sleep(min<ulong>(1000, nap_weight * rli->mts_coordinator_basic_nap));
1777 rli->mts_wq_no_underrun_cnt++;
1783 while (worker->running_status == Slave_worker::RUNNING && !thd->killed &&
1784 (ret= en_queue(&worker->jobs, job_item)) == -1)
1786 thd->ENTER_COND(&worker->jobs_cond, &worker->jobs_lock,
1787 &stage_slave_waiting_worker_queue, &old_stage);
1788 worker->jobs.overfill= TRUE;
1789 worker->jobs.waited_overfill++;
1790 rli->mts_wq_overfill_cnt++;
1792 thd->EXIT_COND(&old_stage);
1798 worker->curr_jobs++;
1799 if (worker->jobs.len == 1)
1809 rli->pending_jobs--;
1810 rli->mts_pending_jobs_size -= ev_size;
1814 return (-1 != ret ?
false :
true);
/* Worker side: obtains the next job item, waiting for the coordinator when
   the queue is empty.

   @param worker    the worker whose queue is read.
   @param job_item  in/out: filled from the queue head by head_queue().

   The loop runs until an item with data is seen, the thread is killed, or
   the worker leaves the RUNNING state.  An empty queue is counted in
   wq_empty_waits before waiting on jobs_cond.  The item is only peeked
   here; slave_worker_exec_job() calls de_queue() later. */
1828 struct slave_job_item* pop_jobs_item(Slave_worker *worker, Slave_job_item *job_item)
1830 THD *thd= worker->info_thd;
1834 while (!job_item->data && !thd->killed &&
1835 worker->running_status == Slave_worker::RUNNING)
1839 head_queue(&worker->jobs, job_item);
1840 if (job_item->data == NULL)
1842 worker->wq_empty_waits++;
1843 thd->ENTER_COND(&worker->jobs_cond, &worker->jobs_lock,
1844 &stage_slave_waiting_event_from_coordinator,
1847 thd->EXIT_COND(&old_stage);
1852 worker->curr_jobs--;
1856 thd_proc_info(worker->info_thd,
"Executing event");
/* Worker main step: takes one event from the worker's queue, applies it,
   and updates queue and coordinator bookkeeping.

   @param worker  the executing worker.
   @param rli     coordinator's relay log info.

   Outline of the visible code:
   - fetch the next item with pop_jobs_item(); test for a killed thread or
     a worker that is no longer RUNNING;
   - track group boundaries and record the partitions the event touches in
     the worker's list (skipping ones already recorded);
   - apply the event with do_apply_event_worker();
   - at the end of a group call slave_worker_ends_group();
   - dequeue the item, clear the overfill flag, and adjust the pending,
     underrun and overrun statistics;
   - the final section logs the exit reason and ends the group with the
     error status. */
1876 int slave_worker_exec_job(Slave_worker *worker,
Relay_log_info *rli)
1879 struct slave_job_item item= {NULL}, *job_item= &item;
1880 THD *thd= worker->info_thd;
1882 bool part_event= FALSE;
1884 DBUG_ENTER(
"slave_worker_exec_job");
1886 job_item= pop_jobs_item(worker, job_item);
1887 if (thd->killed || worker->running_status != Slave_worker::RUNNING)
1893 ev=
static_cast<Log_event*
>(job_item->data);
1894 thd->server_id = ev->server_id;
1896 thd->lex->current_select= 0;
/* Events without a timestamp get the current time. */
1897 if (!ev->when.tv_sec)
1898 ev->when.tv_sec= my_time(0);
1902 DBUG_PRINT(
"slave_worker_exec_job:", (
"W_%lu <- job item: %p data: %p thd: %p", worker->id, job_item, ev, thd));
1904 if (ev->starts_group())
1906 worker->curr_group_seen_begin=
true;
1907 worker->end_group_sets_max_dbs=
true;
1909 else if (!is_gtid_event(ev))
1912 ev->contains_partition_info(worker->end_group_sets_max_dbs)))
1914 uint num_dbs= ev->mts_number_dbs();
1917 if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
1920 DBUG_ASSERT(num_dbs > 0);
/* Add each partition assigned to the event to the worker's list unless it
   is already present (linear search). */
1922 for (uint k= 0; k < num_dbs; k++)
1926 for (uint i= 0; i < ep->elements && !found; i++)
1929 *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
1930 ev->mts_assigned_partitions[k];
1939 insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
1942 worker->end_group_sets_max_dbs=
false;
1947 worker->set_master_log_pos(ev->log_pos);
1949 error= ev->do_apply_event_worker(worker);
1950 if (ev->ends_group() || (!worker->curr_group_seen_begin &&
1956 part_event && !is_gtid_event(ev)))
1958 DBUG_PRINT(
"slave_worker_exec_job:",
1959 (
" commits GAQ index %lu, last committed %lu",
1961 worker->slave_worker_ends_group(ev, error);
1964 DBUG_PRINT(
"mts", (
"Check_slave_debug_group worker %lu mts_checkpoint_group"
1965 " %u processed %lu debug %d\n", worker->id, opt_mts_checkpoint_group,
1966 worker->groups_done,
1967 DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0)));
/* Test hook: under "check_slave_debug_group" the worker parks itself in an
   endless sleep loop once it has processed opt_mts_checkpoint_group
   groups. */
1968 if (DBUG_EVALUATE_IF(
"check_slave_debug_group", 1, 0) &&
1969 opt_mts_checkpoint_group == worker->groups_done)
1971 DBUG_PRINT(
"mts", (
"Putting worker %lu in busy wait.", worker->id));
1972 while (
true) my_sleep(6000000);
1978 de_queue(&worker->jobs, job_item);
/* The queue just dropped from full to one free slot: clear the overfill
   flag that append_item_to_jobs() set while waiting. */
1981 if (worker->jobs.len == worker->jobs.size - 1 && worker->jobs.overfill == TRUE)
1983 worker->jobs.overfill= FALSE;
1994 rli->pending_jobs--;
1995 rli->mts_pending_jobs_size -= ev->data_written;
1996 DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
/* Underrun tracking: publish this worker's id while its queue is non-empty
   but below the underrun level, and withdraw it otherwise. */
2004 if (worker->underrun_level > worker->jobs.len && worker->jobs.len != 0)
2006 rli->mts_wq_underrun_w_id= worker->id;
2007 }
else if (rli->mts_wq_underrun_w_id == worker->id)
2010 rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
/* Overrun tracking: the excess above overrun_level feeds
   rli->mts_wq_excess_cnt, which append_item_to_jobs() uses to weight the
   coordinator's nap. */
2022 if (worker->overrun_level < worker->jobs.len)
2024 ulong last_overrun= worker->wq_overrun_cnt;
2028 worker->wq_overrun_cnt= worker->jobs.len - worker->overrun_level;
2029 excess_delta= worker->wq_overrun_cnt - last_overrun;
2030 worker->excess_cnt+= excess_delta;
2031 rli->mts_wq_excess_cnt+= excess_delta;
2032 rli->mts_wq_overrun_cnt++;
2035 DBUG_ASSERT(rli->workers.elements != 1 ||
2036 rli->mts_wq_excess_cnt == worker->wq_overrun_cnt);
2038 else if (worker->excess_cnt > 0)
2042 rli->mts_wq_excess_cnt-= worker->excess_cnt;
2043 worker->excess_cnt= 0;
2044 worker->wq_overrun_cnt= 0;
2046 DBUG_ASSERT(rli->mts_wq_excess_cnt >= 0);
2047 DBUG_ASSERT(rli->mts_wq_excess_cnt == 0 || rli->workers.elements > 1);
2052 if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
2053 rli->mts_wq_oversize)
2055 rli->mts_wq_oversize= FALSE;
2061 worker->events_done++;
2066 if (log_warnings > 1)
2067 sql_print_information(
"Worker %lu is exiting: killed %i, error %i, "
2068 "running_status %d",
2069 worker->id, thd->killed, thd->is_error(),
2070 worker->running_status);
2071 worker->slave_worker_ends_group(ev, error);
2075 if (ev && ev->
worker && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)