Skip to content

Commit fa4185b

Browse files
committed
Add ability to drop the replication slot completely on teardown
1 parent f3dc43b commit fa4185b

2 files changed

Lines changed: 106 additions & 0 deletions

File tree

internal-packages/replication/src/client.test.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,99 @@ describe("Replication Client", () => {
8686
await client.stop();
8787
}
8888
);
89+
90+
postgresAndRedisTest(
91+
"should be able to teardown",
92+
async ({ postgresContainer, prisma, redisOptions }) => {
93+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
94+
95+
const client = new LogicalReplicationClient({
96+
name: "test",
97+
slotName: "test_slot",
98+
publicationName: "test_publication",
99+
redisOptions,
100+
table: "TaskRun",
101+
pgConfig: {
102+
connectionString: postgresContainer.getConnectionUri(),
103+
},
104+
});
105+
106+
const logs: Array<{
107+
lsn: string;
108+
log: unknown;
109+
}> = [];
110+
111+
client.events.on("data", (data) => {
112+
console.log(data);
113+
logs.push(data);
114+
});
115+
116+
client.events.on("error", (error) => {
117+
console.error(error);
118+
});
119+
120+
await client.subscribe();
121+
122+
const organization = await prisma.organization.create({
123+
data: {
124+
title: "test",
125+
slug: "test",
126+
},
127+
});
128+
129+
const project = await prisma.project.create({
130+
data: {
131+
name: "test",
132+
slug: "test",
133+
organizationId: organization.id,
134+
externalRef: "test",
135+
},
136+
});
137+
138+
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
139+
data: {
140+
slug: "test",
141+
type: "DEVELOPMENT",
142+
projectId: project.id,
143+
organizationId: organization.id,
144+
apiKey: "test",
145+
pkApiKey: "test",
146+
shortcode: "test",
147+
},
148+
});
149+
150+
// Now we insert a row into the table
151+
await prisma.taskRun.create({
152+
data: {
153+
friendlyId: "run_1234",
154+
taskIdentifier: "my-task",
155+
payload: JSON.stringify({ foo: "bar" }),
156+
traceId: "1234",
157+
spanId: "1234",
158+
queue: "test",
159+
runtimeEnvironmentId: runtimeEnvironment.id,
160+
projectId: project.id,
161+
},
162+
});
163+
164+
// Wait for a bit of time
165+
await setTimeout(50);
166+
167+
// Now we should see the row in the logs
168+
expect(logs.length).toBeGreaterThan(0);
169+
170+
const slotDropped = await client.teardown();
171+
172+
expect(slotDropped).toBe(true);
173+
174+
// Now the replication slot should be gone
175+
const slotExists = await prisma.$queryRaw<
176+
{ exists: boolean }[]
177+
>`SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'test_slot');`;
178+
179+
console.log(slotExists);
180+
181+
expect(slotExists[0].exists).toBe(false);
182+
}
183+
);
89184
});

internal-packages/replication/src/client.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,20 @@ export class LogicalReplicationClient {
190190
return false;
191191
}
192192

193+
this.client = new Client({
194+
...this.options.pgConfig,
195+
// @ts-expect-error
196+
replication: "database",
197+
application_name: this.options.name,
198+
});
199+
await this.client.connect();
200+
193201
// Drop the slot
194202
const slotDropped = await this.#dropSlot();
195203

204+
await this.client.end();
205+
this.client = null;
206+
196207
await this.#releaseLeaderLock();
197208

198209
return slotDropped;

0 commit comments

Comments
 (0)