Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit 030a772

Browse files
merge: #3470
3470: feat(si-layer-cache): impl rebaser requests work queue stream r=adamhjk a=fnichol This is an initial implementation of a work queue stream of rebaser requests, suitable for use by the rebaser service. Note that it's not yet clear if additional work queue streams follow this pattern, if the code is extractable into the service-related crates, or if some other code de-duplication is possible. At the moment, the newly added Rust types are doing a *lot* of heavy lifting to ensure correctness of behavior and type safety. Also note that this NATS Jetstream setup may require further testing and refinement to confirm the desired stream sourcing behavior. <img src="https://media1.giphy.com/media/5YuhLwDgrgtRVwI7OY/giphy.gif"/> Co-authored-by: Fletcher Nichol <fletcher@systeminit.com>
2 parents a9a12d3 + 8d22ded commit 030a772

4 files changed

Lines changed: 146 additions & 1 deletion

File tree

lib/si-layer-cache/src/activities.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,107 @@ impl Stream for ActivityStream {
283283
}
284284
}
285285
}
286+
287+
#[derive(Clone)]
288+
pub struct AckRebaseRequest {
289+
pub id: ActivityId,
290+
pub payload: RebaseRequest,
291+
pub metadata: LayeredEventMetadata,
292+
acker: Arc<Acker>,
293+
}
294+
295+
impl AckRebaseRequest {
296+
pub async fn ack(&self) -> LayerDbResult<()> {
297+
self.acker.ack().await.map_err(LayerDbError::NatsAck)
298+
}
299+
300+
pub async fn ack_with(&self, kind: AckKind) -> LayerDbResult<()> {
301+
self.acker
302+
.ack_with(kind)
303+
.await
304+
.map_err(LayerDbError::NatsAck)
305+
}
306+
307+
pub async fn double_ack(&self) -> LayerDbResult<()> {
308+
self.acker.double_ack().await.map_err(LayerDbError::NatsAck)
309+
}
310+
}
311+
312+
pub struct RebaserRequestsWorkQueueStream {
313+
inner: jetstream::consumer::pull::Stream,
314+
}
315+
316+
impl RebaserRequestsWorkQueueStream {
317+
const CONSUMER_NAME: &'static str = "rebaser-requests";
318+
319+
pub(crate) async fn create(nats_client: &NatsClient) -> LayerDbResult<Self> {
320+
let context = jetstream::new(nats_client.as_inner().clone());
321+
322+
// Ensure the sourced stream is created
323+
let _activities =
324+
nats::layerdb_activities_stream(&context, nats_client.metadata().subject_prefix())
325+
.await?;
326+
327+
let inner = nats::rebaser_requests_work_queue_stream(
328+
&context,
329+
nats_client.metadata().subject_prefix(),
330+
)
331+
.await?
332+
.create_consumer(Self::consumer_config())
333+
.await?
334+
.messages()
335+
.await?;
336+
337+
Ok(Self { inner })
338+
}
339+
340+
#[inline]
341+
fn consumer_config() -> jetstream::consumer::pull::Config {
342+
jetstream::consumer::pull::Config {
343+
durable_name: Some(Self::CONSUMER_NAME.to_string()),
344+
description: Some("rebaser requests consumer".to_string()),
345+
..Default::default()
346+
}
347+
}
348+
}
349+
350+
impl Stream for RebaserRequestsWorkQueueStream {
351+
type Item = LayerDbResult<AckRebaseRequest>;
352+
353+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
354+
match Pin::new(&mut self.inner.next()).poll(cx) {
355+
// Process the message
356+
Poll::Ready(Some(Ok(msg))) => {
357+
let (msg, acker) = msg.split();
358+
359+
match postcard::from_bytes::<Activity>(&msg.payload) {
360+
// Successfully deserialized into an activity
361+
Ok(activity) => match activity.payload {
362+
// Correct variant, convert to work-specific type
363+
ActivityPayload::RebaseRequest(req) => {
364+
Poll::Ready(Some(Ok(AckRebaseRequest {
365+
id: activity.id,
366+
payload: req,
367+
metadata: activity.metadata,
368+
acker: Arc::new(acker),
369+
})))
370+
}
371+
// Unexpected variant, message is invalid
372+
_ => Poll::Ready(Some(Err(LayerDbError::UnexpectedActivityVariant(
373+
ActivityPayloadDiscriminants::RebaseRequest.to_subject(),
374+
ActivityPayloadDiscriminants::from(activity.payload).to_subject(),
375+
)))),
376+
},
377+
// Error deserializing message
378+
Err(err) => Poll::Ready(Some(Err(err.into()))),
379+
}
380+
}
381+
// Upstream errors are propagated downstream
382+
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
383+
// If the upstream closes, then we do too
384+
Poll::Ready(None) => Poll::Ready(None),
385+
// Not ready, so...not ready!
386+
Poll::Pending => Poll::Pending,
387+
}
388+
}
389+
}

lib/si-layer-cache/src/db.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use tokio_util::{sync::CancellationToken, task::TaskTracker};
99
use ulid::Ulid;
1010

1111
use crate::{
12-
activities::{Activity, ActivityPayloadDiscriminants, ActivityPublisher, ActivityStream},
12+
activities::{
13+
Activity, ActivityPayloadDiscriminants, ActivityPublisher, ActivityStream,
14+
RebaserRequestsWorkQueueStream,
15+
},
1316
error::LayerDbResult,
1417
layer_cache::LayerCache,
1518
persister::{PersisterClient, PersisterTask},
@@ -168,6 +171,12 @@ where
168171
)
169172
.await
170173
}
174+
175+
pub async fn subscribe_rebaser_requests_work_queue(
176+
&self,
177+
) -> LayerDbResult<RebaserRequestsWorkQueueStream> {
178+
RebaserRequestsWorkQueueStream::create(&self.nats_client).await
179+
}
171180
}
172181

173182
#[must_use = "graceful shutdown must be spawned on runtime"]

lib/si-layer-cache/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ pub enum LayerDbError {
6363
SledError(#[from] sled::Error),
6464
#[error("tokio oneshot recv error: {0}")]
6565
TokioOneShotRecv(#[from] tokio::sync::oneshot::error::RecvError),
66+
#[error("unexpected activity variant; expected={0}, actual={1}")]
67+
UnexpectedActivityVariant(String, String),
6668
}
6769

6870
impl LayerDbError {

lib/si-layer-cache/src/nats.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const NATS_EVENT_STREAM_SUBJECTS: &[&str] = &["si.layerdb.events.*.*.*.*"];
1717
const NATS_ACTIVITIES_STREAM_NAME: &str = "LAYERDB_ACTIVITIES";
1818
const NATS_ACTIVITIES_STREAM_SUBJECTS: &[&str] = &["si.layerdb.activities.>"];
1919

20+
const NATS_REBASER_REQUESTS_WORK_QUEUE_STREAM_NAME: &str = "REBASER_REQUESTS";
21+
2022
/// Returns a Jetstream Stream and creates it if it doesn't yet exist.
2123
pub async fn layerdb_events_stream(
2224
context: &jetstream::Context,
@@ -68,6 +70,34 @@ pub async fn layerdb_activities_stream(
6870
Ok(stream)
6971
}
7072

73+
pub async fn rebaser_requests_work_queue_stream(
74+
context: &jetstream::Context,
75+
prefix: Option<&str>,
76+
) -> Result<jetstream::stream::Stream, jetstream::context::CreateStreamError> {
77+
let requests_subject = subject::for_activity_discriminate(
78+
prefix,
79+
crate::activities::ActivityPayloadDiscriminants::RebaseRequest,
80+
);
81+
82+
let source = jetstream::stream::Source {
83+
name: nats_stream_name(prefix, NATS_ACTIVITIES_STREAM_NAME),
84+
filter_subject: Some(requests_subject.to_string()),
85+
..Default::default()
86+
};
87+
88+
let stream = context
89+
.get_or_create_stream(jetstream::stream::Config {
90+
name: nats_stream_name(prefix, NATS_REBASER_REQUESTS_WORK_QUEUE_STREAM_NAME),
91+
description: Some("Rebaser requests work queue".to_owned()),
92+
retention: jetstream::stream::RetentionPolicy::WorkQueue,
93+
sources: Some(vec![source]),
94+
..Default::default()
95+
})
96+
.await?;
97+
98+
Ok(stream)
99+
}
100+
71101
fn nats_stream_name(prefix: Option<&str>, suffix: impl AsRef<str>) -> String {
72102
let suffix = suffix.as_ref();
73103

0 commit comments

Comments
 (0)