Fix 3 pre-existing lock contention issues in message handlers

- handle_pull_request: extracted to static method, conn_mgr lock no longer
  held during network I/O (read request + write response)
- handle_address_request: inlined at call site with brief lock to gather
  data, response written after lock release
- handle_session_relay: capacity check + target lookup under brief lock,
  rejection write moved outside lock scope

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-04-06 00:35:38 -04:00
parent be253e8001
commit 4379b6fdfc

View file

@ -2173,8 +2173,10 @@ impl ConnectionManager {
}
/// Handle an incoming pull request from a peer.
pub async fn handle_pull_request(
&self,
/// Handle a pull sync request — no conn_mgr lock needed, only storage + our_node_id.
pub async fn handle_pull_request_unlocked(
storage: &StoragePool,
our_node_id: NodeId,
remote_node_id: NodeId,
mut recv: iroh::endpoint::RecvStream,
mut send: iroh::endpoint::SendStream,
@ -2190,12 +2192,11 @@ impl ConnectionManager {
// Phase 1: Brief lock — load data
let (all_posts, group_members) = {
let storage = self.storage.get().await;
let posts = storage.list_posts_with_visibility()?;
let members = storage.get_all_group_members().unwrap_or_default();
let s = storage.get().await;
let posts = s.list_posts_with_visibility()?;
let members = s.get_all_group_members().unwrap_or_default();
(posts, members)
};
// Lock RELEASED
// Phase 2: Filter without lock (pure CPU)
let mut candidates_to_send = Vec::new();
@ -2209,27 +2210,23 @@ impl ConnectionManager {
continue;
}
// Determine if peer already has this post
let peer_has_post = if use_since_ms {
// v4 path: filter by per-author timestamp (60s fudge for clock skew)
if let Some(&since) = since_ms_map.get(&post.author) {
post.timestamp_ms <= since + 60_000
} else {
false // no since_ms for this author — they want everything
false
}
} else {
// Legacy path: use have_post_ids
their_post_ids.contains(&id)
};
if !peer_has_post {
candidates_to_send.push((id, post, visibility));
} else {
// They already have the post — send visibility update if we authored it
if post.author == self.our_node_id {
if post.author == our_node_id {
vis_updates_to_send.push(crate::types::VisibilityUpdate {
post_id: id,
author: self.our_node_id,
author: our_node_id,
visibility,
});
}
@ -2238,9 +2235,9 @@ impl ConnectionManager {
// Phase 3: Brief re-lock for is_deleted checks on filtered posts
let (posts, vis_updates) = {
let storage = self.storage.get().await;
let s = storage.get().await;
let posts_to_send: Vec<SyncPost> = candidates_to_send.into_iter()
.filter(|(id, _, _)| !storage.is_deleted(id).unwrap_or(false))
.filter(|(id, _, _)| !s.is_deleted(id).unwrap_or(false))
.map(|(id, post, visibility)| SyncPost { id, post, visibility })
.collect();
(posts_to_send, vis_updates_to_send)
@ -2259,79 +2256,6 @@ impl ConnectionManager {
/// Handle an address resolution request: check connections, social routes, then peer records.
/// If target is disconnected, register the requester as a watcher.
pub async fn handle_address_request(
&self,
mut recv: iroh::endpoint::RecvStream,
mut send: iroh::endpoint::SendStream,
requester: NodeId,
) -> anyhow::Result<()> {
let req: crate::protocol::AddressRequestPayload =
read_payload(&mut recv, MAX_PAYLOAD).await?;
// Check if target is directly connected to us
if let Some(_pc) = self.connections.get(&req.target) {
let storage = self.storage.get().await;
let addr = storage.get_peer_record(&req.target)?
.and_then(|r| r.addresses.first().map(|a| a.to_string()));
let response = crate::protocol::AddressResponsePayload {
target: req.target,
address: addr,
disconnected_at: None,
peer_addresses: vec![],
};
write_typed_message(&mut send, MessageType::AddressResponse, &response).await?;
send.finish()?;
return Ok(());
}
let storage = self.storage.get().await;
// Check social routes (richer info)
if let Some(route) = storage.get_social_route(&req.target)? {
match route.status {
SocialStatus::Online => {
let addr = route.addresses.first().map(|a| a.to_string());
let response = crate::protocol::AddressResponsePayload {
target: req.target,
address: addr,
disconnected_at: None,
peer_addresses: route.peer_addresses.clone(),
};
write_typed_message(&mut send, MessageType::AddressResponse, &response).await?;
send.finish()?;
return Ok(());
}
SocialStatus::Disconnected => {
let _ = storage.add_reconnect_watcher(&req.target, &requester);
let response = crate::protocol::AddressResponsePayload {
target: req.target,
address: None,
disconnected_at: Some(route.last_seen_ms),
peer_addresses: route.peer_addresses.clone(),
};
write_typed_message(&mut send, MessageType::AddressResponse, &response).await?;
send.finish()?;
return Ok(());
}
}
}
// Fall back to peer record
let address = storage.get_peer_record(&req.target)?
.and_then(|r| r.addresses.first().map(|a| a.to_string()));
let response = crate::protocol::AddressResponsePayload {
target: req.target,
address,
disconnected_at: None,
peer_addresses: vec![],
};
write_typed_message(&mut send, MessageType::AddressResponse, &response).await?;
send.finish()?;
Ok(())
}
/// Resolve a peer's address using connections, social routes, and N2/N3 referral chain.
pub async fn resolve_address(&self, target: &NodeId) -> anyhow::Result<Option<String>> {
// Check if target is directly connected
@ -4289,10 +4213,15 @@ impl ConnectionManager {
let payload: SessionRelayPayload = read_payload(&mut requester_recv, 4096).await?;
// Check capacity and find target connection
let (target_conn, active_pipes) = {
let (can_accept, target_conn, active_pipes) = {
let cm = conn_mgr.lock().await;
if !cm.can_accept_relay_pipe() {
// Reject — at capacity
let can = cm.can_accept_relay_pipe();
let tc = cm.connections.get(&payload.target)
.map(|pc| pc.connection.clone());
(can, tc, Arc::clone(cm.active_relay_pipes()))
};
// Lock RELEASED — reject outside lock if at capacity
if !can_accept {
let result = RelayIntroduceResultPayload {
intro_id: payload.intro_id,
accepted: false,
@ -4306,11 +4235,6 @@ impl ConnectionManager {
return Ok(());
}
let target_conn = cm.connections.get(&payload.target)
.map(|pc| pc.connection.clone());
(target_conn, Arc::clone(cm.active_relay_pipes()))
};
let target_conn = match target_conn {
Some(c) => c,
None => {
@ -5542,8 +5466,12 @@ impl ConnectionManager {
) -> anyhow::Result<()> {
match msg_type {
MessageType::PullSyncRequest => {
let (storage, our_node_id) = {
let cm = conn_mgr.lock().await;
cm.handle_pull_request(remote_node_id, recv, send).await?;
(Arc::clone(&cm.storage), *cm.our_node_id())
};
// Lock RELEASED — handler does its own brief storage locks + network I/O
ConnectionManager::handle_pull_request_unlocked(&storage, our_node_id, remote_node_id, recv, send).await?;
}
MessageType::InitialExchange => {
let (storage, our_node_id, anchor_addr, our_nat_type, our_http_capable, our_http_addr) = {
@ -5554,8 +5482,50 @@ impl ConnectionManager {
.await?;
}
MessageType::AddressRequest => {
// Read request OUTSIDE lock (network I/O)
let req: crate::protocol::AddressRequestPayload = read_payload(&mut recv, MAX_PAYLOAD).await?;
// Brief lock: gather address data
let response = {
let cm = conn_mgr.lock().await;
cm.handle_address_request(recv, send, remote_node_id).await?;
let storage = cm.storage.get().await;
if cm.connections.contains_key(&req.target) {
let addr = storage.get_peer_record(&req.target).ok().flatten()
.and_then(|r| r.addresses.first().map(|a| a.to_string()));
crate::protocol::AddressResponsePayload {
target: req.target, address: addr,
disconnected_at: None, peer_addresses: vec![],
}
} else if let Some(route) = storage.get_social_route(&req.target).ok().flatten() {
match route.status {
SocialStatus::Online => {
crate::protocol::AddressResponsePayload {
target: req.target,
address: route.addresses.first().map(|a| a.to_string()),
disconnected_at: None,
peer_addresses: route.peer_addresses.clone(),
}
}
SocialStatus::Disconnected => {
let _ = storage.add_reconnect_watcher(&req.target, &remote_node_id);
crate::protocol::AddressResponsePayload {
target: req.target, address: None,
disconnected_at: Some(route.last_seen_ms),
peer_addresses: route.peer_addresses.clone(),
}
}
}
} else {
let address = storage.get_peer_record(&req.target).ok().flatten()
.and_then(|r| r.addresses.first().map(|a| a.to_string()));
crate::protocol::AddressResponsePayload {
target: req.target, address,
disconnected_at: None, peer_addresses: vec![],
}
}
};
// Lock RELEASED — write response outside lock
write_typed_message(&mut send, MessageType::AddressResponse, &response).await?;
send.finish()?;
}
MessageType::SocialCheckin => {
let payload: SocialCheckinPayload = read_payload(&mut recv, MAX_PAYLOAD).await?;