19 #include <signaldata/TcKeyReq.hpp>
21 #include <ndb_version.h>
27 static const bool g_ndb_blob_ok_to_read_index_table =
false;
40 version = theEventBlobVersion;
46 NdbBlob::setState(State newState)
48 DBUG_ENTER(
"NdbBlob::setState");
49 DBUG_PRINT(
"info", (
"this=%p newState=%u",
this, newState));
59 DBUG_ENTER(
"NdbBlob::getBlobTableName");
60 NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
73 DBUG_ENTER(
"NdbBlob::getBlobTableName");
74 assert(t != 0 && c != 0 && c->getBlobType() && c->
getPartSize() != 0);
75 memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
76 sprintf(btname,
"NDB$BLOB_%d_%d", (
int)t->m_id, (
int)c->m_column_no);
77 DBUG_PRINT(
"info", (
"blob table name: %s", btname));
84 DBUG_ENTER(
"NdbBlob::getBlobTable");
85 const int blobVersion = c->getBlobVersion();
86 assert(blobVersion == NDB_BLOB_V1 || blobVersion == NDB_BLOB_V2);
87 char btname[NdbBlobImpl::BlobTableNameSize];
96 bt.m_primaryTableId = t->m_id;
100 bt.m_tablespace_id = t->m_tablespace_id;
101 bt.m_tablespace_version = t->m_tablespace_version;
104 DBUG_PRINT(
"info", (
"Define BLOB table V%d with"
105 " primary table = %u and Fragment Type = %u",
108 if (unlikely(blobVersion == NDB_BLOB_V1)) {
114 error.
code = NdbBlobImpl::ErrTable;
119 assert(t->m_keyLenInWords != 0);
120 bc.setLength(t->m_keyLenInWords);
121 bc.setPrimaryKey(
true);
122 bc.setDistributionKey(
true);
127 bc.setPrimaryKey(
true);
128 bc.setDistributionKey(
true);
133 bc.setPrimaryKey(
true);
134 bc.setDistributionKey(
false);
150 bc.setStorageType(c->getStorageType());
156 const uint columns = t->m_columns.size();
160 for (i = 0; n < noOfKeys; i++) {
169 if (c->getDistributionKey()) {
170 bc->setDistributionKey(
true);
173 bc->setAutoIncrement(
false);
184 bc.setDistributionKey(
true);
190 bc.setDistributionKey(
false);
197 bc.setDistributionKey(
false);
202 const Uint32 storageType = (Uint32)c->getStorageType();
205 if (storageType == NDB_STORAGETYPE_MEMORY)
211 if (storageType == NDB_STORAGETYPE_MEMORY)
222 bc.setStorageType(c->getStorageType());
232 NdbEventImpl* e = anNdb->theDictionary->m_impl.getEvent(eventName);
253 DBUG_ENTER(
"NdbBlob::getBlobEvent");
255 assert(c->m_blobTable != NULL);
258 char bename[MAX_TAB_NAME_SIZE+1];
260 bename[
sizeof(bename)-1]= 0;
264 be.mi_type = e->mi_type;
266 be.m_mergeEvents = e->m_mergeEvents;
269 be.
setReport(NdbDictionary::Event::ER_ALL);
288 NdbBlob::NdbBlob(
Ndb*)
297 theFixedDataFlag =
false;
301 theEventBlobVersion = -1;
302 theBtColumnNo[0] = -1;
303 theBtColumnNo[1] = -1;
304 theBtColumnNo[2] = -1;
305 theBtColumnNo[3] = -1;
306 theBtColumnNo[4] = -1;
311 theBlobEventOp = NULL;
312 theBlobEventPkRecAttr = NULL;
313 theBlobEventDistRecAttr = NULL;
314 theBlobEventPartRecAttr = NULL;
315 theBlobEventPkidRecAttr = NULL;
316 theBlobEventDataRecAttr = NULL;
318 theAccessTable = NULL;
328 theSetValueInPreExecFlag =
false;
331 thePendingBlobOps = 0;
332 theActiveHook = NULL;
333 theActiveHookArg = NULL;
335 theInlineData = NULL;
336 theHeadInlineRecAttr = NULL;
337 theHeadInlineReadOp = NULL;
338 theHeadInlineUpdateFlag =
false;
339 userDefinedPartitioning =
false;
340 thePartitionId = noPartitionId();
341 thePartitionIdRecAttr = NULL;
352 theAccessKeyBuf.release();
353 thePackKeyBuf.release();
354 theHeadInlineBuf.release();
355 theHeadInlineCopyBuf.release();
356 thePartBuf.release();
357 theBlobEventDataBuf.release();
363 NdbBlob::Buf::Buf() :
376 NdbBlob::Buf::alloc(
unsigned n)
388 memset(data,
'X', maxsize);
393 NdbBlob::Buf::release()
403 NdbBlob::Buf::zerorest()
405 assert(
size <= maxsize);
406 memset(data +
size, 0, maxsize -
size);
410 NdbBlob::Buf::copyfrom(
const NdbBlob::Buf& src)
413 memcpy(data, src.data,
size);
421 return theTable == theAccessTable;
427 return theTable != theAccessTable;
451 NdbBlob::isInsertOp()
458 NdbBlob::isUpdateOp()
472 NdbBlob::isDeleteOp()
487 NdbBlob::isReadOnlyOp()
497 NdbBlob::isTakeOverOp()
500 TcKeyReq::getTakeOverScanFlag(theNdbOp->theScanInfo);
506 NdbBlob::getPartNumber(Uint64 pos)
508 assert(thePartSize != 0 && pos >= theInlineSize);
509 Uint64 partNo = (pos - theInlineSize) / thePartSize;
510 assert(partNo < (Uint64(1) << 32));
511 return Uint32(partNo);
515 NdbBlob::getPartOffset(Uint64 pos)
517 assert(thePartSize != 0 && pos >= theInlineSize);
518 return (pos - theInlineSize) % thePartSize;
522 NdbBlob::getPartCount()
524 if (theLength <= theInlineSize)
526 return 1 + getPartNumber(theLength - 1);
530 NdbBlob::getDistKey(Uint32 part)
532 assert(theStripeSize != 0);
534 if (unlikely(theBlobVersion == NDB_BLOB_V1))
535 dist = (part / theStripeSize) % theStripeSize;
538 dist = (part / theStripeSize);
552 if (userDefinedPartitioning &&
553 (thePartitionId != noPartitionId())) {
568 if (userDefinedPartitioning) {
569 assert(thePartitionId != noPartitionId());
577 NdbBlob::packKeyValue(
const NdbTableImpl* aTable,
const Buf& srcBuf)
579 DBUG_ENTER(
"NdbBlob::packKeyValue");
580 const Uint32* data = (
const Uint32*)srcBuf.data;
582 Uint32* pack_data = (Uint32*)thePackKeyBuf.data;
583 unsigned pack_pos = 0;
584 for (
unsigned i = 0; i < aTable->m_columns.size(); i++) {
588 unsigned len = c->m_attrSize * c->m_arraySize;
590 bool ok = c->get_var_length(&data[pos], pack_len);
592 setErrorCode(NdbBlobImpl::ErrCorruptPK);
595 memcpy(&pack_data[pack_pos], &data[pos], pack_len);
596 while (pack_len % 4 != 0) {
597 char* p = (
char*)&pack_data[pack_pos] + pack_len++;
600 pos += (len + 3) / 4;
601 pack_pos += pack_len / 4;
604 assert(4 * pos == srcBuf.size);
605 assert(4 * pack_pos <= thePackKeyBuf.maxsize);
606 thePackKeyBuf.size = 4 * pack_pos;
607 thePackKeyBuf.zerorest();
612 NdbBlob::unpackKeyValue(
const NdbTableImpl* aTable, Buf& dstBuf)
614 DBUG_ENTER(
"NdbBlob::unpackKeyValue");
615 Uint32* data = (Uint32*)dstBuf.data;
617 const Uint32* pack_data = (
const Uint32*)thePackKeyBuf.data;
618 unsigned pack_pos = 0;
619 for (
unsigned i = 0; i < aTable->m_columns.size(); i++) {
623 unsigned len = c->m_attrSize * c->m_arraySize;
625 bool ok = c->get_var_length(&pack_data[pack_pos], pack_len);
627 setErrorCode(NdbBlobImpl::ErrCorruptPK);
630 memcpy(&data[pos], &pack_data[pack_pos], pack_len);
631 while (pack_len % 4 != 0) {
632 char* p = (
char*)&data[pos] + pack_len++;
635 pos += (len + 3) / 4;
636 pack_pos += pack_len / 4;
639 assert(4 * pos == dstBuf.size);
640 assert(4 * pack_pos == thePackKeyBuf.size);
647 Buf& packedBuf, Buf& unpackedBuf)
649 char buf[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
650 DBUG_ENTER(
"NdbBlob::copyKeyFromRow");
652 assert(record->flags & NdbRecord::RecHasAllKeys);
654 char *packed= packedBuf.data;
655 char *unpacked= unpackedBuf.data;
657 for (Uint32 i= 0; i < record->key_index_length; i++)
664 if (col->flags & NdbRecord::IsMysqldShrinkVarchar)
667 len_ok= col->shrink_varchar(row, len, buf);
672 len_ok= col->get_var_length(row, len);
673 src= &row[col->offset];
678 setErrorCode(NdbBlobImpl::ErrCorruptPK);
683 memcpy(packed, src, len);
684 memcpy(unpacked, src, len);
687 Uint32 packed_len= (len + 3) & ~3;
688 Uint32 unpacked_len= (col->maxSize + 3) & ~3;
689 Uint32 packed_pad= packed_len - len;
690 Uint32 unpacked_pad= unpacked_len - len;
692 bzero(packed + len, packed_pad);
693 if (unpacked_pad > 0)
694 bzero(unpacked + len, unpacked_pad);
696 unpacked+= unpacked_len;
699 packedBuf.size= packed - packedBuf.data;
700 packedBuf.zerorest();
701 assert(unpacked == unpackedBuf.data + unpackedBuf.size);
715 NdbBlob::getNullOrEmptyBlobHeadDataPtr(
const char * & data,
721 assert(theState==Prepared);
722 assert(theLength==0);
723 assert(theSetBuf==NULL);
724 assert(theGetSetBytes==0);
726 assert(theHeadInlineBuf.data!=NULL);
728 DBUG_PRINT(
"info", (
"getNullOrEmptyBlobHeadDataPtr. Nullable : %d",
729 theColumn->m_nullable));
731 if (theColumn->m_nullable)
744 prepareSetHeadInlineValue();
746 data=theHeadInlineBuf.data;
749 if (unlikely(theBlobVersion == NDB_BLOB_V1))
750 byteSize = theHeadInlineBuf.size;
752 byteSize = theHead.varsize + 2;
756 memset(&theHead, 0,
sizeof(theHead));
768 NdbBlob::packBlobHead(
const Head& head,
char* buf,
int blobVersion)
770 DBUG_ENTER(
"NdbBlob::packBlobHead");
771 DBUG_PRINT(
"info", (
"version=%d", blobVersion));
772 if (unlikely(blobVersion == NDB_BLOB_V1)) {
774 memcpy(buf, &head.length,
sizeof(head.length));
776 unsigned char* p = (
unsigned char*)buf;
779 for (i = 0, n = 0; i < 2; i++, n += 8)
780 *p++ = (head.varsize >> n) & 0xff;
781 for (i = 0, n = 0; i < 2; i++, n += 8)
782 *p++ = (head.reserved >> n) & 0xff;
783 for (i = 0, n = 0; i < 4; i++, n += 8)
784 *p++ = (head.pkid >> n) & 0xff;
785 for (i = 0, n = 0; i < 8; i++, n += 8)
786 *p++ = (head.length >> n) & 0xff;
787 assert(p - (uchar*)buf == 16);
788 assert(head.reserved == 0);
789 DBUG_DUMP(
"info", (uchar*)buf, 16);
791 DBUG_PRINT(
"info", (
"pack: varsize=%u length=%u pkid=%u",
792 (uint)head.varsize, (uint)head.length, (uint)head.pkid));
797 NdbBlob::unpackBlobHead(Head& head,
const char* buf,
int blobVersion)
799 DBUG_ENTER(
"NdbBlob::unpackBlobHead");
800 DBUG_PRINT(
"info", (
"version=%d", blobVersion));
805 if (unlikely(blobVersion == NDB_BLOB_V1)) {
807 memcpy(&head.length, buf,
sizeof(head.length));
808 head.headsize = (NDB_BLOB_V1_HEAD_SIZE << 2);
810 const unsigned char* p = (
const unsigned char*)buf;
813 for (i = 0, n = 0; i < 2; i++, n += 8)
814 head.varsize |= ((Uint16)*p++ <<
n);
815 for (i = 0, n = 0; i < 2; i++, n += 8)
816 head.reserved |= ((Uint32)*p++ <<
n);
817 for (i = 0, n = 0; i < 4; i++, n += 8)
818 head.pkid |= ((Uint32)*p++ <<
n);
819 for (i = 0, n = 0; i < 8; i++, n += 8)
820 head.length |= ((Uint64)*p++ <<
n);
821 assert(p - (uchar*)buf == 16);
822 assert(head.reserved == 0);
823 head.headsize = (NDB_BLOB_V2_HEAD_SIZE << 2);
824 DBUG_DUMP(
"info", (uchar*)buf, 16);
826 DBUG_PRINT(
"info", (
"unpack: varsize=%u length=%u pkid=%u",
827 (uint)head.varsize, (uint)head.length, (uint)head.pkid));
832 NdbBlob::packBlobHead()
834 packBlobHead(theHead, theHeadInlineBuf.data, theBlobVersion);
838 NdbBlob::unpackBlobHead()
840 unpackBlobHead(theHead, theHeadInlineBuf.data, theBlobVersion);
846 DBUG_ENTER(
"NdbBlob::getTableKeyValue");
847 Uint32* data = (Uint32*)theKeyBuf.data;
849 for (
unsigned i = 0; i < theTable->m_columns.size(); i++) {
853 unsigned len = c->m_attrSize * c->m_arraySize;
854 if (anOp->getValue_impl(c, (
char*)&data[pos]) == NULL) {
859 while (len % 4 != 0) {
860 char* p = (
char*)&data[pos] + len++;
866 assert(pos == theKeyBuf.size / 4);
874 DBUG_ENTER(
"NdbBlob::setTableKeyValue");
875 DBUG_DUMP(
"info", (uchar*) theKeyBuf.data, 4 * theTable->m_keyLenInWords);
876 const bool isBlobPartOp = (anOp->m_currentTable == theBlobTable);
877 const Uint32* data = (
const Uint32*)theKeyBuf.data;
878 const unsigned columns = theTable->m_columns.size();
882 for (
unsigned i = 0; n < noOfKeys; i++) {
887 unsigned len = c->m_attrSize * c->m_arraySize;
889 c = theBlobTable->getColumn(n);
892 if (anOp->equal_impl(c, (
const char*)&data[pos]) == -1) {
896 pos += (len + 3) / 4;
900 assert(pos == theKeyBuf.size / 4);
907 DBUG_ENTER(
"NdbBlob::setAccessKeyValue");
908 DBUG_DUMP(
"info", (uchar*) theAccessKeyBuf.data,
909 4 * theAccessTable->m_keyLenInWords);
910 const Uint32* data = (
const Uint32*)theAccessKeyBuf.data;
911 const unsigned columns = theAccessTable->m_columns.size();
913 for (
unsigned i = 0; i < columns; i++) {
917 unsigned len = c->m_attrSize * c->m_arraySize;
918 if (anOp->equal_impl(c, (
const char*)&data[pos]) == -1) {
922 pos += (len + 3) / 4;
925 assert(pos == theAccessKeyBuf.size / 4);
930 NdbBlob::setDistKeyValue(
NdbOperation* anOp, Uint32 part)
932 DBUG_ENTER(
"NdbBlob::setDistKeyValue");
933 if (theStripeSize != 0) {
934 Uint32 dist = getDistKey(part);
935 DBUG_PRINT(
"info", (
"dist=%u", dist));
936 if (anOp->
equal(theBtColumnNo[BtColumnDist], dist) == -1)
943 NdbBlob::setPartKeyValue(
NdbOperation* anOp, Uint32 part)
945 DBUG_ENTER(
"NdbBlob::setPartKeyValue");
946 DBUG_PRINT(
"info", (
"part=%u packkey=", part));
947 DBUG_DUMP(
"info", (uchar*) thePackKeyBuf.data, thePackKeyBuf.size);
949 if (unlikely(theBlobVersion == NDB_BLOB_V1)) {
951 if (anOp->
equal(
"PK", thePackKeyBuf.data) == -1 ||
952 anOp->
equal(
"DIST", getDistKey(part)) == -1 ||
953 anOp->
equal(
"PART", part) == -1) {
958 if (setTableKeyValue(anOp) == -1 ||
959 setDistKeyValue(anOp, part) == -1 ||
960 anOp->
equal(theBtColumnNo[BtColumnPart], part) == -1) {
965 setPartPartitionId(anOp);
970 NdbBlob::setPartPkidValue(
NdbOperation* anOp, Uint32 pkid)
972 DBUG_ENTER(
"NdbBlob::setPartPkidValue");
973 DBUG_PRINT(
"info", (
"pkid=%u", pkid));
974 if (unlikely(theBlobVersion == NDB_BLOB_V1))
977 if (anOp->
setValue(theBtColumnNo[BtColumnPkid], pkid) == -1) {
986 NdbBlob::getPartDataValue(
NdbOperation* anOp,
char* buf, Uint16* aLenLoc)
988 DBUG_ENTER(
"NdbBlob::getPartDataValue");
989 assert(aLenLoc != NULL);
990 Uint32 bcNo = theBtColumnNo[BtColumnData];
991 if (theFixedDataFlag) {
992 if (anOp->
getValue(bcNo, buf) == NULL) {
997 *aLenLoc = thePartSize;
1001 if (anOp->getVarValue(bc, buf, aLenLoc) == NULL) {
1011 NdbBlob::setPartDataValue(
NdbOperation* anOp,
const char* buf,
const Uint16& aLen)
1013 DBUG_ENTER(
"NdbBlob::setPartDataValue");
1015 Uint32 bcNo = theBtColumnNo[BtColumnData];
1016 if (theFixedDataFlag) {
1017 if (anOp->
setValue(bcNo, buf) == -1) {
1024 if (anOp->setVarValue(bc, buf, aLen) == -1) {
1035 DBUG_ENTER(
"NdbBlob::getHeadInlineValue");
1040 theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
1041 if (theHeadInlineRecAttr == NULL) {
1045 if (userDefinedPartitioning)
1053 thePartitionIdRecAttr =
1054 anOp->getValue_impl(&NdbColumnImpl::getImpl(*NdbDictionary::Column::FRAGMENT));
1056 if (thePartitionIdRecAttr == NULL) {
1067 memset(&theHead, 0,
sizeof(theHead));
1073 NdbBlob::getHeadFromRecAttr()
1075 DBUG_ENTER(
"NdbBlob::getHeadFromRecAttr");
1076 assert(theHeadInlineRecAttr != NULL);
1077 theNullFlag = theHeadInlineRecAttr->
isNULL();
1078 assert(theEventBlobVersion >= 0 || theNullFlag != -1);
1079 if (theNullFlag == 0) {
1081 theLength = theHead.length;
1085 if (theEventBlobVersion == -1) {
1086 if (userDefinedPartitioning)
1091 Uint32
id = thePartitionIdRecAttr->
u_32_value();
1092 DBUG_PRINT(
"info", (
"table partition id: %u",
id));
1093 if (thePartitionId == noPartitionId()) {
1094 DBUG_PRINT(
"info", (
"discovered here"));
1095 thePartitionId =
id;
1097 assert(thePartitionId ==
id);
1102 assert(thePartitionIdRecAttr == NULL);
1106 DBUG_PRINT(
"info", (
"theNullFlag=%d theLength=%llu",
1107 theNullFlag, theLength));
1112 NdbBlob::prepareSetHeadInlineValue()
1114 theHead.length = theLength;
1115 if (unlikely(theBlobVersion == NDB_BLOB_V1)) {
1116 if (theLength < theInlineSize)
1117 memset(theInlineData + theLength, 0,
size_t(theInlineSize - theLength));
1120 if (theLength < theInlineSize)
1121 theHead.varsize = (theHeadSize - 2) + Uint32(theLength);
1123 theHead.varsize = (theHeadSize - 2) + theInlineSize;
1127 theHeadInlineUpdateFlag =
false;
1128 assert(theNullFlag != -1);
1134 DBUG_ENTER(
"NdbBlob::setHeadInlineValue");
1135 prepareSetHeadInlineValue();
1136 const char* aValue = theNullFlag ? 0 : theHeadInlineBuf.data;
1137 if (anOp->
setValue(theColumn, aValue) == -1) {
1149 DBUG_ENTER(
"NdbBlob::getValue");
1150 DBUG_PRINT(
"info", (
"data=%p bytes=%u", data, bytes));
1151 if (! isReadOp() && ! isScanOp()) {
1152 setErrorCode(NdbBlobImpl::ErrCompat);
1155 if (theGetFlag || theState != Prepared) {
1156 setErrorCode(NdbBlobImpl::ErrState);
1159 if (data == NULL && bytes != 0) {
1160 setErrorCode(NdbBlobImpl::ErrUsage);
1164 theGetBuf =
static_cast<char*
>(data);
1165 theGetSetBytes = bytes;
1172 DBUG_ENTER(
"NdbBlob::setValue");
1173 DBUG_PRINT(
"info", (
"data=%p bytes=%u", data, bytes));
1174 if (isReadOnlyOp()) {
1175 setErrorCode(NdbBlobImpl::ErrCompat);
1178 if (theSetFlag || theState != Prepared) {
1179 setErrorCode(NdbBlobImpl::ErrState);
1182 if (data == NULL && bytes != 0) {
1183 setErrorCode(NdbBlobImpl::ErrUsage);
1187 theSetBuf =
static_cast<const char*
>(data);
1188 theGetSetBytes = bytes;
1191 if (theSetBuf != NULL) {
1192 Uint32 n = theGetSetBytes;
1193 if (n > theInlineSize)
1195 assert(thePos == 0);
1196 if (writeDataPrivate(theSetBuf, n) == -1)
1208 if (!theNdbRecordFlag)
1210 if (setHeadInlineValue(theNdbOp) == -1)
1222 DBUG_ENTER(
"NdbBlob::setActiveHook");
1223 DBUG_PRINT(
"info", (
"hook=%p arg=%p", (
void*)&activeHook, arg));
1224 if (theState != Prepared) {
1225 setErrorCode(NdbBlobImpl::ErrState);
1228 theActiveHook = activeHook;
1229 theActiveHookArg = arg;
1236 NdbBlob::getDefined(
int& isNull)
1238 DBUG_ENTER(
"NdbBlob::getDefined");
1239 if (theState == Prepared && theSetFlag) {
1240 isNull = (theSetBuf == NULL);
1243 isNull = theNullFlag;
1248 NdbBlob::getNull(
bool& isNull)
1250 DBUG_ENTER(
"NdbBlob::getNull");
1251 if (theState == Prepared && theSetFlag) {
1252 isNull = (theSetBuf == NULL);
1255 if (theNullFlag == -1) {
1256 setErrorCode(NdbBlobImpl::ErrState);
1259 isNull = theNullFlag;
1264 NdbBlob::getNull(
int& isNull)
1266 DBUG_ENTER(
"NdbBlob::getNull");
1267 if (theState == Prepared && theSetFlag) {
1268 isNull = (theSetBuf == NULL);
1271 isNull = theNullFlag;
1272 if (isNull == -1 && theEventBlobVersion == -1) {
1273 setErrorCode(NdbBlobImpl::ErrState);
1276 DBUG_PRINT(
"info", (
"isNull=%d", isNull));
1283 DBUG_ENTER(
"NdbBlob::setNull");
1284 if (isReadOnlyOp()) {
1285 setErrorCode(NdbBlobImpl::ErrCompat);
1288 if (theNullFlag == -1) {
1289 if (theState == Prepared) {
1292 setErrorCode(NdbBlobImpl::ErrState);
1297 if (deletePartsThrottled(0, getPartCount()) == -1)
1301 theHeadInlineUpdateFlag =
true;
1308 DBUG_ENTER(
"NdbBlob::getLength");
1309 if (theState == Prepared && theSetFlag) {
1310 len = theGetSetBytes;
1313 if (theNullFlag == -1) {
1314 setErrorCode(NdbBlobImpl::ErrState);
1324 DBUG_ENTER(
"NdbBlob::truncate");
1325 DBUG_PRINT(
"info", (
"length old=%llu new=%llu", theLength, length));
1326 if (isReadOnlyOp()) {
1327 setErrorCode(NdbBlobImpl::ErrCompat);
1330 if (theNullFlag == -1) {
1331 setErrorCode(NdbBlobImpl::ErrState);
1334 if (theLength > length) {
1335 if (length > theInlineSize) {
1336 Uint32 part1 = getPartNumber(length - 1);
1337 Uint32 part2 = getPartNumber(theLength - 1);
1338 assert(part2 >= part1);
1339 if (part2 > part1 && deletePartsThrottled(part1 + 1, part2 - part1) == -1)
1341 Uint32 off = getPartOffset(length);
1343 assert(off < thePartSize);
1347 if (executePendingBlobWrites() == -1)
1350 if (readPart(thePartBuf.data, part1, len) == -1)
1352 if (executePendingBlobReads() == -1)
1355 DBUG_PRINT(
"info", (
"part %u length old=%u new=%u",
1356 part1, (Uint32)len, off));
1357 if (theFixedDataFlag)
1358 memset(thePartBuf.data + off, theFillChar, thePartSize - off);
1359 if (updatePart(thePartBuf.data, part1, off) == -1)
1363 if (deletePartsThrottled(0, getPartCount()) == -1)
1367 theHeadInlineUpdateFlag =
true;
1368 if (thePos > length)
1377 DBUG_ENTER(
"NdbBlob::getPos");
1378 if (theNullFlag == -1) {
1379 setErrorCode(NdbBlobImpl::ErrState);
1389 DBUG_ENTER(
"NdbBlob::setPos");
1390 DBUG_PRINT(
"info", (
"this=%p pos=%llu",
this, pos));
1391 if (theNullFlag == -1) {
1392 setErrorCode(NdbBlobImpl::ErrState);
1395 if (pos > theLength) {
1396 setErrorCode(NdbBlobImpl::ErrSeek);
1408 DBUG_ENTER(
"NdbBlob::readData");
1409 if (unlikely(theState != Active)) {
1410 setErrorCode(NdbBlobImpl::ErrState);
1413 char* buf =
static_cast<char*
>(data);
1414 int ret = readDataPrivate(buf, bytes);
1419 NdbBlob::readDataPrivate(
char* buf, Uint32& bytes)
1421 DBUG_ENTER(
"NdbBlob::readDataPrivate");
1422 DBUG_PRINT(
"info", (
"bytes=%u thePos=%u theLength=%u",
1423 bytes, (Uint32)thePos, (Uint32)theLength));
1424 assert(thePos <= theLength);
1425 Uint64 pos = thePos;
1426 if (bytes > theLength - pos)
1427 bytes = Uint32(theLength - pos);
1431 if (pos < theInlineSize) {
1432 Uint32 n = theInlineSize - Uint32(pos);
1435 memcpy(buf, theInlineData + pos, n);
1441 if (unlikely(len > 0 && thePartSize == 0)) {
1442 setErrorCode(NdbBlobImpl::ErrSeek);
1446 assert(pos >= theInlineSize);
1447 Uint32 off = (pos - theInlineSize) % thePartSize;
1450 DBUG_PRINT(
"info", (
"partial first block pos=%llu len=%u", pos, len));
1451 Uint32 part = getPartNumber(pos);
1453 if (readPart(thePartBuf.data, part, sz) == -1)
1456 if (executePendingBlobReads() == -1)
1459 Uint32 n = sz - off;
1462 memcpy(buf, thePartBuf.data + off, n);
1469 assert((pos - theInlineSize) % thePartSize == 0);
1471 if (len >= thePartSize) {
1472 Uint32 part = getPartNumber(pos);
1473 Uint32 count = len / thePartSize;
1477 Uint32 partsThisTrip = count;
1478 if (theEventBlobVersion == -1)
1481 const Uint32 remainingQuota =
1482 theNdbCon->maxPendingBlobReadBytes -
1483 MIN(theNdbCon->maxPendingBlobReadBytes, theNdbCon->pendingBlobReadBytes);
1484 const Uint32 maxPartsThisTrip = MAX(remainingQuota / thePartSize, 1);
1485 partsThisTrip= MIN(count, maxPartsThisTrip);
1488 if (readParts(buf, part, partsThisTrip) == -1)
1490 Uint32 n = thePartSize * partsThisTrip;
1495 part += partsThisTrip;
1496 count -= partsThisTrip;
1500 if (executePendingBlobReads() == -1)
1503 }
while (count != 0);
1508 DBUG_PRINT(
"info", (
"partial last block pos=%llu len=%u", pos, len));
1509 assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
1510 Uint32 part = getPartNumber(pos);
1512 if (readPart(thePartBuf.data, part, sz) == -1)
1515 if (executePendingBlobReads() == -1)
1518 memcpy(buf, thePartBuf.data, len);
1526 assert(thePos <= theLength);
1533 DBUG_ENTER(
"NdbBlob::writeData");
1534 if (unlikely(isReadOnlyOp())) {
1535 setErrorCode(NdbBlobImpl::ErrCompat);
1538 if (unlikely(theState != Active)) {
1539 setErrorCode(NdbBlobImpl::ErrState);
1542 const char* buf =
static_cast<const char*
>(data);
1543 int ret = writeDataPrivate(buf, bytes);
1548 NdbBlob::writeDataPrivate(
const char* buf, Uint32 bytes)
1550 DBUG_ENTER(
"NdbBlob::writeDataPrivate");
1551 DBUG_PRINT(
"info", (
"pos=%llu bytes=%u", thePos, bytes));
1552 assert(thePos <= theLength);
1553 Uint64 pos = thePos;
1557 theNullFlag =
false;
1558 theHeadInlineUpdateFlag =
true;
1562 if (pos < theInlineSize) {
1563 Uint32 n = theInlineSize - Uint32(pos);
1566 memcpy(theInlineData + pos, buf, n);
1567 theHeadInlineUpdateFlag =
true;
1573 if (unlikely(len > 0 && thePartSize == 0)) {
1574 setErrorCode(NdbBlobImpl::ErrSeek);
1578 assert(pos >= theInlineSize);
1579 Uint32 off = (pos - theInlineSize) % thePartSize;
1582 DBUG_PRINT(
"info", (
"partial first block pos=%llu len=%u", pos, len));
1584 if (executePendingBlobWrites() == -1)
1586 Uint32 part = getPartNumber(pos);
1588 if (readPart(thePartBuf.data, part, sz) == -1)
1591 if (executePendingBlobReads() == -1)
1593 DBUG_PRINT(
"info", (
"part len=%u", (Uint32)sz));
1595 Uint32 n = thePartSize - off;
1599 if (pos + n > theLength) {
1603 memcpy(thePartBuf.data + off, buf, n);
1604 if (updatePart(thePartBuf.data, part, newsz) == -1)
1612 assert((pos - theInlineSize) % thePartSize == 0);
1614 if (len >= thePartSize) {
1615 Uint32 part = getPartNumber(pos);
1616 Uint32 count = len / thePartSize;
1617 for (
unsigned i = 0; i < count; i++) {
1618 if (part + i < getPartCount()) {
1619 if (updateParts(buf, part + i, 1) == -1)
1622 if (insertParts(buf, part + i, 1) == -1)
1625 Uint32 n = thePartSize;
1629 if (theNdbCon->pendingBlobWriteBytes >
1630 theNdbCon->maxPendingBlobWriteBytes)
1633 if (executePendingBlobWrites() == -1)
1643 DBUG_PRINT(
"info", (
"partial last block pos=%llu len=%u", pos, len));
1644 assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
1645 Uint32 part = getPartNumber(pos);
1646 if (theLength > pos + len) {
1648 if (executePendingBlobWrites() == -1)
1651 if (readPart(thePartBuf.data, part, sz) == -1)
1654 if (executePendingBlobReads() == -1)
1656 memcpy(thePartBuf.data, buf, len);
1658 if (updatePart(thePartBuf.data, part, sz) == -1)
1661 memcpy(thePartBuf.data, buf, len);
1662 if (theFixedDataFlag) {
1663 memset(thePartBuf.data + len, theFillChar, thePartSize - len);
1666 if (part < getPartCount()) {
1667 if (updatePart(thePartBuf.data, part, sz) == -1)
1670 if (insertPart(thePartBuf.data, part, sz) == -1)
1680 if (theLength < pos) {
1682 theHeadInlineUpdateFlag =
true;
1685 assert(thePos <= theLength);
1702 NdbBlob::readParts(
char* buf, Uint32 part, Uint32 count)
1704 DBUG_ENTER(
"NdbBlob::readParts");
1705 DBUG_PRINT(
"info", (
"part=%u count=%u", part, count));
1706 if (theEventBlobVersion == -1) {
1707 if (readTableParts(buf, part, count) == -1)
1710 if (readEventParts(buf, part, count) == -1)
1717 NdbBlob::readPart(
char* buf, Uint32 part, Uint16& len)
1719 DBUG_ENTER(
"NdbBlob::readPart");
1720 DBUG_PRINT(
"info", (
"part=%u", part));
1721 if (theEventBlobVersion == -1) {
1722 if (readTablePart(buf, part, len) == -1)
1725 if (readEventPart(buf, part, len) == -1)
1728 DBUG_PRINT(
"info", (
"part=%u len=%u", part, (Uint32)len));
1733 NdbBlob::readTableParts(
char* buf, Uint32 part, Uint32 count)
1735 DBUG_ENTER(
"NdbBlob::readTableParts");
1739 if (readTablePart(buf + n * thePartSize, part + n, thePartLen) == -1)
1747 NdbBlob::readTablePart(
char* buf, Uint32 part, Uint16& len)
1749 DBUG_ENTER(
"NdbBlob::readTablePart");
1758 setPartKeyValue(tOp, part) == -1 ||
1759 getPartDataValue(tOp, buf, &len) == -1) {
1767 theNdbCon->pendingBlobReadBytes += len;
1772 NdbBlob::readEventParts(
char* buf, Uint32 part, Uint32 count)
1774 DBUG_ENTER(
"NdbBlob::readEventParts");
1776 if (theEventOp->readBlobParts(buf,
this, part, count, (Uint16*)0) == -1) {
1777 setErrorCode(theEventOp);
1784 NdbBlob::readEventPart(
char* buf, Uint32 part, Uint16& len)
1786 DBUG_ENTER(
"NdbBlob::readEventPart");
1787 if (theEventOp->readBlobParts(buf,
this, part, 1, &len) == -1) {
1788 setErrorCode(theEventOp);
1795 NdbBlob::insertParts(
const char* buf, Uint32 part, Uint32 count)
1797 DBUG_ENTER(
"NdbBlob::insertParts");
1798 DBUG_PRINT(
"info", (
"part=%u count=%u", part, count));
1802 thePartLen = thePartSize;
1803 if (insertPart(buf + n * thePartSize, part + n, thePartLen) == -1)
1811 NdbBlob::insertPart(
const char* buf, Uint32 part,
const Uint16& len)
1813 DBUG_ENTER(
"NdbBlob::insertPart");
1814 DBUG_PRINT(
"info", (
"part=%u len=%u", part, (Uint32)len));
1818 setPartKeyValue(tOp, part) == -1 ||
1819 setPartPkidValue(tOp, theHead.pkid) == -1 ||
1820 setPartDataValue(tOp, buf, len) == -1) {
1828 theNdbCon->pendingBlobWriteBytes += len;
1833 NdbBlob::updateParts(
const char* buf, Uint32 part, Uint32 count)
1835 DBUG_ENTER(
"NdbBlob::updateParts");
1836 DBUG_PRINT(
"info", (
"part=%u count=%u", part, count));
1840 thePartLen = thePartSize;
1841 if (updatePart(buf + n * thePartSize, part + n, thePartLen) == -1)
1849 NdbBlob::updatePart(
const char* buf, Uint32 part,
const Uint16& len)
1851 DBUG_ENTER(
"NdbBlob::updatePart");
1852 DBUG_PRINT(
"info", (
"part=%u len=%u", part, (Uint32)len));
1856 setPartKeyValue(tOp, part) == -1 ||
1857 setPartPkidValue(tOp, theHead.pkid) == -1 ||
1858 setPartDataValue(tOp, buf, len) == -1) {
1866 theNdbCon->pendingBlobWriteBytes += len;
1871 NdbBlob::deletePartsThrottled(Uint32 part, Uint32 count)
1873 DBUG_ENTER(
"NdbBlob::deletePartsThrottled");
1874 DBUG_PRINT(
"info", (
"part=%u count=%u maxPendingBlobWriteBytes=%u",
1875 part, count, theNdbCon->maxPendingBlobWriteBytes));
1882 const Uint32 remainingQuota =
1883 theNdbCon->maxPendingBlobWriteBytes -
1884 MIN(theNdbCon->maxPendingBlobWriteBytes, theNdbCon->pendingBlobWriteBytes);
1885 const Uint32 maxPartsThisTrip = MAX(remainingQuota / thePartSize, 1);
1886 const Uint32 partsThisTrip= MIN(count, maxPartsThisTrip);
1888 int rc = deleteParts(part, partsThisTrip);
1892 part+= partsThisTrip;
1893 count-= partsThisTrip;
1898 if (executePendingBlobWrites() == -1)
1901 }
while (count != 0);
1908 NdbBlob::deleteParts(Uint32 part, Uint32 count)
1910 DBUG_ENTER(
"NdbBlob::deleteParts");
1911 DBUG_PRINT(
"info", (
"part=%u count=%u", part, count));
1917 setPartKeyValue(tOp, part + n) == -1) {
1926 theNdbCon->pendingBlobWriteBytes += thePartSize;
1936 NdbBlob::deletePartsUnknown(Uint32 part)
1938 DBUG_ENTER(
"NdbBlob::deletePartsUnknown");
1939 DBUG_PRINT(
"info", (
"part=%u count=all", part));
1940 if (thePartSize == 0)
1942 static const unsigned maxbat = 256;
1943 static const unsigned minbat = 1;
1944 unsigned bat = minbat;
1951 Uint32 remainingQuota = theNdbCon->maxPendingBlobWriteBytes -
1952 MIN(theNdbCon->maxPendingBlobWriteBytes, theNdbCon->pendingBlobWriteBytes);
1953 Uint32 deleteQuota = MAX(remainingQuota / thePartSize, 1);
1954 bat = MIN(deleteQuota, bat);
1960 setPartKeyValue(tOp, part + count + n) == -1) {
1965 tOp->m_noErrorPropagation =
true;
1966 theNdbCon->pendingBlobWriteBytes += thePartSize;
1969 DBUG_PRINT(
"info", (
"bat=%u", bat));
1975 if (tOp->theError.
code != 0) {
1976 if (tOp->theError.
code != 626) {
1981 DBUG_PRINT(
"info", (
"count=%u", count));
1996 NdbBlob::executePendingBlobReads()
1998 DBUG_ENTER(
"NdbBlob::executePendingBlobReads");
2000 if (thePendingBlobOps & flags) {
2003 thePendingBlobOps = 0;
2004 theNdbCon->thePendingBlobOps = 0;
2010 NdbBlob::executePendingBlobWrites()
2012 DBUG_ENTER(
"NdbBlob::executePendingBlobWrites");
2014 if (thePendingBlobOps & flags) {
2017 thePendingBlobOps = 0;
2018 theNdbCon->thePendingBlobOps = 0;
2026 NdbBlob::invokeActiveHook()
2028 DBUG_ENTER(
"NdbBlob::invokeActiveHook");
2029 assert(theState == Active && theActiveHook != NULL);
2030 int ret = (*theActiveHook)(
this, theActiveHookArg);
2049 DBUG_ENTER(
"NdbBlob::atPrepare");
2050 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p version=%d fixed data=%d",
2051 this, theNdbOp, theNdbCon,
2052 theBlobVersion, theFixedDataFlag));
2053 if (atPrepareCommon(aCon, anOp, aColumn) == -1)
2060 theNdbRecordFlag= isScanOp();
2063 bool supportedOp =
false;
2067 Uint32* data = (Uint32*)thePackKeyBuf.data;
2068 Uint32
size = theTable->m_keyLenInWords;
2069 if (theNdbOp->getKeyFromTCREQ(data,
size) == -1) {
2070 setErrorCode(NdbBlobImpl::ErrUsage);
2073 thePackKeyBuf.size = 4 *
size;
2074 thePackKeyBuf.zerorest();
2075 if (unpackKeyValue(theTable, theKeyBuf) == -1)
2080 Uint32* data = (Uint32*)thePackKeyBuf.data;
2081 Uint32
size = theAccessTable->m_keyLenInWords;
2082 if (theNdbOp->getKeyFromTCREQ(data,
size) == -1) {
2083 setErrorCode(NdbBlobImpl::ErrUsage);
2086 thePackKeyBuf.size = 4 *
size;
2087 thePackKeyBuf.zerorest();
2088 if (unpackKeyValue(theAccessTable, theAccessKeyBuf) == -1)
2096 if (! supportedOp) {
2097 setErrorCode(NdbBlobImpl::ErrUsage);
2112 assert(theState == Idle);
2115 theNdb = anOp->theNdb;
2118 theTable = anOp->m_currentTable;
2119 theAccessTable = anOp->m_accessTable;
2120 theColumn = aColumn;
2122 if (prepareColumn() == -1)
2125 NdbDictionary::Object::UserDefined);
2130 if (userDefinedPartitioning &&
2131 theNdbOp->theDistrKeyIndicator_) {
2132 thePartitionId = theNdbOp->getPartitionId();
2133 DBUG_PRINT(
"info", (
"op partition id: %u", thePartitionId));
2136 theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2);
2137 theHeadInlineCopyBuf.alloc(getHeadInlineSize());
2145 assert(! theNdbOp->m_blob_lock_upgraded);
2147 theNdbOp->m_blob_lock_upgraded =
true;
2151 assert(theNdbOp->theLockHandle == NULL);
2156 if (likely(theNdb->getMinDbNodeVersion() >=
2157 NDBD_UNLOCK_OP_SUPPORTED))
2163 if (theNdbOp->m_attribute_record)
2166 int rc = theNdbOp->prepareGetLockHandleNdbRecord();
2169 setErrorCode(rc,
true);
2176 int rc = theNdbOp->getLockHandleImpl();
2179 setErrorCode(rc,
true);
2187 if (getHeadInlineValue(theNdbOp) == -1)
2199 theHeadInlineUpdateFlag =
true;
2209 if (sop->m_scanUsingOldApi)
2217 assert(! theNdbOp->m_blob_lock_upgraded);
2219 theNdbOp->m_blob_lock_upgraded =
true;
2231 assert(! theNdbOp->m_blob_lock_upgraded);
2233 theNdbOp->m_blob_lock_upgraded =
true;
2238 if (getHeadInlineValue(sop) == -1)
2249 const NdbRecord *key_record,
const char *key_row)
// atPrepareNdbRecord (name per the DBUG_ENTER tag): prepare step for an
// operation whose key is supplied as an NdbRecord row. It runs the shared
// prepare step and then copies the key out of key_row via copyKeyFromRow().
2252 DBUG_ENTER(
"NdbBlob::atPrepareNdbRecord");
2253 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p",
this, anOp, aCon));
// Flag the handle as NdbRecord-based before the shared prepare step runs.
2255 theNdbRecordFlag=
true;
2256 if (atPrepareCommon(aCon, anOp, aColumn) == -1)
// One path fills the packed key buffer and the table key buffer. The
// condition selecting this path is not part of this extract.
2263 res= copyKeyFromRow(key_record, key_row, thePackKeyBuf, theKeyBuf);
// If the operation has a lock handle, count this blob as open on it.
2265 if (theNdbOp->theLockHandle)
2271 theNdbOp->theLockHandle->m_openBlobCount++;
// Index operations fill the access key buffer instead of the table key buffer.
2274 else if (isIndexOp())
2275 res= copyKeyFromRow(key_record, key_row, thePackKeyBuf, theAccessKeyBuf);
2285 const char *keyinfo, Uint32 keyinfo_bytes)
// atPrepareNdbRecordTakeover (name per the DBUG_ENTER tag): prepare step where
// the packed key arrives as raw bytes (keyinfo, keyinfo_bytes) rather than as
// an NdbRecord row.
2287 DBUG_ENTER(
"NdbBlob::atPrepareNdbRecordTakeover");
2288 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p",
this, anOp, aCon));
// Flag the handle as NdbRecord-based before the shared prepare step runs.
2290 theNdbRecordFlag=
true;
2291 if (atPrepareCommon(aCon, anOp, aColumn) == -1)
// Size check against the packed key buffer capacity, made before the memcpy
// below. How an oversized key is handled is not part of this extract.
2297 if (keyinfo_bytes > thePackKeyBuf.maxsize)
// Copy the packed key, record its size, call zerorest() on the buffer, then
// unpack into theKeyBuf; -1 from unpackKeyValue() signals failure.
2302 memcpy(thePackKeyBuf.data, keyinfo, keyinfo_bytes);
2303 thePackKeyBuf.size= keyinfo_bytes;
2304 thePackKeyBuf.zerorest();
2305 if (unpackKeyValue(theTable, theKeyBuf) == -1)
// If the operation has a lock handle, count this blob as open on it.
2308 if (theNdbOp->theLockHandle)
2314 theNdbOp->theLockHandle->m_openBlobCount++;
// atPrepareNdbRecordScan (name per the DBUG_ENTER tag): the visible part sets
// the NdbRecord flag and delegates to the shared prepare step.
2325 DBUG_ENTER(
"NdbBlob::atPrepareNdbRecordScan");
2326 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p",
this, anOp, aCon));
2328 theNdbRecordFlag=
true;
// -1 from atPrepareCommon() signals failure.
2329 if (atPrepareCommon(aCon, anOp, aColumn) == -1)
// Event variant of atPrepare (see the DBUG_ENTER tag). It binds the handle to
// an event operation (anOp) and an optional blob event operation (aBlobOp),
// then registers the attribute reads needed to receive blob data via events.
2340 DBUG_ENTER(
"NdbBlob::atPrepare [event]");
// The handle must still be Idle; only versions 0 and 1 are accepted.
2341 assert(theState == Idle);
2343 assert(version == 0 || version == 1);
2344 theEventBlobVersion = version;
2346 theNdb = anOp->m_ndb;
2348 theBlobEventOp = aBlobOp;
// For events the access table is the event's table itself.
2349 theTable = anOp->m_eventImpl->m_tableImpl;
2350 theAccessTable = theTable;
2351 theColumn = aColumn;
// prepareColumn() reports failure with -1.
2353 if (prepareColumn() == -1)
2355 DBUG_PRINT(
"info", (
"this=%p main op=%p blob op=%p version=%d fixed data=%d",
2356 this, anOp, aBlobOp,
2357 theBlobVersion, theFixedDataFlag));
// A blob event operation must be present exactly when a blob table exists.
2359 assert((theBlobEventOp == NULL) == (theBlobTable == NULL));
// Event data buffer sized as theVarsizeBytes + thePartSize.
2361 theBlobEventDataBuf.alloc(theVarsizeBytes + thePartSize);
// Register the head+inline attribute on the main event operation; a NULL
// result is reported through setErrorCode(theEventOp).
2363 theHeadInlineRecAttr = theEventOp->getValue(aColumn, theHeadInlineBuf.data, version);
2364 if (theHeadInlineRecAttr == NULL) {
2365 setErrorCode(theEventOp);
// The remaining registrations apply only when a blob event operation exists.
2369 if (theBlobEventOp != NULL) {
// V1 blob table: register the PK, DIST, PART and DATA columns, each into its
// own destination buffer.
2373 if (unlikely(theBlobVersion == NDB_BLOB_V1)) {
2374 bc = theBlobTable->getColumn(theBtColumnNo[BtColumnPk]);
2375 buf = thePackKeyBuf.data;
2376 theBlobEventPkRecAttr = theBlobEventOp->getValue(bc, buf, version);
// A stripe size is expected to be set on this path.
2378 assert(theStripeSize != 0);
2379 bc = theBlobTable->getColumn(theBtColumnNo[BtColumnDist]);
2380 buf = (
char*)&theBlobEventDistValue;
2381 theBlobEventDistRecAttr = theBlobEventOp->getValue(bc, buf, version);
2383 bc = theBlobTable->getColumn(theBtColumnNo[BtColumnPart]);
2384 buf = (
char*)&theBlobEventPartValue;
2385 theBlobEventPartRecAttr = theBlobEventOp->getValue(bc, buf, version);
2387 bc = theBlobTable->getColumn(theBtColumnNo[BtColumnData]);
2388 buf = theBlobEventDataBuf.data;
2389 theBlobEventDataRecAttr = theBlobEventOp->getValue(bc, buf, version);
2391 theBlobEventPkRecAttr == NULL ||
2392 theBlobEventDistRecAttr == NULL ||
2393 theBlobEventPartRecAttr == NULL ||
2394 theBlobEventDataRecAttr == NULL
2396 setErrorCode(theBlobEventOp);
// NOTE(review): the lines from here on appear to be the non-V1 branch (the
// `else` line is not part of this extract). They walk the columns until
// noOfKeys primary key columns of the blob table have been registered.
2400 const uint columns = theTable->m_columns.size();
2404 for (i = 0; n < noOfKeys; i++) {
2405 assert(i < columns);
2409 bc = theBlobTable->m_columns[
n];
2410 assert(bc != NULL && bc->m_pk);
// The key columns are registered with a null destination pointer.
2412 ra = theBlobEventOp->getValue(bc, (
char*)0, version);
2413 if (unlikely(ra == NULL)) {
2414 setErrorCode(theBlobEventOp);
// The DIST column is registered only when a stripe size is set.
2420 if (theStripeSize != 0) {
2421 bc = theBlobTable->getColumn(theBtColumnNo[BtColumnDist]);
2422 buf = (
char*)&theBlobEventDistValue;
2423 theBlobEventDistRecAttr = theBlobEventOp->getValue(bc, buf, version);
2426 bc = theBlobTable->getColumn(theBtColumnNo[BtColumnPart]);
2427 buf = (
char*)&theBlobEventPartValue;
2428 theBlobEventPartRecAttr = theBlobEventOp->getValue(bc, buf, version);
2430 bc = theBlobTable->getColumn(theBtColumnNo[BtColumnPkid]);
2431 buf = (
char*)&theBlobEventPkidValue;
2432 theBlobEventPkidRecAttr = theBlobEventOp->getValue(bc, buf, version);
2434 bc = theBlobTable->getColumn(theBtColumnNo[BtColumnData]);
2435 buf = theBlobEventDataBuf.data;
2436 theBlobEventDataRecAttr = theBlobEventOp->getValue(bc, buf, version);
2438 (theStripeSize != 0 && theBlobEventDistRecAttr == NULL) ||
2439 theBlobEventPartRecAttr == NULL ||
2440 theBlobEventPkidRecAttr == NULL ||
2441 theBlobEventDataRecAttr == NULL
2443 setErrorCode(theBlobEventOp);
// Derives the blob layout for theColumn: blob version, head size, the
// fixed-data flag, the var-size byte count and the blob table column
// positions. It then allocates the key, head+inline and part buffers.
// Errors visible here are reported via setErrorCode() with ErrUsage or ErrTable.
2453 NdbBlob::prepareColumn()
2455 DBUG_ENTER(
"prepareColumn");
2458 theBlobVersion = theColumn->getBlobVersion();
// V1: data is flagged as fixed and theVarsizeBytes is 0.
// NDB_BLOB_V1_HEAD_SIZE << 2 is presumably a words-to-bytes conversion -- confirm.
2463 if (unlikely(theBlobVersion == NDB_BLOB_V1)) {
2464 theFixedDataFlag =
true;
2465 theHeadSize = (NDB_BLOB_V1_HEAD_SIZE << 2);
2466 theVarsizeBytes = 0;
2467 switch (theColumn->
getType()) {
2477 setErrorCode(NdbBlobImpl::ErrUsage);
// In V1 a non-zero part size implies a non-zero stripe size.
2481 assert(!(thePartSize != 0 && theStripeSize == 0));
// V1 blob table column positions: PK, DIST, PART, DATA.
2482 theBtColumnNo[BtColumnPk] = 0;
2483 theBtColumnNo[BtColumnDist] = 1;
2484 theBtColumnNo[BtColumnPart] = 2;
2485 theBtColumnNo[BtColumnData] = 3;
2486 }
else if (theBlobVersion == NDB_BLOB_V2) {
// V2: data is flagged as fixed unless the column's storage type is
// NDB_STORAGETYPE_MEMORY; theVarsizeBytes is 2.
2487 const Uint32 storageType = (Uint32)theColumn->getStorageType();
2488 theFixedDataFlag = (storageType != NDB_STORAGETYPE_MEMORY);
2489 theHeadSize = (NDB_BLOB_V2_HEAD_SIZE << 2);
2490 theVarsizeBytes = 2;
2491 switch (theColumn->
getType()) {
2493 if (theFixedDataFlag) {
2500 if (theFixedDataFlag) {
2507 setErrorCode(NdbBlobImpl::ErrUsage);
// V2 blob table column positions are relative to `off` (its computation is
// not part of this extract); DIST is assigned only when a stripe size is set.
2511 if (theStripeSize != 0) {
2512 theBtColumnNo[BtColumnDist] = off;
2515 theBtColumnNo[BtColumnPart] = off + 0;
2516 theBtColumnNo[BtColumnPkid] = off + 1;
2517 theBtColumnNo[BtColumnData] = off + 2;
// NOTE(review): presumably the fallback for an unrecognised blob version -- confirm.
2519 setErrorCode(NdbBlobImpl::ErrUsage);
// The column's attribute size times array size must equal the head+inline size.
2523 assert(theColumn->m_attrSize * theColumn->m_arraySize == getHeadInlineSize());
// With a non-zero part size the column must reference a blob table that has
// a DATA column. A further condition is not part of this extract.
// Failure is reported as ErrTable.
2524 if (thePartSize > 0) {
2527 if ((bt = theColumn->m_blobTable) == NULL ||
2528 (bc = bt->getColumn(theBtColumnNo[BtColumnData])) == NULL ||
2531 setErrorCode(NdbBlobImpl::ErrTable);
2535 theBlobTable = &NdbTableImpl::getImpl(*bt);
// Buffers: key lengths are in words (<< 2 gives bytes). The packed key buffer
// is sized for the larger of the table key and the access key.
2538 theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
2539 thePackKeyBuf.alloc(MAX(theTable->m_keyLenInWords, theAccessTable->m_keyLenInWords) << 2);
2540 theHeadInlineBuf.alloc(getHeadInlineSize());
// Inline data starts right after the head inside the head+inline buffer.
2541 theInlineData = theHeadInlineBuf.data + theHeadSize;
2543 thePartBuf.alloc(thePartSize);
// preExecute (name per the DBUG_ENTER tag). Judging by the trace messages,
// this step adds helper operations (tOp) that read or update the blob
// head+inline value, or read the table key for index operations, depending
// on the type of the user's operation.
2578 DBUG_ENTER(
"NdbBlob::preExecute");
2579 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p",
this, theNdbOp, theNdbCon));
2580 DBUG_PRINT(
"info", (
"optype=%d theGetSetBytes=%d theSetFlag=%d",
2581 theNdbOp->theOperationType,
// An Invalid handle is tested first (its branch body is not part of this
// extract); otherwise the handle must be Prepared.
2584 if (theState == Invalid)
2586 assert(theState == Prepared);
// Check involving a non-nullable column on insert or write; the rest of the
// condition and the action are not part of this extract.
2593 if (!theColumn->m_nullable &&
2594 (isInsertOp() || isWriteOp()) &&
2603 if (theGetFlag && theGetSetBytes > theInlineSize) {
// Insert with a value to set.
2611 if (isInsertOp() && theSetFlag) {
// Among the visible conditions: the flag can be set when the value fits
// inline, or when the non-inline remainder fits within
// maxPendingBlobWriteBytes minus pendingBlobWriteBytes (the MIN keeps that
// difference from going below zero).
2629 theSetValueInPreExecFlag =
2631 ((theGetSetBytes <= theInlineSize) ||
2632 ((theGetSetBytes - theInlineSize) <=
2633 (theNdbCon->maxPendingBlobWriteBytes -
2634 MIN(theNdbCon->maxPendingBlobWriteBytes,
2635 theNdbCon->pendingBlobWriteBytes))
2638 if (theSetValueInPreExecFlag)
2641 (
"Insert extra ops added in preExecute"));
// Bytes beyond the inline size are written via writeDataPrivate(), starting
// at position theInlineSize. Afterwards the transaction's pending write byte
// counter must have grown.
2645 if (theGetSetBytes > theInlineSize) {
2647 assert(theSetBuf != NULL);
2648 const char* buf = theSetBuf + theInlineSize;
2649 Uint32 bytes = theGetSetBytes - theInlineSize;
2650 assert(thePos == theInlineSize);
2651 Uint32 savePendingBlobWriteBytes = theNdbCon->pendingBlobWriteBytes;
2652 if (writeDataPrivate(buf, bytes) == -1)
2655 assert(theNdbCon->pendingBlobWriteBytes >
2656 savePendingBlobWriteBytes);
// When the head+inline update flag is set, tOp is given the table key and the
// head+inline value; failure of either is reported as ErrAbort.
2659 if (theHeadInlineUpdateFlag)
2664 setTableKeyValue(tOp) == -1 ||
2665 setHeadInlineValue(tOp) == -1) {
2666 setErrorCode(NdbBlobImpl::ErrAbort);
2669 setHeadPartitionId(tOp);
2671 DBUG_PRINT(
"info", (
"Insert : added op to update head+inline in preExecute"));
2677 (
"Insert waiting for Blob head insert"));
// Update, write or delete: tOp is given the table key and reads head+inline.
2687 if (isUpdateOp() || isWriteOp() || isDeleteOp()) {
2706 setTableKeyValue(tOp) == -1 ||
2707 getHeadInlineValue(tOp) == -1) {
2711 setHeadPartitionId(tOp);
// m_noErrorPropagation is set on the read op, and the op is remembered in
// theHeadInlineReadOp.
2716 tOp->m_noErrorPropagation =
true;
2718 theHeadInlineReadOp = tOp;
2733 DBUG_PRINT(
"info", (
"added op before to read head+inline"));
// Work done only by the first blob on the operation's blob list; the trace
// below describes it as reading the table key for an index operation.
2738 NdbBlob* tFirstBlob = theNdbOp->theBlobList;
2739 if (
this == tFirstBlob) {
// g_ndb_blob_ok_to_read_index_table is a file-level constant set to false, so
// the branch entered when it is true is not taken as built.
2741 if (g_ndb_blob_ok_to_read_index_table) {
2746 assert(!userDefinedPartitioning);
2751 setAccessKeyValue(tOp) == -1 ||
2752 tOp->
getValue(pkAttrId, thePackKeyBuf.data) == NULL) {
2760 setAccessKeyValue(tOp) == -1 ||
2761 getTableKeyValue(tOp) == -1) {
// Writes with user-defined partitioning also read the FRAGMENT column into
// thePartitionIdRecAttr.
2765 if (userDefinedPartitioning && isWriteOp())
2772 thePartitionIdRecAttr = tOp->getValue_impl(&NdbColumnImpl::getImpl(*NdbDictionary::Column::FRAGMENT));
2774 if (thePartitionIdRecAttr == NULL) {
2780 DBUG_PRINT(
"info", (
"Index op : added op before to read table key"));
// Update or delete (the trace refers to an index op): tOp is given the access
// key, reads head+inline and is remembered in theHeadInlineReadOp.
2782 if (isUpdateOp() || isDeleteOp()) {
2787 setAccessKeyValue(tOp) == -1 ||
2788 getHeadInlineValue(tOp) == -1) {
2792 theHeadInlineReadOp = tOp;
2797 DBUG_PRINT(
"info", (
"added index op before to read head+inline"));
// A set buffer, if present, is written starting at position 0. The statement
// executed when n exceeds theInlineSize is not part of this extract.
2810 if (theSetBuf != NULL) {
2811 Uint32 n = theGetSetBytes;
2812 if (n > theInlineSize)
2814 assert(thePos == 0);
2815 if (writeDataPrivate(theSetBuf, n) == -1)
// Handles without the NdbRecord flag set head+inline on the main operation.
2823 if (!theNdbRecordFlag)
2825 if (setHeadInlineValue(theNdbOp) == -1)
// The trace below labels this path "NdbRecord table write": tOp is given the
// table key and the head+inline value; failure is reported as ErrAbort.
2842 setTableKeyValue(tOp) == -1 ||
2843 setHeadInlineValue(tOp) == -1) {
2844 setErrorCode(NdbBlobImpl::ErrAbort);
2847 setHeadPartitionId(tOp);
2849 DBUG_PRINT(
"info", (
"NdbRecord table write : added op to update head+inline"));
// Keep a copy of the current head+inline buffer.
2856 theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf);
2859 if (theActiveHook != NULL) {
2863 DBUG_PRINT(
"info", (
"batch=%u", batch));
// postExecute (name per the DBUG_ENTER tag). For a Prepared handle it takes
// the results of the executed operations (head, key, partition id) and
// performs the remaining part reads, writes and deletes per operation type.
2900 DBUG_ENTER(
"NdbBlob::postExecute");
2901 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p anExecType=%u",
this, theNdbOp, theNdbCon, anExecType));
// Closed, Invalid and Active states are tested first (the bodies of the first
// two are not part of this extract; Active is traced as "skip active"). Past
// these tests the handle must be Prepared.
2902 if (theState == Closed)
2904 if (theState == Invalid)
2906 if (theState == Active) {
2908 DBUG_PRINT(
"info", (
"skip active"));
2911 assert(theState == Prepared);
// The first blob on the operation packs the table key itself. The copyfrom
// lines below take both key buffers from the first blob, presumably in the
// branch for every other blob (the `else` is not part of this extract).
2915 NdbBlob* tFirstBlob = theNdbOp->theBlobList;
2916 if (
this == tFirstBlob) {
2917 packKeyValue(theTable, theKeyBuf);
2920 theKeyBuf.copyfrom(tFirstBlob->theKeyBuf);
2921 thePackKeyBuf.copyfrom(tFirstBlob->thePackKeyBuf);
2922 thePackKeyBuf.zerorest();
2930 getHeadFromRecAttr();
// A pending get request needs a destination buffer whenever bytes were asked for.
2935 assert(theGetSetBytes == 0 || theGetBuf != 0);
2936 assert(theGetSetBytes <= theInlineSize ||
2938 Uint32 bytes = theGetSetBytes;
2939 if (readDataPrivate(theGetBuf, bytes) == -1)
// Insert with a value to set, when theSetValueInPreExecFlag is not set.
2943 if (isInsertOp() && theSetFlag) {
2953 if (! theSetValueInPreExecFlag)
2955 DBUG_PRINT(
"info", (
"Insert adding extra ops"));
// Test for a main operation that carries no error.
2959 if (theNdbOp->theError.
code == 0)
// Bytes beyond the inline size are written starting at position theInlineSize.
2964 if (theGetSetBytes > theInlineSize) {
2966 assert(theSetBuf != NULL);
2967 const char* buf = theSetBuf + theInlineSize;
2968 Uint32 bytes = theGetSetBytes - theInlineSize;
2969 assert(thePos == theInlineSize);
2970 if (writeDataPrivate(buf, bytes) == -1)
// When the head+inline update flag is set, tOp is given the table key and the
// head+inline value; failure is reported as ErrAbort.
2974 if (theHeadInlineUpdateFlag)
2979 setTableKeyValue(tOp) == -1 ||
2980 setHeadInlineValue(tOp) == -1) {
2981 setErrorCode(NdbBlobImpl::ErrAbort);
2984 setHeadPartitionId(tOp);
2986 DBUG_PRINT(
"info", (
"Insert : added op to update head+inline"));
// Head re-read followed by a write of the whole set buffer from position 0.
// The condition selecting this path is not part of this extract.
2995 getHeadFromRecAttr();
2998 if (theSetBuf != NULL) {
3001 assert(thePos == 0);
3002 if (writeDataPrivate(theSetBuf, theGetSetBytes) == -1)
// Write on a table operation: the outcome of the earlier head+inline read
// decides between "tuple found" and "tuple not found".
3010 if (isWriteOp() && isTableOp()) {
// Read succeeded: save null flag, length and position, reload the head from
// the read, then put back the saved head+inline copy, null flag and length.
3012 if (theHeadInlineReadOp->theError.
code == 0) {
3013 int tNullFlag = theNullFlag;
3014 Uint64 tLength = theLength;
3015 Uint64 tPos = thePos;
3016 getHeadFromRecAttr();
3017 DBUG_PRINT(
"info", (
"tuple found"));
3021 theHeadInlineBuf.copyfrom(theHeadInlineCopyBuf);
3022 theNullFlag = tNullFlag;
3023 theLength = tLength;
// Read failed: any code other than 626 is reported through setErrorCode();
// code 626 is traced as "tuple not found".
3026 if (theHeadInlineReadOp->theError.
code != 626) {
3027 setErrorCode(theHeadInlineReadOp);
3030 DBUG_PRINT(
"info", (
"tuple not found"));
3036 if (deletePartsUnknown(0) == -1)
// Bytes beyond the inline size are then written starting at position theInlineSize.
3039 if (theSetFlag && theGetSetBytes > theInlineSize) {
3040 assert(theSetBuf != NULL);
3041 const char* buf = theSetBuf + theInlineSize;
3042 Uint32 bytes = theGetSetBytes - theInlineSize;
3043 assert(thePos == theInlineSize);
3044 if (writeDataPrivate(buf, bytes) == -1)
// Write on an index operation.
3048 if (isWriteOp() && isIndexOp()) {
// Partition id handling under user-defined partitioning. With a non-NULL
// thePartitionIdRecAttr the blob must be the head of the blob list and reads
// the id from that attribute.
3050 if (userDefinedPartitioning)
3058 if (thePartitionIdRecAttr != NULL)
3060 assert(
this == theNdbOp->theBlobList );
3061 Uint32
id= thePartitionIdRecAttr->
u_32_value();
3062 assert(
id != noPartitionId() );
3063 DBUG_PRINT(
"info", (
"Index write, setting partition id to %d",
id));
// Otherwise the blob must not be the list head; it copies the partition id
// from the head of the list.
3069 assert( theNdbOp->theBlobList );
3070 assert(
this != theNdbOp->theBlobList );
3072 thePartitionId= theNdbOp->theBlobList->thePartitionId;
3074 assert(thePartitionId != noPartitionId());
3077 if (deletePartsUnknown(0) == -1)
3079 if (theSetFlag && theGetSetBytes > theInlineSize) {
3080 assert(theSetBuf != NULL);
3081 const char* buf = theSetBuf + theInlineSize;
3082 Uint32 bytes = theGetSetBytes - theInlineSize;
3083 assert(thePos == theInlineSize);
3084 if (writeDataPrivate(buf, bytes) == -1)
// Reload the head, then delete parts 0 .. getPartCount() through the
// throttled variant. The enclosing condition is not part of this extract.
3090 getHeadFromRecAttr();
3091 if (deletePartsThrottled(0, getPartCount()) == -1)
// Invoke the user's active hook if one is set; -1 signals failure.
3096 if (theActiveHook != NULL) {
3097 if (invokeActiveHook() == -1)
// Final head+inline update op; failure is reported as ErrAbort.
3105 setTableKeyValue(tOp) == -1 ||
3106 setHeadInlineValue(tOp) == -1) {
3107 setErrorCode(NdbBlobImpl::ErrAbort);
3110 setHeadPartitionId(tOp);
3113 DBUG_PRINT(
"info", (
"added op to update head+inline"));
// For insert, update and write operations with the head+inline update flag
// set, gives tOp the table key and the head+inline value. The handle is
// expected to be Active by the time that happens.
3124 NdbBlob::preCommit()
3126 DBUG_ENTER(
"NdbBlob::preCommit");
3127 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p",
this, theNdbOp, theNdbCon));
// Closed and Invalid handles are tested first, followed by a special case for
// a handle still in Prepared state. The bodies and the rest of that condition
// are not part of this extract.
3128 if (theState == Closed)
3130 if (theState == Invalid)
3132 if (unlikely((theState == Prepared) &&
3140 assert(theState == Active);
3142 if (isInsertOp() || isUpdateOp() || isWriteOp()) {
3143 if (theHeadInlineUpdateFlag) {
// Failure to set the key or the head+inline value is reported as ErrAbort.
3148 setTableKeyValue(tOp) == -1 ||
3149 setHeadInlineValue(tOp) == -1) {
3150 setErrorCode(NdbBlobImpl::ErrAbort);
3153 setHeadPartitionId(tOp);
3156 DBUG_PRINT(
"info", (
"added op to update head+inline"));
// Fetches the packed table key through the scan operation's
// getKeyFromKEYINFO20(), unpacks it into theKeyBuf and then continues in
// atNextResultCommon().
3167 NdbBlob::atNextResult()
3169 DBUG_ENTER(
"NdbBlob::atNextResult");
3170 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p",
this, theNdbOp, theNdbCon));
// Test for an Invalid handle; its branch body is not part of this extract.
3171 if (theState == Invalid)
// The key is fetched as 32-bit words; a failed fetch is reported as ErrUsage.
3176 Uint32* data = (Uint32*)thePackKeyBuf.data;
3177 unsigned size = theTable->m_keyLenInWords;
3178 if (tScanOp->getKeyFromKEYINFO20(data,
size) == -1) {
3179 setErrorCode(NdbBlobImpl::ErrUsage);
// Record the key size in bytes (4 per word), call zerorest() on the buffer,
// then unpack; -1 signals failure.
3182 thePackKeyBuf.size = 4 *
size;
3183 thePackKeyBuf.zerorest();
3184 if (unpackKeyValue(theTable, theKeyBuf) == -1)
3188 DBUG_RETURN(atNextResultCommon());
// Variant where the caller supplies the packed key (keyinfo, keyinfo_bytes).
// The key is copied into thePackKeyBuf and unpacked into theKeyBuf, and the
// result of atNextResultCommon() is returned.
// NOTE(review): no check of keyinfo_bytes against the buffer capacity is
// visible before the memcpy here -- confirm one exists in the full file.
3196 NdbBlob::atNextResultNdbRecord(
const char *keyinfo, Uint32 keyinfo_bytes)
3198 DBUG_ENTER(
"NdbBlob::atNextResultNdbRecord");
3199 DBUG_PRINT(
"info", (
"this=%p op=%p con=%p keyinfo_bytes=%lu",
3200 this, theNdbOp, theNdbCon,
3201 (
unsigned long)keyinfo_bytes));
// Test for an Invalid handle; its branch body is not part of this extract.
3202 if (theState == Invalid)
3206 memcpy(thePackKeyBuf.data, keyinfo, keyinfo_bytes);
3207 thePackKeyBuf.size= keyinfo_bytes;
3208 thePackKeyBuf.zerorest();
// -1 from unpackKeyValue() signals failure.
3209 if (unpackKeyValue(theTable, theKeyBuf) == -1)
3212 DBUG_RETURN(atNextResultCommon());
// Shared tail of the atNextResult variants: resets the partition id, reloads
// the head, serves a pending read request and invokes the active hook if one
// is set.
3217 NdbBlob::atNextResultCommon()
3219 DBUG_ENTER(
"NdbBlob::atNextResultCommon");
// The partition id is reset to the noPartitionId() value for each row.
3221 thePartitionId = noPartitionId();
3222 getHeadFromRecAttr();
// A pending get request needs a destination buffer whenever bytes were asked for.
3226 assert(theGetSetBytes == 0 || theGetBuf != 0);
3227 Uint32 bytes = theGetSetBytes;
// -1 from readDataPrivate() signals failure.
3228 if (readDataPrivate(theGetBuf, bytes) == -1)
3233 if (theActiveHook != NULL) {
3234 if (invokeActiveHook() == -1)
// Per-event step for a blob bound to an event operation. The operation type
// of the current event is decoded from its requestInfo, and the head is
// reloaded on the path that passes the non-data-event test.
3244 NdbBlob::atNextEvent()
3246 DBUG_ENTER(
"NdbBlob::atNextEvent");
// Presumably the initializer of `optype`, which the trace below prints (the
// declaration line is not part of this extract).
3248 SubTableData::getOperation(theEventOp->m_data_item->sdata->requestInfo);
3249 DBUG_PRINT(
"info", (
"this=%p op=%p blob op=%p version=%d optype=%u",
this, theEventOp, theBlobEventOp, theEventBlobVersion, optype));
// Test for an Invalid handle; its branch body is not part of this extract.
3250 if (theState == Invalid)
3252 assert(theEventBlobVersion >= 0);
// Events at or above _TE_FIRST_NON_DATA_EVENT are tested for before the head
// is read.
3253 if (optype >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT)
3255 getHeadFromRecAttr();
// A null flag of -1 is tested for; the handling is not part of this extract.
3256 if (theNullFlag == -1)
// Stores anErrorCode in the blob's own error object. The same code is set on
// the owning operation when one exists and has no error code yet.
// How invalidFlag is used is not visible in this extract.
3275 NdbBlob::setErrorCode(
int anErrorCode,
bool invalidFlag)
3277 DBUG_ENTER(
"NdbBlob::setErrorCode");
3278 DBUG_PRINT(
"info", (
"this=%p code=%u",
this, anErrorCode));
3279 theError.
code = anErrorCode;
// An error already recorded on the operation is not overwritten.
3281 if (theNdbOp != NULL && theNdbOp->theError.
code == 0)
3282 theNdbOp->setErrorCode(theError.
code);
// A branch is taken when the environment variable NDB_BLOB_ABORT_ON_ERROR is
// set; its body is not part of this extract.
3286 if (NdbEnv_GetEnv(
"NDB_BLOB_ABORT_ON_ERROR", (
char*)0, 0)) {
// Picks the first non-zero error code, checking anOp (if non-NULL), then
// theNdbCon, then theNdb, and falls back to NdbBlobImpl::ErrUnknown. The
// chosen code and invalidFlag are passed on to setErrorCode(code, invalidFlag).
3294 NdbBlob::setErrorCode(
NdbOperation* anOp,
bool invalidFlag)
3297 if (anOp != NULL && (code = anOp->theError.
code) != 0)
3299 else if ((code = theNdbCon->theError.
code) != 0)
3301 else if ((code = theNdb->theError.
code) != 0)
3304 code = NdbBlobImpl::ErrUnknown;
3305 setErrorCode(code, invalidFlag);
// NOTE(review): the signature is not part of this extract. Since anOp exposes
// m_error rather than theError, this is presumably the event-operation variant
// of setErrorCode -- confirm.
// Uses anOp's error code when it is non-zero, otherwise ErrUnknown, and passes
// the result on to setErrorCode(code, invalidFlag).
3312 if ((code = anOp->m_error.
code) != 0)
3315 code = NdbBlobImpl::ErrUnknown;
3316 setErrorCode(code, invalidFlag);
3324 return theNdbOp->theBlobList;
3342 DBUG_ENTER(
"NdbBlob::close");
3343 DBUG_PRINT(
"info", (
"this=%p state=%u",
this, theState));
3348 if (theState != Active)
3355 if (execPendingBlobOps)
3357 if (thePendingBlobOps != 0)
3361 thePendingBlobOps = 0;
3362 theNdbCon->thePendingBlobOps = 0;
3365 else if (thePendingBlobOps != 0)
3374 if (theNdbOp->theLockHandle)
3377 (
"Decrementing lockhandle Blob ref count to %d",
3378 theNdbOp->theLockHandle->m_openBlobCount -1));
3385 assert(theNdbOp->theLockHandle->m_openBlobCount > 0);
3387 theNdbOp->theLockHandle->m_openBlobCount --;
3390 if (theNdbOp->m_blob_lock_upgraded)
3400 if (likely(theNdbOp->theLockHandle != NULL))
3402 if (theNdbOp->theLockHandle->m_openBlobCount == 0)
3405 (
"Upgraded -> LM_Read lock "
3406 "now no longer required. Issuing unlock "
3411 const NdbOperation* op = theNdbCon->unlock(theNdbOp->theLockHandle,
3414 if (unlikely(op == NULL))
3424 if (unlikely(theNdbCon->releaseLockHandle(theNdbOp->theLockHandle) != 0))
3426 setErrorCode(theNdbCon->theError.
code,
true);