Phase 2a (0.6.1-beta): add file_holders table + legacy seed migration

New flat per-file holder set replaces the directional upstream/downstream
trees. Keyed by 32-byte content-addressed file_id (works for both PostId
and blob CID). LRU-capped at 5 holders per file on touch.

- HolderDirection enum (Sent/Received/Both) — tracked for potential
  reuse, not load-bearing for propagation
- touch_file_holder / get_file_holders / delete_file_holders
- seed_file_holders_from_legacy: one-time idempotent seed from
  post_upstream, post_downstream, blob_upstream, blob_downstream so
  users upgrading from 0.6.0 don't start with empty holder sets

Table and methods land here; call-site refactor and legacy-table drop
follow in subsequent commits within this phase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-04-21 20:52:30 -04:00
parent e6265b52b6
commit 1658762a68

View file

@ -12,6 +12,26 @@ use crate::types::{
VisibilityIntent, VisibilityIntent,
}; };
/// Direction for file_holders entries: whether we sent the file to this peer,
/// received it from them, or both. Not load-bearing for propagation decisions —
/// any holder can serve as a diff target — but retained for potential reuse.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HolderDirection {
Sent,
Received,
Both,
}
impl HolderDirection {
pub fn as_str(&self) -> &'static str {
match self {
HolderDirection::Sent => "sent",
HolderDirection::Received => "received",
HolderDirection::Both => "both",
}
}
}
/// Blob metadata for eviction scoring. /// Blob metadata for eviction scoring.
pub struct EvictionCandidate { pub struct EvictionCandidate {
pub cid: [u8; 32], pub cid: [u8; 32],
@ -389,7 +409,17 @@ impl Storage {
CREATE TABLE IF NOT EXISTS seen_messages ( CREATE TABLE IF NOT EXISTS seen_messages (
partner_id BLOB PRIMARY KEY, partner_id BLOB PRIMARY KEY,
last_read_ms INTEGER NOT NULL DEFAULT 0 last_read_ms INTEGER NOT NULL DEFAULT 0
);", );
CREATE TABLE IF NOT EXISTS file_holders (
file_id BLOB NOT NULL,
peer_id BLOB NOT NULL,
peer_addresses TEXT NOT NULL DEFAULT '[]',
last_interaction_ms INTEGER NOT NULL,
direction TEXT NOT NULL,
PRIMARY KEY (file_id, peer_id)
);
CREATE INDEX IF NOT EXISTS idx_file_holders_recency
ON file_holders(file_id, last_interaction_ms DESC);",
)?; )?;
Ok(()) Ok(())
} }
@ -686,6 +716,11 @@ impl Storage {
)?; )?;
} }
// 0.6.1-beta: seed file_holders from legacy upstream/downstream tables
// before they're dropped. Idempotent — only fires on an empty
// file_holders table.
self.seed_file_holders_from_legacy()?;
Ok(()) Ok(())
} }
@ -4393,6 +4428,121 @@ impl Storage {
Ok(count as u32) Ok(count as u32)
} }
// --- File holders (flat, per-file, LRU-capped at 5) ---
//
// A single table for PostId-keyed engagement propagation and CID-keyed
// manifest/blob propagation. Any 32-byte content-addressed file_id fits.
/// Upsert a holder for a file. Bumps last_interaction_ms to now and
/// enforces an LRU cap of 5 holders per file.
pub fn touch_file_holder(
&self,
file_id: &[u8; 32],
peer_id: &NodeId,
peer_addresses: &[String],
direction: HolderDirection,
) -> anyhow::Result<()> {
let addrs_json = serde_json::to_string(peer_addresses)?;
let now = now_ms();
let new_dir = direction.as_str();
// Upsert. If the row exists with a different direction, promote to "both".
self.conn.execute(
"INSERT INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(file_id, peer_id) DO UPDATE SET
peer_addresses = CASE WHEN length(?3) > 2 THEN ?3 ELSE peer_addresses END,
last_interaction_ms = ?4,
direction = CASE WHEN direction = ?5 THEN direction ELSE 'both' END",
params![file_id.as_slice(), peer_id.as_slice(), addrs_json, now as i64, new_dir],
)?;
// Enforce LRU cap of 5. Oldest get dropped.
self.conn.execute(
"DELETE FROM file_holders
WHERE file_id = ?1
AND peer_id NOT IN (
SELECT peer_id FROM file_holders
WHERE file_id = ?1
ORDER BY last_interaction_ms DESC
LIMIT 5
)",
params![file_id.as_slice()],
)?;
Ok(())
}
/// Return the up-to-5 most recently interacted holders of a file.
pub fn get_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<Vec<(NodeId, Vec<String>)>> {
let mut stmt = self.conn.prepare(
"SELECT peer_id, peer_addresses FROM file_holders
WHERE file_id = ?1
ORDER BY last_interaction_ms DESC
LIMIT 5",
)?;
let rows = stmt.query_map(params![file_id.as_slice()], |row| {
let peer_bytes: Vec<u8> = row.get(0)?;
let addrs_json: String = row.get(1)?;
Ok((peer_bytes, addrs_json))
})?;
let mut out = Vec::new();
for row in rows {
let (peer_bytes, addrs_json) = row?;
if peer_bytes.len() != 32 { continue; }
let mut peer = [0u8; 32];
peer.copy_from_slice(&peer_bytes);
let addrs: Vec<String> = serde_json::from_str(&addrs_json).unwrap_or_default();
out.push((NodeId::from(peer), addrs));
}
Ok(out)
}
/// Remove all holders for a file (e.g. on post/blob deletion).
pub fn delete_file_holders(&self, file_id: &[u8; 32]) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM file_holders WHERE file_id = ?1",
params![file_id.as_slice()],
)?;
Ok(())
}
/// One-time migration: seed file_holders from the legacy upstream/downstream
/// tables so a user upgrading from pre-0.6.1 doesn't start with empty holder
/// sets. Idempotent — inserts use ON CONFLICT DO NOTHING semantics via the
/// PRIMARY KEY.
pub fn seed_file_holders_from_legacy(&self) -> anyhow::Result<()> {
// Skip if file_holders already populated (idempotent re-run protection).
let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM file_holders")?
.query_row([], |row| row.get(0))?;
if existing > 0 {
return Ok(());
}
let now = now_ms() as i64;
// post_upstream → holders we received engagement diffs from
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'received' FROM post_upstream",
params![now],
)?;
// post_downstream → holders we sent engagement diffs to
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT post_id, peer_node_id, '[]', ?1, 'sent' FROM post_downstream",
params![now],
)?;
// blob_upstream → peer we fetched the blob/manifest from
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, source_node_id, source_addresses, ?1, 'received' FROM blob_upstream",
params![now],
)?;
// blob_downstream → peers we served the blob/manifest to
self.conn.execute(
"INSERT OR IGNORE INTO file_holders (file_id, peer_id, peer_addresses, last_interaction_ms, direction)
SELECT cid, peer_node_id, peer_addresses, ?1, 'sent' FROM blob_downstream",
params![now],
)?;
Ok(())
}
// --- Engagement: reactions --- // --- Engagement: reactions ---
/// Store a reaction (upsert by reactor+post_id+emoji). /// Store a reaction (upsert by reactor+post_id+emoji).