Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit 21813a7

Browse files
zacharyhamm, jhelwig, nickgerace, and sprutton1
committed
fix(dal): synchronize graph access in dependent values update
The concurrency types used by the WorkspaceSnapshot to provide interior mutability in the tokio runtime are *not* sufficient to prevent data races when operating on the same graph on different threads, since our graph operations are not "atomic" and the graph *WILL* end up being read from different threads while a write operation is still in progress if it is shared between threads for modification — for example, after a node is added but *before* the edges necessary to place that node in the right spot in the graph have been added. A more general solution is needed to represent the concept of a "transaction" when mutating the graph, to prevent other readers from seeing incomplete graphs from in-progress write operations. For now, we have added a lock to `DependentValuesUpdate` that ensures the portion of the update that gathers the inputs to a function acquires a read lock (which is released as soon as the function is sent to the executor, so that we don't hold on to it while executing in veritech), while the portion of the update that updates the graph with the execution result acquires a write lock. Co-Authored-By: Jacob Helwig <jacob@systeminit.com> Co-Authored-By: Nick Gerace <nick@systeminit.com> Co-Authored-By: Scott Prutton <scott@systeminit.com>
1 parent 3fc360f commit 21813a7

3 files changed

Lines changed: 49 additions & 6 deletions

File tree

lib/dal/src/attribute/value.rs

Lines changed: 18 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,7 @@ use serde::{Deserialize, Serialize};
4646
use serde_json::Value;
4747
use telemetry::prelude::*;
4848
use thiserror::Error;
49-
use tokio::sync::TryLockError;
49+
use tokio::sync::{RwLock, TryLockError};
5050
use ulid::Ulid;
5151

5252
pub use dependent_value_graph::DependentValueGraph;
@@ -497,7 +497,17 @@ impl AttributeValue {
497497
pub async fn execute_prototype_function(
498498
ctx: &DalContext,
499499
attribute_value_id: AttributeValueId,
500+
read_lock: Arc<RwLock<()>>,
500501
) -> AttributeValueResult<PrototypeExecutionResult> {
502+
// When functions are being executed in the dependent values update job,
503+
// we need to ensure we are not reading our input sources from a graph
504+
// that is in the process of being mutated on another thread, since it
505+
// will be incomplete (some nodes will not have all their edges added
506+
// yet, for example, or a reference replacement may still be in
507+
// progress). To handle this here, we grab a read lock, which will be
508+
// locked for writing in the dependent values update job while the
509+
// execution result is being written to the graph.
510+
let read_guard = read_lock.read().await;
501511
let prototype_id = AttributeValue::prototype_id(ctx, attribute_value_id).await?;
502512
let prototype_func_id = AttributePrototype::func_id(ctx, prototype_id).await?;
503513
let destination_component_id =
@@ -619,6 +629,10 @@ impl AttributeValue {
619629
.await
620630
.map_err(|e| AttributeValueError::BeforeFunc(e.to_string()))?;
621631

632+
// We have gathered up all our inputs and so no longer need a lock on
633+
// the graph. Be sure not to add graph walk operations below this drop
634+
drop(read_guard);
635+
622636
let (_, func_binding_return_value) = match FuncBinding::create_and_execute(
623637
ctx,
624638
prepared_func_binding_args.clone(),
@@ -721,8 +735,10 @@ impl AttributeValue {
721735
ctx: &DalContext,
722736
attribute_value_id: AttributeValueId,
723737
) -> AttributeValueResult<()> {
738+
// this lock is never locked for writing so is effectively a no-op here
739+
let read_lock = Arc::new(RwLock::new(()));
724740
let execution_result =
725-
AttributeValue::execute_prototype_function(ctx, attribute_value_id).await?;
741+
AttributeValue::execute_prototype_function(ctx, attribute_value_id, read_lock).await?;
726742

727743
AttributeValue::set_values_from_execution_result(ctx, attribute_value_id, execution_result)
728744
.await?;

lib/dal/src/job/definition/dependent_values_update.rs

Lines changed: 21 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,17 @@
11
use std::{
22
collections::{HashMap, HashSet},
33
convert::TryFrom,
4+
sync::Arc,
45
};
56

67
use async_trait::async_trait;
78
use serde::{Deserialize, Serialize};
89
use telemetry::prelude::*;
910
use thiserror::Error;
10-
use tokio::task::{JoinError, JoinSet};
11+
use tokio::{
12+
sync::RwLock,
13+
task::{JoinError, JoinSet},
14+
};
1115
use ulid::Ulid;
1216

1317
//use crate::tasks::StatusReceiverClient;
@@ -58,6 +62,8 @@ pub struct DependentValuesUpdate {
5862
access_builder: AccessBuilder,
5963
visibility: Visibility,
6064
job: Option<JobInfo>,
65+
#[serde(skip)]
66+
set_value_lock: Arc<RwLock<()>>,
6167
}
6268

6369
impl DependentValuesUpdate {
@@ -71,6 +77,7 @@ impl DependentValuesUpdate {
7177
access_builder,
7278
visibility,
7379
job: None,
80+
set_value_lock: Arc::new(RwLock::new(())),
7481
})
7582
}
7683
}
@@ -149,6 +156,7 @@ impl DependentValuesUpdate {
149156
id,
150157
ctx.clone(),
151158
attribute_value_id,
159+
self.set_value_lock.clone(),
152160
));
153161
task_id_to_av_id.insert(id, attribute_value_id);
154162
seen_ids.insert(attribute_value_id);
@@ -161,6 +169,12 @@ impl DependentValuesUpdate {
161169
if let Some(finished_value_id) = task_id_to_av_id.remove(&task_id) {
162170
match execution_result {
163171
Ok(execution_values) => {
172+
// Lock the graph for writing inside this job. The
173+
// lock will be released when this guard is dropped
174+
// at the end of the scope.
175+
#[allow(unused_variables)]
176+
let write_guard = self.set_value_lock.write().await;
177+
164178
match AttributeValue::set_values_from_execution_result(
165179
ctx,
166180
finished_value_id,
@@ -216,10 +230,12 @@ async fn values_from_prototype_function_execution(
216230
task_id: Ulid,
217231
ctx: DalContext,
218232
attribute_value_id: AttributeValueId,
233+
set_value_lock: Arc<RwLock<()>>,
219234
) -> (Ulid, DependentValueUpdateResult<PrototypeExecutionResult>) {
220-
let result = AttributeValue::execute_prototype_function(&ctx, attribute_value_id)
221-
.await
222-
.map_err(Into::into);
235+
let result =
236+
AttributeValue::execute_prototype_function(&ctx, attribute_value_id, set_value_lock)
237+
.await
238+
.map_err(Into::into);
223239

224240
(task_id, result)
225241
}
@@ -234,6 +250,7 @@ impl TryFrom<JobInfo> for DependentValuesUpdate {
234250
access_builder: job.access_builder,
235251
visibility: job.visibility,
236252
job: Some(job),
253+
set_value_lock: Arc::new(RwLock::new(())),
237254
})
238255
}
239256
}

lib/dal/src/workspace_snapshot.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,16 @@ pub enum WorkspaceSnapshotError {
101101

102102
pub type WorkspaceSnapshotResult<T> = Result<T, WorkspaceSnapshotError>;
103103

104+
/// The workspace graph. The concurrency types used here to give us interior
105+
/// mutability in the tokio run time are *not* sufficient to prevent data races
106+
/// when operating on the same graph on different threads, since our graph
107+
/// operations are not "atomic" and the graph *WILL* end up being read from
108+
/// different threads while a write operation is still in progress if it is
109+
/// shared between threads for modification. For example after a node is added
110+
/// but *before* the edges necessary to place that node in the right spot in the
111+
/// graph have been added. We need a more general solution here, but for now an
112+
/// example of synchronization when accessing a snapshot across threads can be
113+
/// found in [`crate::job::definition::DependentValuesUpdate`].
104114
#[derive(Debug, Clone)]
105115
pub struct WorkspaceSnapshot {
106116
address: Arc<RwLock<WorkspaceSnapshotAddress>>,

0 commit comments

Comments (0)