Skip to content

Commit e01f829

Browse files
Simon9997Copilot
andcommitted
feat: [6598098782] support FILL(NONE/NULL/VALUE) for indefRows functions in interval queries
Allow indefRows functions (csum, diff, derivative, mavg, statecount, stateduration) to work with FILL clause in interval queries. Changes: - parser: lift restriction blocking FILL with indefRows functions - planner: propagate indefRowsMode flag through plan nodes - nodes: serialize/deserialize/clone indefRowsMode field - executor: implement multi-row window pass-through in fill engine, reset indefWindowActive on fill reset, validate fill values Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 1ae4504 commit e01f829

10 files changed

Lines changed: 94 additions & 15 deletions

File tree

include/libs/nodes/plannodes.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ typedef struct SFillLogicNode {
411411
SNodeList* pFillNullExprs;
412412
// duration expression for surrounding_time (only for PREV/NEXT/NEAR)
413413
SNode* pSurroundingTime;
414+
bool indefRowsMode;
414415
} SFillLogicNode;
415416

416417
typedef struct SSortLogicNode {
@@ -815,6 +816,7 @@ typedef struct SFillPhysiNode {
815816
SNodeList* pFillNullExprs;
816817
// duration expression for surrounding_time (only for PREV/NEXT/NEAR)
817818
SNode* pSurroundingTime;
819+
bool indefRowsMode;
818820
} SFillPhysiNode;
819821

820822
typedef struct SMultiTableIntervalPhysiNode {

source/libs/executor/inc/tfill.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ typedef struct SFillInfo {
101101
SArray* pColFillProgress;
102102
int64_t surroundingTime; // surrounding time for fill PREV/NEXT
103103
bool ascNextOrDescPrev;
104+
bool indefRowsMode;
105+
bool indefWindowActive;
104106
} SFillInfo;
105107

106108
typedef struct SResultCellData {
@@ -162,7 +164,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols,
162164
int32_t fillType, struct SFillColInfo* pCol,
163165
int32_t primaryTsSlotId, int32_t order,
164166
const char* id, SExecTaskInfo* pTaskInfo,
165-
int64_t surroundingTime, SFillInfo** ppFillInfo);
167+
int64_t surroundingTime, bool indefRowsMode,
168+
SFillInfo** ppFillInfo);
166169

167170
void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
168171
int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity, bool *wantMoreBlock);

source/libs/executor/src/filloperator.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr,
363363
STimeWindow win, int32_t capacity, const char* id,
364364
SInterval* pInterval, int32_t fillType,
365365
int32_t order, SExecTaskInfo* pTaskInfo,
366-
int64_t surroundingTime) {
366+
int64_t surroundingTime, bool indefRowsMode) {
367367
SFillColInfo* pColInfo =
368368
createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols,
369369
pFillNullExpr, numOfFillNullExprs, pValNode);
@@ -381,7 +381,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr,
381381
numOfFillNullExprs, capacity, pInterval,
382382
fillType, pColInfo, pInfo->primaryTsCol,
383383
order, id, pTaskInfo, surroundingTime,
384-
&pInfo->pFillInfo);
384+
indefRowsMode, &pInfo->pFillInfo);
385385
if (code != TSDB_CODE_SUCCESS) {
386386
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
387387
return code;
@@ -590,7 +590,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
590590
(SNodeListNode*)pPhyFillNode->pValues,
591591
pPhyFillNode->timeRange, pResultInfo->capacity,
592592
pTaskInfo->id.str, pInterval, type, order, pTaskInfo,
593-
surroundingTime);
593+
surroundingTime, pPhyFillNode->indefRowsMode);
594594
if (code != TSDB_CODE_SUCCESS) {
595595
goto _error;
596596
}

source/libs/executor/src/tfill.c

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols,
320320
int32_t fillType, struct SFillColInfo* pCol,
321321
int32_t primaryTsSlotId, int32_t order,
322322
const char* id, SExecTaskInfo* pTaskInfo,
323-
int64_t surroundingTime, SFillInfo** ppFillInfo) {
323+
int64_t surroundingTime, bool indefRowsMode,
324+
SFillInfo** ppFillInfo) {
324325
int32_t code = TSDB_CODE_SUCCESS;
325326
int32_t lino = 0;
326327
if (fillType == TSDB_FILL_NONE) {
@@ -384,6 +385,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols,
384385
QUERY_CHECK_CODE(code, lino, _end);
385386

386387
pFillInfo->pTaskInfo = pTaskInfo;
388+
pFillInfo->indefRowsMode = indefRowsMode;
389+
pFillInfo->indefWindowActive = false;
387390

388391
_end:
389392
if (code != TSDB_CODE_SUCCESS) {
@@ -402,6 +405,7 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
402405
pFillInfo->numOfRows = 0;
403406
pFillInfo->numOfCurrent = 0;
404407
pFillInfo->numOfTotal = 0;
408+
pFillInfo->indefWindowActive = false;
405409
}
406410

407411
void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
@@ -1029,6 +1033,15 @@ int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBl
10291033

10301034
// if all blocks are consumed, we have to fill for not filled cols
10311035
if (pFillInfo->numOfRows == 0) {
1036+
if (pFillInfo->indefRowsMode && pFillInfo->indefWindowActive) {
1037+
const SInterval* pInterval = &pFillInfo->interval;
1038+
pFillInfo->currentKey =
1039+
taosTimeAdd(pFillInfo->currentKey,
1040+
pInterval->sliding *
1041+
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1042+
pInterval->slidingUnit, pInterval->precision, NULL);
1043+
pFillInfo->indefWindowActive = false;
1044+
}
10321045
if (!pFillBlock) {
10331046
code = trySaveNewBlock(pFillInfo, pDstBlock, capacity, &pFillBlock);
10341047
QUERY_CHECK_CODE(code, lino, _end);
@@ -1070,6 +1083,16 @@ int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBl
10701083
QUERY_CHECK_CODE(code, lino, _end);
10711084

10721085
if (blockCurTs != fillCurTs || !pFillInfo->pSrcBlock) {
1086+
if (pFillInfo->indefRowsMode && pFillInfo->indefWindowActive) {
1087+
const SInterval* pInterval = &pFillInfo->interval;
1088+
pFillInfo->currentKey =
1089+
taosTimeAdd(pFillInfo->currentKey,
1090+
pInterval->sliding *
1091+
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1092+
pInterval->slidingUnit, pInterval->precision, NULL);
1093+
pFillInfo->indefWindowActive = false;
1094+
continue;
1095+
}
10731096
doFillOneRow(pFillInfo, pFillBlock->pBlock, blockCurTs, false);
10741097
} else {
10751098
for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
@@ -1105,15 +1128,37 @@ int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBl
11051128
}
11061129
tryResetColNextPrev(pFillInfo, colIdx);
11071130
}
1108-
const SInterval* pInterval = &pFillInfo->interval;
1109-
pFillInfo->currentKey =
1110-
taosTimeAdd(pFillInfo->currentKey,
1111-
pInterval->sliding *
1112-
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1113-
pInterval->slidingUnit, pInterval->precision, NULL);
11141131
pFillBlock->pBlock->info.rows += 1;
11151132
pFillInfo->index += 1;
11161133
pFillInfo->numOfCurrent += 1;
1134+
1135+
if (pFillInfo->indefRowsMode) {
1136+
pFillInfo->indefWindowActive = true;
1137+
bool lastRowOfWindow = true;
1138+
if (pFillInfo->index < pFillInfo->numOfRows) {
1139+
TSKEY nextTs = getBlockCurTs(pFillInfo->pSrcBlock, pFillInfo->index,
1140+
pFillInfo->srcTsSlotId);
1141+
if (nextTs == fillCurTs) {
1142+
lastRowOfWindow = false;
1143+
}
1144+
}
1145+
if (lastRowOfWindow && pFillInfo->index < pFillInfo->numOfRows) {
1146+
const SInterval* pInterval = &pFillInfo->interval;
1147+
pFillInfo->currentKey =
1148+
taosTimeAdd(pFillInfo->currentKey,
1149+
pInterval->sliding *
1150+
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1151+
pInterval->slidingUnit, pInterval->precision, NULL);
1152+
pFillInfo->indefWindowActive = false;
1153+
}
1154+
} else {
1155+
const SInterval* pInterval = &pFillInfo->interval;
1156+
pFillInfo->currentKey =
1157+
taosTimeAdd(pFillInfo->currentKey,
1158+
pInterval->sliding *
1159+
GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
1160+
pInterval->slidingUnit, pInterval->precision, NULL);
1161+
}
11171162
}
11181163
tryExtractReadyBlocks(pFillInfo, pDstBlock, capacity);
11191164
}

source/libs/nodes/src/nodesCloneFuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
833833
CLONE_NODE_FIELD(pTimeRange);
834834
CLONE_NODE_LIST_FIELD(pFillNullExprs);
835835
CLONE_NODE_FIELD(pSurroundingTime);
836+
COPY_SCALAR_FIELD(indefRowsMode);
836837
return TSDB_CODE_SUCCESS;
837838
}
838839

source/libs/nodes/src/nodesCodeFuncs.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3657,6 +3657,7 @@ static const char* jkFillPhysiPlanEndTime = "EndTime";
36573657
static const char* jkFillPhysiPlanFillNullExprs = "FillNullExprs";
36583658
static const char* jkFillPhysiPlanFillTimeRangeExpr = "TimeRangeExpr";
36593659
static const char* jkFillPhysiPlanSurroundingTime = "SurroundingTime";
3660+
static const char* jkFillPhysiPlanIndefRowsMode = "IndefRowsMode";
36603661

36613662
static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
36623663
const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj;
@@ -3693,6 +3694,9 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
36933694
code = tjsonAddObject(pJson, jkFillPhysiPlanSurroundingTime, nodeToJson,
36943695
pNode->pSurroundingTime);
36953696
}
3697+
if (TSDB_CODE_SUCCESS == code) {
3698+
code = tjsonAddBoolToObject(pJson, jkFillPhysiPlanIndefRowsMode, pNode->indefRowsMode);
3699+
}
36963700

36973701
return code;
36983702
}
@@ -3732,6 +3736,9 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
37323736
code = jsonToNodeObject(pJson, jkFillPhysiPlanSurroundingTime,
37333737
&pNode->pSurroundingTime);
37343738
}
3739+
if (TSDB_CODE_SUCCESS == code) {
3740+
code = tjsonGetBoolValue(pJson, jkFillPhysiPlanIndefRowsMode, &pNode->indefRowsMode);
3741+
}
37353742

37363743
return code;
37373744
}

source/libs/nodes/src/nodesMsgFuncs.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3890,6 +3890,7 @@ enum {
38903890
PHY_FILL_CODE_FILL_NULL_EXPRS,
38913891
PHY_FILL_CODE_TIME_RANGE_EXPR,
38923892
PHY_FILL_CODE_SURROUNDING_TIME,
3893+
PHY_FILL_CODE_INDEF_ROWS_MODE,
38933894
};
38943895

38953896
static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@@ -3924,6 +3925,9 @@ static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
39243925
code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_SURROUNDING_TIME, nodeToMsg,
39253926
pNode->pSurroundingTime);
39263927
}
3928+
if (TSDB_CODE_SUCCESS == code) {
3929+
code = tlvEncodeBool(pEncoder, PHY_FILL_CODE_INDEF_ROWS_MODE, pNode->indefRowsMode);
3930+
}
39273931
return code;
39283932
}
39293933

@@ -3964,6 +3968,9 @@ static int32_t msgToPhysiFillNode(STlvDecoder* pDecoder, void* pObj) {
39643968
case PHY_FILL_CODE_SURROUNDING_TIME:
39653969
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSurroundingTime);
39663970
break;
3971+
case PHY_FILL_CODE_INDEF_ROWS_MODE:
3972+
code = tlvDecodeBool(pTlv, &pNode->indefRowsMode);
3973+
break;
39673974
default:
39683975
break;
39693976
}

source/libs/parser/src/parTranslater.c

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3645,9 +3645,16 @@ static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNod
36453645
pFunc->functionName);
36463646
}
36473647
if (hasFillClause(pCxt->pCurrStmt)) {
3648-
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
3649-
"FILL is not supported with indefinite rows function '%s' in window query",
3650-
pFunc->functionName);
3648+
SSelectStmt* pSel = (SSelectStmt*)pCxt->pCurrStmt;
3649+
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSel->pWindow;
3650+
SFillNode* pFillNode = (SFillNode*)pInterval->pFill;
3651+
if (pFillNode->mode != FILL_MODE_NONE &&
3652+
pFillNode->mode != FILL_MODE_NULL &&
3653+
pFillNode->mode != FILL_MODE_VALUE) {
3654+
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC,
3655+
"Only FILL(NONE/NULL/VALUE) is supported with indefinite rows function '%s'",
3656+
pFunc->functionName);
3657+
}
36513658
}
36523659
if (hasInvalidFuncNesting(pFunc)) {
36533660
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AGG_FUNC_NESTING);
@@ -7844,7 +7851,8 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
78447851
}
78457852

78467853
static EDealRes needFillImpl(SNode* pNode, void* pContext) {
7847-
if ((isAggFunc(pNode) || isInterpFunc(pNode)) && FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pNode)->funcType &&
7854+
if ((isAggFunc(pNode) || isInterpFunc(pNode) || isIndefiniteRowsFunc(pNode)) &&
7855+
FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pNode)->funcType &&
78487856
FUNCTION_TYPE_GROUP_CONST_VALUE != ((SFunctionNode*)pNode)->funcType) {
78497857
*(bool*)pContext = true;
78507858
return DEAL_RES_END;

source/libs/planner/src/planLogicCreater.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2983,6 +2983,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
29832983
return TSDB_CODE_SUCCESS;
29842984
}
29852985

2986+
bool isIndefRows = (NULL != pCxt->pCurrRoot &&
2987+
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pCxt->pCurrRoot) &&
2988+
((SWindowLogicNode*)pCxt->pCurrRoot)->indefRowsFunc);
2989+
29862990
SFillLogicNode* pFill = NULL;
29872991
int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FILL, (SNode**)&pFill);
29882992
if (NULL == pFill) {
@@ -3012,6 +3016,7 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
30123016
}
30133017

30143018
pFill->mode = pFillNode->mode;
3019+
pFill->indefRowsMode = isIndefRows;
30153020
pFill->timeRange = pFillNode->timeRange;
30163021
TSWAP(pFill->pTimeRange, pFillNode->pTimeRange);
30173022
pFill->pValues = NULL;

source/libs/planner/src/planPhysiCreater.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3194,6 +3194,7 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
31943194
}
31953195

31963196
pFill->mode = pFillNode->mode;
3197+
pFill->indefRowsMode = pFillNode->indefRowsMode;
31973198
pFill->timeRange = pFillNode->timeRange;
31983199
TSWAP(pFill->pTimeRange, pFillNode->pTimeRange);
31993200
pFill->node.inputTsOrder = pFillNode->node.inputTsOrder;

0 commit comments

Comments
 (0)