From 0b2b4f5a687522f7423d59455f1308fa6a7e63d0 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 20:56:28 -0400 Subject: [PATCH] Phase 2b (0.6.1-beta): dual-write file_holders on all propagation events Populate the flat holder set alongside every existing post_upstream / post_downstream / blob_upstream / blob_downstream write so that read paths can be switched over in the next commit without losing continuity. Events wired: - Pull sync receive (3 paths in connection.rs) - PostPush receive (public posts only after Phase 1) - PostFetch via notification (discovery pull) - PostDownstreamRegister - Replication accept (downstream) + replication-driven pull (upstream) - Attachment upstream recorded after replication blob fetch - ManifestPush receive (remote is a CID holder) - ManifestPush send (downstream peer becomes CID holder) - Blob fetch fallback (upstream lateral sources) Direction is tracked as Received vs Sent. Not load-bearing for routing; retained for future use. LRU cap of 5 enforced on every touch. Legacy upstream/downstream writes remain in place; they'll go away together with the table drops at the end of this phase. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 67 +++++++++++++++++++++++++++++++++++ crates/core/src/network.rs | 10 +++++- crates/core/src/node.rs | 12 +++++++ 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 6b73b03..7c8b7b3 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1395,6 +1395,12 @@ impl ConnectionManager { for pid in &new_post_ids { let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let _ = s.add_post_upstream(pid, peer_id, prio); + let _ = s.touch_file_holder( + pid, + peer_id, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = s.update_follow_last_sync(author, now_ms); @@ -1942,6 +1948,12 @@ impl ConnectionManager { for pid in &new_post_ids { let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(pid, from, prio); + let _ = storage.touch_file_holder( + pid, + from, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = storage.update_follow_last_sync(author, now_ms); @@ -2036,6 +2048,12 @@ impl ConnectionManager { for pid in &new_post_ids { let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(pid, peer_id, prio); + let _ = storage.touch_file_holder( + pid, + peer_id, + &[], + crate::storage::HolderDirection::Received, + ); } for author in &synced_authors { let _ = storage.update_follow_last_sync(author, now_ms); @@ -4980,6 +4998,12 @@ impl ConnectionManager { ); let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); + let _ = storage.touch_file_holder( + &push.post.id, + &remote_node_id, + &[], + crate::storage::HolderDirection::Received, + ); info!( peer = hex::encode(remote_node_id), post_id = hex::encode(push.post.id), @@ -5073,6 +5097,13 @@ impl ConnectionManager { &entry.manifest.author_manifest.author, entry.manifest.author_manifest.updated_at, ); + // Remote peer pushed us this manifest → they hold the file. + let _ = storage.touch_file_holder( + &entry.cid, + &remote_node_id, + &[], + crate::storage::HolderDirection::Received, + ); stored_entries.push(entry.clone()); } // Gather downstream peers for relay before dropping locks @@ -5188,6 +5219,12 @@ impl ConnectionManager { if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio); + let _ = storage.touch_file_holder( + &sync_post.id, + &sender_id, + &[], + crate::storage::HolderDirection::Received, + ); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -5448,6 +5485,12 @@ impl ConnectionManager { let cm = conn_mgr.lock().await; let storage = cm.storage.get().await; let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id); + let _ = storage.touch_file_holder( + &payload.post_id, + &remote_node_id, + &[], + crate::storage::HolderDirection::Sent, + ); drop(storage); trace!( peer = hex::encode(remote_node_id), @@ -5708,6 +5751,12 @@ impl ConnectionManager { }); let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() { let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false); + let _ = storage.touch_file_holder( + &payload.cid, + &remote_node_id, + &payload.requester_addresses, + crate::storage::HolderDirection::Sent, + ); if ok { (true, vec![]) } else { let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default(); let redirects: Vec = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect(); @@ -6085,6 +6134,12 @@ impl ConnectionManager { // Register as downstream for all accepted posts for pid in &acc { let _ = storage.add_post_downstream(pid, &remote_node_id); + let _ = storage.touch_file_holder( + pid, + &remote_node_id, + &[], + crate::storage::HolderDirection::Sent, + ); } (acc, rej, to_pull) @@ -6137,6 +6192,12 @@ impl ConnectionManager { let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); let _ = storage.add_post_upstream(&sp.id, &sender, prio); + let _ = storage.touch_file_holder( + &sp.id, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); let blob_store = cm.blob_store.clone(); drop(storage); drop(cm); @@ -6164,6 +6225,12 @@ impl ConnectionManager { let storage = cm.storage.get().await; let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes); let _ = storage.add_post_upstream(&att.cid, &sender, 0); + let _ = storage.touch_file_holder( + &att.cid, + &sender, + &[], + crate::storage::HolderDirection::Received, + ); } Ok(()) }.await; diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 32273cd..55f4124 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -1032,9 +1032,17 @@ impl Network { }], }; let mut sent = 0; - for (ds_nid, _) in &downstream { + for (ds_nid, ds_addrs) in &downstream { if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() { sent += 1; + // We pushed this file's manifest → downstream peer now holds it. + let storage = self.storage.get().await; + let _ = storage.touch_file_holder( + cid, + ds_nid, + ds_addrs, + crate::storage::HolderDirection::Sent, + ); } } sent diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 8e89b80..4b328ac 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1351,6 +1351,12 @@ impl Node { .map(|m| m.host_addresses.clone()) .unwrap_or_default(); let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs); + let _ = storage.touch_file_holder( + cid, + from_peer, + &source_addrs, + crate::storage::HolderDirection::Received, + ); } Ok(data) } @@ -1413,6 +1419,12 @@ impl Node { } } let _ = storage.store_blob_upstream(cid, &lateral, &[]); + let _ = storage.touch_file_holder( + cid, + &lateral, + &[], + crate::storage::HolderDirection::Received, + ); return Ok(Some(data)); } Ok((None, response)) => {