MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
asyncGenerator.cpp
1 /*
2  Copyright (C) 2003-2006 MySQL AB
3  All rights reserved. Use is subject to license terms.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; version 2 of the License.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 /***************************************************************
20 * I N C L U D E D F I L E S *
21 ***************************************************************/
22 
23 #include <ndb_global.h>
24 
25 #include "dbGenerator.h"
26 #include <NdbApi.hpp>
27 #include <NdbOut.hpp>
28 #include <NdbSleep.h>
29 
30 /***************************************************************
31 * L O C A L C O N S T A N T S *
32 ***************************************************************/
33 
34 /***************************************************************
35 * L O C A L D A T A S T R U C T U R E S *
36 ***************************************************************/
37 
38 /***************************************************************
39 * L O C A L F U N C T I O N S *
40 ***************************************************************/
41 
42 static void getRandomSubscriberNumber(SubscriberNumber number);
43 static void getRandomServerId(ServerId *serverId);
44 static void getRandomChangedBy(ChangedBy changedBy);
45 static void getRandomChangedTime(ChangedTime changedTime);
46 
47 static void clearTransaction(TransactionDefinition *trans);
48 static void initGeneratorStatistics(GeneratorStatistics *gen);
49 
50 static void doOneTransaction(ThreadData * td,
51  int parallellism,
52  int millisSendPoll,
53  int minEventSendPoll,
54  int forceSendPoll);
55 static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
56 static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
57 static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
58 static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
59 static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);
60 
61 /***************************************************************
62 * L O C A L D A T A *
63 ***************************************************************/
64 
65 static SequenceValues transactionDefinition[] = {
66  {25, 1},
67  {25, 2},
68  {20, 3},
69  {15, 4},
70  {15, 5},
71  {0, 0}
72 };
73 
74 static SequenceValues rollbackDefinition[] = {
75  {98, 0},
76  {2 , 1},
77  {0, 0}
78 };
79 
80 static int maxsize = 0;
81 
82 /***************************************************************
83 * P U B L I C D A T A *
84 ***************************************************************/
85 
86 /***************************************************************
87 ****************************************************************
88 * L O C A L F U N C T I O N S C O D E S E C T I O N *
89 ****************************************************************
90 ***************************************************************/
91 
92 static void getRandomSubscriberNumber(SubscriberNumber number)
93 {
94  uint32 tmp;
95  char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
96  tmp = myRandom48(NO_OF_SUBSCRIBERS);
97  sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
98  memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
99 }
100 
101 static void getRandomServerId(ServerId *serverId)
102 {
103  *serverId = myRandom48(NO_OF_SERVERS);
104 }
105 
106 static void getRandomChangedBy(ChangedBy changedBy)
107 {
108  memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
109  changedBy[CHANGED_BY_LENGTH] = 0;
110 }
111 
112 static void getRandomChangedTime(ChangedTime changedTime)
113 {
114  memset(changedTime, myRandom48(26)+'A', CHANGED_TIME_LENGTH);
115  changedTime[CHANGED_TIME_LENGTH] = 0;
116 }
117 
118 static void clearTransaction(TransactionDefinition *trans)
119 {
120  trans->count = 0;
121  trans->branchExecuted = 0;
122  trans->rollbackExecuted = 0;
123  trans->latencyCounter = myRandom48(127);
124  trans->latency.reset();
125 }
126 
127 static int listFull(SessionList *list)
128 {
129  return(list->numberInList == SESSION_LIST_LENGTH);
130 }
131 
132 static int listEmpty(SessionList *list)
133 {
134  return(list->numberInList == 0);
135 }
136 
137 static void insertSession(SessionList *list,
138  SubscriberNumber number,
139  ServerId serverId)
140 {
141  SessionElement *e;
142  if( listFull(list) ) return;
143 
144  e = &list->list[list->writeIndex];
145 
146  strcpy(e->subscriberNumber, number);
147  e->serverId = serverId;
148 
149  list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
150  list->numberInList++;
151 
152  if( list->numberInList > maxsize )
153  maxsize = list->numberInList;
154 }
155 
156 static SessionElement *getNextSession(SessionList *list)
157 {
158  if( listEmpty(list) ) return(0);
159 
160  return(&list->list[list->readIndex]);
161 }
162 
163 static void deleteSession(SessionList *list)
164 {
165  if( listEmpty(list) ) return;
166 
167  list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
168  list->numberInList--;
169 }
170 
171 static void initGeneratorStatistics(GeneratorStatistics *gen)
172 {
173  int i;
174 
175  if( initSequence(&gen->transactionSequence,
176  transactionDefinition) != 0 ) {
177  ndbout_c("could not set the transaction types");
178  exit(0);
179  }
180 
181  if( initSequence(&gen->rollbackSequenceT4,
182  rollbackDefinition) != 0 ) {
183  ndbout_c("could not set the rollback sequence");
184  exit(0);
185  }
186 
187  if( initSequence(&gen->rollbackSequenceT5,
188  rollbackDefinition) != 0 ) {
189  ndbout_c("could not set the rollback sequence");
190  exit(0);
191  }
192 
193  for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
194  clearTransaction(&gen->transactions[i]);
195 
196  gen->totalTransactions = 0;
197 
198  gen->activeSessions.numberInList = 0;
199  gen->activeSessions.readIndex = 0;
200  gen->activeSessions.writeIndex = 0;
201 }
202 
203 
204 static
205 void
206 doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
207 {
208  int i;
209  unsigned int transactionType;
210  int async = 1;
211  if (p == 1) {
212  async = 0;
213  }//if
214  for(i = 0; i<p; i++){
215  if(td[i].runState == Runnable){
216  transactionType = getNextRandom(&td[i].generator.transactionSequence);
217 
218  switch(transactionType) {
219  case 1:
220  doTransaction_T1(td[i].pNDB, &td[i], async);
221  break;
222  case 2:
223  doTransaction_T2(td[i].pNDB, &td[i], async);
224  break;
225  case 3:
226  doTransaction_T3(td[i].pNDB, &td[i], async);
227  break;
228  case 4:
229  doTransaction_T4(td[i].pNDB, &td[i], async);
230  break;
231  case 5:
232  doTransaction_T5(td[i].pNDB, &td[i], async);
233  break;
234  default:
235  ndbout_c("Unknown transaction type: %d", transactionType);
236  }
237  }
238  }
239  if (async == 1) {
240  td[0].pNDB->sendPollNdb(millis, minEvents, force);
241  }//if
242 }
243 
244 static
245 void
246 doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
247 {
248  /*----------------*/
249  /* Init arguments */
250  /*----------------*/
251  getRandomSubscriberNumber(td->transactionData.number);
252  getRandomChangedBy(td->transactionData.changed_by);
253  BaseString::snprintf(td->transactionData.changed_time,
254  sizeof(td->transactionData.changed_time),
255  "%ld - %d", td->changedTime++, myRandom48(65536*1024));
256  //getRandomChangedTime(td->transactionData.changed_time);
257  td->transactionData.location = td->transactionData.changed_by[0];
258 
259  /*-----------------*/
260  /* Run transaction */
261  /*-----------------*/
262  td->runState = Running;
263  td->generator.transactions[0].startLatency();
264 
265  start_T1(pNDB, td, async);
266 }
267 
268 static
269 void
270 doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
271 {
272  /*----------------*/
273  /* Init arguments */
274  /*----------------*/
275  getRandomSubscriberNumber(td->transactionData.number);
276 
277  /*-----------------*/
278  /* Run transaction */
279  /*-----------------*/
280  td->runState = Running;
281  td->generator.transactions[1].startLatency();
282 
283  start_T2(pNDB, td, async);
284 }
285 
286 static
287 void
288 doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
289 {
290  SessionElement *se;
291 
292  /*----------------*/
293  /* Init arguments */
294  /*----------------*/
295  se = getNextSession(&td->generator.activeSessions);
296  if( se ) {
297  strcpy(td->transactionData.number, se->subscriberNumber);
298  td->transactionData.server_id = se->serverId;
299  td->transactionData.sessionElement = 1;
300  } else {
301  getRandomSubscriberNumber(td->transactionData.number);
302  getRandomServerId(&td->transactionData.server_id);
303  td->transactionData.sessionElement = 0;
304  }
305 
306  td->transactionData.server_bit = (1 << td->transactionData.server_id);
307 
308  /*-----------------*/
309  /* Run transaction */
310  /*-----------------*/
311  td->runState = Running;
312  td->generator.transactions[2].startLatency();
313  start_T3(pNDB, td, async);
314 }
315 
316 static
317 void
318 doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
319 {
320  /*----------------*/
321  /* Init arguments */
322  /*----------------*/
323  getRandomSubscriberNumber(td->transactionData.number);
324  getRandomServerId(&td->transactionData.server_id);
325 
326  td->transactionData.server_bit = (1 << td->transactionData.server_id);
327  td->transactionData.do_rollback =
328  getNextRandom(&td->generator.rollbackSequenceT4);
329 
330 #if 0
331  memset(td->transactionData.session_details,
332  myRandom48(26)+'A', SESSION_DETAILS_LENGTH);
333 #endif
334  td->transactionData.session_details[SESSION_DETAILS_LENGTH] = 0;
335 
336  /*-----------------*/
337  /* Run transaction */
338  /*-----------------*/
339  td->runState = Running;
340  td->generator.transactions[3].startLatency();
341  start_T4(pNDB, td, async);
342 }
343 
344 static
345 void
346 doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
347 {
348  SessionElement * se;
349  se = getNextSession(&td->generator.activeSessions);
350  if( se ) {
351  strcpy(td->transactionData.number, se->subscriberNumber);
352  td->transactionData.server_id = se->serverId;
353  td->transactionData.sessionElement = 1;
354  }
355  else {
356  getRandomSubscriberNumber(td->transactionData.number);
357  getRandomServerId(&td->transactionData.server_id);
358  td->transactionData.sessionElement = 0;
359  }
360 
361  td->transactionData.server_bit = (1 << td->transactionData.server_id);
362  td->transactionData.do_rollback
363  = getNextRandom(&td->generator.rollbackSequenceT5);
364 
365  /*-----------------*/
366  /* Run transaction */
367  /*-----------------*/
368  td->runState = Running;
369  td->generator.transactions[4].startLatency();
370  start_T5(pNDB, td, async);
371 }
372 
373 void
374 complete_T1(ThreadData * data){
375  data->generator.transactions[0].stopLatency();
376  data->generator.transactions[0].count++;
377 
378  data->runState = Runnable;
379  data->generator.totalTransactions++;
380 }
381 
382 void
383 complete_T2(ThreadData * data){
384  data->generator.transactions[1].stopLatency();
385  data->generator.transactions[1].count++;
386 
387  data->runState = Runnable;
388  data->generator.totalTransactions++;
389 }
390 
391 void
392 complete_T3(ThreadData * data){
393 
394  data->generator.transactions[2].stopLatency();
395  data->generator.transactions[2].count++;
396 
397  if(data->transactionData.branchExecuted)
398  data->generator.transactions[2].branchExecuted++;
399 
400  data->runState = Runnable;
401  data->generator.totalTransactions++;
402 }
403 
404 void
405 complete_T4(ThreadData * data){
406 
407  data->generator.transactions[3].stopLatency();
408  data->generator.transactions[3].count++;
409 
410  if(data->transactionData.branchExecuted)
411  data->generator.transactions[3].branchExecuted++;
412  if(data->transactionData.do_rollback)
413  data->generator.transactions[3].rollbackExecuted++;
414 
415  if(data->transactionData.branchExecuted &&
416  !data->transactionData.do_rollback){
417  insertSession(&data->generator.activeSessions,
418  data->transactionData.number,
419  data->transactionData.server_id);
420  }
421 
422  data->runState = Runnable;
423  data->generator.totalTransactions++;
424 
425 }
426 void
427 complete_T5(ThreadData * data){
428 
429  data->generator.transactions[4].stopLatency();
430  data->generator.transactions[4].count++;
431 
432  if(data->transactionData.branchExecuted)
433  data->generator.transactions[4].branchExecuted++;
434  if(data->transactionData.do_rollback)
435  data->generator.transactions[4].rollbackExecuted++;
436 
437  if(data->transactionData.sessionElement &&
438  !data->transactionData.do_rollback){
439  deleteSession(&data->generator.activeSessions);
440  }
441 
442  data->runState = Runnable;
443  data->generator.totalTransactions++;
444 }
445 
446 /***************************************************************
447 ****************************************************************
448 * P U B L I C F U N C T I O N S C O D E S E C T I O N *
449 ****************************************************************
450 ***************************************************************/
451 void
452 asyncGenerator(ThreadData *data,
453  int parallellism,
454  int millisSendPoll,
455  int minEventSendPoll,
456  int forceSendPoll)
457 {
458  ThreadData * startUp;
459 
461  double periodStop;
462  double benchTimeStart;
463  double benchTimeEnd;
464  int i, j, done;
465 
466  myRandom48Init(data->randomSeed);
467 
468  for(i = 0; i<parallellism; i++){
469  initGeneratorStatistics(&data[i].generator);
470  }
471 
472  startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
473  memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
474 
475  /*----------------*/
476  /* warm up period */
477  /*----------------*/
478  periodStop = userGetTime() + (double)data[0].warmUpSeconds;
479 
480  while(userGetTime() < periodStop){
481  doOneTransaction(startUp, parallellism,
482  millisSendPoll, minEventSendPoll, forceSendPoll);
483  }
484 
485  ndbout_c("Waiting for startup to finish");
486 
490  done = 0;
491  while(!done){
492  done = 1;
493  for(i = 0; i<parallellism; i++){
494  if(startUp[i].runState != Runnable){
495  done = 0;
496  break;
497  }
498  }
499  if(!done){
500  startUp[0].pNDB->sendPollNdb();
501  }
502  }
503  ndbout_c("Benchmark period starts");
504 
505  /*-------------------------*/
506  /* normal benchmark period */
507  /*-------------------------*/
508  benchTimeStart = userGetTime();
509 
510  periodStop = benchTimeStart + (double)data[0].testSeconds;
511  while(userGetTime() < periodStop)
512  doOneTransaction(data, parallellism,
513  millisSendPoll, minEventSendPoll, forceSendPoll);
514 
515  benchTimeEnd = userGetTime();
516 
517  ndbout_c("Benchmark period done");
518 
522  done = 0;
523  while(!done){
524  done = 1;
525  for(i = 0; i<parallellism; i++){
526  if(data[i].runState != Runnable){
527  done = 0;
528  break;
529  }
530  }
531  if(!done){
532  data[0].pNDB->sendPollNdb();
533  }
534  }
535 
536  /*------------------*/
537  /* cool down period */
538  /*------------------*/
539  periodStop = userGetTime() + (double)data[0].coolDownSeconds;
540  while(userGetTime() < periodStop){
541  doOneTransaction(startUp, parallellism,
542  millisSendPoll, minEventSendPoll, forceSendPoll);
543  }
544 
545  done = 0;
546  while(!done){
547  done = 1;
548  for(i = 0; i<parallellism; i++){
549  if(startUp[i].runState != Runnable){
550  done = 0;
551  break;
552  }
553  }
554  if(!done){
555  startUp[0].pNDB->sendPollNdb();
556  }
557  }
558 
559 
560  /*---------------------------------------------------------*/
561  /* add the times for all transaction for inner loop timing */
562  /*---------------------------------------------------------*/
563  for(j = 0; j<parallellism; j++){
564  st = &data[j].generator;
565 
566  st->outerLoopTime = benchTimeEnd - benchTimeStart;
567  st->outerTps = getTps(st->totalTransactions, st->outerLoopTime);
568  }
569  /* ndbout_c("maxsize = %d\n",maxsize); */
570 
571  free(startUp);
572 }
573