21 #include "rpl_filter.h" 
   23 #include "rpl_handler.h" 
   27 #ifdef HAVE_REPLICATION 
   28 Binlog_transmit_delegate *binlog_transmit_delegate;
 
   29 Binlog_relay_IO_delegate *binlog_relay_io_delegate;
 
   37   char log_file[FN_REFLEN];
 
   40 int get_user_var_int(
const char *
name,
 
   41                      long long int *value, 
int *null_value)
 
   44   user_var_entry *
entry= 
 
   45     (user_var_entry*) my_hash_search(&current_thd->user_vars,
 
   46                                   (uchar*) 
name, strlen(name));
 
   49   *value= entry->val_int(&null_val);
 
   51     *null_value= null_val;
 
   55 int get_user_var_real(
const char *name,
 
   56                       double *value, 
int *null_value)
 
   59   user_var_entry *entry= 
 
   60     (user_var_entry*) my_hash_search(&current_thd->user_vars,
 
   61                                   (uchar*) 
name, strlen(name));
 
   64   *value= entry->val_real(&null_val);
 
   66     *null_value= null_val;
 
   70 int get_user_var_str(
const char *name, 
char *value,
 
   71                      size_t len, 
unsigned int precision, 
int *null_value)
 
   75   user_var_entry *entry= 
 
   76     (user_var_entry*) my_hash_search(&current_thd->user_vars,
 
   77                                   (uchar*) 
name, strlen(name));
 
   80   entry->val_str(&null_val, &str, precision);
 
   81   strncpy(value, str.c_ptr(), len);
 
   83     *null_value= null_val;
 
   89   static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
 
   91                             MY_ALIGNOF(
long)> storage_mem;
 
   92 #ifdef HAVE_REPLICATION 
   93   static my_aligned_storage<
sizeof(Binlog_transmit_delegate),
 
   94                             MY_ALIGNOF(
long)> transmit_mem;
 
   95   static my_aligned_storage<
sizeof(Binlog_relay_IO_delegate),
 
   96                             MY_ALIGNOF(
long)> relay_io_mem;
 
   99   void *place_trans_mem= trans_mem.data;
 
  100   void *place_storage_mem= storage_mem.data;
 
  104   if (!transaction_delegate->is_inited())
 
  106     sql_print_error(
"Initialization of transaction delegates failed. " 
  107                     "Please report a bug.");
 
  113   if (!binlog_storage_delegate->is_inited())
 
  115     sql_print_error(
"Initialization binlog storage delegates failed. " 
  116                     "Please report a bug.");
 
  120 #ifdef HAVE_REPLICATION 
  121   void *place_transmit_mem= transmit_mem.data;
 
  122   void *place_relay_io_mem= relay_io_mem.data;
 
  124   binlog_transmit_delegate= 
new (place_transmit_mem) Binlog_transmit_delegate;
 
  126   if (!binlog_transmit_delegate->is_inited())
 
  128     sql_print_error(
"Initialization of binlog transmit delegates failed. " 
  129                     "Please report a bug.");
 
  133   binlog_relay_io_delegate= 
new (place_relay_io_mem) Binlog_relay_IO_delegate;
 
  135   if (!binlog_relay_io_delegate->is_inited())
 
  137     sql_print_error(
"Initialization binlog relay IO delegates failed. " 
  138                     "Please report a bug.");
 
  146 void delegates_destroy()
 
  148   if (transaction_delegate)
 
  149     transaction_delegate->~Trans_delegate();
 
  150   if (binlog_storage_delegate)
 
  151     binlog_storage_delegate->~Binlog_storage_delegate();
 
  152 #ifdef HAVE_REPLICATION 
  153   if (binlog_transmit_delegate)
 
  154     binlog_transmit_delegate->~Binlog_transmit_delegate();
 
  155   if (binlog_relay_io_delegate)
 
  156     binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
 
  168 #define FOREACH_OBSERVER(r, f, thd, args)                               \ 
  169   param.server_id= thd->server_id;                                      \ 
  175     DYNAMIC_ARRAY plugins;                                              \ 
  177     plugin_ref plugins_buffer[8];                                       \ 
  179   DYNAMIC_ARRAY *plugins= &s.plugins;                                   \ 
  180   plugin_ref *plugins_buffer= s.plugins_buffer;                         \ 
  181   my_init_dynamic_array2(plugins, sizeof(plugin_ref),                   \ 
  182                          plugins_buffer, 8, 8);                         \ 
  184   Observer_info_iterator iter= observer_info_iter();                    \ 
  185   Observer_info *info= iter++;                                          \ 
  186   for (; info; info= iter++)                                            \ 
  189       my_plugin_lock(0, &info->plugin);                                 \ 
  196     insert_dynamic(plugins, &plugin);                                   \ 
  197     if (((Observer *)info->observer)->f                                 \ 
  198         && ((Observer *)info->observer)->f args)                        \ 
  201       sql_print_error("Run function '" #f "' in plugin '%s' failed",    \ 
  202                       info->plugin_int->name.str);                      \ 
  214   plugin_unlock_list(0, (plugin_ref*)plugins->buffer,                   \ 
  215                      plugins->elements);                                \ 
  216   delete_dynamic(plugins) 
  219 int Trans_delegate::after_commit(THD *thd, 
bool all)
 
  221   DBUG_ENTER(
"Trans_delegate::after_commit");
 
  223   bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
 
  228   thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
 
  230   DBUG_PRINT(
"enter", (
"log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
 
  233   FOREACH_OBSERVER(ret, after_commit, thd, (&param));
 
  237 int Trans_delegate::after_rollback(THD *thd, 
bool all)
 
  240   bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
 
  243     param.flags|= TRANS_IS_REAL_TRANS;
 
  244   thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
 
  246   FOREACH_OBSERVER(ret, after_rollback, thd, (&param));
 
  250 int Binlog_storage_delegate::after_flush(THD *thd,
 
  251                                          const char *log_file,
 
  254   DBUG_ENTER(
"Binlog_storage_delegate::after_flush");
 
  255   DBUG_PRINT(
"enter", (
"log_file: %s, log_pos: %llu",
 
  256                        log_file, (ulonglong) log_pos));
 
  260   FOREACH_OBSERVER(ret, after_flush, thd, (&param, log_file, log_pos));
 
  264 #ifdef HAVE_REPLICATION 
  265 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort 
flags,
 
  266                                              const char *log_file,
 
  273   FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
 
  277 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
 
  283   FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
 
  287 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
 
  295 #define RESERVE_HEADER_SIZE 32 
  296   unsigned char header[RESERVE_HEADER_SIZE];
 
  300   param.server_id= thd->server_id;
 
  304   Observer_info_iterator iter= observer_info_iter();
 
  306   for (; info; info= iter++)
 
  309       my_plugin_lock(thd, &info->plugin);
 
  316     if (((Observer *)info->observer)->reserve_header
 
  317         && ((Observer *)info->observer)->reserve_header(&param,
 
  323       plugin_unlock(thd, plugin);
 
  326     plugin_unlock(thd, plugin);
 
  329     if (hlen > RESERVE_HEADER_SIZE || packet->append((
char *)header, hlen))
 
  339 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
 
  341                                                 const char *log_file,
 
  348   FOREACH_OBSERVER(ret, before_send_event, thd,
 
  349                    (&param, (uchar *)packet->c_ptr(),
 
  351                     log_file+dirname_length(log_file), log_pos));
 
  355 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
 
  357                                                const char *skipped_log_file,
 
  358                                                my_off_t skipped_log_pos)
 
  364   FOREACH_OBSERVER(ret, after_send_event, thd,
 
  365                    (&param, packet->c_ptr(), packet->length(),
 
  366                    skipped_log_file+dirname_length(skipped_log_file),
 
  371 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
 
  378   FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
 
  385   param->mysql= mi->mysql;
 
  386   param->user= 
const_cast<char *
>(mi->get_user());
 
  387   param->host= mi->host;
 
  388   param->port= mi->port;
 
  389   param->master_log_name= 
const_cast<char *
>(mi->get_master_log_name());
 
  390   param->master_log_pos= mi->get_master_log_pos();
 
  393 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
 
  396   init_param(&param, mi);
 
  399   FOREACH_OBSERVER(ret, thread_start, thd, (&param));
 
  404 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
 
  408   init_param(&param, mi);
 
  411   FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
 
  415 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
 
  420   init_param(&param, mi);
 
  423   FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
 
  427 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
 
  428                                                const char *packet, ulong len,
 
  429                                                const char **event_buf,
 
  433   init_param(&param, mi);
 
  436   FOREACH_OBSERVER(ret, after_read_event, thd,
 
  437                    (&param, packet, len, event_buf, event_len));
 
  441 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
 
  442                                                 const char *event_buf,
 
  447   init_param(&param, mi);
 
  451     flags |= BINLOG_STORAGE_IS_SYNCED;
 
  454   FOREACH_OBSERVER(ret, after_queue_event, thd,
 
  455                    (&param, event_buf, event_len, flags));
 
  459 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
 
  463   init_param(&param, mi);
 
  466   FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
 
  473   return transaction_delegate->add_observer(observer, (
st_plugin_int *)p);
 
  478   return transaction_delegate->remove_observer(observer, (
st_plugin_int *)p);
 
  483   DBUG_ENTER(
"register_binlog_storage_observer");
 
  484   int result= binlog_storage_delegate->add_observer(observer, (
st_plugin_int *)p);
 
  490   return binlog_storage_delegate->remove_observer(observer, (
st_plugin_int *)p);
 
  493 #ifdef HAVE_REPLICATION 
  496   return binlog_transmit_delegate->add_observer(observer, (
st_plugin_int *)p);
 
  501   return binlog_transmit_delegate->remove_observer(observer, (
st_plugin_int *)p);
 
  506   return binlog_relay_io_delegate->add_observer(observer, (
st_plugin_int *)p);
 
  511   return binlog_relay_io_delegate->remove_observer(observer, (
st_plugin_int *)p);