MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
testOIBasic.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 /*
19  * testOIBasic - ordered index test
20  *
21  * dummy push to re-created mysql-5.1-telco ...
22  */
23 
24 #include <ndb_global.h>
25 
26 #include <NdbMain.h>
27 #include <NdbOut.hpp>
28 #include <NdbApi.hpp>
29 #include <NdbTest.hpp>
30 #include <NdbMutex.h>
31 #include <NdbCondition.h>
32 #include <NdbThread.h>
33 #include <NdbTick.h>
34 #include <NdbSleep.h>
35 #include <my_sys.h>
36 #include <NdbSqlUtil.hpp>
37 #include <ndb_version.h>
38 
39 // options
40 
41 struct Opt {
42  // common options
43  uint m_batch;
44  const char* m_bound;
45  const char* m_case;
46  bool m_collsp;
47  bool m_cont;
48  bool m_core;
49  const char* m_csname;
50  CHARSET_INFO* m_cs;
51  int m_die;
52  bool m_dups;
54  const char* m_index;
55  uint m_loop;
56  uint m_mrrmaxrng;
57  bool m_msglock;
58  bool m_nologging;
59  bool m_noverify;
60  uint m_pctmrr;
61  uint m_pctnull;
62  uint m_rows;
63  uint m_samples;
64  uint m_scanbatch;
65  uint m_scanpar;
66  uint m_scanstop;
67  int m_seed;
68  const char* m_skip;
69  uint m_sloop;
70  uint m_ssloop;
71  const char* m_table;
72  uint m_threads;
73  int m_v; // int for lint
74  Opt() :
75  m_batch(32),
76  m_bound("01234"),
77  m_case(0),
78  m_collsp(false),
79  m_cont(false),
80  m_core(false),
81  m_csname("random"),
82  m_cs(0),
83  m_die(0),
84  m_dups(false),
85  m_fragtype(NdbDictionary::Object::FragUndefined),
86  m_index(0),
87  m_loop(1),
88  m_mrrmaxrng(10),
89  m_msglock(true),
90  m_nologging(false),
91  m_noverify(false),
92  m_pctmrr(50),
93  m_pctnull(10),
94  m_rows(1000),
95  m_samples(0),
96  m_scanbatch(0),
97  m_scanpar(0),
98  m_scanstop(0),
99  m_seed(-1),
100  m_skip(0),
101  m_sloop(4),
102  m_ssloop(4),
103  m_table(0),
104  m_threads(4),
105  m_v(1) {
106  }
107 };
108 
109 static Opt g_opt;
110 
111 static void printcases();
112 static void printtables();
113 
114 static void
115 printhelp()
116 {
117  Opt d;
118  ndbout
119  << "usage: testOIbasic [options]" << endl
120  << " -batch N pk operations in batch [" << d.m_batch << "]" << endl
121  << " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
122  << " -case abc only given test cases (letters a-z)" << endl
123  << " -collsp use strnncollsp instead of strnxfrm" << endl
124  << " -cont on error continue to next test case [" << d.m_cont << "]" << endl
125  << " -core core dump on error [" << d.m_core << "]" << endl
126  << " -csname S charset or collation [" << d.m_csname << "]" << endl
127  << " -die nnn exit immediately on NDB error code nnn" << endl
128  << " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
129  << " -fragtype T fragment type single/small/medium/large" << endl
130  << " -index xyz only given index numbers (digits 0-9)" << endl
131  << " -loop N loop count full suite 0=forever [" << d.m_loop << "]" << endl
132  << " -mrrmaxrng N max ranges to supply for MRR scan [" << d.m_mrrmaxrng << "]" << endl
133  << " -nologging create tables in no-logging mode" << endl
134  << " -noverify skip index verifications" << endl
135  << " -pctmrr N pct of index scans to use MRR [" << d.m_pctmrr << "]" << endl
136  << " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
137  << " -rows N rows per thread [" << d.m_rows << "]" << endl
138  << " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
139  << " -scanbatch N scan batch 0=default [" << d.m_scanbatch << "]" << endl
140  << " -scanpar N scan parallel 0=default [" << d.m_scanpar << "]" << endl
141  << " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
142  << " -skip abc skip given test cases (letters a-z)" << endl
143  << " -sloop N level 2 (sub)loop count [" << d.m_sloop << "]" << endl
144  << " -ssloop N level 3 (sub)loop count [" << d.m_ssloop << "]" << endl
145  << " -table xyz only given table numbers (digits 0-9)" << endl
146  << " -threads N number of threads [" << d.m_threads << "]" << endl
147  << " -vN verbosity [" << d.m_v << "]" << endl
148  << " -h or -help print this help text" << endl
149  ;
150  printcases();
151  printtables();
152 }
153 
154 // not yet configurable
155 static const bool g_store_null_key = true;
156 
157 // compare NULL like normal value (NULL < not NULL, NULL == NULL)
158 static const bool g_compare_null = true;
159 
160 static const char* hexstr = "0123456789abcdef";
161 
162 // random ints
163 #ifdef NDB_WIN
164 #define random() rand()
165 #define srandom(SEED) srand(SEED)
166 #endif
167 
168 static uint
169 urandom(uint n)
170 {
171  if (n == 0)
172  return 0;
173  uint i = random() % n;
174  return i;
175 }
176 
177 static int
178 irandom(uint n)
179 {
180  if (n == 0)
181  return 0;
182  int i = random() % n;
183  if (random() & 0x1)
184  i = -i;
185  return i;
186 }
187 
188 static bool
189 randompct(uint pct)
190 {
191  if (pct == 0)
192  return false;
193  if (pct >= 100)
194  return true;
195  return urandom(100) < pct;
196 }
197 
198 static uint
199 random_coprime(uint n)
200 {
201  uint prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
202  uint count = sizeof(prime) / sizeof(prime[0]);
203  if (n == 0)
204  return 0;
205  while (1) {
206  uint i = urandom(count);
207  if (n % prime[i] != 0)
208  return prime[i];
209  }
210 }
211 
212 // random re-sequence of 0...(n-1)
213 
214 struct Rsq {
215  Rsq(uint n);
216  uint next();
217 private:
218  uint m_n;
219  uint m_i;
220  uint m_start;
221  uint m_prime;
222 };
223 
224 Rsq::Rsq(uint n)
225 {
226  m_n = n;
227  m_i = 0;
228  m_start = urandom(n);
229  m_prime = random_coprime(n);
230 }
231 
232 uint
233 Rsq::next()
234 {
235  assert(m_n != 0);
236  return (m_start + m_i++ * m_prime) % m_n;
237 }
238 
239 // log and error macros
240 
241 static NdbMutex *ndbout_mutex = NULL;
242 static const char* getthrprefix();
243 
244 #define LLN(n, s) \
245  do { \
246  if ((n) > g_opt.m_v) break; \
247  if (g_opt.m_msglock) NdbMutex_Lock(ndbout_mutex); \
248  ndbout << getthrprefix(); \
249  if ((n) > 2) \
250  ndbout << "line " << __LINE__ << ": "; \
251  ndbout << s << endl; \
252  if (g_opt.m_msglock) NdbMutex_Unlock(ndbout_mutex); \
253  } while(0)
254 
255 #define LL0(s) LLN(0, s)
256 #define LL1(s) LLN(1, s)
257 #define LL2(s) LLN(2, s)
258 #define LL3(s) LLN(3, s)
259 #define LL4(s) LLN(4, s)
260 #define LL5(s) LLN(5, s)
261 
262 #define HEX(x) hex << (x) << dec
263 
264 // following check a condition and return -1 on failure
265 
266 #undef CHK // simple check
267 #undef CHKTRY // check with action on fail
268 #undef CHKCON // print NDB API errors on failure
269 
270 #define CHK(x) CHKTRY(x, ;)
271 
272 #define CHKTRY(x, act) \
273  do { \
274  if (x) break; \
275  LL0("line " << __LINE__ << ": " << #x << " failed"); \
276  if (g_opt.m_core) abort(); \
277  act; \
278  return -1; \
279  } while (0)
280 
281 #define CHKCON(x, con) \
282  do { \
283  if (x) break; \
284  LL0("line " << __LINE__ << ": " << #x << " failed"); \
285  (con).printerror(ndbout); \
286  if (g_opt.m_core) abort(); \
287  return -1; \
288  } while (0)
289 
290 // method parameters
291 
292 struct Thr;
293 struct Con;
294 struct Tab;
295 struct ITab;
296 struct Set;
297 struct Tmr;
298 
299 struct Par : public Opt {
300  uint m_no;
301  Con* m_con;
302  Con& con() const { assert(m_con != 0); return *m_con; }
303  const Tab* m_tab;
304  const Tab& tab() const { assert(m_tab != 0); return *m_tab; }
305  const ITab* m_itab;
306  const ITab& itab() const { assert(m_itab != 0); return *m_itab; }
307  Set* m_set;
308  Set& set() const { assert(m_set != 0); return *m_set; }
309  Tmr* m_tmr;
310  Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
311  char m_currcase[2];
312  uint m_lno;
313  uint m_slno;
314  uint m_totrows;
315  // value calculation
316  uint m_range;
317  uint m_pctrange;
318  uint m_pctbrange;
319  int m_bdir;
320  bool m_noindexkeyupdate;
321  // choice of key
322  bool m_randomkey;
323  // do verify after read
324  bool m_verify;
325  // errors to catch (see Con)
326  uint m_catcherr;
327  // abort percentage
328  uint m_abortpct;
329  NdbOperation::LockMode m_lockmode;
330  // scan options
331  bool m_tupscan;
332  bool m_ordered;
333  bool m_descending;
334  bool m_multiRange;
335  // threads used by current test case
336  uint m_usedthreads;
337  Par(const Opt& opt) :
338  Opt(opt),
339  m_no(0),
340  m_con(0),
341  m_tab(0),
342  m_itab(0),
343  m_set(0),
344  m_tmr(0),
345  m_lno(0),
346  m_slno(0),
347  m_totrows(0),
348  m_range(m_rows),
349  m_pctrange(40),
350  m_pctbrange(80),
351  m_bdir(0),
352  m_noindexkeyupdate(false),
353  m_randomkey(false),
354  m_verify(false),
355  m_catcherr(0),
356  m_abortpct(0),
357  m_lockmode(NdbOperation::LM_Read),
358  m_tupscan(false),
359  m_ordered(false),
360  m_descending(false),
361  m_multiRange(false),
362  m_usedthreads(0)
363  {
364  m_currcase[0] = 0;
365  }
366 };
367 
368 static bool
369 usetable(Par par, uint i)
370 {
371  return par.m_table == 0 || strchr(par.m_table, '0' + i) != 0;
372 }
373 
374 static bool
375 useindex(Par par, uint i)
376 {
377  return par.m_index == 0 || strchr(par.m_index, '0' + i) != 0;
378 }
379 
380 static uint
381 thrrow(Par par, uint j)
382 {
383  return par.m_usedthreads * j + par.m_no;
384 }
385 
386 #if 0
387 static bool
388 isthrrow(Par par, uint i)
389 {
390  return i % par.m_usedthreads == par.m_no;
391 }
392 #endif
393 
394 // timer
395 
396 struct Tmr {
397  void clr();
398  void on();
399  void off(uint cnt = 0);
400  const char* time();
401  const char* pct(const Tmr& t1);
402  const char* over(const Tmr& t1);
403  NDB_TICKS m_on;
404  NDB_TICKS m_ms;
405  uint m_cnt;
406  char m_time[100];
407  char m_text[100];
408  Tmr() { clr(); }
409 };
410 
411 void
412 Tmr::clr()
413 {
414  m_on = m_ms = m_cnt = m_time[0] = m_text[0] = 0;
415 }
416 
417 void
418 Tmr::on()
419 {
420  assert(m_on == 0);
421  m_on = NdbTick_CurrentMillisecond();
422 }
423 
424 void
425 Tmr::off(uint cnt)
426 {
427  NDB_TICKS off = NdbTick_CurrentMillisecond();
428  assert(m_on != 0 && off >= m_on);
429  m_ms += off - m_on;
430  m_cnt += cnt;
431  m_on = 0;
432 }
433 
434 const char*
435 Tmr::time()
436 {
437  if (m_cnt == 0) {
438  sprintf(m_time, "%u ms", (unsigned)m_ms);
439  } else {
440  sprintf(m_time, "%u ms per %u ( %u ms per 1000 )", (unsigned)m_ms, m_cnt, (unsigned)((1000 * m_ms) / m_cnt));
441  }
442  return m_time;
443 }
444 
445 const char*
446 Tmr::pct(const Tmr& t1)
447 {
448  if (0 < t1.m_ms) {
449  sprintf(m_text, "%u pct", (unsigned)((100 * m_ms) / t1.m_ms));
450  } else {
451  sprintf(m_text, "[cannot measure]");
452  }
453  return m_text;
454 }
455 
456 const char*
457 Tmr::over(const Tmr& t1)
458 {
459  if (0 < t1.m_ms) {
460  if (t1.m_ms <= m_ms)
461  sprintf(m_text, "%u pct", (unsigned)((100 * (m_ms - t1.m_ms)) / t1.m_ms));
462  else
463  sprintf(m_text, "-%u pct", (unsigned)((100 * (t1.m_ms - m_ms)) / t1.m_ms));
464  } else {
465  sprintf(m_text, "[cannot measure]");
466  }
467  return m_text;
468 }
469 
470 // character sets
471 
472 static const uint maxcsnumber = 512;
473 static const uint maxcharcount = 32;
474 static const uint maxcharsize = 4;
475 static const uint maxxmulsize = 8;
476 
477 // single mb char
478 struct Chr {
479  uchar m_bytes[maxcharsize];
480  uchar m_xbytes[maxxmulsize * maxcharsize];
481  uint m_size;
482  Chr();
483 };
484 
485 Chr::Chr()
486 {
487  memset(m_bytes, 0, sizeof(m_bytes));
488  memset(m_xbytes, 0, sizeof(m_xbytes));
489  m_size = 0;
490 }
491 
492 // charset and random valid chars to use
493 struct Chs {
494  CHARSET_INFO* m_cs;
495  uint m_xmul;
496  Chr* m_chr;
497  Chs(CHARSET_INFO* cs);
498  ~Chs();
499 };
500 
501 static NdbOut&
502 operator<<(NdbOut& out, const Chs& chs);
503 
504 Chs::Chs(CHARSET_INFO* cs) :
505  m_cs(cs)
506 {
507  m_xmul = m_cs->strxfrm_multiply;
508  if (m_xmul == 0)
509  m_xmul = 1;
510  assert(m_xmul <= maxxmulsize);
511  m_chr = new Chr [maxcharcount];
512  uint i = 0;
513  uint miss1 = 0;
514  uint miss2 = 0;
515  uint miss3 = 0;
516  uint miss4 = 0;
517  while (i < maxcharcount) {
518  uchar* bytes = m_chr[i].m_bytes;
519  uchar* xbytes = m_chr[i].m_xbytes;
520  uint& size = m_chr[i].m_size;
521  bool ok;
522  size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
523  assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
524  // prefer longer chars
525  if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
526  continue;
527  for (uint j = 0; j < size; j++) {
528  bytes[j] = urandom(256);
529  }
530  int not_used;
531  // check wellformed
532  const char* sbytes = (const char*)bytes;
533  if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1, &not_used) != size) {
534  miss1++;
535  continue;
536  }
537  // check no proper prefix wellformed
538  ok = true;
539  for (uint j = 1; j < size; j++) {
540  if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + j, 1, &not_used) == j) {
541  ok = false;
542  break;
543  }
544  }
545  if (!ok) {
546  miss2++;
547  continue;
548  }
549  // normalize
550  memset(xbytes, 0, sizeof(xbytes));
551  // currently returns buffer size always
552  int xlen = NdbSqlUtil::ndb_strnxfrm(cs, xbytes, m_xmul * size, bytes, size);
553  // check we got something
554  ok = false;
555  for (uint j = 0; j < (uint)xlen; j++) {
556  if (xbytes[j] != 0) {
557  ok = true;
558  break;
559  }
560  }
561  if (!ok) {
562  miss3++;
563  continue;
564  }
565  // check for duplicate (before normalize)
566  ok = true;
567  for (uint j = 0; j < i; j++) {
568  const Chr& chr = m_chr[j];
569  if (chr.m_size == size && memcmp(chr.m_bytes, bytes, size) == 0) {
570  ok = false;
571  break;
572  }
573  }
574  if (!ok) {
575  miss4++;
576  continue;
577  }
578  i++;
579  }
580  bool disorder = true;
581  uint bubbles = 0;
582  while (disorder) {
583  disorder = false;
584  for (uint i = 1; i < maxcharcount; i++) {
585  uint len = sizeof(m_chr[i].m_xbytes);
586  if (memcmp(m_chr[i-1].m_xbytes, m_chr[i].m_xbytes, len) > 0) {
587  Chr chr = m_chr[i];
588  m_chr[i] = m_chr[i-1];
589  m_chr[i-1] = chr;
590  disorder = true;
591  bubbles++;
592  }
593  }
594  }
595  LL3("inited charset " << *this << " miss=" << miss1 << "," << miss2 << "," << miss3 << "," << miss4 << " bubbles=" << bubbles);
596 }
597 
598 Chs::~Chs()
599 {
600  delete [] m_chr;
601 }
602 
603 static NdbOut&
604 operator<<(NdbOut& out, const Chs& chs)
605 {
606  CHARSET_INFO* cs = chs.m_cs;
607  out << cs->name << "[" << cs->mbminlen << "-" << cs->mbmaxlen << "," << chs.m_xmul << "]";
608  return out;
609 }
610 
611 static Chs* cslist[maxcsnumber];
612 
613 static void
614 resetcslist()
615 {
616  for (uint i = 0; i < maxcsnumber; i++) {
617  delete cslist[i];
618  cslist[i] = 0;
619  }
620 }
621 
622 static Chs*
623 getcs(Par par)
624 {
625  CHARSET_INFO* cs;
626  if (par.m_cs != 0) {
627  cs = par.m_cs;
628  } else {
629  while (1) {
630  uint n = urandom(maxcsnumber);
631  cs = get_charset(n, MYF(0));
632  if (cs != 0) {
633  // avoid dodgy internal character sets
634  // see bug# 37554
635  if (cs->state & MY_CS_HIDDEN)
636  continue;
637  // prefer complex charsets
638  if (cs->mbmaxlen != 1 || urandom(5) == 0)
639  break;
640  }
641  }
642  }
643  if (cslist[cs->number] == 0)
644  cslist[cs->number] = new Chs(cs);
645  return cslist[cs->number];
646 }
647 
648 // tables and indexes
649 
650 // Col - table column
651 
652 struct Col {
653  enum Type {
658  };
659  const struct Tab& m_tab;
660  uint m_num;
661  const char* m_name;
662  bool m_pk;
663  Type m_type;
664  uint m_length;
665  uint m_bytelength; // multiplied by char width
666  uint m_attrsize; // base type size
667  uint m_headsize; // length bytes
668  uint m_bytesize; // full value size
669  bool m_nullable;
670  const Chs* m_chs;
671  Col(const struct Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs);
672  ~Col();
673  bool equal(const Col& col2) const;
674  void wellformed(const void* addr) const;
675 };
676 
677 Col::Col(const struct Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs) :
678  m_tab(tab),
679  m_num(num),
680  m_name(strcpy(new char [strlen(name) + 1], name)),
681  m_pk(pk),
682  m_type(type),
683  m_length(length),
684  m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
685  m_attrsize(
686  type == Unsigned ? sizeof(Uint32) :
687  type == Char ? sizeof(char) :
688  type == Varchar ? sizeof(char) :
689  type == Longvarchar ? sizeof(char) : ~0),
690  m_headsize(
691  type == Unsigned ? 0 :
692  type == Char ? 0 :
693  type == Varchar ? 1 :
694  type == Longvarchar ? 2 : ~0),
695  m_bytesize(m_headsize + m_attrsize * m_bytelength),
696  m_nullable(nullable),
697  m_chs(chs)
698 {
699  // fix long varchar
700  if (type == Varchar && m_bytelength > 255) {
701  m_type = Longvarchar;
702  m_headsize += 1;
703  m_bytesize += 1;
704  }
705 }
706 
707 Col::~Col()
708 {
709  delete [] m_name;
710 }
711 
712 bool
713 Col::equal(const Col& col2) const
714 {
715  return m_type == col2.m_type && m_length == col2.m_length && m_chs == col2.m_chs;
716 }
717 
718 void
719 Col::wellformed(const void* addr) const
720 {
721  switch (m_type) {
722  case Col::Unsigned:
723  break;
724  case Col::Char:
725  {
726  CHARSET_INFO* cs = m_chs->m_cs;
727  const char* src = (const char*)addr;
728  uint len = m_bytelength;
729  int not_used;
730  (void)not_used; /* squash warning when assert is #defined to nothing */
731  assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff, &not_used) == len);
732  }
733  break;
734  case Col::Varchar:
735  {
736  CHARSET_INFO* cs = m_chs->m_cs;
737  const uchar* src = (const uchar*)addr;
738  const char* ssrc = (const char*)src;
739  uint len = src[0];
740  int not_used;
741  (void)not_used; /* squash warning when assert is #defined to nothing */
742  assert(len <= m_bytelength);
743  assert((*cs->cset->well_formed_len)(cs, ssrc + 1, ssrc + 1 + len, 0xffff, &not_used) == len);
744  }
745  break;
746  case Col::Longvarchar:
747  {
748  CHARSET_INFO* cs = m_chs->m_cs;
749  const uchar* src = (const uchar*)addr;
750  const char* ssrc = (const char*)src;
751  uint len = src[0] + (src[1] << 8);
752  int not_used;
753  (void)not_used; /* squash warning when assert is #defined to nothing */
754  assert(len <= m_bytelength);
755  assert((*cs->cset->well_formed_len)(cs, ssrc + 2, ssrc + 2 + len, 0xffff, &not_used) == len);
756  }
757  break;
758  default:
759  assert(false);
760  break;
761  }
762 }
763 
764 static NdbOut&
765 operator<<(NdbOut& out, const Col& col)
766 {
767  out << "col[" << col.m_num << "] " << col.m_name;
768  switch (col.m_type) {
769  case Col::Unsigned:
770  out << " uint";
771  break;
772  case Col::Char:
773  {
774  CHARSET_INFO* cs = col.m_chs->m_cs;
775  out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
776  }
777  break;
778  case Col::Varchar:
779  {
780  CHARSET_INFO* cs = col.m_chs->m_cs;
781  out << " varchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
782  }
783  break;
784  case Col::Longvarchar:
785  {
786  CHARSET_INFO* cs = col.m_chs->m_cs;
787  out << " longvarchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
788  }
789  break;
790  default:
791  out << "type" << (int)col.m_type;
792  assert(false);
793  break;
794  }
795  out << (col.m_pk ? " pk" : "");
796  out << (col.m_nullable ? " nullable" : "");
797  return out;
798 }
799 
800 // ICol - index column
801 
802 struct ICol {
803  const struct ITab& m_itab;
804  uint m_num;
805  const Col& m_col;
806  ICol(const struct ITab& itab, uint num, const Col& col);
807  ~ICol();
808 };
809 
810 ICol::ICol(const struct ITab& itab, uint num, const Col& col) :
811  m_itab(itab),
812  m_num(num),
813  m_col(col)
814 {
815 }
816 
817 ICol::~ICol()
818 {
819 }
820 
821 static NdbOut&
822 operator<<(NdbOut& out, const ICol& icol)
823 {
824  out << "icol[" << icol.m_num << "] " << icol.m_col;
825  return out;
826 }
827 
828 // ITab - index
829 
830 struct ITab {
831  enum Type {
832  OrderedIndex = NdbDictionary::Index::OrderedIndex,
833  UniqueHashIndex = NdbDictionary::Index::UniqueHashIndex
834  };
835  const struct Tab& m_tab;
836  const char* m_name;
837  Type m_type;
838  uint m_icols;
839  const ICol** m_icol;
840  uint m_keymask;
841  ITab(const struct Tab& tab, const char* name, Type type, uint icols);
842  ~ITab();
843  void icoladd(uint k, const ICol* icolptr);
844 };
845 
846 ITab::ITab(const struct Tab& tab, const char* name, Type type, uint icols) :
847  m_tab(tab),
848  m_name(strcpy(new char [strlen(name) + 1], name)),
849  m_type(type),
850  m_icols(icols),
851  m_icol(new const ICol* [icols + 1]),
852  m_keymask(0)
853 {
854  for (uint k = 0; k <= m_icols; k++)
855  m_icol[k] = 0;
856 }
857 
858 ITab::~ITab()
859 {
860  delete [] m_name;
861  for (uint i = 0; i < m_icols; i++)
862  delete m_icol[i];
863  delete [] m_icol;
864 }
865 
866 void
867 ITab::icoladd(uint k, const ICol* icolptr)
868 {
869  assert(k == icolptr->m_num && k < m_icols && m_icol[k] == 0);
870  m_icol[k] = icolptr;
871  m_keymask |= (1 << icolptr->m_col.m_num);
872 }
873 
874 static NdbOut&
875 operator<<(NdbOut& out, const ITab& itab)
876 {
877  out << "itab " << itab.m_name << " icols=" << itab.m_icols;
878  for (uint k = 0; k < itab.m_icols; k++) {
879  const ICol& icol = *itab.m_icol[k];
880  out << endl << icol;
881  }
882  return out;
883 }
884 
885 // Tab - table
886 
887 struct Tab {
888  const char* m_name;
889  uint m_cols;
890  const Col** m_col;
891  uint m_pkmask;
892  uint m_itabs;
893  const ITab** m_itab;
894  uint m_orderedindexes;
895  uint m_hashindexes;
896  // pk must contain an Unsigned column
897  uint m_keycol;
898  void coladd(uint k, Col* colptr);
899  void itabadd(uint j, ITab* itab);
900  Tab(const char* name, uint cols, uint itabs, uint keycol);
901  ~Tab();
902 };
903 
904 Tab::Tab(const char* name, uint cols, uint itabs, uint keycol) :
905  m_name(strcpy(new char [strlen(name) + 1], name)),
906  m_cols(cols),
907  m_col(new const Col* [cols + 1]),
908  m_pkmask(0),
909  m_itabs(itabs),
910  m_itab(new const ITab* [itabs + 1]),
911  m_orderedindexes(0),
912  m_hashindexes(0),
913  m_keycol(keycol)
914 {
915  for (uint k = 0; k <= cols; k++)
916  m_col[k] = 0;
917  for (uint j = 0; j <= itabs; j++)
918  m_itab[j] = 0;
919 }
920 
921 Tab::~Tab()
922 {
923  delete [] m_name;
924  for (uint i = 0; i < m_cols; i++)
925  delete m_col[i];
926  delete [] m_col;
927  for (uint i = 0; i < m_itabs; i++)
928  delete m_itab[i];
929  delete [] m_itab;
930 }
931 
932 void
933 Tab::coladd(uint k, Col* colptr)
934 {
935  assert(k == colptr->m_num && k < m_cols && m_col[k] == 0);
936  m_col[k] = colptr;
937  if (colptr->m_pk)
938  m_pkmask |= (1 << k);
939 }
940 
941 void
942 Tab::itabadd(uint j, ITab* itabptr)
943 {
944  assert(j < m_itabs && m_itab[j] == 0 && itabptr != 0);
945  m_itab[j] = itabptr;
946  if (itabptr->m_type == ITab::OrderedIndex)
947  m_orderedindexes++;
948  else
949  m_hashindexes++;
950 }
951 
952 static NdbOut&
953 operator<<(NdbOut& out, const Tab& tab)
954 {
955  out << "tab " << tab.m_name << " cols=" << tab.m_cols;
956  for (uint k = 0; k < tab.m_cols; k++) {
957  const Col& col = *tab.m_col[k];
958  out << endl << col;
959  }
960  for (uint i = 0; i < tab.m_itabs; i++) {
961  if (tab.m_itab[i] == 0)
962  continue;
963  const ITab& itab = *tab.m_itab[i];
964  out << endl << itab;
965  }
966  return out;
967 }
968 
969 // make table structs
970 
971 static const Tab** tablist = 0;
972 static uint tabcount = 0;
973 
974 static void
975 verifytables()
976 {
977  for (uint j = 0; j < tabcount; j++) {
978  const Tab* t = tablist[j];
979  if (t == 0)
980  continue;
981  assert(t->m_cols != 0 && t->m_col != 0);
982  for (uint k = 0; k < t->m_cols; k++) {
983  const Col* c = t->m_col[k];
984  assert(c != 0 && c->m_num == k);
985  assert(!(c->m_pk && c->m_nullable));
986  }
987  assert(t->m_col[t->m_cols] == 0);
988  {
989  assert(t->m_keycol < t->m_cols);
990  const Col* c = t->m_col[t->m_keycol];
991  assert(c->m_pk && c->m_type == Col::Unsigned);
992  }
993  assert(t->m_itabs != 0 && t->m_itab != 0);
994  for (uint i = 0; i < t->m_itabs; i++) {
995  const ITab* x = t->m_itab[i];
996  if (x == 0)
997  continue;
998  assert(x != 0 && x->m_icols != 0 && x->m_icol != 0);
999  for (uint k = 0; k < x->m_icols; k++) {
1000  const ICol* c = x->m_icol[k];
1001  assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
1002  if (x->m_type == ITab::UniqueHashIndex) {
1003  assert(!c->m_col.m_nullable);
1004  }
1005  }
1006  }
1007  assert(t->m_itab[t->m_itabs] == 0);
1008  }
1009 }
1010 
1011 static void
1012 makebuiltintables(Par par)
1013 {
1014  LL2("makebuiltintables");
1015  resetcslist();
1016  tabcount = 3;
1017  if (tablist == 0) {
1018  tablist = new const Tab* [tabcount];
1019  for (uint j = 0; j < tabcount; j++) {
1020  tablist[j] = 0;
1021  }
1022  } else {
1023  for (uint j = 0; j < tabcount; j++) {
1024  delete tablist[j];
1025  tablist[j] = 0;
1026  }
1027  }
1028  // ti0 - basic
1029  if (usetable(par, 0)) {
1030  Tab* t = new Tab("ti0", 5, 7, 0);
1031  // name - pk - type - length - nullable - cs
1032  t->coladd(0, new Col(*t, 0, "a", 1, Col::Unsigned, 1, 0, 0));
1033  t->coladd(1, new Col(*t, 1, "b", 0, Col::Unsigned, 1, 1, 0));
1034  t->coladd(2, new Col(*t, 2, "c", 0, Col::Unsigned, 1, 0, 0));
1035  t->coladd(3, new Col(*t, 3, "d", 0, Col::Unsigned, 1, 1, 0));
1036  t->coladd(4, new Col(*t, 4, "e", 0, Col::Unsigned, 1, 0, 0));
1037  if (useindex(par, 0)) {
1038  // a
1039  ITab* x = new ITab(*t, "ti0x0", ITab::OrderedIndex, 1);
1040  x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1041  t->itabadd(0, x);
1042  }
1043  if (useindex(par, 1)) {
1044  // b
1045  ITab* x = new ITab(*t, "ti0x1", ITab::OrderedIndex, 1);
1046  x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1047  t->itabadd(1, x);
1048  }
1049  if (useindex(par, 2)) {
1050  // b, c
1051  ITab* x = new ITab(*t, "ti0x2", ITab::OrderedIndex, 2);
1052  x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1053  x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1054  t->itabadd(2, x);
1055  }
1056  if (useindex(par, 3)) {
1057  // b, e, c, d
1058  ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 4);
1059  x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1060  x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
1061  x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
1062  x->icoladd(3, new ICol(*x, 3, *t->m_col[3]));
1063  t->itabadd(3, x);
1064  }
1065  if (useindex(par, 4)) {
1066  // a, c
1067  ITab* x = new ITab(*t, "ti0z4", ITab::UniqueHashIndex, 2);
1068  x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1069  x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1070  t->itabadd(4, x);
1071  }
1072  if (useindex(par, 5)) {
1073  // a, e
1074  ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2);
1075  x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1076  x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
1077  t->itabadd(5, x);
1078  }
1079  tablist[0] = t;
1080  }
1081  // ti1 - simple char fields
1082  if (usetable(par, 1)) {
1083  Tab* t = new Tab("ti1", 5, 7, 1);
1084  // name - pk - type - length - nullable - cs
1085  t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0));
1086  t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0));
1087  t->coladd(2, new Col(*t, 2, "c", 0, Col::Varchar, 20, 0, getcs(par)));
1088  t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par)));
1089  t->coladd(4, new Col(*t, 4, "e", 0, Col::Longvarchar, 5, 1, getcs(par)));
1090  if (useindex(par, 0)) {
1091  // b
1092  ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1);
1093  x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1094  t->itabadd(0, x);
1095  }
1096  if (useindex(par, 1)) {
1097  // c, a
1098  ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2);
1099  x->icoladd(0, new ICol(*x, 0, *t->m_col[2]));
1100  x->icoladd(1, new ICol(*x, 1, *t->m_col[0]));
1101  t->itabadd(1, x);
1102  }
1103  if (useindex(par, 2)) {
1104  // d
1105  ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 1);
1106  x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
1107  t->itabadd(2, x);
1108  }
1109  if (useindex(par, 3)) {
1110  // e, d, c, b
1111  ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 4);
1112  x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
1113  x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
1114  x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
1115  x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
1116  t->itabadd(3, x);
1117  }
1118  if (useindex(par, 4)) {
1119  // a, b
1120  ITab* x = new ITab(*t, "ti1z4", ITab::UniqueHashIndex, 2);
1121  x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1122  x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
1123  t->itabadd(4, x);
1124  }
1125  if (useindex(par, 5)) {
1126  // b, c, d
1127  ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 3);
1128  x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1129  x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1130  x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
1131  t->itabadd(5, x);
1132  }
1133  tablist[1] = t;
1134  }
1135  // ti2 - complex char fields
1136  if (usetable(par, 2)) {
1137  Tab* t = new Tab("ti2", 5, 7, 2);
1138  // name - pk - type - length - nullable - cs
1139  t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par)));
1140  t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par)));
1141  t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0));
1142  t->coladd(3, new Col(*t, 3, "d", 1, Col::Varchar, 128, 0, getcs(par)));
1143  t->coladd(4, new Col(*t, 4, "e", 0, Col::Varchar, 7, 0, getcs(par)));
1144  if (useindex(par, 0)) {
1145  // a, c, d
1146  ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3);
1147  x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1148  x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1149  x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
1150  t->itabadd(0, x);
1151  }
1152  if (useindex(par, 1)) {
1153  // e, d, c, b, a
1154  ITab* x = new ITab(*t, "ti2x1", ITab::OrderedIndex, 5);
1155  x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
1156  x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
1157  x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
1158  x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
1159  x->icoladd(4, new ICol(*x, 4, *t->m_col[0]));
1160  t->itabadd(1, x);
1161  }
1162  if (useindex(par, 2)) {
1163  // d
1164  ITab* x = new ITab(*t, "ti2x2", ITab::OrderedIndex, 1);
1165  x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
1166  t->itabadd(2, x);
1167  }
1168  if (useindex(par, 3)) {
1169  // b
1170  ITab* x = new ITab(*t, "ti2x3", ITab::OrderedIndex, 1);
1171  x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
1172  t->itabadd(3, x);
1173  }
1174  if (useindex(par, 4)) {
1175  // a, c
1176  ITab* x = new ITab(*t, "ti2z4", ITab::UniqueHashIndex, 2);
1177  x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1178  x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1179  t->itabadd(4, x);
1180  }
1181  if (useindex(par, 5)) {
1182  // a, c, d, e
1183  ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 4);
1184  x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
1185  x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
1186  x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
1187  x->icoladd(3, new ICol(*x, 3, *t->m_col[4]));
1188  t->itabadd(5, x);
1189  }
1190  tablist[2] = t;
1191  }
1192  verifytables();
1193 }
1194 
1195 // connections
1196 
1197 static Ndb_cluster_connection* g_ncc = 0;
1198 
1199 struct Con {
1200  Ndb* m_ndb;
1202  NdbTransaction* m_tx;
1203  Uint64 m_txid;
1204  NdbOperation* m_op;
1205  NdbIndexOperation* m_indexop;
1206  NdbScanOperation* m_scanop;
1207  NdbIndexScanOperation* m_indexscanop;
1208  NdbScanFilter* m_scanfilter;
1209  enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
1210  ScanMode m_scanmode;
1211  enum ErrType {
1212  ErrNone = 0,
1213  ErrDeadlock = 1,
1214  ErrNospace = 2,
1215  ErrOther = 4
1216  };
1217  ErrType m_errtype;
1218  char m_errname[100];
1219  Con() :
1220  m_ndb(0), m_dic(0), m_tx(0), m_txid(0), m_op(0), m_indexop(0),
1221  m_scanop(0), m_indexscanop(0), m_scanfilter(0),
1222  m_scanmode(ScanNo), m_errtype(ErrNone) {}
1223  ~Con() {
1224  if (m_tx != 0)
1225  closeTransaction();
1226  }
1227  int connect();
1228  void connect(const Con& con);
1229  void disconnect();
1230  int startTransaction();
1231  int getNdbOperation(const Tab& tab);
1232  int getNdbIndexOperation1(const ITab& itab, const Tab& tab);
1233  int getNdbIndexOperation(const ITab& itab, const Tab& tab);
1234  int getNdbScanOperation(const Tab& tab);
1235  int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab);
1236  int getNdbIndexScanOperation(const ITab& itab, const Tab& tab);
1237  int getNdbScanFilter();
1238  int equal(int num, const char* addr);
1239  int getValue(int num, NdbRecAttr*& rec);
1240  int setValue(int num, const char* addr);
1241  int setBound(int num, int type, const void* value);
1242  int beginFilter(int group);
1243  int endFilter();
1244  int setFilter(int num, int cond, const void* value, uint len);
1245  int execute(ExecType et);
1246  int execute(ExecType et, uint& err);
1247  int readTuple(Par par);
1248  int readTuples(Par par);
1249  int readIndexTuples(Par par);
1250  int executeScan();
1251  int nextScanResult(bool fetchAllowed);
1252  int nextScanResult(bool fetchAllowed, uint& err);
1253  int updateScanTuple(Con& con2);
1254  int deleteScanTuple(Con& con2);
1255  void closeScan();
1256  void closeTransaction();
1257  const char* errname(uint err);
1258  void printerror(NdbOut& out);
1259 };
1260 
1261 int
1262 Con::connect()
1263 {
1264  assert(m_ndb == 0);
1265  m_ndb = new Ndb(g_ncc, "TEST_DB");
1266  CHKCON(m_ndb->init() == 0, *this);
1267  CHKCON(m_ndb->waitUntilReady(30) == 0, *this);
1268  m_tx = 0, m_txid = 0, m_op = 0;
1269  return 0;
1270 }
1271 
1272 void
1273 Con::connect(const Con& con)
1274 {
1275  assert(m_ndb == 0);
1276  m_ndb = con.m_ndb;
1277 }
1278 
1279 void
1280 Con::disconnect()
1281 {
1282  delete m_ndb;
1283  m_ndb = 0, m_dic = 0, m_tx = 0, m_txid = 0, m_op = 0;
1284 }
1285 
1286 int
1287 Con::startTransaction()
1288 {
1289  assert(m_ndb != 0);
1290  if (m_tx != 0)
1291  closeTransaction();
1292  CHKCON((m_tx = m_ndb->startTransaction()) != 0, *this);
1293  m_txid = m_tx->getTransactionId();
1294  return 0;
1295 }
1296 
1297 int
1298 Con::getNdbOperation(const Tab& tab)
1299 {
1300  assert(m_tx != 0);
1301  CHKCON((m_op = m_tx->getNdbOperation(tab.m_name)) != 0, *this);
1302  return 0;
1303 }
1304 
1305 int
1306 Con::getNdbIndexOperation1(const ITab& itab, const Tab& tab)
1307 {
1308  assert(m_tx != 0);
1309  CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this);
1310  return 0;
1311 }
1312 
1313 int
1314 Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
1315 {
1316  assert(m_tx != 0);
1317  uint tries = 0;
1318  while (1) {
1319  if (getNdbIndexOperation1(itab, tab) == 0)
1320  break;
1321  CHK(++tries < 10);
1322  NdbSleep_MilliSleep(100);
1323  }
1324  return 0;
1325 }
1326 
1327 int
1328 Con::getNdbScanOperation(const Tab& tab)
1329 {
1330  assert(m_tx != 0);
1331  CHKCON((m_op = m_scanop = m_tx->getNdbScanOperation(tab.m_name)) != 0, *this);
1332  return 0;
1333 }
1334 
1335 int
1336 Con::getNdbIndexScanOperation1(const ITab& itab, const Tab& tab)
1337 {
1338  assert(m_tx != 0);
1339  CHKCON((m_op = m_scanop = m_indexscanop = m_tx->getNdbIndexScanOperation(itab.m_name, tab.m_name)) != 0, *this);
1340  return 0;
1341 }
1342 
1343 int
1344 Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
1345 {
1346  assert(m_tx != 0);
1347  uint tries = 0;
1348  while (1) {
1349  if (getNdbIndexScanOperation1(itab, tab) == 0)
1350  break;
1351  CHK(++tries < 10);
1352  NdbSleep_MilliSleep(100);
1353  }
1354  return 0;
1355 }
1356 
1357 int
1358 Con::getNdbScanFilter()
1359 {
1360  assert(m_tx != 0 && m_scanop != 0);
1361  delete m_scanfilter;
1362  m_scanfilter = new NdbScanFilter(m_scanop);
1363  return 0;
1364 }
1365 
1366 int
1367 Con::equal(int num, const char* addr)
1368 {
1369  assert(m_tx != 0 && m_op != 0);
1370  CHKCON(m_op->equal(num, addr) == 0, *this);
1371  return 0;
1372 }
1373 
1374 int
1375 Con::getValue(int num, NdbRecAttr*& rec)
1376 {
1377  assert(m_tx != 0 && m_op != 0);
1378  CHKCON((rec = m_op->getValue(num, 0)) != 0, *this);
1379  return 0;
1380 }
1381 
1382 int
1383 Con::setValue(int num, const char* addr)
1384 {
1385  assert(m_tx != 0 && m_op != 0);
1386  CHKCON(m_op->setValue(num, addr) == 0, *this);
1387  return 0;
1388 }
1389 
1390 int
1391 Con::setBound(int num, int type, const void* value)
1392 {
1393  assert(m_tx != 0 && m_indexscanop != 0);
1394  CHKCON(m_indexscanop->setBound(num, type, value) == 0, *this);
1395  return 0;
1396 }
1397 
1398 int
1399 Con::beginFilter(int group)
1400 {
1401  assert(m_tx != 0 && m_scanfilter != 0);
1402  CHKCON(m_scanfilter->begin((NdbScanFilter::Group)group) == 0, *this);
1403  return 0;
1404 }
1405 
1406 int
1407 Con::endFilter()
1408 {
1409  assert(m_tx != 0 && m_scanfilter != 0);
1410  CHKCON(m_scanfilter->end() == 0, *this);
1411  return 0;
1412 }
1413 
1414 int
1415 Con::setFilter(int num, int cond, const void* value, uint len)
1416 {
1417  assert(m_tx != 0 && m_scanfilter != 0);
1418  CHKCON(m_scanfilter->cmp((NdbScanFilter::BinaryCondition)cond, num, value, len) == 0, *this);
1419  return 0;
1420 }
1421 
1422 int
1423 Con::execute(ExecType et)
1424 {
1425  assert(m_tx != 0);
1426  CHKCON(m_tx->execute(et) == 0, *this);
1427  return 0;
1428 }
1429 
1430 int
1431 Con::execute(ExecType et, uint& err)
1432 {
1433  int ret = execute(et);
1434  // err in: errors to catch, out: error caught
1435  const uint errin = err;
1436  err = 0;
1437  if (ret == -1) {
1438  if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
1439  LL3("caught deadlock");
1440  err = ErrDeadlock;
1441  ret = 0;
1442  }
1443  if (m_errtype == ErrNospace && (errin & ErrNospace)) {
1444  LL3("caught nospace");
1445  err = ErrNospace;
1446  ret = 0;
1447  }
1448  }
1449  CHK(ret == 0);
1450  return 0;
1451 }
1452 
1453 int
1454 Con::readTuple(Par par)
1455 {
1456  assert(m_tx != 0 && m_op != 0);
1457  NdbOperation::LockMode lm = par.m_lockmode;
1458  CHKCON(m_op->readTuple(lm) == 0, *this);
1459  return 0;
1460 }
1461 
1462 int
1463 Con::readTuples(Par par)
1464 {
1465  assert(m_tx != 0 && m_scanop != 0);
1466  int scan_flags = 0;
1467  if (par.m_tupscan)
1468  scan_flags |= NdbScanOperation::SF_TupScan;
1469  CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
1470  return 0;
1471 }
1472 
1473 int
1474 Con::readIndexTuples(Par par)
1475 {
1476  assert(m_tx != 0 && m_indexscanop != 0);
1477  int scan_flags = 0;
1478  if (par.m_ordered)
1479  scan_flags |= NdbScanOperation::SF_OrderBy;
1480  if (par.m_descending)
1481  scan_flags |= NdbScanOperation::SF_Descending;
1482  if (par.m_multiRange)
1483  {
1484  scan_flags |= NdbScanOperation::SF_MultiRange;
1485  scan_flags |= NdbScanOperation::SF_ReadRangeNo;
1486  }
1487  CHKCON(m_indexscanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
1488  return 0;
1489 }
1490 
1491 int
1492 Con::executeScan()
1493 {
1494  CHKCON(m_tx->execute(NoCommit) == 0, *this);
1495  return 0;
1496 }
1497 
1498 int
1499 Con::nextScanResult(bool fetchAllowed)
1500 {
1501  int ret;
1502  assert(m_scanop != 0);
1503  CHKCON((ret = m_scanop->nextResult(fetchAllowed)) != -1, *this);
1504  assert(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
1505  return ret;
1506 }
1507 
1508 int
1509 Con::nextScanResult(bool fetchAllowed, uint& err)
1510 {
1511  int ret = nextScanResult(fetchAllowed);
1512  // err in: errors to catch, out: error caught
1513  const uint errin = err;
1514  err = 0;
1515  if (ret == -1) {
1516  if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
1517  LL3("caught deadlock");
1518  err = ErrDeadlock;
1519  ret = 0;
1520  }
1521  }
1522  CHK(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
1523  return ret;
1524 }
1525 
1526 int
1527 Con::updateScanTuple(Con& con2)
1528 {
1529  assert(con2.m_tx != 0);
1530  CHKCON((con2.m_op = m_scanop->updateCurrentTuple(con2.m_tx)) != 0, *this);
1531  con2.m_txid = m_txid; // in the kernel
1532  return 0;
1533 }
1534 
1535 int
1536 Con::deleteScanTuple(Con& con2)
1537 {
1538  assert(con2.m_tx != 0);
1539  CHKCON(m_scanop->deleteCurrentTuple(con2.m_tx) == 0, *this);
1540  con2.m_txid = m_txid; // in the kernel
1541  return 0;
1542 }
1543 
1544 void
1545 Con::closeScan()
1546 {
1547  assert(m_scanop != 0);
1548  m_scanop->close();
1549  m_scanop = 0, m_indexscanop = 0;
1550 
1551 }
1552 
1553 void
1554 Con::closeTransaction()
1555 {
1556  assert(m_ndb != 0 && m_tx != 0);
1557  m_ndb->closeTransaction(m_tx);
1558  m_tx = 0, m_txid = 0, m_op = 0;
1559  m_scanop = 0, m_indexscanop = 0;
1560 }
1561 
1562 const char*
1563 Con::errname(uint err)
1564 {
1565  sprintf(m_errname, "0x%x", err);
1566  if (err & ErrDeadlock)
1567  strcat(m_errname, ",deadlock");
1568  if (err & ErrNospace)
1569  strcat(m_errname, ",nospace");
1570  return m_errname;
1571 }
1572 
1573 void
1574 Con::printerror(NdbOut& out)
1575 {
1576  m_errtype = ErrOther;
1577  uint any = 0;
1578  int code;
1579  int die = 0;
1580  if (m_ndb) {
1581  if ((code = m_ndb->getNdbError().code) != 0) {
1582  LL0(++any << " ndb: error " << m_ndb->getNdbError());
1583  die += (code == g_opt.m_die);
1584  }
1585  if (m_dic && (code = m_dic->getNdbError().code) != 0) {
1586  LL0(++any << " dic: error " << m_dic->getNdbError());
1587  die += (code == g_opt.m_die);
1588  }
1589  if (m_tx) {
1590  if ((code = m_tx->getNdbError().code) != 0) {
1591  LL0(++any << " con: error " << m_tx->getNdbError());
1592  die += (code == g_opt.m_die);
1593  // 631 is new, occurs only on 4 db nodes, needs to be checked out
1594  if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
1595  m_errtype = ErrDeadlock;
1596  if (code == 826 || code == 827 || code == 902)
1597  m_errtype = ErrNospace;
1598  }
1599  if (m_op && m_op->getNdbError().code != 0) {
1600  LL0(++any << " op : error " << m_op->getNdbError());
1601  die += (code == g_opt.m_die);
1602  }
1603  }
1604  }
1605  if (!any) {
1606  LL0("failed but no NDB error code");
1607  }
1608  if (die) {
1609  if (g_opt.m_core)
1610  abort();
1611  exit(1);
1612  }
1613 }
1614 
1615 // dictionary operations
1616 
1617 static int
1618 invalidateindex(Par par, const ITab& itab)
1619 {
1620  Con& con = par.con();
1621  const Tab& tab = par.tab();
1622  con.m_ndb->getDictionary()->invalidateIndex(itab.m_name, tab.m_name);
1623  return 0;
1624 }
1625 
1626 static int
1627 invalidateindex(Par par)
1628 {
1629  const Tab& tab = par.tab();
1630  for (uint i = 0; i < tab.m_itabs; i++) {
1631  if (tab.m_itab[i] == 0)
1632  continue;
1633  const ITab& itab = *tab.m_itab[i];
1634  invalidateindex(par, itab);
1635  }
1636  return 0;
1637 }
1638 
1639 static int
1640 invalidatetable(Par par)
1641 {
1642  Con& con = par.con();
1643  const Tab& tab = par.tab();
1644  invalidateindex(par);
1645  con.m_ndb->getDictionary()->invalidateTable(tab.m_name);
1646  return 0;
1647 }
1648 
1649 static int
1650 droptable(Par par)
1651 {
1652  Con& con = par.con();
1653  const Tab& tab = par.tab();
1654  con.m_dic = con.m_ndb->getDictionary();
1655  if (con.m_dic->getTable(tab.m_name) == 0) {
1656  // how to check for error
1657  LL4("no table " << tab.m_name);
1658  } else {
1659  LL3("drop table " << tab.m_name);
1660  CHKCON(con.m_dic->dropTable(tab.m_name) == 0, con);
1661  }
1662  con.m_dic = 0;
1663  return 0;
1664 }
1665 
1666 static int
1667 createtable(Par par)
1668 {
1669  Con& con = par.con();
1670  const Tab& tab = par.tab();
1671  LL3("create table " << tab.m_name);
1672  LL4(tab);
1673  NdbDictionary::Table t(tab.m_name);
1674  if (par.m_fragtype != NdbDictionary::Object::FragUndefined) {
1675  t.setFragmentType(par.m_fragtype);
1676  }
1677  if (par.m_nologging) {
1678  t.setLogging(false);
1679  }
1680  for (uint k = 0; k < tab.m_cols; k++) {
1681  const Col& col = *tab.m_col[k];
1682  NdbDictionary::Column c(col.m_name);
1683  c.setType((NdbDictionary::Column::Type)col.m_type);
1684  c.setLength(col.m_bytelength); // for char NDB API uses length in bytes
1685  c.setPrimaryKey(col.m_pk);
1686  c.setNullable(col.m_nullable);
1687  if (col.m_chs != 0)
1688  c.setCharset(col.m_chs->m_cs);
1689  t.addColumn(c);
1690  }
1691  con.m_dic = con.m_ndb->getDictionary();
1692  CHKCON(con.m_dic->createTable(t) == 0, con);
1693  con.m_dic = 0;
1694  return 0;
1695 }
1696 
1697 static int
1698 dropindex(Par par, const ITab& itab)
1699 {
1700  Con& con = par.con();
1701  const Tab& tab = par.tab();
1702  con.m_dic = con.m_ndb->getDictionary();
1703  if (con.m_dic->getIndex(itab.m_name, tab.m_name) == 0) {
1704  // how to check for error
1705  LL4("no index " << itab.m_name);
1706  } else {
1707  LL3("drop index " << itab.m_name);
1708  CHKCON(con.m_dic->dropIndex(itab.m_name, tab.m_name) == 0, con);
1709  }
1710  con.m_dic = 0;
1711  return 0;
1712 }
1713 
1714 static int
1715 dropindex(Par par)
1716 {
1717  const Tab& tab = par.tab();
1718  for (uint i = 0; i < tab.m_itabs; i++) {
1719  if (tab.m_itab[i] == 0)
1720  continue;
1721  const ITab& itab = *tab.m_itab[i];
1722  CHK(dropindex(par, itab) == 0);
1723  }
1724  return 0;
1725 }
1726 
1727 static int
1728 createindex(Par par, const ITab& itab)
1729 {
1730  Con& con = par.con();
1731  const Tab& tab = par.tab();
1732  LL3("create index " << itab.m_name);
1733  LL4(itab);
1734  NdbDictionary::Index x(itab.m_name);
1735  x.setTable(tab.m_name);
1736  x.setType((NdbDictionary::Index::Type)itab.m_type);
1737  if (par.m_nologging || itab.m_type == ITab::OrderedIndex) {
1738  x.setLogging(false);
1739  }
1740  for (uint k = 0; k < itab.m_icols; k++) {
1741  const ICol& icol = *itab.m_icol[k];
1742  const Col& col = icol.m_col;
1743  x.addColumnName(col.m_name);
1744  }
1745  con.m_dic = con.m_ndb->getDictionary();
1746  CHKCON(con.m_dic->createIndex(x) == 0, con);
1747  con.m_dic = 0;
1748  return 0;
1749 }
1750 
1751 static int
1752 createindex(Par par)
1753 {
1754  const Tab& tab = par.tab();
1755  for (uint i = 0; i < tab.m_itabs; i++) {
1756  if (tab.m_itab[i] == 0)
1757  continue;
1758  const ITab& itab = *tab.m_itab[i];
1759  CHK(createindex(par, itab) == 0);
1760  }
1761  return 0;
1762 }
1763 
1764 // data sets
1765 
1766 // Val - typed column value
1767 
1768 struct Val {
1769  const Col& m_col;
1770  union {
1771  Uint32 m_uint32;
1772  uchar* m_char;
1773  uchar* m_varchar;
1774  uchar* m_longvarchar;
1775  };
1776  bool m_null;
1777  // construct
1778  Val(const Col& col);
1779  ~Val();
1780  void copy(const Val& val2);
1781  void copy(const void* addr);
1782  const void* dataaddr() const;
1783  void calc(Par par, uint i);
1784  void calckey(Par par, uint i);
1785  void calckeychars(Par par, uint i, uint& n, uchar* buf);
1786  void calcnokey(Par par);
1787  void calcnokeychars(Par par, uint& n, uchar* buf);
1788  // operations
1789  int setval(Par par) const;
1790  int setval(Par par, const ICol& icol) const;
1791  // compare
1792  int cmp(Par par, const Val& val2) const;
1793  int cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const;
1794  int verify(Par par, const Val& val2) const;
1795 private:
1796  Val& operator=(const Val& val2);
1797 };
1798 
1799 static NdbOut&
1800 operator<<(NdbOut& out, const Val& val);
1801 
1802 // construct
1803 
1804 Val::Val(const Col& col) :
1805  m_col(col)
1806 {
1807  switch (col.m_type) {
1808  case Col::Unsigned:
1809  m_uint32 = 0x7e7e7e7e;
1810  break;
1811  case Col::Char:
1812  m_char = new uchar [col.m_bytelength];
1813  memset(m_char, 0x7e, col.m_bytelength);
1814  break;
1815  case Col::Varchar:
1816  m_varchar = new uchar [1 + col.m_bytelength];
1817  memset(m_char, 0x7e, 1 + col.m_bytelength);
1818  break;
1819  case Col::Longvarchar:
1820  m_longvarchar = new uchar [2 + col.m_bytelength];
1821  memset(m_char, 0x7e, 2 + col.m_bytelength);
1822  break;
1823  default:
1824  assert(false);
1825  break;
1826  }
1827 }
1828 
1829 Val::~Val()
1830 {
1831  const Col& col = m_col;
1832  switch (col.m_type) {
1833  case Col::Unsigned:
1834  break;
1835  case Col::Char:
1836  delete [] m_char;
1837  break;
1838  case Col::Varchar:
1839  delete [] m_varchar;
1840  break;
1841  case Col::Longvarchar:
1842  delete [] m_longvarchar;
1843  break;
1844  default:
1845  assert(false);
1846  break;
1847  }
1848 }
1849 
1850 void
1851 Val::copy(const Val& val2)
1852 {
1853  const Col& col = m_col;
1854  const Col& col2 = val2.m_col;
1855  assert(col.m_type == col2.m_type && col.m_length == col2.m_length);
1856  if (val2.m_null) {
1857  m_null = true;
1858  return;
1859  }
1860  copy(val2.dataaddr());
1861 }
1862 
1863 void
1864 Val::copy(const void* addr)
1865 {
1866  const Col& col = m_col;
1867  switch (col.m_type) {
1868  case Col::Unsigned:
1869  m_uint32 = *(const Uint32*)addr;
1870  break;
1871  case Col::Char:
1872  memcpy(m_char, addr, col.m_bytelength);
1873  break;
1874  case Col::Varchar:
1875  memcpy(m_varchar, addr, 1 + col.m_bytelength);
1876  break;
1877  case Col::Longvarchar:
1878  memcpy(m_longvarchar, addr, 2 + col.m_bytelength);
1879  break;
1880  default:
1881  assert(false);
1882  break;
1883  }
1884  m_null = false;
1885 }
1886 
1887 const void*
1888 Val::dataaddr() const
1889 {
1890  const Col& col = m_col;
1891  switch (col.m_type) {
1892  case Col::Unsigned:
1893  return &m_uint32;
1894  case Col::Char:
1895  return m_char;
1896  case Col::Varchar:
1897  return m_varchar;
1898  case Col::Longvarchar:
1899  return m_longvarchar;
1900  default:
1901  break;
1902  }
1903  assert(false);
1904  return 0;
1905 }
1906 
1907 void
1908 Val::calc(Par par, uint i)
1909 {
1910  const Col& col = m_col;
1911  col.m_pk ? calckey(par, i) : calcnokey(par);
1912  if (!m_null)
1913  col.wellformed(dataaddr());
1914 }
1915 
1916 void
1917 Val::calckey(Par par, uint i)
1918 {
1919  const Col& col = m_col;
1920  m_null = false;
1921  switch (col.m_type) {
1922  case Col::Unsigned:
1923  m_uint32 = i;
1924  break;
1925  case Col::Char:
1926  {
1927  const Chs* chs = col.m_chs;
1928  CHARSET_INFO* cs = chs->m_cs;
1929  uint n = 0;
1930  calckeychars(par, i, n, m_char);
1931  // extend by appropriate space
1932  (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
1933  }
1934  break;
1935  case Col::Varchar:
1936  {
1937  uint n = 0;
1938  calckeychars(par, i, n, m_varchar + 1);
1939  // set length and pad with nulls
1940  m_varchar[0] = n;
1941  memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
1942  }
1943  break;
1944  case Col::Longvarchar:
1945  {
1946  uint n = 0;
1947  calckeychars(par, i, n, m_longvarchar + 2);
1948  // set length and pad with nulls
1949  m_longvarchar[0] = (n & 0xff);
1950  m_longvarchar[1] = (n >> 8);
1951  memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
1952  }
1953  break;
1954  default:
1955  assert(false);
1956  break;
1957  }
1958 }
1959 
1960 void
1961 Val::calckeychars(Par par, uint i, uint& n, uchar* buf)
1962 {
1963  const Col& col = m_col;
1964  const Chs* chs = col.m_chs;
1965  n = 0;
1966  uint len = 0;
1967  while (len < col.m_length) {
1968  if (i % (1 + n) == 0) {
1969  break;
1970  }
1971  const Chr& chr = chs->m_chr[i % maxcharcount];
1972  assert(n + chr.m_size <= col.m_bytelength);
1973  memcpy(buf + n, chr.m_bytes, chr.m_size);
1974  n += chr.m_size;
1975  len++;
1976  }
1977 }
1978 
1979 void
1980 Val::calcnokey(Par par)
1981 {
1982  const Col& col = m_col;
1983  m_null = false;
1984  if (col.m_nullable && urandom(100) < par.m_pctnull) {
1985  m_null = true;
1986  return;
1987  }
1988  int r = irandom((par.m_pctrange * par.m_range) / 100);
1989  if (par.m_bdir != 0 && urandom(10) != 0) {
1990  if ((r < 0 && par.m_bdir > 0) || (r > 0 && par.m_bdir < 0))
1991  r = -r;
1992  }
1993  uint v = par.m_range + r;
1994  switch (col.m_type) {
1995  case Col::Unsigned:
1996  m_uint32 = v;
1997  break;
1998  case Col::Char:
1999  {
2000  const Chs* chs = col.m_chs;
2001  CHARSET_INFO* cs = chs->m_cs;
2002  uint n = 0;
2003  calcnokeychars(par, n, m_char);
2004  // extend by appropriate space
2005  (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
2006  }
2007  break;
2008  case Col::Varchar:
2009  {
2010  uint n = 0;
2011  calcnokeychars(par, n, m_varchar + 1);
2012  // set length and pad with nulls
2013  m_varchar[0] = n;
2014  memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
2015  }
2016  break;
2017  case Col::Longvarchar:
2018  {
2019  uint n = 0;
2020  calcnokeychars(par, n, m_longvarchar + 2);
2021  // set length and pad with nulls
2022  m_longvarchar[0] = (n & 0xff);
2023  m_longvarchar[1] = (n >> 8);
2024  memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
2025  }
2026  break;
2027  default:
2028  assert(false);
2029  break;
2030  }
2031 }
2032 
2033 void
2034 Val::calcnokeychars(Par par, uint& n, uchar* buf)
2035 {
2036  const Col& col = m_col;
2037  const Chs* chs = col.m_chs;
2038  n = 0;
2039  uint len = 0;
2040  while (len < col.m_length) {
2041  if (urandom(1 + col.m_bytelength) == 0) {
2042  break;
2043  }
2044  uint half = maxcharcount / 2;
2045  int r = irandom((par.m_pctrange * half) / 100);
2046  if (par.m_bdir != 0 && urandom(10) != 0) {
2047  if ((r < 0 && par.m_bdir > 0) || (r > 0 && par.m_bdir < 0))
2048  r = -r;
2049  }
2050  uint i = half + r;
2051  assert(i < maxcharcount);
2052  const Chr& chr = chs->m_chr[i];
2053  assert(n + chr.m_size <= col.m_bytelength);
2054  memcpy(buf + n, chr.m_bytes, chr.m_size);
2055  n += chr.m_size;
2056  len++;
2057  }
2058 }
2059 
2060 // operations
2061 
2062 int
2063 Val::setval(Par par) const
2064 {
2065  Con& con = par.con();
2066  const Col& col = m_col;
2067  if (col.m_pk) {
2068  assert(!m_null);
2069  const char* addr = (const char*)dataaddr();
2070  LL5("setval pk [" << col << "] " << *this);
2071  CHK(con.equal(col.m_num, addr) == 0);
2072  } else {
2073  const char* addr = !m_null ? (const char*)dataaddr() : 0;
2074  LL5("setval non-pk [" << col << "] " << *this);
2075  CHK(con.setValue(col.m_num, addr) == 0);
2076  }
2077  return 0;
2078 }
2079 
2080 int
2081 Val::setval(Par par, const ICol& icol) const
2082 {
2083  Con& con = par.con();
2084  assert(!m_null);
2085  const char* addr = (const char*)dataaddr();
2086  LL5("setval key [" << icol << "] " << *this);
2087  CHK(con.equal(icol.m_num, addr) == 0);
2088  return 0;
2089 }
2090 
2091 // compare
2092 
2093 int
2094 Val::cmp(Par par, const Val& val2) const
2095 {
2096  const Col& col = m_col;
2097  const Col& col2 = val2.m_col;
2098  assert(col.equal(col2));
2099  if (m_null || val2.m_null) {
2100  if (!m_null)
2101  return +1;
2102  if (!val2.m_null)
2103  return -1;
2104  return 0;
2105  }
2106  // verify data formats
2107  col.wellformed(dataaddr());
2108  col.wellformed(val2.dataaddr());
2109  // compare
2110  switch (col.m_type) {
2111  case Col::Unsigned:
2112  {
2113  if (m_uint32 < val2.m_uint32)
2114  return -1;
2115  if (m_uint32 > val2.m_uint32)
2116  return +1;
2117  return 0;
2118  }
2119  break;
2120  case Col::Char:
2121  {
2122  uint len = col.m_bytelength;
2123  return cmpchars(par, m_char, len, val2.m_char, len);
2124  }
2125  break;
2126  case Col::Varchar:
2127  {
2128  uint len1 = m_varchar[0];
2129  uint len2 = val2.m_varchar[0];
2130  return cmpchars(par, m_varchar + 1, len1, val2.m_varchar + 1, len2);
2131  }
2132  break;
2133  case Col::Longvarchar:
2134  {
2135  uint len1 = m_longvarchar[0] + (m_longvarchar[1] << 8);
2136  uint len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8);
2137  return cmpchars(par, m_longvarchar + 2, len1, val2.m_longvarchar + 2, len2);
2138  }
2139  break;
2140  default:
2141  break;
2142  }
2143  assert(false);
2144  return 0;
2145 }
2146 
2147 int
2148 Val::cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const
2149 {
2150  const Col& col = m_col;
2151  const Chs* chs = col.m_chs;
2152  CHARSET_INFO* cs = chs->m_cs;
2153  int k;
2154  if (!par.m_collsp) {
2155  uchar x1[maxxmulsize * NDB_MAX_TUPLE_SIZE];
2156  uchar x2[maxxmulsize * NDB_MAX_TUPLE_SIZE];
2157  // make strxfrm pad both to same length
2158  uint len = maxxmulsize * col.m_bytelength;
2159  int n1 = NdbSqlUtil::strnxfrm_bug7284(cs, x1, chs->m_xmul * len, buf1, len1);
2160  int n2 = NdbSqlUtil::strnxfrm_bug7284(cs, x2, chs->m_xmul * len, buf2, len2);
2161  assert(n1 != -1 && n1 == n2);
2162  k = memcmp(x1, x2, n1);
2163  } else {
2164  k = (*cs->coll->strnncollsp)(cs, buf1, len1, buf2, len2, false);
2165  }
2166  return k < 0 ? -1 : k > 0 ? +1 : 0;
2167 }
2168 
2169 int
2170 Val::verify(Par par, const Val& val2) const
2171 {
2172  CHK(cmp(par, val2) == 0);
2173  return 0;
2174 }
2175 
2176 // print
2177 
2178 static void
2179 printstring(NdbOut& out, const uchar* str, uint len, bool showlen)
2180 {
2181  char buf[4 * NDB_MAX_TUPLE_SIZE];
2182  char *p = buf;
2183  *p++ = '[';
2184  if (showlen) {
2185  sprintf(p, "%u:", len);
2186  p += strlen(p);
2187  }
2188  for (uint i = 0; i < len; i++) {
2189  uchar c = str[i];
2190  if (c == '\\') {
2191  *p++ = '\\';
2192  *p++ = c;
2193  } else if (0x20 <= c && c <= 0x7e) {
2194  *p++ = c;
2195  } else {
2196  *p++ = '\\';
2197  *p++ = hexstr[c >> 4];
2198  *p++ = hexstr[c & 15];
2199  }
2200  }
2201  *p++ = ']';
2202  *p = 0;
2203  out << buf;
2204 }
2205 
2206 static NdbOut&
2207 operator<<(NdbOut& out, const Val& val)
2208 {
2209  const Col& col = val.m_col;
2210  if (val.m_null) {
2211  out << "NULL";
2212  return out;
2213  }
2214  switch (col.m_type) {
2215  case Col::Unsigned:
2216  out << val.m_uint32;
2217  break;
2218  case Col::Char:
2219  {
2220  uint len = col.m_bytelength;
2221  printstring(out, val.m_char, len, false);
2222  }
2223  break;
2224  case Col::Varchar:
2225  {
2226  uint len = val.m_varchar[0];
2227  printstring(out, val.m_varchar + 1, len, true);
2228  }
2229  break;
2230  case Col::Longvarchar:
2231  {
2232  uint len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8);
2233  printstring(out, val.m_longvarchar + 2, len, true);
2234  }
2235  break;
2236  default:
2237  out << "type" << col.m_type;
2238  assert(false);
2239  break;
2240  }
2241  return out;
2242 }
2243 
2244 // Row - table tuple
2245 
2246 struct Row {
2247  const Tab& m_tab;
2248  Val** m_val;
2249  enum St {
2250  StUndef = 0,
2251  StDefine = 1,
2252  StPrepare = 2,
2253  StCommit = 3
2254  };
2255  enum Op {
2256  OpNone = 0,
2257  OpIns = 2,
2258  OpUpd = 4,
2259  OpDel = 8,
2260  OpRead = 16,
2261  OpReadEx = 32,
2262  OpReadCom = 64,
2263  OpDML = 2 | 4 | 8,
2264  OpREAD = 16 | 32 | 64
2265  };
2266  St m_st;
2267  Op m_op;
2268  Uint64 m_txid;
2269  Row* m_bi;
2270  // construct
2271  Row(const Tab& tab);
2272  ~Row();
2273  void copy(const Row& row2, bool copy_bi);
2274  void copyval(const Row& row2, uint colmask = ~0);
2275  void calc(Par par, uint i, uint colmask = ~0);
2276  // operations
2277  int setval(Par par, uint colmask = ~0);
2278  int setval(Par par, const ITab& itab);
2279  int insrow(Par par);
2280  int updrow(Par par);
2281  int updrow(Par par, const ITab& itab);
2282  int delrow(Par par);
2283  int delrow(Par par, const ITab& itab);
2284  int selrow(Par par);
2285  int selrow(Par par, const ITab& itab);
2286  int setrow(Par par);
2287  // compare
2288  int cmp(Par par, const Row& row2) const;
2289  int cmp(Par par, const Row& row2, const ITab& itab) const;
2290  int verify(Par par, const Row& row2, bool pkonly) const;
2291 private:
2292  Row& operator=(const Row& row2);
2293 };
2294 
2295 static NdbOut&
2296 operator<<(NdbOut& out, const Row* rowp);
2297 
2298 static NdbOut&
2299 operator<<(NdbOut& out, const Row& row);
2300 
2301 // construct
2302 
2303 Row::Row(const Tab& tab) :
2304  m_tab(tab)
2305 {
2306  m_val = new Val* [tab.m_cols];
2307  for (uint k = 0; k < tab.m_cols; k++) {
2308  const Col& col = *tab.m_col[k];
2309  m_val[k] = new Val(col);
2310  }
2311  m_st = StUndef;
2312  m_op = OpNone;
2313  m_txid = 0;
2314  m_bi = 0;
2315 }
2316 
2317 Row::~Row()
2318 {
2319  const Tab& tab = m_tab;
2320  for (uint k = 0; k < tab.m_cols; k++) {
2321  delete m_val[k];
2322  }
2323  delete [] m_val;
2324  delete m_bi;
2325 }
2326 
2327 void
2328 Row::copy(const Row& row2, bool copy_bi)
2329 {
2330  const Tab& tab = m_tab;
2331  copyval(row2);
2332  m_st = row2.m_st;
2333  m_op = row2.m_op;
2334  m_txid = row2.m_txid;
2335  assert(m_bi == 0);
2336  if (copy_bi && row2.m_bi != 0) {
2337  m_bi = new Row(tab);
2338  m_bi->copy(*row2.m_bi, copy_bi);
2339  }
2340 }
2341 
2342 void
2343 Row::copyval(const Row& row2, uint colmask)
2344 {
2345  const Tab& tab = m_tab;
2346  assert(&tab == &row2.m_tab);
2347  for (uint k = 0; k < tab.m_cols; k++) {
2348  Val& val = *m_val[k];
2349  const Val& val2 = *row2.m_val[k];
2350  if ((1 << k) & colmask)
2351  val.copy(val2);
2352  }
2353 }
2354 
2355 void
2356 Row::calc(Par par, uint i, uint colmask)
2357 {
2358  const Tab& tab = m_tab;
2359  for (uint k = 0; k < tab.m_cols; k++) {
2360  if ((1 << k) & colmask) {
2361  Val& val = *m_val[k];
2362  val.calc(par, i);
2363  }
2364  }
2365 }
2366 
2367 // operations
2368 
2369 int
2370 Row::setval(Par par, uint colmask)
2371 {
2372  const Tab& tab = m_tab;
2373  Rsq rsq(tab.m_cols);
2374  for (uint k = 0; k < tab.m_cols; k++) {
2375  uint k2 = rsq.next();
2376  if ((1 << k2) & colmask) {
2377  const Val& val = *m_val[k2];
2378  CHK(val.setval(par) == 0);
2379  }
2380  }
2381  return 0;
2382 }
2383 
2384 int
2385 Row::setval(Par par, const ITab& itab)
2386 {
2387  Rsq rsq(itab.m_icols);
2388  for (uint k = 0; k < itab.m_icols; k++) {
2389  uint k2 = rsq.next();
2390  const ICol& icol = *itab.m_icol[k2];
2391  const Col& col = icol.m_col;
2392  uint m = col.m_num;
2393  const Val& val = *m_val[m];
2394  CHK(val.setval(par, icol) == 0);
2395  }
2396  return 0;
2397 }
2398 
2399 int
2400 Row::insrow(Par par)
2401 {
2402  Con& con = par.con();
2403  const Tab& tab = m_tab;
2404  CHK(con.getNdbOperation(tab) == 0);
2405  CHKCON(con.m_op->insertTuple() == 0, con);
2406  CHK(setval(par, tab.m_pkmask) == 0);
2407  CHK(setval(par, ~tab.m_pkmask) == 0);
2408  assert(m_st == StUndef);
2409  m_st = StDefine;
2410  m_op = OpIns;
2411  m_txid = con.m_txid;
2412  return 0;
2413 }
2414 
2415 int
2416 Row::updrow(Par par)
2417 {
2418  Con& con = par.con();
2419  const Tab& tab = m_tab;
2420  CHK(con.getNdbOperation(tab) == 0);
2421  CHKCON(con.m_op->updateTuple() == 0, con);
2422  CHK(setval(par, tab.m_pkmask) == 0);
2423  CHK(setval(par, ~tab.m_pkmask) == 0);
2424  assert(m_st == StUndef);
2425  m_st = StDefine;
2426  m_op = OpUpd;
2427  m_txid = con.m_txid;
2428  return 0;
2429 }
2430 
2431 int
2432 Row::updrow(Par par, const ITab& itab)
2433 {
2434  Con& con = par.con();
2435  const Tab& tab = m_tab;
2436  assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2437  CHK(con.getNdbIndexOperation(itab, tab) == 0);
2438  CHKCON(con.m_op->updateTuple() == 0, con);
2439  CHK(setval(par, itab) == 0);
2440  CHK(setval(par, ~tab.m_pkmask) == 0);
2441  assert(m_st == StUndef);
2442  m_st = StDefine;
2443  m_op = OpUpd;
2444  m_txid = con.m_txid;
2445  return 0;
2446 }
2447 
2448 int
2449 Row::delrow(Par par)
2450 {
2451  Con& con = par.con();
2452  const Tab& tab = m_tab;
2453  CHK(con.getNdbOperation(m_tab) == 0);
2454  CHKCON(con.m_op->deleteTuple() == 0, con);
2455  CHK(setval(par, tab.m_pkmask) == 0);
2456  assert(m_st == StUndef);
2457  m_st = StDefine;
2458  m_op = OpDel;
2459  m_txid = con.m_txid;
2460  return 0;
2461 }
2462 
2463 int
2464 Row::delrow(Par par, const ITab& itab)
2465 {
2466  Con& con = par.con();
2467  const Tab& tab = m_tab;
2468  assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2469  CHK(con.getNdbIndexOperation(itab, tab) == 0);
2470  CHKCON(con.m_op->deleteTuple() == 0, con);
2471  CHK(setval(par, itab) == 0);
2472  assert(m_st == StUndef);
2473  m_st = StDefine;
2474  m_op = OpDel;
2475  m_txid = con.m_txid;
2476  return 0;
2477 }
2478 
2479 int
2480 Row::selrow(Par par)
2481 {
2482  Con& con = par.con();
2483  const Tab& tab = m_tab;
2484  CHK(con.getNdbOperation(m_tab) == 0);
2485  CHKCON(con.readTuple(par) == 0, con);
2486  CHK(setval(par, tab.m_pkmask) == 0);
2487  // TODO state
2488  return 0;
2489 }
2490 
2491 int
2492 Row::selrow(Par par, const ITab& itab)
2493 {
2494  Con& con = par.con();
2495  const Tab& tab = m_tab;
2496  assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
2497  CHK(con.getNdbIndexOperation(itab, tab) == 0);
2498  CHKCON(con.readTuple(par) == 0, con);
2499  CHK(setval(par, itab) == 0);
2500  // TODO state
2501  return 0;
2502 }
2503 
2504 int
2505 Row::setrow(Par par)
2506 {
2507  Con& con = par.con();
2508  const Tab& tab = m_tab;
2509  CHK(setval(par, ~tab.m_pkmask) == 0);
2510  assert(m_st == StUndef);
2511  m_st = StDefine;
2512  m_op = OpUpd;
2513  m_txid = con.m_txid;
2514  return 0;
2515 }
2516 
2517 // compare
2518 
2519 int
2520 Row::cmp(Par par, const Row& row2) const
2521 {
2522  const Tab& tab = m_tab;
2523  assert(&tab == &row2.m_tab);
2524  int c = 0;
2525  for (uint k = 0; k < tab.m_cols; k++) {
2526  const Val& val = *m_val[k];
2527  const Val& val2 = *row2.m_val[k];
2528  if ((c = val.cmp(par, val2)) != 0)
2529  break;
2530  }
2531  return c;
2532 }
2533 
2534 int
2535 Row::cmp(Par par, const Row& row2, const ITab& itab) const
2536 {
2537  const Tab& tab = m_tab;
2538  int c = 0;
2539  for (uint i = 0; i < itab.m_icols; i++) {
2540  const ICol& icol = *itab.m_icol[i];
2541  const Col& col = icol.m_col;
2542  uint k = col.m_num;
2543  assert(k < tab.m_cols);
2544  const Val& val = *m_val[k];
2545  const Val& val2 = *row2.m_val[k];
2546  if ((c = val.cmp(par, val2)) != 0)
2547  break;
2548  }
2549  return c;
2550 }
2551 
2552 int
2553 Row::verify(Par par, const Row& row2, bool pkonly) const
2554 {
2555  const Tab& tab = m_tab;
2556  const Row& row1 = *this;
2557  assert(&row1.m_tab == &row2.m_tab);
2558  for (uint k = 0; k < tab.m_cols; k++) {
2559  const Col& col = row1.m_val[k]->m_col;
2560  if (!pkonly || col.m_pk) {
2561  const Val& val1 = *row1.m_val[k];
2562  const Val& val2 = *row2.m_val[k];
2563  CHK(val1.verify(par, val2) == 0);
2564  }
2565  }
2566  return 0;
2567 }
2568 
2569 // print
2570 
2571 static NdbOut&
2572 operator<<(NdbOut& out, const Row::St st)
2573 {
2574  if (st == Row::StUndef)
2575  out << "StUndef";
2576  else if (st == Row::StDefine)
2577  out << "StDefine";
2578  else if (st == Row::StPrepare)
2579  out << "StPrepare";
2580  else if (st == Row::StCommit)
2581  out << "StCommit";
2582  else
2583  out << "st=" << st;
2584  return out;
2585 }
2586 
2587 static NdbOut&
2588 operator<<(NdbOut& out, const Row::Op op)
2589 {
2590  if (op == Row::OpNone)
2591  out << "OpNone";
2592  else if (op == Row::OpIns)
2593  out << "OpIns";
2594  else if (op == Row::OpUpd)
2595  out << "OpUpd";
2596  else if (op == Row::OpDel)
2597  out << "OpDel";
2598  else if (op == Row::OpRead)
2599  out << "OpRead";
2600  else if (op == Row::OpReadEx)
2601  out << "OpReadEx";
2602  else if (op == Row::OpReadCom)
2603  out << "OpReadCom";
2604  else
2605  out << "op=" << op;
2606  return out;
2607 }
2608 
2609 static NdbOut&
2610 operator<<(NdbOut& out, const Row* rowp)
2611 {
2612  if (rowp == 0)
2613  out << "[null]";
2614  else
2615  out << *rowp;
2616  return out;
2617 }
2618 
2619 static NdbOut&
2620 operator<<(NdbOut& out, const Row& row)
2621 {
2622  const Tab& tab = row.m_tab;
2623  out << "[";
2624  for (uint i = 0; i < tab.m_cols; i++) {
2625  if (i > 0)
2626  out << " ";
2627  out << *row.m_val[i];
2628  }
2629  out << " " << row.m_st;
2630  out << " " << row.m_op;
2631  out << " " << HEX(row.m_txid);
2632  if (row.m_bi != 0)
2633  out << " " << row.m_bi;
2634  out << "]";
2635  return out;
2636 }
2637 
2638 // Set - set of table tuples
2639 
2640 struct Set {
2641  const Tab& m_tab;
2642  uint m_rows;
2643  Row** m_row;
2644  uint* m_rowkey; // maps row number (from 0) in scan to tuple key
2645  Row* m_keyrow;
2646  NdbRecAttr** m_rec;
2647  // construct
2648  Set(const Tab& tab, uint rows);
2649  ~Set();
2650  void reset();
2651  bool compat(Par par, uint i, const Row::Op op) const;
2652  void push(uint i);
2653  void copyval(uint i, uint colmask = ~0); // from bi
2654  void calc(Par par, uint i, uint colmask = ~0);
2655  uint count() const;
2656  const Row* getrow(uint i, bool dirty = false) const;
2657  int setrow(uint i, const Row* src, bool force=false);
2658  // transaction
2659  void post(Par par, ExecType et);
2660  // operations
2661  int insrow(Par par, uint i);
2662  int updrow(Par par, uint i);
2663  int updrow(Par par, const ITab& itab, uint i);
2664  int delrow(Par par, uint i);
2665  int delrow(Par par, const ITab& itab, uint i);
2666  int selrow(Par par, const Row& keyrow);
2667  int selrow(Par par, const ITab& itab, const Row& keyrow);
2668  int setrow(Par par, uint i);
2669  int getval(Par par);
2670  int getkey(Par par, uint* i);
2671  int putval(uint i, bool force, uint n = ~0);
2672  // compare
2673  void sort(Par par, const ITab& itab);
2674  int verify(Par par, const Set& set2, bool pkonly, bool dirty = false) const;
2675  int verifyorder(Par par, const ITab& itab, bool descending) const;
2676  // protect structure
2677  NdbMutex* m_mutex;
2678  void lock() const {
2679  NdbMutex_Lock(m_mutex);
2680  }
2681  void unlock() const {
2682  NdbMutex_Unlock(m_mutex);
2683  }
2684 private:
2685  void sort(Par par, const ITab& itab, uint lo, uint hi);
2686  Set& operator=(const Set& set2);
2687 };
2688 
2689 // construct
2690 
2691 Set::Set(const Tab& tab, uint rows) :
2692  m_tab(tab)
2693 {
2694  m_rows = rows;
2695  m_row = new Row* [m_rows];
2696  for (uint i = 0; i < m_rows; i++) {
2697  m_row[i] = 0;
2698  }
2699  m_rowkey = new uint [m_rows];
2700  for (uint n = 0; n < m_rows; n++) {
2701  m_rowkey[n] = ~0;
2702  }
2703  m_keyrow = new Row(tab);
2704  m_rec = new NdbRecAttr* [tab.m_cols];
2705  for (uint k = 0; k < tab.m_cols; k++) {
2706  m_rec[k] = 0;
2707  }
2708  m_mutex = NdbMutex_Create();
2709  assert(m_mutex != 0);
2710 }
2711 
2712 Set::~Set()
2713 {
2714  for (uint i = 0; i < m_rows; i++) {
2715  delete m_row[i];
2716  }
2717  delete [] m_row;
2718  delete [] m_rowkey;
2719  delete m_keyrow;
2720  delete [] m_rec;
2721  NdbMutex_Destroy(m_mutex);
2722 }
2723 
2724 void
2725 Set::reset()
2726 {
2727  for (uint i = 0; i < m_rows; i++) {
2728  m_row[i] = 0;
2729  }
2730 }
2731 
2732 // this sucks
2733 bool
2734 Set::compat(Par par, uint i, const Row::Op op) const
2735 {
2736  Con& con = par.con();
2737  int ret = -1;
2738  int place = 0;
2739  do {
2740  const Row* rowp = getrow(i);
2741  if (rowp == 0) {
2742  ret = op == Row::OpIns;
2743  place = 1;
2744  break;
2745  }
2746  const Row& row = *rowp;
2747  if (!(op & Row::OpREAD)) {
2748  if (row.m_st == Row::StDefine || row.m_st == Row::StPrepare) {
2749  assert(row.m_op & Row::OpDML);
2750  assert(row.m_txid != 0);
2751  if (con.m_txid != row.m_txid) {
2752  ret = false;
2753  place = 2;
2754  break;
2755  }
2756  if (row.m_op != Row::OpDel) {
2757  ret = op == Row::OpUpd || op == Row::OpDel;
2758  place = 3;
2759  break;
2760  }
2761  ret = op == Row::OpIns;
2762  place = 4;
2763  break;
2764  }
2765  if (row.m_st == Row::StCommit) {
2766  assert(row.m_op == Row::OpNone);
2767  assert(row.m_txid == 0);
2768  ret = op == Row::OpUpd || op == Row::OpDel;
2769  place = 5;
2770  break;
2771  }
2772  }
2773  if (op & Row::OpREAD) {
2774  bool dirty =
2775  con.m_txid != row.m_txid &&
2776  par.m_lockmode == NdbOperation::LM_CommittedRead;
2777  const Row* rowp2 = getrow(i, dirty);
2778  if (rowp2 == 0 || rowp2->m_op == Row::OpDel) {
2779  ret = false;
2780  place = 6;
2781  break;
2782  }
2783  ret = true;
2784  place = 7;
2785  break;
2786  }
2787  } while (0);
2788  LL4("compat ret=" << ret << " place=" << place);
2789  assert(ret == false || ret == true);
2790  return ret;
2791 }
2792 
2793 void
2794 Set::push(uint i)
2795 {
2796  const Tab& tab = m_tab;
2797  assert(i < m_rows);
2798  Row* bi = m_row[i];
2799  m_row[i] = new Row(tab);
2800  Row& row = *m_row[i];
2801  row.m_bi = bi;
2802  if (bi != 0)
2803  row.copyval(*bi);
2804 }
2805 
2806 void
2807 Set::copyval(uint i, uint colmask)
2808 {
2809  assert(m_row[i] != 0);
2810  Row& row = *m_row[i];
2811  assert(row.m_bi != 0);
2812  row.copyval(*row.m_bi, colmask);
2813 }
2814 
2815 void
2816 Set::calc(Par par, uint i, uint colmask)
2817 {
2818  assert(m_row[i] != 0);
2819  Row& row = *m_row[i];
2820  row.calc(par, i, colmask);
2821 }
2822 
2823 uint
2824 Set::count() const
2825 {
2826  uint count = 0;
2827  for (uint i = 0; i < m_rows; i++) {
2828  if (m_row[i] != 0)
2829  count++;
2830  }
2831  return count;
2832 }
2833 
2834 const Row*
2835 Set::getrow(uint i, bool dirty) const
2836 {
2837  assert(i < m_rows);
2838  const Row* rowp = m_row[i];
2839  if (dirty) {
2840  while (rowp != 0) {
2841  bool b1 = rowp->m_op == Row::OpNone;
2842  bool b2 = rowp->m_st == Row::StCommit;
2843  assert(b1 == b2);
2844  if (b1) {
2845  assert(rowp->m_bi == 0);
2846  break;
2847  }
2848  rowp = rowp->m_bi;
2849  }
2850  }
2851  return rowp;
2852 }
2853 
2854 int
2855 Set::setrow(uint i, const Row* src, bool force)
2856 {
2857  assert(i < m_rows);
2858  if (m_row[i] != 0)
2859  if (!force)
2860  return -1;
2861 
2862  Row* newRow= new Row(src->m_tab);
2863  newRow->copy(*src, true);
2864  return 0;
2865 }
2866 
2867 
2868 // transaction
2869 
2870 void
2871 Set::post(Par par, ExecType et)
2872 {
2873  LL4("post");
2874  Con& con = par.con();
2875  assert(con.m_txid != 0);
2876  uint i;
2877  for (i = 0; i < m_rows; i++) {
2878  Row* rowp = m_row[i];
2879  if (rowp == 0) {
2880  LL5("skip " << i << " " << rowp);
2881  continue;
2882  }
2883  if (rowp->m_st == Row::StCommit) {
2884  assert(rowp->m_op == Row::OpNone);
2885  assert(rowp->m_txid == 0);
2886  assert(rowp->m_bi == 0);
2887  LL5("skip committed " << i << " " << rowp);
2888  continue;
2889  }
2890  assert(rowp->m_st == Row::StDefine || rowp->m_st == Row::StPrepare);
2891  assert(rowp->m_txid != 0);
2892  if (con.m_txid != rowp->m_txid) {
2893  LL5("skip txid " << i << " " << HEX(con.m_txid) << " " << rowp);
2894  continue;
2895  }
2896  // TODO read ops
2897  assert(rowp->m_op & Row::OpDML);
2898  LL4("post BEFORE " << rowp);
2899  if (et == NoCommit) {
2900  if (rowp->m_st == Row::StDefine) {
2901  rowp->m_st = Row::StPrepare;
2902  Row* bi = rowp->m_bi;
2903  while (bi != 0 && bi->m_st == Row::StDefine) {
2904  bi->m_st = Row::StPrepare;
2905  bi = bi->m_bi;
2906  }
2907  }
2908  } else if (et == Commit) {
2909  if (rowp->m_op != Row::OpDel) {
2910  rowp->m_st = Row::StCommit;
2911  rowp->m_op = Row::OpNone;
2912  rowp->m_txid = 0;
2913  delete rowp->m_bi;
2914  rowp->m_bi = 0;
2915  } else {
2916  delete rowp;
2917  rowp = 0;
2918  }
2919  } else if (et == Rollback) {
2920  while (rowp != 0 && rowp->m_st != Row::StCommit) {
2921  Row* tmp = rowp;
2922  rowp = rowp->m_bi;
2923  tmp->m_bi = 0;
2924  delete tmp;
2925  }
2926  } else {
2927  assert(false);
2928  }
2929  m_row[i] = rowp;
2930  LL4("post AFTER " << rowp);
2931  }
2932 }
2933 
2934 // operations
2935 
2936 int
2937 Set::insrow(Par par, uint i)
2938 {
2939  assert(m_row[i] != 0);
2940  Row& row = *m_row[i];
2941  CHK(row.insrow(par) == 0);
2942  return 0;
2943 }
2944 
2945 int
2946 Set::updrow(Par par, uint i)
2947 {
2948  assert(m_row[i] != 0);
2949  Row& row = *m_row[i];
2950  CHK(row.updrow(par) == 0);
2951  return 0;
2952 }
2953 
2954 int
2955 Set::updrow(Par par, const ITab& itab, uint i)
2956 {
2957  assert(m_row[i] != 0);
2958  Row& row = *m_row[i];
2959  CHK(row.updrow(par, itab) == 0);
2960  return 0;
2961 }
2962 
2963 int
2964 Set::delrow(Par par, uint i)
2965 {
2966  assert(m_row[i] != 0);
2967  Row& row = *m_row[i];
2968  CHK(row.delrow(par) == 0);
2969  return 0;
2970 }
2971 
2972 int
2973 Set::delrow(Par par, const ITab& itab, uint i)
2974 {
2975  assert(m_row[i] != 0);
2976  Row& row = *m_row[i];
2977  CHK(row.delrow(par, itab) == 0);
2978  return 0;
2979 }
2980 
2981 int
2982 Set::selrow(Par par, const Row& keyrow)
2983 {
2984  const Tab& tab = par.tab();
2985  LL5("selrow " << tab.m_name << " keyrow " << keyrow);
2986  m_keyrow->copyval(keyrow, tab.m_pkmask);
2987  CHK(m_keyrow->selrow(par) == 0);
2988  CHK(getval(par) == 0);
2989  return 0;
2990 }
2991 
2992 int
2993 Set::selrow(Par par, const ITab& itab, const Row& keyrow)
2994 {
2995  LL5("selrow " << itab.m_name << " keyrow " << keyrow);
2996  m_keyrow->copyval(keyrow, itab.m_keymask);
2997  CHK(m_keyrow->selrow(par, itab) == 0);
2998  CHK(getval(par) == 0);
2999  return 0;
3000 }
3001 
3002 int
3003 Set::setrow(Par par, uint i)
3004 {
3005  assert(m_row[i] != 0);
3006  CHK(m_row[i]->setrow(par) == 0);
3007  return 0;
3008 }
3009 
3010 int
3011 Set::getval(Par par)
3012 {
3013  Con& con = par.con();
3014  const Tab& tab = m_tab;
3015  Rsq rsq1(tab.m_cols);
3016  for (uint k = 0; k < tab.m_cols; k++) {
3017  uint k2 = rsq1.next();
3018  CHK(con.getValue(k2, m_rec[k2]) == 0);
3019  }
3020  return 0;
3021 }
3022 
3023 int
3024 Set::getkey(Par par, uint* i)
3025 {
3026  const Tab& tab = m_tab;
3027  uint k = tab.m_keycol;
3028  assert(m_rec[k] != 0);
3029  const char* aRef = m_rec[k]->aRef();
3030  Uint32 key = *(const Uint32*)aRef;
3031  LL5("getkey: " << key);
3032  CHK(key < m_rows);
3033  *i = key;
3034  return 0;
3035 }
3036 
3037 int
3038 Set::putval(uint i, bool force, uint n)
3039 {
3040  const Tab& tab = m_tab;
3041  LL4("putval key=" << i << " row=" << n << " old=" << m_row[i]);
3042  CHK( i<m_rows );
3043  if (m_row[i] != 0) {
3044  assert(force);
3045  delete m_row[i];
3046  m_row[i] = 0;
3047  }
3048  m_row[i] = new Row(tab);
3049  Row& row = *m_row[i];
3050  for (uint k = 0; k < tab.m_cols; k++) {
3051  Val& val = *row.m_val[k];
3052  NdbRecAttr* rec = m_rec[k];
3053  assert(rec != 0);
3054  if (rec->isNULL()) {
3055  val.m_null = true;
3056  continue;
3057  }
3058  const char* aRef = m_rec[k]->aRef();
3059  val.copy(aRef);
3060  val.m_null = false;
3061  }
3062  if (n != (uint) ~0)
3063  {
3064  CHK(n < m_rows);
3065  m_rowkey[n] = i;
3066  }
3067  return 0;
3068 }
3069 
3070 // compare
3071 
3072 void
3073 Set::sort(Par par, const ITab& itab)
3074 {
3075  if (m_rows != 0)
3076  sort(par, itab, 0, m_rows - 1);
3077 }
3078 
3079 void
3080 Set::sort(Par par, const ITab& itab, uint lo, uint hi)
3081 {
3082  assert(lo < m_rows && hi < m_rows && lo <= hi);
3083  Row* const p = m_row[lo];
3084  uint i = lo;
3085  uint j = hi;
3086  while (i < j) {
3087  while (i < j && m_row[j]->cmp(par, *p, itab) >= 0)
3088  j--;
3089  if (i < j) {
3090  m_row[i] = m_row[j];
3091  i++;
3092  }
3093  while (i < j && m_row[i]->cmp(par, *p, itab) <= 0)
3094  i++;
3095  if (i < j) {
3096  m_row[j] = m_row[i];
3097  j--;
3098  }
3099  }
3100  m_row[i] = p;
3101  if (lo < i)
3102  sort(par, itab, lo, i - 1);
3103  if (hi > i)
3104  sort(par, itab, i + 1, hi);
3105 }
3106 
3107 /*
3108  * set1 (self) is from dml and can contain un-committed operations.
3109  * set2 is from read and contains no operations. "dirty" applies
3110  * to set1: false = use latest row, true = use committed row.
3111  */
3112 int
3113 Set::verify(Par par, const Set& set2, bool pkonly, bool dirty) const
3114 {
3115  const Set& set1 = *this;
3116  assert(&set1.m_tab == &set2.m_tab && set1.m_rows == set2.m_rows);
3117  LL3("verify dirty:" << dirty);
3118  for (uint i = 0; i < set1.m_rows; i++) {
3119  // the row versions we actually compare
3120  const Row* row1p = set1.getrow(i, dirty);
3121  const Row* row2p = set2.getrow(i);
3122  bool ok = true;
3123  int place = 0;
3124  if (row1p == 0) {
3125  if (row2p != 0) {
3126  ok = false;
3127  place = 1;
3128  }
3129  } else {
3130  Row::Op op1 = row1p->m_op;
3131  if (op1 != Row::OpDel) {
3132  if (row2p == 0) {
3133  ok = false;
3134  place = 2;
3135  } else if (row1p->verify(par, *row2p, pkonly) == -1) {
3136  ok = false;
3137  place = 3;
3138  }
3139  } else if (row2p != 0) {
3140  ok = false;
3141  place = 4;
3142  }
3143  }
3144  if (!ok) {
3145  LL1("verify " << i << " failed at " << place);
3146  LL1("row1 " << row1p);
3147  LL1("row2 " << row2p);
3148  CHK(false);
3149  }
3150  }
3151  return 0;
3152 }
3153 
3154 int
3155 Set::verifyorder(Par par, const ITab& itab, bool descending) const
3156 {
3157  for (uint n = 0; n < m_rows; n++) {
3158  uint i2 = m_rowkey[n];
3159  if (i2 == (uint) ~0)
3160  break;
3161  if (n == 0)
3162  continue;
3163  uint i1 = m_rowkey[n - 1];
3164  assert(m_row[i1] != 0 && m_row[i2] != 0);
3165  const Row& row1 = *m_row[i1];
3166  const Row& row2 = *m_row[i2];
3167  bool ok;
3168  if (!descending)
3169  ok= (row1.cmp(par, row2, itab) <= 0);
3170  else
3171  ok= (row1.cmp(par, row2, itab) >= 0);
3172 
3173  if (!ok)
3174  {
3175  LL1("verifyorder " << n << " failed");
3176  LL1("row1 " << row1);
3177  LL1("row2 " << row2);
3178  CHK(false);
3179  }
3180  }
3181  return 0;
3182 }
3183 
3184 // print
3185 
3186 #if 0
3187 static NdbOut&
3188 operator<<(NdbOut& out, const Set& set)
3189 {
3190  for (uint i = 0; i < set.m_rows; i++) {
3191  const Row& row = *set.m_row[i];
3192  if (i > 0)
3193  out << endl;
3194  out << row;
3195  }
3196  return out;
3197 }
3198 #endif
3199 
3200 // BVal - range scan bound
3201 
3202 struct BVal : public Val {
3203  const ICol& m_icol;
3204  int m_type;
3205  BVal(const ICol& icol);
3206  int setbnd(Par par) const;
3207  int setflt(Par par) const;
3208 };
3209 
3210 BVal::BVal(const ICol& icol) :
3211  Val(icol.m_col),
3212  m_icol(icol)
3213 {
3214 }
3215 
3216 int
3217 BVal::setbnd(Par par) const
3218 {
3219  Con& con = par.con();
3220  assert(g_compare_null || !m_null);
3221  const char* addr = !m_null ? (const char*)dataaddr() : 0;
3222  const ICol& icol = m_icol;
3223  CHK(con.setBound(icol.m_num, m_type, addr) == 0);
3224  return 0;
3225 }
3226 
3227 int
3228 BVal::setflt(Par par) const
3229 {
3230  static uint index_bound_to_filter_bound[5] = {
3236  };
3237  Con& con = par.con();
3238  assert(g_compare_null || !m_null);
3239  const char* addr = !m_null ? (const char*)dataaddr() : 0;
3240  const ICol& icol = m_icol;
3241  const Col& col = icol.m_col;
3242  uint length = col.m_bytesize;
3243  uint cond = index_bound_to_filter_bound[m_type];
3244  CHK(con.setFilter(col.m_num, cond, addr, length) == 0);
3245  return 0;
3246 }
3247 
3248 static NdbOut&
3249 operator<<(NdbOut& out, const BVal& bval)
3250 {
3251  const ICol& icol = bval.m_icol;
3252  const Col& col = icol.m_col;
3253  const Val& val = bval;
3254  out << "type=" << bval.m_type;
3255  out << " icol=" << icol.m_num;
3256  out << " col=" << col.m_num << "," << col.m_name;
3257  out << " value=" << val;
3258  return out;
3259 }
3260 
3261 // BSet - set of bounds
3262 
3263 struct BSet {
3264  const Tab& m_tab;
3265  const ITab& m_itab;
3266  uint m_alloc;
3267  uint m_bvals;
3268  BVal** m_bval;
3269  BSet(const Tab& tab, const ITab& itab);
3270  ~BSet();
3271  void reset();
3272  void calc(Par par);
3273  void calcpk(Par par, uint i);
3274  int setbnd(Par par) const;
3275  int setflt(Par par) const;
3276  void filter(Par par, const Set& set, Set& set2) const;
3277 };
3278 
3279 BSet::BSet(const Tab& tab, const ITab& itab) :
3280  m_tab(tab),
3281  m_itab(itab),
3282  m_alloc(2 * itab.m_icols),
3283  m_bvals(0)
3284 {
3285  m_bval = new BVal* [m_alloc];
3286  for (uint i = 0; i < m_alloc; i++) {
3287  m_bval[i] = 0;
3288  }
3289 }
3290 
3291 BSet::~BSet()
3292 {
3293  delete [] m_bval;
3294 }
3295 
3296 void
3297 BSet::reset()
3298 {
3299  while (m_bvals > 0) {
3300  uint i = --m_bvals;
3301  delete m_bval[i];
3302  m_bval[i] = 0;
3303  }
3304 }
3305 
3306 void
3307 BSet::calc(Par par)
3308 {
3309  const ITab& itab = m_itab;
3310  par.m_pctrange = par.m_pctbrange;
3311  reset();
3312  for (uint k = 0; k < itab.m_icols; k++) {
3313  const ICol& icol = *itab.m_icol[k];
3314  for (uint i = 0; i <= 1; i++) {
3315  if (m_bvals == 0 && urandom(100) == 0)
3316  return;
3317  if (m_bvals != 0 && urandom(3) == 0)
3318  return;
3319  assert(m_bvals < m_alloc);
3320  BVal& bval = *new BVal(icol);
3321  m_bval[m_bvals++] = &bval;
3322  bval.m_null = false;
3323  uint sel;
3324  do {
3325  // equality bound only on i==0
3326  sel = urandom(5 - i);
3327  } while (strchr(par.m_bound, '0' + sel) == 0);
3328  if (sel < 2)
3329  bval.m_type = 0 | (1 << i);
3330  else if (sel < 4)
3331  bval.m_type = 1 | (1 << i);
3332  else
3333  bval.m_type = 4;
3334  if (k + 1 < itab.m_icols)
3335  bval.m_type = 4;
3336  if (!g_compare_null)
3337  par.m_pctnull = 0;
3338  if (bval.m_type == 0 || bval.m_type == 1)
3339  par.m_bdir = -1;
3340  if (bval.m_type == 2 || bval.m_type == 3)
3341  par.m_bdir = +1;
3342  do {
3343  bval.calcnokey(par);
3344  if (i == 1) {
3345  assert(m_bvals >= 2);
3346  const BVal& bv1 = *m_bval[m_bvals - 2];
3347  const BVal& bv2 = *m_bval[m_bvals - 1];
3348  if (bv1.cmp(par, bv2) > 0 && urandom(100) != 0)
3349  continue;
3350  }
3351  } while (0);
3352  // equality bound only once
3353  if (bval.m_type == 4)
3354  break;
3355  }
3356  }
3357 }
3358 
3359 void
3360 BSet::calcpk(Par par, uint i)
3361 {
3362  const ITab& itab = m_itab;
3363  reset();
3364  for (uint k = 0; k < itab.m_icols; k++) {
3365  const ICol& icol = *itab.m_icol[k];
3366  const Col& col = icol.m_col;
3367  assert(col.m_pk);
3368  assert(m_bvals < m_alloc);
3369  BVal& bval = *new BVal(icol);
3370  m_bval[m_bvals++] = &bval;
3371  bval.m_type = 4;
3372  bval.calc(par, i);
3373  }
3374 }
3375 
3376 int
3377 BSet::setbnd(Par par) const
3378 {
3379  if (m_bvals != 0) {
3380  Rsq rsq1(m_bvals);
3381  for (uint j = 0; j < m_bvals; j++) {
3382  uint j2 = rsq1.next();
3383  const BVal& bval = *m_bval[j2];
3384  CHK(bval.setbnd(par) == 0);
3385  }
3386  }
3387  return 0;
3388 }
3389 
3390 int
3391 BSet::setflt(Par par) const
3392 {
3393  Con& con = par.con();
3394  CHK(con.getNdbScanFilter() == 0);
3395  CHK(con.beginFilter(NdbScanFilter::AND) == 0);
3396  if (m_bvals != 0) {
3397  Rsq rsq1(m_bvals);
3398  for (uint j = 0; j < m_bvals; j++) {
3399  uint j2 = rsq1.next();
3400  const BVal& bval = *m_bval[j2];
3401  CHK(bval.setflt(par) == 0);
3402  }
3403  // duplicate
3404  if (urandom(5) == 0) {
3405  uint j3 = urandom(m_bvals);
3406  const BVal& bval = *m_bval[j3];
3407  CHK(bval.setflt(par) == 0);
3408  }
3409  }
3410  CHK(con.endFilter() == 0);
3411  return 0;
3412 }
3413 
3414 void
3415 BSet::filter(Par par, const Set& set, Set& set2) const
3416 {
3417  const Tab& tab = m_tab;
3418  const ITab& itab = m_itab;
3419  assert(&tab == &set2.m_tab && set.m_rows == set2.m_rows);
3420  assert(set2.count() == 0);
3421  for (uint i = 0; i < set.m_rows; i++) {
3422  set.lock();
3423  do {
3424  if (set.m_row[i] == 0) {
3425  break;
3426  }
3427  const Row& row = *set.m_row[i];
3428  if (!g_store_null_key) {
3429  bool ok1 = false;
3430  for (uint k = 0; k < itab.m_icols; k++) {
3431  const ICol& icol = *itab.m_icol[k];
3432  const Col& col = icol.m_col;
3433  const Val& val = *row.m_val[col.m_num];
3434  if (!val.m_null) {
3435  ok1 = true;
3436  break;
3437  }
3438  }
3439  if (!ok1)
3440  break;
3441  }
3442  bool ok2 = true;
3443  for (uint j = 0; j < m_bvals; j++) {
3444  const BVal& bval = *m_bval[j];
3445  const ICol& icol = bval.m_icol;
3446  const Col& col = icol.m_col;
3447  const Val& val = *row.m_val[col.m_num];
3448  int ret = bval.cmp(par, val);
3449  LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
3450  if (bval.m_type == 0)
3451  ok2 = (ret <= 0);
3452  else if (bval.m_type == 1)
3453  ok2 = (ret < 0);
3454  else if (bval.m_type == 2)
3455  ok2 = (ret >= 0);
3456  else if (bval.m_type == 3)
3457  ok2 = (ret > 0);
3458  else if (bval.m_type == 4)
3459  ok2 = (ret == 0);
3460  else {
3461  assert(false);
3462  }
3463  if (!ok2)
3464  break;
3465  }
3466  if (!ok2)
3467  break;
3468  assert(set2.m_row[i] == 0);
3469  set2.m_row[i] = new Row(tab);
3470  Row& row2 = *set2.m_row[i];
3471  row2.copy(row, true);
3472  } while (0);
3473  set.unlock();
3474  }
3475 }
3476 
3477 static NdbOut&
3478 operator<<(NdbOut& out, const BSet& bset)
3479 {
3480  out << "bounds=" << bset.m_bvals;
3481  for (uint j = 0; j < bset.m_bvals; j++) {
3482  const BVal& bval = *bset.m_bval[j];
3483  out << " [bound " << j << ": " << bval << "]";
3484  }
3485  return out;
3486 }
3487 
3488 // pk operations
3489 
3490 static int
3491 pkinsert(Par par)
3492 {
3493  Con& con = par.con();
3494  const Tab& tab = par.tab();
3495  Set& set = par.set();
3496  LL3("pkinsert " << tab.m_name);
3497  CHK(con.startTransaction() == 0);
3498  uint batch = 0;
3499  for (uint j = 0; j < par.m_rows; j++) {
3500  uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3501  uint i = thrrow(par, j2);
3502  set.lock();
3503  if (!set.compat(par, i, Row::OpIns)) {
3504  LL3("pkinsert SKIP " << i << " " << set.getrow(i));
3505  set.unlock();
3506  } else {
3507  set.push(i);
3508  set.calc(par, i);
3509  CHK(set.insrow(par, i) == 0);
3510  set.unlock();
3511  LL4("pkinsert key=" << i << " " << set.getrow(i));
3512  batch++;
3513  }
3514  bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3515  if (batch == par.m_batch || lastbatch) {
3516  uint err = par.m_catcherr;
3517  ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3518  CHK(con.execute(et, err) == 0);
3519  set.lock();
3520  set.post(par, !err ? et : Rollback);
3521  set.unlock();
3522  if (err) {
3523  LL1("pkinsert key=" << i << " stop on " << con.errname(err));
3524  break;
3525  }
3526  batch = 0;
3527  if (!lastbatch) {
3528  con.closeTransaction();
3529  CHK(con.startTransaction() == 0);
3530  }
3531  }
3532  }
3533  con.closeTransaction();
3534  return 0;
3535 }
3536 
3537 static int
3538 pkupdate(Par par)
3539 {
3540  Con& con = par.con();
3541  const Tab& tab = par.tab();
3542  Set& set = par.set();
3543  LL3("pkupdate " << tab.m_name);
3544  CHK(con.startTransaction() == 0);
3545  uint batch = 0;
3546  for (uint j = 0; j < par.m_rows; j++) {
3547  uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3548  uint i = thrrow(par, j2);
3549  set.lock();
3550  if (!set.compat(par, i, Row::OpUpd)) {
3551  LL3("pkupdate SKIP " << i << " " << set.getrow(i));
3552  set.unlock();
3553  } else {
3554  set.push(i);
3555  set.copyval(i, tab.m_pkmask);
3556  set.calc(par, i, ~tab.m_pkmask);
3557  CHK(set.updrow(par, i) == 0);
3558  set.unlock();
3559  LL4("pkupdate key=" << i << " " << set.getrow(i));
3560  batch++;
3561  }
3562  bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3563  if (batch == par.m_batch || lastbatch) {
3564  uint err = par.m_catcherr;
3565  ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3566  CHK(con.execute(et, err) == 0);
3567  set.lock();
3568  set.post(par, !err ? et : Rollback);
3569  set.unlock();
3570  if (err) {
3571  LL1("pkupdate key=" << i << ": stop on " << con.errname(err));
3572  break;
3573  }
3574  batch = 0;
3575  if (!lastbatch) {
3576  con.closeTransaction();
3577  CHK(con.startTransaction() == 0);
3578  }
3579  }
3580  }
3581  con.closeTransaction();
3582  return 0;
3583 }
3584 
3585 static int
3586 pkdelete(Par par)
3587 {
3588  Con& con = par.con();
3589  const Tab& tab = par.tab();
3590  Set& set = par.set();
3591  LL3("pkdelete " << tab.m_name);
3592  CHK(con.startTransaction() == 0);
3593  uint batch = 0;
3594  for (uint j = 0; j < par.m_rows; j++) {
3595  uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3596  uint i = thrrow(par, j2);
3597  set.lock();
3598  if (!set.compat(par, i, Row::OpDel)) {
3599  LL3("pkdelete SKIP " << i << " " << set.getrow(i));
3600  set.unlock();
3601  } else {
3602  set.push(i);
3603  set.copyval(i, tab.m_pkmask);
3604  CHK(set.delrow(par, i) == 0);
3605  set.unlock();
3606  LL4("pkdelete key=" << i << " " << set.getrow(i));
3607  batch++;
3608  }
3609  bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3610  if (batch == par.m_batch || lastbatch) {
3611  uint err = par.m_catcherr;
3612  ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3613  CHK(con.execute(et, err) == 0);
3614  set.lock();
3615  set.post(par, !err ? et : Rollback);
3616  set.unlock();
3617  if (err) {
3618  LL1("pkdelete key=" << i << " stop on " << con.errname(err));
3619  break;
3620  }
3621  batch = 0;
3622  if (!lastbatch) {
3623  con.closeTransaction();
3624  CHK(con.startTransaction() == 0);
3625  }
3626  }
3627  }
3628  con.closeTransaction();
3629  return 0;
3630 }
3631 
3632 #if 0
3633 static int
3634 pkread(Par par)
3635 {
3636  Con& con = par.con();
3637  const Tab& tab = par.tab();
3638  Set& set = par.set();
3639  LL3("pkread " << tab.m_name << " verify=" << par.m_verify);
3640  // expected
3641  const Set& set1 = set;
3642  Set set2(tab, set.m_rows);
3643  for (uint i = 0; i < set.m_rows; i++) {
3644  set.lock();
3645  // TODO lock mode
3646  if (!set.compat(par, i, Row::OpREAD)) {
3647  LL3("pkread SKIP " << i << " " << set.getrow(i));
3648  set.unlock();
3649  continue;
3650  }
3651  set.unlock();
3652  CHK(con.startTransaction() == 0);
3653  CHK(set2.selrow(par, *set1.m_row[i]) == 0);
3654  CHK(con.execute(Commit) == 0);
3655  uint i2 = (uint)-1;
3656  CHK(set2.getkey(par, &i2) == 0 && i == i2);
3657  CHK(set2.putval(i, false) == 0);
3658  LL4("row " << set2.count() << " " << set2.getrow(i));
3659  con.closeTransaction();
3660  }
3661  if (par.m_verify)
3662  CHK(set1.verify(par, set2, false) == 0);
3663  return 0;
3664 }
3665 #endif
3666 
3667 static int
3668 pkreadfast(Par par, uint count)
3669 {
3670  Con& con = par.con();
3671  const Tab& tab = par.tab();
3672  const Set& set = par.set();
3673  LL3("pkfast " << tab.m_name);
3674  Row keyrow(tab);
3675  // not batched on purpose
3676  for (uint j = 0; j < count; j++) {
3677  uint i = urandom(set.m_rows);
3678  assert(set.compat(par, i, Row::OpREAD));
3679  CHK(con.startTransaction() == 0);
3680  // define key
3681  keyrow.calc(par, i);
3682  CHK(keyrow.selrow(par) == 0);
3683  NdbRecAttr* rec;
3684  // get 1st column
3685  CHK(con.getValue((Uint32)0, rec) == 0);
3686  CHK(con.execute(Commit) == 0);
3687  con.closeTransaction();
3688  }
3689  return 0;
3690 }
3691 
3692 // hash index operations
3693 
3694 static int
3695 hashindexupdate(Par par, const ITab& itab)
3696 {
3697  Con& con = par.con();
3698  const Tab& tab = par.tab();
3699  Set& set = par.set();
3700  LL3("hashindexupdate " << itab.m_name);
3701  CHK(con.startTransaction() == 0);
3702  uint batch = 0;
3703  for (uint j = 0; j < par.m_rows; j++) {
3704  uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3705  uint i = thrrow(par, j2);
3706  set.lock();
3707  if (!set.compat(par, i, Row::OpUpd)) {
3708  LL3("hashindexupdate SKIP " << i << " " << set.getrow(i));
3709  set.unlock();
3710  } else {
3711  // table pk and index key are not updated
3712  set.push(i);
3713  uint keymask = tab.m_pkmask | itab.m_keymask;
3714  set.copyval(i, keymask);
3715  set.calc(par, i, ~keymask);
3716  CHK(set.updrow(par, itab, i) == 0);
3717  set.unlock();
3718  LL4("hashindexupdate " << i << " " << set.getrow(i));
3719  batch++;
3720  }
3721  bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3722  if (batch == par.m_batch || lastbatch) {
3723  uint err = par.m_catcherr;
3724  ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3725  CHK(con.execute(et, err) == 0);
3726  set.lock();
3727  set.post(par, !err ? et : Rollback);
3728  set.unlock();
3729  if (err) {
3730  LL1("hashindexupdate " << i << " stop on " << con.errname(err));
3731  break;
3732  }
3733  batch = 0;
3734  if (!lastbatch) {
3735  con.closeTransaction();
3736  CHK(con.startTransaction() == 0);
3737  }
3738  }
3739  }
3740  con.closeTransaction();
3741  return 0;
3742 }
3743 
3744 static int
3745 hashindexdelete(Par par, const ITab& itab)
3746 {
3747  Con& con = par.con();
3748  Set& set = par.set();
3749  LL3("hashindexdelete " << itab.m_name);
3750  CHK(con.startTransaction() == 0);
3751  uint batch = 0;
3752  for (uint j = 0; j < par.m_rows; j++) {
3753  uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
3754  uint i = thrrow(par, j2);
3755  set.lock();
3756  if (!set.compat(par, i, Row::OpDel)) {
3757  LL3("hashindexdelete SKIP " << i << " " << set.getrow(i));
3758  set.unlock();
3759  } else {
3760  set.push(i);
3761  set.copyval(i, itab.m_keymask);
3762  CHK(set.delrow(par, itab, i) == 0);
3763  set.unlock();
3764  LL4("hashindexdelete " << i << " " << set.getrow(i));
3765  batch++;
3766  }
3767  bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
3768  if (batch == par.m_batch || lastbatch) {
3769  uint err = par.m_catcherr;
3770  ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
3771  CHK(con.execute(et, err) == 0);
3772  set.lock();
3773  set.post(par, !err ? et : Rollback);
3774  set.unlock();
3775  if (err) {
3776  LL1("hashindexdelete " << i << " stop on " << con.errname(err));
3777  break;
3778  }
3779  batch = 0;
3780  if (!lastbatch) {
3781  con.closeTransaction();
3782  CHK(con.startTransaction() == 0);
3783  }
3784  }
3785  }
3786  con.closeTransaction();
3787  return 0;
3788 }
3789 
3790 static int
3791 hashindexread(Par par, const ITab& itab)
3792 {
3793  Con& con = par.con();
3794  const Tab& tab = par.tab();
3795  Set& set = par.set();
3796  LL3("hashindexread " << itab.m_name << " verify=" << par.m_verify);
3797  // expected
3798  const Set& set1 = set;
3799  Set set2(tab, set.m_rows);
3800  for (uint i = 0; i < set.m_rows; i++) {
3801  set.lock();
3802  // TODO lock mode
3803  if (!set.compat(par, i, Row::OpREAD)) {
3804  LL3("hashindexread SKIP " << i << " " << set.getrow(i));
3805  set.unlock();
3806  continue;
3807  }
3808  set.unlock();
3809  CHK(con.startTransaction() == 0);
3810  CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
3811  CHK(con.execute(Commit) == 0);
3812  uint i2 = (uint)-1;
3813  CHK(set2.getkey(par, &i2) == 0 && i == i2);
3814  CHK(set2.putval(i, false) == 0);
3815  LL4("row " << set2.count() << " " << *set2.m_row[i]);
3816  con.closeTransaction();
3817  }
3818  if (par.m_verify)
3819  CHK(set1.verify(par, set2, false) == 0);
3820  return 0;
3821 }
3822 
3823 // scan read
3824 
3825 static int
3826 scanreadtable(Par par)
3827 {
3828  Con& con = par.con();
3829  const Tab& tab = par.tab();
3830  const Set& set = par.set();
3831  // expected
3832  const Set& set1 = set;
3833  LL3("scanreadtable " << tab.m_name << " lockmode=" << par.m_lockmode << " tupscan=" << par.m_tupscan << " expect=" << set1.count() << " verify=" << par.m_verify);
3834  Set set2(tab, set.m_rows);
3835  CHK(con.startTransaction() == 0);
3836  CHK(con.getNdbScanOperation(tab) == 0);
3837  CHK(con.readTuples(par) == 0);
3838  set2.getval(par);
3839  CHK(con.executeScan() == 0);
3840  uint n = 0;
3841  while (1) {
3842  int ret;
3843  uint err = par.m_catcherr;
3844  CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
3845  if (ret == 1)
3846  break;
3847  if (err) {
3848  LL1("scanreadtable stop on " << con.errname(err));
3849  break;
3850  }
3851  uint i = (uint)-1;
3852  CHK(set2.getkey(par, &i) == 0);
3853  CHK(set2.putval(i, false, n) == 0);
3854  LL4("row " << n << " " << *set2.m_row[i]);
3855  n++;
3856  }
3857  con.closeTransaction();
3858  if (par.m_verify)
3859  CHK(set1.verify(par, set2, false) == 0);
3860  LL3("scanreadtable " << tab.m_name << " done rows=" << n);
3861  return 0;
3862 }
3863 
3864 static int
3865 scanreadtablefast(Par par, uint countcheck)
3866 {
3867  Con& con = par.con();
3868  const Tab& tab = par.tab();
3869  LL3("scanfast " << tab.m_name);
3870  CHK(con.startTransaction() == 0);
3871  CHK(con.getNdbScanOperation(tab) == 0);
3872  CHK(con.readTuples(par) == 0);
3873  // get 1st column
3874  NdbRecAttr* rec;
3875  CHK(con.getValue((Uint32)0, rec) == 0);
3876  CHK(con.executeScan() == 0);
3877  uint count = 0;
3878  while (1) {
3879  int ret;
3880  CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
3881  if (ret == 1)
3882  break;
3883  count++;
3884  }
3885  con.closeTransaction();
3886  CHK(count == countcheck);
3887  return 0;
3888 }
3889 
3890 // try to get interesting bounds
3891 static void
3892 calcscanbounds(Par par, const ITab& itab, BSet& bset, const Set& set, Set& set1)
3893 {
3894  while (true) {
3895  bset.calc(par);
3896  bset.filter(par, set, set1);
3897  uint n = set1.count();
3898  // prefer proper subset
3899  if (0 < n && n < set.m_rows)
3900  break;
3901  if (urandom(5) == 0)
3902  break;
3903  set1.reset();
3904  }
3905 }
3906 
3907 static int
3908 scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
3909 {
3910  Con& con = par.con();
3911  const Tab& tab = par.tab();
3912  const Set& set = par.set();
3913  Set set1(tab, set.m_rows);
3914  if (calc) {
3915  calcscanbounds(par, itab, bset, set, set1);
3916  } else {
3917  bset.filter(par, set, set1);
3918  }
3919  LL3("scanreadindex " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
3920  Set set2(tab, set.m_rows);
3921  CHK(con.startTransaction() == 0);
3922  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
3923  CHK(con.readIndexTuples(par) == 0);
3924  CHK(bset.setbnd(par) == 0);
3925  set2.getval(par);
3926  CHK(con.executeScan() == 0);
3927  uint n = 0;
3928  while (1) {
3929  int ret;
3930  uint err = par.m_catcherr;
3931  CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
3932  if (ret == 1)
3933  break;
3934  if (err) {
3935  LL1("scanreadindex stop on " << con.errname(err));
3936  break;
3937  }
3938  uint i = (uint)-1;
3939  CHK(set2.getkey(par, &i) == 0);
3940  CHK(set2.putval(i, par.m_dups, n) == 0);
3941  LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
3942  n++;
3943  }
3944  con.closeTransaction();
3945  if (par.m_verify) {
3946  CHK(set1.verify(par, set2, false) == 0);
3947  if (par.m_ordered)
3948  CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
3949  }
3950  LL3("scanreadindex " << itab.m_name << " done rows=" << n);
3951  return 0;
3952 }
3953 
3954 
3955 static int
3956 scanreadindexmrr(Par par, const ITab& itab, int numBsets)
3957 {
3958  Con& con = par.con();
3959  const Tab& tab = par.tab();
3960  const Set& set = par.set();
3961 
3962  /* Create space for different sets of bounds, expected results and
3963  * results
3964  * Calculate bounds and the sets of rows which would result
3965  */
3966  BSet** boundSets;
3967  Set** expectedResults;
3968  Set** actualResults;
3969  uint* setSizes;
3970 
3971  CHK((boundSets= (BSet**) malloc(numBsets * sizeof(BSet*))) != 0);
3972  CHK((expectedResults= (Set**) malloc(numBsets * sizeof(Set*))) != 0);
3973  CHK((actualResults= (Set**) malloc(numBsets * sizeof(Set*))) != 0);
3974  CHK((setSizes= (uint*) malloc(numBsets * sizeof(uint))) != 0);
3975 
3976  for (int n=0; n < numBsets; n++)
3977  {
3978  CHK((boundSets[n]= new BSet(tab, itab)) != NULL );
3979  CHK((expectedResults[n]= new Set(tab, set.m_rows)) != NULL);
3980  CHK((actualResults[n]= new Set(tab, set.m_rows)) != NULL);
3981  setSizes[n]= 0;
3982 
3983  Set& results= *expectedResults[n];
3984  /* Calculate some scan bounds which are selective */
3985  do {
3986  results.reset();
3987  calcscanbounds(par, itab, *boundSets[n], set, results);
3988  } while ((*boundSets[n]).m_bvals == 0);
3989  }
3990 
3991  /* Define scan with bounds */
3992  LL3("scanreadindexmrr " << itab.m_name << " ranges= " << numBsets << " lockmode=" << par.m_lockmode << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
3993  Set set2(tab, set.m_rows);
3994  /* Multirange + Read range number for this scan */
3995  par.m_multiRange= true;
3996  CHK(con.startTransaction() == 0);
3997  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
3998  CHK(con.readIndexTuples(par) == 0);
3999  /* Set the bounds */
4000  for (int n=0; n < numBsets; n++)
4001  {
4002  CHK(boundSets[n]->setbnd(par) == 0);
4003  int res= con.m_indexscanop->end_of_bound(n);
4004  if (res != 0)
4005  {
4006  LL1("end_of_bound error : " << con.m_indexscanop->getNdbError().code);
4007  CHK (false);
4008  }
4009  }
4010  set2.getval(par);
4011  CHK(con.executeScan() == 0);
4012  int rows_received= 0;
4013  while (1) {
4014  int ret;
4015  uint err = par.m_catcherr;
4016  CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4017  if (ret == 1)
4018  break;
4019  if (err) {
4020  LL1("scanreadindexmrr stop on " << con.errname(err));
4021  break;
4022  }
4023  uint i = (uint)-1;
4024  /* Put value into set2 temporarily */
4025  CHK(set2.getkey(par, &i) == 0);
4026  CHK(set2.putval(i, false, -1) == 0);
4027 
4028  /* Now move it to the correct set, based on the range no */
4029  int rangeNum= con.m_indexscanop->get_range_no();
4030  CHK(rangeNum < numBsets);
4031  CHK(set2.m_row[i] != NULL);
4032  /* Get rowNum based on what's in the set already (slow) */
4033  CHK(setSizes[rangeNum] == actualResults[rangeNum]->count());
4034  int rowNum= setSizes[rangeNum];
4035  setSizes[rangeNum] ++;
4036  CHK((uint) rowNum < set2.m_rows);
4037  actualResults[rangeNum]->m_row[i]= set2.m_row[i];
4038  actualResults[rangeNum]->m_rowkey[rowNum]= i;
4039  set2.m_row[i]= 0;
4040  LL4("range " << rangeNum << " key " << i << " row " << rowNum << " " << *set2.m_row[i]);
4041  rows_received++;
4042  }
4043  con.closeTransaction();
4044 
4045  /* Verify that each set has the expected rows, and optionally, that
4046  * they're ordered
4047  */
4048  if (par.m_verify)
4049  {
4050  LL4("Verifying " << numBsets << " sets, " << rows_received << " rows");
4051  for (int n=0; n < numBsets; n++)
4052  {
4053  LL5("Set " << n << " of " << expectedResults[n]->count() << " rows");
4054  CHK(expectedResults[n]->verify(par, *actualResults[n], false) == 0);
4055  if (par.m_ordered)
4056  {
4057  LL5("Verifying ordering");
4058  CHK(actualResults[n]->verifyorder(par, itab, par.m_descending) == 0);
4059  }
4060  }
4061  }
4062 
4063  /* Cleanup */
4064  for (int n=0; n < numBsets; n++)
4065  {
4066  boundSets[n]->reset();
4067  delete boundSets[n];
4068  delete expectedResults[n];
4069  delete actualResults[n];
4070  }
4071 
4072  free(boundSets);
4073  free(expectedResults);
4074  free(actualResults);
4075  free(setSizes);
4076 
4077  LL3("scanreadindexmrr " << itab.m_name << " done rows=" << rows_received);
4078  return 0;
4079 }
4080 
4081 static int
4082 scanreadindexfast(Par par, const ITab& itab, const BSet& bset, uint countcheck)
4083 {
4084  Con& con = par.con();
4085  const Tab& tab = par.tab();
4086  LL3("scanfast " << itab.m_name << " " << bset);
4087  LL4(bset);
4088  CHK(con.startTransaction() == 0);
4089  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4090  CHK(con.readIndexTuples(par) == 0);
4091  CHK(bset.setbnd(par) == 0);
4092  // get 1st column
4093  NdbRecAttr* rec;
4094  CHK(con.getValue((Uint32)0, rec) == 0);
4095  CHK(con.executeScan() == 0);
4096  uint count = 0;
4097  while (1) {
4098  int ret;
4099  CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
4100  if (ret == 1)
4101  break;
4102  count++;
4103  }
4104  con.closeTransaction();
4105  CHK(count == countcheck);
4106  return 0;
4107 }
4108 
4109 static int
4110 scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
4111 {
4112  Con& con = par.con();
4113  const Tab& tab = par.tab();
4114  const Set& set = par.set();
4115  Set set1(tab, set.m_rows);
4116  if (calc) {
4117  calcscanbounds(par, itab, bset, set, set1);
4118  } else {
4119  bset.filter(par, set, set1);
4120  }
4121  LL3("scanfilter " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify);
4122  Set set2(tab, set.m_rows);
4123  CHK(con.startTransaction() == 0);
4124  CHK(con.getNdbScanOperation(tab) == 0);
4125  CHK(con.readTuples(par) == 0);
4126  CHK(bset.setflt(par) == 0);
4127  set2.getval(par);
4128  CHK(con.executeScan() == 0);
4129  uint n = 0;
4130  while (1) {
4131  int ret;
4132  uint err = par.m_catcherr;
4133  CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4134  if (ret == 1)
4135  break;
4136  if (err) {
4137  LL1("scanfilter stop on " << con.errname(err));
4138  break;
4139  }
4140  uint i = (uint)-1;
4141  CHK(set2.getkey(par, &i) == 0);
4142  CHK(set2.putval(i, par.m_dups, n) == 0);
4143  LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
4144  n++;
4145  }
4146  con.closeTransaction();
4147  if (par.m_verify) {
4148  CHK(set1.verify(par, set2, false) == 0);
4149  }
4150  LL3("scanfilter " << itab.m_name << " done rows=" << n);
4151  return 0;
4152 }
4153 
4154 static int
4155 scanreadindex(Par par, const ITab& itab)
4156 {
4157  const Tab& tab = par.tab();
4158  for (uint i = 0; i < par.m_ssloop; i++) {
4159  if (itab.m_type == ITab::OrderedIndex) {
4160  BSet bset(tab, itab);
4161  CHK(scanreadfilter(par, itab, bset, true) == 0);
4162  /* Single range or Multi range scan */
4163  if (randompct(g_opt.m_pctmrr))
4164  CHK(scanreadindexmrr(par,
4165  itab,
4166  1+urandom(g_opt.m_mrrmaxrng-1)) == 0);
4167  else
4168  CHK(scanreadindex(par, itab, bset, true) == 0);
4169  }
4170  }
4171  return 0;
4172 }
4173 
4174 static int
4175 scanreadindex(Par par)
4176 {
4177  const Tab& tab = par.tab();
4178  for (uint i = 0; i < tab.m_itabs; i++) {
4179  if (tab.m_itab[i] == 0)
4180  continue;
4181  const ITab& itab = *tab.m_itab[i];
4182  if (itab.m_type == ITab::OrderedIndex) {
4183  CHK(scanreadindex(par, itab) == 0);
4184  } else {
4185  CHK(hashindexread(par, itab) == 0);
4186  }
4187  }
4188  return 0;
4189 }
4190 
4191 #if 0
4192 static int
4193 scanreadall(Par par)
4194 {
4195  CHK(scanreadtable(par) == 0);
4196  CHK(scanreadindex(par) == 0);
4197  return 0;
4198 }
4199 #endif
4200 
4201 // timing scans
4202 
4203 static int
4204 timescantable(Par par)
4205 {
4206  par.tmr().on();
4207  CHK(scanreadtablefast(par, par.m_totrows) == 0);
4208  par.tmr().off(par.set().m_rows);
4209  return 0;
4210 }
4211 
4212 static int
4213 timescanpkindex(Par par)
4214 {
4215  const Tab& tab = par.tab();
4216  const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
4217  BSet bset(tab, itab);
4218  par.tmr().on();
4219  CHK(scanreadindexfast(par, itab, bset, par.m_totrows) == 0);
4220  par.tmr().off(par.set().m_rows);
4221  return 0;
4222 }
4223 
4224 static int
4225 timepkreadtable(Par par)
4226 {
4227  par.tmr().on();
4228  uint count = par.m_samples;
4229  if (count == 0)
4230  count = par.m_totrows;
4231  CHK(pkreadfast(par, count) == 0);
4232  par.tmr().off(count);
4233  return 0;
4234 }
4235 
4236 static int
4237 timepkreadindex(Par par)
4238 {
4239  const Tab& tab = par.tab();
4240  const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
4241  BSet bset(tab, itab);
4242  uint count = par.m_samples;
4243  if (count == 0)
4244  count = par.m_totrows;
4245  par.tmr().on();
4246  for (uint j = 0; j < count; j++) {
4247  uint i = urandom(par.m_totrows);
4248  bset.calcpk(par, i);
4249  CHK(scanreadindexfast(par, itab, bset, 1) == 0);
4250  }
4251  par.tmr().off(count);
4252  return 0;
4253 }
4254 
4255 // scan update
4256 
4257 static int
4258 scanupdatetable(Par par)
4259 {
4260  Con& con = par.con();
4261  const Tab& tab = par.tab();
4262  Set& set = par.set();
4263  LL3("scanupdatetable " << tab.m_name);
4264  Set set2(tab, set.m_rows);
4265  par.m_lockmode = NdbOperation::LM_Exclusive;
4266  CHK(con.startTransaction() == 0);
4267  CHK(con.getNdbScanOperation(tab) == 0);
4268  CHK(con.readTuples(par) == 0);
4269  set2.getval(par);
4270  CHK(con.executeScan() == 0);
4271  uint count = 0;
4272  // updating trans
4273  Con con2;
4274  con2.connect(con);
4275  CHK(con2.startTransaction() == 0);
4276  uint batch = 0;
4277  while (1) {
4278  int ret;
4279  uint32 err = par.m_catcherr;
4280  CHK((ret = con.nextScanResult(true, err)) != -1);
4281  if (ret != 0)
4282  break;
4283  if (err) {
4284  LL1("scanupdatetable [scan] stop on " << con.errname(err));
4285  break;
4286  }
4287  if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
4288  con.closeScan();
4289  break;
4290  }
4291  while (1) {
4292  uint i = (uint)-1;
4293  CHK(set2.getkey(par, &i) == 0);
4294  set.lock();
4295  if (!set.compat(par, i, Row::OpUpd)) {
4296  LL3("scanupdatetable SKIP " << i << " " << set.getrow(i));
4297  } else {
4298  CHKTRY(set2.putval(i, false) == 0, set.unlock());
4299  CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
4300  Par par2 = par;
4301  par2.m_con = &con2;
4302  set.push(i);
4303  set.calc(par, i, ~tab.m_pkmask);
4304  CHKTRY(set.setrow(par2, i) == 0, set.unlock());
4305  LL4("scanupdatetable " << i << " " << set.getrow(i));
4306  batch++;
4307  }
4308  set.unlock();
4309  CHK((ret = con.nextScanResult(false)) != -1);
4310  bool lastbatch = (batch != 0 && ret != 0);
4311  if (batch == par.m_batch || lastbatch) {
4312  uint err = par.m_catcherr;
4313  ExecType et = Commit;
4314  CHK(con2.execute(et, err) == 0);
4315  set.lock();
4316  set.post(par, !err ? et : Rollback);
4317  set.unlock();
4318  if (err) {
4319  LL1("scanupdatetable [update] stop on " << con2.errname(err));
4320  goto out;
4321  }
4322  LL4("scanupdatetable committed batch");
4323  count += batch;
4324  batch = 0;
4325  con2.closeTransaction();
4326  CHK(con2.startTransaction() == 0);
4327  }
4328  if (ret != 0)
4329  break;
4330  }
4331  }
4332 out:
4333  con2.closeTransaction();
4334  LL3("scanupdatetable " << tab.m_name << " rows updated=" << count);
4335  con.closeTransaction();
4336  return 0;
4337 }
4338 
4339 static int
4340 scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
4341 {
4342  Con& con = par.con();
4343  const Tab& tab = par.tab();
4344  Set& set = par.set();
4345  // expected
4346  Set set1(tab, set.m_rows);
4347  if (calc) {
4348  calcscanbounds(par, itab, bset, set, set1);
4349  } else {
4350  bset.filter(par, set, set1);
4351  }
4352  LL3("scanupdateindex " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
4353  Set set2(tab, set.m_rows);
4354  par.m_lockmode = NdbOperation::LM_Exclusive;
4355  CHK(con.startTransaction() == 0);
4356  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4357  CHK(con.readTuples(par) == 0);
4358  CHK(bset.setbnd(par) == 0);
4359  set2.getval(par);
4360  CHK(con.executeScan() == 0);
4361  uint count = 0;
4362  // updating trans
4363  Con con2;
4364  con2.connect(con);
4365  CHK(con2.startTransaction() == 0);
4366  uint batch = 0;
4367  while (1) {
4368  int ret;
4369  uint err = par.m_catcherr;
4370  CHK((ret = con.nextScanResult(true, err)) != -1);
4371  if (ret != 0)
4372  break;
4373  if (err) {
4374  LL1("scanupdateindex [scan] stop on " << con.errname(err));
4375  break;
4376  }
4377  if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
4378  con.closeScan();
4379  break;
4380  }
4381  while (1) {
4382  uint i = (uint)-1;
4383  CHK(set2.getkey(par, &i) == 0);
4384  set.lock();
4385  if (!set.compat(par, i, Row::OpUpd)) {
4386  LL4("scanupdateindex SKIP " << set.getrow(i));
4387  } else {
4388  CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
4389  CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
4390  Par par2 = par;
4391  par2.m_con = &con2;
4392  set.push(i);
4393  uint colmask = !par.m_noindexkeyupdate ? ~0 : ~itab.m_keymask;
4394  set.calc(par, i, colmask);
4395  CHKTRY(set.setrow(par2, i) == 0, set.unlock());
4396  LL4("scanupdateindex " << i << " " << set.getrow(i));
4397  batch++;
4398  }
4399  set.unlock();
4400  CHK((ret = con.nextScanResult(false)) != -1);
4401  bool lastbatch = (batch != 0 && ret != 0);
4402  if (batch == par.m_batch || lastbatch) {
4403  uint err = par.m_catcherr;
4404  ExecType et = Commit;
4405  CHK(con2.execute(et, err) == 0);
4406  set.lock();
4407  set.post(par, !err ? et : Rollback);
4408  set.unlock();
4409  if (err) {
4410  LL1("scanupdateindex [update] stop on " << con2.errname(err));
4411  goto out;
4412  }
4413  LL4("scanupdateindex committed batch");
4414  count += batch;
4415  batch = 0;
4416  con2.closeTransaction();
4417  CHK(con2.startTransaction() == 0);
4418  }
4419  if (ret != 0)
4420  break;
4421  }
4422  }
4423 out:
4424  con2.closeTransaction();
4425  if (par.m_verify) {
4426  CHK(set1.verify(par, set2, true) == 0);
4427  if (par.m_ordered)
4428  CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
4429  }
4430  LL3("scanupdateindex " << itab.m_name << " rows updated=" << count);
4431  con.closeTransaction();
4432  return 0;
4433 }
4434 
4435 static int
4436 scanupdateindex(Par par, const ITab& itab)
4437 {
4438  const Tab& tab = par.tab();
4439  for (uint i = 0; i < par.m_ssloop; i++) {
4440  if (itab.m_type == ITab::OrderedIndex) {
4441  BSet bset(tab, itab);
4442  CHK(scanupdateindex(par, itab, bset, true) == 0);
4443  } else {
4444  CHK(hashindexupdate(par, itab) == 0);
4445  }
4446  }
4447  return 0;
4448 }
4449 
4450 static int
4451 scanupdateindex(Par par)
4452 {
4453  const Tab& tab = par.tab();
4454  for (uint i = 0; i < tab.m_itabs; i++) {
4455  if (tab.m_itab[i] == 0)
4456  continue;
4457  const ITab& itab = *tab.m_itab[i];
4458  CHK(scanupdateindex(par, itab) == 0);
4459  }
4460  return 0;
4461 }
4462 
4463 #if 0
4464 static int
4465 scanupdateall(Par par)
4466 {
4467  CHK(scanupdatetable(par) == 0);
4468  CHK(scanupdateindex(par) == 0);
4469  return 0;
4470 }
4471 #endif
4472 
4473 // medium level routines
4474 
4475 static int
4476 readverifyfull(Par par)
4477 {
4478  if (par.m_noverify)
4479  return 0;
4480  par.m_verify = true;
4481  if (par.m_abortpct != 0) {
4482  LL2("skip verify in this version"); // implement in 5.0 version
4483  par.m_verify = false;
4484  }
4485  par.m_lockmode = NdbOperation::LM_CommittedRead;
4486  const Tab& tab = par.tab();
4487  if (par.m_no == 0) {
4488  // thread 0 scans table
4489  CHK(scanreadtable(par) == 0);
4490  // once more via tup scan
4491  par.m_tupscan = true;
4492  CHK(scanreadtable(par) == 0);
4493  }
4494  // each thread scans different indexes
4495  for (uint i = 0; i < tab.m_itabs; i++) {
4496  if (i % par.m_usedthreads != par.m_no)
4497  continue;
4498  if (tab.m_itab[i] == 0)
4499  continue;
4500  const ITab& itab = *tab.m_itab[i];
4501  if (itab.m_type == ITab::OrderedIndex) {
4502  BSet bset(tab, itab);
4503  CHK(scanreadindex(par, itab, bset, false) == 0);
4504  } else {
4505  CHK(hashindexread(par, itab) == 0);
4506  }
4507  }
4508  return 0;
4509 }
4510 
4511 static int
4512 readverifyindex(Par par)
4513 {
4514  if (par.m_noverify)
4515  return 0;
4516  par.m_verify = true;
4517  par.m_lockmode = NdbOperation::LM_CommittedRead;
4518  uint sel = urandom(10);
4519  if (sel < 9) {
4520  par.m_ordered = true;
4521  par.m_descending = (sel < 5);
4522  }
4523  CHK(scanreadindex(par) == 0);
4524  return 0;
4525 }
4526 
4527 static int
4528 pkops(Par par)
4529 {
4530  const Tab& tab = par.tab();
4531  par.m_randomkey = true;
4532  for (uint i = 0; i < par.m_ssloop; i++) {
4533  uint j = 0;
4534  while (j < tab.m_itabs) {
4535  if (tab.m_itab[j] != 0) {
4536  const ITab& itab = *tab.m_itab[j];
4537  if (itab.m_type == ITab::UniqueHashIndex && urandom(5) == 0)
4538  break;
4539  }
4540  j++;
4541  }
4542  uint sel = urandom(10);
4543  if (par.m_slno % 2 == 0) {
4544  // favor insert
4545  if (sel < 8) {
4546  CHK(pkinsert(par) == 0);
4547  } else if (sel < 9) {
4548  if (j == tab.m_itabs)
4549  CHK(pkupdate(par) == 0);
4550  else {
4551  const ITab& itab = *tab.m_itab[j];
4552  CHK(hashindexupdate(par, itab) == 0);
4553  }
4554  } else {
4555  if (j == tab.m_itabs)
4556  CHK(pkdelete(par) == 0);
4557  else {
4558  const ITab& itab = *tab.m_itab[j];
4559  CHK(hashindexdelete(par, itab) == 0);
4560  }
4561  }
4562  } else {
4563  // favor delete
4564  if (sel < 1) {
4565  CHK(pkinsert(par) == 0);
4566  } else if (sel < 2) {
4567  if (j == tab.m_itabs)
4568  CHK(pkupdate(par) == 0);
4569  else {
4570  const ITab& itab = *tab.m_itab[j];
4571  CHK(hashindexupdate(par, itab) == 0);
4572  }
4573  } else {
4574  if (j == tab.m_itabs)
4575  CHK(pkdelete(par) == 0);
4576  else {
4577  const ITab& itab = *tab.m_itab[j];
4578  CHK(hashindexdelete(par, itab) == 0);
4579  }
4580  }
4581  }
4582  }
4583  return 0;
4584 }
4585 
4586 static int
4587 pkupdatescanread(Par par)
4588 {
4589  par.m_dups = true;
4590  par.m_catcherr |= Con::ErrDeadlock;
4591  uint sel = urandom(10);
4592  if (sel < 5) {
4593  CHK(pkupdate(par) == 0);
4594  } else if (sel < 6) {
4595  par.m_verify = false;
4596  CHK(scanreadtable(par) == 0);
4597  } else {
4598  par.m_verify = false;
4599  if (sel < 8) {
4600  par.m_ordered = true;
4601  par.m_descending = (sel < 7);
4602  }
4603  CHK(scanreadindex(par) == 0);
4604  }
4605  return 0;
4606 }
4607 
4608 static int
4609 mixedoperations(Par par)
4610 {
4611  par.m_dups = true;
4612  par.m_catcherr |= Con::ErrDeadlock;
4613  par.m_scanstop = par.m_totrows; // randomly close scans
4614  uint sel = urandom(10);
4615  if (sel < 2) {
4616  CHK(pkdelete(par) == 0);
4617  } else if (sel < 4) {
4618  CHK(pkupdate(par) == 0);
4619  } else if (sel < 6) {
4620  CHK(scanupdatetable(par) == 0);
4621  } else {
4622  if (sel < 8) {
4623  par.m_ordered = true;
4624  par.m_descending = (sel < 7);
4625  }
4626  CHK(scanupdateindex(par) == 0);
4627  }
4628  return 0;
4629 }
4630 
4631 static int
4632 parallelorderedupdate(Par par)
4633 {
4634  const Tab& tab = par.tab();
4635  uint k = 0;
4636  for (uint i = 0; i < tab.m_itabs; i++) {
4637  if (tab.m_itab[i] == 0)
4638  continue;
4639  const ITab& itab = *tab.m_itab[i];
4640  if (itab.m_type != ITab::OrderedIndex)
4641  continue;
4642  // cannot sync threads yet except via subloop
4643  if (k++ == par.m_slno % tab.m_orderedindexes) {
4644  LL3("parallelorderedupdate: " << itab.m_name);
4645  par.m_noindexkeyupdate = true;
4646  par.m_ordered = true;
4647  par.m_descending = (par.m_slno != 0);
4648  par.m_dups = false;
4649  par.m_verify = true;
4650  BSet bset(tab, itab); // empty bounds
4651  // prefer empty bounds
4652  uint sel = urandom(10);
4653  CHK(scanupdateindex(par, itab, bset, sel < 2) == 0);
4654  }
4655  }
4656  return 0;
4657 }
4658 
4659 static int
4660 pkupdateindexbuild(Par par)
4661 {
4662  if (par.m_no == 0) {
4663  NdbSleep_MilliSleep(10 + urandom(100));
4664  CHK(createindex(par) == 0);
4665  } else {
4666  NdbSleep_MilliSleep(10 + urandom(100));
4667  par.m_randomkey = true;
4668  CHK(pkupdate(par) == 0);
4669  }
4670  return 0;
4671 }
4672 
4673 // savepoint tests (single thread for now)
4674 
4675 struct Spt {
4676  enum Res { Committed, Latest, Deadlock };
4677  bool m_same; // same transaction
4679  Res m_res;
4680 };
4681 
4682 static Spt sptlist[] = {
4683  { 1, NdbOperation::LM_Read, Spt::Latest },
4684  { 1, NdbOperation::LM_Exclusive, Spt::Latest },
4685  { 1, NdbOperation::LM_CommittedRead, Spt::Latest },
4686  { 0, NdbOperation::LM_Read, Spt::Deadlock },
4687  { 0, NdbOperation::LM_Exclusive, Spt::Deadlock },
4688  { 0, NdbOperation::LM_CommittedRead, Spt::Committed }
4689 };
4690 static uint sptcount = sizeof(sptlist)/sizeof(sptlist[0]);
4691 
4692 static int
4693 savepointreadpk(Par par, Spt spt)
4694 {
4695  LL3("savepointreadpk");
4696  Con& con = par.con();
4697  const Tab& tab = par.tab();
4698  Set& set = par.set();
4699  const Set& set1 = set;
4700  Set set2(tab, set.m_rows);
4701  uint n = 0;
4702  for (uint i = 0; i < set.m_rows; i++) {
4703  set.lock();
4704  if (!set.compat(par, i, Row::OpREAD)) {
4705  LL4("savepointreadpk SKIP " << i << " " << set.getrow(i));
4706  set.unlock();
4707  continue;
4708  }
4709  set.unlock();
4710  CHK(set2.selrow(par, *set1.m_row[i]) == 0);
4711  uint err = par.m_catcherr | Con::ErrDeadlock;
4712  ExecType et = NoCommit;
4713  CHK(con.execute(et, err) == 0);
4714  if (err) {
4715  if (err & Con::ErrDeadlock) {
4716  CHK(spt.m_res == Spt::Deadlock);
4717  // all rows have same behaviour
4718  CHK(n == 0);
4719  }
4720  LL1("savepointreadpk stop on " << con.errname(err));
4721  break;
4722  }
4723  uint i2 = (uint)-1;
4724  CHK(set2.getkey(par, &i2) == 0 && i == i2);
4725  CHK(set2.putval(i, false) == 0);
4726  LL4("row " << set2.count() << " " << set2.getrow(i));
4727  n++;
4728  }
4729  bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4730  if (spt.m_res != Spt::Deadlock)
4731  CHK(set1.verify(par, set2, false, dirty) == 0);
4732  return 0;
4733 }
4734 
4735 static int
4736 savepointreadhashindex(Par par, Spt spt)
4737 {
4738  if (spt.m_lm == NdbOperation::LM_CommittedRead && !spt.m_same) {
4739  LL1("skip hash index dirty read");
4740  return 0;
4741  }
4742  LL3("savepointreadhashindex");
4743  Con& con = par.con();
4744  const Tab& tab = par.tab();
4745  const ITab& itab = par.itab();
4746  Set& set = par.set();
4747  const Set& set1 = set;
4748  Set set2(tab, set.m_rows);
4749  uint n = 0;
4750  for (uint i = 0; i < set.m_rows; i++) {
4751  set.lock();
4752  if (!set.compat(par, i, Row::OpREAD)) {
4753  LL3("savepointreadhashindex SKIP " << i << " " << set.getrow(i));
4754  set.unlock();
4755  continue;
4756  }
4757  set.unlock();
4758  CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
4759  uint err = par.m_catcherr | Con::ErrDeadlock;
4760  ExecType et = NoCommit;
4761  CHK(con.execute(et, err) == 0);
4762  if (err) {
4763  if (err & Con::ErrDeadlock) {
4764  CHK(spt.m_res == Spt::Deadlock);
4765  // all rows have same behaviour
4766  CHK(n == 0);
4767  }
4768  LL1("savepointreadhashindex stop on " << con.errname(err));
4769  break;
4770  }
4771  uint i2 = (uint)-1;
4772  CHK(set2.getkey(par, &i2) == 0 && i == i2);
4773  CHK(set2.putval(i, false) == 0);
4774  LL4("row " << set2.count() << " " << *set2.m_row[i]);
4775  n++;
4776  }
4777  bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4778  if (spt.m_res != Spt::Deadlock)
4779  CHK(set1.verify(par, set2, false, dirty) == 0);
4780  return 0;
4781 }
4782 
4783 static int
4784 savepointscantable(Par par, Spt spt)
4785 {
4786  LL3("savepointscantable");
4787  Con& con = par.con();
4788  const Tab& tab = par.tab();
4789  const Set& set = par.set();
4790  const Set& set1 = set; // not modifying current set
4791  Set set2(tab, set.m_rows); // scan result
4792  CHK(con.getNdbScanOperation(tab) == 0);
4793  CHK(con.readTuples(par) == 0);
4794  set2.getval(par); // getValue all columns
4795  CHK(con.executeScan() == 0);
4796  bool deadlock = false;
4797  uint n = 0;
4798  while (1) {
4799  int ret;
4800  uint err = par.m_catcherr | Con::ErrDeadlock;
4801  CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4802  if (ret == 1)
4803  break;
4804  if (err) {
4805  if (err & Con::ErrDeadlock) {
4806  CHK(spt.m_res == Spt::Deadlock);
4807  // all rows have same behaviour
4808  CHK(n == 0);
4809  deadlock = true;
4810  }
4811  LL1("savepointscantable stop on " << con.errname(err));
4812  break;
4813  }
4814  CHK(spt.m_res != Spt::Deadlock);
4815  uint i = (uint)-1;
4816  CHK(set2.getkey(par, &i) == 0);
4817  CHK(set2.putval(i, false, n) == 0);
4818  LL4("row " << n << " key " << i << " " << set2.getrow(i));
4819  n++;
4820  }
4821  if (set1.m_rows > 0) {
4822  if (!deadlock)
4823  CHK(spt.m_res != Spt::Deadlock);
4824  else
4825  CHK(spt.m_res == Spt::Deadlock);
4826  }
4827  LL2("savepointscantable " << n << " rows");
4828  bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4829  if (spt.m_res != Spt::Deadlock)
4830  CHK(set1.verify(par, set2, false, dirty) == 0);
4831  return 0;
4832 }
4833 
4834 static int
4835 savepointscanindex(Par par, Spt spt)
4836 {
4837  LL3("savepointscanindex");
4838  Con& con = par.con();
4839  const Tab& tab = par.tab();
4840  const ITab& itab = par.itab();
4841  const Set& set = par.set();
4842  const Set& set1 = set;
4843  Set set2(tab, set.m_rows);
4844  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
4845  CHK(con.readIndexTuples(par) == 0);
4846  set2.getval(par);
4847  CHK(con.executeScan() == 0);
4848  bool deadlock = false;
4849  uint n = 0;
4850  while (1) {
4851  int ret;
4852  uint err = par.m_catcherr | Con::ErrDeadlock;
4853  CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
4854  if (ret == 1)
4855  break;
4856  if (err) {
4857  if (err & Con::ErrDeadlock) {
4858  CHK(spt.m_res == Spt::Deadlock);
4859  // all rows have same behaviour
4860  CHK(n == 0);
4861  deadlock = true;
4862  }
4863  LL1("savepointscanindex stop on " << con.errname(err));
4864  break;
4865  }
4866  CHK(spt.m_res != Spt::Deadlock);
4867  uint i = (uint)-1;
4868  CHK(set2.getkey(par, &i) == 0);
4869  CHK(set2.putval(i, par.m_dups, n) == 0);
4870  LL4("row " << n << " key " << i << " " << set2.getrow(i));
4871  n++;
4872  }
4873  if (set1.m_rows > 0) {
4874  if (!deadlock)
4875  CHK(spt.m_res != Spt::Deadlock);
4876  else
4877  CHK(spt.m_res == Spt::Deadlock);
4878  }
4879  LL2("savepointscanindex " << n << " rows");
4880  bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
4881  if (spt.m_res != Spt::Deadlock)
4882  CHK(set1.verify(par, set2, false, dirty) == 0);
4883  return 0;
4884 }
4885 
4886 typedef int (*SptFun)(Par, Spt);
4887 
4888 static int
4889 savepointtest(Par par, Spt spt, SptFun fun)
4890 {
4891  Con& con = par.con();
4892  Par par2 = par;
4893  Con con2;
4894  if (!spt.m_same) {
4895  con2.connect(con); // copy ndb reference
4896  par2.m_con = &con2;
4897  CHK(con2.startTransaction() == 0);
4898  }
4899  par2.m_lockmode = spt.m_lm;
4900  CHK((*fun)(par2, spt) == 0);
4901  if (!spt.m_same) {
4902  con2.closeTransaction();
4903  }
4904  return 0;
4905 }
4906 
4907 static int
4908 savepointtest(Par par, const char* op)
4909 {
4910  Con& con = par.con();
4911  const Tab& tab = par.tab();
4912  Set& set = par.set();
4913  LL2("savepointtest op=\"" << op << "\"");
4914  CHK(con.startTransaction() == 0);
4915  const char* p = op;
4916  char c;
4917  while ((c = *p++) != 0) {
4918  uint j;
4919  for (j = 0; j < par.m_rows; j++) {
4920  uint i = thrrow(par, j);
4921  if (c == 'c') {
4922  ExecType et = Commit;
4923  CHK(con.execute(et) == 0);
4924  set.lock();
4925  set.post(par, et);
4926  set.unlock();
4927  CHK(con.startTransaction() == 0);
4928  } else {
4929  set.lock();
4930  set.push(i);
4931  if (c == 'i') {
4932  set.calc(par, i);
4933  CHK(set.insrow(par, i) == 0);
4934  } else if (c == 'u') {
4935  set.copyval(i, tab.m_pkmask);
4936  set.calc(par, i, ~tab.m_pkmask);
4937  CHK(set.updrow(par, i) == 0);
4938  } else if (c == 'd') {
4939  set.copyval(i, tab.m_pkmask);
4940  CHK(set.delrow(par, i) == 0);
4941  } else {
4942  assert(false);
4943  }
4944  set.unlock();
4945  }
4946  }
4947  }
4948  {
4949  ExecType et = NoCommit;
4950  CHK(con.execute(et) == 0);
4951  set.lock();
4952  set.post(par, et);
4953  set.unlock();
4954  }
4955  for (uint k = 0; k < sptcount; k++) {
4956  Spt spt = sptlist[k];
4957  LL2("spt lm=" << spt.m_lm << " same=" << spt.m_same);
4958  CHK(savepointtest(par, spt, &savepointreadpk) == 0);
4959  CHK(savepointtest(par, spt, &savepointscantable) == 0);
4960  for (uint i = 0; i < tab.m_itabs; i++) {
4961  if (tab.m_itab[i] == 0)
4962  continue;
4963  const ITab& itab = *tab.m_itab[i];
4964  par.m_itab = &itab;
4965  if (itab.m_type == ITab::OrderedIndex)
4966  CHK(savepointtest(par, spt, &savepointscanindex) == 0);
4967  else
4968  CHK(savepointtest(par, spt, &savepointreadhashindex) == 0);
4969  par.m_itab = 0;
4970  }
4971  }
4972  {
4973  ExecType et = Rollback;
4974  CHK(con.execute(et) == 0);
4975  set.lock();
4976  set.post(par, et);
4977  set.unlock();
4978  }
4979  con.closeTransaction();
4980  return 0;
4981 }
4982 
4983 static int
4984 savepointtest(Par par)
4985 {
4986  assert(par.m_usedthreads == 1);
4987  const char* oplist[] = {
4988  // each based on previous and "c" not last
4989  "i",
4990  "icu",
4991  "uuuuu",
4992  "d",
4993  "dciuuuuud",
4994  0
4995  };
4996  int i;
4997  for (i = 0; oplist[i] != 0; i++) {
4998  CHK(savepointtest(par, oplist[i]) == 0);
4999  }
5000  return 0;
5001 }
5002 
5003 static int
5004 halloweentest(Par par, const ITab& itab)
5005 {
5006  LL2("halloweentest " << itab.m_name);
5007  Con& con = par.con();
5008  const Tab& tab = par.tab();
5009  Set& set = par.set();
5010  CHK(con.startTransaction() == 0);
5011  // insert 1 row
5012  uint i = 0;
5013  set.push(i);
5014  set.calc(par, i);
5015  CHK(set.insrow(par, i) == 0);
5016  CHK(con.execute(NoCommit) == 0);
5017  // scan via index until Set m_rows reached
5018  uint scancount = 0;
5019  bool stop = false;
5020  while (!stop) {
5021  par.m_lockmode = // makes no difference
5022  scancount % 2 == 0 ? NdbOperation::LM_CommittedRead :
5024  Set set1(tab, set.m_rows); // expected scan result
5025  Set set2(tab, set.m_rows); // actual scan result
5026  BSet bset(tab, itab);
5027  calcscanbounds(par, itab, bset, set, set1);
5028  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
5029  CHK(con.readIndexTuples(par) == 0);
5030  CHK(bset.setbnd(par) == 0);
5031  set2.getval(par);
5032  CHK(con.executeScan() == 0);
5033  const uint savepoint = i;
5034  LL3("scancount=" << scancount << " savepoint=" << savepoint);
5035  uint n = 0;
5036  while (1) {
5037  int ret;
5038  CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
5039  if (ret == 1)
5040  break;
5041  uint k = (uint)-1;
5042  CHK(set2.getkey(par, &k) == 0);
5043  CHK(set2.putval(k, false, n) == 0);
5044  LL3("row=" << n << " key=" << k);
5045  CHK(k <= savepoint);
5046  if (++i == set.m_rows) {
5047  stop = true;
5048  break;
5049  }
5050  set.push(i);
5051  set.calc(par, i);
5052  CHK(set.insrow(par, i) == 0);
5053  CHK(con.execute(NoCommit) == 0);
5054  n++;
5055  }
5056  con.closeScan();
5057  LL3("scanrows=" << n);
5058  if (!stop) {
5059  CHK(set1.verify(par, set2, false) == 0);
5060  }
5061  scancount++;
5062  }
5063  CHK(con.execute(Commit) == 0);
5064  set.post(par, Commit);
5065  assert(set.count() == set.m_rows);
5066  CHK(pkdelete(par) == 0);
5067  return 0;
5068 }
5069 
5070 static int
5071 halloweentest(Par par)
5072 {
5073  assert(par.m_usedthreads == 1);
5074  const Tab& tab = par.tab();
5075  for (uint i = 0; i < tab.m_itabs; i++) {
5076  if (tab.m_itab[i] == 0)
5077  continue;
5078  const ITab& itab = *tab.m_itab[i];
5079  if (itab.m_type == ITab::OrderedIndex)
5080  CHK(halloweentest(par, itab) == 0);
5081  }
5082  return 0;
5083 }
5084 
5085 // threads
5086 
5087 typedef int (*TFunc)(Par par);
5088 enum TMode { ST = 1, MT = 2 };
5089 
5090 extern "C" { static void* runthread(void* arg); }
5091 
5092 struct Thr {
5093  enum State { Wait, Start, Stop, Exit };
5094  State m_state;
5095  Par m_par;
5096  pthread_t m_id;
5097  NdbThread* m_thread;
5098  NdbMutex* m_mutex;
5099  NdbCondition* m_cond;
5100  TFunc m_func;
5101  int m_ret;
5102  void* m_status;
5103  char m_tmp[20]; // used for debug msg prefix
5104  Thr(Par par, uint n);
5105  ~Thr();
5106  int run();
5107  void start();
5108  void stop();
5109  void exit();
5110  //
5111  void lock() {
5112  NdbMutex_Lock(m_mutex);
5113  }
5114  void unlock() {
5115  NdbMutex_Unlock(m_mutex);
5116  }
5117  void wait() {
5118  NdbCondition_Wait(m_cond, m_mutex);
5119  }
5120  void signal() {
5121  NdbCondition_Signal(m_cond);
5122  }
5123  void join() {
5124  NdbThread_WaitFor(m_thread, &m_status);
5125  m_thread = 0;
5126  }
5127 };
5128 
5129 Thr::Thr(Par par, uint n) :
5130  m_state(Wait),
5131  m_par(par),
5132  m_thread(0),
5133  m_mutex(0),
5134  m_cond(0),
5135  m_func(0),
5136  m_ret(0),
5137  m_status(0)
5138 {
5139  m_par.m_no = n;
5140  char buf[10];
5141  sprintf(buf, "thr%03u", par.m_no);
5142  const char* name = strcpy(new char[10], buf);
5143  // mutex
5144  m_mutex = NdbMutex_Create();
5145  m_cond = NdbCondition_Create();
5146  assert(m_mutex != 0 && m_cond != 0);
5147  // run
5148  const uint stacksize = 256 * 1024;
5149  const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
5150  m_thread = NdbThread_Create(runthread, (void**)this, stacksize, name, prio);
5151 }
5152 
5153 Thr::~Thr()
5154 {
5155  if (m_thread != 0) {
5156  NdbThread_Destroy(&m_thread);
5157  m_thread = 0;
5158  }
5159  if (m_cond != 0) {
5160  NdbCondition_Destroy(m_cond);
5161  m_cond = 0;
5162  }
5163  if (m_mutex != 0) {
5164  NdbMutex_Destroy(m_mutex);
5165  m_mutex = 0;
5166  }
5167 }
5168 
5169 static void*
5170 runthread(void* arg)
5171 {
5172  Thr& thr = *(Thr*)arg;
5173  thr.m_id = pthread_self();
5174  if (thr.run() < 0) {
5175  LL1("exit on error");
5176  } else {
5177  LL4("exit ok");
5178  }
5179  return 0;
5180 }
5181 
5182 int
5183 Thr::run()
5184 {
5185  LL4("run");
5186  Con con;
5187  CHK(con.connect() == 0);
5188  m_par.m_con = &con;
5189  LL4("connected");
5190  while (1) {
5191  lock();
5192  while (m_state != Start && m_state != Exit) {
5193  LL4("wait");
5194  wait();
5195  }
5196  if (m_state == Exit) {
5197  LL4("exit");
5198  unlock();
5199  break;
5200  }
5201  LL4("start");
5202  assert(m_state == Start);
5203  m_ret = (*m_func)(m_par);
5204  m_state = Stop;
5205  LL4("stop");
5206  signal();
5207  unlock();
5208  if (m_ret == -1) {
5209  if (m_par.m_cont)
5210  LL1("continue running due to -cont");
5211  else
5212  return -1;
5213  }
5214  }
5215  con.disconnect();
5216  return 0;
5217 }
5218 
5219 void
5220 Thr::start()
5221 {
5222  lock();
5223  m_state = Start;
5224  signal();
5225  unlock();
5226 }
5227 
5228 void
5229 Thr::stop()
5230 {
5231  lock();
5232  while (m_state != Stop)
5233  wait();
5234  m_state = Wait;
5235  unlock();
5236 }
5237 
5238 void
5239 Thr::exit()
5240 {
5241  lock();
5242  m_state = Exit;
5243  signal();
5244  unlock();
5245 }
5246 
5247 // test run
5248 
5249 static Thr** g_thrlist = 0;
5250 
5251 static Thr*
5252 getthr()
5253 {
5254  if (g_thrlist != 0) {
5255  pthread_t id = pthread_self();
5256  for (uint n = 0; n < g_opt.m_threads; n++) {
5257  if (g_thrlist[n] != 0) {
5258  Thr& thr = *g_thrlist[n];
5259  if (pthread_equal(thr.m_id, id))
5260  return &thr;
5261  }
5262  }
5263  }
5264  return 0;
5265 }
5266 
5267 // for debug messages (par.m_no not available)
5268 static const char*
5269 getthrprefix()
5270 {
5271  Thr* thrp = getthr();
5272  if (thrp != 0) {
5273  Thr& thr = *thrp;
5274  uint n = thr.m_par.m_no;
5275  uint m =
5276  g_opt.m_threads < 10 ? 1 :
5277  g_opt.m_threads < 100 ? 2 : 3;
5278  sprintf(thr.m_tmp, "[%0*u] ", m, n);
5279  return thr.m_tmp;
5280  }
5281  return "";
5282 }
5283 
5284 static int
5285 runstep(Par par, const char* fname, TFunc func, uint mode)
5286 {
5287  LL2("step: " << fname);
5288  const int threads = (mode & ST ? 1 : par.m_usedthreads);
5289  int n;
5290  for (n = 0; n < threads; n++) {
5291  LL4("start " << n);
5292  Thr& thr = *g_thrlist[n];
5293  Par oldpar = thr.m_par;
5294  // update parameters
5295  thr.m_par = par;
5296  thr.m_par.m_no = oldpar.m_no;
5297  thr.m_par.m_con = oldpar.m_con;
5298  thr.m_func = func;
5299  thr.start();
5300  }
5301  uint errs = 0;
5302  for (n = threads - 1; n >= 0; n--) {
5303  LL4("stop " << n);
5304  Thr& thr = *g_thrlist[n];
5305  thr.stop();
5306  if (thr.m_ret != 0)
5307  errs++;
5308  }
5309  CHK(errs == 0);
5310  return 0;
5311 }
5312 
5313 #define RUNSTEP(par, func, mode) \
5314  CHK(runstep(par, #func, func, mode) == 0)
5315 
5316 #define SUBLOOP(par) \
5317  "sloop: " << par.m_lno << "/" << par.m_currcase << "/" << \
5318  par.m_tab->m_name << "/" << par.m_slno
5319 
5320 static int
5321 tbuild(Par par)
5322 {
5323  RUNSTEP(par, droptable, ST);
5324  RUNSTEP(par, createtable, ST);
5325  RUNSTEP(par, invalidatetable, MT);
5326  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5327  LL1(SUBLOOP(par));
5328  if (par.m_slno % 3 == 0) {
5329  RUNSTEP(par, createindex, ST);
5330  RUNSTEP(par, invalidateindex, MT);
5331  RUNSTEP(par, pkinsert, MT);
5332  RUNSTEP(par, pkupdate, MT);
5333  } else if (par.m_slno % 3 == 1) {
5334  RUNSTEP(par, pkinsert, MT);
5335  RUNSTEP(par, createindex, ST);
5336  RUNSTEP(par, invalidateindex, MT);
5337  RUNSTEP(par, pkupdate, MT);
5338  } else {
5339  RUNSTEP(par, pkinsert, MT);
5340  RUNSTEP(par, pkupdate, MT);
5341  RUNSTEP(par, createindex, ST);
5342  RUNSTEP(par, invalidateindex, MT);
5343  }
5344  RUNSTEP(par, readverifyfull, MT);
5345  // leave last one
5346  if (par.m_slno + 1 < par.m_sloop) {
5347  RUNSTEP(par, pkdelete, MT);
5348  RUNSTEP(par, readverifyfull, MT);
5349  RUNSTEP(par, dropindex, ST);
5350  }
5351  }
5352  return 0;
5353 }
5354 
5355 static int
5356 tindexscan(Par par)
5357 {
5358  RUNSTEP(par, droptable, ST);
5359  RUNSTEP(par, createtable, ST);
5360  RUNSTEP(par, invalidatetable, MT);
5361  RUNSTEP(par, createindex, ST);
5362  RUNSTEP(par, invalidateindex, MT);
5363  RUNSTEP(par, pkinsert, MT);
5364  RUNSTEP(par, readverifyfull, MT);
5365  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5366  LL1(SUBLOOP(par));
5367  RUNSTEP(par, readverifyindex, MT);
5368  }
5369  return 0;
5370 }
5371 
5372 
5373 static int
5374 tpkops(Par par)
5375 {
5376  RUNSTEP(par, droptable, ST);
5377  RUNSTEP(par, createtable, ST);
5378  RUNSTEP(par, invalidatetable, MT);
5379  RUNSTEP(par, createindex, ST);
5380  RUNSTEP(par, invalidateindex, MT);
5381  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5382  LL1(SUBLOOP(par));
5383  RUNSTEP(par, pkops, MT);
5384  LL2("rows=" << par.set().count());
5385  RUNSTEP(par, readverifyfull, MT);
5386  }
5387  return 0;
5388 }
5389 
5390 static int
5391 tpkopsread(Par par)
5392 {
5393  RUNSTEP(par, droptable, ST);
5394  RUNSTEP(par, createtable, ST);
5395  RUNSTEP(par, invalidatetable, MT);
5396  RUNSTEP(par, pkinsert, MT);
5397  RUNSTEP(par, createindex, ST);
5398  RUNSTEP(par, invalidateindex, MT);
5399  RUNSTEP(par, readverifyfull, MT);
5400  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5401  LL1(SUBLOOP(par));
5402  RUNSTEP(par, pkupdatescanread, MT);
5403  RUNSTEP(par, readverifyfull, MT);
5404  }
5405  RUNSTEP(par, pkdelete, MT);
5406  RUNSTEP(par, readverifyfull, MT);
5407  return 0;
5408 }
5409 
5410 static int
5411 tmixedops(Par par)
5412 {
5413  RUNSTEP(par, droptable, ST);
5414  RUNSTEP(par, createtable, ST);
5415  RUNSTEP(par, invalidatetable, MT);
5416  RUNSTEP(par, pkinsert, MT);
5417  RUNSTEP(par, createindex, ST);
5418  RUNSTEP(par, invalidateindex, MT);
5419  RUNSTEP(par, readverifyfull, MT);
5420  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5421  LL1(SUBLOOP(par));
5422  RUNSTEP(par, mixedoperations, MT);
5423  RUNSTEP(par, readverifyfull, MT);
5424  }
5425  return 0;
5426 }
5427 
5428 static int
5429 tbusybuild(Par par)
5430 {
5431  RUNSTEP(par, droptable, ST);
5432  RUNSTEP(par, createtable, ST);
5433  RUNSTEP(par, invalidatetable, MT);
5434  RUNSTEP(par, pkinsert, MT);
5435  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5436  LL1(SUBLOOP(par));
5437  RUNSTEP(par, pkupdateindexbuild, MT);
5438  RUNSTEP(par, invalidateindex, MT);
5439  RUNSTEP(par, readverifyfull, MT);
5440  RUNSTEP(par, dropindex, ST);
5441  }
5442  return 0;
5443 }
5444 
5445 static int
5446 trollback(Par par)
5447 {
5448  par.m_abortpct = 50;
5449  RUNSTEP(par, droptable, ST);
5450  RUNSTEP(par, createtable, ST);
5451  RUNSTEP(par, invalidatetable, MT);
5452  RUNSTEP(par, pkinsert, MT);
5453  RUNSTEP(par, createindex, ST);
5454  RUNSTEP(par, invalidateindex, MT);
5455  RUNSTEP(par, readverifyfull, MT);
5456  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5457  LL1(SUBLOOP(par));
5458  RUNSTEP(par, mixedoperations, MT);
5459  RUNSTEP(par, readverifyfull, MT);
5460  }
5461  return 0;
5462 }
5463 
5464 static int
5465 tparupdate(Par par)
5466 {
5467  RUNSTEP(par, droptable, ST);
5468  RUNSTEP(par, createtable, ST);
5469  RUNSTEP(par, invalidatetable, MT);
5470  RUNSTEP(par, pkinsert, MT);
5471  RUNSTEP(par, createindex, ST);
5472  RUNSTEP(par, invalidateindex, MT);
5473  RUNSTEP(par, readverifyfull, MT);
5474  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5475  LL1(SUBLOOP(par));
5476  RUNSTEP(par, parallelorderedupdate, MT);
5477  RUNSTEP(par, readverifyfull, MT);
5478  }
5479  return 0;
5480 }
5481 
5482 static int
5483 tsavepoint(Par par)
5484 {
5485  RUNSTEP(par, droptable, ST);
5486  RUNSTEP(par, createtable, ST);
5487  RUNSTEP(par, invalidatetable, MT);
5488  RUNSTEP(par, createindex, ST);
5489  RUNSTEP(par, invalidateindex, MT);
5490  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5491  LL1(SUBLOOP(par));
5492  RUNSTEP(par, savepointtest, MT);
5493  RUNSTEP(par, readverifyfull, MT);
5494  }
5495  return 0;
5496 }
5497 
5498 static int
5499 thalloween(Par par)
5500 {
5501  RUNSTEP(par, droptable, ST);
5502  RUNSTEP(par, createtable, ST);
5503  RUNSTEP(par, invalidatetable, MT);
5504  RUNSTEP(par, createindex, ST);
5505  RUNSTEP(par, invalidateindex, MT);
5506  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5507  LL1(SUBLOOP(par));
5508  RUNSTEP(par, halloweentest, MT);
5509  }
5510  return 0;
5511 }
5512 
5513 static int
5514 ttimebuild(Par par)
5515 {
5516  Tmr t1;
5517  RUNSTEP(par, droptable, ST);
5518  RUNSTEP(par, createtable, ST);
5519  RUNSTEP(par, invalidatetable, MT);
5520  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5521  LL1(SUBLOOP(par));
5522  RUNSTEP(par, pkinsert, MT);
5523  t1.on();
5524  RUNSTEP(par, createindex, ST);
5525  t1.off(par.m_totrows);
5526  RUNSTEP(par, invalidateindex, MT);
5527  RUNSTEP(par, dropindex, ST);
5528  }
5529  LL1("build index - " << t1.time());
5530  return 0;
5531 }
5532 
5533 static int
5534 ttimemaint(Par par)
5535 {
5536  Tmr t1, t2;
5537  RUNSTEP(par, droptable, ST);
5538  RUNSTEP(par, createtable, ST);
5539  RUNSTEP(par, invalidatetable, MT);
5540  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5541  LL1(SUBLOOP(par));
5542  RUNSTEP(par, pkinsert, MT);
5543  t1.on();
5544  RUNSTEP(par, pkupdate, MT);
5545  t1.off(par.m_totrows);
5546  RUNSTEP(par, createindex, ST);
5547  RUNSTEP(par, invalidateindex, MT);
5548  t2.on();
5549  RUNSTEP(par, pkupdate, MT);
5550  t2.off(par.m_totrows);
5551  RUNSTEP(par, dropindex, ST);
5552  }
5553  LL1("update - " << t1.time());
5554  LL1("update indexed - " << t2.time());
5555  LL1("overhead - " << t2.over(t1));
5556  return 0;
5557 }
5558 
5559 static int
5560 ttimescan(Par par)
5561 {
5562  if (par.tab().m_itab[0] == 0) {
5563  LL1("ttimescan - no index 0, skipped");
5564  return 0;
5565  }
5566  Tmr t1, t2;
5567  RUNSTEP(par, droptable, ST);
5568  RUNSTEP(par, createtable, ST);
5569  RUNSTEP(par, invalidatetable, MT);
5570  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5571  LL1(SUBLOOP(par));
5572  RUNSTEP(par, pkinsert, MT);
5573  RUNSTEP(par, createindex, ST);
5574  par.m_tmr = &t1;
5575  RUNSTEP(par, timescantable, ST);
5576  par.m_tmr = &t2;
5577  RUNSTEP(par, timescanpkindex, ST);
5578  RUNSTEP(par, dropindex, ST);
5579  }
5580  LL1("full scan table - " << t1.time());
5581  LL1("full scan PK index - " << t2.time());
5582  LL1("overhead - " << t2.over(t1));
5583  return 0;
5584 }
5585 
5586 static int
5587 ttimepkread(Par par)
5588 {
5589  if (par.tab().m_itab[0] == 0) {
5590  LL1("ttimescan - no index 0, skipped");
5591  return 0;
5592  }
5593  Tmr t1, t2;
5594  RUNSTEP(par, droptable, ST);
5595  RUNSTEP(par, createtable, ST);
5596  RUNSTEP(par, invalidatetable, MT);
5597  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
5598  LL1(SUBLOOP(par));
5599  RUNSTEP(par, pkinsert, MT);
5600  RUNSTEP(par, createindex, ST);
5601  par.m_tmr = &t1;
5602  RUNSTEP(par, timepkreadtable, ST);
5603  par.m_tmr = &t2;
5604  RUNSTEP(par, timepkreadindex, ST);
5605  RUNSTEP(par, dropindex, ST);
5606  }
5607  LL1("pk read table - " << t1.time());
5608  LL1("pk read PK index - " << t2.time());
5609  LL1("overhead - " << t2.over(t1));
5610  return 0;
5611 }
5612 
5613 static int
5614 tdrop(Par par)
5615 {
5616  RUNSTEP(par, droptable, ST);
5617  return 0;
5618 }
5619 
5620 struct TCase {
5621  const char* m_name;
5622  TFunc m_func;
5623  const char* m_desc;
5624  TCase(const char* name, TFunc func, const char* desc) :
5625  m_name(name),
5626  m_func(func),
5627  m_desc(desc) {
5628  }
5629 };
5630 
5631 static const TCase
5632 tcaselist[] = {
5633  TCase("a", tbuild, "index build"),
5634  TCase("b", tindexscan, "index scans"),
5635  TCase("c", tpkops, "pk operations"),
5636  TCase("d", tpkopsread, "pk operations and scan reads"),
5637  TCase("e", tmixedops, "pk operations and scan operations"),
5638  TCase("f", tbusybuild, "pk operations and index build"),
5639  TCase("g", trollback, "operations with random rollbacks"),
5640  TCase("h", tparupdate, "parallel ordered update bug#20446"),
5641  TCase("i", tsavepoint, "savepoint test locking bug#31477"),
5642  TCase("j", thalloween, "savepoint test halloween problem"),
5643  TCase("t", ttimebuild, "time index build"),
5644  TCase("u", ttimemaint, "time index maintenance"),
5645  TCase("v", ttimescan, "time full scan table vs index on pk"),
5646  TCase("w", ttimepkread, "time pk read table vs index on pk"),
5647  TCase("z", tdrop, "drop test tables")
5648 };
5649 
5650 static const uint
5651 tcasecount = sizeof(tcaselist) / sizeof(tcaselist[0]);
5652 
5653 static void
5654 printcases()
5655 {
5656  ndbout << "test cases:" << endl;
5657  for (uint i = 0; i < tcasecount; i++) {
5658  const TCase& tcase = tcaselist[i];
5659  ndbout << " " << tcase.m_name << " - " << tcase.m_desc << endl;
5660  }
5661 }
5662 
5663 static void
5664 printtables()
5665 {
5666  Par par(g_opt);
5667  makebuiltintables(par);
5668  ndbout << "tables and indexes (x=ordered z=hash x0=on pk):" << endl;
5669  for (uint j = 0; j < tabcount; j++) {
5670  if (tablist[j] == 0)
5671  continue;
5672  const Tab& tab = *tablist[j];
5673  const char* tname = tab.m_name;
5674  ndbout << " " << tname;
5675  for (uint i = 0; i < tab.m_itabs; i++) {
5676  if (tab.m_itab[i] == 0)
5677  continue;
5678  const ITab& itab = *tab.m_itab[i];
5679  const char* iname = itab.m_name;
5680  if (strncmp(tname, iname, strlen(tname)) == 0)
5681  iname += strlen(tname);
5682  ndbout << " " << iname;
5683  ndbout << "(";
5684  for (uint k = 0; k < itab.m_icols; k++) {
5685  if (k != 0)
5686  ndbout << ",";
5687  const ICol& icol = *itab.m_icol[k];
5688  const Col& col = icol.m_col;
5689  ndbout << col.m_name;
5690  }
5691  ndbout << ")";
5692  }
5693  ndbout << endl;
5694  }
5695 }
5696 
5697 static bool
5698 setcasepar(Par& par)
5699 {
5700  Opt d;
5701  const char* c = par.m_currcase;
5702  switch (c[0]) {
5703  case 'i':
5704  {
5705  if (par.m_usedthreads > 1) {
5706  par.m_usedthreads = 1;
5707  LL1("case " << c << " reduce threads to " << par.m_usedthreads);
5708  }
5709  const uint rows = 100;
5710  if (par.m_rows > rows) {
5711  par.m_rows = rows;
5712  LL1("case " << c << " reduce rows to " << rows);
5713  }
5714  }
5715  break;
5716  case 'j':
5717  {
5718  if (par.m_usedthreads > 1) {
5719  par.m_usedthreads = 1;
5720  LL1("case " << c << " reduce threads to " << par.m_usedthreads);
5721  }
5722  }
5723  break;
5724  default:
5725  break;
5726  }
5727  return true;
5728 }
5729 
5730 static int
5731 runtest(Par par)
5732 {
5733  int totret = 0;
5734  if (par.m_seed == -1) {
5735  // good enough for daily run
5736  ushort seed = (ushort)getpid();
5737  LL0("random seed: " << seed);
5738  srandom((uint)seed);
5739  } else if (par.m_seed != 0) {
5740  LL0("random seed: " << par.m_seed);
5741  srandom(par.m_seed);
5742  } else {
5743  LL0("random seed: loop number");
5744  }
5745  // cs
5746  assert(par.m_csname != 0);
5747  if (strcmp(par.m_csname, "random") != 0) {
5748  CHARSET_INFO* cs;
5749  CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
5750  par.m_cs = cs;
5751  }
5752  // con
5753  Con con;
5754  CHK(con.connect() == 0);
5755  par.m_con = &con;
5756  par.m_catcherr |= Con::ErrNospace;
5757  // threads
5758  g_thrlist = new Thr* [par.m_threads];
5759  uint n;
5760  for (n = 0; n < par.m_threads; n++) {
5761  g_thrlist[n] = 0;
5762  }
5763  for (n = 0; n < par.m_threads; n++) {
5764  g_thrlist[n] = new Thr(par, n);
5765  Thr& thr = *g_thrlist[n];
5766  assert(thr.m_thread != 0);
5767  }
5768  for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
5769  LL1("loop: " << par.m_lno);
5770  if (par.m_seed == 0) {
5771  LL1("random seed: " << par.m_lno);
5772  srandom(par.m_lno);
5773  }
5774  for (uint i = 0; i < tcasecount; i++) {
5775  const TCase& tcase = tcaselist[i];
5776  if ((par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) ||
5777  (par.m_skip != 0 && strchr(par.m_skip, tcase.m_name[0]) != 0)) {
5778  continue;
5779  }
5780  sprintf(par.m_currcase, "%c", tcase.m_name[0]);
5781  par.m_usedthreads = par.m_threads;
5782  if (!setcasepar(par)) {
5783  LL1("case " << tcase.m_name << " cannot run with given options");
5784  continue;
5785  }
5786  par.m_totrows = par.m_usedthreads * par.m_rows;
5787  makebuiltintables(par);
5788  LL1("case: " << par.m_lno << "/" << tcase.m_name << " - " << tcase.m_desc);
5789  for (uint j = 0; j < tabcount; j++) {
5790  if (tablist[j] == 0)
5791  continue;
5792  const Tab& tab = *tablist[j];
5793  par.m_tab = &tab;
5794  par.m_set = new Set(tab, par.m_totrows);
5795  LL1("table: " << par.m_lno << "/" << tcase.m_name << "/" << tab.m_name);
5796  int ret = tcase.m_func(par);
5797  delete par.m_set;
5798  par.m_set = 0;
5799  if (ret == -1) {
5800  if (!par.m_cont)
5801  return -1;
5802  totret = -1;
5803  LL1("continue to next case due to -cont");
5804  break;
5805  }
5806  }
5807  }
5808  }
5809  for (n = 0; n < par.m_threads; n++) {
5810  Thr& thr = *g_thrlist[n];
5811  thr.exit();
5812  }
5813  for (n = 0; n < par.m_threads; n++) {
5814  Thr& thr = *g_thrlist[n];
5815  thr.join();
5816  delete &thr;
5817  }
5818  delete [] g_thrlist;
5819  g_thrlist = 0;
5820  con.disconnect();
5821  return totret;
5822 }
5823 
5824 static const char* g_progname = "testOIBasic";
5825 
5826 int
5827 main(int argc, char** argv)
5828 {
5829  ndb_init();
5830  uint i;
5831  ndbout << g_progname;
5832  for (i = 1; i < (uint) argc; i++)
5833  ndbout << " " << argv[i];
5834  ndbout << endl;
5835  ndbout_mutex = NdbMutex_Create();
5836  while (++argv, --argc > 0) {
5837  const char* arg = argv[0];
5838  if (*arg != '-') {
5839  ndbout << "testOIBasic: unknown argument " << arg;
5840  goto usage;
5841  }
5842  if (strcmp(arg, "-batch") == 0) {
5843  if (++argv, --argc > 0) {
5844  g_opt.m_batch = atoi(argv[0]);
5845  continue;
5846  }
5847  }
5848  if (strcmp(arg, "-bound") == 0) {
5849  if (++argv, --argc > 0) {
5850  const char* p = argv[0];
5851  if (strlen(p) != 0 && strlen(p) == strspn(p, "01234")) {
5852  g_opt.m_bound = strdup(p);
5853  continue;
5854  }
5855  }
5856  }
5857  if (strcmp(arg, "-case") == 0) {
5858  if (++argv, --argc > 0) {
5859  g_opt.m_case = strdup(argv[0]);
5860  continue;
5861  }
5862  }
5863  if (strcmp(arg, "-collsp") == 0) {
5864  g_opt.m_collsp = true;
5865  continue;
5866  }
5867  if (strcmp(arg, "-cont") == 0) {
5868  g_opt.m_cont = true;
5869  continue;
5870  }
5871  if (strcmp(arg, "-core") == 0) {
5872  g_opt.m_core = true;
5873  continue;
5874  }
5875  if (strcmp(arg, "-csname") == 0) {
5876  if (++argv, --argc > 0) {
5877  g_opt.m_csname = strdup(argv[0]);
5878  continue;
5879  }
5880  }
5881  if (strcmp(arg, "-die") == 0) {
5882  if (++argv, --argc > 0) {
5883  g_opt.m_die = atoi(argv[0]);
5884  continue;
5885  }
5886  }
5887  if (strcmp(arg, "-dups") == 0) {
5888  g_opt.m_dups = true;
5889  continue;
5890  }
5891  if (strcmp(arg, "-fragtype") == 0) {
5892  if (++argv, --argc > 0) {
5893  if (strcmp(argv[0], "single") == 0) {
5894  g_opt.m_fragtype = NdbDictionary::Object::FragSingle;
5895  continue;
5896  }
5897  if (strcmp(argv[0], "small") == 0) {
5898  g_opt.m_fragtype = NdbDictionary::Object::FragAllSmall;
5899  continue;
5900  }
5901  if (strcmp(argv[0], "medium") == 0) {
5902  g_opt.m_fragtype = NdbDictionary::Object::FragAllMedium;
5903  continue;
5904  }
5905  if (strcmp(argv[0], "large") == 0) {
5906  g_opt.m_fragtype = NdbDictionary::Object::FragAllLarge;
5907  continue;
5908  }
5909  }
5910  }
5911  if (strcmp(arg, "-index") == 0) {
5912  if (++argv, --argc > 0) {
5913  g_opt.m_index = strdup(argv[0]);
5914  continue;
5915  }
5916  }
5917  if (strcmp(arg, "-loop") == 0) {
5918  if (++argv, --argc > 0) {
5919  g_opt.m_loop = atoi(argv[0]);
5920  continue;
5921  }
5922  }
5923  if (strcmp(arg, "-mrrmaxrng") == 0) {
5924  if (++argv, --argc > 0) {
5925  g_opt.m_mrrmaxrng = atoi(argv[0]);
5926  continue;
5927  }
5928  }
5929  if (strcmp(arg, "-nologging") == 0) {
5930  g_opt.m_nologging = true;
5931  continue;
5932  }
5933  if (strcmp(arg, "-noverify") == 0) {
5934  g_opt.m_noverify = true;
5935  continue;
5936  }
5937  if (strcmp(arg, "-pctmrr") == 0) {
5938  if (++argv, --argc > 0) {
5939  g_opt.m_pctmrr = atoi(argv[0]);
5940  continue;
5941  }
5942  }
5943  if (strcmp(arg, "-pctnull") == 0) {
5944  if (++argv, --argc > 0) {
5945  g_opt.m_pctnull = atoi(argv[0]);
5946  continue;
5947  }
5948  }
5949  if (strcmp(arg, "-rows") == 0) {
5950  if (++argv, --argc > 0) {
5951  g_opt.m_rows = atoi(argv[0]);
5952  continue;
5953  }
5954  }
5955  if (strcmp(arg, "-samples") == 0) {
5956  if (++argv, --argc > 0) {
5957  g_opt.m_samples = atoi(argv[0]);
5958  continue;
5959  }
5960  }
5961  if (strcmp(arg, "-scanbatch") == 0) {
5962  if (++argv, --argc > 0) {
5963  g_opt.m_scanbatch = atoi(argv[0]);
5964  continue;
5965  }
5966  }
5967  if (strcmp(arg, "-scanpar") == 0) {
5968  if (++argv, --argc > 0) {
5969  g_opt.m_scanpar = atoi(argv[0]);
5970  continue;
5971  }
5972  }
5973  if (strcmp(arg, "-seed") == 0) {
5974  if (++argv, --argc > 0) {
5975  g_opt.m_seed = atoi(argv[0]);
5976  continue;
5977  }
5978  }
5979  if (strcmp(arg, "-skip") == 0) {
5980  if (++argv, --argc > 0) {
5981  g_opt.m_skip = strdup(argv[0]);
5982  continue;
5983  }
5984  }
5985  if (strcmp(arg, "-sloop") == 0) {
5986  if (++argv, --argc > 0) {
5987  g_opt.m_sloop = atoi(argv[0]);
5988  continue;
5989  }
5990  }
5991  if (strcmp(arg, "-ssloop") == 0) {
5992  if (++argv, --argc > 0) {
5993  g_opt.m_ssloop = atoi(argv[0]);
5994  continue;
5995  }
5996  }
5997  if (strcmp(arg, "-table") == 0) {
5998  if (++argv, --argc > 0) {
5999  g_opt.m_table = strdup(argv[0]);
6000  continue;
6001  }
6002  }
6003  if (strcmp(arg, "-threads") == 0) {
6004  if (++argv, --argc > 0) {
6005  g_opt.m_threads = atoi(argv[0]);
6006  if (1 <= g_opt.m_threads)
6007  continue;
6008  }
6009  }
6010  if (strcmp(arg, "-v") == 0) {
6011  if (++argv, --argc > 0) {
6012  g_opt.m_v = atoi(argv[0]);
6013  continue;
6014  }
6015  }
6016  if (strncmp(arg, "-v", 2) == 0 && isdigit(arg[2])) {
6017  g_opt.m_v = atoi(&arg[2]);
6018  continue;
6019  }
6020  if (strcmp(arg, "-h") == 0 || strcmp(arg, "-help") == 0) {
6021  printhelp();
6022  goto wrongargs;
6023  }
6024  ndbout << "testOIBasic: bad or unknown option " << arg;
6025  goto usage;
6026  }
6027  {
6028  Par par(g_opt);
6029  g_ncc = new Ndb_cluster_connection();
6030  if (g_ncc->connect(30) != 0 || runtest(par) < 0)
6031  goto failed;
6032  delete g_ncc;
6033  g_ncc = 0;
6034  }
6035 // ok
6036  return NDBT_ProgramExit(NDBT_OK);
6037 failed:
6038  return NDBT_ProgramExit(NDBT_FAILED);
6039 usage:
6040  ndbout << " (use -h for help)" << endl;
6041 wrongargs:
6042  return NDBT_ProgramExit(NDBT_WRONGARGS);
6043 }
6044 
6045 // vim: set sw=2 et: