Skip to content

Commit 5972c20

Browse files
Simon9997Copilot
andcommitted
feat: [6598098782] support FILL(NONE/NULL/VALUE) for indefRows functions in interval queries
Allow indefRows functions (csum, diff, derivative, mavg, statecount, stateduration) to work with FILL clause in interval queries. Changes: - parser: lift restriction blocking FILL with indefRows functions - planner: propagate indefRowsMode flag through plan nodes - nodes: serialize/deserialize/clone indefRowsMode field - executor: implement multi-row window pass-through in fill engine, reset indefWindowActive on fill reset, validate fill values Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 1ae4504 commit 5972c20

File tree

14 files changed

+3006
-15
lines changed

14 files changed

+3006
-15
lines changed

docs/en/14-reference/03-taos-sql/20-select.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ The differences between NULL, NULL_F, VALUE, VALUE_F filling modes for different
371371

372372
1. When using the FILL statement, a large amount of fill output may be generated, so be sure to specify the query time range. For each query, the system can return up to 10 million results with interpolation.
373373
2. FILL has continuity, if only the first value in a column is not NULL, then fill(prev) will fill all subsequent rows with that value.
374+
3. When using time-series specific functions (such as CSUM, DIFF, DERIVATIVE, MAVG, STATECOUNT, STATEDURATION, IRATE, TWA, LAG, LEAD, FILL_FORWARD) with INTERVAL queries, only FILL(NONE), FILL(NULL), FILL(NULL_F), FILL(VALUE), and FILL(VALUE_F) are supported. FILL(PREV), FILL(NEXT), FILL(LINEAR), and FILL(NEAR) are not supported with these functions.
374375

375376
:::
376377

docs/zh/14-reference/03-taos-sql/20-select.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ NULL、NULL_F、VALUE、VALUE_F 这几种填充模式针对不同场景区别如
373373

374374
1. 使用 FILL 语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过 1 千万条具有插值的结果。
375375
2. FILL 具有连续性,例如一列数据中仅第一条不为 NULL,则 FILL(PREV) 会为后续所有行填充该值。
376+
3. 当时序函数(如 CSUM、DIFF、DERIVATIVE、MAVG、STATECOUNT、STATEDURATION、IRATE、TWA、LAG、LEAD、FILL_FORWARD)与 INTERVAL 查询一起使用时,仅支持 FILL(NONE)、FILL(NULL)、FILL(NULL_F)、FILL(VALUE) 和 FILL(VALUE_F)。不支持 FILL(PREV)、FILL(NEXT)、FILL(LINEAR) 和 FILL(NEAR)。
376377

377378
:::
378379

include/libs/nodes/plannodes.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ typedef struct SFillLogicNode {
411411
SNodeList* pFillNullExprs;
412412
// duration expression for surrounding_time (only for PREV/NEXT/NEAR)
413413
SNode* pSurroundingTime;
414+
bool indefRowsMode;
414415
} SFillLogicNode;
415416

416417
typedef struct SSortLogicNode {
@@ -815,6 +816,7 @@ typedef struct SFillPhysiNode {
815816
SNodeList* pFillNullExprs;
816817
// duration expression for surrounding_time (only for PREV/NEXT/NEAR)
817818
SNode* pSurroundingTime;
819+
bool indefRowsMode;
818820
} SFillPhysiNode;
819821

820822
typedef struct SMultiTableIntervalPhysiNode {

source/libs/executor/inc/tfill.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ typedef struct SFillInfo {
101101
SArray* pColFillProgress;
102102
int64_t surroundingTime; // surrounding time for fill PREV/NEXT
103103
bool ascNextOrDescPrev;
104+
bool indefRowsMode;
105+
bool indefWindowActive;
104106
} SFillInfo;
105107

106108
typedef struct SResultCellData {
@@ -162,7 +164,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols,
162164
int32_t fillType, struct SFillColInfo* pCol,
163165
int32_t primaryTsSlotId, int32_t order,
164166
const char* id, SExecTaskInfo* pTaskInfo,
165-
int64_t surroundingTime, SFillInfo** ppFillInfo);
167+
int64_t surroundingTime, bool indefRowsMode,
168+
SFillInfo** ppFillInfo);
166169

167170
void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
168171
int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity, bool *wantMoreBlock);

source/libs/executor/src/filloperator.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr,
363363
STimeWindow win, int32_t capacity, const char* id,
364364
SInterval* pInterval, int32_t fillType,
365365
int32_t order, SExecTaskInfo* pTaskInfo,
366-
int64_t surroundingTime) {
366+
int64_t surroundingTime, bool indefRowsMode) {
367367
SFillColInfo* pColInfo =
368368
createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols,
369369
pFillNullExpr, numOfFillNullExprs, pValNode);
@@ -381,7 +381,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr,
381381
numOfFillNullExprs, capacity, pInterval,
382382
fillType, pColInfo, pInfo->primaryTsCol,
383383
order, id, pTaskInfo, surroundingTime,
384-
&pInfo->pFillInfo);
384+
indefRowsMode, &pInfo->pFillInfo);
385385
if (code != TSDB_CODE_SUCCESS) {
386386
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
387387
return code;
@@ -590,7 +590,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
590590
(SNodeListNode*)pPhyFillNode->pValues,
591591
pPhyFillNode->timeRange, pResultInfo->capacity,
592592
pTaskInfo->id.str, pInterval, type, order, pTaskInfo,
593-
surroundingTime);
593+
surroundingTime, pPhyFillNode->indefRowsMode);
594594
if (code != TSDB_CODE_SUCCESS) {
595595
goto _error;
596596
}

source/libs/executor/src/tfill.c

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols,
320320
int32_t fillType, struct SFillColInfo* pCol,
321321
int32_t primaryTsSlotId, int32_t order,
322322
const char* id, SExecTaskInfo* pTaskInfo,
323-
int64_t surroundingTime, SFillInfo** ppFillInfo) {
323+
int64_t surroundingTime, bool indefRowsMode,
324+
SFillInfo** ppFillInfo) {
324325
int32_t code = TSDB_CODE_SUCCESS;
325326
int32_t lino = 0;
326327
if (fillType == TSDB_FILL_NONE) {
@@ -384,6 +385,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols,
384385
QUERY_CHECK_CODE(code, lino, _end);
385386

386387
pFillInfo->pTaskInfo = pTaskInfo;
388+
pFillInfo->indefRowsMode = indefRowsMode;
389+
pFillInfo->indefWindowActive = false;
387390

388391
_end:
389392
if (code != TSDB_CODE_SUCCESS) {
@@ -402,6 +405,7 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
402405
pFillInfo->numOfRows = 0;
403406
pFillInfo->numOfCurrent = 0;
404407
pFillInfo->numOfTotal = 0;
408+
pFillInfo->indefWindowActive = false;
405409
}
406410

407411
void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
@@ -1029,6 +1033,15 @@ int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBl
10291033

10301034
// if all blocks are consumed, we have to fill for not filled cols
10311035
if (pFillInfo->numOfRows == 0) {
1036+
if (pFillInfo->indefRowsMode && pFillInfo->indefWindowActive) {
1037+
const SInterval* pInterval = &pFillInfo->interval;
1038+
pFillInfo->currentKey =
1039+
taosTimeAdd(pFillInfo->currentKey,
1040+
pInterval->sliding *
1041+
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1042+
pInterval->slidingUnit, pInterval->precision, NULL);
1043+
pFillInfo->indefWindowActive = false;
1044+
}
10321045
if (!pFillBlock) {
10331046
code = trySaveNewBlock(pFillInfo, pDstBlock, capacity, &pFillBlock);
10341047
QUERY_CHECK_CODE(code, lino, _end);
@@ -1070,6 +1083,16 @@ int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBl
10701083
QUERY_CHECK_CODE(code, lino, _end);
10711084

10721085
if (blockCurTs != fillCurTs || !pFillInfo->pSrcBlock) {
1086+
if (pFillInfo->indefRowsMode && pFillInfo->indefWindowActive) {
1087+
const SInterval* pInterval = &pFillInfo->interval;
1088+
pFillInfo->currentKey =
1089+
taosTimeAdd(pFillInfo->currentKey,
1090+
pInterval->sliding *
1091+
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1092+
pInterval->slidingUnit, pInterval->precision, NULL);
1093+
pFillInfo->indefWindowActive = false;
1094+
continue;
1095+
}
10731096
doFillOneRow(pFillInfo, pFillBlock->pBlock, blockCurTs, false);
10741097
} else {
10751098
for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
@@ -1105,15 +1128,37 @@ int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBl
11051128
}
11061129
tryResetColNextPrev(pFillInfo, colIdx);
11071130
}
1108-
const SInterval* pInterval = &pFillInfo->interval;
1109-
pFillInfo->currentKey =
1110-
taosTimeAdd(pFillInfo->currentKey,
1111-
pInterval->sliding *
1112-
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1113-
pInterval->slidingUnit, pInterval->precision, NULL);
11141131
pFillBlock->pBlock->info.rows += 1;
11151132
pFillInfo->index += 1;
11161133
pFillInfo->numOfCurrent += 1;
1134+
1135+
if (pFillInfo->indefRowsMode) {
1136+
pFillInfo->indefWindowActive = true;
1137+
bool lastRowOfWindow = true;
1138+
if (pFillInfo->index < pFillInfo->numOfRows) {
1139+
TSKEY nextTs = getBlockCurTs(pFillInfo->pSrcBlock, pFillInfo->index,
1140+
pFillInfo->srcTsSlotId);
1141+
if (nextTs == fillCurTs) {
1142+
lastRowOfWindow = false;
1143+
}
1144+
}
1145+
if (lastRowOfWindow && pFillInfo->index < pFillInfo->numOfRows) {
1146+
const SInterval* pInterval = &pFillInfo->interval;
1147+
pFillInfo->currentKey =
1148+
taosTimeAdd(pFillInfo->currentKey,
1149+
pInterval->sliding *
1150+
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1151+
pInterval->slidingUnit, pInterval->precision, NULL);
1152+
pFillInfo->indefWindowActive = false;
1153+
}
1154+
} else {
1155+
const SInterval* pInterval = &pFillInfo->interval;
1156+
pFillInfo->currentKey =
1157+
taosTimeAdd(pFillInfo->currentKey,
1158+
pInterval->sliding *
1159+
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1160+
pInterval->slidingUnit, pInterval->precision, NULL);
1161+
}
11171162
}
11181163
tryExtractReadyBlocks(pFillInfo, pDstBlock, capacity);
11191164
}

source/libs/nodes/src/nodesCloneFuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
833833
CLONE_NODE_FIELD(pTimeRange);
834834
CLONE_NODE_LIST_FIELD(pFillNullExprs);
835835
CLONE_NODE_FIELD(pSurroundingTime);
836+
COPY_SCALAR_FIELD(indefRowsMode);
836837
return TSDB_CODE_SUCCESS;
837838
}
838839

source/libs/nodes/src/nodesCodeFuncs.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3657,6 +3657,7 @@ static const char* jkFillPhysiPlanEndTime = "EndTime";
36573657
static const char* jkFillPhysiPlanFillNullExprs = "FillNullExprs";
36583658
static const char* jkFillPhysiPlanFillTimeRangeExpr = "TimeRangeExpr";
36593659
static const char* jkFillPhysiPlanSurroundingTime = "SurroundingTime";
3660+
static const char* jkFillPhysiPlanIndefRowsMode = "IndefRowsMode";
36603661

36613662
static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
36623663
const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj;
@@ -3693,6 +3694,9 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
36933694
code = tjsonAddObject(pJson, jkFillPhysiPlanSurroundingTime, nodeToJson,
36943695
pNode->pSurroundingTime);
36953696
}
3697+
if (TSDB_CODE_SUCCESS == code) {
3698+
code = tjsonAddBoolToObject(pJson, jkFillPhysiPlanIndefRowsMode, pNode->indefRowsMode);
3699+
}
36963700

36973701
return code;
36983702
}
@@ -3732,6 +3736,9 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
37323736
code = jsonToNodeObject(pJson, jkFillPhysiPlanSurroundingTime,
37333737
&pNode->pSurroundingTime);
37343738
}
3739+
if (TSDB_CODE_SUCCESS == code) {
3740+
code = tjsonGetBoolValue(pJson, jkFillPhysiPlanIndefRowsMode, &pNode->indefRowsMode);
3741+
}
37353742

37363743
return code;
37373744
}

source/libs/nodes/src/nodesMsgFuncs.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3890,6 +3890,7 @@ enum {
38903890
PHY_FILL_CODE_FILL_NULL_EXPRS,
38913891
PHY_FILL_CODE_TIME_RANGE_EXPR,
38923892
PHY_FILL_CODE_SURROUNDING_TIME,
3893+
PHY_FILL_CODE_INDEF_ROWS_MODE,
38933894
};
38943895

38953896
static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@@ -3924,6 +3925,9 @@ static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
39243925
code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_SURROUNDING_TIME, nodeToMsg,
39253926
pNode->pSurroundingTime);
39263927
}
3928+
if (TSDB_CODE_SUCCESS == code) {
3929+
code = tlvEncodeBool(pEncoder, PHY_FILL_CODE_INDEF_ROWS_MODE, pNode->indefRowsMode);
3930+
}
39273931
return code;
39283932
}
39293933

@@ -3964,6 +3968,9 @@ static int32_t msgToPhysiFillNode(STlvDecoder* pDecoder, void* pObj) {
39643968
case PHY_FILL_CODE_SURROUNDING_TIME:
39653969
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSurroundingTime);
39663970
break;
3971+
case PHY_FILL_CODE_INDEF_ROWS_MODE:
3972+
code = tlvDecodeBool(pTlv, &pNode->indefRowsMode);
3973+
break;
39673974
default:
39683975
break;
39693976
}

source/libs/parser/src/parTranslater.c

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3645,9 +3645,18 @@ static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNod
36453645
pFunc->functionName);
36463646
}
36473647
if (hasFillClause(pCxt->pCurrStmt)) {
3648-
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
3649-
"FILL is not supported with indefinite rows function '%s' in window query",
3650-
pFunc->functionName);
3648+
SSelectStmt* pSel = (SSelectStmt*)pCxt->pCurrStmt;
3649+
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSel->pWindow;
3650+
SFillNode* pFillNode = (SFillNode*)pInterval->pFill;
3651+
if (pFillNode->mode != FILL_MODE_NONE &&
3652+
pFillNode->mode != FILL_MODE_NULL &&
3653+
pFillNode->mode != FILL_MODE_NULL_F &&
3654+
pFillNode->mode != FILL_MODE_VALUE &&
3655+
pFillNode->mode != FILL_MODE_VALUE_F) {
3656+
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC,
3657+
"Only FILL(NONE/NULL/NULL_F/VALUE/VALUE_F) is supported with indefinite rows function '%s'",
3658+
pFunc->functionName);
3659+
}
36513660
}
36523661
if (hasInvalidFuncNesting(pFunc)) {
36533662
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AGG_FUNC_NESTING);
@@ -7844,7 +7853,8 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
78447853
}
78457854

78467855
static EDealRes needFillImpl(SNode* pNode, void* pContext) {
7847-
if ((isAggFunc(pNode) || isInterpFunc(pNode)) && FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pNode)->funcType &&
7856+
if ((isAggFunc(pNode) || isInterpFunc(pNode) || isIndefiniteRowsFunc(pNode)) &&
7857+
FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pNode)->funcType &&
78487858
FUNCTION_TYPE_GROUP_CONST_VALUE != ((SFunctionNode*)pNode)->funcType) {
78497859
*(bool*)pContext = true;
78507860
return DEAL_RES_END;

0 commit comments

Comments
 (0)