Phase 2e (0.6.1-beta): drop legacy upstream/downstream tables

The file_holders table is now the only tracker of per-file peer
relationships. post_upstream, post_downstream, blob_upstream, and
blob_downstream are dropped at first launch after the seed migration
copies any existing entries.

Schema:
- DROP TABLE IF EXISTS on all four legacy tables after seeding
- Seed migration guards with sqlite_master table_exists check so fresh
  installs don't crash trying to read non-existent sources
- Remove CREATE TABLE statements for the four tables from init
- Remove Protocol v4 Phase 6 post_upstream priority migration (dead)
- Remove blob_upstream preferred_tree column migration (dead)

Rust:
- Remove add/get/remove post_upstream, post_downstream,
  blob_upstream, blob_downstream methods
- Remove get_blob_upstream_preferred_tree / update variant
- Rewrite the downstream_count subquery in get_eviction_candidates to
  count file_holders entries
- Rewrite apply_delete's cascade cleanup to clear file_holders
  instead of post_upstream/post_downstream
- cleanup_cdn_for_blob now clears file_holders for the CID

Callers:
- All dual-write sites in connection.rs and node.rs now do
  touch_file_holder only (legacy writes removed)
- get_stale_manifests replaced with get_stale_manifest_cids; caller
  in node.rs picks a refresh source from file_holders

Tests:
- Remove blob_upstream_crud, blob_downstream_crud_and_limit,
  blob_upstream_preferred_tree, remove_blob_upstream,
  post_downstream_crud
- Add file_holders_lru_cap and file_holders_direction_promotion tests

All 110 core tests passing. Workspace compiles cleanly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-04-21 21:42:15 -04:00
parent 60463d1817
commit 5d9ba22427
3 changed files with 112 additions and 504 deletions

View file

@ -1393,8 +1393,6 @@ impl ConnectionManager {
{
let s = storage.get().await;
for pid in &new_post_ids {
let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = s.add_post_upstream(pid, peer_id, prio);
let _ = s.touch_file_holder(
pid,
peer_id,
@ -1946,8 +1944,6 @@ impl ConnectionManager {
{
let storage = self.storage.get().await;
for pid in &new_post_ids {
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(pid, from, prio);
let _ = storage.touch_file_holder(
pid,
from,
@ -2046,8 +2042,6 @@ impl ConnectionManager {
{
let storage = self.storage.get().await;
for pid in &new_post_ids {
let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(pid, peer_id, prio);
let _ = storage.touch_file_holder(
pid,
peer_id,
@ -4984,8 +4978,6 @@ impl ConnectionManager {
&push.post.post,
&push.post.visibility,
);
let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio);
let _ = storage.touch_file_holder(
&push.post.id,
&remote_node_id,
@ -5205,8 +5197,6 @@ impl ConnectionManager {
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
let _ = storage.touch_file_holder(
&sync_post.id,
&sender_id,
@ -5454,7 +5444,6 @@ impl ConnectionManager {
let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?;
let cm = conn_mgr.lock().await;
let storage = cm.storage.get().await;
let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id);
let _ = storage.touch_file_holder(
&payload.post_id,
&remote_node_id,
@ -6108,9 +6097,8 @@ impl ConnectionManager {
to_pull.push(*pid);
}
// Register as downstream for all accepted posts
// Register as holder for all accepted posts
for pid in &acc {
let _ = storage.add_post_downstream(pid, &remote_node_id);
let _ = storage.touch_file_holder(
pid,
&remote_node_id,
@ -6167,8 +6155,6 @@ impl ConnectionManager {
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sp.id, &sender, prio);
let _ = storage.touch_file_holder(
&sp.id,
&sender,
@ -6201,7 +6187,6 @@ impl ConnectionManager {
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes);
let _ = storage.add_post_upstream(&att.cid, &sender, 0);
let _ = storage.touch_file_holder(
&att.cid,
&sender,

View file

@ -1350,7 +1350,6 @@ impl Node {
let source_addrs: Vec<String> = response.manifest.as_ref()
.map(|m| m.host_addresses.clone())
.unwrap_or_default();
let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs);
let _ = storage.touch_file_holder(
cid,
from_peer,
@ -1419,7 +1418,6 @@ impl Node {
let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at);
}
}
let _ = storage.store_blob_upstream(cid, &lateral, &[]);
let _ = storage.touch_file_holder(
cid,
&lateral,
@ -3095,20 +3093,27 @@ impl Node {
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64 - max_age_ms;
let stale = {
let stale_cids = {
let s = storage.get().await;
s.get_stale_manifests(cutoff).unwrap_or_default()
s.get_stale_manifest_cids(cutoff).unwrap_or_default()
};
for (cid, upstream_nid, _upstream_addrs) in &stale {
// Get current updated_at for this manifest
let current_updated_at = {
for cid in &stale_cids {
// Get current updated_at + pick a holder to refresh from
let (current_updated_at, refresh_source) = {
let s = storage.get().await;
s.get_cdn_manifest(cid).ok().flatten()
let updated_at = s.get_cdn_manifest(cid).ok().flatten()
.and_then(|json| serde_json::from_str::<crate::types::AuthorManifest>(&json).ok())
.map(|m| m.updated_at)
.unwrap_or(0)
.unwrap_or(0);
let source = s.get_file_holders(cid)
.unwrap_or_default()
.into_iter()
.next()
.map(|(nid, _)| nid);
(updated_at, source)
};
match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await {
let Some(upstream_nid) = refresh_source else { continue; };
match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await {
Ok(Some(cdn_manifest)) => {
if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
@ -3135,7 +3140,7 @@ impl Node {
Err(e) => {
tracing::debug!(
cid = hex::encode(cid),
upstream = hex::encode(upstream_nid),
upstream = hex::encode(&upstream_nid),
error = %e,
"Manifest refresh from upstream failed"
);

View file

@ -282,20 +282,6 @@ impl Storage {
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author);
CREATE TABLE IF NOT EXISTS blob_upstream (
cid BLOB PRIMARY KEY,
source_node_id BLOB NOT NULL,
source_addresses TEXT NOT NULL DEFAULT '[]',
stored_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS blob_downstream (
cid BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
peer_addresses TEXT NOT NULL DEFAULT '[]',
registered_at INTEGER NOT NULL,
PRIMARY KEY (cid, peer_node_id)
);
CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid);
CREATE TABLE IF NOT EXISTS group_keys (
group_id BLOB PRIMARY KEY,
circle_name TEXT NOT NULL,
@ -346,17 +332,6 @@ impl Storage {
last_seen_ms INTEGER NOT NULL,
success_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS post_downstream (
post_id BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
registered_at INTEGER NOT NULL,
PRIMARY KEY (post_id, peer_node_id)
);
CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id);
CREATE TABLE IF NOT EXISTS post_upstream (
post_id BLOB PRIMARY KEY,
peer_node_id BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS blob_headers (
post_id BLOB PRIMARY KEY,
author BLOB NOT NULL,
@ -573,16 +548,6 @@ impl Storage {
)?;
}
// Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration)
let has_blob_pref_tree = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_blob_pref_tree == 0 {
self.conn.execute_batch(
"ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';"
)?;
}
// Add public_visible column to profiles if missing (Phase D-4 migration)
let has_public_visible = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'"
@ -696,31 +661,19 @@ impl Storage {
)?;
}
// Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max)
let has_priority = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_priority == 0 {
self.conn.execute_batch(
"ALTER TABLE post_upstream RENAME TO post_upstream_old;
CREATE TABLE post_upstream (
post_id BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
registered_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (post_id, peer_node_id)
);
INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at)
SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old;
DROP TABLE post_upstream_old;"
)?;
}
// 0.6.1-beta: seed file_holders from legacy upstream/downstream tables
// before they're dropped. Idempotent — only fires on an empty
// file_holders table.
self.seed_file_holders_from_legacy()?;
// 0.6.1-beta: drop legacy directional tables — replaced by file_holders.
self.conn.execute_batch(
"DROP TABLE IF EXISTS blob_upstream;
DROP TABLE IF EXISTS blob_downstream;
DROP TABLE IF EXISTS post_upstream;
DROP TABLE IF EXISTS post_downstream;",
)?;
Ok(())
}
@ -2431,8 +2384,7 @@ impl Storage {
params![record.post_id.as_slice(), record.author.as_slice()],
)?;
if deleted > 0 {
self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?;
}
Ok(deleted > 0)
@ -3431,28 +3383,6 @@ impl Storage {
Ok(())
}
/// Update the preferred_tree JSON for a blob upstream entry.
pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> {
let json = serde_json::to_string(
&tree.iter().map(hex::encode).collect::<Vec<_>>()
)?;
self.conn.execute(
"UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2",
params![json, cid.as_slice()],
)?;
Ok(())
}
/// Get the preferred_tree for a blob upstream entry.
pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<NodeId>> {
let json: String = self.conn.query_row(
"SELECT preferred_tree FROM blob_upstream WHERE cid = ?1",
params![cid.as_slice()],
|row| row.get(0),
).unwrap_or_else(|_| "[]".to_string());
Ok(parse_anchors_json(&json))
}
// ---- Social Routes ----
/// Insert or update a social route entry.
@ -3924,10 +3854,10 @@ impl Storage {
GROUP BY post_id
) r ON b.post_id = r.post_id
LEFT JOIN (
SELECT cid, COUNT(*) as ds_count
FROM blob_downstream
GROUP BY cid
) d ON b.cid = d.cid"
SELECT file_id, COUNT(*) as ds_count
FROM file_holders
GROUP BY file_id
) d ON b.cid = d.file_id"
)?;
let rows = stmt.query_map(params![cutoff], |row| {
let cid_bytes: Vec<u8> = row.get(0)?;
@ -3981,11 +3911,10 @@ impl Storage {
Ok(count as u64)
}
/// Clean up all CDN metadata for a blob (manifests + upstream + downstream).
/// Clean up all CDN metadata for a blob (manifests + file_holders).
pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?;
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?;
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?;
Ok(())
}
@ -4004,12 +3933,6 @@ impl Storage {
Ok(cids)
}
/// Remove upstream tracking for a blob CID.
pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?;
Ok(())
}
pub fn post_count(&self) -> anyhow::Result<usize> {
let count: i64 = self
.conn
@ -4072,137 +3995,24 @@ impl Storage {
Ok(result)
}
/// Record the upstream source for a blob CID.
pub fn store_blob_upstream(
&self,
cid: &[u8; 32],
source_node_id: &NodeId,
source_addresses: &[String],
) -> anyhow::Result<()> {
let addrs_json = serde_json::to_string(source_addresses)?;
self.conn.execute(
"INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4",
params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()],
)?;
Ok(())
}
/// Get the upstream source for a blob CID: (node_id, addresses).
pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<Option<(NodeId, Vec<String>)>> {
let result = self.conn.query_row(
"SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1",
params![cid.as_slice()],
|row| {
let nid_bytes: Vec<u8> = row.get(0)?;
let addrs_json: String = row.get(1)?;
Ok((nid_bytes, addrs_json))
},
);
match result {
Ok((nid_bytes, addrs_json)) => {
let nid = blob_to_nodeid(nid_bytes)?;
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
Ok(Some((nid, addrs)))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Register a downstream peer for a blob CID. Returns false if already at 100 downstream.
pub fn add_blob_downstream(
&self,
cid: &[u8; 32],
peer_node_id: &NodeId,
peer_addresses: &[String],
) -> anyhow::Result<bool> {
let count = self.get_blob_downstream_count(cid)?;
if count >= 100 {
return Ok(false);
}
let addrs_json = serde_json::to_string(peer_addresses)?;
self.conn.execute(
"INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4",
params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()],
)?;
Ok(true)
}
/// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>.
pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
/// Get CIDs of manifests older than a cutoff. Callers look up holders
/// via file_holders to pick a refresh source.
pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result<Vec<[u8; 32]>> {
let mut stmt = self.conn.prepare(
"SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1"
)?;
let rows = stmt.query_map(params![cid.as_slice()], |row| {
let nid_bytes: Vec<u8> = row.get(0)?;
let addrs_json: String = row.get(1)?;
Ok((nid_bytes, addrs_json))
})?;
let mut result = Vec::new();
for row in rows {
let (nid_bytes, addrs_json) = row?;
let nid = blob_to_nodeid(nid_bytes)?;
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
result.push((nid, addrs));
}
Ok(result)
}
/// Count downstream peers for a blob CID.
pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result<u32> {
let count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1",
params![cid.as_slice()],
|row| row.get(0),
)?;
Ok(count as u32)
}
/// Remove a downstream peer for a blob CID.
pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2",
params![cid.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>.
pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result<Vec<([u8; 32], NodeId, Vec<String>)>> {
let mut stmt = self.conn.prepare(
"SELECT m.cid, u.source_node_id, u.source_addresses
FROM cdn_manifests m
LEFT JOIN blob_upstream u ON m.cid = u.cid
WHERE m.updated_at < ?1"
"SELECT cid FROM cdn_manifests WHERE updated_at < ?1",
)?;
let rows = stmt.query_map(params![older_than_ms as i64], |row| {
let cid_bytes: Vec<u8> = row.get(0)?;
let nid_bytes: Option<Vec<u8>> = row.get(1)?;
let addrs_json: Option<String> = row.get(2)?;
Ok((cid_bytes, nid_bytes, addrs_json))
Ok(cid_bytes)
})?;
let mut result = Vec::new();
let mut out = Vec::new();
for row in rows {
let (cid_bytes, nid_bytes, addrs_json) = row?;
let cid: [u8; 32] = match cid_bytes.try_into() {
Ok(c) => c,
Err(_) => continue,
};
let nid = match nid_bytes {
Some(b) => match blob_to_nodeid(b) {
Ok(n) => n,
Err(_) => continue,
},
None => continue,
};
let addrs: Vec<String> = addrs_json
.map(|j| serde_json::from_str(&j).unwrap_or_default())
.unwrap_or_default();
result.push((cid, nid, addrs));
let cid_bytes = row?;
if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) {
out.push(cid);
}
}
Ok(result)
Ok(out)
}
/// Get the 10 posts before and 10 posts after a reference timestamp for an author.
@ -4306,128 +4116,6 @@ impl Storage {
Ok(result)
}
// --- Engagement: post_downstream ---
/// Register a peer as downstream for a post (max 100 per post).
/// Returns true if added, false if at capacity.
pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<bool> {
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
if count >= 100 {
return Ok(false);
}
self.conn.execute(
"INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING",
params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()],
)?;
Ok(true)
}
/// Get all downstream peers for a post.
pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result<Vec<NodeId>> {
let mut stmt = self.conn.prepare(
"SELECT peer_node_id FROM post_downstream WHERE post_id = ?1"
)?;
let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec<u8>>(0))?;
let mut result = Vec::new();
for row in rows {
if let Ok(nid) = blob_to_nodeid(row?) {
result.push(nid);
}
}
Ok(result)
}
/// Remove a downstream peer for a post.
pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
// --- Engagement: post_upstream (multi-upstream, 3 max) ---
/// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post.
pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> {
// Check current count
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
if count >= 3 {
return Ok(()); // Already at cap
}
let now = now_ms();
self.conn.execute(
"INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at)
VALUES (?1, ?2, ?3, ?4)",
params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now],
)?;
Ok(())
}
/// Get all upstream peers for a post, ordered by priority ASC (0 = primary).
pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result<Vec<(NodeId, u8)>> {
let mut stmt = self.conn.prepare(
"SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC"
)?;
let rows = stmt.query_map(params![post_id.as_slice()], |row| {
let bytes: Vec<u8> = row.get(0)?;
let prio: i64 = row.get(1)?;
Ok((bytes, prio as u8))
})?;
let mut result = Vec::new();
for row in rows {
let (bytes, prio) = row?;
if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) {
result.push((nid, prio));
}
}
Ok(result)
}
/// Get the primary (lowest priority) upstream peer for a post.
/// Backward-compatible wrapper for code that only needs a single upstream.
pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result<Option<NodeId>> {
let upstreams = self.get_post_upstreams(post_id)?;
Ok(upstreams.into_iter().next().map(|(nid, _)| nid))
}
/// Remove a specific upstream peer for a post.
pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Promote an upstream peer to primary (priority 0), pushing others up.
pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
// Shift all priorities up by 1
self.conn.execute(
"UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1",
params![post_id.as_slice()],
)?;
// Set the promoted peer to priority 0
self.conn.execute(
"UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Count downstream peers for a post.
pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result<u32> {
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
Ok(count as u32)
}
// --- File holders (flat, per-file, LRU-capped at 5) ---
//
// A single table for PostId-keyed engagement propagation and CID-keyed
@ -4524,7 +4212,7 @@ impl Storage {
/// One-time migration: seed file_holders from the legacy upstream/downstream
/// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder
/// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the
/// PRIMARY KEY.
/// PRIMARY KEY. Skips tables that don't exist on fresh installs.
pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> {
// Skip if file_holders already populated (idempotent re-run protection).
let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")?
@ -4533,30 +4221,40 @@ impl Storage {
return Ok(());
}
let now = now_ms() as i64;
// post_upstream → holders we received engagement diffs from
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream",
params![now],
)?;
// post_downstream → holders we sent engagement diffs to
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream",
params![now],
)?;
// blob_upstream → peer we fetched the blob/manifest from
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream",
params![now],
)?;
// blob_downstream → peers we served the blob/manifest to
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream",
params![now],
)?;
let table_exists = |name: &str| -> anyhow::Result<bool> {
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
)?.query_row(params![name], |row| row.get(0))?;
Ok(count > 0)
};
if table_exists("post_upstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream",
params![now],
)?;
}
if table_exists("post_downstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream",
params![now],
)?;
}
if table_exists("blob_upstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream",
params![now],
)?;
}
if table_exists("blob_downstream")? {
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream",
params![now],
)?;
}
Ok(())
}
@ -5468,60 +5166,6 @@ mod tests {
assert_eq!(manifests[0].0, cid);
}
#[test]
fn blob_upstream_crud() {
let s = temp_storage();
let cid = [42u8; 32];
let source = make_node_id(1);
let addrs = vec!["10.0.0.1:4433".to_string()];
s.store_blob_upstream(&cid, &source, &addrs).unwrap();
let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap();
assert_eq!(nid, source);
assert_eq!(got_addrs, addrs);
// Missing
assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none());
// Update
let source2 = make_node_id(2);
s.store_blob_upstream(&cid, &source2, &[]).unwrap();
let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap();
assert_eq!(nid, source2);
}
#[test]
fn blob_downstream_crud_and_limit() {
let s = temp_storage();
let cid = [42u8; 32];
// Add downstream peers
for i in 0..100u8 {
let peer = make_node_id(i);
let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap();
assert!(ok, "should accept peer {}", i);
}
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100);
// 101st should be rejected
let peer_101 = make_node_id(200);
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
assert!(!ok, "should reject 101st downstream");
// Get all downstream
let downstream = s.get_blob_downstream(&cid).unwrap();
assert_eq!(downstream.len(), 100);
// Remove one
s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap();
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99);
// Now adding one more should work
let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap();
assert!(ok, "should accept after removal");
}
#[test]
fn blob_pin_unpin() {
let s = temp_storage();
@ -5605,18 +5249,15 @@ mod tests {
let peer = make_node_id(2);
s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap();
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
s.add_blob_downstream(&cid, &peer, &["10.0.0.2:4433".to_string()]).unwrap();
s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap();
assert!(s.get_cdn_manifest(&cid).unwrap().is_some());
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1);
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1);
s.cleanup_cdn_for_blob(&cid).unwrap();
assert!(s.get_cdn_manifest(&cid).unwrap().is_none());
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0);
assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0);
}
#[test]
@ -5636,18 +5277,6 @@ mod tests {
assert!(cids.contains(&cid2));
}
#[test]
fn remove_blob_upstream() {
let s = temp_storage();
let cid = [42u8; 32];
let peer = make_node_id(1);
s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap();
assert!(s.get_blob_upstream(&cid).unwrap().is_some());
s.remove_blob_upstream(&cid).unwrap();
assert!(s.get_blob_upstream(&cid).unwrap().is_none());
}
#[test]
fn author_post_neighborhood() {
@ -6007,24 +5636,6 @@ mod tests {
assert_eq!(got2.preferred_tree.len(), 2);
}
#[test]
fn blob_upstream_preferred_tree() {
let s = temp_storage();
let cid = [42u8; 32];
let source = make_node_id(1);
s.store_blob_upstream(&cid, &source, &[]).unwrap();
// Initially empty
let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap();
assert!(tree.is_empty());
// Update
let nodes = vec![make_node_id(10), make_node_id(11)];
s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap();
let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap();
assert_eq!(tree2.len(), 2);
}
// ---- Circle Profile tests ----
#[test]
@ -6225,32 +5836,39 @@ mod tests {
// --- Engagement tests ---
#[test]
fn post_downstream_crud() {
fn file_holders_lru_cap() {
let s = temp_storage();
let post_id = make_post_id(1);
let peer1 = make_node_id(1);
let peer2 = make_node_id(2);
// Add downstream peers
assert!(s.add_post_downstream(&post_id, &peer1).unwrap());
assert!(s.add_post_downstream(&post_id, &peer2).unwrap());
let downstream = s.get_post_downstream(&post_id).unwrap();
assert_eq!(downstream.len(), 2);
assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2);
// Remove one
s.remove_post_downstream(&post_id, &peer1).unwrap();
assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1);
// Capacity limit
let big_post = make_post_id(99);
for i in 0..100u8 {
assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap());
let file = [42u8; 32];
// Sleep between inserts so last_interaction_ms actually differs (ms resolution).
for i in 0..7u8 {
s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap();
std::thread::sleep(std::time::Duration::from_millis(2));
}
assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100);
// 101st should fail
assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap());
// Only 5 most-recent survive
assert_eq!(s.get_file_holder_count(&file).unwrap(), 5);
let holders = s.get_file_holders(&file).unwrap();
assert_eq!(holders.len(), 5);
let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect();
// Oldest two (i=0, i=1) got evicted; most recent (i=6) survives
assert!(!kept.contains(&make_node_id(0)));
assert!(!kept.contains(&make_node_id(1)));
assert!(kept.contains(&make_node_id(6)));
}
#[test]
fn file_holders_direction_promotion() {
let s = temp_storage();
let file = [42u8; 32];
let peer = make_node_id(1);
s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap();
s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap();
// Re-insert with opposite direction should promote to "both"
let dir: String = s.conn.query_row(
"SELECT direction FROM file_holders WHERE file_id = ?1 AND peer_id = ?2",
rusqlite::params![file.as_slice(), peer.as_slice()],
|row| row.get(0),
).unwrap();
assert_eq!(dir, "both");
}
#[test]