Skip to content

Commit ee7c9c3

Browse files
authored
fix(tmq): add test case for drop table & updata docs (#35141)
1 parent cfe899f commit ee7c9c3

9 files changed

Lines changed: 175 additions & 49 deletions

File tree

docs/en/06-advanced/01-subscription/01-topic.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ CREATE TOPIC [IF NOT EXISTS] topic_name as subquery;
3131
This SQL query subscribes to data using a SELECT statement (for example, SELECT * or SELECT ts, c1), and may include filter conditions and scalar function calculations. However, aggregate functions and time-window aggregations are not supported.
3232

3333
1. Once this type of topic is created, the structure of the subscribed data is fixed.
34-
2. Columns or tags that are subscribed to or referenced in calculations cannot be deleted (`ALTER TABLE DROP`) or modified (`ALTER TABLE MODIFY`). From 3.4.0.0, you can modify and delete, but you need to reload the topic.
35-
3. If the table schema changes, any newly added columns will not appear in the subscription result.
36-
4. For SELECT \*, the subscription expands to include all columns present at creation time. For subtables and normal tables, these are data columns; for supertables, they include both data and tag columns.
37-
5. Query subscription on virtual tables is not supported.
34+
2. Columns or tags that are subscribed to or referenced in calculations cannot be deleted (`ALTER TABLE DROP`) or modified (`ALTER TABLE MODIFY`). From 3.4.0.0, you can modify or delete or add, but you need to execute the command of "reload topic".
35+
3. For SELECT \*, the subscription expands to include all columns present at creation time. For subtables and normal tables, these are data columns; for supertables, they include both data and tag columns.
36+
4. Query subscription on virtual tables is not supported.
37+
5. The super table, sub-table, and regular table within the subquery can be deleted. After deletion, the subscribed data becomes empty. If the table is recreated after deletion, the subscribed data remains empty because the table ID has changed. If you want to subscribe to the new table data, you can reload the topic using the reload topic syntax
3838

3939
For example, if you need to subscribe to all smart meter records where the voltage is greater than 200, and only return the timestamp, current, and voltage (excluding the phase), you can create a topic named `power_topic` with the following SQL statement:
4040

@@ -97,7 +97,7 @@ SHOW TOPICS;
9797
RELOAD TOPIC IF EXISTS topic_name as subquery;
9898
```
9999

100-
1. This syntax is supported since version 3.3.9 and is used to reload topics. It primarily addresses issues where the output results do not take effect after deleting or adding columns and tags in queries involving topic changes or tag lengths, as well as when selecting * to query subscriptions.
100+
1. This syntax is supported since version 3.4.0 and is used to reload topics. It primarily addresses issues where the output results do not take effect after deleting or adding columns and tags in queries involving topic changes or tag lengths, as well as when selecting * to query subscriptions.
101101
2. When it is necessary to change the schema of the subscription table structure, first stop consuming, then make the change, execute "reload topic", and then restart the subscription.
102102

103103
The above SQL statement displays information about all topics in the current database.

docs/en/08-operation/16-security.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ auditLevel = 3 // AUDIT_LEVEL_DATABASE
199199
| recalc stream | recalcStream | streamName | recalcName | SQL |
200200
| create topic | createTopic | topic's DB | name of the created topic | SQL |
201201
| drop topic | dropTopic | topic's DB | name of the deleted topic | SQL |
202-
| reload topic | reloadTopic | NULL | name of the deleted topic | SQL |
202+
| reload topic | reloadTopic | topic's DB | name of the topic | SQL |
203203
| create Rsma | createRsma | Rsma name | NULL | SQL |
204204
| alter Rsma | alterRsma | Rsma name | Table name | SQL |
205205
| drop Rsma | dropRsma | Rsma name | NULL | SQL |

docs/en/14-reference/03-taos-sql/40-tmq.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ CREATE TOPIC [IF NOT EXISTS] topic_name as subquery
2121
Subscribe through a `SELECT` statement (including `SELECT *`, or specific query subscriptions like `SELECT ts, c1`, which can include conditional filtering and scalar function calculations, but do not support aggregate functions or time window aggregation). It is important to note:
2222

2323
- Once this type of TOPIC is created, the structure of the subscribed data is fixed.
24-
- Columns or tags that are subscribed to or used for calculations cannot be deleted (`ALTER table DROP`) or modified (`ALTER table MODIFY`). From 3.4.0.0, you can modify and delete, but you need to reload the topic.
25-
- If there are changes to the table structure, new columns will not appear in the results.
24+
- Columns or tags that are subscribed to or used for calculations cannot be deleted (`ALTER table DROP`) or modified (`ALTER table MODIFY`). From 3.4.0.0, you can modify or delete or add, but you need to execute the command of "reload topic".
2625
- For `select *`, the subscription expands to include all columns at the time of creation (data columns for subtables and basic tables, data columns plus tag columns for supertables).
2726
- Query subscription on virtual tables is not supported.
27+
- The super table, sub-table, and regular table within the subquery can be deleted. After deletion, the subscribed data becomes empty. If the table is recreated after deletion, the subscribed data remains empty because the table ID has changed. If you want to subscribe to the new table data, you can reload the topic using the reload topic syntax
2828

2929
### Supertable topic
3030

@@ -81,7 +81,7 @@ SHOW TOPICS;
8181
RELOAD TOPIC IF EXISTS topic_name as subquery;
8282
```
8383

84-
- This syntax is supported since version 3.3.9 and is used to reload topics. It primarily addresses issues where the output results do not take effect after deleting or adding columns and tags in queries involving topic changes or tag lengths, as well as when selecting * to query subscriptions.
84+
- This syntax is supported since version 3.4.0 and is used to reload topics. It primarily addresses issues where the output results do not take effect after deleting or adding columns and tags in queries involving topic changes or tag lengths, as well as when selecting * to query subscriptions.
8585
- When it is necessary to change the schema of the subscription table structure, first stop consuming, then make the change, execute "reload topic", and then restart the subscription.
8686

8787
Displays information about all topics in the current database.

docs/zh/06-advanced/01-subscription/01-topic.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ CREATE TOPIC [IF NOT EXISTS] topic_name as subquery
2727
该 SQL 通过 SELECT 语句订阅(包括 SELECT *,或 SELECT ts, c1 等指定查询订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
2828

2929
1. 该类型 TOPIC 一旦创建则订阅数据的结构确定。
30-
2. 被订阅或用于计算的列或标签不可被删除(ALTER table DROP)、修改(ALTER table MODIFY)。(从 3.4.0.0 开始,可以修改,删除,但是需要重新 reload topic)。
31-
3. 若发生表结构变更,新增的列不出现在结果中
32-
4. 对于 select *,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
33-
5. 不支持虚拟表的查询订阅
30+
2. 被订阅或用于计算的列或标签不可被删除(ALTER table DROP)、修改(ALTER table MODIFY)。(从 3.4.0.0 开始,可以修改,删除,增加,但是需要重新 reload topic)。
31+
3. 对于 select *,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
32+
4. 不支持虚拟表的查询订阅
33+
5. subquery 里的超级表,子表,普通表可以被删除,删除后,订阅的数据为空。如果删除后重新建,订阅的数据仍然为空,因为表的 id 变了。如果想订阅到新的表数据,可以通过 reload topic 语法重新加载 topic 即可
3434

3535
假设需要订阅所有智能电表中电压值大于 200 的数据,且仅仅返回时间戳、电流、电压 3 个采集量(不返回相位),那么可以通过下面的 SQL 创建 power_topic 这个主题。
3636

@@ -90,7 +90,7 @@ SHOW TOPICS;
9090
RELOAD TOPIC IF EXISTS topic_name as subquery;
9191
```
9292

93-
1. 该语法从 3.3.9 版本开始支持,用于重新加载主题,主要解决查询主题里变更列或 tag 长度,以及 select * 查询订阅时,删除增加列和 tag 后,输出结果不生效问题。
93+
1. 该语法从 3.4.0 版本开始支持,用于重新加载主题,主要解决查询主题里变更列或 tag 长度,以及 select * 查询订阅时,删除增加列和 tag 后,输出结果不生效问题。
9494
2. 需要变更订阅表结构的 schema 时,先停止消费,然后变更,然后执行 reload topic,接着重新开始订阅即可。
9595

9696
上面的 SQL 会显示当前数据库下的所有主题的信息。

docs/zh/08-operation/16-security.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ auditLevel = 3 // AUDIT_LEVEL_DATABASE
193193
| recalc stream | recalcStream | streamName | recalcName | SQL |
194194
| create topic | createTopic | topic 所在 DB | 创建的 topic 名字 | SQL |
195195
| drop topic | dropTopic | topic 所在 DB | 删除的 topic 名字 | SQL |
196-
| reload topic | reloadTopic | NULL | 删除的 topic 名字 | SQL |
196+
| reload topic | reloadTopic | topic 所在 DB | topic 名字 | SQL |
197197
| create Rsma | createRsma | Rsma name | NULL | SQL |
198198
| alter Rsma | alterRsma | Rsma name | Table name | SQL |
199199
| drop Rsma | dropRsma | Rsma name | NULL | SQL |

docs/zh/14-reference/03-taos-sql/40-tmq.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ CREATE TOPIC [IF NOT EXISTS] topic_name as subquery
2323
通过 `SELECT` 语句订阅(包括 `SELECT *``SELECT ts, c1` 等指定查询订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
2424

2525
- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
26-
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。(从 3.4.0.0 开始,可以修改,删除,但是需要重新 reload topic)。
27-
- 若发生表结构变更,新增的列不出现在结果中。
26+
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。(从 3.4.0.0 开始,可以修改,删除,增加,但是需要重新 reload topic)。
2827
- 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)。
2928
- 不支持虚拟表的查询订阅。
29+
- subquery 里的超级表,子表,普通表可以被删除,删除后,订阅的数据为空。如果删除后重新建,订阅的数据仍然为空,因为表的 id 变了。如果想订阅到新的表数据,可以通过 reload topic 语法重新加载 topic 即可。
3030

3131
### 超级表 topic
3232

@@ -83,7 +83,7 @@ SHOW TOPICS;
8383
RELOAD TOPIC IF EXISTS topic_name as subquery;
8484
```
8585

86-
1. 该语法从 3.3.9 版本开始支持,用于重新加载主题,主要解决查询主题里变更列或 tag 长度,以及 select * 查询订阅时,删除增加列和 tag 后,输出结果不生效问题。
86+
1. 该语法从 3.4.0 版本开始支持,用于重新加载主题,主要解决查询主题里变更列或 tag 长度,以及 select * 查询订阅时,删除增加列和 tag 后,输出结果不生效问题。
8787
2. 需要变更订阅表结构的 schema 时,先停止消费,然后变更,然后执行 reload topic,接着重新开始订阅即可。
8888

8989
显示当前数据库下的所有主题的信息。

source/dnode/mnode/impl/src/mndStb.c

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3164,25 +3164,6 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
31643164
TAOS_RETURN(code);
31653165
}
31663166

3167-
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
3168-
int32_t code = 0;
3169-
SSdb *pSdb = pMnode->pSdb;
3170-
void *pIter = NULL;
3171-
while (1) {
3172-
SMqTopicObj *pTopic = NULL;
3173-
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
3174-
if (pIter == NULL) break;
3175-
3176-
if (pTopic->stbUid == suid) {
3177-
sdbRelease(pSdb, pTopic);
3178-
sdbCancelFetch(pSdb, pIter);
3179-
TAOS_RETURN(TSDB_CODE_MND_TOPIC_MUST_BE_DELETED);
3180-
}
3181-
sdbRelease(pSdb, pTopic);
3182-
}
3183-
TAOS_RETURN(code);
3184-
}
3185-
31863167
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
31873168
int32_t code = 0;
31883169
SSdb *pSdb = pMnode->pSdb;
@@ -3258,9 +3239,6 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
32583239
TAOS_CHECK_GOTO(
32593240
mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_DROP, PRIV_OBJ_TBL, pStb->ownerId, pDb->name, name.tname),
32603241
NULL, _OVER);
3261-
if ((code = mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid)) != 0) {
3262-
goto _OVER;
3263-
}
32643242

32653243
if (pDb->cfg.isMount) {
32663244
code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;

test/cases/uncatalog/system-test/7-tmq/test_tmq_ts6379.py renamed to test/cases/17-DataSubscription/02-Consume/test_tmq_ts6379.py

Lines changed: 155 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -376,14 +376,6 @@ def check_add_drop_col(self):
376376
print(f"index {index},data len: {len(data)}")
377377

378378
if index == 0:
379-
if len(data) != 1:
380-
tdLog.exit(f"error data index {index} len: {len(data)}")
381-
for element in data:
382-
print(element)
383-
if element[4] != 1 or element[-1] != 20:
384-
tdLog.exit(f"error: {element[4]} {element[-1]}")
385-
386-
if index == 1:
387379
if len(data) != 1:
388380
tdLog.exit(f"error data index {index} len: {len(data)}")
389381
for element in data:
@@ -608,6 +600,157 @@ def check_alter_col_name_bytes_normal_table(self):
608600
finally:
609601
consumer.close()
610602

603+
def check_drop_stable_child_topic(self):
604+
func_name = self.get_function_name()
605+
print(f"start to excute {func_name}")
606+
tdSql.execute(f'create database db_stable vgroups 1')
607+
tdSql.execute(f'use db_stable')
608+
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
609+
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
610+
611+
tdSql.execute(f'create topic t2 as select * from t1')
612+
time.sleep(1)
613+
tdSql.execute(f'drop table st')
614+
615+
consumer_dict = {
616+
"group.id": "g1",
617+
"td.connect.user": "root",
618+
"td.connect.pass": "taosdata",
619+
"auto.offset.reset": "earliest",
620+
}
621+
consumer1 = Consumer(consumer_dict)
622+
623+
try:
624+
consumer1.subscribe(["t2"])
625+
except TmqError:
626+
tdLog.exit(f"subscribe error")
627+
628+
try:
629+
while True:
630+
res = consumer1.poll(1)
631+
if not res:
632+
break
633+
else:
634+
tdLog.exit(f"error: should not get data after drop table")
635+
636+
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
637+
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
638+
tdSql.execute(f"reload topic if exists t2 as select * from t1")
639+
640+
time.sleep(1)
641+
tdSql.execute(f'insert into t1 values(now, 1) (now+1s, 2)')
642+
643+
while True:
644+
res = consumer1.poll(1)
645+
if not res:
646+
tdLog.exit(f"error: should get data after new table")
647+
else:
648+
print(res)
649+
break
650+
finally:
651+
consumer1.close()
652+
653+
def check_drop_stable_stable_topic(self):
654+
func_name = self.get_function_name()
655+
print(f"start to excute {func_name}")
656+
tdSql.execute(f'drop topic if exists t2')
657+
tdSql.execute(f'drop database if exists db_stable')
658+
tdSql.execute(f'create database db_stable vgroups 1')
659+
tdSql.execute(f'use db_stable')
660+
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
661+
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
662+
663+
tdSql.execute(f'create topic t3 as select * from st')
664+
time.sleep(1)
665+
tdSql.execute(f'drop table st')
666+
667+
consumer_dict = {
668+
"group.id": "g1",
669+
"td.connect.user": "root",
670+
"td.connect.pass": "taosdata",
671+
"auto.offset.reset": "earliest",
672+
}
673+
consumer1 = Consumer(consumer_dict)
674+
675+
try:
676+
consumer1.subscribe(["t3"])
677+
except TmqError:
678+
tdLog.exit(f"subscribe error")
679+
680+
try:
681+
while True:
682+
res = consumer1.poll(1)
683+
if not res:
684+
break
685+
else:
686+
tdLog.exit(f"error: should not get data after drop table")
687+
688+
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
689+
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
690+
tdSql.execute(f"reload topic if exists t3 as select * from st")
691+
692+
time.sleep(1)
693+
tdSql.execute(f'insert into t1 values(now, 1) (now+1s, 2)')
694+
695+
while True:
696+
res = consumer1.poll(1)
697+
if not res:
698+
tdLog.exit(f"error: should get data after new table")
699+
else:
700+
print(res)
701+
break
702+
finally:
703+
consumer1.close()
704+
705+
def check_drop_ctable(self):
706+
func_name = self.get_function_name()
707+
print(f"start to excute {func_name}")
708+
tdSql.execute(f'create database db_ctable vgroups 1')
709+
tdSql.execute(f'use db_ctable')
710+
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
711+
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
712+
713+
tdSql.execute(f'create topic t1 as select * from t1')
714+
time.sleep(1)
715+
tdSql.execute(f'drop table t1')
716+
717+
consumer_dict = {
718+
"group.id": "g1",
719+
"td.connect.user": "root",
720+
"td.connect.pass": "taosdata",
721+
"auto.offset.reset": "earliest",
722+
}
723+
consumer1 = Consumer(consumer_dict)
724+
725+
try:
726+
consumer1.subscribe(["t1"])
727+
except TmqError:
728+
tdLog.exit(f"subscribe error")
729+
730+
try:
731+
while True:
732+
res = consumer1.poll(1)
733+
if not res:
734+
break
735+
else:
736+
tdLog.exit(f"error: should not get data after drop table")
737+
738+
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
739+
tdSql.execute(f"reload topic if exists t1 as select * from t1")
740+
741+
time.sleep(1)
742+
tdSql.execute(f'insert into t1 values(now, 1) (now+1s, 2)')
743+
744+
while True:
745+
res = consumer1.poll(1)
746+
if not res:
747+
tdLog.exit(f"error: should get data after new table")
748+
else:
749+
print(res)
750+
break
751+
finally:
752+
consumer1.close()
753+
611754
def test_tmq_ts6379(self):
612755
"""summary: xxx
613756
@@ -627,6 +770,10 @@ def test_tmq_ts6379(self):
627770
- xxx
628771
629772
"""
773+
self.check_drop_ctable()
774+
self.check_drop_stable_child_topic()
775+
self.check_drop_stable_stable_topic()
776+
630777
tdSql.execute(f'create database if not exists db vgroups 1')
631778
tdSql.execute(f'alter dnode 1 "debugflag 143"')
632779

test/ci/cases.task

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,8 @@
661661
,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_stb_tag_filter.py
662662
,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_tsdb_and_wal.py -N 3 --replica 3
663663
,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_tsdb_wal_multi_ctb.py -N 3 --replica 3
664-
#,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_taosx.py
664+
,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_ts6379.py
665+
,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_taosx.py
665666
,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_meta_json.py
666667
,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_seek_commit.py
667668
,,y,.,./ci/pytest.sh pytest cases/17-DataSubscription/02-Consume/test_tmq_vnode_split_column.py -N 3 --replica 3

0 commit comments

Comments
 (0)