19 #include "consumer_restore.hpp"
22 extern FilteredNdbOut err;
23 extern FilteredNdbOut info;
24 extern FilteredNdbOut debug;
27 static void callback(
int result,
NdbTransaction* trans,
void* aObject);
33 if (!m_restore && !m_restore_meta)
42 m_ndb->useFullyQualifiedNames(
false);
47 ndbout <<
"Failed to connect to ndb!!" << endl;
50 ndbout <<
"Connected to ndb!!" << endl;
55 if ( mysql_thread_safe() == 0 )
57 ndbout <<
"Not thread safe mysql library..." << endl;
61 ndbout <<
"Connecting to MySQL..." <<endl;
69 bool returnValue =
true;
73 if ( mysql_real_connect(&mysql,
82 ndbout_c(
"Connect failed: %s", mysql_error(&mysql));
86 ndbout <<
"Connected to MySQL!!!" <<endl;
106 ndbout <<
"Failed to allocate callback structs" << endl;
110 m_free_callback = m_callback;
111 for (
int i= 0;
i < m_parallelism;
i++) {
112 m_callback[
i].restore =
this;
113 m_callback[
i].connection = 0;
114 m_callback[
i].retries = 0;
116 m_callback[
i-1].next = &(m_callback[
i]);
118 m_callback[m_parallelism-1].next = 0;
124 BackupRestore::~BackupRestore()
130 delete [] m_callback;
141 char tmpTabName[MAX_TAB_NAME_SIZE*2];
142 sprintf(tmpTabName,
"%s", table.getTableName());
143 char * database = strtok(tmpTabName,
"/");
144 char * schema = strtok( NULL ,
"/");
145 char * tableName = strtok( NULL ,
"/");
158 char stmtCreateDB[255];
159 sprintf(stmtCreateDB,
"CREATE DATABASE %s", database);
162 if (mysql_query(mysqlp,stmtCreateDB) == 0)
167 if (mysql_select_db(&mysql, database) != 0)
169 ndbout_c(
"Error: %s", mysql_error(&mysql));
177 if (create_table_string(table, tableName, buf))
179 ndbout_c(
"Unable to create a table definition since the "
180 "backup contains undefined types");
186 if (mysql_query(mysqlp,buf) != 0)
188 ndbout_c(
"Error: %s", mysql_error(&mysql));
192 ndbout_c(
"Successfully restored table %s into database %s", tableName, database);
208 err <<
"Create table " << table.getTableName() <<
" failed: "
212 info <<
"Successfully restored table " << table.getTableName()<< endl ;
216 void BackupRestore::tuple(
const TupleS & tup, Uint32 fragId)
228 m_free_callback = cb->next;
235 if (m_free_callback == 0)
245 while (cb->retries < 10)
251 if (cb->connection == NULL)
263 const TupleS &tup = *(cb->tup);
264 const TableS * table = tup.getTable();
269 if (asynchErrorHandler(cb->connection, m_ndb))
279 if (asynchErrorHandler(cb->connection, m_ndb))
288 for (
int i = 0;
i < tup.getNoOfAttributes();
i++)
291 int size = attr->Desc->size;
292 int arraySize = attr->Desc->arraySize;
293 char * dataPtr = attr->Data.string_value;
294 Uint32 length = (size * arraySize) / 8;
297 ret = op->
equal(
i, dataPtr, length);
309 ndbout_c(
"Column: %d type %d",
i,
311 if (asynchErrorHandler(cb->connection, m_ndb))
326 ndbout_c(
"Unable to recover from errors. Exiting...");
337 if (asynchErrorHandler(cb->connection, m_ndb))
344 ndbout_c(
"Restore: Failed to restore data "
345 "due to a unrecoverable error. Exiting...");
362 void BackupRestore::asynchExitHandler()
369 #if 0 // old tuple impl
371 BackupRestore::tuple(
const TupleS & tup)
381 ndbout <<
"Cannot start transaction" << endl;
385 const TableS * table = tup.getTable();
389 ndbout <<
"Cannot get operation: ";
397 ndbout <<
"writeTuple call failed: ";
402 for (
int i = 0;
i < tup.getNoOfAttributes();
i++)
405 int size = attr->Desc->size;
406 int arraySize = attr->Desc->arraySize;
407 const char * dataPtr = attr->Data.string_value;
409 const Uint32 length = (size * arraySize) / 8;
411 op->
equal(
i, dataPtr, length);
414 for (
int i = 0;
i < tup.getNoOfAttributes();
i++)
417 int size = attr->Desc->size;
418 int arraySize = attr->Desc->arraySize;
419 const char * dataPtr = attr->Data.string_value;
421 const Uint32 length = (size * arraySize) / 8;
428 int ret = trans->
execute(Commit);
431 ndbout <<
"execute failed: ";
444 BackupRestore::endOfTuples()
453 m_ndb->
pollNdb(3000, m_transactions);
461 BackupRestore::logEntry(
const LogEntry & tup)
470 ndbout <<
"Cannot start transaction" << endl;
474 const TableS * table = tup.m_table;
478 ndbout <<
"Cannot get operation: ";
486 case LogEntry::LE_INSERT:
489 case LogEntry::LE_UPDATE:
492 case LogEntry::LE_DELETE:
496 ndbout <<
"Log entry has wrong operation type."
501 for (
int i = 0;
i < tup.m_values.size();
i++)
504 int size = attr->Desc->size;
505 int arraySize = attr->Desc->arraySize;
506 const char * dataPtr = attr->Data.string_value;
508 const Uint32 length = (size / 8) * arraySize;
510 op->
equal(attr->Desc->attrId, dataPtr, length);
512 op->
setValue(attr->Desc->attrId, dataPtr, length);
518 const int ret = trans->
execute(Commit);
524 ndbout <<
"execute failed: ";
535 BackupRestore::endOfLogEntrys()
539 ndbout <<
"Restored " << m_dataCount <<
" tuples and "
540 << m_logCount <<
" log entries" << endl;
556 static void restoreCallback(
int result,
560 static Uint32 counter = 0;
563 debug <<
"restoreCallback function called " << counter <<
" time(s)" << endl;
569 ndbout <<
" restoreCallback (" << counter;
570 if ((counter % 10) == 1)
574 else if ((counter % 10) == 2)
578 else if ((counter % 10 ) ==3)
586 err <<
" time: error detected " <<
object->getNdbError() << endl;
607 (cb->restore)->cback(result, cb);
628 NdbSleep_MilliSleep(10);
634 ndbout << error << endl;
645 NdbSleep_MilliSleep(10);
651 ndbout << error << endl;