Skip to content

Commit c8b3ba9

Browse files
committed
switch to placement tags
1 parent 8de7ef5 commit c8b3ba9

6 files changed

Lines changed: 61 additions & 43 deletions

File tree

apps/supervisor/src/env.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,9 @@ const Env = z.object({
7777
KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
7878
KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),
7979

80-
// Tier scheduling settings
81-
ENABLE_TIER_SCHEDULING: BoolEnv.default(false),
82-
TIER_LABEL_KEY: z.string().default("node.cluster.x-k8s.io/paid"),
83-
TIER_LABEL_VALUE_FREE: z.string().default("false"),
84-
TIER_LABEL_VALUE_PAID: z.string().default("true"),
80+
// Placement tags settings
81+
PLACEMENT_TAGS_ENABLED: BoolEnv.default(false),
82+
PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"),
8583

8684
// Metrics
8785
METRICS_ENABLED: BoolEnv.default(true),

apps/supervisor/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ class ManagedSupervisor {
247247
nextAttemptNumber: message.run.attemptNumber,
248248
snapshotId: message.snapshot.id,
249249
snapshotFriendlyId: message.snapshot.friendlyId,
250-
isPaidTier: message.billing?.currentPlan.isPaying ?? false,
250+
placementTags: message.placementTags,
251251
});
252252

253253
// Disabled for now

apps/supervisor/src/workloadManager/kubernetes.ts

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
type WorkloadManagerCreateOptions,
55
type WorkloadManagerOptions,
66
} from "./types.js";
7-
import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
7+
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
88
import { env } from "../env.js";
99
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
1010
import { getRunnerId } from "../util.js";
@@ -13,22 +13,20 @@ type ResourceQuantities = {
1313
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
1414
};
1515

16-
interface TierConfig {
16+
interface PlacementConfig {
1717
enabled: boolean;
18-
labelKey: string;
19-
freeValue: string;
20-
paidValue: string;
18+
prefix: string;
2119
}
2220

2321
export class KubernetesWorkloadManager implements WorkloadManager {
2422
private readonly logger = new SimpleStructuredLogger("kubernetes-workload-provider");
2523
private k8s: K8sApi;
2624
private namespace = env.KUBERNETES_NAMESPACE;
27-
private tierConfig: TierConfig;
25+
private placementConfig: PlacementConfig;
2826

2927
constructor(private opts: WorkloadManagerOptions) {
3028
this.k8s = createK8sApi();
31-
this.tierConfig = this.tierSchedulingConfig;
29+
this.placementConfig = this.placementTagsConfig;
3230

3331
if (opts.workloadApiDomain) {
3432
this.logger.warn("[KubernetesWorkloadManager] ⚠️ Custom workload API domain", {
@@ -37,34 +35,56 @@ export class KubernetesWorkloadManager implements WorkloadManager {
3735
}
3836
}
3937

40-
private get tierSchedulingConfig(): TierConfig {
38+
private get placementTagsConfig(): PlacementConfig {
4139
return {
42-
enabled: env.ENABLE_TIER_SCHEDULING,
43-
labelKey: env.TIER_LABEL_KEY,
44-
freeValue: env.TIER_LABEL_VALUE_FREE,
45-
paidValue: env.TIER_LABEL_VALUE_PAID,
40+
enabled: env.PLACEMENT_TAGS_ENABLED,
41+
prefix: env.PLACEMENT_TAGS_PREFIX,
4642
};
4743
}
4844

49-
private addTierScheduling(
45+
private addPlacementTags(
5046
podSpec: Omit<k8s.V1PodSpec, "containers">,
51-
isPaidTier: boolean
47+
placementTags?: PlacementTag[]
5248
): Omit<k8s.V1PodSpec, "containers"> {
53-
if (!this.tierConfig.enabled) {
49+
if (!this.placementConfig.enabled || !placementTags || placementTags.length === 0) {
5450
return podSpec;
5551
}
5652

57-
const labelValue = isPaidTier ? this.tierConfig.paidValue : this.tierConfig.freeValue;
53+
const nodeSelector: Record<string, string> = { ...podSpec.nodeSelector };
54+
55+
// Convert placement tags to nodeSelector labels
56+
for (const tag of placementTags) {
57+
const labelKey = `${this.placementConfig.prefix}/${tag.key}`;
58+
59+
// Print warnings (if any)
60+
this.printTagWarnings(tag);
61+
62+
// For now we only support single values via nodeSelector
63+
nodeSelector[labelKey] = tag.values?.[0] ?? "";
64+
}
5865

5966
return {
6067
...podSpec,
61-
nodeSelector: {
62-
...podSpec.nodeSelector,
63-
[this.tierConfig.labelKey]: labelValue,
64-
},
68+
nodeSelector,
6569
};
6670
}
6771

72+
private printTagWarnings(tag: PlacementTag) {
73+
if (!tag.values || tag.values.length === 0) {
74+
// No values provided
75+
this.logger.warn(
76+
"[KubernetesWorkloadManager] Placement tag has no values, using empty string",
77+
tag
78+
);
79+
} else if (tag.values.length > 1) {
80+
// Multiple values provided
81+
this.logger.warn(
82+
"[KubernetesWorkloadManager] Placement tag has multiple values, only using first one",
83+
tag
84+
);
85+
}
86+
}
87+
6888
async create(opts: WorkloadManagerCreateOptions) {
6989
this.logger.log("[KubernetesWorkloadManager] Creating container", { opts });
7090

@@ -85,7 +105,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
85105
},
86106
},
87107
spec: {
88-
...this.addTierScheduling(this.#defaultPodSpec, opts.isPaidTier ?? false),
108+
...this.addPlacementTags(this.#defaultPodSpec, opts.placementTags),
89109
terminationGracePeriodSeconds: 60 * 60,
90110
containers: [
91111
{

apps/supervisor/src/workloadManager/types.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type EnvironmentType, type MachinePreset } from "@trigger.dev/core/v3";
1+
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
22

33
export interface WorkloadManagerOptions {
44
workloadApiProtocol: "http" | "https";
@@ -23,6 +23,7 @@ export interface WorkloadManagerCreateOptions {
2323
version: string;
2424
nextAttemptNumber?: number;
2525
dequeuedAt: Date;
26+
placementTags?: PlacementTag[];
2627
// identifiers
2728
envId: string;
2829
envType: EnvironmentType;
@@ -32,6 +33,4 @@ export interface WorkloadManagerCreateOptions {
3233
runFriendlyId: string;
3334
snapshotId: string;
3435
snapshotFriendlyId: string;
35-
// tier scheduling
36-
isPaidTier?: boolean;
3736
}

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { BillingCache } from "../billingCache.js";
22
import { startSpan } from "@internal/tracing";
33
import { assertExhaustive } from "@trigger.dev/core";
4-
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
4+
import { DequeuedMessage, RetryOptions, placementTag } from "@trigger.dev/core/v3";
55
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
66
import { PrismaClientOrTransaction } from "@trigger.dev/database";
77
import { getRunWithBackgroundWorkerTasks } from "../db/worker.js";
@@ -474,11 +474,7 @@ export class DequeueSystem {
474474
project: {
475475
id: lockedTaskRun.projectId,
476476
},
477-
billing: {
478-
currentPlan: {
479-
isPaying,
480-
},
481-
},
477+
placementTags: [placementTag("paid", isPaying ? "true" : "false")],
482478
} satisfies DequeuedMessage;
483479
}
484480
);

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,17 @@ export const DequeueMessageCheckpoint = z.object({
224224
});
225225
export type DequeueMessageCheckpoint = z.infer<typeof DequeueMessageCheckpoint>;
226226

227+
export const PlacementTag = z.object({
228+
key: z.string(),
229+
values: z.array(z.string()).optional(),
230+
});
231+
export type PlacementTag = z.infer<typeof PlacementTag>;
232+
233+
/** Helper functions for placement tags. Accepts a single value or an array of values. */
234+
export function placementTag(key: string, valueOrValues: string | string[]): PlacementTag {
235+
return { key, values: Array.isArray(valueOrValues) ? valueOrValues : [valueOrValues] };
236+
}
237+
227238
/** This is sent to a Worker when a run is dequeued (a new run or continuing run) */
228239
export const DequeuedMessage = z.object({
229240
version: z.literal("1"),
@@ -261,12 +272,6 @@ export const DequeuedMessage = z.object({
261272
project: z.object({
262273
id: z.string(),
263274
}),
264-
billing: z
265-
.object({
266-
currentPlan: z.object({
267-
isPaying: z.boolean(),
268-
}),
269-
})
270-
.optional(),
275+
placementTags: z.array(PlacementTag).optional(),
271276
});
272277
export type DequeuedMessage = z.infer<typeof DequeuedMessage>;

0 commit comments

Comments
 (0)