MySQL 5.6.14 Source Code Document
DbtuxScan.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #define DBTUX_SCAN_CPP
19 #include "Dbtux.hpp"
20 #include <my_sys.h>
21 
22 /*
23  * Error handling: any seized scan op is released and ACC_SCANREF is sent
24  * to LQH. LQH sets the error code and treats this like ZEMPTY_FRAGMENT.
25  * The scan is therefore closed on both sides.
26  */
27 void
28 Dbtux::execACC_SCANREQ(Signal* signal)
29 {
30  jamEntry();
31  const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
32  const AccScanReq* const req = &reqCopy;
33  Uint32 errorCode = 0;
34  ScanOpPtr scanPtr;
35  scanPtr.i = RNIL;
36  do {
37  // get the index
38  IndexPtr indexPtr;
39  c_indexPool.getPtr(indexPtr, req->tableId);
40  // get the fragment
41  FragPtr fragPtr;
42  findFrag(*indexPtr.p, req->fragmentNo, fragPtr);
43  ndbrequire(fragPtr.i != RNIL);
44  Frag& frag = *fragPtr.p;
45  // check for index not Online (i.e. Dropping)
46  if (unlikely(indexPtr.p->m_state != Index::Online)) {
47  jam();
48 #ifdef VM_TRACE
49  if (debugFlags & (DebugMeta | DebugScan)) {
50  debugOut << "Index dropping at ACC_SCANREQ " << indexPtr.i << " " << *indexPtr.p << endl;
51  }
52 #endif
53  errorCode = AccScanRef::TuxIndexNotOnline;
54  break;
55  }
56  // must be normal DIH/TC fragment
57  TreeHead& tree = frag.m_tree;
58  // check for empty fragment
59  if (tree.m_root == NullTupLoc) {
60  jam();
61  AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
62  conf->scanPtr = req->senderData;
63  conf->accPtr = RNIL;
64  conf->flag = AccScanConf::ZEMPTY_FRAGMENT;
65  sendSignal(req->senderRef, GSN_ACC_SCANCONF,
66  signal, AccScanConf::SignalLength, JBB);
67  return;
68  }
69  // seize from pool and link to per-fragment list
70  if (ERROR_INSERTED(12008) ||
71  ! frag.m_scanList.seize(scanPtr)) {
72  CLEAR_ERROR_INSERT_VALUE;
73  jam();
74  // should never happen but can be used to test error handling
75  errorCode = AccScanRef::TuxNoFreeScanOp;
76  break;
77  }
78  new (scanPtr.p) ScanOp;
79  scanPtr.p->m_state = ScanOp::First;
80  scanPtr.p->m_userPtr = req->senderData;
81  scanPtr.p->m_userRef = req->senderRef;
82  scanPtr.p->m_tableId = indexPtr.p->m_tableId;
83  scanPtr.p->m_indexId = indexPtr.i;
84  scanPtr.p->m_fragId = fragPtr.p->m_fragId;
85  scanPtr.p->m_fragPtrI = fragPtr.i;
86  scanPtr.p->m_transId1 = req->transId1;
87  scanPtr.p->m_transId2 = req->transId2;
88  scanPtr.p->m_savePointId = req->savePointId;
89  scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
90  scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
91  scanPtr.p->m_descending = AccScanReq::getDescendingFlag(req->requestInfo);
92  /*
93  * readCommitted  lockMode  keyInfo
94  *      1            0        0      - read committed (no lock)
95  *      0            0        0      - read latest (read lock)
96  *      0            1        1      - read exclusive (write lock)
97  */
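 /*
  * For illustration (assuming the usual NDB API LockMode mapping): the three
  * rows above correspond to LM_CommittedRead (no ACC lock is requested at
  * all), LM_Read (AccLockReq::LockShared is requested per returned row) and
  * LM_Exclusive (AccLockReq::LockExclusive). The lock request itself is
  * built from m_readCommitted and m_lockMode in execACC_CHECK_SCAN below.
  */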
98  const bool isStatScan = AccScanReq::getStatScanFlag(req->requestInfo);
99  if (unlikely(isStatScan)) {
100  jam();
101  if (!scanPtr.p->m_readCommitted) {
102  jam();
103  errorCode = AccScanRef::TuxInvalidLockMode;
104  break;
105  }
106  StatOpPtr statPtr;
107  if (!c_statOpPool.seize(statPtr)) {
108  jam();
109  errorCode = AccScanRef::TuxNoFreeStatOp;
110  break;
111  }
112  scanPtr.p->m_statOpPtrI = statPtr.i;
113  new (statPtr.p) StatOp(*indexPtr.p);
114  statPtr.p->m_scanOpPtrI = scanPtr.i;
115  // rest of StatOp is initialized in execTUX_BOUND_INFO
116 #ifdef VM_TRACE
117  if (debugFlags & DebugStat) {
118  debugOut << "Seize stat op" << endl;
119  }
120 #endif
121  }
122 #ifdef VM_TRACE
123  if (debugFlags & DebugScan) {
124  debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
125  }
126 #endif
127  // conf
128  AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
129  conf->scanPtr = req->senderData;
130  conf->accPtr = scanPtr.i;
131  conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
132  sendSignal(req->senderRef, GSN_ACC_SCANCONF,
133  signal, AccScanConf::SignalLength, JBB);
134  return;
135  } while (0);
136  if (scanPtr.i != RNIL) {
137  jam();
138  releaseScanOp(scanPtr);
139  }
140  // ref
141  ndbrequire(errorCode != 0);
142  AccScanRef* ref = (AccScanRef*)signal->getDataPtrSend();
143  ref->scanPtr = req->senderData;
144  ref->accPtr = RNIL;
145  ref->errorCode = errorCode;
146  sendSignal(req->senderRef, GSN_ACC_SCANREF,
147  signal, AccScanRef::SignalLength, JBB);
148 }
149 
150 /*
151  * Receive bounds for the scan in a single direct call. The bounds can
152  * arrive in any order. Attribute ids are those of the index table.
153  *
154  * Replace EQ by the equivalent LE + GE. Check for conflicting bounds.
155  * Check that the sets of lower and upper bounds are on initial sequences
156  * of keys and that all but possibly the last bound is non-strict.
157  *
158  * Finally convert the sets of lower and upper bounds (i.e. start key
159  * and end key) to NdbPack format. The data is saved in segmented
160  * memory. The bound is reconstructed at use time via unpackBound().
161  *
162  * Error handling: Error code is set in the scan and also returned in
163  * EXECUTE_DIRECT (the old way).
164  */
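/*
 * A sketch of the expected input, assuming the usual TUX bound type codes
 * (0 = inclusive lower, 1 = strict lower, 2 = inclusive upper, 3 = strict
 * upper, 4 = equality, consistent with the "type > 4" check and the EQ
 * rewrite below): a range such as "a >= 10 AND a < 20" on a single-attribute
 * index arrives in req->data as two (type, AttributeHeader, value) triples,
 *
 *   { 0, AttributeHeader(attrId = 0, 4 bytes), 10 }   // lower bound, non-strict
 *   { 3, AttributeHeader(attrId = 0, 4 bytes), 20 }   // upper bound, strict
 *
 * while an equality "a = 10" arrives as a single type-4 entry that the loop
 * below treats as type 0 in the lower-bound pass (idir = 0) and as type 2 in
 * the upper-bound pass (idir = 1) - the "EQ -> LE + GE" replacement above.
 */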
165 void
166 Dbtux::execTUX_BOUND_INFO(Signal* signal)
167 {
168  jamEntry();
169  // get records
170  TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
171  ScanOpPtr scanPtr;
172  scanPtr.i = req->tuxScanPtrI;
173  c_scanOpPool.getPtr(scanPtr);
174  ScanOp& scan = *scanPtr.p;
175  const Index& index = *c_indexPool.getPtr(scan.m_indexId);
176  const DescHead& descHead = getDescHead(index);
177  const KeyType* keyTypes = getKeyTypes(descHead);
178  // data passed in Signal
179  const Uint32* const boundData = &req->data[0];
180  Uint32 boundLen = req->boundAiLength;
181  Uint32 boundOffset = 0;
182  // initialize stats scan
183  if (unlikely(scan.m_statOpPtrI != RNIL)) {
184  // stats options before bounds
185  StatOpPtr statPtr;
186  statPtr.i = scan.m_statOpPtrI;
187  c_statOpPool.getPtr(statPtr);
188  Uint32 usedLen = 0;
189  if (statScanInit(statPtr, boundData, boundLen, &usedLen) == -1) {
190  jam();
191  ndbrequire(scan.m_errorCode != 0);
192  req->errorCode = scan.m_errorCode;
193  return;
194  }
195  ndbrequire(usedLen <= boundLen);
196  boundLen -= usedLen;
197  boundOffset += usedLen;
198  }
199  // extract lower and upper bound in separate passes
200  for (unsigned idir = 0; idir <= 1; idir++) {
201  jam();
202  struct BoundInfo {
203  int type2; // with EQ -> LE/GE
204  Uint32 offset; // word offset in signal data
205  Uint32 bytes;
206  };
207  BoundInfo boundInfo[MaxIndexAttributes];
208  // largest attrId seen plus one
209  Uint32 maxAttrId = 0;
210  const Uint32* const data = &boundData[boundOffset];
211  Uint32 offset = 0;
212  while (offset + 2 <= boundLen) {
213  jam();
214  const Uint32 type = data[offset];
215  const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
216  const Uint32 attrId = ah->getAttributeId();
217  const Uint32 byteSize = ah->getByteSize();
218  const Uint32 dataSize = ah->getDataSize();
219  // check type
220  if (unlikely(type > 4)) {
221  jam();
222  scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
223  req->errorCode = scan.m_errorCode;
224  return;
225  }
226  Uint32 type2 = type;
227  if (type2 == 4) {
228  jam();
229  type2 = (idir << 1); // LE=0 GE=2
230  }
231  // check if attribute belongs to this bound
232  if ((type2 & 0x2) == (idir << 1)) {
233  if (unlikely(attrId >= index.m_numAttrs)) {
234  jam();
235  scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
236  req->errorCode = scan.m_errorCode;
237  return;
238  }
239  // mark entries in any gap as undefined
240  while (maxAttrId <= attrId) {
241  jam();
242  BoundInfo& b = boundInfo[maxAttrId];
243  b.type2 = -1;
244  maxAttrId++;
245  }
246  BoundInfo& b = boundInfo[attrId];
247  // duplicate no longer allowed (wl#4163)
248  if (unlikely(b.type2 != -1)) {
249  jam();
250  scan.m_errorCode = TuxBoundInfo::InvalidBounds;
251  req->errorCode = scan.m_errorCode;
252  return;
253  }
254  b.type2 = (int)type2;
255  b.offset = offset + 1; // poai
256  b.bytes = byteSize;
257  }
258  // jump to next
259  offset += 2 + dataSize;
260  }
261  if (unlikely(offset != boundLen)) {
262  jam();
263  scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
264  req->errorCode = scan.m_errorCode;
265  return;
266  }
267  // check and pack the bound data
268  KeyData searchBoundData(index.m_keySpec, true, 0);
269  KeyBound searchBound(searchBoundData);
270  searchBoundData.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
271  int strict = 0; // 0 or 1
272  Uint32 i;
273  for (i = 0; i < maxAttrId; i++) {
274  jam();
275  const BoundInfo& b = boundInfo[i];
276  // check for gap or strict bound before last
277  strict = (b.type2 & 0x1);
278  if (unlikely(b.type2 == -1 || (i + 1 < maxAttrId && strict))) {
279  jam();
280  scan.m_errorCode = TuxBoundInfo::InvalidBounds;
281  req->errorCode = scan.m_errorCode;
282  return;
283  }
284  Uint32 len;
285  if (unlikely(searchBoundData.add_poai(&data[b.offset], &len) == -1 ||
286  b.bytes != len)) {
287  jam();
288  scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
289  req->errorCode = scan.m_errorCode;
290  return;
291  }
292  }
293  int side = 0;
294  if (maxAttrId != 0) {
295  // arithmetic is faster
296  // side = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
297  side = (-1) * (1 - 2 * strict) * (1 - 2 * int(idir));
298  }
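 // Worked out, the arithmetic reproduces the commented ternary above:
 //   idir = 0 (lower bound), strict = 0 : side = (-1)*(+1)*(+1) = -1
 //   idir = 0 (lower bound), strict = 1 : side = (-1)*(-1)*(+1) = +1
 //   idir = 1 (upper bound), strict = 0 : side = (-1)*(+1)*(-1) = +1
 //   idir = 1 (upper bound), strict = 1 : side = (-1)*(-1)*(-1) = -1
 // i.e. a non-strict lower bound and a strict upper bound both get side -1.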
299  if (unlikely(searchBound.finalize(side) == -1)) {
300  jam();
301  scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
302  req->errorCode = scan.m_errorCode;
303  return;
304  }
305  ScanBound& scanBound = scan.m_scanBound[idir];
306  scanBound.m_cnt = maxAttrId;
307  scanBound.m_side = side;
308  // save data words in segmented memory
309  {
310  DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
311  LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
312  const Uint32* data = (const Uint32*)searchBoundData.get_data_buf();
313  Uint32 size = (searchBoundData.get_data_len() + 3) / 4;
314  bool ok = b.append(data, size);
315  if (unlikely(!ok)) {
316  jam();
317  scan.m_errorCode = TuxBoundInfo::OutOfBuffers;
318  req->errorCode = scan.m_errorCode;
319  return;
320  }
321  }
322  }
323  if (ERROR_INSERTED(12009)) {
324  jam();
325  CLEAR_ERROR_INSERT_VALUE;
326  scan.m_errorCode = TuxBoundInfo::InvalidBounds;
327  req->errorCode = scan.m_errorCode;
328  return;
329  }
330  // no error
331  req->errorCode = 0;
332 }
333 
334 void
335 Dbtux::execNEXT_SCANREQ(Signal* signal)
336 {
337  jamEntry();
338  const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
339  const NextScanReq* const req = &reqCopy;
340  ScanOpPtr scanPtr;
341  scanPtr.i = req->accPtr;
342  c_scanOpPool.getPtr(scanPtr);
343  ScanOp& scan = *scanPtr.p;
344  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
345 #ifdef VM_TRACE
346  if (debugFlags & DebugScan) {
347  debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
348  }
349 #endif
350  // handle unlock previous and close scan
351  switch (req->scanFlag) {
352  case NextScanReq::ZSCAN_NEXT:
353  jam();
354  break;
355  case NextScanReq::ZSCAN_NEXT_COMMIT:
356  jam();
357  case NextScanReq::ZSCAN_COMMIT:
358  jam();
359  if (! scan.m_readCommitted) {
360  jam();
361  AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
362  lockReq->returnCode = RNIL;
363  lockReq->requestInfo = AccLockReq::Unlock;
364  lockReq->accOpPtr = req->accOperationPtr;
365  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
366  jamEntry();
367  ndbrequire(lockReq->returnCode == AccLockReq::Success);
368  removeAccLockOp(scanPtr, req->accOperationPtr);
369  }
370  if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
371  jam();
372  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
373  conf->scanPtr = scan.m_userPtr;
374  unsigned signalLength = 1;
375  sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
376  signal, signalLength, JBB);
377  return;
378  }
379  break;
380  case NextScanReq::ZSCAN_CLOSE:
381  jam();
382  // unlink from tree node first to avoid state changes
383  if (scan.m_scanPos.m_loc != NullTupLoc) {
384  jam();
385  const TupLoc loc = scan.m_scanPos.m_loc;
386  NodeHandle node(frag);
387  selectNode(node, loc);
388  unlinkScan(node, scanPtr);
389  scan.m_scanPos.m_loc = NullTupLoc;
390  }
391  if (scan.m_lockwait) {
392  jam();
393  ndbrequire(scan.m_accLockOp != RNIL);
394  // use ACC_ABORTCONF to flush out any reply in job buffer
395  AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
396  lockReq->returnCode = RNIL;
397  lockReq->requestInfo = AccLockReq::AbortWithConf;
398  lockReq->accOpPtr = scan.m_accLockOp;
399  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
400  AccLockReq::UndoSignalLength);
401  jamEntry();
402  ndbrequire(lockReq->returnCode == AccLockReq::Success);
403  scan.m_state = ScanOp::Aborting;
404  return;
405  }
406  if (scan.m_state == ScanOp::Locked) {
407  jam();
408  ndbrequire(scan.m_accLockOp != RNIL);
409  AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
410  lockReq->returnCode = RNIL;
411  lockReq->requestInfo = AccLockReq::Abort;
412  lockReq->accOpPtr = scan.m_accLockOp;
413  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
414  AccLockReq::UndoSignalLength);
415  jamEntry();
416  ndbrequire(lockReq->returnCode == AccLockReq::Success);
417  scan.m_accLockOp = RNIL;
418  }
419  scan.m_state = ScanOp::Aborting;
420  scanClose(signal, scanPtr);
421  return;
422  case NextScanReq::ZSCAN_NEXT_ABORT:
423  jam();
424  default:
425  jam();
426  ndbrequire(false);
427  break;
428  }
429  // start looking for next scan result
430  AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
431  checkReq->accPtr = scanPtr.i;
432  checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
433  EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
434  jamEntry();
435 }
436 
437 void
438 Dbtux::execACC_CHECK_SCAN(Signal* signal)
439 {
440  jamEntry();
441  const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
442  const AccCheckScan* const req = &reqCopy;
443  ScanOpPtr scanPtr;
444  scanPtr.i = req->accPtr;
445  c_scanOpPool.getPtr(scanPtr);
446  ScanOp& scan = *scanPtr.p;
447  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
448 #ifdef VM_TRACE
449  if (debugFlags & DebugScan) {
450  debugOut << "ACC_CHECK_SCAN scan " << scanPtr.i << " " << scan << endl;
451  }
452 #endif
453  if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
454  jam();
455  signal->theData[0] = scan.m_userPtr;
456  signal->theData[1] = true;
457  EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
458  jamEntry();
459  return; // stop
460  }
461  if (scan.m_lockwait) {
462  jam();
463  // LQH asks if we are waiting for lock and we tell it to ask again
464  const TreeEnt ent = scan.m_scanEnt;
465  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
466  conf->scanPtr = scan.m_userPtr;
467  conf->accOperationPtr = RNIL; // no tuple returned
468  conf->fragId = frag.m_fragId;
469  unsigned signalLength = 3;
470  // if TC has ordered scan close, it will be detected here
471  sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
472  signal, signalLength, JBB);
473  return; // stop
474  }
475  // check index online
476  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
477  if (unlikely(index.m_state != Index::Online) &&
478  scanPtr.p->m_errorCode == 0) {
479  jam();
480 #ifdef VM_TRACE
481  if (debugFlags & (DebugMeta | DebugScan)) {
482  debugOut << "Index dropping at execACC_CHECK_SCAN " << scanPtr.i << " " << *scanPtr.p << endl;
483  }
484 #endif
485  scanPtr.p->m_errorCode = AccScanRef::TuxIndexNotOnline;
486  }
487  if (scan.m_state == ScanOp::First) {
488  jam();
489  // search is done only once in single range scan
490  scanFirst(scanPtr);
491  }
492  if (scan.m_state == ScanOp::Current ||
493  scan.m_state == ScanOp::Next) {
494  jam();
495  // look for next
496  scanFind(scanPtr);
497  }
498  // for reading tuple key in Found or Locked state
499  Uint32* pkData = c_ctx.c_dataBuffer;
500  unsigned pkSize = 0; // indicates not yet done
501  if (scan.m_state == ScanOp::Found) {
502  // found an entry to return
503  jam();
504  ndbrequire(scan.m_accLockOp == RNIL);
505  if (! scan.m_readCommitted) {
506  jam();
507  const TreeEnt ent = scan.m_scanEnt;
508  // read tuple key
509  readTablePk(frag, ent, pkData, pkSize);
510  // get read lock or exclusive lock
511  AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
512  lockReq->returnCode = RNIL;
513  lockReq->requestInfo =
514  scan.m_lockMode == 0 ? AccLockReq::LockShared : AccLockReq::LockExclusive;
515  lockReq->accOpPtr = RNIL;
516  lockReq->userPtr = scanPtr.i;
517  lockReq->userRef = reference();
518  lockReq->tableId = scan.m_tableId;
519  lockReq->fragId = frag.m_fragId;
520  lockReq->fragPtrI = frag.m_accTableFragPtrI;
521  const Uint32* const buf32 = static_cast<Uint32*>(pkData);
522  const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
523  lockReq->hashValue = md5_hash(buf64, pkSize);
524  Uint32 lkey1, lkey2;
525  getTupAddr(frag, ent, lkey1, lkey2);
526  lockReq->page_id = lkey1;
527  lockReq->page_idx = lkey2;
528  lockReq->transId1 = scan.m_transId1;
529  lockReq->transId2 = scan.m_transId2;
530  // execute
531  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::LockSignalLength);
532  jamEntry();
533  switch (lockReq->returnCode) {
534  case AccLockReq::Success:
535  jam();
536  scan.m_state = ScanOp::Locked;
537  scan.m_accLockOp = lockReq->accOpPtr;
538 #ifdef VM_TRACE
539  if (debugFlags & (DebugScan | DebugLock)) {
540  debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl;
541  }
542 #endif
543  break;
544  case AccLockReq::IsBlocked:
545  jam();
546  // normal lock wait
547  scan.m_state = ScanOp::Blocked;
548  scan.m_lockwait = true;
549  scan.m_accLockOp = lockReq->accOpPtr;
550 #ifdef VM_TRACE
551  if (debugFlags & (DebugScan | DebugLock)) {
552  debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl;
553  }
554 #endif
555  // LQH will wake us up
556  signal->theData[0] = scan.m_userPtr;
557  signal->theData[1] = true;
558  EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
559  jamEntry();
560  return; // stop
561  break;
562  case AccLockReq::Refused:
563  jam();
564  // we cannot see deleted tuple (assert only)
565  ndbassert(false);
566  // skip it
567  scan.m_state = ScanOp::Next;
568  signal->theData[0] = scan.m_userPtr;
569  signal->theData[1] = true;
570  EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
571  jamEntry();
572  return; // stop
573  break;
574  case AccLockReq::NoFreeOp:
575  jam();
576  // max ops should depend on max scans (assert only)
577  ndbassert(false);
578  // stay in Found state
579  scan.m_state = ScanOp::Found;
580  signal->theData[0] = scan.m_userPtr;
581  signal->theData[1] = true;
582  EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
583  jamEntry();
584  return; // stop
585  break;
586  default:
587  ndbrequire(false);
588  break;
589  }
590  } else {
591  scan.m_state = ScanOp::Locked;
592  }
593  }
594  if (scan.m_state == ScanOp::Locked) {
595  // we have lock or do not need one
596  jam();
597  // read keys if not already done (uses signal)
598  const TreeEnt ent = scan.m_scanEnt;
599  // conf signal
600  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
601  conf->scanPtr = scan.m_userPtr;
602  // the lock is passed to LQH
603  Uint32 accLockOp = scan.m_accLockOp;
604  if (accLockOp != RNIL) {
605  scan.m_accLockOp = RNIL;
606  // remember it until LQH unlocks it
607  addAccLockOp(scanPtr, accLockOp);
608  } else {
609  ndbrequire(scan.m_readCommitted);
610  // operation RNIL in LQH would signal no tuple returned
611  accLockOp = (Uint32)-1;
612  }
613  conf->accOperationPtr = accLockOp;
614  conf->fragId = frag.m_fragId;
615  Uint32 lkey1, lkey2;
616  getTupAddr(frag, ent, lkey1, lkey2);
617  conf->localKey[0] = lkey1;
618  conf->localKey[1] = lkey2;
619  unsigned signalLength = 5;
620  // add key info
621  if (! scan.m_readCommitted) {
622  sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
623  signal, signalLength, JBB);
624  } else {
625  Uint32 blockNo = refToMain(scan.m_userRef);
626  EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
627  }
628  // next time look for next entry
629  scan.m_state = ScanOp::Next;
630  return;
631  }
632  // XXX in ACC this is checked before req->checkLcpStop
633  if (scan.m_state == ScanOp::Last) {
634  jam();
635  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
636  conf->scanPtr = scan.m_userPtr;
637  conf->accOperationPtr = RNIL;
638  conf->fragId = RNIL;
639  unsigned signalLength = 3;
640  sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
641  signal, signalLength, JBB);
642  return;
643  }
644  ndbrequire(false);
645 }
646 
647 /*
648  * Lock succeeded (after delay) in ACC. If the lock is for the current
649  * entry, set state to Locked. If the lock is for an entry we were
650  * moved away from, simply unlock it. Finally, if we are closing the
651  * scan, do nothing since we have already sent an abort request.
652  */
653 void
654 Dbtux::execACCKEYCONF(Signal* signal)
655 {
656  jamEntry();
657  ScanOpPtr scanPtr;
658  scanPtr.i = signal->theData[0];
659  c_scanOpPool.getPtr(scanPtr);
660  ScanOp& scan = *scanPtr.p;
661 #ifdef VM_TRACE
662  if (debugFlags & (DebugScan | DebugLock)) {
663  debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl;
664  }
665 #endif
666  ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
667  scan.m_lockwait = false;
668  if (scan.m_state == ScanOp::Blocked) {
669  // the lock wait was for current entry
670  jam();
671  scan.m_state = ScanOp::Locked;
672  // LQH has the ball
673  return;
674  }
675  if (scan.m_state != ScanOp::Aborting) {
676  // we were moved, release lock
677  jam();
678  AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
679  lockReq->returnCode = RNIL;
680  lockReq->requestInfo = AccLockReq::Abort;
681  lockReq->accOpPtr = scan.m_accLockOp;
682  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
683  jamEntry();
684  ndbrequire(lockReq->returnCode == AccLockReq::Success);
685  scan.m_accLockOp = RNIL;
686  // LQH has the ball
687  return;
688  }
689  // lose the lock
690  scan.m_accLockOp = RNIL;
691  // continue at ACC_ABORTCONF
692 }
693 
694 /*
695  * Lock failed (after delay) in ACC. Probably means somebody ahead of
696  * us in the lock queue deleted the tuple.
697  */
698 void
699 Dbtux::execACCKEYREF(Signal* signal)
700 {
701  jamEntry();
702  ScanOpPtr scanPtr;
703  scanPtr.i = signal->theData[0];
704  c_scanOpPool.getPtr(scanPtr);
705  ScanOp& scan = *scanPtr.p;
706 #ifdef VM_TRACE
707  if (debugFlags & (DebugScan | DebugLock)) {
708  debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl;
709  }
710 #endif
711  ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
712  scan.m_lockwait = false;
713  if (scan.m_state != ScanOp::Aborting) {
714  jam();
715  // release the operation
716  AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
717  lockReq->returnCode = RNIL;
718  lockReq->requestInfo = AccLockReq::Abort;
719  lockReq->accOpPtr = scan.m_accLockOp;
720  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
721  jamEntry();
722  ndbrequire(lockReq->returnCode == AccLockReq::Success);
723  scan.m_accLockOp = RNIL;
724  // scan position should already have been moved (assert only)
725  if (scan.m_state == ScanOp::Blocked) {
726  jam();
727  // can happen when Dropping
728 #ifdef VM_TRACE
729  const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
730  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
731  ndbassert(index.m_state != Index::Online);
732 #endif
733  scan.m_state = ScanOp::Next;
734  }
735  // LQH has the ball
736  return;
737  }
738  // lose the lock
739  scan.m_accLockOp = RNIL;
740  // continue at ACC_ABORTCONF
741 }
742 
743 /*
744  * Received when the scan is closing. This signal arrives after any
745  * ACCKEYCONF or ACCKEYREF which may have been in the job buffer.
746  */
747 void
748 Dbtux::execACC_ABORTCONF(Signal* signal)
749 {
750  jamEntry();
751  ScanOpPtr scanPtr;
752  scanPtr.i = signal->theData[0];
753  c_scanOpPool.getPtr(scanPtr);
754  ScanOp& scan = *scanPtr.p;
755 #ifdef VM_TRACE
756  if (debugFlags & (DebugScan | DebugLock)) {
757  debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl;
758  }
759 #endif
760  ndbrequire(scan.m_state == ScanOp::Aborting);
761  // most likely we are still in lock wait
762  if (scan.m_lockwait) {
763  jam();
764  scan.m_lockwait = false;
765  scan.m_accLockOp = RNIL;
766  }
767  scanClose(signal, scanPtr);
768 }
769 
770 /*
771  * Find start position for single range scan.
772  */
773 void
774 Dbtux::scanFirst(ScanOpPtr scanPtr)
775 {
776  ScanOp& scan = *scanPtr.p;
777  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
778  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
779 #ifdef VM_TRACE
780  if (debugFlags & DebugScan) {
781  debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
782  }
783 #endif
784  // scan direction 0, 1
785  const unsigned idir = scan.m_descending;
786  // set up bound from segmented memory
787  const ScanBound& scanBound = scan.m_scanBound[idir];
788  KeyDataC searchBoundData(index.m_keySpec, true);
789  KeyBoundC searchBound(searchBoundData);
790  unpackBound(c_ctx, scanBound, searchBound);
791  TreePos treePos;
792  searchToScan(frag, idir, searchBound, treePos);
793  if (treePos.m_loc != NullTupLoc) {
794  scan.m_scanPos = treePos;
795  // link the scan to node found
796  NodeHandle node(frag);
797  selectNode(node, treePos.m_loc);
798  linkScan(node, scanPtr);
799  if (treePos.m_dir == 3) {
800  jam();
801  // check upper bound
802  TreeEnt ent = node.getEnt(treePos.m_pos);
803  if (scanCheck(scanPtr, ent)) {
804  jam();
805  scan.m_state = ScanOp::Current;
806  } else {
807  jam();
808  scan.m_state = ScanOp::Last;
809  }
810  } else {
811  jam();
812  scan.m_state = ScanOp::Next;
813  }
814  } else {
815  jam();
816  scan.m_state = ScanOp::Last;
817  }
818 #ifdef VM_TRACE
819  if (debugFlags & DebugScan) {
820  debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl;
821  }
822 #endif
823 }
824 
825 /*
826  * Look for entry to return as scan result.
827  */
828 void
829 Dbtux::scanFind(ScanOpPtr scanPtr)
830 {
831  ScanOp& scan = *scanPtr.p;
832  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
833 #ifdef VM_TRACE
834  if (debugFlags & DebugScan) {
835  debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl;
836  }
837 #endif
838  ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next);
839  while (1) {
840  jam();
841  if (scan.m_state == ScanOp::Next)
842  scanNext(scanPtr, false);
843  if (scan.m_state == ScanOp::Current) {
844  jam();
845  const TreePos pos = scan.m_scanPos;
846  NodeHandle node(frag);
847  selectNode(node, pos.m_loc);
848  const TreeEnt ent = node.getEnt(pos.m_pos);
849  if (unlikely(scan.m_statOpPtrI != RNIL)) {
850  StatOpPtr statPtr;
851  statPtr.i = scan.m_statOpPtrI;
852  c_statOpPool.getPtr(statPtr);
853  // report row to stats, returns true if a sample is available
854  int ret = statScanAddRow(statPtr, ent);
855  if (ret == 1) {
856  jam();
857  scan.m_state = ScanOp::Found;
858  // may not access non-pseudo cols but must return valid ent
859  scan.m_scanEnt = ent;
860  break;
861  }
862  } else if (scanVisible(scanPtr, ent)) {
863  jam();
864  scan.m_state = ScanOp::Found;
865  scan.m_scanEnt = ent;
866  break;
867  }
868  } else {
869  jam();
870  break;
871  }
872  scan.m_state = ScanOp::Next;
873  }
874 #ifdef VM_TRACE
875  if (debugFlags & DebugScan) {
876  debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl;
877  }
878 #endif
879 }
880 
881 /*
882  * Move to next entry. The scan is already linked to some node. When
883  * we leave, if an entry was found, it will be linked to a possibly
884  * different node. The scan has a position, and a direction which tells
885  * from where we came to this position. This is one of (all comments
886  * are in terms of ascending scan):
887  *
888  * 0 - up from left child (scan this node next)
889  * 1 - up from right child (proceed to parent)
890  * 2 - up from root (the scan ends)
891  * 3 - left to right within node (at end set state 5)
892  * 4 - down from parent (proceed to left child)
893  * 5 - at node end proceed to right child (state becomes 4)
894  *
895  * If an entry was found, the scan direction is 3. Therefore tree
896  * re-organizations need not worry about the scan direction.
897  *
898  * This method is also used to move a scan when its entry is removed
899  * (see moveScanList). If the scan is Blocked, we check if it remains
900  * Blocked on a different version of the tuple. Otherwise the tuple is
901  * lost and state becomes Current.
902  */
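/*
 * A rough walk-through of the loop below for an ascending scan (idir = 0):
 * with direction 4 the scan keeps following getLink(0) down to the leftmost
 * descendant, then pretends to have come up from the left child (direction
 * 0) and switches to direction 3, returning entries 0, 1, ... of that node.
 * Past the last entry the direction becomes 5 and the scan descends into the
 * right child (direction 4 again) or, if there is none, pretends to have
 * come up from the right child (direction 1) and moves to the parent via
 * getLink(2), taking getSide() as the new direction. Moving up past the root
 * yields direction 2, which ends the scan.
 */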
903 void
904 Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
905 {
906  ScanOp& scan = *scanPtr.p;
907  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
908 #ifdef VM_TRACE
909  if (debugFlags & (DebugMaint | DebugScan)) {
910  debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl;
911  }
912 #endif
913  // cannot be moved away from tuple we have locked
914  ndbrequire(scan.m_state != ScanOp::Locked);
915  // scan direction
916  const unsigned idir = scan.m_descending; // 0, 1
917  const int jdir = 1 - 2 * (int)idir; // 1, -1
918  // use copy of position
919  TreePos pos = scan.m_scanPos;
920  // get and remember original node
921  NodeHandle origNode(frag);
922  selectNode(origNode, pos.m_loc);
923  ndbrequire(islinkScan(origNode, scanPtr));
924  // current node in loop
925  NodeHandle node = origNode;
926  // copy of entry found
927  TreeEnt ent;
928  while (true) {
929  jam();
930 #ifdef VM_TRACE
931  if (debugFlags & (DebugMaint | DebugScan)) {
932  debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl;
933  }
934 #endif
935  if (pos.m_dir == 2) {
936  // coming up from root ends the scan
937  jam();
938  pos.m_loc = NullTupLoc;
939  break;
940  }
941  if (node.m_loc != pos.m_loc) {
942  jam();
943  selectNode(node, pos.m_loc);
944  }
945  if (pos.m_dir == 4) {
946  // coming down from parent proceed to left child
947  jam();
948  TupLoc loc = node.getLink(idir);
949  if (loc != NullTupLoc) {
950  jam();
951  pos.m_loc = loc;
952  pos.m_dir = 4; // unchanged
953  continue;
954  }
955  // pretend we came from left child
956  pos.m_dir = idir;
957  }
958  if (pos.m_dir == 5) {
959  // at node end proceed to right child
960  jam();
961  TupLoc loc = node.getLink(1 - idir);
962  if (loc != NullTupLoc) {
963  jam();
964  pos.m_loc = loc;
965  pos.m_dir = 4; // down from parent as usual
966  continue;
967  }
968  // pretend we came from right child
969  pos.m_dir = 1 - idir;
970  }
971  const unsigned occup = node.getOccup();
972  if (occup == 0) {
973  jam();
974  ndbrequire(fromMaintReq);
975  // move back to parent - see comment in treeRemoveInner
976  pos.m_loc = node.getLink(2);
977  pos.m_dir = node.getSide();
978  continue;
979  }
980  if (pos.m_dir == idir) {
981  // coming up from left child scan current node
982  jam();
983  pos.m_pos = idir == 0 ? (Uint16)-1 : occup;
984  pos.m_dir = 3;
985  }
986  if (pos.m_dir == 3) {
987  // before or within node
988  jam();
989  // advance position - becomes ZNIL (> occup) if 0 and descending
990  pos.m_pos += jdir;
991  if (pos.m_pos < occup) {
992  jam();
993  pos.m_dir = 3; // unchanged
994  ent = node.getEnt(pos.m_pos);
995  if (! scanCheck(scanPtr, ent)) {
996  jam();
997  pos.m_loc = NullTupLoc;
998  }
999  break;
1000  }
1001  // after node proceed to right child
1002  pos.m_dir = 5;
1003  continue;
1004  }
1005  if (pos.m_dir == 1 - idir) {
1006  // coming up from right child proceed to parent
1007  jam();
1008  pos.m_loc = node.getLink(2);
1009  pos.m_dir = node.getSide();
1010  continue;
1011  }
1012  ndbrequire(false);
1013  }
1014  // copy back position
1015  scan.m_scanPos = pos;
1016  // relink
1017  if (pos.m_loc != NullTupLoc) {
1018  ndbrequire(pos.m_dir == 3);
1019  ndbrequire(pos.m_loc == node.m_loc);
1020  if (origNode.m_loc != node.m_loc) {
1021  jam();
1022  unlinkScan(origNode, scanPtr);
1023  linkScan(node, scanPtr);
1024  }
1025  if (scan.m_state != ScanOp::Blocked) {
1026  scan.m_state = ScanOp::Current;
1027  } else {
1028  jam();
1029  ndbrequire(fromMaintReq);
1030  TreeEnt& scanEnt = scan.m_scanEnt;
1031  ndbrequire(scanEnt.m_tupLoc != NullTupLoc);
1032  if (scanEnt.eqtuple(ent)) {
1033  // remains blocked on another version
1034  scanEnt = ent;
1035  } else {
1036  jam();
1037  scanEnt.m_tupLoc = NullTupLoc;
1038  scan.m_state = ScanOp::Current;
1039  }
1040  }
1041  } else {
1042  jam();
1043  unlinkScan(origNode, scanPtr);
1044  scan.m_state = ScanOp::Last;
1045  }
1046 #ifdef VM_TRACE
1047  if (debugFlags & (DebugMaint | DebugScan)) {
1048  debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl;
1049  }
1050 #endif
1051 }
1052 
1053 /*
1054  * Check end key. Return true if scan is still within range.
1055  *
1056  * Error handling: If the scan error code has been set, return false at
1057  * once. This terminates the scan and also avoids a kernel crash on
1058  * invalid data.
1059  */
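/*
 * Example, assuming cmpSearchBound() returns the sign of the bound compared
 * to the entry key: for an ascending scan with end key "a <= 20", an entry
 * with a = 25 lies above the upper bound, so after the two sign reversals
 * below ret ends up +1 and scanCheck returns false; the scan then ends in
 * state Last. With no upper bound (scanBound.m_cnt == 0) ret stays 0 and
 * every entry passes.
 */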
1060 bool
1061 Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent)
1062 {
1063  ScanOp& scan = *scanPtr.p;
1064  if (unlikely(scan.m_errorCode != 0)) {
1065  jam();
1066  return false;
1067  }
1068  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
1069  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
1070  const unsigned idir = scan.m_descending;
1071  const int jdir = 1 - 2 * (int)idir;
1072  const ScanBound& scanBound = scan.m_scanBound[1 - idir];
1073  int ret = 0;
1074  if (scanBound.m_cnt != 0) {
1075  jam();
1076  // set up bound from segmented memory
1077  KeyDataC searchBoundData(index.m_keySpec, true);
1078  KeyBoundC searchBound(searchBoundData);
1079  unpackBound(c_ctx, scanBound, searchBound);
1080  // key data for the entry
1081  KeyData entryKey(index.m_keySpec, true, 0);
1082  entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
1083  readKeyAttrs(c_ctx, frag, ent, entryKey, index.m_numAttrs);
1084  // compare bound to key
1085  const Uint32 boundCount = searchBound.get_data().get_cnt();
1086  ret = cmpSearchBound(c_ctx, searchBound, entryKey, boundCount);
1087  ndbrequire(ret != 0);
1088  ret = (-1) * ret; // reverse for key vs bound
1089  ret = jdir * ret; // reverse for descending scan
1090  }
1091 #ifdef VM_TRACE
1092  if (debugFlags & DebugScan) {
1093  debugOut << "Check scan " << scanPtr.i << " " << scan << " ret:" << dec << ret << endl;
1094  }
1095 #endif
1096  return (ret <= 0);
1097 }
1098 
1099 /*
1100  * Check if an entry is visible to the scan.
1101  *
1102  * There is a special check to never accept the same tuple twice in a row.
1103  * This is faster than asking TUP. It also fixes some special cases
1104  * which are not analyzed or handled yet.
1105  *
1106  * Error handling: If the scan error code has been set, return false since
1107  * no new result can be returned to LQH. The scan will then look for the
1108  * next result and terminate via scanCheck().
1109  */
1110 bool
1111 Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
1112 {
1113  const ScanOp& scan = *scanPtr.p;
1114  if (unlikely(scan.m_errorCode != 0)) {
1115  jam();
1116  return false;
1117  }
1118  const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
1119  Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
1120  Uint32 pageId = ent.m_tupLoc.getPageId();
1121  Uint32 pageOffset = ent.m_tupLoc.getPageOffset();
1122  Uint32 tupVersion = ent.m_tupVersion;
1123  // check for the same tuple twice in a row
1124  if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc)
1125  {
1126  jam();
1127  return false;
1128  }
1129  Uint32 transId1 = scan.m_transId1;
1130  Uint32 transId2 = scan.m_transId2;
1131  bool dirty = scan.m_readCommitted;
1132  Uint32 savePointId = scan.m_savePointId;
1133  bool ret = c_tup->tuxQueryTh(tableFragPtrI, pageId, pageOffset, tupVersion, transId1, transId2, dirty, savePointId);
1134  jamEntry();
1135  return ret;
1136 }
1137 
1138 /*
1139  * Finish closing of scan and send conf. Any lock wait has been done
1140  * already.
1141  *
1142  * Error handling: Every scan ends here. If error code has been set,
1143  * send a REF.
1144  */
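/*
 * For example, both a close ordered by LQH (NEXT_SCANREQ with ZSCAN_CLOSE)
 * and the abort path via ACC_ABORTCONF end up here; if an error such as
 * TuxIndexNotOnline was recorded earlier, the REF branch below reports it
 * to LQH in NEXT_SCANREF instead of NEXT_SCANCONF.
 */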
1145 void
1146 Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
1147 {
1148  ScanOp& scan = *scanPtr.p;
1149  ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
1150  // unlock all not unlocked by LQH
1151  if (! scan.m_accLockOps.isEmpty()) {
1152  jam();
1153  abortAccLockOps(signal, scanPtr);
1154  }
1155  if (scanPtr.p->m_errorCode == 0) {
1156  jam();
1157  // send conf
1158  NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1159  conf->scanPtr = scanPtr.p->m_userPtr;
1160  conf->accOperationPtr = RNIL;
1161  conf->fragId = RNIL;
1162  unsigned signalLength = 3;
1163  sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
1164  signal, signalLength, JBB);
1165  } else {
1166  // send ref
1167  NextScanRef* ref = (NextScanRef*)signal->getDataPtr();
1168  ref->scanPtr = scanPtr.p->m_userPtr;
1169  ref->accOperationPtr = RNIL;
1170  ref->fragId = RNIL;
1171  ref->errorCode = scanPtr.p->m_errorCode;
1172  sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANREF,
1173  signal, NextScanRef::SignalLength, JBB);
1174  }
1175  releaseScanOp(scanPtr);
1176 }
1177 
1178 void
1179 Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr)
1180 {
1181  ScanOp& scan = *scanPtr.p;
1182 #ifdef VM_TRACE
1183  if (debugFlags & (DebugScan | DebugLock)) {
1184  debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl;
1185  }
1186 #endif
1187  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1188  ScanLockPtr lockPtr;
1189  while (list.first(lockPtr)) {
1190  jam();
1191  AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
1192  lockReq->returnCode = RNIL;
1193  lockReq->requestInfo = AccLockReq::Abort;
1194  lockReq->accOpPtr = lockPtr.p->m_accLockOp;
1195  EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
1196  jamEntry();
1197  ndbrequire(lockReq->returnCode == AccLockReq::Success);
1198  list.release(lockPtr);
1199  }
1200 }
1201 
1202 void
1203 Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
1204 {
1205  ScanOp& scan = *scanPtr.p;
1206 #ifdef VM_TRACE
1207  if (debugFlags & (DebugScan | DebugLock)) {
1208  debugOut << "Add lock " << hex << accLockOp << dec
1209  << " to scan " << scanPtr.i << " " << scan << endl;
1210  }
1211 #endif
1212  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1213  ScanLockPtr lockPtr;
1214 #ifdef VM_TRACE
1215  list.first(lockPtr);
1216  while (lockPtr.i != RNIL) {
1217  ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
1218  list.next(lockPtr);
1219  }
1220 #endif
1221  bool ok = list.seize(lockPtr);
1222  ndbrequire(ok);
1223  ndbrequire(accLockOp != RNIL);
1224  lockPtr.p->m_accLockOp = accLockOp;
1225 }
1226 
1227 void
1228 Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
1229 {
1230  ScanOp& scan = *scanPtr.p;
1231 #ifdef VM_TRACE
1232  if (debugFlags & (DebugScan | DebugLock)) {
1233  debugOut << "Remove lock " << hex << accLockOp << dec
1234  << " from scan " << scanPtr.i << " " << scan << endl;
1235  }
1236 #endif
1237  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1238  ScanLockPtr lockPtr;
1239  list.first(lockPtr);
1240  while (lockPtr.i != RNIL) {
1241  if (lockPtr.p->m_accLockOp == accLockOp) {
1242  jam();
1243  break;
1244  }
1245  list.next(lockPtr);
1246  }
1247  ndbrequire(lockPtr.i != RNIL);
1248  list.release(lockPtr);
1249 }
1250 
1251 /*
1252  * Release allocated records.
1253  */
1254 void
1255 Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
1256 {
1257 #ifdef VM_TRACE
1258  if (debugFlags & DebugScan) {
1259  debugOut << "Release scan " << scanPtr.i << " " << *scanPtr.p << endl;
1260  }
1261 #endif
1262  Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
1263  for (unsigned i = 0; i <= 1; i++) {
1264  ScanBound& scanBound = scanPtr.p->m_scanBound[i];
1265  DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
1266  LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
1267  b.release();
1268  }
1269  if (unlikely(scanPtr.p->m_statOpPtrI != RNIL)) {
1270  jam();
1271  StatOpPtr statPtr;
1272  statPtr.i = scanPtr.p->m_statOpPtrI;
1273  c_statOpPool.getPtr(statPtr);
1274  c_statOpPool.release(statPtr);
1275  }
1276  // unlink from per-fragment list and release from pool
1277  frag.m_scanList.release(scanPtr);
1278 }