Phase 3 (0.6.2-beta): merged pull + recipient-match

A non-follower can now receive DMs addressed to them via a normal pull
cycle, with no distinguishable "searching for DMs" traffic pattern —
the pull query is a uniform list of NodeIds that the server matches
against both authors and wrapped-key recipients.

Schema (migrations on first 0.6.2 launch):
- New post_recipients(post_id, recipient) index table with index on
  recipient column
- Seed migration scans existing encrypted posts, extracts recipients
  and group members from visibility JSON, populates the index

Write path:
- store_post_with_visibility / store_post_with_intent populate
  post_recipients on successful insert
- update_post_visibility rebuilds the index for the updated post
- apply_delete cascade-removes post_recipients entries

Server pull handler (should_send_post):
- Renamed semantic: requester_follows → query_list. Contains every
  NodeId the client wants posts for (follows + own NodeId).
- Encrypted/GroupEncrypted posts match if ANY recipient / group
  member is in query_list (previously only if == requester).
- Wire protocol unchanged — the same PullSyncRequestPayload.follows
  field now carries both follow targets and own NodeId indistinguishably.

Client pull paths (all three call sites in network.rs + connection.rs):
- Always append own NodeId to the query list before sending pull sync.

Storage helper:
- get_post_ids_for_recipients(nids) — bulk IN-match using the
  idx_post_recipients_recipient index, for future SQL-side filtering.

Tests:
- should_send_post's recipient tests updated to pass query_list
  containing requester (matches new contract).
- Added encrypted_post_matches_via_query_list_even_for_third_party_recipient
  proving the server matches on any recipient in the list, not just
  the requester itself.

All 111 core tests pass. Smoke-tested end-to-end: A posts encrypted
DM to B; B connects + syncs; B decrypts and reads DM; both sides'
post_recipients correctly populated on store.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-04-21 22:15:55 -04:00
parent 5d9ba22427
commit 975e7b9bfe
3 changed files with 201 additions and 21 deletions

View file

@ -394,7 +394,14 @@ impl Storage {
PRIMARY KEY (file_id, peer_id)
);
CREATE INDEX IF NOT EXISTS idx_file_holders_recency
ON file_holders(file_id, last_interaction_ms DESC);",
ON file_holders(file_id, last_interaction_ms DESC);
CREATE TABLE IF NOT EXISTS post_recipients (
post_id BLOB NOT NULL,
recipient BLOB NOT NULL,
PRIMARY KEY (post_id, recipient)
);
CREATE INDEX IF NOT EXISTS idx_post_recipients_recipient
ON post_recipients(recipient);",
)?;
Ok(())
}
@ -674,6 +681,9 @@ impl Storage {
DROP TABLE IF EXISTS post_downstream;",
)?;
// 0.6.2-beta: seed post_recipients index from existing encrypted posts.
self.seed_post_recipients_from_posts()?;
Ok(())
}
@ -704,6 +714,9 @@ impl Storage {
visibility_json,
],
)?;
if inserted > 0 {
self.index_post_recipients(id, visibility)?;
}
Ok(inserted > 0)
}
@ -2385,6 +2398,7 @@ impl Storage {
)?;
if deleted > 0 {
self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM post_recipients WHERE post_id = ?1", params![record.post_id.as_slice()])?;
self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?;
}
Ok(deleted > 0)
@ -2431,6 +2445,14 @@ impl Storage {
"UPDATE posts SET visibility = ?1 WHERE id = ?2",
params![vis_json, post_id.as_slice()],
)?;
if updated > 0 {
// Rebuild recipient index from new visibility
self.conn.execute(
"DELETE FROM post_recipients WHERE post_id = ?1",
params![post_id.as_slice()],
)?;
self.index_post_recipients(post_id, new_visibility)?;
}
Ok(updated > 0)
}
@ -2459,6 +2481,9 @@ impl Storage {
intent_json,
],
)?;
if inserted > 0 {
self.index_post_recipients(id, visibility)?;
}
Ok(inserted > 0)
}
@ -4116,6 +4141,102 @@ impl Storage {
Ok(result)
}
// --- Post recipients index (for merged-pull recipient-match) ---
/// Insert all recipient NodeIds for an encrypted post into post_recipients.
/// No-op for Public visibility. Called on post insert / visibility update.
fn index_post_recipients(
&self,
post_id: &PostId,
visibility: &PostVisibility,
) -> anyhow::Result<()> {
match visibility {
PostVisibility::Public => Ok(()),
PostVisibility::Encrypted { recipients } => {
for wk in recipients {
self.conn.execute(
"INSERT OR IGNORE INTO post_recipients (post_id, recipient) VALUES (?1, ?2)",
params![post_id.as_slice(), wk.recipient.as_slice()],
)?;
}
Ok(())
}
PostVisibility::GroupEncrypted { group_id, .. } => {
// For group-encrypted posts, index the group's members.
let members = self.get_all_group_members()
.ok()
.and_then(|m| m.get(group_id).cloned())
.unwrap_or_default();
for member in members {
self.conn.execute(
"INSERT OR IGNORE INTO post_recipients (post_id, recipient) VALUES (?1, ?2)",
params![post_id.as_slice(), member.as_slice()],
)?;
}
Ok(())
}
}
}
/// Return all post IDs for which any of the given NodeIds is a recipient.
/// Uses the idx_post_recipients_recipient index.
pub fn get_post_ids_for_recipients(
&self,
recipients: &[NodeId],
) -> anyhow::Result<Vec<PostId>> {
if recipients.is_empty() {
return Ok(Vec::new());
}
let placeholders: Vec<&str> = (0..recipients.len()).map(|_| "?").collect();
let sql = format!(
"SELECT DISTINCT post_id FROM post_recipients WHERE recipient IN ({})",
placeholders.join(",")
);
let mut stmt = self.conn.prepare(&sql)?;
let params = rusqlite::params_from_iter(recipients.iter().map(|r| r.to_vec()));
let rows = stmt.query_map(params, |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(bytes)
})?;
let mut out = Vec::new();
for row in rows {
if let Ok(pid) = <[u8; 32]>::try_from(row?.as_slice()) {
out.push(pid);
}
}
Ok(out)
}
/// Seed the post_recipients index from existing encrypted posts.
/// One-time idempotent migration for users upgrading from pre-0.6.2.
pub fn seed_post_recipients_from_posts(&self) -> anyhow::Result<()> {
let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM post_recipients")?
.query_row([], |row| row.get(0))?;
if existing > 0 {
return Ok(());
}
// Scan all posts, parse visibility, index recipients.
let mut stmt = self.conn.prepare("SELECT id, visibility FROM posts")?;
let rows = stmt.query_map([], |row| {
let id_bytes: Vec<u8> = row.get(0)?;
let vis_json: String = row.get(1)?;
Ok((id_bytes, vis_json))
})?;
let entries: Vec<([u8; 32], PostVisibility)> = rows
.filter_map(|r| r.ok())
.filter_map(|(id_bytes, vis_json)| {
let pid = <[u8; 32]>::try_from(id_bytes.as_slice()).ok()?;
let vis: PostVisibility = serde_json::from_str(&vis_json).ok()?;
Some((pid, vis))
})
.collect();
drop(stmt);
for (pid, vis) in entries {
self.index_post_recipients(&pid, &vis)?;
}
Ok(())
}
// --- File holders (flat, per-file, LRU-capped at 5) ---
//
// A single table for PostId-keyed engagement propagation and CID-keyed