// Doxygen page-header residue from "MySQL 5.6.14 Source Code Document"
// (navigation bar: All Classes / Namespaces / Files / ...) for
// DbtupScan.cpp -- not part of the source code itself.
1 /*
2  Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #define DBTUP_C
19 #define DBTUP_SCAN_CPP
20 #include "Dbtup.hpp"
21 #include <signaldata/AccScan.hpp>
22 #include <signaldata/NextScan.hpp>
23 #include <signaldata/AccLock.hpp>
24 #include <md5_hash.hpp>
25 
26 #undef jam
27 #undef jamEntry
28 #define jam() { jamLine(32000 + __LINE__); }
29 #define jamEntry() { jamEntryLine(32000 + __LINE__); }
30 
31 #ifdef VM_TRACE
32 #define dbg(x) globalSignalLoggers.log x
33 #else
34 #define dbg(x)
35 #endif
36 
/**
 * ACC_SCANREQ: LQH asks TUP to set up a scan on one fragment.
 *
 * For an LCP scan the pre-allocated scan record (c_lcp_scan_op) is
 * reused; otherwise a ScanOp is seized from the pool and linked into
 * the fragment's scan list.  On success ACC_SCANCONF is sent back with
 * the ScanOp pool index as accPtr; on failure (pool exhausted)
 * ACC_SCANREF is sent.
 */
void
Dbtup::execACC_SCANREQ(Signal* signal)
{
  jamEntry();
  // copy the request out of the signal buffer before we reuse it for replies
  const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
  const AccScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = RNIL;
  do {
    // find table and fragment
    TablerecPtr tablePtr;
    tablePtr.i = req->tableId;
    ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
    FragrecordPtr fragPtr;
    Uint32 fragId = req->fragmentNo;
    fragPtr.i = RNIL;
    getFragmentrec(fragPtr, fragId, tablePtr.p);
    ndbrequire(fragPtr.i != RNIL);
    Fragrecord& frag = *fragPtr.p;
    // flags
    Uint32 bits = 0;

    if (AccScanReq::getLcpScanFlag(req->requestInfo))
    {
      jam();
      // LCP scan: reuse the dedicated scan record, which must already
      // be bound to this fragment and still unstarted
      bits |= ScanOp::SCAN_LCP;
      c_scanOpPool.getPtr(scanPtr, c_lcp_scan_op);
      ndbrequire(scanPtr.p->m_fragPtrI == fragPtr.i);
      ndbrequire(scanPtr.p->m_state == ScanOp::First);
    }
    else
    {
      // seize from pool and link to per-fragment list
      LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
      if (! list.seize(scanPtr)) {
        jam();
        break;  // no free scan record -> REF path below
      }
      // placement new: (re)initialize the seized record
      new (scanPtr.p) ScanOp;
    }

    // scan disk part too unless caller opted out or the table has no
    // disk attributes
    if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
        && tablePtr.p->m_no_of_disk_attributes)
    {
      bits |= ScanOp::SCAN_DD;
    }

    // mm indexes m_attributes[]: selects the disk attribute descriptors
    // when this is a disk (DD) scan
    bool mm = (bits & ScanOp::SCAN_DD);
    if ((tablePtr.p->m_attributes[mm].m_no_of_varsize +
         tablePtr.p->m_attributes[mm].m_no_of_dynamic) > 0)
    {
      if (bits & ScanOp::SCAN_DD)
      {
        // only dd scan varsize pages
        // mm always has a fixed part
        bits |= ScanOp::SCAN_VS;
      }
    }

    // locking unless this is a read-committed scan
    if (! AccScanReq::getReadCommittedFlag(req->requestInfo))
    {
      if (AccScanReq::getLockMode(req->requestInfo) == 0)
        bits |= ScanOp::SCAN_LOCK_SH;
      else
        bits |= ScanOp::SCAN_LOCK_EX;
    }

    if (AccScanReq::getNRScanFlag(req->requestInfo))
    {
      jam();
      // node-restart scan: bounded by maxPage supplied by caller
      bits |= ScanOp::SCAN_NR;
      scanPtr.p->m_endPage = req->maxPage;
      if (req->maxPage != RNIL && req->maxPage > frag.m_max_page_no)
      {
        // diagnostic only: caller's bound exceeds fragment's page count
        ndbout_c("%u %u endPage: %u (noOfPages: %u maxPage: %u)",
                 tablePtr.i, fragId,
                 req->maxPage, fragPtr.p->noOfPages,
                 fragPtr.p->m_max_page_no);
      }
    }
    else
    {
      jam();
      scanPtr.p->m_endPage = RNIL;
    }

    if (AccScanReq::getLcpScanFlag(req->requestInfo))
    {
      jam();
      // LCP scans never touch disk and never lock
      ndbrequire((bits & ScanOp::SCAN_DD) == 0);
      ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
    }

    if (bits & ScanOp::SCAN_VS)
    {
      // varsize scans are incompatible with NR and LCP modes
      ndbrequire((bits & ScanOp::SCAN_NR) == 0);
      ndbrequire((bits & ScanOp::SCAN_LCP) == 0);
    }

    // set up scan op
    ScanOp& scan = *scanPtr.p;
    scan.m_state = ScanOp::First;
    scan.m_bits = bits;
    scan.m_userPtr = req->senderData;
    scan.m_userRef = req->senderRef;
    scan.m_tableId = tablePtr.i;
    scan.m_fragId = frag.fragmentId;
    scan.m_fragPtrI = fragPtr.i;
    scan.m_transId1 = req->transId1;
    scan.m_transId2 = req->transId2;
    scan.m_savePointId = req->savePointId;

    // conf
    AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
    conf->scanPtr = req->senderData;
    conf->accPtr = scanPtr.i;
    conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
    sendSignal(req->senderRef, GSN_ACC_SCANCONF,
               signal, AccScanConf::SignalLength, JBB);
    return;
  } while (0);
  if (scanPtr.i != RNIL) {
    jam();
    releaseScanOp(scanPtr);
  }
  // LQH does not handle REF
  signal->theData[0] = 0x313;
  sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);
}
166 
/**
 * NEXT_SCANREQ: LQH drives the scan forward, commits/unlocks the
 * previous row, or closes the scan, depending on req->scanFlag.
 * Unless the request terminates here (commit-only or close), the
 * handler falls out of the switch and continues directly into
 * ACC_CHECK_SCAN to look for the next row.
 */
void
Dbtup::execNEXT_SCANREQ(Signal* signal)
{
  jamEntry();
  const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
  const NextScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  c_scanOpPool.getPtr(scanPtr, req->accPtr);
  ScanOp& scan = *scanPtr.p;
  switch (req->scanFlag) {
  case NextScanReq::ZSCAN_NEXT:
    jam();
    break;
  case NextScanReq::ZSCAN_NEXT_COMMIT:
    jam();
    // intentional fallthrough: commit previous row, then (for
    // NEXT_COMMIT) continue to fetch the next one
  case NextScanReq::ZSCAN_COMMIT:
    jam();
    if ((scan.m_bits & ScanOp::SCAN_LOCK) != 0) {
      jam();
      // release the row lock that LQH is done with
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Unlock;
      lockReq->accOpPtr = req->accOperationPtr;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
                     signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      removeAccLockOp(scan, req->accOperationPtr);
    }
    if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
      // commit-only: confirm and stop; do not advance the scan
      NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
      conf->scanPtr = scan.m_userPtr;
      unsigned signalLength = 1;
      sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
                 signal, signalLength, JBB);
      return;
    }
    break;
  case NextScanReq::ZSCAN_CLOSE:
    jam();
    if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
      jam();
      ndbrequire(scan.m_accLockOp != RNIL);
      // use ACC_ABORTCONF to flush out any reply in job buffer
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::AbortWithConf;
      lockReq->accOpPtr = scan.m_accLockOp;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
                     signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      // close completes later in execACC_ABORTCONF
      scan.m_state = ScanOp::Aborting;
      return;
    }
    if (scan.m_state == ScanOp::Locked) {
      jam();
      // holding a lock but not waiting: abort it synchronously
      ndbrequire(scan.m_accLockOp != RNIL);
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Abort;
      lockReq->accOpPtr = scan.m_accLockOp;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
                     signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      scan.m_accLockOp = RNIL;
    }
    scan.m_state = ScanOp::Aborting;
    scanClose(signal, scanPtr);
    return;
  case NextScanReq::ZSCAN_NEXT_ABORT:
    jam();
    // not supported by TUP scans: falls into the ndbrequire below
  default:
    jam();
    ndbrequire(false);
    break;
  }
  // start looking for next scan result
  AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
  checkReq->accPtr = scanPtr.i;
  checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
  EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
  jamEntry();
}
252 
/**
 * ACC_CHECK_SCAN: advance the scan one step.  Handles the LCP-stop
 * poll, the lock-wait case, the LCP keep list, and otherwise runs
 * scanFirst/scanNext and replies via scanReply.
 */
void
Dbtup::execACC_CHECK_SCAN(Signal* signal)
{
  jamEntry();
  const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
  const AccCheckScan* const req = &reqCopy;
  ScanOpPtr scanPtr;
  c_scanOpPool.getPtr(scanPtr, req->accPtr);
  ScanOp& scan = *scanPtr.p;
  // fragment
  FragrecordPtr fragPtr;
  fragPtr.i = scan.m_fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  Fragrecord& frag = *fragPtr.p;
  if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
    jam();
    // let LQH decide whether the scan must pause for an LCP
    signal->theData[0] = scan.m_userPtr;
    signal->theData[1] = true;
    EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
    jamEntry();
    return;
  }
  if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
    jam();
    // LQH asks if we are waiting for lock and we tell it to ask again
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL; // no tuple returned
    conf->fragId = frag.fragmentId;
    unsigned signalLength = 3;
    // if TC has ordered scan close, it will be detected here
    sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
               signal, signalLength, JBB);
    return; // stop
  }

  const bool lcp = (scan.m_bits & ScanOp::SCAN_LCP);

  if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
  {
    jam();
    // rows deleted during the LCP are queued on the keep list and must
    // be returned before normal scanning resumes
    handle_lcp_keep(signal, fragPtr.p, scanPtr.p);
    return;
  }

  if (scan.m_state == ScanOp::First) {
    jam();
    scanFirst(signal, scanPtr);
  }
  if (scan.m_state == ScanOp::Next) {
    jam();
    bool immediate = scanNext(signal, scanPtr);
    if (! immediate) {
      jam();
      // time-slicing via TUP or PGMAN
      return;
    }
  }
  scanReply(signal, scanPtr);
}
318 
/**
 * Send the result of the current scan step to LQH.
 *
 * In state Current a row was found: if the scan is a locking one, the
 * primary key is read and a row lock requested from ACC (possibly
 * blocking, refused, or out of lock records).  In state Locked (or for
 * lock-free scans) NEXT_SCANCONF carries the row to LQH.  In state
 * Last/Invalid an end-of-scan NEXT_SCANCONF is sent.
 */
void
Dbtup::scanReply(Signal* signal, ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  FragrecordPtr fragPtr;
  fragPtr.i = scan.m_fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  Fragrecord& frag = *fragPtr.p;
  // for reading tuple key in Current state
  Uint32* pkData = (Uint32*)c_dataBuffer;
  unsigned pkSize = 0;
  if (scan.m_state == ScanOp::Current) {
    // found an entry to return
    jam();
    ndbrequire(scan.m_accLockOp == RNIL);
    if (scan.m_bits & ScanOp::SCAN_LOCK) {
      jam();
      // read tuple key - use TUX routine
      const ScanPos& pos = scan.m_scanPos;
      const Local_key& key_mm = pos.m_key_mm;
      int ret = tuxReadPk(fragPtr.i, pos.m_realpid_mm, key_mm.m_page_idx,
                          pkData, true);
      ndbrequire(ret > 0);
      pkSize = ret;
      dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
      // get read lock or exclusive lock
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = (scan.m_bits & ScanOp::SCAN_LOCK_SH) ?
        AccLockReq::LockShared : AccLockReq::LockExclusive;
      lockReq->accOpPtr = RNIL;
      lockReq->userPtr = scanPtr.i;
      lockReq->userRef = reference();
      lockReq->tableId = scan.m_tableId;
      lockReq->fragId = frag.fragmentId;
      lockReq->fragPtrI = RNIL; // no cached frag ptr yet
      lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
      lockReq->page_id = key_mm.m_page_no;
      lockReq->page_idx = key_mm.m_page_idx;
      lockReq->transId1 = scan.m_transId1;
      lockReq->transId2 = scan.m_transId2;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
                     signal, AccLockReq::LockSignalLength);
      jamEntry();
      switch (lockReq->returnCode) {
      case AccLockReq::Success:
        jam();
        scan.m_state = ScanOp::Locked;
        scan.m_accLockOp = lockReq->accOpPtr;
        break;
      case AccLockReq::IsBlocked:
        jam();
        // normal lock wait
        scan.m_state = ScanOp::Blocked;
        scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
        scan.m_accLockOp = lockReq->accOpPtr;
        // LQH will wake us up
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return;
        break;
      case AccLockReq::Refused:
        jam();
        // we cannot see deleted tuple (assert only)
        ndbassert(false);
        // skip it
        scan.m_state = ScanOp::Next;
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return;
        break;
      case AccLockReq::NoFreeOp:
        jam();
        // max ops should depend on max scans (assert only)
        ndbassert(false);
        // stay in Current state and retry later
        scan.m_state = ScanOp::Current;
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return;
        break;
      default:
        ndbrequire(false);
        break;
      }
    } else {
      // lock-free scan: proceed directly to the reply below
      scan.m_state = ScanOp::Locked;
    }
  }

  if (scan.m_state == ScanOp::Locked) {
    // we have lock or do not need one
    jam();
    // conf signal
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    // the lock is passed to LQH
    Uint32 accLockOp = scan.m_accLockOp;
    if (accLockOp != RNIL) {
      scan.m_accLockOp = RNIL;
      // remember it until LQH unlocks it
      addAccLockOp(scan, accLockOp);
    } else {
      ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK));
      // operation RNIL in LQH would signal no tuple returned
      accLockOp = (Uint32)-1;
    }
    const ScanPos& pos = scan.m_scanPos;
    conf->accOperationPtr = accLockOp;
    conf->fragId = frag.fragmentId;
    conf->localKey[0] = pos.m_key_mm.m_page_no;
    conf->localKey[1] = pos.m_key_mm.m_page_idx;
    unsigned signalLength = 5;
    if (scan.m_bits & ScanOp::SCAN_LOCK) {
      sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
                 signal, signalLength, JBB);
    } else {
      // lock-free: short-circuit directly into the receiving block
      Uint32 blockNo = refToMain(scan.m_userRef);
      EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
      jamEntry();
    }
    // next time look for next entry
    scan.m_state = ScanOp::Next;
    return;
  }
  if (scan.m_state == ScanOp::Last ||
      scan.m_state == ScanOp::Invalid) {
    jam();
    // end of scan: fragId RNIL tells LQH there are no more rows
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL;
    conf->fragId = RNIL;
    unsigned signalLength = 3;
    sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
               signal, signalLength, JBB);
    return;
  }
  ndbrequire(false);
}
464 
/*
 * Lock succeeded (after delay) in ACC. If the lock is for current
 * entry, set state to Locked. If the lock is for an entry we were
 * moved away from, simply unlock it. Finally, if we are closing the
 * scan, do nothing since we have already sent an abort request.
 */
void
Dbtup::execACCKEYCONF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];

  // local key (page no, page idx) of the row the lock was granted on
  Uint32 localKey1 = signal->theData[3];
  Uint32 localKey2 = signal->theData[4];
  Local_key tmp;
  tmp.m_page_no = localKey1;
  tmp.m_page_idx = localKey2;

  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
  ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
  scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
  if (scan.m_state == ScanOp::Blocked) {
    // the lock wait was for current entry
    jam();

    if (likely(scan.m_scanPos.m_key_mm.m_page_no == tmp.m_page_no &&
               scan.m_scanPos.m_key_mm.m_page_idx == tmp.m_page_idx))
    {
      jam();
      scan.m_state = ScanOp::Locked;
      // LQH has the ball
      return;
    }
    else
    {
      jam();
      // the granted lock is for a different row than our current scan
      // position (the row moved); log it, restore the wait flag and
      // reuse the ACCKEYREF path to release the lock operation
      ndbout << "execACCKEYCONF "
             << scan.m_scanPos.m_key_mm
             << " != " << tmp << " ";
      scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
      execACCKEYREF(signal);
      return;
    }
  }

  if (scan.m_state != ScanOp::Aborting) {
    // we were moved, release lock
    jam();
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Abort;
    lockReq->accOpPtr = scan.m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    scan.m_accLockOp = RNIL;
    // LQH has the ball
    return;
  }
  // lose the lock
  scan.m_accLockOp = RNIL;
  // continue at ACC_ABORTCONF
}
540 
/*
 * Lock failed (after delay) in ACC. Probably means somebody ahead of
 * us in lock queue deleted the tuple.
 */
void
Dbtup::execACCKEYREF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
  ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
  scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
  if (scan.m_state != ScanOp::Aborting) {
    jam();
    // release the operation
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Abort;
    lockReq->accOpPtr = scan.m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    scan.m_accLockOp = RNIL;
    // scan position should already have been moved (assert only)
    if (scan.m_state == ScanOp::Blocked) {
      jam();
      //ndbassert(false);
      if (scan.m_bits & ScanOp::SCAN_NR)
      {
        jam();
        // node-restart scan: refetch the same rowid so the deletion
        // can be reported to the starting node
        scan.m_state = ScanOp::Next;
        scan.m_scanPos.m_get = ScanPos::Get_tuple;
        ndbout_c("Ignoring scan.m_state == ScanOp::Blocked, refetch");
      }
      else
      {
        jam();
        // normal scan: simply skip the vanished row
        scan.m_state = ScanOp::Next;
        ndbout_c("Ignoring scan.m_state == ScanOp::Blocked");
      }
    }
    // LQH has the ball
    return;
  }
  // lose the lock
  scan.m_accLockOp = RNIL;
  // continue at ACC_ABORTCONF
}
591 
592 /*
593  * Received when scan is closing. This signal arrives after any
594  * ACCKEYCON or ACCKEYREF which may have been in job buffer.
595  */
596 void
597 Dbtup::execACC_ABORTCONF(Signal* signal)
598 {
599  jamEntry();
600  ScanOpPtr scanPtr;
601  scanPtr.i = signal->theData[0];
602  c_scanOpPool.getPtr(scanPtr);
603  ScanOp& scan = *scanPtr.p;
604  ndbrequire(scan.m_state == ScanOp::Aborting);
605  // most likely we are still in lock wait
606  if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
607  jam();
608  scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
609  scan.m_accLockOp = RNIL;
610  }
611  scanClose(signal, scanPtr);
612 }
613 
614 void
615 Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
616 {
617  ScanOp& scan = *scanPtr.p;
618  ScanPos& pos = scan.m_scanPos;
619  Local_key& key = pos.m_key;
620  const Uint32 bits = scan.m_bits;
621  // fragment
622  FragrecordPtr fragPtr;
623  fragPtr.i = scan.m_fragPtrI;
624  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
625  Fragrecord& frag = *fragPtr.p;
626 
627  if (bits & ScanOp::SCAN_NR)
628  {
629  if (scan.m_endPage == 0 && frag.m_max_page_no == 0)
630  {
631  jam();
632  scan.m_state = ScanOp::Last;
633  return;
634  }
635  }
636  else if (frag.noOfPages == 0)
637  {
638  jam();
639  scan.m_state = ScanOp::Last;
640  return;
641  }
642 
643  if (! (bits & ScanOp::SCAN_DD)) {
644  key.m_file_no = ZNIL;
645  key.m_page_no = 0;
646  pos.m_get = ScanPos::Get_page_mm;
647  // for MM scan real page id is cached for efficiency
648  pos.m_realpid_mm = RNIL;
649  } else {
650  Disk_alloc_info& alloc = frag.m_disk_alloc_info;
651  // for now must check disk part explicitly
652  if (alloc.m_extent_list.firstItem == RNIL) {
653  jam();
654  scan.m_state = ScanOp::Last;
655  return;
656  }
657  pos.m_extent_info_ptr_i = alloc.m_extent_list.firstItem;
658  Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
659  key.m_file_no = ext->m_key.m_file_no;
660  key.m_page_no = ext->m_first_page_no;
661  pos.m_get = ScanPos::Get_page_dd;
662  }
663  key.m_page_idx = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
664  // let scanNext() do the work
665  scan.m_state = ScanOp::Next;
666 }
667 
668 bool
670 {
671  ScanOp& scan = *scanPtr.p;
672  ScanPos& pos = scan.m_scanPos;
673  Local_key& key = pos.m_key;
674  const Uint32 bits = scan.m_bits;
675  // table
676  TablerecPtr tablePtr;
677  tablePtr.i = scan.m_tableId;
678  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
679  Tablerec& table = *tablePtr.p;
680  // fragment
681  FragrecordPtr fragPtr;
682  fragPtr.i = scan.m_fragPtrI;
683  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
684  Fragrecord& frag = *fragPtr.p;
685  // tuple found
686  Tuple_header* th = 0;
687  Uint32 thbits = 0;
688  Uint32 loop_count = 0;
689  Uint32 scanGCI = scanPtr.p->m_scanGCI;
690  Uint32 foundGCI;
691 
692  const bool mm = (bits & ScanOp::SCAN_DD);
693  const bool lcp = (bits & ScanOp::SCAN_LCP);
694 
695  const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0) ?
696  table.m_offsets[mm].m_fix_header_size : 1;
697  const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
698 
699  if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
700  {
701  jam();
705  handle_lcp_keep(signal, fragPtr.p, scanPtr.p);
706  return false;
707  }
708 
709  switch(pos.m_get){
710  case ScanPos::Get_next_tuple:
711  jam();
712  key.m_page_idx += size;
713  // fall through
714  case ScanPos::Get_tuple:
715  jam();
719  pos.m_get = ScanPos::Get_page;
720  pos.m_realpid_mm = RNIL;
721  break;
722  default:
723  break;
724  }
725 
726  while (true) {
727  switch (pos.m_get) {
728  case ScanPos::Get_next_page:
729  // move to next page
730  jam();
731  {
732  if (! (bits & ScanOp::SCAN_DD))
733  pos.m_get = ScanPos::Get_next_page_mm;
734  else
735  pos.m_get = ScanPos::Get_next_page_dd;
736  }
737  continue;
738  case ScanPos::Get_page:
739  // get real page
740  jam();
741  {
742  if (! (bits & ScanOp::SCAN_DD))
743  pos.m_get = ScanPos::Get_page_mm;
744  else
745  pos.m_get = ScanPos::Get_page_dd;
746  }
747  continue;
748  case ScanPos::Get_next_page_mm:
749  // move to next logical TUP page
750  jam();
751  {
752  key.m_page_no++;
753  if (key.m_page_no >= frag.m_max_page_no) {
754  jam();
755 
756  if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
757  {
758  jam();
759  if (key.m_page_no < scan.m_endPage)
760  {
761  jam();
762  ndbout_c("scanning page %u", key.m_page_no);
763  goto cont;
764  }
765  }
766  // no more pages, scan ends
767  pos.m_get = ScanPos::Get_undef;
768  scan.m_state = ScanOp::Last;
769  return true;
770  }
771  cont:
772  key.m_page_idx = first;
773  pos.m_get = ScanPos::Get_page_mm;
774  // clear cached value
775  pos.m_realpid_mm = RNIL;
776  }
777  /*FALLTHRU*/
778  case ScanPos::Get_page_mm:
779  // get TUP real page
780  jam();
781  {
782  if (pos.m_realpid_mm == RNIL) {
783  jam();
784  pos.m_realpid_mm = getRealpidCheck(fragPtr.p, key.m_page_no);
785 
786  if (pos.m_realpid_mm == RNIL)
787  {
788  jam();
789  if (bits & ScanOp::SCAN_NR)
790  {
791  jam();
792  goto nopage;
793  }
794  pos.m_get = ScanPos::Get_next_page_mm;
795  break; // incr loop count
796  }
797  }
798  PagePtr pagePtr;
799  c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
800 
801  nopage:
802  pos.m_page = pagePtr.p;
803  pos.m_get = ScanPos::Get_tuple;
804  }
805  continue;
806  case ScanPos::Get_next_page_dd:
807  // move to next disk page
808  jam();
809  {
810  Disk_alloc_info& alloc = frag.m_disk_alloc_info;
811  Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
812  Ptr<Extent_info> ext_ptr;
813  c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i);
814  Extent_info* ext = ext_ptr.p;
815  key.m_page_no++;
816  if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
817  // no more pages in this extent
818  jam();
819  if (! list.next(ext_ptr)) {
820  // no more extents, scan ends
821  jam();
822  pos.m_get = ScanPos::Get_undef;
823  scan.m_state = ScanOp::Last;
824  return true;
825  } else {
826  // move to next extent
827  jam();
828  pos.m_extent_info_ptr_i = ext_ptr.i;
829  ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
830  key.m_file_no = ext->m_key.m_file_no;
831  key.m_page_no = ext->m_first_page_no;
832  }
833  }
834  key.m_page_idx = first;
835  pos.m_get = ScanPos::Get_page_dd;
836  /*
837  read ahead for scan in disk order
838  do read ahead every 8:th page
839  */
840  if ((bits & ScanOp::SCAN_DD) &&
841  (((key.m_page_no - ext->m_first_page_no) & 7) == 0))
842  {
843  jam();
844  // initialize PGMAN request
846  preq.m_page = pos.m_key;
847  preq.m_callback = TheNULLCallback;
848 
849  // set maximum read ahead
850  Uint32 read_ahead = m_max_page_read_ahead;
851 
852  while (true)
853  {
854  // prepare page read ahead in current extent
855  Uint32 page_no = preq.m_page.m_page_no;
856  Uint32 page_no_limit = page_no + read_ahead;
857  Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
858  if (page_no_limit > limit)
859  {
860  jam();
861  // read ahead crosses extent, set limit for this extent
862  read_ahead = page_no_limit - limit;
863  page_no_limit = limit;
864  // and make sure we only read one extra extent next time around
865  if (read_ahead > alloc.m_extent_size)
866  read_ahead = alloc.m_extent_size;
867  }
868  else
869  {
870  jam();
871  read_ahead = 0; // no more to read ahead after this
872  }
873  // do read ahead pages for this extent
874  while (page_no < page_no_limit)
875  {
876  // page request to PGMAN
877  jam();
878  preq.m_page.m_page_no = page_no;
879  int flags = 0;
880  // ignore result
881  Page_cache_client pgman(this, c_pgman);
882  pgman.get_page(signal, preq, flags);
883  m_pgman_ptr = pgman.m_ptr;
884  jamEntry();
885  page_no++;
886  }
887  if (!read_ahead || !list.next(ext_ptr))
888  {
889  // no more extents after this or read ahead done
890  jam();
891  break;
892  }
893  // move to next extent and initialize PGMAN request accordingly
894  Extent_info* ext = c_extent_pool.getPtr(ext_ptr.i);
895  preq.m_page.m_file_no = ext->m_key.m_file_no;
896  preq.m_page.m_page_no = ext->m_first_page_no;
897  }
898  } // if ScanOp::SCAN_DD read ahead
899  }
900  /*FALLTHRU*/
901  case ScanPos::Get_page_dd:
902  // get global page in PGMAN cache
903  jam();
904  {
905  // check if page is un-allocated or empty
906  if (likely(! (bits & ScanOp::SCAN_NR)))
907  {
908  D("Tablespace_client - scanNext");
909  Tablespace_client tsman(signal, this, c_tsman,
910  frag.fragTableId,
911  frag.fragmentId,
912  frag.m_tablespace_id);
913  unsigned uncommitted, committed;
914  uncommitted = committed = ~(unsigned)0;
915  int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
916  ndbrequire(ret == 0);
917  if (committed == 0 && uncommitted == 0) {
918  // skip empty page
919  jam();
920  pos.m_get = ScanPos::Get_next_page_dd;
921  break; // incr loop count
922  }
923  }
924  // page request to PGMAN
926  preq.m_page = pos.m_key;
927  preq.m_callback.m_callbackData = scanPtr.i;
928  preq.m_callback.m_callbackFunction =
929  safe_cast(&Dbtup::disk_page_tup_scan_callback);
930  int flags = 0;
931  Page_cache_client pgman(this, c_pgman);
932  int res = pgman.get_page(signal, preq, flags);
933  m_pgman_ptr = pgman.m_ptr;
934  jamEntry();
935  if (res == 0) {
936  jam();
937  // request queued
938  pos.m_get = ScanPos::Get_tuple;
939  return false;
940  }
941  ndbrequire(res > 0);
942  pos.m_page = (Page*)m_pgman_ptr.p;
943  }
944  pos.m_get = ScanPos::Get_tuple;
945  continue;
946  // get tuple
947  // move to next tuple
948  case ScanPos::Get_next_tuple:
949  // move to next fixed size tuple
950  jam();
951  {
952  key.m_page_idx += size;
953  pos.m_get = ScanPos::Get_tuple;
954  }
955  /*FALLTHRU*/
956  case ScanPos::Get_tuple:
957  // get fixed size tuple
958  jam();
959  if ((bits & ScanOp::SCAN_VS) == 0)
960  {
961  Fix_page* page = (Fix_page*)pos.m_page;
962  if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
963  {
964  pos.m_get = ScanPos::Get_next_tuple;
965 #ifdef VM_TRACE
966  if (! (bits & ScanOp::SCAN_DD))
967  {
968  Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
969  ndbassert(pos.m_realpid_mm == realpid);
970  }
971 #endif
972  th = (Tuple_header*)&page->m_data[key.m_page_idx];
973 
974  if (likely(! (bits & ScanOp::SCAN_NR)))
975  {
976  jam();
977  thbits = th->m_header_bits;
978  if (! (thbits & Tuple_header::FREE))
979  {
980  goto found_tuple;
981  }
982  }
983  else
984  {
985  if (pos.m_realpid_mm == RNIL)
986  {
987  jam();
988  foundGCI = 0;
989  goto found_deleted_rowid;
990  }
991  thbits = th->m_header_bits;
992  if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
993  foundGCI == 0)
994  {
995  if (! (thbits & Tuple_header::FREE))
996  {
997  jam();
998  goto found_tuple;
999  }
1000  else
1001  {
1002  goto found_deleted_rowid;
1003  }
1004  }
1005  else if (thbits != Fix_page::FREE_RECORD &&
1006  th->m_operation_ptr_i != RNIL)
1007  {
1008  jam();
1009  goto found_tuple; // Locked tuple...
1010  // skip free tuple
1011  }
1012  }
1013  } else {
1014  jam();
1015  // no more tuples on this page
1016  pos.m_get = ScanPos::Get_next_page;
1017  }
1018  }
1019  else
1020  {
1021  jam();
1022  Var_page * page = (Var_page*)pos.m_page;
1023  if (key.m_page_idx < page->high_index)
1024  {
1025  jam();
1026  pos.m_get = ScanPos::Get_next_tuple;
1027  if (!page->is_free(key.m_page_idx))
1028  {
1029  th = (Tuple_header*)page->get_ptr(key.m_page_idx);
1030  thbits = th->m_header_bits;
1031  goto found_tuple;
1032  }
1033  }
1034  else
1035  {
1036  jam();
1037  // no more tuples on this page
1038  pos.m_get = ScanPos::Get_next_page;
1039  break;
1040  }
1041  }
1042  break; // incr loop count
1043  found_tuple:
1044  // found possible tuple to return
1045  jam();
1046  {
1047  // caller has already set pos.m_get to next tuple
1048  if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
1049  Local_key& key_mm = pos.m_key_mm;
1050  if (! (bits & ScanOp::SCAN_DD)) {
1051  key_mm = pos.m_key;
1052  // real page id is already set
1053  } else {
1054  key_mm.assref(th->m_base_record_ref);
1055  // recompute for each disk tuple
1056  pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
1057  }
1058  // TUPKEYREQ handles savepoint stuff
1059  scan.m_state = ScanOp::Current;
1060  return true;
1061  } else {
1062  jam();
1063  // clear it so that it will show up in next LCP
1064  th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
1065  if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
1066  jam();
1067  setChecksum(th, tablePtr.p);
1068  }
1069  }
1070  }
1071  break;
1072  found_deleted_rowid:
1073  jam();
1074  {
1075  ndbassert(bits & ScanOp::SCAN_NR);
1076  Local_key& key_mm = pos.m_key_mm;
1077  if (! (bits & ScanOp::SCAN_DD)) {
1078  key_mm = pos.m_key;
1079  // caller has already set pos.m_get to next tuple
1080  // real page id is already set
1081  } else {
1082  key_mm.assref(th->m_base_record_ref);
1083  // recompute for each disk tuple
1084  pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
1085 
1086  Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
1087  th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
1088  if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
1089  foundGCI == 0)
1090  {
1091  if (! (thbits & Tuple_header::FREE))
1092  break;
1093  }
1094  }
1095 
1096  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1097  conf->scanPtr = scan.m_userPtr;
1098  conf->accOperationPtr = RNIL;
1099  conf->fragId = frag.fragmentId;
1100  conf->localKey[0] = pos.m_key_mm.m_page_no;
1101  conf->localKey[1] = pos.m_key_mm.m_page_idx;
1102  conf->gci = foundGCI;
1103  Uint32 blockNo = refToMain(scan.m_userRef);
1104  EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 6);
1105  jamEntry();
1106 
1107  // TUPKEYREQ handles savepoint stuff
1108  loop_count = 32;
1109  scan.m_state = ScanOp::Next;
1110  return false;
1111  }
1112  break; // incr loop count
1113  default:
1114  ndbrequire(false);
1115  break;
1116  }
1117  if (++loop_count >= 32)
1118  break;
1119  }
1120  // TODO: at drop table we have to flush and terminate these
1121  jam();
1122  signal->theData[0] = ZTUP_SCAN;
1123  signal->theData[1] = scanPtr.i;
1124  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1125  return false;
1126 }
1127 
/**
 * Hand one tuple from the fragment's LCP keep list to the scan client.
 *
 * The keep list links copy tuples that must still be delivered to an
 * ongoing LCP scan.  This routine pops the head entry, forwards it to the
 * scan owner via an immediate NEXT_SCANCONF, and then frees the copy tuple.
 */
void
Dbtup::handle_lcp_keep(Signal* signal,
                       Fragrecord* fragPtrP,
                       ScanOp* scanPtrP)
{
  TablerecPtr tablePtr;
  tablePtr.i = scanPtrP->m_tableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  // Caller guarantees the list is non-empty.
  ndbassert(!fragPtrP->m_lcp_keep_list_head.isNull());
  Local_key tmp = fragPtrP->m_lcp_keep_list_head;
  Uint32 * copytuple = get_copy_tuple_raw(&tmp);
  // The next-pointer of the keep list is stored inside the copy tuple at
  // word offset 2; advance the head past the entry being consumed.
  memcpy(&fragPtrP->m_lcp_keep_list_head,
         copytuple+2,
         sizeof(Local_key));

  if (fragPtrP->m_lcp_keep_list_head.isNull())
  {
    jam();
    // List became empty: the consumed entry must have been the tail too.
    ndbassert(tmp.m_page_no == fragPtrP->m_lcp_keep_list_tail.m_page_no);
    ndbassert(tmp.m_page_idx == fragPtrP->m_lcp_keep_list_tail.m_page_idx);
    fragPtrP->m_lcp_keep_list_tail.setNull();
  }

  // Save the original key before setCopyTuple(); the saved copy is what we
  // free below (NOTE(review): setCopyTuple presumably re-encodes
  // page_no/page_idx in place — confirm against its definition).
  Local_key save = tmp;
  setCopyTuple(tmp.m_page_no, tmp.m_page_idx);
  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  conf->scanPtr = scanPtrP->m_userPtr;
  conf->accOperationPtr = (Uint32)-1;  // no ACC operation for a keep-list tuple
  conf->fragId = fragPtrP->fragmentId;
  conf->localKey[0] = tmp.m_page_no;
  conf->localKey[1] = tmp.m_page_idx;
  conf->gci = 0;
  Uint32 blockNo = refToMain(scanPtrP->m_userRef);
  EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 6);

  // Free only after the receiver has processed the tuple synchronously above.
  c_undo_buffer.free_copy_tuple(&save);
}
1166 
1167 void
1168 Dbtup::scanCont(Signal* signal, ScanOpPtr scanPtr)
1169 {
1170  bool immediate = scanNext(signal, scanPtr);
1171  if (! immediate) {
1172  jam();
1173  // time-slicing again
1174  return;
1175  }
1176  scanReply(signal, scanPtr);
1177 }
1178 
1179 void
1180 Dbtup::disk_page_tup_scan_callback(Signal* signal, Uint32 scanPtrI, Uint32 page_i)
1181 {
1182  ScanOpPtr scanPtr;
1183  c_scanOpPool.getPtr(scanPtr, scanPtrI);
1184  ScanOp& scan = *scanPtr.p;
1185  ScanPos& pos = scan.m_scanPos;
1186  // get cache page
1187  Ptr<GlobalPage> gptr;
1188  m_global_page_pool.getPtr(gptr, page_i);
1189  pos.m_page = (Page*)gptr.p;
1190  // continue
1191  scanCont(signal, scanPtr);
1192 }
1193 
1194 void
1195 Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
1196 {
1197  ScanOp& scan = *scanPtr.p;
1198  ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) && scan.m_accLockOp == RNIL);
1199  // unlock all not unlocked by LQH
1200  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1201  ScanLockPtr lockPtr;
1202  while (list.first(lockPtr)) {
1203  jam();
1204  AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
1205  lockReq->returnCode = RNIL;
1206  lockReq->requestInfo = AccLockReq::Abort;
1207  lockReq->accOpPtr = lockPtr.p->m_accLockOp;
1208  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
1209  jamEntry();
1210  ndbrequire(lockReq->returnCode == AccLockReq::Success);
1211  list.release(lockPtr);
1212  }
1213  // send conf
1214  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1215  conf->scanPtr = scanPtr.p->m_userPtr;
1216  conf->accOperationPtr = RNIL;
1217  conf->fragId = RNIL;
1218  unsigned signalLength = 3;
1219  sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
1220  signal, signalLength, JBB);
1221  releaseScanOp(scanPtr);
1222 }
1223 
1224 void
1225 Dbtup::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
1226 {
1227  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1228  ScanLockPtr lockPtr;
1229 #ifdef VM_TRACE
1230  list.first(lockPtr);
1231  while (lockPtr.i != RNIL) {
1232  ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
1233  list.next(lockPtr);
1234  }
1235 #endif
1236  bool ok = list.seize(lockPtr);
1237  ndbrequire(ok);
1238  lockPtr.p->m_accLockOp = accLockOp;
1239 }
1240 
1241 void
1242 Dbtup::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
1243 {
1244  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1245  ScanLockPtr lockPtr;
1246  list.first(lockPtr);
1247  while (lockPtr.i != RNIL) {
1248  if (lockPtr.p->m_accLockOp == accLockOp) {
1249  jam();
1250  break;
1251  }
1252  list.next(lockPtr);
1253  }
1254  ndbrequire(lockPtr.i != RNIL);
1255  list.release(lockPtr);
1256 }
1257 
1258 void
1259 Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
1260 {
1261  FragrecordPtr fragPtr;
1262  fragPtr.i = scanPtr.p->m_fragPtrI;
1263  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
1264 
1265  if(scanPtr.p->m_bits & ScanOp::SCAN_LCP)
1266  {
1267  jam();
1268  fragPtr.p->m_lcp_scan_op = RNIL;
1269  scanPtr.p->m_fragPtrI = RNIL;
1270  }
1271  else
1272  {
1273  jam();
1274  LocalDLList<ScanOp> list(c_scanOpPool, fragPtr.p->m_scanList);
1275  list.release(scanPtr);
1276  }
1277 }
1278 
1279 void
1280 Dbtup::execLCP_FRAG_ORD(Signal* signal)
1281 {
1282  jamEntry();
1283  LcpFragOrd* req= (LcpFragOrd*)signal->getDataPtr();
1284 
1285  TablerecPtr tablePtr;
1286  tablePtr.i = req->tableId;
1287  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
1288 
1289  FragrecordPtr fragPtr;
1290  Uint32 fragId = req->fragmentId;
1291  fragPtr.i = RNIL;
1292  getFragmentrec(fragPtr, fragId, tablePtr.p);
1293  ndbrequire(fragPtr.i != RNIL);
1294  Fragrecord& frag = *fragPtr.p;
1295 
1296  ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
1297  frag.m_lcp_scan_op = c_lcp_scan_op;
1298  ScanOpPtr scanPtr;
1299  c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
1300  ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
1301  new (scanPtr.p) ScanOp;
1302  scanPtr.p->m_fragPtrI = fragPtr.i;
1303  scanPtr.p->m_state = ScanOp::First;
1304 
1305  ndbassert(frag.m_lcp_keep_list_head.isNull());
1306  ndbassert(frag.m_lcp_keep_list_tail.isNull());
1307 }