18 #include "ha_ndbcluster_glue.h"
20 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
21 #include "ha_ndbcluster.h"
22 #include "ha_ndbcluster_connection.h"
23 #include "ndb_local_connection.h"
25 #include "ndb_table_guard.h"
26 #include "ndb_global_schema_lock.h"
27 #include "ndb_global_schema_lock_guard.h"
29 #include "rpl_injector.h"
30 #include "rpl_filter.h"
31 #if MYSQL_VERSION_ID > 50600
37 #include "global_threads.h"
38 #include "ha_ndbcluster_binlog.h"
39 #include <ndbapi/NdbDictionary.hpp>
40 #include <ndbapi/ndb_cluster_connection.hpp>
42 extern my_bool opt_ndb_log_orig;
43 extern my_bool opt_ndb_log_bin;
44 extern my_bool opt_ndb_log_update_as_write;
45 extern my_bool opt_ndb_log_updated_only;
46 extern my_bool opt_ndb_log_binlog_index;
47 extern my_bool opt_ndb_log_apply_status;
48 extern ulong opt_ndb_extra_logging;
51 bool ndb_log_empty_epochs(
void);
56 #include "ha_ndbcluster_tables.h"
58 #include "ndb_dist_priv_util.h"
/* Default synchronisation timeout; the unit is not shown in this excerpt
   (presumably seconds -- TODO confirm). */
64 static const int DEFAULT_SYNC_TIMEOUT= 120;
/* Non-zero while the binlog thread is running; ndbcluster_binlog_end()
   waits on injector_cond until it is no longer > 0. */
71 static int ndb_binlog_thread_running= 0;
/* Tested by most functions in this file before doing binlog related work. */
76 my_bool ndb_binlog_running= FALSE;
/* Set to TRUE in ndb_binlog_setup(). */
77 static my_bool ndb_binlog_tables_inited= FALSE;
/* Checked together with ndb_binlog_running; where it is set is not visible
   in this excerpt. */
78 static my_bool ndb_binlog_is_ready= FALSE;
/*
  ndb_binlog_is_read_only() -- only the two conditions are visible in this
  excerpt.  The first tests whether the binlog tables have been initialised,
  the second whether the binlog is running but not yet ready.  The value
  returned for each case is outside this excerpt.
*/
81 ndb_binlog_is_read_only(
void)
83 if(!ndb_binlog_tables_inited)
89 if (ndb_binlog_running && !ndb_binlog_is_ready)
/* THD of the injector; defined elsewhere.  Shadow tables opened in this file
   get it as their in_use THD. */
106 extern THD * injector_thd;
/* Ndb objects owned by this file.  injector_ndb is read under injector_mutex
   by ndb_report_waiting(). */
118 static Ndb *injector_ndb= 0;
119 static Ndb *schema_ndb= 0;
/* Tested and reset to 0 by ndbcluster_binlog_end(). */
121 static int ndbcluster_binlog_inited= 0;
/* Set to 1 by ndbcluster_binlog_end() before it signals injector_cond. */
136 static int ndbcluster_binlog_terminating= 0;
142 pthread_t ndb_binlog_thread;
/* Mutex/condition pair used by ndbcluster_binlog_wait() and
   ndbcluster_binlog_end() to wait for the binlog thread. */
143 pthread_mutex_t injector_mutex;
144 pthread_cond_t injector_cond;
/* Epoch bookkeeping.  ndbcluster_binlog_wait() waits until the "handled"
   epoch reaches the epoch it is waiting for. */
147 static ulonglong ndb_latest_applied_binlog_epoch= 0;
148 static ulonglong ndb_latest_handled_binlog_epoch= 0;
149 static ulonglong ndb_latest_received_binlog_epoch= 0;
/* Held by ndbcluster_log_schema_op() while it checks and uses
   ndb_schema_share. */
153 pthread_mutex_t ndb_schema_share_mutex;
155 extern my_bool opt_log_slave_updates;
156 static my_bool g_ndb_log_slave_updates;
/* Hash named after the schema objects below; presumably it stores them --
   TODO confirm, no insert/lookup is visible in this excerpt. */
159 HASH ndb_schema_objects;
/* Schema object record.  Only some members are visible in this excerpt. */
160 typedef struct st_ndb_schema_object {
161 pthread_mutex_t mutex;
/* 256 bits of storage kept as eight 32-bit words. */
166 uint32 slock[256/32];
168 uint32 table_version;
/*
  Forward declarations of the schema object accessors (parameter lists are
  only partly visible in this excerpt).  ndb_get_schema_object() looks an
  object up by key and takes a create_if_not_exists flag;
  ndb_free_schema_object() takes the address of the caller's pointer.
*/
170 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(
const char *key,
171 my_bool create_if_not_exists,
173 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
/*
  Body fragment of a debug dump helper; its signature is outside this
  excerpt (the print_records macro at the end suggests the name).  For each
  field of the table it formats leading bytes of the field's data in `record`
  as hex and emits them through DBUG_PRINT.
*/
180 for (uint j= 0; j < table->s->fields; j++)
184 Field *field= table->field[j];
/* Rebase the field pointer from record[0] onto the record being dumped. */
185 const uchar* field_ptr= field->ptr - table->record[0] + record;
186 int pack_len= field->pack_length();
/* Dump at most 10 bytes per field. */
187 int n= pack_len < 10 ? pack_len : 10;
/* The loop also stops once the write position in buf reaches 20. */
189 for (
int i= 0;
i < n && pos < 20;
i++)
191 pos+= sprintf(&buf[pos],
" %x", (
int) (uchar) field_ptr[
i]);
194 DBUG_PRINT(
"info",(
"[%u]field_ptr[0->%d]: %s", j, n, buf));
/* Variant that expands to nothing; presumably the non-debug build
   alternative (the surrounding #if/#else is not visible here). */
199 #define print_records(a,b)
/*
  Debug helper that describes a TABLE through DBUG_PRINT.

  info   label printed in front of the output
  table  table to describe; the "(null)" message is the only visible handling
         of a missing table

  It prints a summary line for the table and then one line per field with
  the field's flags, type, pack length and the offsets of its data and null
  pointers relative to record[0].  Several argument lines are missing from
  this excerpt.
*/
204 static void dbug_print_table(
const char *info,
TABLE *table)
208 DBUG_PRINT(
"info",(
"%s: (null)", info));
212 (
"%s: %s.%s s->fields: %d "
213 "reclength: %lu rec_buff_length: %u record[0]: 0x%lx "
217 table->s->table_name.str,
220 table->s->rec_buff_length,
221 (
long) table->record[0],
222 (
long) table->record[1]));
/* One output line per field. */
224 for (
unsigned int i= 0;
i < table->s->fields;
i++)
226 Field *f= table->field[
i];
228 (
"[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
229 "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
233 (f->flags & PRI_KEY_FLAG) ?
"pri" :
"attr",
234 (f->flags & NOT_NULL_FLAG) ?
"" :
",nullable",
235 (f->flags & UNSIGNED_FLAG) ?
",unsigned" :
",signed",
236 (f->flags & ZEROFILL_FLAG) ?
",zerofill" :
"",
237 (f->flags & BLOB_FLAG) ?
",blob" :
"",
238 (f->flags & BINARY_FLAG) ?
",binary" :
"",
241 (long) f->ptr, (
int) (f->ptr - table->record[0]),
244 (int) ((uchar*) f->
null_ptr - table->record[0])));
/* BIT fields get an extra line with their bit pointer, offset and length. */
245 if (f->type() == MYSQL_TYPE_BIT)
248 DBUG_PRINT(
"MYSQL_TYPE_BIT",(
"field_length: %d bit_ptr: 0x%lx[+%d] "
249 "bit_ofs: %d bit_len: %u",
250 g->field_length, (
long) g->bit_ptr,
251 (
int) ((uchar*) g->bit_ptr -
253 g->bit_ofs, g->bit_len));
/* Variant that expands to nothing; presumably used when debug output is
   compiled out (the surrounding #if/#else is not visible here). */
258 #define dbug_print_table(a,b)
/*
  Write the SQL conditions held in thd's statement diagnostics area to the
  server log as warnings, formatted as "<prefix>: (<errno>)<message>".
  The loop over the iterator is not visible in this excerpt.
*/
263 print_warning_list(
const char* prefix, THD* thd)
266 it(thd->get_stmt_da()->sql_conditions());
271 sql_print_warning(
"%s: (%d)%s",
273 err->get_sql_errno(),
274 err->get_message_text());
/*
  Run the statement stored in [buf, end) through mysqld.raw_run_query().

  buf/end         start and one-past-end of the query text
  no_print_error  passed on to the callee in a part of the call that is not
                  visible here; its exact meaning is defined there

  The result of raw_run_query() is deliberately discarded.
*/
279 static void run_query(THD *thd,
char *buf,
char *end,
280 const int *no_print_error)
293 (void)mysqld.raw_run_query(buf, (end - buf),
/*
  Detach the event data from a share.  Only the final reset of
  share->event_data is visible in this excerpt; any release of the event
  data itself happens in lines not shown.
*/
298 ndbcluster_binlog_close_table(THD *thd,
NDB_SHARE *share)
300 DBUG_ENTER(
"ndbcluster_binlog_close_table");
305 share->event_data= 0;
/*
  Open a "shadow" TABLE for a share and attach it to the share's event data.

  The share must not already have event data (asserted).  The shadow TABLE is
  allocated from the event data's own mem_root, opened from a temporary table
  share, and then labelled with the db and table name of the NDB share.  On
  open failure the table share is freed and share->event_data is reset to 0.
  Parts of the function, including its return statements, are not visible in
  this excerpt.
*/
319 ndb_binlog_open_shadow_table(THD *thd,
NDB_SHARE *share)
322 DBUG_ASSERT(share->event_data == 0);
324 DBUG_ENTER(
"ndb_binlog_open_shadow_table");
327 my_pthread_getspecific_ptr(
MEM_ROOT**, THR_MALLOC);
/* Make the event data's mem_root the thread's current allocation root. */
329 init_sql_alloc(&event_data->mem_root, 1024, 0);
330 *root_ptr= &event_data->mem_root;
335 (
TABLE*)alloc_root(&event_data->mem_root,
sizeof(
TABLE));
337 init_tmp_table_share(thd, shadow_table_share,
341 if ((error= open_table_def(thd, shadow_table_share, 0)) ||
342 (error= open_table_from_share(thd, shadow_table_share,
"", 0,
343 (uint) (OPEN_FRM_FILE_ONLY | DELAYED_OPEN | READ_ALL),
345 #ifdef NDB_WITHOUT_ONLINE_ALTER
352 DBUG_PRINT(
"error", (
"failed to open shadow table, error: %d my_errno: %d",
354 free_table_share(shadow_table_share);
356 share->event_data= 0;
360 event_data->shadow_table= shadow_table;
363 assign_new_table_id(shadow_table_share);
/* The shadow table is used by the injector thread. */
366 shadow_table->in_use= injector_thd;
/* Point the shadow table's names at the strings owned by the NDB share. */
368 shadow_table->s->db.str= share->db;
369 shadow_table->s->db.length= strlen(share->db);
370 shadow_table->s->table_name.str= share->table_name;
371 shadow_table->s->table_name.length= strlen(share->table_name);
/* Use the all-columns bitmap for both the read set and the write set. */
373 shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set,
374 &shadow_table->s->all_set);
376 dbug_print_table(
"table", shadow_table);
/*
  Initialise the binlog related parts of an NDB_SHARE.

  thd     current thread
  share   share to initialise
  _table  already opened TABLE for the share; used as the source of the
          hidden-PK and blob flags in one of the visible branches

  The function records the cluster connect count, clears event_data,
  special-cases the schema and apply-status tables in NDB_REP_DB, allocates
  one subscriber bitmap per data node, and sets NSF_* flags on the share.
  The conditions selecting between the branches are only partly visible in
  this excerpt.
*/
386 int ndbcluster_binlog_init_share(THD *thd,
NDB_SHARE *share,
TABLE *_table)
388 MEM_ROOT *mem_root= &share->mem_root;
389 int do_event_op= ndb_binlog_running;
391 DBUG_ENTER(
"ndbcluster_binlog_init_share");
393 share->connect_count= g_ndb_cluster_connection->get_connect_count();
394 #ifdef HAVE_NDB_BINLOG
395 share->m_cfn_share= NULL;
400 share->event_data= 0;
402 if (!ndb_schema_share &&
403 strcmp(share->db, NDB_REP_DB) == 0 &&
404 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
406 else if (!ndb_apply_status_share &&
407 strcmp(share->db, NDB_REP_DB) == 0 &&
408 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
411 if (Ndb_dist_priv_util::is_distributed_priv_table(share->db,
/* One subscriber bitmap per data node, each max_ndb_nodes bits wide and
   allocated from the share's mem_root. */
418 int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
420 alloc_root(mem_root, no_nodes *
sizeof(
MY_BITMAP));
421 for (i= 0; i < no_nodes; i++)
423 bitmap_init(&share->subscriber_bitmap[i],
424 (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
425 max_ndb_nodes, FALSE);
426 bitmap_clear_all(&share->subscriber_bitmap[i]);
/* Flags derived from the caller supplied table. */
434 if (_table->s->primary_key == MAX_KEY)
435 share->flags|= NSF_HIDDEN_PK;
436 if (_table->s->blob_fields != 0)
437 share->flags|= NSF_BLOB_FLAG;
441 share->flags|= NSF_NO_BINLOG;
/* The same flags derived from the freshly opened shadow table. */
447 if ((error= ndb_binlog_open_shadow_table(thd, share)))
449 if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
450 share->flags|= NSF_HIDDEN_PK;
451 if (share->event_data->shadow_table->s->blob_fields != 0)
452 share->flags|= NSF_BLOB_FLAG;
/*
  Block the caller until the binlog thread has handled the epoch returned by
  ndb_get_latest_trans_gci() at entry.

  thd  calling thread; the NULL checks on it suggest it may be NULL
       (the guard around the proc_info assignments is not visible here)

  Nothing is done unless ndb_binlog_running is set.  While waiting,
  thd->proc_info shows a "Waiting for ..." message and is restored afterwards.
*/
470 static void ndbcluster_binlog_wait(THD *thd)
472 if (ndb_binlog_running)
474 DBUG_ENTER(
"ndbcluster_binlog_wait");
475 ulonglong wait_epoch= ndb_get_latest_trans_gci();
482 const char *save_info= thd ? thd->proc_info : 0;
485 thd->proc_info=
"Waiting for ndbcluster binlog update to "
486 "reach current position";
487 pthread_mutex_lock(&injector_mutex);
/* Keep waiting while the thread is not killed, `count` (declared in a line
   not shown) is non-zero, the binlog is still running and the handled epoch
   is still 0 or below the epoch we wait for. */
488 while (!(thd && thd->killed) && count && ndb_binlog_running &&
489 (ndb_latest_handled_binlog_epoch == 0 ||
490 ndb_latest_handled_binlog_epoch < wait_epoch))
/* Timed wait so that the loop condition is re-evaluated periodically. */
494 set_timespec(abstime, 1);
495 pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
497 pthread_mutex_unlock(&injector_mutex);
499 thd->proc_info= save_info;
/*
  Handle a reset of the binary logs for ndbcluster.

  The first check tests whether the binlog is running; the second tests
  whether the statement is something other than SQLCOM_RESET with
  REFRESH_MASTER.  What each check leads to is outside this excerpt.
  The function then waits for the binlog thread to catch up and truncates
  mysql.ndb_binlog_index, tolerating a missing table.
*/
507 static int ndbcluster_reset_logs(THD *thd)
509 if (!ndb_binlog_running)
513 if (!((thd->lex->sql_command == SQLCOM_RESET) &&
514 (thd->lex->type & REFRESH_MASTER)))
517 DBUG_ENTER(
"ndbcluster_reset_logs");
523 ndbcluster_binlog_wait(thd);
530 const bool ignore_no_such_table =
true;
531 if(mysqld.truncate_table(STRING_WITH_LEN(
"mysql"),
532 STRING_WITH_LEN(
"ndb_binlog_index"),
533 ignore_no_such_table))
/*
  Prepare a THD for use as an internal ndbcluster binlog system thread.

  stackptr  address used as the thread stack marker for the THD

  The creation of the THD and the return statements are not visible in this
  excerpt.  The visible part stores the thread globals, marks the THD as a
  COM_DAEMON / SYSTEM_THREAD_NDBCLUSTER_BINLOG thread, skips grant checks and
  sets the client, results and connection character sets from the charset
  looked up for "utf8".
*/
546 setup_thd(
char * stackptr)
548 DBUG_ENTER(
"setup_thd");
554 THD_CHECK_SENTRY(thd);
557 thd->thread_stack= stackptr;
558 if (thd->store_globals())
567 thd->init_for_queries();
568 thd_set_command(thd, COM_DAEMON);
569 thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
570 #ifndef NDB_THD_HAS_NO_VERSION
571 thd->version= refresh_version;
573 thd->client_capabilities= 0;
574 thd->lex->start_transaction_opt= 0;
575 thd->security_ctx->skip_grants();
/* The same charset is applied to client, results and connection below. */
577 CHARSET_INFO *charset_connection= get_charset_by_csname(
"utf8",
580 thd->variables.character_set_client= charset_connection;
581 thd->variables.character_set_results= charset_connection;
582 thd->variables.collation_connection= charset_connection;
583 thd->update_charset();
/*
  Remove the rows for a purged binlog file from mysql.ndb_binlog_index.

  thd   calling thread; a replacement THD is created through setup_thd() in
        one visible branch (the condition for that branch is not shown)
  file  binlog file name, used in the WHERE clause File='<file>'

  The first check tests whether the binlog is not running or the caller is a
  slave thread; what it leads to is outside this excerpt.  A missing table
  is tolerated by the delete.
*/
593 ndbcluster_binlog_index_purge_file(THD *thd,
const char *
file)
597 DBUG_ENTER(
"ndbcluster_binlog_index_purge_file");
598 DBUG_PRINT(
"enter", (
"file: %s", file));
600 if (!ndb_binlog_running || (thd && thd->slave_thread))
609 if ((thd = setup_thd((
char*)&save_thd)) == 0)
616 sql_print_warning(
"NDB: Unable to purge "
617 NDB_REP_DB
"." NDB_REP_TABLE
618 " File=%s (failed to setup thd)", file);
/* NOTE(review): `file` is spliced between the quotes as given; whether
   delete_rows() escapes it is not visible here -- confirm. */
629 const bool ignore_no_such_table =
true;
630 if(mysqld.delete_rows(STRING_WITH_LEN(
"mysql"),
631 STRING_WITH_LEN(
"ndb_binlog_index"),
632 ignore_no_such_table,
633 "File='", file,
"'", NULL))
649 #ifndef NDB_WITHOUT_DIST_PRIV
/*
  Tell whether the privilege tables are stored in NDB.

  Walks the table names produced by dist_priv.iter_next_table() and looks
  each one up in NDB.  The branch that sets `distributed` is not visible in
  this excerpt; the visible `else if` branch logs an inconsistency error.
  Returns `distributed`.
*/
652 priv_tables_are_in_ndb(THD *thd)
654 bool distributed=
false;
/* NOTE(review): the DBUG_ENTER tag differs from the function name, so debug
   trace filters must use "ndbcluster_distributed_privileges". */
656 DBUG_ENTER(
"ndbcluster_distributed_privileges");
658 Ndb *ndb= check_ndb_in_thd(thd);
666 while((table_name= dist_priv.iter_next_table()))
668 DBUG_PRINT(
"info", (
"table_name: %s", table_name));
670 const NDBTAB *ndbtab= ndbtab_g.get_table();
675 else if (distributed)
677 sql_print_error(
"NDB: Inconsistency detected in distributed "
678 "privilege tables. Table '%s.%s' is not distributed",
683 DBUG_RETURN(distributed);
/*
  handlerton binlog_log_query hook: translate a server level binlog command
  into a schema operation type and hand the statement to
  ndbcluster_log_schema_op().

  hton            handlerton (not used in the visible lines)
  thd             calling thread
  binlog_command  LOGCOM_* value selecting the schema operation type
  query/length    statement text
  db, table_name  object the statement refers to

  For the user and privilege related commands priv_tables_are_in_ndb(thd) is
  consulted first.  What the table and database cases do besides setting
  `type`, and several case labels, are not visible in this excerpt.
*/
688 ndbcluster_binlog_log_query(
handlerton *hton, THD *thd, enum_binlog_command binlog_command,
689 const char *
query, uint query_length,
690 const char *db,
const char *table_name)
692 DBUG_ENTER(
"ndbcluster_binlog_log_query");
693 DBUG_PRINT(
"enter", (
"db: %s table_name: %s query: %s",
694 db, table_name, query));
695 enum SCHEMA_OP_TYPE
type;
697 uint32 table_id= 0, table_version= 0;
/* The id and version passed on are pseudo-random values rather than the
   ones of a real table object. */
705 table_id = (uint32)rand();
706 table_version = (uint32)rand();
707 switch (binlog_command)
709 case LOGCOM_CREATE_TABLE:
710 type= SOT_CREATE_TABLE;
713 case LOGCOM_ALTER_TABLE:
714 type= SOT_ALTER_TABLE_COMMIT;
717 case LOGCOM_RENAME_TABLE:
718 type= SOT_RENAME_TABLE;
721 case LOGCOM_DROP_TABLE:
722 type= SOT_DROP_TABLE;
725 case LOGCOM_CREATE_DB:
729 case LOGCOM_ALTER_DB:
737 #ifndef NDB_WITHOUT_DIST_PRIV
738 case LOGCOM_CREATE_USER:
739 type= SOT_CREATE_USER;
740 if (priv_tables_are_in_ndb(thd))
742 DBUG_PRINT(
"info", (
"Privilege tables have been distributed, logging statement"));
746 case LOGCOM_DROP_USER:
748 if (priv_tables_are_in_ndb(thd))
750 DBUG_PRINT(
"info", (
"Privilege tables have been distributed, logging statement"));
754 case LOGCOM_RENAME_USER:
755 type= SOT_RENAME_USER;
756 if (priv_tables_are_in_ndb(thd))
758 DBUG_PRINT(
"info", (
"Privilege tables have been distributed, logging statement"));
764 if (priv_tables_are_in_ndb(thd))
766 DBUG_PRINT(
"info", (
"Privilege tables have been distributed, logging statement"));
772 if (priv_tables_are_in_ndb(thd))
774 DBUG_PRINT(
"info", (
"Privilege tables have been distributed, logging statement"));
782 ndbcluster_log_schema_op(thd, query, query_length,
783 db, table_name, table_id, table_version, type,
/*
  Stop the ndbcluster helper threads and release the binlog synchronisation
  objects.

  The utility thread and the index statistics thread are stopped in the same
  way: under the thread's mutex the running counter is incremented,
  ndbcluster_terminating is set, the thread is signalled, and the caller
  waits on the "ready" condition until the counter is no longer above 1
  before decrementing it again.  The binlog thread is then asked to terminate
  and waited for on injector_cond, after which the injector mutex/condition
  and the schema share mutex are destroyed.
*/
795 int ndbcluster_binlog_end(THD *thd)
797 DBUG_ENTER(
"ndbcluster_binlog_end");
799 if (ndb_util_thread_running > 0)
808 sql_print_information(
"Stopping Cluster Utility thread");
809 pthread_mutex_lock(&LOCK_ndb_util_thread);
811 ndb_util_thread_running++;
812 ndbcluster_terminating= 1;
813 pthread_cond_signal(&COND_ndb_util_thread);
814 while (ndb_util_thread_running > 1)
815 pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
816 ndb_util_thread_running--;
817 pthread_mutex_unlock(&LOCK_ndb_util_thread);
820 if (ndb_index_stat_thread_running > 0)
826 sql_print_information(
"Stopping Cluster Index Stats thread");
827 pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
829 ndb_index_stat_thread_running++;
830 ndbcluster_terminating= 1;
831 pthread_cond_signal(&COND_ndb_index_stat_thread);
832 while (ndb_index_stat_thread_running > 1)
833 pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
834 ndb_index_stat_thread_running--;
835 pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
838 if (ndbcluster_binlog_inited)
840 ndbcluster_binlog_inited= 0;
841 if (ndb_binlog_thread_running)
/* NOTE(review): the terminating flag is written before injector_mutex is
   taken -- confirm the binlog thread reads it safely. */
844 ndbcluster_binlog_terminating= 1;
845 pthread_mutex_lock(&injector_mutex);
846 pthread_cond_signal(&injector_cond);
847 while (ndb_binlog_thread_running > 0)
848 pthread_cond_wait(&injector_cond, &injector_mutex);
849 pthread_mutex_unlock(&injector_mutex);
851 pthread_mutex_destroy(&injector_mutex);
852 pthread_cond_destroy(&injector_cond);
853 pthread_mutex_destroy(&ndb_schema_share_mutex);
/*
  Handle a slave reset for ndbcluster: delete rows from
  mysql.ndb_apply_status (a missing table is tolerated) and notify the global
  slave state through atResetSlave().  The first check tests whether the
  binlog is running; what it leads to is outside this excerpt.
*/
862 static void ndbcluster_reset_slave(THD *thd)
865 if (!ndb_binlog_running)
868 DBUG_ENTER(
"ndbcluster_reset_slave");
876 const bool ignore_no_such_table =
true;
877 if(mysqld.delete_rows(STRING_WITH_LEN(
"mysql"),
878 STRING_WITH_LEN(
"ndb_apply_status"),
879 ignore_no_such_table,
886 g_ndb_slave_state.atResetSlave();
/*
  handlerton flush_logs hook: wait, on behalf of the current thread, until
  the binlog thread has caught up.  The return statement is not visible in
  this excerpt.
*/
901 static bool ndbcluster_flush_logs(
handlerton *hton)
903 ndbcluster_binlog_wait(current_thd);
/*
  handlerton binlog_func hook: dispatch a BFN_* request to the matching
  helper in this file (reset logs, reset slave, binlog wait, binlog end,
  purge file).  Some case labels and the remaining parameters are not visible
  in this excerpt; `arg` is interpreted as a file name for
  BFN_BINLOG_PURGE_FILE.
*/
908 static int ndbcluster_binlog_func(
handlerton *hton, THD *thd,
912 DBUG_ENTER(
"ndbcluster_binlog_func");
917 res= ndbcluster_reset_logs(thd);
919 case BFN_RESET_SLAVE:
920 ndbcluster_reset_slave(thd);
922 case BFN_BINLOG_WAIT:
923 ndbcluster_binlog_wait(thd);
926 res= ndbcluster_binlog_end(thd);
928 case BFN_BINLOG_PURGE_FILE:
929 res= ndbcluster_binlog_index_purge_file(thd, (
const char *)arg);
/*
  Install the binlog related callbacks of this file in the handlerton `h`
  (how `h` is obtained is not visible in this excerpt).
*/
935 void ndbcluster_binlog_init_handlerton()
938 h->flush_logs= ndbcluster_flush_logs;
939 h->binlog_func= ndbcluster_binlog_func;
940 h->binlog_log_query= ndbcluster_binlog_log_query;
/*
  Format the key "./<db>/<table>" into key_buf.

  key_buf, key_buf_length  destination buffer and its size
  db, db_length            database name and its length
  table, table_length      table name and its length

  The computed key_length is asserted to be positive and smaller than the
  buffer size; the return statement is not visible in this excerpt.

  NOTE(review): the value returned by my_snprintf() is decremented by one
  before it is used as key length -- confirm that this matches the key length
  used for entries in ndbcluster_open_tables.
  NOTE(review): the lengths are size_t but are passed as the width arguments
  of "%*s" -- confirm my_snprintf() reads them with a compatible type.
*/
949 ndb_open_tables__create_key(
char* key_buf,
size_t key_buf_length,
950 const char* db,
size_t db_length,
951 const char* table,
size_t table_length)
953 size_t key_length = my_snprintf(key_buf, key_buf_length,
954 "./%*s/%*s", db_length, db,
955 table_length, table) - 1;
956 assert(key_length > 0);
957 assert(key_length < key_buf_length);
/*
  Look a table up in ndbcluster_open_tables.

  Builds the key for db/table with ndb_open_tables__create_key() and searches
  the hash while holding ndbcluster_mutex.  `result` is true when
  my_hash_search() returns non-NULL; the return statement itself is not
  visible in this excerpt.
*/
968 ndb_open_tables__is_table_open(
const char* db,
size_t db_length,
969 const char* table,
size_t table_length)
971 char key[FN_REFLEN + 1];
972 size_t key_length = ndb_open_tables__create_key(key,
sizeof(key),
974 table, table_length);
975 DBUG_ENTER(
"ndb_open_tables__is_table_open");
976 DBUG_PRINT(
"enter", (
"db: '%s', table: '%s', key: '%s'",
979 pthread_mutex_lock(&ndbcluster_mutex);
980 bool result = my_hash_search(&ndbcluster_open_tables,
983 pthread_mutex_unlock(&ndbcluster_mutex);
985 DBUG_PRINT(
"exit", (
"result: %d", result));
/* Return whether mysql.ndb_schema is present in the open tables hash. */
991 ndbcluster_check_ndb_schema_share()
993 return ndb_open_tables__is_table_open(STRING_WITH_LEN(
"mysql"),
994 STRING_WITH_LEN(
"ndb_schema"));
/* Return whether mysql.ndb_apply_status is present in the open tables hash. */
999 ndbcluster_check_ndb_apply_status_share()
1001 return ndb_open_tables__is_table_open(STRING_WITH_LEN(
"mysql"),
1002 STRING_WITH_LEN(
"ndb_apply_status"));
/*
  Create one of the cluster system tables unless it is already open.

  db, db_length        database name and its length
  table, table_length  table name and its length
  create_definitions   column/key definitions for the CREATE statement
  create_options       table options for the CREATE statement

  Two checks come first: whether the table is already open and whether the
  cluster connection reports no ready nodes.  What each leads to is outside
  this excerpt.  A local table definition file (the path built with reg_ext)
  is deleted; when that delete succeeds the table is also flushed, and the
  table is then created with "create if not exists" semantics.  The return
  statements are not visible in this excerpt.
*/
1007 create_cluster_sys_table(THD *thd,
const char* db,
size_t db_length,
1008 const char* table,
size_t table_length,
1009 const char* create_definitions,
1010 const char* create_options)
1012 if (ndb_open_tables__is_table_open(db, db_length, table, table_length))
1015 if (g_ndb_cluster_connection->get_no_ready() <= 0)
1018 if (opt_ndb_extra_logging)
1019 sql_print_information(
"NDB: Creating %s.%s", db, table);
1028 char path[FN_REFLEN + 1];
1029 build_table_filename(path,
sizeof(path) - 1,
1030 db, table, reg_ext, 0);
1031 if (my_delete(path, MYF(0)) == 0)
1040 if (opt_ndb_extra_logging)
1041 sql_print_information(
"NDB: Flushing %s.%s", db, table);
/* The result of the flush is deliberately ignored. */
1044 (void)mysqld.flush_table(db, db_length,
1045 table, table_length);
1049 const bool create_if_not_exists =
true;
1050 const bool res = mysqld.create_sys_table(db, db_length,
1051 table, table_length,
1052 create_if_not_exists,
/*
  Create mysql.ndb_apply_status through create_cluster_sys_table().
  The table is keyed on server_id with a hash primary key and uses ENGINE=NDB
  with the latin1 character set.
*/
1060 ndb_apply_table__create(THD *thd)
1062 DBUG_ENTER(
"ndb_apply_table__create");
1066 create_cluster_sys_table(thd,
1067 STRING_WITH_LEN(
"mysql"),
1068 STRING_WITH_LEN(
"ndb_apply_status"),
1070 "server_id INT UNSIGNED NOT NULL,"
1071 "epoch BIGINT UNSIGNED NOT NULL, "
1072 "log_name VARCHAR(255) BINARY NOT NULL, "
1073 "start_pos BIGINT UNSIGNED NOT NULL, "
1074 "end_pos BIGINT UNSIGNED NOT NULL, "
1075 "PRIMARY KEY USING HASH (server_id)",
1077 "ENGINE=NDB CHARACTER SET latin1");
/*
  Create mysql.ndb_schema through create_cluster_sys_table().
  The column order here (db, name, slock, query, node_id, epoch, id, version,
  type) corresponds to the SCHEMA_*_I index macros defined later in this
  file; the primary key is a hash key on (db,name).
*/
1083 ndb_schema_table__create(THD *thd)
1085 DBUG_ENTER(
"ndb_schema_table__create");
1089 create_cluster_sys_table(thd,
1090 STRING_WITH_LEN(
"mysql"),
1091 STRING_WITH_LEN(
"ndb_schema"),
1093 "db VARBINARY(63) NOT NULL,"
1094 "name VARBINARY(63) NOT NULL,"
1095 "slock BINARY(32) NOT NULL,"
1096 "query BLOB NOT NULL,"
1097 "node_id INT UNSIGNED NOT NULL,"
1098 "epoch BIGINT UNSIGNED NOT NULL,"
1099 "id INT UNSIGNED NOT NULL,"
1100 "version INT UNSIGNED NOT NULL,"
1101 "type INT UNSIGNED NOT NULL,"
1102 "PRIMARY KEY USING HASH (db,name)",
1104 "ENGINE=NDB CHARACTER SET latin1");
/*
  Scope guard for Thd_ndb::options.  The constructor remembers the current
  options value, set() ORs a flag into m_val, and the destructor writes the
  remembered value back to m_val.  The member declarations are not visible in
  this excerpt; presumably m_val is a reference to thd_ndb->options --
  TODO confirm.
*/
1108 class Thd_ndb_options_guard
1111 Thd_ndb_options_guard(
Thd_ndb *thd_ndb)
1112 : m_val(thd_ndb->options), m_save_val(thd_ndb->options) {}
1113 ~Thd_ndb_options_guard() { m_val= m_save_val; }
1114 void set(uint32 flag) { m_val|= flag; }
1120 extern int ndb_setup_complete;
1121 extern pthread_cond_t COND_ndb_setup_complete;
/*
  Mark the ndb setup as complete and wake every thread waiting on
  COND_ndb_setup_complete.  The flag is set and the broadcast is sent while
  holding ndbcluster_mutex.
*/
1129 static void ndb_notify_tables_writable()
1131 pthread_mutex_lock(&ndbcluster_mutex);
1132 ndb_setup_complete= 1;
1133 pthread_cond_broadcast(&COND_ndb_setup_complete);
1134 pthread_mutex_unlock(&ndbcluster_mutex);
/*
  Bring the local set of databases in line with the database level entries
  found by scanning an NDB table (the table opened through ndbtab_g is not
  visible in this excerpt; the columns read are "db", "name" and "query").

  thd  calling thread; it must hold the global schema lock, which is checked
       with has_required_global_schema_lock()

  For the duration of the call schema operations are neither logged nor
  locked (TNO_NO_LOG_SCHEMA_OP / TNO_NO_LOCK_SCHEMA_OP), restored by the
  Thd_ndb_options_guard.  Rows with a database name but an empty table name
  are examined and the stored query decides what is done:
    CREATE ...  database missing locally -> run the stored query
    ALTER ...   database missing locally -> CREATE DATABASE, then run query
    DROP ...    database still present locally -> reported in the log
  Failures are retried after do_retry_sleep(); the retry bookkeeping and the
  return statements are not visible in this excerpt.
*/
1147 static int ndbcluster_find_all_databases(THD *thd)
1149 Ndb *ndb= check_ndb_in_thd(thd);
1150 Thd_ndb *thd_ndb= get_thd_ndb(thd);
1151 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
1156 int retry_sleep= 30;
1157 DBUG_ENTER(
"ndbcluster_find_all_databases");
1164 if (!thd_ndb->has_required_global_schema_lock(
"ndbcluster_find_all_databases"))
1168 thd_ndb_options.set(TNO_NO_LOG_SCHEMA_OP);
1169 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
/* db_buffer receives a length-prefixed value: byte 0 is the length and the
   name itself starts at db_buffer+1. */
1172 char db_buffer[FN_REFLEN];
1173 char *db= db_buffer+1;
1174 char name[FN_REFLEN];
1177 const NDBTAB *ndbtab= ndbtab_g.get_table();
1200 NdbScanOperation::SF_TupScan, 1);
1202 r|= op->
getValue(
"db", db_buffer) == NULL;
1203 r|= op->
getValue(
"name", name) == NULL;
1204 r|= (query_blob_handle= op->
getBlobHandle(
"query")) == NULL;
1205 r|= query_blob_handle->
getValue(query,
sizeof(query));
1221 unsigned db_len= db_buffer[0];
1222 unsigned name_len= name[0];
/* Only rows that name a database but no table are of interest. */
1227 if (db_len > 0 && name_len == 0)
1233 Uint64 query_length= 0;
1234 if (query_blob_handle->
getLength(query_length))
/* NOTE(review): query_length comes from the blob handle; confirm it is
   always below sizeof(query) before it is used as an index here. */
1239 query[query_length]= 0;
/* `name` is reused to hold the path of the database directory. */
1240 build_table_filename(name,
sizeof(name), db,
"",
"", 0);
1241 int database_exists= !my_access(name, F_OK);
1242 if (strncasecmp(
"CREATE", query, 6) == 0)
1245 if (!database_exists)
1248 sql_print_information(
"NDB: Discovered missing database '%s'", db);
1249 const int no_print_error[1]= {0};
1250 run_query(thd, query, query + query_length,
1254 else if (strncasecmp(
"ALTER", query, 5) == 0)
1257 if (!database_exists)
1260 sql_print_information(
"NDB: Discovered missing database '%s'", db);
1261 const int no_print_error[1]= {0};
/* NOTE(review): db is inserted unquoted into the statement -- confirm that
   database names needing identifier quoting cannot occur here. */
1262 name_len= my_snprintf(name,
sizeof(name),
"CREATE DATABASE %s", db);
1263 run_query(thd, name, name + name_len,
1265 run_query(thd, query, query + query_length,
1269 else if (strncasecmp(
"DROP", query, 4) == 0)
1272 if (database_exists)
/* Log message spelling corrected ("reamining" -> "remaining"). */
1275 sql_print_information(
"NDB: Discovered remaining database '%s'", db);
1298 sql_print_warning(
"NDB: ndbcluster_find_all_databases retry: %u - %s",
1301 do_retry_sleep(retry_sleep);
1307 sql_print_error(
"NDB: ndbcluster_find_all_databases fail: %u - %s",
/*
  Set up the ndb binlog tables.

  The function first checks ndb_binlog_tables_inited; what that leads to is
  outside this excerpt.  It then takes the global schema lock.  For each of
  the schema and apply-status tables that has no share yet and is not open,
  it first tries to discover the table from the engine and creates it if
  there is still no share.  It then synchronises databases and files, sets
  ndb_binlog_tables_inited, and -- when the binlog is running and ready --
  closes cached tables, notifies waiters that tables are writable and signals
  injector_cond.  The return statements and several braces are not visible
  in this excerpt.
*/
1317 ndb_binlog_setup(THD *thd)
1319 if (ndb_binlog_tables_inited)
1323 if (global_schema_lock_guard.lock(
false,
false))
1325 if (!ndb_schema_share &&
1326 ndbcluster_check_ndb_schema_share() == 0)
/* Try discovery first; create the table only if that gave no share. */
1328 ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
1329 if (!ndb_schema_share)
1331 ndb_schema_table__create(thd);
1333 if (!ndb_schema_share)
1337 if (!ndb_apply_status_share &&
1338 ndbcluster_check_ndb_apply_status_share() == 0)
1340 ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
1341 if (!ndb_apply_status_share)
1343 ndb_apply_table__create(thd);
1344 if (!ndb_apply_status_share)
1349 if (ndbcluster_find_all_databases(thd))
1354 if (!ndbcluster_find_all_files(thd))
1356 ndb_binlog_tables_inited= TRUE;
1357 if (ndb_binlog_tables_inited &&
1358 ndb_binlog_running && ndb_binlog_is_ready)
1360 if (opt_ndb_extra_logging)
1361 sql_print_information(
"NDB Binlog: ndb tables writable");
1362 close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE);
1368 ndb_notify_tables_writable();
1371 pthread_cond_signal(&injector_cond);
/*
  Column positions in the ndb_schema table.  They follow the column order
  used by ndb_schema_table__create(): db, name, slock, query, node_id, epoch,
  id, version, type.
*/
1382 #define SCHEMA_DB_I 0u
1383 #define SCHEMA_NAME_I 1u
1384 #define SCHEMA_SLOCK_I 2u
1385 #define SCHEMA_QUERY_I 3u
1386 #define SCHEMA_NODE_ID_I 4u
1387 #define SCHEMA_EPOCH_I 5u
1388 #define SCHEMA_ID_I 6u
1389 #define SCHEMA_VERSION_I 7u
1390 #define SCHEMA_TYPE_I 8u
/* Number of columns. */
1391 #define SCHEMA_SIZE 9u
/* Size of the slock column in bytes; matches its BINARY(32) definition. */
1392 #define SCHEMA_SLOCK_SIZE 32u
/*
  In-memory copy of one ndb_schema row.  Only two members are visible in this
  excerpt.
*/
1394 struct Cluster_schema
/* The slock column as 32-bit words (SCHEMA_SLOCK_SIZE bytes in total). */
1401 uint32 slock[SCHEMA_SLOCK_SIZE/4];
/* NOTE(review): the unpacking code assigns a `uint` blob length to this
   unsigned short -- confirm queries cannot exceed 65535 bytes. */
1402 unsigned short query_length;
/*
  Log that a table named in a binlog schema event could not be discovered,
  including the event's query, the originating node and my_errno, and then
  dump the thread's warning list to the log.
*/
1413 print_could_not_discover_error(THD *thd,
1414 const Cluster_schema *schema)
1416 sql_print_error(
"NDB Binlog: Could not discover table '%s.%s' from "
1417 "binlog schema event '%s' from node %d. "
1419 schema->db, schema->name, schema->query,
1420 schema->node_id, my_errno);
1421 print_warning_list(
"NDB Binlog", thd);
/*
  Body fragment of the routine that unpacks a schema row from the event
  data's shadow table into the structure `s`; its signature is outside this
  excerpt.  Blob values are fetched first, then the fields are copied in
  column order.  The statements that advance `field` between columns are not
  visible here.
*/
1431 TABLE *table= event_data->shadow_table;
1434 uchar* blobs_buffer= 0;
1435 uint blobs_buffer_size= 0;
/* Temporarily allow access to all columns of the table; restored at the end. */
1436 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1438 ptrdiff_t ptrdiff= 0;
1439 int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
1440 blobs_buffer, blobs_buffer_size,
1444 my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
1445 DBUG_PRINT(
"info", (
"blob read error"));
/* db: first byte is the length, the data follows; NUL-terminated in s. */
1450 field= table->field;
1451 s->db_length= *(uint8*)(*field)->ptr;
1452 DBUG_ASSERT(s->db_length <= (*field)->field_length);
1453 DBUG_ASSERT((*field)->field_length + 1 ==
sizeof(s->db));
1454 memcpy(s->db, (*field)->ptr + 1, s->db_length);
1455 s->db[s->db_length]= 0;
/* name: same layout as db. */
1458 s->name_length= *(uint8*)(*field)->ptr;
1459 DBUG_ASSERT(s->name_length <= (*field)->field_length);
1460 DBUG_ASSERT((*field)->field_length + 1 ==
sizeof(s->name));
1461 memcpy(s->name, (*field)->ptr + 1, s->name_length);
1462 s->name[s->name_length]= 0;
/* slock: fixed size, copied as is. */
1465 s->slock_length= (*field)->field_length;
1466 DBUG_ASSERT((*field)->field_length ==
sizeof(s->slock));
1467 memcpy(s->slock, (*field)->ptr, s->slock_length);
/* query: copied out of the blob with sql_strmake(). */
1472 uint blob_len= field_blob->get_length((*field)->ptr);
1474 field_blob->get_ptr(&blob_ptr);
1475 DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
1476 s->query_length= blob_len;
1477 s->query= sql_strmake((
char*) blob_ptr, blob_len);
1481 s->node_id= (Uint32)((
Field_long *)*field)->val_int();
1487 s->id= (Uint32)((
Field_long *)*field)->val_int();
1490 s->version= (Uint32)((
Field_long *)*field)->val_int();
1493 s->type= (Uint32)((
Field_long *)*field)->val_int();
1495 my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
1496 dbug_tmp_restore_column_map(table->read_set, old_map);
/*
  Pack a string into buf in the layout required by the column's array type.

  col  column whose array type selects the layout
  buf  destination buffer
  str  source bytes
  sz   number of bytes to copy

  Fixed arrays get the bytes as is.  Short var arrays get a one byte length
  followed by the bytes.  Medium var arrays get the bytes at offset 2; the
  write of the length prefix and the return statement are not visible in this
  excerpt.
*/
1502 char *ndb_pack_varchar(
const NDBCOL *col,
char *buf,
1503 const char *str,
int sz)
1505 switch (col->getArrayType())
1507 case NDBCOL::ArrayTypeFixed:
1508 memcpy(buf, str, sz);
1510 case NDBCOL::ArrayTypeShortVar:
1511 *(uchar*)buf= (uchar)sz;
1512 memcpy(buf + 1, str, sz);
1514 case NDBCOL::ArrayTypeMediumVar:
1516 memcpy(buf + 2, str, sz);
/*
  Acknowledge a schema operation by clearing this node's bit in the slock
  column of the ndb_schema row for db.table_name.

  The row is addressed by (db, name).  The visible code first reads the
  current slock bitmap, clears the bit of the local node id and then writes
  the bitmap back together with the node id and the type SOT_CLEAR_SLOCK.
  Failures are retried after do_retry_sleep(); when the error is kept a
  warning "Could not release lock on ..." is pushed to the client.  The
  transaction handling, the retry loop and parts of the parameter list are
  not visible in this excerpt.
*/
1526 ndbcluster_update_slock(THD *thd,
1528 const char *table_name,
1530 uint32 table_version)
1532 DBUG_ENTER(
"ndbcluster_update_slock");
1533 if (!ndb_schema_share)
1539 uint32 node_id= g_ndb_cluster_connection->node_id();
1540 Ndb *ndb= check_ndb_in_thd(thd);
1541 char save_db[FN_HEADLEN];
1544 char tmp_buf[FN_REFLEN];
1548 const NDBTAB *ndbtab= ndbtab_g.get_table();
1551 int retry_sleep= 30;
1552 const NDBCOL *col[SCHEMA_SIZE];
1553 unsigned sz[SCHEMA_SIZE];
/* Local bitmap backed by bitbuf; it receives the slock column value. */
1556 uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
1557 bitmap_init(&slock, bitbuf,
sizeof(bitbuf)*8,
false);
1568 for (i= 0; i < SCHEMA_SIZE; i++)
1571 if (i != SCHEMA_QUERY_I)
1574 DBUG_ASSERT(sz[i] <=
sizeof(tmp_buf));
1589 DBUG_ASSERT(r == 0);
1591 DBUG_ASSERT(r == 0);
/* First operation: address the row by (db, name) and read slock. */
1594 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1595 r|= op->
equal(SCHEMA_DB_I, tmp_buf);
1596 DBUG_ASSERT(r == 0);
1598 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1599 strlen(table_name));
1600 r|= op->
equal(SCHEMA_NAME_I, tmp_buf);
1601 DBUG_ASSERT(r == 0);
1603 r|= op->
getValue(SCHEMA_SLOCK_I, (
char*)slock.bitmap) == 0;
1604 DBUG_ASSERT(r == 0);
/* With very verbose logging the bitmap before and after the change is
   reported; `copy` keeps the value from before the bit is cleared. */
1609 if (opt_ndb_extra_logging > 19)
1611 uint32 copy[SCHEMA_SLOCK_SIZE/4];
1612 memcpy(copy, bitbuf,
sizeof(copy));
1613 bitmap_clear_bit(&slock, node_id);
1614 sql_print_information(
"NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
1616 table_id, table_version,
1623 bitmap_clear_bit(&slock, node_id);
1632 DBUG_ASSERT(r == 0);
1634 DBUG_ASSERT(r == 0);
/* Second operation: same key, write back slock, node id and type. */
1637 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1638 r|= op->
equal(SCHEMA_DB_I, tmp_buf);
1639 DBUG_ASSERT(r == 0);
1641 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1642 strlen(table_name));
1643 r|= op->
equal(SCHEMA_NAME_I, tmp_buf);
1644 DBUG_ASSERT(r == 0);
1646 r|= op->
setValue(SCHEMA_SLOCK_I, (
char*)slock.bitmap);
1647 DBUG_ASSERT(r == 0);
1649 r|= op->
setValue(SCHEMA_NODE_ID_I, node_id);
1650 DBUG_ASSERT(r == 0);
1652 r|= op->
setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
1653 DBUG_ASSERT(r == 0);
1658 DBUG_PRINT(
"info", (
"node %d cleared lock on '%s.%s'",
1659 node_id, db, table_name));
1664 const NdbError *this_error= trans ?
1672 do_retry_sleep(retry_sleep);
1676 ndb_error= this_error;
1683 my_snprintf(buf,
sizeof(buf),
"Could not release lock on '%s.%s'",
1685 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
1686 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
/*
  Write a progress line to the server log while waiting for a schema
  operation.

  The latest GCI of injector_ndb and the injector thread's proc_info are
  sampled under injector_mutex.  The log line reports the handled, received
  and latest epochs, each split into its high and low 32 bits.  Two variants
  of the message exist; the second also prints a bitmap ("map: ...").  The
  condition choosing between them and most of the parameter list are not
  visible in this excerpt.
*/
1699 static void ndb_report_waiting(
const char *key,
1705 ulonglong ndb_latest_epoch= 0;
1706 const char *proc_info=
"<no info>";
1707 pthread_mutex_lock(&injector_mutex);
1709 ndb_latest_epoch= injector_ndb->getLatestGCI();
1711 proc_info= injector_thd->proc_info;
1712 pthread_mutex_unlock(&injector_mutex);
1715 sql_print_information(
"NDB %s:"
1716 " waiting max %u sec for %s %s."
1717 " epochs: (%u/%u,%u/%u,%u/%u)"
1718 " injector proc_info: %s"
1719 ,key, the_time, op, obj
1720 ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
1721 ,(uint)(ndb_latest_handled_binlog_epoch)
1722 ,(uint)(ndb_latest_received_binlog_epoch >> 32)
1723 ,(uint)(ndb_latest_received_binlog_epoch)
1724 ,(uint)(ndb_latest_epoch >> 32)
1725 ,(uint)(ndb_latest_epoch)
1731 sql_print_information(
"NDB %s:"
1732 " waiting max %u sec for %s %s."
1733 " epochs: (%u/%u,%u/%u,%u/%u)"
1734 " injector proc_info: %s map: %x%x"
1735 ,key, the_time, op, obj
1736 ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
1737 ,(uint)(ndb_latest_handled_binlog_epoch)
1738 ,(uint)(ndb_latest_received_binlog_epoch >> 32)
1739 ,(uint)(ndb_latest_received_binlog_epoch)
1740 ,(uint)(ndb_latest_epoch >> 32)
1741 ,(uint)(ndb_latest_epoch)
/*
  Map a schema operation type (SOT_* value) to its name as a string literal.
  The switch statement, some cases and the fallback value are not visible in
  this excerpt.
*/
1751 get_schema_type_name(uint type)
1754 case SOT_DROP_TABLE:
1755 return "DROP_TABLE";
1756 case SOT_CREATE_TABLE:
1757 return "CREATE_TABLE";
1758 case SOT_RENAME_TABLE_NEW:
1759 return "RENAME_TABLE_NEW";
1760 case SOT_ALTER_TABLE_COMMIT:
1761 return "ALTER_TABLE_COMMIT";
1768 case SOT_CLEAR_SLOCK:
1769 return "CLEAR_SLOCK";
1770 case SOT_TABLESPACE:
1771 return "TABLESPACE";
1772 case SOT_LOGFILE_GROUP:
1773 return "LOGFILE_GROUP";
1774 case SOT_RENAME_TABLE:
1775 return "RENAME_TABLE";
1776 case SOT_TRUNCATE_TABLE:
1777 return "TRUNCATE_TABLE";
1778 case SOT_RENAME_TABLE_PREPARE:
1779 return "RENAME_TABLE_PREPARE";
1780 case SOT_ONLINE_ALTER_TABLE_PREPARE:
1781 return "ONLINE_ALTER_TABLE_PREPARE";
1782 case SOT_ONLINE_ALTER_TABLE_COMMIT:
1783 return "ONLINE_ALTER_TABLE_COMMIT";
1784 case SOT_CREATE_USER:
1785 return "CREATE_USER";
1788 case SOT_RENAME_USER:
1789 return "RENAME_USER";
1798 extern void update_slave_api_stats(
Ndb*);
/*
  ndbcluster_log_schema_op

  Logs a schema operation of the given 'type' for db.table_name so that it
  is distributed to other mysqld servers (see the "Could not log query ...
  on other mysqld's" and "distributing ..." messages below), and then waits
  on injector_cond until the subscriber bitmap of the schema object has
  been cleared or the wait times out.

  Parameters visible in the signature:
    thd                 current connection
    query/query_length  statement text that is stored in the schema row
    db/table_name       object the operation applies to
    ndb_table_id/ndb_table_version  stored in the schema object and the row
    type                SCHEMA_OP_TYPE, stored in the row as log_type
    new_db/new_table_name  their use is not visible in this listing;
                        presumably the rename target - TODO confirm

  NOTE(review): the embedded line numbers are not contiguous, i.e. this
  listing omits source lines (braces, returns, some statements). The
  comments below describe only what the visible lines do.
*/
1800 int ndbcluster_log_schema_op(THD *thd,
1801 const char *query,
int query_length,
1802 const char *db,
const char *table_name,
1803 uint32 ndb_table_id,
1804 uint32 ndb_table_version,
1805 enum SCHEMA_OP_TYPE type,
1806 const char *new_db,
const char *new_table_name)
1808 DBUG_ENTER(
"ndbcluster_log_schema_op");
/* Fetch the Thd_ndb attached to thd; the visible fallback seizes a new
   one, reports an allocation failure, and attaches it to thd. */
1809 Thd_ndb *thd_ndb= get_thd_ndb(thd);
1812 if (!(thd_ndb= Thd_ndb::seize(thd)))
1814 sql_print_error(
"Could not allocate Thd_ndb object");
1817 thd_set_thd_ndb(thd, thd_ndb);
1821 (
"query: %s db: %s table_name: %s thd_ndb->options: %d",
1822 query, db, table_name, thd_ndb->options));
/* Branch taken when there is no schema share or TNO_NO_LOG_SCHEMA_OP is
   set: a slave thread updates its API statistics here (the remainder of
   the branch is not visible in this listing). */
1823 if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
1825 if (thd->slave_thread)
1826 update_slave_api_stats(thd_ndb->ndb);
/* Scratch buffers: quoted_* receive quoted identifiers and tmp_buf2 the
   statement text that is rebuilt from them below. */
1831 char tmp_buf2[FN_REFLEN];
1832 char quoted_table1[2 + 2 * FN_REFLEN + 1];
1833 char quoted_db1[2 + 2 * FN_REFLEN + 1];
1834 char quoted_db2[2 + 2 * FN_REFLEN + 1];
1835 char quoted_table2[2 + 2 * FN_REFLEN + 1];
1837 const char *type_str;
1838 int also_internal= 0;
1839 uint32 log_type= (uint32)type;
/* Per operation type: pick the type_str used in log messages. For
   DROP TABLE and RENAME TABLE the statement is additionally rebuilt into
   tmp_buf2 from quoted identifiers and query_length is recomputed. */
1842 case SOT_DROP_TABLE:
1844 if (thd->lex->sql_command == SQLCOM_DROP_DB)
1848 id_length= my_strmov_quoted_identifier (thd, (
char *) quoted_table1,
1850 quoted_table1[id_length]=
'\0';
1851 id_length= my_strmov_quoted_identifier (thd, (
char *) quoted_db1,
1853 quoted_db1[id_length]=
'\0';
1854 query_length= (uint) (strxmov(tmp_buf2,
"drop table ", quoted_db1,
".",
1855 quoted_table1, NullS) - tmp_buf2);
1856 type_str=
"drop table";
1858 case SOT_RENAME_TABLE_PREPARE:
1859 type_str=
"rename table prepare";
1862 case SOT_RENAME_TABLE:
1865 id_length= my_strmov_quoted_identifier (thd, (
char *) quoted_db1,
1867 quoted_db1[id_length]=
'\0';
1868 id_length= my_strmov_quoted_identifier (thd, (
char *) quoted_table1,
1870 quoted_table1[id_length]=
'\0';
1871 id_length= my_strmov_quoted_identifier (thd, (
char *) quoted_db2,
1873 quoted_db2[id_length]=
'\0';
1874 id_length= my_strmov_quoted_identifier (thd, (
char *) quoted_table2,
1876 quoted_table2[id_length]=
'\0';
1877 query_length= (uint) (strxmov(tmp_buf2,
"rename table ",
1878 quoted_db1,
".", quoted_table1,
" to ",
1879 quoted_db2,
".", quoted_table2, NullS) - tmp_buf2);
1880 type_str=
"rename table";
1882 case SOT_CREATE_TABLE:
1883 type_str=
"create table";
1885 case SOT_ALTER_TABLE_COMMIT:
1886 type_str=
"alter table";
1889 case SOT_ONLINE_ALTER_TABLE_PREPARE:
1890 type_str=
"online alter table prepare";
1893 case SOT_ONLINE_ALTER_TABLE_COMMIT:
1894 type_str=
"online alter table commit";
1898 type_str=
"drop db";
1901 type_str=
"create db";
1904 type_str=
"alter db";
1906 case SOT_TABLESPACE:
1907 type_str=
"tablespace";
1909 case SOT_LOGFILE_GROUP:
1910 type_str=
"logfile group";
1912 case SOT_TRUNCATE_TABLE:
1913 type_str=
"truncate table";
1915 case SOT_CREATE_USER:
1916 type_str=
"create user";
1919 type_str=
"drop user";
1921 case SOT_RENAME_USER:
1922 type_str=
"rename user";
1925 type_str=
"grant/revoke";
1928 type_str=
"revoke all";
/* Build the filename-style key for db.table_name, obtain the schema
   object for that key and record the table id/version in it. */
1934 NDB_SCHEMA_OBJECT *ndb_schema_object;
1936 char key[FN_REFLEN + 1];
1937 build_table_filename(key,
sizeof(key) - 1, db, table_name,
"", 0);
1938 ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
1939 ndb_schema_object->table_id= ndb_table_id;
1940 ndb_schema_object->table_version= ndb_table_version;
1944 uint32 node_id= g_ndb_cluster_connection->node_id();
1948 int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
/* Under ndb_schema_share_mutex: if the schema share has gone away the
   schema object is released; otherwise the subscriber bitmaps of all
   storage nodes are OR-ed into slock_bitmap while holding the share's
   own mutex. */
1951 pthread_mutex_lock(&ndb_schema_share_mutex);
1952 if (ndb_schema_share == 0)
1954 pthread_mutex_unlock(&ndb_schema_share_mutex);
1955 if (ndb_schema_object)
1956 ndb_free_schema_object(&ndb_schema_object, FALSE);
1959 pthread_mutex_lock(&ndb_schema_share->mutex);
1960 for (i= 0; i < no_storage_nodes; i++)
1962 bitmap_union(&ndb_schema_object->slock_bitmap,
1963 &ndb_schema_share->subscriber_bitmap[i]);
1965 pthread_mutex_unlock(&ndb_schema_share->mutex);
1966 pthread_mutex_unlock(&ndb_schema_share_mutex);
/* This server's own bit is either set or cleared in slock_bitmap; the
   condition selecting between the two is not visible in this listing. */
1970 bitmap_set_bit(&ndb_schema_object->slock_bitmap, node_id);
1972 bitmap_clear_bit(&ndb_schema_object->slock_bitmap, node_id);
1974 DBUG_DUMP(
"schema_subscribers", (uchar*)&ndb_schema_object->slock,
1975 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
1978 Ndb *ndb= thd_ndb->ndb;
1979 char save_db[FN_REFLEN];
1982 char tmp_buf[FN_REFLEN];
1986 const NDBTAB *ndbtab= ndbtab_g.get_table();
1989 int retry_sleep= 30;
1990 const NDBCOL *col[SCHEMA_SIZE];
1991 unsigned sz[SCHEMA_SIZE];
/* True for every target except NDB_REP_DB.NDB_SCHEMA_TABLE itself. */
1995 if (strcmp(NDB_REP_DB, db) != 0 ||
1996 strcmp(NDB_SCHEMA_TABLE, table_name))
/* Per-column setup; every column except the query column is checked to
   fit in tmp_buf. */
2005 for (i= 0; i < SCHEMA_SIZE; i++)
2008 if (i != SCHEMA_QUERY_I)
2011 DBUG_ASSERT(sz[i] <=
sizeof(tmp_buf));
2018 const char *log_db= db;
2020 const char *log_subscribers= (
char*)ndb_schema_object->slock;
2028 DBUG_ASSERT(r == 0);
2030 DBUG_ASSERT(r == 0);
/* Fill in the schema row. db and name are packed as varchar and bound
   with equal(); slock, query (blob), node id, epoch, table id, version
   and type are written with setValue(). r accumulates any non-zero
   result and is asserted to be 0 after each step. */
2033 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
2034 r|= op->
equal(SCHEMA_DB_I, tmp_buf);
2035 DBUG_ASSERT(r == 0);
2037 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
2039 r|= op->
equal(SCHEMA_NAME_I, tmp_buf);
2040 DBUG_ASSERT(r == 0);
2042 DBUG_ASSERT(sz[SCHEMA_SLOCK_I] ==
2043 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
2044 r|= op->
setValue(SCHEMA_SLOCK_I, log_subscribers);
2045 DBUG_ASSERT(r == 0);
2049 DBUG_ASSERT(ndb_blob != 0);
2050 uint blob_len= query_length;
2051 const char* blob_ptr= query;
2052 r|= ndb_blob->
setValue(blob_ptr, blob_len);
2053 DBUG_ASSERT(r == 0);
2056 r|= op->
setValue(SCHEMA_NODE_ID_I, node_id);
2057 DBUG_ASSERT(r == 0);
2059 r|= op->
setValue(SCHEMA_EPOCH_I, epoch);
2060 DBUG_ASSERT(r == 0);
2062 r|= op->
setValue(SCHEMA_ID_I, ndb_table_id);
2063 DBUG_ASSERT(r == 0);
2065 r|= op->
setValue(SCHEMA_VERSION_I, ndb_table_version);
2066 DBUG_ASSERT(r == 0);
2068 r|= op->
setValue(SCHEMA_TYPE_I, log_type);
2069 DBUG_ASSERT(r == 0);
/* Choose the anyValue attached to the operation: for a non-slave thread
   it is marked "normal" when OPTION_BIN_LOG is set and "nologging"
   otherwise; the replicated case uses the unmasked server id of thd. */
2071 Uint32 anyValue = 0;
2072 if (! thd->slave_thread)
2077 if (thd_options(thd) & OPTION_BIN_LOG)
2079 DBUG_PRINT(
"info", (
"Schema event for binlogging"));
2080 ndbcluster_anyvalue_set_normal(anyValue);
2084 DBUG_PRINT(
"info", (
"Schema event not for binlogging"));
2085 ndbcluster_anyvalue_set_nologging(anyValue);
2100 DBUG_PRINT(
"info", (
"Replicated schema event with original server id %d",
2102 anyValue = thd_unmasked_server_id(thd);
/* Test hook: if NDB_TEST_ANYVALUE_USERDATA is set in the environment,
   is non-empty and does not start with '0', 'n' or 'N', the user bits
   of anyValue are set as well. */
2110 const char* p = getenv(
"NDB_TEST_ANYVALUE_USERDATA");
2111 if (p != 0 && *p != 0 && *p !=
'0' && *p !=
'n' && *p !=
'N')
2113 dbug_ndbcluster_anyvalue_set_userbits(anyValue);
2117 r|= op->setAnyValue(anyValue);
2118 DBUG_ASSERT(r == 0);
2124 DBUG_PRINT(
"info", (
"logged: %s", query));
/* Error path: do_retry_sleep(retry_sleep) is called (presumably between
   retries of a temporary error - TODO confirm), the error is remembered
   in ndb_error and reported to the client as a warning. */
2129 const NdbError *this_error= trans ?
2137 do_retry_sleep(retry_sleep);
2141 ndb_error= this_error;
2146 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
2147 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2150 "Could not log query '%s' on other mysqld's");
2156 if (opt_ndb_extra_logging > 19)
2158 sql_print_information(
"NDB: distributed %s.%s(%u/%u) type: %s(%u) query: \'%s\' to %x%x",
2163 get_schema_type_name(log_type),
2166 ndb_schema_object->slock_bitmap.bitmap[0],
2167 ndb_schema_object->slock_bitmap.bitmap[1]);
/* Wait for the other servers only when no error was recorded and at
   least one bit is still set in slock_bitmap. */
2173 if (ndb_error == 0 && !bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
2175 int max_timeout= DEFAULT_SYNC_TIMEOUT;
2176 pthread_mutex_lock(&ndb_schema_object->mutex);
/* Each wait on injector_cond is bounded by set_timespec(abstime, 1), so
   the subscriber set is re-evaluated after every wake-up. */
2181 int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
2182 set_timespec(abstime, 1);
2183 int ret= pthread_cond_timedwait(&injector_cond,
2184 &ndb_schema_object->mutex,
2190 pthread_mutex_lock(&ndb_schema_share_mutex);
2191 if (ndb_schema_share == 0)
2193 pthread_mutex_unlock(&ndb_schema_share_mutex);
/* Build the set of servers that are currently subscribed (plus this
   node) and intersect slock_bitmap with it, so bits of servers that are
   no longer subscribed are dropped from the wait set. */
2197 bitmap_init(&servers, 0, 256, FALSE);
2198 bitmap_clear_all(&servers);
2199 bitmap_set_bit(&servers, node_id);
2200 pthread_mutex_lock(&ndb_schema_share->mutex);
2201 for (i= 0; i < no_storage_nodes; i++)
2204 MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[
i];
2205 bitmap_union(&servers, tmp);
2207 pthread_mutex_unlock(&ndb_schema_share->mutex);
2208 pthread_mutex_unlock(&ndb_schema_share_mutex);
2212 bitmap_intersect(&ndb_schema_object->slock_bitmap, &servers);
2213 bitmap_free(&servers);
/* No bits left in the wait set. */
2215 if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
/* max_timeout reaching zero is logged as a timeout and then ignored. */
2221 if (max_timeout == 0)
2223 sql_print_error(
"NDB %s: distributing %s timed out. Ignoring...",
2224 type_str, ndb_schema_object->key);
2228 if (opt_ndb_extra_logging)
2229 ndb_report_waiting(type_str, max_timeout,
2230 "distributing", ndb_schema_object->key,
2231 &ndb_schema_object->slock_bitmap);
2234 pthread_mutex_unlock(&ndb_schema_object->mutex);
2238 sql_print_error(
"NDB %s: distributing %s err: %u",
2239 type_str, ndb_schema_object->key,
2242 else if (opt_ndb_extra_logging > 19)
2244 sql_print_information(
"NDB %s: not waiting for distributing %s",
2245 type_str, ndb_schema_object->key);
/* Release this function's reference to the schema object. */
2248 if (ndb_schema_object)
2249 ndb_free_schema_object(&ndb_schema_object, FALSE);
2251 if (opt_ndb_extra_logging > 19)
2253 sql_print_information(
"NDB: distribution of %s.%s(%u/%u) type: %s(%u) query: \'%s\'"
2259 get_schema_type_name(log_type),
2264 if (thd->slave_thread)
2265 update_slave_api_stats(ndb);
/*
  ndb_handle_schema_change

  NOTE(review): the signature of this function is not present in this
  listing; the name is taken from the DBUG_ENTER string. The visible code
  uses thd, pOp and event_data.

  Visible behaviour: invalidates the cached NDB table definition, signals
  injector_cond, releases share references under ndbcluster_mutex, detaches
  the custom data from pOp and, when do_close_cached_tables was set, closes
  the cached table identified by the shadow table's db/name.
*/
2278 DBUG_ENTER(
"ndb_handle_schema_change");
2280 TABLE *shadow_table= event_data->shadow_table;
2281 const char *tabname= shadow_table->s->table_name.str;
2282 const char *dbname= shadow_table->s->db.str;
2283 bool do_close_cached_tables= FALSE;
/* The change counts as remote when the requesting node id is not one of
   this server's own node ids. */
2284 bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
2293 Thd_ndb *thd_ndb= get_thd_ndb(thd);
2294 Ndb *ndb= thd_ndb->ndb;
2299 const NDBTAB *cache_tab= ndbtab_g.get_table();
2303 ndbtab_g.invalidate();
2306 pthread_mutex_lock(&share->mutex);
2307 DBUG_ASSERT(share->state == NSS_DROPPED ||
2308 share->op == pOp || share->new_op == pOp);
2320 pthread_mutex_unlock(&share->mutex);
2321 (void) pthread_cond_signal(&injector_cond);
2323 pthread_mutex_lock(&ndbcluster_mutex);
2325 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog free use_count: %u",
2326 share->key, share->use_count));
/* free_share() takes the address of 'share'; the following test of
   'share' suggests it may null the pointer - TODO confirm. */
2327 free_share(&share, TRUE);
2328 if (is_remote_change && share && share->state != NSS_DROPPED)
2330 DBUG_PRINT(
"info", (
"remote change"));
2331 share->state= NSS_DROPPED;
/* NOTE(review): lines between this test and the assignment below are
   not visible, so which branch sets do_close_cached_tables is unclear. */
2332 if (share->use_count != 1)
2336 do_close_cached_tables= TRUE;
2341 DBUG_PRINT(
"NDB_SHARE", (
"%s create free use_count: %u",
2342 share->key, share->use_count));
2343 free_share(&share, TRUE);
2349 pthread_mutex_unlock(&ndbcluster_mutex);
2354 pOp->setCustomData(NULL);
2357 pthread_mutex_lock(&injector_mutex);
2360 pthread_mutex_unlock(&injector_mutex);
/* Close the cached table named by the shadow table's db/table name. */
2362 if (do_close_cached_tables)
2365 memset(&table_list, 0,
sizeof(table_list));
2366 table_list.db= (
char *)dbname;
2367 table_list.alias= table_list.table_name= (
char *)tabname;
2368 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2370 DBUG_PRINT(
"NDB_SHARE", (
"%s create free use_count: %u",
2371 share->key, share->use_count));
/*
  ndb_binlog_query

  Writes schema->query to the binary log through thd->binlog_query(),
  temporarily replacing thd->server_id and thd->db with values taken from
  the schema event and restoring them afterwards.

  NOTE(review): this listing omits source lines (braces, returns), so the
  exact branch structure around the any_value checks is not visible.
*/
2377 static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
/* Reserved any_value: a warning is printed unless it is the "nologging"
   marker. */
2382 if (ndbcluster_anyvalue_is_reserved(schema->any_value))
2385 if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
2386 sql_print_warning(
"NDB: unknown value for binlog signalling 0x%X, "
2392 Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
2401 Uint32 loggedServerId = schema->any_value;
/* The server id part of loggedServerId is replaced with this server's
   ::server_id; the relation of this call to the g_ndb_log_slave_updates
   test is not visible here. */
2410 if (! g_ndb_log_slave_updates)
2419 ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
/* Save thd->server_id and thd->db, switch to the event's values for the
   binlog_query() call, then restore. */
2422 uint32 thd_server_id_save= thd->server_id;
2423 DBUG_ASSERT(
sizeof(thd_server_id_save) ==
sizeof(thd->server_id));
2424 char *thd_db_save= thd->db;
2425 thd->server_id = loggedServerId;
2426 thd->db= schema->db;
2427 int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
2428 thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
2429 schema->query_length, FALSE,
2430 #ifdef NDB_THD_BINLOG_QUERY_HAS_DIRECT
2433 schema->name[0] == 0 || thd->db[0] == 0,
2435 thd->server_id= thd_server_id_save;
2436 thd->db= thd_db_save;
/*
  ndb_binlog_thread_handle_schema_event

  Handles one event received for the schema share. For a schema row the
  row is decoded into a Cluster_schema and dispatched on its type: some
  types are queued on post_epoch_log_list / post_epoch_unlock_list, others
  are applied directly (run_query, discover table, flush privileges).
  The remaining visible branches handle cluster failure, node failure and
  subscribe/unsubscribe notifications by updating subscriber bitmaps.

  NOTE(review): the return type, several parameters and many lines of the
  body (braces, case labels, breaks) are not present in this listing.
*/
2440 ndb_binlog_thread_handle_schema_event(THD *thd,
Ndb *s_ndb,
2443 *post_epoch_log_list,
2445 *post_epoch_unlock_list,
2448 DBUG_ENTER(
"ndb_binlog_thread_handle_schema_event");
/* Only events whose share is the current ndb_schema_share are handled by
   the code below. */
2450 NDB_SHARE *tmp_share= event_data->share;
2451 if (tmp_share && ndb_schema_share == tmp_share)
2454 DBUG_PRINT(
"enter", (
"%s.%s ev_type: %d",
2455 tmp_share->db, tmp_share->table_name, ev_type));
2459 Thd_ndb *thd_ndb= get_thd_ndb(thd);
2460 Ndb *ndb= thd_ndb->ndb;
2462 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
/* Allocate the decoded row with sql_alloc() and view schema->slock as a
   bitmap of 8*SCHEMA_SLOCK_SIZE bits. */
2463 Cluster_schema *schema= (Cluster_schema *)
2464 sql_alloc(
sizeof(Cluster_schema));
2466 bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
2467 uint node_id= g_ndb_cluster_connection->node_id();
2469 ndbcluster_get_schema(event_data, schema);
2472 enum SCHEMA_OP_TYPE schema_type= (
enum SCHEMA_OP_TYPE)schema->type;
2474 (
"%s.%s: log query_length: %d query: '%s' type: %d",
2475 schema->db, schema->name,
2476 schema->query_length, schema->query,
2479 if (opt_ndb_extra_logging > 19)
2481 sql_print_information(
"NDB: got schema event on %s.%s(%u/%u) query: '%s' type: %s(%d) node: %u slock: %x%x",
2482 schema->db, schema->name,
2483 schema->id, schema->version,
2485 get_schema_type_name(schema_type),
2488 slock.bitmap[0], slock.bitmap[1]);
/* Rows with both db and name empty. */
2491 if ((schema->db[0] == 0) && (schema->name[0] == 0))
/* First dispatch: these types are deferred to the post-epoch lists. */
2493 switch (schema_type)
2495 case SOT_CLEAR_SLOCK:
2501 post_epoch_log_list->push_back(schema, mem_root);
2503 case SOT_ALTER_TABLE_COMMIT:
2505 case SOT_RENAME_TABLE_PREPARE:
2507 case SOT_ONLINE_ALTER_TABLE_PREPARE:
2509 case SOT_ONLINE_ALTER_TABLE_COMMIT:
2510 post_epoch_log_list->push_back(schema, mem_root);
2511 post_epoch_unlock_list->push_back(schema, mem_root);
/* Events whose originating node id differs from this server's node id. */
2518 if (schema->node_id != node_id)
2520 int log_query= 0, post_epoch_unlock= 0;
2521 char errmsg[MYSQL_ERRMSG_SIZE];
2523 switch (schema_type)
2525 case SOT_RENAME_TABLE:
2527 case SOT_RENAME_TABLE_NEW:
2529 uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
2530 "NDB Binlog: Skipping renaming locally "
2531 "defined table '%s.%s' from binlog schema "
2532 "event '%s' from node %d. ",
2533 schema->db, schema->name, schema->query,
2538 case SOT_DROP_TABLE:
2539 if (schema_type == SOT_DROP_TABLE)
2541 uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
2542 "NDB Binlog: Skipping dropping locally "
2543 "defined table '%s.%s' from binlog schema "
2544 "event '%s' from node %d. ",
2545 schema->db, schema->name, schema->query,
/* The statement is executed locally only when the table is not a locally
   defined table; otherwise the prepared errmsg is logged. */
2549 if (! ndbcluster_check_if_local_table(schema->db, schema->name))
2551 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2552 const int no_print_error[2]=
2553 {ER_BAD_TABLE_ERROR, 0};
2554 run_query(thd, schema->query,
2555 schema->query + schema->query_length,
2558 post_epoch_log_list->push_back(schema, mem_root);
2560 post_epoch_unlock= 1;
2565 DBUG_PRINT(
"info", (
"%s", errmsg));
2566 sql_print_error(
"%s", errmsg);
2570 case SOT_TRUNCATE_TABLE:
2572 char key[FN_REFLEN + 1];
2573 build_table_filename(key,
sizeof(key) - 1,
2574 schema->db, schema->name,
"", 0);
2576 NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
2579 DBUG_PRINT(
"NDB_SHARE", (
"%s temporary use_count: %u",
2580 share->key, share->use_count));
2583 if (!share || !share->op)
2588 ndbtab_g.invalidate();
2591 memset(&table_list, 0,
sizeof(table_list));
2592 table_list.db= schema->db;
2593 table_list.alias= table_list.table_name= schema->name;
2594 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2599 DBUG_PRINT(
"NDB_SHARE", (
"%s temporary free use_count: %u",
2600 share->key, share->use_count));
2604 if (schema_type != SOT_TRUNCATE_TABLE)
/* CREATE TABLE: a locally defined table is skipped with an error message,
   otherwise the table is created from the engine's definition. */
2607 case SOT_CREATE_TABLE:
2608 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2609 if (ndbcluster_check_if_local_table(schema->db, schema->name))
2611 DBUG_PRINT(
"info", (
"NDB Binlog: Skipping locally defined table '%s.%s'",
2612 schema->db, schema->name));
2613 sql_print_error(
"NDB Binlog: Skipping locally defined table '%s.%s' from "
2614 "binlog schema event '%s' from node %d. ",
2615 schema->db, schema->name, schema->query,
2618 else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
2620 print_could_not_discover_error(thd, schema);
/* Database-level statement (the case label is not visible; the message
   below indicates drop database): executed only when the database holds
   no local tables. */
2626 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2627 if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
2629 const int no_print_error[1]= {0};
2630 run_query(thd, schema->query,
2631 schema->query + schema->query_length,
2634 post_epoch_log_list->push_back(schema, mem_root);
2636 post_epoch_unlock= 1;
2641 sql_print_error(
"NDB Binlog: Skipping drop database '%s' since it contained local tables "
2642 "binlog schema event '%s' from node %d. ",
2643 schema->db, schema->query,
2649 if (opt_ndb_extra_logging > 9)
2650 sql_print_information(
"SOT_CREATE_DB %s", schema->db);
2655 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2656 const int no_print_error[1]= {0};
2657 run_query(thd, schema->query,
2658 schema->query + schema->query_length,
/* User/privilege events: the visible code prepares a "flush privileges"
   command. */
2663 case SOT_CREATE_USER:
2665 case SOT_RENAME_USER:
2669 if (opt_ndb_extra_logging > 9)
2670 sql_print_information(
"Got dist_priv event: %s, "
2671 "flushing privileges",
2672 get_schema_type_name(schema_type));
2674 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2675 const int no_print_error[1]= {0};
2676 char *cmd= (
char *)
"flush privileges";
2683 case SOT_TABLESPACE:
2684 case SOT_LOGFILE_GROUP:
2687 case SOT_ALTER_TABLE_COMMIT:
2688 case SOT_RENAME_TABLE_PREPARE:
2689 case SOT_ONLINE_ALTER_TABLE_PREPARE:
2690 case SOT_ONLINE_ALTER_TABLE_COMMIT:
2691 case SOT_CLEAR_SLOCK:
/* Write the statement to the binlog when requested and binlogging is
   running. */
2694 if (log_query && ndb_binlog_running)
2695 ndb_binlog_query(thd, schema);
/* If this server's bit is set in the row's slock: the schema is queued on
   post_epoch_unlock_list when post_epoch_unlock is set; the slock update
   below is presumably the alternative branch (the 'else' is not visible
   in this listing - TODO confirm). */
2697 DBUG_DUMP(
"slock", (uchar*) schema->slock, schema->slock_length);
2698 if (bitmap_is_set(&slock, node_id))
2700 if (post_epoch_unlock)
2701 post_epoch_unlock_list->push_back(schema, mem_root);
2703 ndbcluster_update_slock(thd, schema->db, schema->name,
2704 schema->id, schema->version);
/* Cluster failure: log it, then release ndb_schema_share and reset the
   binlog "inited"/"ready" flags under ndb_schema_share_mutex. */
2718 if (opt_ndb_extra_logging)
2719 sql_print_information(
"NDB Binlog: cluster failure for %s at epoch %u/%u.",
2720 ndb_schema_share->key,
2721 (uint)(pOp->
getGCI() >> 32),
2725 if (opt_ndb_extra_logging &&
2726 ndb_binlog_tables_inited && ndb_binlog_running)
2727 sql_print_information(
"NDB Binlog: ndb tables initially "
2728 "read only on reconnect.");
2731 pthread_mutex_lock(&ndb_schema_share_mutex);
2733 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog extra free use_count: %u",
2734 ndb_schema_share->key,
2735 ndb_schema_share->use_count));
2736 free_share(&ndb_schema_share);
2737 ndb_schema_share= 0;
2738 ndb_binlog_tables_inited= FALSE;
2739 ndb_binlog_is_ready= FALSE;
2740 pthread_mutex_unlock(&ndb_schema_share_mutex);
2743 close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
2746 ndb_handle_schema_change(thd, s_ndb, pOp, event_data);
/* Node failure: clear every subscriber recorded for the failed data node
   (index looked up through g_node_id_map) and wake waiters. */
2750 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2751 DBUG_ASSERT(node_id != 0xFF);
2752 pthread_mutex_lock(&tmp_share->mutex);
2753 bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
2754 DBUG_PRINT(
"info",(
"NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
2755 if (opt_ndb_extra_logging)
2757 sql_print_information(
"NDB Binlog: Node: %d, down,"
2758 " Subscriber bitmask %x%x",
2759 pOp->getNdbdNodeId(),
2760 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2761 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2763 pthread_mutex_unlock(&tmp_share->mutex);
2764 (void) pthread_cond_signal(&injector_cond);
/* Subscribe: set the requesting node's bit in the data node's subscriber
   bitmap and wake waiters. */
2769 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2770 uint8 req_id= pOp->getReqNodeId();
2771 DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2772 pthread_mutex_lock(&tmp_share->mutex);
2773 bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
2774 DBUG_PRINT(
"info",(
"SUBSCRIBE[%d] %d", node_id, req_id));
2775 if (opt_ndb_extra_logging)
2777 sql_print_information(
"NDB Binlog: Node: %d, subscribe from node %d,"
2778 " Subscriber bitmask %x%x",
2779 pOp->getNdbdNodeId(),
2781 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2782 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2784 pthread_mutex_unlock(&tmp_share->mutex);
2785 (void) pthread_cond_signal(&injector_cond);
/* Unsubscribe: clear the requesting node's bit and wake waiters. */
2790 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2791 uint8 req_id= pOp->getReqNodeId();
2792 DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2793 pthread_mutex_lock(&tmp_share->mutex);
2794 bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
2795 DBUG_PRINT(
"info",(
"UNSUBSCRIBE[%d] %d", node_id, req_id));
2796 if (opt_ndb_extra_logging)
2798 sql_print_information(
"NDB Binlog: Node: %d, unsubscribe from node %d,"
2799 " Subscriber bitmask %x%x",
2800 pOp->getNdbdNodeId(),
2802 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2803 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2805 pthread_mutex_unlock(&tmp_share->mutex);
2806 (void) pthread_cond_signal(&injector_cond);
/* Any other event type is reported and ignored. */
2810 sql_print_error(
"NDB Binlog: unknown non data event %d for %s. "
2811 "Ignoring...", (
unsigned) ev_type, tmp_share->key);
/*
  ndb_binlog_thread_handle_schema_event_post_epoch

  Drains post_epoch_log_list, handling each queued Cluster_schema according
  to its type (clear slock, drop/rename/alter table, online alter
  prepare/commit, ...), optionally writes the statement to the binlog, and
  finally drains post_epoch_unlock_list, updating the slock for each entry.

  NOTE(review): the return type and many body lines (braces, case labels,
  breaks) are not present in this listing.
*/
2822 ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
2824 *post_epoch_log_list,
2826 *post_epoch_unlock_list)
/* Checked before anything else: an empty log list. */
2828 if (post_epoch_log_list->elements == 0)
2830 DBUG_ENTER(
"ndb_binlog_thread_handle_schema_event_post_epoch");
2831 Cluster_schema *schema;
2832 Thd_ndb *thd_ndb= get_thd_ndb(thd);
2833 Ndb *ndb= thd_ndb->ndb;
2835 while ((schema= post_epoch_log_list->pop()))
2837 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
2839 (
"%s.%s: log query_length: %d query: '%s' type: %d",
2840 schema->db, schema->name,
2841 schema->query_length, schema->query,
2845 enum SCHEMA_OP_TYPE schema_type= (
enum SCHEMA_OP_TYPE)schema->type;
2846 char key[FN_REFLEN + 1];
2847 build_table_filename(key,
sizeof(key) - 1, schema->db, schema->name,
"", 0);
/* CLEAR_SLOCK: find the schema object for this key under
   ndbcluster_mutex; if its table id and version match the event, copy
   the event's slock into it and signal injector_cond. Non-matching or
   missing objects are only logged at high verbosity. */
2848 if (schema_type == SOT_CLEAR_SLOCK)
2850 pthread_mutex_lock(&ndbcluster_mutex);
2851 NDB_SCHEMA_OBJECT *ndb_schema_object=
2852 (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
2853 (
const uchar*) key, strlen(key));
2854 if (ndb_schema_object &&
2855 (ndb_schema_object->table_id == schema->id &&
2856 ndb_schema_object->table_version == schema->version))
2858 pthread_mutex_lock(&ndb_schema_object->mutex);
2859 if (opt_ndb_extra_logging > 19)
2861 sql_print_information(
"NDB: CLEAR_SLOCK key: %s(%u/%u) from"
2863 key, schema->id, schema->version,
2864 ndb_schema_object->slock[0],
2865 ndb_schema_object->slock[1],
2869 memcpy(ndb_schema_object->slock, schema->slock,
2870 sizeof(ndb_schema_object->slock));
2871 DBUG_DUMP(
"ndb_schema_object->slock_bitmap.bitmap",
2872 (uchar*)ndb_schema_object->slock_bitmap.bitmap,
2873 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
2874 pthread_mutex_unlock(&ndb_schema_object->mutex);
2875 pthread_cond_signal(&injector_cond);
2877 else if (opt_ndb_extra_logging > 19)
2879 if (ndb_schema_object == 0)
2881 sql_print_information(
"NDB: Discarding event...no obj: %s (%u/%u)",
2882 key, schema->id, schema->version);
2886 sql_print_information(
"NDB: Discarding event...key: %s "
2887 "non matching id/version [%u/%u] != [%u/%u]",
2889 ndb_schema_object->table_id,
2890 ndb_schema_object->table_version,
2895 pthread_mutex_unlock(&ndbcluster_mutex);
/* All other types work on the share looked up by key. */
2899 NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
2902 DBUG_PRINT(
"NDB_SHARE", (
"%s temporary use_count: %u",
2903 share->key, share->use_count));
2905 switch (schema_type)
/* DROP TABLE: invalidate the cached definition and close the cached
   table. */
2910 case SOT_DROP_TABLE:
2911 if (opt_ndb_extra_logging > 9)
2912 sql_print_information(
"SOT_DROP_TABLE %s.%s", schema->db, schema->name);
2917 ndbtab_g.invalidate();
2921 memset(&table_list, 0,
sizeof(table_list));
2922 table_list.db= schema->db;
2923 table_list.alias= table_list.table_name= schema->name;
2924 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2927 case SOT_RENAME_TABLE:
2928 if (opt_ndb_extra_logging > 9)
2929 sql_print_information(
"SOT_RENAME_TABLE %s.%s", schema->db, schema->name);
2933 ndbcluster_rename_share(thd, share);
/* RENAME TABLE PREPARE: for events from another node the rename is
   prepared on the share; schema->query is passed as the second argument
   (logged above as the rename target). */
2936 case SOT_RENAME_TABLE_PREPARE:
2937 if (opt_ndb_extra_logging > 9)
2938 sql_print_information(
"SOT_RENAME_TABLE_PREPARE %s.%s -> %s",
2939 schema->db, schema->name, schema->query);
2941 schema->node_id != g_ndb_cluster_connection->node_id())
2942 ndbcluster_prepare_rename_share(share, schema->query);
2944 case SOT_ALTER_TABLE_COMMIT:
2945 if (opt_ndb_extra_logging > 9)
2946 sql_print_information(
"SOT_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
2947 if (schema->node_id == g_ndb_cluster_connection->node_id())
2953 ndbtab_g.invalidate();
2957 memset(&table_list, 0,
sizeof(table_list));
2958 table_list.db= schema->db;
2959 table_list.alias= table_list.table_name= schema->name;
2960 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2969 share->op->setCustomData(NULL);
2986 DBUG_PRINT(
"NDB_SHARE", (
"%s early free, use_count: %u",
2987 share->key, share->use_count));
/* Re-discover the table unless it is a locally defined table that is not
   a distributed privilege table. */
2992 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
2993 if (ndbcluster_check_if_local_table(schema->db, schema->name) &&
2994 !Ndb_dist_priv_util::is_distributed_priv_table(schema->db,
2997 sql_print_error(
"NDB Binlog: Skipping locally defined table '%s.%s' "
2998 "from binlog schema event '%s' from node %d.",
2999 schema->db, schema->name, schema->query,
3002 else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
3004 print_could_not_discover_error(thd, schema);
/* ONLINE ALTER TABLE PREPARE: refresh the cached definition, close the
   cached table, and for events from another node rewrite the local frm
   when it differs from the one stored with the NDB table; then re-open
   the shadow table and create event operations for the new definition. */
3008 case SOT_ONLINE_ALTER_TABLE_PREPARE:
3010 if (opt_ndb_extra_logging > 9)
3011 sql_print_information(
"SOT_ONLINE_ALTER_TABLE_PREPARE %s.%s", schema->db, schema->name);
3016 ndbtab_g.get_table();
3017 ndbtab_g.invalidate();
3020 const NDBTAB *ndbtab= ndbtab_g.get_table();
3026 memset(&table_list, 0,
sizeof(table_list));
3027 table_list.db= (
char *)schema->db;
3028 table_list.alias= table_list.table_name= (
char *)schema->name;
3029 close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE);
3031 if (schema->node_id != g_ndb_cluster_connection->node_id())
3033 char key[FN_REFLEN];
3034 uchar *data= 0, *pack_data= 0;
3035 size_t length, pack_length;
3037 DBUG_PRINT(
"info", (
"Detected frm change of table %s.%s",
3038 schema->db, schema->name));
/* NOTE(review): the buffer is key[FN_REFLEN] but the length passed is
   FN_LEN-1, unlike the other build_table_filename() calls in this file
   which pass sizeof(key) - 1 - confirm this is intentional. */
3040 build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0);
3046 if (!ndbcluster_check_if_local_table(schema->db, schema->name) &&
3047 readfrm(key, &data, &length) == 0 &&
3048 packfrm(data, length, &pack_data, &pack_length) == 0 &&
3049 cmp_frm(ndbtab, pack_data, pack_length))
3051 DBUG_DUMP(
"frm", (uchar*) ndbtab->
getFrmData(),
3052 ndbtab->getFrmLength());
3053 my_free((
char*)data, MYF(MY_ALLOW_ZERO_PTR));
3055 if ((error= unpackfrm(&data, &length,
3057 (error= writefrm(key, data, length)))
3059 sql_print_error(
"NDB: Failed write frm for %s.%s, error %d",
3060 schema->db, schema->name, error);
3063 my_free((
char*)data, MYF(MY_ALLOW_ZERO_PTR));
3064 my_free((
char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
3068 if (opt_ndb_extra_logging > 9)
3069 sql_print_information(
"NDB Binlog: handeling online alter/rename");
3071 pthread_mutex_lock(&share->mutex);
3072 ndbcluster_binlog_close_table(thd, share);
3074 if ((error= ndb_binlog_open_shadow_table(thd, share)))
3075 sql_print_error(
"NDB Binlog: Failed to re-open shadow table %s.%s",
3076 schema->db, schema->name);
3078 pthread_mutex_unlock(&share->mutex);
/* Recompute share flags from the re-opened shadow table: no primary key
   sets NSF_HIDDEN_PK, any blob field sets NSF_BLOB_FLAG. */
3080 if (!error && share)
3082 if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
3083 share->flags|= NSF_HIDDEN_PK;
3087 if (share->event_data->shadow_table->s->blob_fields != 0)
3088 share->flags|= NSF_BLOB_FLAG;
3093 String event_name(INJECTOR_EVENT_LEN);
3094 ndb_rep_event_name(&event_name, schema->db, schema->name,
3095 get_binlog_full(share));
3100 if (ndbcluster_create_event_ops(thd, share, ndbtab, event_name.c_ptr()))
3102 sql_print_error(
"NDB Binlog:"
3103 "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
3104 event_name.c_ptr());
3108 share->new_op= share->op;
3111 pthread_mutex_unlock(&share->mutex);
3113 if (opt_ndb_extra_logging > 9)
3114 sql_print_information(
"NDB Binlog: handeling online alter/rename done");
/* ONLINE ALTER TABLE COMMIT: when both an old and a new event operation
   exist, the old one is detached and share->op is switched to new_op. */
3118 case SOT_ONLINE_ALTER_TABLE_COMMIT:
3120 if (opt_ndb_extra_logging > 9)
3121 sql_print_information(
"SOT_ONLINE_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
3124 pthread_mutex_lock(&share->mutex);
3125 if (share->op && share->new_op)
3130 share->op->setCustomData(NULL);
3135 share->op= share->new_op;
3139 pthread_mutex_unlock(&share->mutex);
3143 case SOT_RENAME_TABLE_NEW:
3144 if (opt_ndb_extra_logging > 9)
3145 sql_print_information(
"SOT_RENAME_TABLE_NEW %s.%s", schema->db, schema->name);
3147 if (ndb_binlog_running && (!share || !share->op))
3156 DBUG_PRINT(
"NDB_SHARE", (
"%s temporary free use_count: %u",
3157 share->key, share->use_count));
3161 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3162 if (ndbcluster_check_if_local_table(schema->db, schema->name))
3164 DBUG_PRINT(
"info", (
"NDB Binlog: Skipping locally defined table '%s.%s'",
3165 schema->db, schema->name));
3166 sql_print_error(
"NDB Binlog: Skipping locally defined table '%s.%s' from "
3167 "binlog schema event '%s' from node %d. ",
3168 schema->db, schema->name, schema->query,
3171 else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
3173 print_could_not_discover_error(thd, schema);
3183 DBUG_PRINT(
"NDB_SHARE", (
"%s temporary free use_count: %u",
3184 share->key, share->use_count));
/* Write the statement to the binlog when requested and binlogging is
   running. */
3189 if (ndb_binlog_running && log_query)
3190 ndb_binlog_query(thd, schema);
/* Finally update the slock for every schema queued for post-epoch
   unlock. */
3192 while ((schema= post_epoch_unlock_list->pop()))
3194 ndbcluster_update_slock(thd, schema->db, schema->name,
3195 schema->id, schema->version);
/*
  One row destined for the ndb_binlog_index table; rows are chained
  through 'next'. NOTE(review): several members (e.g. the epoch, gci and
  n_* counters used by the writer below) are not present in this listing.
*/
3214 struct ndb_binlog_index_row {
/* Binlog file name and position stored with the row. */
3216 const char *master_log_file;
3217 ulonglong master_log_pos;
/* Originating server id and epoch stored with the row. */
3223 ulong orig_server_id;
3224 ulonglong orig_epoch;
/* Next row in the chain; traversed by the writer until it is null. */
3228 struct ndb_binlog_index_row *next;
/*
  ndb_binlog_index_table__open

  Opens the NDB_REP_DB.NDB_REP_TABLE table and returns the opened TABLE
  through *ndb_binlog_index. The thread's proc_info is set to an
  "Opening ..." text for the duration and restored on both the error and
  the success path.

  NOTE(review): the return type, the open call itself and the return
  statements are not present in this listing.
*/
3236 ndb_binlog_index_table__open(THD *thd,
3237 TABLE **ndb_binlog_index)
3239 const char *save_proc_info=
3240 thd_proc_info(thd,
"Opening " NDB_REP_DB
"." NDB_REP_TABLE);
3244 STRING_WITH_LEN(NDB_REP_TABLE),
3249 tables.required_type= FRMTYPE_TABLE;
3251 const bool derived =
false;
3253 MYSQL_LOCK_IGNORE_TIMEOUT;
/* Failure reporting: either the open was killed, or the statement's
   diagnostics area provides errno and message. */
3257 sql_print_error(
"NDB Binlog: Opening ndb_binlog_index: killed");
3259 sql_print_error(
"NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
3260 thd->get_stmt_da()->sql_errno(),
3261 thd->get_stmt_da()->message());
3262 thd_proc_info(thd, save_proc_info);
/* Success: hand the opened table back to the caller. */
3265 *ndb_binlog_index= tables.table;
3266 thd_proc_info(thd, save_proc_info);
/*
  ndb_binlog_index_table__write_rows

  Writes the chain of rows starting at 'row' into the ndb_binlog_index
  table with binlogging temporarily disabled. Depending on the number of
  fields in the table, either each row is written with its extended
  columns, or the counters of all chained rows are summed into the first
  row and that single row is written.

  NOTE(review): the return type, loop headers, braces and returns are not
  present in this listing.
*/
3275 ndb_binlog_index_table__write_rows(THD *thd,
3276 ndb_binlog_index_row *row)
3279 ndb_binlog_index_row *first= row;
3280 TABLE *ndb_binlog_index= 0;
3286 assert(!thd->is_error());
/* Binlogging is turned off here and re-enabled at the end of the
   function. */
3293 tmp_disable_binlog(thd);
3295 if (ndb_binlog_index_table__open(thd, &ndb_binlog_index))
3297 sql_print_error(
"NDB Binlog: Unable to open ndb_binlog_index table");
3299 goto add_ndb_binlog_index_err;
3303 ndb_binlog_index->use_all_columns();
3307 ulonglong epoch= 0, orig_epoch= 0;
3308 uint orig_server_id= 0;
3311 empty_record(ndb_binlog_index);
/* Fields 0..2: binlog position, binlog file name and epoch, all taken
   from the first row of the chain. */
3313 ndb_binlog_index->field[0]->store(first->master_log_pos,
true);
3314 ndb_binlog_index->field[1]->store(first->master_log_file,
3315 strlen(first->master_log_file),
3317 ndb_binlog_index->field[2]->store(epoch= first->epoch,
true);
/* Extended table format: per-row counters, originating server id/epoch
   and gci are stored as well.
   NOTE(review): the guard is "fields > 7" while field[8] and field[9] are
   written below - confirm the table always has at least 10 fields when
   it has more than 7. */
3318 if (ndb_binlog_index->s->fields > 7)
3320 ndb_binlog_index->field[3]->store(row->n_inserts,
true);
3321 ndb_binlog_index->field[4]->store(row->n_updates,
true);
3322 ndb_binlog_index->field[5]->store(row->n_deletes,
true);
3323 ndb_binlog_index->field[6]->store(row->n_schemaops,
true);
3324 ndb_binlog_index->field[7]->store(orig_server_id= row->orig_server_id,
true);
3325 ndb_binlog_index->field[8]->store(orig_epoch= row->orig_epoch,
true);
3326 ndb_binlog_index->field[9]->store(first->gci,
true);
/* Otherwise: sum the counters of all chained rows into 'first' and store
   the totals. */
3331 while ((row= row->next))
3333 first->n_inserts+= row->n_inserts;
3334 first->n_updates+= row->n_updates;
3335 first->n_deletes+= row->n_deletes;
3336 first->n_schemaops+= row->n_schemaops;
3338 ndb_binlog_index->field[3]->store((ulonglong)first->n_inserts,
true);
3339 ndb_binlog_index->field[4]->store((ulonglong)first->n_updates,
true);
3340 ndb_binlog_index->field[5]->store((ulonglong)first->n_deletes,
true);
3341 ndb_binlog_index->field[6]->store((ulonglong)first->n_schemaops,
true);
/* Write the record; on failure format the epoch (and origin, for the
   extended format) into tmp for the error message. */
3344 if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
3347 if (ndb_binlog_index->s->fields > 7)
3348 my_snprintf(tmp,
sizeof(tmp),
"%u/%u,%u,%u/%u",
3349 uint(epoch >> 32), uint(epoch),
3351 uint(orig_epoch >> 32), uint(orig_epoch));
3354 my_snprintf(tmp,
sizeof(tmp),
"%u/%u", uint(epoch >> 32), uint(epoch));
3355 sql_print_error(
"NDB Binlog: Writing row (%s) to ndb_binlog_index: %d",
3358 goto add_ndb_binlog_index_err;
/* Common exit: roll back the statement if thd is in error, otherwise
   commit it (with overwrite of the diagnostics status allowed around the
   call), close tables, release transactional MDL locks and re-enable
   binlogging. */
3362 add_ndb_binlog_index_err:
3367 thd->get_stmt_da()->set_overwrite_status(
true);
3368 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
3369 thd->get_stmt_da()->set_overwrite_status(
false);
3372 close_thread_tables(thd);
3378 DBUG_ASSERT(! thd->transaction_rollback_request);
3381 thd->mdl_context.release_transactional_locks();
3383 reenable_binlog(thd);
3391 pthread_handler_t ndb_binlog_thread_func(
void *arg);
/*
  ndbcluster_binlog_start

  Validates ::server_id, initialises the injector mutex/condition and the
  schema share mutex, creates the binlog injector thread and waits on
  injector_cond until ndb_binlog_thread_running becomes non-zero.

  NOTE(review): braces and return statements are not present in this
  listing, so the return values of the individual paths are not visible.
*/
3393 int ndbcluster_binlog_start()
3395 DBUG_ENTER(
"ndbcluster_binlog_start");
/* A server id of zero is accepted but warned about. */
3397 if (::server_id == 0)
3399 sql_print_warning(
"NDB: server id set to zero - changes logged to "
3400 "bin log with server id zero will be logged with "
3401 "another server id by slave mysqlds");
/* Rejected: bit 31 set, or an id outside the range accepted by
   ndbcluster_anyvalue_is_serverid_in_range(). */
3408 if ((::server_id & 0x1 << 31) ||
3409 !ndbcluster_anyvalue_is_serverid_in_range(::server_id))
3411 sql_print_error(
"NDB: server id provided is too large to be represented in "
3412 "opt_server_id_bits or is reserved");
3416 pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
3417 pthread_cond_init(&injector_cond, NULL);
3418 pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
/* Start the injector thread; on failure the injector condition and mutex
   are destroyed again.
   NOTE(review): ndb_schema_share_mutex, initialised just above, is not
   visibly destroyed on this failure path - confirm whether that is an
   omission. */
3421 if (pthread_create(&ndb_binlog_thread, &connection_attrib,
3422 ndb_binlog_thread_func, 0))
3424 DBUG_PRINT(
"error", (
"Could not create ndb injector thread"));
3425 pthread_cond_destroy(&injector_cond);
3426 pthread_mutex_destroy(&injector_mutex);
3430 ndbcluster_binlog_inited= 1;
/* Block until the thread has reported its state through
   ndb_binlog_thread_running; a negative value is tested afterwards. */
3433 pthread_mutex_lock(&injector_mutex);
3434 while (!ndb_binlog_thread_running)
3435 pthread_cond_wait(&injector_cond, &injector_mutex);
3436 pthread_mutex_unlock(&injector_mutex);
3438 if (ndb_binlog_thread_running < 0)
/*
  ndb_rep_event_name

  Builds the replication event name for db/tbl in *event_name: a prefix
  ("REPLF$" or "REPL$"), the database name, a '/' and the table name.
  A backslash found in the name is replaced by '/'.

  NOTE(review): the return type, the last parameter and the condition
  choosing between the two prefixes are not present in this listing
  (callers in this file pass get_binlog_full(share) as the last argument).
*/
3450 ndb_rep_event_name(
String *event_name,
const char *db,
const char *tbl,
3454 event_name->set_ascii(
"REPLF$", 6);
3456 event_name->set_ascii(
"REPL$", 5);
3457 event_name->append(db);
/* Search for a backslash and replace the occurrence that was found with
   a forward slash. */
3466 backslash_sep.set_ascii(
"\\",1);
3469 if((bsloc= event_name->strstr(backslash_sep,0))!=-1)
3470 event_name->replace(bsloc, 1,
"/", 1);
3474 event_name->append(
'/');
3475 event_name->append(tbl);
3477 DBUG_PRINT(
"info", (
"ndb_rep_event_name: %s", event_name->c_ptr()));
3480 #ifdef HAVE_NDB_BINLOG
3483 Ndb_binlog_type ndb_binlog_type)
/*
  Translate an Ndb_binlog_type value into the per-share binlog flags by
  calling the set_binlog_* helpers on share: nologging vs logging,
  updated-only vs full, and use-write vs use-update.
*/
3485 DBUG_ENTER(
"set_binlog_flags");
3486 switch (ndb_binlog_type)
3488 case NBT_NO_LOGGING:
3489 DBUG_PRINT(
"info", (
"NBT_NO_LOGGING"));
3490 set_binlog_nologging(share);
/*
  Default handling: the server options opt_ndb_log_updated_only and
  opt_ndb_log_update_as_write decide the flags.
*/
3493 DBUG_PRINT(
"info", (
"NBT_DEFAULT"));
3494 if (opt_ndb_log_updated_only)
3496 set_binlog_updated_only(share);
3500 set_binlog_full(share);
3502 if (opt_ndb_log_update_as_write)
3504 set_binlog_use_write(share);
3508 set_binlog_use_update(share);
3511 case NBT_UPDATED_ONLY:
3512 DBUG_PRINT(
"info", (
"NBT_UPDATED_ONLY"));
3513 set_binlog_updated_only(share);
3514 set_binlog_use_write(share);
/*
  NOTE(review): no statement is visible between the NBT_USE_UPDATE trace
  and the next case label, so NBT_USE_UPDATE appears to fall through and
  get the same updated-only + use-update flags - confirm this is intended.
*/
3516 case NBT_USE_UPDATE:
3517 DBUG_PRINT(
"info", (
"NBT_USE_UPDATE"));
3518 case NBT_UPDATED_ONLY_USE_UPDATE:
3519 DBUG_PRINT(
"info", (
"NBT_UPDATED_ONLY_USE_UPDATE"));
3520 set_binlog_updated_only(share);
3521 set_binlog_use_update(share);
3524 DBUG_PRINT(
"info", (
"NBT_FULL"));
3525 set_binlog_full(share);
3526 set_binlog_use_write(share);
3528 case NBT_FULL_USE_UPDATE:
3529 DBUG_PRINT(
"info", (
"NBT_FULL_USE_UPDATE"));
3530 set_binlog_full(share);
3531 set_binlog_use_update(share);
3534 set_binlog_logging(share);
/*
  Reset the conflict-function state of a share by zero-filling the
  NDB_CONFLICT_FN_SHARE that share->m_cfn_share points to.
  NOTE(review): a NULL guard around the memset is not visible in these
  lines - confirm one exists, since callers reach this with shares that
  may have no conflict function set up.
*/
3539 inline void slave_reset_conflict_fn(
NDB_SHARE *share)
3541 NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
3544 memset(cfn_share, 0,
sizeof(*cfn_share));
/*
  Check the type of the conflict-resolution column of ndbtab.

  The debug traces distinguish a Uint32 column, a Uint64 column and a
  column of the wrong type.  setup_conflict_fn() in this file treats a
  return value of 0 as "wrong datatype" and otherwise uses the value as
  the resolve column size.
*/
3549 slave_check_resolve_col_type(
const NDBTAB *ndbtab,
3552 DBUG_ENTER(
"slave_check_resolve_col_type");
3559 DBUG_PRINT(
"info", (
"resolve column Uint32 %u",
3564 DBUG_PRINT(
"info", (
"resolve column Uint64 %u",
3568 DBUG_PRINT(
"info", (
"resolve column %u has wrong type",
/*
  Install a conflict resolution function on a share.

  Allocates share->m_cfn_share from the share's mem_root on first use,
  then records the conflict function, the resolve column (index, size and
  byte offset within the table record) and the flags.  It then looks up
  an exceptions table whose name is share->table_name plus the
  NDB_EXCEPTIONS_TABLE_SUFFIX (lower-case variant when
  lower_case_table_names is set); when its definition matches, the key
  column offsets, the table pointer and the key count are stored in the
  cfn_share, otherwise a warning about the wrong definition is printed.
*/
3576 slave_set_resolve_fn(THD *thd,
NDB_SHARE *share,
3577 const NDBTAB *ndbtab, uint field_index,
3578 uint resolve_col_sz,
3579 const st_conflict_fn_def* conflict_fn,
3583 DBUG_ENTER(
"slave_set_resolve_fn");
3585 Thd_ndb *thd_ndb= get_thd_ndb(thd);
3586 Ndb *ndb= thd_ndb->ndb;
3588 NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
/*
  NOTE(review): the alloc_root() result is used without a visible NULL
  check before being zero-filled and written to - confirm.
*/
3589 if (cfn_share == NULL)
3591 share->m_cfn_share= cfn_share= (NDB_CONFLICT_FN_SHARE*)
3592 alloc_root(&share->mem_root,
sizeof(NDB_CONFLICT_FN_SHARE));
3593 slave_reset_conflict_fn(share);
3595 cfn_share->m_conflict_fn= conflict_fn;
3598 cfn_share->m_resolve_size= resolve_col_sz;
3599 cfn_share->m_resolve_column= field_index;
3600 cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
3602 cfn_share->m_flags =
flags;
/*
  Build the exceptions table name.
  NOTE(review): if strxnmov() writes a terminating NUL after copying up to
  "len" characters, passing sizeof(ex_tab_name) rather than
  sizeof(ex_tab_name) - 1 could overrun the buffer by one byte - confirm
  against the strxnmov contract.
*/
3606 char ex_tab_name[FN_REFLEN];
3607 strxnmov(ex_tab_name,
sizeof(ex_tab_name), share->table_name,
3608 lower_case_table_names ? NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER :
3609 NDB_EXCEPTIONS_TABLE_SUFFIX, NullS);
3612 const NDBTAB *ex_tab= ndbtab_g.get_table();
/* The warnings below refer to this many leading, fixed columns. */
3615 const int fixed_cols= 4;
/* Record the record offset of each key column (i scans columns, k keys). */
3636 for (i= k= 0; i < ncol && k < nkey; i++)
3650 cfn_share->m_offset[k]=
3651 (uint16)(table->field[i]->ptr - table->record[0]);
3657 cfn_share->m_ex_tab= ex_tab;
3658 cfn_share->m_pk_cols= nkey;
3660 if (opt_ndb_extra_logging)
3661 sql_print_information(
"NDB Slave: Table %s.%s logging exceptions to %s.%s",
3663 table->s->table_name.str,
3668 sql_print_warning(
"NDB Slave: exceptions table %s has wrong "
3669 "definition (column %d)",
3670 ex_tab_name, fixed_cols + k);
3673 sql_print_warning(
"NDB Slave: exceptions table %s has wrong "
3674 "definition (initial %d columns)",
3675 ex_tab_name, fixed_cols);
/*
  Conflict function building an interpreted program that, per its own
  trace text, requires the existing value of the resolve column to equal
  the event's old value.

  The old value is copied out of old_data at the resolve column's offset
  (4 or 8 bytes), loaded as a constant, the current column value is read,
  and the two registers are compared with branch_eq: the program exits
  "not ok" with error_conflict_fn_violation, or jumps to label_0 and
  exits ok.  When the resolve column is not in write_set an informational
  "missing data" message is printed.
*/
3695 row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
3696 enum_conflicting_op_type op_type,
3697 const uchar* old_data,
3698 const uchar* new_data,
3702 DBUG_ENTER(
"row_conflict_fn_old");
3703 uint32 resolve_column= cfn_share->m_resolve_column;
3704 uint32 resolve_size= cfn_share->m_resolve_size;
3705 const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
/* Only 32-bit and 64-bit resolve columns are supported. */
3707 assert((resolve_size == 4) || (resolve_size == 8));
3709 if (unlikely(!bitmap_is_set(write_set, resolve_column)))
3711 sql_print_information(
"NDB Slave: missing data for %s",
3712 cfn_share->m_conflict_fn->name);
3716 const uint label_0= 0;
3717 const Uint32 RegOldValue= 1, RegCurrentValue= 2;
3721 (
"Adding interpreted filter, existing value must eq event old value"));
3726 uint32 old_value_32;
3727 uint64 old_value_64;
/* memcpy is used to read the value from the (possibly unaligned) record. */
3730 if (resolve_size == 4)
3732 memcpy(&old_value_32, field_ptr, resolve_size);
3733 DBUG_PRINT(
"info", (
" old_value_32: %u", old_value_32));
3737 memcpy(&old_value_64, field_ptr, resolve_size);
3738 DBUG_PRINT(
"info", (
" old_value_64: %llu",
3739 (
unsigned long long) old_value_64));
/*
  Emit the program.  Each step's return code is only checked by
  DBUG_ASSERT, i.e. in debug builds.
*/
3746 if (resolve_size == 4)
3747 r= code->load_const_u32(RegOldValue, old_value_32);
3749 r= code->load_const_u64(RegOldValue, old_value_64);
3750 DBUG_ASSERT(r == 0);
3751 r= code->read_attr(RegCurrentValue, resolve_column);
3752 DBUG_ASSERT(r == 0);
3757 r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
3758 DBUG_ASSERT(r == 0);
3759 r= code->interpret_exit_nok(error_conflict_fn_violation);
3760 DBUG_ASSERT(r == 0);
3761 r= code->def_label(label_0);
3762 DBUG_ASSERT(r == 0);
3763 r= code->interpret_exit_ok();
3764 DBUG_ASSERT(r == 0);
3766 DBUG_ASSERT(r == 0);
/*
  Conflict function building an interpreted program that, per its own
  trace text, requires the existing value of the resolve column to be
  less than the event's new value.

  The new value is copied out of new_data at the resolve column's offset
  (4 or 8 bytes), loaded as a constant, the current column value is read,
  and the two registers are compared with branch_gt: the program exits
  "not ok" with error_conflict_fn_violation, or jumps to label_0 and
  exits ok.  When the resolve column is not in write_set an informational
  "missing data" message is printed.
*/
3771 row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
3772 enum_conflicting_op_type op_type,
3773 const uchar* old_data,
3774 const uchar* new_data,
3778 DBUG_ENTER(
"row_conflict_fn_max_update_only");
3779 uint32 resolve_column= cfn_share->m_resolve_column;
3780 uint32 resolve_size= cfn_share->m_resolve_size;
3781 const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
/* Only 32-bit and 64-bit resolve columns are supported. */
3783 assert((resolve_size == 4) || (resolve_size == 8));
3785 if (unlikely(!bitmap_is_set(write_set, resolve_column)))
3787 sql_print_information(
"NDB Slave: missing data for %s",
3788 cfn_share->m_conflict_fn->name);
3792 const uint label_0= 0;
3793 const Uint32 RegNewValue= 1, RegCurrentValue= 2;
3797 (
"Adding interpreted filter, existing value must be lt event new"));
3802 uint32 new_value_32;
3803 uint64 new_value_64;
/* memcpy is used to read the value from the (possibly unaligned) record. */
3806 if (resolve_size == 4)
3808 memcpy(&new_value_32, field_ptr, resolve_size);
3809 DBUG_PRINT(
"info", (
" new_value_32: %u", new_value_32));
3813 memcpy(&new_value_64, field_ptr, resolve_size);
3814 DBUG_PRINT(
"info", (
" new_value_64: %llu",
3815 (
unsigned long long) new_value_64));
/*
  Emit the program.  Each step's return code is only checked by
  DBUG_ASSERT, i.e. in debug builds.
*/
3821 if (resolve_size == 4)
3822 r= code->load_const_u32(RegNewValue, new_value_32);
3824 r= code->load_const_u64(RegNewValue, new_value_64);
3825 DBUG_ASSERT(r == 0);
3826 r= code->read_attr(RegCurrentValue, resolve_column);
3827 DBUG_ASSERT(r == 0);
3832 r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
3833 DBUG_ASSERT(r == 0);
3834 r= code->interpret_exit_nok(error_conflict_fn_violation);
3835 DBUG_ASSERT(r == 0);
3836 r= code->def_label(label_0);
3837 DBUG_ASSERT(r == 0);
3838 r= code->interpret_exit_ok();
3839 DBUG_ASSERT(r == 0);
3841 DBUG_ASSERT(r == 0);
/*
  Conflict function registered for "NDB$MAX" in conflict_fns[].
  It delegates to row_conflict_fn_max_update_only() or to
  row_conflict_fn_old(); the condition selecting between them is not
  visible here (presumably op_type - confirm).
*/
3858 row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
3859 enum_conflicting_op_type op_type,
3860 const uchar* old_data,
3861 const uchar* new_data,
3871 return row_conflict_fn_max_update_only(cfn_share,
3882 return row_conflict_fn_old(cfn_share,
/*
  Conflict function registered for "NDB$MAX_DELETE_WIN" in conflict_fns[].
  The only delegation visible here is to
  row_conflict_fn_max_update_only(); the handling of other operation
  types is not visible in these lines.
*/
3910 row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
3911 enum_conflicting_op_type op_type,
3912 const uchar* old_data,
3913 const uchar* new_data,
3923 return row_conflict_fn_max_update_only(cfn_share,
/*
  Conflict function registered for "NDB$EPOCH" in conflict_fns[].

  Builds an interpreted program in two stages:
    1. load 0, read the row's ROW_AUTHOR pseudo column and branch_ne to
       label_0 (exit ok) on the comparison of the two;
    2. load g_ndb_slave_state.max_rep_epoch, read the row's ROW_GCI64
       pseudo column and branch_le to label_0 on the comparison of the
       two; otherwise exit "not ok" with error_conflict_fn_violation.
*/
3947 row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share,
3948 enum_conflicting_op_type op_type,
3949 const uchar* old_data,
3950 const uchar* new_data,
3954 DBUG_ENTER(
"row_conflict_fn_epoch");
3963 const uint label_0= 0;
/*
  Register numbers are reused between the two stages: RegAuthor and
  RegMaxRepEpoch are both 1, RegZero and RegRowEpoch are both 2.
*/
3965 RegAuthor= 1, RegZero= 2,
3966 RegMaxRepEpoch= 1, RegRowEpoch= 2;
3969 r= code->load_const_u32(RegZero, 0);
3971 r= code->read_attr(RegAuthor, NdbDictionary::Column::ROW_AUTHOR);
3974 r= code->branch_ne(RegZero, RegAuthor, label_0);
3980 r= code->load_const_u64(RegMaxRepEpoch, g_ndb_slave_state.max_rep_epoch);
3982 r= code->read_attr(RegRowEpoch, NdbDictionary::Column::ROW_GCI64);
3989 r= code->branch_le(RegRowEpoch, RegMaxRepEpoch, label_0);
3991 r= code->interpret_exit_nok(error_conflict_fn_violation);
3993 r= code->def_label(label_0);
3995 r= code->interpret_exit_ok();
/*
  Argument definitions for conflict functions that take a resolve column:
  a column name whose "optional" field is false.
*/
4007 static const st_conflict_fn_arg_def resolve_col_args[]=
4010 { CFAT_COLUMN_NAME,
false },
/*
  Argument definitions for NDB$EPOCH: a number of extra GCI bits whose
  "optional" field is true.
*/
4014 static const st_conflict_fn_arg_def epoch_fn_args[]=
4017 { CFAT_EXTRA_GCI_BITS,
true },
/*
  Table of known conflict functions: name, type, argument definitions and
  implementation.  parse_conflict_fn_spec() matches names with strncmp()
  on the table name's length in table order, so "NDB$MAX_DELETE_WIN" has
  to stay ahead of its prefix "NDB$MAX".
*/
4021 static const st_conflict_fn_def conflict_fns[]=
4023 {
"NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
4024 &resolve_col_args[0], row_conflict_fn_max_del_win },
4025 {
"NDB$MAX", CFT_NDB_MAX,
4026 &resolve_col_args[0], row_conflict_fn_max },
4027 {
"NDB$OLD", CFT_NDB_OLD,
4028 &resolve_col_args[0], row_conflict_fn_old },
4029 {
"NDB$EPOCH", CFT_NDB_EPOCH,
4030 &epoch_fn_args[0], row_conflict_fn_epoch }
/* Number of entries in conflict_fns[]. */
4033 static unsigned n_conflict_fns=
4034 sizeof(conflict_fns) /
sizeof(
struct st_conflict_fn_def);
/*
  Parse a conflict function specification string such as
  "NDB$MAX(column)".

  The function name is matched against conflict_fns[]; then an opening
  '(', the arguments described by the matched function's arg_defs and a
  closing ')' are expected, with nothing but spaces afterwards.  On
  success *conflict_fn points at the matched table entry, args[] holds
  the parsed arguments and *max_args is set to their count.  On failure
  a message of the form "<spec>, <reason> at '<rest of input>'" is
  written to msg (at most msg_len bytes).
*/
4038 parse_conflict_fn_spec(
const char* conflict_fn_spec,
4039 const st_conflict_fn_def** conflict_fn,
4040 st_conflict_fn_arg* args,
4043 char *
msg, uint msg_len)
4045 DBUG_ENTER(
"parse_conflict_fn_spec");
4048 const char *ptr= conflict_fn_spec;
4049 const char *error_str=
"unknown conflict resolution function";
/*
  Skip leading spaces.  The "*ptr != '\0'" test is redundant here and in
  the identical loops below: a character equal to ' ' is never '\0'.
*/
4051 while (*ptr ==
' ' && *ptr !=
'\0') ptr++;
4053 DBUG_PRINT(
"info", (
"parsing %s", conflict_fn_spec));
4055 for (
unsigned i= 0; i < n_conflict_fns; i++)
4057 const st_conflict_fn_def &fn= conflict_fns[
i];
/* Prefix match on the function name; a non-zero strncmp means no match. */
4059 uint len= strlen(fn.name);
4060 if (strncmp(ptr, fn.name, len))
4063 DBUG_PRINT(
"info", (
"found function %s", fn.name));
4069 while (*ptr ==
' ' && *ptr !=
'\0') ptr++;
4074 error_str=
"missing '('";
4075 DBUG_PRINT(
"info", (
"parse error %s", error_str));
/* On entry *max_args is the capacity of args[]. */
4083 if (no_args >= *max_args)
4085 error_str=
"too many arguments";
4086 DBUG_PRINT(
"info", (
"parse error %s", error_str));
4091 enum enum_conflict_fn_arg_type type=
4092 conflict_fns[
i].arg_defs[no_args].arg_type;
4095 while (*ptr ==
' ' && *ptr !=
'\0') ptr++;
4097 if (type == CFAT_END)
4099 args[no_args].type=
type;
/* An argument extends up to the next ')', space or end of string. */
4106 const char *start_arg= ptr;
4107 while (*ptr !=
')' && *ptr !=
' ' && *ptr !=
'\0') ptr++;
4108 const char *end_arg= ptr;
4110 bool optional_arg = conflict_fns[
i].arg_defs[no_args].optional;
/* Empty argument text. */
4112 if (start_arg == end_arg)
4116 error_str=
"missing function argument";
4117 DBUG_PRINT(
"info", (
"parse error %s", error_str));
4125 args[no_args].type= CFAT_END;
/*
  Record the raw argument text; fieldno is (uint32)-1 until a matching
  column is found below.
*/
4131 uint len= end_arg - start_arg;
4132 args[no_args].type=
type;
4133 args[no_args].ptr= start_arg;
4134 args[no_args].len= len;
4135 args[no_args].fieldno= (uint32)-1;
4137 DBUG_PRINT(
"info", (
"found argument %s %u", start_arg, len));
4139 bool arg_processing_error =
false;
4142 case CFAT_COLUMN_NAME:
4145 DBUG_PRINT(
"info", (
"searching for %s %u", start_arg, len));
/*
  Look for a field whose name equals the argument exactly (same first len
  bytes and terminated right after them); the comparison is
  case-sensitive.
*/
4147 for (uint j= 0; j < table_s->fields; j++)
4149 Field *field= table_s->field[j];
4150 if (strncmp(start_arg, field->field_name, len) == 0 &&
4151 field->field_name[len] ==
'\0')
4153 DBUG_PRINT(
"info", (
"found %s", field->field_name));
4154 args[no_args].fieldno= j;
4160 case CFAT_EXTRA_GCI_BITS:
/*
  Base 0 lets strtoul accept decimal, octal and hexadecimal input.
  NOTE(review): no check of end_of_arg (non-numeric input) is visible in
  these lines - confirm.
*/
4163 char* end_of_arg = (
char*) end_arg;
4164 Uint32 bits = strtoul(start_arg, &end_of_arg, 0);
4165 DBUG_PRINT(
"info", (
"Using %u as the number of extra bits", bits));
4169 arg_processing_error=
true;
4170 error_str=
"Too many extra Gci bits";
4171 DBUG_PRINT(
"info", (
"%s", error_str));
4175 args[no_args].extraGciBits = bits;
4182 if (arg_processing_error)
4191 while (*ptr ==
' ' && *ptr !=
'\0') ptr++;
4196 error_str=
"missing ')'";
4202 while (*ptr ==
' ' && *ptr !=
'\0') ptr++;
4207 error_str=
"garbage in the end";
/* Success: report the matched function and the argument count. */
4212 *conflict_fn = &conflict_fns[
i];
4213 *max_args = no_args;
/* Failure: compose the message from the spec, the reason and the position. */
4218 my_snprintf(msg, msg_len,
"%s, %s at '%s'",
4219 conflict_fn_spec, error_str, ptr);
4220 DBUG_PRINT(
"info", (
"%s", msg));
/*
  Validate the parsed arguments of a conflict function against the table
  and install the function on the share.

  For the resolve-column functions (the case labels visible include
  CFT_NDB_MAX_DEL_WIN) the column type is checked with
  slave_check_resolve_col_type() and the function is installed through
  slave_set_resolve_fn() with CFF_NONE.  For the epoch function the table
  must have no Blob columns and must have extra row author bits; it is
  installed with CFF_REFRESH_ROWS.  On every failure path a description
  is written to msg (at most msg_len bytes).
*/
4225 setup_conflict_fn(THD *thd,
NDB_SHARE *share,
4227 char *msg, uint msg_len,
4229 const st_conflict_fn_def* conflict_fn,
4230 const st_conflict_fn_arg* args,
4231 const Uint32 num_args)
4233 DBUG_ENTER(
"setup_conflict_fn");
4236 switch (conflict_fn->type)
4240 case CFT_NDB_MAX_DEL_WIN:
4244 my_snprintf(msg, msg_len,
4245 "Incorrect arguments to conflict function");
4246 DBUG_PRINT(
"info", (
"%s", msg));
4250 uint resolve_col_sz= 0;
/* A size of 0 means the resolve column has an unsupported type. */
4252 if (0 == (resolve_col_sz =
4253 slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
4256 slave_reset_conflict_fn(share);
4257 my_snprintf(msg, msg_len,
4258 "column '%s' has wrong datatype",
4259 table->s->field[args[0].fieldno]->field_name);
4260 DBUG_PRINT(
"info", (
"%s", msg));
4264 if (slave_set_resolve_fn(thd, share, ndbtab,
4265 args[0].fieldno, resolve_col_sz,
4266 conflict_fn, table, CFF_NONE))
4268 my_snprintf(msg, msg_len,
4269 "unable to setup conflict resolution using column '%s'",
4270 table->s->field[args[0].fieldno]->field_name);
4271 DBUG_PRINT(
"info", (
"%s", msg));
4274 if (opt_ndb_extra_logging)
4276 sql_print_information(
"NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
4278 table->s->table_name.str,
4280 table->s->field[args[0].fieldno]->field_name);
4288 my_snprintf(msg, msg_len,
4289 "Too many arguments to conflict function");
4290 DBUG_PRINT(
"info", (
"%s", msg));
/* The epoch function is rejected for tables with Blob columns. */
4295 if (share->flags & NSF_BLOB_FLAG)
4297 my_snprintf(msg, msg_len,
"Table has Blob column(s), not suitable for NDB$EPOCH.");
4298 DBUG_PRINT(
"info", (
"%s", msg));
/* Missing extra GCI bits only produce an informational message. */
4306 if (ndbtab->getExtraRowGciBits() == 0)
4307 sql_print_information(
"Ndb Slave : CFT_NDB_EPOCH, low epoch resolution");
/* Missing extra row author bits are an error. */
4309 if (ndbtab->getExtraRowAuthorBits() == 0)
4311 my_snprintf(msg, msg_len,
"No extra row author bits in table.");
4312 DBUG_PRINT(
"info", (
"%s", msg));
4316 if (slave_set_resolve_fn(thd, share, ndbtab,
4319 conflict_fn, table, CFF_REFRESH_ROWS))
4321 my_snprintf(msg, msg_len,
4322 "unable to setup conflict resolution");
4323 DBUG_PRINT(
"info", (
"%s", msg));
4326 if (opt_ndb_extra_logging)
4328 sql_print_information(
"NDB Slave: Table %s.%s using conflict_fn %s.",
4330 table->s->table_name.str,
4335 case CFT_NUMBER_OF_CFTS:
/*
  Database and table name of the replication settings table, followed by
  the names of the columns that ndbcluster_read_replication_table() looks
  up in it.
*/
4342 static const char *ndb_rep_db= NDB_REP_DB;
4343 static const char *ndb_replication_table= NDB_REPLICATION_TABLE;
4344 static const char *nrt_db=
"db";
4345 static const char *nrt_table_name=
"table_name";
4346 static const char *nrt_server_id=
"server_id";
4347 static const char *nrt_binlog_type=
"binlog_type";
4348 static const char *nrt_conflict_fn=
"conflict_fn";
/*
  Read the binlog type and conflict function specification that apply to
  a table from the replication settings table
  (ndb_rep_db.ndb_replication_table).

  Visible behaviour:
    - when the settings table is not found (first branch below), the
      outputs default to NBT_DEFAULT and a NULL spec;
    - the columns db, table_name, server_id, binlog_type and conflict_fn
      are looked up by name; error_str carries the name of the column
      being checked so that a failure can name it;
    - two rows are read (indexes 0 and 1); values from row 1 are preferred
      and row 0 is used where row 1 is missing or NULL;
    - the conflict function text is copied NUL-terminated into
      conflict_fn_buffer, and *conflict_fn_spec points at that buffer;
    - on errors a warning is pushed on thd and the outputs are reset to
      NBT_DEFAULT / NULL.
  The value returned at the end is ndberror.code.
*/
4359 ndbcluster_read_replication_table(THD *thd,
Ndb *ndb,
4361 const char* table_name,
4363 Uint32* binlog_flags,
4364 char** conflict_fn_spec,
4365 char* conflict_fn_buffer,
4366 Uint32 conflict_fn_buffer_len)
4368 DBUG_ENTER(
"ndbcluster_read_replication_table");
4371 const char *error_str=
"<none>";
4376 const NDBTAB *reptab= ndbtab_g.get_table();
4377 if (reptab == NULL &&
4381 DBUG_PRINT(
"info", (
"No %s.%s table", ndb_rep_db, ndb_replication_table));
4382 *binlog_flags= NBT_DEFAULT;
4383 *conflict_fn_spec= NULL;
4387 *col_db, *col_table_name, *col_server_id, *col_binlog_type, *col_conflict_fn;
4388 char tmp_buf[FN_REFLEN];
4390 int retry_sleep= 30;
4399 error_str=
"Wrong number of primary key parts, expected 3";
/*
  Each getColumn() call also stores the column name in error_str, so a
  failed check below can report which column was missing or mistyped.
*/
4403 col_db= reptab->
getColumn(error_str= nrt_db);
4404 if (col_db == NULL ||
4408 col_table_name= reptab->
getColumn(error_str= nrt_table_name);
4409 if (col_table_name == NULL ||
4413 col_server_id= reptab->
getColumn(error_str= nrt_server_id);
4414 if (col_server_id == NULL ||
4418 col_binlog_type= reptab->
getColumn(error_str= nrt_binlog_type);
4419 if (col_binlog_type == NULL ||
4423 col_conflict_fn= reptab->
getColumn(error_str= nrt_conflict_fn);
/*
  A NULL conflict_fn column is tolerated here: later reads of the column
  are guarded by "if (col_conflict_fn)".
*/
4424 if (col_conflict_fn == NULL)
4426 col_conflict_fn= NULL;
4442 NdbRecAttr *col_conflict_fn_rec_attr[2]= {NULL, NULL};
4443 uint32 ndb_binlog_type[2];
/* One receive buffer split in two halves, one per row read. */
4445 char ndb_conflict_fn_buf[2*sz];
4446 char *ndb_conflict_fn[2]= {ndb_conflict_fn_buf, ndb_conflict_fn_buf+sz};
4453 for (i= 0; i < 2; i++)
4456 DBUG_PRINT(
"info", (
"reading[%u]: %s,%s,%u", i, db, table_name,
id));
4459 ndb_pack_varchar(col_db, tmp_buf, db, strlen(db));
4461 ndb_pack_varchar(col_table_name, tmp_buf, table_name, strlen(table_name));
/* Failure to register a value to read aborts the process. */
4464 if ((col_binlog_type_rec_attr[i]=
4465 _op->
getValue(col_binlog_type, (
char *)&(ndb_binlog_type[i]))) == 0) abort();
4467 if (col_conflict_fn)
4469 if ((col_conflict_fn_rec_attr[i]=
4470 _op->
getValue(col_conflict_fn, ndb_conflict_fn[i])) == 0) abort();
4485 do_retry_sleep(retry_sleep);
/* An operation that reported an error is treated as "row not found". */
4493 for (i= 0; i < 2; i++)
4495 if (op[i]->getNdbError().code)
4499 col_binlog_type_rec_attr[
i]= NULL;
4500 col_conflict_fn_rec_attr[
i]= NULL;
4501 DBUG_PRINT(
"info", (
"not found row[%u]", i));
4507 DBUG_PRINT(
"info", (
"found row[%u]", i));
/* Fall back to row 0 for any value that row 1 does not provide. */
4509 if (col_binlog_type_rec_attr[1] == NULL ||
4510 col_binlog_type_rec_attr[1]->isNULL())
4513 col_binlog_type_rec_attr[1]= col_binlog_type_rec_attr[0];
4514 ndb_binlog_type[1]= ndb_binlog_type[0];
4516 if (col_conflict_fn_rec_attr[1] == NULL ||
4517 col_conflict_fn_rec_attr[1]->isNULL())
4520 col_conflict_fn_rec_attr[1]= col_conflict_fn_rec_attr[0];
4521 ndb_conflict_fn[1]= ndb_conflict_fn[0];
4524 if (col_binlog_type_rec_attr[1] == NULL ||
4525 col_binlog_type_rec_attr[1]->isNULL())
4527 DBUG_PRINT(
"info", (
"No binlog flag value, using default"));
4529 *binlog_flags= NBT_DEFAULT;
4533 DBUG_PRINT(
"info", (
"Taking binlog flag value from the table"));
4534 *binlog_flags= (
enum Ndb_binlog_type) ndb_binlog_type[1];
4537 if (col_conflict_fn_rec_attr[1] == NULL ||
4538 col_conflict_fn_rec_attr[1]->isNULL())
4541 *conflict_fn_spec = NULL;
4545 const char* conflict_fn = ndb_conflict_fn[1];
/*
  The stored value starts with a length prefix: one byte for short
  var-sized columns, two bytes (read with uint2korr) for medium ones.
*/
4547 switch (col_conflict_fn->getArrayType())
4549 case NDBCOL::ArrayTypeShortVar:
4550 len= *(uchar*)conflict_fn;
4553 case NDBCOL::ArrayTypeMediumVar:
4554 len= uint2korr(conflict_fn);
/* Room is needed for the text plus the terminating NUL. */
4560 if ((len + 1) > conflict_fn_buffer_len)
4564 error_str=
"Conflict function specification too long.";
4567 memcpy(conflict_fn_buffer, conflict_fn, len);
4568 conflict_fn_buffer[len] =
'\0';
4569 *conflict_fn_spec = conflict_fn_buffer;
4572 DBUG_PRINT(
"info", (
"Retrieved Binlog flags : %u and function spec : %s",
4573 *binlog_flags, (*conflict_fn_spec != NULL ?*conflict_fn_spec:
4582 DBUG_PRINT(
"info", (
"error %d, error_str %s, ndberror.code %u",
4583 error, error_str, ndberror.
code));
4586 char msg[FN_REFLEN];
4590 my_snprintf(msg,
sizeof(msg),
4591 "Missing or wrong type for column '%s'", error_str);
4594 my_snprintf(msg,
sizeof(msg),
"%s", error_str);
4599 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4600 ER_NDB_REPLICATION_SCHEMA_ERROR,
4601 ER(ER_NDB_REPLICATION_SCHEMA_ERROR),
4606 char msg[FN_REFLEN];
4607 my_snprintf(tmp_buf,
sizeof(tmp_buf),
"ndberror %u", ndberror.
code);
4608 my_snprintf(msg,
sizeof(msg),
"Unable to retrieve %s.%s, logging and "
4609 "conflict resolution may not function as intended (%s)",
4610 ndb_rep_db, ndb_replication_table, tmp_buf);
4611 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4612 ER_ILLEGAL_HA_CREATE_OPTION,
4613 ER(ER_ILLEGAL_HA_CREATE_OPTION),
4614 ndbcluster_hton_name, msg);
/* Error exit: hand back defaults together with the NDB error code. */
4616 *binlog_flags= NBT_DEFAULT;
4617 *conflict_fn_spec= NULL;
4619 if (ndberror.
code && opt_ndb_extra_logging)
4620 print_warning_list(
"NDB", thd);
4621 DBUG_RETURN(ndberror.
code);
/*
  Determine the binlog flags and the conflict function (with parsed
  arguments) that apply to db.table_name.

  When opt_ndb_log_apply_status is set and the table is
  NDB_REP_DB.NDB_APPLY_TABLE, the flags are forced to NBT_FULL with no
  conflict function.  Otherwise the settings are read with
  ndbcluster_read_replication_table(); a failure there returns
  ER_NDB_REPLICATION_SCHEMA_ERROR.  A non-NULL specification is parsed
  with parse_conflict_fn_spec(); a parse failure pushes a warning and
  returns ER_CONFLICT_FN_PARSE_ERROR.
*/
4634 ndbcluster_get_binlog_replication_info(THD *thd,
Ndb *ndb,
4636 const char* table_name,
4639 Uint32* binlog_flags,
4640 const st_conflict_fn_def** conflict_fn,
4641 st_conflict_fn_arg* args,
4644 DBUG_ENTER(
"ndbcluster_get_binlog_replication_info");
4647 if (opt_ndb_log_apply_status)
4649 if (strcmp(db, NDB_REP_DB) == 0 &&
4650 strcmp(table_name, NDB_APPLY_TABLE) == 0)
4658 DBUG_PRINT(
"info", (
"ndb_apply_status defaulting to FULL, USE_WRITE"));
4659 sql_print_information(
"NDB : ndb-log-apply-status forcing "
4660 "%s.%s to FULL USE_WRITE",
4661 NDB_REP_DB, NDB_APPLY_TABLE);
4662 *binlog_flags = NBT_FULL;
4663 *conflict_fn = NULL;
/* Local buffer that receives the NUL-terminated specification text. */
4669 const Uint32 MAX_CONFLICT_FN_SPEC_LEN = 256;
4670 char conflict_fn_buffer[MAX_CONFLICT_FN_SPEC_LEN];
4671 char* conflict_fn_spec;
4673 if (ndbcluster_read_replication_table(thd,
4681 MAX_CONFLICT_FN_SPEC_LEN) != 0)
4683 DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
4688 if (conflict_fn_spec != NULL)
/* tmp_buf receives the parser's error message. */
4690 char tmp_buf[FN_REFLEN];
4692 if (parse_conflict_fn_spec(conflict_fn_spec,
4698 sizeof(tmp_buf)) != 0)
4700 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4701 ER_CONFLICT_FN_PARSE_ERROR,
4702 ER(ER_CONFLICT_FN_PARSE_ERROR),
4704 DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
/*
  Apply previously determined replication settings to a share.

  When do_set_binlog_flags is set, binlog_flags is applied with
  set_binlog_flags().  When conflict_fn is non-NULL it is installed with
  setup_conflict_fn(); a failure there pushes an
  ER_CONFLICT_FN_PARSE_ERROR warning.  slave_reset_conflict_fn() is also
  called on a path whose condition is not visible here (presumably when
  no conflict function is given - confirm).
*/
4719 ndbcluster_apply_binlog_replication_info(THD *thd,
4723 const st_conflict_fn_def* conflict_fn,
4724 const st_conflict_fn_arg* args,
4726 bool do_set_binlog_flags,
4727 Uint32 binlog_flags)
4729 DBUG_ENTER(
"ndbcluster_apply_binlog_replication_info");
/* Receives the error text produced by setup_conflict_fn(). */
4730 char tmp_buf[FN_REFLEN];
4732 if (do_set_binlog_flags)
4734 DBUG_PRINT(
"info", (
"Setting binlog flags to %u", binlog_flags));
4735 set_binlog_flags(share, (
enum Ndb_binlog_type)binlog_flags);
4738 if (conflict_fn != NULL)
4740 if (setup_conflict_fn(thd, share,
4742 tmp_buf,
sizeof(tmp_buf),
4748 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4749 ER_CONFLICT_FN_PARSE_ERROR,
4750 ER(ER_CONFLICT_FN_PARSE_ERROR),
4758 slave_reset_conflict_fn(share);
/*
  Convenience wrapper: fetch the replication settings with
  ndbcluster_get_binlog_replication_info() and apply them with
  ndbcluster_apply_binlog_replication_info().  The condition below is
  true when either step returns non-zero.
*/
4765 ndbcluster_read_binlog_replication(THD *thd,
Ndb *ndb,
4770 bool do_set_binlog_flags)
4772 DBUG_ENTER(
"ndbcluster_read_binlog_replication");
4773 Uint32 binlog_flags;
4774 const st_conflict_fn_def* conflict_fn= NULL;
/* num_args starts as the capacity of args[]. */
4775 st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
4776 Uint32 num_args = MAX_CONFLICT_ARGS;
4778 if ((ndbcluster_get_binlog_replication_info(thd, ndb,
4787 (ndbcluster_apply_binlog_replication_info(thd,
4794 do_set_binlog_flags,
4795 binlog_flags) != 0))
/*
  Check whether dbname.tabname is a local (non-NDB) table: the table
  definition file (reg_ext) exists on disk while the NDB file
  (ha_ndb_ext) does not.
*/
4805 ndbcluster_check_if_local_table(
const char *dbname,
const char *tabname)
4807 char key[FN_REFLEN + 1];
4808 char ndb_file[FN_REFLEN + 1];
4810 DBUG_ENTER(
"ndbcluster_check_if_local_table");
/*
  NOTE(review): the buffers are FN_REFLEN + 1 bytes but the length passed
  is FN_LEN-1, whereas ndbcluster_check_if_local_tables_in_db() passes
  sizeof(path) - 1 - confirm whether sizeof(key) - 1 was intended.
*/
4811 build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
4812 build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
4814 DBUG_PRINT(
"info", (
"Looking for file %s and %s", key, ndb_file));
/* my_access() returns 0 when the file is accessible. */
4815 if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
4817 DBUG_PRINT(
"info", (
"table file %s not on disk, local table", ndb_file));
/*
  Scan the directory of database dbname and test every table found there
  with ndbcluster_check_if_local_table().  A find_files() failure is
  traced as "Failed to find files".
*/
4827 ndbcluster_check_if_local_tables_in_db(THD *thd,
const char *dbname)
4829 DBUG_ENTER(
"ndbcluster_check_if_local_tables_in_db");
4830 DBUG_PRINT(
"info", (
"Looking for files in directory %s", dbname));
4833 char path[FN_REFLEN + 1];
/* Empty table name and extension yield the database directory path. */
4835 build_table_filename(path,
sizeof(path) - 1, dbname,
"",
"", 0);
4836 if (find_files(thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
4839 DBUG_PRINT(
"info", (
"Failed to find files"));
4842 DBUG_PRINT(
"info",(
"found: %d files", files.elements));
4843 while ((tabname= files.pop()))
4845 DBUG_PRINT(
"info", (
"Found table %s", tabname->str));
4846 if (ndbcluster_check_if_local_table(dbname, tabname->str))
/*
  Set up binlogging for a table that has been created or discovered.

  Visible steps: obtain the NDB_SHARE for key under ndbcluster_mutex;
  under share->mutex, stop early when the share is already marked
  nologging or already has an event operation (op / new_op); skip
  distributed privilege tables; read the per-table replication settings;
  build the event name with ndb_rep_event_name(); create the event with
  ndbcluster_create_event() and the event operations with
  ndbcluster_create_event_ops(), logging an error when either fails.
*/
4857 int ndbcluster_create_binlog_setup(THD *thd,
Ndb *ndb,
const char *key,
4860 const char *table_name,
/* Snapshot of the global flag at entry. */
4863 int do_event_op= ndb_binlog_running;
4864 DBUG_ENTER(
"ndbcluster_create_binlog_setup");
4865 DBUG_PRINT(
"enter",(
"key: %s key_len: %d %s.%s",
4866 key, key_len, db, table_name));
4867 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
4868 DBUG_ASSERT(strlen(key) == key_len);
4870 pthread_mutex_lock(&ndbcluster_mutex);
4871 NDB_SHARE * share = get_share(key, table, TRUE, TRUE);
4877 pthread_mutex_unlock(&ndbcluster_mutex);
4880 pthread_mutex_unlock(&ndbcluster_mutex);
4882 pthread_mutex_lock(&share->mutex);
/* Nothing to do when logging is off or an event operation already exists. */
4883 if (get_binlog_nologging(share) || share->op != 0 || share->new_op != 0)
4885 pthread_mutex_unlock(&share->mutex);
4890 if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
4895 DBUG_PRINT(
"info", (
"Skipping binlogging of table %s/%s", db, table_name));
/*
  Special-case the schema and apply-status tables of NDB_REP_DB while
  their global shares (ndb_schema_share / ndb_apply_status_share) are
  not set.
*/
4899 if (!ndb_schema_share &&
4900 strcmp(share->db, NDB_REP_DB) == 0 &&
4901 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
4903 else if (!ndb_apply_status_share &&
4904 strcmp(share->db, NDB_REP_DB) == 0 &&
4905 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
4910 set_binlog_nologging(share);
4911 pthread_mutex_unlock(&share->mutex);
/* Tables with the temporary-name prefix are not processed by this loop. */
4915 while (share && !IS_TMP_PREFIX(table_name))
4929 const NDBTAB *ndbtab= ndbtab_g.get_table();
4932 if (opt_ndb_extra_logging)
4933 sql_print_information(
"NDB Binlog: Failed to get table %s from ndb: "
4938 #ifdef HAVE_NDB_BINLOG
4941 ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
4942 ::server_id, NULL, TRUE);
/* The replication settings may have turned logging off for this table. */
4947 if (get_binlog_nologging(share))
4949 if (opt_ndb_extra_logging)
4950 sql_print_information(
"NDB Binlog: NOT logging %s", share->key);
4951 pthread_mutex_unlock(&share->mutex);
4955 String event_name(INJECTOR_EVENT_LEN);
4956 ndb_rep_event_name(&event_name, db, table_name, get_binlog_full(share));
4964 if (ndbcluster_create_event(thd, ndb, ndbtab, event_name.c_ptr(), share))
4966 sql_print_error(
"NDB Binlog: "
4967 "FAILED CREATE (DISCOVER) TABLE Event: %s",
4968 event_name.c_ptr());
4971 if (opt_ndb_extra_logging)
4972 sql_print_information(
"NDB Binlog: "
4973 "CREATE (DISCOVER) TABLE Event: %s",
4974 event_name.c_ptr());
4979 if (opt_ndb_extra_logging)
4980 sql_print_information(
"NDB Binlog: DISCOVER TABLE Event: %s",
4981 event_name.c_ptr());
4987 if (ndbcluster_create_event_ops(thd, share,
4988 ndbtab, event_name.c_ptr()))
/*
  NOTE(review): unlike the other messages, "NDB Binlog:" has no trailing
  space here, so the text reads "NDB Binlog:FAILED ..." - confirm.
*/
4990 sql_print_error(
"NDB Binlog:"
4991 "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
4992 event_name.c_ptr());
4996 pthread_mutex_unlock(&share->mutex);
5000 pthread_mutex_unlock(&share->mutex);
/*
  Create the NDB event named event_name for table ndbtab.

  Visible behaviour: tables whose share is marked nologging are skipped;
  a table with a hidden primary key and a Blob column cannot be logged
  (an error is printed, a warning is pushed and NSF_NO_BINLOG is set on
  the share); the event subscribes to all changes or only to updated
  columns depending on get_binlog_full(share), with extra subscription
  for the NDB_REP_DB schema table; events are merged for Blob tables; all
  n_cols columns are added to the event.  If creating the event fails
  and an event of that name already exists, the code attempts to correct
  this by dropping it and creating it again, reporting each failure.
  Warnings are pushed on thd only when push_warning is greater than 1.
*/
5006 ndbcluster_create_event(THD *thd,
Ndb *ndb,
const NDBTAB *ndbtab,
5007 const char *event_name,
NDB_SHARE *share,
5010 DBUG_ENTER(
"ndbcluster_create_event");
5011 DBUG_PRINT(
"info", (
"table=%s version=%d event=%s share=%s",
5013 event_name, share ? share->key :
"(nil)"));
5014 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->
getName()));
5017 DBUG_PRINT(
"info", (
"share == NULL"));
5020 if (get_binlog_nologging(share))
5022 if (opt_ndb_extra_logging && ndb_binlog_running)
5023 sql_print_information(
"NDB Binlog: NOT logging %s", share->key);
5024 DBUG_PRINT(
"info", (
"share->flags & NSF_NO_BINLOG, flags: %x %d",
5025 share->flags, share->flags & NSF_NO_BINLOG));
5032 my_event.setTable(*ndbtab);
/* Hidden primary key combined with a Blob column is not supported. */
5034 if (share->flags & NSF_HIDDEN_PK)
5036 if (share->flags & NSF_BLOB_FLAG)
5038 sql_print_error(
"NDB Binlog: logging of table %s "
5039 "with BLOB attribute and no PK is not supported",
5042 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5043 ER_ILLEGAL_HA_CREATE_OPTION,
5044 ER(ER_ILLEGAL_HA_CREATE_OPTION),
5045 ndbcluster_hton_name,
5046 "Binlog of table with BLOB attribute and no PK");
5048 share->flags|= NSF_NO_BINLOG;
5053 (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
5054 DBUG_PRINT(
"info", (
"subscription all"));
/* The schema table additionally subscribes to ER_SUBSCRIBE events. */
5058 if (strcmp(share->db, NDB_REP_DB) == 0 &&
5059 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
5066 NDBEVENT::ER_SUBSCRIBE |
5068 DBUG_PRINT(
"info", (
"subscription all and subscribe"));
/* Full logging reports all columns, otherwise only updated ones. */
5072 if (get_binlog_full(share))
5075 (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
5076 DBUG_PRINT(
"info", (
"subscription all"));
5081 (NDBEVENT::ER_UPDATED | NDBEVENT::ER_DDL));
5082 DBUG_PRINT(
"info", (
"subscription only updated"));
5086 if (share->flags & NSF_BLOB_FLAG)
5087 my_event.mergeEvents(TRUE);
/* Subscribe to every column of the table. */
5091 for(
int a= 0; a < n_cols; a++)
5092 my_event.addEventColumn(a);
5101 if (push_warning > 1)
5102 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5103 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5106 sql_print_error(
"NDB Binlog: Unable to create event in database. "
5107 "Event: %s Error Code: %d Message: %s", event_name,
/* An event with this name already exists: try drop + create again. */
5117 if ((ev= dict->
getEvent(event_name)))
5129 if (push_warning > 1)
5130 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5131 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5134 sql_print_error(
"NDB Binlog: Unable to create event in database. "
5135 " Attempt to correct with drop failed. "
5136 "Event: %s Error Code: %d Message: %s",
5148 if (push_warning > 1)
5149 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5150 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5153 sql_print_error(
"NDB Binlog: Unable to create event in database. "
5154 " Attempt to correct with drop ok, but create failed. "
5155 "Event: %s Error Code: %d Message: %s",
5161 #ifdef NDB_BINLOG_EXTRA_WARNINGS
5162 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5163 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5164 0,
"NDB Binlog: Removed trailing event",
/*
  Predicate on a Field: the expression below is true when the field is
  not a Blob, is not of type MYSQL_TYPE_BIT and has a non-zero pack
  length.  ndbcluster_create_event_ops() uses it to decide whether the
  field's own record buffer can receive event values directly.
*/
5172 inline int is_ndb_compatible_type(
Field *field)
5175 !(field->flags & BLOB_FLAG) &&
5176 field->type() != MYSQL_TYPE_BIT &&
5177 field->pack_length() != 0;
/*
  Create the NdbEventOperation for a share and wire up its value buffers.

  Visible behaviour:
    - nothing is done for shares marked nologging;
    - the NDB_REP_DB schema and apply-status tables are recognised so that
      their global shares can be set at the end;
    - the share is marked NSF_NO_BINLOG when its database is filtered out
      by binlog_filter, when ndb_binlog_running is false, or when the
      table name ends with the exceptions table suffix;
    - for every column, receive buffers for the two row images are
      registered: directly in the shadow table's record[0]/record[1] for
      compatible fields, via Blob handles for Blob fields; the resulting
      pointers are stored in event_data->ndb_value[0] and [1];
    - event_data is attached to the operation as custom data while it is
      executed; on failure it is handed back to share->event_data and the
      attempt may be retried after do_retry_sleep();
    - on success, the apply-status or schema share global is set with an
      extra reference and injector_cond is signalled.
*/
5188 ndbcluster_create_event_ops(THD *thd,
NDB_SHARE *share,
5189 const NDBTAB *ndbtab,
const char *event_name)
5196 DBUG_ENTER(
"ndbcluster_create_event_ops");
5197 DBUG_PRINT(
"enter", (
"table: %s event: %s", ndbtab->
getName(), event_name));
5198 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->
getName()));
5200 DBUG_ASSERT(share != 0);
5202 if (get_binlog_nologging(share))
5204 DBUG_PRINT(
"info", (
"share->flags & NSF_NO_BINLOG, flags: %x",
5211 assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db,
5212 share->table_name));
5215 int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
5216 #ifdef HAVE_NDB_BINLOG
5217 uint len= strlen(share->table_name);
5219 if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
5220 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
5221 do_ndb_schema_share= 1;
5222 else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
5223 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
5224 do_ndb_apply_status_share= 1;
5226 #ifdef HAVE_NDB_BINLOG
/*
  The last clause compares the tail of the table name with the exceptions
  table suffix; sizeof() includes the terminating NUL, hence the "+1".
*/
5227 if (!binlog_filter->db_ok(share->db) ||
5228 !ndb_binlog_running ||
5229 (len >=
sizeof(NDB_EXCEPTIONS_TABLE_SUFFIX) &&
5230 strcmp(share->table_name+len-
sizeof(NDB_EXCEPTIONS_TABLE_SUFFIX)+1,
5231 lower_case_table_names ? NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER :
5232 NDB_EXCEPTIONS_TABLE_SUFFIX) == 0))
5235 share->flags|= NSF_NO_BINLOG;
5242 assert(event_data->share == share);
5243 assert(share->event_data == 0);
5245 DBUG_ASSERT(share->use_count > 1);
5246 sql_print_error(
"NDB Binlog: discover reusing old ev op");
5248 DBUG_PRINT(
"NDB_SHARE", (
"%s ToDo free use_count: %u",
5249 share->key, share->use_count));
5254 DBUG_ASSERT(event_data != 0);
5255 TABLE *table= event_data->shadow_table;
5262 int retry_sleep= 100;
5266 Ndb *ndb= injector_ndb;
5267 if (do_ndb_schema_share)
5274 if (do_ndb_schema_share)
5280 assert(ret == 0); NDB_IGNORE_VALUE(ret);
5287 sql_print_error(
"NDB Binlog: Creating NdbEventOperation failed for"
5289 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5290 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5297 if (share->flags & NSF_BLOB_FLAG)
5301 uint n_fields= table->s->fields;
5302 uint val_length=
sizeof(
NdbValue) * n_columns;
/* One NdbValue array per row image, allocated together. */
5307 if (my_multi_malloc(MYF(MY_WME),
5308 &event_data->ndb_value[0],
5310 &event_data->ndb_value[1],
5314 DBUG_PRINT(
"info", (
"Failed to allocate records for event operation"));
5318 for (uint j= 0; j < n_columns; j++)
5324 Field *f= table->field[j];
/*
  Compatible fields receive their values directly at the field's position
  in record[0] and at the same offset in record[1].
*/
5325 if (is_ndb_compatible_type(f))
5327 DBUG_PRINT(
"info", (
"%s compatible", col_name));
5328 attr0.rec= op->
getValue(col_name, (
char*) f->ptr);
5330 (f->ptr - table->record[0]) +
5331 (
char*) table->record[1]);
5333 else if (! (f->flags & BLOB_FLAG))
5335 DBUG_PRINT(
"info", (
"%s non compatible", col_name));
5341 DBUG_PRINT(
"info", (
"%s blob", col_name));
5342 DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
5344 attr1.blob= op->getPreBlobHandle(col_name);
5345 if (attr0.blob == NULL || attr1.blob == NULL)
5347 sql_print_error(
"NDB Binlog: Creating NdbEventOperation"
5348 " blob field %u handles failed (code=%d) for %s",
5350 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5351 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5362 DBUG_PRINT(
"info", (
"%s hidden key", col_name));
5366 event_data->ndb_value[0][j].ptr= attr0.ptr;
5367 event_data->ndb_value[1][j].ptr= attr1.ptr;
5368 DBUG_PRINT(
"info", (
"&event_data->ndb_value[0][%d]: 0x%lx "
5369 "event_data->ndb_value[0][%d]: 0x%lx",
5370 j, (
long) &event_data->ndb_value[0][j],
5371 j, (
long) attr0.ptr));
/*
  NOTE(review): this trace is labelled ndb_value[1] but prints the address
  of ndb_value[0][j]; it looks like a copy-paste slip that only affects
  debug output - confirm.
*/
5372 DBUG_PRINT(
"info", (
"&event_data->ndb_value[1][%d]: 0x%lx "
5373 "event_data->ndb_value[1][%d]: 0x%lx",
5374 j, (
long) &event_data->ndb_value[0][j],
5375 j, (
long) attr1.ptr));
/* Attach event_data to the operation; the share's pointer is cleared. */
5377 op->setCustomData((
void *) event_data);
5378 share->event_data= 0;
5390 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5391 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5394 sql_print_error(
"NDB Binlog: ndbevent->execute failed for %s; %d %s",
/* Failure: give event_data back to the share and detach it from the op. */
5398 share->event_data= event_data;
5399 op->setCustomData(NULL);
5401 if (retries && !thd->killed)
5403 do_retry_sleep(retry_sleep);
5413 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog use_count: %u",
5414 share->key, share->use_count));
/*
  Publish the special shares with an extra reference and signal
  injector_cond.
*/
5415 if (do_ndb_apply_status_share)
5418 ndb_apply_status_share= get_share(share);
5419 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog extra use_count: %u",
5420 share->key, share->use_count));
5421 (void) pthread_cond_signal(&injector_cond);
5423 else if (do_ndb_schema_share)
5426 ndb_schema_share= get_share(share);
5427 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog extra use_count: %u",
5428 share->key, share->use_count));
5429 (void) pthread_cond_signal(&injector_cond);
5432 DBUG_PRINT(
"info",(
"%s share->op: 0x%lx share->use_count: %u",
5433 share->key, (
long) share->op, share->use_count));
5435 if (opt_ndb_extra_logging)
5436 sql_print_information(
"NDB Binlog: logging %s (%s,%s)", share->key,
5437 get_binlog_full(share) ?
"FULL" :
"UPDATED",
5438 get_binlog_use_update(share) ?
"USE_UPDATE" :
"USE_WRITE");
/*
  Drop the replication events that belong to one table.

  The loop below runs for i = 0 and i = 1.  Each pass builds an event name
  with ndb_rep_event_name(&event_name, dbname, tabname, i) and asks the
  dictionary object to drop that event.  The reporting calls further down
  push a warning on thd and write "Unable to drop event in database" to the
  error log.

  NOTE(review): this listing omits several source lines (among them the
  dbname parameter, the origin of dict and the end of the function), so the
  return value and the exact branch structure cannot be confirmed from here.
*/
5443 ndbcluster_drop_event(THD *thd,
Ndb *ndb,
NDB_SHARE *share,
5444 const char *type_str,
5446 const char *tabname)
5448 DBUG_ENTER(
"ndbcluster_drop_event");
5454 for (uint i= 0; i < 2; i++)
5457 String event_name(INJECTOR_EVENT_LEN);
5458 ndb_rep_event_name(&event_name, dbname, tabname, i);
/*
  NOTE(review): a zero result from dropEvent() looks like the success case,
  since the failure reporting follows -- confirm against the NDB API docs.
*/
5460 if (!dict->
dropEvent(event_name.c_ptr()))
5467 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5468 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5472 sql_print_error(
"NDB Binlog: Unable to drop event in database. "
5473 "Event: %s Error Code: %d Message: %s",
5478 if (share && share->op &&
/*
  Binlog-side handling of a dropped table.

  When both dbname and tabname are supplied, the table's events are dropped
  first through ndbcluster_drop_event().  The rest of the function is about
  a share that still has an event operation: thd->proc_info is switched to
  "Syncing ndb table schema operation and binlog", the code waits on
  injector_cond with a timeout budget that starts at DEFAULT_SYNC_TIMEOUT,
  and the saved proc_info is restored at the end.

  NOTE(review): lines are missing from this listing (loop header, branch
  bodies, return statements); the comments below describe only what the
  visible statements do.
*/
5497 ndbcluster_handle_drop_table(THD *thd,
Ndb *ndb,
NDB_SHARE *share,
5498 const char *type_str,
5499 const char * dbname,
const char * tabname)
5501 DBUG_ENTER(
"ndbcluster_handle_drop_table");
5503 if (dbname && tabname)
5505 if (ndbcluster_drop_event(thd, ndb, share, type_str, dbname, tabname))
/*
  No share, or a share without an event operation.
  NOTE(review): presumably an early exit; the branch body is not shown.
*/
5509 if (share == 0 || share->op == 0)
5526 const char *save_proc_info= thd->proc_info;
5529 thd->proc_info=
"Syncing ndb table schema operation and binlog";
5530 pthread_mutex_lock(&share->mutex);
5531 int max_timeout= DEFAULT_SYNC_TIMEOUT;
/*
  Timed wait on injector_cond.
  NOTE(review): set_timespec(abstime, 1) looks like a one-second deadline
  per wait, with max_timeout counting the remaining attempts -- confirm.
*/
5535 set_timespec(abstime, 1);
5536 int ret= pthread_cond_timedwait(&injector_cond,
/* Budget used up: log the timeout; the message says it is ignored. */
5545 if (max_timeout == 0)
5547 sql_print_error(
"NDB %s: %s timed out. Ignoring...",
5548 type_str, share->key);
5552 if (opt_ndb_extra_logging)
5553 ndb_report_waiting(type_str, max_timeout,
5554 type_str, share->key, 0);
5557 pthread_mutex_unlock(&share->mutex);
5559 pthread_mutex_lock(&share->mutex);
5561 pthread_mutex_unlock(&share->mutex);
5563 thd->proc_info= save_proc_info;
/*
  Unpack the values received for one event row into a MySQL record buffer.

  The fields of `table` and the entries of `value` are walked in parallel
  (p_field++ / value++).  row_offset is the distance between `buf` and
  table->record[0] and is passed to the Field calls so that they operate on
  `buf` instead of record[0].  For every field the NULL flag is cleared
  first; then, depending on what was received, the field is marked NULL, its
  bit is cleared in the `defined` bitmap (value not part of the event), or
  the value is stored (BIT columns get special treatment, BLOBs are read
  through their NdbBlob handle).

  NOTE(review): part of the parameter list (`defined`, `buf`) and several
  branch conditions are missing from this listing.
*/
5588 static void ndb_unpack_record(
TABLE *table,
NdbValue *value,
5591 Field **p_field= table->field, *field= *p_field;
5592 my_ptrdiff_t row_offset= (my_ptrdiff_t) (buf - table->record[0]);
5593 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
5594 DBUG_ENTER(
"ndb_unpack_record");
/*
  256 - (1 << last_null_bit_pos) sets bits last_null_bit_pos..7 of the final
  null byte.  NOTE(review): presumably these are the filler bits above the
  last null bit in use -- confirm.
*/
5602 if (table->s->null_bytes > 0)
5603 buf[table->s->null_bytes - 1]|= 256
U - (1
U <<
5604 table->s->last_null_bit_pos);
5609 p_field++, value++, field= *p_field)
/* Start from "not NULL"; the branches below override this where needed. */
5611 field->set_notnull(row_offset);
/* Non-BLOB columns: the value is available through the rec member. */
5614 if (!(field->flags & BLOB_FLAG))
5616 int is_null= (*value).rec->isNULL();
5621 DBUG_PRINT(
"info",(
"[%u] NULL", field->field_index));
5622 field->set_null(row_offset);
5626 DBUG_PRINT(
"info",(
"[%u] UNDEFINED", field->field_index));
5627 bitmap_clear_bit(defined, field->field_index);
/*
  BIT columns are stored through Field_bit::store(); the field is moved to
  the target buffer for the duration of the store and moved back afterwards.
  Values with pack_length() < 5 are read as 32 bits, the others as 64 bits.
*/
5630 else if (field->type() == MYSQL_TYPE_BIT)
5639 field_bit->Field_bit::move_field_offset(row_offset);
5640 if (field->pack_length() < 5)
5642 DBUG_PRINT(
"info", (
"bit field H'%.8X",
5643 (*value).rec->u_32_value()));
5644 field_bit->Field_bit::store((longlong) (*value).rec->u_32_value(),
5649 DBUG_PRINT(
"info", (
"bit field H'%.8X%.8X",
5650 *(Uint32 *)(*value).rec->aRef(),
5651 *((Uint32 *)(*value).rec->aRef()+1)));
/* Big-endian builds assemble the 64-bit value from two 32-bit words. */
5652 #ifdef WORDS_BIGENDIAN
5654 Uint32 *buf= (Uint32 *)(*value).rec->aRef();
5655 field_bit->Field_bit::store((((longlong)*buf)
5656 & 0x00000000FFFFFFFFLL)
5658 ((((longlong)*(buf+1)) << 32)
5659 & 0xFFFFFFFF00000000LL),
5662 field_bit->Field_bit::store((longlong)
5663 (*value).rec->u_64_value(), TRUE);
5670 field_bit->Field_bit::move_field_offset(-row_offset);
5671 DBUG_PRINT(
"info",(
"[%u] SET",
5672 (*value).rec->getColumn()->getColumnNo()));
5673 DBUG_DUMP(
"info", (
const uchar*) field->ptr, field->pack_length());
5677 DBUG_PRINT(
"info",(
"[%u] SET",
5678 (*value).rec->getColumn()->getColumnNo()));
5679 DBUG_DUMP(
"info", (
const uchar*) field->ptr, field->pack_length());
/*
  BLOB columns: the state comes from the blob handle.  getDefined() fills
  isNull; the value -1 is treated as "undefined" and clears the column's bit
  in `defined`.
*/
5684 NdbBlob *ndb_blob= (*value).blob;
5685 uint col_no= field->field_index;
5687 ndb_blob->getDefined(isNull);
5690 DBUG_PRINT(
"info",(
"[%u] NULL", col_no));
5691 field->set_null(row_offset);
5693 else if (isNull == -1)
5695 DBUG_PRINT(
"info",(
"[%u] UNDEFINED", col_no));
5696 bitmap_clear_bit(defined, col_no);
5704 field_blob->get_ptr(&ptr, row_offset);
5705 uint32 len= field_blob->get_length(row_offset);
5706 DBUG_PRINT(
"info",(
"[%u] SET ptr: 0x%lx len: %u",
5707 col_no, (
long) ptr, len));
/* Undo the dbug_tmp_use_all_columns() call made at the top. */
5713 dbug_tmp_restore_column_map(table->write_set, old_map);
/*
  Report an error found on an event operation of the binlog thread.

  Three situations are logged, each naming the affected share->key: an
  overrun of the event buffer (events were dropped), an inconsistent
  operation, and any other error code returned by pOp->hasError().

  NOTE(review): the remaining parameters, the conditions that select each
  message and the return statements are missing from this listing.  The
  caller compares the result with < 0, so a negative value presumably means
  that binlogging cannot continue.
*/
5721 ndb_binlog_thread_handle_error(
Ndb *ndb,
5726 DBUG_ENTER(
"ndb_binlog_thread_handle_error");
5728 int overrun= pOp->isOverrun();
5735 sql_print_error(
"NDB Binlog: Overrun in event buffer, "
5736 "this means we have dropped events. Cannot "
5737 "continue binlog for %s", share->key);
5748 sql_print_error(
"NDB Binlog: Not Consistent. Cannot "
5749 "continue binlog for %s. Error code: %d"
5750 " Message: %s", share->key,
5756 sql_print_error(
"NDB Binlog: unhandled error %d for table %s",
5757 pOp->hasError(), share->key);
/*
  Handle an event that carries no row data (cluster failure, table drop and
  similar notifications) in the binlog thread.

  Two paths are visible: one that logs "cluster failure" and one that logs
  "drop table".  On both, if the event's share is ndb_apply_status_share,
  the extra reference held in that global is released, the global is cleared
  and ndb_binlog_tables_inited is reset to FALSE.  Unknown event types are
  logged and ignored.  At the end the event is passed on to
  ndb_handle_schema_change().

  NOTE(review): the pOp parameter, the switch/case labels and the local
  declarations (share, event_data, type) are missing from this listing.
*/
5763 ndb_binlog_thread_handle_non_data_event(THD *thd,
5765 ndb_binlog_index_row &row)
/* Path that reports a cluster failure together with the epoch from getGCI(). */
5774 if (opt_ndb_extra_logging)
5775 sql_print_information(
"NDB Binlog: cluster failure for %s at epoch %u/%u.",
5777 (uint)(pOp->
getGCI() >> 32),
5779 if (ndb_apply_status_share == share)
5781 if (opt_ndb_extra_logging &&
5782 ndb_binlog_tables_inited && ndb_binlog_running)
5783 sql_print_information(
"NDB Binlog: ndb tables initially "
5784 "read only on reconnect.");
5786 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog extra free use_count: %u",
5787 share->key, share->use_count));
5788 free_share(&ndb_apply_status_share);
5789 ndb_apply_status_share= 0;
5790 ndb_binlog_tables_inited= FALSE;
5792 DBUG_PRINT(
"error", (
"CLUSTER FAILURE EVENT: "
5793 "%s received share: 0x%lx op: 0x%lx share op: 0x%lx "
5795 share->key, (
long) share, (
long) pOp,
5796 (
long) share->op, (
long) share->new_op));
/* Same release of the apply-status share, on the path that logs "drop table". */
5799 if (ndb_apply_status_share == share)
5801 if (opt_ndb_extra_logging &&
5802 ndb_binlog_tables_inited && ndb_binlog_running)
5803 sql_print_information(
"NDB Binlog: ndb tables initially "
5804 "read only on reconnect.");
5806 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog extra free use_count: %u",
5807 share->key, share->use_count));
5808 free_share(&ndb_apply_status_share);
5809 ndb_apply_status_share= 0;
5810 ndb_binlog_tables_inited= FALSE;
5813 if (opt_ndb_extra_logging)
5814 sql_print_information(
"NDB Binlog: drop table %s.", share->key);
5818 DBUG_PRINT(
"info", (
"TABLE %s EVENT: %s received share: 0x%lx op: 0x%lx "
5819 "share op: 0x%lx new_op: 0x%lx",
5821 share->key, (
long) share, (
long) pOp,
5822 (
long) share->op, (
long) share->new_op));
/* Event types without a dedicated path are only logged. */
5832 sql_print_error(
"NDB Binlog: unknown non data event %d for %s. "
5833 "Ignoring...", (
unsigned) type, share->key);
5837 ndb_handle_schema_change(thd, injector_ndb, pOp, event_data);
/*
  Find the ndb_binlog_index_row to use for events that originate from
  orig_server_id, starting at *rows.

  The search logic is guarded by opt_ndb_log_orig.  In that mode rows are
  compared on orig_server_id; a row with a matching id is remembered in
  found_id, and the test on `flag` / row->orig_epoch decides whether that
  row can be used.  A new zero-filled row can be allocated with sql_alloc();
  it takes over the insert/update/delete counters of found_id and the
  schema-op counter of the first row, and those source counters are reset
  to 0.

  NOTE(review): loop headers, return statements and the list-linking code
  are missing from this listing, so the exact conditions under which a new
  row is created cannot be confirmed from here.
*/
5844 inline ndb_binlog_index_row *
5845 ndb_find_binlog_index_row(ndb_binlog_index_row **rows,
5846 uint orig_server_id,
int flag)
5848 ndb_binlog_index_row *row= *rows;
5849 if (opt_ndb_log_orig)
5851 ndb_binlog_index_row *first= row, *found_id= 0;
5854 if (row->orig_server_id == orig_server_id)
5857 if (!flag || !row->orig_epoch)
5862 if (row->orig_server_id == 0)
/* Allocate a fresh row and clear it completely. */
5867 row= (ndb_binlog_index_row*)sql_alloc(
sizeof(ndb_binlog_index_row));
5868 memset(row, 0,
sizeof(ndb_binlog_index_row));
/* Move the row counters from the previously matched row to the new row. */
5878 row->n_inserts= found_id->n_inserts;
5879 row->n_updates= found_id->n_updates;
5880 row->n_deletes= found_id->n_deletes;
5881 found_id->n_inserts= 0;
5882 found_id->n_updates= 0;
5883 found_id->n_deletes= 0;
/* Move the schema operation counter from the first row to the new row. */
5886 row->n_schemaops= first->n_schemaops;
5887 first->n_schemaops= 0;
5891 row->orig_server_id= orig_server_id;
5898 ndb_binlog_index_row **rows,
5900 unsigned &trans_row_count,
5901 unsigned &trans_slave_row_count)
/*
  Body of the data event handler of the binlog thread.

  NOTE(review): the function name and its first parameters are not part of
  this listing; the call site in the binlog thread invokes
  ndb_binlog_thread_handle_data_event(i_ndb, pOp, &rows, trans,
  trans_row_count, trans_slave_row_count), which matches the parameters
  above.  Many branch bodies and return statements are missing as well, so
  the comments below describe the visible statements only.

  Visible flow:
    - inspect the event's anyValue and derive the originating server id;
    - give events on the ndb_apply_status table special treatment;
    - pick the ndb_binlog_index_row for the originating server;
    - unpack the received values and hand the row to the injector
      transaction as a write, delete or update.
*/
5904 TABLE *table= event_data->shadow_table;
/* The event does not belong to the share's current event operation. */
5906 if (pOp != share->op)
/*
  Reserved anyValue: a warning is logged unless the value is the
  "no logging" marker.
*/
5912 if (ndbcluster_anyvalue_is_reserved(anyValue))
5914 if (!ndbcluster_anyvalue_is_nologging(anyValue))
5915 sql_print_warning(
"NDB: unknown value for binlog signalling 0x%X, "
5920 uint32 originating_server_id= ndbcluster_anyvalue_get_serverid(anyValue);
5921 bool log_this_slave_update = g_ndb_log_slave_updates;
5922 bool count_this_event =
true;
/*
  Events on the ndb_apply_status table: the row is unpacked into
  table->record[0]; field 0 is read as the server id and field 1 as the
  epoch.
*/
5924 if (share == ndb_apply_status_share)
5930 if (opt_ndb_log_apply_status ||
5933 Uint32 ndb_apply_status_logging_server_id= originating_server_id;
5934 Uint32 ndb_apply_status_server_id= 0;
5935 Uint64 ndb_apply_status_epoch= 0;
5936 bool event_has_data =
false;
5943 event_has_data =
true;
5952 if (likely( event_has_data ))
5955 uint n_fields= table->s->fields;
5957 uint32 bitbuf[128 / (
sizeof(uint32) * 8)];
5958 bitmap_init(&b, bitbuf, n_fields, FALSE);
5960 ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
5961 ndb_apply_status_server_id= (uint)((
Field_long *)table->field[0])->val_int();
5962 ndb_apply_status_epoch= ((Field_longlong *)table->field[1])->val_int();
5964 if (opt_ndb_log_apply_status)
5970 if ((ndb_apply_status_logging_server_id != 0) &&
5971 (! ndbcluster_anyvalue_is_reserved(ndb_apply_status_logging_server_id)))
/*
  The row comes from the immediate master when the server id stored in the
  row equals the server id that logged the event.
*/
5973 bool isFromImmediateMaster = (ndb_apply_status_server_id ==
5974 ndb_apply_status_logging_server_id);
5976 if (isFromImmediateMaster)
5983 assert(ndb_apply_status_logging_server_id != ::server_id);
5985 originating_server_id= 0;
/* With opt_ndb_log_orig the epoch is recorded in the row of that server id. */
5990 if (opt_ndb_log_orig)
5993 ndb_binlog_index_row *row= ndb_find_binlog_index_row
5994 (rows, ndb_apply_status_server_id, 1);
5995 row->orig_epoch= ndb_apply_status_epoch;
6000 if (opt_ndb_log_apply_status)
6006 count_this_event =
false;
6007 log_this_slave_update =
true;
/* A zero originating server id is replaced by this server's own id. */
6016 if (originating_server_id == 0)
6017 originating_server_id= ::server_id;
6021 if (likely( count_this_event ))
6022 trans_slave_row_count++;
6024 if (!log_this_slave_update)
/* The value that is logged is anyValue with the server id part replaced. */
6041 uint32 logged_server_id= anyValue;
6042 ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id);
6044 DBUG_ASSERT(trans.good());
6045 DBUG_ASSERT(table != 0);
6047 dbug_print_table(
"table", table);
6049 uint n_fields= table->s->fields;
6050 DBUG_PRINT(
"info", (
"Assuming %u columns for table %s",
6051 n_fields, table->s->table_name.str));
/*
  Column bitmap: the 128-bit stack buffer is used only when it can hold one
  bit per field; otherwise bitmap_init() receives NULL.
  NOTE(review): bitmap_init() presumably allocates its own buffer in that
  case -- confirm that it is released again.
*/
6054 uint32 bitbuf[128 / (
sizeof(uint32) * 8)];
6055 const bool own_buffer = n_fields <=
sizeof(bitbuf) * 8;
6056 bitmap_init(&b, own_buffer ? bitbuf : NULL, n_fields, FALSE);
/* Buffers for blob data: index 0 and 1 match event_data->ndb_value[0] and [1]. */
6069 uchar* blobs_buffer[2] = { 0, 0 };
6070 uint blobs_buffer_size[2] = { 0, 0 };
6072 ndb_binlog_index_row *row=
6073 ndb_find_binlog_index_row(rows, originating_server_id, 0);
/* Insert: values from ndb_value[0] go to record[0] and are written as a row. */
6078 if (likely( count_this_event ))
6083 DBUG_PRINT(
"info", (
"INSERT INTO %s.%s",
6084 table->s->db.str, table->s->table_name.str));
6087 if (share->flags & NSF_BLOB_FLAG)
6089 my_ptrdiff_t ptrdiff= 0;
6090 ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
6092 blobs_buffer_size[0],
6096 ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
6097 ret = trans.write_row(logged_server_id,
6099 &b, n_fields, table->record[0]);
/*
  Delete: values from ndb_value[n] go to record[n].
  NOTE(review): the assignment of n is not shown; the visible test on
  get_binlog_full() and the primary key presumably selects it.
*/
6104 if (likely( count_this_event ))
6109 DBUG_PRINT(
"info",(
"DELETE FROM %s.%s",
6110 table->s->db.str, table->s->table_name.str));
6117 if (!get_binlog_full(share) && table->s->primary_key != MAX_KEY)
6130 if (share->flags & NSF_BLOB_FLAG)
6132 my_ptrdiff_t ptrdiff= table->record[
n] - table->record[0];
6133 ret = get_ndb_blobs_value(table, event_data->ndb_value[n],
6135 blobs_buffer_size[n],
6139 ndb_unpack_record(table, event_data->ndb_value[n], &b, table->record[n]);
6140 DBUG_EXECUTE(
"info", print_records(table, table->record[n]););
6141 ret = trans.delete_row(logged_server_id,
6143 &b, n_fields, table->record[n]);
/*
  Update: values from ndb_value[0] are unpacked into record[0].  For a table
  with a primary key and without "use update" logging the change is
  injected with write_row(); the code after that unpacks ndb_value[1] into
  record[1] and calls update_row().
*/
6148 if (likely( count_this_event ))
6153 DBUG_PRINT(
"info", (
"UPDATE %s.%s",
6154 table->s->db.str, table->s->table_name.str));
6157 if (share->flags & NSF_BLOB_FLAG)
6159 my_ptrdiff_t ptrdiff= 0;
6160 ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
6162 blobs_buffer_size[0],
6166 ndb_unpack_record(table, event_data->ndb_value[0],
6167 &b, table->record[0]);
6168 DBUG_EXECUTE(
"info", print_records(table, table->record[0]););
6169 if (table->s->primary_key != MAX_KEY &&
6170 !get_binlog_use_update(share))
6176 ret = trans.write_row(logged_server_id,
6178 &b, n_fields, table->record[0]);
6187 if (share->flags & NSF_BLOB_FLAG)
6189 my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
6190 ret = get_ndb_blobs_value(table, event_data->ndb_value[1],
6192 blobs_buffer_size[1],
6196 ndb_unpack_record(table, event_data->ndb_value[1], &b, table->record[1]);
6197 DBUG_EXECUTE(
"info", print_records(table, table->record[1]););
6198 ret = trans.update_row(logged_server_id,
6209 DBUG_PRINT(
"info", (
"default - uh oh, a brain exploded."));
/* Release the blob buffers; MY_ALLOW_ZERO_PTR permits pointers that are still 0. */
6213 if (share->flags & NSF_BLOB_FLAG)
6215 my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
6216 my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
6228 #ifdef RUN_NDB_BINLOG_TIMER
/*
  Wall-clock stopwatch, compiled only when RUN_NDB_BINLOG_TIMER is defined.

  The constructor starts the timer right away; start() and stop() store the
  current time from gettimeofday() in m_start and m_stop.

  NOTE(review): the class header and the signature of the accessor that owns
  the expression below are missing from this listing; callers use
  elapsed_ms(), so it is presumably that function.
*/
6232 Timer() { start(); }
6233 void start() { gettimeofday(&m_start, 0); }
6234 void stop() { gettimeofday(&m_stop, 0); }
/*
  Difference between stop and start in milliseconds: whole seconds times
  1000, plus the microsecond difference with 999 added before the division
  by 1000.
*/
6238 (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
6239 ((longlong) m_stop.tv_usec -
6240 (longlong) m_start.tv_usec + 999) / 1000);
6243 struct timeval m_start,m_stop;
/*
  Key extraction callback of the ndb_schema_objects hash (it is passed to
  my_hash_init() as the my_hash_get_key function).

  Stores the key length of schema_object in *length and returns a pointer to
  the key bytes.  The last parameter is not used.

  NOTE(review): the return type line and the `length` parameter are missing
  from this listing.
*/
6252 ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object,
6254 my_bool not_used __attribute__((unused)))
6256 *length= schema_object->key_length;
6257 return (uchar*) schema_object->key;
/*
  Look up the NDB_SCHEMA_OBJECT stored under `key` in ndb_schema_objects
  and take a reference on it.

  The whole operation runs with ndbcluster_mutex held.  When no object is
  found and create_if_not_exists is set, a new zero-filled object is
  allocated, inserted into the hash, and its mutex and slock bitmap are
  initialised.  For any object that ends up non-NULL the use_count is
  incremented before it is returned; the reference is dropped again with
  ndb_free_schema_object().

  Returns the object, or whatever ndb_schema_object holds when the lookup
  or creation did not succeed.
  NOTE(review): the failure branches are not shown in this listing; NULL is
  the presumable result -- confirm.  One further parameter is missing from
  the listing as well.
*/
6260 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(
const char *key,
6261 my_bool create_if_not_exists,
6264 NDB_SCHEMA_OBJECT *ndb_schema_object;
6265 uint length= (uint) strlen(key);
6266 DBUG_ENTER(
"ndb_get_schema_object");
6267 DBUG_PRINT(
"enter", (
"key: '%s'", key));
6270 pthread_mutex_lock(&ndbcluster_mutex);
6271 while (!(ndb_schema_object=
6272 (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
6276 if (!create_if_not_exists)
6278 DBUG_PRINT(
"info", (
"does not exist"));
/*
  One allocation holds the struct and the key: length + 1 extra bytes are
  reserved behind the struct for the key and its terminating NUL.
*/
6281 if (!(ndb_schema_object=
6282 (NDB_SCHEMA_OBJECT*) my_malloc(
sizeof(*ndb_schema_object) + length + 1,
6283 MYF(MY_WME | MY_ZEROFILL))))
6285 DBUG_PRINT(
"info", (
"malloc error"));
/* The key is stored directly after the struct. */
6288 ndb_schema_object->key= (
char *)(ndb_schema_object+1);
6289 memcpy(ndb_schema_object->key, key, length + 1);
6290 ndb_schema_object->key_length= length;
/* The object is freed again when it cannot be inserted into the hash. */
6291 if (my_hash_insert(&ndb_schema_objects, (uchar*) ndb_schema_object))
6293 my_free((uchar*) ndb_schema_object, 0);
6296 pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
6297 bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
6298 sizeof(ndb_schema_object->slock)*8, FALSE);
6299 bitmap_clear_all(&ndb_schema_object->slock_bitmap);
/* Count the reference that is handed to the caller. */
6302 if (ndb_schema_object)
6304 ndb_schema_object->use_count++;
6305 DBUG_PRINT(
"info", (
"use_count: %d", ndb_schema_object->use_count));
6308 pthread_mutex_unlock(&ndbcluster_mutex);
6309 DBUG_RETURN(ndb_schema_object);
/*
  Drop one reference on *ndb_schema_object.

  Runs with ndbcluster_mutex held.  The use_count is decremented; when it
  reaches zero the object is removed from the ndb_schema_objects hash, its
  mutex is destroyed, the memory is freed and the caller's pointer is set to
  0.  Otherwise only the remaining use_count is traced.

  NOTE(review): one further parameter is missing from this listing.
*/
6313 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
6316 DBUG_ENTER(
"ndb_free_schema_object");
6317 DBUG_PRINT(
"enter", (
"key: '%s'", (*ndb_schema_object)->key));
6319 pthread_mutex_lock(&ndbcluster_mutex);
/* Last reference gone: unlink and destroy the object. */
6320 if (!--(*ndb_schema_object)->use_count)
6322 DBUG_PRINT(
"info", (
"use_count: %d", (*ndb_schema_object)->use_count));
6323 my_hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object);
6324 pthread_mutex_destroy(&(*ndb_schema_object)->mutex);
6325 my_free((uchar*) *ndb_schema_object, MYF(0));
6326 *ndb_schema_object= 0;
6330 DBUG_PRINT(
"info", (
"use_count: %d", (*ndb_schema_object)->use_count));
6333 pthread_mutex_unlock(&ndbcluster_mutex);
/*
  Detach all event operations that are still registered on `ndb`.

  The loop asks ndb->getEventOperation() for an operation until none is
  left.  For each one the debug asserts check that it is not an operation on
  a blob part table, that it carries custom event data and that the share
  found there refers back to the operation (as op or new_op).  The custom
  data pointer of the operation is then cleared, and the share is updated
  while share->mutex is held.

  NOTE(review): the statements inside the locked section and the code that
  releases the share and drops the operation are missing from this listing.
*/
6339 remove_event_operations(
Ndb* ndb)
6341 DBUG_ENTER(
"remove_event_operations");
6343 while ((op= ndb->getEventOperation()))
6345 DBUG_ASSERT(!IS_NDB_BLOB_PREFIX(op->getEvent()->
getTable()->
getName()));
6346 DBUG_PRINT(
"info", (
"removing event operation on %s",
6350 DBUG_ASSERT(event_data);
6353 DBUG_ASSERT(share != NULL);
6354 DBUG_ASSERT(share->op == op || share->new_op == op);
6357 op->setCustomData(NULL);
6359 pthread_mutex_lock(&share->mutex);
6362 pthread_mutex_unlock(&share->mutex);
6364 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog free use_count: %u",
6365 share->key, share->use_count));
6373 extern long long g_event_data_count;
6374 extern long long g_event_nondata_count;
6375 extern long long g_event_bytes_count;
/*
  Refresh the global event counters of the injector.

  Each of g_event_data_count, g_event_nondata_count and g_event_bytes_count
  is set to the sum of the matching client statistic of the two Ndb objects
  (schemaNdb and dataNdb).  The globals are overwritten, not accumulated.
*/
6377 void updateInjectorStats(
Ndb* schemaNdb,
Ndb* dataNdb)
6382 g_event_data_count =
6383 schemaNdb->getClientStat(Ndb::DataEventsRecvdCount) +
6384 dataNdb->getClientStat(Ndb::DataEventsRecvdCount);
6385 g_event_nondata_count =
6386 schemaNdb->getClientStat(Ndb::NonDataEventsRecvdCount) +
6387 dataNdb->getClientStat(Ndb::NonDataEventsRecvdCount);
6388 g_event_bytes_count =
6389 schemaNdb->getClientStat(Ndb::EventBytesRecvdCount) +
6390 dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
6393 enum Binlog_thread_state
6400 extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
6401 extern ulong opt_ndb_report_thresh_binlog_mem_usage;
6404 ndb_binlog_thread_func(
void *arg)
6410 injector *inj= injector::instance();
6411 uint incident_id= 0;
6412 Binlog_thread_state do_ndbcluster_binlog_close_connection;
6419 bool do_incident =
true;
6421 #ifdef RUN_NDB_BINLOG_TIMER
6425 pthread_mutex_lock(&injector_mutex);
6430 DBUG_ENTER(
"ndb_binlog_thread");
6433 THD_CHECK_SENTRY(thd);
6439 thd->thread_id= thread_id++;
6442 thd->thread_stack= (
char*) &thd;
6443 if (thd->store_globals())
6447 ndb_binlog_thread_running= -1;
6448 pthread_mutex_unlock(&injector_mutex);
6449 pthread_cond_signal(&injector_cond);
6458 thd->init_for_queries();
6459 thd_set_command(thd, COM_DAEMON);
6460 thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
6461 #ifndef NDB_THD_HAS_NO_VERSION
6462 thd->version= refresh_version;
6464 thd->client_capabilities= 0;
6465 thd->security_ctx->skip_grants();
6469 thd->set_current_stmt_binlog_format_row();
6474 sql_print_information(
"Starting Cluster Binlog Thread");
6476 pthread_detach_this_thread();
6477 thd->real_id= pthread_self();
6479 add_global_thread(thd);
6481 thd->lex->start_transaction_opt= 0;
6484 restart_cluster_failure:
6485 int have_injector_mutex_lock= 0;
6486 do_ndbcluster_binlog_close_connection= BCCC_exit;
6488 if (!(thd_ndb= Thd_ndb::seize(thd)))
6490 sql_print_error(
"Could not allocate Thd_ndb object");
6491 ndb_binlog_thread_running= -1;
6492 pthread_mutex_unlock(&injector_mutex);
6493 pthread_cond_signal(&injector_cond);
6497 if (!(s_ndb=
new Ndb(g_ndb_cluster_connection, NDB_REP_DB)) ||
6500 sql_print_error(
"NDB Binlog: Getting Schema Ndb object failed");
6501 ndb_binlog_thread_running= -1;
6502 pthread_mutex_unlock(&injector_mutex);
6503 pthread_cond_signal(&injector_cond);
6508 if (!(i_ndb=
new Ndb(g_ndb_cluster_connection,
"")) ||
6511 sql_print_error(
"NDB Binlog: Getting Ndb object failed");
6512 ndb_binlog_thread_running= -1;
6513 pthread_mutex_unlock(&injector_mutex);
6514 pthread_cond_signal(&injector_cond);
6519 (void) my_hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0,
6520 (my_hash_get_key)ndb_schema_objects_get_key, 0, 0);
6530 injector_ndb= i_ndb;
6533 if (opt_bin_log && opt_ndb_log_bin)
6535 ndb_binlog_running= TRUE;
6539 ndb_binlog_thread_running= 1;
6540 pthread_mutex_unlock(&injector_mutex);
6541 pthread_cond_signal(&injector_cond);
6548 while (!mysqld_server_started)
6551 set_timespec(abstime, 1);
6554 if (ndbcluster_terminating)
6564 while (do_incident && ndb_binlog_running)
6570 if (incident_id == 0)
6573 mysql_bin_log.get_current_log(&log_info);
6574 int len= strlen(log_info.log_file_name);
6576 if ((sscanf(log_info.log_file_name + len - 6,
"%u", &no) == 1) &&
6590 { C_STRING_WITH_LEN(
"mysqld startup") },
6591 { C_STRING_WITH_LEN(
"cluster disconnect")}
6593 int ret = inj->record_incident(thd, INCIDENT_LOST_EVENTS,
6595 assert(ret == 0); NDB_IGNORE_VALUE(ret);
6596 do_incident =
false;
6601 thd->proc_info=
"Waiting for ndbcluster to start";
6603 pthread_mutex_lock(&injector_mutex);
6604 while (!ndb_schema_share ||
6605 (ndb_binlog_running && !ndb_apply_status_share) ||
6606 !ndb_binlog_tables_inited)
6608 if (!thd_ndb->valid_ndb())
6616 have_injector_mutex_lock= 1;
6617 do_ndbcluster_binlog_close_connection= BCCC_restart;
6622 set_timespec(abstime, 1);
6623 pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
6624 if (ndbcluster_binlog_terminating)
6626 pthread_mutex_unlock(&injector_mutex);
6630 pthread_mutex_unlock(&injector_mutex);
6632 DBUG_ASSERT(ndbcluster_hton->slot != ~(uint)0);
6633 thd_set_thd_ndb(thd, thd_ndb);
6634 thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
6640 thd->proc_info=
"Waiting for first event from ndbcluster";
6641 int schema_res, res;
6645 DBUG_PRINT(
"info", (
"Waiting for the first event"));
6647 if (ndbcluster_binlog_terminating)
6650 schema_res= s_ndb->
pollEvents(100, &schema_gci);
6651 }
while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
6652 if (ndb_binlog_running)
6654 Uint64 gci= i_ndb->getLatestGCI();
6655 while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
6657 if (ndbcluster_binlog_terminating)
6661 if (gci > schema_gci)
6667 DBUG_PRINT(
"info", (
"schema_res: %d schema_gci: %u/%u", schema_res,
6668 (uint)(schema_gci >> 32),
6669 (uint)(schema_gci)));
6671 i_ndb->flushIncompleteEvents(schema_gci);
6672 s_ndb->flushIncompleteEvents(schema_gci);
6673 if (schema_gci < ndb_latest_handled_binlog_epoch)
6675 sql_print_error(
"NDB Binlog: cluster has been restarted --initial or with older filesystem. "
6676 "ndb_latest_handled_binlog_epoch: %u/%u, while current epoch: %u/%u. "
6677 "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
6678 (uint)(ndb_latest_handled_binlog_epoch >> 32),
6679 (uint)(ndb_latest_handled_binlog_epoch),
6680 (uint)(schema_gci >> 32),
6681 (uint)(schema_gci));
6682 ndb_set_latest_trans_gci(0);
6683 ndb_latest_handled_binlog_epoch= 0;
6684 ndb_latest_applied_binlog_epoch= 0;
6685 ndb_latest_received_binlog_epoch= 0;
6687 else if (ndb_latest_applied_binlog_epoch > 0)
6689 sql_print_warning(
"NDB Binlog: cluster has reconnected. "
6690 "Changes to the database that occured while "
6691 "disconnected will not be in the binlog");
6693 if (opt_ndb_extra_logging)
6695 sql_print_information(
"NDB Binlog: starting log at epoch %u/%u",
6696 (uint)(schema_gci >> 32),
6697 (uint)(schema_gci));
6706 ndb_binlog_is_ready= TRUE;
6708 if (opt_ndb_extra_logging)
6709 sql_print_information(
"NDB Binlog: ndb tables writable");
6710 close_cached_tables((THD*) 0, (
TABLE_LIST*) 0, FALSE, FALSE, FALSE);
6716 ndb_notify_tables_writable();
6719 static char db[]=
"";
6723 do_ndbcluster_binlog_close_connection= BCCC_running;
6724 for ( ; !((ndbcluster_binlog_terminating ||
6725 do_ndbcluster_binlog_close_connection) &&
6726 ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci()) &&
6727 do_ndbcluster_binlog_close_connection != BCCC_restart; )
6730 if (do_ndbcluster_binlog_close_connection)
6732 DBUG_PRINT(
"info", (
"do_ndbcluster_binlog_close_connection: %d, "
6733 "ndb_latest_handled_binlog_epoch: %u/%u, "
6734 "*get_latest_trans_gci(): %u/%u",
6735 do_ndbcluster_binlog_close_connection,
6736 (uint)(ndb_latest_handled_binlog_epoch >> 32),
6737 (uint)(ndb_latest_handled_binlog_epoch),
6738 (uint)(ndb_get_latest_trans_gci() >> 32),
6739 (uint)(ndb_get_latest_trans_gci())));
6742 #ifdef RUN_NDB_BINLOG_TIMER
6744 sql_print_information(
"main_timer %ld ms", main_timer.elapsed_ms());
6751 thd->proc_info=
"Waiting for event from ndbcluster";
6755 Uint64 gci= 0, schema_gci;
6756 int res= 0, tot_poll_wait= 1000;
6757 if (ndb_binlog_running)
6762 int schema_res= s_ndb->
pollEvents(tot_poll_wait, &schema_gci);
6763 ndb_latest_received_binlog_epoch= gci;
6765 while (gci > schema_gci && schema_res >= 0)
6767 static char buf[64];
6768 thd->proc_info=
"Waiting for schema epoch";
6769 my_snprintf(buf,
sizeof(buf),
"%s %u/%u(%u/%u)", thd->proc_info,
6770 (uint)(schema_gci >> 32),
6774 thd->proc_info=
buf;
6775 schema_res= s_ndb->
pollEvents(10, &schema_gci);
6778 if ((ndbcluster_binlog_terminating ||
6779 do_ndbcluster_binlog_close_connection) &&
6780 (ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci() ||
6781 !ndb_binlog_running))
6785 my_pthread_getspecific_ptr(
MEM_ROOT**, THR_MALLOC);
6788 init_sql_alloc(&mem_root, 4096, 0);
6791 *root_ptr= &mem_root;
6793 if (unlikely(schema_res > 0))
6795 thd->proc_info=
"Processing events from schema table";
6796 g_ndb_log_slave_updates= opt_log_slave_updates;
6798 setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6800 setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6804 if (!pOp->hasError())
6806 ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
6807 &post_epoch_log_list,
6808 &post_epoch_unlock_list,
6810 DBUG_PRINT(
"info", (
"s_ndb first: %s", s_ndb->getEventOperation() ?
6813 DBUG_PRINT(
"info", (
"i_ndb first: %s", i_ndb->getEventOperation() ?
6816 if (i_ndb->getEventOperation() == NULL &&
6817 s_ndb->getEventOperation() == NULL &&
6818 do_ndbcluster_binlog_close_connection == BCCC_running)
6820 DBUG_PRINT(
"info", (
"do_ndbcluster_binlog_close_connection= BCCC_restart"));
6821 do_ndbcluster_binlog_close_connection= BCCC_restart;
6822 if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
6824 sql_print_error(
"NDB Binlog: latest transaction in epoch %u/%u not in binlog "
6825 "as latest received epoch is %u/%u",
6826 (uint)(ndb_get_latest_trans_gci() >> 32),
6827 (uint)(ndb_get_latest_trans_gci()),
6828 (uint)(ndb_latest_received_binlog_epoch >> 32),
6829 (uint)(ndb_latest_received_binlog_epoch));
6834 sql_print_error(
"NDB: error %lu (%s) on handling "
6835 "binlog schema event",
6840 updateInjectorStats(s_ndb, i_ndb);
6843 if (!ndb_binlog_running)
6856 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
6858 ndb_binlog_index_row row;
6859 ndb_binlog_thread_handle_non_data_event(thd, pOp, row);
6862 if (i_ndb->getEventOperation() == NULL &&
6863 s_ndb->getEventOperation() == NULL &&
6864 do_ndbcluster_binlog_close_connection == BCCC_running)
6866 DBUG_PRINT(
"info", (
"do_ndbcluster_binlog_close_connection= BCCC_restart"));
6867 do_ndbcluster_binlog_close_connection= BCCC_restart;
6870 updateInjectorStats(s_ndb, i_ndb);
6873 (ndb_log_empty_epochs() &&
6874 gci > ndb_latest_handled_binlog_epoch))
6876 DBUG_PRINT(
"info", (
"pollEvents res: %d", res));
6877 thd->proc_info=
"Processing events";
6878 uchar apply_status_buf[512];
6879 TABLE *apply_status_table= NULL;
6880 if (ndb_apply_status_share)
6892 DBUG_ASSERT(ndb_apply_status_share->op);
6895 DBUG_ASSERT(event_data);
6897 apply_status_table= event_data->shadow_table;
6902 empty_record(apply_status_table);
6904 apply_status_table->field[0]->store((longlong)::server_id,
true);
6909 apply_status_table->field[2]->store(
"", 0, &my_charset_bin);
6910 apply_status_table->field[3]->store((longlong)0,
true);
6911 apply_status_table->field[4]->store((longlong)0,
true);
6912 DBUG_ASSERT(
sizeof(apply_status_buf) >= apply_status_table->s->reclength);
6913 memcpy(apply_status_buf, apply_status_table->record[0],
6914 apply_status_table->s->reclength);
6917 ndb_binlog_index_row _row;
6918 ndb_binlog_index_row *rows= &_row;
6920 unsigned trans_row_count= 0;
6921 unsigned trans_slave_row_count= 0;
6931 DBUG_PRINT(
"info", (
"Writing empty epoch for gci %llu", gci));
6932 DBUG_PRINT(
"info", (
"Initializing transaction"));
6933 inj->new_trans(thd, &trans);
6935 memset(&_row, 0,
sizeof(_row));
6936 thd->variables.character_set_client= &my_charset_latin1;
6937 goto commit_to_binlog;
6942 #ifdef RUN_NDB_BINLOG_TIMER
6943 Timer gci_timer, write_timer;
6948 DBUG_PRINT(
"info", (
"Handling gci: %u/%u",
6954 DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
6957 g_ndb_log_slave_updates= opt_log_slave_updates;
6959 setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6960 i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6962 memset(&_row, 0,
sizeof(_row));
6963 thd->variables.character_set_client= &my_charset_latin1;
6964 DBUG_PRINT(
"info", (
"Initializing transaction"));
6965 inj->new_trans(thd, &trans);
6967 trans_slave_row_count= 0;
6977 uint end= sprintf(&errmsg[0],
6978 "Detected missing data in GCI %llu, "
6979 "inserting GAP event", gci);
6982 (
"Detected missing data in GCI %llu, "
6983 "inserting GAP event", gci));
6984 LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
6985 inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
6992 NDB_SHARE *share= (event_data)?event_data->share:NULL;
6993 DBUG_PRINT(
"info", (
"per gci_op: 0x%lx share: 0x%lx event_types: 0x%x",
6994 (
long) gci_op, (
long) share, event_types));
6999 DBUG_PRINT(
"info", (
"Skipped TE_STOP on table %s",
7000 gci_op->getEvent()->getTable()->getName()));
7004 if (share == NULL || event_data->shadow_table == NULL)
7006 DBUG_PRINT(
"info", (
"no share or table %s!",
7007 gci_op->getEvent()->getTable()->getName()));
7010 if (share == ndb_apply_status_share)
7015 TABLE *table= event_data->shadow_table;
7017 const LEX_STRING &name= table->s->table_name;
7023 DBUG_PRINT(
"info", (
"skipping non data event table: %.*s",
7024 (
int) name.length, name.str));
7030 (
"Found new data event, initializing transaction"));
7031 inj->new_trans(thd, &trans);
7033 DBUG_PRINT(
"info", (
"use_table: %.*s, cols %u",
7034 (
int) name.length, name.str,
7037 int ret = trans.use_table(::server_id, tbl);
7038 assert(ret == 0); NDB_IGNORE_VALUE(ret);
7043 if (apply_status_table)
7046 const LEX_STRING& name= apply_status_table->s->table_name;
7047 DBUG_PRINT(
"info", (
"use_table: %.*s",
7048 (
int) name.length, name.str));
7051 int ret = trans.use_table(::server_id, tbl);
7052 assert(ret == 0); NDB_IGNORE_VALUE(ret);
7055 Field *field= apply_status_table->field[1];
7056 my_ptrdiff_t row_offset=
7057 (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]);
7058 field->move_field_offset(row_offset);
7059 field->store((longlong)gci,
true);
7060 field->move_field_offset(-row_offset);
7062 trans.write_row(::server_id,
7065 &apply_status_table->s->all_set,
7066 apply_status_table->s->fields,
7071 sql_print_error(
"NDB: Could not get apply status share");
7074 #ifdef RUN_NDB_BINLOG_TIMER
7075 write_timer.start();
7079 #ifdef RUN_NDB_BINLOG_TIMER
7082 if (pOp->hasError() &&
7083 ndb_binlog_thread_handle_error(i_ndb, pOp) < 0)
7090 NDB_SHARE *share= (event_data)?event_data->share:NULL;
7092 (
"EVENT TYPE: %d GCI: %u/%u last applied: %u/%u "
7096 (uint)(ndb_latest_applied_binlog_epoch >> 32),
7097 (uint)(ndb_latest_applied_binlog_epoch),
7099 share ? share->db :
"'NULL'",
7100 share ? share->table_name :
"'NULL'"));
7101 DBUG_ASSERT(share != 0);
7115 DBUG_ASSERT(gci_op == pOp);
7120 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
7121 ndb_binlog_thread_handle_data_event(i_ndb, pOp, &rows, trans, trans_row_count, trans_slave_row_count);
7124 ndb_binlog_thread_handle_non_data_event(thd, pOp, *rows);
7125 DBUG_PRINT(
"info", (
"s_ndb first: %s", s_ndb->getEventOperation() ?
7128 DBUG_PRINT(
"info", (
"i_ndb first: %s", i_ndb->getEventOperation() ?
7131 if (i_ndb->getEventOperation() == NULL &&
7132 s_ndb->getEventOperation() == NULL &&
7133 do_ndbcluster_binlog_close_connection == BCCC_running)
7135 DBUG_PRINT(
"info", (
"do_ndbcluster_binlog_close_connection= BCCC_restart"));
7136 do_ndbcluster_binlog_close_connection= BCCC_restart;
7137 if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
7139 sql_print_error(
"NDB Binlog: latest transaction in epoch %lu not in binlog "
7140 "as latest received epoch is %lu",
7141 (ulong) ndb_get_latest_trans_gci(),
7142 (ulong) ndb_latest_received_binlog_epoch);
7148 }
while (pOp && pOp->
getGCI() == gci);
7150 updateInjectorStats(s_ndb, i_ndb);
7156 #ifdef RUN_NDB_BINLOG_TIMER
7160 while (trans.good())
7162 if (!ndb_log_empty_epochs())
7176 if ((trans_row_count == 0) &&
7177 (! (opt_ndb_log_apply_status &&
7178 trans_slave_row_count) ))
7181 if (
int r= trans.rollback())
7183 sql_print_error(
"NDB Binlog: "
7184 "Error during ROLLBACK of GCI %u/%u. Error: %d",
7185 uint(gci >> 32), uint(gci), r);
7192 thd->proc_info=
"Committing events to binlog";
7194 if (
int r= trans.
commit())
7196 sql_print_error(
"NDB Binlog: "
7197 "Error during COMMIT of GCI. Error: %d",
7201 rows->gci= (Uint32)(gci >> 32);
7203 rows->master_log_file= start.file_name();
7204 rows->master_log_pos= start.file_pos();
7206 DBUG_PRINT(
"info", (
"COMMIT gci: %lu", (ulong) gci));
7207 if (opt_ndb_log_binlog_index)
7209 if (ndb_binlog_index_table__write_rows(thd, rows))
7218 volatile THD::killed_state killed= thd->killed;
7220 thd->killed= THD::NOT_KILLED;
7221 ndb_binlog_index_table__write_rows(thd, rows);
7223 thd->killed= killed;
7228 ndb_latest_applied_binlog_epoch= gci;
7231 ndb_latest_handled_binlog_epoch= gci;
7233 #ifdef RUN_NDB_BINLOG_TIMER
7235 sql_print_information(
"gci %ld event_count %d write time "
7236 "%ld(%d e/s), total time %ld(%d e/s)",
7237 (ulong)gci, event_count,
7238 write_timer.elapsed_ms(),
7239 (1000*event_count) / write_timer.elapsed_ms(),
7240 gci_timer.elapsed_ms(),
7241 (1000*event_count) / gci_timer.elapsed_ms());
7247 uint end= sprintf(&errmsg[0],
7248 "Detected missing data in GCI %llu, "
7249 "inserting GAP event", gci);
7252 (
"Detected missing data in GCI %llu, "
7253 "inserting GAP event", gci));
7254 LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
7255 inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
7259 ndb_binlog_thread_handle_schema_event_post_epoch(thd,
7260 &post_epoch_log_list,
7261 &post_epoch_unlock_list);
7262 free_root(&mem_root, MYF(0));
7263 *root_ptr= old_root;
7264 ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
7267 if (do_ndbcluster_binlog_close_connection != BCCC_restart)
7269 sql_print_information(
"Stopping Cluster Binlog");
7270 DBUG_PRINT(
"info",(
"Shutting down cluster binlog thread"));
7271 thd->proc_info=
"Shutting down";
7275 sql_print_information(
"Restarting Cluster Binlog");
7276 DBUG_PRINT(
"info",(
"Restarting cluster binlog thread"));
7277 thd->proc_info=
"Restarting";
7279 if (!have_injector_mutex_lock)
7280 pthread_mutex_lock(&injector_mutex);
7285 pthread_mutex_unlock(&injector_mutex);
7293 ndb_binlog_tables_inited= FALSE;
7295 if (ndb_apply_status_share)
7298 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog extra free use_count: %u",
7299 ndb_apply_status_share->key,
7300 ndb_apply_status_share->use_count));
7301 free_share(&ndb_apply_status_share);
7302 ndb_apply_status_share= 0;
7304 if (ndb_schema_share)
7307 pthread_mutex_lock(&ndb_schema_share_mutex);
7309 DBUG_PRINT(
"NDB_SHARE", (
"%s binlog extra free use_count: %u",
7310 ndb_schema_share->key,
7311 ndb_schema_share->use_count));
7312 free_share(&ndb_schema_share);
7313 ndb_schema_share= 0;
7314 pthread_mutex_unlock(&ndb_schema_share_mutex);
7321 remove_event_operations(s_ndb);
7327 remove_event_operations(i_ndb);
7332 my_hash_free(&ndb_schema_objects);
7336 Thd_ndb::release(thd_ndb);
7337 thd_set_thd_ndb(thd, NULL);
7345 if (opt_ndb_extra_logging > 9)
7346 sql_print_information(
"NDB Binlog: Release extra share references");
7348 pthread_mutex_lock(&ndbcluster_mutex);
7349 for (uint i= 0; i < ndbcluster_open_tables.records;)
7353 if (share->state != NSS_DROPPED)
7358 share->state= NSS_DROPPED;
7360 DBUG_PRINT(
"NDB_SHARE", (
"%s create free use_count: %u",
7361 share->key, share->use_count));
7362 free_share(&share, TRUE);
7375 pthread_mutex_unlock(&ndbcluster_mutex);
7378 close_cached_tables((THD*) 0, (
TABLE_LIST*) 0, FALSE, FALSE, FALSE);
7379 if (opt_ndb_extra_logging > 15)
7381 sql_print_information(
"NDB Binlog: remaining open tables: ");
7382 for (uint i= 0; i < ndbcluster_open_tables.records; i++)
7385 sql_print_information(
" %s.%s state: %u use_count: %u",
7393 if (do_ndbcluster_binlog_close_connection == BCCC_restart)
7395 pthread_mutex_lock(&injector_mutex);
7396 goto restart_cluster_failure;
7403 ndb_binlog_thread_running= -1;
7404 ndb_binlog_running= FALSE;
7405 (void) pthread_cond_signal(&injector_cond);
7407 DBUG_PRINT(
"exit", (
"ndb_binlog_thread"));
/*
  Build a one-line text summary of the binlog epoch counters and pass it
  to the supplied stat_print callback under the "binlog" key.

  thd         passed through unchanged to stat_print
  stat_print  callback that receives the engine name, the "binlog" key
              and the formatted text
  stat_type   not referenced in the lines visible here

  NOTE(review): the return type, the braces and the conditional structure
  of this function are not visible in this excerpt; the comments below
  describe only the statements that are shown.
*/
7416 ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
7417 enum ha_stat_type stat_type)
7421 ulonglong ndb_latest_epoch= 0;
7422 DBUG_ENTER(
"ndbcluster_show_status_binlog");
/* injector_mutex is held while injector_ndb is dereferenced below */
7424 pthread_mutex_lock(&injector_mutex);
/* One scratch buffer per epoch value handed to llstr() further down */
7427 char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
7428 ndb_latest_epoch= injector_ndb->getLatestGCI();
/* The mutex is released before the text is formatted and printed */
7429 pthread_mutex_unlock(&injector_mutex);
/*
  NOTE(review): five llstr() values are passed but only four "%s"
  conversions are visible; a leading format line appears to be missing
  from this excerpt - confirm against the full file.
*/
7432 my_snprintf(buf,
sizeof(buf),
7434 "latest_trans_epoch=%s, "
7435 "latest_received_binlog_epoch=%s, "
7436 "latest_handled_binlog_epoch=%s, "
7437 "latest_applied_binlog_epoch=%s",
7438 llstr(ndb_latest_epoch, buff1),
7439 llstr(ndb_get_latest_trans_gci(), buff2),
7440 llstr(ndb_latest_received_binlog_epoch, buff3),
7441 llstr(ndb_latest_handled_binlog_epoch, buff4),
7442 llstr(ndb_latest_applied_binlog_epoch, buff5));
7443 if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
7444 "binlog", strlen(
"binlog"),
/*
  NOTE(review): second unlock of injector_mutex. The branch that separates
  it from the unlock above is not visible here; presumably this is the
  path taken when the status text is not produced - confirm.
*/
7449 pthread_mutex_unlock(&injector_mutex);
/*
  Bit patterns used when interpreting a 32-bit AnyValue:

  NDB_ANYVALUE_RESERVED_BIT    bit 31 only
  NDB_ANYVALUE_RESERVED_MASK   bit 31 plus the seven lowest bits (0x7f)
  NDB_ANYVALUE_NOLOGGING_CODE  same value as the mask: bit 31 set and all
                               seven lowest bits set
*/
7475 #define NDB_ANYVALUE_RESERVED_BIT 0x80000000
7476 #define NDB_ANYVALUE_RESERVED_MASK 0x8000007f
7478 #define NDB_ANYVALUE_NOLOGGING_CODE 0x8000007f
/*
  Set to 1 every bit of anyValue that is neither covered by
  opt_server_id_mask nor the reserved bit; all other bits are left as
  they were. Presumably a debug-only helper given the dbug_ prefix -
  any surrounding preprocessor guard is not visible here.

  anyValue  value modified in place
*/
7481 void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue)
/* Complement of (server-id mask | reserved bit), narrowed to 32 bits */
7487 const Uint32 userDataMask = ~(opt_server_id_mask |
7488 NDB_ANYVALUE_RESERVED_BIT);
7490 anyValue |= userDataMask;
/*
  Return true when NDB_ANYVALUE_RESERVED_BIT is set in anyValue,
  false otherwise.
*/
7494 bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue)
7496 return ((anyValue & NDB_ANYVALUE_RESERVED_BIT) != 0);
/*
  Return true when the bits of anyValue selected by
  NDB_ANYVALUE_RESERVED_MASK are exactly NDB_ANYVALUE_NOLOGGING_CODE.
  Bits outside the mask do not affect the result.
*/
7499 bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue)
7501 return ((anyValue & NDB_ANYVALUE_RESERVED_MASK) ==
7502 NDB_ANYVALUE_NOLOGGING_CODE);
/*
  OR NDB_ANYVALUE_NOLOGGING_CODE into anyValue in place. Bits that are
  not part of the code keep their previous value.
*/
7505 void ndbcluster_anyvalue_set_nologging(Uint32& anyValue)
7507 anyValue |= NDB_ANYVALUE_NOLOGGING_CODE;
/*
  Clear, in place, the reserved bit and every bit covered by
  opt_server_id_mask. Bits outside both are left untouched.
*/
7510 void ndbcluster_anyvalue_set_normal(Uint32& anyValue)
7513 anyValue &= ~(NDB_ANYVALUE_RESERVED_BIT);
7514 anyValue &= ~(opt_server_id_mask);
/*
  Return true when serverId has no bits set outside opt_server_id_mask,
  i.e. masking it with opt_server_id_mask would not change it.
*/
7517 bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId)
7519 return ((serverId & ~opt_server_id_mask) == 0);
/*
  Return the bits of anyValue covered by opt_server_id_mask.
  The caller must pass a value whose reserved bit is clear; this is
  checked with assert().
*/
7522 Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue)
7524 assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
7526 return (anyValue & opt_server_id_mask);
/*
  Replace, in place, the bits of anyValue covered by opt_server_id_mask
  with the corresponding bits of serverId. Bits of serverId outside the
  mask are dropped; bits of anyValue outside the mask are preserved.
  The reserved bit of anyValue must be clear on entry (checked with
  assert()).
*/
7529 void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId)
7531 assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
/* Clear the old server-id bits, then merge in the masked new ones */
7532 anyValue &= ~(opt_server_id_mask);
7533 anyValue |= (serverId & opt_server_id_mask);
/*
  When NDB_WITHOUT_SERVER_ID_BITS is defined, opt_server_id_mask is
  defined here with all bits set (~0 converted to ulong), so masking a
  value with it keeps every bit.
  NOTE(review): the matching #endif is not visible in this excerpt.
*/
7536 #ifdef NDB_WITHOUT_SERVER_ID_BITS
7539 ulong opt_server_id_mask = ~0;