22 #include "userInterface.h"
25 #include "ndb_schema.hpp"
26 #include "ndb_error.hpp"
31 void T1_Callback(
int result,
NdbConnection * pCon,
void * threadData);
32 void T2_Callback(
int result,
NdbConnection * pCon,
void * threadData);
33 void T3_Callback_1(
int result,
NdbConnection * pCon,
void * threadData);
34 void T3_Callback_2(
int result,
NdbConnection * pCon,
void * threadData);
35 void T3_Callback_3(
int result,
NdbConnection * pCon,
void * threadData);
36 void T4_Callback_1(
int result,
NdbConnection * pCon,
void * threadData);
37 void T4_Callback_2(
int result,
NdbConnection * pCon,
void * threadData);
38 void T4_Callback_3(
int result,
NdbConnection * pCon,
void * threadData);
39 void T5_Callback_1(
int result,
NdbConnection * pCon,
void * threadData);
40 void T5_Callback_2(
int result,
NdbConnection * pCon,
void * threadData);
41 void T5_Callback_3(
int result,
NdbConnection * pCon,
void * threadData);
43 static int stat_async = 0;
59 #define SFX_START (SUBSCRIBER_NUMBER_LENGTH - SUBSCRIBER_NUMBER_SUFFIX_LENGTH)
64 return pNDB->startTransactionDGroup (0,
65 &td->transactionData.number[SFX_START],
72 DEBUG2(
"T1(%.*s): - Starting", SUBSCRIBER_NUMBER_LENGTH,
73 td->transactionData.number);
76 while((pCON = startTransaction(pNDB, td)) == 0){
77 CHECK_ALLOWED_ERROR(
"T1: startTransaction", td, pNDB->
getNdbError());
78 NdbSleep_MilliSleep(10);
84 MyOp->
equal(IND_SUBSCRIBER_NUMBER,
85 td->transactionData.number);
86 MyOp->
setValue(IND_SUBSCRIBER_LOCATION,
87 (
char *)&td->transactionData.location);
88 MyOp->
setValue(IND_SUBSCRIBER_CHANGED_BY,
89 td->transactionData.changed_by);
90 MyOp->
setValue(IND_SUBSCRIBER_CHANGED_TIME,
91 td->transactionData.changed_time);
95 int result = pCON->
execute(Commit);
96 T1_Callback(result, pCON, (
void*)td);
100 CHECK_NULL(MyOp,
"T1: getNdbOperation", td, pCON->
getNdbError());
105 T1_Callback(
int result,
NdbConnection * pCON,
void * threadData) {
108 DEBUG2(
"T1(%.*s): - Completing", SUBSCRIBER_NUMBER_LENGTH,
109 td->transactionData.number);
112 CHECK_ALLOWED_ERROR(
"T1: Commit", td, pCON->
getNdbError());
114 start_T1(td->pNDB, td, stat_async);
138 DEBUG3(
"T2(%.*s, %d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
139 td->transactionData.number,
140 td->transactionData.location);
144 while((pCON = startTransaction(pNDB, td)) == 0){
145 CHECK_ALLOWED_ERROR(
"T2-1: startTransaction", td, pNDB->
getNdbError());
146 NdbSleep_MilliSleep(10);
150 CHECK_NULL(MyOp,
"T2: getNdbOperation", td,
154 MyOp->
equal(IND_SUBSCRIBER_NUMBER,
155 td->transactionData.number);
156 MyOp->
getValue(IND_SUBSCRIBER_LOCATION,
157 (
char *)&td->transactionData.location);
158 MyOp->
getValue(IND_SUBSCRIBER_CHANGED_BY,
159 td->transactionData.changed_by);
160 MyOp->
getValue(IND_SUBSCRIBER_CHANGED_TIME,
161 td->transactionData.changed_time);
163 td->transactionData.name);
167 int result = pCON->
execute(Commit);
168 T2_Callback(result, pCON, (
void*)td);
174 T2_Callback(
int result,
NdbConnection * pCON,
void * threadData){
176 DEBUG3(
"T2(%.*s, %d): - Completing", SUBSCRIBER_NUMBER_LENGTH,
177 td->transactionData.number,
178 td->transactionData.location);
181 CHECK_ALLOWED_ERROR(
"T2: Commit", td, pCON->
getNdbError());
183 start_T2(td->pNDB, td, stat_async);
210 DEBUG3(
"T3(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
211 td->transactionData.number,
212 td->transactionData.server_id);
216 while((pCON = startTransaction(pNDB, td)) == 0){
217 CHECK_ALLOWED_ERROR(
"T3-1: startTransaction", td, pNDB->
getNdbError());
218 NdbSleep_MilliSleep(10);
222 CHECK_NULL(MyOp,
"T3-1: getNdbOperation", td,
226 MyOp->
equal(IND_SUBSCRIBER_NUMBER,
227 td->transactionData.number);
228 MyOp->
getValue(IND_SUBSCRIBER_LOCATION,
229 (
char *)&td->transactionData.location);
230 MyOp->
getValue(IND_SUBSCRIBER_CHANGED_BY,
231 td->transactionData.changed_by);
232 MyOp->
getValue(IND_SUBSCRIBER_CHANGED_TIME,
233 td->transactionData.changed_time);
234 MyOp->
getValue(IND_SUBSCRIBER_GROUP,
235 (
char *)&td->transactionData.group_id);
236 MyOp->
getValue(IND_SUBSCRIBER_SESSIONS,
237 (
char *)&td->transactionData.sessions);
242 int result = pCON->
execute( NoCommit );
243 T3_Callback_1(result, pCON, (
void*)td);
249 T3_Callback_1(
int result,
NdbConnection * pCON,
void * threadData){
251 DEBUG3(
"T3(%.*s, %.2d): - Callback 1", SUBSCRIBER_NUMBER_LENGTH,
252 td->transactionData.number,
253 td->transactionData.server_id);
256 CHECK_ALLOWED_ERROR(
"T3-1: execute", td, pCON->
getNdbError());
258 start_T3(td->pNDB, td, stat_async);
263 CHECK_NULL(MyOp,
"T3-2: getNdbOperation", td,
267 MyOp->
equal(IND_GROUP_ID,
268 (
char*)&td->transactionData.group_id);
269 MyOp->
getValue(IND_GROUP_ALLOW_READ,
270 (
char *)&td->transactionData.permission);
271 if (stat_async == 1) {
274 int result = pCON->
execute( NoCommit );
275 T3_Callback_2(result, pCON, (
void*)td);
281 T3_Callback_2(
int result,
NdbConnection * pCON,
void * threadData){
285 CHECK_ALLOWED_ERROR(
"T3-2: execute", td, pCON->
getNdbError());
287 start_T3(td->pNDB, td, stat_async);
291 Uint32 permission = td->transactionData.permission;
292 Uint32 sessions = td->transactionData.sessions;
293 Uint32 server_bit = td->transactionData.server_bit;
295 if(((permission & server_bit) == server_bit) &&
296 ((sessions & server_bit) == server_bit)){
298 memcpy(td->transactionData.suffix,
299 &td->transactionData.number[SFX_START],
300 SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
301 DEBUG5(
"T3(%.*s, %.2d): - Callback 2 - reading(%.*s)",
302 SUBSCRIBER_NUMBER_LENGTH,
303 td->transactionData.number,
304 td->transactionData.server_id,
305 SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
306 td->transactionData.suffix);
310 CHECK_NULL(MyOp,
"T3-3: getNdbOperation", td,
314 MyOp->
equal(IND_SESSION_SUBSCRIBER,
315 (
char*)td->transactionData.number);
316 MyOp->
equal(IND_SESSION_SERVER,
317 (
char*)&td->transactionData.server_id);
319 (
char *)td->transactionData.session_details);
323 CHECK_NULL(MyOp,
"T3-4: getNdbOperation", td,
327 MyOp->
equal(IND_SERVER_ID,
328 (
char*)&td->transactionData.server_id);
329 MyOp->
equal(IND_SERVER_SUBSCRIBER_SUFFIX,
330 (
char*)td->transactionData.suffix);
331 MyOp->
incValue(IND_SERVER_READS, (uint32)1);
332 td->transactionData.branchExecuted = 1;
334 DEBUG3(
"T3(%.*s, %.2d): - Callback 2 - no read",
335 SUBSCRIBER_NUMBER_LENGTH,
336 td->transactionData.number,
337 td->transactionData.server_id);
338 td->transactionData.branchExecuted = 0;
340 if (stat_async == 1) {
343 int result = pCON->
execute( Commit );
344 T3_Callback_3(result, pCON, (
void*)td);
350 T3_Callback_3(
int result,
NdbConnection * pCON,
void * threadData){
352 DEBUG3(
"T3(%.*s, %.2d): - Completing", SUBSCRIBER_NUMBER_LENGTH,
353 td->transactionData.number,
354 td->transactionData.server_id);
357 CHECK_ALLOWED_ERROR(
"T3-3: Commit", td, pCON->
getNdbError());
359 start_T3(td->pNDB, td, stat_async);
386 DEBUG3(
"T4(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
387 td->transactionData.number,
388 td->transactionData.server_id);
391 while((pCON = startTransaction(pNDB, td)) == 0){
392 CHECK_ALLOWED_ERROR(
"T4-1: startTransaction", td, pNDB->
getNdbError());
393 NdbSleep_MilliSleep(10);
397 CHECK_NULL(MyOp,
"T4-1: getNdbOperation", td,
401 MyOp->
equal(IND_SUBSCRIBER_NUMBER,
402 td->transactionData.number);
403 MyOp->
getValue(IND_SUBSCRIBER_LOCATION,
404 (
char *)&td->transactionData.location);
405 MyOp->
getValue(IND_SUBSCRIBER_CHANGED_BY,
406 td->transactionData.changed_by);
407 MyOp->
getValue(IND_SUBSCRIBER_CHANGED_TIME,
408 td->transactionData.changed_time);
409 MyOp->
getValue(IND_SUBSCRIBER_GROUP,
410 (
char *)&td->transactionData.group_id);
411 MyOp->
getValue(IND_SUBSCRIBER_SESSIONS,
412 (
char *)&td->transactionData.sessions);
413 MyOp->
incValue(IND_SUBSCRIBER_SESSIONS,
414 (uint32)td->transactionData.server_bit);
419 int result = pCON->
execute( NoCommit );
420 T4_Callback_1(result, pCON, (
void*)td);
426 T4_Callback_1(
int result,
NdbConnection * pCON,
void * threadData){
429 CHECK_ALLOWED_ERROR(
"T4-1: execute", td, pCON->
getNdbError());
431 start_T4(td->pNDB, td, stat_async);
435 DEBUG3(
"T4(%.*s, %.2d): - Callback 1",
436 SUBSCRIBER_NUMBER_LENGTH,
437 td->transactionData.number,
438 td->transactionData.server_id);
442 CHECK_NULL(MyOp,
"T4-2: getNdbOperation", td,
446 MyOp->
equal(IND_GROUP_ID,
447 (
char*)&td->transactionData.group_id);
448 MyOp->
getValue(IND_GROUP_ALLOW_INSERT,
449 (
char *)&td->transactionData.permission);
450 if (stat_async == 1) {
453 int result = pCON->
execute( NoCommit );
454 T4_Callback_2(result, pCON, (
void*)td);
460 T4_Callback_2(
int result,
NdbConnection * pCON,
void * threadData){
463 CHECK_ALLOWED_ERROR(
"T4-2: execute", td, pCON->
getNdbError());
465 start_T4(td->pNDB, td, stat_async);
469 Uint32 permission = td->transactionData.permission;
470 Uint32 sessions = td->transactionData.sessions;
471 Uint32 server_bit = td->transactionData.server_bit;
473 if(((permission & server_bit) == server_bit) &&
474 ((sessions & server_bit) == 0)){
476 memcpy(td->transactionData.suffix,
477 &td->transactionData.number[SFX_START],
478 SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
480 DEBUG5(
"T4(%.*s, %.2d): - Callback 2 - inserting(%.*s)",
481 SUBSCRIBER_NUMBER_LENGTH,
482 td->transactionData.number,
483 td->transactionData.server_id,
484 SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
485 td->transactionData.suffix);
490 CHECK_NULL(MyOp,
"T4-3: getNdbOperation", td,
494 MyOp->
equal(IND_SESSION_SUBSCRIBER,
495 (
char*)td->transactionData.number);
496 MyOp->
equal(IND_SESSION_SERVER,
497 (
char*)&td->transactionData.server_id);
499 (
char *)td->transactionData.session_details);
504 CHECK_NULL(MyOp,
"T4-5: getNdbOperation", td,
508 MyOp->
equal(IND_SERVER_ID,
509 (
char*)&td->transactionData.server_id);
510 MyOp->
equal(IND_SERVER_SUBSCRIBER_SUFFIX,
511 (
char*)td->transactionData.suffix);
512 MyOp->
incValue(IND_SERVER_INSERTS, (uint32)1);
513 td->transactionData.branchExecuted = 1;
515 td->transactionData.branchExecuted = 0;
516 DEBUG5(
"T4(%.*s, %.2d): - Callback 2 - %s %s",
517 SUBSCRIBER_NUMBER_LENGTH,
518 td->transactionData.number,
519 td->transactionData.server_id,
520 ((permission & server_bit) ?
521 "permission - " :
"no permission - "),
522 ((sessions & server_bit) ?
523 "in session - " :
"no in session - "));
526 if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
527 if (stat_async == 1) {
530 int result = pCON->
execute( Commit );
531 T4_Callback_3(result, pCON, (
void*)td);
535 if (stat_async == 1) {
538 int result = pCON->
execute( Rollback );
539 T4_Callback_3(result, pCON, (
void*)td);
546 T4_Callback_3(
int result,
NdbConnection * pCON,
void * threadData){
549 CHECK_ALLOWED_ERROR(
"T4-3: Commit", td, pCON->
getNdbError());
551 start_T4(td->pNDB, td, stat_async);
555 DEBUG3(
"T4(%.*s, %.2d): - Completing",
556 SUBSCRIBER_NUMBER_LENGTH,
557 td->transactionData.number,
558 td->transactionData.server_id);
583 DEBUG3(
"T5(%.*s, %.2d): - Starting", SUBSCRIBER_NUMBER_LENGTH,
584 td->transactionData.number,
585 td->transactionData.server_id);
588 while((pCON = startTransaction(pNDB, td)) == 0){
589 CHECK_ALLOWED_ERROR(
"T5-1: startTransaction", td, pNDB->
getNdbError());
590 NdbSleep_MilliSleep(10);
594 CHECK_NULL(MyOp,
"T5-1: getNdbOperation", td,
598 MyOp->
equal(IND_SUBSCRIBER_NUMBER,
599 td->transactionData.number);
600 MyOp->
getValue(IND_SUBSCRIBER_LOCATION,
601 (
char *)&td->transactionData.location);
602 MyOp->
getValue(IND_SUBSCRIBER_CHANGED_BY,
603 td->transactionData.changed_by);
604 MyOp->
getValue(IND_SUBSCRIBER_CHANGED_TIME,
605 td->transactionData.changed_time);
606 MyOp->
getValue(IND_SUBSCRIBER_GROUP,
607 (
char *)&td->transactionData.group_id);
608 MyOp->
getValue(IND_SUBSCRIBER_SESSIONS,
609 (
char *)&td->transactionData.sessions);
610 MyOp->
subValue(IND_SUBSCRIBER_SESSIONS,
611 (uint32)td->transactionData.server_bit);
616 int result = pCON->
execute( NoCommit );
617 T5_Callback_1(result, pCON, (
void*)td);
623 T5_Callback_1(
int result,
NdbConnection * pCON,
void * threadData){
626 CHECK_ALLOWED_ERROR(
"T5-1: execute", td, pCON->
getNdbError());
628 start_T5(td->pNDB, td, stat_async);
632 DEBUG3(
"T5(%.*s, %.2d): - Callback 1",
633 SUBSCRIBER_NUMBER_LENGTH,
634 td->transactionData.number,
635 td->transactionData.server_id);
638 CHECK_NULL(MyOp,
"T5-2: getNdbOperation", td,
642 MyOp->
equal(IND_GROUP_ID,
643 (
char*)&td->transactionData.group_id);
644 MyOp->
getValue(IND_GROUP_ALLOW_DELETE,
645 (
char *)&td->transactionData.permission);
646 if (stat_async == 1) {
649 int result = pCON->
execute( NoCommit );
650 T5_Callback_2(result, pCON, (
void*)td);
656 T5_Callback_2(
int result,
NdbConnection * pCON,
void * threadData){
659 CHECK_ALLOWED_ERROR(
"T5-2: execute", td, pCON->
getNdbError());
661 start_T5(td->pNDB, td, stat_async);
665 Uint32 permission = td->transactionData.permission;
666 Uint32 sessions = td->transactionData.sessions;
667 Uint32 server_bit = td->transactionData.server_bit;
669 if(((permission & server_bit) == server_bit) &&
670 ((sessions & server_bit) == server_bit)){
672 memcpy(td->transactionData.suffix,
673 &td->transactionData.number[SFX_START],
674 SUBSCRIBER_NUMBER_SUFFIX_LENGTH);
676 DEBUG5(
"T5(%.*s, %.2d): - Callback 2 - deleting(%.*s)",
677 SUBSCRIBER_NUMBER_LENGTH,
678 td->transactionData.number,
679 td->transactionData.server_id,
680 SUBSCRIBER_NUMBER_SUFFIX_LENGTH,
681 td->transactionData.suffix);
685 CHECK_NULL(MyOp,
"T5-3: getNdbOperation", td,
689 MyOp->
equal(IND_SESSION_SUBSCRIBER,
690 (
char*)td->transactionData.number);
691 MyOp->
equal(IND_SESSION_SERVER,
692 (
char*)&td->transactionData.server_id);
697 CHECK_NULL(MyOp,
"T5-5: getNdbOperation", td,
701 MyOp->
equal(IND_SERVER_ID,
702 (
char*)&td->transactionData.server_id);
703 MyOp->
equal(IND_SERVER_SUBSCRIBER_SUFFIX,
704 (
char*)td->transactionData.suffix);
705 MyOp->
incValue(IND_SERVER_DELETES, (uint32)1);
706 td->transactionData.branchExecuted = 1;
708 td->transactionData.branchExecuted = 0;
710 DEBUG5(
"T5(%.*s, %.2d): - Callback 2 - no delete - %s %s",
711 SUBSCRIBER_NUMBER_LENGTH,
712 td->transactionData.number,
713 td->transactionData.server_id,
714 ((permission & server_bit) ?
715 "permission - " :
"no permission - "),
716 ((sessions & server_bit) ?
717 "in session - " :
"no in session - "));
720 if(!td->transactionData.do_rollback && td->transactionData.branchExecuted){
721 if (stat_async == 1) {
724 int result = pCON->
execute( Commit );
725 T5_Callback_3(result, pCON, (
void*)td);
729 if (stat_async == 1) {
732 int result = pCON->
execute( Rollback );
733 T5_Callback_3(result, pCON, (
void*)td);
740 T5_Callback_3(
int result,
NdbConnection * pCON,
void * threadData){
743 CHECK_ALLOWED_ERROR(
"T5-3: Commit", td, pCON->
getNdbError());
745 start_T5(td->pNDB, td, stat_async);
749 DEBUG3(
"T5(%.*s, %.2d): - Completing",
750 SUBSCRIBER_NUMBER_LENGTH,
751 td->transactionData.number,
752 td->transactionData.server_id);