20 #include <mysqld_error.h>
25 #include <ndb_global.h>
29 #include "../../src/ndbapi/NdbQueryBuilder.hpp"
30 #include "../../src/ndbapi/NdbQueryOperation.hpp"
38 #define ASSERT_ALWAYS(cond) if(unlikely(!(cond))){abort();}
40 #define ASSERT_ALWAYS assert
47 #define PRINT_ERROR(code,msg) \
48 std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
49 << ", code: " << code \
50 << ", msg: " << msg << "." << std::endl
52 #define APIERROR(error) { \
53 PRINT_ERROR((error).code,(error).message); \
60 const char* databaseName =
"PTDB";
61 const char* tableName =
"TT";
73 bool m_useLinkedOperations;
81 static void *callback(
void* thread);
104 enum {State_Active, State_Stopping, State_Stopped} m_state;
105 pthread_t m_posixThread;
106 pthread_mutex_t m_mutex;
107 pthread_cond_t m_condition;
117 void doLinkedAPITest();
118 void doNonLinkedAPITest();
122 static void *callback(
void* thread){
127 static void printMySQLError(
MYSQL& mysql,
const char* before=NULL){
131 ndbout << mysql_error(&mysql) << endl;
135 static void mySQLExec(
MYSQL& mysql,
const char* stmt){
137 if(mysql_query(&mysql, stmt) != 0){
138 ndbout <<
"Error executing '" << stmt <<
"' : ";
139 printMySQLError(mysql);
141 mysql_free_result(mysql_use_result(&mysql));
149 m_ndb(&con, databaseName),
150 m_state(State_Active){
151 ASSERT_ALWAYS(m_ndb.
init()==0);
152 ASSERT_ALWAYS(pthread_mutex_init(&m_mutex, NULL)==0);
153 ASSERT_ALWAYS(pthread_cond_init(&m_condition, NULL)==0);
154 ASSERT_ALWAYS(pthread_create(&m_posixThread, NULL,
callback,
this)
159 m_index = dict->
getIndex(
"PRIMARY", tableName);
160 ASSERT_ALWAYS(m_index != NULL);
164 ASSERT_ALWAYS(m_resultRec!=NULL);
168 ASSERT_ALWAYS(col1 != NULL);
173 m_keyRec = dict->createRecord(m_tab, &spec, 1,
sizeof spec);
174 ASSERT_ALWAYS(m_keyRec != NULL);
177 ASSERT_ALWAYS(m_indexRec != NULL);
180 ASSERT_ALWAYS(mysql_init(&m_mysql));
181 if(!mysql_real_connect(&m_mysql, host,
"root",
"",
"",
183 printMySQLError(m_mysql,
"mysql_real_connect() failed:");
184 ASSERT_ALWAYS(
false);
187 sprintf(text,
"use %s", databaseName);
188 mySQLExec(m_mysql, text);
191 TestThread::~TestThread(){
192 ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
194 m_state = State_Stopping;
195 ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
197 while(m_state != State_Stopped){
198 ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
200 ASSERT_ALWAYS(m_params == NULL);
201 ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
203 ASSERT_ALWAYS(pthread_cond_destroy(&m_condition)==0);
204 ASSERT_ALWAYS(pthread_mutex_destroy(&m_mutex)==0);
208 ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
209 ASSERT_ALWAYS(m_params == NULL);
211 ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
212 ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
215 void TestThread::run(){
217 ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
219 while(m_params==NULL && m_state==State_Active){
221 ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
223 if(m_state != State_Active){
225 ASSERT_ALWAYS(m_state == State_Stopping);
226 m_state = State_Stopped;
228 ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
229 ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
237 if(m_params->m_useLinkedOperations){
240 doNonLinkedAPITest();
244 ASSERT_ALWAYS(m_params != NULL);
246 ASSERT_ALWAYS(pthread_cond_signal(&m_condition)==0);
250 void TestThread::doLinkedAPITest(){
254 const Row** resultPtrs =
new const Row*[m_params->
m_depth+1];
258 for(
int iterNo = 0; iterNo<m_params->m_iterations; iterNo++){
263 if(queryDef != NULL){
267 if(m_params->m_scanLength==0){
270 builder->constValue(0),
273 parentOpDef = builder->readTuple(m_tab, rootKey);
274 }
else if(m_params->m_scanLength==1){
276 builder->constValue(m_params->m_scanLength),
281 parentOpDef = builder->scanIndex(m_index, m_tab, &eqBound);
285 builder->constValue(m_params->m_scanLength),
290 parentOpDef = builder->scanIndex(m_index, m_tab, &bound);
296 builder->linkedValue(parentOpDef,
"b"),
299 parentOpDef = builder->readTuple(m_tab, key);
301 queryDef = builder->prepare();
310 query->getQueryOperation(
i)
311 ->setResultRowRef(m_resultRec,
312 reinterpret_cast<const char*&>(resultPtrs[
i]),
315 int res = trans->
execute(NoCommit);
318 ASSERT_ALWAYS(res == 0);
323 if(outcome == NdbQuery::NextResult_scanComplete){
326 ASSERT_ALWAYS(outcome== NdbQuery::NextResult_gotRow);
331 ASSERT_ALWAYS(cnt== MAX(1,m_params->m_scanLength));
333 if ((iterNo % 5) == 0) {
345 void TestThread::doNonLinkedAPITest(){
348 for(
int iterNo = 0; iterNo<m_params->m_iterations; iterNo++){
350 if(m_params->m_scanLength>0){
351 const KeyRow highKey = { m_params->m_scanLength };
353 if(m_params->m_scanLength==1){
355 reinterpret_cast<const char*
>(&highKey),
358 reinterpret_cast<const char*>(&highKey),
367 NdbOperation::LM_Dirty,
376 reinterpret_cast<const char*
>(&highKey),
385 NdbOperation::LM_Dirty,
389 ASSERT_ALWAYS(scanOp != NULL);
391 ASSERT_ALWAYS(trans->
execute(NoCommit) == 0);
396 const Row* scanRow = NULL;
398 scanOp->nextResult(reinterpret_cast<const char**>(&scanRow),
404 ASSERT_ALWAYS(retVal== 0);
410 const KeyRow key = {row.b};
412 trans->readTuple(m_keyRec,
413 reinterpret_cast<const char*>(&key),
415 reinterpret_cast<char*>(&row),
416 NdbOperation::LM_Dirty);
417 ASSERT_ALWAYS(lookupOp != NULL);
418 ASSERT_ALWAYS(trans->
execute(NoCommit) == 0);
425 ASSERT_ALWAYS(cnt== m_params->m_scanLength);
426 scanOp->close(
false,
true);
430 const KeyRow key = {row.b};
432 trans->readTuple(m_keyRec,
433 reinterpret_cast<const char*>(&key),
435 reinterpret_cast<char*>(&row),
436 NdbOperation::LM_Dirty);
437 ASSERT_ALWAYS(lookupOp != NULL);
438 ASSERT_ALWAYS(trans->
execute(NoCommit) == 0);
446 static bool printQuery =
false;
448 void TestThread::doSQLTest(){
449 if(m_params->m_useLinkedOperations){
450 mySQLExec(m_mysql,
"set ndb_join_pushdown = on;");
452 mySQLExec(m_mysql,
"set ndb_join_pushdown = off;");
454 mySQLExec(m_mysql,
"SET SESSION query_cache_type = OFF");
460 explicit TextBuf(){m_buffer[0] =
'\0';}
463 char* tail(){
return m_buffer + strlen(m_buffer);}
468 sprintf(text.tail(),
"select * from ");
470 sprintf(text.tail(),
"%s t%d", tableName,
i);
471 if(i < m_params->m_depth){
472 sprintf(text.tail(),
", ");
474 sprintf(text.tail(),
" where ");
478 if(m_params->m_scanLength==0){
480 sprintf(text.tail(),
"t0.a=0 ");
483 sprintf(text.tail(),
"t0.a<%d ", m_params->m_scanLength);
488 sprintf(text.tail(),
"and t%d.b=t%d.a ",
i-1,
i);
491 ndbout << text.m_buffer << endl;
494 for(
int i = 0;
i < m_params->m_iterations;
i++){
495 mySQLExec(m_mysql, text.m_buffer);
500 ASSERT_ALWAYS(pthread_mutex_lock(&m_mutex)==0);
501 while(m_params!=NULL){
502 ASSERT_ALWAYS(pthread_cond_wait(&m_condition, &m_mutex)==0);
504 ASSERT_ALWAYS(pthread_mutex_unlock(&m_mutex)==0);
510 static void makeDatabase(
const char* host,
int port,
int rowCount){
512 ASSERT_ALWAYS(mysql_init(&mysql));
513 if(!mysql_real_connect(&mysql, host,
"root",
"",
"",
515 printMySQLError(mysql,
"mysql_real_connect() failed:");
516 ASSERT_ALWAYS(
false);
519 sprintf(text,
"create database if not exists %s", databaseName);
520 mySQLExec(mysql, text);
521 sprintf(text,
"use %s", databaseName);
522 mySQLExec(mysql, text);
523 sprintf(text,
"drop table if exists %s", tableName);
524 mySQLExec(mysql, text);
525 sprintf(text,
"create table %s(a int not null,"
527 "primary key(a)) ENGINE=NDB", tableName);
528 mySQLExec(mysql, text);
529 for(
int i = 0;
i<rowCount;
i++){
530 sprintf(text,
"insert into %s values(%d, %d)", tableName,
532 mySQLExec(mysql, text);
536 static void printHeading(){
537 ndbout << endl <<
"Use SQL; Use linked; Thread count; Iterations; "
538 "Scan length; Depth; Def re-use; Duration (ms); Tuples per sec;" << endl;
542 void runTest(
TestThread** threads,
int threadCount,
545 const NDB_TICKS
start = NdbTick_CurrentMillisecond();
546 for(
int i = 0;
i<threadCount;
i++){
549 for(
int i = 0;
i<threadCount;
i++){
552 const NDB_TICKS duration = NdbTick_CurrentMillisecond() -
start;
554 ndbout << param.m_useLinkedOperations <<
"; ";
555 ndbout << threadCount <<
"; ";
556 ndbout << param.m_iterations <<
"; ";
557 ndbout << param.m_scanLength <<
"; ";
558 ndbout << param.
m_depth <<
"; ";
560 ndbout << duration <<
"; ";
565 if(param.m_scanLength==0){
566 tupPerSec = threadCount *
568 (param.
m_depth+1) * 1000 / duration;
570 tupPerSec = threadCount *
573 (param.
m_depth+1) * 1000 / duration;
576 ndbout << tupPerSec <<
"; ";
581 const int threadCount = 1;
585 ndbout << endl <<
"warmUp()" << endl;
588 param.m_iterations = 10;
589 param.m_useLinkedOperations =
false;
590 param.m_scanLength = 0;
594 for(
int i = 0;
i<20;
i++){
596 runTest(threads, threadCount, param);
599 param.m_useLinkedOperations =
true;
600 for(
int i = 0;
i<20;
i++){
602 runTest(threads, threadCount, param);
606 void testLookupDepth(
bool useSQL){
607 ndbout << endl <<
"testLookupDepth()" << endl;
610 param.m_iterations = 100;
611 param.m_useLinkedOperations =
false;
612 param.m_scanLength = 0;
616 for(
int i = 0;
i<20;
i++){
618 runTest(threads, threadCount, param);
621 param.m_useLinkedOperations =
true;
622 for(
int i = 0;
i<20;
i++){
624 runTest(threads, threadCount, param);
628 void testScanDepth(
int scanLength,
bool useSQL){
629 ndbout << endl <<
"testScanDepth()" << endl;
632 param.m_iterations = 20;
633 param.m_useLinkedOperations =
false;
634 param.m_scanLength = scanLength;
637 for(
int i = 0;
i<10;
i++){
639 runTest(threads, threadCount, param);
642 param.m_useLinkedOperations =
true;
643 for(
int i = 0;
i<10;
i++){
645 runTest(threads, threadCount, param);
649 int main(
int argc,
char* argv[]){
651 if(argc!=4 && argc!=5){
652 ndbout <<
"Usage: " << argv[0] <<
" [--print-query]"
653 <<
" <mysql IP address> <mysql port> <cluster connect string>"
658 if(strcmp(argv[argno],
"--print-query")==0){
662 const char*
const host=argv[argno++];
663 const int port = atoi(argv[argno++]);
664 const char*
const connectString = argv[argno];
666 makeDatabase(host, port, 200);
669 ASSERT_ALWAYS(con.
connect(12, 5, 1) == 0);
672 const int threadCount = 1;
674 for(
int i = 0;
i<threadCount;
i++){
683 testScanDepth(50,
true);
684 testLookupDepth(
true);
686 for(
int i = 0;
i<threadCount;
i++){