19 #include <ndb_global.h>
20 #include <kernel_types.h>
25 #include <signaldata/CreateEvnt.hpp>
26 #include <signaldata/SumaImpl.hpp>
27 #include <SimpleProperties.hpp>
28 #include <Bitmask.hpp>
29 #include <AttributeHeader.hpp>
30 #include <AttributeList.hpp>
31 #include <NdbError.hpp>
32 #include <BaseString.hpp>
33 #include <UtilBuffer.hpp>
34 #include <portlib/NdbMem.h>
35 #include <signaldata/AlterTable.hpp>
36 #include "ndb_internal.hpp"
38 #include <EventLogger.hpp>
41 #define TOTAL_BUCKETS_INIT (1 << 15)
44 #if defined(VM_TRACE) && defined(NOT_USED)
48 printf(
"addr=%p gci{hi/lo}hi=%u/%u op=%d\n", (
void*)sdata,
49 sdata->gci_hi, sdata->gci_lo,
50 SubTableData::getOperation(sdata->requestInfo));
51 for (
int i = 0;
i <= 2;
i++) {
52 printf(
"sec=%d addr=%p sz=%d\n",
i, (
void*)ptr[
i].p, ptr[
i].sz);
53 for (
int j = 0; (uint) j < ptr[
i].sz; j++)
54 printf(
"%08x ", ptr[
i].p[j]);
63 EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz)
const
75 data3 = data3->m_next;
77 data2 = data2->m_next_blob;
79 full_count += tmp_count;
94 const char* eventName) :
101 DBUG_ENTER(
"NdbEventOperationImpl::NdbEventOperationImpl");
103 assert(m_ndb != NULL);
105 assert(myDict != NULL);
114 init(myEvnt->m_impl);
118 NdbEventOperationImpl::NdbEventOperationImpl(
Ndb *theNdb,
126 DBUG_ENTER(
"NdbEventOperationImpl::NdbEventOperationImpl [evnt]");
134 DBUG_ENTER(
"NdbEventOperationImpl::init");
140 m_data_done_count = 0;
147 theFirstPkAttrs[0] = NULL;
148 theCurrentPkAttrs[0] = NULL;
149 theFirstPkAttrs[1] = NULL;
150 theCurrentPkAttrs[1] = NULL;
151 theFirstDataAttrs[0] = NULL;
152 theCurrentDataAttrs[0] = NULL;
153 theFirstDataAttrs[1] = NULL;
154 theCurrentDataAttrs[1] = NULL;
157 theBlobOpList = NULL;
172 m_eventId = m_eventImpl->m_eventId;
174 m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(
this);
179 #ifdef ndb_event_stores_merge_events_flag
180 m_mergeEvents = m_eventImpl->m_mergeEvents;
182 m_mergeEvents =
false;
185 DBUG_PRINT(
"info", (
"m_ref_count = 0 for op: 0x%lx", (
long)
this));
189 DBUG_PRINT(
"exit",(
"this: 0x%lx oid: %u", (
long)
this, m_oid));
193 NdbEventOperationImpl::~NdbEventOperationImpl()
195 DBUG_ENTER(
"NdbEventOperationImpl::~NdbEventOperationImpl");
198 if (m_oid == ~(Uint32)0)
203 if (theMainOp == NULL)
206 while (tBlobOp != NULL)
209 tBlobOp = tBlobOp->m_next;
214 m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid,
this);
215 DBUG_PRINT(
"exit",(
"this: %p/%p oid: %u main: %p",
216 this, m_facade, m_oid, theMainOp));
220 delete m_eventImpl->m_facade;
234 NdbEventOperationImpl::getValue(
const char *colName,
char *aValue,
int n)
236 DBUG_ENTER(
"NdbEventOperationImpl::getValue");
238 ndbout_c(
"NdbEventOperationImpl::getValue may only be called between "
239 "instantiation and execute()");
243 NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
245 if (tAttrInfo == NULL) {
246 ndbout_c(
"NdbEventOperationImpl::getValue attribute %s not found",colName);
250 DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
254 NdbEventOperationImpl::getValue(
const NdbColumnImpl *tAttrInfo,
char *aValue,
int n)
256 DBUG_ENTER(
"NdbEventOperationImpl::getValue");
264 theFirstAttr = &theFirstPkAttrs[
n];
265 theCurrentAttr = &theCurrentPkAttrs[
n];
269 theFirstAttr = &theFirstDataAttrs[
n];
270 theCurrentAttr = &theCurrentDataAttrs[
n];
288 if (tAttr->setup(tAttrInfo, aValue)) {
290 m_ndb->releaseRecAttr(tAttr);
296 tAttr->setUNDEFINED();
300 if (*theFirstAttr == NULL) {
301 *theFirstAttr = tAttr;
302 *theCurrentAttr = tAttr;
305 Uint32 tAttrId = tAttrInfo->m_attrId;
306 if (tAttrId > (*theCurrentAttr)->attrId()) {
307 (*theCurrentAttr)->next(tAttr);
309 *theCurrentAttr = tAttr;
310 }
else if ((*theFirstAttr)->next() == NULL ||
311 (*theFirstAttr)->attrId() > tAttrId) {
312 tAttr->next(*theFirstAttr);
313 *theFirstAttr = tAttr;
317 while (tAttrId > p_next->attrId()) {
321 if (tAttrId == p_next->attrId()) {
323 m_ndb->releaseRecAttr(tAttr);
336 NdbEventOperationImpl::getBlobHandle(
const char *colName,
int n)
338 DBUG_ENTER(
"NdbEventOperationImpl::getBlobHandle (colName)");
340 assert(m_mergeEvents);
343 ndbout_c(
"NdbEventOperationImpl::getBlobHandle may only be called between "
344 "instantiation and execute()");
348 NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
350 if (tAttrInfo == NULL) {
351 ndbout_c(
"NdbEventOperationImpl::getBlobHandle attribute %s not found",colName);
355 NdbBlob* bh = getBlobHandle(tAttrInfo, n);
360 NdbEventOperationImpl::getBlobHandle(
const NdbColumnImpl *tAttrInfo,
int n)
362 DBUG_ENTER(
"NdbEventOperationImpl::getBlobHandle");
363 DBUG_PRINT(
"info", (
"attr=%s post/pre=%d", tAttrInfo->m_name.
c_str(),
n));
368 while (tBlob != NULL) {
369 if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
372 tBlob = tBlob->theNext;
377 const bool is_tinyblob = (tAttrInfo->
getPartSize() == 0);
378 assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));
382 char bename[MAX_TAB_NAME_SIZE];
386 tBlobOp = theBlobOpList;
388 while (tBlobOp != NULL) {
389 if (strcmp(tBlobOp->m_eventImpl->m_name.
c_str(), bename) == 0) {
392 tLastBlopOp = tBlobOp;
393 tBlobOp = tBlobOp->m_next;
396 DBUG_PRINT(
"info", (
"%s blob event op for %s",
397 tBlobOp ?
" reuse" :
" create", bename));
400 if (tBlobOp == NULL) {
405 dict.getBlobEvent(*this->m_eventImpl, tAttrInfo->m_column_no);
406 if (blobEvnt == NULL) {
413 m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
418 tBlobOp->theMainOp =
this;
419 tBlobOp->m_mergeEvents = m_mergeEvents;
420 tBlobOp->theBlobVersion = tAttrInfo->m_blobVersion;
423 if (tLastBlopOp == NULL)
424 theBlobOpList = tBlobOp;
426 tLastBlopOp->m_next = tBlobOp;
427 tBlobOp->m_next = NULL;
431 tBlob = m_ndb->getNdbBlob();
438 if (tBlob->atPrepare(
this, tBlobOp, tAttrInfo, n) == -1) {
440 m_ndb->releaseNdbBlob(tBlob);
445 if (tLastBlob == NULL)
448 tLastBlob->theNext = tBlob;
449 tBlob->theNext = NULL;
454 NdbEventOperationImpl::get_blob_part_no(
bool hasDist)
456 assert(theBlobVersion == 1 || theBlobVersion == 2);
457 assert(theMainOp != NULL);
458 const NdbTableImpl* mainTable = theMainOp->m_eventImpl->m_tableImpl;
459 assert(m_data_item != NULL);
464 if (unlikely(theBlobVersion == 1)) {
471 for (i = 0; i <
n; i++) {
478 assert(pos < ptr[1].sz);
479 Uint32 no = ptr[1].p[pos];
484 NdbEventOperationImpl::readBlobParts(
char*
buf,
NdbBlob* blob,
485 Uint32 part, Uint32 count, Uint16* lenLoc)
487 DBUG_ENTER_EVENT(
"NdbEventOperationImpl::readBlobParts");
488 DBUG_PRINT_EVENT(
"info", (
"part=%u count=%u post/pre=%d",
489 part, count, blob->theEventBlobVersion));
492 const bool hasDist = (blob->theStripeSize != 0);
495 DBUG_PRINT_EVENT(
"info", (
"main_data=%p", main_data));
496 assert(main_data != NULL);
500 assert(m_data_item != NULL);
501 head = m_data_item->m_next_blob;
504 if (head->m_event_op == blob_op)
506 DBUG_PRINT_EVENT(
"info", (
"found blob parts head %p", head));
509 head = head->m_next_blob;
523 blob_op->m_data_item = data;
524 int r = blob_op->receive_event();
527 Uint32 no = blob_op->get_blob_part_no(hasDist);
529 DBUG_PRINT_EVENT(
"info", (
"part_data=%p part no=%u part", data, no));
531 if (part <= no && no < part + count)
533 DBUG_PRINT_EVENT(
"info", (
"part within read range"));
535 const char* src = blob->theBlobEventDataBuf.data;
537 if (blob->theFixedDataFlag) {
538 sz = blob->thePartSize;
540 const uchar* p = (
const uchar*)blob->theBlobEventDataBuf.data;
541 sz = p[0] + (p[1] << 8);
544 memcpy(buf + (no - part) * sz, src, sz);
546 if (lenLoc != NULL) {
550 assert(sz == blob->thePartSize);
555 DBUG_PRINT_EVENT(
"info", (
"part outside read range"));
560 if (unlikely(nparts != count))
562 ndbout_c(
"nparts: %u count: %u noutside: %u", nparts, count, noutside);
564 assert(nparts == count);
566 DBUG_RETURN_EVENT(0);
572 DBUG_ENTER(
"NdbEventOperationImpl::execute");
573 m_ndb->theEventBuffer->add_drop_lock();
574 int r = execute_nolock();
575 m_ndb->theEventBuffer->add_drop_unlock();
580 NdbEventOperationImpl::execute_nolock()
582 DBUG_ENTER(
"NdbEventOperationImpl::execute_nolock");
583 DBUG_PRINT(
"info", (
"this=%p type=%s",
this, !theMainOp ?
"main" :
"blob"));
591 bool schemaTrans =
false;
592 if (m_ndb->theEventBuffer->m_total_buckets == TOTAL_BUCKETS_INIT)
594 int res = NdbDictionaryImpl::getImpl(* myDict).beginSchemaTrans(
false);
613 if (theFirstPkAttrs[0] == NULL &&
614 theFirstDataAttrs[0] == NULL) {
617 m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
619 mi_type= m_eventImpl->mi_type;
620 m_ndb->theEventBuffer->add_op();
624 m_stop_gci= ~(Uint64)0;
625 DBUG_PRINT(
"info", (
"m_ref_count: %u for op: %p", m_ref_count,
this));
627 int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*
this,
634 if (buckets == ~ (Uint32)0)
635 buckets = m_ndb->theImpl->theNoOfDBnodes;
637 m_ndb->theEventBuffer->set_total_buckets(buckets);
644 if (theMainOp == NULL) {
645 DBUG_PRINT(
"info", (
"execute blob ops"));
647 while (blob_op != NULL) {
648 r = blob_op->execute_nolock();
657 blob_op = blob_op->m_next;
670 DBUG_PRINT(
"info", (
"m_ref_count: %u for op: %p", m_ref_count,
this));
675 m_ndb->theEventBuffer->remove_op();
687 NdbEventOperationImpl::stop()
689 DBUG_ENTER(
"NdbEventOperationImpl::stop");
692 for (i=0 ; i<2; i++) {
696 m_ndb->releaseRecAttr(p);
699 theFirstPkAttrs[
i]= 0;
701 for (i=0 ; i<2; i++) {
705 m_ndb->releaseRecAttr(p);
708 theFirstDataAttrs[
i]= 0;
722 m_ndb->theEventBuffer->add_drop_lock();
723 int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*
this);
724 m_ndb->theEventBuffer->remove_op();
731 Uint64 gci= m_ndb->theEventBuffer->m_highest_sub_gcp_complete_GCI;
735 gci += Uint64(3) << 32;
745 m_ndb->theEventBuffer->add_drop_unlock();
749 m_error.
code= NdbDictionaryImpl::getImpl(*myDict).m_error.
code;
751 m_ndb->theEventBuffer->add_drop_unlock();
762 return (
bool)AlterTableReq::getFrmFlag(m_change_mask);
767 return (
bool)AlterTableReq::getFragDataFlag(m_change_mask);
772 return (
bool)AlterTableReq::getRangeListFlag(m_change_mask);
776 NdbEventOperationImpl::getGCI()
778 Uint32 gci_hi = m_data_item->sdata->gci_hi;
779 Uint32 gci_lo = m_data_item->sdata->gci_lo;
780 return gci_lo | (Uint64(gci_hi) << 32);
786 return m_data_item->sdata->anyValue;
790 NdbEventOperationImpl::getLatestGCI()
792 return m_ndb->theEventBuffer->getLatestGCI();
799 Uint32 transId1 = m_data_item->sdata->transId1;
800 Uint32 transId2 = m_data_item->sdata->transId2;
801 return Uint64(transId1) << 32 | transId2;
805 NdbEventOperationImpl::execSUB_TABLE_DATA(
const NdbApiSignal * signal,
808 DBUG_ENTER(
"NdbEventOperationImpl::execSUB_TABLE_DATA");
812 if(signal->isFirstFragment()){
813 m_fragmentId = signal->getFragmentId();
814 m_buffer.grow(4 * sdata->totalLen);
816 if(m_fragmentId != signal->getFragmentId()){
821 const Uint32 i = SubTableData::DICT_TAB_INFO;
822 DBUG_PRINT(
"info", (
"Accumulated %u bytes for fragment %u",
823 4 * ptr[i].sz, m_fragmentId));
824 m_buffer.append(ptr[i].p, 4 * ptr[i].sz);
826 if(!signal->isLastFragment()){
835 NdbEventOperationImpl::receive_event()
838 SubTableData::getOperation(m_data_item->sdata->requestInfo);
839 if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
841 DBUG_ENTER(
"NdbEventOperationImpl::receive_event");
842 DBUG_PRINT(
"info",(
"sdata->operation %u this: %p", operation,
this));
843 m_ndb->theImpl->incClientStat(Ndb::NonDataEventsRecvdCount, 1);
844 if (operation == NdbDictionary::Event::_TE_ALTER)
853 m_change_mask = m_data_item->sdata->changeMask;
854 error.
code = dif.parseTableInfo(&at,
855 (Uint32*)m_buffer.get_data(),
856 m_buffer.length() / 4,
859 if (unlikely(error.
code))
861 DBUG_PRINT(
"info", (
"Failed to parse DictTabInfo error %u",
863 ndbout_c(
"Failed to parse DictTabInfo error %u", error.
code);
866 at->buildColumnHash();
869 m_eventImpl->m_tableImpl = at;
871 DBUG_PRINT(
"info", (
"switching table impl 0x%lx -> 0x%lx",
872 (
long) tmp_table_impl, (
long) at));
876 for (i = 0; i < 2; i++)
883 DBUG_PRINT(
"info", (
"rec_attr: 0x%lx "
884 "switching column impl 0x%lx -> 0x%lx",
885 (
long) p, (
long) p->m_column, (
long) tAttrInfo));
886 p->m_column = tAttrInfo;
890 for (i = 0; i < 2; i++)
897 DBUG_PRINT(
"info", (
"rec_attr: 0x%lx "
898 "switching column impl 0x%lx -> 0x%lx",
899 (
long) p, (
long) p->m_column, (
long) tAttrInfo));
900 p->m_column = tAttrInfo;
910 DBUG_PRINT(
"info", (
"blob_handle: 0x%lx "
911 "switching column impl 0x%lx -> 0x%lx",
912 (
long) p, (
long) p->theColumn, (
long) tAttrInfo));
913 p->theColumn = tAttrInfo;
917 delete tmp_table_impl;
922 DBUG_ENTER_EVENT(
"NdbEventOperationImpl::receive_event");
923 DBUG_PRINT_EVENT(
"info",(
"sdata->operation %u this: %p", operation,
this));
925 m_ndb->theImpl->incClientStat(Ndb::DataEventsRecvdCount, 1);
927 int is_insert= operation == NdbDictionary::Event::_TE_INSERT;
929 Uint32 *aAttrPtr = m_data_item->ptr[0].p;
930 Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
931 Uint32 *aDataPtr = m_data_item->ptr[1].p;
933 DBUG_DUMP_EVENT(
"after",(
char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4);
934 DBUG_DUMP_EVENT(
"before",(
char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4);
945 assert(aAttrPtr < aAttrEndPtr);
947 assert(tAttr->attrId() ==
949 receive_data(tAttr, aDataPtr, tDataSz);
951 receive_data(tAttr1, aDataPtr, tDataSz);
953 tAttr1->setUNDEFINED();
954 tAttr1= tAttr1->next();
957 aDataPtr+= (tDataSz + 3) >> 2;
958 tAttr= tAttr->next();
962 NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
967 int hasSomeData= (operation != NdbDictionary::Event::_TE_UPDATE);
968 while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
969 tRecAttrId = tWorkingRecAttr->attrId();
973 while (tAttrId > tRecAttrId) {
974 DBUG_PRINT_EVENT(
"info",(
"undef [%u] %u 0x%x [%u] 0x%x",
975 tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
976 tWorkingRecAttr->setUNDEFINED();
977 tWorkingRecAttr = tWorkingRecAttr->next();
978 if (tWorkingRecAttr == NULL)
980 tRecAttrId = tWorkingRecAttr->attrId();
982 if (tWorkingRecAttr == NULL)
985 if (tAttrId == tRecAttrId) {
988 DBUG_PRINT_EVENT(
"info",(
"set [%u] %u 0x%x [%u] 0x%x",
989 tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr));
991 receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
992 tWorkingRecAttr = tWorkingRecAttr->next();
995 aDataPtr += (tDataSz + 3) >> 2;
998 while (tWorkingRecAttr != NULL) {
999 tRecAttrId = tWorkingRecAttr->attrId();
1002 tWorkingRecAttr->setUNDEFINED();
1003 tWorkingRecAttr = tWorkingRecAttr->next();
1006 tWorkingRecAttr = theFirstDataAttrs[1];
1007 aDataPtr = m_data_item->ptr[2].p;
1008 Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz;
1009 while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
1010 tRecAttrId = tWorkingRecAttr->attrId();
1014 while (tAttrId > tRecAttrId) {
1015 tWorkingRecAttr->setUNDEFINED();
1016 tWorkingRecAttr = tWorkingRecAttr->next();
1017 if (tWorkingRecAttr == NULL)
1019 tRecAttrId = tWorkingRecAttr->attrId();
1021 if (tWorkingRecAttr == NULL)
1023 if (tAttrId == tRecAttrId) {
1024 assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->
getPrimaryKey());
1027 receive_data(tWorkingRecAttr, aDataPtr, tDataSz);
1028 tWorkingRecAttr = tWorkingRecAttr->next();
1030 aDataPtr += (tDataSz + 3) >> 2;
1032 while (tWorkingRecAttr != NULL) {
1033 tWorkingRecAttr->setUNDEFINED();
1034 tWorkingRecAttr = tWorkingRecAttr->next();
1039 DBUG_RETURN_EVENT(1);
1042 DBUG_RETURN_EVENT(0);
1046 NdbEventOperationImpl::getEventType()
1049 (1 << SubTableData::getOperation(m_data_item->sdata->requestInfo));
1055 NdbEventOperationImpl::print()
1058 ndbout <<
"EventId " << m_eventId <<
"\n";
1060 for (i = 0; i < 2; i++) {
1062 ndbout <<
" %u " <<
i;
1064 ndbout <<
" : " << p->attrId() <<
" = " << *p;
1069 for (i = 0; i < 2; i++) {
1071 ndbout <<
" %u " <<
i;
1073 ndbout <<
" : " << p->attrId() <<
" = " << *p;
1081 NdbEventOperationImpl::printAll()
1083 Uint32 *aAttrPtr = m_data_item->ptr[0].p;
1084 Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz;
1085 Uint32 *aDataPtr = m_data_item->ptr[1].p;
1091 for (; aAttrPtr < aAttrEndPtr; ) {
1096 aDataPtr += tDataSz;
1104 NdbEventBuffer::NdbEventBuffer(
Ndb *ndb) :
1105 m_total_buckets(TOTAL_BUCKETS_INIT),
1109 m_latestGCI(0), m_latest_complete_GCI(0),
1110 m_highest_sub_gcp_complete_GCI(0),
1111 m_latest_poll_GCI(0),
1114 m_min_free_thresh(0),
1115 m_max_free_thresh(0),
1116 m_gci_slip_thresh(0),
1118 m_active_op_count(0),
1122 m_latest_command=
"NdbEventBuffer::NdbEventBuffer";
1126 if ((p_cond = NdbCondition_Create()) == NULL) {
1127 ndbout_c(
"NdbEventHandle: NdbCondition_Create() failed");
1136 m_free_data_count= 0;
1143 m_ndb->theImpl->m_ndb_cluster_connection.m_event_add_drop_mutex;
1147 init_gci_containers();
1149 m_alive_node_bit_mask.clear();
1152 NdbEventBuffer::~NdbEventBuffer()
1156 while ((op = m_dropped_ev_op))
1158 m_dropped_ev_op = m_dropped_ev_op->m_next;
1159 delete op->m_facade;
1163 Uint32 sz= m_active_gci.size();
1165 for(j = 0; j < sz; j++)
1167 array[j].~Gci_container();
1170 for (j= 0; j < m_allocated_data.size(); j++)
1172 unsigned sz= m_allocated_data[j]->sz;
1175 for (; data < end_data; data++)
1178 NdbMem_Free(data->sdata);
1180 NdbMem_Free((
char*)m_allocated_data[j]);
1183 NdbCondition_Destroy(p_cond);
1187 NdbEventBuffer::add_op()
1189 if(m_active_op_count == 0)
1191 init_gci_containers();
1193 m_active_op_count++;
1197 NdbEventBuffer::remove_op()
1199 m_active_op_count--;
1203 NdbEventBuffer::init_gci_containers()
1205 m_startup_hack =
true;
1206 bzero(&m_complete_data,
sizeof(m_complete_data));
1207 m_latest_complete_GCI = m_latestGCI = m_latest_poll_GCI = 0;
1208 m_active_gci.clear();
1209 m_active_gci.fill(3, g_empty_gci_container);
1210 m_min_gci_index = m_max_gci_index = 1;
1212 m_known_gci.clear();
1213 m_known_gci.fill(7, gci);
1216 int NdbEventBuffer::expand(
unsigned sz)
1218 unsigned alloc_size=
1219 sizeof(EventBufData_chunk) +(sz-1)*
sizeof(
EventBufData);
1220 EventBufData_chunk *chunk_data=
1221 (EventBufData_chunk *)NdbMem_Allocate(alloc_size);
1224 m_allocated_data.push_back(chunk_data);
1231 for (; data < end_data; data++)
1233 data->m_next= last_data;
1236 m_free_data= last_data;
1240 m_free_data_count+= sz;
1246 NdbEventBuffer::pollEvents(
int aMillisecondNumber, Uint64 *latestGCI)
1250 const char *m_latest_command_save= m_latest_command;
1251 m_latest_command=
"NdbEventBuffer::pollEvents";
1254 NdbMutex_Lock(m_mutex);
1256 if (unlikely(ev_op == 0 && aMillisecondNumber))
1258 NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
1260 if (unlikely(ev_op == 0))
1263 m_latest_poll_GCI= m_latestGCI;
1269 ev_op->m_data_count-= ev_op->m_data_done_count;
1270 ev_op->m_data_done_count= 0;
1272 m_latest_command= m_latest_command_save;
1274 if (unlikely(ev_op == 0))
1280 deleteUsedEventOperations(m_latest_poll_GCI);
1282 NdbMutex_Unlock(m_mutex);
1285 *latestGCI= m_latest_poll_GCI;
1296 Uint64 * array = m_known_gci.getBase();
1297 Uint32 mask = m_known_gci.size() - 1;
1298 Uint32 minpos = m_min_gci_index;
1299 Uint32 maxpos = m_max_gci_index;
1301 g_eventLogger->
info(
"Flushing incomplete GCI:s < %u/%u",
1302 Uint32(gci >> 32), Uint32(gci));
1303 while (minpos != maxpos && array[minpos] < gci)
1307 assert(maxpos == m_max_gci_index);
1309 if(!tmp->m_data.is_empty())
1311 free_list(tmp->m_data);
1313 tmp->~Gci_container();
1315 minpos = (minpos + 1) & mask;
1318 m_min_gci_index = minpos;
1328 NdbEventBuffer::nextEvent()
1330 DBUG_ENTER_EVENT(
"NdbEventBuffer::nextEvent");
1332 const char *m_latest_command_save= m_latest_command;
1335 if (m_used_data.m_count > 1024)
1338 m_latest_command=
"NdbEventBuffer::nextEvent (lock)";
1340 NdbMutex_Lock(m_mutex);
1342 free_list(m_used_data);
1344 NdbMutex_Unlock(m_mutex);
1347 m_latest_command=
"NdbEventBuffer::nextEvent";
1352 while ((data= m_available_data.m_head))
1360 if (!op && !isConsistent(gci))
1362 DBUG_PRINT_EVENT(
"info", (
"detected inconsistent gci %u", gci));
1363 DBUG_RETURN_EVENT(0);
1366 DBUG_PRINT_EVENT(
"info", (
"available data=%p op=%p", data, op));
1372 assert(op->theMainOp == NULL);
1375 op->m_data_item= data;
1378 Uint32 full_count, full_sz;
1379 m_available_data.remove_first(full_count, full_sz);
1382 m_used_data.append_used_data(data, full_count, full_sz);
1384 m_ndb->theImpl->incClientStat(Ndb::EventBytesRecvdCount, full_sz);
1387 op->m_data_done_count++;
1392 int r= op->receive_event();
1396 m_latest_command= m_latest_command_save;
1398 NdbBlob* tBlob = op->theBlobList;
1399 while (tBlob != NULL)
1401 (void)tBlob->atNextEvent();
1402 tBlob = tBlob->theNext;
1405 while (gci_ops && op->getGCI() > gci_ops->m_gci)
1407 gci_ops = m_available_data.delete_next_gci_ops();
1409 if (!gci_ops->m_consistent)
1410 DBUG_RETURN_EVENT(0);
1411 assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
1413 if (SubTableData::getOperation(data->sdata->requestInfo) ==
1414 NdbDictionary::Event::_TE_NUL)
1416 DBUG_PRINT_EVENT(
"info", (
"skip _TE_NUL"));
1419 DBUG_RETURN_EVENT(op->m_facade);
1426 m_latest_command= m_latest_command_save;
1431 m_latest_command= m_latest_command_save;
1437 gci_ops = m_available_data.delete_next_gci_ops();
1443 if (m_dropped_ev_op)
1445 NdbMutex_Lock(m_mutex);
1446 deleteUsedEventOperations(m_latest_poll_GCI);
1447 NdbMutex_Unlock(m_mutex);
1449 DBUG_RETURN_EVENT(0);
1453 NdbEventBuffer::isConsistent(Uint64& gci)
1455 DBUG_ENTER(
"NdbEventBuffer::isConsistent");
1459 if (!gci_ops->m_consistent)
1461 gci = gci_ops->m_gci;
1464 gci_ops = gci_ops->m_next;
1471 NdbEventBuffer::isConsistentGCI(Uint64 gci)
1473 DBUG_ENTER(
"NdbEventBuffer::isConsistentGCI");
1477 if (gci_ops->m_gci == gci && !gci_ops->m_consistent)
1479 gci_ops = gci_ops->m_next;
1487 NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
1489 DBUG_ENTER(
"NdbEventBuffer::getGCIEventOperations");
1491 if (*iter < gci_ops->m_gci_op_count)
1494 if (event_types != NULL)
1495 *event_types = g.event_types;
1496 DBUG_PRINT(
"info", (
"gci: %u g.op: 0x%lx g.event_types: 0x%lx",
1497 (
unsigned)gci_ops->m_gci, (
long) g.op,
1498 (
long) g.event_types));
1505 NdbEventBuffer::deleteUsedEventOperations(Uint64 last_consumed_gci)
1508 while (op && op->m_stop_gci)
1510 if (last_consumed_gci > op->m_stop_gci)
1517 if (op->m_ref_count == 0)
1520 op->m_next->m_prev = op->m_prev;
1522 op->m_prev->m_next = op->m_next;
1524 m_dropped_ev_op = op->m_next;
1525 delete op->m_facade;
1540 out <<
"[ GCI: " << (gci.m_gci >> 32) <<
"/" << (gci.m_gci & 0xFFFFFFFF)
1541 <<
" state: " << hex << gci.m_state
1542 <<
" head: " << hex << gci.m_data.m_head
1543 <<
" tail: " << hex << gci.m_data.m_tail
1545 <<
" cnt: " << dec << gci.m_data.m_count
1547 <<
" gcp: " << dec << gci.m_gcp_complete_rep_count
1563 NdbEventBuffer::resize_known_gci()
1565 Uint32 minpos = m_min_gci_index;
1566 Uint32 maxpos = m_max_gci_index;
1567 Uint32 mask = m_known_gci.size() - 1;
1570 Uint32 newsize = 2 * (mask + 1);
1571 m_known_gci.fill(newsize - 1, fill);
1572 Uint64 * array = m_known_gci.getBase();
1576 printf(
"before (%u): ", minpos);
1577 for (Uint32 i = minpos; i != maxpos; i = (i + 1) & mask)
1579 Uint32(array[i] >> 32),
1584 Uint32 idx = mask + 1;
1585 if (0) printf(
"swapping ");
1586 while (minpos != maxpos)
1588 if (0) printf(
"%u-%u ", minpos, idx);
1589 Uint64 tmp = array[idx];
1590 array[idx] = array[minpos];
1591 array[minpos] = tmp;
1594 minpos = (minpos + 1) & mask;
1596 if (0) printf(
"\n");
1598 minpos = m_min_gci_index = mask + 1;
1599 maxpos = m_max_gci_index = idx;
1600 assert(minpos < maxpos);
1604 ndbout_c(
"resize_known_gci from %u to %u", (mask + 1), newsize);
1606 for (Uint32 i = minpos; i < maxpos; i++)
1609 Uint32(array[i] >> 32),
1616 Uint64 gci = array[minpos];
1617 for (Uint32 i = minpos + 1; i<maxpos; i++)
1619 assert(array[i] > gci);
1627 NdbEventBuffer::verify_known_gci(
bool allowempty)
1629 Uint32 minpos = m_min_gci_index;
1630 Uint32 maxpos = m_max_gci_index;
1631 Uint32 mask = m_known_gci.size() - 1;
1634 #define MMASSERT(x) { if (!(x)) { line = __LINE__; goto fail; }}
1635 if (m_min_gci_index == m_max_gci_index)
1637 MMASSERT(allowempty);
1638 for (Uint32 i = 0; i<m_active_gci.size(); i++)
1639 MMASSERT(((
Gci_container*)(m_active_gci.getBase()+
i))->m_gci == 0);
1644 Uint64 last = m_known_gci[minpos];
1645 MMASSERT(last > m_latestGCI);
1646 MMASSERT(find_bucket(last) != 0);
1647 MMASSERT(maxpos == m_max_gci_index);
1649 minpos = (minpos + 1) & mask;
1650 while (minpos != maxpos)
1652 MMASSERT(m_known_gci[minpos] > last);
1653 last = m_known_gci[minpos];
1654 MMASSERT(find_bucket(last) != 0);
1655 MMASSERT(maxpos == m_max_gci_index);
1656 minpos = (minpos + 1) & mask;
1662 for (Uint32 i = 0; i<m_active_gci.size(); i++)
1664 if (bucktets[i].m_gci)
1667 for (Uint32 j = m_min_gci_index; j != m_max_gci_index;
1670 if (m_known_gci[j] == bucktets[i].m_gci)
1677 ndbout_c(
"%u/%u not found",
1678 Uint32(bucktets[i].m_gci >> 32),
1679 Uint32(bucktets[i].m_gci));
1680 MMASSERT(found ==
true);
1687 ndbout_c(
"assertion at %d", line);
1688 printf(
"known gci: ");
1689 for (Uint32 i = m_min_gci_index; i != m_max_gci_index; i = (i + 1) & mask)
1691 printf(
"%u/%u ", Uint32(m_known_gci[i] >> 32), Uint32(m_known_gci[i]));
1694 printf(
"\nContainers");
1695 for (Uint32 i = 0; i<m_active_gci.size(); i++)
1696 ndbout << m_active_gci[i] << endl;
1702 NdbEventBuffer::find_bucket_chained(Uint64 gci)
1705 printf(
"find_bucket_chained(%u/%u) ", Uint32(gci >> 32), Uint32(gci));
1706 if (unlikely(gci <= m_latestGCI))
1712 ndbout_c(
"already complete (%u/%u)",
1713 Uint32(m_latestGCI >> 32),
1714 Uint32(m_latestGCI));
1718 if (unlikely(m_total_buckets == 0))
1723 Uint32 pos = Uint32(gci & ACTIVE_GCI_MASK);
1724 Uint32
size = m_active_gci.size();
1728 Uint64 cmp = (buckets + pos)->m_gci;
1732 ndbout_c(
"found pos: %u", pos);
1733 return buckets + pos;
1739 ndbout_c(
"empty(%u) ", pos);
1740 Uint32 search = pos + ACTIVE_GCI_DIRECTORY_SIZE;
1741 while (search < size)
1743 if ((buckets + search)->m_gci == gci)
1745 memcpy(buckets + pos, buckets + search,
sizeof(
Gci_container));
1748 printf(
"moved from %u to %u", search, pos);
1749 if (search == size - 1)
1751 m_active_gci.erase(search);
1753 ndbout_c(
" shrink");
1760 return buckets + pos;
1762 search += ACTIVE_GCI_DIRECTORY_SIZE;
1766 pos += ACTIVE_GCI_DIRECTORY_SIZE;
1773 ndbout_c(
"new (with expand) ");
1774 m_active_gci.fill(pos, g_empty_gci_container);
1778 bucket->m_gci = gci;
1779 bucket->m_gcp_complete_rep_count = m_total_buckets;
1781 Uint32 mask = m_known_gci.size() - 1;
1782 Uint64 * array = m_known_gci.getBase();
1784 Uint32 minpos = m_min_gci_index;
1785 Uint32 maxpos = m_max_gci_index;
1786 bool full = ((maxpos + 1) & mask) == minpos;
1790 minpos = m_min_gci_index;
1791 maxpos = m_max_gci_index;
1792 mask = m_known_gci.size() - 1;
1793 array = m_known_gci.getBase();
1796 Uint32 maxindex = (maxpos - 1) & mask;
1797 Uint32 newmaxpos = (maxpos + 1) & mask;
1798 m_max_gci_index = newmaxpos;
1799 if (likely(minpos == maxpos || gci > array[maxindex]))
1801 array[maxpos] = gci;
1803 verify_known_gci(
false);
1808 for (pos = minpos; pos != maxpos; pos = (pos + 1) & mask)
1810 if (array[pos] > gci)
1815 ndbout_c(
"insert %u/%u (max %u/%u) at pos %u (min: %u max: %u)",
1818 Uint32(array[maxindex] >> 32),
1819 Uint32(array[maxindex]),
1821 m_min_gci_index, m_max_gci_index);
1823 assert(pos != maxpos);
1826 oldgci = array[pos];
1829 pos = (pos + 1) & mask;
1830 }
while (pos != maxpos);
1834 verify_known_gci(
false);
1841 crash_on_invalid_SUB_GCP_COMPLETE_REP(
const Gci_container* bucket,
1845 Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
1847 ndbout_c(
"INVALID SUB_GCP_COMPLETE_REP");
1848 ndbout_c(
"gci_hi: %u", rep->gci_hi);
1849 ndbout_c(
"gci_lo: %u", rep->gci_lo);
1850 ndbout_c(
"sender: %x", rep->senderRef);
1851 ndbout_c(
"count: %d", rep->gcp_complete_rep_count);
1852 ndbout_c(
"bucket count: %u", old_cnt);
1853 ndbout_c(
"total buckets: %u", buckets);
1860 Uint64 gci = bucket->m_gci;
1864 ndbout_c(
"complete %u/%u pos: %u", Uint32(gci >> 32), Uint32(gci),
1865 Uint32(bucket - buckets));
1868 verify_known_gci(
false);
1874 if(!bucket->m_data.is_empty())
1877 assert(bucket->m_data.m_count);
1879 m_complete_data.m_data.append_list(&bucket->m_data, gci);
1880 if (bucket->m_state & Gci_container::GC_INCONSISTENT)
1887 assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
1888 m_complete_data.m_data.m_gci_ops_list_tail->m_consistent =
false;
1893 if (bucket->m_state & Gci_container::GC_INCONSISTENT)
1903 dummy_event_list->append_used_data(dummy_data);
1904 dummy_event_list->m_is_not_multi_list =
true;
1905 m_complete_data.m_data.append_list(dummy_event_list, gci);
1906 assert(m_complete_data.m_data.m_gci_ops_list_tail != NULL);
1907 m_complete_data.m_data.m_gci_ops_list_tail->m_consistent =
false;
1911 Uint32 minpos = m_min_gci_index;
1912 Uint32 mask = m_known_gci.size() - 1;
1913 assert((mask & (mask + 1)) == 0);
1917 m_min_gci_index = (minpos + 1) & mask;
1920 verify_known_gci(
true);
1926 Uint32 len,
int complete_cluster_failure)
1928 Uint32 gci_hi = rep->gci_hi;
1929 Uint32 gci_lo = rep->gci_lo;
1931 if (unlikely(len < SubGcpCompleteRep::SignalLength))
1936 const Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
1937 if (gci > m_highest_sub_gcp_complete_GCI)
1938 m_highest_sub_gcp_complete_GCI = gci;
1940 if (!complete_cluster_failure)
1942 m_alive_node_bit_mask.
set(refToNode(rep->senderRef));
1944 if (unlikely(m_active_op_count == 0))
1950 DBUG_ENTER_EVENT(
"NdbEventBuffer::execSUB_GCP_COMPLETE_REP");
1952 const Uint32 cnt= rep->gcp_complete_rep_count;
1957 ndbout_c(
"execSUB_GCP_COMPLETE_REP(%u/%u) cnt: %u from %x flags: 0x%x",
1958 Uint32(gci >> 32), Uint32(gci), cnt, rep->senderRef,
1961 if (unlikely(rep->flags & (SubGcpCompleteRep::ADD_CNT |
1962 SubGcpCompleteRep::SUB_CNT)))
1964 handle_change_nodegroup(rep);
1967 if (unlikely(bucket == 0))
1974 Uint64 minGCI = m_known_gci[m_min_gci_index];
1975 ndbout_c(
"bucket == 0, gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
1976 Uint32(gci >> 32), Uint32(gci),
1977 Uint32(minGCI >> 32), Uint32(minGCI),
1978 Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
1979 ndbout <<
" complete: " << m_complete_data << endl;
1980 for(Uint32 i = 0; i<m_active_gci.size(); i++)
1983 ndbout << i <<
" - " << m_active_gci[i] << endl;
1986 DBUG_VOID_RETURN_EVENT;
1989 if (rep->flags & SubGcpCompleteRep::MISSING_DATA)
1991 bucket->m_state = Gci_container::GC_INCONSISTENT;
1994 Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
1995 if(unlikely(old_cnt == ~(Uint32)0))
1997 old_cnt = m_total_buckets;
2001 if (unlikely(! (old_cnt >= cnt)))
2003 crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, m_total_buckets);
2005 bucket->m_gcp_complete_rep_count = old_cnt - cnt;
2009 Uint64 minGCI = m_known_gci[m_min_gci_index];
2010 if(likely(minGCI == 0 || gci == minGCI))
2013 m_startup_hack =
false;
2014 complete_bucket(bucket);
2015 m_latestGCI = m_complete_data.m_gci = gci;
2018 if(unlikely(m_latest_complete_GCI > gci))
2020 complete_outof_order_gcis();
2025 NdbCondition_Signal(p_cond);
2029 if (unlikely(m_startup_hack))
2032 bucket = find_bucket(gci);
2034 assert(bucket->m_gci == gci);
2038 g_eventLogger->
info(
"out of order bucket: %d gci: %u/%u minGCI: %u/%u m_latestGCI: %u/%u",
2040 Uint32(gci >> 32), Uint32(gci),
2041 Uint32(minGCI >> 32), Uint32(minGCI),
2042 Uint32(m_latestGCI >> 32), Uint32(m_latestGCI));
2043 bucket->m_state = Gci_container::GC_COMPLETE;
2044 bucket->m_gcp_complete_rep_count = 1;
2045 m_latest_complete_GCI = gci;
2049 DBUG_VOID_RETURN_EVENT;
2053 NdbEventBuffer::complete_outof_order_gcis()
2056 verify_known_gci(
false);
2059 Uint64 * array = m_known_gci.getBase();
2060 Uint32 mask = m_known_gci.size() - 1;
2061 Uint32 minpos = m_min_gci_index;
2062 Uint32 maxpos = m_max_gci_index;
2063 Uint64 stop_gci = m_latest_complete_GCI;
2065 Uint64 start_gci = array[minpos];
2066 g_eventLogger->
info(
"complete_outof_order_gcis from: %u/%u(%u) to: %u/%u(%u)",
2067 Uint32(start_gci >> 32), Uint32(start_gci), minpos,
2068 Uint32(stop_gci >> 32), Uint32(stop_gci), maxpos);
2070 assert(start_gci <= stop_gci);
2073 start_gci = array[minpos];
2076 assert(maxpos == m_max_gci_index);
2077 if (!(bucket->m_state & Gci_container::GC_COMPLETE))
2080 verify_known_gci(
false);
2086 ndbout_c(
"complete_outof_order_gcis - completing %u/%u rows: %u",
2087 Uint32(start_gci >> 32), Uint32(start_gci), bucket->m_data.m_count);
2089 ndbout_c(
"complete_outof_order_gcis - completing %u/%u",
2090 Uint32(start_gci >> 32), Uint32(start_gci));
2093 complete_bucket(bucket);
2094 m_latestGCI = m_complete_data.m_gci = start_gci;
2097 verify_known_gci(
true);
2099 minpos = (minpos + 1) & mask;
2100 }
while (start_gci != stop_gci);
2109 DBUG_PRINT(
"info", (
"gci{hi/lo}: %u/%u", data.gci_hi, data.gci_lo));
2112 if (impl->m_stop_gci == ~Uint64(0))
2114 oid_ref = impl->m_oid;
2115 insertDataL(impl, &data, SubTableData::SignalLength, ptr);
2118 while (blob_op != NULL)
2120 if (blob_op->m_stop_gci == ~Uint64(0))
2122 oid_ref = blob_op->m_oid;
2123 insertDataL(blob_op, &data, SubTableData::SignalLength, ptr);
2125 blob_op = blob_op->m_next;
2127 }
while((impl = impl->m_next));
2131 NdbEventBuffer::find_max_known_gci(Uint64 * res)
const
2133 const Uint64 * array = m_known_gci.getBase();
2134 Uint32 mask = m_known_gci.size() - 1;
2135 Uint32 minpos = m_min_gci_index;
2136 Uint32 maxpos = m_max_gci_index;
2138 if (minpos == maxpos)
2143 * res = array[(maxpos - 1) & mask];
2152 Uint64 gci = (Uint64(rep->gci_hi) << 32) | rep->gci_lo;
2153 Uint32 cnt = (rep->flags >> 16);
2154 Uint64 * array = m_known_gci.getBase();
2155 Uint32 mask = m_known_gci.size() - 1;
2156 Uint32 minpos = m_min_gci_index;
2157 Uint32 maxpos = m_max_gci_index;
2159 if (rep->flags & SubGcpCompleteRep::ADD_CNT)
2161 ndbout_c(
"handle_change_nodegroup(add, cnt=%u,gci=%u/%u)",
2162 cnt, Uint32(gci >> 32), Uint32(gci));
2165 Uint32 pos = minpos;
2166 for (; pos != maxpos; pos = (pos + 1) & mask)
2168 if (array[pos] == gci)
2171 if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
2174 ndbout_c(
" - gci %u/%u already marked complete",
2175 Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2181 ndbout_c(
" - gci %u/%u marking (and increasing)",
2182 Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2183 tmp->m_state |= Gci_container::GC_CHANGE_CNT;
2184 tmp->m_gcp_complete_rep_count += cnt;
2190 ndbout_c(
" - ignore %u/%u",
2191 Uint32(array[pos] >> 32), Uint32(array[pos]));
2197 ndbout_c(
" - NOT FOUND (total: %u cnt: %u)", m_total_buckets, cnt);
2206 m_total_buckets += cnt;
2208 pos = (pos + 1) & mask;
2209 for (; pos != maxpos; pos = (pos + 1) & mask)
2211 assert(array[pos] > gci);
2213 assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
2214 tmp->m_gcp_complete_rep_count += cnt;
2215 ndbout_c(
" - increasing cnt on %u/%u by %u",
2216 Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci), cnt);
2219 else if (rep->flags & SubGcpCompleteRep::SUB_CNT)
2221 ndbout_c(
"handle_change_nodegroup(sub, cnt=%u,gci=%u/%u)",
2222 cnt, Uint32(gci >> 32), Uint32(gci));
2225 Uint32 pos = minpos;
2226 for (; pos != maxpos; pos = (pos + 1) & mask)
2228 if (array[pos] == gci)
2231 if (tmp->m_state & Gci_container::GC_CHANGE_CNT)
2234 ndbout_c(
" - gci %u/%u already marked complete",
2235 Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2241 ndbout_c(
" - gci %u/%u marking",
2242 Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2243 tmp->m_state |= Gci_container::GC_CHANGE_CNT;
2249 ndbout_c(
" - ignore %u/%u",
2250 Uint32(array[pos] >> 32), Uint32(array[pos]));
2256 ndbout_c(
" - NOT FOUND");
2265 m_total_buckets -= cnt;
2267 pos = (pos + 1) & mask;
2268 for (; pos != maxpos; pos = (pos + 1) & mask)
2270 assert(array[pos] > gci);
2272 assert((tmp->m_state & Gci_container::GC_CHANGE_CNT) == 0);
2273 tmp->m_gcp_complete_rep_count -= cnt;
2274 ndbout_c(
" - decreasing cnt on %u/%u by %u to: %u",
2275 Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci),
2277 tmp->m_gcp_complete_rep_count);
2283 NdbEventBuffer::set_total_buckets(Uint32 cnt)
2285 if (m_total_buckets == cnt)
2288 assert(m_total_buckets == TOTAL_BUCKETS_INIT);
2289 m_total_buckets = cnt;
2291 Uint64 * array = m_known_gci.getBase();
2292 Uint32 mask = m_known_gci.size() - 1;
2293 Uint32 minpos = m_min_gci_index;
2294 Uint32 maxpos = m_max_gci_index;
2297 Uint32 pos = minpos;
2298 for (; pos != maxpos; pos = (pos + 1) & mask)
2301 if (TOTAL_BUCKETS_INIT >= tmp->m_gcp_complete_rep_count)
2305 ndbout_c(
"set_total_buckets(%u) complete %u/%u",
2306 cnt, Uint32(tmp->m_gci >> 32), Uint32(tmp->m_gci));
2307 tmp->m_gcp_complete_rep_count = 0;
2308 complete_bucket(tmp);
2312 assert(tmp->m_gcp_complete_rep_count > TOTAL_BUCKETS_INIT);
2313 tmp->m_gcp_complete_rep_count -= TOTAL_BUCKETS_INIT;
2318 NdbCondition_Signal(p_cond);
2325 m_alive_node_bit_mask.
clear(node_id);
2331 DBUG_ENTER(
"NdbEventBuffer::report_node_failure_completed");
2334 bzero(&data,
sizeof(data));
2335 bzero(ptr,
sizeof(ptr));
2338 data.requestInfo = 0;
2339 SubTableData::setOperation(data.requestInfo,
2340 NdbDictionary::Event::_TE_NODE_FAILURE);
2341 SubTableData::setReqNodeId(data.requestInfo, node_id);
2342 SubTableData::setNdbdNodeId(data.requestInfo, node_id);
2343 data.flags = SubTableData::LOG;
2345 Uint64 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
2346 find_max_known_gci(&gci);
2348 data.gci_hi = Uint32(gci >> 32);
2349 data.gci_lo = Uint32(gci);
2355 insert_event(&op->m_impl, data, ptr, data.senderData);
2357 if (!m_alive_node_bit_mask.
isclear())
2364 DBUG_PRINT(
"info", (
"Cluster failure"));
2366 gci = Uint64((m_latestGCI >> 32) + 1) << 32;
2367 bool found = find_max_known_gci(&gci);
2369 Uint64 * array = m_known_gci.getBase();
2370 Uint32 mask = m_known_gci.size() - 1;
2371 Uint32 minpos = m_min_gci_index;
2372 Uint32 maxpos = m_max_gci_index;
2374 while (minpos != maxpos && array[minpos] != gci)
2378 assert(maxpos == m_max_gci_index);
2380 if(!tmp->m_data.is_empty())
2382 free_list(tmp->m_data);
2384 tmp->~Gci_container();
2387 minpos = (minpos + 1) & mask;
2389 m_min_gci_index = minpos;
2392 assert(((minpos + 1) & mask) == maxpos);
2396 assert(minpos == maxpos);
2403 data.requestInfo = 0;
2404 SubTableData::setOperation(data.requestInfo,
2405 NdbDictionary::Event::_TE_CLUSTER_FAILURE);
2411 insert_event(&op->m_impl, data, ptr, data.senderData);
2424 assert(m_max_gci_index == maxpos);
2428 assert(m_max_gci_index == ((maxpos + 1) & mask));
2430 Uint32 cnt = tmp->m_gcp_complete_rep_count;
2433 rep.gci_hi= (Uint32)(gci >> 32);
2434 rep.gci_lo= (Uint32)(gci & 0xFFFFFFFF);
2435 rep.gcp_complete_rep_count= cnt;
2443 NdbEventBuffer::getLatestGCI()
2454 DBUG_ENTER_EVENT(
"NdbEventBuffer::insertDataL");
2455 const Uint32 ri = sdata->requestInfo;
2456 const Uint32 operation = SubTableData::getOperation(ri);
2457 Uint32 gci_hi = sdata->gci_hi;
2458 Uint32 gci_lo = sdata->gci_lo;
2460 if (unlikely(len < SubTableData::SignalLength))
2465 Uint64 gci= gci_lo | (Uint64(gci_hi) << 32);
2466 const bool is_data_event =
2467 operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
2471 if (operation == NdbDictionary::Event::_TE_CLUSTER_FAILURE)
2477 op->m_stop_gci = gci;
2479 else if (operation == NdbDictionary::Event::_TE_ACTIVE)
2483 (
"_TE_ACTIVE: m_ref_count: %u for op: %p id: %u",
2484 op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
2485 DBUG_RETURN_EVENT(0);
2487 else if (operation == NdbDictionary::Event::_TE_STOP)
2491 (
"_TE_STOP: m_ref_count: %u for op: %p id: %u",
2492 op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
2493 DBUG_RETURN_EVENT(0);
2497 if ( likely((Uint32)op->mi_type & (1 << operation)))
2501 DBUG_PRINT_EVENT(
"info", (
"data insertion in eventId %d", op->m_eventId));
2502 DBUG_PRINT_EVENT(
"info", (
"gci=%d tab=%d op=%d node=%d",
2503 sdata->gci, sdata->tableId,
2504 SubTableData::getOperation(sdata->requestInfo),
2505 SubTableData::getReqNodeId(sdata->requestInfo)));
2507 if (unlikely(bucket == 0))
2513 DBUG_RETURN_EVENT(0);
2516 const bool is_blob_event = (op->theMainOp != NULL);
2517 const bool use_hash = op->m_mergeEvents && is_data_event;
2519 if (! is_data_event && is_blob_event)
2522 DBUG_PRINT_EVENT(
"info", (
"ignore non-data event on blob table"));
2523 DBUG_RETURN_EVENT(0);
2531 bucket->m_data_hash.search(hpos, op, ptr);
2538 data = alloc_data();
2539 if (unlikely(data == 0))
2541 op->m_has_error = 2;
2542 DBUG_RETURN_EVENT(-1);
2544 if (unlikely(copy_data(sdata, len, ptr, data, NULL)))
2546 op->m_has_error = 3;
2547 DBUG_RETURN_EVENT(-1);
2549 data->m_event_op = op;
2550 if (! is_blob_event || ! is_data_event)
2552 bucket->m_data.append_data(data);
2558 int ret = get_main_data(bucket, main_hpos, data);
2561 op->m_has_error = 4;
2562 DBUG_RETURN_EVENT(-1);
2567 main_data->m_event_op = op->theMainOp;
2568 bucket->m_data.append_data(main_data);
2571 main_data->m_pkhash = main_hpos.pkhash;
2572 bucket->m_data_hash.append(main_hpos, main_data);
2576 add_blob_data(bucket, main_data, data);
2580 data->m_pkhash = hpos.pkhash;
2581 bucket->m_data_hash.append(hpos, data);
2590 if (unlikely(merge_data(sdata, len, ptr, data, &bucket->m_data.m_sz)))
2592 op->m_has_error = 3;
2593 DBUG_RETURN_EVENT(-1);
2596 if (! is_blob_event) {
2603 bucket->m_data.add_gci_op(g);
2608 (1 << SubTableData::getOperation(data->sdata->requestInfo))};
2609 bucket->m_data.add_gci_op(g);
2613 #ifdef NDB_EVENT_VERIFY_SIZE
2614 verify_size(bucket->m_data);
2616 DBUG_RETURN_EVENT(0);
2620 if ((Uint32)op->m_eventImpl->mi_type & (1 << operation))
2622 DBUG_PRINT_EVENT(
"info",(
"Data arrived before ready eventId", op->m_eventId));
2623 DBUG_RETURN_EVENT(0);
2626 DBUG_PRINT_EVENT(
"info",(
"skipped"));
2627 DBUG_RETURN_EVENT(0);
2630 DBUG_RETURN_EVENT(0);
2636 NdbEventBuffer::alloc_data()
2638 DBUG_ENTER_EVENT(
"alloc_data");
2641 if (unlikely(data == 0))
2644 assert(m_free_data_count == 0);
2645 assert(m_free_data_sz == 0);
2651 if (unlikely(data == 0))
2654 printf(
"m_latest_command: %s\n", m_latest_command);
2655 printf(
"no free data, m_latestGCI %u/%u\n",
2656 (Uint32)(m_latestGCI << 32), (Uint32)m_latestGCI);
2657 printf(
"m_free_data_count %d\n", m_free_data_count);
2658 printf(
"m_available_data_count %d first gci{hi/lo} %u/%u last gci{hi/lo} %u/%u\n",
2659 m_available_data.m_count,
2660 m_available_data.m_head?m_available_data.m_head->sdata->gci_hi:0,
2661 m_available_data.m_head?m_available_data.m_head->sdata->gci_lo:0,
2662 m_available_data.m_tail?m_available_data.m_tail->sdata->gci_hi:0,
2663 m_available_data.m_tail?m_available_data.m_tail->sdata->gci_lo:0);
2664 printf(
"m_used_data_count %d\n", m_used_data.m_count);
2666 DBUG_RETURN_EVENT(0);
2671 if (data->m_next_blob == 0)
2672 m_free_data = data->m_next;
2675 if (data2->m_next == 0) {
2676 data->m_next_blob = data2->m_next_blob;
2680 data2->m_next = data3->m_next;
2685 data->m_next_blob = 0;
2687 m_free_data_count--;
2688 assert(m_free_data_sz >= data->sz);
2690 m_free_data_sz -= data->sz;
2691 DBUG_RETURN_EVENT(data);
2701 DBUG_ENTER(
"NdbEventBuffer::alloc_mem");
2702 DBUG_PRINT(
"info", (
"ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz));
2703 const Uint32 min_alloc_size = 128;
2706 Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2;
2707 if (alloc_size < min_alloc_size)
2708 alloc_size = min_alloc_size;
2710 if (data->sz < alloc_size)
2712 Uint32 add_sz = alloc_size - data->sz;
2714 NdbMem_Free((
char*)data->memory);
2715 assert(m_total_alloc >= data->sz);
2719 data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
2720 if (data->memory == 0)
2722 m_total_alloc -= data->sz;
2725 data->sz = alloc_size;
2726 m_total_alloc += add_sz;
2728 if (change_sz != NULL)
2729 *change_sz += add_sz;
2732 Uint32* memptr = data->memory;
2735 for (i = 0; i <= 2; i++)
2737 data->ptr[
i].p = memptr;
2738 data->ptr[
i].sz = ptr[
i].sz;
2739 memptr += ptr[
i].sz;
2749 NdbMem_Free((
char*)data->memory);
2750 assert(m_total_alloc >= data->sz);
2751 m_total_alloc -= data->sz;
2752 if (change_sz != NULL) {
2753 assert(*change_sz >= data->sz);
2754 *change_sz -= data->sz;
2761 NdbEventBuffer::copy_data(
const SubTableData *
const sdata, Uint32 len,
2766 DBUG_ENTER_EVENT(
"NdbEventBuffer::copy_data");
2768 if (alloc_mem(data, ptr, change_sz) != 0)
2769 DBUG_RETURN_EVENT(-1);
2772 if (unlikely(len < SubTableData::SignalLength))
2774 data->sdata->gci_lo = 0;
2776 if (len < SubTableData::SignalLengthWithTransId)
2779 data->sdata->transId1 = ~Uint32(0);
2780 data->sdata->transId2 = ~Uint32(0);
2784 for (i = 0; i <= 2; i++)
2785 memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
2786 DBUG_RETURN_EVENT(0);
2789 static struct Ev_t {
2791 enum_INS = NdbDictionary::Event::_TE_INSERT,
2792 enum_DEL = NdbDictionary::Event::_TE_DELETE,
2793 enum_UPD = NdbDictionary::Event::_TE_UPDATE,
2794 enum_NUL = NdbDictionary::Event::_TE_NUL,
2800 { Ev_t::enum_INS, Ev_t::enum_INS, Ev_t::enum_IDM },
2801 { Ev_t::enum_INS, Ev_t::enum_DEL, Ev_t::enum_NUL },
2802 { Ev_t::enum_INS, Ev_t::enum_UPD, Ev_t::enum_INS },
2803 { Ev_t::enum_DEL, Ev_t::enum_INS, Ev_t::enum_UPD },
2804 { Ev_t::enum_DEL, Ev_t::enum_DEL, Ev_t::enum_IDM },
2805 { Ev_t::enum_DEL, Ev_t::enum_UPD, Ev_t::enum_ERR },
2806 { Ev_t::enum_UPD, Ev_t::enum_INS, Ev_t::enum_ERR },
2807 { Ev_t::enum_UPD, Ev_t::enum_DEL, Ev_t::enum_DEL },
2808 { Ev_t::enum_UPD, Ev_t::enum_UPD, Ev_t::enum_UPD }
2819 copy_head(Uint32& i1, Uint32* p1, Uint32& i2,
const Uint32* p2,
2823 bool do_copy = (flags & 1);
2833 Uint32& j1, Uint32* p1, Uint32& j2,
const Uint32* p2,
2836 bool do_copy = (flags & 1);
2837 bool with_head = (flags & 2);
2838 Uint32 n = with_head + ah.getDataSize();
2842 for (k = 0; k <
n; k++)
2843 p1[j1 + k] = p2[j2 + k];
2850 NdbEventBuffer::merge_data(
const SubTableData *
const sdata, Uint32 len,
2855 DBUG_ENTER_EVENT(
"NdbEventBuffer::merge_data");
2862 Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->
m_noOfKeys;
2864 int t1 = SubTableData::getOperation(data->sdata->requestInfo);
2865 int t2 = SubTableData::getOperation(sdata->requestInfo);
2866 if (t1 == Ev_t::enum_NUL)
2867 DBUG_RETURN_EVENT(copy_data(sdata, len, ptr2, data, change_sz));
2871 for (i = 0; (uint) i <
sizeof(ev_t)/
sizeof(ev_t[0]); i++) {
2872 if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) {
2877 assert(tp != 0 && tp->t3 != Ev_t::enum_ERR);
2879 if (tp->t3 == Ev_t::enum_IDM) {
2891 const int maxsec = 1;
2894 for (i = 0; i <= maxsec; i++) {
2895 if (ptr1[i].sz != ptr2[i].sz ||
2896 memcmp(ptr1[i].p, ptr2[i].p, ptr1[i].sz << 2) != 0) {
2897 DBUG_PRINT(
"info", (
"idempotent op %d*%d data differs in sec %d",
2898 tp->t1, tp->t2, i));
2900 DBUG_RETURN_EVENT(-1);
2903 DBUG_PRINT(
"info", (
"idempotent op %d*%d data ok", tp->t1, tp->t2));
2904 DBUG_RETURN_EVENT(0);
2921 for (loop = 0; loop <= 1; loop++)
2925 if (alloc_mem(data, ptr, change_sz) != 0)
2930 *data->sdata = *sdata;
2931 SubTableData::setOperation(data->sdata->requestInfo, tp->t3);
2934 ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;
2945 ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
2946 copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
2953 if (tp->t3 != Ev_t::enum_DEL)
2956 Uint32 i = ptr[0].sz;
2957 Uint32 j = ptr[1].sz;
2961 Uint32 j2 = ptr[1].sz;
2968 bool b1 = (i1 < ptr1[0].sz);
2969 bool b2 = (i2 < ptr2[0].sz);
2986 ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop);
2987 copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop);
2991 ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
2992 copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
3002 if (tp->t3 != Ev_t::enum_INS)
3010 bool b1 = (k1 < ptr1[2].sz);
3011 bool b2 = (k2 < ptr2[2].sz);
3029 copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
3034 copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2);
3044 dealloc_mem(&olddata, change_sz);
3045 DBUG_RETURN_EVENT(result);
3059 DBUG_ENTER_EVENT(
"NdbEventBuffer::get_main_data");
3061 int blobVersion = blob_data->m_event_op->theBlobVersion;
3062 assert(blobVersion == 1 || blobVersion == 2);
3065 assert(main_op != NULL);
3066 const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;
3071 Uint32 pk_ah[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
3072 Uint32* pk_data = blob_data->ptr[1].p;
3075 if (unlikely(blobVersion == 1)) {
3081 Uint32 max_size =
AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();
3086 for (i = 0; n < mainTable->
m_noOfKeys; i++) {
3092 Uint32 bytesize = c->m_attrSize * c->m_arraySize;
3094 assert(sz < max_size);
3100 pk_ah[
n] = ah.m_value;
3101 sz += ah.getDataSize();
3105 assert(sz <= max_size);
3115 for (i = 0; n < mainTable->
m_noOfKeys; i++) {
3122 ah.setAttributeId(i);
3123 pk_ah[
n] = ah.m_value;
3124 sz += ah.getDataSize();
3133 ptr[1].sz = pk_size;
3138 DBUG_DUMP_EVENT(
"ah", (
char*)ptr[0].p, ptr[0].sz << 2);
3139 DBUG_DUMP_EVENT(
"pk", (
char*)ptr[1].p, ptr[1].sz << 2);
3142 bucket->m_data_hash.search(hpos, main_op, ptr);
3143 if (hpos.data != NULL)
3144 DBUG_RETURN_EVENT(0);
3148 if (main_data == NULL)
3149 DBUG_RETURN_EVENT(-1);
3151 sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
3152 SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
3153 if (copy_data(&sdata, SubTableData::SignalLength, ptr, main_data, NULL) != 0)
3154 DBUG_RETURN_EVENT(-1);
3155 hpos.data = main_data;
3157 DBUG_RETURN_EVENT(1);
3165 DBUG_ENTER_EVENT(
"NdbEventBuffer::add_blob_data");
3166 DBUG_PRINT_EVENT(
"info", (
"main_data=%p blob_data=%p", main_data, blob_data));
3168 head = main_data->m_next_blob;
3169 while (head != NULL)
3171 if (head->m_event_op == blob_data->m_event_op)
3173 head = head->m_next_blob;
3178 head->m_next_blob = main_data->m_next_blob;
3179 main_data->m_next_blob = head;
3183 blob_data->m_next = head->m_next;
3184 head->m_next = blob_data;
3187 bucket->m_data.m_count += 1;
3188 bucket->m_data.m_sz += blob_data->sz;
3189 DBUG_VOID_RETURN_EVENT;
3193 NdbEventBuffer::move_data()
3196 if (!m_complete_data.m_data.is_empty())
3199 m_available_data.append_list(&m_complete_data.m_data, 0);
3201 bzero(&m_complete_data,
sizeof(m_complete_data));
3205 if (!m_used_data.is_empty())
3208 free_list(m_used_data);
3210 if (!m_available_data.is_empty())
3212 DBUG_ENTER_EVENT(
"NdbEventBuffer::move_data");
3214 DBUG_PRINT_EVENT(
"exit",(
"m_available_data_count %u", m_available_data.m_count));
3216 DBUG_RETURN_EVENT(m_available_data.m_head->m_event_op);
3224 #ifdef NDB_EVENT_VERIFY_SIZE
3228 list.m_tail->m_next= m_free_data;
3229 m_free_data= list.m_head;
3231 m_free_data_count+= list.m_count;
3233 m_free_data_sz+= list.m_sz;
3235 list.m_head = list.m_tail = NULL;
3236 list.m_count = list.m_sz = 0;
3241 #ifdef NDB_EVENT_VERIFY_SIZE
3242 NdbEventBuffer::verify_size(*list);
3244 move_gci_ops(list, gci);
3247 m_tail->m_next= list->m_head;
3249 m_head= list->m_head;
3250 m_tail= list->m_tail;
3251 m_count+= list->m_count;
3256 EventBufData_list::add_gci_op(Gci_op g)
3258 DBUG_ENTER_EVENT(
"EventBufData_list::add_gci_op");
3259 DBUG_PRINT_EVENT(
"info", (
"p.op: %p g.event_types: %x", g.op, g.event_types));
3260 assert(g.op != NULL && g.op->theMainOp == NULL);
3262 for (i = 0; i < m_gci_op_count; i++) {
3263 if (m_gci_op_list[i].op == g.op)
3266 if (i < m_gci_op_count) {
3267 m_gci_op_list[
i].event_types |= g.event_types;
3269 if (m_gci_op_count == m_gci_op_alloc) {
3270 Uint32 n = 1 + 2 * m_gci_op_alloc;
3271 Gci_op* old_list = m_gci_op_list;
3272 m_gci_op_list =
new Gci_op [
n];
3273 if (m_gci_op_alloc != 0) {
3274 Uint32 bytes = m_gci_op_alloc *
sizeof(Gci_op);
3275 memcpy(m_gci_op_list, old_list, bytes);
3276 DBUG_PRINT_EVENT(
"info", (
"this: %p delete m_gci_op_list: %p",
3281 assert(old_list == 0);
3282 DBUG_PRINT_EVENT(
"info", (
"this: %p new m_gci_op_list: %p",
3283 this, m_gci_op_list));
3286 assert(m_gci_op_count < m_gci_op_alloc);
3290 m_gci_op_list[m_gci_op_count++] = g;
3292 DBUG_PRINT_EVENT(
"exit", (
"m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types));
3293 DBUG_VOID_RETURN_EVENT;
3299 DBUG_ENTER_EVENT(
"EventBufData_list::move_gci_ops");
3300 DBUG_PRINT_EVENT(
"info", (
"this: %p list: %p gci: %u/%u",
3301 this, list, (Uint32)(gci >> 32), (Uint32)gci));
3302 assert(!m_is_not_multi_list);
3303 if (!list->m_is_not_multi_list)
3306 if (m_gci_ops_list_tail)
3307 m_gci_ops_list_tail->m_next = list->m_gci_ops_list;
3310 m_gci_ops_list = list->m_gci_ops_list;
3312 m_gci_ops_list_tail = list->m_gci_ops_list_tail;
3316 Gci_ops *new_gci_ops =
new Gci_ops;
3317 DBUG_PRINT_EVENT(
"info", (
"this: %p m_gci_op_list: %p",
3318 new_gci_ops, list->m_gci_op_list));
3319 if (m_gci_ops_list_tail)
3320 m_gci_ops_list_tail->m_next = new_gci_ops;
3323 assert(m_gci_ops_list == 0);
3324 m_gci_ops_list = new_gci_ops;
3326 m_gci_ops_list_tail = new_gci_ops;
3328 new_gci_ops->m_gci_op_list = list->m_gci_op_list;
3329 new_gci_ops->m_gci_op_count = list->m_gci_op_count;
3330 new_gci_ops->m_gci = gci;
3331 new_gci_ops->m_next = 0;
3334 list->m_gci_op_list = 0;
3335 list->m_gci_ops_list_tail = 0;
3336 list->m_gci_op_alloc = 0;
3337 DBUG_VOID_RETURN_EVENT;
3341 NdbEventBuffer::createEventOperation(
const char* eventName,
3344 DBUG_ENTER(
"NdbEventBuffer::createEventOperation");
3348 theError.
code= 4000;
3358 getEventOperationImpl(tOp)->m_ref_count = 1;
3359 DBUG_PRINT(
"info", (
"m_ref_count: %u for op: %p",
3360 getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp)));
3365 NdbEventBuffer::createEventOperationImpl(
NdbEventImpl& evnt,
3368 DBUG_ENTER(
"NdbEventBuffer::createEventOperationImpl");
3372 theError.
code= 4000;
3386 DBUG_ENTER(
"NdbEventBuffer::dropEventOperation");
3391 if (op->theMainOp == NULL)
3393 Uint64 max_stop_gci = op->m_stop_gci;
3395 while (tBlobOp != NULL)
3398 Uint64 stop_gci = tBlobOp->m_stop_gci;
3399 if (stop_gci > max_stop_gci)
3400 max_stop_gci = stop_gci;
3401 tBlobOp = tBlobOp->m_next;
3403 tBlobOp = op->theBlobOpList;
3404 while (tBlobOp != NULL)
3406 tBlobOp->m_stop_gci = max_stop_gci;
3407 tBlobOp = tBlobOp->m_next;
3409 op->m_stop_gci = max_stop_gci;
3415 NdbMutex_Lock(m_mutex);
3418 if (op->theMainOp == NULL)
3420 while (op->theBlobList != NULL)
3422 NdbBlob* tBlob = op->theBlobList;
3423 op->theBlobList = tBlob->theNext;
3424 m_ndb->releaseNdbBlob(tBlob);
3429 op->m_next->m_prev= op->m_prev;
3431 op->m_prev->m_next= op->m_next;
3433 m_ndb->theImpl->m_ev_op= op->m_next;
3435 assert(m_ndb->theImpl->m_ev_op == 0 || m_ndb->theImpl->m_ev_op->m_prev == 0);
3437 DBUG_ASSERT(op->m_ref_count > 0);
3442 DBUG_PRINT(
"info", (
"m_ref_count: %u for op: %p", op->m_ref_count, op));
3443 if (op->m_ref_count == 0)
3445 NdbMutex_Unlock(m_mutex);
3446 DBUG_PRINT(
"info", (
"deleting op: %p", op));
3447 delete op->m_facade;
3451 op->m_next= m_dropped_ev_op;
3453 if (m_dropped_ev_op)
3454 m_dropped_ev_op->m_prev= op;
3455 m_dropped_ev_op= op;
3457 NdbMutex_Unlock(m_mutex);
3463 NdbEventBuffer::reportStatus()
3466 Uint64 apply_gci, latest_gci= m_latestGCI;
3468 apply_buf= m_complete_data.m_data.m_head;
3469 if (apply_buf && apply_buf->sdata)
3471 Uint32 gci_hi = apply_buf->sdata->gci_hi;
3472 Uint32 gci_lo = apply_buf->sdata->gci_lo;
3473 apply_gci= gci_lo | (Uint64(gci_hi) << 32);
3476 apply_gci= latest_gci;
3480 if (100*(Uint64)m_free_data_sz < m_min_free_thresh*(Uint64)m_total_alloc &&
3481 m_total_alloc > 1024*1024)
3486 m_min_free_thresh= 0;
3487 m_max_free_thresh= 2 * m_free_thresh;
3491 if (100*(Uint64)m_free_data_sz > m_max_free_thresh*(Uint64)m_total_alloc &&
3492 m_total_alloc > 1024*1024)
3497 m_min_free_thresh= m_free_thresh;
3498 m_max_free_thresh= 100;
3502 if (m_gci_slip_thresh &&
3503 (latest_gci-apply_gci >= m_gci_slip_thresh))
3512 data[1]= m_total_alloc-m_free_data_sz;
3513 data[2]= m_total_alloc;
3515 data[4]= (Uint32)(apply_gci);
3516 data[5]= (Uint32)(apply_gci >> 32);
3517 data[6]= (Uint32)(latest_gci);
3518 data[7]= (Uint32)(latest_gci >> 32);
3519 Ndb_internal::send_event_report(
true, m_ndb, data,8);
3521 assert(m_total_alloc >= m_free_data_sz);
3527 NdbEventBuffer::verify_size(
const EventBufData* data, Uint32 count, Uint32 sz)
3530 Uint32 tmp_count = 0;
3533 Uint32 full_count, full_sz;
3534 data->get_full_size(full_count, full_sz);
3535 tmp_count += full_count;
3537 data = data->m_next;
3539 assert(tmp_count == count);
3540 assert(tmp_sz == sz);
3547 verify_size(list.m_head, list.m_count, list.m_sz);
3558 DBUG_ENTER_EVENT(
"EventBufData_hash::getpkhash");
3559 DBUG_DUMP_EVENT(
"ah", (
char*)ptr[0].p, ptr[0].sz << 2);
3560 DBUG_DUMP_EVENT(
"pk", (
char*)ptr[1].p, ptr[1].sz << 2);
3562 const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
3567 assert(nkey != 0 && nkey <= ptr[0].sz);
3568 const Uint32* hptr = ptr[0].p;
3569 const uchar* dptr = (uchar*)ptr[1].p;
3577 Uint32 bytesize = ah.getByteSize();
3578 assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz));
3588 CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
3589 (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
3590 dptr += ((bytesize + 3) / 4) * 4;
3592 DBUG_PRINT_EVENT(
"info", (
"hash result=%08x", nr1));
3593 DBUG_RETURN_EVENT(nr1);
3599 DBUG_ENTER_EVENT(
"EventBufData_hash::getpkequal");
3600 DBUG_DUMP_EVENT(
"ah1", (
char*)ptr1[0].p, ptr1[0].sz << 2);
3601 DBUG_DUMP_EVENT(
"pk1", (
char*)ptr1[1].p, ptr1[1].sz << 2);
3602 DBUG_DUMP_EVENT(
"ah2", (
char*)ptr2[0].p, ptr2[0].sz << 2);
3603 DBUG_DUMP_EVENT(
"pk2", (
char*)ptr2[1].p, ptr2[1].sz << 2);
3605 const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
3608 assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
3609 const Uint32* hptr1 = ptr1[0].p;
3610 const Uint32* hptr2 = ptr2[0].p;
3611 const uchar* dptr1 = (uchar*)ptr1[1].p;
3612 const uchar* dptr2 = (uchar*)ptr2[1].p;
3621 Uint32 bytesize1 = ah1.getByteSize();
3622 Uint32 bytesize2 = ah2.getByteSize();
3623 assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
3624 assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));
3626 assert(ah1.getAttributeId() == ah2.getAttributeId());
3627 Uint32 i = ah1.getAttributeId();
3635 assert(ok1 && ok2 && lb1 == lb2);
3637 CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
3638 int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2,
false);
3644 dptr1 += ((bytesize1 + 3) / 4) * 4;
3645 dptr2 += ((bytesize2 + 3) / 4) * 4;
3648 DBUG_PRINT_EVENT(
"info", (
"equal=%s", equal ?
"true" :
"false"));
3649 DBUG_RETURN_EVENT(equal);
3655 DBUG_ENTER_EVENT(
"EventBufData_hash::search");
3656 Uint32 pkhash = getpkhash(op, ptr);
3657 Uint32
index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
3661 if (data->m_event_op == op &&
3662 data->m_pkhash == pkhash &&
3663 getpkequal(op, data->ptr, ptr))
3665 data = data->m_next_hash;
3669 hpos.pkhash = pkhash;
3670 DBUG_PRINT_EVENT(
"info", (
"search result=%p", data));
3671 DBUG_VOID_RETURN_EVENT;