MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rpl_handler.cc
1 /* Copyright (c) 2008, 2010, 2012 Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software Foundation,
14  51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
15 
16 #include "sql_priv.h"
17 #include "unireg.h"
18 
19 #include "rpl_mi.h"
20 #include "log_event.h"
21 #include "rpl_filter.h"
22 #include <my_dir.h>
23 #include "rpl_handler.h"
24 
25 Trans_delegate *transaction_delegate;
26 Binlog_storage_delegate *binlog_storage_delegate;
27 #ifdef HAVE_REPLICATION
28 Binlog_transmit_delegate *binlog_transmit_delegate;
29 Binlog_relay_IO_delegate *binlog_relay_io_delegate;
30 #endif /* HAVE_REPLICATION */
31 
32 /*
33  structure to save transaction log filename and position
34 */
35 typedef struct Trans_binlog_info {
36  my_off_t log_pos;
37  char log_file[FN_REFLEN];
39 
40 int get_user_var_int(const char *name,
41  long long int *value, int *null_value)
42 {
43  my_bool null_val;
44  user_var_entry *entry=
45  (user_var_entry*) my_hash_search(&current_thd->user_vars,
46  (uchar*) name, strlen(name));
47  if (!entry)
48  return 1;
49  *value= entry->val_int(&null_val);
50  if (null_value)
51  *null_value= null_val;
52  return 0;
53 }
54 
55 int get_user_var_real(const char *name,
56  double *value, int *null_value)
57 {
58  my_bool null_val;
59  user_var_entry *entry=
60  (user_var_entry*) my_hash_search(&current_thd->user_vars,
61  (uchar*) name, strlen(name));
62  if (!entry)
63  return 1;
64  *value= entry->val_real(&null_val);
65  if (null_value)
66  *null_value= null_val;
67  return 0;
68 }
69 
70 int get_user_var_str(const char *name, char *value,
71  size_t len, unsigned int precision, int *null_value)
72 {
73  String str;
74  my_bool null_val;
75  user_var_entry *entry=
76  (user_var_entry*) my_hash_search(&current_thd->user_vars,
77  (uchar*) name, strlen(name));
78  if (!entry)
79  return 1;
80  entry->val_str(&null_val, &str, precision);
81  strncpy(value, str.c_ptr(), len);
82  if (null_value)
83  *null_value= null_val;
84  return 0;
85 }
86 
87 int delegates_init()
88 {
89  static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
90  static my_aligned_storage<sizeof(Binlog_storage_delegate),
91  MY_ALIGNOF(long)> storage_mem;
92 #ifdef HAVE_REPLICATION
93  static my_aligned_storage<sizeof(Binlog_transmit_delegate),
94  MY_ALIGNOF(long)> transmit_mem;
95  static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
96  MY_ALIGNOF(long)> relay_io_mem;
97 #endif
98 
99  void *place_trans_mem= trans_mem.data;
100  void *place_storage_mem= storage_mem.data;
101 
102  transaction_delegate= new (place_trans_mem) Trans_delegate;
103 
104  if (!transaction_delegate->is_inited())
105  {
106  sql_print_error("Initialization of transaction delegates failed. "
107  "Please report a bug.");
108  return 1;
109  }
110 
111  binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
112 
113  if (!binlog_storage_delegate->is_inited())
114  {
115  sql_print_error("Initialization binlog storage delegates failed. "
116  "Please report a bug.");
117  return 1;
118  }
119 
120 #ifdef HAVE_REPLICATION
121  void *place_transmit_mem= transmit_mem.data;
122  void *place_relay_io_mem= relay_io_mem.data;
123 
124  binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
125 
126  if (!binlog_transmit_delegate->is_inited())
127  {
128  sql_print_error("Initialization of binlog transmit delegates failed. "
129  "Please report a bug.");
130  return 1;
131  }
132 
133  binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
134 
135  if (!binlog_relay_io_delegate->is_inited())
136  {
137  sql_print_error("Initialization binlog relay IO delegates failed. "
138  "Please report a bug.");
139  return 1;
140  }
141 #endif
142 
143  return 0;
144 }
145 
146 void delegates_destroy()
147 {
148  if (transaction_delegate)
149  transaction_delegate->~Trans_delegate();
150  if (binlog_storage_delegate)
151  binlog_storage_delegate->~Binlog_storage_delegate();
152 #ifdef HAVE_REPLICATION
153  if (binlog_transmit_delegate)
154  binlog_transmit_delegate->~Binlog_transmit_delegate();
155  if (binlog_relay_io_delegate)
156  binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
157 #endif /* HAVE_REPLICATION */
158 }
159 
160 /*
161  This macro is used by almost all the Delegate methods to iterate
162  over all the observers running given callback function of the
163  delegate .
164 
165  Add observer plugins to the thd->lex list, after each statement, all
166  plugins add to thd->lex will be automatically unlocked.
167  */
168 #define FOREACH_OBSERVER(r, f, thd, args) \
169  param.server_id= thd->server_id; \
170  /*
171  Use a struct to make sure that they are allocated adjacent, check
172  delete_dynamic().
173  */ \
174  struct { \
175  DYNAMIC_ARRAY plugins; \
176  /* preallocate 8 slots */ \
177  plugin_ref plugins_buffer[8]; \
178  } s; \
179  DYNAMIC_ARRAY *plugins= &s.plugins; \
180  plugin_ref *plugins_buffer= s.plugins_buffer; \
181  my_init_dynamic_array2(plugins, sizeof(plugin_ref), \
182  plugins_buffer, 8, 8); \
183  read_lock(); \
184  Observer_info_iterator iter= observer_info_iter(); \
185  Observer_info *info= iter++; \
186  for (; info; info= iter++) \
187  { \
188  plugin_ref plugin= \
189  my_plugin_lock(0, &info->plugin); \
190  if (!plugin) \
191  { \
192  /* plugin is not intialized or deleted, this is not an error */ \
193  r= 0; \
194  break; \
195  } \
196  insert_dynamic(plugins, &plugin); \
197  if (((Observer *)info->observer)->f \
198  && ((Observer *)info->observer)->f args) \
199  { \
200  r= 1; \
201  sql_print_error("Run function '" #f "' in plugin '%s' failed", \
202  info->plugin_int->name.str); \
203  break; \
204  } \
205  } \
206  unlock(); \
207  /*
208  Unlock plugins should be done after we released the Delegate lock
209  to avoid possible deadlock when this is the last user of the
210  plugin, and when we unlock the plugin, it will try to
211  deinitialize the plugin, which will try to lock the Delegate in
212  order to remove the observers.
213  */ \
214  plugin_unlock_list(0, (plugin_ref*)plugins->buffer, \
215  plugins->elements); \
216  delete_dynamic(plugins)
217 
218 
219 int Trans_delegate::after_commit(THD *thd, bool all)
220 {
221  DBUG_ENTER("Trans_delegate::after_commit");
222  Trans_param param = { 0, 0, 0, 0 };
223  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
224 
225  if (is_real_trans)
226  param.flags = true;
227 
228  thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
229 
230  DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos));
231 
232  int ret= 0;
233  FOREACH_OBSERVER(ret, after_commit, thd, (&param));
234  DBUG_RETURN(ret);
235 }
236 
237 int Trans_delegate::after_rollback(THD *thd, bool all)
238 {
239  Trans_param param = { 0, 0, 0, 0 };
240  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
241 
242  if (is_real_trans)
243  param.flags|= TRANS_IS_REAL_TRANS;
244  thd->get_trans_fixed_pos(&param.log_file, &param.log_pos);
245  int ret= 0;
246  FOREACH_OBSERVER(ret, after_rollback, thd, (&param));
247  return ret;
248 }
249 
250 int Binlog_storage_delegate::after_flush(THD *thd,
251  const char *log_file,
252  my_off_t log_pos)
253 {
254  DBUG_ENTER("Binlog_storage_delegate::after_flush");
255  DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
256  log_file, (ulonglong) log_pos));
257  Binlog_storage_param param;
258 
259  int ret= 0;
260  FOREACH_OBSERVER(ret, after_flush, thd, (&param, log_file, log_pos));
261  DBUG_RETURN(ret);
262 }
263 
264 #ifdef HAVE_REPLICATION
265 int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
266  const char *log_file,
267  my_off_t log_pos)
268 {
269  Binlog_transmit_param param;
270  param.flags= flags;
271 
272  int ret= 0;
273  FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
274  return ret;
275 }
276 
277 int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
278 {
279  Binlog_transmit_param param;
280  param.flags= flags;
281 
282  int ret= 0;
283  FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
284  return ret;
285 }
286 
287 int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
288  String *packet)
289 {
290  /* NOTE2ME: Maximum extra header size for each observer, I hope 32
291  bytes should be enough for each Observer to reserve their extra
292  header. If later found this is not enough, we can increase this
293  /HEZX
294  */
295 #define RESERVE_HEADER_SIZE 32
296  unsigned char header[RESERVE_HEADER_SIZE];
297  ulong hlen;
298  Binlog_transmit_param param;
299  param.flags= flags;
300  param.server_id= thd->server_id;
301 
302  int ret= 0;
303  read_lock();
304  Observer_info_iterator iter= observer_info_iter();
305  Observer_info *info= iter++;
306  for (; info; info= iter++)
307  {
308  plugin_ref plugin=
309  my_plugin_lock(thd, &info->plugin);
310  if (!plugin)
311  {
312  ret= 1;
313  break;
314  }
315  hlen= 0;
316  if (((Observer *)info->observer)->reserve_header
317  && ((Observer *)info->observer)->reserve_header(&param,
318  header,
319  RESERVE_HEADER_SIZE,
320  &hlen))
321  {
322  ret= 1;
323  plugin_unlock(thd, plugin);
324  break;
325  }
326  plugin_unlock(thd, plugin);
327  if (hlen == 0)
328  continue;
329  if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
330  {
331  ret= 1;
332  break;
333  }
334  }
335  unlock();
336  return ret;
337 }
338 
339 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
340  String *packet,
341  const char *log_file,
342  my_off_t log_pos)
343 {
344  Binlog_transmit_param param;
345  param.flags= flags;
346 
347  int ret= 0;
348  FOREACH_OBSERVER(ret, before_send_event, thd,
349  (&param, (uchar *)packet->c_ptr(),
350  packet->length(),
351  log_file+dirname_length(log_file), log_pos));
352  return ret;
353 }
354 
355 int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
356  String *packet,
357  const char *skipped_log_file,
358  my_off_t skipped_log_pos)
359 {
360  Binlog_transmit_param param;
361  param.flags= flags;
362 
363  int ret= 0;
364  FOREACH_OBSERVER(ret, after_send_event, thd,
365  (&param, packet->c_ptr(), packet->length(),
366  skipped_log_file+dirname_length(skipped_log_file),
367  skipped_log_pos));
368  return ret;
369 }
370 
371 int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
372 
373 {
374  Binlog_transmit_param param;
375  param.flags= flags;
376 
377  int ret= 0;
378  FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
379  return ret;
380 }
381 
382 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
383  Master_info *mi)
384 {
385  param->mysql= mi->mysql;
386  param->user= const_cast<char *>(mi->get_user());
387  param->host= mi->host;
388  param->port= mi->port;
389  param->master_log_name= const_cast<char *>(mi->get_master_log_name());
390  param->master_log_pos= mi->get_master_log_pos();
391 }
392 
393 int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
394 {
395  Binlog_relay_IO_param param;
396  init_param(&param, mi);
397 
398  int ret= 0;
399  FOREACH_OBSERVER(ret, thread_start, thd, (&param));
400  return ret;
401 }
402 
403 
404 int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
405 {
406 
407  Binlog_relay_IO_param param;
408  init_param(&param, mi);
409 
410  int ret= 0;
411  FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
412  return ret;
413 }
414 
415 int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
416  Master_info *mi,
417  ushort flags)
418 {
419  Binlog_relay_IO_param param;
420  init_param(&param, mi);
421 
422  int ret= 0;
423  FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
424  return ret;
425 }
426 
427 int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
428  const char *packet, ulong len,
429  const char **event_buf,
430  ulong *event_len)
431 {
432  Binlog_relay_IO_param param;
433  init_param(&param, mi);
434 
435  int ret= 0;
436  FOREACH_OBSERVER(ret, after_read_event, thd,
437  (&param, packet, len, event_buf, event_len));
438  return ret;
439 }
440 
441 int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
442  const char *event_buf,
443  ulong event_len,
444  bool synced)
445 {
446  Binlog_relay_IO_param param;
447  init_param(&param, mi);
448 
449  uint32 flags=0;
450  if (synced)
451  flags |= BINLOG_STORAGE_IS_SYNCED;
452 
453  int ret= 0;
454  FOREACH_OBSERVER(ret, after_queue_event, thd,
455  (&param, event_buf, event_len, flags));
456  return ret;
457 }
458 
459 int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
460 
461 {
462  Binlog_relay_IO_param param;
463  init_param(&param, mi);
464 
465  int ret= 0;
466  FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
467  return ret;
468 }
469 #endif /* HAVE_REPLICATION */
470 
471 int register_trans_observer(Trans_observer *observer, void *p)
472 {
473  return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
474 }
475 
476 int unregister_trans_observer(Trans_observer *observer, void *p)
477 {
478  return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
479 }
480 
481 int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
482 {
483  DBUG_ENTER("register_binlog_storage_observer");
484  int result= binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
485  DBUG_RETURN(result);
486 }
487 
488 int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
489 {
490  return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
491 }
492 
493 #ifdef HAVE_REPLICATION
494 int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
495 {
496  return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
497 }
498 
499 int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
500 {
501  return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
502 }
503 
504 int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
505 {
506  return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
507 }
508 
509 int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
510 {
511  return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
512 }
513 #endif /* HAVE_REPLICATION */