Skip to content

Commit 891eac1

Browse files
authored
fix(tmq): remove epset that is useless (#35126)
1 parent f716257 commit 891eac1

6 files changed

Lines changed: 431 additions & 129 deletions

File tree

source/dnode/mnode/impl/inc/mndDef.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,6 @@ typedef struct {
10391039

10401040
// data for display
10411041
int32_t pid;
1042-
SEpSet ep;
10431042
int64_t createTime;
10441043
int64_t pollTime;
10451044
int64_t subscribeTime;
@@ -1060,17 +1059,9 @@ void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
10601059
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
10611060
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
10621061

1063-
typedef struct {
1064-
int32_t vgId;
1065-
SEpSet epSet;
1066-
} SMqVgEp;
1067-
1068-
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
1069-
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver);
1070-
10711062
typedef struct {
10721063
int64_t consumerId; // -1 for unassigned
1073-
SArray* vgs; // SArray<SMqVgEp>
1064+
SArray* vgs; // SArray<vgId>
10741065
SArray* offsetRows; // SArray<OffsetRows>
10751066
} SMqConsumerEp;
10761067

@@ -1086,7 +1077,7 @@ typedef struct {
10861077
int8_t withMeta;
10871078
int64_t stbUid;
10881079
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
1089-
SArray* unassignedVgs; // SArray<SMqVgEp>
1080+
SArray* unassignedVgs; // SArray<vgId>
10901081
SArray* offsetRows;
10911082
char dbName[TSDB_DB_FNAME_LEN];
10921083
SRWLatch lock;
@@ -1106,7 +1097,7 @@ typedef struct {
11061097
typedef struct {
11071098
int64_t oldConsumerId;
11081099
int64_t newConsumerId;
1109-
SMqVgEp pVgEp;
1100+
int32_t vgId;
11101101
} SMqRebOutputVg;
11111102

11121103
typedef struct {

source/dnode/mnode/impl/src/mndConsumer.c

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
#include "tcompare.h"
2727
#include "tname.h"
2828

29-
#define MND_CONSUMER_VER_NUMBER 3
29+
#define MND_CONSUMER_VER_NUMBER 4
3030
#define MND_CONSUMER_RESERVE_SIZE 64
3131

3232
#define MND_MAX_GROUP_PER_TOPIC 100
@@ -332,7 +332,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
332332
return code;
333333
}
334334

335-
static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, char *topic, SMqAskEpRsp *rsp, int32_t epoch) {
335+
static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, char *topic, SMqAskEpRsp *rsp) {
336336
int32_t code = 0;
337337
int32_t lino = 0;
338338
SMqSubscribeObj *pSub = NULL;
@@ -356,18 +356,19 @@ static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, cha
356356

357357
tstrncpy(topicEp.db, pSub->dbName, TSDB_DB_FNAME_LEN);
358358
for (int32_t j = 0; j < vgNum; j++) {
359-
SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
360-
if (pVgEp == NULL) {
359+
int32_t *vgId = taosArrayGet(pConsumerEp->vgs, j);
360+
if (vgId == NULL) {
361361
continue;
362362
}
363-
if (epoch == -1) {
364-
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
365-
if (pVgroup) {
366-
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
367-
mndReleaseVgroup(pMnode, pVgroup);
368-
}
363+
SMqSubVgEp vgEp = {.epSet = {0}, .vgId = *vgId, .offset = -1};
364+
SVgObj *pVgroup = mndAcquireVgroup(pMnode, *vgId);
365+
if (pVgroup == NULL) {
366+
mWarn("failed to acquire vgroup:%d", *vgId);
367+
code = terrno;
368+
goto END;
369369
}
370-
SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
370+
vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
371+
mndReleaseVgroup(pMnode, pVgroup);
371372
MND_TMQ_NULL_CHECK(taosArrayPush(topicEp.vgs, &vgEp));
372373
}
373374
MND_TMQ_NULL_CHECK(taosArrayPush(rsp->topics, &topicEp));
@@ -383,7 +384,7 @@ static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, cha
383384
return code;
384385
}
385386

386-
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
387+
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqAskEpRsp *rsp){
387388
if (pMnode == NULL || pConsumer == NULL || rsp == NULL){
388389
return TSDB_CODE_INVALID_PARA;
389390
}
@@ -398,7 +399,7 @@ static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t e
398399
// handle all topics subscribed by this consumer
399400
for (int32_t i = 0; i < numOfTopics; i++) {
400401
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
401-
MND_TMQ_RETURN_CHECK(processEachTopicEp(pMnode, pConsumer, topic, rsp, epoch));
402+
MND_TMQ_RETURN_CHECK(processEachTopicEp(pMnode, pConsumer, topic, rsp));
402403
}
403404

404405
END:
@@ -474,7 +475,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
474475
if (epoch != serverEpoch) {
475476
mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
476477
consumerId, epoch, serverEpoch);
477-
MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, epoch, &rsp));
478+
MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, &rsp));
478479
}
479480
}
480481
code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
@@ -814,8 +815,6 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
814815
goto CM_DECODE_OVER;
815816
}
816817

817-
tmsgUpdateDnodeEpSet(&pConsumer->ep);
818-
819818
CM_DECODE_OVER:
820819
taosMemoryFreeClear(buf);
821820
if (terrno != TSDB_CODE_SUCCESS) {

source/dnode/mnode/impl/src/mndDef.c

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,17 @@ void freeSMqConsumerEp(void* data) {
8585
pConsumerEp->offsetRows = NULL;
8686
}
8787

88-
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
89-
int32_t tlen = 0;
90-
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
91-
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
92-
return tlen;
93-
}
94-
95-
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
96-
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
88+
static void *tDecodeSMqVgEp(const void *buf, int32_t *vgId, int8_t sver) {
89+
buf = taosDecodeFixedI32(buf, vgId);
9790
if (sver == 1) {
9891
uint64_t size = 0;
9992
buf = taosDecodeVariantU64(buf, &size);
10093
buf = POINTER_SHIFT(buf, size);
10194
}
102-
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
95+
if (sver <= 3) {
96+
SEpSet epSet = {0};
97+
buf = taosDecodeSEpSet(buf, &epSet);
98+
}
10399
return (void *)buf;
104100
}
105101

@@ -204,7 +200,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
204200
tlen += taosEncodeFixedI32(buf, pConsumer->status);
205201

206202
tlen += taosEncodeFixedI32(buf, pConsumer->pid);
207-
tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
208203
tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
209204
tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
210205
tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
@@ -278,7 +273,10 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
278273
buf = taosDecodeFixedI32(buf, &pConsumer->status);
279274

280275
buf = taosDecodeFixedI32(buf, &pConsumer->pid);
281-
buf = taosDecodeSEpSet(buf, &pConsumer->ep);
276+
if (sver <= 3) {
277+
SEpSet ep = {0};
278+
buf = taosDecodeSEpSet(buf, &ep);
279+
}
282280
buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
283281
buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
284282
buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
@@ -378,8 +376,9 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
378376
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
379377
tlen += taosEncodeFixedI32(buf, sz);
380378
for (int32_t i = 0; i < sz; i++) {
381-
void* data = taosArrayGet(pConsumerEp->vgs, i);
382-
tlen += tEncodeSMqVgEp(buf, data);
379+
int32_t* vgId = (int32_t*)taosArrayGet(pConsumerEp->vgs, i);
380+
if (vgId == NULL) continue;
381+
tlen += taosEncodeFixedI32(buf, *vgId);
383382
}
384383

385384
return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
@@ -416,14 +415,20 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
416415
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
417416
int32_t sz = 0;
418417
buf = taosDecodeFixedI32(buf, &sz);
419-
pConsumerEp->vgs = taosArrayInit(sz, sizeof(SMqVgEp));
418+
pConsumerEp->vgs = taosArrayInit(sz, sizeof(int32_t));
420419
if (pConsumerEp->vgs == NULL) {
421420
return NULL;
422421
}
423422
for (int32_t i = 0; i < sz; i++) {
424-
SMqVgEp* vgEp = taosArrayReserve(pConsumerEp->vgs, 1);
425-
if (vgEp != NULL)
426-
buf = tDecodeSMqVgEp(buf, vgEp, sver);
423+
int32_t* vgId = taosArrayReserve(pConsumerEp->vgs, 1);
424+
if (vgId == NULL) {
425+
return NULL;
426+
}
427+
if (sver <= 3) {
428+
buf = tDecodeSMqVgEp(buf, vgId, sver);
429+
} else {
430+
buf = taosDecodeFixedI32(buf, vgId);
431+
}
427432
}
428433
if (sver > 1) {
429434
buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
@@ -446,7 +451,7 @@ int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
446451
MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
447452
taosHashSetFreeFp(pSubObj->consumerHash, freeSMqConsumerEp);
448453

449-
pSubObj->unassignedVgs = taosArrayInit(0, sizeof(SMqVgEp));
454+
pSubObj->unassignedVgs = taosArrayInit(0, sizeof(int32_t));
450455
MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
451456
*ppSub = pSubObj;
452457
pSubObj = NULL;
@@ -539,8 +544,9 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
539544
int32_t len = taosArrayGetSize(pSub->unassignedVgs);
540545
tlen += taosEncodeFixedI32(buf, len);
541546
for (int32_t i = 0; i < len; i++) {
542-
void* data = taosArrayGet(pSub->unassignedVgs, i);
543-
tlen += tEncodeSMqVgEp(buf, data);
547+
int32_t* vgId = (int32_t*)taosArrayGet(pSub->unassignedVgs, i);
548+
if (vgId == NULL) continue;
549+
tlen += taosEncodeFixedI32(buf, *vgId);
544550
}
545551
tlen += taosEncodeString(buf, pSub->dbName);
546552

@@ -576,14 +582,20 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
576582

577583
int32_t len = 0;
578584
buf = taosDecodeFixedI32(buf, &len);
579-
pSub->unassignedVgs = taosArrayInit(len, sizeof(SMqVgEp));
585+
pSub->unassignedVgs = taosArrayInit(len, sizeof(int32_t));
580586
if (pSub->unassignedVgs == NULL) {
581587
return NULL;
582588
}
583589
for (int32_t i = 0; i < len; i++) {
584-
SMqVgEp* vgEp = taosArrayReserve(pSub->unassignedVgs, 1);
585-
if (vgEp != NULL)
586-
buf = tDecodeSMqVgEp(buf, vgEp, sver);
590+
int32_t* vgId = taosArrayReserve(pSub->unassignedVgs, 1);
591+
if (vgId == NULL) {
592+
return NULL;
593+
}
594+
if (sver <= 3) {
595+
buf = tDecodeSMqVgEp(buf, vgId, sver);
596+
} else {
597+
buf = taosDecodeFixedI32(buf, vgId);
598+
}
587599
}
588600

589601
buf = taosDecodeStringTo(buf, pSub->dbName);

0 commit comments

Comments
 (0)