Skip to content

Commit bb96390

Browse files
Merge branch '3.0' into merge/mainto3.0
2 parents 57371d0 + 891eac1 commit bb96390

21 files changed

Lines changed: 1420 additions & 150 deletions

File tree

docs/zh/05-basic/02-insert.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,19 @@ delete from meters where ts < '2021-10-01 10:40:00.100' ;
146146
通过查询系统表 `INS_DISK_USAGE`,可以查看数据库的压缩率和磁盘空间,参见 [文档](../../reference/taos-sql/database/#查看数据库的磁盘空间占用)
147147

148148
```sql
149-
select * from INFORMATION_SCHEMA.INS_DISK_USAGE where db_name = 'db_name';
149+
select * from INFORMATION_SCHEMA.INS_DISK_USAGE where db_name = 'db_name';
150150
```
151151

152152
通过如下命令,可以查看表的表的压缩率及具体分布情况,参见 [文档](../../reference/taos-sql/show/#show-table-distributed)
153153

154154
```sql
155155
show table distributed table_name;
156156
```
157+
158+
上面的 `show` 命令是按照数据块可能的数据条数动态计算的等分区间显示的分布情况,也可使用下面的语句查询系统表 `INS_TABLE_FIXED_DISTRIBUTED` 按照固定区间查看分布情况,后者更适合多表低频等单个数据块中数据条数较少的场景。
159+
160+
注意:目前,下面的 SQL 语句仅支持使用 `SELECT *`,不支持指定具体的查询字段,且 `WHERE` 条件中也只能包含 `` 关系的 `db_name``table_name` 条件。
161+
162+
```sql
163+
select * from INFORMATION_SCHEMA.INS_TABLE_FIXED_DISTRIBUTED where db_name = 'db_name' and table_name = 'table_name';
164+
```

include/common/systable.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ extern "C" {
4848
#define TSDB_INS_TABLE_COLS "ins_columns"
4949
#define TSDB_INS_TABLE_VC_COLS "ins_virtual_child_columns"
5050
#define TSDB_INS_TABLE_TABLE_DISTRIBUTED "ins_table_distributed"
51+
#define TSDB_INS_TABLE_TABLE_FIXED_DISTRIBUTED "ins_table_fixed_distributed"
5152
#define TSDB_INS_TABLE_USERS "ins_users"
5253
#define TSDB_INS_TABLE_USERS_FULL "ins_users_full"
5354
#define TSDB_INS_TABLE_TOKENS "ins_tokens"

include/common/tcommon.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ typedef struct STableBlockDistInfo {
307307
uint32_t numOfSttRows;
308308
uint32_t numOfVgroups;
309309
int32_t blockRowsHisto[20];
310+
int32_t blockRowsHistoFixed[8]; // buckets: ≤64, ≤128, ≤256, ≤512, ≤1024, ≤2048, ≤4096, >4096
310311
} STableBlockDistInfo;
311312

312313
int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDistInfo* pInfo);

source/common/src/systable.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,34 @@ static const SSysDbTableSchema userTblDistSchema[] = {
290290
{.name = "seek_header_time", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
291291
};
292292

293+
static const SSysDbTableSchema userTblFixedDistSchema[] = {
294+
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
295+
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
296+
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
297+
{.name = "total_blocks", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
298+
{.name = "total_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
299+
{.name = "average_size", .bytes = 8, .type = TSDB_DATA_TYPE_DOUBLE, .sysInfo = false},
300+
{.name = "compression_ratio", .bytes = 8, .type = TSDB_DATA_TYPE_DOUBLE, .sysInfo = false},
301+
{.name = "block_rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
302+
{.name = "min_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
303+
{.name = "max_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
304+
{.name = "avg_rows", .bytes = 8, .type = TSDB_DATA_TYPE_DOUBLE, .sysInfo = false},
305+
{.name = "in_mem_rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
306+
{.name = "stt_rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
307+
{.name = "total_tables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
308+
{.name = "total_filesets", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
309+
{.name = "total_vgroups", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
310+
{.name = "row_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
311+
{.name = "block_dist_64", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
312+
{.name = "block_dist_128", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
313+
{.name = "block_dist_256", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
314+
{.name = "block_dist_512", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
315+
{.name = "block_dist_1024", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
316+
{.name = "block_dist_2048", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
317+
{.name = "block_dist_4096", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
318+
{.name = "block_dist_other", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
319+
};
320+
293321
static const SSysDbTableSchema userUsersSchema[] = {
294322
{.name = "name", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
295323
{.name = "super", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
@@ -777,6 +805,7 @@ static const SSysTableMeta infosMeta[] = {
777805
{TSDB_INS_TABLE_TAGS, userTagsSchema, tListLen(userTagsSchema), false, PRIV_CAT_BASIC},
778806
{TSDB_INS_TABLE_COLS, userColsSchema, tListLen(userColsSchema), false, PRIV_CAT_BASIC},
779807
{TSDB_INS_TABLE_VC_COLS, userVctbColsSchema, tListLen(userVctbColsSchema), false, PRIV_CAT_BASIC},
808+
{TSDB_INS_TABLE_TABLE_FIXED_DISTRIBUTED, userTblFixedDistSchema, tListLen(userTblFixedDistSchema), false, PRIV_CAT_BASIC},
780809
// {TSDB_INS_TABLE_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema), PRIV_CAT_PRIVILEGED},
781810
{TSDB_INS_TABLE_USERS, userUsersSchema, tListLen(userUsersSchema), true, PRIV_CAT_SECURITY},
782811
{TSDB_INS_TABLE_USERS_FULL, userUsersFullSchema, tListLen(userUsersFullSchema), true, PRIV_CAT_SECURITY},

source/dnode/mnode/impl/inc/mndDef.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,6 @@ typedef struct {
10391039

10401040
// data for display
10411041
int32_t pid;
1042-
SEpSet ep;
10431042
int64_t createTime;
10441043
int64_t pollTime;
10451044
int64_t subscribeTime;
@@ -1060,17 +1059,9 @@ void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
10601059
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
10611060
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
10621061

1063-
typedef struct {
1064-
int32_t vgId;
1065-
SEpSet epSet;
1066-
} SMqVgEp;
1067-
1068-
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
1069-
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver);
1070-
10711062
typedef struct {
10721063
int64_t consumerId; // -1 for unassigned
1073-
SArray* vgs; // SArray<SMqVgEp>
1064+
SArray* vgs; // SArray<vgId>
10741065
SArray* offsetRows; // SArray<OffsetRows>
10751066
} SMqConsumerEp;
10761067

@@ -1086,7 +1077,7 @@ typedef struct {
10861077
int8_t withMeta;
10871078
int64_t stbUid;
10881079
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
1089-
SArray* unassignedVgs; // SArray<SMqVgEp>
1080+
SArray* unassignedVgs; // SArray<vgId>
10901081
SArray* offsetRows;
10911082
char dbName[TSDB_DB_FNAME_LEN];
10921083
SRWLatch lock;
@@ -1106,7 +1097,7 @@ typedef struct {
11061097
typedef struct {
11071098
int64_t oldConsumerId;
11081099
int64_t newConsumerId;
1109-
SMqVgEp pVgEp;
1100+
int32_t vgId;
11101101
} SMqRebOutputVg;
11111102

11121103
typedef struct {

source/dnode/mnode/impl/src/mndConsumer.c

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
#include "tcompare.h"
2727
#include "tname.h"
2828

29-
#define MND_CONSUMER_VER_NUMBER 3
29+
#define MND_CONSUMER_VER_NUMBER 4
3030
#define MND_CONSUMER_RESERVE_SIZE 64
3131

3232
#define MND_MAX_GROUP_PER_TOPIC 100
@@ -332,7 +332,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
332332
return code;
333333
}
334334

335-
static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, char *topic, SMqAskEpRsp *rsp, int32_t epoch) {
335+
static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, char *topic, SMqAskEpRsp *rsp) {
336336
int32_t code = 0;
337337
int32_t lino = 0;
338338
SMqSubscribeObj *pSub = NULL;
@@ -356,18 +356,19 @@ static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, cha
356356

357357
tstrncpy(topicEp.db, pSub->dbName, TSDB_DB_FNAME_LEN);
358358
for (int32_t j = 0; j < vgNum; j++) {
359-
SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
360-
if (pVgEp == NULL) {
359+
int32_t *vgId = taosArrayGet(pConsumerEp->vgs, j);
360+
if (vgId == NULL) {
361361
continue;
362362
}
363-
if (epoch == -1) {
364-
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
365-
if (pVgroup) {
366-
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
367-
mndReleaseVgroup(pMnode, pVgroup);
368-
}
363+
SMqSubVgEp vgEp = {.epSet = {0}, .vgId = *vgId, .offset = -1};
364+
SVgObj *pVgroup = mndAcquireVgroup(pMnode, *vgId);
365+
if (pVgroup == NULL) {
366+
mWarn("failed to acquire vgroup:%d", *vgId);
367+
code = terrno;
368+
goto END;
369369
}
370-
SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
370+
vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
371+
mndReleaseVgroup(pMnode, pVgroup);
371372
MND_TMQ_NULL_CHECK(taosArrayPush(topicEp.vgs, &vgEp));
372373
}
373374
MND_TMQ_NULL_CHECK(taosArrayPush(rsp->topics, &topicEp));
@@ -383,7 +384,7 @@ static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, cha
383384
return code;
384385
}
385386

386-
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
387+
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqAskEpRsp *rsp){
387388
if (pMnode == NULL || pConsumer == NULL || rsp == NULL){
388389
return TSDB_CODE_INVALID_PARA;
389390
}
@@ -398,7 +399,7 @@ static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t e
398399
// handle all topics subscribed by this consumer
399400
for (int32_t i = 0; i < numOfTopics; i++) {
400401
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
401-
MND_TMQ_RETURN_CHECK(processEachTopicEp(pMnode, pConsumer, topic, rsp, epoch));
402+
MND_TMQ_RETURN_CHECK(processEachTopicEp(pMnode, pConsumer, topic, rsp));
402403
}
403404

404405
END:
@@ -474,7 +475,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
474475
if (epoch != serverEpoch) {
475476
mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
476477
consumerId, epoch, serverEpoch);
477-
MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, epoch, &rsp));
478+
MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, &rsp));
478479
}
479480
}
480481
code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
@@ -814,8 +815,6 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
814815
goto CM_DECODE_OVER;
815816
}
816817

817-
tmsgUpdateDnodeEpSet(&pConsumer->ep);
818-
819818
CM_DECODE_OVER:
820819
taosMemoryFreeClear(buf);
821820
if (terrno != TSDB_CODE_SUCCESS) {

source/dnode/mnode/impl/src/mndDef.c

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,17 @@ void freeSMqConsumerEp(void* data) {
8585
pConsumerEp->offsetRows = NULL;
8686
}
8787

88-
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
89-
int32_t tlen = 0;
90-
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
91-
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
92-
return tlen;
93-
}
94-
95-
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
96-
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
88+
static void *tDecodeSMqVgEp(const void *buf, int32_t *vgId, int8_t sver) {
89+
buf = taosDecodeFixedI32(buf, vgId);
9790
if (sver == 1) {
9891
uint64_t size = 0;
9992
buf = taosDecodeVariantU64(buf, &size);
10093
buf = POINTER_SHIFT(buf, size);
10194
}
102-
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
95+
if (sver <= 3) {
96+
SEpSet epSet = {0};
97+
buf = taosDecodeSEpSet(buf, &epSet);
98+
}
10399
return (void *)buf;
104100
}
105101

@@ -204,7 +200,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
204200
tlen += taosEncodeFixedI32(buf, pConsumer->status);
205201

206202
tlen += taosEncodeFixedI32(buf, pConsumer->pid);
207-
tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
208203
tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
209204
tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
210205
tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
@@ -278,7 +273,10 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
278273
buf = taosDecodeFixedI32(buf, &pConsumer->status);
279274

280275
buf = taosDecodeFixedI32(buf, &pConsumer->pid);
281-
buf = taosDecodeSEpSet(buf, &pConsumer->ep);
276+
if (sver <= 3) {
277+
SEpSet ep = {0};
278+
buf = taosDecodeSEpSet(buf, &ep);
279+
}
282280
buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
283281
buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
284282
buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
@@ -378,8 +376,9 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
378376
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
379377
tlen += taosEncodeFixedI32(buf, sz);
380378
for (int32_t i = 0; i < sz; i++) {
381-
void* data = taosArrayGet(pConsumerEp->vgs, i);
382-
tlen += tEncodeSMqVgEp(buf, data);
379+
int32_t* vgId = (int32_t*)taosArrayGet(pConsumerEp->vgs, i);
380+
if (vgId == NULL) continue;
381+
tlen += taosEncodeFixedI32(buf, *vgId);
383382
}
384383

385384
return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
@@ -416,14 +415,20 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
416415
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
417416
int32_t sz = 0;
418417
buf = taosDecodeFixedI32(buf, &sz);
419-
pConsumerEp->vgs = taosArrayInit(sz, sizeof(SMqVgEp));
418+
pConsumerEp->vgs = taosArrayInit(sz, sizeof(int32_t));
420419
if (pConsumerEp->vgs == NULL) {
421420
return NULL;
422421
}
423422
for (int32_t i = 0; i < sz; i++) {
424-
SMqVgEp* vgEp = taosArrayReserve(pConsumerEp->vgs, 1);
425-
if (vgEp != NULL)
426-
buf = tDecodeSMqVgEp(buf, vgEp, sver);
423+
int32_t* vgId = taosArrayReserve(pConsumerEp->vgs, 1);
424+
if (vgId == NULL) {
425+
return NULL;
426+
}
427+
if (sver <= 3) {
428+
buf = tDecodeSMqVgEp(buf, vgId, sver);
429+
} else {
430+
buf = taosDecodeFixedI32(buf, vgId);
431+
}
427432
}
428433
if (sver > 1) {
429434
buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
@@ -446,7 +451,7 @@ int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
446451
MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
447452
taosHashSetFreeFp(pSubObj->consumerHash, freeSMqConsumerEp);
448453

449-
pSubObj->unassignedVgs = taosArrayInit(0, sizeof(SMqVgEp));
454+
pSubObj->unassignedVgs = taosArrayInit(0, sizeof(int32_t));
450455
MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
451456
*ppSub = pSubObj;
452457
pSubObj = NULL;
@@ -539,8 +544,9 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
539544
int32_t len = taosArrayGetSize(pSub->unassignedVgs);
540545
tlen += taosEncodeFixedI32(buf, len);
541546
for (int32_t i = 0; i < len; i++) {
542-
void* data = taosArrayGet(pSub->unassignedVgs, i);
543-
tlen += tEncodeSMqVgEp(buf, data);
547+
int32_t* vgId = (int32_t*)taosArrayGet(pSub->unassignedVgs, i);
548+
if (vgId == NULL) continue;
549+
tlen += taosEncodeFixedI32(buf, *vgId);
544550
}
545551
tlen += taosEncodeString(buf, pSub->dbName);
546552

@@ -576,14 +582,20 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
576582

577583
int32_t len = 0;
578584
buf = taosDecodeFixedI32(buf, &len);
579-
pSub->unassignedVgs = taosArrayInit(len, sizeof(SMqVgEp));
585+
pSub->unassignedVgs = taosArrayInit(len, sizeof(int32_t));
580586
if (pSub->unassignedVgs == NULL) {
581587
return NULL;
582588
}
583589
for (int32_t i = 0; i < len; i++) {
584-
SMqVgEp* vgEp = taosArrayReserve(pSub->unassignedVgs, 1);
585-
if (vgEp != NULL)
586-
buf = tDecodeSMqVgEp(buf, vgEp, sver);
590+
int32_t* vgId = taosArrayReserve(pSub->unassignedVgs, 1);
591+
if (vgId == NULL) {
592+
return NULL;
593+
}
594+
if (sver <= 3) {
595+
buf = tDecodeSMqVgEp(buf, vgId, sver);
596+
} else {
597+
buf = taosDecodeFixedI32(buf, vgId);
598+
}
587599
}
588600

589601
buf = taosDecodeStringTo(buf, pSub->dbName);

0 commit comments

Comments
 (0)