diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 67656cf..ccfbf90 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -2828,13 +2828,9 @@ impl ConnectionManager { if store.get_post_with_visibility(post_id).ok().flatten().is_some() { Some(self.our_node_id) } else { - // CDN tree: do any of our downstream hosts have it? - let downstream = store.get_post_downstream(post_id).unwrap_or_default(); - if !downstream.is_empty() { - Some(downstream[0]) - } else { - None - } + // Any known holder of this post? + let holders = store.get_file_holders(post_id).unwrap_or_default(); + holders.first().map(|(nid, _)| *nid) } }; post_holder = found; @@ -2848,9 +2844,9 @@ impl ConnectionManager { // Check CDN: do we know who has it via blob post ownership? let store = self.storage.get().await; if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) { - let downstream = store.get_post_downstream(&pid).unwrap_or_default(); - if !downstream.is_empty() { - blob_holder = Some(downstream[0]); + let holders = store.get_file_holders(&pid).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { + blob_holder = Some(*nid); } } } @@ -4889,7 +4885,7 @@ impl ConnectionManager { let cm = conn_mgr.lock().await; // Collect blob CIDs + CDN peers before async work - let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = Vec::new(); + let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec)>)> = Vec::new(); { let storage = cm.storage.get().await; for dr in &payload.records { @@ -4897,9 +4893,8 @@ impl ConnectionManager { // Collect blobs for CDN cleanup before deleting let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default(); for cid in blob_cids { - let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); - let upstream = storage.get_blob_upstream(&cid).ok().flatten(); - blob_cleanup.push((cid, downstream, upstream)); + let holders = 
storage.get_file_holders(&cid).unwrap_or_default(); + blob_cleanup.push((cid, holders)); } let _ = storage.store_delete(dr); let _ = storage.apply_delete(dr); @@ -4915,18 +4910,11 @@ impl ConnectionManager { // Gather connections for CDN delete notices under lock, then send outside let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new(); - for (cid, downstream, upstream) in &blob_cleanup { - let upstream_info = upstream.as_ref().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs.clone() }); - let ds_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: upstream_info }; - for (ds_nid, _) in downstream { - if let Some(pc) = cm.connections_ref().get(ds_nid) { - delete_notices.push((pc.connection.clone(), ds_payload.clone())); - } - } - if let Some((up_nid, _)) = upstream { - let up_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; - if let Some(pc) = cm.connections_ref().get(up_nid) { - delete_notices.push((pc.connection.clone(), up_payload)); + for (cid, holders) in &blob_cleanup { + let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; + for (peer, _addrs) in holders { + if let Some(pc) = cm.connections_ref().get(peer) { + delete_notices.push((pc.connection.clone(), payload.clone())); } } } @@ -5106,15 +5094,15 @@ impl ConnectionManager { ); stored_entries.push(entry.clone()); } - // Gather downstream peers for relay before dropping locks + // Gather file holders for relay before dropping locks let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new(); for entry in &stored_entries { - let downstream = storage.get_blob_downstream(&entry.cid).unwrap_or_default(); - for (ds_nid, _) in downstream { - if ds_nid == remote_node_id { + let holders = storage.get_file_holders(&entry.cid).unwrap_or_default(); + for (peer, _addrs) in holders { + if peer == remote_node_id { 
continue; } - relay_targets.push((ds_nid, crate::protocol::ManifestPushPayload { + relay_targets.push((peer, crate::protocol::ManifestPushPayload { manifests: vec![entry.clone()], })); } @@ -5315,32 +5303,14 @@ impl ConnectionManager { let storage = cm.storage.get().await; let cid = payload.cid; - // Check if sender was our upstream for this blob - let was_upstream = storage.get_blob_upstream(&cid).ok().flatten() - .map(|(nid, _)| nid == remote_node_id) - .unwrap_or(false); - - if was_upstream { - // Sender was our upstream — clear it - let _ = storage.remove_blob_upstream(&cid); - - // If they provided their upstream, store it as our new upstream - if let Some(ref new_up) = payload.upstream_node { - if let Ok(nid_bytes) = hex::decode(&new_up.n) { - if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) { - let _ = storage.store_blob_upstream(&cid, &nid, &new_up.a); - } - } - } - } else { - // Sender was our downstream — remove them - let _ = storage.remove_blob_downstream(&cid, &remote_node_id); - } + // Flat-holder model: drop the sender as a holder of this file. + // The author's DeleteRecord (separate signed message) is what + // triggers the actual blob removal for followers. 
+ let _ = storage.remove_file_holder(&cid, &remote_node_id); info!( peer = hex::encode(remote_node_id), cid = hex::encode(cid), - was_upstream, "Received blob delete notice" ); } @@ -5745,21 +5715,28 @@ impl ConnectionManager { let storage = storage.get().await; let manifest: Option = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| { if let Ok(am) = serde_json::from_str::(&json) { - let ds_count = storage.get_blob_downstream_count(&payload.cid).unwrap_or(0); + let ds_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) } else { serde_json::from_str(&json).ok() } }); let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() { - let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false); + let prior_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); let _ = storage.touch_file_holder( &payload.cid, &remote_node_id, &payload.requester_addresses, crate::storage::HolderDirection::Sent, ); - if ok { (true, vec![]) } else { - let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default(); - let redirects: Vec = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect(); + // If we already had 5 holders before adding this one, the + // requester should consult them too for CDN lookups. 
+ if prior_count < 5 { + (true, vec![]) + } else { + let holders = storage.get_file_holders(&payload.cid).unwrap_or_default(); + let redirects: Vec<PeerWithAddress> = holders.into_iter() + .filter(|(nid, _)| *nid != remote_node_id) + .map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }) + .collect(); (false, redirects) } } else { (false, vec![]) }; @@ -5786,7 +5763,7 @@ impl ConnectionManager { Some(json) => { let manifest = if let Ok(am) = serde_json::from_str::(&json) { if am.updated_at > payload.current_updated_at { - let ds_count = store.get_blob_downstream_count(&payload.cid).unwrap_or(0); + let ds_count = store.get_file_holder_count(&payload.cid).unwrap_or(0); Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) } else { None } } else { None }; @@ -7758,8 +7735,8 @@ impl ConnectionActor { if s.get_post_with_visibility(post_id).ok().flatten().is_some() { post_holder = Some(ctx.our_node_id); } else { - let downstream = s.get_post_downstream(post_id).unwrap_or_default(); - if !downstream.is_empty() { post_holder = Some(downstream[0]); } + let holders = s.get_file_holders(post_id).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { post_holder = Some(*nid); } } } @@ -7769,8 +7746,8 @@ impl ConnectionActor { } else { let s = ctx.storage.get().await; if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) { - let downstream = s.get_post_downstream(&pid).unwrap_or_default(); - if !downstream.is_empty() { blob_holder = Some(downstream[0]); } + let holders = s.get_file_holders(&pid).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { blob_holder = Some(*nid); } } } } diff --git a/crates/core/src/http.rs b/crates/core/src/http.rs index 42fbbe6..d79fcb1 100644 --- a/crates/core/src/http.rs +++ b/crates/core/src/http.rs @@ -378,7 +378,11 @@ async fn try_redirect( Ok(Some((_, PostVisibility::Public))) => {} _ => return false, // not
found or not public — hard close } - store.get_post_downstream(post_id).unwrap_or_default() + store.get_file_holders(post_id) + .unwrap_or_default() + .into_iter() + .map(|(nid, _addrs)| nid) + .collect::<Vec<_>>() }; // Get addresses for downstream peers diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 835713f..7b3a11a 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -1015,15 +1015,16 @@ impl Network { sent } - /// Push updated manifests to all downstream peers for a given CID. + /// Push an updated manifest to all known holders of the file (flat set, + /// up to 5 most-recent). Replaces the legacy downstream-tree push. pub async fn push_manifest_to_downstream( &self, cid: &[u8; 32], manifest: &crate::types::CdnManifest, ) -> usize { - let downstream = { + let holders = { let storage = self.storage.get().await; - storage.get_blob_downstream(cid).unwrap_or_default() + storage.get_file_holders(cid).unwrap_or_default() }; let payload = crate::protocol::ManifestPushPayload { manifests: vec![crate::protocol::ManifestPushEntry { @@ -1032,15 +1033,14 @@ impl Network { }], }; let mut sent = 0; - for (ds_nid, ds_addrs) in &downstream { - if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() { + for (peer, peer_addrs) in &holders { + if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() { sent += 1; - // We pushed this file's manifest → downstream peer now holds it. let storage = self.storage.get().await; let _ = storage.touch_file_holder( cid, - ds_nid, - ds_addrs, + peer, + peer_addrs, crate::storage::HolderDirection::Sent, ); } @@ -1048,46 +1048,25 @@ impl Network { sent } - /// Send blob delete notices to downstream and upstream peers. - /// Downstream peers receive our upstream info for tree healing. - /// Upstream peers receive no upstream info (just "remove me as downstream"). + /// Send blob delete notices to all known holders of a file.
+ /// Second argument kept as Option for signature stability; flat-holder + /// model doesn't need separate upstream handling. pub async fn send_blob_delete_notices( &self, cid: &[u8; 32], - downstream: &[(NodeId, Vec)], - upstream: Option<&(NodeId, Vec)>, + holders: &[(NodeId, Vec)], + _legacy_upstream: Option<&(NodeId, Vec)>, ) -> usize { - let upstream_info = upstream.map(|(nid, addrs)| { - crate::types::PeerWithAddress { - n: hex::encode(nid), - a: addrs.clone(), - } - }); - - let mut sent = 0; - - // Notify downstream (with upstream info for tree healing) - let ds_payload = crate::protocol::BlobDeleteNoticePayload { + let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, - upstream_node: upstream_info, + upstream_node: None, }; - for (ds_nid, _) in downstream { - if self.send_to_peer_uni(ds_nid, MessageType::BlobDeleteNotice, &ds_payload).await.is_ok() { + let mut sent = 0; + for (peer, _addrs) in holders { + if self.send_to_peer_uni(peer, MessageType::BlobDeleteNotice, &payload).await.is_ok() { sent += 1; } } - - // Notify upstream (no upstream info) - if let Some((up_nid, _)) = upstream { - let up_payload = crate::protocol::BlobDeleteNoticePayload { - cid: *cid, - upstream_node: None, - }; - if self.send_to_peer_uni(up_nid, MessageType::BlobDeleteNotice, &up_payload).await.is_ok() { - sent += 1; - } - } - sent } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index c072d15..3fafe20 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1384,16 +1384,17 @@ impl Node { // Collect redirect peers from responses in case we need them later let mut redirect_peers: Vec = Vec::new(); - // 2. Try existing upstream (if we previously fetched this blob) - let upstream = { + // 2. Try known holders (up to 5 most-recent peers we've interacted + // with about this file). + let known_holders = { let storage = self.storage.get().await; - storage.get_blob_upstream(cid)? 
+ storage.get_file_holders(cid).unwrap_or_default() }; - if let Some((upstream_nid, _upstream_addrs)) = upstream { - match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await { + for (holder_nid, _addrs) in &known_holders { + match self.fetch_blob_from_peer(cid, holder_nid, post_id, author, mime_type, created_at).await { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => {} - Err(e) => warn!(error = %e, "blob fetch from upstream failed"), + Err(e) => warn!(error = %e, "blob fetch from known holder failed"), } } @@ -1992,14 +1993,13 @@ impl Node { signature, }; - // Collect blob CIDs + CDN peers before cleanup + // Collect blob CIDs + known holders before cleanup (for delete notices) let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = { let storage = self.storage.get().await; let cids = storage.get_blobs_for_post(post_id).unwrap_or_default(); cids.into_iter().map(|cid| { - let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); - let upstream = storage.get_blob_upstream(&cid).ok().flatten(); - (cid, downstream, upstream) + let holders = storage.get_file_holders(&cid).unwrap_or_default(); + (cid, holders, None::<(NodeId, Vec)>) }).collect() }; @@ -3119,10 +3119,10 @@ impl Node { &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at, ); - // Relay to our downstream - let downstream = s.get_blob_downstream(cid).unwrap_or_default(); + // Relay to known holders (flat set) + let holders = s.get_file_holders(cid).unwrap_or_default(); drop(s); - if !downstream.is_empty() { + if !holders.is_empty() { network.push_manifest_to_downstream(cid, &cdn_manifest).await; } tracing::debug!( @@ -3286,18 +3286,16 @@ impl Node { compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms) } - /// Delete a blob with CDN notifications to upstream/downstream. + /// Delete a blob with CDN notifications to known holders. 
pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> { - // Gather CDN peers before cleanup - let (downstream, upstream) = { + // Gather known holders before cleanup + let holders = { let storage = self.storage.get().await; - let ds = storage.get_blob_downstream(cid).unwrap_or_default(); - let up = storage.get_blob_upstream(cid).ok().flatten(); - (ds, up) + storage.get_file_holders(cid).unwrap_or_default() }; - // Send CDN delete notices - self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await; + // Send CDN delete notices to all holders + self.network.send_blob_delete_notices(cid, &holders, None).await; // Clean up local storage { @@ -4330,10 +4328,10 @@ impl Node { } }; - // Filter to under-replicated (< 2 downstream) + // Filter to under-replicated (< 2 holders) let mut needs_replication = Vec::new(); for pid in &recent_ids { - match storage.get_post_downstream_count(pid) { + match storage.get_file_holder_count(pid) { Ok(count) if count < 2 => { needs_replication.push(*pid); } diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 8b8ba25..171c7e5 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -4470,6 +4470,14 @@ impl Storage { Ok(()) } + /// Count file holders (bounded at 5 by touch_file_holder's LRU cap). + pub fn get_file_holder_count(&self, file_id: &[u8; 32]) -> anyhow::Result { + let count: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM file_holders WHERE file_id = ?1", + )?.query_row(params![file_id.as_slice()], |row| row.get(0))?; + Ok(count as u32) + } + /// Return the up-to-5 most recently interacted holders of a file. pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result)>> { let mut stmt = self.conn.prepare( @@ -4504,6 +4512,15 @@ impl Storage { Ok(()) } + /// Remove a single peer's holder entry for a file. 
+ pub fn remove_file_holder(&self, file_id: &[u8; 32], peer_id: &NodeId) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", + params![file_id.as_slice(), peer_id.as_slice()], + )?; + Ok(()) + } + /// One-time migration: seed file_holders from the legacy upstream/downstream /// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder /// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the diff --git a/crates/core/src/web.rs b/crates/core/src/web.rs index b9fa902..c946abb 100644 --- a/crates/core/src/web.rs +++ b/crates/core/src/web.rs @@ -132,8 +132,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc, browse if let Some(author) = author_id { holders.push(author); } - if let Ok(downstream) = store.get_post_downstream(&post_id) { - for peer in downstream { + if let Ok(file_holders) = store.get_file_holders(&post_id) { + for (peer, _addrs) in file_holders { if !holders.contains(&peer) { holders.push(peer); }