From 1658762a68f871ca1b7bd4b96aa0703700fdff00 Mon Sep 17 00:00:00 2001
From: Scott Reimers
Date: Tue, 21 Apr 2026 20:52:30 -0400
Subject: [PATCH] Phase 2a (0.6.1-beta): add file_holders table + legacy seed migration
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

New flat per-file holder set replaces the directional upstream/downstream
trees. Keyed by 32-byte content-addressed file_id (works for both PostId
and blob CID). LRU-capped at 5 holders per file on touch.

- HolderDirection enum (Sent/Received/Both) — tracked for potential reuse,
  not load-bearing for propagation
- touch_file_holder / get_file_holders / delete_file_holders
- seed_file_holders_from_legacy: one-time idempotent seed from post_upstream,
  post_downstream, blob_upstream, blob_downstream so users upgrading from
  0.6.0 don't start with empty holder sets

Table and methods land here; call-site refactor and legacy-table drop
follow in subsequent commits within this phase.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 crates/core/src/storage.rs | 152 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 151 insertions(+), 1 deletion(-)

diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs
index 5f37559..8b8ba25 100644
--- a/crates/core/src/storage.rs
+++ b/crates/core/src/storage.rs
@@ -12,6 +12,26 @@ use crate::types::{
     VisibilityIntent,
 };
 
+/// Direction for file_holders entries: whether we sent the file to this peer,
+/// received it from them, or both. Not load-bearing for propagation decisions —
+/// any holder can serve as a diff target — but retained for potential reuse.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum HolderDirection {
+    Sent,
+    Received,
+    Both,
+}
+
+impl HolderDirection {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            HolderDirection::Sent => "sent",
+            HolderDirection::Received => "received",
+            HolderDirection::Both => "both",
+        }
+    }
+}
+
 /// Blob metadata for eviction scoring.
 pub struct EvictionCandidate {
     pub cid: [u8; 32],
@@ -389,7 +409,17 @@ impl Storage {
             CREATE TABLE IF NOT EXISTS seen_messages (
                 partner_id BLOB PRIMARY KEY,
                 last_read_ms INTEGER NOT NULL DEFAULT 0
-            );",
+            );
+            CREATE TABLE IF NOT EXISTS file_holders (
+                file_id BLOB NOT NULL,
+                peer_id BLOB NOT NULL,
+                peer_addresses TEXT NOT NULL DEFAULT '[]',
+                last_interaction_ms INTEGER NOT NULL,
+                direction TEXT NOT NULL,
+                PRIMARY KEY (file_id, peer_id)
+            );
+            CREATE INDEX IF NOT EXISTS idx_file_holders_recency
+                ON file_holders(file_id, last_interaction_ms DESC);",
         )?;
         Ok(())
     }
@@ -686,6 +716,11 @@ impl Storage {
             )?;
         }
 
+        // 0.6.1-beta: seed file_holders from legacy upstream/downstream tables
+        // before they're dropped. Idempotent — only fires on an empty
+        // file_holders table.
+        self.seed_file_holders_from_legacy()?;
+
         Ok(())
     }
 
@@ -4393,6 +4428,121 @@ impl Storage {
         Ok(count as u32)
     }
 
+    // --- File holders (flat, per-file, LRU-capped at 5) ---
+    //
+    // A single table for PostId-keyed engagement propagation and CID-keyed
+    // manifest/blob propagation. Any 32-byte content-addressed file_id fits.
+
+    /// Upsert a holder for a file. Bumps last_interaction_ms to now and
+    /// enforces an LRU cap of 5 holders per file.
+    pub fn touch_file_holder(
+        &self,
+        file_id: &[u8; 32],
+        peer_id: &NodeId,
+        peer_addresses: &[String],
+        direction: HolderDirection,
+    ) -> anyhow::Result<()> {
+        let addrs_json = serde_json::to_string(peer_addresses)?;
+        let now = now_ms();
+        let new_dir = direction.as_str();
+        // Upsert. If the row exists with a different direction, promote to "both".
+        self.conn.execute(
+            "INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
+             VALUES (?1, ?2, ?3, ?4, ?5)
+             ON CONFLICT(file_id, peer_id) DO UPDATE SET
+                peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END,
+                last_interaction_ms = ?4,
+                direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END",
+            params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir],
+        )?;
+        // Enforce LRU cap of 5. Oldest get dropped.
+        self.conn.execute(
+            "DELETE FROM file_holders
+             WHERE file_id = ?1
+               AND peer_id NOT IN (
+                 SELECT peer_id FROM file_holders
+                 WHERE file_id = ?1
+                 ORDER BY last_interaction_ms DESC
+                 LIMIT 5
+               )",
+            params![file_id.as_slice()],
+        )?;
+        Ok(())
+    }
+
+    /// Return the up-to-5 most recently interacted holders of a file.
+    pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
+        let mut stmt = self.conn.prepare(
+            "SELECT peer_id, peer_addresses FROM file_holders
+             WHERE file_id = ?1
+             ORDER BY last_interaction_ms DESC
+             LIMIT 5",
+        )?;
+        let rows = stmt.query_map(params![file_id.as_slice()], |row| {
+            let peer_bytes: Vec<u8> = row.get(0)?;
+            let addrs_json: String = row.get(1)?;
+            Ok((peer_bytes, addrs_json))
+        })?;
+        let mut out = Vec::new();
+        for row in rows {
+            let (peer_bytes, addrs_json) = row?;
+            if peer_bytes.len() != 32 { continue; }
+            let mut peer = [0u8; 32];
+            peer.copy_from_slice(&peer_bytes);
+            let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
+            out.push((NodeId::from(peer), addrs));
+        }
+        Ok(out)
+    }
+
+    /// Remove all holders for a file (e.g. on post/blob deletion).
+    pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> {
+        self.conn.execute(
+            "DELETE FROM file_holders WHERE file_id = ?1",
+            params![file_id.as_slice()],
+        )?;
+        Ok(())
+    }
+
+    /// One-time migration: seed file_holders from the legacy upstream/downstream
+    /// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder
+    /// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the
+    /// PRIMARY KEY.
+    pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> {
+        // Skip if file_holders already populated (idempotent re-run protection).
+        let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")?
+            .query_row([], |row| row.get(0))?;
+        if existing > 0 {
+            return Ok(());
+        }
+        let now = now_ms() as i64;
+        // post_upstream → holders we received engagement diffs from
+        self.conn.execute(
+            "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
+             SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream",
+            params![now],
+        )?;
+        // post_downstream → holders we sent engagement diffs to
+        self.conn.execute(
+            "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
+             SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream",
+            params![now],
+        )?;
+        // blob_upstream → peer we fetched the blob/manifest from
+        self.conn.execute(
+            "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
+             SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream",
+            params![now],
+        )?;
+        // blob_downstream → peers we served the blob/manifest to
+        self.conn.execute(
+            "INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
+             SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream",
+            params![now],
+        )?;
+        Ok(())
+    }
+
     // --- Engagement: reactions ---
 
     /// Store a reaction (upsert by reactor+post_id+emoji).