18 #include "NdbInfo.hpp"
19 #include "SignalSender.hpp"
20 #include <kernel/GlobalSignalNumbers.h>
21 #include <AttributeHeader.hpp>
22 #include <signaldata/DbinfoScan.hpp>
23 #include <signaldata/TransIdAI.hpp>
24 #include <signaldata/NodeFailRep.hpp>
31 NdbInfoScanOperation::NdbInfoScanOperation(
const NdbInfo& info,
34 Uint32 max_rows, Uint32 max_bytes) :
38 m_connection(connection),
39 m_signal_sender(NULL),
43 m_max_bytes(max_bytes),
53 NdbInfoScanOperation::init(Uint32
id)
55 DBUG_ENTER(
"NdbInfoScanoperation::init");
56 if (m_state != Undefined)
64 m_transid1 = m_table->getTableId();
65 m_result_ref = m_signal_sender->getOwnRef();
67 for (
unsigned i = 0;
i < m_table->columns();
i++)
68 m_recAttrs.push_back(NULL);
75 for (Uint32
i = 1;
i < MAX_NDB_NODES;
i++)
76 m_impl.m_nodes_to_scan.
set(
i);
77 m_impl.m_nodes_to_scan.
clear(refToNode(m_result_ref));
84 NdbInfoScanOperation::~NdbInfoScanOperation()
87 delete m_signal_sender;
92 NdbInfoScanOperation::readTuples()
94 if (m_state != Initial)
95 return NdbInfo::ERR_WrongState;
102 NdbInfoScanOperation::getValue(
const char * anAttrName)
104 if (m_state != Prepared)
110 return getValue(column->m_column_id);
114 NdbInfoScanOperation::getValue(Uint32 anAttrId)
116 if (m_state != Prepared)
119 if (anAttrId >= m_recAttrs.size())
123 m_recAttrs[anAttrId] = recAttr;
129 NdbInfoScanOperation::find_next_node()
131 DBUG_ENTER(
"NdbInfoScanOperation::find_next_node");
134 m_signal_sender->find_confirmed_node(m_impl.m_nodes_to_scan);
137 DBUG_PRINT(
"info", (
"no more alive nodes"));
140 assert(m_node_id != next);
141 m_impl.m_nodes_to_scan.
clear(next);
146 DBUG_PRINT(
"info", (
"nodes: %d, max_nodes: %d", m_nodes, m_max_nodes));
147 if (m_max_nodes && m_nodes > m_max_nodes)
149 DBUG_PRINT(
"info", (
"Reached max nodes to scan"));
153 DBUG_PRINT(
"info", (
"switched to node %d", m_node_id));
158 int NdbInfoScanOperation::execute()
160 DBUG_ENTER(
"NdbInfoScanOperation::execute");
161 DBUG_PRINT(
"info", (
"name: '%s', id: %d",
162 m_table->getName(), m_table->getTableId()));
164 if (m_state != Prepared)
165 DBUG_RETURN(NdbInfo::ERR_WrongState);
167 assert(m_cursor.size() == 0);
170 m_signal_sender->lock();
172 if (!find_next_node())
174 m_signal_sender->unlock();
175 DBUG_RETURN(NdbInfo::ERR_ClusterFailure);
178 int ret = sendDBINFO_SCANREQ();
179 m_signal_sender->unlock();
185 NdbInfoScanOperation::sendDBINFO_SCANREQ(
void)
187 DBUG_ENTER(
"NdbInfoScanOperation::sendDBINFO_SCANREQ");
193 req->resultData = m_result_data;
194 req->transId[0] = m_transid0;
195 req->transId[1] = m_transid1;
196 req->resultRef = m_result_ref;
199 req->tableId = m_table->getTableId();
200 req->colBitmap[0] = ~0;
201 req->colBitmap[1] = ~0;
202 req->requestInfo = 0;
203 req->maxRows = m_max_rows;
204 req->maxBytes = m_max_bytes;
205 DBUG_PRINT(
"info", (
"max rows: %d, max bytes: %d", m_max_rows, m_max_bytes));
208 req->returnedRows = 0;
211 Uint32* cursor_ptr = DbinfoScan::getCursorPtrSend(req);
212 for (
unsigned i = 0;
i < m_cursor.size();
i++)
214 *cursor_ptr = m_cursor[
i];
215 DBUG_PRINT(
"info", (
"cursor[%u]: 0x%x",
i, m_cursor[
i]));
218 req->cursor_sz = m_cursor.size();
221 assert((m_rows_received == 0 && m_rows_confirmed == (Uint32)~0) ||
222 m_rows_received == m_rows_confirmed);
228 m_rows_confirmed = ~0;
231 Uint32 len = DbinfoScanReq::SignalLength + req->cursor_sz;
232 if (m_signal_sender->sendSignal(m_node_id, ss, DBINFO,
233 GSN_DBINFO_SCANREQ, len) != SEND_OK)
236 DBUG_RETURN(NdbInfo::ERR_ClusterFailure);
242 int NdbInfoScanOperation::receive(
void)
244 DBUG_ENTER(
"NdbInfoScanOperation::receive");
252 int sig_number = sig->readSignalNumber();
253 switch (sig_number) {
255 case GSN_DBINFO_TRANSID_AI:
257 if (execDBINFO_TRANSID_AI(sig))
260 if (m_rows_received < m_rows_confirmed)
264 assert(m_rows_received == m_rows_confirmed);
266 if (m_cursor.size() == 0 && !find_next_node())
268 DBUG_PRINT(
"info", (
"No cursor -> EOF"));
274 assert(m_state == MoreData);
275 int err = sendDBINFO_SCANREQ();
278 DBUG_PRINT(
"error", (
"Failed to request more data"));
279 assert(m_state == Error);
288 case GSN_DBINFO_SCANCONF:
290 if (execDBINFO_SCANCONF(sig))
293 if (m_rows_received < m_rows_confirmed)
297 assert(m_rows_received == m_rows_confirmed);
299 if (m_cursor.size() == 0 && !find_next_node())
301 DBUG_PRINT(
"info", (
"No cursor -> EOF"));
307 assert(m_state == MoreData);
308 int err = sendDBINFO_SCANREQ();
311 DBUG_PRINT(
"error", (
"Failed to request more data"));
312 assert(m_state == Error);
319 case GSN_DBINFO_SCANREF:
322 if (execDBINFO_SCANREF(sig, error))
324 assert(m_state == Error);
329 case GSN_NODE_FAILREP:
335 DBUG_PRINT(
"info", (
"Node %d where scan was runnig failed", m_node_id));
337 DBUG_RETURN(NdbInfo::ERR_ClusterFailure);
342 case GSN_NF_COMPLETEREP:
346 case GSN_SUB_GCP_COMPLETE_REP:
347 case GSN_API_REGCONF:
348 case GSN_TAKE_OVERTCCONF:
349 case GSN_CONNECT_REP:
354 DBUG_PRINT(
"error", (
"Got unexpected signal: %d", sig_number));
364 NdbInfoScanOperation::nextResult()
366 DBUG_ENTER(
"NdbInfoScanOperation::nextResult");
372 m_signal_sender->lock();
374 m_signal_sender->unlock();
388 NdbInfoScanOperation::close()
390 DBUG_ENTER(
"NdbInfoScanOperation::close");
392 for (
unsigned i = 0;
i < m_recAttrs.size();
i++)
396 delete m_recAttrs[
i];
397 m_recAttrs[
i] = NULL;
405 NdbInfoScanOperation::execDBINFO_TRANSID_AI(
const SimpleSignal * signal)
407 DBUG_ENTER(
"NdbInfoScanOperation::execDBINFO_TRANSID_AI");
409 CAST_CONSTPTR(
TransIdAI, signal->getDataPtr());
410 if (transid->connectPtr != m_result_data ||
411 transid->transId[0] != m_transid0 ||
412 transid->transId[1] != m_transid1)
419 DBUG_PRINT(
"info", (
"rows received: %d", m_rows_received));
422 for (
unsigned i = 0;
i < m_recAttrs.size();
i++)
425 m_recAttrs[
i]->m_defined =
false;
435 const Uint32 len = attr->getByteSize();
436 DBUG_PRINT(
"info", (
"col: %u, len: %u", col, len));
437 if (col < m_recAttrs.size())
443 rec_attr->m_data = (
const char*)attr->
getDataPtr();
444 rec_attr->m_len = len;
445 rec_attr->m_defined =
true;
456 NdbInfoScanOperation::execDBINFO_SCANCONF(
const SimpleSignal * sig)
458 DBUG_ENTER(
"NdbInfoScanOperation::execDBINFO_SCANCONF");
461 if (conf->resultData != m_result_data ||
462 conf->transId[0] != m_transid0 ||
463 conf->transId[1] != m_transid1 ||
464 conf->resultRef != m_result_ref)
469 const Uint32 tableId = conf->tableId;
470 assert(tableId == m_table->getTableId());
473 assert(conf->colBitmap[0] == (Uint32)~0);
474 assert(conf->colBitmap[1] == (Uint32)~0);
475 assert(conf->requestInfo == 0);
476 assert(conf->maxRows == m_max_rows);
477 assert(conf->maxBytes == m_max_bytes);
479 DBUG_PRINT(
"info", (
"returnedRows : %d", conf->returnedRows));
482 DBUG_PRINT(
"info", (
"cursor size: %d", conf->cursor_sz));
483 assert(m_cursor.size() == 0);
484 const Uint32* cursor_ptr = DbinfoScan::getCursorPtr(conf);
485 for (
unsigned i = 0;
i < conf->cursor_sz;
i++)
487 m_cursor.push_back(*cursor_ptr);
491 assert(conf->cursor_sz == m_cursor.size());
493 assert(m_rows_confirmed == (Uint32)~0);
494 m_rows_confirmed = conf->returnedRows;
497 DBUG_PRINT(
"info", (
"received: %d, confirmed: %d", m_rows_received, m_rows_confirmed));
498 assert(m_rows_received <= m_rows_confirmed);
504 NdbInfoScanOperation::execDBINFO_SCANREF(
const SimpleSignal * signal,
507 DBUG_ENTER(
"NdbInfoScanOperation::execDBINFO_SCANREF");
511 if (ref->resultData != m_result_data ||
512 ref->transId[0] != m_transid0 ||
513 ref->transId[1] != m_transid1 ||
514 ref->resultRef != m_result_ref)
520 error_code = ref->errorCode;