Fix storage lock contention: reduce lock holds across 6 hot paths

- get_blob_for_post: 3 sequential locks → 1 combined acquisition (the lock-scoping pattern shared by all six paths is sketched below)
- prefetch_blobs_from_peer: lock only for DB reads, blob checks outside lock
- fetch_engagement_from_peer: explicit lock release before next network I/O
- serve_post: 4 locks (2 redundant) → 2
- run_replication_check: 2 locks → 1 combined
- Badge cycle: N+2 IPC calls → 1 (new get_badge_counts command; sketch below)
- Follow timeout: 15s cap on auto-sync-on-follow to prevent UI hang (sketch below)
- Notification clearing: clear system notifications on conversation read
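
All six hot paths lean on the same move: take the async storage mutex inside a block expression, copy out everything the slow phase needs, and let the guard drop before any network await. A minimal sketch of that shape, using hypothetical Storage and fetch_header stand-ins for the real types in the diff below:

    use std::sync::Arc;
    use tokio::sync::Mutex;

    // Hypothetical stand-ins; the real storage and header fetch live elsewhere.
    struct Storage;
    impl Storage {
        fn list_post_ids(&self) -> Vec<[u8; 32]> { Vec::new() }
        fn store_blob_header(&self, _id: &[u8; 32], _json: &str) {}
    }
    async fn fetch_header(_id: &[u8; 32]) -> Option<String> { None }

    async fn sync_headers(storage: Arc<Mutex<Storage>>) {
        // Brief lock: gather everything the network phase needs, then let
        // the guard drop at the end of the block expression.
        let post_ids: Vec<[u8; 32]> = {
            let storage = storage.lock().await;
            storage.list_post_ids()
        }; // guard dropped here; no lock is held across network I/O

        for id in post_ids {
            if let Some(json) = fetch_header(&id).await {
                // Brief re-lock for the write only.
                let storage = storage.lock().await;
                storage.store_blob_header(&id, &json);
                drop(storage); // explicit release before the next await
            }
        }
    }

The explicit drop mirrors the style used in the diff; the guard would be released at the end of the loop body anyway, but the drop makes the release point visible.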
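The badge-cycle change lives in the command layer, which is not part of the hunks below. A rough sketch of what a combined command could look like, assuming a Tauri-style command; BadgeCounts, AppState, and the count_* queries are all made-up names:

    use std::sync::Arc;
    use serde::Serialize;
    use tokio::sync::Mutex;

    // Hypothetical stand-ins for the real app state and storage.
    struct Storage;
    impl Storage {
        fn count_unread_messages(&self) -> u32 { 0 }
        fn count_unread_notifications(&self) -> u32 { 0 }
        fn count_pending_follows(&self) -> u32 { 0 }
    }
    struct AppState { storage: Arc<Mutex<Storage>> }

    #[derive(Serialize)]
    struct BadgeCounts {
        unread_messages: u32,
        unread_notifications: u32,
        pending_follows: u32,
    }

    // One IPC round trip per badge cycle instead of N+2: the frontend
    // calls get_badge_counts once and receives every count together.
    #[tauri::command]
    async fn get_badge_counts(
        state: tauri::State<'_, AppState>,
    ) -> Result<BadgeCounts, String> {
        let storage = state.storage.lock().await;
        Ok(BadgeCounts {
            unread_messages: storage.count_unread_messages(),
            unread_notifications: storage.count_unread_notifications(),
            pending_follows: storage.count_pending_follows(),
        })
    }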
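The follow timeout is presumably tokio::time::timeout wrapped around the sync future. A sketch under that assumption, with sync_new_follow as a hypothetical stand-in for the real auto-sync:

    use std::time::Duration;
    use tokio::time::timeout;

    // Hypothetical stand-in for the real auto-sync-on-follow future.
    async fn sync_new_follow() -> anyhow::Result<()> { Ok(()) }

    async fn follow_then_sync() {
        // Cap the auto-sync at 15s so a slow or unreachable peer cannot
        // hang the UI; treating the timeout as non-fatal is an assumption.
        match timeout(Duration::from_secs(15), sync_new_follow()).await {
            Ok(Ok(())) => {}                        // synced within the cap
            Ok(Err(e)) => eprintln!("follow sync failed: {e}"),
            Err(_elapsed) => eprintln!("follow sync timed out after 15s"),
        }
    }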

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Scott Reimers 2026-03-21 13:02:30 -04:00
parent 3cc39590a7
commit 89d6a853f5
5 changed files with 152 additions and 106 deletions

@@ -1716,7 +1716,7 @@ impl ConnectionManager {
.get(peer_id)
.ok_or_else(|| anyhow::anyhow!("not connected to {}", hex::encode(peer_id)))?;
// Get post IDs and their current header timestamps
// Brief lock: gather post IDs and their current header timestamps
let post_headers: Vec<([u8; 32], u64)> = {
let storage = self.storage.lock().await;
let post_ids = storage.list_post_ids()?;
@@ -1733,6 +1733,7 @@ impl ConnectionManager {
})
.collect()
};
// Lock RELEASED — all network I/O happens without the lock
let mut updated = 0;
// Request headers in batches to avoid opening too many streams
@@ -1757,6 +1758,7 @@ impl ConnectionManager {
if response.updated {
if let Some(json) = &response.header_json {
if let Ok(header) = serde_json::from_str::<crate::types::BlobHeader>(json) {
// Brief re-lock for writes only
let storage = self.storage.lock().await;
// Store the full header JSON
let _ = storage.store_blob_header(
@@ -1775,6 +1777,7 @@ impl ConnectionManager {
let _ = storage.store_comment(comment);
}
let _ = storage.set_comment_policy(&header.post_id, &header.policy);
drop(storage);
updated += 1;
}
}

@@ -1162,38 +1162,32 @@ impl Node {
cid: &[u8; 32],
post_id: &PostId,
) -> anyhow::Result<Option<Vec<u8>>> {
// Get raw blob data (local)
// Get raw blob data (local — no lock needed)
let raw_data = match self.blob_store.get(cid)? {
Some(d) => d,
None => return Ok(None),
};
{
// Single lock acquisition for all DB reads
let (post, visibility, group_seeds) = {
let storage = self.storage.lock().await;
let _ = storage.touch_blob_access(cid);
}
// Get post + visibility
let (post, visibility) = {
let storage = self.storage.lock().await;
match storage.get_post_with_visibility(post_id)? {
Some(pv) => pv,
Some((post, vis)) => {
let seeds = if matches!(vis, PostVisibility::GroupEncrypted { .. }) {
storage.get_all_group_seeds_map().unwrap_or_default()
} else {
std::collections::HashMap::new()
};
(post, vis, seeds)
}
None => return Ok(Some(raw_data)), // No post context — return raw
}
};
// Lock released — decrypt without lock
match &visibility {
PostVisibility::Public => Ok(Some(raw_data)),
PostVisibility::Encrypted { .. } => {
let empty_map = std::collections::HashMap::new();
self.decrypt_blob_for_post(raw_data, &post, &visibility, &empty_map)
}
PostVisibility::GroupEncrypted { .. } => {
let group_seeds = {
let storage = self.storage.lock().await;
storage.get_all_group_seeds_map().unwrap_or_default()
};
self.decrypt_blob_for_post(raw_data, &post, &visibility, &group_seeds)
}
_ => self.decrypt_blob_for_post(raw_data, &post, &visibility, &group_seeds),
}
}
@@ -1203,28 +1197,34 @@ impl Node {
const MAX_PREFETCH_PER_CYCLE: usize = 20;
pub async fn prefetch_blobs_from_peer(&self, peer_id: &NodeId) {
// Gather posts with missing blobs, newest first, capped
let missing: Vec<(PostId, NodeId, Vec<crate::types::Attachment>)> = {
// Brief lock: get post IDs and their attachment info
let posts_with_atts: Vec<(PostId, NodeId, Vec<crate::types::Attachment>)> = {
let storage = self.storage.lock().await;
let post_ids = storage.list_post_ids().unwrap_or_default();
let mut result = Vec::new();
let mut total_missing = 0usize;
// list_post_ids returns newest first typically; cap total missing blobs
for pid in post_ids {
if total_missing >= Self::MAX_PREFETCH_PER_CYCLE { break; }
if result.len() >= Self::MAX_PREFETCH_PER_CYCLE { break; }
if let Ok(Some(post)) = storage.get_post(&pid) {
let missing_atts: Vec<_> = post.attachments.iter()
.filter(|a| !self.blob_store.has(&a.cid))
.cloned()
.collect();
if !missing_atts.is_empty() {
total_missing += missing_atts.len();
result.push((pid, post.author, missing_atts));
if !post.attachments.is_empty() {
result.push((pid, post.author, post.attachments.clone()));
}
}
}
result
};
// Lock released — check blob store and filter without lock
let mut missing: Vec<(PostId, NodeId, Vec<crate::types::Attachment>)> = Vec::new();
let mut total_missing = 0usize;
for (pid, author, atts) in posts_with_atts {
if total_missing >= Self::MAX_PREFETCH_PER_CYCLE { break; }
let missing_atts: Vec<_> = atts.into_iter()
.filter(|a| !self.blob_store.has(&a.cid))
.collect();
if !missing_atts.is_empty() {
total_missing += missing_atts.len();
missing.push((pid, author, missing_atts));
}
}
if missing.is_empty() {
return;
@@ -4250,35 +4250,7 @@ impl Node {
.as_millis() as u64;
let since_ms = now_ms.saturating_sub(seventy_two_hours_ms);
let under_replicated: Vec<PostId> = {
let storage = self.storage.lock().await;
let recent_ids = match storage.get_own_recent_post_ids(&self.node_id, since_ms) {
Ok(ids) => ids,
Err(e) => {
debug!(error = %e, "Replication: failed to get own recent posts");
return;
}
};
// 3. Filter to under-replicated (< 2 downstream)
let mut needs_replication = Vec::new();
for pid in &recent_ids {
match storage.get_post_downstream_count(pid) {
Ok(count) if count < 2 => {
needs_replication.push(*pid);
}
_ => {}
}
}
needs_replication
};
// 4. If none need replication, skip silently
if under_replicated.is_empty() {
return;
}
// 5. Find connected Available/Persistent peers
// Get connected peers first (no storage lock needed)
let connected = self.network.connected_peers().await;
if connected.is_empty() {
debug!("No peers for replication");
@@ -4294,8 +4266,29 @@ impl Node {
}
};
let suitable_peers: Vec<(NodeId, u16)> = {
// Single lock: get under-replicated posts AND peer roles/pressure
let (under_replicated, suitable_peers) = {
let storage = self.storage.lock().await;
let recent_ids = match storage.get_own_recent_post_ids(&self.node_id, since_ms) {
Ok(ids) => ids,
Err(e) => {
debug!(error = %e, "Replication: failed to get own recent posts");
return;
}
};
// Filter to under-replicated (< 2 downstream)
let mut needs_replication = Vec::new();
for pid in &recent_ids {
match storage.get_post_downstream_count(pid) {
Ok(count) if count < 2 => {
needs_replication.push(*pid);
}
_ => {}
}
}
// Get peer roles + cache pressure in same lock
let mut candidates = Vec::new();
for peer_id in &connected {
if *peer_id == self.node_id { continue; }
@@ -4312,9 +4305,15 @@ impl Node {
let score = role_priority(&role) + pressure;
candidates.push((*peer_id, score));
}
candidates
(needs_replication, candidates)
};
// If none need replication, skip silently
if under_replicated.is_empty() {
return;
}
if suitable_peers.is_empty() {
debug!("No peers available for replication");
return;

@@ -124,8 +124,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
None
};
// Gather all known holders: author + CDN downstream peers
let (holders, local_post) = {
// Single lock: gather holders, local post, AND author name if local
let (holders, local_post, local_author_name) = {
let store = node.storage.lock().await;
let mut holders = Vec::new();
@@ -141,7 +141,18 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
}
let local = store.get_post_with_visibility(&post_id).ok().flatten();
(holders, local)
// If we have the post locally and it's public, get author name now
let author_name = if let Some((ref post, ref vis)) = local {
if matches!(vis, PostVisibility::Public) {
store.get_profile(&post.author).ok().flatten()
.map(|p| p.display_name).unwrap_or_default()
} else {
String::new()
}
} else {
String::new()
};
(holders, local, author_name)
};
// --- Tier 1 & 2: Try direct redirect to an HTTP-capable holder ---
@@ -157,15 +168,10 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
// --- Tier 3: QUIC proxy fallback ---
// Check local storage first
// Check local storage first (author_name already fetched above)
if let Some((post, visibility)) = local_post {
if matches!(visibility, PostVisibility::Public) {
let author_name = {
let store = node.storage.lock().await;
store.get_profile(&post.author).ok().flatten()
.map(|p| p.display_name).unwrap_or_default()
};
let html = render_post_html(&post, &post_id, &author_name);
let html = render_post_html(&post, &post_id, &local_author_name);
let _ = write_http_response(stream, 200, "text/html; charset=utf-8", html.as_bytes()).await;
return;
}
@@ -182,14 +188,12 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
match search_result {
Ok(Ok(Some(sync_post))) => {
{
// Single lock: store post AND get author name
let author_name = {
let store = node.storage.lock().await;
let _ = store.store_post_with_visibility(
&sync_post.id, &sync_post.post, &sync_post.visibility,
);
}
let author_name = {
let store = node.storage.lock().await;
store.get_profile(&sync_post.post.author).ok().flatten()
.map(|p| p.display_name).unwrap_or_default()
};