20 #include <NdbRestarter.hpp>
21 #include <HugoOperations.hpp>
22 #include <HugoTransactions.hpp>
23 #include <UtilTransactions.hpp>
24 #include <signaldata/DumpStateOrd.hpp>
27 #include <InputStream.hpp>
// Table of the operation sequences exercised by this test. Every entry holds
// three booleans, then up to three operation names ("INS", "UPD", "DEL";
// 0 marks an unused slot) and a trailing 0.
// NOTE(review): CASE is declared outside this view. Judging by the members
// used elsewhere in this file (start_row, end_row, curr_row, op1, op2, op3,
// val) the initializers presumably follow that order -- confirm against the
// struct definition.
40 static CASE g_op_types[] =
42 {
false,
true,
false,
"INS", 0, 0, 0 },
43 {
true,
true,
false,
"UPD", 0, 0, 0 },
44 {
true,
false,
false,
"DEL", 0, 0, 0 },
46 {
false,
true,
false,
"INS",
"UPD", 0, 0 },
47 {
false,
false,
false,
"INS",
"DEL", 0, 0 },
48 {
true,
true,
false,
"UPD",
"UPD", 0, 0 },
49 {
true,
false,
false,
"UPD",
"DEL", 0, 0 },
50 {
true,
true,
false,
"DEL",
"INS", 0, 0 },
52 {
false,
true,
false,
"INS",
"DEL",
"INS", 0 },
53 {
true,
false,
false,
"DEL",
"INS",
"DEL", 0 }
// Number of entries in g_op_types.
55 const size_t OP_COUNT = (
sizeof(g_op_types)/
sizeof(g_op_types[0]));
// Ndb object used by the helpers below; it is assigned in connect_ndb().
57 static Ndb* g_ndb = 0;
// Bitmask of enabled g_op_types entries (bit i selects entry i); by default
// only the first three entries are enabled.
61 static int g_use_ops = 1 | 2 | 4;
// Bitmask of enabled test cases; main() tests one bit per case, in order.
// By default only the first case runs.
62 static int g_cases = 0x1;
// Number of times main() repeats each enabled test case.
63 static int g_case_loop = 2;
// Number of rows (and of g_ops slots) used by the test; set by "--records".
64 static int g_rows = 10;
// Set by -t / cleared by -u in parse_args().
65 static int g_setup_tables = 1;
// Set by -1 / cleared by -0; when non-zero main() runs with a single
// operation bit at a time.
66 static int g_one_op_at_a_time = 0;
// Name of the table the test operates on.
67 static const char * g_tablename =
"T1";
// Forward declarations of helpers defined later in this file. The calls
// visible in main() are wrapped in require(!...), i.e. main() expects each of
// them to return 0 on success.
71 static int init_ndb(
int argc,
char** argv);
72 static int parse_args(
int argc,
char** argv);
73 static int connect_ndb();
74 static int drop_all_tables();
75 static int load_table();
76 static int pause_lcp(
int error);
77 static int do_op(
int row);
// `error` defaults to 0 when the caller does not pass one.
78 static int continue_lcp(
int error = 0);
81 static int validate();
// Test driver. Initialises NDB, parses the command line, connects, calls
// drop_all_tables() and NDBT_Tables::createTable() for g_tablename, and then
// runs every test case whose bit is set in g_cases, g_case_loop times each.
// NOTE(review): a number of lines of this function (return statements,
// closing braces, the per-row loop bodies) are not visible in this listing.
84 main(
int argc,
char ** argv){
86 require(!init_ndb(argc, argv));
87 if(parse_args(argc, argv))
89 require(!connect_ndb());
92 require(!drop_all_tables());
94 if(NDBT_Tables::createTable(g_ndb, g_tablename) != 0){
101 g_err <<
"Failed to retreive table: " << g_tablename << endl;
105 require(!g_hugo_ops->startTransaction(g_ndb));
// One CASE slot per row; the slots are filled in by load_table().
107 g_ops=
new CASE[g_rows];
// Remember the user-selected mask: g_use_ops is overwritten below when
// running one operation at a time.
109 const int use_ops = g_use_ops;
110 for(
size_t i = 0;
i<OP_COUNT;
i++)
// In one-op-at-a-time mode, advance i to the next operation enabled in the
// saved mask and run this iteration with only that bit set.
112 if(g_one_op_at_a_time){
113 while(
i < OP_COUNT && (use_ops & (1 <<
i)) == 0)
i++;
116 ndbout_c(
"-- loop\noperation: %c use_ops: %x",
int(
'a'+
i), use_ops);
117 g_use_ops = (1 <<
i);
// First test case: pause the LCP with error insert 5900, then
// continue_lcp(5900), pause again with 5900 and validate.
123 if((1 << test_case++) & g_cases)
125 for(
size_t tl = 0; tl<(size_t)g_case_loop; tl++){
126 g_info <<
"Performing all ops wo/ inteference of LCP" << endl;
128 g_info <<
"Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
129 g_info <<
" where ZLCP_OP_WRITE_RT_BREAK is "
130 " finished before SAVE_PAGES" << endl;
131 require(!load_table());
132 require(!pause_lcp(5900));
133 for(
size_t j = 0; j<(size_t)g_rows; j++){
136 require(!continue_lcp(5900));
138 require(!pause_lcp(5900));
140 require(!validate());
// Second test case: same shape, but the first pause/continue pair uses
// error insert 5901.
144 if((1 << test_case++) & g_cases)
146 for(
size_t tl = 0; tl<(size_t)g_case_loop; tl++){
147 g_info <<
"Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
148 g_info <<
" where ZLCP_OP_WRITE_RT_BREAK is finished after SAVE_PAGES"
150 require(!load_table());
151 require(!pause_lcp(5901));
152 for(
size_t j = 0; j<(size_t)g_rows; j++){
155 require(!continue_lcp(5901));
157 require(!pause_lcp(5900));
159 require(!validate());
// Third test case: pause with 5902, then continue_lcp() twice (5902 and
// 5903) before the final pause with 5900.
163 if((1 << test_case++) & g_cases)
165 for(
size_t tl = 0; tl<(size_t)g_case_loop; tl++){
166 g_info <<
"Testing pre LCP operations, undo-ed at commit" << endl;
167 require(!load_table());
168 require(!pause_lcp(5902));
169 for(
size_t j = 0; j<(size_t)g_rows; j++){
172 require(!continue_lcp(5902));
174 require(!continue_lcp(5903));
175 require(!pause_lcp(5900));
177 require(!validate());
// Fourth test case: pause/continue with error insert 5904.
181 if((1 << test_case++) & g_cases)
183 for(
size_t tl = 0; tl<(size_t)g_case_loop; tl++){
184 g_info <<
"Testing prepared during LCP and committed after" << endl;
185 require(!load_table());
186 require(!pause_lcp(5904));
187 for(
size_t j = 0; j<(size_t)g_rows; j++){
190 require(!continue_lcp(5904));
191 require(!pause_lcp(5900));
194 require(!validate());
// Initialisation hook called first by main(), which requires a zero return.
// NOTE(review): the body is not visible in this listing.
200 static int init_ndb(
int argc,
char** argv)
// Parses the command line into the g_* settings and prints the resulting
// configuration (table name, enabled operations, enabled test cases).
// main() treats a non-zero return as failure.
// NOTE(review): parts of the body are not visible in this listing.
206 static int parse_args(
int argc,
char** argv)
// Raw option strings for -o / -c; converted to bitmasks further down.
209 char * ops= 0, *cases=0;
// NOTE(review): the help texts below say "[a-h]" and "[a-c]", while the usage
// output prints the range 'a'..'a'+OP_COUNT-1 and main() checks four case
// bits -- the help texts look out of date; confirm.
211 {
"records", 0, arg_integer, &g_rows,
"Number of records",
"records" },
212 {
"operations",
'o', arg_string, &ops,
"Operations [a-h]", 0 },
213 {
"1",
'1', arg_flag, &g_one_op_at_a_time,
"One op at a time", 0 },
214 {
"0",
'0', arg_negative_flag, &g_one_op_at_a_time,
"All ops at once", 0 },
215 {
"cases",
'c', arg_string, &cases,
"Cases [a-c]", 0 },
216 { 0,
't', arg_flag, &g_setup_tables,
"Create table", 0 },
217 { 0,
'u', arg_negative_flag, &g_setup_tables,
"Dont create table", 0 }
221 const int num_args =
sizeof(args)/
sizeof(args[0]);
// If getarg() reports a problem, print the usage text followed by the list of
// available operations (letter = op1 op2).
222 if(getarg(args, num_args, argc, (
const char**)argv, &optind)) {
223 arg_printusage(args, num_args, argv[0],
" tabname1\n");
224 ndbout_c(
"\n -- Operations [a-%c] = ",
int(
'a'+OP_COUNT-1));
225 for(i = 0; i<OP_COUNT; i++){
226 ndbout_c(
"\t%c = %s %s",
227 int(
'a'+i), g_op_types[i].op1,
228 g_op_types[i].op2 ? g_op_types[i].op2 :
"");
// Each letter of the option string sets one bit, counted from 'a'.
// NOTE(review): no range check on the letter is visible here; a character
// outside the expected range would yield an out-of-range shift -- confirm.
237 g_use_ops |= (1 << ((* s++) -
'a'));
244 g_cases |= (1 << ((* s++) -
'a'));
// Echo the effective configuration.
247 ndbout_c(
"table: %s", g_tablename);
248 printf(
"operations: ");
249 for(i = 0; i<OP_COUNT; i++)
250 if(g_use_ops & (1 << i))
251 printf(
"%c",
int(
'a'+i));
// NOTE(review): test cases are echoed as digits starting at '1', although
// they are parsed as letters starting at 'a' above.
254 printf(
"test cases: ");
256 if(g_cases & (1 << i))
257 printf(
"%c",
int(
'1'+i));
259 printf(
"-------------\n");
// Connects g_cluster_connection and creates the global Ndb object for
// database "TEST_DB". main() requires a zero return.
// NOTE(review): the connect() arguments are presumably (retries, delay in
// seconds, verbose) -- confirm against Ndb_cluster_connection::connect.
263 static int connect_ndb()
266 if(g_cluster_connection->
connect(12, 5, 1) != 0)
271 g_ndb =
new Ndb(g_cluster_connection,
"TEST_DB");
// Deletes the cluster connection object and resets the global pointer to 0.
// NOTE(review): the remaining lines of the body are not visible here.
281 static int disconnect_ndb()
284 delete g_cluster_connection;
287 g_cluster_connection= 0;
// Walks a list of list.count entries and, judging by the error messages,
// lists and then drops tables; failures are reported on g_err.
// NOTE(review): the listing and drop calls themselves are not visible here.
291 static int drop_all_tables()
301 g_err <<
"Failed to list tables: " << endl
305 for (
unsigned i = 0; i < list.
count; i++) {
313 g_err <<
"Failed to drop table: "
// Clears the table and prepares g_ops for a test run: every row is assigned
// the next enabled entry of g_op_types (round-robin, skipping entries whose
// bit is not set in g_use_ops), and rows whose entry has start_row set are
// inserted with a random value.
// NOTE(review): some lines of the body (declarations, counter updates, the
// return) are not visible in this listing.
336 static int load_table()
339 require(!clear.clearTable(g_ndb));
342 require(!ops.startTransaction(g_ndb));
345 size_t uncommitted = 0;
347 for(
size_t i = 0; i<(size_t)g_rows; i++){
// Advance op (modulo OP_COUNT) to the next entry enabled in g_use_ops; the
// loop body is intentionally empty.
348 for(op %= OP_COUNT; !((1 << op) & g_use_ops); op = (op + 1) % OP_COUNT);
349 g_ops[
i] = g_op_types[op++];
// Rows that must exist before the test starts are inserted now; curr_row
// tracks whether the row is expected to exist.
350 if(g_ops[i].start_row){
351 g_ops[
i].curr_row =
true;
352 g_ops[
i].val = rand();
353 require(!ops.pkInsertRecord(g_ndb, i, 1, g_ops[i].val));
356 g_ops[
i].curr_row =
false;
// Commit and restart the transaction once 100 inserts are pending.
358 if(uncommitted >= 100){
359 require(!ops.execute_Commit(g_ndb));
360 require(!ops.getTransaction()->restart());
// Final commit after the loop.
366 require(!ops.execute_Commit(g_ndb));
368 require(!ops.closeTransaction(g_ndb));
370 g_info <<
"Inserted " << rows <<
" rows" << endl;
// Inserts error code `error` in all data nodes, sends the
// DihStartLcpImmediately dump command to all nodes, and then reads lines from
// a socket looking for a "...: LCP: <id>" report whose id equals `error`.
// The socket is closed on both exits visible here.
// NOTE(review): the socket setup, the rest of the match condition and the
// return values are not visible in this listing.
374 static int pause_lcp(
int error)
376 int nodes = g_restarter.getNumDbNodes();
380 NDB_SOCKET_TYPE my_fd;
389 require(my_socket_valid(my_fd));
390 require(!g_restarter.insertErrorInAllNodes(error));
391 int dump[] = { DumpStateOrd::DihStartLcpImmediately };
392 require(!g_restarter.dumpStateAllNodes(dump, 1));
399 tmp = in.gets(buf, 1024);
// "%*[^:]: " skips everything up to the first colon; the number after
// "LCP: " is stored in id.
403 if(sscanf(tmp,
"%*[^:]: LCP: %d ", &
id) == 1 &&
id == error &&
405 my_socket_close(my_fd);
// The read loop stops once count has reached 30.
409 }
while(count++ < 30);
411 my_socket_close(my_fd);
// Applies the operation sequence assigned to `row` (g_ops[row].op1, then op2
// and op3 when they are non-null). Each step is followed by
// execute_NoCommit(), and curr_row / val are updated to reflect the expected
// state of the row:
//   "INS" - the row must not exist; insert it with a new random value
//   "UPD" - the row must exist; update it with a new random value
//   "DEL" - the row must exist; delete it
// NOTE(review): the declaration of `ops`, the closing braces and the return
// are not visible in this listing.
415 static int do_op(
int row)
// Step 1: op1.
418 if(strcmp(g_ops[row].op1,
"INS") == 0){
419 require(!g_ops[row].curr_row);
420 g_ops[row].curr_row =
true;
421 g_ops[row].val = rand();
422 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
423 }
else if(strcmp(g_ops[row].op1,
"UPD") == 0){
424 require(g_ops[row].curr_row);
425 g_ops[row].val = rand();
426 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
427 }
else if(strcmp(g_ops[row].op1,
"DEL") == 0){
428 require(g_ops[row].curr_row);
429 g_ops[row].curr_row =
false;
430 require(!ops.pkDeleteRecord(g_ndb, row, 1));
433 require(!ops.execute_NoCommit(g_ndb));
// Step 2: op2 (skipped when null).
435 if(g_ops[row].op2 == 0){
436 }
else if(strcmp(g_ops[row].op2,
"INS") == 0){
437 require(!g_ops[row].curr_row);
438 g_ops[row].curr_row =
true;
439 g_ops[row].val = rand();
440 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
441 }
else if(strcmp(g_ops[row].op2,
"UPD") == 0){
442 require(g_ops[row].curr_row);
443 g_ops[row].val = rand();
444 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
445 }
else if(strcmp(g_ops[row].op2,
"DEL") == 0){
446 require(g_ops[row].curr_row);
447 g_ops[row].curr_row =
false;
448 require(!ops.pkDeleteRecord(g_ndb, row, 1));
451 if(g_ops[row].op2 != 0)
452 require(!ops.execute_NoCommit(g_ndb));
// Step 3: op3 (skipped when null).
454 if(g_ops[row].op3 == 0){
455 }
else if(strcmp(g_ops[row].op3,
"INS") == 0){
456 require(!g_ops[row].curr_row);
457 g_ops[row].curr_row =
true;
458 g_ops[row].val = rand();
459 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
460 }
else if(strcmp(g_ops[row].op3,
"UPD") == 0){
461 require(g_ops[row].curr_row);
462 g_ops[row].val = rand();
463 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
464 }
else if(strcmp(g_ops[row].op3,
"DEL") == 0){
465 require(g_ops[row].curr_row);
466 g_ops[row].curr_row =
false;
467 require(!ops.pkDeleteRecord(g_ndb, row, 1));
470 if(g_ops[row].op3 != 0)
471 require(!ops.execute_NoCommit(g_ndb));
// Sends the LCPContinue dump command to all data nodes and then reads lines
// from a socket looking for a "...: LCP: <id>" report whose id equals
// `error`. The socket is closed on both exits visible here.
// NOTE(review): the socket setup, the rest of the match condition and the
// return values are not visible in this listing.
476 static int continue_lcp(
int error)
479 NDB_SOCKET_TYPE my_fd;
// Start from an invalid socket; it must have become valid by the require()
// below (the assignment is not visible in this listing).
480 my_socket_invalidate(&my_fd);
494 require(my_socket_valid(my_fd));
497 int args[] = { DumpStateOrd::LCPContinue };
498 if(g_restarter.dumpStateAllNodes(args, 1) != 0)
506 int nodes = g_restarter.getNumDbNodes();
508 tmp = in.gets(buf, 1024);
// "%*[^:]: " skips everything up to the first colon; the number after
// "LCP: " is stored in id.
512 if(sscanf(tmp,
"%*[^:]: LCP: %d ", &
id) == 1 &&
id == error &&
514 my_socket_close(my_fd);
// The read loop stops once count has reached 30.
518 }
while(count++ < 30);
520 my_socket_close(my_fd);
528 int res = ops.execute_Commit(g_ndb);
530 return ops.getTransaction()->
restart();
537 g_info <<
"Restarting cluster" << endl;
538 g_hugo_ops->closeTransaction(g_ndb);
542 require(!g_restarter.restartAll());
543 require(!g_restarter.waitClusterStarted(30));
544 require(!connect_ndb());
549 require(!g_hugo_ops->startTransaction(g_ndb));
// Checks the state of every row after a test run, then clears and reloads the
// table ten times.
// NOTE(review): local declarations, the branch for rows that should not exist
// and the return are not visible in this listing.
553 static int validate()
556 for(
size_t i = 0; i<(size_t)g_rows; i++){
// The tracked state must match the end state the row's operation sequence
// is expected to produce.
557 require(g_ops[i].curr_row == g_ops[i].end_row);
// Read the row back in a transaction of its own.
558 require(!ops.startTransaction(g_ndb));
559 ops.pkReadRecord(g_ndb, i, 1);
560 int res = ops.execute_Commit(g_ndb);
// A row that should exist must be readable and carry the last written value.
561 if(g_ops[i].curr_row){
562 require(res == 0 && ops.verifyUpdatesValue(g_ops[i].val) == 0);
566 ops.closeTransaction(g_ndb);
// Ten rounds of clearing the table and loading it again via
// loadTable(g_ndb, 1024).
569 for(
size_t j = 0; j<10; j++){
571 require(!clear.clearTable(g_ndb));
574 require(trans.loadTable(g_ndb, 1024) == 0);