use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use std::sync::Arc; use tracing::{debug, info, warn}; use crate::activity::{ActivityCategory, ActivityEvent, ActivityLevel, ActivityLog}; use crate::blob::BlobStore; use crate::content::compute_post_id; use crate::crypto; use crate::network::Network; use crate::storage::StoragePool; use crate::types::{ Attachment, AudienceDirection, AudienceRecord, AudienceStatus, Circle, DeleteRecord, DeviceProfile, DeviceRole, NodeId, PeerRecord, PeerSlotKind, PeerWithAddress, Post, PostId, PostVisibility, PublicProfile, ReachMethod, RevocationMode, SessionReachMethod, SocialRelation, SocialRouteEntry, SocialStatus, VisibilityIntent, VisibilityUpdate, WormResult, }; /// Built-in default anchor — always available as a bootstrap fallback. const DEFAULT_ANCHOR: &str = "17af141956ae0b50dc1cb9248cadf5fca371ea2d8531ac9add3c03caffc61441@itsgoin.net:4433"; /// A distsoc node: ties together identity, storage, and networking pub struct Node { pub data_dir: PathBuf, pub storage: Arc, pub network: Arc, pub node_id: NodeId, pub blob_store: Arc, secret_seed: [u8; 32], bootstrap_anchors: Vec<(NodeId, iroh::EndpointAddr)>, #[allow(dead_code)] profile: DeviceProfile, pub activity_log: Arc>, pub last_rebalance_ms: Arc, pub last_anchor_register_ms: Arc, /// CDN replication budget: bytes remaining we're willing to pull and cache this hour replication_budget_remaining: Arc, /// CDN delivery budget: bytes remaining we're willing to serve this hour delivery_budget_remaining: Arc, /// Last budget reset timestamp (ms) budget_last_reset_ms: Arc, } impl Node { /// Create or open a node in the given data directory (Desktop profile) pub async fn open(data_dir: impl AsRef) -> anyhow::Result { Self::open_with_bind(data_dir, None, DeviceProfile::Desktop).await } /// Create or open a mobile node in the given data directory pub async fn open_mobile(data_dir: impl AsRef) -> anyhow::Result { Self::open_with_bind(data_dir, None, DeviceProfile::Mobile).await } /// Create or open a node, optionally binding to a specific address pub async fn open_with_bind( data_dir: impl AsRef, bind_addr: Option, profile: DeviceProfile, ) -> anyhow::Result { let data_dir = data_dir.as_ref().to_path_buf(); std::fs::create_dir_all(&data_dir)?; // Load or generate identity key let key_path = data_dir.join("identity.key"); let (secret_key, secret_seed) = if key_path.exists() { let key_bytes = std::fs::read(&key_path)?; let bytes: [u8; 32] = key_bytes .try_into() .map_err(|_| anyhow::anyhow!("invalid key file"))?; (iroh::SecretKey::from_bytes(&bytes), bytes) } else { let key = iroh::SecretKey::generate(&mut rand::rng()); let seed = key.to_bytes(); std::fs::write(&key_path, seed)?; info!("Generated new identity key"); (key, seed) }; // Open storage let db_path = data_dir.join("itsgoin.db"); let storage = Arc::new(StoragePool::open(&db_path)?); // Startup sweep: clear stale N2/N3 and mesh_peers from prior session { let s = storage.get().await; let n_cleared = s.clear_all_n2_n3().unwrap_or(0); let m_cleared = s.clear_all_mesh_peers().unwrap_or(0); if n_cleared > 0 || m_cleared > 0 { info!(n2_n3 = n_cleared, mesh_peers = m_cleared, "Startup sweep: cleared stale entries"); } } // Open blob store let blob_store = Arc::new(BlobStore::open(&data_dir)?); // Activity log + timer atomics let activity_log = Arc::new(std::sync::Mutex::new(ActivityLog::new())); let last_rebalance_ms = Arc::new(AtomicU64::new(0)); let last_anchor_register_ms = Arc::new(AtomicU64::new(0)); // Start network (v2: single ALPN, connection manager) let network = Arc::new( Network::new(secret_key, Arc::clone(&storage), bind_addr, secret_seed, Arc::clone(&blob_store), profile, Arc::clone(&activity_log)).await?, ); let node_id = network.node_id_bytes(); // Auto-follow ourselves so our own posts show in the feed { let s = storage.get().await; s.add_follow(&node_id)?; } // Bootstrap: if peers table is empty, try bootstrap.json then default anchor { let s = storage.get().await; let has_peers = s.has_peers()?; drop(s); if !has_peers { let mut entries = Vec::new(); let bootstrap_path = data_dir.join("bootstrap.json"); if bootstrap_path.exists() { info!("Loading bootstrap peers from {:?}", bootstrap_path); if let Ok(data) = std::fs::read_to_string(&bootstrap_path) { if let Ok(file_entries) = serde_json::from_str::>(&data) { entries.extend(file_entries); } } } let default = DEFAULT_ANCHOR.to_string(); if !entries.contains(&default) { entries.push(default); } for entry in entries { match crate::parse_connect_string(&entry) { Ok((nid, addr)) => { if nid == node_id { continue; } info!(peer = hex::encode(nid), "Bootstrap: connecting to peer"); let ip_addrs: Vec<_> = addr.ip_addrs().copied().collect(); { let s = storage.get().await; if ip_addrs.is_empty() { let _ = s.add_peer(&nid); } else { let _ = s.upsert_peer(&nid, &ip_addrs, None); } // Mark as anchor — bootstrap peers are infrastructure, not social follows let _ = s.set_peer_anchor(&nid, true); } // Connect persistently match network.connect_to_peer(nid, addr).await { Ok(()) => { info!(peer = hex::encode(nid), "Bootstrap: connected"); // Pull posts from the bootstrap peer match network.pull_from_all().await { Ok(stats) => { info!( "Bootstrap pull: {} posts from {} peers", stats.posts_received, stats.peers_pulled ); } Err(e) => warn!(error = %e, "Bootstrap pull failed"), } // Always store anchor in known_anchors (even before referrals) // so the periodic cycle can re-register and request referrals later { let s = storage.get().await; let anchor_addrs: Vec = s.get_peer_record(&nid) .ok().flatten() .map(|r| r.addresses).unwrap_or_default(); if !anchor_addrs.is_empty() { let _ = s.upsert_known_anchor(&nid, &anchor_addrs); } else if !ip_addrs.is_empty() { let _ = s.upsert_known_anchor(&nid, &ip_addrs); } } // Request referrals from anchor (10s timeout) match tokio::time::timeout(std::time::Duration::from_secs(10), network.request_anchor_referrals(&nid)).await { Ok(Ok(referrals)) if !referrals.is_empty() => { info!(count = referrals.len(), "Bootstrap: got anchor referrals"); // Spawn referral connections in background — don't block startup let net = Arc::clone(&network); let my_id = node_id; let anchor = nid; tokio::spawn(async move { for referral in referrals { if referral.node_id == my_id { continue; } if let Some(addr_str) = referral.addresses.first() { let connect_str = format!( "{}@{}", hex::encode(referral.node_id), addr_str, ); if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) { let connect_fut = async { match net.connect_to_peer(rid, raddr).await { Ok(()) => { info!(peer = hex::encode(rid), "Connected to referred peer"); Ok(()) }, Err(e) => { debug!(error = %e, peer = hex::encode(rid), "One-sided connect failed, requesting introduction from anchor"); match net.connect_via_introduction(rid, anchor).await { Ok(()) => { info!(peer = hex::encode(rid), "Connected to referred peer via hole punch"); Ok(()) }, Err(e2) => Err(e2), } } } }; match tokio::time::timeout(std::time::Duration::from_secs(15), connect_fut).await { Ok(Ok(())) => {}, Ok(Err(e)) => debug!(error = %e, peer = hex::encode(rid), "Bootstrap referral connect failed"), Err(_) => debug!(peer = hex::encode(rid), "Bootstrap referral connect timed out"), } } } } net.notify_growth().await; }); } Ok(Ok(_)) => debug!("Bootstrap: no referrals from anchor (first to register)"), Ok(Err(e)) => debug!(error = %e, "Bootstrap: referral request failed"), Err(_) => debug!("Bootstrap: referral request timed out"), } break; } Err(e) => { warn!(error = %e, "Bootstrap peer failed, trying next"); } } } Err(e) => { warn!(entry = %entry, error = %e, "Invalid bootstrap entry"); } } } } } // Load bootstrap anchors: anchors.json + built-in default let mut bootstrap_anchors = Vec::new(); let mut anchor_ids = std::collections::HashSet::new(); let anchors_path = data_dir.join("anchors.json"); if anchors_path.exists() { if let Ok(data) = std::fs::read_to_string(&anchors_path) { if let Ok(entries) = serde_json::from_str::>(&data) { for entry in entries { match crate::parse_connect_string(&entry) { Ok((nid, addr)) => { info!(peer = hex::encode(nid), "Loaded bootstrap anchor"); anchor_ids.insert(nid); bootstrap_anchors.push((nid, addr)); } Err(e) => { warn!(entry = %entry, error = %e, "Invalid bootstrap anchor entry"); } } } } } } if let Ok((nid, addr)) = crate::parse_connect_string(DEFAULT_ANCHOR) { if nid != node_id && !anchor_ids.contains(&nid) { info!("Including built-in default anchor"); bootstrap_anchors.push((nid, addr)); } } // Collect bootstrap anchor node IDs so we can deprioritize them let bootstrap_anchor_ids: std::collections::HashSet = bootstrap_anchors.iter().map(|(nid, _)| *nid).collect(); // Update known_anchors + peers with freshly DNS-resolved bootstrap addresses. // Without this, stale IPv6 addresses from previous sessions can block reconnection // on devices without IPv6 connectivity (see bugs-fixed.md #1). { let s = storage.get().await; for (nid, addr) in &bootstrap_anchors { let ip_addrs: Vec = addr.ip_addrs().copied().collect(); if !ip_addrs.is_empty() { let _ = s.upsert_known_anchor(nid, &ip_addrs); let _ = s.upsert_peer(nid, &ip_addrs, None); } } } // Rebuild social routes from follows + audience { let s = storage.get().await; match s.rebuild_social_routes() { Ok(count) if count > 0 => info!(count, "Rebuilt social routes on startup"), _ => {} } } // Startup connection: try discovered anchors FIRST, bootstrap anchors LAST. // This keeps load off bootstrap anchors — they're only needed when nothing else works. // Order: known non-bootstrap anchors → mDNS (via iroh) → bootstrap anchors { let conn_count = network.connection_count().await; if conn_count < 5 { let known = { let s = storage.get().await; s.list_known_anchors().unwrap_or_default() }; // Split into discovered anchors (priority) and bootstrap anchors (fallback) let (discovered, bootstrap_known): (Vec<_>, Vec<_>) = known.into_iter() .partition(|(nid, _)| !bootstrap_anchor_ids.contains(nid)); // Phase 1: Try discovered (non-bootstrap) anchors first let mut connected_anchor = None; for (anchor_nid, anchor_addrs) in &discovered { if *anchor_nid == node_id || network.is_peer_connected_or_session(anchor_nid).await { continue; } let endpoint_id = match iroh::EndpointId::from_bytes(anchor_nid) { Ok(eid) => eid, Err(_) => continue, }; let mut addr = iroh::EndpointAddr::from(endpoint_id); for sa in anchor_addrs { addr = addr.with_ip_addr(*sa); } info!(peer = hex::encode(anchor_nid), "Trying discovered anchor"); match tokio::time::timeout(std::time::Duration::from_secs(10), network.connect_to_anchor(*anchor_nid, addr)).await { Ok(Ok(())) => { info!(peer = hex::encode(anchor_nid), "Connected to discovered anchor"); connected_anchor = Some(*anchor_nid); break; } Ok(Err(e)) => debug!(error = %e, peer = hex::encode(anchor_nid), "Discovered anchor: connect failed"), Err(_) => debug!(peer = hex::encode(anchor_nid), "Discovered anchor: connect timed out"), } } // Phase 2: Fall back to bootstrap anchors only if no discovered anchor worked if connected_anchor.is_none() { for (anchor_nid, anchor_addrs) in &bootstrap_known { if *anchor_nid == node_id || network.is_peer_connected_or_session(anchor_nid).await { continue; } let endpoint_id = match iroh::EndpointId::from_bytes(anchor_nid) { Ok(eid) => eid, Err(_) => continue, }; let mut addr = iroh::EndpointAddr::from(endpoint_id); for sa in anchor_addrs { addr = addr.with_ip_addr(*sa); } info!(peer = hex::encode(anchor_nid), "Trying bootstrap anchor (fallback)"); match tokio::time::timeout(std::time::Duration::from_secs(10), network.connect_to_anchor(*anchor_nid, addr)).await { Ok(Ok(())) => { info!(peer = hex::encode(anchor_nid), "Connected to bootstrap anchor"); connected_anchor = Some(*anchor_nid); break; } Ok(Err(e)) => debug!(error = %e, peer = hex::encode(anchor_nid), "Bootstrap anchor: connect failed"), Err(_) => debug!(peer = hex::encode(anchor_nid), "Bootstrap anchor: connect timed out"), } } } // Phase 3: NAT probe + referrals from whichever anchor we connected to if let Some(anchor_nid) = connected_anchor { match tokio::time::timeout( std::time::Duration::from_secs(15), network.request_nat_filter_probe(&anchor_nid), ).await { Ok(Ok(())) => info!("NAT filter probe completed during bootstrap"), Ok(Err(e)) => warn!(error = %e, "NAT filter probe failed during bootstrap"), Err(_) => warn!("NAT filter probe timed out during bootstrap"), } match tokio::time::timeout(std::time::Duration::from_secs(10), network.request_anchor_referrals(&anchor_nid)).await { Ok(Ok(referrals)) if !referrals.is_empty() => { info!(count = referrals.len(), "Got anchor referrals"); let net = Arc::clone(&network); let my_id = node_id; let anchor = anchor_nid; tokio::spawn(async move { for referral in referrals { if referral.node_id == my_id { continue; } if let Some(addr_str) = referral.addresses.first() { let connect_str = format!( "{}@{}", hex::encode(referral.node_id), addr_str, ); if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) { let connect_fut = async { match net.connect_to_peer(rid, raddr).await { Ok(()) => { info!(peer = hex::encode(rid), "Connected to referred peer"); Ok(()) }, Err(_) => { match net.connect_via_introduction(rid, anchor).await { Ok(()) => { info!(peer = hex::encode(rid), "Connected via hole punch"); Ok(()) }, Err(e) => Err(e), } } } }; match tokio::time::timeout(std::time::Duration::from_secs(15), connect_fut).await { Ok(Ok(())) => {}, Ok(Err(e)) => debug!(error = %e, peer = hex::encode(rid), "Referral connect failed"), Err(_) => debug!(peer = hex::encode(rid), "Referral connect timed out"), } } } } net.notify_growth().await; }); } Ok(Ok(_)) => debug!("No referrals from anchor"), Ok(Err(e)) => debug!(error = %e, "Referral request failed"), Err(_) => debug!("Referral request timed out"), } } } } // Initialize CDN replication budgets based on device role let role = network.device_role(); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let replication_budget_remaining = Arc::new(AtomicU64::new(role.replication_limit())); let delivery_budget_remaining = Arc::new(AtomicU64::new(role.delivery_limit())); let budget_last_reset_ms = Arc::new(AtomicU64::new(now)); // Set delivery budget on blob store (shared with ConnectionManager) blob_store.set_delivery_budget(role.delivery_limit()); Ok(Self { data_dir, storage, network, node_id, blob_store, secret_seed, bootstrap_anchors, profile, activity_log, last_rebalance_ms, last_anchor_register_ms, replication_budget_remaining, delivery_budget_remaining, budget_last_reset_ms, }) } /// Get recent activity events (for diagnostics UI). pub fn get_activity_log(&self, limit: usize) -> Vec { self.activity_log.lock().unwrap().recent(limit) } /// Get timer state: (last_rebalance_ms, last_anchor_register_ms). pub fn timer_state(&self) -> (u64, u64) { ( self.last_rebalance_ms.load(AtomicOrdering::Relaxed), self.last_anchor_register_ms.load(AtomicOrdering::Relaxed), ) } /// Get the secret seed bytes (for crypto operations by consumers like Tauri) pub fn secret_seed_bytes(&self) -> [u8; 32] { self.secret_seed } // --- CDN Replication Budget --- /// Reset budgets if an hour has elapsed since last reset. fn maybe_reset_budgets(&self) { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let last = self.budget_last_reset_ms.load(AtomicOrdering::Relaxed); if now.saturating_sub(last) >= 3_600_000 { let role = self.network.device_role(); self.replication_budget_remaining.store(role.replication_limit(), AtomicOrdering::Relaxed); self.delivery_budget_remaining.store(role.delivery_limit(), AtomicOrdering::Relaxed); self.budget_last_reset_ms.store(now, AtomicOrdering::Relaxed); debug!(role = %role, "CDN budgets reset for new hour"); } } /// Try to consume replication budget. Returns true if within budget. pub fn consume_replication_budget(&self, bytes: u64) -> bool { self.maybe_reset_budgets(); let prev = self.replication_budget_remaining.fetch_update( AtomicOrdering::Relaxed, AtomicOrdering::Relaxed, |current| { if current >= bytes { Some(current - bytes) } else { None } }, ); prev.is_ok() } /// Try to consume delivery budget. Returns true if within budget. pub fn consume_delivery_budget(&self, bytes: u64) -> bool { self.maybe_reset_budgets(); let prev = self.delivery_budget_remaining.fetch_update( AtomicOrdering::Relaxed, AtomicOrdering::Relaxed, |current| { if current >= bytes { Some(current - bytes) } else { None } }, ); prev.is_ok() } /// Get remaining replication budget bytes. pub fn replication_budget_remaining(&self) -> u64 { self.maybe_reset_budgets(); self.replication_budget_remaining.load(AtomicOrdering::Relaxed) } /// Get remaining delivery budget bytes. pub fn delivery_budget_remaining(&self) -> u64 { self.maybe_reset_budgets(); self.delivery_budget_remaining.load(AtomicOrdering::Relaxed) } // ---- Identity export/import ---- pub fn export_identity_hex(&self) -> anyhow::Result { let key_path = self.data_dir.join("identity.key"); let key_bytes = std::fs::read(&key_path)?; Ok(hex::encode(key_bytes)) } pub fn import_identity(data_dir: &Path, hex_key: &str) -> anyhow::Result<()> { std::fs::create_dir_all(data_dir)?; let key_path = data_dir.join("identity.key"); if key_path.exists() { anyhow::bail!("identity.key already exists in {:?} — refusing to overwrite", data_dir); } let bytes = hex::decode(hex_key)?; if bytes.len() != 32 { anyhow::bail!("key must be exactly 32 bytes (64 hex chars), got {} bytes", bytes.len()); } std::fs::write(&key_path, &bytes)?; Ok(()) } /// Get up to 10 currently-connected peer NodeIds (for recent_peers in profile). /// Prefers social peers, then wide. async fn current_recent_peers(&self) -> Vec { let conns = self.network.connection_info().await; let mut social: Vec = Vec::new(); let mut wide: Vec = Vec::new(); for (nid, kind, _) in conns { if nid == self.node_id { continue; } match kind { PeerSlotKind::Preferred | PeerSlotKind::Local => social.push(nid), PeerSlotKind::Wide => wide.push(nid), } } let mut result = social; result.extend(wide); result.truncate(10); result } // ---- Posts ---- pub async fn create_post(&self, content: String) -> anyhow::Result<(PostId, Post)> { let (id, post, _vis) = self .create_post_with_visibility(content, VisibilityIntent::Public, vec![]) .await?; Ok((id, post)) } pub async fn create_post_with_visibility( &self, content: String, intent: VisibilityIntent, attachment_data: Vec<(Vec, String)>, ) -> anyhow::Result<(PostId, Post, PostVisibility)> { // Validate attachments if attachment_data.len() > 4 { anyhow::bail!("max 4 attachments per post"); } for (data, _) in &attachment_data { if data.len() > 10 * 1024 * 1024 { anyhow::bail!("attachment exceeds 10MB limit"); } } let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; // Determine encryption parameters and generate CEK if needed. // The CEK is generated BEFORE both content and blob encryption so they share the same key. enum EncryptionMode { Public, Recipient { cek: [u8; 32], recipients: Vec }, Group { cek: [u8; 32], group_id: [u8; 32], epoch: u64, group_seed: [u8; 32], group_pubkey: [u8; 32] }, } let mode = match &intent { VisibilityIntent::Public => EncryptionMode::Public, VisibilityIntent::Circle(circle_name) => { // Try group encryption first let group_info = { let storage = self.storage.get().await; storage.get_group_key_by_circle(circle_name)? .and_then(|gk| { storage.get_group_seed(&gk.group_id, gk.epoch).ok().flatten() .map(|seed| (gk.group_id, gk.epoch, seed, gk.group_public_key)) }) }; if let Some((group_id, epoch, group_seed, group_pubkey)) = group_info { let mut cek = [0u8; 32]; rand::RngCore::fill_bytes(&mut rand::rng(), &mut cek); EncryptionMode::Group { cek, group_id, epoch, group_seed, group_pubkey } } else { let recipients = self.resolve_recipients(&intent).await?; if recipients.is_empty() { anyhow::bail!("no recipients resolved for this visibility"); } let mut cek = [0u8; 32]; rand::RngCore::fill_bytes(&mut rand::rng(), &mut cek); EncryptionMode::Recipient { cek, recipients } } } _ => { let recipients = self.resolve_recipients(&intent).await?; if recipients.is_empty() { anyhow::bail!("no recipients resolved for this visibility"); } let mut cek = [0u8; 32]; rand::RngCore::fill_bytes(&mut rand::rng(), &mut cek); EncryptionMode::Recipient { cek, recipients } } }; // Store blob files — for encrypted posts, encrypt each blob with the shared CEK. // CID is computed on the ciphertext so peers can verify what they store. let mut attachments = Vec::with_capacity(attachment_data.len()); for (data, mime) in &attachment_data { let (store_data, size) = match &mode { EncryptionMode::Public => { (data.clone(), data.len() as u64) } EncryptionMode::Recipient { cek, .. } | EncryptionMode::Group { cek, .. } => { let encrypted = crypto::encrypt_bytes_with_cek(data, cek)?; let sz = encrypted.len() as u64; (encrypted, sz) } }; let cid = crate::blob::compute_blob_id(&store_data); self.blob_store.store(&cid, &store_data)?; attachments.push(Attachment { cid, mime_type: mime.clone(), size_bytes: size, }); } // Encrypt content and build visibility let (final_content, visibility) = match mode { EncryptionMode::Public => (content, PostVisibility::Public), EncryptionMode::Recipient { cek, recipients } => { let (encrypted, wrapped_keys) = crypto::encrypt_post_with_cek(&content, &cek, &self.secret_seed, &self.node_id, &recipients)?; ( encrypted, PostVisibility::Encrypted { recipients: wrapped_keys, }, ) } EncryptionMode::Group { cek, group_id, epoch, group_seed, group_pubkey } => { let (encrypted, wrapped_cek) = crypto::encrypt_post_for_group_with_cek(&content, &cek, &group_seed, &group_pubkey)?; ( encrypted, PostVisibility::GroupEncrypted { group_id, epoch, wrapped_cek, }, ) } }; let post = Post { author: self.node_id, content: final_content, attachments, timestamp_ms: now, }; let post_id = compute_post_id(&post); { let storage = self.storage.get().await; storage.store_post_with_intent(&post_id, &post, &visibility, &intent)?; for att in &post.attachments { storage.record_blob(&att.cid, &post_id, &self.node_id, att.size_bytes, &att.mime_type, now)?; // Auto-pin own blobs so they're never evicted before foreign content let _ = storage.pin_blob(&att.cid); } // Initialize encrypted receipt + comment slots for non-public posts if !matches!(visibility, PostVisibility::Public) { let participant_count = match &visibility { PostVisibility::Encrypted { recipients } => recipients.len(), PostVisibility::GroupEncrypted { .. } => { // For group posts, we don't know exact member count at creation time; // use a reasonable default (the circle members count, resolved earlier) match &intent { VisibilityIntent::Circle(circle_name) => { storage.get_circle_members(circle_name) .map(|m| m.len() + 1) // +1 for author .unwrap_or(2) } _ => 2, } } PostVisibility::Public => unreachable!(), }; let receipt_slots: Vec> = (0..participant_count) .map(|_| crypto::random_slot_noise(64)) .collect(); let comment_slot_count = (participant_count + 2) / 3; // ceil(participants / 3) let comment_slots: Vec> = (0..comment_slot_count) .map(|_| crypto::random_slot_noise(256)) .collect(); let blob_header = crate::types::BlobHeader { post_id, author: self.node_id, reactions: vec![], comments: vec![], policy: Default::default(), updated_at: now, thread_splits: vec![], receipt_slots, comment_slots, }; let header_json = serde_json::to_string(&blob_header)?; storage.store_blob_header(&post_id, &self.node_id, &header_json, now)?; } } // Build and store CDN manifests for blobs if !post.attachments.is_empty() { let storage = self.storage.get().await; let (previous, _following) = storage.get_author_post_neighborhood(&self.node_id, now, 10)?; drop(storage); let manifest = crate::types::AuthorManifest { post_id, author: self.node_id, author_addresses: self.network.our_addresses(), created_at: now, updated_at: now, previous_posts: previous, following_posts: vec![], signature: vec![], }; let sig = crypto::sign_manifest(&self.secret_seed, &manifest); let mut manifest = manifest; manifest.signature = sig; let manifest_json = serde_json::to_string(&manifest)?; { let storage = self.storage.get().await; for att in &post.attachments { storage.store_cdn_manifest(&att.cid, &manifest_json, &self.node_id, now)?; } } // Update previous posts' manifests to include this new post as a following_post self.update_neighbor_manifests(&post_id, now).await; // Push updated manifests to downstream peers let manifests_to_push = { let storage = self.storage.get().await; storage.get_manifests_for_author_blobs(&self.node_id).unwrap_or_default() }; let our_addrs = self.network.our_addresses(); for (push_cid, push_json) in &manifests_to_push { if let Ok(author_manifest) = serde_json::from_str::(push_json) { let cdn_manifest = crate::types::CdnManifest { author_manifest: author_manifest, host: self.node_id, host_addresses: our_addrs.clone(), source: self.node_id, source_addresses: our_addrs.clone(), downstream_count: 0, }; self.network.push_manifest_to_downstream(push_cid, &cdn_manifest).await; } } } // For encrypted posts, push directly to recipients let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await; // For public posts, push to audience members let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await; info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post"); Ok((post_id, post, visibility)) } /// Update the manifests of recent prior posts to include a newly created post /// in their following_posts list. Re-signs each updated manifest. async fn update_neighbor_manifests(&self, new_post_id: &PostId, new_timestamp_ms: u64) { let storage = self.storage.get().await; let manifests = match storage.get_manifests_for_author_blobs(&self.node_id) { Ok(m) => m, Err(e) => { warn!("Failed to get manifests for neighbor update: {}", e); return; } }; drop(storage); let new_entry = crate::types::ManifestEntry { post_id: *new_post_id, timestamp_ms: new_timestamp_ms, has_attachments: true, }; for (cid, json) in manifests { let mut manifest: crate::types::AuthorManifest = match serde_json::from_str(&json) { Ok(m) => m, Err(_) => continue, }; // Only update if this manifest's post was created before the new post if manifest.created_at >= new_timestamp_ms { continue; } // Don't add duplicate if manifest.following_posts.iter().any(|e| e.post_id == *new_post_id) { continue; } // Keep max 10 following_posts if manifest.following_posts.len() >= 10 { continue; } manifest.following_posts.push(new_entry.clone()); manifest.updated_at = new_timestamp_ms; manifest.signature = crypto::sign_manifest(&self.secret_seed, &manifest); let updated_json = match serde_json::to_string(&manifest) { Ok(j) => j, Err(_) => continue, }; let storage = self.storage.get().await; let _ = storage.store_cdn_manifest(&cid, &updated_json, &self.node_id, new_timestamp_ms); drop(storage); } } async fn resolve_recipients(&self, intent: &VisibilityIntent) -> anyhow::Result> { let storage = self.storage.get().await; match intent { VisibilityIntent::Public => Ok(vec![]), VisibilityIntent::Friends => storage.list_public_follows(), VisibilityIntent::Circle(name) => storage.get_circle_members(name), VisibilityIntent::Direct(ids) => Ok(ids.clone()), } } pub async fn get_feed( &self, ) -> anyhow::Result)>> { let (raw, group_seeds) = { let storage = self.storage.get().await; let posts = storage.get_feed()?; let seeds = storage.get_all_group_seeds_map().unwrap_or_default(); (posts, seeds) }; Ok(self.decrypt_posts(raw, &group_seeds)) } pub async fn get_all_posts( &self, ) -> anyhow::Result)>> { let (raw, group_seeds) = { let storage = self.storage.get().await; let posts = storage.list_posts_reverse_chron()?; let seeds = storage.get_all_group_seeds_map().unwrap_or_default(); (posts, seeds) }; Ok(self.decrypt_posts(raw, &group_seeds)) } fn decrypt_posts( &self, posts: Vec<(PostId, Post, PostVisibility)>, group_seeds: &std::collections::HashMap<(crate::types::GroupId, crate::types::GroupEpoch), ([u8; 32], [u8; 32])>, ) -> Vec<(PostId, Post, PostVisibility, Option)> { posts .into_iter() .map(|(id, post, vis)| { let decrypted = match &vis { PostVisibility::Public => None, PostVisibility::Encrypted { recipients } => { crypto::decrypt_post( &post.content, &self.secret_seed, &self.node_id, &post.author, recipients, ) .unwrap_or(None) } PostVisibility::GroupEncrypted { group_id, epoch, wrapped_cek } => { group_seeds.get(&(*group_id, *epoch)) .and_then(|(seed, pubkey)| { crypto::decrypt_group_post( &post.content, seed, pubkey, wrapped_cek, ).ok() }) } }; (id, post, vis, decrypted) }) .collect() } // ---- Follows ---- pub async fn follow(&self, node_id: &NodeId) -> anyhow::Result<()> { let connected = self.network.is_connected(node_id).await; let storage = self.storage.get().await; storage.add_follow(node_id)?; // Upsert social route let is_audience = storage.list_audience_members()?.contains(node_id); let relation = if is_audience { SocialRelation::Mutual } else { SocialRelation::Follow }; let addresses = storage.get_peer_record(node_id)? .map(|r| r.addresses).unwrap_or_default(); let peer_addresses = storage.build_peer_addresses_for(node_id)?; let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) .unwrap_or_default().as_millis() as u64; let preferred_tree = storage.build_preferred_tree_for(node_id).unwrap_or_default(); storage.upsert_social_route(&SocialRouteEntry { node_id: *node_id, addresses, peer_addresses, relation, status: if connected { SocialStatus::Online } else { SocialStatus::Disconnected }, last_connected_ms: 0, last_seen_ms: now, reach_method: ReachMethod::Direct, preferred_tree, })?; Ok(()) } pub async fn unfollow(&self, node_id: &NodeId) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.remove_follow(node_id)?; // Downgrade or remove social route let is_audience = storage.list_audience_members()?.contains(node_id); if is_audience { // Downgrade from Mutual to Audience if let Some(mut route) = storage.get_social_route(node_id)? { route.relation = SocialRelation::Audience; storage.upsert_social_route(&route)?; } } else { storage.remove_social_route(node_id)?; } Ok(()) } pub async fn list_follows(&self) -> anyhow::Result> { let storage = self.storage.get().await; storage.list_follows() } // ---- Profiles ---- pub async fn set_profile(&self, display_name: String, bio: String) -> anyhow::Result { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let recent_peers = self.current_recent_peers().await; let profile = { let storage = self.storage.get().await; let existing_anchors = storage.get_peer_anchors(&self.node_id).unwrap_or_default(); let preferred_peers = storage.list_preferred_peers().unwrap_or_default(); let (existing_visible, existing_avatar) = storage.get_profile(&self.node_id) .ok() .flatten() .map(|p| (p.public_visible, p.avatar_cid)) .unwrap_or((true, None)); let profile = PublicProfile { node_id: self.node_id, display_name, bio, updated_at: now, anchors: existing_anchors, recent_peers, preferred_peers, public_visible: existing_visible, avatar_cid: existing_avatar, }; storage.store_profile(&profile)?; profile }; let pushed = self.network.push_profile(&profile).await; if pushed > 0 { info!(pushed, "Pushed profile update to peers"); } Ok(profile) } pub async fn set_anchors(&self, anchors: Vec) -> anyhow::Result { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let recent_peers = self.current_recent_peers().await; let profile = { let storage = self.storage.get().await; let existing = storage.get_profile(&self.node_id)?; let (display_name, bio, public_visible, avatar_cid) = match existing { Some(p) => (p.display_name, p.bio, p.public_visible, p.avatar_cid), None => (String::new(), String::new(), true, None), }; let preferred_peers = storage.list_preferred_peers().unwrap_or_default(); let profile = PublicProfile { node_id: self.node_id, display_name, bio, updated_at: now, anchors, recent_peers, preferred_peers, public_visible, avatar_cid, }; storage.store_profile(&profile)?; profile }; let pushed = self.network.push_profile(&profile).await; if pushed > 0 { info!(pushed, "Pushed anchor update to peers"); } Ok(profile) } pub async fn get_peer_anchors(&self, node_id: &NodeId) -> anyhow::Result> { let storage = self.storage.get().await; storage.get_peer_anchors(node_id) } pub async fn get_profile(&self, node_id: &NodeId) -> anyhow::Result> { let storage = self.storage.get().await; storage.get_profile(node_id) } pub async fn my_profile(&self) -> anyhow::Result> { let storage = self.storage.get().await; storage.get_profile(&self.node_id) } pub async fn has_profile(&self) -> anyhow::Result { let storage = self.storage.get().await; Ok(storage.get_profile(&self.node_id)?.is_some()) } pub async fn get_display_name(&self, node_id: &NodeId) -> anyhow::Result> { let storage = self.storage.get().await; storage.get_display_name(node_id) } // ---- Blobs ---- /// Get a blob by CID from local store. pub async fn get_blob(&self, cid: &[u8; 32]) -> anyhow::Result>> { let data = self.blob_store.get(cid)?; if data.is_some() { let storage = self.storage.get().await; let _ = storage.touch_blob_access(cid); } Ok(data) } /// Decrypt a blob in the context of a post's visibility. /// Public posts pass through unchanged. Encrypted/group-encrypted posts decrypt with the CEK. fn decrypt_blob_for_post( &self, data: Vec, post: &Post, visibility: &PostVisibility, group_seeds: &std::collections::HashMap<([u8; 32], u64), ([u8; 32], [u8; 32])>, ) -> anyhow::Result>> { match visibility { PostVisibility::Public => Ok(Some(data)), PostVisibility::Encrypted { recipients } => { let cek = crypto::unwrap_cek_for_recipient( &self.secret_seed, &self.node_id, &post.author, recipients, )?; match cek { Some(cek) => { let plaintext = crypto::decrypt_bytes_with_cek(&data, &cek)?; Ok(Some(plaintext)) } None => Ok(None), } } PostVisibility::GroupEncrypted { group_id, epoch, wrapped_cek } => { if let Some((seed, pubkey)) = group_seeds.get(&(*group_id, *epoch)) { let cek = crypto::unwrap_group_cek(seed, pubkey, wrapped_cek)?; let plaintext = crypto::decrypt_bytes_with_cek(&data, &cek)?; Ok(Some(plaintext)) } else { Ok(None) } } } } /// Get a blob by CID, decrypting it in the context of the given post. /// For public posts, returns raw blob data. For encrypted posts, decrypts with the post's CEK. pub async fn get_blob_for_post( &self, cid: &[u8; 32], post_id: &PostId, ) -> anyhow::Result>> { // Get raw blob data (local — no lock needed) let raw_data = match self.blob_store.get(cid)? { Some(d) => d, None => return Ok(None), }; // Single lock acquisition for all DB reads let (post, visibility, group_seeds) = { let storage = self.storage.get().await; let _ = storage.touch_blob_access(cid); match storage.get_post_with_visibility(post_id)? { Some((post, vis)) => { let seeds = if matches!(vis, PostVisibility::GroupEncrypted { .. }) { storage.get_all_group_seeds_map().unwrap_or_default() } else { std::collections::HashMap::new() }; (post, vis, seeds) } None => return Ok(Some(raw_data)), // No post context — return raw } }; // Lock released — decrypt without lock match &visibility { PostVisibility::Public => Ok(Some(raw_data)), _ => self.decrypt_blob_for_post(raw_data, &post, &visibility, &group_seeds), } } /// Prefetch blobs for recently synced posts from a peer. /// Scans recent posts (newest first) for missing blobs, caps at 20 per cycle. /// Runs outside any locks. const MAX_PREFETCH_PER_CYCLE: usize = 20; pub async fn prefetch_blobs_from_peer(&self, peer_id: &NodeId) { // Brief lock: get post IDs and their attachment info let posts_with_atts: Vec<(PostId, NodeId, Vec)> = { let storage = self.storage.get().await; let post_ids = storage.list_post_ids().unwrap_or_default(); let mut result = Vec::new(); for pid in post_ids { if result.len() >= Self::MAX_PREFETCH_PER_CYCLE { break; } if let Ok(Some(post)) = storage.get_post(&pid) { if !post.attachments.is_empty() { result.push((pid, post.author, post.attachments.clone())); } } } result }; // Lock released — check blob store and filter without lock let mut missing: Vec<(PostId, NodeId, Vec)> = Vec::new(); let mut total_missing = 0usize; for (pid, author, atts) in posts_with_atts { if total_missing >= Self::MAX_PREFETCH_PER_CYCLE { break; } let missing_atts: Vec<_> = atts.into_iter() .filter(|a| !self.blob_store.has(&a.cid)) .collect(); if !missing_atts.is_empty() { total_missing += missing_atts.len(); missing.push((pid, author, missing_atts)); } } if missing.is_empty() { return; } let mut fetched = 0usize; for (post_id, author, attachments) in &missing { for att in attachments { if fetched >= Self::MAX_PREFETCH_PER_CYCLE { break; } match self.fetch_blob_with_fallback( &att.cid, post_id, author, &att.mime_type, 0, ).await { Ok(Some(_)) => { fetched += 1; } Ok(None) => {} Err(e) => { tracing::debug!( cid = hex::encode(att.cid), error = %e, "Blob prefetch failed" ); } } } } if fetched > 0 { tracing::info!(fetched, peer = hex::encode(peer_id), "Prefetched blobs after sync"); } } /// Check if a blob exists locally. pub fn has_blob(&self, cid: &[u8; 32]) -> bool { self.blob_store.has(cid) } /// Fetch a blob from a peer, storing it locally and recording CDN metadata. pub async fn fetch_blob_from_peer( &self, cid: &[u8; 32], from_peer: &NodeId, post_id: &PostId, author: &NodeId, mime_type: &str, created_at: u64, ) -> anyhow::Result>> { // Check local first if let Some(data) = self.blob_store.get(cid)? { return Ok(Some(data)); } // Fetch with CDN metadata let (data, response) = self.network.fetch_blob_full(cid, from_peer).await?; if let Some(ref data) = data { // Store blob locally self.blob_store.store(cid, data)?; let storage = self.storage.get().await; storage.record_blob(cid, post_id, author, data.len() as u64, mime_type, created_at)?; // Store AuthorManifest if provided (extract from CdnManifest wrapper) if let Some(ref cdn_manifest) = response.manifest { if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) { let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default(); let _ = storage.store_cdn_manifest( cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at, ); } } // Record upstream source let source_addrs: Vec = response.manifest.as_ref() .map(|m| m.host_addresses.clone()) .unwrap_or_default(); let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs); } Ok(data) } /// Fetch a blob with CDN-aware cascade, preferring non-anchor sources to save anchor /// delivery budget: /// 1. Local → 2. Existing upstream → 3. Lateral peers (non-anchor first) /// → 4. Replicas → 5. Author → 6. Redirect peers /// Anchors are deprioritized at each step via storage-level ordering. pub async fn fetch_blob_with_fallback( &self, cid: &[u8; 32], post_id: &PostId, author: &NodeId, mime_type: &str, created_at: u64, ) -> anyhow::Result>> { // 1. Check local if let Some(data) = self.blob_store.get(cid)? { let storage = self.storage.get().await; let _ = storage.touch_blob_access(cid); return Ok(Some(data)); } // Collect redirect peers from responses in case we need them later let mut redirect_peers: Vec = Vec::new(); // 2. Try existing upstream (if we previously fetched this blob) let upstream = { let storage = self.storage.get().await; storage.get_blob_upstream(cid)? }; if let Some((upstream_nid, _upstream_addrs)) = upstream { match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => {} Err(e) => warn!(error = %e, "blob fetch from upstream failed"), } } // 3. Lateral N0-N2: mesh peers + N2 peers who have the author's posts // (sorted by get_lateral_blob_sources: non-anchors first) let lateral_sources = { let storage = self.storage.get().await; storage.get_lateral_blob_sources(author, post_id).unwrap_or_default() }; for lateral in lateral_sources { if lateral == *author { continue; // Author tried separately below } match self.network.fetch_blob_full(cid, &lateral).await { Ok((Some(data), response)) => { self.blob_store.store(cid, &data)?; let storage = self.storage.get().await; storage.record_blob(cid, post_id, author, data.len() as u64, mime_type, created_at)?; if let Some(ref cdn_manifest) = response.manifest { if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) { let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default(); let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at); } } let _ = storage.store_blob_upstream(cid, &lateral, &[]); return Ok(Some(data)); } Ok((None, response)) => { redirect_peers.extend(response.cdn_redirect_peers); } Err(e) => warn!(peer = hex::encode(lateral), error = %e, "lateral blob fetch failed"), } } // 4. Try replica peers (before author — replicas are often closer/cheaper) let replicas = { let storage = self.storage.get().await; storage.get_replica_peers(post_id, 3_600_000)? }; for replica in replicas { match self.fetch_blob_from_peer(cid, &replica, post_id, author, mime_type, created_at).await { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => {} Err(e) => warn!(peer = hex::encode(replica), error = %e, "blob fetch from replica failed"), } } // 5. Try author match self.fetch_blob_from_peer(cid, author, post_id, author, mime_type, created_at).await { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => {} Err(e) => warn!(error = %e, "blob fetch from author failed"), } // 6. Try redirect peers (from any step that returned cdn_redirect_peers) for rp in &redirect_peers { if let Ok(nid_bytes) = hex::decode(&rp.n) { if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) { match self.fetch_blob_from_peer(cid, &nid, post_id, author, mime_type, created_at).await { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => {} Err(e) => warn!(peer = &rp.n, error = %e, "redirect blob fetch failed"), } } } } Ok(None) } // ---- Circles ---- pub async fn create_circle(&self, name: String) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.create_circle(&name)?; drop(storage); self.create_group_key_for_circle(&name).await?; Ok(()) } pub async fn delete_circle(&self, name: String) -> anyhow::Result<()> { let storage = self.storage.get().await; // Delete group key and associated data if let Ok(Some(gk)) = storage.get_group_key_by_circle(&name) { let _ = storage.delete_group_key(&gk.group_id); } storage.delete_circle(&name) } pub async fn add_to_circle(&self, circle_name: String, node_id: NodeId) -> anyhow::Result<()> { { let storage = self.storage.get().await; storage.add_circle_member(&circle_name, &node_id)?; } // Wrap current group key for new member and distribute let distribute_payload = { let storage = self.storage.get().await; if let Ok(Some(gk)) = storage.get_group_key_by_circle(&circle_name) { if gk.admin == self.node_id { if let Ok(Some(seed)) = storage.get_group_seed(&gk.group_id, gk.epoch) { match crypto::wrap_group_key_for_member(&self.secret_seed, &node_id, &seed) { Ok(wrapped) => { let mk = crate::types::GroupMemberKey { member: node_id, epoch: gk.epoch, wrapped_group_key: wrapped, }; let _ = storage.store_group_member_key(&gk.group_id, &mk); Some(crate::protocol::GroupKeyDistributePayload { group_id: gk.group_id, circle_name: circle_name.clone(), epoch: gk.epoch, group_public_key: gk.group_public_key, admin: self.node_id, member_keys: vec![mk], }) } Err(e) => { warn!(error = %e, "Failed to wrap group key for new member"); None } } } else { None } } else { None } } else { None } }; if let Some(payload) = distribute_payload { self.network.push_group_key(&node_id, &payload).await; } Ok(()) } pub async fn remove_from_circle( &self, circle_name: String, node_id: NodeId, ) -> anyhow::Result<()> { { let storage = self.storage.get().await; storage.remove_circle_member(&circle_name, &node_id)?; } // Rotate group key if we're the admin self.rotate_group_key(&circle_name).await; Ok(()) } /// Create a group key for a circle (called on circle creation). async fn create_group_key_for_circle(&self, circle_name: &str) -> anyhow::Result<()> { let (seed, pubkey) = crypto::generate_group_keypair(); let group_id = crypto::compute_group_id(&pubkey); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let record = crate::types::GroupKeyRecord { group_id, circle_name: circle_name.to_string(), epoch: 1, group_public_key: pubkey, admin: self.node_id, created_at: now, }; let storage = self.storage.get().await; storage.create_group_key(&record, Some(&seed))?; storage.store_group_seed(&group_id, 1, &seed)?; // Wrap for ourselves let self_wrapped = crypto::wrap_group_key_for_member(&self.secret_seed, &self.node_id, &seed)?; let self_mk = crate::types::GroupMemberKey { member: self.node_id, epoch: 1, wrapped_group_key: self_wrapped, }; storage.store_group_member_key(&group_id, &self_mk)?; // Wrap for existing circle members and distribute let members = storage.get_circle_members(circle_name)?; drop(storage); for member in &members { if *member == self.node_id { continue; } match crypto::wrap_group_key_for_member(&self.secret_seed, member, &seed) { Ok(wrapped) => { let mk = crate::types::GroupMemberKey { member: *member, epoch: 1, wrapped_group_key: wrapped, }; { let storage = self.storage.get().await; let _ = storage.store_group_member_key(&group_id, &mk); } let payload = crate::protocol::GroupKeyDistributePayload { group_id, circle_name: circle_name.to_string(), epoch: 1, group_public_key: pubkey, admin: self.node_id, member_keys: vec![mk], }; self.network.push_group_key(member, &payload).await; } Err(e) => { warn!(member = hex::encode(member), error = %e, "Failed to wrap group key for member"); } } } info!(circle = %circle_name, group_id = hex::encode(group_id), "Created group key for circle"); Ok(()) } /// Rotate the group key for a circle (called on member removal). async fn rotate_group_key(&self, circle_name: &str) { let rotate_result = { let storage = self.storage.get().await; let gk = match storage.get_group_key_by_circle(circle_name) { Ok(Some(gk)) if gk.admin == self.node_id => gk, _ => return, }; let remaining_members = match storage.get_circle_members(circle_name) { Ok(m) => m, Err(_) => return, }; // Always include ourselves let mut all_members = remaining_members; if !all_members.contains(&self.node_id) { all_members.push(self.node_id); } match crypto::rotate_group_key(&self.secret_seed, gk.epoch, &all_members) { Ok((new_seed, new_pubkey, new_epoch, member_keys)) => { Some((gk.group_id, new_seed, new_pubkey, new_epoch, member_keys, circle_name.to_string())) } Err(e) => { warn!(error = %e, "Failed to rotate group key"); None } } }; if let Some((group_id, new_seed, new_pubkey, new_epoch, member_keys, circle_name)) = rotate_result { // Update storage { let storage = self.storage.get().await; let _ = storage.update_group_epoch(&group_id, new_epoch, &new_pubkey, Some(&new_seed)); let _ = storage.store_group_seed(&group_id, new_epoch, &new_seed); for mk in &member_keys { let _ = storage.store_group_member_key(&group_id, mk); } } // Distribute to each member for mk in &member_keys { if mk.member == self.node_id { continue; } let payload = crate::protocol::GroupKeyDistributePayload { group_id, circle_name: circle_name.clone(), epoch: new_epoch, group_public_key: new_pubkey, admin: self.node_id, member_keys: vec![mk.clone()], }; self.network.push_group_key(&mk.member, &payload).await; } info!(circle = %circle_name, epoch = new_epoch, "Rotated group key"); } } pub async fn list_circles(&self) -> anyhow::Result> { let storage = self.storage.get().await; storage.list_circles() } // ---- Circle Profiles ---- /// Set a circle profile: store locally, encrypt with group key, push to connected peers. pub async fn set_circle_profile( &self, circle_name: String, display_name: String, bio: String, avatar_cid: Option<[u8; 32]>, ) -> anyhow::Result { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let cp = crate::types::CircleProfile { author: self.node_id, circle_name: circle_name.clone(), display_name, bio, avatar_cid, updated_at: now, }; // Get group key for this circle let (encrypted_payload, wrapped_cek, group_id, epoch) = { let storage = self.storage.get().await; // Verify circle exists let circles = storage.list_circles()?; if !circles.iter().any(|c| c.name == circle_name) { anyhow::bail!("circle '{}' does not exist", circle_name); } let gk = storage.get_group_key_by_circle(&circle_name)? .ok_or_else(|| anyhow::anyhow!("no group key for circle '{}'", circle_name))?; if gk.admin != self.node_id { anyhow::bail!("not admin of circle '{}'", circle_name); } let seed = storage.get_group_seed(&gk.group_id, gk.epoch)? .ok_or_else(|| anyhow::anyhow!("group seed not found for circle '{}'", circle_name))?; // Encrypt circle profile as JSON let json = serde_json::to_string(&cp)?; let (encrypted, wrapped) = crypto::encrypt_post_for_group(&json, &seed, &gk.group_public_key)?; // Store plaintext + encrypted form storage.set_circle_profile(&cp)?; storage.store_remote_circle_profile( &self.node_id, &circle_name, &cp, &encrypted, &wrapped, &gk.group_id, gk.epoch, )?; (encrypted, wrapped, gk.group_id, gk.epoch) }; // Push to all connected mesh peers let payload = crate::protocol::CircleProfileUpdatePayload { author: self.node_id, circle_name, group_id, epoch, encrypted_payload, wrapped_cek, updated_at: now, }; let pushed = self.network.push_circle_profile(&payload).await; if pushed > 0 { info!(pushed, "Pushed circle profile update to peers"); } Ok(cp) } /// Delete a circle profile and push tombstone. pub async fn delete_circle_profile(&self, circle_name: String) -> anyhow::Result<()> { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let payload = { let storage = self.storage.get().await; let gk = storage.get_group_key_by_circle(&circle_name)? .ok_or_else(|| anyhow::anyhow!("no group key for circle '{}'", circle_name))?; let seed = storage.get_group_seed(&gk.group_id, gk.epoch)? .ok_or_else(|| anyhow::anyhow!("group seed not found"))?; // Encrypt empty string as tombstone let (encrypted, wrapped) = crypto::encrypt_post_for_group("", &seed, &gk.group_public_key)?; storage.delete_circle_profile(&self.node_id, &circle_name)?; crate::protocol::CircleProfileUpdatePayload { author: self.node_id, circle_name, group_id: gk.group_id, epoch: gk.epoch, encrypted_payload: encrypted, wrapped_cek: wrapped, updated_at: now, } }; self.network.push_circle_profile(&payload).await; Ok(()) } /// Set public_visible flag and push profile update. pub async fn set_public_visible(&self, visible: bool) -> anyhow::Result<()> { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let recent_peers = self.current_recent_peers().await; let profile = { let storage = self.storage.get().await; let existing = storage.get_profile(&self.node_id)?; let (display_name, bio, avatar_cid) = match existing { Some(p) => (p.display_name, p.bio, p.avatar_cid), None => (String::new(), String::new(), None), }; let existing_anchors = storage.get_peer_anchors(&self.node_id).unwrap_or_default(); let preferred_peers = storage.list_preferred_peers().unwrap_or_default(); let profile = PublicProfile { node_id: self.node_id, display_name, bio, updated_at: now, anchors: existing_anchors, recent_peers, preferred_peers, public_visible: visible, avatar_cid, }; storage.store_profile(&profile)?; profile }; self.network.push_profile(&profile).await; Ok(()) } /// Resolve display info for any peer, taking circle profiles into account. pub async fn resolve_display_name( &self, author: &NodeId, ) -> anyhow::Result<(String, String, Option<[u8; 32]>)> { let storage = self.storage.get().await; storage.resolve_display_for_peer(author, &self.node_id) } /// Get our own circle profile for a given circle. pub async fn get_circle_profile( &self, circle_name: &str, ) -> anyhow::Result> { let storage = self.storage.get().await; storage.get_circle_profile(&self.node_id, circle_name) } /// Get the public_visible setting for our own profile. pub async fn get_public_visible(&self) -> anyhow::Result { let storage = self.storage.get().await; Ok(storage .get_profile(&self.node_id)? .map(|p| p.public_visible) .unwrap_or(true)) } // ---- Settings ---- /// Get a setting value by key. pub async fn get_setting(&self, key: &str) -> anyhow::Result> { let storage = self.storage.get().await; storage.get_setting(key) } /// Set a setting value (upsert). pub async fn set_setting(&self, key: &str, value: &str) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.set_setting(key, value) } // ---- Cache stats & pressure ---- /// Get cache statistics: (used_bytes, max_bytes, blob_count). /// max_bytes comes from the `cache_size_bytes` setting (default 1 GB, 0 = unlimited). pub async fn get_cache_stats(&self) -> anyhow::Result<(u64, u64, u64)> { let storage = self.storage.get().await; let used = storage.total_blob_bytes()?; let count = storage.count_blobs()?; let max_str = storage.get_setting("cache_size_bytes")?.unwrap_or_default(); let max: u64 = max_str.parse().unwrap_or(1_073_741_824); Ok((used, max, count)) } /// Compute cache pressure score (0-255). /// 0 = no pressure (plenty of room or cache empty). /// 255 = maximum pressure (lowest-priority blob is >72 h old). /// Scales linearly: 0 h → 0, 36 h → 128, 72 h → 255. pub async fn compute_cache_pressure(&self) -> anyhow::Result { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let staleness_ms = 3600 * 1000; let (candidates, follows, audience_members) = { let storage = self.storage.get().await; let candidates = storage.get_eviction_candidates(staleness_ms)?; let follows = storage.list_follows().unwrap_or_default(); let audience = storage.list_audience_members().unwrap_or_default(); (candidates, follows, audience) }; if candidates.is_empty() { return Ok(255); // Empty cache = max willingness to accept } // Filter to non-elevated blobs (not pinned, not own content, not followed author) let non_elevated: Vec<_> = candidates.iter().filter(|c| { !c.pinned && c.author != self.node_id && !follows.contains(&c.author) }).collect(); if non_elevated.is_empty() { return Ok(255); // All blobs are elevated — plenty of room for new content } // Find the lowest priority (oldest/least-valuable) blob let mut min_priority = f64::MAX; let mut min_created_at = u64::MAX; for c in &non_elevated { let priority = self.compute_blob_priority(c, &follows, &audience_members, now); if priority < min_priority { min_priority = priority; min_created_at = c.created_at; } } // Scale based on age of the oldest non-elevated blob let age_hours = now.saturating_sub(min_created_at) as f64 / (3600.0 * 1000.0); let pressure = if age_hours >= 72.0 { 255 } else { ((age_hours / 72.0) * 255.0) as u8 }; Ok(pressure) } // ---- Seen engagement tracking ---- /// Get seen engagement counts for a post. pub async fn get_seen_engagement(&self, post_id: &PostId) -> anyhow::Result<(u32, u32)> { let storage = self.storage.get().await; storage.get_seen_engagement(post_id) } /// Mark a post's engagement as seen (upsert). pub async fn set_seen_engagement(&self, post_id: &PostId, react_count: u32, comment_count: u32) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.set_seen_engagement(post_id, react_count, comment_count) } /// Get last-read timestamp for a conversation partner. pub async fn get_last_read_message(&self, partner_id: &NodeId) -> anyhow::Result { let storage = self.storage.get().await; storage.get_last_read_message(partner_id) } /// Mark a conversation as read up to the given timestamp. pub async fn set_last_read_message(&self, partner_id: &NodeId, timestamp_ms: u64) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.set_last_read_message(partner_id, timestamp_ms) } // ---- Delete / Revocation ---- pub async fn delete_post(&self, post_id: &PostId) -> anyhow::Result<()> { let post = { let storage = self.storage.get().await; storage .get_post(post_id)? .ok_or_else(|| anyhow::anyhow!("post not found"))? }; if post.author != self.node_id { anyhow::bail!("cannot delete: you are not the author"); } let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let signature = crypto::sign_delete(&self.secret_seed, post_id); let record = DeleteRecord { post_id: *post_id, author: self.node_id, timestamp_ms: now, signature, }; // Collect blob CIDs + CDN peers before cleanup let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec)>, Option<(NodeId, Vec)>)> = { let storage = self.storage.get().await; let cids = storage.get_blobs_for_post(post_id).unwrap_or_default(); cids.into_iter().map(|cid| { let downstream = storage.get_blob_downstream(&cid).unwrap_or_default(); let upstream = storage.get_blob_upstream(&cid).ok().flatten(); (cid, downstream, upstream) }).collect() }; // Clean up blobs (DB metadata + CDN metadata + filesystem) let blob_cids = { let storage = self.storage.get().await; let cids = storage.delete_blobs_for_post(post_id)?; for cid in &cids { let _ = storage.cleanup_cdn_for_blob(cid); } cids }; for cid in &blob_cids { if let Err(e) = self.blob_store.delete(cid) { warn!(cid = hex::encode(cid), error = %e, "Failed to delete blob file"); } } { let storage = self.storage.get().await; storage.store_delete(&record)?; storage.apply_delete(&record)?; } // Send CDN delete notices for each blob for (cid, downstream, upstream) in &blob_cdn_info { self.network.send_blob_delete_notices(cid, downstream, upstream.as_ref()).await; } let pushed = self.network.push_delete(&record).await; info!(post_id = hex::encode(post_id), pushed, blobs_removed = blob_cids.len(), "Deleted post"); Ok(()) } pub async fn revoke_post_access( &self, post_id: &PostId, revoked: &NodeId, mode: RevocationMode, ) -> anyhow::Result> { let (post, visibility) = { let storage = self.storage.get().await; storage .get_post_with_visibility(post_id)? .ok_or_else(|| anyhow::anyhow!("post not found"))? }; if post.author != self.node_id { anyhow::bail!("cannot revoke: you are not the author"); } let existing_recipients = match &visibility { PostVisibility::Public => anyhow::bail!("cannot revoke access on a public post"), PostVisibility::Encrypted { recipients } => recipients, PostVisibility::GroupEncrypted { .. } => { anyhow::bail!("cannot revoke individual access on a group-encrypted post; remove from circle instead") } }; let new_recipient_ids: Vec = existing_recipients .iter() .map(|wk| wk.recipient) .filter(|r| r != revoked) .collect(); if new_recipient_ids.len() == existing_recipients.len() { anyhow::bail!("revoked node was not a recipient of this post"); } match mode { RevocationMode::SyncAccessList => { let new_wrapped = crypto::rewrap_visibility( &self.secret_seed, &self.node_id, existing_recipients, &new_recipient_ids, )?; let new_vis = PostVisibility::Encrypted { recipients: new_wrapped, }; { let storage = self.storage.get().await; storage.update_post_visibility(post_id, &new_vis)?; } let update = VisibilityUpdate { post_id: *post_id, author: self.node_id, visibility: new_vis, }; let pushed = self.network.push_visibility(&update).await; info!(post_id = hex::encode(post_id), pushed, "Revoked access (sync mode)"); Ok(None) } RevocationMode::ReEncrypt => { let (new_content, new_wrapped) = crypto::re_encrypt_post( &post.content, &self.secret_seed, &self.node_id, existing_recipients, &new_recipient_ids, )?; let new_vis = PostVisibility::Encrypted { recipients: new_wrapped, }; let new_post = Post { author: self.node_id, content: new_content, attachments: post.attachments.clone(), timestamp_ms: post.timestamp_ms, }; let new_post_id = compute_post_id(&new_post); { let storage = self.storage.get().await; storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?; } // delete_post already pushes the DeleteRecord self.delete_post(post_id).await?; // Push replacement post directly to remaining recipients self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await; info!( old_id = hex::encode(post_id), new_id = hex::encode(new_post_id), "Re-encrypted post (revoke)" ); Ok(Some(new_post_id)) } } } pub async fn revoke_circle_access( &self, circle_name: &str, revoked: &NodeId, mode: RevocationMode, ) -> anyhow::Result { let posts = { let storage = self.storage.get().await; storage.find_posts_by_circle_intent(circle_name, &self.node_id)? }; let mut count = 0; for (post_id, _post, vis) in &posts { if let PostVisibility::Encrypted { recipients } = vis { if recipients.iter().any(|wk| &wk.recipient == revoked) { match self.revoke_post_access(post_id, revoked, mode).await { Ok(_) => count += 1, Err(e) => { warn!( post_id = hex::encode(post_id), error = %e, "Failed to revoke post access" ); } } } } } info!(circle = circle_name, count, "Revoked circle access"); Ok(count) } pub async fn get_redundancy_summary(&self) -> anyhow::Result<(usize, usize, usize, usize)> { let storage = self.storage.get().await; storage.get_redundancy_summary(&self.node_id, 3_600_000) } // ---- Networking ---- pub fn endpoint_addr(&self) -> iroh::EndpointAddr { self.network.endpoint_addr() } /// Connect to a peer by node ID using address resolution: /// 0. Already connected or has session → done /// 1. Social route cache → try cached address /// 2. Peers table → connect directly /// 3. N2/N3 lookup → ask tagged reporter for address /// 4. Worm lookup → fan-out search beyond N3 /// 5. Relay introduction → coordinate hole punch via relay peer /// 6. Session relay fallback → pipe through intermediary pub async fn connect_by_node_id(&self, peer_id: NodeId) -> anyhow::Result<()> { if self.network.is_connected(&peer_id).await { return Ok(()); } // Check if we already have a session connection if self.network.conn_handle().has_session(&peer_id).await { return Ok(()); } // Check if this peer is known to be behind NAT / unreachable directly let skip_direct = self.network.conn_handle().is_likely_unreachable(&peer_id).await; // Step 0: Try social route cache (skipped for known-unreachable peers) if !skip_direct { let storage = self.storage.get().await; if let Some(route) = storage.get_social_route(&peer_id)? { // Try cached addresses directly for addr in &route.addresses { let endpoint_id = match iroh::EndpointId::from_bytes(&peer_id) { Ok(eid) => eid, Err(_) => continue, }; let ep_addr = iroh::EndpointAddr::from(endpoint_id).with_ip_addr(*addr); drop(storage); if self.network.connect_to_peer(peer_id, ep_addr).await.is_ok() { info!(peer = hex::encode(peer_id), "Connected via social route cache"); return Ok(()); } // Re-acquire lock for next iteration break; // Only try first address from route directly } // Try peer_addresses: connect to their known peers and ask for target for pa in &route.peer_addresses { if let Ok(pa_nid) = crate::parse_node_id_hex(&pa.n) { if self.network.is_connected(&pa_nid).await { // Already connected to this peer — ask them let resolved = self.network.conn_handle().resolve_address(&peer_id).await.unwrap_or(None); if let Some(addr_str) = resolved { if let Ok((_nid, ep_addr)) = crate::parse_connect_string( &format!("{}@{}", hex::encode(peer_id), addr_str) ) { if self.network.connect_to_peer(peer_id, ep_addr).await.is_ok() { info!(peer = hex::encode(peer_id), via = &pa.n[..12], "Connected via social route peer referral"); return Ok(()); } } } } else if let Some(pa_addr_str) = pa.a.first() { // Try connecting to the peer first, then ask if let Ok(pa_sock) = pa_addr_str.parse::() { let pa_eid = match iroh::EndpointId::from_bytes(&pa_nid) { Ok(eid) => eid, Err(_) => continue, }; let pa_ep = iroh::EndpointAddr::from(pa_eid).with_ip_addr(pa_sock); if self.network.connect_to_peer(pa_nid, pa_ep).await.is_ok() { let resolved = self.network.conn_handle().resolve_address(&peer_id).await.unwrap_or(None); if let Some(addr_str) = resolved { if let Ok((_nid, ep_addr)) = crate::parse_connect_string( &format!("{}@{}", hex::encode(peer_id), addr_str) ) { if self.network.connect_to_peer(peer_id, ep_addr).await.is_ok() { info!(peer = hex::encode(peer_id), via = &pa.n[..12], "Connected via social route peer referral (new conn)"); return Ok(()); } } } } } } } } } } // Steps 1-4: Direct connection attempts (skipped for known-unreachable peers) if !skip_direct { // Step 1: Try direct address from peers table if let Some(addr) = self.network.addr_from_storage(&peer_id).await { if self.network.connect_to_peer(peer_id, addr).await.is_ok() { return Ok(()); } } // Step 2-3: Try address resolution via N2/N3 let resolved = self.network.conn_handle().resolve_address(&peer_id).await.unwrap_or(None); if let Some(addr_str) = resolved { if let Ok(addr) = crate::parse_connect_string(&format!("{}@{}", hex::encode(peer_id), addr_str)) { if self.network.connect_to_peer(peer_id, addr.1).await.is_ok() { return Ok(()); } } } // Step 4: Try worm lookup (fan-out search beyond N3) info!(peer = hex::encode(peer_id), "Trying worm lookup..."); if let Ok(Some(wr)) = self.network.worm_lookup(&peer_id).await { if wr.node_id == peer_id { if let Some(addr_str) = wr.addresses.first() { if let Ok(addr) = crate::parse_connect_string(&format!("{}@{}", hex::encode(peer_id), addr_str)) { if self.network.connect_to_peer(peer_id, addr.1).await.is_ok() { return Ok(()); } } } } else { info!( target = hex::encode(peer_id), found_via = hex::encode(wr.node_id), "Worm found target via recent peer" ); if let Some(addr_str) = wr.addresses.first() { if let Ok(needle_addr) = crate::parse_connect_string(&format!("{}@{}", hex::encode(wr.node_id), addr_str)) { if self.network.connect_to_peer(wr.node_id, needle_addr.1).await.is_ok() { let resolved = self.network.conn_handle().resolve_address(&peer_id).await.unwrap_or(None); if let Some(target_addr_str) = resolved { if let Ok(target_addr) = crate::parse_connect_string(&format!("{}@{}", hex::encode(peer_id), target_addr_str)) { if self.network.connect_to_peer(peer_id, target_addr.1).await.is_ok() { return Ok(()); } } } } } } } } // All direct attempts failed — mark peer as likely unreachable self.network.conn_handle().mark_unreachable(&peer_id); } // Step 6: Relay introduction — find relay peer(s) and request introduction { let on_cooldown = { let storage = self.storage.get().await; storage.is_relay_cooldown(&peer_id, 300_000).unwrap_or(false) }; if !on_cooldown { let relay_candidates = self.network.conn_handle().find_relays_for(&peer_id).await; let mut had_capacity_reject = false; let mut last_intro_id: Option = None; let mut last_relay_peer: Option = None; let mut last_relay_available = false; for (relay_peer, ttl) in &relay_candidates { info!( target = hex::encode(peer_id), relay = hex::encode(relay_peer), ttl, "Attempting relay introduction" ); let intro_result = tokio::time::timeout( std::time::Duration::from_secs(15), self.network.send_relay_introduce_standalone(relay_peer, &peer_id, *ttl), ).await; match intro_result { Ok(Ok(result)) if result.accepted => { info!( target = hex::encode(peer_id), addrs = ?result.target_addresses, relay_available = result.relay_available, "Relay introduction accepted, attempting hole punch" ); // Save for potential session relay fallback last_intro_id = Some(result.intro_id); last_relay_peer = Some(*relay_peer); last_relay_available = result.relay_available; // Try direct connection to target's addresses (hole punch with scanning) let our_profile = self.network.conn_handle().our_nat_profile().await; let peer_profile = { let s = self.storage.get().await; s.get_peer_nat_profile(&peer_id) }; if let Some(conn) = crate::connection::hole_punch_with_scanning( self.network.endpoint(), &peer_id, &result.target_addresses, our_profile, peer_profile, ).await { self.network.conn_handle().add_session(peer_id, conn, SessionReachMethod::HolePunch, None).await; self.network.conn_handle().mark_reachable(&peer_id); info!(peer = hex::encode(peer_id), "Connected via hole punch"); return Ok(()); } // Intro accepted but hole punch failed — try session relay below break; } Ok(Ok(result)) => { let reason = result.reject_reason.as_deref().unwrap_or("unknown"); if reason.contains("capacity") { debug!( relay = hex::encode(relay_peer), "Relay at capacity, trying next candidate" ); had_capacity_reject = true; continue; // Try next relay candidate } debug!( target = hex::encode(peer_id), reason, "Relay introduction rejected" ); // Target explicitly rejected — don't try more relays break; } Ok(Err(e)) => { debug!(error = %e, "Relay introduction failed, trying next candidate"); continue; // Network error — try next relay } Err(_) => { debug!("Relay introduction timed out, trying next candidate"); continue; // Timeout — try next relay } } } // Step 7: Session relay fallback — if intro was accepted but hole punch failed if let (Some(intro_id), Some(relay_peer)) = (last_intro_id, last_relay_peer) { if last_relay_available { info!( target = hex::encode(peer_id), relay = hex::encode(relay_peer), "Hole punch failed, attempting session relay" ); match self.attempt_session_relay(&relay_peer, &peer_id, &intro_id).await { Ok(()) => { info!(peer = hex::encode(peer_id), "Connected via session relay"); return Ok(()); } Err(e) => { debug!(error = %e, "Session relay failed"); } } } } // Record cooldown on failure (skip if all rejections were capacity-related) if !relay_candidates.is_empty() && !had_capacity_reject { let storage = self.storage.get().await; let _ = storage.record_relay_miss(&peer_id); } } } anyhow::bail!( "cannot resolve address for peer {} (tried social routes, peers table, N2/N3, worm lookup, and relay introduction)", hex::encode(peer_id) ) } /// Attempt to establish a session relay through an intermediary. async fn attempt_session_relay( &self, relay_peer: &NodeId, target: &NodeId, intro_id: &crate::connection::IntroId, ) -> anyhow::Result<()> { use crate::protocol::{ write_typed_message, MessageType, SessionRelayPayload, }; let relay_conn = self.network.conn_handle().get_connection(relay_peer).await .ok_or_else(|| anyhow::anyhow!("relay peer disconnected"))?; let (mut send, _recv) = relay_conn.open_bi().await?; let payload = SessionRelayPayload { intro_id: *intro_id, target: *target, }; write_typed_message(&mut send, MessageType::SessionRelay, &payload).await?; self.network.conn_handle().add_session(*target, relay_conn, SessionReachMethod::Relayed, None).await; Ok(()) } /// Worm lookup: fan-out search for a peer beyond the 3-hop discovery map. pub async fn worm_lookup(&self, target: &NodeId) -> anyhow::Result> { self.network.worm_lookup(target).await } /// Connect to a peer and establish a mesh connection pub async fn sync_with(&self, peer_id: NodeId) -> anyhow::Result<()> { self.connect_by_node_id(peer_id).await?; let stats = self.network.conn_handle().pull_from_peer(&peer_id).await?; // Also fetch engagement data (reactions, comments) for posts we hold let engagement = self.network.conn_handle().fetch_engagement_from_peer(&peer_id).await.unwrap_or(0); info!( peer = hex::encode(peer_id), posts = stats.posts_received, engagement_headers = engagement, "Sync complete" ); // Prefetch blobs for posts we just received if stats.posts_received > 0 { self.prefetch_blobs_from_peer(&peer_id).await; } Ok(()) } /// Connect to a peer using full address pub async fn sync_with_addr(&self, addr: iroh::EndpointAddr) -> anyhow::Result<()> { let peer_id = *addr.id.as_bytes(); self.network.connect_to_peer(peer_id, addr).await?; let stats = self.network.conn_handle().pull_from_peer(&peer_id).await?; info!( peer = hex::encode(peer_id), posts = stats.posts_received, "Sync complete" ); Ok(()) } /// Pull from all connected peers pub async fn sync_all(&self) -> anyhow::Result<()> { let stats = self.network.pull_from_all().await?; info!( "Pull complete: {} posts from {} peers", stats.posts_received, stats.peers_pulled ); Ok(()) } pub async fn add_peer(&self, peer_id: NodeId) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.add_peer(&peer_id)?; Ok(()) } pub async fn list_peers(&self) -> anyhow::Result> { let storage = self.storage.get().await; storage.list_peers() } pub async fn list_peer_records(&self) -> anyhow::Result> { let storage = self.storage.get().await; storage.list_peer_records() } pub fn list_bootstrap_anchors(&self) -> &[(NodeId, iroh::EndpointAddr)] { &self.bootstrap_anchors } /// Get connection info for display: (node_id, slot_kind, connected_at) pub async fn list_connections(&self) -> Vec<(NodeId, PeerSlotKind, u64)> { self.network.connection_info().await } pub async fn stats(&self) -> anyhow::Result { let storage = self.storage.get().await; Ok(NodeStats { post_count: storage.post_count()?, peer_count: storage.list_peers()?.len(), follow_count: storage.list_follows()?.len(), }) } /// Start the accept loop (run in background) pub fn start_accept_loop(&self) -> tokio::task::JoinHandle> { let network = Arc::clone(&self.network); tokio::spawn(async move { network.run_accept_loop().await }) } /// Start pull cycle: Protocol v4 tiered pull — 60s ticks, full pull on first tick, /// then only pull for stale authors (last_sync_ms > 4 hours old). pub fn start_pull_cycle(self: &Arc, _interval_secs: u64) -> tokio::task::JoinHandle<()> { let node = Arc::clone(self); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); let mut is_first_tick = true; loop { interval.tick().await; if is_first_tick { // Full pull on startup let _ = node.network.pull_from_all().await; is_first_tick = false; // Prefetch after initial sync let peers = node.network.conn_handle().connected_peers().await; for peer_id in peers { node.prefetch_blobs_from_peer(&peer_id).await; } continue; } // Tiered: only pull for stale authors (4-hour default) let stale_authors = { let storage = node.storage.get().await; storage.get_stale_follows(4 * 3600 * 1000).unwrap_or_default() }; if stale_authors.is_empty() { continue; // Most ticks skip — no stale authors } // Find a connected peer and pull let peers = node.network.conn_handle().connected_peers().await; if let Some(peer_id) = peers.first() { match node.network.conn_handle().pull_from_peer(peer_id).await { Ok(stats) => { if stats.posts_received > 0 { tracing::debug!( posts = stats.posts_received, "Tiered pull complete" ); node.prefetch_blobs_from_peer(peer_id).await; } } Err(e) => tracing::debug!(error = %e, "Tiered pull failed"), } } } }) } /// Start diff cycle: every interval_secs, broadcast N1/N2 changes to connected peers. pub fn start_diff_cycle(&self, interval_secs: u64) -> tokio::task::JoinHandle<()> { let network = Arc::clone(&self.network); let full_sync_interval = (4 * 60 * 60) / interval_secs; // every 4 hours tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); let mut tick_count: u64 = 0; loop { interval.tick().await; tick_count += 1; if tick_count % full_sync_interval == 0 { // Full state re-broadcast every 4 hours to catch missed diffs match network.broadcast_full_state().await { Ok(count) => { if count > 0 { tracing::info!(count, "Full N1/N2 state broadcast (4h cycle)"); } } Err(e) => { tracing::debug!(error = %e, "Full state broadcast failed"); } } } else { match network.broadcast_diff().await { Ok(count) => { if count > 0 { tracing::debug!(count, "Broadcast routing diff"); } } Err(e) => { tracing::debug!(error = %e, "Routing diff broadcast failed"); } } } } }) } /// Start rebalance cycle: every interval_secs, rebalance connection slots. pub fn start_rebalance_cycle(&self, interval_secs: u64) -> tokio::task::JoinHandle<()> { let network = Arc::clone(&self.network); let timer = Arc::clone(&self.last_rebalance_ms); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); loop { interval.tick().await; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; timer.store(now, AtomicOrdering::Relaxed); if let Err(e) = network.rebalance().await { tracing::debug!(error = %e, "Rebalance failed"); } } }) } /// Start the reactive growth loop: wakes on signal, sequentially fills local /// slots with the most diverse N2 candidates. Each connection updates N2/N3 /// knowledge before picking the next candidate. pub fn start_growth_loop(&self) -> tokio::task::JoinHandle<()> { let network = Arc::clone(&self.network); let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { network.set_growth_tx(tx.clone()).await; // Initial kick: bootstrap may have already populated N2 before this started let _ = tx.try_send(()); network.run_growth_loop(rx).await; }) } /// Start recovery loop: triggered when mesh drops below 2 connections. /// Immediately reconnects to anchors and requests referrals. pub fn start_recovery_loop(&self) -> tokio::task::JoinHandle<()> { let network = Arc::clone(&self.network); let storage = Arc::clone(&self.storage); let node_id = self.node_id; let alog = Arc::clone(&self.activity_log); let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1); tokio::spawn(async move { let log_evt = |level: ActivityLevel, cat: ActivityCategory, msg: String, peer: Option| { if let Ok(mut log) = alog.try_lock() { log.log(level, cat, msg, peer); } }; network.set_recovery_tx(tx).await; while rx.recv().await.is_some() { tracing::info!("Recovery triggered: reconnecting to anchors"); log_evt(ActivityLevel::Warn, ActivityCategory::Recovery, "Recovery triggered: mesh empty".into(), None); // Debounce: wait briefly for more disconnects to settle tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Drain any queued signals while rx.try_recv().is_ok() {} // Gather anchors: known_anchors table, then anchor peers fallback let anchors: Vec<(crate::types::NodeId, Vec)> = { let s = storage.get().await; let known = s.list_known_anchors().unwrap_or_default(); if !known.is_empty() { known } else { s.list_anchor_peers().unwrap_or_default() .into_iter() .map(|r| (r.node_id, r.addresses)) .collect() } }; for (anchor_nid, anchor_addrs) in &anchors { if *anchor_nid == node_id { continue; } // Connect to anchor (mesh or session fallback) if !network.is_peer_connected_or_session(anchor_nid).await { let endpoint_id = match iroh::EndpointId::from_bytes(anchor_nid) { Ok(eid) => eid, Err(_) => continue, }; let mut addr = iroh::EndpointAddr::from(endpoint_id); for sa in anchor_addrs { addr = addr.with_ip_addr(*sa); } match network.connect_to_anchor(*anchor_nid, addr).await { Ok(()) => { log_evt(ActivityLevel::Info, ActivityCategory::Recovery, "Connected to anchor".into(), Some(*anchor_nid)); } Err(e) => { tracing::debug!(error = %e, "Recovery: anchor connect failed"); log_evt(ActivityLevel::Warn, ActivityCategory::Recovery, format!("Anchor connect failed: {}", e), Some(*anchor_nid)); continue; } } } // Register with anchor let _ = network.send_anchor_register(anchor_nid).await; // Request referrals match network.request_anchor_referrals(anchor_nid).await { Ok(referrals) => { for referral in referrals { if referral.node_id == node_id { continue; } if let Some(addr_str) = referral.addresses.first() { let connect_str = format!( "{}@{}", hex::encode(referral.node_id), addr_str, ); if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) { match network.connect_to_peer(rid, raddr).await { Ok(()) => { tracing::info!(peer = hex::encode(rid), "Recovery: connected to referred peer"); log_evt(ActivityLevel::Info, ActivityCategory::Recovery, "Connected to referred peer".into(), Some(rid)); } Err(_) => { match network.connect_via_introduction(rid, *anchor_nid).await { Ok(()) => { tracing::info!(peer = hex::encode(rid), "Recovery: connected via hole punch"); log_evt(ActivityLevel::Info, ActivityCategory::Recovery, "Connected via hole punch".into(), Some(rid)); } Err(e) => { tracing::debug!(error = %e, peer = hex::encode(rid), "Recovery: hole punch failed"); log_evt(ActivityLevel::Warn, ActivityCategory::Recovery, format!("Hole punch failed: {}", e), Some(rid)); } } } } } } } } Err(e) => tracing::debug!(error = %e, "Recovery: referral request failed"), } } let conn_count = network.connection_count().await; tracing::info!(connections = conn_count, "Recovery complete"); log_evt(ActivityLevel::Info, ActivityCategory::Recovery, format!("Recovery complete, {} connections", conn_count), None); } }) } /// Start social checkin cycle: every interval_secs, refresh stale social routes. /// Uses ephemeral connections if not persistently connected. pub fn start_social_checkin_cycle(&self, interval_secs: u64) -> tokio::task::JoinHandle<()> { let network = Arc::clone(&self.network); let storage = Arc::clone(&self.storage); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); loop { interval.tick().await; let stale = { let s = storage.get().await; s.list_stale_social_routes(interval_secs as u64 * 1000).unwrap_or_default() }; for route in stale { let our_addrs: Vec = network.endpoint_addr().ip_addrs() .map(|s| s.to_string()).collect(); let result = network.send_social_checkin( &route.node_id, &our_addrs, &[], ).await; match result { Ok(reply) => { let s = storage.get().await; let addrs: Vec = reply.addresses.iter() .filter_map(|a| a.parse().ok()).collect(); let _ = s.touch_social_route_connect( &reply.node_id, &addrs, ReachMethod::Direct, ); let _ = s.update_social_route_peer_addrs( &reply.node_id, &reply.peer_addresses, ); } Err(e) => { tracing::debug!( peer = hex::encode(route.node_id), error = %e, "Social checkin failed" ); } } } } }) } /// Register with all connected anchor peers. Returns count registered. pub async fn register_with_anchors(&self) -> usize { let conns = self.network.connection_info().await; let mut count = 0; for (nid, _, _) in &conns { if self.network.is_anchor_peer(nid).await { match self.network.send_anchor_register(nid).await { Ok(()) => { count += 1; info!(anchor = hex::encode(nid), "Registered with anchor"); } Err(e) => debug!(error = %e, anchor = hex::encode(nid), "Anchor register failed"), } } } count } /// Start anchor register cycle: periodically re-register with anchors and request referrals /// when connection count is low. pub fn start_anchor_register_cycle(&self, interval_secs: u64) -> tokio::task::JoinHandle<()> { let network = Arc::clone(&self.network); let storage = Arc::clone(&self.storage); let node_id = self.node_id; let alog = Arc::clone(&self.activity_log); let timer = Arc::clone(&self.last_anchor_register_ms); tokio::spawn(async move { let log_evt = |level: ActivityLevel, cat: ActivityCategory, msg: String, peer: Option| { if let Ok(mut log) = alog.try_lock() { log.log(level, cat, msg, peer); } }; let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); loop { interval.tick().await; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; timer.store(now, AtomicOrdering::Relaxed); // Re-register with connected anchors (mesh + session) let conns = network.connection_info().await; let session_peers = network.session_peer_ids().await; let mut registered_anchors = std::collections::HashSet::new(); // Mesh-connected anchors for (nid, _, _) in &conns { if network.is_anchor_peer(nid).await { match network.send_anchor_register(nid).await { Ok(()) => { log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Re-registered with anchor".into(), Some(*nid)); registered_anchors.insert(*nid); } Err(e) => { tracing::debug!(error = %e, "Anchor re-register failed"); log_evt(ActivityLevel::Warn, ActivityCategory::Anchor, format!("Re-register failed: {}", e), Some(*nid)); } } } } // Session-connected anchors (e.g. anchor with full mesh) for nid in &session_peers { if registered_anchors.contains(nid) { continue; } if network.is_anchor_peer(nid).await { match network.send_anchor_register(nid).await { Ok(()) => { log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Re-registered with anchor (session)".into(), Some(*nid)); } Err(e) => { tracing::debug!(error = %e, "Anchor session re-register failed"); } } } } // If few connections, try requesting referrals from known anchors let conn_count = network.connection_count().await; if conn_count < 10 { log_evt(ActivityLevel::Info, ActivityCategory::Anchor, format!("Low connections ({}), requesting referrals", conn_count), None); let known = { let s = storage.get().await; s.list_known_anchors().unwrap_or_default() }; for (anchor_nid, anchor_addrs) in known { if anchor_nid == node_id { continue; } // Connect if not already connected (mesh or session) if !network.is_peer_connected_or_session(&anchor_nid).await { let endpoint_id = match iroh::EndpointId::from_bytes(&anchor_nid) { Ok(eid) => eid, Err(_) => continue, }; let mut addr = iroh::EndpointAddr::from(endpoint_id); for sa in &anchor_addrs { addr = addr.with_ip_addr(*sa); } if let Err(e) = network.connect_to_anchor(anchor_nid, addr).await { tracing::debug!(error = %e, "Anchor cycle: connect failed"); continue; } } match network.request_anchor_referrals(&anchor_nid).await { Ok(referrals) => { for referral in referrals { if referral.node_id == node_id { continue; } if let Some(addr_str) = referral.addresses.first() { let connect_str = format!( "{}@{}", hex::encode(referral.node_id), addr_str, ); if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) { match network.connect_to_peer(rid, raddr).await { Ok(()) => { tracing::info!(peer = hex::encode(rid), "Anchor cycle: connected to referred peer"); log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Connected to referred peer".into(), Some(rid)); } Err(_) => { match network.connect_via_introduction(rid, anchor_nid).await { Ok(()) => { tracing::info!(peer = hex::encode(rid), "Anchor cycle: connected via hole punch"); log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Connected via hole punch".into(), Some(rid)); } Err(e) => { tracing::debug!(error = %e, peer = hex::encode(rid), "Anchor cycle: hole punch failed"); log_evt(ActivityLevel::Warn, ActivityCategory::Anchor, format!("Hole punch failed: {}", e), Some(rid)); } } } } } } } } Err(e) => tracing::debug!(error = %e, "Anchor cycle: referral request failed"), } } } // Anchor self-verification probe { let probe_due = network.conn_handle().probe_due().await; if probe_due { log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Initiating anchor self-verification probe".into(), None); match network.conn_handle().initiate_anchor_probe().await { Ok(true) => {}, // success already logged inside Ok(false) => {}, // failure already logged inside Err(e) => { tracing::debug!(error = %e, "Anchor probe error"); } } } } } }) } /// Start bootstrap connectivity check: 24 hours after startup, verify the bootstrap /// anchor is within our network knowledge (N1/N2/N3). If not, we may be in an isolated /// segment — reconnect to bootstrap and request referrals to bridge back. pub fn start_bootstrap_connectivity_check(self: &Arc) -> tokio::task::JoinHandle<()> { let node = Arc::clone(self); tokio::spawn(async move { // Wait 24 hours before first check tokio::time::sleep(std::time::Duration::from_secs(24 * 60 * 60)).await; let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 60 * 60)); loop { interval.tick().await; // Parse bootstrap anchor NodeId let bootstrap_nid = match crate::parse_connect_string(DEFAULT_ANCHOR) { Ok((nid, _)) => nid, Err(_) => continue, }; // Skip if we ARE the bootstrap if bootstrap_nid == node.node_id { continue; } // Check if bootstrap is in N1 (mesh), N2, or N3 let is_reachable = { let connected = node.network.is_connected(&bootstrap_nid).await; if connected { true } else { let storage = node.storage.get().await; let in_n2 = storage.find_in_n2(&bootstrap_nid).unwrap_or_default(); if !in_n2.is_empty() { true } else { let in_n3 = storage.find_in_n3(&bootstrap_nid).unwrap_or_default(); !in_n3.is_empty() } } }; if is_reachable { tracing::debug!("Bootstrap connectivity check: bootstrap in reach, network OK"); continue; } // Bootstrap not in N1/N2/N3 — we may be isolated tracing::info!("Bootstrap connectivity check: bootstrap not in reach, reconnecting"); // Connect to bootstrap and request referrals if let Err(e) = node.connect_by_node_id(bootstrap_nid).await { tracing::warn!(error = %e, "Bootstrap connectivity: failed to connect"); continue; } // Report bootstrap in our N1 for 24 hours so peers learn about it node.network.conn_handle().add_sticky_n1(&bootstrap_nid, 24 * 60 * 60 * 1000); match node.network.request_anchor_referrals(&bootstrap_nid).await { Ok(referrals) => { tracing::info!(count = referrals.len(), "Bootstrap connectivity: got referrals"); for referral in referrals { if referral.node_id == node.node_id { continue; } if let Some(addr_str) = referral.addresses.first() { let connect_str = format!("{}@{}", hex::encode(referral.node_id), addr_str); if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) { let _ = node.network.connect_to_peer(rid, raddr).await; } } } } Err(e) => { tracing::warn!(error = %e, "Bootstrap connectivity: referral request failed"); } } } }) } /// Start CDN manifest refresh cycle: periodically ask upstream for newer manifests. /// Manifests older than `max_age_ms` are refreshed from their upstream source. pub fn start_manifest_refresh_cycle(&self, interval_secs: u64, max_age_ms: u64) -> tokio::task::JoinHandle<()> { let network = Arc::clone(&self.network); let storage = Arc::clone(&self.storage); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); loop { interval.tick().await; let cutoff = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64 - max_age_ms; let stale = { let s = storage.get().await; s.get_stale_manifests(cutoff).unwrap_or_default() }; for (cid, upstream_nid, _upstream_addrs) in &stale { // Get current updated_at for this manifest let current_updated_at = { let s = storage.get().await; s.get_cdn_manifest(cid).ok().flatten() .and_then(|json| serde_json::from_str::(&json).ok()) .map(|m| m.updated_at) .unwrap_or(0) }; match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await { Ok(Some(cdn_manifest)) => { if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) { let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default(); let s = storage.get().await; let _ = s.store_cdn_manifest( cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at, ); // Relay to our downstream let downstream = s.get_blob_downstream(cid).unwrap_or_default(); drop(s); if !downstream.is_empty() { network.push_manifest_to_downstream(cid, &cdn_manifest).await; } tracing::debug!( cid = hex::encode(cid), "Refreshed stale manifest from upstream" ); } } Ok(None) => {} // No update available Err(e) => { tracing::debug!( cid = hex::encode(cid), upstream = hex::encode(upstream_nid), error = %e, "Manifest refresh from upstream failed" ); } } } } }) } /// Build our N+10:Addresses (our connected peers with their addresses). pub async fn build_peer_addresses(&self) -> Vec { let conns = self.network.connection_info().await; let storage = self.storage.get().await; let mut result = Vec::new(); for (nid, kind, _) in conns { if nid == self.node_id { continue; } // Prefer social peers if kind != PeerSlotKind::Local && result.len() >= 10 { continue; } let addrs: Vec = storage.get_peer_record(&nid) .ok() .flatten() .map(|r| r.addresses.iter().map(|a| a.to_string()).collect()) .unwrap_or_default(); result.push(PeerWithAddress { n: hex::encode(nid), a: addrs, }); if result.len() >= 10 { break; } } result } /// List all social routes (for CLI/Tauri display). pub async fn list_social_routes(&self) -> anyhow::Result> { let storage = self.storage.get().await; storage.list_social_routes() } // ---- Audience ---- pub async fn request_audience(&self, node_id: &NodeId) -> anyhow::Result<()> { { let storage = self.storage.get().await; storage.store_audience(node_id, AudienceDirection::Outbound, AudienceStatus::Pending)?; } // Send the request (persistent if available, ephemeral otherwise) if let Err(e) = self.network.send_audience_request(node_id).await { warn!(peer = hex::encode(node_id), error = %e, "Failed to send audience request"); } info!(peer = hex::encode(node_id), "Requested audience membership"); Ok(()) } pub async fn approve_audience(&self, node_id: &NodeId) -> anyhow::Result<()> { let connected = self.network.is_connected(node_id).await; { let storage = self.storage.get().await; storage.store_audience(node_id, AudienceDirection::Inbound, AudienceStatus::Approved)?; // Upsert social route (Audience or Mutual) let is_follow = storage.list_follows()?.contains(node_id); let relation = if is_follow { SocialRelation::Mutual } else { SocialRelation::Audience }; let addresses = storage.get_peer_record(node_id)? .map(|r| r.addresses).unwrap_or_default(); let peer_addresses = storage.build_peer_addresses_for(node_id)?; let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) .unwrap_or_default().as_millis() as u64; let preferred_tree = storage.build_preferred_tree_for(node_id).unwrap_or_default(); storage.upsert_social_route(&SocialRouteEntry { node_id: *node_id, addresses, peer_addresses, relation, status: if connected { SocialStatus::Online } else { SocialStatus::Disconnected }, last_connected_ms: 0, last_seen_ms: now, reach_method: ReachMethod::Direct, preferred_tree, })?; } // Send approval response (persistent if available, ephemeral otherwise) if let Err(e) = self.network.send_audience_response(node_id, true).await { warn!(peer = hex::encode(node_id), error = %e, "Failed to send audience approval"); } info!(peer = hex::encode(node_id), "Approved audience request"); Ok(()) } pub async fn deny_audience(&self, node_id: &NodeId) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.store_audience(node_id, AudienceDirection::Inbound, AudienceStatus::Denied)?; Ok(()) } pub async fn remove_audience(&self, node_id: &NodeId) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.remove_audience(node_id, AudienceDirection::Inbound)?; // Downgrade or remove social route let is_follow = storage.list_follows()?.contains(node_id); if is_follow { if let Some(mut route) = storage.get_social_route(node_id)? { route.relation = SocialRelation::Follow; storage.upsert_social_route(&route)?; } } else { storage.remove_social_route(node_id)?; } Ok(()) } pub async fn list_audience_members(&self) -> anyhow::Result> { let storage = self.storage.get().await; storage.list_audience_members() } pub async fn list_audience( &self, direction: AudienceDirection, status: Option, ) -> anyhow::Result> { let storage = self.storage.get().await; storage.list_audience(direction, status) } // ---- Blob Eviction ---- /// Compute priority score for a blob. Higher score = keep longer. pub fn compute_blob_priority( &self, candidate: &crate::storage::EvictionCandidate, follows: &[NodeId], audience_members: &[NodeId], now_ms: u64, ) -> f64 { compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms) } /// Delete a blob with CDN notifications to upstream/downstream. pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> { // Gather CDN peers before cleanup let (downstream, upstream) = { let storage = self.storage.get().await; let ds = storage.get_blob_downstream(cid).unwrap_or_default(); let up = storage.get_blob_upstream(cid).ok().flatten(); (ds, up) }; // Send CDN delete notices self.network.send_blob_delete_notices(cid, &downstream, upstream.as_ref()).await; // Clean up local storage { let storage = self.storage.get().await; storage.cleanup_cdn_for_blob(cid)?; storage.remove_blob(cid)?; } // Delete from filesystem let _ = self.blob_store.delete(cid); Ok(()) } /// Evict lowest-priority blobs until total storage is under max_bytes. pub async fn evict_blobs(&self, max_bytes: u64) -> anyhow::Result { let total = { let storage = self.storage.get().await; storage.total_blob_bytes()? }; if total <= max_bytes { return Ok(0); } let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; // 1-hour staleness for replica counts let staleness_ms = 3600 * 1000; let (candidates, follows, audience_members) = { let storage = self.storage.get().await; let candidates = storage.get_eviction_candidates(staleness_ms)?; let follows = storage.list_follows().unwrap_or_default(); let audience = storage.list_audience_members().unwrap_or_default(); (candidates, follows, audience) }; // Score and sort ascending (lowest priority first) let mut scored: Vec<(f64, &crate::storage::EvictionCandidate)> = candidates .iter() .map(|c| (self.compute_blob_priority(c, &follows, &audience_members, now), c)) .collect(); scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)); let mut bytes_freed: u64 = 0; let target_free = total - max_bytes; let mut evicted = 0; for (score, candidate) in &scored { if bytes_freed >= target_free { break; } if let Err(e) = self.delete_blob_with_cdn_notify(&candidate.cid).await { warn!(cid = hex::encode(candidate.cid), error = %e, "Failed to evict blob"); continue; } bytes_freed += candidate.size_bytes; evicted += 1; info!( cid = hex::encode(candidate.cid), score = score, size = candidate.size_bytes, "Evicted blob" ); } info!(evicted, bytes_freed, "Blob eviction complete"); Ok(evicted) } /// Start a periodic eviction cycle. pub fn start_eviction_cycle( node: Arc, interval_secs: u64, max_bytes: u64, ) -> tokio::task::JoinHandle<()> where Self: Send + Sync + 'static, { tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); loop { interval.tick().await; match node.evict_blobs(max_bytes).await { Ok(0) => {} Ok(n) => info!(evicted = n, "Eviction cycle complete"), Err(e) => warn!(error = %e, "Eviction cycle failed"), } } }) } /// Start UPnP lease renewal cycle. Renews every lease_secs/2. /// On 3 consecutive failures: clears is_anchor and logs a warning. pub fn start_upnp_renewal_cycle(&self) -> Option> { let mapping = self.network.upnp_mapping()?; let local_port = mapping.local_port; let external_port = mapping.external_addr.port(); let interval_secs = (mapping.lease_secs / 2) as u64; let network = Arc::clone(&self.network); let alog = Arc::clone(&self.activity_log); Some(tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); let mut consecutive_failures: u32 = 0; loop { interval.tick().await; if crate::upnp::renew_upnp_mapping(local_port, external_port).await { consecutive_failures = 0; debug!("UPnP: lease renewed (port {})", external_port); } else { consecutive_failures += 1; warn!("UPnP: renewal failed ({}/3)", consecutive_failures); if consecutive_failures >= 3 { network.clear_anchor(); if let Ok(mut log) = alog.try_lock() { log.log( ActivityLevel::Warn, ActivityCategory::Connection, "UPnP lease lost after 3 renewal failures, auto-anchor disabled".into(), None, ); } warn!("UPnP: 3 consecutive renewal failures, auto-anchor disabled"); return; // stop the cycle } } } })) } // --- HTTP Post Delivery --- /// Start the HTTP server for serving public posts to browsers. /// Only starts if this node is publicly TCP-reachable. pub fn start_http_server(&self) -> Option> { if !self.network.is_http_capable() { debug!("HTTP server not started: node is not publicly TCP-reachable"); return None; } let port = self.network.bound_port(); if port == 0 { return None; } let storage = Arc::clone(&self.storage); let blob_store = Arc::clone(&self.blob_store); let downstream_addrs = Arc::new(tokio::sync::Mutex::new( std::collections::HashMap::<[u8; 32], Vec>::new(), )); // Advertise HTTP capability to peers let http_addr = self.network.http_addr(); self.network.conn_handle().set_http_info(true, http_addr.clone()); // Also update the ConnectionManager's fields for payload construction { let rt = tokio::runtime::Handle::current(); let conn_mgr = Arc::clone(&self.network.conn_mgr_arc()); rt.spawn(async move { let mut cm = conn_mgr.lock().await; cm.http_capable = true; cm.http_addr = http_addr; }); } info!("Starting HTTP server on TCP port {}", port); Some(tokio::spawn(async move { if let Err(e) = crate::http::run_http_server(port, storage, blob_store, downstream_addrs).await { warn!("HTTP server stopped: {}", e); } })) } /// Start the web redirect handler (itsgoin.net share link resolution). pub fn start_web_handler(self: &Arc, port: u16) -> tokio::task::JoinHandle<()> { let node = Arc::clone(self); info!("Starting web redirect handler on port {}", port); tokio::spawn(async move { if let Err(e) = crate::web::run_web_handler(port, node).await { warn!("Web redirect handler stopped: {}", e); } }) } /// Start UPnP TCP lease renewal cycle alongside the UDP renewal. pub fn start_upnp_tcp_renewal_cycle(&self) -> Option> { if !self.network.has_upnp_tcp() { return None; } let mapping = self.network.upnp_mapping()?; let local_port = mapping.local_port; let external_port = mapping.external_addr.port(); let interval_secs = (mapping.lease_secs / 2) as u64; Some(tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); loop { interval.tick().await; if !crate::upnp::renew_upnp_tcp_mapping(local_port, external_port).await { warn!("UPnP: TCP lease renewal failed"); // Don't stop the cycle — TCP is best-effort } } })) } /// Generate a share link URL for a public post. /// Returns None if post is not public or not found. pub async fn generate_share_link(&self, post_id: &PostId) -> anyhow::Result> { // Look up the post to verify it's public and get the author let (post, visibility) = { let store = self.storage.get().await; match store.get_post_with_visibility(post_id)? { Some(pv) => pv, None => return Ok(None), } }; if !matches!(visibility, PostVisibility::Public) { return Ok(None); } let post_hex = hex::encode(post_id); let author_hex = hex::encode(post.author); Ok(Some(format!("https://itsgoin.net/p/{}/{}", post_hex, author_hex))) } // --- Engagement API --- /// React to a post with an emoji. If `private`, encrypts payload for post author only. pub async fn react_to_post( &self, post_id: PostId, emoji: String, private: bool, ) -> anyhow::Result { let our_node_id = self.node_id; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; // For private reactions, look up the post author and encrypt let encrypted_payload = if private { let storage = self.storage.get().await; let post = storage.get_post(&post_id)? .ok_or_else(|| anyhow::anyhow!("post not found"))?; drop(storage); let seed = self.secret_seed; let payload_json = serde_json::json!({ "emoji": emoji, "reactor": hex::encode(our_node_id), "timestamp_ms": now, }).to_string(); Some(crate::crypto::encrypt_private_reaction(&seed, &post.author, &payload_json)?) } else { None }; let signature = crate::crypto::sign_reaction(&self.secret_seed, &our_node_id, &post_id, &emoji, now); let reaction = crate::types::Reaction { reactor: our_node_id, emoji: emoji.clone(), post_id, timestamp_ms: now, encrypted_payload, deleted_at: None, signature, }; // Store locally let storage = self.storage.get().await; storage.store_reaction(&reaction)?; drop(storage); // Propagate via BlobHeaderDiff to downstream + upstream { let network = &self.network; let diff = crate::protocol::BlobHeaderDiffPayload { post_id, author: our_node_id, ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())], timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; // Also send to all upstreams (toward author) — Phase 6 multi-upstream let upstreams = { let storage = self.storage.get().await; storage.get_post_upstreams(&post_id).unwrap_or_default() }; for (up, _prio) in upstreams { let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } } Ok(reaction) } /// Remove a reaction from a post. pub async fn remove_reaction(&self, post_id: PostId, emoji: String) -> anyhow::Result<()> { let our_node_id = self.node_id; let storage = self.storage.get().await; storage.remove_reaction(&our_node_id, &post_id, &emoji)?; drop(storage); // Propagate removal { let network = &self.network; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let diff = crate::protocol::BlobHeaderDiffPayload { post_id, author: our_node_id, ops: vec![crate::types::BlobHeaderDiffOp::RemoveReaction { reactor: our_node_id, emoji, post_id, }], timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; } Ok(()) } /// Get all reactions for a post. Decrypts private reactions if we're the post author. pub async fn get_reactions(&self, post_id: PostId) -> anyhow::Result> { let storage = self.storage.get().await; let reactions = storage.get_reactions(&post_id)?; let post_info = storage.get_post(&post_id)?; drop(storage); let our_node_id = self.node_id; // If we're the author, decrypt private reactions if let Some(post) = post_info { if post.author == our_node_id { let seed = self.secret_seed; return Ok(reactions.into_iter().map(|mut r| { if let Some(ref enc) = r.encrypted_payload { if let Ok(decrypted) = crate::crypto::decrypt_private_reaction(&seed, &r.reactor, enc) { r.encrypted_payload = Some(decrypted); } } r }).collect()); } } Ok(reactions) } /// Get reaction counts grouped by emoji for a post. pub async fn get_reaction_counts(&self, post_id: PostId) -> anyhow::Result> { let our_node_id = self.node_id; let storage = self.storage.get().await; let counts = storage.get_reaction_counts(&post_id, &our_node_id)?; Ok(counts) } /// Add a comment to a post (signed with our key). pub async fn comment_on_post( &self, post_id: PostId, content: String, ) -> anyhow::Result { let our_node_id = self.node_id; let seed = self.secret_seed; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let signature = crate::crypto::sign_comment(&seed, &our_node_id, &post_id, &content, now); let comment = crate::types::InlineComment { author: our_node_id, post_id, content, timestamp_ms: now, signature, deleted_at: None, }; let storage = self.storage.get().await; storage.store_comment(&comment)?; drop(storage); // Propagate via BlobHeaderDiff to downstream + upstream { let network = &self.network; let diff = crate::protocol::BlobHeaderDiffPayload { post_id, author: our_node_id, ops: vec![crate::types::BlobHeaderDiffOp::AddComment(comment.clone())], timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; // Also send to all upstreams (toward author) — Phase 6 multi-upstream let upstreams = { let storage = self.storage.get().await; storage.get_post_upstreams(&post_id).unwrap_or_default() }; for (up, _prio) in upstreams { let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } } Ok(comment) } /// Edit one of your own comments on a post. pub async fn edit_comment( &self, post_id: PostId, timestamp_ms: u64, new_content: String, ) -> anyhow::Result<()> { let our_node_id = self.node_id; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let storage = self.storage.get().await; storage.edit_comment(&our_node_id, &post_id, timestamp_ms, &new_content)?; drop(storage); // Propagate via BlobHeaderDiff { let network = &self.network; let diff = crate::protocol::BlobHeaderDiffPayload { post_id, author: our_node_id, ops: vec![crate::types::BlobHeaderDiffOp::EditComment { author: our_node_id, post_id, timestamp_ms, new_content, }], timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; // Phase 6: send to all upstreams let upstreams = { let storage = self.storage.get().await; storage.get_post_upstreams(&post_id).unwrap_or_default() }; for (up, _prio) in upstreams { let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } } Ok(()) } /// Delete one of your own comments on a post. pub async fn delete_comment( &self, post_id: PostId, timestamp_ms: u64, ) -> anyhow::Result<()> { let our_node_id = self.node_id; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let storage = self.storage.get().await; storage.delete_comment(&our_node_id, &post_id, timestamp_ms)?; drop(storage); // Propagate via BlobHeaderDiff { let network = &self.network; let diff = crate::protocol::BlobHeaderDiffPayload { post_id, author: our_node_id, ops: vec![crate::types::BlobHeaderDiffOp::DeleteComment { author: our_node_id, post_id, timestamp_ms, }], timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; // Phase 6: send to all upstreams let upstreams = { let storage = self.storage.get().await; storage.get_post_upstreams(&post_id).unwrap_or_default() }; for (up, _prio) in upstreams { let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } } Ok(()) } /// Get all comments for a post. pub async fn get_comments(&self, post_id: PostId) -> anyhow::Result> { let storage = self.storage.get().await; let comments = storage.get_comments(&post_id)?; Ok(comments) } /// Set the comment/reaction policy for a post (author-only). pub async fn set_comment_policy( &self, post_id: PostId, policy: crate::types::CommentPolicy, ) -> anyhow::Result<()> { let storage = self.storage.get().await; storage.set_comment_policy(&post_id, &policy)?; drop(storage); // Propagate policy change { let network = &self.network; let our_node_id = self.node_id; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let diff = crate::protocol::BlobHeaderDiffPayload { post_id, author: our_node_id, ops: vec![crate::types::BlobHeaderDiffOp::SetPolicy(policy)], timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; } Ok(()) } /// Get the comment policy for a post. pub async fn get_comment_policy(&self, post_id: PostId) -> anyhow::Result> { let storage = self.storage.get().await; let policy = storage.get_comment_policy(&post_id)?; Ok(policy) } /// Get the full comment thread for a post (inline comments + split posts merged). pub async fn get_comment_thread(&self, post_id: PostId) -> anyhow::Result> { let storage = self.storage.get().await; // 1. Inline comments let mut comments = storage.get_comments(&post_id)?; // 2. Split posts (thread children) let children = storage.get_thread_children(&post_id)?; for child_id in children { if let Ok(Some(child_post)) = storage.get_post(&child_id) { // Split posts store comments as JSON in content if let Ok(split_comments) = serde_json::from_str::>(&child_post.content) { comments.extend(split_comments); } } } // Dedup by (author, timestamp_ms) and sort let mut seen = std::collections::HashSet::new(); comments.retain(|c| seen.insert((c.author, c.timestamp_ms))); comments.sort_by_key(|c| c.timestamp_ms); Ok(comments) } // --- Encrypted receipt/comment slot methods --- /// Unwrap the CEK for a post we are a participant of, returning (cek, sorted_participants). /// Returns None if this is a public post or we cannot decrypt. async fn get_post_cek_and_participants( &self, post_id: &PostId, ) -> anyhow::Result)>> { let storage = self.storage.get().await; let (post, visibility) = match storage.get_post_with_visibility(post_id)? { Some(pv) => pv, None => return Ok(None), }; drop(storage); match &visibility { PostVisibility::Public => Ok(None), PostVisibility::Encrypted { recipients } => { let cek = crypto::unwrap_cek_for_recipient( &self.secret_seed, &self.node_id, &post.author, recipients, )?; match cek { Some(cek) => { let mut participants: Vec = recipients.iter().map(|wk| wk.recipient).collect(); participants.sort(); participants.dedup(); Ok(Some((cek, participants))) } None => Ok(None), } } PostVisibility::GroupEncrypted { group_id, epoch, wrapped_cek } => { let storage = self.storage.get().await; let group_seeds = storage.get_all_group_seeds_map().unwrap_or_default(); let group_key_record = storage.get_group_key(group_id)?; let members = if let Some(ref gk) = group_key_record { storage.get_circle_members(&gk.circle_name).unwrap_or_default() } else { vec![] }; drop(storage); if let Some((seed, pubkey)) = group_seeds.get(&(*group_id, *epoch)) { let cek = crypto::unwrap_group_cek(seed, pubkey, wrapped_cek)?; let mut participants: Vec = members; // Ensure the author is included if !participants.contains(&post.author) { participants.push(post.author); } participants.sort(); participants.dedup(); Ok(Some((cek, participants))) } else { Ok(None) } } } } /// Write our receipt slot for an encrypted post. /// `state` is the receipt state, `emoji` is optional (only used when state == Reacted). pub async fn write_receipt_slot( &self, post_id: PostId, state: crate::types::ReceiptState, emoji: Option, ) -> anyhow::Result<()> { let (cek, participants) = self.get_post_cek_and_participants(&post_id).await? .ok_or_else(|| anyhow::anyhow!("not a participant of this encrypted post"))?; let slot_key = crypto::derive_slot_key(&cek); // Find our slot index (sorted participant position) let our_slot = participants.iter().position(|nid| nid == &self.node_id) .ok_or_else(|| anyhow::anyhow!("our node_id not found in participants"))?; // Build plaintext: [1 byte state][8 bytes timestamp_ms][23 bytes emoji+padding] let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; let mut plaintext = [0u8; 32]; plaintext[0] = state as u8; plaintext[1..9].copy_from_slice(&now.to_le_bytes()); if let Some(ref emoji_str) = emoji { let emoji_bytes = emoji_str.as_bytes(); let copy_len = emoji_bytes.len().min(23); plaintext[9..9 + copy_len].copy_from_slice(&emoji_bytes[..copy_len]); } let encrypted = crypto::encrypt_slot(&plaintext, &slot_key)?; // Update the BlobHeader let storage = self.storage.get().await; let header = storage.get_blob_header(&post_id)?; let mut blob_header = if let Some((json, _ts)) = header { serde_json::from_str::(&json) .unwrap_or_else(|_| crate::types::BlobHeader { post_id, author: self.node_id, reactions: vec![], comments: vec![], policy: Default::default(), updated_at: now, thread_splits: vec![], receipt_slots: vec![], comment_slots: vec![], }) } else { crate::types::BlobHeader { post_id, author: self.node_id, reactions: vec![], comments: vec![], policy: Default::default(), updated_at: now, thread_splits: vec![], receipt_slots: vec![], comment_slots: vec![], } }; // Ensure enough slots exist while blob_header.receipt_slots.len() <= our_slot { blob_header.receipt_slots.push(crypto::random_slot_noise(64)); } blob_header.receipt_slots[our_slot] = encrypted.clone(); blob_header.updated_at = now; let header_json = serde_json::to_string(&blob_header)?; storage.store_blob_header(&post_id, &blob_header.author, &header_json, now)?; drop(storage); // Propagate via BlobHeaderDiff let diff = crate::protocol::BlobHeaderDiffPayload { post_id, author: self.node_id, ops: vec![crate::types::BlobHeaderDiffOp::WriteReceiptSlot { post_id, slot_index: our_slot as u32, data: encrypted, }], timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; // Phase 6: send to all upstreams let upstreams = { let storage = self.storage.get().await; storage.get_post_upstreams(&post_id).unwrap_or_default() }; for (up, _prio) in upstreams { let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } Ok(()) } /// Write a private comment to an encrypted post's comment slot. pub async fn write_comment_slot( &self, post_id: PostId, content: String, ) -> anyhow::Result<()> { let (cek, _participants) = self.get_post_cek_and_participants(&post_id).await? .ok_or_else(|| anyhow::anyhow!("not a participant of this encrypted post"))?; let slot_key = crypto::derive_slot_key(&cek); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() as u64; // Build plaintext: [32 bytes author_node_id][8 bytes timestamp_ms][216 bytes content+padding] let mut plaintext = [0u8; 256]; plaintext[..32].copy_from_slice(&self.node_id); plaintext[32..40].copy_from_slice(&now.to_le_bytes()); let content_bytes = content.as_bytes(); let copy_len = content_bytes.len().min(216); plaintext[40..40 + copy_len].copy_from_slice(&content_bytes[..copy_len]); let encrypted = crypto::encrypt_slot(&plaintext, &slot_key)?; // Find first available comment slot or add new ones let storage = self.storage.get().await; let header = storage.get_blob_header(&post_id)?; let mut blob_header = if let Some((json, _ts)) = header { serde_json::from_str::(&json) .unwrap_or_else(|_| crate::types::BlobHeader { post_id, author: self.node_id, reactions: vec![], comments: vec![], policy: Default::default(), updated_at: now, thread_splits: vec![], receipt_slots: vec![], comment_slots: vec![], }) } else { crate::types::BlobHeader { post_id, author: self.node_id, reactions: vec![], comments: vec![], policy: Default::default(), updated_at: now, thread_splits: vec![], receipt_slots: vec![], comment_slots: vec![], } }; // Try to find an empty slot by attempting decryption let mut target_index = None; for (i, slot) in blob_header.comment_slots.iter().enumerate() { if let Ok(decrypted) = crypto::decrypt_slot(slot, &slot_key) { // Check if all 256 plaintext bytes are zero (empty) if decrypted.len() == 256 && decrypted.iter().all(|&b| b == 0) { target_index = Some(i); break; } } else { // Cannot decrypt — could be random noise (empty), use it target_index = Some(i); break; } } let (slot_index, add_new) = if let Some(idx) = target_index { (idx, false) } else { // No available slots — add one let idx = blob_header.comment_slots.len(); blob_header.comment_slots.push(crypto::random_slot_noise(256)); (idx, true) }; blob_header.comment_slots[slot_index] = encrypted.clone(); blob_header.updated_at = now; let header_json = serde_json::to_string(&blob_header)?; storage.store_blob_header(&post_id, &blob_header.author, &header_json, now)?; drop(storage); // Propagate let op = if add_new { crate::types::BlobHeaderDiffOp::AddCommentSlots { post_id, count: 1, slots: vec![encrypted], } } else { crate::types::BlobHeaderDiffOp::WriteCommentSlot { post_id, slot_index: slot_index as u32, data: encrypted, } }; let diff = crate::protocol::BlobHeaderDiffPayload { post_id, author: self.node_id, ops: vec![op], timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; // Phase 6: send to all upstreams let upstreams = { let storage = self.storage.get().await; storage.get_post_upstreams(&post_id).unwrap_or_default() }; for (up, _prio) in upstreams { let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } Ok(()) } /// Read and decrypt all receipt slots for an encrypted post. pub async fn read_receipt_slots( &self, post_id: PostId, ) -> anyhow::Result> { let (cek, participants) = self.get_post_cek_and_participants(&post_id).await? .ok_or_else(|| anyhow::anyhow!("not a participant of this encrypted post"))?; let slot_key = crypto::derive_slot_key(&cek); let storage = self.storage.get().await; let header = storage.get_blob_header(&post_id)?; drop(storage); let blob_header = match header { Some((json, _ts)) => serde_json::from_str::(&json)?, None => return Ok(vec![]), }; let mut results = Vec::new(); for (i, slot) in blob_header.receipt_slots.iter().enumerate() { let participant_id = participants.get(i).copied(); match crypto::decrypt_slot(slot, &slot_key) { Ok(plaintext) if plaintext.len() >= 9 => { let state = crate::types::ReceiptState::from_u8(plaintext[0]); let timestamp_ms = u64::from_le_bytes( plaintext[1..9].try_into().unwrap_or([0u8; 8]), ); let emoji = if state == crate::types::ReceiptState::Reacted && plaintext.len() >= 32 { let emoji_bytes = &plaintext[9..32]; let end = emoji_bytes.iter().position(|&b| b == 0).unwrap_or(23); if end > 0 { String::from_utf8(emoji_bytes[..end].to_vec()).ok() } else { None } } else { None }; results.push(crate::types::ReceiptSlotData { slot_index: i as u32, node_id: participant_id, state, timestamp_ms, emoji, }); } _ => { // Could not decrypt — noise/uninitialized slot results.push(crate::types::ReceiptSlotData { slot_index: i as u32, node_id: participant_id, state: crate::types::ReceiptState::Empty, timestamp_ms: 0, emoji: None, }); } } } Ok(results) } /// Read and decrypt all comment slots for an encrypted post. pub async fn read_comment_slots( &self, post_id: PostId, ) -> anyhow::Result> { let (cek, _participants) = self.get_post_cek_and_participants(&post_id).await? .ok_or_else(|| anyhow::anyhow!("not a participant of this encrypted post"))?; let slot_key = crypto::derive_slot_key(&cek); let storage = self.storage.get().await; let header = storage.get_blob_header(&post_id)?; drop(storage); let blob_header = match header { Some((json, _ts)) => serde_json::from_str::(&json)?, None => return Ok(vec![]), }; let mut results = Vec::new(); for (i, slot) in blob_header.comment_slots.iter().enumerate() { match crypto::decrypt_slot(slot, &slot_key) { Ok(plaintext) if plaintext.len() >= 40 => { // Check if it's an empty slot (all zeros) if plaintext.iter().all(|&b| b == 0) { continue; } let mut author = [0u8; 32]; author.copy_from_slice(&plaintext[..32]); // Skip if author is all zeros (empty) if author == [0u8; 32] { continue; } let timestamp_ms = u64::from_le_bytes( plaintext[32..40].try_into().unwrap_or([0u8; 8]), ); let content_bytes = &plaintext[40..]; let end = content_bytes.iter().position(|&b| b == 0).unwrap_or(content_bytes.len()); let content = String::from_utf8_lossy(&content_bytes[..end]).to_string(); results.push(crate::types::CommentSlotData { slot_index: i as u32, author, timestamp_ms, content, }); } _ => { // Cannot decrypt or too short — skip } } } results.sort_by_key(|c| c.timestamp_ms); Ok(results) } } pub struct NodeStats { pub post_count: usize, pub peer_count: usize, pub follow_count: usize, } /// Standalone priority scoring for testing. /// score = pin_boost + (relationship × heart_recency × freshness / (peer_copies + 1)) pub fn compute_blob_priority_standalone( candidate: &crate::storage::EvictionCandidate, our_node_id: &NodeId, follows: &[NodeId], audience_members: &[NodeId], now_ms: u64, ) -> f64 { let pin_boost = if candidate.pinned { 1000.0 } else { 0.0 }; // Share-link popularity boost: high downstream count indicates the blob // has been shared via share links and is actively being served to others. let share_boost = if candidate.downstream_count >= 3 { 100.0 } else if candidate.downstream_count >= 1 { 50.0 * candidate.downstream_count as f64 / 3.0 } else { 0.0 }; let relationship = if candidate.author == *our_node_id { 5.0 } else if follows.contains(&candidate.author) && audience_members.contains(&candidate.author) { 3.0 } else if follows.contains(&candidate.author) { 2.0 } else if audience_members.contains(&candidate.author) { 1.0 } else { 0.1 }; let thirty_days_ms = 30u64 * 24 * 3600 * 1000; let access_age_ms = now_ms.saturating_sub(candidate.last_accessed_at); let heart_recency = (1.0 - (access_age_ms as f64 / thirty_days_ms as f64)).max(0.0); let post_age_days = now_ms.saturating_sub(candidate.created_at) as f64 / (24.0 * 3600.0 * 1000.0); let freshness = 1.0 / (1.0 + post_age_days); let copies_factor = 1.0 / (candidate.peer_copies as f64 + 1.0); pin_boost + share_boost + (relationship * heart_recency * freshness * copies_factor) } // --- Active Replication Cycle --- impl Node { /// Start the active replication cycle: periodically ask peers to hold our /// under-replicated recent content. Only Available/Persistent devices initiate. pub fn start_replication_cycle(self: &Arc, interval_secs: u64) -> tokio::task::JoinHandle<()> { let node = Arc::clone(self); tokio::spawn(async move { // Wait 2 minutes before first cycle (let connections establish) tokio::time::sleep(std::time::Duration::from_secs(120)).await; let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); loop { interval.tick().await; node.run_replication_check().await; } }) } /// Single replication check iteration. async fn run_replication_check(&self) { // All devices initiate replication — phones need their content replicated // before they go to sleep. // 1. Get own posts < 72h old let seventy_two_hours_ms = 72u64 * 3600 * 1000; let now_ms = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let since_ms = now_ms.saturating_sub(seventy_two_hours_ms); // Get connected peers first (no storage lock needed) let connected = self.network.connected_peers().await; if connected.is_empty() { debug!("No peers for replication"); return; } // Priority: Available (desktops) > Persistent (anchors) > Intermittent (phones) let role_priority = |role: &DeviceRole| -> u16 { match role { DeviceRole::Available => 300, // desktops — best replication targets DeviceRole::Persistent => 200, // anchors — good but save for web DeviceRole::Intermittent => 100, // phones — last resort but still useful } }; // Single lock: get under-replicated posts AND peer roles/pressure let (under_replicated, suitable_peers) = { let storage = self.storage.get().await; let recent_ids = match storage.get_own_recent_post_ids(&self.node_id, since_ms) { Ok(ids) => ids, Err(e) => { debug!(error = %e, "Replication: failed to get own recent posts"); return; } }; // Filter to under-replicated (< 2 downstream) let mut needs_replication = Vec::new(); for pid in &recent_ids { match storage.get_post_downstream_count(pid) { Ok(count) if count < 2 => { needs_replication.push(*pid); } _ => {} } } // Get peer roles + cache pressure in same lock let mut candidates = Vec::new(); for peer_id in &connected { if *peer_id == self.node_id { continue; } let role_str = storage.get_peer_device_role(peer_id) .ok() .flatten() .unwrap_or_default(); let role = DeviceRole::from_str_label(&role_str); let pressure = storage.get_peer_cache_pressure(peer_id) .ok() .flatten() .unwrap_or(128) as u16; // Combined score: role priority + cache pressure let score = role_priority(&role) + pressure; candidates.push((*peer_id, score)); } (needs_replication, candidates) }; // If none need replication, skip silently if under_replicated.is_empty() { return; } if suitable_peers.is_empty() { debug!("No peers available for replication"); return; } // Pick best candidate (highest combined score) let best_peer = suitable_peers .iter() .max_by_key(|(_, score)| *score) .map(|(id, _)| *id) .unwrap(); // 7. Cap at 20 post IDs per request, one request per cycle let batch: Vec = under_replicated.into_iter().take(20).collect(); let batch_len = batch.len(); // 8. Send ReplicationRequest match self.network.send_replication_request(&best_peer, batch, 128).await { Ok(accepted) => { if accepted.is_empty() { debug!( peer = hex::encode(best_peer), "Replication: peer rejected all posts" ); } else { debug!( peer = hex::encode(best_peer), accepted = accepted.len(), requested = batch_len, "Replication: peer accepted posts" ); } } Err(e) => { debug!( peer = hex::encode(best_peer), error = %e, "Replication: request failed" ); } } } } #[cfg(test)] mod tests { use super::*; use crate::storage::EvictionCandidate; fn make_node_id(byte: u8) -> NodeId { [byte; 32] } fn make_candidate( author: NodeId, pinned: bool, created_at: u64, last_accessed_at: u64, peer_copies: u32, ) -> EvictionCandidate { EvictionCandidate { cid: [0u8; 32], post_id: [0u8; 32], author, size_bytes: 1000, created_at, last_accessed_at, pinned, peer_copies, downstream_count: 0, } } #[test] fn own_pinned_scores_highest() { let our_id = make_node_id(1); let now = 10_000_000_000u64; // ~115 days in ms let candidate = make_candidate(our_id, true, now - 86400_000, now, 0); let score = compute_blob_priority_standalone( &candidate, &our_id, &[], &[], now, ); // Should be 1000 + (5.0 * 1.0 * ~0.5 * 1.0) = ~1002.5 assert!(score > 1000.0, "own pinned should score >1000, got {}", score); } #[test] fn follow_recent_scores_higher_than_audience_stale() { let our_id = make_node_id(1); let follow_id = make_node_id(2); let audience_id = make_node_id(3); let now = 10_000_000_000u64; // Follow: recently accessed, 1-day-old post let follow_candidate = make_candidate(follow_id, false, now - 86400_000, now, 0); let follow_score = compute_blob_priority_standalone( &follow_candidate, &our_id, &[follow_id], &[], now, ); // Audience: stale (20 days no access), 10-day-old post, 5 copies let audience_candidate = make_candidate( audience_id, false, now - 10 * 86400_000, now - 20 * 86400_000, 5, ); let audience_score = compute_blob_priority_standalone( &audience_candidate, &our_id, &[], &[audience_id], now, ); assert!(follow_score > audience_score, "follow recent ({}) should score higher than audience stale ({})", follow_score, audience_score); } #[test] fn no_relationship_scores_near_zero() { let our_id = make_node_id(1); let stranger = make_node_id(99); let now = 10_000_000_000u64; // Stale stranger post with many copies let candidate = make_candidate( stranger, false, now - 30 * 86400_000, now - 30 * 86400_000, 10, ); let score = compute_blob_priority_standalone( &candidate, &our_id, &[], &[], now, ); // 0.1 relationship * 0.0 heart_recency * ~0.03 freshness / 11 = ~0 assert!(score < 0.01, "stranger stale should score near 0, got {}", score); } #[test] fn priority_ordering() { let our_id = make_node_id(1); let follow_id = make_node_id(2); let audience_id = make_node_id(3); let stranger_id = make_node_id(4); let now = 10_000_000_000u64; // Own pinned (highest) let own = make_candidate(our_id, true, now - 86400_000, now, 0); // Follow recent let follow = make_candidate(follow_id, false, now - 86400_000, now, 0); // Audience stale let audience = make_candidate(audience_id, false, now - 10 * 86400_000, now - 20 * 86400_000, 5); // Stranger let stranger = make_candidate(stranger_id, false, now - 30 * 86400_000, now - 30 * 86400_000, 10); let own_score = compute_blob_priority_standalone(&own, &our_id, &[follow_id], &[audience_id], now); let follow_score = compute_blob_priority_standalone(&follow, &our_id, &[follow_id], &[audience_id], now); let audience_score = compute_blob_priority_standalone(&audience, &our_id, &[follow_id], &[audience_id], now); let stranger_score = compute_blob_priority_standalone(&stranger, &our_id, &[follow_id], &[audience_id], now); assert!(own_score > follow_score, "own ({}) > follow ({})", own_score, follow_score); assert!(follow_score > audience_score, "follow ({}) > audience ({})", follow_score, audience_score); assert!(audience_score > stranger_score, "audience ({}) > stranger ({})", audience_score, stranger_score); } }