Skip to content

Commit 1c00d98

Browse files
committed
scaffold the replication package
1 parent a8533e0 commit 1c00d98

10 files changed

Lines changed: 1070 additions & 7 deletions

File tree

internal-packages/replication/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
"types": "./dist/src/index.d.ts",
77
"type": "module",
88
"dependencies": {
9+
"@internal/redis": "workspace:*",
910
"@internal/tracing": "workspace:*",
1011
"@trigger.dev/core": "workspace:*",
11-
"pg": "8.15.6"
12+
"pg": "8.15.6",
13+
"redlock": "5.0.0-beta.2"
1214
},
1315
"devDependencies": {
1416
"@internal/testcontainers": "workspace:*",
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { postgresAndRedisTest } from "@internal/testcontainers";
2+
import { LogicalReplicationClient } from "./client.js";
3+
import { setTimeout } from "timers/promises";
4+
5+
describe("Replication Client", () => {
6+
postgresAndRedisTest(
7+
"should be able to subscribe to changes on a table",
8+
async ({ postgresContainer, prisma, redisOptions }) => {
9+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
10+
11+
const client = new LogicalReplicationClient({
12+
name: "test",
13+
slotName: "test_slot",
14+
publicationName: "test_publication",
15+
redisOptions,
16+
table: "TaskRun",
17+
pgConfig: {
18+
connectionString: postgresContainer.getConnectionUri(),
19+
},
20+
});
21+
22+
const logs: Array<{
23+
lsn: string;
24+
log: unknown;
25+
}> = [];
26+
27+
client.events.on("data", (data) => {
28+
console.log(data);
29+
logs.push(data);
30+
});
31+
32+
client.events.on("error", (error) => {
33+
console.error(error);
34+
});
35+
36+
await client.subscribe();
37+
38+
const organization = await prisma.organization.create({
39+
data: {
40+
title: "test",
41+
slug: "test",
42+
},
43+
});
44+
45+
const project = await prisma.project.create({
46+
data: {
47+
name: "test",
48+
slug: "test",
49+
organizationId: organization.id,
50+
externalRef: "test",
51+
},
52+
});
53+
54+
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
55+
data: {
56+
slug: "test",
57+
type: "DEVELOPMENT",
58+
projectId: project.id,
59+
organizationId: organization.id,
60+
apiKey: "test",
61+
pkApiKey: "test",
62+
shortcode: "test",
63+
},
64+
});
65+
66+
// Now we insert a row into the table
67+
await prisma.taskRun.create({
68+
data: {
69+
friendlyId: "run_1234",
70+
taskIdentifier: "my-task",
71+
payload: JSON.stringify({ foo: "bar" }),
72+
traceId: "1234",
73+
spanId: "1234",
74+
queue: "test",
75+
runtimeEnvironmentId: runtimeEnvironment.id,
76+
projectId: project.id,
77+
},
78+
});
79+
80+
// Wait for a bit of time
81+
await setTimeout(50);
82+
83+
// Now we should see the row in the logs
84+
expect(logs.length).toBeGreaterThan(0);
85+
86+
await client.stop();
87+
}
88+
);
89+
});

0 commit comments

Comments
 (0)