Phase 2b: control-post flow (delete/visibility) + remove BlobDeleteNotice

Replaces two persona-signed direct pushes with CDN-propagated control posts:
a single `VisibilityIntent::Control` post type whose content is a signed
`ControlOp` the receiver verifies and applies. Deletes and visibility updates
now flow through the same neighbor-manifest CDN path as regular content — no
direct recipient push needed for persona-signed ops.

Core pieces:
- `VisibilityIntent::Control` + `VisibilityIntent::Profile` variants.
- `ControlOp::DeletePost` / `ControlOp::UpdateVisibility` (JSON, ed25519-signed
  by the target post's author over op-specific byte strings).
- `crypto::{sign,verify}_control_{delete,visibility}` signing primitives.
- `control::build_delete_control_post` + `build_visibility_control_post`
  for authors to construct control posts.
- `control::receive_post` — unified incoming-post path used by all 6 receive
  sites. Verifies control signatures BEFORE storing, so bogus controls never
  enter storage and can't be re-propagated via neighbor-manifest diffs.
- `control::apply_control_post_if_applicable` — executes the op under the
  same storage guard as the insert.

Feed filter:
- Feeds (`get_feed`, `get_feed_page`, `list_posts_page`,
  `list_posts_reverse_chron`) now exclude `Control` and `Profile` posts so
  they propagate + tombstone without surfacing.
- Sync/export path (`list_posts_with_visibility`) keeps its own unfiltered
  query so control posts still propagate via CDN.

Wire protocol:
- `SyncPost` carries `intent: Option<VisibilityIntent>` so control posts
  arrive with their intent preserved.
- `BlobDeleteNotice` (0x95) removed — orphan blobs on remote holders evict
  naturally via LRU rather than via a persona-signed push. Code path,
  payload, sender, tests, and `delete_blob_with_cdn_notify` all gone.

Tests: control delete roundtrip (apply + tombstone) and wrong-author
rejection (not stored, not applied). 112/112 core tests pass.
This commit is contained in:
Scott Reimers 2026-04-22 21:17:34 -04:00
parent 4da6a8dc85
commit 36b6a466d2
10 changed files with 585 additions and 196 deletions

View file

@ -1395,11 +1395,19 @@ impl ConnectionManager {
for sp in &response.posts {
if s.is_deleted(&sp.id)? { continue; }
if verify_post_id(&sp.id, &sp.post) {
if s.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility)? {
new_post_ids.push(sp.id);
posts_received += 1;
match crate::control::receive_post(&s, &sp.id, &sp.post, &sp.visibility, sp.intent.as_ref()) {
Ok(true) => {
new_post_ids.push(sp.id);
posts_received += 1;
synced_authors.insert(sp.post.author);
}
Ok(false) => {
synced_authors.insert(sp.post.author);
}
Err(e) => {
warn!(post_id = hex::encode(sp.id), error = %e, "rejecting post");
}
}
synced_authors.insert(sp.post.author);
}
}
}
@ -1961,11 +1969,17 @@ impl ConnectionManager {
let storage = self.storage.get().await;
for sp in &response.posts {
if verify_post_id(&sp.id, &sp.post) && !storage.is_deleted(&sp.id)? {
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
new_post_ids.push(sp.id);
synced_authors.insert(sp.post.author);
if sp.id == notification.post_id {
stored = true;
match crate::control::receive_post(&storage, &sp.id, &sp.post, &sp.visibility, sp.intent.as_ref()) {
Ok(_) => {
new_post_ids.push(sp.id);
synced_authors.insert(sp.post.author);
if sp.id == notification.post_id {
stored = true;
}
}
Err(e) => {
warn!(post_id = hex::encode(sp.id), error = %e, "rejecting post");
}
}
}
}
@ -2069,11 +2083,19 @@ impl ConnectionManager {
continue;
}
if verify_post_id(&sp.id, &sp.post) {
if storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility)? {
new_post_ids.push(sp.id);
posts_received += 1;
match crate::control::receive_post(&storage, &sp.id, &sp.post, &sp.visibility, sp.intent.as_ref()) {
Ok(true) => {
new_post_ids.push(sp.id);
posts_received += 1;
synced_authors.insert(sp.post.author);
}
Ok(false) => {
synced_authors.insert(sp.post.author);
}
Err(e) => {
warn!(post_id = hex::encode(sp.id), error = %e, "rejecting post");
}
}
synced_authors.insert(sp.post.author);
}
}
}
@ -2294,12 +2316,15 @@ impl ConnectionManager {
}
}
// Phase 3: Brief re-lock for is_deleted checks on filtered posts
// Phase 3: Brief re-lock for is_deleted checks + intent fetch on filtered posts
let (posts, vis_updates) = {
let s = storage.get().await;
let posts_to_send: Vec<SyncPost> = candidates_to_send.into_iter()
.filter(|(id, _, _)| !s.is_deleted(id).unwrap_or(false))
.map(|(id, post, visibility)| SyncPost { id, post, visibility })
.map(|(id, post, visibility)| {
let intent = s.get_post_intent(&id).ok().flatten();
SyncPost { id, post, visibility, intent }
})
.collect();
(posts_to_send, vis_updates_to_send)
};
@ -4943,24 +4968,11 @@ impl ConnectionManager {
}
}
// Gather connections for CDN delete notices under lock, then send outside
let mut delete_notices: Vec<(iroh::endpoint::Connection, crate::protocol::BlobDeleteNoticePayload)> = Vec::new();
for (cid, holders) in &blob_cleanup {
let payload = crate::protocol::BlobDeleteNoticePayload { cid: *cid, upstream_node: None };
for (peer, _addrs) in holders {
if let Some(pc) = cm.connections_ref().get(peer) {
delete_notices.push((pc.connection.clone(), payload.clone()));
}
}
}
drop(cm);
// Send outside lock
for (conn, payload) in &delete_notices {
if let Ok(mut send) = conn.open_uni().await {
let _ = write_typed_message(&mut send, MessageType::BlobDeleteNotice, payload).await;
let _ = send.finish();
}
}
// BlobDeleteNotice removed in v0.6.2: orphaned blobs on remote
// holders are evicted naturally via LRU rather than by a
// persona-signed push.
let _ = blob_cleanup;
}
MessageType::VisibilityUpdate => {
let payload: crate::protocol::VisibilityUpdatePayload =
@ -5014,22 +5026,30 @@ impl ConnectionManager {
&& storage.get_post(&push.post.id)?.is_none()
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
{
let _ = storage.store_post_with_visibility(
match crate::control::receive_post(
&storage,
&push.post.id,
&push.post.post,
&push.post.visibility,
);
let _ = storage.touch_file_holder(
&push.post.id,
&remote_node_id,
&[],
crate::storage::HolderDirection::Received,
);
info!(
peer = hex::encode(remote_node_id),
post_id = hex::encode(push.post.id),
"Received direct post push"
);
push.post.intent.as_ref(),
) {
Ok(_) => {
let _ = storage.touch_file_holder(
&push.post.id,
&remote_node_id,
&[],
crate::storage::HolderDirection::Received,
);
info!(
peer = hex::encode(remote_node_id),
post_id = hex::encode(push.post.id),
"Received direct post push"
);
}
Err(e) => {
warn!(post_id = hex::encode(push.post.id), error = %e, "rejecting pushed post");
}
}
}
}
}
@ -5237,7 +5257,14 @@ impl ConnectionManager {
let stored = {
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
let newly_stored = crate::control::receive_post(
&storage,
&sync_post.id,
&sync_post.post,
&sync_post.visibility,
sync_post.intent.as_ref(),
).unwrap_or(false);
if newly_stored {
let _ = storage.touch_file_holder(
&sync_post.id,
&sender_id,
@ -5327,24 +5354,6 @@ impl ConnectionManager {
"Received social disconnect notice"
);
}
MessageType::BlobDeleteNotice => {
let payload: crate::protocol::BlobDeleteNoticePayload =
read_payload(recv, MAX_PAYLOAD).await?;
let cm = conn_mgr.lock().await;
let storage = cm.storage.get().await;
let cid = payload.cid;
// Flat-holder model: drop the sender as a holder of this file.
// The author's DeleteRecord (separate signed message) is what
// triggers the actual blob removal for followers.
let _ = storage.remove_file_holder(&cid, &remote_node_id);
info!(
peer = hex::encode(remote_node_id),
cid = hex::encode(cid),
"Received blob delete notice"
);
}
MessageType::GroupKeyDistribute => {
let payload: GroupKeyDistributePayload = read_payload(recv, MAX_PAYLOAD).await?;
let cm = conn_mgr.lock().await;
@ -5675,11 +5684,13 @@ impl ConnectionManager {
};
let result = {
let store = storage.get().await;
store.get_post_with_visibility(&payload.post_id).ok().flatten()
let pv = store.get_post_with_visibility(&payload.post_id).ok().flatten();
let intent = store.get_post_intent(&payload.post_id).ok().flatten();
pv.map(|(p, v)| (p, v, intent))
};
let resp = if let Some((post, visibility)) = result {
let resp = if let Some((post, visibility, intent)) = result {
if matches!(visibility, PostVisibility::Public) {
crate::protocol::PostFetchResponsePayload { post_id: payload.post_id, found: true, post: Some(SyncPost { id: payload.post_id, post, visibility }) }
crate::protocol::PostFetchResponsePayload { post_id: payload.post_id, found: true, post: Some(SyncPost { id: payload.post_id, post, visibility, intent }) }
} else {
crate::protocol::PostFetchResponsePayload { post_id: payload.post_id, found: false, post: None }
}
@ -6195,7 +6206,13 @@ impl ConnectionManager {
let post_author = sp.post.author;
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
let _ = crate::control::receive_post(
&storage,
&sp.id,
&sp.post,
&sp.visibility,
sp.intent.as_ref(),
);
let _ = storage.touch_file_holder(
&sp.id,
&sender,