21 #include <kernel_types.h>
24 #include <signaldata/ReadNodesConf.hpp>
25 #include <signaldata/NodeFailRep.hpp>
26 #include <signaldata/DumpStateOrd.hpp>
27 #include <signaldata/GetTabInfo.hpp>
28 #include <signaldata/DictTabInfo.hpp>
29 #include <signaldata/CopyData.hpp>
30 #include <signaldata/BuildIndxImpl.hpp>
31 #include <signaldata/SumaImpl.hpp>
32 #include <signaldata/UtilPrepare.hpp>
33 #include <signaldata/UtilExecute.hpp>
34 #include <signaldata/UtilRelease.hpp>
35 #include <SectionReader.hpp>
36 #include <AttributeHeader.hpp>
37 #include <signaldata/TcKeyReq.hpp>
39 #include <signaldata/DbinfoScan.hpp>
40 #include <signaldata/TransIdAI.hpp>
41 #include <signaldata/WaitGCP.hpp>
// TC error code that execUTIL_EXECUTE_REF compares against and reports as
// BuildIndxRef::IndexNotUnique.
43 #define CONSTRAINT_VIOLATION 893
// Predicate over a TC error code; when it yields true, execUTIL_EXECUTE_REF
// fails the build with BuildIndxRef::DeadlockError. Body not visible here.
47 check_timeout(Uint32 errCode)
// Trace helper: streams "TRIX::" followed by x to ndbout, then a newline.
56 #define DEBUG(x) { ndbout << "TRIX::" << x << endl; }
// Trix block constructor (its signature line is outside this excerpt).
// The node list and the subscription list are constructed over their
// respective record pools.
63 c_theNodes(c_theNodeRecPool),
68 c_theSubscriptions(c_theSubscriptionRecPool)
70 BLOCK_CONSTRUCTOR(
Trix);
// Register the handler method for every signal number this block receives.
// Configuration, start phases, node membership, dump and ndbinfo requests:
73 addRecSignal(GSN_READ_CONFIG_REQ, &Trix::execREAD_CONFIG_REQ);
74 addRecSignal(GSN_STTOR, &Trix::execSTTOR);
75 addRecSignal(GSN_NDB_STTOR, &Trix::execNDB_STTOR);
76 addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
77 addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
78 addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
79 addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
80 addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);
81 addRecSignal(GSN_DBINFO_SCANREQ, &Trix::execDBINFO_SCANREQ);
// Index build request plus its CONF/REF:
84 addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Trix::execBUILD_INDX_IMPL_REQ);
86 addRecSignal(GSN_BUILD_INDX_IMPL_CONF, &Trix::execBUILD_INDX_IMPL_CONF);
87 addRecSignal(GSN_BUILD_INDX_IMPL_REF, &Trix::execBUILD_INDX_IMPL_REF);
// Copy-data (reorg) request:
89 addRecSignal(GSN_COPY_DATA_IMPL_REQ, &Trix::execCOPY_DATA_IMPL_REQ);
// Replies to the UTIL_PREPARE/EXECUTE/RELEASE requests sent to DBUTIL_REF:
91 addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
92 addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
93 addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
94 addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
95 addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
96 addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);
// Subscription signals: CONF/REF replies plus SUB_SYNC_CONTINUE_REQ and
// SUB_TABLE_DATA:
100 addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
101 addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
102 addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
103 addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
104 addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
105 addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
106 addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
107 addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);
// Replies to the WAIT_GCP_REQ that wait_gcp() sends to DBDIH_REF:
109 addRecSignal(GSN_WAIT_GCP_REF, &Trix::execWAIT_GCP_REF);
110 addRecSignal(GSN_WAIT_GCP_CONF, &Trix::execWAIT_GCP_CONF);
// Index statistics request and table-info replies:
113 addRecSignal(GSN_INDEX_STAT_IMPL_REQ, &Trix::execINDEX_STAT_IMPL_REQ);
114 addRecSignal(GSN_GET_TABINFO_CONF, &Trix::execGET_TABINFO_CONF);
115 addRecSignal(GSN_GET_TABINFOREF, &Trix::execGET_TABINFO_REF);
// The stat-meta flag starts out cleared.
118 c_statGetMetaDone =
false;
// Handles READ_CONFIG_REQ: sizes the pools, constructs every subscription
// record once, and answers the sender with READ_CONFIG_CONF.
129 Trix::execREAD_CONFIG_REQ(
Signal* signal)
// Keep the sender's reference and correlation value for the reply.
135 Uint32 ref = req->senderRef;
136 Uint32 senderData = req->senderData;
139 m_ctx.m_config.getOwnConfigIterator();
// Both pools are given a fixed size of 100 entries.
143 c_theAttrOrderBufferPool.
setSize(100);
144 c_theSubscriptionRecPool.
setSize(100);
// Seize records until seize() stops returning true, placement-constructing
// each SubscriptionRecord over the attribute-order buffer pool; afterwards
// everything seized is released again.
148 SubscriptionRecPtr subptr;
149 while(subscriptions.seize(subptr) ==
true) {
150 new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
152 subscriptions.release();
// Confirmation echoes the request's senderData.
155 conf->senderRef = reference();
156 conf->senderData = senderData;
157 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
158 ReadConfigConf::SignalLength, JBB);
// Handles STTOR by answering NDBCNTR with a 5-word STTORRY.
164 void Trix::execSTTOR(
Signal* signal)
// The key arrives in word 6 and is echoed back in word 0.
169 const Uint32 theSignalKey = signal->theData[6];
171 signal->theData[0] = theSignalKey;
// Words 3 and 4 carry 1 and 255.
// NOTE(review): presumably "next start phase 1" and the end-of-list marker;
// confirm against the STTORRY layout.
172 signal->theData[3] = 1;
173 signal->theData[4] = 255;
174 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
// Handles NDB_STTOR: dispatches on the start phase and, in a case whose
// label is not visible in this excerpt, asks the sender for the node list.
181 void Trix::execNDB_STTOR(
Signal* signal)
// Word 0: sender reference, word 1: own node id, word 2: start phase.
184 BlockReference ndbcntrRef = signal->theData[0];
185 Uint16 startphase = signal->theData[2];
186 Uint16 mynode = signal->theData[1];
190 switch (startphase) {
// READ_NODESREQ carries this node's Trix block reference as its only word.
196 signal->theData[0] = calcTrixBlockRef(mynode);
197 sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
// Handles READ_NODESCONF: records the master node, seizes a node record per
// node id with its Trix reference and alive flag, and sets the block state.
// Several conditions guarding the statements below are not visible in this
// excerpt.
210 void Trix::execREAD_NODESCONF(
Signal* signal)
216 NodeRecPtr nodeRecPtr;
// Master reference is reset and filled in when the master's id is met below.
218 c_masterNodeId = readNodes->masterNodeId;
219 c_masterTrixRef = RNIL;
222 for(
unsigned i = 0;
i < MAX_NDB_NODES;
i++) {
// The record is seized at index i, so the record index equals the node id.
227 ndbrequire(c_theNodes.
seizeId(nodeRecPtr,
i));
228 nodeRecPtr.p->trixRef = calcTrixBlockRef(
i);
229 if (
i == c_masterNodeId) {
230 c_masterTrixRef = nodeRecPtr.p->trixRef;
// One path marks the node dead and puts the block in NODE_FAILURE ...
241 arrGuard(c_noNodesFailed, MAX_NDB_NODES);
242 nodeRecPtr.p->alive =
false;
244 c_blockState = Trix::NODE_FAILURE;
// ... another marks it alive.
250 nodeRecPtr.p->alive =
true;
// With no failed nodes counted the block is considered started.
254 if (c_noNodesFailed == 0) {
255 c_blockState = Trix::STARTED;
// Handler registered for READ_NODESREF; its body is not visible in this
// excerpt.
262 void Trix::execREAD_NODESREF(
Signal* signal)
// Handles NODE_FAILREP: walks all node records marking nodes as not alive
// (the selecting condition is not visible here) and adopts a new master if
// the reported master id differs from the stored one.
270 void Trix::execNODE_FAILREP(
Signal* signal)
279 NodeRecPtr nodeRecPtr;
// Iterate the whole node list; RNIL marks the end.
281 for(c_theNodes.
first(nodeRecPtr);
282 nodeRecPtr.i != RNIL;
283 c_theNodes.
next(nodeRecPtr)) {
285 nodeRecPtr.p->alive =
false;
// Master changed: store the new id and that node's Trix reference.
290 if (c_masterNodeId != masterNodeId) {
291 c_masterNodeId = masterNodeId;
292 NodeRecord* nodeRec = c_theNodes.
getPtr(masterNodeId);
293 c_masterTrixRef = nodeRec->trixRef;
// Handles INCL_NODEREQ: marks the node named in word 1 as alive again,
// refreshes its Trix block reference, and returns the block to STARTED once
// no failed nodes remain.
300 void Trix::execINCL_NODEREQ(
Signal* signal)
303 UintR node_id = signal->theData[1];
// Node records are indexed by node id.
304 NodeRecord* nodeRec = c_theNodes.
getPtr(node_id);
305 nodeRec->alive =
true;
308 nodeRec->trixRef = calcTrixBlockRef(node_id);
309 if (c_noNodesFailed == 0) {
310 c_blockState = Trix::STARTED;
// Handles DUMP_STATE_ORD. Dispatches on the first dump argument; the visible
// cases each build a BUILD_INDX_IMPL_REQ and send it to this block itself,
// differing in the index/key column lists attached as sections 0 and 1.
// After the switch, separate checks handle the pool snapshot save/check codes
// and dump code 8004. Case labels are not visible in this excerpt.
316 Trix::execDUMP_STATE_ORD(
Signal* signal)
322 switch(dumpStateOrd->args[0]) {
// Variant: index columns {1}, key columns {0}.
329 MEMCOPY_NO_WORDS(buildIndxReq,
331 BuildIndxImplReq::SignalLength);
332 buildIndxReq->senderRef = reference();
333 buildIndxReq->parallelism = 10;
334 Uint32 indexColumns[1] = {1};
335 Uint32 keyColumns[1] = {0};
337 ls_ptr[0].p = indexColumns;
339 ls_ptr[1].p = keyColumns;
341 sendSignal(reference(),
342 GSN_BUILD_INDX_IMPL_REQ,
344 BuildIndxImplReq::SignalLength,
// Variant: index columns {0, 1}, key columns {0}.
354 MEMCOPY_NO_WORDS(buildIndxReq,
356 BuildIndxImplReq::SignalLength);
357 buildIndxReq->senderRef = reference();
358 buildIndxReq->parallelism = 10;
359 Uint32 indexColumns[2] = {0, 1};
360 Uint32 keyColumns[1] = {0};
362 ls_ptr[0].p = indexColumns;
364 ls_ptr[1].p = keyColumns;
366 sendSignal(reference(),
367 GSN_BUILD_INDX_IMPL_REQ,
369 BuildIndxImplReq::SignalLength,
// Variant: index columns {0, 3, 5}, key columns {0}.
379 MEMCOPY_NO_WORDS(buildIndxReq,
381 BuildIndxImplReq::SignalLength);
382 buildIndxReq->senderRef = reference();
383 buildIndxReq->parallelism = 10;
384 Uint32 indexColumns[3] = {0, 3, 5};
385 Uint32 keyColumns[1] = {0};
387 ls_ptr[0].p = indexColumns;
389 ls_ptr[1].p = keyColumns;
391 sendSignal(reference(),
392 GSN_BUILD_INDX_IMPL_REQ,
394 BuildIndxImplReq::SignalLength,
// Variant: index columns {0, 3, 5}, key columns {0, 1}.
404 MEMCOPY_NO_WORDS(buildIndxReq,
406 BuildIndxImplReq::SignalLength);
407 buildIndxReq->senderRef = reference();
408 buildIndxReq->parallelism = 10;
409 Uint32 indexColumns[3] = {0, 3, 5};
410 Uint32 keyColumns[2] = {0, 1};
412 ls_ptr[0].p = indexColumns;
414 ls_ptr[1].p = keyColumns;
416 sendSignal(reference(),
417 GSN_BUILD_INDX_IMPL_REQ,
419 BuildIndxImplReq::SignalLength,
// Variant: index columns {0, 3, 5}, key columns {0} (same lists as above;
// what else differs is not visible here).
429 MEMCOPY_NO_WORDS(buildIndxReq,
431 BuildIndxImplReq::SignalLength);
432 buildIndxReq->senderRef = reference();
433 buildIndxReq->parallelism = 10;
434 Uint32 indexColumns[3] = {0, 3, 5};
435 Uint32 keyColumns[1] = {0};
437 ls_ptr[0].p = indexColumns;
439 ls_ptr[1].p = keyColumns;
441 sendSignal(reference(),
442 GSN_BUILD_INDX_IMPL_REQ,
444 BuildIndxImplReq::SignalLength,
// Variant: index columns {0, 3, 5}, key columns {0, 1} (same lists as above;
// what else differs is not visible here).
454 MEMCOPY_NO_WORDS(buildIndxReq,
456 BuildIndxImplReq::SignalLength);
457 buildIndxReq->senderRef = reference();
458 buildIndxReq->parallelism = 10;
459 Uint32 indexColumns[3] = {0, 3, 5};
460 Uint32 keyColumns[2] = {0, 1};
462 ls_ptr[0].p = indexColumns;
464 ls_ptr[1].p = keyColumns;
466 sendSignal(reference(),
467 GSN_BUILD_INDX_IMPL_REQ,
469 BuildIndxImplReq::SignalLength,
// Save a usage snapshot of the subscription-record and stat-op pools.
478 if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
480 RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
481 RSS_AP_SNAPSHOT_SAVE(c_statOpPool);
// Check the same two pools against the saved snapshot.
485 if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
487 RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
488 RSS_AP_SNAPSHOT_CHECK(c_statOpPool);
// Dump code 8004: report size and free count of the subscription pool.
492 if (signal->theData[0] == 8004)
494 infoEvent(
"TRIX: c_theSubscriptionRecPool size: %u free: %u",
495 c_theSubscriptionRecPool.getSize(),
496 c_theSubscriptionRecPool.getNoOfFree());
// Handles DBINFO_SCANREQ. For the POOLS table it emits one row per entry of a
// local pools[] table (attribute-order buffer pool and subscription-record
// pool), then confirms the scan.
502 void Trix::execDBINFO_SCANREQ(
Signal *signal)
512 case Ndbinfo::POOLS_TABLEID:
// Each entry: name, used, size, entry size, high-water mark of used.
516 {
"Attribute Order Buffer",
517 c_theAttrOrderBufferPool.getUsed(),
518 c_theAttrOrderBufferPool.getSize(),
519 c_theAttrOrderBufferPool.getEntrySize(),
520 c_theAttrOrderBufferPool.getUsedHi(),
522 {
"Subscription Record",
523 c_theSubscriptionRecPool.getUsed(),
524 c_theSubscriptionRecPool.getSize(),
525 c_theSubscriptionRecPool.getEntrySize(),
526 c_theSubscriptionRecPool.getUsedHi(),
// Sentinel entry: the NULL pool name ends the loop below.
528 { NULL, 0,0,0,0,{0,0,0,0}}
// Number of elements in each entry's config_params array.
531 const size_t num_config_params =
532 sizeof(pools[0].config_params) /
sizeof(pools[0].config_params[0]);
// The cursor's first data word selects the pool index to start from.
533 Uint32 pool =
cursor->data[0];
534 BlockNumber bn = blockToMain(number());
535 while(pools[pool].poolname)
// Row layout: node id, block number, instance, pool name, used, total,
// used high-water mark, entry size, then the config parameters.
539 row.write_uint32(getOwnNodeId());
540 row.write_uint32(bn);
541 row.write_uint32(instance());
542 row.write_string(pools[pool].poolname);
543 row.write_uint64(pools[pool].used);
544 row.write_uint64(pools[pool].total);
545 row.write_uint64(pools[pool].used_hi);
546 row.write_uint64(pools[pool].entry_size);
547 for (
size_t i = 0;
i < num_config_params;
i++)
548 row.write_uint32(pools[pool].config_params[
i]);
549 ndbinfo_send_row(signal, req, row, rl);
// When a break is required, a scan break is sent with the pool index.
// NOTE(review): presumably the resume position for the next request;
// confirm.
551 if (rl.need_break(req))
554 ndbinfo_send_scan_break(signal, req, rl, pool);
564 ndbinfo_send_scan_conf(signal, req, rl);
// Handles BUILD_INDX_IMPL_REQ: seizes the subscription record whose id is the
// request's buildId, fills it from the request and its sections, and starts
// the build through prepareInsertTransactions().
568 void Trix:: execBUILD_INDX_IMPL_REQ(
Signal* signal)
571 DBUG_ENTER(
"Trix:: execBUILD_INDX_IMPL_REQ");
575 *buildIndxReq = &buildIndxReqData;
578 SubscriptionRecPtr subRecPtr;
579 SubscriptionRecord* subRec;
// Seizing by id failed: free the sections and reject with AllocationFailure.
582 if (!c_theSubscriptions.
seizeId(subRecPtr, buildIndxReq->buildId)) {
587 buildIndxRef->errorCode = BuildIndxRef::AllocationFailure;
588 releaseSections(handle);
589 sendSignal(buildIndxReq->senderRef, GSN_BUILD_INDX_IMPL_REF, signal,
590 BuildIndxRef::SignalLength, JBB);
// Initialise the record for an index build.
594 subRec = subRecPtr.p;
595 subRec->errorCode = BuildIndxRef::NoError;
596 subRec->userReference = buildIndxReq->senderRef;
597 subRec->connectionPtr = buildIndxReq->senderData;
598 subRec->schemaTransId = buildIndxReq->transId;
599 subRec->subscriptionId = buildIndxReq->buildId;
600 subRec->subscriptionKey = buildIndxReq->buildKey;
601 subRec->indexType = buildIndxReq->indexType;
602 subRec->sourceTableId = buildIndxReq->tableId;
603 subRec->targetTableId = buildIndxReq->indexId;
604 subRec->parallelism = buildIndxReq->parallelism;
605 subRec->expectedConf = 0;
606 subRec->subscriptionCreated =
false;
607 subRec->pendingSubSyncContinueConf =
false;
// No prepared operation yet; execUTIL_PREPARE_CONF stores the real id.
608 subRec->prepareId = RNIL;
609 subRec->requestType = INDEX_BUILD;
610 subRec->fragCount = 0;
611 subRec->fragId = ZNIL;
612 subRec->m_rows_processed = 0;
// Index builds always wait for a GCP before completing (see the
// RF_WAIT_GCP checks in the CONF handlers).
613 subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP;
615 if (buildIndxReq->requestType & BuildIndxImplReq::RF_NO_DISK)
617 subRec->m_flags |= SubscriptionRecord::RF_NO_DISK;
// Index columns first, then key columns, are appended to attributeOrder;
// each section's size is kept as the matching column count.
621 Uint32 noOfSections = handle.m_cnt;
622 if (noOfSections > 0) {
625 handle.getSection(ptr, BuildIndxImplReq::INDEX_COLUMNS);
626 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
627 subRec->noOfIndexColumns = ptr.sz;
629 if (noOfSections > 1) {
632 handle.getSection(ptr, BuildIndxImplReq::KEY_COLUMNS);
633 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
634 subRec->noOfKeyColumns = ptr.sz;
// Diagnostic dump of the attribute order (any enclosing guard is not visible
// in this excerpt).
639 printf(
"Trix:: execBUILD_INDX_IMPL_REQ: Attribute order:\n");
640 subRec->attributeOrder.print(stdout);
643 releaseSections(handle);
644 prepareInsertTransactions(signal, subRecPtr);
// Handler for BUILD_INDX_IMPL_CONF; the visible body only prints its name.
648 void Trix:: execBUILD_INDX_IMPL_CONF(
Signal* signal)
650 printf(
"Trix:: execBUILD_INDX_IMPL_CONF\n");
// Handler for BUILD_INDX_IMPL_REF; the visible body only prints its name.
653 void Trix:: execBUILD_INDX_IMPL_REF(
Signal* signal)
655 printf(
"Trix:: execBUILD_INDX_IMPL_REF\n");
// Handles UTIL_PREPARE_CONF: stores the prepare id on the subscription
// record named by senderData and moves on to creating the subscription.
// STAT_UTIL requests are delegated to statUtilPrepareConf() instead.
658 void Trix::execUTIL_PREPARE_CONF(
Signal* signal)
662 SubscriptionRecPtr subRecPtr;
663 SubscriptionRecord* subRec;
// senderData is the subscription record index set when the request was sent.
665 subRecPtr.i = utilPrepareConf->senderData;
666 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
667 printf(
"Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
670 if (subRec->requestType == STAT_UTIL)
672 statUtilPrepareConf(signal, subRec->m_statPtrI);
675 subRecPtr.p = subRec;
676 subRec->prepareId = utilPrepareConf->prepareId;
677 setupSubscription(signal, subRecPtr);
// Handles UTIL_PREPARE_REF: records the error on the subscription record and
// finishes the request through the UTIL_RELEASE_CONF path. STAT_UTIL
// requests are delegated to statUtilPrepareRef() instead.
680 void Trix::execUTIL_PREPARE_REF(
Signal* signal)
684 SubscriptionRecPtr subRecPtr;
685 SubscriptionRecord* subRec;
687 subRecPtr.i = utilPrepareRef->senderData;
688 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
689 printf(
"Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i);
692 if (subRec->requestType == STAT_UTIL)
694 statUtilPrepareRef(signal, subRec->m_statPtrI);
697 subRecPtr.p = subRec;
698 subRec->errorCode = (BuildIndxRef::ErrorCode)utilPrepareRef->errorCode;
// The release-confirmation handler is invoked directly (no signal is sent)
// with senderData set to the record index.
701 conf->senderData = subRecPtr.i;
702 execUTIL_RELEASE_CONF(signal);
// Handles UTIL_EXECUTE_CONF: counts down the outstanding executes for the
// subscription record named by senderData, tracks the highest GCI seen, and
// finishes the build when nothing is outstanding. STAT_UTIL requests are
// delegated to statUtilExecuteConf() instead.
705 void Trix::execUTIL_EXECUTE_CONF(
Signal* signal)
709 SubscriptionRecPtr subRecPtr;
710 SubscriptionRecord* subRec;
// gci_hi forms the upper and gci_lo the lower 32 bits of the 64-bit GCI.
712 const Uint32 gci_hi = utilExecuteConf->gci_hi;
713 const Uint32 gci_lo = utilExecuteConf->gci_lo;
714 const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
716 subRecPtr.i = utilExecuteConf->senderData;
717 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
// Diagnostic prefix spelled "Trix::" like every sibling handler's message.
718 printf(
"Trix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
721 if (subRec->requestType == STAT_UTIL)
723 statUtilExecuteConf(signal, subRec->m_statPtrI);
726 subRecPtr.p = subRec;
727 subRec->expectedConf--;
// Keep the largest GCI reported so far on the record.
729 if (gci > subRecPtr.p->m_gci)
732 subRecPtr.p->m_gci = gci;
735 checkParallelism(signal, subRec);
// Nothing outstanding: either wait for a GCP first (RF_WAIT_GCP) or complete
// the build. The control flow separating the two calls is not visible in
// this excerpt.
736 if (subRec->expectedConf == 0)
738 if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
741 wait_gcp(signal, subRecPtr);
744 buildComplete(signal, subRecPtr);
// Handles UTIL_EXECUTE_REF: maps the TC error code to a build error and fails
// the build. STAT_UTIL requests are delegated to statUtilExecuteRef().
748 void Trix::execUTIL_EXECUTE_REF(
Signal* signal)
752 SubscriptionRecPtr subRecPtr;
753 SubscriptionRecord* subRec;
755 subRecPtr.i = utilExecuteRef->senderData;
756 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
757 printf(
"Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i);
760 if (subRec->requestType == STAT_UTIL)
762 statUtilExecuteRef(signal, subRec->m_statPtrI);
765 subRecPtr.p = subRec;
// Only TC-originated errors are expected here.
766 ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
// Constraint violation -> IndexNotUnique; timeout -> DeadlockError;
// anything else is passed through as the build error code.
767 if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
770 buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
772 else if (check_timeout(utilExecuteRef->TCErrorCode))
775 buildFailed(signal, subRecPtr, BuildIndxRef::DeadlockError);
780 buildFailed(signal, subRecPtr,
781 (BuildIndxRef::ErrorCode)utilExecuteRef->TCErrorCode);
// Handles SUB_CREATE_CONF: marks the subscription as created and starts the
// table scan for the record named by senderData.
785 void Trix::execSUB_CREATE_CONF(
Signal* signal)
788 DBUG_ENTER(
"Trix::execSUB_CREATE_CONF");
790 SubscriptionRecPtr subRecPtr;
791 SubscriptionRecord* subRec;
793 subRecPtr.i = subCreateConf->senderData;
794 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
795 printf(
"Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
798 subRec->subscriptionCreated =
true;
799 subRecPtr.p = subRec;
801 DBUG_PRINT(
"info",(
"i: %u subscriptionId: %u, subscriptionKey: %u",
802 subRecPtr.i, subRecPtr.p->subscriptionId,
803 subRecPtr.p->subscriptionKey));
805 startTableScan(signal, subRecPtr);
// Handles SUB_CREATE_REF: records the error on the subscription record and
// asks DBUTIL to release the prepared operation.
809 void Trix::execSUB_CREATE_REF(
Signal* signal)
812 DBUG_ENTER(
"Trix::execSUB_CREATE_REF");
815 SubscriptionRecPtr subRecPtr;
816 SubscriptionRecord* subRec;
818 subRecPtr.i = subCreateRef->senderData;
819 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL)
821 printf(
"Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i);
824 subRecPtr.p = subRec;
825 subRecPtr.p->errorCode = (BuildIndxRef::ErrorCode)subCreateRef->errorCode;
// The release request is keyed by the record's prepare id; senderData carries
// the record index so the reply can be matched.
828 req->prepareId = subRecPtr.p->prepareId;
829 req->senderData = subRecPtr.i;
831 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
832 UtilReleaseReq::SignalLength, JBB);
// Handles SUB_SYNC_CONF: counts down the outstanding confirmations and, when
// none remain, either waits for a GCP or completes the build.
837 void Trix::execSUB_SYNC_CONF(
Signal* signal)
840 DBUG_ENTER(
"Trix::execSUB_SYNC_CONF");
842 SubscriptionRecPtr subRecPtr;
843 SubscriptionRecord* subRec;
845 subRecPtr.i = subSyncConf->senderData;
846 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
847 printf(
"Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n",
852 subRecPtr.p = subRec;
// startTableScan() sets expectedConf to 1 for the scan itself; this is the
// matching decrement.
853 subRec->expectedConf--;
854 checkParallelism(signal, subRec);
// The control flow separating wait_gcp() from buildComplete() is not visible
// in this excerpt.
855 if (subRec->expectedConf == 0)
857 if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
860 wait_gcp(signal, subRecPtr);
863 buildComplete(signal, subRecPtr);
// Handles SUB_SYNC_REF: fails the build for the record named by senderData
// with BuildIndxRef::InternalError.
868 void Trix::execSUB_SYNC_REF(
Signal* signal)
871 DBUG_ENTER(
"Trix::execSUB_SYNC_REF");
873 SubscriptionRecPtr subRecPtr;
874 SubscriptionRecord* subRec;
876 subRecPtr.i = subSyncRef->senderData;
877 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
878 printf(
"Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i);
881 subRecPtr.p = subRec;
882 buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
// Handles SUB_SYNC_CONTINUE_REQ: notes that a continue-confirmation is owed
// and lets checkParallelism() decide whether it can be sent right away.
886 void Trix::execSUB_SYNC_CONTINUE_REQ(
Signal* signal)
891 SubscriptionRecPtr subRecPtr;
892 SubscriptionRecord* subRec;
// Here the record index travels in subscriberData, not senderData.
893 subRecPtr.i = subSyncContinueReq->subscriberData;
894 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
895 printf(
"Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i);
898 subRecPtr.p = subRec;
899 subRec->pendingSubSyncContinueConf =
true;
// The request's senderData is kept so it can be echoed in the CONF.
900 subRec->syncPtr = subSyncContinueReq->senderData;
901 checkParallelism(signal, subRec);
// Handles SUB_TABLE_DATA: dispatches one scanned row to the executor that
// matches the record's request type and counts it. Case labels are not
// visible in this excerpt.
904 void Trix::execSUB_TABLE_DATA(
Signal* signal)
907 DBUG_ENTER(
"Trix::execSUB_TABLE_DATA");
909 SubscriptionRecPtr subRecPtr;
910 SubscriptionRecord* subRec;
911 subRecPtr.i = subTableData->senderData;
912 if ((subRec = c_theSubscriptions.
getPtr(subRecPtr.i)) == NULL) {
913 printf(
"Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);
916 subRecPtr.p = subRec;
917 switch(subRecPtr.p->requestType){
// Index build path.
919 executeBuildInsertTransaction(signal, subRecPtr);
// Reorg path; the row's takeOver word is forwarded.
923 executeReorgTransaction(signal, subRecPtr, subTableData->takeOver);
// Statistics paths operate on the StatOp linked from the record.
930 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
931 statCleanExecute(signal, stat);
936 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
937 statScanExecute(signal, stat);
942 subRecPtr.p->m_rows_processed++;
// Sends SUB_CREATE_REQ to SUMA for the given subscription record, asking for
// a single-table-scan subscription on the record's source table.
947 void Trix::setupSubscription(
Signal* signal, SubscriptionRecPtr subRecPtr)
950 DBUG_ENTER(
"Trix::setupSubscription");
951 SubscriptionRecord* subRec = subRecPtr.p;
// senderData carries the record index so replies can be matched to it.
954 subCreateReq->senderRef = reference();
955 subCreateReq->senderData = subRecPtr.i;
956 subCreateReq->subscriptionId = subRec->subscriptionId;
957 subCreateReq->subscriptionKey = subRec->subscriptionKey;
958 subCreateReq->tableId = subRec->sourceTableId;
959 subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
960 subCreateReq->schemaTransId = subRec->schemaTransId;
962 DBUG_PRINT(
"info",(
"i: %u subscriptionId: %u, subscriptionKey: %u",
963 subRecPtr.i, subCreateReq->subscriptionId,
964 subCreateReq->subscriptionKey));
966 sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
967 signal, SubCreateReq::SignalLength, JBB);
// Sends SUB_SYNC_REQ to SUMA to start scanning the source table. The
// record's attribute order is flattened into section 0, and requestInfo
// flags are derived from the record's flags and request type.
972 void Trix::startTableScan(
Signal* signal, SubscriptionRecPtr subRecPtr)
976 Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
977 SubscriptionRecord* subRec = subRecPtr.p;
978 AttrOrderBuffer::DataBufferIterator iter;
// Copy the attribute-order buffer into the linear attributeList array.
981 bool moreAttributes = subRec->attributeOrder.first(iter);
982 while (moreAttributes) {
983 attributeList[i++] = *iter.data;
984 moreAttributes = subRec->attributeOrder.next(iter);
990 orderPtr[0].p = attributeList;
991 orderPtr[0].sz = subRec->attributeOrder.getSize();
995 subSyncReq->senderRef = reference();
996 subSyncReq->senderData = subRecPtr.i;
997 subSyncReq->subscriptionId = subRec->subscriptionId;
998 subSyncReq->subscriptionKey = subRec->subscriptionKey;
999 subSyncReq->part = SubscriptionData::TableData;
// requestInfo starts empty; flags are OR-ed in below.
1000 subSyncReq->requestInfo = 0;
1001 subSyncReq->fragCount = subRec->fragCount;
1002 subSyncReq->fragId = subRec->fragId;
1004 if (subRec->m_flags & SubscriptionRecord::RF_NO_DISK)
1007 subSyncReq->requestInfo |= SubSyncReq::NoDisk;
1010 if (subRec->m_flags & SubscriptionRecord::RF_TUP_ORDER)
1013 subSyncReq->requestInfo |= SubSyncReq::TupOrder;
// Reorg copy and reorg delete both scan with LM_Exclusive; delete adds Reorg.
1016 if (subRec->requestType == REORG_COPY)
1019 subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1021 else if (subRec->requestType == REORG_DELETE)
1024 subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1025 subSyncReq->requestInfo |= SubSyncReq::Reorg;
// Stat clean: a committed-read range scan with the clean bound as section 1.
1027 else if (subRec->requestType == STAT_CLEAN)
1030 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1031 StatOp::Clean clean = stat.m_clean;
1032 orderPtr[1].p = clean.m_bound;
1033 orderPtr[1].sz = clean.m_boundSize;
1035 subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1036 subSyncReq->requestInfo |= SubSyncReq::RangeScan;
// Stat scan: a committed-read range scan flagged StatScan.
1038 else if (subRec->requestType == STAT_SCAN)
1044 subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1045 subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1046 subSyncReq->requestInfo |= SubSyncReq::StatScan;
// One confirmation is expected for the scan itself.
1048 subRecPtr.p->expectedConf = 1;
1050 DBUG_PRINT(
"info",(
"i: %u subscriptionId: %u, subscriptionKey: %u",
1051 subRecPtr.i, subSyncReq->subscriptionId,
1052 subSyncReq->subscriptionKey));
1054 sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
1055 signal, SubSyncReq::SignalLength, JBB, orderPtr, noOfSections);
// Sends UTIL_PREPARE_REQ to DBUTIL describing a Write operation on the
// record's target table; the description travels as a properties section.
1058 void Trix::prepareInsertTransactions(
Signal* signal,
1059 SubscriptionRecPtr subRecPtr)
1061 SubscriptionRecord* subRec = subRecPtr.p;
1066 utilPrepareReq->senderRef = reference();
1067 utilPrepareReq->senderData = subRecPtr.i;
1068 utilPrepareReq->schemaTransId = subRec->schemaTransId;
// Stack buffer of 128 words backing the properties section.
1070 const Uint32 pageSizeInWords = 128;
1071 Uint32 propPage[pageSizeInWords];
1075 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1076 w.add(UtilPrepareReq::TableId, subRec->targetTableId);
// Attribute ids 0..noOfIndexColumns inclusive, i.e. one more than the number
// of index columns.
// NOTE(review): presumably the extra attribute holds the base table's
// primary key; confirm.
1078 for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
1079 w.add(UtilPrepareReq::AttributeId, i);
// Diagnostic dump of the properties (any enclosing guard is not visible in
// this excerpt).
1084 printf(
"Trix::prepareInsertTransactions: Sent SimpleProperties:\n");
1085 reader.printAll(ndbout);
1089 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1090 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1091 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1092 UtilPrepareReq::SignalLength, JBB,
1093 sectionsPtr, UtilPrepareReq::NoOfSections);
// Turns one scanned row (header section 0, data section 1) into a
// UTIL_EXECUTE_REQ against the prepared insert and sends it to DBUTIL.
1096 void Trix::executeBuildInsertTransaction(Signal* signal,
1097 SubscriptionRecPtr subRecPtr)
1100 SubscriptionRecord* subRec = subRecPtr.p;
1104 utilExecuteReq->senderRef = reference();
1105 utilExecuteReq->senderData = subRecPtr.i;
1106 utilExecuteReq->prepareId = subRec->prepareId;
// Diagnostic dump of header and data words.
// NOTE(review): it references headerPtr/headerBuffer before the visible
// setup below, so it is presumably compiled out; confirm.
1108 printf(
"Header size %u\n", headerPtr.sz);
1109 for(
int i = 0; i < headerPtr.sz; i++)
1110 printf(
"H'%.8x ", headerBuffer[i]);
1113 printf(
"Data size %u\n", dataPtr.sz);
1114 for(
int i = 0; i < dataPtr.sz; i++)
1115 printf(
"H'%.8x ", dataBuffer[i]);
1122 handle.getSection(headerPtr, 0);
1123 handle.getSection(dataPtr, 1);
// Both sections are copied into linear buffers inside the signal's own data
// area, starting at word 25, with the data directly after the headers.
1125 Uint32* headerBuffer = signal->theData + 25;
1126 Uint32* dataBuffer = headerBuffer + headerPtr.sz;
1128 copy(headerBuffer, headerPtr);
1129 copy(dataBuffer, dataPtr);
1130 releaseSections(handle);
// Walk the attribute headers: index-column headers are renumbered to their
// position i, and data sizes are accumulated into noOfKeyData (the exact
// branch structure is not visible in this excerpt).
1133 Uint32 noOfKeyData = 0;
1134 for(Uint32 i = 0; i < headerPtr.sz; i++) {
1138 if (keyAttrHead->isNULL())
1141 if (i < subRec->noOfIndexColumns)
1143 keyAttrHead->setAttributeId(i);
1146 noOfKeyData += keyAttrHead->getDataSize();
// One more UTIL_EXECUTE reply is now outstanding.
1149 subRec->expectedConf++;
1153 subRec->noOfIndexColumns,
// The header section is sent with noOfIndexColumns + 1 entries.
1157 sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
1158 sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz =
1159 subRec->noOfIndexColumns + 1;
1160 sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
1161 sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
1162 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1163 UtilExecuteReq::SignalLength, JBB,
1164 sectionsPtr, UtilExecuteReq::NoOfSections);
// Sends a UTIL_EXECUTE_REQ to DBUTIL for one scanned row of a reorg request,
// passing the scan take-over information derived from takeOver.
1167 void Trix::executeReorgTransaction(Signal* signal,
1168 SubscriptionRecPtr subRecPtr,
1172 SubscriptionRecord* subRec = subRecPtr.p;
// takeOver packs the scan info in its low 18 bits (mask 0x3FFFF) and the
// fragment in the bits from position 20 upwards.
1176 const Uint32 tScanInfo = takeOver & 0x3FFFF;
1177 const Uint32 tTakeOverFragment = takeOver >> 20;
1180 TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
1181 TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
1182 TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
1183 utilExecuteReq->scanTakeOver = scanInfo;
1186 utilExecuteReq->senderRef = reference();
1187 utilExecuteReq->senderData = subRecPtr.i;
1188 utilExecuteReq->prepareId = subRec->prepareId;
// Diagnostic dump of header and data words.
// NOTE(review): presumably compiled out, as headerPtr/dataPtr are not set up
// in the visible code; confirm.
1190 printf(
"Header size %u\n", headerPtr.sz);
1191 for(
int i = 0; i < headerPtr.sz; i++)
1192 printf(
"H'%.8x ", headerBuffer[i]);
1195 printf(
"Data size %u\n", dataPtr.sz);
1196 for(
int i = 0; i < dataPtr.sz; i++)
1197 printf(
"H'%.8x ", dataBuffer[i]);
// One more UTIL_EXECUTE reply is now outstanding.
1201 subRec->expectedConf++;
1204 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1205 UtilExecuteReq::SignalLength, JBB,
// Sends WAIT_GCP_REQ to DBDIH for the given subscription record, either
// immediately or after `delay` via sendSignalWithDelay. The condition that
// picks between the two sends is not visible in this excerpt.
1210 Trix::wait_gcp(
Signal* signal, SubscriptionRecPtr subRecPtr, Uint32 delay)
// senderData carries the record index so the reply can be matched.
1213 req->senderRef = reference();
1214 req->senderData = subRecPtr.i;
1220 sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1221 WaitGCPReq::SignalLength, JBB);
1226 sendSignalWithDelay(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1227 delay, WaitGCPReq::SignalLength);
// Handles WAIT_GCP_REF by retrying the wait for the same subscription record
// with a delay argument of 100.
1232 Trix::execWAIT_GCP_REF(
Signal* signal)
1236 SubscriptionRecPtr subRecPtr;
1237 c_theSubscriptions.getPtr(subRecPtr, ref.senderData);
1238 wait_gcp(signal, subRecPtr, 100);
// Handles WAIT_GCP_CONF: completes the build once the confirmed GCI is
// greater than the highest GCI stored on the record, and waits again with a
// delay argument of 100 on the other path (the branch structure between the
// two calls is not visible in this excerpt).
1242 Trix::execWAIT_GCP_CONF(
Signal* signal)
1246 SubscriptionRecPtr subRecPtr;
1247 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
// gci_hi forms the upper and gci_lo the lower 32 bits of the 64-bit GCI.
1249 const Uint32 gci_hi = conf->gci_hi;
1250 const Uint32 gci_lo = conf->gci_lo;
1251 const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
1253 if (gci > subRecPtr.p->m_gci)
1256 buildComplete(signal, subRecPtr);
1261 wait_gcp(signal, subRecPtr, 100);
// Finishes a request by asking SUMA to remove the record's subscription;
// the reply is matched through senderData (the record index).
1265 void Trix::buildComplete(
Signal* signal, SubscriptionRecPtr subRecPtr)
1268 req->senderRef = reference();
1269 req->senderData = subRecPtr.i;
1270 req->subscriptionId = subRecPtr.p->subscriptionId;
1271 req->subscriptionKey = subRecPtr.p->subscriptionKey;
1272 sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
1273 SubRemoveReq::SignalLength, JBB);
// Records errorCode on the subscription record, counts the failed operation
// as answered, and tears the subscription down via buildComplete() once no
// replies remain outstanding.
1276 void Trix::buildFailed(
Signal* signal,
1277 SubscriptionRecPtr subRecPtr,
1278 BuildIndxRef::ErrorCode errorCode)
1280 SubscriptionRecord* subRec = subRecPtr.p;
1282 subRec->errorCode = errorCode;
// The failure stands in for one expected confirmation.
1284 subRec->expectedConf--;
1285 checkParallelism(signal, subRec);
1286 if (subRec->expectedConf == 0)
1287 buildComplete(signal, subRecPtr);
// Handler registered for SUB_REMOVE_REF; its body is not visible in this
// excerpt.
1291 Trix::execSUB_REMOVE_REF(
Signal* signal){
// Handles SUB_REMOVE_CONF: if the record still holds a prepared operation it
// asks DBUTIL to release it; on the other path it calls the release-
// confirmation handler directly (the branch structure is not fully visible
// in this excerpt).
1298 Trix::execSUB_REMOVE_CONF(
Signal* signal){
1303 SubscriptionRecPtr subRecPtr;
1304 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
// RNIL means there is no prepared operation to release.
1306 if(subRecPtr.p->prepareId != RNIL){
1310 req->prepareId = subRecPtr.p->prepareId;
1311 req->senderData = subRecPtr.i;
1313 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
1314 UtilReleaseReq::SignalLength , JBB);
1320 conf->senderData = subRecPtr.i;
1321 execUTIL_RELEASE_CONF(signal);
// Handler registered for UTIL_RELEASE_REF; its body is not visible in this
// excerpt.
1326 Trix::execUTIL_RELEASE_REF(
Signal* signal){
// Handles UTIL_RELEASE_CONF, the final step of a request: reports the outcome
// to the original requester according to the record's request type, then
// releases the attribute-order buffer and the subscription record. Also
// called directly by other handlers in this file. Case labels are not
// visible in this excerpt.
1332 Trix::execUTIL_RELEASE_CONF(
Signal* signal){
1336 SubscriptionRecPtr subRecPtr;
1337 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1339 switch(subRecPtr.p->requestType){
// Reorg copy/delete: COPY_DATA_IMPL_CONF on success, else COPY_DATA_IMPL_REF
// carrying the stored error code.
1342 if (subRecPtr.p->errorCode == BuildIndxRef::NoError)
1347 conf->senderRef = reference();
1348 conf->senderData = subRecPtr.p->connectionPtr;
1350 sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_CONF, signal,
1351 CopyDataImplConf::SignalLength , JBB);
1353 infoEvent(
"%s table %u processed %llu rows",
1354 subRecPtr.p->requestType == REORG_COPY ?
1355 "reorg-copy" :
"reorg-delete",
1356 subRecPtr.p->sourceTableId,
1357 subRecPtr.p->m_rows_processed);
1362 ref->senderRef = reference();
1363 ref->senderData = subRecPtr.p->connectionPtr;
1364 ref->errorCode = subRecPtr.p->errorCode;
1366 sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_REF, signal,
1367 CopyDataImplRef::SignalLength , JBB);
// Index build: BUILD_INDX_IMPL_CONF on success, else BUILD_INDX_IMPL_REF
// carrying the stored error code.
1371 if (subRecPtr.p->errorCode == BuildIndxRef::NoError) {
1376 buildIndxConf->senderRef = reference();
1377 buildIndxConf->senderData = subRecPtr.p->connectionPtr;
1379 sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_CONF, signal,
1380 BuildIndxConf::SignalLength , JBB);
1382 infoEvent(
"index-build table %u index: %u processed %llu rows",
1383 subRecPtr.p->sourceTableId,
1384 subRecPtr.p->targetTableId,
1385 subRecPtr.p->m_rows_processed);
1391 buildIndxRef->senderRef = reference();
1392 buildIndxRef->senderData = subRecPtr.p->connectionPtr;
1393 buildIndxRef->errorCode = subRecPtr.p->errorCode;
1395 sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_REF, signal,
1396 BuildIndxRef::SignalLength , JBB);
// Statistics util path: no error is expected on the record here.
1400 ndbrequire(subRecPtr.p->errorCode == BuildIndxRef::NoError);
1401 statUtilReleaseConf(signal, subRecPtr.p->m_statPtrI);
// Statistics clean/scan paths: clear the prepare id and hand over to the
// stat operation's release step.
1405 subRecPtr.p->prepareId = RNIL;
1406 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1407 statCleanRelease(signal, stat);
1412 subRecPtr.p->prepareId = RNIL;
1413 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1414 statScanRelease(signal, stat);
// Give back the attribute-order buffer and the record itself.
1420 subRecPtr.p->attributeOrder.release();
1421 c_theSubscriptions.release(subRecPtr.i);
// Sends the owed SUB_SYNC_CONTINUE_CONF to SUMA when one is pending and
// exactly one confirmation is still expected, then clears the pending flag.
1424 void Trix::checkParallelism(
Signal* signal, SubscriptionRecord* subRec)
// startTableScan() sets expectedConf to 1 for the scan itself and the
// execute paths increment it per request, so 1 means no execute is in flight.
1426 if ((subRec->pendingSubSyncContinueConf) &&
1427 (subRec->expectedConf == 1)) {
// senderData echoes the value saved from the SUB_SYNC_CONTINUE_REQ.
1431 subSyncContinueConf->subscriptionId = subRec->subscriptionId;
1432 subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
1433 subSyncContinueConf->senderData = subRec->syncPtr;
1434 sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal,
1435 SubSyncContinueConf::SignalLength , JBB);
1436 subRec->pendingSubSyncContinueConf =
false;
// Handles COPY_DATA_IMPL_REQ (reorg copy / reorg delete): seizes a
// subscription record, fills it from the request and its sections, and sends
// UTIL_PREPARE_REQ to DBUTIL describing the per-row operation.
1443 Trix::execCOPY_DATA_IMPL_REQ(
Signal* signal)
1451 SubscriptionRecPtr subRecPtr;
// No free record: drop the sections and reject with error code -1.
1454 if (!c_theSubscriptions.seize(subRecPtr))
1458 releaseSections(handle);
1462 ref->errorCode = -1;
1463 ref->senderData = req->senderData;
1464 ref->transId = req->transId;
1465 sendSignal(req->senderRef, GSN_COPY_DATA_IMPL_REF, signal,
1466 CopyDataImplRef::SignalLength, JBB);
1470 SubscriptionRecord* subRec = subRecPtr.p;
1471 subRec->errorCode = BuildIndxRef::NoError;
1472 subRec->userReference = req->senderRef;
1473 subRec->connectionPtr = req->senderData;
1474 subRec->schemaTransId = req->transId;
// Subscription id and key are taken from rand().
1475 subRec->subscriptionId = rand();
1476 subRec->subscriptionKey = rand();
1477 subRec->indexType = RNIL;
1478 subRec->sourceTableId = req->srcTableId;
1479 subRec->targetTableId = req->dstTableId;
1480 subRec->parallelism = 16;
1481 subRec->expectedConf = 0;
1482 subRec->subscriptionCreated =
false;
1483 subRec->pendingSubSyncContinueConf =
false;
// NOTE(review): prepareId starts as the transaction id here, whereas the
// index-build path starts from RNIL; execUTIL_PREPARE_CONF overwrites it.
// Confirm this is intended.
1484 subRec->prepareId = req->transId;
1485 subRec->fragCount = req->srcFragments;
1486 subRec->fragId = ZNIL;
1487 subRec->m_rows_processed = 0;
1488 subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP;
// Map the request type onto the record's request type.
1490 switch(req->requestType){
1491 case CopyDataImplReq::ReorgCopy:
1493 subRec->requestType = REORG_COPY;
1495 case CopyDataImplReq::ReorgDelete:
1496 subRec->requestType = REORG_DELETE;
1499 jamLine(req->requestType);
1503 if (req->requestInfo & CopyDataReq::TupOrder)
1506 subRec->m_flags |= SubscriptionRecord::RF_TUP_ORDER;
// Section 0 then section 1 are appended to attributeOrder; their sizes are
// kept as noOfIndexColumns and noOfKeyColumns respectively.
1510 Uint32 noOfSections = handle.m_cnt;
1511 if (noOfSections > 0) {
1514 handle.getSection(ptr, 0);
1515 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1516 subRec->noOfIndexColumns = ptr.sz;
1519 if (noOfSections > 1) {
1522 handle.getSection(ptr, 1);
1523 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1524 subRec->noOfKeyColumns = ptr.sz;
1527 releaseSections(handle);
1532 utilPrepareReq->senderRef = reference();
1533 utilPrepareReq->senderData = subRecPtr.i;
1534 utilPrepareReq->schemaTransId = subRec->schemaTransId;
// Stack buffer of 128 words backing the properties section.
1536 const Uint32 pageSizeInWords = 128;
1537 Uint32 propPage[pageSizeInWords];
// Reorg copy prepares a Write; the other visible path prepares a Delete.
1541 if (subRec->requestType == REORG_COPY)
1543 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1547 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Delete);
1549 w.add(UtilPrepareReq::ScanTakeOverInd, 1);
1550 w.add(UtilPrepareReq::ReorgInd, 1);
1551 w.add(UtilPrepareReq::TableId, subRec->targetTableId);
// The first noOfIndexColumns entries of attributeOrder become the prepared
// operation's attribute ids.
1553 AttrOrderBuffer::DataBufferIterator iter;
1554 ndbrequire(subRec->attributeOrder.first(iter));
1556 for(Uint32 i = 0; i < subRec->noOfIndexColumns; i++)
1558 w.add(UtilPrepareReq::AttributeId, * iter.data);
1559 subRec->attributeOrder.next(iter);
1563 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1564 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1565 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1566 UtilPrepareReq::SignalLength, JBB,
1567 sectionsPtr, UtilPrepareReq::NoOfSections);
// Return a reference to the StatOp record stored at pool index statPtrI.
// statPtrI must not be RNIL; this is enforced with ndbrequire.
1574 Trix::statOpGetPtr(Uint32 statPtrI)
1576 ndbrequire(statPtrI != RNIL);
1577 return *c_statOpPool.getPtr(statPtrI);
// Seize a StatOp record plus a SubscriptionRecord and link the two by
// pool index (subRec->m_statPtrI <-> stat.m_subRecPtrI).
// statPtrI receives the pool index of the seized StatOp.
// Error inserts 18001 and 18002 make the respective seize count as failed.
1581 Trix::statOpSeize(Uint32& statPtrI)
1584 if (ERROR_INSERTED(18001) ||
1585 !c_statOpPool.seize(statPtr))
1588 D(
"statOpSeize: seize statOp failed");
// Overwrite the raw record with the byte pattern 0xf3, then construct a
// fresh StatOp in place with placement new.
1592 memset(statPtr.p, 0xf3,
sizeof(*statPtr.p));
1594 new (statPtr.p) StatOp;
1595 statPtrI = statPtr.i;
1596 StatOp& stat = statOpGetPtr(statPtrI);
1597 stat.m_ownPtrI = statPtrI;
1599 SubscriptionRecPtr subRecPtr;
1600 if (ERROR_INSERTED(18002) ||
1601 !c_theSubscriptions.seize(subRecPtr))
// No subscription record available: hand the StatOp back to its pool.
1604 c_statOpPool.release(statPtr);
1605 D(
"statOpSeize: seize subRec failed");
1608 SubscriptionRecord* subRec = subRecPtr.p;
1609 subRec->m_statPtrI = stat.m_ownPtrI;
1610 stat.m_subRecPtrI = subRecPtr.i;
1612 D(
"statOpSeize" << V(statPtrI) << V(subRecPtr.i));
// Release a StatOp and, when one is attached, its SubscriptionRecord
// (including the record's attributeOrder buffer).
// Both subRec->prepareId and util.m_prepareId must already be RNIL;
// this is enforced with ndbrequire.
1617 Trix::statOpRelease(StatOp& stat)
1619 StatOp::Util& util = stat.m_util;
1620 D(
"statOpRelease" << V(stat));
1622 if (stat.m_subRecPtrI != RNIL)
1625 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1626 ndbrequire(subRec->prepareId == RNIL);
1627 subRec->attributeOrder.release();
1628 c_theSubscriptions.release(stat.m_subRecPtrI);
1629 stat.m_subRecPtrI = RNIL;
1631 ndbrequire(util.m_prepareId == RNIL);
1632 c_statOpPool.release(stat.m_ownPtrI);
// Signal handler for INDEX_STAT_IMPL_REQ.
// Seizes a StatOp, records the request type and a printable request name,
// resets the attached subscription record, and then either starts the
// system-table metadata lookup (statMetaGetHead) or goes on with
// statGetMetaDone, depending on c_statGetMetaDone.
1636 Trix::execINDEX_STAT_IMPL_REQ(
Signal* signal)
1641 Uint32 statPtrI = RNIL;
1642 if (!statOpSeize(statPtrI))
// Seize failed: report IndexStatRef::NoFreeStatOp through statOpRef().
1645 statOpRef(signal, req, IndexStatRef::NoFreeStatOp, __LINE__);
1648 StatOp& stat = statOpGetPtr(statPtrI);
1650 stat.m_requestType = req->requestType;
// Human-readable name for each supported request type.
1653 switch (stat.m_requestType) {
1654 case IndexStatReq::RT_CLEAN_NEW:
1656 stat.m_requestName =
"clean new";
1658 case IndexStatReq::RT_CLEAN_OLD:
1660 stat.m_requestName =
"clean old";
1662 case IndexStatReq::RT_CLEAN_ALL:
1664 stat.m_requestName =
"clean all";
1666 case IndexStatReq::RT_SCAN_FRAG:
1668 stat.m_requestName =
"scan frag";
1670 case IndexStatReq::RT_DROP_HEAD:
1672 stat.m_requestName =
"drop head";
// Start with no prepared operation and no error on the subscription record.
1679 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1680 subRec->prepareId = RNIL;
1681 subRec->errorCode = BuildIndxRef::NoError;
1684 if (!c_statGetMetaDone)
1687 statMetaGetHead(signal, stat);
1690 statGetMetaDone(signal, stat);
// Column descriptor list that g_statMetaHead refers to as its columnList.
// Each entry starts with a column position followed by the column name.
1695 const Trix::SysColumn
1696 Trix::g_statMetaHead_column[] = {
1700 { 1,
"index_version",
1709 { 4,
"value_format",
1712 { 5,
"sample_version",
1718 { 7,
"sample_count",
// Column descriptor list that g_statMetaSample refers to as its columnList.
// Each entry starts with a column position followed by the column name.
1726 const Trix::SysColumn
1727 Trix::g_statMetaSample_column[] = {
1731 { 1,
"index_version",
1734 { 2,
"sample_version",
// Descriptor of the index-stat "head" system table.
// The column count is derived from the size of g_statMetaHead_column, so
// it follows that array automatically.
1745 const Trix::SysTable
1746 Trix::g_statMetaHead = {
1749 sizeof(g_statMetaHead_column)/
sizeof(g_statMetaHead_column[0]),
1750 g_statMetaHead_column
// Descriptor of the index-stat "sample" system table.
// The column count is derived from the size of g_statMetaSample_column.
1753 const Trix::SysTable
1754 Trix::g_statMetaSample = {
1757 sizeof(g_statMetaSample_column)/
sizeof(g_statMetaSample_column[0]),
1758 g_statMetaSample_column
// Descriptor of the index on the sample system table.
// The name is a format string with one %u placeholder; statMetaGetSampleX1
// reads it as name_fmt before requesting the table info.
1761 const Trix::SysIndex
1762 Trix::g_statMetaSampleX1 = {
1764 "sys" "/" NDB_INDEX_STAT_SCHEMA
"/%u/" NDB_INDEX_STAT_SAMPLE_INDEX1,
// First step of the metadata lookup: request table info for the head
// system table by name. statMetaGetHeadCB is installed as the meta
// callback, with the StatOp's own pool index as callback data.
1770 Trix::statMetaGetHead(
Signal* signal, StatOp& stat)
1772 D(
"statMetaGetHead" << V(stat));
1773 StatOp::Meta& meta = stat.m_meta;
1774 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetHeadCB);
1775 meta.m_cb.m_callbackData = stat.m_ownPtrI;
1776 const char*
name = g_statMetaHead.name;
1777 sendGetTabInfoReq(signal, stat, name);
// Meta callback for the head-table lookup.
// ret is 0 when invoked from execGET_TABINFO_CONF and the DICT error code
// when invoked from execGET_TABINFO_REF.
// statOpError() is given a zero-terminated suppress list containing
// GetTabInfoRef::TableNotDefined.
// The head table id is taken from the saved conf, then the sample table
// lookup is started.
1781 Trix::statMetaGetHeadCB(
Signal* signal, Uint32 statPtrI, Uint32
ret)
1783 StatOp& stat = statOpGetPtr(statPtrI);
1784 D(
"statMetaGetHeadCB" << V(stat) << V(ret));
1785 StatOp::Meta& meta = stat.m_meta;
1789 Uint32 supress[] = { GetTabInfoRef::TableNotDefined, 0 };
1790 statOpError(signal, stat, ret, __LINE__, supress);
1793 g_statMetaHead.tableId = meta.m_conf.tableId;
1794 statMetaGetSample(signal, stat);
// Second step of the metadata lookup: request table info for the sample
// system table by name, with statMetaGetSampleCB as the meta callback.
1798 Trix::statMetaGetSample(
Signal* signal, StatOp& stat)
1800 D(
"statMetaGetSample" << V(stat));
1801 StatOp::Meta& meta = stat.m_meta;
1802 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleCB);
1803 meta.m_cb.m_callbackData = stat.m_ownPtrI;
1804 const char*
name = g_statMetaSample.name;
1805 sendGetTabInfoReq(signal, stat, name);
// Meta callback for the sample-table lookup.
// ret is 0 on conf and the DICT error code on ref; errors are passed to
// statOpError() with no suppress list.
// The sample table id is taken from the saved conf, then the lookup of the
// sample index is started.
1809 Trix::statMetaGetSampleCB(
Signal* signal, Uint32 statPtrI, Uint32 ret)
1811 StatOp& stat = statOpGetPtr(statPtrI);
1812 D(
"statMetaGetSampleCB" << V(stat) << V(ret));
1813 StatOp::Meta& meta = stat.m_meta;
1817 statOpError(signal, stat, ret, __LINE__);
1820 g_statMetaSample.tableId = meta.m_conf.tableId;
1821 statMetaGetSampleX1(signal, stat);
// Third step of the metadata lookup: request table info for the index on
// the sample table, with statMetaGetSampleX1CB as the meta callback.
// g_statMetaSampleX1.name is a format string (name_fmt); the lookup is
// sent with the local buffer name[].
// NOTE(review): the statement that formats name_fmt into name[] is not
// visible here - confirm it bounds the write to MAX_TAB_NAME_SIZE.
1825 Trix::statMetaGetSampleX1(
Signal* signal, StatOp& stat)
1827 D(
"statMetaGetSampleX1" << V(stat));
1828 StatOp::Meta& meta = stat.m_meta;
1829 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleX1CB);
1830 meta.m_cb.m_callbackData = stat.m_ownPtrI;
1831 const char* name_fmt = g_statMetaSampleX1.name;
1832 char name[MAX_TAB_NAME_SIZE];
1834 sendGetTabInfoReq(signal, stat, name);
// Meta callback for the sample-index lookup.
// Errors are passed to statOpError(). The index descriptor gets the sample
// table id as its tableId and the id from the saved conf as its indexId,
// after which the request proceeds with statGetMetaDone.
1838 Trix::statMetaGetSampleX1CB(
Signal* signal, Uint32 statPtrI, Uint32 ret)
1840 StatOp& stat = statOpGetPtr(statPtrI);
1841 D(
"statMetaGetSampleX1CB" << V(stat) << V(ret));
1842 StatOp::Meta& meta = stat.m_meta;
1846 statOpError(signal, stat, ret, __LINE__);
1849 g_statMetaSampleX1.tableId = g_statMetaSample.tableId;
1850 g_statMetaSampleX1.indexId = meta.m_conf.tableId;
1851 statGetMetaDone(signal, stat);
// Send GET_TABINFOREQ (request by name, long-signal conf) to DBDICT.
// The name is sent as section 0. senderData carries the StatOp pool index,
// which execGET_TABINFO_CONF / execGET_TABINFO_REF use to find the StatOp.
1855 Trix::sendGetTabInfoReq(
Signal* signal, StatOp& stat,
const char* name)
1857 D(
"sendGetTabInfoReq" << V(stat) << V(name));
// name_len includes the terminating NUL; round up to whole 32-bit words.
1860 Uint32 name_len = (Uint32)strlen(name) + 1;
1861 Uint32 name_len_words = (name_len + 3 ) / 4;
1862 Uint32 name_buf[32];
// The name must fit the 32-word buffer; checked before the copy below.
1863 ndbrequire(name_len_words <= 32);
// Zero the buffer first so the bytes after the name in the last word are 0.
1864 memset(name_buf, 0,
sizeof(name_buf));
1865 memcpy(name_buf, name, name_len);
1867 req->senderData = stat.m_ownPtrI;
1868 req->senderRef = reference();
1869 req->requestType = GetTabInfoReq::RequestByName |
1870 GetTabInfoReq::LongSignalConf;
1871 req->tableNameLen = name_len;
1872 req->schemaTransId = 0;
1874 ptr[0].p = name_buf;
1875 ptr[0].sz = name_len_words;
1876 sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ,
1877 signal, GetTabInfoReq::SignalLength, JBB, ptr, 1);
// Signal handler for GET_TABINFO_CONF.
// Finds the StatOp through conf->senderData, keeps a copy of the conf in
// meta.m_conf, releases the signal sections and runs the meta callback
// with return code 0.
1881 Trix::execGET_TABINFO_CONF(
Signal* signal)
1889 StatOp& stat = statOpGetPtr(conf->senderData);
1890 D(
"execGET_TABINFO_CONF" << V(stat));
1891 StatOp::Meta& meta = stat.m_meta;
1892 meta.m_conf = *conf;
1896 releaseSections(handle);
1898 execute(signal, meta.m_cb, 0);
// Signal handler for GET_TABINFO_REF.
// Finds the StatOp through ref->senderData and runs the meta callback with
// ref->errorCode, which is required to be non-zero.
1902 Trix::execGET_TABINFO_REF(
Signal* signal)
1906 StatOp& stat = statOpGetPtr(ref->senderData);
1907 D(
"execGET_TABINFO_REF" << V(stat));
1908 StatOp::Meta& meta = stat.m_meta;
1910 ndbrequire(ref->errorCode != 0);
1911 execute(signal, meta.m_cb, ref->errorCode);
// Continue the request once system-table metadata is available.
// Marks the subscription record as STAT_UTIL, copies index id, index
// version and fragment count from the request into stat.m_data
// (fragCount must be non-zero) and starts reading the head record.
1917 Trix::statGetMetaDone(
Signal* signal, StatOp& stat)
1920 StatOp::Data& data = stat.m_data;
1921 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1922 D(
"statGetMetaDone" << V(stat));
1926 subRec->requestType = STAT_UTIL;
1928 ndbrequire(req->fragCount != 0);
1929 data.m_indexId = req->indexId;
1930 data.m_indexVersion = req->indexVersion;
1931 data.m_fragCount = req->fragCount;
1932 statHeadRead(signal, stat);
// Read the head record for this index through the util sequence.
// Clears util.m_not_found, installs statHeadReadCB as the util callback and
// selects a Read operation on the head system table.
1938 Trix::statHeadRead(
Signal* signal, StatOp& stat)
1940 StatOp::Util& util = stat.m_util;
1941 StatOp::Send& send = stat.m_send;
1942 D(
"statHeadRead" << V(stat));
1944 util.m_not_found =
false;
1945 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadReadCB);
1946 util.m_cb.m_callbackData = stat.m_ownPtrI;
1947 send.m_sysTable = &g_statMetaHead;
1948 send.m_operationType = UtilPrepareReq::Read;
1949 statUtilPrepare(signal, stat);
// Util callback after the head read; ret must be 0.
// The head counts as found unless util.m_not_found was set, which
// statUtilExecuteRef does for a Read that fails with ZNOT_FOUND.
1953 Trix::statHeadReadCB(
Signal* signal, Uint32 statPtrI, Uint32 ret)
1955 StatOp& stat = statOpGetPtr(statPtrI);
1956 StatOp::Data& data = stat.m_data;
1957 StatOp::Util& util = stat.m_util;
1958 D(
"statHeadReadCB" << V(stat) << V(ret));
1960 ndbrequire(ret == 0);
1961 data.m_head_found = !util.m_not_found;
1962 statReadHeadDone(signal, stat);
// Insert the head record through the util sequence.
// Installs statHeadInsertCB as the util callback and selects an Insert
// operation on the head system table.
1966 Trix::statHeadInsert(
Signal* signal, StatOp& stat)
1968 StatOp::Util& util = stat.m_util;
1969 StatOp::Send& send = stat.m_send;
1970 D(
"statHeadInsert" << V(stat));
1972 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadInsertCB);
1973 util.m_cb.m_callbackData = stat.m_ownPtrI;
1974 send.m_sysTable = &g_statMetaHead;
1975 send.m_operationType = UtilPrepareReq::Insert;
1976 statUtilPrepare(signal, stat);
// Util callback after the head insert; ret must be 0.
// Continues with statInsertHeadDone.
1980 Trix::statHeadInsertCB(
Signal* signal, Uint32 statPtrI, Uint32 ret)
1982 StatOp& stat = statOpGetPtr(statPtrI);
1983 D(
"statHeadInsertCB" << V(stat) << V(ret));
1985 ndbrequire(ret == 0);
1986 statInsertHeadDone(signal, stat);
// Update the head record through the util sequence.
// Installs statHeadUpdateCB as the util callback and selects an Update
// operation on the head system table.
1990 Trix::statHeadUpdate(
Signal* signal, StatOp& stat)
1992 StatOp::Util& util = stat.m_util;
1993 StatOp::Send& send = stat.m_send;
1994 D(
"statHeadUpdate" << V(stat));
1996 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadUpdateCB);
1997 util.m_cb.m_callbackData = stat.m_ownPtrI;
1998 send.m_sysTable = &g_statMetaHead;
1999 send.m_operationType = UtilPrepareReq::Update;
2000 statUtilPrepare(signal, stat);
// Util callback after the head update; ret must be 0.
// Continues with statUpdateHeadDone.
2004 Trix::statHeadUpdateCB(
Signal* signal, Uint32 statPtrI, Uint32 ret)
2006 StatOp& stat = statOpGetPtr(statPtrI);
2007 D(
"statHeadUpdateCB" << V(stat) << V(ret));
2009 ndbrequire(ret == 0);
2010 statUpdateHeadDone(signal, stat);
// Delete the head record through the util sequence.
// Installs statHeadDeleteCB as the util callback and selects a Delete
// operation on the head system table.
2014 Trix::statHeadDelete(
Signal* signal, StatOp& stat)
2016 StatOp::Util& util = stat.m_util;
2017 StatOp::Send& send = stat.m_send;
2018 D(
"statHeadDelete" << V(stat));
2020 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadDeleteCB);
2021 util.m_cb.m_callbackData = stat.m_ownPtrI;
2022 send.m_sysTable = &g_statMetaHead;
2023 send.m_operationType = UtilPrepareReq::Delete;
2024 statUtilPrepare(signal, stat);
// Util callback after the head delete; ret must be 0.
// Continues with statDeleteHeadDone.
2028 Trix::statHeadDeleteCB(
Signal* signal, Uint32 statPtrI, Uint32 ret)
2030 StatOp& stat = statOpGetPtr(statPtrI);
2031 D(
"statHeadDeleteCB" << V(stat) << V(ret));
2033 ndbrequire(ret == 0);
2034 statDeleteHeadDone(signal, stat);
// Start the util sequence: reset util.m_prepareId to RNIL and send the
// prepare request for the operation described in stat.m_send.
2040 Trix::statUtilPrepare(
Signal* signal, StatOp& stat)
2042 StatOp::Util& util = stat.m_util;
2043 D(
"statUtilPrepare" << V(stat));
2045 util.m_prepareId = RNIL;
2046 statSendPrepare(signal, stat);
// Handle a successful prepare for the util sequence.
// Stores the prepareId from the conf and goes on with statUtilExecute.
// Error insert 18011 (Read operations) and 18012 (all other operations)
// clear the insert value and send a UTIL_EXECUTE_REF carrying
// UtilExecuteRef::AllocationError to this block itself.
2050 Trix::statUtilPrepareConf(
Signal* signal, Uint32 statPtrI)
2052 StatOp& stat = statOpGetPtr(statPtrI);
2053 StatOp::Util& util = stat.m_util;
2054 StatOp::Send& send = stat.m_send;
2055 D(
"statUtilPrepareConf" << V(stat));
2059 util.m_prepareId = utilConf->prepareId;
2061 const Uint32 ot = send.m_operationType;
2062 if ((ERROR_INSERTED(18011) && ot == UtilPrepareReq::Read) ||
2063 (ERROR_INSERTED(18012) && ot != UtilPrepareReq::Read))
2066 CLEAR_ERROR_INSERT_VALUE;
2069 utilRef->senderData = stat.m_ownPtrI;
2070 utilRef->errorCode = UtilExecuteRef::AllocationError;
2071 utilRef->TCErrorCode = 0;
2072 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2073 signal, UtilExecuteRef::SignalLength, JBB);
2077 statUtilExecute(signal, stat);
// Handle a failed prepare for the util sequence.
// The non-zero util error code is mapped to an IndexStatRef code where a
// mapping exists and then reported through statOpError().
2081 Trix::statUtilPrepareRef(
Signal* signal, Uint32 statPtrI)
2083 StatOp& stat = statOpGetPtr(statPtrI);
2084 D(
"statUtilPrepareRef" << V(stat));
2088 Uint32 errorCode = utilRef->errorCode;
2089 ndbrequire(errorCode != 0);
2091 switch (errorCode) {
// The three seize errors all map to BusyUtilPrepare.
2092 case UtilPrepareRef::PREPARE_SEIZE_ERROR:
2093 case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
2094 case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
2095 errorCode = IndexStatRef::BusyUtilPrepare;
// A DICT table-info error maps to InvalidSysTable.
2097 case UtilPrepareRef::DICT_TAB_INFO_ERROR:
2098 errorCode = IndexStatRef::InvalidSysTable;
2100 case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
2105 statOpError(signal, stat, errorCode, __LINE__);
// Execute the prepared util operation: hand the util prepareId to the
// send state and send the execute request.
2109 Trix::statUtilExecute(
Signal* signal, StatOp& stat)
2111 StatOp::Util& util = stat.m_util;
2112 StatOp::Send& send = stat.m_send;
2113 D(
"statUtilExecute" << V(stat));
2115 send.m_prepareId = util.m_prepareId;
2116 statSendExecute(signal, stat);
// Handle a successful execute for the util sequence.
// For a Read, signal section 0 is copied into rattr and section 1 into
// rdata, and every column of the system table is unpacked with
// statDataIn(). The prepared operation is then released.
2120 Trix::statUtilExecuteConf(
Signal* signal, Uint32 statPtrI)
2122 StatOp& stat = statOpGetPtr(statPtrI);
2123 StatOp::Attr& attr = stat.m_attr;
2124 StatOp::Send& send = stat.m_send;
2125 D(
"statUtilExecuteConf" << V(stat));
2127 if (send.m_operationType == UtilPrepareReq::Read)
// Point the attr cursor at the local buffers and reset both sizes to 0.
// NOTE(review): rattr/rdata declarations are not visible here - confirm
// they hold at least 20 and 2048 words, matching m_attrMax / m_dataMax.
2133 attr.m_attr = rattr;
2134 attr.m_attrMax = 20;
2135 attr.m_attrSize = 0;
2136 attr.m_data = rdata;
2137 attr.m_dataMax = 2048;
2138 attr.m_dataSize = 0;
2141 handle.getSection(ssPtr, 0);
2142 ::copy(rattr, ssPtr);
2146 handle.getSection(ssPtr, 1);
2147 ::copy(rdata, ssPtr);
2149 releaseSections(handle);
2151 const SysTable& sysTable = *send.m_sysTable;
2152 for (Uint32 i = 0; i < sysTable.columnCount; i++)
2155 statDataIn(stat, i);
2159 statUtilRelease(signal, stat);
// Handle a failed execute for the util sequence.
// The util error code must be non-zero. For TCError the TC error code is
// used instead (also required non-zero); a Read that fails with ZNOT_FOUND
// sets util.m_not_found. AllocationError maps to
// IndexStatRef::BusyUtilExecute. The block both reports errors through
// statOpError() and releases the prepared operation with statUtilRelease().
2163 Trix::statUtilExecuteRef(
Signal* signal, Uint32 statPtrI)
2165 StatOp& stat = statOpGetPtr(statPtrI);
2166 StatOp::Util& util = stat.m_util;
2167 StatOp::Send& send = stat.m_send;
2168 D(
"statUtilExecuteRef" << V(stat));
2172 Uint32 errorCode = utilRef->errorCode;
2173 ndbrequire(errorCode != 0);
2175 switch (errorCode) {
2176 case UtilExecuteRef::TCError:
2177 errorCode = utilRef->TCErrorCode;
2178 ndbrequire(errorCode != 0);
2179 if (send.m_operationType == UtilPrepareReq::Read &&
2180 errorCode == ZNOT_FOUND)
2183 util.m_not_found =
true;
2187 case UtilExecuteRef::AllocationError:
2188 errorCode = IndexStatRef::BusyUtilExecute;
2198 statOpError(signal, stat, errorCode, __LINE__);
2201 statUtilRelease(signal, stat);
// Release the prepared util operation: hand the util prepareId to the
// send state and send the release request.
2205 Trix::statUtilRelease(
Signal* signal, StatOp& stat)
2207 StatOp::Util& util = stat.m_util;
2208 StatOp::Send& send = stat.m_send;
2209 D(
"statUtilRelease" << V(stat));
2211 send.m_prepareId = util.m_prepareId;
2212 statSendRelease(signal, stat);
// The prepared util operation has been released: mark the util state as
// having no prepared operation and run the util callback with code 0.
2216 Trix::statUtilReleaseConf(
Signal* signal, Uint32 statPtrI)
2218 StatOp& stat = statOpGetPtr(statPtrI);
2219 StatOp::Util& util = stat.m_util;
2220 D(
"statUtilReleaseConf" << V(stat));
2222 util.m_prepareId = RNIL;
2223 execute(signal, util.m_cb, 0);
// Dispatch on the request type after the head record has been read:
// the three clean requests go to statCleanBegin, RT_SCAN_FRAG goes to
// statScanBegin and RT_DROP_HEAD goes to statDropBegin.
2229 Trix::statReadHeadDone(
Signal* signal, StatOp& stat)
2232 D(
"statReadHeadDone" << V(stat));
2234 switch (stat.m_requestType) {
2235 case IndexStatReq::RT_CLEAN_NEW:
2237 case IndexStatReq::RT_CLEAN_OLD:
2239 case IndexStatReq::RT_CLEAN_ALL:
2241 statCleanBegin(signal, stat);
2244 case IndexStatReq::RT_SCAN_FRAG:
2246 statScanBegin(signal, stat);
2249 case IndexStatReq::RT_DROP_HEAD:
2251 statDropBegin(signal, stat);
// Continuation after the head record was inserted; RT_SCAN_FRAG is the
// request type handled here and it proceeds to statScanEnd.
2261 Trix::statInsertHeadDone(
Signal* signal, StatOp& stat)
2263 D(
"statInsertHeadDone" << V(stat));
2265 switch (stat.m_requestType) {
2266 case IndexStatReq::RT_SCAN_FRAG:
2268 statScanEnd(signal, stat);
// Continuation after the head record was updated; RT_SCAN_FRAG is the
// request type handled here and it proceeds to statScanEnd.
2277 Trix::statUpdateHeadDone(
Signal* signal, StatOp& stat)
2279 D(
"statUpdateHeadDone" << V(stat));
2281 switch (stat.m_requestType) {
2282 case IndexStatReq::RT_SCAN_FRAG:
2284 statScanEnd(signal, stat);
// Continuation after the head record was deleted; RT_DROP_HEAD is the
// request type handled here and it proceeds to statDropEnd.
2293 Trix::statDeleteHeadDone(
Signal* signal, StatOp& stat)
2295 D(
"statDeleteHeadDone" << V(stat));
2297 switch (stat.m_requestType) {
2298 case IndexStatReq::RT_DROP_HEAD:
2300 statDropEnd(signal, stat);
// Entry point for the clean requests once the head has been read.
// With a head record present, a table id that differs from the request is
// reported as InvalidSysTableData unless the request is RT_CLEAN_ALL.
// The block can also switch a non-RT_CLEAN_ALL request to RT_CLEAN_ALL
// before preparing the clean.
// NOTE(review): the branch structure around that switch is not visible
// here - presumably it applies when no head record was found; confirm.
2311 Trix::statCleanBegin(
Signal* signal, StatOp& stat)
2314 StatOp::Data& data = stat.m_data;
2315 D(
"statCleanBegin" << V(stat));
2317 if (data.m_head_found ==
true)
2320 if (data.m_tableId != req->tableId &&
2321 stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2325 statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2331 if (stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2335 stat.m_requestType = IndexStatReq::RT_CLEAN_ALL;
2338 statCleanPrepare(signal, stat);
// Set up the subscription record and scan bounds for a clean request and
// prepare a Delete on the sample system table.
// The source of the scan is the sample index (g_statMetaSampleX1.indexId).
// The bound array holds 3 words per bound, so m_boundSize is
// 3 * m_boundCount.
2342 Trix::statCleanPrepare(
Signal* signal, StatOp& stat)
2345 StatOp::Data& data = stat.m_data;
2346 StatOp::Clean& clean = stat.m_clean;
2347 StatOp::Send& send = stat.m_send;
2348 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2349 D(
"statCleanPrepare" << V(stat));
2352 clean.m_cleanCount = 0;
// Attribute list requested from the scan; ao_size is its element count.
2354 const Uint32 ao_list[] = {
2360 const Uint32 ao_size =
sizeof(ao_list)/
sizeof(ao_list[0]);
// Clean requests must not name a fragment.
2362 ndbrequire(req->fragId == ZNIL);
// Re-initialise the subscription record for a STAT_CLEAN run.
2363 subRec->m_flags = 0;
2364 subRec->requestType = STAT_CLEAN;
2365 subRec->schemaTransId = req->transId;
2366 subRec->userReference = 0;
2367 subRec->connectionPtr = RNIL;
2368 subRec->subscriptionId = rand();
2369 subRec->subscriptionKey = rand();
2370 subRec->prepareId = RNIL;
2371 subRec->indexType = 0;
2372 subRec->sourceTableId = g_statMetaSampleX1.indexId;
2373 subRec->targetTableId = RNIL;
2374 subRec->noOfIndexColumns = ao_size;
2375 subRec->noOfKeyColumns = 0;
2376 subRec->parallelism = 16;
2377 subRec->fragCount = 0;
2378 subRec->fragId = ZNIL;
2379 subRec->syncPtr = RNIL;
2380 subRec->errorCode = BuildIndxRef::NoError;
2381 subRec->subscriptionCreated =
false;
2382 subRec->pendingSubSyncContinueConf =
false;
2383 subRec->expectedConf = 0;
2384 subRec->m_rows_processed = 0;
// The attribute-order buffer must be empty before the list is appended.
2387 AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2388 ndbrequire(ao_buf.isEmpty());
2389 ao_buf.append(ao_list, ao_size);
// Bounds 0 and 1: equality on index id and index version.
2392 clean.m_bound[0] = TuxBoundInfo::BoundEQ;
2394 clean.m_bound[2] = data.m_indexId;
2395 clean.m_bound[3] = TuxBoundInfo::BoundEQ;
2397 clean.m_bound[5] = data.m_indexVersion;
// Optional bound 2 on the sample version, chosen by request type.
2398 switch (stat.m_requestType) {
2399 case IndexStatReq::RT_CLEAN_NEW:
2400 D(
"statCleanPrepare delete sample versions > " << data.m_sampleVersion);
2401 clean.m_bound[6] = TuxBoundInfo::BoundLT;
2403 clean.m_bound[8] = data.m_sampleVersion;
2404 clean.m_boundCount = 3;
2406 case IndexStatReq::RT_CLEAN_OLD:
2407 D(
"statCleanPrepare delete sample versions < " << data.m_sampleVersion);
2408 clean.m_bound[6] = TuxBoundInfo::BoundGT;
2410 clean.m_bound[8] = data.m_sampleVersion;
2411 clean.m_boundCount = 3;
2413 case IndexStatReq::RT_CLEAN_ALL:
2414 D(
"statCleanPrepare delete all sample versions");
2415 clean.m_boundCount = 2;
2421 clean.m_boundSize = 3 * clean.m_boundCount;
2424 send.m_sysTable = &g_statMetaSample;
2425 send.m_operationType = UtilPrepareReq::Delete;
2426 statSendPrepare(signal, stat);
// Handle one scanned sample row during a clean request and issue the
// Delete for it.
// The signal must carry exactly two sections:
//  - ATTR_INFO: four attribute headers, ids 0..3; the first three have
//    data size 1 and the fourth (the stat key) has a non-zero size kz.
//  - AFTER_VALUES: index id, index version, sample version, then the key.
// Index id and version must match the values in stat.m_data.
// Error inserts 18021 / 18022 / 18023 (for RT_CLEAN_NEW / _OLD / _ALL)
// send a UTIL_EXECUTE_REF with TCError 626 to this block itself.
2430 Trix::statCleanExecute(
Signal* signal, StatOp& stat)
2432 StatOp::Data& data = stat.m_data;
2433 StatOp::Send& send = stat.m_send;
2434 StatOp::Clean& clean = stat.m_clean;
2435 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2436 D(
"statCleanExecute" << V(stat));
2439 ndbrequire(handle.m_cnt == 2);
2444 handle.getSection(ptr0, SubTableData::ATTR_INFO);
2445 ndbrequire(ptr0.sz == 4);
2446 ::copy((Uint32*)ah, ptr0);
2447 ndbrequire(ah[0].getAttributeId() == 0 && ah[0].getDataSize() == 1);
2448 ndbrequire(ah[1].getAttributeId() == 1 && ah[1].getDataSize() == 1);
2449 ndbrequire(ah[2].getAttributeId() == 2 && ah[2].getDataSize() == 1);
2451 const Uint32 kz = ah[3].getDataSize();
2452 ndbrequire(ah[3].getAttributeId() == 3 && kz != 0);
// Upper limit for AFTER_VALUES: three header words plus the largest key.
2455 const Uint32 avmax = 3 + MAX_INDEX_STAT_KEY_SIZE;
2458 handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2459 ndbrequire(ptr1.sz <= avmax);
2461 ndbrequire(data.m_indexId == av[0]);
2462 ndbrequire(data.m_indexVersion == av[1]);
2463 data.m_sampleVersion = av[2];
2464 data.m_statKey = &av[3];
// The key starts with a 2-byte little-endian length. Decode it through
// unsigned bytes: with a signed plain char, a length byte >= 0x80 would
// sign-extend and corrupt kb, tripping the checks below. statDataPtr
// decodes the same prefix through uchar.
2465 const uchar* kp = (
const uchar*)data.m_statKey;
2466 const Uint32 kb = kp[0] + (kp[1] << 8);
// The key must be non-empty and kz must equal its size in words,
// length prefix included.
2468 ndbrequire(kb != 0);
2469 ndbrequire(kz == ((2 + kb) + 3) / 4);
2471 clean.m_cleanCount++;
2472 releaseSections(handle);
2474 const Uint32 rt = stat.m_requestType;
2475 if ((ERROR_INSERTED(18021) && rt == IndexStatReq::RT_CLEAN_NEW) ||
2476 (ERROR_INSERTED(18022) && rt == IndexStatReq::RT_CLEAN_OLD) ||
2477 (ERROR_INSERTED(18023) && rt == IndexStatReq::RT_CLEAN_ALL))
2480 CLEAR_ERROR_INSERT_VALUE;
2483 utilRef->senderData = stat.m_ownPtrI;
2484 utilRef->errorCode = UtilExecuteRef::TCError;
2485 utilRef->TCErrorCode = 626;
2486 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2487 signal, UtilExecuteRef::SignalLength, JBB);
2488 subRec->expectedConf++;
// Delete this row from the sample table with the subscription's prepared
// operation; one more reply is now expected.
2493 send.m_sysTable = &g_statMetaSample;
2494 send.m_operationType = UtilPrepareReq::Delete;
2495 send.m_prepareId = subRec->prepareId;
2496 subRec->expectedConf++;
2497 statSendExecute(signal, stat);
// Finish a clean run: a non-zero subscription error code is reported
// through statOpError(); statCleanEnd() completes the request.
2501 Trix::statCleanRelease(
Signal* signal, StatOp& stat)
2503 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2504 D(
"statCleanRelease" << V(stat) << V(subRec->errorCode));
2506 if (subRec->errorCode != 0)
2509 statOpError(signal, stat, subRec->errorCode, __LINE__);
2512 statCleanEnd(signal, stat);
// Final step of a clean request: report success for the StatOp.
2516 Trix::statCleanEnd(
Signal* signal, StatOp& stat)
2518 D(
"statCleanEnd" << V(stat));
2519 statOpSuccess(signal, stat);
// Entry point for RT_SCAN_FRAG once the head has been read.
// With a head record present, a table id that differs from the request is
// reported as InvalidSysTableData. The request's table id is stored in
// stat.m_data before the scan is prepared.
2525 Trix::statScanBegin(
Signal* signal, StatOp& stat)
2528 StatOp::Data& data = stat.m_data;
2529 D(
"statScanBegin" << V(stat));
2531 if (data.m_head_found ==
true &&
2532 data.m_tableId != req->tableId)
2535 statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2538 data.m_tableId = req->tableId;
2539 statScanPrepare(signal, stat);
// Set up the subscription record for scanning one index fragment and
// prepare an Insert on the sample system table.
// The scan source is the index itself (data.m_indexId) and the requested
// attributes are the INDEX_STAT_KEY / INDEX_STAT_VALUE pseudo columns.
2543 Trix::statScanPrepare(
Signal* signal, StatOp& stat)
2546 StatOp::Data& data = stat.m_data;
2547 StatOp::Scan& scan = stat.m_scan;
2548 StatOp::Send& send = stat.m_send;
2549 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2550 D(
"statScanPrepare" << V(stat));
// New sample version: start from 0 when there is no head record, then
// increment by one.
2553 if (data.m_head_found ==
false)
2554 data.m_sampleVersion = 0;
2555 data.m_sampleVersion += 1;
// Reset the counters that statScanExecute accumulates.
2558 scan.m_sampleCount = 0;
2559 scan.m_keyBytes = 0;
2561 const Uint32 ao_list[] = {
2562 AttributeHeader::INDEX_STAT_KEY,
2563 AttributeHeader::INDEX_STAT_VALUE
2565 const Uint32 ao_size =
sizeof(ao_list)/
sizeof(ao_list[0]);
// Scan requests must name a fragment.
2567 ndbrequire(req->fragId != ZNIL);
// Re-initialise the subscription record for a STAT_SCAN run.
2568 subRec->m_flags = 0;
2569 subRec->requestType = STAT_SCAN;
2570 subRec->schemaTransId = req->transId;
2571 subRec->userReference = 0;
2572 subRec->connectionPtr = RNIL;
2573 subRec->subscriptionId = rand();
2574 subRec->subscriptionKey = rand();
2575 subRec->prepareId = RNIL;
2576 subRec->indexType = 0;
2577 subRec->sourceTableId = data.m_indexId;
2578 subRec->targetTableId = RNIL;
2579 subRec->noOfIndexColumns = ao_size;
2580 subRec->noOfKeyColumns = 0;
2581 subRec->parallelism = 16;
2582 subRec->fragCount = 0;
2583 subRec->fragId = req->fragId;
2584 subRec->syncPtr = RNIL;
2585 subRec->errorCode = BuildIndxRef::NoError;
2586 subRec->subscriptionCreated =
false;
2587 subRec->pendingSubSyncContinueConf =
false;
2588 subRec->expectedConf = 0;
2589 subRec->m_rows_processed = 0;
// The attribute-order buffer must be empty before the list is appended.
2592 AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2593 ndbrequire(ao_buf.isEmpty());
2594 ao_buf.append(ao_list, ao_size);
2597 send.m_sysTable = &g_statMetaSample;
2598 send.m_operationType = UtilPrepareReq::Insert;
2599 statSendPrepare(signal, stat);
// Handle one scanned (key, value) sample during RT_SCAN_FRAG and issue
// the Insert for it.
// The signal must carry exactly two sections:
//  - ATTR_INFO: two attribute headers, INDEX_STAT_KEY and
//    INDEX_STAT_VALUE, both with non-zero data sizes kz and vz.
//  - AFTER_VALUES: the key (kz words) followed by the value.
// Each sample adds one to scan.m_sampleCount and its key length in bytes
// to scan.m_keyBytes.
// Error insert 18024 sends a UTIL_EXECUTE_REF with TCError 630 to this
// block itself.
2603 Trix::statScanExecute(
Signal* signal, StatOp& stat)
2605 StatOp::Data& data = stat.m_data;
2606 StatOp::Scan& scan = stat.m_scan;
2607 StatOp::Send& send = stat.m_send;
2608 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2609 D(
"statScanExecute" << V(stat));
2612 ndbrequire(handle.m_cnt == 2);
2617 handle.getSection(ptr0, SubTableData::ATTR_INFO);
2618 ndbrequire(ptr0.sz == 2);
2619 ::copy((Uint32*)ah, ptr0);
2620 ndbrequire(ah[0].getAttributeId() == AttributeHeader::INDEX_STAT_KEY);
2621 ndbrequire(ah[1].getAttributeId() == AttributeHeader::INDEX_STAT_VALUE);
2623 const Uint32 kz = ah[0].getDataSize();
2624 const Uint32 vz = ah[1].getDataSize();
2625 ndbrequire(kz != 0 && vz != 0);
// Upper limit for AFTER_VALUES: largest key plus largest value.
2628 const Uint32 avmax = MAX_INDEX_STAT_KEY_SIZE + MAX_INDEX_STAT_VALUE_SIZE;
2631 handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2632 ndbrequire(ptr1.sz <= avmax);
2634 data.m_statKey = &av[0];
2635 data.m_statValue = &av[kz];
// Key and value each start with a 2-byte little-endian length. Decode
// them through unsigned bytes: with a signed plain char, a length byte
// >= 0x80 would sign-extend and corrupt kb / vb (and m_keyBytes below).
// statDataPtr decodes the same prefixes through uchar.
2636 const uchar* kp = (
const uchar*)data.m_statKey;
2637 const uchar* vp = (
const uchar*)data.m_statValue;
2638 const Uint32 kb = kp[0] + (kp[1] << 8);
2639 const Uint32 vb = vp[0] + (vp[1] << 8);
// Both must be non-empty and the header sizes must equal the sizes in
// words, length prefixes included.
2641 ndbrequire(kb != 0 && vb != 0);
2642 ndbrequire(kz == ((2 + kb) + 3) / 4);
2643 ndbrequire(vz == ((2 + vb) + 3) / 4);
2645 scan.m_sampleCount++;
2646 scan.m_keyBytes += kb;
2647 releaseSections(handle);
2649 if (ERROR_INSERTED(18024))
2652 CLEAR_ERROR_INSERT_VALUE;
2655 utilRef->senderData = stat.m_ownPtrI;
2656 utilRef->errorCode = UtilExecuteRef::TCError;
2657 utilRef->TCErrorCode = 630;
2658 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2659 signal, UtilExecuteRef::SignalLength, JBB);
2660 subRec->expectedConf++;
// Insert this sample into the sample table with the subscription's
// prepared operation; one more reply is now expected.
2665 send.m_sysTable = &g_statMetaSample;
2666 send.m_operationType = UtilPrepareReq::Insert;
2667 send.m_prepareId = subRec->prepareId;
2668 subRec->expectedConf++;
2669 statSendExecute(signal, stat);
// Finish the scan phase of RT_SCAN_FRAG.
// A non-zero subscription error code is reported through statOpError().
// The subscription record is set back to STAT_UTIL and the head data is
// filled in from the scan counters, with the current time as load time.
// The head record is then written with statHeadInsert() or
// statHeadUpdate(), selected by data.m_head_found.
2673 Trix::statScanRelease(
Signal* signal, StatOp& stat)
2675 StatOp::Data& data = stat.m_data;
2676 StatOp::Scan& scan = stat.m_scan;
2677 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2678 D(
"statScanRelease" << V(stat) << V(subRec->errorCode));
2680 if (subRec->errorCode != 0)
2683 statOpError(signal, stat, subRec->errorCode, __LINE__);
2686 subRec->requestType = STAT_UTIL;
2688 const Uint32 now = (Uint32)time(0);
2689 data.m_loadTime = now;
2690 data.m_sampleCount = scan.m_sampleCount;
2691 data.m_keyBytes = scan.m_keyBytes;
2692 data.m_valueFormat = MAX_INDEX_STAT_VALUE_FORMAT;
2694 if (data.m_head_found ==
false)
2697 statHeadInsert(signal, stat);
2702 statHeadUpdate(signal, stat);
// Final step of RT_SCAN_FRAG: send an INDEX_STAT_REP of type
// RT_UPDATE_CONF, carrying the index/table/fragment ids and the load
// time, to the TUX block, then report success for the StatOp.
2707 Trix::statScanEnd(
Signal* signal, StatOp& stat)
2709 StatOp::Data& data = stat.m_data;
2711 D(
"statScanEnd" << V(stat));
// Two ways of choosing the TUX reference: a per-instance reference when
// trix_index_stat_rep_to_tux_instance is enabled, DBTUX_REF otherwise.
2719 #if trix_index_stat_rep_to_tux_instance
2720 Uint32 instanceKey = getInstanceKey(req->indexId, req->fragId);
2721 BlockReference tuxRef = numberToRef(DBTUX, instanceKey, getOwnNodeId());
2723 BlockReference tuxRef = DBTUX_REF;
2727 rep->senderRef = reference();
2728 rep->senderData = 0;
2729 rep->requestType = IndexStatRep::RT_UPDATE_CONF;
2730 rep->requestFlag = 0;
2731 rep->indexId = req->indexId;
2732 rep->indexVersion = req->indexVersion;
2733 rep->tableId = req->tableId;
2734 rep->fragId = req->fragId;
2735 rep->loadTime = data.m_loadTime;
2736 sendSignal(tuxRef, GSN_INDEX_STAT_REP,
2737 signal, IndexStatRep::SignalLength, JBB);
2739 statOpSuccess(signal, stat);
// Entry point for RT_DROP_HEAD once the head has been read: when a head
// record was found it is deleted with statHeadDelete(); statDropEnd()
// completes the request.
2745 Trix::statDropBegin(
Signal* signal, StatOp& stat)
2747 StatOp::Data& data = stat.m_data;
2748 D(
"statDropBegin" << V(stat));
2750 if (data.m_head_found ==
true)
2753 statHeadDelete(signal, stat);
2756 statDropEnd(signal, stat);
// Final step of RT_DROP_HEAD: report success for the StatOp.
2760 Trix::statDropEnd(
Signal* signal, StatOp& stat)
2763 statOpSuccess(signal, stat);
// Build and send UTIL_PREPARE_REQ to DBUTIL for the operation in
// stat.m_send. The properties (operation type, table id and one
// AttributeId entry per selected column) are written into wbuf and sent
// as a single section. senderData carries the StatOp pool index.
2769 Trix::statSendPrepare(
Signal* signal, StatOp& stat)
2771 StatOp::Send& send = stat.m_send;
2773 const SysTable& sysTable = *send.m_sysTable;
2774 D(
"statSendPrepare" << V(stat));
2778 utilReq->senderData = stat.m_ownPtrI;
2779 utilReq->senderRef = reference();
2780 utilReq->schemaTransId = req->transId;
2787 w.add(UtilPrepareReq::OperationType, send.m_operationType);
2788 w.add(UtilPrepareReq::TableId, sysTable.tableId);
// Attribute ids are the column positions in the system table descriptor.
// NOTE(review): the Delete case reads column descriptor c - presumably to
// add key columns only; the condition is not visible here, confirm.
2791 for (i = 0; i < sysTable.columnCount; i++) {
2792 const SysColumn& c = sysTable.columnList[
i];
2793 switch (send.m_operationType) {
2794 case UtilPrepareReq::Read:
2795 case UtilPrepareReq::Insert:
2796 case UtilPrepareReq::Update:
2798 w.add(UtilPrepareReq::AttributeId, i);
2800 case UtilPrepareReq::Delete:
2803 w.add(UtilPrepareReq::AttributeId, i);
2812 ptr[0].p = &wbuf[0];
2813 ptr[0].sz = w.getWordsUsed();
2814 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ,
2815 signal, UtilPrepareReq::SignalLength, JBB, ptr, 1);
// Build and send UTIL_EXECUTE_REQ to DBUTIL for the prepared operation in
// stat.m_send. Column data is packed with statDataOut() into wattr
// (section 0) and wdata (section 1).
2819 Trix::statSendExecute(
Signal* signal, StatOp& stat)
2821 D(
"statSendExecute" << V(stat));
2822 StatOp::Send& send = stat.m_send;
2823 StatOp::Attr& attr = stat.m_attr;
2824 const SysTable& sysTable = *send.m_sysTable;
2828 utilReq->senderData = stat.m_ownPtrI;
2829 utilReq->senderRef = reference();
2830 utilReq->prepareId = send.m_prepareId;
2831 utilReq->scanTakeOver = 0;
// Point the attr cursor at the local buffers and reset both sizes to 0.
// NOTE(review): wattr/wdata declarations are not visible here - confirm
// they hold at least 20 and 2048 words, matching m_attrMax / m_dataMax.
2835 attr.m_attr = wattr;
2836 attr.m_attrMax = 20;
2837 attr.m_attrSize = 0;
2838 attr.m_data = wdata;
2839 attr.m_dataMax = 2048;
2840 attr.m_dataSize = 0;
// NOTE(review): the Delete case reads column descriptor c - presumably to
// pack key columns only; the condition is not visible here, confirm.
2842 for (Uint32 i = 0; i < sysTable.columnCount; i++) {
2843 const SysColumn& c = sysTable.columnList[
i];
2844 switch (send.m_operationType) {
2845 case UtilPrepareReq::Read:
2846 case UtilPrepareReq::Insert:
2847 case UtilPrepareReq::Update:
2849 statDataOut(stat, i);
2851 case UtilPrepareReq::Delete:
2854 statDataOut(stat, i);
2863 ptr[0].p = attr.m_attr;
2864 ptr[0].sz = attr.m_attrSize;
2865 ptr[1].p = attr.m_data;
2866 ptr[1].sz = attr.m_dataSize;
2867 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ,
2868 signal, UtilExecuteReq::SignalLength, JBB, ptr, 2);
// Send UTIL_RELEASE_REQ to DBUTIL for send.m_prepareId, which must not be
// RNIL. senderData carries the StatOp pool index.
2872 Trix::statSendRelease(
Signal* signal, StatOp& stat)
2874 D(
"statSendRelease" << V(stat));
2875 StatOp::Send& send = stat.m_send;
2876 ndbrequire(send.m_prepareId != RNIL);
2880 utilReq->senderData = stat.m_ownPtrI;
2881 utilReq->prepareId = send.m_prepareId;
2882 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ,
2883 signal, UtilReleaseReq::SignalLength, JBB);
// Map column i of the current system table (send.m_sysTable) to the
// location of its value inside stat.m_data.
//   i      column index, must be below sysTable.columnCount
//   dptr   out: pointer to the value
//   bytes  out: size of the value in bytes
// For the stat key and stat value the size is 2 plus the 2-byte
// little-endian length stored at the start of the value.
2889 Trix::statDataPtr(StatOp& stat, Uint32 i, Uint32*& dptr, Uint32& bytes)
2891 StatOp::Data& data = stat.m_data;
2892 StatOp::Send& send = stat.m_send;
2894 const SysTable& sysTable = *send.m_sysTable;
2895 ndbrequire(i < sysTable.columnCount);
// Columns of the head table.
2898 if (&sysTable == &g_statMetaHead)
2902 dptr = &data.m_indexId;
2906 dptr = &data.m_indexVersion;
2910 dptr = &data.m_tableId;
2914 dptr = &data.m_fragCount;
2918 dptr = &data.m_valueFormat;
2922 dptr = &data.m_sampleVersion;
2926 dptr = &data.m_loadTime;
2930 dptr = &data.m_sampleCount;
2934 dptr = &data.m_keyBytes;
// Columns of the sample table.
2944 if (&sysTable == &g_statMetaSample)
2948 dptr = &data.m_indexId;
2952 dptr = &data.m_indexVersion;
2956 dptr = &data.m_sampleVersion;
2961 dptr = data.m_statKey;
2962 const uchar* p = (uchar*)dptr;
2964 bytes = 2 + p[0] + (p[1] << 8);
2969 dptr = data.m_statValue;
2970 const uchar* p = (uchar*)dptr;
2972 bytes = 2 + p[0] + (p[1] << 8);
// Append the value of column i to the outgoing attr/data buffers in
// stat.m_attr. The value is copied byte-wise and occupies a whole number
// of 32-bit words; both buffers are bounds-checked with ndbrequire.
2986 Trix::statDataOut(StatOp& stat, Uint32 i)
2988 StatOp::Attr& attr = stat.m_attr;
2991 statDataPtr(stat, i, dptr, bytes);
2993 ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
2997 Uint32 words = (bytes + 3) / 4;
2998 ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
2999 Uint8* dst = (Uint8*)&attr.m_data[attr.m_dataSize];
3000 memcpy(dst, dptr, bytes);
// Loop over the bytes between the end of the value and the word boundary.
// NOTE(review): the loop body is not visible here - presumably it
// zero-fills the padding; confirm.
3001 while (bytes < words * 4)
3003 attr.m_dataSize += words;
3004 D(
"statDataOut" << V(i) << V(bytes) << hex << V(dptr[0]));
// Copy the value of column i from the incoming attr/data buffers in
// stat.m_attr to the location given by statDataPtr().
// The byte size in the received attribute header must equal the size
// reported by statDataPtr(); both buffers are bounds-checked with
// ndbrequire.
3008 Trix::statDataIn(StatOp& stat, Uint32 i)
3010 StatOp::Attr& attr = stat.m_attr;
3013 statDataPtr(stat, i, dptr, bytes);
3015 ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
3019 ndbrequire(ah.getByteSize() == bytes);
3020 Uint32 words = (bytes + 3) / 4;
3021 ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
3022 const char* src = (
const char*)&attr.m_data[attr.m_dataSize];
3023 memcpy(dptr, src, bytes);
// Advance by whole words, past any padding after the value.
3024 attr.m_dataSize += words;
3025 D(
"statDataIn" << V(i) << V(bytes) << hex << V(dptr[0]));
// Abort path for an operation that still holds a util prepare id: install
// statAbortUtilCB as the util callback (with the operation's own pointer index
// as callback data) and then call statUtilRelease().
3031 Trix::statAbortUtil(
Signal* signal, StatOp& stat)
3033 StatOp::Util& util = stat.m_util;
3034 D(
"statAbortUtil" << V(stat));
// Only valid while a prepare id is set.
3036 ndbrequire(util.m_prepareId != RNIL);
3037 util.m_cb.m_callbackFunction = safe_cast(&Trix::statAbortUtilCB);
3038 util.m_cb.m_callbackData = stat.m_ownPtrI;
3039 statUtilRelease(signal, stat);
// Callback installed by statAbortUtil(). Looks up the operation from
// statPtrI, requires a zero result code and continues with statOpAbort().
3043 Trix::statAbortUtilCB(
Signal* signal, Uint32 statPtrI, Uint32 ret)
3045 StatOp& stat = statOpGetPtr(statPtrI);
3046 D(
"statAbortUtilCB" << V(stat) << V(ret));
// A non-zero ret is treated as fatal.
3048 ndbrequire(ret == 0);
3049 statOpAbort(signal, stat);
// Finish an operation successfully: send the confirmation via statOpConf()
// and release the operation record via statOpRelease().
3055 Trix::statOpSuccess(
Signal* signal, StatOp& stat)
3057 StatOp::Data& data = stat.m_data;
3058 D(
"statOpSuccess" << V(stat));
// For RT_SCAN_FRAG requests, report the sample count as an "I" level event.
3060 if (stat.m_requestType == IndexStatReq::RT_SCAN_FRAG)
3061 statOpEvent(stat,
"I",
"created %u samples", data.m_sampleCount);
3063 statOpConf(signal, stat);
3064 statOpRelease(stat);
// Send GSN_INDEX_STAT_IMPL_CONF to req->senderRef, carrying this block's
// reference and the senderData taken from the request.
// NOTE(review): the declarations of req and conf are on lines not visible in
// this excerpt.
3068 Trix::statOpConf(
Signal* signal, StatOp& stat)
3071 D(
"statOpConf" << V(stat));
3074 conf->senderRef = reference();
3075 conf->senderData = req->senderData;
3076 sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_CONF,
3077 signal, IndexStatImplConf::SignalLength, JBB);
// Record an error on the operation and start aborting it.
// errorCode / errorLine are stored in stat.m_errorCode / stat.m_errorLine and
// statOpAbort() is then called.
// supress is scanned as a zero-terminated list of error codes and compared
// against errorCode.
// NOTE(review): what a match does, and any guard around the loop or around the
// "W" event below, is on lines not visible in this excerpt - confirm there.
3081 Trix::statOpError(
Signal* signal, StatOp& stat,
3082 Uint32 errorCode, Uint32 errorLine,
3083 const Uint32 * supress)
3085 D(
"statOpError" << V(stat) << V(errorCode) << V(errorLine));
3089 for (Uint32 i = 0; supress[
i] != 0; i++)
3091 if (errorCode == supress[i])
// Report the error as a "W" level event.
3097 statOpEvent(stat,
"W",
"error %u line %u", errorCode, errorLine);
// An error may be recorded only once per operation.
3100 ndbrequire(stat.m_errorCode == 0);
3101 stat.m_errorCode = errorCode;
3102 stat.m_errorLine = errorLine;
3103 statOpAbort(signal, stat);
// Abort an operation. While a util prepare id is still set, statAbortUtil()
// is called; the visible tail sends the REF via statOpRef() and releases the
// operation record via statOpRelease().
// NOTE(review): statAbortUtilCB() calls back into this function, so the
// prepare-id branch presumably returns early - the lines around it are not
// visible in this excerpt, confirm there.
3107 Trix::statOpAbort(
Signal* signal, StatOp& stat)
3109 StatOp::Util& util = stat.m_util;
3110 D(
"statOpAbort" << V(stat));
3112 if (util.m_prepareId != RNIL)
3116 statAbortUtil(signal, stat);
3119 statOpRef(signal, stat);
3120 statOpRelease(stat);
// Convenience overload: forwards to the request-based statOpRef() using the
// error code and line stored on the operation.
// NOTE(review): the declaration of req is on a line not visible in this
// excerpt.
3124 Trix::statOpRef(
Signal* signal, StatOp& stat)
3127 D(
"statOpRef" << V(stat));
3129 statOpRef(signal, req, stat.m_errorCode, stat.m_errorLine);
3134 Uint32 errorCode, Uint32 errorLine)
// Send GSN_INDEX_STAT_IMPL_REF to req->senderRef, carrying this block's
// reference, the senderData taken from the request and the given error code
// and line.
// NOTE(review): the first line of this signature and the declaration of ref
// are not visible in this excerpt.
3136 D(
"statOpRef" << V(errorCode) << V(errorLine));
3139 ref->senderRef = reference();
3140 ref->senderData = req->senderData;
3141 ref->errorCode = errorCode;
3142 ref->errorLine = errorLine;
3143 sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_REF,
3144 signal, IndexStatImplRef::SignalLength, JBB);
// Format an event message for an operation.
// msg and its variadic arguments are formatted into tmp1; the final text
// combines the index id, the sample version, the request name and tmp1.
// level is inspected by its first character ('I' or 'W').
// NOTE(review): the buffer declarations, the va_list handling and the action
// taken for each level are on lines not visible in this excerpt.
3148 Trix::statOpEvent(StatOp& stat,
const char*
level,
const char*
msg, ...)
3151 StatOp::Data& data = stat.m_data;
3156 BaseString::vsnprintf(tmp1,
sizeof(tmp1), msg, ap);
3161 "index %u stats version %u: %s: %s",
3162 data.m_indexId, data.m_sampleVersion,
3163 stat.m_requestName, tmp1);
3165 D(
"statOpEvent" << V(level) << V(tmp2));
3167 if (level[0] ==
'I')
3169 if (level[0] ==
'W')
// Stream a short description of a Trix::StatOp to an NdbOut. The visible
// lines print the operation's own pointer index and the head_found flag of
// its data; further fields may be printed on lines not visible in this
// excerpt.
3176 operator<<(NdbOut& out,
const Trix::StatOp& stat)
3179 out <<
" i:" << stat.m_ownPtrI;
3180 out <<
" head_found:" << stat.m_data.m_head_found;
// Block-level macro invocation for Trix; its expansion is defined outside
// this excerpt.
3186 BLOCK_FUNCTIONS(
Trix)