374 #define MYSQL_SERVER 1
376 #include "sql_servers.h"
377 #include "sql_class.h"
378 #include "sql_analyse.h"
379 #include <mysql/plugin.h>
381 #include "ha_federated.h"
382 #include "probes_mysql.h"
384 #include "m_string.h"
387 #include <mysql/plugin.h>
/* Hash of FEDERATED_SHARE objects; initialised in federated_db_init with
   federated_get_key as the key extractor and searched in get_share. */
395 static HASH federated_open_tables;
/* Quote character passed to append_ident for identifiers in generated SQL. */
397 static char ident_quote_char=
'`';
/* Quote character appended around field values in generated SQL. */
399 static char value_quote_char=
'\'';
/* Slack added to the packet-size check in write_row for bulk inserts. */
401 static const int bulk_padding= 64;
/* Byte lengths (without the terminating NUL) of the separators that the
   statement builders append after each item and later trim off the end. */
404 static const uint sizeof_trailing_comma=
sizeof(
", ") - 1;
405 static const uint sizeof_trailing_closeparen=
sizeof(
") ") - 1;
406 static const uint sizeof_trailing_and=
sizeof(
" AND ") - 1;
407 static const uint sizeof_trailing_where=
sizeof(
" WHERE ") - 1;
/* Forward declarations of the handlerton commit and rollback hooks that
   federated_db_init assigns to federated_hton. */
413 static int federated_commit(
handlerton *hton, THD *thd,
bool all);
414 static int federated_rollback(
handlerton *hton, THD *thd,
bool all);
/*
  Key extractor registered with federated_open_tables (see my_hash_init).

  Stores share->share_key_length in *length and returns share->share_key
  cast to uchar*. The third parameter is unused.
*/
428 static uchar *federated_get_key(
FEDERATED_SHARE *share,
size_t *length,
429 my_bool not_used __attribute__ ((unused)))
431 *length= share->share_key_length;
432 return (uchar*) share->share_key;
/* Performance-schema instrumentation, compiled only with HAVE_PSI_INTERFACE. */
435 #ifdef HAVE_PSI_INTERFACE
/* One key for the mutex labelled federated and one for the mutex labelled
   FEDERATED_SHARE::mutex. */
436 static PSI_mutex_key fe_key_mutex_federated, fe_key_mutex_FEDERATED_SHARE_mutex;
/* Table pairing each key with its label and flags; the first entry is
   flagged PSI_FLAG_GLOBAL, the second has no flags. */
438 static PSI_mutex_info all_federated_mutexes[]=
440 { &fe_key_mutex_federated,
"federated", PSI_FLAG_GLOBAL},
441 { &fe_key_mutex_FEDERATED_SHARE_mutex,
"FEDERATED_SHARE::mutex", 0}
/* Computes the number of entries in all_federated_mutexes under the
   category name federated. NOTE(review): the registration call itself is
   not visible in this excerpt; confirm against the full file. */
444 static void init_federated_psi_keys(
void)
446 const char* category=
"federated";
449 count= array_elements(all_federated_mutexes);
/*
  Storage-engine initialisation entry point.

  Registers PSI keys when HAVE_PSI_INTERFACE is defined, fills in the
  federated_hton handlerton fields, and initialises the
  federated_open_tables hash with federated_get_key as key extractor.
*/
466 int federated_db_init(
void *p)
468 DBUG_ENTER(
"federated_db_init");
470 #ifdef HAVE_PSI_INTERFACE
471 init_federated_psi_keys();
/* Handlerton setup: engine state, type, transaction hooks, handler factory
   and capability flags (no ALTER support, no partitioning). */
475 federated_hton->state= SHOW_OPTION_YES;
476 federated_hton->db_type= DB_TYPE_FEDERATED_DB;
477 federated_hton->commit= federated_commit;
478 federated_hton->rollback= federated_rollback;
479 federated_hton->create= federated_create_handler;
480 federated_hton->flags= HTON_ALTER_NOT_SUPPORTED | HTON_NO_PARTITION;
/* NOTE(review): commit and rollback are set above and then set to 0 here.
   The lines in between are not visible in this excerpt, so it cannot be
   told from here whether this reset is conditional. */
486 federated_hton->commit= 0;
487 federated_hton->rollback= 0;
/* Tail of a call that initialises federated_mutex; the call head is not
   visible in this excerpt. */
490 &federated_mutex, MY_MUTEX_INIT_FAST))
/* Hash of open shares: binary charset, initial size 32. */
492 if (!my_hash_init(&federated_open_tables, &my_charset_bin, 32, 0, 0,
493 (my_hash_get_key) federated_get_key, 0, 0))
/* Engine shutdown hook: frees the federated_open_tables hash that
   federated_db_init initialised. Other teardown steps, if any, are not
   visible in this excerpt. */
514 int federated_done(
void *p)
516 my_hash_free(&federated_open_tables);
539 static bool append_ident(
String *
string,
const char *
name,
size_t length,
540 const char quote_char)
544 const char *name_end;
545 DBUG_ENTER(
"append_ident");
549 string->reserve((uint) length * 2 + 2);
550 if ((result= string->append("e_char, 1, system_charset_info)))
553 for (name_end= name+length; name < name_end; name+= clen)
555 uchar c= *(uchar *) name;
556 if (!(clen= my_mbcharlen(system_charset_info, c)))
558 if (clen == 1 && c == (uchar) quote_char &&
559 (result=
string->append("e_char, 1, system_charset_info)))
561 if ((result= string->append(name, clen, string->charset())))
564 result=
string->append("e_char, 1, system_charset_info);
567 result=
string->append(name, (uint) length, system_charset_info);
/*
  Body fragment of the URL-parse error reporter (name taken from the
  DBUG_ENTER tag; the signature is not visible in this excerpt).

  Copies a bounded prefix of the table connect string into a local buffer,
  raises error_num through my_error() with that buffer as its argument,
  and returns error_num.
*/
576 char buf[FEDERATED_QUERY_BUFFER_SIZE];
578 DBUG_ENTER(
"ha_federated parse_url_error");
/* Clamp the copy length to one less than the buffer size. */
580 buf_len= min<size_t>(table->s->connect_string.length,
581 FEDERATED_QUERY_BUFFER_SIZE-1);
582 strmake(buf, table->s->connect_string.str, buf_len);
583 my_error(error_num, MYF(0), buf);
584 DBUG_RETURN(error_num);
/*
  Body fragment of ha_federated::get_connection (name taken from the
  DBUG_ENTER tag; the signature is not visible in this excerpt).

  Looks up a server definition by share->connection_string through
  get_server_by_name() and copies its fields (server name, username,
  password, database, port, host, socket, scheme) into the share.
  On the error path it reports that the server name does not exist and
  returns error_num.
*/
594 int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
596 DBUG_ENTER(
"ha_federated::get_connection");
603 get_server_by_name(mem_root, share->connection_string, &server_buffer)))
605 DBUG_PRINT(
"info", (
"get_server_by_name returned > 0 error condition!"));
/* Lookup failed: switch to the cannot-create error code. */
606 error_num= ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE;
609 DBUG_PRINT(
"info", (
"get_server_by_name returned server at %lx",
610 (
long unsigned int) server));
/* Copy the connection parameters from the server record into the share. */
619 share->server_name_length= server->server_name_length;
620 share->server_name= server->server_name;
621 share->username= server->username;
622 share->password= server->password;
623 share->database= server->db;
/* Port validation: the default build accepts 1..65535, the I_AM_PARANOID
   build accepts only 1024..65535; anything else falls back to MYSQL_PORT. */
624 #ifndef I_AM_PARANOID
625 share->port= server->port > 0 && server->port < 65536 ?
627 share->port= server->port > 1023 && server->port < 65536 ?
629 (ushort) server->port : MYSQL_PORT;
630 share->hostname= server->host;
/* No socket configured and the host equals my_localhost: use the default
   unix socket path. */
631 if (!(share->socket= server->socket) &&
632 !strcmp(share->hostname, my_localhost))
633 share->socket= (
char *) MYSQL_UNIX_ADDR;
634 share->scheme= server->scheme;
636 DBUG_PRINT(
"info", (
"share->username %s", share->username));
/* NOTE(review): the remote password is written to the debug trace here. */
637 DBUG_PRINT(
"info", (
"share->password %s", share->password));
638 DBUG_PRINT(
"info", (
"share->hostname %s", share->hostname));
639 DBUG_PRINT(
"info", (
"share->database %s", share->database));
640 DBUG_PRINT(
"info", (
"share->port %d", share->port));
641 DBUG_PRINT(
"info", (
"share->socket %s", share->socket));
/* Error exit: report the unknown server name and return the error code. */
645 my_printf_error(error_num,
"server name: '%s' doesn't exist!",
646 MYF(0), share->connection_string);
647 DBUG_RETURN(error_num);
/*
  Body fragment of ha_federated::parse_url (name taken from the DBUG_ENTER
  tag; only the last parameter of the signature is visible here).

  Copies the table connect string into mem_root and splits it in place
  into the fields of the share. Two input shapes are handled:

    - a string with neither a scheme separator nor an at-sign, which the
      debug text calls internal format: a server name optionally followed
      by a slash and a table name, resolved through get_connection();
    - a URL that is split on the scheme separator, at-sign, colon and
      slash characters into scheme, username, password, hostname, port,
      database and table name.

  The error code depends on table_create_flag and is reported through
  parse_url_error().
*/
706 uint table_create_flag)
708 uint error_num= (table_create_flag ?
709 ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
710 ER_FOREIGN_DATA_STRING_INVALID);
711 DBUG_ENTER(
"ha_federated::parse_url");
715 DBUG_PRINT(
"info", (
"share at %lx", (
long unsigned int) share));
716 DBUG_PRINT(
"info", (
"Length: %u", (uint) table->s->connect_string.length));
717 DBUG_PRINT(
"info", (
"String: '%.*s'", (
int) table->s->connect_string.length,
718 table->s->connect_string.str));
/* Private, writable copy of the connect string; every later split writes
   terminators into this copy. */
719 share->connection_string= strmake_root(mem_root, table->s->connect_string.str,
720 table->s->connect_string.length);
722 DBUG_PRINT(
"info",(
"parse_url alloced share->connection_string %lx",
723 (
long unsigned int) share->connection_string));
725 DBUG_PRINT(
"info",(
"share->connection_string %s",share->connection_string));
/* Neither a scheme separator nor an at-sign: internal (server name) format. */
730 if ( (!strstr(share->connection_string,
"://") &&
731 (!strchr(share->connection_string,
'@'))))
735 (
"share->connection_string %s internal format \
736 share->connection_string %lx",
737 share->connection_string,
738 (
long unsigned int) share->connection_string));
741 share->parsed= FALSE;
/* Optional table name after the first slash; the slash is overwritten
   with a terminator to cut the server name off. */
747 if ((share->table_name= strchr(share->connection_string,
'/')))
749 share->connection_string[share->table_name - share->connection_string]=
'\0';
751 share->table_name_length= (uint) strlen(share->table_name);
754 (
"internal format, parsed table_name share->connection_string \
755 %s share->table_name %s",
756 share->connection_string, share->table_name));
/* Check for a further slash inside the table name. */
761 if (strchr(share->table_name,
'/'))
/* No table name given: default to the local table name. */
775 share->table_name= strmake_root(mem_root, table->s->table_name.str,
776 (share->table_name_length= table->s->table_name.length));
778 (
"internal format, default table_name share->connection_string \
779 %s share->table_name %s",
780 share->connection_string, share->table_name));
/* Resolve the remaining connection parameters from the server record. */
783 if ((error_num= get_connection(mem_root, share)))
/* URL format from here on. */
790 share->connection_string[table->s->connect_string.length]= 0;
791 share->scheme= share->connection_string;
792 DBUG_PRINT(
"info",(
"parse_url alloced share->scheme %lx",
793 (
long unsigned int) share->scheme));
/* Split scheme from the rest at the scheme separator. */
799 if (!(share->username= strstr(share->scheme,
"://")))
801 share->scheme[share->username - share->scheme]=
'\0';
/* The scheme is compared against the literal mysql. */
803 if (strcmp(share->scheme,
"mysql") != 0)
/* Split the credentials from the host part at the at-sign. */
808 if (!(share->hostname= strchr(share->username,
'@')))
811 share->username[share->hostname - share->username]=
'\0';
/* Optional password after a colon in the credentials. */
814 if ((share->password= strchr(share->username,
':')))
816 share->username[share->password - share->username]=
'\0';
/* NOTE(review): self-assignment, has no effect. */
818 share->username= share->username;
820 if ((strchr(share->password,
'/') || strchr(share->hostname,
'@')))
/* An empty password is stored as NULL. */
827 if (share->password[0] ==
'\0')
828 share->password= NULL;
/* NOTE(review): self-assignment, has no effect. */
831 share->username= share->username;
834 if ((strchr(share->username,
'/')) || (strchr(share->hostname,
'@')))
/* Split host from database at the first slash. */
837 if (!(share->database= strchr(share->hostname,
'/')))
839 share->hostname[share->database - share->hostname]=
'\0';
/* Optional port after a colon in the host part. */
842 if ((share->sport= strchr(share->hostname,
':')))
844 share->hostname[share->sport - share->hostname]=
'\0';
846 if (share->sport[0] ==
'\0')
/* NOTE(review): atoi gives no error indication for non-numeric or
   out-of-range input; the value is not range-checked in the visible code. */
849 share->port= atoi(share->sport);
/* Split database from table name at the next slash. */
852 if (!(share->table_name= strchr(share->database,
'/')))
854 share->database[share->table_name - share->database]=
'\0';
857 share->table_name_length= strlen(share->table_name);
860 if ((strchr(share->table_name,
'/')))
/* An empty hostname is stored as NULL. */
870 if (share->hostname[0] ==
'\0')
871 share->hostname= NULL;
/* Missing host or my_localhost: use the default unix socket path. */
876 if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
877 share->socket= (
char*) MYSQL_UNIX_ADDR;
879 share->port= MYSQL_PORT;
883 (
"scheme: %s username: %s password: %s \
884 hostname: %s port: %d db: %s tablename: %s",
885 share->scheme, share->username, share->password,
886 share->hostname, share->port, share->database,
/* Error exit: report through parse_url_error and return its result. */
892 DBUG_RETURN(parse_url_error(share, table, error_num));
/* NOTE(review): looks like the tail of the ha_federated constructor (member
   initialisers followed by the body); the head is not visible here.
   Initialises mysql and stored_result to 0 and zero-fills bulk_insert. */
902 mysql(0), stored_result(0)
905 memset(&bulk_insert, 0,
sizeof(bulk_insert));
/*
  Copy one fetched remote row into a local record buffer.

  Walks table->field in step with the row values and their lengths (from
  mysql_fetch_lengths). Each field is temporarily re-based onto `record`
  through move_field_offset(), set to NULL or stored from the row text,
  and then moved back.
*/
929 uint ha_federated::convert_row_to_internal_format(uchar *
record,
/* Debug-build column map override for the write set; restored at the end. */
935 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
936 DBUG_ENTER(
"ha_federated::convert_row_to_internal_format");
938 lengths= mysql_fetch_lengths(result);
940 for (field= table->field; *field; field++, row++, lengths++)
/* Offset of the target record relative to table->record[0]. */
946 my_ptrdiff_t old_ptr;
947 old_ptr= (my_ptrdiff_t) (record - table->record[0]);
948 (*field)->move_field_offset(old_ptr);
951 (*field)->set_null();
/* Only columns in the read set are stored. */
956 if (bitmap_is_set(table->read_set, (*field)->field_index))
958 (*field)->set_notnull();
959 (*field)->store(*row, *lengths, &my_charset_bin);
/* Undo the re-basing. */
962 (*field)->move_field_offset(-old_ptr);
964 dbug_tmp_restore_column_map(table->write_set, old_map);
/* Body fragment of emit_key_part_name (name taken from the DBUG_ENTER tag;
   the signature is not visible here). Appends the field name of the key
   part to `to`, quoted with ident_quote_char through append_ident. */
970 DBUG_ENTER(
"emit_key_part_name");
971 if (append_ident(to, part->field->field_name,
972 strlen(part->field->field_name), ident_quote_char))
/*
  Body fragment of emit_key_part_element (name taken from the DBUG_ENTER
  tag; the start of the signature is not visible here).

  Appends the SQL text of one key value to `to`. When needs_quotes is set
  the value is wrapped in single quotes; when is_like is set a percent
  sign is appended after the value. The encoding depends on the key part:
  bit keys are emitted as hex digits, blob and variable-length parts are
  read with a two-byte length prefix and escaped, everything else goes
  through Field::val_str.
*/
978 bool needs_quotes,
bool is_like,
979 const uchar *ptr, uint len)
981 Field *field= part->field;
982 DBUG_ENTER(
"emit_key_part_element");
/* Opening quote. */
984 if (needs_quotes && to->append(STRING_WITH_LEN(
"'")))
/* Bit key: hex-encode the raw key bytes. */
987 if (part->type == HA_KEYTYPE_BIT)
989 char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;
993 buf= octet2hex(buf, (
char*) ptr, len);
994 if (to->append((
char*) buff, (uint)(buf - buff)))
/* Blob key part: length from the first two bytes, data after
   HA_KEY_BLOB_LENGTH bytes. */
997 else if (part->key_part_flag & HA_BLOB_PART)
1000 uint blob_length= uint2korr(ptr);
1001 blob.set_quick((
char*) ptr+HA_KEY_BLOB_LENGTH,
1002 blob_length, &my_charset_bin);
1003 if (append_escaped(to, &blob))
/* Variable-length key part: same layout as the blob case. */
1006 else if (part->key_part_flag & HA_VAR_LENGTH_PART)
1009 uint var_length= uint2korr(ptr);
1010 varchar.set_quick((
char*) ptr+HA_KEY_BLOB_LENGTH,
1011 var_length, &my_charset_bin);
1012 if (append_escaped(to, &varchar))
/* Fixed-length key part: convert through the field. */
1017 char strbuff[MAX_FIELD_WIDTH];
1018 String str(strbuff,
sizeof(strbuff), part->field->charset()), *res;
1020 res= field->val_str(&str, ptr);
/* String results are escaped, other results are appended as-is. */
1022 if (field->result_type() == STRING_RESULT)
1024 if (append_escaped(to, res))
1027 else if (to->append(res->ptr(), res->length()))
/* LIKE wildcard suffix. */
1031 if (is_like && to->append(STRING_WITH_LEN(
"%")))
/* Closing quote. */
1034 if (needs_quotes && to->append(STRING_WITH_LEN(
"'")))
/*
  Build a WHERE clause from up to two key ranges and append it to `to`.

  The clause is assembled in a local String. For each non-NULL range
  (start_key first, then end_key) the key parts are walked in order and a
  comparison is emitted per part; the operator depends on the range flag
  and, for exact matches, on from_records_in_range. Parts are joined with
  AND and the two ranges are joined with a closing and opening
  parenthesis around AND. On success the text WHERE followed by the
  assembled clause is appended to `to`.

  Several parameters and local declarations are not visible in this
  excerpt.
*/
1278 bool ha_federated::create_where_from_key(
String *to,
1282 bool from_records_in_range,
1286 (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
1288 uint remainder, length;
1289 char tmpbuff[FEDERATED_QUERY_BUFFER_SIZE];
1290 String tmp(tmpbuff,
sizeof(tmpbuff), system_charset_info);
1291 const key_range *ranges[2]= { start_key, end_key };
1292 my_bitmap_map *old_map;
1293 DBUG_ENTER(
"ha_federated::create_where_from_key");
/* Nothing to do when neither range is given. */
1296 if (start_key == NULL && end_key == NULL)
1299 old_map= dbug_tmp_use_all_columns(table, table->write_set);
/* Index 0 is start_key, index 1 is end_key. */
1300 for (uint
i= 0;
i <= 1;
i++)
1304 if (ranges[
i] == NULL)
/* Separator between the two range groups, or opener for the first. */
1310 tmp.append(STRING_WITH_LEN(
") AND ("));
1312 tmp.append(STRING_WITH_LEN(
" ("));
/* Walk the key parts of this range. */
1315 for (key_part= key_info->key_part,
1317 length= ranges[
i]->length,
1318 ptr= ranges[i]->key; ;
1322 Field *field= key_part->field;
1323 uint store_length= key_part->store_length;
1324 uint part_length= min(store_length, length);
1325 needs_quotes= field->str_needs_quotes();
1326 DBUG_DUMP(
"key, start of loop", ptr, length);
/* Nullable key part: emits IS NULL for an exact match and IS NOT NULL
   otherwise. The test that selects this path is not visible here. */
1328 if (key_part->null_bit)
1337 if (emit_key_part_name(&tmp, key_part) ||
1338 (ranges[i]->flag == HA_READ_KEY_EXACT ?
1339 tmp.append(STRING_WITH_LEN(
" IS NULL ")) :
1340 tmp.append(STRING_WITH_LEN(
" IS NOT NULL "))))
1346 goto prepare_for_next_key_part;
1350 if (tmp.append(STRING_WITH_LEN(
" (")))
/* Choose the comparison operator from the range flag. */
1353 switch (ranges[i]->flag) {
1354 case HA_READ_KEY_EXACT:
1355 DBUG_PRINT(
"info", (
"federated HA_READ_KEY_EXACT %d", i));
/* Full key part, bit key or non-string result: direct comparison. */
1356 if (store_length >= length ||
1358 key_part->type == HA_KEYTYPE_BIT ||
1359 field->result_type() != STRING_RESULT)
1361 if (emit_key_part_name(&tmp, key_part))
1364 if (from_records_in_range)
1366 if (tmp.append(STRING_WITH_LEN(
" >= ")))
1371 if (tmp.append(STRING_WITH_LEN(
" = ")))
1375 if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
/* Otherwise a LIKE match with is_like set. */
1382 if (emit_key_part_name(&tmp, key_part) ||
1383 tmp.append(STRING_WITH_LEN(
" LIKE ")) ||
1384 emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
1389 case HA_READ_AFTER_KEY:
/* Always-true placeholder condition; the test guarding it is not visible. */
1392 if (tmp.append(
"1=1"))
1396 DBUG_PRINT(
"info", (
"federated HA_READ_AFTER_KEY %d", i));
1397 if ((store_length >= length) || (i > 0))
1399 if (emit_key_part_name(&tmp, key_part))
1404 if (tmp.append(STRING_WITH_LEN(
" <= ")))
1409 if (tmp.append(STRING_WITH_LEN(
" > ")))
1413 if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1420 case HA_READ_KEY_OR_NEXT:
1421 DBUG_PRINT(
"info", (
"federated HA_READ_KEY_OR_NEXT %d", i));
1422 if (emit_key_part_name(&tmp, key_part) ||
1423 tmp.append(STRING_WITH_LEN(
" >= ")) ||
1424 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1428 case HA_READ_BEFORE_KEY:
1429 DBUG_PRINT(
"info", (
"federated HA_READ_BEFORE_KEY %d", i));
1430 if (store_length >= length)
1432 if (emit_key_part_name(&tmp, key_part) ||
1433 tmp.append(STRING_WITH_LEN(
" < ")) ||
1434 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
1439 case HA_READ_KEY_OR_PREV:
1440 DBUG_PRINT(
"info", (
"federated HA_READ_KEY_OR_PREV %d", i));
1441 if (emit_key_part_name(&tmp, key_part) ||
1442 tmp.append(STRING_WITH_LEN(
" <= ")) ||
1443 emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
/* Unhandled flag value. */
1448 DBUG_PRINT(
"info",(
"cannot handle flag %d", ranges[i]->flag));
1451 if (tmp.append(STRING_WITH_LEN(
") ")))
/* Advance to the next key part, or stop once the key bytes are used up. */
1454 prepare_for_next_key_part:
1455 if (store_length >= length)
1457 DBUG_PRINT(
"info", (
"remainder %d", remainder));
1458 DBUG_ASSERT(remainder > 1);
1459 length-= store_length;
/* The pointer advance is reduced by one when the part has a null bit. */
1465 ptr+= store_length -
test(key_part->null_bit);
1466 if (tmp.append(STRING_WITH_LEN(
" AND ")))
1470 (
"create_where_from_key WHERE clause: %s",
1471 tmp.c_ptr_quick()));
1474 dbug_tmp_restore_column_map(table->write_set, old_map);
/* Close the last group and hand the clause to the caller. */
1477 if (tmp.append(STRING_WITH_LEN(
") ")))
1480 if (to->append(STRING_WITH_LEN(
" WHERE ")))
1483 if (to->append(tmp))
/* Second restore of the column map; presumably the error exit. */
1489 dbug_tmp_restore_column_map(table->write_set, old_map);
/*
  Body fragment of get_share (name taken from the DBUG_ENTER tag; the
  signature is not visible in this excerpt).

  Parses the connect string into a temporary share, looks the share key up
  in federated_open_tables and, when it is not found, builds the cached
  SELECT statement for the table, copies the share into the mem_root,
  inserts it into the hash and initialises its lock and mutex.
*/
1501 char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1503 String query(query_buffer,
sizeof(query_buffer), &my_charset_bin);
1506 DBUG_ENTER(
"ha_federated.cc::get_share");
1514 init_alloc_root(&mem_root, 256, 0);
1519 tmp_share.share_key_length= (uint) strlen(table_name);
1520 if (parse_url(&mem_root, &tmp_share, table, 0))
/* Look for an existing share with this key. */
1524 if (!(share= (
FEDERATED_SHARE *) my_hash_search(&federated_open_tables,
1525 (uchar*) tmp_share.share_key,
/* Not found: build SELECT with the quoted column list FROM the quoted
   remote table name. */
1529 query.set_charset(system_charset_info);
1530 query.append(STRING_WITH_LEN(
"SELECT "));
1531 for (field= table->field; *field; field++)
1533 append_ident(&
query, (*field)->field_name,
1534 strlen((*field)->field_name), ident_quote_char);
1535 query.append(STRING_WITH_LEN(
", "));
/* Drop the separator left after the last column. */
1538 query.length(
query.length() - sizeof_trailing_comma);
1540 query.append(STRING_WITH_LEN(
" FROM "));
1542 append_ident(&
query, tmp_share.table_name,
1543 tmp_share.table_name_length, ident_quote_char);
/* Duplicate the temporary share and the query text into the mem_root.
   NOTE(review): the copy length is query.length() + 1, which reads one
   byte past the built text; confirm the buffer is terminated there. */
1545 if (!(share= (
FEDERATED_SHARE *) memdup_root(&mem_root, (
char*)&tmp_share,
sizeof(*share))) ||
1546 !(share->select_query= (
char*) strmake_root(&mem_root,
query.ptr(),
query.length() + 1)))
/* The share takes over the mem_root by value. */
1549 share->use_count= 0;
1550 share->mem_root= mem_root;
1553 (
"share->select_query %s", share->select_query));
1555 if (my_hash_insert(&federated_open_tables, (uchar*) share))
1557 thr_lock_init(&share->lock);
1559 &share->mutex, MY_MUTEX_INIT_FAST);
/* Two places release the local mem_root; the conditions that lead to
   each are not visible in this excerpt. */
1562 free_root(&mem_root, MYF(0));
1571 free_root(&mem_root, MYF(0));
/*
  Body fragment of free_share (name taken from the DBUG_ENTER tag; the
  signature is not visible in this excerpt).

  Decrements the use count of the share. When it reaches zero the share is
  removed from federated_open_tables, its lock is deleted and its memory
  is released.
*/
/* Local copy of the mem_root, taken before the share memory is released
   through it. */
1584 MEM_ROOT mem_root= share->mem_root;
1585 DBUG_ENTER(
"free_share");
1588 if (!--share->use_count)
1590 my_hash_delete(&federated_open_tables, (uchar*) share);
1591 thr_lock_delete(&share->lock);
1593 free_root(&mem_root, MYF(0));
/* Range cardinality estimate: the visible body returns the constant
   FEDERATED_RECORDS_IN_RANGE without consulting the remote server. */
1601 ha_rows ha_federated::records_in_range(uint inx,
key_range *start_key,
1611 DBUG_ENTER(
"ha_federated::records_in_range");
1612 DBUG_RETURN(FEDERATED_RECORDS_IN_RANGE);
/* Static array of C strings; its initialiser is not visible in this
   excerpt. NOTE(review): presumably the file extension list of the
   engine - confirm against the full file. */
1623 static const char *ext[]=
/*
  Open the handler for a table.

  Obtains the share for `name` through get_share(), initialises the
  handler lock data against the share lock, and sets up the dynamic array
  that holds MYSQL_RES pointers (initial size 4, growth 4). The handler
  must not yet hold a connection (asserted).
*/
1642 int ha_federated::open(
const char *name,
int mode, uint test_if_locked)
1644 DBUG_ENTER(
"ha_federated::open");
1646 if (!(share= get_share(name, table)))
1648 thr_lock_data_init(&share->lock, &lock, NULL);
1650 DBUG_ASSERT(mysql == NULL);
1653 DBUG_PRINT(
"info", (
"ref_length: %u",
ref_length));
1655 my_init_dynamic_array(&results,
sizeof(
MYSQL_RES *), 4, 4);
/*
  Close the handler.

  Releases the results array, clears any error on the current thread
  (the reason is not visible in this excerpt), and returns the result of
  free_share() for the share.
*/
1673 int ha_federated::close(
void)
1675 DBUG_ENTER(
"ha_federated::close");
1679 delete_dynamic(&results);
1693 table->in_use->clear_error();
1695 DBUG_RETURN(free_share(share));
/*
  Build the head of an INSERT statement and append it to `query`.

  The verb is REPLACE INTO, INSERT IGNORE INTO or INSERT INTO depending on
  replace_duplicates, ignore_duplicates and insert_dup_update. It is
  followed by the quoted remote table name, a parenthesised list of the
  columns in the write set, and the keyword VALUES.

  Returns the result of query->append().
*/
1710 bool ha_federated::append_stmt_insert(
String *
query)
1712 char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1715 bool added_field= FALSE;
1718 String insert_string(insert_buffer,
sizeof(insert_buffer), &my_charset_bin);
1719 DBUG_ENTER(
"ha_federated::append_stmt_insert");
1721 insert_string.length(0);
/* Pick the statement verb. */
1723 if (replace_duplicates)
1724 insert_string.append(STRING_WITH_LEN(
"REPLACE INTO "));
1725 else if (ignore_duplicates && !insert_dup_update)
1726 insert_string.append(STRING_WITH_LEN(
"INSERT IGNORE INTO "));
1728 insert_string.append(STRING_WITH_LEN(
"INSERT INTO "));
1729 append_ident(&insert_string, share->table_name, share->table_name_length,
/* Remember the length before the column list so it can be rolled back. */
1731 tmp_length= insert_string.length();
1732 insert_string.append(STRING_WITH_LEN(
" ("));
/* One quoted column name per field in the write set. */
1738 for (field= table->field; *field; field++)
1740 if (bitmap_is_set(table->write_set, (*field)->field_index))
1743 append_ident(&insert_string, (*field)->field_name,
1744 strlen((*field)->field_name), ident_quote_char);
1752 insert_string.append(STRING_WITH_LEN(
", "));
/* Trim the separator after the last column and close the list. */
1760 insert_string.length(insert_string.length() - sizeof_trailing_comma);
1761 insert_string.append(STRING_WITH_LEN(
") "));
/* Roll back to the saved length, which removes the column list; the
   condition for this path is not visible in this excerpt. */
1766 insert_string.length(tmp_length);
1769 insert_string.append(STRING_WITH_LEN(
" VALUES "));
1771 DBUG_RETURN(query->append(insert_string));
/*
  Insert one row on the remote server.

  Builds the parenthesised value list for the columns in the write set.
  In bulk mode the list is accumulated in bulk_insert and sent when the
  next addition would exceed the connection packet size; otherwise a
  complete INSERT statement is sent immediately. After a successful
  single insert the auto-increment value is refreshed when the table has
  a next_number_field.

  NOTE(review): the return values of the String append calls in this
  function are not checked in the visible code.
*/
1789 int ha_federated::write_row(uchar *buf)
1791 char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1792 char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
1796 bool use_bulk_insert;
1797 bool auto_increment_update_required= (table->next_number_field != NULL);
1800 String values_string(values_buffer,
sizeof(values_buffer), &my_charset_bin);
1802 String insert_field_value_string(insert_field_value_buffer,
1803 sizeof(insert_field_value_buffer),
1805 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1806 DBUG_ENTER(
"ha_federated::write_row");
1808 values_string.length(0);
1809 insert_field_value_string.length(0);
1810 ha_statistic_increment(&SSV::ha_write_count);
/* Bulk mode needs an allocated bulk buffer and either no insert_dup_update
   or replace_duplicates. Outside bulk mode the statement head is emitted
   right away. */
1821 if (!(use_bulk_insert= bulk_insert.str &&
1822 (!insert_dup_update || replace_duplicates)))
1823 append_stmt_insert(&values_string);
1825 values_string.append(STRING_WITH_LEN(
" ("));
/* Length before any value is added; used to detect an empty list. */
1826 tmp_length= values_string.length();
/* One value per field in the write set. */
1832 for (field= table->field; *field; field++)
1834 if (bitmap_is_set(table->write_set, (*field)->field_index))
1836 if ((*field)->is_null())
1837 values_string.append(STRING_WITH_LEN(
" NULL "));
/* The quote appends below are presumably guarded by needs_quote; the
   guards are not visible in this excerpt. */
1840 bool needs_quote= (*field)->str_needs_quotes();
1841 (*field)->val_str(&insert_field_value_string);
1843 values_string.append(value_quote_char);
1844 insert_field_value_string.print(&values_string);
1846 values_string.append(value_quote_char);
1848 insert_field_value_string.length(0);
1857 values_string.append(STRING_WITH_LEN(
", "));
1860 dbug_tmp_restore_column_map(table->read_set, old_map);
/* Trim the trailing separator only when at least one value was added. */
1867 if (values_string.length() > tmp_length)
1870 values_string.length(values_string.length() - sizeof_trailing_comma);
1873 values_string.append(STRING_WITH_LEN(
") "));
1875 if (use_bulk_insert)
/* Flush the pending bulk statement when adding this row plus padding
   would exceed max_packet_size. */
1882 if (bulk_insert.length + values_string.length() + bulk_padding >
1883 mysql->net.max_packet_size && bulk_insert.length)
1885 error= real_query(bulk_insert.str, bulk_insert.length);
1886 bulk_insert.length= 0;
1889 auto_increment_update_required= FALSE;
/* Empty bulk buffer: start a new statement with the INSERT head. */
1891 if (bulk_insert.length == 0)
1893 char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
1894 String insert_string(insert_buffer,
sizeof(insert_buffer),
1896 insert_string.length(0);
1897 append_stmt_insert(&insert_string);
1898 dynstr_append_mem(&bulk_insert, insert_string.ptr(),
1899 insert_string.length());
/* Otherwise separate this row from the previous one with a comma. */
1902 dynstr_append_mem(&bulk_insert,
",", 1);
1904 dynstr_append_mem(&bulk_insert, values_string.ptr(),
1905 values_string.length());
/* Non-bulk path: send the single-row statement now. */
1909 error= real_query(values_string.ptr(), values_string.length());
1914 DBUG_RETURN(stash_remote_error());
1920 if (auto_increment_update_required)
1922 update_auto_increment();
1925 table->next_number_field->store(
stats.auto_increment_value, 1);
/*
  Body fragment of ha_federated::start_bulk_insert (name taken from the
  DBUG_ENTER tag; the signature is not visible in this excerpt).

  Frees any previous bulk buffer, connects to the remote server if there
  is no connection yet, and allocates an empty dynamic string with the
  memory page size as both initial size and growth increment.
*/
1944 DBUG_ENTER(
"ha_federated::start_bulk_insert");
1946 dynstr_free(&bulk_insert);
1961 if (!mysql && real_connect())
1964 page_size= (uint) my_getpagesize();
1966 if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
1969 bulk_insert.length= 0;
/*
  Body fragment of ha_federated::end_bulk_insert (name taken from the
  DBUG_ENTER tag; the signature is not visible in this excerpt).

  Sends any pending bulk statement, records a remote error if the query
  fails, refreshes the auto-increment value when the table has a
  next_number_field, frees the bulk buffer, and returns the error code
  after storing it in my_errno.
*/
1988 DBUG_ENTER(
"ha_federated::end_bulk_insert");
1990 if (bulk_insert.str && bulk_insert.length)
1992 if (real_query(bulk_insert.str, bulk_insert.length))
1993 error= stash_remote_error();
1995 if (table->next_number_field)
1996 update_auto_increment();
1999 dynstr_free(&bulk_insert);
2001 DBUG_RETURN(my_errno= error);
/*
  Refresh the auto-increment statistics through info(HA_STATUS_AUTO) and
  publish stats.auto_increment_value as the first successful insert id of
  the current statement on the current thread.
*/
2013 void ha_federated::update_auto_increment(
void)
2015 THD *thd= current_thd;
2016 DBUG_ENTER(
"ha_federated::update_auto_increment");
/* Explicitly qualified call to the ha_federated implementation of info(). */
2018 ha_federated::info(HA_STATUS_AUTO);
2019 thd->first_successful_insert_id_in_cur_stmt=
2020 stats.auto_increment_value;
2021 DBUG_PRINT(
"info",(
"last_insert_id: %ld", (
long)
stats.auto_increment_value));
/*
  Forward OPTIMIZE TABLE to the remote server.

  Builds the statement OPTIMIZE TABLE followed by the quoted remote table
  name and sends it. If the query fails the result of
  stash_remote_error() is returned.
*/
2026 int ha_federated::optimize(THD* thd,
HA_CHECK_OPT* check_opt)
2028 char query_buffer[STRING_BUFFER_USUAL_SIZE];
2029 String query(query_buffer,
sizeof(query_buffer), &my_charset_bin);
2030 DBUG_ENTER(
"ha_federated::optimize");
2034 query.set_charset(system_charset_info);
2035 query.append(STRING_WITH_LEN(
"OPTIMIZE TABLE "));
2036 append_ident(&query, share->table_name, share->table_name_length,
2039 if (real_query(query.ptr(), query.length()))
2041 DBUG_RETURN(stash_remote_error());
/*
  Body fragment of ha_federated::repair (name taken from the DBUG_ENTER
  tag; the signature is not visible in this excerpt).

  Builds REPAIR TABLE followed by the quoted remote table name, appends
  the QUICK, EXTENDED and USE_FRM options according to the check_opt
  flags, and sends the statement. If the query fails the result of
  stash_remote_error() is returned.
*/
2050 char query_buffer[STRING_BUFFER_USUAL_SIZE];
2051 String query(query_buffer,
sizeof(query_buffer), &my_charset_bin);
2052 DBUG_ENTER(
"ha_federated::repair");
2056 query.set_charset(system_charset_info);
2057 query.append(STRING_WITH_LEN(
"REPAIR TABLE "));
2058 append_ident(&query, share->table_name, share->table_name_length,
/* Map check option flags to statement keywords. */
2060 if (check_opt->flags & T_QUICK)
2061 query.append(STRING_WITH_LEN(
" QUICK"));
2062 if (check_opt->flags & T_EXTEND)
2063 query.append(STRING_WITH_LEN(
" EXTENDED"));
2064 if (check_opt->sql_flags & TT_USEFRM)
2065 query.append(STRING_WITH_LEN(
" USE_FRM"));
2067 if (real_query(query.ptr(), query.length()))
2069 DBUG_RETURN(stash_remote_error());
/*
  Update one row on the remote server.

  Builds an UPDATE (or UPDATE IGNORE) statement in two parts during a
  single pass over the fields: the SET list from the columns in the write
  set using the current record values, and the WHERE list from the
  columns in the read set using the values in old_data. Tables without a
  primary key get LIMIT 1 appended. If the query fails the result of
  stash_remote_error() is returned.
*/
2093 int ha_federated::update_row(
const uchar *old_data, uchar *new_data)
2107 bool has_a_primary_key=
test(table->s->primary_key != MAX_KEY);
2112 char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
2113 char update_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2114 char where_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2117 String field_value(field_value_buffer,
sizeof(field_value_buffer),
2120 String update_string(update_buffer,
2121 sizeof(update_buffer),
2124 String where_string(where_buffer,
2125 sizeof(where_buffer),
2127 uchar *record= table->record[0];
2128 DBUG_ENTER(
"ha_federated::update_row");
2132 field_value.length(0);
2133 update_string.length(0);
2134 where_string.length(0);
/* Statement verb and target table. */
2136 if (ignore_duplicates)
2137 update_string.append(STRING_WITH_LEN(
"UPDATE IGNORE "));
2139 update_string.append(STRING_WITH_LEN(
"UPDATE "));
2140 append_ident(&update_string, share->table_name,
2141 share->table_name_length, ident_quote_char);
2142 update_string.append(STRING_WITH_LEN(
" SET "));
2154 for (
Field **field= table->field; *field; field++)
/* SET part: columns being written, with their new values. */
2156 if (bitmap_is_set(table->write_set, (*field)->field_index))
2158 size_t field_name_length= strlen((*field)->field_name);
2159 append_ident(&update_string, (*field)->field_name, field_name_length,
2161 update_string.append(STRING_WITH_LEN(
" = "));
2163 if ((*field)->is_null())
2164 update_string.append(STRING_WITH_LEN(
" NULL "));
/* Column map temporarily overridden for the read set while the value is
   read. */
2168 my_bitmap_map *old_map= tmp_use_all_columns(table, table->read_set);
2169 bool needs_quote= (*field)->str_needs_quotes();
2170 (*field)->val_str(&field_value);
2172 update_string.append(value_quote_char);
2173 field_value.print(&update_string);
2175 update_string.append(value_quote_char);
2176 field_value.length(0);
2177 tmp_restore_column_map(table->read_set, old_map);
2179 update_string.append(STRING_WITH_LEN(
", "));
/* WHERE part: columns that were read, with their old values. */
2182 if (bitmap_is_set(table->read_set, (*field)->field_index))
2184 size_t field_name_length= strlen((*field)->field_name);
2185 append_ident(&where_string, (*field)->field_name, field_name_length,
2187 if ((*field)->is_null_in_record(old_data))
2188 where_string.append(STRING_WITH_LEN(
" IS NULL "));
2191 bool needs_quote= (*field)->str_needs_quotes();
2192 where_string.append(STRING_WITH_LEN(
" = "));
/* Read the value from old_data at the offset of the field in record. */
2193 (*field)->val_str(&field_value,
2194 (old_data + (*field)->offset(record)));
2196 where_string.append(value_quote_char);
2197 field_value.print(&where_string);
2199 where_string.append(value_quote_char);
2200 field_value.length(0);
2202 where_string.append(STRING_WITH_LEN(
" AND "));
/* Trim the trailing separator of the SET list. */
2207 update_string.length(update_string.length() - sizeof_trailing_comma);
2209 if (where_string.length())
/* Trim the trailing AND and attach the WHERE clause. */
2212 where_string.length(where_string.length() - sizeof_trailing_and);
2213 update_string.append(STRING_WITH_LEN(
" WHERE "));
2214 update_string.append(where_string);
/* Without a primary key, limit the update to one row. */
2221 if (!has_a_primary_key)
2222 update_string.append(STRING_WITH_LEN(
" LIMIT 1"));
2224 if (real_query(update_string.ptr(), update_string.length()))
2226 DBUG_RETURN(stash_remote_error());
/*
  Delete one row on the remote server.

  Builds DELETE FROM the quoted remote table with a WHERE list made from
  every column in the read set (IS NULL for null fields, an equality
  otherwise), trims the trailing separator, appends LIMIT 1 and sends the
  statement. On success the deleted and records statistics are adjusted by
  the affected row count reported by the connection.
*/
2246 int ha_federated::delete_row(
const uchar *buf)
2248 char delete_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2249 char data_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2250 String delete_string(delete_buffer,
sizeof(delete_buffer), &my_charset_bin);
2251 String data_string(data_buffer,
sizeof(data_buffer), &my_charset_bin);
2253 DBUG_ENTER(
"ha_federated::delete_row");
2255 delete_string.length(0);
2256 delete_string.append(STRING_WITH_LEN(
"DELETE FROM "));
2257 append_ident(&delete_string, share->table_name,
2258 share->table_name_length, ident_quote_char);
2259 delete_string.append(STRING_WITH_LEN(
" WHERE "));
/* One condition per column in the read set. */
2261 for (
Field **field= table->field; *field; field++)
2263 Field *cur_field= *field;
2265 if (bitmap_is_set(table->read_set, cur_field->field_index))
2267 append_ident(&delete_string, (*field)->field_name,
2268 strlen((*field)->field_name), ident_quote_char);
2269 data_string.length(0);
2270 if (cur_field->is_null())
2272 delete_string.append(STRING_WITH_LEN(
" IS NULL "));
2276 bool needs_quote= cur_field->str_needs_quotes();
2277 delete_string.append(STRING_WITH_LEN(
" = "));
2278 cur_field->val_str(&data_string);
2280 delete_string.append(value_quote_char);
2281 data_string.print(&delete_string);
2283 delete_string.append(value_quote_char);
2285 delete_string.append(STRING_WITH_LEN(
" AND "));
/* Trim the trailing AND. */
2290 delete_string.length(delete_string.length() - sizeof_trailing_and);
/* Trim the WHERE keyword as well; presumably only when no condition was
   added - the guard is not visible in this excerpt. */
2292 delete_string.length(delete_string.length() - sizeof_trailing_where);
2294 delete_string.append(STRING_WITH_LEN(
" LIMIT 1"));
2296 (
"Delete sql: %s", delete_string.c_ptr_quick()));
2297 if (real_query(delete_string.ptr(), delete_string.length()))
2299 DBUG_RETURN(stash_remote_error());
/* Keep the handler statistics in step with the remote change. */
2301 stats.deleted+= (ha_rows) mysql->affected_rows;
2302 stats.records-= (ha_rows) mysql->affected_rows;
2304 (
"rows deleted %ld rows deleted for all time %ld",
2305 (
long) mysql->affected_rows, (
long)
stats.deleted));
/*
  Read a row by key on the active index.

  Delegates to index_read_idx_with_result_set() with active_index and
  wraps the call in the index-read probe macros.
*/
2318 int ha_federated::index_read(uchar *buf,
const uchar *key,
2319 uint key_len, ha_rkey_function find_flag)
2322 DBUG_ENTER(
"ha_federated::index_read");
2324 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2326 rc= index_read_idx_with_result_set(buf, active_index, key,
2329 MYSQL_INDEX_READ_ROW_DONE(rc);
/*
  Read a row by key on an explicitly given index.

  Delegates to index_read_idx_with_result_set(). A non-zero result is
  returned to the caller as-is; otherwise the result set is freed, so this
  entry point does not keep a cursor open.
*/
2347 int ha_federated::index_read_idx(uchar *buf, uint
index,
const uchar *key,
2348 uint key_len,
enum ha_rkey_function find_flag)
2352 DBUG_ENTER(
"ha_federated::index_read_idx");
2354 if ((retval= index_read_idx_with_result_set(buf, index, key,
2357 DBUG_RETURN(retval);
2358 mysql_free_result(mysql_result);
2374 int ha_federated::index_read_idx_with_result_set(uchar *buf, uint index,
2377 ha_rkey_function find_flag,
2381 char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2382 char index_value[STRING_BUFFER_USUAL_SIZE];
2383 char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2384 String index_string(index_value,
2385 sizeof(index_value),
2387 String sql_query(sql_query_buffer,
2388 sizeof(sql_query_buffer),
2391 DBUG_ENTER(
"ha_federated::index_read_idx_with_result_set");
2394 index_string.length(0);
2395 sql_query.length(0);
2396 ha_statistic_increment(&SSV::ha_read_key_count);
2398 sql_query.append(share->select_query);
2401 range.length= key_len;
2402 range.flag= find_flag;
2403 create_where_from_key(&index_string,
2404 &table->key_info[index],
2407 sql_query.append(index_string);
2409 if (real_query(sql_query.ptr(), sql_query.length()))
2411 sprintf(error_buffer,
"error: %d '%s'",
2412 mysql_errno(mysql), mysql_error(mysql));
2413 retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2418 retval= HA_ERR_END_OF_FILE;
2421 if ((retval= read_next(buf, *result)))
2423 mysql_free_result(*result);
2426 table->status= STATUS_NOT_FOUND;
2427 DBUG_RETURN(retval);
2432 table->status= STATUS_NOT_FOUND;
2433 my_error(retval, MYF(0), error_buffer);
2434 DBUG_RETURN(retval);
2452 return HA_POS_ERROR;
/* Prepare for index access: records keynr as the active index. The
   `sorted` argument is not used in the visible code. */
2458 int ha_federated::index_init(uint keynr,
bool sorted)
2460 DBUG_ENTER(
"ha_federated::index_init");
2461 DBUG_PRINT(
"info", (
"table: '%s' key: %u", table->s->table_name.str, keynr));
2462 active_index= keynr;
/*
  Body fragment of ha_federated::read_range_first (name taken from the
  DBUG_ENTER tag; the start of the signature is not visible here).

  Builds the cached SELECT of the share plus a WHERE clause for the given
  start and end keys on the active index, sends it, and reads the first
  row into table->record[0]. At least one of the two keys must be given
  (asserted). On failure table->status is set to STATUS_NOT_FOUND.
*/
2473 bool eq_range_arg,
bool sorted)
2475 char sql_query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2477 String sql_query(sql_query_buffer,
2478 sizeof(sql_query_buffer),
2480 DBUG_ENTER(
"ha_federated::read_range_first");
2481 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2483 DBUG_ASSERT(!(start_key == NULL && end_key == NULL));
2485 sql_query.length(0);
2486 sql_query.append(share->select_query);
2487 create_where_from_key(&sql_query,
2488 &table->key_info[active_index],
2489 start_key, end_key, 0, eq_range_arg);
2490 if (real_query(sql_query.ptr(), sql_query.length()))
2492 retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2495 sql_query.length(0);
/* Set when no result set is available; the test itself is not visible
   in this excerpt. */
2499 retval= HA_ERR_END_OF_FILE;
/* Success path: fetch the first row from the stored result. */
2503 retval= read_next(table->record[0], stored_result);
2504 MYSQL_INDEX_READ_ROW_DONE(retval);
2505 DBUG_RETURN(retval);
/* Error path. */
2508 table->status= STATUS_NOT_FOUND;
2509 MYSQL_INDEX_READ_ROW_DONE(retval);
2510 DBUG_RETURN(retval);
/*
  ha_federated::read_range_next (signature not visible in this excerpt).

  Reads a row into table->record[0] by delegating to rnd_next_int(), with
  the call wrapped in the MYSQL_INDEX_READ_ROW_START/DONE probes. Returns
  whatever rnd_next_int() returned.
*/
2517 DBUG_ENTER(
"ha_federated::read_range_next");
2518 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2519 retval= rnd_next_int(table->record[0]);
2520 MYSQL_INDEX_READ_ROW_DONE(retval);
2521 DBUG_RETURN(retval);
/*
  ha_federated::index_next (signature not visible in this excerpt).

  Bumps the ha_read_next_count statistic and reads a row from stored_result
  into buf via read_next(). Returns the read_next() result.
*/
2529 DBUG_ENTER(
"ha_federated::index_next");
2530 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
2531 ha_statistic_increment(&SSV::ha_read_next_count);
2532 retval= read_next(buf, stored_result);
2533 MYSQL_INDEX_READ_ROW_DONE(retval);
2534 DBUG_RETURN(retval);
/*
  ha_federated::rnd_init (signature and most of the body are not visible in
  this excerpt).

  The visible part sends share->select_query to the remote server with
  real_query(). If that call, or a second condition that is not visible
  here, reports failure, the function returns stash_remote_error().
*/
2553 DBUG_ENTER(
"ha_federated::rnd_init");
2591 if (real_query(share->select_query, strlen(share->select_query)) ||
2593 DBUG_RETURN(stash_remote_error());
/*
  End a table scan. Delegates entirely to index_end() and returns its
  result.
*/
2599 int ha_federated::rnd_end()
2601 DBUG_ENTER(
"ha_federated::rnd_end");
2602 DBUG_RETURN(index_end());
/*
  End index use. The visible part resets active_index to MAX_KEY; any
  further statements and the return are not visible in this excerpt.
*/
2606 int ha_federated::index_end(
void)
2608 DBUG_ENTER(
"ha_federated::index_end");
2610 active_index= MAX_KEY;
/*
  ha_federated::rnd_next (signature not visible in this excerpt).

  Instrumented wrapper: runs rnd_next_int(buf) between the
  MYSQL_READ_ROW_START and MYSQL_READ_ROW_DONE probes and keeps the result
  in rc. The return statement is not visible here.
*/
2628 DBUG_ENTER(
"ha_federated::rnd_next");
2629 MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
2631 rc= rnd_next_int(buf);
2632 MYSQL_READ_ROW_DONE(rc);
/*
  Uninstrumented row fetch shared by rnd_next() and read_range_next().

  When stored_result is set, returns read_next(buf, stored_result).
*/
2636 int ha_federated::rnd_next_int(uchar *buf)
2638 DBUG_ENTER(
"ha_federated::rnd_next_int");
/* Handling of a missing result set is not visible in this excerpt. */
2640 if (stored_result == 0)
2649 DBUG_RETURN(read_next(buf, stored_result));
/*
  Fetch one row from 'result' and convert it into 'buf'.

  Returns HA_ERR_END_OF_FILE when mysql_fetch_row() yields no row;
  otherwise returns the value of convert_row_to_internal_format().
*/
2673 int ha_federated::read_next(uchar *buf,
MYSQL_RES *result)
2677 DBUG_ENTER(
"ha_federated::read_next");
/* Assume failure until a row has been fetched and converted. */
2679 table->status= STATUS_NOT_FOUND;
/* Remember the cursor as it was before this fetch. */
2682 current_position= result->data_cursor;
2685 if (!(row= mysql_fetch_row(result)))
2686 DBUG_RETURN(HA_ERR_END_OF_FILE);
/* The statement run on successful conversion is not visible in this excerpt. */
2688 if (!(retval= convert_row_to_internal_format(buf, row, result)))
2691 DBUG_RETURN(retval);
/*
  ha_federated::position (signature not visible in this excerpt).

  Serialises the current row position into 'ref': the first
  sizeof(MYSQL_RES *) bytes receive the stored_result pointer, and a second
  memcpy writes further data directly after it.

  Also sets position_called, which free_result() tests before freeing
  stored_result.
*/
2717 DBUG_ENTER(
"ha_federated::position");
2719 DBUG_ASSERT(stored_result);
2721 position_called= TRUE;
2723 memcpy(ref, &stored_result,
sizeof(
MYSQL_RES *));
/*
  NOTE(review): the source operand below reads as a mis-encoded
  '&current_position' (the '&curren' prefix appears to have been turned
  into a currency sign) - confirm against the original file. The length
  argument of this memcpy is not visible in this excerpt.
*/
2725 memcpy(ref +
sizeof(
MYSQL_RES *), ¤t_position,
/*
  ha_federated::rnd_pos (signature not visible in this excerpt).

  Re-reads a row from a saved position: the first sizeof(MYSQL_RES *) bytes
  of 'pos' are copied into 'result', the bytes after them are copied into
  result->data_cursor, and the row is then read with read_next().
*/
2744 DBUG_ENTER(
"ha_federated::rnd_pos");
2746 MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
2748 ha_statistic_increment(&SSV::ha_read_rnd_count);
/* Recover the result-set pointer stored at the start of 'pos'. */
2751 memcpy(&result, pos,
sizeof(
MYSQL_RES *));
2752 DBUG_ASSERT(result);
/* Restore the cursor from the bytes following the pointer. */
2754 memcpy(&result->data_cursor, pos +
sizeof(
MYSQL_RES *),
2757 ret_val= read_next(buf, result);
2758 MYSQL_READ_ROW_DONE(ret_val);
2759 DBUG_RETURN(ret_val);
/*
  Refresh handler statistics from the remote server.

  For HA_STATUS_VARIABLE or HA_STATUS_CONST the function issues
  "SHOW TABLE STATUS LIKE <table name>" and copies selected columns of the
  single result row into 'stats'. For HA_STATUS_AUTO it copies
  mysql->insert_id into stats.auto_increment_value.

  On the error path it returns error_code, which starts out as
  ER_QUERY_ON_FOREIGN_DATA_SOURCE and may be replaced by
  remote_error_number.
*/
2807 int ha_federated::info(uint flag)
2809 char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
2814 String status_query_string(status_buf,
sizeof(status_buf), &my_charset_bin);
2815 DBUG_ENTER(
"ha_federated::info");
2817 error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
2819 if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
2821 status_query_string.length(0);
2822 status_query_string.append(STRING_WITH_LEN(
"SHOW TABLE STATUS LIKE "));
/* The table name is quoted with value_quote_char, not the identifier quote. */
2823 append_ident(&status_query_string, share->table_name,
2824 share->table_name_length, value_quote_char);
2826 if (real_query(status_query_string.ptr(), status_query_string.length()))
2829 status_query_string.length(0);
2831 result= mysql_store_result(mysql);
/* At least 14 columns are required because row[13] is read below. */
2837 if (!result || (mysql_num_fields(result) < 14))
2840 if (!mysql_num_rows(result))
2843 if (!(row= mysql_fetch_row(result)))
/* Columns 4 and 5 of the status row feed the record count and mean row length. */
2858 stats.records= (ha_rows) my_strtoll10(row[4], (
char**) 0,
2861 stats.mean_rec_length= (ulong) my_strtoll10(row[5], (
char**) 0, &error);
/* Columns 12 and 13 may be NULL, so each is tested before parsing. */
2865 if (row[12] != NULL)
2866 stats.update_time= (ulong) my_strtoll10(row[12], (
char**) 0,
2868 if (row[13] != NULL)
2869 stats.check_time= (ulong) my_strtoll10(row[13], (
char**) 0,
/* Fixed block size reported for HA_STATUS_CONST. */
2876 if (flag & HA_STATUS_CONST)
2877 stats.block_size= 4096;
2881 if (flag & HA_STATUS_AUTO)
2882 stats.auto_increment_value= mysql->insert_id;
2884 mysql_free_result(result);
/* Second free belongs to a different exit path; its label is not visible here. */
2889 mysql_free_result(result);
2892 my_printf_error(error_code,
": %d : %s", MYF(0),
2893 mysql_errno(mysql), mysql_error(mysql));
/* real_connect() sets remote_error_number to -1 after calling my_error() itself. */
2896 if (remote_error_number != -1 )
2898 error_code= remote_error_number;
2899 my_error(error_code, MYF(0), ER(error_code));
2901 DBUG_RETURN(error_code);
/*
  ha_federated::extra (signature not visible in this excerpt).

  Translates handler hints into the three flags insert_dup_update,
  ignore_duplicates and replace_duplicates. Operations without a case only
  produce a debug trace. The statements ending each case are not visible
  in this excerpt.
*/
2915 DBUG_ENTER(
"ha_federated::extra");
2916 switch (operation) {
2917 case HA_EXTRA_IGNORE_DUP_KEY:
2918 ignore_duplicates= TRUE;
/* Turning duplicate-ignoring off also clears insert_dup_update. */
2920 case HA_EXTRA_NO_IGNORE_DUP_KEY:
2921 insert_dup_update= FALSE;
2922 ignore_duplicates= FALSE;
2924 case HA_EXTRA_WRITE_CAN_REPLACE:
2925 replace_duplicates= TRUE;
2927 case HA_EXTRA_WRITE_CANNOT_REPLACE:
2932 replace_duplicates= FALSE;
2934 case HA_EXTRA_INSERT_WITH_UPDATE:
2935 insert_dup_update= TRUE;
/* Anything else is only traced. */
2939 DBUG_PRINT(
"info",(
"unhandled operation: %d", (uint) operation));
/*
  Handler state reset. The function header is not visible in this excerpt;
  presumably this is ha_federated::reset - confirm.

  Clears the three duplicate-handling flags, frees every result set recorded
  in the 'results' dynamic array, and then empties that array.
*/
2957 insert_dup_update= FALSE;
2958 ignore_duplicates= FALSE;
2959 replace_duplicates= FALSE;
/* Fetch each stored entry into 'result' and release it. */
2962 for (uint i= 0; i < results.elements; i++)
2965 get_dynamic(&results, (uchar *) &result, i);
2966 mysql_free_result(result);
2968 reset_dynamic(&results);
/*
  ha_federated::delete_all_rows (signature not visible in this excerpt).

  Builds "TRUNCATE " followed by the table name from share->table_name and
  sends it with real_query(). A failing query returns stash_remote_error();
  the success path is not visible in this excerpt.
*/
2988 char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
2989 String query(query_buffer,
sizeof(query_buffer), &my_charset_bin);
2990 DBUG_ENTER(
"ha_federated::delete_all_rows");
/* The statement text is built in system_charset_info. */
2994 query.set_charset(system_charset_info);
2995 query.append(STRING_WITH_LEN(
"TRUNCATE "));
/* The quote character passed to append_ident() is not visible in this excerpt. */
2996 append_ident(&query, share->table_name, share->table_name_length,
3002 if (real_query(query.ptr(), query.length()))
3004 DBUG_RETURN(stash_remote_error());
3054 enum thr_lock_type lock_type)
/*
  ha_federated::store_lock (start of the signature is not visible in this
  excerpt).

  Adjusts the requested lock type and stores it in lock.type. The
  adjustment is guarded by the check that a real lock is requested
  (lock_type != TL_IGNORE) while lock.type is still TL_UNLOCK.
*/
3056 DBUG_ENTER(
"ha_federated::store_lock");
3057 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
/*
  Write locks from TL_WRITE_CONCURRENT_INSERT up to TL_WRITE become
  TL_WRITE_ALLOW_WRITE when thd->in_lock_tables is not set.
*/
3066 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
3067 lock_type <= TL_WRITE) && !thd->in_lock_tables)
3068 lock_type= TL_WRITE_ALLOW_WRITE;
/* The replacement chosen for TL_READ_NO_INSERT is not visible in this excerpt. */
3078 if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
3081 lock.type= lock_type;
/*
  Handler entry point for table creation.

  The visible work is a single parse_url() call on table_arg, filling a
  local tmp_share and allocating from the current thread's mem_root.
  Returns retval; statements between the parse and the return are not
  visible in this excerpt.
*/
3094 int ha_federated::create(
const char *name,
TABLE *table_arg,
3098 THD *thd= current_thd;
3100 DBUG_ENTER(
"ha_federated::create");
3102 retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1);
3104 DBUG_RETURN(retval);
/*
  Open the connection to the remote server and check that the table exists.

  Steps visible here: allocate the MYSQL handle, set the connection charset
  from the local table's charset name, connect, then run
  share->select_query with " WHERE 1=0" appended as a probe. The result of
  the probe is fetched and freed immediately.

  Must be called only while 'mysql' is NULL (asserted). Return statements
  are not visible in this excerpt.
*/
3109 int ha_federated::real_connect()
3111 char buffer[FEDERATED_QUERY_BUFFER_SIZE];
3112 String sql_query(buffer,
sizeof(buffer), &my_charset_bin);
3113 DBUG_ENTER(
"ha_federated::real_connect");
3123 DBUG_ASSERT(mysql == NULL);
/* mysql_init() failure is recorded as HA_ERR_OUT_OF_MEM. */
3125 if (!(mysql= mysql_init(NULL)))
3127 remote_error_number= HA_ERR_OUT_OF_MEM;
3137 mysql_options(mysql,MYSQL_SET_CHARSET_NAME,
3138 this->table->s->table_charset->csname);
3140 sql_query.length(0);
/* Connection parameters are not visible in this excerpt. */
3142 if (!mysql_real_connect(mysql,
3150 stash_remote_error();
/*
  The error is reported here; remote_error_number is then set to -1, the
  value info() tests for before it calls my_error() a second time.
*/
3153 my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), remote_error_buf);
3154 remote_error_number= -1;
/* Probe statement: the always-false condition makes it return no rows. */
3162 sql_query.append(share->select_query);
3163 sql_query.append(STRING_WITH_LEN(
" WHERE 1=0"));
3164 if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
/* Reuse sql_query to hold the text: error: <errno> '<message>'. */
3166 sql_query.length(0);
3167 sql_query.append(
"error: ");
3168 sql_query.qs_append(mysql_errno(mysql));
3169 sql_query.append(
" '");
3170 sql_query.append(mysql_error(mysql));
3171 sql_query.append(
"'");
3174 my_error(ER_FOREIGN_DATA_SOURCE_DOESNT_EXIST, MYF(0), sql_query.ptr());
3175 remote_error_number= -1;
/* Fetch and discard the probe's result. */
3180 mysql_free_result(mysql_store_result(mysql));
/* Turn on the client library's reconnect flag for this handle. */
3188 mysql->reconnect= 1;
/*
  Send a statement to the remote server, connecting first if needed.

  When 'mysql' is NULL the connection is opened with real_connect(); a
  non-zero result from that call is kept in rc. The statement itself is
  sent with mysql_real_query(), with 'length' narrowed to uint.

  What happens for a NULL query or zero length, and the return statement,
  are not visible in this excerpt.
*/
3193 int ha_federated::real_query(
const char *query,
size_t length)
3196 DBUG_ENTER(
"ha_federated::real_query");
3198 if (!mysql && (rc= real_connect()))
3201 if (!query || !length)
3204 rc= mysql_real_query(mysql, query, (uint) length);
/*
  Save the remote server's last error for later reporting.

  Copies mysql_errno() into remote_error_number and mysql_error() into
  remote_error_buf, truncated to fit.

  Returns HA_ERR_FOUND_DUPP_KEY when the remote error is ER_DUP_ENTRY or
  ER_DUP_KEY, otherwise HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM.
*/
3211 int ha_federated::stash_remote_error()
3213 DBUG_ENTER(
"ha_federated::stash_remote_error()");
/* Early exit; the condition guarding it is not visible in this excerpt. */
3215 DBUG_RETURN(remote_error_number);
3216 remote_error_number= mysql_errno(mysql);
3217 strmake(remote_error_buf, mysql_error(mysql),
sizeof(remote_error_buf)-1);
/* Both remote duplicate-key errors map to the same handler code. */
3218 if (remote_error_number == ER_DUP_ENTRY ||
3219 remote_error_number == ER_DUP_KEY)
3220 DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
3221 DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
/*
  ha_federated::get_error_message (signature not visible in this excerpt).

  For HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM, appends to 'buf' the text
  "Error on remote system: <number>: <message>" built from the stashed
  remote_error_number and remote_error_buf, then clears both.
*/
3227 DBUG_ENTER(
"ha_federated::get_error_message");
3228 DBUG_PRINT(
"enter", (
"error: %d", error));
3229 if (error == HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM)
3231 buf->append(STRING_WITH_LEN(
"Error on remote system: "));
3232 buf->qs_append(remote_error_number);
3233 buf->append(STRING_WITH_LEN(
": "));
3234 buf->append(remote_error_buf);
/* Clear the stashed error once it has been copied out. */
3236 remote_error_number= 0;
3237 remote_error_buf[0]=
'\0';
3239 DBUG_PRINT(
"exit", (
"message: %s", buf->ptr()));
/*
  ha_federated::store_result (signature not visible in this excerpt).

  Fetches the complete result set of the last statement on mysql_arg and
  returns it. The pointer is also appended to the 'results' dynamic array,
  and position_called is cleared.
*/
3257 MYSQL_RES *result= mysql_store_result(mysql_arg);
3258 DBUG_ENTER(
"ha_federated::store_result");
/* The return value of insert_dynamic() is deliberately discarded. */
3261 (void) insert_dynamic(&results, &result);
3263 position_called= FALSE;
3264 DBUG_RETURN(result);
/*
  Release the current result set unless position() has been called on it.
*/
3268 void ha_federated::free_result()
3270 DBUG_ENTER(
"ha_federated::free_result");
/* position() sets position_called after copying stored_result into 'ref'. */
3271 if (stored_result && !position_called)
3273 mysql_free_result(stored_result);
/* What is done with a non-empty 'results' array is not visible in this excerpt. */
3275 if (results.elements > 0)
/*
  Handler lock notification.

  Everything visible after DBUG_ENTER sits behind
  #ifdef XXX_SUPERCEDED_BY_WL2952; the matching #endif and the return are
  not visible in this excerpt.

  The guarded code switches remote autocommit on or off depending on
  thd->options and, in the non-autocommit case, registers this handler in
  the per-thread 'trx' list that federated_commit() and
  federated_rollback() walk.
*/
3282 int ha_federated::external_lock(THD *thd,
int lock_type)
3285 DBUG_ENTER(
"ha_federated::external_lock");
3290 #ifdef XXX_SUPERCEDED_BY_WL2952
3291 if (lock_type != F_UNLCK)
3295 DBUG_PRINT(
"info",(
"federated not lock F_UNLCK"));
/* Neither OPTION_NOT_AUTOCOMMIT nor OPTION_BEGIN set: enable remote autocommit. */
3296 if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
3298 DBUG_PRINT(
"info",(
"federated autocommit"));
3302 error= connection_autocommit(TRUE);
3305 DBUG_PRINT(
"info", (
"error setting autocommit TRUE: %d", error));
/* Otherwise disable remote autocommit. */
3312 DBUG_PRINT(
"info",(
"not autocommit"));
3318 error= connection_autocommit(FALSE);
3321 DBUG_PRINT(
"info", (
"error setting autocommit FALSE: %d", error));
/* Store this handler as the per-thread engine data. */
3324 thd_set_ha_data(thd, ht,
this);
3330 if (thd->options & (OPTION_TABLE_LOCK))
3332 DBUG_PRINT(
"info", (
"We do not support lock table yet"));
/* Walk the list and attach this handler after the last node. */
3338 for (ptr= trx; ptr; ptr= ptr->trx_next)
3341 else if (!ptr->trx_next)
3342 ptr->trx_next=
this;
/*
  Handlerton commit hook, installed as federated_hton->commit.

  Walks the handler list starting at 'trx', clears trx_next of the node
  visited before the current one, and calls connection_commit() on each
  handler. Afterwards the per-thread engine data is reset to NULL.

  Returns return_val. The statement that records 'error' into return_val
  is not visible in this excerpt; the guard suggests the first non-zero
  error is kept - confirm.
*/
3351 static int federated_commit(
handlerton *hton, THD *thd,
bool all)
3355 DBUG_ENTER(
"federated_commit");
3361 for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
/* Unlink the previously visited handler; the guard on 'old' is not visible here. */
3364 old->trx_next= NULL;
3365 error= ptr->connection_commit();
3366 if (error && !return_val)
3369 thd_set_ha_data(thd, hton, NULL);
3372 DBUG_PRINT(
"info", (
"error val: %d", return_val));
3373 DBUG_RETURN(return_val);
/*
  Handlerton rollback hook, installed as federated_hton->rollback.

  Same traversal as federated_commit(), but calls connection_rollback() on
  each handler in the 'trx' list. Afterwards the per-thread engine data is
  reset to NULL.

  Returns return_val. The statement that records 'error' into return_val
  is not visible in this excerpt.
*/
3377 static int federated_rollback(
handlerton *hton, THD *thd,
bool all)
3381 DBUG_ENTER(
"federated_rollback");
3387 for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
/* Unlink the previously visited handler; the guard on 'old' is not visible here. */
3390 old->trx_next= NULL;
3391 error= ptr->connection_rollback();
3392 if (error && !return_val)
3395 thd_set_ha_data(thd, hton, NULL);
3398 DBUG_PRINT(
"info", (
"error val: %d", return_val));
3399 DBUG_RETURN(return_val);
/*
  Send COMMIT to the remote server. Returns the result of
  execute_simple_query(); 6 is the length of the literal "COMMIT".
*/
3402 int ha_federated::connection_commit()
3404 DBUG_ENTER(
"ha_federated::connection_commit");
3405 DBUG_RETURN(execute_simple_query(
"COMMIT", 6));
/*
  Send ROLLBACK to the remote server. Returns the result of
  execute_simple_query(); 8 is the length of the literal "ROLLBACK".
*/
3409 int ha_federated::connection_rollback()
3411 DBUG_ENTER(
"ha_federated::connection_rollback");
3412 DBUG_RETURN(execute_simple_query(
"ROLLBACK", 8));
/*
  Switch autocommit on the remote connection.

  state == TRUE sends "SET AUTOCOMMIT=1", anything else sends
  "SET AUTOCOMMIT=0". Returns the result of execute_simple_query().
*/
3416 int ha_federated::connection_autocommit(
bool state)
3419 DBUG_ENTER(
"ha_federated::connection_autocommit");
3420 text= (state == TRUE) ?
"SET AUTOCOMMIT=1" :
"SET AUTOCOMMIT=0";
/* Both literals are exactly 16 characters long, hence the fixed length. */
3421 DBUG_RETURN(execute_simple_query(text, 16));
/*
  Send a statement of 'len' bytes to the remote server and map a failure
  through stash_remote_error().

  In the lines visible here it calls mysql_real_query() directly, without
  the connect-on-demand step that real_query() performs. The success
  return is not visible in this excerpt.
*/
3425 int ha_federated::execute_simple_query(
const char *query,
int len)
3427 DBUG_ENTER(
"ha_federated::execute_simple_query");
3429 if (mysql_real_query(mysql, query, len))
3431 DBUG_RETURN(stash_remote_error());
3437 { MYSQL_HANDLERTON_INTERFACE_VERSION };
/*
  Plugin descriptor for the Federated storage engine. Only some of the
  descriptor fields are visible in this excerpt.
*/
3439 mysql_declare_plugin(federated)
/* Plugin type, followed by the address of the storage-engine info structure. */
3441 MYSQL_STORAGE_ENGINE_PLUGIN,
3442 &federated_storage_engine,
/* Author and description strings. */
3444 "Patrick Galbraith and Brian Aker, MySQL AB",
3445 "Federated MySQL storage engine",
3455 mysql_declare_plugin_end;