From 3a0d2e93ab106b13f4d5cac9d62cffb593b2caf1 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 21:00:53 -0400 Subject: [PATCH] Phase 2c (0.6.1-beta): route engagement diffs through file_holders propagate_engagement_diff now targets the post's flat holder set (up to 5 most-recent) instead of the post_downstream directional tree. The holder set naturally subsumes the old upstream+downstream partition, so the separate "also send to upstreams" loops at each engagement call site are removed (reactions, comments, comment edit/delete, receipt slots, comment slots). handle_blob_header_diff on receive: - records the sending peer as a file holder (an engagement exchange is proof the peer holds the post) - re-propagates to the holder set minus the sender Writes to post_upstream / post_downstream still occur from Phase 2b (dual-write); those and the legacy tables will be removed in 2e. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 41 +++++++++++++--------------- crates/core/src/network.rs | 14 +++++----- crates/core/src/node.rs | 50 ++--------------------------------- 3 files changed, 28 insertions(+), 77 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 7c8b7b3..67656cf 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -6255,12 +6255,14 @@ impl ConnectionManager { Ok(()) } - /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream. + /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate + /// to the post's file_holders (flat set, up to 5 most recent). async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) { use crate::types::BlobHeaderDiffOp; - // Gather policy + audience data, then drop lock immediately - let (policy, approved_audience, downstream, upstreams) = { + // Gather policy + audience data + holders, then drop lock immediately. + // Remote peer clearly holds this post — record them as a holder. + let (policy, approved_audience, holders) = { let storage = self.storage.get().await; let policy = storage.get_comment_policy(&payload.post_id) .ok() @@ -6270,13 +6272,18 @@ impl ConnectionManager { crate::types::AudienceDirection::Inbound, Some(crate::types::AudienceStatus::Approved), ).unwrap_or_default(); - let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default(); - let upstreams: Vec = storage.get_post_upstreams(&payload.post_id) + let _ = storage.touch_file_holder( + &payload.post_id, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); + let holders: Vec = storage.get_file_holders(&payload.post_id) .unwrap_or_default() .into_iter() - .map(|(nid, _)| nid) + .map(|(nid, _addrs)| nid) .collect(); - (policy, approved, downstream, upstreams) + (policy, approved, holders) }; // Filter ops using gathered data (no lock held) @@ -6458,26 +6465,16 @@ impl ConnectionManager { let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms); } - // Collect all targets (downstream + all upstreams), then send in a single batched task + // Re-propagate to all file holders (flat set, max 5). Exclude sender. let mut targets: Vec = Vec::new(); - for peer_id in downstream { - if peer_id == sender { continue; } - if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone()) - .or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone())) + for peer_id in &holders { + if *peer_id == sender { continue; } + if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone()) + .or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone())) { targets.push(conn); } } - // Phase 6: Try all upstreams, not just one - for up in &upstreams { - if *up != sender { - if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone()) - .or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone())) - { - targets.push(conn); - } - } - } if !targets.is_empty() { let payload_clone = payload.clone(); tokio::spawn(async move { diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 55f4124..835713f 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -2320,24 +2320,24 @@ impl Network { self.endpoint.close().await; } - /// Propagate an engagement diff to all downstream holders of a post (CDN tree). - /// Excludes the sender to avoid loops. + /// Propagate an engagement diff to all known holders of a post (flat set, + /// up to 5 most-recent). Excludes the sender to avoid loops. pub async fn propagate_engagement_diff( &self, post_id: &crate::types::PostId, payload: &crate::protocol::BlobHeaderDiffPayload, exclude_peer: &crate::types::NodeId, ) -> usize { - let downstream = { + let holders = { let storage = self.storage.get().await; - storage.get_post_downstream(post_id).unwrap_or_default() + storage.get_file_holders(post_id).unwrap_or_default() }; let mut sent = 0; - for ds_nid in &downstream { - if ds_nid == exclude_peer { + for (peer, _addrs) in &holders { + if peer == exclude_peer { continue; } - if self.send_to_peer_uni(ds_nid, MessageType::BlobHeaderDiff, payload).await.is_ok() { + if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() { sent += 1; } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 4b328ac..c072d15 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -3585,15 +3585,9 @@ impl Node { ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())], timestamp_ms: now, }; + // propagate_engagement_diff targets all file_holders (flat set, max 5) + // which already subsumes what used to be upstream + downstream. network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Also send to all upstreams (toward author) — Phase 6 multi-upstream - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(reaction) @@ -3700,14 +3694,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Also send to all upstreams (toward author) — Phase 6 multi-upstream - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(comment) @@ -3744,14 +3730,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(()) } @@ -3785,14 +3763,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(()) } @@ -4014,14 +3984,6 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } Ok(()) } @@ -4136,14 +4098,6 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } Ok(()) }