Compare commits
6 commits
921a0ec40a
...
5d9ba22427
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d9ba22427 | ||
|
|
60463d1817 | ||
|
|
3a0d2e93ab | ||
|
|
0b2b4f5a68 | ||
|
|
1658762a68 | ||
|
|
e6265b52b6 |
6 changed files with 470 additions and 736 deletions
|
|
@ -1393,8 +1393,12 @@ impl ConnectionManager {
|
|||
{
|
||||
let s = storage.get().await;
|
||||
for pid in &new_post_ids {
|
||||
let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
|
||||
let _ = s.add_post_upstream(pid, peer_id, prio);
|
||||
let _ = s.touch_file_holder(
|
||||
pid,
|
||||
peer_id,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
}
|
||||
for author in &synced_authors {
|
||||
let _ = s.update_follow_last_sync(author, now_ms);
|
||||
|
|
@ -1940,8 +1944,12 @@ impl ConnectionManager {
|
|||
{
|
||||
let storage = self.storage.get().await;
|
||||
for pid in &new_post_ids {
|
||||
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
|
||||
let _ = storage.add_post_upstream(pid, from, prio);
|
||||
let _ = storage.touch_file_holder(
|
||||
pid,
|
||||
from,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
}
|
||||
for author in &synced_authors {
|
||||
let _ = storage.update_follow_last_sync(author, now_ms);
|
||||
|
|
@ -2034,8 +2042,12 @@ impl ConnectionManager {
|
|||
{
|
||||
let storage = self.storage.get().await;
|
||||
for pid in &new_post_ids {
|
||||
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
|
||||
let _ = storage.add_post_upstream(pid, peer_id, prio);
|
||||
let _ = storage.touch_file_holder(
|
||||
pid,
|
||||
peer_id,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
}
|
||||
for author in &synced_authors {
|
||||
let _ = storage.update_follow_last_sync(author, now_ms);
|
||||
|
|
@ -2810,13 +2822,9 @@ impl ConnectionManager {
|
|||
if store.get_post_with_visibility(post_id).ok().flatten().is_some() {
|
||||
Some(self.our_node_id)
|
||||
} else {
|
||||
// CDN tree: do any of our downstream hosts have it?
|
||||
let downstream = store.get_post_downstream(post_id).unwrap_or_default();
|
||||
if !downstream.is_empty() {
|
||||
Some(downstream[0])
|
||||
} else {
|
||||
None
|
||||
}
|
||||
// Any known holder of this post?
|
||||
let holders = store.get_file_holders(post_id).unwrap_or_default();
|
||||
holders.first().map(|(nid, _)| *nid)
|
||||
}
|
||||
};
|
||||
post_holder = found;
|
||||
|
|
@ -2830,9 +2838,9 @@ impl ConnectionManager {
|
|||
// Check CDN: do we know who has it via blob post ownership?
|
||||
let store = self.storage.get().await;
|
||||
if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) {
|
||||
let downstream = store.get_post_downstream(&pid).unwrap_or_default();
|
||||
if !downstream.is_empty() {
|
||||
blob_holder = Some(downstream[0]);
|
||||
let holders = store.get_file_holders(&pid).unwrap_or_default();
|
||||
if let Some((nid, _)) = holders.first() {
|
||||
blob_holder = Some(*nid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -4871,7 +4879,7 @@ impl ConnectionManager {
|
|||
let cm = conn_mgr.lock().await;
|
||||
|
||||
// Collect blob CIDs + CDN peers before async work
|
||||
let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = Vec::new();
|
||||
let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>)> = Vec::new();
|
||||
{
|
||||
let storage = cm.storage.get().await;
|
||||
for dr in &payload.records {
|
||||
|
|
@ -4879,9 +4887,8 @@ impl ConnectionManager {
|
|||
// Collect blobs for CDN cleanup before deleting
|
||||
let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default();
|
||||
for cid in blob_cids {
|
||||
let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
|
||||
let upstream = storage.get_blob_upstream(&cid).ok().flatten();
|
||||
blob_cleanup.push((cid, downstream, upstream));
|
||||
let holders = storage.get_file_holders(&cid).unwrap_or_default();
|
||||
blob_cleanup.push((cid, holders));
|
||||
}
|
||||
let _ = storage.store_delete(dr);
|
||||
let _ = storage.apply_delete(dr);
|
||||
|
|
@ -4897,18 +4904,11 @@ impl ConnectionManager {
|
|||
|
||||
// Gather connections for CDN delete notices under lock, then send outside
|
||||
let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new();
|
||||
for (cid, downstream, upstream) in &blob_cleanup {
|
||||
let upstream_info = upstream.as_ref().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs.clone() });
|
||||
let ds_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: upstream_info };
|
||||
for (ds_nid, _) in downstream {
|
||||
if let Some(pc) = cm.connections_ref().get(ds_nid) {
|
||||
delete_notices.push((pc.connection.clone(), ds_payload.clone()));
|
||||
}
|
||||
}
|
||||
if let Some((up_nid, _)) = upstream {
|
||||
let up_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None };
|
||||
if let Some(pc) = cm.connections_ref().get(up_nid) {
|
||||
delete_notices.push((pc.connection.clone(), up_payload));
|
||||
for (cid, holders) in &blob_cleanup {
|
||||
let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None };
|
||||
for (peer, _addrs) in holders {
|
||||
if let Some(pc) = cm.connections_ref().get(peer) {
|
||||
delete_notices.push((pc.connection.clone(), payload.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -4958,24 +4958,38 @@ impl ConnectionManager {
|
|||
}
|
||||
MessageType::PostPush => {
|
||||
let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?;
|
||||
let cm = conn_mgr.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
if !storage.is_deleted(&push.post.id)?
|
||||
&& storage.get_post(&push.post.id)?.is_none()
|
||||
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
|
||||
{
|
||||
let _ = storage.store_post_with_visibility(
|
||||
&push.post.id,
|
||||
&push.post.post,
|
||||
&push.post.visibility,
|
||||
);
|
||||
let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0);
|
||||
let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio);
|
||||
info!(
|
||||
// Encrypted posts are no longer accepted via direct push — they propagate
|
||||
// via the CDN to eliminate the sender→recipient traffic signal.
|
||||
if !matches!(push.post.visibility, crate::types::PostVisibility::Public) {
|
||||
debug!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
post_id = hex::encode(push.post.id),
|
||||
"Received direct post push"
|
||||
"Ignoring non-public PostPush"
|
||||
);
|
||||
} else {
|
||||
let cm = conn_mgr.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
if !storage.is_deleted(&push.post.id)?
|
||||
&& storage.get_post(&push.post.id)?.is_none()
|
||||
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
|
||||
{
|
||||
let _ = storage.store_post_with_visibility(
|
||||
&push.post.id,
|
||||
&push.post.post,
|
||||
&push.post.visibility,
|
||||
);
|
||||
let _ = storage.touch_file_holder(
|
||||
&push.post.id,
|
||||
&remote_node_id,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
info!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
post_id = hex::encode(push.post.id),
|
||||
"Received direct post push"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
MessageType::AudienceRequest => {
|
||||
|
|
@ -5063,17 +5077,24 @@ impl ConnectionManager {
|
|||
&entry.manifest.author_manifest.author,
|
||||
entry.manifest.author_manifest.updated_at,
|
||||
);
|
||||
// Remote peer pushed us this manifest → they hold the file.
|
||||
let _ = storage.touch_file_holder(
|
||||
&entry.cid,
|
||||
&remote_node_id,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
stored_entries.push(entry.clone());
|
||||
}
|
||||
// Gather downstream peers for relay before dropping locks
|
||||
// Gather file holders for relay before dropping locks
|
||||
let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new();
|
||||
for entry in &stored_entries {
|
||||
let downstream = storage.get_blob_downstream(&entry.cid).unwrap_or_default();
|
||||
for (ds_nid, _) in downstream {
|
||||
if ds_nid == remote_node_id {
|
||||
let holders = storage.get_file_holders(&entry.cid).unwrap_or_default();
|
||||
for (peer, _addrs) in holders {
|
||||
if peer == remote_node_id {
|
||||
continue;
|
||||
}
|
||||
relay_targets.push((ds_nid, crate::protocol::ManifestPushPayload {
|
||||
relay_targets.push((peer, crate::protocol::ManifestPushPayload {
|
||||
manifests: vec![entry.clone()],
|
||||
}));
|
||||
}
|
||||
|
|
@ -5176,8 +5197,12 @@ impl ConnectionManager {
|
|||
let cm = cm_arc.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
|
||||
let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
|
||||
let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
|
||||
let _ = storage.touch_file_holder(
|
||||
&sync_post.id,
|
||||
&sender_id,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
|
|
@ -5268,32 +5293,14 @@ impl ConnectionManager {
|
|||
let storage = cm.storage.get().await;
|
||||
let cid = payload.cid;
|
||||
|
||||
// Check if sender was our upstream for this blob
|
||||
let was_upstream = storage.get_blob_upstream(&cid).ok().flatten()
|
||||
.map(|(nid, _)| nid == remote_node_id)
|
||||
.unwrap_or(false);
|
||||
|
||||
if was_upstream {
|
||||
// Sender was our upstream — clear it
|
||||
let _ = storage.remove_blob_upstream(&cid);
|
||||
|
||||
// If they provided their upstream, store it as our new upstream
|
||||
if let Some(ref new_up) = payload.upstream_node {
|
||||
if let Ok(nid_bytes) = hex::decode(&new_up.n) {
|
||||
if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
|
||||
let _ = storage.store_blob_upstream(&cid, &nid, &new_up.a);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Sender was our downstream — remove them
|
||||
let _ = storage.remove_blob_downstream(&cid, &remote_node_id);
|
||||
}
|
||||
// Flat-holder model: drop the sender as a holder of this file.
|
||||
// The author's DeleteRecord (separate signed message) is what
|
||||
// triggers the actual blob removal for followers.
|
||||
let _ = storage.remove_file_holder(&cid, &remote_node_id);
|
||||
|
||||
info!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
cid = hex::encode(cid),
|
||||
was_upstream,
|
||||
"Received blob delete notice"
|
||||
);
|
||||
}
|
||||
|
|
@ -5437,7 +5444,12 @@ impl ConnectionManager {
|
|||
let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?;
|
||||
let cm = conn_mgr.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id);
|
||||
let _ = storage.touch_file_holder(
|
||||
&payload.post_id,
|
||||
&remote_node_id,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Sent,
|
||||
);
|
||||
drop(storage);
|
||||
trace!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
|
|
@ -5692,15 +5704,28 @@ impl ConnectionManager {
|
|||
let storage = storage.get().await;
|
||||
let manifest: Option<crate::types::CdnManifest> = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| {
|
||||
if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
|
||||
let ds_count = storage.get_blob_downstream_count(&payload.cid).unwrap_or(0);
|
||||
let ds_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0);
|
||||
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
|
||||
} else { serde_json::from_str(&json).ok() }
|
||||
});
|
||||
let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() {
|
||||
let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false);
|
||||
if ok { (true, vec![]) } else {
|
||||
let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default();
|
||||
let redirects: Vec<PeerWithAddress> = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect();
|
||||
let prior_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0);
|
||||
let _ = storage.touch_file_holder(
|
||||
&payload.cid,
|
||||
&remote_node_id,
|
||||
&payload.requester_addresses,
|
||||
crate::storage::HolderDirection::Sent,
|
||||
);
|
||||
// If we already had 5 holders before adding this one, the
|
||||
// requester should consult them too for CDN lookups.
|
||||
if prior_count < 5 {
|
||||
(true, vec![])
|
||||
} else {
|
||||
let holders = storage.get_file_holders(&payload.cid).unwrap_or_default();
|
||||
let redirects: Vec<PeerWithAddress> = holders.into_iter()
|
||||
.filter(|(nid, _)| *nid != remote_node_id)
|
||||
.map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs })
|
||||
.collect();
|
||||
(false, redirects)
|
||||
}
|
||||
} else { (false, vec![]) };
|
||||
|
|
@ -5727,7 +5752,7 @@ impl ConnectionManager {
|
|||
Some(json) => {
|
||||
let manifest = if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
|
||||
if am.updated_at > payload.current_updated_at {
|
||||
let ds_count = store.get_blob_downstream_count(&payload.cid).unwrap_or(0);
|
||||
let ds_count = store.get_file_holder_count(&payload.cid).unwrap_or(0);
|
||||
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
|
||||
} else { None }
|
||||
} else { None };
|
||||
|
|
@ -6072,9 +6097,14 @@ impl ConnectionManager {
|
|||
to_pull.push(*pid);
|
||||
}
|
||||
|
||||
// Register as downstream for all accepted posts
|
||||
// Register as holder for all accepted posts
|
||||
for pid in &acc {
|
||||
let _ = storage.add_post_downstream(pid, &remote_node_id);
|
||||
let _ = storage.touch_file_holder(
|
||||
pid,
|
||||
&remote_node_id,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Sent,
|
||||
);
|
||||
}
|
||||
|
||||
(acc, rej, to_pull)
|
||||
|
|
@ -6125,8 +6155,12 @@ impl ConnectionManager {
|
|||
let cm = cm_arc.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
|
||||
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
|
||||
let _ = storage.add_post_upstream(&sp.id, &sender, prio);
|
||||
let _ = storage.touch_file_holder(
|
||||
&sp.id,
|
||||
&sender,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
let blob_store = cm.blob_store.clone();
|
||||
drop(storage);
|
||||
drop(cm);
|
||||
|
|
@ -6153,7 +6187,12 @@ impl ConnectionManager {
|
|||
let cm = cm_arc.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes);
|
||||
let _ = storage.add_post_upstream(&att.cid, &sender, 0);
|
||||
let _ = storage.touch_file_holder(
|
||||
&att.cid,
|
||||
&sender,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}.await;
|
||||
|
|
@ -6178,12 +6217,14 @@ impl ConnectionManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream.
|
||||
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate
|
||||
/// to the post's file_holders (flat set, up to 5 most recent).
|
||||
async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) {
|
||||
use crate::types::BlobHeaderDiffOp;
|
||||
|
||||
// Gather policy + audience data, then drop lock immediately
|
||||
let (policy, approved_audience, downstream, upstreams) = {
|
||||
// Gather policy + audience data + holders, then drop lock immediately.
|
||||
// Remote peer clearly holds this post — record them as a holder.
|
||||
let (policy, approved_audience, holders) = {
|
||||
let storage = self.storage.get().await;
|
||||
let policy = storage.get_comment_policy(&payload.post_id)
|
||||
.ok()
|
||||
|
|
@ -6193,13 +6234,18 @@ impl ConnectionManager {
|
|||
crate::types::AudienceDirection::Inbound,
|
||||
Some(crate::types::AudienceStatus::Approved),
|
||||
).unwrap_or_default();
|
||||
let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default();
|
||||
let upstreams: Vec<NodeId> = storage.get_post_upstreams(&payload.post_id)
|
||||
let _ = storage.touch_file_holder(
|
||||
&payload.post_id,
|
||||
&sender,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
let holders: Vec<NodeId> = storage.get_file_holders(&payload.post_id)
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|(nid, _)| nid)
|
||||
.map(|(nid, _addrs)| nid)
|
||||
.collect();
|
||||
(policy, approved, downstream, upstreams)
|
||||
(policy, approved, holders)
|
||||
};
|
||||
|
||||
// Filter ops using gathered data (no lock held)
|
||||
|
|
@ -6381,26 +6427,16 @@ impl ConnectionManager {
|
|||
let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms);
|
||||
}
|
||||
|
||||
// Collect all targets (downstream + all upstreams), then send in a single batched task
|
||||
// Re-propagate to all file holders (flat set, max 5). Exclude sender.
|
||||
let mut targets: Vec<iroh::endpoint::Connection> = Vec::new();
|
||||
for peer_id in downstream {
|
||||
if peer_id == sender { continue; }
|
||||
if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone())
|
||||
.or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone()))
|
||||
for peer_id in &holders {
|
||||
if *peer_id == sender { continue; }
|
||||
if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone())
|
||||
.or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone()))
|
||||
{
|
||||
targets.push(conn);
|
||||
}
|
||||
}
|
||||
// Phase 6: Try all upstreams, not just one
|
||||
for up in &upstreams {
|
||||
if *up != sender {
|
||||
if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone())
|
||||
.or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone()))
|
||||
{
|
||||
targets.push(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
if !targets.is_empty() {
|
||||
let payload_clone = payload.clone();
|
||||
tokio::spawn(async move {
|
||||
|
|
@ -7684,8 +7720,8 @@ impl ConnectionActor {
|
|||
if s.get_post_with_visibility(post_id).ok().flatten().is_some() {
|
||||
post_holder = Some(ctx.our_node_id);
|
||||
} else {
|
||||
let downstream = s.get_post_downstream(post_id).unwrap_or_default();
|
||||
if !downstream.is_empty() { post_holder = Some(downstream[0]); }
|
||||
let holders = s.get_file_holders(post_id).unwrap_or_default();
|
||||
if let Some((nid, _)) = holders.first() { post_holder = Some(*nid); }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -7695,8 +7731,8 @@ impl ConnectionActor {
|
|||
} else {
|
||||
let s = ctx.storage.get().await;
|
||||
if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) {
|
||||
let downstream = s.get_post_downstream(&pid).unwrap_or_default();
|
||||
if !downstream.is_empty() { blob_holder = Some(downstream[0]); }
|
||||
let holders = s.get_file_holders(&pid).unwrap_or_default();
|
||||
if let Some((nid, _)) = holders.first() { blob_holder = Some(*nid); }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -378,7 +378,11 @@ async fn try_redirect(
|
|||
Ok(Some((_, PostVisibility::Public))) => {}
|
||||
_ => return false, // not found or not public — hard close
|
||||
}
|
||||
store.get_post_downstream(post_id).unwrap_or_default()
|
||||
store.get_file_holders(post_id)
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|(nid, _addrs)| nid)
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
// Get addresses for downstream peers
|
||||
|
|
|
|||
|
|
@ -902,50 +902,6 @@ impl Network {
|
|||
self.send_to_audience(MessageType::PostNotification, &payload).await
|
||||
}
|
||||
|
||||
/// Push a full post directly to recipients (persistent if available, ephemeral otherwise).
|
||||
pub async fn push_post_to_recipients(
|
||||
&self,
|
||||
post_id: &crate::types::PostId,
|
||||
post: &Post,
|
||||
visibility: &PostVisibility,
|
||||
) -> usize {
|
||||
let recipients: Vec<NodeId> = match visibility {
|
||||
PostVisibility::Public => return 0,
|
||||
PostVisibility::Encrypted { recipients } => {
|
||||
recipients.iter().map(|wk| wk.recipient).collect()
|
||||
}
|
||||
PostVisibility::GroupEncrypted { group_id, .. } => {
|
||||
// Push to all group members
|
||||
match self.storage.get().await.get_all_group_members() {
|
||||
Ok(map) => map.get(group_id).cloned().unwrap_or_default().into_iter().collect(),
|
||||
Err(_) => return 0,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let payload = PostPushPayload {
|
||||
post: SyncPost {
|
||||
id: *post_id,
|
||||
post: post.clone(),
|
||||
visibility: visibility.clone(),
|
||||
},
|
||||
};
|
||||
|
||||
let mut pushed = 0;
|
||||
for recipient in &recipients {
|
||||
if self.send_to_peer_uni(recipient, MessageType::PostPush, &payload).await.is_ok() {
|
||||
pushed += 1;
|
||||
debug!(
|
||||
recipient = hex::encode(recipient),
|
||||
post_id = hex::encode(post_id),
|
||||
"Pushed post to recipient"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pushed
|
||||
}
|
||||
|
||||
/// Push a profile update to all audience members (ephemeral-capable).
|
||||
pub async fn push_profile(&self, profile: &PublicProfile) -> usize {
|
||||
// Sanitize: if public_visible=false, strip display_name/bio from pushed profile
|
||||
|
|
@ -1059,15 +1015,16 @@ impl Network {
|
|||
sent
|
||||
}
|
||||
|
||||
/// Push updated manifests to all downstream peers for a given CID.
|
||||
/// Push an updated manifest to all known holders of the file (flat set,
|
||||
/// up to 5 most-recent). Replaces the legacy downstream-tree push.
|
||||
pub async fn push_manifest_to_downstream(
|
||||
&self,
|
||||
cid: &[u8; 32],
|
||||
manifest: &crate::types::CdnManifest,
|
||||
) -> usize {
|
||||
let downstream = {
|
||||
let holders = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_blob_downstream(cid).unwrap_or_default()
|
||||
storage.get_file_holders(cid).unwrap_or_default()
|
||||
};
|
||||
let payload = crate::protocol::ManifestPushPayload {
|
||||
manifests: vec![crate::protocol::ManifestPushEntry {
|
||||
|
|
@ -1076,54 +1033,40 @@ impl Network {
|
|||
}],
|
||||
};
|
||||
let mut sent = 0;
|
||||
for (ds_nid, _) in &downstream {
|
||||
if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() {
|
||||
for (peer, peer_addrs) in &holders {
|
||||
if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() {
|
||||
sent += 1;
|
||||
let storage = self.storage.get().await;
|
||||
let _ = storage.touch_file_holder(
|
||||
cid,
|
||||
peer,
|
||||
peer_addrs,
|
||||
crate::storage::HolderDirection::Sent,
|
||||
);
|
||||
}
|
||||
}
|
||||
sent
|
||||
}
|
||||
|
||||
/// Send blob delete notices to downstream and upstream peers.
|
||||
/// Downstream peers receive our upstream info for tree healing.
|
||||
/// Upstream peers receive no upstream info (just "remove me as downstream").
|
||||
/// Send blob delete notices to all known holders of a file.
|
||||
/// Second argument kept as Option for signature stability; flat-holder
|
||||
/// model doesn't need separate upstream handling.
|
||||
pub async fn send_blob_delete_notices(
|
||||
&self,
|
||||
cid: &[u8; 32],
|
||||
downstream: &[(NodeId, Vec<String>)],
|
||||
upstream: Option<&(NodeId, Vec<String>)>,
|
||||
holders: &[(NodeId, Vec<String>)],
|
||||
_legacy_upstream: Option<&(NodeId, Vec<String>)>,
|
||||
) -> usize {
|
||||
let upstream_info = upstream.map(|(nid, addrs)| {
|
||||
crate::types::PeerWithAddress {
|
||||
n: hex::encode(nid),
|
||||
a: addrs.clone(),
|
||||
}
|
||||
});
|
||||
|
||||
let mut sent = 0;
|
||||
|
||||
// Notify downstream (with upstream info for tree healing)
|
||||
let ds_payload = crate::protocol::BlobDeleteNoticePayload {
|
||||
let payload = crate::protocol::BlobDeleteNoticePayload {
|
||||
cid: *cid,
|
||||
upstream_node: upstream_info,
|
||||
upstream_node: None,
|
||||
};
|
||||
for (ds_nid, _) in downstream {
|
||||
if self.send_to_peer_uni(ds_nid, MessageType::BlobDeleteNotice, &ds_payload).await.is_ok() {
|
||||
let mut sent = 0;
|
||||
for (peer, _addrs) in holders {
|
||||
if self.send_to_peer_uni(peer, MessageType::BlobDeleteNotice, &payload).await.is_ok() {
|
||||
sent += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Notify upstream (no upstream info)
|
||||
if let Some((up_nid, _)) = upstream {
|
||||
let up_payload = crate::protocol::BlobDeleteNoticePayload {
|
||||
cid: *cid,
|
||||
upstream_node: None,
|
||||
};
|
||||
if self.send_to_peer_uni(up_nid, MessageType::BlobDeleteNotice, &up_payload).await.is_ok() {
|
||||
sent += 1;
|
||||
}
|
||||
}
|
||||
|
||||
sent
|
||||
}
|
||||
|
||||
|
|
@ -2356,24 +2299,24 @@ impl Network {
|
|||
self.endpoint.close().await;
|
||||
}
|
||||
|
||||
/// Propagate an engagement diff to all downstream holders of a post (CDN tree).
|
||||
/// Excludes the sender to avoid loops.
|
||||
/// Propagate an engagement diff to all known holders of a post (flat set,
|
||||
/// up to 5 most-recent). Excludes the sender to avoid loops.
|
||||
pub async fn propagate_engagement_diff(
|
||||
&self,
|
||||
post_id: &crate::types::PostId,
|
||||
payload: &crate::protocol::BlobHeaderDiffPayload,
|
||||
exclude_peer: &crate::types::NodeId,
|
||||
) -> usize {
|
||||
let downstream = {
|
||||
let holders = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_post_downstream(post_id).unwrap_or_default()
|
||||
storage.get_file_holders(post_id).unwrap_or_default()
|
||||
};
|
||||
let mut sent = 0;
|
||||
for ds_nid in &downstream {
|
||||
if ds_nid == exclude_peer {
|
||||
for (peer, _addrs) in &holders {
|
||||
if peer == exclude_peer {
|
||||
continue;
|
||||
}
|
||||
if self.send_to_peer_uni(ds_nid, MessageType::BlobHeaderDiff, payload).await.is_ok() {
|
||||
if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() {
|
||||
sent += 1;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -836,13 +836,12 @@ impl Node {
|
|||
}
|
||||
}
|
||||
|
||||
// For encrypted posts, push directly to recipients
|
||||
let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await;
|
||||
|
||||
// For public posts, push to audience members
|
||||
// For public posts, push to audience members. Encrypted posts propagate
|
||||
// via the CDN (ManifestPush + header-diff) to eliminate the sender→recipient
|
||||
// traffic signal.
|
||||
let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await;
|
||||
|
||||
info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post");
|
||||
info!(post_id = hex::encode(post_id), audience_pushed, "Created new post");
|
||||
Ok((post_id, post, visibility))
|
||||
}
|
||||
|
||||
|
|
@ -1351,7 +1350,12 @@ impl Node {
|
|||
let source_addrs: Vec<String> = response.manifest.as_ref()
|
||||
.map(|m| m.host_addresses.clone())
|
||||
.unwrap_or_default();
|
||||
let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs);
|
||||
let _ = storage.touch_file_holder(
|
||||
cid,
|
||||
from_peer,
|
||||
&source_addrs,
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
}
|
||||
Ok(data)
|
||||
}
|
||||
|
|
@ -1379,16 +1383,17 @@ impl Node {
|
|||
// Collect redirect peers from responses in case we need them later
|
||||
let mut redirect_peers: Vec<crate::types::PeerWithAddress> = Vec::new();
|
||||
|
||||
// 2. Try existing upstream (if we previously fetched this blob)
|
||||
let upstream = {
|
||||
// 2. Try known holders (up to 5 most-recent peers we've interacted
|
||||
// with about this file).
|
||||
let known_holders = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_blob_upstream(cid)?
|
||||
storage.get_file_holders(cid).unwrap_or_default()
|
||||
};
|
||||
if let Some((upstream_nid, _upstream_addrs)) = upstream {
|
||||
match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await {
|
||||
for (holder_nid, _addrs) in &known_holders {
|
||||
match self.fetch_blob_from_peer(cid, holder_nid, post_id, author, mime_type, created_at).await {
|
||||
Ok(Some(data)) => return Ok(Some(data)),
|
||||
Ok(None) => {}
|
||||
Err(e) => warn!(error = %e, "blob fetch from upstream failed"),
|
||||
Err(e) => warn!(error = %e, "blob fetch from known holder failed"),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1413,7 +1418,12 @@ impl Node {
|
|||
let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at);
|
||||
}
|
||||
}
|
||||
let _ = storage.store_blob_upstream(cid, &lateral, &[]);
|
||||
let _ = storage.touch_file_holder(
|
||||
cid,
|
||||
&lateral,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
return Ok(Some(data));
|
||||
}
|
||||
Ok((None, response)) => {
|
||||
|
|
@ -1981,14 +1991,13 @@ impl Node {
|
|||
signature,
|
||||
};
|
||||
|
||||
// Collect blob CIDs + CDN peers before cleanup
|
||||
// Collect blob CIDs + known holders before cleanup (for delete notices)
|
||||
let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = {
|
||||
let storage = self.storage.get().await;
|
||||
let cids = storage.get_blobs_for_post(post_id).unwrap_or_default();
|
||||
cids.into_iter().map(|cid| {
|
||||
let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
|
||||
let upstream = storage.get_blob_upstream(&cid).ok().flatten();
|
||||
(cid, downstream, upstream)
|
||||
let holders = storage.get_file_holders(&cid).unwrap_or_default();
|
||||
(cid, holders, None::<(NodeId, Vec<String>)>)
|
||||
}).collect()
|
||||
};
|
||||
|
||||
|
|
@ -2108,12 +2117,10 @@ impl Node {
|
|||
storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?;
|
||||
}
|
||||
|
||||
// delete_post already pushes the DeleteRecord
|
||||
// delete_post already pushes the DeleteRecord.
|
||||
// Replacement post propagates via the CDN to remaining recipients.
|
||||
self.delete_post(post_id).await?;
|
||||
|
||||
// Push replacement post directly to remaining recipients
|
||||
self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await;
|
||||
|
||||
info!(
|
||||
old_id = hex::encode(post_id),
|
||||
new_id = hex::encode(new_post_id),
|
||||
|
|
@ -3086,20 +3093,27 @@ impl Node {
|
|||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as u64 - max_age_ms;
|
||||
let stale = {
|
||||
let stale_cids = {
|
||||
let s = storage.get().await;
|
||||
s.get_stale_manifests(cutoff).unwrap_or_default()
|
||||
s.get_stale_manifest_cids(cutoff).unwrap_or_default()
|
||||
};
|
||||
for (cid, upstream_nid, _upstream_addrs) in &stale {
|
||||
// Get current updated_at for this manifest
|
||||
let current_updated_at = {
|
||||
for cid in &stale_cids {
|
||||
// Get current updated_at + pick a holder to refresh from
|
||||
let (current_updated_at, refresh_source) = {
|
||||
let s = storage.get().await;
|
||||
s.get_cdn_manifest(cid).ok().flatten()
|
||||
let updated_at = s.get_cdn_manifest(cid).ok().flatten()
|
||||
.and_then(|json| serde_json::from_str::<crate::types::AuthorManifest>(&json).ok())
|
||||
.map(|m| m.updated_at)
|
||||
.unwrap_or(0)
|
||||
.unwrap_or(0);
|
||||
let source = s.get_file_holders(cid)
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.next()
|
||||
.map(|(nid, _)| nid);
|
||||
(updated_at, source)
|
||||
};
|
||||
match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await {
|
||||
let Some(upstream_nid) = refresh_source else { continue; };
|
||||
match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await {
|
||||
Ok(Some(cdn_manifest)) => {
|
||||
if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
|
||||
let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
|
||||
|
|
@ -3110,10 +3124,10 @@ impl Node {
|
|||
&cdn_manifest.author_manifest.author,
|
||||
cdn_manifest.author_manifest.updated_at,
|
||||
);
|
||||
// Relay to our downstream
|
||||
let downstream = s.get_blob_downstream(cid).unwrap_or_default();
|
||||
// Relay to known holders (flat set)
|
||||
let holders = s.get_file_holders(cid).unwrap_or_default();
|
||||
drop(s);
|
||||
if !downstream.is_empty() {
|
||||
if !holders.is_empty() {
|
||||
network.push_manifest_to_downstream(cid, &cdn_manifest).await;
|
||||
}
|
||||
tracing::debug!(
|
||||
|
|
@ -3126,7 +3140,7 @@ impl Node {
|
|||
Err(e) => {
|
||||
tracing::debug!(
|
||||
cid = hex::encode(cid),
|
||||
upstream = hex::encode(upstream_nid),
|
||||
upstream = hex::encode(&upstream_nid),
|
||||
error = %e,
|
||||
"Manifest refresh from upstream failed"
|
||||
);
|
||||
|
|
@ -3277,18 +3291,16 @@ impl Node {
|
|||
compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms)
|
||||
}
|
||||
|
||||
/// Delete a blob with CDN notifications to upstream/downstream.
|
||||
/// Delete a blob with CDN notifications to known holders.
|
||||
pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
|
||||
// Gather CDN peers before cleanup
|
||||
let (downstream, upstream) = {
|
||||
// Gather known holders before cleanup
|
||||
let holders = {
|
||||
let storage = self.storage.get().await;
|
||||
let ds = storage.get_blob_downstream(cid).unwrap_or_default();
|
||||
let up = storage.get_blob_upstream(cid).ok().flatten();
|
||||
(ds, up)
|
||||
storage.get_file_holders(cid).unwrap_or_default()
|
||||
};
|
||||
|
||||
// Send CDN delete notices
|
||||
self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await;
|
||||
// Send CDN delete notices to all holders
|
||||
self.network.send_blob_delete_notices(cid, &holders, None).await;
|
||||
|
||||
// Clean up local storage
|
||||
{
|
||||
|
|
@ -3576,15 +3588,9 @@ impl Node {
|
|||
ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())],
|
||||
timestamp_ms: now,
|
||||
};
|
||||
// propagate_engagement_diff targets all file_holders (flat set, max 5)
|
||||
// which already subsumes what used to be upstream + downstream.
|
||||
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
||||
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
|
||||
let upstreams = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
||||
};
|
||||
for (up, _prio) in upstreams {
|
||||
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(reaction)
|
||||
|
|
@ -3691,14 +3697,6 @@ impl Node {
|
|||
timestamp_ms: now,
|
||||
};
|
||||
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
||||
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
|
||||
let upstreams = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
||||
};
|
||||
for (up, _prio) in upstreams {
|
||||
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(comment)
|
||||
|
|
@ -3735,14 +3733,6 @@ impl Node {
|
|||
timestamp_ms: now,
|
||||
};
|
||||
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
||||
// Phase 6: send to all upstreams
|
||||
let upstreams = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
||||
};
|
||||
for (up, _prio) in upstreams {
|
||||
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -3776,14 +3766,6 @@ impl Node {
|
|||
timestamp_ms: now,
|
||||
};
|
||||
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
||||
// Phase 6: send to all upstreams
|
||||
let upstreams = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
||||
};
|
||||
for (up, _prio) in upstreams {
|
||||
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -4005,14 +3987,6 @@ impl Node {
|
|||
timestamp_ms: now,
|
||||
};
|
||||
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
|
||||
// Phase 6: send to all upstreams
|
||||
let upstreams = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
||||
};
|
||||
for (up, _prio) in upstreams {
|
||||
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -4127,14 +4101,6 @@ impl Node {
|
|||
timestamp_ms: now,
|
||||
};
|
||||
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
|
||||
// Phase 6: send to all upstreams
|
||||
let upstreams = {
|
||||
let storage = self.storage.get().await;
|
||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
||||
};
|
||||
for (up, _prio) in upstreams {
|
||||
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -4367,10 +4333,10 @@ impl Node {
|
|||
}
|
||||
};
|
||||
|
||||
// Filter to under-replicated (< 2 downstream)
|
||||
// Filter to under-replicated (< 2 holders)
|
||||
let mut needs_replication = Vec::new();
|
||||
for pid in &recent_ids {
|
||||
match storage.get_post_downstream_count(pid) {
|
||||
match storage.get_file_holder_count(pid) {
|
||||
Ok(count) if count < 2 => {
|
||||
needs_replication.push(*pid);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,26 @@ use crate::types::{
|
|||
VisibilityIntent,
|
||||
};
|
||||
|
||||
/// Direction for file_holders entries: whether we sent the file to this peer,
|
||||
/// received it from them, or both. Not load-bearing for propagation decisions —
|
||||
/// any holder can serve as a diff target — but retained for potential reuse.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum HolderDirection {
|
||||
Sent,
|
||||
Received,
|
||||
Both,
|
||||
}
|
||||
|
||||
impl HolderDirection {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
HolderDirection::Sent => "sent",
|
||||
HolderDirection::Received => "received",
|
||||
HolderDirection::Both => "both",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Blob metadata for eviction scoring.
|
||||
pub struct EvictionCandidate {
|
||||
pub cid: [u8; 32],
|
||||
|
|
@ -262,20 +282,6 @@ impl Storage {
|
|||
updated_at INTEGER NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author);
|
||||
CREATE TABLE IF NOT EXISTS blob_upstream (
|
||||
cid BLOB PRIMARY KEY,
|
||||
source_node_id BLOB NOT NULL,
|
||||
source_addresses TEXT NOT NULL DEFAULT '[]',
|
||||
stored_at INTEGER NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS blob_downstream (
|
||||
cid BLOB NOT NULL,
|
||||
peer_node_id BLOB NOT NULL,
|
||||
peer_addresses TEXT NOT NULL DEFAULT '[]',
|
||||
registered_at INTEGER NOT NULL,
|
||||
PRIMARY KEY (cid, peer_node_id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid);
|
||||
CREATE TABLE IF NOT EXISTS group_keys (
|
||||
group_id BLOB PRIMARY KEY,
|
||||
circle_name TEXT NOT NULL,
|
||||
|
|
@ -326,17 +332,6 @@ impl Storage {
|
|||
last_seen_ms INTEGER NOT NULL,
|
||||
success_count INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS post_downstream (
|
||||
post_id BLOB NOT NULL,
|
||||
peer_node_id BLOB NOT NULL,
|
||||
registered_at INTEGER NOT NULL,
|
||||
PRIMARY KEY (post_id, peer_node_id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id);
|
||||
CREATE TABLE IF NOT EXISTS post_upstream (
|
||||
post_id BLOB PRIMARY KEY,
|
||||
peer_node_id BLOB NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS blob_headers (
|
||||
post_id BLOB PRIMARY KEY,
|
||||
author BLOB NOT NULL,
|
||||
|
|
@ -389,7 +384,17 @@ impl Storage {
|
|||
CREATE TABLE IF NOT EXISTS seen_messages (
|
||||
partner_id BLOB PRIMARY KEY,
|
||||
last_read_ms INTEGER NOT NULL DEFAULT 0
|
||||
);",
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS file_holders (
|
||||
file_id BLOB NOT NULL,
|
||||
peer_id BLOB NOT NULL,
|
||||
peer_addresses TEXT NOT NULL DEFAULT '[]',
|
||||
last_interaction_ms INTEGER NOT NULL,
|
||||
direction TEXT NOT NULL,
|
||||
PRIMARY KEY (file_id, peer_id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_file_holders_recency
|
||||
ON file_holders(file_id, last_interaction_ms DESC);",
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -543,16 +548,6 @@ impl Storage {
|
|||
)?;
|
||||
}
|
||||
|
||||
// Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration)
|
||||
let has_blob_pref_tree = self.conn.prepare(
|
||||
"SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'"
|
||||
)?.query_row([], |row| row.get::<_, i64>(0))?;
|
||||
if has_blob_pref_tree == 0 {
|
||||
self.conn.execute_batch(
|
||||
"ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';"
|
||||
)?;
|
||||
}
|
||||
|
||||
// Add public_visible column to profiles if missing (Phase D-4 migration)
|
||||
let has_public_visible = self.conn.prepare(
|
||||
"SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'"
|
||||
|
|
@ -666,25 +661,18 @@ impl Storage {
|
|||
)?;
|
||||
}
|
||||
|
||||
// Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max)
|
||||
let has_priority = self.conn.prepare(
|
||||
"SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'"
|
||||
)?.query_row([], |row| row.get::<_, i64>(0))?;
|
||||
if has_priority == 0 {
|
||||
self.conn.execute_batch(
|
||||
"ALTER TABLE post_upstream RENAME TO post_upstream_old;
|
||||
CREATE TABLE post_upstream (
|
||||
post_id BLOB NOT NULL,
|
||||
peer_node_id BLOB NOT NULL,
|
||||
priority INTEGER NOT NULL DEFAULT 0,
|
||||
registered_at INTEGER NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (post_id, peer_node_id)
|
||||
);
|
||||
INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at)
|
||||
SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old;
|
||||
DROP TABLE post_upstream_old;"
|
||||
)?;
|
||||
}
|
||||
// 0.6.1-beta: seed file_holders from legacy upstream/downstream tables
|
||||
// before they're dropped. Idempotent — only fires on an empty
|
||||
// file_holders table.
|
||||
self.seed_file_holders_from_legacy()?;
|
||||
|
||||
// 0.6.1-beta: drop legacy directional tables — replaced by file_holders.
|
||||
self.conn.execute_batch(
|
||||
"DROP TABLE IF EXISTS blob_upstream;
|
||||
DROP TABLE IF EXISTS blob_downstream;
|
||||
DROP TABLE IF EXISTS post_upstream;
|
||||
DROP TABLE IF EXISTS post_downstream;",
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -2396,8 +2384,7 @@ impl Storage {
|
|||
params![record.post_id.as_slice(), record.author.as_slice()],
|
||||
)?;
|
||||
if deleted > 0 {
|
||||
self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
|
||||
self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
|
||||
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?;
|
||||
self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?;
|
||||
}
|
||||
Ok(deleted > 0)
|
||||
|
|
@ -3396,28 +3383,6 @@ impl Storage {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the preferred_tree JSON for a blob upstream entry.
|
||||
pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> {
|
||||
let json = serde_json::to_string(
|
||||
&tree.iter().map(hex::encode).collect::<Vec<_>>()
|
||||
)?;
|
||||
self.conn.execute(
|
||||
"UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2",
|
||||
params![json, cid.as_slice()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the preferred_tree for a blob upstream entry.
|
||||
pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<NodeId>> {
|
||||
let json: String = self.conn.query_row(
|
||||
"SELECT preferred_tree FROM blob_upstream WHERE cid = ?1",
|
||||
params![cid.as_slice()],
|
||||
|row| row.get(0),
|
||||
).unwrap_or_else(|_| "[]".to_string());
|
||||
Ok(parse_anchors_json(&json))
|
||||
}
|
||||
|
||||
// ---- Social Routes ----
|
||||
|
||||
/// Insert or update a social route entry.
|
||||
|
|
@ -3889,10 +3854,10 @@ impl Storage {
|
|||
GROUP BY post_id
|
||||
) r ON b.post_id = r.post_id
|
||||
LEFT JOIN (
|
||||
SELECT cid, COUNT(*) as ds_count
|
||||
FROM blob_downstream
|
||||
GROUP BY cid
|
||||
) d ON b.cid = d.cid"
|
||||
SELECT file_id, COUNT(*) as ds_count
|
||||
FROM file_holders
|
||||
GROUP BY file_id
|
||||
) d ON b.cid = d.file_id"
|
||||
)?;
|
||||
let rows = stmt.query_map(params![cutoff], |row| {
|
||||
let cid_bytes: Vec<u8> = row.get(0)?;
|
||||
|
|
@ -3946,11 +3911,10 @@ impl Storage {
|
|||
Ok(count as u64)
|
||||
}
|
||||
|
||||
/// Clean up all CDN metadata for a blob (manifests + upstream + downstream).
|
||||
/// Clean up all CDN metadata for a blob (manifests + file_holders).
|
||||
pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
|
||||
self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?;
|
||||
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
|
||||
self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?;
|
||||
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -3969,12 +3933,6 @@ impl Storage {
|
|||
Ok(cids)
|
||||
}
|
||||
|
||||
/// Remove upstream tracking for a blob CID.
|
||||
pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
|
||||
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn post_count(&self) -> anyhow::Result<usize> {
|
||||
let count: i64 = self
|
||||
.conn
|
||||
|
|
@ -4037,137 +3995,24 @@ impl Storage {
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
/// Record the upstream source for a blob CID.
|
||||
pub fn store_blob_upstream(
|
||||
&self,
|
||||
cid: &[u8; 32],
|
||||
source_node_id: &NodeId,
|
||||
source_addresses: &[String],
|
||||
) -> anyhow::Result<()> {
|
||||
let addrs_json = serde_json::to_string(source_addresses)?;
|
||||
self.conn.execute(
|
||||
"INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4)
|
||||
ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4",
|
||||
params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the upstream source for a blob CID: (node_id, addresses).
|
||||
pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<Option<(NodeId, Vec<String>)>> {
|
||||
let result = self.conn.query_row(
|
||||
"SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1",
|
||||
params![cid.as_slice()],
|
||||
|row| {
|
||||
let nid_bytes: Vec<u8> = row.get(0)?;
|
||||
let addrs_json: String = row.get(1)?;
|
||||
Ok((nid_bytes, addrs_json))
|
||||
},
|
||||
);
|
||||
match result {
|
||||
Ok((nid_bytes, addrs_json)) => {
|
||||
let nid = blob_to_nodeid(nid_bytes)?;
|
||||
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
|
||||
Ok(Some((nid, addrs)))
|
||||
}
|
||||
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a downstream peer for a blob CID. Returns false if already at 100 downstream.
|
||||
pub fn add_blob_downstream(
|
||||
&self,
|
||||
cid: &[u8; 32],
|
||||
peer_node_id: &NodeId,
|
||||
peer_addresses: &[String],
|
||||
) -> anyhow::Result<bool> {
|
||||
let count = self.get_blob_downstream_count(cid)?;
|
||||
if count >= 100 {
|
||||
return Ok(false);
|
||||
}
|
||||
let addrs_json = serde_json::to_string(peer_addresses)?;
|
||||
self.conn.execute(
|
||||
"INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4)
|
||||
ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4",
|
||||
params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()],
|
||||
)?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>.
|
||||
pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
|
||||
/// Get CIDs of manifests older than a cutoff. Callers look up holders
|
||||
/// via file_holders to pick a refresh source.
|
||||
pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result<Vec<[u8; 32]>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1"
|
||||
)?;
|
||||
let rows = stmt.query_map(params![cid.as_slice()], |row| {
|
||||
let nid_bytes: Vec<u8> = row.get(0)?;
|
||||
let addrs_json: String = row.get(1)?;
|
||||
Ok((nid_bytes, addrs_json))
|
||||
})?;
|
||||
let mut result = Vec::new();
|
||||
for row in rows {
|
||||
let (nid_bytes, addrs_json) = row?;
|
||||
let nid = blob_to_nodeid(nid_bytes)?;
|
||||
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
|
||||
result.push((nid, addrs));
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Count downstream peers for a blob CID.
|
||||
pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result<u32> {
|
||||
let count: i64 = self.conn.query_row(
|
||||
"SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1",
|
||||
params![cid.as_slice()],
|
||||
|row| row.get(0),
|
||||
)?;
|
||||
Ok(count as u32)
|
||||
}
|
||||
|
||||
/// Remove a downstream peer for a blob CID.
|
||||
pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> {
|
||||
self.conn.execute(
|
||||
"DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2",
|
||||
params![cid.as_slice(), peer_node_id.as_slice()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>.
|
||||
pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result<Vec<([u8; 32], NodeId, Vec<String>)>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT m.cid, u.source_node_id, u.source_addresses
|
||||
FROM cdn_manifests m
|
||||
LEFT JOIN blob_upstream u ON m.cid = u.cid
|
||||
WHERE m.updated_at < ?1"
|
||||
"SELECT cid FROM cdn_manifests WHERE updated_at < ?1",
|
||||
)?;
|
||||
let rows = stmt.query_map(params![older_than_ms as i64], |row| {
|
||||
let cid_bytes: Vec<u8> = row.get(0)?;
|
||||
let nid_bytes: Option<Vec<u8>> = row.get(1)?;
|
||||
let addrs_json: Option<String> = row.get(2)?;
|
||||
Ok((cid_bytes, nid_bytes, addrs_json))
|
||||
Ok(cid_bytes)
|
||||
})?;
|
||||
let mut result = Vec::new();
|
||||
let mut out = Vec::new();
|
||||
for row in rows {
|
||||
let (cid_bytes, nid_bytes, addrs_json) = row?;
|
||||
let cid: [u8; 32] = match cid_bytes.try_into() {
|
||||
Ok(c) => c,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let nid = match nid_bytes {
|
||||
Some(b) => match blob_to_nodeid(b) {
|
||||
Ok(n) => n,
|
||||
Err(_) => continue,
|
||||
},
|
||||
None => continue,
|
||||
};
|
||||
let addrs: Vec<String> = addrs_json
|
||||
.map(|j| serde_json::from_str(&j).unwrap_or_default())
|
||||
.unwrap_or_default();
|
||||
result.push((cid, nid, addrs));
|
||||
let cid_bytes = row?;
|
||||
if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) {
|
||||
out.push(cid);
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Get the 10 posts before and 10 posts after a reference timestamp for an author.
|
||||
|
|
@ -4271,128 +4116,148 @@ impl Storage {
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
// --- Engagement: post_downstream ---
|
||||
// --- File holders (flat, per-file, LRU-capped at 5) ---
|
||||
//
|
||||
// A single table for PostId-keyed engagement propagation and CID-keyed
|
||||
// manifest/blob propagation. Any 32-byte content-addressed file_id fits.
|
||||
|
||||
/// Register a peer as downstream for a post (max 100 per post).
|
||||
/// Returns true if added, false if at capacity.
|
||||
pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<bool> {
|
||||
let count: i64 = self.conn.prepare(
|
||||
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
|
||||
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
|
||||
if count >= 100 {
|
||||
return Ok(false);
|
||||
}
|
||||
self.conn.execute(
|
||||
"INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3)
|
||||
ON CONFLICT DO NOTHING",
|
||||
params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()],
|
||||
)?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Get all downstream peers for a post.
|
||||
pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result<Vec<NodeId>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT peer_node_id FROM post_downstream WHERE post_id = ?1"
|
||||
)?;
|
||||
let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec<u8>>(0))?;
|
||||
let mut result = Vec::new();
|
||||
for row in rows {
|
||||
if let Ok(nid) = blob_to_nodeid(row?) {
|
||||
result.push(nid);
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Remove a downstream peer for a post.
|
||||
pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
|
||||
self.conn.execute(
|
||||
"DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2",
|
||||
params![post_id.as_slice(), peer_node_id.as_slice()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// --- Engagement: post_upstream (multi-upstream, 3 max) ---
|
||||
|
||||
/// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post.
|
||||
pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> {
|
||||
// Check current count
|
||||
let count: i64 = self.conn.prepare(
|
||||
"SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1"
|
||||
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
|
||||
if count >= 3 {
|
||||
return Ok(()); // Already at cap
|
||||
}
|
||||
/// Upsert a holder for a file. Bumps last_interaction_ms to now and
|
||||
/// enforces an LRU cap of 5 holders per file.
|
||||
pub fn touch_file_holder(
|
||||
&self,
|
||||
file_id: &[u8; 32],
|
||||
peer_id: &NodeId,
|
||||
peer_addresses: &[String],
|
||||
direction: HolderDirection,
|
||||
) -> anyhow::Result<()> {
|
||||
let addrs_json = serde_json::to_string(peer_addresses)?;
|
||||
let now = now_ms();
|
||||
let new_dir = direction.as_str();
|
||||
// Upsert. If the row exists with a different direction, promote to "both".
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at)
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now],
|
||||
"INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5)
|
||||
ON CONFLICT(file_id, peer_id) DO UPDATE SET
|
||||
peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END,
|
||||
last_interaction_ms = ?4,
|
||||
direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END",
|
||||
params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir],
|
||||
)?;
|
||||
// Enforce LRU cap of 5. Oldest get dropped.
|
||||
self.conn.execute(
|
||||
"DELETE FROM file_holders
|
||||
WHERE file_id = ?1
|
||||
AND peer_id NOT IN (
|
||||
SELECT peer_id FROM file_holders
|
||||
WHERE file_id = ?1
|
||||
ORDER BY last_interaction_ms DESC
|
||||
LIMIT 5
|
||||
)",
|
||||
params![file_id.as_slice()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get all upstream peers for a post, ordered by priority ASC (0 = primary).
|
||||
pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result<Vec<(NodeId, u8)>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC"
|
||||
)?;
|
||||
let rows = stmt.query_map(params![post_id.as_slice()], |row| {
|
||||
let bytes: Vec<u8> = row.get(0)?;
|
||||
let prio: i64 = row.get(1)?;
|
||||
Ok((bytes, prio as u8))
|
||||
})?;
|
||||
let mut result = Vec::new();
|
||||
for row in rows {
|
||||
let (bytes, prio) = row?;
|
||||
if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) {
|
||||
result.push((nid, prio));
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Get the primary (lowest priority) upstream peer for a post.
|
||||
/// Backward-compatible wrapper for code that only needs a single upstream.
|
||||
pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result<Option<NodeId>> {
|
||||
let upstreams = self.get_post_upstreams(post_id)?;
|
||||
Ok(upstreams.into_iter().next().map(|(nid, _)| nid))
|
||||
}
|
||||
|
||||
/// Remove a specific upstream peer for a post.
|
||||
pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
|
||||
self.conn.execute(
|
||||
"DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2",
|
||||
params![post_id.as_slice(), peer_node_id.as_slice()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Promote an upstream peer to primary (priority 0), pushing others up.
|
||||
pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
|
||||
// Shift all priorities up by 1
|
||||
self.conn.execute(
|
||||
"UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1",
|
||||
params![post_id.as_slice()],
|
||||
)?;
|
||||
// Set the promoted peer to priority 0
|
||||
self.conn.execute(
|
||||
"UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2",
|
||||
params![post_id.as_slice(), peer_node_id.as_slice()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Count downstream peers for a post.
|
||||
pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result<u32> {
|
||||
/// Count file holders (bounded at 5 by touch_file_holder's LRU cap).
|
||||
pub fn get_file_holder_count(&self, file_id: &[u8; 32]) -> anyhow::Result<u32> {
|
||||
let count: i64 = self.conn.prepare(
|
||||
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
|
||||
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
|
||||
"SELECT COUNT(*) FROM file_holders WHERE file_id = ?1",
|
||||
)?.query_row(params![file_id.as_slice()], |row| row.get(0))?;
|
||||
Ok(count as u32)
|
||||
}
|
||||
|
||||
/// Return the up-to-5 most recently interacted holders of a file.
|
||||
pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT peer_id, peer_addresses FROM file_holders
|
||||
WHERE file_id = ?1
|
||||
ORDER BY last_interaction_ms DESC
|
||||
LIMIT 5",
|
||||
)?;
|
||||
let rows = stmt.query_map(params![file_id.as_slice()], |row| {
|
||||
let peer_bytes: Vec<u8> = row.get(0)?;
|
||||
let addrs_json: String = row.get(1)?;
|
||||
Ok((peer_bytes, addrs_json))
|
||||
})?;
|
||||
let mut out = Vec::new();
|
||||
for row in rows {
|
||||
let (peer_bytes, addrs_json) = row?;
|
||||
if peer_bytes.len() != 32 { continue; }
|
||||
let mut peer = [0u8; 32];
|
||||
peer.copy_from_slice(&peer_bytes);
|
||||
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
|
||||
out.push((NodeId::from(peer), addrs));
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Remove all holders for a file (e.g. on post/blob deletion).
|
||||
pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> {
|
||||
self.conn.execute(
|
||||
"DELETE FROM file_holders WHERE file_id = ?1",
|
||||
params![file_id.as_slice()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove a single peer's holder entry for a file.
|
||||
pub fn remove_file_holder(&self, file_id: &[u8; 32], peer_id: &NodeId) -> anyhow::Result<()> {
|
||||
self.conn.execute(
|
||||
"DELETE FROM file_holders WHERE file_id = ?1 AND peer_id = ?2",
|
||||
params![file_id.as_slice(), peer_id.as_slice()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// One-time migration: seed file_holders from the legacy upstream/downstream
|
||||
/// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder
|
||||
/// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the
|
||||
/// PRIMARY KEY. Skips tables that don't exist on fresh installs.
|
||||
pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> {
|
||||
// Skip if file_holders already populated (idempotent re-run protection).
|
||||
let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")?
|
||||
.query_row([], |row| row.get(0))?;
|
||||
if existing > 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let now = now_ms() as i64;
|
||||
let table_exists = |name: &str| -> anyhow::Result<bool> {
|
||||
let count: i64 = self.conn.prepare(
|
||||
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
|
||||
)?.query_row(params![name], |row| row.get(0))?;
|
||||
Ok(count > 0)
|
||||
};
|
||||
if table_exists("post_upstream")? {
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||
SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream",
|
||||
params![now],
|
||||
)?;
|
||||
}
|
||||
if table_exists("post_downstream")? {
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||
SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream",
|
||||
params![now],
|
||||
)?;
|
||||
}
|
||||
if table_exists("blob_upstream")? {
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||
SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream",
|
||||
params![now],
|
||||
)?;
|
||||
}
|
||||
if table_exists("blob_downstream")? {
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||
SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream",
|
||||
params![now],
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// --- Engagement: reactions ---
|
||||
|
||||
/// Store a reaction (upsert by reactor+post_id+emoji).
|
||||
|
|
@ -5301,60 +5166,6 @@ mod tests {
|
|||
assert_eq!(manifests[0].0, cid);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn blob_upstream_crud() {
|
||||
let s = temp_storage();
|
||||
let cid = [42u8; 32];
|
||||
let source = make_node_id(1);
|
||||
let addrs = vec!["10.0.0.1:4433".to_string()];
|
||||
|
||||
s.store_blob_upstream(&cid, &source, &addrs).unwrap();
|
||||
let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap();
|
||||
assert_eq!(nid, source);
|
||||
assert_eq!(got_addrs, addrs);
|
||||
|
||||
// Missing
|
||||
assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none());
|
||||
|
||||
// Update
|
||||
let source2 = make_node_id(2);
|
||||
s.store_blob_upstream(&cid, &source2, &[]).unwrap();
|
||||
let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap();
|
||||
assert_eq!(nid, source2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn blob_downstream_crud_and_limit() {
|
||||
let s = temp_storage();
|
||||
let cid = [42u8; 32];
|
||||
|
||||
// Add downstream peers
|
||||
for i in 0..100u8 {
|
||||
let peer = make_node_id(i);
|
||||
let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap();
|
||||
assert!(ok, "should accept peer {}", i);
|
||||
}
|
||||
|
||||
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100);
|
||||
|
||||
// 101st should be rejected
|
||||
let peer_101 = make_node_id(200);
|
||||
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
|
||||
assert!(!ok, "should reject 101st downstream");
|
||||
|
||||
// Get all downstream
|
||||
let downstream = s.get_blob_downstream(&cid).unwrap();
|
||||
assert_eq!(downstream.len(), 100);
|
||||
|
||||
// Remove one
|
||||
s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap();
|
||||
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99);
|
||||
|
||||
// Now adding one more should work
|
||||
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
|
||||
assert!(ok, "should accept after removal");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn blob_pin_unpin() {
|
||||
let s = temp_storage();
|
||||
|
|
@ -5438,18 +5249,15 @@ mod tests {
|
|||
let peer = make_node_id(2);
|
||||
|
||||
s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap();
|
||||
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
|
||||
s.add_blob_downstream(&cid, &peer, &["10.0.0.2:4433".to_string()]).unwrap();
|
||||
s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap();
|
||||
|
||||
assert!(s.get_cdn_manifest(&cid).unwrap().is_some());
|
||||
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
|
||||
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1);
|
||||
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1);
|
||||
|
||||
s.cleanup_cdn_for_blob(&cid).unwrap();
|
||||
|
||||
assert!(s.get_cdn_manifest(&cid).unwrap().is_none());
|
||||
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
|
||||
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0);
|
||||
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -5469,18 +5277,6 @@ mod tests {
|
|||
assert!(cids.contains(&cid2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_blob_upstream() {
|
||||
let s = temp_storage();
|
||||
let cid = [42u8; 32];
|
||||
let peer = make_node_id(1);
|
||||
|
||||
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
|
||||
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
|
||||
|
||||
s.remove_blob_upstream(&cid).unwrap();
|
||||
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn author_post_neighborhood() {
|
||||
|
|
@ -5840,24 +5636,6 @@ mod tests {
|
|||
assert_eq!(got2.preferred_tree.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn blob_upstream_preferred_tree() {
|
||||
let s = temp_storage();
|
||||
let cid = [42u8; 32];
|
||||
let source = make_node_id(1);
|
||||
s.store_blob_upstream(&cid, &source, &[]).unwrap();
|
||||
|
||||
// Initially empty
|
||||
let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap();
|
||||
assert!(tree.is_empty());
|
||||
|
||||
// Update
|
||||
let nodes = vec![make_node_id(10), make_node_id(11)];
|
||||
s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap();
|
||||
let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap();
|
||||
assert_eq!(tree2.len(), 2);
|
||||
}
|
||||
|
||||
// ---- Circle Profile tests ----
|
||||
|
||||
#[test]
|
||||
|
|
@ -6058,32 +5836,39 @@ mod tests {
|
|||
// --- Engagement tests ---
|
||||
|
||||
#[test]
|
||||
fn post_downstream_crud() {
|
||||
fn file_holders_lru_cap() {
|
||||
let s = temp_storage();
|
||||
let post_id = make_post_id(1);
|
||||
let peer1 = make_node_id(1);
|
||||
let peer2 = make_node_id(2);
|
||||
|
||||
// Add downstream peers
|
||||
assert!(s.add_post_downstream(&post_id, &peer1).unwrap());
|
||||
assert!(s.add_post_downstream(&post_id, &peer2).unwrap());
|
||||
|
||||
let downstream = s.get_post_downstream(&post_id).unwrap();
|
||||
assert_eq!(downstream.len(), 2);
|
||||
assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2);
|
||||
|
||||
// Remove one
|
||||
s.remove_post_downstream(&post_id, &peer1).unwrap();
|
||||
assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1);
|
||||
|
||||
// Capacity limit
|
||||
let big_post = make_post_id(99);
|
||||
for i in 0..100u8 {
|
||||
assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap());
|
||||
let file = [42u8; 32];
|
||||
// Sleep between inserts so last_interaction_ms actually differs (ms resolution).
|
||||
for i in 0..7u8 {
|
||||
s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap();
|
||||
std::thread::sleep(std::time::Duration::from_millis(2));
|
||||
}
|
||||
assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100);
|
||||
// 101st should fail
|
||||
assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap());
|
||||
// Only 5 most-recent survive
|
||||
assert_eq!(s.get_file_holder_count(&file).unwrap(), 5);
|
||||
let holders = s.get_file_holders(&file).unwrap();
|
||||
assert_eq!(holders.len(), 5);
|
||||
let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect();
|
||||
// Oldest two (i=0, i=1) got evicted; most recent (i=6) survives
|
||||
assert!(!kept.contains(&make_node_id(0)));
|
||||
assert!(!kept.contains(&make_node_id(1)));
|
||||
assert!(kept.contains(&make_node_id(6)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn file_holders_direction_promotion() {
|
||||
let s = temp_storage();
|
||||
let file = [42u8; 32];
|
||||
let peer = make_node_id(1);
|
||||
s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap();
|
||||
s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap();
|
||||
// Re-insert with opposite direction should promote to "both"
|
||||
let dir: String = s.conn.query_row(
|
||||
"SELECT direction FROM file_holders WHERE file_id = ?1 AND peer_id = ?2",
|
||||
rusqlite::params![file.as_slice(), peer.as_slice()],
|
||||
|row| row.get(0),
|
||||
).unwrap();
|
||||
assert_eq!(dir, "both");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
|
|
@ -132,8 +132,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
|
|||
if let Some(author) = author_id {
|
||||
holders.push(author);
|
||||
}
|
||||
if let Ok(downstream) = store.get_post_downstream(&post_id) {
|
||||
for peer in downstream {
|
||||
if let Ok(file_holders) = store.get_file_holders(&post_id) {
|
||||
for (peer, _addrs) in file_holders {
|
||||
if !holders.contains(&peer) {
|
||||
holders.push(peer);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue