MySQL 5.6.14 Source Code Document
rpl_rli_pdb.h
#ifndef RPL_RLI_PDB_H

#define RPL_RLI_PDB_H

#ifdef HAVE_REPLICATION

#include "sql_string.h"
#include "rpl_rli.h"
#include <my_sys.h>
#include <my_bitmap.h>
#include "rpl_slave.h"

/* Assigned Partition Hash (APH) entry */
typedef struct st_db_worker_hash_entry
{
  uint db_len;
  const char *db;
  Slave_worker *worker;
  /*
    The number of transactions pending on this database.
    This should only be modified while holding slave_worker_hash_lock.
  */
  long usage;
  /*
    The list of temp tables belonging to the @c db database is
    attached to an assigned @c worker to become its thd->temporary_tables.
    The list is updated with every DDL, including CREATE and DROP.
    It is removed from the entry and merged into the Coordinator's
    thd->temporary_tables when the slave stops or the APH overflows.
  */
  TABLE* volatile temporary_tables;

  /* todo: relax concurrency to mimic record-level locking.
     That is, augment the entry with a mutex/cond pair:
       pthread_mutex_t
       pthread_cond_t
       timestamp updated_at; */

} db_worker_hash_entry;

bool init_hash_workers(ulong slave_parallel_workers);
void destroy_hash_workers(Relay_log_info*);
Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                               db_worker_hash_entry **ptr_entry,
                               bool need_temp_tables, Slave_worker *w);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
int wait_for_workers_to_finish(Relay_log_info const *rli,
                               Slave_worker *ignore= NULL);
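
/*
  Illustrative sketch, not part of the original header: how a Coordinator-side
  caller might consult the Assigned Partition Hash through map_db_to_worker()
  when routing an event that touches a single database.  Only
  map_db_to_worker() and db_worker_hash_entry come from this file; the wrapper
  function and its argument choices are assumptions of the example.

  @code
  Slave_worker *example_schedule_for_db(Relay_log_info *rli, const char *dbname)
  {
    db_worker_hash_entry *entry= NULL;
    // Look up (or create) the APH entry for dbname and return the Worker that
    // currently owns the partition; entry->usage is adjusted under
    // slave_worker_hash_lock inside the call.
    Slave_worker *w= map_db_to_worker(dbname, rli, &entry,
                                      true, // need_temp_tables
                                      NULL);
    return w;  // NULL means the mapping failed or the slave is stopping
  }
  @endcode
*/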

#define SLAVE_INIT_DBS_IN_GROUP 4  // initial allocation for CGEP dynarray

#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2

typedef struct slave_job_item
{
  void *data;
} Slave_job_item;

class circular_buffer_queue
{
public:

  DYNAMIC_ARRAY Q;
  ulong size;          // the size of the queue in number of elements
  ulong avail;         // first available index to append at (next to tail)
  ulong entry;         // the head index, or the entry point to the queue
  volatile ulong len;  // actual length
  bool inited_queue;

  circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
    size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
  {
    DBUG_ASSERT(size < (ulong) -1);
    if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
      inited_queue= TRUE;
  }
  circular_buffer_queue () : inited_queue(FALSE) {}
  ~circular_buffer_queue ()
  {
    if (inited_queue)
      delete_dynamic(&Q);
  }

  ulong de_queue(uchar *);
  ulong de_tail(uchar *val);

  ulong en_queue(void *item);
  void* head_queue();
  bool gt(ulong i, ulong k);  // comparison of the ordering of two entries
  /* index is within the valid range */
  bool in(ulong k) { return !empty() &&
    (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
  bool empty() { return entry == size; }
  bool full() { return avail == size; }
};
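
/*
  Illustrative sketch, not part of the original header: the index discipline
  the queue above relies on, restated as a standalone predicate.  entry == size
  serves as the "empty" sentinel and avail == size as the "full" sentinel, so
  in() only has to distinguish a wrapped occupied region from a contiguous one.
  The free function below is hypothetical and merely mirrors in().

  @code
  // Returns true when slot k lies inside the occupied region of a ring buffer
  // whose head is 'entry' and whose next free slot is 'avail' (sentinel values
  // as described above).
  bool example_index_in_use(ulong k, ulong entry, ulong avail, ulong size)
  {
    if (entry == size)                 // empty queue: no slot is in use
      return false;
    if (entry > avail)                 // occupied region wraps past the end
      return k >= entry || k < avail;
    return k >= entry && k < avail;    // contiguous occupied region
  }
  @endcode
*/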

typedef struct st_slave_job_group
{
  char *group_master_log_name;   // (actually redundant)
  /*
    T-event log_pos filled in by the Worker for the CheckPoint (CP)
  */
  my_off_t group_master_log_pos;

  /*
    When the relay-log name changes, a newly allocated relay-log name is
    filled in; otherwise it is set to NULL.
    The Coordinator keeps track of whether each Worker has been notified of
    the update, to make sure the routine runs only once per change.

    W checks the value at commit and memorizes a non-NULL value.
    Freeing a non-NULL value is left to the Coordinator at CP.
  */
  char *group_relay_log_name;    // the last seen relay-log name
  my_off_t group_relay_log_pos;  // filled in by W
  ulong worker_id;
  Slave_worker *worker;
  ulonglong total_seqno;

  my_off_t master_log_pos;       // B-event log_pos
  /* checkpoint coordinates are reset by periodical and special (Rotate event) CPs */
  uint checkpoint_seqno;
  my_off_t checkpoint_log_pos;   // T-event log_pos filled in by W for CheckPoint
  char* checkpoint_log_name;
  my_off_t checkpoint_relay_log_pos; // T-event log_pos filled in by W for CheckPoint
  char* checkpoint_relay_log_name;
  volatile uchar done;   // flag raised by W, read and reset by Coordinator
  ulong shifted;         // shift of the last CP bitmap at receiving a new CP
  time_t ts;             // the group's timestamp, used to update Seconds_Behind_Master
#ifndef DBUG_OFF
  bool notified;         // to debug group_master_log_name change notification
#endif
  /*
    The Coordinator fills the struct with defaults and options at the start of
    a group distribution.
  */
  void reset(my_off_t master_pos, ulonglong seqno)
  {
    master_log_pos= master_pos;
    group_master_log_pos= group_relay_log_pos= 0;
    group_master_log_name= NULL; // todo: remove
    group_relay_log_name= NULL;
    worker_id= MTS_WORKER_UNDEF;
    total_seqno= seqno;
    checkpoint_log_name= NULL;
    checkpoint_log_pos= 0;
    checkpoint_relay_log_name= NULL;
    checkpoint_relay_log_pos= 0;
    checkpoint_seqno= (uint) -1;
    done= 0;
#ifndef DBUG_OFF
    notified= false;
#endif
  }
} Slave_job_group;
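
/*
  Illustrative sketch, not part of the original header: reset() above gives the
  Coordinator a one-call way to initialize a group descriptor before it is
  handed to the GAQ (see Slave_committed_queue below).  The wrapper function
  and its argument names are assumptions of the example.

  @code
  void example_prepare_group_descriptor(Slave_job_group *g,
                                        my_off_t begin_event_log_pos,
                                        ulonglong group_seqno)
  {
    // Per-group state (checkpoint fields, worker assignment, the done flag)
    // goes back to "not yet assigned / not yet committed" defaults, while the
    // B-event position and the sequence number are recorded.
    g->reset(begin_event_log_pos, group_seqno);
  }
  @endcode
*/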

class Slave_committed_queue : public circular_buffer_queue
{
public:

  bool inited;

  /* master's Rotate event execution */
  void update_current_binlog(const char *post_rotate);

  /*
    The Low-Water-Mark as of the last checkpoint time
  */
  Slave_job_group lwm;

  /* the most recently processed indexes, one per Worker */
  DYNAMIC_ARRAY last_done;

  /* the index in the GAQ of the group currently being assigned */
  ulong assigned_group_index;

  Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
                         uint inc= 0)
    : circular_buffer_queue(el_size, max, inc), inited(FALSE)
  {
    uint k;
    ulonglong l= 0;

    if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
      return;
    else
      inited= TRUE;
    my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
    for (k= 0; k < n; k++)
      insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
    lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
    lwm.group_relay_log_name[0]= 0;
  }

  ~Slave_committed_queue ()
  {
    if (inited)
    {
      delete_dynamic(&last_done);
      my_free(lwm.group_relay_log_name);
      free_dynamic_items();  // free possibly left allocated strings in the GAQ list
    }
  }

#ifndef DBUG_OFF
  bool count_done(Relay_log_info* rli);
#endif

  /* The checkpoint routine refreshes the queue */
  ulong move_queue_head(DYNAMIC_ARRAY *ws);
  /* Cleanup method for slave shutdown time */
  void free_dynamic_items();
  /*
    Returns a pointer to the Slave_job_group struct instance indexed by arg
    in the circular buffer dyn-array.
  */
  Slave_job_group* get_job_group(ulong ind)
  {
    return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
  }

  ulong en_queue(void *item)
  {
    return assigned_group_index= circular_buffer_queue::en_queue(item);
  }

};
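
/*
  Illustrative sketch, not part of the original header: the Coordinator-side
  life cycle of a GAQ entry built from the classes above.  The 'gaq' and
  'workers' parameters stand in for the Relay_log_info members that hold the
  committed queue and the Worker array; treat that wiring, and the function
  itself, as assumptions of the example.

  @code
  void example_gaq_round_trip(Slave_committed_queue *gaq,
                              DYNAMIC_ARRAY *workers,
                              my_off_t begin_event_log_pos,
                              ulonglong group_seqno)
  {
    Slave_job_group g;
    g.reset(begin_event_log_pos, group_seqno);  // defaults for a new group
    gaq->en_queue(&g);                          // records assigned_group_index

    // ... the group's events are queued to a Worker, which raises the
    // descriptor's 'done' flag when the group commits ...

    // At checkpoint time the Coordinator garbage-collects completed groups
    // from the head of the queue and advances the low-water mark (gaq->lwm),
    // which then names the last group known to be committed.
    gaq->move_queue_head(workers);
  }
  @endcode
*/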

class Slave_jobs_queue : public circular_buffer_queue
{
public:

  /*
    The Coordinator sets it to true; the Worker signals back once the queue
    is available again.
  */
  bool overfill;
  ulonglong waited_overfill;
};

class Slave_worker : public Relay_log_info
{
public:
  Slave_worker(Relay_log_info *rli
#ifdef HAVE_PSI_INTERFACE
               ,PSI_mutex_key *param_key_info_run_lock,
               PSI_mutex_key *param_key_info_data_lock,
               PSI_mutex_key *param_key_info_sleep_lock,
               PSI_mutex_key *param_key_info_data_cond,
               PSI_mutex_key *param_key_info_start_cond,
               PSI_mutex_key *param_key_info_stop_cond,
               PSI_mutex_key *param_key_info_sleep_cond
#endif
               , uint param_id
              );
  virtual ~Slave_worker();

  Slave_jobs_queue jobs;      // assignment queue containing events to execute
  mysql_mutex_t jobs_lock;    // mutex for the jobs queue
  mysql_cond_t jobs_cond;     // condition variable for the jobs queue
  Relay_log_info *c_rli;      // pointer to the Coordinator's rli
  DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
  bool curr_group_seen_begin; // set to TRUE with an explicit B-event
  ulong id;                   // numeric identifier of the Worker

  /*
    Worker runtime statistics
  */
  // the index in the GAQ of the last group processed by this Worker
  volatile ulong last_group_done_index;
  ulong wq_empty_waits;   // how many times the Worker went idle
  ulong events_done;      // how many events (statements) were processed
  ulong groups_done;      // how many groups (transactions) were processed
  volatile int curr_jobs; // number of active assignments
  // number of partitions allocated to the Worker at a point in time
  long usage_partition;
  // symmetric to rli->mts_end_group_sets_max_dbs
  bool end_group_sets_max_dbs;

  volatile bool relay_log_change_notified;  // Coord sets and resets, W can read
  volatile bool checkpoint_notified;        // Coord sets and resets, W can read
  volatile bool master_log_change_notified; // Coord sets and resets, W can read
  ulong bitmap_shifted;  // shift of the last bitmap at receiving a new CP
  // the WQ's current excess above the overrun level
  long wq_overrun_cnt;
  /*
    The number of events starting from which the Worker queue is regarded as
    close to full. The number of excessive events yields a weight factor used
    to compute the Coordinator's nap.
  */
  ulong overrun_level;
  /*
    The reverse of overrun: the number of events below which the Worker is
    considered to be underrunning.
  */
  ulong underrun_level;
  /*
    Total of the increments done to rli->mts_wq_excess_cnt on behalf of this
    Worker. When the WQ length drops below the overrun level, the counter is
    reset.
  */
  ulong excess_cnt;
  /*
    Coordinates of the last CheckPoint (CP) this Worker has acknowledged;
    part of its persistent data
  */
  char checkpoint_relay_log_name[FN_REFLEN];
  ulonglong checkpoint_relay_log_pos;
  char checkpoint_master_log_name[FN_REFLEN];
  ulonglong checkpoint_master_log_pos;
  MY_BITMAP group_executed; // bitmap describing groups executed after the last CP
  MY_BITMAP group_shifted;  // temporary bitmap used to compute group_executed
  ulong checkpoint_seqno;   // the most significant ON bit in group_executed
  enum en_running_state
  {
    NOT_RUNNING= 0,
    RUNNING= 1,
    ERROR_LEAVING,  // is set by the Worker
    KILLED          // is set by the Coordinator
  };
  /*
    The running status is guarded by the jobs_lock mutex, which a writer
    (the Coordinator or the Worker itself) needs to hold when writing a new
    value.
  */
  en_running_state volatile running_status;

  int init_worker(Relay_log_info*, ulong);
  int rli_init_info(bool);
  int flush_info(bool force= FALSE);
  static size_t get_number_worker_fields();
  void slave_worker_ends_group(Log_event*, int);
  const char *get_master_log_name();
  ulonglong get_master_log_pos() { return master_log_pos; };
  ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
  bool reset_recovery_info();
  void set_rli_description_event(Format_description_log_event *fdle)
  {
    DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
#ifndef DBUG_OFF
    if (fdle)
      mysql_mutex_assert_owner(&jobs_lock);
#endif

    if (fdle)
      rli_description_event= fdle;
  }

  inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
  inline void set_gaq_index(ulong val)
  {
    if (gaq_index == c_rli->gaq->size)
      gaq_index= val;
  };

protected:

  virtual void do_report(loglevel level, int err_code,
                         const char *msg, va_list v_args) const;

private:
  ulong gaq_index;          // GAQ index of the current assignment
  ulonglong master_log_pos; // the event's cached log_pos for a possible error report
  void end_info();
  bool read_info(Rpl_info_handler *from);
  bool write_info(Rpl_info_handler *to);
  Slave_worker& operator=(const Slave_worker& info);
  Slave_worker(const Slave_worker& info);
};
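
/*
  Illustrative sketch, not part of the original header: how the per-Worker
  checkpoint bitmaps declared above cooperate.  A Worker marks each group it
  commits at that group's sequence number relative to the last CheckPoint;
  when the Coordinator announces a new CP the bitmap is shifted (through
  group_shifted and bitmap_shifted) so only post-CP groups stay visible to
  recovery.  The function below is a hypothetical outline, not the server
  routine.

  @code
  void example_note_group_done(Slave_worker *w, uint seqno_since_last_cp)
  {
    // Record "the group with this sequence number since the last CP has been
    // executed"; the bit is consulted during recovery to decide which groups
    // are already applied.
    bitmap_set_bit(&w->group_executed, seqno_since_last_cp);
  }
  @endcode
*/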

TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
#endif // HAVE_REPLICATION
#endif