From 975e7b9bfe8401d0a6f686b18939c15cb3ad32e7 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Tue, 21 Apr 2026 22:15:55 -0400 Subject: [PATCH] Phase 3 (0.6.2-beta): merged pull + recipient-match MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A non-follower can now receive DMs addressed to them via a normal pull cycle, with no distinguishable "searching for DMs" traffic pattern — the pull query is a uniform list of NodeIds that the server matches against both authors and wrapped-key recipients. Schema (migrations on first 0.6.2 launch): - New post_recipients(post_id, recipient) index table with index on recipient column - Seed migration scans existing encrypted posts, extracts recipients and group members from visibility JSON, populates the index Write path: - store_post_with_visibility / store_post_with_intent populate post_recipients on successful insert - update_post_visibility rebuilds the index for the updated post - apply_delete cascade-removes post_recipients entries Server pull handler (should_send_post): - Renamed semantic: requester_follows → query_list. Contains every NodeId the client wants posts for (follows + own NodeId). - Encrypted/GroupEncrypted posts match if ANY recipient / group member is in query_list (previously only if == requester). - Wire protocol unchanged — the same PullSyncRequestPayload.follows field now carries both follow targets and own NodeId indistinguishably. Client pull paths (all three call sites in network.rs + connection.rs): - Always append own NodeId to the query list before sending pull sync. Storage helper: - get_post_ids_for_recipients(nids) — bulk IN-match using the idx_post_recipients_recipient index, for future SQL-side filtering. Tests: - should_send_post's recipient tests updated to pass query_list containing requester (matches new contract). - Added encrypted_post_matches_via_query_list_even_for_third_party_recipient proving the server matches on any recipient in the list, not just the requester itself. All 111 core tests pass. Smoke-tested end-to-end: A posts encrypted DM to B; B connects + syncs; B decrypts and reads DM; both sides' post_recipients correctly populated on store. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/connection.rs | 35 ++++++++-- crates/core/src/network.rs | 64 ++++++++++++++---- crates/core/src/storage.rs | 123 +++++++++++++++++++++++++++++++++- 3 files changed, 201 insertions(+), 21 deletions(-) diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 7d97a7a..3ae2388 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1346,14 +1346,22 @@ impl ConnectionManager { conn: iroh::endpoint::Connection, storage: &Arc, peer_id: &NodeId, + our_node_id: NodeId, ) -> anyhow::Result { let (our_follows, follows_sync) = { let s = storage.get().await; (s.list_follows()?, s.get_follows_with_last_sync().unwrap_or_default()) }; + // Merged pull: include our own NodeId in the query so the peer returns + // posts where we're either a followed author OR a recipient (DM). + let mut query_list = our_follows; + if !query_list.contains(&our_node_id) { + query_list.push(our_node_id); + } + let request = PullSyncRequestPayload { - follows: our_follows, + follows: query_list, have_post_ids: vec![], since_ms: follows_sync, }; @@ -1903,9 +1911,15 @@ impl ConnectionManager { ) }; + // Merged pull: include our own NodeId in the query list. + let mut query_list = our_follows; + if !query_list.contains(&self.our_node_id) { + query_list.push(self.our_node_id); + } + let (mut send, mut recv) = pull_conn.open_bi().await?; let request = PullSyncRequestPayload { - follows: our_follows, + follows: query_list, have_post_ids: vec![], // v4: empty, using since_ms instead since_ms: follows_sync, }; @@ -1995,8 +2009,14 @@ impl ConnectionManager { ) }; + // Merged pull: include our own NodeId in the query list. + let mut query_list = our_follows; + if !query_list.contains(&self.our_node_id) { + query_list.push(self.our_node_id); + } + let request = PullSyncRequestPayload { - follows: our_follows, + follows: query_list, have_post_ids: vec![], // v4: empty, using since_ms instead since_ms: follows_sync, }; @@ -8295,15 +8315,16 @@ impl ConnectionActor { let _ = reply.send(r); } ConnCommand::PullFromPeer { peer, reply } => { - // Brief lock: grab connection clone + follows data + // Brief lock: grab connection clone + our_node_id let gather = { let cm = self.cm.lock().await; - cm.connections.get(&peer).map(|pc| pc.connection.clone()) + cm.connections.get(&peer) + .map(|pc| (pc.connection.clone(), cm.our_node_id)) }; let r = match gather { - Some(conn) => { + Some((conn, our_node_id)) => { // All I/O outside the lock, storage accessed via hoisted Arc - ConnectionManager::pull_from_peer_unlocked(conn, &self.storage, &peer).await + ConnectionManager::pull_from_peer_unlocked(conn, &self.storage, &peer, our_node_id).await } None => Err(anyhow::anyhow!("not connected to {}", hex::encode(peer))), }; diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 7b3a11a..1d16f4a 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -1719,12 +1719,17 @@ impl Network { storage.get_follows_with_last_sync().unwrap_or_default(), ) }; + // Merged pull: include our own NodeId so DMs addressed to us match. + let mut query_list = our_follows; + if !query_list.contains(&self.our_node_id) { + query_list.push(self.our_node_id); + } let (mut send, mut recv) = conn.open_bi().await?; write_typed_message( &mut send, MessageType::PullSyncRequest, &PullSyncRequestPayload { - follows: our_follows, + follows: query_list, have_post_ids: vec![], // v4: empty, using since_ms instead since_ms: follows_sync, }, @@ -2331,24 +2336,39 @@ pub struct PullStats { } /// Decide whether a post should be sent to a requesting peer. +/// Decide whether to send a post to a requesting peer based on their pull +/// query list. The query_list contains every NodeId the requester wants +/// posts for — their follows AND their own NodeId (added by the client so +/// DMs addressed to them match). A post is returned if: +/// - its author is in the query list (public-follow path), OR +/// - any recipient / group-member is in the query list (DM / group path). +/// +/// This uniform author-OR-recipient match means the wire-level query shape is +/// indistinguishable between "looking for follows" and "looking for DMs." pub fn should_send_post( post: &Post, visibility: &PostVisibility, requester: &NodeId, - requester_follows: &HashSet, + query_list: &HashSet, group_members: &std::collections::HashMap>, ) -> bool { if &post.author == requester { return true; } + if query_list.contains(&post.author) { + // Author-match path (public-follow or self-authored self-query) + if matches!(visibility, PostVisibility::Public) { + return true; + } + } match visibility { - PostVisibility::Public => requester_follows.contains(&post.author), + PostVisibility::Public => query_list.contains(&post.author), PostVisibility::Encrypted { recipients } => { - recipients.iter().any(|wk| &wk.recipient == requester) + recipients.iter().any(|wk| query_list.contains(&wk.recipient)) } PostVisibility::GroupEncrypted { group_id, .. } => { group_members.get(group_id) - .map(|members| members.contains(requester)) + .map(|members| members.iter().any(|m| query_list.contains(m))) .unwrap_or(false) } } @@ -2414,8 +2434,9 @@ mod tests { wrapped_cek: vec![0u8; 60], }], }; - let follows = HashSet::new(); - assert!(should_send_post(&post, &visibility, &requester, &follows, &empty_groups())); + // Client includes own NodeId in query_list — server matches on recipient. + let query_list: HashSet = [requester].into_iter().collect(); + assert!(should_send_post(&post, &visibility, &requester, &query_list, &empty_groups())); } #[test] @@ -2430,8 +2451,8 @@ mod tests { wrapped_cek: vec![0u8; 60], }], }; - let follows = HashSet::new(); - assert!(!should_send_post(&post, &visibility, &requester, &follows, &empty_groups())); + let query_list: HashSet = [requester].into_iter().collect(); + assert!(!should_send_post(&post, &visibility, &requester, &query_list, &empty_groups())); } #[test] @@ -2445,10 +2466,10 @@ mod tests { epoch: 1, wrapped_cek: vec![0u8; 60], }; - let follows = HashSet::new(); + let query_list: HashSet = [requester].into_iter().collect(); let mut groups = HashMap::new(); groups.insert(group_id, [requester].into_iter().collect()); - assert!(should_send_post(&post, &visibility, &requester, &follows, &groups)); + assert!(should_send_post(&post, &visibility, &requester, &query_list, &groups)); } #[test] @@ -2463,9 +2484,26 @@ mod tests { epoch: 1, wrapped_cek: vec![0u8; 60], }; - let follows = HashSet::new(); + let query_list: HashSet = [requester].into_iter().collect(); let mut groups = HashMap::new(); groups.insert(group_id, [other].into_iter().collect()); - assert!(!should_send_post(&post, &visibility, &requester, &follows, &groups)); + assert!(!should_send_post(&post, &visibility, &requester, &query_list, &groups)); + } + + #[test] + fn encrypted_post_matches_via_query_list_even_for_third_party_recipient() { + // Scenario: A queries for DMs to B (A includes B in query_list). + // Post encrypted to B should match via recipient-in-query-list + // (even though A can't decrypt it, the pull itself still matches). + // This isn't a real client flow but verifies the uniform matching. + let author = make_node_id(1); + let requester = make_node_id(10); + let b = make_node_id(2); + let post = make_post(author); + let visibility = PostVisibility::Encrypted { + recipients: vec![WrappedKey { recipient: b, wrapped_cek: vec![0u8; 60] }], + }; + let query_list: HashSet = [b].into_iter().collect(); + assert!(should_send_post(&post, &visibility, &requester, &query_list, &empty_groups())); } } diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 31434bb..9c064a8 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -394,7 +394,14 @@ impl Storage { PRIMARY KEY (file_id, peer_id) ); CREATE INDEX IF NOT EXISTS idx_file_holders_recency - ON file_holders(file_id, last_interaction_ms DESC);", + ON file_holders(file_id, last_interaction_ms DESC); + CREATE TABLE IF NOT EXISTS post_recipients ( + post_id BLOB NOT NULL, + recipient BLOB NOT NULL, + PRIMARY KEY (post_id, recipient) + ); + CREATE INDEX IF NOT EXISTS idx_post_recipients_recipient + ON post_recipients(recipient);", )?; Ok(()) } @@ -674,6 +681,9 @@ impl Storage { DROP TABLE IF EXISTS post_downstream;", )?; + // 0.6.2-beta: seed post_recipients index from existing encrypted posts. + self.seed_post_recipients_from_posts()?; + Ok(()) } @@ -704,6 +714,9 @@ impl Storage { visibility_json, ], )?; + if inserted > 0 { + self.index_post_recipients(id, visibility)?; + } Ok(inserted > 0) } @@ -2385,6 +2398,7 @@ impl Storage { )?; if deleted > 0 { self.conn.execute("DELETE FROM file_holders WHERE file_id = ?1", params![record.post_id.as_slice()])?; + self.conn.execute("DELETE FROM post_recipients WHERE post_id = ?1", params![record.post_id.as_slice()])?; self.conn.execute("DELETE FROM seen_engagement WHERE post_id = ?1", params![record.post_id.as_slice()])?; } Ok(deleted > 0) @@ -2431,6 +2445,14 @@ impl Storage { "UPDATE posts SET visibility = ?1 WHERE id = ?2", params![vis_json, post_id.as_slice()], )?; + if updated > 0 { + // Rebuild recipient index from new visibility + self.conn.execute( + "DELETE FROM post_recipients WHERE post_id = ?1", + params![post_id.as_slice()], + )?; + self.index_post_recipients(post_id, new_visibility)?; + } Ok(updated > 0) } @@ -2459,6 +2481,9 @@ impl Storage { intent_json, ], )?; + if inserted > 0 { + self.index_post_recipients(id, visibility)?; + } Ok(inserted > 0) } @@ -4116,6 +4141,102 @@ impl Storage { Ok(result) } + // --- Post recipients index (for merged-pull recipient-match) --- + + /// Insert all recipient NodeIds for an encrypted post into post_recipients. + /// No-op for Public visibility. Called on post insert / visibility update. + fn index_post_recipients( + &self, + post_id: &PostId, + visibility: &PostVisibility, + ) -> anyhow::Result<()> { + match visibility { + PostVisibility::Public => Ok(()), + PostVisibility::Encrypted { recipients } => { + for wk in recipients { + self.conn.execute( + "INSERT OR IGNORE INTO post_recipients (post_id, recipient) VALUES (?1, ?2)", + params![post_id.as_slice(), wk.recipient.as_slice()], + )?; + } + Ok(()) + } + PostVisibility::GroupEncrypted { group_id, .. } => { + // For group-encrypted posts, index the group's members. + let members = self.get_all_group_members() + .ok() + .and_then(|m| m.get(group_id).cloned()) + .unwrap_or_default(); + for member in members { + self.conn.execute( + "INSERT OR IGNORE INTO post_recipients (post_id, recipient) VALUES (?1, ?2)", + params![post_id.as_slice(), member.as_slice()], + )?; + } + Ok(()) + } + } + } + + /// Return all post IDs for which any of the given NodeIds is a recipient. + /// Uses the idx_post_recipients_recipient index. + pub fn get_post_ids_for_recipients( + &self, + recipients: &[NodeId], + ) -> anyhow::Result> { + if recipients.is_empty() { + return Ok(Vec::new()); + } + let placeholders: Vec<&str> = (0..recipients.len()).map(|_| "?").collect(); + let sql = format!( + "SELECT DISTINCT post_id FROM post_recipients WHERE recipient IN ({})", + placeholders.join(",") + ); + let mut stmt = self.conn.prepare(&sql)?; + let params = rusqlite::params_from_iter(recipients.iter().map(|r| r.to_vec())); + let rows = stmt.query_map(params, |row| { + let bytes: Vec = row.get(0)?; + Ok(bytes) + })?; + let mut out = Vec::new(); + for row in rows { + if let Ok(pid) = <[u8; 32]>::try_from(row?.as_slice()) { + out.push(pid); + } + } + Ok(out) + } + + /// Seed the post_recipients index from existing encrypted posts. + /// One-time idempotent migration for users upgrading from pre-0.6.2. + pub fn seed_post_recipients_from_posts(&self) -> anyhow::Result<()> { + let existing: i64 = self.conn.prepare("SELECT COUNT(*) FROM post_recipients")? + .query_row([], |row| row.get(0))?; + if existing > 0 { + return Ok(()); + } + // Scan all posts, parse visibility, index recipients. + let mut stmt = self.conn.prepare("SELECT id, visibility FROM posts")?; + let rows = stmt.query_map([], |row| { + let id_bytes: Vec = row.get(0)?; + let vis_json: String = row.get(1)?; + Ok((id_bytes, vis_json)) + })?; + let entries: Vec<([u8; 32], PostVisibility)> = rows + .filter_map(|r| r.ok()) + .filter_map(|(id_bytes, vis_json)| { + let pid = <[u8; 32]>::try_from(id_bytes.as_slice()).ok()?; + let vis: PostVisibility = serde_json::from_str(&vis_json).ok()?; + Some((pid, vis)) + }) + .collect(); + drop(stmt); + for (pid, vis) in entries { + self.index_post_recipients(&pid, &vis)?; + } + Ok(()) + } + // --- File holders (flat, per-file, LRU-capped at 5) --- // // A single table for PostId-keyed engagement propagation and CID-keyed