19 #include <AttributeHeader.hpp>
20 #include <signaldata/TcKeyConf.hpp>
21 #include <signaldata/DictTabInfo.hpp>
23 NdbReceiver::NdbReceiver(
Ndb *aNdb) :
28 m_type(NDB_UNINITIALIZED),
30 m_using_ndb_record(false),
31 theFirstRecAttr(NULL),
32 theCurrentRecAttr(NULL),
34 m_current_row(0xffffffff),
38 NdbReceiver::~NdbReceiver()
40 DBUG_ENTER(
"NdbReceiver::~NdbReceiver");
41 if (m_id != NdbObjectIdMap::InvalidId) {
42 m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id,
this);
49 NdbReceiver::init(ReceiverType
type,
bool useRec,
void* owner)
51 theMagicNumber = 0x11223344;
53 m_using_ndb_record= useRec;
58 m_record.m_ndb_record= NULL;
59 m_record.m_row_recv= NULL;
60 m_record.m_row_buffer= NULL;
61 m_record.m_row_offset= 0;
62 m_record.m_read_range_no=
false;
64 theFirstRecAttr = NULL;
65 theCurrentRecAttr = NULL;
67 if (m_id == NdbObjectIdMap::InvalidId) {
70 m_id = m_ndb->theImpl->theNdbObjectIdMap.map(
this);
71 if (m_id == NdbObjectIdMap::InvalidId)
86 while (tRecAttr != NULL)
89 tRecAttr = tRecAttr->next();
90 m_ndb->releaseRecAttr(tSaveRecAttr);
92 m_using_ndb_record=
false;
93 theFirstRecAttr = NULL;
94 theCurrentRecAttr = NULL;
98 NdbReceiver::getValue(
const NdbColumnImpl* tAttrInfo,
char * user_dst_ptr){
100 if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
101 if (theFirstRecAttr == NULL)
102 theFirstRecAttr = tRecAttr;
104 theCurrentRecAttr->next(tRecAttr);
105 theCurrentRecAttr = tRecAttr;
106 tRecAttr->next(NULL);
110 m_ndb->releaseRecAttr(tRecAttr);
116 NdbReceiver::getValues(
const NdbRecord*
rec,
char *row_ptr)
118 assert(m_using_ndb_record);
120 m_record.m_ndb_record=
rec;
121 m_record.m_row_recv= row_ptr;
122 m_record.m_row_offset= rec->m_row_size;
126 NdbReceiver::prepareReceive(
char *
buf)
129 assert(theMagicNumber == 0x11223344);
130 m_received_result_length = 0;
131 m_expected_result_length = 0;
132 if (m_using_ndb_record)
134 m_record.m_row_recv=
buf;
136 theCurrentRecAttr = theFirstRecAttr;
140 NdbReceiver::prepareRead(
char *buf, Uint32 rows)
143 assert(theMagicNumber == 0x11223344);
145 m_result_rows = rows;
146 if (m_using_ndb_record)
148 m_record.m_row_buffer =
buf;
152 #define KEY_ATTR_ID (~(Uint32)0)
165 NdbReceiver::calculate_batch_size(
const NdbImpl& theImpl,
171 Uint32& batch_byte_size,
172 Uint32& first_batch_size)
174 const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
175 const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
176 const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
177 const Uint32 max_batch_size= cfg.m_batch_size;
179 Uint32 tot_size= (key_size ? (key_size + 32) : 0);
182 tot_size+= record->m_max_transid_ai_bytes;
186 while (rec_attr != NULL) {
187 Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
188 attr_size= ((attr_size + 4 + 3) >> 2) << 2;
189 tot_size+= attr_size;
190 rec_attr= rec_attr->next();
203 batch_byte_size= max_batch_byte_size;
207 batch_byte_size= batch_size * tot_size;
210 if (batch_byte_size * parallelism > max_scan_batch_size) {
211 batch_byte_size= max_scan_batch_size / parallelism;
213 batch_size= batch_byte_size / tot_size;
214 if (batch_size == 0) {
217 if (batch_size > max_batch_size) {
218 batch_size= max_batch_size;
219 }
else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
220 batch_size= MAX_PARALLEL_OP_PER_SCAN;
223 first_batch_size= batch_size;
228 NdbReceiver::calculate_batch_size(Uint32 key_size,
231 Uint32& batch_byte_size,
232 Uint32& first_batch_size,
235 calculate_batch_size(* m_ndb->theImpl,
238 key_size, parallelism, batch_size, batch_byte_size,
243 NdbReceiver::do_setup_ndbrecord(
const NdbRecord *ndb_record, Uint32 batch_size,
244 Uint32 key_size, Uint32 read_range_no,
245 Uint32 rowsize,
char *row_buffer)
247 m_using_ndb_record=
true;
248 m_record.m_ndb_record= ndb_record;
249 m_record.m_row_recv= row_buffer;
250 m_record.m_row_buffer= row_buffer;
251 m_record.m_row_offset= rowsize;
252 m_record.m_read_range_no= read_range_no;
257 NdbReceiver::ndbrecord_rowsize(
const NdbRecord *ndb_record,
262 Uint32 rowsize= (ndb_record) ? ndb_record->m_row_size : 0;
272 rowsize+= 8 + key_size*4;
280 rowsize+=
sizeof(Uint32) + ra->getColumn()->getSizeInBytes();
284 rowsize= (rowsize+3) & 0xfffffffc;
306 pad(
const Uint8* src, Uint32 align, Uint32 bitPos)
308 UintPtr ptr = UintPtr(src);
310 case DictTabInfo::aBit:
311 case DictTabInfo::a32Bit:
312 case DictTabInfo::a64Bit:
313 case DictTabInfo::a128Bit:
314 return (Uint8*)(((ptr + 3) & ~(UintPtr)3) + 4 * ((bitPos + 31) >> 5));
316 case DictTabInfo::an8Bit:
317 case DictTabInfo::a16Bit:
318 return src + 4 * ((bitPos + 31) >> 5);
334 handle_packed_bit(
const char* _src, Uint32 pos, Uint32 len,
char* _dst)
336 Uint32 * src = (Uint32*)_src;
337 assert((UintPtr(src) & 3) == 0);
340 UintPtr uiPtr= UintPtr((Uint32*)_dst);
341 Uint32 dstByteOffset= Uint32(uiPtr) & 3;
342 Uint32* dst= (Uint32*) (uiPtr - dstByteOffset);
356 NdbReceiver::receive_packed_recattr(
NdbRecAttr** recAttr,
358 const Uint32* aDataPtr,
362 const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
364 for (Uint32
i = 0, attrId = 0;
i<32*bmlen;
i++, attrId++)
369 NdbColumnImpl::getImpl(* currRecAttr->getColumn());
370 if (unlikely(attrId != (Uint32)col.m_attrId))
376 currRecAttr->setNULL();
377 currRecAttr = currRecAttr->next();
382 Uint32 attrSize = col.m_attrSize;
383 Uint32 array = col.m_arraySize;
384 Uint32 len = col.m_length;
385 Uint32 sz = attrSize * array;
386 Uint32 arrayType = col.m_arrayType;
389 case DictTabInfo::aBit:
390 src = pad(src, 0, 0);
391 handle_packed_bit((
const char*)src, bitPos, len,
392 currRecAttr->
aRef());
393 src += 4 * ((bitPos + len) >> 5);
394 bitPos = (bitPos + len) & 31;
397 src = pad(src, align, bitPos);
400 case NDB_ARRAYTYPE_FIXED:
402 case NDB_ARRAYTYPE_SHORT_VAR:
405 case NDB_ARRAYTYPE_MEDIUM_VAR:
406 sz = 2 + src[0] + 256 * src[1];
413 currRecAttr->receive_data((Uint32*)src, sz);
416 currRecAttr = currRecAttr->next();
419 * recAttr = currRecAttr;
420 return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
435 if (col->flags & NdbRecord::IsNullable)
436 row[col->nullbit_byte_offset]&= ~(1 << col->nullbit_bit_in_byte);
438 memcpy(&row[col->offset], src, byteSize);
445 assert(col->flags & NdbRecord::IsNullable);
446 row[col->nullbit_byte_offset]|= 1 << col->nullbit_bit_in_byte;
450 NdbReceiver::get_range_no()
const
453 assert(m_using_ndb_record);
454 Uint32 idx= m_current_row;
455 if (idx == 0 || !m_record.m_read_range_no)
458 m_record.m_row_buffer +
459 (idx-1)*m_record.m_row_offset +
460 m_record.m_ndb_record->m_row_size,
477 if (col->flags & NdbRecord::IsNullable)
480 row[col->nullbit_byte_offset] &=
481 ~(1 << col->nullbit_bit_in_byte);
491 bool isMDBitfield= (col->flags & NdbRecord::IsMysqldBitfield) != 0;
496 dest= (
char*) &mysqldSpace;
499 dest= row + col->offset;
502 src = pad(src, 0, 0);
503 handle_packed_bit((
const char*)src, bitPos, len, dest);
504 src += 4 * ((bitPos + len) >> 5);
505 bitPos = (bitPos + len) & 31;
509 col->put_mysqld_bitfield(row, dest);
520 NdbReceiver::receive_packed_ndbrecord(Uint32 bmlen,
521 const Uint32* aDataPtr,
524 const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
527 const Uint32 maxAttrId= rec->columns[rec->noOfColumns -1].attrId;
528 const Uint32 bmSize= bmlen << 5;
531 for (Uint32
i = 0, attrId = 0;
532 (
i < bmSize) && (attrId <= maxAttrId);
540 assert(attrId < rec->m_attrId_indexes_length);
545 assert((col->flags & NdbRecord::IsBlob) == 0);
550 if (col->flags & NdbRecord::IsNullable)
554 setRecToNULL(col, m_record.m_row_recv);
561 Uint32 align = col->orgAttrSize;
562 Uint32 sz = col->maxSize;
563 Uint32 len = col->bitCount;
565 (col->flags & NdbRecord::IsVar1ByteLen)?
566 NDB_ARRAYTYPE_SHORT_VAR :
568 (col->flags & NdbRecord::IsVar2ByteLen)?
569 NDB_ARRAYTYPE_MEDIUM_VAR :
570 NDB_ARRAYTYPE_FIXED);
573 case DictTabInfo::aBit:
574 handle_bitfield_ndbrecord(col,
581 src = pad(src, align, bitPos);
584 case NDB_ARRAYTYPE_FIXED:
586 case NDB_ARRAYTYPE_SHORT_VAR:
589 case NDB_ARRAYTYPE_MEDIUM_VAR:
590 sz = 2 + src[0] + 256 * src[1];
606 return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
611 NdbReceiver::get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
612 const char * & data_ptr)
const
614 assert(m_using_ndb_record);
615 Uint32 idx= m_current_row;
618 const char *p= m_record.m_row_buffer +
619 (idx-1)*m_record.m_row_offset +
620 m_record.m_ndb_record->m_row_size;
621 if (m_record.m_read_range_no)
623 scaninfo= uint4korr(p);
625 length= uint4korr(p);
633 NdbReceiver::getScanAttrData(
const char * & data, Uint32 &
size, Uint32 & pos)
const
635 assert(m_using_ndb_record);
636 Uint32 idx= m_current_row;
639 const char *row_end= m_record.m_row_buffer + idx*m_record.m_row_offset;
641 pos+=
sizeof(Uint32);
642 memcpy(&size, row_end - pos,
sizeof(Uint32));
646 assert (pos <= m_record.m_row_offset);
651 NdbReceiver::execTRANSID_AI(
const Uint32* aDataPtr, Uint32 aLength)
667 Uint32 exp= m_expected_result_length;
668 Uint32 tmp= m_received_result_length + aLength;
669 Uint32 origLength=aLength;
673 bool ndbrecord_part_done= !m_using_ndb_record;
674 const bool isScan= (m_type == NDB_SCANRECEIVER) ||
675 (m_type == NDB_QUERY_OPERATION);
685 const Uint32 attrId= ah.getAttributeId();
686 Uint32 attrSize= ah.getByteSize();
689 if (!ndbrecord_part_done)
693 if (attrId == AttributeHeader::RANGE_NO)
695 assert(m_record.m_read_range_no);
697 assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
698 memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size,
708 if (attrId == AttributeHeader::READ_PACKED)
710 assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
711 Uint32 len= receive_packed_ndbrecord(attrSize >> 2,
713 m_record.m_row_recv);
739 save_pos+=
sizeof(Uint32);
740 memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
741 &attrSize,
sizeof(Uint32));
745 assert (save_pos<=m_record.m_row_offset);
746 memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
750 Uint32 sizeInWords= (attrSize+3)>>2;
751 aDataPtr+= sizeInWords;
752 aLength-= sizeInWords;
760 assert(theCurrentRecAttr != NULL);
761 assert(theCurrentRecAttr->attrId() == attrId);
765 ndbrecord_part_done=
true;
776 if (ndbrecord_part_done)
782 if (attrId == AttributeHeader::READ_PACKED)
784 assert(!m_using_ndb_record);
786 Uint32 len = receive_packed_recattr(&tmp, attrSize>>2, aDataPtr, origLength);
796 while(currRecAttr && currRecAttr->attrId() != attrId){
797 currRecAttr = currRecAttr->next();
800 if(currRecAttr && currRecAttr->receive_data(aDataPtr, attrSize))
802 Uint32 add= (attrSize + 3) >> 2;
805 currRecAttr = currRecAttr->next();
813 ndbout_c(
"this=%p: attrId: %d currRecAttr: %p theCurrentRecAttr: %p "
814 "attrSize: %d %d",
this,
815 attrId, currRecAttr, theCurrentRecAttr, attrSize,
817 currRecAttr = theCurrentRecAttr;
818 while(currRecAttr != 0){
819 ndbout_c(
"%d ", currRecAttr->attrId());
820 currRecAttr = currRecAttr->next();
828 theCurrentRecAttr = currRecAttr;
830 m_received_result_length = tmp;
832 if (m_using_ndb_record) {
834 m_record.m_row_recv+= m_record.m_row_offset;
836 return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
840 NdbReceiver::execKEYINFO20(Uint32 info,
const Uint32* aDataPtr, Uint32 aLength)
842 if (m_using_ndb_record)
846 char *keyinfo_ptr= m_record.m_row_buffer +
847 m_current_row++ * m_record.m_row_offset +
848 m_record.m_ndb_record->m_row_size;
849 if (m_record.m_read_range_no)
852 int4store(keyinfo_ptr, info);
854 int4store(keyinfo_ptr, aLength);
856 memcpy(keyinfo_ptr, aDataPtr, 4*aLength);
858 Uint32 tmp= m_received_result_length + aLength;
859 m_received_result_length = tmp;
861 return (tmp == m_expected_result_length ? 1 : 0);
865 NdbRecAttr* currRecAttr = m_rows[m_current_row++];
866 assert(currRecAttr->attrId() == KEY_ATTR_ID);
876 currRecAttr->receive_data(aDataPtr, 4*(aLength + 1));
881 ((Uint32*)currRecAttr->
aRef())[aLength] = info;
883 Uint32 tmp = m_received_result_length + aLength;
884 m_received_result_length = tmp;
886 return (tmp == m_expected_result_length ? 1 : 0);
893 if (getType()==NDB_QUERY_OPERATION)
901 assert(op->checkMagicNumber()==0);
902 op->setErrorCode(code);