itsgoin/crates/core/src/node.rs
Scott Reimers b7f2d369fa v0.3.5: Encrypted receipt & comment slots, message delivery indicators
Encrypted slots in BlobHeader:
- Private posts get noise-prefilled receipt slots (64B, 1 per participant)
  and comment slots (256B, ceil(participants/3), expandable)
- Slot key derived from post CEK via BLAKE3 — only participants can read
- CDN relays propagate opaque encrypted bytes without decryption
- 3 new BlobHeaderDiffOps: WriteReceiptSlot, WriteCommentSlot, AddCommentSlots

Receipt system:
- States: empty(0), delivered(1), seen(2), reacted(3)
- Slot index = position in sorted participant NodeId list
- Author can pre-feed emoji reaction at creation time
- 6 new crypto tests for slot encrypt/decrypt/derivation

Node methods:
- write_receipt_slot, write_comment_slot with upstream+downstream propagation
- read_receipt_slots, read_comment_slots with CEK-based decryption
- get_post_cek_and_participants helper for both Encrypted and GroupEncrypted

IPC: write_message_receipt, write_message_comment, get_message_receipts,
     get_message_comments

Frontend:
- DM chat bubbles show delivery indicators (check → double → blue → emoji)
- Opening conversation auto-marks incoming messages as seen
- React button on messages with emoji prompt

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 14:15:33 -04:00

4145 lines
175 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use crate::activity::{ActivityCategory, ActivityEvent, ActivityLevel, ActivityLog};
use crate::blob::BlobStore;
use crate::content::compute_post_id;
use crate::crypto;
use crate::network::Network;
use crate::storage::Storage;
use crate::types::{
Attachment, AudienceDirection, AudienceRecord, AudienceStatus, Circle, DeleteRecord,
DeviceProfile, NodeId, PeerRecord, PeerSlotKind, PeerWithAddress, Post, PostId, PostVisibility,
PublicProfile, ReachMethod, RevocationMode, SessionReachMethod, SocialRelation,
SocialRouteEntry, SocialStatus, VisibilityIntent, VisibilityUpdate, WormResult,
};
/// Built-in default anchor — always available as a bootstrap fallback.
///
/// Connect-string format `<hex node id>@<host>:<port>`, parsed by
/// `crate::parse_connect_string` at startup.
const DEFAULT_ANCHOR: &str = "17af141956ae0b50dc1cb9248cadf5fca371ea2d8531ac9add3c03caffc61441@itsgoin.net:4433";
/// A distsoc node: ties together identity, storage, and networking
pub struct Node {
    /// Data directory holding `identity.key`, `itsgoin.db`, the blob store,
    /// and optional `bootstrap.json` / `anchors.json`.
    pub data_dir: PathBuf,
    /// Persistent state store (posts, peers, profiles, routes). Async mutex —
    /// held across awaits in several methods.
    pub storage: Arc<Mutex<Storage>>,
    /// Network stack (connections, pushes, pulls, anchor protocol).
    pub network: Arc<Network>,
    /// This node's public identity, derived from the secret key.
    pub node_id: NodeId,
    /// Content-addressed attachment storage.
    pub blob_store: Arc<BlobStore>,
    // 32-byte identity key seed — the same bytes persisted in identity.key.
    secret_seed: [u8; 32],
    // Anchors parsed from anchors.json plus the built-in default.
    bootstrap_anchors: Vec<(NodeId, iroh::EndpointAddr)>,
    #[allow(dead_code)]
    profile: DeviceProfile,
    /// In-memory diagnostics log (std mutex: only locked synchronously).
    pub activity_log: Arc<std::sync::Mutex<ActivityLog>>,
    /// Epoch-ms timestamp of the last rebalance (see `timer_state`).
    pub last_rebalance_ms: Arc<AtomicU64>,
    /// Epoch-ms timestamp of the last anchor registration (see `timer_state`).
    pub last_anchor_register_ms: Arc<AtomicU64>,
}
impl Node {
/// Create or open a node in the given data directory (Desktop profile)
///
/// Convenience wrapper over [`Node::open_with_bind`] with no explicit bind
/// address.
pub async fn open(data_dir: impl AsRef<Path>) -> anyhow::Result<Self> {
    Self::open_with_bind(data_dir, None, DeviceProfile::Desktop).await
}
/// Create or open a mobile node in the given data directory
///
/// Same as [`Node::open`] but with the Mobile device profile.
pub async fn open_mobile(data_dir: impl AsRef<Path>) -> anyhow::Result<Self> {
    Self::open_with_bind(data_dir, None, DeviceProfile::Mobile).await
}
/// Create or open a node, optionally binding to a specific address
///
/// Startup sequence, in order:
/// 1. load or generate the 32-byte identity key (`identity.key`);
/// 2. open storage, sweep stale per-session state, open the blob store;
/// 3. start the network stack;
/// 4. if the peers table is empty, bootstrap from `bootstrap.json` and the
///    built-in default anchor (connect, pull posts, request referrals);
/// 5. load `anchors.json` + default anchor, refresh their stored addresses,
///    rebuild social routes;
/// 6. if under-connected, dial discovered anchors first and bootstrap
///    anchors last, then run a NAT probe and a referral round through
///    whichever anchor connected.
pub async fn open_with_bind(
    data_dir: impl AsRef<Path>,
    bind_addr: Option<SocketAddr>,
    profile: DeviceProfile,
) -> anyhow::Result<Self> {
    let data_dir = data_dir.as_ref().to_path_buf();
    std::fs::create_dir_all(&data_dir)?;
    // Load or generate identity key
    let key_path = data_dir.join("identity.key");
    let (secret_key, secret_seed) = if key_path.exists() {
        let key_bytes = std::fs::read(&key_path)?;
        // The key file must be exactly 32 bytes — anything else is rejected.
        let bytes: [u8; 32] = key_bytes
            .try_into()
            .map_err(|_| anyhow::anyhow!("invalid key file"))?;
        (iroh::SecretKey::from_bytes(&bytes), bytes)
    } else {
        let key = iroh::SecretKey::generate(&mut rand::rng());
        let seed = key.to_bytes();
        std::fs::write(&key_path, seed)?;
        info!("Generated new identity key");
        (key, seed)
    };
    // Open storage
    let db_path = data_dir.join("itsgoin.db");
    let storage = Arc::new(Mutex::new(Storage::open(&db_path)?));
    // Startup sweep: clear stale N2/N3 and mesh_peers from prior session
    {
        let s = storage.lock().await;
        let n_cleared = s.clear_all_n2_n3().unwrap_or(0);
        let m_cleared = s.clear_all_mesh_peers().unwrap_or(0);
        if n_cleared > 0 || m_cleared > 0 {
            info!(n2_n3 = n_cleared, mesh_peers = m_cleared, "Startup sweep: cleared stale entries");
        }
    }
    // Open blob store
    let blob_store = Arc::new(BlobStore::open(&data_dir)?);
    // Activity log + timer atomics
    let activity_log = Arc::new(std::sync::Mutex::new(ActivityLog::new()));
    let last_rebalance_ms = Arc::new(AtomicU64::new(0));
    let last_anchor_register_ms = Arc::new(AtomicU64::new(0));
    // Start network (v2: single ALPN, connection manager)
    let network = Arc::new(
        Network::new(secret_key, Arc::clone(&storage), bind_addr, secret_seed, Arc::clone(&blob_store), profile, Arc::clone(&activity_log)).await?,
    );
    let node_id = network.node_id_bytes();
    // Auto-follow ourselves so our own posts show in the feed
    {
        let s = storage.lock().await;
        s.add_follow(&node_id)?;
    }
    // Bootstrap: if peers table is empty, try bootstrap.json then default anchor
    {
        let s = storage.lock().await;
        let has_peers = s.has_peers()?;
        drop(s);
        if !has_peers {
            let mut entries = Vec::new();
            let bootstrap_path = data_dir.join("bootstrap.json");
            if bootstrap_path.exists() {
                info!("Loading bootstrap peers from {:?}", bootstrap_path);
                if let Ok(data) = std::fs::read_to_string(&bootstrap_path) {
                    if let Ok(file_entries) = serde_json::from_str::<Vec<String>>(&data) {
                        entries.extend(file_entries);
                    }
                }
            }
            // The built-in anchor is always tried last if not already listed.
            let default = DEFAULT_ANCHOR.to_string();
            if !entries.contains(&default) {
                entries.push(default);
            }
            for entry in entries {
                match crate::parse_connect_string(&entry) {
                    Ok((nid, addr)) => {
                        // Never bootstrap against ourselves.
                        if nid == node_id {
                            continue;
                        }
                        info!(peer = hex::encode(nid), "Bootstrap: connecting to peer");
                        let ip_addrs: Vec<_> = addr.ip_addrs().copied().collect();
                        {
                            let s = storage.lock().await;
                            if ip_addrs.is_empty() {
                                let _ = s.add_peer(&nid);
                            } else {
                                let _ = s.upsert_peer(&nid, &ip_addrs, None);
                            }
                            // Mark as anchor — bootstrap peers are infrastructure, not social follows
                            let _ = s.set_peer_anchor(&nid, true);
                        }
                        // Connect persistently
                        match network.connect_to_peer(nid, addr).await {
                            Ok(()) => {
                                info!(peer = hex::encode(nid), "Bootstrap: connected");
                                // Pull posts from the bootstrap peer
                                match network.pull_from_all().await {
                                    Ok(stats) => {
                                        info!(
                                            "Bootstrap pull: {} posts from {} peers",
                                            stats.posts_received, stats.peers_pulled
                                        );
                                    }
                                    Err(e) => warn!(error = %e, "Bootstrap pull failed"),
                                }
                                // Always store anchor in known_anchors (even before referrals)
                                // so the periodic cycle can re-register and request referrals later
                                {
                                    let s = storage.lock().await;
                                    let anchor_addrs: Vec<std::net::SocketAddr> = s.get_peer_record(&nid)
                                        .ok().flatten()
                                        .map(|r| r.addresses).unwrap_or_default();
                                    if !anchor_addrs.is_empty() {
                                        let _ = s.upsert_known_anchor(&nid, &anchor_addrs);
                                    } else if !ip_addrs.is_empty() {
                                        let _ = s.upsert_known_anchor(&nid, &ip_addrs);
                                    }
                                }
                                // Request referrals from anchor (10s timeout)
                                match tokio::time::timeout(std::time::Duration::from_secs(10), network.request_anchor_referrals(&nid)).await {
                                    Ok(Ok(referrals)) if !referrals.is_empty() => {
                                        info!(count = referrals.len(), "Bootstrap: got anchor referrals");
                                        // Spawn referral connections in background — don't block startup
                                        let net = Arc::clone(&network);
                                        let my_id = node_id;
                                        let anchor = nid;
                                        tokio::spawn(async move {
                                            for referral in referrals {
                                                if referral.node_id == my_id {
                                                    continue;
                                                }
                                                if let Some(addr_str) = referral.addresses.first() {
                                                    let connect_str = format!(
                                                        "{}@{}",
                                                        hex::encode(referral.node_id),
                                                        addr_str,
                                                    );
                                                    if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) {
                                                        // Direct connect first; on failure fall back to an
                                                        // anchor-mediated introduction (hole punch).
                                                        let connect_fut = async {
                                                            match net.connect_to_peer(rid, raddr).await {
                                                                Ok(()) => { info!(peer = hex::encode(rid), "Connected to referred peer"); Ok(()) },
                                                                Err(e) => {
                                                                    debug!(error = %e, peer = hex::encode(rid), "One-sided connect failed, requesting introduction from anchor");
                                                                    match net.connect_via_introduction(rid, anchor).await {
                                                                        Ok(()) => { info!(peer = hex::encode(rid), "Connected to referred peer via hole punch"); Ok(()) },
                                                                        Err(e2) => Err(e2),
                                                                    }
                                                                }
                                                            }
                                                        };
                                                        match tokio::time::timeout(std::time::Duration::from_secs(15), connect_fut).await {
                                                            Ok(Ok(())) => {},
                                                            Ok(Err(e)) => debug!(error = %e, peer = hex::encode(rid), "Bootstrap referral connect failed"),
                                                            Err(_) => debug!(peer = hex::encode(rid), "Bootstrap referral connect timed out"),
                                                        }
                                                    }
                                                }
                                            }
                                            net.notify_growth().await;
                                        });
                                    }
                                    Ok(Ok(_)) => debug!("Bootstrap: no referrals from anchor (first to register)"),
                                    Ok(Err(e)) => debug!(error = %e, "Bootstrap: referral request failed"),
                                    Err(_) => debug!("Bootstrap: referral request timed out"),
                                }
                                // One successful bootstrap connection is enough.
                                break;
                            }
                            Err(e) => {
                                warn!(error = %e, "Bootstrap peer failed, trying next");
                            }
                        }
                    }
                    Err(e) => {
                        warn!(entry = %entry, error = %e, "Invalid bootstrap entry");
                    }
                }
            }
        }
    }
    // Load bootstrap anchors: anchors.json + built-in default
    let mut bootstrap_anchors = Vec::new();
    let mut anchor_ids = std::collections::HashSet::new();
    let anchors_path = data_dir.join("anchors.json");
    if anchors_path.exists() {
        if let Ok(data) = std::fs::read_to_string(&anchors_path) {
            if let Ok(entries) = serde_json::from_str::<Vec<String>>(&data) {
                for entry in entries {
                    match crate::parse_connect_string(&entry) {
                        Ok((nid, addr)) => {
                            info!(peer = hex::encode(nid), "Loaded bootstrap anchor");
                            anchor_ids.insert(nid);
                            bootstrap_anchors.push((nid, addr));
                        }
                        Err(e) => {
                            warn!(entry = %entry, error = %e, "Invalid bootstrap anchor entry");
                        }
                    }
                }
            }
        }
    }
    // Append the default anchor unless it is us or already listed.
    if let Ok((nid, addr)) = crate::parse_connect_string(DEFAULT_ANCHOR) {
        if nid != node_id && !anchor_ids.contains(&nid) {
            info!("Including built-in default anchor");
            bootstrap_anchors.push((nid, addr));
        }
    }
    // Collect bootstrap anchor node IDs so we can deprioritize them
    let bootstrap_anchor_ids: std::collections::HashSet<NodeId> =
        bootstrap_anchors.iter().map(|(nid, _)| *nid).collect();
    // Update known_anchors + peers with freshly DNS-resolved bootstrap addresses.
    // Without this, stale IPv6 addresses from previous sessions can block reconnection
    // on devices without IPv6 connectivity (see bugs-fixed.md #1).
    {
        let s = storage.lock().await;
        for (nid, addr) in &bootstrap_anchors {
            let ip_addrs: Vec<std::net::SocketAddr> = addr.ip_addrs().copied().collect();
            if !ip_addrs.is_empty() {
                let _ = s.upsert_known_anchor(nid, &ip_addrs);
                let _ = s.upsert_peer(nid, &ip_addrs, None);
            }
        }
    }
    // Rebuild social routes from follows + audience
    {
        let s = storage.lock().await;
        match s.rebuild_social_routes() {
            Ok(count) if count > 0 => info!(count, "Rebuilt social routes on startup"),
            _ => {}
        }
    }
    // Startup connection: try discovered anchors FIRST, bootstrap anchors LAST.
    // This keeps load off bootstrap anchors — they're only needed when nothing else works.
    // Order: known non-bootstrap anchors → mDNS (via iroh) → bootstrap anchors
    {
        let conn_count = network.connection_count().await;
        if conn_count < 5 {
            let known = {
                let s = storage.lock().await;
                s.list_known_anchors().unwrap_or_default()
            };
            // Split into discovered anchors (priority) and bootstrap anchors (fallback)
            let (discovered, bootstrap_known): (Vec<_>, Vec<_>) = known.into_iter()
                .partition(|(nid, _)| !bootstrap_anchor_ids.contains(nid));
            // Phase 1: Try discovered (non-bootstrap) anchors first
            let mut connected_anchor = None;
            for (anchor_nid, anchor_addrs) in &discovered {
                if *anchor_nid == node_id || network.is_peer_connected_or_session(anchor_nid).await {
                    continue;
                }
                let endpoint_id = match iroh::EndpointId::from_bytes(anchor_nid) {
                    Ok(eid) => eid,
                    Err(_) => continue,
                };
                let mut addr = iroh::EndpointAddr::from(endpoint_id);
                for sa in anchor_addrs {
                    addr = addr.with_ip_addr(*sa);
                }
                info!(peer = hex::encode(anchor_nid), "Trying discovered anchor");
                match tokio::time::timeout(std::time::Duration::from_secs(10), network.connect_to_anchor(*anchor_nid, addr)).await {
                    Ok(Ok(())) => {
                        info!(peer = hex::encode(anchor_nid), "Connected to discovered anchor");
                        connected_anchor = Some(*anchor_nid);
                        break;
                    }
                    Ok(Err(e)) => debug!(error = %e, peer = hex::encode(anchor_nid), "Discovered anchor: connect failed"),
                    Err(_) => debug!(peer = hex::encode(anchor_nid), "Discovered anchor: connect timed out"),
                }
            }
            // Phase 2: Fall back to bootstrap anchors only if no discovered anchor worked
            if connected_anchor.is_none() {
                for (anchor_nid, anchor_addrs) in &bootstrap_known {
                    if *anchor_nid == node_id || network.is_peer_connected_or_session(anchor_nid).await {
                        continue;
                    }
                    let endpoint_id = match iroh::EndpointId::from_bytes(anchor_nid) {
                        Ok(eid) => eid,
                        Err(_) => continue,
                    };
                    let mut addr = iroh::EndpointAddr::from(endpoint_id);
                    for sa in anchor_addrs {
                        addr = addr.with_ip_addr(*sa);
                    }
                    info!(peer = hex::encode(anchor_nid), "Trying bootstrap anchor (fallback)");
                    match tokio::time::timeout(std::time::Duration::from_secs(10), network.connect_to_anchor(*anchor_nid, addr)).await {
                        Ok(Ok(())) => {
                            info!(peer = hex::encode(anchor_nid), "Connected to bootstrap anchor");
                            connected_anchor = Some(*anchor_nid);
                            break;
                        }
                        Ok(Err(e)) => debug!(error = %e, peer = hex::encode(anchor_nid), "Bootstrap anchor: connect failed"),
                        Err(_) => debug!(peer = hex::encode(anchor_nid), "Bootstrap anchor: connect timed out"),
                    }
                }
            }
            // Phase 3: NAT probe + referrals from whichever anchor we connected to
            if let Some(anchor_nid) = connected_anchor {
                match tokio::time::timeout(
                    std::time::Duration::from_secs(15),
                    network.request_nat_filter_probe(&anchor_nid),
                ).await {
                    Ok(Ok(())) => info!("NAT filter probe completed during bootstrap"),
                    Ok(Err(e)) => warn!(error = %e, "NAT filter probe failed during bootstrap"),
                    Err(_) => warn!("NAT filter probe timed out during bootstrap"),
                }
                match tokio::time::timeout(std::time::Duration::from_secs(10), network.request_anchor_referrals(&anchor_nid)).await {
                    Ok(Ok(referrals)) if !referrals.is_empty() => {
                        info!(count = referrals.len(), "Got anchor referrals");
                        let net = Arc::clone(&network);
                        let my_id = node_id;
                        let anchor = anchor_nid;
                        tokio::spawn(async move {
                            for referral in referrals {
                                if referral.node_id == my_id {
                                    continue;
                                }
                                if let Some(addr_str) = referral.addresses.first() {
                                    let connect_str = format!(
                                        "{}@{}",
                                        hex::encode(referral.node_id),
                                        addr_str,
                                    );
                                    if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) {
                                        // Direct connect, then hole punch via the anchor on failure.
                                        let connect_fut = async {
                                            match net.connect_to_peer(rid, raddr).await {
                                                Ok(()) => { info!(peer = hex::encode(rid), "Connected to referred peer"); Ok(()) },
                                                Err(_) => {
                                                    match net.connect_via_introduction(rid, anchor).await {
                                                        Ok(()) => { info!(peer = hex::encode(rid), "Connected via hole punch"); Ok(()) },
                                                        Err(e) => Err(e),
                                                    }
                                                }
                                            }
                                        };
                                        match tokio::time::timeout(std::time::Duration::from_secs(15), connect_fut).await {
                                            Ok(Ok(())) => {},
                                            Ok(Err(e)) => debug!(error = %e, peer = hex::encode(rid), "Referral connect failed"),
                                            Err(_) => debug!(peer = hex::encode(rid), "Referral connect timed out"),
                                        }
                                    }
                                }
                            }
                            net.notify_growth().await;
                        });
                    }
                    Ok(Ok(_)) => debug!("No referrals from anchor"),
                    Ok(Err(e)) => debug!(error = %e, "Referral request failed"),
                    Err(_) => debug!("Referral request timed out"),
                }
            }
        }
    }
    Ok(Self {
        data_dir,
        storage,
        network,
        node_id,
        blob_store,
        secret_seed,
        bootstrap_anchors,
        profile,
        activity_log,
        last_rebalance_ms,
        last_anchor_register_ms,
    })
}
/// Get recent activity events (for diagnostics UI).
pub fn get_activity_log(&self, limit: usize) -> Vec<ActivityEvent> {
    let log = self.activity_log.lock().unwrap();
    log.recent(limit)
}
/// Get timer state: (last_rebalance_ms, last_anchor_register_ms).
pub fn timer_state(&self) -> (u64, u64) {
    let rebalance = self.last_rebalance_ms.load(AtomicOrdering::Relaxed);
    let anchor_register = self.last_anchor_register_ms.load(AtomicOrdering::Relaxed);
    (rebalance, anchor_register)
}
/// Get the secret seed bytes (for crypto operations by consumers like Tauri)
///
/// Returns a copy of the 32-byte identity seed; callers are responsible for
/// handling it carefully.
pub fn secret_seed_bytes(&self) -> [u8; 32] {
    self.secret_seed
}
// ---- Identity export/import ----
/// Export the on-disk identity key as a hex string.
pub fn export_identity_hex(&self) -> anyhow::Result<String> {
    let key_bytes = std::fs::read(self.data_dir.join("identity.key"))?;
    Ok(hex::encode(key_bytes))
}
/// Import a hex-encoded identity key into `data_dir`, refusing to overwrite
/// an existing `identity.key`.
pub fn import_identity(data_dir: &Path, hex_key: &str) -> anyhow::Result<()> {
    std::fs::create_dir_all(data_dir)?;
    let key_path = data_dir.join("identity.key");
    anyhow::ensure!(
        !key_path.exists(),
        "identity.key already exists in {:?} — refusing to overwrite",
        data_dir
    );
    let bytes = hex::decode(hex_key)?;
    anyhow::ensure!(
        bytes.len() == 32,
        "key must be exactly 32 bytes (64 hex chars), got {} bytes",
        bytes.len()
    );
    std::fs::write(&key_path, &bytes)?;
    Ok(())
}
/// Get up to 10 currently-connected peer NodeIds (for recent_peers in profile).
/// Prefers social peers, then wide.
async fn current_recent_peers(&self) -> Vec<NodeId> {
    let mut social = Vec::new();
    let mut wide = Vec::new();
    for (nid, kind, _) in self.network.connection_info().await {
        // Skip our own entry if it appears in the connection list.
        if nid == self.node_id {
            continue;
        }
        match kind {
            PeerSlotKind::Preferred | PeerSlotKind::Local => social.push(nid),
            PeerSlotKind::Wide => wide.push(nid),
        }
    }
    // Social peers first, wide peers fill the remainder, capped at 10.
    social.extend(wide);
    social.truncate(10);
    social
}
// ---- Posts ----
/// Create a public post with no attachments.
///
/// Convenience wrapper over [`Node::create_post_with_visibility`]; the
/// returned visibility is discarded (always public here).
pub async fn create_post(&self, content: String) -> anyhow::Result<(PostId, Post)> {
    let (id, post, _vis) = self
        .create_post_with_visibility(content, VisibilityIntent::Public, vec![])
        .await?;
    Ok((id, post))
}
pub async fn create_post_with_visibility(
&self,
content: String,
intent: VisibilityIntent,
attachment_data: Vec<(Vec<u8>, String)>,
) -> anyhow::Result<(PostId, Post, PostVisibility)> {
// Validate attachments
if attachment_data.len() > 4 {
anyhow::bail!("max 4 attachments per post");
}
for (data, _) in &attachment_data {
if data.len() > 10 * 1024 * 1024 {
anyhow::bail!("attachment exceeds 10MB limit");
}
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64;
// Determine encryption parameters and generate CEK if needed.
// The CEK is generated BEFORE both content and blob encryption so they share the same key.
enum EncryptionMode {
Public,
Recipient { cek: [u8; 32], recipients: Vec<NodeId> },
Group { cek: [u8; 32], group_id: [u8; 32], epoch: u64, group_seed: [u8; 32], group_pubkey: [u8; 32] },
}
let mode = match &intent {
VisibilityIntent::Public => EncryptionMode::Public,
VisibilityIntent::Circle(circle_name) => {
// Try group encryption first
let group_info = {
let storage = self.storage.lock().await;
storage.get_group_key_by_circle(circle_name)?
.and_then(|gk| {
storage.get_group_seed(&gk.group_id, gk.epoch).ok().flatten()
.map(|seed| (gk.group_id, gk.epoch, seed, gk.group_public_key))
})
};
if let Some((group_id, epoch, group_seed, group_pubkey)) = group_info {
let mut cek = [0u8; 32];
rand::RngCore::fill_bytes(&mut rand::rng(), &mut cek);
EncryptionMode::Group { cek, group_id, epoch, group_seed, group_pubkey }
} else {
let recipients = self.resolve_recipients(&intent).await?;
if recipients.is_empty() {
anyhow::bail!("no recipients resolved for this visibility");
}
let mut cek = [0u8; 32];
rand::RngCore::fill_bytes(&mut rand::rng(), &mut cek);
EncryptionMode::Recipient { cek, recipients }
}
}
_ => {
let recipients = self.resolve_recipients(&intent).await?;
if recipients.is_empty() {
anyhow::bail!("no recipients resolved for this visibility");
}
let mut cek = [0u8; 32];
rand::RngCore::fill_bytes(&mut rand::rng(), &mut cek);
EncryptionMode::Recipient { cek, recipients }
}
};
// Store blob files — for encrypted posts, encrypt each blob with the shared CEK.
// CID is computed on the ciphertext so peers can verify what they store.
let mut attachments = Vec::with_capacity(attachment_data.len());
for (data, mime) in &attachment_data {
let (store_data, size) = match &mode {
EncryptionMode::Public => {
(data.clone(), data.len() as u64)
}
EncryptionMode::Recipient { cek, .. } | EncryptionMode::Group { cek, .. } => {
let encrypted = crypto::encrypt_bytes_with_cek(data, cek)?;
let sz = encrypted.len() as u64;
(encrypted, sz)
}
};
let cid = crate::blob::compute_blob_id(&store_data);
self.blob_store.store(&cid, &store_data)?;
attachments.push(Attachment {
cid,
mime_type: mime.clone(),
size_bytes: size,
});
}
// Encrypt content and build visibility
let (final_content, visibility) = match mode {
EncryptionMode::Public => (content, PostVisibility::Public),
EncryptionMode::Recipient { cek, recipients } => {
let (encrypted, wrapped_keys) =
crypto::encrypt_post_with_cek(&content, &cek, &self.secret_seed, &self.node_id, &recipients)?;
(
encrypted,
PostVisibility::Encrypted {
recipients: wrapped_keys,
},
)
}
EncryptionMode::Group { cek, group_id, epoch, group_seed, group_pubkey } => {
let (encrypted, wrapped_cek) =
crypto::encrypt_post_for_group_with_cek(&content, &cek, &group_seed, &group_pubkey)?;
(
encrypted,
PostVisibility::GroupEncrypted {
group_id,
epoch,
wrapped_cek,
},
)
}
};
let post = Post {
author: self.node_id,
content: final_content,
attachments,
timestamp_ms: now,
};
let post_id = compute_post_id(&post);
{
let storage = self.storage.lock().await;
storage.store_post_with_intent(&post_id, &post, &visibility, &intent)?;
for att in &post.attachments {
storage.record_blob(&att.cid, &post_id, &self.node_id, att.size_bytes, &att.mime_type, now)?;
// Auto-pin own blobs so they're never evicted before foreign content
let _ = storage.pin_blob(&att.cid);
}
// Initialize encrypted receipt + comment slots for non-public posts
if !matches!(visibility, PostVisibility::Public) {
let participant_count = match &visibility {
PostVisibility::Encrypted { recipients } => recipients.len(),
PostVisibility::GroupEncrypted { .. } => {
// For group posts, we don't know exact member count at creation time;
// use a reasonable default (the circle members count, resolved earlier)
match &intent {
VisibilityIntent::Circle(circle_name) => {
storage.get_circle_members(circle_name)
.map(|m| m.len() + 1) // +1 for author
.unwrap_or(2)
}
_ => 2,
}
}
PostVisibility::Public => unreachable!(),
};
let receipt_slots: Vec<Vec<u8>> = (0..participant_count)
.map(|_| crypto::random_slot_noise(64))
.collect();
let comment_slot_count = (participant_count + 2) / 3; // ceil(participants / 3)
let comment_slots: Vec<Vec<u8>> = (0..comment_slot_count)
.map(|_| crypto::random_slot_noise(256))
.collect();
let blob_header = crate::types::BlobHeader {
post_id,
author: self.node_id,
reactions: vec![],
comments: vec![],
policy: Default::default(),
updated_at: now,
thread_splits: vec![],
receipt_slots,
comment_slots,
};
let header_json = serde_json::to_string(&blob_header)?;
storage.store_blob_header(&post_id, &self.node_id, &header_json, now)?;
}
}
// Build and store CDN manifests for blobs
if !post.attachments.is_empty() {
let storage = self.storage.lock().await;
let (previous, _following) = storage.get_author_post_neighborhood(&self.node_id, now, 10)?;
drop(storage);
let manifest = crate::types::AuthorManifest {
post_id,
author: self.node_id,
author_addresses: self.network.our_addresses(),
created_at: now,
updated_at: now,
previous_posts: previous,
following_posts: vec![],
signature: vec![],
};
let sig = crypto::sign_manifest(&self.secret_seed, &manifest);
let mut manifest = manifest;
manifest.signature = sig;
let manifest_json = serde_json::to_string(&manifest)?;
{
let storage = self.storage.lock().await;
for att in &post.attachments {
storage.store_cdn_manifest(&att.cid, &manifest_json, &self.node_id, now)?;
}
}
// Update previous posts' manifests to include this new post as a following_post
self.update_neighbor_manifests(&post_id, now).await;
// Push updated manifests to downstream peers
let manifests_to_push = {
let storage = self.storage.lock().await;
storage.get_manifests_for_author_blobs(&self.node_id).unwrap_or_default()
};
let our_addrs = self.network.our_addresses();
for (push_cid, push_json) in &manifests_to_push {
if let Ok(author_manifest) = serde_json::from_str::<crate::types::AuthorManifest>(push_json) {
let cdn_manifest = crate::types::CdnManifest {
author_manifest: author_manifest,
host: self.node_id,
host_addresses: our_addrs.clone(),
source: self.node_id,
source_addresses: our_addrs.clone(),
downstream_count: 0,
};
self.network.push_manifest_to_downstream(push_cid, &cdn_manifest).await;
}
}
}
// For encrypted posts, push directly to recipients
let pushed = self.network.push_post_to_recipients(&post_id, &post, &visibility).await;
// For public posts, push to audience members
let audience_pushed = self.network.push_to_audience(&post_id, &post, &visibility).await;
info!(post_id = hex::encode(post_id), pushed, audience_pushed, "Created new post");
Ok((post_id, post, visibility))
}
/// Update the manifests of recent prior posts to include a newly created post
/// in their following_posts list. Re-signs each updated manifest.
async fn update_neighbor_manifests(&self, new_post_id: &PostId, new_timestamp_ms: u64) {
    // Snapshot the manifests under the lock, then release it for the loop.
    let manifests = {
        let storage = self.storage.lock().await;
        match storage.get_manifests_for_author_blobs(&self.node_id) {
            Ok(m) => m,
            Err(e) => {
                warn!("Failed to get manifests for neighbor update: {}", e);
                return;
            }
        }
    };
    let new_entry = crate::types::ManifestEntry {
        post_id: *new_post_id,
        timestamp_ms: new_timestamp_ms,
        has_attachments: true,
    };
    for (cid, json) in manifests {
        // Unparseable manifests are skipped (best-effort update).
        let Ok(mut manifest) = serde_json::from_str::<crate::types::AuthorManifest>(&json) else {
            continue;
        };
        // Skip: manifests created after the new post, duplicates, or full lists.
        let is_duplicate = manifest
            .following_posts
            .iter()
            .any(|e| e.post_id == *new_post_id);
        if manifest.created_at >= new_timestamp_ms
            || is_duplicate
            || manifest.following_posts.len() >= 10
        {
            continue;
        }
        manifest.following_posts.push(new_entry.clone());
        manifest.updated_at = new_timestamp_ms;
        manifest.signature = crypto::sign_manifest(&self.secret_seed, &manifest);
        if let Ok(updated_json) = serde_json::to_string(&manifest) {
            // Re-acquire the lock per write, as the original did.
            let storage = self.storage.lock().await;
            let _ = storage.store_cdn_manifest(&cid, &updated_json, &self.node_id, new_timestamp_ms);
        }
    }
}
/// Resolve a visibility intent to the concrete recipient list.
/// Public resolves to an empty list (no per-recipient encryption).
async fn resolve_recipients(&self, intent: &VisibilityIntent) -> anyhow::Result<Vec<NodeId>> {
    let s = self.storage.lock().await;
    match intent {
        VisibilityIntent::Direct(ids) => Ok(ids.clone()),
        VisibilityIntent::Circle(name) => s.get_circle_members(name),
        VisibilityIntent::Friends => s.list_public_follows(),
        VisibilityIntent::Public => Ok(Vec::new()),
    }
}
/// Return the feed (followed authors), with encrypted posts decrypted where
/// this node holds a key.
pub async fn get_feed(
    &self,
) -> anyhow::Result<Vec<(PostId, Post, PostVisibility, Option<String>)>> {
    let storage = self.storage.lock().await;
    let raw = storage.get_feed()?;
    let group_seeds = storage.get_all_group_seeds_map().unwrap_or_default();
    drop(storage);
    Ok(self.decrypt_posts(raw, &group_seeds))
}
/// Return all stored posts in reverse-chronological order, with encrypted
/// posts decrypted where this node holds a key.
pub async fn get_all_posts(
    &self,
) -> anyhow::Result<Vec<(PostId, Post, PostVisibility, Option<String>)>> {
    let storage = self.storage.lock().await;
    let raw = storage.list_posts_reverse_chron()?;
    let group_seeds = storage.get_all_group_seeds_map().unwrap_or_default();
    drop(storage);
    Ok(self.decrypt_posts(raw, &group_seeds))
}
/// Attempt decryption of each post, attaching the plaintext (if any) as a
/// fourth tuple element. Posts this node cannot decrypt yield `None`.
fn decrypt_posts(
    &self,
    posts: Vec<(PostId, Post, PostVisibility)>,
    group_seeds: &std::collections::HashMap<(crate::types::GroupId, crate::types::GroupEpoch), ([u8; 32], [u8; 32])>,
) -> Vec<(PostId, Post, PostVisibility, Option<String>)> {
    let mut out = Vec::with_capacity(posts.len());
    for (id, post, vis) in posts {
        let decrypted = match &vis {
            // Public content is already plaintext — nothing to attach.
            PostVisibility::Public => None,
            PostVisibility::Encrypted { recipients } => crypto::decrypt_post(
                &post.content,
                &self.secret_seed,
                &self.node_id,
                &post.author,
                recipients,
            )
            .unwrap_or(None),
            PostVisibility::GroupEncrypted { group_id, epoch, wrapped_cek } => group_seeds
                .get(&(*group_id, *epoch))
                .and_then(|(seed, pubkey)| {
                    crypto::decrypt_group_post(&post.content, seed, pubkey, wrapped_cek).ok()
                }),
        };
        out.push((id, post, vis, decrypted));
    }
    out
}
// ---- Follows ----
/// Follow a peer: record the follow and upsert a social route for it.
pub async fn follow(&self, node_id: &NodeId) -> anyhow::Result<()> {
    let connected = self.network.is_connected(node_id).await;
    let s = self.storage.lock().await;
    s.add_follow(node_id)?;
    // Mutual if they are already in our audience, otherwise a plain follow.
    let relation = if s.list_audience_members()?.contains(node_id) {
        SocialRelation::Mutual
    } else {
        SocialRelation::Follow
    };
    let addresses = s
        .get_peer_record(node_id)?
        .map(|r| r.addresses)
        .unwrap_or_default();
    let peer_addresses = s.build_peer_addresses_for(node_id)?;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let status = if connected {
        SocialStatus::Online
    } else {
        SocialStatus::Disconnected
    };
    s.upsert_social_route(&SocialRouteEntry {
        node_id: *node_id,
        addresses,
        peer_addresses,
        relation,
        status,
        last_connected_ms: 0,
        last_seen_ms: now,
        reach_method: ReachMethod::Direct,
        preferred_tree: s.build_preferred_tree_for(node_id).unwrap_or_default(),
    })?;
    Ok(())
}
/// Unfollow a peer. If they still follow us (present in our audience list),
/// keep the route but downgrade it; otherwise remove the route entirely.
pub async fn unfollow(&self, node_id: &NodeId) -> anyhow::Result<()> {
    let s = self.storage.lock().await;
    s.remove_follow(node_id)?;
    if s.list_audience_members()?.contains(node_id) {
        // Downgrade from Mutual to Audience
        if let Some(mut route) = s.get_social_route(node_id)? {
            route.relation = SocialRelation::Audience;
            s.upsert_social_route(&route)?;
        }
    } else {
        s.remove_social_route(node_id)?;
    }
    Ok(())
}
/// List all NodeIds this node follows.
pub async fn list_follows(&self) -> anyhow::Result<Vec<NodeId>> {
    self.storage.lock().await.list_follows()
}
// ---- Profiles ----
/// Set display name and bio, preserving existing anchors, visibility flag
/// and avatar; stores the profile and pushes it to connected peers.
pub async fn set_profile(&self, display_name: String, bio: String) -> anyhow::Result<PublicProfile> {
    let updated_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let recent_peers = self.current_recent_peers().await;
    let profile = {
        let s = self.storage.lock().await;
        let anchors = s.get_peer_anchors(&self.node_id).unwrap_or_default();
        let preferred_peers = s.list_preferred_peers().unwrap_or_default();
        // Carry over visibility + avatar from any existing profile
        // (defaults: publicly visible, no avatar).
        let (public_visible, avatar_cid) = match s.get_profile(&self.node_id) {
            Ok(Some(p)) => (p.public_visible, p.avatar_cid),
            _ => (true, None),
        };
        let profile = PublicProfile {
            node_id: self.node_id,
            display_name,
            bio,
            updated_at,
            anchors,
            recent_peers,
            preferred_peers,
            public_visible,
            avatar_cid,
        };
        s.store_profile(&profile)?;
        profile
    };
    let pushed = self.network.push_profile(&profile).await;
    if pushed > 0 {
        info!(pushed, "Pushed profile update to peers");
    }
    Ok(profile)
}
/// Replace this node's anchor list, preserving all other profile fields;
/// stores the profile and pushes it to connected peers.
pub async fn set_anchors(&self, anchors: Vec<NodeId>) -> anyhow::Result<PublicProfile> {
    let updated_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let recent_peers = self.current_recent_peers().await;
    let profile = {
        let s = self.storage.lock().await;
        // Keep name/bio/visibility/avatar from the existing profile, if any.
        let (display_name, bio, public_visible, avatar_cid) = s
            .get_profile(&self.node_id)?
            .map(|p| (p.display_name, p.bio, p.public_visible, p.avatar_cid))
            .unwrap_or_else(|| (String::new(), String::new(), true, None));
        let preferred_peers = s.list_preferred_peers().unwrap_or_default();
        let profile = PublicProfile {
            node_id: self.node_id,
            display_name,
            bio,
            updated_at,
            anchors,
            recent_peers,
            preferred_peers,
            public_visible,
            avatar_cid,
        };
        s.store_profile(&profile)?;
        profile
    };
    let pushed = self.network.push_profile(&profile).await;
    if pushed > 0 {
        info!(pushed, "Pushed anchor update to peers");
    }
    Ok(profile)
}
/// List the anchors a given peer has published.
pub async fn get_peer_anchors(&self, node_id: &NodeId) -> anyhow::Result<Vec<NodeId>> {
    self.storage.lock().await.get_peer_anchors(node_id)
}
/// Stored public profile for an arbitrary peer, if we have one.
pub async fn get_profile(&self, node_id: &NodeId) -> anyhow::Result<Option<PublicProfile>> {
    self.storage.lock().await.get_profile(node_id)
}
/// Our own stored profile, if one has been created yet.
pub async fn my_profile(&self) -> anyhow::Result<Option<PublicProfile>> {
    self.storage.lock().await.get_profile(&self.node_id)
}
/// Whether a profile record exists for our own node id.
pub async fn has_profile(&self) -> anyhow::Result<bool> {
    let profile = self.storage.lock().await.get_profile(&self.node_id)?;
    Ok(profile.is_some())
}
/// Display name stored for a peer, if known.
pub async fn get_display_name(&self, node_id: &NodeId) -> anyhow::Result<Option<String>> {
    self.storage.lock().await.get_display_name(node_id)
}
// ---- Blobs ----
/// Get a blob by CID from local store.
///
/// When the blob is present, its access time is touched (best-effort)
/// before the bytes are returned.
pub async fn get_blob(&self, cid: &[u8; 32]) -> anyhow::Result<Option<Vec<u8>>> {
    let data = self.blob_store.get(cid)?;
    if data.is_some() {
        let _ = self.storage.lock().await.touch_blob_access(cid);
    }
    Ok(data)
}
/// Decrypt a blob in the context of a post's visibility.
/// Public posts pass through unchanged. Encrypted/group-encrypted posts decrypt with the CEK.
///
/// Returns `Ok(None)` when we hold no key material for the post
/// (not a wrapped recipient, or the group seed for that epoch is missing).
fn decrypt_blob_for_post(
    &self,
    data: Vec<u8>,
    post: &Post,
    visibility: &PostVisibility,
    group_seeds: &std::collections::HashMap<([u8; 32], u64), ([u8; 32], [u8; 32])>,
) -> anyhow::Result<Option<Vec<u8>>> {
    match visibility {
        // Public content is stored in the clear; nothing to do.
        PostVisibility::Public => Ok(Some(data)),
        PostVisibility::Encrypted { recipients } => {
            // `None` from the unwrap means we are not among the recipients.
            crypto::unwrap_cek_for_recipient(
                &self.secret_seed,
                &self.node_id,
                &post.author,
                recipients,
            )?
            .map(|cek| crypto::decrypt_bytes_with_cek(&data, &cek))
            .transpose()
        }
        PostVisibility::GroupEncrypted { group_id, epoch, wrapped_cek } => {
            let Some((seed, pubkey)) = group_seeds.get(&(*group_id, *epoch)) else {
                // We never received this group/epoch's seed.
                return Ok(None);
            };
            let cek = crypto::unwrap_group_cek(seed, pubkey, wrapped_cek)?;
            Ok(Some(crypto::decrypt_bytes_with_cek(&data, &cek)?))
        }
    }
}
/// Get a blob by CID, decrypting it in the context of the given post.
/// For public posts, returns raw blob data. For encrypted posts, decrypts with the post's CEK.
pub async fn get_blob_for_post(
    &self,
    cid: &[u8; 32],
    post_id: &PostId,
) -> anyhow::Result<Option<Vec<u8>>> {
    // Local lookup first; nothing to decrypt if we don't hold the bytes.
    let Some(raw_data) = self.blob_store.get(cid)? else {
        return Ok(None);
    };
    {
        let storage = self.storage.lock().await;
        let _ = storage.touch_blob_access(cid);
    }
    // Resolve the post's visibility; without post context the raw bytes
    // are the best we can do.
    let lookup = {
        let storage = self.storage.lock().await;
        storage.get_post_with_visibility(post_id)?
    };
    let Some((post, visibility)) = lookup else {
        return Ok(Some(raw_data));
    };
    match &visibility {
        PostVisibility::Public => Ok(Some(raw_data)),
        PostVisibility::Encrypted { .. } => {
            // Per-recipient unwrap needs no group seeds.
            let empty_map = std::collections::HashMap::new();
            self.decrypt_blob_for_post(raw_data, &post, &visibility, &empty_map)
        }
        PostVisibility::GroupEncrypted { .. } => {
            let group_seeds = {
                let storage = self.storage.lock().await;
                storage.get_all_group_seeds_map().unwrap_or_default()
            };
            self.decrypt_blob_for_post(raw_data, &post, &visibility, &group_seeds)
        }
    }
}
/// Prefetch blobs for recently synced posts from a peer.
/// Queries storage for posts with attachments missing from the local blob store,
/// then fetches each missing blob. Runs outside any locks.
pub async fn prefetch_blobs_from_peer(&self, peer_id: &NodeId) {
// Gather posts with missing blobs
let missing: Vec<(PostId, NodeId, Vec<crate::types::Attachment>)> = {
let storage = self.storage.lock().await;
let post_ids = storage.list_post_ids().unwrap_or_default();
let mut result = Vec::new();
for pid in post_ids {
if let Ok(Some(post)) = storage.get_post(&pid) {
let missing_atts: Vec<_> = post.attachments.iter()
.filter(|a| !self.blob_store.has(&a.cid))
.cloned()
.collect();
if !missing_atts.is_empty() {
result.push((pid, post.author, missing_atts));
}
}
}
result
};
if missing.is_empty() {
return;
}
let mut fetched = 0usize;
for (post_id, author, attachments) in &missing {
for att in attachments {
match self.fetch_blob_with_fallback(
&att.cid, post_id, author, &att.mime_type, 0,
).await {
Ok(Some(_)) => { fetched += 1; }
Ok(None) => {}
Err(e) => {
tracing::debug!(
cid = hex::encode(att.cid),
error = %e,
"Blob prefetch failed"
);
}
}
}
}
if fetched > 0 {
tracing::info!(fetched, peer = hex::encode(peer_id), "Prefetched blobs after sync");
}
}
/// Check if a blob exists locally.
///
/// Pure membership test against the local blob store; does not touch
/// access times and never goes to the network.
pub fn has_blob(&self, cid: &[u8; 32]) -> bool {
    self.blob_store.has(cid)
}
/// Fetch a blob from a peer, storing it locally and recording CDN metadata.
pub async fn fetch_blob_from_peer(
&self,
cid: &[u8; 32],
from_peer: &NodeId,
post_id: &PostId,
author: &NodeId,
mime_type: &str,
created_at: u64,
) -> anyhow::Result<Option<Vec<u8>>> {
// Check local first
if let Some(data) = self.blob_store.get(cid)? {
return Ok(Some(data));
}
// Fetch with CDN metadata
let (data, response) = self.network.fetch_blob_full(cid, from_peer).await?;
if let Some(ref data) = data {
// Store blob locally
self.blob_store.store(cid, data)?;
let storage = self.storage.lock().await;
storage.record_blob(cid, post_id, author, data.len() as u64, mime_type, created_at)?;
// Store AuthorManifest if provided (extract from CdnManifest wrapper)
if let Some(ref cdn_manifest) = response.manifest {
if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
let _ = storage.store_cdn_manifest(
cid,
&author_json,
&cdn_manifest.author_manifest.author,
cdn_manifest.author_manifest.updated_at,
);
}
}
// Record upstream source
let source_addrs: Vec<String> = response.manifest.as_ref()
.map(|m| m.host_addresses.clone())
.unwrap_or_default();
let _ = storage.store_blob_upstream(cid, from_peer, &source_addrs);
}
Ok(data)
}
/// Fetch a blob with CDN-aware cascade:
/// 1. Local → 2. Existing upstream → 3. Lateral N0-N2 peers → 4. Author → 5. Redirect peers
///
/// Every network step is best-effort: failures are logged at warn level
/// and the cascade continues. Returns `Ok(None)` only after all sources
/// (including a final replica-peer pass) have been tried.
pub async fn fetch_blob_with_fallback(
    &self,
    cid: &[u8; 32],
    post_id: &PostId,
    author: &NodeId,
    mime_type: &str,
    created_at: u64,
) -> anyhow::Result<Option<Vec<u8>>> {
    // 1. Check local
    if let Some(data) = self.blob_store.get(cid)? {
        let storage = self.storage.lock().await;
        let _ = storage.touch_blob_access(cid);
        return Ok(Some(data));
    }
    // Collect redirect peers from responses in case we need them later
    let mut redirect_peers: Vec<crate::types::PeerWithAddress> = Vec::new();
    // 2. Try existing upstream (if we previously fetched this blob)
    let upstream = {
        let storage = self.storage.lock().await;
        storage.get_blob_upstream(cid)?
    };
    if let Some((upstream_nid, _upstream_addrs)) = upstream {
        match self.fetch_blob_from_peer(cid, &upstream_nid, post_id, author, mime_type, created_at).await {
            Ok(Some(data)) => return Ok(Some(data)),
            Ok(None) => {}
            Err(e) => warn!(error = %e, "blob fetch from upstream failed"),
        }
    }
    // 3. Lateral N0-N2: mesh peers + N2 peers who have the author's posts
    let lateral_sources = {
        let storage = self.storage.lock().await;
        storage.get_lateral_blob_sources(author, post_id).unwrap_or_default()
    };
    for lateral in lateral_sources {
        if lateral == *author {
            continue; // Author is step 4
        }
        match self.network.fetch_blob_full(cid, &lateral).await {
            Ok((Some(data), response)) => {
                // Success: persist blob + metadata and remember this peer
                // as our upstream for future fetches.
                self.blob_store.store(cid, &data)?;
                let storage = self.storage.lock().await;
                storage.record_blob(cid, post_id, author, data.len() as u64, mime_type, created_at)?;
                if let Some(ref cdn_manifest) = response.manifest {
                    // Only keep manifests whose author signature verifies.
                    if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
                        let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
                        let _ = storage.store_cdn_manifest(cid, &author_json, &cdn_manifest.author_manifest.author, cdn_manifest.author_manifest.updated_at);
                    }
                }
                let _ = storage.store_blob_upstream(cid, &lateral, &[]);
                return Ok(Some(data));
            }
            Ok((None, response)) => {
                // Peer lacked the blob but may have pointed at CDN hosts.
                redirect_peers.extend(response.cdn_redirect_peers);
            }
            Err(e) => warn!(peer = hex::encode(lateral), error = %e, "lateral blob fetch failed"),
        }
    }
    // 4. Try author (last resort for direct)
    match self.fetch_blob_from_peer(cid, author, post_id, author, mime_type, created_at).await {
        Ok(Some(data)) => return Ok(Some(data)),
        Ok(None) => {}
        Err(e) => warn!(error = %e, "blob fetch from author failed"),
    }
    // 5. Try redirect peers (from any step that returned cdn_redirect_peers)
    for rp in &redirect_peers {
        // `rp.n` is a hex-encoded node id — skip any that fail to parse.
        if let Ok(nid_bytes) = hex::decode(&rp.n) {
            if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
                match self.fetch_blob_from_peer(cid, &nid, post_id, author, mime_type, created_at).await {
                    Ok(Some(data)) => return Ok(Some(data)),
                    Ok(None) => {}
                    Err(e) => warn!(peer = &rp.n, error = %e, "redirect blob fetch failed"),
                }
            }
        }
    }
    // 6. Try replica peers as final fallback (1-hour staleness window)
    let replicas = {
        let storage = self.storage.lock().await;
        storage.get_replica_peers(post_id, 3_600_000)?
    };
    for replica in replicas {
        match self.fetch_blob_from_peer(cid, &replica, post_id, author, mime_type, created_at).await {
            Ok(Some(data)) => return Ok(Some(data)),
            Ok(None) => {}
            Err(e) => warn!(peer = hex::encode(replica), error = %e, "blob fetch from replica failed"),
        }
    }
    Ok(None)
}
// ---- Circles ----
/// Create a named circle, then provision its group key.
pub async fn create_circle(&self, name: String) -> anyhow::Result<()> {
    self.storage.lock().await.create_circle(&name)?;
    // Key provisioning happens after the circle row exists.
    self.create_group_key_for_circle(&name).await?;
    Ok(())
}
/// Delete a circle along with its group key material.
pub async fn delete_circle(&self, name: String) -> anyhow::Result<()> {
    let storage = self.storage.lock().await;
    // Drop the group key (best-effort) before removing the circle itself.
    if let Ok(Some(gk)) = storage.get_group_key_by_circle(&name) {
        let _ = storage.delete_group_key(&gk.group_id);
    }
    storage.delete_circle(&name)
}
/// Add a member to a circle and, when we administer the circle's group
/// key, wrap the current epoch's key for them and push it over.
pub async fn add_to_circle(&self, circle_name: String, node_id: NodeId) -> anyhow::Result<()> {
    self.storage.lock().await.add_circle_member(&circle_name, &node_id)?;
    // Build the key-distribution payload under the lock; push after.
    let distribute_payload = {
        let storage = self.storage.lock().await;
        // Only the admin holds the seed needed to wrap for new members.
        let gk = match storage.get_group_key_by_circle(&circle_name) {
            Ok(Some(gk)) if gk.admin == self.node_id => Some(gk),
            _ => None,
        };
        let seed = match &gk {
            Some(gk) => storage.get_group_seed(&gk.group_id, gk.epoch).ok().flatten(),
            None => None,
        };
        match (gk, seed) {
            (Some(gk), Some(seed)) => {
                match crypto::wrap_group_key_for_member(&self.secret_seed, &node_id, &seed) {
                    Ok(wrapped) => {
                        let mk = crate::types::GroupMemberKey {
                            member: node_id,
                            epoch: gk.epoch,
                            wrapped_group_key: wrapped,
                        };
                        let _ = storage.store_group_member_key(&gk.group_id, &mk);
                        Some(crate::protocol::GroupKeyDistributePayload {
                            group_id: gk.group_id,
                            circle_name: circle_name.clone(),
                            epoch: gk.epoch,
                            group_public_key: gk.group_public_key,
                            admin: self.node_id,
                            member_keys: vec![mk],
                        })
                    }
                    Err(e) => {
                        warn!(error = %e, "Failed to wrap group key for new member");
                        None
                    }
                }
            }
            _ => None,
        }
    };
    if let Some(payload) = distribute_payload {
        self.network.push_group_key(&node_id, &payload).await;
    }
    Ok(())
}
/// Remove a member from a circle, then rotate the circle's group key
/// (rotation is a no-op unless we are the key's admin).
pub async fn remove_from_circle(
    &self,
    circle_name: String,
    node_id: NodeId,
) -> anyhow::Result<()> {
    self.storage.lock().await.remove_circle_member(&circle_name, &node_id)?;
    // A departed member must not be able to read future epochs.
    self.rotate_group_key(&circle_name).await;
    Ok(())
}
/// Create a group key for a circle (called on circle creation).
///
/// Generates an epoch-1 keypair, persists record + seed, wraps the key
/// for ourselves, then wraps and distributes to each existing member.
async fn create_group_key_for_circle(&self, circle_name: &str) -> anyhow::Result<()> {
    let (seed, pubkey) = crypto::generate_group_keypair();
    let group_id = crypto::compute_group_id(&pubkey);
    let created_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let record = crate::types::GroupKeyRecord {
        group_id,
        circle_name: circle_name.to_string(),
        epoch: 1,
        group_public_key: pubkey,
        admin: self.node_id,
        created_at,
    };
    // Persist the record/seed and our own wrapped copy in one locked phase.
    let members = {
        let storage = self.storage.lock().await;
        storage.create_group_key(&record, Some(&seed))?;
        storage.store_group_seed(&group_id, 1, &seed)?;
        let self_wrapped =
            crypto::wrap_group_key_for_member(&self.secret_seed, &self.node_id, &seed)?;
        let self_mk = crate::types::GroupMemberKey {
            member: self.node_id,
            epoch: 1,
            wrapped_group_key: self_wrapped,
        };
        storage.store_group_member_key(&group_id, &self_mk)?;
        storage.get_circle_members(circle_name)?
    };
    // Wrap for every other existing member and push them the key.
    for member in members.iter().filter(|m| **m != self.node_id) {
        let wrapped = match crypto::wrap_group_key_for_member(&self.secret_seed, member, &seed) {
            Ok(w) => w,
            Err(e) => {
                warn!(member = hex::encode(member), error = %e, "Failed to wrap group key for member");
                continue;
            }
        };
        let mk = crate::types::GroupMemberKey {
            member: *member,
            epoch: 1,
            wrapped_group_key: wrapped,
        };
        {
            let storage = self.storage.lock().await;
            let _ = storage.store_group_member_key(&group_id, &mk);
        }
        let payload = crate::protocol::GroupKeyDistributePayload {
            group_id,
            circle_name: circle_name.to_string(),
            epoch: 1,
            group_public_key: pubkey,
            admin: self.node_id,
            member_keys: vec![mk],
        };
        self.network.push_group_key(member, &payload).await;
    }
    info!(circle = %circle_name, group_id = hex::encode(group_id), "Created group key for circle");
    Ok(())
}
/// Rotate the group key for a circle (called on member removal).
///
/// Silently does nothing unless we administer the circle's key. Failures
/// are logged rather than propagated.
async fn rotate_group_key(&self, circle_name: &str) {
    // Phase 1: under the lock, confirm we are the admin and derive the
    // next epoch's key material for the remaining members.
    let (group_id, new_seed, new_pubkey, new_epoch, member_keys) = {
        let storage = self.storage.lock().await;
        let gk = match storage.get_group_key_by_circle(circle_name) {
            Ok(Some(gk)) if gk.admin == self.node_id => gk,
            _ => return,
        };
        let mut members = match storage.get_circle_members(circle_name) {
            Ok(m) => m,
            Err(_) => return,
        };
        // The admin always keeps a wrapped copy of the new key.
        if !members.contains(&self.node_id) {
            members.push(self.node_id);
        }
        match crypto::rotate_group_key(&self.secret_seed, gk.epoch, &members) {
            Ok((seed, pubkey, epoch, keys)) => (gk.group_id, seed, pubkey, epoch, keys),
            Err(e) => {
                warn!(error = %e, "Failed to rotate group key");
                return;
            }
        }
    };
    // Phase 2: persist the new epoch, seed, and per-member wrapped keys.
    {
        let storage = self.storage.lock().await;
        let _ = storage.update_group_epoch(&group_id, new_epoch, &new_pubkey, Some(&new_seed));
        let _ = storage.store_group_seed(&group_id, new_epoch, &new_seed);
        for mk in &member_keys {
            let _ = storage.store_group_member_key(&group_id, mk);
        }
    }
    // Phase 3: distribute the rotated key to everyone but ourselves.
    for mk in member_keys.iter().filter(|mk| mk.member != self.node_id) {
        let payload = crate::protocol::GroupKeyDistributePayload {
            group_id,
            circle_name: circle_name.to_string(),
            epoch: new_epoch,
            group_public_key: new_pubkey,
            admin: self.node_id,
            member_keys: vec![mk.clone()],
        };
        self.network.push_group_key(&mk.member, &payload).await;
    }
    info!(circle = %circle_name, epoch = new_epoch, "Rotated group key");
}
/// All circles known to local storage.
pub async fn list_circles(&self) -> anyhow::Result<Vec<Circle>> {
    self.storage.lock().await.list_circles()
}
// ---- Circle Profiles ----
/// Set a circle profile: store locally, encrypt with group key, push to connected peers.
///
/// Fails when the circle does not exist, has no group key, or we are not
/// its admin.
pub async fn set_circle_profile(
    &self,
    circle_name: String,
    display_name: String,
    bio: String,
    avatar_cid: Option<[u8; 32]>,
) -> anyhow::Result<crate::types::CircleProfile> {
    let updated_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let cp = crate::types::CircleProfile {
        author: self.node_id,
        circle_name: circle_name.clone(),
        display_name,
        bio,
        avatar_cid,
        updated_at,
    };
    // Encrypt the profile under the circle's current group key epoch.
    let (encrypted_payload, wrapped_cek, group_id, epoch) = {
        let storage = self.storage.lock().await;
        if !storage.list_circles()?.iter().any(|c| c.name == circle_name) {
            anyhow::bail!("circle '{}' does not exist", circle_name);
        }
        let gk = storage
            .get_group_key_by_circle(&circle_name)?
            .ok_or_else(|| anyhow::anyhow!("no group key for circle '{}'", circle_name))?;
        if gk.admin != self.node_id {
            anyhow::bail!("not admin of circle '{}'", circle_name);
        }
        let seed = storage
            .get_group_seed(&gk.group_id, gk.epoch)?
            .ok_or_else(|| anyhow::anyhow!("group seed not found for circle '{}'", circle_name))?;
        // The profile travels as group-encrypted JSON.
        let json = serde_json::to_string(&cp)?;
        let (encrypted, wrapped) =
            crypto::encrypt_post_for_group(&json, &seed, &gk.group_public_key)?;
        // Keep both the plaintext and the encrypted form locally.
        storage.set_circle_profile(&cp)?;
        storage.store_remote_circle_profile(
            &self.node_id,
            &circle_name,
            &cp,
            &encrypted,
            &wrapped,
            &gk.group_id,
            gk.epoch,
        )?;
        (encrypted, wrapped, gk.group_id, gk.epoch)
    };
    // Fan the encrypted update out to all connected mesh peers.
    let payload = crate::protocol::CircleProfileUpdatePayload {
        author: self.node_id,
        circle_name,
        group_id,
        epoch,
        encrypted_payload,
        wrapped_cek,
        updated_at,
    };
    let pushed = self.network.push_circle_profile(&payload).await;
    if pushed > 0 {
        info!(pushed, "Pushed circle profile update to peers");
    }
    Ok(cp)
}
/// Delete a circle profile and push tombstone.
pub async fn delete_circle_profile(&self, circle_name: String) -> anyhow::Result<()> {
    let updated_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let payload = {
        let storage = self.storage.lock().await;
        let gk = storage
            .get_group_key_by_circle(&circle_name)?
            .ok_or_else(|| anyhow::anyhow!("no group key for circle '{}'", circle_name))?;
        let seed = storage
            .get_group_seed(&gk.group_id, gk.epoch)?
            .ok_or_else(|| anyhow::anyhow!("group seed not found"))?;
        // An encrypted empty string is the tombstone marker.
        let (encrypted_payload, wrapped_cek) =
            crypto::encrypt_post_for_group("", &seed, &gk.group_public_key)?;
        storage.delete_circle_profile(&self.node_id, &circle_name)?;
        crate::protocol::CircleProfileUpdatePayload {
            author: self.node_id,
            circle_name,
            group_id: gk.group_id,
            epoch: gk.epoch,
            encrypted_payload,
            wrapped_cek,
            updated_at,
        }
    };
    self.network.push_circle_profile(&payload).await;
    Ok(())
}
/// Set public_visible flag and push profile update.
pub async fn set_public_visible(&self, visible: bool) -> anyhow::Result<()> {
    let updated_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let recent_peers = self.current_recent_peers().await;
    let profile = {
        let storage = self.storage.lock().await;
        // Keep name/bio/avatar from the stored profile, if any.
        let (display_name, bio, avatar_cid) = match storage.get_profile(&self.node_id)? {
            Some(p) => (p.display_name, p.bio, p.avatar_cid),
            None => (String::new(), String::new(), None),
        };
        let anchors = storage.get_peer_anchors(&self.node_id).unwrap_or_default();
        let preferred_peers = storage.list_preferred_peers().unwrap_or_default();
        let profile = PublicProfile {
            node_id: self.node_id,
            display_name,
            bio,
            updated_at,
            anchors,
            recent_peers,
            preferred_peers,
            public_visible: visible,
            avatar_cid,
        };
        storage.store_profile(&profile)?;
        profile
    };
    self.network.push_profile(&profile).await;
    Ok(())
}
/// Resolve display info for any peer, taking circle profiles into account.
pub async fn resolve_display_name(
    &self,
    author: &NodeId,
) -> anyhow::Result<(String, String, Option<[u8; 32]>)> {
    self.storage.lock().await.resolve_display_for_peer(author, &self.node_id)
}
/// Get our own circle profile for a given circle.
pub async fn get_circle_profile(
    &self,
    circle_name: &str,
) -> anyhow::Result<Option<crate::types::CircleProfile>> {
    self.storage.lock().await.get_circle_profile(&self.node_id, circle_name)
}
/// Get the public_visible setting for our own profile.
pub async fn get_public_visible(&self) -> anyhow::Result<bool> {
    let profile = self.storage.lock().await.get_profile(&self.node_id)?;
    // A node with no stored profile defaults to publicly visible.
    Ok(profile.map_or(true, |p| p.public_visible))
}
// ---- Settings ----
/// Get a setting value by key.
pub async fn get_setting(&self, key: &str) -> anyhow::Result<Option<String>> {
    self.storage.lock().await.get_setting(key)
}
/// Set a setting value (upsert).
pub async fn set_setting(&self, key: &str, value: &str) -> anyhow::Result<()> {
    self.storage.lock().await.set_setting(key, value)
}
// ---- Delete / Revocation ----
/// Delete one of our own posts: sign a `DeleteRecord`, purge the post's
/// blobs (DB rows, CDN bookkeeping, and on-disk files), then notify CDN
/// peers and push the delete record to the mesh.
///
/// Fails if the post is unknown or authored by someone else.
pub async fn delete_post(&self, post_id: &PostId) -> anyhow::Result<()> {
    let post = {
        let storage = self.storage.lock().await;
        storage
            .get_post(post_id)?
            .ok_or_else(|| anyhow::anyhow!("post not found"))?
    };
    // Only the author may issue a signed delete.
    if post.author != self.node_id {
        anyhow::bail!("cannot delete: you are not the author");
    }
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let signature = crypto::sign_delete(&self.secret_seed, post_id);
    let record = DeleteRecord {
        post_id: *post_id,
        author: self.node_id,
        timestamp_ms: now,
        signature,
    };
    // Collect blob CIDs + CDN peers before cleanup
    // (the cleanup below removes the rows these lookups read).
    let blob_cdn_info: Vec<([u8; 32], Vec<(NodeId, Vec<String>)>, Option<(NodeId, Vec<String>)>)> = {
        let storage = self.storage.lock().await;
        let cids = storage.get_blobs_for_post(post_id).unwrap_or_default();
        cids.into_iter().map(|cid| {
            let downstream = storage.get_blob_downstream(&cid).unwrap_or_default();
            let upstream = storage.get_blob_upstream(&cid).ok().flatten();
            (cid, downstream, upstream)
        }).collect()
    };
    // Clean up blobs (DB metadata + CDN metadata + filesystem)
    let blob_cids = {
        let storage = self.storage.lock().await;
        let cids = storage.delete_blobs_for_post(post_id)?;
        for cid in &cids {
            let _ = storage.cleanup_cdn_for_blob(cid);
        }
        cids
    };
    // Remove blob files from disk; failures are logged, not fatal.
    for cid in &blob_cids {
        if let Err(e) = self.blob_store.delete(cid) {
            warn!(cid = hex::encode(cid), error = %e, "Failed to delete blob file");
        }
    }
    // Record and apply the delete locally before telling anyone else.
    {
        let storage = self.storage.lock().await;
        storage.store_delete(&record)?;
        storage.apply_delete(&record)?;
    }
    // Send CDN delete notices for each blob
    for (cid, downstream, upstream) in &blob_cdn_info {
        self.network.send_blob_delete_notices(cid, downstream, upstream.as_ref()).await;
    }
    let pushed = self.network.push_delete(&record).await;
    info!(post_id = hex::encode(post_id), pushed, blobs_removed = blob_cids.len(), "Deleted post");
    Ok(())
}
/// Revoke `revoked`'s access to one of our per-recipient encrypted posts.
///
/// `SyncAccessList` re-wraps the existing CEK for the remaining
/// recipients and pushes a visibility update; returns `Ok(None)`.
/// `ReEncrypt` builds a brand-new post ciphertext under a fresh CEK,
/// deletes the old post, pushes the replacement to the remaining
/// recipients, and returns the new post id.
///
/// Fails for public and group-encrypted posts, for posts we did not
/// author, and when `revoked` was not a recipient.
pub async fn revoke_post_access(
    &self,
    post_id: &PostId,
    revoked: &NodeId,
    mode: RevocationMode,
) -> anyhow::Result<Option<PostId>> {
    let (post, visibility) = {
        let storage = self.storage.lock().await;
        storage
            .get_post_with_visibility(post_id)?
            .ok_or_else(|| anyhow::anyhow!("post not found"))?
    };
    // Only the author holds the key material needed to re-wrap.
    if post.author != self.node_id {
        anyhow::bail!("cannot revoke: you are not the author");
    }
    let existing_recipients = match &visibility {
        PostVisibility::Public => anyhow::bail!("cannot revoke access on a public post"),
        PostVisibility::Encrypted { recipients } => recipients,
        PostVisibility::GroupEncrypted { .. } => {
            anyhow::bail!("cannot revoke individual access on a group-encrypted post; remove from circle instead")
        }
    };
    // Recipient list minus the revoked node.
    let new_recipient_ids: Vec<NodeId> = existing_recipients
        .iter()
        .map(|wk| wk.recipient)
        .filter(|r| r != revoked)
        .collect();
    if new_recipient_ids.len() == existing_recipients.len() {
        anyhow::bail!("revoked node was not a recipient of this post");
    }
    match mode {
        RevocationMode::SyncAccessList => {
            // Same CEK, fewer wrapped copies.
            let new_wrapped = crypto::rewrap_visibility(
                &self.secret_seed,
                &self.node_id,
                existing_recipients,
                &new_recipient_ids,
            )?;
            let new_vis = PostVisibility::Encrypted {
                recipients: new_wrapped,
            };
            {
                let storage = self.storage.lock().await;
                storage.update_post_visibility(post_id, &new_vis)?;
            }
            let update = VisibilityUpdate {
                post_id: *post_id,
                author: self.node_id,
                visibility: new_vis,
            };
            let pushed = self.network.push_visibility(&update).await;
            info!(post_id = hex::encode(post_id), pushed, "Revoked access (sync mode)");
            Ok(None)
        }
        RevocationMode::ReEncrypt => {
            // Fresh ciphertext + fresh CEK: the revoked node's old key
            // material becomes useless.
            let (new_content, new_wrapped) = crypto::re_encrypt_post(
                &post.content,
                &self.secret_seed,
                &self.node_id,
                existing_recipients,
                &new_recipient_ids,
            )?;
            let new_vis = PostVisibility::Encrypted {
                recipients: new_wrapped,
            };
            // Timestamp is preserved, so the replacement sorts where the
            // original did; the post id changes with the new content.
            let new_post = Post {
                author: self.node_id,
                content: new_content,
                attachments: post.attachments.clone(),
                timestamp_ms: post.timestamp_ms,
            };
            let new_post_id = compute_post_id(&new_post);
            {
                let storage = self.storage.lock().await;
                storage.store_post_with_visibility(&new_post_id, &new_post, &new_vis)?;
            }
            // delete_post already pushes the DeleteRecord
            self.delete_post(post_id).await?;
            // Push replacement post directly to remaining recipients
            self.network.push_post_to_recipients(&new_post_id, &new_post, &new_vis).await;
            info!(
                old_id = hex::encode(post_id),
                new_id = hex::encode(new_post_id),
                "Re-encrypted post (revoke)"
            );
            Ok(Some(new_post_id))
        }
    }
}
/// Revoke one node's access to every post we authored with this circle
/// as its intent. Returns the number of posts successfully revoked;
/// per-post failures are logged and skipped.
pub async fn revoke_circle_access(
    &self,
    circle_name: &str,
    revoked: &NodeId,
    mode: RevocationMode,
) -> anyhow::Result<usize> {
    let posts = {
        let storage = self.storage.lock().await;
        storage.find_posts_by_circle_intent(circle_name, &self.node_id)?
    };
    let mut count = 0;
    for (post_id, _post, vis) in &posts {
        // Only per-recipient encrypted posts carry individual grants.
        let PostVisibility::Encrypted { recipients } = vis else {
            continue;
        };
        if !recipients.iter().any(|wk| &wk.recipient == revoked) {
            continue;
        }
        match self.revoke_post_access(post_id, revoked, mode).await {
            Ok(_) => count += 1,
            Err(e) => {
                warn!(
                    post_id = hex::encode(post_id),
                    error = %e,
                    "Failed to revoke post access"
                );
            }
        }
    }
    info!(circle = circle_name, count, "Revoked circle access");
    Ok(count)
}
/// Redundancy overview for our own posts (1-hour staleness window).
/// Tuple field meanings are defined by the storage layer — TODO confirm
/// against `get_redundancy_summary` there before relying on them.
pub async fn get_redundancy_summary(&self) -> anyhow::Result<(usize, usize, usize, usize)> {
    self.storage.lock().await.get_redundancy_summary(&self.node_id, 3_600_000)
}
// ---- Networking ----
/// Our iroh endpoint address, as reported by the network layer.
pub fn endpoint_addr(&self) -> iroh::EndpointAddr {
    self.network.endpoint_addr()
}
/// Connect to a peer by node ID using address resolution:
/// 0. Already connected or has session → done
/// 1. Social route cache → try cached address
/// 2. Peers table → connect directly
/// 3. N2/N3 lookup → ask tagged reporter for address
/// 4. Worm lookup → fan-out search beyond N3
/// 5. Relay introduction → coordinate hole punch via relay peer
/// 6. Session relay fallback → pipe through intermediary
pub async fn connect_by_node_id(&self, peer_id: NodeId) -> anyhow::Result<()> {
if self.network.is_connected(&peer_id).await {
return Ok(());
}
// Check if we already have a session connection
if self.network.conn_handle().has_session(&peer_id).await {
return Ok(());
}
// Check if this peer is known to be behind NAT / unreachable directly
let skip_direct = self.network.conn_handle().is_likely_unreachable(&peer_id).await;
// Step 0: Try social route cache (skipped for known-unreachable peers)
if !skip_direct {
let storage = self.storage.lock().await;
if let Some(route) = storage.get_social_route(&peer_id)? {
// Try cached addresses directly
for addr in &route.addresses {
let endpoint_id = match iroh::EndpointId::from_bytes(&peer_id) {
Ok(eid) => eid,
Err(_) => continue,
};
let ep_addr = iroh::EndpointAddr::from(endpoint_id).with_ip_addr(*addr);
drop(storage);
if self.network.connect_to_peer(peer_id, ep_addr).await.is_ok() {
info!(peer = hex::encode(peer_id), "Connected via social route cache");
return Ok(());
}
// Re-acquire lock for next iteration
break; // Only try first address from route directly
}
// Try peer_addresses: connect to their known peers and ask for target
for pa in &route.peer_addresses {
if let Ok(pa_nid) = crate::parse_node_id_hex(&pa.n) {
if self.network.is_connected(&pa_nid).await {
// Already connected to this peer — ask them
let resolved = self.network.conn_handle().resolve_address(&peer_id).await.unwrap_or(None);
if let Some(addr_str) = resolved {
if let Ok((_nid, ep_addr)) = crate::parse_connect_string(
&format!("{}@{}", hex::encode(peer_id), addr_str)
) {
if self.network.connect_to_peer(peer_id, ep_addr).await.is_ok() {
info!(peer = hex::encode(peer_id), via = &pa.n[..12], "Connected via social route peer referral");
return Ok(());
}
}
}
} else if let Some(pa_addr_str) = pa.a.first() {
// Try connecting to the peer first, then ask
if let Ok(pa_sock) = pa_addr_str.parse::<std::net::SocketAddr>() {
let pa_eid = match iroh::EndpointId::from_bytes(&pa_nid) {
Ok(eid) => eid,
Err(_) => continue,
};
let pa_ep = iroh::EndpointAddr::from(pa_eid).with_ip_addr(pa_sock);
if self.network.connect_to_peer(pa_nid, pa_ep).await.is_ok() {
let resolved = self.network.conn_handle().resolve_address(&peer_id).await.unwrap_or(None);
if let Some(addr_str) = resolved {
if let Ok((_nid, ep_addr)) = crate::parse_connect_string(
&format!("{}@{}", hex::encode(peer_id), addr_str)
) {
if self.network.connect_to_peer(peer_id, ep_addr).await.is_ok() {
info!(peer = hex::encode(peer_id), via = &pa.n[..12], "Connected via social route peer referral (new conn)");
return Ok(());
}
}
}
}
}
}
}
}
}
}
// Steps 1-4: Direct connection attempts (skipped for known-unreachable peers)
if !skip_direct {
// Step 1: Try direct address from peers table
if let Some(addr) = self.network.addr_from_storage(&peer_id).await {
if self.network.connect_to_peer(peer_id, addr).await.is_ok() {
return Ok(());
}
}
// Step 2-3: Try address resolution via N2/N3
let resolved = self.network.conn_handle().resolve_address(&peer_id).await.unwrap_or(None);
if let Some(addr_str) = resolved {
if let Ok(addr) = crate::parse_connect_string(&format!("{}@{}", hex::encode(peer_id), addr_str)) {
if self.network.connect_to_peer(peer_id, addr.1).await.is_ok() {
return Ok(());
}
}
}
// Step 4: Try worm lookup (fan-out search beyond N3)
info!(peer = hex::encode(peer_id), "Trying worm lookup...");
if let Ok(Some(wr)) = self.network.worm_lookup(&peer_id).await {
if wr.node_id == peer_id {
if let Some(addr_str) = wr.addresses.first() {
if let Ok(addr) = crate::parse_connect_string(&format!("{}@{}", hex::encode(peer_id), addr_str)) {
if self.network.connect_to_peer(peer_id, addr.1).await.is_ok() {
return Ok(());
}
}
}
} else {
info!(
target = hex::encode(peer_id),
found_via = hex::encode(wr.node_id),
"Worm found target via recent peer"
);
if let Some(addr_str) = wr.addresses.first() {
if let Ok(needle_addr) = crate::parse_connect_string(&format!("{}@{}", hex::encode(wr.node_id), addr_str)) {
if self.network.connect_to_peer(wr.node_id, needle_addr.1).await.is_ok() {
let resolved = self.network.conn_handle().resolve_address(&peer_id).await.unwrap_or(None);
if let Some(target_addr_str) = resolved {
if let Ok(target_addr) = crate::parse_connect_string(&format!("{}@{}", hex::encode(peer_id), target_addr_str)) {
if self.network.connect_to_peer(peer_id, target_addr.1).await.is_ok() {
return Ok(());
}
}
}
}
}
}
}
}
// All direct attempts failed — mark peer as likely unreachable
self.network.conn_handle().mark_unreachable(&peer_id);
}
// Step 6: Relay introduction — find relay peer(s) and request introduction
{
let on_cooldown = {
let storage = self.storage.lock().await;
storage.is_relay_cooldown(&peer_id, 300_000).unwrap_or(false)
};
if !on_cooldown {
let relay_candidates = self.network.conn_handle().find_relays_for(&peer_id).await;
let mut had_capacity_reject = false;
let mut last_intro_id: Option<crate::connection::IntroId> = None;
let mut last_relay_peer: Option<NodeId> = None;
let mut last_relay_available = false;
for (relay_peer, ttl) in &relay_candidates {
info!(
target = hex::encode(peer_id),
relay = hex::encode(relay_peer),
ttl,
"Attempting relay introduction"
);
let intro_result = tokio::time::timeout(
std::time::Duration::from_secs(15),
self.network.send_relay_introduce_standalone(relay_peer, &peer_id, *ttl),
).await;
match intro_result {
Ok(Ok(result)) if result.accepted => {
info!(
target = hex::encode(peer_id),
addrs = ?result.target_addresses,
relay_available = result.relay_available,
"Relay introduction accepted, attempting hole punch"
);
// Save for potential session relay fallback
last_intro_id = Some(result.intro_id);
last_relay_peer = Some(*relay_peer);
last_relay_available = result.relay_available;
// Try direct connection to target's addresses (hole punch with scanning)
let our_profile = self.network.conn_handle().our_nat_profile().await;
let peer_profile = {
let s = self.storage.lock().await;
s.get_peer_nat_profile(&peer_id)
};
if let Some(conn) = crate::connection::hole_punch_with_scanning(
self.network.endpoint(),
&peer_id,
&result.target_addresses,
our_profile,
peer_profile,
).await {
self.network.conn_handle().add_session(peer_id, conn, SessionReachMethod::HolePunch, None).await;
self.network.conn_handle().mark_reachable(&peer_id);
info!(peer = hex::encode(peer_id), "Connected via hole punch");
return Ok(());
}
// Intro accepted but hole punch failed — try session relay below
break;
}
Ok(Ok(result)) => {
let reason = result.reject_reason.as_deref().unwrap_or("unknown");
if reason.contains("capacity") {
debug!(
relay = hex::encode(relay_peer),
"Relay at capacity, trying next candidate"
);
had_capacity_reject = true;
continue; // Try next relay candidate
}
debug!(
target = hex::encode(peer_id),
reason,
"Relay introduction rejected"
);
// Target explicitly rejected — don't try more relays
break;
}
Ok(Err(e)) => {
debug!(error = %e, "Relay introduction failed, trying next candidate");
continue; // Network error — try next relay
}
Err(_) => {
debug!("Relay introduction timed out, trying next candidate");
continue; // Timeout — try next relay
}
}
}
// Step 7: Session relay fallback — if intro was accepted but hole punch failed
if let (Some(intro_id), Some(relay_peer)) = (last_intro_id, last_relay_peer) {
if last_relay_available {
info!(
target = hex::encode(peer_id),
relay = hex::encode(relay_peer),
"Hole punch failed, attempting session relay"
);
match self.attempt_session_relay(&relay_peer, &peer_id, &intro_id).await {
Ok(()) => {
info!(peer = hex::encode(peer_id), "Connected via session relay");
return Ok(());
}
Err(e) => {
debug!(error = %e, "Session relay failed");
}
}
}
}
// Record cooldown on failure (skip if all rejections were capacity-related)
if !relay_candidates.is_empty() && !had_capacity_reject {
let storage = self.storage.lock().await;
let _ = storage.record_relay_miss(&peer_id);
}
}
}
anyhow::bail!(
"cannot resolve address for peer {} (tried social routes, peers table, N2/N3, worm lookup, and relay introduction)",
hex::encode(peer_id)
)
}
/// Attempt to establish a session relay through an intermediary.
///
/// Opens a bidirectional stream on the existing connection to `relay_peer`,
/// sends a `SessionRelay` message carrying the `intro_id` from the earlier
/// introduction plus the `target` node id, then registers the relay's own
/// connection as a `Relayed` session toward `target`.
///
/// # Errors
/// Fails if the relay peer is no longer connected, if the stream cannot be
/// opened, or if the payload write fails.
async fn attempt_session_relay(
    &self,
    relay_peer: &NodeId,
    target: &NodeId,
    intro_id: &crate::connection::IntroId,
) -> anyhow::Result<()> {
    use crate::protocol::{
        write_typed_message, MessageType, SessionRelayPayload,
    };
    let relay_conn = self.network.conn_handle().get_connection(relay_peer).await
        .ok_or_else(|| anyhow::anyhow!("relay peer disconnected"))?;
    // Receive half unused here — the relay forwards traffic over the same QUIC
    // connection, which is registered as the session below.
    let (mut send, _recv) = relay_conn.open_bi().await?;
    let payload = SessionRelayPayload {
        intro_id: *intro_id,
        target: *target,
    };
    write_typed_message(&mut send, MessageType::SessionRelay, &payload).await?;
    // The relay's connection now doubles as the session to the target.
    self.network.conn_handle().add_session(*target, relay_conn, SessionReachMethod::Relayed, None).await;
    Ok(())
}
/// Worm lookup: fan-out search for a peer beyond the 3-hop discovery map.
///
/// Thin delegation to the network layer; result semantics (including what
/// `None` means) are defined by `network.worm_lookup`.
pub async fn worm_lookup(&self, target: &NodeId) -> anyhow::Result<Option<WormResult>> {
    self.network.worm_lookup(target).await
}
/// Connect to a peer by node id, pull posts from it, and fetch engagement
/// headers. Newly received posts trigger a blob prefetch pass.
pub async fn sync_with(&self, peer_id: NodeId) -> anyhow::Result<()> {
    self.connect_by_node_id(peer_id).await?;
    let pull_stats = self.network.conn_handle().pull_from_peer(&peer_id).await?;
    // Engagement fetch is best-effort: a failure simply reports 0 headers.
    let engagement_count = self
        .network
        .conn_handle()
        .fetch_engagement_from_peer(&peer_id)
        .await
        .unwrap_or(0);
    info!(
        peer = hex::encode(peer_id),
        posts = pull_stats.posts_received,
        engagement_headers = engagement_count,
        "Sync complete"
    );
    // Only prefetch when the pull actually delivered something new.
    if pull_stats.posts_received > 0 {
        self.prefetch_blobs_from_peer(&peer_id).await;
    }
    Ok(())
}
/// Connect to a peer at a fully specified endpoint address and pull once.
pub async fn sync_with_addr(&self, addr: iroh::EndpointAddr) -> anyhow::Result<()> {
    // The node id is embedded in the endpoint address itself.
    let peer_id = *addr.id.as_bytes();
    self.network.connect_to_peer(peer_id, addr).await?;
    let pull_stats = self.network.conn_handle().pull_from_peer(&peer_id).await?;
    info!(
        peer = hex::encode(peer_id),
        posts = pull_stats.posts_received,
        "Sync complete"
    );
    Ok(())
}
/// Pull posts from every connected peer and log the aggregate result.
pub async fn sync_all(&self) -> anyhow::Result<()> {
    let pull = self.network.pull_from_all().await?;
    info!(
        "Pull complete: {} posts from {} peers",
        pull.posts_received, pull.peers_pulled
    );
    Ok(())
}
/// Record a peer id in persistent storage.
pub async fn add_peer(&self, peer_id: NodeId) -> anyhow::Result<()> {
    // Guard is a statement temporary: released as soon as the call returns.
    self.storage.lock().await.add_peer(&peer_id)?;
    Ok(())
}
/// List all stored peer ids.
pub async fn list_peers(&self) -> anyhow::Result<Vec<NodeId>> {
    self.storage.lock().await.list_peers()
}
/// List full peer records (ids plus stored addresses).
pub async fn list_peer_records(&self) -> anyhow::Result<Vec<PeerRecord>> {
    self.storage.lock().await.list_peer_records()
}
/// Bootstrap anchors configured at startup, as (node id, endpoint address) pairs.
pub fn list_bootstrap_anchors(&self) -> &[(NodeId, iroh::EndpointAddr)] {
    self.bootstrap_anchors.as_slice()
}
/// Get connection info for display: (node_id, slot_kind, connected_at)
///
/// Delegates to the network layer's connection table snapshot.
pub async fn list_connections(&self) -> Vec<(NodeId, PeerSlotKind, u64)> {
    self.network.connection_info().await
}
/// Snapshot simple node statistics from storage: post, peer, and follow counts.
pub async fn stats(&self) -> anyhow::Result<NodeStats> {
    let storage = self.storage.lock().await;
    let post_count = storage.post_count()?;
    let peer_count = storage.list_peers()?.len();
    let follow_count = storage.list_follows()?.len();
    Ok(NodeStats { post_count, peer_count, follow_count })
}
/// Start the network accept loop as a background task.
/// The returned handle resolves with the loop's terminal result.
pub fn start_accept_loop(&self) -> tokio::task::JoinHandle<anyhow::Result<()>> {
    let net = Arc::clone(&self.network);
    tokio::spawn(async move { net.run_accept_loop().await })
}
/// Start pull cycle: every `interval_secs`, pull from connected peers and
/// prefetch blobs for any posts that arrived.
pub fn start_pull_cycle(self: &Arc<Self>, interval_secs: u64) -> tokio::task::JoinHandle<()> {
    let node = Arc::clone(self);
    tokio::spawn(async move {
        let mut ticker =
            tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        loop {
            ticker.tick().await;
            match node.network.pull_from_all().await {
                Err(e) => {
                    tracing::debug!(error = %e, "Pull cycle failed");
                }
                // Quiet cycles (nothing received) produce no log and no prefetch.
                Ok(stats) if stats.posts_received > 0 => {
                    tracing::debug!(
                        posts = stats.posts_received,
                        peers = stats.peers_pulled,
                        "Pull cycle complete"
                    );
                    // Ask every currently connected peer for the blobs backing
                    // the posts we just received.
                    for peer_id in node.network.conn_handle().connected_peers().await {
                        node.prefetch_blobs_from_peer(&peer_id).await;
                    }
                }
                Ok(_) => {}
            }
        }
    })
}
/// Start diff cycle: every `interval_secs`, broadcast N1/N2 changes to
/// connected peers. Roughly every 4 hours a full state re-broadcast is sent
/// instead, so peers that missed incremental diffs re-converge.
pub fn start_diff_cycle(&self, interval_secs: u64) -> tokio::task::JoinHandle<()> {
    let network = Arc::clone(&self.network);
    // Number of ticks between full re-broadcasts (~4h). Both `.max(1)` guards
    // matter: `interval_secs == 0` would divide by zero here, and any interval
    // longer than 4h makes the quotient 0, after which `tick_count % 0` below
    // would panic. With max(1), an over-long interval degrades to "full sync
    // every tick", which is the safe interpretation.
    let full_sync_interval = ((4 * 60 * 60) / interval_secs.max(1)).max(1);
    tokio::spawn(async move {
        let mut interval =
            tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        let mut tick_count: u64 = 0;
        loop {
            interval.tick().await;
            tick_count += 1;
            if tick_count % full_sync_interval == 0 {
                // Full state re-broadcast every 4 hours to catch missed diffs
                match network.broadcast_full_state().await {
                    Ok(count) => {
                        if count > 0 {
                            tracing::info!(count, "Full N1/N2 state broadcast (4h cycle)");
                        }
                    }
                    Err(e) => {
                        tracing::debug!(error = %e, "Full state broadcast failed");
                    }
                }
            } else {
                // Incremental diff on ordinary ticks.
                match network.broadcast_diff().await {
                    Ok(count) => {
                        if count > 0 {
                            tracing::debug!(count, "Broadcast routing diff");
                        }
                    }
                    Err(e) => {
                        tracing::debug!(error = %e, "Routing diff broadcast failed");
                    }
                }
            }
        }
    })
}
/// Start rebalance cycle: every `interval_secs`, rebalance connection slots,
/// stamping the attempt time into `last_rebalance_ms` first.
pub fn start_rebalance_cycle(&self, interval_secs: u64) -> tokio::task::JoinHandle<()> {
    let network = Arc::clone(&self.network);
    let last_run = Arc::clone(&self.last_rebalance_ms);
    tokio::spawn(async move {
        let mut ticker =
            tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        loop {
            ticker.tick().await;
            // Record the attempt time (ms since epoch) before rebalancing, so
            // observers can see the cycle is alive even when rebalance errors.
            let now_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            last_run.store(now_ms, AtomicOrdering::Relaxed);
            if let Err(e) = network.rebalance().await {
                tracing::debug!(error = %e, "Rebalance failed");
            }
        }
    })
}
/// Start the reactive growth loop: wakes on signal, sequentially fills local
/// slots with the most diverse N2 candidates. Each connection updates N2/N3
/// knowledge before picking the next candidate.
pub fn start_growth_loop(&self) -> tokio::task::JoinHandle<()> {
    let network = Arc::clone(&self.network);
    // Capacity-1 channel: coalesces bursts of wake signals into one.
    let (signal_tx, signal_rx) = tokio::sync::mpsc::channel(1);
    tokio::spawn(async move {
        network.set_growth_tx(signal_tx.clone()).await;
        // Kick once immediately — bootstrap may already have populated N2
        // before this loop was started.
        let _ = signal_tx.try_send(());
        network.run_growth_loop(signal_rx).await;
    })
}
/// Start recovery loop: triggered when mesh drops below 2 connections.
/// Immediately reconnects to anchors and requests referrals.
///
/// The loop sleeps until the network layer fires the recovery channel, then:
/// 1. debounces briefly and drains queued signals,
/// 2. gathers anchors (known_anchors table, falling back to anchor peers),
/// 3. connects + registers with each anchor,
/// 4. asks each anchor for referrals and connects to them (direct first,
///    hole-punch introduction as fallback).
pub fn start_recovery_loop(&self) -> tokio::task::JoinHandle<()> {
    let network = Arc::clone(&self.network);
    let storage = Arc::clone(&self.storage);
    let node_id = self.node_id;
    let alog = Arc::clone(&self.activity_log);
    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
    tokio::spawn(async move {
        // Best-effort activity logging: try_lock so recovery never blocks on the log.
        let log_evt = |level: ActivityLevel, cat: ActivityCategory, msg: String, peer: Option<NodeId>| {
            if let Ok(mut log) = alog.try_lock() { log.log(level, cat, msg, peer); }
        };
        network.set_recovery_tx(tx).await;
        // Loop ends when the network side drops its sender.
        while rx.recv().await.is_some() {
            tracing::info!("Recovery triggered: reconnecting to anchors");
            log_evt(ActivityLevel::Warn, ActivityCategory::Recovery, "Recovery triggered: mesh empty".into(), None);
            // Debounce: wait briefly for more disconnects to settle
            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
            // Drain any queued signals
            while rx.try_recv().is_ok() {}
            // Gather anchors: known_anchors table, then anchor peers fallback
            let anchors: Vec<(crate::types::NodeId, Vec<std::net::SocketAddr>)> = {
                let s = storage.lock().await;
                let known = s.list_known_anchors().unwrap_or_default();
                if !known.is_empty() {
                    known
                } else {
                    s.list_anchor_peers().unwrap_or_default()
                        .into_iter()
                        .map(|r| (r.node_id, r.addresses))
                        .collect()
                }
            };
            for (anchor_nid, anchor_addrs) in &anchors {
                // Never try to recover through ourselves.
                if *anchor_nid == node_id { continue; }
                // Connect to anchor (mesh or session fallback)
                if !network.is_peer_connected_or_session(anchor_nid).await {
                    let endpoint_id = match iroh::EndpointId::from_bytes(anchor_nid) {
                        Ok(eid) => eid,
                        Err(_) => continue,
                    };
                    let mut addr = iroh::EndpointAddr::from(endpoint_id);
                    for sa in anchor_addrs {
                        addr = addr.with_ip_addr(*sa);
                    }
                    match network.connect_to_anchor(*anchor_nid, addr).await {
                        Ok(()) => {
                            log_evt(ActivityLevel::Info, ActivityCategory::Recovery, "Connected to anchor".into(), Some(*anchor_nid));
                        }
                        Err(e) => {
                            tracing::debug!(error = %e, "Recovery: anchor connect failed");
                            log_evt(ActivityLevel::Warn, ActivityCategory::Recovery, format!("Anchor connect failed: {}", e), Some(*anchor_nid));
                            continue;
                        }
                    }
                }
                // Register with anchor
                let _ = network.send_anchor_register(anchor_nid).await;
                // Request referrals
                match network.request_anchor_referrals(anchor_nid).await {
                    Ok(referrals) => {
                        for referral in referrals {
                            if referral.node_id == node_id { continue; }
                            // Only the first advertised address is attempted directly;
                            // hole punch handles the rest.
                            if let Some(addr_str) = referral.addresses.first() {
                                let connect_str = format!(
                                    "{}@{}", hex::encode(referral.node_id), addr_str,
                                );
                                if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) {
                                    match network.connect_to_peer(rid, raddr).await {
                                        Ok(()) => {
                                            tracing::info!(peer = hex::encode(rid), "Recovery: connected to referred peer");
                                            log_evt(ActivityLevel::Info, ActivityCategory::Recovery, "Connected to referred peer".into(), Some(rid));
                                        }
                                        Err(_) => {
                                            // Direct connect failed — fall back to a
                                            // hole-punch introduction via this anchor.
                                            match network.connect_via_introduction(rid, *anchor_nid).await {
                                                Ok(()) => {
                                                    tracing::info!(peer = hex::encode(rid), "Recovery: connected via hole punch");
                                                    log_evt(ActivityLevel::Info, ActivityCategory::Recovery, "Connected via hole punch".into(), Some(rid));
                                                }
                                                Err(e) => {
                                                    tracing::debug!(error = %e, peer = hex::encode(rid), "Recovery: hole punch failed");
                                                    log_evt(ActivityLevel::Warn, ActivityCategory::Recovery, format!("Hole punch failed: {}", e), Some(rid));
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    Err(e) => tracing::debug!(error = %e, "Recovery: referral request failed"),
                }
            }
            let conn_count = network.connection_count().await;
            tracing::info!(connections = conn_count, "Recovery complete");
            log_evt(ActivityLevel::Info, ActivityCategory::Recovery, format!("Recovery complete, {} connections", conn_count), None);
        }
    })
}
/// Start social checkin cycle: every `interval_secs`, refresh stale social
/// routes. Uses ephemeral connections if not persistently connected.
///
/// A route is "stale" when it has not been refreshed within one interval.
pub fn start_social_checkin_cycle(&self, interval_secs: u64) -> tokio::task::JoinHandle<()> {
    let network = Arc::clone(&self.network);
    let storage = Arc::clone(&self.storage);
    tokio::spawn(async move {
        let mut interval =
            tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        loop {
            interval.tick().await;
            // Staleness threshold in ms; saturating_mul guards the (unlikely)
            // overflow for absurd intervals. `interval_secs` is already u64 —
            // the previous `as u64` cast was redundant.
            let stale = {
                let s = storage.lock().await;
                s.list_stale_social_routes(interval_secs.saturating_mul(1000)).unwrap_or_default()
            };
            // Our own addresses don't change per route — compute once per tick
            // instead of once per stale route.
            let our_addrs: Vec<String> = network.endpoint_addr().ip_addrs()
                .map(|s| s.to_string()).collect();
            for route in stale {
                let result = network.send_social_checkin(
                    &route.node_id, &our_addrs, &[],
                ).await;
                match result {
                    Ok(reply) => {
                        // Persist the peer's fresh addresses and its own peer list.
                        let s = storage.lock().await;
                        let addrs: Vec<std::net::SocketAddr> = reply.addresses.iter()
                            .filter_map(|a| a.parse().ok()).collect();
                        let _ = s.touch_social_route_connect(
                            &reply.node_id, &addrs, ReachMethod::Direct,
                        );
                        let _ = s.update_social_route_peer_addrs(
                            &reply.node_id, &reply.peer_addresses,
                        );
                    }
                    Err(e) => {
                        tracing::debug!(
                            peer = hex::encode(route.node_id),
                            error = %e,
                            "Social checkin failed"
                        );
                    }
                }
            }
        }
    })
}
/// Register with every connected anchor peer. Returns how many accepted.
pub async fn register_with_anchors(&self) -> usize {
    let mut registered = 0;
    for (nid, _, _) in &self.network.connection_info().await {
        // Non-anchor peers are skipped outright.
        if !self.network.is_anchor_peer(nid).await {
            continue;
        }
        match self.network.send_anchor_register(nid).await {
            Ok(()) => {
                registered += 1;
                info!(anchor = hex::encode(nid), "Registered with anchor");
            }
            Err(e) => debug!(error = %e, anchor = hex::encode(nid), "Anchor register failed"),
        }
    }
    registered
}
/// Start anchor register cycle: periodically re-register with anchors and request referrals
/// when connection count is low.
///
/// Each tick:
/// 1. stamps `last_anchor_register_ms`,
/// 2. re-registers with anchors reachable via mesh, then via session
///    (deduplicated through `registered_anchors`),
/// 3. if fewer than 10 connections, walks the known_anchors table to connect
///    and request referrals (direct connect first, hole-punch fallback),
/// 4. runs the anchor self-verification probe when due.
pub fn start_anchor_register_cycle(&self, interval_secs: u64) -> tokio::task::JoinHandle<()> {
    let network = Arc::clone(&self.network);
    let storage = Arc::clone(&self.storage);
    let node_id = self.node_id;
    let alog = Arc::clone(&self.activity_log);
    let timer = Arc::clone(&self.last_anchor_register_ms);
    tokio::spawn(async move {
        // Best-effort activity logging: try_lock so the cycle never blocks on the log.
        let log_evt = |level: ActivityLevel, cat: ActivityCategory, msg: String, peer: Option<NodeId>| {
            if let Ok(mut log) = alog.try_lock() { log.log(level, cat, msg, peer); }
        };
        let mut interval =
            tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        loop {
            interval.tick().await;
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            timer.store(now, AtomicOrdering::Relaxed);
            // Re-register with connected anchors (mesh + session)
            let conns = network.connection_info().await;
            let session_peers = network.session_peer_ids().await;
            let mut registered_anchors = std::collections::HashSet::new();
            // Mesh-connected anchors
            for (nid, _, _) in &conns {
                if network.is_anchor_peer(nid).await {
                    match network.send_anchor_register(nid).await {
                        Ok(()) => {
                            log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Re-registered with anchor".into(), Some(*nid));
                            registered_anchors.insert(*nid);
                        }
                        Err(e) => {
                            tracing::debug!(error = %e, "Anchor re-register failed");
                            log_evt(ActivityLevel::Warn, ActivityCategory::Anchor, format!("Re-register failed: {}", e), Some(*nid));
                        }
                    }
                }
            }
            // Session-connected anchors (e.g. anchor with full mesh)
            for nid in &session_peers {
                // Skip anchors already registered via mesh above.
                if registered_anchors.contains(nid) { continue; }
                if network.is_anchor_peer(nid).await {
                    match network.send_anchor_register(nid).await {
                        Ok(()) => {
                            log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Re-registered with anchor (session)".into(), Some(*nid));
                        }
                        Err(e) => {
                            tracing::debug!(error = %e, "Anchor session re-register failed");
                        }
                    }
                }
            }
            // If few connections, try requesting referrals from known anchors
            let conn_count = network.connection_count().await;
            if conn_count < 10 {
                log_evt(ActivityLevel::Info, ActivityCategory::Anchor, format!("Low connections ({}), requesting referrals", conn_count), None);
                let known = {
                    let s = storage.lock().await;
                    s.list_known_anchors().unwrap_or_default()
                };
                for (anchor_nid, anchor_addrs) in known {
                    // Never ask ourselves for referrals.
                    if anchor_nid == node_id {
                        continue;
                    }
                    // Connect if not already connected (mesh or session)
                    if !network.is_peer_connected_or_session(&anchor_nid).await {
                        let endpoint_id = match iroh::EndpointId::from_bytes(&anchor_nid) {
                            Ok(eid) => eid,
                            Err(_) => continue,
                        };
                        let mut addr = iroh::EndpointAddr::from(endpoint_id);
                        for sa in &anchor_addrs {
                            addr = addr.with_ip_addr(*sa);
                        }
                        if let Err(e) = network.connect_to_anchor(anchor_nid, addr).await {
                            tracing::debug!(error = %e, "Anchor cycle: connect failed");
                            continue;
                        }
                    }
                    match network.request_anchor_referrals(&anchor_nid).await {
                        Ok(referrals) => {
                            for referral in referrals {
                                if referral.node_id == node_id {
                                    continue;
                                }
                                // Only the first advertised address is tried directly;
                                // the introduction path covers the rest.
                                if let Some(addr_str) = referral.addresses.first() {
                                    let connect_str = format!(
                                        "{}@{}",
                                        hex::encode(referral.node_id),
                                        addr_str,
                                    );
                                    if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) {
                                        match network.connect_to_peer(rid, raddr).await {
                                            Ok(()) => {
                                                tracing::info!(peer = hex::encode(rid), "Anchor cycle: connected to referred peer");
                                                log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Connected to referred peer".into(), Some(rid));
                                            }
                                            Err(_) => {
                                                // Direct connect failed — hole-punch via this anchor.
                                                match network.connect_via_introduction(rid, anchor_nid).await {
                                                    Ok(()) => {
                                                        tracing::info!(peer = hex::encode(rid), "Anchor cycle: connected via hole punch");
                                                        log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Connected via hole punch".into(), Some(rid));
                                                    }
                                                    Err(e) => {
                                                        tracing::debug!(error = %e, peer = hex::encode(rid), "Anchor cycle: hole punch failed");
                                                        log_evt(ActivityLevel::Warn, ActivityCategory::Anchor, format!("Hole punch failed: {}", e), Some(rid));
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        Err(e) => tracing::debug!(error = %e, "Anchor cycle: referral request failed"),
                    }
                }
            }
            // Anchor self-verification probe
            {
                let probe_due = network.conn_handle().probe_due().await;
                if probe_due {
                    log_evt(ActivityLevel::Info, ActivityCategory::Anchor, "Initiating anchor self-verification probe".into(), None);
                    match network.conn_handle().initiate_anchor_probe().await {
                        Ok(true) => {}, // success already logged inside
                        Ok(false) => {}, // failure already logged inside
                        Err(e) => {
                            tracing::debug!(error = %e, "Anchor probe error");
                        }
                    }
                }
            }
        }
    })
}
/// Start bootstrap connectivity check: 24 hours after startup, verify the bootstrap
/// anchor is within our network knowledge (N1/N2/N3). If not, we may be in an isolated
/// segment — reconnect to bootstrap and request referrals to bridge back.
///
/// Runs once per 24h after the initial 24h delay. A successful reconnect also
/// pins the bootstrap into our advertised N1 for 24h so neighbors learn of it.
pub fn start_bootstrap_connectivity_check(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
    let node = Arc::clone(self);
    tokio::spawn(async move {
        // Wait 24 hours before first check
        tokio::time::sleep(std::time::Duration::from_secs(24 * 60 * 60)).await;
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 60 * 60));
        loop {
            interval.tick().await;
            // Parse bootstrap anchor NodeId
            let bootstrap_nid = match crate::parse_connect_string(DEFAULT_ANCHOR) {
                Ok((nid, _)) => nid,
                Err(_) => continue,
            };
            // Skip if we ARE the bootstrap
            if bootstrap_nid == node.node_id {
                continue;
            }
            // Check if bootstrap is in N1 (mesh), N2, or N3
            // (checked in widening order; N2/N3 are consulted only if not
            // directly connected).
            let is_reachable = {
                let connected = node.network.is_connected(&bootstrap_nid).await;
                if connected {
                    true
                } else {
                    let storage = node.storage.lock().await;
                    let in_n2 = storage.find_in_n2(&bootstrap_nid).unwrap_or_default();
                    if !in_n2.is_empty() {
                        true
                    } else {
                        let in_n3 = storage.find_in_n3(&bootstrap_nid).unwrap_or_default();
                        !in_n3.is_empty()
                    }
                }
            };
            if is_reachable {
                tracing::debug!("Bootstrap connectivity check: bootstrap in reach, network OK");
                continue;
            }
            // Bootstrap not in N1/N2/N3 — we may be isolated
            tracing::info!("Bootstrap connectivity check: bootstrap not in reach, reconnecting");
            // Connect to bootstrap and request referrals
            if let Err(e) = node.connect_by_node_id(bootstrap_nid).await {
                tracing::warn!(error = %e, "Bootstrap connectivity: failed to connect");
                continue;
            }
            // Report bootstrap in our N1 for 24 hours so peers learn about it
            node.network.conn_handle().add_sticky_n1(&bootstrap_nid, 24 * 60 * 60 * 1000);
            match node.network.request_anchor_referrals(&bootstrap_nid).await {
                Ok(referrals) => {
                    tracing::info!(count = referrals.len(), "Bootstrap connectivity: got referrals");
                    for referral in referrals {
                        if referral.node_id == node.node_id { continue; }
                        // Best-effort: referral connect failures are ignored here.
                        if let Some(addr_str) = referral.addresses.first() {
                            let connect_str = format!("{}@{}", hex::encode(referral.node_id), addr_str);
                            if let Ok((rid, raddr)) = crate::parse_connect_string(&connect_str) {
                                let _ = node.network.connect_to_peer(rid, raddr).await;
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!(error = %e, "Bootstrap connectivity: referral request failed");
                }
            }
        }
    })
}
/// Start CDN manifest refresh cycle: periodically ask upstream for newer manifests.
/// Manifests older than `max_age_ms` are refreshed from their upstream source.
///
/// Each tick: compute the staleness cutoff, find stale manifests, and for each
/// one ask its upstream node for a newer signed manifest. Verified updates are
/// stored and relayed to our own downstream peers.
pub fn start_manifest_refresh_cycle(&self, interval_secs: u64, max_age_ms: u64) -> tokio::task::JoinHandle<()> {
    let network = Arc::clone(&self.network);
    let storage = Arc::clone(&self.storage);
    tokio::spawn(async move {
        let mut interval =
            tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        loop {
            interval.tick().await;
            // saturating_sub: a huge max_age_ms must clamp to 0, not underflow
            // (plain subtraction would panic in debug builds / wrap in release).
            let cutoff = (std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64)
                .saturating_sub(max_age_ms);
            let stale = {
                let s = storage.lock().await;
                s.get_stale_manifests(cutoff).unwrap_or_default()
            };
            for (cid, upstream_nid, _upstream_addrs) in &stale {
                // Get current updated_at for this manifest so upstream can
                // answer "no newer version" cheaply.
                let current_updated_at = {
                    let s = storage.lock().await;
                    s.get_cdn_manifest(cid).ok().flatten()
                        .and_then(|json| serde_json::from_str::<crate::types::AuthorManifest>(&json).ok())
                        .map(|m| m.updated_at)
                        .unwrap_or(0)
                };
                match network.request_manifest_refresh(cid, upstream_nid, current_updated_at).await {
                    Ok(Some(cdn_manifest)) => {
                        // Only accept manifests whose author signature verifies.
                        if crypto::verify_manifest_signature(&cdn_manifest.author_manifest) {
                            let author_json = serde_json::to_string(&cdn_manifest.author_manifest).unwrap_or_default();
                            let s = storage.lock().await;
                            let _ = s.store_cdn_manifest(
                                cid,
                                &author_json,
                                &cdn_manifest.author_manifest.author,
                                cdn_manifest.author_manifest.updated_at,
                            );
                            // Relay to our downstream
                            let downstream = s.get_blob_downstream(cid).unwrap_or_default();
                            drop(s);
                            if !downstream.is_empty() {
                                network.push_manifest_to_downstream(cid, &cdn_manifest).await;
                            }
                            tracing::debug!(
                                cid = hex::encode(cid),
                                "Refreshed stale manifest from upstream"
                            );
                        }
                    }
                    Ok(None) => {} // No update available
                    Err(e) => {
                        tracing::debug!(
                            cid = hex::encode(cid),
                            upstream = hex::encode(upstream_nid),
                            error = %e,
                            "Manifest refresh from upstream failed"
                        );
                    }
                }
            }
        }
    })
}
/// Build our N+10:Addresses (our connected peers with their addresses).
///
/// Caps the result at 10 entries, attaching stored addresses (possibly empty)
/// for each connected peer, and skipping ourselves.
pub async fn build_peer_addresses(&self) -> Vec<PeerWithAddress> {
    let conns = self.network.connection_info().await;
    let storage = self.storage.lock().await;
    let mut result = Vec::new();
    for (nid, kind, _) in conns {
        if nid == self.node_id {
            continue;
        }
        // Prefer social peers
        // NOTE(review): this branch appears unreachable — the `break` below
        // exits the loop as soon as result.len() reaches 10, so len() >= 10
        // can never hold here. The "prefer social" intent may not be in
        // effect; confirm the intended cap semantics before relying on it.
        if kind != PeerSlotKind::Local && result.len() >= 10 {
            continue;
        }
        let addrs: Vec<String> = storage.get_peer_record(&nid)
            .ok()
            .flatten()
            .map(|r| r.addresses.iter().map(|a| a.to_string()).collect())
            .unwrap_or_default();
        result.push(PeerWithAddress {
            n: hex::encode(nid),
            a: addrs,
        });
        if result.len() >= 10 {
            break;
        }
    }
    result
}
/// List all social routes (for CLI/Tauri display).
pub async fn list_social_routes(&self) -> anyhow::Result<Vec<SocialRouteEntry>> {
    self.storage.lock().await.list_social_routes()
}
// ---- Audience ----
pub async fn request_audience(&self, node_id: &NodeId) -> anyhow::Result<()> {
{
let storage = self.storage.lock().await;
storage.store_audience(node_id, AudienceDirection::Outbound, AudienceStatus::Pending)?;
}
// Send the request (persistent if available, ephemeral otherwise)
if let Err(e) = self.network.send_audience_request(node_id).await {
warn!(peer = hex::encode(node_id), error = %e, "Failed to send audience request");
}
info!(peer = hex::encode(node_id), "Requested audience membership");
Ok(())
}
/// Approve an inbound audience request: persist approval, upsert the social
/// route (Mutual if we also follow them, Audience otherwise), and notify the
/// peer. Notification failure is logged but does not undo the approval.
pub async fn approve_audience(&self, node_id: &NodeId) -> anyhow::Result<()> {
    // Checked before taking the storage lock so we don't hold it across the await.
    let connected = self.network.is_connected(node_id).await;
    {
        let storage = self.storage.lock().await;
        storage.store_audience(node_id, AudienceDirection::Inbound, AudienceStatus::Approved)?;
        // Upsert social route (Audience or Mutual)
        let is_follow = storage.list_follows()?.contains(node_id);
        let relation = if is_follow { SocialRelation::Mutual } else { SocialRelation::Audience };
        let addresses = storage.get_peer_record(node_id)?
            .map(|r| r.addresses).unwrap_or_default();
        let peer_addresses = storage.build_peer_addresses_for(node_id)?;
        let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default().as_millis() as u64;
        let preferred_tree = storage.build_preferred_tree_for(node_id).unwrap_or_default();
        storage.upsert_social_route(&SocialRouteEntry {
            node_id: *node_id,
            addresses,
            peer_addresses,
            relation,
            status: if connected { SocialStatus::Online } else { SocialStatus::Disconnected },
            // NOTE(review): last_connected_ms is 0 even when `connected` is
            // true — presumably "never connected via this route yet"; confirm
            // against how touch_social_route_connect updates it.
            last_connected_ms: 0,
            last_seen_ms: now,
            reach_method: ReachMethod::Direct,
            preferred_tree,
        })?;
    }
    // Send approval response (persistent if available, ephemeral otherwise)
    if let Err(e) = self.network.send_audience_response(node_id, true).await {
        warn!(peer = hex::encode(node_id), error = %e, "Failed to send audience approval");
    }
    info!(peer = hex::encode(node_id), "Approved audience request");
    Ok(())
}
/// Record an inbound audience request as denied.
/// No response is sent to the peer here — only local state changes.
pub async fn deny_audience(&self, node_id: &NodeId) -> anyhow::Result<()> {
    self.storage
        .lock()
        .await
        .store_audience(node_id, AudienceDirection::Inbound, AudienceStatus::Denied)?;
    Ok(())
}
/// Remove a peer from our inbound audience, then adjust the social route:
/// peers we still follow keep a downgraded `Follow` route; everyone else
/// loses the route entirely.
pub async fn remove_audience(&self, node_id: &NodeId) -> anyhow::Result<()> {
    let storage = self.storage.lock().await;
    storage.remove_audience(node_id, AudienceDirection::Inbound)?;
    if storage.list_follows()?.contains(node_id) {
        // Downgrade: keep the route but demote the relation.
        if let Some(mut route) = storage.get_social_route(node_id)? {
            route.relation = SocialRelation::Follow;
            storage.upsert_social_route(&route)?;
        }
    } else {
        storage.remove_social_route(node_id)?;
    }
    Ok(())
}
/// List the node ids of all audience members.
pub async fn list_audience_members(&self) -> anyhow::Result<Vec<NodeId>> {
    self.storage.lock().await.list_audience_members()
}
/// List audience records for a direction, optionally filtered by status.
pub async fn list_audience(
    &self,
    direction: AudienceDirection,
    status: Option<AudienceStatus>,
) -> anyhow::Result<Vec<AudienceRecord>> {
    self.storage.lock().await.list_audience(direction, status)
}
// ---- Blob Eviction ----
/// Compute priority score for a blob. Higher score = keep longer.
pub fn compute_blob_priority(
&self,
candidate: &crate::storage::EvictionCandidate,
follows: &[NodeId],
audience_members: &[NodeId],
now_ms: u64,
) -> f64 {
compute_blob_priority_standalone(candidate, &self.node_id, follows, audience_members, now_ms)
}
/// Delete a blob, first notifying its CDN upstream/downstream peers.
/// Topology is snapshotted before local cleanup so the notices can still be
/// addressed; filesystem removal at the end is best-effort.
pub async fn delete_blob_with_cdn_notify(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
    // Snapshot CDN topology before tearing down local records.
    let (downstream_peers, upstream_peer) = {
        let storage = self.storage.lock().await;
        (
            storage.get_blob_downstream(cid).unwrap_or_default(),
            storage.get_blob_upstream(cid).ok().flatten(),
        )
    };
    // Notify peers while the lock is released.
    self.network
        .send_blob_delete_notices(cid, &downstream_peers, upstream_peer.as_ref())
        .await;
    // Now remove CDN bookkeeping and the blob record itself.
    {
        let storage = self.storage.lock().await;
        storage.cleanup_cdn_for_blob(cid)?;
        storage.remove_blob(cid)?;
    }
    // Filesystem delete failures are tolerated.
    let _ = self.blob_store.delete(cid);
    Ok(())
}
/// Evict lowest-priority blobs until total storage is under max_bytes.
///
/// Returns the number of blobs evicted (0 when already under budget).
/// Candidates are scored with `compute_blob_priority` and deleted in
/// ascending score order until enough bytes are freed. Individual delete
/// failures are logged and skipped, not fatal.
pub async fn evict_blobs(&self, max_bytes: u64) -> anyhow::Result<usize> {
    let total = {
        let storage = self.storage.lock().await;
        storage.total_blob_bytes()?
    };
    // Under budget — nothing to do. (This guard also makes the
    // `total - max_bytes` below safe from underflow.)
    if total <= max_bytes {
        return Ok(0);
    }
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    // 1-hour staleness for replica counts
    let staleness_ms = 3600 * 1000;
    let (candidates, follows, audience_members) = {
        let storage = self.storage.lock().await;
        let candidates = storage.get_eviction_candidates(staleness_ms)?;
        let follows = storage.list_follows().unwrap_or_default();
        let audience = storage.list_audience_members().unwrap_or_default();
        (candidates, follows, audience)
    };
    // Score and sort ascending (lowest priority first)
    let mut scored: Vec<(f64, &crate::storage::EvictionCandidate)> = candidates
        .iter()
        .map(|c| (self.compute_blob_priority(c, &follows, &audience_members, now), c))
        .collect();
    // partial_cmp: scores are f64; NaN (if any) compares Equal rather than panicking.
    scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
    let mut bytes_freed: u64 = 0;
    let target_free = total - max_bytes;
    let mut evicted = 0;
    for (score, candidate) in &scored {
        if bytes_freed >= target_free {
            break;
        }
        if let Err(e) = self.delete_blob_with_cdn_notify(&candidate.cid).await {
            warn!(cid = hex::encode(candidate.cid), error = %e, "Failed to evict blob");
            continue;
        }
        bytes_freed += candidate.size_bytes;
        evicted += 1;
        info!(
            cid = hex::encode(candidate.cid),
            score = score,
            size = candidate.size_bytes,
            "Evicted blob"
        );
    }
    info!(evicted, bytes_freed, "Blob eviction complete");
    Ok(evicted)
}
/// Start a periodic eviction cycle enforcing a `max_bytes` storage budget.
pub fn start_eviction_cycle(
    node: Arc<Self>,
    interval_secs: u64,
    max_bytes: u64,
) -> tokio::task::JoinHandle<()>
where
    Self: Send + Sync + 'static,
{
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        loop {
            ticker.tick().await;
            // Only noteworthy outcomes get logged; zero-eviction ticks stay quiet.
            match node.evict_blobs(max_bytes).await {
                Ok(n) if n > 0 => info!(evicted = n, "Eviction cycle complete"),
                Ok(_) => {}
                Err(e) => warn!(error = %e, "Eviction cycle failed"),
            }
        }
    })
}
/// Start UPnP lease renewal cycle. Renews every lease_secs/2.
/// On 3 consecutive failures: clears is_anchor and logs a warning.
///
/// Returns `None` when no UPnP mapping exists. The spawned task exits
/// permanently after the third consecutive failure.
pub fn start_upnp_renewal_cycle(&self) -> Option<tokio::task::JoinHandle<()>> {
    let mapping = self.network.upnp_mapping()?;
    let local_port = mapping.local_port;
    let external_port = mapping.external_addr.port();
    // Renew at half the lease lifetime so the mapping never lapses.
    let interval_secs = (mapping.lease_secs / 2) as u64;
    let network = Arc::clone(&self.network);
    let alog = Arc::clone(&self.activity_log);
    Some(tokio::spawn(async move {
        let mut interval =
            tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        let mut consecutive_failures: u32 = 0;
        loop {
            interval.tick().await;
            if crate::upnp::renew_upnp_mapping(local_port, external_port).await {
                // Any success resets the failure streak.
                consecutive_failures = 0;
                debug!("UPnP: lease renewed (port {})", external_port);
            } else {
                consecutive_failures += 1;
                warn!("UPnP: renewal failed ({}/3)", consecutive_failures);
                if consecutive_failures >= 3 {
                    // Without a working mapping we can't serve as an anchor.
                    network.clear_anchor();
                    if let Ok(mut log) = alog.try_lock() {
                        log.log(
                            ActivityLevel::Warn,
                            ActivityCategory::Connection,
                            "UPnP lease lost after 3 renewal failures, auto-anchor disabled".into(),
                            None,
                        );
                    }
                    warn!("UPnP: 3 consecutive renewal failures, auto-anchor disabled");
                    return; // stop the cycle
                }
            }
        }
    }))
}
// --- HTTP Post Delivery ---
/// Start the HTTP server for serving public posts to browsers.
/// Only starts if this node is publicly TCP-reachable.
pub fn start_http_server(&self) -> Option<tokio::task::JoinHandle<()>> {
if !self.network.is_http_capable() {
debug!("HTTP server not started: node is not publicly TCP-reachable");
return None;
}
let port = self.network.bound_port();
if port == 0 {
return None;
}
let storage = Arc::clone(&self.storage);
let blob_store = Arc::clone(&self.blob_store);
let downstream_addrs = Arc::new(tokio::sync::Mutex::new(
std::collections::HashMap::<[u8; 32], Vec<std::net::SocketAddr>>::new(),
));
// Advertise HTTP capability to peers
let http_addr = self.network.http_addr();
self.network.conn_handle().set_http_info(true, http_addr.clone());
// Also update the ConnectionManager's fields for payload construction
{
let rt = tokio::runtime::Handle::current();
let conn_mgr = Arc::clone(&self.network.conn_mgr_arc());
rt.spawn(async move {
let mut cm = conn_mgr.lock().await;
cm.http_capable = true;
cm.http_addr = http_addr;
});
}
info!("Starting HTTP server on TCP port {}", port);
Some(tokio::spawn(async move {
if let Err(e) = crate::http::run_http_server(port, storage, blob_store, downstream_addrs).await {
warn!("HTTP server stopped: {}", e);
}
}))
}
/// Start the web redirect handler (itsgoin.net share link resolution).
///
/// Spawns `crate::web::run_web_handler` on the given port; the task ends
/// (with a warning) if the handler returns an error.
pub fn start_web_handler(self: &Arc<Self>, port: u16) -> tokio::task::JoinHandle<()> {
    let handle = Arc::clone(self);
    info!("Starting web redirect handler on port {}", port);
    tokio::spawn(async move {
        match crate::web::run_web_handler(port, handle).await {
            Ok(()) => {}
            Err(e) => warn!("Web redirect handler stopped: {}", e),
        }
    })
}
/// Start UPnP TCP lease renewal cycle alongside the UDP renewal.
///
/// Returns `None` when there is no UPnP TCP mapping. Unlike the UDP cycle,
/// renewal failures never stop this task — TCP reachability is best-effort.
pub fn start_upnp_tcp_renewal_cycle(&self) -> Option<tokio::task::JoinHandle<()>> {
    if !self.network.has_upnp_tcp() {
        return None;
    }
    let mapping = self.network.upnp_mapping()?;
    let local_port = mapping.local_port;
    let external_port = mapping.external_addr.port();
    // Renew at half the lease length, clamped to >= 1s: a reported lease of
    // 0 or 1 would yield a zero-length period, which makes
    // tokio::time::interval panic.
    let interval_secs = ((mapping.lease_secs / 2) as u64).max(1);
    Some(tokio::spawn(async move {
        let mut interval =
            tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        loop {
            interval.tick().await;
            if !crate::upnp::renew_upnp_tcp_mapping(local_port, external_port).await {
                warn!("UPnP: TCP lease renewal failed");
                // Don't stop the cycle — TCP is best-effort
            }
        }
    }))
}
/// Generate a share link URL for a public post.
/// Returns None if post is not public or not found.
pub async fn generate_share_link(&self, post_id: &PostId) -> anyhow::Result<Option<String>> {
    // Fetch post + visibility under the lock, release before formatting.
    let lookup = {
        let store = self.storage.lock().await;
        store.get_post_with_visibility(post_id)?
    };
    let Some((post, visibility)) = lookup else {
        return Ok(None);
    };
    // Share links only make sense for posts anyone can fetch.
    match visibility {
        PostVisibility::Public => Ok(Some(format!(
            "https://itsgoin.net/p/{}/{}",
            hex::encode(post_id),
            hex::encode(post.author),
        ))),
        _ => Ok(None),
    }
}
// --- Engagement API ---
/// React to a post with an emoji. If `private`, encrypts payload for post author only.
///
/// Stores the reaction locally, then propagates it as an `AddReaction`
/// BlobHeaderDiff to downstream peers and (when an upstream peer is known)
/// upstream toward the author. Returns the stored `Reaction`.
///
/// # Errors
/// Fails if the system clock is before the UNIX epoch, if a private
/// reaction targets a post we do not hold, or if encryption/storage fails.
pub async fn react_to_post(
    &self,
    post_id: PostId,
    emoji: String,
    private: bool,
) -> anyhow::Result<crate::types::Reaction> {
    let our_node_id = self.node_id;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    // For private reactions, look up the post author and encrypt
    let encrypted_payload = if private {
        let storage = self.storage.lock().await;
        let post = storage.get_post(&post_id)?
            .ok_or_else(|| anyhow::anyhow!("post not found"))?;
        drop(storage);
        let seed = self.secret_seed;
        // JSON payload so the author can render reactor + emoji + time after
        // decrypting; addressed to the author's key only.
        let payload_json = serde_json::json!({
            "emoji": emoji,
            "reactor": hex::encode(our_node_id),
            "timestamp_ms": now,
        }).to_string();
        Some(crate::crypto::encrypt_private_reaction(&seed, &post.author, &payload_json)?)
    } else {
        None
    };
    let reaction = crate::types::Reaction {
        reactor: our_node_id,
        emoji: emoji.clone(),
        post_id,
        timestamp_ms: now,
        encrypted_payload,
    };
    // Store locally
    let storage = self.storage.lock().await;
    storage.store_reaction(&reaction)?;
    drop(storage);
    // Propagate via BlobHeaderDiff to downstream + upstream
    {
        let network = &self.network;
        let diff = crate::protocol::BlobHeaderDiffPayload {
            post_id,
            author: our_node_id,
            ops: vec![crate::types::BlobHeaderDiffOp::AddReaction(reaction.clone())],
            timestamp_ms: now,
        };
        network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
        // Also send upstream (toward author)
        let upstream = {
            let storage = self.storage.lock().await;
            storage.get_post_upstream(&post_id).ok().flatten()
        };
        if let Some(up) = upstream {
            // Best-effort: a failed upstream send is not an error for the caller.
            let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
        }
    }
    Ok(reaction)
}
/// Remove a reaction from a post.
///
/// Deletes our reaction locally, then propagates a `RemoveReaction` diff
/// downstream AND upstream toward the post author — mirroring the delivery
/// path of `react_to_post` / the comment mutations, so the author's copy
/// does not keep showing a reaction the reactor already removed.
pub async fn remove_reaction(&self, post_id: PostId, emoji: String) -> anyhow::Result<()> {
    let our_node_id = self.node_id;
    {
        let storage = self.storage.lock().await;
        storage.remove_reaction(&our_node_id, &post_id, &emoji)?;
    }
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let diff = crate::protocol::BlobHeaderDiffPayload {
        post_id,
        author: our_node_id,
        ops: vec![crate::types::BlobHeaderDiffOp::RemoveReaction {
            reactor: our_node_id,
            emoji,
            post_id,
        }],
        timestamp_ms: now,
    };
    self.network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
    // Fix: also send upstream (toward author), consistent with every other
    // engagement mutation in this impl; previously removal only went
    // downstream and never reached the author.
    let upstream = {
        let storage = self.storage.lock().await;
        storage.get_post_upstream(&post_id).ok().flatten()
    };
    if let Some(up) = upstream {
        let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
    }
    Ok(())
}
/// Get all reactions for a post. Decrypts private reactions if we're the post author.
pub async fn get_reactions(&self, post_id: PostId) -> anyhow::Result<Vec<crate::types::Reaction>> {
    // Fetch reactions and post metadata in one lock scope.
    let (reactions, maybe_post) = {
        let storage = self.storage.lock().await;
        (storage.get_reactions(&post_id)?, storage.get_post(&post_id)?)
    };
    // Private payloads are addressed to the author; anyone else just gets
    // the opaque ciphertext back unchanged.
    let is_author = maybe_post.map_or(false, |p| p.author == self.node_id);
    if !is_author {
        return Ok(reactions);
    }
    let seed = self.secret_seed;
    Ok(reactions
        .into_iter()
        .map(|mut r| {
            if let Some(enc) = r.encrypted_payload.take() {
                // On decryption failure, put the ciphertext back untouched.
                r.encrypted_payload = match crate::crypto::decrypt_private_reaction(&seed, &r.reactor, &enc) {
                    Ok(plain) => Some(plain),
                    Err(_) => Some(enc),
                };
            }
            r
        })
        .collect())
}
/// Get reaction counts grouped by emoji for a post.
pub async fn get_reaction_counts(&self, post_id: PostId) -> anyhow::Result<Vec<(String, u64, bool)>> {
    let store = self.storage.lock().await;
    Ok(store.get_reaction_counts(&post_id, &self.node_id)?)
}
/// Add a comment to a post (signed with our key).
///
/// The comment is signed over (author, post_id, content, timestamp),
/// stored locally, then propagated as an `AddComment` BlobHeaderDiff to
/// downstream peers and upstream toward the author. Returns the stored
/// comment.
pub async fn comment_on_post(
    &self,
    post_id: PostId,
    content: String,
) -> anyhow::Result<crate::types::InlineComment> {
    let our_node_id = self.node_id;
    let seed = self.secret_seed;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    // Signature lets other nodes verify authorship of relayed comments.
    let signature = crate::crypto::sign_comment(&seed, &our_node_id, &post_id, &content, now);
    let comment = crate::types::InlineComment {
        author: our_node_id,
        post_id,
        content,
        timestamp_ms: now,
        signature,
    };
    let storage = self.storage.lock().await;
    storage.store_comment(&comment)?;
    drop(storage);
    // Propagate via BlobHeaderDiff to downstream + upstream
    {
        let network = &self.network;
        let diff = crate::protocol::BlobHeaderDiffPayload {
            post_id,
            author: our_node_id,
            ops: vec![crate::types::BlobHeaderDiffOp::AddComment(comment.clone())],
            timestamp_ms: now,
        };
        network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
        // Also send upstream (toward author)
        let upstream = {
            let storage = self.storage.lock().await;
            storage.get_post_upstream(&post_id).ok().flatten()
        };
        if let Some(up) = upstream {
            // Best-effort delivery; send errors are deliberately ignored.
            let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
        }
    }
    Ok(comment)
}
/// Edit one of your own comments on a post.
///
/// Applies the edit locally, then broadcasts an `EditComment` diff
/// downstream and upstream so other holders of the post pick it up.
pub async fn edit_comment(
    &self,
    post_id: PostId,
    timestamp_ms: u64,
    new_content: String,
) -> anyhow::Result<()> {
    let author = self.node_id;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    // Local edit first.
    {
        let storage = self.storage.lock().await;
        storage.edit_comment(&author, &post_id, timestamp_ms, &new_content)?;
    }
    // Then fan the change out.
    let diff = crate::protocol::BlobHeaderDiffPayload {
        post_id,
        author,
        ops: vec![crate::types::BlobHeaderDiffOp::EditComment {
            author,
            post_id,
            timestamp_ms,
            new_content,
        }],
        timestamp_ms: now,
    };
    self.network.propagate_engagement_diff(&post_id, &diff, &author).await;
    let upstream = {
        let storage = self.storage.lock().await;
        storage.get_post_upstream(&post_id).ok().flatten()
    };
    if let Some(up) = upstream {
        let _ = self.network
            .send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff)
            .await;
    }
    Ok(())
}
/// Delete one of your own comments on a post.
///
/// Removes the comment locally, then broadcasts a `DeleteComment` diff
/// downstream and upstream so other holders of the post drop it too.
pub async fn delete_comment(
    &self,
    post_id: PostId,
    timestamp_ms: u64,
) -> anyhow::Result<()> {
    let author = self.node_id;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    // Local removal first.
    {
        let storage = self.storage.lock().await;
        storage.delete_comment(&author, &post_id, timestamp_ms)?;
    }
    let diff = crate::protocol::BlobHeaderDiffPayload {
        post_id,
        author,
        ops: vec![crate::types::BlobHeaderDiffOp::DeleteComment {
            author,
            post_id,
            timestamp_ms,
        }],
        timestamp_ms: now,
    };
    self.network.propagate_engagement_diff(&post_id, &diff, &author).await;
    let upstream = {
        let storage = self.storage.lock().await;
        storage.get_post_upstream(&post_id).ok().flatten()
    };
    if let Some(up) = upstream {
        let _ = self.network
            .send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff)
            .await;
    }
    Ok(())
}
/// Get all comments for a post.
pub async fn get_comments(&self, post_id: PostId) -> anyhow::Result<Vec<crate::types::InlineComment>> {
    let store = self.storage.lock().await;
    Ok(store.get_comments(&post_id)?)
}
/// Set the comment/reaction policy for a post (author-only).
pub async fn set_comment_policy(
    &self,
    post_id: PostId,
    policy: crate::types::CommentPolicy,
) -> anyhow::Result<()> {
    // Persist locally first.
    {
        let storage = self.storage.lock().await;
        storage.set_comment_policy(&post_id, &policy)?;
    }
    // Then announce the new policy downstream as a SetPolicy diff.
    let author = self.node_id;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let diff = crate::protocol::BlobHeaderDiffPayload {
        post_id,
        author,
        ops: vec![crate::types::BlobHeaderDiffOp::SetPolicy(policy)],
        timestamp_ms: now,
    };
    self.network.propagate_engagement_diff(&post_id, &diff, &author).await;
    Ok(())
}
/// Get the comment policy for a post.
pub async fn get_comment_policy(&self, post_id: PostId) -> anyhow::Result<Option<crate::types::CommentPolicy>> {
    let store = self.storage.lock().await;
    Ok(store.get_comment_policy(&post_id)?)
}
/// Get the full comment thread for a post (inline comments + split posts merged).
pub async fn get_comment_thread(&self, post_id: PostId) -> anyhow::Result<Vec<crate::types::InlineComment>> {
    let storage = self.storage.lock().await;
    // Start from the inline comments, then fold in thread children.
    let mut merged = storage.get_comments(&post_id)?;
    for child_id in storage.get_thread_children(&post_id)? {
        let Ok(Some(child)) = storage.get_post(&child_id) else { continue };
        // Split posts carry their comments as a JSON array in `content`.
        if let Ok(extra) = serde_json::from_str::<Vec<crate::types::InlineComment>>(&child.content) {
            merged.extend(extra);
        }
    }
    drop(storage);
    // Dedup by (author, timestamp_ms), then order chronologically.
    let mut seen = std::collections::HashSet::new();
    merged.retain(|c| seen.insert((c.author, c.timestamp_ms)));
    merged.sort_by_key(|c| c.timestamp_ms);
    Ok(merged)
}
// --- Encrypted receipt/comment slot methods ---
/// Unwrap the CEK for a post we are a participant of, returning (cek, sorted_participants).
/// Returns None if this is a public post or we cannot decrypt.
///
/// The participant list is sorted and deduplicated: a participant's index
/// in this list is their receipt-slot index, so every participant must
/// derive the same ordering.
async fn get_post_cek_and_participants(
    &self,
    post_id: &PostId,
) -> anyhow::Result<Option<([u8; 32], Vec<NodeId>)>> {
    let storage = self.storage.lock().await;
    let (post, visibility) = match storage.get_post_with_visibility(post_id)? {
        Some(pv) => pv,
        None => return Ok(None),
    };
    drop(storage);
    match &visibility {
        // Public posts have no CEK and no encrypted slots.
        PostVisibility::Public => Ok(None),
        PostVisibility::Encrypted { recipients } => {
            // Direct-encrypted post: try to unwrap the CEK addressed to us.
            let cek = crypto::unwrap_cek_for_recipient(
                &self.secret_seed,
                &self.node_id,
                &post.author,
                recipients,
            )?;
            match cek {
                Some(cek) => {
                    // Participants = all wrapped-key recipients, sorted/deduped.
                    let mut participants: Vec<NodeId> = recipients.iter().map(|wk| wk.recipient).collect();
                    participants.sort();
                    participants.dedup();
                    Ok(Some((cek, participants)))
                }
                // No wrapped key for us — we are not a participant.
                None => Ok(None),
            }
        }
        PostVisibility::GroupEncrypted { group_id, epoch, wrapped_cek } => {
            // Group post: unwrap the CEK with the group seed for this epoch;
            // participants are the circle members plus the author.
            let storage = self.storage.lock().await;
            let group_seeds = storage.get_all_group_seeds_map().unwrap_or_default();
            let group_key_record = storage.get_group_key(group_id)?;
            let members = if let Some(ref gk) = group_key_record {
                storage.get_circle_members(&gk.circle_name).unwrap_or_default()
            } else {
                vec![]
            };
            drop(storage);
            if let Some((seed, pubkey)) = group_seeds.get(&(*group_id, *epoch)) {
                let cek = crypto::unwrap_group_cek(seed, pubkey, wrapped_cek)?;
                let mut participants: Vec<NodeId> = members;
                // Ensure the author is included
                if !participants.contains(&post.author) {
                    participants.push(post.author);
                }
                participants.sort();
                participants.dedup();
                Ok(Some((cek, participants)))
            } else {
                // We don't hold the seed for this (group, epoch) — cannot decrypt.
                Ok(None)
            }
        }
    }
}
/// Write our receipt slot for an encrypted post.
/// `state` is the receipt state, `emoji` is optional (only used when state == Reacted).
///
/// Our slot index is our position in the sorted participant list. The slot
/// is encrypted with a key derived from the post CEK, so relays only see
/// opaque bytes. The updated header is stored locally and the single-slot
/// change propagates downstream and upstream as a `WriteReceiptSlot` diff.
///
/// # Errors
/// Fails if we are not a participant, our node id is missing from the
/// participant list, or encryption/storage fails.
pub async fn write_receipt_slot(
    &self,
    post_id: PostId,
    state: crate::types::ReceiptState,
    emoji: Option<String>,
) -> anyhow::Result<()> {
    let (cek, participants) = self.get_post_cek_and_participants(&post_id).await?
        .ok_or_else(|| anyhow::anyhow!("not a participant of this encrypted post"))?;
    let slot_key = crypto::derive_slot_key(&cek);
    // Find our slot index (sorted participant position)
    let our_slot = participants.iter().position(|nid| nid == &self.node_id)
        .ok_or_else(|| anyhow::anyhow!("our node_id not found in participants"))?;
    // Build plaintext: [1 byte state][8 bytes timestamp_ms][23 bytes emoji+padding]
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let mut plaintext = [0u8; 32];
    plaintext[0] = state as u8;
    plaintext[1..9].copy_from_slice(&now.to_le_bytes());
    if let Some(ref emoji_str) = emoji {
        let emoji_bytes = emoji_str.as_bytes();
        // Truncated to the 23-byte field. NOTE(review): a >23-byte emoji
        // sequence is cut mid-codepoint; the reader drops invalid UTF-8
        // (String::from_utf8(..).ok()), so this degrades to "no emoji".
        let copy_len = emoji_bytes.len().min(23);
        plaintext[9..9 + copy_len].copy_from_slice(&emoji_bytes[..copy_len]);
    }
    let encrypted = crypto::encrypt_slot(&plaintext, &slot_key)?;
    // Update the BlobHeader
    let storage = self.storage.lock().await;
    let header = storage.get_blob_header(&post_id)?;
    // Parse the stored header, or start from an empty one if missing/corrupt.
    let mut blob_header = if let Some((json, _ts)) = header {
        serde_json::from_str::<crate::types::BlobHeader>(&json)
            .unwrap_or_else(|_| crate::types::BlobHeader {
                post_id,
                author: self.node_id,
                reactions: vec![],
                comments: vec![],
                policy: Default::default(),
                updated_at: now,
                thread_splits: vec![],
                receipt_slots: vec![],
                comment_slots: vec![],
            })
    } else {
        crate::types::BlobHeader {
            post_id,
            author: self.node_id,
            reactions: vec![],
            comments: vec![],
            policy: Default::default(),
            updated_at: now,
            thread_splits: vec![],
            receipt_slots: vec![],
            comment_slots: vec![],
        }
    };
    // Ensure enough slots exist
    // (missing slots are noise-filled so written/empty are indistinguishable
    // to non-participants)
    while blob_header.receipt_slots.len() <= our_slot {
        blob_header.receipt_slots.push(crypto::random_slot_noise(64));
    }
    blob_header.receipt_slots[our_slot] = encrypted.clone();
    blob_header.updated_at = now;
    let header_json = serde_json::to_string(&blob_header)?;
    storage.store_blob_header(&post_id, &blob_header.author, &header_json, now)?;
    drop(storage);
    // Propagate via BlobHeaderDiff
    let diff = crate::protocol::BlobHeaderDiffPayload {
        post_id,
        author: self.node_id,
        ops: vec![crate::types::BlobHeaderDiffOp::WriteReceiptSlot {
            post_id,
            slot_index: our_slot as u32,
            data: encrypted,
        }],
        timestamp_ms: now,
    };
    self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
    let upstream = {
        let storage = self.storage.lock().await;
        storage.get_post_upstream(&post_id).ok().flatten()
    };
    if let Some(up) = upstream {
        let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
    }
    Ok(())
}
/// Write a private comment to an encrypted post's comment slot.
///
/// Plaintext layout: [32B author node id][8B timestamp_ms LE][216B content,
/// zero-padded]; content beyond 216 bytes is truncated. The first reusable
/// slot is overwritten, or a fresh slot is appended when all are taken; the
/// change propagates as a `WriteCommentSlot` / `AddCommentSlots` diff
/// downstream and upstream.
pub async fn write_comment_slot(
    &self,
    post_id: PostId,
    content: String,
) -> anyhow::Result<()> {
    let (cek, _participants) = self.get_post_cek_and_participants(&post_id).await?
        .ok_or_else(|| anyhow::anyhow!("not a participant of this encrypted post"))?;
    let slot_key = crypto::derive_slot_key(&cek);
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    // Build plaintext: [32 bytes author_node_id][8 bytes timestamp_ms][216 bytes content+padding]
    let mut plaintext = [0u8; 256];
    plaintext[..32].copy_from_slice(&self.node_id);
    plaintext[32..40].copy_from_slice(&now.to_le_bytes());
    let content_bytes = content.as_bytes();
    // NOTE(review): a multi-byte char straddling the 216-byte cut is
    // mangled; the reader tolerates this via from_utf8_lossy.
    let copy_len = content_bytes.len().min(216);
    plaintext[40..40 + copy_len].copy_from_slice(&content_bytes[..copy_len]);
    let encrypted = crypto::encrypt_slot(&plaintext, &slot_key)?;
    // Find first available comment slot or add new ones
    let storage = self.storage.lock().await;
    let header = storage.get_blob_header(&post_id)?;
    // Parse the stored header, or start from an empty one if missing/corrupt.
    let mut blob_header = if let Some((json, _ts)) = header {
        serde_json::from_str::<crate::types::BlobHeader>(&json)
            .unwrap_or_else(|_| crate::types::BlobHeader {
                post_id,
                author: self.node_id,
                reactions: vec![],
                comments: vec![],
                policy: Default::default(),
                updated_at: now,
                thread_splits: vec![],
                receipt_slots: vec![],
                comment_slots: vec![],
            })
    } else {
        crate::types::BlobHeader {
            post_id,
            author: self.node_id,
            reactions: vec![],
            comments: vec![],
            policy: Default::default(),
            updated_at: now,
            thread_splits: vec![],
            receipt_slots: vec![],
            comment_slots: vec![],
        }
    };
    // Try to find an empty slot by attempting decryption
    let mut target_index = None;
    for (i, slot) in blob_header.comment_slots.iter().enumerate() {
        if let Ok(decrypted) = crypto::decrypt_slot(slot, &slot_key) {
            // Check if all 256 plaintext bytes are zero (empty)
            if decrypted.len() == 256 && decrypted.iter().all(|&b| b == 0) {
                target_index = Some(i);
                break;
            }
        } else {
            // Cannot decrypt — could be random noise (empty), use it
            target_index = Some(i);
            break;
        }
    }
    let (slot_index, add_new) = if let Some(idx) = target_index {
        (idx, false)
    } else {
        // No available slots — add one
        let idx = blob_header.comment_slots.len();
        blob_header.comment_slots.push(crypto::random_slot_noise(256));
        (idx, true)
    };
    blob_header.comment_slots[slot_index] = encrypted.clone();
    blob_header.updated_at = now;
    let header_json = serde_json::to_string(&blob_header)?;
    storage.store_blob_header(&post_id, &blob_header.author, &header_json, now)?;
    drop(storage);
    // Propagate
    // Appending uses AddCommentSlots so relays grow the header; overwriting
    // an existing slot uses WriteCommentSlot with the slot index.
    let op = if add_new {
        crate::types::BlobHeaderDiffOp::AddCommentSlots {
            post_id,
            count: 1,
            slots: vec![encrypted],
        }
    } else {
        crate::types::BlobHeaderDiffOp::WriteCommentSlot {
            post_id,
            slot_index: slot_index as u32,
            data: encrypted,
        }
    };
    let diff = crate::protocol::BlobHeaderDiffPayload {
        post_id,
        author: self.node_id,
        ops: vec![op],
        timestamp_ms: now,
    };
    self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
    let upstream = {
        let storage = self.storage.lock().await;
        storage.get_post_upstream(&post_id).ok().flatten()
    };
    if let Some(up) = upstream {
        let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
    }
    Ok(())
}
/// Read and decrypt all receipt slots for an encrypted post.
///
/// Returns one entry per slot, in slot order; slot index i maps to the i-th
/// sorted participant (when known). Slots that fail to decrypt (noise /
/// uninitialized) are reported as `Empty` with timestamp 0.
pub async fn read_receipt_slots(
    &self,
    post_id: PostId,
) -> anyhow::Result<Vec<crate::types::ReceiptSlotData>> {
    let (cek, participants) = self.get_post_cek_and_participants(&post_id).await?
        .ok_or_else(|| anyhow::anyhow!("not a participant of this encrypted post"))?;
    let slot_key = crypto::derive_slot_key(&cek);
    let storage = self.storage.lock().await;
    let header = storage.get_blob_header(&post_id)?;
    drop(storage);
    let blob_header = match header {
        Some((json, _ts)) => serde_json::from_str::<crate::types::BlobHeader>(&json)?,
        None => return Ok(vec![]),
    };
    let mut results = Vec::new();
    for (i, slot) in blob_header.receipt_slots.iter().enumerate() {
        // A slot index beyond the participant list has no known owner.
        let participant_id = participants.get(i).copied();
        match crypto::decrypt_slot(slot, &slot_key) {
            // Layout: [1B state][8B timestamp_ms LE][23B emoji, zero-padded]
            Ok(plaintext) if plaintext.len() >= 9 => {
                let state = crate::types::ReceiptState::from_u8(plaintext[0]);
                let timestamp_ms = u64::from_le_bytes(
                    plaintext[1..9].try_into().unwrap_or([0u8; 8]),
                );
                // The emoji field is only meaningful for Reacted; it runs to
                // the first NUL byte, and invalid UTF-8 yields None.
                let emoji = if state == crate::types::ReceiptState::Reacted && plaintext.len() >= 32 {
                    let emoji_bytes = &plaintext[9..32];
                    let end = emoji_bytes.iter().position(|&b| b == 0).unwrap_or(23);
                    if end > 0 {
                        String::from_utf8(emoji_bytes[..end].to_vec()).ok()
                    } else {
                        None
                    }
                } else {
                    None
                };
                results.push(crate::types::ReceiptSlotData {
                    slot_index: i as u32,
                    node_id: participant_id,
                    state,
                    timestamp_ms,
                    emoji,
                });
            }
            _ => {
                // Could not decrypt — noise/uninitialized slot
                results.push(crate::types::ReceiptSlotData {
                    slot_index: i as u32,
                    node_id: participant_id,
                    state: crate::types::ReceiptState::Empty,
                    timestamp_ms: 0,
                    emoji: None,
                });
            }
        }
    }
    Ok(results)
}
/// Read and decrypt all comment slots for an encrypted post.
///
/// Returns only slots that decrypt to a non-empty comment, sorted by
/// timestamp; noise-filled / undecryptable slots are skipped silently.
pub async fn read_comment_slots(
    &self,
    post_id: PostId,
) -> anyhow::Result<Vec<crate::types::CommentSlotData>> {
    let (cek, _participants) = self.get_post_cek_and_participants(&post_id).await?
        .ok_or_else(|| anyhow::anyhow!("not a participant of this encrypted post"))?;
    let slot_key = crypto::derive_slot_key(&cek);
    let storage = self.storage.lock().await;
    let header = storage.get_blob_header(&post_id)?;
    drop(storage);
    let blob_header = match header {
        Some((json, _ts)) => serde_json::from_str::<crate::types::BlobHeader>(&json)?,
        None => return Ok(vec![]),
    };
    let mut results = Vec::new();
    for (i, slot) in blob_header.comment_slots.iter().enumerate() {
        match crypto::decrypt_slot(slot, &slot_key) {
            // Layout: [32B author][8B timestamp_ms LE][content, zero-padded]
            Ok(plaintext) if plaintext.len() >= 40 => {
                // Check if it's an empty slot (all zeros)
                if plaintext.iter().all(|&b| b == 0) {
                    continue;
                }
                let mut author = [0u8; 32];
                author.copy_from_slice(&plaintext[..32]);
                // Skip if author is all zeros (empty)
                if author == [0u8; 32] {
                    continue;
                }
                let timestamp_ms = u64::from_le_bytes(
                    plaintext[32..40].try_into().unwrap_or([0u8; 8]),
                );
                // Content runs to the first NUL (or to the end of plaintext).
                let content_bytes = &plaintext[40..];
                let end = content_bytes.iter().position(|&b| b == 0).unwrap_or(content_bytes.len());
                let content = String::from_utf8_lossy(&content_bytes[..end]).to_string();
                results.push(crate::types::CommentSlotData {
                    slot_index: i as u32,
                    author,
                    timestamp_ms,
                    content,
                });
            }
            _ => {
                // Cannot decrypt or too short — skip
            }
        }
    }
    results.sort_by_key(|c| c.timestamp_ms);
    Ok(results)
}
}
/// Coarse counters summarizing this node's local state.
pub struct NodeStats {
    /// Number of posts.
    pub post_count: usize,
    /// Number of peers.
    pub peer_count: usize,
    /// Number of follows.
    pub follow_count: usize,
}
/// Standalone priority scoring for testing.
/// score = pin_boost + (relationship × heart_recency × freshness / (peer_copies + 1))
pub fn compute_blob_priority_standalone(
    candidate: &crate::storage::EvictionCandidate,
    our_node_id: &NodeId,
    follows: &[NodeId],
    audience_members: &[NodeId],
    now_ms: u64,
) -> f64 {
    const THIRTY_DAYS_MS: u64 = 30 * 24 * 3600 * 1000;
    const DAY_MS: f64 = 24.0 * 3600.0 * 1000.0;

    // Pinned blobs get a flat boost that dominates every other factor.
    let pin_boost = if candidate.pinned { 1000.0 } else { 0.0 };

    // Relationship weight: own > followed+audience > followed > audience > stranger.
    let followed = follows.contains(&candidate.author);
    let in_audience = audience_members.contains(&candidate.author);
    let relationship = match (candidate.author == *our_node_id, followed, in_audience) {
        (true, _, _) => 5.0,
        (false, true, true) => 3.0,
        (false, true, false) => 2.0,
        (false, false, true) => 1.0,
        (false, false, false) => 0.1,
    };

    // Linear decay to zero over 30 days since last access.
    let access_age_ms = now_ms.saturating_sub(candidate.last_accessed_at);
    let heart_recency = (1.0 - (access_age_ms as f64 / THIRTY_DAYS_MS as f64)).max(0.0);

    // Hyperbolic decay with post age measured in days.
    let post_age_days = now_ms.saturating_sub(candidate.created_at) as f64 / DAY_MS;
    let freshness = 1.0 / (1.0 + post_age_days);

    // More replicas elsewhere make our copy less precious.
    let copies_factor = 1.0 / (candidate.peer_copies as f64 + 1.0);

    pin_boost + (relationship * heart_recency * freshness * copies_factor)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::EvictionCandidate;
    // NodeId whose 32 bytes are all `byte` — cheap distinct ids for fixtures.
    fn make_node_id(byte: u8) -> NodeId {
        [byte; 32]
    }
    // Fixture: an EvictionCandidate with fixed cid/post_id/size and the
    // given authorship, pin state, timestamps, and replica count.
    fn make_candidate(
        author: NodeId,
        pinned: bool,
        created_at: u64,
        last_accessed_at: u64,
        peer_copies: u32,
    ) -> EvictionCandidate {
        EvictionCandidate {
            cid: [0u8; 32],
            post_id: [0u8; 32],
            author,
            size_bytes: 1000,
            created_at,
            last_accessed_at,
            pinned,
            peer_copies,
        }
    }
    // A pinned blob we authored must clear the flat 1000-point pin boost.
    #[test]
    fn own_pinned_scores_highest() {
        let our_id = make_node_id(1);
        let now = 10_000_000_000u64; // ~115 days in ms
        let candidate = make_candidate(our_id, true, now - 86400_000, now, 0);
        let score = compute_blob_priority_standalone(
            &candidate, &our_id, &[], &[], now,
        );
        // Should be 1000 + (5.0 * 1.0 * ~0.5 * 1.0) = ~1002.5
        assert!(score > 1000.0, "own pinned should score >1000, got {}", score);
    }
    // Recency + freshness + low replica count should let a followed author's
    // fresh post beat a stale, well-replicated audience post.
    #[test]
    fn follow_recent_scores_higher_than_audience_stale() {
        let our_id = make_node_id(1);
        let follow_id = make_node_id(2);
        let audience_id = make_node_id(3);
        let now = 10_000_000_000u64;
        // Follow: recently accessed, 1-day-old post
        let follow_candidate = make_candidate(follow_id, false, now - 86400_000, now, 0);
        let follow_score = compute_blob_priority_standalone(
            &follow_candidate, &our_id, &[follow_id], &[], now,
        );
        // Audience: stale (20 days no access), 10-day-old post, 5 copies
        let audience_candidate = make_candidate(
            audience_id, false,
            now - 10 * 86400_000,
            now - 20 * 86400_000,
            5,
        );
        let audience_score = compute_blob_priority_standalone(
            &audience_candidate, &our_id, &[], &[audience_id], now,
        );
        assert!(follow_score > audience_score,
            "follow recent ({}) should score higher than audience stale ({})",
            follow_score, audience_score);
    }
    // A stranger's stale, widely-replicated post should be first in line for
    // eviction (score effectively zero).
    #[test]
    fn no_relationship_scores_near_zero() {
        let our_id = make_node_id(1);
        let stranger = make_node_id(99);
        let now = 10_000_000_000u64;
        // Stale stranger post with many copies
        let candidate = make_candidate(
            stranger, false,
            now - 30 * 86400_000,
            now - 30 * 86400_000,
            10,
        );
        let score = compute_blob_priority_standalone(
            &candidate, &our_id, &[], &[], now,
        );
        // 0.1 relationship * 0.0 heart_recency * ~0.03 freshness / 11 = ~0
        assert!(score < 0.01, "stranger stale should score near 0, got {}", score);
    }
    // End-to-end ordering check across all four relationship tiers.
    #[test]
    fn priority_ordering() {
        let our_id = make_node_id(1);
        let follow_id = make_node_id(2);
        let audience_id = make_node_id(3);
        let stranger_id = make_node_id(4);
        let now = 10_000_000_000u64;
        // Own pinned (highest)
        let own = make_candidate(our_id, true, now - 86400_000, now, 0);
        // Follow recent
        let follow = make_candidate(follow_id, false, now - 86400_000, now, 0);
        // Audience stale
        let audience = make_candidate(audience_id, false, now - 10 * 86400_000, now - 20 * 86400_000, 5);
        // Stranger
        let stranger = make_candidate(stranger_id, false, now - 30 * 86400_000, now - 30 * 86400_000, 10);
        let own_score = compute_blob_priority_standalone(&own, &our_id, &[follow_id], &[audience_id], now);
        let follow_score = compute_blob_priority_standalone(&follow, &our_id, &[follow_id], &[audience_id], now);
        let audience_score = compute_blob_priority_standalone(&audience, &our_id, &[follow_id], &[audience_id], now);
        let stranger_score = compute_blob_priority_standalone(&stranger, &our_id, &[follow_id], &[audience_id], now);
        assert!(own_score > follow_score, "own ({}) > follow ({})", own_score, follow_score);
        assert!(follow_score > audience_score, "follow ({}) > audience ({})", follow_score, audience_score);
        assert!(audience_score > stranger_score, "audience ({}) > stranger ({})", audience_score, stranger_score);
    }
}