18 #include <my_global.h>
21 #include <ndb_version.h>
24 #include <Bitmask.hpp>
25 #include <SimpleProperties.hpp>
27 #include <signaldata/NodeFailRep.hpp>
28 #include <signaldata/ReadNodesConf.hpp>
30 #include <signaldata/ListTables.hpp>
31 #include <signaldata/GetTabInfo.hpp>
32 #include <signaldata/GetTableId.hpp>
33 #include <signaldata/DictTabInfo.hpp>
34 #include <signaldata/SumaImpl.hpp>
35 #include <signaldata/ScanFrag.hpp>
36 #include <signaldata/TransIdAI.hpp>
37 #include <signaldata/CreateTrigImpl.hpp>
38 #include <signaldata/DropTrigImpl.hpp>
39 #include <signaldata/FireTrigOrd.hpp>
40 #include <signaldata/TrigAttrInfo.hpp>
41 #include <signaldata/CheckNodeGroups.hpp>
42 #include <signaldata/CreateTab.hpp>
43 #include <signaldata/DropTab.hpp>
44 #include <signaldata/AlterTable.hpp>
45 #include <signaldata/AlterTab.hpp>
46 #include <signaldata/DihScanTab.hpp>
47 #include <signaldata/SystemError.hpp>
48 #include <signaldata/GCP.hpp>
49 #include <signaldata/StopMe.hpp>
51 #include <signaldata/DictLock.hpp>
52 #include <ndbapi/NdbDictionary.hpp>
54 #include <DebuggerNames.hpp>
55 #include "../dbtup/Dbtup.hpp"
56 #include "../dbdih/Dbdih.hpp"
58 #include <signaldata/CreateNodegroup.hpp>
59 #include <signaldata/CreateNodegroupImpl.hpp>
61 #include <signaldata/DropNodegroup.hpp>
62 #include <signaldata/DropNodegroupImpl.hpp>
64 #include <signaldata/DbinfoScan.hpp>
65 #include <signaldata/TransIdAI.hpp>
67 #include <EventLogger.hpp>
// Local overrides of the DBUG_* tracing macros.
// NOTE(review): two competing sets of definitions follow; the preprocessor
// conditionals that select between them are not visible in this listing.
81 #undef DBUG_VOID_RETURN
// Verbose set: every macro prints the current file and line through ndbout / ndbout_c.
84 #define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
85 #define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
86 #define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
87 #define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
// Quiet set: DBUG_PRINT expands to nothing and the return macros are plain returns.
90 #define DBUG_PRINT(a,b)
91 #define DBUG_RETURN(a) return a
92 #define DBUG_VOID_RETURN return
// File-level state and constants.
// g_subPtrI starts as RNIL; the 8000-8003 cases of execDUMP_STATE_ORD use it
// to look up a subscription.
106 Uint32 g_subPtrI = RNIL;
// Sequence id placed in the UTIL_SEQUENCE_REQ signals sent to DBUTIL.
107 static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;
// NOTE(review): not referenced in the visible part of this file — confirm where it is used.
109 static const Uint32 MAX_CONCURRENT_GCP = 2;
// Handler for READ_CONFIG_REQ.
// Reads configuration parameters, sizes the block's pools, resets bucket and
// GCI bookkeeping, and replies to the sender with READ_CONFIG_CONF.
// Several original lines (braces, declarations, argument continuations) are
// missing from this listing; comments describe only what the visible lines show.
120 Suma::execREAD_CONFIG_REQ(
Signal* signal)
// Keep the requester's reference and data so the CONF can be routed back.
126 Uint32 ref = req->senderRef;
127 Uint32 senderData = req->senderData;
130 m_ctx.m_config.getOwnConfigIterator();
// NOTE(review): the calls below start their lines, so their return values are
// not checked on the visible lines; the out-variables are declared without
// initializers.
134 Uint32 noTables, noAttrs, maxBufferedEpochs;
135 ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE,
137 ndb_mgm_get_int_parameter(p, CFG_DICT_ATTRIBUTE,
139 ndb_mgm_get_int_parameter(p, CFG_DB_MAX_BUFFERED_EPOCHS,
// The subscription hash is sized by the configured table count.
145 c_subscriptions.
setSize(noTables);
// The subscription pool is sized from CFG_DB_SUBSCRIPTIONS.
149 ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIPTIONS, &cnt);
155 c_subscriptionPool.
setSize(cnt);
160 ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIBERS, &val);
170 ndb_mgm_get_int_parameter(p, CFG_DB_SUB_OPERATIONS, &cnt);
// Data buffer pool: one entry per attribute plus 45 extra words.
179 Uint32 noOfBoundWords = 5 * 9;
182 c_dataBufferPool.setSize(noAttrs + noOfBoundWords);
184 c_maxBufferedEpochs = maxBufferedEpochs;
// GCP pool sizing: the micro-GCP interval, when non-zero, replaces the GCP interval.
188 Uint32 dbApiHbInterval, gcpInterval, microGcpInterval = 0;
189 ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL,
191 ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL,
193 ndb_mgm_get_int_parameter(p, CFG_DB_MICRO_GCP_INTERVAL,
196 if (microGcpInterval)
198 gcpInterval = microGcpInterval;
// Size is 10 plus ceil(4 * heartbeat interval / gcp interval).
// NOTE(review): divides by gcpInterval — relies on configuration never yielding 0; confirm.
200 c_gcp_pool.
setSize(10 + (4*dbApiHbInterval+gcpInterval-1)/gcpInterval);
// Seize records until seize() fails and placement-construct a SyncRecord in each.
207 while(tmp.seize(ptr))
208 new (ptr.p) SyncRecord(*
this, c_dataBufferPool);
// Until told otherwise, this node is its own master.
213 c_masterNodeId = getOwnNodeId();
// Clear node group membership.
215 c_nodeGroup = c_noNodesInGroup = 0;
216 for (
int i = 0;
i < MAX_REPLICAS;
i++) {
217 c_nodesInGroup[
i] = 0;
220 m_first_free_page= RNIL;
// Zero all buckets, then mark each bucket's buffer as empty (no tail/head page,
// head position at DATA_WORDS).
223 memset(c_buckets, 0,
sizeof(c_buckets));
224 for(Uint32
i = 0;
i<NO_OF_BUCKETS;
i++)
227 bucket->m_buffer_tail = RNIL;
228 bucket->m_buffer_head.m_page_id = RNIL;
229 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
// Reset GCI counters and startup flags.
234 m_last_complete_gci = 0;
235 m_gcp_complete_rep_count = 0;
236 m_out_of_buffer_gci = 0;
237 m_missing_data =
false;
239 c_startup.m_wait_handover=
false;
240 c_failedApiNodes.
clear();
// Confirm to the requester.
243 conf->senderRef = reference();
244 conf->senderData = senderData;
245 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
246 ReadConfigConf::SignalLength, JBB);
// Body of Suma::execSTTOR (the function header is not visible in this listing).
// Records the start phase and type of start from the signal and performs the
// per-phase work for phases 3, 5, 7, 100 and 101.
253 DBUG_ENTER(
"Suma::execSTTOR");
// theData[1] carries the start phase, theData[7] the type of start.
254 m_startphase = signal->theData[1];
255 m_typeOfStart = signal->theData[7];
257 DBUG_PRINT(
"info",(
"startphase = %u, typeOfStart = %u",
258 m_startphase, m_typeOfStart));
// Phase 3: point the page pool at the memory manager's root.
260 if(m_startphase == 3)
263 void* ptr = m_ctx.m_mm.get_memroot();
264 c_page_pool.set((Buffer_page*)ptr, (Uint32)~0);
// Phase 5: with error insert 13029 the STTOR is re-sent to SUMA with a delay of
// 30; otherwise READ_NODESREQ is sent to NDBCNTR.
// NOTE(review): the return between the two sends is not visible — confirm.
267 if(m_startphase == 5)
271 if (ERROR_INSERTED(13029))
273 sendSignalWithDelay(SUMA_REF, GSN_STTOR, signal,
274 30, signal->getLength());
278 signal->theData[0] = reference();
279 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
// Phase 7: unless this is a (initial) node restart, activate every bucket this
// node is responsible for.
283 if(m_startphase == 7)
285 if (m_typeOfStart != NodeState::ST_NODE_RESTART &&
286 m_typeOfStart != NodeState::ST_INITIAL_NODE_RESTART)
288 for( Uint32
i = 0;
i < c_no_of_buckets;
i++)
290 if (get_responsible_node(
i) == getOwnNodeId())
293 DBUG_PRINT(
"info",(
"bucket %u set to true",
i));
294 m_active_buckets.
set(
i);
295 ndbout_c(
"m_active_buckets.set(%d)",
i);
// With active buckets, the expected GCP-complete count is set to the number of
// active buckets; the assert checks that this node is among the responsible nodes.
300 if(!m_active_buckets.
isclear())
304 while ((bucket = m_active_buckets.
find(bucket)) != Bucket_mask::NotFound)
306 tmp.
set(get_responsible_node(bucket, c_nodes_in_nodegroup_mask));
310 ndbassert(tmp.
get(getOwnNodeId()));
311 m_gcp_complete_rep_count = m_active_buckets.
count();
314 m_gcp_complete_rep_count = 0;
// Initial start on the master node (the guarded action is not visible here).
316 if(m_typeOfStart == NodeState::ST_INITIAL_START &&
317 c_masterNodeId == getOwnNodeId())
// Error insert 13030 prints a notice; see the dump case that sets it.
324 if (ERROR_INSERTED(13030))
326 ndbout_c(
"Dont start handover");
331 if(m_startphase == 100)
// Phase 101: on (initial) node restart, flag that we wait for handover and
// check whether it can start now.
340 if(m_startphase == 101)
342 if (m_typeOfStart == NodeState::ST_NODE_RESTART ||
343 m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
348 c_startup.m_wait_handover=
true;
349 check_start_handover(signal);
358 #include <ndb_version.h>
// Asks the master node's DICT block for a dictionary lock.
//   signal - signal object reused for the outgoing request (or the local CONF)
//   state  - DictLockReq lock type; SumaStartMe and SumaHandOver are tested
//            here. It is also stored in userPtr so the reply handler can dispatch.
// The two version checks guard branches whose bodies are not visible here.
// NOTE(review): the visible tail builds a CONF locally and calls
// execDICT_LOCK_CONF directly — presumably the path taken when the master's
// version lacks the lock support; confirm.
361 Suma::send_dict_lock_req(
Signal* signal, Uint32 state)
363 if (state == DictLockReq::SumaStartMe &&
364 !ndbd_suma_dictlock_startme(
getNodeInfo(c_masterNodeId).m_version))
369 else if (state == DictLockReq::SumaHandOver &&
370 !ndbd_suma_dictlock_handover(
getNodeInfo(c_masterNodeId).m_version))
// Regular path: send DICT_LOCK_REQ to DICT on the master.
379 req->lockType = state;
380 req->userPtr = state;
381 req->userRef = reference();
382 sendSignal(calcDictBlockRef(c_masterNodeId),
383 GSN_DICT_LOCK_REQ, signal, DictLockReq::SignalLength, JBB);
// Local shortcut: fabricate the CONF and handle it immediately.
389 conf->userPtr = state;
390 execDICT_LOCK_CONF(signal);
// Handler for DICT_LOCK_CONF: continues the operation the lock was taken for,
// selected by the state that was echoed back in userPtr.
394 Suma::execDICT_LOCK_CONF(
Signal* signal)
399 Uint32 state = conf->userPtr;
// Start-me: begin with node id 0 so the search for a server node starts from the beginning.
402 case DictLockReq::SumaStartMe:
404 c_startup.m_restart_server_node_id = 0;
405 CRASH_INSERTION(13039);
406 send_start_me_req(signal);
// Hand-over: issue the handover request as a starting node.
408 case DictLockReq::SumaHandOver:
410 send_handover_req(signal, SumaHandoverReq::RT_START_NODE);
// Handler for DICT_LOCK_REF: the only accepted refusal is TooManyRequests
// (anything else trips ndbrequire). The lock request is retried by sending
// CONTINUEB(RETRY_DICT_LOCK) to this block with a delay of 300.
420 Suma::execDICT_LOCK_REF(
Signal* signal)
425 Uint32 state = ref->userPtr;
427 ndbrequire(ref->errorCode == DictLockRef::TooManyRequests);
// theData[1] carries the lock state to retry with.
428 signal->theData[0] = SumaContinueB::RETRY_DICT_LOCK;
429 signal->theData[1] = state;
430 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 300, 2);
// Releases a dictionary lock previously requested with send_dict_lock_req.
//   signal - signal object reused for the outgoing order
//   state  - DictLockReq lock type being released (SumaStartMe / SumaHandOver)
// The version checks mirror send_dict_lock_req; the bodies of those branches
// are not visible here (presumably they skip the unlock — confirm).
434 Suma::send_dict_unlock_ord(
Signal* signal, Uint32 state)
436 if (state == DictLockReq::SumaStartMe &&
437 !ndbd_suma_dictlock_startme(
getNodeInfo(c_masterNodeId).m_version))
442 else if (state == DictLockReq::SumaHandOver &&
443 !ndbd_suma_dictlock_handover(
getNodeInfo(c_masterNodeId).m_version))
// Send DICT_UNLOCK_ORD to DICT on the master.
452 ord->lockType = state;
453 ord->senderData = state;
454 ord->senderRef = reference();
455 sendSignal(calcDictBlockRef(c_masterNodeId),
456 GSN_DICT_UNLOCK_ORD, signal, DictUnlockOrd::SignalLength, JBB);
// Picks the next alive node after the one last tried and asks its SUMA block
// to recreate subscriptions on this node (SUMA_START_ME_REQ).
460 Suma::send_start_me_req(
Signal* signal)
// Continue the search from the node after the previously tried one.
462 Uint32 nodeId= c_startup.m_restart_server_node_id;
464 nodeId = c_alive_nodes.
find(nodeId + 1);
// Special cases: the candidate is this node, or no candidate was found.
// (The actions taken for them are not visible in this listing.)
466 if(nodeId == getOwnNodeId())
468 if(nodeId == NdbNodeBitmask::NotFound)
477 infoEvent(
"Suma: asking node %d to recreate subscriptions on me", nodeId);
// Remember which node is serving the restart, then send the request.
478 c_startup.m_restart_server_node_id= nodeId;
479 sendSignal(calcSumaBlockRef(nodeId),
480 GSN_SUMA_START_ME_REQ, signal, 1, JBB);
// Handler for SUMA_START_ME_REF.
// Errors other than Busy / NotStarted are reported to NDBCNTR as a
// SYSTEM_ERROR (CopySubscriptionRef); the visible tail retries via
// send_start_me_req.
// NOTE(review): whether the error branch returns before the retry is not
// visible here — confirm.
484 Suma::execSUMA_START_ME_REF(
Signal* signal)
488 Uint32 error = ref->errorCode;
489 if (error != SumaStartMeRef::Busy && error != SumaStartMeRef::NotStarted)
495 sysErr->errorCode = SystemError::CopySubscriptionRef;
496 sysErr->errorRef = reference();
497 sysErr->data[0] = error;
499 sendSignal(NDBCNTR_REF, GSN_SYSTEM_ERROR, signal,
500 SystemError::SignalLength, JBB);
// Arguments of a logging call whose first lines are missing from this listing.
505 c_startup.m_restart_server_node_id, ref->errorCode);
507 send_start_me_req(signal);
// Handler for SUMA_START_ME_CONF: the serving node has finished restoring this
// node. Logs the event, releases the SumaStartMe dictionary lock and clears
// the recorded server node id.
511 Suma::execSUMA_START_ME_CONF(
Signal* signal)
513 infoEvent(
"Suma: node %d has completed restoring me",
514 c_startup.m_restart_server_node_id);
516 send_dict_unlock_ord(signal, DictLockReq::SumaStartMe);
517 c_startup.m_restart_server_node_id= 0;
// Body of Suma::createSequence (function header not visible in this listing).
// Sends UTIL_SEQUENCE_REQ(Create) for SUMA_SEQUENCE to DBUTIL; senderData is
// RNIL because no per-request record is attached.
524 DBUG_ENTER(
"Suma::createSequence");
528 req->senderData = RNIL;
529 req->sequenceId = SUMA_SEQUENCE;
530 req->requestType = UtilSequenceReq::Create;
531 sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
532 signal, UtilSequenceReq::SignalLength, JBB);
// Handles the reply to the sequence-create request. Callers in this file pass
// either a CONF (ref == NULL) or a REF (conf == NULL).
// On a REF the error code is switched on; the visible TCError branch formats a
// message and aborts the node via progError with
// NDBD_EXIT_RESOURCE_ALLOC_ERROR.
// NOTE(review): the handling of NoSuchSequence and the remaining parameters
// are not visible in this listing.
538 Suma::createSequenceReply(
Signal* signal,
546 switch ((UtilSequenceRef::ErrorCode)ref->errorCode)
548 case UtilSequenceRef::NoSuchSequence:
550 case UtilSequenceRef::TCError:
554 "Startup failed during sequence creation. TC error %d",
556 progError(__LINE__, NDBD_EXIT_RESOURCE_ALLOC_ERROR, buf);
// Handler for READ_NODESCONF: initialises the set of alive nodes from the
// node bitmasks in the reply and records the master node id.
// NOTE(review): the two assignments to c_alive_nodes below sit in different
// branches whose conditions are not visible in this listing.
566 Suma::execREAD_NODESCONF(
Signal* signal){
// Variant 1: alive = started nodes plus this node.
572 c_alive_nodes.
assign(NdbNodeBitmask::Size, conf->startedNodes);
573 c_alive_nodes.
set(getOwnNodeId());
// Variant 2: alive = starting nodes; started nodes are loaded into a temporary mask.
577 c_alive_nodes.
assign(NdbNodeBitmask::Size, conf->startingNodes);
579 tmp.
assign(NdbNodeBitmask::Size, conf->startedNodes);
// Print every node that is considered alive.
585 for (Uint32
i = 0;
i<MAX_NDB_NODES;
i++)
587 if (c_alive_nodes.
get(
i))
588 ndbout_c(
"%u c_alive_nodes.set(%u)", __LINE__,
i);
592 c_masterNodeId = conf->masterNodeId;
// Body of Suma::getNodeGroupMembers (function header not visible in this listing).
// Asks DBDIH for the members of this node's node group via CHECKNODEGROUPSREQ;
// the answer is handled in execCHECKNODEGROUPSCONF.
601 DBUG_ENTER(
"Suma::getNodeGroupMembers");
606 sd->blockRef = reference();
607 sd->requestType = CheckNodeGroups::GetNodeGroupMembers;
608 sd->nodeId = getOwnNodeId();
609 sd->senderData = RNIL;
610 sendSignal(DBDIH_REF, GSN_CHECKNODEGROUPSREQ, signal,
611 CheckNodeGroups::SignalLength, JBB);
// Helper used by fix_nodegroup to test a candidate sequence number n for r
// replicas and, when accepted, fill dst.
// Visible logic: a temporary array of r entries is built, every entry is
// compared against all earlier entries (duplicate detection), and a final loop
// runs over the r entries.
// NOTE(review): how tmp[i] is derived from n, what happens on a duplicate, and
// what the final loop does are on lines missing from this listing —
// presumably it returns false on a duplicate and copies tmp into dst; confirm.
617 valid_seq(Uint32
n, Uint32 r, Uint16 dst[])
619 Uint16 tmp[MAX_REPLICAS];
620 for (Uint32
i = 0;
i<r;
i++)
623 for (Uint32 j = 0; j<
i; j++)
624 if (tmp[j] == tmp[i])
632 for (Uint32 i = 0; i<r; i++)
// Rebuilds c_nodesInGroup from c_nodes_in_nodegroup_mask and recomputes the
// bucket table: every candidate sequence accepted by valid_seq becomes one
// bucket whose m_nodes entries are translated from positions into node ids.
639 Suma::fix_nodegroup()
// Collect the node ids that are members of the node group, in ascending order.
643 for (i = 0; i < MAX_NDB_NODES; i++)
645 if (c_nodes_in_nodegroup_mask.
get(i))
647 c_nodesInGroup[pos++] =
i;
// The number of members found is the replica count.
651 const Uint32 replicas= c_noNodesInGroup = pos;
// NOTE(review): the body of this loop (computing `buckets` and `tot`) is not
// visible in this listing.
656 for(i = 1; i <= replicas; i++)
// Enumerate candidate sequences; each valid one fills the next bucket.
676 for (i = 0; i<tot; i++)
678 Bucket* ptr= c_buckets + cnt;
679 if (valid_seq(i, replicas, ptr->m_nodes))
682 if (DBG_3R) printf(
"bucket %u : ", cnt);
683 for (Uint32 j = 0; j<replicas; j++)
// valid_seq produced indexes into c_nodesInGroup; replace them by node ids.
685 ptr->m_nodes[j] = c_nodesInGroup[ptr->m_nodes[j]];
686 if (DBG_3R) printf(
"%u ", ptr->m_nodes[j]);
688 if (DBG_3R) printf(
"\n");
// Sanity check: the number of buckets filled must match the expected count.
692 ndbrequire(cnt == buckets);
693 c_no_of_buckets= buckets;
// Handler for CHECKNODEGROUPSCONF: stores this node's node group and member
// mask, then continues the start sequence. On a (initial) node restart a
// SumaStartMe dictionary lock is requested.
704 Suma::execCHECKNODEGROUPSCONF(
Signal *signal)
707 DBUG_ENTER(
"Suma::execCHECKNODEGROUPSCONF");
// Node group id, member mask and member count come straight from the reply.
710 c_nodeGroup = sd->output;
711 c_nodes_in_nodegroup_mask.
assign(sd->mask);
712 c_noNodesInGroup = c_nodes_in_nodegroup_mask.
count();
// Trace each member of the node group.
717 for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
718 DBUG_PRINT(
"exit",(
"Suma: NodeGroup %u, me %u, "
720 c_nodeGroup, getOwnNodeId(),
721 i, c_nodesInGroup[i]));
725 c_startup.m_restart_server_node_id = 0;
// Node restart: subscriptions must be copied from another node, under DICT lock.
726 if (m_typeOfStart == NodeState::ST_NODE_RESTART ||
727 m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
731 send_dict_lock_req(signal, DictLockReq::SumaStartMe);
736 c_startup.m_restart_server_node_id = 0;
// Handler for API_START_REP: marks the API node given in theData[0] as
// connected and re-evaluates whether a pending handover can start.
743 Suma::execAPI_START_REP(
Signal* signal)
745 Uint32 nodeId = signal->theData[0];
746 c_connected_nodes.
set(nodeId);
748 check_start_handover(signal);
// Starts the handover if one is pending (c_startup.m_wait_handover).
// The visible check compares c_subscriber_nodes with its intersection with
// c_connected_nodes, i.e. whether every subscriber node is connected.
// NOTE(review): the action taken when they differ is not visible — presumably
// an early return; confirm.
752 Suma::check_start_handover(
Signal* signal)
754 if(c_startup.m_wait_handover)
// tmp = connected nodes AND subscriber nodes.
757 tmp.
assign(c_connected_nodes);
758 tmp.
bitAND(c_subscriber_nodes);
759 if(!c_subscriber_nodes.
equal(tmp))
// Handover is no longer pending; take the DICT lock for it.
764 c_startup.m_wait_handover=
false;
769 send_dict_lock_req(signal, DictLockReq::SumaHandOver);
// Sends SUMA_HANDOVER_REQ to the other alive nodes of this node group.
//   signal - signal object reused for the outgoing request
//   type   - SumaHandoverReq request type; RT_START_NODE is logged as
//            "startup", anything else as "shutdown"
780 Suma::send_handover_req(
Signal* signal, Uint32
type)
// Handover partners = alive nodes in our node group, excluding ourselves.
783 c_startup.m_handover_nodes.assign(c_alive_nodes);
784 c_startup.m_handover_nodes.bitAND(c_nodes_in_nodegroup_mask);
785 c_startup.m_handover_nodes.clear(getOwnNodeId());
// Handover GCI: high word of the last complete GCI plus 3.
786 Uint32 gci= Uint32(m_last_complete_gci >> 32) + 3;
790 c_startup.m_handover_nodes.getText(buf);
791 infoEvent(
"Suma: initiate handover for %s with nodes %s GCI: %u",
792 (type == SumaHandoverReq::RT_START_NODE ?
"startup" :
"shutdown"),
797 req->nodeId = getOwnNodeId();
798 req->requestType =
type;
// rg is the receiver group; its construction is on lines missing from this listing.
801 sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal,
802 SumaHandoverReq::SignalLength, JBB);
// Replies to NDBCNTR with STTORRY.
// theData[3..8] hold the values 1, 3, 5, 7, 100 and 101 — the same phase
// numbers execSTTOR tests for (1 aside) — followed by 255 in theData[9]
// (presumably the list terminator — confirm).
// NOTE(review): theData[1] and theData[2] are not written on the visible lines.
806 Suma::sendSTTORRY(
Signal* signal){
807 signal->theData[0] = 0;
808 signal->theData[3] = 1;
809 signal->theData[4] = 3;
810 signal->theData[5] = 5;
811 signal->theData[6] = 7;
812 signal->theData[7] = 100;
813 signal->theData[8] = 101;
814 signal->theData[9] = 255;
815 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
// Header of Suma::execNDB_STTOR; its body is not visible in this listing.
819 Suma::execNDB_STTOR(
Signal* signal)
// NOTE(review): the lines from here on dispatch on SumaContinueB codes, so
// they presumably belong to the CONTINUEB handler whose header is missing
// from this listing — confirm. theData[0] selects the continuation.
827 Uint32 type= signal->theData[0];
// RELEASE_GCI: theData[1] is passed through, theData[2]/[3] are the high/low
// words of the 64-bit GCI.
829 case SumaContinueB::RELEASE_GCI:
831 Uint32 gci_hi = signal->theData[2];
832 Uint32 gci_lo = signal->theData[3];
833 Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
834 release_gci(signal, signal->theData[1], gci);
// RESEND_BUCKET: min GCI is built from theData[2] (hi) and theData[5] (lo),
// last GCI from theData[4] (hi) and theData[6] (lo).
837 case SumaContinueB::RESEND_BUCKET:
839 Uint32 min_gci_hi = signal->theData[2];
840 Uint32 min_gci_lo = signal->theData[5];
841 Uint32 last_gci_hi = signal->theData[4];
842 Uint32 last_gci_lo = signal->theData[6];
843 Uint64 min_gci = min_gci_lo | (Uint64(min_gci_hi) << 32);
844 Uint64 last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32);
845 resend_bucket(signal,
852 case SumaContinueB::OUT_OF_BUFFER_RELEASE:
853 out_of_buffer_release(signal, signal->theData[1]);
// API failure handling is split into several continuation steps.
855 case SumaContinueB::API_FAIL_GCI_LIST:
856 api_fail_gci_list(signal, signal->theData[1]);
858 case SumaContinueB::API_FAIL_SUBSCRIBER_LIST:
862 case SumaContinueB::API_FAIL_SUBSCRIPTION:
865 case SumaContinueB::SUB_STOP_REQ:
// Delayed retry scheduled by execDICT_LOCK_REF; theData[1] is the lock state.
868 case SumaContinueB::RETRY_DICT_LOCK:
870 send_dict_lock_req(signal, signal->theData[1]);
// Handler for API_FAILREQ: an API node (theData[0]) has failed. The request
// must come from QMGR (theData[1]).
// The node is removed from the connected set. Failures already being handled,
// and nodes that are not subscribers, hit the two checks below. Otherwise the
// node is marked failed and the cleanup is started through
// CONTINUEB(API_FAIL_GCI_LIST).
// NOTE(review): the branch structure around the tail (block cleanup vs.
// immediate API_FAILCONF) is not visible in this listing.
881 void Suma::execAPI_FAILREQ(
Signal* signal)
884 DBUG_ENTER(
"Suma::execAPI_FAILREQ");
885 Uint32 failedApiNode = signal->theData[0];
886 ndbrequire(signal->theData[1] == QMGR_REF);
888 c_connected_nodes.
clear(failedApiNode);
// Already handling a failure of this node?
890 if (c_failedApiNodes.
get(failedApiNode))
// Node has no subscribers here?
897 if (!c_subscriber_nodes.
get(failedApiNode))
// Mark as failed and drop it from subscriber bookkeeping; the state array
// records the source line for diagnostics (see the 7019 dump).
906 c_failedApiNodes.
set(failedApiNode);
907 c_subscriber_nodes.
clear(failedApiNode);
908 c_subscriber_per_node[failedApiNode] = 0;
909 c_failedApiNodesState[failedApiNode] = __LINE__;
// Fewer subscriber nodes may allow a pending handover to start.
911 check_start_handover(signal);
// First cleanup step: walk the GCP list.
913 signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
914 signal->theData[1] = failedApiNode;
915 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
920 api_fail_block_cleanup(signal, failedApiNode);
// Confirm the failure handling to QMGR and clear the diagnostic state.
925 signal->theData[0] = failedApiNode;
926 signal->theData[1] = reference();
927 sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
929 c_failedApiNodesState[failedApiNode] = 0;
// Callback run when the block-level cleanup for a failed API node is done.
// Sends API_FAILCONF to QMGR and clears the node from the failed-node
// bookkeeping.
//   elementsCleaned - asserted to be 0 on the visible line.
// NOTE(review): the failedNodeId parameter is declared on a line missing from
// this listing.
935 Suma::api_fail_block_cleanup_callback(
Signal* signal,
937 Uint32 elementsCleaned)
945 ndbassert(elementsCleaned == 0);
948 signal->theData[0] = failedNodeId;
949 signal->theData[1] = reference();
950 sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
951 c_failedApiNodes.
clear(failedNodeId);
952 c_failedApiNodesState[failedNodeId] = 0;
// Starts the block-level cleanup for a failed API node, with
// api_fail_block_cleanup_callback as completion callback.
// NOTE(review): the call that actually starts the cleanup is on lines missing
// from this listing.
956 Suma::api_fail_block_cleanup(
Signal* signal, Uint32 failedNode)
// Record progress for the 7019 diagnostic dump.
960 c_failedApiNodesState[failedNode] = __LINE__;
962 Callback cb = {safe_cast(&Suma::api_fail_block_cleanup_callback),
// API-failure cleanup, step 1: process the first entry of the GCP list.
// The entry's subscriber set is intersected with the current subscriber
// nodes. When no subscribers remain, a SUB_GCP_COMPLETE_ACK is sent for the
// entry, the entry is released, and the step re-schedules itself through
// CONTINUEB(API_FAIL_GCI_LIST).
// After that, the visible tail schedules step 2 (API_FAIL_SUBSCRIBER_LIST).
// NOTE(review): the exact branch structure (returns/else) is not visible in
// this listing.
969 Suma::api_fail_gci_list(
Signal* signal, Uint32 nodeId)
974 if (c_gcp_list.
first(gcp))
// Drop subscribers that are no longer subscriber nodes (e.g. the failed one).
977 gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
979 if (gcp.p->m_subscribers.isclear())
// Nobody left to ack this GCI: send the ack ourselves.
984 ack->rep.gci_hi = Uint32(gcp.p->m_gci >> 32);
985 ack->rep.gci_lo = Uint32(gcp.p->m_gci);
986 ack->rep.senderRef = reference();
988 sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
989 SubGcpCompleteAck::SignalLength, JBB);
991 c_gcp_list.release(gcp);
// Continue with the next list entry in a new signal.
993 c_failedApiNodesState[nodeId] = __LINE__;
994 signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
995 signal->theData[1] = nodeId;
996 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
1001 if (ERROR_INSERTED(13023))
1003 CLEAR_ERROR_INSERT_VALUE;
// Prepare step 2. Words 2..5 start as RNIL: sub-op record, bucket,
// subscription id and subscription key.
1006 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
1007 signal->theData[1] = nodeId;
1008 signal->theData[2] = RNIL;
1009 signal->theData[3] = RNIL;
1010 signal->theData[4] = RNIL;
1011 signal->theData[5] = RNIL;
// If a sub-op record can be seized, pass it along (6 words); otherwise send
// only 3 words and let the next step retry the seize.
1014 if (c_subOpPool.
seize(subOpPtr))
1016 c_failedApiNodesState[nodeId] = __LINE__;
1017 signal->theData[2] = subOpPtr.i;
1018 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
1022 c_failedApiNodesState[nodeId] = __LINE__;
1023 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
// NOTE(review): function header not visible in this listing; the signal
// layout matches the API_FAIL_SUBSCRIBER_LIST continuation (theData[1] node
// id, [2] sub-op record, [3] bucket, [4] subscription id, [5] subscription key).
// API-failure cleanup, step 2: iterate over the subscriptions and hand each
// one to step 3 (API_FAIL_SUBSCRIPTION).
// Throttle: with more than 9 outstanding drop-trigger requests, re-send this
// signal to ourselves with a delay of 100.
1035 if (c_outstanding_drop_trig_req > 9)
1042 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
1043 signal->getLength());
// A sub-op record is needed; if the previous step could not seize one, try
// again here and re-send the (3-word) signal when that fails too.
1047 subOpPtr.i = signal->theData[2];
1048 if (subOpPtr.i == RNIL)
1050 if (c_subOpPool.
seize(subOpPtr))
1052 signal->theData[3] = RNIL;
1057 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1058 c_failedApiNodesState[nodeId] = __LINE__;
1065 c_subOpPool.
getPtr(subOpPtr);
// Resume position of the subscription iteration.
1068 Uint32
bucket = signal->theData[3];
1069 Uint32 subscriptionId = signal->theData[4];
1070 Uint32 subscriptionKey = signal->theData[5];
// First visit: start from the first subscription.
1076 c_subscriptions.
first(iter);
1077 c_failedApiNodesState[nodeId] = __LINE__;
// Later visits: look the remembered subscription up again; if it has
// disappeared, continue from the next entry of the remembered bucket.
1084 key.m_subscriptionId = subscriptionId;
1085 key.m_subscriptionKey = subscriptionKey;
1086 if (c_subscriptions.
find(iter.curr, key) ==
false)
1092 c_subscriptions.
next(bucket, iter);
1093 c_failedApiNodesState[nodeId] = __LINE__;
1097 iter.bucket = bucket;
// No subscription left: finish with the block-level cleanup.
1101 if (iter.curr.isNull())
1104 api_fail_block_cleanup(signal, nodeId);
1105 c_failedApiNodesState[nodeId] = __LINE__;
// Record the work item in the sub-op record. m_senderRef holds the failed
// node id and m_senderData the iterator bucket (both are read back in the
// API_FAIL_SUBSCRIPTION step).
1109 subOpPtr.p->m_opType = SubOpRecord::R_API_FAIL_REQ;
1110 subOpPtr.p->m_subPtrI = iter.curr.i;
1111 subOpPtr.p->m_senderRef = nodeId;
1112 subOpPtr.p->m_senderData = iter.bucket;
// NOTE(review): `list` is declared on a missing line; presumably the
// CONTINUEB below is sent only when it was empty — confirm.
1115 bool empty = list.isEmpty();
1121 c_failedApiNodesState[nodeId] = __LINE__;
1122 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
1123 signal->theData[1] = subOpPtr.i;
1124 signal->theData[2] = RNIL;
1125 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1130 c_failedApiNodesState[nodeId] = __LINE__;
// NOTE(review): function header not visible in this listing; the signal
// layout matches the API_FAIL_SUBSCRIPTION continuation (theData[1] sub-op
// record, theData[2] subscriber to resume from or RNIL).
// API-failure cleanup, step 3: remove the failed node's subscribers from one
// subscription, at most 32 list entries per signal, then move on to the next
// subscription.
1139 c_subOpPool.
getPtr(subOpPtr, signal->theData[1]);
// The failed node id was stashed in m_senderRef by the previous step.
1141 Uint32 nodeId = subOpPtr.p->m_senderRef;
1144 c_subscriptionPool.
getPtr(subPtr, subOpPtr.p->m_subPtrI);
// RNIL means start from the beginning of the subscriber list, otherwise resume.
1149 if (signal->theData[2] == RNIL)
1157 list.
getPtr(ptr, signal->theData[2]);
// Bounded scan to keep each signal execution short.
1160 for (Uint32 i = 0; i<32 && !ptr.isNull(); i++)
1163 if (refToNode(ptr.p->m_senderRef) == nodeId)
// Report the stop (_TE_STOP) for a subscriber of the failed node.
// NOTE(review): `report` is derived from the REPORT_SUBSCRIBE option; how it
// is used is not visible in this listing.
1174 bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
1176 send_sub_start_stop_event(signal, tmp, NdbDictionary::Event::_TE_STOP,
// More subscribers remain: continue from ptr in a new signal.
1192 c_failedApiNodesState[nodeId] = __LINE__;
1193 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
1194 signal->theData[1] = subOpPtr.i;
1195 signal->theData[2] = ptr.i;
1196 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
// Done with this subscription.
1201 check_remove_queue(signal, subPtr, subOpPtr,
true,
false);
// Advance the subscription iterator from the remembered bucket.
1206 iter.bucket = subOpPtr.p->m_senderData;
1209 if (c_subscriptions.
next(iter))
// Another subscription exists: go back to step 2 with its id/key as cursor.
1212 c_failedApiNodesState[nodeId] = __LINE__;
1213 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
1214 signal->theData[1] = nodeId;
1215 signal->theData[2] = subOpPtr.i;
1216 signal->theData[3] = iter.bucket;
1217 signal->theData[4] = iter.curr.p->m_subscriptionId;
1218 signal->theData[5] = iter.curr.p->m_subscriptionKey;
1219 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
// No more subscriptions: release the work record and finish.
1223 c_subOpPool.
release(subOpPtr);
1226 api_fail_block_cleanup(signal, nodeId);
// Handler for NODE_FAILREP: one or more data nodes have failed.
// Aborts a restart being served for a failed node, adjusts bucket ownership
// when the failure affects our node group, and updates the alive-node set.
// NOTE(review): `failed` (the failed-node mask) and the removal of the failed
// nodes from `tmp` are on lines missing from this listing.
1230 Suma::execNODE_FAILREP(
Signal* signal){
1232 DBUG_ENTER(
"Suma::execNODE_FAILREP");
1233 ndbassert(signal->getNoOfSections() == 0);
// We were serving a restart for a node that has now failed.
1238 if(c_restart.m_ref && failed.
get(refToNode(c_restart.m_ref)))
// If a local operation is outstanding, only flag the abort; otherwise abort now.
1242 if (c_restart.m_waiting_on_self)
1245 c_restart.m_abort = 1;
1251 c_subscriptionPool.
getPtr(subPtr, c_restart.m_subPtrI);
1252 abort_start_me(signal, subPtr,
false);
// Test hook: inject an API_FAILREQ for the first subscriber node.
1256 if (ERROR_INSERTED(13032))
1258 Uint32 node = c_subscriber_nodes.
find(0);
1259 if (node != NodeBitmask::NotFound)
1261 ndbout_c(
"Inserting API_FAILREQ node: %u", node);
1262 signal->theData[0] = node;
1263 sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA);
// tmp starts as the current alive set.
1268 tmp.
assign(c_alive_nodes);
// Only failures inside our own node group affect bucket ownership.
1273 if(c_nodes_in_nodegroup_mask.
overlaps(failed))
1275 for( Uint32 i = 0; i < c_no_of_buckets; i++)
// Buckets we already serve (branch body not visible here).
1277 if(m_active_buckets.
get(i))
// Buckets in the middle of a switchover.
1279 else if(m_switchover_buckets.
get(i))
1281 Uint32 state= c_buckets[
i].m_state;
// We were handing the bucket over to a node that failed: keep it.
1282 if((state & Bucket::BUCKET_HANDOVER) &&
1283 failed.
get(get_responsible_node(i)))
1285 m_active_buckets.
set(i);
1286 m_switchover_buckets.
clear(i);
1287 ndbout_c(
"aborting handover");
// A failure while this node is taking over is fatal.
1289 else if(state & Bucket::BUCKET_STARTING)
1291 progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR,
1292 "Nodefailure during SUMA takeover");
// A shutdown takeover was in progress: cancel it and resend the bucket ourselves.
1294 else if (state & Bucket::BUCKET_SHUTDOWN_TO)
1297 c_buckets[
i].m_state &= ~Uint32(Bucket::BUCKET_SHUTDOWN_TO);
1298 m_switchover_buckets.
clear(i);
1299 ndbrequire(get_responsible_node(i, tmp) == getOwnNodeId());
1300 start_resend(signal, i);
// Idle bucket for which we become responsible given the surviving nodes.
1303 else if(get_responsible_node(i, tmp) == getOwnNodeId())
1305 start_resend(signal, i);
// Per-node processing of the failed nodes (loop body mostly not visible).
1311 for(
unsigned i = 1; i < MAX_NDB_NODES; i++) {
1316 ndbassert(elementsCleaned == 0);
1317 (void) elementsCleaned;
// Commit the updated alive set.
1321 c_alive_nodes.
assign(tmp);
// NOTE(review): function header not visible in this listing; the body answers
// with INCL_NODECONF, so this is presumably the INCL_NODEREQ handler — confirm.
// Includes a node (theData[1]) into the alive set on request of theData[0]
// and confirms to the requester.
1330 const Uint32 senderRef = signal->theData[0];
1331 const Uint32 nodeId = signal->theData[1];
// The node must not already be considered alive.
1333 ndbrequire(!c_alive_nodes.
get(nodeId));
// Nodes of our own node group take a separate path (details not visible);
// both paths end up marking the node alive.
1334 if (c_nodes_in_nodegroup_mask.
get(nodeId))
1342 c_alive_nodes.
set(nodeId);
1353 c_alive_nodes.
set(nodeId);
1356 signal->theData[0] = nodeId;
1357 signal->theData[1] = reference();
1358 sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
// Handler for SIGNAL_DROPPED_REP. The body is not visible in this listing.
1362 Suma::execSIGNAL_DROPPED_REP(
Signal* signal){
// cstr() overloads: turn SUMA enum values into text for the diagnostic dumps
// in execDUMP_STATE_ORD. The returned strings are on lines missing from this
// listing.

// Subscription state to text.
1374 cstr(Suma::Subscription::State s)
1377 case Suma::Subscription::UNDEFINED:
1379 case Suma::Subscription::DEFINED:
1381 case Suma::Subscription::DEFINING:
// Subscription trigger state to text.
1389 cstr(Suma::Subscription::TriggerState s)
1392 case Suma::Subscription::T_UNDEFINED:
1394 case Suma::Subscription::T_CREATING:
1396 case Suma::Subscription::T_DEFINED:
1398 case Suma::Subscription::T_DROPPING:
1400 case Suma::Subscription::T_ERROR:
// Subscription option bitmask to text: one word is appended per option bit set.
// The text is built in a function-local static buffer, so the result is
// overwritten by the next call.
// NOTE(review): the buffer reset before the first strcat is not visible here — confirm.
1408 cstr(Suma::Subscription::Options s)
1410 static char buf[256];
1413 if (s & Suma::Subscription::REPORT_ALL)
1414 strcat(buf,
" reportall");
1415 if (s & Suma::Subscription::REPORT_SUBSCRIBE)
1416 strcat(buf,
" reportsubscribe");
1417 if (s & Suma::Subscription::MARKED_DROPPED)
1418 strcat(buf,
" dropped");
1419 if (s & Suma::Subscription::NO_REPORT_DDL)
1420 strcat(buf,
" noreportddl");
// Table state to text.
1427 cstr(Suma::Table::State s)
1430 case Suma::Table::UNDEFINED:
1432 case Suma::Table::DEFINING:
1434 case Suma::Table::DEFINED:
1436 case Suma::Table::DROPPED:
// Handler for DUMP_STATE_ORD: diagnostic dumps and test hooks selected by
// theData[0] (tCase).
// Visible here: a debug helper for cases 8000-8003, pool usage reports,
// bucket state, error-insert toggles, node set dumps, two paged listings
// (per table and per subscription) and the 7019 per-API-node report.
// NOTE(review): most of the `if (tCase == ...)` tests are on lines missing
// from this listing, so the case numbers of those sections are not stated below.
1443 Suma::execDUMP_STATE_ORD(
Signal* signal){
1446 Uint32 tCase = signal->theData[0];
// 8000-8003: operate on the subscription indexed by the global g_subPtrI and
// its sync record (start meta, start trigger, set up a single table scan).
1448 if(tCase >= 8000 && tCase <= 8003){
1449 SubscriptionPtr subPtr;
1450 c_subscriptions.
getPtr(subPtr, g_subPtrI);
1453 c_syncPool.
getPtr(syncPtr, subPtr.p->m_syncPtrI);
1456 syncPtr.p->startMeta(signal);
1464 syncPtr.p->startTrigger(signal);
1468 subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
1471 Uint32 att[] = { 0, 1, 1 };
1472 syncPtr.p->m_tableList.append(&tab, 1);
1473 attrs.append(att, 3);
// Pool usage report: size and free count of each pool.
1478 infoEvent(
"Suma: c_subscriberPool size: %d free: %d",
1482 infoEvent(
"Suma: c_tablePool size: %d free: %d",
1483 c_tablePool.getSize(),
1484 c_tablePool.getNoOfFree());
1486 infoEvent(
"Suma: c_subscriptionPool size: %d free: %d",
1487 c_subscriptionPool.getSize(),
1488 c_subscriptionPool.getNoOfFree());
1490 infoEvent(
"Suma: c_syncPool size: %d free: %d",
1491 c_syncPool.getSize(),
1492 c_syncPool.getNoOfFree());
1494 infoEvent(
"Suma: c_dataBufferPool size: %d free: %d",
1496 c_dataBufferPool.getNoOfFree());
1498 infoEvent(
"Suma: c_subOpPool size: %d free: %d",
1499 c_subOpPool.getSize(),
1500 c_subOpPool.getNoOfFree());
// Subscriber counts.
1503 infoEvent(
"Suma: c_dataSubscribers count: %d",
1504 count_subscribers(c_dataSubscribers));
1505 infoEvent(
"Suma: c_prepDataSubscribers count: %d",
1506 count_subscribers(c_prepDataSubscribers));
// Bucket report: active/switchover flags, GCIs and buffer head/tail per bucket.
1512 for(Uint32 i = 0; i<c_no_of_buckets; i++)
1515 infoEvent(
"Bucket %d %d%d-%x switch gci: %llu max_acked_gci: %llu max_gci: %llu tail: %d head: %d",
1517 m_active_buckets.
get(i),
1518 m_switchover_buckets.
get(i),
1520 ptr->m_switchover_gci,
1521 ptr->m_max_acked_gci,
1522 ptr->m_buffer_head.m_max_gci,
1524 ptr->m_buffer_head.m_page_id);
// Error-insert controls: 13029 is the hook execSTTOR checks in start phase 5.
1530 SET_ERROR_INSERT_VALUE(13029);
1535 c_startup.m_restart_server_node_id = MAX_NDB_NODES + 1;
1536 SET_ERROR_INSERT_VALUE(13029);
1541 CLEAR_ERROR_INSERT_VALUE;
// Dump the subscriber and connected node sets as text.
1546 char buf1[255], buf2[255];
1547 c_subscriber_nodes.
getText(buf1);
1548 c_connected_nodes.
getText(buf2);
1549 infoEvent(
"c_subscriber_nodes: %s", buf1);
1550 infoEvent(
"c_connected_nodes: %s", buf2);
// Toggle for error insert 13030: when it is set, clear it and send STTORRY;
// the other visible line sets it.
1555 if (ERROR_INSERTED(13030))
1557 CLEAR_ERROR_INSERT_VALUE;
1558 sendSTTORRY(signal);
1562 SET_ERROR_INSERT_VALUE(13030);
// Paged listing of subscribers per table. theData[1] is the hash bucket to
// resume from; a signal of length 1 marks the start of the dump.
1570 Uint32
bucket = signal->theData[1];
1572 if (signal->getLength() == 1)
1576 infoEvent(
"-- Starting dump of subscribers --");
// Process at least RT_BREAK entries, and always finish the current bucket.
1580 const Uint32 RT_BREAK = 16;
1581 for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
1584 if(it.curr.i == RNIL)
1587 infoEvent(
"-- Ending dump of subscribers --");
1592 it.curr.p->m_tableId,
1593 it.curr.p->m_schemaVersion);
// For every subscription of the table, list its subscribers and its pending
// create/start/stop requests; each printed entry also advances i.
1598 it.curr.p->m_subscriptions);
1599 for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
1605 subPtr.p->m_subscribers);
1606 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1612 ptr.p->m_senderData,
1620 subPtr.p->m_create_req);
1622 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1627 ptr.p->m_senderData);
1634 subPtr.p->m_start_req);
1636 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1641 ptr.p->m_senderData);
1648 subPtr.p->m_stop_req);
1650 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1656 ptr.p->m_senderData);
1660 infoEvent(
"Table %u #subscribers %u", it.curr.p->m_tableId, cnt);
// Not finished: continue from the current bucket after a delay of 100.
1664 signal->theData[0] = tCase;
1665 signal->theData[1] = it.bucket;
1666 sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
// Paged listing per subscription; same paging scheme as above.
1673 Uint32 bucket = signal->theData[1];
1675 if (signal->getLength() == 1)
1679 infoEvent(
"-- Starting dump of subscribers --");
1682 c_subscriptions.
next(bucket, it);
1683 const Uint32 RT_BREAK = 16;
1684 for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
1687 if(it.curr.i == RNIL)
1690 infoEvent(
"-- Ending dump of subscribers --");
// Print the subscription, its trigger state/options and its table.
1696 c_tablePool.
getPtr(tabPtr, subPtr.p->m_table_ptrI);
1697 infoEvent(
"Subcription %u id: 0x%.8x key: 0x%.8x state: %s",
1699 subPtr.p->m_subscriptionId,
1700 subPtr.p->m_subscriptionKey,
1701 cstr(subPtr.p->m_state));
1702 infoEvent(
" trigger state: %s options: %s",
1703 cstr(subPtr.p->m_trigger_state),
1704 cstr((Suma::Subscription::Options)subPtr.p->m_options));
1705 infoEvent(
" tablePtr: %u tableId: %u schemaVersion: 0x%.8x state: %s",
1708 tabPtr.p->m_schemaVersion,
1709 cstr(tabPtr.p->m_state));
// Subscribers and pending create/start/stop requests of the subscription.
1713 subPtr.p->m_subscribers);
1714 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1719 ptr.p->m_senderData,
1727 subPtr.p->m_create_req);
1729 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1734 ptr.p->m_senderData);
1741 subPtr.p->m_start_req);
1743 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1748 ptr.p->m_senderData);
1755 subPtr.p->m_stop_req);
1757 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1763 ptr.p->m_senderData);
1766 c_subscriptions.
next(it);
// Not finished: continue from the current bucket after a delay of 100.
1769 signal->theData[0] = tCase;
1770 signal->theData[1] = it.bucket;
1771 sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
// 7019 <nodeId>: report the API-failure handling state kept for one node.
1775 if (tCase == 7019 && signal->getLength() == 2)
1778 Uint32 nodeId = signal->theData[1];
// Guard the per-node array lookups.
1779 if (nodeId < MAX_NODES)
1782 c_failedApiNodesState[nodeId]);
1784 c_connected_nodes.
get(nodeId));
1786 c_failedApiNodes.
get(nodeId));
1788 c_subscriber_nodes.
get(nodeId));
1790 nodeId, c_subscriber_per_node[nodeId]);
1794 warningEvent(
" SUMP: dump-7019 to unknown node: %u", nodeId);
// Handler for DBINFO_SCANREQ: serves ndbinfo scans. For the POOLS table it
// emits one row per pool described in the local `pools` table, resuming from
// the pool index stored in the scan cursor.
// NOTE(review): the pool names and the start of several table entries are on
// lines missing from this listing.
1799 void Suma::execDBINFO_SCANREQ(
Signal *signal)
1808 switch(req.tableId){
1809 case Ndbinfo::POOLS_TABLEID:
// Each entry: used, size, entry size, high-water mark, and up to four config
// parameter ids that influence the pool size (0 = unused slot).
1818 { CFG_DB_SUBSCRIBERS,
1819 CFG_DB_SUBSCRIPTIONS,
1820 CFG_DB_NO_TABLES,0 }},
1822 c_tablePool.getUsed(),
1823 c_tablePool.getSize(),
1824 c_tablePool.getEntrySize(),
1825 c_tablePool.getUsedHi(),
1826 { CFG_DB_NO_TABLES,0,0,0 }},
1828 c_subscriptionPool.getUsed(),
1829 c_subscriptionPool.getSize(),
1830 c_subscriptionPool.getEntrySize(),
1831 c_subscriptionPool.getUsedHi(),
1832 { CFG_DB_SUBSCRIPTIONS,
1833 CFG_DB_NO_TABLES,0,0 }},
1835 c_syncPool.getUsed(),
1836 c_syncPool.getSize(),
1837 c_syncPool.getEntrySize(),
1838 c_syncPool.getUsedHi(),
1841 c_dataBufferPool.getUsed(),
1843 c_dataBufferPool.getEntrySize(),
1844 c_dataBufferPool.getUsedHi(),
1845 { CFG_DB_NO_ATTRIBUTES,0,0,0 }},
1847 c_subOpPool.getUsed(),
1848 c_subOpPool.getSize(),
1849 c_subOpPool.getEntrySize(),
1850 c_subOpPool.getUsedHi(),
1851 { CFG_DB_SUB_OPERATIONS,0,0,0 }},
1853 c_page_chunk_pool.getUsed(),
1854 c_page_chunk_pool.getSize(),
1855 c_page_chunk_pool.getEntrySize(),
1856 c_page_chunk_pool.getUsedHi(),
1859 c_gcp_pool.getUsed(),
1860 c_gcp_pool.getSize(),
1861 c_gcp_pool.getEntrySize(),
1862 c_gcp_pool.getUsedHi(),
1863 { CFG_DB_API_HEARTBEAT_INTERVAL,
1864 CFG_DB_GCP_INTERVAL,0,0 }},
// Terminator entry: a NULL pool name ends the table.
1865 { NULL, 0,0,0,0, { 0,0,0,0 }}
// Number of config parameter slots per entry, derived from the array type.
1868 const size_t num_config_params =
1869 sizeof(pools[0].config_params) /
sizeof(pools[0].config_params[0]);
// Resume from the pool index stored in the cursor.
1870 Uint32 pool =
cursor->data[0];
1871 BlockNumber bn = blockToMain(number());
1872 while(pools[pool].poolname)
// Row layout: node id, block number, instance, pool name, used, total,
// high-water mark, entry size, then the config parameter ids.
1876 row.write_uint32(getOwnNodeId());
1877 row.write_uint32(bn);
1878 row.write_uint32(instance());
1879 row.write_string(pools[pool].poolname);
1880 row.write_uint64(pools[pool].used);
1881 row.write_uint64(pools[pool].total);
1882 row.write_uint64(pools[pool].used_hi);
1883 row.write_uint64(pools[pool].entry_size);
1884 for (
size_t i = 0; i < num_config_params; i++)
1885 row.write_uint32(pools[pool].config_params[i]);
1886 ndbinfo_send_row(signal, req, row, rl);
// Rate limit reached: break the scan and report the pool index to resume from.
1888 if (rl.need_break(req))
1891 ndbinfo_send_scan_break(signal, req, rl, pool);
// All rows sent.
1901 ndbinfo_send_scan_conf(signal, req, rl);
// Handler for CREATE_SUBID_REQ: allocates a new subscription id.
// The requester's reference and data are stored in a subscriber record, then
// the next value of SUMA_SEQUENCE is requested from DBUTIL; the reply is
// handled in execUTIL_SEQUENCE_CONF / execUTIL_SEQUENCE_REF.
1911 Suma::execCREATE_SUBID_REQ(
Signal* signal)
1914 DBUG_ENTER(
"Suma::execCREATE_SUBID_REQ");
1915 ndbassert(signal->getNoOfSections() == 0);
1916 CRASH_INSERTION(13001);
1920 SubscriberPtr subbPtr;
// Refused with error 1412.
// NOTE(review): the condition is on a missing line — presumably a failed
// seize of the subscriber record; confirm.
1923 sendSubIdRef(signal, req->senderRef, req->senderData, 1412);
1926 DBUG_PRINT(
"info",(
"c_subscriberPool size: %d free: %d",
// Remember the requester for the reply.
1930 subbPtr.p->m_senderRef = req->senderRef;
1931 subbPtr.p->m_senderData = req->senderData;
// Ask DBUTIL for the next sequence value; senderData identifies the record.
1934 utilReq->senderData = subbPtr.i;
1935 utilReq->sequenceId = SUMA_SEQUENCE;
1936 utilReq->requestType = UtilSequenceReq::NextVal;
1937 sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
1938 signal, UtilSequenceReq::SignalLength, JBB);
// Handler for UTIL_SEQUENCE_CONF.
// A reply to a Create request is forwarded to createSequenceReply. Otherwise
// the sequence value becomes a new subscription id, which is returned to the
// original requester in CREATE_SUBID_CONF.
1944 Suma::execUTIL_SEQUENCE_CONF(
Signal* signal)
1947 DBUG_ENTER(
"Suma::execUTIL_SEQUENCE_CONF");
1948 ndbassert(signal->getNoOfSections() == 0);
1949 CRASH_INSERTION(13002);
1952 if(conf->requestType == UtilSequenceReq::Create) {
1954 createSequenceReply(signal, conf, NULL);
// Copy the 8-byte sequence value out of the signal.
1959 memcpy(&subId,conf->sequenceValue,8);
1960 SubscriberPtr subbPtr;
1964 subconf->senderRef = reference();
1965 subconf->senderData = subbPtr.p->m_senderData;
// Id = low 32 bits of the sequence value; key = own node id in the upper
// 16 bits combined with the low 16 bits of the sequence value.
1966 subconf->subscriptionId = (Uint32)subId;
1967 subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);
1969 sendSignal(subbPtr.p->m_senderRef, GSN_CREATE_SUBID_CONF, signal,
1970 CreateSubscriptionIdConf::SignalLength, JBB);
1973 DBUG_PRINT(
"info",(
"c_subscriberPool size: %d free: %d",
// Handler for UTIL_SEQUENCE_REF.
// A refusal of a Create request is forwarded to createSequenceReply.
// Otherwise the error is passed on to the original requester through
// sendSubIdRef; for TCError the more specific TCErrorCode is reported.
1980 Suma::execUTIL_SEQUENCE_REF(
Signal* signal)
1983 DBUG_ENTER(
"Suma::execUTIL_SEQUENCE_REF");
1984 ndbassert(signal->getNoOfSections() == 0);
1986 Uint32 err= ref->errorCode;
1988 if(ref->requestType == UtilSequenceReq::Create) {
1990 createSequenceReply(signal, NULL, ref);
// senderData identifies the subscriber record created in execCREATE_SUBID_REQ.
1994 Uint32 subData = ref->senderData;
1996 SubscriberPtr subbPtr;
1998 if (err == UtilSequenceRef::TCError)
2001 err = ref->TCErrorCode;
2003 sendSubIdRef(signal, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, err);
2005 DBUG_PRINT(
"info",(
"c_subscriberPool size: %d free: %d",
// Sends CREATE_SUBID_REF to a requester.
//   signal     - signal object reused for the outgoing REF
//   senderRef  - block reference of the requester (destination of the REF)
//   senderData - requester's data word, echoed back
//   errCode    - error code to report
2013 Suma::sendSubIdRef(
Signal* signal,
2014 Uint32 senderRef, Uint32 senderData, Uint32 errCode)
2017 DBUG_ENTER(
"Suma::sendSubIdRef");
2021 ref->senderRef = reference();
2022 ref->senderData = senderData;
2023 ref->errorCode = errCode;
2024 sendSignal(senderRef,
2025 GSN_CREATE_SUBID_REF,
2027 CreateSubscriptionIdRef::SignalLength,
2042 DBUG_ENTER(
"Suma::execSUB_CREATE_REQ");
2043 ndbassert(signal->getNoOfSections() == 0);
2044 CRASH_INSERTION(13003);
2048 const Uint32 senderRef = req.senderRef;
2049 const Uint32 senderData = req.senderData;
2050 const Uint32 subId = req.subscriptionId;
2051 const Uint32 subKey = req.subscriptionKey;
2052 const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags;
2053 const Uint32
flags = req.subscriptionType & SubCreateReq::GetFlags;
2054 const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ?
2055 Subscription::REPORT_ALL : 0;
2056 const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
2057 Subscription::REPORT_SUBSCRIBE : 0;
2058 const Uint32 noReportDDL = (flags & SubCreateReq::NoReportDDL) ?
2059 Subscription::NO_REPORT_DDL : 0;
2060 const Uint32 tableId = req.tableId;
2061 const Uint32 schemaTransId = req.schemaTransId;
2063 bool subDropped = req.subscriptionType & SubCreateReq::NR_Sub_Dropped;
2070 ndbrequire(refToNode(senderRef) == c_startup.m_restart_server_node_id);
2074 key.m_subscriptionId = subId;
2075 key.m_subscriptionKey = subKey;
2077 DBUG_PRINT(
"enter",(
"key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
2078 key.m_subscriptionId, key.m_subscriptionKey));
2082 bool found = c_subscriptions.
find(subPtr, key);
2084 if (c_startup.m_restart_server_node_id == RNIL)
2091 sendSubCreateRef(signal, senderRef, senderData,
2092 SubCreateRef::NotStarted);
2096 CRASH_INSERTION2(13040, c_startup.m_restart_server_node_id != RNIL);
2097 CRASH_INSERTION(13041);
2099 bool allowDup =
true;
2101 if (found && !allowDup)
2104 sendSubCreateRef(signal, senderRef, senderData,
2105 SubCreateRef::SubscriptionAlreadyExist);
2112 if(!c_subscriptions.
seize(subPtr))
2115 sendSubCreateRef(signal, senderRef, senderData,
2116 SubCreateRef::OutOfSubscriptionRecords);
2121 subPtr.p->m_seq_no = c_current_seq;
2122 subPtr.p->m_subscriptionId = subId;
2123 subPtr.p->m_subscriptionKey = subKey;
2124 subPtr.p->m_subscriptionType =
type;
2125 subPtr.p->m_tableId = tableId;
2126 subPtr.p->m_table_ptrI = RNIL;
2127 subPtr.p->m_state = Subscription::UNDEFINED;
2128 subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
2129 subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID;
2130 subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID;
2131 subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID;
2132 subPtr.p->m_errorCode = 0;
2133 subPtr.p->m_options = reportSubscribe | reportAll | noReportDDL;
2134 subPtr.p->m_schemaTransId = schemaTransId;
2139 if ((ERROR_INSERTED(13044) && found ==
false) ||
2140 subOpList.seize(subOpPtr) ==
false)
2146 if (ERROR_INSERTED(13044))
2148 CLEAR_ERROR_INSERT_VALUE;
2150 c_subscriptionPool.
release(subPtr);
2152 sendSubCreateRef(signal, senderRef, senderData,
2153 SubCreateRef::OutOfTableRecords);
2157 subOpPtr.p->m_senderRef = senderRef;
2158 subOpPtr.p->m_senderData = senderData;
2163 subPtr.p->m_options |= Subscription::MARKED_DROPPED;
2170 c_tablePool.
getPtr(tabPtr, subPtr.p->m_table_ptrI);
2179 if (ERROR_INSERTED(13045) || c_tablePool.
seize(tabPtr) ==
false)
2182 if (ERROR_INSERTED(13045))
2184 CLEAR_ERROR_INSERT_VALUE;
2187 subOpList.release(subOpPtr);
2188 c_subscriptionPool.
release(subPtr);
2189 sendSubCreateRef(signal, senderRef, senderData,
2190 SubCreateRef::OutOfTableRecords);
2194 new (tabPtr.p)
Table;
2195 tabPtr.p->m_tableId= tableId;
2196 tabPtr.p->m_ptrI= tabPtr.i;
2197 tabPtr.p->m_error = 0;
2198 tabPtr.p->m_schemaVersion = RNIL;
2199 tabPtr.p->m_state = Table::UNDEFINED;
2200 tabPtr.p->m_schemaTransId = schemaTransId;
2207 c_subscriptions.
add(subPtr);
2209 tabPtr.p->m_subscriptions);
2211 subPtr.p->m_table_ptrI = tabPtr.i;
2214 switch(tabPtr.p->m_state){
2215 case Table::DEFINED:{
2218 subOpList.release(subOpPtr);
2219 subPtr.p->m_state = Subscription::DEFINED;
2221 conf->senderRef = reference();
2222 conf->senderData = senderData;
2223 sendSignal(senderRef, GSN_SUB_CREATE_CONF, signal,
2224 SubCreateConf::SignalLength, JBB);
2227 case Table::UNDEFINED:{
2229 tabPtr.p->m_state = Table::DEFINING;
2230 subPtr.p->m_state = Subscription::DEFINING;
2232 if (ERROR_INSERTED(13031))
2235 CLEAR_ERROR_INSERT_VALUE;
2237 ref->tableId = tableId;
2238 ref->senderData = tabPtr.i;
2239 ref->errorCode = GetTabInfoRef::TableNotDefined;
2240 sendSignal(reference(), GSN_GET_TABINFOREF, signal,
2241 GetTabInfoRef::SignalLength, JBB);
2246 req->senderRef = reference();
2247 req->senderData = tabPtr.i;
2249 GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
2250 req->tableId = tableId;
2251 req->schemaTransId = schemaTransId;
2253 sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
2254 GetTabInfoReq::SignalLength, JBB);
2257 case Table::DEFINING:
2263 subPtr.p->m_state = Subscription::DEFINING;
2266 case Table::DROPPED:
2268 subOpList.release(subOpPtr);
2272 tabPtr.p->m_subscriptions);
2275 c_subscriptions.
release(subPtr);
2277 sendSubCreateRef(signal, senderRef, senderData,
2278 SubCreateRef::TableDropped);
/**
 * Sends SUB_CREATE_REF to retRef.
 *
 * The visible fields filled in are errorCode (from errCode) and senderData
 * (from data). The signal is sent with SubCreateRef::SignalLength on JBB.
 */
2287 Suma::sendSubCreateRef(
Signal* signal, Uint32 retRef, Uint32 data,
2292 ref->errorCode = errCode;
2293 ref->senderData = data;
2294 sendSignal(retRef, GSN_SUB_CREATE_REF, signal,
2295 SubCreateRef::SignalLength, JBB);
2310 CRASH_INSERTION(13004);
2316 key.m_subscriptionId = req->subscriptionId;
2317 key.m_subscriptionKey = req->subscriptionKey;
2320 if(!c_subscriptions.
find(subPtr, key))
2323 releaseSections(handle);
2324 sendSubSyncRef(signal, 1407);
2330 if(!list.
seize(syncPtr))
2333 releaseSections(handle);
2334 sendSubSyncRef(signal, 1416);
2339 syncPtr.p->m_senderRef = req->senderRef;
2340 syncPtr.p->m_senderData = req->senderData;
2341 syncPtr.p->m_subscriptionPtrI = subPtr.i;
2342 syncPtr.p->ptrI = syncPtr.i;
2343 syncPtr.p->m_error = 0;
2344 syncPtr.p->m_requestInfo = req->requestInfo;
2345 syncPtr.p->m_frag_cnt = req->fragCount;
2346 syncPtr.p->m_frag_id = req->fragId;
2347 syncPtr.p->m_tableId = subPtr.p->m_tableId;
2351 if(handle.m_cnt > 0)
2354 handle.getSection(ptr, SubSyncReq::ATTRIBUTE_LIST);
2356 append(attrBuf, ptr, getSectionSegmentPool());
2358 if (req->requestInfo & SubSyncReq::RangeScan)
2361 ndbrequire(handle.m_cnt > 1)
2363 handle.getSection(ptr, SubSyncReq::TUX_BOUND_INFO);
2365 append(boundBuf, ptr, getSectionSegmentPool());
2367 releaseSections(handle);
2376 req->senderRef = reference();
2377 req->senderData = syncPtr.i;
2378 req->tableId = subPtr.p->m_tableId;
2379 req->schemaTransId = subPtr.p->m_schemaTransId;
2380 sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
2381 DihScanTabReq::SignalLength, JBB);
/**
 * Sends a sync refusal carrying errCode back to the block that sent the
 * signal currently being processed (signal->getSendersBlockRef()).
 *
 * NOTE(review): the signal number argument is not visible in this listing;
 * the length used is SubSyncRef::SignalLength.
 */
2386 Suma::sendSubSyncRef(
Signal* signal, Uint32 errCode){
2389 ref->errorCode = errCode;
2390 sendSignal(signal->getSendersBlockRef(),
2393 SubSyncRef::SignalLength,
// Handling of a refused DIH_SCAN_TAB_REQ (DihScanTabRef).
// NOTE(review): the function header is not visible in this listing.
2402 DBUG_ENTER(
"Suma::execDI_FCOUNTREF");
2404 switch ((DihScanTabRef::ErrorCode) ref->error)
2406 case DihScanTabRef::ErroneousTableState:
// The table is still being created in DIH: retry the same request later.
2408 if (ref->tableStatus == Dbdih::TabRecord::TS_CREATING)
2410 const Uint32 tableId = ref->tableId;
2411 const Uint32 synPtrI = ref->senderData;
2412 const Uint32 schemaTransId = ref->schemaTransId;
// Rebuild the request from the values echoed in the REF.
2415 req->senderData = synPtrI;
2416 req->senderRef = reference();
2417 req->tableId = tableId;
2418 req->schemaTransId = schemaTransId;
// sendSignalWithDelay takes the delay first and the signal length last,
// as in the delayed GET_TABINFOREQ resend elsewhere in this file
// (30, GetTabInfoReq::SignalLength). The two arguments were swapped here.
2419 sendSignalWithDelay(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
2420 DihScanTabReq::RetryInterval,
2421 DihScanTabReq::SignalLength);
/**
 * Signal handler for DIH_SCAN_TAB_CONF.
 *
 * Looks up the sync record named by conf->senderData, adopts the fragment
 * count reported by DIH when the sync record has none of its own, and sends
 * DIH_SCAN_GET_NODES_REQ to DIH with the scan cookie.
 */
2433 Suma::execDIH_SCAN_TAB_CONF(
Signal* signal)
// Trace label matches the handler name (it previously used a stale name).
2436 DBUG_ENTER(
"Suma::execDIH_SCAN_TAB_CONF");
2437 ndbassert(signal->getNoOfSections() == 0);
2439 const Uint32 tableId = conf->tableId;
2440 const Uint32 fragCount = conf->fragmentCount;
2441 const Uint32 scanCookie = conf->scanCookie;
2444 c_syncPool.
getPtr(ptr, conf->senderData);
// The fragment buffer must still be empty at this point.
2447 ndbrequire(fragBuf.getSize() == 0);
// DIH must not report fewer fragments than the sync record expects.
2449 ndbassert(fragCount >= ptr.p->m_frag_cnt);
// A fragment count of 0 in the sync record means: use the count from DIH.
2450 if (ptr.p->m_frag_cnt == 0)
2453 ptr.p->m_frag_cnt = fragCount;
2458 req->senderRef = reference();
2459 req->senderData = ptr.i;
2460 req->tableId = tableId;
2462 req->scanCookie = scanCookie;
2463 sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
2464 DihScanGetNodesReq::SignalLength, JBB);
2473 DBUG_ENTER(
"Suma::execDIGETPRIMCONF");
2474 ndbassert(signal->getNoOfSections() == 0);
2477 const Uint32 nodeCount = conf->count;
2478 const Uint32 tableId = conf->tableId;
2479 const Uint32 fragNo = conf->fragId;
2481 ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
2484 c_syncPool.
getPtr(ptr, conf->senderData);
2493 fd.m_fragDesc.m_nodeId = conf->nodes[0];
2494 fd.m_fragDesc.m_fragmentNo = fragNo;
2495 fd.m_fragDesc.m_lqhInstanceKey = conf->instanceKey;
2496 if (ptr.p->m_frag_id == ZNIL)
2498 signal->theData[2] = fd.m_dummy;
2499 fragBuf.
append(&signal->theData[2], 1);
2501 else if (ptr.p->m_frag_id == fragNo)
2506 const Uint32 ownNodeId = getOwnNodeId();
2508 for (i = 0; i < nodeCount; i++)
2509 if (conf->nodes[i] == ownNodeId)
2513 sendSubSyncRef(signal, 1428);
2516 fd.m_fragDesc.m_nodeId = ownNodeId;
2517 signal->theData[2] = fd.m_dummy;
2518 fragBuf.
append(&signal->theData[2], 1);
2522 const Uint32 nextFrag = fragNo + 1;
2523 if(nextFrag == ptr.p->m_frag_cnt)
2532 req->senderRef = reference();
2533 req->senderData = ptr.i;
2534 req->tableId = tableId;
2535 req->fragId = nextFrag;
2537 sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
2538 DihScanGetNodesReq::SignalLength, JBB);
2555 Uint32 tableId = ref->tableId;
2556 Uint32 senderData = ref->senderData;
2557 Uint32 schemaTransId = ref->schemaTransId;
2558 GetTabInfoRef::ErrorCode errorCode =
2559 (GetTabInfoRef::ErrorCode) ref->errorCode;
2560 int do_resend_request = 0;
2562 c_tablePool.
getPtr(tabPtr, senderData);
2565 case GetTabInfoRef::TableNotDefined:
2568 case GetTabInfoRef::InvalidTableId:
2571 case GetTabInfoRef::Busy:
2572 do_resend_request = 1;
2574 case GetTabInfoRef::NoFetchByName:
2576 case GetTabInfoRef::TableNameTooLong:
2580 if (tabPtr.p->m_state == Table::DROPPED)
2583 do_resend_request = 0;
2586 if (do_resend_request)
2589 req->senderRef = reference();
2590 req->senderData = senderData;
2592 GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
2593 req->tableId = tableId;
2594 req->schemaTransId = schemaTransId;
2595 sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
2596 30, GetTabInfoReq::SignalLength);
2599 get_tabinfo_ref_release(signal, tabPtr);
2606 tabPtr.p->m_subscriptions);
2608 bool empty = subList.isEmpty();
2609 for(subList.first(subPtr); !subPtr.isNull();)
2613 ndbassert(subPtr.p->m_start_req.isEmpty());
2614 ndbassert(subPtr.p->m_stop_req.isEmpty());
2616 for (list.first(ptr); !ptr.isNull(); )
2619 sendSubCreateRef(signal,
2621 ptr.p->m_senderData,
2622 SubCreateRef::TableDropped);
2629 subList.next(subPtr);
2630 c_subscriptions.
remove(tmp1);
2631 subList.release(tmp1);
/**
 * Signal handler for GET_TABINFO_CONF.
 *
 * Parses the DICT_TAB_INFO section into the table record named by
 * conf->senderData. If the table has meanwhile been marked DROPPED the
 * record is handed to get_tabinfo_ref_release(); otherwise the table and
 * its subscriptions become DEFINED and SUB_CREATE_CONF is sent for the
 * queued create operations.
 */
2639 Suma::execGET_TABINFO_CONF(
Signal* signal){
2642 CRASH_INSERTION(13006);
2651 c_tablePool.
getPtr(tabPtr, conf->senderData);
// The table description arrives as a section; parsing must succeed.
2653 handle.getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
2654 ndbrequire(tabPtr.p->parseTable(ptr, *
this));
2655 releaseSections(handle);
// Table was dropped while its definition was being fetched.
2657 if (tabPtr.p->m_state == Table::DROPPED)
2660 get_tabinfo_ref_release(signal, tabPtr);
2664 tabPtr.p->m_state = Table::DEFINED;
2667 tabPtr.p->m_subscriptions);
2669 bool empty = subList.isEmpty();
// Every subscription on this table is now defined.
2670 for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
2673 subPtr.p->m_state = Subscription::DEFINED;
// Confirm each queued operation to its requester.
// NOTE(review): the loop advance and release of list entries are not visible here.
2677 for (list.first(ptr); !ptr.isNull();)
2681 conf->senderRef = reference();
2682 conf->senderData = ptr.p->m_senderData;
2683 sendSignal(ptr.p->m_senderRef, GSN_SUB_CREATE_CONF, signal,
2684 SubCreateConf::SignalLength, JBB);
2699 DBUG_ENTER(
"Suma::Table::parseTable");
2705 s = SimpleProperties::unpack(it, &tableDesc,
2706 DictTabInfo::TableMapping,
2707 DictTabInfo::TableMappingSize,
2711 suma.suma_ndbrequire(s == SimpleProperties::Break);
2716 m_noOfAttributes = tableDesc.NoOfAttributes;
2717 m_schemaVersion = tableDesc.TableVersion;
2732 DBUG_ENTER(
"Suma::SyncRecord::startScan");
2737 m_currentFragment = 0;
/**
 * Searches the fragment buffer, starting at m_currentFragment, for a
 * fragment whose descriptor names this node (suma.getOwnNodeId()).
 *
 * nextScan() treats a false return value as: no fragment left to scan.
 * NOTE(review): the return statements and the writes to the out-parameters
 * are not visible in this listing.
 */
2743 Suma::SyncRecord::getNextFragment(
TablePtr * tab,
2748 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2752 suma.c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
// Resume from the position reached by the previous call.
2755 fragBuf.position(fragIt, m_currentFragment);
2756 for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++)
// Each buffer word is a packed fragment descriptor.
2759 tmp.m_dummy = * fragIt.data;
2760 if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
// Reset the position.
// NOTE(review): presumably reached once the buffer is exhausted - confirm.
2766 m_currentFragment = 0;
/**
 * Starts a scan of the next local fragment of the subscribed table.
 *
 * When getNextFragment() finds no further fragment the sync is finished via
 * completeScan(). Otherwise a SCAN_FRAGREQ is built from the flags in
 * m_requestInfo and sent to the LQH instance owning the fragment, with the
 * attribute list (and, for range scans, the bound info) attached as
 * sections.
 */
2771 Suma::SyncRecord::nextScan(
Signal* signal)
2774 DBUG_ENTER(
"Suma::SyncRecord::nextScan");
2776 FragmentDescriptor fd;
2777 SubscriptionPtr subPtr;
// No more fragments on this node: the sync is complete.
2778 if(!getNextFragment(&tabPtr, &fd)){
2780 completeScan(signal);
2784 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
// Address the LQH instance recorded in the fragment descriptor, on this node.
2789 Uint32 instanceKey = fd.m_fragDesc.m_lqhInstanceKey;
2790 BlockReference lqhRef = numberToRef(DBLQH, instanceKey, suma.getOwnNodeId());
// Rows requested per batch (see batch_size_rows below).
2793 const Uint32 parallelism = 16;
2796 req->senderData = ptrI;
2797 req->resultRef = suma.reference();
2798 req->tableId = tabPtr.p->m_tableId;
2799 req->requestInfo = 0;
2800 req->savePointId = 0;
// Defaults: lock mode 0, hold-lock flag set, no key info requested.
2801 ScanFragReq::setLockMode(req->requestInfo, 0);
2802 ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2803 ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
// Each SubSyncReq flag below maps onto the matching ScanFragReq flag.
2804 if (m_requestInfo & SubSyncReq::NoDisk)
2806 ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
// Exclusive mode: lock mode 1 and key info requested as well.
2809 if (m_requestInfo & SubSyncReq::LM_Exclusive)
2811 ScanFragReq::setLockMode(req->requestInfo, 1);
2812 ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2813 ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
2816 if (m_requestInfo & SubSyncReq::Reorg)
2818 ScanFragReq::setReorgFlag(req->requestInfo, ScanFragReq::REORG_MOVED);
2821 if (m_requestInfo & SubSyncReq::TupOrder)
2823 ScanFragReq::setTupScanFlag(req->requestInfo, 1);
2826 if (m_requestInfo & SubSyncReq::LM_CommittedRead)
2828 ScanFragReq::setReadCommittedFlag(req->requestInfo, 1);
2831 if (m_requestInfo & SubSyncReq::RangeScan)
2833 ScanFragReq::setRangeScanFlag(req->requestInfo, 1);
2836 if (m_requestInfo & SubSyncReq::StatScan)
2838 ScanFragReq::setStatScanFlag(req->requestInfo, 1);
2841 req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
2842 req->schemaVersion = tabPtr.p->m_schemaVersion;
2844 req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
// The sync record index travels in the upper 16 bits of clientOpPtr;
// execTRANSID_AI and execKEYINFO20 recover it with (opPtrI >> 16).
2845 req->clientOpPtr = (ptrI << 16);
2846 req->batch_size_rows= parallelism;
2848 req->batch_size_bytes= 0;
// The attribute list is assembled in the signal data area beyond word 25;
// its first word is the number of attributes.
2850 Uint32 * attrInfo = signal->theData + 25;
2851 attrInfo[0] = attrBuf.getSize();
2859 for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it))
2864 Uint32 noOfSections;
// Section 0: attribute info.
2865 ptr[0].p = attrInfo;
// Range scans append the bound info after the attribute list as section 1.
2868 if (m_requestInfo & SubSyncReq::RangeScan)
2871 Uint32 oldpos = pos;
2873 for (boundBuf.first(it); !it.curr.isNull(); boundBuf.next(it))
2875 attrInfo[pos++] = *it.data;
2877 ptr[1].p = &attrInfo[oldpos];
2878 ptr[1].sz = pos - oldpos;
2881 suma.sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
2882 ScanFragReq::SignalLength, JBB, ptr, noOfSections);
// Remembered so execTRANSID_AI knows how many attribute headers to expect.
2884 m_currentNoOfAttributes = attrBuf.getSize();
2891 Suma::execSCAN_FRAGREF(
Signal* signal){
/**
 * Signal handler for SCAN_FRAGCONF from LQH.
 *
 * The visible code has three outcomes: continue directly through
 * execSUB_SYNC_CONTINUE_CONF(), ask the sync requester for permission to
 * continue with SUB_SYNC_CONTINUE_REQ, or move on to the next fragment
 * with nextScan().
 * NOTE(review): the conditions selecting between them are not visible in
 * this listing.
 */
2899 Suma::execSCAN_FRAGCONF(
Signal* signal){
2901 DBUG_ENTER(
"Suma::execSCAN_FRAGCONF");
2902 ndbassert(signal->getNoOfSections() == 0);
2903 CRASH_INSERTION(13011);
2907 const Uint32 completed = conf->fragmentCompleted;
2908 const Uint32 senderData = conf->senderData;
2909 const Uint32 completedOps = conf->completedOps;
// senderData is the sync record index set in SCAN_FRAGREQ.
2912 c_syncPool.
getPtr(syncPtr, senderData);
// Path 1: fabricate a continue confirmation in place and handle it directly.
2920 conf->subscriptionId = subPtr.p->m_subscriptionId;
2921 conf->subscriptionKey = subPtr.p->m_subscriptionKey;
2922 execSUB_SYNC_CONTINUE_CONF(signal);
// Path 2: report the number of rows sent and wait for the requester to confirm.
2925 req->subscriberData = syncPtr.p->m_senderData;
2926 req->noOfRowsSent = completedOps;
2927 req->senderData = senderData;
2928 sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
2929 SubSyncContinueReq::SignalLength, JBB);
// Path 3: no operations may be reported here; go on to the next fragment.
2934 ndbrequire(completedOps == 0);
2937 syncPtr.p->nextScan(signal);
/**
 * Signal handler for SUB_SYNC_CONTINUE_CONF.
 *
 * Verifies that the subscription named in the signal exists, looks up the
 * sync record from conf->senderData and asks the LQH instance of the
 * current fragment for the next batch with SCAN_NEXTREQ.
 */
2942 Suma::execSUB_SYNC_CONTINUE_CONF(
Signal* signal){
2944 ndbassert(signal->getNoOfSections() == 0);
2946 CRASH_INSERTION(13012);
2951 SubscriptionPtr subPtr;
2953 key.m_subscriptionId = conf->subscriptionId;
2954 key.m_subscriptionKey = conf->subscriptionKey;
2955 Uint32 syncPtrI = conf->senderData;
// The subscription must exist.
2957 ndbrequire(c_subscriptions.
find(subPtr, key));
2962 c_syncPool.
getPtr(syncPtr, syncPtrI);
// Unpack the fragment descriptor to find the LQH instance to address.
2967 FragmentDescriptor tmp;
2968 tmp.m_dummy = * fragIt.data;
2969 instanceKey = tmp.m_fragDesc.m_lqhInstanceKey;
2971 BlockReference lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId());
2974 req->senderData = syncPtrI;
2975 req->requestInfo = 0;
// Same transId2 and batch size (16 rows) as used in nextScan.
2977 req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
2978 req->batch_size_rows = 16;
2979 req->batch_size_bytes = 0;
2980 sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
2981 ScanFragNextReq::SignalLength, JBB);
/**
 * Finishes a sync scan.
 *
 * Tells DIH that the table scan identified by m_scan_cookie is complete,
 * then answers the sync requester with SUB_SYNC_CONF or SUB_SYNC_REF.
 * NOTE(review): the condition choosing between CONF and REF is not visible
 * here; presumably it depends on the error argument - confirm.
 */
2985 Suma::SyncRecord::completeScan(
Signal* signal,
int error)
2988 DBUG_ENTER(
"Suma::SyncRecord::completeScan");
2990 SubscriptionPtr subPtr;
2991 suma.c_subscriptionPool.getPtr(subPtr, m_subscriptionPtrI);
// Release the scan in DIH.
2994 rep->tableId = subPtr.p->m_tableId;
2995 rep->scanCookie = m_scan_cookie;
2996 suma.sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP, signal,
2997 DihScanTabCompleteRep::SignalLength, JBB);
3000 ndbout_c(
"GSN_SUB_SYNC_CONF (data)");
// Success reply to the sync requester.
3005 conf->senderRef = suma.reference();
3006 conf->senderData = m_senderData;
3007 suma.sendSignal(m_senderRef, GSN_SUB_SYNC_CONF, signal,
3008 SubSyncConf::SignalLength, JBB);
// Failure reply to the sync requester.
3013 ref->senderRef = suma.reference();
3014 ref->senderData = m_senderData;
3015 suma.sendSignal(m_senderRef, GSN_SUB_SYNC_REF, signal,
3016 SubSyncRef::SignalLength, JBB);
3034 ndbout <<
"execSCAN_HBREP" << endl << hex;
3035 for(
int i = 0; i<signal->
length(); i++){
3036 ndbout << signal->theData[
i] <<
" ";
3037 if(((i + 1) % 8) == 0)
3038 ndbout << endl << hex;
3055 ndbassert(signal->getNoOfSections() == 0);
3056 DBUG_ENTER(
"Suma::execSUB_START_REQ");
3059 CRASH_INSERTION(13013);
3060 Uint32 senderRef = req->senderRef;
3061 Uint32 senderData = req->senderData;
3062 Uint32 subscriberData = req->subscriberData;
3063 Uint32 subscriberRef = req->subscriberRef;
3064 SubscriptionData::Part part = (SubscriptionData::Part)req->part;
3068 key.m_subscriptionId = req->subscriptionId;
3069 key.m_subscriptionKey = req->subscriptionKey;
3075 if (c_startup.m_restart_server_node_id == RNIL)
3082 sendSubStartRef(signal,
3083 senderRef, senderData, SubStartRef::NotStarted);
3087 bool found = c_subscriptions.
find(subPtr, key);
3091 sendSubStartRef(signal,
3092 senderRef, senderData, SubStartRef::NoSuchSubscription);
3096 if (ERROR_INSERTED(13046))
3099 CLEAR_ERROR_INSERT_VALUE;
3100 sendSubStartRef(signal,
3101 senderRef, senderData, SubStartRef::NoSuchSubscription);
3105 switch(subPtr.p->m_state){
3106 case Subscription::UNDEFINED:
3109 case Subscription::DEFINING:
3111 sendSubStartRef(signal,
3112 senderRef, senderData, SubStartRef::Defining);
3114 case Subscription::DEFINED:
3118 if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
3121 if (c_startup.m_restart_server_node_id == 0)
3123 sendSubStartRef(signal,
3124 senderRef, senderData, SubStartRef::Dropped);
3135 if (subPtr.p->m_trigger_state == Subscription::T_ERROR)
3138 sendSubStartRef(signal,
3139 senderRef, senderData, subPtr.p->m_errorCode);
3147 sendSubStartRef(signal,
3148 senderRef, senderData, SubStartRef::OutOfSubscriberRecords);
3153 if (!c_subOpPool.
seize(subOpPtr))
3157 sendSubStartRef(signal,
3158 senderRef, senderData, SubStartRef::OutOfSubOpRecords);
3162 if (! check_sub_start(subscriberRef))
3166 c_subOpPool.
release(subOpPtr);
3167 sendSubStartRef(signal,
3168 senderRef, senderData, SubStartRef::NodeDied);
3173 subbPtr.p->m_senderRef = subscriberRef;
3174 subbPtr.p->m_senderData = subscriberData;
3176 subOpPtr.p->m_opType = SubOpRecord::R_SUB_START_REQ;
3177 subOpPtr.p->m_subPtrI = subPtr.i;
3178 subOpPtr.p->m_senderRef = senderRef;
3179 subOpPtr.p->m_senderData = senderData;
3180 subOpPtr.p->m_subscriberRef = subbPtr.i;
3184 subOpList.add(subOpPtr);
3190 switch(subPtr.p->m_trigger_state){
3191 case Subscription::T_UNDEFINED:
3196 create_triggers(signal, subPtr);
3198 case Subscription::T_CREATING:
3204 case Subscription::T_DROPPING:
3211 case Subscription::T_DEFINED:{
3216 case Subscription::T_ERROR:
/**
 * Sends SUB_START_REF to dstref.
 *
 * @param signal  signal object used as the send buffer
 * @param dstref  block reference the REF is sent to
 * @param data    value echoed back in the senderData field
 * @param err     value placed in the errorCode field
 */
3224 Suma::sendSubStartRef(
Signal* signal, Uint32 dstref, Uint32 data, Uint32 err)
// The REF identifies this block as its sender.
3228 ref->senderRef = reference();
3229 ref->senderData = data;
3230 ref->errorCode = err;
3231 sendSignal(dstref, GSN_SUB_START_REF, signal,
3232 SubStartRef::SignalLength, JBB);
/**
 * Requests creation of the three triggers of a subscription in DBTUP.
 *
 * The subscription must be in trigger state T_UNDEFINED; it is moved to
 * T_CREATING and m_outstanding_trigger is set to 3. One
 * CREATE_TRIG_IMPL_REQ is sent per trigger slot, each with the attribute
 * mask of the table attached as a section.
 */
3236 Suma::create_triggers(
Signal* signal, SubscriptionPtr subPtr)
3240 ndbrequire(subPtr.p->m_trigger_state == Subscription::T_UNDEFINED);
3241 subPtr.p->m_trigger_state = Subscription::T_CREATING;
3244 c_tablePool.
getPtr(tabPtr, subPtr.p->m_table_ptrI);
3247 tabPtr.p->createAttributeMask(attrMask, *
this);
// Three replies are expected, one per trigger slot.
3249 subPtr.p->m_outstanding_trigger = 3;
3250 for(Uint32 j = 0; j<3; j++)
// Trigger id layout: schema version shifted left by 18, slot number j at
// bit 16, subscription record index in the low bits. The reply handlers
// recover the slot with (triggerId >> 16) & 0x3.
3252 Uint32 triggerId = (tabPtr.p->m_schemaVersion << 18) | (j << 16) | subPtr.i;
// The slot must not already hold a trigger.
3253 ndbrequire(subPtr.p->m_triggers[j] == ILLEGAL_TRIGGER_ID);
3257 req->senderRef = SUMA_REF;
3258 req->senderData = subPtr.i;
3259 req->requestType = 0;
3262 TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE);
3263 TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED);
3265 TriggerInfo::setMonitorReplicas(ti,
true);
3267 TriggerInfo::setMonitorAllAttributes(ti,
true);
// Report all monitored attributes only when the subscription asked for REPORT_ALL.
3268 TriggerInfo::setReportAllMonitoredAttributes(ti,
3269 subPtr.p->m_options & Subscription::REPORT_ALL);
3270 req->triggerInfo = ti;
3272 req->receiverRef = SUMA_REF;
3273 req->triggerId = triggerId;
3274 req->tableId = subPtr.p->m_tableId;
3275 req->tableVersion = 0;
3276 req->indexId = ~(Uint32)0;
3277 req->indexVersion = 0;
// Section 0: the attribute mask.
3280 ptr[0].p = attrMask.rep.data;
3281 ptr[0].sz = attrMask.getSizeInWords();
3282 sendSignal(DBTUP_REF, GSN_CREATE_TRIG_IMPL_REQ,
3283 signal, CreateTrigImplReq::SignalLength, JBB, ptr, 1);
3293 const Uint32 triggerId = conf->triggerId;
3294 Uint32 type = (triggerId >> 16) & 0x3;
3295 Uint32 tableId = conf->tableId;
3299 c_subscriptions.
getPtr(subPtr, conf->senderData);
3302 ndbrequire(tabPtr.p->m_tableId == tableId);
3303 ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
3305 ndbrequire(type < 3);
3306 ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
3307 subPtr.p->m_triggers[
type] = triggerId;
3309 ndbrequire(subPtr.p->m_outstanding_trigger);
3310 subPtr.p->m_outstanding_trigger--;
3312 if (subPtr.p->m_outstanding_trigger)
3321 if (subPtr.p->m_errorCode == 0)
3324 subPtr.p->m_trigger_state = Subscription::T_DEFINED;
3330 subPtr.p->m_trigger_state = Subscription::T_ERROR;
3331 drop_triggers(signal, subPtr);
3341 const Uint32 triggerId = ref->triggerId;
3342 Uint32 type = (triggerId >> 16) & 0x3;
3343 Uint32 tableId = ref->tableId;
3347 c_subscriptions.
getPtr(subPtr, ref->senderData);
3350 ndbrequire(tabPtr.p->m_tableId == tableId);
3351 ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
3353 ndbrequire(type < 3);
3354 ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
3356 subPtr.p->m_errorCode = ref->errorCode;
3358 ndbrequire(subPtr.p->m_outstanding_trigger);
3359 subPtr.p->m_outstanding_trigger--;
3361 if (subPtr.p->m_outstanding_trigger)
3370 subPtr.p->m_trigger_state = Subscription::T_ERROR;
3371 drop_triggers(signal, subPtr);
/**
 * Decides whether a subscriber on the node of subscriberRef may be started.
 *
 * Returns true when any of these holds:
 *  - c_startup.m_restart_server_node_id is non-zero,
 *  - c_startup.m_wait_handover is non-zero,
 *  - the node is connected and not marked as a failed API node.
 */
3375 Suma::check_sub_start(Uint32 subscriberRef)
3377 Uint32 nodeId = refToNode(subscriberRef);
3378 bool startme = c_startup.m_restart_server_node_id;
3379 bool handover = c_startup.m_wait_handover;
3381 c_failedApiNodes.
get(nodeId) ==
false &&
3382 c_connected_nodes.
get(nodeId);
3384 return (startme || handover || connected);
3390 const Uint64 gci = get_current_gci(signal);
3393 subPtr.p->m_subscribers);
3398 for (subOpList.
first(subOpPtr); !subOpPtr.isNull(); )
3402 Uint32 senderRef = subOpPtr.p->m_senderRef;
3403 Uint32 senderData = subOpPtr.p->m_senderData;
3406 if (check_sub_start(ptr.p->m_senderRef))
3409 conf->senderRef = reference();
3410 conf->senderData = senderData;
3411 conf->subscriptionId = subPtr.p->m_subscriptionId;
3412 conf->subscriptionKey = subPtr.p->m_subscriptionKey;
3413 conf->firstGCI = Uint32(gci >> 32);
3414 conf->part = SubscriptionData::TableData;
3415 conf->bucketCount = c_no_of_buckets;
3416 conf->nodegroup = c_nodeGroup;
3417 sendSignal(senderRef, GSN_SUB_START_CONF, signal,
3418 SubStartConf::SignalLength, JBB);
3424 bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
3425 send_sub_start_stop_event(signal, ptr,NdbDictionary::Event::_TE_ACTIVE,
3429 c_subscriber_nodes.
set(refToNode(ptr.p->m_senderRef));
3430 c_subscriber_per_node[refToNode(ptr.p->m_senderRef)]++;
3436 sendSubStartRef(signal,
3437 senderRef, senderData, SubStartRef::NodeDied);
3443 subOpList.
next(subOpPtr);
3444 subOpList.release(tmp);
/**
 * Answers every queued start operation of a subscription with
 * SUB_START_REF carrying errCode, releasing each operation record as it
 * goes.
 */
3452 Suma::report_sub_start_ref(
Signal* signal,
3457 subPtr.p->m_subscribers);
// The loop advances manually because the current entry is released inside the body.
3462 for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
3466 Uint32 senderRef = subOpPtr.p->m_senderRef;
3467 Uint32 senderData = subOpPtr.p->m_senderData;
3471 ref->senderRef = reference();
3472 ref->senderData = senderData;
3473 ref->errorCode = errCode;
// A REF is sent with the REF signal length, matching sendSubStartRef.
// The CONF length was used here, although only SubStartRef fields are filled in.
3475 sendSignal(senderRef, GSN_SUB_START_REF, signal,
3476 SubStartRef::SignalLength, JBB);
// Step to the next entry before releasing the current one.
3480 subOpList.next(subOpPtr);
3481 subOpList.release(tmp);
/**
 * Requests removal of the triggers of a subscription from DBTUP.
 *
 * For every trigger slot that holds a valid id a DROP_TRIG_IMPL_REQ is
 * sent, and both the per-subscription counter m_outstanding_trigger and
 * the block-wide counter c_outstanding_drop_trig_req are incremented.
 * When the table is already DROPPED the slots are simply reset to
 * ILLEGAL_TRIGGER_ID.
 */
3487 Suma::drop_triggers(
Signal* signal, SubscriptionPtr subPtr)
3491 subPtr.p->m_outstanding_trigger = 0;
3494 c_tablePool.
getPtr(tabPtr, subPtr.p->m_table_ptrI);
// Table already dropped: forget the trigger ids locally.
3495 if (tabPtr.p->m_state == Table::DROPPED)
3498 subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID;
3499 subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID;
3500 subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID;
3504 for(Uint32 j = 0; j<3; j++)
3507 Uint32 triggerId = subPtr.p->m_triggers[j];
// Only slots that actually hold a trigger need a drop request.
3508 if (triggerId != ILLEGAL_TRIGGER_ID)
3510 subPtr.p->m_outstanding_trigger++;
3514 req->senderRef = SUMA_REF;
3515 req->senderData = subPtr.i;
3516 req->requestType = 0;
// Same trigger description as used in create_triggers.
3520 TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE);
3521 TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED);
3523 TriggerInfo::setMonitorReplicas(ti,
true);
3525 TriggerInfo::setMonitorAllAttributes(ti,
true);
3526 TriggerInfo::setReportAllMonitoredAttributes(ti,
3527 subPtr.p->m_options & Subscription::REPORT_ALL);
3528 req->triggerInfo = ti;
3530 req->tableId = subPtr.p->m_tableId;
3531 req->tableVersion = 0;
3532 req->indexId = RNIL;
3533 req->indexVersion = 0;
3534 req->triggerId = triggerId;
3535 req->receiverRef = SUMA_REF;
// Block-wide count of drop requests in flight; decremented by the
// DROP_TRIG_IMPL reply handlers.
3537 c_outstanding_drop_trig_req++;
3538 sendSignal(DBTUP_REF, GSN_DROP_TRIG_IMPL_REQ,
3539 signal, DropTrigImplReq::SignalLength, JBB);
// Nothing was sent.
// NOTE(review): the action taken in this case is not visible in this listing.
3544 if (subPtr.p->m_outstanding_trigger == 0)
3558 const Uint32 triggerId = ref->triggerId;
3559 const Uint32 type = (triggerId >> 16) & 0x3;
3561 c_subscriptionPool.
getPtr(subPtr, ref->senderData);
3563 ndbrequire(tabPtr.p->m_tableId == ref->tableId);
3565 ndbrequire(type < 3);
3566 ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
3567 subPtr.p->m_triggers[
type] = ILLEGAL_TRIGGER_ID;
3569 ndbrequire(subPtr.p->m_outstanding_trigger);
3570 subPtr.p->m_outstanding_trigger--;
3572 ndbrequire(c_outstanding_drop_trig_req);
3573 c_outstanding_drop_trig_req--;
3575 if (subPtr.p->m_outstanding_trigger)
3596 const Uint32 triggerId = conf->triggerId;
3597 const Uint32 type = (triggerId >> 16) & 0x3;
3599 c_subscriptionPool.
getPtr(subPtr, conf->senderData);
3601 ndbrequire(tabPtr.p->m_tableId == conf->tableId);
3603 ndbrequire(type < 3);
3604 ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
3605 subPtr.p->m_triggers[
type] = ILLEGAL_TRIGGER_ID;
3607 ndbrequire(subPtr.p->m_outstanding_trigger);
3608 subPtr.p->m_outstanding_trigger--;
3610 ndbrequire(c_outstanding_drop_trig_req);
3611 c_outstanding_drop_trig_req--;
3613 if (subPtr.p->m_outstanding_trigger)
3628 switch(subPtr.p->m_trigger_state){
3629 case Subscription::T_UNDEFINED:
3630 case Subscription::T_CREATING:
3631 case Subscription::T_DEFINED:
3635 case Subscription::T_DROPPING:
3639 subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
3640 if (!subPtr.p->m_start_req.isEmpty())
3643 create_triggers(signal, subPtr);
3647 case Subscription::T_ERROR:
3649 Uint32 err = subPtr.p->m_errorCode;
3650 subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
3651 subPtr.p->m_errorCode = 0;
3652 report_sub_start_ref(signal, subPtr, err);
3669 ndbassert(signal->getNoOfSections() == 0);
3670 DBUG_ENTER(
"Suma::execSUB_STOP_REQ");
3672 CRASH_INSERTION(13019);
3675 Uint32 senderRef = req->senderRef;
3676 Uint32 senderData = req->senderData;
3677 Uint32 subscriberRef = req->subscriberRef;
3678 Uint32 subscriberData = req->subscriberData;
3681 key.m_subscriptionId = req->subscriptionId;
3682 key.m_subscriptionKey = req->subscriptionKey;
3683 bool abortStart = (req->requestInfo & SubStopReq::RI_ABORT_START);
3685 if (c_startup.m_restart_server_node_id == RNIL)
3692 sendSubStopRef(signal,
3693 senderRef, senderData, SubStopRef::NotStarted);
3697 bool found = c_subscriptions.
find(subPtr, key);
3701 sendSubStopRef(signal,
3702 senderRef, senderData, SubStopRef::NoSuchSubscription);
3706 switch(subPtr.p->m_state){
3707 case Subscription::UNDEFINED:
3710 case Subscription::DEFINING:
3712 sendSubStopRef(signal,
3713 senderRef, senderData, SubStopRef::Defining);
3715 case Subscription::DEFINED:
3722 bool empty = list.isEmpty();
3723 if (list.seize(subOpPtr) ==
false)
3726 sendSubStopRef(signal,
3727 senderRef, senderData, SubStopRef::OutOfSubOpRecords);
3734 subOpPtr.p->m_opType = SubOpRecord::R_SUB_ABORT_START_REQ;
3739 subOpPtr.p->m_opType = SubOpRecord::R_SUB_STOP_REQ;
3741 subOpPtr.p->m_subPtrI = subPtr.i;
3742 subOpPtr.p->m_senderRef = senderRef;
3743 subOpPtr.p->m_senderData = senderData;
3744 subOpPtr.p->m_subscriberRef = subscriberRef;
3745 subOpPtr.p->m_subscriberData = subscriberData;
3751 signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3752 signal->theData[1] = subOpPtr.i;
3753 signal->theData[2] = RNIL;
3754 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3764 c_subOpPool.
getPtr(subOpPtr, signal->theData[1]);
3767 c_subscriptionPool.
getPtr(subPtr, subOpPtr.p->m_subPtrI);
3772 if (signal->theData[2] == RNIL)
3780 list.
getPtr(ptr, signal->theData[2]);
3783 for (Uint32 i = 0; i<32 && !ptr.isNull(); i++, list.
next(ptr))
3785 if (ptr.p->m_senderRef == subOpPtr.p->m_subscriberRef &&
3786 ptr.p->m_senderData == subOpPtr.p->m_subscriberData)
3797 sendSubStopRef(signal,
3798 subOpPtr.p->m_senderRef,
3799 subOpPtr.p->m_senderData,
3800 SubStopRef::NoSuchSubscriber);
3801 check_remove_queue(signal, subPtr, subOpPtr,
true,
true);
3805 signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3806 signal->theData[1] = subOpPtr.i;
3807 signal->theData[2] = ptr.i;
3808 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3818 bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
3819 report_sub_stop_conf(signal, subOpPtr, ptr, report, list);
3822 check_remove_queue(signal, subPtr, subOpPtr,
true,
true);
/**
 * Removes or releases an operation record from a queue of subscription
 * operations and, when another operation is waiting, schedules it by
 * sending CONTINUEB to SUMA itself.
 *
 * NOTE(review): most of the parameter list and the conditions around the
 * release / remove calls are not visible in this listing.
 */
3827 Suma::check_remove_queue(
Signal* signal,
3841 ndbrequire(tmp.i == subOpPtr.i);
// Records whether the operation was the first entry of the queue.
3846 ishead = (tmp.i == subOpPtr.i);
3853 list.release(subOpPtr);
3858 list.remove(subOpPtr);
// Queue is empty.
3864 if (list.first(subOpPtr) ==
false)
3867 c_restart.m_waiting_on_self = 1;
// Kick off the operation now at the head of the queue.
3878 switch(subOpPtr.p->m_opType){
3879 case SubOpRecord::R_SUB_ABORT_START_REQ:
3880 case SubOpRecord::R_SUB_STOP_REQ:
// theData: [0] continuation type, [1] operation record index, [2] RNIL.
3882 signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3883 signal->theData[1] = subOpPtr.i;
3884 signal->theData[2] = RNIL;
3885 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3887 case SubOpRecord::R_API_FAIL_REQ:
3889 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
3890 signal->theData[1] = subOpPtr.i;
3891 signal->theData[2] = RNIL;
3892 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3894 case SubOpRecord::R_START_ME_REQ:
/**
 * Confirms a stop operation to its requester.
 *
 * Emits the _TE_STOP event through send_sub_start_stop_event(), sends
 * SUB_STOP_CONF carrying m_max_seen_gci split into high and low words, and
 * decrements the per-node subscriber count, clearing the node from
 * c_subscriber_nodes when the count reaches zero.
 */
3902 Suma::report_sub_stop_conf(
Signal* signal,
3909 CRASH_INSERTION(13020);
3911 Uint32 senderRef = subOpPtr.p->m_senderRef;
3912 Uint32 senderData = subOpPtr.p->m_senderData;
3913 bool abortStart = subOpPtr.p->m_opType == SubOpRecord::R_SUB_ABORT_START_REQ;
3919 send_sub_start_stop_event(signal, ptr, NdbDictionary::Event::_TE_STOP,
// The 64-bit GCI is reported as two 32-bit words.
3924 const Uint64 gci = m_max_seen_gci;
3925 conf->senderRef= reference();
3926 conf->senderData= senderData;
3927 conf->gci_hi= Uint32(gci>>32);
3928 conf->gci_lo= Uint32(gci);
3929 sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
3930 SubStopConf::SignalLength, JBB);
// Per-node bookkeeping; the counter is only decremented when non-zero.
3932 Uint32 nodeId = refToNode(ptr.p->m_senderRef);
3933 if (c_subscriber_per_node[nodeId])
3935 c_subscriber_per_node[nodeId]--;
3936 if (c_subscriber_per_node[nodeId] == 0)
3939 c_subscriber_nodes.
clear(nodeId);
/**
 * Sends SUB_STOP_REF to retref, with this block as sender, errCode as the
 * error code and data echoed back as senderData.
 */
3945 Suma::sendSubStopRef(
Signal* signal,
3952 ref->senderRef = reference();
3953 ref->errorCode = errCode;
3954 ref->senderData = data;
3955 sendSignal(retref, GSN_SUB_STOP_REF, signal, SubStopRef::SignalLength, JBB);
/**
 * Sends SUB_TABLE_DATA events about a subscriber being started or stopped.
 *
 * The subscriber itself (ptr) always receives the given event. The
 * companion event (_TE_UNSUBSCRIBE for _TE_STOP, _TE_SUBSCRIBE for
 * _TE_ACTIVE) is sent to the subscribers in list. For the subscribe case
 * ptr additionally receives one companion event per list entry, naming
 * that entry by node id.
 * NOTE(review): the action taken when report is false is not visible here;
 * presumably the companion events are skipped - confirm.
 */
3960 Suma::send_sub_start_stop_event(
Signal *signal,
3962 NdbDictionary::Event::_TableEvent
event,
3966 const Uint64 gci = get_current_gci(signal);
3968 Uint32 nodeId = refToNode(ptr.p->m_senderRef);
// Pick the companion event reported to the other subscribers.
3970 NdbDictionary::Event::_TableEvent other;
3971 if (event == NdbDictionary::Event::_TE_STOP)
3973 other = NdbDictionary::Event::_TE_UNSUBSCRIBE;
3975 else if (event == NdbDictionary::Event::_TE_ACTIVE)
3977 other = NdbDictionary::Event::_TE_SUBSCRIBE;
3985 data->gci_hi = Uint32(gci >> 32);
3986 data->gci_lo = Uint32(gci);
// First message: the event itself, addressed to the affected subscriber.
3988 data->requestInfo = 0;
3989 SubTableData::setOperation(data->requestInfo, event);
3990 SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
3991 SubTableData::setReqNodeId(data->requestInfo, nodeId);
3992 data->changeMask = 0;
3994 data->senderData = ptr.p->m_senderData;
3995 sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
3996 SubTableData::SignalLength, JBB);
3998 if (report ==
false)
// Remaining messages carry the companion event.
4003 data->requestInfo = 0;
4004 SubTableData::setOperation(data->requestInfo, other);
4005 SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
4008 for(list.
first(tmp); !tmp.isNull(); list.
next(tmp))
// Tell this list entry about the node of the affected subscriber.
4011 SubTableData::setReqNodeId(data->requestInfo, nodeId);
4012 data->senderData = tmp.p->m_senderData;
4013 sendSignal(tmp.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4014 SubTableData::SignalLength, JBB);
// The affected subscriber is not expected to be in the list.
4016 ndbassert(tmp.i != ptr.i);
// Subscribe case: also tell the affected subscriber about this list entry.
4017 if (other != NdbDictionary::Event::_TE_UNSUBSCRIBE)
4020 SubTableData::setReqNodeId(data->requestInfo,
4021 refToNode(tmp.p->m_senderRef));
4023 data->senderData = ptr.p->m_senderData;
4024 sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4025 SubTableData::SignalLength, JBB);
4035 for(Uint32 i = 0; i<m_noOfAttributes; i++)
/**
 * Member-function wrapper around the ndbrequire macro.
 * Table::parseTable invokes it through its Suma reference
 * (suma.suma_ndbrequire(...)).
 */
4039 void Suma::suma_ndbrequire(
bool v) { ndbrequire(v); }
// Element counts (Uint32 words) of the file-local staging buffers below.
// The replacement lists are parenthesised so that the macros expand correctly
// when used inside a larger expression (e.g. `2 * SUMA_BUF_SZ` or
// `sizeof(Uint32) * SUMA_BUF_SZ`); the array declarations below are unaffected.
4049 #define SUMA_BUF_SZ1 (MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS)
4050 #define SUMA_BUF_SZ (MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1)
// f_buffer stages scan data and trigger key/after-value words; b_buffer stages
// trigger BEFORE_VALUES words.
// *_bufferLock holds the id (operation pointer or trigger id) of the current
// owner of the buffer; the handlers in this file treat 0 as "unclaimed".
// *_trigBufferSize is the number of words currently staged in the buffer.
4052 static Uint32 f_bufferLock = 0;
4053 static Uint32 f_buffer[SUMA_BUF_SZ];
4054 static Uint32 f_trigBufferSize = 0;
4055 static Uint32 b_bufferLock = 0;
4056 static Uint32 b_buffer[SUMA_BUF_SZ];
4057 static Uint32 b_trigBufferSize = 0;
// Handles TRANSID_AI: unpacks the attribute data of one row into the static
// f_buffer (attribute data starting at offset MAX_ATTRIBUTES_IN_TABLE).
// NOTE(review): the declarations of `data`, `handle`, `dataPtr`, `syncPtr` and
// `sum`, and parts of the copy loop, are not visible in this excerpt.
4060 Suma::execTRANSID_AI(
Signal* signal)
4063 DBUG_ENTER(
"Suma::execTRANSID_AI");
4065 CRASH_INSERTION(13015);
4067 const Uint32 opPtrI = data->connectPtr;
// Default payload length: signal length minus 3 words.
4068 Uint32 length = signal->
length() - 3;
// Claim f_buffer for this operation if nobody owns it; afterwards the owner
// must be this operation.
4070 if(f_bufferLock == 0){
4071 f_bufferLock = opPtrI;
4073 ndbrequire(f_bufferLock == opPtrI);
// When the signal carries sections, the attribute data is taken from
// section 0 and its size replaces the length computed above.
4076 if (signal->getNoOfSections())
4080 handle.getSection(dataPtr, 0);
4081 length = dataPtr.sz;
4082 copy(data->attrData, dataPtr);
4083 releaseSections(handle);
// The sync record index is held in the upper 16 bits of opPtrI.
4087 c_syncPool.
getPtr(syncPtr, (opPtrI >> 16));
// Layout of f_buffer: `headers` at the start, attribute data (`dst`) after
// MAX_ATTRIBUTES_IN_TABLE words.
4090 Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
4091 Uint32 * headers = f_buffer;
4092 const Uint32 * src = &data->attrData[0];
4093 const Uint32 *
const end = &src[length];
// For each attribute: read its header word, take the data size from it and
// copy that many words of data.
4095 const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
4096 for(Uint32 i = 0; i<attribs; i++){
4097 Uint32 tmp = * src++;
4099 Uint32 len = AttributeHeader::getDataSize(tmp);
4101 memcpy(dst, src, 4 * len);
4106 f_trigBufferSize = sum;
// Every input word must have been consumed.
4108 ndbrequire(src == end);
// Without LM_Exclusive the row is forwarded right away (takeOver = 0).
// Presumably exclusive scans are forwarded from execKEYINFO20 - confirm.
4110 if ((syncPtr.p->m_requestInfo & SubSyncReq::LM_Exclusive) == 0)
4112 sendScanSubTableData(signal, syncPtr, 0);
// Handles KEYINFO20: forwards the row currently staged in f_buffer, passing
// the signal's scanInfo_Node value on as the take-over value.
// NOTE(review): the declarations of `data` and `syncPtr` are not visible here.
4119 Suma::execKEYINFO20(
Signal* signal)
4124 const Uint32 opPtrI = data->clientOpPtr;
4125 const Uint32 takeOver = data->scanInfo_Node;
// f_buffer must already be owned by this operation.
4127 ndbrequire(f_bufferLock == opPtrI);
// The sync record index is held in the upper 16 bits of opPtrI.
4130 c_syncPool.
getPtr(syncPtr, (opPtrI >> 16));
4131 sendScanSubTableData(signal, syncPtr, takeOver);
// Sends the scan row staged in f_buffer as a GSN_SUB_TABLE_DATA (_TE_SCAN)
// signal: section 0 is the header area of f_buffer (`attribs` words),
// section 1 starts at the data area of f_buffer.
// NOTE(review): the remaining parameters, the declarations of `ptr` and
// `sdata`, and the start of the final send call are not visible here.
4135 Suma::sendScanSubTableData(
Signal* signal,
4138 const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
4139 const Uint32 sum = f_trigBufferSize;
4145 ptr[0].p = f_buffer;
4146 ptr[0].sz = attribs;
4148 ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
4151 SubscriptionPtr subPtr;
4152 c_subscriptions.
getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);
// Fill in the SubTableData header from the sync record.
4159 Uint32 ref = syncPtr.p->m_senderRef;
4160 sdata->tableId = syncPtr.p->m_tableId;
4161 sdata->senderData = syncPtr.p->m_senderData;
4162 sdata->requestInfo = 0;
4163 SubTableData::setOperation(sdata->requestInfo,
4164 NdbDictionary::Event::_TE_SCAN);
4167 sdata->takeOver = takeOver;
4169 ndbout_c(
"GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
4174 SubTableData::SignalLength, JBB,
// Body of Suma::execTRIG_ATTRINFO (name taken from the DBUG_ENTER string; the
// function header and the declaration of `trg` are not visible here).
// Appends the attribute words of one TRIG_ATTRINFO signal to b_buffer
// (BEFORE_VALUES) or to f_buffer (any other type).
4194 DBUG_ENTER(
"Suma::execTRIG_ATTRINFO");
4196 CRASH_INSERTION(13016);
4198 const Uint32 trigId = trg->getTriggerId();
// Payload length in words: signal length minus the fixed header.
4200 const Uint32 dataLen = signal->
length() - TrigAttrInfo::StaticLength;
// BEFORE_VALUES are appended to b_buffer, which must already be owned by
// this trigger.
4202 if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){
4205 ndbrequire(b_bufferLock == trigId);
4207 memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
4208 b_trigBufferSize += dataLen;
// Other attribute info goes to f_buffer. If f_buffer is unclaimed, both
// buffers are claimed for this trigger and their fill counters reset.
4214 if(f_bufferLock == 0){
4215 f_bufferLock = trigId;
4216 f_trigBufferSize = 0;
4217 b_bufferLock = trigId;
4218 b_trigBufferSize = 0;
4220 ndbrequire(f_bufferLock == trigId);
4223 memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
4224 f_trigBufferSize += dataLen;
// Debug-only counter array, indexed by node id in the trace below.
4231 #ifdef NODEFAIL_DEBUG2
4232 static int theCounts[64] = {0};
// Walks the replica node list of `bucket` and tests each node against
// c_alive_nodes.
// NOTE(review): the return statements and the declaration of `node` are not
// visible here; presumably the first alive node is returned - confirm.
4236 Suma::get_responsible_node(Uint32 bucket)
const
4243 const Bucket* ptr= c_buckets + bucket;
4244 for(Uint32 i = 0; i<MAX_REPLICAS; i++)
4246 node= ptr->m_nodes[
i];
4247 if(c_alive_nodes.
get(node))
// Optional trace, compiled only with NODEFAIL_DEBUG2.
4249 #ifdef NODEFAIL_DEBUG2
4251 ndbout_c(
"Suma:responsible n=%u, D=%u, id = %u, count=%u",
4252 n,D,
id, theCounts[node]);
// Overload that takes an explicit node mask in addition to the bucket.
// NOTE(review): only the iteration over the bucket's replica nodes is visible
// here; presumably `mask` is tested instead of c_alive_nodes - confirm.
4262 Suma::get_responsible_node(Uint32 bucket,
const NdbNodeBitmask& mask)
const
4266 const Bucket* ptr= c_buckets + bucket;
4267 for(Uint32 i = 0; i<MAX_REPLICAS; i++)
4269 node= ptr->m_nodes[
i];
// Evaluates a bucket that is in switchover for the given `gci`.
// `send` is true when the bucket state has any of the STARTING, TAKEOVER or
// SHUTDOWN_TO bits set.
// NOTE(review): the return statements are not visible in this excerpt.
4280 Suma::check_switchover(Uint32 bucket, Uint64 gci)
4282 const Uint32 send_mask =
4283 Bucket::BUCKET_STARTING |
4284 Bucket::BUCKET_TAKEOVER |
4285 Bucket::BUCKET_SHUTDOWN_TO;
4287 bool send = c_buckets[bucket].m_state & send_mask;
// Callers are expected to pass only buckets flagged in m_switchover_buckets.
4288 ndbassert(m_switchover_buckets.
get(bucket));
// Unlikely case: `gci` is already past the bucket's switchover GCI.
4289 if(unlikely(gci > c_buckets[bucket].m_switchover_gci))
// Tail of a helper whose header is not visible here; its parameters match
// the reformat(...) call made in execFIRE_TRIG_ORD.
// It splits the attribute stream in `src_1` into headers and data placed in
// the signal's own data area, and returns the number of sections to send.
4299 Uint32 * src_1, Uint32 sz_1,
4300 Uint32 * src_2, Uint32 sz_2)
4302 Uint32 noOfAttrs = 0, dataLen = 0;
// Headers start at word 25 of theData; data follows MAX_ATTRIBUTES_IN_TABLE
// words later.
4303 Uint32 * headers = signal->theData + 25;
4304 Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
// Per attribute: read the header word, then copy its data words.
4310 Uint32 tmp = * src_1 ++;
4312 Uint32 len = AttributeHeader::getDataSize(tmp);
4313 memcpy(dst, src_1, 4 * len);
4323 ptr[0].sz = noOfAttrs;
4324 ptr[1].sz = dataLen;
// Three sections when `src_2` is non-empty, otherwise two.
4329 return sz_2 > 0 ? 3 : 2;
// Body of a signal handler whose header is not visible in this excerpt.
// It receives a page id and a length in theData[0..1], then unpacks trigger
// messages stored on that page into signal->theData, f_buffer and b_buffer.
// NOTE(review): the loop structure and pointer advances between the copies
// are not visible here.
4341 ndbassert(signal->getNoOfSections() == 0);
4342 Uint32 pageId = signal->theData[0];
4343 Uint32 len = signal->theData[1];
// RNIL page id with zero length leads to out_of_buffer().
4345 if (pageId == RNIL && len == 0)
4351 out_of_buffer(signal);
4355 Uint32 * ptr =
reinterpret_cast<Uint32*
>(c_page_pool.
getPtr(pageId));
// Each message starts with five length words: total message length, signal
// length, and the lengths of sections 0, 1 and 2.
4358 Uint32 * save = ptr;
4359 Uint32 msglen = * ptr++;
4360 Uint32 siglen = * ptr++;
4361 Uint32 sec0len = * ptr++;
4362 Uint32 sec1len = * ptr++;
4363 Uint32 sec2len = * ptr++;
4368 Uint32 trigId = ((
FireTrigOrd*)ptr)->getTriggerId();
4369 memcpy(signal->theData, ptr, 4 * siglen);
// Section 0 then section 2 are placed back to back in f_buffer; section 1
// goes to b_buffer.
4371 memcpy(f_buffer, ptr, 4*sec0len);
4373 memcpy(b_buffer, ptr, 4*sec1len);
4375 memcpy(f_buffer + sec0len, ptr, 4*sec2len);
4378 f_trigBufferSize = sec0len + sec2len;
4379 b_trigBufferSize = sec1len;
// Both buffers are marked as owned by this trigger.
4380 f_bufferLock = trigId;
4381 b_bufferLock = trigId;
// Consistency checks: exactly msglen words consumed, and the message fits in
// the remaining length.
4385 ndbrequire(ptr == save + msglen);
4386 ndbrequire(len >= msglen);
4390 m_ctx.m_mm.release_page(RT_DBTUP_PAGE, pageId);
// Body of Suma::execFIRE_TRIG_ORD (name taken from the DBUG_ENTER string; the
// function header and several declarations are not visible here).
// Takes a fired trigger and either sends its row change to subscribers
// as GSN_SUB_TABLE_DATA, or stores it in the bucket's buffer.
4397 DBUG_ENTER(
"Suma::execFIRE_TRIG_ORD");
4399 CRASH_INSERTION(13016);
4401 const Uint32 trigId = trg->getTriggerId();
4402 const Uint32 hashValue = trg->getHashValue();
4403 const Uint32 gci_hi = trg->getGCI();
4404 const Uint32 gci_lo = trg->m_gci_lo;
// 64-bit GCI: high word in the upper 32 bits, low word in the lower 32 bits.
4405 const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
4406 const Uint32
event = trg->getTriggerEvent();
4407 const Uint32 any_value = trg->getAnyValue();
4408 const Uint32 transId1 = trg->m_transId1;
4409 const Uint32 transId2 = trg->m_transId2;
// The subscription index is the low 16 bits of the trigger id.
4412 c_subscriptionPool.
getPtr(subPtr, trigId & 0xFFFF);
4414 ndbassert(gci > m_last_complete_gci);
// With sections attached, the trigger data arrives in the signal itself:
// sections 0 and 2 are concatenated into f_buffer, section 1 goes to b_buffer.
// Both buffers must be unclaimed beforehand.
4416 if (signal->getNoOfSections())
4419 ndbassert(isNdbMtLqh());
4422 ndbrequire(b_bufferLock == 0);
4423 ndbrequire(f_bufferLock == 0);
4424 f_bufferLock = trigId;
4425 b_bufferLock = trigId;
4428 handle.getSection(ptr, 0);
4430 copy(f_buffer, ptr);
4432 handle.getSection(ptr, 2);
4433 copy(f_buffer + sz, ptr);
4434 f_trigBufferSize = sz + ptr.sz;
4436 handle.getSection(ptr, 1);
4437 copy(b_buffer, ptr);
4438 b_trigBufferSize = ptr.sz;
4439 releaseSections(handle);
// Otherwise the buffers must already be owned by this trigger.
4443 ndbrequire(f_bufferLock == trigId);
4451 Uint32 schemaVersion =
4452 c_tablePool.
getPtr(subPtr.p->m_table_ptrI)->m_schemaVersion;
// The bucket is chosen from the hash value; track the highest GCI seen.
4454 Uint32 bucket= hashValue % c_no_of_buckets;
4455 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
// Send path: the bucket is active, or is in switchover and
// check_switchover() approves this GCI.
4456 if(m_active_buckets.
get(bucket) ||
4457 (m_switchover_buckets.
get(bucket) && (check_switchover(bucket, gci))))
4459 m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci);
// The staged size must equal primary-key words plus after-value words.
4460 Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
4461 ndbrequire(sz == f_trigBufferSize);
4464 const Uint32 nptr= reformat(signal, ptr,
4465 f_buffer, f_trigBufferSize,
4466 b_buffer, b_trigBufferSize);
4468 for(Uint32 i =0; i < nptr; i++)
4474 data->gci_hi = gci_hi;
4475 data->gci_lo = gci_lo;
4476 data->tableId = tableId;
4477 data->requestInfo = 0;
4478 SubTableData::setOperation(data->requestInfo, event);
4480 data->anyValue = any_value;
4481 data->totalLen = ptrLen;
4482 data->transId1 = transId1;
4483 data->transId2 = transId2;
// One signal per subscriber in `list`, differing only in senderData.
4488 for(list.
first(subbPtr); !subbPtr.isNull(); list.
next(subbPtr))
4490 data->senderData = subbPtr.p->m_senderData;
4491 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4492 SubTableData::SignalLengthWithTransId, JBB, ptr, nptr);
// Buffer path: store a record of `buffer_header_sz` header words followed by
// the f_buffer and b_buffer contents.
// NOTE(review): only three of the header words are visible in this excerpt.
4498 const uint buffer_header_sz = 6;
4500 Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz;
4501 if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
4504 * dst++ = schemaVersion;
4505 * dst++ = (
event << 16) | f_trigBufferSize;
4506 * dst++ = any_value;
4509 memcpy(dst, f_buffer, f_trigBufferSize << 2);
4510 dst += f_trigBufferSize;
4511 memcpy(dst, b_buffer, b_trigBufferSize << 2);
// Checks the list of unacknowledged epochs (c_gcp_list) against
// c_maxBufferedEpochs. For each node flagged in `subs`, an event report and
// an API_FAILREQ are sent.
// NOTE(review): the declarations of `gcp` and `subs` and the early-return
// statements are not visible in this excerpt.
4519 Suma::checkMaxBufferedEpochs(
Signal *signal)
4526 if (c_gcp_list.isEmpty())
4531 c_gcp_list.
first(gcp);
// Error insert 13037 simulates exceeding the limit, regardless of the count.
4532 if (ERROR_INSERTED(13037))
4535 CLEAR_ERROR_INSERT_VALUE;
4536 ndbout_c(
"Simulating exceeding the MaxBufferedEpochs %u(%llu,%llu,%llu)",
4537 c_maxBufferedEpochs, m_max_seen_gci,
4538 m_last_complete_gci, gcp.p->m_gci);
// Normal case: the number of buffered epochs is still below the limit.
4540 else if (c_gcp_list.count() < c_maxBufferedEpochs)
4547 ndbout_c(
"Found lagging epoch %llu", gcp.p->m_gci);
4548 for(Uint32 nodeId = 0; nodeId < MAX_NODES; nodeId++)
4550 if (subs.
get(nodeId))
// Event report: node id, epoch (low word, then high word), buffered epoch
// count and the configured maximum.
// NOTE(review): the signal is sent with length 8 although only words 1-6
// are visibly assigned here - confirm against the full source.
4556 signal->theData[1] = 1;
4557 signal->theData[2] = nodeId;
4558 signal->theData[3] = (Uint32) gcp.p->m_gci;
4559 signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
4560 signal->theData[5] = (Uint32) c_gcp_list.count();
4561 signal->theData[6] = c_maxBufferedEpochs;
4562 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 8, JBB);
// Ask QMGR to treat the lagging API node as failed.
4567 signal->theData[0] = nodeId;
4568 sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA);
// Body of the SUB_GCP_COMPLETE_REP handler (name taken from the debug printf
// below; the function header and several declarations are not visible here).
// Visible steps, in order:
//   1. with multi-threaded LQH, count reports per GCI;
//   2. check that GCIs arrive in sequence;
//   3. finish bucket switchovers whose switchover GCI has passed;
//   4. forward the report to subscribers and record the epoch in c_gcp_list;
//   5. mark the epoch in the bucket buffers and reset per-epoch state.
4577 ndbassert(signal->getNoOfSections() == 0);
4580 Uint32 gci_hi = rep->gci_hi;
4581 Uint32 gci_lo = rep->gci_lo;
4582 Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
// --- 1. Multi-threaded LQH: reports for a GCI are counted in the
// m_gcp_rep_counter ring (indices min..max, modulo its size) and compared
// against m_gcp_rep_cnt.
4584 if (isNdbMtLqh() && m_gcp_rep_cnt > 1)
4590 printf(
"execSUB_GCP_COMPLETE_REP(%u/%u)", gci_hi, gci_lo);
4592 Uint32 min = m_min_gcp_rep_counter_index;
4593 Uint32 sz = NDB_ARRAY_SIZE(m_gcp_rep_counter);
4594 for (Uint32 i = min; i != m_max_gcp_rep_counter_index; i = (i + 1) % sz)
4597 if (m_gcp_rep_counter[i].m_gci == gci)
4600 m_gcp_rep_counter[
i].m_cnt ++;
4601 if (m_gcp_rep_counter[i].m_cnt == m_gcp_rep_cnt)
// Count complete: overwrite slot i with the oldest slot and advance `min`.
4610 m_gcp_rep_counter[
i] = m_gcp_rep_counter[min];
4612 m_min_gcp_rep_counter_index = (min + 1) % sz;
4614 ndbout_c(
" found - complete after: (min: %u max: %u)",
4615 m_min_gcp_rep_counter_index,
4616 m_max_gcp_rep_counter_index);
4623 ndbout_c(
" found - wait unchanged: (min: %u max: %u)",
4624 m_min_gcp_rep_counter_index,
4625 m_max_gcp_rep_counter_index);
// GCI not in the ring yet: add a new slot with count 1. The ring must not
// become full.
4633 Uint32 next = (m_max_gcp_rep_counter_index + 1) % sz;
4634 ndbrequire(next != min);
4635 m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_gci = gci;
4636 m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_cnt = 1;
4637 m_max_gcp_rep_counter_index = next;
4639 ndbout_c(
" new - after: (min: %u max: %u)",
4640 m_min_gcp_rep_counter_index,
4641 m_max_gcp_rep_counter_index);
// Flags forwarded to subscribers; MISSING_DATA is added while m_missing_data
// is set.
4646 Uint32
flags = (m_missing_data)
4647 ? rep->flags | SubGcpCompleteRep::MISSING_DATA
4650 if (ERROR_INSERTED(13036))
4653 CLEAR_ERROR_INSERT_VALUE;
4654 ndbout_c(
"Simulating out of event buffer at node failure");
4655 flags |= SubGcpCompleteRep::MISSING_DATA;
// --- 2. Sequence check against the previously seen GCI (m_gcp_monitor):
// either the same high word with the low word + 1, or the next high word
// with low word 0.
4659 if (m_gcp_monitor == 0)
4662 else if (gci_hi == Uint32(m_gcp_monitor >> 32))
4664 ndbrequire(gci_lo == Uint32(m_gcp_monitor) + 1);
4668 ndbrequire(gci_hi == Uint32(m_gcp_monitor >> 32) + 1);
4669 ndbrequire(gci_lo == 0);
4671 m_gcp_monitor = gci;
4674 m_last_complete_gci = gci;
4675 checkMaxBufferedEpochs(signal);
4676 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
// --- 3. Finish switchovers: for every bucket in m_switchover_buckets whose
// switchover GCI has passed, clear it from the set and act on its state bit.
4681 if(!m_switchover_buckets.
isclear())
4683 bool unlock =
false;
4684 Uint32 i = m_switchover_buckets.
find(0);
4685 for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.
find(i + 1))
4687 if(gci > c_buckets[i].m_switchover_gci)
4689 Uint32 state = c_buckets[
i].m_state;
4690 m_switchover_buckets.
clear(i);
4691 printf(
"%u/%u (%u/%u) switchover complete bucket %d state: %x",
4694 Uint32(c_buckets[i].m_switchover_gci >> 32),
4695 Uint32(c_buckets[i].m_switchover_gci),
// STARTING: the bucket becomes active on this node.
4698 if(state & Bucket::BUCKET_STARTING)
4704 m_active_buckets.
set(i);
4705 c_buckets[
i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING;
4706 ndbout_c(
"starting");
4707 m_gcp_complete_rep_count++;
// TAKEOVER: close the current buffer page (record max GCI and words used),
// reset the buffer head, then activate the bucket.
4710 else if(state & Bucket::BUCKET_TAKEOVER)
4716 Bucket* bucket= c_buckets +
i;
4717 Page_pos pos= bucket->m_buffer_head;
4718 ndbrequire(pos.m_max_gci < gci);
4720 Buffer_page*
page= c_page_pool.
getPtr(pos.m_page_id);
4721 ndbout_c(
"takeover %d", pos.m_page_id);
4722 page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
4723 page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
4724 ndbassert(pos.m_max_gci != 0);
4725 page->m_words_used = pos.m_page_pos;
4726 page->m_next_page = RNIL;
4727 memset(&bucket->m_buffer_head, 0,
sizeof(bucket->m_buffer_head));
4728 bucket->m_buffer_head.m_page_id = RNIL;
4729 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
4731 m_active_buckets.
set(i);
4732 m_gcp_complete_rep_count++;
4733 c_buckets[
i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER;
// HANDOVER: this node stops counting the bucket.
4735 else if (state & Bucket::BUCKET_HANDOVER)
4741 c_buckets[
i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER;
4742 m_gcp_complete_rep_count--;
4743 ndbout_c(
"handover");
// CREATED: a count taken from the state bits above bit 8 is reported to
// subscribers through ADD_CNT; the bucket is activated if it was created by
// this node and this node is responsible for it.
4745 else if (state & Bucket::BUCKET_CREATED_MASK)
4748 Uint32 cnt = state >> 8;
4749 Uint32 mask = Uint32(Bucket::BUCKET_CREATED_MASK) | (cnt << 8);
4750 c_buckets[
i].m_state &= ~mask;
4751 flags |= SubGcpCompleteRep::ADD_CNT;
4752 flags |= (cnt << 16);
4753 ndbout_c(
"add %u %s", cnt,
4754 state & Bucket::BUCKET_CREATED_SELF ?
"self" :
"other");
4755 if (state & Bucket::BUCKET_CREATED_SELF &&
4756 get_responsible_node(i) == getOwnNodeId())
4759 m_active_buckets.
set(i);
4760 m_gcp_complete_rep_count++;
// DROPPED: same encoding, reported through SUB_CNT; the bucket is
// deactivated if it was dropped by this node.
4763 else if (state & Bucket::BUCKET_DROPPED_MASK)
4766 Uint32 cnt = state >> 8;
4767 Uint32 mask = Uint32(Bucket::BUCKET_DROPPED_MASK) | (cnt << 8);
4768 c_buckets[
i].m_state &= ~mask;
4769 flags |= SubGcpCompleteRep::SUB_CNT;
4770 flags |= (cnt << 16);
4771 ndbout_c(
"sub %u %s", cnt,
4772 state & Bucket::BUCKET_DROPPED_SELF ?
"self" :
"other");
4773 if (state & Bucket::BUCKET_DROPPED_SELF)
4775 m_active_buckets.
clear(i);
// SHUTDOWN: this node hands the bucket away while shutting down.
4779 else if (state & Bucket::BUCKET_SHUTDOWN)
4782 Uint32 nodeId = c_buckets[
i].m_switchover_node;
4783 ndbrequire(nodeId == getOwnNodeId());
4784 m_active_buckets.
clear(i);
4785 m_gcp_complete_rep_count--;
4786 ndbout_c(
"shutdown handover");
4787 c_buckets[
i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN;
// SHUTDOWN_TO: this node takes the bucket over from a node that is shutting
// down; it must be the responsible node once that node is excluded.
4789 else if (state & Bucket::BUCKET_SHUTDOWN_TO)
4792 Uint32 nodeId = c_buckets[
i].m_switchover_node;
4794 nodegroup.
clear(nodeId);
4795 ndbrequire(get_responsible_node(i) == nodeId &&
4796 get_responsible_node(i, nodegroup) == getOwnNodeId());
4797 m_active_buckets.
set(i);
4798 m_gcp_complete_rep_count++;
4799 ndbout_c(
"shutdown takover");
4800 c_buckets[
i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN_TO;
// Once no bucket is left in switchover: complete startup (STTORRY) or a
// pending shutdown (STOP_ME_CONF), and release the DICT lock.
// NOTE(review): the conditions selecting between these are only partly
// visible in this excerpt.
4805 if (m_switchover_buckets.
isclear())
4809 c_startup.m_handover_nodes.isclear())
4812 sendSTTORRY(signal);
4817 ndbrequire(c_shutdown.m_wait_handover);
4819 conf->senderData = c_shutdown.m_senderData;
4820 conf->senderRef = reference();
4821 sendSignal(c_shutdown.m_senderRef, GSN_STOP_ME_CONF, signal,
4822 StopMeConf::SignalLength, JBB);
4823 c_shutdown.m_wait_handover =
false;
4831 send_dict_unlock_ord(signal, DictLockReq::SumaHandOver);
// --- 4. Forward the report. Error insert 13010 suppresses the send.
4835 if(ERROR_INSERTED(13010))
4837 CLEAR_ERROR_INSERT_VALUE;
4838 ndbout_c(
"Don't send GCP_COMPLETE_REP(%llu)", gci);
4845 rep->gci_hi = gci_hi;
4846 rep->gci_lo = gci_lo;
4848 rep->senderRef = reference();
4849 rep->gcp_complete_rep_count = m_gcp_complete_rep_count;
// Only sent when this node reports for at least one bucket and there are
// subscriber nodes.
4851 if(m_gcp_complete_rep_count && !c_subscriber_nodes.
isclear())
4853 CRASH_INSERTION(13033);
4856 sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
4857 SubGcpCompleteRep::SignalLength, JBB);
// Record the epoch with the current subscriber set; a failed seize is logged.
4860 if(c_gcp_list.seize(gcp))
4863 gcp.p->m_subscribers = c_subscriber_nodes;
4868 c_subscriber_nodes.
getText(buf);
4869 g_eventLogger->
error(
"c_gcp_list.seize() failed: gci: %llu nodes: %s",
// --- 5. For every active bucket with subscribers (or flagged RESEND), call
// get_buffer_ptr() with size 0 for this GCI.
4877 bool subscribers = !c_subscriber_nodes.
isclear();
4878 for(Uint32 i = 0; i<c_no_of_buckets; i++)
4880 if(m_active_buckets.
get(i))
4883 if (subscribers || (c_buckets[i].m_state & Bucket::BUCKET_RESEND))
4886 get_buffer_ptr(signal, i, gci, 0);
// The out-of-buffer condition ends once a later GCI completes.
4890 if(m_out_of_buffer_gci && gci > m_out_of_buffer_gci)
4894 m_out_of_buffer_gci = 0;
4895 m_missing_data =
false;
4901 m_gcp_complete_rep_count = 0;
4903 c_nodes_in_nodegroup_mask.
clear();
// Handler for CREATE_TAB_CONF. Only the debug-entry trace is visible in this
// excerpt.
4909 Suma::execCREATE_TAB_CONF(
Signal *signal)
4912 DBUG_ENTER(
"Suma::execCREATE_TAB_CONF");
// Body of a table-drop handler (header not visible; the debug print and the
// _TE_DROP event suggest DROP_TAB_CONF - confirm).
// Marks the table DROPPED and sends a _TE_DROP event to the subscribers of
// its TableEvent subscriptions.
4921 ndbassert(signal->getNoOfSections() == 0);
4924 Uint32 senderRef= conf->senderRef;
4925 Uint32 tableId= conf->tableId;
4934 DBUG_PRINT(
"info",(
"drop table id: %d[i=%u]", tableId, tabPtr.i));
// Remember the previous state before marking the table as dropped.
4935 const Table::State old_state = tabPtr.p->m_state;
4936 tabPtr.p->m_state = Table::DROPPED;
// Prepare the _TE_DROP event, stamped with the current GCI and the node of
// the sender.
4945 const Uint64 gci = get_current_gci(signal);
4947 data->gci_hi = Uint32(gci >> 32);
4948 data->gci_lo = Uint32(gci);
4949 data->tableId = tableId;
4950 data->requestInfo = 0;
4951 SubTableData::setOperation(data->requestInfo,
4952 NdbDictionary::Event::_TE_DROP);
4953 SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
4957 tabPtr.p->m_subscriptions);
// Walk the table's subscriptions, testing the subscription type and the
// NO_REPORT_DDL option.
// NOTE(review): the statements guarded by these tests are not visible here;
// presumably such subscriptions are skipped - confirm.
4959 for (subList.
first(subPtr); !subPtr.isNull(); subList.
next(subPtr))
4962 if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
4970 if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
// One signal per subscriber of the subscription.
4978 for(list.
first(ptr); !ptr.isNull(); list.
next(ptr))
4981 data->senderData= ptr.p->m_senderData;
4982 sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4983 SubTableData::SignalLength, JBB);
4988 if (old_state == Table::DEFINING)
// A table without subscriptions is released.
4994 if (tabPtr.p->m_subscriptions.isEmpty())
4997 tabPtr.p->release(*
this);
// Otherwise the subscriptions are walked once more.
// NOTE(review): the loop body is not visible in this excerpt.
5010 tabPtr.p->m_subscriptions);
5011 subList.
first(subPtr);
5013 while (!subPtr.isNull())
5018 tabPtr.p->m_subscriptions);
5019 subList.
next(subPtr);
// Body of a table-alter handler (header not visible; the _TE_ALTER event
// suggests ALTER_TAB_REQ - confirm).
// Copies the table definition from section 0 into b_dti_buf and sends it as a
// fragmented _TE_ALTER event to the subscribers of the table's TableEvent
// subscriptions.
5036 Uint32 senderRef= req->senderRef;
5037 Uint32 tableId= req->tableId;
5038 Uint32 changeMask= req->changeMask;
5044 handle.getSection(tabInfoPtr, 0);
// NOTE(review): the conditions leading to these two early section releases
// are not visible in this excerpt.
5049 releaseSections(handle);
5056 releaseSections(handle);
5062 ndbout_c(
"DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz);
5064 getSectionSegmentPool());
// Copy the section into b_dti_buf so it can be re-sent after the incoming
// sections are released.
5067 copy(b_dti_buf, tabInfoPtr);
5068 releaseSections(handle);
5071 lptr[0].p = b_dti_buf;
5072 lptr[0].sz = tabInfoPtr.sz;
// Prepare the _TE_ALTER event, stamped with the current GCI and the node of
// the sender.
5074 const Uint64 gci = get_current_gci(signal);
5076 data->gci_hi = Uint32(gci >> 32);
5077 data->gci_lo = Uint32(gci);
5078 data->tableId = tableId;
5079 data->requestInfo = 0;
5080 SubTableData::setOperation(data->requestInfo,
5081 NdbDictionary::Event::_TE_ALTER);
5082 SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
5084 data->changeMask = changeMask;
5085 data->totalLen = tabInfoPtr.sz;
5088 tabPtr.p->m_subscriptions);
// Walk the table's subscriptions, testing the subscription type and the
// NO_REPORT_DDL option.
// NOTE(review): the statements guarded by these tests are not visible here;
// presumably such subscriptions are skipped - confirm.
5090 for (subList.
first(subPtr); !subPtr.isNull(); subList.
next(subPtr))
5092 if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
5100 if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
// One fragmented signal per subscriber, carrying the table definition as a
// single section.
5108 for(list.
first(ptr); !ptr.isNull(); list.
next(ptr))
5111 data->senderData= ptr.p->m_senderData;
5113 sendFragmentedSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
5114 SubTableData::SignalLength, JBB, lptr, 1, c);
// Handles SUB_GCP_COMPLETE_ACK. An ack from another SUMA block leads to
// release_gci() on the matching buckets. Any other ack clears the sending
// node from the epoch's pending subscriber set.
// NOTE(review): the declarations of `ack`, `gcp` and `rg` and several
// return statements are not visible in this excerpt.
5120 Suma::execSUB_GCP_COMPLETE_ACK(
Signal* signal)
5123 ndbassert(signal->getNoOfSections() == 0);
5126 Uint32 gci_hi = ack->rep.gci_hi;
5127 Uint32 gci_lo = ack->rep.gci_lo;
5128 Uint32 senderRef = ack->rep.senderRef;
// A signal shorter than the full length is only expected from a node version
// for which ndb_check_micro_gcp() is false.
5129 if (unlikely(signal->getLength() < SubGcpCompleteAck::SignalLength))
5132 ndbassert(!ndb_check_micro_gcp(
getNodeInfo(refToNode(senderRef)).m_version));
5136 Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
5137 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
5139 if (ERROR_INSERTED(13037))
5142 ndbout_c(
"Simulating exceeding the MaxBufferedEpochs, ignoring ack");
// Ack from another SUMA block: call release_gci() for this GCI on buckets
// that are active here, in an approved switchover, or (when not in
// switchover) owned by the sending node.
5146 if (refToBlock(senderRef) == SUMA)
5151 Uint32 nodeId= refToNode(senderRef);
5152 for(Uint32 i = 0; i<c_no_of_buckets; i++)
5154 if(m_active_buckets.
get(i) ||
5155 (m_switchover_buckets.
get(i) && (check_switchover(i, gci))) ||
5156 (!m_switchover_buckets.
get(i) && get_responsible_node(i) == nodeId))
5158 release_gci(signal, i, gci);
// Ack from any other block.
5166 Uint32 nodeId = refToNode(senderRef);
5167 if (ERROR_INSERTED(13023))
5169 ndbout_c(
"Throwing SUB_GCP_COMPLETE_ACK gci: %u/%u from %u",
5170 Uint32(gci>>32), Uint32(gci), nodeId);
// Find the epoch record, clear the acknowledging node and mask the set with
// the current subscriber nodes; a non-empty result means acks are pending.
5177 for(c_gcp_list.
first(gcp); !gcp.isNull(); c_gcp_list.
next(gcp))
5179 if(gcp.p->m_gci == gci)
5181 gcp.p->m_subscribers.clear(nodeId);
5182 gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
5183 if(!gcp.p->m_subscribers.isclear())
// No record found for this GCI: log a warning.
5194 g_eventLogger->
warning(
"ACK wo/ gcp record (gci: %u/%u) ref: %.8x from: %.8x",
5195 Uint32(gci >> 32), Uint32(gci),
5196 senderRef, signal->getSendersBlockRef());
5200 c_gcp_list.release(gcp);
// Redistribute the ack with this block as sender. Error insert 13012
// suppresses the redistribution.
5203 CRASH_INSERTION(13011);
5204 if(ERROR_INSERTED(13012))
5206 CLEAR_ERROR_INSERT_VALUE;
5207 ndbout_c(
"Don't redistribute SUB_GCP_COMPLETE_ACK");
5211 ack->rep.senderRef = reference();
5213 sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
5214 SubGcpCompleteAck::SignalLength, JBB);
// Body of Suma::execSUB_REMOVE_REQ (name taken from the DBUG_ENTER string; the
// function header is not visible here).
// Looks up the subscription named in the request, rejects the request with
// SUB_REMOVE_REF in the error cases below, otherwise marks the subscription
// MARKED_DROPPED and replies with SUB_REMOVE_CONF.
5227 DBUG_ENTER(
"Suma::execSUB_REMOVE_REQ");
5229 CRASH_INSERTION(13021);
5234 key.m_subscriptionId = req.subscriptionId;
5235 key.m_subscriptionKey = req.subscriptionKey;
// Rejected with NotStarted when no restart server node id is set.
5237 if (c_startup.m_restart_server_node_id == RNIL)
5244 sendSubRemoveRef(signal, req, SubRemoveRef::NotStarted);
5248 bool found = c_subscriptions.
find(subPtr, key);
5253 sendSubRemoveRef(signal, req, SubRemoveRef::NoSuchSubscription);
// DEFINING is rejected; DEFINED is rejected if already marked dropped.
5257 switch(subPtr.p->m_state){
5258 case Subscription::UNDEFINED:
5261 case Subscription::DEFINING:
5263 sendSubRemoveRef(signal, req, SubRemoveRef::Defining);
5265 case Subscription::DEFINED:
5266 if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
5272 sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped);
5278 subPtr.p->m_options |= Subscription::MARKED_DROPPED;
5282 conf->senderRef = reference();
5283 conf->senderData = req.senderData;
5284 conf->subscriptionId = req.subscriptionId;
5285 conf->subscriptionKey = req.subscriptionKey;
5287 sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal,
5288 SubRemoveConf::SignalLength, JBB);
// NOTE(review): the lines below appear to belong to follow-on release logic
// (possibly separate functions whose headers are not visible here).
// They test whether the subscription still has subscribers or pending
// start/stop requests.
5295 if (!subPtr.p->m_subscribers.isEmpty())
5301 if (!subPtr.p->m_start_req.isEmpty())
5307 if (!subPtr.p->m_stop_req.isEmpty())
// Triggers in state T_DEFINED are moved to T_DROPPING and dropped.
5313 switch(subPtr.p->m_trigger_state){
5314 case Subscription::T_UNDEFINED:
5317 case Subscription::T_CREATING:
5323 case Subscription::T_DEFINED:
5325 subPtr.p->m_trigger_state = Subscription::T_DROPPING;
5326 drop_triggers(signal, subPtr);
5328 case Subscription::T_DROPPING:
5334 case Subscription::T_ERROR:
// A subscription on a dropped table is treated as marked dropped.
5347 if (tabPtr.p->m_state == Table::DROPPED)
5350 subPtr.p->m_options |= Subscription::MARKED_DROPPED;
5353 if ((subPtr.p->m_options & Subscription::MARKED_DROPPED) == 0)
5361 tabPtr.p->m_subscriptions);
// If the table has no subscriptions left, it is released when its state is
// DROPPED.
// NOTE(review): the handling of the other states is not visible here.
5365 if (tabPtr.p->m_subscriptions.isEmpty())
5368 switch(tabPtr.p->m_state){
5369 case Table::UNDEFINED:
5371 case Table::DEFINING:
5373 case Table::DEFINED:
5377 case Table::DROPPED:
5379 tabPtr.p->release(*
this);
5384 c_subscriptions.
release(subPtr);
// Sends a SUB_REMOVE_REF that echoes the request's sender data, subscription
// id and key together with `errCode`. The reply goes to the sender of the
// signal currently being processed.
// NOTE(review): the remaining parameters and the declaration of `ref` are not
// visible in this excerpt.
5388 Suma::sendSubRemoveRef(
Signal* signal,
5393 DBUG_ENTER(
"Suma::sendSubRemoveRef");
5395 ref->senderRef = reference();
5396 ref->senderData = req.senderData;
5397 ref->subscriptionId = req.subscriptionId;
5398 ref->subscriptionKey = req.subscriptionKey;
5399 ref->errorCode = errCode;
5400 sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF,
5401 signal, SubRemoveRef::SignalLength, JBB);
// Resets the table record's state to UNDEFINED.
// NOTE(review): any other statements of this function are not visible here.
5406 Suma::Table::release(
Suma & suma){
5409 m_state = UNDEFINED;
// NOTE(review): only the header of SyncRecord::release() is visible in this
// excerpt; its body cannot be documented from here.
5413 Suma::SyncRecord::release(){
// Body of a handler that answers with SUMA_START_ME_REF on failure (header
// not visible; presumably SUMA_START_ME_REQ - confirm).
// It rejects the request in three cases, otherwise records the requester in
// c_restart and starts iterating over c_subscriptions.
5441 Uint32 retref = signal->getSendersBlockRef();
// A restart is already being served: reply Busy.
5442 if (c_restart.m_ref)
5446 ref->errorCode = SumaStartMeRef::Busy;
5447 sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5448 SumaStartMeRef::SignalLength, JBB);
// NOTE(review): the condition guarding this NotStarted reply is not visible.
5456 ref->errorCode = SumaStartMeRef::NotStarted;
5457 sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5458 SumaStartMeRef::SignalLength, JBB);
// No free operation record: reply Busy.
5463 if (c_subOpPool.
seize(subOpPtr) ==
false)
5467 ref->errorCode = SumaStartMeRef::Busy;
5468 sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5469 SumaStartMeRef::SignalLength, JBB);
5473 subOpPtr.p->m_opType = SubOpRecord::R_START_ME_REQ;
// Initialise the restart state for this requester.
5475 c_restart.m_abort = 0;
5476 c_restart.m_waiting_on_self = 0;
5477 c_restart.m_ref = retref;
5478 c_restart.m_max_seq = c_current_seq;
5479 c_restart.m_subOpPtrI = subOpPtr.i;
5482 if (c_subscriptions.
first(it))
// Body of a routine (header not visible here) that copies one subscription to
// the restarting node recorded in c_restart. It sends SUMA_START_ME_CONF once
// no subscription is left.
5503 c_subOpPool.
getPtr(subOpPtr, c_restart.m_subOpPtrI);
// A subscription remains: remember it and the iterator bucket.
5506 if (!subPtr.isNull())
5509 c_restart.m_subPtrI = subPtr.i;
5510 c_restart.m_bucket = it.bucket;
5513 bool empty = list.isEmpty();
5522 c_restart.m_waiting_on_self = 1;
// No subscription left: confirm to the requester, release the operation
// record and clear the restart reference.
5533 sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_CONF, signal,
5534 SumaStartMeConf::SignalLength, JBB);
5536 c_subOpPool.
release(subOpPtr);
5537 c_restart.m_ref = 0;
// An abort was requested while waiting.
5547 if (c_restart.m_abort)
5550 abort_start_me(signal, subPtr,
true);
5554 c_restart.m_waiting_on_self = 0;
// Build the SUB_CREATE_REQ for the restarting node; subscription options are
// translated to the matching SubCreateReq flags.
5556 req->senderRef = reference();
5557 req->senderData = subPtr.i;
5558 req->subscriptionId = subPtr.p->m_subscriptionId;
5559 req->subscriptionKey = subPtr.p->m_subscriptionKey;
5560 req->subscriptionType = subPtr.p->m_subscriptionType;
5562 req->schemaTransId = 0;
5564 if (subPtr.p->m_options & Subscription::REPORT_ALL)
5566 req->subscriptionType |= SubCreateReq::ReportAll;
5569 if (subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)
5571 req->subscriptionType |= SubCreateReq::ReportSubscribe;
5574 if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
5576 req->subscriptionType |= SubCreateReq::NoReportDDL;
5579 if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
5581 req->subscriptionType |= SubCreateReq::NR_Sub_Dropped;
5582 ndbout_c(
"copying dropped sub: %u", subPtr.i);
// Subscriptions whose table is not dropped are sent to the restarting node.
5586 c_tablePool.
getPtr(tabPtr, subPtr.p->m_table_ptrI);
5587 if (tabPtr.p->m_state != Table::DROPPED)
5590 c_restart.m_waiting_on_self = 0;
5604 sendSignal(c_restart.m_ref, GSN_SUB_CREATE_REQ, signal,
5605 SubCreateReq::SignalLength, JBB);
// Subscriptions on a dropped table are not copied; a SUB_CREATE_CONF is sent
// to this block itself instead.
5610 ndbout_c(
"not copying sub %u with dropped table: %u/%u",
5612 tabPtr.p->m_tableId, tabPtr.i);
5614 c_restart.m_waiting_on_self = 1;
5616 conf->senderRef = reference();
5617 conf->senderData = subPtr.i;
5618 sendSignal(reference(), GSN_SUB_CREATE_CONF, signal,
5619 SubCreateConf::SignalLength, JBB);
// Body of an error handler (header not visible; presumably SUB_CREATE_REF
// during restart copying - confirm).
// Forwards the error code to the restarting node as SUMA_START_ME_REF and
// aborts the copy for the current subscription.
5629 Uint32 error= ref->errorCode;
5633 ref->errorCode = error;
5634 sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
5635 SumaStartMeRef::SignalLength, JBB);
5639 c_subscriptionPool.
getPtr(subPtr, c_restart.m_subPtrI);
5640 abort_start_me(signal, subPtr,
true);
// Body of a confirmation handler (header not visible; presumably
// SUB_CREATE_CONF during restart copying - confirm).
// Continues by copying the subscribers of the current subscription, unless an
// abort was requested.
5652 c_subscriptionPool.
getPtr(subPtr, c_restart.m_subPtrI);
5654 c_restart.m_waiting_on_self = 0;
5660 if (c_restart.m_abort)
5663 abort_start_me(signal, subPtr,
true);
5668 c_tablePool.
getPtr(tabPtr, subPtr.p->m_table_ptrI);
// NOTE(review): the statements guarded by this test are not visible here.
// The trace below shows subscribers are not copied for a dropped table.
5671 if (tabPtr.p->m_state != Table::DROPPED)
5681 ndbout_c(
"not copying subscribers on sub: %u with dropped table %u/%u",
5682 subPtr.i, tabPtr.p->m_tableId, tabPtr.i);
5685 copySubscriber(signal, subPtr, ptr);
// Sends a SUB_START_REQ describing subscriber `ptr` of subscription `subPtr`
// to the restarting node. The trailing lines run the remove-queue check and
// advance the subscription iterator.
// NOTE(review): the remaining parameters and the branch structure are not
// visible in this excerpt.
5689 Suma::copySubscriber(
Signal* signal,
5698 req->senderRef = reference();
5699 req->senderData = ptr.i;
5700 req->subscriptionId = subPtr.p->m_subscriptionId;
5701 req->subscriptionKey = subPtr.p->m_subscriptionKey;
5702 req->part = SubscriptionData::TableData;
5703 req->subscriberData = ptr.p->m_senderData;
5704 req->subscriberRef = ptr.p->m_senderRef;
5706 sendSignal(c_restart.m_ref, GSN_SUB_START_REQ,
5707 signal, SubStartReq::SignalLength, JBB);
// Presumably reached when there is no subscriber left to copy - confirm.
5714 c_subOpPool.
getPtr(subOpPtr, c_restart.m_subOpPtrI);
5715 check_remove_queue(signal, subPtr, subOpPtr,
true,
false);
// Resume the iteration from the bucket saved in c_restart.
5720 it.bucket = c_restart.m_bucket;
5721 c_subscriptions.
next(it);
// Handles SUB_START_CONF: fetches the subscription currently being copied
// (c_restart.m_subPtrI) and calls copySubscriber() for subscriber `ptr`.
// NOTE(review): how `ptr` is obtained is not visible in this excerpt.
5727 Suma::execSUB_START_CONF(
Signal* signal)
5734 c_subscriptionPool.
getPtr(subPtr, c_restart.m_subPtrI);
5741 copySubscriber(signal, subPtr, ptr);
// Handles SUB_START_REF: forwards the error code to the restarting node as
// SUMA_START_ME_REF and aborts the copy for the current subscription.
5745 Suma::execSUB_START_REF(
Signal* signal)
5750 Uint32 errorCode = sig->errorCode;
5754 ref->errorCode = errorCode;
5755 sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
5756 SumaStartMeRef::SignalLength, JBB);
5760 c_subscriptionPool.
getPtr(subPtr, c_restart.m_subPtrI);
5762 abort_start_me(signal, subPtr,
true);
// NOTE(review): the lines below appear to belong to the abort routine itself
// (header not visible): they run the remove-queue check and clear the
// restart reference.
5770 c_subOpPool.
getPtr(subOpPtr, c_restart.m_subOpPtrI);
5771 check_remove_queue(signal, subPtr, subOpPtr, lockowner,
true);
5774 c_restart.m_ref = 0;
// Handles SUMA_HANDOVER_REQ from node `req->nodeId`.
//  - RT_START_NODE: buckets the requester is responsible for, and which are
//    active here, are moved to switchover with state HANDOVER.
//  - RT_STOP_NODE: buckets the requester is responsible for, and which this
//    node would own without it, are moved to switchover with state
//    SHUTDOWN_TO.
// Replies with SUMA_HANDOVER_CONF carrying the bucket mask and start GCI, or
// with SUMA_HANDOVER_REF.
// NOTE(review): the declarations of `req`, `conf`, `tmp` and `nodegroup` and
// the condition selecting CONF versus REF are not visible here.
5778 Suma::execSUMA_HANDOVER_REQ(
Signal* signal)
5781 DBUG_ENTER(
"Suma::execSUMA_HANDOVER_REQ");
5784 signal->getDataPtr());
5786 Uint32 gci = req->gci;
5787 Uint32 nodeId = req->nodeId;
// Earliest GCI (high word) this node accepts for the switch.
5788 Uint32 new_gci = Uint32(m_last_complete_gci >> 32) + MAX_CONCURRENT_GCP + 1;
5789 Uint32 requestType = req->requestType;
// Nodes whose version fails ndbd_suma_stop_me() are always treated as
// RT_START_NODE.
5790 if (!ndbd_suma_stop_me(
getNodeInfo(nodeId).m_version))
5793 requestType = SumaHandoverReq::RT_START_NODE;
// Use the later of the requested GCI and new_gci.
5796 Uint32 start_gci = (gci > new_gci ? gci : new_gci);
5800 if (requestType == SumaHandoverReq::RT_START_NODE)
5803 c_alive_nodes.
set(nodeId);
5805 ndbout_c(
"%u c_alive_nodes.set(%u)", __LINE__, nodeId);
5807 for( Uint32 i = 0; i < c_no_of_buckets; i++)
5809 if(get_responsible_node(i) == nodeId)
5811 if (m_active_buckets.
get(i))
// The switchover GCI is the last GCI before (start_gci, 0).
5815 m_active_buckets.
clear(i);
5816 m_switchover_buckets.
set(i);
5817 c_buckets[
i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
5818 c_buckets[
i].m_state |= Bucket::BUCKET_HANDOVER;
5819 c_buckets[
i].m_switchover_node = nodeId;
5820 ndbout_c(
"prepare to handover bucket: %d", i);
// A bucket that is already in switchover is not handed over again.
5822 else if(m_switchover_buckets.
get(i))
5824 ndbout_c(
"dont handover bucket: %d %d", i, nodeId);
5829 else if (requestType == SumaHandoverReq::RT_STOP_NODE)
5833 for( Uint32 i = 0; i < c_no_of_buckets; i++)
// `nodegroup` without the stopping node is used to find its successor.
5836 nodegroup.
clear(nodeId);
5837 if(get_responsible_node(i) == nodeId &&
5838 get_responsible_node(i, nodegroup) == getOwnNodeId())
5843 m_switchover_buckets.
set(i);
5844 c_buckets[
i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
5845 c_buckets[
i].m_state |= Bucket::BUCKET_SHUTDOWN_TO;
5846 c_buckets[
i].m_switchover_node = nodeId;
5847 ndbout_c(
"prepare to takeover bucket: %d", i);
// Positive reply: bucket mask, start GCI, own node id and request type.
5859 tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask);
5860 conf->gci = start_gci;
5861 conf->nodeId = getOwnNodeId();
5862 conf->requestType = requestType;
5863 sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal,
5864 SumaHandoverConf::SignalLength, JBB);
// Negative reply.
// NOTE(review): the meaning of the constant 111 in theData[0] is not
// visible here.
5870 signal->theData[0] = 111;
5871 signal->theData[1] = getOwnNodeId();
5872 signal->theData[2] = nodeId;
5873 sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_REF, signal, 3, JBB);
// Handler for SUMA_HANDOVER_REF. Its body is not visible in this excerpt.
5879 Suma::execSUMA_HANDOVER_REF(
Signal* signal)
// Handles SUMA_HANDOVER_CONF from node `conf->nodeId`. The buckets in the
// returned mask are put into switchover at the confirmed GCI, with state
// STARTING (RT_START_NODE) or SHUTDOWN (RT_STOP_NODE).
// NOTE(review): the declarations of `conf`, `tmp` and `buf`, and the
// per-bucket test inside each loop, are not visible in this excerpt.
5885 Suma::execSUMA_HANDOVER_CONF(
Signal* signal) {
5887 DBUG_ENTER(
"Suma::execSUMA_HANDOVER_CONF");
5890 signal->getDataPtr());
5892 CRASH_INSERTION(13043);
5894 Uint32 gci = conf->gci;
5895 Uint32 nodeId = conf->nodeId;
5896 Uint32 requestType = conf->requestType;
5898 tmp.
assign(BUCKET_MASK_SIZE, conf->theBucketMask);
5899 #ifdef HANDOVER_DEBUG
5900 ndbout_c(
"Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
// Nodes whose version fails ndbd_suma_stop_me() are always treated as
// RT_START_NODE.
5903 if (!ndbd_suma_stop_me(
getNodeInfo(nodeId).m_version))
5906 requestType = SumaHandoverReq::RT_START_NODE;
5909 if (requestType == SumaHandoverReq::RT_START_NODE)
5912 for (Uint32 i = 0; i < c_no_of_buckets; i++)
5917 ndbout_c(
"%u : %u %u", i, get_responsible_node(i), getOwnNodeId());
// This node must be the responsible node for the bucket.
5918 ndbrequire(get_responsible_node(i) == getOwnNodeId());
// The switchover GCI is the last GCI before (gci, 0).
5920 c_buckets[
i].m_switchover_gci = (Uint64(gci) << 32) - 1;
5921 c_buckets[
i].m_state |= Bucket::BUCKET_STARTING;
5927 infoEvent(
"Suma: handover from node %u gci: %u buckets: %s (%u)",
5928 nodeId, gci, buf, c_no_of_buckets);
5929 g_eventLogger->
info(
"Suma: handover from node %u gci: %u buckets: %s (%u)",
5930 nodeId, gci, buf, c_no_of_buckets);
// Add the buckets to the switchover set and mark this node as answered.
5931 m_switchover_buckets.
bitOR(tmp);
5932 c_startup.m_handover_nodes.clear(nodeId);
5935 else if (requestType == SumaHandoverReq::RT_STOP_NODE)
5938 for (Uint32 i = 0; i < c_no_of_buckets; i++)
5942 ndbrequire(get_responsible_node(i) == getOwnNodeId());
5944 c_buckets[
i].m_switchover_node = getOwnNodeId();
5945 c_buckets[
i].m_switchover_gci = (Uint64(gci) << 32) - 1;
5946 c_buckets[
i].m_state |= Bucket::BUCKET_SHUTDOWN;
5952 infoEvent(
"Suma: handover to node %u gci: %u buckets: %s (%u)",
5953 nodeId, gci, buf, c_no_of_buckets);
5954 g_eventLogger->
info(
"Suma: handover to node %u gci: %u buckets: %s (%u)",
5955 nodeId, gci, buf, c_no_of_buckets);
5956 m_switchover_buckets.
bitOR(tmp);
5957 c_startup.m_handover_nodes.clear(nodeId);
// Body of a shutdown-request handler (header not visible; the STOP_ME_CONF
// reply sent elsewhere in this file suggests STOP_ME_REQ - confirm).
// Records the requester in c_shutdown and starts a handover of type
// RT_STOP_NODE.
// The request must come from this node, and no handover wait may already be
// in progress.
5968 ndbrequire(refToNode(req.senderRef) == getOwnNodeId());
5969 ndbrequire(c_shutdown.m_wait_handover ==
false);
5970 c_shutdown.m_wait_handover =
true;
5971 c_shutdown.m_senderRef = req.senderRef;
5972 c_shutdown.m_senderData = req.senderData;
// Iterate over the nodes in c_nodes_in_nodegroup_mask.
// NOTE(review): the loop body is not visible here. The message below says
// the shutdown is made direct when not all versions support a graceful one.
5974 for (Uint32 i = c_nodes_in_nodegroup_mask.
find(0);
5975 i != c_nodes_in_nodegroup_mask.NotFound ;
5976 i = c_nodes_in_nodegroup_mask.
find(i + 1))
5988 "Not all versions support graceful shutdown (suma)."
5989 " Shutdown directly instead");
5991 NDBD_EXIT_GRACEFUL_SHUTDOWN_ERROR,
5996 send_handover_req(signal, SumaHandoverReq::RT_STOP_NODE);
// Streams the m_page_id, m_page_pos and m_max_gci fields of `pos` to `out`.
// NOTE(review): the enclosing function header and the end of the statement are
// not present in this listing.
6004 out <<
"[ Page_pos:"
6005 <<
" m_page_id: " << pos.m_page_id
6006 <<
" m_page_pos: " << pos.m_page_pos
6007 <<
" m_max_gci: " << pos.m_max_gci
// Reserves `sz` words in bucket `buck`'s buffer for data belonging to `gci`.
// Three cases are visible:
//   1. same GCI as the previous record and the record fits in the head page;
//   2. the record plus Buffer_page::GCI_SZ32 extra words fits in the head page;
//   3. otherwise a new page is seized and becomes the head page.
// NOTE(review): braces, `else` keywords, the declarations of ptr/next and the
// return statements are not present in this listing.
6014 Suma::get_buffer_ptr(
Signal* signal, Uint32 buck, Uint64 gci, Uint32 sz)
// Work on a copy of the head position; it is stored back once a case is chosen.
6017 Bucket* bucket= c_buckets+buck;
6018 Page_pos pos= bucket->m_buffer_head;
6020 Buffer_page*
page = 0;
// m_page_id == RNIL means the bucket has no head page yet.
6023 if (likely(pos.m_page_id != RNIL))
6025 page= c_page_pool.
getPtr(pos.m_page_id);
6026 ptr= page->m_data + pos.m_page_pos;
// Error insert 13022 forces the "different GCI" path.
6029 const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022));
// Advance the position optimistically; the new-page case below subtracts sz
// again when it records the old page's m_words_used.
6031 pos.m_page_pos += sz;
6032 pos.m_last_gci = gci;
6033 Uint64 max = pos.m_max_gci > gci ? pos.m_max_gci : gci;
// Case 1: header word is the size with bit 31 set. resend_bucket tests that
// bit to decide whether two GCI words follow the header.
6035 if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS))
6037 pos.m_max_gci = max;
6038 bucket->m_buffer_head = pos;
6039 * ptr++ = (0x8000 << 16) | sz;
// Case 2: header word (bit 31 clear) holds sz + GCI_SZ32, followed by the
// high and low 32 bits of the GCI.
6042 else if(pos.m_page_pos + Buffer_page::GCI_SZ32 <= Buffer_page::DATA_WORDS)
6045 pos.m_max_gci = max;
6046 pos.m_page_pos += Buffer_page::GCI_SZ32;
6047 bucket->m_buffer_head = pos;
6048 * ptr++ = (sz + Buffer_page::GCI_SZ32);
6049 * ptr++ = (Uint32)(gci >> 32);
6050 * ptr++ = (Uint32)(gci & 0xFFFFFFFF);
// Case 3: a new page is needed. If none can be seized, enter out-of-buffer
// handling.
6061 if(unlikely((next= seize_page()) == RNIL))
6066 out_of_buffer(signal);
// Close the previous head page: store its max GCI, the words actually used
// (the position before this record was added) and link it to the new page.
6070 if(likely(pos.m_page_id != RNIL))
6072 page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
6073 page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
6074 page->m_words_used = pos.m_page_pos - sz;
6075 page->m_next_page= next;
6076 ndbassert(pos.m_max_gci != 0);
// NOTE(review): presumably the else-branch of the check above (bucket had no
// page, so the new page is also the tail) -- confirm.
6080 bucket->m_buffer_tail = next;
// Start a fresh position on the new page with sz words reserved.
6083 memset(&pos, 0,
sizeof(pos));
6084 pos.m_page_id = next;
6085 pos.m_page_pos = sz;
6086 pos.m_last_gci = gci;
6088 page= c_page_pool.
getPtr(pos.m_page_id);
6089 page->m_next_page= RNIL;
// Enters the out-of-event-buffer state: records m_out_of_buffer_gci, emits an
// info event, clears m_missing_data and starts releasing buffers from bucket 0.
6096 Suma::out_of_buffer(
Signal* signal)
// NOTE(review): the body of this check is not present in this listing;
// presumably an early return when already out of buffer -- confirm.
6098 if(m_out_of_buffer_gci)
6103 m_out_of_buffer_gci = m_last_complete_gci - 1;
6104 infoEvent(
"Out of event buffer: nodefailure will cause event failures");
6105 m_missing_data =
false;
6106 out_of_buffer_release(signal, 0);
// Releases the buffer pages of bucket `buck`, rescheduling itself through
// CONTINUEB (OUT_OF_BUFFER_RELEASE) between steps.
// NOTE(review): the conditions separating the three visible phases, and any
// increment of `buck` between buckets, are not present in this listing.
6110 Suma::out_of_buffer_release(
Signal* signal, Uint32 buck)
6112 Bucket* bucket= c_buckets+buck;
6113 Uint32 tail= bucket->m_buffer_tail;
// Phase 1: unlink the tail page, return it to the free list, and continue
// with the same bucket via CONTINUEB.
6117 Buffer_page* page= c_page_pool.
getPtr(tail);
6118 bucket->m_buffer_tail = page->m_next_page;
6119 free_page(tail, page);
6120 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
6121 signal->theData[1] = buck;
6122 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
// Phase 2: reset the head. A position of DATA_WORDS + 1 already exceeds the
// page capacity, so both fit checks in get_buffer_ptr fail and the next write
// has to seize a new page.
6129 bucket->m_buffer_head.m_page_id = RNIL;
6130 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
// More buckets remain: continue via CONTINUEB.
6133 if(buck != c_no_of_buckets)
6135 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
6136 signal->theData[1] = buck;
6137 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
// Phase 3: record the larger of m_max_seen_gci and m_last_complete_gci as the
// out-of-buffer GCI.
6145 m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci
6146 ? m_max_seen_gci : m_last_complete_gci;
6147 m_missing_data =
false;
// NOTE(review): the function header is not present in this listing; these
// statements take a page from the free list headed by m_first_free_page and
// allocate a new chunk of pages when needed. Presumably this is the
// seize_page() called from get_buffer_ptr -- confirm.
// Error insert 13038 simulates running out of event buffer.
6153 if (ERROR_INSERTED(13038))
6156 CLEAR_ERROR_INSERT_VALUE;
6157 ndbout_c(
"Simulating out of event buffer");
6158 m_out_of_buffer_gci = m_max_seen_gci;
// NOTE(review): the body of this check is not visible.
6160 if(unlikely(m_out_of_buffer_gci))
// Free list not empty: pop its first page and look up the chunk record the
// page belongs to.
6166 Uint32 ref= m_first_free_page;
6167 if(likely(ref != RNIL))
6169 m_first_free_page = (c_page_pool.
getPtr(ref))->m_next_page;
6170 Uint32 chunk = (c_page_pool.
getPtr(ref))->m_page_chunk_ptr_i;
6171 c_page_chunk_pool.
getPtr(ptr, chunk);
6172 ndbassert(ptr.p->m_free);
// Otherwise seize a chunk record and ask the memory manager for pages; ref and
// count are passed by address and used afterwards as first page id and page
// count.
6177 if(!c_page_chunk_pool.
seize(ptr))
6181 m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &ref, &count, 1);
6185 ndbout_c(
"alloc_chunk(%d %d) - ", ref, count);
// The first page of the new chunk becomes the head of the free list.
6187 m_first_free_page = ptr.p->m_page_id = ref;
6188 ptr.p->m_size = count;
6189 ptr.p->m_free = count;
// Initialise each page and chain it to the page with the next id; the last
// page's link is then overwritten with RNIL to terminate the list.
6193 for(Uint32 i = 0; i<count; i++)
6195 page = c_page_pool.
getPtr(ref);
6196 page->m_page_state= SUMA_SEQUENCE;
6197 page->m_page_chunk_ptr_i = ptr.i;
6198 page->m_next_page = ++ref;
6200 page->m_next_page = RNIL;
// Returns page `page_id` (whose data is `page`) to the free list headed by
// m_first_free_page.
// NOTE(review): the declaration of ptr and any update of ptr.p->m_free are not
// present in this listing.
6206 Suma::free_page(Uint32 page_id, Buffer_page* page)
// The page must carry the SUMA_SEQUENCE marker in m_page_state.
6209 ndbrequire(page->m_page_state == SUMA_SEQUENCE);
6211 Uint32 chunk= page->m_page_chunk_ptr_i;
6213 c_page_chunk_pool.
getPtr(ptr, chunk);
// Push the page onto the front of the free list.
6216 page->m_next_page = m_first_free_page;
6217 ndbrequire(ptr.p->m_free <= ptr.p->m_size);
6219 m_first_free_page = page_id;
// Releases buffered data of bucket `buck` up to `gci`: raises m_max_acked_gci,
// then either resets the head position (when the tail page is also the head
// page) or frees the tail page and reschedules itself with CONTINUEB.
// NOTE(review): braces, return statements and some conditions are not present
// in this listing.
6223 Suma::release_gci(
Signal* signal, Uint32 buck, Uint64 gci)
6225 Bucket* bucket= c_buckets+buck;
6226 Uint32 tail= bucket->m_buffer_tail;
6227 Page_pos head= bucket->m_buffer_head;
6228 Uint64 max_acked = bucket->m_max_acked_gci;
// A bucket in TAKEOVER or RESEND state is reported and, per the message text,
// the release is abandoned.
6230 const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND;
6231 if(unlikely(bucket->m_state & mask))
6234 ndbout_c(
"release_gci(%d, %u/%u) 0x%x-> node failure -> abort",
6235 buck, Uint32(gci >> 32), Uint32(gci), bucket->m_state);
// m_max_acked_gci never decreases.
6239 bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci);
// NOTE(review): the body of this check is not visible.
6240 if(unlikely(tail == RNIL))
// The tail page is also the head page.
6245 if(tail == head.m_page_id)
6247 if(gci >= head.m_max_gci)
// Error insert 13034 arms 13035; 13035 sends NDB_TAMPER 9999 to the receiver
// group with this node removed.
6250 if (ERROR_INSERTED(13034))
6253 SET_ERROR_INSERT_VALUE(13035);
6256 if (ERROR_INSERTED(13035))
6258 CLEAR_ERROR_INSERT_VALUE;
6260 rg.m_nodes.clear(getOwnNodeId());
6261 signal->theData[0] = 9999;
6262 sendSignal(rg, GSN_NDB_TAMPER, signal, 1, JBA);
// Rewind the head page to position 0 so that it is reused.
6265 head.m_page_pos = 0;
6266 head.m_max_gci = gci;
6267 head.m_last_gci = 0;
6268 bucket->m_buffer_head = head;
// The tail page is an older, closed page: rebuild its 64-bit max GCI from the
// two stored halves.
6275 Buffer_page* page= c_page_pool.
getPtr(tail);
6276 Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
6277 Uint32 next_page = page->m_next_page;
6279 ndbassert(max_gci != 0);
// Free the page, advance the tail and continue with the next page through
// CONTINUEB (RELEASE_GCI, bucket, gci high word, gci low word).
6284 free_page(tail, page);
6286 bucket->m_buffer_tail = next_page;
6287 signal->theData[0] = SumaContinueB::RELEASE_GCI;
6288 signal->theData[1] = buck;
6289 signal->theData[2] = (Uint32)(gci >> 32);
6290 signal->theData[3] = (Uint32)(gci & 0xFFFFFFFF);
6291 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 4, JBB);
// File-local counter; resend_bucket prints it as "rows" in its
// "resending GCI" log line.
6301 static Uint32 g_cnt = 0;
// Starts resending the buffered data of bucket `buck`: handles the
// out-of-buffer and empty-bucket cases, otherwise marks the bucket
// TAKEOVER|RESEND and schedules the first RESEND_BUCKET step.
// NOTE(review): braces, return statements, the declaration of gcp and the
// condition guarding the second "empty bucket" branch are not present in this
// listing.
6304 Suma::start_resend(
Signal* signal, Uint32 buck)
6306 printf(
"start_resend(%d, ", buck);
6311 Bucket* bucket= c_buckets + buck;
6312 Page_pos pos= bucket->m_buffer_head;
// Out of buffer: send an EVENT_REP to CMVMI and remember that data is missing.
// NOTE(review): theData[0] is set on a line not visible here.
6314 if(m_out_of_buffer_gci)
6317 c_gcp_list.
last(gcp);
6319 signal->theData[1] = 2;
6320 signal->theData[2] = 0;
6321 signal->theData[3] = (Uint32) pos.m_max_gci;
6322 signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
6323 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
6324 m_missing_data =
true;
// No head page: the bucket is marked active straight away.
6328 if(pos.m_page_id == RNIL)
6331 m_active_buckets.
set(buck);
6332 m_gcp_complete_rep_count ++;
6333 ndbout_c(
"empty bucket(RNIL) -> active max_acked: %u/%u max_gci: %u/%u",
6334 Uint32(bucket->m_max_acked_gci >> 32),
6335 Uint32(bucket->m_max_acked_gci),
6336 Uint32(pos.m_max_gci >> 32),
6337 Uint32(pos.m_max_gci));
// GCI range to resend: everything after the last acknowledged GCI, up to the
// highest GCI seen.
6341 Uint64 min= bucket->m_max_acked_gci + 1;
6342 Uint64 max = m_max_seen_gci;
6344 ndbrequire(max <= m_max_seen_gci);
// Second "empty" case: the head page must also be the tail page, and the
// bucket is marked active.
6348 ndbrequire(pos.m_page_id == bucket->m_buffer_tail);
6349 m_active_buckets.
set(buck);
6350 m_gcp_complete_rep_count ++;
6351 ndbout_c(
"empty bucket (%u/%u %u/%u) -> active",
6352 Uint32(min >> 32), Uint32(min),
6353 Uint32(max >> 32), Uint32(max));
// Data to resend: flag the bucket and record the switchover node and GCI.
6358 bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND);
6359 bucket->m_switchover_node = get_responsible_node(buck);
6360 bucket->m_switchover_gci = max;
6362 m_switchover_buckets.
set(buck);
// First RESEND_BUCKET step. Words 2 and 5 carry min; words 3, 4 and 6 are 0.
// resend_bucket uses those three slots for pos and last_gci when it
// reschedules itself.
6364 signal->theData[0] = SumaContinueB::RESEND_BUCKET;
6365 signal->theData[1] = buck;
6366 signal->theData[2] = (Uint32)(min >> 32);
6367 signal->theData[3] = 0;
6368 signal->theData[4] = 0;
6369 signal->theData[5] = (Uint32)(min & 0xFFFFFFFF);
6370 signal->theData[6] = 0;
6371 sendSignal(reference(), GSN_CONTINUEB, signal, 7, JBB);
6373 ndbout_c(
"min: %u/%u - max: %u/%u) page: %d",
6374 Uint32(min >> 32), Uint32(min), Uint32(max >> 32), Uint32(max),
6375 bucket->m_buffer_tail);
6376 ndbrequire(max >= min);
// One step of resending bucket `buck`: works on the tail page starting at word
// offset `pos`, sends either a SUB_GCP_COMPLETE_REP or SUB_TABLE_DATA for the
// record found there, frees the page once it is consumed, and reschedules
// itself through CONTINUEB until the resend is done.
// NOTE(review): braces, else/goto structure and the declarations of src, rep,
// rg, buf, data, table, ptrLen, subPtr, tabPtr and list are not present in
// this listing.
6380 Suma::resend_bucket(
Signal* signal, Uint32 buck, Uint64 min_gci,
6381 Uint32 pos, Uint64 last_gci)
6383 Bucket* bucket= c_buckets+buck;
6384 Uint32 tail= bucket->m_buffer_tail;
// NOTE(review): the tail page is fetched here, before the
// ndbrequire(tail != RNIL) a few lines below.
6386 Buffer_page* page= c_page_pool.
getPtr(tail);
6387 Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
6388 Uint32 next_page = page->m_next_page;
6389 Uint32 *ptr = page->m_data + pos;
6390 Uint32 *end = page->m_data + page->m_words_used;
6393 ndbrequire(tail != RNIL);
// For the head page, max GCI and end-of-data come from the bucket's head
// position rather than from the page fields.
6395 if(tail == bucket->m_buffer_head.m_page_id)
6397 max_gci= bucket->m_buffer_head.m_max_gci;
6398 end= page->m_data + bucket->m_buffer_head.m_page_pos;
// A page not yet started whose max GCI is below min_gci is freed without
// being read.
6407 else if(pos == 0 && min_gci > max_gci)
6409 free_page(tail, page);
6410 tail = bucket->m_buffer_tail = next_page;
// Hex dump of the page, eight words per row.
6415 for(Uint32 i = 0; i<page->m_words_used; i++)
6417 printf(
"%.8x ", page->m_data[i]);
6418 if(((i + 1) % 8) == 0)
// Record header: the low 16 bits are the size.
6427 Uint32 tmp = * src++;
6428 Uint32 sz = tmp & 0xFFFF;
// Bit 31 clear: two GCI words (high, low) follow the header and are included
// in the size, so they are subtracted and last_gci is updated from them.
6432 if(! (tmp & (0x8000 << 16)))
6434 ndbrequire(sz >= Buffer_page::GCI_SZ32);
6435 sz -= Buffer_page::GCI_SZ32;
6436 Uint32 last_gci_hi = * src++;
6437 Uint32 last_gci_lo = * src++;
6438 last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32);
6442 ndbrequire(ptr - sz > page->m_data);
// NOTE(review): the body of this check is not visible; presumably records
// older than min_gci are skipped -- confirm.
6445 if(last_gci < min_gci)
// Build a SUB_GCP_COMPLETE_REP for last_gci.
6456 rep->gci_hi = (Uint32)(last_gci >> 32);
6457 rep->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF);
6458 rep->flags = (m_missing_data)
6459 ? SubGcpCompleteRep::MISSING_DATA
6461 rep->senderRef = reference();
6462 rep->gcp_complete_rep_count = 1;
// Error insert 13036 forces the MISSING_DATA flag.
6464 if (ERROR_INSERTED(13036))
6467 CLEAR_ERROR_INSERT_VALUE;
6468 ndbout_c(
"Simulating out of event buffer at node failure");
6469 rep->flags |= SubGcpCompleteRep::MISSING_DATA;
6473 c_subscriber_nodes.
getText(buf);
6476 ndbout_c(
"resending GCI: %u/%u rows: %d -> %s",
6477 Uint32(last_gci >> 32), Uint32(last_gci), g_cnt, buf);
6482 sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
6483 SubGcpCompleteRep::SignalLength, JBB);
// Data record: six header words are read below (subscription ptr, schema
// version, event/size word, any_value, transId1, transId2).
6487 const uint buffer_header_sz = 6;
6489 Uint32 subPtrI = * src++ ;
6490 Uint32 schemaVersion = * src++;
// The event is in the upper 16 bits and sz_1 in the lower 16 bits of the
// same word.
6491 Uint32
event = * src >> 16;
6492 Uint32 sz_1 = (* src ++) & 0xFFFF;
6493 Uint32 any_value = * src++;
6494 Uint32 transId1 = * src++;
6495 Uint32 transId2 = * src++;
// NOTE(review): sz - buffer_header_sz is unsigned arithmetic, so it wraps if
// sz is ever smaller than the header size.
6497 ndbassert(sz - buffer_header_sz >= sz_1);
// The payload is handed to reformat() in two parts: sz_1 words, then the
// remaining sz - buffer_header_sz - sz_1 words.
6500 const Uint32 nptr= reformat(signal, ptr,
6502 src + sz_1, sz - buffer_header_sz - sz_1);
6504 for(Uint32 i =0; i < nptr; i++)
6511 c_subscriptionPool.
getPtr(subPtr, subPtrI);
6513 c_tablePool.
getPtr(tabPtr, subPtr.p->m_table_ptrI);
// Only send when the major schema version stored with the record matches the
// table's current one.
6515 if (table_version_major(tabPtr.p->m_schemaVersion) ==
6516 table_version_major(schemaVersion))
6519 data->gci_hi = (Uint32)(last_gci >> 32);
6520 data->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF);
6521 data->tableId =
table;
6522 data->requestInfo = 0;
6523 SubTableData::setOperation(data->requestInfo, event);
6525 data->anyValue = any_value;
6526 data->totalLen = ptrLen;
6527 data->transId1 = transId1;
6528 data->transId2 = transId2;
// Send SUB_TABLE_DATA to every subscriber of the subscription.
6532 subPtr.p->m_subscribers);
6533 SubscriberPtr subbPtr;
6534 for(list.
first(subbPtr); !subbPtr.isNull(); list.
next(subbPtr))
6536 data->senderData = subbPtr.p->m_senderData;
6537 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
6538 SubTableData::SignalLengthWithTransId, JBB, ptr, nptr);
// A fully consumed page that is not the head page is freed and the tail
// advanced.
6547 if(ptr == end && (tail != bucket->m_buffer_head.m_page_id))
6552 free_page(tail, page);
6553 tail = bucket->m_buffer_tail = next_page;
// Otherwise remember the word offset reached within the page.
6559 pos = Uint32(ptr - page->m_data);
// Resend finished: clear the RESEND flag.
6565 bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND;
6566 ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER));
6567 ndbout_c(
"resend done...");
// Reschedule: words 2/5 carry min_gci (high/low), word 3 the page offset,
// words 4/6 last_gci (high/low).
6571 signal->theData[0] = SumaContinueB::RESEND_BUCKET;
6572 signal->theData[1] = buck;
6573 signal->theData[2] = (Uint32)(min_gci >> 32);
6574 signal->theData[3] = pos;
6575 signal->theData[4] = (Uint32)(last_gci >> 32);
6576 signal->theData[5] = (Uint32)(min_gci & 0xFFFFFFFF);
6577 signal->theData[6] = (Uint32)(last_gci & 0xFFFFFFFF);
// NOTE(review): the condition choosing between the immediate and the delayed
// CONTINUEB is not visible.
6579 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 7, JBB);
6581 sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 10, 7);
// Handles GCP_PREPARE: stores the 64-bit GCI built from the signal's gci_hi
// and gci_lo in m_current_gci.
// NOTE(review): the declaration of prep is not present in this listing.
6585 Suma::execGCP_PREPARE(
Signal *signal)
6589 m_current_gci = prep->gci_lo | (Uint64(prep->gci_hi) << 32);
// Returns m_current_gci, the value last stored by execGCP_PREPARE. The Signal
// argument is unnamed and unused.
6593 Suma::get_current_gci(
Signal*)
6595 return m_current_gci;
// Handles CREATE_NODEGROUP_IMPL_REQ. Only the RT_COMPLETE case has visible
// work: it marks every bucket for switchover at gci - 1 with a "created"
// state. The request is answered with a CONF or, on error, a REF.
// NOTE(review): the switch statement, the per-case jumps, braces and the
// declarations of req, tmp, check, state, err, conf and ref are not present in
// this listing.
6599 Suma::execCREATE_NODEGROUP_IMPL_REQ(
Signal* signal)
6602 signal->getDataPtr();
6606 Uint32 rt = req->requestType;
// Collect the node ids of the request into tmp; the list ends at the first
// zero entry or at the end of the array.
6609 for (Uint32 i = 0; i<NDB_ARRAY_SIZE(req->nodes) && req->nodes[
i]; i++)
6611 tmp.
set(req->nodes[i]);
6613 Uint32 cnt = tmp.
count();
6614 Uint32
group = req->nodegroupId;
6617 case CreateNodegroupImplReq::RT_ABORT:
6620 case CreateNodegroupImplReq::RT_PARSE:
6623 case CreateNodegroupImplReq::RT_PREPARE:
6626 case CreateNodegroupImplReq::RT_COMMIT:
6629 case CreateNodegroupImplReq::RT_COMPLETE:
6631 CRASH_INSERTION(13043);
// The GCI at which the change takes effect must be later than the last
// completed one.
6633 Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo;
6634 ndbrequire(gci > m_last_complete_gci);
// This node already belongs to a node group: the new group must be a
// different one with the same number of nodes.
6637 if (c_nodeGroup != RNIL)
6641 check.
bitAND(c_nodes_in_nodegroup_mask);
6643 ndbrequire(c_nodeGroup != group);
6644 ndbrequire(cnt == c_nodes_in_nodegroup_mask.
count());
6645 state = Bucket::BUCKET_CREATED_OTHER;
// This node is a member of the new group: adopt the group id and node mask.
6647 else if (tmp.
get(getOwnNodeId()))
6650 c_nodeGroup =
group;
6651 c_nodes_in_nodegroup_mask.
assign(tmp);
6653 state = Bucket::BUCKET_CREATED_SELF;
// Every bucket switches over at gci - 1; m_state is the state flag ORed with
// the bucket count shifted left by 8.
6657 for (Uint32 i = 0; i<c_no_of_buckets; i++)
6660 m_switchover_buckets.
set(i);
6661 c_buckets[
i].m_switchover_gci = gci - 1;
6662 c_buckets[
i].m_state = state | (c_no_of_buckets << 8);
// Success reply.
6670 conf->senderRef = reference();
6671 conf->senderData = req->senderData;
6672 sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_CONF, signal,
6673 CreateNodegroupImplConf::SignalLength, JBB);
// Error reply carrying err.
6680 ref->senderRef = reference();
6681 ref->senderData = req->senderData;
6682 ref->errorCode = err;
6683 sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_REF, signal,
6684 CreateNodegroupImplRef::SignalLength, JBB);
// Handles DROP_NODEGROUP_IMPL_REQ. Only the RT_COMPLETE case has visible work:
// it marks every bucket for switchover at gci - 1 with a "dropped" state. The
// request is answered with a CONF or, on error, a REF.
// NOTE(review): the switch statement, the per-case jumps, braces and the
// declarations of req, state, err, conf and ref are not present in this
// listing.
6689 Suma::execDROP_NODEGROUP_IMPL_REQ(
Signal* signal)
6692 signal->getDataPtr();
6696 Uint32 rt = req->requestType;
6697 Uint32 group = req->nodegroupId;
6700 case DropNodegroupImplReq::RT_ABORT:
6703 case DropNodegroupImplReq::RT_PARSE:
6706 case DropNodegroupImplReq::RT_PREPARE:
6709 case DropNodegroupImplReq::RT_COMMIT:
6712 case DropNodegroupImplReq::RT_COMPLETE:
6714 CRASH_INSERTION(13043);
// The GCI at which the change takes effect must be later than the last
// completed one.
6716 Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo;
6717 ndbrequire(gci > m_last_complete_gci);
// The state depends on whether the dropped group is this node's own group.
6720 if (c_nodeGroup != group)
6723 state = Bucket::BUCKET_DROPPED_OTHER;
6729 state = Bucket::BUCKET_DROPPED_SELF;
6732 for (Uint32 i = 0; i<c_no_of_buckets; i++)
6735 m_switchover_buckets.
set(i);
// Log any bucket with a non-zero state before the ndbrequire below fails
// on it.
6736 if (c_buckets[i].m_state != 0)
6738 jamLine(c_buckets[i].m_state);
6739 ndbout_c(
"c_buckets[%u].m_state: %u", i, c_buckets[i].m_state);
6741 ndbrequire(c_buckets[i].m_state == 0);
// Every bucket switches over at gci - 1; m_state is the state flag ORed with
// the bucket count shifted left by 8.
6742 c_buckets[
i].m_switchover_gci = gci - 1;
6743 c_buckets[
i].m_state = state | (c_no_of_buckets << 8);
// Success reply.
6751 conf->senderRef = reference();
6752 conf->senderData = req->senderData;
6753 sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_CONF, signal,
6754 DropNodegroupImplConf::SignalLength, JBB);
// Error reply carrying err.
6761 ref->senderRef = reference();
6762 ref->senderData = req->senderData;
6763 ref->errorCode = err;
6764 sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_REF, signal,
6765 DropNodegroupImplRef::SignalLength, JBB);