From 5d9ba224279c3452f218bffeb70f75d92ee7e791 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 21:42:15 -0400 Subject: [PATCH] Phase 2e (0.6.1-beta): drop legacy upstream/downstream tables The file_holders table is now the only tracker of per-file peer relationships. post_upstream, post_downstream, blob_upstream, and blob_downstream are dropped at first launch after the seed migration copies any existing entries. Schema: - DROP TABLE IF EXISTS on all four legacy tables after seeding - Seed migration guards with sqlite_master table_exists check so fresh installs don't crash trying to read non-existent sources - Remove CREATE TABLE statements for the four tables from init - Remove Protocol v4 Phase 6 post_upstream priority migration (dead) - Remove blob_upstream preferred_tree column migration (dead) Rust: - Remove add/get/remove post_upstream, post_downstream, blob_upstream, blob_downstream methods - Remove get_blob_upstream_preferred_tree and update_blob_upstream_preferred_tree - Rewrite get_eviction_candidates's downstream_count subquery to count file_holders entries (holders of any direction, not only downstream peers) - Rewrite apply_delete's cascade cleanup to clear file_holders instead of post_upstream/post_downstream - cleanup_cdn_for_blob now clears file_holders for the CID Callers: - All dual-write sites in connection.rs and node.rs now do touch_file_holder only (legacy writes removed) - get_stale_manifests replaced with get_stale_manifest_cids; caller in node.rs uses the first holder returned by get_file_holders as the refresh source and skips CIDs that have no holders Tests: - Remove blob_upstream_crud, blob_downstream_crud_and_limit, blob_upstream_preferred_tree, remove_blob_upstream, post_downstream_crud - Add file_holders_lru_cap and file_holders_direction_promotion tests All 110 core tests passing. Workspace compiles clean.
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 17 +- crates/core/src/node.rs | 27 +- crates/core/src/storage.rs | 572 ++++++---------------------------- 3 files changed, 112 insertions(+), 504 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index ccfbf90..7d97a7a 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1393,8 +1393,6 @@ impl ConnectionManager { { let s = storage.get().await; for pid in &new_post_ids { - let prio = s.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = s.add_post_upstream(pid, peer_id, prio); let _ = s.touch_file_holder( pid, peer_id, @@ -1946,8 +1944,6 @@ impl ConnectionManager { { let storage = self.storage.get().await; for pid in &new_post_ids { - let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(pid, from, prio); let _ = storage.touch_file_holder( pid, from, @@ -2046,8 +2042,6 @@ impl ConnectionManager { { let storage = self.storage.get().await; for pid in &new_post_ids { - let prio = storage.get_post_upstreams(pid).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(pid, peer_id, prio); let _ = storage.touch_file_holder( pid, peer_id, @@ -4984,8 +4978,6 @@ impl ConnectionManager { &push.post.post, &push.post.visibility, ); - let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); let _ = storage.touch_file_holder( &push.post.id, &remote_node_id, @@ -5205,8 +5197,6 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { - let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&sync_post.id, &sender_id, 
prio); let _ = storage.touch_file_holder( &sync_post.id, &sender_id, @@ -5454,7 +5444,6 @@ impl ConnectionManager { let payload: PostDownstreamRegisterPayload = read_payload(recv, MAX_PAYLOAD).await?; let cm = conn_mgr.lock().await; let storage = cm.storage.get().await; - let _ = storage.add_post_downstream(&payload.post_id, &remote_node_id); let _ = storage.touch_file_holder( &payload.post_id, &remote_node_id, @@ -6108,9 +6097,8 @@ impl ConnectionManager { to_pull.push(*pid); } - // Register as downstream for all accepted posts + // Register as holder for all accepted posts for pid in &acc { - let _ = storage.add_post_downstream(pid, &remote_node_id); let _ = storage.touch_file_holder( pid, &remote_node_id, @@ -6167,8 +6155,6 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); - let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); - let _ = storage.add_post_upstream(&sp.id, &sender, prio); let _ = storage.touch_file_holder( &sp.id, &sender, @@ -6201,7 +6187,6 @@ impl ConnectionManager { let cm = cm_arc.lock().await; let storage = cm.storage.get().await; let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes); - let _ = storage.add_post_upstream(&att.cid, &sender, 0); let _ = storage.touch_file_holder( &att.cid, &sender, diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 3fafe20..85c7606 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1350,7 +1350,6 @@ impl Node { let source_addrs: Vec = response.manifest.as_ref() .map(|m| m.host_addresses.clone()) .unwrap_or_default(); - let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs); let _ = storage.touch_file_holder( cid, from_peer, @@ -1419,7 +1418,6 @@ impl Node { let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, 
cdn_manifest.author_manifest.updated_at); } } - let _ = storage.store_blob_upstream(cid, &lateral, &[]); let _ = storage.touch_file_holder( cid, &lateral, @@ -3095,20 +3093,27 @@ impl Node { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64 - max_age_ms; - let stale = { + let stale_cids = { let s = storage.get().await; - s.get_stale_manifests(cutoff).unwrap_or_default() + s.get_stale_manifest_cids(cutoff).unwrap_or_default() }; - for (cid, upstream_nid, _upstream_addrs) in &stale { - // Get current updated_at for this manifest - let current_updated_at = { + for cid in &stale_cids { + // Get current updated_at + pick a holder to refresh from + let (current_updated_at, refresh_source) = { let s = storage.get().await; - s.get_cdn_manifest(cid).ok().flatten() + let updated_at = s.get_cdn_manifest(cid).ok().flatten() .and_then(|json| serde_json::from_str::(&json).ok()) .map(|m| m.updated_at) - .unwrap_or(0) + .unwrap_or(0); + let source = s.get_file_holders(cid) + .unwrap_or_default() + .into_iter() + .next() + .map(|(nid, _)| nid); + (updated_at, source) }; - match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await { + let Some(upstream_nid) = refresh_source else { continue; }; + match network.request_manifest_refresh(cid, &upstream_nid, current_updated_at).await { Ok(Some(cdn_manifest)) => { if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) { let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default(); @@ -3135,7 +3140,7 @@ impl Node { Err(e) => { tracing::debug!( cid = hex::encode(cid), - upstream = hex::encode(upstream_nid), + upstream = hex::encode(&upstream_nid), error = %e, "Manifest refresh from upstream failed" ); diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 171c7e5..31434bb 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -282,20 +282,6 @@ impl Storage { updated_at INTEGER NOT NULL ); CREATE INDEX IF 
NOT EXISTS idx_cdn_manifests_author ON cdn_manifests(author); - CREATE TABLE IF NOT EXISTS blob_upstream ( - cid BLOB PRIMARY KEY, - source_node_id BLOB NOT NULL, - source_addresses TEXT NOT NULL DEFAULT '[]', - stored_at INTEGER NOT NULL - ); - CREATE TABLE IF NOT EXISTS blob_downstream ( - cid BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - peer_addresses TEXT NOT NULL DEFAULT '[]', - registered_at INTEGER NOT NULL, - PRIMARY KEY (cid, peer_node_id) - ); - CREATE INDEX IF NOT EXISTS idx_blob_downstream_cid ON blob_downstream(cid); CREATE TABLE IF NOT EXISTS group_keys ( group_id BLOB PRIMARY KEY, circle_name TEXT NOT NULL, @@ -346,17 +332,6 @@ impl Storage { last_seen_ms INTEGER NOT NULL, success_count INTEGER NOT NULL DEFAULT 0 ); - CREATE TABLE IF NOT EXISTS post_downstream ( - post_id BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - registered_at INTEGER NOT NULL, - PRIMARY KEY (post_id, peer_node_id) - ); - CREATE INDEX IF NOT EXISTS idx_post_downstream_post ON post_downstream(post_id); - CREATE TABLE IF NOT EXISTS post_upstream ( - post_id BLOB PRIMARY KEY, - peer_node_id BLOB NOT NULL - ); CREATE TABLE IF NOT EXISTS blob_headers ( post_id BLOB PRIMARY KEY, author BLOB NOT NULL, @@ -573,16 +548,6 @@ impl Storage { )?; } - // Add preferred_tree column to blob_upstream if missing (CDN Preferred Tree migration) - let has_blob_pref_tree = self.conn.prepare( - "SELECT COUNT(*) FROM pragma_table_info('blob_upstream') WHERE name='preferred_tree'" - )?.query_row([], |row| row.get::<_, i64>(0))?; - if has_blob_pref_tree == 0 { - self.conn.execute_batch( - "ALTER TABLE blob_upstream ADD COLUMN preferred_tree TEXT NOT NULL DEFAULT '[]';" - )?; - } - // Add public_visible column to profiles if missing (Phase D-4 migration) let has_public_visible = self.conn.prepare( "SELECT COUNT(*) FROM pragma_table_info('profiles') WHERE name='public_visible'" @@ -696,31 +661,19 @@ impl Storage { )?; } - // Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max) - let 
has_priority = self.conn.prepare( - "SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'" - )?.query_row([], |row| row.get::<_, i64>(0))?; - if has_priority == 0 { - self.conn.execute_batch( - "ALTER TABLE post_upstream RENAME TO post_upstream_old; - CREATE TABLE post_upstream ( - post_id BLOB NOT NULL, - peer_node_id BLOB NOT NULL, - priority INTEGER NOT NULL DEFAULT 0, - registered_at INTEGER NOT NULL DEFAULT 0, - PRIMARY KEY (post_id, peer_node_id) - ); - INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at) - SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old; - DROP TABLE post_upstream_old;" - )?; - } - // 0.6.1-beta: seed file_holders from legacy upstream/downstream tables // before they're dropped. Idempotent — only fires on an empty // file_holders table. self.seed_file_holders_from_legacy()?; + // 0.6.1-beta: drop legacy directional tables — replaced by file_holders. + self.conn.execute_batch( + "DROP TABLE IF EXISTS blob_upstream; + DROP TABLE IF EXISTS blob_downstream; + DROP TABLE IF EXISTS post_upstream; + DROP TABLE IF EXISTS post_downstream;", + )?; + Ok(()) } @@ -2431,8 +2384,7 @@ impl Storage { params![record.post_id.as_slice(), record.author.as_slice()], )?; if deleted > 0 { - self.conn.execute("DELETE FROM post_downstream WHERE post_id = ?1", params![record.post_id.as_slice()])?; - self.conn.execute("DELETE FROM post_upstream WHERE post_id = ?1", params![record.post_id.as_slice()])?; + self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?; self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?; } Ok(deleted > 0) @@ -3431,28 +3383,6 @@ impl Storage { Ok(()) } - /// Update the preferred_tree JSON for a blob upstream entry. 
- pub fn update_blob_upstream_preferred_tree(&self, cid: &[u8; 32], tree: &[NodeId]) -> anyhow::Result<()> { - let json = serde_json::to_string( - &tree.iter().map(hex::encode).collect::>() - )?; - self.conn.execute( - "UPDATE blob_upstream SET preferred_tree = ?1 WHERE cid = ?2", - params![json, cid.as_slice()], - )?; - Ok(()) - } - - /// Get the preferred_tree for a blob upstream entry. - pub fn get_blob_upstream_preferred_tree(&self, cid: &[u8; 32]) -> anyhow::Result> { - let json: String = self.conn.query_row( - "SELECT preferred_tree FROM blob_upstream WHERE cid = ?1", - params![cid.as_slice()], - |row| row.get(0), - ).unwrap_or_else(|_| "[]".to_string()); - Ok(parse_anchors_json(&json)) - } - // ---- Social Routes ---- /// Insert or update a social route entry. @@ -3924,10 +3854,10 @@ impl Storage { GROUP BY post_id ) r ON b.post_id = r.post_id LEFT JOIN ( - SELECT cid, COUNT(*) as ds_count - FROM blob_downstream - GROUP BY cid - ) d ON b.cid = d.cid" + SELECT file_id, COUNT(*) as ds_count + FROM file_holders + GROUP BY file_id + ) d ON b.cid = d.file_id" )?; let rows = stmt.query_map(params![cutoff], |row| { let cid_bytes: Vec = row.get(0)?; @@ -3981,11 +3911,10 @@ impl Storage { Ok(count as u64) } - /// Clean up all CDN metadata for a blob (manifests + upstream + downstream). + /// Clean up all CDN metadata for a blob (manifests + file_holders). pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> { self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?; - self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?; - self.conn.execute("DELETE FROM blob_downstream WHERE cid = ?1", params![cid.as_slice()])?; + self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![cid.as_slice()])?; Ok(()) } @@ -4004,12 +3933,6 @@ impl Storage { Ok(cids) } - /// Remove upstream tracking for a blob CID. 
- pub fn remove_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result<()> { - self.conn.execute("DELETE FROM blob_upstream WHERE cid = ?1", params![cid.as_slice()])?; - Ok(()) - } - pub fn post_count(&self) -> anyhow::Result { let count: i64 = self .conn @@ -4072,137 +3995,24 @@ impl Storage { Ok(result) } - /// Record the upstream source for a blob CID. - pub fn store_blob_upstream( - &self, - cid: &[u8; 32], - source_node_id: &NodeId, - source_addresses: &[String], - ) -> anyhow::Result<()> { - let addrs_json = serde_json::to_string(source_addresses)?; - self.conn.execute( - "INSERT INTO blob_upstream (cid, source_node_id, source_addresses, stored_at) VALUES (?1, ?2, ?3, ?4) - ON CONFLICT(cid) DO UPDATE SET source_node_id = ?2, source_addresses = ?3, stored_at = ?4", - params![cid.as_slice(), source_node_id.as_slice(), addrs_json, now_ms()], - )?; - Ok(()) - } - - /// Get the upstream source for a blob CID: (node_id, addresses). - pub fn get_blob_upstream(&self, cid: &[u8; 32]) -> anyhow::Result)>> { - let result = self.conn.query_row( - "SELECT source_node_id, source_addresses FROM blob_upstream WHERE cid = ?1", - params![cid.as_slice()], - |row| { - let nid_bytes: Vec = row.get(0)?; - let addrs_json: String = row.get(1)?; - Ok((nid_bytes, addrs_json)) - }, - ); - match result { - Ok((nid_bytes, addrs_json)) => { - let nid = blob_to_nodeid(nid_bytes)?; - let addrs: Vec = serde_json::from_str(&addrs_json).unwrap_or_default(); - Ok(Some((nid, addrs))) - } - Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), - Err(e) => Err(e.into()), - } - } - - /// Register a downstream peer for a blob CID. Returns false if already at 100 downstream. 
- pub fn add_blob_downstream( - &self, - cid: &[u8; 32], - peer_node_id: &NodeId, - peer_addresses: &[String], - ) -> anyhow::Result { - let count = self.get_blob_downstream_count(cid)?; - if count >= 100 { - return Ok(false); - } - let addrs_json = serde_json::to_string(peer_addresses)?; - self.conn.execute( - "INSERT INTO blob_downstream (cid, peer_node_id, peer_addresses, registered_at) VALUES (?1, ?2, ?3, ?4) - ON CONFLICT(cid, peer_node_id) DO UPDATE SET peer_addresses = ?3, registered_at = ?4", - params![cid.as_slice(), peer_node_id.as_slice(), addrs_json, now_ms()], - )?; - Ok(true) - } - - /// Get all downstream peers for a blob CID: Vec<(node_id, addresses)>. - pub fn get_blob_downstream(&self, cid: &[u8; 32]) -> anyhow::Result)>> { + /// Get CIDs of manifests older than a cutoff. Callers look up holders + /// via file_holders to pick a refresh source. + pub fn get_stale_manifest_cids(&self, older_than_ms: u64) -> anyhow::Result> { let mut stmt = self.conn.prepare( - "SELECT peer_node_id, peer_addresses FROM blob_downstream WHERE cid = ?1" - )?; - let rows = stmt.query_map(params![cid.as_slice()], |row| { - let nid_bytes: Vec = row.get(0)?; - let addrs_json: String = row.get(1)?; - Ok((nid_bytes, addrs_json)) - })?; - let mut result = Vec::new(); - for row in rows { - let (nid_bytes, addrs_json) = row?; - let nid = blob_to_nodeid(nid_bytes)?; - let addrs: Vec = serde_json::from_str(&addrs_json).unwrap_or_default(); - result.push((nid, addrs)); - } - Ok(result) - } - - /// Count downstream peers for a blob CID. - pub fn get_blob_downstream_count(&self, cid: &[u8; 32]) -> anyhow::Result { - let count: i64 = self.conn.query_row( - "SELECT COUNT(*) FROM blob_downstream WHERE cid = ?1", - params![cid.as_slice()], - |row| row.get(0), - )?; - Ok(count as u32) - } - - /// Remove a downstream peer for a blob CID. 
- pub fn remove_blob_downstream(&self, cid: &[u8; 32], peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM blob_downstream WHERE cid = ?1 AND peer_node_id = ?2", - params![cid.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Get manifests older than a cutoff: Vec<(cid, upstream_node_id, upstream_addresses)>. - pub fn get_stale_manifests(&self, older_than_ms: u64) -> anyhow::Result)>> { - let mut stmt = self.conn.prepare( - "SELECT m.cid, u.source_node_id, u.source_addresses - FROM cdn_manifests m - LEFT JOIN blob_upstream u ON m.cid = u.cid - WHERE m.updated_at < ?1" + "SELECT cid FROM cdn_manifests WHERE updated_at < ?1", )?; let rows = stmt.query_map(params![older_than_ms as i64], |row| { let cid_bytes: Vec = row.get(0)?; - let nid_bytes: Option> = row.get(1)?; - let addrs_json: Option = row.get(2)?; - Ok((cid_bytes, nid_bytes, addrs_json)) + Ok(cid_bytes) })?; - let mut result = Vec::new(); + let mut out = Vec::new(); for row in rows { - let (cid_bytes, nid_bytes, addrs_json) = row?; - let cid: [u8; 32] = match cid_bytes.try_into() { - Ok(c) => c, - Err(_) => continue, - }; - let nid = match nid_bytes { - Some(b) => match blob_to_nodeid(b) { - Ok(n) => n, - Err(_) => continue, - }, - None => continue, - }; - let addrs: Vec = addrs_json - .map(|j| serde_json::from_str(&j).unwrap_or_default()) - .unwrap_or_default(); - result.push((cid, nid, addrs)); + let cid_bytes = row?; + if let Ok(cid) = <[u8; 32]>::try_from(cid_bytes.as_slice()) { + out.push(cid); + } } - Ok(result) + Ok(out) } /// Get the 10 posts before and 10 posts after a reference timestamp for an author. @@ -4306,128 +4116,6 @@ impl Storage { Ok(result) } - // --- Engagement: post_downstream --- - - /// Register a peer as downstream for a post (max 100 per post). - /// Returns true if added, false if at capacity. 
- pub fn add_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result { - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; - if count >= 100 { - return Ok(false); - } - self.conn.execute( - "INSERT INTO post_downstream (post_id, peer_node_id, registered_at) VALUES (?1, ?2, ?3) - ON CONFLICT DO NOTHING", - params![post_id.as_slice(), peer_node_id.as_slice(), now_ms()], - )?; - Ok(true) - } - - /// Get all downstream peers for a post. - pub fn get_post_downstream(&self, post_id: &PostId) -> anyhow::Result> { - let mut stmt = self.conn.prepare( - "SELECT peer_node_id FROM post_downstream WHERE post_id = ?1" - )?; - let rows = stmt.query_map(params![post_id.as_slice()], |row| row.get::<_, Vec>(0))?; - let mut result = Vec::new(); - for row in rows { - if let Ok(nid) = blob_to_nodeid(row?) { - result.push(nid); - } - } - Ok(result) - } - - /// Remove a downstream peer for a post. - pub fn remove_post_downstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM post_downstream WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - // --- Engagement: post_upstream (multi-upstream, 3 max) --- - - /// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post. 
- pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> { - // Check current count - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; - if count >= 3 { - return Ok(()); // Already at cap - } - let now = now_ms(); - self.conn.execute( - "INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at) - VALUES (?1, ?2, ?3, ?4)", - params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now], - )?; - Ok(()) - } - - /// Get all upstream peers for a post, ordered by priority ASC (0 = primary). - pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result> { - let mut stmt = self.conn.prepare( - "SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC" - )?; - let rows = stmt.query_map(params![post_id.as_slice()], |row| { - let bytes: Vec = row.get(0)?; - let prio: i64 = row.get(1)?; - Ok((bytes, prio as u8)) - })?; - let mut result = Vec::new(); - for row in rows { - let (bytes, prio) = row?; - if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) { - result.push((nid, prio)); - } - } - Ok(result) - } - - /// Get the primary (lowest priority) upstream peer for a post. - /// Backward-compatible wrapper for code that only needs a single upstream. - pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result> { - let upstreams = self.get_post_upstreams(post_id)?; - Ok(upstreams.into_iter().next().map(|(nid, _)| nid)) - } - - /// Remove a specific upstream peer for a post. - pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - self.conn.execute( - "DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Promote an upstream peer to primary (priority 0), pushing others up. 
- pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { - // Shift all priorities up by 1 - self.conn.execute( - "UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1", - params![post_id.as_slice()], - )?; - // Set the promoted peer to priority 0 - self.conn.execute( - "UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2", - params![post_id.as_slice(), peer_node_id.as_slice()], - )?; - Ok(()) - } - - /// Count downstream peers for a post. - pub fn get_post_downstream_count(&self, post_id: &PostId) -> anyhow::Result { - let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM post_downstream WHERE post_id = ?1" - )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; - Ok(count as u32) - } - // --- File holders (flat, per-file, LRU-capped at 5) --- // // A single table for PostId-keyed engagement propagation and CID-keyed @@ -4524,7 +4212,7 @@ impl Storage { /// One-time migration: seed file_holders from the legacy upstream/downstream /// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder /// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the - /// PRIMARY KEY. + /// PRIMARY KEY. Skips tables that don't exist on fresh installs. pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> { // Skip if file_holders already populated (idempotent re-run protection). let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")? 
@@ -4533,30 +4221,40 @@ impl Storage { return Ok(()); } let now = now_ms() as i64; - // post_upstream → holders we received engagement diffs from - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream", - params![now], - )?; - // post_downstream → holders we sent engagement diffs to - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream", - params![now], - )?; - // blob_upstream → peer we fetched the blob/manifest from - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream", - params![now], - )?; - // blob_downstream → peers we served the blob/manifest to - self.conn.execute( - "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) - SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream", - params![now], - )?; + let table_exists = |name: &str| -> anyhow::Result { + let count: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", + )?.query_row(params![name], |row| row.get(0))?; + Ok(count > 0) + }; + if table_exists("post_upstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream", + params![now], + )?; + } + if table_exists("post_downstream")? 
{ + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream", + params![now], + )?; + } + if table_exists("blob_upstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream", + params![now], + )?; + } + if table_exists("blob_downstream")? { + self.conn.execute( + "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction) + SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream", + params![now], + )?; + } Ok(()) } @@ -5468,60 +5166,6 @@ mod tests { assert_eq!(manifests[0].0, cid); } - #[test] - fn blob_upstream_crud() { - let s = temp_storage(); - let cid = [42u8; 32]; - let source = make_node_id(1); - let addrs = vec!["10.0.0.1:4433".to_string()]; - - s.store_blob_upstream(&cid, &source, &addrs).unwrap(); - let (nid, got_addrs) = s.get_blob_upstream(&cid).unwrap().unwrap(); - assert_eq!(nid, source); - assert_eq!(got_addrs, addrs); - - // Missing - assert!(s.get_blob_upstream(&[99u8; 32]).unwrap().is_none()); - - // Update - let source2 = make_node_id(2); - s.store_blob_upstream(&cid, &source2, &[]).unwrap(); - let (nid, _) = s.get_blob_upstream(&cid).unwrap().unwrap(); - assert_eq!(nid, source2); - } - - #[test] - fn blob_downstream_crud_and_limit() { - let s = temp_storage(); - let cid = [42u8; 32]; - - // Add downstream peers - for i in 0..100u8 { - let peer = make_node_id(i); - let ok = s.add_blob_downstream(&cid, &peer, &[format!("10.0.0.{}:4433", i)]).unwrap(); - assert!(ok, "should accept peer {}", i); - } - - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 100); - - // 101st should be rejected - let peer_101 = make_node_id(200); - let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap(); - 
assert!(!ok, "should reject 101st downstream"); - - // Get all downstream - let downstream = s.get_blob_downstream(&cid).unwrap(); - assert_eq!(downstream.len(), 100); - - // Remove one - s.remove_blob_downstream(&cid, &make_node_id(0)).unwrap(); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 99); - - // Now adding one more should work - let ok = s.add_blob_downstream(&cid, &peer_101, &[]).unwrap(); - assert!(ok, "should accept after removal"); - } - #[test] fn blob_pin_unpin() { let s = temp_storage(); @@ -5605,18 +5249,15 @@ mod tests { let peer = make_node_id(2); s.store_cdn_manifest(&cid, r#"{"test": true}"#, &author, 100).unwrap(); - s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap(); - s.add_blob_downstream(&cid, &peer, &["10.0.0.2:4433".to_string()]).unwrap(); + s.touch_file_holder(&cid, &peer, &["10.0.0.1:4433".to_string()], HolderDirection::Received).unwrap(); assert!(s.get_cdn_manifest(&cid).unwrap().is_some()); - assert!(s.get_blob_upstream(&cid).unwrap().is_some()); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 1); + assert_eq!(s.get_file_holder_count(&cid).unwrap(), 1); s.cleanup_cdn_for_blob(&cid).unwrap(); assert!(s.get_cdn_manifest(&cid).unwrap().is_none()); - assert!(s.get_blob_upstream(&cid).unwrap().is_none()); - assert_eq!(s.get_blob_downstream_count(&cid).unwrap(), 0); + assert_eq!(s.get_file_holder_count(&cid).unwrap(), 0); } #[test] @@ -5636,18 +5277,6 @@ mod tests { assert!(cids.contains(&cid2)); } - #[test] - fn remove_blob_upstream() { - let s = temp_storage(); - let cid = [42u8; 32]; - let peer = make_node_id(1); - - s.store_blob_upstream(&cid, &peer, &["10.0.0.1:4433".to_string()]).unwrap(); - assert!(s.get_blob_upstream(&cid).unwrap().is_some()); - - s.remove_blob_upstream(&cid).unwrap(); - assert!(s.get_blob_upstream(&cid).unwrap().is_none()); - } #[test] fn author_post_neighborhood() { @@ -6007,24 +5636,6 @@ mod tests { assert_eq!(got2.preferred_tree.len(), 2); } - #[test] - fn 
blob_upstream_preferred_tree() { - let s = temp_storage(); - let cid = [42u8; 32]; - let source = make_node_id(1); - s.store_blob_upstream(&cid, &source, &[]).unwrap(); - - // Initially empty - let tree = s.get_blob_upstream_preferred_tree(&cid).unwrap(); - assert!(tree.is_empty()); - - // Update - let nodes = vec![make_node_id(10), make_node_id(11)]; - s.update_blob_upstream_preferred_tree(&cid, &nodes).unwrap(); - let tree2 = s.get_blob_upstream_preferred_tree(&cid).unwrap(); - assert_eq!(tree2.len(), 2); - } - // ---- Circle Profile tests ---- #[test] @@ -6225,32 +5836,39 @@ mod tests { // --- Engagement tests --- #[test] - fn post_downstream_crud() { + fn file_holders_lru_cap() { let s = temp_storage(); - let post_id = make_post_id(1); - let peer1 = make_node_id(1); - let peer2 = make_node_id(2); - - // Add downstream peers - assert!(s.add_post_downstream(&post_id, &peer1).unwrap()); - assert!(s.add_post_downstream(&post_id, &peer2).unwrap()); - - let downstream = s.get_post_downstream(&post_id).unwrap(); - assert_eq!(downstream.len(), 2); - assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 2); - - // Remove one - s.remove_post_downstream(&post_id, &peer1).unwrap(); - assert_eq!(s.get_post_downstream_count(&post_id).unwrap(), 1); - - // Capacity limit - let big_post = make_post_id(99); - for i in 0..100u8 { - assert!(s.add_post_downstream(&big_post, &make_node_id(i + 1)).unwrap()); + let file = [42u8; 32]; + // Sleep between inserts so last_interaction_ms actually differs (ms resolution). 
+ for i in 0..7u8 { + s.touch_file_holder(&file, &make_node_id(i), &[], HolderDirection::Received).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(2)); } - assert_eq!(s.get_post_downstream_count(&big_post).unwrap(), 100); - // 101st should fail - assert!(!s.add_post_downstream(&big_post, &make_node_id(200)).unwrap()); + // Only 5 most-recent survive + assert_eq!(s.get_file_holder_count(&file).unwrap(), 5); + let holders = s.get_file_holders(&file).unwrap(); + assert_eq!(holders.len(), 5); + let kept: std::collections::HashSet<_> = holders.iter().map(|(n, _)| *n).collect(); + // Oldest two (i=0, i=1) got evicted; most recent (i=6) survives + assert!(!kept.contains(&make_node_id(0))); + assert!(!kept.contains(&make_node_id(1))); + assert!(kept.contains(&make_node_id(6))); + } + + #[test] + fn file_holders_direction_promotion() { + let s = temp_storage(); + let file = [42u8; 32]; + let peer = make_node_id(1); + s.touch_file_holder(&file, &peer, &[], HolderDirection::Received).unwrap(); + s.touch_file_holder(&file, &peer, &[], HolderDirection::Sent).unwrap(); + // Re-insert with opposite direction should promote to "both" + let dir: String = s.conn.query_row( + "SELECT direction FROM file_holders WHERE file_id = ?1 AND peer_id = ?2", + rusqlite::params![file.as_slice(), peer.as_slice()], + |row| row.get(0), + ).unwrap(); + assert_eq!(dir, "both"); } #[test]