Skip to content

Commit f24423e

Browse files
committed
Add payload schema handling for task indexing
This change introduces support for handling payload schemas during task indexing. By incorporating the `payloadSchema` attribute into various components, we ensure that each task's payload structure is clearly defined and can be validated before processing. - Updated the TaskManifest and task metadata structures to include an optional `payloadSchema` attribute. This addition allows for more robust validation and handling of task payloads. - Enhanced several core modules to export and utilize the new `getSchemaToJsonSchema` function, providing easier conversion of schema types to JSON schemas. - Modified the database schema to store the `payloadSchema` attribute, ensuring that the payload schema information is persisted. - The change helps in maintaining consistency in data handling and improves the integrity of task data across the application.
1 parent 1f44340 commit f24423e

8 files changed

Lines changed: 47 additions & 0 deletions

File tree

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ async function createWorkerTask(
280280
fileId: tasksToBackgroundFiles?.get(task.id) ?? null,
281281
maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null,
282282
queueId: queue.id,
283+
payloadSchema: task.payloadSchema,
283284
},
284285
});
285286
} catch (error) {

internal-packages/database/prisma/schema.prisma

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,8 @@ model BackgroundWorkerTask {
510510
511511
triggerSource TaskTriggerSource @default(STANDARD)
512512
513+
payloadSchema Json?
514+
513515
@@unique([workerId, slug])
514516
// Quick lookup of task identifiers
515517
@@index([projectId, slug])

packages/core/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export * from "./utils/interval.js";
7373
export * from "./config.js";
7474
export {
7575
getSchemaParseFn,
76+
getSchemaToJsonSchema,
7677
type AnySchemaParseFn,
7778
type SchemaParseFn,
7879
isSchemaZodEsque,

packages/core/src/v3/schemas/resources.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export const TaskResource = z.object({
1313
triggerSource: z.string().optional(),
1414
schedule: ScheduleMetadata.optional(),
1515
maxDuration: z.number().optional(),
16+
payloadSchema: z.any().optional(),
1617
});
1718

1819
export type TaskResource = z.infer<typeof TaskResource>;

packages/core/src/v3/schemas/schemas.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ const taskMetadata = {
189189
triggerSource: z.string().optional(),
190190
schedule: ScheduleMetadata.optional(),
191191
maxDuration: z.number().optional(),
192+
payloadSchema: z.any().optional(),
192193
};
193194

194195
export const TaskMetadata = z.object(taskMetadata);

packages/core/src/v3/types/schemas.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,36 @@ export function getSchemaParseFn<TType>(procedureParser: Schema): SchemaParseFn<
146146

147147
throw new Error("Could not find a validator fn");
148148
}
149+
150+
export function getSchemaToJsonSchema(schema: Schema): any | undefined {
151+
const parser = schema as any;
152+
153+
// Check if schema has a built-in toJsonSchema method (e.g., ArkType)
154+
if (typeof parser.toJsonSchema === "function") {
155+
return parser.toJsonSchema();
156+
}
157+
158+
// Check if it's a Zod schema
159+
if (typeof parser.parseAsync === "function" || typeof parser.parse === "function") {
160+
// Zod schema detected, but we need zod-to-json-schema library to convert
161+
// Return undefined for now, will be handled by the caller
162+
return undefined;
163+
}
164+
165+
// Check if it's a Yup schema
166+
if (typeof parser.validateSync === "function") {
167+
// Yup schema detected, but we need a yup-to-json-schema library to convert
168+
// Return undefined for now, will be handled by the caller
169+
return undefined;
170+
}
171+
172+
// Check if it's an Effect schema
173+
if (parser._tag === "Schema" || parser._tag === "SchemaClass") {
174+
// Effect schema detected, but we need Effect's JSONSchema.make to convert
175+
// Return undefined for now, will be handled by the caller
176+
return undefined;
177+
}
178+
179+
// For other schema types, return undefined
180+
return undefined;
181+
}

packages/core/src/v3/types/tasks.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,11 @@ type CommonTaskOptions<
339339
* onFailure is called after a task run has failed (meaning the run function threw an error and won't be retried anymore)
340340
*/
341341
onFailure?: OnFailureHookFunction<TPayload, TInitOutput>;
342+
343+
/**
344+
* JSON Schema for the task payload. This will be synced to the server during indexing.
345+
*/
346+
payloadSchema?: any;
342347
};
343348

344349
export type TaskOptions<

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
flattenIdempotencyKey,
1212
getEnvVar,
1313
getSchemaParseFn,
14+
getSchemaToJsonSchema,
1415
InitOutput,
1516
lifecycleHooks,
1617
makeIdempotencyKey,
@@ -204,6 +205,7 @@ export function createTask<
204205
retry: params.retry ? { ...defaultRetryOptions, ...params.retry } : undefined,
205206
machine: typeof params.machine === "string" ? { preset: params.machine } : params.machine,
206207
maxDuration: params.maxDuration,
208+
payloadSchema: params.payloadSchema,
207209
fns: {
208210
run: params.run,
209211
},
@@ -334,6 +336,7 @@ export function createSchemaTask<
334336
retry: params.retry ? { ...defaultRetryOptions, ...params.retry } : undefined,
335337
machine: typeof params.machine === "string" ? { preset: params.machine } : params.machine,
336338
maxDuration: params.maxDuration,
339+
payloadSchema: params.schema ? getSchemaToJsonSchema(params.schema) : undefined,
337340
fns: {
338341
run: params.run,
339342
parsePayload,

0 commit comments

Comments
 (0)