@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
88use si_crypto:: SymmetricCryptoService ;
99use si_data_nats:: { NatsClient , NatsError , NatsTxn } ;
1010use si_data_pg:: { InstrumentedClient , PgError , PgPool , PgPoolError , PgPoolResult , PgTxn } ;
11+ use si_events:: WorkspaceSnapshotAddress ;
1112use si_layer_cache:: db:: LayerDb ;
1213use telemetry:: prelude:: * ;
1314use thiserror:: Error ;
@@ -16,10 +17,9 @@ use tokio::time::Instant;
1617use veritech_client:: { Client as VeritechClient , CycloneEncryptionKey } ;
1718
1819use crate :: layer_db_types:: ContentTypes ;
19- use crate :: workspace_snapshot:: conflict:: Conflict ;
20- use crate :: workspace_snapshot:: update:: Update ;
21- use crate :: workspace_snapshot:: vector_clock:: VectorClockId ;
22- use crate :: workspace_snapshot:: WorkspaceSnapshotId ;
20+ use crate :: workspace_snapshot:: {
21+ conflict:: Conflict , graph:: WorkspaceSnapshotGraph , update:: Update , vector_clock:: VectorClockId ,
22+ } ;
2323use crate :: Workspace ;
2424use crate :: {
2525 change_set_pointer:: { ChangeSetId , ChangeSetPointer } ,
@@ -34,7 +34,7 @@ use crate::{
3434 WorkspacePk , WorkspaceSnapshot ,
3535} ;
3636
37- pub type DalLayerDb = LayerDb < ContentTypes > ;
37+ pub type DalLayerDb = LayerDb < ContentTypes , WorkspaceSnapshotGraph > ;
3838
3939/// A context type which contains handles to common core service dependencies.
4040///
@@ -207,17 +207,29 @@ impl ConnectionState {
207207 tenancy : & Tenancy ,
208208 rebase_request : Option < RebaseRequest > ,
209209 ) -> Result < ( Self , Option < Conflicts > ) , TransactionsError > {
210- match self {
211- Self :: Connections ( _) => {
212- trace ! ( "no active transactions present when commit was called, taking no action" ) ;
213- Ok ( ( self , None ) )
210+ let ( conns, nats_conn, rebaser_config) = match self {
211+ Self :: Connections ( conns) => {
212+ trace ! ( "no active transactions present when commit was called" ) ;
213+ let rebaser_config = conns. rebaser_config . clone ( ) ;
214+ let nats_conn = conns. nats_conn . clone ( ) ;
215+ Ok ( ( Self :: Connections ( conns) , nats_conn, rebaser_config) )
214216 }
215217 Self :: Transactions ( txns) => {
216- let ( conns, conflicts) = txns. commit_into_conns ( tenancy, rebase_request) . await ?;
217- Ok ( ( Self :: Connections ( conns) , conflicts) )
218+ let conns = txns. commit_into_conns ( ) . await ?;
219+ let rebaser_config = conns. rebaser_config . clone ( ) ;
220+ let nats_conn = conns. nats_conn . clone ( ) ;
221+ Ok ( ( Self :: Connections ( conns) , nats_conn, rebaser_config) )
218222 }
219223 Self :: Invalid => Err ( TransactionsError :: TxnCommit ) ,
220- }
224+ } ?;
225+
226+ let conflicts = if let Some ( rebase_request) = rebase_request {
227+ rebase ( tenancy, nats_conn, rebaser_config, rebase_request) . await ?
228+ } else {
229+ None
230+ } ;
231+
232+ Ok ( ( conns, conflicts) )
221233 }
222234
223235 async fn blocking_commit (
@@ -226,9 +238,24 @@ impl ConnectionState {
226238 rebase_request : Option < RebaseRequest > ,
227239 ) -> Result < ( Self , Option < Conflicts > ) , TransactionsError > {
228240 match self {
229- Self :: Connections ( _) => {
230- trace ! ( "no active transactions present when commit was called, taking no action" ) ;
231- Ok ( ( self , None ) )
241+ Self :: Connections ( conns) => {
242+ trace ! ( "no active transactions present when commit was called, but we will still attempt rebase" ) ;
243+
244+ // Even if there are no open dal transactions, we may have written to the layer db
245+ // and we need to perform a rebase if one is requested
246+ let conflicts = if let Some ( rebase_request) = rebase_request {
247+ rebase (
248+ tenancy,
249+ conns. nats_conn . clone ( ) ,
250+ conns. rebaser_config . clone ( ) ,
251+ rebase_request,
252+ )
253+ . await ?
254+ } else {
255+ None
256+ } ;
257+
258+ Ok ( ( Self :: Connections ( conns) , conflicts) )
232259 }
233260 Self :: Transactions ( txns) => {
234261 let ( conns, conflicts) = txns
@@ -345,7 +372,9 @@ impl DalContext {
345372 Ok ( ( ) )
346373 }
347374
348- pub async fn write_snapshot ( & self ) -> Result < Option < WorkspaceSnapshotId > , TransactionsError > {
375+ pub async fn write_snapshot (
376+ & self ,
377+ ) -> Result < Option < WorkspaceSnapshotAddress > , TransactionsError > {
349378 if let Some ( snapshot) = & self . workspace_snapshot {
350379 let vector_clock_id = self . change_set_pointer ( ) ?. vector_clock_id ( ) ;
351380
@@ -359,11 +388,11 @@ impl DalContext {
359388
360389 fn get_rebase_request (
361390 & self ,
362- onto_workspace_snapshot_id : WorkspaceSnapshotId ,
391+ onto_workspace_snapshot_address : WorkspaceSnapshotAddress ,
363392 ) -> Result < RebaseRequest , TransactionsError > {
364393 let vector_clock_id = self . change_set_pointer ( ) ?. vector_clock_id ( ) ;
365394 Ok ( RebaseRequest {
366- onto_workspace_snapshot_id ,
395+ onto_workspace_snapshot_address ,
367396 // the vector clock id of the current change set is just the id
368397 // of the current change set
369398 to_rebase_change_set_id : self . change_set_id ( ) ,
@@ -427,7 +456,9 @@ impl DalContext {
427456 /// Consumes all inner transactions and committing all changes made within them.
428457 pub async fn commit ( & self ) -> Result < Option < Conflicts > , TransactionsError > {
429458 let rebase_request = match self . write_snapshot ( ) . await ? {
430- Some ( workspace_snapshot_id) => Some ( self . get_rebase_request ( workspace_snapshot_id) ?) ,
459+ Some ( workspace_snapshot_address) => {
460+ Some ( self . get_rebase_request ( workspace_snapshot_address) ?)
461+ }
431462 None => None ,
432463 } ;
433464
@@ -510,9 +541,10 @@ impl DalContext {
510541 /// Consumes all inner transactions, committing all changes made within them, and
511542 /// blocks until all queued jobs have reported as finishing.
512543 pub async fn blocking_commit ( & self ) -> Result < Option < Conflicts > , TransactionsError > {
513- info ! ( "blocking_commit" ) ;
514544 let rebase_request = match self . write_snapshot ( ) . await ? {
515- Some ( workspace_snapshot_id) => Some ( self . get_rebase_request ( workspace_snapshot_id) ?) ,
545+ Some ( workspace_snapshot_address) => {
546+ Some ( self . get_rebase_request ( workspace_snapshot_address) ?)
547+ }
516548 None => None ,
517549 } ;
518550
@@ -1001,7 +1033,7 @@ pub enum TransactionsError {
10011033 #[ error( transparent) ]
10021034 PgPool ( #[ from] PgPoolError ) ,
10031035 #[ error( "rebase of snapshot {0} change set id {1} failed {2}" ) ]
1004- RebaseFailed ( WorkspaceSnapshotId , ChangeSetId , String ) ,
1036+ RebaseFailed ( WorkspaceSnapshotAddress , ChangeSetId , String ) ,
10051037 #[ error( transparent) ]
10061038 RebaserClient ( #[ from] RebaserClientError ) ,
10071039 #[ error( transparent) ]
@@ -1098,7 +1130,7 @@ pub struct Transactions {
10981130#[ derive( Clone , Debug ) ]
10991131pub struct RebaseRequest {
11001132 pub to_rebase_change_set_id : ChangeSetId ,
1101- pub onto_workspace_snapshot_id : WorkspaceSnapshotId ,
1133+ pub onto_workspace_snapshot_address : WorkspaceSnapshotAddress ,
11021134 pub onto_vector_clock_id : VectorClockId ,
11031135}
11041136
@@ -1127,7 +1159,7 @@ async fn rebase(
11271159 let response = rebaser_client
11281160 . request_rebase (
11291161 rebase_request. to_rebase_change_set_id . into ( ) ,
1130- rebase_request. onto_workspace_snapshot_id . into ( ) ,
1162+ rebase_request. onto_workspace_snapshot_address ,
11311163 rebase_request. onto_vector_clock_id . into ( ) ,
11321164 )
11331165 . await ?;
@@ -1136,7 +1168,7 @@ async fn rebase(
11361168 match response {
11371169 ReplyRebaseMessage :: Success { .. } => Ok ( None ) ,
11381170 ReplyRebaseMessage :: Error { message } => Err ( TransactionsError :: RebaseFailed (
1139- rebase_request. onto_workspace_snapshot_id ,
1171+ rebase_request. onto_workspace_snapshot_address ,
11401172 rebase_request. to_rebase_change_set_id ,
11411173 message,
11421174 ) ) ,
@@ -1190,33 +1222,18 @@ impl Transactions {
11901222 skip_all,
11911223 fields( )
11921224 ) ]
1193- pub async fn commit_into_conns (
1194- self ,
1195- tenancy : & Tenancy ,
1196- rebase_request : Option < RebaseRequest > ,
1197- ) -> Result < ( Connections , Option < Conflicts > ) , TransactionsError > {
1225+ pub async fn commit_into_conns ( self ) -> Result < Connections , TransactionsError > {
11981226 let pg_conn = self . pg_txn . commit_into_conn ( ) . await ?;
11991227 let nats_conn = self . nats_txn . commit_into_conn ( ) . await ?;
12001228
1201- let conflicts = if let Some ( rebase_request) = rebase_request {
1202- let start = Instant :: now ( ) ;
1203- let conflicts = rebase (
1204- tenancy,
1205- nats_conn. clone ( ) ,
1206- self . rebaser_config . clone ( ) ,
1207- rebase_request,
1208- )
1209- . await ?;
1210- info ! ( "rebase took: {:?}" , start. elapsed( ) ) ;
1211- conflicts
1212- } else {
1213- None
1214- } ;
1215-
12161229 self . job_processor . process_queue ( self . job_queue ) . await ?;
1217- let conns = Connections :: new ( pg_conn, nats_conn, self . job_processor , self . rebaser_config ) ;
12181230
1219- Ok ( ( conns, conflicts) )
1231+ Ok ( Connections :: new (
1232+ pg_conn,
1233+ nats_conn,
1234+ self . job_processor ,
1235+ self . rebaser_config ,
1236+ ) )
12201237 }
12211238
12221239 /// Consumes all inner transactions, committing all changes made within them, and returns
@@ -1236,7 +1253,6 @@ impl Transactions {
12361253 let nats_conn = self . nats_txn . commit_into_conn ( ) . await ?;
12371254
12381255 let conflicts = if let Some ( rebase_request) = rebase_request {
1239- info ! ( "rebase request" ) ;
12401256 rebase (
12411257 tenancy,
12421258 nats_conn. clone ( ) ,
0 commit comments