Compare commits

..

No commits in common. "5d9ba224279c3452f218bffeb70f75d92ee7e791" and "921a0ec40a06aad7a6a870604182c368ce753f46" have entirely different histories.

6 changed files with 724 additions and 458 deletions

View file

@ -1393,12 +1393,8 @@ impl ConnectionManager {
{ {
let s = storage.get().await; let s = storage.get().await;
for pid in &new_post_ids { for pid in &new_post_ids {
let _ = s.touch_file_holder( let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
pid, let _ = s.add_post_upstream(pid, peer_id, prio);
peer_id,
&[],
crate::storage::HolderDirection::Received,
);
} }
for author in &synced_authors { for author in &synced_authors {
let _ = s.update_follow_last_sync(author, now_ms); let _ = s.update_follow_last_sync(author, now_ms);
@ -1944,12 +1940,8 @@ impl ConnectionManager {
{ {
let storage = self.storage.get().await; let storage = self.storage.get().await;
for pid in &new_post_ids { for pid in &new_post_ids {
let _ = storage.touch_file_holder( let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
pid, let _ = storage.add_post_upstream(pid, from, prio);
from,
&[],
crate::storage::HolderDirection::Received,
);
} }
for author in &synced_authors { for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms); let _ = storage.update_follow_last_sync(author, now_ms);
@ -2042,12 +2034,8 @@ impl ConnectionManager {
{ {
let storage = self.storage.get().await; let storage = self.storage.get().await;
for pid in &new_post_ids { for pid in &new_post_ids {
let _ = storage.touch_file_holder( let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
pid, let _ = storage.add_post_upstream(pid, peer_id, prio);
peer_id,
&[],
crate::storage::HolderDirection::Received,
);
} }
for author in &synced_authors { for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms); let _ = storage.update_follow_last_sync(author, now_ms);
@ -2822,9 +2810,13 @@ impl ConnectionManager {
if store.get_post_with_visibility(post_id).ok().flatten().is_some() { if store.get_post_with_visibility(post_id).ok().flatten().is_some() {
Some(self.our_node_id) Some(self.our_node_id)
} else { } else {
// Any known holder of this post? // CDN tree: do any of our downstream hosts have it?
let holders = store.get_file_holders(post_id).unwrap_or_default(); let downstream = store.get_post_downstream(post_id).unwrap_or_default();
holders.first().map(|(nid, _)| *nid) if !downstream.is_empty() {
Some(downstream[0])
} else {
None
}
} }
}; };
post_holder = found; post_holder = found;
@ -2838,9 +2830,9 @@ impl ConnectionManager {
// Check CDN: do we know who has it via blob post ownership? // Check CDN: do we know who has it via blob post ownership?
let store = self.storage.get().await; let store = self.storage.get().await;
if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) { if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) {
let holders = store.get_file_holders(&pid).unwrap_or_default(); let downstream = store.get_post_downstream(&pid).unwrap_or_default();
if let Some((nid, _)) = holders.first() { if !downstream.is_empty() {
blob_holder = Some(*nid); blob_holder = Some(downstream[0]);
} }
} }
} }
@ -4879,7 +4871,7 @@ impl ConnectionManager {
let cm = conn_mgr.lock().await; let cm = conn_mgr.lock().await;
// Collect blob CIDs + CDN peers before async work // Collect blob CIDs + CDN peers before async work
let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>)> = Vec::new(); let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = Vec::new();
{ {
let storage = cm.storage.get().await; let storage = cm.storage.get().await;
for dr in &payload.records { for dr in &payload.records {
@ -4887,8 +4879,9 @@ impl ConnectionManager {
// Collect blobs for CDN cleanup before deleting // Collect blobs for CDN cleanup before deleting
let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default(); let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default();
for cid in blob_cids { for cid in blob_cids {
let holders = storage.get_file_holders(&cid).unwrap_or_default(); let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
blob_cleanup.push((cid, holders)); let upstream = storage.get_blob_upstream(&cid).ok().flatten();
blob_cleanup.push((cid, downstream, upstream));
} }
let _ = storage.store_delete(dr); let _ = storage.store_delete(dr);
let _ = storage.apply_delete(dr); let _ = storage.apply_delete(dr);
@ -4904,11 +4897,18 @@ impl ConnectionManager {
// Gather connections for CDN delete notices under lock, then send outside // Gather connections for CDN delete notices under lock, then send outside
let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new(); let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new();
for (cid, holders) in &blob_cleanup { for (cid, downstream, upstream) in &blob_cleanup {
let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None }; let upstream_info = upstream.as_ref().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs.clone() });
for (peer, _addrs) in holders { let ds_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: upstream_info };
if let Some(pc) = cm.connections_ref().get(peer) { for (ds_nid, _) in downstream {
delete_notices.push((pc.connection.clone(), payload.clone())); if let Some(pc) = cm.connections_ref().get(ds_nid) {
delete_notices.push((pc.connection.clone(), ds_payload.clone()));
}
}
if let Some((up_nid, _)) = upstream {
let up_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None };
if let Some(pc) = cm.connections_ref().get(up_nid) {
delete_notices.push((pc.connection.clone(), up_payload));
} }
} }
} }
@ -4958,38 +4958,24 @@ impl ConnectionManager {
} }
MessageType::PostPush => { MessageType::PostPush => {
let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?; let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?;
// Encrypted posts are no longer accepted via direct push — they propagate let cm = conn_mgr.lock().await;
// via the CDN to eliminate the sender→recipient traffic signal. let storage = cm.storage.get().await;
if !matches!(push.post.visibility, crate::types::PostVisibility::Public) { if !storage.is_deleted(&push.post.id)?
debug!( && storage.get_post(&push.post.id)?.is_none()
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
{
let _ = storage.store_post_with_visibility(
&push.post.id,
&push.post.post,
&push.post.visibility,
);
let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio);
info!(
peer = hex::encode(remote_node_id), peer = hex::encode(remote_node_id),
post_id = hex::encode(push.post.id), post_id = hex::encode(push.post.id),
"Ignoring non-public PostPush" "Received direct post push"
); );
} else {
let cm = conn_mgr.lock().await;
let storage = cm.storage.get().await;
if !storage.is_deleted(&push.post.id)?
&& storage.get_post(&push.post.id)?.is_none()
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
{
let _ = storage.store_post_with_visibility(
&push.post.id,
&push.post.post,
&push.post.visibility,
);
let _ = storage.touch_file_holder(
&push.post.id,
&remote_node_id,
&[],
crate::storage::HolderDirection::Received,
);
info!(
peer = hex::encode(remote_node_id),
post_id = hex::encode(push.post.id),
"Received direct post push"
);
}
} }
} }
MessageType::AudienceRequest => { MessageType::AudienceRequest => {
@ -5077,24 +5063,17 @@ impl ConnectionManager {
&entry.manifest.author_manifest.author, &entry.manifest.author_manifest.author,
entry.manifest.author_manifest.updated_at, entry.manifest.author_manifest.updated_at,
); );
// Remote peer pushed us this manifest → they hold the file.
let _ = storage.touch_file_holder(
&entry.cid,
&remote_node_id,
&[],
crate::storage::HolderDirection::Received,
);
stored_entries.push(entry.clone()); stored_entries.push(entry.clone());
} }
// Gather file holders for relay before dropping locks // Gather downstream peers for relay before dropping locks
let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new(); let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new();
for entry in &stored_entries { for entry in &stored_entries {
let holders = storage.get_file_holders(&entry.cid).unwrap_or_default(); let downstream = storage.get_blob_downstream(&entry.cid).unwrap_or_default();
for (peer, _addrs) in holders { for (ds_nid, _) in downstream {
if peer == remote_node_id { if ds_nid == remote_node_id {
continue; continue;
} }
relay_targets.push((peer, crate::protocol::ManifestPushPayload { relay_targets.push((ds_nid, crate::protocol::ManifestPushPayload {
manifests: vec![entry.clone()], manifests: vec![entry.clone()],
})); }));
} }
@ -5197,12 +5176,8 @@ impl ConnectionManager {
let cm = cm_arc.lock().await; let cm = cm_arc.lock().await;
let storage = cm.storage.get().await; let storage = cm.storage.get().await;
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
let _ = storage.touch_file_holder( let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
&sync_post.id, let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
&sender_id,
&[],
crate::storage::HolderDirection::Received,
);
let now = std::time::SystemTime::now() let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
@ -5293,14 +5268,32 @@ impl ConnectionManager {
let storage = cm.storage.get().await; let storage = cm.storage.get().await;
let cid = payload.cid; let cid = payload.cid;
// Flat-holder model: drop the sender as a holder of this file. // Check if sender was our upstream for this blob
// The author's DeleteRecord (separate signed message) is what let was_upstream = storage.get_blob_upstream(&cid).ok().flatten()
// triggers the actual blob removal for followers. .map(|(nid, _)| nid == remote_node_id)
let _ = storage.remove_file_holder(&cid, &remote_node_id); .unwrap_or(false);
if was_upstream {
// Sender was our upstream — clear it
let _ = storage.remove_blob_upstream(&cid);
// If they provided their upstream, store it as our new upstream
if let Some(ref new_up) = payload.upstream_node {
if let Ok(nid_bytes) = hex::decode(&new_up.n) {
if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
let _ = storage.store_blob_upstream(&cid, &nid, &new_up.a);
}
}
}
} else {
// Sender was our downstream — remove them
let _ = storage.remove_blob_downstream(&cid, &remote_node_id);
}
info!( info!(
peer = hex::encode(remote_node_id), peer = hex::encode(remote_node_id),
cid = hex::encode(cid), cid = hex::encode(cid),
was_upstream,
"Received blob delete notice" "Received blob delete notice"
); );
} }
@ -5444,12 +5437,7 @@ impl ConnectionManager {
let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?; let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?;
let cm = conn_mgr.lock().await; let cm = conn_mgr.lock().await;
let storage = cm.storage.get().await; let storage = cm.storage.get().await;
let _ = storage.touch_file_holder( let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id);
&payload.post_id,
&remote_node_id,
&[],
crate::storage::HolderDirection::Sent,
);
drop(storage); drop(storage);
trace!( trace!(
peer = hex::encode(remote_node_id), peer = hex::encode(remote_node_id),
@ -5704,28 +5692,15 @@ impl ConnectionManager {
let storage = storage.get().await; let storage = storage.get().await;
let manifest: Option<crate::types::CdnManifest> = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| { let manifest: Option<crate::types::CdnManifest> = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| {
if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) { if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
let ds_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); let ds_count = storage.get_blob_downstream_count(&payload.cid).unwrap_or(0);
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
} else { serde_json::from_str(&json).ok() } } else { serde_json::from_str(&json).ok() }
}); });
let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() { let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() {
let prior_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0); let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false);
let _ = storage.touch_file_holder( if ok { (true, vec![]) } else {
&payload.cid, let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default();
&remote_node_id, let redirects: Vec<PeerWithAddress> = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect();
&payload.requester_addresses,
crate::storage::HolderDirection::Sent,
);
// If we already had 5 holders before adding this one, the
// requester should consult them too for CDN lookups.
if prior_count < 5 {
(true, vec![])
} else {
let holders = storage.get_file_holders(&payload.cid).unwrap_or_default();
let redirects: Vec<PeerWithAddress> = holders.into_iter()
.filter(|(nid, _)| *nid != remote_node_id)
.map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs })
.collect();
(false, redirects) (false, redirects)
} }
} else { (false, vec![]) }; } else { (false, vec![]) };
@ -5752,7 +5727,7 @@ impl ConnectionManager {
Some(json) => { Some(json) => {
let manifest = if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) { let manifest = if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
if am.updated_at > payload.current_updated_at { if am.updated_at > payload.current_updated_at {
let ds_count = store.get_file_holder_count(&payload.cid).unwrap_or(0); let ds_count = store.get_blob_downstream_count(&payload.cid).unwrap_or(0);
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count }) Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
} else { None } } else { None }
} else { None }; } else { None };
@ -6097,14 +6072,9 @@ impl ConnectionManager {
to_pull.push(*pid); to_pull.push(*pid);
} }
// Register as holder for all accepted posts // Register as downstream for all accepted posts
for pid in &acc { for pid in &acc {
let _ = storage.touch_file_holder( let _ = storage.add_post_downstream(pid, &remote_node_id);
pid,
&remote_node_id,
&[],
crate::storage::HolderDirection::Sent,
);
} }
(acc, rej, to_pull) (acc, rej, to_pull)
@ -6155,12 +6125,8 @@ impl ConnectionManager {
let cm = cm_arc.lock().await; let cm = cm_arc.lock().await;
let storage = cm.storage.get().await; let storage = cm.storage.get().await;
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
let _ = storage.touch_file_holder( let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
&sp.id, let _ = storage.add_post_upstream(&sp.id, &sender, prio);
&sender,
&[],
crate::storage::HolderDirection::Received,
);
let blob_store = cm.blob_store.clone(); let blob_store = cm.blob_store.clone();
drop(storage); drop(storage);
drop(cm); drop(cm);
@ -6187,12 +6153,7 @@ impl ConnectionManager {
let cm = cm_arc.lock().await; let cm = cm_arc.lock().await;
let storage = cm.storage.get().await; let storage = cm.storage.get().await;
let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes); let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes);
let _ = storage.touch_file_holder( let _ = storage.add_post_upstream(&att.cid, &sender, 0);
&att.cid,
&sender,
&[],
crate::storage::HolderDirection::Received,
);
} }
Ok(()) Ok(())
}.await; }.await;
@ -6217,14 +6178,12 @@ impl ConnectionManager {
Ok(()) Ok(())
} }
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate /// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream.
/// to the post's file_holders (flat set, up to 5 most recent).
async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) { async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) {
use crate::types::BlobHeaderDiffOp; use crate::types::BlobHeaderDiffOp;
// Gather policy + audience data + holders, then drop lock immediately. // Gather policy + audience data, then drop lock immediately
// Remote peer clearly holds this post — record them as a holder. let (policy, approved_audience, downstream, upstreams) = {
let (policy, approved_audience, holders) = {
let storage = self.storage.get().await; let storage = self.storage.get().await;
let policy = storage.get_comment_policy(&payload.post_id) let policy = storage.get_comment_policy(&payload.post_id)
.ok() .ok()
@ -6234,18 +6193,13 @@ impl ConnectionManager {
crate::types::AudienceDirection::Inbound, crate::types::AudienceDirection::Inbound,
Some(crate::types::AudienceStatus::Approved), Some(crate::types::AudienceStatus::Approved),
).unwrap_or_default(); ).unwrap_or_default();
let _ = storage.touch_file_holder( let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default();
&payload.post_id, let upstreams: Vec<NodeId> = storage.get_post_upstreams(&payload.post_id)
&sender,
&[],
crate::storage::HolderDirection::Received,
);
let holders: Vec<NodeId> = storage.get_file_holders(&payload.post_id)
.unwrap_or_default() .unwrap_or_default()
.into_iter() .into_iter()
.map(|(nid, _addrs)| nid) .map(|(nid, _)| nid)
.collect(); .collect();
(policy, approved, holders) (policy, approved, downstream, upstreams)
}; };
// Filter ops using gathered data (no lock held) // Filter ops using gathered data (no lock held)
@ -6427,16 +6381,26 @@ impl ConnectionManager {
let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms); let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms);
} }
// Re-propagate to all file holders (flat set, max 5). Exclude sender. // Collect all targets (downstream + all upstreams), then send in a single batched task
let mut targets: Vec<iroh::endpoint::Connection> = Vec::new(); let mut targets: Vec<iroh::endpoint::Connection> = Vec::new();
for peer_id in &holders { for peer_id in downstream {
if *peer_id == sender { continue; } if peer_id == sender { continue; }
if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone()) if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone())) .or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone()))
{ {
targets.push(conn); targets.push(conn);
} }
} }
// Phase 6: Try all upstreams, not just one
for up in &upstreams {
if *up != sender {
if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone()))
{
targets.push(conn);
}
}
}
if !targets.is_empty() { if !targets.is_empty() {
let payload_clone = payload.clone(); let payload_clone = payload.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -7720,8 +7684,8 @@ impl ConnectionActor {
if s.get_post_with_visibility(post_id).ok().flatten().is_some() { if s.get_post_with_visibility(post_id).ok().flatten().is_some() {
post_holder = Some(ctx.our_node_id); post_holder = Some(ctx.our_node_id);
} else { } else {
let holders = s.get_file_holders(post_id).unwrap_or_default(); let downstream = s.get_post_downstream(post_id).unwrap_or_default();
if let Some((nid, _)) = holders.first() { post_holder = Some(*nid); } if !downstream.is_empty() { post_holder = Some(downstream[0]); }
} }
} }
@ -7731,8 +7695,8 @@ impl ConnectionActor {
} else { } else {
let s = ctx.storage.get().await; let s = ctx.storage.get().await;
if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) { if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) {
let holders = s.get_file_holders(&pid).unwrap_or_default(); let downstream = s.get_post_downstream(&pid).unwrap_or_default();
if let Some((nid, _)) = holders.first() { blob_holder = Some(*nid); } if !downstream.is_empty() { blob_holder = Some(downstream[0]); }
} }
} }
} }

View file

@ -378,11 +378,7 @@ async fn try_redirect(
Ok(Some((_, PostVisibility::Public))) => {} Ok(Some((_, PostVisibility::Public))) => {}
_ => return false, // not found or not public — hard close _ => return false, // not found or not public — hard close
} }
store.get_file_holders(post_id) store.get_post_downstream(post_id).unwrap_or_default()
.unwrap_or_default()
.into_iter()
.map(|(nid, _addrs)| nid)
.collect::<Vec<_>>()
}; };
// Get addresses for downstream peers // Get addresses for downstream peers

View file

@ -902,6 +902,50 @@ impl Network {
self.send_to_audience(MessageType::PostNotification, &payload).await self.send_to_audience(MessageType::PostNotification, &payload).await
} }
/// Push a full post directly to recipients (persistent if available, ephemeral otherwise).
pub async fn push_post_to_recipients(
&self,
post_id: &crate::types::PostId,
post: &Post,
visibility: &PostVisibility,
) -> usize {
let recipients: Vec<NodeId> = match visibility {
PostVisibility::Public => return 0,
PostVisibility::Encrypted { recipients } => {
recipients.iter().map(|wk| wk.recipient).collect()
}
PostVisibility::GroupEncrypted { group_id, .. } => {
// Push to all group members
match self.storage.get().await.get_all_group_members() {
Ok(map) => map.get(group_id).cloned().unwrap_or_default().into_iter().collect(),
Err(_) => return 0,
}
}
};
let payload = PostPushPayload {
post: SyncPost {
id: *post_id,
post: post.clone(),
visibility: visibility.clone(),
},
};
let mut pushed = 0;
for recipient in &recipients {
if self.send_to_peer_uni(recipient, MessageType::PostPush, &payload).await.is_ok() {
pushed += 1;
debug!(
recipient = hex::encode(recipient),
post_id = hex::encode(post_id),
"Pushed post to recipient"
);
}
}
pushed
}
/// Push a profile update to all audience members (ephemeral-capable). /// Push a profile update to all audience members (ephemeral-capable).
pub async fn push_profile(&self, profile: &PublicProfile) -> usize { pub async fn push_profile(&self, profile: &PublicProfile) -> usize {
// Sanitize: if public_visible=false, strip display_name/bio from pushed profile // Sanitize: if public_visible=false, strip display_name/bio from pushed profile
@ -1015,16 +1059,15 @@ impl Network {
sent sent
} }
/// Push an updated manifest to all known holders of the file (flat set, /// Push updated manifests to all downstream peers for a given CID.
/// up to 5 most-recent). Replaces the legacy downstream-tree push.
pub async fn push_manifest_to_downstream( pub async fn push_manifest_to_downstream(
&self, &self,
cid: &[u8; 32], cid: &[u8; 32],
manifest: &crate::types::CdnManifest, manifest: &crate::types::CdnManifest,
) -> usize { ) -> usize {
let holders = { let downstream = {
let storage = self.storage.get().await; let storage = self.storage.get().await;
storage.get_file_holders(cid).unwrap_or_default() storage.get_blob_downstream(cid).unwrap_or_default()
}; };
let payload = crate::protocol::ManifestPushPayload { let payload = crate::protocol::ManifestPushPayload {
manifests: vec![crate::protocol::ManifestPushEntry { manifests: vec![crate::protocol::ManifestPushEntry {
@ -1033,40 +1076,54 @@ impl Network {
}], }],
}; };
let mut sent = 0; let mut sent = 0;
for (peer, peer_addrs) in &holders { for (ds_nid, _) in &downstream {
if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() { if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() {
sent += 1; sent += 1;
let storage = self.storage.get().await;
let _ = storage.touch_file_holder(
cid,
peer,
peer_addrs,
crate::storage::HolderDirection::Sent,
);
} }
} }
sent sent
} }
/// Send blob delete notices to all known holders of a file. /// Send blob delete notices to downstream and upstream peers.
/// Second argument kept as Option for signature stability; flat-holder /// Downstream peers receive our upstream info for tree healing.
/// model doesn't need separate upstream handling. /// Upstream peers receive no upstream info (just "remove me as downstream").
pub async fn send_blob_delete_notices( pub async fn send_blob_delete_notices(
&self, &self,
cid: &[u8; 32], cid: &[u8; 32],
holders: &[(NodeId, Vec<String>)], downstream: &[(NodeId, Vec<String>)],
_legacy_upstream: Option<&(NodeId, Vec<String>)>, upstream: Option<&(NodeId, Vec<String>)>,
) -> usize { ) -> usize {
let payload = crate::protocol::BlobDeleteNoticePayload { let upstream_info = upstream.map(|(nid, addrs)| {
cid: *cid, crate::types::PeerWithAddress {
upstream_node: None, n: hex::encode(nid),
}; a: addrs.clone(),
}
});
let mut sent = 0; let mut sent = 0;
for (peer, _addrs) in holders {
if self.send_to_peer_uni(peer, MessageType::BlobDeleteNotice, &payload).await.is_ok() { // Notify downstream (with upstream info for tree healing)
let ds_payload = crate::protocol::BlobDeleteNoticePayload {
cid: *cid,
upstream_node: upstream_info,
};
for (ds_nid, _) in downstream {
if self.send_to_peer_uni(ds_nid, MessageType::BlobDeleteNotice, &ds_payload).await.is_ok() {
sent += 1; sent += 1;
} }
} }
// Notify upstream (no upstream info)
if let Some((up_nid, _)) = upstream {
let up_payload = crate::protocol::BlobDeleteNoticePayload {
cid: *cid,
upstream_node: None,
};
if self.send_to_peer_uni(up_nid, MessageType::BlobDeleteNotice, &up_payload).await.is_ok() {
sent += 1;
}
}
sent sent
} }
@ -2299,24 +2356,24 @@ impl Network {
self.endpoint.close().await; self.endpoint.close().await;
} }
/// Propagate an engagement diff to all known holders of a post (flat set, /// Propagate an engagement diff to all downstream holders of a post (CDN tree).
/// up to 5 most-recent). Excludes the sender to avoid loops. /// Excludes the sender to avoid loops.
pub async fn propagate_engagement_diff( pub async fn propagate_engagement_diff(
&self, &self,
post_id: &crate::types::PostId, post_id: &crate::types::PostId,
payload: &crate::protocol::BlobHeaderDiffPayload, payload: &crate::protocol::BlobHeaderDiffPayload,
exclude_peer: &crate::types::NodeId, exclude_peer: &crate::types::NodeId,
) -> usize { ) -> usize {
let holders = { let downstream = {
let storage = self.storage.get().await; let storage = self.storage.get().await;
storage.get_file_holders(post_id).unwrap_or_default() storage.get_post_downstream(post_id).unwrap_or_default()
}; };
let mut sent = 0; let mut sent = 0;
for (peer, _addrs) in &holders { for ds_nid in &downstream {
if peer == exclude_peer { if ds_nid == exclude_peer {
continue; continue;
} }
if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() { if self.send_to_peer_uni(ds_nid, MessageType::BlobHeaderDiff, payload).await.is_ok() {
sent += 1; sent += 1;
} }
} }

View file

@ -836,12 +836,13 @@ impl Node {
} }
} }
// For public posts, push to audience members. Encrypted posts propagate // For encrypted posts, push directly to recipients
// via the CDN (ManifestPush + header-diff) to eliminate the sender→recipient let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await;
// traffic signal.
// For public posts, push to audience members
let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await; let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await;
info!(post_id = hex::encode(post_id), audience_pushed, "Created new post"); info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post");
Ok((post_id, post, visibility)) Ok((post_id, post, visibility))
} }
@ -1350,12 +1351,7 @@ impl Node {
let source_addrs: Vec<String> = response.manifest.as_ref() let source_addrs: Vec<String> = response.manifest.as_ref()
.map(|m| m.host_addresses.clone()) .map(|m| m.host_addresses.clone())
.unwrap_or_default(); .unwrap_or_default();
let _ = storage.touch_file_holder( let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs);
cid,
from_peer,
&source_addrs,
crate::storage::HolderDirection::Received,
);
} }
Ok(data) Ok(data)
} }
@ -1383,17 +1379,16 @@ impl Node {
// Collect redirect peers from responses in case we need them later // Collect redirect peers from responses in case we need them later
let mut redirect_peers: Vec<crate::types::PeerWithAddress> = Vec::new(); let mut redirect_peers: Vec<crate::types::PeerWithAddress> = Vec::new();
// 2. Try known holders (up to 5 most-recent peers we've interacted // 2. Try existing upstream (if we previously fetched this blob)
// with about this file). let upstream = {
let known_holders = {
let storage = self.storage.get().await; let storage = self.storage.get().await;
storage.get_file_holders(cid).unwrap_or_default() storage.get_blob_upstream(cid)?
}; };
for (holder_nid, _addrs) in &known_holders { if let Some((upstream_nid, _upstream_addrs)) = upstream {
match self.fetch_blob_from_peer(cid, holder_nid, post_id, author, mime_type, created_at).await { match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await {
Ok(Some(data)) => return Ok(Some(data)), Ok(Some(data)) => return Ok(Some(data)),
Ok(None) => {} Ok(None) => {}
Err(e) => warn!(error = %e, "blob fetch from known holder failed"), Err(e) => warn!(error = %e, "blob fetch from upstream failed"),
} }
} }
@ -1418,12 +1413,7 @@ impl Node {
let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at); let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at);
} }
} }
let _ = storage.touch_file_holder( let _ = storage.store_blob_upstream(cid, &lateral, &[]);
cid,
&lateral,
&[],
crate::storage::HolderDirection::Received,
);
return Ok(Some(data)); return Ok(Some(data));
} }
Ok((None, response)) => { Ok((None, response)) => {
@ -1991,13 +1981,14 @@ impl Node {
signature, signature,
}; };
// Collect blob CIDs + known holders before cleanup (for delete notices) // Collect blob CIDs + CDN peers before cleanup
let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = { let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = {
let storage = self.storage.get().await; let storage = self.storage.get().await;
let cids = storage.get_blobs_for_post(post_id).unwrap_or_default(); let cids = storage.get_blobs_for_post(post_id).unwrap_or_default();
cids.into_iter().map(|cid| { cids.into_iter().map(|cid| {
let holders = storage.get_file_holders(&cid).unwrap_or_default(); let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
(cid, holders, None::<(NodeId, Vec<String>)>) let upstream = storage.get_blob_upstream(&cid).ok().flatten();
(cid, downstream, upstream)
}).collect() }).collect()
}; };
@ -2117,10 +2108,12 @@ impl Node {
storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?; storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?;
} }
// delete_post already pushes the DeleteRecord. // delete_post already pushes the DeleteRecord
// Replacement post propagates via the CDN to remaining recipients.
self.delete_post(post_id).await?; self.delete_post(post_id).await?;
// Push replacement post directly to remaining recipients
self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await;
info!( info!(
old_id = hex::encode(post_id), old_id = hex::encode(post_id),
new_id = hex::encode(new_post_id), new_id = hex::encode(new_post_id),
@ -3093,27 +3086,20 @@ impl Node {
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
.as_millis() as u64 - max_age_ms; .as_millis() as u64 - max_age_ms;
let stale_cids = { let stale = {
let s = storage.get().await; let s = storage.get().await;
s.get_stale_manifest_cids(cutoff).unwrap_or_default() s.get_stale_manifests(cutoff).unwrap_or_default()
}; };
for cid in &stale_cids { for (cid, upstream_nid, _upstream_addrs) in &stale {
// Get current updated_at + pick a holder to refresh from // Get current updated_at for this manifest
let (current_updated_at, refresh_source) = { let current_updated_at = {
let s = storage.get().await; let s = storage.get().await;
let updated_at = s.get_cdn_manifest(cid).ok().flatten() s.get_cdn_manifest(cid).ok().flatten()
.and_then(|json| serde_json::from_str::<crate::types::AuthorManifest>(&json).ok()) .and_then(|json| serde_json::from_str::<crate::types::AuthorManifest>(&json).ok())
.map(|m| m.updated_at) .map(|m| m.updated_at)
.unwrap_or(0); .unwrap_or(0)
let source = s.get_file_holders(cid)
.unwrap_or_default()
.into_iter()
.next()
.map(|(nid, _)| nid);
(updated_at, source)
}; };
let Some(upstream_nid) = refresh_source else { continue; }; match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await {
match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await {
Ok(Some(cdn_manifest)) => { Ok(Some(cdn_manifest)) => {
if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) { if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default(); let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
@ -3124,10 +3110,10 @@ impl Node {
&cdn_manifest.author_manifest.author, &cdn_manifest.author_manifest.author,
cdn_manifest.author_manifest.updated_at, cdn_manifest.author_manifest.updated_at,
); );
// Relay to known holders (flat set) // Relay to our downstream
let holders = s.get_file_holders(cid).unwrap_or_default(); let downstream = s.get_blob_downstream(cid).unwrap_or_default();
drop(s); drop(s);
if !holders.is_empty() { if !downstream.is_empty() {
network.push_manifest_to_downstream(cid, &cdn_manifest).await; network.push_manifest_to_downstream(cid, &cdn_manifest).await;
} }
tracing::debug!( tracing::debug!(
@ -3140,7 +3126,7 @@ impl Node {
Err(e) => { Err(e) => {
tracing::debug!( tracing::debug!(
cid = hex::encode(cid), cid = hex::encode(cid),
upstream = hex::encode(&upstream_nid), upstream = hex::encode(upstream_nid),
error = %e, error = %e,
"Manifest refresh from upstream failed" "Manifest refresh from upstream failed"
); );
@ -3291,16 +3277,18 @@ impl Node {
compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms) compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms)
} }
/// Delete a blob with CDN notifications to known holders. /// Delete a blob with CDN notifications to upstream/downstream.
pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> { pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
// Gather known holders before cleanup // Gather CDN peers before cleanup
let holders = { let (downstream, upstream) = {
let storage = self.storage.get().await; let storage = self.storage.get().await;
storage.get_file_holders(cid).unwrap_or_default() let ds = storage.get_blob_downstream(cid).unwrap_or_default();
let up = storage.get_blob_upstream(cid).ok().flatten();
(ds, up)
}; };
// Send CDN delete notices to all holders // Send CDN delete notices
self.network.send_blob_delete_notices(cid, &holders, None).await; self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await;
// Clean up local storage // Clean up local storage
{ {
@ -3588,9 +3576,15 @@ impl Node {
ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())], ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())],
timestamp_ms: now, timestamp_ms: now,
}; };
// propagate_engagement_diff targets all file_holders (flat set, max 5)
// which already subsumes what used to be upstream + downstream.
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
} }
Ok(reaction) Ok(reaction)
@ -3697,6 +3691,14 @@ impl Node {
timestamp_ms: now, timestamp_ms: now,
}; };
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
} }
Ok(comment) Ok(comment)
@ -3733,6 +3735,14 @@ impl Node {
timestamp_ms: now, timestamp_ms: now,
}; };
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
} }
Ok(()) Ok(())
} }
@ -3766,6 +3776,14 @@ impl Node {
timestamp_ms: now, timestamp_ms: now,
}; };
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
} }
Ok(()) Ok(())
} }
@ -3987,6 +4005,14 @@ impl Node {
timestamp_ms: now, timestamp_ms: now,
}; };
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
Ok(()) Ok(())
} }
@ -4101,6 +4127,14 @@ impl Node {
timestamp_ms: now, timestamp_ms: now,
}; };
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
Ok(()) Ok(())
} }
@ -4333,10 +4367,10 @@ impl Node {
} }
}; };
// Filter to under-replicated (< 2 holders) // Filter to under-replicated (< 2 downstream)
let mut needs_replication = Vec::new(); let mut needs_replication = Vec::new();
for pid in &recent_ids { for pid in &recent_ids {
match storage.get_file_holder_count(pid) { match storage.get_post_downstream_count(pid) {
Ok(count) if count < 2 => { Ok(count) if count < 2 => {
needs_replication.push(*pid); needs_replication.push(*pid);
} }

View file

@ -12,26 +12,6 @@ use crate::types::{
VisibilityIntent, VisibilityIntent,
}; };
/// Direction for file_holders entries: whether we sent the file to this peer,
/// received it from them, or both. Not load-bearing for propagation decisions —
/// any holder can serve as a diff target — but retained for potential reuse.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HolderDirection {
Sent,
Received,
Both,
}
impl HolderDirection {
pub fn as_str(&self) -> &'static str {
match self {
HolderDirection::Sent => "sent",
HolderDirection::Received => "received",
HolderDirection::Both => "both",
}
}
}
/// Blob metadata for eviction scoring. /// Blob metadata for eviction scoring.
pub struct EvictionCandidate { pub struct EvictionCandidate {
pub cid: [u8; 32], pub cid: [u8; 32],
@ -282,6 +262,20 @@ impl Storage {
updated_at INTEGER NOT NULL updated_at INTEGER NOT NULL
); );
CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author); CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author);
CREATE TABLE IF NOT EXISTS blob_upstream (
cid BLOB PRIMARY KEY,
source_node_id BLOB NOT NULL,
source_addresses TEXT NOT NULL DEFAULT '[]',
stored_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS blob_downstream (
cid BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
peer_addresses TEXT NOT NULL DEFAULT '[]',
registered_at INTEGER NOT NULL,
PRIMARY KEY (cid, peer_node_id)
);
CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid);
CREATE TABLE IF NOT EXISTS group_keys ( CREATE TABLE IF NOT EXISTS group_keys (
group_id BLOB PRIMARY KEY, group_id BLOB PRIMARY KEY,
circle_name TEXT NOT NULL, circle_name TEXT NOT NULL,
@ -332,6 +326,17 @@ impl Storage {
last_seen_ms INTEGER NOT NULL, last_seen_ms INTEGER NOT NULL,
success_count INTEGER NOT NULL DEFAULT 0 success_count INTEGER NOT NULL DEFAULT 0
); );
CREATE TABLE IF NOT EXISTS post_downstream (
post_id BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
registered_at INTEGER NOT NULL,
PRIMARY KEY (post_id, peer_node_id)
);
CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id);
CREATE TABLE IF NOT EXISTS post_upstream (
post_id BLOB PRIMARY KEY,
peer_node_id BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS blob_headers ( CREATE TABLE IF NOT EXISTS blob_headers (
post_id BLOB PRIMARY KEY, post_id BLOB PRIMARY KEY,
author BLOB NOT NULL, author BLOB NOT NULL,
@ -384,17 +389,7 @@ impl Storage {
CREATE TABLE IF NOT EXISTS seen_messages ( CREATE TABLE IF NOT EXISTS seen_messages (
partner_id BLOB PRIMARY KEY, partner_id BLOB PRIMARY KEY,
last_read_ms INTEGER NOT NULL DEFAULT 0 last_read_ms INTEGER NOT NULL DEFAULT 0
); );",
CREATE TABLE IF NOT EXISTS file_holders (
file_id BLOB NOT NULL,
peer_id BLOB NOT NULL,
peer_addresses TEXT NOT NULL DEFAULT '[]',
last_interaction_ms INTEGER NOT NULL,
direction TEXT NOT NULL,
PRIMARY KEY (file_id, peer_id)
);
CREATE INDEX IF NOT EXISTS idx_file_holders_recency
ON file_holders(file_id, last_interaction_ms DESC);",
)?; )?;
Ok(()) Ok(())
} }
@ -548,6 +543,16 @@ impl Storage {
)?; )?;
} }
// Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration)
let has_blob_pref_tree = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_blob_pref_tree == 0 {
self.conn.execute_batch(
"ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';"
)?;
}
// Add public_visible column to profiles if missing (Phase D-4 migration) // Add public_visible column to profiles if missing (Phase D-4 migration)
let has_public_visible = self.conn.prepare( let has_public_visible = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'" "SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'"
@ -661,18 +666,25 @@ impl Storage {
)?; )?;
} }
// 0.6.1-beta: seed file_holders from legacy upstream/downstream tables // Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max)
// before they're dropped. Idempotent — only fires on an empty let has_priority = self.conn.prepare(
// file_holders table. "SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'"
self.seed_file_holders_from_legacy()?; )?.query_row([], |row| row.get::<_, i64>(0))?;
if has_priority == 0 {
// 0.6.1-beta: drop legacy directional tables — replaced by file_holders. self.conn.execute_batch(
self.conn.execute_batch( "ALTER TABLE post_upstream RENAME TO post_upstream_old;
"DROP TABLE IF EXISTS blob_upstream; CREATE TABLE post_upstream (
DROP TABLE IF EXISTS blob_downstream; post_id BLOB NOT NULL,
DROP TABLE IF EXISTS post_upstream; peer_node_id BLOB NOT NULL,
DROP TABLE IF EXISTS post_downstream;", priority INTEGER NOT NULL DEFAULT 0,
)?; registered_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (post_id, peer_node_id)
);
INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at)
SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old;
DROP TABLE post_upstream_old;"
)?;
}
Ok(()) Ok(())
} }
@ -2384,7 +2396,8 @@ impl Storage {
params![record.post_id.as_slice(), record.author.as_slice()], params![record.post_id.as_slice(), record.author.as_slice()],
)?; )?;
if deleted > 0 { if deleted > 0 {
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?; self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?; self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?;
} }
Ok(deleted > 0) Ok(deleted > 0)
@ -3383,6 +3396,28 @@ impl Storage {
Ok(()) Ok(())
} }
/// Update the preferred_tree JSON for a blob upstream entry.
pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> {
let json = serde_json::to_string(
&tree.iter().map(hex::encode).collect::<Vec<_>>()
)?;
self.conn.execute(
"UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2",
params![json, cid.as_slice()],
)?;
Ok(())
}
/// Get the preferred_tree for a blob upstream entry.
pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<NodeId>> {
let json: String = self.conn.query_row(
"SELECT preferred_tree FROM blob_upstream WHERE cid = ?1",
params![cid.as_slice()],
|row| row.get(0),
).unwrap_or_else(|_| "[]".to_string());
Ok(parse_anchors_json(&json))
}
// ---- Social Routes ---- // ---- Social Routes ----
/// Insert or update a social route entry. /// Insert or update a social route entry.
@ -3854,10 +3889,10 @@ impl Storage {
GROUP BY post_id GROUP BY post_id
) r ON b.post_id = r.post_id ) r ON b.post_id = r.post_id
LEFT JOIN ( LEFT JOIN (
SELECT file_id, COUNT(*) as ds_count SELECT cid, COUNT(*) as ds_count
FROM file_holders FROM blob_downstream
GROUP BY file_id GROUP BY cid
) d ON b.cid = d.file_id" ) d ON b.cid = d.cid"
)?; )?;
let rows = stmt.query_map(params![cutoff], |row| { let rows = stmt.query_map(params![cutoff], |row| {
let cid_bytes: Vec<u8> = row.get(0)?; let cid_bytes: Vec<u8> = row.get(0)?;
@ -3911,10 +3946,11 @@ impl Storage {
Ok(count as u64) Ok(count as u64)
} }
/// Clean up all CDN metadata for a blob (manifests + file_holders). /// Clean up all CDN metadata for a blob (manifests + upstream + downstream).
pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> { pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?; self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?;
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?; self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?;
Ok(()) Ok(())
} }
@ -3933,6 +3969,12 @@ impl Storage {
Ok(cids) Ok(cids)
} }
/// Remove upstream tracking for a blob CID.
pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
Ok(())
}
pub fn post_count(&self) -> anyhow::Result<usize> { pub fn post_count(&self) -> anyhow::Result<usize> {
let count: i64 = self let count: i64 = self
.conn .conn
@ -3995,24 +4037,137 @@ impl Storage {
Ok(result) Ok(result)
} }
/// Get CIDs of manifests older than a cutoff. Callers look up holders /// Record the upstream source for a blob CID.
/// via file_holders to pick a refresh source. pub fn store_blob_upstream(
pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result<Vec<[u8; 32]>> { &self,
cid: &[u8; 32],
source_node_id: &NodeId,
source_addresses: &[String],
) -> anyhow::Result<()> {
let addrs_json = serde_json::to_string(source_addresses)?;
self.conn.execute(
"INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4",
params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()],
)?;
Ok(())
}
/// Get the upstream source for a blob CID: (node_id, addresses).
pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<Option<(NodeId, Vec<String>)>> {
let result = self.conn.query_row(
"SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1",
params![cid.as_slice()],
|row| {
let nid_bytes: Vec<u8> = row.get(0)?;
let addrs_json: String = row.get(1)?;
Ok((nid_bytes, addrs_json))
},
);
match result {
Ok((nid_bytes, addrs_json)) => {
let nid = blob_to_nodeid(nid_bytes)?;
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
Ok(Some((nid, addrs)))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Register a downstream peer for a blob CID. Returns false if already at 100 downstream.
pub fn add_blob_downstream(
&self,
cid: &[u8; 32],
peer_node_id: &NodeId,
peer_addresses: &[String],
) -> anyhow::Result<bool> {
let count = self.get_blob_downstream_count(cid)?;
if count >= 100 {
return Ok(false);
}
let addrs_json = serde_json::to_string(peer_addresses)?;
self.conn.execute(
"INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4",
params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()],
)?;
Ok(true)
}
/// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>.
pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
let mut stmt = self.conn.prepare( let mut stmt = self.conn.prepare(
"SELECT cid FROM cdn_manifests WHERE updated_at < ?1", "SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1"
)?;
let rows = stmt.query_map(params![cid.as_slice()], |row| {
let nid_bytes: Vec<u8> = row.get(0)?;
let addrs_json: String = row.get(1)?;
Ok((nid_bytes, addrs_json))
})?;
let mut result = Vec::new();
for row in rows {
let (nid_bytes, addrs_json) = row?;
let nid = blob_to_nodeid(nid_bytes)?;
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
result.push((nid, addrs));
}
Ok(result)
}
/// Count downstream peers for a blob CID.
pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result<u32> {
let count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1",
params![cid.as_slice()],
|row| row.get(0),
)?;
Ok(count as u32)
}
/// Remove a downstream peer for a blob CID.
pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2",
params![cid.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>.
pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result<Vec<([u8; 32], NodeId, Vec<String>)>> {
let mut stmt = self.conn.prepare(
"SELECT m.cid, u.source_node_id, u.source_addresses
FROM cdn_manifests m
LEFT JOIN blob_upstream u ON m.cid = u.cid
WHERE m.updated_at < ?1"
)?; )?;
let rows = stmt.query_map(params![older_than_ms as i64], |row| { let rows = stmt.query_map(params![older_than_ms as i64], |row| {
let cid_bytes: Vec<u8> = row.get(0)?; let cid_bytes: Vec<u8> = row.get(0)?;
Ok(cid_bytes) let nid_bytes: Option<Vec<u8>> = row.get(1)?;
let addrs_json: Option<String> = row.get(2)?;
Ok((cid_bytes, nid_bytes, addrs_json))
})?; })?;
let mut out = Vec::new(); let mut result = Vec::new();
for row in rows { for row in rows {
let cid_bytes = row?; let (cid_bytes, nid_bytes, addrs_json) = row?;
if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) { let cid: [u8; 32] = match cid_bytes.try_into() {
out.push(cid); Ok(c) => c,
} Err(_) => continue,
};
let nid = match nid_bytes {
Some(b) => match blob_to_nodeid(b) {
Ok(n) => n,
Err(_) => continue,
},
None => continue,
};
let addrs: Vec<String> = addrs_json
.map(|j| serde_json::from_str(&j).unwrap_or_default())
.unwrap_or_default();
result.push((cid, nid, addrs));
} }
Ok(out) Ok(result)
} }
/// Get the 10 posts before and 10 posts after a reference timestamp for an author. /// Get the 10 posts before and 10 posts after a reference timestamp for an author.
@ -4116,148 +4271,128 @@ impl Storage {
Ok(result) Ok(result)
} }
// --- File holders (flat, per-file, LRU-capped at 5) --- // --- Engagement: post_downstream ---
//
// A single table for PostId-keyed engagement propagation and CID-keyed
// manifest/blob propagation. Any 32-byte content-addressed file_id fits.
/// Upsert a holder for a file. Bumps last_interaction_ms to now and /// Register a peer as downstream for a post (max 100 per post).
/// enforces an LRU cap of 5 holders per file. /// Returns true if added, false if at capacity.
pub fn touch_file_holder( pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<bool> {
&self,
file_id: &[u8; 32],
peer_id: &NodeId,
peer_addresses: &[String],
direction: HolderDirection,
) -> anyhow::Result<()> {
let addrs_json = serde_json::to_string(peer_addresses)?;
let now = now_ms();
let new_dir = direction.as_str();
// Upsert. If the row exists with a different direction, promote to "both".
self.conn.execute(
"INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(file_id, peer_id) DO UPDATE SET
peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END,
last_interaction_ms = ?4,
direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END",
params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir],
)?;
// Enforce LRU cap of 5. Oldest get dropped.
self.conn.execute(
"DELETE FROM file_holders
WHERE file_id = ?1
AND peer_id NOT IN (
SELECT peer_id FROM file_holders
WHERE file_id = ?1
ORDER BY last_interaction_ms DESC
LIMIT 5
)",
params![file_id.as_slice()],
)?;
Ok(())
}
/// Count file holders (bounded at 5 by touch_file_holder's LRU cap).
pub fn get_file_holder_count(&self, file_id: &[u8; 32]) -> anyhow::Result<u32> {
let count: i64 = self.conn.prepare( let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM file_holders WHERE file_id = ?1", "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
)?.query_row(params![file_id.as_slice()], |row| row.get(0))?; )?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
Ok(count as u32) if count >= 100 {
return Ok(false);
}
self.conn.execute(
"INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING",
params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()],
)?;
Ok(true)
} }
/// Return the up-to-5 most recently interacted holders of a file. /// Get all downstream peers for a post.
pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> { pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result<Vec<NodeId>> {
let mut stmt = self.conn.prepare( let mut stmt = self.conn.prepare(
"SELECT peer_id, peer_addresses FROM file_holders "SELECT peer_node_id FROM post_downstream WHERE post_id = ?1"
WHERE file_id = ?1
ORDER BY last_interaction_ms DESC
LIMIT 5",
)?; )?;
let rows = stmt.query_map(params![file_id.as_slice()], |row| { let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec<u8>>(0))?;
let peer_bytes: Vec<u8> = row.get(0)?; let mut result = Vec::new();
let addrs_json: String = row.get(1)?;
Ok((peer_bytes, addrs_json))
})?;
let mut out = Vec::new();
for row in rows { for row in rows {
let (peer_bytes, addrs_json) = row?; if let Ok(nid) = blob_to_nodeid(row?) {
if peer_bytes.len() != 32 { continue; } result.push(nid);
let mut peer = [0u8; 32]; }
peer.copy_from_slice(&peer_bytes);
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
out.push((NodeId::from(peer), addrs));
} }
Ok(out) Ok(result)
} }
/// Remove all holders for a file (e.g. on post/blob deletion). /// Remove a downstream peer for a post.
pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> { pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute( self.conn.execute(
"DELETE FROM file_holders WHERE file_id = ?1", "DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2",
params![file_id.as_slice()], params![post_id.as_slice(), peer_node_id.as_slice()],
)?; )?;
Ok(()) Ok(())
} }
/// Remove a single peer's holder entry for a file. // --- Engagement: post_upstream (multi-upstream, 3 max) ---
pub fn remove_file_holder(&self, file_id: &[u8; 32], peer_id: &NodeId) -> anyhow::Result<()> {
/// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post.
pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> {
// Check current count
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
if count >= 3 {
return Ok(()); // Already at cap
}
let now = now_ms();
self.conn.execute( self.conn.execute(
"DELETE FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", "INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at)
params![file_id.as_slice(), peer_id.as_slice()], VALUES (?1, ?2, ?3, ?4)",
params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now],
)?; )?;
Ok(()) Ok(())
} }
/// One-time migration: seed file_holders from the legacy upstream/downstream /// Get all upstream peers for a post, ordered by priority ASC (0 = primary).
/// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result<Vec<(NodeId, u8)>> {
/// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the let mut stmt = self.conn.prepare(
/// PRIMARY KEY. Skips tables that don't exist on fresh installs. "SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC"
pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> { )?;
// Skip if file_holders already populated (idempotent re-run protection). let rows = stmt.query_map(params![post_id.as_slice()], |row| {
let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")? let bytes: Vec<u8> = row.get(0)?;
.query_row([], |row| row.get(0))?; let prio: i64 = row.get(1)?;
if existing > 0 { Ok((bytes, prio as u8))
return Ok(()); })?;
} let mut result = Vec::new();
let now = now_ms() as i64; for row in rows {
let table_exists = |name: &str| -> anyhow::Result<bool> { let (bytes, prio) = row?;
let count: i64 = self.conn.prepare( if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) {
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", result.push((nid, prio));
)?.query_row(params![name], |row| row.get(0))?; }
Ok(count > 0)
};
if table_exists("post_upstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream",
params![now],
)?;
}
if table_exists("post_downstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream",
params![now],
)?;
}
if table_exists("blob_upstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream",
params![now],
)?;
}
if table_exists("blob_downstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream",
params![now],
)?;
} }
Ok(result)
}
/// Get the primary (lowest priority) upstream peer for a post.
/// Backward-compatible wrapper for code that only needs a single upstream.
pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result<Option<NodeId>> {
let upstreams = self.get_post_upstreams(post_id)?;
Ok(upstreams.into_iter().next().map(|(nid, _)| nid))
}
/// Remove a specific upstream peer for a post.
pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(()) Ok(())
} }
/// Promote an upstream peer to primary (priority 0), pushing others up.
pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
// Shift all priorities up by 1
self.conn.execute(
"UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1",
params![post_id.as_slice()],
)?;
// Set the promoted peer to priority 0
self.conn.execute(
"UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Count downstream peers for a post.
pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result<u32> {
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
Ok(count as u32)
}
// --- Engagement: reactions --- // --- Engagement: reactions ---
/// Store a reaction (upsert by reactor+post_id+emoji). /// Store a reaction (upsert by reactor+post_id+emoji).
@ -5166,6 +5301,60 @@ mod tests {
assert_eq!(manifests[0].0, cid); assert_eq!(manifests[0].0, cid);
} }
#[test]
fn blob_upstream_crud() {
let s = temp_storage();
let cid = [42u8; 32];
let source = make_node_id(1);
let addrs = vec!["10.0.0.1:4433".to_string()];
s.store_blob_upstream(&cid, &source, &addrs).unwrap();
let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap();
assert_eq!(nid, source);
assert_eq!(got_addrs, addrs);
// Missing
assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none());
// Update
let source2 = make_node_id(2);
s.store_blob_upstream(&cid, &source2, &[]).unwrap();
let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap();
assert_eq!(nid, source2);
}
#[test]
fn blob_downstream_crud_and_limit() {
let s = temp_storage();
let cid = [42u8; 32];
// Add downstream peers
for i in 0..100u8 {
let peer = make_node_id(i);
let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap();
assert!(ok, "should accept peer {}", i);
}
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100);
// 101st should be rejected
let peer_101 = make_node_id(200);
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
assert!(!ok, "should reject 101st downstream");
// Get all downstream
let downstream = s.get_blob_downstream(&cid).unwrap();
assert_eq!(downstream.len(), 100);
// Remove one
s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap();
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99);
// Now adding one more should work
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
assert!(ok, "should accept after removal");
}
#[test] #[test]
fn blob_pin_unpin() { fn blob_pin_unpin() {
let s = temp_storage(); let s = temp_storage();
@ -5249,15 +5438,18 @@ mod tests {
let peer = make_node_id(2); let peer = make_node_id(2);
s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap(); s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap();
s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap(); s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
s.add_blob_downstream(&cid, &peer, &["10.0.0.2:4433".to_string()]).unwrap();
assert!(s.get_cdn_manifest(&cid).unwrap().is_some()); assert!(s.get_cdn_manifest(&cid).unwrap().is_some());
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1); assert!(s.get_blob_upstream(&cid).unwrap().is_some());
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1);
s.cleanup_cdn_for_blob(&cid).unwrap(); s.cleanup_cdn_for_blob(&cid).unwrap();
assert!(s.get_cdn_manifest(&cid).unwrap().is_none()); assert!(s.get_cdn_manifest(&cid).unwrap().is_none());
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0); assert!(s.get_blob_upstream(&cid).unwrap().is_none());
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0);
} }
#[test] #[test]
@ -5277,6 +5469,18 @@ mod tests {
assert!(cids.contains(&cid2)); assert!(cids.contains(&cid2));
} }
#[test]
fn remove_blob_upstream() {
let s = temp_storage();
let cid = [42u8; 32];
let peer = make_node_id(1);
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
s.remove_blob_upstream(&cid).unwrap();
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
}
#[test] #[test]
fn author_post_neighborhood() { fn author_post_neighborhood() {
@ -5636,6 +5840,24 @@ mod tests {
assert_eq!(got2.preferred_tree.len(), 2); assert_eq!(got2.preferred_tree.len(), 2);
} }
#[test]
fn blob_upstream_preferred_tree() {
let s = temp_storage();
let cid = [42u8; 32];
let source = make_node_id(1);
s.store_blob_upstream(&cid, &source, &[]).unwrap();
// Initially empty
let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap();
assert!(tree.is_empty());
// Update
let nodes = vec![make_node_id(10), make_node_id(11)];
s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap();
let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap();
assert_eq!(tree2.len(), 2);
}
// ---- Circle Profile tests ---- // ---- Circle Profile tests ----
#[test] #[test]
@ -5836,39 +6058,32 @@ mod tests {
// --- Engagement tests --- // --- Engagement tests ---
#[test] #[test]
fn file_holders_lru_cap() { fn post_downstream_crud() {
let s = temp_storage(); let s = temp_storage();
let file = [42u8; 32]; let post_id = make_post_id(1);
// Sleep between inserts so last_interaction_ms actually differs (ms resolution). let peer1 = make_node_id(1);
for i in 0..7u8 { let peer2 = make_node_id(2);
s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap();
std::thread::sleep(std::time::Duration::from_millis(2));
}
// Only 5 most-recent survive
assert_eq!(s.get_file_holder_count(&file).unwrap(), 5);
let holders = s.get_file_holders(&file).unwrap();
assert_eq!(holders.len(), 5);
let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect();
// Oldest two (i=0, i=1) got evicted; most recent (i=6) survives
assert!(!kept.contains(&make_node_id(0)));
assert!(!kept.contains(&make_node_id(1)));
assert!(kept.contains(&make_node_id(6)));
}
#[test] // Add downstream peers
fn file_holders_direction_promotion() { assert!(s.add_post_downstream(&post_id, &peer1).unwrap());
let s = temp_storage(); assert!(s.add_post_downstream(&post_id, &peer2).unwrap());
let file = [42u8; 32];
let peer = make_node_id(1); let downstream = s.get_post_downstream(&post_id).unwrap();
s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap(); assert_eq!(downstream.len(), 2);
s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap(); assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2);
// Re-insert with opposite direction should promote to "both"
let dir: String = s.conn.query_row( // Remove one
"SELECT direction FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", s.remove_post_downstream(&post_id, &peer1).unwrap();
rusqlite::params![file.as_slice(), peer.as_slice()], assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1);
|row| row.get(0),
).unwrap(); // Capacity limit
assert_eq!(dir, "both"); let big_post = make_post_id(99);
for i in 0..100u8 {
assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap());
}
assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100);
// 101st should fail
assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap());
} }
#[test] #[test]

View file

@ -132,8 +132,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
if let Some(author) = author_id { if let Some(author) = author_id {
holders.push(author); holders.push(author);
} }
if let Ok(file_holders) = store.get_file_holders(&post_id) { if let Ok(downstream) = store.get_post_downstream(&post_id) {
for (peer, _addrs) in file_holders { for peer in downstream {
if !holders.contains(&peer) { if !holders.contains(&peer) {
holders.push(peer); holders.push(peer);
} }