Skip to content

Commit 277f45e

Browse files
Add RPC provider failover for block ingestor (#6430)
* graph: Fix retry_strategy underflow when limit is 0 Use saturating_sub to avoid underflow when limit is set to 0. * chain/ethereum: Add RPC provider failover for block ingestor When do_poll() fails, the ingestor now probes the current provider before switching. If the current provider is unreachable, all alternatives are probed in parallel and the first healthy one is selected. This avoids unnecessary switches on non-RPC failures (e.g. DB errors, chain reorgs). Also limits latest_block_ptr retries to ENV_VARS.request_retries so failures surface to the failover logic instead of retrying indefinitely. * docs: Document block ingestor failover behavior * chain/ethereum: Return provider directly from resolve_provider Simplify resolve_provider_idx into resolve_provider by returning a reference to the Arc<A> instead of a usize index, eliminating the separate indexing step at the call site.
1 parent 192869f commit 277f45e

File tree

8 files changed

+442
-31
lines changed

8 files changed

+442
-31
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ tiny-keccak = "1.5.0"
1616
hex = "0.4.3"
1717
semver = { workspace = true }
1818
thiserror = { workspace = true }
19+
futures = { workspace = true }
1920
tokio = { workspace = true }
2021
tokio-stream = { workspace = true }
2122
tower = { workspace = true }

chain/ethereum/src/adapter.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,6 +1174,11 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11741174
address: Address,
11751175
block_ptr: BlockPtr,
11761176
) -> Result<alloy::primitives::Bytes, EthereumRpcError>;
1177+
1178+
/// Returns a boolean indicating whether the adapter can reach
1179+
/// the RPC provider it is configured to use.
1180+
/// This is used to determine if a provider should be considered healthy.
1181+
async fn is_reachable(&self) -> bool;
11771182
}
11781183

11791184
#[cfg(test)]

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use std::convert::TryFrom;
5757
use std::iter::FromIterator;
5858
use std::pin::Pin;
5959
use std::sync::Arc;
60-
use std::time::Instant;
60+
use std::time::{Duration, Instant};
6161
use tokio::sync::RwLock;
6262
use tokio::time::timeout;
6363

@@ -1246,7 +1246,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
12461246
let alloy = self.alloy.clone();
12471247
retry("eth_getBlockByNumber(latest) no txs RPC call", logger)
12481248
.redact_log_urls(true)
1249-
.no_limit()
1249+
.limit(ENV_VARS.request_retries)
12501250
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
12511251
.run(move || {
12521252
let alloy = alloy.cheap_clone();
@@ -1270,6 +1270,14 @@ impl EthereumAdapterTrait for EthereumAdapter {
12701270
.await
12711271
}
12721272

1273+
async fn is_reachable(&self) -> bool {
1274+
let alloy = self.alloy.clone();
1275+
tokio::time::timeout(Duration::from_secs(10), alloy.get_block_number())
1276+
.await
1277+
.map(|r| r.is_ok())
1278+
.unwrap_or(false)
1279+
}
1280+
12731281
async fn block_by_hash(
12741282
&self,
12751283
logger: &Logger,

0 commit comments

Comments
 (0)