19 #include "semisync_master.h" 
   /* Scale factors used by this file's time-unit conversions. */
   21 #define TIME_THOUSAND 1000 
   22 #define TIME_MILLION  1000000 
   23 #define TIME_BILLION  1000000000 
   /*
     File-scope semi-sync master settings and status counters.
     The notes below state only where each variable is read or written in
     this file.
   */
   /* Read by initObject() to choose between enableMaster()/disableMaster(). */
   26 char rpl_semi_sync_master_enabled;
 
   /* Passed to setWaitTimeout() by initObject(); presumably milliseconds
      (commitTrx() traces the wait as "%lu ms") -- TODO confirm. */
   27 unsigned long rpl_semi_sync_master_timeout;
 
   /* Passed to setTraceLevel() by initObject(). */
   28 unsigned long rpl_semi_sync_master_trace_level;
 
   /* Copied from state_ by setExportStats(). */
   29 char rpl_semi_sync_master_status                    = 0;
 
   /* Incremented near the end of commitTrx(). */
   30 unsigned long rpl_semi_sync_master_yes_transactions = 0;
 
   /* Incremented near the end of commitTrx(). */
   31 unsigned long rpl_semi_sync_master_no_transactions  = 0;
 
   /* Incremented by switch_off(). */
   32 unsigned long rpl_semi_sync_master_off_times        = 0;
 
   /* Incremented next to the "Assessment of waiting time ... failed"
      messages in commitTrx() and readSlaveReply(). */
   33 unsigned long rpl_semi_sync_master_timefunc_fails   = 0;
 
   /* Incremented by commitTrx() after it logs a reply timeout warning. */
   34 unsigned long rpl_semi_sync_master_wait_timeouts     = 0;
 
   /* Incremented before and decremented after the cond_timewait() call
      in commitTrx(). */
   35 unsigned long rpl_semi_sync_master_wait_sessions    = 0;
 
   /* Incremented by commitTrx() when it moves the wait position back. */
   36 unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
 
   /* Derived in setExportStats(): trx_wait_time / trx_wait_num. */
   37 unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
 
   /* Number of samples added to rpl_semi_sync_master_trx_wait_time. */
   38 unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
 
   /* Derived in setExportStats(): net_wait_time / net_wait_num. */
   39 unsigned long rpl_semi_sync_master_avg_net_wait_time    = 0;
 
   /* Number of samples added to rpl_semi_sync_master_net_wait_time. */
   40 unsigned long long rpl_semi_sync_master_net_wait_num = 0;
 
   /* Incremented by add_slave(), decremented by remove_slave(). */
   41 unsigned long rpl_semi_sync_master_clients          = 0;
 
   /* Sum of getWaitTime() results collected in readSlaveReply(). */
   42 unsigned long long rpl_semi_sync_master_net_wait_time = 0;
 
   /* Sum of getWaitTime() results collected in commitTrx(). */
   43 unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
 
   /* Defaults to 1; tested by remove_slave() when the client count hits 0. */
   44 char rpl_semi_sync_master_wait_no_slave = 1;
 
   /* Forward declaration; the definition appears later in this file. */
   47 static int getWaitTime(
const struct timespec& start_ts);
 
   /*
     Converts *ts to an unsigned 64-bit count of microseconds.

     Two return statements are visible and read different timespec layouts
     (tv_sec/tv_nsec versus tv.i64); presumably preprocessor conditionals
     that are not visible here select one of them -- TODO confirm.
   */
   49 static unsigned long long timespec_to_usec(
const struct timespec *ts)
 
   /* seconds * 1e6 + nanoseconds / 1e3 */
   52   return (
unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
 
   /* NOTE(review): dividing by 10 suggests tv.i64 is in 100 ns units -- confirm. */
   54   return ts->tv.i64 / 10;
 
   /*
     ActiveTranx constructor (the beginning of the signature is not visible
     in this excerpt).

     The initializer list forwards trace_level to Trace, sizes allocator_
     with max_connections and sets num_entries_ to max_connections * 2.
   */
   65                          unsigned long trace_level)
 
   66   : 
Trace(trace_level), allocator_(max_connections),
 
   67     num_entries_(max_connections << 1), 
 
   /* Visits every index in [0, num_entries_); the loop body is not visible. */
   78   for (
int idx = 0; idx < num_entries_; ++idx)
 
   81   sql_print_information(
"Semi-sync replication initialized for transactions.");
 
   /* Destructor; its body is not visible in this excerpt. */
   84 ActiveTranx::~ActiveTranx()
 
   /*
     Computes an unsigned hash over the bytes starting at key.
     (The remaining parameter and the loop header are not visible here.)
   */
   91 unsigned int ActiveTranx::calc_hash(
const unsigned char *key,
 
   /* Accumulator nr starts at 1; nr2 starts at 4. */
   94   unsigned int nr = 1, nr2 = 4;
 
   /* Consumes one byte of key and mixes it into nr. */
   99     nr  ^= (((nr & 63)+nr2)*((
unsigned int) (
unsigned char) *key++))+ (nr << 8);
 
  102   return((
unsigned int) nr);
 
  /*
    Maps a binlog coordinate (file name, position) to a hash-table index.

    @param log_file_name  NUL-terminated binlog file name
    @param log_file_pos   position inside that file
    @return (hash of name + hash of position) % num_entries_
  */
  105 unsigned int ActiveTranx::get_hash_value(
const char *log_file_name,
 
  106                                  my_off_t    log_file_pos)
 
  /* Hash the characters of the file name (terminator excluded). */
  108   unsigned int hash1 = calc_hash((
const unsigned char *)log_file_name,
 
  109                                  strlen(log_file_name));
 
  /* Hash the in-memory bytes of the position value. */
  110   unsigned int hash2 = calc_hash((
const unsigned char *)(&log_file_pos),
 
  111                                  sizeof(log_file_pos));
 
  113   return (hash1 + hash2) % num_entries_;
 
  /*
    Compares two binlog coordinates: the file names with strcmp(), then the
    positions with > and <.  The return statements are not visible in this
    excerpt; presumably the result follows the strcmp() sign convention --
    TODO confirm.
  */
  116 int ActiveTranx::compare(
const char *log_file_name1, my_off_t log_file_pos1,
 
  117                          const char *log_file_name2, my_off_t log_file_pos2)
 
  119   int cmp = strcmp(log_file_name1, log_file_name2);
 
  124   if (log_file_pos1 > log_file_pos2)
 
  126   else if (log_file_pos1 < log_file_pos2)
 
  /*
    Adds a node for the binlog coordinate (log_file_name, log_file_pos) to
    the transaction list (trx_front_ .. trx_rear_) and to the hash table
    trx_htb_.

    An allocation failure and an out-of-order coordinate are both reported
    with sql_print_error().  The value handed to function_exit() is
    `result`; its assignments are not visible in this excerpt.
  */
  131 int ActiveTranx::insert_tranx_node(
const char *log_file_name,
 
  132                                    my_off_t log_file_pos)
 
  /* Trace label; uses the same "Class::method" form as the other labels. */
  134   const char *kWho = 
"ActiveTranx::insert_tranx_node";
 
  137   unsigned int        hash_val;
 
  139   function_enter(kWho);
 
  144     sql_print_error(
"%s: transaction node allocation failed for: (%s, %lu)",
 
  145                     kWho, log_file_name, (
unsigned long)log_file_pos);
 
  /* Bounded copy with explicit termination: strncpy() does not terminate
     when the source is FN_REFLEN-1 characters or longer. */
  151   strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
 
  152   ins_node->log_name_[FN_REFLEN-1] = 0; 
 
  153   ins_node->log_pos_ = log_file_pos;
 
  /* The new node becomes both front and rear of the list. */
  158     trx_front_ = trx_rear_ = ins_node;
 
  /* Compare the new node with the current tail of the list. */
  162     int cmp = compare(ins_node, trx_rear_);
 
  /* Append the node at the tail. */
  168       trx_rear_->next_ = ins_node;
 
  169       trx_rear_        = ins_node;
 
  176       sql_print_error(
"%s: binlog write out-of-order, tail (%s, %lu), " 
  177                       "new node (%s, %lu)", kWho,
 
  178                       trx_rear_->log_name_, (
unsigned long)trx_rear_->log_pos_,
 
  179                       ins_node->log_name_, (
unsigned long)ins_node->log_pos_);
 
  /* Push the node onto the head of its hash bucket chain. */
  185   hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
 
  186   ins_node->hash_next_ = trx_htb_[hash_val];
 
  187   trx_htb_[hash_val]   = ins_node;
 
  189   if (trace_level_ & kTraceDetail)
 
  190     sql_print_information(
"%s: insert (%s, %lu) in entry(%u)", kWho,
 
  191                           ins_node->log_name_, (
unsigned long)ins_node->log_pos_,
 
  195   return function_exit(kWho, result);
 
  /*
    Reports whether a node for (log_file_name, log_file_pos) is present in
    the hash table.

    @return true when the bucket scan ends on a matching entry
            (entry != NULL), false otherwise.
  */
  198 bool ActiveTranx::is_tranx_end_pos(
const char *log_file_name,
 
  199                                    my_off_t    log_file_pos)
 
  201   const char *kWho = 
"ActiveTranx::is_tranx_end_pos";
 
  202   function_enter(kWho);
 
  204   unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
 
  /* Walk the bucket chain; the statement executed on a match is not
     visible here (presumably it leaves the loop). */
  207   while (entry != NULL)
 
  209     if (compare(entry, log_file_name, log_file_pos) == 0)
 
  212     entry = entry->hash_next_;
 
  215   if (trace_level_ & kTraceDetail)
 
  216     sql_print_information(
"%s: probe (%s, %lu) in entry(%u)", kWho,
 
  217                           log_file_name, (
unsigned long)log_file_pos, hash_val);
 
  219   function_exit(kWho, (entry != NULL));
 
  220   return (entry != NULL);
 
  /*
    Removes nodes from the front of the transaction list.

    With a non-NULL log_file_name the list is scanned from trx_front_ and
    each node is compared with (log_file_name, log_file_pos) to find the
    new front.  With a NULL name no scan is performed (switch_off() calls
    it that way).

    - new_front == NULL: the whole hash table is zeroed.
    - new_front != trx_front_: every node before new_front is unlinked
      from its hash bucket and new_front becomes the list front.

    Passes 0 to function_exit() as the result.
  */
  223 int ActiveTranx::clear_active_tranx_nodes(
const char *log_file_name,
 
  224                                           my_off_t log_file_pos)
 
  /* Trace label; uses the same "Class::method" form as the other labels. */
  226   const char *kWho = 
"ActiveTranx::clear_active_tranx_nodes";
 
  229   function_enter(kWho);
 
  231   if (log_file_name != NULL)
 
  233     new_front = trx_front_;
 
  /* The statement guarded by this test is not visible; presumably the scan
     stops at the first node beyond the given position -- TODO confirm. */
  237       if (compare(new_front, log_file_name, log_file_pos) > 0)
 
  239       new_front = new_front->next_;
 
  248   if (new_front == NULL)
 
  /* Reset every bucket head in one pass. */
  253     memset(trx_htb_, 0, num_entries_ * 
sizeof(
TranxNode *));
 
  257     if (trx_front_ != NULL)
 
  263     if (trace_level_ & kTraceDetail)
 
  264       sql_print_information(
"%s: cleared all nodes", kWho);
 
  266   else if (new_front != trx_front_)
 
  272     curr_node = trx_front_;
 
  273     while (curr_node != new_front)
 
  /* Remember the successor before curr_node is unlinked. */
  275       next_node = curr_node->next_;
 
  /* Unlink curr_node from its bucket through a pointer to the link field,
     so the bucket head needs no special case. */
  279       unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
 
  280       TranxNode **hash_ptr = &(trx_htb_[hash_val]);
 
  281       while ((*hash_ptr) != NULL)
 
  283         if ((*hash_ptr) == curr_node)
 
  285           (*hash_ptr) = curr_node->hash_next_;
 
  288         hash_ptr = &((*hash_ptr)->hash_next_);
 
  291       curr_node = next_node;
 
  294     trx_front_ = new_front;
 
  297     if (trace_level_ & kTraceDetail)
 
  298       sql_print_information(
"%s: cleared %d nodes back until pos (%s, %lu)",
 
  300                             trx_front_->log_name_, (
unsigned long)trx_front_->log_pos_);
 
  303   return function_exit(kWho, 0);
 
  /*
    Constructor.  The visible initializers start with no ActiveTranx
    instance, the reply and wait positions marked uninitialized and the
    master disabled; the body sets both file-name buffers to the empty
    string.  (Further initializers are not visible in this excerpt.)
  */
  330 ReplSemiSyncMaster::ReplSemiSyncMaster()
 
  331   : active_tranxs_(NULL),
 
  333     reply_file_name_inited_(false),
 
  335     wait_file_name_inited_(false),
 
  337     master_enabled_(false),
 
  341   strcpy(reply_file_name_, 
"");
 
  342   strcpy(wait_file_name_, 
"");
 
  /*
    Initializes the object from the global settings: applies
    rpl_semi_sync_master_timeout and rpl_semi_sync_master_trace_level, then
    enables or disables the master according to
    rpl_semi_sync_master_enabled.

    Prints "<kWho> called twice" to stderr under a guard condition that is
    not visible in this excerpt.
  */
  345 int ReplSemiSyncMaster::initObject()
 
  348   const char *kWho = 
"ReplSemiSyncMaster::initObject";
 
  352     fprintf(stderr, 
"%s called twice\n", kWho);
 
  358   setWaitTimeout(rpl_semi_sync_master_timeout);
 
  359   setTraceLevel(rpl_semi_sync_master_trace_level);
 
  /* Tails of two calls whose names are not visible; presumably they
     initialize LOCK_binlog_ and COND_binlog_send_ -- TODO confirm. */
  363                    &LOCK_binlog_, MY_MUTEX_INIT_FAST);
 
  365                   &COND_binlog_send_, NULL);
 
  367   if (rpl_semi_sync_master_enabled)
 
  368     result = enableMaster();
 
  370     result = disableMaster();
 
  /*
    Enables semi-sync on the master if it is not already enabled: creates
    the ActiveTranx container, clears the commit/reply/wait "inited" flags,
    marks the master enabled and logs the change.  Logs an error when the
    container pointer is NULL after allocation.
  */
  375 int ReplSemiSyncMaster::enableMaster()
 
  382   if (!getMasterEnabled())
 
  384     active_tranxs_ = 
new ActiveTranx(&LOCK_binlog_, trace_level_);
 
  /* NOTE(review): a plain `new` normally throws instead of returning NULL;
     this check assumes a non-throwing allocator -- confirm. */
  385     if (active_tranxs_ != NULL)
 
  387       commit_file_name_inited_ = 
false;
 
  388       reply_file_name_inited_  = 
false;
 
  389       wait_file_name_inited_   = 
false;
 
  391       set_master_enabled(
true);
 
  393       sql_print_information(
"Semi-sync replication enabled on the master.");
 
  397       sql_print_error(
"Cannot allocate memory to enable semi-sync on the master.");
 
  /*
    Disables semi-sync on the master if it is currently enabled: destroys
    the ActiveTranx container, clears the reply/wait/commit "inited" flags,
    marks the master disabled and logs the change.
  */
  407 int ReplSemiSyncMaster::disableMaster()
 
  412   if (getMasterEnabled())
 
  /* An enabled master is expected to own a container. */
  419     assert(active_tranxs_ != NULL);
 
  420     delete active_tranxs_;
 
  /* Reset so the destructor's delete of this pointer is a no-op. */
  421     active_tranxs_ = NULL;
 
  423     reply_file_name_inited_ = 
false;
 
  424     wait_file_name_inited_  = 
false;
 
  425     commit_file_name_inited_ = 
false;
 
  427     set_master_enabled(
false);
 
  428     sql_print_information(
"Semi-sync replication disabled on the master.");
 
  /*
    Destructor: releases the ActiveTranx container (deleting a NULL pointer
    is a no-op).  Other cleanup, if any, is not visible in this excerpt.
  */
  436 ReplSemiSyncMaster::~ReplSemiSyncMaster()
 
  444   delete active_tranxs_;
 
  /* Body not visible in this excerpt; presumably acquires LOCK_binlog_ -- TODO confirm. */
  447 void ReplSemiSyncMaster::lock()
 
  /* Body not visible in this excerpt; presumably releases LOCK_binlog_ -- TODO confirm. */
  452 void ReplSemiSyncMaster::unlock()
 
  /* Body not visible in this excerpt; presumably broadcasts on COND_binlog_send_ -- TODO confirm. */
  457 void ReplSemiSyncMaster::cond_broadcast()
 
  /*
    Performs a timed wait that uses LOCK_binlog_ and the deadline wait_time
    (the wait call itself is only partly visible here) and passes the wait
    result to function_exit().

    @param wait_time  deadline handed to the wait call; commitTrx() passes
                      an absolute time derived from its start timestamp
  */
  462 int ReplSemiSyncMaster::cond_timewait(
struct timespec *wait_time)
 
  464   const char *kWho = 
"ReplSemiSyncMaster::cond_timewait()";
 
  467   function_enter(kWho);
 
  469                                  &LOCK_binlog_, wait_time);
 
  470   return function_exit(kWho, wait_res);
 
  /*
    Counts one more semi-sync client.
    NOTE(review): any locking around the counter is not visible in this excerpt.
  */
  473 void ReplSemiSyncMaster::add_slave()
 
  476   rpl_semi_sync_master_clients++;
 
  /*
    Counts one semi-sync client fewer, then tests whether the last client
    is gone while rpl_semi_sync_master_wait_no_slave is off.  The action
    taken in that case is not visible in this excerpt.
  */
  480 void ReplSemiSyncMaster::remove_slave()
 
  /* NOTE(review): unsigned counter; assumes a matching add_slave() call
     happened first -- confirm. */
  483   rpl_semi_sync_master_clients--;
 
  489   if (!rpl_semi_sync_master_wait_no_slave &&
 
  490       rpl_semi_sync_master_clients == 0)
 
  /*
    Reads the user variable "rpl_semi_sync_slave" into val/null_value via
    get_user_var_int().  The return expression is not visible in this
    excerpt.
  */
  495 bool ReplSemiSyncMaster::is_semi_sync_slave()
 
  499   get_user_var_int(
"rpl_semi_sync_slave", &val, &null_value);
 
  /*
    Processes a slave's report of the binlog coordinate
    (log_file_name, log_file_pos).

    Visible effects:
      - may call try_switch_on() with the reported coordinate;
      - unless need_copy_send_pos is cleared, stores the coordinate in
        reply_file_name_/reply_file_pos_ and trims the active transaction
        list with it;
      - when sessions are waiting, may set can_release_threads and clear
        wait_file_name_inited_.

    The last parameter is not visible in this excerpt (skipSlaveReply()
    passes `true` for it).  Passes 0 to function_exit() as the result.
  */
  503 int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
 
  504                                           const char *log_file_name,
 
  505                                           my_off_t log_file_pos,
 
  508   const char *kWho = 
"ReplSemiSyncMaster::reportReplyBinlog";
 
  510   bool  can_release_threads = 
false;
 
  511   bool  need_copy_send_pos = 
true;
 
  /* First enabled-check, made before function_enter(). */
  513   if (!(getMasterEnabled()))
 
  516   function_enter(kWho);
 
  /* Second enabled-check; presumably repeated after taking the lock (the
     lock call is not visible here) -- TODO confirm. */
  521   if (!getMasterEnabled())
 
  526     try_switch_on(server_id, log_file_name, log_file_pos);
 
  /* Compare the report with the previously stored reply position. */
  534   if (reply_file_name_inited_)
 
  537                                reply_file_name_, reply_file_pos_);
 
  550       need_copy_send_pos = 
false;
 
  554   if (need_copy_send_pos)
 
  /* NOTE(review): unbounded strcpy(); assumes log_file_name fits in
     reply_file_name_ -- confirm that every caller bounds it. */
  556     strcpy(reply_file_name_, log_file_name);
 
  557     reply_file_pos_ = log_file_pos;
 
  558     reply_file_name_inited_ = 
true;
 
  561     assert(active_tranxs_ != NULL);
 
  562     active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
 
  564     if (trace_level_ & kTraceDetail)
 
  567         sql_print_information(
"%s: Got reply at (%s, %lu)", kWho,
 
  568                             log_file_name, (
unsigned long)log_file_pos);
 
  570         sql_print_information(
"%s: Transaction skipped at (%s, %lu)", kWho,
 
  571                             log_file_name, (
unsigned long)log_file_pos);
 
  /* Only relevant while commitTrx() callers are waiting. */
  575   if (rpl_semi_sync_master_wait_sessions > 0)
 
  581                                wait_file_name_, wait_file_pos_);
 
  587       can_release_threads = 
true;
 
  588       wait_file_name_inited_ = 
false;
 
  595   if (can_release_threads)
 
  597     if (trace_level_ & kTraceDetail)
 
  598       sql_print_information(
"%s: signal all waiting threads.", kWho);
 
  603   return function_exit(kWho, 0);
 
  /*
    Waits, for at most wait_timeout_ milliseconds, for the binlog
    coordinate (trx_wait_binlog_name, trx_wait_binlog_pos) to be covered by
    a slave reply.

    Runs only when the master is enabled and trx_wait_binlog_name is
    non-NULL.  While waiting it maintains wait_file_name_/wait_file_pos_
    and the wait-session counter; on a timed-out wait it logs a warning and
    counts the timeout.  Finally it counts the transaction in
    rpl_semi_sync_master_yes_transactions or ..._no_transactions (the
    condition choosing between them is not visible in this excerpt).

    Passes 0 to function_exit() as the result.
  */
  606 int ReplSemiSyncMaster::commitTrx(
const char* trx_wait_binlog_name,
 
  607                                   my_off_t trx_wait_binlog_pos)
 
  609   const char *kWho = 
"ReplSemiSyncMaster::commitTrx";
 
  611   function_enter(kWho);
 
  613   if (getMasterEnabled() && trx_wait_binlog_name)
 
  /* Start timestamp; used for the deadline and for the elapsed-time
     statistic. */
  620     set_timespec(start_ts, 0);
 
  626     THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_,
 
  627                    & stage_waiting_for_semi_sync_ack_from_slave,
 
  631     if (!getMasterEnabled() || !is_on())
 
  634     if (trace_level_ & kTraceDetail)
 
  636       sql_print_information(
"%s: wait pos (%s, %lu), repl(%d)\n", kWho,
 
  637                             trx_wait_binlog_name, (
unsigned long)trx_wait_binlog_pos,
 
  /* Deadline = start + wait_timeout_ ms.  Two timespec layouts are handled;
     presumably preprocessor conditionals not visible here choose one. */
  643       abstime.tv.i64 = start_ts.tv.i64 + (__int64)wait_timeout_ * TIME_THOUSAND * 10;
 
  644       abstime.max_timeout_msec= (long)wait_timeout_;
 
  /* Whole seconds go to tv_sec, the remaining milliseconds to tv_nsec. */
  646       abstime.tv_sec = start_ts.tv_sec + wait_timeout_ / TIME_THOUSAND;
 
  647       abstime.tv_nsec = start_ts.tv_nsec +
 
  648         (wait_timeout_ % TIME_THOUSAND) * TIME_MILLION;
 
  /* Normalize tv_nsec below one second (the matching tv_sec adjustment is
     not visible in this excerpt). */
  649       if (abstime.tv_nsec >= TIME_BILLION)
 
  652         abstime.tv_nsec -= TIME_BILLION;
 
  /* Compare the stored reply position with this transaction's position. */
  658       if (reply_file_name_inited_)
 
  661                                        trx_wait_binlog_name, trx_wait_binlog_pos);
 
  667           if (trace_level_ & kTraceDetail)
 
  668             sql_print_information(
"%s: Binlog reply is ahead (%s, %lu),",
 
  669                                   kWho, reply_file_name_, (
unsigned long)reply_file_pos_);
 
  /* Update the shared wait position. */
  677       if (wait_file_name_inited_)
 
  680                                        wait_file_name_, wait_file_pos_);
 
  /* NOTE(review): unbounded strcpy(); assumes the name fits in
     wait_file_name_ -- confirm. */
  684           strcpy(wait_file_name_, trx_wait_binlog_name);
 
  685           wait_file_pos_ = trx_wait_binlog_pos;
 
  687           rpl_semi_sync_master_wait_pos_backtraverse++;
 
  688           if (trace_level_ & kTraceDetail)
 
  689             sql_print_information(
"%s: move back wait position (%s, %lu),",
 
  690                                   kWho, wait_file_name_, (
unsigned long)wait_file_pos_);
 
  /* No wait position recorded yet: initialize it with this transaction. */
  695         strcpy(wait_file_name_, trx_wait_binlog_name);
 
  696         wait_file_pos_ = trx_wait_binlog_pos;
 
  697         wait_file_name_inited_ = 
true;
 
  699         if (trace_level_ & kTraceDetail)
 
  700           sql_print_information(
"%s: init wait position (%s, %lu),",
 
  701                                 kWho, wait_file_name_, (
unsigned long)wait_file_pos_);
 
  /* The session counter brackets the timed wait. */
  712       rpl_semi_sync_master_wait_sessions++;
 
  714       if (trace_level_ & kTraceDetail)
 
  715         sql_print_information(
"%s: wait %lu ms for binlog sent (%s, %lu)",
 
  717                               wait_file_name_, (
unsigned long)wait_file_pos_);
 
  719       wait_result = cond_timewait(&abstime);
 
  720       rpl_semi_sync_master_wait_sessions--;
 
  /* A non-zero wait result is reported as a timeout. */
  722       if (wait_result != 0)
 
  725         sql_print_warning(
"Timeout waiting for reply of binlog (file: %s, pos: %lu), " 
  726                           "semi-sync up to file %s, position %lu.",
 
  727                           trx_wait_binlog_name, (
unsigned long)trx_wait_binlog_pos,
 
  728                           reply_file_name_, (
unsigned long)reply_file_pos_);
 
  729         rpl_semi_sync_master_wait_timeouts++;
 
  /* Elapsed time since start_ts; feeds the transaction wait statistics. */
  738         wait_time = getWaitTime(start_ts);
 
  741           if (trace_level_ & kTraceGeneral)
 
  743             sql_print_information(
"Assessment of waiting time for commitTrx " 
  744                                   "failed at wait position (%s, %lu)",
 
  745                                   trx_wait_binlog_name,
 
  746                                   (
unsigned long)trx_wait_binlog_pos);
 
  748           rpl_semi_sync_master_timefunc_fails++;
 
  752           rpl_semi_sync_master_trx_wait_num++;
 
  753           rpl_semi_sync_master_trx_wait_time += wait_time;
 
  /* Debug check: this position must no longer be in the active list. */
  763     assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
 
  764                                              trx_wait_binlog_pos));
 
  768       rpl_semi_sync_master_yes_transactions++;
 
  770       rpl_semi_sync_master_no_transactions++;
 
  774     THD_EXIT_COND(NULL, & old_stage);
 
  777   return function_exit(kWho, 0);
 
  /*
    Switches semi-sync replication off: clears the active transaction list
    (clear_active_tranx_nodes() with a NULL name), counts the switch-off,
    marks the wait and reply positions uninitialized and logs the change.

    Passes the result of clear_active_tranx_nodes() to function_exit().
  */
  798 int ReplSemiSyncMaster::switch_off()
 
  800   const char *kWho = 
"ReplSemiSyncMaster::switch_off";
 
  803   function_enter(kWho);
 
  807   assert(active_tranxs_ != NULL);
 
  808   result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
 
  810   rpl_semi_sync_master_off_times++;
 
  811   wait_file_name_inited_   = 
false;
 
  812   reply_file_name_inited_  = 
false;
 
  813   sql_print_information(
"Semi-sync replication switched OFF.");
 
  816   return function_exit(kWho, result);
 
  /*
    Decides whether semi-sync can be switched on for the reported
    coordinate (log_file_name, log_file_pos).

    When a commit position is recorded, semi_sync_on becomes true if the
    comparison against (commit_file_name_, commit_file_pos_) is >= 0.  The
    branch without a recorded commit position and the state change itself
    are not visible in this excerpt.

    Passes 0 to function_exit() as the result.
  */
  819 int ReplSemiSyncMaster::try_switch_on(
int server_id,
 
  820                                       const char *log_file_name,
 
  821                                       my_off_t log_file_pos)
 
  823   const char *kWho = 
"ReplSemiSyncMaster::try_switch_on";
 
  824   bool semi_sync_on = 
false;
 
  826   function_enter(kWho);
 
  834   if (commit_file_name_inited_)
 
  837                                    commit_file_name_, commit_file_pos_);
 
  838     semi_sync_on = (cmp >= 0);
 
  850     sql_print_information(
"Semi-sync replication switched ON with slave (server_id: %d) " 
  852                           server_id, log_file_name,
 
  853                           (
unsigned long)log_file_pos);
 
  856   return function_exit(kWho, 0);
 
  /*
    Reserves room for the semi-sync header at the start of a packet by
    copying kSyncHeader into header.

    A warning is logged when sizeof(kSyncHeader) exceeds size.  The value
    passed to function_exit() is hlen, which is set to sizeof(kSyncHeader)
    after the copy; its value on the other paths is not visible in this
    excerpt.
  */
  859 int ReplSemiSyncMaster::reserveSyncHeader(
unsigned char *header,
 
  862   const char *kWho = 
"ReplSemiSyncMaster::reserveSyncHeader";
 
  863   function_enter(kWho);
 
  /* Checks whether the connected slave is a semi-sync slave. */
  866   if (!is_semi_sync_slave())
 
  873     if (
sizeof(kSyncHeader) > size)
 
  875       sql_print_warning(
"No enough space in the packet " 
  876                         "for semi-sync extra header, " 
  877                         "semi-sync replication disabled");
 
  885     memcpy(header, kSyncHeader, 
sizeof(kSyncHeader));
 
  886     hlen= 
sizeof(kSyncHeader);
 
  888   return function_exit(kWho, hlen);
 
  /*
    Decides whether the event at (log_file_name, log_file_pos) needs a
    slave reply and, on the path shown at the end, writes kPacketFlagSync
    into byte 2 of packet.

    The decision consults the recorded reply, wait and commit positions and
    ActiveTranx::is_tranx_end_pos(); the conditions joining these steps are
    only partly visible in this excerpt.

    Passes 0 to function_exit() as the result.
  */
  891 int ReplSemiSyncMaster::updateSyncHeader(
unsigned char *packet,
 
  892                                          const char *log_file_name,
 
  893                                          my_off_t log_file_pos,
 
  896   const char *kWho = 
"ReplSemiSyncMaster::updateSyncHeader";
 
  /* First check, made before function_enter(). */
  903   if (!getMasterEnabled() || !is_semi_sync_slave())
 
  909   function_enter(kWho);
 
  /* Enabled-check repeated; presumably after taking the lock (the lock
     call is not visible here) -- TODO confirm. */
  914   if (!getMasterEnabled())
 
  925     if (reply_file_name_inited_)
 
  928                                  reply_file_name_, reply_file_pos_);
 
  938     if (wait_file_name_inited_)
 
  941                                  wait_file_name_, wait_file_pos_);
 
  956       assert(active_tranxs_ != NULL);
 
  957       sync = active_tranxs_->is_tranx_end_pos(log_file_name,
 
  963     if (commit_file_name_inited_)
 
  966                                      commit_file_name_, commit_file_pos_);
 
  975   if (trace_level_ & kTraceDetail)
 
  976     sql_print_information(
"%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
 
  977                           kWho, server_id, log_file_name,
 
  978                           (
unsigned long)log_file_pos, sync, (
int)is_on());
 
  /* Byte 2 of the packet carries the sync flag (skipSlaveReply() and
     readSlaveReply() test the same offset). */
  988     (packet)[2] = kPacketFlagSync;
 
  991   return function_exit(kWho, 0);
 
  /*
    Records the binlog coordinate (log_file_name, log_file_pos) of a
    written transaction: updates commit_file_name_/commit_file_pos_ and
    inserts a node into the active transaction list.

    A warning is logged when insert_tranx_node() returns non-zero.  The
    value passed to function_exit() is `result`, whose assignments are not
    visible in this excerpt.
  */
  994 int ReplSemiSyncMaster::writeTranxInBinlog(
const char* log_file_name,
 
  995                                            my_off_t log_file_pos)
 
  997   const char *kWho = 
"ReplSemiSyncMaster::writeTranxInBinlog";
 
 1000   function_enter(kWho);
 
 1005   if (!getMasterEnabled())
 
  /* A commit position exists: compare with it before overwriting (the
     test on the comparison result is not visible here). */
 1015   if (commit_file_name_inited_)
 
 1018                                    commit_file_name_, commit_file_pos_);
 
  /* Bounded copy with explicit termination. */
 1022       strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
 
 1023       commit_file_name_[FN_REFLEN-1] = 0; 
 
 1024       commit_file_pos_ = log_file_pos;
 
  /* No commit position yet: record this one and mark it initialized. */
 1029     strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
 
 1030     commit_file_name_[FN_REFLEN-1] = 0; 
 
 1031     commit_file_pos_ = log_file_pos;
 
 1032     commit_file_name_inited_ = 
true;
 
 1037     assert(active_tranxs_ != NULL);
 
 1038     if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
 
 1044       sql_print_warning(
"Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
 
 1045                         log_file_name, (ulong)log_file_pos);
 
 1053   return function_exit(kWho, result);
 
 /*
   Handles an event whose slave reply is skipped: checks the packet header
   in event_buf, then reports (skipped_log_file, skipped_log_pos) through
   reportReplyBinlog() with `true` as the last argument.

   The action taken when byte 2 is not kPacketFlagSync is not visible in
   this excerpt.  Passes 0 to function_exit() as the result.
 */
 1056 int ReplSemiSyncMaster::skipSlaveReply(
const char *event_buf,
 
 1058                                        const char* skipped_log_file,
 
 1059                                        my_off_t skipped_log_pos)
 
 1061   const char *kWho = 
"ReplSemiSyncMaster::skipSlaveReply";
 
 1063   function_enter(kWho);
 
 /* Byte 1 must hold the magic number; byte 2 holds the sync flag. */
 1065   assert((
unsigned char)event_buf[1] == kPacketMagicNum);
 
 1066   if ((
unsigned char)event_buf[2] != kPacketFlagSync)
 
 1072   reportReplyBinlog(server_id, skipped_log_file,
 
 1073                     skipped_log_pos, 
true);
 
 1076   return function_exit(kWho, 0);
 
 /*
   Reads a slave's reply packet from net, validates it, extracts the
   acknowledged binlog coordinate and forwards it to reportReplyBinlog().

   Checks visible here: packet read error or length below
   REPLY_BINLOG_NAME_OFFSET, wrong magic number, and a file name of
   FN_REFLEN bytes or more.  Each is logged with sql_print_error(); the
   statements that follow each log call are not visible in this excerpt.

   When kTraceNetWait is set, the time spent waiting is added to the
   network wait statistics.
 */
 1079 int ReplSemiSyncMaster::readSlaveReply(
NET *net, uint32 server_id,
 
 1080                                        const char *event_buf)
 
 1082   const char *kWho = 
"ReplSemiSyncMaster::readSlaveReply";
 
 1083   const unsigned char *packet;
 
 1084   char     log_file_name[FN_REFLEN];
 
 1085   my_off_t log_file_pos;
 
 1086   ulong    log_file_len = 0;
 
 1090   struct timespec start_ts= { 0, 0 };
 
 /* Local copy of the trace level, used for every trace test below. */
 1091   ulong trc_level = trace_level_;
 
 1093   function_enter(kWho);
 
 /* Byte 1 must hold the magic number; byte 2 holds the sync flag. */
 1095   assert((
unsigned char)event_buf[1] == kPacketMagicNum);
 
 1096   if ((
unsigned char)event_buf[2] != kPacketFlagSync)
 
 /* Take the start timestamp only when network-wait tracing is on. */
 1103   if (trc_level & kTraceNetWait)
 
 1104     set_timespec(start_ts, 0);
 
 1111     sql_print_error(
"Semi-sync master failed on net_flush() " 
 1112                     "before waiting for slave reply");
 
 1117   if (trc_level & kTraceDetail)
 
 1118     sql_print_information(
"%s: Wait for replica's reply", kWho);
 
 1127   if (trc_level & kTraceNetWait)
 
 1129     int wait_time = getWaitTime(start_ts);
 
 1132       sql_print_information(
"Assessment of waiting time for " 
 1133                             "readSlaveReply failed.");
 
 1134       rpl_semi_sync_master_timefunc_fails++;
 
 1138       rpl_semi_sync_master_net_wait_num++;
 
 1139       rpl_semi_sync_master_net_wait_time += wait_time;
 
 /* Reject read errors and packets too short to hold the fixed fields. */
 1143   if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
 
 1145     if (packet_len == packet_error)
 
 1146       sql_print_error(
"Read semi-sync reply network error: %s (errno: %d)",
 
 1149       sql_print_error(
"Read semi-sync reply length error: %s (errno: %d)",
 
 1154   packet = net->read_pos;
 
 1155   if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
 
 1157     sql_print_error(
"Read semi-sync reply magic number error");
 
 /* The position is read with uint8korr(); the file name fills the rest of
    the packet. */
 1161   log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
 
 1162   log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
 
 /* Guard the fixed-size buffer: the name plus its terminator must fit. */
 1163   if (log_file_len >= FN_REFLEN)
 
 1165     sql_print_error(
"Read semi-sync reply binlog file length too large");
 
 /* Copy at most log_file_len bytes and terminate explicitly. */
 1168   strncpy(log_file_name, (
const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
 
 1169   log_file_name[log_file_len] = 0;
 
 1171   if (trc_level & kTraceDetail)
 
 1172     sql_print_information(
"%s: Got reply (%s, %lu)",
 
 1173                           kWho, log_file_name, (ulong)log_file_pos);
 
 1175   result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
 
 1178   return function_exit(kWho, result);
 
 /*
   Resets the semi-sync master state: sets state_ to 1 when the master is
   enabled and 0 otherwise, marks the wait/reply/commit positions
   uninitialized and zeroes the status counters listed below.

   The value passed to function_exit() is `result`, whose assignment is
   not visible in this excerpt.
 */
 1182 int ReplSemiSyncMaster::resetMaster()
 
 1184   const char *kWho = 
"ReplSemiSyncMaster::resetMaster";
 
 1187   function_enter(kWho);
 
 1192   state_ = getMasterEnabled()? 1 : 0;
 
 1194   wait_file_name_inited_   = 
false;
 
 1195   reply_file_name_inited_  = 
false;
 
 1196   commit_file_name_inited_ = 
false;
 
 /* NOTE(review): rpl_semi_sync_master_wait_timeouts is not among the
    counters reset here -- confirm that this is intended. */
 1198   rpl_semi_sync_master_yes_transactions = 0;
 
 1199   rpl_semi_sync_master_no_transactions = 0;
 
 1200   rpl_semi_sync_master_off_times = 0;
 
 1201   rpl_semi_sync_master_timefunc_fails = 0;
 
 1202   rpl_semi_sync_master_wait_sessions = 0;
 
 1203   rpl_semi_sync_master_wait_pos_backtraverse = 0;
 
 1204   rpl_semi_sync_master_trx_wait_num = 0;
 
 1205   rpl_semi_sync_master_trx_wait_time = 0;
 
 1206   rpl_semi_sync_master_net_wait_num = 0;
 
 1207   rpl_semi_sync_master_net_wait_time = 0;
 
 1211   return function_exit(kWho, result);
 
 /*
   Publishes derived status values: copies state_ to
   rpl_semi_sync_master_status and recomputes the two average wait times
   as total / sample count.  An average is 0 when its sample count is 0,
   which avoids a division by zero.
 */
 1214 void ReplSemiSyncMaster::setExportStats()
 
 1218   rpl_semi_sync_master_status           = state_;
 
 /* The division is done in double, then truncated to unsigned long. */
 1219   rpl_semi_sync_master_avg_trx_wait_time=
 
 1220     ((rpl_semi_sync_master_trx_wait_num) ?
 
 1221      (
unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
 
 1222                      ((
double)rpl_semi_sync_master_trx_wait_num)) : 0);
 
 1223   rpl_semi_sync_master_avg_net_wait_time=
 
 1224     ((rpl_semi_sync_master_net_wait_num) ?
 
 1225      (
unsigned long)((double)rpl_semi_sync_master_net_wait_time /
 
 1226                      ((
double)rpl_semi_sync_master_net_wait_num)) : 0);
 
 /*
   Returns the time elapsed since start_ts in microseconds, as an int.

   Both timestamps are converted with timespec_to_usec(); the end
   timestamp is taken with set_timespec(end_ts, 0).  The statement
   executed when the end time precedes the start time is not visible in
   this excerpt; presumably it returns a negative error value -- TODO
   confirm.
 */
 1237 static int getWaitTime(
const struct timespec& start_ts)
 
 1239   unsigned long long start_usecs, end_usecs;
 
 1243   start_usecs = timespec_to_usec(&start_ts);
 
 1246   set_timespec(end_ts, 0);
 
 1249   end_usecs = timespec_to_usec(&end_ts);
 
 1251   if (end_usecs < start_usecs)
 
 /* NOTE(review): the 64-bit difference is narrowed to int, so very long
    waits (beyond about 35 minutes in microseconds) would not fit. */
 1254   return (
int)(end_usecs - start_usecs);