From 8c40e0da484b62be40be95cb5ee1f3f74641345e Mon Sep 17 00:00:00 2001
From: Scott Reimers
Date: Wed, 22 Apr 2026 23:48:49 -0400
Subject: [PATCH] Fix: dedup concurrent outgoing connects to the same peer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Multiple code paths could each fire an outgoing connect to the same
peer simultaneously with no coordination: the existing pattern of
checking connections.contains_key() under the lock, dropping the lock,
then connecting leaves a window in which another path passes its own
check and spawns a parallel attempt. Auto-reconnect, rebalance-slots,
and the relay-introduction target-side handler were the three racing
paths identified (ranks 1–3 in the pre-release audit). The race is
observable as multiple near-simultaneous "Auto-connected to peer
[hex]" / "Target-side hole punch succeeded to [hex]" log lines for the
same peer.

Fix: add `pending_connects: Arc<std::sync::Mutex<HashSet<NodeId>>>` to
ConnectionManager plus a `PendingConnectGuard` RAII type. Each
outgoing-connect path now acquires a guard via `try_begin_connect(peer)`
under the CM lock on entry; the guard inserts the NodeId into the set
and its Drop impl removes it. Concurrent callers for the same peer find
the NodeId already in `pending_connects` (or the peer already in the
`connections` / `sessions` maps), get None back, and skip their attempt.

Scope:
- Only gates outgoing duplicates to the SAME peer. Different peers
  connect independently. Inbound connections from the guarded peer are
  not affected — the simultaneous-open race is still resolved by the
  existing check-before-insert on registration.
- The std::sync::Mutex is held for a single O(1) hash op on acquire /
  drop — never across an await — so the guard lifetime spans the full
  connect attempt without blocking anything else.

Sites wired:
- Auto-reconnect after unexpected disconnect (connection.rs ~4492)
- Rebalance-slots outgoing loop (connection.rs ~8049)
- Relay-introduction target side, both handlers (connection.rs ~3945,
  ~5783)

Tests: `pending_connect_guard_gates_same_peer_and_releases_on_drop`
asserts that a second same-peer acquire is refused, that different
peers acquire independently, and that dropping the guard releases the
slot. 121/121 core tests pass.
---
 crates/core/src/connection.rs | 143 ++++++++++++++++++++++++++++++++--
 1 file changed, 136 insertions(+), 7 deletions(-)

diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs
index 75b8ddb..1d4917b 100644
--- a/crates/core/src/connection.rs
+++ b/crates/core/src/connection.rs
@@ -666,6 +666,31 @@ pub struct ConnectionManager {
     /// Sticky N1 entries: NodeIds to report in N1 share until expiry (ms).
     /// Used to advertise the bootstrap anchor for 24h after isolation recovery.
     sticky_n1: HashMap<NodeId, u64>,
+    /// NodeIds with an outgoing connect attempt currently in flight.
+    /// Used by `try_begin_connect` to suppress duplicate concurrent outgoing
+    /// connects from racing paths (auto-reconnect, rebalance, relay
+    /// introduction target-side) against the same peer. Held in a sync
+    /// Mutex because every operation is a single O(1) hash insert/remove —
+    /// never held across an await.
+    pending_connects: Arc<std::sync::Mutex<HashSet<NodeId>>>,
+}
+
+/// RAII guard for a pending-outgoing-connect entry. Returned by
+/// `ConnectionManager::try_begin_connect`. The NodeId is inserted into
+/// `pending_connects` at construction and removed on drop, so a second
+/// call to `try_begin_connect` for the same peer returns `None` for as
+/// long as this guard is alive.
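+///
+/// A minimal usage sketch of the intended call pattern (`dial` is a
+/// hypothetical stand-in for whatever connect routine the call site
+/// actually runs):
+///
+/// ```ignore
+/// let _guard = match cm.try_begin_connect(peer) {
+///     Some(g) => g,   // slot reserved; keep the guard alive in scope
+///     None => return, // duplicate: another path is already connecting
+/// };
+/// dial(peer).await;   // the attempt runs while the guard is alive
+/// // guard drops at end of scope, freeing the peer's slot
+/// ```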
+pub struct PendingConnectGuard {
+    peer_id: NodeId,
+    set: Arc<std::sync::Mutex<HashSet<NodeId>>>,
+}
+
+impl Drop for PendingConnectGuard {
+    fn drop(&mut self) {
+        if let Ok(mut s) = self.set.lock() {
+            s.remove(&self.peer_id);
+        }
+    }
 }
 
 impl ConnectionManager {
@@ -728,9 +753,36 @@ impl ConnectionManager {
             http_capable: false,
             http_addr: None,
             sticky_n1: HashMap::new(),
+            pending_connects: Arc::new(std::sync::Mutex::new(HashSet::new())),
         }
     }
 
+    /// Reserve an outgoing-connect slot for `peer`. Returns `Some(guard)`
+    /// if no other outgoing connect to this peer is already in flight and
+    /// we aren't already connected. The guard is held by the caller for
+    /// the duration of the connect attempt — subsequent calls for the
+    /// same peer return `None` until the guard drops.
+    ///
+    /// Only gates outgoing duplicates; has no effect on incoming
+    /// connections from the peer, which are accepted normally.
+    pub fn try_begin_connect(&self, peer: NodeId) -> Option<PendingConnectGuard> {
+        if self.connections.contains_key(&peer) || self.sessions.contains_key(&peer) {
+            return None;
+        }
+        let mut set = match self.pending_connects.lock() {
+            Ok(g) => g,
+            Err(_) => return None, // Poisoned — fail closed rather than risk a racing connect.
+        };
+        if set.contains(&peer) {
+            return None;
+        }
+        set.insert(peer);
+        Some(PendingConnectGuard {
+            peer_id: peer,
+            set: self.pending_connects.clone(),
+        })
+    }
+
     /// Our detected NAT type
     pub fn nat_type(&self) -> crate::types::NatType {
         self.nat_type
@@ -3891,6 +3943,16 @@ impl ConnectionManager {
                 s.get_peer_nat_profile(&requester)
             };
             tokio::spawn(async move {
+                // Reserve the outgoing-connect slot for this requester so
+                // rebalance / auto-reconnect can't fire a parallel connect
+                // to the same peer while our hole-punch is in flight.
+                let _connect_guard = {
+                    let cm = conn_mgr_arc.lock().await;
+                    match cm.try_begin_connect(requester) {
+                        Some(g) => g,
+                        None => return, // Already connected or connect in flight.
+                    }
+                };
                 if let Some(conn) = hole_punch_with_scanning(&endpoint, &requester, &requester_addrs, our_nat_profile, peer_nat_profile).await {
                     // Register as session with the peer's address for relay introduction
                     let remote_sock = requester_addrs.iter()
@@ -4440,13 +4502,16 @@ impl ConnectionManager {
             tokio::spawn(async move {
                 // Brief delay to let the disconnect settle and avoid reconnect storms
                 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
-                // Check if already reconnected (by the other side or growth loop)
-                {
+                // Reserve the outgoing-connect slot for this peer. If
+                // another path (rebalance, relay-introduction) is
+                // already connecting to them, skip.
+                let _connect_guard = {
                     let cm = cm_arc.lock().await;
-                    if cm.connections.contains_key(&remote_node_id) || cm.sessions.contains_key(&remote_node_id) {
-                        return; // Already reconnected
+                    match cm.try_begin_connect(remote_node_id) {
+                        Some(g) => g,
+                        None => return, // Already connected or connect in flight.
                     }
-                }
+                };
                 if let Ok(eid) = iroh::EndpointId::from_bytes(&remote_node_id) {
                     let ep_addr = iroh::EndpointAddr::from(eid).with_ip_addr(addr);
                     let endpoint = {
@@ -5726,6 +5791,17 @@ impl ConnectionManager {
                         .cloned().collect();
                     let requester = payload.requester;
                     tokio::spawn(async move {
+                        // Reserve the outgoing-connect slot for this
+                        // requester so we don't race with rebalance /
+                        // auto-reconnect paths firing their own
+                        // outgoing connect to the same peer.
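+                        // The guard must be bound to a named variable
+                        // (`_connect_guard`, not a bare `_`): `let _ = ...`
+                        // would drop it immediately and release the slot
+                        // before the connect attempt even starts.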
+                        let _connect_guard = {
+                            let cm = cm_arc.lock().await;
+                            match cm.try_begin_connect(requester) {
+                                Some(g) => g,
+                                None => return, // Already connected or connect in flight.
+                            }
+                        };
                         if let Some(conn) = hole_punch_with_scanning(&endpoint, &requester, &routable_addrs, our_nat_profile, peer_nat_profile).await {
                             let remote_sock = routable_addrs.iter().filter_map(|a| a.parse::<std::net::SocketAddr>().ok()).find(|s| crate::network::is_shareable_addr(s));
                             let mut cm = cm_arc.lock().await;
@@ -7995,8 +8071,21 @@ impl ConnectionActor {
             let mut cm = self.cm.lock().await;
             cm.rebalance_slots().await.unwrap_or_default()
         };
-        // Connect outside the lock — no 15s hold
+        // Connect outside the lock — no 15s hold. Reserve an
+        // outgoing-connect slot per peer so we don't race with
+        // auto-reconnect / relay-introduction paths for the same
+        // target; skip peers already mid-connect.
         for (peer_id, addr, _addr_s, slot_kind) in pending_connects {
+            let _connect_guard = {
+                let cm = self.cm.lock().await;
+                match cm.try_begin_connect(peer_id) {
+                    Some(g) => g,
+                    None => {
+                        debug!(peer = %hex::encode(peer_id), "rebalance: skipping — connect already in flight");
+                        continue;
+                    }
+                }
+            };
             let addrs: Vec<_> = addr.ip_addrs().copied().collect();
             if !addrs.is_empty() {
                 let s = storage.get().await;
@@ -8411,7 +8500,10 @@ fn now_ms() -> u64 {
 
 #[cfg(test)]
 mod tests {
-    use super::scanner_semaphore;
+    use super::{scanner_semaphore, PendingConnectGuard};
+    use crate::types::NodeId;
+    use std::collections::HashSet;
+    use std::sync::{Arc, Mutex as StdMutex};
 
     #[test]
     fn scanner_semaphore_caps_concurrent_scans_at_one() {
@@ -8425,4 +8517,41 @@ mod tests {
         let p2 = sem.try_acquire().expect("after release, next scan should acquire");
         drop(p2);
     }
+
+    /// Construct a guard directly against a test-owned set, bypassing
+    /// ConnectionManager. This verifies the guard's state machine
+    /// (insert on acquire, remove on drop) without needing a full CM.
+    fn try_begin(set: &Arc<StdMutex<HashSet<NodeId>>>, peer: NodeId) -> Option<PendingConnectGuard> {
+        let mut s = set.lock().ok()?;
+        if s.contains(&peer) { return None; }
+        s.insert(peer);
+        Some(PendingConnectGuard { peer_id: peer, set: Arc::clone(set) })
+    }
+
+    #[test]
+    fn pending_connect_guard_gates_same_peer_and_releases_on_drop() {
+        let set: Arc<StdMutex<HashSet<NodeId>>> = Arc::new(StdMutex::new(HashSet::new()));
+        let peer_a: NodeId = [1u8; 32];
+        let peer_b: NodeId = [2u8; 32];
+
+        // First acquire for A succeeds.
+        let g_a = try_begin(&set, peer_a).expect("first guard should acquire for peer A");
+        // Second concurrent acquire for A is rejected.
+        assert!(try_begin(&set, peer_a).is_none(), "second concurrent guard for A must be refused");
+        // A different peer is unaffected.
+        let g_b = try_begin(&set, peer_b).expect("guard for peer B should acquire independently");
+
+        // Dropping A's guard releases the slot.
+        drop(g_a);
+        assert!(!set.lock().unwrap().contains(&peer_a), "peer A should be removed from pending_connects on drop");
+        // A new acquire for A now succeeds.
+        let g_a2 = try_begin(&set, peer_a).expect("after release, new guard for A should acquire");
+
+        // B's guard still active — independent.
+        assert!(set.lock().unwrap().contains(&peer_b));
+
+        drop(g_a2);
+        drop(g_b);
+        assert!(set.lock().unwrap().is_empty(), "all guards dropped — set should be empty");
+    }
 }