diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 7d97a7a..bfb0090 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1393,12 +1393,8 @@ impl ConnectionManager { { let s = storage.get().await; for pid in &new_post_ids { - let _ = s.touch_file_holder( - pid, - peer_id, - &[], - crate::storage::HolderDirection::Received, - ); + let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); + let _ = s.add_post_upstream(pid, peer_id, prio); } for author in &synced_authors { let _ = s.update_follow_last_sync(author, now_ms); @@ -1944,12 +1940,8 @@ impl ConnectionManager { { let storage = self.storage.get().await; for pid in &new_post_ids { - let _ = storage.touch_file_holder( - pid, - from, - &[], - crate::storage::HolderDirection::Received, - ); + let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(pid, from, prio); } for author in &synced_authors { let _ = storage.update_follow_last_sync(author, now_ms); @@ -2042,12 +2034,8 @@ impl ConnectionManager { { let storage = self.storage.get().await; for pid in &new_post_ids { - let _ = storage.touch_file_holder( - pid, - peer_id, - &[], - crate::storage::HolderDirection::Received, - ); + let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(pid, peer_id, prio); } for author in &synced_authors { let _ = storage.update_follow_last_sync(author, now_ms); @@ -2822,9 +2810,13 @@ impl ConnectionManager { if store.get_post_with_visibility(post_id).ok().flatten().is_some() { Some(self.our_node_id) } else { - // Any known holder of this post? - let holders = store.get_file_holders(post_id).unwrap_or_default(); - holders.first().map(|(nid, _)| *nid) + // CDN tree: do any of our downstream hosts have it? 
+ let downstream = store.get_post_downstream(post_id).unwrap_or_default(); + if !downstream.is_empty() { + Some(downstream[0]) + } else { + None + } } }; post_holder = found; @@ -2838,9 +2830,9 @@ impl ConnectionManager { // Check CDN: do we know who has it via blob post ownership? let store = self.storage.get().await; if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) { - let holders = store.get_file_holders(&pid).unwrap_or_default(); - if let Some((nid, _)) = holders.first() { - blob_holder = Some(*nid); + let downstream = store.get_post_downstream(&pid).unwrap_or_default(); + if !downstream.is_empty() { + blob_holder = Some(downstream[0]); } } } @@ -4879,7 +4871,7 @@ impl ConnectionManager { let cm = conn_mgr.lock().await; // Collect blob CIDs + CDN peers before async work - let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec)>)> = Vec::new(); + let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = Vec::new(); { let storage = cm.storage.get().await; for dr in &payload.records { @@ -4887,8 +4879,9 @@ impl ConnectionManager { // Collect blobs for CDN cleanup before deleting let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default(); for cid in blob_cids { - let holders = storage.get_file_holders(&cid).unwrap_or_default(); - blob_cleanup.push((cid, holders)); + let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); + let upstream = storage.get_blob_upstream(&cid).ok().flatten(); + blob_cleanup.push((cid, downstream, upstream)); } let _ = storage.store_delete(dr); let _ = storage.apply_delete(dr); @@ -4904,11 +4897,18 @@ impl ConnectionManager { // Gather connections for CDN delete notices under lock, then send outside let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new(); - for (cid, holders) in &blob_cleanup { - let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; - for (peer, _addrs) in holders { - 
if let Some(pc) = cm.connections_ref().get(peer) { - delete_notices.push((pc.connection.clone(), payload.clone())); + for (cid, downstream, upstream) in &blob_cleanup { + let upstream_info = upstream.as_ref().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs.clone() }); + let ds_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: upstream_info }; + for (ds_nid, _) in downstream { + if let Some(pc) = cm.connections_ref().get(ds_nid) { + delete_notices.push((pc.connection.clone(), ds_payload.clone())); + } + } + if let Some((up_nid, _)) = upstream { + let up_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; + if let Some(pc) = cm.connections_ref().get(up_nid) { + delete_notices.push((pc.connection.clone(), up_payload)); } } } @@ -4958,38 +4958,24 @@ impl ConnectionManager { } MessageType::PostPush => { let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?; - // Encrypted posts are no longer accepted via direct push — they propagate - // via the CDN to eliminate the sender→recipient traffic signal. - if !matches!(push.post.visibility, crate::types::PostVisibility::Public) { - debug!( + let cm = conn_mgr.lock().await; + let storage = cm.storage.get().await; + if !storage.is_deleted(&push.post.id)? + && storage.get_post(&push.post.id)?.is_none() + && crate::content::verify_post_id(&push.post.id, &push.post.post) + { + let _ = storage.store_post_with_visibility( + &push.post.id, + &push.post.post, + &push.post.visibility, + ); + let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); + info!( peer = hex::encode(remote_node_id), post_id = hex::encode(push.post.id), - "Ignoring non-public PostPush" + "Received direct post push" ); - } else { - let cm = conn_mgr.lock().await; - let storage = cm.storage.get().await; - if !storage.is_deleted(&push.post.id)? 
- && storage.get_post(&push.post.id)?.is_none() - && crate::content::verify_post_id(&push.post.id, &push.post.post) - { - let _ = storage.store_post_with_visibility( - &push.post.id, - &push.post.post, - &push.post.visibility, - ); - let _ = storage.touch_file_holder( - &push.post.id, - &remote_node_id, - &[], - crate::storage::HolderDirection::Received, - ); - info!( - peer = hex::encode(remote_node_id), - post_id = hex::encode(push.post.id), - "Received direct post push" - ); - } } } MessageType::AudienceRequest => { @@ -5077,24 +5063,17 @@ impl ConnectionManager { &entry.manifest.author_manifest.author, entry.manifest.author_manifest.updated_at, ); - // Remote peer pushed us this manifest → they hold the file. - let _ = storage.touch_file_holder( - &entry.cid, - &remote_node_id, - &[], - crate::storage::HolderDirection::Received, - ); stored_entries.push(entry.clone()); } - // Gather file holders for relay before dropping locks + // Gather downstream peers for relay before dropping locks let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new(); for entry in &stored_entries { - let holders = storage.get_file_holders(&entry.cid).unwrap_or_default(); - for (peer, _addrs) in holders { - if peer == remote_node_id { + let downstream = storage.get_blob_downstream(&entry.cid).unwrap_or_default(); + for (ds_nid, _) in downstream { + if ds_nid == remote_node_id { continue; } - relay_targets.push((peer, crate::protocol::ManifestPushPayload { + relay_targets.push((ds_nid, crate::protocol::ManifestPushPayload { manifests: vec![entry.clone()], })); } @@ -5197,12 +5176,8 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { - let _ = storage.touch_file_holder( - &sync_post.id, - &sender_id, - &[], - crate::storage::HolderDirection::Received, - ); + let prio = 
storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -5293,14 +5268,32 @@ impl ConnectionManager { let storage = cm.storage.get().await; let cid = payload.cid; - // Flat-holder model: drop the sender as a holder of this file. - // The author's DeleteRecord (separate signed message) is what - // triggers the actual blob removal for followers. - let _ = storage.remove_file_holder(&cid, &remote_node_id); + // Check if sender was our upstream for this blob + let was_upstream = storage.get_blob_upstream(&cid).ok().flatten() + .map(|(nid, _)| nid == remote_node_id) + .unwrap_or(false); + + if was_upstream { + // Sender was our upstream — clear it + let _ = storage.remove_blob_upstream(&cid); + + // If they provided their upstream, store it as our new upstream + if let Some(ref new_up) = payload.upstream_node { + if let Ok(nid_bytes) = hex::decode(&new_up.n) { + if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) { + let _ = storage.store_blob_upstream(&cid, &nid, &new_up.a); + } + } + } + } else { + // Sender was our downstream — remove them + let _ = storage.remove_blob_downstream(&cid, &remote_node_id); + } info!( peer = hex::encode(remote_node_id), cid = hex::encode(cid), + was_upstream, "Received blob delete notice" ); } @@ -5444,12 +5437,7 @@ impl ConnectionManager { let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?; let cm = conn_mgr.lock().await; let storage = cm.storage.get().await; - let _ = storage.touch_file_holder( - &payload.post_id, - &remote_node_id, - &[], - crate::storage::HolderDirection::Sent, - ); + let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id); drop(storage); trace!( peer = hex::encode(remote_node_id), @@ -5704,28 +5692,15 @@ impl ConnectionManager { let storage = storage.get().await; let 
manifest: Option = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| { if let Ok(am) = serde_json::from_str::(&json) { - let ds_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); + let ds_count = storage.get_blob_downstream_count(&payload.cid).unwrap_or(0); Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) } else { serde_json::from_str(&json).ok() } }); let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() { - let prior_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); - let _ = storage.touch_file_holder( - &payload.cid, - &remote_node_id, - &payload.requester_addresses, - crate::storage::HolderDirection::Sent, - ); - // If we already had 5 holders before adding this one, the - // requester should consult them too for CDN lookups. - if prior_count < 5 { - (true, vec![]) - } else { - let holders = storage.get_file_holders(&payload.cid).unwrap_or_default(); - let redirects: Vec = holders.into_iter() - .filter(|(nid, _)| *nid != remote_node_id) - .map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }) - .collect(); + let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false); + if ok { (true, vec![]) } else { + let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default(); + let redirects: Vec = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect(); (false, redirects) } } else { (false, vec![]) }; @@ -5752,7 +5727,7 @@ impl ConnectionManager { Some(json) => { let manifest = if let Ok(am) = serde_json::from_str::(&json) { if am.updated_at > payload.current_updated_at { - let ds_count = store.get_file_holder_count(&payload.cid).unwrap_or(0); + let ds_count = store.get_blob_downstream_count(&payload.cid).unwrap_or(0); 
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) } else { None } } else { None }; @@ -6097,14 +6072,9 @@ impl ConnectionManager { to_pull.push(*pid); } - // Register as holder for all accepted posts + // Register as downstream for all accepted posts for pid in &acc { - let _ = storage.touch_file_holder( - pid, - &remote_node_id, - &[], - crate::storage::HolderDirection::Sent, - ); + let _ = storage.add_post_downstream(pid, &remote_node_id); } (acc, rej, to_pull) @@ -6155,12 +6125,8 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); - let _ = storage.touch_file_holder( - &sp.id, - &sender, - &[], - crate::storage::HolderDirection::Received, - ); + let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(&sp.id, &sender, prio); let blob_store = cm.blob_store.clone(); drop(storage); drop(cm); @@ -6187,12 +6153,7 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes); - let _ = storage.touch_file_holder( - &att.cid, - &sender, - &[], - crate::storage::HolderDirection::Received, - ); + let _ = storage.add_post_upstream(&att.cid, &sender, 0); } Ok(()) }.await; @@ -6217,14 +6178,12 @@ impl ConnectionManager { Ok(()) } - /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate - /// to the post's file_holders (flat set, up to 5 most recent). + /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream. 
async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) { use crate::types::BlobHeaderDiffOp; - // Gather policy + audience data + holders, then drop lock immediately. - // Remote peer clearly holds this post — record them as a holder. - let (policy, approved_audience, holders) = { + // Gather policy + audience data, then drop lock immediately + let (policy, approved_audience, downstream, upstreams) = { let storage = self.storage.get().await; let policy = storage.get_comment_policy(&payload.post_id) .ok() @@ -6234,18 +6193,13 @@ impl ConnectionManager { crate::types::AudienceDirection::Inbound, Some(crate::types::AudienceStatus::Approved), ).unwrap_or_default(); - let _ = storage.touch_file_holder( - &payload.post_id, - &sender, - &[], - crate::storage::HolderDirection::Received, - ); - let holders: Vec = storage.get_file_holders(&payload.post_id) + let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default(); + let upstreams: Vec = storage.get_post_upstreams(&payload.post_id) .unwrap_or_default() .into_iter() - .map(|(nid, _addrs)| nid) + .map(|(nid, _)| nid) .collect(); - (policy, approved, holders) + (policy, approved, downstream, upstreams) }; // Filter ops using gathered data (no lock held) @@ -6427,16 +6381,26 @@ impl ConnectionManager { let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms); } - // Re-propagate to all file holders (flat set, max 5). Exclude sender. 
+ // Collect all targets (downstream + all upstreams), then send in a single batched task let mut targets: Vec = Vec::new(); - for peer_id in &holders { - if *peer_id == sender { continue; } - if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone()) - .or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone())) + for peer_id in downstream { + if peer_id == sender { continue; } + if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone()) + .or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone())) { targets.push(conn); } } + // Phase 6: Try all upstreams, not just one + for up in &upstreams { + if *up != sender { + if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone()) + .or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone())) + { + targets.push(conn); + } + } + } if !targets.is_empty() { let payload_clone = payload.clone(); tokio::spawn(async move { @@ -7720,8 +7684,8 @@ impl ConnectionActor { if s.get_post_with_visibility(post_id).ok().flatten().is_some() { post_holder = Some(ctx.our_node_id); } else { - let holders = s.get_file_holders(post_id).unwrap_or_default(); - if let Some((nid, _)) = holders.first() { post_holder = Some(*nid); } + let downstream = s.get_post_downstream(post_id).unwrap_or_default(); + if !downstream.is_empty() { post_holder = Some(downstream[0]); } } } @@ -7731,8 +7695,8 @@ impl ConnectionActor { } else { let s = ctx.storage.get().await; if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) { - let holders = s.get_file_holders(&pid).unwrap_or_default(); - if let Some((nid, _)) = holders.first() { blob_holder = Some(*nid); } + let downstream = s.get_post_downstream(&pid).unwrap_or_default(); + if !downstream.is_empty() { blob_holder = Some(downstream[0]); } } } } diff --git a/crates/core/src/http.rs b/crates/core/src/http.rs index d79fcb1..42fbbe6 100644 --- a/crates/core/src/http.rs +++ b/crates/core/src/http.rs @@ -378,11 +378,7 @@ async 
fn try_redirect( Ok(Some((_, PostVisibility::Public))) => {} _ => return false, // not found or not public — hard close } - store.get_file_holders(post_id) - .unwrap_or_default() - .into_iter() - .map(|(nid, _addrs)| nid) - .collect::>() + store.get_post_downstream(post_id).unwrap_or_default() }; // Get addresses for downstream peers diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 7b3a11a..8874a2e 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -902,6 +902,50 @@ impl Network { self.send_to_audience(MessageType::PostNotification, &payload).await } + /// Push a full post directly to recipients (persistent if available, ephemeral otherwise). + pub async fn push_post_to_recipients( + &self, + post_id: &crate::types::PostId, + post: &Post, + visibility: &PostVisibility, + ) -> usize { + let recipients: Vec = match visibility { + PostVisibility::Public => return 0, + PostVisibility::Encrypted { recipients } => { + recipients.iter().map(|wk| wk.recipient).collect() + } + PostVisibility::GroupEncrypted { group_id, .. } => { + // Push to all group members + match self.storage.get().await.get_all_group_members() { + Ok(map) => map.get(group_id).cloned().unwrap_or_default().into_iter().collect(), + Err(_) => return 0, + } + } + }; + + let payload = PostPushPayload { + post: SyncPost { + id: *post_id, + post: post.clone(), + visibility: visibility.clone(), + }, + }; + + let mut pushed = 0; + for recipient in &recipients { + if self.send_to_peer_uni(recipient, MessageType::PostPush, &payload).await.is_ok() { + pushed += 1; + debug!( + recipient = hex::encode(recipient), + post_id = hex::encode(post_id), + "Pushed post to recipient" + ); + } + } + + pushed + } + /// Push a profile update to all audience members (ephemeral-capable). 
pub async fn push_profile(&self, profile: &PublicProfile) -> usize { // Sanitize: if public_visible=false, strip display_name/bio from pushed profile @@ -1015,16 +1059,15 @@ impl Network { sent } - /// Push an updated manifest to all known holders of the file (flat set, - /// up to 5 most-recent). Replaces the legacy downstream-tree push. + /// Push updated manifests to all downstream peers for a given CID. pub async fn push_manifest_to_downstream( &self, cid: &[u8; 32], manifest: &crate::types::CdnManifest, ) -> usize { - let holders = { + let downstream = { let storage = self.storage.get().await; - storage.get_file_holders(cid).unwrap_or_default() + storage.get_blob_downstream(cid).unwrap_or_default() }; let payload = crate::protocol::ManifestPushPayload { manifests: vec![crate::protocol::ManifestPushEntry { @@ -1033,40 +1076,54 @@ impl Network { }], }; let mut sent = 0; - for (peer, peer_addrs) in &holders { - if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() { + for (ds_nid, _) in &downstream { + if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() { sent += 1; - let storage = self.storage.get().await; - let _ = storage.touch_file_holder( - cid, - peer, - peer_addrs, - crate::storage::HolderDirection::Sent, - ); } } sent } - /// Send blob delete notices to all known holders of a file. - /// Second argument kept as Option for signature stability; flat-holder - /// model doesn't need separate upstream handling. + /// Send blob delete notices to downstream and upstream peers. + /// Downstream peers receive our upstream info for tree healing. + /// Upstream peers receive no upstream info (just "remove me as downstream"). 
pub async fn send_blob_delete_notices( &self, cid: &[u8; 32], - holders: &[(NodeId, Vec)], - _legacy_upstream: Option<&(NodeId, Vec)>, + downstream: &[(NodeId, Vec)], + upstream: Option<&(NodeId, Vec)>, ) -> usize { - let payload = crate::protocol::BlobDeleteNoticePayload { - cid: *cid, - upstream_node: None, - }; + let upstream_info = upstream.map(|(nid, addrs)| { + crate::types::PeerWithAddress { + n: hex::encode(nid), + a: addrs.clone(), + } + }); + let mut sent = 0; - for (peer, _addrs) in holders { - if self.send_to_peer_uni(peer, MessageType::BlobDeleteNotice, &payload).await.is_ok() { + + // Notify downstream (with upstream info for tree healing) + let ds_payload = crate::protocol::BlobDeleteNoticePayload { + cid: *cid, + upstream_node: upstream_info, + }; + for (ds_nid, _) in downstream { + if self.send_to_peer_uni(ds_nid, MessageType::BlobDeleteNotice, &ds_payload).await.is_ok() { sent += 1; } } + + // Notify upstream (no upstream info) + if let Some((up_nid, _)) = upstream { + let up_payload = crate::protocol::BlobDeleteNoticePayload { + cid: *cid, + upstream_node: None, + }; + if self.send_to_peer_uni(up_nid, MessageType::BlobDeleteNotice, &up_payload).await.is_ok() { + sent += 1; + } + } + sent } @@ -2299,24 +2356,24 @@ impl Network { self.endpoint.close().await; } - /// Propagate an engagement diff to all known holders of a post (flat set, - /// up to 5 most-recent). Excludes the sender to avoid loops. + /// Propagate an engagement diff to all downstream holders of a post (CDN tree). + /// Excludes the sender to avoid loops. 
pub async fn propagate_engagement_diff( &self, post_id: &crate::types::PostId, payload: &crate::protocol::BlobHeaderDiffPayload, exclude_peer: &crate::types::NodeId, ) -> usize { - let holders = { + let downstream = { let storage = self.storage.get().await; - storage.get_file_holders(post_id).unwrap_or_default() + storage.get_post_downstream(post_id).unwrap_or_default() }; let mut sent = 0; - for (peer, _addrs) in &holders { - if peer == exclude_peer { + for ds_nid in &downstream { + if ds_nid == exclude_peer { continue; } - if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() { + if self.send_to_peer_uni(ds_nid, MessageType::BlobHeaderDiff, payload).await.is_ok() { sent += 1; } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 85c7606..067c632 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -836,12 +836,13 @@ impl Node { } } - // For public posts, push to audience members. Encrypted posts propagate - // via the CDN (ManifestPush + header-diff) to eliminate the sender→recipient - // traffic signal. 
+ // For encrypted posts, push directly to recipients + let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await; + + // For public posts, push to audience members let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await; - info!(post_id = hex::encode(post_id), audience_pushed, "Created new post"); + info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post"); Ok((post_id, post, visibility)) } @@ -1350,12 +1351,7 @@ impl Node { let source_addrs: Vec = response.manifest.as_ref() .map(|m| m.host_addresses.clone()) .unwrap_or_default(); - let _ = storage.touch_file_holder( - cid, - from_peer, - &source_addrs, - crate::storage::HolderDirection::Received, - ); + let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs); } Ok(data) } @@ -1383,17 +1379,16 @@ impl Node { // Collect redirect peers from responses in case we need them later let mut redirect_peers: Vec = Vec::new(); - // 2. Try known holders (up to 5 most-recent peers we've interacted - // with about this file). - let known_holders = { + // 2. Try existing upstream (if we previously fetched this blob) + let upstream = { let storage = self.storage.get().await; - storage.get_file_holders(cid).unwrap_or_default() + storage.get_blob_upstream(cid)? 
}; - for (holder_nid, _addrs) in &known_holders { - match self.fetch_blob_from_peer(cid, holder_nid, post_id, author, mime_type, created_at).await { + if let Some((upstream_nid, _upstream_addrs)) = upstream { + match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => {} - Err(e) => warn!(error = %e, "blob fetch from known holder failed"), + Err(e) => warn!(error = %e, "blob fetch from upstream failed"), } } @@ -1418,12 +1413,7 @@ impl Node { let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at); } } - let _ = storage.touch_file_holder( - cid, - &lateral, - &[], - crate::storage::HolderDirection::Received, - ); + let _ = storage.store_blob_upstream(cid, &lateral, &[]); return Ok(Some(data)); } Ok((None, response)) => { @@ -1991,13 +1981,14 @@ impl Node { signature, }; - // Collect blob CIDs + known holders before cleanup (for delete notices) + // Collect blob CIDs + CDN peers before cleanup let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = { let storage = self.storage.get().await; let cids = storage.get_blobs_for_post(post_id).unwrap_or_default(); cids.into_iter().map(|cid| { - let holders = storage.get_file_holders(&cid).unwrap_or_default(); - (cid, holders, None::<(NodeId, Vec)>) + let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); + let upstream = storage.get_blob_upstream(&cid).ok().flatten(); + (cid, downstream, upstream) }).collect() }; @@ -2117,10 +2108,12 @@ impl Node { storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?; } - // delete_post already pushes the DeleteRecord. - // Replacement post propagates via the CDN to remaining recipients. 
+ // delete_post already pushes the DeleteRecord self.delete_post(post_id).await?; + // Push replacement post directly to remaining recipients + self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await; + info!( old_id = hex::encode(post_id), new_id = hex::encode(new_post_id), @@ -3093,27 +3086,20 @@ impl Node { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64 - max_age_ms; - let stale_cids = { + let stale = { let s = storage.get().await; - s.get_stale_manifest_cids(cutoff).unwrap_or_default() + s.get_stale_manifests(cutoff).unwrap_or_default() }; - for cid in &stale_cids { - // Get current updated_at + pick a holder to refresh from - let (current_updated_at, refresh_source) = { + for (cid, upstream_nid, _upstream_addrs) in &stale { + // Get current updated_at for this manifest + let current_updated_at = { let s = storage.get().await; - let updated_at = s.get_cdn_manifest(cid).ok().flatten() + s.get_cdn_manifest(cid).ok().flatten() .and_then(|json| serde_json::from_str::(&json).ok()) .map(|m| m.updated_at) - .unwrap_or(0); - let source = s.get_file_holders(cid) - .unwrap_or_default() - .into_iter() - .next() - .map(|(nid, _)| nid); - (updated_at, source) + .unwrap_or(0) }; - let Some(upstream_nid) = refresh_source else { continue; }; - match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await { + match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await { Ok(Some(cdn_manifest)) => { if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) { let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default(); @@ -3124,10 +3110,10 @@ impl Node { &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at, ); - // Relay to known holders (flat set) - let holders = s.get_file_holders(cid).unwrap_or_default(); + // Relay to our downstream + let downstream = s.get_blob_downstream(cid).unwrap_or_default(); drop(s); - 
if !holders.is_empty() { + if !downstream.is_empty() { network.push_manifest_to_downstream(cid, &cdn_manifest).await; } tracing::debug!( @@ -3140,7 +3126,7 @@ impl Node { Err(e) => { tracing::debug!( cid = hex::encode(cid), - upstream = hex::encode(&upstream_nid), + upstream = hex::encode(upstream_nid), error = %e, "Manifest refresh from upstream failed" ); @@ -3291,16 +3277,18 @@ impl Node { compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms) } - /// Delete a blob with CDN notifications to known holders. + /// Delete a blob with CDN notifications to upstream/downstream. pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> { - // Gather known holders before cleanup - let holders = { + // Gather CDN peers before cleanup + let (downstream, upstream) = { let storage = self.storage.get().await; - storage.get_file_holders(cid).unwrap_or_default() + let ds = storage.get_blob_downstream(cid).unwrap_or_default(); + let up = storage.get_blob_upstream(cid).ok().flatten(); + (ds, up) }; - // Send CDN delete notices to all holders - self.network.send_blob_delete_notices(cid, &holders, None).await; + // Send CDN delete notices + self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await; // Clean up local storage { @@ -3588,9 +3576,15 @@ impl Node { ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())], timestamp_ms: now, }; - // propagate_engagement_diff targets all file_holders (flat set, max 5) - // which already subsumes what used to be upstream + downstream. 
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; + // Also send to all upstreams (toward author) — Phase 6 multi-upstream + let upstreams = { + let storage = self.storage.get().await; + storage.get_post_upstreams(&post_id).unwrap_or_default() + }; + for (up, _prio) in upstreams { + let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; + } } Ok(reaction) @@ -3697,6 +3691,14 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; + // Also send to all upstreams (toward author) — Phase 6 multi-upstream + let upstreams = { + let storage = self.storage.get().await; + storage.get_post_upstreams(&post_id).unwrap_or_default() + }; + for (up, _prio) in upstreams { + let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; + } } Ok(comment) @@ -3733,6 +3735,14 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; + // Phase 6: send to all upstreams + let upstreams = { + let storage = self.storage.get().await; + storage.get_post_upstreams(&post_id).unwrap_or_default() + }; + for (up, _prio) in upstreams { + let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; + } } Ok(()) } @@ -3766,6 +3776,14 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; + // Phase 6: send to all upstreams + let upstreams = { + let storage = self.storage.get().await; + storage.get_post_upstreams(&post_id).unwrap_or_default() + }; + for (up, _prio) in upstreams { + let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; + } } Ok(()) } @@ -3987,6 +4005,14 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; + // Phase 6: send to all upstreams + let upstreams = { + let storage = 
self.storage.get().await; + storage.get_post_upstreams(&post_id).unwrap_or_default() + }; + for (up, _prio) in upstreams { + let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; + } Ok(()) } @@ -4101,6 +4127,14 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; + // Phase 6: send to all upstreams + let upstreams = { + let storage = self.storage.get().await; + storage.get_post_upstreams(&post_id).unwrap_or_default() + }; + for (up, _prio) in upstreams { + let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; + } Ok(()) } @@ -4333,10 +4367,10 @@ impl Node { } }; - // Filter to under-replicated (< 2 holders) + // Filter to under-replicated (< 2 downstream) let mut needs_replication = Vec::new(); for pid in &recent_ids { - match storage.get_file_holder_count(pid) { + match storage.get_post_downstream_count(pid) { Ok(count) if count < 2 => { needs_replication.push(*pid); } diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 31434bb..5f37559 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -12,26 +12,6 @@ use crate::types::{ VisibilityIntent, }; -/// Direction for file_holders entries: whether we sent the file to this peer, -/// received it from them, or both. Not load-bearing for propagation decisions — -/// any holder can serve as a diff target — but retained for potential reuse. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum HolderDirection { - Sent, - Received, - Both, -} - -impl HolderDirection { - pub fn as_str(&self) -> &'static str { - match self { - HolderDirection::Sent => "sent", - HolderDirection::Received => "received", - HolderDirection::Both => "both", - } - } -} - /// Blob metadata for eviction scoring. 
pub struct EvictionCandidate { pub cid: [u8; 32], @@ -282,6 +262,20 @@ impl Storage { updated_at INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author); + CREATE TABLE IF NOT EXISTS blob_upstream ( + cid BLOB PRIMARY KEY, + source_node_id BLOB NOT NULL, + source_addresses TEXT NOT NULL DEFAULT '[]', + stored_at INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS blob_downstream ( + cid BLOB NOT NULL, + peer_node_id BLOB NOT NULL, + peer_addresses TEXT NOT NULL DEFAULT '[]', + registered_at INTEGER NOT NULL, + PRIMARY KEY (cid, peer_node_id) + ); + CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid); CREATE TABLE IF NOT EXISTS group_keys ( group_id BLOB PRIMARY KEY, circle_name TEXT NOT NULL, @@ -332,6 +326,17 @@ impl Storage { last_seen_ms INTEGER NOT NULL, success_count INTEGER NOT NULL DEFAULT 0 ); + CREATE TABLE IF NOT EXISTS post_downstream ( + post_id BLOB NOT NULL, + peer_node_id BLOB NOT NULL, + registered_at INTEGER NOT NULL, + PRIMARY KEY (post_id, peer_node_id) + ); + CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id); + CREATE TABLE IF NOT EXISTS post_upstream ( + post_id BLOB PRIMARY KEY, + peer_node_id BLOB NOT NULL + ); CREATE TABLE IF NOT EXISTS blob_headers ( post_id BLOB PRIMARY KEY, author BLOB NOT NULL, @@ -384,17 +389,7 @@ impl Storage { CREATE TABLE IF NOT EXISTS seen_messages ( partner_id BLOB PRIMARY KEY, last_read_ms INTEGER NOT NULL DEFAULT 0 - ); - CREATE TABLE IF NOT EXISTS file_holders ( - file_id BLOB NOT NULL, - peer_id BLOB NOT NULL, - peer_addresses TEXT NOT NULL DEFAULT '[]', - last_interaction_ms INTEGER NOT NULL, - direction TEXT NOT NULL, - PRIMARY KEY (file_id, peer_id) - ); - CREATE INDEX IF NOT EXISTS idx_file_holders_recency - ON file_holders(file_id, last_interaction_ms DESC);", + );", )?; Ok(()) } @@ -548,6 +543,16 @@ impl Storage { )?; } + // Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration) + let 
has_blob_pref_tree = self.conn.prepare( + "SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'" + )?.query_row([], |row| row.get::<_, i64>(0))?; + if has_blob_pref_tree == 0 { + self.conn.execute_batch( + "ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';" + )?; + } + // Add public_visible column to profiles if missing (Phase D-4 migration) let has_public_visible = self.conn.prepare( "SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'" @@ -661,18 +666,25 @@ impl Storage { )?; } - // 0.6.1-beta: seed file_holders from legacy upstream/downstream tables - // before they're dropped. Idempotent — only fires on an empty - // file_holders table. - self.seed_file_holders_from_legacy()?; - - // 0.6.1-beta: drop legacy directional tables — replaced by file_holders. - self.conn.execute_batch( - "DROP TABLE IF EXISTS blob_upstream; - DROP TABLE IF EXISTS blob_downstream; - DROP TABLE IF EXISTS post_upstream; - DROP TABLE IF EXISTS post_downstream;", - )?; + // Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max) + let has_priority = self.conn.prepare( + "SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'" + )?.query_row([], |row| row.get::<_, i64>(0))?; + if has_priority == 0 { + self.conn.execute_batch( + "ALTER TABLE post_upstream RENAME TO post_upstream_old; + CREATE TABLE post_upstream ( + post_id BLOB NOT NULL, + peer_node_id BLOB NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + registered_at INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (post_id, peer_node_id) + ); + INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at) + SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old; + DROP TABLE post_upstream_old;" + )?; + } Ok(()) } @@ -2384,7 +2396,8 @@ impl Storage { params![record.post_id.as_slice(), record.author.as_slice()], )?; if deleted > 0 { - self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", 
params![record.post_id.as_slice()])?; + self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?; + self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?; self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?; } Ok(deleted > 0) } @@ -3383,6 +3396,28 @@ impl Storage { Ok(()) } + /// Update the preferred_tree JSON for a blob upstream entry. + pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> { + let json = serde_json::to_string( + &tree.iter().map(hex::encode).collect::<Vec<String>>() + )?; + self.conn.execute( + "UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2", + params![json, cid.as_slice()], + )?; + Ok(()) + } + + /// Get the preferred_tree for a blob upstream entry. + pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<NodeId>> { + let json: String = self.conn.query_row( + "SELECT preferred_tree FROM blob_upstream WHERE cid = ?1", + params![cid.as_slice()], + |row| row.get(0), + ).unwrap_or_else(|_| "[]".to_string()); + Ok(parse_anchors_json(&json)) + } + // ---- Social Routes ---- /// Insert or update a social route entry. @@ -3854,10 +3889,10 @@ impl Storage { GROUP BY post_id ) r ON b.post_id = r.post_id LEFT JOIN ( - SELECT file_id, COUNT(*) as ds_count - FROM file_holders - GROUP BY file_id - ) d ON b.cid = d.file_id" + SELECT cid, COUNT(*) as ds_count + FROM blob_downstream + GROUP BY cid + ) d ON b.cid = d.cid" )?; let rows = stmt.query_map(params![cutoff], |row| { let cid_bytes: Vec<u8> = row.get(0)?; @@ -3911,10 +3946,11 @@ impl Storage { Ok(count as u64) } - /// Clean up all CDN metadata for a blob (manifests + file_holders). + /// Clean up all CDN metadata for a blob (manifests + upstream + downstream). 
pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> { self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?; - self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?; + self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?; + self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?; Ok(()) } @@ -3933,6 +3969,12 @@ impl Storage { Ok(cids) } + /// Remove upstream tracking for a blob CID. + pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> { + self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?; + Ok(()) + } + pub fn post_count(&self) -> anyhow::Result<u64> { let count: i64 = self .conn @@ -3995,24 +4037,137 @@ impl Storage { Ok(result) } - /// Get CIDs of manifests older than a cutoff. Callers look up holders - /// via file_holders to pick a refresh source. - pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result<Vec<[u8; 32]>> { + /// Record the upstream source for a blob CID. + pub fn store_blob_upstream( + &self, + cid: &[u8; 32], + source_node_id: &NodeId, + source_addresses: &[String], + ) -> anyhow::Result<()> { + let addrs_json = serde_json::to_string(source_addresses)?; + self.conn.execute( + "INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4) + ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4", + params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()], + )?; + Ok(()) + } + + /// Get the upstream source for a blob CID: (node_id, addresses). 
+ pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<Option<(NodeId, Vec<String>)>> { + let result = self.conn.query_row( + "SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1", + params![cid.as_slice()], + |row| { + let nid_bytes: Vec<u8> = row.get(0)?; + let addrs_json: String = row.get(1)?; + Ok((nid_bytes, addrs_json)) + }, + ); + match result { + Ok((nid_bytes, addrs_json)) => { + let nid = blob_to_nodeid(nid_bytes)?; + let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default(); + Ok(Some((nid, addrs))) + } + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), + } + } + + /// Register a downstream peer for a blob CID. Returns false if already at 100 downstream. + pub fn add_blob_downstream( + &self, + cid: &[u8; 32], + peer_node_id: &NodeId, + peer_addresses: &[String], + ) -> anyhow::Result<bool> { + let count = self.get_blob_downstream_count(cid)?; + if count >= 100 { + return Ok(false); + } + let addrs_json = serde_json::to_string(peer_addresses)?; + self.conn.execute( + "INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4) + ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4", + params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()], + )?; + Ok(true) + } + + /// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>. 
+ pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> { let mut stmt = self.conn.prepare( - "SELECT cid FROM cdn_manifests WHERE updated_at < ?1", + "SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1" + )?; + let rows = stmt.query_map(params![cid.as_slice()], |row| { + let nid_bytes: Vec<u8> = row.get(0)?; + let addrs_json: String = row.get(1)?; + Ok((nid_bytes, addrs_json)) + })?; + let mut result = Vec::new(); + for row in rows { + let (nid_bytes, addrs_json) = row?; + let nid = blob_to_nodeid(nid_bytes)?; + let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default(); + result.push((nid, addrs)); + } + Ok(result) + } + + /// Count downstream peers for a blob CID. + pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result<u32> { + let count: i64 = self.conn.query_row( + "SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1", + params![cid.as_slice()], + |row| row.get(0), + )?; + Ok(count as u32) + } + + /// Remove a downstream peer for a blob CID. + pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2", + params![cid.as_slice(), peer_node_id.as_slice()], + )?; + Ok(()) + } + + /// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>. 
+ pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result<Vec<([u8; 32], NodeId, Vec<String>)>> { + let mut stmt = self.conn.prepare( + "SELECT m.cid, u.source_node_id, u.source_addresses + FROM cdn_manifests m + LEFT JOIN blob_upstream u ON m.cid = u.cid + WHERE m.updated_at < ?1" )?; let rows = stmt.query_map(params![older_than_ms as i64], |row| { let cid_bytes: Vec<u8> = row.get(0)?; - Ok(cid_bytes) + let nid_bytes: Option<Vec<u8>> = row.get(1)?; + let addrs_json: Option<String> = row.get(2)?; + Ok((cid_bytes, nid_bytes, addrs_json)) })?; - let mut out = Vec::new(); + let mut result = Vec::new(); for row in rows { - let cid_bytes = row?; - if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) { - out.push(cid); - } + let (cid_bytes, nid_bytes, addrs_json) = row?; + let cid: [u8; 32] = match cid_bytes.try_into() { + Ok(c) => c, + Err(_) => continue, + }; + let nid = match nid_bytes { + Some(b) => match blob_to_nodeid(b) { + Ok(n) => n, + Err(_) => continue, + }, + None => continue, + }; + let addrs: Vec<String> = addrs_json + .map(|j| serde_json::from_str(&j).unwrap_or_default()) + .unwrap_or_default(); + result.push((cid, nid, addrs)); } - Ok(out) + Ok(result) } /// Get the 10 posts before and 10 posts after a reference timestamp for an author. @@ -4116,148 +4271,128 @@ impl Storage { Ok(result) } - // --- File holders (flat, per-file, LRU-capped at 5) --- - // - // A single table for PostId-keyed engagement propagation and CID-keyed - // manifest/blob propagation. Any 32-byte content-addressed file_id fits. + // --- Engagement: post_downstream --- - /// Upsert a holder for a file. Bumps last_interaction_ms to now and - /// enforces an LRU cap of 5 holders per file. - pub fn touch_file_holder( - &self, - file_id: &[u8; 32], - peer_id: &NodeId, - peer_addresses: &[String], - direction: HolderDirection, - ) -> anyhow::Result<()> { - let addrs_json = serde_json::to_string(peer_addresses)?; - let now = now_ms(); - let new_dir = direction.as_str(); - // Upsert. 
If the row exists with a different direction, promote to "both". - self.conn.execute( - "INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - VALUES (?1, ?2, ?3, ?4, ?5) - ON CONFLICT(file_id, peer_id) DO UPDATE SET - peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END, - last_interaction_ms = ?4, - direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END", - params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir], - )?; - // Enforce LRU cap of 5. Oldest get dropped. - self.conn.execute( - "DELETE FROM file_holders - WHERE file_id = ?1 - AND peer_id NOT IN ( - SELECT peer_id FROM file_holders - WHERE file_id = ?1 - ORDER BY last_interaction_ms DESC - LIMIT 5 - )", - params![file_id.as_slice()], - )?; - Ok(()) - } - - /// Count file holders (bounded at 5 by touch_file_holder's LRU cap). - pub fn get_file_holder_count(&self, file_id: &[u8; 32]) -> anyhow::Result<u32> { + /// Register a peer as downstream for a post (max 100 per post). + /// Returns true if added, false if at capacity. + pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<bool> { let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM file_holders WHERE file_id = ?1", - )?.query_row(params![file_id.as_slice()], |row| row.get(0))?; - Ok(count as u32) + "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1" + )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; + if count >= 100 { + return Ok(false); + } + self.conn.execute( + "INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3) + ON CONFLICT DO NOTHING", + params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()], + )?; + Ok(true) } - /// Return the up-to-5 most recently interacted holders of a file. - pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> { + /// Get all downstream peers for a post. 
+ pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result<Vec<NodeId>> { let mut stmt = self.conn.prepare( - "SELECT peer_id, peer_addresses FROM file_holders - WHERE file_id = ?1 - ORDER BY last_interaction_ms DESC - LIMIT 5", + "SELECT peer_node_id FROM post_downstream WHERE post_id = ?1" )?; - let rows = stmt.query_map(params![file_id.as_slice()], |row| { - let peer_bytes: Vec<u8> = row.get(0)?; - let addrs_json: String = row.get(1)?; - Ok((peer_bytes, addrs_json)) - })?; - let mut out = Vec::new(); + let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec<u8>>(0))?; + let mut result = Vec::new(); for row in rows { - let (peer_bytes, addrs_json) = row?; - if peer_bytes.len() != 32 { continue; } - let mut peer = [0u8; 32]; - peer.copy_from_slice(&peer_bytes); - let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default(); - out.push((NodeId::from(peer), addrs)); + if let Ok(nid) = blob_to_nodeid(row?) { + result.push(nid); + } } - Ok(out) + Ok(result) } - /// Remove all holders for a file (e.g. on post/blob deletion). - pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> { + /// Remove a downstream peer for a post. + pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { self.conn.execute( - "DELETE FROM file_holders WHERE file_id = ?1", - params![file_id.as_slice()], + "DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2", + params![post_id.as_slice(), peer_node_id.as_slice()], )?; Ok(()) } - /// Remove a single peer's holder entry for a file. - pub fn remove_file_holder(&self, file_id: &[u8; 32], peer_id: &NodeId) -> anyhow::Result<()> { + // --- Engagement: post_upstream (multi-upstream, 3 max) --- + + /// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post. 
+ pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> { + // Check current count + let count: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1" + )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; + if count >= 3 { + return Ok(()); // Already at cap + } + let now = now_ms(); self.conn.execute( - "DELETE FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", - params![file_id.as_slice(), peer_id.as_slice()], + "INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at) + VALUES (?1, ?2, ?3, ?4)", + params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now], )?; Ok(()) } - /// One-time migration: seed file_holders from the legacy upstream/downstream - /// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder - /// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the - /// PRIMARY KEY. Skips tables that don't exist on fresh installs. - pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> { - // Skip if file_holders already populated (idempotent re-run protection). - let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")? - .query_row([], |row| row.get(0))?; - if existing > 0 { - return Ok(()); - } - let now = now_ms() as i64; - let table_exists = |name: &str| -> anyhow::Result<bool> { - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", - )?.query_row(params![name], |row| row.get(0))?; - Ok(count > 0) - }; - if table_exists("post_upstream")? { - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream", - params![now], - )?; - } - if table_exists("post_downstream")? 
{ - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream", - params![now], - )?; - } - if table_exists("blob_upstream")? { - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream", - params![now], - )?; - } - if table_exists("blob_downstream")? { - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream", - params![now], - )?; + /// Get all upstream peers for a post, ordered by priority ASC (0 = primary). + pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result<Vec<(NodeId, u8)>> { + let mut stmt = self.conn.prepare( + "SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC" + )?; + let rows = stmt.query_map(params![post_id.as_slice()], |row| { + let bytes: Vec<u8> = row.get(0)?; + let prio: i64 = row.get(1)?; + Ok((bytes, prio as u8)) + })?; + let mut result = Vec::new(); + for row in rows { + let (bytes, prio) = row?; + if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) { + result.push((nid, prio)); + } } + Ok(result) + } + + /// Get the primary (lowest priority) upstream peer for a post. + /// Backward-compatible wrapper for code that only needs a single upstream. + pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result<Option<NodeId>> { + let upstreams = self.get_post_upstreams(post_id)?; + Ok(upstreams.into_iter().next().map(|(nid, _)| nid)) + } + + /// Remove a specific upstream peer for a post. 
+ pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2", + params![post_id.as_slice(), peer_node_id.as_slice()], + )?; Ok(()) } + /// Promote an upstream peer to primary (priority 0), pushing others up. + pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { + // Shift all priorities up by 1 + self.conn.execute( + "UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1", + params![post_id.as_slice()], + )?; + // Set the promoted peer to priority 0 + self.conn.execute( + "UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2", + params![post_id.as_slice(), peer_node_id.as_slice()], + )?; + Ok(()) + } + + /// Count downstream peers for a post. + pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result<u32> { + let count: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1" + )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; + Ok(count as u32) + } + // --- Engagement: reactions --- /// Store a reaction (upsert by reactor+post_id+emoji). 
@@ -5166,6 +5301,60 @@ mod tests { assert_eq!(manifests[0].0, cid); } + #[test] + fn blob_upstream_crud() { + let s = temp_storage(); + let cid = [42u8; 32]; + let source = make_node_id(1); + let addrs = vec!["10.0.0.1:4433".to_string()]; + + s.store_blob_upstream(&cid, &source, &addrs).unwrap(); + let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap(); + assert_eq!(nid, source); + assert_eq!(got_addrs, addrs); + + // Missing + assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none()); + + // Update + let source2 = make_node_id(2); + s.store_blob_upstream(&cid, &source2, &[]).unwrap(); + let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap(); + assert_eq!(nid, source2); + } + + #[test] + fn blob_downstream_crud_and_limit() { + let s = temp_storage(); + let cid = [42u8; 32]; + + // Add downstream peers + for i in 0..100u8 { + let peer = make_node_id(i); + let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap(); + assert!(ok, "should accept peer {}", i); + } + + assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100); + + // 101st should be rejected + let peer_101 = make_node_id(200); + let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap(); + assert!(!ok, "should reject 101st downstream"); + + // Get all downstream + let downstream = s.get_blob_downstream(&cid).unwrap(); + assert_eq!(downstream.len(), 100); + + // Remove one + s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap(); + assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99); + + // Now adding one more should work + let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap(); + assert!(ok, "should accept after removal"); + } + #[test] fn blob_pin_unpin() { let s = temp_storage(); @@ -5249,15 +5438,18 @@ mod tests { let peer = make_node_id(2); s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap(); - s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap(); + 
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap(); + s.add_blob_downstream(&cid, &peer, &["10.0.0.2:4433".to_string()]).unwrap(); assert!(s.get_cdn_manifest(&cid).unwrap().is_some()); - assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1); + assert!(s.get_blob_upstream(&cid).unwrap().is_some()); + assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1); s.cleanup_cdn_for_blob(&cid).unwrap(); assert!(s.get_cdn_manifest(&cid).unwrap().is_none()); - assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0); + assert!(s.get_blob_upstream(&cid).unwrap().is_none()); + assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0); } #[test] @@ -5277,6 +5469,18 @@ mod tests { assert!(cids.contains(&cid2)); } + #[test] + fn remove_blob_upstream() { + let s = temp_storage(); + let cid = [42u8; 32]; + let peer = make_node_id(1); + + s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap(); + assert!(s.get_blob_upstream(&cid).unwrap().is_some()); + + s.remove_blob_upstream(&cid).unwrap(); + assert!(s.get_blob_upstream(&cid).unwrap().is_none()); + } #[test] fn author_post_neighborhood() { @@ -5636,6 +5840,24 @@ mod tests { assert_eq!(got2.preferred_tree.len(), 2); } + #[test] + fn blob_upstream_preferred_tree() { + let s = temp_storage(); + let cid = [42u8; 32]; + let source = make_node_id(1); + s.store_blob_upstream(&cid, &source, &[]).unwrap(); + + // Initially empty + let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap(); + assert!(tree.is_empty()); + + // Update + let nodes = vec![make_node_id(10), make_node_id(11)]; + s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap(); + let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap(); + assert_eq!(tree2.len(), 2); + } + // ---- Circle Profile tests ---- #[test] @@ -5836,39 +6058,32 @@ mod tests { // --- Engagement tests --- #[test] - fn file_holders_lru_cap() { + fn post_downstream_crud() { let s = temp_storage(); - let file = [42u8; 32]; - // Sleep between inserts so 
last_interaction_ms actually differs (ms resolution). - for i in 0..7u8 { - s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap(); - std::thread::sleep(std::time::Duration::from_millis(2)); - } - // Only 5 most-recent survive - assert_eq!(s.get_file_holder_count(&file).unwrap(), 5); - let holders = s.get_file_holders(&file).unwrap(); - assert_eq!(holders.len(), 5); - let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect(); - // Oldest two (i=0, i=1) got evicted; most recent (i=6) survives - assert!(!kept.contains(&make_node_id(0))); - assert!(!kept.contains(&make_node_id(1))); - assert!(kept.contains(&make_node_id(6))); - } + let post_id = make_post_id(1); + let peer1 = make_node_id(1); + let peer2 = make_node_id(2); - #[test] - fn file_holders_direction_promotion() { - let s = temp_storage(); - let file = [42u8; 32]; - let peer = make_node_id(1); - s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap(); - s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap(); - // Re-insert with opposite direction should promote to "both" - let dir: String = s.conn.query_row( - "SELECT direction FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", - rusqlite::params![file.as_slice(), peer.as_slice()], - |row| row.get(0), - ).unwrap(); - assert_eq!(dir, "both"); + // Add downstream peers + assert!(s.add_post_downstream(&post_id, &peer1).unwrap()); + assert!(s.add_post_downstream(&post_id, &peer2).unwrap()); + + let downstream = s.get_post_downstream(&post_id).unwrap(); + assert_eq!(downstream.len(), 2); + assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2); + + // Remove one + s.remove_post_downstream(&post_id, &peer1).unwrap(); + assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1); + + // Capacity limit + let big_post = make_post_id(99); + for i in 0..100u8 { + assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap()); + } + 
assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100); + // 101st should fail + assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap()); } #[test] diff --git a/crates/core/src/web.rs b/crates/core/src/web.rs index c946abb..b9fa902 100644 --- a/crates/core/src/web.rs +++ b/crates/core/src/web.rs @@ -132,8 +132,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc, browse if let Some(author) = author_id { holders.push(author); } - if let Ok(file_holders) = store.get_file_holders(&post_id) { - for (peer, _addrs) in file_holders { + if let Ok(downstream) = store.get_post_downstream(&post_id) { + for peer in downstream { if !holders.contains(&peer) { holders.push(peer); }