23 #include <HugoTransactions.hpp>
27 #define BATCH_SIZE 128
44 static char* event_name(uint etype,
char *
buf)
48 strcpy(buf,
"TE_INSERT");
51 strcpy(buf,
"TE_DELETE");
54 strcpy(buf,
"TE_UPDATE");
57 strcpy(buf,
"TE_CLUSTER_FAILURE");
60 strcpy(buf,
"TE_ALTER");
63 strcpy(buf,
"TE_DROP");
66 strcpy(buf,
"TE_NODE_FAILURE");
69 strcpy(buf,
"TE_SUBSCRIBE");
72 strcpy(buf,
"TE_UNSUBSCRIBE");
75 strcpy(buf,
"unknown");
80 static void do_begin(
Ndb *ndb,
struct Trans_arg &trans_arg)
84 trans_arg.bytes_batched = 0;
94 for (i= 0; i < n_columns; i++)
97 op->
equal(i, ev[i]->aRef()))
111 for (i= 0; i < n_columns; i++)
123 if (!trans_arg.trans)
131 do_set_value(op, pOp);
133 trans_arg.bytes_batched++;
134 if (trans_arg.bytes_batched > BATCH_SIZE)
137 trans_arg.bytes_batched = 0;
142 if (!trans_arg.trans)
150 do_set_value(op, pOp);
152 trans_arg.bytes_batched++;
153 if (trans_arg.bytes_batched > BATCH_SIZE)
156 trans_arg.bytes_batched = 0;
161 if (!trans_arg.trans)
170 trans_arg.bytes_batched++;
171 if (trans_arg.bytes_batched > BATCH_SIZE)
174 trans_arg.bytes_batched = 0;
177 static void do_commit(
struct Trans_arg &trans_arg)
179 if (!trans_arg.trans)
186 main(
int argc,
const char** argv){
192 const char* connectstring1 = 0;
193 const char* connectstring2 = 0;
196 {
"connectstring1",
'c',
197 arg_string, &connectstring1,
"connectstring1",
"" },
198 {
"connectstring2",
'C',
199 arg_string, &connectstring2,
"connectstring2",
"" },
200 {
"database",
'd', arg_string, &db,
"Database",
"" },
201 {
"usage",
'?', arg_flag, &_help,
"Print help",
"" }
203 int num_args =
sizeof(args) /
sizeof(args[0]);
206 "<tabname>+ \nThis program listen to events on specified tables\n";
208 if(getarg(args, num_args, argc, argv, &optind) ||
209 argv[optind] == NULL || _help) {
210 arg_printusage(args, num_args, argv[0], desc);
211 return NDBT_ProgramExit(NDBT_WRONGARGS);
218 return NDBT_ProgramExit(NDBT_FAILED);
220 Ndb MyNdb( &con, db ? db :
"TEST_DB" );
222 if(MyNdb.init() != 0){
223 ERR(MyNdb.getNdbError());
224 return NDBT_ProgramExit(NDBT_FAILED);
228 while(MyNdb.waitUntilReady() != 0)
229 ndbout <<
"Waiting for ndb to become ready..." << endl;
237 if(con2->
connect(12, 5, 1) != 0)
239 return NDBT_ProgramExit(NDBT_FAILED);
241 ndb2 =
new Ndb( con2, db ? db :
"TEST_DB" );
243 if(ndb2->
init() != 0){
245 return NDBT_ProgramExit(NDBT_FAILED);
250 ndbout <<
"Waiting for ndb to become ready..." << endl;
259 for(i= optind; i<argc; i++)
264 ndbout_c(
"Could not find table: %s, skipping", argv[i]);
269 name.
appfmt(
"EV-%s", argv[i]);
277 (NdbDictionary::Event::ER_UPDATED |
278 NdbDictionary::Event::ER_DDL));
284 g_info <<
"Event creation failed event exists. Removing...\n";
287 g_err <<
"Failed to drop event: " << myDict->
getNdbError() << endl;
294 g_err <<
"Failed to create event: " << myDict->
getNdbError() << endl;
301 g_err <<
"Failed to create event: " << myDict->
getNdbError() << endl;
307 events.push_back(myEvent);
311 g_err <<
"Event operation creation failed" << endl;
322 event_pre_values[sz].
325 event_ops.push_back(pOp);
329 table_infos.push_back(ti);
331 pOp->setCustomData((
void *)&table_infos[sz]);
335 for(i= 0; i<(int)event_ops.size(); i++)
337 if (event_ops[i]->execute())
339 g_err <<
"operation execution failed: " << event_ops[
i]->getNdbError()
351 while(MyNdb.pollEvents(100) == 0);
356 Uint64 gci= pOp->
getGCI();
357 Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
359 do_begin(ndb2, trans_arg);
367 do_insert(trans_arg, pOp);
372 do_delete(trans_arg, pOp);
377 do_update(trans_arg, pOp);
397 ndbout_c(
"Error: unknown event type: %u",
401 }
while ((pOp= MyNdb.nextEvent()) && gci == pOp->
getGCI());
403 do_commit(trans_arg);
404 ndbout_c(
"GCI: %u/%u events: %lld(I) %lld(U) %lld(D)",
405 Uint32(gci >> 32), Uint32(gci), cnt_i, cnt_u, cnt_d);
409 for(i= 0; i<(int)event_ops.size(); i++)
410 MyNdb.dropEventOperation(event_ops[i]);
416 return NDBT_ProgramExit(NDBT_OK);