MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ndb_async2.cpp
1 /*
2  Copyright (C) 2005, 2006, 2008 MySQL AB, 2009 Sun Microsystems, Inc.
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 //#define DEBUG_ON
20 
21 #include <string.h>
22 #include "userInterface.h"
23 
24 #include "macros.h"
25 #include "ndb_schema.hpp"
26 #include "ndb_error.hpp"
27 #include <NdbSleep.h>
28 
29 #include <NdbApi.hpp>
30 
/*
 * Forward declarations of the completion callbacks defined later in this
 * file.  Each start_Tn() / Tn_Callback_k() either registers the next callback
 * with executeAsynchPrepare() or, in synchronous mode, calls it directly with
 * the result of execute().  `threadData` is cast back to ThreadData* inside
 * each callback.
 */
31 void T1_Callback(int result, NdbConnection * pCon, void * threadData);
32 void T2_Callback(int result, NdbConnection * pCon, void * threadData);
33 void T3_Callback_1(int result, NdbConnection * pCon, void * threadData);
34 void T3_Callback_2(int result, NdbConnection * pCon, void * threadData);
35 void T3_Callback_3(int result, NdbConnection * pCon, void * threadData);
36 void T4_Callback_1(int result, NdbConnection * pCon, void * threadData);
37 void T4_Callback_2(int result, NdbConnection * pCon, void * threadData);
38 void T4_Callback_3(int result, NdbConnection * pCon, void * threadData);
39 void T5_Callback_1(int result, NdbConnection * pCon, void * threadData);
40 void T5_Callback_2(int result, NdbConnection * pCon, void * threadData);
41 void T5_Callback_3(int result, NdbConnection * pCon, void * threadData);
42 
/*
 * File-wide execution mode: 1 selects executeAsynchPrepare(), any other value
 * selects synchronous execute().  It is set from the `async` argument when a
 * transaction is started (see start_T3/start_T4/start_T5) and read by the
 * callbacks both to run follow-up steps and to restart a failed transaction.
 * NOTE(review): this is a single shared variable, not per-thread state.
 */
43 static int stat_async = 0;
44 
/*
 * Index into transactionData.number at which the last
 * SUBSCRIBER_NUMBER_SUFFIX_LENGTH characters (the subscriber suffix) begin.
 */
59 #define SFX_START (SUBSCRIBER_NUMBER_LENGTH - SUBSCRIBER_NUMBER_SUFFIX_LENGTH)
60 
61 inline
63 startTransaction(Ndb * pNDB, ThreadData * td){
64  return pNDB->startTransaction();
65 #ifdef OLD_CODE
66  return pNDB->startTransactionDGroup (0,
67  &td->transactionData.number[SFX_START],
68  1);
69 #endif
70 }
71 
// NdbRecord helper macros

/*
 * Set the bit for attribute `attrId` in the byte-array attribute mask `mask`:
 * byte index is attrId / 8, bit index within that byte is attrId % 8.
 * Both arguments are parenthesized so that expression arguments (e.g. a + b)
 * select the intended bit.  `attrId` is evaluated twice, so it must not have
 * side effects.
 */
#define SET_MASK(mask, attrId) \
  ((mask)[(attrId) >> 3] |= (1 << ((attrId) & 7)))
75 
76 void
77 start_T1(Ndb * pNDB, ThreadData * td, int async){
78 
79  DEBUG2("T1(%.*s): - Starting", SUBSCRIBER_NUMBER_LENGTH,
80  td->transactionData.number);
81 
82  NdbConnection * pCON = 0;
83  while((pCON = startTransaction(pNDB, td)) == 0){
84  CHECK_ALLOWED_ERROR("T1: startTransaction", td, pNDB->getNdbError());
85  NdbSleep_MilliSleep(10);
86  }
87 
88  const NdbOperation* op= NULL;
89 
90  if (td->ndbRecordSharedData)
91  {
92  char* rowPtr= (char*) &td->transactionData;
93  const NdbRecord* record= td->ndbRecordSharedData->
94  subscriberTableNdbRecord;
95  Uint32 m=0;
96  unsigned char* mask= (unsigned char*) &m;
97 
98  //SET_MASK(mask, IND_SUBSCRIBER_NUMBER);
99  SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
100  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
101  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
102 
103  op= pCON->updateTuple(record,
104  rowPtr,
105  record,
106  rowPtr,
107  mask);
108  }
109  else
110  {
111  NdbOperation *MyOp = pCON->getNdbOperation(SUBSCRIBER_TABLE);
112  op= MyOp;
113  if (MyOp != NULL) {
114  MyOp->updateTuple();
115  MyOp->equal(IND_SUBSCRIBER_NUMBER,
116  td->transactionData.number);
117  MyOp->setValue(IND_SUBSCRIBER_LOCATION,
118  (char *)&td->transactionData.location);
119  MyOp->setValue(IND_SUBSCRIBER_CHANGED_BY,
120  td->transactionData.changed_by);
121  MyOp->setValue(IND_SUBSCRIBER_CHANGED_TIME,
122  td->transactionData.changed_time);
123  }
124  }
125 
126  if (op != NULL)
127  {
128  if (async == 1) {
129  pCON->executeAsynchPrepare( Commit , T1_Callback, td);
130  } else {
131  int result = pCON->execute(Commit);
132  T1_Callback(result, pCON, (void*)td);
133  return;
134  }//if
135  } else {
136  CHECK_NULL(NULL, "T1: getNdbOperation", td, pCON->getNdbError());
137  }//if
138 }
139 
140 void
141 T1_Callback(int result, NdbConnection * pCON, void * threadData) {
142  ThreadData * td = (ThreadData *)threadData;
143 
144  DEBUG2("T1(%.*s): - Completing", SUBSCRIBER_NUMBER_LENGTH,
145  td->transactionData.number);
146 
147  if (result == -1) {
148  CHECK_ALLOWED_ERROR("T1: Commit", td, pCON->getNdbError());
149  td->pNDB->closeTransaction(pCON);
150  start_T1(td->pNDB, td, stat_async);
151  return;
152  }//if
153  td->pNDB->closeTransaction(pCON);
154  complete_T1(td);
155 }
156 
171 void
172 start_T2(Ndb * pNDB, ThreadData * td, int async){
173 
174  DEBUG3("T2(%.*s, %d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
175  td->transactionData.number,
176  td->transactionData.location);
177 
178  NdbConnection * pCON = 0;
179 
180  while((pCON = startTransaction(pNDB, td)) == 0){
181  CHECK_ALLOWED_ERROR("T2-1: startTransaction", td, pNDB->getNdbError());
182  NdbSleep_MilliSleep(10);
183  }
184 
185  if (td->ndbRecordSharedData)
186  {
187  char* rowPtr= (char*) &td->transactionData;
188  const NdbRecord* record= td->ndbRecordSharedData->
189  subscriberTableNdbRecord;
190  Uint32 m=0;
191  unsigned char* mask= (unsigned char*) &m;
192 
193  SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
194  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
195  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
196  SET_MASK(mask, IND_SUBSCRIBER_NAME);
197 
198  const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
199  NdbOperation::LM_Read, mask);
200  CHECK_NULL((void*) MyOp, "T2: readTuple", td,
201  pCON->getNdbError());
202  }
203  else
204  {
205  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
206  CHECK_NULL(MyOp, "T2: getNdbOperation", td,
207  pCON->getNdbError());
208 
209  MyOp->readTuple();
210  MyOp->equal(IND_SUBSCRIBER_NUMBER,
211  td->transactionData.number);
212  MyOp->getValue(IND_SUBSCRIBER_LOCATION,
213  (char *)&td->transactionData.location);
214  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
215  td->transactionData.changed_by);
216  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
217  td->transactionData.changed_time);
218  MyOp->getValue(IND_SUBSCRIBER_NAME,
219  td->transactionData.name);
220  }
221 
222  if (async == 1) {
223  pCON->executeAsynchPrepare( Commit , T2_Callback, td);
224  } else {
225  int result = pCON->execute(Commit);
226  T2_Callback(result, pCON, (void*)td);
227  return;
228  }//if
229 }
230 
231 void
232 T2_Callback(int result, NdbConnection * pCON, void * threadData){
233  ThreadData * td = (ThreadData *)threadData;
234  DEBUG3("T2(%.*s, %d): - Completing", SUBSCRIBER_NUMBER_LENGTH,
235  td->transactionData.number,
236  td->transactionData.location);
237 
238  if (result == -1) {
239  CHECK_ALLOWED_ERROR("T2: Commit", td, pCON->getNdbError());
240  td->pNDB->closeTransaction(pCON);
241  start_T2(td->pNDB, td, stat_async);
242  return;
243  }//if
244 
245  td->pNDB->closeTransaction(pCON);
246  complete_T2(td);
247 }
248 
266 void
267 start_T3(Ndb * pNDB, ThreadData * td, int async){
268 
269  DEBUG3("T3(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
270  td->transactionData.number,
271  td->transactionData.server_id);
272 
273  NdbConnection * pCON = 0;
274 
275  while((pCON = startTransaction(pNDB, td)) == 0){
276  CHECK_ALLOWED_ERROR("T3-1: startTransaction", td, pNDB->getNdbError());
277  NdbSleep_MilliSleep(10);
278  }
279 
280  const NdbOperation* op;
281 
282  if (td->ndbRecordSharedData)
283  {
284  char* rowPtr= (char*) &td->transactionData;
285  const NdbRecord* record= td->ndbRecordSharedData->
286  subscriberTableNdbRecord;
287  Uint32 m=0;
288  unsigned char* mask= (unsigned char*) &m;
289 
290  SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
291  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
292  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
293  SET_MASK(mask, IND_SUBSCRIBER_GROUP);
294  SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
295 
296  op= pCON->readTuple(record, rowPtr, record, rowPtr,
297  NdbOperation::LM_Read, mask);
298  }
299  else
300  {
301  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
302  op= MyOp;
303  CHECK_NULL(MyOp, "T3-1: getNdbOperation", td,
304  pCON->getNdbError());
305 
306  MyOp->readTuple();
307  MyOp->equal(IND_SUBSCRIBER_NUMBER,
308  td->transactionData.number);
309  MyOp->getValue(IND_SUBSCRIBER_LOCATION,
310  (char *)&td->transactionData.location);
311  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
312  td->transactionData.changed_by);
313  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
314  td->transactionData.changed_time);
315  MyOp->getValue(IND_SUBSCRIBER_GROUP,
316  (char *)&td->transactionData.group_id);
317  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
318  (char *)&td->transactionData.sessions);
319  }
320 
321  stat_async = async;
322  if (async == 1) {
323  pCON->executeAsynchPrepare( NoCommit , T3_Callback_1, td);
324  } else {
325  int result = pCON->execute( NoCommit );
326  T3_Callback_1(result, pCON, (void*)td);
327  return;
328  }//if
329 }
330 
331 void
332 T3_Callback_1(int result, NdbConnection * pCON, void * threadData){
333  ThreadData * td = (ThreadData *)threadData;
334  DEBUG3("T3(%.*s, %.2d): - Callback 1", SUBSCRIBER_NUMBER_LENGTH,
335  td->transactionData.number,
336  td->transactionData.server_id);
337 
338  if (result == -1) {
339  CHECK_ALLOWED_ERROR("T3-1: execute", td, pCON->getNdbError());
340  td->pNDB->closeTransaction(pCON);
341  start_T3(td->pNDB, td, stat_async);
342  return;
343  }//if
344 
345  const NdbOperation* op= NULL;
346 
347  if (td->ndbRecordSharedData)
348  {
349  char* rowPtr= (char*) &td->transactionData;
350  const NdbRecord* record= td->ndbRecordSharedData->
351  groupTableAllowReadNdbRecord;
352  Uint32 m=0;
353  unsigned char* mask= (unsigned char*) &m;
354 
355  SET_MASK(mask, IND_GROUP_ALLOW_READ);
356 
357  op= pCON->readTuple(record, rowPtr, record, rowPtr,
358  NdbOperation::LM_Read, mask);
359  }
360  else
361  {
362  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
363  op= MyOp;
364  CHECK_NULL(MyOp, "T3-2: getNdbOperation", td,
365  pCON->getNdbError());
366 
367  MyOp->readTuple();
368  MyOp->equal(IND_GROUP_ID,
369  (char*)&td->transactionData.group_id);
370  MyOp->getValue(IND_GROUP_ALLOW_READ,
371  (char *)&td->transactionData.permission);
372  }
373 
374  if (stat_async == 1) {
375  pCON->executeAsynchPrepare( NoCommit , T3_Callback_2, td);
376  } else {
377  int result = pCON->execute( NoCommit );
378  T3_Callback_2(result, pCON, (void*)td);
379  return;
380  }//if
381 }
382 
383 void
384 T3_Callback_2(int result, NdbConnection * pCON, void * threadData){
385  ThreadData * td = (ThreadData *)threadData;
386 
387  if (result == -1) {
388  CHECK_ALLOWED_ERROR("T3-2: execute", td, pCON->getNdbError());
389  td->pNDB->closeTransaction(pCON);
390  start_T3(td->pNDB, td, stat_async);
391  return;
392  }//if
393 
394  Uint32 permission = td->transactionData.permission;
395  Uint32 sessions = td->transactionData.sessions;
396  Uint32 server_bit = td->transactionData.server_bit;
397 
398  if(((permission & server_bit) == server_bit) &&
399  ((sessions & server_bit) == server_bit)){
400 
401  memcpy(td->transactionData.suffix,
402  &td->transactionData.number[SFX_START],
403  SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
404  DEBUG5("T3(%.*s, %.2d): - Callback 2 - reading(%.*s)",
405  SUBSCRIBER_NUMBER_LENGTH,
406  td->transactionData.number,
407  td->transactionData.server_id,
408  SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
409  td->transactionData.suffix);
410 
411  /* Operations 3 + 4 */
412  if (td->ndbRecordSharedData)
413  {
414  /* Op 3 */
415  char* rowPtr= (char*) &td->transactionData;
416  const NdbRecord* record= td->ndbRecordSharedData->
417  sessionTableNdbRecord;
418  Uint32 m=0;
419  unsigned char* mask= (unsigned char*) &m;
420 
421  SET_MASK(mask, IND_SESSION_DATA);
422 
423  const NdbOperation* MyOp = pCON->readTuple(record, rowPtr, record, rowPtr,
425  mask);
426  CHECK_NULL((void*) MyOp, "T3-3: readTuple", td,
427  pCON->getNdbError());
428 
429  /* Op 4 */
430  record= td->ndbRecordSharedData->
431  serverTableNdbRecord;
432  m= 0;
433 
434  /* Attach interpreted program */
436  opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
437  opts.interpretedCode= td->ndbRecordSharedData->incrServerReadsProg;
438 
439  MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
440  &opts,
441  sizeof(opts));
442  CHECK_NULL((void*) MyOp, "T3-3: updateTuple", td,
443  pCON->getNdbError());
444  }
445  else
446  {
447  NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
448  CHECK_NULL(MyOp, "T3-3: getNdbOperation", td,
449  pCON->getNdbError());
450 
451  MyOp->simpleRead();
452  MyOp->equal(IND_SESSION_SUBSCRIBER,
453  (char*)td->transactionData.number);
454  MyOp->equal(IND_SESSION_SERVER,
455  (char*)&td->transactionData.server_id);
456  MyOp->getValue(IND_SESSION_DATA,
457  (char *)td->transactionData.session_details);
458 
459  MyOp = pCON->getNdbOperation(SERVER_TABLE);
460  CHECK_NULL(MyOp, "T3-4: getNdbOperation", td,
461  pCON->getNdbError());
462 
463  MyOp->interpretedUpdateTuple();
464  MyOp->equal(IND_SERVER_ID,
465  (char*)&td->transactionData.server_id);
466  MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
467  (char*)td->transactionData.suffix);
468  MyOp->incValue(IND_SERVER_READS, (uint32)1);
469  }
470 
471  td->transactionData.branchExecuted = 1;
472  } else {
473  DEBUG3("T3(%.*s, %.2d): - Callback 2 - no read",
474  SUBSCRIBER_NUMBER_LENGTH,
475  td->transactionData.number,
476  td->transactionData.server_id);
477  td->transactionData.branchExecuted = 0;
478  }
479  if (stat_async == 1) {
480  pCON->executeAsynchPrepare( Commit , T3_Callback_3, td);
481  } else {
482  int result = pCON->execute( Commit );
483  T3_Callback_3(result, pCON, (void*)td);
484  return;
485  }//if
486 }
487 
488 void
489 T3_Callback_3(int result, NdbConnection * pCON, void * threadData){
490  ThreadData * td = (ThreadData *)threadData;
491  DEBUG3("T3(%.*s, %.2d): - Completing", SUBSCRIBER_NUMBER_LENGTH,
492  td->transactionData.number,
493  td->transactionData.server_id);
494 
495  if (result == -1) {
496  CHECK_ALLOWED_ERROR("T3-3: Commit", td, pCON->getNdbError());
497  td->pNDB->closeTransaction(pCON);
498  start_T3(td->pNDB, td, stat_async);
499  return;
500  }//if
501 
502  td->pNDB->closeTransaction(pCON);
503  complete_T3(td);
504 }
505 
523 void
524 start_T4(Ndb * pNDB, ThreadData * td, int async){
525 
526  DEBUG3("T4(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
527  td->transactionData.number,
528  td->transactionData.server_id);
529 
530  NdbConnection * pCON = 0;
531  while((pCON = startTransaction(pNDB, td)) == 0){
532  CHECK_ALLOWED_ERROR("T4-1: startTransaction", td, pNDB->getNdbError());
533  NdbSleep_MilliSleep(10);
534  }
535 
536  if (td->ndbRecordSharedData)
537  {
538  char* rowPtr= (char*) &td->transactionData;
539  const NdbRecord* record= td->ndbRecordSharedData->
540  subscriberTableNdbRecord;
541  Uint32 m=0;
542  unsigned char* mask= (unsigned char*) &m;
543 
544  SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
545  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
546  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
547  SET_MASK(mask, IND_SUBSCRIBER_GROUP);
548  SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
549 
550  const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
552  mask);
553  CHECK_NULL((void*)MyOp, "T4-1: readTuple", td,
554  pCON->getNdbError());
555 
556  m= 0;
557 
558  /* Create program to add something to the subscriber
559  * sessions column
560  */
561  Uint32 codeBuf[20];
562 
563  for (Uint32 p=0; p<20; p++)
564  codeBuf[p]= 0;
565 
566  NdbInterpretedCode program(pNDB->getDictionary()->
567  getTable(SUBSCRIBER_TABLE),
568  codeBuf,
569  20);
570 
571  if (program.add_val(IND_SUBSCRIBER_SESSIONS,
572  (uint32)td->transactionData.server_bit) ||
573  program.interpret_exit_ok() ||
574  program.finalise())
575  {
576  CHECK_NULL(NULL , "T4-1: Program create failed", td,
577  program.getNdbError());
578  }
579 
581  opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
582  opts.interpretedCode= &program;
583 
584  MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
585  mask,
586  &opts,
587  sizeof(opts));
588  CHECK_NULL((void*)MyOp, "T4-1: updateTuple", td,
589  pCON->getNdbError());
590 
591  }
592  else
593  {
594  /* Use old Api */
595  if (td->useCombinedUpdate)
596  {
597  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
598  CHECK_NULL(MyOp, "T4-1: getNdbOperation", td,
599  pCON->getNdbError());
600 
601  MyOp->interpretedUpdateTuple();
602  MyOp->equal(IND_SUBSCRIBER_NUMBER,
603  td->transactionData.number);
604  MyOp->getValue(IND_SUBSCRIBER_LOCATION,
605  (char *)&td->transactionData.location);
606  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
607  td->transactionData.changed_by);
608  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
609  td->transactionData.changed_time);
610  MyOp->getValue(IND_SUBSCRIBER_GROUP,
611  (char *)&td->transactionData.group_id);
612  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
613  (char *)&td->transactionData.sessions);
614  MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
615  (uint32)td->transactionData.server_bit);
616  }
617  else
618  {
619  /* Separate read op + update op
620  * Relies on relative ordering of operation execution on a single
621  * row
622  */
623  NdbOperation *MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
624  CHECK_NULL(MyOp, "T4-1: getNdbOperation (read)", td,
625  pCON->getNdbError());
626  MyOp->readTuple();
627  MyOp->equal(IND_SUBSCRIBER_NUMBER,
628  td->transactionData.number);
629  MyOp->getValue(IND_SUBSCRIBER_LOCATION,
630  (char *)&td->transactionData.location);
631  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
632  td->transactionData.changed_by);
633  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
634  td->transactionData.changed_time);
635  MyOp->getValue(IND_SUBSCRIBER_GROUP,
636  (char *)&td->transactionData.group_id);
637  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
638  (char *)&td->transactionData.sessions);
639  MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
640  CHECK_NULL(MyOp, "T4-1: getNdbOperation (update)", td,
641  pCON->getNdbError());
642  MyOp->interpretedUpdateTuple();
643  MyOp->equal(IND_SUBSCRIBER_NUMBER,
644  td->transactionData.number);
645  MyOp->incValue(IND_SUBSCRIBER_SESSIONS,
646  (uint32)td->transactionData.server_bit);
647  }
648  }
649  stat_async = async;
650  if (async == 1) {
651  pCON->executeAsynchPrepare( NoCommit , T4_Callback_1, td);
652  } else {
653  int result = pCON->execute( NoCommit );
654  T4_Callback_1(result, pCON, (void*)td);
655  return;
656  }//if
657 }
658 
659 void
660 T4_Callback_1(int result, NdbConnection * pCON, void * threadData){
661  ThreadData * td = (ThreadData *)threadData;
662  if (result == -1) {
663  CHECK_ALLOWED_ERROR("T4-1: execute", td, pCON->getNdbError());
664  td->pNDB->closeTransaction(pCON);
665  start_T4(td->pNDB, td, stat_async);
666  return;
667  }//if
668 
669  DEBUG3("T4(%.*s, %.2d): - Callback 1",
670  SUBSCRIBER_NUMBER_LENGTH,
671  td->transactionData.number,
672  td->transactionData.server_id);
673 
674 
675  if (td->ndbRecordSharedData)
676  {
677  char* rowPtr= (char*) &td->transactionData;
678  const NdbRecord* record= td->ndbRecordSharedData->
679  groupTableAllowInsertNdbRecord;
680  Uint32 m=0;
681  unsigned char* mask= (unsigned char*) &m;
682 
683  SET_MASK(mask, IND_GROUP_ALLOW_INSERT);
684 
685  const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
687  mask);
688 
689  CHECK_NULL((void*)MyOp, "T4-2: readTuple", td,
690  pCON->getNdbError());
691  }
692  else
693  {
694  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
695  CHECK_NULL(MyOp, "T4-2: getNdbOperation", td,
696  pCON->getNdbError());
697 
698  MyOp->readTuple();
699  MyOp->equal(IND_GROUP_ID,
700  (char*)&td->transactionData.group_id);
701  MyOp->getValue(IND_GROUP_ALLOW_INSERT,
702  (char *)&td->transactionData.permission);
703  }
704  if (stat_async == 1) {
705  pCON->executeAsynchPrepare( NoCommit , T4_Callback_2, td);
706  } else {
707  int result = pCON->execute( NoCommit );
708  T4_Callback_2(result, pCON, (void*)td);
709  return;
710  }//if
711 }
712 
713 void
714 T4_Callback_2(int result, NdbConnection * pCON, void * threadData){
715  ThreadData * td = (ThreadData *)threadData;
716  if (result == -1) {
717  CHECK_ALLOWED_ERROR("T4-2: execute", td, pCON->getNdbError());
718  td->pNDB->closeTransaction(pCON);
719  start_T4(td->pNDB, td, stat_async);
720  return;
721  }//if
722 
723  Uint32 permission = td->transactionData.permission;
724  Uint32 sessions = td->transactionData.sessions;
725  Uint32 server_bit = td->transactionData.server_bit;
726 
727  if(((permission & server_bit) == server_bit) &&
728  ((sessions & server_bit) == 0)){
729 
730  memcpy(td->transactionData.suffix,
731  &td->transactionData.number[SFX_START],
732  SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
733 
734  DEBUG5("T4(%.*s, %.2d): - Callback 2 - inserting(%.*s)",
735  SUBSCRIBER_NUMBER_LENGTH,
736  td->transactionData.number,
737  td->transactionData.server_id,
738  SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
739  td->transactionData.suffix);
740 
741  /* Operations 3 + 4 */
742 
743  if (td->ndbRecordSharedData)
744  {
745  char* rowPtr= (char*) &td->transactionData;
746  const NdbRecord* record= td->ndbRecordSharedData->
747  sessionTableNdbRecord;
748  Uint32 m=0;
749  unsigned char* mask= (unsigned char*) &m;
750 
751  SET_MASK(mask, IND_SESSION_SUBSCRIBER);
752  SET_MASK(mask, IND_SESSION_SERVER);
753  SET_MASK(mask, IND_SESSION_DATA);
754 
755  const NdbOperation* MyOp= pCON->insertTuple(record, rowPtr, mask);
756 
757  CHECK_NULL((void*)MyOp, "T4-3: insertTuple", td,
758  pCON->getNdbError());
759 
760  record= td->ndbRecordSharedData->
761  serverTableNdbRecord;
762  m= 0;
763 
765  opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
766  opts.interpretedCode= td->ndbRecordSharedData->incrServerInsertsProg;
767 
768  MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
769  &opts, sizeof(opts));
770 
771  CHECK_NULL((void*)MyOp, "T4-3: updateTuple", td,
772  pCON->getNdbError());
773  }
774  else
775  {
776  NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
777  CHECK_NULL(MyOp, "T4-3: getNdbOperation", td,
778  pCON->getNdbError());
779 
780  MyOp->insertTuple();
781  MyOp->equal(IND_SESSION_SUBSCRIBER,
782  (char*)td->transactionData.number);
783  MyOp->equal(IND_SESSION_SERVER,
784  (char*)&td->transactionData.server_id);
785  MyOp->setValue(IND_SESSION_DATA,
786  (char *)td->transactionData.session_details);
787  /* Operation 4 */
788 
789  /* Operation 5 */
790  MyOp = pCON->getNdbOperation(SERVER_TABLE);
791  CHECK_NULL(MyOp, "T4-5: getNdbOperation", td,
792  pCON->getNdbError());
793 
794  MyOp->interpretedUpdateTuple();
795  MyOp->equal(IND_SERVER_ID,
796  (char*)&td->transactionData.server_id);
797  MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
798  (char*)td->transactionData.suffix);
799  MyOp->incValue(IND_SERVER_INSERTS, (uint32)1);
800  }
801  td->transactionData.branchExecuted = 1;
802  } else {
803  td->transactionData.branchExecuted = 0;
804  DEBUG5("T4(%.*s, %.2d): - Callback 2 - %s %s",
805  SUBSCRIBER_NUMBER_LENGTH,
806  td->transactionData.number,
807  td->transactionData.server_id,
808  ((permission & server_bit) ?
809  "permission - " : "no permission - "),
810  ((sessions & server_bit) ?
811  "in session - " : "no in session - "));
812  }
813 
814  if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
815  if (stat_async == 1) {
816  pCON->executeAsynchPrepare( Commit , T4_Callback_3, td);
817  } else {
818  int result = pCON->execute( Commit );
819  T4_Callback_3(result, pCON, (void*)td);
820  return;
821  }//if
822  } else {
823  if (stat_async == 1) {
824  pCON->executeAsynchPrepare( Rollback , T4_Callback_3, td);
825  } else {
826  int result = pCON->execute( Rollback );
827  T4_Callback_3(result, pCON, (void*)td);
828  return;
829  }//if
830  }
831 }
832 
833 void
834 T4_Callback_3(int result, NdbConnection * pCON, void * threadData){
835  ThreadData * td = (ThreadData *)threadData;
836  if (result == -1) {
837  CHECK_ALLOWED_ERROR("T4-3: Commit", td, pCON->getNdbError());
838  td->pNDB->closeTransaction(pCON);
839  start_T4(td->pNDB, td, stat_async);
840  return;
841  }//if
842 
843  DEBUG3("T4(%.*s, %.2d): - Completing",
844  SUBSCRIBER_NUMBER_LENGTH,
845  td->transactionData.number,
846  td->transactionData.server_id);
847 
848  td->pNDB->closeTransaction(pCON);
849  complete_T4(td);
850 }
851 
868 void
869 start_T5(Ndb * pNDB, ThreadData * td, int async){
870 
871  DEBUG3("T5(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
872  td->transactionData.number,
873  td->transactionData.server_id);
874 
875  NdbConnection * pCON = 0;
876  while((pCON = startTransaction(pNDB, td)) == 0){
877  CHECK_ALLOWED_ERROR("T5-1: startTransaction", td, pNDB->getNdbError());
878  NdbSleep_MilliSleep(10);
879  }
880 
881  if (td->ndbRecordSharedData)
882  {
883  char* rowPtr= (char*) &td->transactionData;
884  const NdbRecord* record= td->ndbRecordSharedData->
885  subscriberTableNdbRecord;
886  Uint32 m=0;
887  unsigned char* mask= (unsigned char*) &m;
888 
889  SET_MASK(mask, IND_SUBSCRIBER_LOCATION);
890  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_BY);
891  SET_MASK(mask, IND_SUBSCRIBER_CHANGED_TIME);
892  SET_MASK(mask, IND_SUBSCRIBER_GROUP);
893  SET_MASK(mask, IND_SUBSCRIBER_SESSIONS);
894 
895  const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
897  mask);
898  CHECK_NULL((void*)MyOp, "T5-1: readTuple", td,
899  pCON->getNdbError());
900 
901  m= 0;
902 
903  /* Create program to subtract something from the
904  * subscriber sessions column
905  */
906  Uint32 codeBuf[20];
907  NdbInterpretedCode program(pNDB->getDictionary()->
908  getTable(SUBSCRIBER_TABLE),
909  codeBuf,
910  20);
911  if (program.sub_val(IND_SUBSCRIBER_SESSIONS,
912  (uint32)td->transactionData.server_bit) ||
913  program.interpret_exit_ok() ||
914  program.finalise())
915  {
916  CHECK_NULL(NULL , "T5: Program create failed", td,
917  program.getNdbError());
918  }
920  opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
921  opts.interpretedCode= &program;
922 
923  MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr,
924  mask,
925  &opts,
926  sizeof(opts));
927  CHECK_NULL((void*)MyOp, "T5-1: updateTuple", td,
928  pCON->getNdbError());
929  }
930  else
931  {
932  /* Use old Api */
933  if (td->useCombinedUpdate)
934  {
935  NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
936  CHECK_NULL(MyOp, "T5-1: getNdbOperation", td,
937  pCON->getNdbError());
938 
939  MyOp->interpretedUpdateTuple();
940  MyOp->equal(IND_SUBSCRIBER_NUMBER,
941  td->transactionData.number);
942  MyOp->getValue(IND_SUBSCRIBER_LOCATION,
943  (char *)&td->transactionData.location);
944  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
945  td->transactionData.changed_by);
946  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
947  td->transactionData.changed_time);
948  MyOp->getValue(IND_SUBSCRIBER_GROUP,
949  (char *)&td->transactionData.group_id);
950  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
951  (char *)&td->transactionData.sessions);
952  MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
953  (uint32)td->transactionData.server_bit);
954  }
955  else
956  {
957  /* Use separate read and update operations
958  * This relies on execution ordering between operations on
959  * the same row
960  */
961  NdbOperation * MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
962  CHECK_NULL(MyOp, "T5-1: getNdbOperation (readTuple)", td,
963  pCON->getNdbError());
964  MyOp->readTuple();
965  MyOp->equal(IND_SUBSCRIBER_NUMBER,
966  td->transactionData.number);
967  MyOp->getValue(IND_SUBSCRIBER_LOCATION,
968  (char *)&td->transactionData.location);
969  MyOp->getValue(IND_SUBSCRIBER_CHANGED_BY,
970  td->transactionData.changed_by);
971  MyOp->getValue(IND_SUBSCRIBER_CHANGED_TIME,
972  td->transactionData.changed_time);
973  MyOp->getValue(IND_SUBSCRIBER_GROUP,
974  (char *)&td->transactionData.group_id);
975  MyOp->getValue(IND_SUBSCRIBER_SESSIONS,
976  (char *)&td->transactionData.sessions);
977 
978  MyOp= pCON->getNdbOperation(SUBSCRIBER_TABLE);
979  CHECK_NULL(MyOp, "T5-1: getNdbOperation (updateTuple)", td,
980  pCON->getNdbError());
981  MyOp->interpretedUpdateTuple();
982  MyOp->equal(IND_SUBSCRIBER_NUMBER,
983  td->transactionData.number);
984  MyOp->subValue(IND_SUBSCRIBER_SESSIONS,
985  (uint32)td->transactionData.server_bit);
986  }
987  }
988  stat_async = async;
989  if (async == 1) {
990  pCON->executeAsynchPrepare( NoCommit , T5_Callback_1, td);
991  } else {
992  int result = pCON->execute( NoCommit );
993  T5_Callback_1(result, pCON, (void*)td);
994  return;
995  }//if
996 }
997 
998 void
999 T5_Callback_1(int result, NdbConnection * pCON, void * threadData){
1000  ThreadData * td = (ThreadData *)threadData;
1001  if (result == -1) {
1002  CHECK_ALLOWED_ERROR("T5-1: execute", td, pCON->getNdbError());
1003  td->pNDB->closeTransaction(pCON);
1004  start_T5(td->pNDB, td, stat_async);
1005  return;
1006  }//if
1007 
1008  DEBUG3("T5(%.*s, %.2d): - Callback 1",
1009  SUBSCRIBER_NUMBER_LENGTH,
1010  td->transactionData.number,
1011  td->transactionData.server_id);
1012 
1013  if (td->ndbRecordSharedData)
1014  {
1015  char* rowPtr= (char*) &td->transactionData;
1016  const NdbRecord* record= td->ndbRecordSharedData->
1017  groupTableAllowDeleteNdbRecord;
1018  Uint32 m=0;
1019  unsigned char* mask= (unsigned char*) &m;
1020 
1021  SET_MASK(mask, IND_GROUP_ALLOW_DELETE);
1022 
1023  const NdbOperation* MyOp= pCON->readTuple(record, rowPtr, record, rowPtr,
1025  mask);
1026 
1027  CHECK_NULL((void*)MyOp, "T5-2: readTuple", td,
1028  pCON->getNdbError());
1029  }
1030  else
1031  {
1032  NdbOperation * MyOp = pCON->getNdbOperation(GROUP_TABLE);
1033  CHECK_NULL(MyOp, "T5-2: getNdbOperation", td,
1034  pCON->getNdbError());
1035 
1036  MyOp->readTuple();
1037  MyOp->equal(IND_GROUP_ID,
1038  (char*)&td->transactionData.group_id);
1039  MyOp->getValue(IND_GROUP_ALLOW_DELETE,
1040  (char *)&td->transactionData.permission);
1041  }
1042 
1043  if (stat_async == 1) {
1044  pCON->executeAsynchPrepare( NoCommit , T5_Callback_2, td);
1045  } else {
1046  int result = pCON->execute( NoCommit );
1047  T5_Callback_2(result, pCON, (void*)td);
1048  return;
1049  }//if
1050 }
1051 
1052 void
1053 T5_Callback_2(int result, NdbConnection * pCON, void * threadData){
1054  ThreadData * td = (ThreadData *)threadData;
1055  if (result == -1) {
1056  CHECK_ALLOWED_ERROR("T5-2: execute", td, pCON->getNdbError());
1057  td->pNDB->closeTransaction(pCON);
1058  start_T5(td->pNDB, td, stat_async);
1059  return;
1060  }//if
1061 
1062  Uint32 permission = td->transactionData.permission;
1063  Uint32 sessions = td->transactionData.sessions;
1064  Uint32 server_bit = td->transactionData.server_bit;
1065 
1066  if(((permission & server_bit) == server_bit) &&
1067  ((sessions & server_bit) == server_bit)){
1068 
1069  memcpy(td->transactionData.suffix,
1070  &td->transactionData.number[SFX_START],
1071  SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
1072 
1073  DEBUG5("T5(%.*s, %.2d): - Callback 2 - deleting(%.*s)",
1074  SUBSCRIBER_NUMBER_LENGTH,
1075  td->transactionData.number,
1076  td->transactionData.server_id,
1077  SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
1078  td->transactionData.suffix);
1079 
1080  if (td->ndbRecordSharedData)
1081  {
1082  char* rowPtr= (char*) &td->transactionData;
1083  const NdbRecord* record= td->ndbRecordSharedData->
1084  sessionTableNdbRecord;
1085  Uint32 m=0;
1086  unsigned char* mask= (unsigned char*) &m;
1087 
1088  const NdbOperation* MyOp= pCON->deleteTuple(record, rowPtr, record);
1089  CHECK_NULL((void*) MyOp, "T5-3: deleteTuple", td,
1090  pCON->getNdbError());
1091 
1092  record= td->ndbRecordSharedData->
1093  serverTableNdbRecord;
1094  m= 0;
1095 
1097  opts.optionsPresent= NdbOperation::OperationOptions::OO_INTERPRETED;
1098  opts.interpretedCode= td->ndbRecordSharedData->incrServerDeletesProg;
1099 
1100  MyOp= pCON->updateTuple(record, rowPtr, record, rowPtr, mask,
1101  &opts, sizeof(opts));
1102 
1103  CHECK_NULL((void*)MyOp, "T5-2: updateTuple", td,
1104  pCON->getNdbError());
1105  }
1106  else
1107  {
1108  /* Operation 3 */
1109  NdbOperation * MyOp = pCON->getNdbOperation(SESSION_TABLE);
1110  CHECK_NULL(MyOp, "T5-3: getNdbOperation", td,
1111  pCON->getNdbError());
1112 
1113  MyOp->deleteTuple();
1114  MyOp->equal(IND_SESSION_SUBSCRIBER,
1115  (char*)td->transactionData.number);
1116  MyOp->equal(IND_SESSION_SERVER,
1117  (char*)&td->transactionData.server_id);
1118  /* Operation 4 */
1119 
1120  /* Operation 5 */
1121  MyOp = pCON->getNdbOperation(SERVER_TABLE);
1122  CHECK_NULL(MyOp, "T5-5: getNdbOperation", td,
1123  pCON->getNdbError());
1124 
1125  MyOp->interpretedUpdateTuple();
1126  MyOp->equal(IND_SERVER_ID,
1127  (char*)&td->transactionData.server_id);
1128  MyOp->equal(IND_SERVER_SUBSCRIBER_SUFFIX,
1129  (char*)td->transactionData.suffix);
1130  MyOp->incValue(IND_SERVER_DELETES, (uint32)1);
1131  }
1132  td->transactionData.branchExecuted = 1;
1133  } else {
1134  td->transactionData.branchExecuted = 0;
1135 
1136  DEBUG5("T5(%.*s, %.2d): - Callback 2 - no delete - %s %s",
1137  SUBSCRIBER_NUMBER_LENGTH,
1138  td->transactionData.number,
1139  td->transactionData.server_id,
1140  ((permission & server_bit) ?
1141  "permission - " : "no permission - "),
1142  ((sessions & server_bit) ?
1143  "in session - " : "no in session - "));
1144  }
1145 
1146  if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
1147  if (stat_async == 1) {
1148  pCON->executeAsynchPrepare( Commit , T5_Callback_3, td);
1149  } else {
1150  int result = pCON->execute( Commit );
1151  T5_Callback_3(result, pCON, (void*)td);
1152  return;
1153  }//if
1154  } else {
1155  if (stat_async == 1) {
1156  pCON->executeAsynchPrepare( Rollback , T5_Callback_3, td);
1157  } else {
1158  int result = pCON->execute( Rollback );
1159  T5_Callback_3(result, pCON, (void*)td);
1160  return;
1161  }//if
1162  }
1163 }
1164 
1165 void
1166 T5_Callback_3(int result, NdbConnection * pCON, void * threadData){
1167  ThreadData * td = (ThreadData *)threadData;
1168  if (result == -1) {
1169  CHECK_ALLOWED_ERROR("T5-3: Commit", td, pCON->getNdbError());
1170  td->pNDB->closeTransaction(pCON);
1171  start_T5(td->pNDB, td, stat_async);
1172  return;
1173  }//if
1174 
1175  DEBUG3("T5(%.*s, %.2d): - Completing",
1176  SUBSCRIBER_NUMBER_LENGTH,
1177  td->transactionData.number,
1178  td->transactionData.server_id);
1179 
1180  td->pNDB->closeTransaction(pCON);
1181  complete_T5(td);
1182 }