Phase 2c: remove audience + PostPush + PostNotification + AudienceRequest/Response
v0.6.2 wire fork: every persona-identifying direct push is gone. Public posts propagate only through the CDN (pull + header-diff neighbor propagation). Encrypted posts propagate only through pull with merged author-or-recipient match. There is no remaining sender→recipient traffic correlation signal on the wire for content. Protocol (network-breaking): - Retire MessageType 0x42 (PostNotification), 0x43 (PostPush), 0x44 (AudienceRequest), 0x45 (AudienceResponse). Their payload structs are deleted along with the handlers and senders. - SocialDisconnectNotice (0x71) / SocialAddressUpdate (0x70) sender functions targeting audience are deleted; the existing handlers stay (both already dead code on the send side). Core removals: - `push_to_audience`, `notify_post`, `push_delete`, `push_disconnect_to_audience`, `push_address_update_to_audience`, `send_audience_request`, `send_audience_response`, `send_to_audience` — all gone from network.rs. - `handle_post_notification` removed from connection.rs. - `request_audience`, `approve_audience`, `deny_audience`, `remove_audience`, `list_audience_members`, `list_audience` removed from Node. - `audience_pushed` step removed from post creation. - `AudienceDirection`, `AudienceStatus`, `AudienceRecord`, `AudienceApprovalMode` removed from types. - Storage: `store_audience`, `list_audience`, `list_audience_members`, `remove_audience`, `row_to_audience_record`, `audience_crud` test, the `audience` CREATE TABLE, and the audience-dependent social route rebuild branch all removed. Upgraded DBs retain the orphan `audience` table; nothing touches it. Follow-on cleanups: - `SocialRelation::Audience` + `::Mutual` collapsed into just `Follow`. The Display/FromStr impl accepts legacy "audience"/"mutual" strings from pre-v0.6.2 DBs and maps them to Follow. - Blob-eviction priority function drops the audience factor; relationship is now own-author vs followed vs other. Tests updated accordingly. - `CommentPermission::AudienceOnly` → `FollowersOnly`. Check uses the author's public follows (`list_public_follows`) rather than a separate audience table. `ModerationMode::AudienceOnly` similarly renamed. - Follow/unfollow routines simplified: no audience downgrade logic; unfollow removes the social route entirely. UI: - CLI: `audience*` commands removed. - Tauri: `AudienceDto`, `list_audience`, `list_audience_outbound`, `request_audience`, `approve_audience`, `remove_audience` commands removed from invoke_handler. Frontend: audience panel and audience/mutual badges removed; compose permission dropdown shows "Followers" instead of "Audience"; `loadAudience` is a no-op stub that hides any leftover DOM. Tests: 111 / 111 core tests pass. Breaking change: v0.6.2 nodes won't interoperate with v0.6.1 for delete propagation, visibility updates, direct post push, post notifications, or audience requests. Upgrade both ends.
This commit is contained in:
parent
36b6a466d2
commit
eabdb7ba4f
10 changed files with 98 additions and 1140 deletions
|
|
@ -13,12 +13,11 @@ use crate::crypto;
|
|||
use crate::protocol::{
|
||||
read_message_type, read_payload, write_typed_message, AnchorReferral,
|
||||
AnchorReferralRequestPayload, AnchorReferralResponsePayload, AnchorRegisterPayload,
|
||||
AudienceRequestPayload, AudienceResponsePayload, BlobHeaderDiffPayload,
|
||||
BlobHeaderDiffPayload,
|
||||
BlobHeaderRequestPayload, BlobHeaderResponsePayload, BlobRequestPayload, BlobResponsePayload,
|
||||
CircleProfileUpdatePayload, GroupKeyDistributePayload, GroupKeyRequestPayload,
|
||||
GroupKeyResponsePayload, InitialExchangePayload, MeshPreferPayload,
|
||||
MessageType, NodeListUpdatePayload, PostDownstreamRegisterPayload,
|
||||
PostNotificationPayload, PostPushPayload,
|
||||
ProfileUpdatePayload, PullSyncRequestPayload, PullSyncResponsePayload,
|
||||
RefuseRedirectPayload, RelayIntroducePayload, RelayIntroduceResultPayload, SessionRelayPayload,
|
||||
SocialAddressUpdatePayload, SocialCheckinPayload, SocialDisconnectNoticePayload,
|
||||
|
|
@ -1894,138 +1893,6 @@ impl ConnectionManager {
|
|||
sent
|
||||
}
|
||||
|
||||
/// Handle an incoming post notification: if we follow the author, pull the post.
|
||||
/// `conn` is a fallback connection for ephemeral callers (not persistently connected).
|
||||
pub async fn handle_post_notification(
|
||||
&self,
|
||||
from: &NodeId,
|
||||
notification: PostNotificationPayload,
|
||||
conn: Option<&iroh::endpoint::Connection>,
|
||||
) -> anyhow::Result<bool> {
|
||||
let dominated = {
|
||||
let storage = self.storage.get().await;
|
||||
// Already have this post?
|
||||
if storage.get_post(¬ification.post_id)?.is_some() {
|
||||
return Ok(false);
|
||||
}
|
||||
// Do we follow the author?
|
||||
let follows = storage.list_follows()?;
|
||||
follows.contains(¬ification.author)
|
||||
};
|
||||
|
||||
if !dominated {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// We follow the author and don't have the post — pull it from the notifier
|
||||
let pull_conn = match self.connections.get(from) {
|
||||
Some(pc) => pc.connection.clone(),
|
||||
None => match conn {
|
||||
Some(c) => c.clone(),
|
||||
None => return Ok(false),
|
||||
},
|
||||
};
|
||||
|
||||
let (our_follows, follows_sync, our_personas) = {
|
||||
let storage = self.storage.get().await;
|
||||
(
|
||||
storage.list_follows()?,
|
||||
storage.get_follows_with_last_sync().unwrap_or_default(),
|
||||
storage.list_posting_identities().unwrap_or_default(),
|
||||
)
|
||||
};
|
||||
|
||||
// Merged pull: include every posting identity so DMs match recipient.
|
||||
let mut query_list = our_follows;
|
||||
for pi in &our_personas {
|
||||
if !query_list.contains(&pi.node_id) {
|
||||
query_list.push(pi.node_id);
|
||||
}
|
||||
}
|
||||
|
||||
let (mut send, mut recv) = pull_conn.open_bi().await?;
|
||||
let request = PullSyncRequestPayload {
|
||||
follows: query_list,
|
||||
have_post_ids: vec![], // v4: empty, using since_ms instead
|
||||
since_ms: follows_sync,
|
||||
};
|
||||
write_typed_message(&mut send, MessageType::PullSyncRequest, &request).await?;
|
||||
send.finish()?;
|
||||
|
||||
let _resp_type = read_message_type(&mut recv).await?;
|
||||
let response: PullSyncResponsePayload =
|
||||
read_payload(&mut recv, MAX_PAYLOAD).await?;
|
||||
|
||||
let now_ms = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as u64;
|
||||
let mut stored = false;
|
||||
let mut new_post_ids: Vec<PostId> = Vec::new();
|
||||
let mut synced_authors: HashSet<NodeId> = HashSet::new();
|
||||
|
||||
// Brief lock 1: store posts
|
||||
{
|
||||
let storage = self.storage.get().await;
|
||||
for sp in &response.posts {
|
||||
if verify_post_id(&sp.id, &sp.post) && !storage.is_deleted(&sp.id)? {
|
||||
match crate::control::receive_post(&storage, &sp.id, &sp.post, &sp.visibility, sp.intent.as_ref()) {
|
||||
Ok(_) => {
|
||||
new_post_ids.push(sp.id);
|
||||
synced_authors.insert(sp.post.author);
|
||||
if sp.id == notification.post_id {
|
||||
stored = true;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(post_id = hex::encode(sp.id), error = %e, "rejecting post");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Lock RELEASED
|
||||
|
||||
// Brief lock 2: upstream + last_sync + visibility updates
|
||||
{
|
||||
let storage = self.storage.get().await;
|
||||
for pid in &new_post_ids {
|
||||
let _ = storage.touch_file_holder(
|
||||
pid,
|
||||
from,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
}
|
||||
for author in &synced_authors {
|
||||
let _ = storage.update_follow_last_sync(author, now_ms);
|
||||
}
|
||||
for vu in &response.visibility_updates {
|
||||
if let Some(post) = storage.get_post(&vu.post_id)? {
|
||||
if post.author == vu.author {
|
||||
let _ = storage.update_post_visibility(&vu.post_id, &vu.visibility);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Register as downstream for new posts (cap at 50 to avoid flooding)
|
||||
if !new_post_ids.is_empty() {
|
||||
let reg_conn = pull_conn.clone();
|
||||
tokio::spawn(async move {
|
||||
for post_id in new_post_ids.into_iter().take(50) {
|
||||
let payload = PostDownstreamRegisterPayload { post_id };
|
||||
if let Ok(mut send) = reg_conn.open_uni().await {
|
||||
let _ = write_typed_message(&mut send, MessageType::PostDownstreamRegister, &payload).await;
|
||||
let _ = send.finish();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(stored)
|
||||
}
|
||||
|
||||
/// Pull posts from a connected peer.
|
||||
pub async fn pull_from_peer(&self, peer_id: &NodeId) -> anyhow::Result<PullSyncStats> {
|
||||
let pc = self
|
||||
|
|
@ -4987,110 +4854,6 @@ impl ConnectionManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
MessageType::PostNotification => {
|
||||
let notification: PostNotificationPayload =
|
||||
read_payload(recv, MAX_PAYLOAD).await?;
|
||||
info!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
post_id = hex::encode(notification.post_id),
|
||||
author = hex::encode(notification.author),
|
||||
"Received post notification"
|
||||
);
|
||||
let cm = conn_mgr.lock().await;
|
||||
match cm.handle_post_notification(&remote_node_id, notification, None).await {
|
||||
Ok(true) => {
|
||||
info!(peer = hex::encode(remote_node_id), "Pulled post from notification");
|
||||
}
|
||||
Ok(false) => {
|
||||
info!(peer = hex::encode(remote_node_id), "Post notification ignored (not following or already have)");
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(peer = hex::encode(remote_node_id), error = %e, "Post notification pull failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
MessageType::PostPush => {
|
||||
let push: PostPushPayload = read_payload(recv, MAX_PAYLOAD).await?;
|
||||
// Encrypted posts are no longer accepted via direct push — they propagate
|
||||
// via the CDN to eliminate the sender→recipient traffic signal.
|
||||
if !matches!(push.post.visibility, crate::types::PostVisibility::Public) {
|
||||
debug!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
post_id = hex::encode(push.post.id),
|
||||
"Ignoring non-public PostPush"
|
||||
);
|
||||
} else {
|
||||
let cm = conn_mgr.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
if !storage.is_deleted(&push.post.id)?
|
||||
&& storage.get_post(&push.post.id)?.is_none()
|
||||
&& crate::content::verify_post_id(&push.post.id, &push.post.post)
|
||||
{
|
||||
match crate::control::receive_post(
|
||||
&storage,
|
||||
&push.post.id,
|
||||
&push.post.post,
|
||||
&push.post.visibility,
|
||||
push.post.intent.as_ref(),
|
||||
) {
|
||||
Ok(_) => {
|
||||
let _ = storage.touch_file_holder(
|
||||
&push.post.id,
|
||||
&remote_node_id,
|
||||
&[],
|
||||
crate::storage::HolderDirection::Received,
|
||||
);
|
||||
info!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
post_id = hex::encode(push.post.id),
|
||||
"Received direct post push"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(post_id = hex::encode(push.post.id), error = %e, "rejecting pushed post");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
MessageType::AudienceRequest => {
|
||||
let req: AudienceRequestPayload = read_payload(recv, MAX_PAYLOAD).await?;
|
||||
info!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
requester = hex::encode(req.requester),
|
||||
"Received audience request"
|
||||
);
|
||||
let cm = conn_mgr.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
// Store as inbound pending request
|
||||
let _ = storage.store_audience(
|
||||
&req.requester,
|
||||
crate::types::AudienceDirection::Inbound,
|
||||
crate::types::AudienceStatus::Pending,
|
||||
);
|
||||
}
|
||||
MessageType::AudienceResponse => {
|
||||
let resp: AudienceResponsePayload = read_payload(recv, MAX_PAYLOAD).await?;
|
||||
let status = if resp.approved { "approved" } else { "denied" };
|
||||
info!(
|
||||
peer = hex::encode(remote_node_id),
|
||||
responder = hex::encode(resp.responder),
|
||||
status,
|
||||
"Received audience response"
|
||||
);
|
||||
let cm = conn_mgr.lock().await;
|
||||
let storage = cm.storage.get().await;
|
||||
let new_status = if resp.approved {
|
||||
crate::types::AudienceStatus::Approved
|
||||
} else {
|
||||
crate::types::AudienceStatus::Denied
|
||||
};
|
||||
let _ = storage.store_audience(
|
||||
&resp.responder,
|
||||
crate::types::AudienceDirection::Outbound,
|
||||
new_status,
|
||||
);
|
||||
}
|
||||
MessageType::SocialAddressUpdate => {
|
||||
let payload: SocialAddressUpdatePayload = read_payload(recv, MAX_PAYLOAD).await?;
|
||||
let cm = conn_mgr.lock().await;
|
||||
|
|
@ -6280,18 +6043,18 @@ impl ConnectionManager {
|
|||
async fn handle_blob_header_diff(&self, payload: BlobHeaderDiffPayload, sender: NodeId) {
|
||||
use crate::types::BlobHeaderDiffOp;
|
||||
|
||||
// Gather policy + audience data + holders, then drop lock immediately.
|
||||
// Gather policy + followers set + holders, then drop lock immediately.
|
||||
// Remote peer clearly holds this post — record them as a holder.
|
||||
let (policy, approved_audience, holders) = {
|
||||
// v0.6.2: `AudienceOnly` → `FollowersOnly`; checked against our public
|
||||
// follows list rather than a separate audience table.
|
||||
let (policy, followers_set, holders) = {
|
||||
let storage = self.storage.get().await;
|
||||
let policy = storage.get_comment_policy(&payload.post_id)
|
||||
.ok()
|
||||
.flatten()
|
||||
.unwrap_or_default();
|
||||
let approved = storage.list_audience(
|
||||
crate::types::AudienceDirection::Inbound,
|
||||
Some(crate::types::AudienceStatus::Approved),
|
||||
).unwrap_or_default();
|
||||
let follows: std::collections::HashSet<NodeId> =
|
||||
storage.list_public_follows().unwrap_or_default().into_iter().collect();
|
||||
let _ = storage.touch_file_holder(
|
||||
&payload.post_id,
|
||||
&sender,
|
||||
|
|
@ -6303,12 +6066,9 @@ impl ConnectionManager {
|
|||
.into_iter()
|
||||
.map(|(nid, _addrs)| nid)
|
||||
.collect();
|
||||
(policy, approved, holders)
|
||||
(policy, follows, holders)
|
||||
};
|
||||
|
||||
// Filter ops using gathered data (no lock held)
|
||||
let audience_set: std::collections::HashSet<NodeId> = approved_audience.iter().map(|a| a.node_id).collect();
|
||||
|
||||
// Apply ops in a short lock acquisition
|
||||
{
|
||||
let storage = self.storage.get().await;
|
||||
|
|
@ -6344,8 +6104,8 @@ impl ConnectionManager {
|
|||
}
|
||||
match policy.allow_comments {
|
||||
crate::types::CommentPermission::None => continue,
|
||||
crate::types::CommentPermission::AudienceOnly => {
|
||||
if !audience_set.contains(&comment.author) {
|
||||
crate::types::CommentPermission::FollowersOnly => {
|
||||
if !followers_set.contains(&comment.author) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue