From 4379b6fdfcfb7bd7f98594c9b2ae0880172af62d Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Mon, 6 Apr 2026 00:35:38 -0400 Subject: [PATCH] Fix 3 pre-existing lock contention issues in message handlers - handle_pull_request: extracted to static method, conn_mgr lock no longer held during network I/O (read request + write response) - handle_address_request: inlined at call site with brief lock to gather data, response written after lock release - handle_session_relay: capacity check + target lookup under brief lock, rejection write moved outside lock scope Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/connection.rs | 190 ++++++++++++++-------------------- 1 file changed, 80 insertions(+), 110 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 2b74e16..8e25fc0 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -2173,8 +2173,10 @@ impl ConnectionManager { } /// Handle an incoming pull request from a peer. - pub async fn handle_pull_request( - &self, + /// Handle a pull sync request — no conn_mgr lock needed, only storage + our_node_id. + pub async fn handle_pull_request_unlocked( + storage: &StoragePool, + our_node_id: NodeId, remote_node_id: NodeId, mut recv: iroh::endpoint::RecvStream, mut send: iroh::endpoint::SendStream, @@ -2190,12 +2192,11 @@ impl ConnectionManager { // Phase 1: Brief lock — load data let (all_posts, group_members) = { - let storage = self.storage.get().await; - let posts = storage.list_posts_with_visibility()?; - let members = storage.get_all_group_members().unwrap_or_default(); + let s = storage.get().await; + let posts = s.list_posts_with_visibility()?; + let members = s.get_all_group_members().unwrap_or_default(); (posts, members) }; - // Lock RELEASED // Phase 2: Filter without lock (pure CPU) let mut candidates_to_send = Vec::new(); @@ -2209,27 +2210,23 @@ impl ConnectionManager { continue; } - // Determine if peer already has this post let peer_has_post = if use_since_ms { - // v4 path: filter by per-author timestamp (60s fudge for clock skew) if let Some(&since) = since_ms_map.get(&post.author) { post.timestamp_ms <= since + 60_000 } else { - false // no since_ms for this author — they want everything + false } } else { - // Legacy path: use have_post_ids their_post_ids.contains(&id) }; if !peer_has_post { candidates_to_send.push((id, post, visibility)); } else { - // They already have the post — send visibility update if we authored it - if post.author == self.our_node_id { + if post.author == our_node_id { vis_updates_to_send.push(crate::types::VisibilityUpdate { post_id: id, - author: self.our_node_id, + author: our_node_id, visibility, }); } @@ -2238,9 +2235,9 @@ impl ConnectionManager { // Phase 3: Brief re-lock for is_deleted checks on filtered posts let (posts, vis_updates) = { - let storage = self.storage.get().await; + let s = storage.get().await; let posts_to_send: Vec = candidates_to_send.into_iter() - .filter(|(id, _, _)| !storage.is_deleted(id).unwrap_or(false)) + .filter(|(id, _, _)| !s.is_deleted(id).unwrap_or(false)) .map(|(id, post, visibility)| SyncPost { id, post, visibility }) .collect(); (posts_to_send, vis_updates_to_send) @@ -2259,79 +2256,6 @@ impl ConnectionManager { /// Handle an address resolution request: check connections, social routes, then peer records. /// If target is disconnected, register the requester as a watcher. - pub async fn handle_address_request( - &self, - mut recv: iroh::endpoint::RecvStream, - mut send: iroh::endpoint::SendStream, - requester: NodeId, - ) -> anyhow::Result<()> { - let req: crate::protocol::AddressRequestPayload = - read_payload(&mut recv, MAX_PAYLOAD).await?; - - // Check if target is directly connected to us - if let Some(_pc) = self.connections.get(&req.target) { - let storage = self.storage.get().await; - let addr = storage.get_peer_record(&req.target)? - .and_then(|r| r.addresses.first().map(|a| a.to_string())); - let response = crate::protocol::AddressResponsePayload { - target: req.target, - address: addr, - disconnected_at: None, - peer_addresses: vec![], - }; - write_typed_message(&mut send, MessageType::AddressResponse, &response).await?; - send.finish()?; - return Ok(()); - } - - let storage = self.storage.get().await; - - // Check social routes (richer info) - if let Some(route) = storage.get_social_route(&req.target)? { - match route.status { - SocialStatus::Online => { - let addr = route.addresses.first().map(|a| a.to_string()); - let response = crate::protocol::AddressResponsePayload { - target: req.target, - address: addr, - disconnected_at: None, - peer_addresses: route.peer_addresses.clone(), - }; - write_typed_message(&mut send, MessageType::AddressResponse, &response).await?; - send.finish()?; - return Ok(()); - } - SocialStatus::Disconnected => { - let _ = storage.add_reconnect_watcher(&req.target, &requester); - let response = crate::protocol::AddressResponsePayload { - target: req.target, - address: None, - disconnected_at: Some(route.last_seen_ms), - peer_addresses: route.peer_addresses.clone(), - }; - write_typed_message(&mut send, MessageType::AddressResponse, &response).await?; - send.finish()?; - return Ok(()); - } - } - } - - // Fall back to peer record - let address = storage.get_peer_record(&req.target)? - .and_then(|r| r.addresses.first().map(|a| a.to_string())); - - let response = crate::protocol::AddressResponsePayload { - target: req.target, - address, - disconnected_at: None, - peer_addresses: vec![], - }; - write_typed_message(&mut send, MessageType::AddressResponse, &response).await?; - send.finish()?; - - Ok(()) - } - /// Resolve a peer's address using connections, social routes, and N2/N3 referral chain. pub async fn resolve_address(&self, target: &NodeId) -> anyhow::Result> { // Check if target is directly connected @@ -4289,27 +4213,27 @@ impl ConnectionManager { let payload: SessionRelayPayload = read_payload(&mut requester_recv, 4096).await?; // Check capacity and find target connection - let (target_conn, active_pipes) = { + let (can_accept, target_conn, active_pipes) = { let cm = conn_mgr.lock().await; - if !cm.can_accept_relay_pipe() { - // Reject — at capacity - let result = RelayIntroduceResultPayload { - intro_id: payload.intro_id, - accepted: false, - target_addresses: vec![], - relay_available: false, - reject_reason: Some("relay at capacity".to_string()), - nat_mapping: None, nat_filtering: None, - }; - write_typed_message(&mut requester_send, MessageType::RelayIntroduceResult, &result).await?; - requester_send.finish()?; - return Ok(()); - } - - let target_conn = cm.connections.get(&payload.target) + let can = cm.can_accept_relay_pipe(); + let tc = cm.connections.get(&payload.target) .map(|pc| pc.connection.clone()); - (target_conn, Arc::clone(cm.active_relay_pipes())) + (can, tc, Arc::clone(cm.active_relay_pipes())) }; + // Lock RELEASED — reject outside lock if at capacity + if !can_accept { + let result = RelayIntroduceResultPayload { + intro_id: payload.intro_id, + accepted: false, + target_addresses: vec![], + relay_available: false, + reject_reason: Some("relay at capacity".to_string()), + nat_mapping: None, nat_filtering: None, + }; + write_typed_message(&mut requester_send, MessageType::RelayIntroduceResult, &result).await?; + requester_send.finish()?; + return Ok(()); + } let target_conn = match target_conn { Some(c) => c, @@ -5542,8 +5466,12 @@ impl ConnectionManager { ) -> anyhow::Result<()> { match msg_type { MessageType::PullSyncRequest => { - let cm = conn_mgr.lock().await; - cm.handle_pull_request(remote_node_id, recv, send).await?; + let (storage, our_node_id) = { + let cm = conn_mgr.lock().await; + (Arc::clone(&cm.storage), *cm.our_node_id()) + }; + // Lock RELEASED — handler does its own brief storage locks + network I/O + ConnectionManager::handle_pull_request_unlocked(&storage, our_node_id, remote_node_id, recv, send).await?; } MessageType::InitialExchange => { let (storage, our_node_id, anchor_addr, our_nat_type, our_http_capable, our_http_addr) = { @@ -5554,8 +5482,50 @@ impl ConnectionManager { .await?; } MessageType::AddressRequest => { - let cm = conn_mgr.lock().await; - cm.handle_address_request(recv, send, remote_node_id).await?; + // Read request OUTSIDE lock (network I/O) + let req: crate::protocol::AddressRequestPayload = read_payload(&mut recv, MAX_PAYLOAD).await?; + // Brief lock: gather address data + let response = { + let cm = conn_mgr.lock().await; + let storage = cm.storage.get().await; + if cm.connections.contains_key(&req.target) { + let addr = storage.get_peer_record(&req.target).ok().flatten() + .and_then(|r| r.addresses.first().map(|a| a.to_string())); + crate::protocol::AddressResponsePayload { + target: req.target, address: addr, + disconnected_at: None, peer_addresses: vec![], + } + } else if let Some(route) = storage.get_social_route(&req.target).ok().flatten() { + match route.status { + SocialStatus::Online => { + crate::protocol::AddressResponsePayload { + target: req.target, + address: route.addresses.first().map(|a| a.to_string()), + disconnected_at: None, + peer_addresses: route.peer_addresses.clone(), + } + } + SocialStatus::Disconnected => { + let _ = storage.add_reconnect_watcher(&req.target, &remote_node_id); + crate::protocol::AddressResponsePayload { + target: req.target, address: None, + disconnected_at: Some(route.last_seen_ms), + peer_addresses: route.peer_addresses.clone(), + } + } + } + } else { + let address = storage.get_peer_record(&req.target).ok().flatten() + .and_then(|r| r.addresses.first().map(|a| a.to_string())); + crate::protocol::AddressResponsePayload { + target: req.target, address, + disconnected_at: None, peer_addresses: vec![], + } + } + }; + // Lock RELEASED — write response outside lock + write_typed_message(&mut send, MessageType::AddressResponse, &response).await?; + send.finish()?; } MessageType::SocialCheckin => { let payload: SocialCheckinPayload = read_payload(&mut recv, MAX_PAYLOAD).await?;