From 583033e065d54f94a574e142114b0355ed507e33 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Thu, 14 May 2026 14:51:02 -0400 Subject: [PATCH] feat(fof-layer2): persist FoF fields + revocation table Storage now persists the FoF Layer 2 wire fields across restarts: posts.fof_gating_json - JSON-serialized FoFCommentGating, added via migration to existing DBs. get_post / store_post / store_post_with_visibility / store_post_with_intent all roundtrip the field. comments.{pub_x_index, group_sig, encrypted_payload} - Added via migration. store_comment writes them; get_comments reads them back. Old non-FoF comments deserialize to None for all three. fof_revocations (new table) - (post_id, revoked_pub_x, revoked_at_ms, reason_code, author_sig) - Local live state. Distinct from the post's snapshot fof_gating.revocation_list (rarely populated on publish). New storage methods: - add_fof_revocation(...): idempotent insert. - is_fof_pub_x_revoked(post_id, pub_x): cheap COUNT for CDN-verify. - list_fof_revocations(post_id): batch read. - delete_fof_comments_by_pub_x_index(post_id, pub_x_index): cascade delete for the receive-side revocation handler (next slice). CDN four-check gate in connection.rs now consults BOTH the post's snapshot revocation_list and the live fof_revocations table. 139 tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 15 +-- crates/core/src/storage.rs | 196 +++++++++++++++++++++++++++++++--- 2 files changed, 189 insertions(+), 22 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 2c27307..926adcd 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -6197,12 +6197,12 @@ impl ConnectionManager { if !crate::fof::verify_fof_group_sig(comment, gating) { continue; } - // Revocation check (step 4). The - // revocation_list on the post's stored - // copy is the on-publish snapshot; - // revocation diffs that arrive later - // are applied against the local - // BlobHeader copy (separate slice). + // Revocation check (step 4). Two + // sources: the post's snapshot + // revocation_list (rarely populated on + // publish) and the live local table + // fof_revocations (accumulated as + // revocation diffs arrive on the wire). if let Some(idx) = comment.pub_x_index { let pub_x = gating.pub_post_set .get(idx as usize) @@ -6212,6 +6212,9 @@ impl ConnectionManager { .any(|r| r.revoked_pub_x == pub_x) { continue; } + if storage.is_fof_pub_x_revoked(&payload.post_id, &pub_x).unwrap_or(false) { + continue; + } } } } diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index b9452ea..95d0eea 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -361,6 +361,11 @@ impl Storage { timestamp_ms INTEGER NOT NULL, signature BLOB NOT NULL, ref_post_id BLOB, + -- FoF Layer 2: optional comment fields (NULL on non-FoF) + pub_x_index INTEGER, + group_sig BLOB, + encrypted_payload BLOB, + deleted_at INTEGER, PRIMARY KEY (author, post_id, timestamp_ms) ); CREATE INDEX IF NOT EXISTS idx_comments_post ON comments(post_id); @@ -469,7 +474,23 @@ impl Storage { granted_at_ms INTEGER NOT NULL, current INTEGER NOT NULL DEFAULT 1, PRIMARY KEY (voucher_persona_id, target_persona_id) - );", + ); + -- FoF Layer 2: per-post revocations applied locally as + -- revocation diffs propagate through the CDN. The post's + -- own fof_gating.revocation_list is the t=0 snapshot + -- (usually empty); this table is the live accumulated + -- state. CDN-verify rejects any comment whose + -- pub_post_set[pub_x_index] appears here for this post. + CREATE TABLE IF NOT EXISTS fof_revocations ( + post_id BLOB NOT NULL, + revoked_pub_x BLOB NOT NULL, + revoked_at_ms INTEGER NOT NULL, + reason_code INTEGER NOT NULL DEFAULT 0, + author_sig BLOB NOT NULL, + PRIMARY KEY (post_id, revoked_pub_x) + ); + CREATE INDEX IF NOT EXISTS idx_fof_revocations_post + ON fof_revocations(post_id);", )?; Ok(()) } @@ -843,6 +864,32 @@ impl Storage { // 0.6.2-beta: seed post_recipients index from existing encrypted posts. self.seed_post_recipients_from_posts()?; + // FoF Layer 2: add comment columns for pub_x_index / group_sig / + // encrypted_payload. Old DBs have NULL → deserializes to None. + let has_comment_pub_x = self.conn.prepare( + "SELECT COUNT(*) FROM pragma_table_info('comments') WHERE name='pub_x_index'" + )?.query_row([], |row| row.get::<_, i64>(0))?; + if has_comment_pub_x == 0 { + self.conn.execute_batch( + "ALTER TABLE comments ADD COLUMN pub_x_index INTEGER; + ALTER TABLE comments ADD COLUMN group_sig BLOB; + ALTER TABLE comments ADD COLUMN encrypted_payload BLOB;" + )?; + } + + // FoF Layer 2: post.fof_gating is serialized as JSON in a new + // column so we can rehydrate the gating block on receive paths + // (CDN verify needs pub_post_set / revocation_list). Stored + // alongside the existing post fields. + let has_post_fof = self.conn.prepare( + "SELECT COUNT(*) FROM pragma_table_info('posts') WHERE name='fof_gating_json'" + )?.query_row([], |row| row.get::<_, i64>(0))?; + if has_post_fof == 0 { + self.conn.execute_batch( + "ALTER TABLE posts ADD COLUMN fof_gating_json TEXT;" + )?; + } + Ok(()) } @@ -862,8 +909,13 @@ impl Storage { ) -> anyhow::Result { let attachments_json = serde_json::to_string(&post.attachments)?; let visibility_json = serde_json::to_string(visibility)?; + let fof_json = match &post.fof_gating { + Some(g) => Some(serde_json::to_string(g)?), + None => None, + }; let inserted = self.conn.execute( - "INSERT OR IGNORE INTO posts (id, author, content, attachments, timestamp_ms, visibility) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + "INSERT OR IGNORE INTO posts (id, author, content, attachments, timestamp_ms, visibility, fof_gating_json) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", params![ id.as_slice(), post.author.as_slice(), @@ -871,6 +923,7 @@ impl Storage { attachments_json, post.timestamp_ms as i64, visibility_json, + fof_json, ], )?; if inserted > 0 { @@ -880,20 +933,24 @@ impl Storage { } pub fn get_post(&self, id: &PostId) -> anyhow::Result> { - let mut stmt = self - .conn - .prepare("SELECT author, content, attachments, timestamp_ms FROM posts WHERE id = ?1")?; + let mut stmt = self.conn.prepare( + "SELECT author, content, attachments, timestamp_ms, fof_gating_json + FROM posts WHERE id = ?1", + )?; let mut rows = stmt.query(params![id.as_slice()])?; if let Some(row) = rows.next()? { let attachments: Vec = serde_json::from_str( &row.get::<_, String>(2)? ).unwrap_or_default(); + let fof_json: Option = row.get(4)?; + let fof_gating = fof_json + .and_then(|s| serde_json::from_str::(&s).ok()); Ok(Some(Post { author: blob_to_nodeid(row.get(0)?)?, content: row.get(1)?, attachments, timestamp_ms: row.get::<_, i64>(3)? as u64, - fof_gating: None, + fof_gating, })) } else { Ok(None) @@ -2845,8 +2902,14 @@ impl Storage { let attachments_json = serde_json::to_string(&post.attachments)?; let visibility_json = serde_json::to_string(visibility)?; let intent_json = serde_json::to_string(intent)?; + let fof_json = match &post.fof_gating { + Some(g) => Some(serde_json::to_string(g)?), + None => None, + }; let inserted = self.conn.execute( - "INSERT OR IGNORE INTO posts (id, author, content, attachments, timestamp_ms, visibility, visibility_intent) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + "INSERT OR IGNORE INTO posts + (id, author, content, attachments, timestamp_ms, visibility, visibility_intent, fof_gating_json) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", params![ id.as_slice(), post.author.as_slice(), @@ -2855,6 +2918,7 @@ impl Storage { post.timestamp_ms as i64, visibility_json, intent_json, + fof_json, ], )?; if inserted > 0 { @@ -4960,6 +5024,94 @@ impl Storage { Ok(()) } + /// FoF Layer 2: record a post-level revocation locally. Idempotent + /// on `(post_id, revoked_pub_x)`. Subsequent incoming comments + /// where `pub_post_set[pub_x_index] == revoked_pub_x` are rejected + /// at the CDN verify step (see `is_fof_pub_x_revoked`). + /// + /// Retroactive delete of already-stored comments under the revoked + /// pub_x is handled by `delete_fof_comments_by_pub_x`, which the + /// receive path calls after applying the revocation (it has the + /// post + pub_post_set context required to resolve pub_x_index → + /// pub_x bytes). + pub fn add_fof_revocation( + &self, + post_id: &PostId, + revoked_pub_x: &[u8; 32], + revoked_at_ms: u64, + reason_code: u8, + author_sig: &[u8], + ) -> anyhow::Result<()> { + self.conn.execute( + "INSERT OR IGNORE INTO fof_revocations + (post_id, revoked_pub_x, revoked_at_ms, reason_code, author_sig) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![ + post_id.as_slice(), + revoked_pub_x.as_slice(), + revoked_at_ms as i64, + reason_code as i64, + author_sig, + ], + )?; + Ok(()) + } + + /// FoF Layer 2: delete locally-stored comments on a post whose + /// `pub_x_index` matches the given index. Returns the number of + /// rows deleted. Called by the receive path after applying a + /// revocation (the index → pub_x_bytes resolution happens in the + /// caller via `pub_post_set`). + pub fn delete_fof_comments_by_pub_x_index( + &self, + post_id: &PostId, + pub_x_index: u32, + ) -> anyhow::Result { + let n = self.conn.execute( + "DELETE FROM comments + WHERE post_id = ?1 AND pub_x_index = ?2", + params![post_id.as_slice(), pub_x_index as i64], + )?; + Ok(n) + } + + /// FoF Layer 2: is the given pub_x revoked for this post? + pub fn is_fof_pub_x_revoked( + &self, + post_id: &PostId, + pub_x: &[u8; 32], + ) -> anyhow::Result { + let n: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM fof_revocations + WHERE post_id = ?1 AND revoked_pub_x = ?2", + )?.query_row( + params![post_id.as_slice(), pub_x.as_slice()], + |row| row.get(0), + )?; + Ok(n > 0) + } + + /// FoF Layer 2: list all revoked pub_x's for a post (for the CDN + /// four-check; cheaper than a per-comment query when verifying many + /// comments in a row). + pub fn list_fof_revocations(&self, post_id: &PostId) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT revoked_pub_x FROM fof_revocations WHERE post_id = ?1", + )?; + let rows = stmt.query_map(params![post_id.as_slice()], |row| { + let b: Vec = row.get(0)?; + Ok(b) + })?; + let mut out = Vec::new(); + for r in rows { + let b = r?; + let arr: [u8; 32] = b.as_slice().try_into() + .map_err(|_| anyhow::anyhow!("invalid revoked_pub_x in storage"))?; + out.push(arr); + } + Ok(out) + } + /// Increment and return the next bio-publish epoch for a persona. /// Counter is monotonic; used by receivers' scan cache to short-circuit /// re-scanning unchanged bios. Stored in `settings` keyed by persona. @@ -5240,12 +5392,17 @@ impl Storage { /// deleted_at tombstone, store it so the tombstone propagates. pub fn store_comment(&self, comment: &InlineComment) -> anyhow::Result<()> { self.conn.execute( - "INSERT INTO comments (author, post_id, content, timestamp_ms, signature, deleted_at, ref_post_id) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) + "INSERT INTO comments + (author, post_id, content, timestamp_ms, signature, deleted_at, + ref_post_id, pub_x_index, group_sig, encrypted_payload) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) ON CONFLICT(author, post_id, timestamp_ms) DO UPDATE SET content = CASE WHEN excluded.deleted_at IS NOT NULL THEN content ELSE excluded.content END, deleted_at = CASE WHEN excluded.deleted_at IS NOT NULL THEN excluded.deleted_at ELSE deleted_at END, - ref_post_id = COALESCE(excluded.ref_post_id, ref_post_id)", + ref_post_id = COALESCE(excluded.ref_post_id, ref_post_id), + pub_x_index = COALESCE(excluded.pub_x_index, pub_x_index), + group_sig = COALESCE(excluded.group_sig, group_sig), + encrypted_payload = COALESCE(excluded.encrypted_payload, encrypted_payload)", params![ comment.author.as_slice(), comment.post_id.as_slice(), @@ -5254,6 +5411,9 @@ impl Storage { comment.signature, comment.deleted_at.map(|v| v as i64), comment.ref_post_id.as_ref().map(|r| r.as_slice()), + comment.pub_x_index.map(|i| i as i64), + comment.group_sig.as_ref().map(|b| b.as_slice()), + comment.encrypted_payload.as_ref().map(|b| b.as_slice()), ], )?; Ok(()) @@ -5280,7 +5440,8 @@ impl Storage { /// Get live (non-tombstoned) comments for a post. Used for UI display. pub fn get_comments(&self, post_id: &PostId) -> anyhow::Result> { let mut stmt = self.conn.prepare( - "SELECT author, post_id, content, timestamp_ms, signature, ref_post_id + "SELECT author, post_id, content, timestamp_ms, signature, ref_post_id, + pub_x_index, group_sig, encrypted_payload FROM comments WHERE post_id = ?1 AND deleted_at IS NULL ORDER BY timestamp_ms ASC" )?; let rows = stmt.query_map(params![post_id.as_slice()], |row| { @@ -5290,11 +5451,14 @@ impl Storage { let ts: i64 = row.get(3)?; let sig: Vec = row.get(4)?; let ref_post: Option> = row.get(5)?; - Ok((author, pid, content, ts, sig, ref_post)) + let pxi: Option = row.get(6)?; + let gsig: Option> = row.get(7)?; + let epl: Option> = row.get(8)?; + Ok((author, pid, content, ts, sig, ref_post, pxi, gsig, epl)) })?; let mut result = Vec::new(); for row in rows { - let (author_bytes, pid_bytes, content, ts, sig, ref_post) = row?; + let (author_bytes, pid_bytes, content, ts, sig, ref_post, pxi, gsig, epl) = row?; let author = blob_to_nodeid(author_bytes)?; let post_id = blob_to_postid(pid_bytes)?; let ref_post_id = match ref_post { @@ -5309,9 +5473,9 @@ impl Storage { signature: sig, deleted_at: None, ref_post_id, - pub_x_index: None, - group_sig: None, - encrypted_payload: None, + pub_x_index: pxi.map(|v| v as u32), + group_sig: gsig, + encrypted_payload: epl, }); } Ok(result)