feat(fof-layer2): persist FoF fields + revocation table

Storage now persists the FoF Layer 2 wire fields across restarts:

posts.fof_gating_json
- JSON-serialized FoFCommentGating, added via migration to existing
  DBs. get_post / store_post / store_post_with_visibility /
  store_post_with_intent all roundtrip the field.

comments.{pub_x_index, group_sig, encrypted_payload}
- Added via migration. store_comment writes them; get_comments reads
  them back. Old non-FoF comments deserialize to None for all three.

fof_revocations (new table)
- (post_id, revoked_pub_x, revoked_at_ms, reason_code, author_sig)
- Local live state. Distinct from the post's snapshot
  fof_gating.revocation_list (rarely populated on publish).

New storage methods:
- add_fof_revocation(...): idempotent insert.
- is_fof_pub_x_revoked(post_id, pub_x): cheap COUNT for CDN-verify.
- list_fof_revocations(post_id): batch read.
- delete_fof_comments_by_pub_x_index(post_id, pub_x_index): cascade
  delete for the receive-side revocation handler (next slice).

CDN four-check gate in connection.rs now consults BOTH the post's
snapshot revocation_list and the live fof_revocations table.

139 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-05-14 14:51:02 -04:00
parent 63ff5ad6eb
commit 583033e065
2 changed files with 189 additions and 22 deletions

View file

@ -6197,12 +6197,12 @@ impl ConnectionManager {
if !crate::fof::verify_fof_group_sig(comment, gating) { if !crate::fof::verify_fof_group_sig(comment, gating) {
continue; continue;
} }
// Revocation check (step 4). The // Revocation check (step 4). Two
// revocation_list on the post's stored // sources: the post's snapshot
// copy is the on-publish snapshot; // revocation_list (rarely populated on
// revocation diffs that arrive later // publish) and the live local table
// are applied against the local // fof_revocations (accumulated as
// BlobHeader copy (separate slice). // revocation diffs arrive on the wire).
if let Some(idx) = comment.pub_x_index { if let Some(idx) = comment.pub_x_index {
let pub_x = gating.pub_post_set let pub_x = gating.pub_post_set
.get(idx as usize) .get(idx as usize)
@ -6212,6 +6212,9 @@ impl ConnectionManager {
.any(|r| r.revoked_pub_x == pub_x) { .any(|r| r.revoked_pub_x == pub_x) {
continue; continue;
} }
if storage.is_fof_pub_x_revoked(&payload.post_id, &pub_x).unwrap_or(false) {
continue;
}
} }
} }
} }

View file

@ -361,6 +361,11 @@ impl Storage {
timestamp_ms INTEGER NOT NULL, timestamp_ms INTEGER NOT NULL,
signature BLOB NOT NULL, signature BLOB NOT NULL,
ref_post_id BLOB, ref_post_id BLOB,
-- FoF Layer 2: optional comment fields (NULL on non-FoF)
pub_x_index INTEGER,
group_sig BLOB,
encrypted_payload BLOB,
deleted_at INTEGER,
PRIMARY KEY (author, post_id, timestamp_ms) PRIMARY KEY (author, post_id, timestamp_ms)
); );
CREATE INDEX IF NOT EXISTS idx_comments_post ON comments(post_id); CREATE INDEX IF NOT EXISTS idx_comments_post ON comments(post_id);
@ -469,7 +474,23 @@ impl Storage {
granted_at_ms INTEGER NOT NULL, granted_at_ms INTEGER NOT NULL,
current INTEGER NOT NULL DEFAULT 1, current INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (voucher_persona_id, target_persona_id) PRIMARY KEY (voucher_persona_id, target_persona_id)
);", );
-- FoF Layer 2: per-post revocations applied locally as
-- revocation diffs propagate through the CDN. The post's
-- own fof_gating.revocation_list is the t=0 snapshot
-- (usually empty); this table is the live accumulated
-- state. CDN-verify rejects any comment whose
-- pub_post_set[pub_x_index] appears here for this post.
CREATE TABLE IF NOT EXISTS fof_revocations (
post_id BLOB NOT NULL,
revoked_pub_x BLOB NOT NULL,
revoked_at_ms INTEGER NOT NULL,
reason_code INTEGER NOT NULL DEFAULT 0,
author_sig BLOB NOT NULL,
PRIMARY KEY (post_id, revoked_pub_x)
);
CREATE INDEX IF NOT EXISTS idx_fof_revocations_post
ON fof_revocations(post_id);",
)?; )?;
Ok(()) Ok(())
} }
@ -843,6 +864,32 @@ impl Storage {
// 0.6.2-beta: seed post_recipients index from existing encrypted posts. // 0.6.2-beta: seed post_recipients index from existing encrypted posts.
self.seed_post_recipients_from_posts()?; self.seed_post_recipients_from_posts()?;
// FoF Layer 2: add comment columns for pub_x_index / group_sig /
// encrypted_payload. Old DBs have NULL → deserializes to None.
let has_comment_pub_x = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('comments') WHERE name='pub_x_index'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_comment_pub_x == 0 {
self.conn.execute_batch(
"ALTER TABLE comments ADD COLUMN pub_x_index INTEGER;
ALTER TABLE comments ADD COLUMN group_sig BLOB;
ALTER TABLE comments ADD COLUMN encrypted_payload BLOB;"
)?;
}
// FoF Layer 2: post.fof_gating is serialized as JSON in a new
// column so we can rehydrate the gating block on receive paths
// (CDN verify needs pub_post_set / revocation_list). Stored
// alongside the existing post fields.
let has_post_fof = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('posts') WHERE name='fof_gating_json'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_post_fof == 0 {
self.conn.execute_batch(
"ALTER TABLE posts ADD COLUMN fof_gating_json TEXT;"
)?;
}
Ok(()) Ok(())
} }
@ -862,8 +909,13 @@ impl Storage {
) -> anyhow::Result<bool> { ) -> anyhow::Result<bool> {
let attachments_json = serde_json::to_string(&post.attachments)?; let attachments_json = serde_json::to_string(&post.attachments)?;
let visibility_json = serde_json::to_string(visibility)?; let visibility_json = serde_json::to_string(visibility)?;
let fof_json = match &post.fof_gating {
Some(g) => Some(serde_json::to_string(g)?),
None => None,
};
let inserted = self.conn.execute( let inserted = self.conn.execute(
"INSERT OR IGNORE INTO posts (id, author, content, attachments, timestamp_ms, visibility) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", "INSERT OR IGNORE INTO posts (id, author, content, attachments, timestamp_ms, visibility, fof_gating_json)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
params![ params![
id.as_slice(), id.as_slice(),
post.author.as_slice(), post.author.as_slice(),
@ -871,6 +923,7 @@ impl Storage {
attachments_json, attachments_json,
post.timestamp_ms as i64, post.timestamp_ms as i64,
visibility_json, visibility_json,
fof_json,
], ],
)?; )?;
if inserted > 0 { if inserted > 0 {
@ -880,20 +933,24 @@ impl Storage {
} }
pub fn get_post(&self, id: &PostId) -> anyhow::Result<Option<Post>> { pub fn get_post(&self, id: &PostId) -> anyhow::Result<Option<Post>> {
let mut stmt = self let mut stmt = self.conn.prepare(
.conn "SELECT author, content, attachments, timestamp_ms, fof_gating_json
.prepare("SELECT author, content, attachments, timestamp_ms FROM posts WHERE id = ?1")?; FROM posts WHERE id = ?1",
)?;
let mut rows = stmt.query(params![id.as_slice()])?; let mut rows = stmt.query(params![id.as_slice()])?;
if let Some(row) = rows.next()? { if let Some(row) = rows.next()? {
let attachments: Vec<Attachment> = serde_json::from_str( let attachments: Vec<Attachment> = serde_json::from_str(
&row.get::<_, String>(2)? &row.get::<_, String>(2)?
).unwrap_or_default(); ).unwrap_or_default();
let fof_json: Option<String> = row.get(4)?;
let fof_gating = fof_json
.and_then(|s| serde_json::from_str::<crate::types::FoFCommentGating>(&s).ok());
Ok(Some(Post { Ok(Some(Post {
author: blob_to_nodeid(row.get(0)?)?, author: blob_to_nodeid(row.get(0)?)?,
content: row.get(1)?, content: row.get(1)?,
attachments, attachments,
timestamp_ms: row.get::<_, i64>(3)? as u64, timestamp_ms: row.get::<_, i64>(3)? as u64,
fof_gating: None, fof_gating,
})) }))
} else { } else {
Ok(None) Ok(None)
@ -2845,8 +2902,14 @@ impl Storage {
let attachments_json = serde_json::to_string(&post.attachments)?; let attachments_json = serde_json::to_string(&post.attachments)?;
let visibility_json = serde_json::to_string(visibility)?; let visibility_json = serde_json::to_string(visibility)?;
let intent_json = serde_json::to_string(intent)?; let intent_json = serde_json::to_string(intent)?;
let fof_json = match &post.fof_gating {
Some(g) => Some(serde_json::to_string(g)?),
None => None,
};
let inserted = self.conn.execute( let inserted = self.conn.execute(
"INSERT OR IGNORE INTO posts (id, author, content, attachments, timestamp_ms, visibility, visibility_intent) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", "INSERT OR IGNORE INTO posts
(id, author, content, attachments, timestamp_ms, visibility, visibility_intent, fof_gating_json)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![ params![
id.as_slice(), id.as_slice(),
post.author.as_slice(), post.author.as_slice(),
@ -2855,6 +2918,7 @@ impl Storage {
post.timestamp_ms as i64, post.timestamp_ms as i64,
visibility_json, visibility_json,
intent_json, intent_json,
fof_json,
], ],
)?; )?;
if inserted > 0 { if inserted > 0 {
@ -4960,6 +5024,94 @@ impl Storage {
Ok(()) Ok(())
} }
/// FoF Layer 2: record a post-level revocation locally. Idempotent
/// on `(post_id, revoked_pub_x)`. Subsequent incoming comments
/// where `pub_post_set[pub_x_index] == revoked_pub_x` are rejected
/// at the CDN verify step (see `is_fof_pub_x_revoked`).
///
/// Retroactive delete of already-stored comments under the revoked
/// pub_x is handled by `delete_fof_comments_by_pub_x`, which the
/// receive path calls after applying the revocation (it has the
/// post + pub_post_set context required to resolve pub_x_index →
/// pub_x bytes).
pub fn add_fof_revocation(
&self,
post_id: &PostId,
revoked_pub_x: &[u8; 32],
revoked_at_ms: u64,
reason_code: u8,
author_sig: &[u8],
) -> anyhow::Result<()> {
self.conn.execute(
"INSERT OR IGNORE INTO fof_revocations
(post_id, revoked_pub_x, revoked_at_ms, reason_code, author_sig)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
post_id.as_slice(),
revoked_pub_x.as_slice(),
revoked_at_ms as i64,
reason_code as i64,
author_sig,
],
)?;
Ok(())
}
/// FoF Layer 2: delete locally-stored comments on a post whose
/// `pub_x_index` matches the given index. Returns the number of
/// rows deleted. Called by the receive path after applying a
/// revocation (the index → pub_x_bytes resolution happens in the
/// caller via `pub_post_set`).
pub fn delete_fof_comments_by_pub_x_index(
&self,
post_id: &PostId,
pub_x_index: u32,
) -> anyhow::Result<usize> {
let n = self.conn.execute(
"DELETE FROM comments
WHERE post_id = ?1 AND pub_x_index = ?2",
params![post_id.as_slice(), pub_x_index as i64],
)?;
Ok(n)
}
/// FoF Layer 2: is the given pub_x revoked for this post?
pub fn is_fof_pub_x_revoked(
&self,
post_id: &PostId,
pub_x: &[u8; 32],
) -> anyhow::Result<bool> {
let n: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM fof_revocations
WHERE post_id = ?1 AND revoked_pub_x = ?2",
)?.query_row(
params![post_id.as_slice(), pub_x.as_slice()],
|row| row.get(0),
)?;
Ok(n > 0)
}
/// FoF Layer 2: list all revoked pub_x's for a post (for the CDN
/// four-check; cheaper than a per-comment query when verifying many
/// comments in a row).
pub fn list_fof_revocations(&self, post_id: &PostId) -> anyhow::Result<Vec<[u8; 32]>> {
let mut stmt = self.conn.prepare(
"SELECT revoked_pub_x FROM fof_revocations WHERE post_id = ?1",
)?;
let rows = stmt.query_map(params![post_id.as_slice()], |row| {
let b: Vec<u8> = row.get(0)?;
Ok(b)
})?;
let mut out = Vec::new();
for r in rows {
let b = r?;
let arr: [u8; 32] = b.as_slice().try_into()
.map_err(|_| anyhow::anyhow!("invalid revoked_pub_x in storage"))?;
out.push(arr);
}
Ok(out)
}
/// Increment and return the next bio-publish epoch for a persona. /// Increment and return the next bio-publish epoch for a persona.
/// Counter is monotonic; used by receivers' scan cache to short-circuit /// Counter is monotonic; used by receivers' scan cache to short-circuit
/// re-scanning unchanged bios. Stored in `settings` keyed by persona. /// re-scanning unchanged bios. Stored in `settings` keyed by persona.
@ -5240,12 +5392,17 @@ impl Storage {
/// deleted_at tombstone, store it so the tombstone propagates. /// deleted_at tombstone, store it so the tombstone propagates.
pub fn store_comment(&self, comment: &InlineComment) -> anyhow::Result<()> { pub fn store_comment(&self, comment: &InlineComment) -> anyhow::Result<()> {
self.conn.execute( self.conn.execute(
"INSERT INTO comments (author, post_id, content, timestamp_ms, signature, deleted_at, ref_post_id) "INSERT INTO comments
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) (author, post_id, content, timestamp_ms, signature, deleted_at,
ref_post_id, pub_x_index, group_sig, encrypted_payload)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
ON CONFLICT(author, post_id, timestamp_ms) DO UPDATE SET ON CONFLICT(author, post_id, timestamp_ms) DO UPDATE SET
content = CASE WHEN excluded.deleted_at IS NOT NULL THEN content ELSE excluded.content END, content = CASE WHEN excluded.deleted_at IS NOT NULL THEN content ELSE excluded.content END,
deleted_at = CASE WHEN excluded.deleted_at IS NOT NULL THEN excluded.deleted_at ELSE deleted_at END, deleted_at = CASE WHEN excluded.deleted_at IS NOT NULL THEN excluded.deleted_at ELSE deleted_at END,
ref_post_id = COALESCE(excluded.ref_post_id, ref_post_id)", ref_post_id = COALESCE(excluded.ref_post_id, ref_post_id),
pub_x_index = COALESCE(excluded.pub_x_index, pub_x_index),
group_sig = COALESCE(excluded.group_sig, group_sig),
encrypted_payload = COALESCE(excluded.encrypted_payload, encrypted_payload)",
params![ params![
comment.author.as_slice(), comment.author.as_slice(),
comment.post_id.as_slice(), comment.post_id.as_slice(),
@ -5254,6 +5411,9 @@ impl Storage {
comment.signature, comment.signature,
comment.deleted_at.map(|v| v as i64), comment.deleted_at.map(|v| v as i64),
comment.ref_post_id.as_ref().map(|r| r.as_slice()), comment.ref_post_id.as_ref().map(|r| r.as_slice()),
comment.pub_x_index.map(|i| i as i64),
comment.group_sig.as_ref().map(|b| b.as_slice()),
comment.encrypted_payload.as_ref().map(|b| b.as_slice()),
], ],
)?; )?;
Ok(()) Ok(())
@ -5280,7 +5440,8 @@ impl Storage {
/// Get live (non-tombstoned) comments for a post. Used for UI display. /// Get live (non-tombstoned) comments for a post. Used for UI display.
pub fn get_comments(&self, post_id: &PostId) -> anyhow::Result<Vec<InlineComment>> { pub fn get_comments(&self, post_id: &PostId) -> anyhow::Result<Vec<InlineComment>> {
let mut stmt = self.conn.prepare( let mut stmt = self.conn.prepare(
"SELECT author, post_id, content, timestamp_ms, signature, ref_post_id "SELECT author, post_id, content, timestamp_ms, signature, ref_post_id,
pub_x_index, group_sig, encrypted_payload
FROM comments WHERE post_id = ?1 AND deleted_at IS NULL ORDER BY timestamp_ms ASC" FROM comments WHERE post_id = ?1 AND deleted_at IS NULL ORDER BY timestamp_ms ASC"
)?; )?;
let rows = stmt.query_map(params![post_id.as_slice()], |row| { let rows = stmt.query_map(params![post_id.as_slice()], |row| {
@ -5290,11 +5451,14 @@ impl Storage {
let ts: i64 = row.get(3)?; let ts: i64 = row.get(3)?;
let sig: Vec<u8> = row.get(4)?; let sig: Vec<u8> = row.get(4)?;
let ref_post: Option<Vec<u8>> = row.get(5)?; let ref_post: Option<Vec<u8>> = row.get(5)?;
Ok((author, pid, content, ts, sig, ref_post)) let pxi: Option<i64> = row.get(6)?;
let gsig: Option<Vec<u8>> = row.get(7)?;
let epl: Option<Vec<u8>> = row.get(8)?;
Ok((author, pid, content, ts, sig, ref_post, pxi, gsig, epl))
})?; })?;
let mut result = Vec::new(); let mut result = Vec::new();
for row in rows { for row in rows {
let (author_bytes, pid_bytes, content, ts, sig, ref_post) = row?; let (author_bytes, pid_bytes, content, ts, sig, ref_post, pxi, gsig, epl) = row?;
let author = blob_to_nodeid(author_bytes)?; let author = blob_to_nodeid(author_bytes)?;
let post_id = blob_to_postid(pid_bytes)?; let post_id = blob_to_postid(pid_bytes)?;
let ref_post_id = match ref_post { let ref_post_id = match ref_post {
@ -5309,9 +5473,9 @@ impl Storage {
signature: sig, signature: sig,
deleted_at: None, deleted_at: None,
ref_post_id, ref_post_id,
pub_x_index: None, pub_x_index: pxi.map(|v| v as u32),
group_sig: None, group_sig: gsig,
encrypted_payload: None, encrypted_payload: epl,
}); });
} }
Ok(result) Ok(result)