Skip to content

Commit 1ff0f6b

Browse files
committed
Removed the Redis pubsub, just use the presence key
1 parent 30df947 commit 1ff0f6b

3 files changed

Lines changed: 8 additions & 61 deletions

File tree

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
const PRESENCE_KEY_PREFIX = "dev-presence:connection:";
2-
const PRESENCE_CHANNEL_PREFIX = "dev-presence:updates:";
32

43
export class DevPresenceStream {
54
static getPresenceKey(environmentId: string) {
65
return `${PRESENCE_KEY_PREFIX}${environmentId}`;
76
}
8-
9-
static getPresenceChannel(environmentId: string) {
10-
return `${PRESENCE_CHANNEL_PREFIX}${environmentId}`;
11-
}
127
}

apps/webapp/app/routes/engine.v1.dev.presence.ts

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,50 +29,26 @@ export const loader = createSSELoader({
2929
const environmentId = authentication.environment.id;
3030

3131
const presenceKey = DevPresenceStream.getPresenceKey(environmentId);
32-
const presenceChannel = DevPresenceStream.getPresenceChannel(environmentId);
32+
33+
const ttl = (env.DEV_PRESENCE_POLL_INTERVAL_MS / 1000) * 2;
3334

3435
return {
3536
beforeStream: async () => {
3637
logger.debug("Start dev presence SSE session", {
3738
environmentId,
3839
presenceKey,
39-
presenceChannel,
4040
});
4141
},
4242
initStream: async ({ send }) => {
4343
// Set initial presence with more context
44-
await redis.setex(presenceKey, env.DEV_PRESENCE_TTL_MS / 1000, new Date().toISOString());
45-
46-
// Publish presence update
47-
await redis.publish(
48-
presenceChannel,
49-
JSON.stringify({
50-
type: "connected",
51-
environmentId,
52-
timestamp: Date.now(),
53-
})
54-
);
55-
44+
await redis.setex(presenceKey, ttl, new Date().toISOString());
5645
send({ event: "start", data: `Started ${id}` });
5746
},
5847
iterator: async ({ send, date }) => {
59-
await redis.setex(presenceKey, env.DEV_PRESENCE_TTL_MS / 1000, date.toISOString());
60-
48+
await redis.setex(presenceKey, ttl, date.toISOString());
6149
send({ event: "time", data: new Date().toISOString() });
6250
},
63-
cleanup: async () => {
64-
await redis.del(presenceKey);
65-
66-
// Publish disconnect event
67-
await redis.publish(
68-
presenceChannel,
69-
JSON.stringify({
70-
type: "disconnected",
71-
environmentId,
72-
timestamp: Date.now(),
73-
})
74-
);
75-
},
51+
cleanup: async () => {},
7652
};
7753
},
7854
});

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dev.presence.tsx

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,15 @@ export const loader = createSSELoader({
3333
}
3434

3535
const presenceKey = DevPresenceStream.getPresenceKey(environment.id);
36-
const presenceChannel = DevPresenceStream.getPresenceChannel(environment.id);
3736

38-
// Create two Redis clients - one for subscribing and one for regular commands
39-
const redisConfig = {
37+
const cmdRedis = new Redis({
4038
port: env.RUN_ENGINE_DEV_PRESENCE_REDIS_PORT ?? undefined,
4139
host: env.RUN_ENGINE_DEV_PRESENCE_REDIS_HOST ?? undefined,
4240
username: env.RUN_ENGINE_DEV_PRESENCE_REDIS_USERNAME ?? undefined,
4341
password: env.RUN_ENGINE_DEV_PRESENCE_REDIS_PASSWORD ?? undefined,
4442
enableAutoPipelining: true,
4543
...(env.RUN_ENGINE_DEV_PRESENCE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
46-
};
47-
48-
// Subscriber client for pubsub
49-
const subRedis = new Redis(redisConfig);
50-
51-
// Command client for regular Redis commands
52-
const cmdRedis = new Redis(redisConfig);
44+
});
5345

5446
const checkAndSendPresence = async (send: SendFunction) => {
5547
try {
@@ -93,35 +85,19 @@ export const loader = createSSELoader({
9385
beforeStream: async () => {
9486
logger.debug("Start dev presence listening SSE session", {
9587
environmentId: environment.id,
96-
presenceChannel,
88+
presenceKey,
9789
});
9890
},
9991
initStream: async ({ send }) => {
10092
await checkAndSendPresence(send);
10193

102-
//start subscribing with the subscriber client
103-
await subRedis.subscribe(presenceChannel);
104-
105-
subRedis.on("message", async (channel, message) => {
106-
if (channel === presenceChannel) {
107-
try {
108-
await checkAndSendPresence(send);
109-
} catch (error) {
110-
logger.error("Failed to parse presence message", { error, message });
111-
}
112-
}
113-
});
114-
11594
send({ event: "time", data: new Date().toISOString() });
11695
},
11796
iterator: async ({ send, date }) => {
11897
await checkAndSendPresence(send);
11998
},
12099
cleanup: async ({ send }) => {
121100
await checkAndSendPresence(send);
122-
123-
await subRedis.unsubscribe(presenceChannel);
124-
await subRedis.quit();
125101
await cmdRedis.quit();
126102
},
127103
};

0 commit comments

Comments
 (0)