19 #include <ndb_global.h>
21 #include <HugoAsynchTransactions.hpp>
// Member initializers visible here: the completion counters and the
// next-record cursor start at zero, and the operation type defaults to NO_READ.
27 transactionsCompleted(0),
32 operationType(NO_READ),
34 nextUnProcessedRecord(0),
36 totalCompletedRecords(0),
// Destructor: hands cleanup of the per-transaction bookkeeping to
// deallocTransactions().
43 HugoAsynchTransactions::~HugoAsynchTransactions(){
44 deallocTransactions();
// Loads the table through the asynchronous driver.
// Delegates to executeAsynchOperation() with the caller's records, batch,
// trans and operations values, then writes a progress line to g_info.
48 HugoAsynchTransactions::loadTableAsynch(
Ndb* pNdb,
54 int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
// The figure logged is transactionsCompleted (cast to unsigned int) multiplied
// by operations; the multiplication binds tighter than operator<<.
// NOTE(review): the count is streamed before the "|- " prefix here, whereas
// pkDelRecordsAsynch streams "|- " first -- confirm the ordering is intended.
56 g_info << (
unsigned int)transactionsCompleted * operations
57 <<
"|- inserted..." << endl;
// Deletes records through the asynchronous driver.
// Announces the operation on g_info, delegates to executeAsynchOperation(),
// then logs how many operations were covered by completed transactions.
63 HugoAsynchTransactions::pkDelRecordsAsynch(
Ndb* pNdb,
69 g_info <<
"|- Deleting records asynchronous..." << endl;
71 int result = executeAsynchOperation(pNdb, records, batch, trans,
// Logged figure: transactionsCompleted (cast to unsigned int) * operations.
74 g_info <<
"|- " << (
unsigned int)transactionsCompleted * operations
75 <<
" deleted..." << endl;
// Reads records through the asynchronous driver.
// Announces the operation on g_info, sizes the row storage, delegates to
// executeAsynchOperation(), then logs the completed operation count.
81 HugoAsynchTransactions::pkReadRecordsAsynch(
Ndb* pNdb,
87 g_info <<
"|- Reading records asynchronous..." << endl;
// Row storage is requested for trans * operations rows, i.e. one row per
// operation of every transaction that can be outstanding at once.
89 allocRows(trans*operations);
90 int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
// Logged figure: transactionsCompleted (cast to unsigned int) * operations.
93 g_info <<
"|- " << (
unsigned int)transactionsCompleted * operations
// Updates records through the asynchronous driver.
// Announces the operation on g_info, sizes the row storage, delegates to
// executeAsynchOperation(), then logs the completed operation count.
103 HugoAsynchTransactions::pkUpdateRecordsAsynch(
Ndb* pNdb,
109 g_info <<
"|- Updating records asynchronous..." << endl;
// Row storage is requested for trans * operations rows; the update path reads
// stored rows back (see getUpdatesValue(rows[...]) in defineUpdateOpsForTask).
111 allocRows(trans*operations);
112 int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
// Logged figure: transactionsCompleted (cast to unsigned int) * operations.
115 g_info <<
"|- " << (
unsigned int)transactionsCompleted * operations
// (Re)creates the transInfo array used to track outstanding transactions.
//   trans          - number of TransactionInfo slots to allocate
//   maxOpsPerTrans - operations per transaction; spaces out each slot's
//                    resultRowStartIndex
126 HugoAsynchTransactions::allocTransactions(
int trans,
int maxOpsPerTrans) {
// Drop any array left over from an earlier call before allocating a new one.
127 if (transInfo != NULL) {
128 deallocTransactions();
130 transInfo =
new TransactionInfo[trans];
// Template holding the default field values for a slot.
// NOTE(review): presumably copied into every slot inside the loop below;
// that assignment is not among the visible lines -- confirm.
133 TransactionInfo init;
135 init.transaction= NULL;
136 init.startRecordId= 0;
138 init.resultRowStartIndex= 0;
140 init.opType= NO_READ;
// Slot i gets resultRowStartIndex i * maxOpsPerTrans, so consecutive slots
// start maxOpsPerTrans apart.
142 for (
int i=0;
i < trans;
i++)
145 transInfo[
i].resultRowStartIndex= (
i * maxOpsPerTrans);
// Tears down the transInfo array; does nothing unless transInfo is non-NULL.
// NOTE(review): presumably deletes the array and resets the pointer -- the
// guarded statements are not among the visible lines, confirm.
150 HugoAsynchTransactions::deallocTransactions() {
151 if (transInfo != NULL){
// Hands out the next contiguous range of records to work on.
//   startRecordId - out: first record id of the task
//   numRecords    - out: number of records in the task
// NOTE(review): the return statements are not visible; beginNewTask()
// compares the result against 0 -- confirm the exact convention.
158 HugoAsynchTransactions::getNextWorkTask(
int* startRecordId,
int* numRecords)
// Current pass over the records is exhausted.
164 if (nextUnProcessedRecord == recordsPerLoop)
// The pass that just finished was the last of totalLoops.
// NOTE(review): presumably reports "no more work" here -- confirm.
167 if ((loopNum + 1) == totalLoops)
// Otherwise rewind the cursor for another pass.
172 nextUnProcessedRecord= 0;
// Task size is the smaller of the records left in this pass and the
// per-transaction operation limit.
176 int availableRecords= recordsPerLoop- nextUnProcessedRecord;
177 int recordsInTask= (availableRecords < maxOpsPerTrans)?
178 availableRecords : maxOpsPerTrans;
180 *startRecordId= nextUnProcessedRecord;
181 *numRecords= recordsInTask;
// Advance the cursor past the records just handed out.
183 nextUnProcessedRecord+= recordsInTask;
// Defines the update operations for every record in a task.
//   tInfo - task whose range [startRecordId, startRecordId + numRecords)
//           is walked
// callback() treats a return value of 0 as success.
189 HugoAsynchTransactions::defineUpdateOpsForTask(TransactionInfo* tInfo)
200 for (
int recordId= tInfo->startRecordId;
201 recordId < (tInfo->startRecordId + tInfo->numRecords);
// New updates value is one more than the value held in the stored row.
212 int updateVal= calc.getUpdatesValue(rows[recordId]) + 1;
// Key the operation on this record.
215 if (equalForRow(pOp, recordId) != 0)
// Set attribute a for this record using the bumped updates value.
224 if (setValueForAttr(pOp, a, recordId, updateVal) != 0) {
// Defines the operations of a task's transaction for each record in the
// task's range and stores the transaction on the task.
//   tInfo        - task descriptor; its opType selects the operation defined
//   taskExecType - execution type for the transaction
//                  (NOTE(review): its use is not among the visible lines)
// Callers treat a non-zero return value as failure.
237 HugoAsynchTransactions::defineTransactionForTask(TransactionInfo* tInfo,
238 ExecType taskExecType)
249 for (
int recordId= tInfo->startRecordId;
250 recordId < (tInfo->startRecordId + tInfo->numRecords);
// One branch per operation type; the case labels are not visible here.
260 switch (tInfo->opType) {
// Sets attribute a with an updates value of 0.
272 if (setValueForAttr(pOp, a, recordId, 0 ) != 0) {
// Update transactions are reported as an error in this function.
281 g_err <<
"Attempt to define update transaction" << endl;
287 if (equalForRow(pOp, recordId) != 0)
// The result of the right-hand side (not visible) is stored into the row's
// attributeStore(a) slot and tested in the same expression.
295 if ((rows[recordId]->attributeStore(a) =
313 if (equalForRow(pOp, recordId) != 0)
// Remember the transaction on the task; callback() compares against it.
327 tInfo->transaction= trans;
// Resets a task slot, assigns it the next range of records and defines its
// transaction.
//   tInfo - slot to reuse
// NOTE(review): return statements are not visible; callback() treats a
// negative result as failure and executeAsynchOperation() keeps starting
// tasks while the result is 0 -- confirm the exact convention.
338 HugoAsynchTransactions::beginNewTask(TransactionInfo* tInfo)
340 tInfo->transaction= NULL;
341 tInfo->startRecordId= 0;
342 tInfo->numRecords= 0;
// Start from the configured operation and execution type.
346 NDB_OPERATION realOpType= operationType;
347 ExecType realExecType= execType;
// An update run starts its task with NoCommit; callback() later defines the
// update operations and executes with Commit.
// NOTE(review): realOpType is presumably switched to NO_READ here as well,
// mirroring the retry path in callback() -- that line is not visible.
348 if (operationType == NO_UPDATE)
351 realExecType= NoCommit;
353 tInfo->opType= realOpType;
// Fetch the record range for this task.
355 if (getNextWorkTask(&tInfo->startRecordId,
356 &tInfo->numRecords) == 0)
359 if (defineTransactionForTask(tInfo, realExecType) != 0)
361 g_err <<
"Error defining new transaction" << endl;
// Callback entry point: treats anObject as a TransactionInfo and forwards
// result, pTrans and that TransactionInfo to callback() on the object
// referenced by its hugoP member.
375 HugoAsynchTransactions::callbackFunc(
int result,
379 HugoAsynchTransactions::TransactionInfo* tranInfo=
380 (HugoAsynchTransactions::TransactionInfo*) anObject;
382 tranInfo->hugoP->callback(result, pTrans, tranInfo);
// Handles the completion of one asynchronous transaction.
//   result - value passed through from callbackFunc()
//   tInfo  - task that owns the completed transaction
// Visible behaviour: on a zero error code the task's rows are verified, an
// update run continues with its update phase, otherwise the task is counted
// as complete and the slot is given new work. On a non-zero error code the
// error is logged and the task is retried with a back-off while
// tInfo->retries is below 10. Failures set testResult to NDBT_FAILED.
387 HugoAsynchTransactions::callback(
int result,
389 TransactionInfo* tInfo)
// Sanity check: the transaction reported must be the one stored on the task.
395 if (pTrans != tInfo->transaction)
397 g_err <<
"Transactions not same in callback!" << endl;
399 testResult= NDBT_FAILED;
// Error code 0: the transaction completed without error.
405 if (transErr.
code == 0)
410 switch (tInfo->opType)
// Check every row in the task's record range.
414 for (
int recordId = tInfo->startRecordId;
415 recordId < (tInfo->startRecordId + tInfo->numRecords);
418 if (calc.verifyRowValues(rows[recordId]) != 0) {
419 g_info <<
"|- Verify failed..." << endl;
422 testResult= NDBT_FAILED;
// Update run: after the read phase, define the update operations on the
// same transaction and execute it with Commit.
427 if (operationType == NO_UPDATE)
430 if (defineUpdateOpsForTask(tInfo) == 0)
432 tInfo->opType= NO_UPDATE;
433 tInfo->transaction->executeAsynch(Commit,
439 g_err <<
"Error defining update operations in callback" << endl;
441 testResult= NDBT_FAILED;
// Task finished: update the totals and reuse the slot for the next task.
459 transactionsCompleted ++;
460 totalCompletedRecords+= tInfo->numRecords;
462 if (beginNewTask(tInfo) < 0)
465 testResult= NDBT_FAILED;
// Error path: report the record range, error code and task type.
471 g_err <<
"Callback got error on task : "
472 << tInfo->startRecordId <<
" to "
473 << tInfo->startRecordId + tInfo->numRecords <<
" "
474 << transErr.
code <<
":"
476 <<
". Task type : " << tInfo->opType << endl;
// Retries are attempted only while fewer than 10 have been used.
481 if (tInfo->retries < 10)
// Close the failed transaction before it is redefined.
485 tInfo->transaction->close();
// Keep track of the highest retry count seen.
487 if (tInfo->retries > maxUsedRetries)
488 maxUsedRetries= tInfo->retries;
// Back-off: the multiplier doubles with each retry (1 << retries) and a
// random component from myRandom48(base) is added.
// NOTE(review): the definition of base is not among the visible lines.
494 int multiplier= 1 << tInfo->retries;
496 int backoffMillis= multiplier*base + myRandom48(base);
498 g_err <<
" Error is temporary, retrying in "
499 << backoffMillis <<
" millis. Retry number "
500 << tInfo->retries << endl;
501 NdbSleep_MilliSleep(backoffMillis);
// Redefine the task from the configured operation type; an update run
// restarts from its read phase (NO_READ executed with NoCommit).
506 tInfo->opType= operationType;
507 ExecType taskExecType= execType;
508 if (operationType == NO_UPDATE)
510 tInfo->opType= NO_READ;
511 taskExecType= NoCommit;
515 if (defineTransactionForTask(tInfo, taskExecType) != 0)
517 g_err <<
"Error defining retry transaction in callback" << endl;
519 testResult= NDBT_FAILED;
// Retry budget exhausted.
525 g_err <<
"Too many retries (" << tInfo->retries
526 <<
") failing." << endl;
// Report the error status and fail the test.
532 g_err <<
"Status= " << transErr.
status <<
" Failing test" << endl;
533 testResult= NDBT_FAILED;
// Driver shared by the load/delete/read/update entry points.
// Records the run parameters in member state, allocates the transaction
// slots, starts an initial task on the slots, checks the completed-record
// total and releases the slots again.
//   theOperation - operation type applied to every task of this run
541 HugoAsynchTransactions::executeAsynchOperation(
Ndb* pNdb,
546 NDB_OPERATION theOperation,
// Publish the run parameters for getNextWorkTask(), beginNewTask() and
// callback() to read.
568 recordsPerLoop= records;
569 maxOpsPerTrans= operations;
570 operationType= theOperation;
// Restart the work cursor and the completed-record total for this run.
572 nextUnProcessedRecord= 0;
573 totalCompletedRecords= 0;
578 allocTransactions(trans, maxOpsPerTrans);
// Start a task on each slot in turn, stopping at the first slot for which
// beginNewTask() returns non-zero. The post-increment advances the index
// even for that last call.
581 int nextUndefinedTrans= 0;
582 while ((nextUndefinedTrans < trans) &&
583 (beginNewTask(&transInfo[nextUndefinedTrans++]) == 0))
// Every record of every loop has been completed.
594 if (totalCompletedRecords == (records * totalLoops))
598 deallocTransactions();