Skip to content

Commit 06c4306

Browse files
authored
fix: self-deliver aggregator attestations to gossip_signatures (#270)
## Summary - Aggregator now calls `on_gossip_attestation()` locally before publishing to gossip, ensuring its own validator's attestation enters `gossip_signatures` and is included in aggregated proofs - Adds `publish_best_effort()` to suppress the expected `NoPeersSubscribedToTopic` warning when the aggregator publishes to attestation subnets it subscribes to - Threads `is_aggregator` from `SwarmConfig` through `BuiltSwarm` into `P2PServer` so the gossipsub handler can use best-effort publishing ## Context Gossipsub does not deliver messages back to the sender. The aggregator subscribes to the attestation subnet and publishes via mesh, but in small devnets no other node subscribes to attestation subnets. This causes the aggregator's own attestation publish to fail silently with `NoPeersSubscribedToTopic`, meaning its vote never enters `gossip_signatures`, is never aggregated, and is never included in blocks. The reference spec handles this explicitly in `_produce_attestations()`: ```python # Process attestation locally before publishing. # # Gossipsub does not deliver messages back to the sender. # Without local processing, the aggregator node never sees its own # validator's attestation in gossip_signatures, reducing the # aggregation count below the 2/3 safe-target threshold. self.sync_service.store = self.sync_service.store.on_gossip_attestation( signed_attestation=signed_attestation, is_aggregator=is_aggregator_role, ) ``` ## Test plan - [x] `cargo build` passes - [x] `cargo clippy --workspace -- -D warnings` passes - [x] `cargo test --workspace --release --lib` passes (all 32 unit tests) - [ ] Deploy to devnet and verify: - Validator 3's attestation appears in "Attestation processed" logs on the aggregator - Blocks from all proposers include aggregator's vote in aggregated proofs (higher attestation counts) - No more `NoPeersSubscribedToTopic` warnings on the aggregator node - Finalization continues normally
1 parent fd2cc63 commit 06c4306

4 files changed

Lines changed: 60 additions & 9 deletions

File tree

crates/blockchain/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,17 @@ impl BlockChainServer {
198198
signature,
199199
};
200200

201+
// Self-deliver: store our own attestation locally for aggregation.
202+
// Gossipsub does not deliver messages back to the sender, so without
203+
// this the aggregator never sees its own validator's signature in
204+
// gossip_signatures and it is excluded from aggregated proofs.
205+
if self.is_aggregator {
206+
let _ = store::on_gossip_attestation(&mut self.store, signed_attestation.clone())
207+
.inspect_err(|err| {
208+
warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed")
209+
});
210+
}
211+
201212
// Publish to gossip network
202213
if let Some(ref p2p) = self.p2p {
203214
let _ = p2p.publish_attestation(signed_attestation).inspect_err(

crates/net/p2p/src/gossipsub/handler.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,18 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte
142142
.cloned()
143143
.unwrap_or_else(|| attestation_subnet_topic(subnet_id));
144144

145-
// Publish to the attestation subnet topic
146-
server.swarm_handle.publish(topic, compressed);
145+
// Publish to the attestation subnet topic.
146+
// Aggregators are subscribed to the subnet, so gossipsub uses mesh (not fanout).
147+
// In small networks where no other node subscribes, this returns
148+
// NoPeersSubscribedToTopic. Use best-effort to suppress the expected warning;
149+
// the aggregator already self-delivered its attestation locally.
150+
if server.is_aggregator {
151+
server
152+
.swarm_handle
153+
.publish_ignore_no_peers(topic, compressed);
154+
} else {
155+
server.swarm_handle.publish(topic, compressed);
156+
}
147157
info!(
148158
%slot,
149159
validator,

crates/net/p2p/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub struct BuiltSwarm {
9292
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
9393
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,
9494
pub(crate) bootnode_addrs: HashMap<PeerId, Multiaddr>,
95+
pub(crate) is_aggregator: bool,
9596
}
9697

9798
/// Build and configure the libp2p swarm, dial bootnodes, subscribe to topics.
@@ -257,6 +258,7 @@ pub fn build_swarm(
257258
block_topic,
258259
aggregation_topic,
259260
bootnode_addrs,
261+
is_aggregator: config.is_aggregator,
260262
})
261263
}
262264

@@ -280,6 +282,7 @@ impl P2P {
280282
attestation_committee_count: built.attestation_committee_count,
281283
block_topic: built.block_topic,
282284
aggregation_topic: built.aggregation_topic,
285+
is_aggregator: built.is_aggregator,
283286
connected_peers: HashSet::new(),
284287
pending_requests: HashMap::new(),
285288
request_id_map: HashMap::new(),
@@ -314,6 +317,7 @@ pub struct P2PServer {
314317
pub(crate) attestation_committee_count: u64,
315318
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
316319
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,
320+
pub(crate) is_aggregator: bool,
317321

318322
pub(crate) connected_peers: HashSet<PeerId>,
319323
pub(crate) pending_requests: HashMap<H256, PendingRequest>,

crates/net/p2p/src/swarm_adapter.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use libp2p::{
22
Multiaddr, PeerId, StreamProtocol,
33
futures::StreamExt,
4+
gossipsub::PublishError,
45
request_response::{self, OutboundRequestId},
56
swarm::SwarmEvent,
67
};
@@ -13,6 +14,8 @@ pub enum SwarmCommand {
1314
Publish {
1415
topic: libp2p::gossipsub::IdentTopic,
1516
data: Vec<u8>,
17+
/// When true, suppress NoPeersSubscribedToTopic errors (other errors still warn).
18+
ignore_no_peers: bool,
1619
},
1720
Dial(Multiaddr),
1821
SendRequest {
@@ -37,7 +40,25 @@ impl SwarmHandle {
3740
pub fn publish(&self, topic: libp2p::gossipsub::IdentTopic, data: Vec<u8>) {
3841
let _ = self
3942
.cmd_tx
40-
.send(SwarmCommand::Publish { topic, data })
43+
.send(SwarmCommand::Publish {
44+
topic,
45+
data,
46+
ignore_no_peers: false,
47+
})
48+
.inspect_err(|_| warn!("Swarm adapter closed, cannot publish"));
49+
}
50+
51+
/// Publish, suppressing NoPeersSubscribedToTopic errors. Used when the sender
52+
/// is also subscribed to the topic (e.g., aggregator publishing its own
53+
/// attestation to a subnet it subscribes to) and no other peer subscribes.
54+
pub fn publish_ignore_no_peers(&self, topic: libp2p::gossipsub::IdentTopic, data: Vec<u8>) {
55+
let _ = self
56+
.cmd_tx
57+
.send(SwarmCommand::Publish {
58+
topic,
59+
data,
60+
ignore_no_peers: true,
61+
})
4162
.inspect_err(|_| warn!("Swarm adapter closed, cannot publish"));
4263
}
4364

@@ -123,12 +144,17 @@ async fn swarm_loop(
123144

124145
fn execute_command(swarm: &mut libp2p::Swarm<Behaviour>, cmd: SwarmCommand) {
125146
match cmd {
126-
SwarmCommand::Publish { topic, data } => {
127-
let _ = swarm
128-
.behaviour_mut()
129-
.gossipsub
130-
.publish(topic, data)
131-
.inspect_err(|err| warn!(%err, "Swarm adapter: publish failed"));
147+
SwarmCommand::Publish {
148+
topic,
149+
data,
150+
ignore_no_peers,
151+
} => {
152+
let result = swarm.behaviour_mut().gossipsub.publish(topic, data);
153+
if let Err(err) = result
154+
&& !(ignore_no_peers && matches!(err, PublishError::NoPeersSubscribedToTopic))
155+
{
156+
warn!(%err, "Swarm adapter: publish failed");
157+
}
132158
}
133159
SwarmCommand::Dial(addr) => {
134160
let _ = swarm

0 commit comments

Comments
 (0)