MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rpl_gtid_state.cc
1 /* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or
4  modify it under the terms of the GNU General Public License as
5  published by the Free Software Foundation; version 2 of the
6  License.
7 
8  This program is distributed in the hope that it will be useful, but
9  WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
16  02110-1301 USA */
17 
18 #include "rpl_gtid.h"
19 
20 #include "rpl_mi.h"
21 #include "rpl_slave.h"
22 #include "sql_class.h"
23 
24 
26 {
27  DBUG_ENTER("Gtid_state::clear()");
28  // the wrlock implies that no other thread can hold any of the mutexes
29  sid_lock->assert_some_wrlock();
30  logged_gtids.clear();
31  lost_gtids.clear();
32  DBUG_VOID_RETURN;
33 }
34 
35 
36 enum_return_status Gtid_state::acquire_ownership(THD *thd, const Gtid &gtid)
37 {
38  DBUG_ENTER("Gtid_state::acquire_ownership");
39  // caller must take lock on the SIDNO.
40  global_sid_lock->assert_some_lock();
41  gtid_state->assert_sidno_lock_owner(gtid.sidno);
42  DBUG_ASSERT(!logged_gtids.contains_gtid(gtid));
43  DBUG_PRINT("info", ("group=%d:%lld", gtid.sidno, gtid.gno));
44  DBUG_ASSERT(thd->owned_gtid.sidno == 0);
45  if (owned_gtids.add_gtid_owner(gtid, thd->thread_id) != RETURN_STATUS_OK)
46  goto err2;
47  if (thd->get_gtid_next_list() != NULL)
48  {
49 #ifdef HAVE_GTID_NEXT_LIST
50  if (thd->owned_gtid_set._add_gtid(gtid) != RETURN_STATUS_OK)
51  goto err1;
52  thd->owned_gtid.sidno= -1;
53 #else
54  DBUG_ASSERT(0);
55 #endif
56  }
57  else
58  thd->owned_gtid= gtid;
59  RETURN_OK;
60 #ifdef HAVE_GTID_NEXT_LIST
61 err1:
62  owned_gtids.remove_gtid(gtid);
63 #endif
64 err2:
65  if (thd->get_gtid_next_list() != NULL)
66  {
67 #ifdef HAVE_GTID_NEXT_LIST
68  Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
69  Gtid g= git.get();
70  while (g.sidno != 0)
71  {
72  owned_gtids.remove_gtid(g);
73  g= git.get();
74  }
75 #else
76  DBUG_ASSERT(0);
77 #endif
78  }
79  thd->owned_gtid_set.clear();
80  thd->owned_gtid.sidno= 0;
81  RETURN_REPORTED_ERROR;
82 }
83 
84 #ifdef HAVE_GTID_NEXT_LIST
85 void Gtid_state::lock_owned_sidnos(const THD *thd)
86 {
87  if (thd->owned_gtid.sidno == -1)
88  lock_sidnos(&thd->owned_gtid_set);
89  else if (thd->owned_gtid.sidno > 0)
90  lock_sidno(thd->owned_gtid.sidno);
91 }
92 #endif
93 
94 
95 void Gtid_state::unlock_owned_sidnos(const THD *thd)
96 {
97  if (thd->owned_gtid.sidno == -1)
98  {
99 #ifdef HAVE_GTID_NEXT_LIST
100  unlock_sidnos(&thd->owned_gtid_set);
101 #else
102  DBUG_ASSERT(0);
103 #endif
104  }
105  else if (thd->owned_gtid.sidno > 0)
106  {
107  unlock_sidno(thd->owned_gtid.sidno);
108  }
109 }
110 
111 
112 void Gtid_state::broadcast_owned_sidnos(const THD *thd)
113 {
114  if (thd->owned_gtid.sidno == -1)
115  {
116 #ifdef HAVE_GTID_NEXT_LIST
117  broadcast_sidnos(&thd->owned_gtid_set);
118 #else
119  DBUG_ASSERT(0);
120 #endif
121  }
122  else if (thd->owned_gtid.sidno > 0)
123  {
124  broadcast_sidno(thd->owned_gtid.sidno);
125  }
126 }
127 
128 
129 enum_return_status Gtid_state::update_on_flush(THD *thd)
130 {
131  DBUG_ENTER("Gtid_state::update_on_flush");
132  enum_return_status ret= RETURN_STATUS_OK;
133 
134  // Caller must take lock on the SIDNO.
135  global_sid_lock->assert_some_lock();
136 
137  if (thd->owned_gtid.sidno == -1)
138  {
139 #ifdef HAVE_GTID_NEXT_LIST
140  rpl_sidno prev_sidno= 0;
141  Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
142  Gtid g= git.get();
143  while (g.sidno != 0)
144  {
145  if (g.sidno != prev_sidno)
146  sid_locks.lock(g.sidno);
147  if (ret == RETURN_STATUS_OK)
148  ret= logged_gtids._add_gtid(g);
149  git.next();
150  g= git.get();
151  }
152 #else
153  DBUG_ASSERT(0);
154 #endif
155  }
156  else if (thd->owned_gtid.sidno > 0)
157  {
158  lock_sidno(thd->owned_gtid.sidno);
159  ret= logged_gtids._add_gtid(thd->owned_gtid);
160  }
161 
162  /*
163  There may be commands that cause implicit commits, e.g.
164  SET AUTOCOMMIT=1 may cause the previous statements to commit
165  without executing a COMMIT command or be on auto-commit mode.
166  Although we set GTID_NEXT type to UNDEFINED on
167  Gtid_state::update_on_commit(), we also set it here to do it
168  as soon as possible.
169  */
170  thd->variables.gtid_next.set_undefined();
171  broadcast_owned_sidnos(thd);
172  unlock_owned_sidnos(thd);
173 
174  DBUG_RETURN(ret);
175 }
176 
177 
179 {
180  DBUG_ENTER("Gtid_state::update_on_commit");
181  update_owned_gtids_impl(thd, true);
182  DBUG_VOID_RETURN;
183 }
184 
185 
187 {
188  DBUG_ENTER("Gtid_state::update_on_rollback");
189  update_owned_gtids_impl(thd, false);
190  DBUG_VOID_RETURN;
191 }
192 
193 
194 void Gtid_state::update_owned_gtids_impl(THD *thd, bool is_commit)
195 {
196  DBUG_ENTER("Gtid_state::update_owned_gtids_impl");
197 
198  // Caller must take lock on the SIDNO.
199  global_sid_lock->assert_some_lock();
200 
201  if (thd->owned_gtid.sidno == -1)
202  {
203 #ifdef HAVE_GTID_NEXT_LIST
204  rpl_sidno prev_sidno= 0;
205  Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
206  Gtid g= git.get();
207  while (g.sidno != 0)
208  {
209  if (g.sidno != prev_sidno)
210  sid_locks.lock(g.sidno);
211  owned_gtids.remove_gtid(g);
212  git.next();
213  g= git.get();
214  }
215 #else
216  DBUG_ASSERT(0);
217 #endif
218  }
219  else if (thd->owned_gtid.sidno > 0)
220  {
221  lock_sidno(thd->owned_gtid.sidno);
222  owned_gtids.remove_gtid(thd->owned_gtid);
223  }
224 
225  /*
226  There may be commands that cause implicit commits, e.g.
227  SET AUTOCOMMIT=1 may cause the previous statements to commit
228  without executing a COMMIT command or be on auto-commit mode.
229  */
230  thd->variables.gtid_next.set_undefined();
231  if (!is_commit)
232  broadcast_owned_sidnos(thd);
233  unlock_owned_sidnos(thd);
234 
235  thd->clear_owned_gtids();
236 
237  DBUG_VOID_RETURN;
238 }
239 
240 
241 rpl_gno Gtid_state::get_automatic_gno(rpl_sidno sidno) const
242 {
243  DBUG_ENTER("Gtid_state::get_automatic_gno");
244  //logged_gtids.ensure_sidno(sidno);
245  Gtid_set::Const_interval_iterator ivit(&logged_gtids, sidno);
246  Gtid next_candidate= { sidno, 1 };
247  while (true)
248  {
249  const Gtid_set::Interval *iv= ivit.get();
250  rpl_gno next_interval_start= iv != NULL ? iv->start : MAX_GNO;
251  while (next_candidate.gno < next_interval_start)
252  {
253  if (owned_gtids.get_owner(next_candidate) == 0)
254  DBUG_RETURN(next_candidate.gno);
255  next_candidate.gno++;
256  }
257  if (iv == NULL)
258  {
259  my_error(ER_GNO_EXHAUSTED, MYF(0));
260  DBUG_RETURN(-1);
261  }
262  next_candidate.gno= iv->end;
263  ivit.next();
264  }
265 }
266 
267 
268 void Gtid_state::wait_for_gtid(THD *thd, const Gtid &gtid)
269 {
270  DBUG_ENTER("Gtid_state::wait_for_gtid");
271  // Enter cond, wait, exit cond.
272  PSI_stage_info old_stage;
273  DBUG_PRINT("info", ("SIDNO=%d GNO=%lld owner(sidno,gno)=%lu thread_id=%lu",
274  gtid.sidno, gtid.gno,
275  owned_gtids.get_owner(gtid), thd->thread_id));
276  DBUG_ASSERT(owned_gtids.get_owner(gtid) != thd->thread_id);
277  sid_locks.enter_cond(thd, gtid.sidno,
278  &stage_waiting_for_gtid_to_be_written_to_binary_log,
279  &old_stage);
280  //while (get_owner(g.sidno, g.gno) != 0 && !thd->killed && !abort_loop)
281  sid_locks.wait(gtid.sidno);
282  thd->EXIT_COND(&old_stage);
283 
284  DBUG_VOID_RETURN;
285 }
286 
287 
288 #ifdef HAVE_GTID_NEXT_LIST
289 void Gtid_state::lock_sidnos(const Gtid_set *gs)
290 {
291  DBUG_ASSERT(gs);
292  rpl_sidno max_sidno= gs->get_max_sidno();
293  for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
294  if (gs->contains_sidno(sidno))
295  lock_sidno(sidno);
296 }
297 
298 
299 void Gtid_state::unlock_sidnos(const Gtid_set *gs)
300 {
301  DBUG_ASSERT(gs);
302  rpl_sidno max_sidno= gs->get_max_sidno();
303  for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
304  if (gs->contains_sidno(sidno))
305  unlock_sidno(sidno);
306 }
307 
308 
309 void Gtid_state::broadcast_sidnos(const Gtid_set *gs)
310 {
311  DBUG_ASSERT(gs);
312  rpl_sidno max_sidno= gs->get_max_sidno();
313  for (rpl_sidno sidno= 1; sidno <= max_sidno; sidno++)
314  if (gs->contains_sidno(sidno))
315  broadcast_sidno(sidno);
316 }
317 #endif
318 
319 
320 enum_return_status Gtid_state::ensure_sidno()
321 {
322  DBUG_ENTER("Gtid_state::ensure_sidno");
323  sid_lock->assert_some_wrlock();
324  rpl_sidno sidno= sid_map->get_max_sidno();
325  if (sidno > 0)
326  {
327  // The lock may be temporarily released during one of the calls to
328  // ensure_sidno or ensure_index. Hence, we must re-check the
329  // condition after the calls.
330  PROPAGATE_REPORTED_ERROR(logged_gtids.ensure_sidno(sidno));
331  PROPAGATE_REPORTED_ERROR(lost_gtids.ensure_sidno(sidno));
332  PROPAGATE_REPORTED_ERROR(owned_gtids.ensure_sidno(sidno));
333  PROPAGATE_REPORTED_ERROR(sid_locks.ensure_index(sidno));
334  sidno= sid_map->get_max_sidno();
335  DBUG_ASSERT(logged_gtids.get_max_sidno() >= sidno);
336  DBUG_ASSERT(lost_gtids.get_max_sidno() >= sidno);
337  DBUG_ASSERT(owned_gtids.get_max_sidno() >= sidno);
338  DBUG_ASSERT(sid_locks.get_max_index() >= sidno);
339  }
340  RETURN_OK;
341 }
342 
343 
344 enum_return_status Gtid_state::add_lost_gtids(const char *text)
345 {
346  DBUG_ENTER("Gtid_state::add_lost_gtids()");
347  sid_lock->assert_some_wrlock();
348 
349  DBUG_PRINT("info", ("add_lost_gtids '%s'", text));
350 
351  if (!logged_gtids.is_empty())
352  {
353  BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY)),
354  (ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY,
355  MYF(0)));
356  RETURN_REPORTED_ERROR;
357  }
358  if (!owned_gtids.is_empty())
359  {
360  BINLOG_ERROR((ER(ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY)),
361  (ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY,
362  MYF(0)));
363  RETURN_REPORTED_ERROR;
364  }
365  DBUG_ASSERT(lost_gtids.is_empty());
366 
367  PROPAGATE_REPORTED_ERROR(lost_gtids.add_gtid_text(text));
368  PROPAGATE_REPORTED_ERROR(logged_gtids.add_gtid_text(text));
369 
370  DBUG_RETURN(RETURN_STATUS_OK);
371 }
372 
373 
375 {
376  DBUG_ENTER("Gtid_state::init()");
377 
378  global_sid_lock->assert_some_lock();
379 
380  rpl_sid server_sid;
381  if (server_sid.parse(server_uuid) != RETURN_STATUS_OK)
382  DBUG_RETURN(1);
383  rpl_sidno sidno= sid_map->add_sid(server_sid);
384  if (sidno <= 0)
385  DBUG_RETURN(1);
386  server_sidno= sidno;
387 
388  DBUG_RETURN(0);
389 }