22 #include "sql_audit.h"
25 #include "sql_class.h"
32 #define EVENT_QUEUE_INITIAL_SIZE 30
33 #define EVENT_QUEUE_EXTENT 30
37 #define SCHED_FUNC __FUNCTION__
40 #define SCHED_FUNC "<unknown>"
43 #define LOCK_QUEUE_DATA() lock_data(SCHED_FUNC, __LINE__)
44 #define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
66 extern "C" int event_queue_element_compare_q(
void *, uchar *, uchar *);
68 int event_queue_element_compare_q(
void *vptr, uchar* a, uchar *b)
72 my_time_t lhs = left->execute_at;
73 my_time_t rhs = right->execute_at;
75 if (left->status == Event_parse_data::DISABLED)
76 return right->status != Event_parse_data::DISABLED;
78 if (right->status == Event_parse_data::DISABLED)
81 return (lhs < rhs ? -1 : (lhs > rhs ? 1 : 0));
92 Event_queue::Event_queue()
93 :next_activation_at(0),
94 mutex_last_locked_at_line(0),
95 mutex_last_unlocked_at_line(0),
96 mutex_last_attempted_lock_at_line(0),
97 mutex_last_locked_in_func(
"n/a"),
98 mutex_last_unlocked_in_func(
"n/a"),
99 mutex_last_attempted_lock_in_func(
"n/a"),
100 mutex_queue_data_locked(FALSE),
101 mutex_queue_data_attempting_lock(FALSE),
102 waiting_on_cond(FALSE)
104 mysql_mutex_init(key_LOCK_event_queue, &LOCK_event_queue, MY_MUTEX_INIT_FAST);
109 Event_queue::~Event_queue()
134 Event_queue::init_queue(THD *thd)
136 DBUG_ENTER(
"Event_queue::init_queue");
137 DBUG_PRINT(
"enter", (
"this: 0x%lx", (
long)
this));
141 if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 ,
142 0 , event_queue_element_compare_q,
143 NULL, EVENT_QUEUE_EXTENT))
145 sql_print_error(
"Event Scheduler: Can't initialize the execution queue");
167 Event_queue::deinit_queue()
169 DBUG_ENTER(
"Event_queue::deinit_queue");
173 delete_queue(&queue);
202 DBUG_ENTER(
"Event_queue::create_event");
203 DBUG_PRINT(
"enter", (
"thd: 0x%lx et=%s.%s", (
long) thd,
204 new_element->dbname.str, new_element->name.str));
207 new_element->compute_next_execution_time();
208 if (new_element->status != Event_parse_data::ENABLED)
215 DBUG_PRINT(
"info", (
"new event in the queue: 0x%lx", (
long) new_element));
218 *created= (queue_insert_safe(&queue, (uchar *) new_element) == FALSE);
219 dbug_dump_queue(thd->query_start());
223 DBUG_RETURN(!*created);
243 DBUG_ENTER(
"Event_queue::update_event");
244 DBUG_PRINT(
"enter", (
"thd: 0x%lx et=[%s.%s]", (
long) thd, dbname.str, name.str));
246 if ((new_element->status == Event_parse_data::DISABLED) ||
247 (new_element->status == Event_parse_data::SLAVESIDE_DISABLED))
249 DBUG_PRINT(
"info", (
"The event is disabled."));
258 new_element->compute_next_execution_time();
261 find_n_remove_event(dbname, name);
266 DBUG_PRINT(
"info", (
"new event in the queue: 0x%lx", (
long) new_element));
267 queue_insert_safe(&queue, (uchar *) new_element);
271 dbug_dump_queue(thd->query_start());
291 DBUG_ENTER(
"Event_queue::drop_event");
292 DBUG_PRINT(
"enter", (
"thd: 0x%lx db :%s name: %s", (
long) thd,
293 dbname.str, name.str));
296 find_n_remove_event(dbname, name);
297 dbug_dump_queue(thd->query_start());
327 Event_queue::drop_matching_events(THD *thd,
LEX_STRING pattern,
331 DBUG_ENTER(
"Event_queue::drop_matching_events");
332 DBUG_PRINT(
"enter", (
"pattern=%s", pattern.str));
334 while (i < queue.elements)
337 DBUG_PRINT(
"info", (
"[%s.%s]?", et->dbname.str, et->name.str));
338 if (comparator(pattern, et))
346 queue_remove(&queue, i);
379 Event_queue::drop_schema_events(THD *thd,
LEX_STRING schema)
381 DBUG_ENTER(
"Event_queue::drop_schema_events");
383 drop_matching_events(thd, schema, event_basic_db_equal);
406 DBUG_ENTER(
"Event_queue::find_n_remove_event");
408 for (i= 0; i < queue.elements; ++
i)
411 DBUG_PRINT(
"info", (
"[%s.%s]==[%s.%s]?", db.str, name.str,
412 et->dbname.str, et->name.str));
413 if (event_basic_identifier_equal(db, name, et))
415 queue_remove(&queue, i);
439 Event_queue::recalculate_activation_times(THD *thd)
442 DBUG_ENTER(
"Event_queue::recalculate_activation_times");
445 DBUG_PRINT(
"info", (
"%u loaded events to be recalculated", queue.elements));
446 for (i= 0; i < queue.elements; i++)
458 for (i= queue.elements; i > 0; i--)
461 if (element->status != Event_parse_data::DISABLED)
467 queue_remove(&queue, i - 1);
495 Event_queue::empty_queue()
498 DBUG_ENTER(
"Event_queue::empty_queue");
499 DBUG_PRINT(
"enter", (
"Purging the queue. %u element(s)", queue.elements));
500 sql_print_information(
"Event Scheduler: Purging the queue. %u events",
503 for (i= 0; i < queue.elements; ++
i)
508 resize_queue(&queue, 0);
522 Event_queue::dbug_dump_queue(time_t now)
527 DBUG_ENTER(
"Event_queue::dbug_dump_queue");
528 DBUG_PRINT(
"info", (
"Dumping queue . Elements=%u", queue.elements));
529 for (i = 0; i < queue.elements; i++)
532 DBUG_PRINT(
"info", (
"et: 0x%lx name: %s.%s", (
long) et,
533 et->dbname.str, et->name.str));
534 DBUG_PRINT(
"info", (
"exec_at: %lu starts: %lu ends: %lu execs_so_far: %u "
535 "expr: %ld et.exec_at: %ld now: %ld "
536 "(et.exec_at - now): %d if: %d",
537 (
long) et->execute_at, (
long) et->starts,
538 (
long) et->ends, et->execution_count,
539 (
long) et->expression, (
long) et->execute_at,
540 (
long) now, (
int) (et->execute_at - now),
541 et->execute_at <= now));
563 Event_queue::get_top_for_execution_if_time(THD *thd,
568 my_time_t UNINIT_VAR(last_executed);
569 int UNINIT_VAR(status);
570 DBUG_ENTER(
"Event_queue::get_top_for_execution_if_time");
580 DBUG_PRINT(
"info", (
"thd->killed=%d", thd->killed));
587 next_activation_at= 0;
590 mysql_audit_release(thd);
593 cond_wait(thd, NULL, & stage_waiting_on_empty_queue, SCHED_FUNC, __FILE__, __LINE__);
600 thd->set_current_time();
602 next_activation_at= top->execute_at;
603 if (next_activation_at > thd->query_start())
610 set_timespec(top_time, next_activation_at - thd->query_start());
613 mysql_audit_release(thd);
615 cond_wait(thd, &top_time, &stage_waiting_for_next_activation, SCHED_FUNC, __FILE__, __LINE__);
621 (*event_name)->init(top->dbname, top->name))
627 DBUG_PRINT(
"info", (
"Ready for execution"));
628 top->mark_last_executed(thd);
629 if (top->compute_next_execution_time())
630 top->status= Event_parse_data::DISABLED;
631 DBUG_PRINT(
"info", (
"event %s status is %d", top->name.str, top->status));
633 top->execution_count++;
634 (*event_name)->dropped= top->dropped;
640 last_executed= top->last_executed;
643 if (top->status == Event_parse_data::DISABLED)
645 DBUG_PRINT(
"info", (
"removing from the queue"));
646 sql_print_information(
"Event Scheduler: Last execution of %s.%s. %s",
647 top->dbname.str, top->name.str,
648 top->dropped?
"Dropping.":
"");
650 queue_remove(&queue, 0);
653 queue_replaced(&queue);
655 dbug_dump_queue(thd->query_start());
661 DBUG_PRINT(
"info", (
"returning %d et_new: 0x%lx ",
662 ret, (
long) *event_name));
666 DBUG_PRINT(
"info", (
"db: %s name: %s",
667 (*event_name)->dbname.str, (*event_name)->name.str));
671 (*event_name)->dbname, (*event_name)->name,
672 last_executed, (ulonglong) status);
690 Event_queue::lock_data(
const char *func, uint line)
692 DBUG_ENTER(
"Event_queue::lock_data");
693 DBUG_PRINT(
"enter", (
"func=%s line=%u", func, line));
694 mutex_last_attempted_lock_in_func= func;
695 mutex_last_attempted_lock_at_line= line;
696 mutex_queue_data_attempting_lock= TRUE;
698 mutex_last_attempted_lock_in_func=
"";
699 mutex_last_attempted_lock_at_line= 0;
700 mutex_queue_data_attempting_lock= FALSE;
702 mutex_last_locked_in_func= func;
703 mutex_last_locked_at_line= line;
704 mutex_queue_data_locked= TRUE;
721 Event_queue::unlock_data(
const char *func, uint line)
723 DBUG_ENTER(
"Event_queue::unlock_data");
724 DBUG_PRINT(
"enter", (
"func=%s line=%u", func, line));
725 mutex_last_unlocked_at_line= line;
726 mutex_queue_data_locked= FALSE;
727 mutex_last_unlocked_in_func= func;
747 const char *src_func,
const char *src_file, uint src_line)
749 DBUG_ENTER(
"Event_queue::cond_wait");
750 waiting_on_cond= TRUE;
751 mutex_last_unlocked_at_line= src_line;
752 mutex_queue_data_locked= FALSE;
753 mutex_last_unlocked_in_func= src_func;
755 thd->enter_cond(&COND_queue_state, &LOCK_event_queue, stage, NULL, src_func, src_file, src_line);
765 mutex_last_locked_in_func= src_func;
766 mutex_last_locked_at_line= src_line;
767 mutex_queue_data_locked= TRUE;
768 waiting_on_cond= FALSE;
774 thd->exit_cond(NULL, src_func, src_file, src_line);
775 lock_data(src_func, src_line);
789 Event_queue::dump_internal_status()
791 DBUG_ENTER(
"Event_queue::dump_internal_status");
795 puts(
"Event queue status:");
796 printf(
"Element count : %u\n", queue.elements);
797 printf(
"Data locked : %s\n", mutex_queue_data_locked?
"YES":
"NO");
798 printf(
"Attempting lock : %s\n", mutex_queue_data_attempting_lock?
"YES":
"NO");
799 printf(
"LLA : %s:%u\n", mutex_last_locked_in_func,
800 mutex_last_locked_at_line);
801 printf(
"LUA : %s:%u\n", mutex_last_unlocked_in_func,
802 mutex_last_unlocked_at_line);
803 if (mutex_last_attempted_lock_at_line)
804 printf(
"Last lock attempt at: %s:%u\n", mutex_last_attempted_lock_in_func,
805 mutex_last_attempted_lock_at_line);
806 printf(
"WOC : %s\n", waiting_on_cond?
"YES":
"NO");
810 if (time.year != 1970)
811 printf(
"Next activation : %04d-%02d-%02d %02d:%02d:%02d\n",
812 time.year, time.month, time.day, time.hour, time.minute, time.second);
814 printf(
"Next activation : never");