21 #include "rpl_filter.h"
23 #include "rpl_handler.h"
27 #ifdef HAVE_REPLICATION
28 Binlog_transmit_delegate *binlog_transmit_delegate;
29 Binlog_relay_IO_delegate *binlog_relay_io_delegate;
37 char log_file[FN_REFLEN];
40 int get_user_var_int(
const char *
name,
41 long long int *value,
int *null_value)
44 user_var_entry *
entry=
45 (user_var_entry*) my_hash_search(¤t_thd->user_vars,
46 (uchar*)
name, strlen(name));
49 *value= entry->val_int(&null_val);
51 *null_value= null_val;
55 int get_user_var_real(
const char *name,
56 double *value,
int *null_value)
59 user_var_entry *entry=
60 (user_var_entry*) my_hash_search(¤t_thd->user_vars,
61 (uchar*)
name, strlen(name));
64 *value= entry->val_real(&null_val);
66 *null_value= null_val;
70 int get_user_var_str(
const char *name,
char *value,
71 size_t len,
unsigned int precision,
int *null_value)
75 user_var_entry *entry=
76 (user_var_entry*) my_hash_search(¤t_thd->user_vars,
77 (uchar*)
name, strlen(name));
80 entry->val_str(&null_val, &str, precision);
81 strncpy(value, str.c_ptr(), len);
83 *null_value= null_val;
89 static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
91 MY_ALIGNOF(
long)> storage_mem;
92 #ifdef HAVE_REPLICATION
93 static my_aligned_storage<
sizeof(Binlog_transmit_delegate),
94 MY_ALIGNOF(
long)> transmit_mem;
95 static my_aligned_storage<
sizeof(Binlog_relay_IO_delegate),
96 MY_ALIGNOF(
long)> relay_io_mem;
99 void *place_trans_mem= trans_mem.data;
100 void *place_storage_mem= storage_mem.data;
104 if (!transaction_delegate->is_inited())
106 sql_print_error(
"Initialization of transaction delegates failed. "
107 "Please report a bug.");
113 if (!binlog_storage_delegate->is_inited())
115 sql_print_error(
"Initialization binlog storage delegates failed. "
116 "Please report a bug.");
120 #ifdef HAVE_REPLICATION
121 void *place_transmit_mem= transmit_mem.data;
122 void *place_relay_io_mem= relay_io_mem.data;
124 binlog_transmit_delegate=
new (place_transmit_mem) Binlog_transmit_delegate;
126 if (!binlog_transmit_delegate->is_inited())
128 sql_print_error(
"Initialization of binlog transmit delegates failed. "
129 "Please report a bug.");
133 binlog_relay_io_delegate=
new (place_relay_io_mem) Binlog_relay_IO_delegate;
135 if (!binlog_relay_io_delegate->is_inited())
137 sql_print_error(
"Initialization binlog relay IO delegates failed. "
138 "Please report a bug.");
146 void delegates_destroy()
148 if (transaction_delegate)
149 transaction_delegate->~Trans_delegate();
150 if (binlog_storage_delegate)
151 binlog_storage_delegate->~Binlog_storage_delegate();
152 #ifdef HAVE_REPLICATION
153 if (binlog_transmit_delegate)
154 binlog_transmit_delegate->~Binlog_transmit_delegate();
155 if (binlog_relay_io_delegate)
156 binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
168 #define FOREACH_OBSERVER(r, f, thd, args) \
169 param.server_id= thd->server_id; \
175 DYNAMIC_ARRAY plugins; \
177 plugin_ref plugins_buffer[8]; \
179 DYNAMIC_ARRAY *plugins= &s.plugins; \
180 plugin_ref *plugins_buffer= s.plugins_buffer; \
181 my_init_dynamic_array2(plugins, sizeof(plugin_ref), \
182 plugins_buffer, 8, 8); \
184 Observer_info_iterator iter= observer_info_iter(); \
185 Observer_info *info= iter++; \
186 for (; info; info= iter++) \
189 my_plugin_lock(0, &info->plugin); \
196 insert_dynamic(plugins, &plugin); \
197 if (((Observer *)info->observer)->f \
198 && ((Observer *)info->observer)->f args) \
201 sql_print_error("Run function '" #f "' in plugin '%s' failed", \
202 info->plugin_int->name.str); \
214 plugin_unlock_list(0, (plugin_ref*)plugins->buffer, \
215 plugins->elements); \
216 delete_dynamic(plugins)
219 int Trans_delegate::after_commit(THD *thd,
bool all)
221 DBUG_ENTER(
"Trans_delegate::after_commit");
223 bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
228 thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos);
230 DBUG_PRINT(
"enter", (
"log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
233 FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
237 int Trans_delegate::after_rollback(THD *thd,
bool all)
240 bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
243 param.flags|= TRANS_IS_REAL_TRANS;
244 thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos);
246 FOREACH_OBSERVER(ret, after_rollback, thd, (¶m));
250 int Binlog_storage_delegate::after_flush(THD *thd,
251 const char *log_file,
254 DBUG_ENTER(
"Binlog_storage_delegate::after_flush");
255 DBUG_PRINT(
"enter", (
"log_file: %s, log_pos: %llu",
256 log_file, (ulonglong) log_pos));
260 FOREACH_OBSERVER(ret, after_flush, thd, (¶m, log_file, log_pos));
264 #ifdef HAVE_REPLICATION
265 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort
flags,
266 const char *log_file,
273 FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos));
277 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
283 FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m));
287 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
295 #define RESERVE_HEADER_SIZE 32
296 unsigned char header[RESERVE_HEADER_SIZE];
300 param.server_id= thd->server_id;
304 Observer_info_iterator iter= observer_info_iter();
306 for (; info; info= iter++)
309 my_plugin_lock(thd, &info->plugin);
316 if (((Observer *)info->observer)->reserve_header
317 && ((Observer *)info->observer)->reserve_header(¶m,
323 plugin_unlock(thd, plugin);
326 plugin_unlock(thd, plugin);
329 if (hlen > RESERVE_HEADER_SIZE || packet->append((
char *)header, hlen))
339 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
341 const char *log_file,
348 FOREACH_OBSERVER(ret, before_send_event, thd,
349 (¶m, (uchar *)packet->c_ptr(),
351 log_file+dirname_length(log_file), log_pos));
355 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
357 const char *skipped_log_file,
358 my_off_t skipped_log_pos)
364 FOREACH_OBSERVER(ret, after_send_event, thd,
365 (¶m, packet->c_ptr(), packet->length(),
366 skipped_log_file+dirname_length(skipped_log_file),
371 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
378 FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m));
385 param->mysql= mi->mysql;
386 param->user=
const_cast<char *
>(mi->get_user());
387 param->host= mi->host;
388 param->port= mi->port;
389 param->master_log_name=
const_cast<char *
>(mi->get_master_log_name());
390 param->master_log_pos= mi->get_master_log_pos();
393 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
396 init_param(¶m, mi);
399 FOREACH_OBSERVER(ret, thread_start, thd, (¶m));
404 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
408 init_param(¶m, mi);
411 FOREACH_OBSERVER(ret, thread_stop, thd, (¶m));
415 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
420 init_param(¶m, mi);
423 FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags));
427 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
428 const char *packet, ulong len,
429 const char **event_buf,
433 init_param(¶m, mi);
436 FOREACH_OBSERVER(ret, after_read_event, thd,
437 (¶m, packet, len, event_buf, event_len));
441 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
442 const char *event_buf,
447 init_param(¶m, mi);
451 flags |= BINLOG_STORAGE_IS_SYNCED;
454 FOREACH_OBSERVER(ret, after_queue_event, thd,
455 (¶m, event_buf, event_len, flags));
459 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
463 init_param(¶m, mi);
466 FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m));
473 return transaction_delegate->add_observer(observer, (
st_plugin_int *)p);
478 return transaction_delegate->remove_observer(observer, (
st_plugin_int *)p);
483 DBUG_ENTER(
"register_binlog_storage_observer");
484 int result= binlog_storage_delegate->add_observer(observer, (
st_plugin_int *)p);
490 return binlog_storage_delegate->remove_observer(observer, (
st_plugin_int *)p);
493 #ifdef HAVE_REPLICATION
496 return binlog_transmit_delegate->add_observer(observer, (
st_plugin_int *)p);
501 return binlog_transmit_delegate->remove_observer(observer, (
st_plugin_int *)p);
506 return binlog_relay_io_delegate->add_observer(observer, (
st_plugin_int *)p);
511 return binlog_relay_io_delegate->remove_observer(observer, (
st_plugin_int *)p);