Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit a6dca45

Browse files
merge: #3455
3455: feat(si-layer): basic activities support r=adamhjk a=adamhjk This adds basic activities into layerdb. Activities represent events that we expect someone else to react to. The first two examples are a quick port of the rebaser events. ```rust let stream = layerdb.activity_subscribe(Some([vec![ActivityPayloadDiscriminates::RebaseFinished])).await?; layerdb.activity_publish(&rebase_request)?; layerdb.activity_publish(&rebase_finished)?; ``` The resulting stream would only receive the second activity event. If you want to subscribe to all events, pass 'None' in as the argument to activity_subscribe. <img src="https://media0.giphy.com/media/lvDdZh6PZMzvSAUaPj/giphy.gif"/> Co-authored-by: Adam Jacob <adam@systeminit.com>
2 parents a4a4926 + 9bb00c6 commit a6dca45

13 files changed

Lines changed: 532 additions & 14 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/si-events-rs/src/tenancy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl FromStr for ChangeSetId {
8383
}
8484
}
8585

86-
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
86+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
8787
pub struct Tenancy {
8888
pub change_set_id: ChangeSetId,
8989
pub workspace_pk: WorkspacePk,

lib/si-layer-cache/BUCK

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ rust_library(
99
"//lib/si-std:si-std",
1010
"//lib/telemetry-rs:telemetry",
1111
"//third-party/rust:async-trait",
12-
"//third-party/rust:base64",
1312
"//third-party/rust:blake3",
1413
"//third-party/rust:bytes",
1514
"//third-party/rust:chrono",
@@ -20,6 +19,7 @@ rust_library(
2019
"//third-party/rust:refinery",
2120
"//third-party/rust:remain",
2221
"//third-party/rust:serde",
22+
"//third-party/rust:serde_json",
2323
"//third-party/rust:sled",
2424
"//third-party/rust:strum",
2525
"//third-party/rust:tempfile",

lib/si-layer-cache/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ publish = false
77

88
[dependencies]
99
async-trait = { workspace = true }
10-
base64 = { workspace = true }
1110
blake3 = { workspace = true }
1211
bytes = { workspace = true }
1312
chrono = { workspace = true }
@@ -30,6 +29,7 @@ thiserror = { workspace = true }
3029
tokio = { workspace = true }
3130
tokio-util = { workspace = true }
3231
ulid = { workspace = true }
32+
serde_json = { workspace = true }
3333

3434
[dev_dependencies]
3535
buck2-resources = { path = "../../lib/buck2-resources" }
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
use std::{fmt, str::FromStr, sync::Arc};
2+
3+
use serde::{Deserialize, Serialize};
4+
use si_data_nats::{async_nats::jetstream, NatsClient};
5+
use strum::EnumDiscriminants;
6+
use ulid::{Ulid, ULID_LEN};
7+
8+
use crate::{
9+
error::LayerDbResult,
10+
event::LayeredEventMetadata,
11+
nats::{self, subject},
12+
};
13+
14+
use self::rebase::{RebaseFinished, RebaseRequest};
15+
16+
pub mod rebase;
17+
18+
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
19+
pub struct ActivityId(Ulid);
20+
21+
impl ActivityId {
22+
pub fn new() -> ActivityId {
23+
ActivityId(Ulid::new())
24+
}
25+
26+
pub fn array_to_str<'buf>(&self, buf: &'buf mut [u8; ULID_LEN]) -> &'buf mut str {
27+
self.0.array_to_str(buf)
28+
}
29+
30+
pub fn into_inner(self) -> Ulid {
31+
self.0
32+
}
33+
}
34+
35+
impl Default for ActivityId {
36+
fn default() -> Self {
37+
Self::new()
38+
}
39+
}
40+
41+
impl FromStr for ActivityId {
42+
type Err = ulid::DecodeError;
43+
44+
fn from_str(s: &str) -> Result<Self, Self::Err> {
45+
Ok(Self(Ulid::from_str(s)?))
46+
}
47+
}
48+
49+
impl From<ulid::Ulid> for ActivityId {
50+
fn from(value: ulid::Ulid) -> Self {
51+
Self(value)
52+
}
53+
}
54+
55+
impl fmt::Display for ActivityId {
56+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57+
self.0.fmt(f)
58+
}
59+
}
60+
61+
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
62+
pub struct Activity {
63+
pub id: ActivityId,
64+
pub payload: ActivityPayload,
65+
pub metadata: LayeredEventMetadata,
66+
}
67+
68+
impl Activity {
69+
pub fn new(payload: ActivityPayload, metadata: LayeredEventMetadata) -> Activity {
70+
Activity {
71+
id: ActivityId::new(),
72+
payload,
73+
metadata,
74+
}
75+
}
76+
77+
pub fn rebase(request: RebaseRequest, metadata: LayeredEventMetadata) -> Activity {
78+
Activity::new(ActivityPayload::RebaseRequest(request), metadata)
79+
}
80+
81+
pub fn rebase_finished(request: RebaseFinished, metadata: LayeredEventMetadata) -> Activity {
82+
Activity::new(ActivityPayload::RebaseFinished(request), metadata)
83+
}
84+
}
85+
86+
#[derive(Debug, Serialize, Deserialize, Clone, EnumDiscriminants, PartialEq, Eq)]
87+
pub enum ActivityPayload {
88+
RebaseRequest(RebaseRequest),
89+
RebaseFinished(RebaseFinished),
90+
}
91+
92+
impl ActivityPayload {
93+
pub fn to_subject(&self) -> String {
94+
let discriminate: ActivityPayloadDiscriminants = self.into();
95+
discriminate.to_subject()
96+
}
97+
}
98+
99+
impl ActivityPayloadDiscriminants {
100+
pub fn to_subject(&self) -> String {
101+
match self {
102+
ActivityPayloadDiscriminants::RebaseRequest => "rebase.request".to_string(),
103+
ActivityPayloadDiscriminants::RebaseFinished => "rebase.finished".to_string(),
104+
}
105+
}
106+
}
107+
108+
#[derive(Debug, Clone)]
109+
pub struct ActivityPublisher {
110+
prefix: Option<Arc<str>>,
111+
context: jetstream::context::Context,
112+
}
113+
114+
impl ActivityPublisher {
115+
pub fn new(nats_client: &NatsClient) -> ActivityPublisher {
116+
let prefix = nats_client.metadata().subject_prefix().map(|s| s.into());
117+
let context = jetstream::new(nats_client.as_inner().clone());
118+
ActivityPublisher { context, prefix }
119+
}
120+
121+
pub fn prefix(&self) -> Option<&str> {
122+
self.prefix.as_deref()
123+
}
124+
125+
pub fn publish(&self, activity: &Activity) -> LayerDbResult<()> {
126+
let nats_subject = subject::for_activity(self.prefix(), activity);
127+
let nats = self.context.clone();
128+
let nats_payload = postcard::to_stdvec(&activity)?;
129+
let _nats_join =
130+
tokio::spawn(async move { nats.publish(nats_subject, nats_payload.into()).await });
131+
Ok(())
132+
}
133+
}
134+
135+
#[allow(dead_code)]
136+
pub struct ActivitySubscriber {
137+
instance_id: Ulid,
138+
messages: jetstream::consumer::pull::Stream,
139+
}
140+
141+
impl ActivitySubscriber {
142+
pub async fn new(
143+
instance_id: Ulid,
144+
nats_client: &NatsClient,
145+
to_receive: Option<Vec<ActivityPayloadDiscriminants>>,
146+
) -> LayerDbResult<ActivitySubscriber> {
147+
let context = jetstream::new(nats_client.as_inner().clone());
148+
149+
let activities =
150+
nats::layerdb_activities_stream(&context, nats_client.metadata().subject_prefix())
151+
.await?
152+
.create_consumer(Self::consumer_config(
153+
nats_client.metadata().subject_prefix(),
154+
instance_id,
155+
to_receive,
156+
))
157+
.await?
158+
.messages()
159+
.await?;
160+
161+
Ok(ActivitySubscriber {
162+
instance_id,
163+
messages: activities,
164+
})
165+
}
166+
167+
pub fn messages(&mut self) -> &mut jetstream::consumer::pull::Stream {
168+
&mut self.messages
169+
}
170+
171+
#[inline]
172+
fn consumer_config(
173+
prefix: Option<&str>,
174+
instance_id: Ulid,
175+
to_receive: Option<Vec<ActivityPayloadDiscriminants>>,
176+
) -> jetstream::consumer::pull::Config {
177+
let name = format!("activity-stream-{instance_id}");
178+
let description = format!("activity stream for [{name}]");
179+
180+
match to_receive {
181+
Some(payload_types) => jetstream::consumer::pull::Config {
182+
name: Some(name),
183+
description: Some(description),
184+
deliver_policy: jetstream::consumer::DeliverPolicy::New,
185+
filter_subjects: payload_types
186+
.iter()
187+
.map(|t| nats::subject::for_activity_discriminate(prefix, t))
188+
.map(|s| s.to_string())
189+
.collect(),
190+
..Default::default()
191+
},
192+
None => jetstream::consumer::pull::Config {
193+
name: Some(name),
194+
description: Some(description),
195+
deliver_policy: jetstream::consumer::DeliverPolicy::New,
196+
..Default::default()
197+
},
198+
}
199+
}
200+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use serde::{Deserialize, Serialize};
2+
use serde_json::Value;
3+
use ulid::Ulid;
4+
5+
/// The message that the server receives to perform a rebase.
6+
#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
7+
pub struct RebaseRequest {
8+
/// Corresponds to the change set whose pointer is to be updated.
9+
pub to_rebase_change_set_id: Ulid,
10+
/// Corresponds to the workspace snapshot that will be the "onto" workspace snapshot when
11+
/// rebasing the "to rebase" workspace snapshot.
12+
pub onto_workspace_snapshot_id: Ulid,
13+
/// Derived from the ephemeral or persisted change set that's either the base change set, the
14+
/// last change set before edits were made, or the change set that you are trying to rebase
15+
/// onto base.
16+
pub onto_vector_clock_id: Ulid,
17+
}
18+
19+
impl RebaseRequest {
20+
pub fn new(
21+
to_rebase_change_set_id: Ulid,
22+
onto_workspace_snapshot_id: Ulid,
23+
onto_vector_clock_id: Ulid,
24+
) -> RebaseRequest {
25+
RebaseRequest {
26+
to_rebase_change_set_id,
27+
onto_workspace_snapshot_id,
28+
onto_vector_clock_id,
29+
}
30+
}
31+
}
32+
33+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
34+
pub struct RebaseFinished {
35+
status: RebaseStatus,
36+
to_rebase_change_set_id: Ulid,
37+
onto_workspace_snapshot_id: Ulid,
38+
}
39+
40+
impl RebaseFinished {
41+
pub fn new(
42+
status: RebaseStatus,
43+
to_rebase_change_set_id: Ulid,
44+
onto_workspace_snapshot_id: Ulid,
45+
) -> RebaseFinished {
46+
RebaseFinished {
47+
status,
48+
to_rebase_change_set_id,
49+
onto_workspace_snapshot_id,
50+
}
51+
}
52+
}
53+
54+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
55+
pub enum RebaseStatus {
56+
/// Processing the request and performing updates were both successful. Additionally, no conflicts were found.
57+
Success {
58+
/// The serialized updates performed when rebasing.
59+
updates_performed: Value,
60+
},
61+
/// Conflicts found when processing the request.
62+
ConflictsFound {
63+
/// A serialized list of the conflicts found during detection.
64+
conflicts_found: Value,
65+
/// A serialized list of the updates found during detection and skipped because at least
66+
/// once conflict was found.
67+
updates_found_and_skipped: Value,
68+
},
69+
/// Error encountered when processing the request.
70+
Error {
71+
/// The error message.
72+
message: String,
73+
},
74+
}

lib/si-layer-cache/src/chunking_nats.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,6 @@ impl ChunkedMessagesStream {
232232
},
233233
ackers: Arc::new(ackers),
234234
};
235-
236235
Some(Poll::Ready(Some(Ok(new_msg))))
237236
} else {
238237
// Still more chunks to go with this message, so try for more

lib/si-layer-cache/src/db.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use telemetry::tracing::{info, warn};
66
use ulid::Ulid;
77

88
use crate::{
9+
activities::{Activity, ActivityPayloadDiscriminants, ActivityPublisher, ActivitySubscriber},
910
error::LayerDbResult,
1011
layer_cache::LayerCache,
1112
persister::{PersisterClient, PersisterServer},
@@ -28,6 +29,7 @@ where
2829
pg_pool: PgPool,
2930
nats_client: NatsClient,
3031
persister_client: PersisterClient,
32+
activity_publisher: ActivityPublisher,
3133
instance_id: Ulid,
3234
tracker: TaskTracker,
3335
cancellation_token: CancellationToken,
@@ -88,7 +90,10 @@ where
8890

8991
let cas = CasDb::new(cas_cache, persister_client.clone());
9092

93+
let activity_publisher = ActivityPublisher::new(&nats_client);
94+
9195
Ok(LayerDb {
96+
activity_publisher,
9297
cas,
9398
sled,
9499
pg_pool,
@@ -138,4 +143,18 @@ where
138143

139144
Ok(())
140145
}
146+
147+
// Publish an activity
148+
pub fn activity_publish(&self, activity: &Activity) -> LayerDbResult<()> {
149+
self.activity_publisher.publish(activity)
150+
}
151+
152+
// Subscribe to all activities, or provide an optional array of activity kinds
153+
// to subscribe to.
154+
pub async fn activity_subscribe(
155+
&self,
156+
to_receive: Option<Vec<ActivityPayloadDiscriminants>>,
157+
) -> LayerDbResult<ActivitySubscriber> {
158+
ActivitySubscriber::new(self.instance_id, &self.nats_client, to_receive).await
159+
}
141160
}

lib/si-layer-cache/src/event.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ use si_events::{Actor, Tenancy, WebEvent};
88
use strum::AsRefStr;
99
use ulid::Ulid;
1010

11-
#[derive(Debug, Serialize, Deserialize)]
11+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
1212
pub struct LayeredEventMetadata {
1313
pub tenancy: Tenancy,
1414
pub actor: Actor,
1515
pub timestamp: DateTime<Utc>,
1616
}
1717

1818
impl LayeredEventMetadata {
19-
fn new(tenancy: Tenancy, actor: Actor) -> Self {
19+
pub fn new(tenancy: Tenancy, actor: Actor) -> Self {
2020
LayeredEventMetadata {
2121
tenancy,
2222
actor,

0 commit comments

Comments
 (0)