20 package com.mysql.cluster.crund;
22 import java.util.Arrays;
23 import java.nio.ByteBuffer;
24 import java.nio.ByteOrder;
25 import java.nio.IntBuffer;
27 import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection;
28 import com.mysql.ndbjtie.ndbapi.Ndb;
29 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
30 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
31 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Table;
32 import com.mysql.ndbjtie.ndbapi.NdbDictionary.ColumnConst;
33 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
34 import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
35 import com.mysql.ndbjtie.ndbapi.NdbError;
36 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
37 import com.mysql.ndbjtie.ndbapi.NdbOperation;
38 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
39 import com.mysql.ndbjtie.ndbapi.NdbRecAttr;
// Schema metadata (tables, columns, index, attribute ids) used by the
// operations below; assigned in initConnection() via new Model(ndb).
67 protected Model model;
/**
 * Initializes this load's properties: delegates to the superclass and then
 * sets {@code descr} to {@code "->ndbjtie(<mgmdConnect>)"}.
 */
76 protected void initProperties() {
77 super.initProperties();
// the description embeds the current value of mgmdConnect
78 descr =
"->ndbjtie(" + mgmdConnect +
")";
/**
 * Prepares the load: loads the "ndbclient" system library and connects to
 * the management server through {@code mgmd.connect(retries, delay, verbose)},
 * printing progress messages to {@code out} along the way.
 *
 * @throws Exception declared by the signature; the visible code throws a
 *         RuntimeException when {@code mgmd.connect(...)} returns non-zero
 */
81 protected void initLoad()
throws Exception {
// load the native client library by its system library name
87 loadSystemLibrary(
"ndbclient");
90 out.print(
"creating cluster connection ...");
97 out.print(
"connecting to mgmd ...");
// NOTE(review): retries is fixed at 0, so the "(retries * delay)" value
// reported in the failure message below always evaluates to 0
99 final int retries = 0;
101 final int verbose = 1;
// any non-zero result from connect() is treated as a failure
103 if (mgmd.connect(retries, delay, verbose) != 0) {
104 final String msg = (
"mgmd@" + mgmdConnect
105 +
" was not ready within "
106 + (retries * delay) +
"s.");
108 throw new RuntimeException(
"!!! " + msg);
// success: echo the connect string that was used
110 out.println(
" [ok: " + mgmdConnect +
"]");
/**
 * Counterpart of initLoad(): prints "closing mgmd connection ..." before
 * and " [ok]" after the teardown.
 * NOTE(review): the teardown statements themselves are not among the lines
 * shown here — confirm what is released.
 *
 * @throws Exception declared by the signature
 */
113 protected void closeLoad()
throws Exception {
115 out.print(
"closing mgmd connection ...");
120 out.println(
" [ok]");
// Renders an NDB error as "NdbError[<code>]: <message>".
// NOTE(review): presumably the body of the toStr(...) helper that the
// error paths in this file call — confirm against the enclosing method.
132 return "NdbError[" + e.code() +
"]: " + e.message();
/**
 * Dictionary metadata for the benchmark schema: table handles for "a" and
 * "b0", their column handles, the index "I_B0_FK" on "b0", and the column
 * numbers (attribute ids) used when defining operations.
 */
136 static protected class Model {
// Column numbers shared by tables A and B0; the constructor checks that
// both tables report the same number for each of these columns.
153 public final int attr_id;
154 public final int attr_cint;
155 public final int attr_clong;
156 public final int attr_cfloat;
157 public final int attr_cdouble;
// Column numbers of columns looked up on table B0 only.
158 public final int attr_B0_a_id;
159 public final int attr_B0_cvarbinary_def;
160 public final int attr_B0_cvarchar_def;
/**
 * Looks up all tables, columns and the index through the dictionary of
 * the given Ndb object and caches the column numbers.
 *
 * @param ndb the Ndb object whose dictionary is queried
 * @throws RuntimeException if any lookup returns null (the message is
 *         the dictionary's NdbError rendered by toStr), or with message
 *         "attribute id mismatch" if a shared column has different
 *         column numbers in A and B0
 */
163 public Model(
Ndb ndb) {
164 final Dictionary dict = ndb.getDictionary();
// table A and its columns
// NOTE(review): failed column lookups on the table object also report
// dict.getNdbError() — confirm that this is the right error source
167 if ((table_A = dict.getTable(
"a")) == null)
168 throw new RuntimeException(toStr(dict.getNdbError()));
169 if ((column_A_id = table_A.getColumn(
"id")) == null)
170 throw new RuntimeException(toStr(dict.getNdbError()));
171 if ((column_A_cint = table_A.getColumn(
"cint")) == null)
172 throw new RuntimeException(toStr(dict.getNdbError()));
173 if ((column_A_clong = table_A.getColumn(
"clong")) == null)
174 throw new RuntimeException(toStr(dict.getNdbError()));
175 if ((column_A_cfloat = table_A.getColumn(
"cfloat")) == null)
176 throw new RuntimeException(toStr(dict.getNdbError()));
177 if ((column_A_cdouble = table_A.getColumn(
"cdouble")) == null)
178 throw new RuntimeException(toStr(dict.getNdbError()));
// table B0 and its columns (the A columns plus a_id and two var-sized ones)
181 if ((table_B0 = dict.getTable(
"b0")) == null)
182 throw new RuntimeException(toStr(dict.getNdbError()));
183 if ((column_B0_id = table_B0.getColumn(
"id")) == null)
184 throw new RuntimeException(toStr(dict.getNdbError()));
185 if ((column_B0_cint = table_B0.getColumn(
"cint")) == null)
186 throw new RuntimeException(toStr(dict.getNdbError()));
187 if ((column_B0_clong = table_B0.getColumn(
"clong")) == null)
188 throw new RuntimeException(toStr(dict.getNdbError()));
189 if ((column_B0_cfloat = table_B0.getColumn(
"cfloat")) == null)
190 throw new RuntimeException(toStr(dict.getNdbError()));
191 if ((column_B0_cdouble = table_B0.getColumn(
"cdouble")) == null)
192 throw new RuntimeException(toStr(dict.getNdbError()));
193 if ((column_B0_a_id = table_B0.getColumn(
"a_id")) == null)
194 throw new RuntimeException(toStr(dict.getNdbError()));
195 if ((column_B0_cvarbinary_def = table_B0.getColumn(
"cvarbinary_def")) == null)
196 throw new RuntimeException(toStr(dict.getNdbError()));
197 if ((column_B0_cvarchar_def = table_B0.getColumn(
"cvarchar_def")) == null)
198 throw new RuntimeException(toStr(dict.getNdbError()));
// index named "I_B0_FK", looked up for table "b0"
201 if ((idx_B0_a_id = dict.getIndex(
"I_B0_FK",
"b0")) == null)
202 throw new RuntimeException(toStr(dict.getNdbError()));
// cache the column numbers; each shared attr_* value is taken from table A
// and must match B0's number, since the same value is used for both tables
205 attr_id = column_A_id.getColumnNo();
206 if (attr_id != column_B0_id.getColumnNo())
207 throw new RuntimeException(
"attribute id mismatch");
208 attr_cint = column_A_cint.getColumnNo();
209 if (attr_cint != column_B0_cint.getColumnNo())
210 throw new RuntimeException(
"attribute id mismatch");
211 attr_clong = column_A_clong.getColumnNo();
212 if (attr_clong != column_B0_clong.getColumnNo())
213 throw new RuntimeException(
"attribute id mismatch");
214 attr_cfloat = column_A_cfloat.getColumnNo();
215 if (attr_cfloat != column_B0_cfloat.getColumnNo())
216 throw new RuntimeException(
"attribute id mismatch");
217 attr_cdouble = column_A_cdouble.getColumnNo();
218 if (attr_cdouble != column_B0_cdouble.getColumnNo())
219 throw new RuntimeException(
"attribute id mismatch");
// B0-only columns need no cross-table check
222 attr_B0_a_id = column_B0_a_id.getColumnNo();
223 attr_B0_cvarbinary_def = column_B0_cvarbinary_def.getColumnNo();
224 attr_B0_cvarchar_def = column_B0_cvarchar_def.getColumnNo();
/**
 * Creates the benchmark operations as anonymous Op subclasses and reports
 * the number of registered operations ({@code ops.size()}) at the end.
 * Every operation is created twice: once with the flag {@code f} false and
 * once with it true; the flag is used both as {@code batch} and as
 * {@code forceSend} and adds a "_batch" / "_forceSend" name suffix when set.
 */
228 protected void initOperations() {
229 out.print(
"initializing operations ...");
// two passes: f == false first, then f == true; 'done' lags one step
// behind f, so the loop ends after the pass in which f was true
235 for (
boolean f =
false, done =
false; !done; done = f, f =
true) {
237 final boolean batch = f;
238 final boolean forceSend = f;
// setAttrs is constant true; the plain insert ops below pass !setAttrs
// (false), the "*_attr" insert ops further down pass setAttrs (true)
239 final boolean setAttrs =
true;
// inserts of ids 1..nOps with the setAttrs argument false
242 new Op(
"insA" + (batch ?
"_batch" :
"")) {
243 public void run(
int nOps) {
244 ins(model.table_A, 1, nOps, !setAttrs, batch);
249 new Op(
"insB0" + (batch ?
"_batch" :
"")) {
250 public void run(
int nOps) {
251 ins(model.table_B0, 1, nOps, !setAttrs, batch);
// updates by primary key for ids 1..nOps
256 new Op(
"setAByPK" + (batch ?
"_batch" :
"")) {
257 public void run(
int nOps) {
258 setByPK(model.table_A, 1, nOps, batch);
263 new Op(
"setB0ByPK" + (batch ?
"_batch" :
"")) {
264 public void run(
int nOps) {
265 setByPK(model.table_B0, 1, nOps, batch);
// reads by primary key; "_bb" ops call getByPK_bb (ByteBuffer results),
// "_ar" ops call getByPK_ar (CommonAB_RA array results)
270 new Op(
"getAByPK_bb" + (batch ?
"_batch" :
"")) {
271 public void run(
int nOps) {
272 getByPK_bb(model.table_A, 1, nOps, batch);
277 new Op(
"getAByPK_ar" + (batch ?
"_batch" :
"")) {
278 public void run(
int nOps) {
279 getByPK_ar(model.table_A, 1, nOps, batch);
284 new Op(
"getB0ByPK_bb" + (batch ?
"_batch" :
"")) {
285 public void run(
int nOps) {
286 getByPK_bb(model.table_B0, 1, nOps, batch);
291 new Op(
"getB0ByPK_ar" + (batch ?
"_batch" :
"")) {
292 public void run(
int nOps) {
293 getByPK_ar(model.table_B0, 1, nOps, batch);
// varbinary ops for lengths l = 1, 10, 100, ... up to maxVarbinaryBytes;
// bytes[i] is asserted to have exactly length l
297 for (
int i = 0, l = 1; l <= maxVarbinaryBytes; l *= 10,
i++) {
298 final byte[] b = bytes[
i];
299 assert l == b.length;
302 new Op(
"setVarbinary" + l + (batch ?
"_batch" :
"")) {
303 public void run(
int nOps) {
304 setVarbinary(model.table_B0, 1, nOps, batch, b);
309 new Op(
"getVarbinary" + l + (batch ?
"_batch" :
"")) {
310 public void run(
int nOps) {
311 getVarbinary(model.table_B0, 1, nOps, batch, b);
// "clear" is a set with a null value
316 new Op(
"clearVarbinary" + l + (batch ?
"_batch" :
"")) {
317 public void run(
int nOps) {
318 setVarbinary(model.table_B0, 1, nOps, batch, null);
// varchar ops for lengths l = 1, 10, 100, ... up to maxVarcharChars;
// the string s is asserted to have exactly length l
323 for (
int i = 0, l = 1; l <= maxVarcharChars; l *= 10,
i++) {
325 assert l == s.length();
328 new Op(
"setVarchar" + l + (batch ?
"_batch" :
"")) {
329 public void run(
int nOps) {
330 setVarchar(model.table_B0, 1, nOps, batch, s);
335 new Op(
"getVarchar" + l + (batch ?
"_batch" :
"")) {
336 public void run(
int nOps) {
337 getVarchar(model.table_B0, 1, nOps, batch, s);
// "clear" is a set with a null value
342 new Op(
"clearVarchar" + l + (batch ?
"_batch" :
"")) {
343 public void run(
int nOps) {
344 setVarchar(model.table_B0, 1, nOps, batch, null);
// relationship ops between B0 and A
350 new Op(
"setB0->A" + (batch ?
"_batch" :
"")) {
351 public void run(
int nOps) {
352 setB0ToA(nOps, batch);
357 new Op(
"navB0->A" + (batch ?
"_batch" :
"")) {
358 public void run(
int nOps) {
359 navB0ToA(nOps, batch);
364 new Op(
"navB0->A_alt" + (batch ?
"_batch" :
"")) {
365 public void run(
int nOps) {
366 navB0ToAalt(nOps, batch);
// the A->B0 navigation ops take forceSend instead of batch and are
// named with a "_forceSend" suffix accordingly
371 new Op(
"navA->B0" + (forceSend ?
"_forceSend" :
"")) {
372 public void run(
int nOps) {
373 navAToB0(nOps, forceSend);
378 new Op(
"navA->B0_alt" + (forceSend ?
"_forceSend" :
"")) {
379 public void run(
int nOps) {
380 navAToB0alt(nOps, forceSend);
385 new Op(
"nullB0->A" + (batch ?
"_batch" :
"")) {
386 public void run(
int nOps) {
387 nullB0ToA(nOps, batch);
// deletes by primary key: B0 is registered before A
392 new Op(
"delB0ByPK" + (batch ?
"_batch" :
"")) {
393 public void run(
int nOps) {
394 delByPK(model.table_B0, 1, nOps, batch);
399 new Op(
"delAByPK" + (batch ?
"_batch" :
"")) {
400 public void run(
int nOps) {
401 delByPK(model.table_A, 1, nOps, batch);
// inserts of ids 1..nOps with the setAttrs argument true
406 new Op(
"insA_attr" + (batch ?
"_batch" :
"")) {
407 public void run(
int nOps) {
408 ins(model.table_A, 1, nOps, setAttrs, batch);
413 new Op(
"insB0_attr" + (batch ?
"_batch" :
"")) {
414 public void run(
int nOps) {
415 ins(model.table_B0, 1, nOps, setAttrs, batch);
// deletes by scan; the returned count is asserted to equal nOps
420 new Op(
"delAllB0" + (batch ?
"_batch" :
"")) {
421 public void run(
int nOps) {
422 final int count = delByScan(model.table_B0, batch);
423 assert count == nOps;
428 new Op(
"delAllA" + (batch ?
"_batch" :
"")) {
429 public void run(
int nOps) {
430 final int count = delByScan(model.table_A, batch);
431 assert count == nOps;
// report how many operations are held in ops
436 out.println(
" [Op: " + ops.size() +
"]");
/**
 * Counterpart of initOperations(): prints "closing operations ..." and
 * then " [ok]".
 * NOTE(review): any cleanup between the two messages is not among the
 * lines shown here.
 */
439 protected void closeOperations() {
440 out.print(
"closing operations ...");
443 out.println(
" [ok]");
/**
 * Starts a new transaction on ndb and stores it in tx.
 * The call passes null key data and a key length of 0.
 *
 * @throws RuntimeException if startTransaction(...) returns null; the
 *         message is ndb's NdbError rendered by toStr
 */
446 protected void beginTransaction() {
450 final ByteBuffer keyData = null;
451 final int keyLen = 0;
452 if ((tx = ndb.startTransaction(table, keyData, keyLen)) == null)
453 throw new RuntimeException(toStr(ndb.getNdbError()));
/**
 * Calls tx.execute(execType, abortOption, force) with the abort option
 * AbortOnError.
 *
 * @throws RuntimeException if execute(...) returns non-zero or the
 *         transaction's error status is not Success; the message is the
 *         transaction's NdbError rendered by toStr
 */
456 protected void executeOperations() {
459 final int abortOption =
NdbOperation.AbortOption.AbortOnError;
// both the return code and the error status are checked
461 if (tx.execute(execType, abortOption, force) != 0
462 || tx.getNdbError().status() !=
NdbError.Status.Success)
463 throw new RuntimeException(toStr(tx.getNdbError()));
/**
 * Calls tx.execute(execType, abortOption, force) with the abort option
 * AbortOnError.
 * NOTE(review): the execType value that makes this a commit is not among
 * the lines shown here — confirm.
 *
 * @throws RuntimeException if execute(...) returns non-zero or the
 *         transaction's error status is not Success; the message is the
 *         transaction's NdbError rendered by toStr
 */
466 protected void commitTransaction() {
469 final int abortOption =
NdbOperation.AbortOption.AbortOnError;
// both the return code and the error status are checked
471 if (tx.execute(execType, abortOption, force) != 0
472 || tx.getNdbError().status() !=
NdbError.Status.Success)
473 throw new RuntimeException(toStr(tx.getNdbError()));
/**
 * Calls tx.execute(execType, abortOption, force) with the abort option
 * DefaultAbortOption (unlike execute/commit, which use AbortOnError).
 * NOTE(review): the execType value that makes this a rollback is not among
 * the lines shown here — confirm.
 *
 * @throws RuntimeException if execute(...) returns non-zero or the
 *         transaction's error status is not Success; the message is the
 *         transaction's NdbError rendered by toStr
 */
476 protected void rollbackTransaction() {
479 final int abortOption =
NdbOperation.AbortOption.DefaultAbortOption;
// both the return code and the error status are checked
481 if (tx.execute(execType, abortOption, force) != 0
482 || tx.getNdbError().status() !=
NdbError.Status.Success)
483 throw new RuntimeException(toStr(tx.getNdbError()));
/**
 * Hands the current transaction tx back to ndb via closeTransaction(tx).
 */
486 protected void closeTransaction() {
490 ndb.closeTransaction(tx);
// Holder for the per-row result handles of the columns common to A and B0.
// Its members id, cint, clong, cfloat and cdouble are assigned from
// op.getValue(...) in fetchCommonAttributes(CommonAB_RA, NdbOperation) and
// read through int32_value()/int64_value()/float_value()/double_value().
// NOTE(review): presumably NdbRecAttr fields — confirm against the declarations.
496 static protected class CommonAB_RA {
/**
 * Requests the five common attributes (id, cint, clong, cfloat, cdouble)
 * on the given operation and stores the handles returned by getValue(...)
 * in the given holder.
 *
 * @param cab holder receiving the returned handles
 * @param op  operation on which the attribute reads are defined
 * @throws RuntimeException if any getValue(...) call returns null; the
 *         message is the transaction's NdbError rendered by toStr
 */
504 protected void fetchCommonAttributes(CommonAB_RA cab,
NdbOperation op) {
// no caller-supplied buffer: null is passed as the value argument
505 final ByteBuffer val = null;
506 if ((cab.id = op.getValue(model.attr_id, val)) == null)
507 throw new RuntimeException(toStr(tx.getNdbError()));
508 if ((cab.cint = op.getValue(model.attr_cint, val)) == null)
509 throw new RuntimeException(toStr(tx.getNdbError()));
510 if ((cab.clong = op.getValue(model.attr_clong, val)) == null)
511 throw new RuntimeException(toStr(tx.getNdbError()));
512 if ((cab.cfloat = op.getValue(model.attr_cfloat, val)) == null)
513 throw new RuntimeException(toStr(tx.getNdbError()));
514 if ((cab.cdouble = op.getValue(model.attr_cdouble, val)) == null)
515 throw new RuntimeException(toStr(tx.getNdbError()));
/**
 * Reads the fetched values from the holder and verifies that clong,
 * cfloat and cdouble each compare equal to cint.
 *
 * @param cab holder previously filled by fetchCommonAttributes
 * @return an int (NOTE(review): the return statement is not among the
 *         lines shown here; presumably the row's id — confirm)
 */
518 protected int verifyCommonAttributes(CommonAB_RA cab) {
519 final int id = cab.id.int32_value();
520 final int cint = cab.cint.int32_value();
// cint is the reference value for the remaining columns
522 final long clong = cab.clong.int64_value();
523 verify(clong == cint);
524 final float cfloat = cab.cfloat.float_value();
525 verify(cfloat == cint);
526 final double cdouble = cab.cdouble.double_value();
527 verify(cdouble == cint);
532 boolean setAttrs,
boolean batch) {
// Defines one insert per id i in [from, to] (both inclusive): the operation
// is marked as an insert, keyed on attr_id == i, and the four common value
// columns are set to -i in their respective types.
// NOTE(review): presumably the setValue calls are guarded by setAttrs and
// execution depends on batch — the guarding lines are not shown; confirm.
// Every failing NDB call raises a RuntimeException carrying the
// transaction's NdbError rendered by toStr.
534 for (
int i = from;
i <=
to;
i++) {
538 throw new RuntimeException(toStr(tx.getNdbError()));
539 if (op.insertTuple() != 0)
540 throw new RuntimeException(toStr(tx.getNdbError()));
// key attribute
543 if (op.equal(model.attr_id,
i) != 0)
544 throw new RuntimeException(toStr(tx.getNdbError()));
// value attributes: the negated id in each column's type
546 if (op.setValue(model.attr_cint, -
i) != 0)
547 throw new RuntimeException(toStr(tx.getNdbError()));
548 if (op.setValue(model.attr_clong, (
long)-
i) != 0)
549 throw new RuntimeException(toStr(tx.getNdbError()));
550 if (op.setValue(model.attr_cfloat, (
float)-
i) != 0)
551 throw new RuntimeException(toStr(tx.getNdbError()));
552 if (op.setValue(model.attr_cdouble, (
double)-
i) != 0)
553 throw new RuntimeException(toStr(tx.getNdbError()));
/**
 * Defines one delete per id i in [from, to] (both inclusive), each keyed
 * on attr_id == i.
 *
 * @param table table to delete from
 * @param from  first id (inclusive)
 * @param to    last id (inclusive)
 * @throws RuntimeException if an NDB call fails; the message is the
 *         transaction's NdbError rendered by toStr
 */
564 protected void delByPK(
TableConst table,
int from,
int to,
567 for (
int i = from;
i <=
to;
i++) {
571 throw new RuntimeException(toStr(tx.getNdbError()));
572 if (op.deleteTuple() != 0)
573 throw new RuntimeException(toStr(tx.getNdbError()));
// key attribute
576 if (op.equal(model.attr_id,
i) != 0)
577 throw new RuntimeException(toStr(tx.getNdbError()));
/**
 * Deletes rows of the given table through a scan: reads tuples with an
 * exclusive lock, calls deleteCurrentTuple() on results and executes the
 * transaction with AbortOnError.
 *
 * @param table table to scan
 * @param batch batching flag passed in by the callers
 * @return an int that callers treat as the number of deleted rows (they
 *         assert it equals nOps or print it); NOTE(review): the counting
 *         and return statements are not among the lines shown — confirm
 * @throws RuntimeException if an NDB call fails (message: the
 *         transaction's NdbError rendered by toStr) or with message
 *         "stat == <stat>" on the nextResult status checks below
 */
587 protected int delByScan(
TableConst table,
boolean batch) {
593 throw new RuntimeException(toStr(tx.getNdbError()));
// scan definition: exclusive lock mode; scan_flags, parallel and batch_
// are all passed as 0
596 final int lock_mode =
NdbOperation.LockMode.LM_Exclusive;
597 final int scan_flags = 0;
598 final int parallel = 0;
599 final int batch_ = 0;
600 if (op.readTuples(lock_mode, scan_flags, parallel, batch_) != 0)
601 throw new RuntimeException(toStr(tx.getNdbError()));
// outer loop: nextResult is called with allowFetch == true and continues
// while it returns 0
609 final boolean allowFetch =
true;
610 final boolean forceSend =
false;
611 while ((stat = op.nextResult(allowFetch, forceSend)) == 0) {
614 if (op.deleteCurrentTuple() != 0)
615 throw new RuntimeException(toStr(tx.getNdbError()));
// inner do-while: nextResult is called with the negated flag
// (allowFetch == false) and repeats while it returns 0
621 }
while ((stat = op.nextResult(!allowFetch, forceSend)) == 0);
// execute the transaction; return code and error status are both checked
630 final int abortOption =
NdbOperation.AbortOption.AbortOnError;
632 if (tx.execute(execType, abortOption, force) != 0
633 || tx.getNdbError().status() !=
NdbError.Status.Success)
634 throw new RuntimeException(toStr(tx.getNdbError()));
// two failure exits that report the last nextResult status
// NOTE(review): the conditions guarding them are not among the lines shown
637 throw new RuntimeException(
"stat == " + stat);
640 throw new RuntimeException(
"stat == " + stat);
// close the scan operation with forceSend_ == false and releaseOp == false
643 final boolean forceSend_ =
false;
644 final boolean releaseOp =
false;
645 op.close(forceSend_, releaseOp);
/**
 * Defines one update per id i in [from, to] (both inclusive): keyed on
 * attr_id == i, it sets cint, clong, cfloat and cdouble to i in their
 * respective types (the insert path writes -i instead).
 *
 * @param table table to update
 * @param from  first id (inclusive)
 * @param to    last id (inclusive)
 * @throws RuntimeException if an NDB call fails; the message is the
 *         transaction's NdbError rendered by toStr
 */
652 protected void setByPK(
TableConst table,
int from,
int to,
655 for (
int i = from;
i <=
to;
i++) {
659 throw new RuntimeException(toStr(tx.getNdbError()));
660 if (op.updateTuple() != 0)
661 throw new RuntimeException(toStr(tx.getNdbError()));
// key attribute
664 if (op.equal(model.attr_id,
i) != 0)
665 throw new RuntimeException(toStr(tx.getNdbError()));
// value attributes: the id itself in each column's type
668 if (op.setValue(model.attr_cint,
i) != 0)
669 throw new RuntimeException(toStr(tx.getNdbError()));
670 if (op.setValue(model.attr_clong, (
long)
i) != 0)
671 throw new RuntimeException(toStr(tx.getNdbError()));
672 if (op.setValue(model.attr_cfloat, (
float)i) != 0)
673 throw new RuntimeException(toStr(tx.getNdbError()));
674 if (op.setValue(model.attr_cdouble, (
double)i) != 0)
675 throw new RuntimeException(toStr(tx.getNdbError()));
/**
 * Requests the five common attributes on the given operation, passing the
 * given buffer to each getValue(...) call and advancing the buffer's
 * position after each one by 4, 4, 8, 4 and 8 bytes (28 bytes per row, in
 * the order id, cint, clong, cfloat, cdouble).
 *
 * @param cab buffer handed to getValue(...); its position is moved
 *            forward by 28 bytes in total
 * @param op  operation on which the attribute reads are defined
 * @throws RuntimeException if any getValue(...) call returns null; the
 *         message is the transaction's NdbError rendered by toStr
 */
685 protected void fetchCommonAttributes(ByteBuffer cab,
NdbOperation op) {
686 if (op.getValue(model.attr_id, cab) == null)
687 throw new RuntimeException(toStr(tx.getNdbError()));
// p tracks the position; each step adds the size of the attribute just requested
688 int p = cab.position();
690 cab.position(p += 4);
691 if (op.getValue(model.attr_cint, cab) == null)
692 throw new RuntimeException(toStr(tx.getNdbError()));
693 cab.position(p += 4);
694 if (op.getValue(model.attr_clong, cab) == null)
695 throw new RuntimeException(toStr(tx.getNdbError()));
696 cab.position(p += 8);
697 if (op.getValue(model.attr_cfloat, cab) == null)
698 throw new RuntimeException(toStr(tx.getNdbError()));
699 cab.position(p += 4);
700 if (op.getValue(model.attr_cdouble, cab) == null)
701 throw new RuntimeException(toStr(tx.getNdbError()));
702 cab.position(p += 8);
/**
 * Reads one row's values from the buffer at its current position in the
 * order id, cint, clong, cfloat, cdouble (28 bytes, matching the layout
 * written by fetchCommonAttributes(ByteBuffer, NdbOperation)) and verifies
 * that clong, cfloat and cdouble each compare equal to cint.
 *
 * @param cab buffer positioned at the start of a row; the relative reads
 *            advance its position past that row
 * @return an int (NOTE(review): the return statement is not among the
 *         lines shown here; presumably the row's id — confirm)
 */
705 protected int verifyCommonAttributes(ByteBuffer cab) {
706 final int id = cab.getInt();
707 final int cint = cab.getInt();
708 final long clong = cab.getLong();
709 final float cfloat = cab.getFloat();
710 final double cdouble = cab.getDouble();
// prints of the values read
// NOTE(review): whether these are guarded by a debug condition is not
// visible on the lines shown — confirm
713 out.println(
"id == " +
id);
714 out.println(
"cint == " + cint);
715 out.println(
"clong == " + clong);
716 out.println(
"cfloat == " + cfloat);
717 out.println(
"cdouble == " + cdouble);
// cint is the reference value for the remaining columns
721 verify(clong == cint);
722 verify(cfloat == cint);
723 verify(cdouble == cint);
/**
 * Reads the rows with ids in [from, to] by primary key, receiving the
 * common attributes in a single direct ByteBuffer, then verifies each row.
 *
 * @param table table to read from
 * @param from  first id (inclusive)
 * @param to    last id (inclusive)
 * @throws RuntimeException if an NDB call fails; the message is the
 *         transaction's NdbError rendered by toStr
 */
728 protected void getByPK_bb(
TableConst table,
int from,
int to,
731 final int count = (to - from) + 1;
// 28 bytes per row: the 4 + 4 + 8 + 4 + 8 byte layout used by
// fetchCommonAttributes(ByteBuffer, NdbOperation); native byte order
732 final ByteBuffer cab = ByteBuffer.allocateDirect(count * 28);
733 cab.order(ByteOrder.nativeOrder());
// pass 1: define one committed-read operation per id j
736 for (
int i = 0, j = from;
i < count;
i++, j++) {
740 throw new RuntimeException(toStr(tx.getNdbError()));
741 if (op.readTuple(
NdbOperation.LockMode.LM_CommittedRead) != 0)
742 throw new RuntimeException(toStr(tx.getNdbError()));
// key attribute
745 if (op.equal(model.attr_id, j) != 0)
746 throw new RuntimeException(toStr(tx.getNdbError()));
// request the values into the shared buffer
749 fetchCommonAttributes(cab, op);
// pass 2: verify the rows in the same order they were requested
760 for (
int i = 0, j = from;
i < count;
i++, j++) {
762 final int id1 = verifyCommonAttributes(cab);
/**
 * Reads the rows with ids in [from, to] by primary key, receiving the
 * common attributes through one CommonAB_RA holder per row, then verifies
 * each row.
 *
 * @param table table to read from
 * @param from  first id (inclusive)
 * @param to    last id (inclusive)
 * @throws RuntimeException if an NDB call fails; the message is the
 *         transaction's NdbError rendered by toStr
 */
768 protected void getByPK_ar(
TableConst table,
int from,
int to,
771 final int count = (to - from) + 1;
772 final CommonAB_RA[] cab_ra =
new CommonAB_RA[count];
// pass 1: define one committed-read operation per id j
775 for (
int i = 0, j = from;
i < count;
i++, j++) {
779 throw new RuntimeException(toStr(tx.getNdbError()));
780 if (op.readTuple(
NdbOperation.LockMode.LM_CommittedRead) != 0)
781 throw new RuntimeException(toStr(tx.getNdbError()));
// key attribute
784 if (op.equal(model.attr_id, j) != 0)
785 throw new RuntimeException(toStr(tx.getNdbError()));
// fresh holder for this row's result handles
788 final CommonAB_RA c =
new CommonAB_RA();
791 fetchCommonAttributes(c, op);
// pass 2: check that each holder's id equals the requested id j, then
// verify the remaining attributes
801 for (
int i = 0, j = from;
i < count;
i++, j++) {
803 verify(cab_ra[
i].
id.int32_value() == j);
806 final int id1 = verifyCommonAttributes(cab_ra[
i]);
// Invoked by the "setVarbinary<l>" operations with a byte array of length l
// and by the "clearVarbinary<l>" operations with null for bytes, each for
// table B0 and ids 1..nOps.
812 protected void setVarbinary(
TableConst table,
int from,
int to,
813 boolean batch, byte[] bytes) {
// Invoked by the "setVarchar<l>" operations with a string of length l and
// by the "clearVarchar<l>" operations with null for string, each for
// table B0 and ids 1..nOps.
858 protected void setVarchar(
TableConst table,
int from,
int to,
859 boolean batch,
String string) {
// Invoked by the "getVarbinary<l>" operations for table B0 and ids 1..nOps,
// passing the same byte array that the matching set operation uses.
863 protected void getVarbinary(
TableConst table,
int from,
int to,
864 boolean batch, byte[] bytes) {
// Invoked by the "getVarchar<l>" operations for table B0 and ids 1..nOps,
// passing the same string that the matching set operation uses.
868 protected void getVarchar(
TableConst table,
int from,
int to,
869 boolean batch,
String string) {
// Invoked by the "setB0->A" operation with the operation count and the
// batch flag.
873 protected void setB0ToA(
int nOps,
boolean batch) {
// Invoked by the "nullB0->A" operation with the operation count and the
// batch flag.
877 protected void nullB0ToA(
int nOps,
boolean batch) {
// Invoked by the "navB0->A" operation with the operation count and the
// batch flag.
881 protected void navB0ToA(
int nOps,
boolean batch) {
// Invoked by the "navB0->A_alt" operation with the operation count and the
// batch flag.
885 protected void navB0ToAalt(
int nOps,
boolean batch) {
// Invoked by the "navA->B0" operation with the operation count; takes a
// forceSend flag rather than a batch flag.
889 protected void navAToB0(
int nOps,
boolean forceSend) {
// Invoked by the "navA->B0_alt" operation with the operation count; takes a
// forceSend flag rather than a batch flag.
893 protected void navAToB0alt(
int nOps,
boolean forceSend) {
/**
 * Establishes the data-node connection: waits for the data nodes through
 * mgmd.wait_until_ready(...), creates and initializes the Ndb object for
 * (catalog, schema), and builds the schema Model from it.
 *
 * @throws RuntimeException if wait_until_ready(...) returns a negative
 *         value or ndb.init(...) returns non-zero (Model's constructor may
 *         throw as well)
 */
901 protected void initConnection() {
905 out.print(
"waiting for ndbd ...");
// wait arguments: 10 for the initial wait, 0 for the final wait; their
// sum is what the failure message reports as seconds
907 final int initial_wait = 10;
908 final int final_wait = 0;
// only negative results are treated as failure
910 if (mgmd.wait_until_ready(initial_wait, final_wait) < 0) {
911 final String msg = (
"data nodes were not ready within "
912 + (initial_wait + final_wait) +
"s.");
914 throw new RuntimeException(msg);
916 out.println(
" [ok]");
919 out.print(
"connecting to ndbd ...");
921 ndb =
Ndb.create(mgmd, catalog, schema);
// value passed to ndb.init(...)
922 final int max_no_tx = 10;
// NOTE(review): this message uses only the error's message(), whereas the
// other error paths in this file use toStr(...) and include the code
924 if (ndb.init(max_no_tx) != 0) {
925 String msg =
"Error caught: " + ndb.getNdbError().message();
926 throw new RuntimeException(msg);
928 out.println(
" [ok]");
// resolve tables, columns, index and attribute ids once per connection
931 model =
new Model(ndb);
/**
 * Counterpart of initConnection(): prints "closing ndbd connection ..."
 * and then " [ok]".
 * NOTE(review): the statements releasing the connection are not among the
 * lines shown here — confirm what is released.
 */
934 protected void closeConnection() {
936 out.print(
"closing ndbd connection ...");
941 out.println(
" [ok]");
/**
 * Deletes all rows of B0 and then of A by scan (batch flag true) and
 * prints the two counts returned by delByScan.
 */
944 protected void clearData() {
945 out.print(
"deleting all rows ...");
// B0 is cleared before A
947 final int delB0 = delByScan(model.table_B0,
true);
948 out.print(
" [B0: " + delB0);
950 final int delA = delByScan(model.table_A,
true);
951 out.print(
", A: " + delA);
/**
 * Command-line entry point: prints "NdbJTieLoad.main()" at the start and,
 * after a blank line, "NdbJTieLoad.main(): done." at the end.
 * NOTE(review): the statements that use args and run the load are not
 * among the lines shown here.
 *
 * @param args command-line arguments
 */
958 static public void main(
String[] args) {
959 System.out.println(
"NdbJTieLoad.main()");
962 System.out.println();
963 System.out.println(
"NdbJTieLoad.main(): done.");