Phase 2b (0.6.1-beta): dual-write file_holders on all propagation events

Populate the flat holder set alongside every existing post_upstream /
post_downstream / blob_upstream / blob_downstream write so that read
paths can be switched over in the next commit without losing continuity.

Events wired:
- Pull sync receive (3 paths in connection.rs)
- PostPush receive (public posts only after Phase 1)
- PostFetch via notification (discovery pull)
- PostDownstreamRegister
- Replication accept (downstream) + replication-driven pull (upstream)
- Attachment upstream recorded after replication blob fetch
- ManifestPush receive (remote is a CID holder)
- ManifestPush send (downstream peer becomes CID holder)
- Blob fetch fallback (upstream lateral sources)

Direction is tracked as Received vs Sent. It is not load-bearing for routing;
it is retained for future use. An LRU cap of 5 is enforced on every touch.

Legacy upstream/downstream writes remain in place; they will be removed
along with the table drops at the end of this phase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-04-21 20:56:28 -04:00
parent 1658762a68
commit 0b2b4f5a68
3 changed files with 88 additions and 1 deletions

View file

@ -1395,6 +1395,12 @@ impl ConnectionManager {
for pid in &new_post_ids { for pid in &new_post_ids {
let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = s.add_post_upstream(pid, peer_id, prio); let _ = s.add_post_upstream(pid, peer_id, prio);
let _ = s.touch_file_holder(
pid,
peer_id,
&[],
crate::storage::HolderDirection::Received,
);
} }
for author in &synced_authors { for author in &synced_authors {
let _ = s.update_follow_last_sync(author, now_ms); let _ = s.update_follow_last_sync(author, now_ms);
@ -1942,6 +1948,12 @@ impl ConnectionManager {
for pid in &new_post_ids { for pid in &new_post_ids {
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(pid, from, prio); let _ = storage.add_post_upstream(pid, from, prio);
let _ = storage.touch_file_holder(
pid,
from,
&[],
crate::storage::HolderDirection::Received,
);
} }
for author in &synced_authors { for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms); let _ = storage.update_follow_last_sync(author, now_ms);
@ -2036,6 +2048,12 @@ impl ConnectionManager {
for pid in &new_post_ids { for pid in &new_post_ids {
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(pid, peer_id, prio); let _ = storage.add_post_upstream(pid, peer_id, prio);
let _ = storage.touch_file_holder(
pid,
peer_id,
&[],
crate::storage::HolderDirection::Received,
);
} }
for author in &synced_authors { for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms); let _ = storage.update_follow_last_sync(author, now_ms);
@ -4980,6 +4998,12 @@ impl ConnectionManager {
); );
let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio);
let _ = storage.touch_file_holder(
&push.post.id,
&remote_node_id,
&[],
crate::storage::HolderDirection::Received,
);
info!( info!(
peer = hex::encode(remote_node_id), peer = hex::encode(remote_node_id),
post_id = hex::encode(push.post.id), post_id = hex::encode(push.post.id),
@ -5073,6 +5097,13 @@ impl ConnectionManager {
&entry.manifest.author_manifest.author, &entry.manifest.author_manifest.author,
entry.manifest.author_manifest.updated_at, entry.manifest.author_manifest.updated_at,
); );
// Remote peer pushed us this manifest → they hold the file.
let _ = storage.touch_file_holder(
&entry.cid,
&remote_node_id,
&[],
crate::storage::HolderDirection::Received,
);
stored_entries.push(entry.clone()); stored_entries.push(entry.clone());
} }
// Gather downstream peers for relay before dropping locks // Gather downstream peers for relay before dropping locks
@ -5188,6 +5219,12 @@ impl ConnectionManager {
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0); let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio); let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
let _ = storage.touch_file_holder(
&sync_post.id,
&sender_id,
&[],
crate::storage::HolderDirection::Received,
);
let now = std::time::SystemTime::now() let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
@ -5448,6 +5485,12 @@ impl ConnectionManager {
let cm = conn_mgr.lock().await; let cm = conn_mgr.lock().await;
let storage = cm.storage.get().await; let storage = cm.storage.get().await;
let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id); let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id);
let _ = storage.touch_file_holder(
&payload.post_id,
&remote_node_id,
&[],
crate::storage::HolderDirection::Sent,
);
drop(storage); drop(storage);
trace!( trace!(
peer = hex::encode(remote_node_id), peer = hex::encode(remote_node_id),
@ -5708,6 +5751,12 @@ impl ConnectionManager {
}); });
let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() { let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() {
let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false); let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false);
let _ = storage.touch_file_holder(
&payload.cid,
&remote_node_id,
&payload.requester_addresses,
crate::storage::HolderDirection::Sent,
);
if ok { (true, vec![]) } else { if ok { (true, vec![]) } else {
let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default(); let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default();
let redirects: Vec<PeerWithAddress> = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect(); let redirects: Vec<PeerWithAddress> = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect();
@ -6085,6 +6134,12 @@ impl ConnectionManager {
// Register as downstream for all accepted posts // Register as downstream for all accepted posts
for pid in &acc { for pid in &acc {
let _ = storage.add_post_downstream(pid, &remote_node_id); let _ = storage.add_post_downstream(pid, &remote_node_id);
let _ = storage.touch_file_holder(
pid,
&remote_node_id,
&[],
crate::storage::HolderDirection::Sent,
);
} }
(acc, rej, to_pull) (acc, rej, to_pull)
@ -6137,6 +6192,12 @@ impl ConnectionManager {
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sp.id, &sender, prio); let _ = storage.add_post_upstream(&sp.id, &sender, prio);
let _ = storage.touch_file_holder(
&sp.id,
&sender,
&[],
crate::storage::HolderDirection::Received,
);
let blob_store = cm.blob_store.clone(); let blob_store = cm.blob_store.clone();
drop(storage); drop(storage);
drop(cm); drop(cm);
@ -6164,6 +6225,12 @@ impl ConnectionManager {
let storage = cm.storage.get().await; let storage = cm.storage.get().await;
let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes); let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes);
let _ = storage.add_post_upstream(&att.cid, &sender, 0); let _ = storage.add_post_upstream(&att.cid, &sender, 0);
let _ = storage.touch_file_holder(
&att.cid,
&sender,
&[],
crate::storage::HolderDirection::Received,
);
} }
Ok(()) Ok(())
}.await; }.await;

View file

@ -1032,9 +1032,17 @@ impl Network {
}], }],
}; };
let mut sent = 0; let mut sent = 0;
for (ds_nid, _) in &downstream { for (ds_nid, ds_addrs) in &downstream {
if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() { if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() {
sent += 1; sent += 1;
// We pushed this file's manifest → downstream peer now holds it.
let storage = self.storage.get().await;
let _ = storage.touch_file_holder(
cid,
ds_nid,
ds_addrs,
crate::storage::HolderDirection::Sent,
);
} }
} }
sent sent

View file

@ -1351,6 +1351,12 @@ impl Node {
.map(|m| m.host_addresses.clone()) .map(|m| m.host_addresses.clone())
.unwrap_or_default(); .unwrap_or_default();
let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs); let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs);
let _ = storage.touch_file_holder(
cid,
from_peer,
&source_addrs,
crate::storage::HolderDirection::Received,
);
} }
Ok(data) Ok(data)
} }
@ -1413,6 +1419,12 @@ impl Node {
} }
} }
let _ = storage.store_blob_upstream(cid, &lateral, &[]); let _ = storage.store_blob_upstream(cid, &lateral, &[]);
let _ = storage.touch_file_holder(
cid,
&lateral,
&[],
crate::storage::HolderDirection::Received,
);
return Ok(Some(data)); return Ok(Some(data));
} }
Ok((None, response)) => { Ok((None, response)) => {