Phase 2c (0.6.1-beta): route engagement diffs through file_holders

propagate_engagement_diff now targets the post's flat holder set (up to
5 most-recent) instead of the post_downstream directional tree. The
holder set naturally subsumes the old upstream+downstream partition, so
the separate "also send to upstreams" loops at each engagement call
site are removed (reactions, comments, comment edit/delete, receipt
slots, comment slots).

handle_blob_header_diff on receive:
- records the sending peer as a file holder (an engagement exchange is
  proof the peer holds the post)
- re-propagates to the holder set minus the sender

Writes to post_upstream / post_downstream still occur from Phase 2b
(dual-write); those and the legacy tables will be removed in 2e.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-04-21 21:00:53 -04:00
parent 0b2b4f5a68
commit 3a0d2e93ab
3 changed files with 28 additions and 77 deletions

View file

@@ -6255,12 +6255,14 @@ impl ConnectionManager {
Ok(())
}
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream.
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate
/// to the post's file_holders (flat set, up to 5 most recent).
async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) {
use crate::types::BlobHeaderDiffOp;
// Gather policy + audience data, then drop lock immediately
let (policy, approved_audience, downstream, upstreams) = {
// Gather policy + audience data + holders, then drop lock immediately.
// Remote peer clearly holds this post — record them as a holder.
let (policy, approved_audience, holders) = {
let storage = self.storage.get().await;
let policy = storage.get_comment_policy(&payload.post_id)
.ok()
@@ -6270,13 +6272,18 @@ impl ConnectionManager {
crate::types::AudienceDirection::Inbound,
Some(crate::types::AudienceStatus::Approved),
).unwrap_or_default();
let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default();
let upstreams: Vec<NodeId> = storage.get_post_upstreams(&payload.post_id)
let _ = storage.touch_file_holder(
&payload.post_id,
&sender,
&[],
crate::storage::HolderDirection::Received,
);
let holders: Vec<NodeId> = storage.get_file_holders(&payload.post_id)
.unwrap_or_default()
.into_iter()
.map(|(nid, _)| nid)
.map(|(nid, _addrs)| nid)
.collect();
(policy, approved, downstream, upstreams)
(policy, approved, holders)
};
// Filter ops using gathered data (no lock held)
@@ -6458,26 +6465,16 @@ impl ConnectionManager {
let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms);
}
// Collect all targets (downstream + all upstreams), then send in a single batched task
// Re-propagate to all file holders (flat set, max 5). Exclude sender.
let mut targets: Vec<iroh::endpoint::Connection> = Vec::new();
for peer_id in downstream {
if peer_id == sender { continue; }
if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone()))
for peer_id in &holders {
if *peer_id == sender { continue; }
if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone()))
{
targets.push(conn);
}
}
// Phase 6: Try all upstreams, not just one
for up in &upstreams {
if *up != sender {
if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone()))
{
targets.push(conn);
}
}
}
if !targets.is_empty() {
let payload_clone = payload.clone();
tokio::spawn(async move {