27 #include "my_global.h"
30 #include "sql_cache.h"
31 #include "sql_locale.h"
34 #include "sql_parse.h"
41 #include "rpl_filter.h"
42 #include "rpl_record.h"
43 #include "transaction.h"
45 #include "rpl_rli_pdb.h"
52 #include <my_bitmap.h>
53 #include "rpl_utility.h"
/*
  Tables backing binlog_checksum_typelib: the array of checksum type names
  and the companion array of unsigned lengths. The initializer entries of
  both arrays are not visible in this extract.
*/
61 const char *binlog_checksum_type_names[]= {
67 unsigned int binlog_checksum_type_length[]= {
73 TYPELIB binlog_checksum_typelib=
/*
  The first member is one less than the number of elements in the name
  array. NOTE(review): presumably the last element is a terminator that is
  not counted -- confirm against the array contents.
*/
75 array_elements(binlog_checksum_type_names) - 1,
"",
76 binlog_checksum_type_names,
77 binlog_checksum_type_length
/* log_cs is shorthand for the address of the latin1 character set object. */
81 #define log_cs &my_charset_latin1
/*
  Expands to the stringified name of F followed by a space when any bit of F
  is set in V, and to an empty string otherwise.
*/
83 #define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
/*
  Buffer size computed as 3 + PREC + 5 + 1.
  NOTE(review): going by the name this is presumably the space needed for a
  %g-formatted number with PREC digits -- confirm what the constants stand for.
*/
91 #define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
/*
  Explicit instantiations of the available_buffer and valid_buffer_range
  templates for unsigned int. Only the leading parameters are visible in
  this extract.
*/
97 template unsigned int available_buffer<unsigned int>(
const char*,
105 template bool valid_buffer_range<unsigned int>(
unsigned int,
/*
  Version triple {5, 6, 1} and the same triple folded into one integer:
  (first * 256 + second) * 256 + third.
*/
115 const uchar checksum_version_split[3]= {5, 6, 1};
116 const ulong checksum_version_product=
117 (checksum_version_split[0] * 256 + checksum_version_split[1]) * 256 +
118 checksum_version_split[2];
120 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
/*
  Forward declaration only; the definition of rows_event_stmt_cleanup is not
  part of this extract.
*/
121 static int rows_event_stmt_cleanup(
Relay_log_info const *rli, THD* thd);
/**
  Map a handler error code to its symbolic name.

  @param i  Handler error code.
  @return   For every code listed below, a string literal spelling the name
            of the constant. The result for codes that are not listed is on
            lines not visible in this extract.
*/
123 static const char *HA_ERR(
int i)
131 case HA_ERR_KEY_NOT_FOUND:
return "HA_ERR_KEY_NOT_FOUND";
132 case HA_ERR_FOUND_DUPP_KEY:
return "HA_ERR_FOUND_DUPP_KEY";
133 case HA_ERR_RECORD_CHANGED:
return "HA_ERR_RECORD_CHANGED";
134 case HA_ERR_WRONG_INDEX:
return "HA_ERR_WRONG_INDEX";
135 case HA_ERR_CRASHED:
return "HA_ERR_CRASHED";
136 case HA_ERR_WRONG_IN_RECORD:
return "HA_ERR_WRONG_IN_RECORD";
137 case HA_ERR_OUT_OF_MEM:
return "HA_ERR_OUT_OF_MEM";
138 case HA_ERR_NOT_A_TABLE:
return "HA_ERR_NOT_A_TABLE";
139 case HA_ERR_WRONG_COMMAND:
return "HA_ERR_WRONG_COMMAND";
140 case HA_ERR_OLD_FILE:
return "HA_ERR_OLD_FILE";
141 case HA_ERR_NO_ACTIVE_RECORD:
return "HA_ERR_NO_ACTIVE_RECORD";
142 case HA_ERR_RECORD_DELETED:
return "HA_ERR_RECORD_DELETED";
143 case HA_ERR_RECORD_FILE_FULL:
return "HA_ERR_RECORD_FILE_FULL";
144 case HA_ERR_INDEX_FILE_FULL:
return "HA_ERR_INDEX_FILE_FULL";
145 case HA_ERR_END_OF_FILE:
return "HA_ERR_END_OF_FILE";
146 case HA_ERR_UNSUPPORTED:
return "HA_ERR_UNSUPPORTED";
147 case HA_ERR_TO_BIG_ROW:
return "HA_ERR_TO_BIG_ROW";
148 case HA_WRONG_CREATE_OPTION:
return "HA_WRONG_CREATE_OPTION";
149 case HA_ERR_FOUND_DUPP_UNIQUE:
return "HA_ERR_FOUND_DUPP_UNIQUE";
150 case HA_ERR_UNKNOWN_CHARSET:
return "HA_ERR_UNKNOWN_CHARSET";
151 case HA_ERR_WRONG_MRG_TABLE_DEF:
return "HA_ERR_WRONG_MRG_TABLE_DEF";
152 case HA_ERR_CRASHED_ON_REPAIR:
return "HA_ERR_CRASHED_ON_REPAIR";
153 case HA_ERR_CRASHED_ON_USAGE:
return "HA_ERR_CRASHED_ON_USAGE";
154 case HA_ERR_LOCK_WAIT_TIMEOUT:
return "HA_ERR_LOCK_WAIT_TIMEOUT";
155 case HA_ERR_LOCK_TABLE_FULL:
return "HA_ERR_LOCK_TABLE_FULL";
156 case HA_ERR_READ_ONLY_TRANSACTION:
return "HA_ERR_READ_ONLY_TRANSACTION";
157 case HA_ERR_LOCK_DEADLOCK:
return "HA_ERR_LOCK_DEADLOCK";
158 case HA_ERR_CANNOT_ADD_FOREIGN:
return "HA_ERR_CANNOT_ADD_FOREIGN";
159 case HA_ERR_NO_REFERENCED_ROW:
return "HA_ERR_NO_REFERENCED_ROW";
160 case HA_ERR_ROW_IS_REFERENCED:
return "HA_ERR_ROW_IS_REFERENCED";
161 case HA_ERR_NO_SAVEPOINT:
return "HA_ERR_NO_SAVEPOINT";
162 case HA_ERR_NON_UNIQUE_BLOCK_SIZE:
return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
163 case HA_ERR_NO_SUCH_TABLE:
return "HA_ERR_NO_SUCH_TABLE";
164 case HA_ERR_TABLE_EXIST:
return "HA_ERR_TABLE_EXIST";
165 case HA_ERR_NO_CONNECTION:
return "HA_ERR_NO_CONNECTION";
166 case HA_ERR_NULL_IN_SPATIAL:
return "HA_ERR_NULL_IN_SPATIAL";
167 case HA_ERR_TABLE_DEF_CHANGED:
return "HA_ERR_TABLE_DEF_CHANGED";
168 case HA_ERR_NO_PARTITION_FOUND:
return "HA_ERR_NO_PARTITION_FOUND";
169 case HA_ERR_RBR_LOGGING_FAILED:
return "HA_ERR_RBR_LOGGING_FAILED";
170 case HA_ERR_DROP_INDEX_FK:
return "HA_ERR_DROP_INDEX_FK";
171 case HA_ERR_FOREIGN_DUPLICATE_KEY:
return "HA_ERR_FOREIGN_DUPLICATE_KEY";
172 case HA_ERR_TABLE_NEEDS_UPGRADE:
return "HA_ERR_TABLE_NEEDS_UPGRADE";
173 case HA_ERR_TABLE_READONLY:
return "HA_ERR_TABLE_READONLY";
174 case HA_ERR_AUTOINC_READ_FAILED:
return "HA_ERR_AUTOINC_READ_FAILED";
175 case HA_ERR_AUTOINC_ERANGE:
return "HA_ERR_AUTOINC_ERANGE";
176 case HA_ERR_GENERIC:
return "HA_ERR_GENERIC";
177 case HA_ERR_RECORD_IS_THE_SAME:
return "HA_ERR_RECORD_IS_THE_SAME";
178 case HA_ERR_LOGGING_IMPOSSIBLE:
return "HA_ERR_LOGGING_IMPOSSIBLE";
179 case HA_ERR_CORRUPT_EVENT:
return "HA_ERR_CORRUPT_EVENT";
180 case HA_ERR_ROWS_EVENT_APPLY :
return "HA_ERR_ROWS_EVENT_APPLY";
181 case HA_ERR_INNODB_READ_ONLY:
return "HA_ERR_INNODB_READ_ONLY";
/**
  Report a failure to execute an event on a table through rli->report().

  The message text and error number of each condition returned by
  sql_conditions() are concatenated into a local buffer, which is then
  included in the report together with the event type, the table name, the
  master log name and the end position.

  @param level     Severity passed on to rli->report().
  @param ha_error  Handler error code; when non-zero its symbolic name is
                   looked up with HA_ERR().
  @param log_name  Master log name printed in the message.
  @param pos       Value printed as end_log_pos in the message.

  NOTE(review): rli, thd, table and type are used below but are declared on
  lines that are not part of this extract.
*/
199 static void inline slave_rows_error_report(
enum loglevel
level,
int ha_error,
202 const char *log_name, ulong pos)
/* A zero ha_error leaves handler_error as NULL. */
204 const char *handler_error= (ha_error ? HA_ERR(ha_error) : NULL);
205 char buff[MAX_SLAVE_ERRMSG], *slider;
206 const char *buff_end= buff +
sizeof(buff);
209 thd->get_stmt_da()->sql_conditions();
/*
  Append one fragment per condition, advancing slider by the length that
  my_snprintf() returns. The loop ends when the conditions run out or when
  slider reaches the last byte of the buffer.
*/
213 for (err= it++, slider= buff; err && slider < buff_end - 1;
214 slider += len, err= it++)
216 len= my_snprintf(slider, buff_end - slider,
217 " %s, Error_code: %d;", err->get_message_text(),
218 err->get_sql_errno());
/*
  Report variant that names the handler error; a NULL handler_error is
  printed as <unknown>. The error number passed is the one from the
  diagnostics area when thd->is_error() is true, otherwise 0.
*/
222 rli->
report(level, thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0,
223 "Could not execute %s event on table %s.%s;"
224 "%s handler error %s; "
225 "the event's master log %s, end_log_pos %lu",
226 type, table->s->db.str, table->s->table_name.str,
227 buff, handler_error == NULL ?
"<unknown>" : handler_error,
/*
  Report variant without the handler error part.
  NOTE(review): the condition that selects between the two variants is not
  visible in this extract.
*/
230 rli->
report(level, thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0,
231 "Could not execute %s event on table %s.%s;"
232 "%s the event's master log %s, end_log_pos %lu",
233 type, table->s->db.str, table->s->table_name.str,
234 buff, log_name, pos);
/**
  Set the database of a THD after optional lower-casing and a lookup through
  rpl_filter->get_rewrite_db().

  @param thd     Thread whose database is set with thd->set_db().
  @param db      Database name.
  @param db_len  Length stored as the initial new_db.length.
*/
237 static void set_thd_db(THD *thd,
const char *db, uint32 db_len)
239 char lcase_db_buf[NAME_LEN +1];
241 new_db.length= db_len;
/* With lower_case_table_names == 1 a lower-cased copy in the local buffer is used. */
242 if (lower_case_table_names == 1)
/*
  NOTE(review): the copy is given no length bound, so it relies on db fitting
  into NAME_LEN + 1 bytes including the terminator -- confirm that every
  caller guarantees this.
*/
244 strmov(lcase_db_buf, db);
245 my_casedn_str(system_charset_info, lcase_db_buf);
246 new_db.str= lcase_db_buf;
/*
  Use the caller's string unchanged.
  NOTE(review): presumably the else branch of the test above; the else
  keyword is on a line not visible in this extract.
*/
249 new_db.str= (
char*) db;
/* Replace the name with whatever get_rewrite_db() returns for it. */
251 new_db.str= (
char*) rpl_filter->get_rewrite_db(new_db.str,
253 thd->set_db(new_db.str, new_db.length);
/**
  Print a string to an IO_CACHE between single quotes, escaping selected
  characters.

  Newline, carriage return, backslash, backspace, tab, single quote and the
  zero byte are each printed as a backslash followed by n, r, backslash, b,
  t, single quote and 0 respectively. The remaining my_b_printf() call prints
  the character itself with %c.

  @param cache  Destination cache.
  @param str    Start of the input.
  @param len    Input length; end is computed as str + len.

  NOTE(review): the loop header that uses end and the default label of the
  switch are on lines not visible in this extract.
*/
264 static void pretty_print_str(
IO_CACHE* cache,
const char* str,
int len)
266 const char* end = str + len;
/* Opening quote. */
267 my_b_printf(cache,
"\'");
271 switch ((c=*str++)) {
272 case '\n': my_b_printf(cache,
"\\n");
break;
273 case '\r': my_b_printf(cache,
"\\r");
break;
274 case '\\': my_b_printf(cache,
"\\\\");
break;
275 case '\b': my_b_printf(cache,
"\\b");
break;
276 case '\t': my_b_printf(cache,
"\\t");
break;
277 case '\'': my_b_printf(cache,
"\\'");
break;
278 case 0 : my_b_printf(cache,
"\\0");
break;
280 my_b_printf(cache,
"%c", c);
/* Closing quote. */
284 my_b_printf(cache,
"\'");
288 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
292 thd->is_slave_error = 0;
/**
  Classify an error code against the list of handler errors below.

  @param err_code  Error code to classify.

  NOTE(review): the switch header and the values returned for listed and
  unlisted codes are on lines not visible in this extract; going by the
  function name, the listed codes are presumably the ones treated as
  idempotent errors -- confirm.
*/
297 inline int idempotent_error_code(
int err_code)
322 case HA_ERR_RECORD_CHANGED:
323 case HA_ERR_KEY_NOT_FOUND:
324 case HA_ERR_END_OF_FILE:
325 case HA_ERR_FOUND_DUPP_KEY:
326 case HA_ERR_FOUND_DUPP_UNIQUE:
327 case HA_ERR_FOREIGN_DUPLICATE_KEY:
328 case HA_ERR_NO_REFERENCED_ROW:
329 case HA_ERR_ROW_IS_REFERENCED:
/**
  Test whether an error code is to be ignored.

  @param err_code  Error code to test.
  @return Non-zero when err_code equals ER_SLAVE_IGNORED_TABLE, or when
          use_slave_mask is set and the bit for err_code is set in
          slave_error_mask.
*/
343 inline int ignored_error_code(
int err_code)
/*
  Extra cases compiled only when HAVE_NDB_BINLOG is defined.
  NOTE(review): what is done for these two codes is on lines not visible in
  this extract.
*/
345 #ifdef HAVE_NDB_BINLOG
351 case ER_DB_CREATE_EXISTS:
352 case ER_DB_DROP_EXISTS:
359 return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
360 (use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
/**
  Obtain an SQL error number for a handler error.

  @param error  Handler error code; in the visible lines it is only used in
                the warning text.
  @param thd    Thread whose diagnostics area supplies the error number.
  @param table  Table involved; its use is not visible in this extract.
  @return actual_error. When thd->is_error() is true this is the error number
          from the diagnostics area; the value used otherwise is on a line
          not visible here.
*/
371 int convert_handler_error(
int error, THD* thd,
TABLE *table)
373 uint actual_error= (thd->is_error() ? thd->get_stmt_da()->sql_errno() :
/*
  No error number yet: it is read again below.
  NOTE(review): presumably a step on a line not shown here records an error
  in the diagnostics area between the two reads -- confirm.
*/
376 if (actual_error == 0)
379 actual_error= (thd->is_error() ? thd->get_stmt_da()->sql_errno() :
/* An ER_UNKNOWN_ERROR result is logged as a warning with the handler code. */
381 if (actual_error == ER_UNKNOWN_ERROR)
383 sql_print_warning(
"Unknown error detected %d in handler", error);
386 return (actual_error);
/**
  Classify an error code against the lock wait timeout and deadlock codes
  listed below.

  @param error  Error code to classify.

  NOTE(review): the switch header and the returned values are on lines not
  visible in this extract; presumably the listed codes yield true -- confirm.
*/
389 inline bool concurrency_error_code(
int error)
393 case ER_LOCK_WAIT_TIMEOUT:
394 case ER_LOCK_DEADLOCK:
395 case ER_XA_RBDEADLOCK:
/**
  Classify an error code against the network, interruption and shutdown
  codes listed below.

  @param unexpected_error  Error code to classify.

  NOTE(review): the returned values are on lines not visible in this extract;
  presumably the listed codes yield true -- confirm.
*/
402 inline bool unexpected_error_code(
int unexpected_error)
404 switch (unexpected_error)
406 case ER_NET_READ_ERROR:
407 case ER_NET_ERROR_ON_WRITE:
408 case ER_QUERY_INTERRUPTED:
409 case ER_SERVER_SHUTDOWN:
410 case ER_NEW_ABORTING_CONNECTION:
/**
  Escape a string into memory through the write cursor pos.

  Newline, carriage return, backslash, backspace, tab, single quote and the
  zero byte are each stored as two characters: a backslash followed by n, r,
  backslash, b, t, single quote and 0 respectively.

  @param packet  Destination buffer.
  @param str     Start of the input.
  @param len     Input length; end is computed as str + len.

  NOTE(review): the initialisation of pos, the loop header, the handling of
  all other characters and the return statement are on lines not visible in
  this extract. Each escaped input byte produces two output bytes, so the
  destination must be sized accordingly -- confirm against callers.
*/
421 static char *pretty_print_str(
char *packet,
const char *str,
int len)
423 const char *end= str + len;
429 switch ((c=*str++)) {
430 case '\n': *pos++=
'\\'; *pos++=
'n';
break;
431 case '\r': *pos++=
'\\'; *pos++=
'r';
break;
432 case '\\': *pos++=
'\\'; *pos++=
'\\';
break;
433 case '\b': *pos++=
'\\'; *pos++=
'b';
break;
434 case '\t': *pos++=
'\\'; *pos++=
't';
break;
435 case '\'': *pos++=
'\\'; *pos++=
'\'';
break;
436 case 0 : *pos++=
'\\'; *pos++=
'0';
break;
448 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/**
  Build a file name stem in buf from PREFIX_SQL_LOAD, slave_load_tmpdir,
  server_uuid, the event server id and the file id.

  @param buf              Output buffer.
  @param file_id          Number appended with int10_to_str().
  @param event_server_id  Number printed between server_uuid and file_id.
  @param ext              Not used in the visible lines.

  NOTE(review): the return statement and any use of ext are on lines not
  visible in this extract.
*/
462 static char *slave_load_file_stem(
char *
buf, uint file_id,
463 int event_server_id,
const char *ext)
466 fn_format(buf,PREFIX_SQL_LOAD,slave_load_tmpdir,
"", MY_UNPACK_FILENAME);
/*
  Append server_uuid, a dash, the event server id and another dash, then
  step buf past what was written.
  NOTE(review): as visible here the text is written at buf itself; a step
  that first moves buf to the end of the formatted name is presumably on a
  line not shown -- confirm.
*/
470 int appended_length= sprintf(buf,
"%s-%d-", server_uuid, event_server_id);
471 buf+= appended_length;
472 res= int10_to_str(file_id, buf, 10);
479 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/**
  Scan slave_load_tmpdir for entries whose name starts with PREFIX_SQL_LOAD
  followed by server_uuid and a dash.

  NOTE(review): what is done with each matching entry after its full name is
  formatted into fname is on lines not visible in this extract; going by the
  function name the files are presumably removed -- confirm.
*/
485 static void cleanup_load_tmpdir()
/* Failure to read the directory is handled on a line not shown. */
492 if (!(dirp=my_dir(slave_load_tmpdir,MYF(0))))
/* Build the name prefix to match: PREFIX_SQL_LOAD, then server_uuid and a dash. */
503 p= strmake(prefbuf, STRING_WITH_LEN(PREFIX_SQL_LOAD));
504 sprintf(p,
"%s-",server_uuid);
506 for (i=0 ; i < (uint)dirp->number_off_files; i++)
508 file=dirp->dir_entry+
i;
509 if (is_prefix(file->name, prefbuf))
511 fn_format(fname,file->name,slave_load_tmpdir,
"",MY_UNPACK_FILENAME);
/**
  Write a string preceded by its length.

  The length, cast to uchar, is stored in tmp[0]; tmp is written first and
  then length bytes of str.

  @param file  Destination cache.
  @param str   Bytes to write.
  @return The logical OR of the two my_b_safe_write() results.

  NOTE(review): the length parameter and the declaration of tmp are on lines
  not visible in this extract. The cast to uchar keeps only the low 8 bits,
  so callers must pass a length of at most 255, as the name says.
*/
529 static bool write_str_at_most_255_bytes(
IO_CACHE *file,
const char *str,
533 tmp[0]= (uchar) length;
534 return (my_b_safe_write(file, tmp,
sizeof(tmp)) ||
535 my_b_safe_write(file, (uchar*) str, length));
/**
  Read a string preceded by a one-byte length and advance the read cursor.

  @param buf  In/out read cursor.

  NOTE(review): the remaining parameters (buf_end and len are used below),
  the action taken when the bounds test fails and the return values are on
  lines not visible in this extract.
*/
550 static inline int read_str_at_most_255_bytes(
const char **
buf,
/*
  Bounds test: the byte at the cursor is read as an unsigned length, and the
  test is true when cursor + length reaches or passes buf_end.
*/
555 if (*buf + ((uint) (uchar) **buf) >= buf_end)
/* Step over the length byte and the string itself. */
559 (*buf)+= (uint) *len+1;
574 to= octet2hex(to, from, len);
577 to= strmov(to,
"\"\"");
594 uint32
const orig_len= to->length();
595 if (to->reserve(orig_len + from->length()*2+3))
598 beg= to->c_ptr_quick() + to->length();
600 if (csinfo->escape_with_backslash_is_dangerous)
601 ptr=
str_to_hex(ptr, from->ptr(), from->length());
605 if (!(thd->variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES))
607 ptr+= escape_string_for_mysql(csinfo, ptr, 0,
608 from->ptr(), from->length());
612 const char *frm_str= from->ptr();
614 for (; frm_str < (from->ptr() + from->length()); frm_str++)
617 if (*frm_str ==
'\'')
626 to->length(orig_len + ptr - beg);
/**
  Print one name=value item for an option bit that has changed.

  Nothing is printed unless option is set in bits_changed. The value printed
  is test(flags & option).

  @param file          Destination cache.
  @param bits_changed  Mask of changed option bits.

  NOTE(review): the remaining parameters (option, flags and name are used
  below) and the condition guarding the separator are on lines not visible
  in this extract.
*/
639 static void print_set_option(
IO_CACHE* file, uint32 bits_changed,
643 if (bits_changed & option)
/* Separator placed before the item. */
646 my_b_printf(file,
", ");
647 my_b_printf(file,
"%s=%d", name,
test(flags & option));
664 case START_EVENT_V3:
return "Start_v3";
665 case STOP_EVENT:
return "Stop";
666 case QUERY_EVENT:
return "Query";
667 case ROTATE_EVENT:
return "Rotate";
668 case INTVAR_EVENT:
return "Intvar";
669 case LOAD_EVENT:
return "Load";
670 case NEW_LOAD_EVENT:
return "New_load";
671 case CREATE_FILE_EVENT:
return "Create_file";
672 case APPEND_BLOCK_EVENT:
return "Append_block";
673 case DELETE_FILE_EVENT:
return "Delete_file";
674 case EXEC_LOAD_EVENT:
return "Exec_load";
675 case RAND_EVENT:
return "RAND";
676 case XID_EVENT:
return "Xid";
677 case USER_VAR_EVENT:
return "User var";
678 case FORMAT_DESCRIPTION_EVENT:
return "Format_desc";
679 case TABLE_MAP_EVENT:
return "Table_map";
680 case PRE_GA_WRITE_ROWS_EVENT:
return "Write_rows_event_old";
681 case PRE_GA_UPDATE_ROWS_EVENT:
return "Update_rows_event_old";
682 case PRE_GA_DELETE_ROWS_EVENT:
return "Delete_rows_event_old";
683 case WRITE_ROWS_EVENT_V1:
return "Write_rows_v1";
684 case UPDATE_ROWS_EVENT_V1:
return "Update_rows_v1";
685 case DELETE_ROWS_EVENT_V1:
return "Delete_rows_v1";
686 case BEGIN_LOAD_QUERY_EVENT:
return "Begin_load_query";
687 case EXECUTE_LOAD_QUERY_EVENT:
return "Execute_load_query";
688 case INCIDENT_EVENT:
return "Incident";
689 case IGNORABLE_LOG_EVENT:
return "Ignorable";
690 case ROWS_QUERY_LOG_EVENT:
return "Rows_query";
691 case WRITE_ROWS_EVENT:
return "Write_rows";
692 case UPDATE_ROWS_EVENT:
return "Update_rows";
693 case DELETE_ROWS_EVENT:
return "Delete_rows";
694 case GTID_LOG_EVENT:
return "Gtid";
695 case ANONYMOUS_GTID_LOG_EVENT:
return "Anonymous_Gtid";
696 case PREVIOUS_GTIDS_LOG_EVENT:
return "Previous_gtids";
697 case HEARTBEAT_LOG_EVENT:
return "Heartbeat";
698 default:
return "Unknown";
714 enum_event_cache_type cache_type_arg,
715 enum_event_logging_type logging_type_arg)
716 :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg),
717 event_cache_type(cache_type_arg),
718 event_logging_type(logging_type_arg),
719 crc(0), thd(thd_arg), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
721 server_id= thd->server_id;
722 unmasked_server_id= server_id;
723 when= thd->start_time;
734 enum_event_logging_type logging_type_arg)
735 :temp_buf(0), exec_time(0), flags(0), event_cache_type(cache_type_arg),
736 event_logging_type(logging_type_arg), crc(0), thd(0),
737 checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
739 server_id= ::server_id;
740 unmasked_server_id= server_id;
758 :temp_buf(0), exec_time(0),
759 event_cache_type(EVENT_INVALID_CACHE),
760 event_logging_type(EVENT_INVALID_LOGGING),
761 crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
766 when.tv_sec= uint4korr(buf);
768 server_id = uint4korr(buf + SERVER_ID_OFFSET);
769 unmasked_server_id = server_id;
773 #ifdef HAVE_REPLICATION
774 server_id = unmasked_server_id & opt_server_id_mask;
776 server_id = unmasked_server_id;
778 data_written= uint4korr(buf + EVENT_LEN_OFFSET);
779 if (description_event->binlog_version==1)
786 log_pos= uint4korr(buf + LOG_POS_OFFSET);
797 if (description_event->binlog_version==3 &&
798 buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
811 log_pos+= data_written;
813 DBUG_PRINT(
"info", (
"log_pos: %lu", (ulong) log_pos));
815 flags= uint2korr(buf + FLAGS_OFFSET);
816 if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
817 (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
838 #ifdef HAVE_REPLICATION
/**
  Apply this event on behalf of a worker by forwarding to do_apply_event().

  @param w  Worker passed straight through to do_apply_event().
  @return Whatever do_apply_event() returns.
*/
839 inline int Log_event::do_apply_event_worker(Slave_worker *w)
841 return do_apply_event(w);
847 DBUG_ASSERT(!rli->belongs_to_client());
861 DBUG_ASSERT(!is_mts_worker(rli->info_thd));
872 DBUG_PRINT(
"info", (
"ev->server_id=%lu, ::server_id=%lu,"
873 " rli->replicate_same_server_id=%d,"
874 " rli->slave_skip_counter=%d",
875 (ulong) server_id, (ulong) ::server_id,
876 rli->replicate_same_server_id,
877 rli->slave_skip_counter));
878 if ((server_id == ::server_id && !rli->replicate_same_server_id) ||
879 (rli->slave_skip_counter == 1 && rli->
is_in_group()))
881 else if (rli->slave_skip_counter > 0)
/**
  Store an empty string, tagged with my_charset_bin, into the protocol.

  @param protocol  Destination protocol object.

  NOTE(review): the return statement is on a line not visible in this
  extract.
*/
892 int Log_event::pack_info(
Protocol *protocol)
894 protocol->
store(
"", &my_charset_bin);
/**
  Send one row describing this event through the protocol.

  Fields are stored in this order: log name, pos, event type name,
  server_id, log_pos, and then whatever pack_info() adds.

  @param protocol  Destination protocol object.
  @param log_name  Log file name.
  @param pos       Position stored as the second field.
  @return The result of protocol->write() when pack_info() returns zero.

  NOTE(review): the use of p, the assignment of event_type and the value
  returned when pack_info() fails are on lines not visible in this extract.
*/
902 int Log_event::net_send(
Protocol *protocol,
const char* log_name, my_off_t pos)
/* Locate the last FN_LIBCHAR in log_name. */
904 const char *p= strrchr(log_name, FN_LIBCHAR);
905 const char *event_type;
909 protocol->prepare_for_resend();
910 protocol->
store(log_name, &my_charset_bin);
911 protocol->
store((ulonglong) pos);
913 protocol->
store(event_type, strlen(event_type), &my_charset_bin);
914 protocol->
store((uint32) server_id);
915 protocol->
store((ulonglong) log_pos);
916 if (pack_info(protocol))
918 return protocol->write();
/**
  Append column descriptors to a field list.

  The visible lines add an integer column named Pos and one further integer
  column, both MYSQL_TYPE_LONGLONG with MY_INT32_NUM_DECIMAL_DIGITS digits.

  @param field_list  List that receives the new items.

  NOTE(review): the other columns, and the name of the second integer
  column, are on lines not visible in this extract.
*/
929 void Log_event::init_show_field_list(
List<Item>* field_list)
932 field_list->push_back(
new Item_return_int(
"Pos", MY_INT32_NUM_DECIMAL_DIGITS,
933 MYSQL_TYPE_LONGLONG));
938 MY_INT32_NUM_DECIMAL_DIGITS,
939 MYSQL_TYPE_LONGLONG));
/**
  Decide whether this event carries a checksum.

  NOTE(review): several lines of this function, including parts of the
  conditions and the return statement, are not visible in this extract. The
  comments below cover only what the visible lines establish.
*/
961 my_bool Log_event::need_checksum()
963 DBUG_ENTER(
"Log_event::need_checksum");
/*
  When checksum_alg is already defined, the answer is whether it differs from
  BINLOG_CHECKSUM_ALG_OFF. Otherwise binlog_checksum_options is consulted;
  the second half of that condition is not visible.
*/
971 if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
972 ret= (checksum_alg != BINLOG_CHECKSUM_ALG_OFF);
973 else if (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF &&
975 ret= binlog_checksum_options;
986 DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret ||
/*
  A still undefined checksum_alg is assigned here, either
  binlog_checksum_options or BINLOG_CHECKSUM_ALG_OFF; the selecting condition
  is not visible.
*/
989 if (checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
991 binlog_checksum_options : (uint8) BINLOG_CHECKSUM_ALG_OFF;
994 ((checksum_alg == binlog_checksum_options ||
1000 get_type_code() == STOP_EVENT ||
1007 get_type_code() == ROTATE_EVENT ||
1012 get_type_code() == PREVIOUS_GTIDS_LOG_EVENT ||
1014 get_type_code() == FORMAT_DESCRIPTION_EVENT) &&
1015 checksum_alg != BINLOG_CHECKSUM_ALG_OFF));
/* By this point checksum_alg must have been given a defined value. */
1017 DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
1018 DBUG_ASSERT(((get_type_code() != ROTATE_EVENT &&
1019 get_type_code() != STOP_EVENT) ||
1020 get_type_code() != FORMAT_DESCRIPTION_EVENT) ||
/**
  Write bytes to the cache and fold them into the running checksum.

  @param file  Destination cache.
  @param buf   Bytes to write.
  @param size  Number of bytes.
  @return The result of my_b_safe_write().
*/
1026 bool Log_event::wrapper_my_b_safe_write(
IO_CACHE* file,
const uchar* buf, ulong
size)
/* crc is updated only when a checksum is needed and there is data. */
1028 if (need_checksum() && size != 0)
1029 crc= my_checksum(
crc, buf, size);
1031 return my_b_safe_write(file, buf, size);
/**
  Write the event footer.

  When need_checksum() is true, crc is stored into a local buffer with
  int4store() and that buffer is written to the cache.

  @param file  Destination cache.
  @return The result of my_b_safe_write() in the checksum case.

  NOTE(review): the declaration of buf and the value returned when no
  checksum is needed are on lines not visible in this extract.
*/
1034 bool Log_event::write_footer(
IO_CACHE* file)
1040 if (need_checksum())
1043 int4store(buf,
crc);
1044 return (my_b_safe_write(file, (uchar*) buf,
sizeof(buf)));
/**
  Fill in the common event header and write it to the cache.

  Header fields stored: the timestamp at offset 0, the type code at
  EVENT_TYPE_OFFSET, server_id at SERVER_ID_OFFSET, data_written at
  EVENT_LEN_OFFSET, log_pos at LOG_POS_OFFSET and flags at FLAGS_OFFSET.

  @param file               Destination cache.
  @param event_data_length  Length of the event data that follows the header.

  NOTE(review): some branches and the return statement are on lines not
  visible in this extract.
*/
1053 bool Log_event::write_header(
IO_CACHE* file, ulong event_data_length)
1055 uchar header[LOG_EVENT_HEADER_LEN];
1058 DBUG_ENTER(
"Log_event::write_header");
/* Total size starts as the data length plus the header size. */
1061 data_written= event_data_length +
sizeof(header);
/* Restart the running checksum when one is needed. */
1063 if (need_checksum())
1065 crc= my_checksum(0L, NULL, 0);
1074 if (is_artificial_event())
/*
  log_pos becomes the current cache position plus the size of this event.
  NOTE(review): the condition guarding this assignment is not visible.
*/
1110 log_pos= my_b_safe_tell(file)+data_written;
1113 now= (ulong) get_time();
/*
  Debug hooks: each of the two keywords shifts the timestamp by 3600
  seconds, one forward and one backward.
*/
1114 if (DBUG_EVALUATE_IF(
"inc_event_time_by_1_hour",1,0) &&
1115 DBUG_EVALUATE_IF(
"dec_event_time_by_1_hour",1,0))
1125 DBUG_EXECUTE_IF(
"inc_event_time_by_1_hour", now= now + 3600;);
1126 DBUG_EXECUTE_IF(
"dec_event_time_by_1_hour", now= now - 3600;);
1136 int4store(header, now);
1137 header[EVENT_TYPE_OFFSET]= get_type_code();
1138 int4store(header+ SERVER_ID_OFFSET, server_id);
1139 int4store(header+ EVENT_LEN_OFFSET, data_written);
1140 int4store(header+ LOG_POS_OFFSET, log_pos);
/*
  Ordinary case: the whole header goes through the checksumming writer in
  one call. This applies unless the event is a format description event that
  needs a checksum and has LOG_EVENT_BINLOG_IN_USE_F set.
*/
1147 if (header[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT ||
1148 !need_checksum() || !(flags & LOG_EVENT_BINLOG_IN_USE_F))
1150 int2store(header+ FLAGS_OFFSET, flags);
1151 ret= wrapper_my_b_safe_write(file, header,
sizeof(header)) != 0;
/*
  Special case, written in three pieces. First the bytes before the flags
  go through the checksumming writer.
*/
1155 ret= (wrapper_my_b_safe_write(file, header, FLAGS_OFFSET) != 0);
/*
  Second, the flags: the checksum is updated from the flags with
  LOG_EVENT_BINLOG_IN_USE_F cleared, while the bytes written carry the flag
  set. The write uses my_b_safe_write() directly, so crc is not updated a
  second time.
*/
1158 flags &= ~LOG_EVENT_BINLOG_IN_USE_F;
1159 int2store(header + FLAGS_OFFSET, flags);
1160 crc= my_checksum(
crc, header + FLAGS_OFFSET,
sizeof(flags));
1161 flags |= LOG_EVENT_BINLOG_IN_USE_F;
1162 int2store(header + FLAGS_OFFSET, flags);
1163 ret= (my_b_safe_write(file, header + FLAGS_OFFSET,
sizeof(flags)) != 0);
/* Third, the rest of the header after the flags, checksummed again. */
1166 ret= (wrapper_my_b_safe_write(file, header + FLAGS_OFFSET +
sizeof(flags),
1168 - (FLAGS_OFFSET +
sizeof(flags))) != 0);
/*
  Log_event::read_log_event, variant that appends one raw event to packet
  (the name is taken from the DBUG_ENTER string below).

  Visible flow: read a minimal header, validate the length it carries,
  append the header and then the rest of the event to packet, optionally
  verify the checksum, and return a LOG_READ_* result code.

  NOTE(review): the start of the parameter list, the declarations of
  data_len and result, and several branch and goto lines are not visible in
  this extract.
*/
1181 uint8 checksum_alg_arg,
1182 const char *log_file_name_arg,
1183 bool* is_binlog_active)
1187 char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
/*
  Offset in packet at which this event starts.
  NOTE(review): ev_offset is a uchar, so a packet length above 255 would not
  be kept exactly -- confirm callers only pass short packets here.
*/
1188 uchar ev_offset= packet->length();
1189 DBUG_ENTER(
"Log_event::read_log_event(IO_CACHE *, String *, mysql_mutex_t, uint8)");
/* is_binlog_active is written only when a log file name was supplied. */
1194 if (log_file_name_arg)
1195 *is_binlog_active= mysql_bin_log.
is_active(log_file_name_arg);
/* Read the minimal header. */
1197 if (my_b_read(file, (uchar*) buf,
sizeof(buf)))
1204 DBUG_PRINT(
"error",(
"my_b_read failed. file->error: %d", file->error));
/*
  Failure results: LOG_READ_EOF, or LOG_READ_TRUNC when file->error is
  positive and LOG_READ_IO otherwise. The condition selecting
  LOG_READ_EOF is not visible.
*/
1206 result= LOG_READ_EOF;
1208 result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
/*
  The event length must be at least the minimal header length and at most
  the larger of max_allowed_packet and the rows event size limit plus
  MAX_LOG_EVENT_HEADER.
*/
1211 data_len= uint4korr(buf + EVENT_LEN_OFFSET);
1212 if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
1213 data_len > max(current_thd->variables.max_allowed_packet,
1214 opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
1216 DBUG_PRINT(
"error",(
"data_len is out of bounds. data_len: %lu", data_len));
1217 result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
1218 LOG_READ_TOO_LARGE);
/* Append the header bytes already read. */
1223 if (packet->append(buf,
sizeof(buf)))
1225 DBUG_PRINT(
"info", (
"first packet->append failed (out of memory)"));
1227 result= LOG_READ_MEM;
/* Append the remainder of the event straight from the file. */
1230 data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
1234 if (packet->append(file, data_len))
1248 DBUG_PRINT(
"info", (
"second packet->append failed (out of memory)"));
1249 result= (my_errno == ENOMEM ? LOG_READ_MEM :
1250 (file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
/*
  Debug hook: for events other than the format description event, one byte
  of the event just read is inverted, and the keyword is then switched off.
*/
1256 DBUG_EXECUTE_IF(
"corrupt_read_log_event",
1257 uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset;
1258 if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
1261 debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
1262 DBUG_PRINT(
"info", (
"Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos));
1263 DBUG_SET(
"-d,corrupt_read_log_event");
/*
  Checksum verification, attempted only when opt_master_verify_checksum is
  set. The verification call itself is on lines not visible here.
*/
1269 if (opt_master_verify_checksum &&
1271 data_len +
sizeof(
buf),
1274 DBUG_PRINT(
"info", (
"checksum test failed"));
1275 result= LOG_READ_CHECKSUM_FAILURE;
1284 DBUG_PRINT(
"info", (
"read_log_event returns %d", result));
1285 DBUG_RETURN(result);
/*
  Helper macros that unlock or lock log_lock, but only when log_lock is
  non-null. They are defined this way when MYSQL_CLIENT is not defined.
*/
1289 #ifndef MYSQL_CLIENT
1290 #define UNLOCK_MUTEX if (log_lock) mysql_mutex_unlock(log_lock);
1291 #define LOCK_MUTEX if (log_lock) mysql_mutex_lock(log_lock);
/*
  Empty definition.
  NOTE(review): presumably the alternative for MYSQL_CLIENT builds; the
  directive that separates the two definitions is not visible in this
  extract.
*/
1293 #define UNLOCK_MUTEX
1297 #ifndef MYSQL_CLIENT
/*
  Log_event::read_log_event, variant that reads one event from an IO_CACHE
  and returns it as an object (the name is taken from the DBUG_ENTER string
  below).

  Visible flow: read the header, validate the event length, allocate a
  buffer for the whole event, read the rest, and hand the buffer to the
  buffer-based read_log_event(). On success the new event takes the buffer
  through register_temp_buf(); on failure an error is logged.

  NOTE(review): the function signature, the declarations of buf and res, and
  the error-path lines are not visible in this extract.
*/
1314 DBUG_ENTER(
"Log_event::read_log_event(IO_CACHE *[, mysql_mutex_t *], Format_description_log_event *, my_bool)");
1315 DBUG_ASSERT(description_event != 0);
1316 char head[LOG_EVENT_MINIMAL_HEADER_LEN];
/* Never read more header bytes than head can hold. */
1324 uint header_size= min<uint>(description_event->common_header_len,
1325 LOG_EVENT_MINIMAL_HEADER_LEN);
1328 DBUG_PRINT(
"info", (
"my_b_tell: %lu", (ulong) my_b_tell(file)));
1329 if (my_b_read(file, (uchar *) head, header_size))
1331 DBUG_PRINT(
"info", (
"Log_event::read_log_event(IO_CACHE*,Format_desc*) "
1332 "failed in my_b_read((IO_CACHE*)%p, (uchar*)%p, %u)",
1333 file, head, header_size));
1342 ulong data_len = uint4korr(head + EVENT_LEN_OFFSET);
1344 const char *error= 0;
/*
  When max_allowed_packet is not a macro, a local limit is used instead:
  slave_max_allowed_packet if there is a current THD, otherwise the largest
  unsigned value.
*/
1346 #ifndef max_allowed_packet
1347 THD *thd=current_thd;
1348 uint max_allowed_packet= thd ? slave_max_allowed_packet : ~0
U;
/*
  Upper bound: the larger of max_allowed_packet and the rows event size
  limit plus MAX_LOG_EVENT_HEADER.
*/
1351 ulong
const max_size=
1352 max<ulong>(max_allowed_packet,
1353 opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER);
1354 if (data_len > max_size)
1356 error =
"Event too big";
1360 if (data_len < header_size)
1362 error =
"Event too small";
/* One byte more than the event length is allocated. */
1367 if (!(buf = (
char*) my_malloc(data_len+1, MYF(MY_WME))))
1369 error =
"Out of memory";
/* Header bytes come from head; the rest is read from the file. */
1373 memcpy(buf, head, header_size);
1374 if (my_b_read(file, (uchar*) buf + header_size, data_len - header_size))
1376 error =
"read error";
1379 if ((res= read_log_event(buf, data_len, &error, description_event, crc_check)))
1380 res->register_temp_buf(buf);
/* Failure reporting: error must have been set by one of the steps above. */
1386 DBUG_ASSERT(error != 0);
1387 sql_print_error(
"Error in Log_event::read_log_event(): "
1388 "'%s', data_len: %lu, event_type: %d",
1389 error,data_len,head[EVENT_TYPE_OFFSET]);
/**
  Build an event object from a buffer that holds one complete event.

  Visible flow: sanity-check the buffer, determine the checksum algorithm,
  check the event type against the format description, optionally remap the
  type through event_type_permutation, construct the object for that type,
  record the checksum on it, and reject invalid events.

  @param buf        Buffer holding the event.
  @param event_len  Length of the event in buf.

  NOTE(review): the remaining parameters (error and description_event are
  used below), most constructor calls inside the switch, and the return
  statements are on lines not visible in this extract.
*/
1412 Log_event* Log_event::read_log_event(
const char* buf, uint event_len,
1419 DBUG_ENTER(
"Log_event::read_log_event(char *, uint, char **, Format_description_log_event *, my_bool)");
1420 DBUG_ASSERT(description_event != 0);
1421 DBUG_PRINT(
"info", (
"binlog_version: %d", description_event->binlog_version));
1422 DBUG_DUMP(
"data", (
unsigned char*) buf, event_len);
/*
  Sanity check: minimum length, known event type, and event_len equal to the
  length stored in the header.
  NOTE(review): the guard only ensures event_len >= EVENT_LEN_OFFSET, yet the
  third operand reads a multi-byte value starting at that offset -- confirm
  callers always supply a full header.
  NOTE(review): buf is const char*, so the type byte may be compared as a
  signed value -- confirm how bytes above 0x7f are meant to be treated.
*/
1425 if (event_len < EVENT_LEN_OFFSET ||
1426 buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
1427 (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
1429 DBUG_PRINT(
"error", (
"event_len=%u EVENT_LEN_OFFSET=%d "
1430 "buf[EVENT_TYPE_OFFSET]=%d ENUM_END_EVENT=%d "
1431 "uint4korr(buf+EVENT_LEN_OFFSET)=%d",
1432 event_len, EVENT_LEN_OFFSET,
1433 buf[EVENT_TYPE_OFFSET], ENUM_END_EVENT,
1434 uint4korr(buf+EVENT_LEN_OFFSET)));
1435 *error=
"Sanity check failed";
1439 uint event_type= buf[EVENT_TYPE_OFFSET];
1441 if (event_type == START_EVENT_V3)
/*
  Checksum algorithm selection depends on whether this is a format
  description event; both alternatives are on lines not visible here.
*/
1462 alg= (event_type != FORMAT_DESCRIPTION_EVENT) ?
/*
  Debug hook: for events other than the format description event, one byte
  of the buffer is inverted.
*/
1465 DBUG_EXECUTE_IF(
"corrupt_read_log_event_char",
1466 if (event_type != FORMAT_DESCRIPTION_EVENT)
1468 char *debug_event_buf_c = (
char *)buf;
1470 debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
1471 DBUG_PRINT(
"info", (
"Corrupt the event at Log_event::read_log_event(char*,...): byte on position %d", debug_cor_pos));
/*
  Error text for a failed checksum test. The test itself is on lines not
  visible here.
*/
1478 *error=
"Event crc check failed! Most likely there is event corruption.";
1482 ev=
new Unknown_log_event(buf, description_event);
/*
  Types beyond what the format description covers are rejected, except the
  format description event itself.
  NOTE(review): the comparison is a strict greater-than -- confirm whether a
  type equal to number_of_event_types is meant to pass.
*/
1489 if (event_type > description_event->number_of_event_types &&
1490 event_type != FORMAT_DESCRIPTION_EVENT)
1496 DBUG_PRINT(
"error", (
"event type %d found, but the current "
1497 "Format_description_log_event supports only %d event "
1498 "types", event_type,
1499 description_event->number_of_event_types));
/* Remap the type when the format description provides a permutation table. */
1514 if (description_event->event_type_permutation)
1516 int new_event_type= description_event->event_type_permutation[event_type];
1517 DBUG_PRINT(
"info", (
"converting event type %d to %d (%s)",
1518 event_type, new_event_type,
1520 event_type= new_event_type;
/*
  Branch for events that carry a checksum: the algorithm is defined and is
  either not OFF or the event is a format description event. The statement
  executed in this branch is not visible.
*/
1523 if (alg != BINLOG_CHECKSUM_ALG_UNDEF &&
1524 (event_type == FORMAT_DESCRIPTION_EVENT ||
1525 alg != BINLOG_CHECKSUM_ALG_OFF))
/*
  Dispatch on the event type. Only the Query_log_event construction is
  visible; the constructor calls for the other labels are not.
*/
1528 switch(event_type) {
1530 ev =
new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
1535 case NEW_LOAD_EVENT:
1541 case CREATE_FILE_EVENT:
1544 case APPEND_BLOCK_EVENT:
1547 case DELETE_FILE_EVENT:
1550 case EXEC_LOAD_EVENT:
1553 case START_EVENT_V3:
1568 case USER_VAR_EVENT:
1571 case FORMAT_DESCRIPTION_EVENT:
1574 #if defined(HAVE_REPLICATION)
1575 case PRE_GA_WRITE_ROWS_EVENT:
1578 case PRE_GA_UPDATE_ROWS_EVENT:
1581 case PRE_GA_DELETE_ROWS_EVENT:
1584 case WRITE_ROWS_EVENT_V1:
1587 case UPDATE_ROWS_EVENT_V1:
1590 case DELETE_ROWS_EVENT_V1:
1593 case TABLE_MAP_EVENT:
1597 case BEGIN_LOAD_QUERY_EVENT:
1600 case EXECUTE_LOAD_QUERY_EVENT:
1603 case INCIDENT_EVENT:
1606 case ROWS_QUERY_LOG_EVENT:
1609 case GTID_LOG_EVENT:
1610 case ANONYMOUS_GTID_LOG_EVENT:
1613 case PREVIOUS_GTIDS_LOG_EVENT:
1616 #if defined(HAVE_REPLICATION)
1617 case WRITE_ROWS_EVENT:
1620 case UPDATE_ROWS_EVENT:
1623 case DELETE_ROWS_EVENT:
1638 DBUG_PRINT(
"error",(
"Unknown event code: %d",
1639 (
int) buf[EVENT_TYPE_OFFSET]));
/*
  Record the algorithm on the event and, when a checksum is present, read
  its value from the bytes at buf + event_len.
  NOTE(review): this reads at offset event_len, so event_len has presumably
  been reduced by the checksum length on a line not shown -- confirm.
*/
1648 ev->checksum_alg= alg;
1649 if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1650 ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1651 ev->
crc= uint4korr(buf + (event_len));
1654 DBUG_PRINT(
"read_event", (
"%s(type_code: %d; event_len: %d)",
1656 buf[EVENT_TYPE_OFFSET],
/*
  Reject a missing or invalid event, and any SLAVE_EVENT. The conditions
  that choose between the outcomes below are not visible.
*/
1669 if (!ev || !ev->is_valid() || (event_type == SLAVE_EVENT))
1671 DBUG_PRINT(
"error",(
"Found invalid event in binary log"));
1677 *error=
"Found invalid event in binary log";
1680 ev=
new Unknown_log_event(buf, description_event);
1682 *error=
"Found invalid event in binary log";
/**
  Print the comment header of an event, and a hex dump of the event when
  print_event_info->hexdump_from is non-zero.

  The header line holds a # sign, the timestamp, the server id and
  end_log_pos; when the event has a checksum, the algorithm name and the crc
  value follow.

  @param file              Destination cache.
  @param print_event_info  Printing options; hexdump_from and
                           common_header_len are read here.
  @param is_more           Unused.

  NOTE(review): several declarations (llbuff, checksum_buf, emit_buf, i) and
  parts of the dump loop are on lines not visible in this extract.
*/
1695 void Log_event::print_header(
IO_CACHE* file,
1696 PRINT_EVENT_INFO* print_event_info,
1697 bool is_more __attribute__((unused)))
1700 my_off_t hexdump_from= print_event_info->hexdump_from;
1701 DBUG_ENTER(
"Log_event::print_header");
1703 my_b_printf(file,
"#");
1704 print_timestamp(file, NULL);
1705 my_b_printf(file,
" server id %lu end_log_pos %s ", (ulong) server_id,
1706 llstr(log_pos,llbuff));
/* Checksum details are printed only for a defined algorithm other than OFF. */
1710 if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
1711 checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
1714 size_t const bytes_written=
1715 my_snprintf(checksum_buf,
sizeof(checksum_buf),
"0x%08lx ", (ulong)
crc);
1716 my_b_printf(file,
"%s ", get_type(&binlog_checksum_typelib, checksum_alg));
/*
  checksum_buf is passed in the format position; its content was produced
  just above from a fixed format and an integer.
*/
1717 my_b_printf(file, checksum_buf, bytes_written);
/* Optional hex dump of the raw event held in temp_buf. */
1721 if (print_event_info->hexdump_from)
1723 my_b_printf(file,
"\n");
1724 uchar *ptr= (uchar*)temp_buf;
1726 uint4korr(ptr + EVENT_LEN_OFFSET) - LOG_EVENT_MINIMAL_HEADER_LEN;
/* Work buffers for one dump line: hex text and the printable-text column. */
1730 char *h, hex_string[49]= {0};
1731 char *c, char_string[16+1]= {0};
/*
  When the common header has the minimal length, it gets its own labelled
  line showing the first 19 bytes, and the dump continues after them.
*/
1734 if (print_event_info->common_header_len == LOG_EVENT_MINIMAL_HEADER_LEN)
1737 my_b_printf(file,
"# Position Timestamp Type Master ID "
1738 "Size Master Pos Flags \n");
1739 size_t const bytes_written=
1740 my_snprintf(emit_buf,
sizeof(emit_buf),
1741 "# %8.8lx %02x %02x %02x %02x %02x "
1742 "%02x %02x %02x %02x %02x %02x %02x %02x "
1743 "%02x %02x %02x %02x %02x %02x\n",
1744 (
unsigned long) hexdump_from,
1745 ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6],
1746 ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13],
1747 ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]);
1748 DBUG_ASSERT(static_cast<size_t>(bytes_written) <
sizeof(emit_buf));
1749 my_b_write(file, (uchar*) emit_buf, bytes_written);
1750 ptr += LOG_EVENT_MINIMAL_HEADER_LEN;
1751 hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN;
/*
  Dump loop: each byte is formatted as two hex digits, with the space placed
  after the digits for the first eight bytes of a line and before them for
  the rest. Bytes that are not alphanumeric show as a dot in the text
  column.
*/
1755 for (i= 0, c= char_string, h=hex_string;
1759 my_snprintf(h, 4, (i % 16 <= 7) ?
"%02x " :
" %02x", *ptr);
1762 *c++= my_isalnum(&my_charset_bin, *ptr) ? *ptr :
'.';
/* Emit one complete dump line. */
1773 size_t const bytes_written=
1774 my_snprintf(emit_buf,
sizeof(emit_buf),
1775 "# %8.8lx %-48.48s |%16s|\n",
1776 (
unsigned long) (hexdump_from + (i & 0xfffffff0)),
1777 hex_string, char_string);
1778 DBUG_ASSERT(static_cast<size_t>(bytes_written) <
sizeof(emit_buf));
1779 my_b_write(file, (uchar*) emit_buf, bytes_written);
1787 DBUG_ASSERT(hex_string[48] == 0);
/* Final partial line: the unused part of the hex column is padded with spaces. */
1793 memset(h,
' ', (
sizeof(hex_string) -1) - (h - hex_string));
1794 size_t const bytes_written=
1795 my_snprintf(emit_buf,
sizeof(emit_buf),
1796 "# %8.8lx %-48.48s |%s|\n",
1797 (
unsigned long) (hexdump_from + (i & 0xfffffff0)),
1798 hex_string, char_string);
1799 DBUG_ASSERT(static_cast<size_t>(bytes_written) <
sizeof(emit_buf));
1800 my_b_write(file, (uchar*) emit_buf, bytes_written);
/* Start of the comment line that follows the dump. */
1806 my_b_write(file, reinterpret_cast<const uchar*>(
"# "), 2);
/**
  Write a byte string between single quotes, with optional hex escaping.

  A byte above 0x1F is written as it is, provided esc_all is false. The
  other visible write formats the byte as a backslash, the letter x and two
  hex digits.

  @param file     Destination cache.
  @param ptr      Bytes to write.
  @param length   Number of bytes.
  @param esc_all  When true, no byte takes the unescaped path.

  NOTE(review): the return type, the declarations of s and hex, and the else
  keyword are on lines not visible in this extract.
*/
1823 my_b_write_quoted(
IO_CACHE *file,
const uchar *ptr, uint length,
bool esc_all)
1826 my_b_printf(file,
"'");
1827 for (s= ptr; length > 0 ; s++, length--)
1829 if (*s > 0x1F && !esc_all)
1830 my_b_write(file, s, 1);
1834 size_t len= my_snprintf((
char*) hex,
sizeof(hex),
"%s%02x",
"\\x", *s);
1835 my_b_write(file, hex, len);
1838 my_b_printf(file,
"'");
/**
  Three-argument form: forwards to the four-argument my_b_write_quoted()
  with esc_all set to false.

  @param file    Destination cache.
  @param ptr     Bytes to write.
  @param length  Number of bytes.
*/
1843 my_b_write_quoted(
IO_CACHE *file,
const uchar *ptr, uint length)
1845 my_b_write_quoted(file, ptr, length,
false);
/**
  Write a bit string as b followed by a quoted run of 0 and 1 characters.

  nbits is rounded up to a multiple of 8 (nbits8); the leading
  nbits8 - nbits bits of the first byte are skipped. Bits are taken from
  each byte starting at its most significant bit.

  @param file   Destination cache.
  @param ptr    Bytes holding the bits.
  @param nbits  Number of bits to print.
*/
1857 my_b_write_bit(
IO_CACHE *file,
const uchar *ptr, uint nbits)
1859 uint bitnum, nbits8= ((nbits + 7) / 8) * 8, skip_bits= nbits8 - nbits;
1860 my_b_printf(file,
"b'");
1861 for (bitnum= skip_bits ; bitnum < nbits8; bitnum++)
/* Byte index is bitnum / 8; the shift selects the bit within that byte. */
1863 int is_set= (ptr[(bitnum) / 8] >> (7 - bitnum % 8)) & 0x01;
1864 my_b_write(file, (
const uchar*) (is_set ?
"1" :
"0"), 1);
1866 my_b_printf(file,
"'");
/**
  Write a quoted string whose data follows a length prefix at ptr.

  Two forms are visible: one where the data starts at ptr + 1, and one where
  the length is read with uint2korr() and the data starts at ptr + 2.

  @param file    Destination cache.
  @param ptr     Start of the length prefix.
  @param length  Used as the string length in the first form and overwritten
                 in the second.

  NOTE(review): the condition that chooses between the two forms, any
  assignment of length in the first form, and the return statement are on
  lines not visible in this extract.
*/
1882 my_b_write_quoted_with_length(
IO_CACHE *file,
const uchar *ptr, uint length)
1887 my_b_write_quoted(file, ptr + 1, length);
1892 length= uint2korr(ptr);
1893 my_b_write_quoted(file, ptr + 2, length);
/**
  Print a 32-bit value as a signed number, with the unsigned reading of the
  same value in parentheses.

  @param file  Destination cache.
  @param si    Signed reading, printed with %d.
  @param ui    Unsigned reading, printed with %u inside parentheses.

  NOTE(review): the condition guarding the second print is on a line not
  visible in this extract.
*/
1907 my_b_write_sint32_and_uint32(
IO_CACHE *file, int32 si, uint32 ui)
1909 my_b_printf(file,
"%d", si);
1911 my_b_printf(file,
" (%u)", ui);
1928 log_event_print_value(
IO_CACHE *file,
const uchar *ptr,
1929 uint type, uint meta,
1930 char *typestr,
size_t typestr_length)
1934 if (type == MYSQL_TYPE_STRING)
1938 uint byte0= meta >> 8;
1939 uint byte1= meta & 0xFF;
1941 if ((byte0 & 0x30) != 0x30)
1944 length= byte1 | (((byte0 & 0x30) ^ 0x30) << 4);
1948 length = meta & 0xFF;
1955 case MYSQL_TYPE_LONG:
1957 int32 si= sint4korr(ptr);
1958 uint32 ui= uint4korr(ptr);
1959 my_b_write_sint32_and_uint32(file, si, ui);
1960 my_snprintf(typestr, typestr_length,
"INT");
1964 case MYSQL_TYPE_TINY:
1966 my_b_write_sint32_and_uint32(file, (
int) (
signed char) *ptr,
1967 (uint) (
unsigned char) *ptr);
1968 my_snprintf(typestr, typestr_length,
"TINYINT");
1972 case MYSQL_TYPE_SHORT:
1974 int32 si= (int32) sint2korr(ptr);
1975 uint32 ui= (uint32) uint2korr(ptr);
1976 my_b_write_sint32_and_uint32(file, si, ui);
1977 my_snprintf(typestr, typestr_length,
"SHORTINT");
1981 case MYSQL_TYPE_INT24:
1983 int32 si= sint3korr(ptr);
1984 uint32 ui= uint3korr(ptr);
1985 my_b_write_sint32_and_uint32(file, si, ui);
1986 my_snprintf(typestr, typestr_length,
"MEDIUMINT");
1990 case MYSQL_TYPE_LONGLONG:
1993 longlong si= sint8korr(ptr);
1994 longlong10_to_str(si, tmp, -10);
1995 my_b_printf(file,
"%s", tmp);
1998 ulonglong ui= uint8korr(ptr);
1999 longlong10_to_str((longlong) ui, tmp, 10);
2000 my_b_printf(file,
" (%s)", tmp);
2002 my_snprintf(typestr, typestr_length,
"LONGINT");
2006 case MYSQL_TYPE_NEWDECIMAL:
2008 uint precision= meta >> 8;
2009 uint decimals= meta & 0xFF;
2010 uint bin_size= my_decimal_get_binary_size(precision, decimals);
2012 binary2my_decimal(E_DEC_FATAL_ERROR, (uchar*) ptr, &dec,
2013 precision, decimals);
2015 char buff[512], *pos;
2017 pos+= sprintf(buff,
"%s", dec.sign() ?
"-" :
"");
2018 end= ROUND_UP(dec.frac) + ROUND_UP(dec.intg)-1;
2019 for (i=0; i < end; i++)
2020 pos+= sprintf(pos,
"%09d.", dec.buf[i]);
2021 pos+= sprintf(pos,
"%09d", dec.buf[i]);
2022 my_b_printf(file,
"%s", buff);
2023 my_snprintf(typestr, typestr_length,
"DECIMAL(%d,%d)",
2024 precision, decimals);
2028 case MYSQL_TYPE_FLOAT:
2033 sprintf(tmp,
"%-20g", (
double) fl);
2034 my_b_printf(file,
"%s", tmp);
2035 my_snprintf(typestr, typestr_length,
"FLOAT");
2039 case MYSQL_TYPE_DOUBLE:
2042 float8get(dbl, ptr);
2044 sprintf(tmp,
"%-.20g", dbl);
2045 my_b_printf(file,
"%s", tmp);
2046 strcpy(typestr,
"DOUBLE");
2050 case MYSQL_TYPE_BIT:
2053 uint nbits= ((meta >> 8) * 8) + (meta & 0xFF);
2054 length= (nbits + 7) / 8;
2055 my_b_write_bit(file, ptr, nbits);
2056 my_snprintf(typestr, typestr_length,
"BIT(%d)", nbits);
2060 case MYSQL_TYPE_TIMESTAMP:
2062 uint32 i32= uint4korr(ptr);
2063 my_b_printf(file,
"%d", i32);
2064 my_snprintf(typestr, typestr_length,
"TIMESTAMP");
2068 case MYSQL_TYPE_TIMESTAMP2:
2070 char buf[MAX_DATE_STRING_REP_LENGTH];
2072 my_timestamp_from_binary(&tm, ptr, meta);
2073 int buflen= my_timeval_to_str(&tm, buf, meta);
2074 my_b_write(file, buf, buflen);
2075 my_snprintf(typestr, typestr_length,
"TIMESTAMP(%d)", meta);
2076 return my_timestamp_binary_length(meta);
2079 case MYSQL_TYPE_DATETIME:
2082 uint64 i64= uint8korr(ptr);
2085 my_b_printf(file,
"%04d-%02d-%02d %02d:%02d:%02d",
2086 static_cast<int>(d / 10000),
2087 static_cast<int>(d % 10000) / 100,
2088 static_cast<int>(d % 100),
2089 static_cast<int>(t / 10000),
2090 static_cast<int>(t % 10000) / 100,
2091 static_cast<int>(t % 100));
2092 my_snprintf(typestr, typestr_length,
"DATETIME");
2096 case MYSQL_TYPE_DATETIME2:
2098 char buf[MAX_DATE_STRING_REP_LENGTH];
2100 longlong packed= my_datetime_packed_from_binary(ptr, meta);
2101 TIME_from_longlong_datetime_packed(<ime, packed);
2102 int buflen= my_datetime_to_str(<ime, buf, meta);
2103 my_b_write_quoted(file, (uchar *) buf, buflen);
2104 my_snprintf(typestr, typestr_length,
"DATETIME(%d)", meta);
2105 return my_datetime_binary_length(meta);
2108 case MYSQL_TYPE_TIME:
2110 uint32 i32= uint3korr(ptr);
2111 my_b_printf(file,
"'%02d:%02d:%02d'",
2112 i32 / 10000, (i32 % 10000) / 100, i32 % 100);
2113 my_snprintf(typestr, typestr_length,
"TIME");
2117 case MYSQL_TYPE_TIME2:
2119 char buf[MAX_DATE_STRING_REP_LENGTH];
2121 longlong packed= my_time_packed_from_binary(ptr, meta);
2122 TIME_from_longlong_time_packed(<ime, packed);
2123 int buflen= my_time_to_str(<ime, buf, meta);
2124 my_b_write_quoted(file, (uchar *) buf, buflen);
2125 my_snprintf(typestr, typestr_length,
"TIME(%d)", meta);
2126 return my_time_binary_length(meta);
2129 case MYSQL_TYPE_NEWDATE:
2131 uint32 tmp= uint3korr(ptr);
2134 char *pos= &buf[10];
2138 part=(int) (tmp & 31);
2139 *pos--= (char) (
'0'+part%10);
2140 *pos--= (char) (
'0'+part/10);
2142 part=(int) (tmp >> 5 & 15);
2143 *pos--= (char) (
'0'+part%10);
2144 *pos--= (char) (
'0'+part/10);
2146 part=(int) (tmp >> 9);
2147 *pos--= (char) (
'0'+part%10); part/=10;
2148 *pos--= (char) (
'0'+part%10); part/=10;
2149 *pos--= (char) (
'0'+part%10); part/=10;
2150 *pos= (char) (
'0'+part);
2151 my_b_printf(file ,
"'%s'", buf);
2152 my_snprintf(typestr, typestr_length,
"DATE");
2156 case MYSQL_TYPE_YEAR:
2159 my_b_printf(file,
"%04d", i32+ 1900);
2160 my_snprintf(typestr, typestr_length,
"YEAR");
2164 case MYSQL_TYPE_ENUM:
2165 switch (meta & 0xFF) {
2167 my_b_printf(file,
"%d", (
int) *ptr);
2168 my_snprintf(typestr, typestr_length,
"ENUM(1 byte)");
2172 int32 i32= uint2korr(ptr);
2173 my_b_printf(file,
"%d", i32);
2174 my_snprintf(typestr, typestr_length,
"ENUM(2 bytes)");
2178 my_b_printf(file,
"!! Unknown ENUM packlen=%d", meta & 0xFF);
2183 case MYSQL_TYPE_SET:
2184 my_b_write_bit(file, ptr , (meta & 0xFF) * 8);
2185 my_snprintf(typestr, typestr_length,
"SET(%d bytes)", meta & 0xFF);
2188 case MYSQL_TYPE_BLOB:
2192 my_b_write_quoted(file, ptr + 1, length);
2193 my_snprintf(typestr, typestr_length,
"TINYBLOB/TINYTEXT");
2196 length= uint2korr(ptr);
2197 my_b_write_quoted(file, ptr + 2, length);
2198 my_snprintf(typestr, typestr_length,
"BLOB/TEXT");
2201 length= uint3korr(ptr);
2202 my_b_write_quoted(file, ptr + 3, length);
2203 my_snprintf(typestr, typestr_length,
"MEDIUMBLOB/MEDIUMTEXT");
2206 length= uint4korr(ptr);
2207 my_b_write_quoted(file, ptr + 4, length);
2208 my_snprintf(typestr, typestr_length,
"LONGBLOB/LONGTEXT");
2211 my_b_printf(file,
"!! Unknown BLOB packlen=%d", length);
2215 case MYSQL_TYPE_VARCHAR:
2216 case MYSQL_TYPE_VAR_STRING:
2218 my_snprintf(typestr, typestr_length,
"VARSTRING(%d)", length);
2219 return my_b_write_quoted_with_length(file, ptr, length);
2221 case MYSQL_TYPE_STRING:
2222 my_snprintf(typestr, typestr_length,
"STRING(%d)", length);
2223 return my_b_write_quoted_with_length(file, ptr, length);
2228 my_snprintf(tmp,
sizeof(tmp),
"%04x", meta);
2230 "!! Don't know how to handle column type=%d meta=%d (%s)",
/**
  Print the columns of one row image as commented lines of the form
  ### @N=value.  The line carrying the function name is elided in this
  view; the parameters below are the ones visible here.

  @param print_event_info  print options; a verbose level above 1 adds a
                           trailing annotation with type, metadata and
                           nullability for each column
  @param value             start of the row image, which begins with the
                           null bitmap of (m_width + 7) / 8 bytes
  @param prefix            text printed once before the columns

  @return value - value0, the distance between the final cursor position
          and the start of the row image
*/
2256 PRINT_EVENT_INFO *print_event_info,
2258 const uchar *value,
const uchar *prefix)
2260 const uchar *value0= value;
2261 const uchar *null_bits= value;
2262 uint null_bit_index= 0;
2263 char typestr[64]=
"";
/* Step over the null bitmap so value points at the first column. */
2265 value+= (m_width + 7) / 8;
2267 my_b_printf(file,
"%s", prefix);
2269 for (
size_t i= 0; i < td->
size(); i ++)
/* Null flags are stored least significant bit first within each byte. */
2271 int is_null= (null_bits[null_bit_index / 8]
2272 >> (null_bit_index % 8)) & 0x01;
2274 if (bitmap_is_set(cols_bitmap, i) == 0)
2279 my_b_printf(file,
"### @%d=NULL", static_cast<int>(i + 1));
2283 my_b_printf(file,
"### @%d=", static_cast<int>(i + 1));
2284 size_t size= log_event_print_value(file, value,
2285 td->type(i), td->field_metadata(i),
2286 typestr,
sizeof(typestr));
2293 if (print_event_info->verbose > 1)
2295 my_b_printf(file,
" /* ");
2298 my_b_printf(file,
"%s ", typestr);
2300 my_b_printf(file,
"type=%d ", td->type(i));
2302 my_b_printf(file,
"meta=%d nullable=%d is_null=%d ",
2303 td->field_metadata(i),
2304 td->maybe_null(i), is_null);
2305 my_b_printf(file,
"*/");
2308 my_b_printf(file,
"\n");
2312 return value - value0;
/**
  Print a commented pseudo-SQL rendering of the rows held by this event.

  @param file              IO cache that receives the output
  @param print_event_info  print options and the table map used to look up
                           the table definition for m_table_id

  The routine first dumps any extra row data as hex.  It then picks the
  statement keyword and clause headers from general_type_code, and walks
  the row buffer from m_rows_buf to m_rows_end, printing each row through
  print_verbose_one_row.  A second call with sql_clause2 is visible after
  the first; the condition that guards it is elided in this view.  If the
  table id is not in the table map, a single unknown-table line is printed
  instead.
*/
2322 void Rows_log_event::print_verbose(
IO_CACHE *file,
2323 PRINT_EVENT_INFO *print_event_info)
2326 char quoted_db[1 + NAME_LEN * 2 + 2];
2327 char quoted_table[1 + NAME_LEN * 2 + 2];
2328 int quoted_db_len, quoted_table_len;
2331 const char *sql_command, *sql_clause1, *sql_clause2;
2334 if (m_extra_row_data)
2336 uint8 extra_data_len= m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
2337 uint8 extra_payload_len= extra_data_len - EXTRA_ROW_INFO_HDR_BYTES;
2338 assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
2340 my_b_printf(file,
"### Extra row data format: %u, len: %u :",
2341 m_extra_row_data[EXTRA_ROW_INFO_FORMAT_OFFSET],
2343 if (extra_payload_len)
/* Two prefix characters, two hex digits per payload byte, one terminator. */
2349 const int buff_len= 2 + (256 * 2) + 1;
2350 char buff[buff_len];
2351 str_to_hex(buff, (
const char*) &m_extra_row_data[EXTRA_ROW_INFO_HDR_BYTES],
2353 my_b_printf(file,
"%s", buff);
2355 my_b_printf(file,
"\n");
2358 switch (general_type_code) {
2359 case WRITE_ROWS_EVENT:
2360 sql_command=
"INSERT INTO";
2361 sql_clause1=
"### SET\n";
2364 case DELETE_ROWS_EVENT:
2365 sql_command=
"DELETE FROM";
2366 sql_clause1=
"### WHERE\n";
2369 case UPDATE_ROWS_EVENT:
2370 sql_command=
"UPDATE";
2371 sql_clause1=
"### WHERE\n";
2372 sql_clause2=
"### SET\n";
2375 sql_command= sql_clause1= sql_clause2= NULL;
2379 if (!(map= print_event_info->m_table_map.get_table(m_table_id)) ||
2380 !(td= map->create_table_def()))
2383 my_b_printf(file,
"### Row event for unknown table #%s",
2384 llstr(m_table_id, llbuff));
/* A write event with an empty row buffer is printed as an empty INSERT. */
2389 if (((general_type_code == WRITE_ROWS_EVENT) && (m_rows_buf==m_rows_end)))
2391 my_b_printf(file,
"### INSERT INTO `%s`.`%s` VALUES ()\n",
2392 map->get_db_name(), map->get_table_name());
2396 for (
const uchar *value= m_rows_buf; value < m_rows_end; )
/*
  Two variants of the identifier quoting calls follow, one taking a thd and
  one without; the preprocessor lines that select between them are elided
  in this view.
*/
2400 quoted_db_len= my_strmov_quoted_identifier(this->thd, (
char *) quoted_db,
2401 map->get_db_name(), 0);
2402 quoted_table_len= my_strmov_quoted_identifier(this->thd,
2403 (
char *) quoted_table,
2404 map->get_table_name(), 0);
2406 quoted_db_len= my_strmov_quoted_identifier((
char *) quoted_db,
2407 map->get_db_name());
2408 quoted_table_len= my_strmov_quoted_identifier((
char *) quoted_table,
2409 map->get_table_name());
2411 quoted_db[quoted_db_len]=
'\0';
2412 quoted_table[quoted_table_len]=
'\0';
2413 my_b_printf(file,
"### %s %s.%s\n",
2415 quoted_db, quoted_table);
2417 if (!(length= print_verbose_one_row(file, td, print_event_info,
2419 (
const uchar*) sql_clause1)))
2426 if (!(length= print_verbose_one_row(file, td, print_event_info,
2428 (
const uchar*) sql_clause2)))
/**
  Print the raw event held in temp_buf as a base64 BINLOG statement and,
  when verbose output is requested, as decoded rows.

  @param file              IO cache that receives the output
  @param print_event_info  print options (base64_output_mode, delimiter,
                           verbose) and the table map that is updated
                           when a table map event is seen

  The event size is read from the event header at EVENT_LEN_OFFSET.  The
  base64 text is printed unless base64_output_mode is
  BASE64_OUTPUT_DECODE_ROWS.  In verbose mode the visible switch handles
  table map events, whose map is registered in m_table_map, and the three
  rows event kinds in both versions; print_verbose is then called on an
  event object whose construction is elided in this view.
*/
2445 void Log_event::print_base64(
IO_CACHE* file,
2446 PRINT_EVENT_INFO* print_event_info,
2449 const uchar *ptr= (
const uchar *)temp_buf;
2450 uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET);
2451 DBUG_ENTER(
"Log_event::print_base64");
2453 size_t const tmp_str_sz= base64_needed_encoded_length((
int) size);
2454 char *
const tmp_str= (
char *) my_malloc(tmp_str_sz, MYF(MY_WME));
2456 fprintf(stderr,
"\nError: Out of memory. "
2457 "Could not print correct binlog event.\n");
2461 if (base64_encode(ptr, (
size_t) size, tmp_str))
2466 if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS)
/* The statement header is written only when the cache is still empty. */
2468 if (my_b_tell(file) == 0)
2469 my_b_printf(file,
"\nBINLOG '\n");
2471 my_b_printf(file,
"%s\n", tmp_str);
2474 my_b_printf(file,
"'%s\n", print_event_info->delimiter);
2477 if (print_event_info->verbose)
2482 if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF &&
2483 checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
2488 case TABLE_MAP_EVENT:
2492 glob_description_event);
2493 print_event_info->m_table_map.set_table(map->get_table_id(), map);
2496 case WRITE_ROWS_EVENT:
2497 case WRITE_ROWS_EVENT_V1:
2500 glob_description_event);
2503 case DELETE_ROWS_EVENT:
2504 case DELETE_ROWS_EVENT_V1:
2507 glob_description_event);
2510 case UPDATE_ROWS_EVENT:
2511 case UPDATE_ROWS_EVENT_V1:
2514 glob_description_event);
2523 ev->print_verbose(file, print_event_info);
/**
  Print a timestamp as six two-digit fields: three, a space, then three
  separated by colons.

  @param file  IO cache that receives the output
  @param ts    time to print; when NULL the event time (when.tv_sec) is
               used instead

  Two conversions to broken-down local time are visible, localtime_r and
  localtime; the preprocessor branch that separates them is only partly
  visible in this view.
*/
2537 void Log_event::print_timestamp(
IO_CACHE* file, time_t *ts)
2546 time_t ts_tmp= ts ? *ts : (ulong)when.tv_sec;
2547 DBUG_ENTER(
"Log_event::print_timestamp");
2548 #ifdef MYSQL_SERVER // This is always false
2550 localtime_r(&ts_tmp, (res= &tm_tmp));
2552 res= localtime(&ts_tmp);
2555 my_b_printf(file,
"%02d%02d%02d %2d:%02d:%02d",
2568 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
/*
  Fragment of a skip-decision routine; its signature is not visible in this
  view.  A slave_skip_counter of exactly 1 is handled specially (that
  branch body is elided), and otherwise the decision is delegated to
  Log_event::do_shall_skip.
*/
2572 if (rli->slave_skip_counter == 1)
2574 return Log_event::do_shall_skip(rli);
/**
  Decide whether this event carries partitioning (database) information.

  @param end_group_sets_max_dbs  consulted together with ends_group() in
                                 one branch of the switch

  The switch on the event type lists TABLE_MAP_EVENT and
  EXECUTE_LOAD_QUERY_EVENT explicitly.  In the last visible branch the
  result is true only for an event that neither starts nor ends a group.
  The case labels of the remaining branches are elided in this view.
*/
2591 bool Log_event::contains_partition_info(
bool end_group_sets_max_dbs)
2595 switch (get_type_code()) {
2596 case TABLE_MAP_EVENT:
2597 case EXECUTE_LOAD_QUERY_EVENT:
2603 if (ends_group() && end_group_sets_max_dbs)
2607 OVER_MAX_DBS_IN_EVENT_MTS;
2610 res= (!ends_group() && !starts_group()) ?
true :
false;
/*
  Body of the routine that picks the worker for the current event; its
  signature is elided in this view (it is invoked as get_slave_worker(rli)).

  Visible behaviour:
    - when the event opens a new group, a Slave_job_group is reset and
      enqueued into the group-assigned queue (gaq), and the index is kept
      in gaq->assigned_group_index;
    - events that carry partition info are mapped to a worker through
      map_db_to_worker, or sent to the first worker after waiting for all
      workers when the database count is OVER_MAX_DBS_IN_EVENT_MTS;
    - the job group records the master log name, relay log name and
      checkpoint coordinates the first time a worker has not yet been
      notified of them;
    - when the group ends, mts_group_status becomes MTS_END_GROUP.

  The relay log name stored in the job group is duplicated with my_strdup,
  so the allocation size always matches the string that is copied and a
  failed allocation leaves the pointer NULL instead of being written to.
*/
2660 Slave_job_group
group, *ptr_group= NULL;
2663 Slave_worker *ret_worker= NULL;
2666 THD *thd= rli->info_thd;
2668 Slave_committed_queue *gaq= rli->gaq;
2673 if ((is_s_event= starts_group()) || is_gtid_event(
this) ||
2675 (!rli->curr_group_seen_begin && !rli->curr_group_seen_gtid &&
2684 gaq->get_job_group(rli->gaq->assigned_group_index)->
2685 worker_id != MTS_WORKER_UNDEF)))
2687 if (!rli->curr_group_seen_gtid && !rli->curr_group_seen_begin)
2690 rli->mts_groups_assigned++;
2692 rli->curr_group_isolated= FALSE;
2693 group.reset(log_pos, rli->mts_groups_assigned);
2695 gaq_idx= gaq->assigned_group_index= gaq->en_queue((
void *) &group);
2697 DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < gaq->size);
2698 DBUG_ASSERT(gaq->get_job_group(rli->gaq->assigned_group_index)->
2699 group_relay_log_name == NULL);
2700 DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF);
2701 DBUG_ASSERT(rli->last_assigned_worker == NULL);
2703 if (is_s_event || is_gtid_event(
this))
/* Group-opening events are parked in curr_group_da. */
2707 insert_dynamic(&rli->curr_group_da,
2708 (uchar*) &ptr_curr_ev);
2710 DBUG_ASSERT(rli->curr_group_da.elements == 1);
2715 rli->mts_end_group_sets_max_dbs=
true;
2716 rli->curr_group_seen_begin=
true;
2719 if (is_gtid_event(
this))
2721 rli->curr_group_seen_gtid=
true;
2730 insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
2731 rli->curr_group_seen_begin=
true;
2732 rli->mts_end_group_sets_max_dbs=
true;
2733 DBUG_ASSERT(rli->curr_group_da.elements == 2);
2734 DBUG_ASSERT(starts_group());
2741 if (contains_partition_info(rli->mts_end_group_sets_max_dbs))
2744 num_dbs= mts_number_dbs();
2755 DBUG_ASSERT(!ends_group() ||
2759 (rli->curr_group_seen_begin && rli->curr_group_seen_gtid &&
2761 (rli->mts_end_group_sets_max_dbs &&
2762 ((rli->curr_group_da.elements == 3 && rli->curr_group_seen_gtid) ||
2763 (rli->curr_group_da.elements == 2 && !rli->curr_group_seen_gtid)) &&
2765 dynamic_array_ptr(&rli->curr_group_da,
2766 rli->curr_group_da.elements - 1))->
2767 get_type_code() == BEGIN_LOAD_QUERY_EVENT)));
2770 rli->mts_end_group_sets_max_dbs=
false;
2771 ret_worker= rli->last_assigned_worker;
2772 if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
/* Too many databases: use the first worker and run the group isolated. */
2776 ret_worker= *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
2778 (void) wait_for_workers_to_finish(rli, ret_worker);
2782 rli->curr_group_isolated= TRUE;
2787 char **ref_cur_db= it.ref();
2790 map_db_to_worker(*ref_cur_db, rli,
2791 &mts_assigned_partitions[i],
2799 llstr(rli->get_event_relay_log_pos(), llbuff);
2800 my_error(ER_MTS_CANT_PARALLEL, MYF(0),
2801 get_type_str(), rli->get_event_relay_log_name(), llbuff,
2802 "could not distribute the event to a Worker");
2806 DBUG_ASSERT(num_dbs != OVER_MAX_DBS_IN_EVENT_MTS || !thd->temporary_tables);
2807 DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
2808 DBUG_ASSERT(ret_worker == mts_assigned_partitions[i]->
worker);
2809 DBUG_ASSERT(mts_assigned_partitions[i]->usage >= 0);
2814 if ((ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index))->
2815 worker_id == MTS_WORKER_UNDEF)
2817 ptr_group->worker_id= ret_worker->id;
2819 DBUG_ASSERT(ptr_group->group_relay_log_name == NULL);
2822 DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
2827 if (rli->last_assigned_worker)
2829 ret_worker= rli->last_assigned_worker;
2831 DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 ||
2832 ret_worker->id == 0);
2838 if (!(get_type_code() == INTVAR_EVENT ||
2839 get_type_code() == RAND_EVENT ||
2840 get_type_code() == USER_VAR_EVENT ||
2841 get_type_code() == ROWS_QUERY_LOG_EVENT ||
2842 get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
2843 get_type_code() == APPEND_BLOCK_EVENT))
2845 DBUG_ASSERT(!ret_worker);
2847 llstr(rli->get_event_relay_log_pos(), llbuff);
2848 my_error(ER_MTS_CANT_PARALLEL, MYF(0),
2849 get_type_str(), rli->get_event_relay_log_name(), llbuff,
2850 "the event is a part of a group that is unsupported in "
2851 "the parallel execution mode");
2856 insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
2858 DBUG_ASSERT(!ret_worker);
2863 DBUG_ASSERT(ret_worker);
2870 if (!ret_worker->master_log_change_notified)
2873 ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);
2874 ptr_group->group_master_log_name=
2875 my_strdup(rli->get_group_master_log_name(), MYF(MY_WME));
2876 ret_worker->master_log_change_notified=
true;
2878 DBUG_ASSERT(!ptr_group->notified);
2880 ptr_group->notified=
true;
2885 if (ends_group() || !rli->curr_group_seen_begin)
2887 rli->mts_group_status= Relay_log_info::MTS_END_GROUP;
2888 if (rli->curr_group_isolated)
2889 set_mts_isolate_group();
2891 ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);
2893 DBUG_ASSERT(ret_worker != NULL);
2903 if (!ret_worker->relay_log_change_notified)
2911 DBUG_ASSERT(ptr_group->group_relay_log_name == NULL);
/*
  Duplicate the event relay log name.  Allocation and copy now use the same
  source string; previously the buffer was sized from the group relay log
  name while the event relay log name was copied into it.
*/
2913 ptr_group->group_relay_log_name=
2914 my_strdup(rli->get_event_relay_log_name(), MYF(MY_WME));
2919 DBUG_ASSERT(ptr_group->group_relay_log_name != NULL);
2921 ret_worker->relay_log_change_notified= TRUE;
2924 if (!ret_worker->checkpoint_notified)
2927 ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);
2928 ptr_group->checkpoint_log_name=
2929 my_strdup(rli->get_group_master_log_name(), MYF(MY_WME));
2930 ptr_group->checkpoint_log_pos= rli->get_group_master_log_pos();
2931 ptr_group->checkpoint_relay_log_name=
2932 my_strdup(rli->get_group_relay_log_name(), MYF(MY_WME));
2933 ptr_group->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
2934 ptr_group->shifted= ret_worker->bitmap_shifted;
2935 ret_worker->bitmap_shifted= 0;
2936 ret_worker->checkpoint_notified= TRUE;
2938 ptr_group->checkpoint_seqno= rli->checkpoint_seqno;
2939 ptr_group->ts= when.tv_sec + (time_t) exec_time;
2940 rli->checkpoint_seqno++;
2943 free_root(&rli->mts_coor_mem_root, MYF(MY_KEEP_PREALLOC));
/*
  Body of the event scheduling routine named in the DBUG_ENTER string
  below; its signature is elided in this view.

  Visible behaviour:
    - the execution mode comes from get_mts_execution_mode; when the event
      is not to run in parallel it is applied directly through
      do_apply_event(rli);
    - for a mode other than EVENT_EXEC_ASYNC, a non-empty curr_group_da is
      reported as ER_MTS_CANT_PARALLEL, and the caller otherwise waits for
      the workers to finish before applying the event itself;
    - in the parallel case mts_group_status is set to MTS_IN_GROUP and a
      worker is chosen with get_slave_worker and stored in
      rli->last_assigned_worker.
*/
2968 DBUG_ENTER(
"LOG_EVENT:apply_event");
2969 bool parallel= FALSE;
2970 enum enum_mts_event_exec_mode actual_exec_mode= EVENT_EXEC_PARALLEL;
2971 THD *thd= rli->info_thd;
2978 bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&
2979 (get_mts_execution_mode(::server_id,
2980 rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)
2981 == EVENT_EXEC_PARALLEL);
2988 DBUG_RETURN(do_apply_event(rli));
2994 get_mts_execution_mode(::server_id,
2995 rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
2996 != EVENT_EXEC_PARALLEL))
3008 if (actual_exec_mode != EVENT_EXEC_ASYNC)
/* Deferred group events at this point indicate a malformed group. */
3015 if (rli->curr_group_da.elements > 0)
3023 llstr(rli->get_event_relay_log_pos(), llbuff);
3024 my_error(ER_MTS_CANT_PARALLEL, MYF(0),
3025 get_type_str(), rli->get_event_relay_log_name(), llbuff,
3026 "possible malformed group of events from an old master");
3029 rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
3036 if (wait_for_workers_to_finish(rli) == -1)
3039 rli->
report(WARNING_LEVEL, 0,
3040 "Slave worker thread has failed to apply an event. As a "
3041 "consequence, the coordinator thread is stopping "
3049 DBUG_ASSERT(rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP);
3053 for (uint k= 0; k < rli->curr_group_da.elements; k++)
3055 DBUG_ASSERT(!(*(Slave_worker **)
3056 dynamic_array_ptr(&rli->workers, k))->usage_partition);
3057 DBUG_ASSERT(!(*(Slave_worker **)
3058 dynamic_array_ptr(&rli->workers, k))->jobs.len);
3064 DBUG_ASSERT(actual_exec_mode == EVENT_EXEC_ASYNC);
3067 DBUG_RETURN(do_apply_event(rli));
3070 DBUG_ASSERT(actual_exec_mode == EVENT_EXEC_PARALLEL);
3071 DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
3075 (rli->curr_group_seen_begin && rli->curr_group_seen_gtid
3077 rli->last_assigned_worker ||
3084 dynamic_array_ptr(&rli->curr_group_da,
3085 rli->curr_group_da.elements - 1))->
3086 get_type_code() == BEGIN_LOAD_QUERY_EVENT);
3089 rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
3092 (rli->last_assigned_worker= get_slave_worker(rli));
3095 if (rli->last_assigned_worker)
3096 DBUG_PRINT(
"mts", (
"Assigning job to worker %lu",
3097 rli->last_assigned_worker->id));
3101 if (thd->is_error())
/* On error the deferred events are walked and the array is emptied. */
3110 for (uint k= 0; k < rli->curr_group_da.elements; k++)
3113 *(
Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
3117 rli->curr_group_da.elements= 0;
3121 DBUG_ASSERT(
worker || rli->curr_group_assigned_parts.elements == 0);
3124 DBUG_RETURN((!thd->is_error() ||
3125 DBUG_EVALUATE_IF(
"fault_injection_get_slave_worker", 1, 0)) ?
3135 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/**
  Build the one-line description of this query event and hand it to the
  protocol.

  @param protocol  receiver of the description string

  The description is assembled in temp_buf: a use clause with the quoted
  database name followed by a semicolon, then the query text.  The
  conditions that guard each part are elided in this view.  The result is
  stored with the binary character set.
*/
3146 int Query_log_event::pack_info(
Protocol *protocol)
3154 temp_buf.append(
"use ");
3155 append_identifier(this->thd, &temp_buf, db, db_len);
3156 temp_buf.append(
"; ");
3160 temp_buf.append(
query);
3162 protocol->
store(temp_buf.ptr(), temp_buf.length(), &my_charset_bin);
3167 #ifndef MYSQL_CLIENT
/**
  Append a short, length-prefixed string to a status-variable buffer.

  @param dst   in/out write cursor; advanced as bytes are written
  @param src   string bytes to copy
  @param len   number of bytes to copy; must fit in one byte (asserted)
  @param code  status-variable code that tags the string

  The visible lines store the length byte and copy the payload.  The lines
  that write the code byte and advance the cursor past the payload are
  elided in this view.
*/
3172 static void write_str_with_code_and_len(uchar **dst,
const char *src,
3173 uint len, uint
code)
3179 DBUG_ASSERT(len <= 255);
3182 *((*dst)++)= (uchar) len;
3183 bmove(*dst, src, len);
/**
  Serialize this query event into the given IO cache.

  @param file  IO cache that receives the event

  @return 1 if any of the write steps fails, 0 otherwise

  The fixed post-header goes into buf (thread id, exec time, database
  length, error code).  The status-variable block is then appended
  starting at buf + QUERY_HEADER_LEN.  Each status variable is written as
  a one-byte code followed by its payload, and most are emitted only when
  the matching member differs from its default or is flagged as
  initialised.  The final chain writes the header, post-header, status
  block, database name with its terminator, the query text and the footer.
*/
3197 bool Query_log_event::write(
IO_CACHE* file)
3199 uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS];
3200 uchar *start, *start_of_status;
3243 int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
3244 int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
3245 buf[Q_DB_LEN_OFFSET] = (char) db_len;
3246 int2store(buf + Q_ERR_CODE_OFFSET, error_code);
/* From here on start is the write cursor inside the status block. */
3253 start_of_status= start= buf+QUERY_HEADER_LEN;
3256 *start++= Q_FLAGS2_CODE;
3257 int4store(start, flags2);
3260 if (sql_mode_inited)
3262 *start++= Q_SQL_MODE_CODE;
3263 int8store(start, sql_mode);
3268 write_str_with_code_and_len(&start,
3269 catalog, catalog_len, Q_CATALOG_NZ_CODE);
/* Auto-increment settings are written only when they differ from 1/1. */
3289 if (auto_increment_increment != 1 || auto_increment_offset != 1)
3291 *start++= Q_AUTO_INCREMENT;
3292 int2store(start, auto_increment_increment);
3293 int2store(start+2, auto_increment_offset);
3298 *start++= Q_CHARSET_CODE;
3299 memcpy(start, charset, 6);
3306 write_str_with_code_and_len(&start,
3307 time_zone_str, time_zone_len, Q_TIME_ZONE_CODE);
3309 if (lc_time_names_number)
3311 DBUG_ASSERT(lc_time_names_number <= 0xFFFF);
3312 *start++= Q_LC_TIME_NAMES_CODE;
3313 int2store(start, lc_time_names_number);
3316 if (charset_database_number)
3318 DBUG_ASSERT(charset_database_number <= 0xFFFF);
3319 *start++= Q_CHARSET_DATABASE_CODE;
3320 int2store(start, charset_database_number);
3323 if (table_map_for_update)
3325 *start++= Q_TABLE_MAP_FOR_UPDATE_CODE;
3326 int8store(start, table_map_for_update);
3329 if (master_data_written != 0)
3336 *start++= Q_MASTER_DATA_WRITTEN_CODE;
3337 int4store(start, master_data_written);
/*
  Invoker: taken from the recorded invoker on a slave thread that has one,
  otherwise from the security context of the session.
*/
3341 if (thd && thd->need_binlog_invoker())
3345 memset(&user, 0,
sizeof(user));
3346 memset(&host, 0,
sizeof(host));
3348 if (thd->slave_thread && thd->has_invoker())
3351 user= thd->get_invoker_user();
3352 host= thd->get_invoker_host();
3354 else if (thd->security_ctx->priv_user)
3356 Security_context *ctx= thd->security_ctx;
3358 user.length= strlen(ctx->priv_user);
3359 user.str= ctx->priv_user;
3360 if (ctx->priv_host[0] !=
'\0')
3362 host.str= ctx->priv_host;
3363 host.length= strlen(ctx->priv_host);
3367 if (user.length > 0)
3369 *start++= Q_INVOKER;
3375 *start++= (uchar)user.length;
3376 memcpy(start, user.str, user.length);
3377 start+= user.length;
3383 *start++= (uchar)host.length;
3384 memcpy(start, host.str, host.length);
3385 start+= host.length;
/*
  Accessed database names: the count is capped at OVER_MAX_DBS_IN_EVENT_MTS,
  and a single empty name is also reported as that overflow value.  The
  names are written only when the count is not the overflow value, each
  with its terminator.
*/
3389 if (thd && thd->get_binlog_accessed_db_names() != NULL)
3392 *start++= Q_UPDATED_DB_NAMES;
3394 compile_time_assert(MAX_DBS_IN_EVENT_MTS <= OVER_MAX_DBS_IN_EVENT_MTS);
3401 (thd->get_binlog_accessed_db_names()->elements <= MAX_DBS_IN_EVENT_MTS) ?
3402 thd->get_binlog_accessed_db_names()->elements : OVER_MAX_DBS_IN_EVENT_MTS;
3404 DBUG_ASSERT(dbs != 0);
3406 if (dbs <= MAX_DBS_IN_EVENT_MTS)
3409 char *db_name= it++;
3414 if (dbs == 1 && !strcmp(db_name,
""))
3415 dbs= OVER_MAX_DBS_IN_EVENT_MTS;
3417 if (dbs != OVER_MAX_DBS_IN_EVENT_MTS)
3420 strcpy((
char*) start, db_name);
3421 start += strlen(db_name) + 1;
3422 }
while ((db_name= it++));
3430 if (thd && thd->query_start_usec_used)
3432 *start++= Q_MICROSECONDS;
3434 int3store(start, when.tv_usec);
/* The status block length is known only after all variables are written. */
3453 status_vars_len= (uint) (start-start_of_status);
3454 DBUG_ASSERT(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
3455 int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
3461 event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
3463 return (write_header(file, event_length) ||
3464 wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
3465 write_post_header_for_derived(file) ||
3466 wrapper_my_b_safe_write(file, (uchar*) start_of_status,
3467 (uint) (start-start_of_status)) ||
3468 wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)
"", db_len + 1) ||
3469 wrapper_my_b_safe_write(file, (uchar*)
query, q_len) ||
3470 write_footer(file)) ? 1 : 0;
/*
  Fragment of a constructor body whose signature is not visible in this
  view: the user and host members are zero-filled.
*/
3481 memset(&user, 0,
sizeof(user));
3482 memset(&host, 0,
sizeof(host));
/*
  Constructor that builds a query event from the state of a session; the
  first lines of its signature are elided in this view.

  Visible parameters:
    query_length          length of the query text, stored in q_len
    using_trans           selects the transactional event cache and feeds
                          the cache decision further down
    immediate             use is not visible in this view
    suppress_use          sets LOG_EVENT_SUPPRESS_USE_F in the event flags
    errcode               stored in error_code
    ignore_cmd_internals  checked before the command-specific cache
                          decision; the body of that check is elided

  Most members are captured from thd_arg: session variables (sql_mode,
  auto-increment settings, locale, character sets, time zone), catalog,
  database and thread ids.
*/
3502 ulong query_length,
bool using_trans,
3503 bool immediate,
bool suppress_use,
3504 int errcode,
bool ignore_cmd_internals)
3509 (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
3510 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
3513 data_buf(0),
query(query_arg), catalog(thd_arg->catalog),
3514 db(thd_arg->db), q_len((uint32) query_length),
3515 thread_id(thd_arg->thread_id),
3517 slave_proxy_id(thd_arg->variables.pseudo_thread_id),
3518 flags2_inited(1), sql_mode_inited(1), charset_inited(1),
3519 sql_mode(thd_arg->variables.sql_mode),
3520 auto_increment_increment(thd_arg->variables.auto_increment_increment),
3521 auto_increment_offset(thd_arg->variables.auto_increment_offset),
3522 lc_time_names_number(thd_arg->variables.lc_time_names->number),
3523 charset_database_number(0),
3524 table_map_for_update((ulonglong)thd_arg->table_map_for_update),
3525 master_data_written(0), mts_accessed_dbs(0)
3529 memset(&user, 0,
sizeof(user));
3530 memset(&host, 0,
sizeof(host));
3532 error_code= errcode;
3535 exec_time = (ulong) (end_time - thd_arg->start_time.tv_sec);
3540 catalog_len = (catalog) ? (uint32) strlen(catalog) : 0;
3542 db_len = (db) ? (uint32) strlen(db) : 0;
/* Record the database collation only when it differs from the db charset. */
3543 if (thd_arg->variables.collation_database != thd_arg->db_charset)
3544 charset_database_number= thd_arg->variables.collation_database->number;
3568 flags2= (uint32) (thd_arg->variables.option_bits &
/* The three charset numbers are packed as 2 bytes each into charset. */
3570 DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256);
3571 DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256);
3572 DBUG_ASSERT(thd_arg->variables.collation_server->number < 256*256);
3573 DBUG_ASSERT(thd_arg->variables.character_set_client->mbminlen == 1);
3574 int2store(charset, thd_arg->variables.character_set_client->number);
3575 int2store(charset+2, thd_arg->variables.collation_connection->number);
3576 int2store(charset+4, thd_arg->variables.collation_server->number);
3577 if (thd_arg->time_zone_used)
3584 time_zone_len= thd_arg->variables.time_zone->get_name()->length();
3585 time_zone_str= thd_arg->variables.time_zone->get_name()->ptr();
3659 if (ignore_cmd_internals)
/*
  Two flags are derived from the SQL command.  They record whether the
  command can produce row events and whether it must use the transactional
  cache.
*/
3665 bool cmd_can_generate_row_events= FALSE;
3669 bool cmd_must_go_to_trx_cache= FALSE;
3674 switch (lex->sql_command)
3676 case SQLCOM_DROP_TABLE:
3677 cmd_can_generate_row_events= lex->drop_temporary &&
3678 thd->in_multi_stmt_transaction_mode();
3680 case SQLCOM_CREATE_TABLE:
3681 cmd_must_go_to_trx_cache= lex->select_lex.item_list.elements &&
3682 thd->is_current_stmt_binlog_format_row();
3683 cmd_can_generate_row_events=
3684 ((lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
3685 thd->in_multi_stmt_transaction_mode()) || cmd_must_go_to_trx_cache;
3687 case SQLCOM_SET_OPTION:
3688 if (lex->autocommit)
3689 cmd_can_generate_row_events= cmd_must_go_to_trx_cache= FALSE;
3691 cmd_can_generate_row_events= TRUE;
3693 case SQLCOM_RELEASE_SAVEPOINT:
3694 case SQLCOM_ROLLBACK_TO_SAVEPOINT:
3695 case SQLCOM_SAVEPOINT:
3696 cmd_can_generate_row_events= cmd_must_go_to_trx_cache= TRUE;
3699 cmd_can_generate_row_events= sqlcom_can_generate_row_events(thd);
3704 if (cmd_can_generate_row_events)
3706 cmd_must_go_to_trx_cache= cmd_must_go_to_trx_cache || using_trans;
3708 thd->lex->is_mixed_stmt_unsafe(thd->in_multi_stmt_transaction_mode(),
3709 thd->variables.binlog_direct_non_trans_update,
3711 thd->tx_isolation) ||
3731 DBUG_PRINT(
"info",(
"Query_log_event has flags2: %lu sql_mode: %llu",
3732 (ulong) flags2, sql_mode));
/**
  Read a length-prefixed string from a status-variable block without
  copying it.

  @param src  in/out read cursor into the block
  @param end  one past the last valid byte of the block

  The remaining parameters (output pointer and output length) are elided in
  this view.  When the string would reach or pass end, the function returns
  a non-zero value computed as *src + length - end + 1.  Otherwise the
  output pointer is set to the byte after the length prefix.
*/
3764 get_str_len_and_pointer(
const Log_event::Byte **src,
3767 const Log_event::Byte *end)
3774 if (*src + length >= end)
3775 return *src + length - end + 1;
3776 *dst= (
char *)*src + 1;
/**
  Copy a string into a destination buffer and repoint the source at the
  copy.

  @param src  in/out; on return points at the copied bytes
  @param dst  in/out write cursor of the destination buffer

  The length parameter and the lines that advance the cursor after the
  copy are elided in this view.
*/
3783 static void copy_str_and_move(
const char **src,
3784 Log_event::Byte **dst,
3787 memcpy(*dst, *src, len);
3788 *src= (
const char *)*dst;
/*
  Body of the helper that maps a status-variable code to its symbolic name
  (used as code_name(...) by the CHECK_SPACE trace).  Its signature is
  elided in this view.

  Known codes return a string literal.  Any other code is formatted into
  the function-local static buffer, so that result is overwritten by the
  next call that takes the same path.
*/
3798 static char buf[255];
3800 case Q_FLAGS2_CODE:
return "Q_FLAGS2_CODE";
3801 case Q_SQL_MODE_CODE:
return "Q_SQL_MODE_CODE";
3802 case Q_CATALOG_CODE:
return "Q_CATALOG_CODE";
3803 case Q_AUTO_INCREMENT:
return "Q_AUTO_INCREMENT";
3804 case Q_CHARSET_CODE:
return "Q_CHARSET_CODE";
3805 case Q_TIME_ZONE_CODE:
return "Q_TIME_ZONE_CODE";
3806 case Q_CATALOG_NZ_CODE:
return "Q_CATALOG_NZ_CODE";
3807 case Q_LC_TIME_NAMES_CODE:
return "Q_LC_TIME_NAMES_CODE";
3808 case Q_CHARSET_DATABASE_CODE:
return "Q_CHARSET_DATABASE_CODE";
3809 case Q_TABLE_MAP_FOR_UPDATE_CODE:
return "Q_TABLE_MAP_FOR_UPDATE_CODE";
3810 case Q_MASTER_DATA_WRITTEN_CODE:
return "Q_MASTER_DATA_WRITTEN_CODE";
3811 case Q_UPDATED_DB_NAMES:
return "Q_UPDATED_DB_NAMES";
3812 case Q_MICROSECONDS:
return "Q_MICROSECONDS";
3814 sprintf(buf,
"CODE#%d", code);
/*
  CHECK_SPACE(PTR, END, CNT): bounds check used while parsing status
  variables.  It traces the code just read (pos[-1]), asserts in debug
  builds that CNT bytes starting at PTR do not pass END, and repeats the
  same test at run time.  The failure action after the trace message is
  elided in this view.
*/
3826 #define CHECK_SPACE(PTR,END,CNT) \
3828 DBUG_PRINT("info", ("Read %s", code_name(pos[-1]))); \
3829 DBUG_ASSERT((PTR) + (CNT) <= (END)); \
3830 if ((PTR) + (CNT) > (END)) { \
3831 DBUG_PRINT("info", ("query= 0")); \
/*
  Fragment of the Query_log_event constructor that decodes an event from a
  raw buffer (named in the DBUG_ENTER tag below).

  Visible steps: read the fixed post-header fields, walk the status-variable
  block between `start` and `end`, then copy catalog, time zone, user, host
  and the trailing data (which contains the query text) into one freshly
  allocated buffer (data_buf).
*/
3846 db(NullS), catalog_len(0), status_vars_len(0),
3847 flags2_inited(0), sql_mode_inited(0), charset_inited(0),
3848 auto_increment_increment(1), auto_increment_offset(1),
3849 time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
3850 table_map_for_update(0), master_data_written(0),
3851 mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS)
3855 uint8 common_header_len, post_header_len;
3856 Log_event::Byte *start;
3857 const Log_event::Byte *end;
3859 DBUG_ENTER(
"Query_log_event::Query_log_event(char*,...)");
3861 memset(&user, 0,
sizeof(user));
3862 memset(&host, 0,
sizeof(host));
3863 common_header_len= description_event->common_header_len;
3864 post_header_len= description_event->post_header_len[event_type-1];
3865 DBUG_PRINT(
"info",(
"event_len: %u common_header_len: %d post_header_len: %d",
3866 event_len, common_header_len, post_header_len));
/* The event must hold at least the common header plus the post-header. */
3873 if (event_len < (uint)(common_header_len + post_header_len))
3875 data_len = event_len - (common_header_len + post_header_len);
3876 buf+= common_header_len;
/* Fixed post-header fields, each read at its Q_*_OFFSET. */
3878 slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
3879 exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
3880 db_len = (uint)buf[Q_DB_LEN_OFFSET];
3881 error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
3888 tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
3891 status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
/*
  status_vars_len may exceed neither the remaining data nor
  MAX_SIZE_LOG_EVENT_STATUS.
*/
3898 if (status_vars_len > min<ulong>(data_len, MAX_SIZE_LOG_EVENT_STATUS))
3900 DBUG_PRINT(
"info", (
"status_vars_len (%u) > data_len (%lu); query= 0",
3901 status_vars_len, data_len));
3905 data_len-= status_vars_len;
3906 DBUG_PRINT(
"info", (
"Query_log_event has status_vars_len: %u",
3907 (uint) status_vars_len));
3919 DBUG_ASSERT(description_event->binlog_version < 4);
3920 master_data_written= data_written;
/* start/end bracket the status-variable block that follows the post-header. */
3930 start= (Log_event::Byte*) (buf+post_header_len);
3931 end= (
const Log_event::Byte*) (start+status_vars_len);
3932 for (
const Log_event::Byte* pos= start; pos < end;)
/* CHECK_SPACE(pos, end, n): n bytes must remain before end. */
3936 CHECK_SPACE(pos, end, 4);
3938 flags2= uint4korr(pos);
3939 DBUG_PRINT(
"info",(
"In Query_log_event, read flags2: %lu", (ulong) flags2));
3942 case Q_SQL_MODE_CODE:
3947 CHECK_SPACE(pos, end, 8);
3949 sql_mode= uint8korr(pos);
3950 DBUG_PRINT(
"info",(
"In Query_log_event, read sql_mode: %s",
3951 llstr(sql_mode, buff)));
3955 case Q_CATALOG_NZ_CODE:
3956 DBUG_PRINT(
"info", (
"case Q_CATALOG_NZ_CODE; pos: 0x%lx; end: 0x%lx",
3957 (ulong) pos, (ulong) end));
3958 if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
3960 DBUG_PRINT(
"info", (
"query= 0"));
3965 case Q_AUTO_INCREMENT:
3966 CHECK_SPACE(pos, end, 4);
3967 auto_increment_increment= uint2korr(pos);
3968 auto_increment_offset= uint2korr(pos+2);
3971 case Q_CHARSET_CODE:
3973 CHECK_SPACE(pos, end, 6);
3975 memcpy(charset, pos, 6);
3979 case Q_TIME_ZONE_CODE:
3981 if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
3983 DBUG_PRINT(
"info", (
"Q_TIME_ZONE_CODE: query= 0"));
3989 case Q_CATALOG_CODE:
3990 CHECK_SPACE(pos, end, 1);
3991 if ((catalog_len= *pos))
3992 catalog= (
char*) pos+1;
3993 CHECK_SPACE(pos, end, catalog_len + 2);
3994 pos+= catalog_len+2;
3997 case Q_LC_TIME_NAMES_CODE:
3998 CHECK_SPACE(pos, end, 2);
3999 lc_time_names_number= uint2korr(pos);
4002 case Q_CHARSET_DATABASE_CODE:
4003 CHECK_SPACE(pos, end, 2);
4004 charset_database_number= uint2korr(pos);
4007 case Q_TABLE_MAP_FOR_UPDATE_CODE:
4008 CHECK_SPACE(pos, end, 8);
4009 table_map_for_update= uint8korr(pos);
4012 case Q_MASTER_DATA_WRITTEN_CODE:
4013 CHECK_SPACE(pos, end, 4);
4014 data_written= master_data_written= uint4korr(pos);
4017 case Q_MICROSECONDS:
4019 CHECK_SPACE(pos, end, 3);
4020 when.tv_usec= uint3korr(pos);
/* User and host are each stored as a one-byte length followed by the bytes. */
4026 CHECK_SPACE(pos, end, 1);
4027 user.length= *pos++;
4028 CHECK_SPACE(pos, end, user.length);
4029 user.str= (
char *)pos;
4032 CHECK_SPACE(pos, end, 1);
4033 host.length= *pos++;
4034 CHECK_SPACE(pos, end, host.length);
4035 host.str= (
char *)pos;
4039 case Q_UPDATED_DB_NAMES:
4042 CHECK_SPACE(pos, end, 1);
4043 mts_accessed_dbs= *pos++;
/* More names than MAX_DBS_IN_EVENT_MTS is recorded as the OVER_MAX marker. */
4049 if (mts_accessed_dbs > MAX_DBS_IN_EVENT_MTS)
4051 mts_accessed_dbs= OVER_MAX_DBS_IN_EVENT_MTS;
4055 DBUG_ASSERT(mts_accessed_dbs != 0);
4057 for (i= 0; i < mts_accessed_dbs && pos < start + status_vars_len; i++)
4059 DBUG_EXECUTE_IF(
"query_log_event_mts_corrupt_db_names",
4061 if (mts_accessed_dbs == 2)
4063 DBUG_ASSERT(pos[
sizeof(
"d?") - 1] == 0);
4064 ((
char*) pos)[
sizeof(
"d?") - 1]=
'a';
/*
  strncpy() does not guarantee termination, hence the forced NUL on the
  last byte right after the copy.
*/
4066 strncpy(mts_accessed_db_names[i], (
char*) pos,
4067 min<ulong>(NAME_LEN, start + status_vars_len - pos));
4068 mts_accessed_db_names[
i][NAME_LEN - 1]= 0;
/* Step over the name and its terminating NUL. */
4069 pos+= 1 + strlen((
const char*) pos);
/*
  Sanity check: every announced name was consumed and pos did not run past
  the end of the status block.
*/
4071 if (i != mts_accessed_dbs || pos > start + status_vars_len)
/* Unrecognised status code: skip the remaining variables by moving pos to end. */
4077 DBUG_PRINT(
"info",(
"Query_log_event has unknown status vars (first has\
4078 code: %u), skipping the rest of them", (uint) *(pos-1)));
4079 pos= (
const uchar*) end;
/*
  A single allocation receives all the strings; its size expression is only
  partly visible in this fragment.
*/
4099 #if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
4100 if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1
4107 + QUERY_CACHE_FLAGS_SIZE,
4110 if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1
4125 if (likely(catalog_nz))
4126 copy_str_and_move(&catalog, &start, catalog_len);
4129 memcpy(start, catalog, catalog_len+1);
4130 catalog= (
const char *)start;
4131 start+= catalog_len+1;
4135 copy_str_and_move(&time_zone_str, &start, time_zone_len);
4137 if (user.length > 0)
4138 copy_str_and_move((
const char **)&(user.str), &start, user.length);
4139 if (host.length > 0)
4140 copy_str_and_move((
const char **)&(host.str), &start, host.length);
/* Copy the data_len bytes that follow the status block and NUL-terminate them. */
4150 memcpy((
char*) start, end, data_len);
4151 start[data_len]=
'\0';
/* The query text begins db_len + 1 bytes into that copy. */
4153 query= (
char *)(start + db_len + 1);
4154 q_len= data_len - db_len -1;
/* Query-cache server builds: db_len is stored (as size_t) right after the NUL. */
4159 #if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
4160 size_t db_length= (size_t)db_len;
4161 memcpy(start + data_len + 1, &db_length,
sizeof(
size_t));
/*
  Writes to `file` everything that has to precede the query text: the
  optional event header line, a "use <db>" line, SET TIMESTAMP, and a series
  of SET @@session.* statements.

  Most SET statements are emitted only when the event's value differs from
  the one remembered in print_event_info, which is then updated.

  @param file              IO_CACHE receiving the output
  @param print_event_info  printing options and the remembered session state
*/
4174 void Query_log_event::print_query_header(
IO_CACHE* file,
4175 PRINT_EVENT_INFO* print_event_info)
4178 char buff[48], *end;
4179 char quoted_id[1+ 2*FN_REFLEN+ 2];
4181 bool different_db= 1;
/* Long form only: event header plus thread id, exec time and error code. */
4184 if (!print_event_info->short_form)
4186 print_header(file, print_event_info, FALSE);
4187 my_b_printf(file,
"\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
4192 if ((flags & LOG_EVENT_SUPPRESS_USE_F))
4194 if (!is_trans_keyword())
4195 print_event_info->db[0]=
'\0';
4200 quoted_len= my_strmov_quoted_identifier(this->thd, (
char*)quoted_id, db, 0);
4202 quoted_len= my_strmov_quoted_identifier((
char*)quoted_id, db);
4204 quoted_id[quoted_len]=
'\0';
/* Print "use" only for a non-empty db that differs from the remembered one. */
4205 different_db= memcmp(print_event_info->db, db, db_len + 1);
4207 memcpy(print_event_info->db, db, db_len + 1);
4208 if (db[0] && different_db)
4209 my_b_printf(file,
"use %s%s\n", quoted_id, print_event_info->delimiter);
/* Build "SET TIMESTAMP=<sec>[.<usec>]<delimiter>" in buff, then write it out. */
4212 end=int10_to_str((
long) when.tv_sec, strmov(buff,
"SET TIMESTAMP="),10);
4214 end+= sprintf(end,
".%06d", (
int) when.tv_usec);
4215 end= strmov(end, print_event_info->delimiter);
4217 DBUG_ASSERT(end < buff +
sizeof(buff));
4218 my_b_write(file, (uchar*) buff, (uint) (end-buff));
/*
  pseudo_thread_id is emitted the first time and whenever it changes
  (part of this condition is not visible in this fragment).
*/
4219 if ((!print_event_info->thread_id_printed ||
4221 thread_id != print_event_info->thread_id)))
4224 my_b_printf(file,
"SET @@session.pseudo_thread_id=%lu%s\n",
4225 short_form ? 999999999 : (ulong)thread_id,
4226 print_event_info->delimiter);
4227 print_event_info->thread_id= thread_id;
4228 print_event_info->thread_id_printed= 1;
/* flags2: tmp is the XOR of remembered and current bits, i.e. the changed ones. */
4236 if (likely(flags2_inited))
4239 if (likely(print_event_info->flags2_inited))
4241 tmp= (print_event_info->flags2) ^ flags2;
4244 print_event_info->flags2_inited= 1;
4251 my_b_printf(file,
"SET ");
4253 "@@session.foreign_key_checks", &need_comma);
4254 print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2,
4255 "@@session.sql_auto_is_null", &need_comma);
4257 "@@session.unique_checks", &need_comma);
/* autocommit is passed ~flags2 because the flag bit means NOT autocommit. */
4258 print_set_option(file, tmp, OPTION_NOT_AUTOCOMMIT, ~flags2,
4259 "@@session.autocommit", &need_comma);
4260 my_b_printf(file,
"%s\n", print_event_info->delimiter);
4261 print_event_info->flags2= flags2;
/* sql_mode: printed when first seen or changed. */
4278 if (likely(sql_mode_inited) &&
4279 (unlikely(print_event_info->sql_mode != sql_mode ||
4280 !print_event_info->sql_mode_inited)))
4282 my_b_printf(file,
"SET @@session.sql_mode=%lu%s\n",
4283 (ulong)sql_mode, print_event_info->delimiter);
4284 print_event_info->sql_mode= sql_mode;
4285 print_event_info->sql_mode_inited= 1;
4287 if (print_event_info->auto_increment_increment != auto_increment_increment ||
4288 print_event_info->auto_increment_offset != auto_increment_offset)
4290 my_b_printf(file,
"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu%s\n",
4291 auto_increment_increment,auto_increment_offset,
4292 print_event_info->delimiter);
4293 print_event_info->auto_increment_increment= auto_increment_increment;
4294 print_event_info->auto_increment_offset= auto_increment_offset;
/*
  charset holds three 2-byte numbers (offsets 0, 2, 4), printed as
  character_set_client, collation_connection and collation_server.
*/
4299 if (likely(charset_inited) &&
4300 (unlikely(!print_event_info->charset_inited ||
4301 memcmp(print_event_info->charset, charset, 6))))
4303 char *charset_p= charset;
4304 CHARSET_INFO *cs_info= get_charset(uint2korr(charset_p), MYF(MY_WME));
4308 my_b_printf(file,
"/*!\\C %s */%s\n",
4309 cs_info->csname, print_event_info->delimiter);
4311 my_b_printf(file,
"SET "
4312 "@@session.character_set_client=%d,"
4313 "@@session.collation_connection=%d,"
4314 "@@session.collation_server=%d"
4316 uint2korr(charset_p),
4317 uint2korr(charset+2),
4318 uint2korr(charset+4),
4319 print_event_info->delimiter);
4320 memcpy(print_event_info->charset, charset, 6);
4321 print_event_info->charset_inited= 1;
/* The time zone comparison includes the terminating NUL (time_zone_len+1). */
4325 if (memcmp(print_event_info->time_zone_str,
4326 time_zone_str, time_zone_len+1))
4328 my_b_printf(file,
"SET @@session.time_zone='%s'%s\n",
4329 time_zone_str, print_event_info->delimiter);
4330 memcpy(print_event_info->time_zone_str, time_zone_str, time_zone_len+1);
4333 if (lc_time_names_number != print_event_info->lc_time_names_number)
4335 my_b_printf(file,
"SET @@session.lc_time_names=%d%s\n",
4336 lc_time_names_number, print_event_info->delimiter);
4337 print_event_info->lc_time_names_number= lc_time_names_number;
/* A zero charset_database_number is printed as DEFAULT. */
4339 if (charset_database_number != print_event_info->charset_database_number)
4341 if (charset_database_number)
4342 my_b_printf(file,
"SET @@session.collation_database=%d%s\n",
4343 charset_database_number, print_event_info->delimiter);
4345 my_b_printf(file,
"SET @@session.collation_database=DEFAULT%s\n",
4346 print_event_info->delimiter);
4347 print_event_info->charset_database_number= charset_database_number;
/*
  Prints the event into print_event_info->head_cache: first the header
  produced by print_query_header(), then the q_len bytes of query text,
  then a newline, the delimiter and another newline.

  The `file` parameter is not referenced in the visible lines.
*/
4352 void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
4354 IO_CACHE *
const head= &print_event_info->head_cache;
/* Debug hook: leave only 500 bytes of room in the cache before writing. */
4360 DBUG_EXECUTE_IF (
"simulate_file_write_error",
4361 {head->write_pos= head->write_end- 500;});
4362 print_query_header(head, print_event_info);
4363 my_b_write(head, (uchar*)
query, q_len);
4364 my_b_printf(head,
"\n%s\n", print_event_info->delimiter);
4368 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Moves the temporary tables held by each assigned partition over to `thd`
  and clears the partition's own list.

  The guard below tests for a non-MTS-worker thread or an event that starts
  or ends a group; NOTE(review): its body is not visible here, presumably an
  early return.
*/
4377 void Query_log_event::attach_temp_tables_worker(THD *thd)
4379 if (!is_mts_worker(thd) || (ends_group() ||
starts_group()))
/* OVER_MAX_DBS_IN_EVENT_MTS is handled as a single partition. */
4383 int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ?
4384 1 : mts_accessed_dbs);
4386 DBUG_ASSERT(!thd->temporary_tables);
4388 for (
int i= 0; i < parts; i++)
4390 mts_move_temp_tables_to_thd(thd,
4391 mts_assigned_partitions[i]->temporary_tables);
4392 mts_assigned_partitions[
i]->temporary_tables= NULL;
/*
  Counterpart of attach_temp_tables_worker(): hands every temporary table
  currently on `thd` back to one of the assigned partitions, chosen by
  comparing the table's database name with the event's accessed db names
  (after applying any rewrite-db rule).

  On exit thd->temporary_tables is asserted to be empty.
*/
4403 void Query_log_event::detach_temp_tables_worker(THD *thd)
4405 if (!is_mts_worker(thd))
/* OVER_MAX_DBS_IN_EVENT_MTS is handled as a single partition. */
4408 int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ?
4409 1 : mts_accessed_dbs);
/* Start from empty per-partition lists. */
4422 for (
int i= 0; i < parts; i++)
4424 mts_assigned_partitions[
i]->temporary_tables= NULL;
/* The loop advances via the value returned by mts_move_temp_table_to_entry(). */
4427 for (
TABLE *table= thd->temporary_tables; table;)
4430 char *db_name= NULL;
/* Find the partition index i whose db name corresponds to this table. */
4433 for (i= 0; i < parts; i++)
4435 db_name= mts_accessed_db_names[
i];
4437 if (!strlen(db_name))
/* Substitute the rewritten db name when a rewrite rule changes it. */
4441 if (!rpl_filter->is_rewrite_empty() && !strcmp(get_db(), db_name))
4444 const char *db_filtered= rpl_filter->get_rewrite_db(db_name, &dummy_len);
4446 if (strcmp(db_name, db_filtered))
4447 db_name= (
char*)db_filtered;
4450 if (strcmp(table->s->db.str, db_name) < 0)
4456 if (!rpl_filter->is_rewrite_empty() &&
4457 strcmp(table->s->db.str, db_name))
4464 DBUG_ASSERT(db_name && (
4465 !strcmp(table->s->db.str, db_name) ||
4468 DBUG_ASSERT(i < mts_accessed_dbs);
4471 table= mts_move_temp_table_to_entry(table, thd, mts_assigned_partitions[i]);
4474 DBUG_ASSERT(!thd->temporary_tables);
/* Each partition list is either empty or starts at a head (no prev link). */
4476 for (
int i= 0; i < parts; i++)
4478 DBUG_ASSERT(!mts_assigned_partitions[i]->temporary_tables ||
4479 !mts_assigned_partitions[i]->temporary_tables->prev);
/*
  Forwards to the three-argument do_apply_event() with this event's own
  query text and length.
  NOTE(review): the enclosing function header is not visible in this fragment.
*/
4489 return do_apply_event(rli,
query, q_len);
/*
  Inspects the SQL conditions recorded in thd's statement diagnostics area.
  ER_SLAVE_SILENT_RETRY_TRANSACTION is one of the error codes it matches.
  NOTE(review): the iteration and return statements are not visible in this
  fragment; confirm the exact result for each case against the full source.
*/
4499 static bool is_silent_error(THD* thd)
4501 DBUG_ENTER(
"is_silent_error");
4503 thd->get_stmt_da()->sql_conditions();
4507 DBUG_PRINT(
"info", (
"has condition %d %s", err->
get_sql_errno(),
4511 case ER_SLAVE_SILENT_RETRY_TRANSACTION:
/*
  Fragment of the routine that applies a query event on the slave
  (NOTE(review): the line carrying the function name is not visible;
  the parameters match the do_apply_event(rli, query, q_len) call).

  Visible steps:
    - point thd at the event's catalog/database and copy the event's
      session variables (auto_increment, sql_mode, charsets, time zone,
      locale, collation_database, invoker) into thd->variables;
    - parse and execute the statement with mysql_parse();
    - compare the resulting error with the error code stored in the event
      and report mismatches through rli->report();
    - reset per-statement state in thd.

  @param query_arg  statement text to execute
  @param q_len_arg  length of query_arg
  @return thd->is_slave_error
*/
4540 const char *query_arg, uint32 q_len_arg)
4542 int expected_error,actual_error= 0;
/* Point thd at the event's catalog and default database. */
4552 thd->catalog= catalog_len ? (
char *) catalog : (
char *)
"";
4553 set_thd_db(thd, db, db_len);
4558 load_db_opt_by_name(thd, thd->db, &db_options);
4559 if (db_options.default_table_charset)
4560 thd->db_charset= db_options.default_table_charset;
4561 thd->variables.auto_increment_increment= auto_increment_increment;
4562 thd->variables.auto_increment_offset= auto_increment_offset;
4574 const_cast<Relay_log_info*
>(rli)->set_future_group_master_log_pos(log_pos);
4575 DBUG_PRINT(
"info", (
"log_pos: %lu", (ulong) log_pos));
4581 clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
/*
  A COMMIT arriving while rli still has tables to lock: run the rows-event
  statement cleanup first.
*/
4582 if (strcmp(
"COMMIT",
query) == 0 && rli->tables_to_lock != NULL)
4591 if ((error= rows_event_stmt_cleanup(const_cast<Relay_log_info*>(rli), thd)))
4594 "Error in cleaning up after an event preceeding the commit; "
4595 "the group log file/position: %s %s",
4596 const_cast<Relay_log_info*>(rli)->get_group_master_log_name(),
4597 llstr(const_cast<Relay_log_info*>(rli)->get_group_master_log_pos(),
4611 const_cast<Relay_log_info*
>(rli)->slave_close_thread_tables(thd);
/*
  The statement is processed only for transaction keywords or when the
  replication filter accepts the current database.
*/
4624 if (is_trans_keyword() || rpl_filter->db_ok(thd->db))
4626 thd->set_time(&when);
4627 thd->set_query_and_id((
char*)query_arg, q_len_arg,
4628 thd->charset(), next_query_id());
4629 thd->variables.pseudo_thread_id= thread_id;
4630 attach_temp_tables_worker(thd);
4631 DBUG_PRINT(
"query",(
"%s", thd->query()));
4633 if (ignored_error_code((expected_error= error_code)) ||
4634 !unexpected_error_code(expected_error))
/* Take sql_mode from the event but keep the local MODE_NO_DIR_IN_CREATE bit. */
4655 if (sql_mode_inited)
4656 thd->variables.sql_mode=
4657 (sql_mode_t) ((thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE) |
4658 (sql_mode & ~(ulonglong) MODE_NO_DIR_IN_CREATE));
/*
  Resolve the three charset numbers of the event; if any lookup fails,
  fall back to the slave thread defaults and go to the error comparison.
*/
4661 if (rli->cached_charset_compare(charset))
4663 char *charset_p= charset;
4665 if (!(thd->variables.character_set_client=
4666 get_charset(uint2korr(charset_p), MYF(MY_WME))) ||
4667 !(thd->variables.collation_connection=
4668 get_charset(uint2korr(charset+2), MYF(MY_WME))) ||
4669 !(thd->variables.collation_server=
4670 get_charset(uint2korr(charset+4), MYF(MY_WME))))
4678 set_slave_thread_default_charset(thd, rli);
4679 goto compare_errors;
4681 thd->update_charset();
4693 thd->set_query((
char*) query_arg, q_len_arg, thd->charset());
/* Unknown time zone: raise an error and restore the global time zone. */
4698 String tmp(time_zone_str, time_zone_len, &my_charset_bin);
4699 if (!(thd->variables.time_zone= my_tz_find(thd, &tmp)))
4701 my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
4702 thd->variables.time_zone= global_system_variables.time_zone;
4703 goto compare_errors;
/* Unknown locale number: raise an error and fall back to en_US. */
4706 if (lc_time_names_number)
4708 if (!(thd->variables.lc_time_names=
4709 my_locale_by_number(lc_time_names_number)))
4711 my_printf_error(ER_UNKNOWN_ERROR,
4712 "Unknown locale: '%d'", MYF(0), lc_time_names_number);
4713 thd->variables.lc_time_names= &my_locale_en_US;
4714 goto compare_errors;
4718 thd->variables.lc_time_names= &my_locale_en_US;
/* collation_database: the event's value if set, otherwise the db charset. */
4719 if (charset_database_number)
4722 if (!(cs= get_charset(charset_database_number, MYF(0))))
4725 int10_to_str((
int) charset_database_number, buf, -10);
4726 my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
4727 goto compare_errors;
4729 thd->variables.collation_database= cs;
4732 thd->variables.collation_database= thd->db_charset;
4734 thd->table_map_for_update= (table_map)table_map_for_update;
4735 thd->set_invoker(&user, &host);
4743 if (expected_error &&
4744 (ignored_error_code(expected_error) ||
4745 concurrency_error_code(expected_error)))
/* Parse and execute the statement. */
4750 Parser_state parser_state;
4751 if (!parser_state.init(thd, thd->query(), thd->query_length()))
4753 thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state,
4754 stmt_info_rpl.m_key,
4755 thd->db, thd->db_length,
4757 THD_STAGE_INFO(thd, stage_init);
4758 MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length());
4760 mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
4762 thd->update_server_status();
4778 thd->enable_slow_log= opt_log_slow_slave_statements;
4789 if (mysql_test_parse_for_slave(thd, thd->query(), thd->query_length()))
4790 clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
4793 rli->
report(ERROR_LEVEL, expected_error,
4795 Query partially completed on the master (error on master: %d) \
4796 and was aborted. There is a chance that your master is inconsistent at this \
4797 point. If you are sure that your master is ok, run this query manually on the \
4798 slave and then restart the slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; \
4799 START SLAVE; . Query: '%s'", expected_error, thd->query());
4800 thd->is_slave_error= 1;
4806 if (!thd->is_error() || thd->get_stmt_da()->sql_errno() != ER_SLAVE_IGNORED_TABLE)
/* General log gets the raw query, or the rewritten one when it exists. */
4815 if (opt_log_raw || thd->rewritten_query.length() == 0)
4816 general_log_write(thd, COM_QUERY, thd->query(), thd->query_length());
4818 general_log_write(thd, COM_QUERY, thd->rewritten_query.c_ptr_safe(),
4819 thd->rewritten_query.length());
4830 if (thd->lex->sql_command == SQLCOM_DROP_TABLE && thd->lex->drop_temporary &&
4831 thd->is_error() && thd->get_stmt_da()->sql_errno() == ER_BAD_TABLE_ERROR &&
4833 thd->get_stmt_da()->reset_diagnostics_area();
/* Compare what happened here with the error code recorded in the event. */
4838 actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
4839 DBUG_PRINT(
"info",(
"expected_error: %d sql_errno: %d",
4840 expected_error, actual_error));
4842 if ((expected_error && expected_error != actual_error &&
4843 !concurrency_error_code(expected_error)) &&
4844 !ignored_error_code(actual_error) &&
4845 !ignored_error_code(expected_error))
4847 rli->
report(ERROR_LEVEL, 0,
4849 Query caused different errors on master and slave. \
4850 Error on master: message (format)='%s' error code=%d ; \
4851 Error on slave: actual message='%s', error code=%d. \
4852 Default database: '%s'. Query: '%s'",
4853 ER_SAFE(expected_error),
4855 actual_error ? thd->get_stmt_da()->message() :
"no error",
4857 print_slave_db_safe(db), query_arg);
4858 thd->is_slave_error= 1;
/* Same error as on the master, or an ignorable one: clear it and carry on. */
4864 else if ((expected_error == actual_error &&
4865 !concurrency_error_code(expected_error)) ||
4866 ignored_error_code(actual_error))
4868 DBUG_PRINT(
"info",(
"error ignored"));
4869 if (log_warnings > 1 && ignored_error_code(actual_error))
4871 rli->
report(WARNING_LEVEL, actual_error,
4872 "Could not execute %s event. Detailed error: %s;",
4875 clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
4876 thd->killed= THD::NOT_KILLED;
/* Any other slave/fatal error is reported unless is_silent_error() says otherwise. */
4881 else if (thd->is_slave_error || thd->is_fatal_error)
4883 if (!is_silent_error(thd))
4885 rli->
report(ERROR_LEVEL, actual_error,
4886 "Error '%s' on query. Default database: '%s'. Query: '%s'",
4887 (actual_error ? thd->get_stmt_da()->message() :
4888 "unexpected success or fatal error"),
4889 print_slave_db_safe(thd->db), query_arg);
4891 thd->is_slave_error= 1;
4927 DBUG_EXECUTE_IF(
"stop_slave_middle_group",
4928 if (strcmp(
"COMMIT",
query) != 0 &&
4929 strcmp(
"BEGIN",
query) != 0)
4931 if (thd->transaction.all.cannot_safely_rollback())
4932 const_cast<Relay_log_info*>(rli)->abort_slave= 1;
/* Hand any temporary tables still on thd back to the partitions. */
4938 if (thd->temporary_tables)
4939 detach_temp_tables_worker(thd);
/* Reset per-statement state in thd before returning. */
4951 thd->set_db(NULL, 0);
4953 thd->lex->sql_command= SQLCOM_END;
4954 DBUG_PRINT(
"info", (
"end: query= 0"));
4957 MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
4958 thd->m_statement_psi= NULL;
4967 thd->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
4968 thd->first_successful_insert_id_in_prev_stmt= 0;
4969 thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
4970 free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
4971 return thd->is_slave_error;
/*
  Position update after the event was applied.
  NOTE(review): the function header is not visible in this fragment.

  With thd->one_shot_set the event relay log position is incremented via
  rli->inc_event_relay_log_pos(); Log_event::do_update_pos() supplies `ret`
  (NOTE(review): presumably the else branch -- confirm).
*/
4982 if (thd->one_shot_set)
4984 rli->inc_event_relay_log_pos();
4987 ret= Log_event::do_update_pos(rli);
/* Debug hook that fires for a COMMIT query after the position update. */
4989 DBUG_EXECUTE_IF(
"crash_after_commit_and_update_pos",
4990 if (!strcmp(
"COMMIT",
query))
4992 sql_print_information(
"Crashing crash_after_commit_and_update_pos.");
/*
  Skip decision for a query event (function named in the DBUG_ENTER tag).

  While rli->slave_skip_counter is positive, BEGIN sets OPTION_BEGIN in the
  thread's option bits and continues the group; COMMIT and ROLLBACK clear
  OPTION_BEGIN. Otherwise the decision is delegated to
  Log_event::do_shall_skip().
*/
5006 DBUG_ENTER(
"Query_log_event::do_shall_skip");
5007 DBUG_PRINT(
"debug", (
"query: %s; q_len: %d",
query, q_len));
5008 DBUG_ASSERT(
query && q_len > 0);
5010 if (rli->slave_skip_counter > 0)
5012 if (strcmp(
"BEGIN",
query) == 0)
5014 thd->variables.option_bits|= OPTION_BEGIN;
5015 DBUG_RETURN(Log_event::continue_group(rli));
5018 if (strcmp(
"COMMIT",
query) == 0 || strcmp(
"ROLLBACK",
query) == 0)
5020 thd->variables.option_bits&= ~OPTION_BEGIN;
5024 DBUG_RETURN(Log_event::do_shall_skip(rli));
5034 #ifndef MYSQL_CLIENT
/*
  Default constructor (server builds only, see the enclosing
  #ifndef MYSQL_CLIENT): creation time 0, current BINLOG_VERSION, and
  ST_SERVER_VER_LEN bytes of the global ::server_version copied into the
  event's server_version.
*/
5035 Start_log_event_v3::Start_log_event_v3()
5036 :
Log_event(), created(0), binlog_version(BINLOG_VERSION),
5039 memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
5047 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Formats "Server ver: <server_version>, Binlog ver: <binlog_version>" into
  a local buffer and stores it in `protocol`.

  The buffer size is the sum of the two literal prefixes (12 and 14 bytes),
  ST_SERVER_VER_LEN and 22 bytes for the number.
*/
5048 int Start_log_event_v3::pack_info(
Protocol *protocol)
5050 char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
5051 pos= strmov(buf,
"Server ver: ");
5052 pos= strmov(pos, server_version);
5053 pos= strmov(pos,
", Binlog ver: ");
5054 pos= int10_to_str(binlog_version, pos, 10);
5055 protocol->
store(buf, (uint) (pos-buf), &my_charset_bin);
/*
  Prints the start event into print_event_info->head_cache.

  Long form: a header line with binlog and server version, the creation
  timestamp, and a warning when LOG_EVENT_BINLOG_IN_USE_F is set.
  For a real (non-artificial) event with a creation time a ROLLBACK is
  printed (or RESET CONNECTION under the compile-time switch below).
  Depending on base64_output_mode the event is also printed base64-encoded
  and printed_fd_event is set.
*/
5066 void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
5068 DBUG_ENTER(
"Start_log_event_v3::print");
5070 IO_CACHE *
const head= &print_event_info->head_cache;
5072 if (!print_event_info->short_form)
5074 print_header(head, print_event_info, FALSE);
5075 my_b_printf(head,
"\tStart: binlog v %d, server v %s created ",
5076 binlog_version, server_version);
5077 print_timestamp(head, NULL);
5079 my_b_printf(head,
" at startup");
5080 my_b_printf(head,
"\n");
5081 if (flags & LOG_EVENT_BINLOG_IN_USE_F)
5082 my_b_printf(head,
"# Warning: this binlog is either in use or was not "
5083 "closed properly.\n");
5085 if (!is_artificial_event() && created)
5087 #ifdef WHEN_WE_HAVE_THE_RESET_CONNECTION_SQL_COMMAND
5094 my_b_printf(head,
"RESET CONNECTION%s\n", print_event_info->delimiter);
5096 my_b_printf(head,
"ROLLBACK%s\n", print_event_info->delimiter);
5100 print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
5101 !print_event_info->short_form)
/* The "BINLOG '" prefix is omitted in DECODE_ROWS mode. */
5103 if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS)
5104 my_b_printf(head,
"BINLOG '\n");
5105 print_base64(head, print_event_info, FALSE);
5106 print_event_info->printed_fd_event= TRUE;
/*
  Constructor that decodes a start event from a raw buffer: skips the
  common header, then reads binlog version, server version string and
  creation time at their ST_*_OFFSET positions.
*/
5116 Start_log_event_v3::Start_log_event_v3(
const char* buf,
5121 buf+= description_event->common_header_len;
5122 binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
5123 memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
/* Force termination in case the copied bytes contain no NUL. */
5126 server_version[ST_SERVER_VER_LEN-1]= 0;
5127 created= uint4korr(buf+ST_CREATED_OFFSET);
/* An event read from a buffer keeps its decoded creation time when rewritten. */
5128 dont_set_created= 1;
5136 #ifndef MYSQL_CLIENT
/*
  Serialises the event to `file`: header, a START_V3_HEADER_LEN-byte body
  (binlog version, server version, creation time) and footer.

  `created` is refreshed with get_time() unless dont_set_created is set.

  @return result of the ||-chain of the three write calls (true as soon as
          one of them returns true).
*/
5137 bool Start_log_event_v3::write(
IO_CACHE* file)
5139 char buff[START_V3_HEADER_LEN];
5140 int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
5141 memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
5142 if (!dont_set_created)
5143 created= get_time();
5144 int4store(buff + ST_CREATED_OFFSET,created);
5145 return (write_header(file,
sizeof(buff)) ||
5146 wrapper_my_b_safe_write(file, (uchar*) buff,
sizeof(buff)) ||
5147 write_footer(file));
5152 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Applies a start event on the slave, branching on binlog_version.
  The visible branches close the thread's temporary tables (one of them also
  calls cleanup_load_tmpdir()); another walks thd->temporary_tables.
  NOTE(review): case labels and several conditions are not visible in this
  fragment; confirm which binlog versions take which branch.
*/
5172 int Start_log_event_v3::do_apply_event(
Relay_log_info const *rli)
5174 DBUG_ENTER(
"Start_log_event_v3::do_apply_event");
5176 switch (binlog_version)
5188 error= close_temporary_tables(thd);
5189 cleanup_load_tmpdir();
5199 for (table= thd->temporary_tables; table; table= table->next)
5210 "3.23.57",7) >= 0 && created)
5216 error= close_temporary_tables(thd);
/*
  Builds a format description from a binlog version number and an optional
  server version string.
  NOTE(review): the function header is not visible in this fragment;
  presumably the Format_description_log_event(binlog_ver, server_ver)
  constructor.

  It fills common_header_len, number_of_event_types and the post_header_len
  table (one byte per event type, indexed by type code - 1). Two table
  layouts are visible: the current one with LOG_EVENT_TYPES entries plus
  room for the checksum algorithm descriptor, and an older one with
  FORMAT_DESCRIPTION_EVENT - 1 entries.
*/
5257 binlog_version= binlog_ver;
5258 switch (binlog_ver) {
5260 memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
5261 DBUG_EXECUTE_IF(
"pretend_version_50034_in_binlog",
5262 strmov(server_version,
"5.0.34"););
5263 common_header_len= LOG_EVENT_HEADER_LEN;
5264 number_of_event_types= LOG_EVENT_TYPES;
5266 post_header_len=(uint8*) my_malloc(number_of_event_types*
sizeof(uint8)
5267 + BINLOG_CHECKSUM_ALG_DESC_LEN,
5274 if (post_header_len)
/*
  Pre-fill with 255 so the debug loop at the end of this branch can assert
  that every entry was assigned explicitly.
*/
5279 memset(post_header_len, 255, number_of_event_types*
sizeof(uint8));
5283 post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
5284 post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
5285 post_header_len[STOP_EVENT-1]= STOP_HEADER_LEN;
5286 post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
5287 post_header_len[INTVAR_EVENT-1]= INTVAR_HEADER_LEN;
5288 post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
5289 post_header_len[SLAVE_EVENT-1]= 0;
5290 post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
5291 post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
5292 post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
5293 post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
5294 post_header_len[NEW_LOAD_EVENT-1]= NEW_LOAD_HEADER_LEN;
5295 post_header_len[RAND_EVENT-1]= RAND_HEADER_LEN;
5296 post_header_len[USER_VAR_EVENT-1]= USER_VAR_HEADER_LEN;
5297 post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
5298 post_header_len[XID_EVENT-1]= XID_HEADER_LEN;
5299 post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= BEGIN_LOAD_QUERY_HEADER_LEN;
5300 post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
5307 post_header_len[PRE_GA_WRITE_ROWS_EVENT-1] = 0;
5308 post_header_len[PRE_GA_UPDATE_ROWS_EVENT-1] = 0;
5309 post_header_len[PRE_GA_DELETE_ROWS_EVENT-1] = 0;
5311 post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
5312 post_header_len[WRITE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
5313 post_header_len[UPDATE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
5314 post_header_len[DELETE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
/* Debug hook: force a post-header length of 6 for the table map and V1 row events. */
5325 DBUG_EXECUTE_IF(
"old_row_based_repl_4_byte_map_id_master",
5326 post_header_len[TABLE_MAP_EVENT-1]=
5327 post_header_len[WRITE_ROWS_EVENT_V1-1]=
5328 post_header_len[UPDATE_ROWS_EVENT_V1-1]=
5329 post_header_len[DELETE_ROWS_EVENT_V1-1]= 6;);
5330 post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
5331 post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
5332 post_header_len[IGNORABLE_LOG_EVENT-1]= IGNORABLE_HEADER_LEN;
5333 post_header_len[ROWS_QUERY_LOG_EVENT-1]= IGNORABLE_HEADER_LEN;
5334 post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
5335 post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
5336 post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
5337 post_header_len[GTID_LOG_EVENT-1]=
5338 post_header_len[ANONYMOUS_GTID_LOG_EVENT-1]=
5340 post_header_len[PREVIOUS_GTIDS_LOG_EVENT-1]= IGNORABLE_HEADER_LEN;
/* Debug-only completeness check against the 255 pre-fill. */
5344 for (i=0; i<number_of_event_types; i++)
5345 DBUG_ASSERT(post_header_len[i] != 255);
/* Older formats: default server version "3.23" or "4.0" unless one was given. */
5356 strmov(server_version, server_ver ? server_ver :
"3.23");
5358 strmov(server_version, server_ver ? server_ver :
"4.0");
5359 common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
5360 LOG_EVENT_MINIMAL_HEADER_LEN;
5368 number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
5369 post_header_len=(uint8*) my_malloc(number_of_event_types*
sizeof(uint8),
5371 if (post_header_len)
5373 post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
5374 post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
5375 post_header_len[STOP_EVENT-1]= 0;
5376 post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
5377 post_header_len[INTVAR_EVENT-1]= 0;
5378 post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
5379 post_header_len[SLAVE_EVENT-1]= 0;
5380 post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
5381 post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
5382 post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
5383 post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
5384 post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
5385 post_header_len[RAND_EVENT-1]= 0;
5386 post_header_len[USER_VAR_EVENT-1]= 0;
5394 checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF;
/*
  Constructor that decodes a format description event from a raw buffer
  (named in the DBUG_ENTER tag).

  Reads common_header_len, derives number_of_event_types from the event
  length, duplicates the post-header length table, extracts the checksum
  algorithm descriptor that follows the table, and finally remaps the table
  for a small set of server versions whose event numbering differs.
*/
5425 DBUG_ENTER(
"Format_description_log_event::Format_description_log_event(char*,...)");
5426 buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
5427 if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
/* Everything after the common-header-length byte is counted as table entries. */
5429 number_of_event_types=
5430 event_len - (LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET + 1);
5431 DBUG_PRINT(
"info", (
"common_header_len=%d number_of_event_types=%d",
5432 common_header_len, number_of_event_types));
5435 post_header_len= (uint8*) my_memdup((uchar*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
5436 number_of_event_types*
5437 sizeof(*post_header_len),
/*
  The trailing BINLOG_CHECKSUM_ALG_DESC_LEN byte(s) are the checksum
  descriptor, not an event type; the descriptor is then read from the slot
  just past the last type.
*/
5443 number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN;
5449 DBUG_ASSERT(ver_calc != checksum_version_product ||
5450 number_of_event_types == LOG_EVENT_TYPES);
5451 checksum_alg= post_header_len[number_of_event_types];
5455 checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF;
/*
  Special case: server versions of the form 5.1.x / 5.2.x carrying
  "-a_drop" at offset 5 of the version string (with specific digits at
  offsets 4 and 12). For these the event type codes are remapped through
  `perm`.
*/
5511 if (post_header_len &&
5512 server_version[0] ==
'5' && server_version[1] ==
'.' &&
5513 server_version[3] ==
'.' &&
5514 strncmp(server_version + 5,
"-a_drop", 7) == 0 &&
5515 ((server_version[2] ==
'1' &&
5516 server_version[4] >=
'1' && server_version[4] <=
'5' &&
5517 server_version[12] ==
'5') ||
5518 (server_version[2] ==
'1' &&
5519 server_version[4] ==
'4' &&
5520 server_version[12] ==
'6') ||
5521 (server_version[2] ==
'2' &&
5522 server_version[4] >=
'0' && server_version[4] <=
'2' &&
5523 server_version[12] ==
'6')))
/* These versions must announce exactly 22 types; otherwise the table is dropped. */
5525 if (number_of_event_types != 22)
5527 DBUG_PRINT(
"info", (
" number_of_event_types=%d",
5528 number_of_event_types));
5530 my_free(post_header_len);
5531 post_header_len= NULL;
5534 static const uint8 perm[23]=
5536 UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
5537 INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
5538 APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
5540 RAND_EVENT, USER_VAR_EVENT,
5541 FORMAT_DESCRIPTION_EVENT,
5543 PRE_GA_WRITE_ROWS_EVENT,
5544 PRE_GA_UPDATE_ROWS_EVENT,
5545 PRE_GA_DELETE_ROWS_EVENT,
5547 BEGIN_LOAD_QUERY_EVENT,
5548 EXECUTE_LOAD_QUERY_EVENT,
5550 event_type_permutation= perm;
/*
  Reorder the post-header lengths to follow the permuted type codes, using
  a temporary copy so no entry is overwritten before it is read.
*/
5555 uint8 post_header_len_temp[23];
5556 for (
int i= 1; i < 23; i++)
5557 post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
5558 for (
int i= 0; i < 22; i++)
5559 post_header_len[i] = post_header_len_temp[i];
5564 #ifndef MYSQL_CLIENT
/*
  Serialises the format description event to `file`.

  The body consists of FORMAT_DESCRIPTION_HEADER_LEN bytes (binlog version,
  server version, creation time, common header length, post-header length
  table) followed by the one-byte checksum algorithm descriptor.

  `created` is refreshed with get_time() unless dont_set_created is set.

  When checksum_alg is BINLOG_CHECKSUM_ALG_OFF it is switched to CRC32 for
  the duration of the write and set back to OFF afterwards (the guard around
  the restore is not visible in this fragment).

  @param file  IO_CACHE to write to
*/
5565 bool Format_description_log_event::write(
IO_CACHE* file)
5573 uchar buff[FORMAT_DESCRIPTION_HEADER_LEN + BINLOG_CHECKSUM_ALG_DESC_LEN];
5574 size_t rec_size=
sizeof(buff);
5575 int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
5576 memcpy((
char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
5577 if (!dont_set_created)
5578 created= get_time();
5579 int4store(buff + ST_CREATED_OFFSET,created);
5580 buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
5581 memcpy((
char*) buff+ST_COMMON_HEADER_LEN_OFFSET + 1, (uchar*) post_header_len,
/*
  The descriptor is stored below as a single byte, so its declared length
  must be exactly 1. The assertion is on the comparison itself: wrapping it
  in sizeof() would always yield a non-zero value and never fail.
*/
5591 compile_time_assert(BINLOG_CHECKSUM_ALG_DESC_LEN == 1);
5595 buff[FORMAT_DESCRIPTION_HEADER_LEN]= need_checksum() ?
5596 checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF;
/* Temporarily switch OFF to CRC32 for the write itself. */
5610 if ((no_checksum= (checksum_alg == BINLOG_CHECKSUM_ALG_OFF)))
5612 checksum_alg= BINLOG_CHECKSUM_ALG_CRC32;
5614 ret= (write_header(file, rec_size) ||
5615 wrapper_my_b_safe_write(file, buff, rec_size) ||
5616 write_footer(file));
5618 checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
5623 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Applies a format description event on the slave.

  A real (non-artificial) event with a creation time that arrives while the
  thread still has an open transaction produces an informational report about
  rolling back the unfinished transaction. The event is then handled by
  Start_log_event_v3::do_apply_event() and installed as rli's description
  event.
  NOTE(review): the conditions guarding the last two steps are only partly
  visible in this fragment.
*/
5624 int Format_description_log_event::do_apply_event(
Relay_log_info const *rli)
5627 DBUG_ENTER(
"Format_description_log_event::do_apply_event");
5640 if (!is_artificial_event() && created && thd->transaction.all.ha_list)
5643 rli->
report(INFORMATION_LEVEL, 0,
5644 "Rolling back unfinished transaction (no COMMIT "
5645 "or ROLLBACK in relay log). A probable cause is that "
5646 "the master died while writing the transaction to "
5647 "its binary log, thus rolled back too.");
/* Events carrying this server's own server_id are treated differently. */
5656 if (server_id != (uint32) ::server_id)
5667 ret= Start_log_event_v3::do_apply_event(rli);
5673 const_cast<Relay_log_info *
>(rli)->set_rli_description_event(
this);
/*
  Position update for a format description event. When the event carries
  this server's own server_id, the event relay log position is incremented
  via rli->inc_event_relay_log_pos(); the generic Log_event::do_update_pos()
  is the other visible path.
  NOTE(review): the branch structure between the two is not visible here.
*/
5679 int Format_description_log_event::do_update_pos(
Relay_log_info *rli)
5681 if (server_id == (uint32) ::server_id)
5696 rli->inc_event_relay_log_pos();
5701 return Log_event::do_update_pos(rli);
/*
  Debug trace of the server version string and the three numeric components
  stored in server_version_split.
  NOTE(review): the enclosing function header and the code that fills
  server_version_split are not visible in this fragment.
*/
5722 DBUG_PRINT(
"info",(
"Format_description_log_event::server_version_split:"
5723 " '%s' %d %d %d", server_version,
5724 server_version_split[0],
5725 server_version_split[1], server_version_split[2]));
/*
  Returns version_product() applied to this event's server_version_split.
  NOTE(review): the enclosing function header is not visible in this fragment.
*/
5734 return version_product(server_version_split);
/*
  Determines the checksum algorithm of a raw format description event
  (function named in the DBUG_ENTER tag).

  Copies the server version string out of the buffer, and returns
  BINLOG_CHECKSUM_ALG_UNDEF when the version product is lower than
  checksum_version_product. The result is asserted to be OFF, UNDEF or CRC32.
  NOTE(review): the code that fills version_split and the other arm of the
  conditional are not visible in this fragment.
*/
5758 char version[ST_SERVER_VER_LEN];
5759 uchar version_split[3];
5761 DBUG_ENTER(
"get_checksum_alg");
5762 DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
/* The common header length stored in the event is used as the offset of the body. */
5764 memcpy(version, buf +
5765 buf[LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET]
5766 + ST_SERVER_VER_OFFSET, ST_SERVER_VER_LEN);
5767 version[ST_SERVER_VER_LEN - 1]= 0;
5770 ret= (version_product(version_split) < checksum_version_product) ?
5771 (uint8) BINLOG_CHECKSUM_ALG_UNDEF :
5773 DBUG_ASSERT(ret == BINLOG_CHECKSUM_ALG_OFF ||
5774 ret == BINLOG_CHECKSUM_ALG_UNDEF ||
5775 ret == BINLOG_CHECKSUM_ALG_CRC32);
5797 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Computes a buffer size for the textual LOAD DATA statement as a sum of
  per-clause terms: a constant per clause plus the variable part. The file
  name counts once, the table name twice, each separator string four times,
  and the field list as (num_fields-1)*2 + field_block_len.
  NOTE(review): some terms of the sum are not visible in this fragment.
*/
5798 uint Load_log_event::get_query_buffer_length()
5803 18 + fname_len + 2 +
5807 13 + table_name_len*2 +
5808 21 + sql_ex.field_term_len*4 + 2 +
5809 23 + sql_ex.enclosed_len*4 + 2 +
5810 12 + sql_ex.escaped_len*4 + 2 +
5811 21 + sql_ex.line_term_len*4 + 2 +
5812 19 + sql_ex.line_start_len*4 + 2 +
5814 3 + (num_fields-1)*2 + field_block_len;
/*
  Renders this event as LOAD DATA statement text into buf, appending each
  clause through the moving cursor pos.
  @param need_db   when true (and db is non-empty) the text starts with "use <db>; ".
  @param cs        character-set name copied after " CHARACTER SET ".
  @param buf       destination buffer; callers size it with get_query_buffer_length().
  @param end, fn_start, fn_end  output pointers (presumably the end of the text
                   and the bounds of the file name -- TODO confirm).
*/
5818 void Load_log_event::print_query(
bool need_db,
const char *cs,
char *buf,
5819 char **end,
char **fn_start,
char **fn_end)
/* Scratch space for one quoted identifier. */
5821 char quoted_id[1 + NAME_LEN * 2 + 2];
5822 int quoted_id_len= 0;
/* Optional "use <db>; " prefix. */
5825 if (need_db && db && db_len)
5827 pos= strmov(pos,
"use ");
/* Two alternative quoting calls follow (presumably selected by a preprocessor condition -- TODO confirm). */
5829 quoted_id_len= my_strmov_quoted_identifier(this->thd, (
char *) quoted_id,
5832 quoted_id_len= my_strmov_quoted_identifier((
char *) quoted_id, db);
5834 quoted_id[quoted_id_len]=
'\0';
5835 pos= strmov(pos, quoted_id);
5836 pos= strmov(pos,
"; ");
5839 pos= strmov(pos,
"LOAD DATA ");
5842 pos= strmov(pos,
"CONCURRENT ");
5847 if (check_fname_outside_temp_buf())
5848 pos= strmov(pos,
"LOCAL ");
5849 pos= strmov(pos,
"INFILE '");
/* The file name is copied by explicit length, then the closing quote is appended after it. */
5850 memcpy(pos, fname, fname_len);
5851 pos= strmov(pos+fname_len,
"' ");
/* REPLACE takes precedence over IGNORE when both flags are set. */
5853 if (sql_ex.opt_flags & REPLACE_FLAG)
5854 pos= strmov(pos,
"REPLACE ");
5855 else if (sql_ex.opt_flags & IGNORE_FLAG)
5856 pos= strmov(pos,
"IGNORE ");
5858 pos= strmov(pos ,
"INTO");
5863 pos= strmov(pos ,
" TABLE ");
5865 pos+= table_name_len;
5869 pos= strmov(pos ,
" CHARACTER SET ");
5870 pos= strmov(pos , cs);
/* Delimiter clauses; each value is rendered by pretty_print_str(). */
5874 pos= strmov(pos,
" FIELDS TERMINATED BY ");
5875 pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
5876 if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
5877 pos= strmov(pos,
" OPTIONALLY ");
5878 pos= strmov(pos,
" ENCLOSED BY ");
5879 pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
5881 pos= strmov(pos,
" ESCAPED BY ");
5882 pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
5884 pos= strmov(pos,
" LINES TERMINATED BY ");
5885 pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
/* STARTING BY is emitted only for a non-zero line_start length. */
5886 if (sql_ex.line_start_len)
5888 pos= strmov(pos,
" STARTING BY ");
5889 pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
/* skip_lines is compared after a cast to signed long. */
5892 if ((
long) skip_lines > 0)
5894 pos= strmov(pos,
" IGNORE ");
5895 pos= longlong10_to_str((longlong) skip_lines, pos, 10);
5896 pos= strmov(pos,
" LINES ");
/* Column list: iterate over the packed field names. */
5902 const char *field= fields;
5903 pos= strmov(pos,
" (");
5904 for (i = 0; i < num_fields; i++)
5911 quoted_id_len= my_strmov_quoted_identifier(this->thd, quoted_id, field,
/* NOTE(review): copies quoted_id_len-1 bytes, one fewer than the returned length -- confirm this is intended. */
5913 memcpy(pos, quoted_id, quoted_id_len-1);
/*
  Renders the event as LOAD DATA text and stores it in the protocol row.
  The text is produced by print_query() into a heap buffer sized by
  get_query_buffer_length().
*/
5922 int Load_log_event::pack_info(
Protocol *protocol)
/* A NULL result from my_malloc() is detected here. */
5926 if (!(buf= (
char*) my_malloc(get_query_buffer_length(), MYF(MY_WME))))
/* need_db=TRUE, cs=NULL, and null fn_start/fn_end pointers. */
5928 print_query(TRUE, NULL, buf, &end, 0, 0);
/* The stored length is the distance from buf to the end pointer set by print_query(). */
5929 protocol->
store(buf, end-buf, &my_charset_bin);
5936 #ifndef MYSQL_CLIENT
/*
  Serialises the fixed-size LOAD header into a LOAD_HEADER_LEN byte buffer and
  writes it to file.
  @return true when my_b_safe_write() returns non-zero.
*/
5942 bool Load_log_event::write_data_header(
IO_CACHE* file)
5944 char buf[LOAD_HEADER_LEN];
/* 4-byte fields stored with int4store at their L_*_OFFSET positions. */
5945 int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
5946 int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
5947 int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
/* Table-name and database lengths are narrowed to a single byte each. */
5948 buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
5949 buf[L_DB_LEN_OFFSET] = (char)db_len;
5950 int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
5951 return my_b_safe_write(file, (uchar*)buf, LOAD_HEADER_LEN) != 0;
/*
  Writes the variable-length part of the event: the field-length array and
  packed field names (when present), then table name, database name and file
  name.
  @return non-zero (true) as soon as any my_b_safe_write() call returns non-zero.
*/
5959 bool Load_log_event::write_data_body(
IO_CACHE* file)
/* The field section is written only when there are fields and both arrays are set. */
5963 if (num_fields && fields && field_lens)
5965 if (my_b_safe_write(file, (uchar*)field_lens, num_fields) ||
5966 my_b_safe_write(file, (uchar*)fields, field_block_len))
/* table_name and db are written with one extra byte (presumably the terminating NUL); fname is written without it. */
5969 return (my_b_safe_write(file, (uchar*)
table_name, table_name_len + 1) ||
5970 my_b_safe_write(file, (uchar*)db, db_len + 1) ||
5971 my_b_safe_write(file, (uchar*)fname, fname_len));
5980 const char *db_arg,
const char *table_name_arg,
5982 bool is_concurrent_arg,
5983 enum enum_duplicates handle_dup,
5984 bool ignore,
bool using_trans)
5986 thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
5987 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
5990 thread_id(thd_arg->thread_id),
5991 slave_proxy_id(thd_arg->variables.pseudo_thread_id),
5992 num_fields(0),fields(0),
5993 field_lens(0),field_block_len(0),
5994 table_name(table_name_arg ? table_name_arg :
""),
5995 db(db_arg), fname(ex->file_name), local_fname(FALSE),
5996 is_concurrent(is_concurrent_arg)
6000 exec_time = (ulong) (end_time - thd_arg->start_time.tv_sec);
6002 db_len = (uint32) strlen(db);
6003 table_name_len = (uint32) strlen(
table_name);
6004 fname_len = (fname) ? (uint) strlen(fname) : 0;
6005 sql_ex.field_term = (
char*) ex->field_term->ptr();
6006 sql_ex.field_term_len = (uint8) ex->field_term->length();
6007 sql_ex.enclosed = (
char*) ex->enclosed->ptr();
6008 sql_ex.enclosed_len = (uint8) ex->enclosed->length();
6009 sql_ex.line_term = (
char*) ex->line_term->ptr();
6010 sql_ex.line_term_len = (uint8) ex->line_term->length();
6011 sql_ex.line_start = (
char*) ex->line_start->ptr();
6012 sql_ex.line_start_len = (uint8) ex->line_start->length();
6013 sql_ex.escaped = (
char*) ex->escaped->ptr();
6014 sql_ex.escaped_len = (uint8) ex->escaped->length();
6015 sql_ex.opt_flags = 0;
6016 sql_ex.cached_new_format = -1;
6019 sql_ex.opt_flags|= DUMPFILE_FLAG;
6020 if (ex->opt_enclosed)
6021 sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
6023 sql_ex.empty_flags= 0;
6025 switch (handle_dup) {
6027 sql_ex.opt_flags|= REPLACE_FLAG;
6034 sql_ex.opt_flags|= IGNORE_FLAG;
6036 if (!ex->field_term->length())
6037 sql_ex.empty_flags |= FIELD_TERM_EMPTY;
6038 if (!ex->enclosed->length())
6039 sql_ex.empty_flags |= ENCLOSED_EMPTY;
6040 if (!ex->line_term->length())
6041 sql_ex.empty_flags |= LINE_TERM_EMPTY;
6042 if (!ex->line_start->length())
6043 sql_ex.empty_flags |= LINE_START_EMPTY;
6044 if (!ex->escaped->length())
6045 sql_ex.empty_flags |= ESCAPED_EMPTY;
6047 skip_lines = ex->skip_lines;
6050 field_lens_buf.length(0);
6051 fields_buf.length(0);
6053 while ((item = li++))
6056 uchar len= (uchar) item->item_name.
length();
6057 field_block_len += len + 1;
6058 fields_buf.append(item->item_name.
ptr(), len + 1);
6059 field_lens_buf.append((
char*)&len, 1);
6062 field_lens = (
const uchar*)field_lens_buf.ptr();
6063 fields = fields_buf.ptr();
6075 :
Log_event(buf, description_event), num_fields(0), fields(0),
6076 field_lens(0),field_block_len(0),
6077 table_name(0), db(0), fname(0), local_fname(FALSE),
6083 is_concurrent(FALSE)
6085 DBUG_ENTER(
"Load_log_event");
6091 copy_log_event(buf, event_len,
6092 ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
6094 description_event->common_header_len :
6095 LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
/*
  Decodes a LOAD event from a raw buffer into this object's members.
  @param buf        start of the event bytes.
  @param event_len  total length of the event in bytes.
  The header fields are read relative to description_event->common_header_len;
  the body is read starting at body_offset.
*/
6106 int Load_log_event::copy_log_event(
const char *buf, ulong event_len,
6110 DBUG_ENTER(
"Load_log_event::copy_log_event");
6112 char* buf_end = (
char*)buf + event_len;
/* Fixed-size header fields, located after the common header. */
6114 const char* data_head = buf + description_event->common_header_len;
6115 slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
6116 exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
6117 skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
/* NOTE(review): data_head is a plain char pointer, so a length byte above 127 may sign-extend where char is signed -- confirm. */
6118 table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
6119 db_len = (uint)data_head[L_DB_LEN_OFFSET];
6120 num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
/* Reject events shorter than the body offset. */
6122 if ((
int) event_len < body_offset)
/* sql_ex.init() parses the delimiter section; its return value becomes field_lens, and a null result is treated as failure. */
6128 if (!(field_lens= (uchar*)sql_ex.init((
char*)buf + body_offset,
6130 buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
6133 data_len = event_len - body_offset;
/* More fields than remaining body bytes cannot be valid. */
6134 if (num_fields > data_len)
/* Each field contributes its recorded length plus one byte to the field block. */
6136 for (uint i = 0; i < num_fields; i++)
6137 field_block_len += (uint)field_lens[
i] + 1;
/* Field names start right after the per-field length array. */
6139 fields = (
char*)field_lens + num_fields;
/* The file name follows the database name and one separating byte. */
6142 fname = db + db_len + 1;
/* NOTE(review): strlen() relies on a NUL byte inside the event buffer -- confirm this is guaranteed. */
6143 fname_len = (uint) strlen(fname);
/* Two-argument overload: forwards to the three-argument print() with 0 as the last argument. */
6155 void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
6157 print(file, print_event_info, 0);
/*
  Prints the event as a LOAD DATA statement into print_event_info->head_cache.
  When `commented` is true, statement lines are prefixed with "# ".
  @param file_arg          not referenced in the statements shown below.
  @param print_event_info  output cache, current database, delimiter and the
                           short_form switch.
*/
6161 void Load_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info,
6164 IO_CACHE *
const head= &print_event_info->head_cache;
/* Scratch buffer for one quoted identifier. */
6166 char temp_buf[1 + 2*FN_REFLEN + 2];
6168 DBUG_ENTER(
"Load_log_event::print");
/* The header line is printed unless short_form is set. */
6169 if (!print_event_info->short_form)
6171 print_header(head, print_event_info, FALSE);
6172 my_b_printf(head,
"\tQuery\tthread_id=%ld\texec_time=%ld\n",
6173 thread_id, exec_time);
6176 bool different_db= 1;
/* Compare the event's database with the one remembered in print_event_info (including the byte after db_len) and remember the new one. */
6185 if ((different_db= memcmp(print_event_info->db, db, db_len + 1)) &&
6187 memcpy(print_event_info->db, db, db_len + 1);
/* Emit a "use" line only for a non-empty database that differs from the remembered one. */
6190 if (db && db[0] && different_db)
/* Two alternative quoting calls follow (presumably selected by a preprocessor condition -- TODO confirm). */
6193 id_len= my_strmov_quoted_identifier(this->thd, temp_buf, db, 0);
6195 id_len= my_strmov_quoted_identifier(temp_buf, db);
6197 temp_buf[id_len]=
'\0';
6198 my_b_printf(head,
"%suse %s%s\n",
6199 commented ?
"# " :
"", temp_buf, print_event_info->delimiter);
/* Thread-specific events also set the pseudo thread id. */
6201 if (flags & LOG_EVENT_THREAD_SPECIFIC_F)
6202 my_b_printf(head,
"%sSET @@session.pseudo_thread_id=%lu%s\n",
6203 commented ?
"# " :
"", (ulong)thread_id,
6204 print_event_info->delimiter);
6205 my_b_printf(head,
"%sLOAD DATA ",
6206 commented ?
"# " :
"");
6207 if (check_fname_outside_temp_buf())
6208 my_b_printf(head,
"LOCAL ");
/* The file name is printed with an explicit width of fname_len. */
6209 my_b_printf(head,
"INFILE '%-*s' ", fname_len, fname);
/* REPLACE takes precedence over IGNORE when both flags are set. */
6211 if (sql_ex.opt_flags & REPLACE_FLAG)
6212 my_b_printf(head,
"REPLACE ");
6213 else if (sql_ex.opt_flags & IGNORE_FLAG)
6214 my_b_printf(head,
"IGNORE ");
/* Quote the table name; again two alternative calls are visible. */
6217 id_len= my_strmov_quoted_identifier(this->thd, temp_buf,
table_name, 0);
6219 id_len= my_strmov_quoted_identifier(temp_buf,
table_name);
6221 temp_buf[id_len]=
'\0';
6222 my_b_printf(head,
"INTO TABLE %s", temp_buf);
/* Delimiter clauses; each value is rendered by pretty_print_str(). */
6224 my_b_printf(head,
" FIELDS TERMINATED BY ");
6225 pretty_print_str(head, sql_ex.field_term, sql_ex.field_term_len);
6227 if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
6228 my_b_printf(head,
" OPTIONALLY ");
6229 my_b_printf(head,
" ENCLOSED BY ");
6230 pretty_print_str(head, sql_ex.enclosed, sql_ex.enclosed_len);
6232 my_b_printf(head,
" ESCAPED BY ");
6233 pretty_print_str(head, sql_ex.escaped, sql_ex.escaped_len);
6235 my_b_printf(head,
" LINES TERMINATED BY ");
6236 pretty_print_str(head, sql_ex.line_term, sql_ex.line_term_len);
/* NOTE(review): this tests the pointer sql_ex.line_start, whereas print_query() tests sql_ex.line_start_len -- confirm which is intended. */
6239 if (sql_ex.line_start)
6241 my_b_printf(head,
" STARTING BY ");
6242 pretty_print_str(head, sql_ex.line_start, sql_ex.line_start_len);
/* skip_lines is compared after a cast to signed long. */
6244 if ((
long) skip_lines > 0)
6245 my_b_printf(head,
" IGNORE %ld LINES", (
long) skip_lines);
/* Column list: walk the packed field names, quoting each one. */
6250 const char* field = fields;
6251 my_b_printf(head,
" (");
6252 for (i = 0; i < num_fields; i++)
6255 my_b_printf(head,
",");
6256 id_len= my_strmov_quoted_identifier((
char *) temp_buf, field);
6257 temp_buf[id_len]=
'\0';
6258 my_b_printf(head,
"%s", temp_buf);
/* Advance past this name and the one byte that follows it. */
6260 field += field_lens[
i] + 1;
6262 my_b_printf(head,
")");
/* Terminate the statement with the configured delimiter. */
6265 my_b_printf(head,
"%s\n", print_event_info->delimiter);
6270 #ifndef MYSQL_CLIENT
/*
  Iterates over the event's packed field names, one per loop step.
  @param affected_db  database name supplied by the caller.
*/
6282 void Load_log_event::set_fields(
const char* affected_db,
6287 const char* field = fields;
6288 for (i= 0; i < num_fields; i++)
/* Step to the next name: this field's recorded length plus one byte. */
6292 field+= field_lens[
i] + 1;
6298 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
6329 bool use_rli_only_for_errors)
6331 DBUG_ASSERT(thd->query() == 0);
6332 thd->reset_query_inner();
6333 set_thd_db(thd, db, db_len);
6334 thd->is_slave_error= 0;
6335 clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
6338 DBUG_ASSERT(!rli->m_table_map.count());
6344 thd->lex->local_file= local_fname;
6347 if (!use_rli_only_for_errors)
6353 const_cast<Relay_log_info*
>(rli)->set_future_group_master_log_pos(log_pos);
6354 DBUG_PRINT(
"info", (
"log_pos: %lu", (ulong) log_pos));
6380 if (rpl_filter->db_ok(thd->db))
6382 thd->set_time(&when);
6383 thd->set_query_id(next_query_id());
6384 thd->get_stmt_da()->opt_clear_warning_info(thd->query_id);
6387 char table_buf[NAME_LEN + 1];
6389 if (lower_case_table_names == 1)
6390 my_casedn_str(system_charset_info, table_buf);
6393 table_buf, strlen(table_buf),
6394 table_buf, TL_WRITE);
6398 if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db, &tables))
6402 skip_load_data_infile(net);
6408 enum enum_duplicates handle_dup;
6410 char *load_data_query;
6416 if (!(load_data_query= (
char *)thd->alloc(get_query_buffer_length() + 1)))
6425 print_query(FALSE, NULL, load_data_query, &end, NULL, NULL);
6427 thd->set_query(load_data_query, (uint) (end - load_data_query));
6429 if (sql_ex.opt_flags & REPLACE_FLAG)
6430 handle_dup= DUP_REPLACE;
6431 else if (sql_ex.opt_flags & IGNORE_FLAG)
6434 handle_dup= DUP_ERROR;
6451 handle_dup= DUP_ERROR;
6461 thd->lex->sql_command= SQLCOM_LOAD;
6462 thd->lex->duplicates= handle_dup;
6464 sql_exchange ex((
char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
6465 String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
6466 String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs);
6467 String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs);
6468 String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs);
6469 String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs);
6470 const String empty_str(
"", 0, log_cs);
6471 ex.field_term= &field_term;
6472 ex.enclosed= &enclosed;
6473 ex.line_term= &line_term;
6474 ex.line_start= &line_start;
6475 ex.escaped= &escaped;
6477 ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
6478 if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
6479 ex.field_term= &empty_str;
6481 ex.skip_lines = skip_lines;
6483 thd->lex->select_lex.context.resolve_in_table_list_only(&tables);
6484 set_fields(tables.db, field_list, &thd->lex->select_lex.context);
6485 thd->variables.pseudo_thread_id= thread_id;
6489 thd->net.vio = net->vio;
6491 thd->net.pkt_nr = net->pkt_nr;
6499 mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list,
6500 handle_dup, ignore, net != 0))
6501 thd->is_slave_error= 1;
6502 if (thd->cuted_fields)
6505 sql_print_warning(
"Slave: load data infile on table '%s' at "
6506 "log position %s in log '%s' produced %ld "
6507 "warning(s). Default database: '%s'",
6509 llstr(log_pos,llbuff),
6510 const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
6511 (ulong) thd->cuted_fields,
6512 print_slave_db_safe(thd->db));
6515 net->pkt_nr= thd->net.pkt_nr;
6526 skip_load_data_infile(net);
6531 const char *remember_db= thd->db;
6533 thd->set_db(NULL, 0);
6535 thd->get_stmt_da()->set_overwrite_status(
true);
6536 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
6537 thd->get_stmt_da()->set_overwrite_status(
false);
6538 close_thread_tables(thd);
6551 if (thd->transaction_rollback_request)
6553 trans_rollback_implicit(thd);
6554 thd->mdl_context.release_transactional_locks();
6556 else if (! thd->in_multi_stmt_transaction_mode())
6557 thd->mdl_context.release_transactional_locks();
6559 thd->mdl_context.release_statement_locks();
6561 DBUG_EXECUTE_IF(
"LOAD_DATA_INFILE_has_fatal_error",
6562 thd->is_slave_error= 0; thd->is_fatal_error= 1;);
6564 if (thd->is_slave_error)
6569 if (thd->is_error())
6571 err= thd->get_stmt_da()->message();
6572 sql_errno= thd->get_stmt_da()->sql_errno();
6576 sql_errno=ER_UNKNOWN_ERROR;
6579 rli->
report(ERROR_LEVEL, sql_errno,
"\
6580 Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
6581 err, (
char*)
table_name, print_slave_db_safe(remember_db));
6582 free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
6585 free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
6587 if (thd->is_fatal_error)
6590 my_snprintf(buf,
sizeof(buf),
6591 "Running LOAD DATA INFILE on table '%-.64s'."
6592 " Default database: '%-.64s'",
6594 print_slave_db_safe(remember_db));
6596 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
6597 ER(ER_SLAVE_FATAL_ERROR), buf);
6601 return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
6614 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Stores "<new_log_ident>;pos=<pos>" in the protocol row.
*/
6615 int Rotate_log_event::pack_info(
Protocol *protocol)
/* buf1 backs the String; buf receives the decimal text of pos from llstr(). */
6617 char buf1[256], buf[22];
6618 String tmp(buf1,
sizeof(buf1), log_cs);
/* The log name is appended by explicit length. */
6620 tmp.append(new_log_ident, ident_len);
6621 tmp.append(STRING_WITH_LEN(
";pos="));
6622 tmp.append(llstr(pos,buf));
6623 protocol->
store(tmp.ptr(), tmp.length(), &my_charset_bin);
/*
  Prints "\tRotate to <new_log_ident> pos: <pos>" into
  print_event_info->head_cache.
*/
6634 void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
6637 IO_CACHE *
const head= &print_event_info->head_cache;
/* short_form is checked before anything is printed. */
6639 if (print_event_info->short_form)
6641 print_header(head, print_event_info, FALSE);
6642 my_b_printf(head,
"\tRotate to ");
/* The log name is written by explicit length rather than as a C string. */
6644 my_b_write(head, (uchar*) new_log_ident, (uint)ident_len);
6645 my_b_printf(head,
" pos: %s\n", llstr(pos, buf));
6656 #ifndef MYSQL_CLIENT
/*
  Constructs a rotate event from an explicit log name and position.
  @param new_log_ident_arg  name of the log being rotated to.
  @param ident_len_arg      length of that name; 0 means "use strlen()".
  @param pos_arg            position stored in the event.
*/
6657 Rotate_log_event::Rotate_log_event(
const char* new_log_ident_arg,
6658 uint ident_len_arg, ulonglong pos_arg,
/* A zero ident_len_arg falls back to strlen(new_log_ident_arg). */
6661 new_log_ident(new_log_ident_arg), pos(pos_arg),ident_len(ident_len_arg ?
6662 ident_len_arg : (uint) strlen(new_log_ident_arg)), flags(flags_arg)
6666 DBUG_ENTER(
"Rotate_log_event::Rotate_log_event(...,flags)");
6667 DBUG_PRINT(
"enter",(
"new_log_ident: %s pos: %s flags: %lu", new_log_ident_arg,
6668 llstr(pos_arg, buff), (ulong) flags));
/* With DUP_NAME the event keeps its own copy of the name. NOTE(review): the my_strndup() result is not checked in the lines shown. */
6670 if (flags & DUP_NAME)
6671 new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
/* RELAY_LOG marks the event as a relay-log event. */
6672 if (flags & RELAY_LOG)
6673 set_relay_log_event();
/*
  Constructs a rotate event by decoding a raw event buffer.
  @param buf        event bytes.
  @param event_len  total length of the event.
  new_log_ident starts as 0 and flags as DUP_NAME; the name is then copied
  out of the buffer with my_strndup().
*/
6679 Rotate_log_event::Rotate_log_event(
const char* buf, uint event_len,
6681 :
Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
6683 DBUG_ENTER(
"Rotate_log_event::Rotate_log_event(char*,...)");
6685 uint8 header_size= description_event->common_header_len;
6686 uint8 post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
/* NOTE(review): the guard compares against header_size only, while ident_len below subtracts header_size+post_header_len -- confirm a short event cannot make that subtraction wrap. */
6688 if (event_len < header_size)
/* Without a post-header the position defaults to 4. */
6691 pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
6692 ident_len = (uint)(event_len -
6693 (header_size+post_header_len));
6694 ident_offset = post_header_len;
/* Clamp the name length to FN_REFLEN-1 before copying. */
6695 set_if_smaller(ident_len,FN_REFLEN-1);
6696 new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
6697 DBUG_PRINT(
"debug", (
"new_log_ident: '%s'", new_log_ident));
6706 #ifndef MYSQL_CLIENT
/*
  Writes the rotate event: common header, ROTATE_HEADER_LEN bytes holding the
  position, the log name, then the footer.
  @return non-zero (true) as soon as any step returns non-zero.
*/
6707 bool Rotate_log_event::write(
IO_CACHE* file)
6709 char buf[ROTATE_HEADER_LEN];
/* The position is stored as 8 bytes at R_POS_OFFSET. */
6710 int8store(buf + R_POS_OFFSET, pos);
/* The length given to write_header() is the post-header plus the name. */
6711 return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
6712 wrapper_my_b_safe_write(file, (uchar*) buf, ROTATE_HEADER_LEN) ||
6713 wrapper_my_b_safe_write(file, (uchar*) new_log_ident,
6714 (uint) ident_len) ||
6715 write_footer(file));
6720 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
6741 DBUG_ENTER(
"Rotate_log_event::do_update_pos");
6746 DBUG_PRINT(
"info", (
"server_id=%lu; ::server_id=%lu",
6747 (ulong) this->server_id, (ulong) ::server_id));
6748 DBUG_PRINT(
"info", (
"new_log_ident: %s", this->new_log_ident));
6749 DBUG_PRINT(
"info", (
"pos: %s", llstr(this->pos, buf)));
6767 if ((server_id != ::server_id || rli->replicate_same_server_id) &&
6768 !is_relay_log_event() &&
6770 rli->mts_group_status != Relay_log_info::MTS_IN_GROUP))
6779 if ((error= mts_checkpoint_routine(rli, 0,
false,
6785 DBUG_PRINT(
"info", (
"old group_master_log_name: '%s' "
6786 "old group_master_log_pos: %lu",
6787 rli->get_group_master_log_name(),
6788 (ulong) rli->get_group_master_log_pos()));
6790 memcpy((
void *)rli->get_group_master_log_name(),
6791 new_log_ident, ident_len + 1);
6793 if ((error= rli->inc_group_relay_log_pos(pos,
6800 DBUG_PRINT(
"info", (
"new group_master_log_name: '%s' "
6801 "new group_master_log_pos: %lu",
6802 rli->get_group_master_log_name(),
6803 (ulong) rli->get_group_master_log_pos()));
6816 set_slave_thread_options(thd);
6817 set_slave_thread_default_charset(thd, rli);
6818 thd->variables.sql_mode= global_system_variables.
sql_mode;
6819 thd->variables.auto_increment_increment=
6820 thd->variables.auto_increment_offset= 1;
6823 rli->inc_event_relay_log_pos();
6858 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Stores the variable's type name followed by its value as text in the
  protocol row.
*/
6859 int Intvar_log_event::pack_info(
Protocol *protocol)
6861 char buf[256], *pos;
/* The name copy is bounded to sizeof(buf)-23, leaving room for the text appended afterwards. */
6862 pos= strmake(buf, get_var_type_name(),
sizeof(buf)-23);
/* Value rendered in radix -10 (presumably signed decimal -- TODO confirm against longlong10_to_str). */
6864 pos= longlong10_to_str(val, pos, -10);
6865 protocol->
store(buf, (uint) (pos-buf), &my_charset_bin);
/*
  Decodes an Intvar event from a raw buffer: skips the common header and the
  INTVAR post-header, then reads the type byte and the 8-byte value.
*/
6875 Intvar_log_event::Intvar_log_event(
const char* buf,
/* Advance buf to the start of the event data. */
6880 buf+= description_event->common_header_len +
6881 description_event->post_header_len[INTVAR_EVENT-1];
6882 type= buf[I_TYPE_OFFSET];
6883 val= uint8korr(buf+I_VAL_OFFSET);
/*
  Maps the event's variable type to a printable name.
  @return "LAST_INSERT_ID", "INSERT_ID", or "UNKNOWN" for any other type.
*/
6891 const char* Intvar_log_event::get_var_type_name()
6894 case LAST_INSERT_ID_EVENT:
return "LAST_INSERT_ID";
6895 case INSERT_ID_EVENT:
return "INSERT_ID";
6896 default:
return "UNKNOWN";
6905 #ifndef MYSQL_CLIENT
/*
  Writes the Intvar event: common header, a buffer holding the type byte and
  the 8-byte value, then the footer.
  @return non-zero (true) as soon as any step returns non-zero.
*/
6906 bool Intvar_log_event::write(
IO_CACHE* file)
6909 buf[I_TYPE_OFFSET]= (uchar) type;
6910 int8store(buf + I_VAL_OFFSET, val);
/* The data length given to write_header() is the size of buf. */
6911 return (write_header(file,
sizeof(buf)) ||
6912 wrapper_my_b_safe_write(file, buf,
sizeof(buf)) ||
6913 write_footer(file));
/*
  Prints the event as "SET <name>=<value><delimiter>" into
  print_event_info->head_cache, where <name> is chosen by the variable type.
*/
6923 void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
6928 IO_CACHE *
const head= &print_event_info->head_cache;
/* The header line is printed unless short_form is set. */
6930 if (!print_event_info->short_form)
6932 print_header(head, print_event_info, FALSE);
6933 my_b_printf(head,
"\tIntvar\n");
6936 my_b_printf(head,
"SET ");
/* Select the variable name (msg) by event type. */
6938 case LAST_INSERT_ID_EVENT:
6939 msg=
"LAST_INSERT_ID";
6941 case INSERT_ID_EVENT:
6944 case INVALID_INT_EVENT:
/* The value is rendered by llstr() and followed by the configured delimiter. */
6949 my_b_printf(head,
"%s=%s%s\n",
6950 msg, llstr(val,llbuff), print_event_info->delimiter);
6955 #if defined(HAVE_REPLICATION)&& !defined(MYSQL_CLIENT)
6969 if (rli->deferred_events_collecting)
6970 return rli->deferred_events->add(
this);
6973 case LAST_INSERT_ID_EVENT:
6974 thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
6975 thd->first_successful_insert_id_in_prev_stmt= val;
6977 case INSERT_ID_EVENT:
6978 thd->force_one_auto_inc_interval(val);
6986 rli->inc_event_relay_log_pos();
7002 return continue_group(rli);
7012 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Stores "rand_seed1=<seed1>,rand_seed2=<seed2>" in the protocol row.
*/
7013 int Rand_log_event::pack_info(
Protocol *protocol)
7015 char buf1[256], *pos;
7016 pos= strmov(buf1,
"rand_seed1=");
/* Each seed is cast to long before being rendered in radix 10. */
7017 pos= int10_to_str((
long) seed1, pos, 10);
7018 pos= strmov(pos,
",rand_seed2=");
7019 pos= int10_to_str((
long) seed2, pos, 10);
7020 protocol->
store(buf1, (uint) (pos-buf1), &my_charset_bin);
/*
  Decodes a Rand event from a raw buffer: skips the common header and the
  RAND post-header, then reads the two 8-byte seeds.
*/
7026 Rand_log_event::Rand_log_event(
const char* buf,
/* Advance buf to the start of the event data. */
7031 buf+= description_event->common_header_len +
7032 description_event->post_header_len[RAND_EVENT-1];
7033 seed1= uint8korr(buf+RAND_SEED1_OFFSET);
7034 seed2= uint8korr(buf+RAND_SEED2_OFFSET);
7038 #ifndef MYSQL_CLIENT
/*
  Writes the Rand event: common header, a buffer holding both 8-byte seeds,
  then the footer.
  @return non-zero (true) as soon as any step returns non-zero.
*/
7039 bool Rand_log_event::write(
IO_CACHE* file)
7042 int8store(buf + RAND_SEED1_OFFSET, seed1);
7043 int8store(buf + RAND_SEED2_OFFSET, seed2);
/* The data length given to write_header() is the size of buf. */
7044 return (write_header(file,
sizeof(buf)) ||
7045 wrapper_my_b_safe_write(file, buf,
sizeof(buf)) ||
7046 write_footer(file));
/*
  Prints the event as a "SET @@RAND_SEED1=..., @@RAND_SEED2=..." statement
  into print_event_info->head_cache.
*/
7052 void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
7054 IO_CACHE *
const head= &print_event_info->head_cache;
/* One text buffer per seed, because both llstr() results are used in the same call. */
7056 char llbuff[22],llbuff2[22];
/* The header line is printed unless short_form is set. */
7057 if (!print_event_info->short_form)
7059 print_header(head, print_event_info, FALSE);
7060 my_b_printf(head,
"\tRand\n");
7062 my_b_printf(head,
"SET @@RAND_SEED1=%s, @@RAND_SEED2=%s%s\n",
7063 llstr(seed1, llbuff),llstr(seed2, llbuff2),
7064 print_event_info->delimiter);
7069 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
7078 if (rli->deferred_events_collecting)
7079 return rli->deferred_events->add(
this);
7081 thd->rand.seed1= (ulong) seed1;
7082 thd->rand.seed2= (ulong) seed2;
7088 rli->inc_event_relay_log_pos();
7104 return continue_group(rli);
/*
  Executes the events queued in rli->deferred_events, when deferred
  collection is active and the queue is not empty.
  @param thd  the thread context supplied by the caller.
*/
7115 bool slave_execute_deferred_events(THD *thd)
/* rli must be set, and collecting implies that the deferred_events container exists. */
7120 DBUG_ASSERT(rli && (!rli->deferred_events_collecting || rli->deferred_events));
/* Condition: not collecting, or nothing has been deferred. */
7122 if (!rli->deferred_events_collecting || rli->deferred_events->is_empty())
7125 res= rli->deferred_events->execute(rli);
7137 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Stores the text "COMMIT" followed by a trailing comment that carries the
  xid (see the two literals below) in the protocol row.
*/
7138 int Xid_log_event::pack_info(
Protocol *protocol)
7140 char buf[128], *pos;
7141 pos= strmov(buf,
"COMMIT /* xid=");
/* The xid is rendered in radix 10. */
7142 pos= longlong10_to_str(xid, pos, 10);
7143 pos= strmov(pos,
" */");
7144 protocol->
store(buf, (uint) (pos-buf), &my_charset_bin);
7164 buf+= description_event->common_header_len +
7165 description_event->post_header_len[XID_EVENT-1];
7166 memcpy((
char*) &xid, buf,
sizeof(xid));
7170 #ifndef MYSQL_CLIENT
/*
  Writes the Xid event: common header, the xid value, then the footer.
  @return non-zero (true) as soon as any step returns non-zero.
*/
7171 bool Xid_log_event::write(
IO_CACHE* file)
/* Debug hook: with "do_not_write_xid" active the function returns 0 without writing. */
7173 DBUG_EXECUTE_IF(
"do_not_write_xid",
return 0;);
/* The xid is written as its in-memory bytes (sizeof(xid) of them). */
7174 return (write_header(file,
sizeof(xid)) ||
7175 wrapper_my_b_safe_write(file, (uchar*) &xid,
sizeof(xid)) ||
7176 write_footer(file));
/*
  Prints the event as "COMMIT<delimiter>" into print_event_info->head_cache,
  preceded by a header line with the xid unless short_form is set.
*/
7182 void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
7184 IO_CACHE *
const head= &print_event_info->head_cache;
7186 if (!print_event_info->short_form)
/* The xid is rendered in radix 10 for the header line. */
7189 longlong10_to_str(xid, buf, 10);
7191 print_header(head, print_event_info, FALSE);
7192 my_b_printf(head,
"\tXid = %s\n", buf);
7194 my_b_printf(head,
"COMMIT%s\n", print_event_info->delimiter);
7199 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Commits the current transaction on thd and releases its transactional
  metadata locks.
  @param thd  thread whose transaction is committed.
*/
7209 bool Xid_log_event::do_commit(THD *thd)
/* The result of trans_commit() is kept in error. */
7211 bool error= trans_commit(thd);
/* Debug hook keyed on "crash_after_apply". */
7212 DBUG_EXECUTE_IF(
"crash_after_apply",
7213 sql_print_information(
"Crashing crash_after_apply.");
7215 thd->mdl_context.release_transactional_locks();
/* Condition: gtid_next is of type GTID_GROUP and this thread owns a GTID (sidno != 0). */
7217 if (thd->variables.gtid_next.type == GTID_GROUP &&
7218 thd->owned_gtid.sidno != 0)
/* Count this as a COMMIT in the per-command status counters. */
7228 status_var_increment(thd->status_var.com_stat[SQLCOM_COMMIT]);
/*
  Applies the Xid event on a slave worker: logs an implicit COMMIT, commits
  the worker's positions for its job group, then commits via do_commit().
  @param w  the worker applying the event.
*/
7240 int Xid_log_event::do_apply_event_worker(Slave_worker *w)
/* The job-group queue is reached through the worker's coordinator (w->c_rli). */
7243 Slave_committed_queue *coordinator_gaq= w->c_rli->gaq;
7246 general_log_print(thd, COM_QUERY,
7247 "COMMIT /* implicit, from Xid_log_event */");
/* Debug trace of the worker's positions before the update. */
7249 DBUG_PRINT(
"mts", (
"do_apply group master %s %llu group relay %s %llu event %s %llu.",
7250 w->get_group_master_log_name(),
7251 w->get_group_master_log_pos(),
7252 w->get_group_relay_log_name(),
7253 w->get_group_relay_log_pos(),
7254 w->get_event_relay_log_name(),
7255 w->get_event_relay_log_pos()));
/* Debug hook keyed on "crash_before_update_pos". */
7257 DBUG_EXECUTE_IF(
"crash_before_update_pos",
7258 sql_print_information(
"Crashing crash_before_update_pos.");
/* Look up this event's job group by gaq_idx. */
7262 Slave_job_group *ptr_group= coordinator_gaq->get_job_group(gaq_idx);
/* Positions are committed before do_commit() is called; a non-zero result is treated as an error. */
7264 if ((error= w->commit_positions(
this, ptr_group,
7265 w->c_rli->is_transactional())))
/* Debug trace of the worker's positions after the update. */
7268 DBUG_PRINT(
"mts", (
"do_apply group master %s %llu group relay %s %llu event %s %llu.",
7269 w->get_group_master_log_name(),
7270 w->get_group_master_log_pos(),
7271 w->get_group_relay_log_name(),
7272 w->get_group_relay_log_pos(),
7273 w->get_event_relay_log_name(),
7274 w->get_event_relay_log_pos()));
/* Debug hook keyed on "crash_after_update_pos_before_apply". */
7276 DBUG_EXECUTE_IF(
"crash_after_update_pos_before_apply",
7277 sql_print_information(
"Crashing crash_after_update_pos_before_apply.");
7280 error= do_commit(thd);
7293 general_log_print(thd, COM_QUERY,
7294 "COMMIT /* implicit, from Xid_log_event */");
7298 DBUG_PRINT(
"info", (
"do_apply group master %s %llu group relay %s %llu event %s %llu\n",
7299 rli_ptr->get_group_master_log_name(),
7300 rli_ptr->get_group_master_log_pos(),
7301 rli_ptr->get_group_relay_log_name(),
7302 rli_ptr->get_group_relay_log_pos(),
7303 rli_ptr->get_event_relay_log_name(),
7304 rli_ptr->get_event_relay_log_pos()));
7306 DBUG_EXECUTE_IF(
"crash_before_update_pos",
7307 sql_print_information(
"Crashing crash_before_update_pos.");
7313 rli_ptr->inc_event_relay_log_pos();
7314 rli_ptr->set_group_relay_log_pos(rli_ptr->get_event_relay_log_pos());
7315 rli_ptr->set_group_relay_log_name(rli_ptr->get_event_relay_log_name());
7320 rli_ptr->set_group_master_log_pos(log_pos);
7322 if ((error= rli_ptr->
flush_info(rli_ptr->is_transactional())))
7325 DBUG_PRINT(
"info", (
"do_apply group master %s %llu group relay %s %llu event %s %llu\n",
7326 rli_ptr->get_group_master_log_name(),
7327 rli_ptr->get_group_master_log_pos(),
7328 rli_ptr->get_group_relay_log_name(),
7329 rli_ptr->get_group_relay_log_pos(),
7330 rli_ptr->get_event_relay_log_name(),
7331 rli_ptr->get_event_relay_log_pos()));
7333 DBUG_EXECUTE_IF(
"crash_after_update_pos_before_apply",
7334 sql_print_information(
"Crashing crash_after_update_pos_before_apply.");
7343 DBUG_EXECUTE_IF(
"simulate_commit_failure",
7345 thd->transaction.xid_state.xa_state = XA_IDLE;
7347 error= do_commit(thd);
7349 rli->
report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
7350 "Error in Xid_log_event: Commit could not be completed, '%s'",
7351 thd->get_stmt_da()->message());
7362 DBUG_ENTER(
"Xid_log_event::do_shall_skip");
7363 if (rli->slave_skip_counter > 0) {
7364 thd->variables.option_bits&= ~OPTION_BEGIN;
7367 DBUG_RETURN(Log_event::do_shall_skip(rli));
7376 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/*
  Builds a textual "<quoted name>=<value>" description of the user variable
  in a heap buffer and stores it in the protocol row.  The value text depends
  on the variable's type; a NULL variable is rendered as "NULL".
*/
7377 int User_var_log_event::pack_info(
Protocol* protocol)
7380 char quoted_id[1 + FN_REFLEN * 2 + 2];
7381 int id_len= my_strmov_quoted_identifier(this->thd, quoted_id, name, name_len);
7382 quoted_id[id_len]=
'\0';
/* The value text starts after one leading byte, the quoted name and the '=' (see the memcpy to buf+1 and the '=' store at the end). */
7383 uint val_offset= 2 + id_len;
7384 uint event_len= val_offset;
/* NULL value: room for "NULL" plus its terminator. */
7388 if (!(buf= (
char*) my_malloc(val_offset + 5, MYF(MY_WME))))
7390 strmov(buf + val_offset,
"NULL");
7391 event_len= val_offset + 4;
/* Real value: decoded with float8get and formatted by my_gcvt. */
7398 float8get(real_val, val);
7399 if (!(buf= (
char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
7402 event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
7403 buf + val_offset, NULL);
/* Integer value: 22 bytes are reserved for the decimal text; the radix depends on UNSIGNED_F. */
7406 if (!(buf= (
char*) my_malloc(val_offset + 22, MYF(MY_WME))))
7408 event_len= longlong10_to_str(uint8korr(val), buf + val_offset,
7409 ((flags & User_var_log_event::UNSIGNED_F) ?
7412 case DECIMAL_RESULT:
/* The decimal's binary form starts at val+2; val[0] is passed as the precision argument. */
7419 binary2my_decimal(E_DEC_FATAL_ERROR, (uchar*) (val+2), &dec, val[0],
7421 my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
7422 event_len= str.length() + val_offset;
/* String value: the size allows two output bytes per value byte plus charset and collation names. */
7427 buf= (
char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
/* An unknown charset number is rendered as "???". */
7432 if (!(cs= get_charset(charset_number, MYF(0))))
7434 strmov(buf+val_offset,
"???");
7439 char *p= strxmov(buf + val_offset,
"_", cs->csname,
" ", NullS);
7441 p= strxmov(p,
" COLLATE ", cs->name, NullS);
/* Finally place the quoted name at buf+1 and the '=' right after it. */
7452 memcpy(buf + 1, quoted_id, id_len);
7453 buf[1 + id_len]=
'=';
7454 protocol->
store(buf, event_len, &my_charset_bin);
7465 #ifndef MYSQL_CLIENT
7466 , deferred(false), query_id(0)
7470 const char* buf_start=
buf;
7472 const char *start=
buf;
7473 buf+= description_event->common_header_len +
7474 description_event->post_header_len[USER_VAR_EVENT-1];
7475 name_len= uint4korr(buf);
7476 name= (
char *) buf + UV_NAME_LEN_SIZE;
7483 if (!valid_buffer_range<uint>(name_len, buf_start, name,
7484 event_len - UV_VAL_IS_NULL))
7490 buf+= UV_NAME_LEN_SIZE + name_len;
7491 is_null= (bool) *buf;
7492 flags= User_var_log_event::UNDEF_F;
7495 type= STRING_RESULT;
7496 charset_number= my_charset_bin.number;
7502 if (!valid_buffer_range<uint>(UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE
7503 + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE,
7504 buf_start, buf, event_len))
7510 type= (Item_result) buf[UV_VAL_IS_NULL];
7511 charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
7512 val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
7513 UV_CHARSET_NUMBER_SIZE);
7514 val= (
char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
7515 UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
7517 if (!valid_buffer_range<uint>(val_len, buf_start, val, event_len))
7534 uint bytes_read= ((val + val_len) - start);
7538 DBUG_ASSERT((bytes_read == data_written -
7539 (old_pre_checksum_fd ||
7540 (description_event->checksum_alg ==
7541 BINLOG_CHECKSUM_ALG_OFF)) ?
7544 (bytes_read == data_written -1 -
7545 (old_pre_checksum_fd ||
7546 (description_event->checksum_alg ==
7547 BINLOG_CHECKSUM_ALG_OFF)) ?
7548 0 : BINLOG_CHECKSUM_LEN));
7549 if ((data_written - bytes_read) > 0)
7551 flags= (uint) *(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
7552 UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE +
7563 #ifndef MYSQL_CLIENT
/*
  Writes the user-variable event: common header, name length, name, a
  descriptor block (null flag, charset number, value length), the value
  bytes, an optional flags byte, then the footer.
  @return non-zero (true) as soon as any step returns non-zero.
*/
7564 bool User_var_log_event::write(
IO_CACHE* file)
/* buf holds the name length; buf1 holds the null flag, type, charset number and value length. */
7566 char buf[UV_NAME_LEN_SIZE];
7567 char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
7568 UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
/* unsigned_len is the number of flag bytes written at the end; it starts at 0. */
7570 uint unsigned_len= 0;
7574 int4store(buf, name_len);
/* The null flag is stored in buf1[0] and tested in the same expression. */
7576 if ((buf1[0]= is_null))
7584 int4store(buf1 + 2, charset_number);
/* Real and integer values are re-encoded into buf2 with the fixed-width store helpers. */
7588 float8store(buf2, *(
double*) val);
7591 int8store(buf2, *(longlong*) val);
7594 case DECIMAL_RESULT:
7597 dec->fix_buffer_pointer();
/* Decimal layout: byte 0 = intg+frac, byte 1 = frac, followed by the binary decimal. */
7598 buf2[0]= (char)(dec->intg + dec->frac);
7599 buf2[1]= (char)dec->frac;
7600 decimal2bin((
decimal_t*)val, buf2+2, buf2[0], buf2[1]);
7601 val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
7612 int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
/* Total data length: name-length field, name, descriptor block, value, optional flags byte. */
7617 event_length=
sizeof(
buf)+ name_len + buf1_length + val_len + unsigned_len;
7619 return (write_header(file, event_length) ||
7620 wrapper_my_b_safe_write(file, (uchar*) buf,
sizeof(buf)) ||
7621 wrapper_my_b_safe_write(file, (uchar*) name, name_len) ||
7622 wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
7623 wrapper_my_b_safe_write(file, pos, val_len) ||
7624 wrapper_my_b_safe_write(file, &flags, unsigned_len) ||
7625 write_footer(file));
/*
  Prints the event as a "SET @<quoted name>:=<value><delimiter>" statement
  into print_event_info->head_cache.  The value text depends on the
  variable's type; a NULL variable is printed as ":=NULL".
*/
7635 void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
7637 IO_CACHE *
const head= &print_event_info->head_cache;
7638 char quoted_id[1 + NAME_LEN * 2 + 2];
7639 char name_id[NAME_LEN];
/* The header line is printed unless short_form is set. */
7642 if (!print_event_info->short_form)
7644 print_header(head, print_event_info, FALSE);
7645 my_b_printf(head,
"\tUser_var\n");
/*
  NOTE(review): strmov() takes no length and name_id holds NAME_LEN bytes;
  the terminator is then written at index name_len.  Confirm that name is
  NUL-terminated and that name_len < NAME_LEN, otherwise this can write past
  name_id.
*/
7647 strmov(name_id, name);
7648 name_id[name_len]=
'\0';
7649 my_b_printf(head,
"SET @");
7650 quoted_len= my_strmov_quoted_identifier((
char *) quoted_id,
7651 (
const char *) name_id);
7652 quoted_id[quoted_len]=
'\0';
7653 my_b_write(head, (uchar*) quoted_id, quoted_len);
7657 my_b_printf(head,
":=NULL%s\n", print_event_info->delimiter);
/* Real value: formatted with "%.14g" into a buffer sized by FMT_G_BUFSIZE(14). */
7664 char real_buf[FMT_G_BUFSIZE(14)];
7665 float8get(real_val, val);
7666 sprintf(real_buf,
"%.14g", real_val);
7667 my_b_printf(head,
":=%s%s\n", real_buf, print_event_info->delimiter);
/* Integer value: radix 10 when UNSIGNED_F is set, otherwise -10. */
7671 longlong10_to_str(uint8korr(val), int_buf,
7672 ((flags & User_var_log_event::UNSIGNED_F) ? 10 : -10));
7673 my_b_printf(head,
":=%s%s\n", int_buf, print_event_info->delimiter);
7675 case DECIMAL_RESULT:
/* Decimal value: val[0] is the precision, val[1] the scale, and the binary form starts at val+2. */
7678 int str_len=
sizeof(str_buf) - 1;
7679 int precision= (int)val[0];
7680 int scale= (int)val[1];
7681 decimal_digit_t dec_buf[10];
7686 bin2decimal((uchar*) val+2, &dec, precision, scale);
7687 decimal2string(&dec, str_buf, &str_len, 0, 0, 0);
7688 str_buf[str_len]= 0;
7689 my_b_printf(head,
":=%s%s\n", str_buf, print_event_info->delimiter);
/* String value: the hex buffer allows two characters per value byte plus three more bytes. */
7711 hex_str= (
char *)my_malloc(2*val_len+1+2,MYF(MY_WME));
/* An unknown charset number is printed as ":=???". */
7721 if (!(cs= get_charset(charset_number, MYF(0))))
7726 my_b_printf(head,
":=???%s\n", print_event_info->delimiter);
7728 my_b_printf(head,
":=_%s %s COLLATE `%s`%s\n",
7729 cs->csname, hex_str, cs->name,
7730 print_event_info->delimiter);
7748 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
7749 int User_var_log_event::do_apply_event(
Relay_log_info const *rli)
7753 query_id_t sav_query_id= 0;
7755 if (rli->deferred_events_collecting)
7757 set_deferred(current_thd->query_id);
7758 return rli->deferred_events->add(
this);
7759 }
else if (is_deferred())
7761 sav_query_id= current_thd->query_id;
7762 current_thd->query_id= query_id;
7765 if (!(charset= get_charset(charset_number, MYF(MY_WME))))
7784 float8get(real_val, val);
7786 val= (
char*) &real_val;
7790 int_val= (longlong) uint8korr(val);
7792 val= (
char*) &int_val;
7795 case DECIMAL_RESULT:
7799 val= (
char *)dec->val_decimal(NULL);
7822 if (e->fix_fields(thd, 0))
7830 e->update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT,
7831 (flags & User_var_log_event::UNSIGNED_F));
7833 free_root(thd->mem_root, 0);
7835 current_thd->query_id= sav_query_id;
7842 rli->inc_event_relay_log_pos();
7857 return continue_group(rli);
7866 #ifdef HAVE_REPLICATION
7868 void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info)
7870 if (print_event_info->short_form)
7872 print_header(&print_event_info->head_cache, print_event_info, FALSE);
7873 my_b_printf(&print_event_info->head_cache,
"\n# %s",
"Unknown event\n");
7886 void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
7888 if (print_event_info->short_form)
7891 print_header(&print_event_info->head_cache, print_event_info, FALSE);
7892 my_b_printf(&print_event_info->head_cache,
"\tStop\n");
7897 #ifndef MYSQL_CLIENT
7925 if ((thd->variables.option_bits & OPTION_BEGIN) || rli->
is_parallel_exec())
7926 rli->inc_event_relay_log_pos();
7929 error_inc= rli->inc_group_relay_log_pos(0,
true);
7932 return (error_inc || error_flush);
7947 #ifndef MYSQL_CLIENT
7948 Create_file_log_event::
7949 Create_file_log_event(THD* thd_arg, sql_exchange* ex,
7950 const char* db_arg,
const char* table_name_arg,
7952 bool is_concurrent_arg,
7953 enum enum_duplicates handle_dup,
7955 uchar* block_arg, uint block_len_arg,
bool using_trans)
7958 handle_dup, ignore, using_trans),
7959 fake_base(0),
block(block_arg), event_buf(0), block_len(block_len_arg),
7960 file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
7962 DBUG_ENTER(
"Create_file_log_event");
7963 sql_ex.force_new_format();
7972 bool Create_file_log_event::write_data_body(
IO_CACHE* file)
7975 if ((res= Load_log_event::write_data_body(file)) || fake_base)
7977 return (my_b_safe_write(file, (uchar*)
"", 1) ||
7978 my_b_safe_write(file, (uchar*)
block, block_len));
7986 bool Create_file_log_event::write_data_header(
IO_CACHE* file)
7989 uchar buf[CREATE_FILE_HEADER_LEN];
7990 if ((res= Load_log_event::write_data_header(file)) || fake_base)
7992 int4store(buf + CF_FILE_ID_OFFSET, file_id);
7993 return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
8001 bool Create_file_log_event::write_base(
IO_CACHE* file)
8016 Create_file_log_event::Create_file_log_event(
const char* buf, uint len,
8020 DBUG_ENTER(
"Create_file_log_event::Create_file_log_event(char*,...)");
8022 uint header_len= description_event->common_header_len;
8023 uint8 load_header_len= description_event->post_header_len[LOAD_EVENT-1];
8024 uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
8025 if (!(event_buf= (
char*) my_memdup(buf, len, MYF(MY_WME))) ||
8026 copy_log_event(event_buf,len,
8027 ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
8028 load_header_len + header_len :
8029 (fake_base ? (header_len+load_header_len) :
8030 (header_len+load_header_len) +
8031 create_file_header_len)),
8034 if (description_event->binlog_version!=1)
8036 file_id= uint4korr(buf +
8038 load_header_len + CF_FILE_ID_OFFSET);
8050 block_offset= (description_event->common_header_len +
8051 Load_log_event::get_data_size() +
8052 create_file_header_len + 1);
8053 if (len < block_offset)
8055 block = (uchar*)buf + block_offset;
8056 block_len = len - block_offset;
8060 sql_ex.force_new_format();
8061 inited_from_old = 1;
8072 void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info,
8075 if (print_event_info->short_form)
8077 if (enable_local && check_fname_outside_temp_buf())
8078 Load_log_event::print(file, print_event_info);
8084 Load_log_event::print(file, print_event_info,
8085 !check_fname_outside_temp_buf());
8090 DBUG_EXECUTE_IF (
"simulate_create_event_write_error",
8091 {(&print_event_info->head_cache)->write_pos=
8092 (&print_event_info->head_cache)->write_end;
8093 DBUG_SET(
"+d,simulate_file_write_error");});
8098 my_b_printf(&print_event_info->head_cache,
"#");
8101 my_b_printf(&print_event_info->head_cache,
8102 " file_id: %d block_len: %d\n", file_id, block_len);
8106 void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
8108 print(file, print_event_info, 0);
8117 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8118 int Create_file_log_event::pack_info(
Protocol *protocol)
8120 char buf[NAME_LEN*2 + 30 + 21*2], *pos;
8121 pos= strmov(buf,
"db=");
8122 memcpy(pos, db, db_len);
8123 pos= strmov(pos + db_len,
";table=");
8125 pos= strmov(pos + table_name_len,
";file_id=");
8126 pos= int10_to_str((
long) file_id, pos, 10);
8127 pos= strmov(pos,
";block_len=");
8128 pos= int10_to_str((
long) block_len, pos, 10);
8129 protocol->
store(buf, (uint) (pos-buf), &my_charset_bin);
8146 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8147 int Create_file_log_event::do_apply_event(
Relay_log_info const *rli)
8157 THD_STAGE_INFO(thd, stage_making_temp_file_create_before_load_data);
8158 memset(&file, 0,
sizeof(file));
8159 ext= slave_load_file_stem(fname_buf, file_id, server_id,
".info");
8166 DBUG_EXECUTE_IF(
"simulate_file_create_error_create_log_event",
8168 strcat(fname_buf,
"/");
8171 fname_buf, CREATE_MODE,
8172 O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
8173 MYF(MY_WME))) < 0 ||
8174 init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
8175 MYF(MY_WME|MY_NABP)))
8177 rli->
report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
8178 "Error in Create_file event: could not open file '%s'",
8185 fname_len= (uint) (strmov(ext,
".data") - fname);
8186 if (write_base(&file))
8188 strmov(ext,
".info");
8189 rli->
report(ERROR_LEVEL, my_errno,
8190 "Error in Create_file event: could not write to file '%s'",
8194 end_io_cache(&file);
8201 fname_buf, CREATE_MODE,
8202 O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
8205 rli->
report(ERROR_LEVEL, my_errno,
8206 "Error in Create_file event: could not open file '%s'",
8214 DBUG_EXECUTE_IF(
"simulate_file_write_error_create_log_event",
8220 rli->
report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
8221 "Error in Create_file event: write to '%s' failed",
8229 end_io_cache(&file);
8245 #ifndef MYSQL_CLIENT
8252 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
8256 block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
8270 DBUG_ENTER(
"Append_block_log_event::Append_block_log_event(char*,...)");
8271 uint8 common_header_len= description_event->common_header_len;
8272 uint8 append_block_header_len=
8273 description_event->post_header_len[APPEND_BLOCK_EVENT-1];
8274 uint total_header_len= common_header_len+append_block_header_len;
8275 if (len < total_header_len)
8277 file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
8278 block= (uchar*)buf + total_header_len;
8279 block_len= len - total_header_len;
8288 #ifndef MYSQL_CLIENT
8289 bool Append_block_log_event::write(
IO_CACHE* file)
8291 uchar buf[APPEND_BLOCK_HEADER_LEN];
8292 int4store(buf + AB_FILE_ID_OFFSET, file_id);
8293 return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
8294 wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
8295 wrapper_my_b_safe_write(file, (uchar*)
block, block_len) ||
8296 write_footer(file));
8306 void Append_block_log_event::print(FILE* file,
8307 PRINT_EVENT_INFO* print_event_info)
8309 if (print_event_info->short_form)
8311 print_header(&print_event_info->head_cache, print_event_info, FALSE);
8312 my_b_printf(&print_event_info->head_cache,
8313 "\n#%s: file_id: %d block_len: %d\n",
8323 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8324 int Append_block_log_event::pack_info(
Protocol *protocol)
8328 length= my_snprintf(buf,
sizeof(buf),
";file_id=%u;block_len=%u",
8329 file_id, block_len);
8330 protocol->
store(buf, length, &my_charset_bin);
8339 int Append_block_log_event::get_create_or_append()
const
8348 int Append_block_log_event::do_apply_event(
Relay_log_info const *rli)
8353 DBUG_ENTER(
"Append_block_log_event::do_apply_event");
8355 THD_STAGE_INFO(thd, stage_making_temp_file_append_before_load_data);
8356 slave_load_file_stem(fname, file_id, server_id,
".data");
8357 if (get_create_or_append())
8369 O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
8372 rli->
report(ERROR_LEVEL, my_errno,
8373 "Error in %s event: could not create file '%s'",
8380 O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
8383 rli->
report(ERROR_LEVEL, my_errno,
8384 "Error in %s event: could not open file '%s'",
8389 DBUG_EXECUTE_IF(
"remove_slave_load_file_before_write",
8391 my_delete_allow_opened(fname, MYF(0));
8396 rli->
report(ERROR_LEVEL, my_errno,
8397 "Error in %s event: write to '%s' failed",
8419 #ifndef MYSQL_CLIENT
8420 Delete_file_log_event::Delete_file_log_event(THD *thd_arg,
const char* db_arg,
8423 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
8426 file_id(thd_arg->file_id), db(db_arg)
8435 Delete_file_log_event::Delete_file_log_event(
const char* buf, uint len,
8437 :
Log_event(buf, description_event),file_id(0)
8439 uint8 common_header_len= description_event->common_header_len;
8440 uint8 delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
8441 if (len < (uint)(common_header_len + delete_file_header_len))
8443 file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
8451 #ifndef MYSQL_CLIENT
8452 bool Delete_file_log_event::write(
IO_CACHE* file)
8454 uchar buf[DELETE_FILE_HEADER_LEN];
8455 int4store(buf + DF_FILE_ID_OFFSET, file_id);
8456 return (write_header(file,
sizeof(buf)) ||
8457 wrapper_my_b_safe_write(file, buf,
sizeof(buf)) ||
8458 write_footer(file));
8468 void Delete_file_log_event::print(FILE* file,
8469 PRINT_EVENT_INFO* print_event_info)
8471 if (print_event_info->short_form)
8473 print_header(&print_event_info->head_cache, print_event_info, FALSE);
8474 my_b_printf(&print_event_info->head_cache,
8475 "\n#Delete_file: file_id=%u\n", file_id);
8483 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8484 int Delete_file_log_event::pack_info(
Protocol *protocol)
8488 length= my_snprintf(buf,
sizeof(buf),
";file_id=%u", (uint) file_id);
8489 protocol->
store(buf, length, &my_charset_bin);
8498 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8499 int Delete_file_log_event::do_apply_event(
Relay_log_info const *rli)
8504 char *ext= slave_load_file_stem(fname, file_id, server_id,
".data");
8506 strmov(ext,
".info");
8521 #ifndef MYSQL_CLIENT
8522 Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
8526 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
8529 file_id(thd_arg->file_id), db(db_arg)
8539 Execute_load_log_event::Execute_load_log_event(
const char* buf, uint len,
8541 :
Log_event(buf, description_event), file_id(0)
8543 uint8 common_header_len= description_event->common_header_len;
8544 uint8 exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
8545 if (len < (uint)(common_header_len+exec_load_header_len))
8547 file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
8555 #ifndef MYSQL_CLIENT
8556 bool Execute_load_log_event::write(
IO_CACHE* file)
8558 uchar buf[EXEC_LOAD_HEADER_LEN];
8559 int4store(buf + EL_FILE_ID_OFFSET, file_id);
8560 return (write_header(file,
sizeof(buf)) ||
8561 wrapper_my_b_safe_write(file, buf,
sizeof(buf)) ||
8562 write_footer(file));
8572 void Execute_load_log_event::print(FILE* file,
8573 PRINT_EVENT_INFO* print_event_info)
8575 if (print_event_info->short_form)
8577 print_header(&print_event_info->head_cache, print_event_info, FALSE);
8578 my_b_printf(&print_event_info->head_cache,
"\n#Exec_load: file_id=%d\n",
8587 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8588 int Execute_load_log_event::pack_info(
Protocol *protocol)
8592 length= my_snprintf(buf,
sizeof(buf),
";file_id=%u", (uint) file_id);
8593 protocol->
store(buf, length, &my_charset_bin);
8602 int Execute_load_log_event::do_apply_event(
Relay_log_info const *rli)
8613 ext= slave_load_file_stem(fname, file_id, server_id,
".info");
8619 DBUG_EXECUTE_IF(
"simulate_file_open_error_exec_event",
8624 fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
8625 MYF(MY_WME))) < 0 ||
8626 init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
8627 MYF(MY_WME|MY_NABP)))
8629 rli->
report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
8630 "Error in Exec_load event: could not open file '%s'",
8635 Log_event::read_log_event(&file,
8638 opt_slave_sql_verify_checksum)) ||
8639 lev->get_type_code() != NEW_LOAD_EVENT)
8641 rli->
report(ERROR_LEVEL, 0,
"Error in Exec_load event: "
8642 "file '%s' appears corrupted", fname);
8653 const_cast<Relay_log_info*
>(rli)->set_future_group_master_log_pos(log_pos);
8654 if (lev->do_apply_event(0,rli,1))
8664 char *tmp= my_strdup(rli->last_error().
message, MYF(MY_WME));
8668 "%s. Failed executing load from '%s'", tmp, fname);
8680 end_io_cache(&file);
8684 memcpy(ext,
".data", 6);
8693 end_io_cache(&file);
8705 #ifndef MYSQL_CLIENT
8706 Begin_load_query_log_event::
8707 Begin_load_query_log_event(THD* thd_arg,
const char* db_arg, uchar* block_arg,
8708 uint block_len_arg,
bool using_trans)
8712 file_id= thd_arg->file_id= mysql_bin_log.next_file_id();
8717 Begin_load_query_log_event::
8718 Begin_load_query_log_event(
const char* buf, uint len,
8725 #if defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8726 int Begin_load_query_log_event::get_create_or_append()
const
8733 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
8741 return continue_group(rli);
8751 #ifndef MYSQL_CLIENT
8752 Execute_load_query_log_event::
8753 Execute_load_query_log_event(THD *thd_arg,
const char* query_arg,
8754 ulong query_length_arg, uint fn_pos_start_arg,
8755 uint fn_pos_end_arg,
8756 enum_load_dup_handling dup_handling_arg,
8757 bool using_trans,
bool immediate,
bool suppress_use,
8759 Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, immediate,
8760 suppress_use, errcode),
8761 file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
8762 fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
8768 Execute_load_query_log_event::
8769 Execute_load_query_log_event(
const char* buf, uint event_len,
8771 Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
8772 file_id(0), fn_pos_start(0), fn_pos_end(0)
8774 if (!Query_log_event::is_valid())
8777 buf+= desc_event->common_header_len;
8779 fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
8780 fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
8781 dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
8783 if (fn_pos_start > q_len || fn_pos_end > q_len ||
8784 dup_handling > LOAD_DUP_REPLACE)
8787 file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
8791 ulong Execute_load_query_log_event::get_post_header_size_for_derived()
8793 return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
8797 #ifndef MYSQL_CLIENT
8799 Execute_load_query_log_event::write_post_header_for_derived(
IO_CACHE* file)
8801 uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
8802 int4store(buf, file_id);
8803 int4store(buf + 4, fn_pos_start);
8804 int4store(buf + 4 + 4, fn_pos_end);
8805 *(buf + 4 + 4 + 4)= (uchar) dup_handling;
8806 return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
8812 void Execute_load_query_log_event::print(FILE* file,
8813 PRINT_EVENT_INFO* print_event_info)
8815 print(file, print_event_info, 0);
8821 void Execute_load_query_log_event::print(FILE* file,
8822 PRINT_EVENT_INFO* print_event_info,
8823 const char *local_fname)
8825 IO_CACHE *
const head= &print_event_info->head_cache;
8827 print_query_header(head, print_event_info);
8832 DBUG_EXECUTE_IF (
"simulate_execute_event_write_error",
8833 {head->write_pos= head->write_end;
8834 DBUG_SET(
"+d,simulate_file_write_error");});
8838 my_b_write(head, (uchar*)
query, fn_pos_start);
8839 my_b_printf(head,
" LOCAL INFILE \'");
8840 my_b_printf(head,
"%s", local_fname);
8841 my_b_printf(head,
"\'");
8842 if (dup_handling == LOAD_DUP_REPLACE)
8843 my_b_printf(head,
" REPLACE");
8844 my_b_printf(head,
" INTO");
8845 my_b_write(head, (uchar*)
query + fn_pos_end, q_len-fn_pos_end);
8846 my_b_printf(head,
"\n%s\n", print_event_info->delimiter);
8850 my_b_write(head, (uchar*)
query, q_len);
8851 my_b_printf(head,
"\n%s\n", print_event_info->delimiter);
8854 if (!print_event_info->short_form)
8855 my_b_printf(head,
"# file_id: %d \n", file_id);
8860 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8861 int Execute_load_query_log_event::pack_info(
Protocol *protocol)
8864 if (!(buf= (
char*) my_malloc(9 + (db_len * 2) + 2 + q_len + 10 + 21,
8875 char quoted_db[1 + NAME_LEN * 2 + 2];
8877 size= my_strmov_quoted_identifier(this->thd, quoted_db, db, 0);
8878 pos= strmov(buf,
"use ");
8879 memcpy(pos, quoted_db, size);
8880 pos= strmov(pos + size,
"; ");
8884 memcpy(pos,
query, q_len);
8887 pos= strmov(pos,
" ;file_id=");
8888 pos= int10_to_str((
long) file_id, pos, 10);
8889 protocol->
store(buf, pos-buf, &my_charset_bin);
8896 Execute_load_query_log_event::do_apply_event(
Relay_log_info const *rli)
8904 buf= (
char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
8907 DBUG_EXECUTE_IF(
"LOAD_DATA_INFILE_has_fatal_error", my_free(buf); buf= NULL;);
8912 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
8913 ER(ER_SLAVE_FATAL_ERROR),
"Not enough memory");
8918 memcpy(p,
query, fn_pos_start);
8920 fname= (p= strmake(p, STRING_WITH_LEN(
" INFILE \'")));
8921 p= slave_load_file_stem(p, file_id, server_id,
".data");
8922 fname_end= p= strend(p);
8924 switch (dup_handling) {
8925 case LOAD_DUP_IGNORE:
8926 p= strmake(p, STRING_WITH_LEN(
" IGNORE"));
8928 case LOAD_DUP_REPLACE:
8929 p= strmake(p, STRING_WITH_LEN(
" REPLACE"));
8935 p= strmake(p, STRING_WITH_LEN(
" INTO "));
8936 p= strmake(p,
query+fn_pos_end, q_len-fn_pos_end);
8938 error= Query_log_event::do_apply_event(rli, buf, p-buf);
8968 return (write_str_at_most_255_bytes(file, field_term, (uint) field_term_len) ||
8969 write_str_at_most_255_bytes(file, enclosed, (uint) enclosed_len) ||
8970 write_str_at_most_255_bytes(file, line_term, (uint) line_term_len) ||
8971 write_str_at_most_255_bytes(file, line_start, (uint) line_start_len) ||
8972 write_str_at_most_255_bytes(file, escaped, (uint) escaped_len) ||
8973 my_b_safe_write(file,(uchar*) &opt_flags,1));
8982 old_ex.field_term= *field_term;
8983 old_ex.enclosed= *enclosed;
8984 old_ex.line_term= *line_term;
8985 old_ex.line_start= *line_start;
8986 old_ex.escaped= *escaped;
8987 old_ex.opt_flags= opt_flags;
8988 old_ex.empty_flags=empty_flags;
8989 return my_b_safe_write(file, (uchar*) &old_ex,
sizeof(old_ex)) != 0;
8998 const char *sql_ex_info::init(
const char *buf,
const char *buf_end,
8999 bool use_new_format)
9001 cached_new_format = use_new_format;
9012 if (read_str_at_most_255_bytes(&buf, buf_end, &field_term, &field_term_len) ||
9013 read_str_at_most_255_bytes(&buf, buf_end, &enclosed, &enclosed_len) ||
9014 read_str_at_most_255_bytes(&buf, buf_end, &line_term, &line_term_len) ||
9015 read_str_at_most_255_bytes(&buf, buf_end, &line_start, &line_start_len) ||
9016 read_str_at_most_255_bytes(&buf, buf_end, &escaped, &escaped_len))
9022 field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
9029 empty_flags= *buf++;
9030 if (empty_flags & FIELD_TERM_EMPTY)
9032 if (empty_flags & ENCLOSED_EMPTY)
9034 if (empty_flags & LINE_TERM_EMPTY)
9036 if (empty_flags & LINE_START_EMPTY)
9038 if (empty_flags & ESCAPED_EMPTY)
9045 #ifndef MYSQL_CLIENT
9046 static uchar dbug_extra_row_data_val= 0;
9058 const uchar* set_extra_data(uchar* arr)
9060 uchar val= (dbug_extra_row_data_val++) %
9061 (EXTRA_ROW_INFO_MAX_PAYLOAD + 1);
9062 arr[EXTRA_ROW_INFO_LEN_OFFSET]= val + EXTRA_ROW_INFO_HDR_BYTES;
9063 arr[EXTRA_ROW_INFO_FORMAT_OFFSET]= val;
9064 for (uchar i=0; i<val; i++)
9065 arr[EXTRA_ROW_INFO_HDR_BYTES+i]= val;
9070 #endif // #ifndef MYSQL_CLIENT
9084 void check_extra_data(uchar* extra_row_data)
9086 assert(extra_row_data);
9087 uint16 len= extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
9088 uint8 val= len - EXTRA_ROW_INFO_HDR_BYTES;
9089 assert(extra_row_data[EXTRA_ROW_INFO_FORMAT_OFFSET] == val);
9090 for (uint16 i= 0; i < val; i++)
9092 assert(extra_row_data[EXTRA_ROW_INFO_HDR_BYTES + i] == val);
9096 #endif // #ifndef DBUG_OFF
9102 #ifndef MYSQL_CLIENT
9103 Rows_log_event::Rows_log_event(THD *thd_arg,
TABLE *tbl_arg,
const Table_id& tid,
9104 MY_BITMAP const *cols,
bool using_trans,
9106 const uchar* extra_row_info)
9108 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
9114 m_width(tbl_arg ? tbl_arg->s->fields : 1),
9115 m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0),
9116 m_type(event_type), m_extra_row_data(0)
9117 #ifdef HAVE_REPLICATION
9118 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL), last_hashed_key(NULL)
9121 DBUG_ASSERT(tbl_arg && tbl_arg->s && tid.is_valid());
9124 set_flags(NO_FOREIGN_KEY_CHECKS_F);
9126 set_flags(RELAXED_UNIQUE_CHECKS_F);
9128 uchar extra_data[255];
9129 DBUG_EXECUTE_IF(
"extra_row_data_set",
9131 extra_row_info = set_extra_data(extra_data););
9136 uint8 extra_data_len= extra_row_info[EXTRA_ROW_INFO_LEN_OFFSET];
9137 assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
9139 m_extra_row_data= (uchar*) my_malloc(extra_data_len, MYF(MY_WME));
9141 if (likely(m_extra_row_data != NULL))
9143 memcpy(m_extra_row_data, extra_row_info,
9149 if (likely(!bitmap_init(&m_cols,
9150 m_width <=
sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
9155 if (likely(cols != NULL))
9157 memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
9158 create_last_word_mask(&m_cols);
9169 Rows_log_event::Rows_log_event(
const char *buf, uint event_len,
9174 #ifndef MYSQL_CLIENT
9177 m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
9179 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
9180 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL), last_hashed_key(NULL)
9183 DBUG_ENTER(
"Rows_log_event::Rows_log_event(const char*,...)");
9184 uint8
const common_header_len= description_event->common_header_len;
9188 uint8
const post_header_len= description_event->post_header_len[event_type-1];
9190 DBUG_PRINT(
"enter",(
"event_len: %u common_header_len: %d "
9191 "post_header_len: %d",
9192 event_len, common_header_len,
9195 const char *post_start= buf + common_header_len;
9196 post_start+= RW_MAPID_OFFSET;
9197 if (post_header_len == 6)
9200 m_table_id= uint4korr(post_start);
9205 m_table_id= uint6korr(post_start);
9206 post_start+= RW_FLAGS_OFFSET;
9209 m_flags= uint2korr(post_start);
9212 uint16 var_header_len= 0;
9213 if (post_header_len == ROWS_HEADER_LEN_V2)
9219 var_header_len= uint2korr(post_start);
9220 assert(var_header_len >= 2);
9224 const char* start= post_start + 2;
9225 const char* end= start + var_header_len;
9226 for (
const char* pos= start; pos < end;)
9230 case RW_V_EXTRAINFO_TAG:
9233 assert((end - pos) >= EXTRA_ROW_INFO_HDR_BYTES);
9234 uint8 infoLen= pos[EXTRA_ROW_INFO_LEN_OFFSET];
9235 assert((end - pos) >= infoLen);
9237 if (likely(!m_extra_row_data))
9239 m_extra_row_data= (uchar*) my_malloc(infoLen,
9241 if (likely(m_extra_row_data != NULL))
9243 memcpy(m_extra_row_data, pos, infoLen);
9245 DBUG_EXECUTE_IF(
"extra_row_data_check",
9247 check_extra_data(m_extra_row_data););
9259 uchar
const *
const var_start=
9260 (
const uchar *)buf + common_header_len + post_header_len + var_header_len;
9261 uchar
const *
const ptr_width= var_start;
9262 uchar *ptr_after_width= (uchar*) ptr_width;
9263 DBUG_PRINT(
"debug", (
"Reading from %p", ptr_after_width));
9264 m_width = net_field_length(&ptr_after_width);
9265 DBUG_PRINT(
"debug", (
"m_width=%lu", m_width));
9267 if (likely(!bitmap_init(&m_cols,
9268 m_width <=
sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
9272 DBUG_PRINT(
"debug", (
"Reading from %p", ptr_after_width));
9273 memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
9274 create_last_word_mask(&m_cols);
9275 ptr_after_width+= (m_width + 7) / 8;
9276 DBUG_DUMP(
"m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
9281 m_cols.bitmap= NULL;
9285 m_cols_ai.bitmap= m_cols.bitmap;
9287 if ((event_type == UPDATE_ROWS_EVENT) ||
9288 (event_type == UPDATE_ROWS_EVENT_V1))
9290 DBUG_PRINT(
"debug", (
"Reading from %p", ptr_after_width));
9293 if (likely(!bitmap_init(&m_cols_ai,
9294 m_width <=
sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
9298 DBUG_PRINT(
"debug", (
"Reading from %p", ptr_after_width));
9299 memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
9300 create_last_word_mask(&m_cols_ai);
9301 ptr_after_width+= (m_width + 7) / 8;
9302 DBUG_DUMP(
"m_cols_ai", (uchar*) m_cols_ai.bitmap,
9303 no_bytes_in_map(&m_cols_ai));
9308 m_cols_ai.bitmap= 0;
9313 const uchar*
const ptr_rows_data= (
const uchar*) ptr_after_width;
9315 size_t const data_size= event_len - (ptr_rows_data - (
const uchar *) buf);
9316 DBUG_PRINT(
"info",(
"m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
9317 m_table_id.id(), m_flags, m_width, (ulong) data_size));
9320 m_rows_buf= (uchar*) my_malloc(data_size + 1, MYF(MY_WME));
9321 if (likely((
bool)m_rows_buf))
9323 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
9324 m_curr_row= m_rows_buf;
9326 m_rows_end= m_rows_buf + data_size;
9327 m_rows_cur= m_rows_end;
9328 memcpy(m_rows_buf, ptr_rows_data, data_size);
9336 Rows_log_event::~Rows_log_event()
9338 if (m_cols.bitmap == m_bitbuf)
9340 bitmap_free(&m_cols);
9341 my_free(m_rows_buf);
9342 my_free(m_extra_row_data);
9345 int Rows_log_event::get_data_size()
9347 int const general_type_code= get_general_type_code();
9349 uchar buf[
sizeof(m_width) + 1];
9350 uchar *end= net_store_length(buf, m_width);
9352 DBUG_EXECUTE_IF(
"old_row_based_repl_4_byte_map_id_master",
9353 return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
9354 (general_type_code == UPDATE_ROWS_EVENT ? no_bytes_in_map(&m_cols_ai) : 0) +
9355 (m_rows_cur - m_rows_buf););
9358 bool is_v2_event= get_type_code() > DELETE_ROWS_EVENT_V1;
9361 data_size= ROWS_HEADER_LEN_V2 +
9363 RW_V_TAG_LEN + m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET]:
9368 data_size= ROWS_HEADER_LEN_V1;
9370 data_size+= no_bytes_in_map(&m_cols);
9371 data_size+= (uint) (end - buf);
9373 if (general_type_code == UPDATE_ROWS_EVENT)
9374 data_size+= no_bytes_in_map(&m_cols_ai);
9376 data_size+= (uint) (m_rows_cur - m_rows_buf);
9381 #ifndef MYSQL_CLIENT
9382 int Rows_log_event::do_add_row_data(uchar *row_data,
size_t length)
9389 DBUG_ENTER(
"Rows_log_event::do_add_row_data");
9390 DBUG_PRINT(
"enter", (
"row_data: 0x%lx length: %lu", (ulong) row_data,
9409 DBUG_DUMP(
"row_data", row_data, min<size_t>(length, 32));
9412 DBUG_ASSERT(m_rows_buf <= m_rows_cur);
9413 DBUG_ASSERT(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
9414 DBUG_ASSERT(m_rows_cur <= m_rows_end);
9417 if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
9419 size_t const block_size= 1024;
9420 my_ptrdiff_t
const cur_size= m_rows_cur - m_rows_buf;
9421 my_ptrdiff_t
const new_alloc=
9422 block_size * ((cur_size + length + block_size - 1) / block_size);
9425 uchar*
const new_buf=
9426 (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc + 1,
9427 MYF(MY_ALLOW_ZERO_PTR|MY_WME));
9428 if (unlikely(!new_buf))
9429 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
9432 if (new_buf != m_rows_buf)
9434 m_rows_buf= new_buf;
9435 m_rows_cur= m_rows_buf + cur_size;
9442 m_rows_end= m_rows_buf + new_alloc;
9445 DBUG_ASSERT(m_rows_cur + length <= m_rows_end);
9446 memcpy(m_rows_cur, row_data, length);
9447 m_rows_cur+= length;
9453 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
9487 my_bool is_any_column_signaled_for_table(
TABLE *table,
MY_BITMAP *cols)
9489 DBUG_ENTER(
"is_any_column_signaled_for_table");
9491 for (
Field **ptr= table->field ;
9492 *ptr && ((*ptr)->field_index < cols->n_bits);
9495 if (bitmap_is_set(cols, (*ptr)->field_index))
9499 DBUG_RETURN (FALSE);
9534 my_bool are_all_columns_signaled_for_key(
KEY *keyinfo,
MY_BITMAP *cols)
9536 DBUG_ENTER(
"are_all_columns_signaled_for_key");
9540 uint fieldnr= keyinfo->key_part[
i].fieldnr - 1;
9541 if (fieldnr >= cols->n_bits ||
9542 !bitmap_is_set(cols, fieldnr))
9590 search_key_in_table(
TABLE *table,
MY_BITMAP *bi_cols, uint key_type)
9592 DBUG_ENTER(
"search_key_in_table");
9598 if (key_type & PRI_KEY_FLAG &&
9599 (table->s->primary_key < MAX_KEY))
9601 DBUG_PRINT(
"debug", (
"Searching for PK"));
9602 keyinfo= table->s->key_info + (uint) table->s->primary_key;
9603 if (are_all_columns_signaled_for_key(keyinfo, bi_cols))
9604 DBUG_RETURN(table->s->primary_key);
9607 DBUG_PRINT(
"debug", (
"Unique keys count: %u", table->s->uniques));
9609 if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
9611 DBUG_PRINT(
"debug", (
"Searching for UK"));
9612 for (key=0,keyinfo= table->key_info ;
9613 (key < table->s->keys) && (res == MAX_KEY);
9621 if (!((keyinfo->
flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
9622 (key == table->s->primary_key))
9624 res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
9630 DBUG_PRINT(
"debug", (
"UK has NULLABLE parts or not all columns signaled."));
9633 if (key_type & MULTIPLE_KEY_FLAG && table->s->keys)
9635 DBUG_PRINT(
"debug", (
"Searching for K."));
9636 for (key=0,keyinfo= table->key_info ;
9637 (key < table->s->keys) && (res == MAX_KEY);
9646 if (!(table->s->keys_in_use.is_set(key)) ||
9647 ((keyinfo->
flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
9648 !(table->file->index_flags(key, 0,
true) & HA_READ_NEXT) ||
9649 (key == table->s->primary_key))
9652 res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
9658 DBUG_PRINT(
"debug", (
"Not all columns signaled for K."));
9665 Rows_log_event::decide_row_lookup_algorithm_and_key()
9668 DBUG_ENTER(
"decide_row_lookup_algorithm_and_key");
9687 TABLE *table= this->m_table;
9688 uint event_type= this->get_general_type_code();
9691 this->m_key_index= MAX_KEY;
9693 if (event_type == WRITE_ROWS_EVENT)
9696 if (!(slave_rows_search_algorithms_options & SLAVE_ROWS_INDEX_SCAN))
9697 goto TABLE_OR_INDEX_HASH_SCAN;
9700 this->m_key_index= search_key_in_table(table, cols, (PRI_KEY_FLAG | UNIQUE_KEY_FLAG));
9701 if (this->m_key_index != MAX_KEY)
9703 DBUG_PRINT(
"info", (
"decide_row_lookup_algorithm_and_key: decided - INDEX_SCAN"));
9708 TABLE_OR_INDEX_HASH_SCAN:
9714 if (!(slave_rows_search_algorithms_options & SLAVE_ROWS_HASH_SCAN) ||
9716 goto TABLE_OR_INDEX_FULL_SCAN;
9719 this->m_key_index= search_key_in_table(table, cols, (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG));
9721 DBUG_PRINT(
"info", (
"decide_row_lookup_algorithm_and_key: decided - HASH_SCAN"));
9724 TABLE_OR_INDEX_FULL_SCAN:
9726 this->m_key_index= MAX_KEY;
9729 if (slave_rows_search_algorithms_options & SLAVE_ROWS_INDEX_SCAN)
9730 this->m_key_index= search_key_in_table(table, cols, (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG));
9732 if (this->m_key_index != MAX_KEY)
9734 DBUG_PRINT(
"info", (
"decide_row_lookup_algorithm_and_key: decided - INDEX_SCAN"));
9739 DBUG_PRINT(
"info", (
"decide_row_lookup_algorithm_and_key: decided - TABLE_SCAN"));
9751 DBUG_PRINT(
"debug", (
"Row lookup method: %s", s));
9765 Rows_log_event::row_operations_scan_and_key_setup()
9768 DBUG_ENTER(
"Row_log_event::row_operations_scan_and_key_setup");
9778 decide_row_lookup_algorithm_and_key();
9782 case ROW_LOOKUP_HASH_SCAN:
9785 error= HA_ERR_OUT_OF_MEM;
9788 case ROW_LOOKUP_INDEX_SCAN:
9790 DBUG_ASSERT (m_key_index < MAX_KEY);
9792 m_key= (uchar*)my_malloc(MAX_KEY_LENGTH, MYF(MY_WME));
9794 error= HA_ERR_OUT_OF_MEM;
9797 case ROW_LOOKUP_TABLE_SCAN:
9813 Rows_log_event::row_operations_scan_and_key_teardown(
int error)
9815 DBUG_ENTER(
"Rows_log_event::row_operations_scan_and_key_teardown");
9817 DBUG_ASSERT(!m_table->file->inited);
9820 case ROW_LOOKUP_HASH_SCAN:
9826 case ROW_LOOKUP_INDEX_SCAN:
9828 if (m_table->s->keys > 0)
9832 m_key_index= MAX_KEY;
9837 case ROW_LOOKUP_TABLE_SCAN:
9853 DBUG_ENTER(
"record_compare");
9867 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
9868 DBUG_DUMP(
"record[1]", table->record[1], table->s->reclength);
9871 uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
9873 if (table->s->null_bytes > 0)
9875 for (
int i = 0 ; i < 2 ; ++
i)
9880 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
9882 saved_x[
i]= table->record[
i][0];
9883 table->record[
i][0]|= 1
U;
9893 if (table->s->last_null_bit_pos > 0)
9895 saved_filler[
i]= table->record[
i][table->s->null_bytes - 1];
9896 table->record[
i][table->s->null_bytes - 1]|=
9897 256
U - (1
U << table->s->last_null_bit_pos);
9913 if ((table->s->blob_fields +
9914 table->s->varchar_fields +
9915 table->s->null_fields) == 0 &&
9916 bitmap_is_set_all(cols))
9918 result= cmp_record(table,
record[1]);
9930 for (
Field **ptr=table->field ;
9931 *ptr && ((*ptr)->field_index < cols->n_bits) && !result;
9935 if (bitmap_is_set(cols, field->field_index))
9938 if (field->is_null() != field->is_null_in_record(table->record[1]))
9942 else if (!field->is_null())
9943 result= field->cmp_binary_offset(table->s->rec_buff_length);
9954 if (table->s->null_bytes > 0)
9956 for (
int i = 0 ; i < 2 ; ++
i)
9958 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
9959 table->record[
i][0]= saved_x[
i];
9961 if (table->s->last_null_bit_pos)
9962 table->record[
i][table->s->null_bytes - 1]= saved_filler[
i];
9966 DBUG_RETURN(result);
9969 void Rows_log_event::do_post_row_operations(
Relay_log_info const *rli,
int error)
9979 DBUG_PRINT(
"info", (
"curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
9980 (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
9982 if (!m_curr_row_end && !error)
9984 error= unpack_current_row(rli, &m_cols);
9988 DBUG_ASSERT(error || m_curr_row_end != NULL);
9989 DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
9990 DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
9992 m_curr_row= m_curr_row_end;
9994 if (error == 0 && !m_table->file->has_transactions())
9996 thd->transaction.all.set_unsafe_rollback_flags(TRUE);
9997 thd->transaction.stmt.set_unsafe_rollback_flags(TRUE);
10001 int Rows_log_event::handle_idempotent_and_ignored_errors(
Relay_log_info const *rli,
int *err)
10006 int actual_error= convert_handler_error(error, thd, m_table);
10007 bool idempotent_error= (idempotent_error_code(error) &&
10009 bool ignored_error= (idempotent_error == 0 ?
10010 ignored_error_code(actual_error) : 0);
10012 if (idempotent_error || ignored_error)
10014 if ( (idempotent_error && log_warnings) ||
10015 (ignored_error && log_warnings > 1) )
10016 slave_rows_error_report(WARNING_LEVEL, error, rli, thd, m_table,
10018 const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
10020 thd->get_stmt_da()->clear_warning_info(thd->query_id);
10021 clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
10023 if (idempotent_error == 0)
10024 return ignored_error;
10033 DBUG_ENTER(
"Rows_log_event::do_apply_row");
10038 THD* old_thd= m_table->in_use;
10039 if (!m_table->in_use)
10040 m_table->in_use= thd;
10042 error= do_exec_row(rli);
10046 DBUG_PRINT(
"info", (
"error: %s", HA_ERR(error)));
10047 DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
10050 m_table->in_use = old_thd;
10052 DBUG_RETURN(error);
10062 Rows_log_event::close_record_scan()
10064 DBUG_ENTER(
"Rows_log_event::close_record_scan");
10068 if (m_key_index < MAX_KEY)
10070 if (m_table->file->inited)
10071 error= m_table->file->ha_index_end();
10078 while((key_val= it++) && (key_val))
10082 else if (m_table->file->inited)
10083 error= m_table->file->ha_rnd_end();
10085 DBUG_RETURN(error);
10101 Rows_log_event::next_record_scan(
bool first_read)
10103 DBUG_ENTER(
"Rows_log_event::next_record_scan");
10104 DBUG_ASSERT(m_table->file->inited);
10105 TABLE *table= m_table;
10108 if (m_key_index >= MAX_KEY)
10109 error= table->file->
ha_rnd_next(table->record[0]);
10112 KEY *keyinfo= m_table->key_info + m_key_index;
10119 if (table->s->null_bytes > 0)
10120 table->record[0][table->s->null_bytes - 1]|=
10121 256
U - (1
U << table->s->last_null_bit_pos);
10139 (key_cmp(keyinfo->key_part, m_key, keyinfo->
key_length) != 0))
10141 if ((m_key= m_itr++))
10144 error= HA_ERR_KEY_NOT_FOUND;
10151 HA_READ_KEY_EXACT)))
10153 DBUG_PRINT(
"info",(
"no record matching the key found in the table"));
10154 if (error == HA_ERR_RECORD_DELETED)
10155 error= HA_ERR_KEY_NOT_FOUND;
10159 DBUG_RETURN(error);
10168 Rows_log_event::open_record_scan()
10171 TABLE *table= m_table;
10172 DBUG_ENTER(
"Rows_log_event::open_record_scan");
10174 if (m_key_index < MAX_KEY )
10176 KEY *keyinfo= m_table->key_info + m_key_index;
10180 m_itr.init(m_distinct_key_list);
10190 key_copy(m_key, m_table->record[0], keyinfo, 0);
10197 store_record(table,
record[1]);
10199 DBUG_PRINT(
"info",(
"locating record using a key (index_read)"));
10202 if (!table->file->inited && (error= table->file->
ha_index_init(m_key_index, FALSE)))
10204 DBUG_PRINT(
"info",(
"ha_index_init returns error %d",error));
10212 #ifndef HAVE_purify
10213 DBUG_DUMP(
"key data", m_key, keyinfo->
key_length);
10220 DBUG_PRINT(
"info",(
"error initializing table scan"
10221 " (ha_rnd_init returns %d)",error));
10227 DBUG_RETURN(error);
10237 Rows_log_event::add_key_to_distinct_keyset()
10240 bool distinct=
true;
10241 DBUG_ENTER(
"Rows_log_event::add_key_to_distinct_keyset");
10242 DBUG_ASSERT(m_key_index < MAX_KEY);
10243 KEY *cur_key_info= m_table->key_info + m_key_index;
10245 if ((last_hashed_key))
10246 distinct= key_cmp(cur_key_info->key_part, last_hashed_key,
10251 uchar *cur_key= (uchar *)my_malloc(cur_key_info->
key_length,
10255 error= HA_ERR_OUT_OF_MEM;
10258 m_distinct_key_list.push_back(cur_key);
10259 last_hashed_key= cur_key;
10260 key_copy(cur_key, m_table->record[0], cur_key_info, 0);
10264 DBUG_RETURN(error);
10268 int Rows_log_event::do_index_scan_and_update(
Relay_log_info const *rli)
10270 DBUG_ENTER(
"Rows_log_event::do_index_scan_and_update");
10271 DBUG_ASSERT(m_table && m_table->in_use != NULL);
10273 KEY *keyinfo= NULL;
10274 TABLE *table= m_table;
10276 const uchar *saved_m_curr_row= m_curr_row;
10285 prepare_record(table, &m_cols, FALSE);
10286 if ((error= unpack_current_row(rli, &m_cols)))
10290 memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
10300 if (m_key_index >= MAX_KEY)
10302 error= HA_ERR_END_OF_FILE;
10307 DBUG_PRINT(
"info",(
"looking for the following record"));
10308 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
10311 if (m_key_index != m_table->s->primary_key)
10315 if ((table->file->
ha_table_flags() & HA_READ_BEFORE_WRITE_REMOVAL))
10321 DBUG_PRINT(
"info", (
"using read before write removal"));
10322 DBUG_ASSERT(m_key_index == m_table->s->primary_key);
10328 table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
10333 if ((table->file->
ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
10354 DBUG_PRINT(
"info",(
"locating record using primary key (position)"));
10355 if (table->file->inited && (error= table->file->
ha_index_end()))
10366 DBUG_PRINT(
"info",(
"rnd_pos returns error %d",error));
10367 if (error == HA_ERR_RECORD_DELETED)
10368 error= HA_ERR_KEY_NOT_FOUND;
10379 keyinfo= table->key_info + m_key_index;
10381 if ((error= open_record_scan()))
10384 error= next_record_scan(
true);
10387 DBUG_PRINT(
"info",(
"no record matching the key found in the table"));
10388 if (error == HA_ERR_RECORD_DELETED)
10389 error= HA_ERR_KEY_NOT_FOUND;
10398 #ifndef HAVE_purify
10399 DBUG_PRINT(
"info",(
"found first matching record"));
10400 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
10416 if (keyinfo->
flags & HA_NOSAME || m_key_index == table->s->primary_key)
10419 if (!(keyinfo->
flags & (HA_NULL_PART_KEY)))
10427 bool null_found= FALSE;
10430 uint fieldnr= keyinfo->key_part[
i].fieldnr - 1;
10431 Field **f= table->field+fieldnr;
10432 null_found= (*f)->is_null();
10447 DBUG_PRINT(
"info",(
"non-unique index, scanning it to find matching record"));
10449 while (record_compare(table, &m_cols))
10451 while((error= next_record_scan(
false)))
10454 if (error == HA_ERR_RECORD_DELETED)
10456 DBUG_PRINT(
"info",(
"no record matching the given row found"));
10463 DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
10465 if (error && error != HA_ERR_RECORD_DELETED)
10468 error= do_apply_row(rli);
10471 error= close_record_scan();
10477 (void) close_record_scan();
10479 if ((get_general_type_code() == UPDATE_ROWS_EVENT) &&
10480 (saved_m_curr_row == m_curr_row))
10483 m_curr_row= m_curr_row_end;
10484 unpack_current_row(rli, &m_cols);
10486 table->default_column_bitmaps();
10487 DBUG_RETURN(error);
10493 DBUG_ENTER(
"Rows_log_event::do_hash_row");
10494 DBUG_ASSERT(m_table && m_table->in_use != NULL);
10501 entry->positions->
bi_start= m_curr_row;
10502 prepare_record(m_table, &m_cols,
false);
10503 if ((error= unpack_current_row(rli, &m_cols)))
10505 entry->positions->bi_ends= m_curr_row_end;
10514 if (m_key_index < MAX_KEY)
10515 add_key_to_distinct_keyset();
10522 if (get_general_type_code() == UPDATE_ROWS_EVENT)
10525 store_record(m_table,
record[1]);
10536 m_curr_row= m_curr_row_end;
10539 prepare_record(m_table, &m_cols,
false);
10540 error= unpack_current_row(rli, &m_cols_ai);
10551 restore_record(m_table,
record[1]);
10555 DBUG_RETURN(error);
10558 int Rows_log_event::do_scan_and_update(
Relay_log_info const *rli)
10560 DBUG_ENTER(
"Rows_log_event::do_scan_and_update");
10561 DBUG_ASSERT(m_table && m_table->in_use != NULL);
10563 TABLE *table= m_table;
10565 const uchar *saved_last_m_curr_row= NULL;
10566 const uchar *saved_last_m_curr_row_end= NULL;
10569 int idempotent_errors= 0;
10572 saved_last_m_curr_row=m_curr_row;
10573 saved_last_m_curr_row_end=m_curr_row_end;
10575 DBUG_PRINT(
"info",(
"Hash was populated with %d records!",
m_hash.
size()));
10578 if ((error= open_record_scan()))
10588 error= next_record_scan(i == 0);
10592 DBUG_PRINT(
"info", (
"error: %s", HA_ERR(error)));
10597 store_record(table,
record[1]);
10607 m_curr_row= entry->positions->
bi_start;
10608 m_curr_row_end= entry->positions->bi_ends;
10610 prepare_record(table, &m_cols,
false);
10611 if ((error= unpack_current_row(rli, &m_cols)))
10614 if (record_compare(table, &m_cols))
10626 restore_record(table,
record[1]);
10639 m_curr_row= entry->positions->
bi_start;
10640 m_curr_row_end= entry->positions->bi_ends;
10646 if ((error= do_apply_row(rli)))
10648 if (handle_idempotent_and_ignored_errors(rli, &error))
10651 do_post_row_operations(rli, error);
10657 case HA_ERR_RECORD_DELETED:
10661 case HA_ERR_KEY_NOT_FOUND:
10664 if (handle_idempotent_and_ignored_errors(rli, &error))
10666 idempotent_errors++;
10669 case HA_ERR_END_OF_FILE:
10682 (!error || (error == HA_ERR_RECORD_DELETED)));
10685 if (error == HA_ERR_RECORD_DELETED)
10690 table->file->print_error(error, MYF(0));
10691 DBUG_PRINT(
"info", (
"Failed to get next record"
10692 " (ha_rnd_next returns %d)",error));
10697 (void) close_record_scan();
10700 error= close_record_scan();
10704 ((error) || (idempotent_errors >=
m_hash.
size()))));
10714 m_curr_row= saved_last_m_curr_row;
10715 m_curr_row_end= saved_last_m_curr_row_end;
10718 DBUG_RETURN(error);
10721 int Rows_log_event::do_hash_scan_and_update(
Relay_log_info const *rli)
10723 DBUG_ENTER(
"Rows_log_event::do_hash_scan_and_update");
10724 DBUG_ASSERT(m_table && m_table->in_use != NULL);
10729 if (
int error= this->do_hash_row(rli))
10730 DBUG_RETURN(error);
10733 if (m_curr_row_end < m_rows_end)
10736 DBUG_PRINT(
"info",(
"Hash was populated with %d records!",
m_hash.
size()));
10737 DBUG_ASSERT(m_curr_row_end == m_rows_end);
10741 DBUG_RETURN(this->do_scan_and_update(rli));
10744 int Rows_log_event::do_table_scan_and_update(
Relay_log_info const *rli)
10747 const uchar* saved_m_curr_row= m_curr_row;
10748 TABLE* table= m_table;
10750 DBUG_ENTER(
"Rows_log_event::do_table_scan_and_update");
10751 DBUG_ASSERT(m_curr_row != m_rows_end);
10752 DBUG_PRINT(
"info",(
"locating record using table scan (ha_rnd_next)"));
10754 saved_m_curr_row= m_curr_row;
10757 prepare_record(table, &m_cols, FALSE);
10758 if (!(error= unpack_current_row(rli, &m_cols)))
10761 memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
10764 store_record(m_table,
record[1]);
10766 int restart_count= 0;
10768 if ((error= m_table->file->ha_rnd_init(1)))
10770 DBUG_PRINT(
"info",(
"error initializing table scan"
10771 " (ha_rnd_init returns %d)",error));
10778 error= m_table->file->ha_rnd_next(m_table->record[0]);
10780 DBUG_PRINT(
"info", (
"error: %s", HA_ERR(error)));
10782 case HA_ERR_END_OF_FILE:
10784 if (++restart_count < 2)
10785 error= m_table->file->ha_rnd_init(1);
10788 case HA_ERR_RECORD_DELETED:
10799 while ((error == HA_ERR_END_OF_FILE && restart_count < 2) ||
10800 (error == HA_ERR_RECORD_DELETED) ||
10801 (!error && record_compare(m_table, &m_cols)));
10806 DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
10809 if (error && error != HA_ERR_RECORD_DELETED)
10811 DBUG_PRINT(
"info", (
"Failed to get next record"
10812 " (ha_rnd_next returns %d)",error));
10813 m_table->file->print_error(error, MYF(0));
10816 error= do_apply_row(rli);
10820 error= close_record_scan();
10826 (void) close_record_scan();
10828 if ((get_general_type_code() == UPDATE_ROWS_EVENT) &&
10829 (saved_m_curr_row == m_curr_row))
10831 m_curr_row= m_curr_row_end;
10832 unpack_current_row(rli, &m_cols);
10835 table->default_column_bitmaps();
10836 DBUG_RETURN(error);
10841 DBUG_ENTER(
"Rows_log_event::do_apply_event(Relay_log_info*)");
10846 enum_gtid_statement_status state= gtid_pre_statement_checks(thd);
10847 if (state == GTID_STATEMENT_CANCEL)
10850 else if (state == GTID_STATEMENT_SKIP)
10859 DBUG_ASSERT(rli->info_thd == thd);
10885 thd->transaction.stmt.reset_unsafe_rollback_flags();
10892 thd->lex->set_stmt_row_injection();
10899 if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
10904 if (get_flags(RELAXED_UNIQUE_CHECKS_F))
10909 thd->binlog_row_event_extra_data = m_extra_row_data;
10916 uint actual_error= thd->get_stmt_da()->sql_errno();
10917 if (thd->is_slave_error || thd->is_fatal_error)
10925 rli->
report(ERROR_LEVEL, actual_error,
10926 "Error executing row event: '%s'",
10927 (actual_error ? thd->get_stmt_da()->message() :
10928 "unexpected success or fatal error"));
10929 thd->is_slave_error= 1;
10931 const_cast<Relay_log_info*
>(rli)->slave_close_thread_tables(thd);
10932 DBUG_RETURN(actual_error);
10944 DBUG_PRINT(
"debug", (
"Checking compability of tables to lock - tables_to_lock: %p",
10945 rli->tables_to_lock));
10960 for (uint i= 0 ; ptr && (i < rli->tables_to_lock_count);
10963 DBUG_ASSERT(ptr->m_tabledef_valid);
10965 if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
10966 ptr->table, &conv_table))
10968 DBUG_PRINT(
"debug", (
"Table: %s.%s is not compatible with master",
10969 ptr->table->s->db.str,
10970 ptr->table->s->table_name.str));
10975 thd->is_slave_error= 1;
10976 const_cast<Relay_log_info*
>(rli)->slave_close_thread_tables(thd);
10979 DBUG_PRINT(
"debug", (
"Table: %s.%s is compatible with master"
10980 " - conv_table: %p",
10981 ptr->table->s->db.str,
10982 ptr->table->s->table_name.str, conv_table));
10983 ptr->m_conv_table= conv_table;
11002 for (uint i=0 ; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
11003 const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
11005 #ifdef HAVE_QUERY_CACHE
11006 query_cache.invalidate_locked_for_write(rli->tables_to_lock);
11012 m_table=
const_cast<Relay_log_info*
>(rli)->m_table_map.get_table(m_table_id);
11014 DBUG_PRINT(
"debug", (
"m_table: 0x%lx, m_table_id: %llu", (ulong) m_table,
11033 thd->set_time(&when);
11035 thd->binlog_row_event_extra_data = m_extra_row_data;
11048 if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
11061 DBUG_PRINT_BITSET(
"debug",
"Setting table's read_set from: %s", &m_cols);
11063 bitmap_set_all(table->read_set);
11064 if (get_general_type_code() == DELETE_ROWS_EVENT ||
11065 get_general_type_code() == UPDATE_ROWS_EVENT)
11066 bitmap_intersect(table->read_set,&m_cols);
11068 bitmap_set_all(table->write_set);
11071 MY_BITMAP *after_image= ((get_general_type_code() == UPDATE_ROWS_EVENT) ?
11072 &m_cols_ai : &m_cols);
11073 bitmap_intersect(table->write_set, after_image);
11078 error= do_before_row_operations(rli);
11085 ulong saved_sql_mode= thd->variables.sql_mode;
11086 thd->variables.sql_mode= MODE_NO_AUTO_VALUE_ON_ZERO;
11094 const_cast<Relay_log_info*
>(rli)->set_row_stmt_start_timestamp();
11096 const uchar *saved_m_curr_row= m_curr_row;
11104 if ((get_general_type_code() == UPDATE_ROWS_EVENT) &&
11105 !is_any_column_signaled_for_table(table, &m_cols_ai))
11106 goto AFTER_MAIN_EXEC_ROW_LOOP;
11116 !is_any_column_signaled_for_table(table, &m_cols))
11118 error= HA_ERR_END_OF_FILE;
11119 goto AFTER_MAIN_EXEC_ROW_LOOP;
11123 case ROW_LOOKUP_HASH_SCAN:
11124 do_apply_row_ptr= &Rows_log_event::do_hash_scan_and_update;
11127 case ROW_LOOKUP_INDEX_SCAN:
11128 do_apply_row_ptr= &Rows_log_event::do_index_scan_and_update;
11131 case ROW_LOOKUP_TABLE_SCAN:
11132 do_apply_row_ptr= &Rows_log_event::do_table_scan_and_update;
11135 case ROW_LOOKUP_NOT_NEEDED:
11136 DBUG_ASSERT(get_general_type_code() == WRITE_ROWS_EVENT);
11139 do_apply_row_ptr= &Rows_log_event::do_apply_row;
11145 goto AFTER_MAIN_EXEC_ROW_LOOP;
11151 error= (this->*do_apply_row_ptr)(rli);
11153 if (handle_idempotent_and_ignored_errors(rli, &error))
11157 do_post_row_operations(rli, error);
11159 }
while (!error && (m_curr_row != m_rows_end));
11161 AFTER_MAIN_EXEC_ROW_LOOP:
11163 if (saved_m_curr_row != m_curr_row && !table->file->has_transactions())
11180 thd->transaction.stmt.mark_modified_non_trans_table();
11181 thd->transaction.merge_unsafe_rollback_flags();
11187 thd->variables.sql_mode= saved_sql_mode;
11195 DBUG_EXECUTE_IF(
"stop_slave_middle_group",
11196 if (thd->transaction.all.cannot_safely_rollback())
11197 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
11200 if ((error= do_after_row_operations(rli, error)) &&
11201 ignored_error_code(convert_handler_error(error, thd, table)))
11204 if (log_warnings > 1)
11205 slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
11207 const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
11209 thd->get_stmt_da()->clear_warning_info(thd->query_id);
11210 clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
11217 slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
11219 const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
11228 thd->reset_current_stmt_binlog_format_row();
11229 thd->is_slave_error= 1;
11230 DBUG_RETURN(error);
11233 if (get_flags(STMT_END_F) && (error= rows_event_stmt_cleanup(rli, thd)))
11234 slave_rows_error_report(ERROR_LEVEL,
11235 thd->is_error() ? 0 : error,
11240 DBUG_RETURN(error);
11251 if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
11254 return Log_event::do_shall_skip(rli);
11268 static int rows_event_stmt_cleanup(
Relay_log_info const *rli, THD * thd)
11287 error= thd->binlog_flush_pending_rows_event(TRUE);
11300 DBUG_ASSERT(! thd->transaction_rollback_request);
11301 error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
11323 thd->reset_current_stmt_binlog_format_row();
11343 DBUG_ENTER(
"Rows_log_event::do_update_pos");
11346 DBUG_PRINT(
"info", (
"flags: %s",
11347 get_flags(STMT_END_F) ?
"STMT_END_F " :
""));
11350 DBUG_ASSERT(!is_mts_worker(rli->info_thd));
11352 if (get_flags(STMT_END_F))
11363 rli->inc_event_relay_log_pos();
11366 DBUG_RETURN(error);
11371 #ifndef MYSQL_CLIENT
11372 bool Rows_log_event::write_data_header(
IO_CACHE *file)
11374 uchar buf[ROWS_HEADER_LEN_V2];
11375 DBUG_ASSERT(m_table_id.is_valid());
11376 DBUG_EXECUTE_IF(
"old_row_based_repl_4_byte_map_id_master",
11378 int4store(buf + 0, (ulong) m_table_id.id());
11379 int2store(buf + 4, m_flags);
11380 return (wrapper_my_b_safe_write(file, buf, 6));
11382 int6store(buf + RW_MAPID_OFFSET, m_table_id.id());
11383 int2store(buf + RW_FLAGS_OFFSET, m_flags);
11385 if (likely(!log_bin_use_v1_row_events))
11392 uint16 vhpayloadlen= 0;
11393 uint16 extra_data_len= 0;
11394 if (m_extra_row_data)
11396 extra_data_len= m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
11397 vhpayloadlen= RW_V_TAG_LEN + extra_data_len;
11401 int2store(buf + RW_VHLEN_OFFSET, vhlen + vhpayloadlen);
11402 rc= wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN_V2);
11405 if ((vhpayloadlen > 0) &&
11409 uchar type_code= RW_V_EXTRAINFO_TAG;
11410 rc= wrapper_my_b_safe_write(file, &type_code, RW_V_TAG_LEN);
11412 rc= wrapper_my_b_safe_write(file, m_extra_row_data, extra_data_len);
11417 rc= wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN_V1);
11423 bool Rows_log_event::write_data_body(
IO_CACHE*file)
11429 uchar sbuf[
sizeof(m_width) + 1];
11430 my_ptrdiff_t
const data_size= m_rows_cur - m_rows_buf;
11432 uchar *
const sbuf_end= net_store_length(sbuf, (
size_t) m_width);
11433 DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <=
sizeof(sbuf));
11435 DBUG_DUMP(
"m_width", sbuf, (
size_t) (sbuf_end - sbuf));
11436 res= res || wrapper_my_b_safe_write(file, sbuf, (
size_t) (sbuf_end - sbuf));
11438 DBUG_DUMP(
"m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
11439 res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap,
11440 no_bytes_in_map(&m_cols));
11444 if (get_general_type_code() == UPDATE_ROWS_EVENT)
11446 DBUG_DUMP(
"m_cols_ai", (uchar*) m_cols_ai.bitmap,
11447 no_bytes_in_map(&m_cols_ai));
11448 res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
11449 no_bytes_in_map(&m_cols_ai));
11451 DBUG_DUMP(
"rows", m_rows_buf, data_size);
11452 res= res || wrapper_my_b_safe_write(file, m_rows_buf, (
size_t) data_size);
11459 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
11460 int Rows_log_event::pack_info(
Protocol *protocol)
11463 char const *
const flagstr=
11464 get_flags(STMT_END_F) ?
" flags: STMT_END_F" :
"";
11465 size_t bytes= my_snprintf(buf,
sizeof(buf),
11466 "table_id: %llu%s", m_table_id.id(), flagstr);
11467 protocol->
store(buf, bytes, &my_charset_bin);
11472 #ifdef MYSQL_CLIENT
11473 void Rows_log_event::print_helper(FILE *file,
11474 PRINT_EVENT_INFO *print_event_info,
11475 char const *
const name)
11477 IO_CACHE *
const head= &print_event_info->head_cache;
11478 IO_CACHE *
const body= &print_event_info->body_cache;
11479 if (!print_event_info->short_form)
11481 bool const last_stmt_event= get_flags(STMT_END_F);
11482 print_header(head, print_event_info, !last_stmt_event);
11483 my_b_printf(head,
"\t%s: table id %llu%s\n",
11484 name, m_table_id.id(),
11485 last_stmt_event ?
" flags: STMT_END_F" :
"");
11486 print_base64(body, print_event_info, !last_stmt_event);
11527 #if !defined(MYSQL_CLIENT)
11552 int Table_map_log_event::save_field_metadata()
11554 DBUG_ENTER(
"Table_map_log_event::save_field_metadata");
11556 for (
unsigned int i= 0 ; i < m_table->s->fields ; i++)
11558 DBUG_PRINT(
"debug", (
"field_type: %d", m_coltype[i]));
11559 index+= m_table->s->field[
i]->save_field_metadata(&m_field_metadata[index]);
11561 DBUG_RETURN(index);
11570 #if !defined(MYSQL_CLIENT)
11571 Table_map_log_event::Table_map_log_event(THD *thd,
TABLE *tbl,
11575 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
11579 m_dbnam(tbl->s->db.str),
11580 m_dblen(m_dbnam ? tbl->s->db.length : 0),
11583 m_colcnt(tbl->s->fields),
11586 m_flags(TM_BIT_LEN_EXACT_F),
11588 m_field_metadata(0),
11589 m_field_metadata_size(0),
11591 m_meta_memory(NULL)
11593 uchar cbuf[
sizeof(m_colcnt) + 1];
11595 DBUG_ASSERT(m_table_id.is_valid());
11602 DBUG_ASSERT((tbl->s->db.str == 0) ||
11603 (tbl->s->db.str[tbl->s->db.length] == 0));
11604 DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
11607 m_data_size= TABLE_MAP_HEADER_LEN;
11608 DBUG_EXECUTE_IF(
"old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
11609 m_data_size+= m_dblen + 2;
11610 m_data_size+= m_tbllen + 2;
11611 cbuf_end= net_store_length(cbuf, (
size_t) m_colcnt);
11612 DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <=
sizeof(cbuf));
11613 m_data_size+= (cbuf_end - cbuf) + m_colcnt;
11616 if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
11618 m_coltype=
reinterpret_cast<uchar*
>(m_memory);
11619 for (
unsigned int i= 0 ; i < m_table->s->fields ; ++
i)
11620 m_coltype[i]= m_table->field[i]->binlog_type();
11629 uint num_null_bytes= (m_table->s->fields + 7) / 8;
11630 m_data_size+= num_null_bytes;
11631 m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
11632 &m_null_bits, num_null_bytes,
11633 &m_field_metadata, (m_colcnt * 2),
11636 memset(m_field_metadata, 0, (m_colcnt * 2));
11641 m_field_metadata_size= save_field_metadata();
11642 DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
11649 if (m_field_metadata_size < 251)
11650 m_data_size+= m_field_metadata_size + 1;
11652 m_data_size+= m_field_metadata_size + 3;
11654 memset(m_null_bits, 0, num_null_bytes);
11655 for (
unsigned int i= 0 ; i < m_table->s->fields ; ++
i)
11656 if (m_table->field[i]->maybe_null())
11657 m_null_bits[(i / 8)]+= 1 << (i % 8);
11664 uchar dbs= thd->get_binlog_accessed_db_names() ?
11665 thd->get_binlog_accessed_db_names()->elements : 0;
11668 char *db_name= thd->get_binlog_accessed_db_names()->head();
11669 if (!strcmp(db_name,
""))
11670 m_flags |= TM_REFERRED_FK_DB_F;
11678 #if defined(HAVE_REPLICATION)
11679 Table_map_log_event::Table_map_log_event(
const char *buf, uint event_len,
11681 *description_event)
11684 #ifndef MYSQL_CLIENT
11687 m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
11688 m_colcnt(0), m_coltype(0),
11689 m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0),
11690 m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
11691 m_null_bits(0), m_meta_memory(NULL)
11693 unsigned int bytes_read= 0;
11694 DBUG_ENTER(
"Table_map_log_event::Table_map_log_event(const char*,uint,...)");
11696 uint8 common_header_len= description_event->common_header_len;
11697 uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
11698 DBUG_PRINT(
"info",(
"event_len: %u common_header_len: %d post_header_len: %d",
11699 event_len, common_header_len, post_header_len));
11705 #ifndef HAVE_purify
11706 DBUG_DUMP(
"event buffer", (uchar*) buf, event_len);
11710 const char *post_start= buf + common_header_len;
11712 post_start+= TM_MAPID_OFFSET;
11713 if (post_header_len == 6)
11716 m_table_id= uint4korr(post_start);
11721 DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
11722 m_table_id= uint6korr(post_start);
11723 post_start+= TM_FLAGS_OFFSET;
11726 m_flags= uint2korr(post_start);
11729 const char *
const vpart= buf + common_header_len + post_header_len;
11732 uchar
const *
const ptr_dblen= (uchar
const*)vpart + 0;
11733 m_dblen= *(uchar*) ptr_dblen;
11736 uchar
const *
const ptr_tbllen= ptr_dblen + m_dblen + 2;
11737 m_tbllen= *(uchar*) ptr_tbllen;
11740 uchar
const *
const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
11741 uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
11742 m_colcnt= net_field_length(&ptr_after_colcnt);
11744 DBUG_PRINT(
"info",(
"m_dblen: %lu off: %ld m_tbllen: %lu off: %ld m_colcnt: %lu off: %ld",
11745 (ulong) m_dblen, (
long) (ptr_dblen-(
const uchar*)vpart),
11746 (ulong) m_tbllen, (
long) (ptr_tbllen-(
const uchar*)vpart),
11747 m_colcnt, (
long) (ptr_colcnt-(
const uchar*)vpart)));
11750 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
11751 &m_dbnam, (uint) m_dblen + 1,
11752 &m_tblnam, (uint) m_tbllen + 1,
11753 &m_coltype, (uint) m_colcnt,
11759 strncpy(const_cast<char*>(m_dbnam), (
const char*)ptr_dblen + 1, m_dblen + 1);
11760 strncpy(const_cast<char*>(m_tblnam), (
const char*)ptr_tbllen + 1, m_tbllen + 1);
11761 memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
11763 ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
11764 bytes_read= (uint) (ptr_after_colcnt - (uchar *)
buf);
11765 DBUG_PRINT(
"info", (
"Bytes read: %d.\n", bytes_read));
11766 if (bytes_read < event_len)
11768 m_field_metadata_size= net_field_length(&ptr_after_colcnt);
11769 DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
11770 uint num_null_bytes= (m_colcnt + 7) / 8;
11771 m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
11772 &m_null_bits, num_null_bytes,
11773 &m_field_metadata, m_field_metadata_size,
11775 memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
11776 ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
11777 memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
11785 Table_map_log_event::~Table_map_log_event()
11787 my_free(m_meta_memory);
11802 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
11804 enum enum_tbl_map_status
11813 SAME_ID_MAPPING_DIFFERENT_TABLE= 2,
11816 SAME_ID_MAPPING_SAME_TABLE= 3
11858 static enum_tbl_map_status
11861 DBUG_ENTER(
"check_table_map");
11862 enum_tbl_map_status res= OK_TO_PROCESS;
11864 if (rli->info_thd->slave_thread &&
11865 (!rpl_filter->db_ok(table_list->db) ||
11866 (rpl_filter->is_on() && !rpl_filter->tables_ok(
"", table_list))))
11871 for(uint i=0 ; ptr && (i< rli->tables_to_lock_count);
11874 if (ptr->table_id == table_list->table_id)
11877 if (strcmp(ptr->db, table_list->db) ||
11878 strcmp(ptr->alias, table_list->table_name) ||
11879 ptr->lock_type != TL_WRITE)
11880 res= SAME_ID_MAPPING_DIFFERENT_TABLE;
11882 res= SAME_ID_MAPPING_SAME_TABLE;
11889 DBUG_PRINT(
"debug", (
"check of table map ended up with: %u", res));
11894 int Table_map_log_event::do_apply_event(
Relay_log_info const *rli)
11897 char *db_mem, *tname_mem, *ptr;
11900 DBUG_ENTER(
"Table_map_log_event::do_apply_event(Relay_log_info*)");
11901 DBUG_ASSERT(rli->info_thd == thd);
11904 thd->set_query_id(next_query_id());
11906 if (!(memory= my_multi_malloc(MYF(MY_WME),
11908 &db_mem, (uint) NAME_LEN + 1,
11909 &tname_mem, (uint) NAME_LEN + 1,
11911 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
11913 strmov(db_mem, m_dbnam);
11914 strmov(tname_mem, m_tblnam);
11916 if (lower_case_table_names == 1)
11918 my_casedn_str(system_charset_info, db_mem);
11919 my_casedn_str(system_charset_info, tname_mem);
11923 if (((ptr= (
char*) rpl_filter->get_rewrite_db(db_mem, &dummy_len)) != db_mem))
11924 strmov(db_mem, ptr);
11927 tname_mem, strlen(tname_mem),
11928 tname_mem, TL_WRITE);
11930 table_list->table_id=
11931 DBUG_EVALUATE_IF(
"inject_tblmap_same_id_maps_diff_table", 0, m_table_id.id());
11932 table_list->updating= 1;
11933 table_list->required_type= FRMTYPE_TABLE;
11934 DBUG_PRINT(
"debug", (
"table: %s is mapped to %llu", table_list->table_name,
11935 table_list->table_id.id()));
11937 enum_tbl_map_status tblmap_status= check_table_map(rli, table_list);
11938 if (tblmap_status == OK_TO_PROCESS)
11940 DBUG_ASSERT(thd->lex->query_tables != table_list);
11951 new (&table_list->m_tabledef)
11953 m_field_metadata, m_field_metadata_size,
11954 m_null_bits, m_flags);
11955 table_list->m_tabledef_valid= TRUE;
11956 table_list->m_conv_table= NULL;
11963 table_list->next_global= table_list->next_local= rli->tables_to_lock;
11979 if (tblmap_status == SAME_ID_MAPPING_DIFFERENT_TABLE)
11989 my_snprintf(buf,
sizeof(buf),
11990 "Found table map event mapping table id %llu which "
11991 "was already mapped but with different settings.",
11992 table_list->table_id.id());
11994 if (thd->slave_thread)
11995 rli->
report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
11996 ER(ER_SLAVE_FATAL_ERROR), buf);
12002 my_printf_error(ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
12009 DBUG_RETURN(tblmap_status == SAME_ID_MAPPING_DIFFERENT_TABLE);
12019 return continue_group(rli);
12024 rli->inc_event_relay_log_pos();
12030 #ifndef MYSQL_CLIENT
12031 bool Table_map_log_event::write_data_header(
IO_CACHE *file)
12033 DBUG_ASSERT(m_table_id.is_valid());
12034 uchar buf[TABLE_MAP_HEADER_LEN];
12035 DBUG_EXECUTE_IF(
"old_row_based_repl_4_byte_map_id_master",
12037 int4store(buf + 0, m_table_id.id());
12038 int2store(buf + 4, m_flags);
12039 return (wrapper_my_b_safe_write(file, buf, 6));
12041 int6store(buf + TM_MAPID_OFFSET, m_table_id.id());
12042 int2store(buf + TM_FLAGS_OFFSET, m_flags);
12043 return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
12046 bool Table_map_log_event::write_data_body(
IO_CACHE *file)
12048 DBUG_ASSERT(m_dbnam != NULL);
12049 DBUG_ASSERT(m_tblnam != NULL);
12051 DBUG_ASSERT(m_dblen < 128);
12052 DBUG_ASSERT(m_tbllen < 128);
12054 uchar
const dbuf[]= { (uchar) m_dblen };
12055 uchar
const tbuf[]= { (uchar) m_tbllen };
12057 uchar cbuf[
sizeof(m_colcnt) + 1];
12058 uchar *
const cbuf_end= net_store_length(cbuf, (
size_t) m_colcnt);
12059 DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <=
sizeof(cbuf));
12064 uchar mbuf[
sizeof(m_field_metadata_size)];
12065 uchar *
const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
12067 return (wrapper_my_b_safe_write(file, dbuf,
sizeof(dbuf)) ||
12068 wrapper_my_b_safe_write(file, (
const uchar*)m_dbnam, m_dblen+1) ||
12069 wrapper_my_b_safe_write(file, tbuf,
sizeof(tbuf)) ||
12070 wrapper_my_b_safe_write(file, (
const uchar*)m_tblnam, m_tbllen+1) ||
12071 wrapper_my_b_safe_write(file, cbuf, (
size_t) (cbuf_end - cbuf)) ||
12072 wrapper_my_b_safe_write(file, m_coltype, m_colcnt) ||
12073 wrapper_my_b_safe_write(file, mbuf, (
size_t) (mbuf_end - mbuf)) ||
12074 wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
12075 wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
12079 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
12086 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/**
  Format a one-line summary of this event, "table_id: <id> (<db>.<table>)",
  and hand it to @c protocol as a binary-charset string.

  @param protocol  receiver of the formatted text
*/
12087 int Table_map_log_event::pack_info(
Protocol *protocol)
12090 size_t bytes= my_snprintf(buf,
sizeof(buf),
12091 "table_id: %llu (%s.%s)",
12092 m_table_id.id(), m_dbnam, m_tblnam);
12093 protocol->
store(buf, bytes, &my_charset_bin);
12102 #ifdef MYSQL_CLIENT
/**
  Client-side printer for a table-map event.

  Output is guarded by a check of print_event_info->short_form. The event
  header and the "`db`.`table` mapped to number <id>" line go to
  head_cache; the base64 form of the event goes to body_cache.

  @param print_event_info  printing state and the two output caches
*/
12103 void Table_map_log_event::print(FILE *, PRINT_EVENT_INFO *print_event_info)
12105 if (!print_event_info->short_form)
12107 print_header(&print_event_info->head_cache, print_event_info, TRUE);
12108 my_b_printf(&print_event_info->head_cache,
12109 "\tTable_map: `%s`.`%s` mapped to number %llu\n",
12110 m_dbnam, m_tblnam, m_table_id.id());
12111 print_base64(&print_event_info->body_cache, print_event_info, TRUE);
12123 #if !defined(MYSQL_CLIENT)
12124 Write_rows_log_event::Write_rows_log_event(THD *thd_arg,
TABLE *tbl_arg,
12126 bool is_transactional,
12127 const uchar* extra_row_info)
12128 :
Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional,
12129 log_bin_use_v1_row_events?
12130 WRITE_ROWS_EVENT_V1:
12140 #ifdef HAVE_REPLICATION
12141 Write_rows_log_event::Write_rows_log_event(
const char *buf, uint event_len,
12143 *description_event)
12149 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
12158 if (get_flags(STMT_END_F))
12159 status_var_increment(thd->status_var.com_stat[SQLCOM_INSERT]);
12166 (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
12175 thd->lex->duplicates= DUP_REPLACE;
12182 thd->lex->sql_command= SQLCOM_REPLACE;
12186 m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
12191 m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
12197 m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
12207 m_table->next_number_field= m_table->found_next_number_field;
12217 m_table->auto_increment_field_not_null= TRUE;
12222 decide_row_lookup_algorithm_and_key();
12231 int local_error= 0;
12232 m_table->next_number_field=0;
12233 m_table->auto_increment_field_not_null= FALSE;
12235 m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
12237 m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
12238 m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
12247 if ((local_error= m_table->file->ha_end_bulk_insert()))
12249 m_table->file->print_error(local_error, MYF(0));
12254 return error? error : local_error;
12257 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
/**
  Scan the keys of @c table that come after index @c keyno, looking for
  another key flagged HA_NOSAME.

  NOTE(review): the return statements are not visible in this excerpt;
  presumably the function reports whether @c keyno is the last such key --
  confirm against the full source.

  @param table  table whose key_info array is inspected
  @param keyno  index of the key to start after
*/
12263 last_uniq_key(
TABLE *table, uint keyno)
12265 while (++keyno < table->s->keys)
12266 if (table->key_info[keyno].
flags & HA_NOSAME)
/**
  Classify a handler error code.

  HA_ERR_FOUND_DUPP_KEY and HA_ERR_FOUND_DUPP_UNIQUE share a single case
  arm; the values returned are on lines not visible in this excerpt.

  @param errcode  handler error code to classify
*/
12285 is_duplicate_key_error(
int errcode)
12289 case HA_ERR_FOUND_DUPP_KEY:
12290 case HA_ERR_FOUND_DUPP_UNIQUE:
/**
  Write the current row image into m_table, dealing with a conflicting
  row when the write fails.

  The row is unpacked with unpack_current_row() and record[0] is handed to
  ha_write_row() inside a loop. When the write fails, the conflicting row
  is located either with ha_rnd_pos() on the handler's dup_ref or with a
  key built from record[0]. It is then either updated in place with
  ha_update_row() or removed with ha_delete_row(), after which the write
  is attempted again.

  @param rli        relay log info handed to unpack_current_row()
  @param overwrite  NOTE(review): the lines that read this flag are not
                    visible in this excerpt; do_exec_row() passes
                    slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT
  @return the last error code from unpacking or from the handler
*/
12332 Write_rows_log_event::write_row(
const Relay_log_info *
const rli,
12333 const bool overwrite)
12335 DBUG_ENTER(
"write_row");
12336 DBUG_ASSERT(m_table != NULL && thd != NULL);
12338 TABLE *table= m_table;
12340 int UNINIT_VAR(keynum);
12343 prepare_record(table, &m_cols,
12344 table->file->ht->db_type != DB_TYPE_NDBCLUSTER);
12347 if ((error= unpack_current_row(rli, &m_cols)))
12348 DBUG_RETURN(error);
/*
  When the current row sits at the start of the rows buffer, estimate the
  row count as remaining buffer bytes divided by the size of this row and
  start a bulk insert with that estimate.
*/
12350 if (m_curr_row == m_rows_buf)
12355 DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
12356 ulong estimated_rows= 0;
12357 if (m_curr_row < m_curr_row_end)
12358 estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
12359 else if (m_curr_row == m_curr_row_end)
12362 m_table->file->ha_start_bulk_insert(estimated_rows);
12367 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
12368 DBUG_PRINT_BITSET(
"debug",
"write_set = %s", table->write_set);
12369 DBUG_PRINT_BITSET(
"debug",
"read_set = %s", table->read_set);
12380 m_table->mark_columns_per_binlog_row_image();
/*
  The loop body runs for as long as ha_write_row() reports an error; the
  exits taken on unrecoverable errors are on lines not shown here.
*/
12382 while ((error= table->file->ha_write_row(table->record[0])))
12384 if (error == HA_ERR_LOCK_DEADLOCK ||
12385 error == HA_ERR_LOCK_WAIT_TIMEOUT ||
12389 DBUG_PRINT(
"info",(
"get_dup_key returns %d)", keynum));
12412 DBUG_PRINT(
"info",(
"Locating offending record using ha_rnd_pos()"));
12414 if (table->file->inited && (error= table->file->
ha_index_end()))
12425 error= table->file->
ha_rnd_pos(table->record[1], table->file->dup_ref);
12430 DBUG_PRINT(
"info",(
"ha_rnd_pos() returns error %d",error));
/* A deleted record is reported to the caller as "key not found". */
12431 if (error == HA_ERR_RECORD_DELETED)
12432 error= HA_ERR_KEY_NOT_FOUND;
12439 DBUG_PRINT(
"info",(
"Locating offending record using index_read_idx()"));
12441 if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
12443 DBUG_PRINT(
"info",(
"Error when setting HA_EXTRA_FLUSH_CACHE"));
/* The key buffer is allocated lazily, sized by max_unique_length. */
12448 if (key.get() == NULL)
12450 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
12451 if (key.get() == NULL)
12453 DBUG_PRINT(
"info",(
"Can't allocate key buffer"));
12459 key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
12462 (
const uchar*)key.get(),
12464 HA_READ_KEY_EXACT);
12467 DBUG_PRINT(
"info",(
"ha_index_read_idx_map() returns %s", HA_ERR(error)));
12468 if (error == HA_ERR_RECORD_DELETED)
12469 error= HA_ERR_KEY_NOT_FOUND;
/*
  Bring record[1] (the row that was found) into record[0] and unpack the
  event's row over it.
*/
12487 restore_record(table,
record[1]);
12488 error= unpack_current_row(rli, &m_cols);
12492 DBUG_PRINT(
"debug",(
"preparing for update: before and after image"));
12493 DBUG_DUMP(
"record[1] (before)", table->record[1], table->s->reclength);
12494 DBUG_DUMP(
"record[0] (after)", table->record[0], table->s->reclength);
/*
  In-place update is chosen when last_uniq_key() holds for the conflicting
  key and the table is not referenced by a foreign key.
*/
12512 if (last_uniq_key(table, keynum) &&
12513 !table->file->referenced_by_foreign_key())
12515 DBUG_PRINT(
"info",(
"Updating row using ha_update_row()"));
12516 error=table->file->ha_update_row(table->record[1],
12520 case HA_ERR_RECORD_IS_THE_SAME:
12521 DBUG_PRINT(
"info",(
"ignoring HA_ERR_RECORD_IS_THE_SAME error from"
12522 " ha_update_row()"));
12529 DBUG_PRINT(
"info",(
"ha_update_row() returns error %d",error));
12537 DBUG_PRINT(
"info",(
"Deleting offending row and trying to write new one again"));
12538 if ((error= table->file->ha_delete_row(table->record[1])))
12540 DBUG_PRINT(
"info",(
"ha_delete_row() returns error %d",error));
12549 m_table->default_column_bitmaps();
12550 DBUG_RETURN(error);
/**
  Apply one row of a Write_rows event by calling write_row().

  The overwrite argument passed to write_row() is true exactly when
  slave_exec_mode equals SLAVE_EXEC_MODE_IDEMPOTENT. When write_row()
  fails without an error being set on the THD, a generic ER_UNKNOWN_ERROR
  is raised.

  @param rli  relay log info forwarded to write_row()
*/
12556 Write_rows_log_event::do_exec_row(
const Relay_log_info *
const rli)
12558 DBUG_ASSERT(m_table != NULL);
12559 int error= write_row(rli,
slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
12561 if (error && !thd->is_error())
12564 my_error(ER_UNKNOWN_ERROR, MYF(0));
12572 #ifdef MYSQL_CLIENT
/**
  Client-side printer: delegates to Rows_log_event::print_helper() with
  the label "Write_rows".

  When the debug keyword "simulate_cache_read_error" is active, the
  "simulate_my_b_fill_error" keyword is switched on first.
*/
12573 void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
12575 DBUG_EXECUTE_IF(
"simulate_cache_read_error",
12576 {DBUG_SET(
"+d,simulate_my_b_fill_error");});
12577 Rows_log_event::print_helper(file, print_event_info,
"Write_rows");
12589 #ifndef MYSQL_CLIENT
12590 Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg,
TABLE *tbl_arg,
12592 bool is_transactional,
12593 const uchar* extra_row_info)
12594 :
Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional,
12595 log_bin_use_v1_row_events?
12596 DELETE_ROWS_EVENT_V1:
12606 #ifdef HAVE_REPLICATION
12607 Delete_rows_log_event::Delete_rows_log_event(
const char *buf, uint event_len,
12609 *description_event)
12615 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
12621 DBUG_ENTER(
"Delete_rows_log_event::do_before_row_operations");
12625 if (get_flags(STMT_END_F))
12626 status_var_increment(thd->status_var.com_stat[SQLCOM_DELETE]);
12627 error= row_operations_scan_and_key_setup();
12628 DBUG_RETURN(error);
12636 DBUG_ENTER(
"Delete_rows_log_event::do_after_row_operations");
12637 error= row_operations_scan_and_key_teardown(error);
12638 DBUG_RETURN(error);
/**
  Delete the row currently held in m_table->record[0].

  Column bitmaps are set with mark_columns_per_binlog_row_image() before
  the handler call and reset with default_column_bitmaps() afterwards.
*/
12641 int Delete_rows_log_event::do_exec_row(
const Relay_log_info *
const rli)
12644 DBUG_ASSERT(m_table != NULL);
12646 m_table->mark_columns_per_binlog_row_image();
12647 error= m_table->file->ha_delete_row(m_table->record[0]);
12648 m_table->default_column_bitmaps();
12654 #ifdef MYSQL_CLIENT
/**
  Client-side printer: delegates to Rows_log_event::print_helper() with
  the label "Delete_rows".
*/
12655 void Delete_rows_log_event::print(FILE *file,
12656 PRINT_EVENT_INFO* print_event_info)
12658 Rows_log_event::print_helper(file, print_event_info,
"Delete_rows");
12670 #if !defined(MYSQL_CLIENT)
12671 Update_rows_log_event::Update_rows_log_event(THD *thd_arg,
TABLE *tbl_arg,
12673 bool is_transactional,
12674 const uchar* extra_row_info)
12675 :
Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional,
12676 log_bin_use_v1_row_events?
12677 UPDATE_ROWS_EVENT_V1:
12681 init(tbl_arg->write_set);
/**
  Initialise the after-image column bitmap m_cols_ai.

  The embedded buffer m_bitbuf_ai backs the bitmap when m_width fits into
  it; otherwise NULL is passed to bitmap_init() (presumably making it
  allocate -- confirm). When @c cols is given, its bits are copied into
  m_cols_ai and the last-word mask is rebuilt.

  @param cols  bitmap to copy from, may be NULL
*/
12684 void Update_rows_log_event::init(
MY_BITMAP const *cols)
12687 if (likely(!bitmap_init(&m_cols_ai,
12688 m_width <=
sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
12693 if (likely(cols != NULL))
12695 memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
12696 create_last_word_mask(&m_cols_ai);
/**
  Release the after-image bitmap.

  When the bitmap points at the embedded buffer m_bitbuf_ai the pointer is
  cleared first, presumably so that bitmap_free() does not try to release
  storage that belongs to this object.
*/
12703 Update_rows_log_event::~Update_rows_log_event()
12705 if (m_cols_ai.bitmap == m_bitbuf_ai)
12706 m_cols_ai.bitmap= 0;
12707 bitmap_free(&m_cols_ai);
12714 #ifdef HAVE_REPLICATION
12715 Update_rows_log_event::Update_rows_log_event(
const char *buf, uint event_len,
12718 *description_event)
12724 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
12730 DBUG_ENTER(
"Update_rows_log_event::do_before_row_operations");
12734 if (get_flags(STMT_END_F))
12735 status_var_increment(thd->status_var.com_stat[SQLCOM_UPDATE]);
12736 error= row_operations_scan_and_key_setup();
12737 DBUG_RETURN(error);
12745 DBUG_ENTER(
"Update_rows_log_event::do_after_row_operations");
12746 error= row_operations_scan_and_key_teardown(error);
12747 DBUG_RETURN(error);
/**
  Apply one update row.

  The current row image is saved into record[1], the row cursor is moved
  to the end of the current row, and the next image is unpacked using
  m_cols_ai. The handler is then called as
  ha_update_row(record[1], record[0]).

  @param rli  relay log info handed to unpack_current_row()
*/
12751 Update_rows_log_event::do_exec_row(
const Relay_log_info *
const rli)
12753 DBUG_ASSERT(m_table != NULL);
12767 store_record(m_table,
record[1]);
/* Advance to the image that follows the one just processed. */
12769 m_curr_row= m_curr_row_end;
12771 if ((error= unpack_current_row(rli, &m_cols_ai)))
12778 #ifndef HAVE_purify
12783 DBUG_PRINT(
"info",(
"Updating row in table"));
12784 DBUG_DUMP(
"old record", m_table->record[1], m_table->s->reclength);
12785 DBUG_DUMP(
"new values", m_table->record[0], m_table->s->reclength);
/*
  Overwrite the table's read_set and write_set bitmaps with m_cols and
  m_cols_ai respectively; the byte counts come from the destination
  bitmaps' n_bits.
*/
12789 memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
12790 memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
12792 m_table->mark_columns_per_binlog_row_image();
12793 error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
/* HA_ERR_RECORD_IS_THE_SAME is special-cased; its handling is on a line not shown here. */
12794 if (error == HA_ERR_RECORD_IS_THE_SAME)
12796 m_table->default_column_bitmaps();
12803 #ifdef MYSQL_CLIENT
/**
  Client-side printer: delegates to Rows_log_event::print_helper() with
  the label "Update_rows".
*/
12804 void Update_rows_log_event::print(FILE *file,
12805 PRINT_EVENT_INFO* print_event_info)
12807 Rows_log_event::print_helper(file, print_event_info,
"Update_rows");
/**
  Decode an incident event from its binary form.

  The 2-byte incident number is read right after the common header. A
  value that is >= INCIDENT_COUNT or <= INCIDENT_NONE is rejected and the
  event is left as INCIDENT_NONE. The message that follows the post-header
  is read with read_str_at_most_255_bytes() and copied into a freshly
  allocated m_message.str; if that allocation fails the event is reset to
  INCIDENT_NONE.

  @param buf        raw event bytes
  @param event_len  total length of the event in bytes
*/
12812 Incident_log_event::Incident_log_event(
const char *buf, uint event_len,
12816 DBUG_ENTER(
"Incident_log_event::Incident_log_event");
12817 uint8
const common_header_len=
12818 descr_event->common_header_len;
12819 uint8
const post_header_len=
12820 descr_event->post_header_len[INCIDENT_EVENT-1];
12822 DBUG_PRINT(
"info",(
"event_len: %u; common_header_len: %d; post_header_len: %d",
12823 event_len, common_header_len, post_header_len));
12825 m_message.str= NULL;
12826 m_message.length= 0;
12827 int incident_number= uint2korr(buf + common_header_len);
12828 if (incident_number >= INCIDENT_COUNT ||
12829 incident_number <= INCIDENT_NONE)
12834 m_incident= INCIDENT_NONE;
12837 m_incident=
static_cast<Incident
>(incident_number);
12838 char const *ptr= buf + common_header_len + post_header_len;
12839 char const *
const str_end= buf + event_len;
12841 const char *str= NULL;
12842 read_str_at_most_255_bytes(&ptr, str_end, &str, &len);
/* One extra byte is allocated for the terminator written by strmake(). */
12843 if (!(m_message.str= (
char*) my_malloc(len+1, MYF(MY_WME))))
12846 m_incident= INCIDENT_NONE;
12849 strmake(m_message.str, str, len);
12850 m_message.length= len;
12851 DBUG_PRINT(
"info", (
"m_incident: %d", m_incident));
/** Release the message buffer owned by this event. */
12856 Incident_log_event::~Incident_log_event()
12859 my_free(m_message.str);
/**
  Return the text for the current incident, looked up in a static table
  indexed by m_incident.

  NOTE(review): the index is used without a range check in the lines
  visible here; it relies on m_incident being a valid table index.
*/
12864 Incident_log_event::description()
const
12866 static const char *
const description[]= {
12871 DBUG_PRINT(
"info", (
"m_incident: %d", m_incident));
12873 return description[m_incident];
12877 #ifndef MYSQL_CLIENT
12878 int Incident_log_event::pack_info(
Protocol *protocol)
12882 if (m_message.length > 0)
12883 bytes= my_snprintf(buf,
sizeof(buf),
"#%d (%s)",
12884 m_incident, description());
12886 bytes= my_snprintf(buf,
sizeof(buf),
"#%d (%s): %s",
12887 m_incident, description(), m_message.str);
12888 protocol->
store(buf, bytes, &my_charset_bin);
12894 #ifdef MYSQL_CLIENT
/**
  Client-side printer for an incident event.

  short_form is checked first (the action taken for it is on a line not
  shown here). The event header and an "# Incident: ..." line are written
  to head_cache, followed by a "RELOAD DATABASE" statement that the format
  string itself describes as one that shall generate a syntax error.
*/
12896 Incident_log_event::print(FILE *file,
12897 PRINT_EVENT_INFO *print_event_info)
12899 if (print_event_info->short_form)
12902 print_header(&print_event_info->head_cache, print_event_info, FALSE);
12903 my_b_printf(&print_event_info->head_cache,
12904 "\n# Incident: %s\nRELOAD DATABASE; # Shall generate syntax error\n",
12909 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
12913 DBUG_ENTER(
"Incident_log_event::do_apply_event");
12915 if (ignored_error_code(ER_SLAVE_INCIDENT))
12917 DBUG_PRINT(
"info", (
"Ignoring Incident"));
12921 rli->
report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
12922 ER(ER_SLAVE_INCIDENT),
12924 m_message.length > 0 ? m_message.str :
"<none>");
/**
  Write the incident post-header: m_incident stored as a 2-byte integer.

  Server builds write through wrapper_my_b_safe_write(); the other branch
  of the MYSQL_CLIENT conditional calls my_b_safe_write() directly.

  @param file  cache that receives the header bytes
*/
12930 Incident_log_event::write_data_header(
IO_CACHE *file)
12932 DBUG_ENTER(
"Incident_log_event::write_data_header");
12933 DBUG_PRINT(
"enter", (
"m_incident: %d", m_incident));
12934 uchar buf[
sizeof(int16)];
12935 int2store(buf, (int16) m_incident);
12936 #ifndef MYSQL_CLIENT
12937 DBUG_RETURN(wrapper_my_b_safe_write(file, buf,
sizeof(buf)));
12939 DBUG_RETURN(my_b_safe_write(file, buf,
sizeof(buf)));
/**
  Write the incident message to @c file with
  write_str_at_most_255_bytes().

  Before writing, the running checksum @c crc is updated with the
  one-byte length and, when the message is not empty, with the message
  bytes themselves.

  @param file  cache that receives the message
*/
12944 Incident_log_event::write_data_body(
IO_CACHE *file)
12947 DBUG_ENTER(
"Incident_log_event::write_data_body");
/* The length is truncated to a single byte for the checksum input. */
12948 tmp[0]= (uchar) m_message.length;
12949 crc= my_checksum(
crc, (uchar*) tmp, 1);
12950 if (m_message.length > 0)
12952 crc= my_checksum(
crc, (uchar*) m_message.str, m_message.length);
12955 DBUG_RETURN(write_str_at_most_255_bytes(file, m_message.str, (uint) m_message.length));
12959 Ignorable_log_event::Ignorable_log_event(
const char *buf,
12963 DBUG_ENTER(
"Ignorable_log_event::Ignorable_log_event");
12967 Ignorable_log_event::~Ignorable_log_event()
12971 #ifndef MYSQL_CLIENT
/**
  Report the fixed text "# Unrecognized ignorable event" to @c protocol.

  @param protocol  receiver of the formatted text
*/
12973 int Ignorable_log_event::pack_info(
Protocol *protocol)
12977 bytes= my_snprintf(buf,
sizeof(buf),
"# Unrecognized ignorable event");
12978 protocol->
store(buf, bytes, &my_charset_bin);
12983 #ifdef MYSQL_CLIENT
/**
  Client-side printer for an ignorable event.

  short_form is checked first (the action taken for it is on a line not
  shown here). The event header, an "Ignorable" tag and a comment line are
  written to head_cache.
*/
12986 Ignorable_log_event::print(FILE *file,
12987 PRINT_EVENT_INFO *print_event_info)
12989 if (print_event_info->short_form)
12992 print_header(&print_event_info->head_cache, print_event_info, FALSE);
12993 my_b_printf(&print_event_info->head_cache,
"\tIgnorable\n");
12994 my_b_printf(&print_event_info->head_cache,
12995 "# Unrecognized ignorable event\n");
/**
  Decode a rows-query event from its binary form.

  The query text starts one byte after the common header plus post-header
  and runs to the end of the event. It is copied into a freshly allocated,
  NUL-terminated m_rows_query.

  NOTE(review): in the lines visible here, len is event_len - offset with
  no check that event_len >= offset; a short or corrupt event would make
  len negative before it reaches my_malloc() and strmake(). Confirm
  whether a caller validates the length.

  @param buf        raw event bytes
  @param event_len  total length of the event in bytes
*/
13000 Rows_query_log_event::Rows_query_log_event(
const char *buf, uint event_len,
13004 DBUG_ENTER(
"Rows_query_log_event::Rows_query_log_event");
13005 uint8
const common_header_len=
13006 descr_event->common_header_len;
13007 uint8
const post_header_len=
13008 descr_event->post_header_len[ROWS_QUERY_LOG_EVENT-1];
13010 DBUG_PRINT(
"info",(
"event_len: %u; common_header_len: %d; post_header_len: %d",
13011 event_len, common_header_len, post_header_len));
/* The "+ 1" skips one byte between the post-header and the query text. */
13017 int offset= common_header_len + post_header_len + 1;
13018 int len= event_len -
offset;
13019 if (!(m_rows_query= (
char*) my_malloc(len+1, MYF(MY_WME))))
13021 strmake(m_rows_query, buf + offset, len);
13022 DBUG_PRINT(
"info", (
"m_rows_query: %s", m_rows_query));
/** Release the query text owned by this event. */
13026 Rows_query_log_event::~Rows_query_log_event()
13028 my_free(m_rows_query);
13031 #ifndef MYSQL_CLIENT
/**
  Report the query text, prefixed with "# ", to @c protocol.

  The temporary buffer is sized as sizeof("# ") plus the query length;
  sizeof counts the literal's terminating NUL, so the formatted string
  fits exactly.

  @param protocol  receiver of the formatted text
*/
13032 int Rows_query_log_event::pack_info(
Protocol *protocol)
13036 ulong len=
sizeof(
"# ") + (ulong) strlen(m_rows_query);
13037 if (!(buf= (
char*) my_malloc(len, MYF(MY_WME))))
13039 bytes= my_snprintf(buf, len,
"# %s", m_rows_query);
13040 protocol->
store(buf, bytes, &my_charset_bin);
13046 #ifdef MYSQL_CLIENT
/**
  Client-side printer for a rows-query event.

  Output is produced only when short_form is off and verbose is above 1.
  The query is duplicated, split on newlines with strtok_r(), and each
  piece is written to the head cache as a "# ..." comment line. The copy
  is then freed and the base64 form is written to the body cache.
*/
13048 Rows_query_log_event::print(FILE *file,
13049 PRINT_EVENT_INFO *print_event_info)
13051 if (!print_event_info->short_form && print_event_info->verbose > 1)
13053 IO_CACHE *
const head= &print_event_info->head_cache;
13054 IO_CACHE *
const body= &print_event_info->body_cache;
13055 char *token= NULL, *saveptr= NULL;
13056 char *rows_query_copy= NULL;
/* strtok_r() modifies its input, so it is run on a private copy. */
13057 if (!(rows_query_copy= my_strdup(m_rows_query, MYF(MY_WME))))
13060 print_header(head, print_event_info, FALSE);
13061 my_b_printf(head,
"\tRows_query\n");
13067 for (token= strtok_r(rows_query_copy,
"\n", &saveptr); token;
13068 token= strtok_r(NULL,
"\n", &saveptr))
13069 my_b_printf(head,
"# %s\n", token);
13070 my_free(rows_query_copy);
13071 print_base64(body, print_event_info,
true);
/**
  Write the query text to @c file through write_str_at_most_255_bytes(),
  passing the full strlen() of m_rows_query as the length.

  @param file  cache that receives the query text
*/
13077 Rows_query_log_event::write_data_body(
IO_CACHE *file)
13079 DBUG_ENTER(
"Rows_query_log_event::write_data_body");
13084 DBUG_RETURN(write_str_at_most_255_bytes(file, m_rows_query,
13085 (uint) strlen(m_rows_query)));
13088 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
/**
  Apply a rows-query event: install m_rows_query as the current query of
  the applying THD.

  The function asserts that rli->info_thd is this event's thd and that no
  earlier rows-query event is still recorded in rli->rows_query_ev.

  @param rli  relay log info of the applying thread
*/
13089 int Rows_query_log_event::do_apply_event(
Relay_log_info const *rli)
13091 DBUG_ENTER(
"Rows_query_log_event::do_apply_event");
13092 DBUG_ASSERT(rli->info_thd == thd);
13094 thd->set_query(m_rows_query, (uint32) strlen(m_rows_query));
13096 DBUG_ASSERT(rli->rows_query_ev == NULL);
/*
  Text that to_string() copies to the start of its output; its length is
  asserted there to equal SET_STRING_PREFIX_LENGTH.
*/
13105 const char *Gtid_log_event::SET_STRING_PREFIX=
"SET @@SESSION.GTID_NEXT= '";
13112 DBUG_ENTER(
"Gtid_log_event::Gtid_log_event(const char *, uint, const Format_description_log_event *");
13113 uint8
const common_header_len=
13114 descr_event->common_header_len;
13117 uint8
const post_header_len=
13118 buffer[EVENT_TYPE_OFFSET] == ANONYMOUS_GTID_LOG_EVENT ?
13119 descr_event->post_header_len[ANONYMOUS_GTID_LOG_EVENT - 1] :
13120 descr_event->post_header_len[GTID_LOG_EVENT - 1];
13121 DBUG_PRINT(
"info",(
"event_len: %u; common_header_len: %d; post_header_len: %d",
13122 event_len, common_header_len, post_header_len));
13125 char const *ptr_buffer= buffer + common_header_len;
13127 spec.type= buffer[EVENT_TYPE_OFFSET] == ANONYMOUS_GTID_LOG_EVENT ?
13128 ANONYMOUS_GROUP : GTID_GROUP;
13130 commit_flag= *ptr_buffer != 0;
13131 ptr_buffer+= ENCODED_FLAG_LENGTH;
13133 sid.copy_from((uchar *)ptr_buffer);
13134 ptr_buffer+= ENCODED_SID_LENGTH;
13137 spec.gtid.sidno= -1;
13139 spec.gtid.gno= uint8korr(ptr_buffer);
13140 ptr_buffer+= ENCODED_GNO_LENGTH;
13145 #ifndef MYSQL_CLIENT
13148 :
Log_event(thd_arg, thd_arg->variables.gtid_next.type == ANONYMOUS_GROUP ?
13150 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
13154 DBUG_ENTER(
"Gtid_log_event::Gtid_log_event(THD *)");
13155 spec= spec_arg ? *spec_arg : thd_arg->variables.gtid_next;
13156 if (spec.
type == GTID_GROUP)
13158 global_sid_lock->
rdlock();
13160 global_sid_lock->
unlock();
13165 char buf[MAX_SET_STRING_LENGTH + 1];
13167 DBUG_PRINT(
"info", (
"%s", buf));
13173 #ifndef MYSQL_CLIENT
/**
  Report the textual form produced by to_string() to @c protocol.

  @param protocol  receiver of the formatted text
*/
13174 int Gtid_log_event::pack_info(
Protocol *protocol)
13176 char buffer[MAX_SET_STRING_LENGTH + 1];
13177 size_t len= to_string(buffer);
13178 protocol->
store(buffer, len, &my_charset_bin);
/**
  Render this event as text into @c buf, starting with SET_STRING_PREFIX.

  Callers in this file pass a buffer of MAX_SET_STRING_LENGTH + 1 bytes.
  The part that follows the prefix is built on lines not visible in this
  excerpt.

  @param buf  destination buffer
*/
13183 size_t Gtid_log_event::to_string(
char *buf)
const
13186 DBUG_ASSERT(strlen(SET_STRING_PREFIX) == SET_STRING_PREFIX_LENGTH);
13187 strcpy(p, SET_STRING_PREFIX);
13188 p+= SET_STRING_PREFIX_LENGTH;
13195 #ifdef MYSQL_CLIENT
/**
  Client-side printer for a GTID event.

  After a check of short_form, the event header and a
  "GTID [commit=yes|no]" line are written to the head cache. The contents
  of @c buffer followed by the configured delimiter are also written to
  the head cache; the line that fills @c buffer is not visible in this
  excerpt.
*/
13197 Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
13199 char buffer[MAX_SET_STRING_LENGTH + 1];
13200 IO_CACHE *
const head= &print_event_info->head_cache;
13201 if (!print_event_info->short_form)
13203 print_header(head, print_event_info, FALSE);
13204 my_b_printf(head,
"\tGTID [commit=%s]\n", commit_flag ?
"yes" :
"no");
13207 my_b_printf(head,
"%s%s\n", buffer, print_event_info->delimiter);
13211 #ifdef MYSQL_SERVER
/**
  Serialize the GTID post-header into @c file.

  Layout, in order: the commit flag (ENCODED_FLAG_LENGTH bytes, 1 or 0),
  the SID (ENCODED_SID_LENGTH bytes) and the GNO as an 8-byte integer
  (ENCODED_GNO_LENGTH bytes). A debug assertion checks that exactly
  sizeof(buffer) bytes were filled.

  @param file  cache that receives the header bytes
  @return the result of wrapper_my_b_safe_write()
*/
13212 bool Gtid_log_event::write_data_header(
IO_CACHE *file)
13214 DBUG_ENTER(
"Gtid_log_event::write_data_header");
13216 char* ptr_buffer= buffer;
13218 *ptr_buffer= commit_flag ? 1 : 0;
13219 ptr_buffer+= ENCODED_FLAG_LENGTH;
13224 DBUG_PRINT(
"info", (
"sid=%s sidno=%d gno=%lld",
13228 sid.
copy_to((uchar *)ptr_buffer);
13229 ptr_buffer+= ENCODED_SID_LENGTH;
13231 int8store(ptr_buffer, spec.
gtid.
gno);
13232 ptr_buffer+= ENCODED_GNO_LENGTH;
13234 DBUG_ASSERT(ptr_buffer == (buffer +
sizeof(buffer)));
13235 DBUG_RETURN(wrapper_my_b_safe_write(file, (uchar *) buffer,
sizeof(buffer)));
13237 #endif // MYSQL_SERVER
13239 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
13242 DBUG_ENTER(
"Gtid_log_event::do_apply_event");
13243 DBUG_ASSERT(rli->info_thd == thd);
13246 DBUG_ASSERT(gtid_mode > 0);
13251 if (thd->owned_gtid.sidno)
13253 gtid_rollback(thd);
13255 thd->variables.gtid_next.set(sidno, spec.
gtid.
gno);
13256 DBUG_PRINT(
"info", (
"setting gtid_next=%d:%lld",
13259 if (gtid_acquire_ownership_single(thd))
13272 rli->inc_event_relay_log_pos();
13273 DBUG_EXECUTE_IF(
"crash_after_update_pos_gtid",
13274 sql_print_information(
"Crashing crash_after_update_pos_gtid.");
/**
  Decode a previous-GTIDs event from its binary form.

  No data is copied: @c buf is pointed at the payload inside @c buffer
  (just after the common header and post-header), and @c buf_size is set
  to the number of bytes from there to the end of the event.

  @param buffer     raw event bytes
  @param event_len  total length of the event in bytes
*/
13280 Previous_gtids_log_event::Previous_gtids_log_event(
13281 const char *buffer, uint event_len,
13285 DBUG_ENTER(
"Previous_gtids_log_event::Previous_gtids_log_event");
13286 uint8
const common_header_len=
13287 descr_event->common_header_len;
13288 uint8
const post_header_len=
13289 descr_event->post_header_len[PREVIOUS_GTIDS_LOG_EVENT - 1];
13291 DBUG_PRINT(
"info",(
"event_len: %u; common_header_len: %d; post_header_len: %d",
13292 event_len, common_header_len, post_header_len));
13294 buf= (
const uchar *)buffer + common_header_len + post_header_len;
13295 buf_size= (
const uchar *)buffer + event_len - buf;
13296 DBUG_PRINT(
"info", (
"data size of the event: %d", buf_size));
13300 #ifndef MYSQL_CLIENT
/**
  Build a previous-GTIDs event from a GTID set.

  A buffer of set->get_encoded_length() bytes is allocated. When the
  allocation succeeds, the set is encoded into it and the buffer is
  registered with register_temp_buf().

  @param set  GTID set to encode into the event payload
*/
13301 Previous_gtids_log_event::Previous_gtids_log_event(
const Gtid_set *
set)
13305 DBUG_ENTER(
"Previous_gtids_log_event::Previous_gtids_log_event(THD *, const Gtid_set *)");
13307 buf_size=
set->get_encoded_length();
13308 uchar *buffer= (uchar *) my_malloc(buf_size, MYF(MY_WME));
13309 if (buffer != NULL)
13311 set->encode(buffer);
13312 register_temp_buf((
char *)buffer);
13320 #ifndef MYSQL_CLIENT
/**
  Report the textual form of the GTID set to @c protocol.

  global_sid_lock is read-locked and then unlocked; the statement run
  while it is held, which presumably produces @c str and @c length, is on
  a line not visible in this excerpt.

  @param protocol  receiver of the formatted text
*/
13321 int Previous_gtids_log_event::pack_info(
Protocol *protocol)
13324 global_sid_lock->
rdlock();
13326 global_sid_lock->
unlock();
13329 protocol->
store(str, length, &my_charset_bin);
13335 #ifdef MYSQL_CLIENT
/**
  Client-side printer for a previous-GTIDs event.

  global_sid_lock is read-locked and then unlocked; the statement run
  while it is held, which presumably produces @c str, is on a line not
  visible in this excerpt. After a check of short_form, the event header
  and a "Previous-GTIDs" tag are written to the head cache. The string
  itself is written to the head cache as well.
*/
13336 void Previous_gtids_log_event::print(FILE *file,
13337 PRINT_EVENT_INFO *print_event_info)
13339 IO_CACHE *
const head= &print_event_info->head_cache;
13341 global_sid_lock->
rdlock();
13343 global_sid_lock->
unlock();
13346 if (!print_event_info->short_form)
13348 print_header(head, print_event_info, FALSE);
13349 my_b_printf(head,
"\tPrevious-GTIDs\n");
13351 my_b_printf(head,
"%s\n", str);
13359 DBUG_ENTER(
"Previous_gtids_log_event::add_to_set(Gtid_set *)");
13361 size_t add_size= DBUG_EVALUATE_IF(
"gtid_has_extra_data", 10, 0);
13364 buf_size + add_size,
13366 DBUG_ASSERT(end_pos <= (
size_t) buf_size);
13373 DBUG_ENTER(
"Previous_gtids_log_event::get_str(size_t *)");
13376 DBUG_PRINT(
"info", (
"temp_buf=%p buf=%p", temp_buf, buf));
13377 if (
set.add_gtid_encoding(buf, buf_size) != RETURN_STATUS_OK)
13379 set.dbug_print(
"set");
13380 size_t length=
set.get_string_length(string_format);
13381 DBUG_PRINT(
"info", (
"string length= %lu", (ulong) length));
13382 char* str= (
char *)my_malloc(length + 1, MYF(MY_WME));
13385 set.to_string(str, string_format);
13386 if (length_p != NULL)
13392 #ifndef MYSQL_CLIENT
/**
  Write the encoded GTID set (@c buf, @c buf_size bytes) to @c file.

  @param file  cache that receives the payload
*/
13393 bool Previous_gtids_log_event::write_data_body(
IO_CACHE *file)
13395 DBUG_ENTER(
"Previous_gtids_log_event::write_data_body");
13396 DBUG_PRINT(
"info", (
"size=%d", buf_size));
13397 bool ret= wrapper_my_b_safe_write(file, buf, buf_size);
13402 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
/**
  Position update for this event: calls rli->inc_event_relay_log_pos().

  @param rli  relay log info whose position is advanced
*/
13403 int Previous_gtids_log_event::do_update_pos(
Relay_log_info *rli)
13405 rli->inc_event_relay_log_pos();
13411 #ifdef MYSQL_CLIENT
/**
  Set up the printing state used by the client-side print() methods.

  The members in the initializer list get the values shown there. The
  db, charset and time_zone_str buffers are zero-filled, and the head and
  body caches are opened as cached files with MY_WME | MY_NABP.

  NOTE(review): the return values of open_cached_file() are not checked
  in the lines visible here.
*/
13417 st_print_event_info::st_print_event_info()
13418 :flags2_inited(0), sql_mode_inited(0), sql_mode(0),
13419 auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
13420 lc_time_names_number(~0),
13421 charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
13422 thread_id(0), thread_id_printed(false),
13423 base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE),
13424 have_unflushed_events(FALSE), skipped_event_in_transaction(false)
13431 memset(db, 0,
sizeof(db));
13432 memset(charset, 0,
sizeof(charset));
13433 memset(time_zone_str, 0,
sizeof(time_zone_str));
13436 myf
const flags = MYF(MY_WME | MY_NABP);
13437 open_cached_file(&head_cache, NULL, NULL, 0, flags);
13438 open_cached_file(&body_cache, NULL, NULL, 0, flags);
13443 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/**
  Decode a heartbeat event from its binary form.

  Everything after the common header is treated as the log identifier.
  Its length is capped at FN_REFLEN-1, and log_ident points into @c buf
  rather than at a copy.

  @param buf        raw event bytes
  @param event_len  total length of the event in bytes
*/
13444 Heartbeat_log_event::Heartbeat_log_event(
const char* buf, uint event_len,
13448 uint8 header_size= description_event->common_header_len;
13449 ident_len = event_len - header_size;
13450 set_if_smaller(ident_len,FN_REFLEN-1);
13451 log_ident= buf + header_size;
13455 #ifdef MYSQL_SERVER
/**
  Copy @c identifier into @c buffer using a quote character.

  The quote character comes from get_quote_char_for_identifier() when a
  THD is supplied, and is a backtick otherwise. The copy itself is done by
  my_strmov_quoted_identifier_helper().

  @param thd         session used to pick the quote character, may be NULL
  @param buffer      destination buffer
  @param identifier  identifier to copy
  @return the value returned by my_strmov_quoted_identifier_helper()
*/
13467 size_t my_strmov_quoted_identifier(THD* thd,
char *buffer,
13468 const char* identifier,
13471 int q= thd ? get_quote_char_for_identifier(thd, identifier, length) :
'`';
13472 return my_strmov_quoted_identifier_helper(q, buffer, identifier, length);
/**
  Variant without a THD argument.

  It forwards to my_strmov_quoted_identifier_helper() with a length of 0,
  which makes the helper use strlen(identifier). The definition of @c q is
  on a line not visible in this excerpt.

  @param buffer      destination buffer
  @param identifier  NUL-terminated identifier to copy
  @return the value returned by my_strmov_quoted_identifier_helper()
*/
13475 size_t my_strmov_quoted_identifier(
char *buffer,
const char* identifier)
13478 return my_strmov_quoted_identifier_helper(q, buffer, identifier, 0);
13483 size_t my_strmov_quoted_identifier_helper(
int q,
char *buffer,
13484 const char* identifier,
13489 uint id_length= (length) ? length : strlen(identifier);
13493 (void) strncpy(buffer, identifier, id_length);
13496 quote_char= (char) q;
13497 *buffer++= quote_char;
13499 while (id_length--)
13501 if (*identifier == quote_char)
13503 *buffer++= quote_char;
13506 *buffer++= *identifier++;
13509 *buffer++= quote_char;