From bbaacf9b6cc7e3b8d1263454367abde101bb2039 Mon Sep 17 00:00:00 2001 From: Scott Reimers Date: Sat, 21 Mar 2026 16:13:45 -0400 Subject: [PATCH] =?UTF-8?q?v0.4.0:=20Protocol=20v4=20=E2=80=94=20header-dr?= =?UTF-8?q?iven=20sync,=20tiered=20engagement,=20multi-upstream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Protocol v4 sync overhaul: - Slim PullSyncRequest: per-author timestamps (since_ms) replace full post ID lists Request size O(follows) instead of O(posts). Backward-compatible via serde default. - Tiered pull frequency: 60s ticks, only syncs stale authors (4hr default) Full pull only on first tick (bootstrap). Most ticks skip — no stale authors. - Tiered engagement checks: frequency scales with content age 5min (<72h), 1hr (3-14d), 4hr (14-30d), 24hr (>30d) Single SQL query filters posts due for check. - Header-driven post discovery: ManifestPush triggers PostFetch for missing followed-author posts (capped 10 per manifest). CDN tree = notification system. - Multi-upstream (3 max): composite PK, priority ordering, engagement diffs sent to all upstreams, promote/remove on failure. 
DB schema: - follows.last_sync_ms — Self Last Encounter per author - posts.last_engagement_ms — last reaction/comment timestamp - posts.last_check_ms — last engagement check timestamp - post_upstream: single-row → 3-row with priority column Lock contention fixes: - get_blob_for_post: 3 locks → 1 - prefetch_blobs_from_peer: lock-free blob checks - fetch_engagement_from_peer: explicit lock release before I/O - serve_post: 4 locks → 2 (eliminated redundant queries) - run_replication_check: 2 locks → 1 - Badge cycle: N+2 IPC calls → 1 (get_badge_counts) Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 2 +- crates/core/src/connection.rs | 210 ++++++++++++++++++++++++++---- crates/core/src/network.rs | 18 ++- crates/core/src/node.rs | 102 +++++++++------ crates/core/src/protocol.rs | 6 +- crates/core/src/storage.rs | 216 ++++++++++++++++++++++++++++--- crates/tauri-app/Cargo.toml | 2 +- crates/tauri-app/tauri.conf.json | 2 +- website/design.html | 4 +- website/download.html | 27 ++-- 10 files changed, 489 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3c6706..e5d45d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2746,7 +2746,7 @@ dependencies = [ [[package]] name = "itsgoin-desktop" -version = "0.3.6" +version = "0.4.0" dependencies = [ "anyhow", "base64 0.22.1", diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 74ed049..158d64d 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -1569,15 +1569,19 @@ impl ConnectionManager { }, }; - let (our_follows, our_post_ids) = { + let (our_follows, follows_sync) = { let storage = self.storage.lock().await; - (storage.list_follows()?, storage.list_post_ids()?) 
+ ( + storage.list_follows()?, + storage.get_follows_with_last_sync().unwrap_or_default(), + ) }; let (mut send, mut recv) = pull_conn.open_bi().await?; let request = PullSyncRequestPayload { follows: our_follows, - have_post_ids: our_post_ids, + have_post_ids: vec![], // v4: empty, using since_ms instead + since_ms: follows_sync, }; write_typed_message(&mut send, MessageType::PullSyncRequest, &request).await?; send.finish()?; @@ -1586,19 +1590,30 @@ impl ConnectionManager { let response: PullSyncResponsePayload = read_payload(&mut recv, MAX_PAYLOAD).await?; + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; let mut stored = false; let mut new_post_ids: Vec = Vec::new(); let storage = self.storage.lock().await; + let mut synced_authors: HashSet = HashSet::new(); for sp in response.posts { if verify_post_id(&sp.id, &sp.post) && !storage.is_deleted(&sp.id)? { let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility); - let _ = storage.set_post_upstream(&sp.id, from); + let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(&sp.id, from, prio); new_post_ids.push(sp.id); + synced_authors.insert(sp.post.author); if sp.id == notification.post_id { stored = true; } } } + // Protocol v4: update last_sync_ms for authors whose posts were received + for author in &synced_authors { + let _ = storage.update_follow_last_sync(author, now_ms); + } for vu in response.visibility_updates { if let Some(post) = storage.get_post(&vu.post_id)? { if post.author == vu.author { @@ -1632,14 +1647,18 @@ impl ConnectionManager { .get(peer_id) .ok_or_else(|| anyhow::anyhow!("not connected to {}", hex::encode(peer_id)))?; - let (our_follows, our_post_ids) = { + let (our_follows, follows_sync) = { let storage = self.storage.lock().await; - (storage.list_follows()?, storage.list_post_ids()?) 
+ ( + storage.list_follows()?, + storage.get_follows_with_last_sync().unwrap_or_default(), + ) }; let request = PullSyncRequestPayload { follows: our_follows, - have_post_ids: our_post_ids, + have_post_ids: vec![], // v4: empty, using since_ms instead + since_ms: follows_sync, }; let (mut send, mut recv) = pc.connection.open_bi().await?; @@ -1655,24 +1674,37 @@ impl ConnectionManager { let mut posts_received = 0; let mut vis_updates = 0; let mut new_post_ids: Vec = Vec::new(); + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; { let storage = self.storage.lock().await; + // Track which authors had posts received for last_sync_ms update + let mut synced_authors: HashSet = HashSet::new(); - for sp in response.posts { + for sp in &response.posts { if storage.is_deleted(&sp.id)? { continue; } if verify_post_id(&sp.id, &sp.post) { if storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility)? { // Record who we got this post from (upstream for engagement propagation) - let _ = storage.set_post_upstream(&sp.id, peer_id); + let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(&sp.id, peer_id, prio); new_post_ids.push(sp.id); posts_received += 1; } + synced_authors.insert(sp.post.author); } } + // Protocol v4: update last_sync_ms for authors whose posts were received + for author in &synced_authors { + let _ = storage.update_follow_last_sync(author, now_ms); + } + for vu in response.visibility_updates { if vu.author != *peer_id { // Only accept visibility updates authored by the responding peer @@ -1708,19 +1740,19 @@ impl ConnectionManager { }) } - /// Fetch engagement headers (reactions, comments, policies) for our posts from a peer. - /// Requests BlobHeader for each post we hold, applies newer data. + /// Fetch engagement headers (reactions, comments, policies) for posts due for check from a peer. 
+ /// Uses tiered check rates: active posts checked more often, cold posts less frequently. pub async fn fetch_engagement_from_peer(&self, peer_id: &NodeId) -> anyhow::Result { let pc = self .connections .get(peer_id) .ok_or_else(|| anyhow::anyhow!("not connected to {}", hex::encode(peer_id)))?; - // Brief lock: gather post IDs and their current header timestamps + // Brief lock: gather only posts DUE for engagement check (tiered frequency) let post_headers: Vec<([u8; 32], u64)> = { let storage = self.storage.lock().await; - let post_ids = storage.list_post_ids()?; - post_ids + let due_ids = storage.get_posts_due_for_engagement_check()?; + due_ids .into_iter() .map(|pid| { let ts = storage @@ -1735,6 +1767,11 @@ impl ConnectionManager { }; // Lock RELEASED — all network I/O happens without the lock + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + let mut updated = 0; // Request headers in batches to avoid opening too many streams for chunk in post_headers.chunks(20) { @@ -1755,11 +1792,14 @@ impl ConnectionManager { let response: BlobHeaderResponsePayload = read_payload(&mut recv, MAX_PAYLOAD).await?; + // Brief re-lock for writes + let storage = self.storage.lock().await; + // Always update last_check_ms regardless of whether engagement changed + let _ = storage.update_post_last_check(post_id, now_ms); + if response.updated { if let Some(json) = &response.header_json { if let Ok(header) = serde_json::from_str::(json) { - // Brief re-lock for writes only - let storage = self.storage.lock().await; // Store the full header JSON let _ = storage.store_blob_header( &header.post_id, @@ -1777,11 +1817,13 @@ impl ConnectionManager { let _ = storage.store_comment(comment); } let _ = storage.set_comment_policy(&header.post_id, &header.policy); - drop(storage); + // Update last_engagement_ms when new engagement arrives + let _ = storage.update_post_last_engagement(post_id, now_ms); updated += 
1; } } } + drop(storage); Ok(()) } .await; @@ -1806,6 +1848,10 @@ impl ConnectionManager { let their_follows: HashSet = request.follows.into_iter().collect(); let their_post_ids: HashSet<[u8; 32]> = request.have_post_ids.into_iter().collect(); + // Protocol v4: build per-author since_ms lookup + let since_ms_map: HashMap = request.since_ms.into_iter().collect(); + let use_since_ms = !since_ms_map.is_empty(); + let (posts, vis_updates) = { let storage = self.storage.lock().await; let all_posts = storage.list_posts_with_visibility()?; @@ -1818,7 +1864,24 @@ impl ConnectionManager { let should_send = crate::network::should_send_post(&post, &visibility, &remote_node_id, &their_follows, &group_members); - if should_send && !their_post_ids.contains(&id) { + if !should_send { + continue; + } + + // Determine if peer already has this post + let peer_has_post = if use_since_ms { + // v4 path: filter by per-author timestamp (60s fudge for clock skew) + if let Some(&since) = since_ms_map.get(&post.author) { + post.timestamp_ms <= since + 60_000 + } else { + false // no since_ms for this author — they want everything + } + } else { + // Legacy path: use have_post_ids + their_post_ids.contains(&id) + }; + + if !peer_has_post { if !storage.is_deleted(&id)? 
{ posts_to_send.push(SyncPost { id, @@ -1826,7 +1889,7 @@ impl ConnectionManager { visibility, }); } - } else if should_send && their_post_ids.contains(&id) { + } else { // They already have the post — send visibility update if we authored it if post.author == self.our_node_id { vis_updates_to_send.push(crate::types::VisibilityUpdate { @@ -4714,7 +4777,8 @@ impl ConnectionManager { &push.post.post, &push.post.visibility, ); - let _ = storage.set_post_upstream(&push.post.id, &remote_node_id); + let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio); info!( peer = hex::encode(remote_node_id), post_id = hex::encode(push.post.id), @@ -4823,6 +4887,43 @@ impl ConnectionManager { } } let stored = stored_entries.len(); + + // Phase 5: Gather post IDs from manifests for discovery + let our_follows: std::collections::HashSet = + storage.list_follows().unwrap_or_default().into_iter().collect(); + let mut discovery_posts: Vec<(PostId, NodeId)> = Vec::new(); + for entry in &stored_entries { + let am = &entry.manifest.author_manifest; + let author = am.author; + // Collect post IDs from the manifest's neighborhood + let mut candidate_ids: Vec = Vec::new(); + candidate_ids.push(am.post_id); + for me in &am.previous_posts { + candidate_ids.push(me.post_id); + } + for me in &am.following_posts { + candidate_ids.push(me.post_id); + } + for pid in candidate_ids { + // Only discover posts from authors we follow + if !our_follows.contains(&author) { + continue; + } + // Only discover posts we don't have locally + if storage.get_post(&pid).ok().flatten().is_some() { + continue; + } + discovery_posts.push((pid, author)); + // Cap at 10 posts per manifest push to avoid storms + if discovery_posts.len() >= 10 { + break; + } + } + if discovery_posts.len() >= 10 { + break; + } + } + drop(storage); // Relay to downstream (best-effort via mesh connections) for (ds_nid, relay_payload) 
in &relay_targets { @@ -4833,6 +4934,52 @@ impl ConnectionManager { } } } + + // Phase 5: Spawn post discovery task (non-blocking) + if !discovery_posts.is_empty() { + let cm_arc = conn_mgr.clone(); + let sender_id = remote_node_id; + tokio::spawn(async move { + let cm = cm_arc.lock().await; + let mut fetched = 0usize; + for (post_id, _author) in &discovery_posts { + if fetched >= 10 { break; } + match cm.send_post_fetch(&sender_id, post_id).await { + Ok(Some(sync_post)) => { + if crate::content::verify_post_id(&sync_post.id, &sync_post.post) { + let storage = cm.storage.lock().await; + if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) { + // Set upstream + register as downstream + let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0); + let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio); + // Update last_sync_ms for the author + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + let _ = storage.update_follow_last_sync(&sync_post.post.author, now); + drop(storage); + // Register as downstream with the sender + if let Some(pc) = cm.connections_ref().get(&sender_id) { + let reg = crate::protocol::PostDownstreamRegisterPayload { post_id: sync_post.id }; + if let Ok(mut send) = pc.connection.open_uni().await { + let _ = write_typed_message(&mut send, MessageType::PostDownstreamRegister, ®).await; + let _ = send.finish(); + } + } + fetched += 1; + } + } + } + _ => {} + } + } + if fetched > 0 { + debug!(discovered = fetched, "ManifestPush post discovery"); + } + }); + } + drop(cm); debug!(peer = hex::encode(remote_node_id), stored, relayed = relay_targets.len(), "Received manifest push"); } @@ -5522,7 +5669,7 @@ impl ConnectionManager { use crate::types::BlobHeaderDiffOp; // Gather policy + audience data, then drop lock immediately - let (policy, approved_audience, downstream, upstream) 
= { + let (policy, approved_audience, downstream, upstreams) = { let storage = self.storage.lock().await; let policy = storage.get_comment_policy(&payload.post_id) .ok() @@ -5533,8 +5680,12 @@ impl ConnectionManager { Some(crate::types::AudienceStatus::Approved), ).unwrap_or_default(); let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default(); - let upstream = storage.get_post_upstream(&payload.post_id).ok().flatten(); - (policy, approved, downstream, upstream) + let upstreams: Vec = storage.get_post_upstreams(&payload.post_id) + .unwrap_or_default() + .into_iter() + .map(|(nid, _)| nid) + .collect(); + (policy, approved, downstream, upstreams) }; // Filter ops using gathered data (no lock held) @@ -5684,9 +5835,11 @@ impl ConnectionManager { if let Ok(json) = serde_json::to_string(&header) { let _ = storage.store_blob_header(&payload.post_id, &payload.author, &json, payload.timestamp_ms); } + // Phase 4: Update last_engagement_ms when engagement arrives via diff + let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms); } - // Collect all targets (downstream + upstream), then send in a single batched task + // Collect all targets (downstream + all upstreams), then send in a single batched task let mut targets: Vec = Vec::new(); for peer_id in downstream { if peer_id == sender { continue; } @@ -5696,10 +5849,11 @@ impl ConnectionManager { targets.push(conn); } } - if let Some(up) = upstream { - if up != sender { - if let Some(conn) = self.connections.get(&up).map(|mc| mc.connection.clone()) - .or_else(|| self.sessions.get(&up).map(|sc| sc.connection.clone())) + // Phase 6: Try all upstreams, not just one + for up in &upstreams { + if *up != sender { + if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone()) + .or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone())) { targets.push(conn); } diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index bf93e87..729906a 
100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -1740,9 +1740,12 @@ impl Network { /// Pull posts from a peer (persistent if available, ephemeral otherwise). pub async fn pull_from_peer(&self, peer_id: &NodeId) -> anyhow::Result { let conn = self.get_connection(peer_id).await?; - let (our_follows, our_post_ids) = { + let (our_follows, follows_sync) = { let storage = self.storage.lock().await; - (storage.list_follows()?, storage.list_post_ids()?) + ( + storage.list_follows()?, + storage.get_follows_with_last_sync().unwrap_or_default(), + ) }; let (mut send, mut recv) = conn.open_bi().await?; write_typed_message( @@ -1750,7 +1753,8 @@ impl Network { MessageType::PullSyncRequest, &PullSyncRequestPayload { follows: our_follows, - have_post_ids: our_post_ids, + have_post_ids: vec![], // v4: empty, using since_ms instead + since_ms: follows_sync, }, ) .await?; @@ -1760,14 +1764,20 @@ impl Network { anyhow::bail!("expected PullSyncResponse, got {:?}", msg_type); } let response: PullSyncResponsePayload = read_payload(&mut recv, 64 * 1024 * 1024).await?; + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; let storage = self.storage.lock().await; let mut posts_received = 0; let mut vis_updates = 0; - for sp in response.posts { + for sp in &response.posts { if !storage.is_deleted(&sp.id)? && verify_post_id(&sp.id, &sp.post) { if storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility)? 
{ posts_received += 1; } + // Protocol v4: update last_sync_ms for the author + let _ = storage.update_follow_last_sync(&sp.post.author, now_ms); } } for vu in response.visibility_updates { diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index cc26842..1950ede 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -2501,31 +2501,53 @@ impl Node { tokio::spawn(async move { network.run_accept_loop().await }) } - /// Start pull cycle: every interval_secs, pull from connected peers + prefetch blobs. - pub fn start_pull_cycle(self: &Arc, interval_secs: u64) -> tokio::task::JoinHandle<()> { + /// Start pull cycle: Protocol v4 tiered pull — 60s ticks, full pull on first tick, + /// then only pull for stale authors (last_sync_ms > 4 hours old). + pub fn start_pull_cycle(self: &Arc, _interval_secs: u64) -> tokio::task::JoinHandle<()> { let node = Arc::clone(self); tokio::spawn(async move { let mut interval = - tokio::time::interval(std::time::Duration::from_secs(interval_secs)); + tokio::time::interval(std::time::Duration::from_secs(60)); + let mut is_first_tick = true; loop { interval.tick().await; - match node.network.pull_from_all().await { - Ok(stats) => { - if stats.posts_received > 0 { - tracing::debug!( - posts = stats.posts_received, - peers = stats.peers_pulled, - "Pull cycle complete" - ); - // Prefetch blobs for newly received posts - let peers = node.network.conn_handle().connected_peers().await; - for peer_id in peers { - node.prefetch_blobs_from_peer(&peer_id).await; + + if is_first_tick { + // Full pull on startup + let _ = node.network.pull_from_all().await; + is_first_tick = false; + // Prefetch after initial sync + let peers = node.network.conn_handle().connected_peers().await; + for peer_id in peers { + node.prefetch_blobs_from_peer(&peer_id).await; + } + continue; + } + + // Tiered: only pull for stale authors (4-hour default) + let stale_authors = { + let storage = node.storage.lock().await; + storage.get_stale_follows(4 * 
3600 * 1000).unwrap_or_default() + }; + + if stale_authors.is_empty() { + continue; // Most ticks skip — no stale authors + } + + // Find a connected peer and pull + let peers = node.network.conn_handle().connected_peers().await; + if let Some(peer_id) = peers.first() { + match node.network.conn_handle().pull_from_peer(peer_id).await { + Ok(stats) => { + if stats.posts_received > 0 { + tracing::debug!( + posts = stats.posts_received, + "Tiered pull complete" + ); + node.prefetch_blobs_from_peer(peer_id).await; } } - } - Err(e) => { - tracing::debug!(error = %e, "Pull cycle failed"); + Err(e) => tracing::debug!(error = %e, "Tiered pull failed"), } } } @@ -3495,12 +3517,12 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Also send upstream (toward author) - let upstream = { + // Also send to all upstreams (toward author) — Phase 6 multi-upstream + let upstreams = { let storage = self.storage.lock().await; - storage.get_post_upstream(&post_id).ok().flatten() + storage.get_post_upstreams(&post_id).unwrap_or_default() }; - if let Some(up) = upstream { + for (up, _prio) in upstreams { let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } } @@ -3609,12 +3631,12 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - // Also send upstream (toward author) - let upstream = { + // Also send to all upstreams (toward author) — Phase 6 multi-upstream + let upstreams = { let storage = self.storage.lock().await; - storage.get_post_upstream(&post_id).ok().flatten() + storage.get_post_upstreams(&post_id).unwrap_or_default() }; - if let Some(up) = upstream { + for (up, _prio) in upstreams { let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } } @@ -3653,11 +3675,12 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - 
let upstream = { + // Phase 6: send to all upstreams + let upstreams = { let storage = self.storage.lock().await; - storage.get_post_upstream(&post_id).ok().flatten() + storage.get_post_upstreams(&post_id).unwrap_or_default() }; - if let Some(up) = upstream { + for (up, _prio) in upstreams { let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } } @@ -3693,11 +3716,12 @@ impl Node { timestamp_ms: now, }; network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await; - let upstream = { + // Phase 6: send to all upstreams + let upstreams = { let storage = self.storage.lock().await; - storage.get_post_upstream(&post_id).ok().flatten() + storage.get_post_upstreams(&post_id).unwrap_or_default() }; - if let Some(up) = upstream { + for (up, _prio) in upstreams { let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } } @@ -3919,11 +3943,12 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; - let upstream = { + // Phase 6: send to all upstreams + let upstreams = { let storage = self.storage.lock().await; - storage.get_post_upstream(&post_id).ok().flatten() + storage.get_post_upstreams(&post_id).unwrap_or_default() }; - if let Some(up) = upstream { + for (up, _prio) in upstreams { let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } @@ -4038,11 +4063,12 @@ impl Node { timestamp_ms: now, }; self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await; - let upstream = { + // Phase 6: send to all upstreams + let upstreams = { let storage = self.storage.lock().await; - storage.get_post_upstream(&post_id).ok().flatten() + storage.get_post_upstreams(&post_id).unwrap_or_default() }; - if let Some(up) = upstream { + for (up, _prio) in upstreams { let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await; } diff 
--git a/crates/core/src/protocol.rs b/crates/core/src/protocol.rs index b264625..e5abb47 100644 --- a/crates/core/src/protocol.rs +++ b/crates/core/src/protocol.rs @@ -201,8 +201,12 @@ pub struct NodeListUpdatePayload { pub struct PullSyncRequestPayload { /// Our follows (for the responder to filter) pub follows: Vec, - /// Post IDs we already have + /// Post IDs we already have (backward compat — empty for v4 senders) + #[serde(default)] pub have_post_ids: Vec, + /// Protocol v4: per-author timestamps (Vec of tuples for serde compat) + #[serde(default)] + pub since_ms: Vec<(NodeId, u64)>, } /// Pull-based post sync response diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index 01e7b9b..7f3e958 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -606,6 +606,47 @@ impl Storage { )?; } + // Protocol v4: Add last_sync_ms to follows if missing + let has_last_sync = self.conn.prepare( + "SELECT COUNT(*) FROM pragma_table_info('follows') WHERE name='last_sync_ms'" + )?.query_row([], |row| row.get::<_, i64>(0))?; + if has_last_sync == 0 { + self.conn.execute_batch( + "ALTER TABLE follows ADD COLUMN last_sync_ms INTEGER NOT NULL DEFAULT 0;" + )?; + } + + // Protocol v4: Add last_engagement_ms and last_check_ms to posts if missing + let has_last_engagement = self.conn.prepare( + "SELECT COUNT(*) FROM pragma_table_info('posts') WHERE name='last_engagement_ms'" + )?.query_row([], |row| row.get::<_, i64>(0))?; + if has_last_engagement == 0 { + self.conn.execute_batch( + "ALTER TABLE posts ADD COLUMN last_engagement_ms INTEGER NOT NULL DEFAULT 0; + ALTER TABLE posts ADD COLUMN last_check_ms INTEGER NOT NULL DEFAULT 0;" + )?; + } + + // Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max) + let has_priority = self.conn.prepare( + "SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'" + )?.query_row([], |row| row.get::<_, i64>(0))?; + if has_priority == 0 { + self.conn.execute_batch( + "ALTER 
TABLE post_upstream RENAME TO post_upstream_old; + CREATE TABLE post_upstream ( + post_id BLOB NOT NULL, + peer_node_id BLOB NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + registered_at INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (post_id, peer_node_id) + ); + INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at) + SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old; + DROP TABLE post_upstream_old;" + )?; + } + Ok(()) } @@ -880,6 +921,104 @@ impl Storage { Ok(ids) } + // ---- Protocol v4: Per-Author Sync Tracking ---- + + /// Update the last_sync_ms timestamp for a followed author. + pub fn update_follow_last_sync(&self, node_id: &NodeId, timestamp_ms: u64) -> anyhow::Result<()> { + self.conn.execute( + "UPDATE follows SET last_sync_ms = ?2 WHERE node_id = ?1", + params![node_id.as_slice(), timestamp_ms as i64], + )?; + Ok(()) + } + + /// Get all follows with their last_sync_ms timestamps. + pub fn get_follows_with_last_sync(&self) -> anyhow::Result> { + let mut stmt = self.conn.prepare("SELECT node_id, last_sync_ms FROM follows")?; + let rows = stmt.query_map([], |row| { + let bytes: Vec = row.get(0)?; + let ts: i64 = row.get(1)?; + Ok((bytes, ts)) + })?; + let mut result = Vec::new(); + for row in rows { + let (bytes, ts) = row?; + result.push((blob_to_nodeid(bytes)?, ts as u64)); + } + Ok(result) + } + + /// Get follows whose last_sync_ms is older than max_age_ms from now. 
+ pub fn get_stale_follows(&self, max_age_ms: u64) -> anyhow::Result> { + let now = now_ms() as u64; + let cutoff = now.saturating_sub(max_age_ms) as i64; + let mut stmt = self.conn.prepare( + "SELECT node_id FROM follows WHERE last_sync_ms < ?1" + )?; + let rows = stmt.query_map(params![cutoff], |row| { + let bytes: Vec = row.get(0)?; + Ok(bytes) + })?; + let mut ids = Vec::new(); + for row in rows { + ids.push(blob_to_nodeid(row?)?); + } + Ok(ids) + } + + /// Get posts due for engagement check using tiered frequency: + /// - Active (engagement within 72h): check every 5 min + /// - Recent (engagement within 14d): check every 1 hour + /// - Aging (engagement within 30d): check every 4 hours + /// - Cold (older): check every 24 hours + pub fn get_posts_due_for_engagement_check(&self) -> anyhow::Result> { + let now = now_ms() as u64; + let h72 = now.saturating_sub(72 * 3600 * 1000) as i64; + let d14 = now.saturating_sub(14 * 24 * 3600 * 1000) as i64; + let d30 = now.saturating_sub(30 * 24 * 3600 * 1000) as i64; + let now_i64 = now as i64; + let mut stmt = self.conn.prepare( + "SELECT id FROM posts WHERE last_check_ms < ?1 - CASE + WHEN last_engagement_ms > ?2 THEN 300000 + WHEN last_engagement_ms > ?3 THEN 3600000 + WHEN last_engagement_ms > ?4 THEN 14400000 + ELSE 86400000 + END" + )?; + let rows = stmt.query_map(params![now_i64, h72, d14, d30], |row| { + let bytes: Vec = row.get(0)?; + Ok(bytes) + })?; + let mut ids = Vec::new(); + for row in rows { + let bytes = row?; + if bytes.len() == 32 { + let mut id = [0u8; 32]; + id.copy_from_slice(&bytes); + ids.push(id); + } + } + Ok(ids) + } + + /// Update the last_check_ms timestamp for a post. + pub fn update_post_last_check(&self, post_id: &PostId, timestamp_ms: u64) -> anyhow::Result<()> { + self.conn.execute( + "UPDATE posts SET last_check_ms = ?2 WHERE id = ?1", + params![post_id.as_slice(), timestamp_ms as i64], + )?; + Ok(()) + } + + /// Update the last_engagement_ms timestamp for a post. 
+ pub fn update_post_last_engagement(&self, post_id: &PostId, timestamp_ms: u64) -> anyhow::Result<()> { + self.conn.execute( + "UPDATE posts SET last_engagement_ms = ?2 WHERE id = ?1", + params![post_id.as_slice(), timestamp_ms as i64], + )?; + Ok(()) + } + // ---- Peers ---- /// Add or update a peer (backward-compat: no addresses) @@ -3976,30 +4115,75 @@ impl Storage { Ok(()) } - // --- Engagement: post_upstream --- + // --- Engagement: post_upstream (multi-upstream, 3 max) --- - /// Set the upstream peer for a post (who we got it from). - pub fn set_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { + /// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post. + pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> { + // Check current count + let count: i64 = self.conn.prepare( + "SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1" + )?.query_row(params![post_id.as_slice()], |row| row.get(0))?; + if count >= 3 { + return Ok(()); // Already at cap + } + let now = now_ms(); self.conn.execute( - "INSERT INTO post_upstream (post_id, peer_node_id) VALUES (?1, ?2) - ON CONFLICT(post_id) DO UPDATE SET peer_node_id = excluded.peer_node_id", + "INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at) + VALUES (?1, ?2, ?3, ?4)", + params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now], + )?; + Ok(()) + } + + /// Get all upstream peers for a post, ordered by priority ASC (0 = primary). 
+ pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC" + )?; + let rows = stmt.query_map(params![post_id.as_slice()], |row| { + let bytes: Vec = row.get(0)?; + let prio: i64 = row.get(1)?; + Ok((bytes, prio as u8)) + })?; + let mut result = Vec::new(); + for row in rows { + let (bytes, prio) = row?; + if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) { + result.push((nid, prio)); + } + } + Ok(result) + } + + /// Get the primary (lowest priority) upstream peer for a post. + /// Backward-compatible wrapper for code that only needs a single upstream. + pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result> { + let upstreams = self.get_post_upstreams(post_id)?; + Ok(upstreams.into_iter().next().map(|(nid, _)| nid)) + } + + /// Remove a specific upstream peer for a post. + pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2", params![post_id.as_slice(), peer_node_id.as_slice()], )?; Ok(()) } - /// Get the upstream peer for a post. - pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result> { - let result = self.conn.query_row( - "SELECT peer_node_id FROM post_upstream WHERE post_id = ?1", + /// Promote an upstream peer to primary (priority 0), pushing others up. 
+ pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> { + // Shift all priorities up by 1 + self.conn.execute( + "UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1", params![post_id.as_slice()], - |row| row.get::<_, Vec>(0), - ); - match result { - Ok(bytes) => Ok(bytes.try_into().ok()), - Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), - Err(e) => Err(e.into()), - } + )?; + // Set the promoted peer to priority 0 + self.conn.execute( + "UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2", + params![post_id.as_slice(), peer_node_id.as_slice()], + )?; + Ok(()) } /// Count downstream peers for a post. diff --git a/crates/tauri-app/Cargo.toml b/crates/tauri-app/Cargo.toml index 6407378..aa52e2e 100644 --- a/crates/tauri-app/Cargo.toml +++ b/crates/tauri-app/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "itsgoin-desktop" -version = "0.3.6" +version = "0.4.0" edition = "2021" [lib] diff --git a/crates/tauri-app/tauri.conf.json b/crates/tauri-app/tauri.conf.json index ae6d314..4ffe45a 100644 --- a/crates/tauri-app/tauri.conf.json +++ b/crates/tauri-app/tauri.conf.json @@ -1,6 +1,6 @@ { "productName": "itsgoin", - "version": "0.3.6", + "version": "0.4.0", "identifier": "com.itsgoin.app", "build": { "frontendDist": "../../frontend", diff --git a/website/design.html b/website/design.html index 671de73..5a86b40 100644 --- a/website/design.html +++ b/website/design.html @@ -44,7 +44,7 @@

This is the canonical technical reference for ItsGoin. It describes the vision, the architecture, and the current state of every subsystem — with full implementation detail. This document is versioned; each update records what changed.

Changelog -

v0.4.0 (planned): Protocol v4 — header-driven sync. ManifestPush as primary post notification. Slim PullSyncRequest (per-author timestamps, not full post ID list). Tiered engagement checks (5min/1hr/4hr/24hr by content age). Multi-upstream (3 max) with fallback chain. Auto-prefetch followed authors <90d. Self Last Encounter per-author tracking. Encrypted-but-not-for-us CDN caching. Serial engagement polling. ~90% bandwidth reduction for established nodes.

+

v0.4.0 (2026-03-21): Protocol v4 — header-driven sync. ManifestPush as primary post notification. Slim PullSyncRequest (per-author timestamps, not full post ID list). Tiered engagement checks (5min/1hr/4hr/24hr by content age). Multi-upstream (3 max) with fallback chain. Auto-prefetch followed authors <90d. Self Last Encounter per-author tracking. Encrypted-but-not-for-us CDN caching. Serial engagement polling. ~90% bandwidth reduction for established nodes.

v0.3.6 (2026-03-20): Active CDN replication — all devices proactively replicate recent posts to peers (desktops > anchors > phones priority). ReplicationRequest/Response (0xE1/0xE2). Device roles (Intermittent/Available/Persistent) advertised in InitialExchange. Bandwidth budgets: replication (pull to cache) + delivery (serve requests), hourly auto-reset, phones 100MB/1GB, desktops 200MB/2GB, anchors 200MB/1GB. Cache management: 1GB default, configurable, eviction cycle activated with share-link priority boost. Engagement distribution fix — BlobHeader JSON rebuilt after diff ops. Tombstone system — deleted reactions/comments tombstoned, propagate via pull sync. Persistent notifications via seen_engagement/seen_messages tables. DOS hardening: fan-out cap (10), prefetch cap (20), downstream registration cap (50), delivery budget enforcement. Pull preference reordered: non-anchors first. Network indicator — header dot (black/red/yellow/green) + capability labels. Tab badges — contextual counts (new posts, engagement, online, unread). Message read tracking on open/close/send. Stats bar removed.

v0.3.5 (2026-03-20): Private blob encryption — attachments on encrypted posts (Friends/Circle/Direct) now encrypted with same CEK as post text; public blobs unchanged; CID on ciphertext. Blob prefetch on sync — attachments eagerly fetched after post pull for offline availability. Crypto refactoring — extracted reusable primitives (encrypt/decrypt_bytes_with_cek, unwrap_cek_for_recipient, unwrap_group_cek). Intent-based post filtering — feed/myposts/messages filter on intentKind instead of encryption state. Blob decryption API (get_blob_for_post). Download filename sanitization. Encrypted receipt & comment slots — private posts carry noise-prefilled encrypted slots in BlobHeader for delivery/read/react receipts and private comments; CDN-propagated as opaque bytes; slot key derived from post CEK; 3 new BlobHeaderDiffOps (WriteReceiptSlot, WriteCommentSlot, AddCommentSlots). Message UI — DM delivery indicators (checkmark/double/blue/emoji), auto-seen on view, react button on messages.

v0.3.4 (2026-03-18): Comment edit & delete with trust-based propagation. Native notifications via Tauri plugin (messages, posts, reactions, comments). Forward-compatible BlobHeaderDiffOp::Unknown variant. Following Online/Offline lightbox. Comment threading scoping fix. Dropdown text legibility fix. Mobile hamburger nav for website.

@@ -1147,7 +1147,7 @@ FAILURE: C → B → A: AnchorProbeResult { reachable: false }DM conversations display delivery indicators: single checkmark (sent), double checkmark (delivered/on device), blue double checkmark (seen), emoji (reacted). Opening a conversation auto-marks incoming messages as seen. Messages have a react button for emoji responses.

-

Protocol v4: Header-Driven Sync Planned

+

Protocol v4: Header-Driven Sync Complete

Major sync protocol revision that replaces the current pull-everything-from-everyone model with header-driven discovery, per-author tracking, and tiered engagement polling. Reduces bandwidth by ~90% for established nodes.

Core principle: headers as notification

diff --git a/website/download.html b/website/download.html index 34aadac..e160377 100644 --- a/website/download.html +++ b/website/download.html @@ -25,16 +25,16 @@

Download ItsGoin

Available for Android and Linux. Free and open source.

-

Version 0.3.6 — March 15, 2026

+

Version 0.4.0 — March 21, 2026

@@ -46,7 +46,7 @@

Android

  1. Download the APK — Tap the button above. Your browser may warn that this type of file can be harmful — tap Download anyway.
  2. -
  3. Open the file — When the download finishes, tap the notification or find itsgoin-0.3.6.apk in your Downloads folder and tap it.
  4. +
  5. Open the file — When the download finishes, tap the notification or find itsgoin-0.4.0.apk in your Downloads folder and tap it.
  6. Allow installation — Android will ask you to allow installs from this source. Tap Settings, toggle "Allow from this source", then go back and tap Install.
  7. Launch the app — Once installed, tap Open or find ItsGoin in your app drawer.
@@ -59,8 +59,8 @@

Linux (AppImage)

  1. Download the AppImage — Click the button above to download.
  2. -
  3. Make it executable — Open a terminal and run:
    chmod +x itsgoin_0.3.6_amd64.AppImage
  4. -
  5. Run it — Double-click the file, or from the terminal:
    ./itsgoin_0.3.6_amd64.AppImage
  6. +
  7. Make it executable — Open a terminal and run:
    chmod +x itsgoin_0.4.0_amd64.AppImage
  8. +
  9. Run it — Double-click the file, or from the terminal:
    ./itsgoin_0.4.0_amd64.AppImage
Note: If it doesn't launch, you may need to install FUSE:
sudo apt install libfuse2 (Debian/Ubuntu) or sudo dnf install fuse (Fedora). @@ -71,6 +71,17 @@

Changelog

+
v0.4.0 — March 21, 2026
+
    +
  • Protocol v4: Header-driven sync — Major sync protocol revision. ManifestPush now triggers post discovery from CDN tree headers. Bandwidth reduced ~90% for established nodes.
  • +
  • Slim PullSyncRequest — Per-author timestamps replace full post ID lists. Request size drops from O(posts) to O(follows). Backward-compatible with v3 peers.
  • +
  • Tiered pull frequency — Pull cycle checks every 60s but only syncs stale authors (4-hour default). Full pull only on first tick. Most ticks do nothing.
  • +
  • Tiered engagement checks — Engagement polling frequency scales with content age: 5min (<72h), 1hr (3-14d), 4hr (14-30d), 24hr (>30d). Single SQL query filters due posts.
  • +
  • Header-driven post discovery — ManifestPush triggers PostFetch for missing followed-author posts (capped at 10 per manifest). CDN tree becomes the notification system.
  • +
  • Multi-upstream (3 max) — Posts track up to 3 upstream sources with priority ordering. Engagement diffs sent to all upstreams. Fallback on upstream failure.
  • +
  • Lock contention fixes — 6 hot paths optimized: get_blob_for_post (3→1 locks), prefetch (lock-free blob checks), serve_post (4→2 locks), badge cycle (N+2→1 IPC call).
  • +
+
v0.3.6 — March 20, 2026
  • Network indicator — Header shows connection status dot (black/red/yellow/green for 0/1/2-10/11+ connections) with capability labels (Public, Server).