19 #include "ha_ndbcluster_glue.h"
21 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
22 #include "ha_ndbinfo.h"
23 #include "../storage/ndb/src/ndbapi/NdbInfo.hpp"
26 static MYSQL_THDVAR_UINT(
29 "Specify max number of rows to fetch per roundtrip to cluster",
38 static MYSQL_THDVAR_UINT(
41 "Specify approx. max number of bytes to fetch per roundtrip to cluster",
50 static MYSQL_THDVAR_BOOL(
53 "Control if tables should be visible or not",
59 static char* opt_ndbinfo_dbname = (
char*)
"ndbinfo";
60 static MYSQL_SYSVAR_STR(
63 PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
64 "Name of the database used by ndbinfo",
70 static char* opt_ndbinfo_table_prefix = (
char*)
"ndb$";
71 static MYSQL_SYSVAR_STR(
73 opt_ndbinfo_table_prefix,
74 PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
75 "Prefix to use for all virtual tables loaded from NDB",
81 static Uint32 opt_ndbinfo_version = NDB_VERSION_D;
82 static MYSQL_SYSVAR_UINT(
85 PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_READONLY,
86 "Compile version for ndbinfo",
95 static my_bool opt_ndbinfo_offline;
100 void* var_ptr,
const void* save)
102 DBUG_ENTER(
"offline_update");
104 const my_bool new_offline =
105 (*(
static_cast<const my_bool*
>(save)) != 0);
106 if (new_offline == opt_ndbinfo_offline)
114 opt_ndbinfo_offline = new_offline;
117 (void)close_cached_tables(thd, NULL,
false,
true,
false);
122 static MYSQL_SYSVAR_BOOL(
126 "Set ndbinfo in offline mode, tables and views can "
127 "be opened even if they don't exist or have different "
128 "definition in NDB. No rows will be returned.",
140 ndbcluster_is_disabled(
void)
147 if (g_ndb_cluster_connection)
149 assert(g_ndbinfo == NULL);
156 return new (mem_root)
ha_ndbinfo(hton, table);
159 struct ha_ndbinfo_impl
180 :
handler(hton, table_arg), m_impl(*new ha_ndbinfo_impl)
184 ha_ndbinfo::~ha_ndbinfo()
189 enum ndbinfo_error_codes {
190 ERR_INCOMPAT_TABLE_DEF = 40001
193 struct error_message {
196 } error_messages[] = {
197 { ERR_INCOMPAT_TABLE_DEF,
"Incompatible table definitions" },
198 { HA_ERR_NO_CONNECTION,
"Connection to NDB failed" },
204 const char* find_error_message(
int error)
206 struct error_message* err = error_messages;
207 while (err->error && err->message)
209 if (err->error == error)
211 assert(err->message);
219 static int err2mysql(
int error)
221 DBUG_ENTER(
"err2mysql");
222 DBUG_PRINT(
"enter", (
"error: %d", error));
226 case NdbInfo::ERR_ClusterFailure:
227 DBUG_RETURN(HA_ERR_NO_CONNECTION);
229 case NdbInfo::ERR_OutOfMemory:
230 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
235 push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
236 ER_GET_ERRNO, ER(ER_GET_ERRNO), error);
237 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
242 DBUG_ENTER(
"ha_ndbinfo::get_error_message");
243 DBUG_PRINT(
"enter", (
"error: %d", error));
245 const char*
message = find_error_message(error);
249 buf->set(message, strlen(message), &my_charset_bin);
250 DBUG_PRINT(
"exit", (
"message: %s", buf->ptr()));
257 sql.
appfmt(
"'CREATE TABLE `%s`.`%s%s` (",
258 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix, ndb_tab->getName());
260 const char* separator =
"";
261 for (
unsigned i = 0;
i < ndb_tab->columns();
i++)
265 sql.
appfmt(
"%s", separator);
272 case NdbInfo::Column::Number:
273 sql.
appfmt(
"INT UNSIGNED");
275 case NdbInfo::Column::Number64:
276 sql.
appfmt(
"BIGINT UNSIGNED");
278 case NdbInfo::Column::String:
279 sql.
appfmt(
"VARCHAR(512)");
287 sql.
appfmt(
") ENGINE=NDBINFO'");
297 const char* format, ...)
300 DBUG_ENTER(
"warn_incompatible");
301 DBUG_PRINT(
"enter",(
"table_name: %s, fatal: %d", ndb_tab->getName(), fatal));
302 DBUG_ASSERT(format != NULL);
305 char explanation[128];
306 va_start(args,format);
307 my_vsnprintf(explanation,
sizeof(explanation), format, args);
310 msg.
assfmt(
"Table '%s%s' is defined differently in NDB, %s. The "
311 "SQL to regenerate is: ",
312 opt_ndbinfo_table_prefix, ndb_tab->getName(), explanation);
313 generate_sql(ndb_tab, msg);
315 const Sql_condition::enum_warning_level
level =
316 (fatal ? Sql_condition::WARN_LEVEL_WARN : Sql_condition::WARN_LEVEL_NOTE);
317 push_warning(current_thd, level, ERR_INCOMPAT_TABLE_DEF, msg.
c_str());
325 DBUG_ENTER(
"ha_ndbinfo::create");
326 DBUG_PRINT(
"enter", (
"name: %s", name));
331 bool ha_ndbinfo::is_open(
void)
const
333 return m_impl.m_table != NULL;
336 bool ha_ndbinfo::is_offline(
void)
const
338 return m_impl.m_offline;
341 int ha_ndbinfo::open(
const char *name,
int mode, uint test_if_locked)
343 DBUG_ENTER(
"ha_ndbinfo::open");
344 DBUG_PRINT(
"enter", (
"name: %s, mode: %d", name, mode));
347 assert(!is_offline());
351 if (table->db_stat & HA_TRY_READ_ONLY)
353 DBUG_PRINT(
"info", (
"Telling server to use readonly mode"));
360 if (opt_ndbinfo_offline ||
361 ndbcluster_is_disabled())
364 m_impl.m_offline =
true;
368 int err = g_ndbinfo->openTable(name, &m_impl.m_table);
371 assert(m_impl.m_table == 0);
372 if (err == NdbInfo::ERR_NoSuchTable)
373 DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
374 DBUG_RETURN(err2mysql(err));
382 DBUG_PRINT(
"info", (
"Comparing MySQL's table def against NDB"));
384 for (uint
i = 0;
i < table->s->fields;
i++)
386 const Field* field = table->field[
i];
389 if (const_cast<Field*>(field)->real_maybe_null() ==
false)
392 warn_incompatible(ndb_tab,
true,
393 "column '%s' is NOT NULL",
395 delete m_impl.m_table; m_impl.m_table= 0;
396 DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
408 bool compatible =
false;
411 case NdbInfo::Column::Number:
412 if (field->type() == MYSQL_TYPE_LONG)
415 case NdbInfo::Column::Number64:
416 if (field->type() == MYSQL_TYPE_LONGLONG)
419 case NdbInfo::Column::String:
420 if (field->type() == MYSQL_TYPE_VARCHAR)
430 warn_incompatible(ndb_tab,
true,
431 "column '%s' is not compatible",
433 delete m_impl.m_table; m_impl.m_table= 0;
434 DBUG_RETURN(ERR_INCOMPAT_TABLE_DEF);
440 for (uint
i = 0;
i < table->s->fields;
i++)
442 DBUG_PRINT(
"info", (
"ref_length: %u",
ref_length));
447 int ha_ndbinfo::close(
void)
449 DBUG_ENTER(
"ha_ndbinfo::close");
457 g_ndbinfo->closeTable(m_impl.m_table);
458 m_impl.m_table = NULL;
465 DBUG_ENTER(
"ha_ndbinfo::rnd_init");
466 DBUG_PRINT(
"info", (
"scan: %d", scan));
470 push_warning(current_thd, Sql_condition::WARN_LEVEL_NOTE, 1,
471 "'NDBINFO' has been started in offline mode "
472 "since the 'NDBCLUSTER' engine is disabled "
473 "or @@global.ndbinfo_offline is turned on "
474 "- no rows can be returned");
479 assert(m_impl.m_scan_op == NULL);
481 if (m_impl.m_first_use)
483 m_impl.m_first_use =
false;
492 uint fields_found_in_ndb = 0;
494 for (uint
i = 0;
i < table->s->fields;
i++)
496 const Field* field = table->field[
i];
501 warn_incompatible(ndb_tab,
true,
502 "column '%s' does not exist",
506 fields_found_in_ndb++;
509 if (fields_found_in_ndb < ndb_tab->columns())
512 warn_incompatible(ndb_tab,
false,
513 "there are more columns available");
520 DBUG_PRINT(
"info", (
"not scan"));
524 THD* thd = current_thd;
527 if ((err = g_ndbinfo->createScanOperation(m_impl.m_table,
529 THDVAR(thd, max_rows),
530 THDVAR(thd, max_bytes))) != 0)
531 DBUG_RETURN(err2mysql(err));
533 if ((err = scan_op->readTuples()) != 0)
534 DBUG_RETURN(err2mysql(err));
537 for (uint
i = 0;
i < table->s->fields;
i++)
539 Field *field = table->field[
i];
540 if (bitmap_is_set(table->read_set,
i))
541 m_impl.m_columns.push_back(scan_op->getValue(field->field_name));
543 m_impl.m_columns.push_back(NULL);
546 if ((err = scan_op->execute()) != 0)
547 DBUG_RETURN(err2mysql(err));
549 m_impl.m_scan_op = scan_op;
553 int ha_ndbinfo::rnd_end()
555 DBUG_ENTER(
"ha_ndbinfo::rnd_end");
562 if (m_impl.m_scan_op)
564 g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
565 m_impl.m_scan_op = NULL;
567 m_impl.m_columns.clear();
575 DBUG_ENTER(
"ha_ndbinfo::rnd_next");
578 DBUG_RETURN(HA_ERR_END_OF_FILE);
581 assert(m_impl.m_scan_op);
583 if ((err = m_impl.m_scan_op->nextResult()) == 0)
584 DBUG_RETURN(HA_ERR_END_OF_FILE);
587 DBUG_RETURN(err2mysql(err));
596 DBUG_ENTER(
"ha_ndbinfo::rnd_pos");
598 assert(m_impl.m_scan_op == NULL);
602 for (uint
i = 0;
i < table->s->fields;
i++)
603 table->field[
i]->set_notnull();
608 void ha_ndbinfo::position(
const uchar *
record)
610 DBUG_ENTER(
"ha_ndbinfo::position");
612 assert(m_impl.m_scan_op);
620 int ha_ndbinfo::info(uint flag)
622 DBUG_ENTER(
"ha_ndbinfo::info");
623 DBUG_PRINT(
"enter", (
"flag: %d", flag));
628 ha_ndbinfo::unpack_record(uchar *dst_row)
630 DBUG_ENTER(
"ha_ndbinfo::unpack_record");
631 my_ptrdiff_t dst_offset = dst_row - table->record[0];
633 for (uint
i = 0;
i < table->s->fields;
i++)
635 Field *field = table->field[
i];
637 if (record && !record->isNULL())
639 field->set_notnull();
640 field->move_field_offset(dst_offset);
641 switch (field->type()) {
643 case (MYSQL_TYPE_VARCHAR):
645 DBUG_PRINT(
"info", (
"str: %s", record->c_str()));
648 my_bitmap_map *old_map =
649 dbug_tmp_use_all_columns(table, table->write_set);
650 (void)vfield->store(record->c_str(),
651 MIN(record->length(), field->field_length)-1,
653 dbug_tmp_restore_column_map(table->write_set, old_map);
657 case (MYSQL_TYPE_LONG):
659 memcpy(field->ptr, record->ptr(),
sizeof(Uint32));
663 case (MYSQL_TYPE_LONGLONG):
665 memcpy(field->ptr, record->ptr(),
sizeof(Uint64));
670 sql_print_error(
"Found unexpected field type %u", field->type());
674 field->move_field_offset(-dst_offset);
686 ndbinfo_find_files(
handlerton *hton, THD *thd,
687 const char *db,
const char *path,
690 DBUG_ENTER(
"ndbinfo_find_files");
691 DBUG_PRINT(
"enter", (
"db: '%s', dir: %d, path: '%s'", db, dir, path));
693 const bool show_hidden = THDVAR(thd, show_hidden);
700 if (!ndbcluster_is_disabled())
706 while ((dir_name=it++))
708 if (strcmp(dir_name->str, opt_ndbinfo_dbname))
711 DBUG_PRINT(
"info", (
"Hiding own databse '%s'", dir_name->str));
719 if (strcmp(db, opt_ndbinfo_dbname))
725 while ((file_name=it++))
727 if (is_prefix(file_name->str, opt_ndbinfo_table_prefix))
729 DBUG_PRINT(
"info", (
"Hiding '%s'", file_name->str));
740 int ndbinfo_init(
void *plugin)
742 DBUG_ENTER(
"ndbinfo_init");
745 hton->create = create_handler;
747 HTON_TEMPORARY_NOT_SUPPORTED |
748 HTON_ALTER_NOT_SUPPORTED;
749 hton->find_files = ndbinfo_find_files;
753 if (ndbcluster_is_disabled())
759 char prefix[FN_REFLEN];
760 build_table_filename(prefix,
sizeof(prefix) - 1,
761 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix,
"", 0);
762 DBUG_PRINT(
"info", (
"prefix: '%s'", prefix));
763 assert(g_ndb_cluster_connection);
764 g_ndbinfo =
new NdbInfo(g_ndb_cluster_connection, prefix,
765 opt_ndbinfo_dbname, opt_ndbinfo_table_prefix);
768 sql_print_error(
"Failed to create NdbInfo");
772 if (!g_ndbinfo->init())
774 sql_print_error(
"Failed to init NdbInfo");
785 int ndbinfo_deinit(
void *plugin)
787 DBUG_ENTER(
"ndbinfo_deinit");
799 MYSQL_SYSVAR(max_rows),
800 MYSQL_SYSVAR(max_bytes),
801 MYSQL_SYSVAR(show_hidden),
802 MYSQL_SYSVAR(database),
803 MYSQL_SYSVAR(table_prefix),
804 MYSQL_SYSVAR(version),
805 MYSQL_SYSVAR(offline),