MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
semisync_master_plugin.cc
1 /* Copyright (C) 2007 Google Inc.
2  Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
3  Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17 
18 
19 #include "semisync_master.h"
20 #include "sql_class.h" // THD
21 
22 ReplSemiSyncMaster repl_semisync;
23 
24 C_MODE_START
25 
26 int repl_semi_report_binlog_update(Binlog_storage_param *param,
27  const char *log_file,
28  my_off_t log_pos)
29 {
30  int error= 0;
31 
32  if (repl_semisync.getMasterEnabled())
33  {
34  /*
35  Let us store the binlog file name and the position, so that
36  we know how long to wait for the binlog to the replicated to
37  the slave in synchronous replication.
38  */
39  error= repl_semisync.writeTranxInBinlog(log_file,
40  log_pos);
41  }
42 
43  return error;
44 }
45 
46 int repl_semi_request_commit(Trans_param *param)
47 {
48  return 0;
49 }
50 
51 int repl_semi_report_commit(Trans_param *param)
52 {
53 
54  bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
55 
56  if (is_real_trans && param->log_pos)
57  {
58  const char *binlog_name= param->log_file;
59  return repl_semisync.commitTrx(binlog_name, param->log_pos);
60  }
61  return 0;
62 }
63 
64 int repl_semi_report_rollback(Trans_param *param)
65 {
66  return repl_semi_report_commit(param);
67 }
68 
69 int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
70  const char *log_file,
71  my_off_t log_pos)
72 {
73  bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
74 
75  if (semi_sync_slave)
76  {
77  /* One more semi-sync slave */
78  repl_semisync.add_slave();
79 
80  /*
81  Let's assume this semi-sync slave has already received all
82  binlog events before the filename and position it requests.
83  */
84  repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
85  }
86  sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
87  semi_sync_slave ? "semi-sync" : "asynchronous",
88  param->server_id, log_file, (unsigned long)log_pos);
89 
90  return 0;
91 }
92 
93 int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
94 {
95  bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
96 
97  sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
98  semi_sync_slave ? "semi-sync" : "asynchronous",
99  param->server_id);
100  if (semi_sync_slave)
101  {
102  /* One less semi-sync slave */
103  repl_semisync.remove_slave();
104  }
105  return 0;
106 }
107 
108 int repl_semi_reserve_header(Binlog_transmit_param *param,
109  unsigned char *header,
110  unsigned long size, unsigned long *len)
111 {
112  *len += repl_semisync.reserveSyncHeader(header, size);
113  return 0;
114 }
115 
116 int repl_semi_before_send_event(Binlog_transmit_param *param,
117  unsigned char *packet, unsigned long len,
118  const char *log_file, my_off_t log_pos)
119 {
120  return repl_semisync.updateSyncHeader(packet,
121  log_file,
122  log_pos,
123  param->server_id);
124 }
125 
126 int repl_semi_after_send_event(Binlog_transmit_param *param,
127  const char *event_buf, unsigned long len,
128  const char * skipped_log_file,
129  my_off_t skipped_log_pos)
130 {
131  if (repl_semisync.is_semi_sync_slave())
132  {
133  if(skipped_log_pos>0)
134  repl_semisync.skipSlaveReply(event_buf, param->server_id,
135  skipped_log_file, skipped_log_pos);
136  else
137  {
138  THD *thd= current_thd;
139  /*
140  Possible errors in reading slave reply are ignored deliberately
141  because we do not want dump thread to quit on this. Error
142  messages are already reported.
143  */
144  (void) repl_semisync.readSlaveReply(&thd->net,
145  param->server_id, event_buf);
146  thd->clear_error();
147  }
148  }
149  return 0;
150 }
151 
152 int repl_semi_reset_master(Binlog_transmit_param *param)
153 {
154  if (repl_semisync.resetMaster())
155  return 1;
156  return 0;
157 }
158 
159 C_MODE_END
160 
161 /*
162  semisync system variables
163  */
164 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
165  SYS_VAR *var,
166  void *ptr,
167  const void *val);
168 
169 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
170  SYS_VAR *var,
171  void *ptr,
172  const void *val);
173 
174 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
175  SYS_VAR *var,
176  void *ptr,
177  const void *val);
178 
179 static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
180  PLUGIN_VAR_OPCMDARG,
181  "Enable semi-synchronous replication master (disabled by default). ",
182  NULL, // check
183  &fix_rpl_semi_sync_master_enabled, // update
184  0);
185 
186 static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout,
187  PLUGIN_VAR_OPCMDARG,
188  "The timeout value (in ms) for semi-synchronous replication in the master",
189  NULL, // check
190  fix_rpl_semi_sync_master_timeout, // update
191  10000, 0, ~0UL, 1);
192 
193 static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave,
194  PLUGIN_VAR_OPCMDARG,
195  "Wait until timeout when no semi-synchronous replication slave available (enabled by default). ",
196  NULL, // check
197  NULL, // update
198  1);
199 
200 static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
201  PLUGIN_VAR_OPCMDARG,
202  "The tracing level for semi-sync replication.",
203  NULL, // check
204  &fix_rpl_semi_sync_master_trace_level, // update
205  32, 0, ~0UL, 1);
206 
207 static SYS_VAR* semi_sync_master_system_vars[]= {
208  MYSQL_SYSVAR(enabled),
209  MYSQL_SYSVAR(timeout),
210  MYSQL_SYSVAR(wait_no_slave),
211  MYSQL_SYSVAR(trace_level),
212  NULL,
213 };
214 
215 
216 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
217  SYS_VAR *var,
218  void *ptr,
219  const void *val)
220 {
221  *(unsigned long *)ptr= *(unsigned long *)val;
222  repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout);
223  return;
224 }
225 
226 static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd,
227  SYS_VAR *var,
228  void *ptr,
229  const void *val)
230 {
231  *(unsigned long *)ptr= *(unsigned long *)val;
232  repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level);
233  return;
234 }
235 
236 static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
237  SYS_VAR *var,
238  void *ptr,
239  const void *val)
240 {
241  *(char *)ptr= *(char *)val;
242  if (rpl_semi_sync_master_enabled)
243  {
244  if (repl_semisync.enableMaster() != 0)
245  rpl_semi_sync_master_enabled = false;
246  }
247  else
248  {
249  if (repl_semisync.disableMaster() != 0)
250  rpl_semi_sync_master_enabled = true;
251  }
252 
253  return;
254 }
255 
256 Trans_observer trans_observer = {
257  sizeof(Trans_observer), // len
258 
259  repl_semi_report_commit, // after_commit
260  repl_semi_report_rollback, // after_rollback
261 };
262 
263 Binlog_storage_observer storage_observer = {
264  sizeof(Binlog_storage_observer), // len
265 
266  repl_semi_report_binlog_update, // report_update
267 };
268 
269 Binlog_transmit_observer transmit_observer = {
270  sizeof(Binlog_transmit_observer), // len
271 
272  repl_semi_binlog_dump_start, // start
273  repl_semi_binlog_dump_end, // stop
274  repl_semi_reserve_header, // reserve_header
275  repl_semi_before_send_event, // before_send_event
276  repl_semi_after_send_event, // after_send_event
277  repl_semi_reset_master, // reset
278 };
279 
280 
281 #define SHOW_FNAME(name) \
282  rpl_semi_sync_master_show_##name
283 
284 #define DEF_SHOW_FUNC(name, show_type) \
285  static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
286  { \
287  repl_semisync.setExportStats(); \
288  var->type= show_type; \
289  var->value= (char *)&rpl_semi_sync_master_##name; \
290  return 0; \
291  }
292 
293 DEF_SHOW_FUNC(status, SHOW_BOOL)
294 DEF_SHOW_FUNC(clients, SHOW_LONG)
295 DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
296 DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
297 DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
298 DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
299 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
300 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
301 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
302 
303 
304 /* plugin status variables */
305 static SHOW_VAR semi_sync_master_status_vars[]= {
306  {"Rpl_semi_sync_master_status",
307  (char*) &SHOW_FNAME(status),
308  SHOW_FUNC},
309  {"Rpl_semi_sync_master_clients",
310  (char*) &SHOW_FNAME(clients),
311  SHOW_FUNC},
312  {"Rpl_semi_sync_master_yes_tx",
313  (char*) &rpl_semi_sync_master_yes_transactions,
314  SHOW_LONG},
315  {"Rpl_semi_sync_master_no_tx",
316  (char*) &rpl_semi_sync_master_no_transactions,
317  SHOW_LONG},
318  {"Rpl_semi_sync_master_wait_sessions",
319  (char*) &SHOW_FNAME(wait_sessions),
320  SHOW_FUNC},
321  {"Rpl_semi_sync_master_no_times",
322  (char*) &rpl_semi_sync_master_off_times,
323  SHOW_LONG},
324  {"Rpl_semi_sync_master_timefunc_failures",
325  (char*) &rpl_semi_sync_master_timefunc_fails,
326  SHOW_LONG},
327  {"Rpl_semi_sync_master_wait_pos_backtraverse",
328  (char*) &rpl_semi_sync_master_wait_pos_backtraverse,
329  SHOW_LONG},
330  {"Rpl_semi_sync_master_tx_wait_time",
331  (char*) &SHOW_FNAME(trx_wait_time),
332  SHOW_FUNC},
333  {"Rpl_semi_sync_master_tx_waits",
334  (char*) &SHOW_FNAME(trx_wait_num),
335  SHOW_FUNC},
336  {"Rpl_semi_sync_master_tx_avg_wait_time",
337  (char*) &SHOW_FNAME(avg_trx_wait_time),
338  SHOW_FUNC},
339  {"Rpl_semi_sync_master_net_wait_time",
340  (char*) &SHOW_FNAME(net_wait_time),
341  SHOW_FUNC},
342  {"Rpl_semi_sync_master_net_waits",
343  (char*) &SHOW_FNAME(net_wait_num),
344  SHOW_FUNC},
345  {"Rpl_semi_sync_master_net_avg_wait_time",
346  (char*) &SHOW_FNAME(avg_net_wait_time),
347  SHOW_FUNC},
348  {NULL, NULL, SHOW_LONG},
349 };
350 
351 #ifdef HAVE_PSI_INTERFACE
352 PSI_mutex_key key_ss_mutex_LOCK_binlog_;
353 
354 static PSI_mutex_info all_semisync_mutexes[]=
355 {
356  { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
357 };
358 
359 PSI_cond_key key_ss_cond_COND_binlog_send_;
360 
361 static PSI_cond_info all_semisync_conds[]=
362 {
363  { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
364 };
365 #endif /* HAVE_PSI_INTERFACE */
366 
367 PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
368 { 0, "Waiting for semi-sync ACK from slave", 0};
369 
#ifdef HAVE_PSI_INTERFACE
/* Stages registered by init_semisync_psi_keys(). */
PSI_stage_info *all_semisync_stages[]=
{
  &stage_waiting_for_semi_sync_ack_from_slave
};

/**
  Registers the semi-sync mutexes, condition variables and stages under
  the "semisync" category, in that order.
*/
static void init_semisync_psi_keys(void)
{
  const char *category= "semisync";

  const int mutex_count= array_elements(all_semisync_mutexes);
  mysql_mutex_register(category, all_semisync_mutexes, mutex_count);

  const int cond_count= array_elements(all_semisync_conds);
  mysql_cond_register(category, all_semisync_conds, cond_count);

  const int stage_count= array_elements(all_semisync_stages);
  mysql_stage_register(category, all_semisync_stages, stage_count);
}
#endif /* HAVE_PSI_INTERFACE */
391 
392 static int semi_sync_master_plugin_init(void *p)
393 {
394 #ifdef HAVE_PSI_INTERFACE
395  init_semisync_psi_keys();
396 #endif
397 
398  if (repl_semisync.initObject())
399  return 1;
400  if (register_trans_observer(&trans_observer, p))
401  return 1;
402  if (register_binlog_storage_observer(&storage_observer, p))
403  return 1;
404  if (register_binlog_transmit_observer(&transmit_observer, p))
405  return 1;
406  return 0;
407 }
408 
409 static int semi_sync_master_plugin_deinit(void *p)
410 {
411  if (unregister_trans_observer(&trans_observer, p))
412  {
413  sql_print_error("unregister_trans_observer failed");
414  return 1;
415  }
416  if (unregister_binlog_storage_observer(&storage_observer, p))
417  {
418  sql_print_error("unregister_binlog_storage_observer failed");
419  return 1;
420  }
421  if (unregister_binlog_transmit_observer(&transmit_observer, p))
422  {
423  sql_print_error("unregister_binlog_transmit_observer failed");
424  return 1;
425  }
426  sql_print_information("unregister_replicator OK");
427  return 0;
428 }
429 
430 struct Mysql_replication semi_sync_master_plugin= {
431  MYSQL_REPLICATION_INTERFACE_VERSION
432 };
433 
434 /*
435  Plugin library descriptor
436 */
437 mysql_declare_plugin(semi_sync_master)
438 {
439  MYSQL_REPLICATION_PLUGIN,
440  &semi_sync_master_plugin,
441  "rpl_semi_sync_master",
442  "He Zhenxing",
443  "Semi-synchronous replication master",
444  PLUGIN_LICENSE_GPL,
445  semi_sync_master_plugin_init, /* Plugin Init */
446  semi_sync_master_plugin_deinit, /* Plugin Deinit */
447  0x0100 /* 1.0 */,
448  semi_sync_master_status_vars, /* status variables */
449  semi_sync_master_system_vars, /* system variables */
450  NULL, /* config options */
451  0, /* flags */
452 }
453 mysql_declare_plugin_end;