From e6265b52b6fc92abe9ab7bed697fbdfe7bfaad79 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 20:46:34 -0400 Subject: [PATCH 1/6] Phase 1 (0.6.0-beta): remove direct PostPush for encrypted posts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Encrypted posts now propagate only via the CDN (ManifestPush + neighbor header updates), eliminating the sender→recipient traffic signal on the wire. Encrypted DMs are indistinguishable from any other encrypted post. - Remove push_post_to_recipients entirely from network.rs - Remove call sites in create_post and re-encrypt-on-revoke - PostPush handler now ignores non-public visibility (kept for public audience push path) Known gap: non-follower DMs won't reach until Phase 3 (merged pull + recipient-match). Followers receive via the existing CDN path — new posts trigger neighbor-manifest updates, ManifestPush fans out to downstream holders, recipients pull missing post IDs from followed authors. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 40 +++++++++++++++++++------------ crates/core/src/network.rs | 44 ----------------------------------- crates/core/src/node.rs | 15 +++++------- 3 files changed, 31 insertions(+), 68 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index bfb0090..6b73b03 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -4958,24 +4958,34 @@ impl ConnectionManager { } MessageType::PostPush => { let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?; - let cm = conn_mgr.lock().await; - let storage = cm.storage.get().await; - if !storage.is_deleted(&push.post.id)? - && storage.get_post(&push.post.id)?.is_none() - && crate::content::verify_post_id(&push.post.id, &push.post.post) - { - let _ = storage.store_post_with_visibility( - &push.post.id, - &push.post.post, - &push.post.visibility, - ); - let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); - info!( + // Encrypted posts are no longer accepted via direct push — they propagate + // via the CDN to eliminate the sender→recipient traffic signal. + if !matches!(push.post.visibility, crate::types::PostVisibility::Public) { + debug!( peer = hex::encode(remote_node_id), post_id = hex::encode(push.post.id), - "Received direct post push" + "Ignoring non-public PostPush" ); + } else { + let cm = conn_mgr.lock().await; + let storage = cm.storage.get().await; + if !storage.is_deleted(&push.post.id)? + && storage.get_post(&push.post.id)?.is_none() + && crate::content::verify_post_id(&push.post.id, &push.post.post) + { + let _ = storage.store_post_with_visibility( + &push.post.id, + &push.post.post, + &push.post.visibility, + ); + let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); + info!( + peer = hex::encode(remote_node_id), + post_id = hex::encode(push.post.id), + "Received direct post push" + ); + } } } MessageType::AudienceRequest => { diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 8874a2e..32273cd 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -902,50 +902,6 @@ impl Network { self.send_to_audience(MessageType::PostNotification, &payload).await } - /// Push a full post directly to recipients (persistent if available, ephemeral otherwise). - pub async fn push_post_to_recipients( - &self, - post_id: &crate::types::PostId, - post: &Post, - visibility: &PostVisibility, - ) -> usize { - let recipients: Vec = match visibility { - PostVisibility::Public => return 0, - PostVisibility::Encrypted { recipients } => { - recipients.iter().map(|wk| wk.recipient).collect() - } - PostVisibility::GroupEncrypted { group_id, .. } => { - // Push to all group members - match self.storage.get().await.get_all_group_members() { - Ok(map) => map.get(group_id).cloned().unwrap_or_default().into_iter().collect(), - Err(_) => return 0, - } - } - }; - - let payload = PostPushPayload { - post: SyncPost { - id: *post_id, - post: post.clone(), - visibility: visibility.clone(), - }, - }; - - let mut pushed = 0; - for recipient in &recipients { - if self.send_to_peer_uni(recipient, MessageType::PostPush, &payload).await.is_ok() { - pushed += 1; - debug!( - recipient = hex::encode(recipient), - post_id = hex::encode(post_id), - "Pushed post to recipient" - ); - } - } - - pushed - } - /// Push a profile update to all audience members (ephemeral-capable). pub async fn push_profile(&self, profile: &PublicProfile) -> usize { // Sanitize: if public_visible=false, strip display_name/bio from pushed profile diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 067c632..8e89b80 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -836,13 +836,12 @@ impl Node { } } - // For encrypted posts, push directly to recipients - let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await; - - // For public posts, push to audience members + // For public posts, push to audience members. Encrypted posts propagate + // via the CDN (ManifestPush + header-diff) to eliminate the sender→recipient + // traffic signal. let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await; - info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post"); + info!(post_id = hex::encode(post_id), audience_pushed, "Created new post"); Ok((post_id, post, visibility)) } @@ -2108,12 +2107,10 @@ impl Node { storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?; } - // delete_post already pushes the DeleteRecord + // delete_post already pushes the DeleteRecord. + // Replacement post propagates via the CDN to remaining recipients. self.delete_post(post_id).await?; - // Push replacement post directly to remaining recipients - self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await; - info!( old_id = hex::encode(post_id), new_id = hex::encode(new_post_id), From 1658762a68f871ca1b7bd4b96aa0703700fdff00 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 20:52:30 -0400 Subject: [PATCH 2/6] Phase 2a (0.6.1-beta): add file_holders table + legacy seed migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New flat per-file holder set replaces the directional upstream/downstream trees. Keyed by 32-byte content-addressed file_id (works for both PostId and blob CID). LRU-capped at 5 holders per file on touch. - HolderDirection enum (Sent/Received/Both) — tracked for potential reuse, not load-bearing for propagation - touch_file_holder / get_file_holders / delete_file_holders - seed_file_holders_from_legacy: one-time idempotent seed from post_upstream, post_downstream, blob_upstream, blob_downstream so users upgrading from 0.6.0 don't start with empty holder sets Table and methods land here; call-site refactor and legacy-table drop follow in subsequent commits within this phase. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/storage.rs | 152 ++++++++++++++++++++++++++++++++++++- 1 file changed, 151 insertions(+), 1 deletion(-) diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 5f37559..8b8ba25 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -12,6 +12,26 @@ use crate::types::{ VisibilityIntent, }; +/// Direction for file_holders entries: whether we sent the file to this peer, +/// received it from them, or both. Not load-bearing for propagation decisions — +/// any holder can serve as a diff target — but retained for potential reuse. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HolderDirection { + Sent, + Received, + Both, +} + +impl HolderDirection { + pub fn as_str(&self) -> &'static str { + match self { + HolderDirection::Sent => "sent", + HolderDirection::Received => "received", + HolderDirection::Both => "both", + } + } +} + /// Blob metadata for eviction scoring. pub struct EvictionCandidate { pub cid: [u8; 32], @@ -389,7 +409,17 @@ impl Storage { CREATE TABLE IF NOT EXISTS seen_messages ( partner_id BLOB PRIMARY KEY, last_read_ms INTEGER NOT NULL DEFAULT 0 - );", + ); + CREATE TABLE IF NOT EXISTS file_holders ( + file_id BLOB NOT NULL, + peer_id BLOB NOT NULL, + peer_addresses TEXT NOT NULL DEFAULT '[]', + last_interaction_ms INTEGER NOT NULL, + direction TEXT NOT NULL, + PRIMARY KEY (file_id, peer_id) + ); + CREATE INDEX IF NOT EXISTS idx_file_holders_recency + ON file_holders(file_id, last_interaction_ms DESC);", )?; Ok(()) } @@ -686,6 +716,11 @@ impl Storage { )?; } + // 0.6.1-beta: seed file_holders from legacy upstream/downstream tables + // before they're dropped. Idempotent — only fires on an empty + // file_holders table. + self.seed_file_holders_from_legacy()?; + Ok(()) } @@ -4393,6 +4428,121 @@ impl Storage { Ok(count as u32) } + // --- File holders (flat, per-file, LRU-capped at 5) --- + // + // A single table for PostId-keyed engagement propagation and CID-keyed + // manifest/blob propagation. Any 32-byte content-addressed file_id fits. + + /// Upsert a holder for a file. Bumps last_interaction_ms to now and + /// enforces an LRU cap of 5 holders per file. + pub fn touch_file_holder( + &self, + file_id: &[u8; 32], + peer_id: &NodeId, + peer_addresses: &[String], + direction: HolderDirection, + ) -> anyhow::Result<()> { + let addrs_json = serde_json::to_string(peer_addresses)?; + let now = now_ms(); + let new_dir = direction.as_str(); + // Upsert. If the row exists with a different direction, promote to "both". + self.conn.execute( + "INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + VALUES (?1, ?2, ?3, ?4, ?5) + ON CONFLICT(file_id, peer_id) DO UPDATE SET + peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END, + last_interaction_ms = ?4, + direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END", + params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir], + )?; + // Enforce LRU cap of 5. Oldest get dropped. + self.conn.execute( + "DELETE FROM file_holders + WHERE file_id = ?1 + AND peer_id NOT IN ( + SELECT peer_id FROM file_holders + WHERE file_id = ?1 + ORDER BY last_interaction_ms DESC + LIMIT 5 + )", + params![file_id.as_slice()], + )?; + Ok(()) + } + + /// Return the up-to-5 most recently interacted holders of a file. + pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result)>> { + let mut stmt = self.conn.prepare( + "SELECT peer_id, peer_addresses FROM file_holders + WHERE file_id = ?1 + ORDER BY last_interaction_ms DESC + LIMIT 5", + )?; + let rows = stmt.query_map(params![file_id.as_slice()], |row| { + let peer_bytes: Vec = row.get(0)?; + let addrs_json: String = row.get(1)?; + Ok((peer_bytes, addrs_json)) + })?; + let mut out = Vec::new(); + for row in rows { + let (peer_bytes, addrs_json) = row?; + if peer_bytes.len() != 32 { continue; } + let mut peer = [0u8; 32]; + peer.copy_from_slice(&peer_bytes); + let addrs: Vec = serde_json::from_str(&addrs_json).unwrap_or_default(); + out.push((NodeId::from(peer), addrs)); + } + Ok(out) + } + + /// Remove all holders for a file (e.g. on post/blob deletion). + pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM file_holders WHERE file_id = ?1", + params![file_id.as_slice()], + )?; + Ok(()) + } + + /// One-time migration: seed file_holders from the legacy upstream/downstream + /// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder + /// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the + /// PRIMARY KEY. + pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> { + // Skip if file_holders already populated (idempotent re-run protection). + let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")? + .query_row([], |row| row.get(0))?; + if existing > 0 { + return Ok(()); + } + let now = now_ms() as i64; + // post_upstream → holders we received engagement diffs from + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream", + params![now], + )?; + // post_downstream → holders we sent engagement diffs to + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream", + params![now], + )?; + // blob_upstream → peer we fetched the blob/manifest from + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream", + params![now], + )?; + // blob_downstream → peers we served the blob/manifest to + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream", + params![now], + )?; + Ok(()) + } + // --- Engagement: reactions --- /// Store a reaction (upsert by reactor+post_id+emoji). From 0b2b4f5a687522f7423d59455f1308fa6a7e63d0 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 20:56:28 -0400 Subject: [PATCH 3/6] Phase 2b (0.6.1-beta): dual-write file_holders on all propagation events Populate the flat holder set alongside every existing post_upstream / post_downstream / blob_upstream / blob_downstream write so that read paths can be switched over in the next commit without losing continuity. Events wired: - Pull sync receive (3 paths in connection.rs) - PostPush receive (public posts only after Phase 1) - PostFetch via notification (discovery pull) - PostDownstreamRegister - Replication accept (downstream) + replication-driven pull (upstream) - Attachment upstream recorded after replication blob fetch - ManifestPush receive (remote is a CID holder) - ManifestPush send (downstream peer becomes CID holder) - Blob fetch fallback (upstream lateral sources) Direction is tracked as Received vs Sent. Not load-bearing for routing; retained for future use. LRU cap of 5 enforced on every touch. Legacy upstream/downstream writes remain in place; they'll go away together with the table drops at the end of this phase. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 67 +++++++++++++++++++++++++++++++++++ crates/core/src/network.rs | 10 +++++- crates/core/src/node.rs | 12 +++++++ 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 6b73b03..7c8b7b3 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1395,6 +1395,12 @@ impl ConnectionManager { for pid in &new_post_ids { let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let _ = s.add_post_upstream(pid, peer_id, prio); + let _ = s.touch_file_holder( + pid, + peer_id, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = s.update_follow_last_sync(author, now_ms); @@ -1942,6 +1948,12 @@ impl ConnectionManager { for pid in &new_post_ids { let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(pid, from, prio); + let _ = storage.touch_file_holder( + pid, + from, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = storage.update_follow_last_sync(author, now_ms); @@ -2036,6 +2048,12 @@ impl ConnectionManager { for pid in &new_post_ids { let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(pid, peer_id, prio); + let _ = storage.touch_file_holder( + pid, + peer_id, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = storage.update_follow_last_sync(author, now_ms); @@ -4980,6 +4998,12 @@ impl ConnectionManager { ); let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); + let _ = storage.touch_file_holder( + &push.post.id, + &remote_node_id, + &[], + crate::storage::HolderDirection::Received, + ); info!( peer = hex::encode(remote_node_id), post_id = hex::encode(push.post.id), @@ -5073,6 +5097,13 @@ impl ConnectionManager { &entry.manifest.author_manifest.author, entry.manifest.author_manifest.updated_at, ); + // Remote peer pushed us this manifest → they hold the file. + let _ = storage.touch_file_holder( + &entry.cid, + &remote_node_id, + &[], + crate::storage::HolderDirection::Received, + ); stored_entries.push(entry.clone()); } // Gather downstream peers for relay before dropping locks @@ -5188,6 +5219,12 @@ impl ConnectionManager { if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio); + let _ = storage.touch_file_holder( + &sync_post.id, + &sender_id, + &[], + crate::storage::HolderDirection::Received, + ); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -5448,6 +5485,12 @@ impl ConnectionManager { let cm = conn_mgr.lock().await; let storage = cm.storage.get().await; let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id); + let _ = storage.touch_file_holder( + &payload.post_id, + &remote_node_id, + &[], + crate::storage::HolderDirection::Sent, + ); drop(storage); trace!( peer = hex::encode(remote_node_id), @@ -5708,6 +5751,12 @@ impl ConnectionManager { }); let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() { let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false); + let _ = storage.touch_file_holder( + &payload.cid, + &remote_node_id, + &payload.requester_addresses, + crate::storage::HolderDirection::Sent, + ); if ok { (true, vec![]) } else { let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default(); let redirects: Vec = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect(); @@ -6085,6 +6134,12 @@ impl ConnectionManager { // Register as downstream for all accepted posts for pid in &acc { let _ = storage.add_post_downstream(pid, &remote_node_id); + let _ = storage.touch_file_holder( + pid, + &remote_node_id, + &[], + crate::storage::HolderDirection::Sent, + ); } (acc, rej, to_pull) @@ -6137,6 +6192,12 @@ impl ConnectionManager { let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(&sp.id, &sender, prio); + let _ = storage.touch_file_holder( + &sp.id, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); let blob_store = cm.blob_store.clone(); drop(storage); drop(cm); @@ -6164,6 +6225,12 @@ impl ConnectionManager { let storage = cm.storage.get().await; let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes); let _ = storage.add_post_upstream(&att.cid, &sender, 0); + let _ = storage.touch_file_holder( + &att.cid, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); } Ok(()) }.await; diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 32273cd..55f4124 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -1032,9 +1032,17 @@ impl Network { }], }; let mut sent = 0; - for (ds_nid, _) in &downstream { + for (ds_nid, ds_addrs) in &downstream { if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() { sent += 1; + // We pushed this file's manifest → downstream peer now holds it. + let storage = self.storage.get().await; + let _ = storage.touch_file_holder( + cid, + ds_nid, + ds_addrs, + crate::storage::HolderDirection::Sent, + ); } } sent diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 8e89b80..4b328ac 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1351,6 +1351,12 @@ impl Node { .map(|m| m.host_addresses.clone()) .unwrap_or_default(); let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs); + let _ = storage.touch_file_holder( + cid, + from_peer, + &source_addrs, + crate::storage::HolderDirection::Received, + ); } Ok(data) } @@ -1413,6 +1419,12 @@ impl Node { } } let _ = storage.store_blob_upstream(cid, &lateral, &[]); + let _ = storage.touch_file_holder( + cid, + &lateral, + &[], + crate::storage::HolderDirection::Received, + ); return Ok(Some(data)); } Ok((None, response)) => { From 3a0d2e93ab106b13f4d5cac9d62cffb593b2caf1 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 21:00:53 -0400 Subject: [PATCH 4/6] Phase 2c (0.6.1-beta): route engagement diffs through file_holders propagate_engagement_diff now targets the post's flat holder set (up to 5 most-recent) instead of the post_downstream directional tree. The holder set naturally subsumes the old upstream+downstream partition, so the separate "also send to upstreams" loops at each engagement call site are removed (reactions, comments, comment edit/delete, receipt slots, comment slots). handle_blob_header_diff on receive: - records the sending peer as a file holder (an engagement exchange is proof the peer holds the post) - re-propagates to the holder set minus the sender Writes to post_upstream / post_downstream still occur from Phase 2b (dual-write); those and the legacy tables will be removed in 2e. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 41 +++++++++++++--------------- crates/core/src/network.rs | 14 +++++----- crates/core/src/node.rs | 50 ++--------------------------------- 3 files changed, 28 insertions(+), 77 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 7c8b7b3..67656cf 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -6255,12 +6255,14 @@ impl ConnectionManager { Ok(()) } - /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream. + /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate + /// to the post's file_holders (flat set, up to 5 most recent). async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) { use crate::types::BlobHeaderDiffOp; - // Gather policy + audience data, then drop lock immediately - let (policy, approved_audience, downstream, upstreams) = { + // Gather policy + audience data + holders, then drop lock immediately. + // Remote peer clearly holds this post — record them as a holder. + let (policy, approved_audience, holders) = { let storage = self.storage.get().await; let policy = storage.get_comment_policy(&payload.post_id) .ok() @@ -6270,13 +6272,18 @@ impl ConnectionManager { crate::types::AudienceDirection::Inbound, Some(crate::types::AudienceStatus::Approved), ).unwrap_or_default(); - let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default(); - let upstreams: Vec = storage.get_post_upstreams(&payload.post_id) + let _ = storage.touch_file_holder( + &payload.post_id, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); + let holders: Vec = storage.get_file_holders(&payload.post_id) .unwrap_or_default() .into_iter() - .map(|(nid, _)| nid) + .map(|(nid, _addrs)| nid) .collect(); - (policy, approved, downstream, upstreams) + (policy, approved, holders) }; // Filter ops using gathered data (no lock held) @@ -6458,26 +6465,16 @@ impl ConnectionManager { let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms); } - // Collect all targets (downstream + all upstreams), then send in a single batched task + // Re-propagate to all file holders (flat set, max 5). Exclude sender. let mut targets: Vec = Vec::new(); - for peer_id in downstream { - if peer_id == sender { continue; } - if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone()) - .or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone())) + for peer_id in &holders { + if *peer_id == sender { continue; } + if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone()) + .or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone())) { targets.push(conn); } } - // Phase 6: Try all upstreams, not just one - for up in &upstreams { - if *up != sender { - if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone()) - .or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone())) - { - targets.push(conn); - } - } - } if !targets.is_empty() { let payload_clone = payload.clone(); tokio::spawn(async move { diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 55f4124..835713f 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -2320,24 +2320,24 @@ impl Network { self.endpoint.close().await; } - /// Propagate an engagement diff to all downstream holders of a post (CDN tree). - /// Excludes the sender to avoid loops. + /// Propagate an engagement diff to all known holders of a post (flat set, + /// up to 5 most-recent). Excludes the sender to avoid loops. pub async fn propagate_engagement_diff( &self, post_id: &crate::types::PostId, payload: &crate::protocol::BlobHeaderDiffPayload, exclude_peer: &crate::types::NodeId, ) -> usize { - let downstream = { + let holders = { let storage = self.storage.get().await; - storage.get_post_downstream(post_id).unwrap_or_default() + storage.get_file_holders(post_id).unwrap_or_default() }; let mut sent = 0; - for ds_nid in &downstream { - if ds_nid == exclude_peer { + for (peer, _addrs) in &holders { + if peer == exclude_peer { continue; } - if self.send_to_peer_uni(ds_nid, MessageType::BlobHeaderDiff, payload).await.is_ok() { + if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() { sent += 1; } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 4b328ac..c072d15 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -3585,15 +3585,9 @@ impl Node { ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())], timestamp_ms: now, }; + // propagate_engagement_diff targets all file_holders (flat set, max 5) + // which already subsumes what used to be upstream + downstream. network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Also send to all upstreams (toward author) — Phase 6 multi-upstream - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(reaction) @@ -3700,14 +3694,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Also send to all upstreams (toward author) — Phase 6 multi-upstream - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(comment) @@ -3744,14 +3730,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(()) } @@ -3785,14 +3763,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(()) } @@ -4014,14 +3984,6 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } Ok(()) } @@ -4136,14 +4098,6 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } Ok(()) } From 60463d18172a123dd2806a5cc92ca36852bb2a57 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 21:09:45 -0400 Subject: [PATCH 5/6] Phase 2d (0.6.1-beta): route manifest + blob ops through file_holders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch ALL propagation-decision reads to the flat holder set. push_manifest_to_downstream now targets file_holders instead of blob_downstream. ManifestPush receive-side relay likewise — known holders fan out to up to 5 most-recent peers instead of a directional tree. Blob delete notices: single flat fan-out to file_holders; the legacy upstream_node tree-healing field is emitted as None (wire-stable via serde default) and ignored on receive — the post-0.6 flat model doesn't need sender-role distinction. send_blob_delete_notices keeps its Option<&Upstream> parameter as an unused placeholder for signature stability with the call sites in this commit. Other reads migrated: - blob fetch cascade: step 2 now tries "known holders" (up to 5) instead of a single upstream - manifest refresh: downstream_count reported from file_holder_count - web/http post holder enumeration - Worm search post/blob holder fallback (both connection.rs paths) - DeleteRecord fan-out rewires to file_holders - Under-replication replication check: < 2 holders Storage additions: - get_file_holder_count(file_id) - remove_file_holder(file_id, peer_id) Legacy upstream/downstream writes are still happening from Phase 2b; those + the tables themselves go in 2e. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 103 +++++++++++++--------------------- crates/core/src/http.rs | 6 +- crates/core/src/network.rs | 57 ++++++------------- crates/core/src/node.rs | 44 +++++++-------- crates/core/src/storage.rs | 17 ++++++ crates/core/src/web.rs | 4 +- 6 files changed, 103 insertions(+), 128 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 67656cf..ccfbf90 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -2828,13 +2828,9 @@ impl ConnectionManager { if store.get_post_with_visibility(post_id).ok().flatten().is_some() { Some(self.our_node_id) } else { - // CDN tree: do any of our downstream hosts have it? - let downstream = store.get_post_downstream(post_id).unwrap_or_default(); - if !downstream.is_empty() { - Some(downstream[0]) - } else { - None - } + // Any known holder of this post? + let holders = store.get_file_holders(post_id).unwrap_or_default(); + holders.first().map(|(nid, _)| *nid) } }; post_holder = found; @@ -2848,9 +2844,9 @@ impl ConnectionManager { // Check CDN: do we know who has it via blob post ownership? let store = self.storage.get().await; if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) { - let downstream = store.get_post_downstream(&pid).unwrap_or_default(); - if !downstream.is_empty() { - blob_holder = Some(downstream[0]); + let holders = store.get_file_holders(&pid).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { + blob_holder = Some(*nid); } } } @@ -4889,7 +4885,7 @@ impl ConnectionManager { let cm = conn_mgr.lock().await; // Collect blob CIDs + CDN peers before async work - let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = Vec::new(); + let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec)>)> = Vec::new(); { let storage = cm.storage.get().await; for dr in &payload.records { @@ -4897,9 +4893,8 @@ impl ConnectionManager { // Collect blobs for CDN cleanup before deleting let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default(); for cid in blob_cids { - let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); - let upstream = storage.get_blob_upstream(&cid).ok().flatten(); - blob_cleanup.push((cid, downstream, upstream)); + let holders = storage.get_file_holders(&cid).unwrap_or_default(); + blob_cleanup.push((cid, holders)); } let _ = storage.store_delete(dr); let _ = storage.apply_delete(dr); @@ -4915,18 +4910,11 @@ impl ConnectionManager { // Gather connections for CDN delete notices under lock, then send outside let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new(); - for (cid, downstream, upstream) in &blob_cleanup { - let upstream_info = upstream.as_ref().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs.clone() }); - let ds_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: upstream_info }; - for (ds_nid, _) in downstream { - if let Some(pc) = cm.connections_ref().get(ds_nid) { - delete_notices.push((pc.connection.clone(), ds_payload.clone())); - } - } - if let Some((up_nid, _)) = upstream { - let up_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; - if let Some(pc) = cm.connections_ref().get(up_nid) { - delete_notices.push((pc.connection.clone(), up_payload)); + for (cid, holders) in &blob_cleanup { + let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; + for (peer, _addrs) in holders { + if let Some(pc) = cm.connections_ref().get(peer) { + delete_notices.push((pc.connection.clone(), payload.clone())); } } } @@ -5106,15 +5094,15 @@ impl ConnectionManager { ); stored_entries.push(entry.clone()); } - // Gather downstream peers for relay before dropping locks + // Gather file holders for relay before dropping locks let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new(); for entry in &stored_entries { - let downstream = storage.get_blob_downstream(&entry.cid).unwrap_or_default(); - for (ds_nid, _) in downstream { - if ds_nid == remote_node_id { + let holders = storage.get_file_holders(&entry.cid).unwrap_or_default(); + for (peer, _addrs) in holders { + if peer == remote_node_id { continue; } - relay_targets.push((ds_nid, crate::protocol::ManifestPushPayload { + relay_targets.push((peer, crate::protocol::ManifestPushPayload { manifests: vec![entry.clone()], })); } @@ -5315,32 +5303,14 @@ impl ConnectionManager { let storage = cm.storage.get().await; let cid = payload.cid; - // Check if sender was our upstream for this blob - let was_upstream = storage.get_blob_upstream(&cid).ok().flatten() - .map(|(nid, _)| nid == remote_node_id) - .unwrap_or(false); - - if was_upstream { - // Sender was our upstream — clear it - let _ = storage.remove_blob_upstream(&cid); - - // If they provided their upstream, store it as our new upstream - if let Some(ref new_up) = payload.upstream_node { - if let Ok(nid_bytes) = hex::decode(&new_up.n) { - if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) { - let _ = storage.store_blob_upstream(&cid, &nid, &new_up.a); - } - } - } - } else { - // Sender was our downstream — remove them - let _ = storage.remove_blob_downstream(&cid, &remote_node_id); - } + // Flat-holder model: drop the sender as a holder of this file. + // The author's DeleteRecord (separate signed message) is what + // triggers the actual blob removal for followers. + let _ = storage.remove_file_holder(&cid, &remote_node_id); info!( peer = hex::encode(remote_node_id), cid = hex::encode(cid), - was_upstream, "Received blob delete notice" ); } @@ -5745,21 +5715,28 @@ impl ConnectionManager { let storage = storage.get().await; let manifest: Option = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| { if let Ok(am) = serde_json::from_str::(&json) { - let ds_count = storage.get_blob_downstream_count(&payload.cid).unwrap_or(0); + let ds_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) } else { serde_json::from_str(&json).ok() } }); let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() { - let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false); + let prior_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); let _ = storage.touch_file_holder( &payload.cid, &remote_node_id, &payload.requester_addresses, crate::storage::HolderDirection::Sent, ); - if ok { (true, vec![]) } else { - let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default(); - let redirects: Vec = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect(); + // If we already had 5 holders before adding this one, the + // requester should consult them too for CDN lookups. + if prior_count < 5 { + (true, vec![]) + } else { + let holders = storage.get_file_holders(&payload.cid).unwrap_or_default(); + let redirects: Vec = holders.into_iter() + .filter(|(nid, _)| *nid != remote_node_id) + .map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }) + .collect(); (false, redirects) } } else { (false, vec![]) }; @@ -5786,7 +5763,7 @@ impl ConnectionManager { Some(json) => { let manifest = if let Ok(am) = serde_json::from_str::(&json) { if am.updated_at > payload.current_updated_at { - let ds_count = store.get_blob_downstream_count(&payload.cid).unwrap_or(0); + let ds_count = store.get_file_holder_count(&payload.cid).unwrap_or(0); Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) } else { None } } else { None }; @@ -7758,8 +7735,8 @@ impl ConnectionActor { if s.get_post_with_visibility(post_id).ok().flatten().is_some() { post_holder = Some(ctx.our_node_id); } else { - let downstream = s.get_post_downstream(post_id).unwrap_or_default(); - if !downstream.is_empty() { post_holder = Some(downstream[0]); } + let holders = s.get_file_holders(post_id).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { post_holder = Some(*nid); } } } @@ -7769,8 +7746,8 @@ impl ConnectionActor { } else { let s = ctx.storage.get().await; if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) { - let downstream = s.get_post_downstream(&pid).unwrap_or_default(); - if !downstream.is_empty() { blob_holder = Some(downstream[0]); } + let holders = s.get_file_holders(&pid).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { blob_holder = Some(*nid); } } } } diff --git a/crates/core/src/http.rs b/crates/core/src/http.rs index 42fbbe6..d79fcb1 100644 --- a/crates/core/src/http.rs +++ b/crates/core/src/http.rs @@ -378,7 +378,11 @@ async fn try_redirect( Ok(Some((_, PostVisibility::Public))) => {} _ => return false, // not found or not public — hard close } - store.get_post_downstream(post_id).unwrap_or_default() + store.get_file_holders(post_id) + .unwrap_or_default() + .into_iter() + .map(|(nid, _addrs)| nid) + .collect::>() }; // Get addresses for downstream peers diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 835713f..7b3a11a 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -1015,15 +1015,16 @@ impl Network { sent } - /// Push updated manifests to all downstream peers for a given CID. + /// Push an updated manifest to all known holders of the file (flat set, + /// up to 5 most-recent). Replaces the legacy downstream-tree push. pub async fn push_manifest_to_downstream( &self, cid: &[u8; 32], manifest: &crate::types::CdnManifest, ) -> usize { - let downstream = { + let holders = { let storage = self.storage.get().await; - storage.get_blob_downstream(cid).unwrap_or_default() + storage.get_file_holders(cid).unwrap_or_default() }; let payload = crate::protocol::ManifestPushPayload { manifests: vec![crate::protocol::ManifestPushEntry { @@ -1032,15 +1033,14 @@ impl Network { }], }; let mut sent = 0; - for (ds_nid, ds_addrs) in &downstream { - if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() { + for (peer, peer_addrs) in &holders { + if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() { sent += 1; - // We pushed this file's manifest → downstream peer now holds it. let storage = self.storage.get().await; let _ = storage.touch_file_holder( cid, - ds_nid, - ds_addrs, + peer, + peer_addrs, crate::storage::HolderDirection::Sent, ); } @@ -1048,46 +1048,25 @@ impl Network { sent } - /// Send blob delete notices to downstream and upstream peers. - /// Downstream peers receive our upstream info for tree healing. - /// Upstream peers receive no upstream info (just "remove me as downstream"). + /// Send blob delete notices to all known holders of a file. + /// Second argument kept as Option for signature stability; flat-holder + /// model doesn't need separate upstream handling. pub async fn send_blob_delete_notices( &self, cid: &[u8; 32], - downstream: &[(NodeId, Vec)], - upstream: Option<&(NodeId, Vec)>, + holders: &[(NodeId, Vec)], + _legacy_upstream: Option<&(NodeId, Vec)>, ) -> usize { - let upstream_info = upstream.map(|(nid, addrs)| { - crate::types::PeerWithAddress { - n: hex::encode(nid), - a: addrs.clone(), - } - }); - - let mut sent = 0; - - // Notify downstream (with upstream info for tree healing) - let ds_payload = crate::protocol::BlobDeleteNoticePayload { + let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, - upstream_node: upstream_info, + upstream_node: None, }; - for (ds_nid, _) in downstream { - if self.send_to_peer_uni(ds_nid, MessageType::BlobDeleteNotice, &ds_payload).await.is_ok() { + let mut sent = 0; + for (peer, _addrs) in holders { + if self.send_to_peer_uni(peer, MessageType::BlobDeleteNotice, &payload).await.is_ok() { sent += 1; } } - - // Notify upstream (no upstream info) - if let Some((up_nid, _)) = upstream { - let up_payload = crate::protocol::BlobDeleteNoticePayload { - cid: *cid, - upstream_node: None, - }; - if self.send_to_peer_uni(up_nid, MessageType::BlobDeleteNotice, &up_payload).await.is_ok() { - sent += 1; - } - } - sent } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index c072d15..3fafe20 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1384,16 +1384,17 @@ impl Node { // Collect redirect peers from responses in case we need them later let mut redirect_peers: Vec = Vec::new(); - // 2. Try existing upstream (if we previously fetched this blob) - let upstream = { + // 2. Try known holders (up to 5 most-recent peers we've interacted + // with about this file). + let known_holders = { let storage = self.storage.get().await; - storage.get_blob_upstream(cid)? + storage.get_file_holders(cid).unwrap_or_default() }; - if let Some((upstream_nid, _upstream_addrs)) = upstream { - match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await { + for (holder_nid, _addrs) in &known_holders { + match self.fetch_blob_from_peer(cid, holder_nid, post_id, author, mime_type, created_at).await { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => {} - Err(e) => warn!(error = %e, "blob fetch from upstream failed"), + Err(e) => warn!(error = %e, "blob fetch from known holder failed"), } } @@ -1992,14 +1993,13 @@ impl Node { signature, }; - // Collect blob CIDs + CDN peers before cleanup + // Collect blob CIDs + known holders before cleanup (for delete notices) let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = { let storage = self.storage.get().await; let cids = storage.get_blobs_for_post(post_id).unwrap_or_default(); cids.into_iter().map(|cid| { - let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); - let upstream = storage.get_blob_upstream(&cid).ok().flatten(); - (cid, downstream, upstream) + let holders = storage.get_file_holders(&cid).unwrap_or_default(); + (cid, holders, None::<(NodeId, Vec)>) }).collect() }; @@ -3119,10 +3119,10 @@ impl Node { &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at, ); - // Relay to our downstream - let downstream = s.get_blob_downstream(cid).unwrap_or_default(); + // Relay to known holders (flat set) + let holders = s.get_file_holders(cid).unwrap_or_default(); drop(s); - if !downstream.is_empty() { + if !holders.is_empty() { network.push_manifest_to_downstream(cid, &cdn_manifest).await; } tracing::debug!( @@ -3286,18 +3286,16 @@ impl Node { compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms) } - /// Delete a blob with CDN notifications to upstream/downstream. + /// Delete a blob with CDN notifications to known holders. pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> { - // Gather CDN peers before cleanup - let (downstream, upstream) = { + // Gather known holders before cleanup + let holders = { let storage = self.storage.get().await; - let ds = storage.get_blob_downstream(cid).unwrap_or_default(); - let up = storage.get_blob_upstream(cid).ok().flatten(); - (ds, up) + storage.get_file_holders(cid).unwrap_or_default() }; - // Send CDN delete notices - self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await; + // Send CDN delete notices to all holders + self.network.send_blob_delete_notices(cid, &holders, None).await; // Clean up local storage { @@ -4330,10 +4328,10 @@ impl Node { } }; - // Filter to under-replicated (< 2 downstream) + // Filter to under-replicated (< 2 holders) let mut needs_replication = Vec::new(); for pid in &recent_ids { - match storage.get_post_downstream_count(pid) { + match storage.get_file_holder_count(pid) { Ok(count) if count < 2 => { needs_replication.push(*pid); } diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 8b8ba25..171c7e5 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -4470,6 +4470,14 @@ impl Storage { Ok(()) } + /// Count file holders (bounded at 5 by touch_file_holder's LRU cap). + pub fn get_file_holder_count(&self, file_id: &[u8; 32]) -> anyhow::Result { + let count: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM file_holders WHERE file_id = ?1", + )?.query_row(params![file_id.as_slice()], |row| row.get(0))?; + Ok(count as u32) + } + /// Return the up-to-5 most recently interacted holders of a file. pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result)>> { let mut stmt = self.conn.prepare( @@ -4504,6 +4512,15 @@ impl Storage { Ok(()) } + /// Remove a single peer's holder entry for a file. + pub fn remove_file_holder(&self, file_id: &[u8; 32], peer_id: &NodeId) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", + params![file_id.as_slice(), peer_id.as_slice()], + )?; + Ok(()) + } + /// One-time migration: seed file_holders from the legacy upstream/downstream /// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder /// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the diff --git a/crates/core/src/web.rs b/crates/core/src/web.rs index b9fa902..c946abb 100644 --- a/crates/core/src/web.rs +++ b/crates/core/src/web.rs @@ -132,8 +132,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc, browse if let Some(author) = author_id { holders.push(author); } - if let Ok(downstream) = store.get_post_downstream(&post_id) { - for peer in downstream { + if let Ok(file_holders) = store.get_file_holders(&post_id) { + for (peer, _addrs) in file_holders { if !holders.contains(&peer) { holders.push(peer); } From 5d9ba224279c3452f218bffeb70f75d92ee7e791 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 21:42:15 -0400 Subject: [PATCH 6/6] Phase 2e (0.6.1-beta): drop legacy upstream/downstream tables The file_holders table is now the only tracker of per-file peer relationships. post_upstream, post_downstream, blob_upstream, and blob_downstream are dropped at first launch after the seed migration copies any existing entries. Schema: - DROP TABLE IF EXISTS on all four legacy tables after seeding - Seed migration guards with sqlite_master table_exists check so fresh installs don't crash trying to read non-existent sources - Remove CREATE TABLE statements for the four tables from init - Remove Protocol v4 Phase 6 post_upstream priority migration (dead) - Remove blob_upstream preferred_tree column migration (dead) Rust: - Remove add/get/remove post_upstream, post_downstream, blob_upstream, blob_downstream methods - Remove get_blob_upstream_preferred_tree / update variant - Rewrite get_eviction_candidates's downstream_count subquery to count file_holders entries - Rewrite apply_delete's cascade cleanup to clear file_holders instead of post_upstream/post_downstream - cleanup_cdn_for_blob now clears file_holders for the CID Callers: - All dual-write sites in connection.rs and node.rs now do touch_file_holder only (legacy writes removed) - get_stale_manifests replaced with get_stale_manifest_cids; caller in node.rs picks a refresh source from file_holders Tests: - Remove blob_upstream_crud, blob_downstream_crud_and_limit, blob_upstream_preferred_tree, remove_blob_upstream, post_downstream_crud - Add file_holders_lru_cap and file_holders_direction_promotion tests All 110 core tests passing. Workspace compiles clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 17 +- crates/core/src/node.rs | 27 +- crates/core/src/storage.rs | 572 ++++++---------------------------- 3 files changed, 112 insertions(+), 504 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index ccfbf90..7d97a7a 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1393,8 +1393,6 @@ impl ConnectionManager { { let s = storage.get().await; for pid in &new_post_ids { - let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = s.add_post_upstream(pid, peer_id, prio); let _ = s.touch_file_holder( pid, peer_id, @@ -1946,8 +1944,6 @@ impl ConnectionManager { { let storage = self.storage.get().await; for pid in &new_post_ids { - let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(pid, from, prio); let _ = storage.touch_file_holder( pid, from, @@ -2046,8 +2042,6 @@ impl ConnectionManager { { let storage = self.storage.get().await; for pid in &new_post_ids { - let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(pid, peer_id, prio); let _ = storage.touch_file_holder( pid, peer_id, @@ -4984,8 +4978,6 @@ impl ConnectionManager { &push.post.post, &push.post.visibility, ); - let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); let _ = storage.touch_file_holder( &push.post.id, &remote_node_id, @@ -5205,8 +5197,6 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { - let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio); let _ = storage.touch_file_holder( &sync_post.id, &sender_id, @@ -5454,7 +5444,6 @@ impl ConnectionManager { let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?; let cm = conn_mgr.lock().await; let storage = cm.storage.get().await; - let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id); let _ = storage.touch_file_holder( &payload.post_id, &remote_node_id, @@ -6108,9 +6097,8 @@ impl ConnectionManager { to_pull.push(*pid); } - // Register as downstream for all accepted posts + // Register as holder for all accepted posts for pid in &acc { - let _ = storage.add_post_downstream(pid, &remote_node_id); let _ = storage.touch_file_holder( pid, &remote_node_id, @@ -6167,8 +6155,6 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); - let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&sp.id, &sender, prio); let _ = storage.touch_file_holder( &sp.id, &sender, @@ -6201,7 +6187,6 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes); - let _ = storage.add_post_upstream(&att.cid, &sender, 0); let _ = storage.touch_file_holder( &att.cid, &sender, diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 3fafe20..85c7606 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1350,7 +1350,6 @@ impl Node { let source_addrs: Vec = response.manifest.as_ref() .map(|m| m.host_addresses.clone()) .unwrap_or_default(); - let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs); let _ = storage.touch_file_holder( cid, from_peer, @@ -1419,7 +1418,6 @@ impl Node { let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at); } } - let _ = storage.store_blob_upstream(cid, &lateral, &[]); let _ = storage.touch_file_holder( cid, &lateral, @@ -3095,20 +3093,27 @@ impl Node { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64 - max_age_ms; - let stale = { + let stale_cids = { let s = storage.get().await; - s.get_stale_manifests(cutoff).unwrap_or_default() + s.get_stale_manifest_cids(cutoff).unwrap_or_default() }; - for (cid, upstream_nid, _upstream_addrs) in &stale { - // Get current updated_at for this manifest - let current_updated_at = { + for cid in &stale_cids { + // Get current updated_at + pick a holder to refresh from + let (current_updated_at, refresh_source) = { let s = storage.get().await; - s.get_cdn_manifest(cid).ok().flatten() + let updated_at = s.get_cdn_manifest(cid).ok().flatten() .and_then(|json| serde_json::from_str::(&json).ok()) .map(|m| m.updated_at) - .unwrap_or(0) + .unwrap_or(0); + let source = s.get_file_holders(cid) + .unwrap_or_default() + .into_iter() + .next() + .map(|(nid, _)| nid); + (updated_at, source) }; - match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await { + let Some(upstream_nid) = refresh_source else { continue; }; + match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await { Ok(Some(cdn_manifest)) => { if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) { let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default(); @@ -3135,7 +3140,7 @@ impl Node { Err(e) => { tracing::debug!( cid = hex::encode(cid), - upstream = hex::encode(upstream_nid), + upstream = hex::encode(&upstream_nid), error = %e, "Manifest refresh from upstream failed" ); diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 171c7e5..31434bb 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -282,20 +282,6 @@ impl Storage { updated_at INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author); - CREATE TABLE IF NOT EXISTS blob_upstream ( - cid BLOB PRIMARY KEY, - source_node_id BLOB NOT NULL, - source_addresses TEXT NOT NULL DEFAULT '[]', - stored_at INTEGER NOT NULL - ); - CREATE TABLE IF NOT EXISTS blob_downstream ( - cid BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - peer_addresses TEXT NOT NULL DEFAULT '[]', - registered_at INTEGER NOT NULL, - PRIMARY KEY (cid, peer_node_id) - ); - CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid); CREATE TABLE IF NOT EXISTS group_keys ( group_id BLOB PRIMARY KEY, circle_name TEXT NOT NULL, @@ -346,17 +332,6 @@ impl Storage { last_seen_ms INTEGER NOT NULL, success_count INTEGER NOT NULL DEFAULT 0 ); - CREATE TABLE IF NOT EXISTS post_downstream ( - post_id BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - registered_at INTEGER NOT NULL, - PRIMARY KEY (post_id, peer_node_id) - ); - CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id); - CREATE TABLE IF NOT EXISTS post_upstream ( - post_id BLOB PRIMARY KEY, - peer_node_id BLOB NOT NULL - ); CREATE TABLE IF NOT EXISTS blob_headers ( post_id BLOB PRIMARY KEY, author BLOB NOT NULL, @@ -573,16 +548,6 @@ impl Storage { )?; } - // Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration) - let has_blob_pref_tree = self.conn.prepare( - "SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'" - )?.query_row([], |row| row.get::<_, i64>(0))?; - if has_blob_pref_tree == 0 { - self.conn.execute_batch( - "ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';" - )?; - } - // Add public_visible column to profiles if missing (Phase D-4 migration) let has_public_visible = self.conn.prepare( "SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'" @@ -696,31 +661,19 @@ impl Storage { )?; } - // Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max) - let has_priority = self.conn.prepare( - "SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'" - )?.query_row([], |row| row.get::<_, i64>(0))?; - if has_priority == 0 { - self.conn.execute_batch( - "ALTER TABLE post_upstream RENAME TO post_upstream_old; - CREATE TABLE post_upstream ( - post_id BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - priority INTEGER NOT NULL DEFAULT 0, - registered_at INTEGER NOT NULL DEFAULT 0, - PRIMARY KEY (post_id, peer_node_id) - ); - INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at) - SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old; - DROP TABLE post_upstream_old;" - )?; - } - // 0.6.1-beta: seed file_holders from legacy upstream/downstream tables // before they're dropped. Idempotent — only fires on an empty // file_holders table. self.seed_file_holders_from_legacy()?; + // 0.6.1-beta: drop legacy directional tables — replaced by file_holders. + self.conn.execute_batch( + "DROP TABLE IF EXISTS blob_upstream; + DROP TABLE IF EXISTS blob_downstream; + DROP TABLE IF EXISTS post_upstream; + DROP TABLE IF EXISTS post_downstream;", + )?; + Ok(()) } @@ -2431,8 +2384,7 @@ impl Storage { params![record.post_id.as_slice(), record.author.as_slice()], )?; if deleted > 0 { - self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?; - self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?; + self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?; self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?; } Ok(deleted > 0) @@ -3431,28 +3383,6 @@ impl Storage { Ok(()) } - /// Update the preferred_tree JSON for a blob upstream entry. - pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> { - let json = serde_json::to_string( - &tree.iter().map(hex::encode).collect::>() - )?; - self.conn.execute( - "UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2", - params![json, cid.as_slice()], - )?; - Ok(()) - } - - /// Get the preferred_tree for a blob upstream entry. - pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result> { - let json: String = self.conn.query_row( - "SELECT preferred_tree FROM blob_upstream WHERE cid = ?1", - params![cid.as_slice()], - |row| row.get(0), - ).unwrap_or_else(|_| "[]".to_string()); - Ok(parse_anchors_json(&json)) - } - // ---- Social Routes ---- /// Insert or update a social route entry. @@ -3924,10 +3854,10 @@ impl Storage { GROUP BY post_id ) r ON b.post_id = r.post_id LEFT JOIN ( - SELECT cid, COUNT(*) as ds_count - FROM blob_downstream - GROUP BY cid - ) d ON b.cid = d.cid" + SELECT file_id, COUNT(*) as ds_count + FROM file_holders + GROUP BY file_id + ) d ON b.cid = d.file_id" )?; let rows = stmt.query_map(params![cutoff], |row| { let cid_bytes: Vec = row.get(0)?; @@ -3981,11 +3911,10 @@ impl Storage { Ok(count as u64) } - /// Clean up all CDN metadata for a blob (manifests + upstream + downstream). + /// Clean up all CDN metadata for a blob (manifests + file_holders). pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> { self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?; - self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?; - self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?; + self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?; Ok(()) } @@ -4004,12 +3933,6 @@ impl Storage { Ok(cids) } - /// Remove upstream tracking for a blob CID. - pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> { - self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?; - Ok(()) - } - pub fn post_count(&self) -> anyhow::Result { let count: i64 = self .conn @@ -4072,137 +3995,24 @@ impl Storage { Ok(result) } - /// Record the upstream source for a blob CID. - pub fn store_blob_upstream( - &self, - cid: &[u8; 32], - source_node_id: &NodeId, - source_addresses: &[String], - ) -> anyhow::Result<()> { - let addrs_json = serde_json::to_string(source_addresses)?; - self.conn.execute( - "INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4) - ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4", - params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()], - )?; - Ok(()) - } - - /// Get the upstream source for a blob CID: (node_id, addresses). - pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result)>> { - let result = self.conn.query_row( - "SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1", - params![cid.as_slice()], - |row| { - let nid_bytes: Vec = row.get(0)?; - let addrs_json: String = row.get(1)?; - Ok((nid_bytes, addrs_json)) - }, - ); - match result { - Ok((nid_bytes, addrs_json)) => { - let nid = blob_to_nodeid(nid_bytes)?; - let addrs: Vec = serde_json::from_str(&addrs_json).unwrap_or_default(); - Ok(Some((nid, addrs))) - } - Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), - Err(e) => Err(e.into()), - } - } - - /// Register a downstream peer for a blob CID. Returns false if already at 100 downstream. - pub fn add_blob_downstream( - &self, - cid: &[u8; 32], - peer_node_id: &NodeId, - peer_addresses: &[String], - ) -> anyhow::Result { - let count = self.get_blob_downstream_count(cid)?; - if count >= 100 { - return Ok(false); - } - let addrs_json = serde_json::to_string(peer_addresses)?; - self.conn.execute( - "INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4) - ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4", - params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()], - )?; - Ok(true) - } - - /// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>. - pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result)>> { + /// Get CIDs of manifests older than a cutoff. Callers look up holders + /// via file_holders to pick a refresh source. + pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result> { let mut stmt = self.conn.prepare( - "SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1" - )?; - let rows = stmt.query_map(params![cid.as_slice()], |row| { - let nid_bytes: Vec = row.get(0)?; - let addrs_json: String = row.get(1)?; - Ok((nid_bytes, addrs_json)) - })?; - let mut result = Vec::new(); - for row in rows { - let (nid_bytes, addrs_json) = row?; - let nid = blob_to_nodeid(nid_bytes)?; - let addrs: Vec = serde_json::from_str(&addrs_json).unwrap_or_default(); - result.push((nid, addrs)); - } - Ok(result) - } - - /// Count downstream peers for a blob CID. - pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result { - let count: i64 = self.conn.query_row( - "SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1", - params![cid.as_slice()], - |row| row.get(0), - )?; - Ok(count as u32) - } - - /// Remove a downstream peer for a blob CID. - pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2", - params![cid.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>. - pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result)>> { - let mut stmt = self.conn.prepare( - "SELECT m.cid, u.source_node_id, u.source_addresses - FROM cdn_manifests m - LEFT JOIN blob_upstream u ON m.cid = u.cid - WHERE m.updated_at < ?1" + "SELECT cid FROM cdn_manifests WHERE updated_at < ?1", )?; let rows = stmt.query_map(params![older_than_ms as i64], |row| { let cid_bytes: Vec = row.get(0)?; - let nid_bytes: Option> = row.get(1)?; - let addrs_json: Option = row.get(2)?; - Ok((cid_bytes, nid_bytes, addrs_json)) + Ok(cid_bytes) })?; - let mut result = Vec::new(); + let mut out = Vec::new(); for row in rows { - let (cid_bytes, nid_bytes, addrs_json) = row?; - let cid: [u8; 32] = match cid_bytes.try_into() { - Ok(c) => c, - Err(_) => continue, - }; - let nid = match nid_bytes { - Some(b) => match blob_to_nodeid(b) { - Ok(n) => n, - Err(_) => continue, - }, - None => continue, - }; - let addrs: Vec = addrs_json - .map(|j| serde_json::from_str(&j).unwrap_or_default()) - .unwrap_or_default(); - result.push((cid, nid, addrs)); + let cid_bytes = row?; + if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) { + out.push(cid); + } } - Ok(result) + Ok(out) } /// Get the 10 posts before and 10 posts after a reference timestamp for an author. @@ -4306,128 +4116,6 @@ impl Storage { Ok(result) } - // --- Engagement: post_downstream --- - - /// Register a peer as downstream for a post (max 100 per post). - /// Returns true if added, false if at capacity. - pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result { - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; - if count >= 100 { - return Ok(false); - } - self.conn.execute( - "INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3) - ON CONFLICT DO NOTHING", - params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()], - )?; - Ok(true) - } - - /// Get all downstream peers for a post. - pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result> { - let mut stmt = self.conn.prepare( - "SELECT peer_node_id FROM post_downstream WHERE post_id = ?1" - )?; - let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec>(0))?; - let mut result = Vec::new(); - for row in rows { - if let Ok(nid) = blob_to_nodeid(row?) { - result.push(nid); - } - } - Ok(result) - } - - /// Remove a downstream peer for a post. - pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - // --- Engagement: post_upstream (multi-upstream, 3 max) --- - - /// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post. - pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> { - // Check current count - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; - if count >= 3 { - return Ok(()); // Already at cap - } - let now = now_ms(); - self.conn.execute( - "INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at) - VALUES (?1, ?2, ?3, ?4)", - params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now], - )?; - Ok(()) - } - - /// Get all upstream peers for a post, ordered by priority ASC (0 = primary). - pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result> { - let mut stmt = self.conn.prepare( - "SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC" - )?; - let rows = stmt.query_map(params![post_id.as_slice()], |row| { - let bytes: Vec = row.get(0)?; - let prio: i64 = row.get(1)?; - Ok((bytes, prio as u8)) - })?; - let mut result = Vec::new(); - for row in rows { - let (bytes, prio) = row?; - if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) { - result.push((nid, prio)); - } - } - Ok(result) - } - - /// Get the primary (lowest priority) upstream peer for a post. - /// Backward-compatible wrapper for code that only needs a single upstream. - pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result> { - let upstreams = self.get_post_upstreams(post_id)?; - Ok(upstreams.into_iter().next().map(|(nid, _)| nid)) - } - - /// Remove a specific upstream peer for a post. - pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Promote an upstream peer to primary (priority 0), pushing others up. - pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - // Shift all priorities up by 1 - self.conn.execute( - "UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1", - params![post_id.as_slice()], - )?; - // Set the promoted peer to priority 0 - self.conn.execute( - "UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Count downstream peers for a post. - pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result { - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; - Ok(count as u32) - } - // --- File holders (flat, per-file, LRU-capped at 5) --- // // A single table for PostId-keyed engagement propagation and CID-keyed @@ -4524,7 +4212,7 @@ impl Storage { /// One-time migration: seed file_holders from the legacy upstream/downstream /// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder /// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the - /// PRIMARY KEY. + /// PRIMARY KEY. Skips tables that don't exist on fresh installs. pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> { // Skip if file_holders already populated (idempotent re-run protection). let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")? @@ -4533,30 +4221,40 @@ impl Storage { return Ok(()); } let now = now_ms() as i64; - // post_upstream → holders we received engagement diffs from - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream", - params![now], - )?; - // post_downstream → holders we sent engagement diffs to - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream", - params![now], - )?; - // blob_upstream → peer we fetched the blob/manifest from - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream", - params![now], - )?; - // blob_downstream → peers we served the blob/manifest to - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream", - params![now], - )?; + let table_exists = |name: &str| -> anyhow::Result { + let count: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", + )?.query_row(params![name], |row| row.get(0))?; + Ok(count > 0) + }; + if table_exists("post_upstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream", + params![now], + )?; + } + if table_exists("post_downstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream", + params![now], + )?; + } + if table_exists("blob_upstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream", + params![now], + )?; + } + if table_exists("blob_downstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream", + params![now], + )?; + } Ok(()) } @@ -5468,60 +5166,6 @@ mod tests { assert_eq!(manifests[0].0, cid); } - #[test] - fn blob_upstream_crud() { - let s = temp_storage(); - let cid = [42u8; 32]; - let source = make_node_id(1); - let addrs = vec!["10.0.0.1:4433".to_string()]; - - s.store_blob_upstream(&cid, &source, &addrs).unwrap(); - let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap(); - assert_eq!(nid, source); - assert_eq!(got_addrs, addrs); - - // Missing - assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none()); - - // Update - let source2 = make_node_id(2); - s.store_blob_upstream(&cid, &source2, &[]).unwrap(); - let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap(); - assert_eq!(nid, source2); - } - - #[test] - fn blob_downstream_crud_and_limit() { - let s = temp_storage(); - let cid = [42u8; 32]; - - // Add downstream peers - for i in 0..100u8 { - let peer = make_node_id(i); - let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap(); - assert!(ok, "should accept peer {}", i); - } - - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100); - - // 101st should be rejected - let peer_101 = make_node_id(200); - let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap(); - assert!(!ok, "should reject 101st downstream"); - - // Get all downstream - let downstream = s.get_blob_downstream(&cid).unwrap(); - assert_eq!(downstream.len(), 100); - - // Remove one - s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap(); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99); - - // Now adding one more should work - let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap(); - assert!(ok, "should accept after removal"); - } - #[test] fn blob_pin_unpin() { let s = temp_storage(); @@ -5605,18 +5249,15 @@ mod tests { let peer = make_node_id(2); s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap(); - s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap(); - s.add_blob_downstream(&cid, &peer, &["10.0.0.2:4433".to_string()]).unwrap(); + s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap(); assert!(s.get_cdn_manifest(&cid).unwrap().is_some()); - assert!(s.get_blob_upstream(&cid).unwrap().is_some()); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1); + assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1); s.cleanup_cdn_for_blob(&cid).unwrap(); assert!(s.get_cdn_manifest(&cid).unwrap().is_none()); - assert!(s.get_blob_upstream(&cid).unwrap().is_none()); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0); + assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0); } #[test] @@ -5636,18 +5277,6 @@ mod tests { assert!(cids.contains(&cid2)); } - #[test] - fn remove_blob_upstream() { - let s = temp_storage(); - let cid = [42u8; 32]; - let peer = make_node_id(1); - - s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap(); - assert!(s.get_blob_upstream(&cid).unwrap().is_some()); - - s.remove_blob_upstream(&cid).unwrap(); - assert!(s.get_blob_upstream(&cid).unwrap().is_none()); - } #[test] fn author_post_neighborhood() { @@ -6007,24 +5636,6 @@ mod tests { assert_eq!(got2.preferred_tree.len(), 2); } - #[test] - fn blob_upstream_preferred_tree() { - let s = temp_storage(); - let cid = [42u8; 32]; - let source = make_node_id(1); - s.store_blob_upstream(&cid, &source, &[]).unwrap(); - - // Initially empty - let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap(); - assert!(tree.is_empty()); - - // Update - let nodes = vec![make_node_id(10), make_node_id(11)]; - s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap(); - let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap(); - assert_eq!(tree2.len(), 2); - } - // ---- Circle Profile tests ---- #[test] @@ -6225,32 +5836,39 @@ mod tests { // --- Engagement tests --- #[test] - fn post_downstream_crud() { + fn file_holders_lru_cap() { let s = temp_storage(); - let post_id = make_post_id(1); - let peer1 = make_node_id(1); - let peer2 = make_node_id(2); - - // Add downstream peers - assert!(s.add_post_downstream(&post_id, &peer1).unwrap()); - assert!(s.add_post_downstream(&post_id, &peer2).unwrap()); - - let downstream = s.get_post_downstream(&post_id).unwrap(); - assert_eq!(downstream.len(), 2); - assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2); - - // Remove one - s.remove_post_downstream(&post_id, &peer1).unwrap(); - assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1); - - // Capacity limit - let big_post = make_post_id(99); - for i in 0..100u8 { - assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap()); + let file = [42u8; 32]; + // Sleep between inserts so last_interaction_ms actually differs (ms resolution). + for i in 0..7u8 { + s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(2)); } - assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100); - // 101st should fail - assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap()); + // Only 5 most-recent survive + assert_eq!(s.get_file_holder_count(&file).unwrap(), 5); + let holders = s.get_file_holders(&file).unwrap(); + assert_eq!(holders.len(), 5); + let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect(); + // Oldest two (i=0, i=1) got evicted; most recent (i=6) survives + assert!(!kept.contains(&make_node_id(0))); + assert!(!kept.contains(&make_node_id(1))); + assert!(kept.contains(&make_node_id(6))); + } + + #[test] + fn file_holders_direction_promotion() { + let s = temp_storage(); + let file = [42u8; 32]; + let peer = make_node_id(1); + s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap(); + s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap(); + // Re-insert with opposite direction should promote to "both" + let dir: String = s.conn.query_row( + "SELECT direction FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", + rusqlite::params![file.as_slice(), peer.as_slice()], + |row| row.get(0), + ).unwrap(); + assert_eq!(dir, "both"); } #[test]