diff --git a/Cargo.lock b/Cargo.lock
index d3cf5c1..c3c6706 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2746,7 +2746,7 @@ dependencies = [
 [[package]]
 name = "itsgoin-desktop"
-version = "0.3.5"
+version = "0.3.6"
 dependencies = [
  "anyhow",
  "base64 0.22.1",
diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs
index b29fdaf..bbdc81e 100644
--- a/crates/cli/src/main.rs
+++ b/crates/cli/src/main.rs
@@ -166,6 +166,18 @@ async fn main() -> anyhow::Result<()> {
 let _upnp_tcp_handle = node.start_upnp_tcp_renewal_cycle(); // UPnP TCP lease renewal
 let _http_handle = node.start_http_server(); // HTTP post delivery (if publicly reachable)
 let _bootstrap_check = node.start_bootstrap_connectivity_check(); // 24h isolation check
+let _replication_handle = node.start_replication_cycle(600); // 10 min active replication
+
+// Start blob eviction cycle (every 5 min, default 1 GB cache limit)
+let cache_max_bytes: u64 = {
+    let storage = node.storage.lock().await;
+    storage.get_setting("cache_size_bytes")
+        .ok()
+        .flatten()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(1_073_741_824u64)
+};
+let _eviction_handle = Node::start_eviction_cycle(Arc::clone(&node), 300, cache_max_bytes);
 let _web_handle = if let Some(wp) = web_port {
     Some(node.start_web_handler(wp))
 } else {
diff --git a/crates/core/src/blob.rs b/crates/core/src/blob.rs
index b3f5032..392df2d 100644
--- a/crates/core/src/blob.rs
+++ b/crates/core/src/blob.rs
@@ -1,4 +1,5 @@
 use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, Ordering};
 
 /// A blob identifier — BLAKE3 hash of the blob data
 pub type BlobId = [u8; 32];
@@ -17,6 +18,13 @@ pub fn verify_blob(cid: &BlobId, data: &[u8]) -> bool {
 /// Blobs are stored at `{base_dir}/{hex[0..2]}/{hex}`.
 pub struct BlobStore {
     base_dir: PathBuf,
+    /// CDN delivery budget: bytes remaining we're willing to serve this hour.
+    /// Shared between Node (for resets) and ConnectionManager (for enforcement).
+    delivery_budget_remaining: AtomicU64,
+    /// Hourly delivery limit for resets
+    delivery_limit: AtomicU64,
+    /// Last delivery budget reset timestamp (ms)
+    delivery_last_reset_ms: AtomicU64,
 }
 
 impl BlobStore {
@@ -24,7 +32,53 @@
     pub fn open(data_dir: &Path) -> anyhow::Result<Self> {
         let base_dir = data_dir.join("blobs");
         std::fs::create_dir_all(&base_dir)?;
-        Ok(Self { base_dir })
+        Ok(Self {
+            base_dir,
+            delivery_budget_remaining: AtomicU64::new(u64::MAX), // unlimited until set
+            delivery_limit: AtomicU64::new(u64::MAX),
+            delivery_last_reset_ms: AtomicU64::new(0),
+        })
+    }
+
+    /// Initialize delivery budget from device role (called once by Node after construction).
+    pub fn set_delivery_budget(&self, limit: u64) {
+        let now = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_millis() as u64;
+        self.delivery_limit.store(limit, Ordering::Relaxed);
+        self.delivery_budget_remaining.store(limit, Ordering::Relaxed);
+        self.delivery_last_reset_ms.store(now, Ordering::Relaxed);
+    }
+
+    /// Check and consume delivery budget. Returns true if within budget.
+    /// Auto-resets if an hour has elapsed.
+    pub fn consume_delivery_budget(&self, bytes: u64) -> bool {
+        // Auto-reset check
+        let now = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_millis() as u64;
+        let last = self.delivery_last_reset_ms.load(Ordering::Relaxed);
+        if now.saturating_sub(last) >= 3_600_000 {
+            let limit = self.delivery_limit.load(Ordering::Relaxed);
+            self.delivery_budget_remaining.store(limit, Ordering::Relaxed);
+            self.delivery_last_reset_ms.store(now, Ordering::Relaxed);
+        }
+
+        let prev = self.delivery_budget_remaining.fetch_update(
+            Ordering::Relaxed,
+            Ordering::Relaxed,
+            |current| {
+                if current >= bytes { Some(current - bytes) } else { None }
+            },
+        );
+        prev.is_ok()
+    }
+
+    /// Get remaining delivery budget bytes.
+    pub fn delivery_budget_remaining(&self) -> u64 {
+        self.delivery_budget_remaining.load(Ordering::Relaxed)
     }
 
     fn blob_path(&self, cid: &BlobId) -> PathBuf {
diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs
index 1c2b97a..7b318cd 100644
--- a/crates/core/src/connection.rs
+++ b/crates/core/src/connection.rs
@@ -22,7 +22,8 @@ use crate::protocol::{
     ProfileUpdatePayload, PullSyncRequestPayload, PullSyncResponsePayload, RefuseRedirectPayload,
     RelayIntroducePayload, RelayIntroduceResultPayload, SessionRelayPayload,
     SocialAddressUpdatePayload, SocialCheckinPayload, SocialDisconnectNoticePayload,
-    SyncPost, VisibilityUpdatePayload, WormQueryPayload, WormResponsePayload, ALPN_V2,
+    SyncPost, VisibilityUpdatePayload, WormQueryPayload, WormResponsePayload,
+    ReplicationRequestPayload, ReplicationResponsePayload, ALPN_V2,
 };
 use crate::storage::Storage;
 use crate::types::{
@@ -1202,6 +1203,8 @@ impl ConnectionManager {
     nat_filtering: Some(our_profile.filtering.to_string()),
     http_capable: self.http_capable,
     http_addr: self.http_addr.clone(),
+    device_role: None,
+    cache_pressure: None,
 }
};
@@ -1339,6 +1342,8 @@ impl ConnectionManager {
     nat_filtering: Some(our_profile.filtering.to_string()),
     http_capable: self.http_capable,
     http_addr: self.http_addr.clone(),
+    device_role: None,
+    cache_pressure: None,
 }
};
@@ -1603,11 +1608,11 @@ impl ConnectionManager {
 }
 drop(storage);
 
-// Register as downstream for new posts
+// Register as downstream for new posts (cap at 50 to avoid flooding)
 if !new_post_ids.is_empty() {
     let reg_conn = pull_conn.clone();
     tokio::spawn(async move {
-        for post_id in new_post_ids {
+        for post_id in new_post_ids.into_iter().take(50) {
             let payload = PostDownstreamRegisterPayload { post_id };
             if let Ok(mut send) = reg_conn.open_uni().await {
                 let _ = write_typed_message(&mut send, MessageType::PostDownstreamRegister, &payload).await;
@@ -1683,12 +1688,11 @@ impl ConnectionManager {
     }
 }
 
-// Register as downstream with the sender for new posts
-// so they push engagement diffs (reactions, comments) to us
+// Register as downstream with the sender for new posts (cap at 50 to avoid flooding)
 if !new_post_ids.is_empty() {
     let conn = pc.connection.clone();
     tokio::spawn(async move {
-        for post_id in new_post_ids {
+        for post_id in new_post_ids.into_iter().take(50) {
             let payload = PostDownstreamRegisterPayload { post_id };
             if let Ok(mut send) = conn.open_uni().await {
                 let _ = write_typed_message(&mut send, MessageType::PostDownstreamRegister, &payload).await;
@@ -1761,7 +1765,9 @@ impl ConnectionManager {
     json,
     header.updated_at,
 );
-// Apply individual reactions and comments
+// Apply individual reactions and comments.
+// store_reaction / store_comment are tombstone-aware:
+// they compare timestamps and respect deleted_at fields.
 for reaction in &header.reactions {
     let _ = storage.store_reaction(reaction);
 }
@@ -3737,7 +3743,7 @@ impl ConnectionManager {
 if let Some(session) = cm.sessions.get(&requester) {
     let session_conn = session.connection.clone();
     drop(cm); // release lock before async work
-    match initial_exchange_connect(&storage_clone, &our_node_id, &session_conn, requester, None, our_nat_type, our_http_capable, our_http_addr.clone()).await {
+    match initial_exchange_connect(&storage_clone, &our_node_id, &session_conn, requester, None, our_nat_type, our_http_capable, our_http_addr.clone(), None, None).await {
         Ok(ExchangeResult::Accepted) => {
             tracing::info!(peer = hex::encode(requester), "Target-side: initial exchange after hole punch");
         }
@@ -5061,7 +5067,7 @@ impl ConnectionManager {
 let cm = conn_mgr.lock().await;
 (cm.storage_ref(), *cm.our_node_id(), cm.build_anchor_advertised_addr(), cm.nat_type(), cm.http_capable, cm.http_addr.clone())
};
-initial_exchange_accept(&storage, &our_node_id, send, recv, remote_node_id, anchor_addr, None, our_nat_type, our_http_capable, our_http_addr)
+initial_exchange_accept(&storage, &our_node_id, send, recv, remote_node_id, anchor_addr, None, our_nat_type, our_http_capable, our_http_addr, None, None)
     .await?;
}
MessageType::AddressRequest => {
@@ -5193,6 +5199,23 @@ impl ConnectionManager {
 let data = cm.blob_store.get(&payload.cid)?;
 let response = match data {
     Some(bytes) => {
+        // Check delivery budget before serving
+        if !cm.blob_store.consume_delivery_budget(bytes.len() as u64) {
+            debug!(
+                peer = hex::encode(remote_node_id),
+                cid = hex::encode(payload.cid),
+                blob_size = bytes.len(),
+                "Delivery budget exhausted, declining blob request"
+            );
+            BlobResponsePayload {
+                cid: payload.cid,
+                found: false,
+                data_b64: String::new(),
+                manifest: None,
+                cdn_registered: false,
+                cdn_redirect_peers: vec![],
+            }
+        } else {
         use base64::Engine;
         // Load manifest if available, wrap in CdnManifest
@@ -5252,6 +5275,7 @@
             cdn_registered,
             cdn_redirect_peers,
         }
+        } // end delivery budget else
     }
     None => BlobResponsePayload {
         cid: payload.cid,
@@ -5430,6 +5454,58 @@
     };
     write_typed_message(&mut send, MessageType::BlobHeaderResponse, &response).await?;
}
+MessageType::ReplicationRequest => {
+    let payload: ReplicationRequestPayload = read_payload(&mut recv, MAX_PAYLOAD).await?;
+    let (accepted, rejected, needs_pull) = {
+        let cm = conn_mgr.lock().await;
+        let storage = cm.storage.lock().await;
+        let mut acc = Vec::new();
+        let mut rej = Vec::new();
+        let mut to_pull = Vec::new();
+        // Estimate ~1 MB per post with blobs for budget tracking
+        let est_bytes_per_post: u64 = 1024 * 1024;
+        let mut budget_used: u64 = 0;
+        let budget_cap: u64 = 20 * est_bytes_per_post; // cap per request
+
+        for pid in &payload.post_ids {
+            // Already have it — accept for free
+            if storage.get_post(pid).ok().flatten().is_some() {
+                acc.push(*pid);
+                continue;
+            }
+            // Check budget before accepting a post we need to pull
+            if budget_used + est_bytes_per_post > budget_cap {
+                rej.push(*pid);
+                continue;
+            }
+            budget_used += est_bytes_per_post;
+            acc.push(*pid);
+            to_pull.push(*pid);
+        }
+
+        // Register as downstream for all accepted posts
+        for pid in &acc {
+            let _ = storage.add_post_downstream(pid, &remote_node_id);
+        }
+
+        (acc, rej, to_pull)
+    };
+    let response = ReplicationResponsePayload { accepted: accepted.clone(), rejected };
+    write_typed_message(&mut send, MessageType::ReplicationResponse, &response).await?;
+    send.finish()?;
+    let accepted_count = accepted.len();
+    let needs_pull_count = needs_pull.len();
+    debug!(
+        peer = hex::encode(remote_node_id),
+        accepted = accepted_count,
+        rejected = response.rejected.len(),
+        needs_pull = needs_pull_count,
+        "Handled replication request"
+    );
+    // Posts we accepted but don't have will be fetched on the next pull cycle
+    // from the requester (they have these posts since they asked us to hold them).
+    // No explicit pull spawn needed — the periodic pull cycle handles it.
+}
 other => {
     warn!(msg_type = ?other, "Unexpected message type on bi-stream");
 }
@@ -5575,41 +5651,78 @@
             BlobHeaderDiffOp::Unknown => {} // future ops — silently skip
         }
     }
+
+    // Rebuild blob header JSON from current DB state so pull-based sync gets fresh data.
+    // Use _with_tombstones so tombstones propagate through the pull path.
+    let reactions = storage.get_reactions_with_tombstones(&payload.post_id).unwrap_or_default();
+    let comments = storage.get_comments_with_tombstones(&payload.post_id).unwrap_or_default();
+    let policy = storage.get_comment_policy(&payload.post_id).ok().flatten().unwrap_or_default();
+    let (existing_header_json, _) = storage.get_blob_header(&payload.post_id)
+        .ok()
+        .flatten()
+        .unwrap_or((String::new(), 0));
+    let mut header: crate::types::BlobHeader = serde_json::from_str(&existing_header_json).unwrap_or_else(|_| {
+        crate::types::BlobHeader {
+            post_id: payload.post_id,
+            author: payload.author,
+            reactions: vec![],
+            comments: vec![],
+            policy: crate::types::CommentPolicy::default(),
+            updated_at: 0,
+            thread_splits: vec![],
+            receipt_slots: vec![],
+            comment_slots: vec![],
+        }
+    });
+    header.reactions = reactions;
+    header.comments = comments;
+    header.policy = policy;
+    header.updated_at = payload.timestamp_ms;
+    if let Ok(json) = serde_json::to_string(&header) {
+        let _ = storage.store_blob_header(&payload.post_id, &payload.author, &json, payload.timestamp_ms);
+    }
 }
 
+// Collect all targets (downstream + upstream), then send in a single batched task
+let mut targets: Vec<_> = Vec::new();
 for peer_id in downstream {
-    if peer_id == sender {
-        continue;
-    }
-    // Try mesh connection first, then session
-    let conn = self.connections.get(&peer_id).map(|mc| mc.connection.clone())
-        .or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone()));
-    if let Some(conn) = conn {
-        let payload_clone = payload.clone();
-        tokio::spawn(async move {
-            if let Ok(mut send) = conn.open_uni().await {
-                let _ = write_typed_message(&mut send, MessageType::BlobHeaderDiff, &payload_clone).await;
-                let _ = send.finish();
-            }
-        });
+    if peer_id == sender { continue; }
+    if let Some(conn) = self.connections.get(&peer_id).map(|mc| mc.connection.clone())
+        .or_else(|| self.sessions.get(&peer_id).map(|sc| sc.connection.clone()))
+    {
+        targets.push(conn);
     }
 }
-
-// Also propagate upstream (toward the author)
 if let Some(up) = upstream {
     if up != sender {
-        let conn = self.connections.get(&up).map(|mc| mc.connection.clone())
-            .or_else(|| self.sessions.get(&up).map(|sc| sc.connection.clone()));
-        if let Some(conn) = conn {
-            let payload_clone = payload.clone();
-            tokio::spawn(async move {
+        if let Some(conn) = self.connections.get(&up).map(|mc| mc.connection.clone())
+            .or_else(|| self.sessions.get(&up).map(|sc| sc.connection.clone()))
+        {
+            targets.push(conn);
+        }
+    }
+}
+if !targets.is_empty() {
+    let payload_clone = payload.clone();
+    tokio::spawn(async
move {
+        // Fan out to all targets with at most 10 sends in flight at a time
+        use tokio::task::JoinSet;
+        let mut set = JoinSet::new();
+        for conn in targets {
+            let p = payload_clone.clone();
+            set.spawn(async move {
                 if let Ok(mut send) = conn.open_uni().await {
-                    let _ = write_typed_message(&mut send, MessageType::BlobHeaderDiff, &payload_clone).await;
+                    let _ = write_typed_message(&mut send, MessageType::BlobHeaderDiff, &p).await;
                     let _ = send.finish();
                 }
             });
+            // Cap concurrency at 10
+            if set.len() >= 10 {
+                let _ = set.join_next().await;
+            }
         }
-    }
+        while set.join_next().await.is_some() {}
+    });
 }
}
@@ -5958,6 +6071,8 @@ pub struct ConnHandle {
    http_capable: Arc<AtomicBool>,
    /// External HTTP address if known (set once at startup)
    http_addr: Arc<std::sync::Mutex<Option<String>>>,
+    /// CDN device role (set once at startup by Network)
+    device_role_val: Arc<std::sync::Mutex<Option<crate::types::DeviceRole>>>,
}

impl ConnHandle {
@@ -5967,6 +6082,7 @@
    tx,
    http_capable: Arc::new(AtomicBool::new(false)),
    http_addr: Arc::new(std::sync::Mutex::new(None)),
+    device_role_val: Arc::new(std::sync::Mutex::new(None)),
}
}
@@ -5976,6 +6092,16 @@
    *self.http_addr.lock().unwrap() = addr;
}

+/// Set CDN device role (called once at Network startup).
+pub fn set_device_role(&self, role: crate::types::DeviceRole) {
+    *self.device_role_val.lock().unwrap() = Some(role);
+}
+
+/// Get CDN device role, if set.
+pub fn device_role(&self) -> Option<crate::types::DeviceRole> {
+    *self.device_role_val.lock().unwrap()
+}
+
/// Whether this node is HTTP-capable.
pub fn is_http_capable(&self) -> bool {
    self.http_capable.load(Ordering::Relaxed)
@@ -6944,6 +7070,8 @@ pub async fn initial_exchange_connect(
    our_nat_type: crate::types::NatType,
    our_http_capable: bool,
    our_http_addr: Option<String>,
+    our_device_role: Option<crate::types::DeviceRole>,
+    our_cache_pressure: Option<u8>,
) -> anyhow::Result<ExchangeResult> {
    let our_payload = {
        let storage = storage.lock().await;
@@ -6967,6 +7095,8 @@
        nat_filtering: Some(crate::types::NatProfile::from_nat_type(our_nat_type).filtering.to_string()),
        http_capable: our_http_capable,
        http_addr: our_http_addr,
+        device_role: our_device_role.map(|r| r.as_str().to_string()),
+        cache_pressure: our_cache_pressure,
    }
};
@@ -7011,6 +7141,8 @@ pub async fn initial_exchange_accept(
    our_nat_type: crate::types::NatType,
    our_http_capable: bool,
    our_http_addr: Option<String>,
+    our_device_role: Option<crate::types::DeviceRole>,
+    our_cache_pressure: Option<u8>,
) -> anyhow::Result<()> {
    let their_payload: InitialExchangePayload = read_payload(&mut recv, MAX_PAYLOAD).await?;
@@ -7036,6 +7168,8 @@
        nat_filtering: Some(crate::types::NatProfile::from_nat_type(our_nat_type).filtering.to_string()),
        http_capable: our_http_capable,
        http_addr: our_http_addr,
+        device_role: our_device_role.map(|r| r.as_str().to_string()),
+        cache_pressure: our_cache_pressure,
    }
};
@@ -7137,6 +7271,21 @@ async fn process_exchange_payload(
    debug!(peer = hex::encode(remote_node_id), http_addr = ?payload.http_addr, "Stored peer HTTP capability");
}

+// Store peer's CDN device role and cache pressure
+if payload.device_role.is_some() || payload.cache_pressure.is_some() {
+    let _ = storage.set_peer_device_role(
+        remote_node_id,
+        payload.device_role.as_deref(),
+        payload.cache_pressure,
+    );
+    debug!(
+        peer = hex::encode(remote_node_id),
+        role = ?payload.device_role,
+        pressure = ?payload.cache_pressure,
+        "Stored peer CDN role"
+    );
+}
+
Ok(())
}
diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs
index f6eab8d..16c6e0d 100644
--- a/crates/core/src/network.rs
+++ b/crates/core/src/network.rs
@@ -20,7 +20,7 @@ use crate::protocol::{
};
use crate::storage::Storage;
use crate::types::{
-    DeleteRecord, DeviceProfile, NodeId, PeerSlotKind, PeerWithAddress, Post, PostId,
+    DeleteRecord, DeviceProfile, DeviceRole, NodeId, PeerSlotKind, PeerWithAddress, Post, PostId,
    PostVisibility, PublicProfile, SessionReachMethod, WormResult,
};
@@ -44,6 +44,8 @@ pub struct Network {
    has_public_v6: bool,
    /// Stable bind address (from --bind flag), passed to ConnectionManager for anchor advertised address
    bind_addr: Option,
+    /// CDN replication role: determines budget limits and pull ordering
+    device_role: DeviceRole,
}

fn is_public_ip(ip: IpAddr) -> bool {
@@ -233,6 +235,17 @@ impl Network {
    "Network started (v2)"
);

+// Determine CDN replication role from device characteristics
+let device_role = if is_mobile {
+    DeviceRole::Intermittent
+} else if is_anchor.load(Ordering::Relaxed) {
+    DeviceRole::Persistent
+} else {
+    DeviceRole::Available
+};
+info!(role = %device_role, "CDN replication role determined");
+conn_handle.set_device_role(device_role);
+
Ok(Self {
    endpoint,
    storage,
@@ -246,6 +259,7 @@
    has_upnp_tcp,
    has_public_v6,
    bind_addr,
+    device_role,
})
}
@@ -319,6 +333,11 @@
    self.is_anchor.load(Ordering::Relaxed)
}

+/// Get the CDN replication device role.
+pub fn device_role(&self) -> DeviceRole {
+    self.device_role
+}
+
/// Whether this node can serve HTTP (has TCP reachability).
pub fn is_http_capable(&self) -> bool {
    self.has_upnp_tcp || self.has_public_v6 || self.bind_addr.is_some()
@@ -626,7 +645,7 @@ impl Network {
 let our_nat_type = conn_handle.nat_type().await;
 let our_http_capable = conn_handle.is_http_capable();
 let our_http_addr = conn_handle.http_addr();
-match initial_exchange_accept(storage, &our_node_id, send, recv, remote_node_id, anchor_addr, Some(remote_sock), our_nat_type, our_http_capable, our_http_addr).await {
+match initial_exchange_accept(storage, &our_node_id, send, recv, remote_node_id, anchor_addr, Some(remote_sock), our_nat_type, our_http_capable, our_http_addr, conn_handle.device_role(), None).await {
    Ok(()) => {
        info!(peer = hex::encode(remote_node_id), "Initial exchange complete (upgraded to mesh)");
        conn_handle.log_activity(ActivityLevel::Info, ActivityCategory::Connection, format!("Upgraded {} to mesh", &hex::encode(remote_node_id)[..8]), Some(remote_node_id));
@@ -676,7 +695,7 @@ impl Network {
 let our_nat_type = self.conn_handle.nat_type().await;
 
 // Initial exchange WITHOUT holding conn_mgr lock
-match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, peer_id, anchor_addr, our_nat_type, self.is_http_capable(), self.http_addr()).await? {
+match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, peer_id, anchor_addr, our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await? {
    ExchangeResult::Accepted => {
        // Spawn the per-connection stream loop
        let conn_data = self.conn_handle.get_connection_map().await;
@@ -1318,7 +1337,7 @@ impl Network {
 let anchor_addr = self.conn_handle.build_anchor_advertised_addr().await;
 let our_nat_type = self.conn_handle.nat_type().await;
 
-match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, peer_id, anchor_addr, our_nat_type, self.is_http_capable(), self.http_addr()).await {
+match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, peer_id, anchor_addr, our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await {
    Ok(ExchangeResult::Accepted) => {
        self.conn_handle.register_connection(peer_id, conn.clone(), vec![], PeerSlotKind::Local).await;
        {
@@ -1423,7 +1442,7 @@ impl Network {
 for peer_id in &newly_connected {
    let conn = self.conn_handle.get_connection(peer_id).await;
    if let Some(conn) = conn {
-        match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, *peer_id, anchor_addr.clone(), our_nat_type, self.is_http_capable(), self.http_addr()).await {
+        match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, *peer_id, anchor_addr.clone(), our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await {
            Ok(ExchangeResult::Accepted) => {}
            Ok(ExchangeResult::Refused { redirect }) => {
                debug!(peer = hex::encode(peer_id), "Auto-connect refused, disconnecting");
@@ -1789,6 +1808,31 @@ impl Network {
    Ok(read_payload(&mut recv, 10 * 1024 * 1024).await?)
}

+/// Send a replication request to a peer, asking them to hold specific posts.
+/// Returns the list of post IDs the peer accepted. Times out after 10 seconds.
+pub async fn send_replication_request(
+    &self,
+    peer_id: &NodeId,
+    post_ids: Vec<PostId>,
+    priority: u8,
+) -> anyhow::Result<Vec<PostId>> {
+    use crate::protocol::{ReplicationRequestPayload, ReplicationResponsePayload};
+
+    let payload = ReplicationRequestPayload { post_ids, priority };
+    let response: ReplicationResponsePayload = tokio::time::timeout(
+        std::time::Duration::from_secs(10),
+        self.send_to_peer_bi(
+            peer_id,
+            MessageType::ReplicationRequest,
+            &payload,
+            MessageType::ReplicationResponse,
+        ),
+    )
+    .await
+    .map_err(|_| anyhow::anyhow!("replication request timed out"))??;
+    Ok(response.accepted)
+}
+
/// Fetch a blob from a peer by CID.
/// Returns None if the peer doesn't have it.
/// Returns (data, response) so caller can handle manifest + CDN fields.
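For illustration, a minimal caller sketch of this API (hypothetical, not part of the diff — it assumes a `Network` handle and a pre-computed batch of under-replicated post IDs; the real call site is `Node::run_replication_check` in node.rs below):

// Hypothetical helper: ask one peer to hold a batch of posts.
async fn replicate_batch(network: &Network, peer: NodeId, batch: Vec<PostId>) -> anyhow::Result<()> {
    // Priority 128 = mid urgency; the peer may accept a subset (or none) based on its hourly budget.
    let accepted = network.send_replication_request(&peer, batch, 128).await?;
    tracing::debug!(accepted = accepted.len(), "replication batch result");
    Ok(())
}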
diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs
index 8d3f6a8..1f15bad 100644
--- a/crates/core/src/node.rs
+++ b/crates/core/src/node.rs
@@ -14,8 +14,8 @@ use crate::network::Network;
 use crate::storage::Storage;
 use crate::types::{
     Attachment, AudienceDirection, AudienceRecord, AudienceStatus, Circle, DeleteRecord,
-    DeviceProfile, NodeId, PeerRecord, PeerSlotKind, PeerWithAddress, Post, PostId, PostVisibility,
-    PublicProfile, ReachMethod, RevocationMode, SessionReachMethod, SocialRelation,
+    DeviceProfile, DeviceRole, NodeId, PeerRecord, PeerSlotKind, PeerWithAddress, Post, PostId,
+    PostVisibility, PublicProfile, ReachMethod, RevocationMode, SessionReachMethod, SocialRelation,
     SocialRouteEntry, SocialStatus, VisibilityIntent, VisibilityUpdate, WormResult,
 };
@@ -36,6 +36,12 @@ pub struct Node {
    pub activity_log: Arc>,
    pub last_rebalance_ms: Arc<AtomicU64>,
    pub last_anchor_register_ms: Arc<AtomicU64>,
+    /// CDN replication budget: bytes remaining we're willing to pull and cache this hour
+    replication_budget_remaining: Arc<AtomicU64>,
+    /// CDN delivery budget: bytes remaining we're willing to serve this hour
+    delivery_budget_remaining: Arc<AtomicU64>,
+    /// Last budget reset timestamp (ms)
+    budget_last_reset_ms: Arc<AtomicU64>,
}

impl Node {
@@ -421,6 +427,19 @@ impl Node {
    }
}

+// Initialize CDN replication budgets based on device role
+let role = network.device_role();
+let now = std::time::SystemTime::now()
+    .duration_since(std::time::UNIX_EPOCH)
+    .unwrap_or_default()
+    .as_millis() as u64;
+let replication_budget_remaining = Arc::new(AtomicU64::new(role.replication_limit()));
+let delivery_budget_remaining = Arc::new(AtomicU64::new(role.delivery_limit()));
+let budget_last_reset_ms = Arc::new(AtomicU64::new(now));
+
+// Set delivery budget on blob store (shared with ConnectionManager)
+blob_store.set_delivery_budget(role.delivery_limit());
+
Ok(Self {
    data_dir,
    storage,
@@ -433,6 +452,9 @@
    activity_log,
    last_rebalance_ms,
    last_anchor_register_ms,
+    replication_budget_remaining,
+    delivery_budget_remaining,
+    budget_last_reset_ms,
})
}
@@ -454,6 +476,62 @@
    self.secret_seed
}

+// --- CDN Replication Budget ---
+
+/// Reset budgets if an hour has elapsed since last reset.
+fn maybe_reset_budgets(&self) {
+    let now = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_millis() as u64;
+    let last = self.budget_last_reset_ms.load(AtomicOrdering::Relaxed);
+    if now.saturating_sub(last) >= 3_600_000 {
+        let role = self.network.device_role();
+        self.replication_budget_remaining.store(role.replication_limit(), AtomicOrdering::Relaxed);
+        self.delivery_budget_remaining.store(role.delivery_limit(), AtomicOrdering::Relaxed);
+        self.budget_last_reset_ms.store(now, AtomicOrdering::Relaxed);
+        debug!(role = %role, "CDN budgets reset for new hour");
+    }
+}
+
+/// Try to consume replication budget. Returns true if within budget.
+pub fn consume_replication_budget(&self, bytes: u64) -> bool {
+    self.maybe_reset_budgets();
+    let prev = self.replication_budget_remaining.fetch_update(
+        AtomicOrdering::Relaxed,
+        AtomicOrdering::Relaxed,
+        |current| {
+            if current >= bytes { Some(current - bytes) } else { None }
+        },
+    );
+    prev.is_ok()
+}
+
+/// Try to consume delivery budget. Returns true if within budget.
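+/// Mirrors consume_replication_budget above: the same lock-free check-and-decrement via fetch_update.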
+pub fn consume_delivery_budget(&self, bytes: u64) -> bool {
+    self.maybe_reset_budgets();
+    let prev = self.delivery_budget_remaining.fetch_update(
+        AtomicOrdering::Relaxed,
+        AtomicOrdering::Relaxed,
+        |current| {
+            if current >= bytes { Some(current - bytes) } else { None }
+        },
+    );
+    prev.is_ok()
+}
+
+/// Get remaining replication budget bytes.
+pub fn replication_budget_remaining(&self) -> u64 {
+    self.maybe_reset_budgets();
+    self.replication_budget_remaining.load(AtomicOrdering::Relaxed)
+}
+
+/// Get remaining delivery budget bytes.
+pub fn delivery_budget_remaining(&self) -> u64 {
+    self.maybe_reset_budgets();
+    self.delivery_budget_remaining.load(AtomicOrdering::Relaxed)
+}
+
// ---- Identity export/import ----

pub fn export_identity_hex(&self) -> anyhow::Result<String> {
@@ -1120,21 +1198,27 @@ impl Node {
}

+const MAX_PREFETCH_PER_CYCLE: usize = 20;
+
 /// Prefetch blobs for recently synced posts from a peer.
-/// Queries storage for posts with attachments missing from the local blob store,
-/// then fetches each missing blob. Runs outside any locks.
+/// Scans recent posts for missing blobs, caps at 20 per cycle.
+/// Runs outside any locks.
 pub async fn prefetch_blobs_from_peer(&self, peer_id: &NodeId) {
-    // Gather posts with missing blobs
+    // Gather posts with missing blobs, capped per cycle
    let missing: Vec<(PostId, NodeId, Vec<Attachment>)> = {
        let storage = self.storage.lock().await;
        let post_ids = storage.list_post_ids().unwrap_or_default();
        let mut result = Vec::new();
+        let mut total_missing = 0usize;
+        // Cap the total number of missing blobs gathered per cycle
        for pid in post_ids {
+            if total_missing >= Self::MAX_PREFETCH_PER_CYCLE { break; }
            if let Ok(Some(post)) = storage.get_post(&pid) {
                let missing_atts: Vec<_> = post.attachments.iter()
                    .filter(|a| !self.blob_store.has(&a.cid))
                    .cloned()
                    .collect();
                if !missing_atts.is_empty() {
+                    total_missing += missing_atts.len();
                    result.push((pid, post.author, missing_atts));
                }
            }
@@ -1149,6 +1233,7 @@
 let mut fetched = 0usize;
 for (post_id, author, attachments) in &missing {
    for att in attachments {
+        if fetched >= Self::MAX_PREFETCH_PER_CYCLE { break; }
        match self.fetch_blob_with_fallback(
            &att.cid, post_id, author, &att.mime_type, 0,
        ).await {
@@ -1219,8 +1304,11 @@
    Ok(data)
}

-/// Fetch a blob with CDN-aware cascade:
-/// 1. Local → 2. Existing upstream → 3. Lateral N0-N2 peers → 4. Author → 5. Redirect peers
+/// Fetch a blob with CDN-aware cascade, preferring non-anchor sources to save anchor
+/// delivery budget:
+/// 1. Local → 2. Existing upstream → 3. Lateral peers (non-anchor first)
+///    → 4. Replicas → 5. Author → 6. Redirect peers
+/// Anchors are deprioritized at each step via storage-level ordering.
pub async fn fetch_blob_with_fallback(
    &self,
    cid: &[u8; 32],
@@ -1253,13 +1341,14 @@
}

// 3. Lateral N0-N2: mesh peers + N2 peers who have the author's posts
+// (sorted by get_lateral_blob_sources: non-anchors first)
let lateral_sources = {
    let storage = self.storage.lock().await;
    storage.get_lateral_blob_sources(author, post_id).unwrap_or_default()
};
for lateral in lateral_sources {
    if lateral == *author {
-        continue; // Author is step 4
+        continue; // Author tried separately below
    }
    match self.network.fetch_blob_full(cid, &lateral).await {
        Ok((Some(data), response)) => {
@@ -1282,27 +1371,7 @@
    }
}

-// 4. Try author (last resort for direct)
-match self.fetch_blob_from_peer(cid, author, post_id, author, mime_type, created_at).await {
-    Ok(Some(data)) => return Ok(Some(data)),
-    Ok(None) => {}
-    Err(e) => warn!(error = %e, "blob fetch from author failed"),
-}
-
-// 5. Try redirect peers (from any step that returned cdn_redirect_peers)
-for rp in &redirect_peers {
-    if let Ok(nid_bytes) = hex::decode(&rp.n) {
-        if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
-            match self.fetch_blob_from_peer(cid, &nid, post_id, author, mime_type, created_at).await {
-                Ok(Some(data)) => return Ok(Some(data)),
-                Ok(None) => {}
-                Err(e) => warn!(peer = &rp.n, error = %e, "redirect blob fetch failed"),
-            }
-        }
-    }
-}
-
-// 6. Try replica peers as final fallback (1-hour staleness window)
+// 4. Try replica peers (before author — replicas are often closer/cheaper)
 let replicas = {
    let storage = self.storage.lock().await;
    storage.get_replica_peers(post_id, 3_600_000)?
};
@@ -1315,6 +1384,26 @@
    }
}

+// 5. Try author
+match self.fetch_blob_from_peer(cid, author, post_id, author, mime_type, created_at).await {
+    Ok(Some(data)) => return Ok(Some(data)),
+    Ok(None) => {}
+    Err(e) => warn!(error = %e, "blob fetch from author failed"),
+}
+
+// 6. Try redirect peers (from any step that returned cdn_redirect_peers)
+for rp in &redirect_peers {
+    if let Ok(nid_bytes) = hex::decode(&rp.n) {
+        if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
+            match self.fetch_blob_from_peer(cid, &nid, post_id, author, mime_type, created_at).await {
+                Ok(Some(data)) => return Ok(Some(data)),
+                Ok(None) => {}
+                Err(e) => warn!(peer = &rp.n, error = %e, "redirect blob fetch failed"),
+            }
+        }
+    }
+}
+
Ok(None)
}
@@ -1722,6 +1811,99 @@ impl Node {
    storage.set_setting(key, value)
}

+// ---- Cache stats & pressure ----
+
+/// Get cache statistics: (used_bytes, max_bytes, blob_count).
+/// max_bytes comes from the `cache_size_bytes` setting (default 1 GB, 0 = unlimited).
+pub async fn get_cache_stats(&self) -> anyhow::Result<(u64, u64, u64)> {
+    let storage = self.storage.lock().await;
+    let used = storage.total_blob_bytes()?;
+    let count = storage.count_blobs()?;
+    let max_str = storage.get_setting("cache_size_bytes")?.unwrap_or_default();
+    let max: u64 = max_str.parse().unwrap_or(1_073_741_824);
+    Ok((used, max, count))
+}
+
+/// Compute the cache-pressure score (0-255) advertised to peers. Higher means more
+/// willingness to accept new content: 255 = plenty of room (empty cache, all blobs
+/// elevated, or the lowest-priority blob is >72 h old and cheap to evict).
+/// Scales linearly with the age of the oldest evictable blob: 0 h → 0, 36 h → 128, 72 h → 255.
+pub async fn compute_cache_pressure(&self) -> anyhow::Result<u8> {
+    let now = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)?
+        .as_millis() as u64;
+
+    let staleness_ms = 3600 * 1000;
+
+    let (candidates, follows, audience_members) = {
+        let storage = self.storage.lock().await;
+        let candidates = storage.get_eviction_candidates(staleness_ms)?;
+        let follows = storage.list_follows().unwrap_or_default();
+        let audience = storage.list_audience_members().unwrap_or_default();
+        (candidates, follows, audience)
+    };
+
+    if candidates.is_empty() {
+        return Ok(255); // Empty cache = max willingness to accept
+    }
+
+    // Filter to non-elevated blobs (not pinned, not own content, not followed author)
+    let non_elevated: Vec<_> = candidates.iter().filter(|c| {
+        !c.pinned && c.author != self.node_id && !follows.contains(&c.author)
+    }).collect();
+
+    if non_elevated.is_empty() {
+        return Ok(255); // All blobs are elevated — plenty of room for new content
+    }
+
+    // Find the lowest priority (oldest/least-valuable) blob
+    let mut min_priority = f64::MAX;
+    let mut min_created_at = u64::MAX;
+    for c in &non_elevated {
+        let priority = self.compute_blob_priority(c, &follows, &audience_members, now);
+        if priority < min_priority {
+            min_priority = priority;
+            min_created_at = c.created_at;
+        }
+    }
+
+    // Scale based on age of the oldest non-elevated blob
+    let age_hours = now.saturating_sub(min_created_at) as f64 / (3600.0 * 1000.0);
+    let pressure = if age_hours >= 72.0 {
+        255
+    } else {
+        ((age_hours / 72.0) * 255.0) as u8
+    };
+
+    Ok(pressure)
+}
+
+// ---- Seen engagement tracking ----
+
+/// Get seen engagement counts for a post.
+pub async fn get_seen_engagement(&self, post_id: &PostId) -> anyhow::Result<(u32, u32)> {
+    let storage = self.storage.lock().await;
+    storage.get_seen_engagement(post_id)
+}
+
+/// Mark a post's engagement as seen (upsert).
+pub async fn set_seen_engagement(&self, post_id: &PostId, react_count: u32, comment_count: u32) -> anyhow::Result<()> {
+    let storage = self.storage.lock().await;
+    storage.set_seen_engagement(post_id, react_count, comment_count)
+}
+
+/// Get last-read timestamp for a conversation partner.
+pub async fn get_last_read_message(&self, partner_id: &NodeId) -> anyhow::Result<u64> {
+    let storage = self.storage.lock().await;
+    storage.get_last_read_message(partner_id)
+}
+
+/// Mark a conversation as read up to the given timestamp.
+pub async fn set_last_read_message(&self, partner_id: &NodeId, timestamp_ms: u64) -> anyhow::Result<()> {
+    let storage = self.storage.lock().await;
+    storage.set_last_read_message(partner_id, timestamp_ms)
+}
+
// ---- Delete / Revocation ----

pub async fn delete_post(&self, post_id: &PostId) -> anyhow::Result<()> {
@@ -3295,6 +3477,7 @@ impl Node {
    post_id,
    timestamp_ms: now,
    encrypted_payload,
+    deleted_at: None,
};

// Store locally
@@ -3409,6 +3592,7 @@
    content,
    timestamp_ms: now,
    signature,
+    deleted_at: None,
};

let storage = self.storage.lock().await;
@@ -4001,6 +4185,16 @@ pub fn compute_blob_priority_standalone(
) -> f64 {
    let pin_boost = if candidate.pinned { 1000.0 } else { 0.0 };

+    // Share-link popularity boost: high downstream count indicates the blob
+    // has been shared via share links and is actively being served to others.
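+    // Scaling: 1 downstream ≈ +16.7, 2 ≈ +33.3, 3 or more caps the boost at +100.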
+    let share_boost = if candidate.downstream_count >= 3 {
+        100.0
+    } else if candidate.downstream_count >= 1 {
+        50.0 * candidate.downstream_count as f64 / 3.0
+    } else {
+        0.0
+    };
+
    let relationship = if candidate.author == *our_node_id {
        5.0
    } else if follows.contains(&candidate.author) && audience_members.contains(&candidate.author) {
@@ -4022,7 +4216,147 @@
    let copies_factor = 1.0 / (candidate.peer_copies as f64 + 1.0);

-    pin_boost + (relationship * heart_recency * freshness * copies_factor)
+    pin_boost + share_boost + (relationship * heart_recency * freshness * copies_factor)
}

+// --- Active Replication Cycle ---
+
+impl Node {
+    /// Start the active replication cycle: periodically ask peers to hold our
+    /// under-replicated recent content. All device roles initiate — mobile devices
+    /// especially need their content replicated before they go offline.
+    pub fn start_replication_cycle(self: &Arc<Self>, interval_secs: u64) -> tokio::task::JoinHandle<()> {
+        let node = Arc::clone(self);
+        tokio::spawn(async move {
+            // Wait 2 minutes before first cycle (let connections establish)
+            tokio::time::sleep(std::time::Duration::from_secs(120)).await;
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
+            loop {
+                interval.tick().await;
+                node.run_replication_check().await;
+            }
+        })
+    }
+
+    /// Single replication check iteration.
+    async fn run_replication_check(&self) {
+        // All devices initiate replication — phones need their content replicated
+        // before they go to sleep.
+
+        // 1. Get own posts < 72h old
+        let seventy_two_hours_ms = 72u64 * 3600 * 1000;
+        let now_ms = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_millis() as u64;
+        let since_ms = now_ms.saturating_sub(seventy_two_hours_ms);
+
+        let under_replicated: Vec<PostId> = {
+            let storage = self.storage.lock().await;
+            let recent_ids = match storage.get_own_recent_post_ids(&self.node_id, since_ms) {
+                Ok(ids) => ids,
+                Err(e) => {
+                    debug!(error = %e, "Replication: failed to get own recent posts");
+                    return;
+                }
+            };
+
+            // 2. Filter to under-replicated (< 2 downstream)
+            let mut needs_replication = Vec::new();
+            for pid in &recent_ids {
+                match storage.get_post_downstream_count(pid) {
+                    Ok(count) if count < 2 => {
+                        needs_replication.push(*pid);
+                    }
+                    _ => {}
+                }
+            }
+            needs_replication
+        };
+
+        // 3. If none need replication, skip silently
+        if under_replicated.is_empty() {
+            return;
+        }
+
+        // 4. Find connected peers, scored by role and cache pressure
+        let connected = self.network.connected_peers().await;
+        if connected.is_empty() {
+            debug!("No peers for replication");
+            return;
+        }
+
+        // Priority: Available (desktops) > Persistent (anchors) > Intermittent (phones)
+        let role_priority = |role: &DeviceRole| -> u16 {
+            match role {
+                DeviceRole::Available => 300,    // desktops — best replication targets
+                DeviceRole::Persistent => 200,   // anchors — good but save for web
+                DeviceRole::Intermittent => 100, // phones — last resort but still useful
+            }
+        };
+
+        let suitable_peers: Vec<(NodeId, u16)> = {
+            let storage = self.storage.lock().await;
+            let mut candidates = Vec::new();
+            for peer_id in &connected {
+                if *peer_id == self.node_id { continue; }
+                let role_str = storage.get_peer_device_role(peer_id)
+                    .ok()
+                    .flatten()
+                    .unwrap_or_default();
+                let role = DeviceRole::from_str_label(&role_str);
+                let pressure = storage.get_peer_cache_pressure(peer_id)
+                    .ok()
+                    .flatten()
+                    .unwrap_or(128) as u16;
+                // Combined score: role priority + cache pressure
+                let score = role_priority(&role) + pressure;
+                candidates.push((*peer_id, score));
+            }
+            candidates
+        };
+
+        if suitable_peers.is_empty() {
+            debug!("No peers available for replication");
+            return;
+        }
+
+        // Pick best candidate (highest combined score)
+        let best_peer = suitable_peers
+            .iter()
+            .max_by_key(|(_, score)| *score)
+            .map(|(id, _)| *id)
+            .unwrap();
+
+        // 5. Cap at 20 post IDs per request, one request per cycle
+        let batch: Vec<PostId> = under_replicated.into_iter().take(20).collect();
+        let batch_len = batch.len();
+
+        // 6. Send ReplicationRequest
+        match self.network.send_replication_request(&best_peer, batch, 128).await {
+            Ok(accepted) => {
+                if accepted.is_empty() {
+                    debug!(
+                        peer = hex::encode(best_peer),
+                        "Replication: peer rejected all posts"
+                    );
+                } else {
+                    debug!(
+                        peer = hex::encode(best_peer),
+                        accepted = accepted.len(),
+                        requested = batch_len,
+                        "Replication: peer accepted posts"
+                    );
+                }
+            }
+            Err(e) => {
+                debug!(
+                    peer = hex::encode(best_peer),
+                    error = %e,
+                    "Replication: request failed"
+                );
+            }
+        }
+    }
+}

#[cfg(test)]
mod tests {
@@ -4050,6 +4384,7 @@
    last_accessed_at,
    pinned,
    peer_copies,
+    downstream_count: 0,
}
}
diff --git a/crates/core/src/protocol.rs b/crates/core/src/protocol.rs
index 09e2bba..b264625 100644
--- a/crates/core/src/protocol.rs
+++ b/crates/core/src/protocol.rs
@@ -71,6 +71,8 @@ pub enum MessageType {
    TcpPunchRequest = 0xD6,
    TcpPunchResult = 0xD7,
    MeshKeepalive = 0xE0,
+    ReplicationRequest = 0xE1,
+    ReplicationResponse = 0xE2,
}

impl MessageType {
@@ -126,6 +128,8 @@
    0xD6 => Some(Self::TcpPunchRequest),
    0xD7 => Some(Self::TcpPunchResult),
    0xE0 => Some(Self::MeshKeepalive),
+    0xE1 => Some(Self::ReplicationRequest),
+    0xE2 => Some(Self::ReplicationResponse),
    _ => None,
}
}
@@ -174,6 +178,12 @@ pub struct InitialExchangePayload {
    /// External HTTP address if known (e.g. "1.2.3.4:4433")
"1.2.3.4:4433") #[serde(default)] pub http_addr: Option, + /// CDN replication device role: "intermittent", "available", "persistent" + #[serde(default)] + pub device_role: Option, + /// CDN cache pressure: 0-255 availability score (255 = lots of capacity) + #[serde(default)] + pub cache_pressure: Option, } /// Incremental N1/N2 changes @@ -622,6 +632,26 @@ pub struct PostFetchResponsePayload { pub post: Option, } +// --- Active CDN Replication payloads --- + +/// Request a peer to replicate (cache) specific posts and their blobs (bi-stream) +#[derive(Debug, Serialize, Deserialize)] +pub struct ReplicationRequestPayload { + /// Posts to replicate (with their blobs) + pub post_ids: Vec, + /// 0-255 urgency (higher = more important to cache) + pub priority: u8, +} + +/// Response to a replication request (bi-stream) +#[derive(Debug, Serialize, Deserialize)] +pub struct ReplicationResponsePayload { + /// Posts the peer agreed to hold + pub accepted: Vec, + /// Posts the peer declined (over budget or no space) + pub rejected: Vec, +} + /// Request a TCP hole punch toward a browser IP (bi-stream). /// Sent by the anchor to a node that holds a post, so the node's NAT /// opens a pinhole allowing the browser to connect directly via HTTP. @@ -766,6 +796,11 @@ mod tests { MessageType::PostDownstreamRegister, MessageType::PostFetchRequest, MessageType::PostFetchResponse, + MessageType::TcpPunchRequest, + MessageType::TcpPunchResult, + MessageType::MeshKeepalive, + MessageType::ReplicationRequest, + MessageType::ReplicationResponse, ]; for mt in types { let byte = mt.as_byte(); diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 64c8a45..01e7b9b 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -22,6 +22,8 @@ pub struct EvictionCandidate { pub last_accessed_at: u64, pub pinned: bool, pub peer_copies: u32, + /// Number of downstream CDN peers — proxy for share-link popularity. 
+    pub downstream_count: u32,
}

pub struct Storage {
@@ -338,6 +340,16 @@
        host TEXT NOT NULL,
        last_seen_ms INTEGER NOT NULL,
        PRIMARY KEY (post_id, host)
+    );
+    CREATE TABLE IF NOT EXISTS seen_engagement (
+        post_id BLOB PRIMARY KEY,
+        seen_react_count INTEGER NOT NULL DEFAULT 0,
+        seen_comment_count INTEGER NOT NULL DEFAULT 0,
+        updated_at INTEGER NOT NULL DEFAULT 0
+    );
+    CREATE TABLE IF NOT EXISTS seen_messages (
+        partner_id BLOB PRIMARY KEY,
+        last_read_ms INTEGER NOT NULL DEFAULT 0
    );",
)?;
Ok(())
@@ -563,6 +575,37 @@ impl Storage {
    )?;
}

+// Add deleted_at column to reactions if missing (tombstone support)
+let has_deleted_at_react = self.conn.prepare(
+    "SELECT COUNT(*) FROM pragma_table_info('reactions') WHERE name='deleted_at'"
+)?.query_row([], |row| row.get::<_, i64>(0))?;
+if has_deleted_at_react == 0 {
+    self.conn.execute_batch(
+        "ALTER TABLE reactions ADD COLUMN deleted_at INTEGER DEFAULT NULL;"
+    )?;
+}
+
+// Add deleted_at column to comments if missing (tombstone support)
+let has_deleted_at_comment = self.conn.prepare(
+    "SELECT COUNT(*) FROM pragma_table_info('comments') WHERE name='deleted_at'"
+)?.query_row([], |row| row.get::<_, i64>(0))?;
+if has_deleted_at_comment == 0 {
+    self.conn.execute_batch(
+        "ALTER TABLE comments ADD COLUMN deleted_at INTEGER DEFAULT NULL;"
+    )?;
+}
+
+// Add device_role column to peers if missing (Active CDN replication)
+let has_device_role = self.conn.prepare(
+    "SELECT COUNT(*) FROM pragma_table_info('peers') WHERE name='device_role'"
+)?.query_row([], |row| row.get::<_, i64>(0))?;
+if has_device_role == 0 {
+    self.conn.execute_batch(
+        "ALTER TABLE peers ADD COLUMN device_role TEXT DEFAULT NULL;
+         ALTER TABLE peers ADD COLUMN cache_pressure INTEGER DEFAULT NULL;"
+    )?;
+}
+
Ok(())
}
@@ -661,6 +704,32 @@ impl Storage {
    }
}

+/// Get post IDs authored by `author` with timestamp_ms >= `since_ms`.
+pub fn get_own_recent_post_ids(&self, author: &NodeId, since_ms: u64) -> anyhow::Result<Vec<PostId>> {
+    let mut stmt = self.conn.prepare(
+        "SELECT id FROM posts WHERE author = ?1 AND timestamp_ms >= ?2"
+    )?;
+    let rows = stmt.query_map(params![author.as_slice(), since_ms as i64], |row| {
+        let bytes: Vec<u8> = row.get(0)?;
+        Ok(bytes)
+    })?;
+    let mut ids = Vec::new();
+    for row in rows {
+        ids.push(blob_to_postid(row?)?);
+    }
+    Ok(ids)
+}
+
+/// Get a peer's cache_pressure score (0-255), or None if unknown.
+pub fn get_peer_cache_pressure(&self, node_id: &NodeId) -> anyhow::Result<Option<u8>> {
+    let result: Option<i64> = self.conn.query_row(
+        "SELECT cache_pressure FROM peers WHERE node_id = ?1",
+        params![node_id.as_slice()],
+        |row| row.get(0),
+    ).ok().flatten();
+    Ok(result.map(|p| p as u8))
+}
+
pub fn list_post_ids(&self) -> anyhow::Result<Vec<PostId>> {
    let mut stmt = self.conn.prepare("SELECT id FROM posts")?;
    let rows = stmt.query_map([], |row| {
@@ -1141,6 +1210,69 @@ impl Storage {
    Ok(())
}

+// --- Seen engagement tracking ---
+
+/// Get the seen engagement counts for a post (react_count, comment_count).
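+/// Returns (0, 0) for posts that have never been marked seen.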
+pub fn get_seen_engagement(&self, post_id: &PostId) -> anyhow::Result<(u32, u32)> {
+    let mut stmt = self.conn.prepare(
+        "SELECT seen_react_count, seen_comment_count FROM seen_engagement WHERE post_id = ?1"
+    )?;
+    let result = stmt.query_row(params![post_id.as_slice()], |row| {
+        let rc: i64 = row.get(0)?;
+        let cc: i64 = row.get(1)?;
+        Ok((rc as u32, cc as u32))
+    });
+    match result {
+        Ok(r) => Ok(r),
+        Err(rusqlite::Error::QueryReturnedNoRows) => Ok((0, 0)),
+        Err(e) => Err(e.into()),
+    }
+}
+
+/// Set the seen engagement counts for a post (upsert).
+pub fn set_seen_engagement(&self, post_id: &PostId, react_count: u32, comment_count: u32) -> anyhow::Result<()> {
+    let now_ms = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .map(|d| d.as_millis() as i64)
+        .unwrap_or(0);
+    self.conn.execute(
+        "INSERT INTO seen_engagement (post_id, seen_react_count, seen_comment_count, updated_at)
+         VALUES (?1, ?2, ?3, ?4)
+         ON CONFLICT(post_id) DO UPDATE SET
+             seen_react_count = excluded.seen_react_count,
+             seen_comment_count = excluded.seen_comment_count,
+             updated_at = excluded.updated_at",
+        params![post_id.as_slice(), react_count as i64, comment_count as i64, now_ms],
+    )?;
+    Ok(())
+}
+
+/// Get the last-read timestamp for a conversation partner.
+pub fn get_last_read_message(&self, partner_id: &NodeId) -> anyhow::Result<u64> {
+    let mut stmt = self.conn.prepare(
+        "SELECT last_read_ms FROM seen_messages WHERE partner_id = ?1"
+    )?;
+    let result = stmt.query_row(params![partner_id.as_slice()], |row| {
+        let ts: i64 = row.get(0)?;
+        Ok(ts as u64)
+    });
+    match result {
+        Ok(r) => Ok(r),
+        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
+        Err(e) => Err(e.into()),
+    }
+}
+
+/// Set the last-read timestamp for a conversation partner (upsert).
+pub fn set_last_read_message(&self, partner_id: &NodeId, timestamp_ms: u64) -> anyhow::Result<()> {
+    self.conn.execute(
+        "INSERT INTO seen_messages (partner_id, last_read_ms) VALUES (?1, ?2)
+         ON CONFLICT(partner_id) DO UPDATE SET last_read_ms = excluded.last_read_ms",
+        params![partner_id.as_slice(), timestamp_ms as i64],
+    )?;
+    Ok(())
+}
+
/// Initialize post_hosts table (called by web handler).
pub fn init_post_hosts_table(&self) -> anyhow::Result<()> {
    // Already in init_tables, but safe to call again
@@ -1340,6 +1472,25 @@ impl Storage {
    Ok(result.unwrap_or(0) != 0)
}

+/// Set a peer's CDN device role and cache pressure.
+pub fn set_peer_device_role(&self, node_id: &NodeId, role: Option<&str>, pressure: Option<u8>) -> anyhow::Result<()> {
+    self.conn.execute(
+        "UPDATE peers SET device_role = ?2, cache_pressure = ?3 WHERE node_id = ?1",
+        params![node_id.as_slice(), role, pressure.map(|p| p as i32)],
+    )?;
+    Ok(())
+}
+
+/// Get a peer's CDN device role (from InitialExchange).
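+/// Returns None for peers that have not advertised a role (column defaults to NULL).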
+pub fn get_peer_device_role(&self, node_id: &NodeId) -> anyhow::Result<Option<String>> {
+    let result: Option<String> = self.conn.query_row(
+        "SELECT device_role FROM peers WHERE node_id = ?1",
+        params![node_id.as_slice()],
+        |row| row.get(0),
+    ).ok().flatten();
+    Ok(result)
+}
+
/// Get the display name for a node, or None if no profile exists
pub fn get_display_name(&self, node_id: &NodeId) -> anyhow::Result<Option<String>> {
    let result: Option<String> = self.conn.query_row(
@@ -3390,14 +3541,20 @@ impl Storage {
    let mut stmt = self.conn.prepare(
        "SELECT b.cid, b.post_id, b.author, b.size_bytes, b.created_at,
                b.last_accessed_at, b.pinned,
-               COALESCE(r.copies, 0) as peer_copies
+               COALESCE(r.copies, 0) as peer_copies,
+               COALESCE(d.ds_count, 0) as downstream_count
         FROM blobs b
         LEFT JOIN (
             SELECT post_id, COUNT(*) as copies
             FROM post_replicas
             WHERE last_confirmed_ms >= ?1
             GROUP BY post_id
-        ) r ON b.post_id = r.post_id"
+        ) r ON b.post_id = r.post_id
+        LEFT JOIN (
+            SELECT cid, COUNT(*) as ds_count
+            FROM blob_downstream
+            GROUP BY cid
+        ) d ON b.cid = d.cid"
    )?;
    let rows = stmt.query_map(params![cutoff], |row| {
        let cid_bytes: Vec<u8> = row.get(0)?;
@@ -3408,11 +3565,12 @@
        let last_accessed_at = row.get::<_, i64>(5)? as u64;
        let pinned = row.get::<_, i64>(6)? != 0;
        let peer_copies = row.get::<_, i64>(7)? as u32;
-        Ok((cid_bytes, post_id_bytes, author_bytes, size_bytes, created_at, last_accessed_at, pinned, peer_copies))
+        let downstream_count = row.get::<_, i64>(8)? as u32;
+        Ok((cid_bytes, post_id_bytes, author_bytes, size_bytes, created_at, last_accessed_at, pinned, peer_copies, downstream_count))
    })?;
    let mut result = Vec::new();
    for row in rows {
-        let (cid_bytes, post_id_bytes, author_bytes, size_bytes, created_at, last_accessed_at, pinned, peer_copies) = row?;
+        let (cid_bytes, post_id_bytes, author_bytes, size_bytes, created_at, last_accessed_at, pinned, peer_copies, downstream_count) = row?;
        let cid: [u8; 32] = match cid_bytes.try_into() {
            Ok(c) => c,
            Err(_) => continue,
@@ -3434,11 +3592,22 @@
            last_accessed_at,
            pinned,
            peer_copies,
+            downstream_count,
        });
    }
    Ok(result)
}

+/// Count total number of blobs.
+pub fn count_blobs(&self) -> anyhow::Result<u64> {
+    let count: i64 = self.conn.query_row(
+        "SELECT COUNT(*) FROM blobs",
+        [],
+        |row| row.get(0),
+    )?;
+    Ok(count as u64)
+}
+
/// Clean up all CDN metadata for a blob (manifests + upstream + downstream).
pub fn cleanup_cdn_for_blob(&self, cid: &[u8; 32]) -> anyhow::Result<()> {
    self.conn.execute("DELETE FROM cdn_manifests WHERE cid = ?1", params![cid.as_slice()])?;
@@ -3730,19 +3899,24 @@ impl Storage {
/// Get mesh peers and N2 peers known to have an author's posts (from post_replicas overlap).
/// Used by the lateral fetch cascade step.
+/// Results are sorted: non-anchor peers first (to save anchor delivery budget),
+/// then by specificity (peers with this exact post first) and recency.
pub fn get_lateral_blob_sources(&self, author: &NodeId, post_id: &PostId) -> anyhow::Result<Vec<NodeId>> {
    // Find peers who have replicas of any post by this author, prioritizing those
    // who have this specific post, then any other posts by the same author.
    // Cross-reference with mesh_peers and reachable_n2 for reachability.
+    // Sort: non-anchors first (COALESCE is_anchor default 0), then post match, then recency.
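+    // Peers with no row in `peers` also sort as non-anchors via the LEFT JOIN + COALESCE.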
    let mut stmt = self.conn.prepare(
        "SELECT DISTINCT pr.node_id
         FROM post_replicas pr
         INNER JOIN posts p ON pr.post_id = p.id
+        LEFT JOIN peers pe ON pr.node_id = pe.node_id
         WHERE p.author = ?1
           AND (
               pr.node_id IN (SELECT node_id FROM mesh_peers)
               OR pr.node_id IN (SELECT reachable_node_id FROM reachable_n2)
           )
-        ORDER BY CASE WHEN pr.post_id = ?2 THEN 0 ELSE 1 END,
+        ORDER BY COALESCE(pe.is_anchor, 0) ASC,
+                 CASE WHEN pr.post_id = ?2 THEN 0 ELSE 1 END,
                  pr.last_confirmed_ms DESC
         LIMIT 10"
    )?;
@@ -3839,38 +4013,41 @@ impl Storage {
// --- Engagement: reactions ---

/// Store a reaction (upsert by reactor+post_id+emoji).
+/// Tombstone-aware: incoming reaction wins only if its timestamp is newer.
pub fn store_reaction(&self, reaction: &Reaction) -> anyhow::Result<()> {
    self.conn.execute(
-        "INSERT INTO reactions (reactor, post_id, emoji, timestamp_ms, encrypted_payload)
-         VALUES (?1, ?2, ?3, ?4, ?5)
+        "INSERT INTO reactions (reactor, post_id, emoji, timestamp_ms, encrypted_payload, deleted_at)
+         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
         ON CONFLICT(reactor, post_id, emoji) DO UPDATE SET
-            timestamp_ms = excluded.timestamp_ms,
-            encrypted_payload = excluded.encrypted_payload",
+            timestamp_ms = CASE WHEN excluded.timestamp_ms > timestamp_ms THEN excluded.timestamp_ms ELSE timestamp_ms END,
+            deleted_at = CASE WHEN excluded.timestamp_ms > timestamp_ms THEN excluded.deleted_at ELSE deleted_at END,
+            encrypted_payload = CASE WHEN excluded.timestamp_ms > timestamp_ms THEN excluded.encrypted_payload ELSE encrypted_payload END",
        params![
            reaction.reactor.as_slice(),
            reaction.post_id.as_slice(),
            reaction.emoji,
            reaction.timestamp_ms as i64,
            reaction.encrypted_payload,
+            reaction.deleted_at.map(|v| v as i64),
        ],
    )?;
    Ok(())
}

-/// Remove a reaction.
+/// Tombstone a reaction (soft-delete by setting deleted_at).
pub fn remove_reaction(&self, reactor: &NodeId, post_id: &PostId, emoji: &str) -> anyhow::Result<()> {
    self.conn.execute(
-        "DELETE FROM reactions WHERE reactor = ?1 AND post_id = ?2 AND emoji = ?3",
-        params![reactor.as_slice(), post_id.as_slice(), emoji],
+        "UPDATE reactions SET deleted_at = ?4 WHERE reactor = ?1 AND post_id = ?2 AND emoji = ?3",
+        params![reactor.as_slice(), post_id.as_slice(), emoji, now_ms()],
    )?;
    Ok(())
}

-/// Get all reactions for a post.
+/// Get live (non-tombstoned) reactions for a post. Used for UI display.
pub fn get_reactions(&self, post_id: &PostId) -> anyhow::Result<Vec<Reaction>> {
    let mut stmt = self.conn.prepare(
        "SELECT reactor, post_id, emoji, timestamp_ms, encrypted_payload
-         FROM reactions WHERE post_id = ?1 ORDER BY timestamp_ms ASC"
+         FROM reactions WHERE post_id = ?1 AND deleted_at IS NULL ORDER BY timestamp_ms ASC"
    )?;
    let rows = stmt.query_map(params![post_id.as_slice()], |row| {
        let reactor: Vec<u8> = row.get(0)?;
@@ -3891,17 +4068,51 @@
            post_id,
            timestamp_ms: ts as u64,
            encrypted_payload: enc,
+            deleted_at: None,
        });
    }
    Ok(result)
}

-/// Get reaction counts grouped by emoji for a post.
+/// Get ALL reactions for a post, including tombstoned ones. Used for header rebuild
+/// so tombstones propagate through pull-based sync.
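+/// UI paths should keep using get_reactions above, which filters on deleted_at IS NULL.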
+pub fn get_reactions_with_tombstones(&self, post_id: &PostId) -> anyhow::Result<Vec<Reaction>> {
+    let mut stmt = self.conn.prepare(
+        "SELECT reactor, post_id, emoji, timestamp_ms, encrypted_payload, deleted_at
+         FROM reactions WHERE post_id = ?1 ORDER BY timestamp_ms ASC"
+    )?;
+    let rows = stmt.query_map(params![post_id.as_slice()], |row| {
+        let reactor: Vec<u8> = row.get(0)?;
+        let pid: Vec<u8> = row.get(1)?;
+        let emoji: String = row.get(2)?;
+        let ts: i64 = row.get(3)?;
+        let enc: Option<Vec<u8>> = row.get(4)?;
+        let del: Option<i64> = row.get(5)?;
+        Ok((reactor, pid, emoji, ts, enc, del))
+    })?;
+    let mut result = Vec::new();
+    for row in rows {
+        let (reactor_bytes, pid_bytes, emoji, ts, enc, del) = row?;
+        let reactor = blob_to_nodeid(reactor_bytes)?;
+        let post_id = blob_to_postid(pid_bytes)?;
+        result.push(Reaction {
+            reactor,
+            emoji,
+            post_id,
+            timestamp_ms: ts as u64,
+            encrypted_payload: enc,
+            deleted_at: del.map(|v| v as u64),
+        });
+    }
+    Ok(result)
+}
+
+/// Get reaction counts grouped by emoji for a post (excludes tombstoned reactions).
pub fn get_reaction_counts(&self, post_id: &PostId, my_node_id: &NodeId) -> anyhow::Result> {
    let mut stmt = self.conn.prepare(
        "SELECT emoji, COUNT(*) as cnt,
                SUM(CASE WHEN reactor = ?2 THEN 1 ELSE 0 END) as my_count
-         FROM reactions WHERE post_id = ?1 GROUP BY emoji ORDER BY cnt DESC"
+         FROM reactions WHERE post_id = ?1 AND deleted_at IS NULL GROUP BY emoji ORDER BY cnt DESC"
    )?;
    let rows = stmt.query_map(params![post_id.as_slice(), my_node_id.as_slice()], |row| {
        let emoji: String = row.get(0)?;
@@ -3918,18 +4129,22 @@
// --- Engagement: comments ---

-/// Store a comment.
+/// Store a comment. Tombstone-aware upsert: if the incoming comment carries a
+/// deleted_at tombstone, store it so the tombstone propagates.
pub fn store_comment(&self, comment: &InlineComment) -> anyhow::Result<()> {
    self.conn.execute(
-        "INSERT INTO comments (author, post_id, content, timestamp_ms, signature)
-         VALUES (?1, ?2, ?3, ?4, ?5)
-         ON CONFLICT DO NOTHING",
+        "INSERT INTO comments (author, post_id, content, timestamp_ms, signature, deleted_at)
+         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
+         ON CONFLICT(author, post_id, timestamp_ms) DO UPDATE SET
+             content = CASE WHEN excluded.deleted_at IS NOT NULL THEN content ELSE excluded.content END,
+             deleted_at = CASE WHEN excluded.deleted_at IS NOT NULL THEN excluded.deleted_at ELSE deleted_at END",
        params![
            comment.author.as_slice(),
            comment.post_id.as_slice(),
            comment.content,
            comment.timestamp_ms as i64,
            comment.signature,
+            comment.deleted_at.map(|v| v as i64),
        ],
    )?;
    Ok(())
}
@@ -3944,20 +4159,20 @@
    Ok(updated > 0)
}

-/// Delete a comment (must match author + post_id + timestamp_ms).
+/// Tombstone a comment (soft-delete by setting deleted_at).
pub fn delete_comment(&self, author: &NodeId, post_id: &PostId, timestamp_ms: u64) -> anyhow::Result<bool> {
-    let deleted = self.conn.execute(
-        "DELETE FROM comments WHERE author = ?1 AND post_id = ?2 AND timestamp_ms = ?3",
-        params![author.as_slice(), post_id.as_slice(), timestamp_ms as i64],
+    let updated = self.conn.execute(
+        "UPDATE comments SET deleted_at = ?4 WHERE author = ?1 AND post_id = ?2 AND timestamp_ms = ?3",
+        params![author.as_slice(), post_id.as_slice(), timestamp_ms as i64, now_ms()],
    )?;
-    Ok(deleted > 0)
+    Ok(updated > 0)
}

-/// Get all comments for a post, ordered by timestamp.
+/// Get live (non-tombstoned) comments for a post. Used for UI display.
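+/// The with-tombstones variant below feeds the blob-header rebuild path.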
pub fn get_comments(&self, post_id: &PostId) -> anyhow::Result> { let mut stmt = self.conn.prepare( "SELECT author, post_id, content, timestamp_ms, signature - FROM comments WHERE post_id = ?1 ORDER BY timestamp_ms ASC" + FROM comments WHERE post_id = ?1 AND deleted_at IS NULL ORDER BY timestamp_ms ASC" )?; let rows = stmt.query_map(params![post_id.as_slice()], |row| { let author: Vec = row.get(0)?; @@ -3978,15 +4193,49 @@ impl Storage { content, timestamp_ms: ts as u64, signature: sig, + deleted_at: None, }); } Ok(result) } - /// Get comment count for a post. + /// Get ALL comments for a post, including tombstoned ones. Used for header rebuild + /// so tombstones propagate through pull-based sync. + pub fn get_comments_with_tombstones(&self, post_id: &PostId) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT author, post_id, content, timestamp_ms, signature, deleted_at + FROM comments WHERE post_id = ?1 ORDER BY timestamp_ms ASC" + )?; + let rows = stmt.query_map(params![post_id.as_slice()], |row| { + let author: Vec = row.get(0)?; + let pid: Vec = row.get(1)?; + let content: String = row.get(2)?; + let ts: i64 = row.get(3)?; + let sig: Vec = row.get(4)?; + let del: Option = row.get(5)?; + Ok((author, pid, content, ts, sig, del)) + })?; + let mut result = Vec::new(); + for row in rows { + let (author_bytes, pid_bytes, content, ts, sig, del) = row?; + let author = blob_to_nodeid(author_bytes)?; + let post_id = blob_to_postid(pid_bytes)?; + result.push(InlineComment { + author, + post_id, + content, + timestamp_ms: ts as u64, + signature: sig, + deleted_at: del.map(|v| v as u64), + }); + } + Ok(result) + } + + /// Get comment count for a post (excludes tombstoned comments). pub fn get_comment_count(&self, post_id: &PostId) -> anyhow::Result { let count: i64 = self.conn.prepare( - "SELECT COUNT(*) FROM comments WHERE post_id = ?1" + "SELECT COUNT(*) FROM comments WHERE post_id = ?1 AND deleted_at IS NULL" )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; Ok(count as u64) } @@ -5460,6 +5709,7 @@ mod tests { post_id, timestamp_ms: 1000, encrypted_payload: None, + deleted_at: None, }).unwrap(); s.store_reaction(&Reaction { @@ -5468,6 +5718,7 @@ mod tests { post_id, timestamp_ms: 1001, encrypted_payload: None, + deleted_at: None, }).unwrap(); s.store_reaction(&Reaction { @@ -5476,6 +5727,7 @@ mod tests { post_id, timestamp_ms: 1002, encrypted_payload: None, + deleted_at: None, }).unwrap(); let reactions = s.get_reactions(&post_id).unwrap(); @@ -5508,6 +5760,7 @@ mod tests { content: "Nice post!".to_string(), timestamp_ms: 1000, signature: vec![0u8; 64], + deleted_at: None, }).unwrap(); s.store_comment(&InlineComment { @@ -5516,6 +5769,7 @@ mod tests { content: "I agree".to_string(), timestamp_ms: 1001, signature: vec![1u8; 64], + deleted_at: None, }).unwrap(); let comments = s.get_comments(&post_id).unwrap(); diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index f3c7120..57352b1 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -721,6 +721,9 @@ pub struct Reaction { /// If private: X25519-encrypted payload (only author can decrypt) #[serde(default)] pub encrypted_payload: Option, + /// Tombstone timestamp — if set, this reaction has been soft-deleted + #[serde(default)] + pub deleted_at: Option, } /// An inline comment on a post @@ -736,6 +739,9 @@ pub struct InlineComment { pub timestamp_ms: u64, /// ed25519 signature over BLAKE3(author || post_id || content || timestamp_ms) pub signature: Vec, + /// Tombstone timestamp — if 
set, this comment has been soft-deleted + #[serde(default)] + pub deleted_at: Option, } /// Permission level for comments on a post @@ -919,3 +925,61 @@ pub struct ThreadMeta { /// The original parent post this was split from pub parent_post_id: PostId, } + +// --- Active CDN Replication --- + +/// Device role for CDN replication budgets and pull ordering. +/// Determines how aggressively a device replicates content and how much +/// delivery bandwidth it reserves. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum DeviceRole { + /// Phone/tablet — high consumer, intermittent availability + Intermittent, + /// Desktop/laptop — moderate provider, usually available during sessions + Available, + /// Anchor/server — persistent availability, reserved for web serving + Persistent, +} + +impl DeviceRole { + /// Hourly replication budget in bytes (content we pull to cache for others). + pub fn replication_limit(&self) -> u64 { + match self { + DeviceRole::Intermittent => 100 * 1024 * 1024, // 100 MB/hr — phones rarely get replication requests + DeviceRole::Available => 200 * 1024 * 1024, // 200 MB/hr — desktops are workhorses + DeviceRole::Persistent => 200 * 1024 * 1024, // 200 MB/hr — anchors save capacity for web serving + } + } + + /// Hourly delivery budget in bytes (content we serve when peers ask). + pub fn delivery_limit(&self) -> u64 { + match self { + DeviceRole::Intermittent => 1024 * 1024 * 1024, // 1 GB/hr — phones serve heavily when online (OS data limits apply) + DeviceRole::Available => 2048 * 1024 * 1024, // 2 GB/hr — desktops on cheap home bandwidth + DeviceRole::Persistent => 1024 * 1024 * 1024, // 1 GB/hr — anchors reserved for web/browser serving + } + } + + pub fn as_str(&self) -> &'static str { + match self { + DeviceRole::Intermittent => "intermittent", + DeviceRole::Available => "available", + DeviceRole::Persistent => "persistent", + } + } + + pub fn from_str_label(s: &str) -> Self { + match s { + "intermittent" => DeviceRole::Intermittent, + "available" => DeviceRole::Available, + "persistent" => DeviceRole::Persistent, + _ => DeviceRole::Available, + } + } +} + +impl std::fmt::Display for DeviceRole { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} diff --git a/crates/tauri-app/Cargo.toml b/crates/tauri-app/Cargo.toml index 04885b6..6407378 100644 --- a/crates/tauri-app/Cargo.toml +++ b/crates/tauri-app/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "itsgoin-desktop" -version = "0.3.5" +version = "0.3.6" edition = "2021" [lib] diff --git a/crates/tauri-app/src/lib.rs b/crates/tauri-app/src/lib.rs index bb756c0..9477f72 100644 --- a/crates/tauri-app/src/lib.rs +++ b/crates/tauri-app/src/lib.rs @@ -1328,6 +1328,25 @@ async fn get_public_visible( .map_err(|e| e.to_string()) } +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct CacheStatsDto { + used_bytes: u64, + max_bytes: u64, + blob_count: u64, +} + +#[tauri::command] +async fn get_cache_stats(state: State<'_, AppState>) -> Result { + let node = state.inner(); + let (used, max, count) = node.get_cache_stats().await.map_err(|e| e.to_string())?; + Ok(CacheStatsDto { + used_bytes: used, + max_bytes: max, + blob_count: count, + }) +} + #[tauri::command] async fn get_setting(state: State<'_, AppState>, key: String) -> Result, String> { let node = state.inner(); @@ -1340,6 +1359,56 @@ async fn set_setting(state: State<'_, AppState>, key: String, value: String) -> node.set_setting(&key, &value).await.map_err(|e| 
e.to_string()) } +#[tauri::command] +async fn mark_post_seen( + state: State<'_, AppState>, + post_id: String, + react_count: u32, + comment_count: u32, +) -> Result<(), String> { + let node = state.inner(); + let pid = hex_to_postid(&post_id)?; + node.set_seen_engagement(&pid, react_count, comment_count).await.map_err(|e| e.to_string()) +} + +#[tauri::command] +async fn mark_conversation_read( + state: State<'_, AppState>, + partner_id: String, +) -> Result<(), String> { + let node = state.inner(); + let nid = parse_node_id(&partner_id)?; + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + node.set_last_read_message(&nid, now_ms).await.map_err(|e| e.to_string()) +} + +#[tauri::command] +async fn get_seen_engagement( + state: State<'_, AppState>, + post_id: String, +) -> Result { + let node = state.inner(); + let pid = hex_to_postid(&post_id)?; + let (rc, cc) = node.get_seen_engagement(&pid).await.map_err(|e| e.to_string())?; + Ok(serde_json::json!({ + "seenReactCount": rc, + "seenCommentCount": cc, + })) +} + +#[tauri::command] +async fn get_last_read_message( + state: State<'_, AppState>, + partner_id_hex: String, +) -> Result { + let node = state.inner(); + let nid = parse_node_id(&partner_id_hex)?; + node.get_last_read_message(&nid).await.map_err(|e| e.to_string()) +} + #[tauri::command] async fn generate_share_link(state: State<'_, AppState>, post_id_hex: String) -> Result, String> { let node = state.inner(); @@ -1890,6 +1959,18 @@ pub fn run() { n.start_upnp_tcp_renewal_cycle(); // UPnP TCP lease renewal (for HTTP serving) n.start_http_server(); // HTTP post delivery (if publicly reachable) n.start_bootstrap_connectivity_check(); // 24h isolation check + n.start_replication_cycle(600); // 10 min active replication + + // Start blob eviction cycle (every 5 min) + let cache_max_bytes: u64 = { + let storage = n.storage.lock().await; + storage.get_setting("cache_size_bytes") + .ok() + .flatten() + .and_then(|s| s.parse().ok()) + .unwrap_or(1_073_741_824u64) // default 1 GB + }; + Node::start_eviction_cycle(Arc::clone(&n), 300, cache_max_bytes); Ok::<_, anyhow::Error>(n) })?; @@ -1964,8 +2045,13 @@ pub fn run() { write_message_comment, get_message_receipts, get_message_comments, + get_cache_stats, get_setting, set_setting, + mark_post_seen, + mark_conversation_read, + get_seen_engagement, + get_last_read_message, generate_share_link, ]) .build(tauri::generate_context!()) diff --git a/crates/tauri-app/tauri.conf.json b/crates/tauri-app/tauri.conf.json index bbf78ca..ae6d314 100644 --- a/crates/tauri-app/tauri.conf.json +++ b/crates/tauri-app/tauri.conf.json @@ -1,6 +1,6 @@ { "productName": "itsgoin", - "version": "0.3.5", + "version": "0.3.6", "identifier": "com.itsgoin.app", "build": { "frontendDist": "../../frontend", diff --git a/frontend/app.js b/frontend/app.js index 375b95e..2c1c4e4 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -358,10 +358,6 @@ function toast(msg) { } // --- Notifications (Tauri plugin) --- -let _notifiedMessages = new Set(); -let _notifiedReacts = new Set(); -let _notifiedComments = new Set(); -let _notifiedPosts = new Set(); let _notifReady = false; async function maybeNotify(title, body, tag) { try { @@ -635,40 +631,24 @@ async function loadFeed(force) { const oldFp = _feedFingerprint; _feedFingerprint = fp; - // Notify on new posts and engagement (skip first load) + // Notify on new posts and engagement (DB-backed seen tracking) if (_notifReady && oldFp) { try { - const 
notifPosts = await invoke('get_setting', { key: 'notif_posts' }).catch(() => null) || 'off'; const notifReacts = await invoke('get_setting', { key: 'notif_reacts' }).catch(() => null) || 'on'; for (const p of posts) { - // New post notifications - if (!p.isMe && notifPosts !== 'off' && !_notifiedPosts.has(p.id)) { - _notifiedPosts.add(p.id); - if (_notifiedPosts.size > posts.length) { // skip initial bulk - maybeNotify(`New post from ${p.authorName || p.author.slice(0,8)}`, (p.content || '').slice(0, 80), `post-${p.id}`); - } + if (!p.isMe) continue; + if (notifReacts === 'off') continue; + // Get DB-persisted seen counts + const seen = await invoke('get_seen_engagement', { postId: p.id }).catch(() => ({ seenReactCount: 0, seenCommentCount: 0 })); + const totalReacts = (p.reactionCounts || []).reduce((sum, r) => sum + r.count, 0); + const totalComments = p.commentCount || 0; + if (totalReacts > seen.seenReactCount) { + const newReacts = totalReacts - seen.seenReactCount; + maybeNotify('New reactions on your post', `${newReacts} new reaction${newReacts > 1 ? 's' : ''}`, `react-${p.id}`); } - // Reaction notifications on our posts - if (p.isMe && notifReacts !== 'off' && p.reactionCounts) { - for (const r of p.reactionCounts) { - const key = `${p.id}-${r.emoji}-${r.count}`; - if (!_notifiedReacts.has(key)) { - _notifiedReacts.add(key); - if (_notifiedReacts.size > 1) { - maybeNotify(`${r.emoji} on your post`, `${r.count} ${r.emoji} reactions`, `react-${key}`); - } - } - } - } - // Comment notifications on our posts - if (p.isMe && notifReacts !== 'off' && p.commentCount > 0) { - const key = `${p.id}-comments-${p.commentCount}`; - if (!_notifiedComments.has(key)) { - _notifiedComments.add(key); - if (_notifiedComments.size > 1) { - maybeNotify('New comment on your post', (p.content || '').slice(0, 40), `comment-${p.id}`); - } - } + if (totalComments > seen.seenCommentCount) { + const newComments = totalComments - seen.seenCommentCount; + maybeNotify('New comment on your post', (p.content || '').slice(0, 40), `comment-${p.id}`); } } } catch (_) {} @@ -730,6 +710,14 @@ async function loadMyPosts(force) { } } } + // Mark all visible own posts' engagement as seen (DB-backed) + for (const p of mine) { + const totalReacts = (p.reactionCounts || []).reduce((sum, r) => sum + r.count, 0); + const totalComments = p.commentCount || 0; + if (totalReacts > 0 || totalComments > 0) { + invoke('mark_post_seen', { postId: p.id, reactCount: totalReacts, commentCount: totalComments }).catch(() => {}); + } + } } catch (e) { myPostsList.innerHTML = `

Error: ${e}

`; } @@ -802,19 +790,18 @@ async function loadMessages(force) { if (!force && fp === _messagesFingerprint) return; _messagesFingerprint = fp; - // Notify on new incoming messages + // Notify on new incoming messages (DB-backed seen tracking) try { const notifMsg = await invoke('get_setting', { key: 'notif_messages' }).catch(() => null) || 'on'; - if (notifMsg !== 'off') { + if (_notifReady && notifMsg !== 'off') { for (const [partnerId, thread] of sortedThreads) { + const lastReadMs = await invoke('get_last_read_message', { partnerIdHex: partnerId }).catch(() => 0); for (const p of thread.posts) { - if (p.isMe || _notifiedMessages.has(p.id)) continue; - _notifiedMessages.add(p.id); - if (_notifiedMessages.size > 1) { // skip first load - const name = thread.partnerName || partnerId.slice(0, 8); - const body = notifMsg === 'preview' ? (p.decryptedContent || '').slice(0, 100) : 'New message'; - maybeNotify(`Message from ${name}`, body, `msg-${p.id}`); - } + if (p.isMe) continue; + if (p.timestampMs <= lastReadMs) continue; + const name = thread.partnerName || partnerId.slice(0, 8); + const body = notifMsg === 'preview' ? (p.decryptedContent || '').slice(0, 100) : 'New message'; + maybeNotify(`Message from ${name}`, body, `msg-${p.id}`); } } } @@ -897,6 +884,9 @@ async function loadMessages(force) { const input = $('#popover-reply-input'); if (input) setTimeout(() => input.focus(), 100); + // Mark conversation as read (DB-backed) + invoke('mark_conversation_read', { partnerId }).catch(() => {}); + // Mark incoming encrypted messages as "seen" for (const p of thread.posts) { if (!p.isMe && threadPostIds.includes(p.id)) { @@ -2364,7 +2354,7 @@ async function doSyncAll() { if (currentTab === 'myposts') loadMyPosts(true); if (currentTab === 'people') { loadFollows(); } if (currentTab === 'messages') loadMessages(true); - if (currentTab === 'settings') { loadRedundancy(); loadPublicVisible(); if (diagnosticsInterval) loadAllDiagnostics(); } + if (currentTab === 'settings') { loadRedundancy(); loadPublicVisible(); loadCacheStats(); if (diagnosticsInterval) loadAllDiagnostics(); } loadStats(); } catch (e) { toast('Sync error: ' + e); @@ -2422,6 +2412,53 @@ async function loadPublicVisible() { } } +// --- Cache storage settings --- +function formatBytes(bytes) { + if (bytes === 0) return '0 B'; + const units = ['B', 'KB', 'MB', 'GB', 'TB']; + const i = Math.floor(Math.log(bytes) / Math.log(1024)); + return (bytes / Math.pow(1024, i)).toFixed(i > 0 ? 1 : 0) + ' ' + units[i]; +} + +async function loadCacheStats() { + try { + const stats = await invoke('get_cache_stats'); + const display = $('#cache-stats-display'); + const maxLabel = stats.maxBytes === 0 ? 'Unlimited' : formatBytes(stats.maxBytes); + const pct = stats.maxBytes > 0 ? 
` (${(stats.usedBytes / stats.maxBytes * 100).toFixed(1)}%)` : ''; + display.textContent = `${formatBytes(stats.usedBytes)} used of ${maxLabel}${pct} — ${stats.blobCount} blobs`; + } catch (e) { + console.error('loadCacheStats:', e); + } +} + +async function loadCacheSizeSetting() { + try { + const val = await invoke('get_setting', { key: 'cache_size_bytes' }); + if (val) { + const sel = $('#cache-size-select'); + // Match the option value + for (const opt of sel.options) { + if (opt.value === val) { sel.value = val; break; } + } + } + await loadCacheStats(); + } catch (e) { + console.error('loadCacheSizeSetting:', e); + } +} + +$('#cache-size-select').addEventListener('change', async () => { + const value = $('#cache-size-select').value; + try { + await invoke('set_setting', { key: 'cache_size_bytes', value }); + toast('Cache size updated — takes effect on next eviction cycle'); + await loadCacheStats(); + } catch (e) { + toast('Error saving cache size: ' + e); + } +}); + // --- Circle profiles --- async function loadCircleProfiles() { const container = $('#circle-profiles-list'); @@ -2659,7 +2696,7 @@ document.querySelectorAll('.tab').forEach(tab => { if (!conversationsList.children.length) conversationsList.innerHTML = renderLoading(); loadMessages(true); loadDmRecipientOptions(); } - if (target === 'settings') { loadRedundancy(); loadPublicVisible(); } + if (target === 'settings') { loadRedundancy(); loadPublicVisible(); loadCacheSizeSetting(); } }); }); }); diff --git a/frontend/index.html b/frontend/index.html index c29569e..babd113 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -214,6 +214,21 @@ +
+

Cache Storage

+
Loading...
+ + +
+

Redundancy

diff --git a/website/design.html b/website/design.html index 4af2104..aa728af 100644 --- a/website/design.html +++ b/website/design.html @@ -44,7 +44,8 @@

This is the canonical technical reference for ItsGoin. It describes the vision, the architecture, and the current state of every subsystem — with full implementation detail. This document is versioned; each update records what changed.

Changelog -

v0.3.5 (2026-03-20): Private blob encryption — attachments on encrypted posts (Friends/Circle/Direct) now encrypted with same CEK as post text; public blobs unchanged; CID on ciphertext. Blob prefetch on sync — attachments eagerly fetched after post pull for offline availability. Crypto refactoring — extracted reusable primitives (encrypt/decrypt_bytes_with_cek, unwrap_cek_for_recipient, unwrap_group_cek). Intent-based post filtering — feed/myposts/messages filter on intentKind instead of encryption state. Blob decryption API (get_blob_for_post). Download filename sanitization. Encrypted receipt & comment slots — private posts carry noise-prefilled encrypted slots in BlobHeader for delivery/read/react receipts and private comments; CDN-propagated as opaque bytes; slot key derived from post CEK; 3 new BlobHeaderDiffOps (WriteReceiptSlot, WriteCommentSlot, AddCommentSlots). Message UI — DM delivery indicators (checkmark/double/blue/emoji), auto-seen on view, react button on messages.

+

v0.3.6 (2026-03-20): Active CDN replication — all devices proactively replicate recent posts to peers (desktops > anchors > phones priority). ReplicationRequest/Response (0xE1/0xE2). Device roles (Intermittent/Available/Persistent) advertised in InitialExchange. Bandwidth budgets: replication (pull to cache) + delivery (serve requests), hourly auto-reset. Cache management: 1GB default, configurable, eviction cycle activated with share-link priority boost. Engagement distribution fix — BlobHeader JSON rebuilt after diff ops. Tombstone system — deleted reactions/comments tombstoned, propagate via pull sync. Persistent notifications via seen_engagement/seen_messages tables. DOS hardening: fan-out cap (10), prefetch cap (20), downstream registration cap (50), delivery budget enforcement. Pull preference reordered: non-anchors first.
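As a rough illustration of the eviction ordering (a minimal sketch in Rust, not the shipped Node::start_eviction_cycle code; the CachedBlob struct, its field names, and the recency weighting are hypothetical, while the +100 boost for posts with 3 or more downstream registrations comes from the release notes below):

/// Hypothetical sketch (not the shipped eviction code): rank cached blobs,
/// keeping recently used ones and anything boosted by an active share link,
/// then evict from the bottom until the cache fits under max_bytes.
struct CachedBlob {
    size_bytes: u64,
    last_access_ms: u64,
    downstream_count: u32, // peers registered downstream for this post
}

fn keep_priority(blob: &CachedBlob, now_ms: u64) -> i64 {
    let hours_idle = now_ms.saturating_sub(blob.last_access_ms) / 3_600_000;
    let mut score = -(hours_idle as i64); // staler blobs score lower
    if blob.downstream_count >= 3 {
        score += 100; // share-link priority boost from the notes
    }
    score
}

fn plan_eviction(mut blobs: Vec<CachedBlob>, max_bytes: u64, now_ms: u64) -> Vec<CachedBlob> {
    blobs.sort_by_key(|b| keep_priority(b, now_ms)); // lowest priority first
    let mut used: u64 = blobs.iter().map(|b| b.size_bytes).sum();
    let mut to_evict = Vec::new();
    for blob in blobs {
        if used <= max_bytes {
            break;
        }
        used -= blob.size_bytes;
        to_evict.push(blob);
    }
    to_evict
}

The point is only the ordering: recently served blobs and blobs backing active share links are evicted last.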

+

v0.3.5 (2026-03-20): Private blob encryption — attachments on encrypted posts (Friends/Circle/Direct) now encrypted with same CEK as post text; public blobs unchanged; CID on ciphertext. Blob prefetch on sync — attachments eagerly fetched after post pull for offline availability. Crypto refactoring — extracted reusable primitives (encrypt/decrypt_bytes_with_cek, unwrap_cek_for_recipient, unwrap_group_cek). Intent-based post filtering — feed/myposts/messages filter on intentKind instead of encryption state. Blob decryption API (get_blob_for_post). Download filename sanitization. Encrypted receipt & comment slots — private posts carry noise-prefilled encrypted slots in BlobHeader for delivery/read/react receipts and private comments; CDN-propagated as opaque bytes; slot key derived from post CEK; 3 new BlobHeaderDiffOps (WriteReceiptSlot, WriteCommentSlot, AddCommentSlots). Message UI — DM delivery indicators (checkmark/double/blue/emoji), auto-seen on view, react button on messages.
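To make the "CID on ciphertext" point concrete, a minimal sketch of the idea (not the actual encrypt_bytes_with_cek implementation; the XChaCha20-Poly1305 choice, the explicit nonce argument, and the function signature are assumptions for illustration). The property that matters is that the content ID is BLAKE3 over the ciphertext, so peers can address, deduplicate, and verify the blob without holding the CEK:

// Sketch only; assumes the blake3, chacha20poly1305, and anyhow crates.
use chacha20poly1305::{aead::{Aead, KeyInit}, Key, XChaCha20Poly1305, XNonce};

/// Encrypt an attachment with the post's content-encryption key, then derive
/// the blob CID from the ciphertext so content addressing still works on the
/// encrypted bytes. Nonce handling here is simplified for illustration.
fn encrypt_attachment(
    cek: &[u8; 32],
    nonce24: &[u8; 24],
    plaintext: &[u8],
) -> anyhow::Result<([u8; 32], Vec<u8>)> {
    let cipher = XChaCha20Poly1305::new(Key::from_slice(cek));
    let ciphertext = cipher
        .encrypt(XNonce::from_slice(nonce24), plaintext)
        .map_err(|e| anyhow::anyhow!("attachment encryption failed: {e}"))?;
    let cid: [u8; 32] = *blake3::hash(&ciphertext).as_bytes();
    Ok((cid, ciphertext))
}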

v0.3.4 (2026-03-18): Comment edit & delete with trust-based propagation. Native notifications via Tauri plugin (messages, posts, reactions, comments). Forward-compatible BlobHeaderDiffOp::Unknown variant. Following Online/Offline lightbox. Comment threading scoping fix. Dropdown text legibility fix. Mobile hamburger nav for website.

v0.3.3 (2026-03-16): Connection rate limiting — incoming auth failures rate-limited per source IP (3 attempts, exponential backoff to ~256s). Schema versioning — PRAGMA user_version tracks DB version with migration framework. N2/N3 freshness — TTL 7d→5h, full N1/N2 re-broadcast every 4h, startup sweep clears stale entries. Bootstrap isolation recovery — 24h check verifies bootstrap is in N1/N2/N3, reconnects + sticky N1 advertisement if absent. IPv6 HTTP address fix — nodes advertise actual public IPv6 (not 0.0.0.0) for share link redirects. Upstream tracking — post_upstream table records post source for engagement diff routing toward author. Video preload fix — share links and in-app videos use preload=auto. Following Online/Offline split. DM filter from My Posts. Any-type file attachments with download prompt + trust warning. Image lightbox. Audio player.

v0.3.2 (2026-03-14): Bidirectional engagement propagation — BlobHeaderDiff flows upstream + downstream through CDN tree. Auto downstream registration on pull sync/push notification. TCP hole punch protocol (TcpPunchRequest/Result 0xD6/0xD7). Tiered web serving (redirect → TCP punch → QUIC proxy). Video playback fix (asset protocol + blob URL fallback). On-demand blob fetch for synced posts missing blob data.

diff --git a/website/download.html b/website/download.html index ef75f7b..74f9a16 100644 --- a/website/download.html +++ b/website/download.html @@ -25,16 +25,16 @@

Download ItsGoin

Available for Android and Linux. Free and open source.

- Version 0.3.5 — March 15, 2026
+ Version 0.3.6 — March 20, 2026
@@ -46,7 +46,7 @@

Android

  1. Download the APK — Tap the button above. Your browser may warn that this type of file can be harmful — tap Download anyway.
- 2. Open the file — When the download finishes, tap the notification or find itsgoin-0.3.5.apk in your Downloads folder and tap it.
+ 2. Open the file — When the download finishes, tap the notification or find itsgoin-0.3.6.apk in your Downloads folder and tap it.
  3. Allow installation — Android will ask you to allow installs from this source. Tap Settings, toggle "Allow from this source", then go back and tap Install.
  4. Launch the app — Once installed, tap Open or find ItsGoin in your app drawer.
@@ -59,8 +59,8 @@

Linux (AppImage)

  1. Download the AppImage — Click the button above to download.
- 2. Make it executable — Open a terminal and run:
-    chmod +x itsgoin_0.3.5_amd64.AppImage
- 3. Run it — Double-click the file, or from the terminal:
-    ./itsgoin_0.3.5_amd64.AppImage
+ 2. Make it executable — Open a terminal and run:
+    chmod +x itsgoin_0.3.6_amd64.AppImage
+ 3. Run it — Double-click the file, or from the terminal:
+    ./itsgoin_0.3.6_amd64.AppImage
Note: If it doesn't launch, you may need to install FUSE:
sudo apt install libfuse2 (Debian/Ubuntu) or sudo dnf install fuse (Fedora). @@ -71,16 +71,26 @@

Changelog

v0.3.6 — March 20, 2026
  • Active CDN replication — All devices proactively request replication of their recent posts (<72h) to connected peers. Targets prioritized: desktops > anchors > phones. Degrades gracefully on small networks (1 peer = 1 replica). ReplicationRequest/Response (0xE1/0xE2) wire messages.
  • Device roles — Nodes classified as Intermittent (phones), Available (desktops), or Persistent (anchors). Advertised in InitialExchange. Influences replication target selection and budget defaults.
  • Bandwidth budgets — Hourly replication budget (content pulled to cache) and delivery budget (content served). Phones: 100MB/1GB, Desktops: 200MB/2GB, Anchors: 200MB/1GB. Auto-reset hourly. Blob serving is declined when the delivery budget is exhausted.
  • Cache management — 1GB default cache limit (configurable 256MB–unlimited). Eviction cycle now active (was implemented but never started). Priority scoring with share-link boost (+100 for 3+ downstream). Cache pressure score (0–255) for future budget advertisement (a rough sketch of this mapping follows the list).
  • Engagement distribution fix — BlobHeader JSON is now rebuilt after processing BlobHeaderDiff ops. Previously reactions/comments were stored in tables but the header JSON stayed stale, breaking pull-based sync for downstream peers.
  • Tombstone system — Deleted reactions/comments are tombstoned (deleted_at timestamp) instead of hard-deleted. Tombstones propagate through pull sync, ensuring deletes reach peers that missed the real-time diff.
  • Persistent notifications — Notification tracking backed by seen_engagement and seen_messages tables. Only notifies on genuinely unseen content. Survives app restarts.
  • DOS hardening — BlobHeaderDiff fan-out capped at 10 concurrent sends. Blob prefetch capped at 20 per cycle. PostDownstreamRegister capped at 50 per sync. Delivery budget enforcement on blob serving.
  • Pull preference — Blob fetches prefer non-anchor sources (phones > desktops > replicas > anchors) to preserve anchor delivery budget for web requests.
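The 0–255 cache pressure score mentioned under Cache management is not given a formula in these notes; the mapping below is a sketch of one plausible choice (the linear scaling is an assumption), shown only to pin down what a byte-sized pressure value could mean:

/// Sketch only: map cache fullness to a 0-255 pressure score. A limit of 0
/// is treated as "unlimited" (as in the cache-size setting), so it reports
/// no pressure. The exact formula used by the app is not specified here.
fn cache_pressure(used_bytes: u64, max_bytes: u64) -> u8 {
    if max_bytes == 0 {
        return 0;
    }
    (used_bytes.saturating_mul(255) / max_bytes).min(255) as u8
}

// e.g. 512 MiB used of a 1 GiB limit -> pressure 127 (about half full).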
v0.3.5 — March 20, 2026
  • Private blob encryption — Attachments on encrypted posts (Friends, Circle, Direct) are now encrypted with the same CEK as the post text. Public blobs remain plaintext. CID computed on ciphertext preserves content addressing.
  • Blob prefetch on sync — When posts are pulled from peers, their attachments are eagerly fetched for offline availability. Previously blobs were only fetched on view.
  • Crypto refactoring — Extracted reusable primitives: encrypt_bytes_with_cek, decrypt_bytes_with_cek, unwrap_cek_for_recipient, unwrap_group_cek. Foundation for encrypted blob storage and future chunk-level encryption.
- • Intent-based post filtering — Feed, My Posts, and Messages now filter on the author's original visibility intent (intentKind) rather than encryption state. Direct messages are identified by intent, not by being “encrypted-for-me.” Backward-compatible with pre-intent posts.
- • Blob decryption on retrieval — New get_blob_for_post API decrypts private blobs in context of their post’s visibility. Public blobs pass through unchanged.
- • Encrypted receipt slots — Private messages get encrypted receipt and comment slots in their BlobHeader. Pre-filled with random noise so slot writes are indistinguishable from creation. Receipt states: delivered, seen, reacted. Only participants with the CEK can read slots; relay nodes propagate opaque bytes.
- • Message receipts & reactions — DM conversations show delivery indicators (checkmark → double checkmark → emoji). Opening a conversation marks messages as seen. React to messages with emoji.
- • Private comment slots — Encrypted comment capacity in private post headers (ceil(participants/3) slots, expandable). Participants can write short comments that propagate via CDN without revealing content to relays.
+ • Intent-based post filtering — Feed, My Posts, and Messages now filter on the author's original visibility intent (intentKind) rather than encryption state.
+ • Encrypted receipt slots — Private messages get encrypted receipt and comment slots in BlobHeader. Delivery indicators, read receipts, and message reactions.
  • Download filename sanitization — Prevents path traversal in downloaded file names.