20 #include <NdbSqlUtil.hpp>
21 #include <AttributeHeader.hpp>
23 #include <signaldata/ScanTab.hpp>
24 #include <signaldata/KeyInfo.hpp>
25 #include <signaldata/AttrInfo.hpp>
26 #include <signaldata/TcKeyReq.hpp>
28 #define DEBUG_NEXT_RESULT 0
32 m_transConnection(NULL)
35 m_allocated_receivers = 0;
36 m_prepared_receivers = 0;
41 m_array =
new Uint32[1];
45 m_scanUsingOldApi=
true;
46 m_readTuplesCalled=
false;
47 m_interpretedCodeOldApi= NULL;
50 NdbScanOperation::~NdbScanOperation()
52 for(Uint32
i = 0;
i<m_allocated_receivers;
i++){
53 m_receivers[
i]->release();
54 theNdb->releaseNdbScanRec(m_receivers[
i]);
60 NdbScanOperation::setErrorCode(
int aErrorCode)
const
65 pnonConstThis->theNdbCon = m_transConnection;
66 NdbOperation::setErrorCode(aErrorCode);
67 pnonConstThis->theNdbCon = tmp;
71 NdbScanOperation::setErrorCodeAbort(
int aErrorCode)
const
76 pnonConstThis->theNdbCon = m_transConnection;
77 NdbOperation::setErrorCodeAbort(aErrorCode);
78 pnonConstThis->theNdbCon = tmp;
91 m_transConnection = myConnection;
93 if (NdbOperation::init(tab, myConnection,
false) != 0)
96 theNdb->theRemainingStartTransactions++;
98 if (!aScanConnection){
99 theNdb->theRemainingStartTransactions--;
105 theNdbCon= aScanConnection;
109 theStatus = GetValue;
111 theNdbCon->theMagicNumber = 0xFE11DF;
112 theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
117 m_scanUsingOldApi=
true;
118 m_readTuplesCalled=
false;
119 m_interpretedCodeOldApi= NULL;
120 m_pruneState= SPS_UNKNOWN;
122 m_api_receivers_count = 0;
123 m_current_api_receiver = 0;
124 m_sent_receivers_count = 0;
125 m_conf_receivers_count = 0;
130 NdbScanOperation::handleScanGetValuesOldApi()
133 assert(m_scanUsingOldApi);
135 if (theReceiver.theFirstRecAttr != NULL)
145 const NdbRecAttr* recAttrToRead = theReceiver.theFirstRecAttr;
147 while(recAttrToRead != NULL)
150 res= insertATTRINFOHdr_NdbRecord(recAttrToRead->theAttrId, 0);
151 if (unlikely(res == -1))
153 recAttrToRead= recAttrToRead->next();
156 theInitialReadSize= theTotalCurrAI_Len - AttrInfo::SectionSizeInfoLength;
170 NdbScanOperation::addInterpretedCode()
172 Uint32 mainProgramWords= 0;
173 Uint32 subroutineWords= 0;
177 if (code->m_flags & NdbInterpretedCode::UsesDisk)
179 m_flags &= ~Uint8(OF_NO_DISK);
184 mainProgramWords= code->m_first_sub_instruction_pos ?
185 code->m_first_sub_instruction_pos :
186 code->m_instructions_length;
188 int res = insertATTRINFOData_NdbRecord((
const char*)code->m_buffer,
189 mainProgramWords << 2);
193 if (code->m_number_of_subs > 0)
195 assert(mainProgramWords > 0);
196 assert(code->m_first_sub_instruction_pos > 0);
198 Uint32 *subroutineStart=
199 &code->m_buffer[ code->m_first_sub_instruction_pos ];
201 code->m_instructions_length -
202 code->m_first_sub_instruction_pos;
204 res = insertATTRINFOData_NdbRecord((
const char*) subroutineStart,
205 subroutineWords << 2);
209 theInterpretedSize= mainProgramWords;
210 theSubroutineSize= subroutineWords;
220 NdbScanOperation::handleScanOptions(
const ScanOptions *options)
226 if ((options->optionsPresent & ScanOptions::SO_GETVALUE) &&
227 (options->numExtraGetValues > 0))
229 if (options->extraGetValues == NULL)
231 setErrorCodeAbort(4299);
238 for (
unsigned int i=0;
i < options->numExtraGetValues;
i++)
242 pvalSpec->recAttr=NULL;
244 if (pvalSpec->column == NULL)
246 setErrorCodeAbort(4295);
256 (
char *) pvalSpec->appStorage);
263 pvalSpec->recAttr = pra;
267 if (options->optionsPresent & ScanOptions::SO_PARTITION_ID)
270 assert(theBlobList == NULL);
271 assert(m_pruneState == SPS_UNKNOWN);
276 if(unlikely(! (m_attribute_record->flags &
277 NdbRecord::RecHasUserDefinedPartitioning)))
280 setErrorCodeAbort(4546);
284 m_pruneState= SPS_FIXED;
285 m_pruningKey= options->partitionId;
288 theDistributionKey = options->partitionId;
289 theDistrKeyIndicator_ = 1;
290 assert((m_attribute_record->flags & NdbRecord::RecHasUserDefinedPartitioning) != 0);
291 DBUG_PRINT(
"info", (
"NdbScanOperation::handleScanOptions(dist key): %u",
292 theDistributionKey));
295 if (options->optionsPresent & ScanOptions::SO_INTERPRETED)
302 options->interpretedCode->getTable();
303 if (codeTable != NULL)
307 if ((impl->m_id != (
int) m_attribute_record->tableId) ||
308 (table_version_major(impl->m_version) !=
309 table_version_major(m_attribute_record->tableVersion)))
313 if ((options->interpretedCode->m_flags &
314 NdbInterpretedCode::Finalised) == 0)
316 setErrorCodeAbort(4519);
319 m_interpreted_code= options->interpretedCode;
323 if (options->optionsPresent & ScanOptions::SO_CUSTOMDATA)
325 m_customData = options->customData;
329 if (options->optionsPresent & ScanOptions::SO_PART_INFO)
334 if (unlikely(validatePartInfoPtr(pSpec,
335 options->sizeOfPartInfo,
337 getPartValueFromInfo(pSpec,
342 assert(m_pruneState == SPS_UNKNOWN);
343 m_pruneState= SPS_FIXED;
344 m_pruningKey= partValue;
346 theDistributionKey= partValue;
347 theDistrKeyIndicator_= 1;
348 DBUG_PRINT(
"info", (
"Set distribution key from partition spec to %u",
364 const Uint32 * m_read_mask)
367 Uint32 columnCount= 0;
372 for (Uint32
i= 0;
i<result_record->noOfColumns;
i++)
375 Uint32 attrId= col->attrId;
377 assert(!(attrId & AttributeHeader::PSEUDO));
386 if (unlikely(col->flags & NdbRecord::IsBlob))
393 if (col->flags & NdbRecord::IsDisk)
394 m_flags &= ~Uint8(OF_NO_DISK);
396 if (attrId > maxAttrId)
399 readMask.
set(attrId);
411 bool all= (columnCount == m_currentTable->m_columns.size());
414 result= insertATTRINFOHdr_NdbRecord(AttributeHeader::READ_ALL,
419 Uint32 sigBitmaskWords= (maxAttrId>>5) + 1;
421 result= insertATTRINFOHdr_NdbRecord(AttributeHeader::READ_PACKED,
422 sigBitmaskWords << 2);
424 result= insertATTRINFOData_NdbRecord((
const char*) &readMask.rep.data[0],
425 sigBitmaskWords << 2);
440 const Uint32 * readMask)
442 bool haveBlob=
false;
448 theInitialReadSize= theTotalCurrAI_Len - AttrInfo::SectionSizeInfoLength;
451 if (m_scanUsingOldApi)
453 if (handleScanGetValuesOldApi() !=0)
460 if (handleScanOptions(options) != 0)
468 if (unlikely(haveBlob) && !m_scanUsingOldApi)
470 if (getBlobHandlesNdbRecord(m_transConnection, readMask) == -1)
477 if (m_interpreted_code != NULL)
479 if (addInterpretedCode() == -1)
487 theNdbCon->theTransactionId) == -1)
495 NdbScanOperation::handleScanOptionsVersion(
const ScanOptions*& optionsPtr,
496 Uint32 sizeOfOptions,
497 ScanOptions& currOptions)
500 if (unlikely((sizeOfOptions !=0) &&
501 (sizeOfOptions !=
sizeof(ScanOptions))))
504 if (sizeOfOptions ==
sizeof(ScanOptions_v1))
506 const ScanOptions_v1* oldOptions=
507 (
const ScanOptions_v1*) optionsPtr;
512 currOptions.optionsPresent= oldOptions->optionsPresent;
513 currOptions.scan_flags= oldOptions->scan_flags;
514 currOptions.parallel= oldOptions->parallel;
515 currOptions.batch= oldOptions->batch;
516 currOptions.extraGetValues= oldOptions->extraGetValues;
517 currOptions.numExtraGetValues= oldOptions->numExtraGetValues;
518 currOptions.partitionId= oldOptions->partitionId;
519 currOptions.interpretedCode= oldOptions->interpretedCode;
520 currOptions.customData= oldOptions->customData;
523 currOptions.partitionInfo= NULL;
524 currOptions.sizeOfPartInfo= 0;
526 optionsPtr= &currOptions;
531 setErrorCodeAbort(4298);
540 NdbScanOperation::scanTableImpl(
const NdbRecord *result_record,
542 const unsigned char *result_mask,
544 Uint32 sizeOfOptions)
547 Uint32 scan_flags = 0;
551 ScanOptions currentOptions;
555 if (handleScanOptionsVersion(options, sizeOfOptions, currentOptions))
561 if (options->optionsPresent & ScanOptions::SO_SCANFLAGS)
562 scan_flags = options->scan_flags;
563 if (options->optionsPresent & ScanOptions::SO_PARALLEL)
564 parallel = options->parallel;
565 if (options->optionsPresent & ScanOptions::SO_BATCH)
566 batch = options->batch;
568 #if 0 // ToDo: this breaks optimize index, but maybe there is a better solution
569 if (result_record->flags & NdbRecord::RecIsIndex)
571 setErrorCodeAbort(4340);
576 m_attribute_record= result_record;
578 m_attribute_record->copyMask(readMask.rep.data, result_mask);
581 res= processTableScanDefs(lock_mode, scan_flags, parallel, batch);
585 theStatus= NdbOperation::UseNdbRecord;
587 return scanImpl(options, readMask.rep.data);
596 switch(partInfo->type)
598 case Ndb::PartitionSpec::PS_USER_DEFINED:
600 assert(table->m_fragmentType == NdbDictionary::Object::UserDefined);
601 *partValue= partInfo->UserDefined.partitionId;
605 case Ndb::PartitionSpec::PS_DISTR_KEY_PART_PTR:
607 assert(table->m_fragmentType != NdbDictionary::Object::UserDefined);
610 partInfo->KeyPartPtr.tableKeyParts,
611 partInfo->KeyPartPtr.xfrmbuf,
612 partInfo->KeyPartPtr.xfrmbuflen);
634 setErrorCodeAbort(ret);
639 case Ndb::PartitionSpec::PS_DISTR_KEY_RECORD:
641 assert(table->m_fragmentType != NdbDictionary::Object::UserDefined);
644 partInfo->KeyRecord.keyRecord,
645 partInfo->KeyRecord.keyRow,
646 partInfo->KeyRecord.xfrmbuf,
647 partInfo->KeyRecord.xfrmbuflen);
658 setErrorCodeAbort(ret);
665 setErrorCodeAbort(4542);
679 Uint32 prefix_length)
686 for (i= 0; i<prefix_length; i++)
690 bool is_null1= col->is_null(row1);
691 bool is_null2= col->is_null(row2);
703 Uint32
offset= col->offset;
704 Uint32 maxSize= col->maxSize;
705 const char *ptr1= row1 +
offset;
706 const char *ptr2= row2 +
offset;
709 char buf1[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
710 char buf2[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
711 if (col->flags & NdbRecord::IsMysqldShrinkVarchar)
714 bool ok1 = col->shrink_varchar(row1, len1, buf1);
718 bool ok2 = col->shrink_varchar(row2, len2, buf2);
723 void *info= col->charset_info;
725 (*col->compare_function)(info, ptr1, maxSize, ptr2, maxSize);
737 NdbIndexScanOperation::getDistKeyFromRange(
const NdbRecord *key_record,
742 const Uint32 MaxKeySizeInLongWords= (NDB_MAX_KEY_SIZE + 7) / 8;
743 Uint64 tmp[ MaxKeySizeInLongWords ];
744 char* tmpshrink = (
char*)tmp;
745 Uint32 tmplen = (Uint32)
sizeof(tmp);
748 assert(key_record->table->m_fragmentType !=
749 NdbDictionary::Object::UserDefined);
753 for (i = 0; i<key_record->distkey_index_length; i++)
756 &key_record->columns[key_record->distkey_indexes[
i]];
757 if (col->flags & NdbRecord::IsMysqldShrinkVarchar)
762 bool len_ok = col->shrink_varchar(row, len, tmpshrink);
766 setErrorCodeAbort(4209);
769 ptrs[
i].ptr = tmpshrink;
776 setErrorCodeAbort(4207);
782 ptrs[
i].ptr = row + col->offset;
784 ptrs[
i].len = col->maxSize;
790 ptrs, tmpshrink, tmplen);
793 *distKey = hashValue;
799 ndbout <<
"err: " << ret << endl;
801 setErrorCodeAbort(ret);
808 Uint32 sizeOfPartInfo,
819 tmpSpec.type= oldPSpec->type;
820 if (tmpSpec.type == Ndb::PartitionSpec_v1::PS_USER_DEFINED)
822 tmpSpec.UserDefined.partitionId= oldPSpec->UserDefined.partitionId;
826 tmpSpec.KeyPartPtr.tableKeyParts= oldPSpec->KeyPartPtr.tableKeyParts;
827 tmpSpec.KeyPartPtr.xfrmbuf= oldPSpec->KeyPartPtr.xfrmbuf;
828 tmpSpec.KeyPartPtr.xfrmbuflen= oldPSpec->KeyPartPtr.xfrmbuflen;
836 setErrorCodeAbort(4545);
841 if (partInfo->type != Ndb::PartitionSpec::PS_NONE)
843 if (m_pruneState == SPS_FIXED)
846 setErrorCodeAbort(4543);
850 if ((partInfo->type == Ndb::PartitionSpec::PS_USER_DEFINED) !=
851 ((m_currentTable->m_fragmentType == NdbDictionary::Object::UserDefined)))
857 setErrorCodeAbort(4544);
875 return setBound(key_record, bound, NULL, 0);
889 Uint32 sizeOfPartInfo)
891 if (unlikely((theStatus != NdbOperation::UseNdbRecord)))
893 setErrorCodeAbort(4284);
898 if (unlikely(key_record == NULL))
900 setErrorCodeAbort(4285);
906 const bool openRange= (((bound.low_key == NULL) &&
907 (bound.high_key == NULL)) ||
908 ((bound.low_key_count == 0) &&
909 (bound.high_key_count == 0)));
914 bool tabHasUserDefPartitioning= (m_currentTable->m_fragmentType ==
915 NdbDictionary::Object::UserDefined);
922 if (validatePartInfoPtr(partInfo,
930 if (unlikely((m_num_bounds > 1) &&
931 (m_multi_range == 0)))
934 setErrorCodeAbort(4509);
940 Uint32 key_count, common_key_count;
944 range_no= bound.range_no;
945 if (unlikely(range_no > MaxRangeNo))
947 setErrorCodeAbort(4286);
952 if ( m_read_range_no && m_ordered )
954 if (unlikely((m_num_bounds > 1) &&
955 (range_no <= m_previous_range_num)))
957 setErrorCodeAbort(4282);
962 m_previous_range_num= range_no;
965 key_count= bound.low_key_count;
966 common_key_count= key_count;
967 if (key_count < bound.high_key_count)
968 key_count= bound.high_key_count;
970 common_key_count= bound.high_key_count;
972 if (unlikely(key_count > key_record->key_index_length))
975 setErrorCodeAbort(4281);
983 Uint32* firstRangeWord= NULL;
984 const Uint32 keyLenBeforeRange= theTupKeyLen;
986 if (likely(!openRange))
995 const bool isEqRange=
996 (bound.low_key == bound.high_key) &&
997 (bound.low_key_count == bound.high_key_count) &&
998 (bound.low_inclusive && bound.high_inclusive);
1003 for (j= 0; j<key_count; j++)
1005 ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
1006 bound.low_key,
BoundEQ, firstRangeWord);
1016 for (j= 0; j<key_count; j++)
1020 if (bound.low_key && j<bound.low_key_count)
1023 bound_type= bound.low_inclusive || j+1 < bound.low_key_count ?
1025 ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
1026 bound.low_key, bound_type, firstRangeWord);
1029 if (bound.high_key && j<bound.high_key_count)
1032 bound_type= bound.high_inclusive || j+1 < bound.high_key_count ?
1034 ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
1035 bound.high_key, bound_type, firstRangeWord);
1046 insert_open_bound(key_record, firstRangeWord);
1054 assert(firstRangeWord != NULL);
1056 bound_head= *firstRangeWord;
1058 (theTupKeyLen - keyLenBeforeRange) << 16 | (range_no << 4);
1059 *firstRangeWord= bound_head;
1077 if ((m_pruneState == SPS_UNKNOWN) ||
1078 (m_pruneState == SPS_ONE_PARTITION))
1080 bool currRangeHasOnePartVal=
false;
1081 Uint32 currRangePartValue= 0;
1087 currRangeHasOnePartVal=
true;
1089 if (getPartValueFromInfo(partInfo,
1090 m_attribute_record->table,
1091 &currRangePartValue))
1098 if (likely(!tabHasUserDefPartitioning))
1106 Uint32 distkey_min= key_record->m_min_distkey_prefix_length;
1107 if (index_distkeys == table_distkeys &&
1108 common_key_count >= distkey_min &&
1111 0==compare_index_row_prefix(key_record,
1116 assert(! openRange);
1117 currRangeHasOnePartVal=
true;
1118 if (getDistKeyFromRange(key_record, m_attribute_record,
1120 &currRangePartValue))
1136 const ScanPruningState prevPruneState= m_pruneState;
1137 if (currRangeHasOnePartVal)
1139 if (m_pruneState == SPS_UNKNOWN)
1142 m_pruneState= SPS_ONE_PARTITION;
1143 m_pruningKey= currRangePartValue;
1150 assert(m_pruneState == SPS_ONE_PARTITION);
1151 if (currRangePartValue != m_pruningKey)
1156 m_pruneState= SPS_MULTI_PARTITION;
1165 m_pruneState= SPS_MULTI_PARTITION;
1169 if (m_pruneState != prevPruneState)
1171 theDistrKeyIndicator_= (m_pruneState == SPS_ONE_PARTITION);
1172 theDistributionKey= m_pruningKey;
1175 ScanTabReq::setDistributionKeyFlag(req->requestInfo, theDistrKeyIndicator_);
1176 req->distributionKey= theDistributionKey;
1177 theSCAN_TABREQ->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
1186 NdbIndexScanOperation::scanIndexImpl(
const NdbRecord *key_record,
1189 const unsigned char *result_mask,
1192 Uint32 sizeOfOptions)
1196 Uint32 scan_flags = 0;
1197 Uint32 parallel = 0;
1200 ScanOptions currentOptions;
1202 if (options != NULL)
1204 if (handleScanOptionsVersion(options, sizeOfOptions, currentOptions))
1210 if (options->optionsPresent & ScanOptions::SO_SCANFLAGS)
1211 scan_flags = options->scan_flags;
1212 if (options->optionsPresent & ScanOptions::SO_PARALLEL)
1213 parallel = options->parallel;
1214 if (options->optionsPresent & ScanOptions::SO_BATCH)
1215 batch = options->batch;
1218 if (!(key_record->flags & NdbRecord::RecHasAllKeys))
1220 setErrorCodeAbort(4292);
1225 result_record->copyMask(readMask.rep.data, result_mask);
1227 if (scan_flags & (NdbScanOperation::SF_OrderBy |
1236 Uint32 keymask[MAXNROFATTRIBUTESINWORDS];
1239 for (i = 0; i < key_record->key_index_length; i++)
1241 Uint32 attrId = key_record->columns[key_record->key_indexes[
i]].attrId;
1242 if (attrId >= result_record->m_attrId_indexes_length ||
1245 setErrorCodeAbort(4292);
1257 readMask.rep.data, keymask))
1259 setErrorCodeAbort(4341);
1264 if (!(key_record->flags & NdbRecord::RecIsIndex))
1266 setErrorCodeAbort(4283);
1269 if (result_record->flags & NdbRecord::RecIsIndex)
1271 setErrorCodeAbort(4340);
1279 m_currentTable= result_record->table;
1281 m_key_record = key_record;
1282 m_attribute_record= result_record;
1284 res= processIndexScanDefs(lock_mode, scan_flags, parallel, batch);
1289 theStatus= NdbOperation::UseNdbRecord;
1292 res=
scanImpl(options, readMask.rep.data);
1323 if (m_readTuplesCalled)
1330 m_readTuplesCalled=
true;
1331 m_savedLockModeOldApi= lm;
1332 m_savedScanFlagsOldApi= scan_flags;
1333 m_savedParallelOldApi= parallel;
1334 m_savedBatchOldApi= batch;
1339 if (scan_flags & SF_OrderBy)
1352 m_ordered = m_descending =
false;
1353 m_pruneState= SPS_UNKNOWN;
1354 Uint32 fragCount = m_currentTable->m_fragmentCount;
1356 assert(fragCount > 0);
1358 if (parallel > fragCount || parallel == 0) {
1359 parallel = fragCount;
1362 theNdbCon->theScanningOp =
this;
1363 bool tupScan = (scan_flags & SF_TupScan);
1365 #if 0 // XXX temp for testing
1366 {
char* p = getenv(
"NDB_USE_TUPSCAN");
1368 unsigned n = atoi(p);
1369 if ((
unsigned int) (::time(0) % 10) < n) tupScan =
true;
1373 if (scan_flags & SF_DiskScan)
1376 m_flags &= ~Uint8(OF_NO_DISK);
1379 bool rangeScan=
false;
1382 if ( (
int) m_accessTable->m_indexType ==
1385 if (m_currentTable == m_accessTable){
1387 m_currentTable = theNdb->theDictionary->
1389 assert(m_currentTable != NULL);
1391 assert (m_currentTable != m_accessTable);
1393 theStatus = GetValue;
1400 parallel = fragCount;
1404 theParallelism = parallel;
1406 if(fix_receivers(parallel) == -1){
1407 setErrorCodeAbort(4000);
1411 if (theSCAN_TABREQ == NULL) {
1412 setErrorCodeAbort(4000);
1416 theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ, refToBlock(theNdbCon->m_tcRef));
1418 req->apiConnectPtr = theNdbCon->theTCConPtr;
1419 req->tableId = m_accessTable->m_id;
1420 req->tableSchemaVersion = m_accessTable->m_version;
1421 req->storedProcId = 0xFFFF;
1422 req->buddyConPtr = theNdbCon->theBuddyConPtr;
1424 req->first_batch_size = batch;
1427 ScanTabReq::setParallelism(reqInfo, parallel);
1428 ScanTabReq::setScanBatch(reqInfo, 0);
1429 ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
1430 ScanTabReq::setTupScanFlag(reqInfo, tupScan);
1431 req->requestInfo = reqInfo;
1433 m_keyInfo = (scan_flags & SF_KeyInfo) ? 1 : 0;
1434 setReadLockMode(lm);
1437 req->transId1 = (Uint32) transId;
1438 req->transId2 = (Uint32) (transId >> 32);
1440 assert(theSCAN_TABREQ->next() == NULL);
1442 theSCAN_TABREQ->next(tSignal);
1443 theLastKEYINFO = tSignal;
1445 theKEYINFOptr= tSignal->getDataPtrSend();
1446 keyInfoRemain= NdbApiSignal::MaxSignalWords;
1447 theTotalNrOfKeyWordInSignal= 0;
1449 getFirstATTRINFOScan();
1456 if (theStatus == NdbOperation::UseNdbRecord)
1458 setErrorCodeAbort(4284);
1462 if ((code->m_flags & NdbInterpretedCode::Finalised) == 0)
1464 setErrorCodeAbort(4519);
1468 m_interpreted_code= code;
1474 NdbScanOperation::allocInterpretedCodeOldApi()
1477 assert (m_interpretedCodeOldApi == NULL);
1480 if (! m_scanUsingOldApi)
1485 setErrorCodeAbort(4536);
1491 if (m_interpretedCodeOldApi == NULL)
1492 setErrorCodeAbort(4000);
1494 return m_interpretedCodeOldApi;
1498 NdbScanOperation::freeInterpretedCodeOldApi()
1500 if (m_interpretedCodeOldApi != NULL)
1502 delete m_interpretedCodeOldApi;
1503 m_interpretedCodeOldApi= NULL;
1509 NdbScanOperation::setReadLockMode(LockMode lockMode)
1511 bool lockExcl, lockHoldMode, readCommitted;
1516 lockHoldMode=
false;
1517 readCommitted=
true;
1523 readCommitted=
false;
1528 readCommitted=
false;
1535 theLockMode= lockMode;
1537 Uint32 reqInfo= req->requestInfo;
1538 ScanTabReq::setLockMode(reqInfo, lockExcl);
1539 ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
1540 ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
1541 req->requestInfo= reqInfo;
1545 NdbScanOperation::fix_receivers(Uint32 parallel){
1546 assert(parallel > 0);
1547 if(parallel > m_allocated_receivers){
1548 const Uint32 sz = parallel * (4*
sizeof(
char*)+
sizeof(Uint32));
1551 Uint64 * tmp =
new Uint64[(sz+7)/8];
1554 setErrorCodeAbort(4000);
1558 memcpy(tmp, m_receivers, m_allocated_receivers*
sizeof(
char*));
1560 m_array = (Uint32*)tmp;
1563 m_api_receivers = m_receivers + parallel;
1564 m_conf_receivers = m_api_receivers + parallel;
1565 m_sent_receivers = m_conf_receivers + parallel;
1566 m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
1570 for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
1571 tScanRec = theNdb->getNdbScanRec();
1572 if (tScanRec == NULL) {
1573 setErrorCodeAbort(4000);
1576 m_receivers[
i] = tScanRec;
1577 tScanRec->init(NdbReceiver::NDB_SCANRECEIVER,
false,
this);
1579 m_allocated_receivers = parallel;
1582 reset_receivers(parallel, 0);
1591 if(theError.
code == 0){
1592 if(DEBUG_NEXT_RESULT)
1593 ndbout_c(
"receiver_delivered");
1595 Uint32 idx = tRec->m_list_index;
1596 Uint32 last = m_sent_receivers_count - 1;
1599 m_sent_receivers[idx] = move;
1600 move->m_list_index = idx;
1602 m_sent_receivers_count = last;
1604 last = m_conf_receivers_count;
1605 m_conf_receivers[last] = tRec;
1606 m_conf_receivers_count = last + 1;
1607 tRec->m_current_row = 0;
1616 if(theError.
code == 0){
1617 if(DEBUG_NEXT_RESULT)
1618 ndbout_c(
"receiver_completed");
1620 Uint32 idx = tRec->m_list_index;
1621 Uint32 last = m_sent_receivers_count - 1;
1624 m_sent_receivers[idx] = move;
1625 move->m_list_index = idx;
1627 m_sent_receivers_count = last;
1644 NdbScanOperation::getFirstATTRINFOScan()
1648 tSignal = theNdb->getSignal();
1649 if (tSignal == NULL){
1650 setErrorCodeAbort(4000);
1654 theAI_LenInCurrAI = AttrInfo::SectionSizeInfoLength;
1655 theATTRINFOptr = &tSignal->getDataPtrSend()[AttrInfo::SectionSizeInfoLength];
1656 attrInfoRemain= NdbApiSignal::MaxSignalWords - AttrInfo::SectionSizeInfoLength;
1657 tSignal->setLength(AttrInfo::SectionSizeInfoLength);
1658 theFirstATTRINFO = tSignal;
1659 theCurrentATTRINFO = tSignal;
1660 theCurrentATTRINFO->next(NULL);
1672 bool locked =
false;
1673 NdbImpl* theImpl = theNdb->theImpl;
1676 if (m_scanUsingOldApi && finaliseScanOldApi() == -1)
1687 Uint32 seq = tCon->theNodeSequence;
1689 if (theImpl->get_node_alive(nodeId) &&
1690 (theImpl->getNodeSequence(nodeId) == seq)) {
1692 tCon->theMagicNumber = 0x37412619;
1694 if (doSendScan(nodeId) == -1)
1704 if (!(theImpl->get_node_stopping(nodeId) &&
1705 (theImpl->getNodeSequence(nodeId) == seq)))
1707 TRACE_DEBUG(
"The node is hard dead when attempting to start a scan");
1709 tCon->theReleaseOnClose =
true;
1713 TRACE_DEBUG(
"The node is stopping when attempting to start a scan");
1729 m_sent_receivers_count = theParallelism;
1732 m_current_api_receiver = theParallelism;
1733 m_api_receivers_count = theParallelism;
1749 const char * dummyOutRowPtr;
1751 if (unlikely(! m_scanUsingOldApi))
1767 bool fetchAllowed,
bool forceSend)
1773 NdbRecAttr *getvalue_recattr= theReceiver.theFirstRecAttr;
1774 if (((UintPtr)tBlob | (UintPtr)getvalue_recattr) != 0)
1776 const Uint32 idx= m_current_api_receiver;
1777 assert(idx < m_api_receivers_count);
1778 const NdbReceiver *receiver= m_api_receivers[idx];
1782 while (getvalue_recattr != NULL)
1784 const char *attr_data;
1786 if (receiver->getScanAttrData(attr_data, attr_size, pos) == -1)
1788 if (!getvalue_recattr->receive_data((
const Uint32 *)attr_data,
1791 getvalue_recattr= getvalue_recattr->next();
1799 const char *key_data;
1800 res= receiver->get_keyinfo20(infoword, key_length, key_data);
1806 if (tBlob->atNextResultNdbRecord(key_data, key_length*4) == -1)
1808 tBlob= tBlob->theNext;
1809 }
while (tBlob != 0);
1821 NdbScanOperation::nextResultCopyOut(
char * buffer,
1822 bool fetchAllowed,
bool forceSend)
1826 if ((result =
nextResult(&data, fetchAllowed, forceSend)) == 0)
1828 memcpy(buffer, data, m_attribute_record->m_row_size);
1835 bool fetchAllowed,
bool forceSend)
1839 (out_row, fetchAllowed, forceSend);
1842 while (m_current_api_receiver < m_api_receivers_count)
1844 NdbReceiver *tRec= m_api_receivers[m_current_api_receiver];
1845 if (tRec->nextResult())
1847 out_row= tRec->get_row();
1850 m_current_api_receiver++;
1863 Uint32 nodeId = theNdbCon->theDBnode;
1864 NdbImpl* theImpl = theNdb->theImpl;
1865 Uint32 timeout= theImpl->get_waitfor_timeout();
1874 const Uint32 seq= theNdbCon->theNodeSequence;
1881 if(seq == theImpl->getNodeSequence(nodeId) &&
1884 idx= m_current_api_receiver;
1885 last= m_api_receivers_count;
1889 setErrorCode(theError.
code);
1893 Uint32 cnt= m_conf_receivers_count;
1894 Uint32 sent= m_sent_receivers_count;
1899 memcpy(m_api_receivers+last, m_conf_receivers, cnt *
sizeof(
char*));
1901 theImpl->incClientStat(Ndb::ScanBatchCount, cnt);
1902 m_conf_receivers_count= 0;
1904 else if (retVal == 2 && sent > 0)
1907 theImpl->incClientStat(Ndb::WaitScanResultCount, 1);
1909 int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
1910 if (ret_code == 0 && seq == theImpl->getNodeSequence(nodeId)) {
1912 }
else if(ret_code == -1){
1919 else if (retVal == 2)
1934 if (tRec->nextResult())
1936 out_row= tRec->get_row();
1942 }
while(retVal == 2);
1944 m_api_receivers_count= last;
1945 m_current_api_receiver= idx;
1963 if(theError.
code == 0)
1968 setErrorCode(theError.
code);
1972 theNdbCon->theTransactionIsStarted=
false;
1973 theNdbCon->theReleaseOnClose=
true;
1982 tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(theNdbCon->m_tcRef));
1984 Uint32* theData = tSignal.getDataPtrSend();
1985 theData[0] = theNdbCon->theTCConPtr;
1986 theData[1] = stopScanFlag ==
true ? 1 : 0;
1987 Uint64 transId = theNdbCon->theTransactionId;
1988 theData[2] = (Uint32) transId;
1989 theData[3] = (Uint32) (transId >> 32);
1994 Uint32 last = m_sent_receivers_count;
1995 Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
1997 for(Uint32 i = 0; i<cnt; i++){
1999 if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
2001 m_sent_receivers[last+sent] = tRec;
2002 tRec->m_list_index = last+sent;
2003 tRec->prepareSend();
2007 memmove(m_api_receivers, m_api_receivers+cnt,
2008 (theParallelism-cnt) *
sizeof(
char*));
2013 Uint32 nodeId = theNdbCon->theDBnode;
2014 NdbImpl* impl = theNdb->theImpl;
2016 tSignal.setLength(4);
2018 ptr[0].p = prep_array;
2020 ret = impl->sendSignal(&tSignal, nodeId, ptr, 1);
2022 tSignal.setLength(4+sent);
2023 ret = impl->sendSignal(&tSignal, nodeId);
2026 m_sent_receivers_count = last + sent;
2027 m_api_receivers_count -= cnt;
2028 m_current_api_receiver = 0;
2036 NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr,
2037 Uint64 TransactionId,
2045 NdbScanOperation::doSend(
int ProcessorId)
2052 DBUG_ENTER(
"NdbScanOperation::close");
2053 DBUG_PRINT(
"enter", (
"this: 0x%lx tcon: 0x%lx con: 0x%lx force: %d release: %d",
2055 (
long) m_transConnection, (
long) theNdbCon,
2056 forceSend, releaseOp));
2058 if(m_transConnection){
2059 if(DEBUG_NEXT_RESULT)
2060 ndbout_c(
"close() theError.code = %d "
2061 "m_api_receivers_count = %d "
2062 "m_conf_receivers_count = %d "
2063 "m_sent_receivers_count = %d",
2065 m_api_receivers_count,
2066 m_conf_receivers_count,
2067 m_sent_receivers_count);
2075 PollGuard poll_guard(* theNdb->theImpl);
2085 m_transConnection = NULL;
2087 if (tTransCon && releaseOp)
2092 if (theStatus != WaitResponse)
2098 tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
2099 &tTransCon->m_theLastScanOperation,
2104 ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
2110 tCon->theScanningOp = 0;
2112 tNdb->theImpl->decClientStat(Ndb::TransCloseCount, 1);
2113 tNdb->theRemainingStartTransactions--;
2118 NdbScanOperation::execCLOSE_SCAN_REP(){
2119 m_conf_receivers_count = 0;
2120 m_sent_receivers_count = 0;
2123 void NdbScanOperation::release()
2125 if(theNdbCon != 0 || m_transConnection != 0){
2128 for(Uint32 i = 0; i<m_allocated_receivers; i++){
2129 m_receivers[
i]->release();
2133 delete[] m_scan_buffer;
2134 m_scan_buffer= NULL;
2137 NdbOperation::release();
2141 theNdb->releaseSignal(theSCAN_TABREQ);
2152 int NdbScanOperation::finaliseScanOldApi()
2162 ScanOptions options;
2163 options.optionsPresent=(ScanOptions::SO_SCANFLAGS |
2164 ScanOptions::SO_PARALLEL |
2165 ScanOptions::SO_BATCH);
2167 options.scan_flags= m_savedScanFlagsOldApi;
2168 options.parallel= m_savedParallelOldApi;
2169 options.batch= m_savedBatchOldApi;
2171 if (theDistrKeyIndicator_ == 1)
2174 options.optionsPresent |= ScanOptions::SO_PARTITION_ID;
2175 options.partitionId= theDistributionKey;
2189 const unsigned char* emptyMask=
2196 result= scanTableImpl(m_currentTable->
m_ndbrecord,
2197 m_savedLockModeOldApi,
2200 sizeof(ScanOptions));
2207 if (isop->currentRangeOldApi != NULL)
2210 if (isop->buildIndexBoundOldApi(0) != 0)
2218 const unsigned char * resultMask=
2223 result= isop->scanIndexImpl(m_accessTable->
m_ndbrecord,
2225 m_savedLockModeOldApi,
2229 sizeof(ScanOptions));
2232 if (isop->firstRangeOldApi != NULL)
2235 while (bound != NULL)
2238 *isop->getIndexBoundFromRecAttr(bound) ) != 0)
2241 bound= bound->next();
2245 isop->releaseIndexBoundsOldApi();
2251 freeInterpretedCodeOldApi();
2268 Uint64 aTransactionId){
2269 if (theInterpretIndicator != 1 ||
2272 setErrorCodeAbort(4005);
2279 assert(m_attribute_record);
2284 theReceiver.prepareSend();
2285 bool keyInfo = m_keyInfo;
2286 Uint32 key_size= keyInfo ? m_attribute_record->m_keyLenInWords : 0;
2293 Uint32 batch_size = req->first_batch_size;
2294 Uint32 batch_byte_size, first_batch_size;
2295 theReceiver.calculate_batch_size(key_size,
2300 m_attribute_record);
2301 ScanTabReq::setScanBatch(req->requestInfo, batch_size);
2302 req->batch_byte_size= batch_byte_size;
2303 req->first_batch_size= first_batch_size;
2310 Uint32 reqInfo = req->requestInfo;
2311 ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
2312 ScanTabReq::setNoDiskFlag(reqInfo, (m_flags & OF_NO_DISK) != 0);
2315 ScanTabReq::setDistributionKeyFlag(reqInfo, theDistrKeyIndicator_);
2316 req->requestInfo = reqInfo;
2317 req->distributionKey= theDistributionKey;
2318 theSCAN_TABREQ->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
2321 assert(theStatus == UseNdbRecord);
2323 assert(theParallelism > 0);
2324 Uint32 rowsize= NdbReceiver::ndbrecord_rowsize(m_attribute_record,
2325 theReceiver.theFirstRecAttr,
2328 Uint32 bufsize= batch_size*rowsize;
2329 char *
buf=
new char[bufsize*theParallelism];
2332 setErrorCodeAbort(4000);
2335 assert(!m_scan_buffer);
2338 for (Uint32 i = 0; i<theParallelism; i++)
2340 m_receivers[
i]->do_setup_ndbrecord(m_attribute_record, batch_size,
2341 key_size, m_read_range_no,
2347 if (doSendSetAISectionSizes() == -1)
2354 NdbScanOperation::doSendSetAISectionSizes()
2357 Uint32* sectionSizesPtr= theFirstATTRINFO->getDataPtrSend();
2358 *sectionSizesPtr++ = theInitialReadSize;
2359 *sectionSizesPtr++ = theInterpretedSize;
2360 *sectionSizesPtr++ = 0;
2361 *sectionSizesPtr++ = 0;
2362 *sectionSizesPtr = theSubroutineSize;
// Sends the prepared scan request to node aProcessorId.
// The visible code has two send paths: a fragmented (sectioned) SCAN_TABREQ,
// and a short SCAN_TABREQ followed by separate KEYINFO and ATTRINFO signals.
// On completion the operation status becomes WaitResponse.
2377 NdbScanOperation::doSendScan(
int aProcessorId)
// Operation state check; a failure aborts with error 4005.
2379 if (theInterpretIndicator != 1 ||
2382 setErrorCodeAbort(4005);
2386 assert(theSCAN_TABREQ != NULL);
// Reject requests whose total ATTRINFO exceeds the SCAN_TABREQ maximum.
2389 if (unlikely(theTotalCurrAI_Len > ScanTabReq::MaxTotalAttrInfo)) {
// Section 0: receiver ids, theParallelism words.
2407 secs[0].sectionIter= &receiverIdIterator;
2408 secs[0].sz= theParallelism;
// Section 1: attribute info.
2410 secs[1].sectionIter= &attrInfoIter;
2411 secs[1].sz= theTotalCurrAI_Len;
2413 Uint32 numSections= 2;
// Section 2: key info, theTupKeyLen words.
// NOTE(review): presumably added only when key info exists; the guarding
// condition is not visible here.
2417 secs[2].sectionIter= &keyInfoIter;
2418 secs[2].sz= theTupKeyLen;
2422 NdbImpl* impl = theNdb->theImpl;
// Three sections are counted as a range scan, otherwise as a table scan.
2424 const Ndb::ClientStatistics counterIndex = (numSections == 3)?
2425 Ndb::RangeScanCount :
2426 Ndb::TableScanCount;
2427 impl->incClientStat(counterIndex, 1);
2429 impl->incClientStat(Ndb::PrunedScanCount, 1);
// Decide between the long and short signal formats from the target node's
// version and the forceShortRequests setting.
2431 Uint32 tcNodeVersion = impl->getNodeNdbVersion(aProcessorId);
2432 bool forceShort = impl->forceShortRequests;
2433 bool sendLong = ( tcNodeVersion >= NDBD_LONG_SCANTABREQ) &&
2439 if (impl->sendFragmentedSignal(theSCAN_TABREQ,
// Short format: lengths are taken from the section descriptors.
2454 Uint32 attrInfoLen = secs[1].sz;
2455 Uint32 keyInfoLen = (numSections == 3)? secs[2].sz : 0;
// Save the routing fields before the signal object is reused below.
2458 Uint32 connectPtr = scanTabReq->apiConnectPtr;
2459 Uint32 transId1 = scanTabReq->transId1;
2460 Uint32 transId2 = scanTabReq->transId2;
// Key length goes in the upper 16 bits, attrinfo length in the lower bits.
2463 scanTabReq->attrLenKeyLen = (keyInfoLen << 16) | attrInfoLen;
// Send SCAN_TABREQ with only the first section (receiver ids) attached.
2466 if (impl->sendSignal(theSCAN_TABREQ, aProcessorId, &secs[0], 1) == -1)
// Reuse the same signal object to send the key info as KEYINFO signals.
2474 GSIReader keyInfoReader(secs[2].sectionIter);
2475 theSCAN_TABREQ->theVerId_signalNumber = GSN_KEYINFO;
2477 keyInfo->connectPtr = connectPtr;
2478 keyInfo->transId[0] = transId1;
2479 keyInfo->transId[1] = transId2;
// Each KEYINFO carries at most KeyInfo::DataLength words.
// NOTE(review): presumably inside a loop that runs until keyInfoLen reaches
// zero; the loop header is not visible here.
2483 Uint32 dataWords = MIN(keyInfoLen, KeyInfo::DataLength);
2484 keyInfoReader.copyNWords(&keyInfo->keyData[0], dataWords);
2485 theSCAN_TABREQ->setLength(KeyInfo::HeaderLength + dataWords);
2487 if (impl->sendSignal(theSCAN_TABREQ, aProcessorId) == -1)
2492 keyInfoLen -= dataWords;
// Same pattern for the attribute info, sent as ATTRINFO signals.
2496 GSIReader attrInfoReader(secs[1].sectionIter);
2497 theSCAN_TABREQ->theVerId_signalNumber = GSN_ATTRINFO;
2499 attrInfo->connectPtr = connectPtr;
2500 attrInfo->transId[0] = transId1;
2501 attrInfo->transId[1] = transId2;
// Each ATTRINFO carries at most AttrInfo::DataLength words.
2505 Uint32 dataWords = MIN(attrInfoLen, AttrInfo::DataLength);
2506 attrInfoReader.copyNWords(&attrInfo->attrData[0], dataWords);
2507 theSCAN_TABREQ->setLength(AttrInfo::HeaderLength + dataWords);
2509 if (impl->sendSignal(theSCAN_TABREQ, aProcessorId) == -1)
2514 attrInfoLen -= dataWords;
2518 theStatus = WaitResponse;
// Copies key data taken from a record attribute into the caller's buffer.
//   data - destination buffer, in 32-bit words
//   size - passed by reference; checked against the key length before copying
// NOTE(review): the only visible guard against overrunning `data` is the
// assert below, which is compiled out when NDEBUG is defined.
2527 NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 &
size)
2532 const Uint32 * src = (Uint32*)tRecAttr->
aRef();
2538 assert(size >= len);
// len counts 32-bit words, hence the factor of 4 for the byte count.
2539 memcpy(data, src, 4*len);
// Old-API takeover: builds a key operation of type opType in transaction
// pTrans for the row currently positioned in this scan, using the key info
// returned with that row.
//   opType - type of the new operation
//   pTrans - transaction that will own the new operation
2567 NdbScanOperation::takeOverScanOp(OperationType opType,
NdbTransaction* pTrans)
// This path is only valid for scans defined through the old API (error 4284).
2569 if (!m_scanUsingOldApi)
2571 setErrorCodeAbort(4284);
2578 setErrorCodeAbort(4604);
2587 const char *src= NULL;
// There must be a current row on an API-owned receiver.
2589 Uint32 idx= m_current_api_receiver;
2590 if (idx >= m_api_receivers_count)
2592 const NdbReceiver *receiver= m_api_receivers[m_current_api_receiver];
// Fetch the info word, key length (in words) and key data of the current row.
2595 int res= receiver->get_keyinfo20(infoword, len, src);
2603 pTrans->theSimpleState = 0;
2606 assert(len < 16384);
2608 newOp->theTupKeyLen = len;
2609 newOp->theOperationType = opType;
// The new operation uses the same lock mode as the scan.
2613 newOp->theLockMode = theLockMode;
// The status is GetValue or SetValue; the selecting condition is not
// visible here.
2616 newOp->theStatus = GetValue;
2619 newOp->theStatus = SetValue;
// Info word layout as used here: low 18 bits are the scan info, and the
// bits from position 20 upward are the fragment taken over.
2621 const Uint32 tScanInfo = infoword & 0x3FFFF;
2622 const Uint32 tTakeOverFragment = infoword >> 20;
2625 TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
2626 TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
2627 TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
2628 newOp->theScanInfo = scanInfo;
// The taken-over fragment doubles as the distribution key.
2629 newOp->theDistrKeyIndicator_ = 1;
2630 newOp->theDistributionKey = tTakeOverFragment;
// The first part of the key, up to TcKeyReq::MaxKeyInfo words, travels in
// the TCKEYREQ itself.
2634 TcKeyReq * tcKeyReq = CAST_PTR(
TcKeyReq,newOp->theTCREQ->getDataPtrSend());
2635 Uint32 i = MIN(TcKeyReq::MaxKeyInfo, len);
2636 memcpy(tcKeyReq->keyInfo, src, 4*i);
2641 newOp->theTCREQ->next(tSignal);
// The remaining words go into chained KEYINFO signals: full signals first.
2643 Uint32 left = len -
i;
2644 while(tSignal && left > KeyInfo::DataLength){
2645 tSignal->setSignal(GSN_KEYINFO, refToBlock(pTrans->m_tcRef));
2646 tSignal->setLength(KeyInfo::MaxSignalLength);
2647 KeyInfo * keyInfo = CAST_PTR(
KeyInfo, tSignal->getDataPtrSend());
2648 memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
// src is a byte pointer, so advance by words * 4.
2649 src += 4 * KeyInfo::DataLength;
2650 left -= KeyInfo::DataLength;
// Chain a fresh signal; a failed getSignal() ends the loop via tSignal.
2652 tSignal->next(theNdb->getSignal());
2653 tSignal = tSignal->next();
2654 newOp->theLastKEYINFO = tSignal;
// Final, partially filled KEYINFO signal.
2657 if(tSignal && left > 0){
2658 tSignal->setSignal(GSN_KEYINFO, refToBlock(pTrans->m_tcRef));
2659 tSignal->setLength(KeyInfo::HeaderLength + left);
2660 newOp->theLastKEYINFO = tSignal;
2661 KeyInfo * keyInfo = CAST_PTR(
KeyInfo, tSignal->getDataPtrSend());
2662 memcpy(keyInfo->keyData, src, 4 * left);
// Deleting from a table with blob columns: each blob column is visited.
2668 if (opType ==
DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
2669 for (
unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
2672 if (c->getBlobType()) {
// NdbRecord takeover: creates an operation of type opType in pTrans for the
// row currently positioned in this scan, described by `record`, `row` and
// `mask`, with optional operation options.
// Visible error codes raised via setErrorCodeAbort: 4284, 4285, 4604, 4340.
2683 NdbScanOperation::takeOverScanOpNdbRecord(OperationType opType,
2687 const unsigned char *mask,
2689 Uint32 sizeOfOptions)
// This path requires the scan itself to have an attribute record.
2693 if (!m_attribute_record)
2695 setErrorCodeAbort(4284);
2700 setErrorCodeAbort(4285);
2706 setErrorCodeAbort(4604);
// An index record is not accepted as the attribute record (error 4340).
2709 if (record->flags & NdbRecord::RecIsIndex)
2712 setErrorCodeAbort(4340);
2715 if (m_blob_lock_upgraded)
2723 setErrorCodeAbort(4604);
2731 pTrans->theSimpleState= 0;
2732 op->theStatus= NdbOperation::UseNdbRecord;
2733 op->theOperationType= opType;
// No key record is set: the key comes from the scan's key info instead.
2735 op->m_key_record= NULL;
2736 op->m_attribute_record= record;
// There must be a current row on an API-owned receiver.
2745 Uint32 idx= m_current_api_receiver;
2746 if (idx >= m_api_receivers_count)
2748 const NdbReceiver *receiver= m_api_receivers[m_current_api_receiver];
// Key length and key data are stored directly in the new operation.
2750 res= receiver->get_keyinfo20(infoword, op->m_keyinfo_length, op->m_key_row);
// Info word layout as used here: low 18 bits are the scan info, and the
// bits from position 20 upward are the fragment taken over.
2754 TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
2755 Uint32 fragment= infoword >> 20;
2756 TcKeyReq::setTakeOverScanFragment(scanInfo, fragment);
2757 TcKeyReq::setTakeOverScanInfo(scanInfo, infoword & 0x3FFFF);
2758 op->theScanInfo= scanInfo;
// The taken-over fragment doubles as the distribution key.
2759 op->theDistrKeyIndicator_= 1;
2760 op->theDistributionKey= fragment;
2762 op->m_attribute_row= row;
// Derive the effective column mask from the record and the caller's mask.
2764 record->copyMask(readMask.rep.data, mask);
// The new operation uses the same lock mode as the scan.
2768 op->theLockMode= theLockMode;
// NOTE(review): the two identical calls below presumably sit in different
// opType branches; the branch conditions are not visible here.
2773 op->theReceiver.getValues(record, row);
2778 op->theReceiver.getValues(record, row);
// Apply caller-supplied options; a non-zero result is used as the error code.
2786 Uint32 result = NdbOperation::handleOperationOptions (opType,
2792 setErrorCodeAbort(result);
// Blob handling depends on whether the record or the table has blob columns.
2803 if (unlikely(record->flags & NdbRecord::RecHasBlob))
2805 if (op->getBlobHandlesNdbRecord(pTrans, readMask.rep.data) == -1)
2816 if (unlikely(record->flags & NdbRecord::RecTableHasBlob))
2818 if (op->getBlobHandlesNdbRecordDelete(pTrans,
2820 readMask.rep.data) == -1)
// Build the signals for the new operation.
2831 int returnCode=op->buildSignalsNdbRecord(pTrans->theTCConPtr,
2832 pTrans->theTransactionId,
2848 const NdbColumnImpl* col= m_currentTable->getColumn(anAttrName);
2855 if (m_scanUsingOldApi)
2856 m_savedScanFlagsOldApi|= SF_KeyInfo;
2872 const NdbColumnImpl* col= m_currentTable->getColumn(anAttrId);
2879 if (m_scanUsingOldApi)
2880 m_savedScanFlagsOldApi|= SF_KeyInfo;
// Adds an extra column read to an NdbRecord-style scan: appends an ATTRINFO
// read header for the column and registers a receiving attribute with
// theReceiver. Error 4000 is raised on the visible failure path.
2903 DBUG_ENTER(
"NdbScanOperation::getValue_NdbRecord_scan");
2906 DBUG_PRINT(
"info", (
"Column: %u", attrInfo->m_attrId));
// Reading a disk-stored column clears the "no disk" flag.
2908 if (attrInfo->m_storageType == NDB_STORAGETYPE_DISK)
2910 m_flags &= ~Uint8(OF_NO_DISK);
2913 res= insertATTRINFOHdr_NdbRecord(attrInfo->m_attrId, 0);
// Recompute the initial read size, excluding the section-size words.
2917 theInitialReadSize= theTotalCurrAI_Len - AttrInfo::SectionSizeInfoLength;
2918 ra= theReceiver.getValue(attrInfo, aValue);
2921 setErrorCodeAbort(4000);
2945 if (attrInfo != NULL)
2947 if (attrInfo->m_storageType == NDB_STORAGETYPE_DISK)
2949 m_flags &= ~Uint8(OF_NO_DISK);
2952 recAttr = theReceiver.getValue(attrInfo, aValue);
2954 if (recAttr != NULL)
2958 setErrorCodeAbort(4000);
2963 setErrorCodeAbort(4004);
// getValue implementation for scans: takes the column object and the
// caller's value buffer, and branches on whether the operation is in
// NdbRecord mode (theStatus == UseNdbRecord).
2970 NdbScanOperation::getValue_impl(
const NdbColumnImpl *attrInfo,
char *aValue)
2972 if (theStatus == UseNdbRecord)
// Constructor: starts with no old-API index ranges, so the first, last and
// current range pointers are all NULL.
2978 NdbIndexScanOperation::NdbIndexScanOperation(
Ndb* aNdb)
2981 firstRangeOldApi= NULL;
2982 lastRangeOldApi= NULL;
2983 currentRangeOldApi= NULL;
2987 NdbIndexScanOperation::~NdbIndexScanOperation(){
2994 return setBound(m_accessTable->getColumn(anAttrName),
type, aValue);
3001 return setBound(m_accessTable->getColumn(anAttrId),
type, aValue);
3005 NdbIndexScanOperation::equal_impl(
const NdbColumnImpl* anAttrObject,
// getValue implementation for index scans; the visible path delegates to
// NdbScanOperation::getValue_impl with the same column and buffer.
3012 NdbIndexScanOperation::getValue_impl(
const NdbColumnImpl* attrInfo,
3018 return NdbScanOperation::getValue_impl(attrInfo, aValue);
// Records one key column of an old-API bound into boundInfo: tracks which
// key columns have been supplied, the highest key column seen and its
// strictness, then stores the value and null bit in the bound's key row.
// Visible error codes: 4522 (column already set on this bound) and 4259.
3027 NdbIndexScanOperation::setBoundHelperOldApi(OldApiBoundInfo& boundInfo,
3028 Uint32 maxKeyRecordBytes,
3029 Uint32 index_attrId,
3033 Uint32 nullbit_byte_offset,
3034 Uint32 nullbit_bit_in_byte,
// One bit per key column, indexed by the low five bits of index_attrId.
// NOTE(review): the literal 1 is a signed int, so attribute id 31 shifts
// into the sign bit; 1U would avoid relying on that -- confirm.
3037 Uint32 presentBitMask= (1 << (index_attrId & 0x1f));
// A column may be given only once per bound.
3039 if ((boundInfo.keysPresentBitmap & presentBitMask) != 0)
3042 setErrorCodeAbort(4522);
3047 boundInfo.keysPresentBitmap |= presentBitMask;
// The column extends the bound past the highest key column seen so far.
3049 if ((index_attrId + 1) > boundInfo.highestKey)
// Extending beyond a strict bound is rejected (error 4259).
3053 if (boundInfo.highestSoFarIsStrict)
3056 setErrorCodeAbort(4259);
3059 boundInfo.highestKey= (index_attrId + 1);
3060 boundInfo.highestSoFarIsStrict= !inclusive;
3068 setErrorCodeAbort(4259);
// The value must fit inside the key row buffer.
3074 assert(byteOffset + valueLen <= maxKeyRecordBytes);
3076 memcpy(boundInfo.key + byteOffset,
// A NULL value pointer is recorded by setting the column's null bit.
3081 bool nullBit=(aValue == NULL);
3083 boundInfo.key[nullbit_byte_offset]|=
3084 (nullBit) << nullbit_bit_in_byte;
// Old-API bound definition for one index column: validates the request,
// locates the column in the key record, lazily allocates storage for the
// current range, and records the value on the low and/or high bound.
// NOTE(review): only the tail of the signature is visible; presumably this is
// NdbIndexScanOperation::setBound taking the column object first -- confirm.
// Visible error codes: 4318, 4209, 4535, 4005, 4000, 4514.
3094 int type,
const void* aValue)
3098 setErrorCodeAbort(4318);
// Accepted bound types are 0 through 4.
3102 (0 <= type && type <= 4))
3105 const Uint32 maxKeyRecordBytes= key_record->m_row_size;
3107 Uint32 valueLen = 0;
// Determine the value's length; failure is reported as error 4209.
3109 if (! tAttrInfo->get_var_length(aValue, valueLen)) {
3111 setErrorCodeAbort(4209);
3116 Uint32 byteOffset= 0;
// Map the attribute id to a column of the key record, checking both steps.
3119 Uint32 attrId= tAttrInfo->m_attrId;
3121 if (attrId >= key_record->key_index_length)
3124 setErrorCodeAbort(4535);
3127 Uint32 columnNum= key_record->key_indexes[ attrId ];
3129 if (columnNum >= key_record->noOfColumns)
3132 setErrorCodeAbort(4005);
3138 byteOffset= attr.offset;
// First bound of a new range: obtain a RecAttr to hold the range definition.
3142 if (currentRangeOldApi == NULL)
3145 NdbRecAttr* boundSpace= theNdb->getRecAttr();
3146 if (boundSpace == NULL)
3149 setErrorCodeAbort(4000);
// Size: the definition struct plus two key rows (low and high bound).
// NOTE(review): the "- 1" presumably accounts for a one-byte space[]
// member already counted in sizeof -- confirm.
3152 if (boundSpace->setup(
sizeof(OldApiScanRangeDefinition) +
3153 (2 * maxKeyRecordBytes) - 1, NULL) != 0)
// Return the RecAttr before reporting the failure.
3155 theNdb->releaseRecAttr(boundSpace);
3157 setErrorCodeAbort(4000);
3162 OldApiScanRangeDefinition* boundsDef=
3163 (OldApiScanRangeDefinition*) boundSpace->
aRef();
// Start with empty bounds.
3165 boundsDef->oldBound.lowBound.highestKey = 0;
3166 boundsDef->oldBound.lowBound.highestSoFarIsStrict =
false;
// The 32-bit presence bitmap relies on at most 32 key attributes.
3168 assert(NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY == 32);
3169 boundsDef->oldBound.lowBound.keysPresentBitmap = 0;
// The high bound starts as a copy of the empty low bound; each bound then
// gets its own key row inside the trailing space area.
3171 boundsDef->oldBound.highBound= boundsDef->oldBound.lowBound;
3172 boundsDef->oldBound.lowBound.key= &boundsDef->space[ 0 ];
3173 boundsDef->oldBound.highBound.key= &boundsDef->space[ maxKeyRecordBytes ];
3175 currentRangeOldApi= boundSpace;
3178 OldApiScanRangeDefinition* bounds=
3179 (OldApiScanRangeDefinition*) currentRangeOldApi->
aRef();
// Record the value on the low bound and/or the high bound.
// NOTE(review): the conditions that select each side are not visible here.
3187 if (setBoundHelperOldApi(bounds->oldBound.lowBound,
3189 tAttrInfo->m_attrId,
3193 attr.nullbit_byte_offset,
3194 attr.nullbit_bit_in_byte,
3204 if (setBoundHelperOldApi(bounds->oldBound.highBound,
3206 tAttrInfo->m_attrId,
3210 attr.nullbit_byte_offset,
3211 attr.nullbit_bit_in_byte,
3219 setErrorCodeAbort(4514);
// Finalises the range currently being defined through the old API: validates
// both bounds, fills in the index bound description (ib) with range number
// range_no, appends the range to the list of completed ranges, and clears
// currentRangeOldApi.
// Error 4259 is raised when a bound does not define a contiguous key prefix.
3234 NdbIndexScanOperation::buildIndexBoundOldApi(
int range_no)
3237 OldApiScanRangeDefinition* boundDef=
3238 (OldApiScanRangeDefinition*) currentRangeOldApi->
aRef();
// Low bound defined.
3242 if (boundDef->oldBound.lowBound.highestKey != 0)
// Every key column below highestKey must be present: the expected bitmap has
// exactly the low highestKey bits set. highestKey is non-zero here, so the
// shift count is below 32.
3248 Uint32 expectedValue= (~(Uint32) 0) >> (32 - boundDef->oldBound.lowBound.highestKey);
3250 if (boundDef->oldBound.lowBound.keysPresentBitmap != expectedValue)
3253 setErrorCodeAbort(4259);
3257 ib.low_key= boundDef->oldBound.lowBound.key;
3258 ib.low_key_count= boundDef->oldBound.lowBound.highestKey;
3259 ib.low_inclusive= !boundDef->oldBound.lowBound.highestSoFarIsStrict;
// No low bound: zero key columns, not inclusive.
3265 ib.low_key_count= 0;
3266 ib.low_inclusive=
false;
// High bound: same validation and population as for the low bound.
3269 if (boundDef->oldBound.highBound.highestKey != 0)
3274 Uint32 expectedValue= (~(Uint32) 0) >> (32 - boundDef->oldBound.highBound.highestKey);
3276 if (boundDef->oldBound.highBound.keysPresentBitmap != expectedValue)
3279 setErrorCodeAbort(4259);
3283 ib.high_key= boundDef->oldBound.highBound.key;
3284 ib.high_key_count= boundDef->oldBound.highBound.highestKey;
3285 ib.high_inclusive= !boundDef->oldBound.highBound.highestSoFarIsStrict;
// No high bound: zero key columns, not inclusive.
3291 ib.high_key_count= 0;
3292 ib.high_inclusive=
false;
3295 ib.range_no= range_no;
// Append the finished range to the singly linked list of ranges.
3299 assert( currentRangeOldApi->next() == NULL );
// Empty list: the range becomes both first and last.
3301 if (lastRangeOldApi == NULL)
3304 assert( firstRangeOldApi == NULL );
3305 firstRangeOldApi= lastRangeOldApi= currentRangeOldApi;
// Non-empty list: link after the current tail and advance the tail.
3310 assert( firstRangeOldApi != NULL );
3311 assert( lastRangeOldApi->next() == NULL );
3312 lastRangeOldApi->next(currentRangeOldApi);
3313 lastRangeOldApi= currentRangeOldApi;
// The next bound definition starts a new range.
3316 currentRangeOldApi= NULL;
// Returns the address of the `ib` member of the OldApiScanRangeDefinition
// stored in recAttr's data area.
3322 NdbIndexScanOperation::getIndexBoundFromRecAttr(
NdbRecAttr* recAttr)
3324 return &((OldApiScanRangeDefinition*)recAttr->
aRef())->ib;
// Returns all RecAttr objects that hold old-API range definitions to the Ndb
// object, including a range still under construction, then resets the three
// range pointers to NULL.
3330 NdbIndexScanOperation::releaseIndexBoundsOldApi()
3333 while (bound != NULL)
// Advance before releasing so the next pointer is read while still valid.
3336 bound= bound->next();
3337 theNdb->releaseRecAttr(release);
// A range that was started but never finalised is released as well.
3340 if (currentRangeOldApi != NULL)
3341 theNdb->releaseRecAttr(currentRangeOldApi);
3343 firstRangeOldApi= lastRangeOldApi= currentRangeOldApi= NULL;
// Appends one column of an index bound to the KEYINFO being built: the bound
// type word, then an attribute header word, then the column value taken from
// `row` according to key_record.
// firstWordOfBound is set to the position of the bound's first KEYINFO word
// if it was still NULL on entry.
// Visible error codes: 4209 (bad value length) and 4000 (KEYINFO insert
// failed).
3348 NdbIndexScanOperation::ndbrecord_insert_bound(
const NdbRecord *key_record,
3349 Uint32 column_index,
3352 Uint32*& firstWordOfBound)
// Scratch buffer for the shrunk form of a mysqld "shrink varchar" value.
3354 char buf[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
3357 bool is_null= column->is_null(row);
3359 const void *aValue= row+column->offset;
// Determine the value length, converting shrink-varchar columns on the way.
3365 if (column->flags & NdbRecord::IsMysqldShrinkVarchar)
3367 len_ok= column->shrink_varchar(row, len, buf);
3372 len_ok= column->get_var_length(row, len);
3375 setErrorCodeAbort(4209);
// First word: the bound type.
3381 if (unlikely(insertKEYINFO_NdbRecord((
const char*) &bound_type,
3385 setErrorCodeAbort(4000);
3389 assert( theKEYINFOptr != NULL );
// Remember where this bound starts: the word just written.
3391 if (firstWordOfBound == NULL)
3392 firstWordOfBound= theKEYINFOptr - 1;
// Then the attribute header word, followed by the value bytes.
3397 if (unlikely(insertKEYINFO_NdbRecord((
const char*) &ah.m_value,
3399 insertKEYINFO_NdbRecord((
const char*) aValue, len) ))
3402 setErrorCodeAbort(4000);
// Appends a bound entry consisting only of a bound type word and an attribute
// header word to the KEYINFO; no value bytes are inserted in the visible
// code.
// firstWordOfBound is set to the position of the first word written if it
// was still NULL on entry. Error 4000 is raised when a KEYINFO insert fails.
3410 NdbIndexScanOperation::insert_open_bound(
const NdbRecord *key_record,
3411 Uint32*& firstWordOfBound)
3425 if (unlikely(insertKEYINFO_NdbRecord((
const char*) &bound_type,
3429 setErrorCodeAbort(4000);
// Remember where this bound starts: the word just written.
3434 if (firstWordOfBound == NULL)
3435 firstWordOfBound= theKEYINFOptr - 1;
3444 if (unlikely(insertKEYINFO_NdbRecord((
const char*) &ah.m_value,
3448 setErrorCodeAbort(4000);
3471 ( (
int) m_accessTable->m_indexType ==
3472 (
int) NdbDictionary::Index::OrderedIndex))
3474 if (m_currentTable == m_accessTable){
3476 m_currentTable = theNdb->theDictionary->
3478 assert(m_currentTable != NULL);
3480 assert (m_currentTable != m_accessTable);
3482 theStatus = GetValue;
// Applies index-scan-specific settings on top of the generic table scan
// definitions: decodes the ordering, descending, range-number and multi-range
// flags, requests the range number when asked for, and initialises the
// receiver bookkeeping used by ordered scans.
3493 NdbIndexScanOperation::processIndexScanDefs(LockMode lm,
// Decode the scan flags relevant to index scans.
3498 const bool order_by = scan_flags & (SF_OrderBy |
SF_OrderByFull);
3499 const bool order_desc = scan_flags & SF_Descending;
3500 const bool read_range_no = scan_flags & SF_ReadRangeNo;
3501 m_multi_range = scan_flags & SF_MultiRange;
// Generic table-scan processing first; res is zero on success.
3504 int res = NdbScanOperation::processTableScanDefs(lm,
// Range numbers are requested by adding a read of AttributeHeader::RANGE_NO.
3508 if(!res && read_range_no)
3510 m_read_range_no = 1;
3511 if (insertATTRINFOHdr_NdbRecord(AttributeHeader::RANGE_NO,
// Descending order is recorded locally and in the request flags.
3527 m_descending =
true;
3529 ScanTabReq::setDescendingFlag(req->requestInfo,
true);
3534 m_sort_columns = cnt;
// Both the current receiver index and the API receiver count start at the
// number of sent receivers.
3535 m_current_api_receiver = m_sent_receivers_count;
3536 m_api_receivers_count = m_sent_receivers_count;
3540 assert (m_attribute_record);
3544 m_previous_range_num = 0;
3557 int jdir= 1 - 2 * (int)descending;
3559 assert(jdir == 1 || jdir == -1);
3561 const char *a_row= r1->peek_row();
3562 const char *b_row= r2->peek_row();
3567 Uint32 a_range_no= uint4korr(a_row+result_record->m_row_size);
3568 Uint32 b_range_no= uint4korr(b_row+result_record->m_row_size);
3569 if (a_range_no != b_range_no)
3570 return (a_range_no < b_range_no ? -1 : 1);
3573 for (i= 0; i<key_record->key_index_length; i++)
3576 &key_record->columns[key_record->key_indexes[
i]];
3577 assert(key_col->attrId < result_record->m_attrId_indexes_length);
3579 assert(col_idx >= 0);
3580 assert((Uint32)col_idx < result_record->noOfColumns);
3583 bool a_is_null= result_col->is_null(a_row);
3584 bool b_is_null= result_col->is_null(b_row);
3595 Uint32 offset= result_col->offset;
3596 Uint32 maxSize= result_col->maxSize;
3597 const char *a_ptr= a_row +
offset;
3598 const char *b_ptr= b_row +
offset;
3599 void *info= result_col->charset_info;
3601 (*result_col->compare_function)
3602 (info, a_ptr, maxSize, b_ptr, maxSize);
// Fetches the next row of an ordered index scan into out_row.
// m_api_receivers is kept sorted by each receiver's current row, with
// m_current_api_receiver indexing the receiver that holds the next row.
3626 NdbIndexScanOperation::next_result_ordered_ndbrecord(
const char * & out_row,
// No current receiver, or the current receiver has no further row.
3638 if (m_current_api_receiver==theParallelism ||
3639 !m_api_receivers[m_current_api_receiver]->
nextResult())
// Request more data and wait; count is the number of confirmed receivers.
3645 int count= ordered_send_scan_wait_for_all(forceSend);
// Insert each confirmed receiver into the sorted array, moving the start of
// the sorted region down by one slot per receiver.
3653 current= m_current_api_receiver;
3654 for (
int i= 0; i < count; i++)
3655 ordered_insert_receiver(current--, m_conf_receivers[i]);
3656 m_current_api_receiver= current;
3657 theNdb->theImpl->incClientStat(Ndb::ScanBatchCount, count);
// NOTE(review): presumably the else branch (current receiver advanced to a
// new row); it is re-inserted to keep the array sorted.
3665 current= m_current_api_receiver;
3666 ordered_insert_receiver(current + 1, m_api_receivers[current]);
// Hand out the row of the receiver now at the front, if there is one.
3670 if (current < theParallelism && m_api_receivers[current]->
nextResult())
3672 out_row= m_api_receivers[current]->get_row();
// Inserts `receiver` into the sorted region of m_api_receivers that occupies
// [start, theParallelism). The insertion point is found by binary search
// using compare_ndbrecord; entries before it are shifted down one slot so
// the region afterwards begins at start-1.
3684 NdbIndexScanOperation::ordered_insert_receiver(Uint32 start,
// Binary search over [first, last).
3692 Uint32 first= start;
3693 Uint32 last= theParallelism;
3694 while (first < last)
3696 Uint32 idx= (first+last)/2;
3697 int res= compare_ndbrecord(receiver,
3698 m_api_receivers[idx],
// Shift the (last - start) entries in [start, last) down by one slot; the
// ranges overlap, hence memmove.
3711 memmove(&m_api_receivers[start-1],
3712 &m_api_receivers[start],
3713 (last - start) *
sizeof(m_api_receivers[0]));
// The freed slot just below the insertion point receives the new entry.
3714 m_api_receivers[last-1]= receiver;
// For ordered scans: asks for the next batch from the current receiver, then
// waits until no receivers remain outstanding or an error is set.
// Returns the number of receivers confirmed during the wait on the visible
// success path; m_conf_receivers_count is reset to zero before returning.
3739 NdbIndexScanOperation::ordered_send_scan_wait_for_all(
bool forceSend)
3741 NdbImpl* impl = theNdb->theImpl;
3742 Uint32 timeout= impl->get_waitfor_timeout();
// The saved node sequence is compared with the current one before sending
// and after each wait.
3748 Uint32 seq= theNdbCon->theNodeSequence;
3749 Uint32 nodeId= theNdbCon->theDBnode;
3750 if (seq == impl->getNodeSequence(nodeId) &&
3751 !send_next_scan_ordered(m_current_api_receiver))
3753 impl->incClientStat(Ndb::WaitScanResultCount, 1);
// Wait until every sent receiver has answered, or an error occurs.
3754 while (m_sent_receivers_count > 0 && !theError.
code)
// Each wait uses three times the configured wait-for timeout.
3756 int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
3757 if (ret_code == 0 && seq == impl->getNodeSequence(nodeId))
3768 setErrorCode(theError.
code);
// Hand the confirmed count to the caller and reset it.
3772 Uint32 new_receivers= m_conf_receivers_count;
3773 m_conf_receivers_count= 0;
3774 return new_receivers;
// For ordered scans: sends a SCAN_NEXTREQ asking for the next batch for the
// single receiver at position idx, and moves that receiver to the sent list.
// Nothing is sent when idx equals theParallelism or when the receiver has
// already completed (its m_tcPtrI is RNIL).
3794 NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
3796 if(idx == theParallelism)
3801 tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(theNdbCon->m_tcRef));
3803 Uint32 last = m_sent_receivers_count;
3804 Uint32* theData = tSignal.getDataPtrSend();
// The receiver pointers follow the four fixed words of the signal.
3805 Uint32* prep_array = theData + 4;
// The receiver at idx is consumed; the next API receiver is idx + 1.
3807 m_current_api_receiver = idx + 1;
3808 if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
3810 if(DEBUG_NEXT_RESULT)
3811 ndbout_c(
"receiver completed, don't send");
// Fixed part: connection pointer and the transaction id split into its low
// and high 32 bits. theData[1] is set on a line not visible here.
3815 theData[0] = theNdbCon->theTCConPtr;
3817 Uint64 transId = theNdbCon->theTransactionId;
3818 theData[2] = (Uint32) transId;
3819 theData[3] = (Uint32) (transId >> 32);
// Append the receiver to the sent list and prepare it for new data.
3824 m_sent_receivers[last] = tRec;
3825 tRec->m_list_index = last;
3826 tRec->prepareSend();
3827 m_sent_receivers_count = last + 1;
3829 Uint32 nodeId = theNdbCon->theDBnode;
3830 NdbImpl * impl = theNdb->theImpl;
// Four fixed words plus one receiver pointer.
3831 tSignal.setLength(4+1);
3832 int ret= impl->sendSignal(&tSignal, nodeId);
// Closes the scan: waits for outstanding receivers, collects the API and
// confirmed receivers, waits again until all receiver counts reach zero,
// then releases old-API resources.
// NOTE(review): the function header is not visible; the name close_impl is
// taken from the debug trace string below -- confirm.
3839 NdbImpl* impl = theNdb->theImpl;
3840 Uint32 timeout= impl->get_waitfor_timeout();
3841 Uint32 seq = theNdbCon->theNodeSequence;
3842 Uint32 nodeId = theNdbCon->theDBnode;
// If the node sequence has changed, mark the connection for release on close.
3844 if (seq != impl->getNodeSequence(nodeId))
3846 theNdbCon->theReleaseOnClose =
true;
3853 impl->incClientStat(Ndb::WaitScanResultCount, 1);
// Phase 1: wait for the receivers that are currently outstanding.
3854 while(theError.
code == 0 && m_sent_receivers_count)
// Each wait uses three times the configured wait-for timeout.
3856 int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
3857 switch(return_code){
// Failure case: forget all receivers and mark the connection for release.
3863 m_api_receivers_count = 0;
3864 m_conf_receivers_count = 0;
3865 m_sent_receivers_count = 0;
3866 theNdbCon->theReleaseOnClose =
true;
3873 m_api_receivers_count = 0;
3874 m_current_api_receiver = m_ordered ? theParallelism : 0;
3882 Uint32 api = m_api_receivers_count;
3883 Uint32 conf = m_conf_receivers_count;
// Compact the still-active API receivers to the front of the array.
// NOTE(review): the element size is written as sizeof(char*) although the
// array holds receiver pointers; sizeof(m_api_receivers[0]) would match the
// usage in ordered_insert_receiver.
3890 memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
3891 (theParallelism - m_current_api_receiver) *
sizeof(
char*));
3892 api = (theParallelism - m_current_api_receiver);
3893 m_api_receivers_count = api;
// NOTE(review): api and conf are Uint32 but are printed with %d.
3896 if(DEBUG_NEXT_RESULT)
3897 ndbout_c(
"close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
3898 m_ordered, api, conf,
3899 m_sent_receivers_count, m_current_api_receiver, theParallelism);
// Append the confirmed receivers after the API receivers.
3907 memcpy(m_api_receivers+api, m_conf_receivers, conf *
sizeof(
char*));
3908 m_api_receivers_count = api + conf;
3909 m_conf_receivers_count = 0;
3915 theNdbCon->theReleaseOnClose =
true;
3922 impl->incClientStat(Ndb::WaitScanResultCount, 1);
// Phase 2: wait until no receiver remains in any of the three lists.
3923 while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
3925 int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
3926 switch(return_code){
// Failure case: forget all receivers and mark the connection for release.
3932 m_api_receivers_count = 0;
3933 m_conf_receivers_count = 0;
3934 m_sent_receivers_count = 0;
3935 theNdbCon->theReleaseOnClose =
true;
// Release old-API index bound storage and interpreted code.
3949 isop->releaseIndexBoundsOldApi();
3955 freeInterpretedCodeOldApi();
// Resets receiver bookkeeping for the first `parallell` receivers: each
// receiver gets its list index, its id is stored in the prepared list, it is
// placed in the sent list, and its slots in the confirmed and API lists are
// cleared. All four receiver counters are then zeroed.
// The `ordered` parameter is not used in the visible lines.
3961 NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
3962 for(Uint32 i = 0; i<parallell; i++){
3963 m_receivers[
i]->m_list_index =
i;
3964 m_prepared_receivers[
i] = m_receivers[
i]->getId();
3965 m_sent_receivers[
i] = m_receivers[
i];
3966 m_conf_receivers[
i] = 0;
3967 m_api_receivers[
i] = 0;
3968 m_receivers[
i]->prepareSend();
// Counters restart from zero.
3971 m_api_receivers_count = 0;
3972 m_current_api_receiver = 0;
3973 m_sent_receivers_count = 0;
3974 m_conf_receivers_count = 0;
// Old API: marks the end of the bound definitions for range number `no` and
// finalises that range via buildIndexBoundOldApi.
// Visible error codes: 4509, 4259, 4282.
3980 DBUG_ENTER(
"end_of_bound");
3981 DBUG_PRINT(
"info", (
"Range number %u", no));
// A non-zero range number is only allowed for multi-range scans.
3983 if (! (m_savedScanFlagsOldApi & SF_MultiRange || no == 0))
3985 setErrorCodeAbort(4509);
// A range must have been started by defining at least one bound.
3990 if (currentRangeOldApi == NULL)
3992 setErrorCodeAbort(4259);
// Range numbers must be consecutive from zero: the expected number is one
// more than that of the last completed range (error 4282 otherwise).
// NOTE(review): only part of the condition enabling this check is visible.
4002 (m_savedScanFlagsOldApi & SF_ReadRangeNo))
4004 Uint32 expectedNum= 0;
4006 if (lastRangeOldApi != NULL)
4008 assert( firstRangeOldApi != NULL );
4010 getIndexBoundFromRecAttr(lastRangeOldApi)->range_no + 1;
4013 if (no != expectedNum)
4015 setErrorCodeAbort(4282);
// Convert the accumulated bounds into a finished range.
4021 if (buildIndexBoundOldApi(no) != 0)
4030 assert(m_attribute_record);
4032 if (m_read_range_no)
4034 Uint32 idx= m_current_api_receiver;
4035 if (idx >= m_api_receivers_count)
4038 const NdbReceiver *tRec= m_api_receivers[m_current_api_receiver];
4039 return tRec->get_range_no();
4048 const unsigned char *result_mask,
4050 Uint32 sizeOfOptions)
4052 unsigned char empty_mask[NDB_MAX_ATTRIBUTES_IN_TABLE>>3];
4056 bzero(empty_mask,
sizeof(empty_mask));
4057 result_mask= &empty_mask[0];
4060 result_rec, result_row,
4061 result_mask, opts, sizeOfOptions);
4070 return ((m_pruneState == SPS_ONE_PARTITION) ||
4071 (m_pruneState == SPS_FIXED));