Skip to content

Commit 7f43bb3

Browse files
authored
feat(tmq): optimize the logic of changing tablelist for stb sub (#35133)
1 parent 81c0cf2 commit 7f43bb3

51 files changed

Lines changed: 8429 additions & 8708 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

include/common/tmsg.h

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5597,7 +5597,7 @@ enum {
55975597
};
55985598

55995599
enum {
5600-
WITH_DATA = 0,
5600+
ONLY_DATA = 0,
56015601
WITH_META = 1,
56025602
ONLY_META = 2,
56035603
};
@@ -6203,12 +6203,6 @@ typedef struct {
62036203
int32_t debugFlag;
62046204
} SMqHbRsp;
62056205

6206-
typedef struct {
6207-
SMsgHead head;
6208-
int64_t consumerId;
6209-
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
6210-
} SMqSeekReq;
6211-
62126206
#define TD_AUTO_CREATE_TABLE 0x1
62136207
typedef struct {
62146208
int64_t suid;
@@ -6336,9 +6330,6 @@ int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
63366330
int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
63376331
void tDestroySMqHbRsp(SMqHbRsp* pRsp);
63386332

6339-
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
6340-
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
6341-
63426333
#define TD_REQ_FROM_APP 0x0
63436334
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
63446335
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2

include/common/tmsgdef.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@
466466
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
467467
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp)
468468
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset)
469-
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK, "vnode-tmq-seek", NULL, NULL)
469+
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK, "vnode-tmq-seek", NULL, NULL) // no longer used
470470
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
471471
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
472472
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)

include/libs/executor/executor.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,10 @@ bool qTaskIsDone(qTaskInfo_t tinfo);
162162
*/
163163
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
164164

165-
void qUpdateTableTagCacheForTmq(qTaskInfo_t tinfo, const SArray* tableIdList, SArray* cids, SArray* cidListArray);
166-
int32_t qUpdateTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList);
167-
int32_t qDeleteTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList);
168-
int32_t qAddTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList);
165+
void qUpdateTableTagCacheForQuerySub(qTaskInfo_t tinfo, const SArray* tableIdList, SArray* cids, SArray* cidListArray);
166+
int32_t qUpdateTableListForQuerySub(qTaskInfo_t tinfo, const SArray* tableIdList);
167+
int32_t qDeleteTableListForQuerySub(qTaskInfo_t tinfo, const SArray* tableIdList);
168+
int32_t qAddTableListForQuerySub(qTaskInfo_t tinfo, const SArray* tableIdList);
169169

170170
bool qIsDynamicExecTask(qTaskInfo_t tinfo);
171171

@@ -321,7 +321,7 @@ bool isTaskKilled(void* pTaskInfo);
321321

322322

323323
bool isTrueForSatisfied(STrueForInfo* pTrueForInfo, int64_t skey, int64_t ekey, int64_t count);
324-
int32_t qFilterTableList(void* pVnode, SArray* uidList, SNode* node, void* pTaskInfo, uint64_t suid);
324+
int32_t qFilterTableList(void* pVnode, SArray* uidList, int64_t version, SNode* node, void* pTaskInfo, uint64_t suid);
325325
bool checkCidInTagCondition(SNode* node, SArray* cidList);
326326
SNode* getTagCondNodeForStableTmq(void* node);
327327
SNode* getTagCondNodeForQueryTmq(void* tinfo);

include/libs/executor/storageapi.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ typedef struct SStoreMeta {
286286
int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorPrev
287287

288288
int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList);
289-
int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList);
289+
int32_t (*getTableTagsByUidVersion)(void* pVnode, int64_t suid, SArray* uidList, int64_t version);
290290
const void* (*extractTagVal)(const void* tag, int16_t type, STagVal* tagVal); // todo remove it
291291

292292
int32_t (*getTableUidByName)(void* pVnode, char* tbName, uint64_t* uid);
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3+
*
4+
* This program is free software: you can use, redistribute, and/or modify
5+
* it under the terms of the GNU Affero General Public License, version 3
6+
* or later ("AGPL"), as published by the Free Software Foundation.
7+
*
8+
* This program is distributed in the hope that it will be useful, but WITHOUT
9+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10+
* FITNESS FOR A PARTICULAR PURPOSE.
11+
*
12+
* You should have received a copy of the GNU Affero General Public License
13+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14+
*/
15+
16+
#ifndef CLIENT_RAW_BLOCK_WRITE_H
17+
#define CLIENT_RAW_BLOCK_WRITE_H
18+
19+
#include "clientInt.h"
20+
21+
#ifdef __cplusplus
22+
extern "C" {
23+
#endif
24+
25+
// Common macros shared across clientRawBlockWrite.c and clientRawBlockJson.c
26+
27+
#define RAW_LOG_END \
28+
if (code != 0) { \
29+
uError("%s failed at line:%d since:%s", __func__, lino, tstrerror(code)); \
30+
} else { \
31+
uDebug("%s return success", __func__); \
32+
}
33+
34+
#define RAW_LOG_START uDebug("%s start", __func__);
35+
36+
#define RAW_NULL_CHECK(c) \
37+
do { \
38+
if (c == NULL) { \
39+
lino = __LINE__; \
40+
code = terrno; \
41+
goto end; \
42+
} \
43+
} while (0)
44+
45+
#define RAW_FALSE_CHECK(c) \
46+
do { \
47+
if (!(c)) { \
48+
code = TSDB_CODE_INVALID_PARA; \
49+
lino = __LINE__; \
50+
goto end; \
51+
} \
52+
} while (0)
53+
54+
#define RAW_RETURN_CHECK(c) \
55+
do { \
56+
code = c; \
57+
if (code != 0) { \
58+
lino = __LINE__; \
59+
goto end; \
60+
} \
61+
} while (0)
62+
63+
#define LOG_ID_TAG "connId:0x%" PRIx64 ", QID:0x%" PRIx64 ",func:%s"
64+
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId, __func__
65+
66+
#define PROCESS_TABLE_NOT_EXIST(code, tbName) \
67+
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { \
68+
uWarn(LOG_ID_TAG " %s not exists, skip", LOG_ID_VALUE, tbName); \
69+
code = TSDB_CODE_SUCCESS; \
70+
continue; \
71+
} \
72+
RAW_RETURN_CHECK(code);
73+
74+
#ifdef __cplusplus
75+
}
76+
#endif
77+
78+
#endif // CLIENT_RAW_BLOCK_WRITE_H

0 commit comments

Comments
 (0)