16 #include "rpl_info_table.h"
17 #include "rpl_utility.h"
19 Rpl_info_table::Rpl_info_table(uint nparam,
20 const char* param_schema,
21 const char *param_table)
22 :Rpl_info_handler(nparam), is_transactional(FALSE)
24 str_schema.str= str_table.str= NULL;
25 str_schema.length= str_table.length= 0;
27 uint schema_length= strlen(param_schema);
28 if ((str_schema.str= (
char *) my_malloc(schema_length + 1, MYF(0))))
30 str_schema.length= schema_length;
31 strmake(str_schema.str, param_schema, schema_length);
34 uint table_length= strlen(param_table);
35 if ((str_table.str= (
char *) my_malloc(table_length + 1, MYF(0))))
37 str_table.length= table_length;
38 strmake(str_table.str, param_table, table_length);
41 if ((description= (
char *)
42 my_malloc(str_schema.length + str_table.length + 2, MYF(0))))
44 char *pos= strmov(description, param_schema);
45 pos= strmov(pos,
".");
46 pos= strmov(pos, param_table);
52 Rpl_info_table::~Rpl_info_table()
58 my_free(str_table.str);
60 my_free(str_schema.str);
63 int Rpl_info_table::do_init_info()
65 return do_init_info(FIND_KEY, 0);
68 int Rpl_info_table::do_init_info(uint instance)
70 return do_init_info(FIND_SCAN, instance);
73 int Rpl_info_table::do_init_info(enum_find_method method, uint instance)
76 enum enum_return_id res= FOUND_ID;
79 Open_tables_backup backup;
81 DBUG_ENTER(
"Rlp_info_table::do_init_info");
85 saved_mode= thd->variables.sql_mode;
86 tmp_disable_binlog(thd);
91 if (access->
open_table(thd, str_schema, str_table,
92 get_number_info(), TL_WRITE,
103 res= access->
find_info(field_values, table);
125 error= (res == ERROR_ID);
131 reenable_binlog(thd);
132 thd->variables.sql_mode= saved_mode;
137 int Rpl_info_table::do_flush_info(
const bool force)
140 enum enum_return_id res= FOUND_ID;
143 Open_tables_backup backup;
145 DBUG_ENTER(
"Rpl_info_table::do_flush_info");
147 if (!(force || (sync_period &&
148 ++(sync_counter) >= sync_period)))
154 saved_mode= thd->variables.sql_mode;
155 tmp_disable_binlog(thd);
160 if (access->
open_table(thd, str_schema, str_table,
161 get_number_info(), TL_WRITE,
169 if ((res= access->
find_info(field_values, table)) == NOT_FOUND_ID)
182 if ((error= table->file->ha_write_row(table->record[0])))
194 else if (res == FOUND_ID)
199 store_record(table,
record[1]);
207 if ((error= table->file->ha_update_row(table->record[1], table->record[0])) &&
208 error != HA_ERR_RECORD_IS_THE_SAME)
222 DBUG_EXECUTE_IF(
"mts_debug_concurrent_access",
224 while (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER &&
225 mts_debug_concurrent_access < 2 && mts_debug_concurrent_access > 0)
227 DBUG_PRINT(
"mts", (
"Waiting while locks are acquired to show "
228 "concurrency in mts: %u %lu\n", mts_debug_concurrent_access,
229 (ulong) thd->thread_id));
238 access->close_table(thd, table, &backup, error);
239 reenable_binlog(thd);
240 thd->variables.sql_mode= saved_mode;
241 access->drop_thd(thd);
245 int Rpl_info_table::do_remove_info()
247 return do_clean_info();
250 int Rpl_info_table::do_clean_info()
253 enum enum_return_id res= FOUND_ID;
256 Open_tables_backup backup;
258 DBUG_ENTER(
"Rpl_info_table::do_remove_info");
262 saved_mode= thd->variables.sql_mode;
263 tmp_disable_binlog(thd);
268 if (access->
open_table(thd, str_schema, str_table,
269 get_number_info(), TL_WRITE,
277 if ((res= access->
find_info(field_values, table)) == FOUND_ID)
282 if ((error= table->file->ha_delete_row(table->record[0])))
288 error= (res == ERROR_ID);
294 reenable_binlog(thd);
295 thd->variables.sql_mode= saved_mode;
300 int Rpl_info_table::do_reset_info(uint nparam,
301 const char* param_schema,
302 const char *param_table)
307 Open_tables_backup backup;
310 enum enum_return_id scan_retval= FOUND_ID;
312 DBUG_ENTER(
"Rpl_info_table::do_reset_info");
319 saved_mode= thd->variables.sql_mode;
320 tmp_disable_binlog(thd);
325 if (info->access->
open_table(thd, info->str_schema, info->str_table,
326 info->get_number_info(), TL_WRITE,
334 while ((scan_retval= info->access->
scan_info(table, 1)) == FOUND_ID)
336 if ((error= table->file->ha_delete_row(table->record[0])))
338 table->file->print_error(error, MYF(0));
342 error= (scan_retval == ERROR_ID);
348 info->access->
close_table(thd, table, &backup, error);
349 reenable_binlog(thd);
350 thd->variables.sql_mode= saved_mode;
356 enum_return_check Rpl_info_table::do_check_info()
360 Open_tables_backup backup;
361 enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
363 DBUG_ENTER(
"Rpl_info_table::do_check_info");
366 saved_mode= thd->variables.sql_mode;
371 if (access->
open_table(thd, str_schema, str_table,
372 get_number_info(), TL_READ,
375 sql_print_warning(
"Info table is not ready to be used. Table "
376 "'%s.%s' cannot be opened.", str_schema.str,
379 return_check= ERROR_CHECKING_REPOSITORY;
387 if (access->
find_info(field_values, table) != FOUND_ID)
394 return_check= REPOSITORY_DOES_NOT_EXIST;
397 return_check= REPOSITORY_EXISTS;
405 return_check == ERROR_CHECKING_REPOSITORY);
406 thd->variables.sql_mode= saved_mode;
408 DBUG_RETURN(return_check);
411 enum_return_check Rpl_info_table::do_check_info(uint instance)
415 Open_tables_backup backup;
416 enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
418 DBUG_ENTER(
"Rpl_info_table::do_check_info");
421 saved_mode= thd->variables.sql_mode;
426 if (access->
open_table(thd, str_schema, str_table,
427 get_number_info(), TL_READ,
430 sql_print_warning(
"Info table is not ready to be used. Table "
431 "'%s.%s' cannot be opened.", str_schema.str,
434 return_check= ERROR_CHECKING_REPOSITORY;
442 if (access->
scan_info(table, instance) != FOUND_ID)
449 return_check= REPOSITORY_DOES_NOT_EXIST;
452 return_check= REPOSITORY_EXISTS;
460 return_check == ERROR_CHECKING_REPOSITORY);
461 thd->variables.sql_mode= saved_mode;
463 DBUG_RETURN(return_check);
466 bool Rpl_info_table::do_count_info(uint nparam,
467 const char* param_schema,
468 const char *param_table,
474 Open_tables_backup backup;
478 DBUG_ENTER(
"Rpl_info_table::do_count_info");
480 if (!(info=
new Rpl_info_table(nparam, param_schema, param_table)))
484 saved_mode= thd->variables.sql_mode;
489 if (info->access->
open_table(thd, info->str_schema, info->str_table,
490 info->get_number_info(), TL_READ,
506 sql_print_warning(
"Info table is not ready to be used. Table "
507 "'%s.%s' cannot be scanned.", info->str_schema.str,
508 info->str_table.str);
517 info->access->
close_table(thd, table, &backup, error);
518 thd->variables.sql_mode= saved_mode;
524 void Rpl_info_table::do_end_info()
528 int Rpl_info_table::do_prepare_info_for_read()
539 int Rpl_info_table::do_prepare_info_for_write()
541 return(do_prepare_info_for_read());
544 uint Rpl_info_table::do_get_rpl_info_type()
546 return INFO_REPOSITORY_TABLE;
549 bool Rpl_info_table::do_set_info(
const int pos,
const char *value)
551 return (field_values->value[pos].copy(value, strlen(value),
555 bool Rpl_info_table::do_set_info(
const int pos,
const uchar *value,
558 return (field_values->value[pos].copy((
char *) value, size,
562 bool Rpl_info_table::do_set_info(
const int pos,
const ulong value)
564 return (field_values->value[pos].set_int(value, TRUE,
568 bool Rpl_info_table::do_set_info(
const int pos,
const int value)
570 return (field_values->value[pos].set_int(value, FALSE,
574 bool Rpl_info_table::do_set_info(
const int pos,
const float value)
576 return (field_values->value[pos].set_real(value, NOT_FIXED_DEC,
580 bool Rpl_info_table::do_set_info(
const int pos,
const Dynamic_ids *value)
582 if (const_cast<Dynamic_ids *>(value)->pack_dynamic_ids(&field_values->value[pos]))
588 bool Rpl_info_table::do_get_info(
const int pos,
char *value,
const size_t size,
589 const char *default_value)
591 if (field_values->value[pos].length())
592 strmake(value, field_values->value[pos].c_ptr_safe(),
593 field_values->value[pos].length());
594 else if (default_value)
595 strmake(value, default_value, strlen(default_value));
602 bool Rpl_info_table::do_get_info(
const int pos, uchar *value,
const size_t size,
603 const uchar *default_value __attribute__((unused)))
605 if (field_values->value[pos].length() ==
size)
606 return (!memcpy((
char *) value, (
char *)
607 field_values->value[pos].c_ptr_safe(),
size));
611 bool Rpl_info_table::do_get_info(
const int pos, ulong *value,
612 const ulong default_value)
614 if (field_values->value[pos].length())
616 *value= strtoul(field_values->value[pos].c_ptr_safe(), 0, 10);
619 else if (default_value)
621 *value= default_value;
628 bool Rpl_info_table::do_get_info(
const int pos,
int *value,
629 const int default_value)
631 if (field_values->value[pos].length())
633 *value= atoi(field_values->value[pos].c_ptr_safe());
636 else if (default_value)
638 *value= default_value;
645 bool Rpl_info_table::do_get_info(
const int pos,
float *value,
646 const float default_value)
648 if (field_values->value[pos].length())
650 if (sscanf(field_values->value[pos].c_ptr_safe(),
"%f", value) != 1)
654 else if (default_value != 0.0)
656 *value= default_value;
663 bool Rpl_info_table::do_get_info(
const int pos,
Dynamic_ids *value,
664 const Dynamic_ids *default_value __attribute__((unused)))
666 if (value->unpack_dynamic_ids(field_values->value[pos].c_ptr_safe()))
672 char* Rpl_info_table::do_get_description_info()
677 bool Rpl_info_table::do_is_transactional()
679 return is_transactional;
682 bool Rpl_info_table::do_update_is_transactional()
687 Open_tables_backup backup;
689 DBUG_ENTER(
"Rpl_info_table::do_update_is_transactional");
692 saved_mode= thd->variables.sql_mode;
693 tmp_disable_binlog(thd);
698 if (access->
open_table(thd, str_schema, str_table,
699 get_number_info(), TL_READ,
703 is_transactional= table->file->has_transactions();
708 reenable_binlog(thd);
709 thd->variables.sql_mode= saved_mode;