v0.4.1: Security hardening, lock contention fixes, data cleanup

Security:
- Reaction signatures: ed25519 sign/verify (sign_reaction, verify_reaction_signature)
  Backward-compatible — unsigned reactions from old nodes still accepted
- Comment signature verification: verify_comment_signature now called on receipt
- Reaction removal authorization: only reactor or post author can remove
- BlobHeader author verification: lookup actual author from storage, don't trust payload

Lock contention (4 fixes):
- ManifestPush discovery: cm lock released before PostFetch I/O
- Pull request handler: load under lock, filter without lock, brief re-lock for is_deleted
- Pull sender: split into two brief locks (store posts, then batch upstream+sync)
- Engagement checker: batch all chunk results, single lock for writes

Data cleanup:
- Post deletion cleans post_downstream, post_upstream, seen_engagement tables
- Added TODO-hardening.md documenting remaining DoS/security/lock/data issues

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-03-21 19:30:38 -04:00
parent bbaacf9b6c
commit bb6f2b64b0
11 changed files with 500 additions and 138 deletions

View file

@ -1596,32 +1596,42 @@ impl ConnectionManager {
.as_millis() as u64;
let mut stored = false;
let mut new_post_ids: Vec<PostId> = Vec::new();
let storage = self.storage.lock().await;
let mut synced_authors: HashSet<NodeId> = HashSet::new();
for sp in response.posts {
if verify_post_id(&sp.id, &sp.post) && !storage.is_deleted(&sp.id)? {
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sp.id, from, prio);
new_post_ids.push(sp.id);
synced_authors.insert(sp.post.author);
if sp.id == notification.post_id {
stored = true;
// Brief lock 1: store posts
{
let storage = self.storage.lock().await;
for sp in &response.posts {
if verify_post_id(&sp.id, &sp.post) && !storage.is_deleted(&sp.id)? {
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
new_post_ids.push(sp.id);
synced_authors.insert(sp.post.author);
if sp.id == notification.post_id {
stored = true;
}
}
}
}
// Protocol v4: update last_sync_ms for authors whose posts were received
for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms);
}
for vu in response.visibility_updates {
if let Some(post) = storage.get_post(&vu.post_id)? {
if post.author == vu.author {
let _ = storage.update_post_visibility(&vu.post_id, &vu.visibility);
// Lock RELEASED
// Brief lock 2: upstream + last_sync + visibility updates
{
let storage = self.storage.lock().await;
for pid in &new_post_ids {
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(pid, from, prio);
}
for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms);
}
for vu in &response.visibility_updates {
if let Some(post) = storage.get_post(&vu.post_id)? {
if post.author == vu.author {
let _ = storage.update_post_visibility(&vu.post_id, &vu.visibility);
}
}
}
}
drop(storage);
// Register as downstream for new posts (cap at 50 to avoid flooding)
if !new_post_ids.is_empty() {
@ -1679,32 +1689,35 @@ impl ConnectionManager {
.unwrap_or_default()
.as_millis() as u64;
// Brief lock 1: store posts
let mut synced_authors: HashSet<NodeId> = HashSet::new();
{
let storage = self.storage.lock().await;
// Track which authors had posts received for last_sync_ms update
let mut synced_authors: HashSet<NodeId> = HashSet::new();
for sp in &response.posts {
if storage.is_deleted(&sp.id)? {
continue;
}
if verify_post_id(&sp.id, &sp.post) {
if storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility)? {
// Record who we got this post from (upstream for engagement propagation)
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sp.id, peer_id, prio);
new_post_ids.push(sp.id);
posts_received += 1;
}
synced_authors.insert(sp.post.author);
}
}
}
// Lock RELEASED
// Protocol v4: update last_sync_ms for authors whose posts were received
// Brief lock 2: upstream + last_sync + visibility updates
{
let storage = self.storage.lock().await;
for pid in &new_post_ids {
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(pid, peer_id, prio);
}
for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms);
}
for vu in response.visibility_updates {
if vu.author != *peer_id {
// Only accept visibility updates authored by the responding peer
@ -1775,8 +1788,10 @@ impl ConnectionManager {
let mut updated = 0;
// Request headers in batches to avoid opening too many streams
for chunk in post_headers.chunks(20) {
// Collect all results for this chunk WITHOUT holding the lock
let mut results: Vec<([u8; 32], Option<(String, crate::types::BlobHeader)>)> = Vec::new();
for (post_id, current_ts) in chunk {
let result: anyhow::Result<()> = async {
let result: anyhow::Result<Option<(String, crate::types::BlobHeader)>> = async {
let (mut send, mut recv) = pc.connection.open_bi().await?;
let request = BlobHeaderRequestPayload {
post_id: *post_id,
@ -1792,45 +1807,52 @@ impl ConnectionManager {
let response: BlobHeaderResponsePayload =
read_payload(&mut recv, MAX_PAYLOAD).await?;
// Brief re-lock for writes
let storage = self.storage.lock().await;
// Always update last_check_ms regardless of whether engagement changed
let _ = storage.update_post_last_check(post_id, now_ms);
if response.updated {
if let Some(json) = &response.header_json {
if let Ok(header) = serde_json::from_str::<crate::types::BlobHeader>(json) {
// Store the full header JSON
let _ = storage.store_blob_header(
&header.post_id,
&header.author,
json,
header.updated_at,
);
// Apply individual reactions and comments.
// store_reaction / store_comment are tombstone-aware:
// they compare timestamps and respect deleted_at fields.
for reaction in &header.reactions {
let _ = storage.store_reaction(reaction);
}
for comment in &header.comments {
let _ = storage.store_comment(comment);
}
let _ = storage.set_comment_policy(&header.post_id, &header.policy);
// Update last_engagement_ms when new engagement arrives
let _ = storage.update_post_last_engagement(post_id, now_ms);
updated += 1;
if let Some(json) = response.header_json {
if let Ok(header) = serde_json::from_str::<crate::types::BlobHeader>(&json) {
return Ok(Some((json, header)));
}
}
}
drop(storage);
Ok(())
Ok(None)
}
.await;
if let Err(e) = result {
trace!(post_id = hex::encode(post_id), error = %e, "Failed to fetch engagement header");
match result {
Ok(header_opt) => results.push((*post_id, header_opt)),
Err(e) => {
trace!(post_id = hex::encode(post_id), error = %e, "Failed to fetch engagement header");
}
}
}
// Single lock for ALL writes in this chunk
if !results.is_empty() {
let storage = self.storage.lock().await;
for (post_id, header_opt) in &results {
let _ = storage.update_post_last_check(post_id, now_ms);
if let Some((json, header)) = header_opt {
let _ = storage.store_blob_header(
&header.post_id,
&header.author,
json,
header.updated_at,
);
// store_reaction / store_comment are tombstone-aware:
// they compare timestamps and respect deleted_at fields.
for reaction in &header.reactions {
let _ = storage.store_reaction(reaction);
}
for comment in &header.comments {
let _ = storage.store_comment(comment);
}
let _ = storage.set_comment_policy(&header.post_id, &header.policy);
let _ = storage.update_post_last_engagement(post_id, now_ms);
updated += 1;
}
}
drop(storage);
}
// Lock RELEASED before next chunk
}
Ok(updated)
@ -1852,55 +1874,61 @@ impl ConnectionManager {
let since_ms_map: HashMap<NodeId, u64> = request.since_ms.into_iter().collect();
let use_since_ms = !since_ms_map.is_empty();
let (posts, vis_updates) = {
// Phase 1: Brief lock — load data
let (all_posts, group_members) = {
let storage = self.storage.lock().await;
let all_posts = storage.list_posts_with_visibility()?;
let group_members = storage.get_all_group_members().unwrap_or_default();
let posts = storage.list_posts_with_visibility()?;
let members = storage.get_all_group_members().unwrap_or_default();
(posts, members)
};
// Lock RELEASED
let mut posts_to_send = Vec::new();
let mut vis_updates_to_send = Vec::new();
// Phase 2: Filter without lock (pure CPU)
let mut candidates_to_send = Vec::new();
let mut vis_updates_to_send = Vec::new();
for (id, post, visibility) in all_posts {
let should_send =
crate::network::should_send_post(&post, &visibility, &remote_node_id, &their_follows, &group_members);
for (id, post, visibility) in all_posts {
let should_send =
crate::network::should_send_post(&post, &visibility, &remote_node_id, &their_follows, &group_members);
if !should_send {
continue;
}
// Determine if peer already has this post
let peer_has_post = if use_since_ms {
// v4 path: filter by per-author timestamp (60s fudge for clock skew)
if let Some(&since) = since_ms_map.get(&post.author) {
post.timestamp_ms <= since + 60_000
} else {
false // no since_ms for this author — they want everything
}
} else {
// Legacy path: use have_post_ids
their_post_ids.contains(&id)
};
if !peer_has_post {
if !storage.is_deleted(&id)? {
posts_to_send.push(SyncPost {
id,
post,
visibility,
});
}
} else {
// They already have the post — send visibility update if we authored it
if post.author == self.our_node_id {
vis_updates_to_send.push(crate::types::VisibilityUpdate {
post_id: id,
author: self.our_node_id,
visibility,
});
}
}
if !should_send {
continue;
}
// Determine if peer already has this post
let peer_has_post = if use_since_ms {
// v4 path: filter by per-author timestamp (60s fudge for clock skew)
if let Some(&since) = since_ms_map.get(&post.author) {
post.timestamp_ms <= since + 60_000
} else {
false // no since_ms for this author — they want everything
}
} else {
// Legacy path: use have_post_ids
their_post_ids.contains(&id)
};
if !peer_has_post {
candidates_to_send.push((id, post, visibility));
} else {
// They already have the post — send visibility update if we authored it
if post.author == self.our_node_id {
vis_updates_to_send.push(crate::types::VisibilityUpdate {
post_id: id,
author: self.our_node_id,
visibility,
});
}
}
}
// Phase 3: Brief re-lock for is_deleted checks on filtered posts
let (posts, vis_updates) = {
let storage = self.storage.lock().await;
let posts_to_send: Vec<SyncPost> = candidates_to_send.into_iter()
.filter(|(id, _, _)| !storage.is_deleted(id).unwrap_or(false))
.map(|(id, post, visibility)| SyncPost { id, post, visibility })
.collect();
(posts_to_send, vis_updates_to_send)
};
@ -4940,32 +4968,63 @@ impl ConnectionManager {
let cm_arc = conn_mgr.clone();
let sender_id = remote_node_id;
tokio::spawn(async move {
let cm = cm_arc.lock().await;
// Brief lock: get connection handle only
let conn = {
let cm = cm_arc.lock().await;
cm.connections_ref().get(&sender_id).map(|pc| pc.connection.clone())
};
// cm lock RELEASED
let Some(conn) = conn else { return };
let mut fetched = 0usize;
for (post_id, _author) in &discovery_posts {
if fetched >= 10 { break; }
match cm.send_post_fetch(&sender_id, post_id).await {
// PostFetch network I/O WITHOUT any lock
let result = async {
use crate::protocol::{PostFetchRequestPayload, PostFetchResponsePayload};
let (mut send, mut recv) = conn.open_bi().await?;
let req = PostFetchRequestPayload { post_id: *post_id };
write_typed_message(&mut send, MessageType::PostFetchRequest, &req).await?;
send.finish()?;
let msg_type = tokio::time::timeout(
std::time::Duration::from_secs(10),
read_message_type(&mut recv),
).await??;
if msg_type != MessageType::PostFetchResponse {
return anyhow::Ok(None);
}
let resp: PostFetchResponsePayload = read_payload(&mut recv, MAX_PAYLOAD).await?;
anyhow::Ok(resp.post)
}.await;
match result {
Ok(Some(sync_post)) => {
if crate::content::verify_post_id(&sync_post.id, &sync_post.post) {
let storage = cm.storage.lock().await;
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
// Set upstream + register as downstream
let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
// Update last_sync_ms for the author
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let _ = storage.update_follow_last_sync(&sync_post.post.author, now);
drop(storage);
// Register as downstream with the sender
if let Some(pc) = cm.connections_ref().get(&sender_id) {
let reg = crate::protocol::PostDownstreamRegisterPayload { post_id: sync_post.id };
if let Ok(mut send) = pc.connection.open_uni().await {
let _ = write_typed_message(&mut send, MessageType::PostDownstreamRegister, &reg).await;
let _ = send.finish();
}
// Brief re-acquire for storage writes only
let stored = {
let cm = cm_arc.lock().await;
let storage = cm.storage.lock().await;
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let _ = storage.update_follow_last_sync(&sync_post.post.author, now);
true
} else {
false
}
};
// cm lock RELEASED — register downstream without lock
if stored {
let reg = crate::protocol::PostDownstreamRegisterPayload { post_id: sync_post.id };
if let Ok(mut send) = conn.open_uni().await {
let _ = write_typed_message(&mut send, MessageType::PostDownstreamRegister, &reg).await;
let _ = send.finish();
}
fetched += 1;
}
@ -5703,10 +5762,22 @@ impl ConnectionManager {
if let crate::types::ReactPermission::None = policy.allow_reacts {
continue;
}
// Verify signature (skip if empty for backward compat with old nodes)
if !reaction.signature.is_empty() && !crate::crypto::verify_reaction_signature(
&reaction.reactor,
&payload.post_id,
&reaction.emoji,
reaction.timestamp_ms,
&reaction.signature,
) {
continue; // Skip forged reactions
}
let _ = storage.store_reaction(reaction);
}
BlobHeaderDiffOp::RemoveReaction { reactor, emoji, post_id } => {
let _ = storage.remove_reaction(reactor, post_id, emoji);
if *reactor == sender || sender == payload.author {
let _ = storage.remove_reaction(reactor, post_id, emoji);
}
}
BlobHeaderDiffOp::AddComment(comment) => {
if policy.blocklist.contains(&comment.author) {
@ -5721,6 +5792,15 @@ impl ConnectionManager {
}
crate::types::CommentPermission::Public => {}
}
if !crate::crypto::verify_comment_signature(
&comment.author,
&payload.post_id,
&comment.content,
comment.timestamp_ms,
&comment.signature,
) {
continue; // Skip forged comments
}
let _ = storage.store_comment(comment);
}
BlobHeaderDiffOp::EditComment { author, post_id, timestamp_ms, new_content } => {
@ -5832,8 +5912,14 @@ impl ConnectionManager {
header.comments = comments;
header.policy = policy;
header.updated_at = payload.timestamp_ms;
// Look up actual post author (don't trust payload.author)
let actual_author = storage.get_post(&payload.post_id)
.ok().flatten()
.map(|p| p.author)
.unwrap_or(payload.author); // fallback if post not stored yet
header.author = actual_author;
if let Ok(json) = serde_json::to_string(&header) {
let _ = storage.store_blob_header(&payload.post_id, &payload.author, &json, payload.timestamp_ms);
let _ = storage.store_blob_header(&payload.post_id, &actual_author, &json, payload.timestamp_ms);
}
// Phase 4: Update last_engagement_ms when engagement arrives via diff
let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms);