diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index bfb0090..7d97a7a 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1393,8 +1393,12 @@ impl ConnectionManager { { let s = storage.get().await; for pid in &new_post_ids { - let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = s.add_post_upstream(pid, peer_id, prio); + let _ = s.touch_file_holder( + pid, + peer_id, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = s.update_follow_last_sync(author, now_ms); @@ -1940,8 +1944,12 @@ impl ConnectionManager { { let storage = self.storage.get().await; for pid in &new_post_ids { - let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(pid, from, prio); + let _ = storage.touch_file_holder( + pid, + from, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = storage.update_follow_last_sync(author, now_ms); @@ -2034,8 +2042,12 @@ impl ConnectionManager { { let storage = self.storage.get().await; for pid in &new_post_ids { - let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(pid, peer_id, prio); + let _ = storage.touch_file_holder( + pid, + peer_id, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = storage.update_follow_last_sync(author, now_ms); @@ -2810,13 +2822,9 @@ impl ConnectionManager { if store.get_post_with_visibility(post_id).ok().flatten().is_some() { Some(self.our_node_id) } else { - // CDN tree: do any of our downstream hosts have it? - let downstream = store.get_post_downstream(post_id).unwrap_or_default(); - if !downstream.is_empty() { - Some(downstream[0]) - } else { - None - } + // Any known holder of this post? 
+ let holders = store.get_file_holders(post_id).unwrap_or_default(); + holders.first().map(|(nid, _)| *nid) } }; post_holder = found; @@ -2830,9 +2838,9 @@ impl ConnectionManager { // Check CDN: do we know who has it via blob post ownership? let store = self.storage.get().await; if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) { - let downstream = store.get_post_downstream(&pid).unwrap_or_default(); - if !downstream.is_empty() { - blob_holder = Some(downstream[0]); + let holders = store.get_file_holders(&pid).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { + blob_holder = Some(*nid); } } } @@ -4871,7 +4879,7 @@ impl ConnectionManager { let cm = conn_mgr.lock().await; // Collect blob CIDs + CDN peers before async work - let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = Vec::new(); + let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec)>)> = Vec::new(); { let storage = cm.storage.get().await; for dr in &payload.records { @@ -4879,9 +4887,8 @@ impl ConnectionManager { // Collect blobs for CDN cleanup before deleting let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default(); for cid in blob_cids { - let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); - let upstream = storage.get_blob_upstream(&cid).ok().flatten(); - blob_cleanup.push((cid, downstream, upstream)); + let holders = storage.get_file_holders(&cid).unwrap_or_default(); + blob_cleanup.push((cid, holders)); } let _ = storage.store_delete(dr); let _ = storage.apply_delete(dr); @@ -4897,18 +4904,11 @@ impl ConnectionManager { // Gather connections for CDN delete notices under lock, then send outside let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new(); - for (cid, downstream, upstream) in &blob_cleanup { - let upstream_info = upstream.as_ref().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs.clone() }); - let ds_payload = 
crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: upstream_info }; - for (ds_nid, _) in downstream { - if let Some(pc) = cm.connections_ref().get(ds_nid) { - delete_notices.push((pc.connection.clone(), ds_payload.clone())); - } - } - if let Some((up_nid, _)) = upstream { - let up_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; - if let Some(pc) = cm.connections_ref().get(up_nid) { - delete_notices.push((pc.connection.clone(), up_payload)); + for (cid, holders) in &blob_cleanup { + let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; + for (peer, _addrs) in holders { + if let Some(pc) = cm.connections_ref().get(peer) { + delete_notices.push((pc.connection.clone(), payload.clone())); } } } @@ -4958,24 +4958,38 @@ impl ConnectionManager { } MessageType::PostPush => { let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?; - let cm = conn_mgr.lock().await; - let storage = cm.storage.get().await; - if !storage.is_deleted(&push.post.id)? - && storage.get_post(&push.post.id)?.is_none() - && crate::content::verify_post_id(&push.post.id, &push.post.post) - { - let _ = storage.store_post_with_visibility( - &push.post.id, - &push.post.post, - &push.post.visibility, - ); - let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); - info!( + // Encrypted posts are no longer accepted via direct push — they propagate + // via the CDN to eliminate the sender→recipient traffic signal. + if !matches!(push.post.visibility, crate::types::PostVisibility::Public) { + debug!( peer = hex::encode(remote_node_id), post_id = hex::encode(push.post.id), - "Received direct post push" + "Ignoring non-public PostPush" ); + } else { + let cm = conn_mgr.lock().await; + let storage = cm.storage.get().await; + if !storage.is_deleted(&push.post.id)? 
+ && storage.get_post(&push.post.id)?.is_none() + && crate::content::verify_post_id(&push.post.id, &push.post.post) + { + let _ = storage.store_post_with_visibility( + &push.post.id, + &push.post.post, + &push.post.visibility, + ); + let _ = storage.touch_file_holder( + &push.post.id, + &remote_node_id, + &[], + crate::storage::HolderDirection::Received, + ); + info!( + peer = hex::encode(remote_node_id), + post_id = hex::encode(push.post.id), + "Received direct post push" + ); + } } } MessageType::AudienceRequest => { @@ -5063,17 +5077,24 @@ impl ConnectionManager { &entry.manifest.author_manifest.author, entry.manifest.author_manifest.updated_at, ); + // Remote peer pushed us this manifest → they hold the file. + let _ = storage.touch_file_holder( + &entry.cid, + &remote_node_id, + &[], + crate::storage::HolderDirection::Received, + ); stored_entries.push(entry.clone()); } - // Gather downstream peers for relay before dropping locks + // Gather file holders for relay before dropping locks let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new(); for entry in &stored_entries { - let downstream = storage.get_blob_downstream(&entry.cid).unwrap_or_default(); - for (ds_nid, _) in downstream { - if ds_nid == remote_node_id { + let holders = storage.get_file_holders(&entry.cid).unwrap_or_default(); + for (peer, _addrs) in holders { + if peer == remote_node_id { continue; } - relay_targets.push((ds_nid, crate::protocol::ManifestPushPayload { + relay_targets.push((peer, crate::protocol::ManifestPushPayload { manifests: vec![entry.clone()], })); } @@ -5176,8 +5197,12 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { - let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio); + let _ = 
storage.touch_file_holder( + &sync_post.id, + &sender_id, + &[], + crate::storage::HolderDirection::Received, + ); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -5268,32 +5293,14 @@ impl ConnectionManager { let storage = cm.storage.get().await; let cid = payload.cid; - // Check if sender was our upstream for this blob - let was_upstream = storage.get_blob_upstream(&cid).ok().flatten() - .map(|(nid, _)| nid == remote_node_id) - .unwrap_or(false); - - if was_upstream { - // Sender was our upstream — clear it - let _ = storage.remove_blob_upstream(&cid); - - // If they provided their upstream, store it as our new upstream - if let Some(ref new_up) = payload.upstream_node { - if let Ok(nid_bytes) = hex::decode(&new_up.n) { - if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) { - let _ = storage.store_blob_upstream(&cid, &nid, &new_up.a); - } - } - } - } else { - // Sender was our downstream — remove them - let _ = storage.remove_blob_downstream(&cid, &remote_node_id); - } + // Flat-holder model: drop the sender as a holder of this file. + // The author's DeleteRecord (separate signed message) is what + // triggers the actual blob removal for followers. 
+ let _ = storage.remove_file_holder(&cid, &remote_node_id); info!( peer = hex::encode(remote_node_id), cid = hex::encode(cid), - was_upstream, "Received blob delete notice" ); } @@ -5437,7 +5444,12 @@ impl ConnectionManager { let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?; let cm = conn_mgr.lock().await; let storage = cm.storage.get().await; - let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id); + let _ = storage.touch_file_holder( + &payload.post_id, + &remote_node_id, + &[], + crate::storage::HolderDirection::Sent, + ); drop(storage); trace!( peer = hex::encode(remote_node_id), @@ -5692,15 +5704,28 @@ impl ConnectionManager { let storage = storage.get().await; let manifest: Option = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| { if let Ok(am) = serde_json::from_str::(&json) { - let ds_count = storage.get_blob_downstream_count(&payload.cid).unwrap_or(0); + let ds_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) } else { serde_json::from_str(&json).ok() } }); let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() { - let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false); - if ok { (true, vec![]) } else { - let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default(); - let redirects: Vec = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect(); + let prior_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); + let _ = storage.touch_file_holder( + &payload.cid, + &remote_node_id, + &payload.requester_addresses, + crate::storage::HolderDirection::Sent, + ); + // If we already had 5 holders before adding this one, the + // requester 
should consult them too for CDN lookups. + if prior_count < 5 { + (true, vec![]) + } else { + let holders = storage.get_file_holders(&payload.cid).unwrap_or_default(); + let redirects: Vec = holders.into_iter() + .filter(|(nid, _)| *nid != remote_node_id) + .map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }) + .collect(); (false, redirects) } } else { (false, vec![]) }; @@ -5727,7 +5752,7 @@ impl ConnectionManager { Some(json) => { let manifest = if let Ok(am) = serde_json::from_str::(&json) { if am.updated_at > payload.current_updated_at { - let ds_count = store.get_blob_downstream_count(&payload.cid).unwrap_or(0); + let ds_count = store.get_file_holder_count(&payload.cid).unwrap_or(0); Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) } else { None } } else { None }; @@ -6072,9 +6097,14 @@ impl ConnectionManager { to_pull.push(*pid); } - // Register as downstream for all accepted posts + // Register as holder for all accepted posts for pid in &acc { - let _ = storage.add_post_downstream(pid, &remote_node_id); + let _ = storage.touch_file_holder( + pid, + &remote_node_id, + &[], + crate::storage::HolderDirection::Sent, + ); } (acc, rej, to_pull) @@ -6125,8 +6155,12 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); - let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&sp.id, &sender, prio); + let _ = storage.touch_file_holder( + &sp.id, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); let blob_store = cm.blob_store.clone(); drop(storage); drop(cm); @@ -6153,7 +6187,12 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; let _ = storage.record_blob(&att.cid, post_id, 
&post_author, data.len() as u64, &att.mime_type, att.size_bytes); - let _ = storage.add_post_upstream(&att.cid, &sender, 0); + let _ = storage.touch_file_holder( + &att.cid, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); } Ok(()) }.await; @@ -6178,12 +6217,14 @@ impl ConnectionManager { Ok(()) } - /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream. + /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate + /// to the post's file_holders (flat set, up to 5 most recent). async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) { use crate::types::BlobHeaderDiffOp; - // Gather policy + audience data, then drop lock immediately - let (policy, approved_audience, downstream, upstreams) = { + // Gather policy + audience data + holders, then drop lock immediately. + // Remote peer clearly holds this post — record them as a holder. + let (policy, approved_audience, holders) = { let storage = self.storage.get().await; let policy = storage.get_comment_policy(&payload.post_id) .ok() @@ -6193,13 +6234,18 @@ impl ConnectionManager { crate::types::AudienceDirection::Inbound, Some(crate::types::AudienceStatus::Approved), ).unwrap_or_default(); - let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default(); - let upstreams: Vec = storage.get_post_upstreams(&payload.post_id) + let _ = storage.touch_file_holder( + &payload.post_id, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); + let holders: Vec = storage.get_file_holders(&payload.post_id) .unwrap_or_default() .into_iter() - .map(|(nid, _)| nid) + .map(|(nid, _addrs)| nid) .collect(); - (policy, approved, downstream, upstreams) + (policy, approved, holders) }; // Filter ops using gathered data (no lock held) @@ -6381,26 +6427,16 @@ impl ConnectionManager { let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms); } - // Collect all targets 
(downstream + all upstreams), then send in a single batched task + // Re-propagate to all file holders (flat set, max 5). Exclude sender. let mut targets: Vec = Vec::new(); - for peer_id in downstream { - if peer_id == sender { continue; } - if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone()) - .or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone())) + for peer_id in &holders { + if *peer_id == sender { continue; } + if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone()) + .or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone())) { targets.push(conn); } } - // Phase 6: Try all upstreams, not just one - for up in &upstreams { - if *up != sender { - if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone()) - .or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone())) - { - targets.push(conn); - } - } - } if !targets.is_empty() { let payload_clone = payload.clone(); tokio::spawn(async move { @@ -7684,8 +7720,8 @@ impl ConnectionActor { if s.get_post_with_visibility(post_id).ok().flatten().is_some() { post_holder = Some(ctx.our_node_id); } else { - let downstream = s.get_post_downstream(post_id).unwrap_or_default(); - if !downstream.is_empty() { post_holder = Some(downstream[0]); } + let holders = s.get_file_holders(post_id).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { post_holder = Some(*nid); } } } @@ -7695,8 +7731,8 @@ impl ConnectionActor { } else { let s = ctx.storage.get().await; if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) { - let downstream = s.get_post_downstream(&pid).unwrap_or_default(); - if !downstream.is_empty() { blob_holder = Some(downstream[0]); } + let holders = s.get_file_holders(&pid).unwrap_or_default(); + if let Some((nid, _)) = holders.first() { blob_holder = Some(*nid); } } } } diff --git a/crates/core/src/http.rs b/crates/core/src/http.rs index 42fbbe6..d79fcb1 100644 --- a/crates/core/src/http.rs +++ 
b/crates/core/src/http.rs @@ -378,7 +378,11 @@ async fn try_redirect( Ok(Some((_, PostVisibility::Public))) => {} _ => return false, // not found or not public — hard close } - store.get_post_downstream(post_id).unwrap_or_default() + store.get_file_holders(post_id) + .unwrap_or_default() + .into_iter() + .map(|(nid, _addrs)| nid) + .collect::>() }; // Get addresses for downstream peers diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 8874a2e..7b3a11a 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -902,50 +902,6 @@ impl Network { self.send_to_audience(MessageType::PostNotification, &payload).await } - /// Push a full post directly to recipients (persistent if available, ephemeral otherwise). - pub async fn push_post_to_recipients( - &self, - post_id: &crate::types::PostId, - post: &Post, - visibility: &PostVisibility, - ) -> usize { - let recipients: Vec = match visibility { - PostVisibility::Public => return 0, - PostVisibility::Encrypted { recipients } => { - recipients.iter().map(|wk| wk.recipient).collect() - } - PostVisibility::GroupEncrypted { group_id, .. } => { - // Push to all group members - match self.storage.get().await.get_all_group_members() { - Ok(map) => map.get(group_id).cloned().unwrap_or_default().into_iter().collect(), - Err(_) => return 0, - } - } - }; - - let payload = PostPushPayload { - post: SyncPost { - id: *post_id, - post: post.clone(), - visibility: visibility.clone(), - }, - }; - - let mut pushed = 0; - for recipient in &recipients { - if self.send_to_peer_uni(recipient, MessageType::PostPush, &payload).await.is_ok() { - pushed += 1; - debug!( - recipient = hex::encode(recipient), - post_id = hex::encode(post_id), - "Pushed post to recipient" - ); - } - } - - pushed - } - /// Push a profile update to all audience members (ephemeral-capable). 
pub async fn push_profile(&self, profile: &PublicProfile) -> usize { // Sanitize: if public_visible=false, strip display_name/bio from pushed profile @@ -1059,15 +1015,16 @@ impl Network { sent } - /// Push updated manifests to all downstream peers for a given CID. + /// Push an updated manifest to all known holders of the file (flat set, + /// up to 5 most-recent). Replaces the legacy downstream-tree push. pub async fn push_manifest_to_downstream( &self, cid: &[u8; 32], manifest: &crate::types::CdnManifest, ) -> usize { - let downstream = { + let holders = { let storage = self.storage.get().await; - storage.get_blob_downstream(cid).unwrap_or_default() + storage.get_file_holders(cid).unwrap_or_default() }; let payload = crate::protocol::ManifestPushPayload { manifests: vec![crate::protocol::ManifestPushEntry { @@ -1076,54 +1033,40 @@ impl Network { }], }; let mut sent = 0; - for (ds_nid, _) in &downstream { - if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() { + for (peer, peer_addrs) in &holders { + if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() { sent += 1; + let storage = self.storage.get().await; + let _ = storage.touch_file_holder( + cid, + peer, + peer_addrs, + crate::storage::HolderDirection::Sent, + ); } } sent } - /// Send blob delete notices to downstream and upstream peers. - /// Downstream peers receive our upstream info for tree healing. - /// Upstream peers receive no upstream info (just "remove me as downstream"). + /// Send blob delete notices to all known holders of a file. + /// Second argument kept as Option for signature stability; flat-holder + /// model doesn't need separate upstream handling. 
pub async fn send_blob_delete_notices( &self, cid: &[u8; 32], - downstream: &[(NodeId, Vec)], - upstream: Option<&(NodeId, Vec)>, + holders: &[(NodeId, Vec)], + _legacy_upstream: Option<&(NodeId, Vec)>, ) -> usize { - let upstream_info = upstream.map(|(nid, addrs)| { - crate::types::PeerWithAddress { - n: hex::encode(nid), - a: addrs.clone(), - } - }); - - let mut sent = 0; - - // Notify downstream (with upstream info for tree healing) - let ds_payload = crate::protocol::BlobDeleteNoticePayload { + let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, - upstream_node: upstream_info, + upstream_node: None, }; - for (ds_nid, _) in downstream { - if self.send_to_peer_uni(ds_nid, MessageType::BlobDeleteNotice, &ds_payload).await.is_ok() { + let mut sent = 0; + for (peer, _addrs) in holders { + if self.send_to_peer_uni(peer, MessageType::BlobDeleteNotice, &payload).await.is_ok() { sent += 1; } } - - // Notify upstream (no upstream info) - if let Some((up_nid, _)) = upstream { - let up_payload = crate::protocol::BlobDeleteNoticePayload { - cid: *cid, - upstream_node: None, - }; - if self.send_to_peer_uni(up_nid, MessageType::BlobDeleteNotice, &up_payload).await.is_ok() { - sent += 1; - } - } - sent } @@ -2356,24 +2299,24 @@ impl Network { self.endpoint.close().await; } - /// Propagate an engagement diff to all downstream holders of a post (CDN tree). - /// Excludes the sender to avoid loops. + /// Propagate an engagement diff to all known holders of a post (flat set, + /// up to 5 most-recent). Excludes the sender to avoid loops. 
pub async fn propagate_engagement_diff( &self, post_id: &crate::types::PostId, payload: &crate::protocol::BlobHeaderDiffPayload, exclude_peer: &crate::types::NodeId, ) -> usize { - let downstream = { + let holders = { let storage = self.storage.get().await; - storage.get_post_downstream(post_id).unwrap_or_default() + storage.get_file_holders(post_id).unwrap_or_default() }; let mut sent = 0; - for ds_nid in &downstream { - if ds_nid == exclude_peer { + for (peer, _addrs) in &holders { + if peer == exclude_peer { continue; } - if self.send_to_peer_uni(ds_nid, MessageType::BlobHeaderDiff, payload).await.is_ok() { + if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() { sent += 1; } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 067c632..85c7606 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -836,13 +836,12 @@ impl Node { } } - // For encrypted posts, push directly to recipients - let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await; - - // For public posts, push to audience members + // For public posts, push to audience members. Encrypted posts propagate + // via the CDN (ManifestPush + header-diff) to eliminate the sender→recipient + // traffic signal. 
let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await; - info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post"); + info!(post_id = hex::encode(post_id), audience_pushed, "Created new post"); Ok((post_id, post, visibility)) } @@ -1351,7 +1350,12 @@ impl Node { let source_addrs: Vec = response.manifest.as_ref() .map(|m| m.host_addresses.clone()) .unwrap_or_default(); - let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs); + let _ = storage.touch_file_holder( + cid, + from_peer, + &source_addrs, + crate::storage::HolderDirection::Received, + ); } Ok(data) } @@ -1379,16 +1383,17 @@ impl Node { // Collect redirect peers from responses in case we need them later let mut redirect_peers: Vec = Vec::new(); - // 2. Try existing upstream (if we previously fetched this blob) - let upstream = { + // 2. Try known holders (up to 5 most-recent peers we've interacted + // with about this file). + let known_holders = { let storage = self.storage.get().await; - storage.get_blob_upstream(cid)? 
+ storage.get_file_holders(cid).unwrap_or_default() }; - if let Some((upstream_nid, _upstream_addrs)) = upstream { - match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await { + for (holder_nid, _addrs) in &known_holders { + match self.fetch_blob_from_peer(cid, holder_nid, post_id, author, mime_type, created_at).await { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => {} - Err(e) => warn!(error = %e, "blob fetch from upstream failed"), + Err(e) => warn!(error = %e, "blob fetch from known holder failed"), } } @@ -1413,7 +1418,12 @@ impl Node { let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at); } } - let _ = storage.store_blob_upstream(cid, &lateral, &[]); + let _ = storage.touch_file_holder( + cid, + &lateral, + &[], + crate::storage::HolderDirection::Received, + ); return Ok(Some(data)); } Ok((None, response)) => { @@ -1981,14 +1991,13 @@ impl Node { signature, }; - // Collect blob CIDs + CDN peers before cleanup + // Collect blob CIDs + known holders before cleanup (for delete notices) let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = { let storage = self.storage.get().await; let cids = storage.get_blobs_for_post(post_id).unwrap_or_default(); cids.into_iter().map(|cid| { - let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); - let upstream = storage.get_blob_upstream(&cid).ok().flatten(); - (cid, downstream, upstream) + let holders = storage.get_file_holders(&cid).unwrap_or_default(); + (cid, holders, None::<(NodeId, Vec)>) }).collect() }; @@ -2108,12 +2117,10 @@ impl Node { storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?; } - // delete_post already pushes the DeleteRecord + // delete_post already pushes the DeleteRecord. + // Replacement post propagates via the CDN to remaining recipients. 
self.delete_post(post_id).await?; - // Push replacement post directly to remaining recipients - self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await; - info!( old_id = hex::encode(post_id), new_id = hex::encode(new_post_id), @@ -3086,20 +3093,27 @@ impl Node { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64 - max_age_ms; - let stale = { + let stale_cids = { let s = storage.get().await; - s.get_stale_manifests(cutoff).unwrap_or_default() + s.get_stale_manifest_cids(cutoff).unwrap_or_default() }; - for (cid, upstream_nid, _upstream_addrs) in &stale { - // Get current updated_at for this manifest - let current_updated_at = { + for cid in &stale_cids { + // Get current updated_at + pick a holder to refresh from + let (current_updated_at, refresh_source) = { let s = storage.get().await; - s.get_cdn_manifest(cid).ok().flatten() + let updated_at = s.get_cdn_manifest(cid).ok().flatten() .and_then(|json| serde_json::from_str::(&json).ok()) .map(|m| m.updated_at) - .unwrap_or(0) + .unwrap_or(0); + let source = s.get_file_holders(cid) + .unwrap_or_default() + .into_iter() + .next() + .map(|(nid, _)| nid); + (updated_at, source) }; - match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await { + let Some(upstream_nid) = refresh_source else { continue; }; + match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await { Ok(Some(cdn_manifest)) => { if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) { let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default(); @@ -3110,10 +3124,10 @@ impl Node { &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at, ); - // Relay to our downstream - let downstream = s.get_blob_downstream(cid).unwrap_or_default(); + // Relay to known holders (flat set) + let holders = s.get_file_holders(cid).unwrap_or_default(); drop(s); - if !downstream.is_empty() { + if 
!holders.is_empty() { network.push_manifest_to_downstream(cid, &cdn_manifest).await; } tracing::debug!( @@ -3126,7 +3140,7 @@ impl Node { Err(e) => { tracing::debug!( cid = hex::encode(cid), - upstream = hex::encode(upstream_nid), + upstream = hex::encode(&upstream_nid), error = %e, "Manifest refresh from upstream failed" ); @@ -3277,18 +3291,16 @@ impl Node { compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms) } - /// Delete a blob with CDN notifications to upstream/downstream. + /// Delete a blob with CDN notifications to known holders. pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> { - // Gather CDN peers before cleanup - let (downstream, upstream) = { + // Gather known holders before cleanup + let holders = { let storage = self.storage.get().await; - let ds = storage.get_blob_downstream(cid).unwrap_or_default(); - let up = storage.get_blob_upstream(cid).ok().flatten(); - (ds, up) + storage.get_file_holders(cid).unwrap_or_default() }; - // Send CDN delete notices - self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await; + // Send CDN delete notices to all holders + self.network.send_blob_delete_notices(cid, &holders, None).await; // Clean up local storage { @@ -3576,15 +3588,9 @@ impl Node { ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())], timestamp_ms: now, }; + // propagate_engagement_diff targets all file_holders (flat set, max 5) + // which already subsumes what used to be upstream + downstream. 
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Also send to all upstreams (toward author) — Phase 6 multi-upstream - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(reaction) @@ -3691,14 +3697,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Also send to all upstreams (toward author) — Phase 6 multi-upstream - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(comment) @@ -3735,14 +3733,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(()) } @@ -3776,14 +3766,6 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } } Ok(()) } @@ -4005,14 +3987,6 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = 
self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } Ok(()) } @@ -4127,14 +4101,6 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; - // Phase 6: send to all upstreams - let upstreams = { - let storage = self.storage.get().await; - storage.get_post_upstreams(&post_id).unwrap_or_default() - }; - for (up, _prio) in upstreams { - let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; - } Ok(()) } @@ -4367,10 +4333,10 @@ impl Node { } }; - // Filter to under-replicated (< 2 downstream) + // Filter to under-replicated (< 2 holders) let mut needs_replication = Vec::new(); for pid in &recent_ids { - match storage.get_post_downstream_count(pid) { + match storage.get_file_holder_count(pid) { Ok(count) if count < 2 => { needs_replication.push(*pid); } diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 5f37559..31434bb 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -12,6 +12,26 @@ use crate::types::{ VisibilityIntent, }; +/// Direction for file_holders entries: whether we sent the file to this peer, +/// received it from them, or both. Not load-bearing for propagation decisions — +/// any holder can serve as a diff target — but retained for potential reuse. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HolderDirection { + Sent, + Received, + Both, +} + +impl HolderDirection { + pub fn as_str(&self) -> &'static str { + match self { + HolderDirection::Sent => "sent", + HolderDirection::Received => "received", + HolderDirection::Both => "both", + } + } +} + /// Blob metadata for eviction scoring. 
pub struct EvictionCandidate { pub cid: [u8; 32], @@ -262,20 +282,6 @@ impl Storage { updated_at INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author); - CREATE TABLE IF NOT EXISTS blob_upstream ( - cid BLOB PRIMARY KEY, - source_node_id BLOB NOT NULL, - source_addresses TEXT NOT NULL DEFAULT '[]', - stored_at INTEGER NOT NULL - ); - CREATE TABLE IF NOT EXISTS blob_downstream ( - cid BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - peer_addresses TEXT NOT NULL DEFAULT '[]', - registered_at INTEGER NOT NULL, - PRIMARY KEY (cid, peer_node_id) - ); - CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid); CREATE TABLE IF NOT EXISTS group_keys ( group_id BLOB PRIMARY KEY, circle_name TEXT NOT NULL, @@ -326,17 +332,6 @@ impl Storage { last_seen_ms INTEGER NOT NULL, success_count INTEGER NOT NULL DEFAULT 0 ); - CREATE TABLE IF NOT EXISTS post_downstream ( - post_id BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - registered_at INTEGER NOT NULL, - PRIMARY KEY (post_id, peer_node_id) - ); - CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id); - CREATE TABLE IF NOT EXISTS post_upstream ( - post_id BLOB PRIMARY KEY, - peer_node_id BLOB NOT NULL - ); CREATE TABLE IF NOT EXISTS blob_headers ( post_id BLOB PRIMARY KEY, author BLOB NOT NULL, @@ -389,7 +384,17 @@ impl Storage { CREATE TABLE IF NOT EXISTS seen_messages ( partner_id BLOB PRIMARY KEY, last_read_ms INTEGER NOT NULL DEFAULT 0 - );", + ); + CREATE TABLE IF NOT EXISTS file_holders ( + file_id BLOB NOT NULL, + peer_id BLOB NOT NULL, + peer_addresses TEXT NOT NULL DEFAULT '[]', + last_interaction_ms INTEGER NOT NULL, + direction TEXT NOT NULL, + PRIMARY KEY (file_id, peer_id) + ); + CREATE INDEX IF NOT EXISTS idx_file_holders_recency + ON file_holders(file_id, last_interaction_ms DESC);", )?; Ok(()) } @@ -543,16 +548,6 @@ impl Storage { )?; } - // Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration) - let 
has_blob_pref_tree = self.conn.prepare( - "SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'" - )?.query_row([], |row| row.get::<_, i64>(0))?; - if has_blob_pref_tree == 0 { - self.conn.execute_batch( - "ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';" - )?; - } - // Add public_visible column to profiles if missing (Phase D-4 migration) let has_public_visible = self.conn.prepare( "SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'" @@ -666,25 +661,18 @@ impl Storage { )?; } - // Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max) - let has_priority = self.conn.prepare( - "SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'" - )?.query_row([], |row| row.get::<_, i64>(0))?; - if has_priority == 0 { - self.conn.execute_batch( - "ALTER TABLE post_upstream RENAME TO post_upstream_old; - CREATE TABLE post_upstream ( - post_id BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - priority INTEGER NOT NULL DEFAULT 0, - registered_at INTEGER NOT NULL DEFAULT 0, - PRIMARY KEY (post_id, peer_node_id) - ); - INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at) - SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old; - DROP TABLE post_upstream_old;" - )?; - } + // 0.6.1-beta: seed file_holders from legacy upstream/downstream tables + // before they're dropped. Idempotent — only fires on an empty + // file_holders table. + self.seed_file_holders_from_legacy()?; + + // 0.6.1-beta: drop legacy directional tables — replaced by file_holders. 
+ self.conn.execute_batch( + "DROP TABLE IF EXISTS blob_upstream; + DROP TABLE IF EXISTS blob_downstream; + DROP TABLE IF EXISTS post_upstream; + DROP TABLE IF EXISTS post_downstream;", + )?; Ok(()) } @@ -2396,8 +2384,7 @@ impl Storage { params![record.post_id.as_slice(), record.author.as_slice()], )?; if deleted > 0 { - self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?; - self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?; + self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?; self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?; } Ok(deleted > 0) @@ -3396,28 +3383,6 @@ impl Storage { Ok(()) } - /// Update the preferred_tree JSON for a blob upstream entry. - pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> { - let json = serde_json::to_string( - &tree.iter().map(hex::encode).collect::>() - )?; - self.conn.execute( - "UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2", - params![json, cid.as_slice()], - )?; - Ok(()) - } - - /// Get the preferred_tree for a blob upstream entry. - pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result> { - let json: String = self.conn.query_row( - "SELECT preferred_tree FROM blob_upstream WHERE cid = ?1", - params![cid.as_slice()], - |row| row.get(0), - ).unwrap_or_else(|_| "[]".to_string()); - Ok(parse_anchors_json(&json)) - } - // ---- Social Routes ---- /// Insert or update a social route entry. 
@@ -3889,10 +3854,10 @@ impl Storage { GROUP BY post_id ) r ON b.post_id = r.post_id LEFT JOIN ( - SELECT cid, COUNT(*) as ds_count - FROM blob_downstream - GROUP BY cid - ) d ON b.cid = d.cid" + SELECT file_id, COUNT(*) as ds_count + FROM file_holders + GROUP BY file_id + ) d ON b.cid = d.file_id" )?; let rows = stmt.query_map(params![cutoff], |row| { let cid_bytes: Vec = row.get(0)?; @@ -3946,11 +3911,10 @@ impl Storage { Ok(count as u64) } - /// Clean up all CDN metadata for a blob (manifests + upstream + downstream). + /// Clean up all CDN metadata for a blob (manifests + file_holders). pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> { self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?; - self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?; - self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?; + self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?; Ok(()) } @@ -3969,12 +3933,6 @@ impl Storage { Ok(cids) } - /// Remove upstream tracking for a blob CID. - pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> { - self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?; - Ok(()) - } - pub fn post_count(&self) -> anyhow::Result { let count: i64 = self .conn @@ -4037,137 +3995,24 @@ impl Storage { Ok(result) } - /// Record the upstream source for a blob CID. 
- pub fn store_blob_upstream( - &self, - cid: &[u8; 32], - source_node_id: &NodeId, - source_addresses: &[String], - ) -> anyhow::Result<()> { - let addrs_json = serde_json::to_string(source_addresses)?; - self.conn.execute( - "INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4) - ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4", - params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()], - )?; - Ok(()) - } - - /// Get the upstream source for a blob CID: (node_id, addresses). - pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result)>> { - let result = self.conn.query_row( - "SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1", - params![cid.as_slice()], - |row| { - let nid_bytes: Vec = row.get(0)?; - let addrs_json: String = row.get(1)?; - Ok((nid_bytes, addrs_json)) - }, - ); - match result { - Ok((nid_bytes, addrs_json)) => { - let nid = blob_to_nodeid(nid_bytes)?; - let addrs: Vec = serde_json::from_str(&addrs_json).unwrap_or_default(); - Ok(Some((nid, addrs))) - } - Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), - Err(e) => Err(e.into()), - } - } - - /// Register a downstream peer for a blob CID. Returns false if already at 100 downstream. - pub fn add_blob_downstream( - &self, - cid: &[u8; 32], - peer_node_id: &NodeId, - peer_addresses: &[String], - ) -> anyhow::Result { - let count = self.get_blob_downstream_count(cid)?; - if count >= 100 { - return Ok(false); - } - let addrs_json = serde_json::to_string(peer_addresses)?; - self.conn.execute( - "INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4) - ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4", - params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()], - )?; - Ok(true) - } - - /// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>. 
- pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result)>> { + /// Get CIDs of manifests older than a cutoff. Callers look up holders + /// via file_holders to pick a refresh source. + pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result> { let mut stmt = self.conn.prepare( - "SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1" - )?; - let rows = stmt.query_map(params![cid.as_slice()], |row| { - let nid_bytes: Vec = row.get(0)?; - let addrs_json: String = row.get(1)?; - Ok((nid_bytes, addrs_json)) - })?; - let mut result = Vec::new(); - for row in rows { - let (nid_bytes, addrs_json) = row?; - let nid = blob_to_nodeid(nid_bytes)?; - let addrs: Vec = serde_json::from_str(&addrs_json).unwrap_or_default(); - result.push((nid, addrs)); - } - Ok(result) - } - - /// Count downstream peers for a blob CID. - pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result { - let count: i64 = self.conn.query_row( - "SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1", - params![cid.as_slice()], - |row| row.get(0), - )?; - Ok(count as u32) - } - - /// Remove a downstream peer for a blob CID. - pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2", - params![cid.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>. 
- pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result<Vec<([u8; 32], NodeId, Vec<String>)>> { - let mut stmt = self.conn.prepare( - "SELECT m.cid, u.source_node_id, u.source_addresses - FROM cdn_manifests m - LEFT JOIN blob_upstream u ON m.cid = u.cid - WHERE m.updated_at < ?1" + "SELECT cid FROM cdn_manifests WHERE updated_at < ?1", )?; let rows = stmt.query_map(params![older_than_ms as i64], |row| { let cid_bytes: Vec<u8> = row.get(0)?; - let nid_bytes: Option<Vec<u8>> = row.get(1)?; - let addrs_json: Option<String> = row.get(2)?; - Ok((cid_bytes, nid_bytes, addrs_json)) + Ok(cid_bytes) })?; - let mut result = Vec::new(); + let mut out = Vec::new(); for row in rows { - let (cid_bytes, nid_bytes, addrs_json) = row?; - let cid: [u8; 32] = match cid_bytes.try_into() { - Ok(c) => c, - Err(_) => continue, - }; - let nid = match nid_bytes { - Some(b) => match blob_to_nodeid(b) { - Ok(n) => n, - Err(_) => continue, - }, - None => continue, - }; - let addrs: Vec<String> = addrs_json - .map(|j| serde_json::from_str(&j).unwrap_or_default()) - .unwrap_or_default(); - result.push((cid, nid, addrs)); + let cid_bytes = row?; + if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) { + out.push(cid); + } } - Ok(result) + Ok(out) } /// Get the 10 posts before and 10 posts after a reference timestamp for an author. @@ -4271,128 +4116,148 @@ impl Storage { Ok(result) } - // --- Engagement: post_downstream --- + // --- File holders (flat, per-file, LRU-capped at 5) --- + // + // A single table for PostId-keyed engagement propagation and CID-keyed + // manifest/blob propagation. Any 32-byte content-addressed file_id fits. - /// Register a peer as downstream for a post (max 100 per post). - /// Returns true if added, false if at capacity. 
- pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result { - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; - if count >= 100 { - return Ok(false); - } - self.conn.execute( - "INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3) - ON CONFLICT DO NOTHING", - params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()], - )?; - Ok(true) - } - - /// Get all downstream peers for a post. - pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result> { - let mut stmt = self.conn.prepare( - "SELECT peer_node_id FROM post_downstream WHERE post_id = ?1" - )?; - let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec>(0))?; - let mut result = Vec::new(); - for row in rows { - if let Ok(nid) = blob_to_nodeid(row?) { - result.push(nid); - } - } - Ok(result) - } - - /// Remove a downstream peer for a post. - pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - // --- Engagement: post_upstream (multi-upstream, 3 max) --- - - /// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post. - pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> { - // Check current count - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; - if count >= 3 { - return Ok(()); // Already at cap - } + /// Upsert a holder for a file. Bumps last_interaction_ms to now and + /// enforces an LRU cap of 5 holders per file. 
+ pub fn touch_file_holder( + &self, + file_id: &[u8; 32], + peer_id: &NodeId, + peer_addresses: &[String], + direction: HolderDirection, + ) -> anyhow::Result<()> { + let addrs_json = serde_json::to_string(peer_addresses)?; let now = now_ms(); + let new_dir = direction.as_str(); + // Upsert. If the row exists with a different direction, promote to "both". self.conn.execute( - "INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at) - VALUES (?1, ?2, ?3, ?4)", - params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now], + "INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + VALUES (?1, ?2, ?3, ?4, ?5) + ON CONFLICT(file_id, peer_id) DO UPDATE SET + peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END, + last_interaction_ms = ?4, + direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END", + params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir], + )?; + // Enforce LRU cap of 5. Oldest get dropped. + self.conn.execute( + "DELETE FROM file_holders + WHERE file_id = ?1 + AND peer_id NOT IN ( + SELECT peer_id FROM file_holders + WHERE file_id = ?1 + ORDER BY last_interaction_ms DESC + LIMIT 5 + )", + params![file_id.as_slice()], )?; Ok(()) } - /// Get all upstream peers for a post, ordered by priority ASC (0 = primary). 
- pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result> { - let mut stmt = self.conn.prepare( - "SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC" - )?; - let rows = stmt.query_map(params![post_id.as_slice()], |row| { - let bytes: Vec = row.get(0)?; - let prio: i64 = row.get(1)?; - Ok((bytes, prio as u8)) - })?; - let mut result = Vec::new(); - for row in rows { - let (bytes, prio) = row?; - if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) { - result.push((nid, prio)); - } - } - Ok(result) - } - - /// Get the primary (lowest priority) upstream peer for a post. - /// Backward-compatible wrapper for code that only needs a single upstream. - pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result> { - let upstreams = self.get_post_upstreams(post_id)?; - Ok(upstreams.into_iter().next().map(|(nid, _)| nid)) - } - - /// Remove a specific upstream peer for a post. - pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Promote an upstream peer to primary (priority 0), pushing others up. - pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - // Shift all priorities up by 1 - self.conn.execute( - "UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1", - params![post_id.as_slice()], - )?; - // Set the promoted peer to priority 0 - self.conn.execute( - "UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Count downstream peers for a post. - pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result { + /// Count file holders (bounded at 5 by touch_file_holder's LRU cap). 
+ pub fn get_file_holder_count(&self, file_id: &[u8; 32]) -> anyhow::Result<u32> { let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; + "SELECT COUNT(*) FROM file_holders WHERE file_id = ?1", + )?.query_row(params![file_id.as_slice()], |row| row.get(0))?; Ok(count as u32) } + /// Return the up-to-5 most recently interacted holders of a file. + pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> { + let mut stmt = self.conn.prepare( + "SELECT peer_id, peer_addresses FROM file_holders + WHERE file_id = ?1 + ORDER BY last_interaction_ms DESC + LIMIT 5", + )?; + let rows = stmt.query_map(params![file_id.as_slice()], |row| { + let peer_bytes: Vec<u8> = row.get(0)?; + let addrs_json: String = row.get(1)?; + Ok((peer_bytes, addrs_json)) + })?; + let mut out = Vec::new(); + for row in rows { + let (peer_bytes, addrs_json) = row?; + if peer_bytes.len() != 32 { continue; } + let mut peer = [0u8; 32]; + peer.copy_from_slice(&peer_bytes); + let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default(); + out.push((NodeId::from(peer), addrs)); + } + Ok(out) + } + + /// Remove all holders for a file (e.g. on post/blob deletion). + pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM file_holders WHERE file_id = ?1", + params![file_id.as_slice()], + )?; + Ok(()) + } + + /// Remove a single peer's holder entry for a file. + pub fn remove_file_holder(&self, file_id: &[u8; 32], peer_id: &NodeId) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", + params![file_id.as_slice(), peer_id.as_slice()], + )?; + Ok(()) + } + + /// One-time migration: seed file_holders from the legacy upstream/downstream + /// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder + /// sets. 
Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the + /// PRIMARY KEY. Skips tables that don't exist on fresh installs. + pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> { + // Skip if file_holders already populated (idempotent re-run protection). + let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")? + .query_row([], |row| row.get(0))?; + if existing > 0 { + return Ok(()); + } + let now = now_ms() as i64; + let table_exists = |name: &str| -> anyhow::Result<bool> { + let count: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", + )?.query_row(params![name], |row| row.get(0))?; + Ok(count > 0) + }; + if table_exists("post_upstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream", + params![now], + )?; + } + if table_exists("post_downstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream", + params![now], + )?; + } + if table_exists("blob_upstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream", + params![now], + )?; + } + if table_exists("blob_downstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream", + params![now], + )?; + } + Ok(()) + } + // --- Engagement: reactions --- /// Store a reaction (upsert by reactor+post_id+emoji). 
@@ -5301,60 +5166,6 @@ mod tests { assert_eq!(manifests[0].0, cid); } - #[test] - fn blob_upstream_crud() { - let s = temp_storage(); - let cid = [42u8; 32]; - let source = make_node_id(1); - let addrs = vec!["10.0.0.1:4433".to_string()]; - - s.store_blob_upstream(&cid, &source, &addrs).unwrap(); - let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap(); - assert_eq!(nid, source); - assert_eq!(got_addrs, addrs); - - // Missing - assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none()); - - // Update - let source2 = make_node_id(2); - s.store_blob_upstream(&cid, &source2, &[]).unwrap(); - let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap(); - assert_eq!(nid, source2); - } - - #[test] - fn blob_downstream_crud_and_limit() { - let s = temp_storage(); - let cid = [42u8; 32]; - - // Add downstream peers - for i in 0..100u8 { - let peer = make_node_id(i); - let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap(); - assert!(ok, "should accept peer {}", i); - } - - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100); - - // 101st should be rejected - let peer_101 = make_node_id(200); - let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap(); - assert!(!ok, "should reject 101st downstream"); - - // Get all downstream - let downstream = s.get_blob_downstream(&cid).unwrap(); - assert_eq!(downstream.len(), 100); - - // Remove one - s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap(); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99); - - // Now adding one more should work - let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap(); - assert!(ok, "should accept after removal"); - } - #[test] fn blob_pin_unpin() { let s = temp_storage(); @@ -5438,18 +5249,15 @@ mod tests { let peer = make_node_id(2); s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap(); - s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap(); - s.add_blob_downstream(&cid, &peer, 
&["10.0.0.2:4433".to_string()]).unwrap(); + s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap(); assert!(s.get_cdn_manifest(&cid).unwrap().is_some()); - assert!(s.get_blob_upstream(&cid).unwrap().is_some()); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1); + assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1); s.cleanup_cdn_for_blob(&cid).unwrap(); assert!(s.get_cdn_manifest(&cid).unwrap().is_none()); - assert!(s.get_blob_upstream(&cid).unwrap().is_none()); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0); + assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0); } #[test] @@ -5469,18 +5277,6 @@ mod tests { assert!(cids.contains(&cid2)); } - #[test] - fn remove_blob_upstream() { - let s = temp_storage(); - let cid = [42u8; 32]; - let peer = make_node_id(1); - - s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap(); - assert!(s.get_blob_upstream(&cid).unwrap().is_some()); - - s.remove_blob_upstream(&cid).unwrap(); - assert!(s.get_blob_upstream(&cid).unwrap().is_none()); - } #[test] fn author_post_neighborhood() { @@ -5840,24 +5636,6 @@ mod tests { assert_eq!(got2.preferred_tree.len(), 2); } - #[test] - fn blob_upstream_preferred_tree() { - let s = temp_storage(); - let cid = [42u8; 32]; - let source = make_node_id(1); - s.store_blob_upstream(&cid, &source, &[]).unwrap(); - - // Initially empty - let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap(); - assert!(tree.is_empty()); - - // Update - let nodes = vec![make_node_id(10), make_node_id(11)]; - s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap(); - let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap(); - assert_eq!(tree2.len(), 2); - } - // ---- Circle Profile tests ---- #[test] @@ -6058,32 +5836,39 @@ mod tests { // --- Engagement tests --- #[test] - fn post_downstream_crud() { + fn file_holders_lru_cap() { let s = temp_storage(); - let post_id = make_post_id(1); - let peer1 = make_node_id(1); - 
let peer2 = make_node_id(2); - - // Add downstream peers - assert!(s.add_post_downstream(&post_id, &peer1).unwrap()); - assert!(s.add_post_downstream(&post_id, &peer2).unwrap()); - - let downstream = s.get_post_downstream(&post_id).unwrap(); - assert_eq!(downstream.len(), 2); - assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2); - - // Remove one - s.remove_post_downstream(&post_id, &peer1).unwrap(); - assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1); - - // Capacity limit - let big_post = make_post_id(99); - for i in 0..100u8 { - assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap()); + let file = [42u8; 32]; + // Sleep between inserts so last_interaction_ms actually differs (ms resolution). + for i in 0..7u8 { + s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(2)); } - assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100); - // 101st should fail - assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap()); + // Only 5 most-recent survive + assert_eq!(s.get_file_holder_count(&file).unwrap(), 5); + let holders = s.get_file_holders(&file).unwrap(); + assert_eq!(holders.len(), 5); + let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect(); + // Oldest two (i=0, i=1) got evicted; most recent (i=6) survives + assert!(!kept.contains(&make_node_id(0))); + assert!(!kept.contains(&make_node_id(1))); + assert!(kept.contains(&make_node_id(6))); + } + + #[test] + fn file_holders_direction_promotion() { + let s = temp_storage(); + let file = [42u8; 32]; + let peer = make_node_id(1); + s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap(); + s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap(); + // Re-insert with opposite direction should promote to "both" + let dir: String = s.conn.query_row( + "SELECT direction FROM file_holders WHERE file_id = ?1 AND 
peer_id = ?2", + rusqlite::params![file.as_slice(), peer.as_slice()], + |row| row.get(0), + ).unwrap(); + assert_eq!(dir, "both"); } #[test] diff --git a/crates/core/src/web.rs b/crates/core/src/web.rs index b9fa902..c946abb 100644 --- a/crates/core/src/web.rs +++ b/crates/core/src/web.rs @@ -132,8 +132,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc, browse if let Some(author) = author_id { holders.push(author); } - if let Ok(downstream) = store.get_post_downstream(&post_id) { - for peer in downstream { + if let Ok(file_holders) = store.get_file_holders(&post_id) { + for (peer, _addrs) in file_holders { if !holders.contains(&peer) { holders.push(peer); }