|
| 1 | +import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3"; |
| 2 | +import { env } from "~/env.server"; |
| 3 | +import { startActiveSpan } from "~/v3/tracer.server"; |
| 4 | +import { uploadPacketToObjectStore, r2 } from "~/v3/r2.server"; |
| 5 | +import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; |
| 6 | +import { logger } from "~/services/logger.server"; |
| 7 | + |
| 8 | +export type BatchPayloadProcessResult = { |
| 9 | + /** The processed payload - either the original or an R2 path */ |
| 10 | + payload: unknown; |
| 11 | + /** The payload type - "application/store" if offloaded to R2 */ |
| 12 | + payloadType: string; |
| 13 | + /** Whether the payload was offloaded to R2 */ |
| 14 | + wasOffloaded: boolean; |
| 15 | + /** Size of the payload in bytes */ |
| 16 | + size: number; |
| 17 | +}; |
| 18 | + |
| 19 | +/** |
| 20 | + * BatchPayloadProcessor handles payload offloading for batch items. |
| 21 | + * |
| 22 | + * When a batch item's payload exceeds the configured threshold, it's uploaded |
| 23 | + * to object storage (R2) and the payload is replaced with the storage path. |
| 24 | + * This aligns with how single task triggers work via DefaultPayloadProcessor. |
| 25 | + * |
| 26 | + * Path format: batch_{batchId}/item_{index}/payload.json |
| 27 | + */ |
| 28 | +export class BatchPayloadProcessor { |
| 29 | + /** |
| 30 | + * Check if object storage is available for payload offloading. |
| 31 | + * If not available, large payloads will be stored inline (which may fail for very large payloads). |
| 32 | + */ |
| 33 | + isObjectStoreAvailable(): boolean { |
| 34 | + return r2 !== undefined && env.OBJECT_STORE_BASE_URL !== undefined; |
| 35 | + } |
| 36 | + |
| 37 | + /** |
| 38 | + * Process a batch item payload, offloading to R2 if it exceeds the threshold. |
| 39 | + * |
| 40 | + * @param payload - The raw payload from the batch item |
| 41 | + * @param payloadType - The payload type (e.g., "application/json") |
| 42 | + * @param batchId - The batch ID (internal format) |
| 43 | + * @param itemIndex - The item index within the batch |
| 44 | + * @param environment - The authenticated environment for R2 path construction |
| 45 | + * @returns The processed result with potentially offloaded payload |
| 46 | + */ |
| 47 | + async process( |
| 48 | + payload: unknown, |
| 49 | + payloadType: string, |
| 50 | + batchId: string, |
| 51 | + itemIndex: number, |
| 52 | + environment: AuthenticatedEnvironment |
| 53 | + ): Promise<BatchPayloadProcessResult> { |
| 54 | + return startActiveSpan("BatchPayloadProcessor.process()", async (span) => { |
| 55 | + span.setAttribute("batchId", batchId); |
| 56 | + span.setAttribute("itemIndex", itemIndex); |
| 57 | + span.setAttribute("payloadType", payloadType); |
| 58 | + |
| 59 | + // Create the packet for size checking |
| 60 | + const packet = this.#createPayloadPacket(payload, payloadType); |
| 61 | + |
| 62 | + if (!packet.data) { |
| 63 | + return { |
| 64 | + payload, |
| 65 | + payloadType, |
| 66 | + wasOffloaded: false, |
| 67 | + size: 0, |
| 68 | + }; |
| 69 | + } |
| 70 | + |
| 71 | + const threshold = env.BATCH_PAYLOAD_OFFLOAD_THRESHOLD ?? env.TASK_PAYLOAD_OFFLOAD_THRESHOLD; |
| 72 | + const { needsOffloading, size } = packetRequiresOffloading(packet, threshold); |
| 73 | + |
| 74 | + span.setAttribute("payloadSize", size); |
| 75 | + span.setAttribute("needsOffloading", needsOffloading); |
| 76 | + span.setAttribute("threshold", threshold); |
| 77 | + |
| 78 | + if (!needsOffloading) { |
| 79 | + return { |
| 80 | + payload, |
| 81 | + payloadType, |
| 82 | + wasOffloaded: false, |
| 83 | + size, |
| 84 | + }; |
| 85 | + } |
| 86 | + |
| 87 | + // Check if object store is available |
| 88 | + if (!this.isObjectStoreAvailable()) { |
| 89 | + logger.warn("Payload exceeds threshold but object store is not available", { |
| 90 | + batchId, |
| 91 | + itemIndex, |
| 92 | + size, |
| 93 | + threshold, |
| 94 | + }); |
| 95 | + |
| 96 | + // Return without offloading - the payload will be stored inline |
| 97 | + // This may fail downstream for very large payloads |
| 98 | + return { |
| 99 | + payload, |
| 100 | + payloadType, |
| 101 | + wasOffloaded: false, |
| 102 | + size, |
| 103 | + }; |
| 104 | + } |
| 105 | + |
| 106 | + // Upload to R2 |
| 107 | + const filename = `batch_${batchId}/item_${itemIndex}/payload.json`; |
| 108 | + |
| 109 | + const [uploadError] = await tryCatch( |
| 110 | + uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment) |
| 111 | + ); |
| 112 | + |
| 113 | + if (uploadError) { |
| 114 | + logger.error("Failed to upload batch item payload to object store", { |
| 115 | + batchId, |
| 116 | + itemIndex, |
| 117 | + error: uploadError instanceof Error ? uploadError.message : String(uploadError), |
| 118 | + }); |
| 119 | + |
| 120 | + // Throw to fail this item - SDK can retry |
| 121 | + throw new Error( |
| 122 | + `Failed to upload large payload to object store: ${uploadError instanceof Error ? uploadError.message : String(uploadError)}` |
| 123 | + ); |
| 124 | + } |
| 125 | + |
| 126 | + logger.debug("Batch item payload offloaded to R2", { |
| 127 | + batchId, |
| 128 | + itemIndex, |
| 129 | + filename, |
| 130 | + size, |
| 131 | + }); |
| 132 | + |
| 133 | + span.setAttribute("wasOffloaded", true); |
| 134 | + span.setAttribute("offloadPath", filename); |
| 135 | + |
| 136 | + return { |
| 137 | + payload: filename, |
| 138 | + payloadType: "application/store", |
| 139 | + wasOffloaded: true, |
| 140 | + size, |
| 141 | + }; |
| 142 | + }); |
| 143 | + } |
| 144 | + |
| 145 | + /** |
| 146 | + * Create an IOPacket from payload for size checking. |
| 147 | + */ |
| 148 | + #createPayloadPacket(payload: unknown, payloadType: string): IOPacket { |
| 149 | + if (payloadType === "application/json") { |
| 150 | + return { data: JSON.stringify(payload), dataType: "application/json" }; |
| 151 | + } |
| 152 | + |
| 153 | + if (typeof payload === "string") { |
| 154 | + return { data: payload, dataType: payloadType }; |
| 155 | + } |
| 156 | + |
| 157 | + // For other types, try to stringify |
| 158 | + try { |
| 159 | + return { data: JSON.stringify(payload), dataType: payloadType }; |
| 160 | + } catch { |
| 161 | + return { dataType: payloadType }; |
| 162 | + } |
| 163 | + } |
| 164 | +} |
| 165 | + |
0 commit comments