From 89d6a853f5776d2bd4ecc0554b001cdcce407b07 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Sat, 21 Mar 2026 13:02:30 -0400 Subject: [PATCH] Fix storage lock contention: reduce lock holds across 6 hot paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - get_blob_for_post: 3 sequential locks → 1 combined acquisition - prefetch_blobs_from_peer: lock only for DB reads, blob checks outside lock - fetch_engagement_from_peer: explicit lock release before next network I/O - serve_post: 4 locks (2 redundant) → 2 - run_replication_check: 2 locks → 1 combined - Badge cycle: N+2 IPC calls → 1 (new get_badge_counts command) - Follow timeout: 15s cap on auto-sync-on-follow to prevent UI hang - Notification clearing: clear system notifications on conversation read Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/connection.rs | 5 +- crates/core/src/node.rs | 125 +++++++++++++++++----------------- crates/core/src/web.rs | 32 +++++---- crates/tauri-app/src/lib.rs | 67 +++++++++++++++++- frontend/app.js | 29 ++------ 5 files changed, 152 insertions(+), 106 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 7b318cd..74ed049 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1716,7 +1716,7 @@ impl ConnectionManager { .get(peer_id) .ok_or_else(|| anyhow::anyhow!("not connected to {}", hex::encode(peer_id)))?; - // Get post IDs and their current header timestamps + // Brief lock: gather post IDs and their current header timestamps let post_headers: Vec<([u8; 32], u64)> = { let storage = self.storage.lock().await; let post_ids = storage.list_post_ids()?; @@ -1733,6 +1733,7 @@ impl ConnectionManager { }) .collect() }; + // Lock RELEASED — all network I/O happens without the lock let mut updated = 0; // Request headers in batches to avoid opening too many streams @@ -1757,6 +1758,7 @@ impl ConnectionManager { if response.updated { if let Some(json) = 
&response.header_json { if let Ok(header) = serde_json::from_str::(json) { + // Brief re-lock for writes only let storage = self.storage.lock().await; // Store the full header JSON let _ = storage.store_blob_header( @@ -1775,6 +1777,7 @@ impl ConnectionManager { let _ = storage.store_comment(comment); } let _ = storage.set_comment_policy(&header.post_id, &header.policy); + drop(storage); updated += 1; } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 1f15bad..cc26842 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1162,38 +1162,32 @@ impl Node { cid: &[u8; 32], post_id: &PostId, ) -> anyhow::Result>> { - // Get raw blob data (local) + // Get raw blob data (local — no lock needed) let raw_data = match self.blob_store.get(cid)? { Some(d) => d, None => return Ok(None), }; - { + + // Single lock acquisition for all DB reads + let (post, visibility, group_seeds) = { let storage = self.storage.lock().await; let _ = storage.touch_blob_access(cid); - } - - // Get post + visibility - let (post, visibility) = { - let storage = self.storage.lock().await; match storage.get_post_with_visibility(post_id)? { - Some(pv) => pv, + Some((post, vis)) => { + let seeds = if matches!(vis, PostVisibility::GroupEncrypted { .. }) { + storage.get_all_group_seeds_map().unwrap_or_default() + } else { + std::collections::HashMap::new() + }; + (post, vis, seeds) + } None => return Ok(Some(raw_data)), // No post context — return raw } }; - + // Lock released — decrypt without lock match &visibility { PostVisibility::Public => Ok(Some(raw_data)), - PostVisibility::Encrypted { .. } => { - let empty_map = std::collections::HashMap::new(); - self.decrypt_blob_for_post(raw_data, &post, &visibility, &empty_map) - } - PostVisibility::GroupEncrypted { .. 
} => { - let group_seeds = { - let storage = self.storage.lock().await; - storage.get_all_group_seeds_map().unwrap_or_default() - }; - self.decrypt_blob_for_post(raw_data, &post, &visibility, &group_seeds) - } + _ => self.decrypt_blob_for_post(raw_data, &post, &visibility, &group_seeds), } } @@ -1203,28 +1197,34 @@ impl Node { const MAX_PREFETCH_PER_CYCLE: usize = 20; pub async fn prefetch_blobs_from_peer(&self, peer_id: &NodeId) { - // Gather posts with missing blobs, newest first, capped - let missing: Vec<(PostId, NodeId, Vec)> = { + // Brief lock: get post IDs and their attachment info + let posts_with_atts: Vec<(PostId, NodeId, Vec)> = { let storage = self.storage.lock().await; let post_ids = storage.list_post_ids().unwrap_or_default(); let mut result = Vec::new(); - let mut total_missing = 0usize; - // list_post_ids returns newest first typically; cap total missing blobs for pid in post_ids { - if total_missing >= Self::MAX_PREFETCH_PER_CYCLE { break; } + if result.len() >= Self::MAX_PREFETCH_PER_CYCLE { break; } if let Ok(Some(post)) = storage.get_post(&pid) { - let missing_atts: Vec<_> = post.attachments.iter() - .filter(|a| !self.blob_store.has(&a.cid)) - .cloned() - .collect(); - if !missing_atts.is_empty() { - total_missing += missing_atts.len(); - result.push((pid, post.author, missing_atts)); + if !post.attachments.is_empty() { + result.push((pid, post.author, post.attachments.clone())); } } } result }; + // Lock released — check blob store and filter without lock + let mut missing: Vec<(PostId, NodeId, Vec)> = Vec::new(); + let mut total_missing = 0usize; + for (pid, author, atts) in posts_with_atts { + if total_missing >= Self::MAX_PREFETCH_PER_CYCLE { break; } + let missing_atts: Vec<_> = atts.into_iter() + .filter(|a| !self.blob_store.has(&a.cid)) + .collect(); + if !missing_atts.is_empty() { + total_missing += missing_atts.len(); + missing.push((pid, author, missing_atts)); + } + } if missing.is_empty() { return; @@ -4250,35 +4250,7 @@ impl Node { 
.as_millis() as u64; let since_ms = now_ms.saturating_sub(seventy_two_hours_ms); - let under_replicated: Vec = { - let storage = self.storage.lock().await; - let recent_ids = match storage.get_own_recent_post_ids(&self.node_id, since_ms) { - Ok(ids) => ids, - Err(e) => { - debug!(error = %e, "Replication: failed to get own recent posts"); - return; - } - }; - - // 3. Filter to under-replicated (< 2 downstream) - let mut needs_replication = Vec::new(); - for pid in &recent_ids { - match storage.get_post_downstream_count(pid) { - Ok(count) if count < 2 => { - needs_replication.push(*pid); - } - _ => {} - } - } - needs_replication - }; - - // 4. If none need replication, skip silently - if under_replicated.is_empty() { - return; - } - - // 5. Find connected Available/Persistent peers + // Get connected peers first (no storage lock needed) let connected = self.network.connected_peers().await; if connected.is_empty() { debug!("No peers for replication"); @@ -4294,8 +4266,29 @@ impl Node { } }; - let suitable_peers: Vec<(NodeId, u16)> = { + // Single lock: get under-replicated posts AND peer roles/pressure + let (under_replicated, suitable_peers) = { let storage = self.storage.lock().await; + let recent_ids = match storage.get_own_recent_post_ids(&self.node_id, since_ms) { + Ok(ids) => ids, + Err(e) => { + debug!(error = %e, "Replication: failed to get own recent posts"); + return; + } + }; + + // Filter to under-replicated (< 2 downstream) + let mut needs_replication = Vec::new(); + for pid in &recent_ids { + match storage.get_post_downstream_count(pid) { + Ok(count) if count < 2 => { + needs_replication.push(*pid); + } + _ => {} + } + } + + // Get peer roles + cache pressure in same lock let mut candidates = Vec::new(); for peer_id in &connected { if *peer_id == self.node_id { continue; } @@ -4312,9 +4305,15 @@ impl Node { let score = role_priority(&role) + pressure; candidates.push((*peer_id, score)); } - candidates + + (needs_replication, candidates) }; + // If none 
need replication, skip silently + if under_replicated.is_empty() { + return; + } + if suitable_peers.is_empty() { debug!("No peers available for replication"); return; diff --git a/crates/core/src/web.rs b/crates/core/src/web.rs index 3bc0fd8..4fc5b52 100644 --- a/crates/core/src/web.rs +++ b/crates/core/src/web.rs @@ -124,8 +124,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc, browse None }; - // Gather all known holders: author + CDN downstream peers - let (holders, local_post) = { + // Single lock: gather holders, local post, AND author name if local + let (holders, local_post, local_author_name) = { let store = node.storage.lock().await; let mut holders = Vec::new(); @@ -141,7 +141,18 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc, browse } let local = store.get_post_with_visibility(&post_id).ok().flatten(); - (holders, local) + // If we have the post locally and it's public, get author name now + let author_name = if let Some((ref post, ref vis)) = local { + if matches!(vis, PostVisibility::Public) { + store.get_profile(&post.author).ok().flatten() + .map(|p| p.display_name).unwrap_or_default() + } else { + String::new() + } + } else { + String::new() + }; + (holders, local, author_name) }; // --- Tier 1 & 2: Try direct redirect to an HTTP-capable holder --- @@ -157,15 +168,10 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc, browse // --- Tier 3: QUIC proxy fallback --- - // Check local storage first + // Check local storage first (author_name already fetched above) if let Some((post, visibility)) = local_post { if matches!(visibility, PostVisibility::Public) { - let author_name = { - let store = node.storage.lock().await; - store.get_profile(&post.author).ok().flatten() - .map(|p| p.display_name).unwrap_or_default() - }; - let html = render_post_html(&post, &post_id, &author_name); + let html = render_post_html(&post, &post_id, &local_author_name); let _ = write_http_response(stream, 200, 
"text/html; charset=utf-8", html.as_bytes()).await; return; } @@ -182,14 +188,12 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc, browse match search_result { Ok(Ok(Some(sync_post))) => { - { + // Single lock: store post AND get author name + let author_name = { let store = node.storage.lock().await; let _ = store.store_post_with_visibility( &sync_post.id, &sync_post.post, &sync_post.visibility, ); - } - let author_name = { - let store = node.storage.lock().await; store.get_profile(&sync_post.post.author).ok().flatten() .map(|p| p.display_name).unwrap_or_default() }; diff --git a/crates/tauri-app/src/lib.rs b/crates/tauri-app/src/lib.rs index 52789e2..c714dd8 100644 --- a/crates/tauri-app/src/lib.rs +++ b/crates/tauri-app/src/lib.rs @@ -116,6 +116,13 @@ struct StatsDto { follow_count: usize, } +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct BadgeCountsDto { + new_feed: usize, + new_engagement: usize, +} + #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct RedundancyDto { @@ -666,11 +673,16 @@ async fn follow_node(state: State<'_, AppState>, node_id_hex: String) -> Result< let node = state.inner(); let nid = parse_node_id(&node_id_hex)?; node.follow(&nid).await.map_err(|e| e.to_string())?; - // Auto-sync: pull posts from the followed peer in the background + // Auto-sync: pull posts from the followed peer in the background (15s timeout) let node_clone = state.inner().clone(); tokio::spawn(async move { - if let Err(e) = node_clone.sync_with(nid).await { - tracing::debug!(error = %e, "Auto-sync after follow failed (peer may not be connected)"); + match tokio::time::timeout( + std::time::Duration::from_secs(15), + node_clone.sync_with(nid), + ).await { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::debug!(error = %e, "Auto-sync after follow failed"), + Err(_) => tracing::debug!("Auto-sync after follow timed out (15s)"), } }); Ok(()) @@ -1399,6 +1411,54 @@ async fn get_seen_engagement( })) } +#[tauri::command] +async fn 
get_badge_counts( + state: State<'_, AppState>, + last_feed_view_ms: u64, + ) -> Result<BadgeCountsDto, String> { + let node = state.inner(); + let storage = node.storage.lock().await; + + // Feed badge: count non-DM posts from others newer than last_feed_view_ms + let feed_posts = storage.get_feed().map_err(|e| e.to_string())?; + let new_feed = feed_posts.iter() + .filter(|(id, p, _vis)| { + p.author != node.node_id + && p.timestamp_ms > last_feed_view_ms + && !matches!( + storage.get_post_intent(id).ok().flatten(), + Some(VisibilityIntent::Direct(_)) + ) + }) + .count(); + + // My Posts badge: count own non-DM posts with unseen engagement + let all_posts = storage.list_posts_reverse_chron().map_err(|e| e.to_string())?; + let mut new_engagement = 0usize; + for (id, post, _vis) in &all_posts { + if post.author != node.node_id { continue; } + // Skip DMs + if matches!( + storage.get_post_intent(id).ok().flatten(), + Some(VisibilityIntent::Direct(_)) + ) { continue; } + let total_reacts: u64 = storage.get_reaction_counts(id, &node.node_id) + .unwrap_or_default() + .iter() + .map(|(_, count, _)| *count) + .sum(); + let total_comments = storage.get_comment_count(id).unwrap_or(0); + if total_reacts > 0 || total_comments > 0 { + let (seen_r, seen_c) = storage.get_seen_engagement(id).unwrap_or((0, 0)); + if total_reacts > seen_r as u64 || total_comments > seen_c as u64 { + new_engagement += 1; + } + } + } + + Ok(BadgeCountsDto { new_feed, new_engagement }) +} + #[tauri::command] async fn get_last_read_message( state: State<'_, AppState>, @@ -2057,6 +2117,7 @@ pub fn run() { mark_post_seen, mark_conversation_read, get_seen_engagement, + get_badge_counts, get_last_read_message, generate_share_link, ]) diff --git a/frontend/app.js b/frontend/app.js index 29f08ac..43d66aa 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -3007,33 +3007,12 @@ async function init() { updateNetworkIndicator(); }, 10000); - // Badge updates for non-active tabs — every 30 seconds (lightweight) + // Badge updates for
non-active tabs — every 30 seconds (single IPC call) setInterval(async () => { try { - // Feed badge - if (currentTab !== 'feed' && _lastFeedViewMs > 0) { - const allPosts = await invoke('get_feed'); - const isDM = p => p.intentKind === 'direct' || (p.intentKind === 'unknown' && (p.visibility === 'encrypted-for-me' || (p.isMe && p.recipients && p.recipients.length > 0))); - const newCount = allPosts.filter(p => !isDM(p) && !p.isMe && p.timestampMs > _lastFeedViewMs).length; - updateTabBadge('feed', newCount); - } - // My Posts badge (new engagement) - if (currentTab !== 'myposts') { - const posts = await invoke('get_all_posts'); - const mine = posts.filter(p => p.isMe && p.intentKind !== 'direct' && !(p.intentKind === 'unknown' && p.recipients && p.recipients.length > 0)); - let newEngagement = 0; - for (const p of mine) { - const totalReacts = (p.reactionCounts || []).reduce((sum, r) => sum + r.count, 0); - const totalComments = p.commentCount || 0; - if (totalReacts > 0 || totalComments > 0) { - try { - const seen = await invoke('get_seen_engagement', { postId: p.id }); - if (totalReacts > (seen.seenReactCount || 0) || totalComments > (seen.seenCommentCount || 0)) newEngagement++; - } catch (_) { newEngagement++; } - } - } - updateTabBadge('myposts', newEngagement); - } + const badges = await invoke('get_badge_counts', { lastFeedViewMs: _lastFeedViewMs }); + if (currentTab !== 'feed') updateTabBadge('feed', badges.newFeed); + if (currentTab !== 'myposts') updateTabBadge('myposts', badges.newEngagement); } catch (_) {} }, 30000);