23 #include "sql_connect.h"
25 #include "global_threads.h"
34 #define SCHED_FUNC __FUNCTION__
37 #define SCHED_FUNC "<unknown>"
40 #define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__)
41 #define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__)
42 #define COND_STATE_WAIT(mythd, abstime, stage) \
43 cond_wait(mythd, abstime, stage, SCHED_FUNC, __FILE__, __LINE__)
45 extern pthread_attr_t connection_attrib;
54 { C_STRING_WITH_LEN(
"INITIALIZED") },
55 { C_STRING_WITH_LEN(
"RUNNING") },
56 { C_STRING_WITH_LEN(
"STOPPING") }
80 DBUG_ENTER(
"evex_print_warnings");
81 if (thd->get_stmt_da()->is_warning_info_empty())
84 char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
85 char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
86 String prefix(prefix_buf,
sizeof(prefix_buf), system_charset_info);
88 prefix.append(
"Event Scheduler: [");
90 prefix.append(et->definer.str, et->definer.length, system_charset_info);
91 prefix.append(
"][", 2);
92 prefix.append(et->dbname.str, et->dbname.length, system_charset_info);
94 prefix.append(et->name.str, et->name.length, system_charset_info);
95 prefix.append(
"] ", 2);
98 thd->get_stmt_da()->sql_conditions();
101 String err_msg(msg_buf,
sizeof(msg_buf), system_charset_info);
104 err_msg.append(prefix);
108 (sql_print_message_handlers[err->
get_level()])(
"%*s", err_msg.length(),
128 post_init_event_thread(THD *thd)
130 (void) init_new_connection_handler_thread();
131 if (init_thr_lock() || thd->store_globals())
136 inc_thread_running();
138 add_global_thread(thd);
153 deinit_event_thread(THD *thd)
155 thd->proc_info=
"Clearing";
156 DBUG_ASSERT(thd->net.buff != 0);
158 DBUG_PRINT(
"exit", (
"Event thread finishing"));
160 dec_thread_running();
161 thd->release_resources();
163 remove_global_thread(thd);
184 pre_init_event_thread(THD* thd)
186 DBUG_ENTER(
"pre_init_event_thread");
187 thd->client_capabilities= 0;
188 thd->security_ctx->master_access= 0;
189 thd->security_ctx->db_access= 0;
190 thd->security_ctx->host_or_ip= (
char*)my_localhost;
192 thd->security_ctx->set_user((
char*)
"event_scheduler");
193 thd->net.read_timeout= slave_net_timeout;
194 thd->slave_thread= 0;
195 thd->variables.option_bits|= OPTION_AUTO_IS_NULL;
196 thd->client_capabilities|= CLIENT_MULTI_RESULTS;
198 thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
206 thd->proc_info=
"Initialized";
210 thd->variables.lock_wait_timeout= LONG_TIMEOUT;
228 event_scheduler_thread(
void *arg)
235 thd->thread_stack= (
char *)&thd;
239 res= post_init_event_thread(thd);
241 DBUG_ENTER(
"event_scheduler_thread");
247 thd->proc_info=
"Clearing";
306 thd->thread_stack= &my_stack;
307 res= post_init_event_thread(thd);
309 DBUG_ENTER(
"Event_worker_thread::run");
310 DBUG_PRINT(
"info", (
"Time is %ld, THD: 0x%lx", (
long) my_time(0), (
long) thd));
318 DBUG_PRINT(
"error", (
"Got error from load_named_event"));
322 thd->enable_slow_log= TRUE;
324 res= job_data.
execute(thd, event->dropped);
326 print_warnings(thd, &job_data);
329 sql_print_information(
"Event Scheduler: "
330 "[%s].[%s.%s] event execution failed.",
331 job_data.definer.str,
332 job_data.dbname.str, job_data.name.str);
334 DBUG_PRINT(
"info", (
"Done with Event %s.%s", event->dbname.str,
338 deinit_event_thread(thd);
344 Event_scheduler::Event_scheduler(
Event_queue *queue_arg)
348 mutex_last_locked_at_line(0),
349 mutex_last_unlocked_at_line(0),
350 mutex_last_locked_in_func(
"n/a"),
351 mutex_last_unlocked_in_func(
"n/a"),
352 mutex_scheduler_data_locked(FALSE),
353 waiting_on_cond(FALSE),
357 &LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
362 Event_scheduler::~Event_scheduler()
392 DBUG_ENTER(
"Event_scheduler::start");
395 DBUG_PRINT(
"info", (
"state before action %s", scheduler_states_names[state].str));
396 if (state > INITIALIZED)
399 DBUG_EXECUTE_IF(
"event_scheduler_thread_create_failure", {
401 Events::opt_event_scheduler= Events::EVENTS_OFF;
405 if (!(new_thd=
new THD))
407 sql_print_error(
"Event Scheduler: Cannot initialize the scheduler thread");
411 pre_init_event_thread(new_thd);
412 new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
413 new_thd->set_command(COM_DAEMON);
422 new_thd->security_ctx->master_access |= SUPER_ACL;
423 new_thd->variables.tx_read_only=
false;
424 new_thd->tx_read_only=
false;
426 scheduler_param_value=
428 scheduler_param_value->thd= new_thd;
429 scheduler_param_value->scheduler=
this;
431 scheduler_thd= new_thd;
432 DBUG_PRINT(
"info", (
"Setting state go RUNNING"));
434 DBUG_PRINT(
"info", (
"Forking new thread for scheduler. THD: 0x%lx", (
long) new_thd));
436 &th, &connection_attrib,
437 event_scheduler_thread,
438 (
void*)scheduler_param_value)))
440 DBUG_PRINT(
"error", (
"cannot create a new thread"));
441 sql_print_error(
"Event scheduler: Failed to start scheduler,"
442 " Can not create thread for event scheduler (errno=%d)",
445 new_thd->proc_info=
"Clearing";
446 DBUG_ASSERT(new_thd->net.buff != 0);
447 net_end(&new_thd->net);
453 delete scheduler_param_value;
476 Event_scheduler::run(THD *thd)
479 DBUG_ENTER(
"Event_scheduler::run");
481 sql_print_information(
"Event Scheduler: scheduler thread started with id %lu",
487 queue->recalculate_activation_times(thd);
494 if (queue->get_top_for_execution_if_time(thd, &event_name))
496 sql_print_information(
"Event Scheduler: "
497 "Serious error during getting next "
498 "event to execute. Stopping");
502 DBUG_PRINT(
"info", (
"get_top_for_execution_if_time returned "
503 "event_name=0x%lx", (
long) event_name));
506 if ((res= execute_top(event_name)))
511 DBUG_ASSERT(thd->killed);
512 DBUG_PRINT(
"info", (
"job_data is NULL, the thread was killed"));
514 DBUG_PRINT(
"info", (
"state=%s", scheduler_states_names[state].str));
518 deinit_event_thread(thd);
521 DBUG_PRINT(
"info", (
"Broadcasting COND_state back to the stoppers"));
547 DBUG_ENTER(
"Event_scheduler::execute_top");
548 if (!(new_thd=
new THD()))
551 pre_init_event_thread(new_thd);
552 new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
553 event_name->thd= new_thd;
554 DBUG_PRINT(
"info", (
"Event %s@%s ready for start",
555 event_name->dbname.str, event_name->name.str));
571 Events::opt_event_scheduler= Events::EVENTS_OFF;
574 sql_print_error(
"Event_scheduler::execute_top: Can not create event worker"
575 " thread (errno=%d). Stopping event scheduler", res);
577 new_thd->proc_info=
"Clearing";
578 DBUG_ASSERT(new_thd->net.buff != 0);
579 net_end(&new_thd->net);
586 DBUG_PRINT(
"info", (
"Event is in THD: 0x%lx", (
long) new_thd));
590 DBUG_PRINT(
"error", (
"Event_scheduler::execute_top() res: %d", res));
611 Event_scheduler::is_running()
614 bool ret= (state == RUNNING);
638 THD *thd= current_thd;
639 DBUG_ENTER(
"Event_scheduler::stop");
640 DBUG_PRINT(
"enter", (
"thd: 0x%lx", (
long) thd));
643 DBUG_PRINT(
"info", (
"state before action %s", scheduler_states_names[state].str));
644 if (state != RUNNING)
647 while (state != INITIALIZED)
648 COND_STATE_WAIT(thd, NULL, &stage_waiting_for_scheduler_to_stop);
654 DBUG_PRINT(
"info", (
"Waiting for COND_started_or_stopped from "
655 "the scheduler thread. Current value of state is %s . "
656 "workers count=%d", scheduler_states_names[state].str,
669 DBUG_PRINT(
"info", (
"Scheduler thread has id %lu",
670 scheduler_thd->thread_id));
674 sql_print_information(
"Event Scheduler: Killing the scheduler thread, "
676 scheduler_thd->thread_id);
677 scheduler_thd->awake(THD::KILL_CONNECTION);
681 sql_print_information(
"Event Scheduler: "
682 "Waiting for the scheduler thread to reply");
683 COND_STATE_WAIT(thd, NULL, &stage_waiting_for_scheduler_to_stop);
684 }
while (state == STOPPING);
685 DBUG_PRINT(
"info", (
"Scheduler thread has cleaned up. Set state to INIT"));
686 sql_print_information(
"Event Scheduler: Stopped");
701 Event_scheduler::workers_count()
705 DBUG_ENTER(
"Event_scheduler::workers_count");
707 Thread_iterator it= global_thread_list_begin();
708 Thread_iterator end= global_thread_list_end();
709 for (; it != end; ++it)
710 if ((*it)->system_thread == SYSTEM_THREAD_EVENT_WORKER)
713 DBUG_PRINT(
"exit", (
"%d", count));
729 Event_scheduler::lock_data(
const char *func, uint line)
731 DBUG_ENTER(
"Event_scheduler::lock_data");
732 DBUG_PRINT(
"enter", (
"func=%s line=%u", func, line));
734 mutex_last_locked_in_func= func;
735 mutex_last_locked_at_line= line;
736 mutex_scheduler_data_locked= TRUE;
752 Event_scheduler::unlock_data(
const char *func, uint line)
754 DBUG_ENTER(
"Event_scheduler::unlock_data");
755 DBUG_PRINT(
"enter", (
"func=%s line=%u", func, line));
756 mutex_last_unlocked_at_line= line;
757 mutex_scheduler_data_locked= FALSE;
758 mutex_last_unlocked_in_func= func;
778 const char *src_func,
const char *src_file, uint src_line)
780 DBUG_ENTER(
"Event_scheduler::cond_wait");
781 waiting_on_cond= TRUE;
782 mutex_last_unlocked_at_line= src_line;
783 mutex_scheduler_data_locked= FALSE;
784 mutex_last_unlocked_in_func= src_func;
786 thd->enter_cond(&COND_state, &LOCK_scheduler_state, stage,
787 NULL, src_func, src_file, src_line);
789 DBUG_PRINT(
"info", (
"mysql_cond_%swait", abstime?
"timed":
""));
800 thd->exit_cond(NULL, src_func, src_file, src_line);
803 mutex_last_locked_in_func= src_func;
804 mutex_last_locked_at_line= src_line;
805 mutex_scheduler_data_locked= TRUE;
806 waiting_on_cond= FALSE;
819 Event_scheduler::dump_internal_status()
821 DBUG_ENTER(
"Event_scheduler::dump_internal_status");
824 puts(
"Event scheduler status:");
825 printf(
"State : %s\n", scheduler_states_names[state].str);
826 printf(
"Thread id : %lu\n", scheduler_thd? scheduler_thd->thread_id : 0);
827 printf(
"LLA : %s:%u\n", mutex_last_locked_in_func,
828 mutex_last_locked_at_line);
829 printf(
"LUA : %s:%u\n", mutex_last_unlocked_in_func,
830 mutex_last_unlocked_at_line);
831 printf(
"WOC : %s\n", waiting_on_cond?
"YES":
"NO");
832 printf(
"Workers : %u\n", workers_count());
833 printf(
"Executed : %lu\n", (ulong) started_events);
834 printf(
"Data locked: %s\n", mutex_scheduler_data_locked ?
"YES":
"NO");