18 #include <HugoOperations.hpp>
23 const NdbError &_error= (error); \
24 if (!m_quiet) ERR_OUT(g_err, _error); \
27 int HugoOperations::startTransaction(
Ndb* pNdb,
29 const char *keyData, Uint32 keyLen){
32 ndbout <<
"HugoOperations::startTransaction, pTrans != NULL" << endl;
45 int HugoOperations::setTransaction(
NdbTransaction* new_trans,
bool not_null_ok){
47 if (pTrans != NULL && !not_null_ok){
48 ndbout <<
"HugoOperations::startTransaction, pTrans != NULL" << endl;
59 HugoOperations::setTransactionId(Uint64
id){
61 pTrans->setTransactionId(
id);
65 int HugoOperations::closeTransaction(
Ndb* pNdb){
67 UtilTransactions::closeTransaction(pNdb);
69 m_result_sets.clear();
70 m_executed_result_sets.clear();
79 int HugoOperations::pkReadRecord(
Ndb* pNdb,
85 allocRows(numRecords);
92 for(
int r=0; r < numRecords; r++){
114 if (pIndexScanOp == 0)
117 bool mrrScan= (numRecords > 1);
118 Uint32
flags= mrrScan? NdbScanOperation::SF_MultiRange : 0;
121 indexScans.push_back(pIndexScanOp);
139 if (equalForRow(pOp, r+recordNo) != 0)
144 if (getPartIdForRow(pOp, r+recordNo, partId))
146 g_info <<
"Setting operation partition Id" << endl;
153 if(r == 0 || pIndexScanOp == 0)
157 if((rows[r]->attributeStore(a) =
173 int HugoOperations::pkReadRandRecord(
Ndb* pNdb,
179 allocRows(numRecords);
186 for(
int r=0; r < numRecords; r++){
210 check = pIndexScanOp->readTuples(lm);
212 indexScans.push_back(pIndexScanOp);
228 int rowid= rand() % records;
231 if (equalForRow(pOp, rowid) != 0)
236 if (getPartIdForRow(pOp, rowid, partId))
238 g_info <<
"Setting operation partition Id" << endl;
243 pIndexScanOp->end_of_bound(r);
245 if(r == 0 || pIndexScanOp == 0)
249 if((rows[r]->attributeStore(a) =
265 int HugoOperations::pkReadRecordLockHandle(
Ndb* pNdb,
273 g_err <<
"ERROR : Cannot call pkReadRecordLockHandle on an index"
288 const NdbOperation* prevOp = pTrans->getLastDefinedOperation();
290 int readRc = pkReadRecord(pNdb,
296 if (readRc == NDBT_OK)
301 const NdbOperation* definedOp = (prevOp)? prevOp->next() :
302 pTrans->getFirstDefinedOperation();
308 (
const_cast<NdbOperation*
>(definedOp))->getLockHandle();
317 lockHandles.push_back(lh);
318 definedOp = definedOp->next();
325 int HugoOperations::pkUnlockRecord(
Ndb* pNdb,
331 if (numRecords == ~(0))
333 numRecords = lockHandles.size() -
offset;
336 if (lockHandles.size() < (unsigned)(offset + numRecords))
338 g_err <<
"ERROR : LockHandles size is "
339 << lockHandles.size()
342 <<
") and/or numRecords ("
344 <<
") too large." << endl;
348 for (
int i = 0;
i < numRecords;
i++)
355 if (unlockOp == NULL)
364 g_err <<
"ERROR : LockHandle number "
365 <<
i+offset <<
" is NULL. "
366 <<
" offset is " << offset << endl;
374 int HugoOperations::pkUpdateRecord(
Ndb* pNdb,
378 allocRows(numRecords);
380 for(
int r=0; r < numRecords; r++){
395 if(setValues(pOp, r+recordNo, updatesValue) != NDBT_OK)
401 if(getPartIdForRow(pOp, r+recordNo, partId))
404 pOp->setAnyValue(getAnyValueForRowUpd(r+recordNo, updatesValue));
411 HugoOperations::setValues(
NdbOperation* pOp,
int rowId,
int updateId)
415 if (equalForRow(pOp, rowId) != 0)
420 if(setValueForAttr(pOp, a, rowId, updateId ) != 0){
431 int HugoOperations::pkInsertRecord(
Ndb* pNdb,
437 for(
int r=0; r < numRecords; r++){
452 if(setValues(pOp, r+recordNo, updatesValue) != NDBT_OK)
459 if(getPartIdForRow(pOp, r+recordNo, partId))
466 int HugoOperations::pkWriteRecord(
Ndb* pNdb,
471 for(
int r=0; r < numRecords; r++){
487 if (equalForRow(pOp, r+recordNo) != 0)
491 if(getPartIdForRow(pOp, r+recordNo, partId))
498 if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
509 int HugoOperations::pkWritePartialRecord(
Ndb* pNdb,
514 for(
int r=0; r < numRecords; r++){
530 if (equalForRow(pOp, r+recordNo) != 0)
534 if(getPartIdForRow(pOp, r+recordNo, partId))
541 int HugoOperations::pkDeleteRecord(
Ndb* pNdb,
546 for(
int r=0; r < numRecords; r++){
562 if (equalForRow(pOp, r+recordNo) != 0)
566 if(getPartIdForRow(pOp, r+recordNo, partId))
572 int HugoOperations::pkRefreshRecord(
Ndb* pNdb,
577 char buffer[NDB_MAX_TUPLE_SIZE];
588 opts.optionsPresent = NdbOperation::OperationOptions::OO_ANYVALUE;
589 for(
int r=0; r < numRecords; r++)
591 bzero(buffer,
sizeof(buffer));
592 if (calc.equalForRow((Uint8*)buffer, record, r + recordNo))
597 opts.anyValue = anyValueInfo?
598 (anyValueInfo << 16) | (r+recordNo) :
601 const NdbOperation* pOp = pTrans->refreshTuple(record, buffer,
602 &opts,
sizeof(opts));
613 int HugoOperations::execute_Commit(
Ndb* pNdb,
617 check = pTrans->
execute(Commit, eao);
620 if( check == -1 || err.
code) {
634 for(
unsigned int i = 0;
i<m_result_sets.size();
i++){
635 m_executed_result_sets.push_back(m_result_sets[
i]);
637 int rows = m_result_sets[
i].records;
647 return (err.
code > 0 ? err.
code : NDBT_FAILED);
656 m_result_sets[
i].records--;
661 m_result_sets.clear();
666 int HugoOperations::execute_NoCommit(
Ndb* pNdb, AbortOption eao){
669 check = pTrans->
execute(NoCommit, eao);
672 if( check == -1 || err.
code) {
691 for(
unsigned int i = 0; i<m_result_sets.size(); i++){
692 m_executed_result_sets.push_back(m_result_sets[i]);
694 int rows = m_result_sets[
i].records;
704 return (err.
code > 0 ? err.
code : NDBT_FAILED);
718 m_result_sets.clear();
723 int HugoOperations::execute_Rollback(
Ndb* pNdb){
725 check = pTrans->
execute(Rollback);
736 HugoOperations_async_callback(
int res,
NdbTransaction* pCon,
void* ho)
744 assert(pCon == pTrans);
762 HugoOperations_async_callback,
777 HugoOperations_async_callback,
785 HugoOperations::wait_async(
Ndb* pNdb,
int timeout)
787 volatile int * wait = &m_async_reply;
795 ndbout <<
"ERROR: " << pNdb->
getNdbError(m_async_return) << endl;
796 return m_async_return;
799 ndbout_c(
"wait returned nothing...");
813 HugoOperations::~HugoOperations(){
829 if(equalForAttr(pOp, a, row) != 0)
840 bool HugoOperations::getPartIdForRow(
const NdbOperation* pOp,
861 partId= rowid % numFrags;
862 g_info <<
"Returning partition Id of " << partId << endl;
875 g_info <<
"Can't call equalForAttr on non PK attribute" << endl;
879 int len = attr->getSizeInBytes();
880 char buf[NDB_MAX_TUPLE_SIZE];
881 memset(buf, 0,
sizeof(buf));
883 const char * value = calc.calcValue(rowId, attrId, 0, buf, len, &real_len);
895 int len = attr->getSizeInBytes();
896 char buf[NDB_MAX_TUPLE_SIZE];
897 memset(buf, 0,
sizeof(buf));
899 const char * value = calc.calcValue(rowId, attrId,
900 updateId, buf, len, &real_len);
906 int len = (int)
sizeof(buf);
908 const char * value = calc.calcValue(rowId, attrId,
909 updateId, buf, len, &real_len);
917 return b->
setValue(value, real_len);
922 HugoOperations::verifyUpdatesValue(
int updatesValue,
int _numRows){
923 _numRows = (_numRows == 0 ? rows.size() : _numRows);
925 int result = NDBT_OK;
927 for(
int i = 0; i<_numRows; i++){
928 if(calc.verifyRowValues(rows[i]) != NDBT_OK){
929 g_err <<
"Inconsistent row"
930 << endl <<
"\t" << rows[
i]->c_str().c_str() << endl;
931 result = NDBT_FAILED;
935 if(calc.getUpdatesValue(rows[i]) != updatesValue){
936 result = NDBT_FAILED;
937 g_err <<
"Invalid updates value for row " << i << endl
938 <<
" updatesValue: " << updatesValue << endl
939 <<
" calc.getUpdatesValue: " << calc.getUpdatesValue(rows[i]) << endl
940 << rows[
i]->c_str().c_str() << endl;
946 g_err <<
"No rows -> Invalid updates value" << endl;
953 void HugoOperations::allocRows(
int _numRows){
955 g_info <<
"Illegal value for num rows : " << _numRows << endl;
959 for(
int b=rows.size(); b<_numRows; b++){
964 void HugoOperations::deallocRows(){
965 while(rows.size() > 0){
967 rows.erase(rows.size() - 1);
971 int HugoOperations::saveCopyOfRecord(
int numRecords ){
973 if (numRecords > (
int)rows.size())
976 for (
int i = 0; i < numRecords; i++){
977 savedRecords.push_back(rows[i]->c_str());
982 BaseString HugoOperations::getRecordStr(
int recordNum){
983 if (recordNum > (
int)rows.size())
985 return rows[recordNum]->
c_str();
988 int HugoOperations::getRecordGci(
int recordNum){
993 int HugoOperations::compareRecordToCopy(
int numRecords ){
994 if (numRecords > (
int)rows.size())
996 if ((
unsigned)numRecords > savedRecords.size())
999 int result = NDBT_OK;
1000 for (
int i = 0; i < numRecords; i++){
1002 ndbout <<
"row["<<i<<
"]: " << str << endl;
1003 ndbout <<
"sav["<<i<<
"]: " << savedRecords[
i] << endl;
1004 if (savedRecords[i] == str){
1007 result = NDBT_FAILED;
1014 HugoOperations::refresh() {
1020 int HugoOperations::indexReadRecords(
Ndb*,
const char * idxName,
int recordNo,
1025 allocRows(numRecords);
1027 for(
int r=0; r < numRecords; r++){
1035 if (exclusive ==
true)
1046 if (equalForRow(pOp, r+recordNo) != 0)
1051 if((rows[r]->attributeStore(a) =
1063 HugoOperations::indexUpdateRecord(
Ndb*,
1064 const char * idxName,
1069 allocRows(numRecords);
1071 for(
int r=0; r < numRecords; r++){
1087 if (equalForRow(pOp, r+recordNo) != 0)
1093 if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
1119 if((rows[0]->attributeStore(a) =
1127 RsPair p = {pOp, records};
1128 m_result_sets.push_back(p);
1134 HugoOperations::releaseLockHandles(
Ndb* pNdb,
1139 int totalSize = lockHandles.size();
1140 if (numRecords == ~(0))
1142 numRecords = totalSize -
offset;
1145 if (totalSize < (offset + numRecords))
1147 g_err <<
"ERROR : LockHandles size is "
1148 << lockHandles.size()
1151 <<
") and/or numRecords ("
1153 <<
") too large." << endl;
1157 for (
int i = 0; i < numRecords; i++)
1162 if (pTrans->releaseLockHandle(lh) != 0)
1174 g_err <<
"ERROR : LockHandle number "
1175 << i+offset <<
" is NULL. "
1176 <<
" offset is " << offset << endl;
1190 ndberror_update(&ndberror);
1195 HugoOperations::getNdbError()
const
1202 HugoOperations::setNdbError(
const NdbError& error)
1208 HugoOperations::setAnyValueCallback(AnyValueCallback avc)
1214 HugoOperations::getAnyValueForRowUpd(
int row,
int update)
1216 if (avCallback == NULL)
1219 return (avCallback)(pTrans->
getNdb(), pTrans,