Skip to content

Commit 23bb6f5

Browse files
committed
Merge branch 'main' into devnet4
2 parents 66053d8 + e98859d commit 23bb6f5

8 files changed

Lines changed: 110 additions & 38 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ libssz-types = "0.2"
6666
# Build-time version info
6767
vergen-git2 = { version = "9", features = ["rustc"] }
6868

69+
rayon = "1.11"
6970
rand = "0.9"
7071
rocksdb = "0.24"
7172
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }

crates/blockchain/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ spawned-concurrency.workspace = true
2323

2424
tokio.workspace = true
2525

26+
rayon.workspace = true
2627
thiserror.workspace = true
2728
tracing.workspace = true
2829

crates/blockchain/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,17 @@ impl BlockChainServer {
198198
signature,
199199
};
200200

201+
// Self-deliver: store our own attestation locally for aggregation.
202+
// Gossipsub does not deliver messages back to the sender, so without
203+
// this the aggregator never sees its own validator's signature in
204+
// gossip_signatures and it is excluded from aggregated proofs.
205+
if self.is_aggregator {
206+
let _ = store::on_gossip_attestation(&mut self.store, signed_attestation.clone())
207+
.inspect_err(|err| {
208+
warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed")
209+
});
210+
}
211+
201212
// Publish to gossip network
202213
if let Some(ref p2p) = self.p2p {
203214
let _ = p2p.publish_attestation(signed_attestation).inspect_err(

crates/blockchain/src/store.rs

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,40 +1271,58 @@ fn verify_signatures(
12711271
let validators = &state.validators;
12721272
let num_validators = validators.len() as u64;
12731273

1274-
// Verify each attestation's signature proof
1274+
// Verify each attestation's signature proof in parallel
12751275
let aggregated_start = std::time::Instant::now();
1276-
for (attestation, aggregated_proof) in attestations.iter().zip(attestation_signatures) {
1277-
if attestation.aggregation_bits != aggregated_proof.participants {
1278-
return Err(StoreError::ParticipantsMismatch);
1279-
}
12801276

1281-
let slot: u32 = attestation.data.slot.try_into().expect("slot exceeds u32");
1282-
let message = attestation.data.hash_tree_root();
1277+
// Prepare verification inputs sequentially (cheap: bit checks + pubkey lookups)
1278+
let verification_inputs: Vec<_> = attestations
1279+
.iter()
1280+
.zip(attestation_signatures)
1281+
.map(|(attestation, aggregated_proof)| {
1282+
if attestation.aggregation_bits != aggregated_proof.participants {
1283+
return Err(StoreError::ParticipantsMismatch);
1284+
}
12831285

1284-
// Collect public keys with bounds check in a single pass
1285-
let public_keys: Vec<_> = validator_indices(&attestation.aggregation_bits)
1286-
.map(|vid| {
1287-
if vid >= num_validators {
1288-
return Err(StoreError::InvalidValidatorIndex);
1289-
}
1290-
validators[vid as usize]
1291-
.get_pubkey()
1292-
.map_err(|_| StoreError::PubkeyDecodingFailed(vid))
1293-
})
1294-
.collect::<Result<_, _>>()?;
1286+
let slot: u32 = attestation.data.slot.try_into().expect("slot exceeds u32");
1287+
let message = attestation.data.hash_tree_root();
1288+
1289+
let public_keys: Vec<_> = validator_indices(&attestation.aggregation_bits)
1290+
.map(|vid| {
1291+
if vid >= num_validators {
1292+
return Err(StoreError::InvalidValidatorIndex);
1293+
}
1294+
validators[vid as usize]
1295+
.get_pubkey()
1296+
.map_err(|_| StoreError::PubkeyDecodingFailed(vid))
1297+
})
1298+
.collect::<Result<_, _>>()?;
12951299

1296-
let verification_result = {
1297-
let _timing = metrics::time_pq_sig_aggregated_signatures_verification();
1298-
verify_aggregated_signature(&aggregated_proof.proof_data, public_keys, &message, slot)
1299-
};
1300-
match verification_result {
1301-
Ok(()) => metrics::inc_pq_sig_aggregated_signatures_valid(),
1302-
Err(e) => {
1303-
metrics::inc_pq_sig_aggregated_signatures_invalid();
1304-
return Err(StoreError::AggregateVerificationFailed(e));
1300+
Ok((&aggregated_proof.proof_data, public_keys, message, slot))
1301+
})
1302+
.collect::<Result<_, StoreError>>()?;
1303+
1304+
// Run expensive signature verification in parallel.
1305+
// into_par_iter() moves each tuple, avoiding a clone of public_keys.
1306+
use rayon::prelude::*;
1307+
verification_inputs.into_par_iter().try_for_each(
1308+
|(proof_data, public_keys, message, slot)| {
1309+
let result = {
1310+
let _timing = metrics::time_pq_sig_aggregated_signatures_verification();
1311+
verify_aggregated_signature(proof_data, public_keys, &message, slot)
1312+
};
1313+
match result {
1314+
Ok(()) => {
1315+
metrics::inc_pq_sig_aggregated_signatures_valid();
1316+
Ok(())
1317+
}
1318+
Err(e) => {
1319+
metrics::inc_pq_sig_aggregated_signatures_invalid();
1320+
Err(StoreError::AggregateVerificationFailed(e))
1321+
}
13051322
}
1306-
}
1307-
}
1323+
},
1324+
)?;
1325+
13081326
let aggregated_elapsed = aggregated_start.elapsed();
13091327

13101328
let proposer_start = std::time::Instant::now();

crates/net/p2p/src/gossipsub/handler.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,18 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte
142142
.cloned()
143143
.unwrap_or_else(|| attestation_subnet_topic(subnet_id));
144144

145-
// Publish to the attestation subnet topic
146-
server.swarm_handle.publish(topic, compressed);
145+
// Publish to the attestation subnet topic.
146+
// Aggregators are subscribed to the subnet, so gossipsub uses mesh (not fanout).
147+
// In small networks where no other node subscribes, this returns
148+
// NoPeersSubscribedToTopic. Use best-effort to suppress the expected warning;
149+
// the aggregator already self-delivered its attestation locally.
150+
if server.is_aggregator {
151+
server
152+
.swarm_handle
153+
.publish_ignore_no_peers(topic, compressed);
154+
} else {
155+
server.swarm_handle.publish(topic, compressed);
156+
}
147157
info!(
148158
%slot,
149159
validator,

crates/net/p2p/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub struct BuiltSwarm {
9292
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
9393
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,
9494
pub(crate) bootnode_addrs: HashMap<PeerId, Multiaddr>,
95+
pub(crate) is_aggregator: bool,
9596
}
9697

9798
/// Build and configure the libp2p swarm, dial bootnodes, subscribe to topics.
@@ -257,6 +258,7 @@ pub fn build_swarm(
257258
block_topic,
258259
aggregation_topic,
259260
bootnode_addrs,
261+
is_aggregator: config.is_aggregator,
260262
})
261263
}
262264

@@ -280,6 +282,7 @@ impl P2P {
280282
attestation_committee_count: built.attestation_committee_count,
281283
block_topic: built.block_topic,
282284
aggregation_topic: built.aggregation_topic,
285+
is_aggregator: built.is_aggregator,
283286
connected_peers: HashSet::new(),
284287
pending_requests: HashMap::new(),
285288
request_id_map: HashMap::new(),
@@ -314,6 +317,7 @@ pub struct P2PServer {
314317
pub(crate) attestation_committee_count: u64,
315318
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
316319
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,
320+
pub(crate) is_aggregator: bool,
317321

318322
pub(crate) connected_peers: HashSet<PeerId>,
319323
pub(crate) pending_requests: HashMap<H256, PendingRequest>,

crates/net/p2p/src/swarm_adapter.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use libp2p::{
22
Multiaddr, PeerId, StreamProtocol,
33
futures::StreamExt,
4+
gossipsub::PublishError,
45
request_response::{self, OutboundRequestId},
56
swarm::SwarmEvent,
67
};
@@ -13,6 +14,8 @@ pub enum SwarmCommand {
1314
Publish {
1415
topic: libp2p::gossipsub::IdentTopic,
1516
data: Vec<u8>,
17+
/// When true, suppress NoPeersSubscribedToTopic errors (other errors still warn).
18+
ignore_no_peers: bool,
1619
},
1720
Dial(Multiaddr),
1821
SendRequest {
@@ -37,7 +40,25 @@ impl SwarmHandle {
3740
pub fn publish(&self, topic: libp2p::gossipsub::IdentTopic, data: Vec<u8>) {
3841
let _ = self
3942
.cmd_tx
40-
.send(SwarmCommand::Publish { topic, data })
43+
.send(SwarmCommand::Publish {
44+
topic,
45+
data,
46+
ignore_no_peers: false,
47+
})
48+
.inspect_err(|_| warn!("Swarm adapter closed, cannot publish"));
49+
}
50+
51+
/// Publish, suppressing NoPeersSubscribedToTopic errors. Used when the sender
52+
/// is also subscribed to the topic (e.g., aggregator publishing its own
53+
/// attestation to a subnet it subscribes to) and no other peer subscribes.
54+
pub fn publish_ignore_no_peers(&self, topic: libp2p::gossipsub::IdentTopic, data: Vec<u8>) {
55+
let _ = self
56+
.cmd_tx
57+
.send(SwarmCommand::Publish {
58+
topic,
59+
data,
60+
ignore_no_peers: true,
61+
})
4162
.inspect_err(|_| warn!("Swarm adapter closed, cannot publish"));
4263
}
4364

@@ -123,12 +144,17 @@ async fn swarm_loop(
123144

124145
fn execute_command(swarm: &mut libp2p::Swarm<Behaviour>, cmd: SwarmCommand) {
125146
match cmd {
126-
SwarmCommand::Publish { topic, data } => {
127-
let _ = swarm
128-
.behaviour_mut()
129-
.gossipsub
130-
.publish(topic, data)
131-
.inspect_err(|err| warn!(%err, "Swarm adapter: publish failed"));
147+
SwarmCommand::Publish {
148+
topic,
149+
data,
150+
ignore_no_peers,
151+
} => {
152+
let result = swarm.behaviour_mut().gossipsub.publish(topic, data);
153+
if let Err(err) = result
154+
&& !(ignore_no_peers && matches!(err, PublishError::NoPeersSubscribedToTopic))
155+
{
156+
warn!(%err, "Swarm adapter: publish failed");
157+
}
132158
}
133159
SwarmCommand::Dial(addr) => {
134160
let _ = swarm

0 commit comments

Comments
 (0)