20 #include "my_global.h"
23 #include "sql_cache.h"
27 #include "sql_parse.h"
29 #include "rpl_utility.h"
32 #include "rpl_record_old.h"
33 #include "transaction.h"
40 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// Fragment of the legacy apply helper that DBUG_ENTER names
// Old_rows_log_event::do_apply_event(st_relay_log_info*). The signature and
// many intermediate lines (braces, declarations, returns) are not part of
// this listing, so only the visible steps are described here:
//  1. A dummy event (table id equal to ~0U or ~0ULL >> 16, one column bit,
//     bitmap byte 0 clear) closes the thread tables and clears the error.
//  2. When opening tables fails, the error is reported through rli->report,
//     is_slave_error is set and actual_error is returned.
//  3. Each entry of rli->tables_to_lock is checked with compatible_with(),
//     gets its conversion table stored, and is registered in m_table_map.
//  4. Rows are processed while row_start < ev->m_rows_end, calling
//     do_prepare_row() and do_exec_row() with table->in_use pointed at ev_thd.
//  5. do_after_row_operations() runs and failures are reported.
46 DBUG_ENTER(
"Old_rows_log_event::do_apply_event(st_relay_log_info*)");
49 uchar
const *row_start= ev->m_rows_buf;
57 if ((ev->m_table_id.id() == ~0
U || ev->m_table_id.id() == (~0ULL >> 16)) &&
58 ev->m_cols.n_bits == 1 && ev->m_cols.bitmap[0] == 0)
65 DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
67 const_cast<Relay_log_info*
>(rli)->slave_close_thread_tables(ev_thd);
68 ev_thd->clear_error();
77 DBUG_ASSERT(rli->info_thd == ev_thd);
105 ev_thd->lex->set_stmt_row_injection();
// The error number is read from the statement diagnostics area; a zero value
// is reported with the fallback text below.
109 uint actual_error= ev_thd->get_stmt_da()->sql_errno();
110 if (ev_thd->is_slave_error || ev_thd->is_fatal_error)
116 rli->
report(ERROR_LEVEL, actual_error,
117 "Error '%s' on opening tables",
118 (actual_error ? ev_thd->get_stmt_da()->message() :
119 "unexpected success or fatal error"));
120 ev_thd->is_slave_error= 1;
123 DBUG_RETURN(actual_error);
136 for (uint
i= 0 ; ptr&& (
i< rli->tables_to_lock_count);
139 DBUG_ASSERT(ptr->m_tabledef_valid);
// NOTE(review): this call passes thd while the surrounding lines use ev_thd;
// confirm both refer to the same THD.
141 if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
142 ptr->table, &conv_table))
144 ev_thd->is_slave_error= 1;
145 const_cast<Relay_log_info*
>(rli)->slave_close_thread_tables(ev_thd);
148 DBUG_PRINT(
"debug", (
"Table: %s.%s is compatible with master"
150 ptr->table->s->db.str,
151 ptr->table->s->table_name.str, conv_table));
152 ptr->m_conv_table= conv_table;
// Second pass over the same list: publish table_id -> TABLE in the relay
// log info table map.
171 for (uint
i=0; ptr && (
i < rli->tables_to_lock_count); ptr= ptr->next_global,
i++)
172 const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
173 #ifdef HAVE_QUERY_CACHE
174 query_cache.invalidate_locked_for_write(rli->tables_to_lock);
196 ev_thd->set_time(&ev->when);
202 if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F))
207 if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F))
225 error= do_before_row_operations(table);
// Row loop: stops at the first non-zero error or at the end of the row buffer.
226 while (error == 0 && row_start < ev->m_rows_end)
228 uchar
const *row_end= NULL;
229 if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
233 DBUG_ASSERT(row_end != NULL);
234 DBUG_ASSERT(row_end <= ev->m_rows_end);
// table->in_use is swapped to ev_thd only for the duration of do_exec_row().
237 THD* old_thd= table->in_use;
239 table->in_use= ev_thd;
240 error= do_exec_row(table);
241 table->in_use = old_thd;
245 case HA_ERR_RECORD_CHANGED:
246 case HA_ERR_KEY_NOT_FOUND:
253 rli->
report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
254 "Error in %s event: row application failed. %s",
256 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() :
"");
// NOTE(review): thd is used here while the neighbouring lines use ev_thd;
// confirm this is intentional.
257 thd->is_slave_error= 1;
263 DBUG_EXECUTE_IF(
"stop_slave_middle_group",
264 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
265 error= do_after_row_operations(table, error);
270 rli->
report(ERROR_LEVEL, ev_thd->get_stmt_da()->sql_errno(),
271 "Error in %s event: error during transaction execution "
272 "on table %s.%s. %s",
274 table->s->table_name.str,
275 ev_thd->is_error() ? ev_thd->get_stmt_da()->message() :
"");
288 ev_thd->reset_current_stmt_binlog_format_row();
290 ev_thd->is_slave_error= 1;
299 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// Fragment of last_uniq_key(TABLE *table, uint keyno); the return type and
// the return statements are not part of this listing.
// Walks the key slots after keyno (pre-increment, bounded by table->s->keys)
// and tests each key_info entry for the HA_NOSAME flag.
305 last_uniq_key(
TABLE *table, uint keyno)
307 while (++keyno < table->s->keys)
308 if (table->key_info[keyno].
flags & HA_NOSAME)
// Fragment of record_compare(TABLE *table): compares table->record[0] with
// table->record[1]. Lines that set the result and return it are not visible.
// Visible steps:
//  - If the table has null bytes, the first record byte (only when
//    HA_OPTION_PACK_RECORD is not in use) and the last null byte are saved for
//    both records, then the low bit of the first byte and the bits at and
//    above last_null_bit_pos of the last null byte are forced to 1.
//  - With no blob and no varchar fields the records are compared with
//    cmp_record(); otherwise the null flags are compared with memcmp() and
//    each field is compared with cmp_binary_offset().
//  - At the end the saved bytes are written back into both records.
319 static bool record_compare(
TABLE *table)
334 uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
336 if (table->s->null_bytes > 0)
338 for (
int i = 0 ;
i < 2 ; ++
i)
343 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
345 saved_x[
i]= table->record[
i][0];
346 table->record[
i][0]|= 1
U;
356 if (table->s->last_null_bit_pos > 0)
358 saved_filler[
i]= table->record[
i][table->s->null_bytes - 1];
359 table->record[
i][table->s->null_bytes - 1]|=
360 256
U - (1
U << table->s->last_null_bit_pos);
// Fast path: fixed-size rows only.
365 if (table->s->blob_fields + table->s->varchar_fields == 0)
367 result= cmp_record(table,
record[1]);
368 goto record_compare_exit;
// Slow path: null flags first, then field by field. The second operand is
// addressed at rec_buff_length bytes past the first.
372 if (memcmp(table->null_flags,
373 table->null_flags+table->s->rec_buff_length,
374 table->s->null_bytes))
377 goto record_compare_exit;
381 for (
Field **ptr=table->field ; *ptr ; ptr++)
383 if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
386 goto record_compare_exit;
// Undo the temporary bit forcing done at the top.
397 if (table->s->null_bytes > 0)
399 for (
int i = 0 ;
i < 2 ; ++
i)
401 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
402 table->record[
i][0]= saved_x[
i];
404 if (table->s->last_null_bit_pos > 0)
405 table->record[
i][table->s->null_bytes - 1]= saved_filler[
i];
// Fragment of copy_extra_record_fields(table, master_reclength,
// master_fields); the return type and several lines are not visible.
// Visible behaviour: bytes of record[1] from offset master_reclength up to
// table->s->reclength are copied into record[0], and for every field starting
// at index master_fields the null state found in record[1] is applied to the
// field. A later branch copies bit-field bits from record[1] to record[0].
// Presumably these are columns the slave has beyond those sent by the
// master - TODO confirm against the caller.
422 copy_extra_record_fields(
TABLE *table,
423 size_t master_reclength,
424 my_ptrdiff_t master_fields)
426 DBUG_ENTER(
"copy_extra_record_fields(table, master_reclen, master_fields)");
427 DBUG_PRINT(
"info", (
"Copying to 0x%lx "
428 "from field %lu at offset %lu "
429 "to field %d at offset %lu",
430 (
long) table->record[0],
431 (ulong) master_fields, (ulong) master_reclength,
432 table->s->fields, table->s->reclength));
// The action taken when the table has fewer fields than master_fields is on
// lines missing from this listing.
438 if (table->s->fields < (uint) master_fields)
441 DBUG_ASSERT(master_reclength <= table->s->reclength);
442 if (master_reclength < table->s->reclength)
443 memcpy(table->record[0] + master_reclength,
444 table->record[1] + master_reclength,
445 table->s->reclength - master_reclength);
464 Field **field_ptr= table->field + master_fields;
465 for ( ; *field_ptr ; ++field_ptr)
470 if ((*field_ptr)->maybe_null() &&
471 (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
472 (*field_ptr)->set_null();
474 (*field_ptr)->set_notnull();
479 switch ((*field_ptr)->real_type())
// offset converts a pointer into record[0] to the same position in record[1].
489 my_ptrdiff_t
const offset= table->record[1] - table->record[0];
491 get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
492 set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
// Fragment of replace_record(thd, table, master_reclength, master_fields);
// return type, braces and several statements are not visible.
// Visible flow: ha_write_row(record[0]) is retried in a loop for as long as
// it returns an error. Inside the loop the conflicting row is loaded into
// record[1], either with ha_rnd_pos() on dup_ref or through a key built with
// key_copy(); HA_ERR_RECORD_DELETED is mapped to HA_ERR_KEY_NOT_FOUND in both
// lookups. copy_extra_record_fields() is then applied, and the row is either
// updated in place (when last_uniq_key() holds and the table is not
// referenced by a foreign key) or deleted with ha_delete_row(record[1]).
523 replace_record(THD *thd,
TABLE *table,
524 ulong
const master_reclength,
525 uint
const master_fields)
527 DBUG_ENTER(
"replace_record");
528 DBUG_ASSERT(table != NULL && thd != NULL);
535 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
536 DBUG_PRINT_BITSET(
"debug",
"write_set = %s", table->write_set);
537 DBUG_PRINT_BITSET(
"debug",
"read_set = %s", table->read_set);
540 while ((error= table->file->ha_write_row(table->record[0])))
// Deadlock and lock-wait timeout are tested separately from other errors;
// the action taken is on lines missing from this listing.
542 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
570 error= table->file->
ha_rnd_pos(table->record[1], table->file->dup_ref);
573 DBUG_PRINT(
"info",(
"ha_rnd_pos() returns error %d",error));
574 if (error == HA_ERR_RECORD_DELETED)
575 error= HA_ERR_KEY_NOT_FOUND;
582 if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
584 DBUG_RETURN(my_errno);
// The key buffer is allocated once, on first use, sized by max_unique_length.
587 if (key.get() == NULL)
589 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
590 if (key.get() == NULL)
594 key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
597 (
const uchar*)key.get(),
602 DBUG_PRINT(
"info", (
"ha_index_read_idx_map() returns error %d", error));
603 if (error == HA_ERR_RECORD_DELETED)
604 error= HA_ERR_KEY_NOT_FOUND;
618 copy_extra_record_fields(table, master_reclength, master_fields);
635 if (last_uniq_key(table, keynum) &&
636 !table->file->referenced_by_foreign_key())
638 error=table->file->ha_update_row(table->record[1],
// HA_ERR_RECORD_IS_THE_SAME is not treated as a failure of the update.
640 if (error && error != HA_ERR_RECORD_IS_THE_SAME)
648 if ((error= table->file->ha_delete_row(table->record[1])))
// Fragment of find_and_fetch_row(TABLE *table, uchar *key); braces, returns
// and several statements are not visible.
// Three lookup strategies are visible, in this order:
//  1. Engines flagged HA_PRIMARY_KEY_REQUIRED_FOR_POSITION with a primary key:
//     position() on record[0], ha_rnd_pos() back into record[0], then
//     record[0] is copied to record[1].
//  2. Tables with at least one key: index 0 is initialised if needed,
//     ha_index_read_map() reads into record[1], and a loop driven by
//     record_compare() advances with ha_index_next().
//  3. Otherwise a table scan with ha_rnd_init(1)/ha_rnd_next(), restarted
//     once at end of file (restart_count is compared against 2).
685 static int find_and_fetch_row(
TABLE *table, uchar *key)
687 DBUG_ENTER(
"find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
688 DBUG_PRINT(
"enter", (
"table: 0x%lx, key: 0x%lx record: 0x%lx",
689 (
long) table, (
long) key, (
long) table->record[1]));
691 DBUG_ASSERT(table->in_use != NULL);
693 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
695 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
696 table->s->primary_key < MAX_KEY)
716 table->file->position(table->record[0]);
717 int error= table->file->ha_rnd_pos(table->record[0], table->file->ref);
722 memcpy(table->record[1], table->record[0], table->s->reclength);
728 table->use_all_columns();
730 if (table->s->keys > 0)
734 if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
736 table->file->print_error(error, MYF(0));
745 DBUG_DUMP(
"table->record[0]", table->record[0], table->s->reclength);
746 DBUG_DUMP(
"table->record[1]", table->record[1], table->s->reclength);
// Before the index read, the last null byte of record[1] (or byte 0 when the
// table has no null bytes) is set to 0xFF.
755 my_ptrdiff_t
const pos=
756 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
757 table->record[1][pos]= 0xFF;
758 if ((error= table->file->ha_index_read_map(table->record[1], key, HA_WHOLE_KEY,
761 table->file->print_error(error, MYF(0));
762 table->file->ha_index_end();
771 DBUG_DUMP(
"table->record[0]", table->record[0], table->s->reclength);
772 DBUG_DUMP(
"table->record[1]", table->record[1], table->s->reclength);
// When the first key is unique (HA_NOSAME) the index is closed at this point.
788 if (table->key_info->flags & HA_NOSAME)
790 table->file->ha_index_end();
794 while (record_compare(table))
807 if (table->s->null_bytes > 0)
809 table->record[1][table->s->null_bytes - 1]|=
810 256
U - (1
U << table->s->last_null_bit_pos);
813 while ((error= table->file->ha_index_next(table->record[1])))
816 if (error == HA_ERR_RECORD_DELETED)
818 table->file->print_error(error, MYF(0));
819 table->file->ha_index_end();
827 table->file->ha_index_end();
831 int restart_count= 0;
835 if ((error= table->file->ha_rnd_init(1)))
837 table->file->print_error(error, MYF(0));
845 error= table->file->ha_rnd_next(table->record[1]);
847 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
848 DBUG_DUMP(
"record[1]", table->record[1], table->s->reclength);
// Deleted rows are skipped by jumping back to the fetch.
858 case HA_ERR_RECORD_DELETED:
859 goto restart_ha_rnd_next;
861 case HA_ERR_END_OF_FILE:
862 if (++restart_count < 2)
864 if ((error= table->file->ha_rnd_init(1)))
866 table->file->print_error(error, MYF(0));
873 table->file->print_error(error, MYF(0));
874 DBUG_PRINT(
"info", (
"Record not found"));
875 (void) table->file->ha_rnd_end();
879 while (restart_count < 2 && record_compare(table));
884 DBUG_PRINT(
"info", (
"Record %sfound", restart_count == 2 ?
"not " :
""));
885 table->file->ha_rnd_end();
887 DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
// Fragment: setup done before rows of a legacy write event are applied.
// Visible effects: the statement is marked as a REPLACE (duplicates mode and
// sql_command), and the handler receives HA_EXTRA_IGNORE_DUP_KEY,
// HA_EXTRA_WRITE_CAN_REPLACE and HA_EXTRA_IGNORE_NO_KEY. The return statement
// is not part of this listing.
899 int Write_rows_log_event_old::do_before_row_operations(
TABLE *table)
910 thd->lex->duplicates= DUP_REPLACE;
917 thd->lex->sql_command= SQLCOM_REPLACE;
921 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
928 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
934 table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
// Fragment: teardown after rows of a legacy write event were applied.
// Sends HA_EXTRA_NO_IGNORE_DUP_KEY and HA_EXTRA_WRITE_CANNOT_REPLACE to the
// handler and returns the incoming error when it is non-zero, otherwise
// local_error (its assignment is on lines missing from this listing).
945 int Write_rows_log_event_old::do_after_row_operations(
TABLE *table,
int error)
948 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
949 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
960 return error? error : local_error;
// Fragment: unpacks one row image starting at row_start into record[0] using
// unpack_row_old() with the write_set, then copies write_set into read_set.
// *row_end is passed to unpack_row_old() as an output argument. Parts of the
// parameter list and the return statement are not visible.
965 Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
968 uchar
const *row_start,
969 uchar
const **row_end)
971 DBUG_ASSERT(table != NULL);
972 DBUG_ASSERT(row_start && row_end);
975 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
976 table, m_width, table->record[0],
977 row_start, &m_cols, row_end, &m_master_reclength,
978 table->write_set, PRE_GA_WRITE_ROWS_EVENT);
979 bitmap_copy(table->read_set, table->write_set);
// Fragment: applies the prepared row by delegating to replace_record() with
// the master record length and column count of this event. The return
// statement is not part of this listing.
984 int Write_rows_log_event_old::do_exec_row(
TABLE *table)
986 DBUG_ASSERT(table != NULL);
987 int error= replace_record(thd, table, m_master_reclength, m_width);
// Fragment: allocates the working buffers for a legacy delete event.
// m_memory must be NULL on entry. Two allocation paths are visible: a
// my_multi_malloc() call when the table has keys, and a plain my_malloc() of
// reclength bytes for m_after_image (also stored in m_memory). The conditions
// separating the paths and the failure test before the HA_ERR_OUT_OF_MEM
// return are on lines missing from this listing.
996 int Delete_rows_log_event_old::do_before_row_operations(
TABLE *table)
998 DBUG_ASSERT(m_memory == NULL);
1000 if ((table->file->
ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
1001 table->s->primary_key < MAX_KEY)
1012 if (table->s->keys > 0)
1014 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1016 (uint) table->s->reclength,
1023 m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1024 m_memory= (uchar*)m_after_image;
1028 return HA_ERR_OUT_OF_MEM;
// Fragment: ends any active index or table scan on the handler and resets
// m_after_image to NULL. Whether the buffers are freed here is not visible in
// this listing.
1034 int Delete_rows_log_event_old::do_after_row_operations(
TABLE *table,
int error)
1037 table->file->ha_index_or_rnd_end();
1040 m_after_image= NULL;
// Fragment: unpacks the row to delete into record[0] with unpack_row_old()
// using the read_set, then builds the lookup key for key 0 into m_key with
// key_copy(). Asserts that the table has at least m_width fields. Any guard
// around the key_copy() call is on lines missing from this listing.
1048 Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
1051 uchar
const *row_start,
1052 uchar
const **row_end)
1055 DBUG_ASSERT(row_start && row_end);
1060 DBUG_ASSERT(table->s->fields >= m_width);
1062 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1063 table, m_width, table->record[0],
1064 row_start, &m_cols, row_end, &m_master_reclength,
1065 table->read_set, PRE_GA_DELETE_ROWS_EVENT);
1072 KEY *
const key_info= table->key_info;
1074 key_copy(m_key, table->record[0], key_info, 0);
// Fragment: locates the row with find_and_fetch_row() using m_key and, when
// that returns 0, deletes it with ha_delete_row(record[0]).
1081 int Delete_rows_log_event_old::do_exec_row(
TABLE *table)
1084 DBUG_ASSERT(table != NULL);
1086 if (!(error= ::find_and_fetch_row(table, m_key)))
1093 error= table->file->ha_delete_row(table->record[0]);
// Fragment: allocates the working buffers for a legacy update event.
// m_memory must be NULL on entry. A my_multi_malloc() call is visible for
// tables with keys and a my_malloc() of reclength bytes for m_after_image
// (also stored in m_memory). The failure test before the HA_ERR_OUT_OF_MEM
// return is on lines missing from this listing.
1103 int Update_rows_log_event_old::do_before_row_operations(
TABLE *table)
1105 DBUG_ASSERT(m_memory == NULL);
1109 if (table->s->keys > 0)
1111 m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
1113 (uint) table->s->reclength,
1120 m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
1121 m_memory= m_after_image;
1125 return HA_ERR_OUT_OF_MEM;
// Fragment: ends any active index or table scan on the handler and resets
// m_after_image to NULL. Whether the buffers are freed here is not visible in
// this listing.
1131 int Update_rows_log_event_old::do_after_row_operations(
TABLE *table,
int error)
1134 table->file->ha_index_or_rnd_end();
1137 m_after_image= NULL;
// Fragment: unpacks the two row images of an update.
// The first unpack_row_old() call fills record[0] using the read_set; the
// second starts where the first ended (row_start = *row_end) and fills
// m_after_image using the write_set. Finally the lookup key for key 0 is
// built from record[0] into m_key.
1144 int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
1147 uchar
const *row_start,
1148 uchar
const **row_end)
1151 DBUG_ASSERT(row_start && row_end);
1156 DBUG_ASSERT(table->s->fields >= m_width);
1159 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1160 table, m_width, table->record[0],
1161 row_start, &m_cols, row_end, &m_master_reclength,
1162 table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
// NOTE(review): no check of the first unpack result is visible before the
// second unpack overwrites error; such a check may be on a missing line.
1163 row_start = *row_end;
1165 error= unpack_row_old(const_cast<Relay_log_info*>(rli),
1166 table, m_width, m_after_image,
1167 row_start, &m_cols, row_end, &m_master_reclength,
1168 table->write_set, PRE_GA_UPDATE_ROWS_EVENT);
1170 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
1171 DBUG_DUMP(
"m_after_image", m_after_image, table->s->reclength);
1179 KEY *
const key_info= table->key_info;
1181 key_copy(m_key, table->record[0], key_info, 0);
// Fragment: finds the row with find_and_fetch_row(), overwrites record[0]
// with m_after_image, applies copy_extra_record_fields(), and then calls
// ha_update_row(record[1], record[0]). HA_ERR_RECORD_IS_THE_SAME is tested
// separately; its handling is on lines missing from this listing.
1188 int Update_rows_log_event_old::do_exec_row(
TABLE *table)
1190 DBUG_ASSERT(table != NULL);
1192 int error= ::find_and_fetch_row(table, m_key);
1209 memcpy(table->record[0], m_after_image, table->s->reclength);
1210 copy_extra_record_fields(table, m_master_reclength, m_width);
1218 error= table->file->ha_update_row(table->record[1], table->record[0]);
1219 if (error == HA_ERR_RECORD_IS_THE_SAME)
1232 #ifndef MYSQL_CLIENT
// Fragment of the constructor taking a THD and a TABLE; part of the
// parameter list and of the initializer list is not visible.
// Visible behaviour: m_width is the table field count, or 1 when tbl_arg is
// null; the row buffer pointers and flags start at zero. The column bitmap is
// initialised over m_bitbuf when m_width fits in it (otherwise NULL is
// passed to bitmap_init), and when cols is non-null its bitmap bytes are
// copied into m_cols and the last-word mask is rebuilt.
1233 Old_rows_log_event::Old_rows_log_event(THD *thd_arg,
TABLE *tbl_arg, ulong tid,
1237 using_trans ?
Log_event::EVENT_TRANSACTIONAL_CACHE :
1243 m_width(tbl_arg ? tbl_arg->s->fields : 1),
1244 m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
1245 #ifdef HAVE_REPLICATION
1246 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
// Either a real table with a real id is given, or no table, no column set
// and the all-ones id.
1259 DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
1260 (!tbl_arg && !cols && tid == ~0UL));
1263 set_flags(NO_FOREIGN_KEY_CHECKS_F);
1265 set_flags(RELAXED_UNIQUE_CHECKS_F);
1267 if (likely(!bitmap_init(&m_cols,
1268 m_width <=
sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1273 if (likely(cols != NULL))
1275 memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
1276 create_last_word_mask(&m_cols);
// Fragment of the constructor that decodes an event from a raw buffer; part
// of the parameter list and several statements are not visible.
// Visible decoding steps:
//  - post header: table id read as 4 bytes when post_header_len is 6,
//    otherwise as 6 bytes; then the 2-byte flags.
//  - variable part: m_width as a length-encoded integer, followed by
//    (m_width + 7) / 8 bytes of column bitmap copied into m_cols.
//  - the remaining data_size bytes are copied into a freshly allocated
//    m_rows_buf, with m_rows_end and m_rows_cur both set past the last byte.
// NOTE(review): no check that the header, width and bitmap fit inside
// event_len is visible in this listing; data_size is computed by unsigned
// subtraction. Confirm that validation exists on the missing lines.
1288 Old_rows_log_event::Old_rows_log_event(
const char *
buf, uint event_len,
1294 #ifndef MYSQL_CLIENT
1297 m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
1298 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1299 , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
1302 DBUG_ENTER(
"Old_rows_log_event::Old_Rows_log_event(const char*,...)");
1303 uint8
const common_header_len= description_event->common_header_len;
1304 uint8
const post_header_len= description_event->post_header_len[event_type-1];
1306 DBUG_PRINT(
"enter",(
"event_len: %u common_header_len: %d "
1307 "post_header_len: %d",
1308 event_len, common_header_len,
1311 const char *post_start= buf + common_header_len;
1312 DBUG_DUMP(
"post_header", (uchar*) post_start, post_header_len);
1313 post_start+= RW_MAPID_OFFSET;
// A 6-byte post header carries a 4-byte table id; the other visible branch
// reads a 6-byte id and skips ahead by RW_FLAGS_OFFSET.
1314 if (post_header_len == 6)
1317 m_table_id= uint4korr(post_start);
1322 m_table_id= (ulong) uint6korr(post_start);
1323 post_start+= RW_FLAGS_OFFSET;
1326 m_flags= uint2korr(post_start);
1328 uchar
const *
const var_start=
1329 (
const uchar *)buf + common_header_len + post_header_len;
1330 uchar
const *
const ptr_width= var_start;
1331 uchar *ptr_after_width= (uchar*) ptr_width;
1332 DBUG_PRINT(
"debug", (
"Reading from %p", ptr_after_width));
// net_field_length() advances ptr_after_width past the encoded width.
1333 m_width = net_field_length(&ptr_after_width);
1334 DBUG_PRINT(
"debug", (
"m_width=%lu", m_width));
1336 if (likely(!bitmap_init(&m_cols,
1337 m_width <=
sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
1341 DBUG_PRINT(
"debug", (
"Reading from %p", ptr_after_width));
1342 memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
1343 create_last_word_mask(&m_cols);
1344 ptr_after_width+= (m_width + 7) / 8;
1345 DBUG_DUMP(
"m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1350 m_cols.bitmap= NULL;
1354 const uchar*
const ptr_rows_data= (
const uchar*) ptr_after_width;
1355 size_t const data_size= event_len - (ptr_rows_data - (
const uchar *) buf);
1356 DBUG_PRINT(
"info",(
"m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
1357 m_table_id.id(), m_flags, m_width, (ulong) data_size));
1358 DBUG_DUMP(
"rows_data", (uchar*) ptr_rows_data, data_size);
1360 m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
1361 if (likely((
bool)m_rows_buf))
1363 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1364 m_curr_row= m_rows_buf;
1366 m_rows_end= m_rows_buf + data_size;
1367 m_rows_cur= m_rows_end;
1368 memcpy(m_rows_buf, ptr_rows_data, data_size);
// Fragment of the destructor: tests whether the column bitmap points at the
// embedded m_bitbuf (the statement guarded by that test is on a missing
// line), then frees the bitmap and the row buffer.
1377 Old_rows_log_event::~Old_rows_log_event()
1379 if (m_cols.bitmap == m_bitbuf)
1381 bitmap_free(&m_cols);
1382 my_free(m_rows_buf);
// Fragment: computes the size of the event data as
// ROWS_HEADER_LEN + column bitmap bytes + encoded width length + row bytes.
// Under the debug keyword old_row_based_repl_4_byte_map_id_master a header
// length of 6 is used instead of ROWS_HEADER_LEN.
// NOTE(review): the encoded length here is measured for (m_width + 7) / 8,
// while write_data_body() encodes m_width itself with net_store_length();
// the two encodings can have different lengths for large widths - confirm
// which value is intended.
1386 int Old_rows_log_event::get_data_size()
1388 uchar buf[
sizeof(m_width)+1];
1389 uchar *end= net_store_length(buf, (m_width + 7) / 8);
1391 DBUG_EXECUTE_IF(
"old_row_based_repl_4_byte_map_id_master",
1392 return 6 + no_bytes_in_map(&m_cols) + (end - buf) +
1393 (m_rows_cur - m_rows_buf););
1394 int data_size= ROWS_HEADER_LEN;
1395 data_size+= no_bytes_in_map(&m_cols);
1396 data_size+= (uint) (end - buf);
1398 data_size+= (uint) (m_rows_cur - m_rows_buf);
1403 #ifndef MYSQL_CLIENT
// Fragment: appends length bytes from row_data to the row buffer.
// When the free space (m_rows_end - m_rows_cur) is less than or equal to
// length, the buffer is grown with my_realloc() to the current size plus
// length, rounded up to a multiple of 1024 bytes; m_rows_buf, m_rows_cur and
// m_rows_end are then re-based on the new allocation. Returns
// HA_ERR_OUT_OF_MEM when the reallocation fails. The success return is not
// part of this listing.
1404 int Old_rows_log_event::do_add_row_data(uchar *row_data,
size_t length)
1411 DBUG_ENTER(
"Old_rows_log_event::do_add_row_data");
1412 DBUG_PRINT(
"enter", (
"row_data: 0x%lx length: %lu", (ulong) row_data,
1419 DBUG_DUMP(
"row_data", row_data, min<size_t>(length, 32));
1422 DBUG_ASSERT(m_rows_buf <= m_rows_cur);
1423 DBUG_ASSERT(!m_rows_buf || (m_rows_end && m_rows_buf < m_rows_end));
1424 DBUG_ASSERT(m_rows_cur <= m_rows_end);
1427 if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
1429 size_t const block_size= 1024;
1430 my_ptrdiff_t
const cur_size= m_rows_cur - m_rows_buf;
1431 my_ptrdiff_t
const new_alloc=
1432 block_size * ((cur_size + length + block_size - 1) / block_size);
// NOTE(review): new_alloc is narrowed with a (uint) cast for my_realloc();
// confirm the size cannot exceed the range of uint.
1434 uchar*
const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc,
1435 MYF(MY_ALLOW_ZERO_PTR|MY_WME));
1436 if (unlikely(!new_buf))
1437 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
// The write cursor is re-based only when the block actually moved.
1440 if (new_buf != m_rows_buf)
1442 m_rows_buf= new_buf;
1443 m_rows_cur= m_rows_buf + cur_size;
1450 m_rows_end= m_rows_buf + new_alloc;
1453 DBUG_ASSERT(m_rows_cur + length <= m_rows_end);
1454 memcpy(m_rows_cur, row_data, length);
1455 m_rows_cur+= length;
1462 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// Fragment of Old_rows_log_event::do_apply_event(Relay_log_info const *rli);
// braces, declarations, returns and many statements are not visible.
// Visible flow:
//  1. A dummy event (table id ~0U or ~0ULL >> 16, one column bit, bitmap byte
//     0 clear) closes the thread tables.
//  2. A failure while locking tables is reported through rli->report and the
//     thread tables are closed.
//  3. Each entry of rli->tables_to_lock is checked with compatible_with() and
//     registered in m_table_map; m_table is then looked up by m_table_id.
//  4. The read and write sets are set to all columns and the write set is
//     intersected with m_cols.
//  5. Rows are applied with do_exec_row() while m_curr_row < m_rows_end, with
//     m_curr_row advanced to m_curr_row_end after each row.
//  6. do_after_row_operations() runs; when STMT_END_F is set the pending rows
//     event is flushed and the statement is committed, or rolled back when
//     the flush returned an error.
1463 int Old_rows_log_event::do_apply_event(
Relay_log_info const *rli)
1465 DBUG_ENTER(
"Old_rows_log_event::do_apply_event(Relay_log_info*)");
1474 if ((m_table_id.id() == ~0
U || m_table_id.id() == (~0ULL >> 16)) &&
1475 m_cols.n_bits == 1 && m_cols.bitmap[0] == 0)
1481 DBUG_ASSERT(get_flags(STMT_END_F));
1483 const_cast<Relay_log_info*
>(rli)->slave_close_thread_tables(thd);
1493 DBUG_ASSERT(rli->info_thd == thd);
1512 rli->tables_to_lock_count, 0)))
1514 if (thd->is_slave_error || thd->is_fatal_error)
1520 uint actual_error= thd->net.last_errno;
1521 rli->
report(ERROR_LEVEL, actual_error,
1522 "Error '%s' in %s event: when locking tables",
1523 (actual_error ? thd->net.last_error :
1524 "unexpected success or fatal error"),
1526 thd->is_fatal_error= 1;
1530 rli->
report(ERROR_LEVEL, error,
1531 "Error in %s event: when locking tables",
1534 const_cast<Relay_log_info*
>(rli)->slave_close_thread_tables(thd);
1548 for (uint
i= 0 ; ptr&& (
i< rli->tables_to_lock_count);
// NOTE(review): this compatible_with() test is NOT negated, yet it is
// followed by the same error handling (is_slave_error= 1 and closing the
// thread tables) that the other do_apply_event fragment in this file applies
// to a negated call. One of the two conditions looks inverted - confirm
// against the return contract of compatible_with().
1552 if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
1553 ptr->table, &conv_table))
1555 thd->is_slave_error= 1;
1556 const_cast<Relay_log_info*
>(rli)->slave_close_thread_tables(thd);
1559 ptr->m_conv_table= conv_table;
1578 for (
TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
1580 const_cast<Relay_log_info*
>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
1582 #ifdef HAVE_QUERY_CACHE
1583 query_cache.invalidate_locked_for_write(rli->tables_to_lock);
1589 m_table=
const_cast<Relay_log_info*
>(rli)->m_table_map.get_table(m_table_id);
1607 thd->set_time(&when);
1613 if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
1618 if (get_flags(RELAXED_UNIQUE_CHECKS_F))
1636 if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
1649 DBUG_PRINT_BITSET(
"debug",
"Setting table's write_set from: %s", &m_cols);
1651 bitmap_set_all(table->read_set);
1652 bitmap_set_all(table->write_set);
1654 bitmap_intersect(table->write_set,&m_cols);
1658 error= do_before_row_operations(rli);
// Row loop: stops at the first non-zero error or at the end of the buffer.
1662 while (error == 0 && m_curr_row < m_rows_end)
1665 THD* old_thd= table->in_use;
1669 error= do_exec_row(rli);
1671 DBUG_PRINT(
"info", (
"error: %d", error));
1672 DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
1674 table->in_use = old_thd;
1681 case HA_ERR_RECORD_CHANGED:
1682 case HA_ERR_KEY_NOT_FOUND:
// NOTE(review): thd->net.last_error is tested for truthiness below; if it is
// a character array rather than a pointer the test is always true - confirm
// its type.
1688 rli->
report(ERROR_LEVEL, thd->net.last_errno,
1689 "Error in %s event: row application failed. %s",
1691 thd->net.last_error ? thd->net.last_error :
"");
1692 thd->is_slave_error= 1;
1703 DBUG_PRINT(
"info", (
"error: %d", error));
1704 DBUG_PRINT(
"info", (
"curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
1705 (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
// If do_exec_row() did not establish the end of the current row, unpack it
// here so that the cursor can advance.
1707 if (!m_curr_row_end && !error)
1708 unpack_current_row(rli);
1711 DBUG_ASSERT(error || m_curr_row_end != NULL);
1712 DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
1713 DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
1715 m_curr_row= m_curr_row_end;
1719 DBUG_EXECUTE_IF(
"stop_slave_middle_group",
1720 const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
1721 error= do_after_row_operations(rli, error);
1726 rli->
report(ERROR_LEVEL, thd->net.last_errno,
1727 "Error in %s event: error during transaction execution "
1728 "on table %s.%s. %s",
1730 table->s->table_name.str,
1731 thd->net.last_error ? thd->net.last_error :
"");
1744 thd->reset_current_stmt_binlog_format_row();
1746 thd->is_slave_error= 1;
1755 if (table && (table->s->primary_key == MAX_KEY) &&
1756 !is_using_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS)
1774 const_cast<Relay_log_info*
>(rli)->last_event_start_time= my_time(0);
1777 if (get_flags(STMT_END_F))
1794 int binlog_error= thd->binlog_flush_pending_rows_event(TRUE);
1807 DBUG_ASSERT(! thd->transaction_rollback_request);
// Roll the statement back when the flush failed, otherwise commit it; the
// flush error is then OR-ed into the result.
1808 if ((error= (binlog_error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd))))
1809 rli->
report(ERROR_LEVEL, error,
1810 "Error in %s event: commit of row events failed, "
1813 m_table->s->table_name.str);
1814 error|= binlog_error;
1826 thd->reset_current_stmt_binlog_format_row();
1842 if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
1845 return Log_event::do_shall_skip(rli);
// Fragment of the routine that DBUG_ENTER names
// Old_rows_log_event::do_update_pos; its signature is not part of this
// listing. Visible behaviour: it branches on STMT_END_F and calls
// rli->inc_event_relay_log_pos(); which branch that call belongs to cannot
// be told from the visible lines.
1851 DBUG_ENTER(
"Old_rows_log_event::do_update_pos");
1854 DBUG_PRINT(
"info", (
"flags: %s",
1855 get_flags(STMT_END_F) ?
"STMT_END_F " :
""));
1857 if (get_flags(STMT_END_F))
1875 rli->inc_event_relay_log_pos();
1884 #ifndef MYSQL_CLIENT
1885 bool Old_rows_log_event::write_data_header(
IO_CACHE *
file)
// Fragment: writes the variable part of the event to file in three pieces,
// each with my_b_safe_write(): the length-encoded m_width, the column bitmap
// bytes, and the row data between m_rows_buf and m_rows_cur. The res
// accumulator short-circuits later writes once one has failed.
// NOTE(review): the scratch buffer is sizeof(m_width) bytes, whereas
// get_data_size() uses sizeof(m_width)+1 for the same kind of encoding; only
// a debug assertion guards the encoded length here - confirm the buffer is
// large enough for every width.
1893 bool Old_rows_log_event::write_data_body(
IO_CACHE*
file)
1899 uchar sbuf[
sizeof(m_width)];
1900 my_ptrdiff_t
const data_size= m_rows_cur - m_rows_buf;
1906 uchar *
const sbuf_end= net_store_length(sbuf, (
size_t) m_width);
1907 DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <=
sizeof(sbuf));
1909 DBUG_DUMP(
"m_width", sbuf, (
size_t) (sbuf_end - sbuf));
1910 res= res || my_b_safe_write(file, sbuf, (
size_t) (sbuf_end - sbuf));
1912 DBUG_DUMP(
"m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
1913 res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
1914 no_bytes_in_map(&m_cols));
1915 DBUG_DUMP(
"rows", m_rows_buf, data_size);
1916 res= res || my_b_safe_write(file, m_rows_buf, (
size_t) data_size);
1924 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
// Fragment: formats a short description of the event (table id, plus the
// STMT_END_F marker when that flag is set) into buf and stores it in the
// protocol using the binary character set. The declaration of buf and the
// return statement are not part of this listing.
1925 int Old_rows_log_event::pack_info(
Protocol *protocol)
1928 char const *
const flagstr=
1929 get_flags(STMT_END_F) ?
" flags: STMT_END_F" :
"";
1930 size_t bytes= my_snprintf(buf,
sizeof(buf),
1931 "table_id: %llu%s", m_table_id.id(), flagstr);
1932 protocol->
store(buf, bytes, &my_charset_bin);
// Fragment: shared printing routine used by the per-type print() methods.
// Unless short_form is requested it prints the event header and a line with
// the event name, table id and STMT_END_F marker to the head cache, then the
// base64 body to the body cache. The third argument of print_header() and
// print_base64() is true when this is not the last event of the statement.
1939 void Old_rows_log_event::print_helper(FILE *file,
1940 PRINT_EVENT_INFO *print_event_info,
1941 char const *
const name)
1943 IO_CACHE *
const head= &print_event_info->head_cache;
1944 IO_CACHE *
const body= &print_event_info->body_cache;
1945 if (!print_event_info->short_form)
1947 bool const last_stmt_event= get_flags(STMT_END_F);
1948 print_header(head, print_event_info, !last_stmt_event);
1949 my_b_printf(head,
"\t%s: table id %llu%s\n",
1950 name, m_table_id.id(),
1951 last_stmt_event ?
" flags: STMT_END_F" :
"");
1952 print_base64(body, print_event_info, !last_stmt_event);
1958 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// Fragment of the routine that DBUG_ENTER names write_row; only the tail of
// its parameter list (const bool overwrite) is visible.
// Visible flow: the record is prepared and the current row unpacked into
// record[0]; ha_write_row() is retried in a loop for as long as it returns an
// error. Inside the loop the duplicate key number is obtained with
// get_dup_key(), the conflicting row is loaded into record[1] (by
// ha_rnd_pos() on dup_ref, or through a key built with key_copy()),
// record[1] is restored into record[0] and the row is unpacked again on top
// of it. The row is then updated in place when last_uniq_key() holds and the
// table is not referenced by a foreign key; a ha_delete_row(record[1]) call
// is also visible, after which the write is attempted again.
1996 const bool overwrite)
1998 DBUG_ENTER(
"write_row");
1999 DBUG_ASSERT(m_table != NULL && thd != NULL);
2001 TABLE *table= m_table;
2008 if ((error= prepare_record(table, table->write_set,
2013 error= unpack_current_row(rli);
2016 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
2017 DBUG_PRINT_BITSET(
"debug",
"write_set = %s", table->write_set);
2018 DBUG_PRINT_BITSET(
"debug",
"read_set = %s", table->read_set);
2029 while ((error= table->file->ha_write_row(table->record[0])))
2031 if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
// A negative key number means the handler could not identify the duplicate.
2036 if ((keynum= table->file->
get_dup_key(error)) < 0)
2038 DBUG_PRINT(
"info",(
"Can't locate duplicate key (get_dup_key returns %d)",keynum));
2060 DBUG_PRINT(
"info",(
"Locating offending record using ha_rnd_pos()"));
2061 error= table->file->
ha_rnd_pos(table->record[1], table->file->dup_ref);
2064 DBUG_PRINT(
"info",(
"ha_rnd_pos() returns error %d",error));
2065 if (error == HA_ERR_RECORD_DELETED)
2066 error= HA_ERR_KEY_NOT_FOUND;
2073 DBUG_PRINT(
"info",(
"Locating offending record using index_read_idx()"));
2075 if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
2077 DBUG_PRINT(
"info",(
"Error when setting HA_EXTRA_FLUSH_CACHE"));
2078 DBUG_RETURN(my_errno);
// The key buffer is allocated once, on first use, sized by max_unique_length.
2081 if (key.get() == NULL)
2083 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
2084 if (key.get() == NULL)
2086 DBUG_PRINT(
"info",(
"Can't allocate key buffer"));
2087 DBUG_RETURN(ENOMEM);
2091 key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
2094 (
const uchar*)key.get(),
2099 DBUG_PRINT(
"info",(
"ha_index_read_idx_map() returns error %d", error));
2100 if (error == HA_ERR_RECORD_DELETED)
2101 error= HA_ERR_KEY_NOT_FOUND;
// Start from the existing row, then lay the incoming row image over it.
2119 restore_record(table,
record[1]);
2120 error= unpack_current_row(rli);
2124 DBUG_PRINT(
"debug",(
"preparing for update: before and after image"));
2125 DBUG_DUMP(
"record[1] (before)", table->record[1], table->s->reclength);
2126 DBUG_DUMP(
"record[0] (after)", table->record[0], table->s->reclength);
2144 if (last_uniq_key(table, keynum) &&
2145 !table->file->referenced_by_foreign_key())
2147 DBUG_PRINT(
"info",(
"Updating row using ha_update_row()"));
2148 error=table->file->ha_update_row(table->record[1],
2152 case HA_ERR_RECORD_IS_THE_SAME:
2153 DBUG_PRINT(
"info",(
"ignoring HA_ERR_RECORD_IS_THE_SAME error from"
2154 " ha_update_row()"));
2161 DBUG_PRINT(
"info",(
"ha_update_row() returns error %d",error));
2169 DBUG_PRINT(
"info",(
"Deleting offending row and trying to write new one again"));
2170 if ((error= table->file->ha_delete_row(table->record[1])))
2172 DBUG_PRINT(
"info",(
"ha_delete_row() returns error %d",error));
// Fragment of the routine that DBUG_ENTER names find_row; its signature is
// not part of this listing.
// Visible flow: the current row is unpacked into record[0], then the matching
// table row is searched with one of three strategies:
//  1. Engines flagged HA_PRIMARY_KEY_REQUIRED_FOR_POSITION with a primary key
//     locate the row by position.
//  2. Tables with at least one key use index 0: the key is built into m_key
//     with key_copy(), the first match is read, and for a non-unique index
//     (or, it appears, a unique key containing a NULL part) further rows are
//     fetched with ha_index_next() while record_compare() reports a
//     difference.
//  3. Otherwise a table scan with ha_rnd_next(), restarted once at end of
//     file (restart_count is compared against 2).
// HA_ERR_RECORD_DELETED is mapped to HA_ERR_KEY_NOT_FOUND in the visible
// lookup branches.
2213 DBUG_ENTER(
"find_row");
2215 DBUG_ASSERT(m_table && m_table->in_use != NULL);
2217 TABLE *table= m_table;
2223 prepare_record(table, table->read_set, FALSE );
2224 error= unpack_current_row(rli);
2227 DBUG_PRINT(
"info",(
"looking for the following record"));
2228 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
2231 if ((table->file->
ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2232 table->s->primary_key < MAX_KEY)
2252 DBUG_PRINT(
"info",(
"locating record using primary key (position)"));
2256 DBUG_PRINT(
"info",(
"rnd_pos returns error %d",error));
2257 if (error == HA_ERR_RECORD_DELETED)
2258 error= HA_ERR_KEY_NOT_FOUND;
2270 table->use_all_columns();
// Keep a copy of the searched-for row in record[1]; record[0] is reused as
// the fetch buffer below.
2276 store_record(table,
record[1]);
2278 if (table->s->keys > 0)
2280 DBUG_PRINT(
"info",(
"locating record using primary key (index_read)"));
2283 if (!table->file->inited && (error= table->file->
ha_index_init(0, FALSE)))
2285 DBUG_PRINT(
"info",(
"ha_index_init returns error %d",error));
2293 key_copy(m_key, table->record[0], table->key_info, 0);
2300 DBUG_DUMP(
"key data", m_key, table->key_info->
key_length);
// Before the index read, the last null byte of record[0] (or byte 0 when the
// table has no null bytes) is set to 0xFF.
2309 my_ptrdiff_t
const pos=
2310 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
2311 table->record[0][pos]= 0xFF;
2315 HA_READ_KEY_EXACT)))
2317 DBUG_PRINT(
"info",(
"no record matching the key found in the table"));
2318 if (error == HA_ERR_RECORD_DELETED)
2319 error= HA_ERR_KEY_NOT_FOUND;
2330 DBUG_PRINT(
"info",(
"found first matching record"));
2331 DBUG_DUMP(
"record[0]", table->record[0], table->s->reclength);
2347 if (table->key_info->
flags & HA_NOSAME)
2350 if (!(table->key_info->
flags & (HA_NULL_PART_KEY)))
2357 KEY *keyinfo= table->key_info;
2362 bool null_found= FALSE;
2365 uint fieldnr= keyinfo->key_part[
i].fieldnr - 1;
2366 Field **f= table->field+fieldnr;
2367 null_found= (*f)->is_null();
2385 DBUG_PRINT(
"info",(
"non-unique index, scanning it to find matching record"));
2387 while (record_compare(table))
2398 if (table->s->null_bytes > 0)
2400 table->record[0][table->s->null_bytes - 1]|=
2401 256
U - (1
U << table->s->last_null_bit_pos);
2404 while ((error= table->file->
ha_index_next(table->record[0])))
2407 if (error == HA_ERR_RECORD_DELETED)
2409 DBUG_PRINT(
"info",(
"no record matching the given row found"));
2423 DBUG_PRINT(
"info",(
"locating record using table scan (ha_rnd_next)"));
2425 int restart_count= 0;
2430 DBUG_PRINT(
"info",(
"error initializing table scan"
2431 " (ha_rnd_init returns %d)",error));
2439 restart_ha_rnd_next:
2440 error= table->file->
ha_rnd_next(table->record[0]);
// Deleted rows are skipped by jumping back to the fetch.
2447 case HA_ERR_RECORD_DELETED:
2448 goto restart_ha_rnd_next;
2450 case HA_ERR_END_OF_FILE:
2451 if (++restart_count < 2)
2462 DBUG_PRINT(
"info", (
"Failed to get next record"
2463 " (ha_rnd_next returns %d)",error));
2469 while (restart_count < 2 && record_compare(table));
2479 if (restart_count == 2)
2480 DBUG_PRINT(
"info", (
"Record not found"));
2482 DBUG_DUMP(
"record found", table->record[0], table->s->reclength);
2485 DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
2502 #if !defined(MYSQL_CLIENT)
2503 Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg,
2507 bool is_transactional)
2521 #ifdef HAVE_REPLICATION
2522 Write_rows_log_event_old::Write_rows_log_event_old(
const char *buf,
2533 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2546 thd->lex->duplicates= DUP_REPLACE;
2553 thd->lex->sql_command= SQLCOM_REPLACE;
2557 m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2564 m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2570 m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
2586 m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2587 m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2598 return error? error : local_error;
// Fragment: applies the current row by calling write_row() with overwrite
// set to TRUE. When that fails and thd->net.last_errno is still zero, the
// error code is stored there. The return type and return statement are not
// part of this listing.
2603 Write_rows_log_event_old::do_exec_row(
const Relay_log_info *
const rli)
2605 DBUG_ASSERT(m_table != NULL);
2606 int error= write_row(rli, TRUE );
2608 if (error && !thd->net.last_errno)
2609 thd->net.last_errno= error;
// Prints this event through the shared print_helper() under the label
// Write_rows_old.
2618 void Write_rows_log_event_old::print(FILE *file,
2619 PRINT_EVENT_INFO* print_event_info)
2621 Old_rows_log_event::print_helper(file, print_event_info,
"Write_rows_old");
2634 #ifndef MYSQL_CLIENT
2635 Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg,
2639 bool is_transactional)
2641 m_after_image(NULL), m_memory(NULL)
2654 #ifdef HAVE_REPLICATION
2655 Delete_rows_log_event_old::Delete_rows_log_event_old(
const char *buf,
2661 m_after_image(NULL), m_memory(NULL)
2667 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2672 if ((m_table->file->
ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
2673 m_table->s->primary_key < MAX_KEY)
2681 if (m_table->s->keys > 0)
2684 m_key= (uchar*)my_malloc(m_table->key_info->
key_length, MYF(MY_WME));
2686 return HA_ERR_OUT_OF_MEM;
2697 m_table->file->ha_index_or_rnd_end();
// Fragment: locates the row with find_row() and, when that returns 0,
// deletes it with ha_delete_row(record[0]). The declaration of error and the
// return statement are not part of this listing.
2705 int Delete_rows_log_event_old::do_exec_row(
const Relay_log_info *
const rli)
2708 DBUG_ASSERT(m_table != NULL);
2710 if (!(error= find_row(rli)))
2715 error= m_table->file->ha_delete_row(m_table->record[0]);
// Prints this event through the shared print_helper() under the label
// Delete_rows_old.
2724 void Delete_rows_log_event_old::print(FILE *file,
2725 PRINT_EVENT_INFO* print_event_info)
2727 Old_rows_log_event::print_helper(file, print_event_info,
"Delete_rows_old");
2739 #if !defined(MYSQL_CLIENT)
2740 Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg,
2744 bool is_transactional)
2746 m_after_image(NULL), m_memory(NULL)
2758 #ifdef HAVE_REPLICATION
2759 Update_rows_log_event_old::Update_rows_log_event_old(
const char *buf,
2766 m_after_image(NULL), m_memory(NULL)
2772 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
2777 if (m_table->s->keys > 0)
2780 m_key= (uchar*)my_malloc(m_table->key_info->
key_length, MYF(MY_WME));
2782 return HA_ERR_OUT_OF_MEM;
2794 m_table->file->ha_index_or_rnd_end();
// Fragment: applies one update (before image followed by after image).
// find_row() locates the existing row. Two paths are visible after it: one
// advances m_curr_row to m_curr_row_end and unpacks without using the
// result (presumably to skip the after image when the row was not found -
// TODO confirm), the other saves the found row into record[1], advances the
// cursor, unpacks the after image into record[0] and calls
// ha_update_row(record[1], record[0]). HA_ERR_RECORD_IS_THE_SAME is tested
// separately; its handling is on lines missing from this listing.
2803 Update_rows_log_event_old::do_exec_row(
const Relay_log_info *
const rli)
2805 DBUG_ASSERT(m_table != NULL);
2807 int error= find_row(rli);
2814 m_curr_row= m_curr_row_end;
2815 unpack_current_row(rli);
2830 store_record(m_table,
record[1]);
2832 m_curr_row= m_curr_row_end;
2833 error= unpack_current_row(rli);
2844 DBUG_PRINT(
"info",(
"Updating row in table"));
2845 DBUG_DUMP(
"old record", m_table->record[1], m_table->s->reclength);
2846 DBUG_DUMP(
"new values", m_table->record[0], m_table->s->reclength);
2849 error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
2850 if (error == HA_ERR_RECORD_IS_THE_SAME)
// Prints this event through the shared print_helper() under the label
// Update_rows_old.
2860 void Update_rows_log_event_old::print(FILE *file,
2861 PRINT_EVENT_INFO* print_event_info)
2863 Old_rows_log_event::print_helper(file, print_event_info,
"Update_rows_old");