Compare commits

...

6 commits

Author SHA1 Message Date
Scott Reimers
5d9ba22427 Phase 2e (0.6.1-beta): drop legacy upstream/downstream tables
The file_holders table is now the only tracker of per-file peer
relationships. post_upstream, post_downstream, blob_upstream, and
blob_downstream are dropped at first launch after the seed migration
copies any existing entries.

Schema:
- DROP TABLE IF EXISTS on all four legacy tables after seeding
- Seed migration guards with a sqlite_master table_exists check so fresh
  installs don't crash trying to read non-existent source tables
- Remove CREATE TABLE statements for the four tables from init
- Remove Protocol v4 Phase 6 post_upstream priority migration (dead)
- Remove blob_upstream preferred_tree column migration (dead)

Rust:
- Remove add/get/remove post_upstream, post_downstream,
  blob_upstream, blob_downstream methods
- Remove get_blob_upstream_preferred_tree / update variant
- Rewrite the downstream_count subquery in get_eviction_candidates to
  count file_holders entries
- Rewrite apply_delete's cascade cleanup to clear file_holders
  instead of post_upstream/post_downstream
- cleanup_cdn_for_blob now clears file_holders for the CID

Callers:
- All dual-write sites in connection.rs and node.rs now do
  touch_file_holder only (legacy writes removed)
- get_stale_manifests replaced with get_stale_manifest_cids; caller
  in node.rs picks a refresh source from file_holders

Tests:
- Remove blob_upstream_crud, blob_downstream_crud_and_limit,
  blob_upstream_preferred_tree, remove_blob_upstream,
  post_downstream_crud
- Add file_holders_lru_cap and file_holders_direction_promotion tests

All 110 core tests passing. Workspace compiles clean.
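
Condensed from the storage diff below (seed first, then drop; this
ordering is what keeps upgrades lossless and fresh installs safe):

    // 0.6.1-beta: seed file_holders from the legacy tables, then drop them.
    // seed_file_holders_from_legacy() is a no-op when file_holders already
    // has rows, and skips any legacy table missing on a fresh install.
    self.seed_file_holders_from_legacy()?;
    self.conn.execute_batch(
        "DROP TABLE IF EXISTS blob_upstream;
         DROP TABLE IF EXISTS blob_downstream;
         DROP TABLE IF EXISTS post_upstream;
         DROP TABLE IF EXISTS post_downstream;",
    )?;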

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 21:42:15 -04:00
Scott Reimers
60463d1817 Phase 2d (0.6.1-beta): route manifest + blob ops through file_holders
Switch ALL propagation-decision reads to the flat holder set.

push_manifest_to_downstream now targets file_holders instead of
blob_downstream. ManifestPush receive-side relay likewise — known
holders fan out to up to 5 most-recent peers instead of a directional
tree.

Blob delete notices: single flat fan-out to file_holders; the legacy
upstream_node tree-healing field is emitted as None (wire-stable via
serde default) and ignored on receive — the post-0.6 flat model
doesn't need sender-role distinction. send_blob_delete_notices keeps
its Option<&Upstream> parameter as an unused placeholder for signature
stability with the call sites in this commit.
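
As a sketch of the wire-stable payload shape (field names match the
diff; the exact derive list on the real struct is an assumption):

    #[derive(Serialize, Deserialize, Clone)]
    pub struct BlobDeleteNoticePayload {
        pub cid: [u8; 32],
        // Legacy tree-healing field: always None post-0.6 and ignored on
        // receive. #[serde(default)] keeps decoding compatible with old
        // peers that still omit or populate it.
        #[serde(default)]
        pub upstream_node: Option<PeerWithAddress>,
    }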

Other reads migrated:
- blob fetch cascade: step 2 now tries "known holders" (up to 5)
  instead of a single upstream
- manifest refresh: downstream_count reported from file_holder_count
- web/http post holder enumeration
- Worm search post/blob holder fallback (both connection.rs paths)
- DeleteRecord fan-out rewires to file_holders
- Under-replication check: < 2 holders
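
Condensed from the node.rs hunk below, the check now reads:

    // Posts with fewer than 2 known holders get queued for replication.
    for pid in &recent_ids {
        if let Ok(count) = storage.get_file_holder_count(pid) {
            if count < 2 {
                needs_replication.push(*pid);
            }
        }
    }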

Storage additions:
- get_file_holder_count(file_id)
- remove_file_holder(file_id, peer_id)

Legacy upstream/downstream writes are still happening from Phase 2b;
those + the tables themselves go in 2e.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 21:09:45 -04:00
Scott Reimers
3a0d2e93ab Phase 2c (0.6.1-beta): route engagement diffs through file_holders
propagate_engagement_diff now targets the post's flat holder set (up to
5 most-recent) instead of the post_downstream directional tree. The
holder set naturally subsumes the old upstream+downstream partition, so
the separate "also send to upstreams" loops at each engagement call
site are removed (reactions, comments, comment edit/delete, receipt
slots, comment slots).

handle_blob_header_diff on receive:
- records the sending peer as a file holder (an engagement exchange is
  proof the peer holds the post)
- re-propagates to the holder set minus the sender
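
Condensed sketch of the receive path (storage/network are stand-ins for
the handler's actual locals; the real send loop batches connections into
a spawned task, as shown in the connection.rs hunk below):

    // The sender demonstrably holds the post, so record that first.
    let _ = storage.touch_file_holder(
        &payload.post_id, &sender, &[], HolderDirection::Received,
    );
    // Then fan back out to every other known holder (flat set, max 5).
    for (peer, _addrs) in storage.get_file_holders(&payload.post_id)? {
        if peer != sender {
            let _ = network.send_to_peer_uni(
                &peer, MessageType::BlobHeaderDiff, &payload,
            ).await;
        }
    }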

Writes to post_upstream / post_downstream still occur from Phase 2b
(dual-write); those and the legacy tables will be removed in 2e.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 21:00:53 -04:00
Scott Reimers
0b2b4f5a68 Phase 2b (0.6.1-beta): dual-write file_holders on all propagation events
Populate the flat holder set alongside every existing post_upstream /
post_downstream / blob_upstream / blob_downstream write so that read
paths can be switched over in the next commit without losing continuity.

Events wired:
- Pull sync receive (3 paths in connection.rs)
- PostPush receive (public posts only after Phase 1)
- PostFetch via notification (discovery pull)
- PostDownstreamRegister
- Replication accept (downstream) + replication-driven pull (upstream)
- Attachment upstream recorded after replication blob fetch
- ManifestPush receive (remote is a CID holder)
- ManifestPush send (downstream peer becomes CID holder)
- Blob fetch fallback (upstream lateral sources)

Direction is tracked as Received vs Sent. Not load-bearing for routing;
retained for future use. LRU cap of 5 enforced on every touch.
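
A representative dual-write site as it looked within this commit (the
legacy lines are exactly what 2e later deletes):

    // Legacy directional write; stays through 2b-2d, deleted in 2e.
    let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
    let _ = s.add_post_upstream(pid, peer_id, prio);
    // New flat-holder write alongside it.
    let _ = s.touch_file_holder(
        pid, peer_id, &[], crate::storage::HolderDirection::Received,
    );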

Legacy upstream/downstream writes remain in place; they'll go away
together with the table drops at the end of this phase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 20:56:28 -04:00
Scott Reimers
1658762a68 Phase 2a (0.6.1-beta): add file_holders table + legacy seed migration
New flat per-file holder set replaces the directional upstream/downstream
trees. Keyed by 32-byte content-addressed file_id (works for both PostId
and blob CID). LRU-capped at 5 holders per file on touch.

- HolderDirection enum (Sent/Received/Both) — tracked for potential
  reuse, not load-bearing for propagation
- touch_file_holder / get_file_holders / delete_file_holders
- seed_file_holders_from_legacy: one-time idempotent seed from
  post_upstream, post_downstream, blob_upstream, blob_downstream so
  users upgrading from 0.6.0 don't start with empty holder sets
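
A minimal usage sketch of the new surface (post_id/peer are
placeholders):

    // Record that `peer` sent us this post; repeated touches just bump
    // last_interaction_ms and may promote direction to Both.
    storage.touch_file_holder(&post_id, &peer, &[], HolderDirection::Received)?;
    // Read back the holder set: at most 5 entries, most recent first.
    let holders: Vec<(NodeId, Vec<String>)> = storage.get_file_holders(&post_id)?;
    assert!(holders.len() <= 5);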

Table and methods land here; call-site refactor and legacy-table drop
follow in subsequent commits within this phase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 20:52:30 -04:00
Scott Reimers
e6265b52b6 Phase 1 (0.6.0-beta): remove direct PostPush for encrypted posts
Encrypted posts now propagate only via the CDN (ManifestPush + neighbor
header updates), eliminating the sender→recipient traffic signal on the
wire. Encrypted DMs are indistinguishable from any other encrypted post.

- Remove push_post_to_recipients entirely from network.rs
- Remove call sites in create_post and re-encrypt-on-revoke
- PostPush handler now ignores non-public visibility (kept for public
  audience push path)

Known gap: non-follower DMs won't reach recipients until Phase 3 (merged
pull + recipient-match). Followers receive via the existing CDN path — new
posts trigger neighbor-manifest updates, ManifestPush fans out to
downstream holders, recipients pull missing post IDs from followed
authors.
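
Condensed from the connection.rs hunk below (the early return is a
sketch simplification; the real handler just skips the store branch):

    // Only public posts are accepted via direct push now; encrypted
    // posts arrive through the CDN instead, so a direct push would
    // leak the sender→recipient link.
    if !matches!(push.post.visibility, PostVisibility::Public) {
        debug!(post_id = hex::encode(push.post.id), "Ignoring non-public PostPush");
        return Ok(());
    }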

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 20:46:34 -04:00
6 changed files with 470 additions and 736 deletions

View file

@ -1393,8 +1393,12 @@ impl ConnectionManager {
{
let s = storage.get().await;
for pid in &new_post_ids {
let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = s.add_post_upstream(pid, peer_id, prio);
let _ = s.touch_file_holder(
pid,
peer_id,
&[],
crate::storage::HolderDirection::Received,
);
}
for author in &synced_authors {
let _ = s.update_follow_last_sync(author, now_ms);
@ -1940,8 +1944,12 @@ impl ConnectionManager {
{
let storage = self.storage.get().await;
for pid in &new_post_ids {
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(pid, from, prio);
let _ = storage.touch_file_holder(
pid,
from,
&[],
crate::storage::HolderDirection::Received,
);
}
for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms);
@ -2034,8 +2042,12 @@ impl ConnectionManager {
{
let storage = self.storage.get().await;
for pid in &new_post_ids {
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(pid, peer_id, prio);
let _ = storage.touch_file_holder(
pid,
peer_id,
&[],
crate::storage::HolderDirection::Received,
);
}
for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms);
@ -2810,13 +2822,9 @@ impl ConnectionManager {
if store.get_post_with_visibility(post_id).ok().flatten().is_some() {
Some(self.our_node_id)
} else {
// CDN tree: do any of our downstream hosts have it?
let downstream = store.get_post_downstream(post_id).unwrap_or_default();
if !downstream.is_empty() {
Some(downstream[0])
} else {
None
}
// Any known holder of this post?
let holders = store.get_file_holders(post_id).unwrap_or_default();
holders.first().map(|(nid, _)| *nid)
}
};
post_holder = found;
@ -2830,9 +2838,9 @@ impl ConnectionManager {
// Check CDN: do we know who has it via blob post ownership?
let store = self.storage.get().await;
if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) {
let downstream = store.get_post_downstream(&pid).unwrap_or_default();
if !downstream.is_empty() {
blob_holder = Some(downstream[0]);
let holders = store.get_file_holders(&pid).unwrap_or_default();
if let Some((nid, _)) = holders.first() {
blob_holder = Some(*nid);
}
}
}
@ -4871,7 +4879,7 @@ impl ConnectionManager {
let cm = conn_mgr.lock().await;
// Collect blob CIDs + CDN peers before async work
let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = Vec::new();
let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>)> = Vec::new();
{
let storage = cm.storage.get().await;
for dr in &payload.records {
@ -4879,9 +4887,8 @@ impl ConnectionManager {
// Collect blobs for CDN cleanup before deleting
let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default();
for cid in blob_cids {
let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
let upstream = storage.get_blob_upstream(&cid).ok().flatten();
blob_cleanup.push((cid, downstream, upstream));
let holders = storage.get_file_holders(&cid).unwrap_or_default();
blob_cleanup.push((cid, holders));
}
let _ = storage.store_delete(dr);
let _ = storage.apply_delete(dr);
@ -4897,18 +4904,11 @@ impl ConnectionManager {
// Gather connections for CDN delete notices under lock, then send outside
let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new();
for (cid, downstream, upstream) in &blob_cleanup {
let upstream_info = upstream.as_ref().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs.clone() });
let ds_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: upstream_info };
for (ds_nid, _) in downstream {
if let Some(pc) = cm.connections_ref().get(ds_nid) {
delete_notices.push((pc.connection.clone(), ds_payload.clone()));
}
}
if let Some((up_nid, _)) = upstream {
let up_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None };
if let Some(pc) = cm.connections_ref().get(up_nid) {
delete_notices.push((pc.connection.clone(), up_payload));
for (cid, holders) in &blob_cleanup {
let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None };
for (peer, _addrs) in holders {
if let Some(pc) = cm.connections_ref().get(peer) {
delete_notices.push((pc.connection.clone(), payload.clone()));
}
}
}
@ -4958,24 +4958,38 @@ impl ConnectionManager {
}
MessageType::PostPush => {
let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?;
let cm = conn_mgr.lock().await;
let storage = cm.storage.get().await;
if !storage.is_deleted(&push.post.id)?
&& storage.get_post(&push.post.id)?.is_none()
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
{
let _ = storage.store_post_with_visibility(
&push.post.id,
&push.post.post,
&push.post.visibility,
);
let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio);
info!(
// Encrypted posts are no longer accepted via direct push — they propagate
// via the CDN to eliminate the sender→recipient traffic signal.
if !matches!(push.post.visibility, crate::types::PostVisibility::Public) {
debug!(
peer = hex::encode(remote_node_id),
post_id = hex::encode(push.post.id),
"Received direct post push"
"Ignoring non-public PostPush"
);
} else {
let cm = conn_mgr.lock().await;
let storage = cm.storage.get().await;
if !storage.is_deleted(&push.post.id)?
&& storage.get_post(&push.post.id)?.is_none()
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
{
let _ = storage.store_post_with_visibility(
&push.post.id,
&push.post.post,
&push.post.visibility,
);
let _ = storage.touch_file_holder(
&push.post.id,
&remote_node_id,
&[],
crate::storage::HolderDirection::Received,
);
info!(
peer = hex::encode(remote_node_id),
post_id = hex::encode(push.post.id),
"Received direct post push"
);
}
}
}
MessageType::AudienceRequest => {
@ -5063,17 +5077,24 @@ impl ConnectionManager {
&entry.manifest.author_manifest.author,
entry.manifest.author_manifest.updated_at,
);
// Remote peer pushed us this manifest → they hold the file.
let _ = storage.touch_file_holder(
&entry.cid,
&remote_node_id,
&[],
crate::storage::HolderDirection::Received,
);
stored_entries.push(entry.clone());
}
// Gather downstream peers for relay before dropping locks
// Gather file holders for relay before dropping locks
let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new();
for entry in &stored_entries {
let downstream = storage.get_blob_downstream(&entry.cid).unwrap_or_default();
for (ds_nid, _) in downstream {
if ds_nid == remote_node_id {
let holders = storage.get_file_holders(&entry.cid).unwrap_or_default();
for (peer, _addrs) in holders {
if peer == remote_node_id {
continue;
}
relay_targets.push((ds_nid, crate::protocol::ManifestPushPayload {
relay_targets.push((peer, crate::protocol::ManifestPushPayload {
manifests: vec![entry.clone()],
}));
}
@ -5176,8 +5197,12 @@ impl ConnectionManager {
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
let _ = storage.touch_file_holder(
&sync_post.id,
&sender_id,
&[],
crate::storage::HolderDirection::Received,
);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
@ -5268,32 +5293,14 @@ impl ConnectionManager {
let storage = cm.storage.get().await;
let cid = payload.cid;
// Check if sender was our upstream for this blob
let was_upstream = storage.get_blob_upstream(&cid).ok().flatten()
.map(|(nid, _)| nid == remote_node_id)
.unwrap_or(false);
if was_upstream {
// Sender was our upstream — clear it
let _ = storage.remove_blob_upstream(&cid);
// If they provided their upstream, store it as our new upstream
if let Some(ref new_up) = payload.upstream_node {
if let Ok(nid_bytes) = hex::decode(&new_up.n) {
if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
let _ = storage.store_blob_upstream(&cid, &nid, &new_up.a);
}
}
}
} else {
// Sender was our downstream — remove them
let _ = storage.remove_blob_downstream(&cid, &remote_node_id);
}
// Flat-holder model: drop the sender as a holder of this file.
// The author's DeleteRecord (separate signed message) is what
// triggers the actual blob removal for followers.
let _ = storage.remove_file_holder(&cid, &remote_node_id);
info!(
peer = hex::encode(remote_node_id),
cid = hex::encode(cid),
was_upstream,
"Received blob delete notice"
);
}
@ -5437,7 +5444,12 @@ impl ConnectionManager {
let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?;
let cm = conn_mgr.lock().await;
let storage = cm.storage.get().await;
let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id);
let _ = storage.touch_file_holder(
&payload.post_id,
&remote_node_id,
&[],
crate::storage::HolderDirection::Sent,
);
drop(storage);
trace!(
peer = hex::encode(remote_node_id),
@ -5692,15 +5704,28 @@ impl ConnectionManager {
let storage = storage.get().await;
let manifest: Option<crate::types::CdnManifest> = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| {
if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
let ds_count = storage.get_blob_downstream_count(&payload.cid).unwrap_or(0);
let ds_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0);
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
} else { serde_json::from_str(&json).ok() }
});
let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() {
let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false);
if ok { (true, vec![]) } else {
let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default();
let redirects: Vec<PeerWithAddress> = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect();
let prior_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0);
let _ = storage.touch_file_holder(
&payload.cid,
&remote_node_id,
&payload.requester_addresses,
crate::storage::HolderDirection::Sent,
);
// If we already had 5 holders before adding this one, the
// requester should consult them too for CDN lookups.
if prior_count < 5 {
(true, vec![])
} else {
let holders = storage.get_file_holders(&payload.cid).unwrap_or_default();
let redirects: Vec<PeerWithAddress> = holders.into_iter()
.filter(|(nid, _)| *nid != remote_node_id)
.map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs })
.collect();
(false, redirects)
}
} else { (false, vec![]) };
@ -5727,7 +5752,7 @@ impl ConnectionManager {
Some(json) => {
let manifest = if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
if am.updated_at > payload.current_updated_at {
let ds_count = store.get_blob_downstream_count(&payload.cid).unwrap_or(0);
let ds_count = store.get_file_holder_count(&payload.cid).unwrap_or(0);
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
} else { None }
} else { None };
@ -6072,9 +6097,14 @@ impl ConnectionManager {
to_pull.push(*pid);
}
// Register as downstream for all accepted posts
// Register as holder for all accepted posts
for pid in &acc {
let _ = storage.add_post_downstream(pid, &remote_node_id);
let _ = storage.touch_file_holder(
pid,
&remote_node_id,
&[],
crate::storage::HolderDirection::Sent,
);
}
(acc, rej, to_pull)
@ -6125,8 +6155,12 @@ impl ConnectionManager {
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sp.id, &sender, prio);
let _ = storage.touch_file_holder(
&sp.id,
&sender,
&[],
crate::storage::HolderDirection::Received,
);
let blob_store = cm.blob_store.clone();
drop(storage);
drop(cm);
@ -6153,7 +6187,12 @@ impl ConnectionManager {
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes);
let _ = storage.add_post_upstream(&att.cid, &sender, 0);
let _ = storage.touch_file_holder(
&att.cid,
&sender,
&[],
crate::storage::HolderDirection::Received,
);
}
Ok(())
}.await;
@ -6178,12 +6217,14 @@ impl ConnectionManager {
Ok(())
}
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream.
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate
/// to the post's file_holders (flat set, up to 5 most recent).
async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) {
use crate::types::BlobHeaderDiffOp;
// Gather policy + audience data, then drop lock immediately
let (policy, approved_audience, downstream, upstreams) = {
// Gather policy + audience data + holders, then drop lock immediately.
// Remote peer clearly holds this post — record them as a holder.
let (policy, approved_audience, holders) = {
let storage = self.storage.get().await;
let policy = storage.get_comment_policy(&payload.post_id)
.ok()
@ -6193,13 +6234,18 @@ impl ConnectionManager {
crate::types::AudienceDirection::Inbound,
Some(crate::types::AudienceStatus::Approved),
).unwrap_or_default();
let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default();
let upstreams: Vec<NodeId> = storage.get_post_upstreams(&payload.post_id)
let _ = storage.touch_file_holder(
&payload.post_id,
&sender,
&[],
crate::storage::HolderDirection::Received,
);
let holders: Vec<NodeId> = storage.get_file_holders(&payload.post_id)
.unwrap_or_default()
.into_iter()
.map(|(nid, _)| nid)
.map(|(nid, _addrs)| nid)
.collect();
(policy, approved, downstream, upstreams)
(policy, approved, holders)
};
// Filter ops using gathered data (no lock held)
@ -6381,26 +6427,16 @@ impl ConnectionManager {
let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms);
}
// Collect all targets (downstream + all upstreams), then send in a single batched task
// Re-propagate to all file holders (flat set, max 5). Exclude sender.
let mut targets: Vec<iroh::endpoint::Connection> = Vec::new();
for peer_id in downstream {
if peer_id == sender { continue; }
if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone()))
for peer_id in &holders {
if *peer_id == sender { continue; }
if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone()))
{
targets.push(conn);
}
}
// Phase 6: Try all upstreams, not just one
for up in &upstreams {
if *up != sender {
if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone()))
{
targets.push(conn);
}
}
}
if !targets.is_empty() {
let payload_clone = payload.clone();
tokio::spawn(async move {
@ -7684,8 +7720,8 @@ impl ConnectionActor {
if s.get_post_with_visibility(post_id).ok().flatten().is_some() {
post_holder = Some(ctx.our_node_id);
} else {
let downstream = s.get_post_downstream(post_id).unwrap_or_default();
if !downstream.is_empty() { post_holder = Some(downstream[0]); }
let holders = s.get_file_holders(post_id).unwrap_or_default();
if let Some((nid, _)) = holders.first() { post_holder = Some(*nid); }
}
}
@ -7695,8 +7731,8 @@ impl ConnectionActor {
} else {
let s = ctx.storage.get().await;
if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) {
let downstream = s.get_post_downstream(&pid).unwrap_or_default();
if !downstream.is_empty() { blob_holder = Some(downstream[0]); }
let holders = s.get_file_holders(&pid).unwrap_or_default();
if let Some((nid, _)) = holders.first() { blob_holder = Some(*nid); }
}
}
}

View file

@ -378,7 +378,11 @@ async fn try_redirect(
Ok(Some((_, PostVisibility::Public))) => {}
_ => return false, // not found or not public — hard close
}
store.get_post_downstream(post_id).unwrap_or_default()
store.get_file_holders(post_id)
.unwrap_or_default()
.into_iter()
.map(|(nid, _addrs)| nid)
.collect::<Vec<_>>()
};
// Get addresses for downstream peers

View file

@ -902,50 +902,6 @@ impl Network {
self.send_to_audience(MessageType::PostNotification, &payload).await
}
/// Push a full post directly to recipients (persistent if available, ephemeral otherwise).
pub async fn push_post_to_recipients(
&self,
post_id: &crate::types::PostId,
post: &Post,
visibility: &PostVisibility,
) -> usize {
let recipients: Vec<NodeId> = match visibility {
PostVisibility::Public => return 0,
PostVisibility::Encrypted { recipients } => {
recipients.iter().map(|wk| wk.recipient).collect()
}
PostVisibility::GroupEncrypted { group_id, .. } => {
// Push to all group members
match self.storage.get().await.get_all_group_members() {
Ok(map) => map.get(group_id).cloned().unwrap_or_default().into_iter().collect(),
Err(_) => return 0,
}
}
};
let payload = PostPushPayload {
post: SyncPost {
id: *post_id,
post: post.clone(),
visibility: visibility.clone(),
},
};
let mut pushed = 0;
for recipient in &recipients {
if self.send_to_peer_uni(recipient, MessageType::PostPush, &payload).await.is_ok() {
pushed += 1;
debug!(
recipient = hex::encode(recipient),
post_id = hex::encode(post_id),
"Pushed post to recipient"
);
}
}
pushed
}
/// Push a profile update to all audience members (ephemeral-capable).
pub async fn push_profile(&self, profile: &PublicProfile) -> usize {
// Sanitize: if public_visible=false, strip display_name/bio from pushed profile
@ -1059,15 +1015,16 @@ impl Network {
sent
}
/// Push updated manifests to all downstream peers for a given CID.
/// Push an updated manifest to all known holders of the file (flat set,
/// up to 5 most-recent). Replaces the legacy downstream-tree push.
pub async fn push_manifest_to_downstream(
&self,
cid: &[u8; 32],
manifest: &crate::types::CdnManifest,
) -> usize {
let downstream = {
let holders = {
let storage = self.storage.get().await;
storage.get_blob_downstream(cid).unwrap_or_default()
storage.get_file_holders(cid).unwrap_or_default()
};
let payload = crate::protocol::ManifestPushPayload {
manifests: vec![crate::protocol::ManifestPushEntry {
@ -1076,54 +1033,40 @@ impl Network {
}],
};
let mut sent = 0;
for (ds_nid, _) in &downstream {
if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() {
for (peer, peer_addrs) in &holders {
if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() {
sent += 1;
let storage = self.storage.get().await;
let _ = storage.touch_file_holder(
cid,
peer,
peer_addrs,
crate::storage::HolderDirection::Sent,
);
}
}
sent
}
/// Send blob delete notices to downstream and upstream peers.
/// Downstream peers receive our upstream info for tree healing.
/// Upstream peers receive no upstream info (just "remove me as downstream").
/// Send blob delete notices to all known holders of a file.
/// Second argument kept as Option for signature stability; flat-holder
/// model doesn't need separate upstream handling.
pub async fn send_blob_delete_notices(
&self,
cid: &[u8; 32],
downstream: &[(NodeId, Vec<String>)],
upstream: Option<&(NodeId, Vec<String>)>,
holders: &[(NodeId, Vec<String>)],
_legacy_upstream: Option<&(NodeId, Vec<String>)>,
) -> usize {
let upstream_info = upstream.map(|(nid, addrs)| {
crate::types::PeerWithAddress {
n: hex::encode(nid),
a: addrs.clone(),
}
});
let mut sent = 0;
// Notify downstream (with upstream info for tree healing)
let ds_payload = crate::protocol::BlobDeleteNoticePayload {
let payload = crate::protocol::BlobDeleteNoticePayload {
cid: *cid,
upstream_node: upstream_info,
upstream_node: None,
};
for (ds_nid, _) in downstream {
if self.send_to_peer_uni(ds_nid, MessageType::BlobDeleteNotice, &ds_payload).await.is_ok() {
let mut sent = 0;
for (peer, _addrs) in holders {
if self.send_to_peer_uni(peer, MessageType::BlobDeleteNotice, &payload).await.is_ok() {
sent += 1;
}
}
// Notify upstream (no upstream info)
if let Some((up_nid, _)) = upstream {
let up_payload = crate::protocol::BlobDeleteNoticePayload {
cid: *cid,
upstream_node: None,
};
if self.send_to_peer_uni(up_nid, MessageType::BlobDeleteNotice, &up_payload).await.is_ok() {
sent += 1;
}
}
sent
}
@ -2356,24 +2299,24 @@ impl Network {
self.endpoint.close().await;
}
/// Propagate an engagement diff to all downstream holders of a post (CDN tree).
/// Excludes the sender to avoid loops.
/// Propagate an engagement diff to all known holders of a post (flat set,
/// up to 5 most-recent). Excludes the sender to avoid loops.
pub async fn propagate_engagement_diff(
&self,
post_id: &crate::types::PostId,
payload: &crate::protocol::BlobHeaderDiffPayload,
exclude_peer: &crate::types::NodeId,
) -> usize {
let downstream = {
let holders = {
let storage = self.storage.get().await;
storage.get_post_downstream(post_id).unwrap_or_default()
storage.get_file_holders(post_id).unwrap_or_default()
};
let mut sent = 0;
for ds_nid in &downstream {
if ds_nid == exclude_peer {
for (peer, _addrs) in &holders {
if peer == exclude_peer {
continue;
}
if self.send_to_peer_uni(ds_nid, MessageType::BlobHeaderDiff, payload).await.is_ok() {
if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() {
sent += 1;
}
}

View file

@ -836,13 +836,12 @@ impl Node {
}
}
// For encrypted posts, push directly to recipients
let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await;
// For public posts, push to audience members
// For public posts, push to audience members. Encrypted posts propagate
// via the CDN (ManifestPush + header-diff) to eliminate the sender→recipient
// traffic signal.
let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await;
info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post");
info!(post_id = hex::encode(post_id), audience_pushed, "Created new post");
Ok((post_id, post, visibility))
}
@ -1351,7 +1350,12 @@ impl Node {
let source_addrs: Vec<String> = response.manifest.as_ref()
.map(|m| m.host_addresses.clone())
.unwrap_or_default();
let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs);
let _ = storage.touch_file_holder(
cid,
from_peer,
&source_addrs,
crate::storage::HolderDirection::Received,
);
}
Ok(data)
}
@ -1379,16 +1383,17 @@ impl Node {
// Collect redirect peers from responses in case we need them later
let mut redirect_peers: Vec<crate::types::PeerWithAddress> = Vec::new();
// 2. Try existing upstream (if we previously fetched this blob)
let upstream = {
// 2. Try known holders (up to 5 most-recent peers we've interacted
// with about this file).
let known_holders = {
let storage = self.storage.get().await;
storage.get_blob_upstream(cid)?
storage.get_file_holders(cid).unwrap_or_default()
};
if let Some((upstream_nid, _upstream_addrs)) = upstream {
match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await {
for (holder_nid, _addrs) in &known_holders {
match self.fetch_blob_from_peer(cid, holder_nid, post_id, author, mime_type, created_at).await {
Ok(Some(data)) => return Ok(Some(data)),
Ok(None) => {}
Err(e) => warn!(error = %e, "blob fetch from upstream failed"),
Err(e) => warn!(error = %e, "blob fetch from known holder failed"),
}
}
@ -1413,7 +1418,12 @@ impl Node {
let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at);
}
}
let _ = storage.store_blob_upstream(cid, &lateral, &[]);
let _ = storage.touch_file_holder(
cid,
&lateral,
&[],
crate::storage::HolderDirection::Received,
);
return Ok(Some(data));
}
Ok((None, response)) => {
@ -1981,14 +1991,13 @@ impl Node {
signature,
};
// Collect blob CIDs + CDN peers before cleanup
// Collect blob CIDs + known holders before cleanup (for delete notices)
let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = {
let storage = self.storage.get().await;
let cids = storage.get_blobs_for_post(post_id).unwrap_or_default();
cids.into_iter().map(|cid| {
let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
let upstream = storage.get_blob_upstream(&cid).ok().flatten();
(cid, downstream, upstream)
let holders = storage.get_file_holders(&cid).unwrap_or_default();
(cid, holders, None::<(NodeId, Vec<String>)>)
}).collect()
};
@ -2108,12 +2117,10 @@ impl Node {
storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?;
}
// delete_post already pushes the DeleteRecord
// delete_post already pushes the DeleteRecord.
// Replacement post propagates via the CDN to remaining recipients.
self.delete_post(post_id).await?;
// Push replacement post directly to remaining recipients
self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await;
info!(
old_id = hex::encode(post_id),
new_id = hex::encode(new_post_id),
@ -3086,20 +3093,27 @@ impl Node {
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64 - max_age_ms;
let stale = {
let stale_cids = {
let s = storage.get().await;
s.get_stale_manifests(cutoff).unwrap_or_default()
s.get_stale_manifest_cids(cutoff).unwrap_or_default()
};
for (cid, upstream_nid, _upstream_addrs) in &stale {
// Get current updated_at for this manifest
let current_updated_at = {
for cid in &stale_cids {
// Get current updated_at + pick a holder to refresh from
let (current_updated_at, refresh_source) = {
let s = storage.get().await;
s.get_cdn_manifest(cid).ok().flatten()
let updated_at = s.get_cdn_manifest(cid).ok().flatten()
.and_then(|json| serde_json::from_str::<crate::types::AuthorManifest>(&json).ok())
.map(|m| m.updated_at)
.unwrap_or(0)
.unwrap_or(0);
let source = s.get_file_holders(cid)
.unwrap_or_default()
.into_iter()
.next()
.map(|(nid, _)| nid);
(updated_at, source)
};
match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await {
let Some(upstream_nid) = refresh_source else { continue; };
match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await {
Ok(Some(cdn_manifest)) => {
if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
@ -3110,10 +3124,10 @@ impl Node {
&cdn_manifest.author_manifest.author,
cdn_manifest.author_manifest.updated_at,
);
// Relay to our downstream
let downstream = s.get_blob_downstream(cid).unwrap_or_default();
// Relay to known holders (flat set)
let holders = s.get_file_holders(cid).unwrap_or_default();
drop(s);
if !downstream.is_empty() {
if !holders.is_empty() {
network.push_manifest_to_downstream(cid, &cdn_manifest).await;
}
tracing::debug!(
@ -3126,7 +3140,7 @@ impl Node {
Err(e) => {
tracing::debug!(
cid = hex::encode(cid),
upstream = hex::encode(upstream_nid),
upstream = hex::encode(&upstream_nid),
error = %e,
"Manifest refresh from upstream failed"
);
@ -3277,18 +3291,16 @@ impl Node {
compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms)
}
/// Delete a blob with CDN notifications to upstream/downstream.
/// Delete a blob with CDN notifications to known holders.
pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
// Gather CDN peers before cleanup
let (downstream, upstream) = {
// Gather known holders before cleanup
let holders = {
let storage = self.storage.get().await;
let ds = storage.get_blob_downstream(cid).unwrap_or_default();
let up = storage.get_blob_upstream(cid).ok().flatten();
(ds, up)
storage.get_file_holders(cid).unwrap_or_default()
};
// Send CDN delete notices
self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await;
// Send CDN delete notices to all holders
self.network.send_blob_delete_notices(cid, &holders, None).await;
// Clean up local storage
{
@ -3576,15 +3588,9 @@ impl Node {
ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())],
timestamp_ms: now,
};
// propagate_engagement_diff targets all file_holders (flat set, max 5)
// which already subsumes what used to be upstream + downstream.
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
}
Ok(reaction)
@ -3691,14 +3697,6 @@ impl Node {
timestamp_ms: now,
};
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
}
Ok(comment)
@ -3735,14 +3733,6 @@ impl Node {
timestamp_ms: now,
};
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
}
Ok(())
}
@ -3776,14 +3766,6 @@ impl Node {
timestamp_ms: now,
};
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
}
Ok(())
}
@ -4005,14 +3987,6 @@ impl Node {
timestamp_ms: now,
};
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
Ok(())
}
@ -4127,14 +4101,6 @@ impl Node {
timestamp_ms: now,
};
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.get().await;
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
for (up, _prio) in upstreams {
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
Ok(())
}
@ -4367,10 +4333,10 @@ impl Node {
}
};
// Filter to under-replicated (< 2 downstream)
// Filter to under-replicated (< 2 holders)
let mut needs_replication = Vec::new();
for pid in &recent_ids {
match storage.get_post_downstream_count(pid) {
match storage.get_file_holder_count(pid) {
Ok(count) if count < 2 => {
needs_replication.push(*pid);
}

View file

@ -12,6 +12,26 @@ use crate::types::{
VisibilityIntent,
};
/// Direction for file_holders entries: whether we sent the file to this peer,
/// received it from them, or both. Not load-bearing for propagation decisions —
/// any holder can serve as a diff target — but retained for potential reuse.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HolderDirection {
Sent,
Received,
Both,
}
impl HolderDirection {
pub fn as_str(&self) -> &'static str {
match self {
HolderDirection::Sent => "sent",
HolderDirection::Received => "received",
HolderDirection::Both => "both",
}
}
}
/// Blob metadata for eviction scoring.
pub struct EvictionCandidate {
pub cid: [u8; 32],
@ -262,20 +282,6 @@ impl Storage {
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author);
CREATE TABLE IF NOT EXISTS blob_upstream (
cid BLOB PRIMARY KEY,
source_node_id BLOB NOT NULL,
source_addresses TEXT NOT NULL DEFAULT '[]',
stored_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS blob_downstream (
cid BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
peer_addresses TEXT NOT NULL DEFAULT '[]',
registered_at INTEGER NOT NULL,
PRIMARY KEY (cid, peer_node_id)
);
CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid);
CREATE TABLE IF NOT EXISTS group_keys (
group_id BLOB PRIMARY KEY,
circle_name TEXT NOT NULL,
@ -326,17 +332,6 @@ impl Storage {
last_seen_ms INTEGER NOT NULL,
success_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS post_downstream (
post_id BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
registered_at INTEGER NOT NULL,
PRIMARY KEY (post_id, peer_node_id)
);
CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id);
CREATE TABLE IF NOT EXISTS post_upstream (
post_id BLOB PRIMARY KEY,
peer_node_id BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS blob_headers (
post_id BLOB PRIMARY KEY,
author BLOB NOT NULL,
@ -389,7 +384,17 @@ impl Storage {
CREATE TABLE IF NOT EXISTS seen_messages (
partner_id BLOB PRIMARY KEY,
last_read_ms INTEGER NOT NULL DEFAULT 0
);",
);
CREATE TABLE IF NOT EXISTS file_holders (
file_id BLOB NOT NULL,
peer_id BLOB NOT NULL,
peer_addresses TEXT NOT NULL DEFAULT '[]',
last_interaction_ms INTEGER NOT NULL,
direction TEXT NOT NULL,
PRIMARY KEY (file_id, peer_id)
);
CREATE INDEX IF NOT EXISTS idx_file_holders_recency
ON file_holders(file_id, last_interaction_ms DESC);",
)?;
Ok(())
}
@ -543,16 +548,6 @@ impl Storage {
)?;
}
// Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration)
let has_blob_pref_tree = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_blob_pref_tree == 0 {
self.conn.execute_batch(
"ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';"
)?;
}
// Add public_visible column to profiles if missing (Phase D-4 migration)
let has_public_visible = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'"
@ -666,25 +661,18 @@ impl Storage {
)?;
}
// Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max)
let has_priority = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_priority == 0 {
self.conn.execute_batch(
"ALTER TABLE post_upstream RENAME TO post_upstream_old;
CREATE TABLE post_upstream (
post_id BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
registered_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (post_id, peer_node_id)
);
INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at)
SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old;
DROP TABLE post_upstream_old;"
)?;
}
// 0.6.1-beta: seed file_holders from legacy upstream/downstream tables
// before they're dropped. Idempotent — only fires on an empty
// file_holders table.
self.seed_file_holders_from_legacy()?;
// 0.6.1-beta: drop legacy directional tables — replaced by file_holders.
self.conn.execute_batch(
"DROP TABLE IF EXISTS blob_upstream;
DROP TABLE IF EXISTS blob_downstream;
DROP TABLE IF EXISTS post_upstream;
DROP TABLE IF EXISTS post_downstream;",
)?;
Ok(())
}
@ -2396,8 +2384,7 @@ impl Storage {
params![record.post_id.as_slice(), record.author.as_slice()],
)?;
if deleted > 0 {
self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?;
}
Ok(deleted > 0)
@ -3396,28 +3383,6 @@ impl Storage {
Ok(())
}
/// Update the preferred_tree JSON for a blob upstream entry.
pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> {
let json = serde_json::to_string(
&tree.iter().map(hex::encode).collect::<Vec<_>>()
)?;
self.conn.execute(
"UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2",
params![json, cid.as_slice()],
)?;
Ok(())
}
/// Get the preferred_tree for a blob upstream entry.
pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<NodeId>> {
let json: String = self.conn.query_row(
"SELECT preferred_tree FROM blob_upstream WHERE cid = ?1",
params![cid.as_slice()],
|row| row.get(0),
).unwrap_or_else(|_| "[]".to_string());
Ok(parse_anchors_json(&json))
}
// ---- Social Routes ----
/// Insert or update a social route entry.
@ -3889,10 +3854,10 @@ impl Storage {
GROUP BY post_id
) r ON b.post_id = r.post_id
LEFT JOIN (
SELECT cid, COUNT(*) as ds_count
FROM blob_downstream
GROUP BY cid
) d ON b.cid = d.cid"
SELECT file_id, COUNT(*) as ds_count
FROM file_holders
GROUP BY file_id
) d ON b.cid = d.file_id"
)?;
let rows = stmt.query_map(params![cutoff], |row| {
let cid_bytes: Vec<u8> = row.get(0)?;
@ -3946,11 +3911,10 @@ impl Storage {
Ok(count as u64)
}
/// Clean up all CDN metadata for a blob (manifests + upstream + downstream).
/// Clean up all CDN metadata for a blob (manifests + file_holders).
pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?;
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?;
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?;
Ok(())
}
@ -3969,12 +3933,6 @@ impl Storage {
Ok(cids)
}
/// Remove upstream tracking for a blob CID.
pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
Ok(())
}
pub fn post_count(&self) -> anyhow::Result<usize> {
let count: i64 = self
.conn
@ -4037,137 +3995,24 @@ impl Storage {
Ok(result)
}
/// Record the upstream source for a blob CID.
pub fn store_blob_upstream(
&self,
cid: &[u8; 32],
source_node_id: &NodeId,
source_addresses: &[String],
) -> anyhow::Result<()> {
let addrs_json = serde_json::to_string(source_addresses)?;
self.conn.execute(
"INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4",
params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()],
)?;
Ok(())
}
/// Get the upstream source for a blob CID: (node_id, addresses).
pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<Option<(NodeId, Vec<String>)>> {
let result = self.conn.query_row(
"SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1",
params![cid.as_slice()],
|row| {
let nid_bytes: Vec<u8> = row.get(0)?;
let addrs_json: String = row.get(1)?;
Ok((nid_bytes, addrs_json))
},
);
match result {
Ok((nid_bytes, addrs_json)) => {
let nid = blob_to_nodeid(nid_bytes)?;
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
Ok(Some((nid, addrs)))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Register a downstream peer for a blob CID. Returns false if already at 100 downstream.
pub fn add_blob_downstream(
&self,
cid: &[u8; 32],
peer_node_id: &NodeId,
peer_addresses: &[String],
) -> anyhow::Result<bool> {
let count = self.get_blob_downstream_count(cid)?;
if count >= 100 {
return Ok(false);
}
let addrs_json = serde_json::to_string(peer_addresses)?;
self.conn.execute(
"INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4",
params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()],
)?;
Ok(true)
}
/// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>.
pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
/// Get CIDs of manifests older than a cutoff. Callers look up holders
/// via file_holders to pick a refresh source.
pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result<Vec<[u8; 32]>> {
let mut stmt = self.conn.prepare(
"SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1"
)?;
let rows = stmt.query_map(params![cid.as_slice()], |row| {
let nid_bytes: Vec<u8> = row.get(0)?;
let addrs_json: String = row.get(1)?;
Ok((nid_bytes, addrs_json))
})?;
let mut result = Vec::new();
for row in rows {
let (nid_bytes, addrs_json) = row?;
let nid = blob_to_nodeid(nid_bytes)?;
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
result.push((nid, addrs));
}
Ok(result)
}
/// Count downstream peers for a blob CID.
pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result<u32> {
let count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1",
params![cid.as_slice()],
|row| row.get(0),
)?;
Ok(count as u32)
}
/// Remove a downstream peer for a blob CID.
pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2",
params![cid.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>.
pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result<Vec<([u8; 32], NodeId, Vec<String>)>> {
let mut stmt = self.conn.prepare(
"SELECT m.cid, u.source_node_id, u.source_addresses
FROM cdn_manifests m
LEFT JOIN blob_upstream u ON m.cid = u.cid
WHERE m.updated_at < ?1"
"SELECT cid FROM cdn_manifests WHERE updated_at < ?1",
)?;
let rows = stmt.query_map(params![older_than_ms as i64], |row| {
let cid_bytes: Vec<u8> = row.get(0)?;
let nid_bytes: Option<Vec<u8>> = row.get(1)?;
let addrs_json: Option<String> = row.get(2)?;
Ok((cid_bytes, nid_bytes, addrs_json))
Ok(cid_bytes)
})?;
let mut result = Vec::new();
let mut out = Vec::new();
for row in rows {
let (cid_bytes, nid_bytes, addrs_json) = row?;
let cid: [u8; 32] = match cid_bytes.try_into() {
Ok(c) => c,
Err(_) => continue,
};
let nid = match nid_bytes {
Some(b) => match blob_to_nodeid(b) {
Ok(n) => n,
Err(_) => continue,
},
None => continue,
};
let addrs: Vec<String> = addrs_json
.map(|j| serde_json::from_str(&j).unwrap_or_default())
.unwrap_or_default();
result.push((cid, nid, addrs));
let cid_bytes = row?;
if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) {
out.push(cid);
}
}
Ok(result)
Ok(out)
}
/// Get the 10 posts before and 10 posts after a reference timestamp for an author.
@ -4271,128 +4116,148 @@ impl Storage {
Ok(result)
}
// --- Engagement: post_downstream ---
// --- File holders (flat, per-file, LRU-capped at 5) ---
//
// A single table for PostId-keyed engagement propagation and CID-keyed
// manifest/blob propagation. Any 32-byte content-addressed file_id fits.
/// Register a peer as downstream for a post (max 100 per post).
/// Returns true if added, false if at capacity.
pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<bool> {
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
if count >= 100 {
return Ok(false);
}
self.conn.execute(
"INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING",
params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()],
)?;
Ok(true)
}
/// Get all downstream peers for a post.
pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result<Vec<NodeId>> {
let mut stmt = self.conn.prepare(
"SELECT peer_node_id FROM post_downstream WHERE post_id = ?1"
)?;
let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec<u8>>(0))?;
let mut result = Vec::new();
for row in rows {
if let Ok(nid) = blob_to_nodeid(row?) {
result.push(nid);
}
}
Ok(result)
}
/// Remove a downstream peer for a post.
pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
// --- Engagement: post_upstream (multi-upstream, 3 max) ---
/// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post.
pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> {
// Check current count
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
if count >= 3 {
return Ok(()); // Already at cap
}
/// Upsert a holder for a file. Bumps last_interaction_ms to now and
/// enforces an LRU cap of 5 holders per file.
pub fn touch_file_holder(
&self,
file_id: &[u8; 32],
peer_id: &NodeId,
peer_addresses: &[String],
direction: HolderDirection,
) -> anyhow::Result<()> {
let addrs_json = serde_json::to_string(peer_addresses)?;
let now = now_ms();
let new_dir = direction.as_str();
// Upsert. If the row exists with a different direction, promote to "both".
self.conn.execute(
"INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at)
VALUES (?1, ?2, ?3, ?4)",
params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now],
"INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(file_id, peer_id) DO UPDATE SET
peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END,
last_interaction_ms = ?4,
direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END",
params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir],
)?;
// Enforce LRU cap of 5. Oldest get dropped.
self.conn.execute(
"DELETE FROM file_holders
WHERE file_id = ?1
AND peer_id NOT IN (
SELECT peer_id FROM file_holders
WHERE file_id = ?1
ORDER BY last_interaction_ms DESC
LIMIT 5
)",
params![file_id.as_slice()],
)?;
Ok(())
}
/// Get all upstream peers for a post, ordered by priority ASC (0 = primary).
pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result<Vec<(NodeId, u8)>> {
let mut stmt = self.conn.prepare(
"SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC"
)?;
let rows = stmt.query_map(params![post_id.as_slice()], |row| {
let bytes: Vec<u8> = row.get(0)?;
let prio: i64 = row.get(1)?;
Ok((bytes, prio as u8))
})?;
let mut result = Vec::new();
for row in rows {
let (bytes, prio) = row?;
if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) {
result.push((nid, prio));
}
}
Ok(result)
}
/// Get the primary (lowest priority) upstream peer for a post.
/// Backward-compatible wrapper for code that only needs a single upstream.
pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result<Option<NodeId>> {
let upstreams = self.get_post_upstreams(post_id)?;
Ok(upstreams.into_iter().next().map(|(nid, _)| nid))
}
/// Remove a specific upstream peer for a post.
pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Promote an upstream peer to primary (priority 0), pushing others up.
pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
// Shift all priorities up by 1
self.conn.execute(
"UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1",
params![post_id.as_slice()],
)?;
// Set the promoted peer to priority 0
self.conn.execute(
"UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Count downstream peers for a post.
pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result<u32> {
/// Count file holders (bounded at 5 by touch_file_holder's LRU cap).
pub fn get_file_holder_count(&self, file_id: &[u8; 32]) -> anyhow::Result<u32> {
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
"SELECT COUNT(*) FROM file_holders WHERE file_id = ?1",
)?.query_row(params![file_id.as_slice()], |row| row.get(0))?;
Ok(count as u32)
}
/// Return the up-to-5 most recently interacted holders of a file.
pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
let mut stmt = self.conn.prepare(
"SELECT peer_id, peer_addresses FROM file_holders
WHERE file_id = ?1
ORDER BY last_interaction_ms DESC
LIMIT 5",
)?;
let rows = stmt.query_map(params![file_id.as_slice()], |row| {
let peer_bytes: Vec<u8> = row.get(0)?;
let addrs_json: String = row.get(1)?;
Ok((peer_bytes, addrs_json))
})?;
let mut out = Vec::new();
for row in rows {
let (peer_bytes, addrs_json) = row?;
if peer_bytes.len() != 32 { continue; }
let mut peer = [0u8; 32];
peer.copy_from_slice(&peer_bytes);
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
out.push((NodeId::from(peer), addrs));
}
Ok(out)
}
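This is the read behind the flat fan-outs (manifest pushes, delete notices, engagement diffs). A sketch under assumed names; send_manifest_push, self_id, and manifest are not from this diff:

// Fan a manifest push out to the up-to-5 most-recent holders, best-effort.
for (peer, addrs) in store.get_file_holders(&cid)? {
    if peer == self_id { continue; } // never echo back to ourselves
    let _ = send_manifest_push(&peer, &addrs, &manifest).await; // ignore per-peer failures
}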
/// Remove all holders for a file (e.g. on post/blob deletion).
pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM file_holders WHERE file_id = ?1",
params![file_id.as_slice()],
)?;
Ok(())
}
/// Remove a single peer's holder entry for a file.
pub fn remove_file_holder(&self, file_id: &[u8; 32], peer_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM file_holders WHERE file_id = ?1 AND peer_id = ?2",
params![file_id.as_slice(), peer_id.as_slice()],
)?;
Ok(())
}
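One plausible caller, per the Phase 2d notes on delete-notice handling; the notice shape here is an assumption:

// A peer told us it dropped the file (or repeatedly failed to serve it):
// forget it as a holder so future fan-outs stop targeting it.
store.remove_file_holder(&notice.cid, &notice.sender)?;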
/// One-time migration: seed file_holders from the legacy upstream/downstream
/// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder
/// sets. Idempotent: an early return skips the seed once file_holders has any
/// rows, and INSERT OR IGNORE (keyed on the PRIMARY KEY) makes partial
/// re-runs harmless. Tables missing on fresh installs are skipped.
pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> {
// Skip if file_holders already populated (idempotent re-run protection).
let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")?
.query_row([], |row| row.get(0))?;
if existing > 0 {
return Ok(());
}
let now = now_ms() as i64;
let table_exists = |name: &str| -> anyhow::Result<bool> {
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
)?.query_row(params![name], |row| row.get(0))?;
Ok(count > 0)
};
if table_exists("post_upstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream",
params![now],
)?;
}
if table_exists("post_downstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream",
params![now],
)?;
}
if table_exists("blob_upstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream",
params![now],
)?;
}
if table_exists("blob_downstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream",
params![now],
)?;
}
Ok(())
}
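The Phase 2e commit message fixes the first-launch ordering: seed first, then drop. A sketch of that sequence; the migrate_to_0_6_1 wrapper and direct conn access are assumptions, not code from this diff:

fn migrate_to_0_6_1(store: &Storage) -> anyhow::Result<()> {
    // Copy legacy rows into file_holders; a no-op on fresh installs
    // (missing tables are skipped) and on re-runs (COUNT(*) guard).
    store.seed_file_holders_from_legacy()?;
    // Only after seeding is it safe to drop the four legacy tables.
    for t in ["post_upstream", "post_downstream", "blob_upstream", "blob_downstream"] {
        store.conn.execute(&format!("DROP TABLE IF EXISTS {t}"), [])?;
    }
    Ok(())
}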
// --- Engagement: reactions ---
/// Store a reaction (upsert by reactor+post_id+emoji).
@@ -5301,60 +5166,6 @@ mod tests {
assert_eq!(manifests[0].0, cid);
}
#[test]
fn blob_upstream_crud() {
let s = temp_storage();
let cid = [42u8; 32];
let source = make_node_id(1);
let addrs = vec!["10.0.0.1:4433".to_string()];
s.store_blob_upstream(&cid, &source, &addrs).unwrap();
let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap();
assert_eq!(nid, source);
assert_eq!(got_addrs, addrs);
// Missing
assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none());
// Update
let source2 = make_node_id(2);
s.store_blob_upstream(&cid, &source2, &[]).unwrap();
let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap();
assert_eq!(nid, source2);
}
#[test]
fn blob_downstream_crud_and_limit() {
let s = temp_storage();
let cid = [42u8; 32];
// Add downstream peers
for i in 0..100u8 {
let peer = make_node_id(i);
let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap();
assert!(ok, "should accept peer {}", i);
}
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100);
// 101st should be rejected
let peer_101 = make_node_id(200);
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
assert!(!ok, "should reject 101st downstream");
// Get all downstream
let downstream = s.get_blob_downstream(&cid).unwrap();
assert_eq!(downstream.len(), 100);
// Remove one
s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap();
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99);
// Now adding one more should work
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
assert!(ok, "should accept after removal");
}
#[test]
fn blob_pin_unpin() {
let s = temp_storage();
@@ -5438,18 +5249,15 @@ mod tests {
let peer = make_node_id(2);
s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap();
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
s.add_blob_downstream(&cid, &peer, &["10.0.0.2:4433".to_string()]).unwrap();
s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap();
assert!(s.get_cdn_manifest(&cid).unwrap().is_some());
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1);
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1);
s.cleanup_cdn_for_blob(&cid).unwrap();
assert!(s.get_cdn_manifest(&cid).unwrap().is_none());
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0);
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0);
}
#[test]
@@ -5469,18 +5277,6 @@ mod tests {
assert!(cids.contains(&cid2));
}
#[test]
fn remove_blob_upstream() {
let s = temp_storage();
let cid = [42u8; 32];
let peer = make_node_id(1);
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
s.remove_blob_upstream(&cid).unwrap();
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
}
#[test]
fn author_post_neighborhood() {
@@ -5840,24 +5636,6 @@ mod tests {
assert_eq!(got2.preferred_tree.len(), 2);
}
#[test]
fn blob_upstream_preferred_tree() {
let s = temp_storage();
let cid = [42u8; 32];
let source = make_node_id(1);
s.store_blob_upstream(&cid, &source, &[]).unwrap();
// Initially empty
let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap();
assert!(tree.is_empty());
// Update
let nodes = vec![make_node_id(10), make_node_id(11)];
s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap();
let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap();
assert_eq!(tree2.len(), 2);
}
// ---- Circle Profile tests ----
#[test]
@@ -6058,32 +5836,39 @@ mod tests {
// --- Engagement tests ---
#[test]
fn post_downstream_crud() {
fn file_holders_lru_cap() {
let s = temp_storage();
let post_id = make_post_id(1);
let peer1 = make_node_id(1);
let peer2 = make_node_id(2);
// Add downstream peers
assert!(s.add_post_downstream(&post_id, &peer1).unwrap());
assert!(s.add_post_downstream(&post_id, &peer2).unwrap());
let downstream = s.get_post_downstream(&post_id).unwrap();
assert_eq!(downstream.len(), 2);
assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2);
// Remove one
s.remove_post_downstream(&post_id, &peer1).unwrap();
assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1);
// Capacity limit
let big_post = make_post_id(99);
for i in 0..100u8 {
assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap());
let file = [42u8; 32];
// Sleep between inserts so last_interaction_ms actually differs (ms resolution).
for i in 0..7u8 {
s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap();
std::thread::sleep(std::time::Duration::from_millis(2));
}
assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100);
// 101st should fail
assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap());
// Only 5 most-recent survive
assert_eq!(s.get_file_holder_count(&file).unwrap(), 5);
let holders = s.get_file_holders(&file).unwrap();
assert_eq!(holders.len(), 5);
let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect();
// Oldest two (i=0, i=1) got evicted; most recent (i=6) survives
assert!(!kept.contains(&make_node_id(0)));
assert!(!kept.contains(&make_node_id(1)));
assert!(kept.contains(&make_node_id(6)));
}
#[test]
fn file_holders_direction_promotion() {
let s = temp_storage();
let file = [42u8; 32];
let peer = make_node_id(1);
s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap();
// Re-touching with the opposite direction should promote to "both".
s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap();
let dir: String = s.conn.query_row(
"SELECT direction FROM file_holders WHERE file_id = ?1 AND peer_id = ?2",
rusqlite::params![file.as_slice(), peer.as_slice()],
|row| row.get(0),
).unwrap();
assert_eq!(dir, "both");
}
#[test]


@@ -132,8 +132,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
if let Some(author) = author_id {
holders.push(author);
}
if let Ok(downstream) = store.get_post_downstream(&post_id) {
for peer in downstream {
if let Ok(file_holders) = store.get_file_holders(&post_id) {
for (peer, _addrs) in file_holders {
if !holders.contains(&peer) {
holders.push(peer);
}