@@ -3618,6 +3618,8 @@ static int32_t stRealtimeContextInit(SSTriggerRealtimeContext *pContext, SStream
36183618
36193619 pContext->pSlices = tSimpleHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
36203620 QUERY_CHECK_NULL(pContext->pSlices, code, lino, _end, terrno);
3621+ code = taosObjListInit(&pContext->dumpTableUids, &pContext->tableUidPool);
3622+ QUERY_CHECK_CODE(code, lino, _end);
36213623 if (pTask->isVirtualTable) {
36223624 code = stRealtimeContextInitPatchContext(pContext);
36233625 QUERY_CHECK_CODE(code, lino, _end);
@@ -3810,6 +3812,7 @@ static void stRealtimeContextDestroy(void *ptr) {
38103812 tSimpleHashCleanup(pContext->pSlices);
38113813 pContext->pSlices = NULL;
38123814 }
3815+ taosObjListClear(&pContext->dumpTableUids);
38133816 stRealtimeContextDestroyPatchContext(pContext);
38143817
38153818 if (pContext->pSorter != NULL) {
@@ -4340,7 +4343,7 @@ static int32_t stRealtimeContextSendPullReq(SSTriggerRealtimeContext *pContext,
43404343
43414344// send STRIGGER_PULL_GROUP_COL_VALUE for given (pProgress, gid); used when pulling groupInfo for pending create-table
43424345static int32_t stRealtimeContextSendPullReqForGid(SSTriggerRealtimeContext *pContext, SSTriggerWalProgress *pProgress,
4343- int64_t gid) {
4346+ int64_t gid) {
43444347 SStreamTriggerTask *pTask = pContext->pTask;
43454348 SSTriggerPullRequest *pReq = &pProgress->pullReq.base;
43464349 SStreamTaskAddr *pReader = pProgress->pTaskAddr;
@@ -5211,6 +5214,31 @@ static int32_t stRealtimeContextCheckIdleGroup(SSTriggerRealtimeContext *pContex
52115214 return code;
52125215}
52135216
5217+ static int32_t stRealtimeContextCopyTableUids(SSTriggerRealtimeContext *pContext, SObjList *pSrc, SObjList *pDst) {
5218+ int32_t code = TSDB_CODE_SUCCESS;
5219+ int32_t lino = 0;
5220+ int64_t *id = NULL;
5221+ SObjListIter iter = {0};
5222+ SStreamTriggerTask *pTask = pContext->pTask;
5223+
5224+ if (pSrc == NULL || pDst == NULL || pSrc->neles == 0) {
5225+ return TSDB_CODE_SUCCESS;
5226+ }
5227+
5228+ taosObjListClear(pDst);
5229+ taosObjListInitIter(pSrc, &iter, TOBJLIST_ITER_FORWARD);
5230+ while ((id = taosObjListIterNext(&iter)) != NULL) {
5231+ code = taosObjListAppend(pDst, id);
5232+ QUERY_CHECK_CODE(code, lino, _end);
5233+ }
5234+
5235+ _end:
5236+ if (code != TSDB_CODE_SUCCESS) {
5237+ ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
5238+ }
5239+ return code;
5240+ }
5241+
52145242static int32_t stRealtimeContextCheck(SSTriggerRealtimeContext *pContext) {
52155243 int32_t code = TSDB_CODE_SUCCESS;
52165244 int32_t lino = 0;
@@ -5371,20 +5399,20 @@ static int32_t stRealtimeContextCheck(SSTriggerRealtimeContext *pContext) {
53715399 }
53725400 }
53735401
5374- // Check for idle groups
5375- code = stRealtimeContextCheckIdleGroup(pContext);
5376- QUERY_CHECK_CODE(code, lino, _end);
5377-
5378- while (TD_DLIST_NELES(&pContext->groupsToCheck) > 0) {
5402+ _check:
5403+ while (TD_DLIST_NELES(&pContext->groupsToCheck) > 0 && pContext->status != STRIGGER_CONTEXT_ACQUIRE_REQUEST &&
5404+ pContext->status != STRIGGER_CONTEXT_SEND_CALC_REQ) {
53795405 SSTriggerRealtimeGroup *pGroup = TD_DLIST_HEAD(&pContext->groupsToCheck);
53805406 switch (pContext->status) {
53815407 case STRIGGER_CONTEXT_FETCH_META: {
5382- pContext->status = STRIGGER_CONTEXT_ACQUIRE_REQUEST;
5383- }
5384- case STRIGGER_CONTEXT_ACQUIRE_REQUEST: {
53855408 pContext->status = STRIGGER_CONTEXT_CHECK_CONDITION;
53865409 }
53875410 case STRIGGER_CONTEXT_CHECK_CONDITION: {
5411+ if (pTask->triggerType == STREAM_TRIGGER_SLIDING) {
5412+ // only sliding trigger may need to check again
5413+ code = stRealtimeContextCopyTableUids(pContext, &pGroup->tableUids, &pContext->dumpTableUids);
5414+ QUERY_CHECK_CODE(code, lino, _end);
5415+ }
53885416 code = stRealtimeGroupCheck(pGroup);
53895417 QUERY_CHECK_CODE(code, lino, _end);
53905418 if (pContext->needPseudoCols) {
@@ -5409,18 +5437,29 @@ static int32_t stRealtimeContextCheck(SSTriggerRealtimeContext *pContext) {
54095437 QUERY_CHECK_CODE(code, lino, _end);
54105438 }
54115439 }
5412- if (!pContext->needCheckAgain) {
5413- stRealtimeGroupClearMetadatas(pGroup);
5414- TD_DLIST_POP(&pContext->groupsToCheck, pGroup);
5415- code = stTriggerTaskReadyRecalcRequest(pTask, pGroup);
5440+ if (pContext->needCheckAgain && pTask->triggerType == STREAM_TRIGGER_SLIDING) {
5441+ code = stRealtimeContextCopyTableUids(pContext, &pContext->dumpTableUids, &pGroup->tableUids);
54165442 QUERY_CHECK_CODE(code, lino, _end);
5443+ } else {
5444+ TD_DLIST_POP(&pContext->groupsToCheck, pGroup);
5445+ stRealtimeGroupClearMetadatas(pGroup);
5446+ }
5447+ pContext->needCheckAgain = false;
5448+ taosObjListClear(&pContext->dumpTableUids);
5449+ code = stTriggerTaskReadyRecalcRequest(pTask, pGroup);
5450+ QUERY_CHECK_CODE(code, lino, _end);
5451+ pContext->status = STRIGGER_CONTEXT_FETCH_META;
5452+ if (pGroup->pPendingCalcParams.neles >= STREAM_CALC_REQ_MAX_WIN_NUM ||
5453+ pContext->calcParamPool.size >= STREAM_TRIGGER_MAX_PENDING_PARAMS) {
5454+ break;
54175455 }
5418- pContext->status = STRIGGER_CONTEXT_ACQUIRE_REQUEST;
54195456 }
54205457
54215458 if (pContext->pMinGroup == NULL && pContext->pMaxDelayHeap->min != NULL) {
54225459 pContext->pMinGroup = container_of(pContext->pMaxDelayHeap->min, SSTriggerRealtimeGroup, heapNode);
5423- if (pContext->pMinGroup->nextExecTime > now) {
5460+ if (pContext->pMinGroup->nextExecTime > now &&
5461+ pContext->pMinGroup->pPendingCalcParams.neles < STREAM_CALC_REQ_MAX_WIN_NUM &&
5462+ pContext->calcParamPool.size < STREAM_TRIGGER_MAX_PENDING_PARAMS) {
54245463 pContext->pMinGroup = NULL;
54255464 }
54265465 }
@@ -5465,7 +5504,7 @@ static int32_t stRealtimeContextCheck(SSTriggerRealtimeContext *pContext) {
54655504 // calc req has not been set
54665505 goto _end;
54675506 }
5468- if (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS) {
5507+ if (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS && !IS_TRIGGER_GROUP_TO_CHECK(pGroup) ) {
54695508 stRealtimeGroupClearMetadatas(pGroup);
54705509 }
54715510 stRealtimeGroupClearTempState(pGroup);
@@ -5490,7 +5529,9 @@ static int32_t stRealtimeContextCheck(SSTriggerRealtimeContext *pContext) {
54905529 pContext->status = STRIGGER_CONTEXT_ACQUIRE_REQUEST;
54915530 if (pContext->pMaxDelayHeap->min != NULL) {
54925531 pContext->pMinGroup = container_of(pContext->pMaxDelayHeap->min, SSTriggerRealtimeGroup, heapNode);
5493- if (pContext->pMinGroup->nextExecTime > now) {
5532+ if (pContext->pMinGroup->nextExecTime > now &&
5533+ pContext->pMinGroup->pPendingCalcParams.neles < STREAM_CALC_REQ_MAX_WIN_NUM &&
5534+ pContext->calcParamPool.size < STREAM_TRIGGER_MAX_PENDING_PARAMS) {
54945535 pContext->pMinGroup = NULL;
54955536 }
54965537 } else {
@@ -5507,6 +5548,11 @@ static int32_t stRealtimeContextCheck(SSTriggerRealtimeContext *pContext) {
55075548 }
55085549 }
55095550
5551+ if (TD_DLIST_NELES(&pContext->groupsToCheck) > 0) {
5552+ pContext->status = STRIGGER_CONTEXT_CHECK_CONDITION;
5553+ goto _check;
5554+ }
5555+
55105556 int32_t deleteGroupNum = taosArrayGetSize(pContext->groupsToDelete);
55115557 if (deleteGroupNum > 0) {
55125558 pContext->status = STRIGGER_CONTEXT_SEND_DROP_REQ;
@@ -6121,6 +6167,9 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext,
61216167 if (latestVersionTime != INT64_MAX) {
61226168 atomic_store_64(&pTask->latestVersionTime, latestVersionTime);
61236169 }
6170+ // Check for idle groups
6171+ code = stRealtimeContextCheckIdleGroup(pContext);
6172+ QUERY_CHECK_CODE(code, lino, _end);
61246173
61256174 if (pContext->recovering && recoveryDone) {
61266175 ST_TASK_DLOG("stop fetch wal metas since recovery is done, pool size: %" PRId64, pContext->metaPool.size);
@@ -6384,6 +6433,9 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext,
63846433 if (latestVersionTime != INT64_MAX) {
63856434 atomic_store_64(&pTask->latestVersionTime, latestVersionTime);
63866435 }
6436+ // Check for idle groups
6437+ code = stRealtimeContextCheckIdleGroup(pContext);
6438+ QUERY_CHECK_CODE(code, lino, _end);
63876439 }
63886440
63896441 pContext->catchUp = (TD_DLIST_NELES(&pContext->groupsToCheck) == 0);
@@ -9001,7 +9053,6 @@ static void stRealtimeGroupClearTempState(SSTriggerRealtimeGroup *pGroup) {
90019053
90029054 pContext->needPseudoCols = false;
90039055 pContext->needMergeWindow = false;
9004- pContext->needCheckAgain = false;
90059056 if (pContext->pSorter != NULL) {
90069057 stNewTimestampSorterReset(pContext->pSorter);
90079058 }
@@ -9100,7 +9151,7 @@ static int32_t stRealtimeGroupAddMeta(SSTriggerRealtimeGroup *pGroup, int32_t vg
91009151 SObjList *pMetas = NULL;
91019152
91029153 // Update idle trigger timestamps when receiving data
9103- if (pTask->idleTimeoutMs > 0) {
9154+ if (pTask->idleTimeoutMs > 0 && !pContext->recovering ) {
91049155 int64_t prevRecvTimeMono = pGroup->lastRecvTimeMono;
91059156 int64_t prevRecvTimeWall = pGroup->lastRecvTimeWall;
91069157 pGroup->lastRecvTimeMono = taosGetMonoTimestampMs();
@@ -9425,10 +9476,9 @@ static int32_t stRealtimeGroupDoSlidingCheck(SSTriggerRealtimeGroup *pGroup) {
94259476 while (newWin.range.skey <= pGroup->newThreshold) {
94269477 void *px = taosArrayPush(pContext->pWindows, &newWin);
94279478 QUERY_CHECK_NULL(px, code, lino, _end, terrno);
9428- if (pContext->walMode == STRIGGER_WAL_META_ONLY &&
9429- TARRAY_SIZE(pContext->pWindows) >= STREAM_CALC_REQ_MAX_WIN_NUM) {
9479+ if (TARRAY_SIZE(pContext->pWindows) >= STREAM_CALC_REQ_MAX_WIN_NUM) {
94309480 pContext->needCheckAgain = true;
9431- goto _end ;
9481+ break ;
94329482 }
94339483 stTriggerTaskNextTimeWindow(pTask, &newWin.range);
94349484 }
0 commit comments