MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DblqhProxy.cpp
1 /* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
15 
16 #include "DblqhProxy.hpp"
17 #include "Dblqh.hpp"
18 #include "DblqhCommon.hpp"
19 
20 #include <signaldata/StartFragReq.hpp>
21 #include <signaldata/ExecFragReq.hpp>
22 
23 DblqhProxy::DblqhProxy(Block_context& ctx) :
24  LocalProxy(DBLQH, ctx),
25  c_tableRecSize(0),
26  c_tableRec(0)
27 {
28  // GSN_CREATE_TAB_REQ
29  addRecSignal(GSN_CREATE_TAB_REQ, &DblqhProxy::execCREATE_TAB_REQ);
30  addRecSignal(GSN_CREATE_TAB_CONF, &DblqhProxy::execCREATE_TAB_CONF);
31  addRecSignal(GSN_CREATE_TAB_REF, &DblqhProxy::execCREATE_TAB_REF);
32 
33  // GSN_LQHADDATTREQ
34  addRecSignal(GSN_LQHADDATTREQ, &DblqhProxy::execLQHADDATTREQ);
35  addRecSignal(GSN_LQHADDATTCONF, &DblqhProxy::execLQHADDATTCONF);
36  addRecSignal(GSN_LQHADDATTREF, &DblqhProxy::execLQHADDATTREF);
37 
38  // GSN_LQHFRAGREQ
39  addRecSignal(GSN_LQHFRAGREQ, &DblqhProxy::execLQHFRAGREQ);
40 
41  // GSN_TAB_COMMITREQ
42  addRecSignal(GSN_TAB_COMMITREQ, &DblqhProxy::execTAB_COMMITREQ);
43  addRecSignal(GSN_TAB_COMMITCONF, &DblqhProxy::execTAB_COMMITCONF);
44  addRecSignal(GSN_TAB_COMMITREF, &DblqhProxy::execTAB_COMMITREF);
45 
46  // GSN_LCP_FRAG_ORD
47  addRecSignal(GSN_LCP_FRAG_ORD, &DblqhProxy::execLCP_FRAG_ORD);
48  addRecSignal(GSN_LCP_FRAG_REP, &DblqhProxy::execLCP_FRAG_REP);
49  addRecSignal(GSN_END_LCP_CONF, &DblqhProxy::execEND_LCP_CONF);
50  addRecSignal(GSN_LCP_COMPLETE_REP, &DblqhProxy::execLCP_COMPLETE_REP);
51 
52  addRecSignal(GSN_EMPTY_LCP_REQ, &DblqhProxy::execEMPTY_LCP_REQ);
53 
54  // GSN_GCP_SAVEREQ
55  addRecSignal(GSN_GCP_SAVEREQ, &DblqhProxy::execGCP_SAVEREQ);
56  addRecSignal(GSN_GCP_SAVECONF, &DblqhProxy::execGCP_SAVECONF);
57  addRecSignal(GSN_GCP_SAVEREF, &DblqhProxy::execGCP_SAVEREF);
58 
59  // GSN_PREP_DROP_TAB_REQ
60  addRecSignal(GSN_PREP_DROP_TAB_REQ, &DblqhProxy::execPREP_DROP_TAB_REQ);
61  addRecSignal(GSN_PREP_DROP_TAB_CONF, &DblqhProxy::execPREP_DROP_TAB_CONF);
62  addRecSignal(GSN_PREP_DROP_TAB_REF, &DblqhProxy::execPREP_DROP_TAB_REF);
63 
64  // GSN_DROP_TAB_REQ
65  addRecSignal(GSN_DROP_TAB_REQ, &DblqhProxy::execDROP_TAB_REQ);
66  addRecSignal(GSN_DROP_TAB_CONF, &DblqhProxy::execDROP_TAB_CONF);
67  addRecSignal(GSN_DROP_TAB_REF, &DblqhProxy::execDROP_TAB_REF);
68 
69  // GSN_ALTER_TAB_REQ
70  addRecSignal(GSN_ALTER_TAB_REQ, &DblqhProxy::execALTER_TAB_REQ);
71  addRecSignal(GSN_ALTER_TAB_CONF, &DblqhProxy::execALTER_TAB_CONF);
72  addRecSignal(GSN_ALTER_TAB_REF, &DblqhProxy::execALTER_TAB_REF);
73 
74  // GSN_START_FRAGREQ
75  addRecSignal(GSN_START_FRAGREQ, &DblqhProxy::execSTART_FRAGREQ);
76 
77  // GSN_START_RECREQ
78  addRecSignal(GSN_START_RECREQ, &DblqhProxy::execSTART_RECREQ);
79  addRecSignal(GSN_START_RECCONF, &DblqhProxy::execSTART_RECCONF);
80 
81  // GSN_LQH_TRANSREQ
82  addRecSignal(GSN_LQH_TRANSREQ, &DblqhProxy::execLQH_TRANSREQ);
83  addRecSignal(GSN_LQH_TRANSCONF, &DblqhProxy::execLQH_TRANSCONF);
84 
85  // GSN_SUB_GCP_COMPLETE_REP
86  addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
87 
88  // GSN_EXEC_SRREQ
89  addRecSignal(GSN_EXEC_SRREQ, &DblqhProxy::execEXEC_SRREQ);
90  addRecSignal(GSN_EXEC_SRCONF, &DblqhProxy::execEXEC_SRCONF);
91 
92  // GSN_EXEC_FRAG
93  addRecSignal(GSN_EXEC_FRAGREQ, &DblqhProxy::execEXEC_FRAGREQ);
94  addRecSignal(GSN_EXEC_FRAGCONF, &DblqhProxy::execEXEC_FRAGCONF);
95 
96  // GSN_DROP_FRAG_REQ
97  addRecSignal(GSN_DROP_FRAG_REQ, &DblqhProxy::execDROP_FRAG_REQ);
98  addRecSignal(GSN_DROP_FRAG_CONF, &DblqhProxy::execDROP_FRAG_CONF);
99  addRecSignal(GSN_DROP_FRAG_REF, &DblqhProxy::execDROP_FRAG_REF);
100 
101 }
102 
103 DblqhProxy::~DblqhProxy()
104 {
105 }
106 
108 DblqhProxy::newWorker(Uint32 instanceNo)
109 {
110  return new Dblqh(m_ctx, instanceNo);
111 }
112 
113 // GSN_NDB_STTOR
114 
115 void
116 DblqhProxy::callNDB_STTOR(Signal* signal)
117 {
118  Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
119  ndbrequire(ss.m_gsn == 0);
120 
121  const Uint32 startPhase = signal->theData[2];
122  switch (startPhase) {
123  case 3:
124  ss.m_gsn = GSN_NDB_STTOR;
125  sendREAD_NODESREQ(signal);
126  break;
127  default:
128  backNDB_STTOR(signal);
129  break;
130  }
131 }
132 
133 // GSN_READ_CONFIG_REQ
134 void
135 DblqhProxy::callREAD_CONFIG_REQ(Signal* signal)
136 {
137  const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
138  ndbrequire(req->noOfParameters == 0);
139 
140  const ndb_mgm_configuration_iterator * p =
141  m_ctx.m_config.getOwnConfigIterator();
142  ndbrequire(p != 0);
143 
144  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_TABLE, &c_tableRecSize));
145  c_tableRec = (Uint8*)allocRecord("TableRec", sizeof(Uint8), c_tableRecSize);
146  D("proxy:" << V(c_tableRecSize));
147  Uint32 i;
148  for (i = 0; i < c_tableRecSize; i++)
149  c_tableRec[i] = 0;
150  backREAD_CONFIG_REQ(signal);
151 }
152 
153 // GSN_CREATE_TAB_REQ
154 
155 // there is no consistent LQH connect pointer to use as ssId
156 
157 void
158 DblqhProxy::execCREATE_TAB_REQ(Signal* signal)
159 {
160  Ss_CREATE_TAB_REQ& ss = ssSeize<Ss_CREATE_TAB_REQ>(1);
161 
162  const CreateTabReq* req = (const CreateTabReq*)signal->getDataPtr();
163  ss.m_req = *req;
164  ndbrequire(signal->getLength() == CreateTabReq::SignalLengthLDM);
165 
166  sendREQ(signal, ss);
167 }
168 
169 void
170 DblqhProxy::sendCREATE_TAB_REQ(Signal* signal, Uint32 ssId,
171  SectionHandle* handle)
172 {
173  Ss_CREATE_TAB_REQ& ss = ssFind<Ss_CREATE_TAB_REQ>(ssId);
174 
175  CreateTabReq* req = (CreateTabReq*)signal->getDataPtrSend();
176  *req = ss.m_req;
177  req->senderRef = reference();
178  req->senderData = ssId;
179  sendSignalNoRelease(workerRef(ss.m_worker), GSN_CREATE_TAB_REQ,
180  signal, CreateTabReq::SignalLengthLDM, JBB, handle);
181 }
182 
183 void
184 DblqhProxy::execCREATE_TAB_CONF(Signal* signal)
185 {
186  const CreateTabConf* conf = (const CreateTabConf*)signal->getDataPtr();
187  Uint32 ssId = conf->senderData;
188  Ss_CREATE_TAB_REQ& ss = ssFind<Ss_CREATE_TAB_REQ>(ssId);
189  recvCONF(signal, ss);
190 }
191 
192 void
193 DblqhProxy::execCREATE_TAB_REF(Signal* signal)
194 {
195  const CreateTabRef* ref = (const CreateTabRef*)signal->getDataPtr();
196  Uint32 ssId = ref->senderData;
197  Ss_CREATE_TAB_REQ& ss = ssFind<Ss_CREATE_TAB_REQ>(ssId);
198  recvREF(signal, ss, ref->errorCode);
199 }
200 
201 void
202 DblqhProxy::sendCREATE_TAB_CONF(Signal* signal, Uint32 ssId)
203 {
204  Ss_CREATE_TAB_REQ& ss = ssFind<Ss_CREATE_TAB_REQ>(ssId);
205  BlockReference dictRef = ss.m_req.senderRef;
206 
207  {
208  const CreateTabConf* conf = (const CreateTabConf*)signal->getDataPtr();
209  ss.m_lqhConnectPtr[ss.m_worker] = conf->lqhConnectPtr;
210  }
211 
212  if (!lastReply(ss))
213  return;
214 
215  if (ss.m_error == 0) {
216  jam();
217  CreateTabConf* conf = (CreateTabConf*)signal->getDataPtrSend();
218  conf->senderRef = reference();
219  conf->senderData = ss.m_req.senderData;
220  conf->lqhConnectPtr = ssId;
221  sendSignal(dictRef, GSN_CREATE_TAB_CONF,
222  signal, CreateTabConf::SignalLength, JBB);
223 
224  // inform DBTUP proxy
225  CreateTabReq* req = (CreateTabReq*)signal->getDataPtrSend();
226  *req = ss.m_req;
227  EXECUTE_DIRECT(DBTUP, GSN_CREATE_TAB_REQ,
228  signal, CreateTabReq::SignalLength);
229 
230  Uint32 tableId = ss.m_req.tableId;
231  ndbrequire(tableId < c_tableRecSize);
232  c_tableRec[tableId] = 1;
233  } else {
234  CreateTabRef* ref = (CreateTabRef*)signal->getDataPtrSend();
235  ref->senderRef = reference();
236  ref->senderData = ss.m_req.senderData;
237  ref->errorCode = ss.m_error;
238  ref->errorLine = 0;
239  ref->errorKey = 0;
240  ref->errorStatus = 0;
241  sendSignal(dictRef, GSN_CREATE_TAB_REF,
242  signal, CreateTabRef::SignalLength, JBB);
243  ssRelease<Ss_CREATE_TAB_REQ>(ssId);
244  }
245 }
246 
247 // GSN_LQHADDATTREQ [ sub-op ]
248 
249 void
251 {
252  const LqhAddAttrReq* req = (const LqhAddAttrReq*)signal->getDataPtr();
253  Uint32 ssId = req->lqhFragPtr;
254  Ss_LQHADDATTREQ& ss = ssSeize<Ss_LQHADDATTREQ>(ssId);
255 
256  const Uint32 reqlength =
257  LqhAddAttrReq::HeaderLength +
258  req->noOfAttributes * LqhAddAttrReq::EntryLength;
259  ndbrequire(signal->getLength() == reqlength);
260  memcpy(&ss.m_req, req, reqlength << 2);
261  ss.m_reqlength = reqlength;
262 
268  Ss_CREATE_TAB_REQ& ss_main = ssFind<Ss_CREATE_TAB_REQ>(ssId);
269  ndbrequire(ss_main.m_req.noOfAttributes >= req->noOfAttributes);
270  ss_main.m_req.noOfAttributes -= req->noOfAttributes;
271 
272  /* Save long section(s) in ss for forwarding to
273  * workers
274  */
275  SectionHandle handle(this, signal);
276  saveSections(ss, handle);
277 
278  sendREQ(signal, ss);
279 }
280 
281 void
282 DblqhProxy::sendLQHADDATTREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
283 {
284  Ss_LQHADDATTREQ& ss = ssFind<Ss_LQHADDATTREQ>(ssId);
285  Ss_CREATE_TAB_REQ& ss_main = ssFind<Ss_CREATE_TAB_REQ>(ssId);
286 
287  LqhAddAttrReq* req = (LqhAddAttrReq*)signal->getDataPtrSend();
288  const Uint32 reqlength = ss.m_reqlength;
289  memcpy(req, &ss.m_req, reqlength << 2);
290  req->lqhFragPtr = ss_main.m_lqhConnectPtr[ss.m_worker];
291  req->noOfAttributes = ss.m_req.noOfAttributes;
292  req->senderData = ssId;
293  req->senderAttrPtr = ss.m_req.senderAttrPtr;
294  sendSignalNoRelease(workerRef(ss.m_worker), GSN_LQHADDATTREQ,
295  signal, reqlength, JBB, handle);
296 }
297 
298 void
299 DblqhProxy::execLQHADDATTCONF(Signal* signal)
300 {
301  const LqhAddAttrConf* conf = (const LqhAddAttrConf*)signal->getDataPtr();
302  Uint32 ssId = conf->senderData;
303  Ss_LQHADDATTREQ& ss = ssFind<Ss_LQHADDATTREQ>(ssId);
304  recvCONF(signal, ss);
305 }
306 
307 void
308 DblqhProxy::execLQHADDATTREF(Signal* signal)
309 {
310  const LqhAddAttrRef* ref = (const LqhAddAttrRef*)signal->getDataPtr();
311  Uint32 ssId = ref->senderData;
312  Ss_LQHADDATTREQ& ss = ssFind<Ss_LQHADDATTREQ>(ssId);
313  recvREF(signal, ss, ref->errorCode);
314 }
315 
316 void
318 {
319  Ss_LQHADDATTREQ& ss = ssFind<Ss_LQHADDATTREQ>(ssId);
320  Ss_CREATE_TAB_REQ& ss_main = ssFind<Ss_CREATE_TAB_REQ>(ssId);
321  BlockReference dictRef = ss_main.m_req.senderRef;
322 
323  if (!lastReply(ss))
324  return;
325 
326  if (ss.m_error == 0)
327  {
328  jam();
329  LqhAddAttrConf* conf = (LqhAddAttrConf*)signal->getDataPtrSend();
330  conf->senderData = ss.m_req.senderData;
331  conf->senderAttrPtr = ss.m_req.senderAttrPtr;
332  sendSignal(dictRef, GSN_LQHADDATTCONF,
333  signal, LqhAddAttrConf::SignalLength, JBB);
334 
335  if (ss_main.m_req.noOfAttributes == 0)
336  {
337  jam();
342  ssRelease<Ss_CREATE_TAB_REQ>(ssId);
343  }
344  }
345  else
346  {
347  jam();
348  LqhAddAttrRef* ref = (LqhAddAttrRef*)signal->getDataPtrSend();
349  ref->senderData = ss.m_req.senderData;
350  ref->errorCode = ss.m_error;
351  sendSignal(dictRef, GSN_LQHADDATTREF,
352  signal, LqhAddAttrRef::SignalLength, JBB);
353  ssRelease<Ss_CREATE_TAB_REQ>(ssId);
354  }
355 
356  ssRelease<Ss_LQHADDATTREQ>(ssId);
357 }
358 
359 // GSN_LQHFRAGREQ [ pass-through ]
360 
361 void
362 DblqhProxy::execLQHFRAGREQ(Signal* signal)
363 {
364  LqhFragReq* req = (LqhFragReq*)signal->getDataPtrSend();
365  Uint32 instance = getInstanceKey(req->tableId, req->fragId);
366 
367  // wl4391_todo impl. method that fakes senders block-ref
368  sendSignal(numberToRef(DBLQH, instance, getOwnNodeId()),
369  GSN_LQHFRAGREQ, signal, signal->getLength(), JBB);
370 }
371 
372 // GSN_TAB_COMMITREQ [ sub-op ]
373 
374 void
375 DblqhProxy::execTAB_COMMITREQ(Signal* signal)
376 {
377  Ss_TAB_COMMITREQ& ss = ssSeize<Ss_TAB_COMMITREQ>(1); // lost connection
378 
379  const TabCommitReq* req = (const TabCommitReq*)signal->getDataPtr();
380  ss.m_req = *req;
381  sendREQ(signal, ss);
382 }
383 
384 void
385 DblqhProxy::sendTAB_COMMITREQ(Signal* signal, Uint32 ssId,
386  SectionHandle* handle)
387 {
388  Ss_TAB_COMMITREQ& ss = ssFind<Ss_TAB_COMMITREQ>(ssId);
389 
390  TabCommitReq* req = (TabCommitReq*)signal->getDataPtrSend();
391  req->senderRef = reference();
392  req->senderData = ssId;
393  req->tableId = ss.m_req.tableId;
394  sendSignalNoRelease(workerRef(ss.m_worker), GSN_TAB_COMMITREQ,
395  signal, TabCommitReq::SignalLength, JBB, handle);
396 }
397 
398 void
399 DblqhProxy::execTAB_COMMITCONF(Signal* signal)
400 {
401  const TabCommitConf* conf = (TabCommitConf*)signal->getDataPtr();
402  Uint32 ssId = conf->senderData;
403  Ss_TAB_COMMITREQ& ss = ssFind<Ss_TAB_COMMITREQ>(ssId);
404  recvCONF(signal, ss);
405 }
406 
407 void
408 DblqhProxy::execTAB_COMMITREF(Signal* signal)
409 {
410  const TabCommitRef* ref = (TabCommitRef*)signal->getDataPtr();
411  Uint32 ssId = ref->senderData;
412  Ss_TAB_COMMITREQ& ss = ssFind<Ss_TAB_COMMITREQ>(ssId);
413 
414  // wl4391_todo omit extra info now since DBDICT only does ndbrequire
415  recvREF(signal, ss, ref->errorCode);
416 }
417 
418 void
419 DblqhProxy::sendTAB_COMMITCONF(Signal* signal, Uint32 ssId)
420 {
421  Ss_TAB_COMMITREQ& ss = ssFind<Ss_TAB_COMMITREQ>(ssId);
422  BlockReference dictRef = ss.m_req.senderRef;
423 
424  if (!lastReply(ss))
425  return;
426 
427  if (ss.m_error == 0) {
428  jam();
429  TabCommitConf* conf = (TabCommitConf*)signal->getDataPtrSend();
430  conf->senderData = ss.m_req.senderData;
431  conf->nodeId = getOwnNodeId();
432  conf->tableId = ss.m_req.tableId;
433  sendSignal(dictRef, GSN_TAB_COMMITCONF,
434  signal, TabCommitConf::SignalLength, JBB);
435  } else {
436  jam();
437  TabCommitRef* ref = (TabCommitRef*)signal->getDataPtrSend();
438  ref->senderData = ss.m_req.senderData;
439  ref->nodeId = getOwnNodeId();
440  ref->tableId = ss.m_req.tableId;
441  sendSignal(dictRef, GSN_TAB_COMMITREF,
442  signal, TabCommitRef::SignalLength, JBB);
443  return;
444  }
445 
446  ssRelease<Ss_TAB_COMMITREQ>(ssId);
447 }
448 
449 // LCP handling
450 
451 Uint32
452 DblqhProxy::getNoOfOutstanding(const LcpRecord & rec) const
453 {
454  ndbrequire(rec.m_lcp_frag_ord_cnt >= rec.m_lcp_frag_rep_cnt);
455  return rec.m_lcp_frag_ord_cnt - rec.m_lcp_frag_rep_cnt;
456 }
457 
458 void
460 {
461  ndbrequire(signal->getLength() == LcpFragOrd::SignalLength);
462 
463  const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
464  const LcpFragOrd req_copy = *req;
465 
466  bool lcp_complete_ord = req->lastFragmentFlag;
467 
468  if (c_lcpRecord.m_state == LcpRecord::L_IDLE)
469  {
470  jam();
471  D("LCP: start" << V(req->lcpId));
472  c_lcpRecord.m_state = LcpRecord::L_STARTING;
473  c_lcpRecord.m_lcpId = req->lcpId;
474  c_lcpRecord.m_lcp_frag_rep_cnt = 0;
475  c_lcpRecord.m_lcp_frag_ord_cnt = 0;
476  c_lcpRecord.m_complete_outstanding = 0;
477  c_lcpRecord.m_lastFragmentFlag = false;
478  c_lcpRecord.m_empty_lcp_req.clear();
479 
480  // handle start of LCP in PGMAN and TSMAN
481  LcpFragOrd* req = (LcpFragOrd*)signal->getDataPtrSend();
482  *req = req_copy;
483  EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD,
484  signal, LcpFragOrd::SignalLength);
485  *req = req_copy;
486  EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD,
487  signal, LcpFragOrd::SignalLength);
488 
489  c_lcpRecord.m_state = LcpRecord::L_RUNNING;
490  }
491 
492  jam();
493  D("LCP: continue" << V(req->lcpId) << V(c_lcpRecord.m_lcp_frag_ord_cnt));
494  ndbrequire(c_lcpRecord.m_state == LcpRecord::L_RUNNING);
495  ndbrequire(c_lcpRecord.m_lcpId == req->lcpId);
496 
497  if (lcp_complete_ord)
498  {
499  jam();
500  c_lcpRecord.m_lastFragmentFlag = true;
501  if (getNoOfOutstanding(c_lcpRecord) == 0)
502  {
503  jam();
504  completeLCP_1(signal);
505  return;
506  }
507 
511  return;
512  }
513  else
514  {
515  jam();
516  c_lcpRecord.m_last_lcp_frag_ord = req_copy;
517  }
518 
519  c_lcpRecord.m_lcp_frag_ord_cnt++;
520 
521  // Forward
522  ndbrequire(req->tableId < c_tableRecSize);
523  if (c_tableRec[req->tableId] == 0)
524  {
525  jam();
529  sendSignal(workerRef(0),
530  GSN_LCP_FRAG_ORD, signal, LcpFragOrd::SignalLength, JBB);
531  }
532  else
533  {
534  jam();
535  Uint32 instance = getInstanceKey(req->tableId, req->fragmentId);
536  sendSignal(numberToRef(DBLQH, instance, getOwnNodeId()),
537  GSN_LCP_FRAG_ORD, signal, LcpFragOrd::SignalLength, JBB);
538  }
539 }
540 
541 void
543 {
544  ndbrequire(signal->getLength() == LcpFragRep::SignalLength);
545 
546  LcpFragRep* conf = (LcpFragRep*)signal->getDataPtr();
547 
548  ndbrequire(c_lcpRecord.m_state == LcpRecord::L_RUNNING);
549  ndbrequire(c_lcpRecord.m_lcpId == conf->lcpId);
550 
551  c_lcpRecord.m_lcp_frag_rep_cnt++;
552  D("LCP: rep" << V(conf->lcpId) << V(c_lcpRecord.m_lcp_frag_rep_cnt));
553 
558  conf->nodeId = LcpFragRep::BROADCAST_REQ;
559  sendSignal(DBDIH_REF, GSN_LCP_FRAG_REP,
560  signal, LcpFragRep::SignalLength, JBB);
561 
562  if (c_lcpRecord.m_lastFragmentFlag)
563  {
564  jam();
568  if (getNoOfOutstanding(c_lcpRecord) == 0)
569  {
570  jam();
571  /*
572  * and we have all fragments has been processed
573  */
574  completeLCP_1(signal);
575  }
576  return;
577  }
578 
579  checkSendEMPTY_LCP_CONF(signal);
580 }
581 
582 void
584 {
585  ndbrequire(c_lcpRecord.m_state == LcpRecord::L_RUNNING);
586  c_lcpRecord.m_state = LcpRecord::L_COMPLETING_1;
587  ndbrequire(c_lcpRecord.m_complete_outstanding == 0);
588 
594  LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtrSend();
595  ord->lcpId = c_lcpRecord.m_lcpId;
596  ord->lastFragmentFlag = true;
597  for (Uint32 i = 0; i<c_workers; i++)
598  {
599  jam();
600  c_lcpRecord.m_complete_outstanding++;
601  sendSignal(workerRef(i), GSN_LCP_FRAG_ORD, signal,
602  LcpFragOrd::SignalLength, JBB);
603  }
604 
609  EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
610  req->senderData= 0;
611  req->senderRef= reference();
612  req->backupPtr= 0;
613  req->backupId= c_lcpRecord.m_lcpId;
614  for (Uint32 i = 0; i<c_workers; i++)
615  {
616  jam();
617  c_lcpRecord.m_complete_outstanding++;
618  sendSignal(numberToRef(PGMAN, workerInstance(i), getOwnNodeId()),
619  GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength, JBB);
620  }
621 }
622 
623 void
624 DblqhProxy::execLCP_COMPLETE_REP(Signal* signal)
625 {
626  jamEntry();
627  ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_1);
628  ndbrequire(c_lcpRecord.m_complete_outstanding);
629  c_lcpRecord.m_complete_outstanding--;
630 
631  if (c_lcpRecord.m_complete_outstanding == 0)
632  {
633  jam();
634  completeLCP_2(signal);
635  return;
636  }
637 }
638 
639 void
640 DblqhProxy::execEND_LCP_CONF(Signal* signal)
641 {
642  jamEntry();
643  ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_1 ||
644  c_lcpRecord.m_state == LcpRecord::L_COMPLETING_2 ||
645  c_lcpRecord.m_state == LcpRecord::L_COMPLETING_3);
646 
647  ndbrequire(c_lcpRecord.m_complete_outstanding);
648  c_lcpRecord.m_complete_outstanding--;
649 
650  if (c_lcpRecord.m_complete_outstanding == 0)
651  {
652  jam();
653  if (c_lcpRecord.m_state == LcpRecord::L_COMPLETING_1)
654  {
655  jam();
656  completeLCP_2(signal);
657  return;
658  }
659  else if (c_lcpRecord.m_state == LcpRecord::L_COMPLETING_2)
660  {
661  jam();
662  completeLCP_3(signal);
663  return;
664  }
665  else
666  {
667  jam();
668  sendLCP_COMPLETE_REP(signal);
669  return;
670  }
671  }
672 }
673 
674 void
676 {
677  jamEntry();
678  ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_1);
679  c_lcpRecord.m_state = LcpRecord::L_COMPLETING_2;
680 
681  EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
682  req->senderData= 0;
683  req->senderRef= reference();
684  req->backupPtr= 0;
685  req->backupId= c_lcpRecord.m_lcpId;
686  c_lcpRecord.m_complete_outstanding++;
687 
692  // NOTE: ugly to use MaxLqhWorkers directly
693  Uint32 instance = MaxLqhWorkers + 1;
694  sendSignal(numberToRef(PGMAN, instance, getOwnNodeId()),
695  GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength, JBB);
696 }
697 
698 
699 void
701 {
702  jamEntry();
703  ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_2);
704  c_lcpRecord.m_state = LcpRecord::L_COMPLETING_3;
705 
710  EndLcpReq* req = (EndLcpReq*)signal->getDataPtrSend();
711  req->senderData= 0;
712  req->senderRef= reference();
713  req->backupPtr= 0;
714  req->backupId= c_lcpRecord.m_lcpId;
715 
716  // no reply from this
717  sendSignal(TSMAN_REF, GSN_END_LCP_REQ, signal,
718  EndLcpReq::SignalLength, JBB);
719 
720  if (c_lcpRecord.m_lcp_frag_rep_cnt)
721  {
722  jam();
723  c_lcpRecord.m_complete_outstanding++;
724  sendSignal(LGMAN_REF, GSN_END_LCP_REQ, signal,
725  EndLcpReq::SignalLength, JBB);
726  }
727  else
728  {
729  jam();
736  sendLCP_COMPLETE_REP(signal);
737  }
738 }
739 
740 void
741 DblqhProxy::sendLCP_COMPLETE_REP(Signal* signal)
742 {
743  ndbrequire(c_lcpRecord.m_state == LcpRecord::L_COMPLETING_3);
744 
745  LcpCompleteRep* conf = (LcpCompleteRep*)signal->getDataPtrSend();
746  conf->nodeId = LcpFragRep::BROADCAST_REQ;
747  conf->blockNo = DBLQH;
748  conf->lcpId = c_lcpRecord.m_lcpId;
749  sendSignal(DBDIH_REF, GSN_LCP_COMPLETE_REP,
750  signal, LcpCompleteRep::SignalLength, JBB);
751 
752  c_lcpRecord.m_state = LcpRecord::L_IDLE;
753  checkSendEMPTY_LCP_CONF(signal);
754 }
755 
756 void
757 DblqhProxy::execEMPTY_LCP_REQ(Signal* signal)
758 {
759  jam();
760 
761  CRASH_INSERTION(5008);
762 
763  EmptyLcpReq * const req = (EmptyLcpReq*)&signal->theData[0];
764  Uint32 nodeId = refToNode(req->senderRef);
765  c_lcpRecord.m_empty_lcp_req.set(nodeId);
766  checkSendEMPTY_LCP_CONF(signal);
767 }
768 
769 void
771 {
772  ndbrequire(!c_lcpRecord.m_empty_lcp_req.isclear());
773 
774  EmptyLcpRep * rep = (EmptyLcpRep*)signal->getDataPtrSend();
775  EmptyLcpConf * conf = (EmptyLcpConf*)rep->conf;
776 
777  switch(c_lcpRecord.m_state){
778  case LcpRecord::L_IDLE:
779  jam();
780  conf->idle = true;
781  break;
782  case LcpRecord::L_STARTING:
783  jam();
784  return;
785  case LcpRecord::L_RUNNING:{
786  jam();
787  if (getNoOfOutstanding(c_lcpRecord) == 0)
788  {
789  jam();
794  conf->tableId = c_lcpRecord.m_last_lcp_frag_ord.tableId;
795  conf->fragmentId = c_lcpRecord.m_last_lcp_frag_ord.fragmentId;
796  conf->lcpId = c_lcpRecord.m_last_lcp_frag_ord.lcpId;
797  conf->lcpNo = c_lcpRecord.m_last_lcp_frag_ord.lcpNo;
798  break;
799  }
800  return;
801  }
802  case LcpRecord::L_COMPLETING_1:
803  jam();
804  case LcpRecord::L_COMPLETING_2:
805  jam();
806  case LcpRecord::L_COMPLETING_3:
807  jam();
808  return;
809  }
810 
811  conf->senderNodeId = getOwnNodeId();
812 
813  c_lcpRecord.m_empty_lcp_req.copyto(NdbNodeBitmask::Size, rep->receiverGroup);
814  sendSignal(DBDIH_REF, GSN_EMPTY_LCP_REP, signal,
815  EmptyLcpRep::SignalLength + EmptyLcpConf::SignalLength, JBB);
816 
817  c_lcpRecord.m_empty_lcp_req.clear();
818 }
819 
820 // GSN_GCP_SAVEREQ
821 
822 void
823 DblqhProxy::execGCP_SAVEREQ(Signal* signal)
824 {
825  const GCPSaveReq* req = (const GCPSaveReq*)signal->getDataPtr();
826  Uint32 ssId = getSsId(req);
827  Ss_GCP_SAVEREQ& ss = ssSeize<Ss_GCP_SAVEREQ>(ssId);
828  ss.m_req = *req;
829  sendREQ(signal, ss);
830 }
831 
832 void
833 DblqhProxy::sendGCP_SAVEREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
834 {
835  Ss_GCP_SAVEREQ& ss = ssFind<Ss_GCP_SAVEREQ>(ssId);
836 
837  GCPSaveReq* req = (GCPSaveReq*)signal->getDataPtrSend();
838  *req = ss.m_req;
839 
840  req->dihBlockRef = reference();
841  req->dihPtr = ss.m_worker;
842  sendSignalNoRelease(workerRef(ss.m_worker), GSN_GCP_SAVEREQ,
843  signal, GCPSaveReq::SignalLength, JBB, handle);
844 }
845 
846 void
847 DblqhProxy::execGCP_SAVECONF(Signal* signal)
848 {
849  const GCPSaveConf* conf = (const GCPSaveConf*)signal->getDataPtr();
850  Uint32 ssId = getSsId(conf);
851  Ss_GCP_SAVEREQ& ss = ssFind<Ss_GCP_SAVEREQ>(ssId);
852  recvCONF(signal, ss);
853 }
854 
855 void
856 DblqhProxy::execGCP_SAVEREF(Signal* signal)
857 {
858  const GCPSaveRef* ref = (const GCPSaveRef*)signal->getDataPtr();
859  Uint32 ssId = getSsId(ref);
860  Ss_GCP_SAVEREQ& ss = ssFind<Ss_GCP_SAVEREQ>(ssId);
861 
862  if (ss.m_error != 0) {
863  // wl4391_todo check
864  ndbrequire(ss.m_error == ref->errorCode);
865  }
866  recvREF(signal, ss, ref->errorCode);
867 }
868 
869 void
870 DblqhProxy::sendGCP_SAVECONF(Signal* signal, Uint32 ssId)
871 {
872  Ss_GCP_SAVEREQ& ss = ssFind<Ss_GCP_SAVEREQ>(ssId);
873 
874  if (!lastReply(ss))
875  return;
876 
877  if (ss.m_error == 0) {
878  GCPSaveConf* conf = (GCPSaveConf*)signal->getDataPtrSend();
879  conf->dihPtr = ss.m_req.dihPtr;
880  conf->nodeId = getOwnNodeId();
881  conf->gci = ss.m_req.gci;
882  sendSignal(ss.m_req.dihBlockRef, GSN_GCP_SAVECONF,
883  signal, GCPSaveConf::SignalLength, JBB);
884  } else {
885  jam();
886  GCPSaveRef* ref = (GCPSaveRef*)signal->getDataPtrSend();
887  ref->dihPtr = ss.m_req.dihPtr;
888  ref->nodeId = getOwnNodeId();
889  ref->gci = ss.m_req.gci;
890  ref->errorCode = ss.m_error;
891  sendSignal(ss.m_req.dihBlockRef, GSN_GCP_SAVEREF,
892  signal, GCPSaveRef::SignalLength, JBB);
893  }
894 
895  ssRelease<Ss_GCP_SAVEREQ>(ssId);
896 }
897 
898 // GSN_SUB_GCP_COMPLETE_REP
899 void
900 DblqhProxy::execSUB_GCP_COMPLETE_REP(Signal* signal)
901 {
902  jamEntry();
903  for (Uint32 i = 0; i<c_workers; i++)
904  {
905  jam();
906  sendSignal(workerRef(i), GSN_SUB_GCP_COMPLETE_REP, signal,
907  signal->getLength(), JBB);
908  }
909 }
910 
911 // GSN_PREP_DROP_TAB_REQ
912 
913 void
914 DblqhProxy::execPREP_DROP_TAB_REQ(Signal* signal)
915 {
916  const PrepDropTabReq* req = (const PrepDropTabReq*)signal->getDataPtr();
917  Uint32 ssId = getSsId(req);
918  Ss_PREP_DROP_TAB_REQ& ss = ssSeize<Ss_PREP_DROP_TAB_REQ>(ssId);
919  ss.m_req = *req;
920  ndbrequire(signal->getLength() == PrepDropTabReq::SignalLength);
921  sendREQ(signal, ss);
922 }
923 
924 void
925 DblqhProxy::sendPREP_DROP_TAB_REQ(Signal* signal, Uint32 ssId,
926  SectionHandle * handle)
927 {
928  Ss_PREP_DROP_TAB_REQ& ss = ssFind<Ss_PREP_DROP_TAB_REQ>(ssId);
929 
930  PrepDropTabReq* req = (PrepDropTabReq*)signal->getDataPtrSend();
931  *req = ss.m_req;
932  req->senderRef = reference();
933  req->senderData = ssId; // redundant since tableId is used
934  sendSignalNoRelease(workerRef(ss.m_worker), GSN_PREP_DROP_TAB_REQ,
935  signal, PrepDropTabReq::SignalLength, JBB, handle);
936 }
937 
938 void
939 DblqhProxy::execPREP_DROP_TAB_CONF(Signal* signal)
940 {
941  const PrepDropTabConf* conf = (const PrepDropTabConf*)signal->getDataPtr();
942  Uint32 ssId = getSsId(conf);
943  Ss_PREP_DROP_TAB_REQ& ss = ssFind<Ss_PREP_DROP_TAB_REQ>(ssId);
944  recvCONF(signal, ss);
945 }
946 
947 void
948 DblqhProxy::execPREP_DROP_TAB_REF(Signal* signal)
949 {
950  const PrepDropTabRef* ref = (const PrepDropTabRef*)signal->getDataPtr();
951  Uint32 ssId = getSsId(ref);
952  Ss_PREP_DROP_TAB_REQ& ss = ssFind<Ss_PREP_DROP_TAB_REQ>(ssId);
953  recvREF(signal, ss, ref->errorCode);
954 }
955 
956 void
957 DblqhProxy::sendPREP_DROP_TAB_CONF(Signal* signal, Uint32 ssId)
958 {
959  Ss_PREP_DROP_TAB_REQ& ss = ssFind<Ss_PREP_DROP_TAB_REQ>(ssId);
960  BlockReference dictRef = ss.m_req.senderRef;
961 
962  if (!lastReply(ss))
963  return;
964 
965  if (ss.m_error == 0) {
966  jam();
967  PrepDropTabConf* conf = (PrepDropTabConf*)signal->getDataPtrSend();
968  conf->senderRef = reference();
969  conf->senderData = ss.m_req.senderData;
970  conf->tableId = ss.m_req.tableId;
971  sendSignal(dictRef, GSN_PREP_DROP_TAB_CONF,
972  signal, PrepDropTabConf::SignalLength, JBB);
973  } else {
974  jam();
975  PrepDropTabRef* ref = (PrepDropTabRef*)signal->getDataPtrSend();
976  ref->senderRef = reference();
977  ref->senderData = ss.m_req.senderData;
978  ref->tableId = ss.m_req.tableId;
979  ref->errorCode = ss.m_error;
980  sendSignal(dictRef, GSN_PREP_DROP_TAB_REF,
981  signal, PrepDropTabRef::SignalLength, JBB);
982  }
983 
984  ssRelease<Ss_PREP_DROP_TAB_REQ>(ssId);
985 }
986 
987 // GSN_DROP_TAB_REQ
988 
989 void
990 DblqhProxy::execDROP_TAB_REQ(Signal* signal)
991 {
992  const DropTabReq* req = (const DropTabReq*)signal->getDataPtr();
993  Uint32 ssId = getSsId(req);
994  Ss_DROP_TAB_REQ& ss = ssSeize<Ss_DROP_TAB_REQ>(ssId);
995  ss.m_req = *req;
996  ndbrequire(signal->getLength() == DropTabReq::SignalLength);
997  sendREQ(signal, ss);
998 
999  Uint32 tableId = ss.m_req.tableId;
1000  ndbrequire(tableId < c_tableRecSize);
1001  c_tableRec[tableId] = 0;
1002 }
1003 
1004 void
1005 DblqhProxy::sendDROP_TAB_REQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
1006 {
1007  Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
1008 
1009  DropTabReq* req = (DropTabReq*)signal->getDataPtrSend();
1010  *req = ss.m_req;
1011  req->senderRef = reference();
1012  req->senderData = ssId; // redundant since tableId is used
1013  sendSignalNoRelease(workerRef(ss.m_worker), GSN_DROP_TAB_REQ,
1014  signal, DropTabReq::SignalLength, JBB, handle);
1015 }
1016 
1017 void
1018 DblqhProxy::execDROP_TAB_CONF(Signal* signal)
1019 {
1020  const DropTabConf* conf = (const DropTabConf*)signal->getDataPtr();
1021  Uint32 ssId = getSsId(conf);
1022  Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
1023  recvCONF(signal, ss);
1024 }
1025 
1026 void
1027 DblqhProxy::execDROP_TAB_REF(Signal* signal)
1028 {
1029  const DropTabRef* ref = (const DropTabRef*)signal->getDataPtr();
1030  Uint32 ssId = getSsId(ref);
1031  Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
1032  recvREF(signal, ss, ref->errorCode);
1033 }
1034 
1035 void
1036 DblqhProxy::sendDROP_TAB_CONF(Signal* signal, Uint32 ssId)
1037 {
1038  Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
1039  BlockReference dictRef = ss.m_req.senderRef;
1040 
1041  if (!lastReply(ss))
1042  return;
1043 
1044  if (ss.m_error == 0) {
1045  jam();
1046  DropTabConf* conf = (DropTabConf*)signal->getDataPtrSend();
1047  conf->senderRef = reference();
1048  conf->senderData = ss.m_req.senderData;
1049  conf->tableId = ss.m_req.tableId;
1050  sendSignal(dictRef, GSN_DROP_TAB_CONF,
1051  signal, DropTabConf::SignalLength, JBB);
1052 
1053  // inform DBTUP proxy
1054  DropTabReq* req = (DropTabReq*)signal->getDataPtrSend();
1055  *req = ss.m_req;
1056  EXECUTE_DIRECT(DBTUP, GSN_DROP_TAB_REQ,
1057  signal, DropTabReq::SignalLength);
1058  } else {
1059  jam();
1060  DropTabRef* ref = (DropTabRef*)signal->getDataPtrSend();
1061  ref->senderRef = reference();
1062  ref->senderData = ss.m_req.senderData;
1063  ref->tableId = ss.m_req.tableId;
1064  ref->errorCode = ss.m_error;
1065  sendSignal(dictRef, GSN_DROP_TAB_REF,
1066  signal, DropTabConf::SignalLength, JBB);
1067  }
1068 
1069  ssRelease<Ss_DROP_TAB_REQ>(ssId);
1070 }
1071 
1072 // GSN_ALTER_TAB_REQ
1073 
1074 void
1075 DblqhProxy::execALTER_TAB_REQ(Signal* signal)
1076 {
1077  jamEntry();
1078  if (!assembleFragments(signal))
1079  {
1080  jam();
1081  return;
1082  }
1083  const AlterTabReq* req = (const AlterTabReq*)signal->getDataPtr();
1084  Uint32 ssId = getSsId(req);
1085  Ss_ALTER_TAB_REQ& ss = ssSeize<Ss_ALTER_TAB_REQ>(ssId);
1086  ss.m_req = *req;
1087  ndbrequire(signal->getLength() == AlterTabReq::SignalLength);
1088 
1089  SectionHandle handle(this, signal);
1090  saveSections(ss, handle);
1091 
1092  sendREQ(signal, ss);
1093 }
1094 
1095 void
1096 DblqhProxy::sendALTER_TAB_REQ(Signal* signal, Uint32 ssId,
1097  SectionHandle* handle)
1098 {
1099  Ss_ALTER_TAB_REQ& ss = ssFind<Ss_ALTER_TAB_REQ>(ssId);
1100 
1101  AlterTabReq* req = (AlterTabReq*)signal->getDataPtrSend();
1102  *req = ss.m_req;
1103  req->senderRef = reference();
1104  req->senderData = ssId;
1105  sendSignalNoRelease(workerRef(ss.m_worker), GSN_ALTER_TAB_REQ,
1106  signal, AlterTabReq::SignalLength, JBB, handle);
1107 }
1108 
1109 void
1110 DblqhProxy::execALTER_TAB_CONF(Signal* signal)
1111 {
1112  const AlterTabConf* conf = (const AlterTabConf*)signal->getDataPtr();
1113  Uint32 ssId = getSsId(conf);
1114  Ss_ALTER_TAB_REQ& ss = ssFind<Ss_ALTER_TAB_REQ>(ssId);
1115  recvCONF(signal, ss);
1116 }
1117 
1118 void
1119 DblqhProxy::execALTER_TAB_REF(Signal* signal)
1120 {
1121  const AlterTabRef* ref = (const AlterTabRef*)signal->getDataPtr();
1122  Uint32 ssId = getSsId(ref);
1123  Ss_ALTER_TAB_REQ& ss = ssFind<Ss_ALTER_TAB_REQ>(ssId);
1124  recvREF(signal, ss, ref->errorCode);
1125 }
1126 
1127 void
1128 DblqhProxy::sendALTER_TAB_CONF(Signal* signal, Uint32 ssId)
1129 {
1130  Ss_ALTER_TAB_REQ& ss = ssFind<Ss_ALTER_TAB_REQ>(ssId);
1131  BlockReference dictRef = ss.m_req.senderRef;
1132 
1133  if (!lastReply(ss))
1134  return;
1135 
1136  if (ss.m_error == 0) {
1137  jam();
1138  AlterTabConf* conf = (AlterTabConf*)signal->getDataPtrSend();
1139  conf->senderRef = reference();
1140  conf->senderData = ss.m_req.senderData;
1141  sendSignal(dictRef, GSN_ALTER_TAB_CONF,
1142  signal, AlterTabConf::SignalLength, JBB);
1143  } else {
1144  jam();
1145  AlterTabRef* ref = (AlterTabRef*)signal->getDataPtrSend();
1146  ref->senderRef = reference();
1147  ref->senderData = ss.m_req.senderData;
1148  ref->errorCode = ss.m_error;
1149  sendSignal(dictRef, GSN_ALTER_TAB_REF,
1150  signal, AlterTabConf::SignalLength, JBB);
1151  }
1152 
1153  ssRelease<Ss_ALTER_TAB_REQ>(ssId);
1154 }
1155 
1156 // GSN_START_FRAGREQ
1157 
1158 void
1160 {
1161  StartFragReq* req = (StartFragReq*)signal->getDataPtrSend();
1162  Uint32 instance = getInstanceKey(req->tableId, req->fragId);
1163 
1164  // wl4391_todo impl. method that fakes senders block-ref
1165  sendSignal(numberToRef(DBLQH, instance, getOwnNodeId()),
1166  GSN_START_FRAGREQ, signal, signal->getLength(), JBB);
1167 }
1168 
1169 // GSN_START_RECREQ
1170 
1171 void
1172 DblqhProxy::execSTART_RECREQ(Signal* signal)
1173 {
1174  if (refToMain(signal->getSendersBlockRef()) == DBLQH) {
1175  jam();
1176  execSTART_RECREQ_2(signal);
1177  return;
1178  }
1179 
1180  const StartRecReq* req = (const StartRecReq*)signal->getDataPtr();
1181  Ss_START_RECREQ& ss = ssSeize<Ss_START_RECREQ>();
1182  ss.m_req = *req;
1183 
1184  // seize records for sub-ops
1185  Uint32 i;
1186  for (i = 0; i < ss.m_req2cnt; i++) {
1187  Ss_START_RECREQ_2::Req tmp;
1188  tmp.proxyBlockNo = ss.m_req2[i].m_blockNo;
1189  Uint32 ssId2 = getSsId(&tmp);
1190  Ss_START_RECREQ_2& ss2 = ssSeize<Ss_START_RECREQ_2>(ssId2);
1191  ss.m_req2[i].m_ssId = ssId2;
1192 
1193  // set wait-for bitmask in SsParallel
1194  setMask(ss2);
1195  }
1196 
1197  ndbrequire(signal->getLength() == StartRecReq::SignalLength);
1198  sendREQ(signal, ss);
1199 }
1200 
1201 void
1202 DblqhProxy::sendSTART_RECREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
1203 {
1204  Ss_START_RECREQ& ss = ssFind<Ss_START_RECREQ>(ssId);
1205 
1206  StartRecReq* req = (StartRecReq*)signal->getDataPtrSend();
1207  *req = ss.m_req;
1208 
1209  req->senderRef = reference();
1210  req->senderData = ssId;
1211  sendSignalNoRelease(workerRef(ss.m_worker), GSN_START_RECREQ,
1212  signal, StartRecReq::SignalLength, JBB, handle);
1213 }
1214 
1215 void
1216 DblqhProxy::execSTART_RECCONF(Signal* signal)
1217 {
1218  const StartRecConf* conf = (const StartRecConf*)signal->getDataPtr();
1219 
1220  if (refToMain(signal->getSendersBlockRef()) != DBLQH) {
1221  jam();
1222  execSTART_RECCONF_2(signal);
1223  return;
1224  }
1225 
1226  Uint32 ssId = conf->senderData;
1227  Ss_START_RECREQ& ss = ssFind<Ss_START_RECREQ>(ssId);
1228  recvCONF(signal, ss);
1229 }
1230 
1231 void
1233 {
1234  Ss_START_RECREQ& ss = ssFind<Ss_START_RECREQ>(ssId);
1235 
1236  if (!lastReply(ss))
1237  return;
1238 
1239  if (ss.m_error == 0) {
1240  jam();
1241 
1245  signal->theData[0] = 12003;
1246  sendSignal(LGMAN_REF, GSN_DUMP_STATE_ORD, signal, 1, JBB);
1247 
1248  StartRecConf* conf = (StartRecConf*)signal->getDataPtrSend();
1249  conf->startingNodeId = getOwnNodeId();
1250  conf->senderData = ss.m_req.senderData;
1251  sendSignal(ss.m_req.senderRef, GSN_START_RECCONF,
1252  signal, StartRecConf::SignalLength, JBB);
1253  } else {
1254  ndbrequire(false);
1255  }
1256 
1257  {
1258  Uint32 i;
1259  for (i = 0; i < ss.m_req2cnt; i++) {
1260  jam();
1261  Uint32 ssId2 = ss.m_req2[i].m_ssId;
1262  ssRelease<Ss_START_RECREQ_2>(ssId2);
1263  }
1264  }
1265  ssRelease<Ss_START_RECREQ>(ssId);
1266 }
1267 
1268 // GSN_START_RECREQ_2 [ sub-op, fictional gsn ]
1269 
1270 void
1271 DblqhProxy::execSTART_RECREQ_2(Signal* signal)
1272 {
1273  ndbrequire(signal->getLength() == Ss_START_RECREQ_2::Req::SignalLength);
1274 
1275  const Ss_START_RECREQ_2::Req* req =
1276  (const Ss_START_RECREQ_2::Req*)signal->getDataPtr();
1277  Uint32 ssId = getSsId(req);
1278  Ss_START_RECREQ_2& ss = ssFind<Ss_START_RECREQ_2>(ssId);
1279 
1280  // reversed roles
1281  recvCONF(signal, ss);
1282 }
1283 
1284 void
1285 DblqhProxy::sendSTART_RECREQ_2(Signal* signal, Uint32 ssId)
1286 {
1287  Ss_START_RECREQ_2& ss = ssFind<Ss_START_RECREQ_2>(ssId);
1288 
1289  const Ss_START_RECREQ_2::Req* req =
1290  (const Ss_START_RECREQ_2::Req*)signal->getDataPtr();
1291 
1292  if (firstReply(ss)) {
1293  ss.m_req = *req;
1294  } else {
1295  jam();
1296  /*
1297  * Fragments can be started from different lcpId's. LGMAN must run
1298  * UNDO until lowest lcpId. Each DBLQH instance computes the lowest
1299  * lcpId in START_FRAGREQ. In MT case the proxy further computes
1300  * the lowest of the lcpId's from worker instances.
1301  */
1302  if (req->lcpId < ss.m_req.lcpId)
1303  {
1304  jam();
1305  ss.m_req.lcpId = req->lcpId;
1306  }
1307  ndbrequire(ss.m_req.proxyBlockNo == req->proxyBlockNo);
1308  }
1309 
1310  if (!lastReply(ss))
1311  return;
1312 
1313  {
1314  Ss_START_RECREQ_2::Req* req =
1315  (Ss_START_RECREQ_2::Req*)signal->getDataPtrSend();
1316  *req = ss.m_req;
1317  BlockReference ref = numberToRef(req->proxyBlockNo, getOwnNodeId());
1318  sendSignal(ref, GSN_START_RECREQ, signal,
1319  Ss_START_RECREQ_2::Req::SignalLength, JBB);
1320  }
1321 }
1322 
1323 void
1324 DblqhProxy::execSTART_RECCONF_2(Signal* signal)
1325 {
1326  ndbrequire(signal->getLength() == Ss_START_RECREQ_2::Conf::SignalLength);
1327 
1328  const Ss_START_RECREQ_2::Conf* conf =
1329  (const Ss_START_RECREQ_2::Conf*)signal->getDataPtr();
1330  Uint32 ssId = getSsId(conf);
1331  Ss_START_RECREQ_2& ss = ssFind<Ss_START_RECREQ_2>(ssId);
1332  ss.m_conf = *conf;
1333 
1334  // reversed roles
1335  sendREQ(signal, ss);
1336 }
1337 
1338 void
1339 DblqhProxy::sendSTART_RECCONF_2(Signal* signal, Uint32 ssId,
1340  SectionHandle* handle)
1341 {
1342  Ss_START_RECREQ_2& ss = ssFind<Ss_START_RECREQ_2>(ssId);
1343 
1344  Ss_START_RECREQ_2::Conf* conf =
1345  (Ss_START_RECREQ_2::Conf*)signal->getDataPtrSend();
1346  *conf = ss.m_conf;
1347  sendSignalNoRelease(workerRef(ss.m_worker), GSN_START_RECCONF, signal,
1348  Ss_START_RECREQ_2::Conf::SignalLength, JBB, handle);
1349 }
1350 
1351 // GSN_LQH_TRANSREQ
1352 
1353 void
1355 {
1356  jamEntry();
1357 
1358  if (!checkNodeFailSequence(signal))
1359  {
1360  jam();
1361  return;
1362  }
1363  const LqhTransReq* req = (const LqhTransReq*)signal->getDataPtr();
1364  Ss_LQH_TRANSREQ& ss = ssSeize<Ss_LQH_TRANSREQ>();
1365  ss.m_req = *req;
1366  ndbrequire(signal->getLength() == LqhTransReq::SignalLength);
1367  sendREQ(signal, ss);
1368 
1373  Uint32 nodeId = ss.m_req.failedNodeId;
1374  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(c_ss_LQH_TRANSREQ.m_pool); i++)
1375  {
1376  if (c_ss_LQH_TRANSREQ.m_pool[i].m_ssId != 0 &&
1377  c_ss_LQH_TRANSREQ.m_pool[i].m_ssId != ss.m_ssId &&
1378  c_ss_LQH_TRANSREQ.m_pool[i].m_req.failedNodeId == nodeId)
1379  {
1380  jam();
1381  c_ss_LQH_TRANSREQ.m_pool[i].m_valid = false;
1382  }
1383  }
1384 }
1385 
1386 void
1387 DblqhProxy::sendLQH_TRANSREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
1388 {
1389  Ss_LQH_TRANSREQ& ss = ssFind<Ss_LQH_TRANSREQ>(ssId);
1390 
1391  LqhTransReq* req = (LqhTransReq*)signal->getDataPtrSend();
1392  *req = ss.m_req;
1393 
1394  req->senderData = ssId;
1395  req->senderRef = reference();
1396  sendSignalNoRelease(workerRef(ss.m_worker), GSN_LQH_TRANSREQ,
1397  signal, LqhTransReq::SignalLength, JBB, handle);
1398 }
1399 
1400 void
1402 {
1403  const LqhTransConf* conf = (const LqhTransConf*)signal->getDataPtr();
1404  Uint32 ssId = conf->tcRef;
1405  Ss_LQH_TRANSREQ& ss = ssFind<Ss_LQH_TRANSREQ>(ssId);
1406  ss.m_conf = *conf;
1407 
1408  BlockReference ref = signal->getSendersBlockRef();
1409  ndbrequire(refToMain(ref) == number());
1410  const Uint32 ino = refToInstance(ref);
1411  const Uint32 worker = workerIndex(ino);
1412 
1413  ndbrequire(ref == workerRef(worker));
1414  ndbrequire(worker < c_workers);
1415 
1416  if (ss.m_valid == false)
1417  {
1418  jam();
1422  if (ss.m_conf.operationStatus == LqhTransConf::LastTransConf)
1423  {
1424  jam();
1425  ndbrequire(ss.m_workerMask.get(worker));
1426  ss.m_workerMask.clear(worker);
1427  if (ss.m_workerMask.isclear())
1428  {
1429  jam();
1430  ssRelease<Ss_LQH_TRANSREQ>(ssId);
1431  }
1432  }
1433  return;
1434  }
1435  else if (ss.m_conf.operationStatus == LqhTransConf::LastTransConf)
1436  {
1437  jam();
1445  Uint32 nodeId = ss.m_req.failedNodeId;
1446  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(c_ss_LQH_TRANSREQ.m_pool); i++)
1447  {
1448  if (c_ss_LQH_TRANSREQ.m_pool[i].m_ssId != 0 &&
1449  c_ss_LQH_TRANSREQ.m_pool[i].m_ssId != ssId &&
1450  c_ss_LQH_TRANSREQ.m_pool[i].m_req.failedNodeId == nodeId &&
1451  c_ss_LQH_TRANSREQ.m_pool[i].m_valid == false)
1452  {
1453  jam();
1454  if (c_ss_LQH_TRANSREQ.m_pool[i].m_workerMask.get(worker))
1455  {
1456  jam();
1457  c_ss_LQH_TRANSREQ.m_pool[i].m_workerMask.clear(worker);
1458  if (c_ss_LQH_TRANSREQ.m_pool[i].m_workerMask.isclear())
1459  {
1460  jam();
1461  ssRelease<Ss_LQH_TRANSREQ>(c_ss_LQH_TRANSREQ.m_pool[i].m_ssId);
1462  }
1463  }
1464  }
1465  }
1466  }
1467 
1468  recvCONF(signal, ss);
1469 }
1470 
1471 void
1472 DblqhProxy::sendLQH_TRANSCONF(Signal* signal, Uint32 ssId)
1473 {
1474  Ss_LQH_TRANSREQ& ss = ssFind<Ss_LQH_TRANSREQ>(ssId);
1475 
1476  if (ss.m_conf.operationStatus != LqhTransConf::LastTransConf) {
1477  jam();
1478  LqhTransConf* conf = (LqhTransConf*)signal->getDataPtrSend();
1479  *conf = ss.m_conf;
1480  conf->tcRef = ss.m_req.senderData;
1481  sendSignal(ss.m_req.senderRef, GSN_LQH_TRANSCONF,
1482  signal, LqhTransConf::SignalLength, JBB);
1483 
1484  // more replies from this worker
1485  skipConf(ss);
1486  }
1487 
1488  if (!lastReply(ss))
1489  return;
1490 
1491  if (ss.m_error == 0) {
1492  jam();
1493  LqhTransConf* conf = (LqhTransConf*)signal->getDataPtrSend();
1494  conf->tcRef = ss.m_req.senderData;
1495  conf->lqhNodeId = getOwnNodeId();
1496  conf->operationStatus = LqhTransConf::LastTransConf;
1497  sendSignal(ss.m_req.senderRef, GSN_LQH_TRANSCONF,
1498  signal, LqhTransConf::SignalLength, JBB);
1499  } else {
1500  ndbrequire(false);
1501  }
1502 
1503  ssRelease<Ss_LQH_TRANSREQ>(ssId);
1504 }
1505 
1506 // GSN_EXEC_SR_1 [fictional gsn ]
1507 
1508 void
1509 DblqhProxy::execEXEC_SRREQ(Signal* signal)
1510 {
1511  const BlockReference senderRef = signal->getSendersBlockRef();
1512 
1513  if (refToInstance(senderRef) != 0) {
1514  jam();
1515  execEXEC_SR_2(signal, GSN_EXEC_SRREQ);
1516  return;
1517  }
1518 
1519  execEXEC_SR_1(signal, GSN_EXEC_SRREQ);
1520 }
1521 
1522 void
1523 DblqhProxy::execEXEC_SRCONF(Signal* signal)
1524 {
1525  const BlockReference senderRef = signal->getSendersBlockRef();
1526 
1527  if (refToInstance(senderRef) != 0) {
1528  jam();
1529  execEXEC_SR_2(signal, GSN_EXEC_SRCONF);
1530  return;
1531  }
1532 
1533  execEXEC_SR_1(signal, GSN_EXEC_SRCONF);
1534 }
1535 
1536 void
1537 DblqhProxy::execEXEC_SR_1(Signal* signal, GlobalSignalNumber gsn)
1538 {
1539  ndbrequire(signal->getLength() == Ss_EXEC_SR_1::Sig::SignalLength);
1540 
1541  const Ss_EXEC_SR_1::Sig* sig =
1542  (const Ss_EXEC_SR_1::Sig*)signal->getDataPtr();
1543  Uint32 ssId = getSsId(sig);
1544  Ss_EXEC_SR_1& ss = ssSeize<Ss_EXEC_SR_1>(ssId);
1545  ss.m_gsn = gsn;
1546  ss.m_sig = *sig;
1547 
1548  sendREQ(signal, ss);
1549  ssRelease<Ss_EXEC_SR_1>(ss);
1550 }
1551 
1552 void
1553 DblqhProxy::sendEXEC_SR_1(Signal* signal, Uint32 ssId, SectionHandle* handle)
1554 {
1555  Ss_EXEC_SR_1& ss = ssFind<Ss_EXEC_SR_1>(ssId);
1556  signal->theData[0] = ss.m_sig.nodeId;
1557  sendSignalNoRelease(workerRef(ss.m_worker), ss.m_gsn, signal, 1, JBB, handle);
1558 }
1559 
1560 // GSN_EXEC_SRREQ_2 [ fictional gsn ]
1561 
1562 void
1563 DblqhProxy::execEXEC_SR_2(Signal* signal, GlobalSignalNumber gsn)
1564 {
1565  ndbrequire(signal->getLength() == Ss_EXEC_SR_2::Sig::SignalLength);
1566 
1567  const Ss_EXEC_SR_2::Sig* sig =
1568  (const Ss_EXEC_SR_2::Sig*)signal->getDataPtr();
1569  Uint32 ssId = getSsId(sig);
1570 
1571  bool found = false;
1572  Ss_EXEC_SR_2& ss = ssFindSeize<Ss_EXEC_SR_2>(ssId, &found);
1573  if (!found) {
1574  jam();
1575  setMask(ss);
1576  }
1577 
1578  ndbrequire(sig->nodeId == getOwnNodeId());
1579  if (ss.m_sigcount == 0) {
1580  jam();
1581  ss.m_gsn = gsn;
1582  ss.m_sig = *sig;
1583  } else {
1584  jam();
1585  ndbrequire(ss.m_gsn == gsn);
1586  ndbrequire(memcmp(&ss.m_sig, sig, sizeof(*sig)) == 0);
1587  }
1588  ss.m_sigcount++;
1589 
1590  // reversed roles
1591  recvCONF(signal, ss);
1592 }
1593 
1594 void
1595 DblqhProxy::sendEXEC_SR_2(Signal* signal, Uint32 ssId)
1596 {
1597  Ss_EXEC_SR_2& ss = ssFind<Ss_EXEC_SR_2>(ssId);
1598 
1599  if (!lastReply(ss)) {
1600  jam();
1601  return;
1602  }
1603 
1604  NodeBitmask nodes;
1605  nodes.assign(NdbNodeBitmask::Size, ss.m_sig.sr_nodes);
1606  NodeReceiverGroup rg(DBLQH, nodes);
1607 
1608  signal->theData[0] = ss.m_sig.nodeId;
1609  sendSignal(rg, ss.m_gsn, signal, 1, JBB);
1610 
1611  ssRelease<Ss_EXEC_SR_2>(ssId);
1612 }
1613 
1614 // GSN_EXEC_FRAGREQ
1615 void
1617 {
1618  Uint32 ref = ((ExecFragReq*)signal->getDataPtr())->dst;
1619 
1620  if (refToNode(ref) == getOwnNodeId())
1621  {
1622  jam();
1623  sendSignal(ref, GSN_EXEC_FRAGREQ, signal, signal->getLength(), JBB);
1624  }
1625  else if (ndb_route_exec_frag(getNodeInfo(refToNode(ref)).m_version))
1626  {
1627  jam();
1628  sendSignal(numberToRef(DBLQH, refToNode(ref)), GSN_EXEC_FRAGREQ, signal,
1629  signal->getLength(), JBB);
1630  }
1631  else
1632  {
1633  jam();
1634  sendSignal(ref, GSN_EXEC_FRAGREQ, signal,
1635  signal->getLength(), JBB);
1636  }
1637 }
1638 
1639 // GSN_EXEC_FRAGCONF
1640 void
1641 DblqhProxy::execEXEC_FRAGCONF(Signal* signal)
1642 {
1643  Uint32 ref = signal->theData[1];
1644 
1645  if (refToNode(ref) == getOwnNodeId())
1646  {
1647  jam();
1648  sendSignal(ref, GSN_EXEC_FRAGCONF, signal, 1, JBB);
1649  }
1650  else if (ndb_route_exec_frag(getNodeInfo(refToNode(ref)).m_version))
1651  {
1652  jam();
1653  sendSignal(numberToRef(DBLQH, refToNode(ref)), GSN_EXEC_FRAGCONF,
1654  signal, 2, JBB);
1655  }
1656  else
1657  {
1658  jam();
1659  sendSignal(ref, GSN_EXEC_FRAGCONF, signal, 2, JBB);
1660  }
1661 }
1662 
1663 // GSN_DROP_FRAG_REQ
1664 
1665 void
1666 DblqhProxy::execDROP_FRAG_REQ(Signal* signal)
1667 {
1668  const DropFragReq* req = (const DropFragReq*)signal->getDataPtr();
1669  Uint32 ssId = getSsId(req);
1670  Ss_DROP_FRAG_REQ& ss = ssSeize<Ss_DROP_FRAG_REQ>(ssId);
1671  ss.m_req = *req;
1672  ndbrequire(signal->getLength() == DropFragReq::SignalLength);
1673  sendREQ(signal, ss);
1674 }
1675 
1676 void
1677 DblqhProxy::sendDROP_FRAG_REQ(Signal* signal, Uint32 ssId,
1678  SectionHandle* handle)
1679 {
1680  Ss_DROP_FRAG_REQ& ss = ssFind<Ss_DROP_FRAG_REQ>(ssId);
1681 
1682  DropFragReq* req = (DropFragReq*)signal->getDataPtrSend();
1683  *req = ss.m_req;
1684  req->senderRef = reference();
1685  req->senderData = ssId;
1686  sendSignalNoRelease(workerRef(ss.m_worker), GSN_DROP_FRAG_REQ,
1687  signal, DropFragReq::SignalLength, JBB, handle);
1688 }
1689 
1690 void
1691 DblqhProxy::execDROP_FRAG_CONF(Signal* signal)
1692 {
1693  const DropFragConf* conf = (const DropFragConf*)signal->getDataPtr();
1694  Uint32 ssId = getSsId(conf);
1695  Ss_DROP_FRAG_REQ& ss = ssFind<Ss_DROP_FRAG_REQ>(ssId);
1696  recvCONF(signal, ss);
1697 }
1698 
1699 void
1700 DblqhProxy::execDROP_FRAG_REF(Signal* signal)
1701 {
1702  const DropFragRef* ref = (const DropFragRef*)signal->getDataPtr();
1703  Uint32 ssId = getSsId(ref);
1704  Ss_DROP_FRAG_REQ& ss = ssFind<Ss_DROP_FRAG_REQ>(ssId);
1705  recvREF(signal, ss, ref->errCode);
1706 }
1707 
1708 void
1709 DblqhProxy::sendDROP_FRAG_CONF(Signal* signal, Uint32 ssId)
1710 {
1711  Ss_DROP_FRAG_REQ& ss = ssFind<Ss_DROP_FRAG_REQ>(ssId);
1712  BlockReference dictRef = ss.m_req.senderRef;
1713 
1714  if (!lastReply(ss))
1715  return;
1716 
1717  if (ss.m_error == 0) {
1718  jam();
1719  DropFragConf* conf = (DropFragConf*)signal->getDataPtrSend();
1720  conf->senderRef = reference();
1721  conf->senderData = ss.m_req.senderData;
1722  conf->tableId = ss.m_req.tableId;
1723  conf->fragId = ss.m_req.fragId;
1724  sendSignal(dictRef, GSN_DROP_FRAG_CONF,
1725  signal, DropFragConf::SignalLength, JBB);
1726  } else {
1727  jam();
1728  DropFragRef* ref = (DropFragRef*)signal->getDataPtrSend();
1729  ref->senderRef = reference();
1730  ref->senderData = ss.m_req.senderData;
1731  ref->tableId = ss.m_req.tableId;
1732  ref->fragId = ss.m_req.fragId;
1733  ref->errCode = ss.m_error;
1734  sendSignal(dictRef, GSN_DROP_FRAG_REF,
1735  signal, DropFragConf::SignalLength, JBB);
1736  }
1737 
1738  ssRelease<Ss_DROP_FRAG_REQ>(ssId);
1739 }
1740 
1741 BLOCK_FUNCTIONS(DblqhProxy)