MySQL 5.6.14 Source Code Document
trx0purge.cc
1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 
19 /**************************************************//**
20 @file trx/trx0purge.cc
21 Purge old versions
22 
23 Created 3/26/1996 Heikki Tuuri
24 *******************************************************/
25 
26 #include "trx0purge.h"
27 
28 #ifdef UNIV_NONINL
29 #include "trx0purge.ic"
30 #endif
31 
32 #include "fsp0fsp.h"
33 #include "mach0data.h"
34 #include "trx0rseg.h"
35 #include "trx0trx.h"
36 #include "trx0roll.h"
37 #include "read0read.h"
38 #include "fut0fut.h"
39 #include "que0que.h"
40 #include "row0purge.h"
41 #include "row0upd.h"
42 #include "trx0rec.h"
43 #include "srv0srv.h"
44 #include "srv0start.h"
45 #include "os0thread.h"
46 #include "srv0mon.h"
47 #include "mtr0log.h"
48 
49 /** Maximum allowable purge history length. <=0 means 'infinite'. */
50 UNIV_INTERN ulong srv_max_purge_lag = 0;
51 
52 /** Max DML user threads delay in micro-seconds. */
53 UNIV_INTERN ulong srv_max_purge_lag_delay = 0;
54 
55 /** The global data structure coordinating a purge */
56 UNIV_INTERN trx_purge_t* purge_sys = NULL;
57 
58 /** A dummy undo record used as a return value when we have a whole undo log
59 which needs no purge */
60 UNIV_INTERN trx_undo_rec_t trx_purge_dummy_rec;
61 
62 #ifdef UNIV_PFS_RWLOCK
63 /* Key to register trx_purge_latch with performance schema */
64 UNIV_INTERN mysql_pfs_key_t trx_purge_latch_key;
65 #endif /* UNIV_PFS_RWLOCK */
66 
67 #ifdef UNIV_PFS_MUTEX
68 /* Key to register purge_sys_bh_mutex with performance schema */
69 UNIV_INTERN mysql_pfs_key_t purge_sys_bh_mutex_key;
70 #endif /* UNIV_PFS_MUTEX */
71 
72 #ifdef UNIV_DEBUG
73 UNIV_INTERN my_bool srv_purge_view_update_only_debug;
74 #endif /* UNIV_DEBUG */
75 
76 /****************************************************************//**
77 Builds a purge 'query' graph. The actual purge is performed by executing
78 this query graph.
79 @return own: the query graph */
80 static
81 que_t*
82 trx_purge_graph_build(
83 /*==================*/
84  trx_t* trx, /*!< in: transaction */
85  ulint n_purge_threads) /*!< in: number of purge
86  threads */
87 {
88  ulint i;
89  mem_heap_t* heap;
90  que_fork_t* fork;
91 
92  heap = mem_heap_create(512);
93  fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
94  fork->trx = trx;
95 
96  for (i = 0; i < n_purge_threads; ++i) {
97  que_thr_t* thr;
98 
99  thr = que_thr_create(fork, heap);
100 
101  thr->child = row_purge_node_create(thr, heap);
102  }
103 
104  return(fork);
105 }
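The graph built above is a fork with one query thread per purge worker, each thread owning a single purge node; trx_purge_attach_undo_recs() later distributes undo records across exactly these nodes. A minimal standalone sketch of that shape, using toy structures (not the real que0que types) and an assumed worker count:

#include <stdio.h>
#include <stdlib.h>

/* Toy stand-ins: one fork, n_purge_threads thread nodes, one purge
   node hanging off each thread. */
struct purge_node { int done; };
struct thr { struct purge_node child; };

int main(void)
{
	int n_purge_threads = 4;	/* assumed value */
	struct thr *thrs = calloc(n_purge_threads, sizeof(*thrs));
	int i;

	for (i = 0; i < n_purge_threads; i++) {
		/* Workers start idle; the batch loop later expects
		   node->done to be set before work is attached. */
		thrs[i].child.done = 1;
	}

	printf("fork with %d worker threads\n", n_purge_threads);
	free(thrs);
	return 0;
}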
106 
107 /********************************************************************//**
108 Creates the global purge system control structure and inits the history
109 mutex. */
110 UNIV_INTERN
111 void
112 trx_purge_sys_create(
113 /*=================*/
114  ulint n_purge_threads, /*!< in: number of purge
115  threads */
116  ib_bh_t* ib_bh) /*!< in, own: UNDO log min
117  binary heap */
118 {
119  purge_sys = static_cast<trx_purge_t*>(mem_zalloc(sizeof(*purge_sys)));
120 
121  purge_sys->state = PURGE_STATE_INIT;
122  purge_sys->event = os_event_create();
123 
124  /* Take ownership of ib_bh, we are responsible for freeing it. */
125  purge_sys->ib_bh = ib_bh;
126 
127  rw_lock_create(trx_purge_latch_key,
128  &purge_sys->latch, SYNC_PURGE_LATCH);
129 
130  mutex_create(
131  purge_sys_bh_mutex_key, &purge_sys->bh_mutex,
132  SYNC_PURGE_QUEUE);
133 
134  purge_sys->heap = mem_heap_create(256);
135 
136  ut_a(n_purge_threads > 0);
137 
138  purge_sys->sess = sess_open();
139 
140  purge_sys->trx = purge_sys->sess->trx;
141 
142  ut_a(purge_sys->trx->sess == purge_sys->sess);
143 
144  /* A purge transaction is not a real transaction, we use a transaction
145  here only because the query threads code requires it. It is otherwise
146  quite unnecessary. We should get rid of it eventually. */
147  purge_sys->trx->id = 0;
148  purge_sys->trx->start_time = ut_time();
149  purge_sys->trx->state = TRX_STATE_ACTIVE;
150  purge_sys->trx->op_info = "purge trx";
151 
152  purge_sys->query = trx_purge_graph_build(
153  purge_sys->trx, n_purge_threads);
154 
156 }
157 
158 /************************************************************************
159 Frees the global purge system control structure. */
160 UNIV_INTERN
161 void
162 trx_purge_sys_close(void)
163 /*======================*/
164 {
165  que_graph_free(purge_sys->query);
166 
167  ut_a(purge_sys->trx->id == 0);
168  ut_a(purge_sys->sess->trx == purge_sys->trx);
169 
170  purge_sys->trx->state = TRX_STATE_NOT_STARTED;
171 
172  sess_close(purge_sys->sess);
173 
174  purge_sys->sess = NULL;
175 
176  purge_sys->view = NULL;
177 
178  rw_lock_free(&purge_sys->latch);
179  mutex_free(&purge_sys->bh_mutex);
180 
181  mem_heap_free(purge_sys->heap);
182 
183  ib_bh_free(purge_sys->ib_bh);
184 
185  os_event_free(purge_sys->event);
186 
187  purge_sys->event = NULL;
188 
189  mem_free(purge_sys);
190 
191  purge_sys = NULL;
192 }
193 
194 /*================ UNDO LOG HISTORY LIST =============================*/
195 
196 /********************************************************************//**
197 Adds the update undo log as the first log in the history list. Removes the
198 update undo log segment from the rseg slot if it is too big for reuse. */
199 UNIV_INTERN
200 void
201 trx_purge_add_update_undo_to_history(
202 /*=================================*/
203  trx_t* trx, /*!< in: transaction */
204  page_t* undo_page, /*!< in: update undo log header page,
205  x-latched */
206  mtr_t* mtr) /*!< in: mtr */
207 {
208  trx_undo_t* undo;
209  trx_rseg_t* rseg;
210  trx_rsegf_t* rseg_header;
211  trx_ulogf_t* undo_header;
212 
213  undo = trx->update_undo;
214  rseg = undo->rseg;
215 
216  rseg_header = trx_rsegf_get(
217  undo->rseg->space, undo->rseg->zip_size, undo->rseg->page_no,
218  mtr);
219 
220  undo_header = undo_page + undo->hdr_offset;
221 
222  if (undo->state != TRX_UNDO_CACHED) {
223  ulint hist_size;
224 #ifdef UNIV_DEBUG
225  trx_usegf_t* seg_header = undo_page + TRX_UNDO_SEG_HDR;
226 #endif /* UNIV_DEBUG */
227 
228  /* The undo log segment will not be reused */
229 
230  if (UNIV_UNLIKELY(undo->id >= TRX_RSEG_N_SLOTS)) {
231  fprintf(stderr,
232  "InnoDB: Error: undo->id is %lu\n",
233  (ulong) undo->id);
234  ut_error;
235  }
236 
237  trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);
238 
239  MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);
240 
241  hist_size = mtr_read_ulint(
242  rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);
243 
244  ut_ad(undo->size == flst_get_len(
245  seg_header + TRX_UNDO_PAGE_LIST, mtr));
246 
247  mlog_write_ulint(
248  rseg_header + TRX_RSEG_HISTORY_SIZE,
249  hist_size + undo->size, MLOG_4BYTES, mtr);
250  }
251 
252  /* Add the log as the first in the history list */
253  flst_add_first(rseg_header + TRX_RSEG_HISTORY,
254  undo_header + TRX_UNDO_HISTORY_NODE, mtr);
255 
256 #ifdef HAVE_ATOMIC_BUILTINS
257  os_atomic_increment_ulint(&trx_sys->rseg_history_len, 1);
258 #else
259  mutex_enter(&trx_sys->mutex);
260  ++trx_sys->rseg_history_len;
261  mutex_exit(&trx_sys->mutex);
262 #endif /* HAVE_ATOMIC_BUILTINS */
263 
264  srv_wake_purge_thread_if_not_active();
265 
266  /* Write the trx number to the undo log header */
267  mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);
268 
269  /* Write information about delete markings to the undo log header */
270 
271  if (!undo->del_marks) {
272  mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
273  MLOG_2BYTES, mtr);
274  }
275 
276  if (rseg->last_page_no == FIL_NULL) {
277  rseg->last_page_no = undo->hdr_page_no;
278  rseg->last_offset = undo->hdr_offset;
279  rseg->last_trx_no = trx->no;
280  rseg->last_del_marks = undo->del_marks;
281  }
282 }
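The #ifdef HAVE_ATOMIC_BUILTINS branches above show the idiom used for trx_sys->rseg_history_len throughout this file: an atomic add when the platform provides one, otherwise a mutex-protected update. A portable standalone sketch of the same idiom, substituting C11 <stdatomic.h> and pthreads for InnoDB's os_atomic_increment_ulint()/mutex_enter() wrappers (the names below are this sketch's, not InnoDB's):

#include <pthread.h>
#include <stdio.h>

#ifndef __STDC_NO_ATOMICS__
#include <stdatomic.h>

/* Preferred path: lock-free add, like os_atomic_increment_ulint(). */
static _Atomic unsigned long rseg_history_len;

static void history_len_add(unsigned long n)
{
	atomic_fetch_add_explicit(&rseg_history_len, n,
				  memory_order_relaxed);
}

static unsigned long history_len_get(void)
{
	return atomic_load(&rseg_history_len);
}
#else
/* Fallback path: protect the counter with a mutex, as in the #else
   branch of the code above. */
static unsigned long rseg_history_len;
static pthread_mutex_t len_mutex = PTHREAD_MUTEX_INITIALIZER;

static void history_len_add(unsigned long n)
{
	pthread_mutex_lock(&len_mutex);
	rseg_history_len += n;
	pthread_mutex_unlock(&len_mutex);
}

static unsigned long history_len_get(void)
{
	pthread_mutex_lock(&len_mutex);
	unsigned long n = rseg_history_len;
	pthread_mutex_unlock(&len_mutex);
	return n;
}
#endif

int main(void)
{
	history_len_add(1);	/* one undo log added to the history */
	printf("history length now %lu\n", history_len_get());
	return 0;
}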
283 
284 /**********************************************************************//**
285 Frees an undo log segment which is in the history list. Cuts the end of the
286 history list at the youngest undo log in this segment. */
287 static
288 void
289 trx_purge_free_segment(
290 /*===================*/
291  trx_rseg_t* rseg, /*!< in: rollback segment */
292  fil_addr_t hdr_addr, /*!< in: the file address of log_hdr */
293  ulint n_removed_logs) /*!< in: count of how many undo logs we
294  will cut off from the end of the
295  history list */
296 {
297  mtr_t mtr;
298  trx_rsegf_t* rseg_hdr;
299  trx_ulogf_t* log_hdr;
300  trx_usegf_t* seg_hdr;
301  ulint seg_size;
302  ulint hist_size;
303  ibool marked = FALSE;
304 
305  /* fputs("Freeing an update undo log segment\n", stderr); */
306 
307  for (;;) {
308  page_t* undo_page;
309 
310  mtr_start(&mtr);
311 
312  mutex_enter(&rseg->mutex);
313 
314  rseg_hdr = trx_rsegf_get(
315  rseg->space, rseg->zip_size, rseg->page_no, &mtr);
316 
317  undo_page = trx_undo_page_get(
318  rseg->space, rseg->zip_size, hdr_addr.page, &mtr);
319 
320  seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
321  log_hdr = undo_page + hdr_addr.boffset;
322 
323  /* Mark the last undo log totally purged, so that if the
324  system crashes, the tail of the undo log will not get accessed
325  again. The list of pages in the undo log tail gets inconsistent
326  during the freeing of the segment, and therefore purge should
327  not try to access them again. */
328 
329  if (!marked) {
330  mlog_write_ulint(
331  log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
332  MLOG_2BYTES, &mtr);
333 
334  marked = TRUE;
335  }
336 
337  if (fseg_free_step_not_header(
338  seg_hdr + TRX_UNDO_FSEG_HEADER, &mtr)) {
339 
340  break;
341  }
342 
343  mutex_exit(&rseg->mutex);
344 
345  mtr_commit(&mtr);
346  }
347 
348  /* The page list may now be inconsistent, but the length field
349  stored in the list base node tells us how big it was before we
350  started the freeing. */
351 
352  seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST, &mtr);
353 
354  /* We may free the undo log segment header page; it must be freed
355  within the same mtr as the undo log header is removed from the
356  history list: otherwise, in case of a database crash, the segment
357  could become inaccessible garbage in the file space. */
358 
359  flst_cut_end(rseg_hdr + TRX_RSEG_HISTORY,
360  log_hdr + TRX_UNDO_HISTORY_NODE, n_removed_logs, &mtr);
361 
362 #ifdef HAVE_ATOMIC_BUILTINS
363  os_atomic_decrement_ulint(&trx_sys->rseg_history_len, n_removed_logs);
364 #else
365  mutex_enter(&trx_sys->mutex);
366  trx_sys->rseg_history_len -= n_removed_logs;
367  mutex_exit(&trx_sys->mutex);
368 #endif /* HAVE_ATOMIC_BUILTINS */
369 
370  do {
371 
372  /* Here we assume that a file segment with just the header
373  page can be freed in a few steps, so that the buffer pool
374  is not flooded with bufferfixed pages: see the note in
375  fsp0fsp.cc. */
376 
377  } while(!fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER, &mtr));
378 
379  hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
380  MLOG_4BYTES, &mtr);
381  ut_ad(hist_size >= seg_size);
382 
383  mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
384  hist_size - seg_size, MLOG_4BYTES, &mtr);
385 
386  ut_ad(rseg->curr_size >= seg_size);
387 
388  rseg->curr_size -= seg_size;
389 
390  mutex_exit(&(rseg->mutex));
391 
392  mtr_commit(&mtr);
393 }
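The do/while loop above releases the file segment one step at a time: each fseg_free_step() call frees a bounded amount of space and returns TRUE only when the whole segment is gone, so the buffer pool is never flooded with buffer-fixed pages. A schematic of this bounded-step freeing idiom, with a hypothetical segment type standing in for the real file segment:

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical segment: a counted bundle of pages. */
struct segment {
	int	pages_left;
};

/* Free a bounded chunk; return true once the segment is fully freed.
   Mirrors the contract of fseg_free_step(). */
static bool segment_free_step(struct segment *seg)
{
	enum { STEP = 4 };	/* bounded work per call */

	if (seg->pages_left > STEP) {
		seg->pages_left -= STEP;
		return false;	/* more work remains */
	}
	seg->pages_left = 0;
	return true;		/* segment is gone */
}

int main(void)
{
	struct segment seg = { 10 };
	int steps = 0;

	do {
		++steps;	/* each iteration is one short unit of work */
	} while (!segment_free_step(&seg));

	printf("freed in %d bounded steps\n", steps);
	return 0;
}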
394 
395 /********************************************************************//**
396 Removes unnecessary history data from a rollback segment. */
397 static
398 void
399 trx_purge_truncate_rseg_history(
400 /*============================*/
401  trx_rseg_t* rseg, /*!< in: rollback segment */
402  const purge_iter_t* limit) /*!< in: truncate offset */
403 {
404  fil_addr_t hdr_addr;
405  fil_addr_t prev_hdr_addr;
406  trx_rsegf_t* rseg_hdr;
407  page_t* undo_page;
408  trx_ulogf_t* log_hdr;
409  trx_usegf_t* seg_hdr;
410  ulint n_removed_logs = 0;
411  mtr_t mtr;
412  trx_id_t undo_trx_no;
413 
414  mtr_start(&mtr);
415  mutex_enter(&(rseg->mutex));
416 
417  rseg_hdr = trx_rsegf_get(rseg->space, rseg->zip_size,
418  rseg->page_no, &mtr);
419 
420  hdr_addr = trx_purge_get_log_from_hist(
421  flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
422 loop:
423  if (hdr_addr.page == FIL_NULL) {
424 
425  mutex_exit(&(rseg->mutex));
426 
427  mtr_commit(&mtr);
428 
429  return;
430  }
431 
432  undo_page = trx_undo_page_get(rseg->space, rseg->zip_size,
433  hdr_addr.page, &mtr);
434 
435  log_hdr = undo_page + hdr_addr.boffset;
436 
437  undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
438 
439  if (undo_trx_no >= limit->trx_no) {
440 
441  if (undo_trx_no == limit->trx_no) {
442 
443  trx_undo_truncate_start(
444  rseg, rseg->space, hdr_addr.page,
445  hdr_addr.boffset, limit->undo_no);
446  }
447 
448 #ifdef HAVE_ATOMIC_BUILTINS
449  os_atomic_decrement_ulint(
450  &trx_sys->rseg_history_len, n_removed_logs);
451 #else
452  mutex_enter(&trx_sys->mutex);
453  trx_sys->rseg_history_len -= n_removed_logs;
454  mutex_exit(&trx_sys->mutex);
455 #endif /* HAVE_ATOMIC_BUILTINS */
456 
457  flst_truncate_end(rseg_hdr + TRX_RSEG_HISTORY,
458  log_hdr + TRX_UNDO_HISTORY_NODE,
459  n_removed_logs, &mtr);
460 
461  mutex_exit(&(rseg->mutex));
462  mtr_commit(&mtr);
463 
464  return;
465  }
466 
467  prev_hdr_addr = trx_purge_get_log_from_hist(
468  flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
469  n_removed_logs++;
470 
471  seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
472 
473  if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
474  && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {
475 
476  /* We can free the whole log segment */
477 
478  mutex_exit(&(rseg->mutex));
479  mtr_commit(&mtr);
480 
481  trx_purge_free_segment(rseg, hdr_addr, n_removed_logs);
482 
483  n_removed_logs = 0;
484  } else {
485  mutex_exit(&(rseg->mutex));
486  mtr_commit(&mtr);
487  }
488 
489  mtr_start(&mtr);
490  mutex_enter(&(rseg->mutex));
491 
492  rseg_hdr = trx_rsegf_get(rseg->space, rseg->zip_size,
493  rseg->page_no, &mtr);
494 
495  hdr_addr = prev_hdr_addr;
496 
497  goto loop;
498 }
499 
500 /********************************************************************//**
501 Removes unnecessary history data from rollback segments. NOTE that when this
502 function is called, the caller must not have any latches on undo log pages! */
503 static
504 void
505 trx_purge_truncate_history(
506 /*========================*/
507  purge_iter_t* limit, /*!< in/out: truncate offset */
508  const read_view_t* view) /*!< in: purge view */
509 {
510  ulint i;
511 
512  /* We play safe and set the truncate limit at most to the purge view
513  low_limit number, though this is not necessary */
514 
515  if (limit->trx_no >= view->low_limit_no) {
516  limit->trx_no = view->low_limit_no;
517  limit->undo_no = 0;
518  }
519 
520  ut_ad(limit->trx_no <= purge_sys->view->low_limit_no);
521 
522  for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
523  trx_rseg_t* rseg = trx_sys->rseg_array[i];
524 
525  if (rseg != NULL) {
526  ut_a(rseg->id == i);
527  trx_purge_truncate_rseg_history(rseg, limit);
528  }
529  }
530 }
531 
532 /***********************************************************************//**
533 Updates the last not yet purged history log info in rseg when we have purged
534 a whole undo log. Advances also purge_sys->iter.trx_no past the purged log. */
535 static
536 void
537 trx_purge_rseg_get_next_history_log(
538 /*================================*/
539  trx_rseg_t* rseg, /*!< in: rollback segment */
540  ulint* n_pages_handled) /*!< in/out: number of UNDO pages
541  handled */
542 {
543  const void* ptr;
544  page_t* undo_page;
545  trx_ulogf_t* log_hdr;
546  fil_addr_t prev_log_addr;
547  trx_id_t trx_no;
548  ibool del_marks;
549  mtr_t mtr;
550  rseg_queue_t rseg_queue;
551 
552  mutex_enter(&(rseg->mutex));
553 
554  ut_a(rseg->last_page_no != FIL_NULL);
555 
556  purge_sys->iter.trx_no = rseg->last_trx_no + 1;
557  purge_sys->iter.undo_no = 0;
558  purge_sys->next_stored = FALSE;
559 
560  mtr_start(&mtr);
561 
562  undo_page = trx_undo_page_get_s_latched(
563  rseg->space, rseg->zip_size, rseg->last_page_no, &mtr);
564 
565  log_hdr = undo_page + rseg->last_offset;
566 
567  /* Increase the purge page count by one for every handled log */
568 
569  (*n_pages_handled)++;
570 
571  prev_log_addr = trx_purge_get_log_from_hist(
572  flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
573 
574  if (prev_log_addr.page == FIL_NULL) {
575  /* No logs left in the history list */
576 
577  rseg->last_page_no = FIL_NULL;
578 
579  mutex_exit(&(rseg->mutex));
580  mtr_commit(&mtr);
581 
582  mutex_enter(&trx_sys->mutex);
583 
584  /* Add debug code to track history list corruption reported
585  on the MySQL mailing list on Nov 9, 2004. The fut0lst.cc
586  file-based list was corrupt. The prev node pointer was
587  FIL_NULL, even though the list length was over 8 million nodes!
588  We assume that purge truncates the history list in large
589  size pieces, and if we here reach the head of the list, the
590  list cannot be longer than 2000 000 undo logs now. */
591 
592  if (trx_sys->rseg_history_len > 2000000) {
593  ut_print_timestamp(stderr);
594  fprintf(stderr,
595  " InnoDB: Warning: purge reached the"
596  " head of the history list,\n"
597  "InnoDB: but its length is still"
598  " reported as %lu! Make a detailed bug\n"
599  "InnoDB: report, and submit it"
600  " to http://bugs.mysql.com\n",
601  (ulong) trx_sys->rseg_history_len);
602  ut_ad(0);
603  }
604 
605  mutex_exit(&trx_sys->mutex);
606 
607  return;
608  }
609 
610  mutex_exit(&rseg->mutex);
611 
612  mtr_commit(&mtr);
613 
614  /* Read the trx number and del marks from the previous log header */
615  mtr_start(&mtr);
616 
617  log_hdr = trx_undo_page_get_s_latched(rseg->space, rseg->zip_size,
618  prev_log_addr.page, &mtr)
619  + prev_log_addr.boffset;
620 
621  trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
622 
623  del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);
624 
625  mtr_commit(&mtr);
626 
627  mutex_enter(&(rseg->mutex));
628 
629  rseg->last_page_no = prev_log_addr.page;
630  rseg->last_offset = prev_log_addr.boffset;
631  rseg->last_trx_no = trx_no;
632  rseg->last_del_marks = del_marks;
633 
634  rseg_queue.rseg = rseg;
635  rseg_queue.trx_no = rseg->last_trx_no;
636 
637  /* Purge can also produce events, however these are already ordered
638  in the rollback segment and any user generated event will be greater
639  than the events that Purge produces. ie. Purge can never produce
640  events from an empty rollback segment. */
641 
642  mutex_enter(&purge_sys->bh_mutex);
643 
644  ptr = ib_bh_push(purge_sys->ib_bh, &rseg_queue);
645  ut_a(ptr != NULL);
646 
647  mutex_exit(&purge_sys->bh_mutex);
648 
649  mutex_exit(&rseg->mutex);
650 }
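The rseg_queue_t entry pushed above carries {trx_no, rseg}, and purge_sys->ib_bh keeps entries ordered so that the rollback segment holding the oldest unpurged transaction is always at the top. A self-contained sketch of such a binary min-heap keyed on trx_no (ib_bh_push()/ib_bh_first()/ib_bh_pop() are the real API; the fixed-size array heap below is an illustration only):

#include <stdio.h>

/* Miniature analogue of the rseg_queue_t entries in purge_sys->ib_bh. */
struct rseg_queue {
	unsigned long long	trx_no;
	int			rseg_id;
};

static struct rseg_queue	heap[16];
static int			heap_len;

static void bh_push(struct rseg_queue e)
{
	int i = heap_len++;

	/* Sift up: parent (i-1)/2 must hold the smaller trx_no. */
	while (i > 0 && heap[(i - 1) / 2].trx_no > e.trx_no) {
		heap[i] = heap[(i - 1) / 2];
		i = (i - 1) / 2;
	}
	heap[i] = e;
}

static struct rseg_queue bh_pop(void)
{
	struct rseg_queue	min = heap[0];
	struct rseg_queue	last = heap[--heap_len];
	int			i = 0;

	/* Sift down: promote the smaller child until 'last' fits. */
	for (;;) {
		int c = 2 * i + 1;

		if (c >= heap_len) break;
		if (c + 1 < heap_len
		    && heap[c + 1].trx_no < heap[c].trx_no) c++;
		if (last.trx_no <= heap[c].trx_no) break;
		heap[i] = heap[c];
		i = c;
	}
	heap[i] = last;
	return min;
}

int main(void)
{
	bh_push((struct rseg_queue){ 42, 1 });
	bh_push((struct rseg_queue){ 7, 2 });
	bh_push((struct rseg_queue){ 19, 3 });

	/* Pops in trx_no order: 7, 19, 42. */
	while (heap_len > 0) {
		struct rseg_queue e = bh_pop();
		printf("rseg %d, trx_no %llu\n", e.rseg_id, e.trx_no);
	}
	return 0;
}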
651 
652 /***********************************************************************//**
653 Chooses the rollback segment with the smallest trx_no.
654 @return zip_size of the rollback segment's tablespace if it is compressed,
655 0 if uncompressed, or ULINT_UNDEFINED if there is nothing to purge */
656 static
657 ulint
658 trx_purge_get_rseg_with_min_trx_id(
659 /*===============================*/
660  trx_purge_t* purge_sys) /*!< in/out: purge instance; the rseg
661  with the smallest trx_no is popped */
662 {
663  ulint zip_size = 0;
664 
665  mutex_enter(&purge_sys->bh_mutex);
666 
667  /* Only purge consumes events from the binary heap, user
668  threads only produce the events. */
669 
670  if (!ib_bh_is_empty(purge_sys->ib_bh)) {
671  trx_rseg_t* rseg;
672 
673  rseg = ((rseg_queue_t*) ib_bh_first(purge_sys->ib_bh))->rseg;
674  ib_bh_pop(purge_sys->ib_bh);
675 
676  mutex_exit(&purge_sys->bh_mutex);
677 
678  purge_sys->rseg = rseg;
679  } else {
680  mutex_exit(&purge_sys->bh_mutex);
681 
682  purge_sys->rseg = NULL;
683 
684  return(ULINT_UNDEFINED);
685  }
686 
687  ut_a(purge_sys->rseg != NULL);
688 
689  mutex_enter(&purge_sys->rseg->mutex);
690 
691  ut_a(purge_sys->rseg->last_page_no != FIL_NULL);
692 
693  /* We assume in purge of externally stored fields that space id is
694  in the range of UNDO tablespace space ids */
695  ut_a(purge_sys->rseg->space <= srv_undo_tablespaces_open);
696 
697  zip_size = purge_sys->rseg->zip_size;
698 
699  ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
700 
701  purge_sys->iter.trx_no = purge_sys->rseg->last_trx_no;
702  purge_sys->hdr_offset = purge_sys->rseg->last_offset;
703  purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
704 
705  mutex_exit(&purge_sys->rseg->mutex);
706 
707  return(zip_size);
708 }
709 
710 /***********************************************************************//**
711 Positions the purge iterator on the next undo record to purge in the chosen rseg. */
712 static
713 void
714 trx_purge_read_undo_rec(
715 /*====================*/
716  trx_purge_t* purge_sys,
717  ulint zip_size)
718 {
719  ulint offset;
720  ulint page_no;
721  ib_uint64_t undo_no;
722 
723  purge_sys->hdr_offset = purge_sys->rseg->last_offset;
724  page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
725 
726  if (purge_sys->rseg->last_del_marks) {
727  mtr_t mtr;
728  trx_undo_rec_t* undo_rec = NULL;
729 
730  mtr_start(&mtr);
731 
732  undo_rec = trx_undo_get_first_rec(
733  purge_sys->rseg->space,
734  zip_size,
735  purge_sys->hdr_page_no,
736  purge_sys->hdr_offset, RW_S_LATCH, &mtr);
737 
738  if (undo_rec != NULL) {
739  offset = page_offset(undo_rec);
740  undo_no = trx_undo_rec_get_undo_no(undo_rec);
741  page_no = page_get_page_no(page_align(undo_rec));
742  } else {
743  offset = 0;
744  undo_no = 0;
745  }
746 
747  mtr_commit(&mtr);
748  } else {
749  offset = 0;
750  undo_no = 0;
751  }
752 
753  purge_sys->offset = offset;
754  purge_sys->page_no = page_no;
755  purge_sys->iter.undo_no = undo_no;
756 
757  purge_sys->next_stored = TRUE;
758 }
759 
760 /***********************************************************************//**
761 Chooses the next undo log to purge and updates the info in purge_sys. This
762 function is used to initialize purge_sys when the next record to purge is
763 not known, and also to update the purge system info on the next record when
764 purge has handled the whole undo log for a transaction. */
765 static
766 void
767 trx_purge_choose_next_log(void)
768 /*===========================*/
769 {
770  ulint zip_size;
771 
772  ut_ad(purge_sys->next_stored == FALSE);
773 
774  zip_size = trx_purge_get_rseg_with_min_trx_id(purge_sys);
775 
776  if (purge_sys->rseg != NULL) {
777  trx_purge_read_undo_rec(purge_sys, zip_size);
778  } else {
779  /* There is nothing to do yet. */
780  os_thread_yield();
781  }
782 }
783 
784 /***********************************************************************//**
785 Gets the next record to purge and updates the info in the purge system.
786 @return copy of an undo log record or pointer to the dummy undo log record */
787 static
788 trx_undo_rec_t*
789 trx_purge_get_next_rec(
790 /*===================*/
791  ulint* n_pages_handled, /*!< in/out: number of UNDO pages
792  handled */
793  mem_heap_t* heap) /*!< in: memory heap where copied */
794 {
795  trx_undo_rec_t* rec;
796  trx_undo_rec_t* rec_copy;
797  trx_undo_rec_t* rec2;
798  page_t* undo_page;
799  page_t* page;
800  ulint offset;
801  ulint page_no;
802  ulint space;
803  ulint zip_size;
804  mtr_t mtr;
805 
806  ut_ad(purge_sys->next_stored);
807  ut_ad(purge_sys->iter.trx_no < purge_sys->view->low_limit_no);
808 
809  space = purge_sys->rseg->space;
810  zip_size = purge_sys->rseg->zip_size;
811  page_no = purge_sys->page_no;
812  offset = purge_sys->offset;
813 
814  if (offset == 0) {
815  /* It is the dummy undo log record, which means that there is
816  no need to purge this undo log */
817 
818  trx_purge_rseg_get_next_history_log(
819  purge_sys->rseg, n_pages_handled);
820 
821  /* Look for the next undo log and record to purge */
822 
823  trx_purge_choose_next_log();
824 
825  return(&trx_purge_dummy_rec);
826  }
827 
828  mtr_start(&mtr);
829 
830  undo_page = trx_undo_page_get_s_latched(space, zip_size, page_no, &mtr);
831 
832  rec = undo_page + offset;
833 
834  rec2 = rec;
835 
836  for (;;) {
837  ulint type;
838  trx_undo_rec_t* next_rec;
839  ulint cmpl_info;
840 
841  /* Try first to find the next record which requires a purge
842  operation from the same page of the same undo log */
843 
844  next_rec = trx_undo_page_get_next_rec(
845  rec2, purge_sys->hdr_page_no, purge_sys->hdr_offset);
846 
847  if (next_rec == NULL) {
848  rec2 = trx_undo_get_next_rec(
849  rec2, purge_sys->hdr_page_no,
850  purge_sys->hdr_offset, &mtr);
851  break;
852  }
853 
854  rec2 = next_rec;
855 
856  type = trx_undo_rec_get_type(rec2);
857 
858  if (type == TRX_UNDO_DEL_MARK_REC) {
859 
860  break;
861  }
862 
863  cmpl_info = trx_undo_rec_get_cmpl_info(rec2);
864 
865  if (trx_undo_rec_get_extern_storage(rec2)) {
866  break;
867  }
868 
869  if ((type == TRX_UNDO_UPD_EXIST_REC)
870  && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
871  break;
872  }
873  }
874 
875  if (rec2 == NULL) {
876  mtr_commit(&mtr);
877 
878  trx_purge_rseg_get_next_history_log(
879  purge_sys->rseg, n_pages_handled);
880 
881  /* Look for the next undo log and record to purge */
882 
883  trx_purge_choose_next_log();
884 
885  mtr_start(&mtr);
886 
887  undo_page = trx_undo_page_get_s_latched(
888  space, zip_size, page_no, &mtr);
889 
890  rec = undo_page + offset;
891  } else {
892  page = page_align(rec2);
893 
894  purge_sys->offset = rec2 - page;
895  purge_sys->page_no = page_get_page_no(page);
896  purge_sys->iter.undo_no = trx_undo_rec_get_undo_no(rec2);
897 
898  if (undo_page != page) {
899  /* We advance to a new page of the undo log: */
900  (*n_pages_handled)++;
901  }
902  }
903 
904  rec_copy = trx_undo_rec_copy(rec, heap);
905 
906  mtr_commit(&mtr);
907 
908  return(rec_copy);
909 }
910 
911 /********************************************************************//**
912 Fetches the next undo log record from the history list to purge. It must be
913 released with the corresponding release function.
914 @return copy of an undo log record or pointer to trx_purge_dummy_rec, if the
915 whole undo log can be skipped in purge; NULL if none left */
916 static __attribute__((warn_unused_result, nonnull))
917 trx_undo_rec_t*
918 trx_purge_fetch_next_rec(
919 /*=====================*/
920  roll_ptr_t* roll_ptr, /*!< out: roll pointer to undo record */
921  ulint* n_pages_handled, /*!< in/out: number of UNDO log pages
922  handled */
923  mem_heap_t* heap) /*!< in: memory heap where copied */
924 {
925  if (!purge_sys->next_stored) {
926  trx_purge_choose_next_log();
927 
928  if (!purge_sys->next_stored) {
929 
930  if (srv_print_thread_releases) {
931  fprintf(stderr,
932  "Purge: No logs left in the"
933  " history list\n");
934  }
935 
936  return(NULL);
937  }
938  }
939 
940  if (purge_sys->iter.trx_no >= purge_sys->view->low_limit_no) {
941 
942  return(NULL);
943  }
944 
945  /* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
946  os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
947 
948  *roll_ptr = trx_undo_build_roll_ptr(
949  FALSE, purge_sys->rseg->id,
950  purge_sys->page_no, purge_sys->offset);
951 
952  /* The following call will advance the stored values of the
953  purge iterator. */
954 
955  return(trx_purge_get_next_rec(n_pages_handled, heap));
956 }
957 
958 /*******************************************************************//**
959 This function runs a purge batch.
960 @return number of undo log pages handled in the batch */
961 static
962 ulint
963 trx_purge_attach_undo_recs(
964 /*=======================*/
965  ulint n_purge_threads,
966  trx_purge_t* purge_sys,
967  purge_iter_t* limit,
968  ulint batch_size)
969 {
970  que_thr_t* thr;
971  ulint i = 0;
972  ulint n_pages_handled = 0;
973  ulint n_thrs = UT_LIST_GET_LEN(purge_sys->query->thrs);
974 
975  ut_a(n_purge_threads > 0);
976 
977  *limit = purge_sys->iter;
978 
979  /* Debug code to validate some pre-requisites and reset done flag. */
980  for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
981  thr != NULL && i < n_purge_threads;
982  thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
983 
984  purge_node_t* node;
985 
986  /* Get the purge node. */
987  node = (purge_node_t*) thr->child;
988 
989  ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
990  ut_a(node->undo_recs == NULL);
991  ut_a(node->done);
992 
993  node->done = FALSE;
994  }
995 
996  /* There should never be fewer nodes than threads, the inverse
997  however is allowed because we only use purge threads as needed. */
998  ut_a(i == n_purge_threads);
999 
1000  /* Fetch and parse the UNDO records. The UNDO records are added
1001  to a per purge node vector. */
1002  thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
1003  ut_a(n_thrs > 0 && thr != NULL);
1004 
1005  ut_ad(trx_purge_check_limit());
1006 
1007  i = 0;
1008 
1009  for (;;) {
1010  purge_node_t* node;
1011  trx_purge_rec_t* purge_rec;
1012 
1013  ut_a(!thr->is_active);
1014 
1015  /* Get the purge node. */
1016  node = (purge_node_t*) thr->child;
1017  ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
1018 
1019  purge_rec = static_cast<trx_purge_rec_t*>(
1020  mem_heap_zalloc(node->heap, sizeof(*purge_rec)));
1021 
1022  /* Track the max {trx_id, undo_no} for truncating the
1023  UNDO logs once we have purged the records. */
1024 
1025  if (purge_sys->iter.trx_no > limit->trx_no
1026  || (purge_sys->iter.trx_no == limit->trx_no
1027  && purge_sys->iter.undo_no >= limit->undo_no)) {
1028 
1029  *limit = purge_sys->iter;
1030  }
1031 
1032  /* Fetch the next record, and advance the purge_sys->iter. */
1033  purge_rec->undo_rec = trx_purge_fetch_next_rec(
1034  &purge_rec->roll_ptr, &n_pages_handled, node->heap);
1035 
1036  if (purge_rec->undo_rec != NULL) {
1037 
1038  if (node->undo_recs == NULL) {
1039  node->undo_recs = ib_vector_create(
1040  ib_heap_allocator_create(node->heap),
1041  sizeof(trx_purge_rec_t),
1042  batch_size);
1043  } else {
1044  ut_a(!ib_vector_is_empty(node->undo_recs));
1045  }
1046 
1047  ib_vector_push(node->undo_recs, purge_rec);
1048 
1049  if (n_pages_handled >= batch_size) {
1050 
1051  break;
1052  }
1053  } else {
1054  break;
1055  }
1056 
1057  thr = UT_LIST_GET_NEXT(thrs, thr);
1058 
1059  if (!(++i % n_purge_threads)) {
1060  thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
1061  }
1062 
1063  ut_a(thr != NULL);
1064  }
1065 
1066  ut_ad(trx_purge_check_limit());
1067 
1068  return(n_pages_handled);
1069 }
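The limit-tracking test above orders purge positions lexicographically by {trx_no, undo_no}: a position is further along if its trx_no is larger, or if the trx_no ties and its undo_no is at least as large. A minimal sketch of that comparison:

#include <stdbool.h>
#include <stdio.h>

/* Purge positions compared the way the batch loop advances *limit. */
struct purge_iter {
	unsigned long long	trx_no;
	unsigned long long	undo_no;
};

static bool purge_iter_ge(const struct purge_iter *a,
			  const struct purge_iter *b)
{
	return a->trx_no > b->trx_no
	       || (a->trx_no == b->trx_no && a->undo_no >= b->undo_no);
}

int main(void)
{
	struct purge_iter limit = { 10, 5 };
	struct purge_iter iter = { 10, 7 };

	if (purge_iter_ge(&iter, &limit)) {
		limit = iter;	/* track the furthest position purged */
	}

	printf("limit = {%llu, %llu}\n", limit.trx_no, limit.undo_no);
	return 0;
}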
1070 
1071 /*******************************************************************//**
1072 Calculate the DML delay required.
1073 @return delay in microseconds or ULINT_MAX */
1074 static
1075 ulint
1076 trx_purge_dml_delay(void)
1077 /*=====================*/
1078 {
1079  /* Determine how much data manipulation language (DML) statements
1080  need to be delayed in order to reduce the lagging of the purge
1081  thread. */
1082  ulint delay = 0; /* in microseconds; default: no delay */
1083 
1084  /* If purge lag is set (ie. > 0) then calculate the new DML delay.
1085  Note: we do a dirty read of the trx_sys_t data structure here,
1086  without holding trx_sys->mutex. */
1087 
1088  if (srv_max_purge_lag > 0) {
1089  float ratio;
1090 
1091  ratio = float(trx_sys->rseg_history_len) / srv_max_purge_lag;
1092 
1093  if (ratio > 1.0) {
1094  /* If the history list length exceeds the
1095  srv_max_purge_lag, the data manipulation
1096  statements are delayed by at least 5000
1097  microseconds. */
1098  delay = (ulint) ((ratio - .5) * 10000);
1099  }
1100 
1101  if (delay > srv_max_purge_lag_delay) {
1102  delay = srv_max_purge_lag_delay;
1103  }
1104 
1105  MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
1106  }
1107 
1108  return(delay);
1109 }
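Working the formula above through an example: with srv_max_purge_lag = 100000 and a history list of 200000 entries, ratio = 2.0 and delay = (2.0 - 0.5) * 10000 = 15000 microseconds; just past the threshold (ratio slightly above 1.0) the delay starts at about (1.0 - 0.5) * 10000 = 5000 microseconds, matching the "at least 5000 microseconds" comment. A standalone sketch of the arithmetic, with assumed settings:

#include <stdio.h>

/* Reproduces the delay computation above; the values are made up. */
int main(void)
{
	unsigned long	srv_max_purge_lag = 100000;	/* assumed setting */
	unsigned long	history_len = 200000;		/* assumed backlog */
	unsigned long	delay = 0;			/* microseconds */

	float ratio = (float) history_len / srv_max_purge_lag;

	if (ratio > 1.0) {
		delay = (unsigned long) ((ratio - .5) * 10000);
	}

	/* ratio = 2.0, so delay = (2.0 - 0.5) * 10000 = 15000 us. */
	printf("DML delay: %lu us\n", delay);
	return 0;
}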
1110 
1111 /*******************************************************************//**
1112 Wait for pending purge jobs to complete. */
1113 static
1114 void
1115 trx_purge_wait_for_workers_to_complete(
1116 /*===================================*/
1117  trx_purge_t* purge_sys)
1118 {
1119  ulint n_submitted = purge_sys->n_submitted;
1120 
1121 #ifdef HAVE_ATOMIC_BUILTINS
1122  /* Ensure that the work queue empties out. */
1123  while (!os_compare_and_swap_ulint(
1124  &purge_sys->n_completed, n_submitted, n_submitted)) {
1125 #else
1126  mutex_enter(&purge_sys->bh_mutex);
1127 
1128  while (purge_sys->n_completed < n_submitted) {
1129 #endif /* HAVE_ATOMIC_BUILTINS */
1130 
1131 #ifndef HAVE_ATOMIC_BUILTINS
1132  mutex_exit(&purge_sys->bh_mutex);
1133 #endif /* !HAVE_ATOMIC_BUILTINS */
1134 
1135  if (srv_get_task_queue_length() > 0) {
1136  srv_release_threads(SRV_WORKER, 1);
1137  }
1138 
1139  os_thread_yield();
1140 
1141 #ifndef HAVE_ATOMIC_BUILTINS
1142  mutex_enter(&purge_sys->bh_mutex);
1143 #endif /* !HAVE_ATOMIC_BUILTINS */
1144  }
1145 
1146 #ifndef HAVE_ATOMIC_BUILTINS
1147  mutex_exit(&purge_sys->bh_mutex);
1148 #endif /* !HAVE_ATOMIC_BUILTINS */
1149 
1150  /* None of the worker threads should be doing any work. */
1151  ut_a(purge_sys->n_submitted == purge_sys->n_completed);
1152 
1153  /* There should be no outstanding tasks as long
1154  as the worker threads are active. */
1155  ut_a(srv_get_task_queue_length() == 0);
1156 }
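The compare-and-swap in the loop above never changes n_completed, because the expected and desired values are both n_submitted; it is an atomic read used as an equality test. A portable sketch of the same wait loop with C11 atomics, where sched_yield() stands in for os_thread_yield():

#include <sched.h>
#include <stdatomic.h>
#include <stdio.h>

static _Atomic unsigned long n_completed;

static void wait_for_workers(unsigned long n_submitted)
{
	while (atomic_load(&n_completed) != n_submitted) {
		/* Give the workers a chance to run. */
		sched_yield();
	}
}

int main(void)
{
	/* Pretend one task was submitted and immediately completed. */
	atomic_fetch_add(&n_completed, 1);
	wait_for_workers(1);
	printf("all workers done\n");
	return 0;
}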
1157 
1158 /******************************************************************//**
1159 Remove old historical changes from the rollback segments. */
1160 static
1161 void
1162 trx_purge_truncate(void)
1163 /*====================*/
1164 {
1165  ut_ad(trx_purge_check_limit());
1166 
1167  if (purge_sys->limit.trx_no == 0) {
1168  trx_purge_truncate_history(&purge_sys->iter, purge_sys->view);
1169  } else {
1170  trx_purge_truncate_history(&purge_sys->limit, purge_sys->view);
1171  }
1172 }
1173 
1174 /*******************************************************************//**
1175 This function runs a purge batch.
1176 @return number of undo log pages handled in the batch */
1177 UNIV_INTERN
1178 ulint
1179 trx_purge(
1180 /*======*/
1181  ulint n_purge_threads, /*!< in: number of purge tasks to
1182  submit to the work queue */
1183  ulint batch_size, /*!< in: the maximum number of records
1184  to purge in one batch */
1185  bool truncate) /*!< in: truncate history if true */
1186 {
1187  que_thr_t* thr = NULL;
1188  ulint n_pages_handled;
1189 
1190  ut_a(n_purge_threads > 0);
1191 
1192  srv_dml_needed_delay = trx_purge_dml_delay();
1193 
1194  /* The number of tasks submitted should be completed. */
1195  ut_a(purge_sys->n_submitted == purge_sys->n_completed);
1196 
1197  rw_lock_x_lock(&purge_sys->latch);
1198 
1199  purge_sys->view = NULL;
1200 
1201  mem_heap_empty(purge_sys->heap);
1202 
1203  purge_sys->view = read_view_purge_open(purge_sys->heap);
1204 
1205  rw_lock_x_unlock(&purge_sys->latch);
1206 
1207 #ifdef UNIV_DEBUG
1208  if (srv_purge_view_update_only_debug) {
1209  return(0);
1210  }
1211 #endif
1212 
1213  /* Fetch the UNDO recs that need to be purged. */
1214  n_pages_handled = trx_purge_attach_undo_recs(
1215  n_purge_threads, purge_sys, &purge_sys->limit, batch_size);
1216 
1217  /* Do we do an asynchronous purge or not ? */
1218  if (n_purge_threads > 1) {
1219  ulint i = 0;
1220 
1221  /* Submit the tasks to the work queue. */
1222  for (i = 0; i < n_purge_threads - 1; ++i) {
1223  thr = que_fork_scheduler_round_robin(
1224  purge_sys->query, thr);
1225 
1226  ut_a(thr != NULL);
1227 
1228  srv_que_task_enqueue_low(thr);
1229  }
1230 
1231  thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
1232  ut_a(thr != NULL);
1233 
1234  purge_sys->n_submitted += n_purge_threads - 1;
1235 
1236  goto run_synchronously;
1237 
1238  /* Do it synchronously. */
1239  } else {
1240  thr = que_fork_scheduler_round_robin(purge_sys->query, NULL);
1241  ut_ad(thr);
1242 
1243 run_synchronously:
1244  ++purge_sys->n_submitted;
1245 
1246  que_run_threads(thr);
1247 
1248  os_atomic_inc_ulint(
1249  &purge_sys->bh_mutex, &purge_sys->n_completed, 1);
1250 
1251  if (n_purge_threads > 1) {
1252  trx_purge_wait_for_workers_to_complete(purge_sys);
1253  }
1254  }
1255 
1256  ut_a(purge_sys->n_submitted == purge_sys->n_completed);
1257 
1258 #ifdef UNIV_DEBUG
1259  if (purge_sys->limit.trx_no == 0) {
1260  purge_sys->done = purge_sys->iter;
1261  } else {
1262  purge_sys->done = purge_sys->limit;
1263  }
1264 #endif /* UNIV_DEBUG */
1265 
1266  if (truncate) {
1267  trx_purge_truncate();
1268  }
1269 
1270  MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
1271  MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);
1272 
1273  return(n_pages_handled);
1274 }
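A schematic caller of trx_purge(), not the real coordinator loop in srv0srv.cc (which additionally handles shutdown, stop requests and adaptive batch sizing); the configuration values below are assumptions for illustration:

/* Sketch only: drive purge batches until nothing is left. */
static void
purge_coordinator_sketch(void)
{
	ulint	n_purge_threads = 4;	/* cf. innodb_purge_threads */
	ulint	batch_size = 300;	/* cf. innodb_purge_batch_size */

	for (;;) {
		ulint	n_pages;

		/* Run one batch and truncate the history list. */
		n_pages = trx_purge(n_purge_threads, batch_size, true);

		if (n_pages == 0) {
			/* Nothing left to purge for now. */
			break;
		}
	}
}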
1275 
1276 /*******************************************************************//**
1277 Get the purge state.
1278 @return purge state. */
1279 UNIV_INTERN
1280 purge_state_t
1281 trx_purge_state(void)
1282 /*=================*/
1283 {
1284  purge_state_t state;
1285 
1286  rw_lock_x_lock(&purge_sys->latch);
1287 
1288  state = purge_sys->state;
1289 
1290  rw_lock_x_unlock(&purge_sys->latch);
1291 
1292  return(state);
1293 }
1294 
1295 /*******************************************************************//**
1296 Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
1297 UNIV_INTERN
1298 void
1299 trx_purge_stop(void)
1300 /*================*/
1301 {
1302  purge_state_t state;
1303  ib_int64_t sig_count = os_event_reset(purge_sys->event);
1304 
1305  ut_a(srv_n_purge_threads > 0);
1306 
1307  rw_lock_x_lock(&purge_sys->latch);
1308 
1309  ut_a(purge_sys->state != PURGE_STATE_INIT);
1310  ut_a(purge_sys->state != PURGE_STATE_EXIT);
1311  ut_a(purge_sys->state != PURGE_STATE_DISABLED);
1312 
1313  ++purge_sys->n_stop;
1314 
1315  state = purge_sys->state;
1316 
1317  if (state == PURGE_STATE_RUN) {
1318  ib_logf(IB_LOG_LEVEL_INFO, "Stopping purge");
1319 
1320  /* We need to wakeup the purge thread in case it is suspended,
1321  so that it can acknowledge the state change. */
1322 
1323  srv_purge_wakeup();
1324  }
1325 
1326  purge_sys->state = PURGE_STATE_STOP;
1327 
1328  rw_lock_x_unlock(&purge_sys->latch);
1329 
1330  if (state != PURGE_STATE_STOP) {
1331 
1332  /* Wait for purge coordinator to signal that it
1333  is suspended. */
1334  os_event_wait_low(purge_sys->event, sig_count);
1335  } else {
1336  bool once = true;
1337 
1338  rw_lock_x_lock(&purge_sys->latch);
1339 
1340  /* Wait for purge to signal that it has actually stopped. */
1341  while (purge_sys->running) {
1342 
1343  if (once) {
1344  ib_logf(IB_LOG_LEVEL_INFO,
1345  "Waiting for purge to stop");
1346  once = false;
1347  }
1348 
1349  rw_lock_x_unlock(&purge_sys->latch);
1350 
1351  os_thread_sleep(10000);
1352 
1353  rw_lock_x_lock(&purge_sys->latch);
1354  }
1355 
1356  rw_lock_x_unlock(&purge_sys->latch);
1357  }
1358 
1359  MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
1360 }
1361 
1362 /*******************************************************************//**
1363 Resume purge, move to PURGE_STATE_RUN. */
1364 UNIV_INTERN
1365 void
1366 trx_purge_run(void)
1367 /*===============*/
1368 {
1369  rw_lock_x_lock(&purge_sys->latch);
1370 
1371  switch(purge_sys->state) {
1372  case PURGE_STATE_INIT:
1373  case PURGE_STATE_EXIT:
1374  case PURGE_STATE_DISABLED:
1375  ut_error;
1376 
1377  case PURGE_STATE_RUN:
1378  case PURGE_STATE_STOP:
1379  break;
1380  }
1381 
1382  if (purge_sys->n_stop > 0) {
1383 
1384  ut_a(purge_sys->state == PURGE_STATE_STOP);
1385 
1386  --purge_sys->n_stop;
1387 
1388  if (purge_sys->n_stop == 0) {
1389 
1390  ib_logf(IB_LOG_LEVEL_INFO, "Resuming purge");
1391 
1392  purge_sys->state = PURGE_STATE_RUN;
1393  }
1394 
1395  MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
1396  } else {
1397  ut_a(purge_sys->state == PURGE_STATE_RUN);
1398  }
1399 
1400  rw_lock_x_unlock(&purge_sys->latch);
1401 
1402  srv_purge_wakeup();
1403 }
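trx_purge_stop() and trx_purge_run() nest: every stop request increments n_stop, every run request decrements it, and purge actually returns to PURGE_STATE_RUN only when the count reaches zero. A standalone sketch of that counting protocol:

#include <stdio.h>

static int	n_stop;
static int	running = 1;

static void purge_stop(void)  { n_stop++; running = 0; }

static void purge_run(void)
{
	if (n_stop > 0 && --n_stop == 0) {
		running = 1;	/* last stopper released: resume */
	}
}

int main(void)
{
	purge_stop();
	purge_stop();	/* two independent stop requests */
	purge_run();	/* first release: still stopped */
	printf("running=%d n_stop=%d\n", running, n_stop);
	purge_run();	/* second release: purge resumes */
	printf("running=%d n_stop=%d\n", running, n_stop);
	return 0;
}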