Skip to content

Commit 4767f86

Browse files
authored
EFF-749 Backport sql-pg dedicated listen connection (#6140)
1 parent 6e3782a commit 4767f86

4 files changed

Lines changed: 76 additions & 2 deletions

File tree

.changeset/bright-pants-thank.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
    1  + ---
    2  + "@effect/sql-pg": patch
    3  + ---
    4  +
    5  + Use a dedicated PostgreSQL connection for LISTEN / UNLISTEN so active listeners do not consume a pooled query connection.

packages/sql-pg/src/PgClient.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,33 @@ const makeClient = (
311  311        })
312  312      const reserve = Effect.map(reserveRaw, (client) => new ConnectionImpl(client))
313  313
     314  +   const onListenClientError = (_: Error) => {
     315  +   }
     316  +
314  317      const listenClient = yield* RcRef.make({
315       -     acquire: reserveRaw
     318  +     acquire: Effect.acquireRelease(
     319  +       Effect.tryPromise({
     320  +         try: async () => {
     321  +           const client = new Pg.Client(pool.options)
     322  +           await client.connect()
     323  +           client.on("error", onListenClientError)
     324  +           return client
     325  +         },
     326  +         catch: (cause) =>
     327  +           new SqlError({
     328  +             cause,
     329  +             message: "Failed to acquire connection for listen"
     330  +           })
     331  +       }),
     332  +       (client) =>
     333  +         Effect.promise(() => {
     334  +           client.off("error", onListenClientError)
     335  +           return client.end()
     336  +         }).pipe(
     337  +           Effect.interruptible,
     338  +           Effect.timeoutOption(1000)
     339  +         )
     340  +     )
316  341      })
317  342
318  343      return Object.assign(

packages/sql-pg/test/Client.test.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { PgClient } from "@effect/sql-pg"
2    2    import * as SqlClient from "@effect/sql/SqlClient"
3    3    import * as Statement from "@effect/sql/Statement"
4    4    import { assert, expect, it } from "@effect/vitest"
5         - import { Effect, Redacted, String } from "effect"
     5    + import { Effect, Fiber, Redacted, String } from "effect"
6    6    import * as Chunk from "effect/Chunk"
7    7    import * as Stream from "effect/Stream"
8    8    import * as TestServices from "effect/TestServices"
@@ -344,3 +344,37 @@ it.layer(PgContainer.ClientFromPoolLive, { timeout: "30 seconds" })("PgClient fr
344  344        expect(sql.config.database).toEqual(parsedConfig.database)
345  345      }))
346  346  })
     347  +
     348  + it.layer(PgContainer.ClientSingleConnectionLive, { timeout: "30 seconds" })("PgClient listen", (it) => {
     349  +   it.scoped("listen does not reserve a pool connection", () =>
     350  +     Effect.gen(function*() {
     351  +       const sql = yield* PgClient.PgClient
     352  +       const channel = "pool_connection_listen"
     353  +
     354  +       const listenFiber = yield* sql.listen(channel).pipe(
     355  +         Stream.take(1),
     356  +         Stream.runCollect,
     357  +         Effect.forkScoped
     358  +       )
     359  +
     360  +       yield* Effect.sleep("250 millis")
     361  +
     362  +       const rows = yield* sql<{ value: number }>`SELECT 1 as value`.pipe(
     363  +         Effect.timeoutFail({
     364  +           duration: "3 seconds",
     365  +           onTimeout: () => new Error("query timed out while listener was active")
     366  +         })
     367  +       )
     368  +       expect(rows).toEqual([{ value: 1 }])
     369  +
     370  +       yield* sql`SELECT pg_notify(${channel}, ${"payload"})`
     371  +
     372  +       const payloads = yield* Fiber.join(listenFiber).pipe(
     373  +         Effect.timeoutFail({
     374  +           duration: "3 seconds",
     375  +           onTimeout: () => new Error("listener did not receive notification in time")
     376  +         })
     377  +       )
     378  +       expect(Chunk.toReadonlyArray(payloads)).toEqual(["payload"])
     379  +     }).pipe(TestServices.provideLive))
     380  + })

packages/sql-pg/test/utils.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,14 @@ export class PgContainer extends Effect.Service<PgContainer>()("test/PgContainer
50   50        })
51   51      })
52   52    ).pipe(Layer.provide(this.Default))
     53   +
     54   +   static ClientSingleConnectionLive = Layer.unwrapEffect(
     55   +     Effect.gen(function*() {
     56   +       const container = yield* PgContainer
     57   +       return PgClient.layer({
     58   +         url: Redacted.make(container.getConnectionUri()),
     59   +         maxConnections: 1
     60   +       })
     61   +     })
     62   +   ).pipe(Layer.provide(this.Default))
53   63   }

0 commit comments

Comments (0)