18 #include "sql_binlog.h"
19 #include "sql_parse.h"
22 #include "rpl_info_factory.h"
48 if (fd_event && fd_event->event_type_permutation)
53 DBUG_PRINT(
"info", (
"converting event type %d to %d (%s)",
56 type= fd_event->event_type_permutation[
type];
62 case FORMAT_DESCRIPTION_EVENT:
73 case ROWS_QUERY_LOG_EVENT:
75 case WRITE_ROWS_EVENT:
76 case UPDATE_ROWS_EVENT:
77 case DELETE_ROWS_EVENT:
78 case WRITE_ROWS_EVENT_V1:
79 case UPDATE_ROWS_EVENT_V1:
80 case DELETE_ROWS_EVENT_V1:
81 case PRE_GA_WRITE_ROWS_EVENT:
82 case PRE_GA_UPDATE_ROWS_EVENT:
83 case PRE_GA_DELETE_ROWS_EVENT:
92 my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
106 my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT,
126 void mysql_client_binlog_statement(THD* thd)
128 DBUG_ENTER(
"mysql_client_binlog_statement");
129 DBUG_PRINT(
"info",(
"binlog base64: '%*s'",
130 (
int) (thd->lex->comment.length < 2048 ?
131 thd->lex->comment.length : 2048),
132 thd->lex->comment.str));
137 size_t coded_len= thd->lex->comment.length;
140 my_error(ER_SYNTAX_ERROR, MYF(0));
143 size_t decoded_len= base64_needed_decoded_length(coded_len);
150 ulonglong thd_options= thd->variables.option_bits;
165 if ((rli= Rpl_info_factory::create_rli(INFO_REPOSITORY_DUMMY, FALSE)))
172 const char *error= 0;
173 char *
buf= (
char *) my_malloc(decoded_len, MYF(MY_WME));
181 my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
185 DBUG_ASSERT(rli->belongs_to_client());
187 for (
char const *strptr= thd->lex->comment.str ;
188 strptr < thd->lex->comment.str + thd->lex->comment.length ; )
190 char const *endptr= 0;
191 int bytes_decoded= base64_decode(strptr, coded_len, buf, &endptr,
192 MY_BASE64_DECODE_ALLOW_MULTIPLE_CHUNKS);
200 (
"bytes_decoded: %d strptr: 0x%lx endptr: 0x%lx ('%c':%d)",
201 bytes_decoded, (
long) strptr, (
long) endptr, *endptr,
205 if (bytes_decoded < 0)
207 my_error(ER_BASE64_DECODE_ERROR, MYF(0));
210 else if (bytes_decoded == 0)
213 DBUG_ASSERT(bytes_decoded > 0);
214 DBUG_ASSERT(endptr > strptr);
215 coded_len-= endptr - strptr;
229 DBUG_PRINT(
"info",(
"binlog base64 decoded_len: %lu bytes_decoded: %d",
230 (ulong) decoded_len, bytes_decoded));
236 for (
char *bufptr= buf ; bytes_decoded > 0 ; )
242 if (bytes_decoded < EVENT_LEN_OFFSET + 4 ||
243 (event_len= uint4korr(bufptr + EVENT_LEN_OFFSET)) >
244 (uint) bytes_decoded)
246 my_error(ER_SYNTAX_ERROR, MYF(0));
249 DBUG_PRINT(
"info", (
"event_len=%lu, bytes_decoded=%d",
250 event_len, bytes_decoded));
252 if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli))
255 ev= Log_event::read_log_event(bufptr, event_len, &error,
259 DBUG_PRINT(
"info",(
"binlog base64 err=%s", error));
266 my_error(ER_SYNTAX_ERROR, MYF(0));
270 bytes_decoded -= event_len;
273 DBUG_PRINT(
"info",(
"ev->get_type_code()=%d", ev->get_type_code()));
283 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
284 err= ev->apply_event(rli);
294 if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT &&
295 ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
306 my_error(ER_UNKNOWN_ERROR, MYF(0));
312 DBUG_PRINT(
"info",(
"binlog base64 execution finished successfully"));
318 if ((error || err) && rli->rows_query_ev)
320 delete rli->rows_query_ev;
321 rli->rows_query_ev= NULL;
323 rli->slave_close_thread_tables(thd);
325 thd->variables.option_bits= thd_options;