18 #include "rpl_injector.h"
19 #include "transaction.h"
20 #include "sql_parse.h"
// Constructs an injector transaction bound to `thd`, starting in START_STATE.
// The binary log's current file name and position are captured as the
// transaction's start position; the next position starts out empty.
31 injector::transaction::transaction(
MYSQL_BIN_LOG *log, THD *thd)
32 : m_state(START_STATE), m_thd(thd)
// Ask the log for its current file/offset; the result lands in log_info.
39 log->get_current_log(&log_info);
// Keep a private copy of the file name (my_strdup); the destructor hands
// this pointer to my_free().
41 m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
42 m_start_pos.m_file_pos= log_info.pos;
// Guard for the case where the duplicated name came back NULL.
44 if (unlikely(m_start_pos.m_file_name == NULL))
// No next position is known yet.
53 m_next_pos.m_file_name= 0;
54 m_next_pos.m_file_pos= 0;
// Clear the next-event position kept on the THD (commit() reads
// m_thd->binlog_next_event_pos later).
64 m_thd->clear_next_event_pos();
// Destructor: passes the file-name pointers held in m_start_pos and
// m_next_pos to my_free().
69 injector::transaction::~transaction()
// const_cast yields a plain char* from m_file_name so that it can be
// handed to my_free().
75 char*
const start_pos_memory=
const_cast<char*
>(m_start_pos.m_file_name);
79 my_free(start_pos_memory);
// Same treatment for the next-position name. Other code in this file sets
// it to 0/NULL when no name is recorded.
// NOTE(review): this relies on my_free() accepting a null pointer — confirm.
82 char*
const next_pos_memory=
const_cast<char*
>(m_next_pos.m_file_name);
85 my_free(next_pos_memory);
// Body of injector::transaction::commit(), as named by the DBUG_ENTER tag.
95 DBUG_ENTER(
"injector::transaction::commit()");
// Flush the THD's pending rows event first; the result is kept in `error`.
96 int error= m_thd->binlog_flush_pending_rows_event(
true);
// Commit the statement, then the transaction.
// NOTE(review): the result of trans_commit_stmt() is not checked here.
116 trans_commit_stmt(m_thd);
// A zero return from trans_commit() selects the cleanup that follows.
117 if (!trans_commit(m_thd))
119 close_thread_tables(m_thd);
// NOTE(review): rollback() only releases these locks when
// !m_thd->locked_tables_mode; here the release has no such guard.
// Confirm the difference is intended.
120 m_thd->mdl_context.release_transactional_locks();
// If the THD exposes a next-event file name and a copy of it can be made,
// store the copy in m_next_pos together with the matching position.
125 (m_thd->binlog_next_event_pos.file_name != NULL) &&
126 ((m_next_pos.m_file_name=
127 my_strdup(m_thd->binlog_next_event_pos.file_name, MYF(0))) != NULL))
129 m_next_pos.m_file_pos= m_thd->binlog_next_event_pos.pos;
// Empty next position — presumably the alternative branch when the
// condition above does not hold.
134 m_next_pos.m_file_name= NULL;
135 m_next_pos.m_file_pos= 0;
// Rolls back the current statement and then the transaction on m_thd.
142 int injector::transaction::rollback()
144 DBUG_ENTER(
"injector::transaction::rollback()");
// NOTE(review): the result of trans_rollback_stmt() is not checked here.
145 trans_rollback_stmt(m_thd);
// A zero return from trans_rollback() selects the cleanup that follows.
146 if (!trans_rollback(m_thd))
148 close_thread_tables(m_thd);
// Transactional metadata locks are released only when the THD is not in
// locked-tables mode.
149 if (!m_thd->locked_tables_mode)
150 m_thd->mdl_context.release_transactional_locks();
// Writes a table-map entry for `tbl` through the THD, with the THD's
// server id temporarily set to `sid`.
156 int injector::transaction::use_table(server_id_type sid,
table tbl)
158 DBUG_ENTER(
"injector::transaction::use_table");
// A non-zero result from check_state(TABLE_STATE) is captured in `error`
// and selects the branch guarded by this condition.
162 if ((error= check_state(TABLE_STATE)))
// Remember the THD's own server id so it can be put back afterwards.
165 server_id_type save_id= m_thd->server_id;
166 m_thd->set_server_id(sid);
167 error= m_thd->binlog_write_table_map(tbl.get_table(),
168 tbl.is_transactional(), FALSE);
// Restore the original server id regardless of the write's result.
169 m_thd->set_server_id(save_id);
// Logs a row insert for `tbl` through the THD, with the THD's server id
// temporarily set to `sid`. `extra_row_info` is passed through unchanged
// to THD::binlog_write_row().
174 int injector::transaction::write_row (server_id_type sid,
table tbl,
177 const uchar* extra_row_info)
179 DBUG_ENTER(
"injector::transaction::write_row(...)");
// The result of check_state(ROW_STATE) becomes the initial error value.
181 int error= check_state(ROW_STATE);
// Swap in the caller-supplied server id; the THD's own id is kept in
// save_id and restored below.
185 server_id_type save_id= m_thd->server_id;
186 m_thd->set_server_id(sid);
// Scoped helper built from the table and `cols` (passed twice).
// NOTE(review): presumably installs `cols` as the table's column sets and
// restores the previous ones on scope exit — confirm in table::save_sets.
187 table::save_sets saveset(tbl, cols, cols);
189 error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
190 record, extra_row_info);
191 m_thd->set_server_id(save_id);
// Overload without extra row info: forwards to the six-argument
// write_row() with NULL as extra_row_info.
195 int injector::transaction::write_row (server_id_type sid,
table tbl,
199 return write_row(sid, tbl, cols, colcnt, record, NULL);
// Logs a row delete for `tbl` through the THD, with the THD's server id
// temporarily set to `sid`. `extra_row_info` is passed through unchanged
// to THD::binlog_delete_row().
203 int injector::transaction::delete_row(server_id_type sid,
table tbl,
206 const uchar* extra_row_info)
208 DBUG_ENTER(
"injector::transaction::delete_row(...)");
// The result of check_state(ROW_STATE) becomes the initial error value.
210 int error= check_state(ROW_STATE);
// Swap in the caller-supplied server id; the THD's own id is kept in
// save_id and restored below.
214 server_id_type save_id= m_thd->server_id;
215 m_thd->set_server_id(sid);
// Scoped helper built from the table and `cols` (passed twice).
// NOTE(review): presumably installs `cols` as the table's column sets and
// restores the previous ones on scope exit — confirm in table::save_sets.
216 table::save_sets saveset(tbl, cols, cols);
217 error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
218 record, extra_row_info);
219 m_thd->set_server_id(save_id);
// Overload without extra row info: forwards to the six-argument
// delete_row() with NULL as extra_row_info.
223 int injector::transaction::delete_row(server_id_type sid,
table tbl,
227 return delete_row(sid, tbl, cols, colcnt, record, NULL);
// Logs a row update for `tbl` through the THD, with the THD's server id
// temporarily set to `sid`. `before` and `after` are the two record
// images passed to THD::binlog_update_row(); `extra_row_info` is passed
// through unchanged.
231 int injector::transaction::update_row(server_id_type sid,
table tbl,
233 record_type before, record_type after,
234 const uchar* extra_row_info)
236 DBUG_ENTER(
"injector::transaction::update_row(...)");
// The result of check_state(ROW_STATE) becomes the initial error value.
238 int error= check_state(ROW_STATE);
// Swap in the caller-supplied server id; the THD's own id is kept in
// save_id and restored below.
242 server_id_type save_id= m_thd->server_id;
243 m_thd->set_server_id(sid);
// Scoped helper built from the table and `cols` (passed twice).
// NOTE(review): presumably installs `cols` as the table's column sets and
// restores the previous ones on scope exit — confirm in table::save_sets.
245 table::save_sets saveset(tbl, cols, cols);
247 error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
248 before, after, extra_row_info);
249 m_thd->set_server_id(save_id);
// Overload without extra row info: forwards to the seven-argument
// update_row() with NULL as extra_row_info.
253 int injector::transaction::update_row(server_id_type sid,
table tbl,
255 record_type before, record_type after)
257 return update_row(sid, tbl, cols, colcnt, before, after, NULL);
// Default constructor of injector, defined inline in this translation unit.
275 inline injector::injector()
// Takes no arguments and returns nothing.
// NOTE(review): presumably destroys the shared injector instance — confirm
// against the function body.
288 void injector::free_instance()
// Body of injector::new_trans(THD *, transaction *), as named by the
// DBUG_ENTER tag.
301 DBUG_ENTER(
"injector::new_trans(THD *, transaction *)");
// Build a local transaction object on mysql_bin_log for `thd`; its
// constructor records the log's current position as the start position.
306 transaction trans(&mysql_bin_log, thd);
// Overload of record_incident() that takes only the THD and the incident;
// a second overload in this file additionally accepts a message.
312 int injector::record_incident(THD *thd, Incident incident)
318 int injector::record_incident(THD *thd, Incident incident,
LEX_STRING const message)