Fix: dedup concurrent outgoing connects to the same peer

Multiple code paths could each fire an outgoing connect to the same
peer simultaneously with no coordination: the existing pattern —
check connections.contains_key() under a lock, drop the lock, then
connect — leaves a window where another path passes its own check and
spawns a parallel attempt. Auto-reconnect, rebalance-slots, and the
relay-introduction target-side handler were the three identified races
(rank 1–3 in the pre-release audit). Observable as multiple
near-simultaneous "Auto-connected to peer [hex]" / "Target-side hole
punch succeeded to [hex]" log lines for the same peer.

Fix: add `pending_connects: Arc<std::sync::Mutex<HashSet<NodeId>>>` to
ConnectionManager plus a `PendingConnectGuard` RAII type. Entry to each
outgoing-connect path now acquires a guard via `try_begin_connect(peer)`
under the CM lock; the guard inserts the NodeId into the set and the
Drop impl removes it. Concurrent callers for the same peer see the
NodeId already in `pending_connects` (or already in the `connections` /
`sessions` maps) and return None, so they skip their attempt.

Scope:
- Only gates outgoing duplicates to the SAME peer. Different peers
  connect independently. Inbound connections from the guarded peer are
  not affected — the simultaneous-open race is still resolved by the
  existing check-before-insert on registration.
- The std::sync::Mutex is held for a single O(1) hash op on
  acquire / drop — never across an await — so the guard lifetime spans
  the full connect attempt without blocking anything else.

Sites wired:
- Auto-reconnect after unexpected disconnect (connection.rs ~4492)
- Rebalance-slots outgoing loop (connection.rs ~8049)
- Relay-introduction target-side both handlers (connection.rs ~3945,
  ~5783)

Tests: `pending_connect_guard_gates_same_peer_and_releases_on_drop`
asserts second same-peer acquire is refused, different peers acquire
independently, and drop releases the slot. 121 / 121 core tests pass.
This commit is contained in:
Scott Reimers 2026-04-22 23:48:49 -04:00
parent dfd3253734
commit 8c40e0da48

View file

@ -666,6 +666,31 @@ pub struct ConnectionManager {
/// Sticky N1 entries: NodeIds to report in N1 share until expiry (ms).
/// Used to advertise the bootstrap anchor for 24h after isolation recovery.
sticky_n1: HashMap<NodeId, u64>,
/// NodeIds with an outgoing connect attempt currently in flight.
/// Used by `try_begin_connect` to suppress duplicate concurrent outgoing
/// connects from racing paths (auto-reconnect, rebalance, relay
/// introduction target-side) against the same peer. Held in a sync
/// Mutex because every operation is a single O(1) hash insert/remove —
/// never held across an await.
pending_connects: Arc<std::sync::Mutex<HashSet<NodeId>>>,
}
/// RAII guard for a pending-outgoing-connect entry. Returned by
/// `ConnectionManager::try_begin_connect`. The NodeId is inserted into
/// `pending_connects` at construction and removed on drop, so a second
/// call to `try_begin_connect` for the same peer returns `None` for as
/// long as this guard is alive.
///
/// Marked `#[must_use]`: a caller that obtains the guard but lets it
/// drop immediately releases the slot before the connect attempt even
/// starts, silently defeating the dedup — the compiler should flag that.
#[must_use = "dropping the guard releases the connect slot immediately, defeating the dedup"]
pub struct PendingConnectGuard {
    /// Peer this reservation is for; removed from `set` on drop.
    peer_id: NodeId,
    /// Shared handle to `ConnectionManager::pending_connects`.
    set: Arc<std::sync::Mutex<HashSet<NodeId>>>,
}
impl Drop for PendingConnectGuard {
    /// Release the pending-connect reservation for `peer_id`.
    ///
    /// Removes the entry even when the mutex is poisoned: `try_begin_connect`
    /// fails closed on poison, so leaving the entry behind would permanently
    /// block every future outgoing connect to this peer. A `HashSet::remove`
    /// is safe to perform on the recovered guard regardless of what the
    /// panicking holder was doing.
    fn drop(&mut self) {
        let mut s = match self.set.lock() {
            Ok(g) => g,
            // Recover from poisoning instead of leaking the reservation.
            Err(poisoned) => poisoned.into_inner(),
        };
        s.remove(&self.peer_id);
    }
}
impl ConnectionManager {
@ -728,9 +753,36 @@ impl ConnectionManager {
http_capable: false,
http_addr: None,
sticky_n1: HashMap::new(),
pending_connects: Arc::new(std::sync::Mutex::new(HashSet::new())),
}
}
/// Reserve an outgoing-connect slot for `peer`. Returns `Some(guard)`
/// if no other outgoing connect to this peer is already in flight and
/// we aren't already connected. The guard is held by the caller for
/// the duration of the connect attempt — subsequent calls for the
/// same peer return `None` until the guard drops.
///
/// Only gates outgoing duplicates; has no effect on incoming
/// connections from the peer, which are accepted normally.
pub fn try_begin_connect(&self, peer: NodeId) -> Option<PendingConnectGuard> {
if self.connections.contains_key(&peer) || self.sessions.contains_key(&peer) {
return None;
}
let mut set = match self.pending_connects.lock() {
Ok(g) => g,
Err(_) => return None, // Poisoned — fail closed rather than risk a racing connect.
};
if set.contains(&peer) {
return None;
}
set.insert(peer);
Some(PendingConnectGuard {
peer_id: peer,
set: self.pending_connects.clone(),
})
}
/// Our detected NAT type
pub fn nat_type(&self) -> crate::types::NatType {
self.nat_type
@ -3891,6 +3943,16 @@ impl ConnectionManager {
s.get_peer_nat_profile(&requester)
};
tokio::spawn(async move {
// Reserve the outgoing-connect slot for this requester so
// rebalance / auto-reconnect can't fire a parallel connect
// to the same peer while our hole-punch is in flight.
let _connect_guard = {
let cm = conn_mgr_arc.lock().await;
match cm.try_begin_connect(requester) {
Some(g) => g,
None => return, // Already connected or connect in flight.
}
};
if let Some(conn) = hole_punch_with_scanning(&endpoint, &requester, &requester_addrs, our_nat_profile, peer_nat_profile).await {
// Register as session with the peer's address for relay introduction
let remote_sock = requester_addrs.iter()
@ -4440,13 +4502,16 @@ impl ConnectionManager {
tokio::spawn(async move {
// Brief delay to let the disconnect settle and avoid reconnect storms
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
// Check if already reconnected (by the other side or growth loop)
{
// Reserve the outgoing-connect slot for this peer. If
// another path (rebalance, relay-introduction) is
// already connecting to them, skip.
let _connect_guard = {
let cm = cm_arc.lock().await;
if cm.connections.contains_key(&remote_node_id) || cm.sessions.contains_key(&remote_node_id) {
return; // Already reconnected
}
match cm.try_begin_connect(remote_node_id) {
Some(g) => g,
None => return, // Already connected or connect in flight.
}
};
if let Ok(eid) = iroh::EndpointId::from_bytes(&remote_node_id) {
let ep_addr = iroh::EndpointAddr::from(eid).with_ip_addr(addr);
let endpoint = {
@ -5726,6 +5791,17 @@ impl ConnectionManager {
.cloned().collect();
let requester = payload.requester;
tokio::spawn(async move {
// Reserve the outgoing-connect slot for this
// requester so we don't race with rebalance /
// auto-reconnect paths firing their own
// outgoing connect to the same peer.
let _connect_guard = {
let cm = cm_arc.lock().await;
match cm.try_begin_connect(requester) {
Some(g) => g,
None => return, // Already connected or connect in flight.
}
};
if let Some(conn) = hole_punch_with_scanning(&endpoint, &requester, &routable_addrs, our_nat_profile, peer_nat_profile).await {
let remote_sock = routable_addrs.iter().filter_map(|a| a.parse::<std::net::SocketAddr>().ok()).find(|s| crate::network::is_shareable_addr(s));
let mut cm = cm_arc.lock().await;
@ -7995,8 +8071,21 @@ impl ConnectionActor {
let mut cm = self.cm.lock().await;
cm.rebalance_slots().await.unwrap_or_default()
};
// Connect outside the lock — no 15s hold
// Connect outside the lock — no 15s hold. Reserve an
// outgoing-connect slot per peer so we don't race with
// auto-reconnect / relay-introduction paths for the same
// target; skip peers already mid-connect.
for (peer_id, addr, _addr_s, slot_kind) in pending_connects {
let _connect_guard = {
let cm = self.cm.lock().await;
match cm.try_begin_connect(peer_id) {
Some(g) => g,
None => {
debug!(peer = hex::encode(peer_id), "rebalance: skipping — connect already in flight");
continue;
}
}
};
let addrs: Vec<std::net::SocketAddr> = addr.ip_addrs().copied().collect();
if !addrs.is_empty() {
let s = storage.get().await;
@ -8411,7 +8500,10 @@ fn now_ms() -> u64 {
#[cfg(test)]
mod tests {
use super::scanner_semaphore;
use super::{scanner_semaphore, PendingConnectGuard};
use crate::types::NodeId;
use std::collections::HashSet;
use std::sync::{Arc, Mutex as StdMutex};
#[test]
fn scanner_semaphore_caps_concurrent_scans_at_one() {
@ -8425,4 +8517,41 @@ mod tests {
let p2 = sem.try_acquire().expect("after release, next scan should acquire");
drop(p2);
}
/// Construct a guard directly against a test-owned set, bypassing
/// ConnectionManager. This verifies the guard's state-machine
/// (insert on acquire, remove on drop) without needing a full CM.
fn try_begin(set: &Arc<StdMutex<HashSet<NodeId>>>, peer: NodeId) -> Option<PendingConnectGuard> {
    let mut s = set.lock().ok()?;
    // `insert` returns false when `peer` is already pending — refuse the
    // duplicate in a single hash op instead of contains + insert.
    s.insert(peer).then(|| PendingConnectGuard { peer_id: peer, set: Arc::clone(set) })
}
#[test]
fn pending_connect_guard_gates_same_peer_and_releases_on_drop() {
    // Test-owned stand-in for ConnectionManager::pending_connects.
    let pending: Arc<StdMutex<HashSet<NodeId>>> = Arc::new(StdMutex::new(HashSet::new()));
    let alice: NodeId = [1u8; 32];
    let bob: NodeId = [2u8; 32];

    // Acquiring for alice works once; a concurrent second attempt is gated.
    let guard_alice = try_begin(&pending, alice).expect("first guard should acquire for peer A");
    assert!(try_begin(&pending, alice).is_none(), "second concurrent guard for A must be refused");

    // A distinct peer reserves its own slot independently of alice's.
    let guard_bob = try_begin(&pending, bob).expect("guard for peer B should acquire independently");

    // Dropping alice's guard removes her entry from the set...
    drop(guard_alice);
    assert!(!pending.lock().unwrap().contains(&alice), "peer A should be removed from pending_connects on drop");

    // ...which lets a fresh reservation for alice succeed while bob's
    // still-live guard keeps his entry in place.
    let guard_alice_again = try_begin(&pending, alice).expect("after release, new guard for A should acquire");
    assert!(pending.lock().unwrap().contains(&bob));

    // Releasing every guard empties the set completely.
    drop(guard_alice_again);
    drop(guard_bob);
    assert!(pending.lock().unwrap().is_empty(), "all guards dropped — set should be empty");
}
}