Skip to content

Commit 95adf51

Browse files
committed
graph, node, store: Change how restore deterimes the shard
If --shard is not given, consult deployment rules first, only fall back to the primary if that doesn't yield a result
1 parent b801c44 commit 95adf51

4 files changed

Lines changed: 25 additions & 45 deletions

File tree

graph/src/data/store/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ pub mod ethereum;
3333
/// Conversion of values to/from SQL
3434
pub mod sql;
3535

36+
lazy_static! {
37+
/// The name of the default node is `"default"`. This is used when no
38+
/// node is specified for a deployment.
39+
pub static ref DEFAULT_NODE_ID: NodeId = NodeId::new("default").unwrap();
40+
}
41+
3642
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
3743
pub struct NodeId(String);
3844

node/src/bin/manager.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,10 @@ pub enum Command {
352352
Restore {
353353
/// Path to the dump directory
354354
directory: String,
355-
/// The database shard to restore into (default: primary)
355+
/// The database shard to restore into. When not given, use the
356+
/// deployment rules to determine the shard, or default to the
357+
/// primary shard if no rules match. This option is required when
358+
/// using `--add`
356359
#[clap(long)]
357360
shard: Option<String>,
358361
/// Subgraph name for deployment rule matching and node assignment.

node/src/manager/commands/restore.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use graph::{bail, prelude::anyhow::Result};
44

55
use graph::prelude::SubgraphName;
6-
use graph_store_postgres::{RestoreMode, Shard, SubgraphStore, PRIMARY_SHARD};
6+
use graph_store_postgres::{RestoreMode, Shard, SubgraphStore};
77

88
pub async fn run(
99
subgraph_store: Arc<SubgraphStore>,
@@ -28,11 +28,6 @@ pub async fn run(
2828
);
2929
}
3030

31-
let shard = match shard {
32-
Some(s) => Shard::new(s)?,
33-
None => PRIMARY_SHARD.clone(),
34-
};
35-
3631
let name = name
3732
.map(|n| SubgraphName::new(n.clone()).map_err(|_| anyhow::anyhow!("invalid name `{n}`")))
3833
.transpose()?;
@@ -47,6 +42,8 @@ pub async fn run(
4742
RestoreMode::Default
4843
};
4944

45+
let shard = shard.map(Shard::new).transpose()?;
46+
5047
subgraph_store
5148
.restore(&directory, shard, name, mode)
5249
.await?;

store/postgres/src/subgraph_store.rs

Lines changed: 12 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use graph::{
2424
},
2525
data::{
2626
query::QueryTarget,
27+
store::DEFAULT_NODE_ID,
2728
subgraph::{schema::DeploymentCreate, status, DeploymentFeatures},
2829
},
2930
internal_error,
@@ -737,35 +738,6 @@ impl Inner {
737738
}
738739
}
739740

740-
/// Determine the target node for a deployment using the configured
741-
/// deployment rules, ignoring the shard selection. Returns an error
742-
/// if no rule matches.
743-
async fn node_for_deployment(
744-
&self,
745-
name: &SubgraphName,
746-
network: &str,
747-
) -> Result<NodeId, StoreError> {
748-
let placement = self
749-
.placer
750-
.place(name.as_str(), network)
751-
.map_err(|msg| internal_error!("illegal indexer name in deployment rule: {}", msg))?;
752-
753-
match placement {
754-
Some((_, nodes)) if !nodes.is_empty() => {
755-
if nodes.len() == 1 {
756-
Ok(nodes.into_iter().next().unwrap())
757-
} else {
758-
let mut pconn = self.primary_conn().await?;
759-
// unwrap: nodes is not empty
760-
Ok(pconn.least_assigned_node(&nodes).await?.unwrap())
761-
}
762-
}
763-
_ => Err(StoreError::InternalError(
764-
"no deployment rule matches this deployment".into(),
765-
)),
766-
}
767-
}
768-
769741
pub async fn copy_deployment(
770742
&self,
771743
src: &DeploymentLocator,
@@ -1481,7 +1453,7 @@ impl Inner {
14811453
pub async fn restore(
14821454
&self,
14831455
dir: &std::path::Path,
1484-
shard: Shard,
1456+
shard: Option<Shard>,
14851457
name: Option<SubgraphName>,
14861458
mode: RestoreMode,
14871459
) -> Result<(), StoreError> {
@@ -1490,11 +1462,6 @@ impl Inner {
14901462
let metadata_path = dir.join("metadata.json");
14911463
let metadata = Metadata::from_file(&metadata_path)?;
14921464

1493-
// Validate that the target shard exists before making any DB changes
1494-
self.stores
1495-
.get(&shard)
1496-
.ok_or_else(|| StoreError::UnknownShard(shard.to_string()))?;
1497-
14981465
// Resolve the subgraph name for deployment rule matching. If not
14991466
// supplied, look up an existing name from the DB; error if none.
15001467
let name = match name {
@@ -1517,9 +1484,16 @@ impl Inner {
15171484
};
15181485

15191486
// Use deployment rules to determine which node should index this
1520-
// deployment. The rules also return candidate shards, but we ignore
1521-
// those since the shard is user-specified for restore.
1522-
let node = self.node_for_deployment(&name, &metadata.network).await?;
1487+
// deployment and how to place it.
1488+
let (placed_shard, node) = self
1489+
.place(&name, &metadata.network, DEFAULT_NODE_ID.clone())
1490+
.await?;
1491+
let shard = shard.unwrap_or(placed_shard);
1492+
1493+
// Validate that the target shard exists before making any DB changes
1494+
self.stores
1495+
.get(&shard)
1496+
.ok_or_else(|| StoreError::UnknownShard(shard.to_string()))?;
15231497

15241498
let mut pconn = self.primary_conn().await?;
15251499
let action = pconn

0 commit comments

Comments
 (0)