21 #include <SectionReader.hpp>
22 #include <signaldata/LqhKey.hpp>
23 #include <signaldata/QueryTree.hpp>
24 #include <signaldata/TcKeyRef.hpp>
25 #include <signaldata/RouteOrd.hpp>
26 #include <signaldata/TransIdAI.hpp>
27 #include <signaldata/DiGetNodes.hpp>
28 #include <signaldata/DihScanTab.hpp>
29 #include <signaldata/AttrInfo.hpp>
30 #include <Interpreter.hpp>
31 #include <AttributeHeader.hpp>
32 #include <AttributeDescriptor.hpp>
33 #include <KeyDescriptor.hpp>
34 #include <md5_hash.hpp>
35 #include <signaldata/TcKeyConf.hpp>
37 #include <signaldata/NodeFailRep.hpp>
38 #include <signaldata/ReadNodesConf.hpp>
45 #define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
46 #define DEBUG_LQHKEYREQ
47 #define DEBUG_SCAN_FRAGREQ
56 #define DEBUG_CRASH() ndbrequire(false)
64 #undef DEBUG_LQHKEYREQ
65 #undef DEBUG_SCAN_FRAGREQ
// Null row reference constant: page id RNIL, page position
// GLOBAL_PAGE_SIZE_WORDS and a zero-initialised trailing member.
69 const Dbspj::RowRef Dbspj::NullRowRef = { RNIL, GLOBAL_PAGE_SIZE_WORDS, { 0 } };
// Handles READ_CONFIG_REQ: initialises the arena allocator plus the pools and
// hashes of this block, then replies to the sender with READ_CONFIG_CONF.
72 void Dbspj::execREAD_CONFIG_REQ(
Signal* signal)
76 *
reinterpret_cast<const ReadConfigReq*
>(signal->getDataPtr());
81 DEBUG(
"execREAD_CONFIG_REQ");
82 DEBUG(
"sizeof(Request): " <<
sizeof(
Request) <<
83 " sizeof(TreeNode): " <<
sizeof(TreeNode));
// The request, tree-node and scan-fragment-handle pools all draw from the
// same arena allocator.
85 m_arenaAllocator.init(1024, RT_SPJ_ARENA_BLOCK, pc);
86 m_request_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_REQUEST, pc);
87 m_treenode_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_TREENODE, pc);
88 m_scanfraghandle_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_SCANFRAG, pc);
// Both request hashes (lookup and scan) are given size 16.
89 m_lookup_request_hash.
setSize(16);
90 m_scan_request_hash.
setSize(16);
// The page pool is pointed at the memory manager's memory root, with an
// all-ones value as the second argument.
91 void* ptr = m_ctx.m_mm.get_memroot();
92 m_page_pool.set((RowPage*)ptr, (Uint32)~0);
// The dependency-map pool is also backed by the arena allocator.
95 Dependency_map::createRecordInfo(ri, RT_SPJ_DATABUFFER);
96 m_dependency_map_pool.init(&m_arenaAllocator, ri, pc);
// Reply: echo senderData back and identify ourselves in senderRef.
100 conf->senderRef = reference();
101 conf->senderData = req.senderData;
103 sendSignal(req.senderRef, GSN_READ_CONFIG_CONF, signal,
104 ReadConfigConf::SignalLength, JBB);
// Block reference of the sender of the last STTOR signal; written in
// execSTTOR() and used as the destination of STTORRY in sendSTTORRY().
107 static Uint32 f_STTOR_REF = 0;
// Handles STTOR: remembers the sender, logs instance and phase, and performs
// the actions below.
// NOTE(review): the phase conditions guarding the individual actions are not
// visible in this excerpt.
109 void Dbspj::execSTTOR(
Signal* signal)
115 const Uint16 tphase = signal->theData[1];
116 f_STTOR_REF = signal->getSendersBlockRef();
118 ndbout <<
"Dbspj::execSTTOR() inst:" << instance()
119 <<
" phase=" << tphase << endl;
// Send CONTINUEB with theData[0] = 0 to ourselves, with a delay argument of 1000.
124 signal->theData[0] = 0;
125 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 1000, 1);
// Send READ_NODESREQ to NDBCNTR, passing our own block reference.
132 signal->theData[0] = reference();
133 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
// Optional self-test of ArenaPool / DataBuffer2, compiled only when
// UNIT_TEST_DATABUFFER2 is defined: 100 rounds of seize arena, append, verify,
// release.
139 #ifdef UNIT_TEST_DATABUFFER2
142 ndbout_c(
"basic test of ArenaPool / DataBuffer2");
144 for (Uint32
i = 0;
i<100;
i++)
147 if (!m_arenaAllocator.seize(ah))
149 ndbout_c(
"Failed to allocate arena");
153 ndbout_c(
"*** LOOP %u",
i);
155 Dependency_map::Head head;
// 100 appends of a random number (0..999) of words each.
157 for (Uint32 j = 0; j<100; j++)
159 Uint32 sz = rand() % 1000;
161 ndbout_c(
"adding %u", sz);
162 Local_dependency_map list(pool, head);
// NOTE(review): this index reuses the name `i` of the enclosing 100-round
// loop and so appears to shadow it - confirm this is intended.
163 for (Uint32
i = 0;
i<sz;
i++)
164 signal->theData[
i] = sum +
i;
165 list.append(signal->theData, sz);
// Verification: the size must equal `sum`, and each stored word is compared
// against `cnt` while walking the list.
170 ndbrequire(head.getSize() == sum);
171 Local_dependency_map list(pool, head);
172 Dependency_map::ConstDataBufferIterator it;
174 for (list.first(it); !it.isNull(); list.next(it))
176 ndbrequire(* it.data == cnt);
180 ndbrequire(cnt == sum);
// Print the limits of resource 7 if the memory manager reports them.
184 if (m_ctx.m_mm.get_resource_limit(7, rl))
186 ndbout_c(
"Resource %d min: %d max: %d curr: %d",
187 7, rl.m_min, rl.m_max, rl.m_curr);
191 ndbout_c(
"release map");
192 Local_dependency_map list(pool, head);
196 ndbout_c(
"release all");
197 m_arenaAllocator.release(ah);
198 ndbout_c(
"*** LOOP %u sum: %u",
i, sum);
// Sends STTORRY (6 words) to the block that sent the last STTOR (f_STTOR_REF).
// theData[4] is 120 when UNIT_TEST_DATABUFFER2 is defined; the assignment of
// 255 is presumably the #else branch (the preprocessor lines in between are
// not visible in this excerpt).
205 Dbspj::sendSTTORRY(
Signal* signal)
207 signal->theData[0] = 0;
208 signal->theData[1] = 0;
209 signal->theData[2] = 0;
210 signal->theData[3] = 4;
211 #ifdef UNIT_TEST_DATABUFFER2
212 signal->theData[4] = 120;
214 signal->theData[4] = 255;
216 signal->theData[5] = 255;
217 sendSignal(f_STTOR_REF, GSN_STTORRY, signal, 6, JBB);
// Handles READ_NODESCONF: rebuilds c_alive_nodes from the node bitmasks in
// the signal.
// NOTE(review): the condition selecting between the two variants below is not
// visible in this excerpt.
221 Dbspj::execREAD_NODESCONF(
Signal* signal)
// Variant 1: the started nodes plus our own node id.
230 c_alive_nodes.
assign(NdbNodeBitmask::Size, conf->startedNodes);
231 c_alive_nodes.
set(getOwnNodeId());
// Variant 2: the union of starting and started nodes.
236 c_alive_nodes.
assign(NdbNodeBitmask::Size, conf->startingNodes);
238 tmp.
assign(NdbNodeBitmask::Size, conf->startedNodes);
239 c_alive_nodes.
bitOR(tmp);
// Handles INCL_NODEREQ: marks the given node as alive and confirms to the
// sender with INCL_NODECONF (node id, own block reference).
246 Dbspj::execINCL_NODEREQ(
Signal* signal)
249 const Uint32 senderRef = signal->theData[0];
250 const Uint32 nodeId = signal->theData[1];
// The node must not already be registered as alive.
252 ndbrequire(!c_alive_nodes.
get(nodeId));
253 c_alive_nodes.
set(nodeId);
255 signal->theData[0] = nodeId;
256 signal->theData[1] = reference();
257 sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
// Handles NODE_FAILREP: takes the failed-node bitmask from the report and
// sends CONTINUEB to ourselves with theData[0] = 1, theData[1] = 0 and the
// bitmask from word 2 onwards.
261 Dbspj::execNODE_FAILREP(
Signal* signal)
267 failed.
assign(NdbNodeBitmask::Size, rep->theNodes);
271 signal->theData[0] = 1;
272 signal->theData[1] = 0;
273 failed.
copyto(NdbNodeBitmask::Size, signal->theData + 2);
274 sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
// Handles API_FAILREQ: requires that theData[1] is QMGR_REF and answers QMGR
// with API_FAILCONF carrying the failed API node and our own block reference.
279 Dbspj::execAPI_FAILREQ(
Signal* signal)
282 Uint32 failedApiNode = signal->theData[0];
283 ndbrequire(signal->theData[1] == QMGR_REF);
290 signal->theData[0] = failedApiNode;
291 signal->theData[1] = reference();
292 sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
// Dispatches CONTINUEB on theData[0].
// NOTE(review): the case labels are not visible in this excerpt. Elsewhere in
// this file CONTINUEB is sent to self with theData[0] = 0 (execSTTOR),
// 1 (execNODE_FAILREP) and 2 (nodeFail_checkRequests).
296 Dbspj::execCONTINUEB(
Signal* signal)
299 switch(signal->theData[0]) {
301 releaseGlobal(signal);
304 nodeFail_checkRequests(signal);
307 nodeFail_checkRequests(signal);
// Walks one of the request hashes in slices, calling nodeFail() on each
// request, and reschedules itself through CONTINUEB until done.
// Signal layout: theData[0] = type, theData[1] = bucket to resume from,
// theData[2..] = bitmask of failed nodes.
315 Dbspj::nodeFail_checkRequests(
Signal* signal)
318 const Uint32
type = signal->theData[0];
319 const Uint32
bucket = signal->theData[1];
322 failed.
assign(NdbNodeBitmask::Size, signal->theData+2);
324 Request_iterator iter;
// `hash` is set to either the lookup or the scan hash; presumably selected by
// `type` (the selecting statement is not visible in this excerpt).
328 hash = &m_lookup_request_hash;
331 hash = &m_scan_request_hash;
334 hash->
next(bucket, iter);
// Process at least RT_BREAK entries' worth of work, and do not stop while
// still inside the starting bucket; the value returned by nodeFail() is added
// to the work counter.
336 const Uint32 RT_BREAK = 64;
337 for(Uint32
i = 0; (
i<RT_BREAK || iter.bucket == bucket) &&
338 !iter.curr.isNull();
i++)
344 i += nodeFail(signal, requestPtr, failed);
// More entries remain: continue later with the same type.
// NOTE(review): this re-sends the original `bucket` rather than the
// iterator's current bucket - confirm that restarting from the same bucket is
// intended.
347 if (!iter.curr.isNull())
350 signal->theData[0] =
type;
351 signal->theData[1] = bucket;
352 failed.
copyto(NdbNodeBitmask::Size, signal->theData+2);
353 sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
// Schedule a pass with type 2 starting from bucket 0.
359 signal->theData[0] = 2;
360 signal->theData[1] = 0;
361 failed.
copyto(NdbNodeBitmask::Size, signal->theData+2);
362 sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
368 ndbout_c(
"Finished with handling node-failure");
// Entry point for LQHKEYREQ: seizes an arena and a Request, reads the query
// tree header from the attrinfo section, builds the tree nodes, stores the
// request in the lookup hash and starts it. `err` is set to the code
// belonging to the next check before each check is made; the statements at
// the end form the failure path that reports `err` back to the sender.
375 void Dbspj::execLQHKEYREQ(
Signal* signal)
378 c_Counters.incr_counter(CI_READS_RECEIVED, 1);
390 handle.getSection(ssPtr, LqhKeyReq::AttrInfoSectionNum);
397 err = DbspjErr::OutOfQueryMemory;
398 if (unlikely(!m_arenaAllocator.seize(ah)))
402 m_request_pool.
seize(ah, requestPtr);
// Construct the Request in place in the seized record.
404 new (requestPtr.p)
Request(ah);
405 do_init(requestPtr.p, req, signal->getSendersBlockRef());
// The first word of the section packs the tree length and the node count.
412 err = DbspjErr::ZeroLengthQueryTree;
413 if (unlikely(!r0.getWord(&len_cnt)))
417 Uint32 len = QueryTree::getLength(len_cnt);
418 Uint32 cnt = QueryTree::getNodeCnt(len_cnt);
// The parameter reader is stepped `len` words forward.
423 paramReader.step(len);
426 ctx.m_resultRef = req->variableData[0];
427 ctx.m_savepointId = req->savePointId;
429 ctx.m_start_signal = signal;
430 ctx.m_keyPtr.i = handle.m_ptr[LqhKeyReq::KeyInfoSectionNum].i;
431 ctx.m_senderRef = signal->getSendersBlockRef();
433 err = build(ctx, requestPtr, treeReader, paramReader);
434 if (unlikely(err != 0))
// The built request must be a lookup with exactly `cnt` nodes; this is
// asserted and also checked at run time.
442 ndbassert(requestPtr.p->isLookup());
443 ndbassert(requestPtr.p->m_node_cnt == cnt);
444 err = DbspjErr::InvalidRequest;
445 if (unlikely(!requestPtr.p->isLookup() || requestPtr.p->m_node_cnt != cnt))
451 store_lookup(requestPtr);
456 start(signal, requestPtr);
// Failure path: give back the request record if one was seized, release the
// signal sections and send the early reference.
464 if (!requestPtr.isNull())
467 m_request_pool.
release(requestPtr);
469 releaseSections(handle);
470 handle_early_lqhkey_ref(signal, req, err);
// Resets the fields of a Request from an LQHKEYREQ.
// NOTE(review): the signature line is not visible in this excerpt; the call
// do_init(requestPtr.p, req, signal->getSendersBlockRef()) in execLQHKEYREQ
// suggests this is that function's body.
476 requestP->m_bits = 0;
477 requestP->m_errCode = 0;
478 requestP->m_state = Request::RS_BUILDING;
479 requestP->m_node_cnt = 0;
480 requestP->m_cnt_active = 0;
481 requestP->m_rows = 0;
482 requestP->m_active_nodes.clear();
483 requestP->m_outstanding = 0;
484 requestP->m_transId[0] = req->transId1;
485 requestP->m_transId[1] = req->transId2;
486 bzero(requestP->m_lookup_node_data,
sizeof(requestP->m_lookup_node_data));
// Timing statistics, only when SPJ_TRACE_TIME is defined.
487 #ifdef SPJ_TRACE_TIME
488 requestP->m_cnt_batches = 0;
489 requestP->m_sum_rows = 0;
490 requestP->m_sum_running = 0;
491 requestP->m_sum_waiting = 0;
492 requestP->m_save_time = spj_now();
494 const Uint32 reqInfo = req->requestInfo;
495 Uint32 tmp = req->clientConnectPtr;
// Dirty read: the application address flag is required and m_senderData is
// taken from variableData[1].
496 if (LqhKeyReq::getDirtyFlag(reqInfo) &&
497 LqhKeyReq::getOperation(reqInfo) == ZREAD)
501 ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
504 tmp = req->variableData[1];
505 requestP->m_senderData = tmp;
506 requestP->m_senderRef = senderRef;
// Otherwise, when the same-client-and-TC flag is 1, the value comes from
// variableData[2] or variableData[0] depending on the application address
// flag; without that flag `tmp` keeps clientConnectPtr.
510 if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
512 if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
513 tmp = req->variableData[2];
515 tmp = req->variableData[0];
517 requestP->m_senderData = tmp;
518 requestP->m_senderRef = senderRef;
520 requestP->m_rootResultData = tmp;
// Adds a lookup request to m_lookup_request_hash after requiring that no
// matching entry is already present.
// NOTE(review): the signature line is not visible in this excerpt; the call
// store_lookup(requestPtr) in execLQHKEYREQ suggests this is its body.
526 ndbassert(requestPtr.p->isLookup());
528 bool found = m_lookup_request_hash.
find(tmp, *requestPtr.p);
529 ndbrequire(found ==
false);
530 m_lookup_request_hash.
add(requestPtr);
// Reports error `err` for an LQHKEYREQ. For a dirty read a TCKEYREF is sent
// towards the API reference found in variableData[0]; the remaining
// statements build an LQHKEYREF and return it to the sender of the signal.
534 Dbspj::handle_early_lqhkey_ref(
Signal* signal,
542 const Uint32 reqInfo = lqhKeyReq->requestInfo;
543 const Uint32 transid[2] = { lqhKeyReq->transId1, lqhKeyReq->transId2 };
545 if (LqhKeyReq::getDirtyFlag(reqInfo) &&
546 LqhKeyReq::getOperation(reqInfo) == ZREAD)
// Dirty read: requires the application address flag.
550 ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
551 const Uint32 apiRef = lqhKeyReq->variableData[0];
552 const Uint32 apiOpRec = lqhKeyReq->variableData[1];
554 TcKeyRef*
const tcKeyRef =
reinterpret_cast<TcKeyRef*
>(signal->getDataPtrSend());
556 tcKeyRef->connectPtr = apiOpRec;
557 tcKeyRef->transId[0] = transid[0];
558 tcKeyRef->transId[1] = transid[1];
559 tcKeyRef->errorCode = err;
// The sender of the request is passed as the route reference.
560 sendTCKEYREF(signal, apiRef, signal->getSendersBlockRef());
// LQHKEYREF variant: connectPtr defaults to clientConnectPtr and is replaced
// from variableData when the same-client-and-TC flag is 1.
565 const Uint32 returnref = signal->getSendersBlockRef();
566 const Uint32 clientPtr = lqhKeyReq->clientConnectPtr;
568 Uint32 TcOprec = clientPtr;
569 if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
571 if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
572 TcOprec = lqhKeyReq->variableData[2];
574 TcOprec = lqhKeyReq->variableData[0];
578 ref->userRef = clientPtr;
579 ref->connectPtr = TcOprec;
580 ref->errorCode = err;
581 ref->transId1 = transid[0];
582 ref->transId2 = transid[1];
583 sendSignal(returnref, GSN_LQHKEYREF, signal,
584 LqhKeyRef::SignalLength, JBB);
// Sends the TCKEYREF already present in signal->theData to `ref`.
// When connected to ref's node it is sent directly. The remaining statements
// move the signal words to theData+25 and send a ROUTE_ORD to `routeRef`
// instead (presumably with the moved words attached as a section; the end of
// that call is not visible in this excerpt).
589 Dbspj::sendTCKEYREF(
Signal* signal, Uint32 ref, Uint32 routeRef)
591 const Uint32 nodeId = refToNode(ref);
594 if (likely(connectedToNode))
597 sendSignal(ref, GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
// Byte count = 4 * number of signal words.
602 memmove(signal->theData+25, signal->theData, 4*TcKeyRef::SignalLength);
605 ord->srcRef = reference();
606 ord->gsn = GSN_TCKEYREF;
609 ptr[0].p = signal->theData+25;
610 ptr[0].sz = TcKeyRef::SignalLength;
611 sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
// Sends the TCKEYCONF of `len` words already present in signal->theData to
// `ref`: directly when connected to ref's node; the remaining statements move
// the words to theData+25 and send a ROUTE_ORD to `routeRef` instead.
617 Dbspj::sendTCKEYCONF(
Signal* signal, Uint32 len, Uint32 ref, Uint32 routeRef)
619 const Uint32 nodeId = refToNode(ref);
622 if (likely(connectedToNode))
625 sendSignal(ref, GSN_TCKEYCONF, signal, len, JBB);
// Byte count = 4 * number of signal words.
630 memmove(signal->theData+25, signal->theData, 4*len);
633 ord->srcRef = reference();
634 ord->gsn = GSN_TCKEYCONF;
637 ptr[0].p = signal->theData+25;
639 sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
// Entry point for SCAN_FRAGREQ: seizes an arena and a Request, reads the
// query tree header from the attrinfo section, builds the tree nodes, stores
// the request in the scan hash and starts it. `err` is set to the code
// belonging to the next check before each check is made; the statements at
// the end form the failure path that reports `err` back to the sender.
653 Dbspj::execSCAN_FRAGREQ(
Signal* signal)
666 #ifdef DEBUG_SCAN_FRAGREQ
667 ndbout_c(
"Incomming SCAN_FRAGREQ ");
668 printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
669 ScanFragReq::SignalLength + 2,
682 handle.getSection(ssPtr, ScanFragReq::AttrInfoSectionNum);
689 err = DbspjErr::OutOfQueryMemory;
690 if (unlikely(!m_arenaAllocator.seize(ah)))
693 m_request_pool.
seize(ah, requestPtr);
// Construct the Request in place in the seized record.
695 new (requestPtr.p)
Request(ah);
696 do_init(requestPtr.p, req, signal->getSendersBlockRef());
// The first word of the section packs the tree length and the node count.
701 err = DbspjErr::ZeroLengthQueryTree;
702 if (unlikely(!r0.getWord(&len_cnt)))
706 Uint32 len = QueryTree::getLength(len_cnt);
707 Uint32 cnt = QueryTree::getNodeCnt(len_cnt);
712 paramReader.step(len);
715 ctx.m_resultRef = req->resultRef;
716 ctx.m_scanPrio = ScanFragReq::getScanPrio(req->requestInfo);
717 ctx.m_savepointId = req->savePointId;
718 ctx.m_batch_size_rows = req->batch_size_rows;
719 ctx.m_start_signal = signal;
720 ctx.m_senderRef = signal->getSendersBlockRef();
// A key section is only used when the signal carries more than one section;
// otherwise the key pointer is RNIL.
722 if (handle.m_cnt > 1)
725 ctx.m_keyPtr.i = handle.m_ptr[ScanFragReq::KeyInfoSectionNum].i;
730 ctx.m_keyPtr.i = RNIL;
733 err = build(ctx, requestPtr, treeReader, paramReader);
734 if (unlikely(err != 0))
// The built request must be a scan with exactly `cnt` nodes; this is asserted
// and also checked at run time.
738 ndbassert(requestPtr.p->isScan());
739 ndbassert(requestPtr.p->m_node_cnt == cnt);
740 err = DbspjErr::InvalidRequest;
741 if (unlikely(!requestPtr.p->isScan() || requestPtr.p->m_node_cnt != cnt))
747 store_scan(requestPtr);
752 start(signal, requestPtr);
// Failure path: give back the request record if one was seized, release the
// signal sections and send the early reference.
756 if (!requestPtr.isNull())
759 m_request_pool.
release(requestPtr);
761 releaseSections(handle);
762 handle_early_scanfrag_ref(signal, req, err);
// Resets the fields of a Request from a scan request: sender reference,
// sender data, transaction id and root result data are copied from the
// arguments.
// NOTE(review): the signature line is not visible in this excerpt; the call
// do_init(requestPtr.p, req, signal->getSendersBlockRef()) in
// execSCAN_FRAGREQ suggests this is that overload's body.
768 requestP->m_bits = 0;
769 requestP->m_errCode = 0;
770 requestP->m_state = Request::RS_BUILDING;
771 requestP->m_node_cnt = 0;
772 requestP->m_cnt_active = 0;
773 requestP->m_rows = 0;
774 requestP->m_active_nodes.clear();
775 requestP->m_outstanding = 0;
776 requestP->m_senderRef = senderRef;
777 requestP->m_senderData = req->senderData;
778 requestP->m_transId[0] = req->transId1;
779 requestP->m_transId[1] = req->transId2;
780 requestP->m_rootResultData = req->resultData;
781 bzero(requestP->m_lookup_node_data,
sizeof(requestP->m_lookup_node_data));
// Timing statistics, only when SPJ_TRACE_TIME is defined.
782 #ifdef SPJ_TRACE_TIME
783 requestP->m_cnt_batches = 0;
784 requestP->m_sum_rows = 0;
785 requestP->m_sum_running = 0;
786 requestP->m_sum_waiting = 0;
787 requestP->m_save_time = spj_now();
// Adds a scan request to m_scan_request_hash after requiring that no matching
// entry is already present.
// NOTE(review): the signature line is not visible in this excerpt; the call
// store_scan(requestPtr) in execSCAN_FRAGREQ suggests this is its body.
794 ndbassert(requestPtr.p->isScan());
796 bool found = m_scan_request_hash.
find(tmp, *requestPtr.p);
797 ndbrequire(found ==
false);
798 m_scan_request_hash.
add(requestPtr);
// Reports error `err` for a scan request by sending SCAN_FRAGREF to the
// sender of the signal, echoing senderData and the transaction id from the
// request.
802 Dbspj::handle_early_scanfrag_ref(
Signal* signal,
807 Uint32 senderRef = signal->getSendersBlockRef();
810 ref->senderData = req.senderData;
811 ref->transId1 = req.transId1;
812 ref->transId2 = req.transId2;
813 ref->errorCode = err;
814 sendSignal(senderRef, GSN_SCAN_FRAGREF, signal,
815 ScanFragRef::SignalLength, JBB);
// Builds the tree nodes of a request from the serialized query tree (`tree`)
// and its parameters (`param`). Each iteration reads one node and one
// parameter record, validates them and calls the operation-specific build
// function. `err` is set to the code belonging to the next check before each
// check is made.
826 Dbspj::build(Build_context& ctx,
832 Uint32 err = DbspjErr::ZeroLengthQueryTree;
837 Uint32 loop = QueryTree::getNodeCnt(tmp0);
// The node count must be in 1..NDB_SPJ_MAX_TREE_NODES.
840 err = DbspjErr::InvalidTreeNodeCount;
841 if (loop == 0 || loop > NDB_SPJ_MAX_TREE_NODES)
847 while (ctx.m_cnt < loop)
849 DEBUG(
" - loop " << ctx.m_cnt <<
" pos: " << tree.getPos().currPos);
// Peek at the header words to get operation type and length of both records.
850 tree.peekWord(&tmp0);
851 param.peekWord(&tmp1);
852 Uint32 node_op = QueryNode::getOpType(tmp0);
853 Uint32 node_len = QueryNode::getLength(tmp0);
854 Uint32 param_op = QueryNodeParameters::getOpType(tmp1);
855 Uint32 param_len = QueryNodeParameters::getLength(tmp1);
// Both records must fit in their staging buffers (m_buffer0 / m_buffer1).
857 err = DbspjErr::QueryNodeTooBig;
858 if (unlikely(node_len >= NDB_ARRAY_SIZE(m_buffer0)))
864 err = DbspjErr::QueryNodeParametersTooBig;
865 if (unlikely(param_len >= NDB_ARRAY_SIZE(m_buffer1)))
871 err = DbspjErr::InvalidTreeNodeSpecification;
872 if (unlikely(tree.getWords(m_buffer0, node_len) ==
false))
878 err = DbspjErr::InvalidTreeParametersSpecification;
879 if (unlikely(param.getWords(m_buffer1, param_len) ==
false))
// Debug dump of both buffers as hexadecimal words.
885 #if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
887 for (Uint32
i = 0;
i<node_len;
i++)
888 printf(
"0x%.8x ", m_buffer0[
i]);
892 for (Uint32 i = 0; i<param_len; i++)
893 printf(
"0x%.8x ", m_buffer1[i]);
// Node and parameter record must name the same operation, and that operation
// must have an OpInfo entry.
897 err = DbspjErr::UnknowQueryOperation;
898 if (unlikely(node_op != param_op))
904 const OpInfo* info = getOpInfo(node_op);
905 if (unlikely(info == 0))
// Call the operation's build function through its member-function pointer.
915 err = (this->*(info->m_build))(ctx, requestPtr, qn, qp);
916 if (unlikely(err != 0))
925 ctx.m_start_signal = 0;
930 ndbrequire(ctx.m_cnt < NDB_ARRAY_SIZE(ctx.m_node_list));
933 requestPtr.p->m_node_cnt = ctx.m_cnt;
// With row buffering enabled, initialise the row map or row list of the
// tree nodes.
939 if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
942 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
943 for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
948 treeNodePtr.p->m_row_map.init();
950 else if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
953 treeNodePtr.p->m_row_list.init();
// More than one scan in the request marks it RT_MULTI_SCAN.
958 if (ctx.m_scan_cnt > 1)
961 requestPtr.p->m_bits |= Request::RT_MULTI_SCAN;
// Row buffering also sets RT_VAR_ALLOC.
// NOTE(review): any further condition on this is not visible in this excerpt.
971 if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
974 requestPtr.p->m_bits |= Request::RT_VAR_ALLOC;
// Seizes a TreeNode from the request's arena, constructs it in place, records
// it in ctx.m_node_list[ctx.m_cnt], appends it to the request's node list and
// gives it node number ctx.m_cnt. DbspjErr::OutOfOperations is returned from
// the path after the seize branch (presumably when the seize fails; the
// success return is not visible in this excerpt).
986 Dbspj::createNode(Build_context& ctx,
Ptr<Request> requestPtr,
994 if (m_treenode_pool.
seize(requestPtr.p->m_arena, treeNodePtr))
996 DEBUG(
"createNode - seize -> ptrI: " << treeNodePtr.i);
997 new (treeNodePtr.p) TreeNode(requestPtr.i);
998 ctx.m_node_list[ctx.m_cnt] = treeNodePtr;
999 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1000 list.addLast(treeNodePtr);
1001 treeNodePtr.p->m_node_no = ctx.m_cnt;
1004 return DbspjErr::OutOfOperations;
// Starts a built request. When RT_NEED_PREPARE is set the request enters
// RS_PREPARING and the m_prepare function of the tree nodes is called;
// afterwards checkPrepareComplete() is called with a count of 0.
1008 Dbspj::start(
Signal* signal,
1011 if (requestPtr.p->m_bits & Request::RT_NEED_PREPARE)
1014 requestPtr.p->m_outstanding = 0;
1015 requestPtr.p->m_state = Request::RS_PREPARING;
1018 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1019 for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1022 ndbrequire(nodePtr.p->m_info != 0);
// NOTE(review): any guard on m_prepare being non-null is not visible here.
1026 (this->*(nodePtr.p->m_info->
m_prepare))(signal, requestPtr, nodePtr);
// After the prepare calls m_outstanding is expected to be non-zero.
1034 ndbassert(requestPtr.p->m_outstanding);
1037 checkPrepareComplete(signal, requestPtr, 0);
// Subtracts `cnt` from the request's outstanding count. When it reaches zero:
// an aborting request goes to batchComplete(); the remaining statements set
// the state to RS_RUNNING and call m_start on the first tree node.
// NOTE(review): the signature line is not visible in this excerpt; the call
// checkPrepareComplete(signal, requestPtr, 0) in start() suggests this is
// that function's body.
1044 ndbrequire(requestPtr.p->m_outstanding >= cnt);
1045 requestPtr.p->m_outstanding -= cnt;
1047 if (requestPtr.p->m_outstanding == 0)
1051 if (unlikely((requestPtr.p->m_state & Request::RS_ABORTING) != 0))
1054 batchComplete(signal, requestPtr);
1058 requestPtr.p->m_state = Request::RS_RUNNING;
1061 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1062 ndbrequire(list.first(nodePtr));
1064 ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->
m_start != 0);
1065 (this->*(nodePtr.p->m_info->
m_start))(signal, requestPtr, nodePtr);
// Subtracts `cnt` from the request's outstanding count and calls
// batchComplete() once nothing is outstanding.
// NOTE(review): the signature line is not visible in this excerpt; calls of
// the form checkBatchComplete(signal, requestPtr, 0) elsewhere in this file
// suggest this is that function's body.
1073 ndbrequire(requestPtr.p->m_outstanding >= cnt);
1074 requestPtr.p->m_outstanding -= cnt;
1076 if (requestPtr.p->m_outstanding == 0)
1079 batchComplete(signal, requestPtr);
// Runs when a request has nothing outstanding: either sends a result to the
// client or enters the complete phase, then releases resources according to
// the request's state.
// NOTE(review): the signature line is not visible in this excerpt; calls of
// the form batchComplete(signal, requestPtr) suggest this is that function's
// body.
1086 ndbrequire(requestPtr.p->m_outstanding == 0);
// The request is complete when no tree node is active.
1088 bool is_complete = requestPtr.p->m_cnt_active == 0;
1089 bool need_complete_phase = requestPtr.p->m_bits & Request::RT_NEED_COMPLETE;
1091 if (requestPtr.p->isLookup())
1093 ndbassert(requestPtr.p->m_cnt_active == 0);
// Not complete yet, or complete without needing a complete phase: prepare the
// next batch and send the confirmation.
1096 if (!is_complete || (is_complete && need_complete_phase ==
false))
1105 if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
1107 ndbassert(is_complete);
1110 prepareNextBatch(signal, requestPtr);
1111 sendConf(signal, requestPtr, is_complete);
// Complete and a complete phase is needed: run it.
1113 else if (is_complete && need_complete_phase)
1119 complete(signal, requestPtr);
// Resource handling: full cleanup when nothing is active, otherwise release
// scan buffers (multi scan) or all request buffers with reset (row buffers).
1123 if (requestPtr.p->m_cnt_active == 0)
1129 cleanup(requestPtr);
1131 else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
1137 releaseScanBuffers(requestPtr);
1139 else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
1146 releaseRequestBuffers(requestPtr,
true);
// Recomputes the request's cursor-node list and m_active_nodes bitmask, both
// of which are reset first.
// NOTE(review): the signature line and several conditions are not visible in
// this excerpt; the call prepareNextBatch(signal, requestPtr) elsewhere in
// this file suggests this is that function's body.
1159 requestPtr.p->m_cursor_nodes.init();
1160 requestPtr.p->m_active_nodes.
clear();
1162 if (requestPtr.p->m_cnt_active == 0)
// First strategy, used when RT_REPEAT_SCAN_RESULT is set: walk the nodes from
// last to first and register a node as the active cursor.
1168 if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT)
1186 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1192 for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
1197 DEBUG(
"Will fetch more from 'active' m_node_no: " << nodePtr.p->m_node_no);
1201 registerActiveCursor(requestPtr, nodePtr);
// If a cursor node was found, examine the nodes after it; nodes whose
// ancestors do not overlap the active set are tested here.
1213 if (!nodePtr.isNull())
1216 DEBUG(
"Calculate 'active', w/ cursor on m_node_no: " << nodePtr.p->m_node_no);
1219 for (list.next(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1222 if (!nodePtr.p->m_ancestors.
overlaps (requestPtr.p->m_active_nodes))
1226 ndbrequire(nodePtr.p->m_info != 0);
// Second strategy: walk from last to first, registering nodes that are not
// ancestors of an already registered node, and accumulating the ancestors of
// each registered node.
1250 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1251 TreeNodeBitMask ancestors_of_active;
1253 for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
1263 !ancestors_of_active.get (nodePtr.p->m_node_no))
1266 DEBUG(
"Add 'active' m_node_no: " << nodePtr.p->m_node_no);
1267 registerActiveCursor(requestPtr, nodePtr);
1268 ancestors_of_active.bitOR(nodePtr.p->m_ancestors);
1273 DEBUG(
"Calculated 'm_active_nodes': " << requestPtr.p->m_active_nodes.rep.data[0]);
// Sends the result of a batch to the requester. Scans get SCAN_FRAGCONF when
// m_errCode is 0 and SCAN_FRAGREF otherwise; the last statements send a
// TCKEYREF when m_errCode is set (presumably the lookup case).
// NOTE(review): the signature line is not visible in this excerpt; the call
// sendConf(signal, requestPtr, is_complete) elsewhere in this file suggests
// this is that function's body.
1279 if (requestPtr.p->isScan())
// Already waiting: this is only expected for a completed, aborting request.
1281 if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
1288 ndbrequire(is_complete);
1289 ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
1293 if (requestPtr.p->m_errCode == 0)
1297 reinterpret_cast<ScanFragConf*
>(signal->getDataPtrSend());
1298 conf->senderData = requestPtr.p->m_senderData;
1299 conf->transId1 = requestPtr.p->m_transId[0];
1300 conf->transId2 = requestPtr.p->m_transId[1];
1301 conf->completedOps = requestPtr.p->m_rows;
1302 conf->fragmentCompleted = is_complete ? 1 : 0;
// total_len carries the first word of the active-node bitmask.
1303 conf->total_len = requestPtr.p->m_active_nodes.rep.data[0];
1305 c_Counters.incr_counter(CI_SCAN_BATCHES_RETURNED, 1);
1306 c_Counters.incr_counter(CI_SCAN_ROWS_RETURNED, requestPtr.p->m_rows);
// Timing statistics, only when SPJ_TRACE_TIME is defined.
1308 #ifdef SPJ_TRACE_TIME
1309 Uint64 now = spj_now();
1310 Uint64 then = requestPtr.p->m_save_time;
1312 requestPtr.p->m_sum_rows += requestPtr.p->m_rows;
1313 requestPtr.p->m_sum_running += Uint32(now - then);
1314 requestPtr.p->m_cnt_batches++;
1315 requestPtr.p->m_save_time = now;
// The wait average divides by cnt - 1, and is reported as 0 when cnt is 1.
1319 Uint32 cnt = requestPtr.p->m_cnt_batches;
1320 ndbout_c(
"batches: %u avg_rows: %u avg_running: %u avg_wait: %u",
1322 (requestPtr.p->m_sum_rows / cnt),
1323 (requestPtr.p->m_sum_running / cnt),
1324 cnt == 1 ? 0 : requestPtr.p->m_sum_waiting / (cnt - 1));
// Reset the row count and mark the request as waiting.
1331 requestPtr.p->m_rows = 0;
1335 requestPtr.p->m_state |= Request::RS_WAITING;
1337 #ifdef DEBUG_SCAN_FRAGREQ
1338 ndbout_c(
"Dbspj::sendConf() sending SCAN_FRAGCONF ");
1339 printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
1343 sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
1344 ScanFragConf::SignalLength, JBB);
// Error case for scans: SCAN_FRAGREF with the stored error code.
1349 ndbrequire(is_complete);
1351 reinterpret_cast<ScanFragRef*
>(signal->getDataPtrSend());
1352 ref->senderData = requestPtr.p->m_senderData;
1353 ref->transId1 = requestPtr.p->m_transId[0];
1354 ref->transId2 = requestPtr.p->m_transId[1];
1355 ref->errorCode = requestPtr.p->m_errCode;
1357 sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
1358 ScanFragRef::SignalLength, JBB);
// An error code here results in a TCKEYREF to the result reference, with the
// request's sender as the route reference.
1363 ndbassert(is_complete);
1364 if (requestPtr.p->m_errCode)
1367 Uint32 resultRef = getResultRef(requestPtr);
1369 ref->connectPtr = requestPtr.p->m_senderData;
1370 ref->transId[0] = requestPtr.p->m_transId[0];
1371 ref->transId[1] = requestPtr.p->m_transId[1];
1372 ref->errorCode = requestPtr.p->m_errCode;
1375 sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);
// Walks the request's tree nodes and returns m_lookup_data.m_api_resultRef of
// a node whose OpInfo is g_LookupOpInfo.
// NOTE(review): the signature line and the not-found return are not visible
// in this excerpt; the call getResultRef(requestPtr) elsewhere in this file
// suggests this is that function's body.
1384 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1385 for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1387 if (nodePtr.p->m_info == &g_LookupOpInfo)
1390 return nodePtr.p->m_lookup_data.m_api_resultRef;
// Walks the tree nodes from last to first. For nodes that are not ancestors
// of an active node, buffered rows are released when the node has
// T_ROW_BUFFER set. Active nodes get cleanupChildBranch() called and
// contribute their ancestors to ancestors_of_active.
// NOTE(review): the signature line and parts of the conditions are not
// visible in this excerpt; the call releaseScanBuffers(requestPtr) elsewhere
// in this file suggests this is that function's body.
1401 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1402 TreeNodeBitMask ancestors_of_active;
1404 for (list.last(treeNodePtr); !treeNodePtr.isNull(); list.prev(treeNodePtr))
1410 if (!ancestors_of_active.get(treeNodePtr.p->m_node_no))
1412 if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
1415 releaseNodeRows(requestPtr, treeNodePtr);
1424 requestPtr.p->m_active_nodes.
get(treeNodePtr.p->m_node_no))
1427 cleanupChildBranch(requestPtr, treeNodePtr);
1436 requestPtr.p->m_active_nodes.
get(treeNodePtr.p->m_node_no))
1438 ancestors_of_active.bitOR(treeNodePtr.p->m_ancestors);
// At least one node must still be active at this point.
1445 ndbrequire(requestPtr.p->m_cnt_active >= 1);
// Marks a tree node as active in the request's m_active_nodes bitmask and
// adds it to the request's cursor-node list. The node must not be active
// already and must not already be on the list.
// NOTE(review): the signature line is not visible in this excerpt; calls of
// the form registerActiveCursor(requestPtr, nodePtr) suggest this is that
// function's body.
1451 Uint32 bit = treeNodePtr.p->m_node_no;
1452 ndbrequire(!requestPtr.p->m_active_nodes.
get(bit));
1453 requestPtr.p->m_active_nodes.
set(bit);
1455 Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
// Consistency check: the node is not yet a member of the cursor list.
1459 for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1461 ndbrequire(nodePtr.i != treeNodePtr.i);
1465 list.add(treeNodePtr);
// Iterates the dependent nodes of a tree node and recurses into each child.
// NOTE(review): the signature line and the per-child work between the lookup
// and the recursive call are not visible in this excerpt; the recursive call
// below gives the function name.
1472 Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
1473 Dependency_map::ConstDataBufferIterator it;
1474 for (list.first(it); !it.isNull(); list.next(it))
// Each map entry is the pool index of a child tree node.
1478 m_treenode_pool.
getPtr(childPtr, *it.data);
1485 cleanupChildBranch(requestPtr,childPtr);
// Releases the rows buffered for one tree node, using either the row-list
// iterator or the row-map iterator, and re-initialises that container
// afterwards. Expects RT_VAR_ALLOC on the request and T_ROW_BUFFER on the
// node.
// NOTE(review): the signature line, the condition choosing list or map and
// the iterator advance are not visible in this excerpt; the call
// releaseNodeRows(requestPtr, treeNodePtr) elsewhere in this file suggests
// this is that function's body.
1498 ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
1499 ndbassert(treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER);
1508 SLFifoRowListIterator iter;
1509 for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
// The row reference is copied before the row is released.
1512 RowRef pos = iter.m_ref;
1514 releaseRow(requestPtr, pos);
1517 treeNodePtr.p->m_row_list.init();
1518 DEBUG(
"SLFifoRowListIterator: released " << cnt <<
" rows!");
1524 RowMapIterator iter;
1525 for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
1528 RowRef pos = iter.m_ref;
1531 releaseRow(requestPtr, pos);
1534 treeNodePtr.p->m_row_map.init();
1535 DEBUG(
"RowMapIterator: released " << cnt <<
" rows!");
// Frees one row record on its Var_page and then branches on the page's
// free_space value.
// NOTE(review): the signature line and the page-list operations in both
// branches are only partly visible in this excerpt; the call
// releaseRow(requestPtr, pos) elsewhere in this file suggests this is that
// function's body.
1542 ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
1543 ndbassert(pos.m_allocator == 1);
1545 m_page_pool.
getPtr(ptr, pos.m_page_id);
1546 ((Var_page*)ptr.p)->free_record(pos.m_page_pos, Var_page::CHAIN);
1547 Uint32 free_space = ((Var_page*)ptr.p)->free_space;
1548 if (free_space == 0)
1552 requestPtr.p->m_rowBuffer.m_page_list);
// This page now has more free space than the buffer's recorded m_var.m_free.
1556 else if (free_space > requestPtr.p->m_rowBuffer.m_var.m_free)
1559 requestPtr.p->m_rowBuffer.m_page_list);
1562 requestPtr.p->m_rowBuffer.m_var.m_free = free_space;
// Releases the pages of the request's row buffer. The remaining statements
// re-initialise the row buffer and the row map / row list of the tree nodes
// that have T_ROW_BUFFER set.
// NOTE(review): how the `reset` parameter gates that re-initialisation is not
// visible in this excerpt.
1567 Dbspj::releaseRequestBuffers(
Ptr<Request> requestPtr,
bool reset)
1575 requestPtr.p->m_rowBuffer.m_page_list);
1576 if (!list.isEmpty())
1582 releasePages(first.i, last);
1586 requestPtr.p->m_rowBuffer.stack_init();
1592 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1593 for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1596 if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
1602 nodePtr.p->m_row_map.init();
1606 nodePtr.p->m_row_list.init();
// Iterates the dependent nodes of a tree node, resolving each entry to a
// child TreeNode and requiring that the child has an OpInfo.
// NOTE(review): the function name, its signature and the rest of the per-child
// work are not visible in this excerpt.
1618 Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
1619 Dependency_map::ConstDataBufferIterator it;
1620 for (list.first(it); !it.isNull(); list.next(it))
1624 m_treenode_pool.
getPtr(childPtr, * it.data);
1628 ndbrequire(childPtr.p->m_info != 0 &&
// Aborts a request: sets RS_ABORTING and the error code, calls m_abort on
// every tree node that provides it, then calls checkBatchComplete() with a
// count of 0. The first test detects a request that is already aborting
// (presumably returning early; that body is not visible in this excerpt).
// NOTE(review): the signature line is not visible; calls of the form
// abort(signal, requestPtr, errCode) elsewhere in this file suggest this is
// that function's body.
1642 if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
1648 requestPtr.p->m_state |= Request::RS_ABORTING;
1649 requestPtr.p->m_errCode = errCode;
1653 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1654 for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1663 ndbrequire(nodePtr.p->m_info != 0);
1664 if (nodePtr.p->m_info->
m_abort != 0)
1667 (this->*(nodePtr.p->m_info->
m_abort))(signal, requestPtr, nodePtr);
1673 checkBatchComplete(signal, requestPtr, 0);
// Applies a node failure to one request: visits the tree nodes, and aborts
// the request with DbspjErr::NodeFailure, among other cases when it is a scan
// whose sender lives on one of the failed nodes.
// NOTE(review): the signature line, the per-node work and the return value
// are not visible in this excerpt; the call
// nodeFail(signal, requestPtr, failed) in nodeFail_checkRequests suggests
// this is that function's body.
1685 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1686 for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1689 ndbrequire(nodePtr.p->m_info != 0);
1709 if (requestPtr.p->isScan() &&
1710 nodes.
get(refToNode(requestPtr.p->m_senderRef)))
1713 abort(signal, requestPtr, DbspjErr::NodeFailure);
1719 abort(signal, requestPtr, DbspjErr::NodeFailure);
// Runs the complete phase of a request: the state becomes RS_COMPLETING while
// keeping the RS_ABORTING / RS_WAITING bits, RT_NEED_COMPLETE is cleared, the
// m_complete function of the tree nodes is called and finally
// checkBatchComplete() is called with a count of 0.
// NOTE(review): the signature line is not visible in this excerpt; the call
// complete(signal, requestPtr) elsewhere in this file suggests this is that
// function's body.
1731 Uint32
flags = requestPtr.p->m_state &
1732 (Request::RS_ABORTING | Request::RS_WAITING);
1734 requestPtr.p->m_state = Request::RS_COMPLETING |
flags;
1738 ndbassert((requestPtr.p->m_bits & Request::RT_NEED_COMPLETE) != 0);
1739 requestPtr.p->m_bits &= ~(Uint32)Request::RT_NEED_COMPLETE;
1740 requestPtr.p->m_outstanding = 0;
1743 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1744 for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1747 ndbrequire(nodePtr.p->m_info != 0);
// NOTE(review): any guard on m_complete being non-null is not visible here.
1751 (this->*(nodePtr.p->m_info->
m_complete))(signal, requestPtr, nodePtr);
1765 checkBatchComplete(signal, requestPtr, 0);
// Tears down a request with no active nodes: calls m_cleanup on every tree
// node, removes the request from the scan or lookup hash, releases its row
// buffers, and gives the request record and its arena back.
// NOTE(review): the signature line is not visible in this excerpt; the call
// cleanup(requestPtr) elsewhere in this file suggests this is that function's
// body.
1771 ndbrequire(requestPtr.p->m_cnt_active == 0);
1774 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
// The loop header does not advance the iterator itself.
1775 for (list.first(nodePtr); !nodePtr.isNull(); )
1778 ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->
m_cleanup != 0);
1779 (this->*(nodePtr.p->m_info->
m_cleanup))(requestPtr, nodePtr);
1787 if (requestPtr.p->isScan())
// A scan that is still waiting is put in state RS_ABORTED.
// NOTE(review): whether this path returns before the teardown below is not
// visible in this excerpt.
1791 if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
1794 requestPtr.p->m_state = Request::RS_ABORTED;
// The hash key consists of the transaction id and the sender data.
1801 key.m_transId[0] = requestPtr.p->m_transId[0];
1802 key.m_transId[1] = requestPtr.p->m_transId[1];
1803 key.m_senderData = requestPtr.p->m_senderData;
1805 ndbrequire(m_scan_request_hash.
find(tmp, key));
1808 m_scan_request_hash.
remove(requestPtr);
// Same removal for the lookup hash.
1816 key.m_transId[0] = requestPtr.p->m_transId[0];
1817 key.m_transId[1] = requestPtr.p->m_transId[1];
1818 key.m_senderData = requestPtr.p->m_senderData;
1820 ndbrequire(m_lookup_request_hash.
find(tmp, key));
1823 m_lookup_request_hash.
remove(requestPtr);
1825 releaseRequestBuffers(requestPtr,
false);
1827 m_request_pool.
release(requestPtr);
1828 m_arenaAllocator.release(ah);
// Per-node cleanup: touches the node's dependency map, key pattern and
// attr-param pattern stores, and releases the key-info and attr-info sections
// held in m_send when they are set (not RNIL).
// NOTE(review): the function name, its signature and the operations performed
// on the three local store objects are not visible in this excerpt.
1838 Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
1843 Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
1848 Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
1852 if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL)
1855 releaseSection(treeNodePtr.p->m_send.m_keyInfoPtrI);
1858 if (treeNodePtr.p->m_send.m_attrInfoPtrI != RNIL)
1861 releaseSection(treeNodePtr.p->m_send.m_attrInfoPtrI);
// Handles LQHKEYREF: resolves the tree node from ref->connectPtr and the
// request from the node, and requires that the node's OpInfo provides
// m_execLQHKEYREF (presumably called next; not visible in this excerpt).
1869 Dbspj::execLQHKEYREF(
Signal* signal)
1875 DEBUG(
"execLQHKEYREF, errorCode:" << ref->errorCode);
1877 m_treenode_pool.
getPtr(treeNodePtr, ref->connectPtr);
1880 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1882 ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->
m_execLQHKEYREF);
// Handles LQHKEYCONF: resolves the tree node from conf->opPtr and the request
// from the node, and requires that the node's OpInfo provides
// m_execLQHKEYCONF (presumably called next; not visible in this excerpt).
1889 Dbspj::execLQHKEYCONF(
Signal* signal)
1893 DEBUG(
"execLQHKEYCONF");
1897 m_treenode_pool.
getPtr(treeNodePtr, conf->opPtr);
1900 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1902 ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->
m_execLQHKEYCONF);
// Handles SCAN_FRAGREF: resolves scan fragment handle (from ref->senderData)
// -> tree node -> request. The dispatch that follows is not visible in this
// excerpt.
1909 Dbspj::execSCAN_FRAGREF(
Signal* signal)
1914 DEBUG(
"execSCAN_FRAGREF, errorCode:" << ref->errorCode);
1917 m_scanfraghandle_pool.
getPtr(scanFragHandlePtr, ref->senderData);
1919 m_treenode_pool.
getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
1921 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
// Forwards SCAN_HBREP to the requester: resolves scan fragment handle -> tree
// node -> request, replaces theData[0] with the request's sender data and
// sends 3 words to the request's sender. Words 1 and 2 are passed on as
// received.
1931 Dbspj::execSCAN_HBREP(
Signal* signal)
1935 Uint32 senderData = signal->theData[0];
1939 m_scanfraghandle_pool.
getPtr(scanFragHandlePtr, senderData);
1941 m_treenode_pool.
getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
1943 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1945 Uint32 ref = requestPtr.p->m_senderRef;
1946 signal->theData[0] = requestPtr.p->m_senderData;
1947 sendSignal(ref, GSN_SCAN_HBREP, signal, 3, JBB);
// Handles SCAN_FRAGCONF: resolves scan fragment handle (from
// conf->senderData) -> tree node -> request. The dispatch that follows is not
// visible in this excerpt.
1951 Dbspj::execSCAN_FRAGCONF(
Signal* signal)
1954 DEBUG(
"execSCAN_FRAGCONF");
1958 #ifdef DEBUG_SCAN_FRAGREQ
1959 ndbout_c(
"Dbspj::execSCAN_FRAGCONF() receiveing SCAN_FRAGCONF ");
1960 printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
1966 m_scanfraghandle_pool.
getPtr(scanFragHandlePtr, conf->senderData);
1968 m_treenode_pool.
getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
1970 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
// Handles SCAN_NEXTREQ from the requester: finds the scan request by
// transaction id and sender data, clears RS_WAITING, and then either finishes
// an aborted request, aborts on ZCLOSE, or asks the cursor nodes for the next
// batch.
1980 Dbspj::execSCAN_NEXTREQ(
Signal* signal)
1985 DEBUG(
"Incomming SCAN_NEXTREQ");
1986 #ifdef DEBUG_SCAN_FRAGREQ
1987 printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
1988 ScanFragNextReq::SignalLength, DBLQH);
1992 key.m_transId[0] = req->transId1;
1993 key.m_transId[1] = req->transId2;
1994 key.m_senderData = req->senderData;
// An unknown request is only accepted for a close.
1997 if (unlikely(!m_scan_request_hash.
find(requestPtr, key)))
2000 ndbrequire(req->requestInfo == ScanFragNextReq::ZCLOSE);
// Timing statistics, only when SPJ_TRACE_TIME is defined.
2004 #ifdef SPJ_TRACE_TIME
2005 Uint64 now = spj_now();
2006 Uint64 then = requestPtr.p->m_save_time;
2007 requestPtr.p->m_sum_waiting += Uint32(now - then);
2008 requestPtr.p->m_save_time = now;
// `state` keeps the value from before RS_WAITING was cleared.
2011 Uint32 state = requestPtr.p->m_state;
2012 requestPtr.p->m_state = state & ~Uint32(Request::RS_WAITING);
2014 if (unlikely(state == Request::RS_ABORTED))
2017 batchComplete(signal, requestPtr);
2021 if (unlikely((state & Request::RS_ABORTING) != 0))
// A close request aborts the scan with error code 0.
2031 if (req->requestInfo == ScanFragNextReq::ZCLOSE)
2034 abort(signal, requestPtr, 0);
// Normal case: the request was waiting and has nothing outstanding.
2038 ndbrequire((state & Request::RS_WAITING) != 0);
2039 ndbrequire(requestPtr.p->m_outstanding == 0);
2046 Local_TreeNodeCursor_list list(m_treenode_pool,
2047 requestPtr.p->m_cursor_nodes);
2048 Uint32 cnt_active = 0;
2050 for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
2055 DEBUG(
"SCAN_NEXTREQ on TreeNode: " << treeNodePtr.i
2056 <<
", m_node_no: " << treeNodePtr.p->m_node_no
2057 <<
", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);
2059 ndbrequire(treeNodePtr.p->m_info != 0 &&
// This path is only valid for requests with RT_REPEAT_SCAN_RESULT.
2073 ndbrequire(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT);
2074 DEBUG(
" Restart TreeNode: " << treeNodePtr.i
2075 <<
", m_node_no: " << treeNodePtr.p->m_node_no
2076 <<
", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);
2078 ndbrequire(treeNodePtr.p->m_info != 0 &&
// With RT_REPEAT_SCAN_RESULT, cnt_active must be exactly 1.
2086 ndbrequire(cnt_active == 1 ||
2087 !(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT));
// Handles TRANSID_AI (a result row): resolves the tree node from
// req->connectPtr and its request, builds a row header over section 0,
// extracts the correlation data, stores the row when the node has
// T_ROW_BUFFER set, and requires that the node's OpInfo provides
// m_execTRANSID_AI (presumably called next; not visible in this excerpt).
2092 Dbspj::execTRANSID_AI(
Signal* signal)
2095 DEBUG(
"execTRANSID_AI");
2097 Uint32 ptrI = req->connectPtr;
2101 m_treenode_pool.
getPtr(treeNodePtr, ptrI);
2103 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
// The row data must arrive in a section.
2105 ndbrequire(signal->getNoOfSections() != 0);
2110 handle.getSection(dataPtr, 0);
2114 #if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
2115 printf(
"execTRANSID_AI: ");
2116 print(dataPtr, stdout);
// The header is built in a local buffer of 2 + MAX_ATTRIBUTES_IN_TABLE words.
// NOTE(review): the bound on header->m_len is only asserted after
// buildRowHeader() has filled the buffer - confirm the buffer cannot overflow.
2122 Uint32 tmp[2+MAX_ATTRIBUTES_IN_TABLE];
2123 RowPtr::Header* header = CAST_PTR(RowPtr::Header, &tmp[0]);
2125 Uint32 cnt = buildRowHeader(header, dataPtr);
2126 ndbassert(header->m_len < NDB_ARRAY_SIZE(tmp));
// Describe the row as section-based, referring to the header and the data
// section.
2129 row.m_type = RowPtr::RT_SECTION;
2130 row.m_src_node_ptrI = treeNodePtr.i;
2131 row.m_row_data.m_section.m_header = header;
2132 row.m_row_data.m_section.m_dataPtr.assign(dataPtr);
2134 getCorrelationData(row.m_row_data.m_section,
2136 row.m_src_correlation);
// Storing the row must succeed.
2138 if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
2141 Uint32 err = storeRow(requestPtr, treeNodePtr, row);
2142 ndbrequire(err == 0);
2145 ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->
m_execTRANSID_AI);
// Copies a section-based row (header followed by data) into the request's
// row buffer and rewrites `row` to describe the linear copy. Returns
// DbspjErr::OutOfRowMemory when no space could be allocated.
// NOTE(review): the signature line and the conditions around the last two
// statements are not visible in this excerpt; the call
// storeRow(requestPtr, treeNodePtr, row) in execTRANSID_AI suggests this is
// that function's body.
2157 ndbassert(row.m_type == RowPtr::RT_SECTION);
// The stored header takes 1 + m_len words.
2159 Uint32 * headptr = (Uint32*)row.m_row_data.m_section.m_header;
2160 Uint32 headlen = 1 + row.m_row_data.m_section.m_header->m_len;
2169 totlen += dataPtr.sz;
// Without RT_VAR_ALLOC the stack allocator is used.
2174 Uint32 * dstptr = 0;
2175 if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
2178 dstptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
2183 dstptr = varAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
2186 if (unlikely(dstptr == 0))
2189 return DbspjErr::OutOfRowMemory;
// Stored layout: `linklen` words, then the header, then the row data.
2192 row.m_type = RowPtr::RT_LINEAR;
2193 row.m_row_data.m_linear.m_row_ref = ref;
2194 row.m_row_data.m_linear.m_header = (RowPtr::Header*)(dstptr + linklen);
2195 row.m_row_data.m_linear.m_data = dstptr + linklen + headlen;
// memcpy takes bytes, hence 4 * headlen.
2197 memcpy(dstptr + linklen, headptr, 4 * headlen);
2198 copy(dstptr + linklen + headlen, dataPtr);
// The null row reference is written at the start of the stored row.
2203 NullRowRef.copyto_link(dstptr);
2209 return add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
// Fills `row` as a linear row located at `src`: the header starts `linklen`
// words into `src` and the data follows the 1 + m_len header words.
// NOTE(review): the function name and the first part of the signature are not
// visible in this excerpt.
2217 RowPtr& row, RowRef ref,
const Uint32 * src)
2221 const RowPtr::Header * headptr = (RowPtr::Header*)(src + linklen);
2222 Uint32 headlen = 1 + headptr->m_len;
2224 row.m_type = RowPtr::RT_LINEAR;
2225 row.m_row_data.m_linear.m_row_ref = ref;
2226 row.m_row_data.m_linear.m_header = headptr;
2227 row.m_row_data.m_linear.m_data = (Uint32*)headptr + headlen;
// Fragment (header and branch conditions not visible): records `rowref` in a
// row list that tracks the page id/position of its first and last row.
// NOTE(review): presumably the first two assignments run only when the list
// is empty and the remainder otherwise -- confirm against the full source.
2236 list.m_first_row_page_id = rowref.m_page_id;
2237 list.m_first_row_page_pos = rowref.m_page_pos;
// Build a reference to the current last row, using rowref's allocator tag.
2246 last.m_allocator = rowref.m_allocator;
2247 last.m_page_id = list.m_last_row_page_id;
2248 last.m_page_pos = list.m_last_row_page_pos;
2250 if (rowref.m_allocator == 0)
2253 rowptr = get_row_ptr_stack(last);
2258 rowptr = get_row_ptr_var(last);
// Write rowref into the link slot of the previous last row.
2260 rowref.copyto_link(rowptr);
// rowref becomes the new last row of the list.
2263 list.m_last_row_page_id = rowref.m_page_id;
2264 list.m_last_row_page_pos = rowref.m_page_pos;
// Resolves a RowRef whose allocator tag is 0 to a pointer: fetches the page
// pos.m_page_id from m_page_pool and offsets its m_data by pos.m_page_pos.
2268 Dbspj::get_row_ptr_stack(RowRef pos)
2270 ndbassert(pos.m_allocator == 0);
2272 m_page_pool.
getPtr(ptr, pos.m_page_id);
2273 return ptr.p->m_data + pos.m_page_pos;
// Resolves a RowRef whose allocator tag is 1 to a pointer: fetches the page
// pos.m_page_id from m_page_pool, treats it as a Var_page and asks it for the
// record at index pos.m_page_pos.
2277 Dbspj::get_row_ptr_var(RowRef pos)
2279 ndbassert(pos.m_allocator == 1);
2281 m_page_pool.
getPtr(ptr, pos.m_page_id);
2282 return ((Var_page*)ptr.p)->get_ptr(pos.m_page_pos);
// Fragment (start of the signature not visible): positions `iter` on the first
// row of the tree node's row list (treeNodePtr.p->m_row_list).
2287 SLFifoRowListIterator& iter)
// var is 1 when the request uses the var allocator, 0 otherwise.
2289 Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2290 SLFifoRowList & list = treeNodePtr.p->m_row_list;
2298 iter.m_ref.m_allocator = var;
2299 iter.m_ref.m_page_id = list.m_first_row_page_id;
2300 iter.m_ref.m_page_pos = list.m_first_row_page_pos;
// The row pointer is resolved with the accessor matching the allocator
// (the selecting condition is not visible in this view).
2304 iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2309 iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
// Advances `iter` to the next row of a row list by following the link stored
// at the current row. A null link is tested for; the statements executed in
// that case are not visible in this view.
2316 Dbspj::next(SLFifoRowListIterator& iter)
// Load the reference to the next row from the current row's link slot.
2318 iter.m_ref.assign_from_link(iter.m_row_ptr);
2319 if (iter.m_ref.isNull())
2325 if (iter.m_ref.m_allocator == 0)
2328 iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2333 iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
// Fragment (start of the signature not visible): re-resolves iter.m_row_ptr
// from the saved position `start`.
// NOTE(review): the allocator is taken from iter.m_ref while the position comes
// from start.m_ref; any assignment of iter.m_ref from start is not visible here.
2340 SLFifoRowListIterator& iter, SLFifoRowListIteratorPtr start)
2342 Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2344 ndbassert(var == iter.m_ref.m_allocator);
2345 if (iter.m_ref.m_allocator == 0)
2348 iter.m_row_ptr = get_row_ptr_stack(start.m_ref);
2353 iter.m_row_ptr = get_row_ptr_var(start.m_ref);
// Fragment (start of the signature not visible): stores `rowref` in the tree
// node's row map (treeNodePtr.p->m_row_map) at the slot given by the low 16
// bits of `corrVal`. Returns DbspjErr::OutOfRowMemory if the map storage
// cannot be allocated.
2360 Uint32 corrVal, RowRef rowref)
2363 RowMap& map = treeNodePtr.p->m_row_map;
// Map storage is sized from the node's batch size: sz16 is a count of
// 16-bit units, sz32 the same rounded up to 32-bit words.
2367 Uint16 batchsize = treeNodePtr.p->m_batch_size;
2368 Uint32 sz16 = RowMap::MAP_SIZE_PER_REF_16 * batchsize;
2369 Uint32 sz32 = (sz16 + 1) / 2;
2371 if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
2374 mapptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
2379 mapptr = varAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
2381 if (unlikely(mapptr == 0))
2384 return DbspjErr::OutOfRowMemory;
2388 map.m_size = batchsize;
// Resolve the map storage through `ref` with the accessor matching its allocator.
2396 if (ref.m_allocator == 0)
2399 mapptr = get_row_ptr_stack(ref);
2404 mapptr = get_row_ptr_var(ref);
// The slot index is the low 16 bits of the correlation value.
2408 Uint32 pos = corrVal & 0xFFFF;
2409 ndbrequire(pos < map.m_size);
2410 ndbrequire(map.m_elements < map.m_size);
// The loaded slot must have m_page_pos == 0xFFFF before it is overwritten
// (presumably the marker for an unused slot -- confirm).
2418 map.load(mapptr, pos, check);
2419 ndbrequire(check.m_page_pos == 0xFFFF);
2422 map.store(mapptr, pos, rowref);
// Fragment (start of the signature not visible): positions `iter` on the first
// non-null slot of the tree node's row map and resolves the row pointer for it.
2429 RowMapIterator & iter)
// var is 1 when the request uses the var allocator, 0 otherwise.
2431 Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2432 RowMap& map = treeNodePtr.p->m_row_map;
2443 iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
2448 iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
2450 iter.m_size = map.m_size;
2451 iter.m_ref.m_allocator = var;
// Skip null slots. The bounds check must come first: with the operands the
// other way round, the slot at index m_size (one past the map) was inspected
// before the loop terminated.
2454 while (pos < iter.m_size && RowMap::isNull(iter.m_map_ptr, pos))
// pos == m_size means no populated slot was found.
2457 if (pos == iter.m_size)
2466 RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
2467 iter.m_element_no = pos;
2471 iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2476 iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
// Advances `iter` to the next non-null slot of the row map after the current
// element and resolves the row pointer for it.
2483 Dbspj::next(RowMapIterator & iter)
2485 Uint32 pos = iter.m_element_no + 1;
// Skip null slots. The bounds check must come first: when the current element
// is the last slot, pos already equals m_size here, and testing isNull first
// read one slot past the end of the map.
2486 while (pos < iter.m_size && RowMap::isNull(iter.m_map_ptr, pos))
// pos == m_size means there is no further populated slot.
2489 if (pos == iter.m_size)
2498 RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
2499 iter.m_element_no = pos;
2500 if (iter.m_ref.m_allocator == 0)
2503 iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2508 iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
// Fragment (start of the signature not visible): re-positions `iter` on the
// map slot saved in `start` (start.m_element_no). Requires a non-null map.
2516 RowMapIterator & iter, RowMapIteratorPtr start)
2518 Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2519 RowMap& map = treeNodePtr.p->m_row_map;
2520 ndbrequire(!map.isNull());
2525 iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
2530 iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
2532 iter.m_size = map.m_size;
// Reload the row reference stored at the saved slot.
2534 RowMap::load(iter.m_map_ptr, start.m_element_no, iter.m_ref);
2535 iter.m_element_no = start.m_element_no;
// Allocates `sz` words from the stack-style part of `buffer` by advancing
// buffer.m_stack.m_pos, and describes the allocation in `dst` (allocator tag 0).
// Returns a pointer to the allocated words inside the page.
2540 Dbspj::stackAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
2545 Uint32 pos = buffer.m_stack.m_pos;
2546 const Uint32 SIZE = RowPage::SIZE;
// A new page is requested when there is no page yet or the request does not
// fit behind the current position.
2547 if (list.isEmpty() || (pos + sz) > SIZE)
2550 bool ret = allocPage(ptr);
// Handling of a failed page allocation is not visible in this view.
2551 if (unlikely(ret ==
false))
2565 dst.m_page_id = ptr.i;
2566 dst.m_page_pos = pos;
2567 dst.m_allocator = 0;
2568 buffer.m_stack.m_pos = pos + sz;
2569 return ptr.p->m_data + pos;
// Allocates a record of `sz` words from a Var_page belonging to `buffer` and
// describes the allocation in `dst` (allocator tag 1). Returns a pointer to the
// record obtained from the page.
2573 Dbspj::varAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
2578 Uint32 free_space = buffer.m_var.m_free;
// A new page is requested when there is no page yet or fewer than sz + 1
// words are free.
2579 if (list.isEmpty() || free_space < (sz + 1))
2582 bool ret = allocPage(ptr);
// Handling of a failed page allocation is not visible in this view.
2583 if (unlikely(ret ==
false))
// The freshly obtained page is initialised as a Var_page.
2590 ((Var_page*)ptr.p)->init();
2598 Var_page * vp = (Var_page*)ptr.p;
2599 Uint32 pos = vp->alloc_record(sz, (Var_page*)m_buffer0, Var_page::CHAIN);
2601 dst.m_page_id = ptr.i;
2602 dst.m_page_pos = pos;
2603 dst.m_allocator = 1;
// Cache the page's remaining free space for the next call's fit check.
2604 buffer.m_var.m_free = vp->free_space;
2605 return vp->get_ptr(pos);
// Fragment (signature not visible): obtains a row page. When the block's free
// page list is empty a page is requested from the global memory manager;
// a page is also taken from the front of a list via remove_front
// (presumably the free list on the other branch -- confirm).
2611 if (m_free_page_list.firstItem == RNIL)
2614 ptr.p = (RowPage*)m_ctx.m_mm.alloc_page(RT_SPJ_DATABUFFER,
2616 Ndbd_mem_manager::NDB_ZONE_ANY);
2627 bool ret = list.remove_front(ptr);
// NOTE(review): the statement below precedes the releaseGlobal signature in
// this view; it presumably belongs to an earlier definition whose header is
// not visible.
2644 list.add(first, last);
// releaseGlobal: takes a page from the front of a list, returns it to the
// global memory manager, and schedules a delayed CONTINUEB signal to this
// block with theData[0] == 0. Loop/branch structure is not visible here.
2648 Dbspj::releaseGlobal(
Signal * signal)
2660 list.remove_front(ptr);
2661 m_ctx.m_mm.release_page(RT_SPJ_DATABUFFER, ptr.i);
2664 signal->theData[0] = 0;
2665 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, delay, 1);
// Table of member-function pointers implementing the lookup operation.
// lookup_build installs its address in the tree node's m_info.
// NOTE(review): some entries of the initializer are not visible in this view.
2676 Dbspj::g_LookupOpInfo =
2678 &Dbspj::lookup_build,
2680 &Dbspj::lookup_start,
2681 &Dbspj::lookup_execTRANSID_AI,
2682 &Dbspj::lookup_execLQHKEYREF,
2683 &Dbspj::lookup_execLQHKEYCONF,
2686 &Dbspj::lookup_parent_row,
2687 &Dbspj::lookup_parent_batch_complete,
2692 &Dbspj::lookup_abort,
2693 &Dbspj::lookup_execNODE_FAILREP,
2694 &Dbspj::lookup_cleanup
// Builds the tree node for a lookup operation: creates the node, fills in an
// LqhKeyReq-style request (`dst`), validates the node and parameter lengths,
// parses their variable parts with parseDA(), and, when ctx.m_start_signal is
// set, takes routing and hash fields from that signal (`src`).
// NOTE(review): the bindings of `dst`, `src`, `node` and `param` and the
// error exits are not visible in this view.
2698 Dbspj::lookup_build(Build_context& ctx,
2709 err = createNode(ctx, requestPtr, treeNodePtr);
2710 if (unlikely(err != 0))
2716 treeNodePtr.p->m_info = &g_LookupOpInfo;
2717 Uint32 transId1 = requestPtr.p->m_transId[0];
2718 Uint32 transId2 = requestPtr.p->m_transId[1];
2719 Uint32 savePointId = ctx.m_savepointId;
2721 Uint32 treeBits = node->requestInfo;
2722 Uint32 paramBits = param->requestInfo;
2730 dst->tcBlockref = reference();
2731 dst->clientConnectPtr = treeNodePtr.i;
2739 dst->transId1 = transId1;
2740 dst->transId2 = transId2;
2741 dst->savePointId = savePointId;
2745 dst->variableData[0] = ctx.m_resultRef;
2746 dst->variableData[1] = param->resultData;
// Request flags: a dirty, simple read with application address and
// correlation factor set, and the normal protocol flag cleared.
2747 Uint32 requestInfo = 0;
2748 LqhKeyReq::setOperation(requestInfo, ZREAD);
2749 LqhKeyReq::setApplicationAddressFlag(requestInfo, 1);
2750 LqhKeyReq::setDirtyFlag(requestInfo, 1);
2751 LqhKeyReq::setSimpleFlag(requestInfo, 1);
2752 LqhKeyReq::setNormalProtocolFlag(requestInfo, 0);
2753 LqhKeyReq::setCorrFactorFlag(requestInfo, 1);
2754 LqhKeyReq::setNoDiskFlag(requestInfo,
2757 dst->requestInfo = requestInfo;
// err is pre-set before the length check (presumably so the failure path,
// not visible here, reports this code).
2760 err = DbspjErr::InvalidTreeNodeSpecification;
2761 if (unlikely(node->len < QN_LookupNode::NodeSize))
2773 Uint32 tableId = node->tableId;
2774 Uint32 schemaVersion = node->tableVersion;
// Table id plus the schema version shifted into the upper 16 bits.
2776 Uint32 tableSchemaVersion = tableId + ((schemaVersion << 16) & 0xFFFF0000);
2777 dst->tableSchemaVersion = tableSchemaVersion;
2779 err = DbspjErr::InvalidTreeParametersSpecification;
2780 DEBUG(
"param len: " << param->len);
2781 if (unlikely(param->len < QN_LookupParameters::NodeSize))
2787 ctx.m_resultData = param->resultData;
2788 treeNodePtr.p->m_lookup_data.m_api_resultRef = ctx.m_resultRef;
2789 treeNodePtr.p->m_lookup_data.m_api_resultData = param->resultData;
// The variable parts are whatever follows the fixed NodeSize words.
2796 struct DABuffer nodeDA, paramDA;
2798 nodeDA.end = nodeDA.ptr + (node->len - QN_LookupNode::NodeSize);
2800 paramDA.end = paramDA.ptr + (param->len - QN_LookupParameters::NodeSize);
2801 err = parseDA(ctx, requestPtr, treeNodePtr,
2802 nodeDA, treeBits, paramDA, paramBits);
2803 if (unlikely(err != 0))
2809 if (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED)
2812 LqhKeyReq::setInterpretedFlag(dst->requestInfo, 1);
// Batch size defaults to 1 and is inherited from the parent node when one exists.
2818 treeNodePtr.p->m_batch_size = 1;
2819 if (treeNodePtr.p->m_parentPtrI != RNIL)
2823 m_treenode_pool.
getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
2824 treeNodePtr.p->m_batch_size = parentPtr.p->m_batch_size;
2827 if (ctx.m_start_signal)
2830 Signal * signal = ctx.m_start_signal;
// Two ways of choosing the DBLQH reference are visible; the condition that
// selects between them is not.
2834 blockToInstance(signal->header.theReceiversBlockNumber);
2835 treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH,
2836 instanceNo, getOwnNodeId());
2838 treeNodePtr.p->m_send.m_ref =
2839 numberToRef(DBLQH, getInstanceKey(src->tableSchemaVersion & 0xFFFF,
2840 src->fragmentData & 0xFFFF),
2844 Uint32 hashValue = src->hashValue;
2845 Uint32 fragId = src->fragmentData;
2846 Uint32 requestInfo = src->requestInfo;
2847 Uint32 attrLen = src->attrLen;
// Debug-build sanity checks on the fields of the incoming request.
2852 ndbassert(LqhKeyReq::getAttrLen(attrLen) == 0);
2853 ndbassert(LqhKeyReq::getScanTakeOverFlag(attrLen) == 0);
2854 ndbassert(LqhKeyReq::getReorgFlag(attrLen) == 0);
2855 ndbassert(LqhKeyReq::getOperation(requestInfo) == ZREAD);
2856 ndbassert(LqhKeyReq::getKeyLen(requestInfo) == 0);
2857 ndbassert(LqhKeyReq::getMarkerFlag(requestInfo) == 0);
2858 ndbassert(LqhKeyReq::getAIInLqhKeyReq(requestInfo) == 0);
2859 ndbassert(LqhKeyReq::getSeqNoReplica(requestInfo) == 0);
2860 ndbassert(LqhKeyReq::getLastReplicaNo(requestInfo) == 0);
2861 ndbassert(LqhKeyReq::getApplicationAddressFlag(requestInfo) != 0);
2862 ndbassert(LqhKeyReq::getSameClientAndTcFlag(requestInfo) == 0);
2868 static Uint8 getDirtyFlag(
const UintR & requestInfo);
2869 static Uint8 getSimpleFlag(
const UintR & requestInfo);
// The interpreted and no-disk flags of the incoming request must agree with
// those already set in dst.
2872 Uint32 dst_requestInfo = dst->requestInfo;
2873 ndbassert(LqhKeyReq::getInterpretedFlag(requestInfo) ==
2874 LqhKeyReq::getInterpretedFlag(dst_requestInfo));
2875 ndbassert(LqhKeyReq::getNoDiskFlag(requestInfo) ==
2876 LqhKeyReq::getNoDiskFlag(dst_requestInfo));
2878 dst->hashValue = hashValue;
2879 dst->fragmentData = fragId;
2880 dst->attrLen = attrLen;
2882 treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i;
// Starts a lookup node by sending its request via lookup_send().
2892 Dbspj::lookup_start(
Signal* signal,
2896 lookup_send(signal, requestPtr, treeNodePtr);
// Sends the LQHKEYREQ for a lookup node: copies the request stored in the tree
// node (m_lookup_data.m_lqhKeyReq) into the signal, attaches the key-info and
// attr-info sections, updates the read counters and sends to m_send.m_ref.
// For the leaf of a lookup request a TCKEYCONF is also built and sent.
// NOTE(review): several branch conditions are not visible in this view.
2900 Dbspj::lookup_send(
Signal* signal,
2907 if (treeNodePtr.p->isLeaf())
2910 if (requestPtr.p->isLookup())
2924 memcpy(req, treeNodePtr.p->m_lookup_data.m_lqhKeyReq,
2925 sizeof(treeNodePtr.p->m_lookup_data.m_lqhKeyReq));
2927 req->variableData[3] = requestPtr.p->m_rootResultData;
// Unless this is the leaf of a lookup request, the normal protocol flag is
// set and variableData[0..1] carry this block's reference and the node index.
2929 if (!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()))
2932 LqhKeyReq::setNormalProtocolFlag(req->requestInfo, 1);
2933 req->variableData[0] = reference();
2934 req->variableData[1] = treeNodePtr.i;
2943 req->tcBlockref = requestPtr.p->m_senderRef;
2948 Uint32 ref = treeNodePtr.p->m_send.m_ref;
2949 Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI;
2950 Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
// Several section-handling paths follow: the stored section indexes are
// cleared and/or the sections are duplicated with dupSection() before sending.
2958 treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
2959 treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
2967 ndbrequire(dupSection(tmp, keyInfoPtrI));
2973 treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
2980 ndbrequire(dupSection(tmp, attrInfoPtrI));
2986 treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
// Section 0 is the key info, section 1 the attr info.
2990 getSection(handle.m_ptr[0], keyInfoPtrI);
2991 getSection(handle.m_ptr[1], attrInfoPtrI);
2994 #if defined DEBUG_LQHKEYREQ
2995 ndbout_c(
"LQHKEYREQ to %x", ref);
2996 printLQHKEYREQ(stdout, signal->getDataPtrSend(),
2997 NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
2999 printf(
"KEYINFO: ");
3000 print(handle.m_ptr[0], stdout);
3001 printf(
"ATTRINFO: ");
3002 print(handle.m_ptr[1], stdout);
// Count the read as local or remote depending on the destination node.
3005 Uint32 Tnode = refToNode(ref);
3006 if (Tnode == getOwnNodeId())
3008 c_Counters.incr_counter(CI_LOCAL_READS_SENT, 1);
3012 c_Counters.incr_counter(CI_REMOTE_READS_SENT, 1);
// Destination node not in c_alive_nodes: release the sections and abort the
// request with NodeFailure.
3015 if (unlikely(!c_alive_nodes.
get(Tnode)))
3018 releaseSections(handle);
3019 abort(signal, requestPtr, DbspjErr::NodeFailure);
// Otherwise, unless this is the leaf of a lookup request, account for the
// outstanding replies, both in total and per destination node.
3022 else if (! (treeNodePtr.p->isLeaf() && requestPtr.p->isLookup()))
3025 ndbassert(Tnode < NDB_ARRAY_SIZE(requestPtr.p->m_lookup_node_data));
3026 requestPtr.p->m_outstanding += cnt;
3027 requestPtr.p->m_lookup_node_data[Tnode] += cnt;
3029 ndbrequire(! (requestPtr.p->m_lookup_node_data[Tnode] == 0));
3032 sendSignal(ref, GSN_LQHKEYREQ, signal,
3033 NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
// Leaf of a lookup request: build a TCKEYCONF for one operation and send it
// through sendTCKEYCONF().
3037 if (requestPtr.p->isLookup() && treeNodePtr.p->isLeaf())
3044 Uint32 resultRef = req->variableData[0];
3045 Uint32 resultData = req->variableData[1];
3048 conf->apiConnectPtr = RNIL;
3050 TcKeyConf::setNoOfOperations(conf->confInfo, 1);
3051 conf->transId1 = requestPtr.p->m_transId[0];
3052 conf->transId2 = requestPtr.p->m_transId[1];
3053 conf->operations[0].apiOperationPtr = resultData;
3054 conf->operations[0].attrInfoLen = TcKeyConf::DirtyReadBit | Tnode;
3055 Uint32 sigLen = TcKeyConf::StaticLength + TcKeyConf::OperationLength;
3056 sendTCKEYCONF(signal, sigLen, resultRef, requestPtr.p->m_senderRef);
// Handles a result row (TRANSID_AI) for a lookup node: visits every dependent
// child node with the row, decrements the per-node outstanding count for the
// sending node and then reports/checks batch completion.
3061 Dbspj::lookup_execTRANSID_AI(
Signal* signal,
3064 const RowPtr & rowRef)
3068 Uint32 Tnode = refToNode(signal->getSendersBlockRef());
3072 Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
3073 Dependency_map::ConstDataBufferIterator it;
3074 for (list.first(it); !it.isNull(); list.next(it))
3078 m_treenode_pool.
getPtr(childPtr, * it.data);
// Each child must provide an m_parent_row handler; the call itself is only
// partly visible (its argument list ends on the next line).
3079 ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->
m_parent_row!=0);
3081 requestPtr, childPtr,rowRef);
// This handler must not run for the leaf of a lookup request.
3084 ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()));
3086 ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
3087 requestPtr.p->m_lookup_node_data[Tnode] -= 1;
3097 reportBatchComplete(signal, requestPtr, treeNodePtr);
3104 checkBatchComplete(signal, requestPtr, 1);
// Handles LQHKEYREF for a lookup node. Increments CI_READS_NOT_FOUND, and for
// lookup requests builds a TCKEYREF and a TCKEYCONF towards the API; an abort
// path and the outstanding-count bookkeeping are also visible.
// NOTE(review): the conditions selecting between these paths are mostly not
// visible in this view.
3108 Dbspj::lookup_execLQHKEYREF(
Signal* signal,
3113 Uint32 errCode = rep->errorCode;
3114 Uint32 Tnode = refToNode(signal->getSendersBlockRef());
3116 c_Counters.incr_counter(CI_READS_NOT_FOUND, 1);
3118 if (requestPtr.p->isLookup())
3123 ndbrequire(!treeNodePtr.p->isLeaf());
// TCKEYREF carrying the error code, addressed with this node's API result
// reference and data.
3133 Uint32 resultRef = treeNodePtr.p->m_lookup_data.m_api_resultRef;
3134 Uint32 resultData = treeNodePtr.p->m_lookup_data.m_api_resultData;
3136 ref->connectPtr = resultData;
3137 ref->transId[0] = requestPtr.p->m_transId[0];
3138 ref->transId[1] = requestPtr.p->m_transId[1];
3139 ref->errorCode = errCode;
3142 DEBUG(
"lookup_execLQHKEYREF, errorCode:" << errCode);
3144 sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);
// Exactly one dependent child is required here; a TCKEYCONF for one
// operation is sent using that child's API result reference and data.
3154 Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
3155 Dependency_map::ConstDataBufferIterator it;
3156 ndbrequire(list.first(it));
3157 ndbrequire(list.getSize() == 1);
3159 m_treenode_pool.
getPtr(childPtr, * it.data);
3163 Uint32 resultRef = childPtr.p->m_lookup_data.m_api_resultRef;
3164 Uint32 resultData = childPtr.p->m_lookup_data.m_api_resultData;
3166 conf->apiConnectPtr = RNIL;
3169 TcKeyConf::setNoOfOperations(conf->confInfo, 1);
3170 conf->transId1 = requestPtr.p->m_transId[0];
3171 conf->transId2 = requestPtr.p->m_transId[1];
3172 conf->operations[0].apiOperationPtr = resultData;
3173 conf->operations[0].attrInfoLen =
3174 TcKeyConf::DirtyReadBit |getOwnNodeId();
3175 sendTCKEYCONF(signal, TcKeyConf::StaticLength + 2, resultRef, requestPtr.p->m_senderRef);
3189 abort(signal, requestPtr, errCode);
// Outstanding-reply bookkeeping: `cnt` replies are deducted for the sending
// node (how cnt is chosen is not visible here).
3194 if (treeNodePtr.p->isLeaf())
3197 ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= cnt);
3198 requestPtr.p->m_lookup_node_data[Tnode] -= cnt;
3208 reportBatchComplete(signal, requestPtr, treeNodePtr);
3215 checkBatchComplete(signal, requestPtr, cnt);
// Handles LQHKEYCONF for a lookup node: counts a row on the request,
// decrements the per-node outstanding count for the sending node and then
// reports/checks batch completion. Must not run for the leaf of a lookup request.
3219 Dbspj::lookup_execLQHKEYCONF(
Signal* signal,
3223 ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()));
3225 Uint32 Tnode = refToNode(signal->getSendersBlockRef());
3230 requestPtr.p->m_rows++;
3233 ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
3234 requestPtr.p->m_lookup_node_data[Tnode] -= 1;
3244 reportBatchComplete(signal, requestPtr, treeNodePtr);
3251 checkBatchComplete(signal, requestPtr, 1);
// Called with a row from the parent node: builds the key (and, where a
// parameter pattern exists, the attr info) for this lookup from that row,
// computes hash and destination via computeHash()/getNodes(), patches the
// stored request and sends it with lookup_send().
// NOTE(review): branch conditions and error exits are not visible in this view.
3255 Dbspj::lookup_parent_row(
Signal* signal,
3258 const RowPtr & rowRef)
3268 const Uint32 tableId = LqhKeyReq::getTableId(src->tableSchemaVersion);
3269 const Uint32 corrVal = rowRef.m_src_correlation;
3271 DEBUG(
"::lookup_parent_row");
3279 DEBUG(
"parent_row w/ T_KEYINFO_CONSTRUCTED");
// Expand the key pattern against the parent row; keyIsNull reports whether
// the resulting key contains NULL values (see the debug message below).
3284 Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
3287 err = expand(ptrI, pattern, rowRef, keyIsNull);
3288 if (unlikely(err != 0))
3294 DEBUG(
"Key contain NULL values");
3302 if (requestPtr.p->isScan())
3310 DEBUG(
"..Ignore impossible KEYREQ");
3313 releaseSection(ptrI);
3354 err = createEmptySection(ptrI);
3355 if (unlikely(err != 0))
3359 treeNodePtr.p->m_send.m_keyInfoPtrI = ptrI;
3363 err = computeHash(signal, tmp, tableId, treeNodePtr.p->m_send.m_keyInfoPtrI);
3364 if (unlikely(err != 0))
3367 err = getNodes(signal, tmp, tableId);
3368 if (unlikely(err != 0))
// Attr info: the stored section is duplicated and the parameter pattern is
// expanded into the copy; word 4 of the section is then set to the number of
// words added by the expansion.
3371 Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
3376 ndbrequire(dupSection(tmp, attrInfoPtrI));
3381 getSection(ptr, tmp);
3387 Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
3388 err = expand(tmp, pattern, rowRef, hasNull);
3389 if (unlikely(err != 0))
3397 getSection(ptr, tmp);
3398 Uint32 new_size = ptr.sz;
3399 Uint32 * sectionptrs = ptr.p->theData;
3400 sectionptrs[4] = new_size - org_size;
3402 treeNodePtr.p->m_send.m_attrInfoPtrI = tmp;
3425 (corrVal << 16) + (corrVal & 0xffff);
// Patch destination, hash, fragment and distribution key into the stored
// request, then send it.
3427 treeNodePtr.p->m_send.m_ref = tmp.receiverRef;
3429 dst->hashValue = tmp.hashInfo[0];
3430 dst->fragmentData = tmp.fragId;
3432 LqhKeyReq::setDistributionKey(attrLen, tmp.fragDistKey);
3433 dst->attrLen = attrLen;
3434 lookup_send(signal, requestPtr, treeNodePtr);
// The original attr-info section index is put back after sending.
3440 treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI;
// Called when the parent's batch is complete; the visible body reports batch
// completion for this node (any guarding condition is not visible here).
3450 Dbspj::lookup_parent_batch_complete(
Signal* signal,
3473 reportBatchComplete(signal, requestPtr, treeNodePtr);
// Abort handler for lookup nodes. No statements of its body are visible in
// this view.
3482 Dbspj::lookup_abort(
Signal* signal,
// Handles a node-failure report for a lookup node: for each node set in `mask`
// (while replies are still outstanding) the per-node outstanding count is read
// and reset to 0; `sum` is then deducted from the request's m_outstanding
// (presumably sum accumulates the cleared counts -- the accumulation line is
// not visible here).
3490 Dbspj::lookup_execNODE_FAILREP(
Signal* signal,
3498 while (requestPtr.p->m_outstanding &&
3499 ((node = mask.
find(node + 1)) != NdbNodeBitmask::NotFound))
3501 Uint32 cnt = requestPtr.p->m_lookup_node_data[node];
3503 requestPtr.p->m_lookup_node_data[node] = 0;
3509 ndbrequire(requestPtr.p->m_outstanding >= sum);
3510 requestPtr.p->m_outstanding -= sum;
// NOTE(review): the call below presumably belongs to a following definition
// (the lookup cleanup handler) whose header is not visible in this view.
3520 cleanup_common(requestPtr, treeNodePtr);
// Computes the MD5 hash of a key that needs special treatment (character
// attributes and/or distribution keys) into dstHash. When the table has
// distribution keys, a second hash over the distribution key replaces
// dstHash[1].
// NOTE(review): the remaining parameters and the transform call that fills
// alignedWorkspace are only partly visible in this view.
3525 Dbspj::handle_special_hash(Uint32 tableId, Uint32 dstHash[4],
// Workspace in 64-bit words, scaled by MAX_XFRM_MULTIPLY.
3530 const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
3531 (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
3532 Uint64 alignedWorkspace[MAX_KEY_SIZE_IN_LONG_WORDS * MAX_XFRM_MULTIPLY];
3533 const bool hasVarKeys = desc->noOfVarKeys > 0;
3534 const bool hasCharAttr = desc->hasCharAttr;
3535 const bool compute_distkey = desc->noOfDistrKeys > 0;
3537 const Uint64 *hashInput = 0;
3538 Uint32 inputLen = 0;
3539 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
3540 Uint32 * keyPartLenPtr;
// The workspace is used as hash input when the key has character attributes,
// or has variable-size keys and a distribution key must be computed.
3543 if (hasCharAttr || (compute_distkey && hasVarKeys))
3545 hashInput = alignedWorkspace;
3546 keyPartLenPtr = keyPartLen;
// sizeof(...) >> 2 converts the workspace size from bytes to 32-bit words.
3549 (Uint32*)alignedWorkspace,
3550 sizeof(alignedWorkspace) >> 2,
3552 if (unlikely(inputLen == 0))
3566 md5_hash(dstHash, hashInput, inputLen);
3572 if (compute_distkey)
3576 Uint32 distrKeyHash[4];
3578 Uint32 len = create_distr_key(tableId, (Uint32*)hashInput, (Uint32*)alignedWorkspace, keyPartLenPtr);
3580 md5_hash(distrKeyHash, alignedWorkspace, len);
// Only word 1 of the result is taken from the distribution-key hash.
3583 dstHash[1] = distrKeyHash[1];
// Computes the hash for the key held in section `ptrI` into dst.hashInfo.
// Tables with character attributes or distribution keys go through
// handle_special_hash(); otherwise the key words are hashed directly with
// md5_hash().
3589 Dbspj::computeHash(
Signal* signal,
3590 BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI)
3597 getSection(ptr, ptrI);
// 64-bit aligned buffer for the key, also addressable as 32-bit words.
3601 const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
3602 (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
3603 Uint64 tmp64[MAX_KEY_SIZE_IN_LONG_WORDS];
3604 Uint32 *tmp32 = (Uint32*)tmp64;
3607 const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3608 ndbrequire(desc != NULL);
3610 bool need_special_hash = desc->hasCharAttr | (desc->noOfDistrKeys > 0);
3611 if (need_special_hash)
3614 return handle_special_hash(tableId, dst.hashInfo, tmp64, ptr.sz, desc);
3619 md5_hash(dst.hashInfo, tmp64, ptr.sz);
// Computes the hash for the key held in section `ptrI` into dst.hashInfo. When
// the table has character attributes or variable-size keys, the attributes
// flagged by getDKey() are first transformed with xfrm_attr() into
// signal->theData starting at word 24, and that buffer is hashed instead.
3629 Dbspj::computePartitionHash(
Signal* signal,
3630 BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI)
3633 getSection(ptr, ptrI);
3637 const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
3638 (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
3639 Uint64 _space[MAX_KEY_SIZE_IN_LONG_WORDS];
3640 Uint64 *tmp64 = _space;
3641 Uint32 *tmp32 = (Uint32*)tmp64;
3645 const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3646 ndbrequire(desc != NULL);
3648 bool need_xfrm = desc->hasCharAttr || desc->noOfVarKeys;
3657 Uint32 * src = tmp32;
// Note: this local `dst` is an output word pointer, distinct from the
// BuildKeyReq& parameter of the same name.
3658 Uint32 * dst = signal->theData+24;
3659 for (Uint32 i = 0; i < desc->noOfKeyAttr; i++)
3662 if (AttributeDescriptor::getDKey(keyAttr.attributeDescriptor))
// The last argument is the number of words left in theData after offset 24.
3664 xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
3665 src, srcPos, dst, dstPos,
3666 NDB_ARRAY_SIZE(signal->theData) - 24);
// Hash the transformed buffer instead of the raw key.
3669 tmp64 = (Uint64*)dst;
3673 md5_hash(dst.hashInfo, tmp64, sz);
// Sends a DIGETNODESREQ for `tableId` with hash value dst.hashInfo[1] and
// fills dst.fragId, dst.fragDistKey and dst.receiverRef from the reply read
// back from the signal.
// NOTE(review): two ways of issuing the request are partly visible (a call
// ending in `DiGetNodesReq::SignalLength, 0)` and a sendSignal to DBDIH_REF);
// the selecting condition is not visible in this view.
3678 Dbspj::getNodes(
Signal* signal, BuildKeyReq& dst, Uint32 tableId)
3682 req->tableId = tableId;
3683 req->hashValue = dst.hashInfo[1];
3684 req->distr_key_indicator = 0;
3689 DiGetNodesReq::SignalLength, 0);
3691 sendSignal(DBDIH_REF, GSN_DIGETNODESREQ, signal,
3692 DiGetNodesReq::SignalLength, JBB);
// The error code is read from word 0 of the signal.
3698 err = signal->theData[0];
3699 Uint32 Tdata2 = conf->reqinfo;
3700 Uint32 nodeId = conf->nodes[0];
// reqinfo bits 24..30 hold the instance key; bits 16..23 hold the fragment
// distribution key (extracted further below).
3701 Uint32 instanceKey = (Tdata2 >> 24) & 127;
3703 DEBUG(
"HASH to nodeId:" << nodeId <<
", instanceKey:" << instanceKey);
3706 if (unlikely(err != 0))
3709 dst.fragId = conf->fragId;
3710 dst.fragDistKey = (Tdata2 >> 16) & 255;
3711 dst.receiverRef = numberToRef(DBLQH, instanceKey, nodeId);
// Table of member-function pointers implementing the scan-fragment operation.
// scanFrag_build installs its address in the tree node's m_info.
// NOTE(review): some entries of the initializer are not visible in this view.
3733 Dbspj::g_ScanFragOpInfo =
3735 &Dbspj::scanFrag_build,
3737 &Dbspj::scanFrag_start,
3738 &Dbspj::scanFrag_execTRANSID_AI,
3741 &Dbspj::scanFrag_execSCAN_FRAGREF,
3742 &Dbspj::scanFrag_execSCAN_FRAGCONF,
3747 &Dbspj::scanFrag_execSCAN_NEXTREQ,
3749 &Dbspj::scanFrag_abort,
3751 &Dbspj::scanFrag_cleanup
// Builds the tree node for a scan-fragment operation: creates the node, seizes
// a ScanFragHandle, fills in a ScanFragReq-style request (`dst`), validates
// node and parameter lengths, parses their variable parts with parseDA(), and,
// when ctx.m_start_signal is set, takes routing, flags and batch sizes from
// that signal (`src`).
// NOTE(review): the bindings of `dst`, `src`, `node` and `param` and the error
// exits are not visible in this view.
3755 Dbspj::scanFrag_build(Build_context& ctx,
3767 err = createNode(ctx, requestPtr, treeNodePtr);
3768 if (unlikely(err != 0))
// The handle index is reset to RNIL before the handle is seized from the
// request's arena.
3771 treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = RNIL;
3773 if (unlikely(m_scanfraghandle_pool.
seize(requestPtr.p->m_arena,
3774 scanFragHandlePtr) !=
true))
3776 err = DbspjErr::OutOfQueryMemory;
// Link handle and tree node to each other.
3780 scanFragHandlePtr.p->m_treeNodePtrI = treeNodePtr.i;
3781 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
3782 treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = scanFragHandlePtr.i;
3784 requestPtr.p->m_bits |= Request::RT_SCAN;
3785 treeNodePtr.p->m_info = &g_ScanFragOpInfo;
3786 treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
3787 treeNodePtr.p->m_batch_size = ctx.m_batch_size_rows;
3790 dst->senderData = scanFragHandlePtr.i;
3791 dst->resultRef = reference();
3792 dst->resultData = treeNodePtr.i;
3793 dst->savePointId = ctx.m_savepointId;
3795 Uint32 transId1 = requestPtr.p->m_transId[0];
3796 Uint32 transId2 = requestPtr.p->m_transId[1];
3797 dst->transId1 = transId1;
3798 dst->transId2 = transId2;
3800 Uint32 treeBits = node->requestInfo;
3801 Uint32 paramBits = param->requestInfo;
// Request flags: read-committed, scan priority from the context, and the
// correlation factor flag.
3804 Uint32 requestInfo = 0;
3805 ScanFragReq::setReadCommittedFlag(requestInfo, 1);
3806 ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
3807 ScanFragReq::setCorrFactorFlag(requestInfo, 1);
3808 ScanFragReq::setNoDiskFlag(requestInfo,
3811 dst->requestInfo = requestInfo;
// err is pre-set before the length check (presumably so the failure path,
// not visible here, reports this code).
3813 err = DbspjErr::InvalidTreeNodeSpecification;
3814 DEBUG(
"scanFrag_build: len=" << node->len);
3815 if (unlikely(node->len < QN_ScanFragNode::NodeSize))
3818 dst->tableId = node->tableId;
3819 dst->schemaVersion = node->tableVersion;
3821 err = DbspjErr::InvalidTreeParametersSpecification;
3822 DEBUG(
"param len: " << param->len);
3823 if (unlikely(param->len < QN_ScanFragParameters::NodeSize))
3830 ctx.m_resultData = param->resultData;
// The variable parts are whatever follows the fixed NodeSize words.
3835 struct DABuffer nodeDA, paramDA;
3837 nodeDA.end = nodeDA.ptr + (node->len - QN_ScanFragNode::NodeSize);
3839 paramDA.end = paramDA.ptr + (param->len - QN_ScanFragParameters::NodeSize);
3840 err = parseDA(ctx, requestPtr, treeNodePtr,
3841 nodeDA, treeBits, paramDA, paramBits);
3842 if (unlikely(err != 0))
// Record this node's number in the context's set of scan nodes.
3850 ctx.m_scans.set(treeNodePtr.p->m_node_no);
3852 if (ctx.m_start_signal)
3855 Signal* signal = ctx.m_start_signal;
// Two ways of choosing the DBLQH reference are visible; the condition that
// selects between them is not.
3860 blockToInstance(signal->header.theReceiversBlockNumber);
3861 treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH,
3862 instanceNo, getOwnNodeId());
3864 treeNodePtr.p->m_send.m_ref =
3865 numberToRef(DBLQH, getInstanceKey(src->tableId,
3866 src->fragmentNoKeyLen),
3870 Uint32 fragId = src->fragmentNoKeyLen;
3871 Uint32 requestInfo = src->requestInfo;
3872 Uint32 batch_size_bytes = src->batch_size_bytes;
3873 Uint32 batch_size_rows = src->batch_size_rows;
3876 Uint32 savePointId = src->savePointId;
3877 Uint32 tableId = src->tableId;
3878 Uint32 schemaVersion = src->schemaVersion;
3879 Uint32 transId1 = src->transId1;
3880 Uint32 transId2 = src->transId2;
// Debug-build sanity checks on the flags of the incoming request.
3882 ndbassert(ScanFragReq::getLockMode(requestInfo) == 0);
3883 ndbassert(ScanFragReq::getHoldLockFlag(requestInfo) == 0);
3884 ndbassert(ScanFragReq::getKeyinfoFlag(requestInfo) == 0);
3885 ndbassert(ScanFragReq::getReadCommittedFlag(requestInfo) == 1);
3886 ndbassert(ScanFragReq::getLcpScanFlag(requestInfo) == 0);
3888 ndbassert(ScanFragReq::getReorgFlag(requestInfo) == 0);
// Scan type, direction and priority are copied from the incoming request
// into dst's requestInfo.
3890 Uint32 tupScanFlag = ScanFragReq::getTupScanFlag(requestInfo);
3891 Uint32 rangeScanFlag = ScanFragReq::getRangeScanFlag(requestInfo);
3892 Uint32 descendingFlag = ScanFragReq::getDescendingFlag(requestInfo);
3893 Uint32 scanPrio = ScanFragReq::getScanPrio(requestInfo);
3895 Uint32 dst_requestInfo = dst->requestInfo;
3897 ScanFragReq::setTupScanFlag(dst_requestInfo,tupScanFlag);
3898 ScanFragReq::setRangeScanFlag(dst_requestInfo,rangeScanFlag);
3899 ScanFragReq::setDescendingFlag(dst_requestInfo,descendingFlag);
3900 ScanFragReq::setScanPrio(dst_requestInfo,scanPrio);
3905 ndbassert(ScanFragReq::getNoDiskFlag(requestInfo) ==
3906 ScanFragReq::getNoDiskFlag(dst_requestInfo));
3908 dst->fragmentNoKeyLen = fragId;
3909 dst->requestInfo = dst_requestInfo;
3910 dst->batch_size_bytes = batch_size_bytes;
3911 dst->batch_size_rows = batch_size_rows;
// The values already written to dst must match those of the incoming request.
3914 ndbassert(dst->savePointId == savePointId);
3915 ndbassert(dst->tableId == tableId);
3916 ndbassert(dst->schemaVersion == schemaVersion);
3917 ndbassert(dst->transId1 == transId1);
3918 ndbassert(dst->transId2 == transId2);
3921 treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i;
// One of the two received-scan counters is incremented (the selecting
// condition is not visible here).
3926 c_Counters.incr_counter(CI_RANGE_SCANS_RECEIVED, 1);
3930 c_Counters.incr_counter(CI_TABLE_SCANS_RECEIVED, 1);
// Starts a scan-fragment node by sending its request via scanFrag_send().
3945 Dbspj::scanFrag_start(
Signal* signal,
3949 scanFrag_send(signal, requestPtr, treeNodePtr);
// Sends the SCAN_FRAGREQ for a scan-fragment node: bumps the request's
// outstanding and active counters, copies the request stored in the tree node
// (m_scanfrag_data.m_scanFragReq) into the signal, attaches the attr-info and
// optional key-info sections, and sends to m_send.m_ref, which must be on the
// own node. Afterwards the handle is in state SFH_SCANNING and the row
// counters are reset.
3953 Dbspj::scanFrag_send(
Signal* signal,
3959 requestPtr.p->m_outstanding++;
3960 requestPtr.p->m_cnt_active++;
3963 m_scanfraghandle_pool.
getPtr(scanFragHandlePtr, treeNodePtr.p->
3964 m_scanfrag_data.m_scanFragHandlePtrI);
3968 memcpy(req, treeNodePtr.p->m_scanfrag_data.m_scanFragReq,
3969 sizeof(treeNodePtr.p->m_scanfrag_data.m_scanFragReq));
3971 req->variableData[1] = requestPtr.p->m_rootResultData;
3975 Uint32 ref = treeNodePtr.p->m_send.m_ref;
3976 Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI;
3977 Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
// The stored section indexes are cleared after being read into the locals above.
3987 treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
3988 treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
// Section 0 is the attr info; section 1, when present, the key info.
3990 getSection(handle.m_ptr[0], attrInfoPtrI);
3993 if (keyInfoPtrI != RNIL)
3996 getSection(handle.m_ptr[1], keyInfoPtrI);
4000 #ifdef DEBUG_SCAN_FRAGREQ
4001 ndbout_c(
"SCAN_FRAGREQ to %x", ref);
4002 printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
4003 NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
4005 printf(
"ATTRINFO: ");
4006 print(handle.m_ptr[0], stdout);
4007 if (handle.m_cnt > 1)
4009 printf(
"KEYINFO: ");
4010 print(handle.m_ptr[1], stdout);
// Count the scan as a range scan or a table scan.
4014 if (ScanFragReq::getRangeScanFlag(req->requestInfo))
4016 c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1);
4020 c_Counters.incr_counter(CI_LOCAL_TABLE_SCANS_SENT, 1);
4023 ndbrequire(refToNode(ref) == getOwnNodeId());
4024 sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
4025 NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
// m_rows_expecting == ~0 marks the expected row count as not yet known; it
// is set from completedOps when SCAN_FRAGCONF is handled.
4028 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING;
4029 treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
4030 treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
// Handles a result row (TRANSID_AI) for a scan-fragment node: counts the row,
// visits every dependent child node with it, and reports/checks batch
// completion once the received count equals the expected count.
4034 Dbspj::scanFrag_execTRANSID_AI(
Signal* signal,
4037 const RowPtr & rowRef)
4040 treeNodePtr.p->m_scanfrag_data.m_rows_received++;
4043 Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
4044 Dependency_map::ConstDataBufferIterator it;
4047 for (list.first(it); !it.isNull(); list.next(it))
4051 m_treenode_pool.
getPtr(childPtr, * it.data);
// Each child must provide an m_parent_row handler; the call itself is only
// partly visible (its argument list ends on the next line).
4052 ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->
m_parent_row!=0);
4054 requestPtr, childPtr,rowRef);
4058 if (treeNodePtr.p->m_scanfrag_data.m_rows_received ==
4059 treeNodePtr.p->m_scanfrag_data.m_rows_expecting)
4066 reportBatchComplete(signal, requestPtr, treeNodePtr);
4069 checkBatchComplete(signal, requestPtr, 1);
// Handles SCAN_FRAGREF for a scan-fragment node: marks the handle
// SFH_COMPLETE, decrements the request's active and outstanding counters
// (both must be non-zero) and aborts the request with the reported error code.
4075 Dbspj::scanFrag_execSCAN_FRAGREF(
Signal* signal,
4081 reinterpret_cast<const ScanFragRef*
>(signal->getDataPtr());
4082 Uint32 errCode = rep->errorCode;
4084 DEBUG(
"scanFrag_execSCAN_FRAGREF, rep->senderData:" << rep->senderData
4085 <<
", requestPtr.p->m_senderData:" << requestPtr.p->m_senderData);
4086 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
4088 ndbrequire(requestPtr.p->m_cnt_active);
4089 requestPtr.p->m_cnt_active--;
4090 ndbrequire(requestPtr.p->m_outstanding);
4091 requestPtr.p->m_outstanding--;
4094 abort(signal, requestPtr, errCode);
// Handles SCAN_FRAGCONF for a scan-fragment node: records the number of rows
// in the batch as the expected count, updates the handle state (SFH_COMPLETE
// or SFH_WAIT_NEXTREQ) and reports/checks batch completion once all expected
// rows have been received or the handle was waiting for close.
// NOTE(review): several branch conditions are not visible in this view.
4099 Dbspj::scanFrag_execSCAN_FRAGCONF(
Signal* signal,
4105 reinterpret_cast<const ScanFragConf*
>(signal->getDataPtr());
4106 Uint32 rows = conf->completedOps;
4107 Uint32 done = conf->fragmentCompleted;
4109 Uint32 state = scanFragHandlePtr.p->m_state;
// Special case: a conf that does not complete the fragment arrives while the
// handle is waiting for close (the handling is not visible here).
4110 if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0)
4119 ndbrequire(done <= 2);
// The expected count must still be unset (~0) at this point.
4121 ndbassert(treeNodePtr.p->m_scanfrag_data.m_rows_expecting == ~Uint32(0));
4122 treeNodePtr.p->m_scanfrag_data.m_rows_expecting = rows;
// For a leaf node the received count is set to `rows` directly (presumably
// because leaf rows are not delivered to this block -- confirm).
4123 if (treeNodePtr.p->isLeaf())
4129 treeNodePtr.p->m_scanfrag_data.m_rows_received = rows;
4132 requestPtr.p->m_rows += rows;
4137 ndbrequire(requestPtr.p->m_cnt_active);
4138 requestPtr.p->m_cnt_active--;
4140 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
4145 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ;
4148 if (treeNodePtr.p->m_scanfrag_data.m_rows_expecting ==
4149 treeNodePtr.p->m_scanfrag_data.m_rows_received ||
4150 (state == ScanFragHandle::SFH_WAIT_CLOSE))
4157 reportBatchComplete(signal, requestPtr, treeNodePtr);
4160 checkBatchComplete(signal, requestPtr, 1);
// Requests the next batch for a scan-fragment node: builds a ScanFragNextReq
// with the batch sizes of the stored original request, sends it to
// m_send.m_ref, resets the row counters, counts one more outstanding reply and
// puts the handle back in state SFH_SCANNING.
4166 Dbspj::scanFrag_execSCAN_NEXTREQ(
Signal* signal,
4173 m_scanfraghandle_pool.
getPtr(scanFragHandlePtr, treeNodePtr.p->
4174 m_scanfrag_data.m_scanFragHandlePtrI);
// `org` is the original SCAN_FRAGREQ stored in the tree node.
4177 (
ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
4180 reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
4181 req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4182 req->requestInfo = 0;
4183 req->transId1 = requestPtr.p->m_transId[0];
4184 req->transId2 = requestPtr.p->m_transId[1];
4185 req->batch_size_rows = org->batch_size_rows;
4186 req->batch_size_bytes = org->batch_size_bytes;
4188 DEBUG(
"scanFrag_execSCAN_NEXTREQ to: " << hex << treeNodePtr.p->m_send.m_ref
4189 <<
", senderData: " << req->senderData);
4190 #ifdef DEBUG_SCAN_FRAGREQ
4191 printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
4192 ScanFragNextReq::SignalLength, DBLQH);
4195 sendSignal(treeNodePtr.p->m_send.m_ref,
4198 ScanFragNextReq::SignalLength,
// Same reset as after the initial send: nothing received, expected count unknown.
4201 treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
4202 treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
4203 requestPtr.p->m_outstanding++;
4204 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING;
// Aborts the single-fragment scan of this tree node. The action is selected
// by the state of the scan-fragment handle; the close path sends a
// ScanFragNextReq with ZCLOSE and moves the handle to SFH_WAIT_CLOSE.
4208 Dbspj::scanFrag_abort(
Signal* signal,
4215 m_scanfraghandle_pool.
getPtr(scanFragHandlePtr, treeNodePtr.p->
4216 m_scanfrag_data.m_scanFragHandlePtrI);
4221 switch(scanFragHandlePtr.p->m_state){
4222 case ScanFragHandle::SFH_NOT_STARTED:
4223 case ScanFragHandle::SFH_COMPLETE:
4226 case ScanFragHandle::SFH_WAIT_CLOSE:
4230 case ScanFragHandle::SFH_WAIT_NEXTREQ:
// NOTE(review): presumably no reply is pending while waiting for NEXTREQ, so
// the close reply has to be counted as a new outstanding one - confirm.
4233 requestPtr.p->m_outstanding++;
4235 case ScanFragHandle::SFH_SCANNING:
// Expected row count becomes unknown (~0) again while the close is pending.
4240 treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
4241 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE;
// Close request: ZCLOSE with zero batch sizes, addressed by handle index.
4245 req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4246 req->requestInfo = ScanFragNextReq::ZCLOSE;
4247 req->transId1 = requestPtr.p->m_transId[0];
4248 req->transId2 = requestPtr.p->m_transId[1];
4249 req->batch_size_rows = 0;
4250 req->batch_size_bytes = 0;
4252 sendSignal(treeNodePtr.p->m_send.m_ref,
4255 ScanFragNextReq::SignalLength,
// Returns the tree node's scan-fragment handle to its pool, then runs the
// cleanup shared by all tree-node types.
4265 Uint32 ptrI = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4268 m_scanfraghandle_pool.
release(ptrI);
4270 cleanup_common(requestPtr, treeNodePtr);
// Handler table for index-scan tree nodes. scanIndex_build stores its address
// in TreeNode::m_info and getOpInfo() returns it for QN_SCAN_INDEX. Entries
// are positional, so their order must match the table's type definition.
4283 Dbspj::g_ScanIndexOpInfo =
4285 &Dbspj::scanIndex_build,
4286 &Dbspj::scanIndex_prepare,
4288 &Dbspj::scanIndex_execTRANSID_AI,
4291 &Dbspj::scanIndex_execSCAN_FRAGREF,
4292 &Dbspj::scanIndex_execSCAN_FRAGCONF,
4293 &Dbspj::scanIndex_parent_row,
4294 &Dbspj::scanIndex_parent_batch_complete,
4295 &Dbspj::scanIndex_parent_batch_repeat,
4296 &Dbspj::scanIndex_parent_batch_cleanup,
4297 &Dbspj::scanIndex_execSCAN_NEXTREQ,
4298 &Dbspj::scanIndex_complete,
4299 &Dbspj::scanIndex_abort,
4300 &Dbspj::scanIndex_execNODE_FAILREP,
4301 &Dbspj::scanIndex_cleanup
// Builds an index-scan tree node from the query tree node and its parameters:
// creates the node, marks the request as a scan, pre-fills the ScanFragReq
// template (dst) and parses the variable-length node/parameter sections.
4305 Dbspj::scanIndex_build(Build_context& ctx,
4317 err = createNode(ctx, requestPtr, treeNodePtr);
4318 if (unlikely(err != 0))
4321 Uint32 batchSize = param->batchSize;
// Flag the request as a scan that needs both a prepare and a complete step.
4323 requestPtr.p->m_bits |= Request::RT_SCAN;
4324 requestPtr.p->m_bits |= Request::RT_NEED_PREPARE;
4325 requestPtr.p->m_bits |= Request::RT_NEED_COMPLETE;
4326 treeNodePtr.p->m_info = &g_ScanIndexOpInfo;
4327 treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
// batchSize is packed: the low BatchRowBits bits hold the row limit and the
// remaining high bits hold the byte limit.
4329 treeNodePtr.p->m_batch_size =
4330 batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);
// Results are routed back to this block, identified by the tree node index.
4333 dst->senderData = treeNodePtr.i;
4334 dst->resultRef = reference();
4335 dst->resultData = treeNodePtr.i;
4336 dst->savePointId = ctx.m_savepointId;
4337 dst->batch_size_rows =
4338 batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);
4339 dst->batch_size_bytes = batchSize >> QN_ScanIndexParameters::BatchRowBits;
4341 Uint32 transId1 = requestPtr.p->m_transId[0];
4342 Uint32 transId2 = requestPtr.p->m_transId[1];
4343 dst->transId1 = transId1;
4344 dst->transId2 = transId2;
4346 Uint32 treeBits = node->requestInfo;
4347 Uint32 paramBits = param->requestInfo;
// Scan flags: range scan, read committed, caller-supplied priority,
// correlation factor enabled; the no-disk flag depends on NI_LINKED_DISK.
4348 Uint32 requestInfo = 0;
4349 ScanFragReq::setRangeScanFlag(requestInfo, 1);
4350 ScanFragReq::setReadCommittedFlag(requestInfo, 1);
4351 ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
4352 ScanFragReq::setNoDiskFlag(requestInfo,
4353 (treeBits & DABits::NI_LINKED_DISK) == 0 &&
4355 ScanFragReq::setCorrFactorFlag(requestInfo, 1);
4356 dst->requestInfo = requestInfo;
// Preset the error code that applies if the node is shorter than its fixed
// part.
4358 err = DbspjErr::InvalidTreeNodeSpecification;
4359 DEBUG(
"scanIndex_build: len=" << node->len);
4360 if (unlikely(node->len < QN_ScanIndexNode::NodeSize))
4363 dst->tableId = node->tableId;
4364 dst->schemaVersion = node->tableVersion;
// Same pattern for the parameter section.
4366 err = DbspjErr::InvalidTreeParametersSpecification;
4367 DEBUG(
"param len: " << param->len);
4368 if (unlikely(param->len < QN_ScanIndexParameters::NodeSize))
4375 ctx.m_resultData = param->resultData;
// The DA buffers cover only the variable-length tail that follows the fixed
// NodeSize words of each section.
4380 struct DABuffer nodeDA, paramDA;
4382 nodeDA.end = nodeDA.ptr + (node->len - QN_ScanIndexNode::NodeSize);
4384 paramDA.end = paramDA.ptr + (param->len - QN_ScanIndexParameters::NodeSize);
4386 err = parseScanIndex(ctx, requestPtr, treeNodePtr,
4387 nodeDA, treeBits, paramDA, paramBits);
4389 if (unlikely(err != 0))
// Walk up the chain of parents via m_parentPtrI until the root (RNIL).
4401 nodePtr.i = treeNodePtr.p->m_parentPtrI;
4402 while (nodePtr.i != RNIL)
4405 m_treenode_pool.
getPtr(nodePtr);
4408 nodePtr.i = nodePtr.p->m_parentPtrI;
// Applies only when scan results may be repeated and some scan seen so far
// (ctx.m_scans) is not among this node's ancestors.
4420 if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT &&
4421 !treeNodePtr.p->m_ancestors.
contains(ctx.m_scans))
// Record this node among the scans seen so far in the build context.
4427 ctx.m_scans.set(treeNodePtr.p->m_node_no);
// Parses the index-scan specific parts of a tree node: initialises the
// ScanIndexData bookkeeping, delegates the generic part to parseDA() and then
// handles an optional prune pattern (linked or fixed).
4436 Dbspj::parseScanIndex(Build_context& ctx,
4439 DABuffer tree, Uint32 treeBits,
4440 DABuffer param, Uint32 paramBits)
// Start from a clean state: no fragments, all counters zero.
4451 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4452 data.m_fragments.init();
4453 data.m_frags_outstanding = 0;
4454 data.m_frags_complete = 0;
4455 data.m_frags_not_started = 0;
4456 data.m_parallelismStat.init();
4457 data.m_firstExecution =
true;
4458 data.m_batch_chunks = 0;
4460 err = parseDA(ctx, requestPtr, treeNodePtr,
4461 tree, treeBits, param, paramBits);
4462 if (unlikely(err != 0))
4465 if (treeBits & Node::SI_PRUNE_PATTERN)
// The prune pattern starts with one packed word: low 16 bits = pattern
// length, high 16 bits = number of parameters.
4467 Uint32 len_cnt = * tree.ptr ++;
4468 Uint32 len = len_cnt & 0xFFFF;
4469 Uint32 cnt = len_cnt >> 16;
// A non-zero parameter count must coincide with the PRUNE_PARAMS flag in both
// the tree bits and the parameter bits.
4472 ndbrequire((cnt==0) == ((treeBits & Node::SI_PRUNE_PARAMS) ==0));
4473 ndbrequire((cnt==0) == ((paramBits & Params::SIP_PRUNE_PARAMS)==0));
4475 if (treeBits & Node::SI_PRUNE_LINKED)
4478 DEBUG(
"LINKED-PRUNE PATTERN w/ " << cnt <<
" PARAM values");
// Linked prune: expand the pattern into m_prunePattern for later use.
4480 data.m_prunePattern.init();
4481 Local_pattern_store pattern(pool, data.m_prunePattern);
4486 err = expand(pattern, treeNodePtr, tree, len, param, cnt);
4487 if (unlikely(err != 0))
4491 c_Counters.incr_counter(CI_PRUNED_RANGE_SCANS_RECEIVED, 1);
4496 DEBUG(
"FIXED-PRUNE w/ " << cnt <<
" PARAM values");
// Fixed prune: expand directly into a section that is kept in
// m_constPrunePtrI.
4503 Uint32 prunePtrI = RNIL;
4505 err = expand(prunePtrI, tree, len, param, cnt, hasNull);
4506 if (unlikely(err != 0))
4509 if (unlikely(hasNull))
4513 DEBUG(
"BEWARE: T_CONST_PRUNE-key contain NULL values");
4518 ndbrequire(prunePtrI != RNIL);
4519 data.m_constPrunePtrI = prunePtrI;
4526 c_Counters.incr_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED, 1);
// Parallel execution may be requested through either the tree bits or the
// parameter bits.
4531 ((treeBits & Node::SI_PARALLEL) ||
4532 ((paramBits & Params::SIP_PARALLEL))))
// Prepare step for an index scan: sends DIH_SCAN_TAB_REQ for the scanned
// table to DBDIH, with the tree node index as senderData, and counts the
// reply as outstanding on the request.
4546 Dbspj::scanIndex_prepare(
Signal * signal,
4555 req->senderRef = reference();
4556 req->senderData = treeNodePtr.i;
4557 req->tableId = dst->tableId;
4558 req->schemaTransId = 0;
4559 sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
4560 DihScanTabReq::SignalLength, JBB);
4562 requestPtr.p->m_outstanding++;
// Signal handler for DIH_SCAN_TAB_REF - presumably the negative reply to the
// DIH_SCAN_TAB_REQ sent by scanIndex_prepare; TODO confirm.
4566 Dbspj::execDIH_SCAN_TAB_REF(
Signal* signal)
// Signal handler for DIH_SCAN_TAB_CONF. Stores the fragment count and scan
// cookie on the index-scan tree node, allocates one ScanFragHandle per
// fragment and resolves which node owns each fragment, either locally from a
// constant prune key or by sending DIH_SCAN_GET_NODES_REQ per fragment.
4573 Dbspj::execDIH_SCAN_TAB_CONF(
Signal* signal)
// senderData is the tree node index that scanIndex_prepare put in the request.
4579 m_treenode_pool.
getPtr(treeNodePtr, conf->senderData);
4580 ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);
4582 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4584 Uint32 cookie = conf->scanCookie;
4585 Uint32 fragCount = conf->fragmentCount;
// Propagate the reorg flag into the ScanFragReq template.
4588 if (conf->reorgFlag)
4591 ScanFragReq::setReorgFlag(dst->requestInfo, 1);
4594 data.m_fragCount = fragCount;
4595 data.m_scanCookie = cookie;
4598 bool pruned = (treeNodePtr.p->m_bits & prunemask) != 0;
4601 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
// Allocate the first fragment handle from the request's arena.
4604 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
4605 if (likely(m_scanfraghandle_pool.
seize(requestPtr.p->m_arena, fragPtr)))
4609 fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
4610 list.addLast(fragPtr);
// Constant prune key: hash it against the index's primary table to find the
// single fragment to scan; the key section is released once hashed.
4627 Uint32 indexId = dst->tableId;
4628 Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId;
4629 Uint32 err = computePartitionHash(signal, tmp, tableId, data.m_constPrunePtrI);
4630 if (unlikely(err != 0))
4633 releaseSection(data.m_constPrunePtrI);
4634 data.m_constPrunePtrI = RNIL;
4636 err = getNodes(signal, tmp, tableId);
4637 if (unlikely(err != 0))
// Only one fragment is scanned in this case.
4640 fragPtr.p->m_fragId = tmp.fragId;
4641 fragPtr.p->m_ref = tmp.receiverRef;
4642 data.m_fragCount = 1;
4644 else if (fragCount == 1)
4655 Local_pattern_store pattern(pool, data.m_prunePattern);
4658 data.m_constPrunePtrI = RNIL;
4660 treeNodePtr.p->m_bits &= ~clear;
// Allocate handles for the remaining fragments (the first one exists already).
4670 for (Uint32 i = 1; i<fragCount; i++)
4674 if (likely(m_scanfraghandle_pool.
seize(requestPtr.p->m_arena, fragPtr)))
4678 fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
4679 list.addLast(fragPtr);
// No scan has been started yet, so every fragment counts as complete.
4687 data.m_frags_complete = data.m_fragCount;
4692 Uint32 tableId = ((
ScanFragReq*)data.m_scanFragReq)->tableId;
4694 req->senderRef = reference();
4695 req->tableId = tableId;
4696 req->scanCookie = cookie;
// Ask DIH which node holds each fragment; senderData carries the fragment
// handle index so the reply can be matched to its handle.
4699 for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
4702 req->senderData = fragPtr.i;
4703 req->fragId = fragPtr.p->m_fragId;
4704 sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
4705 DihScanGetNodesReq::SignalLength, JBB);
4708 data.m_frags_outstanding = cnt;
4709 requestPtr.p->m_outstanding++;
4716 checkPrepareComplete(signal, requestPtr, 1);
// Signal handler for DIH_SCAN_GET_NODES_REF - presumably the negative reply to
// a DIH_SCAN_GET_NODES_REQ; TODO confirm.
4726 Dbspj::execDIH_SCAN_GET_NODES_REF(
Signal* signal)
// Signal handler for DIH_SCAN_GET_NODES_CONF: records the DBLQH reference that
// owns one fragment and, once the last pending reply has arrived, lets the
// request continue its prepare phase.
4733 Dbspj::execDIH_SCAN_GET_NODES_CONF(
Signal* signal)
// senderData is the fragment handle index; only the first node in the reply
// is used.
4739 Uint32 senderData = conf->senderData;
4740 Uint32 node = conf->nodes[0];
4741 Uint32 instanceKey = conf->instanceKey;
4744 m_scanfraghandle_pool.
getPtr(fragPtr, senderData);
4746 m_treenode_pool.
getPtr(treeNodePtr, fragPtr.p->m_treeNodePtrI);
4747 ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);
4748 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4749 ndbrequire(data.m_frags_outstanding > 0);
4750 data.m_frags_outstanding--;
4752 fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node);
// Last reply for this tree node: the prepare step can be checked off.
4754 if (data.m_frags_outstanding == 0)
4761 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
4762 checkPrepareComplete(signal, requestPtr, 1);
// Linear search through the fragment handle list for the handle whose
// m_fragId equals fragId.
4767 Dbspj::scanIndex_findFrag(Local_ScanFragHandle_list & list,
4770 for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
4773 if (fragPtr.p->m_fragId == fragId)
// Called with a row from the parent node. Selects the fragment the row maps
// to (through the prune pattern when one is in use) and appends a range bound
// built from the row to that fragment's range section.
4784 Dbspj::scanIndex_parent_row(
Signal* signal,
4787 const RowPtr & rowRef)
4792 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4802 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
// Prune path: build the prune key for this row from m_prunePattern.
4813 Local_pattern_store pattern(pool, data.m_prunePattern);
4814 Uint32 pruneKeyPtrI = RNIL;
4816 err = expand(pruneKeyPtrI, pattern, rowRef, hasNull);
4817 if (unlikely(err != 0))
4823 if (unlikely(hasNull))
4826 DEBUG(
"T_PRUNE_PATTERN-key contain NULL values");
// A key containing NULL is not used; free whatever was built for it.
4829 if (pruneKeyPtrI != RNIL)
4831 releaseSection(pruneKeyPtrI);
// Hash the prune key against the index's primary table; the key section is
// released right after hashing, whether or not hashing succeeded.
4842 Uint32 indexId = dst->tableId;
4843 Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId;
4844 err = computePartitionHash(signal, tmp, tableId, pruneKeyPtrI);
4845 releaseSection(pruneKeyPtrI);
4846 if (unlikely(err != 0))
4852 err = getNodes(signal, tmp, tableId);
4853 if (unlikely(err != 0))
4859 err = scanIndex_findFrag(list, fragPtr, tmp.fragId);
4860 if (unlikely(err != 0))
4875 fragPtr.p->m_ref = tmp.receiverRef;
// Otherwise the first fragment handle is used to accumulate ranges.
4884 list.first(fragPtr);
// Append a bound built from m_keyPattern and this row to the fragment's range
// section, then patch its header with the row's correlation value.
4887 Uint32 ptrI = fragPtr.p->m_rangePtrI;
4892 Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
4893 err = expand(ptrI, pattern, rowRef, hasNull);
4894 if (unlikely(err != 0))
4907 fragPtr.p->m_rangePtrI = ptrI;
4908 scanIndex_fixupBound(fragPtr, ptrI, rowRef.m_src_correlation);
4917 scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr);
4929 Uint32 ptrI, Uint32 corrVal)
// Patches the bound most recently appended to the range section: skips the
// m_range_size words that were already fixed up, writes the new bound's size
// and correlation value into its first word, then walks the remaining
// attribute headers of the bound.
4937 ndbrequire(r0.step(fragPtr.p->m_range_builder.m_range_size));
4938 Uint32 boundsz = r0.getSize() - fragPtr.p->m_range_builder.m_range_size;
4939 Uint32 boundno = fragPtr.p->m_range_builder.m_range_cnt + 1;
// First word: bound size goes into bits 16 and up, the low 12 bits of corrVal
// into bits 4..15.
4942 ndbrequire(r0.peekWord(&tmp));
4943 tmp |= (boundsz << 16) | ((corrVal & 0xFFF) << 4);
4944 ndbrequire(r0.updateWord(tmp));
4945 ndbrequire(r0.step(1));
4952 ndbrequire(r0.peekWord(&tmp));
4954 Uint32 len = ah.getByteSize();
4956 ndbrequire(r0.updateWord(tmp));
// Byte length rounded up to whole 32-bit words.
4957 len32 = (len + 3) >> 2;
4958 }
while (r0.step(2 + len32));
// Remember how many ranges exist and how much of the section they cover.
4960 fragPtr.p->m_range_builder.m_range_cnt = boundno;
4961 fragPtr.p->m_range_builder.m_range_size = r0.getSize();
// Called when the parent has delivered a complete batch of rows. Marks
// fragments without range keys as complete, chooses how many fragments to
// scan in parallel, divides the batch limits between them and starts the
// scans through scanIndex_send().
4965 Dbspj::scanIndex_parent_batch_complete(
Signal* signal,
4971 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4972 data.m_rows_received = 0;
4973 data.m_rows_expecting = 0;
4974 ndbassert(data.m_frags_outstanding == 0);
4975 ndbassert(data.m_frags_complete == data.m_fragCount);
4976 data.m_frags_complete = 0;
4980 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
4981 list.first(fragPtr);
// No range on the first handle: there is nothing to scan at all.
4985 if (fragPtr.p->m_rangePtrI == RNIL)
4989 data.m_frags_complete = data.m_fragCount;
// Fragments that received no range keys are complete from the start.
4994 while(!fragPtr.isNull())
4996 if (fragPtr.p->m_rangePtrI == RNIL)
5003 fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5004 data.m_frags_complete++;
5010 data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
5012 if (data.m_frags_complete == data.m_fragCount)
// Each fragment must be given at least one row, so parallelism is capped by
// batch_size_rows as well as by the number of fragments left to scan.
5025 ndbrequire(org->batch_size_rows > 0);
5030 data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5031 org->batch_size_rows);
5033 else if (data.m_firstExecution)
// First execution: there are no statistics yet, start with one fragment.
5045 data.m_parallelism = 1;
// Later executions: estimate from earlier runs, mean minus two standard
// deviations, capped by batch_size_rows.
5057 static_cast<Int32
>(MIN(data.m_parallelismStat.getMean()
5058 - 2 * data.m_parallelismStat.getStdDev(),
5059 org->batch_size_rows));
5061 if (parallelism < 1)
5066 else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
// Spread the remaining fragments evenly over the number of round trips that
// will be needed.
5075 const Int32 roundTrips =
5076 1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
5077 parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
5080 data.m_parallelism =
static_cast<Uint32
>(parallelism);
5082 #ifdef DEBUG_SCAN_FRAGREQ
5083 DEBUG(
"::scanIndex_send() starting index scan with parallelism="
5084 << data.m_parallelism);
5087 ndbrequire(data.m_parallelism > 0);
// Divide the batch limits equally between the fragments scanned in parallel.
5089 const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
5090 const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
5091 ndbassert(bs_rows > 0);
5092 ndbassert(bs_bytes > 0);
// Reset the per-execution statistics that scanIndex_execSCAN_FRAGCONF
// accumulates.
5094 data.m_largestBatchRows = 0;
5095 data.m_largestBatchBytes = 0;
5096 data.m_totalRows = 0;
5097 data.m_totalBytes = 0;
5100 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5102 list.first(fragPtr);
5104 while(!fragPtr.isNull())
5106 ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
5107 fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
5108 fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
5113 Uint32 batchRange = 0;
5114 scanIndex_send(signal,
5122 data.m_firstExecution =
false;
5124 ndbrequire(static_cast<Uint32>(data.m_frags_outstanding +
5125 data.m_frags_complete) <=
// The first chunk of this batch is under way; the node is now active and has
// a reply outstanding.
5128 data.m_batch_chunks = 1;
5129 requestPtr.p->m_cnt_active++;
5130 requestPtr.p->m_outstanding++;
// Called when the parent's batch is to be repeated. If this node's result
// took more than one chunk, the node is registered as an active cursor and
// its chunk counter is cleared.
5135 Dbspj::scanIndex_parent_batch_repeat(
Signal* signal,
5140 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5142 DEBUG(
"scanIndex_parent_batch_repeat(), m_node_no: " << treeNodePtr.p->m_node_no
5143 <<
", m_batch_chunks: " << data.m_batch_chunks);
5151 if (data.m_batch_chunks > 1)
5154 DEBUG(
"Register TreeNode for restart, m_node_no: " << treeNodePtr.p->m_node_no);
5156 registerActiveCursor(requestPtr, treeNodePtr);
5157 data.m_batch_chunks = 0;
// Sends SCAN_FRAGREQ to fragments in state SFH_NOT_STARTED until noOfFrags
// requests have been sent. Every request is a copy of the stored ScanFragReq
// with the given batch limits; batchRange is advanced by bs_rows for each
// fragment started.
5165 Dbspj::scanIndex_send(
Signal* signal,
5178 const bool prune = treeNodePtr.p->m_bits &
5185 const bool repeatable =
5188 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5189 ndbassert(noOfFrags > 0);
5190 ndbassert(data.m_frags_not_started >= noOfFrags);
5192 reinterpret_cast<ScanFragReq*
>(signal->getDataPtrSend());
5194 =
reinterpret_cast<ScanFragReq*
>(data.m_scanFragReq);
// Start from the stored request template and override the per-call fields.
5195 memcpy(req, org,
sizeof(data.m_scanFragReq));
5197 req->variableData[1] = requestPtr.p->m_rootResultData;
5198 req->batch_size_bytes = bs_bytes;
5199 req->batch_size_rows = bs_rows;
5201 Uint32 requestsSent = 0;
5202 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5204 list.first(fragPtr);
// Without pruning the range keys live on the first handle and must exist.
5205 Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
5206 ndbrequire(prune || keyInfoPtrI != RNIL);
5211 while (requestsSent < noOfFrags)
5214 ndbassert(!fragPtr.isNull());
// Only fragments that have not been started yet are candidates.
5216 if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
5224 const Uint32 ref = fragPtr.p->m_ref;
// Special case: a single, unpruned, very first request whose fragment lives
// on another node while more fragments follow in the list.
5226 if (noOfFrags==1 && !prune &&
5227 data.m_frags_not_started == data.m_fragCount &&
5228 refToNode(ref) != getOwnNodeId() &&
5229 list.hasNext(fragPtr))
5246 Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
5251 req->senderData = fragPtr.i;
5252 req->fragmentNoKeyLen = fragPtr.p->m_fragId;
// A fragment without range keys has nothing to scan and is marked complete.
5257 keyInfoPtrI = fragPtr.p->m_rangePtrI;
5258 if (keyInfoPtrI == RNIL)
5265 fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5279 ndbrequire(dupSection(tmp, attrInfoPtrI));
// variableData[0] carries this fragment's batchRange value; the attribute
// info goes into section 0 and the key info into section 1.
5284 req->variableData[0] = batchRange;
5285 getSection(handle.m_ptr[0], attrInfoPtrI);
5286 getSection(handle.m_ptr[1], keyInfoPtrI);
// NOTE(review): the debug print below sizes the signal from
// m_scanfrag_data.m_scanFragReq, whereas the rest of this function uses
// data.m_scanFragReq (m_scanindex_data) - confirm the two arrays have the
// same length.
5289 #if defined DEBUG_SCAN_FRAGREQ
5290 ndbout_c(
"SCAN_FRAGREQ to %x", ref);
5291 printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
5292 NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
5294 printf(
"ATTRINFO: ");
5295 print(handle.m_ptr[0], stdout);
5296 printf(
"KEYINFO: ");
5297 print(handle.m_ptr[1], stdout);
// Count the request as local or remote depending on the receiving node.
5300 if (refToNode(ref) == getOwnNodeId())
5302 c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1);
5306 c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
// Pruned, non-repeatable scan: after sending, the handle gives up its range
// section and its range bookkeeping is reset.
5309 if (prune && !repeatable)
5317 sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
5318 NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
5319 fragPtr.p->m_rangePtrI = RNIL;
5320 fragPtr.p->reset_ranges();
5330 NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
5334 fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
5335 data.m_frags_outstanding++;
5336 batchRange += bs_rows;
5341 data.m_frags_not_started -= requestsSent;
// Handles one result row of the index scan: passes the row to every dependent
// (child) node through its m_parent_row handler, counts it as received and
// reports the batch complete once no fragment reply is pending and all
// expected rows have arrived.
5345 Dbspj::scanIndex_execTRANSID_AI(
Signal* signal,
5348 const RowPtr & rowRef)
5353 Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
5354 Dependency_map::ConstDataBufferIterator it;
5357 for (list.first(it); !it.isNull(); list.next(it))
5361 m_treenode_pool.
getPtr(childPtr, * it.data);
// Every child must have a handler table that provides m_parent_row.
5362 ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->
m_parent_row!=0);
5364 requestPtr, childPtr,rowRef);
5368 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5369 data.m_rows_received++;
5371 if (data.m_frags_outstanding == 0 &&
5372 data.m_rows_received == data.m_rows_expecting)
5381 reportBatchComplete(signal, requestPtr, treeNodePtr);
5384 checkBatchComplete(signal, requestPtr, 1);
// Handles SCAN_FRAGCONF from one fragment: accumulates row and byte
// statistics, updates the fragment state and the outstanding/complete
// counters, and reports the batch complete once all replies are in.
5390 Dbspj::scanIndex_execSCAN_FRAGCONF(
Signal* signal,
5399 Uint32 rows = conf->completedOps;
5400 Uint32 done = conf->fragmentCompleted;
5402 Uint32 state = fragPtr.p->m_state;
5403 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
// A close is pending but the fragment reports that it is not finished yet.
5405 if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0)
// Statistics read later by the parallelism logic in this file.
5414 requestPtr.p->m_rows += rows;
5415 data.m_totalRows += rows;
5416 data.m_totalBytes += conf->total_len;
5417 data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
5418 data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
// Non-leaf nodes also wait for the rows themselves (TRANSID_AI) to arrive.
5420 if (!treeNodePtr.p->isLeaf())
5423 data.m_rows_expecting += rows;
5425 ndbrequire(data.m_frags_outstanding);
5426 ndbrequire(state == ScanFragHandle::SFH_SCANNING ||
5427 state == ScanFragHandle::SFH_WAIT_CLOSE);
5429 data.m_frags_outstanding--;
5430 fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ;
5435 fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5436 ndbrequire(data.m_frags_complete < data.m_fragCount);
5437 data.m_frags_complete++;
// The node stops being active when every fragment is complete or, while
// aborting, when every fragment is either complete or was never started.
5439 if (data.m_frags_complete == data.m_fragCount ||
5440 ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
5441 data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
5444 ndbrequire(requestPtr.p->m_cnt_active);
5445 requestPtr.p->m_cnt_active--;
5451 if (data.m_frags_outstanding == 0)
5454 =
reinterpret_cast<const ScanFragReq*
>(data.m_scanFragReq);
5456 if (data.m_frags_complete == data.m_fragCount)
// Whole scan done: record the parallelism that would have fitted the batch
// limits - the fragment count, capped by batch rows / total rows and by batch
// bytes / total bytes.
5465 double parallelism = data.m_fragCount;
5466 if (data.m_totalRows > 0)
5468 parallelism = MIN(parallelism,
5469 double(org->batch_size_rows) / data.m_totalRows);
5471 if (data.m_totalBytes > 0)
5473 parallelism = MIN(parallelism,
5474 double(org->batch_size_bytes) / data.m_totalBytes);
5476 data.m_parallelismStat.update(parallelism);
// A close reply is only expected while the request is aborting.
5482 if (state == ScanFragHandle::SFH_WAIT_CLOSE)
5485 ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
5487 else if (! (data.m_rows_received == data.m_rows_expecting))
5497 reportBatchComplete(signal, requestPtr, treeNodePtr);
5501 checkBatchComplete(signal, requestPtr, 1);
// Handles SCAN_FRAGREF from one fragment: marks the fragment complete, updates
// the counters and aborts the whole request with the reported error code.
5507 Dbspj::scanIndex_execSCAN_FRAGREF(
Signal* signal,
5515 const Uint32 errCode = rep->errorCode;
5517 Uint32 state = fragPtr.p->m_state;
5518 ndbrequire(state == ScanFragHandle::SFH_SCANNING ||
5519 state == ScanFragHandle::SFH_WAIT_CLOSE);
5521 fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5523 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5524 ndbrequire(data.m_frags_complete < data.m_fragCount);
5525 data.m_frags_complete++;
5526 ndbrequire(data.m_frags_outstanding > 0);
5527 data.m_frags_outstanding--;
// Every fragment is now complete or was never started: node no longer active.
5529 if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
5532 ndbrequire(requestPtr.p->m_cnt_active);
5533 requestPtr.p->m_cnt_active--;
// Last pending fragment reply: the request has one outstanding item fewer.
5537 if (data.m_frags_outstanding == 0)
5540 ndbrequire(requestPtr.p->m_outstanding);
5541 requestPtr.p->m_outstanding--;
5544 abort(signal, requestPtr, errCode);
// Requests the next chunk of an index scan. Re-evaluates the parallelism from
// the sizes seen in the previous chunk, sends SCAN_NEXTREQ to fragments
// waiting in SFH_WAIT_NEXTREQ and, if fewer than m_parallelism were waiting,
// starts additional fragments through scanIndex_send().
5548 Dbspj::scanIndex_execSCAN_NEXTREQ(
Signal* signal,
5554 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5557 data.m_rows_received = 0;
5558 data.m_rows_expecting = 0;
5559 ndbassert(data.m_frags_outstanding == 0);
5561 ndbrequire(data.m_frags_complete < data.m_fragCount);
// The largest batch stayed below the per-fragment quota for both rows and
// bytes, so there is room to scan more fragments in parallel.
5570 if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
5571 data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
5574 data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5575 org->batch_size_rows);
// Cap so that a batch as large as the largest one seen would still fit.
5576 if (data.m_largestBatchRows > 0)
5579 data.m_parallelism =
5580 MIN(org->batch_size_rows / data.m_largestBatchRows,
5581 data.m_parallelism);
5583 if (data.m_largestBatchBytes > 0)
5586 data.m_parallelism =
5587 MIN(data.m_parallelism,
5588 org->batch_size_bytes/data.m_largestBatchBytes);
// Nothing completed yet: spread the unstarted fragments evenly over the
// number of round trips that will be needed.
5590 if (data.m_frags_complete == 0 &&
5591 data.m_frags_not_started % data.m_parallelism != 0)
5600 const Uint32 roundTrips =
5601 1 + data.m_frags_not_started / data.m_parallelism;
5602 data.m_parallelism = data.m_frags_not_started / roundTrips;
// Otherwise halve the parallelism, but never below one fragment.
5609 data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5610 MAX(1, data.m_parallelism/2));
5612 ndbassert(data.m_parallelism > 0);
5613 #ifdef DEBUG_SCAN_FRAGREQ
5614 DEBUG(
"::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
5615 data.m_parallelism <<
5616 " fragments with " << org->batch_size_rows/data.m_parallelism <<
5617 " rows and " << org->batch_size_bytes/data.m_parallelism <<
5624 data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5625 org->batch_size_rows);
// Divide the batch limits between the fragments asked in parallel.
5628 const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
5629 ndbassert(bs_rows > 0);
5632 req->requestInfo = 0;
5633 ScanFragNextReq::setCorrFactorFlag(req->requestInfo);
5634 req->transId1 = requestPtr.p->m_transId[0];
5635 req->transId2 = requestPtr.p->m_transId[1];
5636 req->batch_size_rows = bs_rows;
5637 req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
5639 Uint32 batchRange = 0;
5641 Uint32 sentFragCount = 0;
// First continue fragments that are waiting for a NEXTREQ.
5646 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5647 list.first(fragPtr);
5648 while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
5651 ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
5652 fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
5653 fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
5654 if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
5658 data.m_frags_outstanding++;
5659 req->variableData[0] = batchRange;
5660 fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
5661 batchRange += bs_rows;
5663 DEBUG(
"scanIndex_execSCAN_NEXTREQ to: " << hex
5664 << treeNodePtr.p->m_send.m_ref
5665 <<
", m_node_no=" << treeNodePtr.p->m_node_no
5666 <<
", senderData: " << req->senderData);
5668 #ifdef DEBUG_SCAN_FRAGREQ
5669 printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
5670 ScanFragNextReq:: SignalLength + 1, DBLQH);
// The signal is one word longer than SignalLength - presumably to carry
// variableData[0] (batchRange); TODO confirm.
5673 req->senderData = fragPtr.i;
5674 sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
5675 ScanFragNextReq::SignalLength + 1,
// Not enough waiting fragments: start new ones to reach the chosen
// parallelism.
5683 if (sentFragCount < data.m_parallelism)
5689 ndbassert(data.m_frags_not_started != 0);
5690 scanIndex_send(signal,
5693 data.m_parallelism - sentFragCount,
5694 org->batch_size_bytes/data.m_parallelism,
// Another chunk of the current batch is now in flight.
5703 ndbrequire(data.m_frags_outstanding > 0);
5704 ndbrequire(data.m_batch_chunks > 0);
5705 data.m_batch_chunks++;
5707 requestPtr.p->m_outstanding++;
// Complete step for an index scan: if any fragment handles were created,
// informs DBDIH that the table scan identified by the stored scan cookie has
// finished.
5712 Dbspj::scanIndex_complete(
Signal* signal,
5717 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5719 if (!data.m_fragments.isEmpty())
5723 rep->tableId = dst->tableId;
5724 rep->scanCookie = data.m_scanCookie;
5725 sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP,
5726 signal, DihScanTabCompleteRep::SignalLength, JBB);
// Aborts an index scan: sends a close request (SCAN_NEXTREQ with ZCLOSE) to
// fragments according to their state and adjusts the outstanding and active
// counters of the request.
5731 Dbspj::scanIndex_abort(
Signal* signal,
5737 switch(treeNodePtr.p->m_state){
5743 ndbout_c(
"H'%.8x H'%.8x scanIndex_abort state: %u",
5744 requestPtr.p->m_transId[0],
5745 requestPtr.p->m_transId[1],
5746 treeNodePtr.p->m_state);
// One close request is prepared and reused for every fragment; only
// senderData changes per fragment.
5755 req->requestInfo = ScanFragNextReq::ZCLOSE;
5756 req->transId1 = requestPtr.p->m_transId[0];
5757 req->transId2 = requestPtr.p->m_transId[1];
5758 req->batch_size_rows = 0;
5759 req->batch_size_bytes = 0;
5761 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5762 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5765 Uint32 cnt_waiting = 0;
5766 Uint32 cnt_scanning = 0;
5767 for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
5769 switch(fragPtr.p->m_state){
5770 case ScanFragHandle::SFH_NOT_STARTED:
5771 case ScanFragHandle::SFH_COMPLETE:
5772 case ScanFragHandle::SFH_WAIT_CLOSE:
5775 case ScanFragHandle::SFH_WAIT_NEXTREQ:
// NOTE(review): presumably a waiting fragment has no reply pending, so its
// close reply is counted as newly outstanding - confirm.
5778 data.m_frags_outstanding++;
5780 case ScanFragHandle::SFH_SCANNING:
5785 req->senderData = fragPtr.i;
5786 sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
5787 ScanFragNextReq::SignalLength, JBB);
5789 fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE;
5794 if (cnt_scanning == 0)
5796 if (cnt_waiting > 0)
5802 requestPtr.p->m_outstanding++;
5811 ndbassert(data.m_frags_not_started > 0);
5812 ndbrequire(requestPtr.p->m_cnt_active);
5813 requestPtr.p->m_cnt_active--;
// Handles a node failure report for an index scan: fragments whose m_ref
// points at a node in the given node set are marked complete, and the
// counters are adjusted to match the state each fragment was in.
5820 Dbspj::scanIndex_execNODE_FAILREP(
Signal* signal,
5827 switch(treeNodePtr.p->m_state){
5844 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5845 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
// Remember the counters so that transitions caused here can be detected
// afterwards.
5848 Uint32 save0 = data.m_frags_outstanding;
5849 Uint32 save1 = data.m_frags_complete;
5851 for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
// Fragments located on nodes outside the set are unaffected.
5853 if (nodes.
get(refToNode(fragPtr.p->m_ref)) ==
false)
5862 switch(fragPtr.p->m_state){
5863 case ScanFragHandle::SFH_NOT_STARTED:
5865 ndbrequire(data.m_frags_complete < data.m_fragCount);
5866 data.m_frags_complete++;
5867 ndbrequire(data.m_frags_not_started > 0);
5868 data.m_frags_not_started--;
5870 case ScanFragHandle::SFH_COMPLETE:
5879 case ScanFragHandle::SFH_WAIT_CLOSE:
5880 case ScanFragHandle::SFH_SCANNING:
// A reply was pending from this fragment; it will never arrive.
5882 ndbrequire(data.m_frags_outstanding > 0);
5883 data.m_frags_outstanding--;
5885 case ScanFragHandle::SFH_WAIT_NEXTREQ:
5888 ndbrequire(data.m_frags_complete < data.m_fragCount);
5889 data.m_frags_complete++;
5892 fragPtr.p->m_ref = 0;
5893 fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
// Replies were pending before and none are left now: the request has one
// outstanding item fewer.
5896 if (save0 != 0 && data.m_frags_outstanding == 0)
5899 ndbrequire(requestPtr.p->m_outstanding);
5900 requestPtr.p->m_outstanding--;
5904 data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
5907 ndbrequire(requestPtr.p->m_cnt_active);
5908 requestPtr.p->m_cnt_active--;
// Releases the range-key sections held by the fragment handles of this tree
// node and resets their range bookkeeping. One path walks every handle, the
// other touches only the first.
5916 Dbspj::scanIndex_release_rangekeys(
Ptr<Request> requestPtr,
5920 DEBUG(
"scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
5921 <<
" m_node_no: " << treeNodePtr.p->m_node_no);
5923 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5924 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
// Every handle may own its own range section.
5930 for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
5932 if (fragPtr.p->m_rangePtrI != RNIL)
5934 releaseSection(fragPtr.p->m_rangePtrI);
5935 fragPtr.p->m_rangePtrI = RNIL;
5937 fragPtr.p->reset_ranges();
// Only the first handle is examined on this path.
5943 list.first(fragPtr);
5944 if (fragPtr.p->m_rangePtrI != RNIL)
5946 releaseSection(fragPtr.p->m_rangePtrI);
5947 fragPtr.p->m_rangePtrI = RNIL;
5949 fragPtr.p->reset_ranges();
// Cleanup after a parent batch: drops the range keys collected for it.
5959 Dbspj::scanIndex_parent_batch_cleanup(
Ptr<Request> requestPtr,
5962 DEBUG(
"scanIndex_parent_batch_cleanup");
5963 scanIndex_release_rangekeys(requestPtr,treeNodePtr);
// Final cleanup of an index-scan tree node: releases the range keys and any
// constant prune key section that is still held, then runs the cleanup shared
// by all tree-node types.
5970 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5971 DEBUG(
"scanIndex_cleanup");
5977 scanIndex_release_rangekeys(requestPtr,treeNodePtr);
5980 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5987 Local_pattern_store pattern(pool, data.m_prunePattern);
// The constant prune key may still be allocated at this point.
5993 if (data.m_constPrunePtrI != RNIL)
5996 releaseSection(data.m_constPrunePtrI);
5997 data.m_constPrunePtrI = RNIL;
6001 cleanup_common(requestPtr, treeNodePtr);
// Maps a query-node operation type to the handler table for that type:
// lookup, fragment scan or index scan.
6012 Dbspj::getOpInfo(Uint32 op)
6014 DEBUG(
"getOpInfo(" << op <<
")");
6016 case QueryNode::QN_LOOKUP:
6017 return &Dbspj::g_LookupOpInfo;
6018 case QueryNode::QN_SCAN_FRAG:
6019 return &Dbspj::g_ScanFragOpInfo;
6020 case QueryNode::QN_SCAN_INDEX:
6021 return &Dbspj::g_ScanIndexOpInfo;
// Unpacks a list of 16-bit values stored two per 32-bit word. The first word
// holds the element count in its low half and the first element in its high
// half; the following words hold two elements each, low half first.
6038 const Uint32 * ptr = buffer.ptr;
6039 if (likely(ptr != buffer.end))
6041 Uint32 tmp = * ptr++;
6042 Uint32 cnt = tmp & 0xFFFF;
6044 * dst ++ = (tmp >> 16);
6045 DEBUG(
"cnt: " << cnt <<
" first: " << (tmp >> 16));
// Reject lists that do not fit in the destination or that would read past the
// end of the buffer.
6049 Uint32 len = cnt / 2;
6050 if (unlikely(cnt >= dstLen || (ptr + len > buffer.end)))
6055 for (Uint32 i = 0; i < cnt/2; i++)
6057 * dst++ = (* ptr) & 0xFFFF;
6058 * dst++ = (* ptr) >> 16;
// A trailing single element sits in the low half of the last word.
6064 * dst ++ = * ptr & 0xFFFF;
// Fills header->m_offset by stepping through the row with a section reader,
// using each attribute header's data size to advance; m_len is set to the
// number of offsets written and is also the return value.
6087 Uint32 * dst = header->m_offset;
6088 const Uint32 *
const save = dst;
6095 len = AttributeHeader::getDataSize(tmp);
6097 }
while (r0.step(len));
6099 return header->m_len =
static_cast<Uint32
>(dst - save);
// Linear-memory variant: walks len attributes starting at src, advancing the
// running offset by one header word plus the attribute's data size each time.
// m_len is set to the number of offsets written and is also the return value.
// src is passed by reference and is advanced as the headers are read.
6107 Dbspj::buildRowHeader(RowPtr::Header * header,
const Uint32 *& src, Uint32 len)
6109 Uint32 * dst = header->m_offset;
6110 const Uint32 * save = dst;
6112 for (Uint32 i = 0; i<len; i++)
6115 Uint32 tmp = * src++;
6116 Uint32 tmp_len = AttributeHeader::getDataSize(tmp);
6117 offset += 1 + tmp_len;
6121 return header->m_len =
static_cast<Uint32
>(dst - save);
// Appends len words from the tree buffer to the pattern store.
// Returns InvalidTreeNodeSpecification if the buffer holds fewer than len
// words, and OutOfQueryMemory if the append fails.
6125 Dbspj::appendToPattern(Local_pattern_store & pattern,
6126 DABuffer & tree, Uint32 len)
6128 if (unlikely(tree.ptr + len > tree.end))
6129 return DbspjErr::InvalidTreeNodeSpecification;
6131 if (unlikely(pattern.append(tree.ptr, len)==0))
6132 return DbspjErr::OutOfQueryMemory;
// Appends the value of column col of a linear row to the pattern as a data
// entry: one QueryPattern::data(len) info word followed by the len data words
// (the attribute header word itself is skipped).
// Returns 0 on success, OutOfQueryMemory if either append fails.
6139 Dbspj::appendParamToPattern(Local_pattern_store& dst,
6140 const RowPtr::Linear & row, Uint32 col)
6145 Uint32 offset = row.m_header->m_offset[col];
6146 const Uint32 * ptr = row.m_data +
offset;
6147 Uint32 len = AttributeHeader::getDataSize(* ptr ++);
6149 Uint32 info = QueryPattern::data(len);
6150 return dst.append(&info,1) && dst.append(ptr,len) ? 0 : DbspjErr::OutOfQueryMemory;
// Same as appendParamToPattern, except that the attribute header word is kept:
// the data entry is len+1 words long and starts at the header.
// Returns 0 on success, OutOfQueryMemory if either append fails.
6154 Dbspj::appendParamHeadToPattern(Local_pattern_store& dst,
6155 const RowPtr::Linear & row, Uint32 col)
6160 Uint32 offset = row.m_header->m_offset[col];
6161 const Uint32 * ptr = row.m_data +
offset;
6162 Uint32 len = AttributeHeader::getDataSize(*ptr);
6164 Uint32 info = QueryPattern::data(len+1);
6165 return dst.append(&info,1) && dst.append(ptr,len+1) ? 0 : DbspjErr::OutOfQueryMemory;
// Copies len words from the section reader into section ptrI through a
// temporary buffer: full SZ-word chunks first, then the remainder.
// NOTE(review): a failing chunk append trips ndbrequire, while a failure on
// the final append returns a bare 1 rather than a DbspjErr code - confirm
// that callers only test the result for non-zero.
6169 Dbspj::appendTreeToSection(Uint32 & ptrI,
SectionReader & tree, Uint32 len)
6179 tree.getWords(tmp, SZ);
6180 ndbrequire(appendToSection(ptrI, tmp, SZ));
6184 tree.getWords(tmp, len);
6185 return appendToSection(ptrI, tmp, len) ? 0 : 1;
// Section-row variant: steps the reader to column col, requires the attribute
// there to hold exactly one data word and reads a word into correlationNumber.
6193 Dbspj::getCorrelationData(
const RowPtr::Section & row,
6195 Uint32& correlationNumber)
6202 Uint32 offset = row.m_header->m_offset[col];
6203 ndbrequire(reader.step(offset));
6205 ndbrequire(reader.getWord(&tmp));
6206 Uint32 len = AttributeHeader::getDataSize(tmp);
6207 ndbrequire(len == 1);
6209 ndbrequire(reader.getWord(&correlationNumber));
// Linear-row variant: requires column col to hold exactly one data word and
// returns the word that follows the attribute header in correlationNumber.
6213 Dbspj::getCorrelationData(
const RowPtr::Linear & row,
6215 Uint32& correlationNumber)
6220 Uint32 offset = row.m_header->m_offset[col];
6221 Uint32 tmp = row.m_data[
offset];
6222 Uint32 len = AttributeHeader::getDataSize(tmp);
6223 ndbrequire(len == 1);
6225 correlationNumber = row.m_data[offset+1];
// Section-row variant: appends the data words of column `col` of `row`
// (without the attribute header word) to the section `dst`.
// NOTE(review): the declarations of `reader` and `tmp`, and the body of the
// len==0 branch (presumably where `hasNull` is set - confirm), are not
// visible in this excerpt.
6229 Dbspj::appendColToSection(Uint32 & dst,
const RowPtr::Section & row,
6230 Uint32 col,
bool& hasNull)
// Position the reader at the column's word offset taken from the row header.
6237 Uint32 offset = row.m_header->m_offset[col];
6238 ndbrequire(reader.step(offset));
// getWord() consumes the header, leaving the reader at the column data.
6240 ndbrequire(reader.getWord(&tmp));
6241 Uint32 len = AttributeHeader::getDataSize(tmp);
// A data size of 0 is handled as a special case.
6242 if (unlikely(len==0))
// Copy the `len` data words from the reader into `dst`.
6248 return appendTreeToSection(dst, reader, len);
// Linear-row variant: appends the data words of column `col` of `row`
// (without the attribute header word) to the section `dst`.
// NOTE(review): the body of the len==0 branch (presumably where `hasNull`
// is set - confirm) is not visible in this excerpt.
6252 Dbspj::appendColToSection(Uint32 & dst,
const RowPtr::Linear & row,
6253 Uint32 col,
bool& hasNull)
6258 Uint32 offset = row.m_header->m_offset[col];
6259 const Uint32 * ptr = row.m_data +
offset;
// Read the size from the header word and step past it, so that `ptr`
// addresses the column data only.
6260 Uint32 len = AttributeHeader::getDataSize(* ptr ++);
// A data size of 0 is handled as a special case.
6261 if (unlikely(len==0))
// NOTE(review): a failed append is reported as InvalidPattern; an
// out-of-memory style code may describe the failure better - confirm.
6267 return appendToSection(dst, ptr, len) ? 0 : DbspjErr::InvalidPattern;
// Linear-row variant: appends column `col` of `row` to the section `dst`
// including its attribute header word (1 + len words in total).
// NOTE(review): the body of the len==0 branch (presumably where `hasNull`
// is set - confirm) is not visible in this excerpt.
6271 Dbspj::appendAttrinfoToSection(Uint32 & dst,
const RowPtr::Linear & row,
6272 Uint32 col,
bool& hasNull)
6277 Uint32 offset = row.m_header->m_offset[col];
6278 const Uint32 * ptr = row.m_data +
offset;
// `ptr` is not advanced: the header word is copied together with the data.
6279 Uint32 len = AttributeHeader::getDataSize(* ptr);
// A data size of 0 is handled as a special case.
6280 if (unlikely(len==0))
// NOTE(review): a failed append is reported as InvalidPattern; an
// out-of-memory style code may describe the failure better - confirm.
6285 return appendToSection(dst, ptr, 1 + len) ? 0 : DbspjErr::InvalidPattern;
// Section-row variant: appends column `col` of `row` to the section `dst`
// including its attribute header word (1 + len words in total).
// NOTE(review): the declarations of `reader` and `tmp`, and the body of the
// len==0 branch (presumably where `hasNull` is set - confirm), are not
// visible in this excerpt.
6289 Dbspj::appendAttrinfoToSection(Uint32 & dst,
const RowPtr::Section & row,
6290 Uint32 col,
bool& hasNull)
// Position the reader at the column's word offset taken from the row header.
6297 Uint32 offset = row.m_header->m_offset[col];
6298 ndbrequire(reader.step(offset));
// peekWord() (rather than getWord()) is used so that the header word is
// still pending in the reader and gets copied below.
6300 ndbrequire(reader.peekWord(&tmp));
6301 Uint32 len = AttributeHeader::getDataSize(tmp);
// A data size of 0 is handled as a special case.
6302 if (unlikely(len==0))
// Copy header + data.
6307 return appendTreeToSection(dst, reader, 1 + len);
// Section-row variant: appends column `col` of `row` to the section `dst`,
// leaving out the attribute header word and the first data word.
// NOTE(review): the declarations of `reader` and `tmp` are not visible in
// this excerpt.
6315 Dbspj::appendPkColToSection(Uint32 & dst,
const RowPtr::Section & row, Uint32 col)
// Position the reader at the column's word offset taken from the row header.
6322 Uint32 offset = row.m_header->m_offset[col];
6323 ndbrequire(reader.step(offset));
// Consume the attribute header to learn the data size.
6325 ndbrequire(reader.getWord(&tmp));
6326 Uint32 len = AttributeHeader::getDataSize(tmp);
// Skip the first data word and copy the remaining len-1 words.
// NOTE(review): assumes len >= 1; no visible guard protects `len-1`
// against len == 0 - confirm.
6328 ndbrequire(reader.step(1));
6329 return appendTreeToSection(dst, reader, len-1);
// Linear-row variant: appends column `col` of `row` to the section `dst`,
// leaving out the attribute header word and the first data word.
6337 Dbspj::appendPkColToSection(Uint32 & dst,
const RowPtr::Linear & row, Uint32 col)
// The word at the column's offset is the attribute header.
6339 Uint32 offset = row.m_header->m_offset[col];
6340 Uint32 tmp = row.m_data[
offset];
6341 Uint32 len = AttributeHeader::getDataSize(tmp);
// offset+2 skips the header word and the first data word; len-1 words are
// copied.
// NOTE(review): assumes len >= 1, and failure is reported as the literal 1
// rather than a DbspjErr code - confirm both against callers.
6343 return appendToSection(dst, row.m_data+offset+2, len - 1) ? 0 : 1;
// Resolves a pattern item against a row belonging to an ancestor tree node:
// starting from the node that produced `rowptr`, the code moves to parent
// nodes, locates the matching buffered row through the node's row map and
// the correlation value, and finally appends the requested column of that
// row to the section `dst`.
// Every inconsistency that is detected yields DbspjErr::InvalidPattern.
// NOTE(review): the loop over `levels`, several declarations (`treeNodePtr`,
// `ref`, `targetRow`, `val`), the trailing parameter(s) and the `switch`
// header are not visible in this excerpt.
6347 Dbspj::appendFromParent(Uint32 & dst, Local_pattern_store& pattern,
6348 Local_pattern_store::ConstDataBufferIterator& it,
6349 Uint32 levels,
const RowPtr & rowptr,
// Start at the tree node the source row came from, with its correlation
// value.
6353 m_treenode_pool.
getPtr(treeNodePtr, rowptr.m_src_node_ptrI);
6354 Uint32 corrVal = rowptr.m_src_correlation;
// A node without a parent cannot satisfy a parent reference.
6359 if (unlikely(treeNodePtr.p->m_parentPtrI == RNIL))
6362 return DbspjErr::InvalidPattern;
// Move up to the parent node.
6364 m_treenode_pool.
getPtr(treeNodePtr, treeNodePtr.p->m_parentPtrI);
6368 return DbspjErr::InvalidPattern;
// Obtain a pointer to the parent's row map; which accessor is used depends
// on a condition that is not visible here (presumably `allocator`).
6372 treeNodePtr.p->m_row_map.copyto(ref);
6373 Uint32 allocator = ref.m_allocator;
6374 const Uint32 * mapptr;
6378 mapptr = get_row_ptr_stack(ref);
6383 mapptr = get_row_ptr_var(ref);
// The upper 16 bits of the correlation value give the position in the row
// map; it must lie inside the map.
6386 Uint32 pos = corrVal >> 16;
6387 if (unlikely(! (pos < treeNodePtr.p->m_row_map.m_size)))
6390 return DbspjErr::InvalidPattern;
// Load the row reference stored at that map position.
6394 treeNodePtr.p->m_row_map.load(mapptr, pos, ref);
// NOTE(review): this local has the same name as the `rowptr` parameter.
6396 const Uint32 * rowptr;
6400 rowptr = get_row_ptr_stack(ref);
6405 rowptr = get_row_ptr_var(ref);
6407 setupRowPtr(treeNodePtr, targetRow, ref, rowptr);
// Fetch correlation data from the last column (index m_len - 1) of the
// located row.
6412 getCorrelationData(targetRow.m_row_data.m_linear,
6413 targetRow.m_row_data.m_linear.m_header->m_len - 1,
// The pattern must still have an item describing what to read from the row.
6418 if (unlikely(it.isNull()))
6421 return DbspjErr::InvalidPattern;
6424 Uint32 info = *it.data;
6425 Uint32 type = QueryPattern::getType(info);
// Column-style items are served from the linear view of the located row.
6429 case QueryPattern::P_COL:
6431 return appendColToSection(dst, targetRow.m_row_data.m_linear, val, hasNull);
6433 case QueryPattern::P_UNQ_PK:
6435 return appendPkColToSection(dst, targetRow.m_row_data.m_linear, val);
6437 case QueryPattern::P_ATTRINFO:
6439 return appendAttrinfoToSection(dst, targetRow.m_row_data.m_linear, val, hasNull);
// The handling of the remaining item types is not visible in this excerpt.
6441 case QueryPattern::P_DATA:
6445 case QueryPattern::P_PARENT:
6449 case QueryPattern::P_PARAM:
6450 case QueryPattern::P_PARAM_HEADER:
6457 return DbspjErr::InvalidPattern;
// Copies `len` literal data words from the pattern store, starting at the
// iterator `it`, into the section `ptrI`.
// NOTE(review): two copy strategies are visible below. How they are selected
// (branch or preprocessor conditional) is not visible in this excerpt, and
// neither are the len==0 branch body, the declaration of `dstIdx` or the
// loop bookkeeping.
6461 Dbspj::appendDataToSection(Uint32 & ptrI,
6462 Local_pattern_store& pattern,
6463 Local_pattern_store::ConstDataBufferIterator& it,
6464 Uint32 len,
bool& hasNull)
// A zero-length data item is handled as a special case.
6466 if (unlikely(len==0))
// Strategy 1: bulk copy through a segment-sized buffer using copyout().
// NOTE(review): the appendToSection() results are not checked on these
// lines - confirm whether this code is actually compiled/reached.
6477 Uint32 tmp[NDB_SECTION_SEGMENT_SZ];
6478 while (len > NDB_SECTION_SEGMENT_SZ)
6480 pattern.copyout(tmp, NDB_SECTION_SEGMENT_SZ, it);
6481 appendToSection(ptrI, tmp, NDB_SECTION_SEGMENT_SZ);
6482 len -= NDB_SECTION_SEGMENT_SZ;
6485 pattern.copyout(tmp, len, it);
6486 appendToSection(ptrI, tmp, len);
// Strategy 2: word-by-word copy through the iterator, flushing the buffer
// when it is full or when no words remain.
6489 Uint32 remaining = len;
6491 Uint32 tmp[NDB_SECTION_SEGMENT_SZ];
6493 while (remaining > 0 && !it.isNull())
6495 tmp[dstIdx] = *it.data;
6499 if (dstIdx == NDB_SECTION_SEGMENT_SZ || remaining == 0)
6501 if (!appendToSection(ptrI, tmp, dstIdx))
6504 return DbspjErr::InvalidPattern;
6512 return DbspjErr::InvalidPattern;
// Creates a section containing zero words by calling import() with a length
// of 0. Yields DbspjErr::OutOfSectionMemory when import() fails.
// NOTE(review): the declarations of `ptr` and `tmp` and the success path
// (presumably storing the new section id in `dst` - confirm) are not visible
// in this excerpt.
6522 Dbspj::createEmptySection(Uint32 & dst)
6526 if (likely(
import(ptr, &tmp, 0)))
6534 return DbspjErr::OutOfSectionMemory;
// Expands `pattern` against a row accessed through its section view
// (row.m_row_data.m_section), appending the result to a section.
// Walks the pattern item by item and dispatches on the item type.
// NOTE(review): the initialisation of `it`, the `switch` header, the
// declarations of `dst`, `val` and `err`, and the error/exit handling are
// not visible in this excerpt.
6541 Dbspj::expandS(Uint32 & _dst, Local_pattern_store& pattern,
6542 const RowPtr & row,
bool& hasNull)
6547 Local_pattern_store::ConstDataBufferIterator it;
6549 while (!it.isNull())
// Each item starts with an info word that carries the item type.
6551 Uint32 info = *it.data;
6552 Uint32 type = QueryPattern::getType(info);
// Column value without its attribute header.
6556 case QueryPattern::P_COL:
6558 err = appendColToSection(dst, row.m_row_data.m_section, val, hasNull);
// Column value in the unique-index primary-key form.
6560 case QueryPattern::P_UNQ_PK:
6562 err = appendPkColToSection(dst, row.m_row_data.m_section, val);
// Column value including its attribute header.
6564 case QueryPattern::P_ATTRINFO:
6566 err = appendAttrinfoToSection(dst, row.m_row_data.m_section, val, hasNull);
// Literal data stored in the pattern itself.
6568 case QueryPattern::P_DATA:
6570 err = appendDataToSection(dst, pattern, it, val, hasNull);
// Value taken from a row of an ancestor node.
6572 case QueryPattern::P_PARENT:
6577 err = appendFromParent(dst, pattern, it, val, row, hasNull);
// Parameter items are rejected here as an invalid pattern.
6580 case QueryPattern::P_PARAM:
6581 case QueryPattern::P_PARAM_HEADER:
6584 err = DbspjErr::InvalidPattern;
6587 if (unlikely(err != 0))
// Expands `pattern` against a row accessed through its linear view
// (row.m_row_data.m_linear), appending the result to a section.
// Walks the pattern item by item and dispatches on the item type.
// NOTE(review): the initialisation of `it`, the `switch` header, the
// declarations of `dst`, `val` and `err`, and the error/exit handling are
// not visible in this excerpt.
6606 Dbspj::expandL(Uint32 & _dst, Local_pattern_store& pattern,
6607 const RowPtr & row,
bool& hasNull)
6612 Local_pattern_store::ConstDataBufferIterator it;
6614 while (!it.isNull())
// Each item starts with an info word that carries the item type.
6616 Uint32 info = *it.data;
6617 Uint32 type = QueryPattern::getType(info);
// Column value without its attribute header.
6621 case QueryPattern::P_COL:
6623 err = appendColToSection(dst, row.m_row_data.m_linear, val, hasNull);
// Column value in the unique-index primary-key form.
6625 case QueryPattern::P_UNQ_PK:
6627 err = appendPkColToSection(dst, row.m_row_data.m_linear, val);
// Column value including its attribute header.
6629 case QueryPattern::P_ATTRINFO:
6631 err = appendAttrinfoToSection(dst, row.m_row_data.m_linear, val, hasNull);
// Literal data stored in the pattern itself.
6633 case QueryPattern::P_DATA:
6635 err = appendDataToSection(dst, pattern, it, val, hasNull);
// Value taken from a row of an ancestor node.
6637 case QueryPattern::P_PARENT:
6642 err = appendFromParent(dst, pattern, it, val, row, hasNull);
// Parameter items are rejected here as an invalid pattern.
6645 case QueryPattern::P_PARAM:
6646 case QueryPattern::P_PARAM_HEADER:
6649 err = DbspjErr::InvalidPattern;
6652 if (unlikely(err != 0))
// Expands a pattern held in a DABuffer directly into a section, replacing
// parameter references with values from `param`.
// Only P_PARAM, P_PARAM_HEADER and P_DATA items are accepted; row-dependent
// items (P_COL, P_PARENT, P_ATTRINFO, P_UNQ_PK) yield InvalidPattern.
// NOTE(review): the loop header, the `switch` header, the declarations of
// `dst`, `val` and `err`, and the exit handling are not visible in this
// excerpt.
6668 Dbspj::expand(Uint32 & ptrI, DABuffer& pattern, Uint32 len,
6669 DABuffer& param, Uint32 paramCnt,
bool& hasNull)
// Present the parameters as a linear row: `tmp` holds the row header that
// buildRowHeader() fills in from the parameter words.
6675 Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
6676 struct RowPtr::Linear row;
6677 row.m_data = param.ptr;
6678 row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]);
6679 buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt);
// The pattern occupies `len` words starting at pattern.ptr.
6682 const Uint32 * ptr = pattern.ptr;
6683 const Uint32 * end = ptr + len;
// Each item starts with an info word that carries the item type.
6688 Uint32 info = * ptr++;
6689 Uint32 type = QueryPattern::getType(info);
// Parameter value without its attribute header.
// NOTE(review): the range of `val` is only checked by ndbassert.
6692 case QueryPattern::P_PARAM:
6694 ndbassert(val < paramCnt);
6695 err = appendColToSection(dst, row, val, hasNull);
// Parameter value including its attribute header.
6697 case QueryPattern::P_PARAM_HEADER:
6699 ndbassert(val < paramCnt);
6700 err = appendAttrinfoToSection(dst, row, val, hasNull);
// Literal data: `val` words taken from the pattern itself; val==0 is a
// special case whose body is not visible here.
6702 case QueryPattern::P_DATA:
6703 if (unlikely(val==0))
6709 else if (likely(appendToSection(dst, ptr, val)))
6717 err = DbspjErr::InvalidPattern;
// Items that need a row cannot be expanded at this stage.
6721 case QueryPattern::P_COL:
6722 case QueryPattern::P_PARENT:
6723 case QueryPattern::P_ATTRINFO:
6724 case QueryPattern::P_UNQ_PK:
6728 err = DbspjErr::InvalidPattern;
6731 if (unlikely(err != 0))
// Copies a pattern held in a DABuffer into the pattern store `dst`,
// replacing parameter references with the parameter values so that the
// stored pattern only contains row-dependent items and literal data.
// For P_PARENT items it also flags the referenced ancestor node and the
// request as needing row buffers.
// NOTE(review): the `switch` header, the declarations of `val`, `err`,
// `parentPtr` and `requestPtr`, the loop over parent levels and the exit
// handling are not visible in this excerpt.
6751 Dbspj::expand(Local_pattern_store& dst,
Ptr<TreeNode> treeNodePtr,
6752 DABuffer& pattern, Uint32 len,
6753 DABuffer& param, Uint32 paramCnt)
// Present the parameters as a linear row: `tmp` holds the row header that
// buildRowHeader() fills in from the parameter words.
6759 Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
6760 struct RowPtr::Linear row;
6761 row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]);
6762 row.m_data = param.ptr;
6763 buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt);
// pattern.ptr is not advanced by the loop header itself.
6765 const Uint32 * end = pattern.ptr + len;
6766 for (; pattern.ptr < end; )
// Peek at the info word; it is not consumed here.
6768 Uint32 info = *pattern.ptr;
6769 Uint32 type = QueryPattern::getType(info);
// Row-dependent single-word items are copied through unchanged.
6772 case QueryPattern::P_COL:
6773 case QueryPattern::P_UNQ_PK:
6774 case QueryPattern::P_ATTRINFO:
6776 err = appendToPattern(dst, pattern, 1);
// Literal data: the info word plus `val` data words.
6778 case QueryPattern::P_DATA:
6780 err = appendToPattern(dst, pattern, val+1);
// Parameter reference: replaced by the parameter's data.
// NOTE(review): the range of `val` is only checked by ndbassert.
6782 case QueryPattern::P_PARAM:
6785 ndbassert(val < paramCnt);
6786 err = appendParamToPattern(dst, row, val);
// Parameter reference including its attribute header.
6789 case QueryPattern::P_PARAM_HEADER:
6792 ndbassert(val < paramCnt);
6793 err = appendParamHeadToPattern(dst, row, val);
// Parent reference: copied through, then the ancestor it refers to is
// located by following m_parentPtrI and marked with T_ROW_BUFFER.
6796 case QueryPattern::P_PARENT:
6799 err = appendToPattern(dst, pattern, 1);
6804 m_treenode_pool.
getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
6808 ndbassert(parentPtr.p->m_parentPtrI != RNIL);
6809 m_treenode_pool.
getPtr(parentPtr, parentPtr.p->m_parentPtrI);
6810 parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER;
// The owning request is marked with RT_ROW_BUFFERS as well.
6814 m_request_pool.
getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
6815 requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS;
6820 err = DbspjErr::InvalidPattern;
6824 if (unlikely(err != 0))
// Parses the tree-node and parameter words shared by the query-tree node
// kinds: parent links, key specification and attribute information
// (interpreted program, parameters, read lists). The results are stored on
// the tree node (m_parentPtrI, m_ancestors, m_keyPattern, m_send.*).
// `treeBits` (DABits::NI_*) and `paramBits` (DABits::PI_*) select which
// parts are present in `tree` and `param`.
// `err` is preset to the relevant error code before each check.
// NOTE(review): many lines of this function (further parameters, braces,
// branch bodies, several declarations, the error exit) are not visible in
// this excerpt; the comments below describe only what the visible
// statements do.
6838 Dbspj::parseDA(Build_context& ctx,
6841 DABuffer& tree, Uint32 treeBits,
6842 DABuffer& param, Uint32 paramBits)
// Section ids under construction; RNIL means "not created yet".
6845 Uint32 attrInfoPtrI = RNIL;
6846 Uint32 attrParamPtrI = RNIL;
6853 DEBUG(
"use REPEAT_SCAN_RESULT when returning results");
6854 requestPtr.p->m_bits |= Request::RT_REPEAT_SCAN_RESULT;
// --- Parent link -----------------------------------------------------------
6857 if (treeBits & DABits::NI_HAS_PARENT)
6860 DEBUG(
"NI_HAS_PARENT");
6869 err = DbspjErr::InvalidTreeNodeSpecification;
// Unpack the list of parent node numbers; a count larger than the array is
// rejected, and so is any count other than 1.
6871 Uint32 cnt = unpackList(NDB_ARRAY_SIZE(dst), dst, tree);
6872 if (unlikely(cnt > NDB_ARRAY_SIZE(dst)))
6880 if (unlikely(cnt!=1))
6888 for (Uint32 i = 0; i<cnt; i++)
6890 DEBUG(
"adding " << dst[i] <<
" as parent");
// Register this node in the parent's dependency map.
6893 Local_dependency_map map(pool, parentPtr.p->m_dependent_nodes);
6894 if (unlikely(!map.append(&treeNodePtr.i, 1)))
6896 err = DbspjErr::OutOfQueryMemory;
6901 treeNodePtr.p->m_parentPtrI = parentPtr.i;
// Ancestors = the parent's ancestors plus the parent itself.
6904 treeNodePtr.p->m_ancestors = parentPtr.p->m_ancestors;
6905 treeNodePtr.p->m_ancestors.
set(parentPtr.p->m_node_no);
6908 if (unlikely(err != 0))
// --- Key specification -----------------------------------------------------
// NI_KEY_PARAMS and PI_KEY_PARAMS must be either both set or both clear.
6912 err = DbspjErr::InvalidTreeParametersSpecificationKeyParamBitsMissmatch;
6913 if (unlikely( ((treeBits & DABits::NI_KEY_PARAMS)==0) !=
6914 ((paramBits & DABits::PI_KEY_PARAMS)==0)))
6920 if (treeBits & (DABits::NI_KEY_PARAMS
6921 | DABits::NI_KEY_LINKED
6922 | DABits::NI_KEY_CONSTS))
6925 DEBUG(
"NI_KEY_PARAMS | NI_KEY_LINKED | NI_KEY_CONSTS");
// One word packs the pattern length (low 16 bits) and the parameter count
// (high 16 bits).
6933 Uint32 len_cnt = * tree.ptr ++;
6934 Uint32 len = len_cnt & 0xFFFF;
6935 Uint32 cnt = len_cnt >> 16;
6938 Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
// A zero parameter count must coincide with the KEY_PARAMS bits being clear.
6940 err = DbspjErr::InvalidTreeParametersSpecificationIncorrectKeyParamCount;
6941 if (unlikely( ((cnt==0) != ((treeBits & DABits::NI_KEY_PARAMS) == 0)) ||
6942 ((cnt==0) != ((paramBits & DABits::PI_KEY_PARAMS) == 0))))
// Linked key: keep a pattern (with parameters substituted) on the node.
6948 if (treeBits & DABits::NI_KEY_LINKED)
6951 DEBUG(
"LINKED-KEY PATTERN w/ " << cnt <<
" PARAM values");
6955 err = expand(pattern, treeNodePtr, tree, len, param, cnt);
// Fixed key: expand straight into a key-info section.
6965 DEBUG(
"FIXED-KEY w/ " << cnt <<
" PARAM values");
6971 Uint32 keyInfoPtrI = RNIL;
6972 err = expand(keyInfoPtrI, tree, len, param, cnt, hasNull);
6973 if (unlikely(hasNull))
6977 DEBUG(
"BEWARE: FIXED-key contain NULL values");
6982 treeNodePtr.p->m_send.m_keyInfoPtrI = keyInfoPtrI;
6985 if (unlikely(err != 0))
// --- Attribute info --------------------------------------------------------
6993 DABits::NI_LINKED_ATTR | DABits::NI_ATTR_INTERPRET |
6994 DABits::NI_ATTR_LINKED | DABits::NI_ATTR_PARAMS;
6996 if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0)
// Five zeroed words are appended first, and `sectionptrs` is pointed at the
// section data so that entries [1], [3] and [4] can be filled in later
// (presumably the attrinfo section-length header - confirm).
7026 Uint32 sections[5] = { 0, 0, 0, 0, 0 };
7027 Uint32 * sectionptrs = 0;
7030 (treeBits & DABits::NI_ATTR_INTERPRET) ||
7032 (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED);
7041 err = DbspjErr::OutOfSectionMemory;
7042 if (unlikely(!appendToSection(attrInfoPtrI, sections, 5)))
7049 getSection(ptr, attrInfoPtrI);
7050 sectionptrs = ptr.p->theData;
// Interpreted program supplied by the tree.
7052 if (treeBits & DABits::NI_ATTR_INTERPRET)
// A program in both the tree and the parameters is rejected.
7059 err = DbspjErr::BothTreeAndParametersContainInterpretedProgram;
7060 if (unlikely(paramBits & DABits::PI_ATTR_INTERPRET))
7066 treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
// One word packs the program length (low 16 bits) and the parameter-pattern
// length (high 16 bits).
7067 Uint32 len2 = * tree.ptr++;
7068 Uint32 len_prg = len2 & 0xFFFF;
7069 Uint32 len_pattern = len2 >> 16;
7070 err = DbspjErr::OutOfSectionMemory;
7071 if (unlikely(!appendToSection(attrInfoPtrI, tree.ptr, len_prg)))
7077 tree.ptr += len_prg;
7078 sectionptrs[1] = len_prg;
// The low 16 bits of the next word give the parameter count.
7080 Uint32 tmp = * tree.ptr ++;
7081 Uint32 cnt = tmp & 0xFFFF;
// Linked attribute parameters are kept as a pattern on the node; otherwise
// they are expanded into a separate parameter section.
7083 if (treeBits & DABits::NI_ATTR_LINKED)
7090 m_dependency_map_pool);
7091 Local_pattern_store pattern(pool,treeNodePtr.p->m_attrParamPattern);
7092 err = expand(pattern, treeNodePtr, tree, len_pattern, param, cnt);
7111 err = expand(attrParamPtrI, tree, len_pattern, param, cnt, hasNull);
// Without a tree-supplied program, these tree bits must not be set.
7126 ndbrequire((treeBits & DABits::NI_ATTR_PARAMS) == 0);
7128 ndbrequire((treeBits & DABits::NI_ATTR_LINKED) == 0);
7130 treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
// No program in the parameters either: append a one-word program consisting
// of Interpreter::ExitOK().
7132 if (! (paramBits & DABits::PI_ATTR_INTERPRET))
7141 Uint32 tmp = Interpreter::ExitOK();
7142 err = DbspjErr::OutOfSectionMemory;
7143 if (unlikely(!appendToSection(attrInfoPtrI, &tmp, 1)))
// Interpreted program supplied by the parameters.
7153 if (paramBits & DABits::PI_ATTR_INTERPRET)
// One word packs the program length (low 16 bits) and the subroutine length
// (high 16 bits).
7160 const Uint32 len2 = * param.ptr++;
7161 Uint32 program_len = len2 & 0xFFFF;
7162 Uint32 subroutine_len = len2 >> 16;
7163 err = DbspjErr::OutOfSectionMemory;
7164 if (unlikely(!appendToSection(attrInfoPtrI, param.ptr, program_len)))
7175 sectionptrs[1] = program_len;
7176 param.ptr += program_len;
// The subroutine words go to the separate parameter section.
7180 if (unlikely(!appendToSection(attrParamPtrI,
7181 param.ptr, subroutine_len)))
7186 sectionptrs[4] = subroutine_len;
7187 param.ptr += subroutine_len;
7189 treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
// Read lists: `sum_read` accumulates the number of words appended for them.
7192 Uint32 sum_read = 0;
7193 Uint32 dst[MAX_ATTRIBUTES_IN_TABLE + 2];
// Attribute list supplied by the parameters: a length word followed by the
// list, copied as-is.
7195 if (paramBits & DABits::PI_ATTR_LIST)
7198 Uint32 len = * param.ptr++;
7199 DEBUG(
"PI_ATTR_LIST");
7202 err = DbspjErr::OutOfSectionMemory;
7203 if (!appendToSection(attrInfoPtrI, param.ptr, len))
// A four-word FLUSH_AI item follows the list: header word, result
// reference, result data and sender reference.
7215 flush[0] = AttributeHeader::FLUSH_AI << 16;
7216 flush[1] = ctx.m_resultRef;
7217 flush[2] = ctx.m_resultData;
7218 flush[3] = ctx.m_senderRef;
7219 if (!appendToSection(attrInfoPtrI, flush, 4))
7225 sum_read += len + 4;
// Linked attributes listed in the tree.
7228 if (treeBits & DABits::NI_LINKED_ATTR)
7231 DEBUG(
"NI_LINKED_ATTR");
7232 err = DbspjErr::InvalidTreeNodeSpecification;
7233 Uint32 cnt = unpackList(MAX_ATTRIBUTES_IN_TABLE, dst, tree);
7234 if (unlikely(cnt > MAX_ATTRIBUTES_IN_TABLE))
7243 for (Uint32 i = 0; i<cnt; i++)
// A CORR_FACTOR32 header is added after the linked attributes.
7249 dst[cnt++] = AttributeHeader::CORR_FACTOR32 << 16;
7251 err = DbspjErr::OutOfSectionMemory;
7252 if (!appendToSection(attrInfoPtrI, dst, cnt))
7268 sectionptrs[3] = sum_read;
// If a separate parameter section was built, copy it to the end of the
// attrinfo section, record its size in entry [4] and release it.
7270 if (attrParamPtrI != RNIL)
7276 getSection(ptr, attrParamPtrI);
7279 err = appendTreeToSection(attrInfoPtrI, r0, ptr.sz);
7280 sectionptrs[4] = ptr.sz;
7281 if (unlikely(err != 0))
7287 releaseSection(attrParamPtrI);
// Hand the finished attrinfo section over to the node.
7291 treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI;
// Handles a DBINFO_SCANREQ signal. For the counters table it sends one row
// per SPJ counter: node id, block number, instance, counter id and counter
// value.
// NOTE(review): the declarations of `req`, `cursor`, `rl`, `row` and the
// `counters` array header, as well as braces and other case labels, are not
// visible in this excerpt.
7308 void Dbspj::execDBINFO_SCANREQ(
Signal *signal)
7317 switch(req.tableId){
7320 case Ndbinfo::COUNTERS_TABLEID:
// Table of (counter id, current value) pairs read from c_Counters.
7323 { Ndbinfo::SPJ_READS_RECEIVED_COUNTER,
7324 c_Counters.get_counter(CI_READS_RECEIVED) },
7325 { Ndbinfo::SPJ_LOCAL_READS_SENT_COUNTER,
7326 c_Counters.get_counter(CI_LOCAL_READS_SENT) },
7327 { Ndbinfo::SPJ_REMOTE_READS_SENT_COUNTER,
7328 c_Counters.get_counter(CI_REMOTE_READS_SENT) },
7329 { Ndbinfo::SPJ_READS_NOT_FOUND_COUNTER,
7330 c_Counters.get_counter(CI_READS_NOT_FOUND) },
7331 { Ndbinfo::SPJ_TABLE_SCANS_RECEIVED_COUNTER,
7332 c_Counters.get_counter(CI_TABLE_SCANS_RECEIVED) },
7333 { Ndbinfo::SPJ_LOCAL_TABLE_SCANS_SENT_COUNTER,
7334 c_Counters.get_counter(CI_LOCAL_TABLE_SCANS_SENT) },
7335 { Ndbinfo::SPJ_RANGE_SCANS_RECEIVED_COUNTER,
7336 c_Counters.get_counter(CI_RANGE_SCANS_RECEIVED) },
7337 { Ndbinfo::SPJ_LOCAL_RANGE_SCANS_SENT_COUNTER,
7338 c_Counters.get_counter(CI_LOCAL_RANGE_SCANS_SENT) },
7339 { Ndbinfo::SPJ_REMOTE_RANGE_SCANS_SENT_COUNTER,
7340 c_Counters.get_counter(CI_REMOTE_RANGE_SCANS_SENT) },
7341 { Ndbinfo::SPJ_SCAN_BATCHES_RETURNED_COUNTER,
7342 c_Counters.get_counter(CI_SCAN_BATCHES_RETURNED) },
7343 { Ndbinfo::SPJ_SCAN_ROWS_RETURNED_COUNTER,
7344 c_Counters.get_counter(CI_SCAN_ROWS_RETURNED) },
7345 { Ndbinfo::SPJ_PRUNED_RANGE_SCANS_RECEIVED_COUNTER,
7346 c_Counters.get_counter(CI_PRUNED_RANGE_SCANS_RECEIVED) },
7347 { Ndbinfo::SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER,
7348 c_Counters.get_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED) }
7350 const size_t num_counters =
sizeof(counters) /
sizeof(counters[0]);
// The start index is taken from the scan cursor.
7352 Uint32 i = cursor->data[0];
7353 const BlockNumber bn = blockToMain(number());
7354 while(i < num_counters)
// Row layout: node id, block number, instance, counter id, counter value.
7358 row.write_uint32(getOwnNodeId());
7359 row.write_uint32(bn);
7360 row.write_uint32(instance());
7361 row.write_uint32(counters[i].
id);
7363 row.write_uint64(counters[i].val);
7364 ndbinfo_send_row(signal, req, row, rl);
// When a break is needed, send a scan break that carries the index `i`.
7366 if (rl.need_break(req))
7369 ndbinfo_send_scan_break(signal, req, rl, i);
// Sends the scan confirmation.
7380 ndbinfo_send_scan_conf(signal, req, rl);
7383 void Dbspj::IncrementalStatistics::update(
double sample)
7386 if(m_noOfSamples < 0xffffffff)
7389 const double delta = sample - m_mean;
7390 m_mean += delta/m_noOfSamples;
7391 m_sumSquare += delta * (sample - m_mean);