Compare commits
6 commits
921a0ec40a
...
5d9ba22427
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d9ba22427 | ||
|
|
60463d1817 | ||
|
|
3a0d2e93ab | ||
|
|
0b2b4f5a68 | ||
|
|
1658762a68 | ||
|
|
e6265b52b6 |
6 changed files with 470 additions and 736 deletions
|
|
@ -1393,8 +1393,12 @@ impl ConnectionManager {
|
||||||
{
|
{
|
||||||
let s = storage.get().await;
|
let s = storage.get().await;
|
||||||
for pid in &new_post_ids {
|
for pid in &new_post_ids {
|
||||||
let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
|
let _ = s.touch_file_holder(
|
||||||
let _ = s.add_post_upstream(pid, peer_id, prio);
|
pid,
|
||||||
|
peer_id,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
for author in &synced_authors {
|
for author in &synced_authors {
|
||||||
let _ = s.update_follow_last_sync(author, now_ms);
|
let _ = s.update_follow_last_sync(author, now_ms);
|
||||||
|
|
@ -1940,8 +1944,12 @@ impl ConnectionManager {
|
||||||
{
|
{
|
||||||
let storage = self.storage.get().await;
|
let storage = self.storage.get().await;
|
||||||
for pid in &new_post_ids {
|
for pid in &new_post_ids {
|
||||||
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
|
let _ = storage.touch_file_holder(
|
||||||
let _ = storage.add_post_upstream(pid, from, prio);
|
pid,
|
||||||
|
from,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
for author in &synced_authors {
|
for author in &synced_authors {
|
||||||
let _ = storage.update_follow_last_sync(author, now_ms);
|
let _ = storage.update_follow_last_sync(author, now_ms);
|
||||||
|
|
@ -2034,8 +2042,12 @@ impl ConnectionManager {
|
||||||
{
|
{
|
||||||
let storage = self.storage.get().await;
|
let storage = self.storage.get().await;
|
||||||
for pid in &new_post_ids {
|
for pid in &new_post_ids {
|
||||||
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
|
let _ = storage.touch_file_holder(
|
||||||
let _ = storage.add_post_upstream(pid, peer_id, prio);
|
pid,
|
||||||
|
peer_id,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
for author in &synced_authors {
|
for author in &synced_authors {
|
||||||
let _ = storage.update_follow_last_sync(author, now_ms);
|
let _ = storage.update_follow_last_sync(author, now_ms);
|
||||||
|
|
@ -2810,13 +2822,9 @@ impl ConnectionManager {
|
||||||
if store.get_post_with_visibility(post_id).ok().flatten().is_some() {
|
if store.get_post_with_visibility(post_id).ok().flatten().is_some() {
|
||||||
Some(self.our_node_id)
|
Some(self.our_node_id)
|
||||||
} else {
|
} else {
|
||||||
// CDN tree: do any of our downstream hosts have it?
|
// Any known holder of this post?
|
||||||
let downstream = store.get_post_downstream(post_id).unwrap_or_default();
|
let holders = store.get_file_holders(post_id).unwrap_or_default();
|
||||||
if !downstream.is_empty() {
|
holders.first().map(|(nid, _)| *nid)
|
||||||
Some(downstream[0])
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
post_holder = found;
|
post_holder = found;
|
||||||
|
|
@ -2830,9 +2838,9 @@ impl ConnectionManager {
|
||||||
// Check CDN: do we know who has it via blob post ownership?
|
// Check CDN: do we know who has it via blob post ownership?
|
||||||
let store = self.storage.get().await;
|
let store = self.storage.get().await;
|
||||||
if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) {
|
if let Ok(Some(pid)) = store.get_blob_post_id(blob_id) {
|
||||||
let downstream = store.get_post_downstream(&pid).unwrap_or_default();
|
let holders = store.get_file_holders(&pid).unwrap_or_default();
|
||||||
if !downstream.is_empty() {
|
if let Some((nid, _)) = holders.first() {
|
||||||
blob_holder = Some(downstream[0]);
|
blob_holder = Some(*nid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -4871,7 +4879,7 @@ impl ConnectionManager {
|
||||||
let cm = conn_mgr.lock().await;
|
let cm = conn_mgr.lock().await;
|
||||||
|
|
||||||
// Collect blob CIDs + CDN peers before async work
|
// Collect blob CIDs + CDN peers before async work
|
||||||
let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = Vec::new();
|
let mut blob_cleanup: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>)> = Vec::new();
|
||||||
{
|
{
|
||||||
let storage = cm.storage.get().await;
|
let storage = cm.storage.get().await;
|
||||||
for dr in &payload.records {
|
for dr in &payload.records {
|
||||||
|
|
@ -4879,9 +4887,8 @@ impl ConnectionManager {
|
||||||
// Collect blobs for CDN cleanup before deleting
|
// Collect blobs for CDN cleanup before deleting
|
||||||
let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default();
|
let blob_cids = storage.get_blobs_for_post(&dr.post_id).unwrap_or_default();
|
||||||
for cid in blob_cids {
|
for cid in blob_cids {
|
||||||
let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
|
let holders = storage.get_file_holders(&cid).unwrap_or_default();
|
||||||
let upstream = storage.get_blob_upstream(&cid).ok().flatten();
|
blob_cleanup.push((cid, holders));
|
||||||
blob_cleanup.push((cid, downstream, upstream));
|
|
||||||
}
|
}
|
||||||
let _ = storage.store_delete(dr);
|
let _ = storage.store_delete(dr);
|
||||||
let _ = storage.apply_delete(dr);
|
let _ = storage.apply_delete(dr);
|
||||||
|
|
@ -4897,18 +4904,11 @@ impl ConnectionManager {
|
||||||
|
|
||||||
// Gather connections for CDN delete notices under lock, then send outside
|
// Gather connections for CDN delete notices under lock, then send outside
|
||||||
let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new();
|
let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new();
|
||||||
for (cid, downstream, upstream) in &blob_cleanup {
|
for (cid, holders) in &blob_cleanup {
|
||||||
let upstream_info = upstream.as_ref().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs.clone() });
|
let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None };
|
||||||
let ds_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: upstream_info };
|
for (peer, _addrs) in holders {
|
||||||
for (ds_nid, _) in downstream {
|
if let Some(pc) = cm.connections_ref().get(peer) {
|
||||||
if let Some(pc) = cm.connections_ref().get(ds_nid) {
|
delete_notices.push((pc.connection.clone(), payload.clone()));
|
||||||
delete_notices.push((pc.connection.clone(), ds_payload.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some((up_nid, _)) = upstream {
|
|
||||||
let up_payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None };
|
|
||||||
if let Some(pc) = cm.connections_ref().get(up_nid) {
|
|
||||||
delete_notices.push((pc.connection.clone(), up_payload));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -4958,24 +4958,38 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
MessageType::PostPush => {
|
MessageType::PostPush => {
|
||||||
let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?;
|
let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?;
|
||||||
let cm = conn_mgr.lock().await;
|
// Encrypted posts are no longer accepted via direct push — they propagate
|
||||||
let storage = cm.storage.get().await;
|
// via the CDN to eliminate the sender→recipient traffic signal.
|
||||||
if !storage.is_deleted(&push.post.id)?
|
if !matches!(push.post.visibility, crate::types::PostVisibility::Public) {
|
||||||
&& storage.get_post(&push.post.id)?.is_none()
|
debug!(
|
||||||
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
|
|
||||||
{
|
|
||||||
let _ = storage.store_post_with_visibility(
|
|
||||||
&push.post.id,
|
|
||||||
&push.post.post,
|
|
||||||
&push.post.visibility,
|
|
||||||
);
|
|
||||||
let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0);
|
|
||||||
let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio);
|
|
||||||
info!(
|
|
||||||
peer = hex::encode(remote_node_id),
|
peer = hex::encode(remote_node_id),
|
||||||
post_id = hex::encode(push.post.id),
|
post_id = hex::encode(push.post.id),
|
||||||
"Received direct post push"
|
"Ignoring non-public PostPush"
|
||||||
);
|
);
|
||||||
|
} else {
|
||||||
|
let cm = conn_mgr.lock().await;
|
||||||
|
let storage = cm.storage.get().await;
|
||||||
|
if !storage.is_deleted(&push.post.id)?
|
||||||
|
&& storage.get_post(&push.post.id)?.is_none()
|
||||||
|
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
|
||||||
|
{
|
||||||
|
let _ = storage.store_post_with_visibility(
|
||||||
|
&push.post.id,
|
||||||
|
&push.post.post,
|
||||||
|
&push.post.visibility,
|
||||||
|
);
|
||||||
|
let _ = storage.touch_file_holder(
|
||||||
|
&push.post.id,
|
||||||
|
&remote_node_id,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
|
info!(
|
||||||
|
peer = hex::encode(remote_node_id),
|
||||||
|
post_id = hex::encode(push.post.id),
|
||||||
|
"Received direct post push"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MessageType::AudienceRequest => {
|
MessageType::AudienceRequest => {
|
||||||
|
|
@ -5063,17 +5077,24 @@ impl ConnectionManager {
|
||||||
&entry.manifest.author_manifest.author,
|
&entry.manifest.author_manifest.author,
|
||||||
entry.manifest.author_manifest.updated_at,
|
entry.manifest.author_manifest.updated_at,
|
||||||
);
|
);
|
||||||
|
// Remote peer pushed us this manifest → they hold the file.
|
||||||
|
let _ = storage.touch_file_holder(
|
||||||
|
&entry.cid,
|
||||||
|
&remote_node_id,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
stored_entries.push(entry.clone());
|
stored_entries.push(entry.clone());
|
||||||
}
|
}
|
||||||
// Gather downstream peers for relay before dropping locks
|
// Gather file holders for relay before dropping locks
|
||||||
let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new();
|
let mut relay_targets: Vec<(NodeId, crate::protocol::ManifestPushPayload)> = Vec::new();
|
||||||
for entry in &stored_entries {
|
for entry in &stored_entries {
|
||||||
let downstream = storage.get_blob_downstream(&entry.cid).unwrap_or_default();
|
let holders = storage.get_file_holders(&entry.cid).unwrap_or_default();
|
||||||
for (ds_nid, _) in downstream {
|
for (peer, _addrs) in holders {
|
||||||
if ds_nid == remote_node_id {
|
if peer == remote_node_id {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
relay_targets.push((ds_nid, crate::protocol::ManifestPushPayload {
|
relay_targets.push((peer, crate::protocol::ManifestPushPayload {
|
||||||
manifests: vec![entry.clone()],
|
manifests: vec![entry.clone()],
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
@ -5176,8 +5197,12 @@ impl ConnectionManager {
|
||||||
let cm = cm_arc.lock().await;
|
let cm = cm_arc.lock().await;
|
||||||
let storage = cm.storage.get().await;
|
let storage = cm.storage.get().await;
|
||||||
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
|
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
|
||||||
let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
|
let _ = storage.touch_file_holder(
|
||||||
let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
|
&sync_post.id,
|
||||||
|
&sender_id,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
let now = std::time::SystemTime::now()
|
let now = std::time::SystemTime::now()
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
|
|
@ -5268,32 +5293,14 @@ impl ConnectionManager {
|
||||||
let storage = cm.storage.get().await;
|
let storage = cm.storage.get().await;
|
||||||
let cid = payload.cid;
|
let cid = payload.cid;
|
||||||
|
|
||||||
// Check if sender was our upstream for this blob
|
// Flat-holder model: drop the sender as a holder of this file.
|
||||||
let was_upstream = storage.get_blob_upstream(&cid).ok().flatten()
|
// The author's DeleteRecord (separate signed message) is what
|
||||||
.map(|(nid, _)| nid == remote_node_id)
|
// triggers the actual blob removal for followers.
|
||||||
.unwrap_or(false);
|
let _ = storage.remove_file_holder(&cid, &remote_node_id);
|
||||||
|
|
||||||
if was_upstream {
|
|
||||||
// Sender was our upstream — clear it
|
|
||||||
let _ = storage.remove_blob_upstream(&cid);
|
|
||||||
|
|
||||||
// If they provided their upstream, store it as our new upstream
|
|
||||||
if let Some(ref new_up) = payload.upstream_node {
|
|
||||||
if let Ok(nid_bytes) = hex::decode(&new_up.n) {
|
|
||||||
if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
|
|
||||||
let _ = storage.store_blob_upstream(&cid, &nid, &new_up.a);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Sender was our downstream — remove them
|
|
||||||
let _ = storage.remove_blob_downstream(&cid, &remote_node_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
peer = hex::encode(remote_node_id),
|
peer = hex::encode(remote_node_id),
|
||||||
cid = hex::encode(cid),
|
cid = hex::encode(cid),
|
||||||
was_upstream,
|
|
||||||
"Received blob delete notice"
|
"Received blob delete notice"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -5437,7 +5444,12 @@ impl ConnectionManager {
|
||||||
let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?;
|
let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?;
|
||||||
let cm = conn_mgr.lock().await;
|
let cm = conn_mgr.lock().await;
|
||||||
let storage = cm.storage.get().await;
|
let storage = cm.storage.get().await;
|
||||||
let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id);
|
let _ = storage.touch_file_holder(
|
||||||
|
&payload.post_id,
|
||||||
|
&remote_node_id,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Sent,
|
||||||
|
);
|
||||||
drop(storage);
|
drop(storage);
|
||||||
trace!(
|
trace!(
|
||||||
peer = hex::encode(remote_node_id),
|
peer = hex::encode(remote_node_id),
|
||||||
|
|
@ -5692,15 +5704,28 @@ impl ConnectionManager {
|
||||||
let storage = storage.get().await;
|
let storage = storage.get().await;
|
||||||
let manifest: Option<crate::types::CdnManifest> = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| {
|
let manifest: Option<crate::types::CdnManifest> = storage.get_cdn_manifest(&payload.cid).ok().flatten().and_then(|json| {
|
||||||
if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
|
if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
|
||||||
let ds_count = storage.get_blob_downstream_count(&payload.cid).unwrap_or(0);
|
let ds_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0);
|
||||||
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
|
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
|
||||||
} else { serde_json::from_str(&json).ok() }
|
} else { serde_json::from_str(&json).ok() }
|
||||||
});
|
});
|
||||||
let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() {
|
let (cdn_registered, cdn_redirect_peers) = if !payload.requester_addresses.is_empty() {
|
||||||
let ok = storage.add_blob_downstream(&payload.cid, &remote_node_id, &payload.requester_addresses).unwrap_or(false);
|
let prior_count = storage.get_file_holder_count(&payload.cid).unwrap_or(0);
|
||||||
if ok { (true, vec![]) } else {
|
let _ = storage.touch_file_holder(
|
||||||
let downstream = storage.get_blob_downstream(&payload.cid).unwrap_or_default();
|
&payload.cid,
|
||||||
let redirects: Vec<PeerWithAddress> = downstream.into_iter().map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs }).collect();
|
&remote_node_id,
|
||||||
|
&payload.requester_addresses,
|
||||||
|
crate::storage::HolderDirection::Sent,
|
||||||
|
);
|
||||||
|
// If we already had 5 holders before adding this one, the
|
||||||
|
// requester should consult them too for CDN lookups.
|
||||||
|
if prior_count < 5 {
|
||||||
|
(true, vec![])
|
||||||
|
} else {
|
||||||
|
let holders = storage.get_file_holders(&payload.cid).unwrap_or_default();
|
||||||
|
let redirects: Vec<PeerWithAddress> = holders.into_iter()
|
||||||
|
.filter(|(nid, _)| *nid != remote_node_id)
|
||||||
|
.map(|(nid, addrs)| PeerWithAddress { n: hex::encode(nid), a: addrs })
|
||||||
|
.collect();
|
||||||
(false, redirects)
|
(false, redirects)
|
||||||
}
|
}
|
||||||
} else { (false, vec![]) };
|
} else { (false, vec![]) };
|
||||||
|
|
@ -5727,7 +5752,7 @@ impl ConnectionManager {
|
||||||
Some(json) => {
|
Some(json) => {
|
||||||
let manifest = if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
|
let manifest = if let Ok(am) = serde_json::from_str::<crate::types::AuthorManifest>(&json) {
|
||||||
if am.updated_at > payload.current_updated_at {
|
if am.updated_at > payload.current_updated_at {
|
||||||
let ds_count = store.get_blob_downstream_count(&payload.cid).unwrap_or(0);
|
let ds_count = store.get_file_holder_count(&payload.cid).unwrap_or(0);
|
||||||
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
|
Some(crate::types::CdnManifest { author_manifest: am, host: our_node_id, host_addresses: vec![], source: our_node_id, source_addresses: vec![], downstream_count: ds_count })
|
||||||
} else { None }
|
} else { None }
|
||||||
} else { None };
|
} else { None };
|
||||||
|
|
@ -6072,9 +6097,14 @@ impl ConnectionManager {
|
||||||
to_pull.push(*pid);
|
to_pull.push(*pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register as downstream for all accepted posts
|
// Register as holder for all accepted posts
|
||||||
for pid in &acc {
|
for pid in &acc {
|
||||||
let _ = storage.add_post_downstream(pid, &remote_node_id);
|
let _ = storage.touch_file_holder(
|
||||||
|
pid,
|
||||||
|
&remote_node_id,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Sent,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
(acc, rej, to_pull)
|
(acc, rej, to_pull)
|
||||||
|
|
@ -6125,8 +6155,12 @@ impl ConnectionManager {
|
||||||
let cm = cm_arc.lock().await;
|
let cm = cm_arc.lock().await;
|
||||||
let storage = cm.storage.get().await;
|
let storage = cm.storage.get().await;
|
||||||
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
|
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
|
||||||
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
|
let _ = storage.touch_file_holder(
|
||||||
let _ = storage.add_post_upstream(&sp.id, &sender, prio);
|
&sp.id,
|
||||||
|
&sender,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
let blob_store = cm.blob_store.clone();
|
let blob_store = cm.blob_store.clone();
|
||||||
drop(storage);
|
drop(storage);
|
||||||
drop(cm);
|
drop(cm);
|
||||||
|
|
@ -6153,7 +6187,12 @@ impl ConnectionManager {
|
||||||
let cm = cm_arc.lock().await;
|
let cm = cm_arc.lock().await;
|
||||||
let storage = cm.storage.get().await;
|
let storage = cm.storage.get().await;
|
||||||
let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes);
|
let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes);
|
||||||
let _ = storage.add_post_upstream(&att.cid, &sender, 0);
|
let _ = storage.touch_file_holder(
|
||||||
|
&att.cid,
|
||||||
|
&sender,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}.await;
|
}.await;
|
||||||
|
|
@ -6178,12 +6217,14 @@ impl ConnectionManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate to downstream + upstream.
|
/// Handle an incoming BlobHeaderDiff — store engagement ops and re-propagate
|
||||||
|
/// to the post's file_holders (flat set, up to 5 most recent).
|
||||||
async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) {
|
async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) {
|
||||||
use crate::types::BlobHeaderDiffOp;
|
use crate::types::BlobHeaderDiffOp;
|
||||||
|
|
||||||
// Gather policy + audience data, then drop lock immediately
|
// Gather policy + audience data + holders, then drop lock immediately.
|
||||||
let (policy, approved_audience, downstream, upstreams) = {
|
// Remote peer clearly holds this post — record them as a holder.
|
||||||
|
let (policy, approved_audience, holders) = {
|
||||||
let storage = self.storage.get().await;
|
let storage = self.storage.get().await;
|
||||||
let policy = storage.get_comment_policy(&payload.post_id)
|
let policy = storage.get_comment_policy(&payload.post_id)
|
||||||
.ok()
|
.ok()
|
||||||
|
|
@ -6193,13 +6234,18 @@ impl ConnectionManager {
|
||||||
crate::types::AudienceDirection::Inbound,
|
crate::types::AudienceDirection::Inbound,
|
||||||
Some(crate::types::AudienceStatus::Approved),
|
Some(crate::types::AudienceStatus::Approved),
|
||||||
).unwrap_or_default();
|
).unwrap_or_default();
|
||||||
let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default();
|
let _ = storage.touch_file_holder(
|
||||||
let upstreams: Vec<NodeId> = storage.get_post_upstreams(&payload.post_id)
|
&payload.post_id,
|
||||||
|
&sender,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
|
let holders: Vec<NodeId> = storage.get_file_holders(&payload.post_id)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(nid, _)| nid)
|
.map(|(nid, _addrs)| nid)
|
||||||
.collect();
|
.collect();
|
||||||
(policy, approved, downstream, upstreams)
|
(policy, approved, holders)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Filter ops using gathered data (no lock held)
|
// Filter ops using gathered data (no lock held)
|
||||||
|
|
@ -6381,26 +6427,16 @@ impl ConnectionManager {
|
||||||
let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms);
|
let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect all targets (downstream + all upstreams), then send in a single batched task
|
// Re-propagate to all file holders (flat set, max 5). Exclude sender.
|
||||||
let mut targets: Vec<iroh::endpoint::Connection> = Vec::new();
|
let mut targets: Vec<iroh::endpoint::Connection> = Vec::new();
|
||||||
for peer_id in downstream {
|
for peer_id in &holders {
|
||||||
if peer_id == sender { continue; }
|
if *peer_id == sender { continue; }
|
||||||
if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone())
|
if let Some(conn) = self.connections.get(peer_id).map(|mc| mc.connection.clone())
|
||||||
.or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone()))
|
.or_else(|| self.sessions.get(peer_id).map(|sc| sc.connection.clone()))
|
||||||
{
|
{
|
||||||
targets.push(conn);
|
targets.push(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Phase 6: Try all upstreams, not just one
|
|
||||||
for up in &upstreams {
|
|
||||||
if *up != sender {
|
|
||||||
if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone())
|
|
||||||
.or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone()))
|
|
||||||
{
|
|
||||||
targets.push(conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !targets.is_empty() {
|
if !targets.is_empty() {
|
||||||
let payload_clone = payload.clone();
|
let payload_clone = payload.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
@ -7684,8 +7720,8 @@ impl ConnectionActor {
|
||||||
if s.get_post_with_visibility(post_id).ok().flatten().is_some() {
|
if s.get_post_with_visibility(post_id).ok().flatten().is_some() {
|
||||||
post_holder = Some(ctx.our_node_id);
|
post_holder = Some(ctx.our_node_id);
|
||||||
} else {
|
} else {
|
||||||
let downstream = s.get_post_downstream(post_id).unwrap_or_default();
|
let holders = s.get_file_holders(post_id).unwrap_or_default();
|
||||||
if !downstream.is_empty() { post_holder = Some(downstream[0]); }
|
if let Some((nid, _)) = holders.first() { post_holder = Some(*nid); }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -7695,8 +7731,8 @@ impl ConnectionActor {
|
||||||
} else {
|
} else {
|
||||||
let s = ctx.storage.get().await;
|
let s = ctx.storage.get().await;
|
||||||
if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) {
|
if let Ok(Some(pid)) = s.get_blob_post_id(blob_id) {
|
||||||
let downstream = s.get_post_downstream(&pid).unwrap_or_default();
|
let holders = s.get_file_holders(&pid).unwrap_or_default();
|
||||||
if !downstream.is_empty() { blob_holder = Some(downstream[0]); }
|
if let Some((nid, _)) = holders.first() { blob_holder = Some(*nid); }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -378,7 +378,11 @@ async fn try_redirect(
|
||||||
Ok(Some((_, PostVisibility::Public))) => {}
|
Ok(Some((_, PostVisibility::Public))) => {}
|
||||||
_ => return false, // not found or not public — hard close
|
_ => return false, // not found or not public — hard close
|
||||||
}
|
}
|
||||||
store.get_post_downstream(post_id).unwrap_or_default()
|
store.get_file_holders(post_id)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.into_iter()
|
||||||
|
.map(|(nid, _addrs)| nid)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
};
|
};
|
||||||
|
|
||||||
// Get addresses for downstream peers
|
// Get addresses for downstream peers
|
||||||
|
|
|
||||||
|
|
@ -902,50 +902,6 @@ impl Network {
|
||||||
self.send_to_audience(MessageType::PostNotification, &payload).await
|
self.send_to_audience(MessageType::PostNotification, &payload).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a full post directly to recipients (persistent if available, ephemeral otherwise).
|
|
||||||
pub async fn push_post_to_recipients(
|
|
||||||
&self,
|
|
||||||
post_id: &crate::types::PostId,
|
|
||||||
post: &Post,
|
|
||||||
visibility: &PostVisibility,
|
|
||||||
) -> usize {
|
|
||||||
let recipients: Vec<NodeId> = match visibility {
|
|
||||||
PostVisibility::Public => return 0,
|
|
||||||
PostVisibility::Encrypted { recipients } => {
|
|
||||||
recipients.iter().map(|wk| wk.recipient).collect()
|
|
||||||
}
|
|
||||||
PostVisibility::GroupEncrypted { group_id, .. } => {
|
|
||||||
// Push to all group members
|
|
||||||
match self.storage.get().await.get_all_group_members() {
|
|
||||||
Ok(map) => map.get(group_id).cloned().unwrap_or_default().into_iter().collect(),
|
|
||||||
Err(_) => return 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let payload = PostPushPayload {
|
|
||||||
post: SyncPost {
|
|
||||||
id: *post_id,
|
|
||||||
post: post.clone(),
|
|
||||||
visibility: visibility.clone(),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut pushed = 0;
|
|
||||||
for recipient in &recipients {
|
|
||||||
if self.send_to_peer_uni(recipient, MessageType::PostPush, &payload).await.is_ok() {
|
|
||||||
pushed += 1;
|
|
||||||
debug!(
|
|
||||||
recipient = hex::encode(recipient),
|
|
||||||
post_id = hex::encode(post_id),
|
|
||||||
"Pushed post to recipient"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pushed
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Push a profile update to all audience members (ephemeral-capable).
|
/// Push a profile update to all audience members (ephemeral-capable).
|
||||||
pub async fn push_profile(&self, profile: &PublicProfile) -> usize {
|
pub async fn push_profile(&self, profile: &PublicProfile) -> usize {
|
||||||
// Sanitize: if public_visible=false, strip display_name/bio from pushed profile
|
// Sanitize: if public_visible=false, strip display_name/bio from pushed profile
|
||||||
|
|
@ -1059,15 +1015,16 @@ impl Network {
|
||||||
sent
|
sent
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push updated manifests to all downstream peers for a given CID.
|
/// Push an updated manifest to all known holders of the file (flat set,
|
||||||
|
/// up to 5 most-recent). Replaces the legacy downstream-tree push.
|
||||||
pub async fn push_manifest_to_downstream(
|
pub async fn push_manifest_to_downstream(
|
||||||
&self,
|
&self,
|
||||||
cid: &[u8; 32],
|
cid: &[u8; 32],
|
||||||
manifest: &crate::types::CdnManifest,
|
manifest: &crate::types::CdnManifest,
|
||||||
) -> usize {
|
) -> usize {
|
||||||
let downstream = {
|
let holders = {
|
||||||
let storage = self.storage.get().await;
|
let storage = self.storage.get().await;
|
||||||
storage.get_blob_downstream(cid).unwrap_or_default()
|
storage.get_file_holders(cid).unwrap_or_default()
|
||||||
};
|
};
|
||||||
let payload = crate::protocol::ManifestPushPayload {
|
let payload = crate::protocol::ManifestPushPayload {
|
||||||
manifests: vec![crate::protocol::ManifestPushEntry {
|
manifests: vec![crate::protocol::ManifestPushEntry {
|
||||||
|
|
@ -1076,54 +1033,40 @@ impl Network {
|
||||||
}],
|
}],
|
||||||
};
|
};
|
||||||
let mut sent = 0;
|
let mut sent = 0;
|
||||||
for (ds_nid, _) in &downstream {
|
for (peer, peer_addrs) in &holders {
|
||||||
if self.send_to_peer_uni(ds_nid, MessageType::ManifestPush, &payload).await.is_ok() {
|
if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() {
|
||||||
sent += 1;
|
sent += 1;
|
||||||
|
let storage = self.storage.get().await;
|
||||||
|
let _ = storage.touch_file_holder(
|
||||||
|
cid,
|
||||||
|
peer,
|
||||||
|
peer_addrs,
|
||||||
|
crate::storage::HolderDirection::Sent,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sent
|
sent
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send blob delete notices to downstream and upstream peers.
|
/// Send blob delete notices to all known holders of a file.
|
||||||
/// Downstream peers receive our upstream info for tree healing.
|
/// Second argument kept as Option for signature stability; flat-holder
|
||||||
/// Upstream peers receive no upstream info (just "remove me as downstream").
|
/// model doesn't need separate upstream handling.
|
||||||
pub async fn send_blob_delete_notices(
|
pub async fn send_blob_delete_notices(
|
||||||
&self,
|
&self,
|
||||||
cid: &[u8; 32],
|
cid: &[u8; 32],
|
||||||
downstream: &[(NodeId, Vec<String>)],
|
holders: &[(NodeId, Vec<String>)],
|
||||||
upstream: Option<&(NodeId, Vec<String>)>,
|
_legacy_upstream: Option<&(NodeId, Vec<String>)>,
|
||||||
) -> usize {
|
) -> usize {
|
||||||
let upstream_info = upstream.map(|(nid, addrs)| {
|
let payload = crate::protocol::BlobDeleteNoticePayload {
|
||||||
crate::types::PeerWithAddress {
|
|
||||||
n: hex::encode(nid),
|
|
||||||
a: addrs.clone(),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut sent = 0;
|
|
||||||
|
|
||||||
// Notify downstream (with upstream info for tree healing)
|
|
||||||
let ds_payload = crate::protocol::BlobDeleteNoticePayload {
|
|
||||||
cid: *cid,
|
cid: *cid,
|
||||||
upstream_node: upstream_info,
|
upstream_node: None,
|
||||||
};
|
};
|
||||||
for (ds_nid, _) in downstream {
|
let mut sent = 0;
|
||||||
if self.send_to_peer_uni(ds_nid, MessageType::BlobDeleteNotice, &ds_payload).await.is_ok() {
|
for (peer, _addrs) in holders {
|
||||||
|
if self.send_to_peer_uni(peer, MessageType::BlobDeleteNotice, &payload).await.is_ok() {
|
||||||
sent += 1;
|
sent += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify upstream (no upstream info)
|
|
||||||
if let Some((up_nid, _)) = upstream {
|
|
||||||
let up_payload = crate::protocol::BlobDeleteNoticePayload {
|
|
||||||
cid: *cid,
|
|
||||||
upstream_node: None,
|
|
||||||
};
|
|
||||||
if self.send_to_peer_uni(up_nid, MessageType::BlobDeleteNotice, &up_payload).await.is_ok() {
|
|
||||||
sent += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sent
|
sent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2356,24 +2299,24 @@ impl Network {
|
||||||
self.endpoint.close().await;
|
self.endpoint.close().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Propagate an engagement diff to all downstream holders of a post (CDN tree).
|
/// Propagate an engagement diff to all known holders of a post (flat set,
|
||||||
/// Excludes the sender to avoid loops.
|
/// up to 5 most-recent). Excludes the sender to avoid loops.
|
||||||
pub async fn propagate_engagement_diff(
|
pub async fn propagate_engagement_diff(
|
||||||
&self,
|
&self,
|
||||||
post_id: &crate::types::PostId,
|
post_id: &crate::types::PostId,
|
||||||
payload: &crate::protocol::BlobHeaderDiffPayload,
|
payload: &crate::protocol::BlobHeaderDiffPayload,
|
||||||
exclude_peer: &crate::types::NodeId,
|
exclude_peer: &crate::types::NodeId,
|
||||||
) -> usize {
|
) -> usize {
|
||||||
let downstream = {
|
let holders = {
|
||||||
let storage = self.storage.get().await;
|
let storage = self.storage.get().await;
|
||||||
storage.get_post_downstream(post_id).unwrap_or_default()
|
storage.get_file_holders(post_id).unwrap_or_default()
|
||||||
};
|
};
|
||||||
let mut sent = 0;
|
let mut sent = 0;
|
||||||
for ds_nid in &downstream {
|
for (peer, _addrs) in &holders {
|
||||||
if ds_nid == exclude_peer {
|
if peer == exclude_peer {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if self.send_to_peer_uni(ds_nid, MessageType::BlobHeaderDiff, payload).await.is_ok() {
|
if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() {
|
||||||
sent += 1;
|
sent += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -836,13 +836,12 @@ impl Node {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// For encrypted posts, push directly to recipients
|
// For public posts, push to audience members. Encrypted posts propagate
|
||||||
let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await;
|
// via the CDN (ManifestPush + header-diff) to eliminate the sender→recipient
|
||||||
|
// traffic signal.
|
||||||
// For public posts, push to audience members
|
|
||||||
let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await;
|
let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await;
|
||||||
|
|
||||||
info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post");
|
info!(post_id = hex::encode(post_id), audience_pushed, "Created new post");
|
||||||
Ok((post_id, post, visibility))
|
Ok((post_id, post, visibility))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1351,7 +1350,12 @@ impl Node {
|
||||||
let source_addrs: Vec<String> = response.manifest.as_ref()
|
let source_addrs: Vec<String> = response.manifest.as_ref()
|
||||||
.map(|m| m.host_addresses.clone())
|
.map(|m| m.host_addresses.clone())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs);
|
let _ = storage.touch_file_holder(
|
||||||
|
cid,
|
||||||
|
from_peer,
|
||||||
|
&source_addrs,
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Ok(data)
|
Ok(data)
|
||||||
}
|
}
|
||||||
|
|
@ -1379,16 +1383,17 @@ impl Node {
|
||||||
// Collect redirect peers from responses in case we need them later
|
// Collect redirect peers from responses in case we need them later
|
||||||
let mut redirect_peers: Vec<crate::types::PeerWithAddress> = Vec::new();
|
let mut redirect_peers: Vec<crate::types::PeerWithAddress> = Vec::new();
|
||||||
|
|
||||||
// 2. Try existing upstream (if we previously fetched this blob)
|
// 2. Try known holders (up to 5 most-recent peers we've interacted
|
||||||
let upstream = {
|
// with about this file).
|
||||||
|
let known_holders = {
|
||||||
let storage = self.storage.get().await;
|
let storage = self.storage.get().await;
|
||||||
storage.get_blob_upstream(cid)?
|
storage.get_file_holders(cid).unwrap_or_default()
|
||||||
};
|
};
|
||||||
if let Some((upstream_nid, _upstream_addrs)) = upstream {
|
for (holder_nid, _addrs) in &known_holders {
|
||||||
match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await {
|
match self.fetch_blob_from_peer(cid, holder_nid, post_id, author, mime_type, created_at).await {
|
||||||
Ok(Some(data)) => return Ok(Some(data)),
|
Ok(Some(data)) => return Ok(Some(data)),
|
||||||
Ok(None) => {}
|
Ok(None) => {}
|
||||||
Err(e) => warn!(error = %e, "blob fetch from upstream failed"),
|
Err(e) => warn!(error = %e, "blob fetch from known holder failed"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1413,7 +1418,12 @@ impl Node {
|
||||||
let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at);
|
let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = storage.store_blob_upstream(cid, &lateral, &[]);
|
let _ = storage.touch_file_holder(
|
||||||
|
cid,
|
||||||
|
&lateral,
|
||||||
|
&[],
|
||||||
|
crate::storage::HolderDirection::Received,
|
||||||
|
);
|
||||||
return Ok(Some(data));
|
return Ok(Some(data));
|
||||||
}
|
}
|
||||||
Ok((None, response)) => {
|
Ok((None, response)) => {
|
||||||
|
|
@ -1981,14 +1991,13 @@ impl Node {
|
||||||
signature,
|
signature,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Collect blob CIDs + CDN peers before cleanup
|
// Collect blob CIDs + known holders before cleanup (for delete notices)
|
||||||
let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = {
|
let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = {
|
||||||
let storage = self.storage.get().await;
|
let storage = self.storage.get().await;
|
||||||
let cids = storage.get_blobs_for_post(post_id).unwrap_or_default();
|
let cids = storage.get_blobs_for_post(post_id).unwrap_or_default();
|
||||||
cids.into_iter().map(|cid| {
|
cids.into_iter().map(|cid| {
|
||||||
let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
|
let holders = storage.get_file_holders(&cid).unwrap_or_default();
|
||||||
let upstream = storage.get_blob_upstream(&cid).ok().flatten();
|
(cid, holders, None::<(NodeId, Vec<String>)>)
|
||||||
(cid, downstream, upstream)
|
|
||||||
}).collect()
|
}).collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -2108,12 +2117,10 @@ impl Node {
|
||||||
storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?;
|
storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete_post already pushes the DeleteRecord
|
// delete_post already pushes the DeleteRecord.
|
||||||
|
// Replacement post propagates via the CDN to remaining recipients.
|
||||||
self.delete_post(post_id).await?;
|
self.delete_post(post_id).await?;
|
||||||
|
|
||||||
// Push replacement post directly to remaining recipients
|
|
||||||
self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await;
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
old_id = hex::encode(post_id),
|
old_id = hex::encode(post_id),
|
||||||
new_id = hex::encode(new_post_id),
|
new_id = hex::encode(new_post_id),
|
||||||
|
|
@ -3086,20 +3093,27 @@ impl Node {
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.as_millis() as u64 - max_age_ms;
|
.as_millis() as u64 - max_age_ms;
|
||||||
let stale = {
|
let stale_cids = {
|
||||||
let s = storage.get().await;
|
let s = storage.get().await;
|
||||||
s.get_stale_manifests(cutoff).unwrap_or_default()
|
s.get_stale_manifest_cids(cutoff).unwrap_or_default()
|
||||||
};
|
};
|
||||||
for (cid, upstream_nid, _upstream_addrs) in &stale {
|
for cid in &stale_cids {
|
||||||
// Get current updated_at for this manifest
|
// Get current updated_at + pick a holder to refresh from
|
||||||
let current_updated_at = {
|
let (current_updated_at, refresh_source) = {
|
||||||
let s = storage.get().await;
|
let s = storage.get().await;
|
||||||
s.get_cdn_manifest(cid).ok().flatten()
|
let updated_at = s.get_cdn_manifest(cid).ok().flatten()
|
||||||
.and_then(|json| serde_json::from_str::<crate::types::AuthorManifest>(&json).ok())
|
.and_then(|json| serde_json::from_str::<crate::types::AuthorManifest>(&json).ok())
|
||||||
.map(|m| m.updated_at)
|
.map(|m| m.updated_at)
|
||||||
.unwrap_or(0)
|
.unwrap_or(0);
|
||||||
|
let source = s.get_file_holders(cid)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
.map(|(nid, _)| nid);
|
||||||
|
(updated_at, source)
|
||||||
};
|
};
|
||||||
match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await {
|
let Some(upstream_nid) = refresh_source else { continue; };
|
||||||
|
match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await {
|
||||||
Ok(Some(cdn_manifest)) => {
|
Ok(Some(cdn_manifest)) => {
|
||||||
if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
|
if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
|
||||||
let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
|
let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
|
||||||
|
|
@ -3110,10 +3124,10 @@ impl Node {
|
||||||
&cdn_manifest.author_manifest.author,
|
&cdn_manifest.author_manifest.author,
|
||||||
cdn_manifest.author_manifest.updated_at,
|
cdn_manifest.author_manifest.updated_at,
|
||||||
);
|
);
|
||||||
// Relay to our downstream
|
// Relay to known holders (flat set)
|
||||||
let downstream = s.get_blob_downstream(cid).unwrap_or_default();
|
let holders = s.get_file_holders(cid).unwrap_or_default();
|
||||||
drop(s);
|
drop(s);
|
||||||
if !downstream.is_empty() {
|
if !holders.is_empty() {
|
||||||
network.push_manifest_to_downstream(cid, &cdn_manifest).await;
|
network.push_manifest_to_downstream(cid, &cdn_manifest).await;
|
||||||
}
|
}
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
|
|
@ -3126,7 +3140,7 @@ impl Node {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
cid = hex::encode(cid),
|
cid = hex::encode(cid),
|
||||||
upstream = hex::encode(upstream_nid),
|
upstream = hex::encode(&upstream_nid),
|
||||||
error = %e,
|
error = %e,
|
||||||
"Manifest refresh from upstream failed"
|
"Manifest refresh from upstream failed"
|
||||||
);
|
);
|
||||||
|
|
@ -3277,18 +3291,16 @@ impl Node {
|
||||||
compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms)
|
compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete a blob with CDN notifications to upstream/downstream.
|
/// Delete a blob with CDN notifications to known holders.
|
||||||
pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
|
pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
|
||||||
// Gather CDN peers before cleanup
|
// Gather known holders before cleanup
|
||||||
let (downstream, upstream) = {
|
let holders = {
|
||||||
let storage = self.storage.get().await;
|
let storage = self.storage.get().await;
|
||||||
let ds = storage.get_blob_downstream(cid).unwrap_or_default();
|
storage.get_file_holders(cid).unwrap_or_default()
|
||||||
let up = storage.get_blob_upstream(cid).ok().flatten();
|
|
||||||
(ds, up)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Send CDN delete notices
|
// Send CDN delete notices to all holders
|
||||||
self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await;
|
self.network.send_blob_delete_notices(cid, &holders, None).await;
|
||||||
|
|
||||||
// Clean up local storage
|
// Clean up local storage
|
||||||
{
|
{
|
||||||
|
|
@ -3576,15 +3588,9 @@ impl Node {
|
||||||
ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())],
|
ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())],
|
||||||
timestamp_ms: now,
|
timestamp_ms: now,
|
||||||
};
|
};
|
||||||
|
// propagate_engagement_diff targets all file_holders (flat set, max 5)
|
||||||
|
// which already subsumes what used to be upstream + downstream.
|
||||||
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
||||||
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
|
|
||||||
let upstreams = {
|
|
||||||
let storage = self.storage.get().await;
|
|
||||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
|
||||||
};
|
|
||||||
for (up, _prio) in upstreams {
|
|
||||||
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(reaction)
|
Ok(reaction)
|
||||||
|
|
@ -3691,14 +3697,6 @@ impl Node {
|
||||||
timestamp_ms: now,
|
timestamp_ms: now,
|
||||||
};
|
};
|
||||||
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
||||||
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
|
|
||||||
let upstreams = {
|
|
||||||
let storage = self.storage.get().await;
|
|
||||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
|
||||||
};
|
|
||||||
for (up, _prio) in upstreams {
|
|
||||||
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(comment)
|
Ok(comment)
|
||||||
|
|
@ -3735,14 +3733,6 @@ impl Node {
|
||||||
timestamp_ms: now,
|
timestamp_ms: now,
|
||||||
};
|
};
|
||||||
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
||||||
// Phase 6: send to all upstreams
|
|
||||||
let upstreams = {
|
|
||||||
let storage = self.storage.get().await;
|
|
||||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
|
||||||
};
|
|
||||||
for (up, _prio) in upstreams {
|
|
||||||
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -3776,14 +3766,6 @@ impl Node {
|
||||||
timestamp_ms: now,
|
timestamp_ms: now,
|
||||||
};
|
};
|
||||||
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
|
||||||
// Phase 6: send to all upstreams
|
|
||||||
let upstreams = {
|
|
||||||
let storage = self.storage.get().await;
|
|
||||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
|
||||||
};
|
|
||||||
for (up, _prio) in upstreams {
|
|
||||||
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -4005,14 +3987,6 @@ impl Node {
|
||||||
timestamp_ms: now,
|
timestamp_ms: now,
|
||||||
};
|
};
|
||||||
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
|
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
|
||||||
// Phase 6: send to all upstreams
|
|
||||||
let upstreams = {
|
|
||||||
let storage = self.storage.get().await;
|
|
||||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
|
||||||
};
|
|
||||||
for (up, _prio) in upstreams {
|
|
||||||
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -4127,14 +4101,6 @@ impl Node {
|
||||||
timestamp_ms: now,
|
timestamp_ms: now,
|
||||||
};
|
};
|
||||||
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
|
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
|
||||||
// Phase 6: send to all upstreams
|
|
||||||
let upstreams = {
|
|
||||||
let storage = self.storage.get().await;
|
|
||||||
storage.get_post_upstreams(&post_id).unwrap_or_default()
|
|
||||||
};
|
|
||||||
for (up, _prio) in upstreams {
|
|
||||||
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -4367,10 +4333,10 @@ impl Node {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Filter to under-replicated (< 2 downstream)
|
// Filter to under-replicated (< 2 holders)
|
||||||
let mut needs_replication = Vec::new();
|
let mut needs_replication = Vec::new();
|
||||||
for pid in &recent_ids {
|
for pid in &recent_ids {
|
||||||
match storage.get_post_downstream_count(pid) {
|
match storage.get_file_holder_count(pid) {
|
||||||
Ok(count) if count < 2 => {
|
Ok(count) if count < 2 => {
|
||||||
needs_replication.push(*pid);
|
needs_replication.push(*pid);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,26 @@ use crate::types::{
|
||||||
VisibilityIntent,
|
VisibilityIntent,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Direction for file_holders entries: whether we sent the file to this peer,
|
||||||
|
/// received it from them, or both. Not load-bearing for propagation decisions —
|
||||||
|
/// any holder can serve as a diff target — but retained for potential reuse.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum HolderDirection {
|
||||||
|
Sent,
|
||||||
|
Received,
|
||||||
|
Both,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HolderDirection {
|
||||||
|
pub fn as_str(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
HolderDirection::Sent => "sent",
|
||||||
|
HolderDirection::Received => "received",
|
||||||
|
HolderDirection::Both => "both",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Blob metadata for eviction scoring.
|
/// Blob metadata for eviction scoring.
|
||||||
pub struct EvictionCandidate {
|
pub struct EvictionCandidate {
|
||||||
pub cid: [u8; 32],
|
pub cid: [u8; 32],
|
||||||
|
|
@ -262,20 +282,6 @@ impl Storage {
|
||||||
updated_at INTEGER NOT NULL
|
updated_at INTEGER NOT NULL
|
||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author);
|
CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author);
|
||||||
CREATE TABLE IF NOT EXISTS blob_upstream (
|
|
||||||
cid BLOB PRIMARY KEY,
|
|
||||||
source_node_id BLOB NOT NULL,
|
|
||||||
source_addresses TEXT NOT NULL DEFAULT '[]',
|
|
||||||
stored_at INTEGER NOT NULL
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS blob_downstream (
|
|
||||||
cid BLOB NOT NULL,
|
|
||||||
peer_node_id BLOB NOT NULL,
|
|
||||||
peer_addresses TEXT NOT NULL DEFAULT '[]',
|
|
||||||
registered_at INTEGER NOT NULL,
|
|
||||||
PRIMARY KEY (cid, peer_node_id)
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid);
|
|
||||||
CREATE TABLE IF NOT EXISTS group_keys (
|
CREATE TABLE IF NOT EXISTS group_keys (
|
||||||
group_id BLOB PRIMARY KEY,
|
group_id BLOB PRIMARY KEY,
|
||||||
circle_name TEXT NOT NULL,
|
circle_name TEXT NOT NULL,
|
||||||
|
|
@ -326,17 +332,6 @@ impl Storage {
|
||||||
last_seen_ms INTEGER NOT NULL,
|
last_seen_ms INTEGER NOT NULL,
|
||||||
success_count INTEGER NOT NULL DEFAULT 0
|
success_count INTEGER NOT NULL DEFAULT 0
|
||||||
);
|
);
|
||||||
CREATE TABLE IF NOT EXISTS post_downstream (
|
|
||||||
post_id BLOB NOT NULL,
|
|
||||||
peer_node_id BLOB NOT NULL,
|
|
||||||
registered_at INTEGER NOT NULL,
|
|
||||||
PRIMARY KEY (post_id, peer_node_id)
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id);
|
|
||||||
CREATE TABLE IF NOT EXISTS post_upstream (
|
|
||||||
post_id BLOB PRIMARY KEY,
|
|
||||||
peer_node_id BLOB NOT NULL
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS blob_headers (
|
CREATE TABLE IF NOT EXISTS blob_headers (
|
||||||
post_id BLOB PRIMARY KEY,
|
post_id BLOB PRIMARY KEY,
|
||||||
author BLOB NOT NULL,
|
author BLOB NOT NULL,
|
||||||
|
|
@ -389,7 +384,17 @@ impl Storage {
|
||||||
CREATE TABLE IF NOT EXISTS seen_messages (
|
CREATE TABLE IF NOT EXISTS seen_messages (
|
||||||
partner_id BLOB PRIMARY KEY,
|
partner_id BLOB PRIMARY KEY,
|
||||||
last_read_ms INTEGER NOT NULL DEFAULT 0
|
last_read_ms INTEGER NOT NULL DEFAULT 0
|
||||||
);",
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS file_holders (
|
||||||
|
file_id BLOB NOT NULL,
|
||||||
|
peer_id BLOB NOT NULL,
|
||||||
|
peer_addresses TEXT NOT NULL DEFAULT '[]',
|
||||||
|
last_interaction_ms INTEGER NOT NULL,
|
||||||
|
direction TEXT NOT NULL,
|
||||||
|
PRIMARY KEY (file_id, peer_id)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_file_holders_recency
|
||||||
|
ON file_holders(file_id, last_interaction_ms DESC);",
|
||||||
)?;
|
)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -543,16 +548,6 @@ impl Storage {
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration)
|
|
||||||
let has_blob_pref_tree = self.conn.prepare(
|
|
||||||
"SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'"
|
|
||||||
)?.query_row([], |row| row.get::<_, i64>(0))?;
|
|
||||||
if has_blob_pref_tree == 0 {
|
|
||||||
self.conn.execute_batch(
|
|
||||||
"ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';"
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add public_visible column to profiles if missing (Phase D-4 migration)
|
// Add public_visible column to profiles if missing (Phase D-4 migration)
|
||||||
let has_public_visible = self.conn.prepare(
|
let has_public_visible = self.conn.prepare(
|
||||||
"SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'"
|
"SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'"
|
||||||
|
|
@ -666,25 +661,18 @@ impl Storage {
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max)
|
// 0.6.1-beta: seed file_holders from legacy upstream/downstream tables
|
||||||
let has_priority = self.conn.prepare(
|
// before they're dropped. Idempotent — only fires on an empty
|
||||||
"SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'"
|
// file_holders table.
|
||||||
)?.query_row([], |row| row.get::<_, i64>(0))?;
|
self.seed_file_holders_from_legacy()?;
|
||||||
if has_priority == 0 {
|
|
||||||
self.conn.execute_batch(
|
// 0.6.1-beta: drop legacy directional tables — replaced by file_holders.
|
||||||
"ALTER TABLE post_upstream RENAME TO post_upstream_old;
|
self.conn.execute_batch(
|
||||||
CREATE TABLE post_upstream (
|
"DROP TABLE IF EXISTS blob_upstream;
|
||||||
post_id BLOB NOT NULL,
|
DROP TABLE IF EXISTS blob_downstream;
|
||||||
peer_node_id BLOB NOT NULL,
|
DROP TABLE IF EXISTS post_upstream;
|
||||||
priority INTEGER NOT NULL DEFAULT 0,
|
DROP TABLE IF EXISTS post_downstream;",
|
||||||
registered_at INTEGER NOT NULL DEFAULT 0,
|
)?;
|
||||||
PRIMARY KEY (post_id, peer_node_id)
|
|
||||||
);
|
|
||||||
INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at)
|
|
||||||
SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old;
|
|
||||||
DROP TABLE post_upstream_old;"
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -2396,8 +2384,7 @@ impl Storage {
|
||||||
params![record.post_id.as_slice(), record.author.as_slice()],
|
params![record.post_id.as_slice(), record.author.as_slice()],
|
||||||
)?;
|
)?;
|
||||||
if deleted > 0 {
|
if deleted > 0 {
|
||||||
self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
|
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?;
|
||||||
self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
|
|
||||||
self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?;
|
self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?;
|
||||||
}
|
}
|
||||||
Ok(deleted > 0)
|
Ok(deleted > 0)
|
||||||
|
|
@ -3396,28 +3383,6 @@ impl Storage {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update the preferred_tree JSON for a blob upstream entry.
|
|
||||||
pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> {
|
|
||||||
let json = serde_json::to_string(
|
|
||||||
&tree.iter().map(hex::encode).collect::<Vec<_>>()
|
|
||||||
)?;
|
|
||||||
self.conn.execute(
|
|
||||||
"UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2",
|
|
||||||
params![json, cid.as_slice()],
|
|
||||||
)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the preferred_tree for a blob upstream entry.
|
|
||||||
pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<NodeId>> {
|
|
||||||
let json: String = self.conn.query_row(
|
|
||||||
"SELECT preferred_tree FROM blob_upstream WHERE cid = ?1",
|
|
||||||
params![cid.as_slice()],
|
|
||||||
|row| row.get(0),
|
|
||||||
).unwrap_or_else(|_| "[]".to_string());
|
|
||||||
Ok(parse_anchors_json(&json))
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- Social Routes ----
|
// ---- Social Routes ----
|
||||||
|
|
||||||
/// Insert or update a social route entry.
|
/// Insert or update a social route entry.
|
||||||
|
|
@ -3889,10 +3854,10 @@ impl Storage {
|
||||||
GROUP BY post_id
|
GROUP BY post_id
|
||||||
) r ON b.post_id = r.post_id
|
) r ON b.post_id = r.post_id
|
||||||
LEFT JOIN (
|
LEFT JOIN (
|
||||||
SELECT cid, COUNT(*) as ds_count
|
SELECT file_id, COUNT(*) as ds_count
|
||||||
FROM blob_downstream
|
FROM file_holders
|
||||||
GROUP BY cid
|
GROUP BY file_id
|
||||||
) d ON b.cid = d.cid"
|
) d ON b.cid = d.file_id"
|
||||||
)?;
|
)?;
|
||||||
let rows = stmt.query_map(params![cutoff], |row| {
|
let rows = stmt.query_map(params![cutoff], |row| {
|
||||||
let cid_bytes: Vec<u8> = row.get(0)?;
|
let cid_bytes: Vec<u8> = row.get(0)?;
|
||||||
|
|
@ -3946,11 +3911,10 @@ impl Storage {
|
||||||
Ok(count as u64)
|
Ok(count as u64)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clean up all CDN metadata for a blob (manifests + upstream + downstream).
|
/// Clean up all CDN metadata for a blob (manifests + file_holders).
|
||||||
pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
|
pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
|
||||||
self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?;
|
self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?;
|
||||||
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
|
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?;
|
||||||
self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -3969,12 +3933,6 @@ impl Storage {
|
||||||
Ok(cids)
|
Ok(cids)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove upstream tracking for a blob CID.
|
|
||||||
pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
|
|
||||||
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn post_count(&self) -> anyhow::Result<usize> {
|
pub fn post_count(&self) -> anyhow::Result<usize> {
|
||||||
let count: i64 = self
|
let count: i64 = self
|
||||||
.conn
|
.conn
|
||||||
|
|
@ -4037,137 +3995,24 @@ impl Storage {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Record the upstream source for a blob CID.
|
/// Get CIDs of manifests older than a cutoff. Callers look up holders
|
||||||
pub fn store_blob_upstream(
|
/// via file_holders to pick a refresh source.
|
||||||
&self,
|
pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result<Vec<[u8; 32]>> {
|
||||||
cid: &[u8; 32],
|
|
||||||
source_node_id: &NodeId,
|
|
||||||
source_addresses: &[String],
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let addrs_json = serde_json::to_string(source_addresses)?;
|
|
||||||
self.conn.execute(
|
|
||||||
"INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4)
|
|
||||||
ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4",
|
|
||||||
params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()],
|
|
||||||
)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the upstream source for a blob CID: (node_id, addresses).
|
|
||||||
pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<Option<(NodeId, Vec<String>)>> {
|
|
||||||
let result = self.conn.query_row(
|
|
||||||
"SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1",
|
|
||||||
params![cid.as_slice()],
|
|
||||||
|row| {
|
|
||||||
let nid_bytes: Vec<u8> = row.get(0)?;
|
|
||||||
let addrs_json: String = row.get(1)?;
|
|
||||||
Ok((nid_bytes, addrs_json))
|
|
||||||
},
|
|
||||||
);
|
|
||||||
match result {
|
|
||||||
Ok((nid_bytes, addrs_json)) => {
|
|
||||||
let nid = blob_to_nodeid(nid_bytes)?;
|
|
||||||
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
|
|
||||||
Ok(Some((nid, addrs)))
|
|
||||||
}
|
|
||||||
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
|
|
||||||
Err(e) => Err(e.into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Register a downstream peer for a blob CID. Returns false if already at 100 downstream.
|
|
||||||
pub fn add_blob_downstream(
|
|
||||||
&self,
|
|
||||||
cid: &[u8; 32],
|
|
||||||
peer_node_id: &NodeId,
|
|
||||||
peer_addresses: &[String],
|
|
||||||
) -> anyhow::Result<bool> {
|
|
||||||
let count = self.get_blob_downstream_count(cid)?;
|
|
||||||
if count >= 100 {
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
let addrs_json = serde_json::to_string(peer_addresses)?;
|
|
||||||
self.conn.execute(
|
|
||||||
"INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4)
|
|
||||||
ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4",
|
|
||||||
params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()],
|
|
||||||
)?;
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>.
|
|
||||||
pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
|
|
||||||
let mut stmt = self.conn.prepare(
|
let mut stmt = self.conn.prepare(
|
||||||
"SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1"
|
"SELECT cid FROM cdn_manifests WHERE updated_at < ?1",
|
||||||
)?;
|
|
||||||
let rows = stmt.query_map(params![cid.as_slice()], |row| {
|
|
||||||
let nid_bytes: Vec<u8> = row.get(0)?;
|
|
||||||
let addrs_json: String = row.get(1)?;
|
|
||||||
Ok((nid_bytes, addrs_json))
|
|
||||||
})?;
|
|
||||||
let mut result = Vec::new();
|
|
||||||
for row in rows {
|
|
||||||
let (nid_bytes, addrs_json) = row?;
|
|
||||||
let nid = blob_to_nodeid(nid_bytes)?;
|
|
||||||
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
|
|
||||||
result.push((nid, addrs));
|
|
||||||
}
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Count downstream peers for a blob CID.
|
|
||||||
pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result<u32> {
|
|
||||||
let count: i64 = self.conn.query_row(
|
|
||||||
"SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1",
|
|
||||||
params![cid.as_slice()],
|
|
||||||
|row| row.get(0),
|
|
||||||
)?;
|
|
||||||
Ok(count as u32)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove a downstream peer for a blob CID.
|
|
||||||
pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> {
|
|
||||||
self.conn.execute(
|
|
||||||
"DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2",
|
|
||||||
params![cid.as_slice(), peer_node_id.as_slice()],
|
|
||||||
)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>.
|
|
||||||
pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result<Vec<([u8; 32], NodeId, Vec<String>)>> {
|
|
||||||
let mut stmt = self.conn.prepare(
|
|
||||||
"SELECT m.cid, u.source_node_id, u.source_addresses
|
|
||||||
FROM cdn_manifests m
|
|
||||||
LEFT JOIN blob_upstream u ON m.cid = u.cid
|
|
||||||
WHERE m.updated_at < ?1"
|
|
||||||
)?;
|
)?;
|
||||||
let rows = stmt.query_map(params![older_than_ms as i64], |row| {
|
let rows = stmt.query_map(params![older_than_ms as i64], |row| {
|
||||||
let cid_bytes: Vec<u8> = row.get(0)?;
|
let cid_bytes: Vec<u8> = row.get(0)?;
|
||||||
let nid_bytes: Option<Vec<u8>> = row.get(1)?;
|
Ok(cid_bytes)
|
||||||
let addrs_json: Option<String> = row.get(2)?;
|
|
||||||
Ok((cid_bytes, nid_bytes, addrs_json))
|
|
||||||
})?;
|
})?;
|
||||||
let mut result = Vec::new();
|
let mut out = Vec::new();
|
||||||
for row in rows {
|
for row in rows {
|
||||||
let (cid_bytes, nid_bytes, addrs_json) = row?;
|
let cid_bytes = row?;
|
||||||
let cid: [u8; 32] = match cid_bytes.try_into() {
|
if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) {
|
||||||
Ok(c) => c,
|
out.push(cid);
|
||||||
Err(_) => continue,
|
}
|
||||||
};
|
|
||||||
let nid = match nid_bytes {
|
|
||||||
Some(b) => match blob_to_nodeid(b) {
|
|
||||||
Ok(n) => n,
|
|
||||||
Err(_) => continue,
|
|
||||||
},
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
let addrs: Vec<String> = addrs_json
|
|
||||||
.map(|j| serde_json::from_str(&j).unwrap_or_default())
|
|
||||||
.unwrap_or_default();
|
|
||||||
result.push((cid, nid, addrs));
|
|
||||||
}
|
}
|
||||||
Ok(result)
|
Ok(out)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the 10 posts before and 10 posts after a reference timestamp for an author.
|
/// Get the 10 posts before and 10 posts after a reference timestamp for an author.
|
||||||
|
|
@ -4271,128 +4116,148 @@ impl Storage {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Engagement: post_downstream ---
|
// --- File holders (flat, per-file, LRU-capped at 5) ---
|
||||||
|
//
|
||||||
|
// A single table for PostId-keyed engagement propagation and CID-keyed
|
||||||
|
// manifest/blob propagation. Any 32-byte content-addressed file_id fits.
|
||||||
|
|
||||||
/// Register a peer as downstream for a post (max 100 per post).
|
/// Upsert a holder for a file. Bumps last_interaction_ms to now and
|
||||||
/// Returns true if added, false if at capacity.
|
/// enforces an LRU cap of 5 holders per file.
|
||||||
pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<bool> {
|
pub fn touch_file_holder(
|
||||||
let count: i64 = self.conn.prepare(
|
&self,
|
||||||
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
|
file_id: &[u8; 32],
|
||||||
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
|
peer_id: &NodeId,
|
||||||
if count >= 100 {
|
peer_addresses: &[String],
|
||||||
return Ok(false);
|
direction: HolderDirection,
|
||||||
}
|
) -> anyhow::Result<()> {
|
||||||
self.conn.execute(
|
let addrs_json = serde_json::to_string(peer_addresses)?;
|
||||||
"INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3)
|
|
||||||
ON CONFLICT DO NOTHING",
|
|
||||||
params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()],
|
|
||||||
)?;
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get all downstream peers for a post.
|
|
||||||
pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result<Vec<NodeId>> {
|
|
||||||
let mut stmt = self.conn.prepare(
|
|
||||||
"SELECT peer_node_id FROM post_downstream WHERE post_id = ?1"
|
|
||||||
)?;
|
|
||||||
let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec<u8>>(0))?;
|
|
||||||
let mut result = Vec::new();
|
|
||||||
for row in rows {
|
|
||||||
if let Ok(nid) = blob_to_nodeid(row?) {
|
|
||||||
result.push(nid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove a downstream peer for a post.
|
|
||||||
pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
|
|
||||||
self.conn.execute(
|
|
||||||
"DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2",
|
|
||||||
params![post_id.as_slice(), peer_node_id.as_slice()],
|
|
||||||
)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Engagement: post_upstream (multi-upstream, 3 max) ---
|
|
||||||
|
|
||||||
/// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post.
|
|
||||||
pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> {
|
|
||||||
// Check current count
|
|
||||||
let count: i64 = self.conn.prepare(
|
|
||||||
"SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1"
|
|
||||||
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
|
|
||||||
if count >= 3 {
|
|
||||||
return Ok(()); // Already at cap
|
|
||||||
}
|
|
||||||
let now = now_ms();
|
let now = now_ms();
|
||||||
|
let new_dir = direction.as_str();
|
||||||
|
// Upsert. If the row exists with a different direction, promote to "both".
|
||||||
self.conn.execute(
|
self.conn.execute(
|
||||||
"INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at)
|
"INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||||
VALUES (?1, ?2, ?3, ?4)",
|
VALUES (?1, ?2, ?3, ?4, ?5)
|
||||||
params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now],
|
ON CONFLICT(file_id, peer_id) DO UPDATE SET
|
||||||
|
peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END,
|
||||||
|
last_interaction_ms = ?4,
|
||||||
|
direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END",
|
||||||
|
params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir],
|
||||||
|
)?;
|
||||||
|
// Enforce LRU cap of 5. Oldest get dropped.
|
||||||
|
self.conn.execute(
|
||||||
|
"DELETE FROM file_holders
|
||||||
|
WHERE file_id = ?1
|
||||||
|
AND peer_id NOT IN (
|
||||||
|
SELECT peer_id FROM file_holders
|
||||||
|
WHERE file_id = ?1
|
||||||
|
ORDER BY last_interaction_ms DESC
|
||||||
|
LIMIT 5
|
||||||
|
)",
|
||||||
|
params![file_id.as_slice()],
|
||||||
)?;
|
)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get all upstream peers for a post, ordered by priority ASC (0 = primary).
|
/// Count file holders (bounded at 5 by touch_file_holder's LRU cap).
|
||||||
pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result<Vec<(NodeId, u8)>> {
|
pub fn get_file_holder_count(&self, file_id: &[u8; 32]) -> anyhow::Result<u32> {
|
||||||
let mut stmt = self.conn.prepare(
|
|
||||||
"SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC"
|
|
||||||
)?;
|
|
||||||
let rows = stmt.query_map(params![post_id.as_slice()], |row| {
|
|
||||||
let bytes: Vec<u8> = row.get(0)?;
|
|
||||||
let prio: i64 = row.get(1)?;
|
|
||||||
Ok((bytes, prio as u8))
|
|
||||||
})?;
|
|
||||||
let mut result = Vec::new();
|
|
||||||
for row in rows {
|
|
||||||
let (bytes, prio) = row?;
|
|
||||||
if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) {
|
|
||||||
result.push((nid, prio));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the primary (lowest priority) upstream peer for a post.
|
|
||||||
/// Backward-compatible wrapper for code that only needs a single upstream.
|
|
||||||
pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result<Option<NodeId>> {
|
|
||||||
let upstreams = self.get_post_upstreams(post_id)?;
|
|
||||||
Ok(upstreams.into_iter().next().map(|(nid, _)| nid))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove a specific upstream peer for a post.
|
|
||||||
pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
|
|
||||||
self.conn.execute(
|
|
||||||
"DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2",
|
|
||||||
params![post_id.as_slice(), peer_node_id.as_slice()],
|
|
||||||
)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Promote an upstream peer to primary (priority 0), pushing others up.
|
|
||||||
pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
|
|
||||||
// Shift all priorities up by 1
|
|
||||||
self.conn.execute(
|
|
||||||
"UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1",
|
|
||||||
params![post_id.as_slice()],
|
|
||||||
)?;
|
|
||||||
// Set the promoted peer to priority 0
|
|
||||||
self.conn.execute(
|
|
||||||
"UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2",
|
|
||||||
params![post_id.as_slice(), peer_node_id.as_slice()],
|
|
||||||
)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Count downstream peers for a post.
|
|
||||||
pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result<u32> {
|
|
||||||
let count: i64 = self.conn.prepare(
|
let count: i64 = self.conn.prepare(
|
||||||
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
|
"SELECT COUNT(*) FROM file_holders WHERE file_id = ?1",
|
||||||
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
|
)?.query_row(params![file_id.as_slice()], |row| row.get(0))?;
|
||||||
Ok(count as u32)
|
Ok(count as u32)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the up-to-5 most recently interacted holders of a file.
|
||||||
|
pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
|
||||||
|
let mut stmt = self.conn.prepare(
|
||||||
|
"SELECT peer_id, peer_addresses FROM file_holders
|
||||||
|
WHERE file_id = ?1
|
||||||
|
ORDER BY last_interaction_ms DESC
|
||||||
|
LIMIT 5",
|
||||||
|
)?;
|
||||||
|
let rows = stmt.query_map(params![file_id.as_slice()], |row| {
|
||||||
|
let peer_bytes: Vec<u8> = row.get(0)?;
|
||||||
|
let addrs_json: String = row.get(1)?;
|
||||||
|
Ok((peer_bytes, addrs_json))
|
||||||
|
})?;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
for row in rows {
|
||||||
|
let (peer_bytes, addrs_json) = row?;
|
||||||
|
if peer_bytes.len() != 32 { continue; }
|
||||||
|
let mut peer = [0u8; 32];
|
||||||
|
peer.copy_from_slice(&peer_bytes);
|
||||||
|
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
|
||||||
|
out.push((NodeId::from(peer), addrs));
|
||||||
|
}
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove all holders for a file (e.g. on post/blob deletion).
|
||||||
|
pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> {
|
||||||
|
self.conn.execute(
|
||||||
|
"DELETE FROM file_holders WHERE file_id = ?1",
|
||||||
|
params![file_id.as_slice()],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a single peer's holder entry for a file.
|
||||||
|
pub fn remove_file_holder(&self, file_id: &[u8; 32], peer_id: &NodeId) -> anyhow::Result<()> {
|
||||||
|
self.conn.execute(
|
||||||
|
"DELETE FROM file_holders WHERE file_id = ?1 AND peer_id = ?2",
|
||||||
|
params![file_id.as_slice(), peer_id.as_slice()],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One-time migration: seed file_holders from the legacy upstream/downstream
|
||||||
|
/// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder
|
||||||
|
/// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the
|
||||||
|
/// PRIMARY KEY. Skips tables that don't exist on fresh installs.
|
||||||
|
pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> {
|
||||||
|
// Skip if file_holders already populated (idempotent re-run protection).
|
||||||
|
let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")?
|
||||||
|
.query_row([], |row| row.get(0))?;
|
||||||
|
if existing > 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let now = now_ms() as i64;
|
||||||
|
let table_exists = |name: &str| -> anyhow::Result<bool> {
|
||||||
|
let count: i64 = self.conn.prepare(
|
||||||
|
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
|
||||||
|
)?.query_row(params![name], |row| row.get(0))?;
|
||||||
|
Ok(count > 0)
|
||||||
|
};
|
||||||
|
if table_exists("post_upstream")? {
|
||||||
|
self.conn.execute(
|
||||||
|
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||||
|
SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream",
|
||||||
|
params![now],
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
if table_exists("post_downstream")? {
|
||||||
|
self.conn.execute(
|
||||||
|
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||||
|
SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream",
|
||||||
|
params![now],
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
if table_exists("blob_upstream")? {
|
||||||
|
self.conn.execute(
|
||||||
|
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||||
|
SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream",
|
||||||
|
params![now],
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
if table_exists("blob_downstream")? {
|
||||||
|
self.conn.execute(
|
||||||
|
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
|
||||||
|
SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream",
|
||||||
|
params![now],
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// --- Engagement: reactions ---
|
// --- Engagement: reactions ---
|
||||||
|
|
||||||
/// Store a reaction (upsert by reactor+post_id+emoji).
|
/// Store a reaction (upsert by reactor+post_id+emoji).
|
||||||
|
|
@ -5301,60 +5166,6 @@ mod tests {
|
||||||
assert_eq!(manifests[0].0, cid);
|
assert_eq!(manifests[0].0, cid);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn blob_upstream_crud() {
|
|
||||||
let s = temp_storage();
|
|
||||||
let cid = [42u8; 32];
|
|
||||||
let source = make_node_id(1);
|
|
||||||
let addrs = vec!["10.0.0.1:4433".to_string()];
|
|
||||||
|
|
||||||
s.store_blob_upstream(&cid, &source, &addrs).unwrap();
|
|
||||||
let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap();
|
|
||||||
assert_eq!(nid, source);
|
|
||||||
assert_eq!(got_addrs, addrs);
|
|
||||||
|
|
||||||
// Missing
|
|
||||||
assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none());
|
|
||||||
|
|
||||||
// Update
|
|
||||||
let source2 = make_node_id(2);
|
|
||||||
s.store_blob_upstream(&cid, &source2, &[]).unwrap();
|
|
||||||
let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap();
|
|
||||||
assert_eq!(nid, source2);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn blob_downstream_crud_and_limit() {
|
|
||||||
let s = temp_storage();
|
|
||||||
let cid = [42u8; 32];
|
|
||||||
|
|
||||||
// Add downstream peers
|
|
||||||
for i in 0..100u8 {
|
|
||||||
let peer = make_node_id(i);
|
|
||||||
let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap();
|
|
||||||
assert!(ok, "should accept peer {}", i);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100);
|
|
||||||
|
|
||||||
// 101st should be rejected
|
|
||||||
let peer_101 = make_node_id(200);
|
|
||||||
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
|
|
||||||
assert!(!ok, "should reject 101st downstream");
|
|
||||||
|
|
||||||
// Get all downstream
|
|
||||||
let downstream = s.get_blob_downstream(&cid).unwrap();
|
|
||||||
assert_eq!(downstream.len(), 100);
|
|
||||||
|
|
||||||
// Remove one
|
|
||||||
s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap();
|
|
||||||
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99);
|
|
||||||
|
|
||||||
// Now adding one more should work
|
|
||||||
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
|
|
||||||
assert!(ok, "should accept after removal");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn blob_pin_unpin() {
|
fn blob_pin_unpin() {
|
||||||
let s = temp_storage();
|
let s = temp_storage();
|
||||||
|
|
@ -5438,18 +5249,15 @@ mod tests {
|
||||||
let peer = make_node_id(2);
|
let peer = make_node_id(2);
|
||||||
|
|
||||||
s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap();
|
s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap();
|
||||||
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
|
s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap();
|
||||||
s.add_blob_downstream(&cid, &peer, &["10.0.0.2:4433".to_string()]).unwrap();
|
|
||||||
|
|
||||||
assert!(s.get_cdn_manifest(&cid).unwrap().is_some());
|
assert!(s.get_cdn_manifest(&cid).unwrap().is_some());
|
||||||
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
|
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1);
|
||||||
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1);
|
|
||||||
|
|
||||||
s.cleanup_cdn_for_blob(&cid).unwrap();
|
s.cleanup_cdn_for_blob(&cid).unwrap();
|
||||||
|
|
||||||
assert!(s.get_cdn_manifest(&cid).unwrap().is_none());
|
assert!(s.get_cdn_manifest(&cid).unwrap().is_none());
|
||||||
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
|
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0);
|
||||||
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
@ -5469,18 +5277,6 @@ mod tests {
|
||||||
assert!(cids.contains(&cid2));
|
assert!(cids.contains(&cid2));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn remove_blob_upstream() {
|
|
||||||
let s = temp_storage();
|
|
||||||
let cid = [42u8; 32];
|
|
||||||
let peer = make_node_id(1);
|
|
||||||
|
|
||||||
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
|
|
||||||
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
|
|
||||||
|
|
||||||
s.remove_blob_upstream(&cid).unwrap();
|
|
||||||
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn author_post_neighborhood() {
|
fn author_post_neighborhood() {
|
||||||
|
|
@ -5840,24 +5636,6 @@ mod tests {
|
||||||
assert_eq!(got2.preferred_tree.len(), 2);
|
assert_eq!(got2.preferred_tree.len(), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn blob_upstream_preferred_tree() {
|
|
||||||
let s = temp_storage();
|
|
||||||
let cid = [42u8; 32];
|
|
||||||
let source = make_node_id(1);
|
|
||||||
s.store_blob_upstream(&cid, &source, &[]).unwrap();
|
|
||||||
|
|
||||||
// Initially empty
|
|
||||||
let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap();
|
|
||||||
assert!(tree.is_empty());
|
|
||||||
|
|
||||||
// Update
|
|
||||||
let nodes = vec![make_node_id(10), make_node_id(11)];
|
|
||||||
s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap();
|
|
||||||
let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap();
|
|
||||||
assert_eq!(tree2.len(), 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- Circle Profile tests ----
|
// ---- Circle Profile tests ----
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
@ -6058,32 +5836,39 @@ mod tests {
|
||||||
// --- Engagement tests ---
|
// --- Engagement tests ---
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn post_downstream_crud() {
|
fn file_holders_lru_cap() {
|
||||||
let s = temp_storage();
|
let s = temp_storage();
|
||||||
let post_id = make_post_id(1);
|
let file = [42u8; 32];
|
||||||
let peer1 = make_node_id(1);
|
// Sleep between inserts so last_interaction_ms actually differs (ms resolution).
|
||||||
let peer2 = make_node_id(2);
|
for i in 0..7u8 {
|
||||||
|
s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap();
|
||||||
// Add downstream peers
|
std::thread::sleep(std::time::Duration::from_millis(2));
|
||||||
assert!(s.add_post_downstream(&post_id, &peer1).unwrap());
|
|
||||||
assert!(s.add_post_downstream(&post_id, &peer2).unwrap());
|
|
||||||
|
|
||||||
let downstream = s.get_post_downstream(&post_id).unwrap();
|
|
||||||
assert_eq!(downstream.len(), 2);
|
|
||||||
assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2);
|
|
||||||
|
|
||||||
// Remove one
|
|
||||||
s.remove_post_downstream(&post_id, &peer1).unwrap();
|
|
||||||
assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1);
|
|
||||||
|
|
||||||
// Capacity limit
|
|
||||||
let big_post = make_post_id(99);
|
|
||||||
for i in 0..100u8 {
|
|
||||||
assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap());
|
|
||||||
}
|
}
|
||||||
assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100);
|
// Only 5 most-recent survive
|
||||||
// 101st should fail
|
assert_eq!(s.get_file_holder_count(&file).unwrap(), 5);
|
||||||
assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap());
|
let holders = s.get_file_holders(&file).unwrap();
|
||||||
|
assert_eq!(holders.len(), 5);
|
||||||
|
let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect();
|
||||||
|
// Oldest two (i=0, i=1) got evicted; most recent (i=6) survives
|
||||||
|
assert!(!kept.contains(&make_node_id(0)));
|
||||||
|
assert!(!kept.contains(&make_node_id(1)));
|
||||||
|
assert!(kept.contains(&make_node_id(6)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn file_holders_direction_promotion() {
|
||||||
|
let s = temp_storage();
|
||||||
|
let file = [42u8; 32];
|
||||||
|
let peer = make_node_id(1);
|
||||||
|
s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap();
|
||||||
|
s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap();
|
||||||
|
// Re-insert with opposite direction should promote to "both"
|
||||||
|
let dir: String = s.conn.query_row(
|
||||||
|
"SELECT direction FROM file_holders WHERE file_id = ?1 AND peer_id = ?2",
|
||||||
|
rusqlite::params![file.as_slice(), peer.as_slice()],
|
||||||
|
|row| row.get(0),
|
||||||
|
).unwrap();
|
||||||
|
assert_eq!(dir, "both");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
||||||
|
|
@ -132,8 +132,8 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
|
||||||
if let Some(author) = author_id {
|
if let Some(author) = author_id {
|
||||||
holders.push(author);
|
holders.push(author);
|
||||||
}
|
}
|
||||||
if let Ok(downstream) = store.get_post_downstream(&post_id) {
|
if let Ok(file_holders) = store.get_file_holders(&post_id) {
|
||||||
for peer in downstream {
|
for (peer, _addrs) in file_holders {
|
||||||
if !holders.contains(&peer) {
|
if !holders.contains(&peer) {
|
||||||
holders.push(peer);
|
holders.push(peer);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue