19 #include "sql_parse.h"
24 #include "sql_table.h"
// Current scheduler mode; defined here with an initial value of EVENTS_OFF.
81 ulong Events::opt_event_scheduler= Events::EVENTS_OFF;
// Error flag, initially FALSE. Elsewhere in this file it is set to TRUE on the
// system-table initialization error path and tested by
// check_if_system_tables_error.
82 bool Events::check_system_tables_error= FALSE;
// Compares the byte ranges of s and t through the collation handler's
// strnncollsp and returns its result directly.
// NOTE(review): the enclosing signature is not visible in this view; presumably
// this is the body of sortcmp_lex_string(s, t, cs) used by update_event — confirm.
102 return cs->coll->strnncollsp(cs, (uchar *) s.str,s.length,
103 (uchar *) t.str,t.length, 0);
// Reports ER_EVENTS_DB_ERROR to the client when the check_system_tables_error
// flag is set. The return statements are not visible in this view.
114 DBUG_ENTER(
"Events::check_if_system_tables_error");
116 if (check_system_tables_error)
118 my_error(ER_EVENTS_DB_ERROR, MYF(0));
// Formats a packed interval value (`expression`) into `buf` as decimal
// components, one switch case per interval type.
// NOTE(review): the function signature, the `multipl`/`separator` assignments,
// the separator appends between components and the `common_1_lev_code` label
// are on lines not visible in this view.
// Working copy of the value; each case reduces it to the last component.
148 ulonglong expr= expression;
// Scratch buffer for the decimal digits of one component; `end` marks the
// position returned by longlong10_to_str.
149 char tmp_buff[128], *end;
150 bool close_quote= TRUE;
155 case INTERVAL_YEAR_MONTH:
158 goto common_1_lev_code;
159 case INTERVAL_DAY_HOUR:
162 goto common_1_lev_code;
163 case INTERVAL_HOUR_MINUTE:
164 case INTERVAL_MINUTE_SECOND:
// Two-component form: emit the quotient by `multipl`, keep the remainder in
// expr for the trailing append at the end of the block.
168 end= longlong10_to_str(expression/multipl, tmp_buff, 10);
169 buf->append(tmp_buff, (uint) (end- tmp_buff));
170 expr= expr - (expr/multipl)*multipl;
172 case INTERVAL_DAY_MINUTE:
174 ulonglong tmp_expr= expr;
// Emits tmp_expr as the first component. Presumably tmp_expr was divided down
// to whole days on the lines missing from this view — confirm.
178 end= longlong10_to_str(tmp_expr, tmp_buff, 10);
179 buf->append(tmp_buff, (uint) (end- tmp_buff));
// What is left after removing tmp_expr*(24*60); its quotient by 60 is
// emitted next.
182 tmp_expr= expr - tmp_expr*(24*60);
183 end= longlong10_to_str(tmp_expr/60, tmp_buff, 10);
184 buf->append(tmp_buff, (uint) (end- tmp_buff));
// Remainder modulo 60 is left in expr.
186 expr= tmp_expr - (tmp_expr/60)*60;
190 case INTERVAL_HOUR_SECOND:
192 ulonglong tmp_expr= expr;
// Quotient by 3600 is emitted first.
195 end= longlong10_to_str(tmp_expr/3600, tmp_buff, 10);
196 buf->append(tmp_buff, (uint) (end- tmp_buff));
// Remainder modulo 3600; its quotient by 60 is emitted next.
199 tmp_expr= tmp_expr - (tmp_expr/3600)*3600;
200 end= longlong10_to_str(tmp_expr/60, tmp_buff, 10);
201 buf->append(tmp_buff, (uint) (end- tmp_buff));
// Remainder modulo 60 is left in expr.
203 expr= tmp_expr - (tmp_expr/60)*60;
207 case INTERVAL_DAY_SECOND:
209 ulonglong tmp_expr= expr;
// Emits tmp_expr as the first component. Presumably tmp_expr was divided down
// to whole days on the lines missing from this view — confirm.
213 end= longlong10_to_str(tmp_expr, tmp_buff, 10);
214 buf->append(tmp_buff, (uint) (end- tmp_buff));
// What is left after removing tmp_expr*(24*3600); then successive quotients
// by 3600 and 60 are emitted.
217 tmp_expr= expr - tmp_expr*(24*3600);
218 end= longlong10_to_str(tmp_expr/3600, tmp_buff, 10);
219 buf->append(tmp_buff, (uint) (end- tmp_buff));
222 tmp_expr= tmp_expr - (tmp_expr/3600)*3600;
223 end= longlong10_to_str(tmp_expr/60, tmp_buff, 10);
224 buf->append(tmp_buff, (uint) (end- tmp_buff));
// Remainder modulo 60 is left in expr.
226 expr= tmp_expr - (tmp_expr/60)*60;
// Every microsecond-based interval type is rejected with ER_NOT_SUPPORTED_YET.
230 case INTERVAL_DAY_MICROSECOND:
231 case INTERVAL_HOUR_MICROSECOND:
232 case INTERVAL_MINUTE_MICROSECOND:
233 case INTERVAL_SECOND_MICROSECOND:
234 case INTERVAL_MICROSECOND:
235 my_error(ER_NOT_SUPPORTED_YET, MYF(0),
"MICROSECOND");
238 case INTERVAL_QUARTER:
// Appends the separator followed by whatever remains in expr.
249 buf->append(separator);
250 end= longlong10_to_str(expr, tmp_buff, 10);
251 buf->append(tmp_buff, (uint) (end- tmp_buff));
// Assembles a statement text into `buf`: the literal "CREATE ", then the
// definer (user and host from thd->lex->definer) via append_definer, then the
// statement bytes between thd->lex->stmt_definition_begin and
// stmt_definition_end.
// NOTE(review): the return type and the bodies of the `if (buf->append(...))`
// branches are not visible here; presumably a non-zero append result is
// treated as failure — confirm.
272 create_query_string(THD *thd,
String *
buf)
275 if (buf->append(STRING_WITH_LEN(
"CREATE ")))
278 append_definer(thd, buf, &(thd->lex->definer->user), &(thd->lex->definer->host));
280 if (buf->append(thd->lex->stmt_definition_begin,
281 thd->lex->stmt_definition_end -
282 thd->lex->stmt_definition_begin))
// Creates an event: validates the parsed data, checks EVENT_ACL on the target
// schema, stores the event through db_repository and writes a rebuilt
// statement to the binary log.
// NOTE(review): the signature, error returns and several branch bodies are on
// lines not visible in this view.
310 bool save_binlog_row_based, event_already_exists;
311 DBUG_ENTER(
"Events::create_event");
321 if (parse_data->check_parse_data(thd))
// Either a recurring expression or a one-shot execute_at must be set.
325 DBUG_ASSERT(parse_data->expression || parse_data->execute_at);
327 if (
check_access(thd, EVENT_ACL, parse_data->dbname.str, NULL, NULL, 0, 0))
// A non-zero result from check_db_dir_existence is reported as ER_BAD_DB_ERROR.
330 if (check_db_dir_existence(parse_data->dbname.str))
332 my_error(ER_BAD_DB_ERROR, MYF(0), parse_data->dbname.str);
336 if (parse_data->do_not_create)
// Remember whether the current statement was in row binlog format and clear
// it; it is restored at the end of this block.
342 if ((save_binlog_row_based= thd->is_current_stmt_binlog_format_row()))
343 thd->clear_current_stmt_binlog_format_row();
// Tail of a call taking the schema and event names; the callee is not visible
// in this view.
346 parse_data->dbname.str, parse_data->name.str))
// A zero result from the repository means the row was stored (or the event
// already existed, reported via event_already_exists).
350 if (!(ret= db_repository->
create_event(thd, parse_data, if_not_exists,
351 &event_already_exists)))
356 if (!event_already_exists)
// NOTE(review): presumably undoes the just-created row when a later step on
// the missing lines fails — confirm.
364 if (!db_repository->
drop_event(thd, parse_data->dbname, parse_data->name,
383 DBUG_ASSERT(thd->query() && thd->query_length());
// The binlogged text is rebuilt by create_query_string rather than taken from
// thd->query().
385 if (create_query_string(thd, &log_query))
387 sql_print_error(
"Event Error: An error occurred while creating query string, "
388 "before writing it into binary log.");
393 thd->add_to_binlog_accessed_dbs(parse_data->dbname.str);
398 ret= write_bin_log(thd, TRUE, log_query.c_ptr(), log_query.length());
// Row format must still be cleared here; restore it if it was set on entry.
403 DBUG_ASSERT(!thd->is_current_stmt_binlog_format_row());
404 if (save_binlog_row_based)
405 thd->set_current_stmt_binlog_format_row();
// Alters (and optionally renames) an event: validates the parsed data, checks
// EVENT_ACL on the schemas involved, updates the in-memory queue and writes
// the original statement text to the binary log.
// NOTE(review): the signature, error returns and several branch bodies are on
// lines not visible in this view.
435 bool save_binlog_row_based;
438 DBUG_ENTER(
"Events::update_event");
443 if (parse_data->check_parse_data(thd) || parse_data->do_not_create)
446 if (
check_access(thd, EVENT_ACL, parse_data->dbname.str, NULL, NULL, 0, 0))
// Both comparisons returning 0 means the new schema and name equal the
// current ones; that is reported as ER_EVENT_SAME_NAME.
452 if ( !sortcmp_lex_string(parse_data->dbname, *new_dbname,
453 system_charset_info) &&
454 !sortcmp_lex_string(parse_data->name, *new_name,
455 system_charset_info))
457 my_error(ER_EVENT_SAME_NAME, MYF(0));
// The target schema of a rename is checked for EVENT_ACL and for existence.
467 if (
check_access(thd, EVENT_ACL, new_dbname->str, NULL, NULL, 0, 0))
471 if (check_db_dir_existence(new_dbname->str))
473 my_error(ER_BAD_DB_ERROR, MYF(0), new_dbname->str);
// Remember whether the current statement was in row binlog format and clear
// it; it is restored at the end of this block.
482 if ((save_binlog_row_based= thd->is_current_stmt_binlog_format_row()))
483 thd->clear_current_stmt_binlog_format_row();
// Tails of two calls whose callees are not visible in this view.
486 parse_data->dbname.str, parse_data->name.str))
491 new_dbname, new_name)))
// Use the new schema name when one was supplied, otherwise the current one.
493 LEX_STRING dbname= new_dbname ? *new_dbname : parse_data->dbname;
510 event_queue->update_event(thd, parse_data->dbname, parse_data->name,
513 DBUG_ASSERT(thd->query() && thd->query_length());
515 thd->add_to_binlog_accessed_dbs(parse_data->dbname.str);
// NOTE(review): the guard for this call is not visible; new_dbname is
// dereferenced, so it is presumably checked for NULL first — confirm.
517 thd->add_to_binlog_accessed_dbs(new_dbname->str);
519 ret= write_bin_log(thd, TRUE, thd->query(), thd->query_length());
// Row format must still be cleared here; restore it if it was set on entry.
523 DBUG_ASSERT(!thd->is_current_stmt_binlog_format_row());
524 if (save_binlog_row_based)
525 thd->set_current_stmt_binlog_format_row();
// Drops an event: checks EVENT_ACL on the schema, removes the event through
// db_repository, removes it from the in-memory queue and writes the original
// statement text to the binary log.
// NOTE(review): the signature and error returns are on lines not visible in
// this view.
559 bool save_binlog_row_based;
560 DBUG_ENTER(
"Events::drop_event");
565 if (
check_access(thd, EVENT_ACL, dbname.str, NULL, NULL, 0, 0))
// Remember whether the current statement was in row binlog format and clear
// it; it is restored at the end of this block.
572 if ((save_binlog_row_based= thd->is_current_stmt_binlog_format_row()))
573 thd->clear_current_stmt_binlog_format_row();
// Tail of a call taking the schema and event names; the callee is not visible
// in this view.
576 dbname.str, name.str))
// The lines that follow sit under this zero-result check in the original
// layout.
579 if (!(ret= db_repository->
drop_event(thd, dbname, name, if_exists)))
582 event_queue->drop_event(thd, dbname, name);
584 DBUG_ASSERT(thd->query() && thd->query_length());
586 thd->add_to_binlog_accessed_dbs(dbname.str);
587 ret= write_bin_log(thd, TRUE, thd->query(), thd->query_length());
// Row format must still be cleared here; restore it if it was set on entry.
590 DBUG_ASSERT(!thd->is_current_stmt_binlog_format_row());
591 if (save_binlog_row_based)
592 thd->set_current_stmt_binlog_format_row();
// Drops every event of schema `db`: first from the in-memory queue, then
// through db_repository. Return values of the two calls are not inspected on
// the visible lines.
// NOTE(review): the construction of db_lex from db is not visible here.
613 DBUG_ENTER(
"Events::drop_schema_events");
614 DBUG_PRINT(
"enter", (
"dropping events from %s", db));
621 event_queue->drop_schema_events(thd, db_lex);
622 db_repository->drop_schema_events(thd, db_lex);
// Sends a one-row result set describing event `et`: name, SQL mode, time zone
// name, the CREATE EVENT text, and the client charset / connection collation /
// database collation names from the event's creation context.
// NOTE(review): the signature, the field_list column definitions and the
// origin of tz_name are on lines not visible in this view.
// String backed by a fixed-size local array as its initial buffer.
636 char show_str_buf[10 * STRING_BUFFER_USUAL_SIZE];
637 String show_str(show_str_buf,
sizeof(show_str_buf), system_charset_info);
642 DBUG_ENTER(
"send_show_create_event");
645 if (et->get_create_event(thd, &show_str))
650 if (sql_mode_string_representation(thd, et->sql_mode, &sql_mode))
663 field_list.push_back(
666 field_list.push_back(
669 field_list.push_back(
673 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
676 protocol->prepare_for_resend();
// Row columns in send order: event name, sql_mode, time zone.
678 protocol->
store(et->name.str, et->name.length, system_charset_info);
679 protocol->
store(sql_mode.str, sql_mode.length, system_charset_info);
680 protocol->
store(tz_name->ptr(), tz_name->length(), system_charset_info);
// The statement text is stored with the creation context's client charset,
// unlike the other columns, which use system_charset_info.
681 protocol->
store(show_str.c_ptr(), show_str.length(),
682 et->creation_ctx->get_client_cs());
// Names of the client charset, connection collation and database collation.
683 protocol->
store(et->creation_ctx->get_client_cs()->csname,
684 strlen(et->creation_ctx->get_client_cs()->csname),
685 system_charset_info);
686 protocol->
store(et->creation_ctx->get_connection_cl()->name,
687 strlen(et->creation_ctx->get_connection_cl()->name),
688 system_charset_info);
689 protocol->
store(et->creation_ctx->get_db_cl()->name,
690 strlen(et->creation_ctx->get_db_cl()->name),
691 system_charset_info);
693 if (protocol->write())
// Implements SHOW CREATE EVENT: checks EVENT_ACL on the schema, then sends the
// loaded event `et` to the client through send_show_create_event.
// NOTE(review): how `et` is loaded is on lines not visible in this view.
718 DBUG_ENTER(
"Events::show_create_event");
// Note the argument order: the schema name is printed before the event name.
719 DBUG_PRINT(
"enter", (
"name: %s@%s", dbname.str, name.str));
724 if (
check_access(thd, EVENT_ACL, dbname.str, NULL, NULL, 0, 0))
740 ret= send_show_create_event(thd, &et, thd->protocol);
// Fills the events listing. For SHOW EVENTS the schema comes from
// thd->lex->select_lex.db, which is asserted to be set.
// NOTE(review): the second half of the `if` condition, the other command path
// and the call that actually fills the table are not visible in this view.
767 DBUG_ENTER(
"Events::fill_schema_events");
776 if (thd->lex->sql_command == SQLCOM_SHOW_EVENTS)
778 DBUG_ASSERT(thd->lex->select_lex.db);
779 if (!is_infoschema_db(thd->lex->select_lex.db) &&
783 db= thd->lex->select_lex.db;
// Initializes the events subsystem using a temporary THD: sets up the queue,
// loads events from storage and, when the mode is EVENTS_ON, starts the
// scheduler.
// NOTE(review): the signature, the creation of the subsystem objects and the
// exact nesting of the error paths are on lines not visible in this view.
814 DBUG_ENTER(
"Events::init");
817 if (!(thd=
new THD()))
// thread_stack is set to the address of the local `thd` pointer before the
// THD is installed via store_globals().
827 thd->thread_stack= (
char*) &thd;
828 thd->store_globals();
// System-table initialization error path: logged only when
// opt_noacl_or_bootstrap is false; the error flag is raised and the scheduler
// mode becomes EVENTS_DISABLED.
860 if (! opt_noacl_or_bootstrap)
862 sql_print_error(
"Event Scheduler: An error occurred when initializing "
863 "system tables. Disabling the Event Scheduler.");
864 check_system_tables_error= TRUE;
868 opt_event_scheduler= EVENTS_DISABLED;
877 if (opt_event_scheduler == Events::EVENTS_DISABLED)
// Past this point only ON or OFF are expected.
881 DBUG_ASSERT(opt_event_scheduler == Events::EVENTS_ON ||
882 opt_event_scheduler == Events::EVENTS_OFF);
// Any failing step short-circuits the rest; the scheduler is started only in
// EVENTS_ON mode.
891 if (event_queue->init_queue(thd) || load_events_from_db(thd) ||
892 (opt_event_scheduler == EVENTS_ON && scheduler->
start(&err_no)))
894 sql_print_error(
"Event Scheduler: Error while loading from disk.");
903 delete db_repository;
// Clears the thread-specific THD pointer.
909 my_pthread_setspecific_ptr(THR_THD, NULL);
// Tears down the events subsystem; db_repository is deleted here.
// NOTE(review): the body of the `!= EVENTS_DISABLED` branch is not visible in
// this view.
927 DBUG_ENTER(
"Events::deinit");
929 if (opt_event_scheduler != EVENTS_DISABLED)
937 delete db_repository;
// Performance Schema instrumentation for the event scheduler: key variables
// and the info tables that pair each key with its instrument name and flags.
// NOTE(review): braces, the stage array declaration, the registration calls
// and the matching #endif lines are not visible in this view.
943 #ifdef HAVE_PSI_INTERFACE
// Mutex instruments.
944 PSI_mutex_key key_LOCK_event_queue,
945 key_event_scheduler_LOCK_scheduler_state;
947 static PSI_mutex_info all_events_mutexes[]=
949 { &key_LOCK_event_queue,
"LOCK_event_queue", PSI_FLAG_GLOBAL},
950 { &key_event_scheduler_LOCK_scheduler_state,
"Event_scheduler::LOCK_scheduler_state", PSI_FLAG_GLOBAL}
// Condition-variable instruments.
953 PSI_cond_key key_event_scheduler_COND_state, key_COND_queue_state;
955 static PSI_cond_info all_events_conds[]=
957 { &key_event_scheduler_COND_state,
"Event_scheduler::COND_state", PSI_FLAG_GLOBAL},
958 { &key_COND_queue_state,
"COND_queue_state", PSI_FLAG_GLOBAL},
// Thread instruments; only the scheduler thread carries PSI_FLAG_GLOBAL, the
// worker entry uses flags 0.
961 PSI_thread_key key_thread_event_scheduler, key_thread_event_worker;
963 static PSI_thread_info all_events_threads[]=
965 { &key_thread_event_scheduler,
"event_scheduler", PSI_FLAG_GLOBAL},
966 { &key_thread_event_worker,
"event_worker", 0}
// Stage descriptors for the three waiting states named in their strings.
970 PSI_stage_info stage_waiting_on_empty_queue= { 0,
"Waiting on empty queue", 0};
971 PSI_stage_info stage_waiting_for_next_activation= { 0,
"Waiting for next activation", 0};
972 PSI_stage_info stage_waiting_for_scheduler_to_stop= { 0,
"Waiting for the scheduler to stop", 0};
974 #ifdef HAVE_PSI_INTERFACE
977 & stage_waiting_on_empty_queue,
978 & stage_waiting_for_next_activation,
979 & stage_waiting_for_scheduler_to_stop
// Computes the element count of each table under the "sql" category;
// presumably each count feeds a registration call on the missing lines —
// confirm.
982 static void init_events_psi_keys(
void)
984 const char* category=
"sql";
987 count= array_elements(all_events_mutexes);
990 count= array_elements(all_events_conds);
993 count= array_elements(all_events_threads);
996 count= array_elements(all_events_stages);
// Call site of init_events_psi_keys (from a function whose header is not
// visible), compiled only with HAVE_PSI_INTERFACE.
1013 #ifdef HAVE_PSI_INTERFACE
1014 init_events_psi_keys();
// Prints a status report to stdout with puts(): a header, a legend for the
// abbreviations, then either a "disabled" notice or the scheduler and queue
// dumps.
// NOTE(review): the `else` joining the disabled check to the two dump calls is
// presumably on a line not visible in this view — confirm.
1030 Events::dump_internal_status()
1032 DBUG_ENTER(
"Events::dump_internal_status");
1033 puts(
"\n\n\nEvents status:");
1034 puts(
"LLA = Last Locked At LUA = Last Unlocked At");
1035 puts(
"WOC = Waiting On Condition DL = Data Locked");
1042 if (opt_event_scheduler == EVENTS_DISABLED)
1043 puts(
"The Event Scheduler is disabled");
1046 scheduler->dump_internal_status();
1047 event_queue->dump_internal_status();
// Thin forwarder: starts the scheduler and returns its result; err_no is
// passed through to scheduler->start().
1054 bool Events::start(
int *err_no)
1056 return scheduler->
start(err_no);
// Counterpart forwarder returning scheduler->stop(); its signature is not
// visible in this view.
1061 return scheduler->
stop();
// Reads rows of mysql.event one at a time at startup, logs failures, and
// finally logs how many events were loaded.
// NOTE(review): the return type, the table-open call, the per-row loading and
// queueing logic and the conditions guarding the row delete are on lines not
// visible in this view.
1083 Events::load_events_from_db(THD *thd)
1089 ulong saved_master_access;
1091 DBUG_ENTER(
"Events::load_events_from_db");
// NOTE(review): the pointer is cast to long for "%lx"; on platforms where long
// is narrower than a pointer this would truncate — confirm whether that matters
// for this debug trace.
1092 DBUG_PRINT(
"enter", (
"thd: 0x%lx", (
long) thd));
// Temporarily add SUPER_ACL and clear tx_read_only; both are restored just
// below. Presumably this brackets opening mysql.event — confirm.
1103 saved_master_access= thd->security_ctx->master_access;
1104 thd->security_ctx->master_access |= SUPER_ACL;
1105 bool save_tx_read_only= thd->tx_read_only;
1106 thd->tx_read_only=
false;
1110 thd->tx_read_only= save_tx_read_only;
1111 thd->security_ctx->master_access= saved_master_access;
1115 sql_print_error(
"Event Scheduler: Failed to open table mysql.event");
1119 if (init_read_record(&read_record_info, thd, table, NULL, 0, 1, FALSE))
1121 sql_print_error(
"Event Scheduler: Error while starting read of mysql.event");
// Iterate until read_record returns non-zero.
1124 while (!(read_record_info.read_record(&read_record_info)))
1127 bool created, dropped;
1132 DBUG_PRINT(
"info", (
"Loading event from row."));
1136 sql_print_error(
"Event Scheduler: "
1137 "Error while loading events from mysql.event. "
1138 "The table probably contains bad data or is corrupted");
1148 dropped= et->dropped;
// Deletes the current row from the table; presumably only for events flagged
// as dropped above — confirm against the missing lines.
1168 int rc= table->file->ha_delete_row(table->record[0]);
// Pluralizes "event" unless exactly one was loaded.
1176 sql_print_information(
"Event Scheduler: Loaded %d event%s",
1177 count, (count == 1) ?
"" :
"s");
1181 end_read_record(&read_record_info);