19 #include "semisync_master.h"
21 #define TIME_THOUSAND 1000
22 #define TIME_MILLION 1000000
23 #define TIME_BILLION 1000000000
26 char rpl_semi_sync_master_enabled;
27 unsigned long rpl_semi_sync_master_timeout;
28 unsigned long rpl_semi_sync_master_trace_level;
29 char rpl_semi_sync_master_status = 0;
30 unsigned long rpl_semi_sync_master_yes_transactions = 0;
31 unsigned long rpl_semi_sync_master_no_transactions = 0;
32 unsigned long rpl_semi_sync_master_off_times = 0;
33 unsigned long rpl_semi_sync_master_timefunc_fails = 0;
34 unsigned long rpl_semi_sync_master_wait_timeouts = 0;
35 unsigned long rpl_semi_sync_master_wait_sessions = 0;
36 unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
37 unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
38 unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
39 unsigned long rpl_semi_sync_master_avg_net_wait_time = 0;
40 unsigned long long rpl_semi_sync_master_net_wait_num = 0;
41 unsigned long rpl_semi_sync_master_clients = 0;
42 unsigned long long rpl_semi_sync_master_net_wait_time = 0;
43 unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
44 char rpl_semi_sync_master_wait_no_slave = 1;
47 static int getWaitTime(
const struct timespec& start_ts);
49 static unsigned long long timespec_to_usec(
const struct timespec *ts)
52 return (
unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
54 return ts->tv.i64 / 10;
65 unsigned long trace_level)
66 :
Trace(trace_level), allocator_(max_connections),
67 num_entries_(max_connections << 1),
78 for (
int idx = 0; idx < num_entries_; ++idx)
81 sql_print_information(
"Semi-sync replication initialized for transactions.");
84 ActiveTranx::~ActiveTranx()
91 unsigned int ActiveTranx::calc_hash(
const unsigned char *key,
94 unsigned int nr = 1, nr2 = 4;
99 nr ^= (((nr & 63)+nr2)*((
unsigned int) (
unsigned char) *key++))+ (nr << 8);
102 return((
unsigned int) nr);
105 unsigned int ActiveTranx::get_hash_value(
const char *log_file_name,
106 my_off_t log_file_pos)
108 unsigned int hash1 = calc_hash((
const unsigned char *)log_file_name,
109 strlen(log_file_name));
110 unsigned int hash2 = calc_hash((
const unsigned char *)(&log_file_pos),
111 sizeof(log_file_pos));
113 return (hash1 + hash2) % num_entries_;
116 int ActiveTranx::compare(
const char *log_file_name1, my_off_t log_file_pos1,
117 const char *log_file_name2, my_off_t log_file_pos2)
119 int cmp = strcmp(log_file_name1, log_file_name2);
124 if (log_file_pos1 > log_file_pos2)
126 else if (log_file_pos1 < log_file_pos2)
131 int ActiveTranx::insert_tranx_node(
const char *log_file_name,
132 my_off_t log_file_pos)
134 const char *kWho =
"ActiveTranx:insert_tranx_node";
137 unsigned int hash_val;
139 function_enter(kWho);
144 sql_print_error(
"%s: transaction node allocation failed for: (%s, %lu)",
145 kWho, log_file_name, (
unsigned long)log_file_pos);
151 strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
152 ins_node->log_name_[FN_REFLEN-1] = 0;
153 ins_node->log_pos_ = log_file_pos;
158 trx_front_ = trx_rear_ = ins_node;
162 int cmp = compare(ins_node, trx_rear_);
168 trx_rear_->next_ = ins_node;
169 trx_rear_ = ins_node;
176 sql_print_error(
"%s: binlog write out-of-order, tail (%s, %lu), "
177 "new node (%s, %lu)", kWho,
178 trx_rear_->log_name_, (
unsigned long)trx_rear_->log_pos_,
179 ins_node->log_name_, (
unsigned long)ins_node->log_pos_);
185 hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
186 ins_node->hash_next_ = trx_htb_[hash_val];
187 trx_htb_[hash_val] = ins_node;
189 if (trace_level_ & kTraceDetail)
190 sql_print_information(
"%s: insert (%s, %lu) in entry(%u)", kWho,
191 ins_node->log_name_, (
unsigned long)ins_node->log_pos_,
195 return function_exit(kWho, result);
198 bool ActiveTranx::is_tranx_end_pos(
const char *log_file_name,
199 my_off_t log_file_pos)
201 const char *kWho =
"ActiveTranx::is_tranx_end_pos";
202 function_enter(kWho);
204 unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
207 while (entry != NULL)
209 if (compare(entry, log_file_name, log_file_pos) == 0)
212 entry = entry->hash_next_;
215 if (trace_level_ & kTraceDetail)
216 sql_print_information(
"%s: probe (%s, %lu) in entry(%u)", kWho,
217 log_file_name, (
unsigned long)log_file_pos, hash_val);
219 function_exit(kWho, (entry != NULL));
220 return (entry != NULL);
223 int ActiveTranx::clear_active_tranx_nodes(
const char *log_file_name,
224 my_off_t log_file_pos)
226 const char *kWho =
"ActiveTranx::::clear_active_tranx_nodes";
229 function_enter(kWho);
231 if (log_file_name != NULL)
233 new_front = trx_front_;
237 if (compare(new_front, log_file_name, log_file_pos) > 0)
239 new_front = new_front->next_;
248 if (new_front == NULL)
253 memset(trx_htb_, 0, num_entries_ *
sizeof(
TranxNode *));
257 if (trx_front_ != NULL)
263 if (trace_level_ & kTraceDetail)
264 sql_print_information(
"%s: cleared all nodes", kWho);
266 else if (new_front != trx_front_)
272 curr_node = trx_front_;
273 while (curr_node != new_front)
275 next_node = curr_node->next_;
279 unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
280 TranxNode **hash_ptr = &(trx_htb_[hash_val]);
281 while ((*hash_ptr) != NULL)
283 if ((*hash_ptr) == curr_node)
285 (*hash_ptr) = curr_node->hash_next_;
288 hash_ptr = &((*hash_ptr)->hash_next_);
291 curr_node = next_node;
294 trx_front_ = new_front;
297 if (trace_level_ & kTraceDetail)
298 sql_print_information(
"%s: cleared %d nodes back until pos (%s, %lu)",
300 trx_front_->log_name_, (
unsigned long)trx_front_->log_pos_);
303 return function_exit(kWho, 0);
330 ReplSemiSyncMaster::ReplSemiSyncMaster()
331 : active_tranxs_(NULL),
333 reply_file_name_inited_(false),
335 wait_file_name_inited_(false),
337 master_enabled_(false),
341 strcpy(reply_file_name_,
"");
342 strcpy(wait_file_name_,
"");
345 int ReplSemiSyncMaster::initObject()
348 const char *kWho =
"ReplSemiSyncMaster::initObject";
352 fprintf(stderr,
"%s called twice\n", kWho);
358 setWaitTimeout(rpl_semi_sync_master_timeout);
359 setTraceLevel(rpl_semi_sync_master_trace_level);
363 &LOCK_binlog_, MY_MUTEX_INIT_FAST);
365 &COND_binlog_send_, NULL);
367 if (rpl_semi_sync_master_enabled)
368 result = enableMaster();
370 result = disableMaster();
375 int ReplSemiSyncMaster::enableMaster()
382 if (!getMasterEnabled())
384 active_tranxs_ =
new ActiveTranx(&LOCK_binlog_, trace_level_);
385 if (active_tranxs_ != NULL)
387 commit_file_name_inited_ =
false;
388 reply_file_name_inited_ =
false;
389 wait_file_name_inited_ =
false;
391 set_master_enabled(
true);
393 sql_print_information(
"Semi-sync replication enabled on the master.");
397 sql_print_error(
"Cannot allocate memory to enable semi-sync on the master.");
407 int ReplSemiSyncMaster::disableMaster()
412 if (getMasterEnabled())
419 assert(active_tranxs_ != NULL);
420 delete active_tranxs_;
421 active_tranxs_ = NULL;
423 reply_file_name_inited_ =
false;
424 wait_file_name_inited_ =
false;
425 commit_file_name_inited_ =
false;
427 set_master_enabled(
false);
428 sql_print_information(
"Semi-sync replication disabled on the master.");
436 ReplSemiSyncMaster::~ReplSemiSyncMaster()
444 delete active_tranxs_;
447 void ReplSemiSyncMaster::lock()
452 void ReplSemiSyncMaster::unlock()
457 void ReplSemiSyncMaster::cond_broadcast()
462 int ReplSemiSyncMaster::cond_timewait(
struct timespec *wait_time)
464 const char *kWho =
"ReplSemiSyncMaster::cond_timewait()";
467 function_enter(kWho);
469 &LOCK_binlog_, wait_time);
470 return function_exit(kWho, wait_res);
473 void ReplSemiSyncMaster::add_slave()
476 rpl_semi_sync_master_clients++;
480 void ReplSemiSyncMaster::remove_slave()
483 rpl_semi_sync_master_clients--;
489 if (!rpl_semi_sync_master_wait_no_slave &&
490 rpl_semi_sync_master_clients == 0)
495 bool ReplSemiSyncMaster::is_semi_sync_slave()
499 get_user_var_int(
"rpl_semi_sync_slave", &val, &null_value);
503 int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
504 const char *log_file_name,
505 my_off_t log_file_pos,
508 const char *kWho =
"ReplSemiSyncMaster::reportReplyBinlog";
510 bool can_release_threads =
false;
511 bool need_copy_send_pos =
true;
513 if (!(getMasterEnabled()))
516 function_enter(kWho);
521 if (!getMasterEnabled())
526 try_switch_on(server_id, log_file_name, log_file_pos);
534 if (reply_file_name_inited_)
537 reply_file_name_, reply_file_pos_);
550 need_copy_send_pos =
false;
554 if (need_copy_send_pos)
556 strcpy(reply_file_name_, log_file_name);
557 reply_file_pos_ = log_file_pos;
558 reply_file_name_inited_ =
true;
561 assert(active_tranxs_ != NULL);
562 active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
564 if (trace_level_ & kTraceDetail)
567 sql_print_information(
"%s: Got reply at (%s, %lu)", kWho,
568 log_file_name, (
unsigned long)log_file_pos);
570 sql_print_information(
"%s: Transaction skipped at (%s, %lu)", kWho,
571 log_file_name, (
unsigned long)log_file_pos);
575 if (rpl_semi_sync_master_wait_sessions > 0)
581 wait_file_name_, wait_file_pos_);
587 can_release_threads =
true;
588 wait_file_name_inited_ =
false;
595 if (can_release_threads)
597 if (trace_level_ & kTraceDetail)
598 sql_print_information(
"%s: signal all waiting threads.", kWho);
603 return function_exit(kWho, 0);
606 int ReplSemiSyncMaster::commitTrx(
const char* trx_wait_binlog_name,
607 my_off_t trx_wait_binlog_pos)
609 const char *kWho =
"ReplSemiSyncMaster::commitTrx";
611 function_enter(kWho);
613 if (getMasterEnabled() && trx_wait_binlog_name)
620 set_timespec(start_ts, 0);
626 THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_,
627 & stage_waiting_for_semi_sync_ack_from_slave,
631 if (!getMasterEnabled() || !is_on())
634 if (trace_level_ & kTraceDetail)
636 sql_print_information(
"%s: wait pos (%s, %lu), repl(%d)\n", kWho,
637 trx_wait_binlog_name, (
unsigned long)trx_wait_binlog_pos,
643 abstime.tv.i64 = start_ts.tv.i64 + (__int64)wait_timeout_ * TIME_THOUSAND * 10;
644 abstime.max_timeout_msec= (long)wait_timeout_;
646 abstime.tv_sec = start_ts.tv_sec + wait_timeout_ / TIME_THOUSAND;
647 abstime.tv_nsec = start_ts.tv_nsec +
648 (wait_timeout_ % TIME_THOUSAND) * TIME_MILLION;
649 if (abstime.tv_nsec >= TIME_BILLION)
652 abstime.tv_nsec -= TIME_BILLION;
658 if (reply_file_name_inited_)
661 trx_wait_binlog_name, trx_wait_binlog_pos);
667 if (trace_level_ & kTraceDetail)
668 sql_print_information(
"%s: Binlog reply is ahead (%s, %lu),",
669 kWho, reply_file_name_, (
unsigned long)reply_file_pos_);
677 if (wait_file_name_inited_)
680 wait_file_name_, wait_file_pos_);
684 strcpy(wait_file_name_, trx_wait_binlog_name);
685 wait_file_pos_ = trx_wait_binlog_pos;
687 rpl_semi_sync_master_wait_pos_backtraverse++;
688 if (trace_level_ & kTraceDetail)
689 sql_print_information(
"%s: move back wait position (%s, %lu),",
690 kWho, wait_file_name_, (
unsigned long)wait_file_pos_);
695 strcpy(wait_file_name_, trx_wait_binlog_name);
696 wait_file_pos_ = trx_wait_binlog_pos;
697 wait_file_name_inited_ =
true;
699 if (trace_level_ & kTraceDetail)
700 sql_print_information(
"%s: init wait position (%s, %lu),",
701 kWho, wait_file_name_, (
unsigned long)wait_file_pos_);
712 rpl_semi_sync_master_wait_sessions++;
714 if (trace_level_ & kTraceDetail)
715 sql_print_information(
"%s: wait %lu ms for binlog sent (%s, %lu)",
717 wait_file_name_, (
unsigned long)wait_file_pos_);
719 wait_result = cond_timewait(&abstime);
720 rpl_semi_sync_master_wait_sessions--;
722 if (wait_result != 0)
725 sql_print_warning(
"Timeout waiting for reply of binlog (file: %s, pos: %lu), "
726 "semi-sync up to file %s, position %lu.",
727 trx_wait_binlog_name, (
unsigned long)trx_wait_binlog_pos,
728 reply_file_name_, (
unsigned long)reply_file_pos_);
729 rpl_semi_sync_master_wait_timeouts++;
738 wait_time = getWaitTime(start_ts);
741 if (trace_level_ & kTraceGeneral)
743 sql_print_information(
"Assessment of waiting time for commitTrx "
744 "failed at wait position (%s, %lu)",
745 trx_wait_binlog_name,
746 (
unsigned long)trx_wait_binlog_pos);
748 rpl_semi_sync_master_timefunc_fails++;
752 rpl_semi_sync_master_trx_wait_num++;
753 rpl_semi_sync_master_trx_wait_time += wait_time;
763 assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
764 trx_wait_binlog_pos));
768 rpl_semi_sync_master_yes_transactions++;
770 rpl_semi_sync_master_no_transactions++;
774 THD_EXIT_COND(NULL, & old_stage);
777 return function_exit(kWho, 0);
798 int ReplSemiSyncMaster::switch_off()
800 const char *kWho =
"ReplSemiSyncMaster::switch_off";
803 function_enter(kWho);
807 assert(active_tranxs_ != NULL);
808 result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
810 rpl_semi_sync_master_off_times++;
811 wait_file_name_inited_ =
false;
812 reply_file_name_inited_ =
false;
813 sql_print_information(
"Semi-sync replication switched OFF.");
816 return function_exit(kWho, result);
819 int ReplSemiSyncMaster::try_switch_on(
int server_id,
820 const char *log_file_name,
821 my_off_t log_file_pos)
823 const char *kWho =
"ReplSemiSyncMaster::try_switch_on";
824 bool semi_sync_on =
false;
826 function_enter(kWho);
834 if (commit_file_name_inited_)
837 commit_file_name_, commit_file_pos_);
838 semi_sync_on = (cmp >= 0);
850 sql_print_information(
"Semi-sync replication switched ON with slave (server_id: %d) "
852 server_id, log_file_name,
853 (
unsigned long)log_file_pos);
856 return function_exit(kWho, 0);
859 int ReplSemiSyncMaster::reserveSyncHeader(
unsigned char *header,
862 const char *kWho =
"ReplSemiSyncMaster::reserveSyncHeader";
863 function_enter(kWho);
866 if (!is_semi_sync_slave())
873 if (
sizeof(kSyncHeader) > size)
875 sql_print_warning(
"No enough space in the packet "
876 "for semi-sync extra header, "
877 "semi-sync replication disabled");
885 memcpy(header, kSyncHeader,
sizeof(kSyncHeader));
886 hlen=
sizeof(kSyncHeader);
888 return function_exit(kWho, hlen);
891 int ReplSemiSyncMaster::updateSyncHeader(
unsigned char *packet,
892 const char *log_file_name,
893 my_off_t log_file_pos,
896 const char *kWho =
"ReplSemiSyncMaster::updateSyncHeader";
903 if (!getMasterEnabled() || !is_semi_sync_slave())
909 function_enter(kWho);
914 if (!getMasterEnabled())
925 if (reply_file_name_inited_)
928 reply_file_name_, reply_file_pos_);
938 if (wait_file_name_inited_)
941 wait_file_name_, wait_file_pos_);
956 assert(active_tranxs_ != NULL);
957 sync = active_tranxs_->is_tranx_end_pos(log_file_name,
963 if (commit_file_name_inited_)
966 commit_file_name_, commit_file_pos_);
975 if (trace_level_ & kTraceDetail)
976 sql_print_information(
"%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
977 kWho, server_id, log_file_name,
978 (
unsigned long)log_file_pos, sync, (
int)is_on());
988 (packet)[2] = kPacketFlagSync;
991 return function_exit(kWho, 0);
994 int ReplSemiSyncMaster::writeTranxInBinlog(
const char* log_file_name,
995 my_off_t log_file_pos)
997 const char *kWho =
"ReplSemiSyncMaster::writeTranxInBinlog";
1000 function_enter(kWho);
1005 if (!getMasterEnabled())
1015 if (commit_file_name_inited_)
1018 commit_file_name_, commit_file_pos_);
1022 strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1023 commit_file_name_[FN_REFLEN-1] = 0;
1024 commit_file_pos_ = log_file_pos;
1029 strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1030 commit_file_name_[FN_REFLEN-1] = 0;
1031 commit_file_pos_ = log_file_pos;
1032 commit_file_name_inited_ =
true;
1037 assert(active_tranxs_ != NULL);
1038 if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
1044 sql_print_warning(
"Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
1045 log_file_name, (ulong)log_file_pos);
1053 return function_exit(kWho, result);
1056 int ReplSemiSyncMaster::skipSlaveReply(
const char *event_buf,
1058 const char* skipped_log_file,
1059 my_off_t skipped_log_pos)
1061 const char *kWho =
"ReplSemiSyncMaster::skipSlaveReply";
1063 function_enter(kWho);
1065 assert((
unsigned char)event_buf[1] == kPacketMagicNum);
1066 if ((
unsigned char)event_buf[2] != kPacketFlagSync)
1072 reportReplyBinlog(server_id, skipped_log_file,
1073 skipped_log_pos,
true);
1076 return function_exit(kWho, 0);
1079 int ReplSemiSyncMaster::readSlaveReply(
NET *net, uint32 server_id,
1080 const char *event_buf)
1082 const char *kWho =
"ReplSemiSyncMaster::readSlaveReply";
1083 const unsigned char *packet;
1084 char log_file_name[FN_REFLEN];
1085 my_off_t log_file_pos;
1086 ulong log_file_len = 0;
1090 struct timespec start_ts= { 0, 0 };
1091 ulong trc_level = trace_level_;
1093 function_enter(kWho);
1095 assert((
unsigned char)event_buf[1] == kPacketMagicNum);
1096 if ((
unsigned char)event_buf[2] != kPacketFlagSync)
1103 if (trc_level & kTraceNetWait)
1104 set_timespec(start_ts, 0);
1111 sql_print_error(
"Semi-sync master failed on net_flush() "
1112 "before waiting for slave reply");
1117 if (trc_level & kTraceDetail)
1118 sql_print_information(
"%s: Wait for replica's reply", kWho);
1127 if (trc_level & kTraceNetWait)
1129 int wait_time = getWaitTime(start_ts);
1132 sql_print_information(
"Assessment of waiting time for "
1133 "readSlaveReply failed.");
1134 rpl_semi_sync_master_timefunc_fails++;
1138 rpl_semi_sync_master_net_wait_num++;
1139 rpl_semi_sync_master_net_wait_time += wait_time;
1143 if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
1145 if (packet_len == packet_error)
1146 sql_print_error(
"Read semi-sync reply network error: %s (errno: %d)",
1149 sql_print_error(
"Read semi-sync reply length error: %s (errno: %d)",
1154 packet = net->read_pos;
1155 if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
1157 sql_print_error(
"Read semi-sync reply magic number error");
1161 log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
1162 log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
1163 if (log_file_len >= FN_REFLEN)
1165 sql_print_error(
"Read semi-sync reply binlog file length too large");
1168 strncpy(log_file_name, (
const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
1169 log_file_name[log_file_len] = 0;
1171 if (trc_level & kTraceDetail)
1172 sql_print_information(
"%s: Got reply (%s, %lu)",
1173 kWho, log_file_name, (ulong)log_file_pos);
1175 result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
1178 return function_exit(kWho, result);
1182 int ReplSemiSyncMaster::resetMaster()
1184 const char *kWho =
"ReplSemiSyncMaster::resetMaster";
1187 function_enter(kWho);
1192 state_ = getMasterEnabled()? 1 : 0;
1194 wait_file_name_inited_ =
false;
1195 reply_file_name_inited_ =
false;
1196 commit_file_name_inited_ =
false;
1198 rpl_semi_sync_master_yes_transactions = 0;
1199 rpl_semi_sync_master_no_transactions = 0;
1200 rpl_semi_sync_master_off_times = 0;
1201 rpl_semi_sync_master_timefunc_fails = 0;
1202 rpl_semi_sync_master_wait_sessions = 0;
1203 rpl_semi_sync_master_wait_pos_backtraverse = 0;
1204 rpl_semi_sync_master_trx_wait_num = 0;
1205 rpl_semi_sync_master_trx_wait_time = 0;
1206 rpl_semi_sync_master_net_wait_num = 0;
1207 rpl_semi_sync_master_net_wait_time = 0;
1211 return function_exit(kWho, result);
1214 void ReplSemiSyncMaster::setExportStats()
1218 rpl_semi_sync_master_status = state_;
1219 rpl_semi_sync_master_avg_trx_wait_time=
1220 ((rpl_semi_sync_master_trx_wait_num) ?
1221 (
unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
1222 ((
double)rpl_semi_sync_master_trx_wait_num)) : 0);
1223 rpl_semi_sync_master_avg_net_wait_time=
1224 ((rpl_semi_sync_master_net_wait_num) ?
1225 (
unsigned long)((double)rpl_semi_sync_master_net_wait_time /
1226 ((
double)rpl_semi_sync_master_net_wait_num)) : 0);
1237 static int getWaitTime(
const struct timespec& start_ts)
1239 unsigned long long start_usecs, end_usecs;
1243 start_usecs = timespec_to_usec(&start_ts);
1246 set_timespec(end_ts, 0);
1249 end_usecs = timespec_to_usec(&end_ts);
1251 if (end_usecs < start_usecs)
1254 return (
int)(end_usecs - start_usecs);