v0.4.0: Protocol v4 — header-driven sync, tiered engagement, multi-upstream

Protocol v4 sync overhaul:
- Slim PullSyncRequest: per-author timestamps (since_ms) replace full post ID lists.
  Request size is O(follows) instead of O(posts). Backward-compatible via serde
  default (see the request/filter sketch after this list).
- Tiered pull frequency: 60s ticks; each tick syncs only stale authors (4hr default).
  Full pull only on the first tick (bootstrap). Most ticks are no-ops: no stale authors.
- Tiered engagement checks: frequency scales with how recently a post saw engagement
  5min (<72h), 1hr (3-14d), 4hr (14-30d), 24hr (>30d)
  A single SQL query selects the posts due for a check (shown after this list).
- Header-driven post discovery: ManifestPush triggers PostFetch for missing
  followed-author posts (capped at 10 per push). The CDN tree doubles as the
  notification system.
- Multi-upstream (3 max): composite PK, priority ordering, engagement diffs
  sent to all upstreams, promote/remove on failure.
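
  Sketch of the v4 slim sync, condensed from the hunks below (struct and field
  names are real; the surrounding glue is illustrative):

    // Requester: per-author watermarks instead of every post ID we hold.
    let request = PullSyncRequestPayload {
        follows: our_follows,
        have_post_ids: vec![],  // legacy field, empty for v4 senders
        since_ms: follows_sync, // Vec<(NodeId, u64)> from follows.last_sync_ms
    };

    // Responder: a post counts as already-held iff it sits at or before the
    // author's watermark; 60s of slack absorbs clock skew between peers.
    let peer_has_post = match since_ms_map.get(&post.author) {
        Some(&since) => post.timestamp_ms <= since + 60_000,
        None => false, // no watermark for this author: send everything
    };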
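
  The tiered engagement check is a single query over posts (annotated copy of
  the query from the storage diff below; ?1 = now, ?2 = now-72h, ?3 = now-14d,
  ?4 = now-30d, all in ms):

    SELECT id FROM posts WHERE last_check_ms < ?1 - CASE
        WHEN last_engagement_ms > ?2 THEN   300000  -- engaged <72h ago:  5 min
        WHEN last_engagement_ms > ?3 THEN  3600000  -- 3-14d:             1 hr
        WHEN last_engagement_ms > ?4 THEN 14400000  -- 14-30d:            4 hr
        ELSE                              86400000  -- >30d:             24 hr
    END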

DB schema:
- follows.last_sync_ms — per-author "Self Last Encounter" (our last successful sync)
- posts.last_engagement_ms — last reaction/comment timestamp
- posts.last_check_ms — last engagement check timestamp
- post_upstream: single-row → 3-row with priority column
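
  Net effect of the migration, condensed from the storage diff below:

    ALTER TABLE follows ADD COLUMN last_sync_ms       INTEGER NOT NULL DEFAULT 0;
    ALTER TABLE posts   ADD COLUMN last_engagement_ms INTEGER NOT NULL DEFAULT 0;
    ALTER TABLE posts   ADD COLUMN last_check_ms      INTEGER NOT NULL DEFAULT 0;

    -- post_upstream rebuilt: one row per (post, upstream); the 3-row cap is
    -- enforced in code, priority 0 = primary
    CREATE TABLE post_upstream (
        post_id       BLOB NOT NULL,
        peer_node_id  BLOB NOT NULL,
        priority      INTEGER NOT NULL DEFAULT 0,
        registered_at INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (post_id, peer_node_id)
    );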

Lock contention fixes:
- get_blob_for_post: 3 locks → 1
- prefetch_blobs_from_peer: lock-free blob checks
- fetch_engagement_from_peer: explicit lock release before I/O
- serve_post: 4 locks → 2 (eliminated redundant queries)
- run_replication_check: 2 locks → 1
- Badge cycle: N+2 IPC calls → 1 (get_badge_counts)
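
  These all follow one shape: snapshot inputs under a brief lock, drop the lock
  across network I/O, re-lock briefly for writes. Illustrative sketch, simplified
  from fetch_engagement_from_peer below (request_headers and header_ts are
  hypothetical stand-ins for the real batched stream I/O and timestamp lookup):

    // Brief lock: gather only what the I/O needs, then release.
    let post_headers: Vec<([u8; 32], u64)> = {
        let storage = self.storage.lock().await;
        let due_ids = storage.get_posts_due_for_engagement_check()?;
        due_ids.into_iter().map(|pid| (pid, header_ts(&storage, &pid))).collect()
    }; // MutexGuard dropped here

    // Network round-trip with no lock held.
    let response = request_headers(&pc.connection, &post_headers).await?;

    // Brief re-lock for writes only; storing response contents is elided here.
    let storage = self.storage.lock().await;
    for (post_id, _) in &post_headers {
        let _ = storage.update_post_last_check(post_id, now_ms);
    }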

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Scott Reimers 2026-03-21 16:13:45 -04:00
parent 1df00eebf8
commit bbaacf9b6c
10 changed files with 489 additions and 100 deletions

View file

@@ -1569,15 +1569,19 @@ impl ConnectionManager {
},
};
let (our_follows, our_post_ids) = {
let (our_follows, follows_sync) = {
let storage = self.storage.lock().await;
(storage.list_follows()?, storage.list_post_ids()?)
(
storage.list_follows()?,
storage.get_follows_with_last_sync().unwrap_or_default(),
)
};
let (mut send, mut recv) = pull_conn.open_bi().await?;
let request = PullSyncRequestPayload {
follows: our_follows,
have_post_ids: our_post_ids,
have_post_ids: vec![], // v4: empty, using since_ms instead
since_ms: follows_sync,
};
write_typed_message(&mut send, MessageType::PullSyncRequest, &request).await?;
send.finish()?;
@@ -1586,19 +1590,30 @@ impl ConnectionManager {
let response: PullSyncResponsePayload =
read_payload(&mut recv, MAX_PAYLOAD).await?;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let mut stored = false;
let mut new_post_ids: Vec<PostId> = Vec::new();
let storage = self.storage.lock().await;
let mut synced_authors: HashSet<NodeId> = HashSet::new();
for sp in response.posts {
if verify_post_id(&sp.id, &sp.post) && !storage.is_deleted(&sp.id)? {
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
let _ = storage.set_post_upstream(&sp.id, from);
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sp.id, from, prio);
new_post_ids.push(sp.id);
synced_authors.insert(sp.post.author);
if sp.id == notification.post_id {
stored = true;
}
}
}
// Protocol v4: update last_sync_ms for authors whose posts were received
for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms);
}
for vu in response.visibility_updates {
if let Some(post) = storage.get_post(&vu.post_id)? {
if post.author == vu.author {
@@ -1632,14 +1647,18 @@ impl ConnectionManager {
.get(peer_id)
.ok_or_else(|| anyhow::anyhow!("not connected to {}", hex::encode(peer_id)))?;
let (our_follows, our_post_ids) = {
let (our_follows, follows_sync) = {
let storage = self.storage.lock().await;
(storage.list_follows()?, storage.list_post_ids()?)
(
storage.list_follows()?,
storage.get_follows_with_last_sync().unwrap_or_default(),
)
};
let request = PullSyncRequestPayload {
follows: our_follows,
have_post_ids: our_post_ids,
have_post_ids: vec![], // v4: empty, using since_ms instead
since_ms: follows_sync,
};
let (mut send, mut recv) = pc.connection.open_bi().await?;
@@ -1655,24 +1674,37 @@ impl ConnectionManager {
let mut posts_received = 0;
let mut vis_updates = 0;
let mut new_post_ids: Vec<PostId> = Vec::new();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
{
let storage = self.storage.lock().await;
// Track which authors had posts received for last_sync_ms update
let mut synced_authors: HashSet<NodeId> = HashSet::new();
for sp in response.posts {
for sp in &response.posts {
if storage.is_deleted(&sp.id)? {
continue;
}
if verify_post_id(&sp.id, &sp.post) {
if storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility)? {
// Record who we got this post from (upstream for engagement propagation)
let _ = storage.set_post_upstream(&sp.id, peer_id);
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sp.id, peer_id, prio);
new_post_ids.push(sp.id);
posts_received += 1;
}
synced_authors.insert(sp.post.author);
}
}
// Protocol v4: update last_sync_ms for authors whose posts were received
for author in &synced_authors {
let _ = storage.update_follow_last_sync(author, now_ms);
}
for vu in response.visibility_updates {
if vu.author != *peer_id {
// Only accept visibility updates authored by the responding peer
@@ -1708,19 +1740,19 @@ impl ConnectionManager {
})
}
/// Fetch engagement headers (reactions, comments, policies) for our posts from a peer.
/// Requests BlobHeader for each post we hold, applies newer data.
/// Fetch engagement headers (reactions, comments, policies) for posts due for check from a peer.
/// Uses tiered check rates: active posts checked more often, cold posts less frequently.
pub async fn fetch_engagement_from_peer(&self, peer_id: &NodeId) -> anyhow::Result<usize> {
let pc = self
.connections
.get(peer_id)
.ok_or_else(|| anyhow::anyhow!("not connected to {}", hex::encode(peer_id)))?;
// Brief lock: gather post IDs and their current header timestamps
// Brief lock: gather only posts DUE for engagement check (tiered frequency)
let post_headers: Vec<([u8; 32], u64)> = {
let storage = self.storage.lock().await;
let post_ids = storage.list_post_ids()?;
post_ids
let due_ids = storage.get_posts_due_for_engagement_check()?;
due_ids
.into_iter()
.map(|pid| {
let ts = storage
@@ -1735,6 +1767,11 @@ impl ConnectionManager {
};
// Lock RELEASED — all network I/O happens without the lock
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let mut updated = 0;
// Request headers in batches to avoid opening too many streams
for chunk in post_headers.chunks(20) {
@@ -1755,11 +1792,14 @@ impl ConnectionManager {
let response: BlobHeaderResponsePayload =
read_payload(&mut recv, MAX_PAYLOAD).await?;
// Brief re-lock for writes
let storage = self.storage.lock().await;
// Always update last_check_ms regardless of whether engagement changed
let _ = storage.update_post_last_check(post_id, now_ms);
if response.updated {
if let Some(json) = &response.header_json {
if let Ok(header) = serde_json::from_str::<crate::types::BlobHeader>(json) {
// Brief re-lock for writes only
let storage = self.storage.lock().await;
// Store the full header JSON
let _ = storage.store_blob_header(
&header.post_id,
@@ -1777,11 +1817,13 @@ impl ConnectionManager {
let _ = storage.store_comment(comment);
}
let _ = storage.set_comment_policy(&header.post_id, &header.policy);
drop(storage);
// Update last_engagement_ms when new engagement arrives
let _ = storage.update_post_last_engagement(post_id, now_ms);
updated += 1;
}
}
}
drop(storage);
Ok(())
}
.await;
@@ -1806,6 +1848,10 @@ impl ConnectionManager {
let their_follows: HashSet<NodeId> = request.follows.into_iter().collect();
let their_post_ids: HashSet<[u8; 32]> = request.have_post_ids.into_iter().collect();
// Protocol v4: build per-author since_ms lookup
let since_ms_map: HashMap<NodeId, u64> = request.since_ms.into_iter().collect();
let use_since_ms = !since_ms_map.is_empty();
let (posts, vis_updates) = {
let storage = self.storage.lock().await;
let all_posts = storage.list_posts_with_visibility()?;
@@ -1818,7 +1864,24 @@ impl ConnectionManager {
let should_send =
crate::network::should_send_post(&post, &visibility, &remote_node_id, &their_follows, &group_members);
if should_send && !their_post_ids.contains(&id) {
if !should_send {
continue;
}
// Determine if peer already has this post
let peer_has_post = if use_since_ms {
// v4 path: filter by per-author timestamp (60s fudge for clock skew)
if let Some(&since) = since_ms_map.get(&post.author) {
post.timestamp_ms <= since + 60_000
} else {
false // no since_ms for this author — they want everything
}
} else {
// Legacy path: use have_post_ids
their_post_ids.contains(&id)
};
if !peer_has_post {
if !storage.is_deleted(&id)? {
posts_to_send.push(SyncPost {
id,
@ -1826,7 +1889,7 @@ impl ConnectionManager {
visibility,
});
}
} else if should_send && their_post_ids.contains(&id) {
} else {
// They already have the post — send visibility update if we authored it
if post.author == self.our_node_id {
vis_updates_to_send.push(crate::types::VisibilityUpdate {
@@ -4714,7 +4777,8 @@ impl ConnectionManager {
&push.post.post,
&push.post.visibility,
);
let _ = storage.set_post_upstream(&push.post.id, &remote_node_id);
let prio = storage.get_post_upstreams(&push.post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&push.post.id, &remote_node_id, prio);
info!(
peer = hex::encode(remote_node_id),
post_id = hex::encode(push.post.id),
@@ -4823,6 +4887,43 @@ impl ConnectionManager {
}
}
let stored = stored_entries.len();
// Phase 5: Gather post IDs from manifests for discovery
let our_follows: std::collections::HashSet<NodeId> =
storage.list_follows().unwrap_or_default().into_iter().collect();
let mut discovery_posts: Vec<(PostId, NodeId)> = Vec::new();
for entry in &stored_entries {
let am = &entry.manifest.author_manifest;
let author = am.author;
// Collect post IDs from the manifest's neighborhood
let mut candidate_ids: Vec<PostId> = Vec::new();
candidate_ids.push(am.post_id);
for me in &am.previous_posts {
candidate_ids.push(me.post_id);
}
for me in &am.following_posts {
candidate_ids.push(me.post_id);
}
for pid in candidate_ids {
// Only discover posts from authors we follow
if !our_follows.contains(&author) {
continue;
}
// Only discover posts we don't have locally
if storage.get_post(&pid).ok().flatten().is_some() {
continue;
}
discovery_posts.push((pid, author));
// Cap at 10 posts per manifest push to avoid storms
if discovery_posts.len() >= 10 {
break;
}
}
if discovery_posts.len() >= 10 {
break;
}
}
drop(storage);
// Relay to downstream (best-effort via mesh connections)
for (ds_nid, relay_payload) in &relay_targets {
@@ -4833,6 +4934,52 @@ impl ConnectionManager {
}
}
}
// Phase 5: Spawn post discovery task (non-blocking)
if !discovery_posts.is_empty() {
let cm_arc = conn_mgr.clone();
let sender_id = remote_node_id;
tokio::spawn(async move {
let cm = cm_arc.lock().await;
let mut fetched = 0usize;
for (post_id, _author) in &discovery_posts {
if fetched >= 10 { break; }
match cm.send_post_fetch(&sender_id, post_id).await {
Ok(Some(sync_post)) => {
if crate::content::verify_post_id(&sync_post.id, &sync_post.post) {
let storage = cm.storage.lock().await;
if storage.store_post_with_visibility(&sync_post.id, &sync_post.post, &sync_post.visibility).unwrap_or(false) {
// Set upstream + register as downstream
let prio = storage.get_post_upstreams(&sync_post.id).map(|v| v.len() as u8).unwrap_or(0);
let _ = storage.add_post_upstream(&sync_post.id, &sender_id, prio);
// Update last_sync_ms for the author
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let _ = storage.update_follow_last_sync(&sync_post.post.author, now);
drop(storage);
// Register as downstream with the sender
if let Some(pc) = cm.connections_ref().get(&sender_id) {
let reg = crate::protocol::PostDownstreamRegisterPayload { post_id: sync_post.id };
if let Ok(mut send) = pc.connection.open_uni().await {
let _ = write_typed_message(&mut send, MessageType::PostDownstreamRegister, &reg).await;
let _ = send.finish();
}
}
fetched += 1;
}
}
}
_ => {}
}
}
if fetched > 0 {
debug!(discovered = fetched, "ManifestPush post discovery");
}
});
}
drop(cm);
debug!(peer = hex::encode(remote_node_id), stored, relayed = relay_targets.len(), "Received manifest push");
}
@@ -5522,7 +5669,7 @@ impl ConnectionManager {
use crate::types::BlobHeaderDiffOp;
// Gather policy + audience data, then drop lock immediately
let (policy, approved_audience, downstream, upstream) = {
let (policy, approved_audience, downstream, upstreams) = {
let storage = self.storage.lock().await;
let policy = storage.get_comment_policy(&payload.post_id)
.ok()
@@ -5533,8 +5680,12 @@ impl ConnectionManager {
Some(crate::types::AudienceStatus::Approved),
).unwrap_or_default();
let downstream = storage.get_post_downstream(&payload.post_id).unwrap_or_default();
let upstream = storage.get_post_upstream(&payload.post_id).ok().flatten();
(policy, approved, downstream, upstream)
let upstreams: Vec<NodeId> = storage.get_post_upstreams(&payload.post_id)
.unwrap_or_default()
.into_iter()
.map(|(nid, _)| nid)
.collect();
(policy, approved, downstream, upstreams)
};
// Filter ops using gathered data (no lock held)
@@ -5684,9 +5835,11 @@ impl ConnectionManager {
if let Ok(json) = serde_json::to_string(&header) {
let _ = storage.store_blob_header(&payload.post_id, &payload.author, &json, payload.timestamp_ms);
}
// Phase 4: Update last_engagement_ms when engagement arrives via diff
let _ = storage.update_post_last_engagement(&payload.post_id, payload.timestamp_ms);
}
// Collect all targets (downstream + upstream), then send in a single batched task
// Collect all targets (downstream + all upstreams), then send in a single batched task
let mut targets: Vec<iroh::endpoint::Connection> = Vec::new();
for peer_id in downstream {
if peer_id == sender { continue; }
@@ -5696,10 +5849,11 @@ impl ConnectionManager {
targets.push(conn);
}
}
if let Some(up) = upstream {
if up != sender {
if let Some(conn) = self.connections.get(&up).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(&up).map(|sc| sc.connection.clone()))
// Phase 6: Try all upstreams, not just one
for up in &upstreams {
if *up != sender {
if let Some(conn) = self.connections.get(up).map(|mc| mc.connection.clone())
.or_else(|| self.sessions.get(up).map(|sc| sc.connection.clone()))
{
targets.push(conn);
}

View file

@@ -1740,9 +1740,12 @@ impl Network {
/// Pull posts from a peer (persistent if available, ephemeral otherwise).
pub async fn pull_from_peer(&self, peer_id: &NodeId) -> anyhow::Result<PullStats> {
let conn = self.get_connection(peer_id).await?;
let (our_follows, our_post_ids) = {
let (our_follows, follows_sync) = {
let storage = self.storage.lock().await;
(storage.list_follows()?, storage.list_post_ids()?)
(
storage.list_follows()?,
storage.get_follows_with_last_sync().unwrap_or_default(),
)
};
let (mut send, mut recv) = conn.open_bi().await?;
write_typed_message(
@@ -1750,7 +1753,8 @@ impl Network {
MessageType::PullSyncRequest,
&PullSyncRequestPayload {
follows: our_follows,
have_post_ids: our_post_ids,
have_post_ids: vec![], // v4: empty, using since_ms instead
since_ms: follows_sync,
},
)
.await?;
@@ -1760,14 +1764,20 @@ impl Network {
anyhow::bail!("expected PullSyncResponse, got {:?}", msg_type);
}
let response: PullSyncResponsePayload = read_payload(&mut recv, 64 * 1024 * 1024).await?;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let storage = self.storage.lock().await;
let mut posts_received = 0;
let mut vis_updates = 0;
for sp in response.posts {
for sp in &response.posts {
if !storage.is_deleted(&sp.id)? && verify_post_id(&sp.id, &sp.post) {
if storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility)? {
posts_received += 1;
}
// Protocol v4: update last_sync_ms for the author
let _ = storage.update_follow_last_sync(&sp.post.author, now_ms);
}
}
for vu in response.visibility_updates {

View file

@@ -2501,31 +2501,53 @@ impl Node {
tokio::spawn(async move { network.run_accept_loop().await })
}
/// Start pull cycle: every interval_secs, pull from connected peers + prefetch blobs.
pub fn start_pull_cycle(self: &Arc<Self>, interval_secs: u64) -> tokio::task::JoinHandle<()> {
/// Start pull cycle: Protocol v4 tiered pull — 60s ticks, full pull on first tick,
/// then only pull for stale authors (last_sync_ms > 4 hours old).
pub fn start_pull_cycle(self: &Arc<Self>, _interval_secs: u64) -> tokio::task::JoinHandle<()> {
let node = Arc::clone(self);
tokio::spawn(async move {
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(interval_secs));
tokio::time::interval(std::time::Duration::from_secs(60));
let mut is_first_tick = true;
loop {
interval.tick().await;
match node.network.pull_from_all().await {
Ok(stats) => {
if stats.posts_received > 0 {
tracing::debug!(
posts = stats.posts_received,
peers = stats.peers_pulled,
"Pull cycle complete"
);
// Prefetch blobs for newly received posts
let peers = node.network.conn_handle().connected_peers().await;
for peer_id in peers {
node.prefetch_blobs_from_peer(&peer_id).await;
if is_first_tick {
// Full pull on startup
let _ = node.network.pull_from_all().await;
is_first_tick = false;
// Prefetch after initial sync
let peers = node.network.conn_handle().connected_peers().await;
for peer_id in peers {
node.prefetch_blobs_from_peer(&peer_id).await;
}
continue;
}
// Tiered: only pull for stale authors (4-hour default)
let stale_authors = {
let storage = node.storage.lock().await;
storage.get_stale_follows(4 * 3600 * 1000).unwrap_or_default()
};
if stale_authors.is_empty() {
continue; // Most ticks skip — no stale authors
}
// Find a connected peer and pull
let peers = node.network.conn_handle().connected_peers().await;
if let Some(peer_id) = peers.first() {
match node.network.conn_handle().pull_from_peer(peer_id).await {
Ok(stats) => {
if stats.posts_received > 0 {
tracing::debug!(
posts = stats.posts_received,
"Tiered pull complete"
);
node.prefetch_blobs_from_peer(peer_id).await;
}
}
}
Err(e) => {
tracing::debug!(error = %e, "Pull cycle failed");
Err(e) => tracing::debug!(error = %e, "Tiered pull failed"),
}
}
}
@@ -3495,12 +3517,12 @@ impl Node {
timestamp_ms: now,
};
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Also send upstream (toward author)
let upstream = {
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
let upstreams = {
let storage = self.storage.lock().await;
storage.get_post_upstream(&post_id).ok().flatten()
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
if let Some(up) = upstream {
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
}
@@ -3609,12 +3631,12 @@ impl Node {
timestamp_ms: now,
};
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
// Also send upstream (toward author)
let upstream = {
// Also send to all upstreams (toward author) — Phase 6 multi-upstream
let upstreams = {
let storage = self.storage.lock().await;
storage.get_post_upstream(&post_id).ok().flatten()
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
if let Some(up) = upstream {
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
}
@@ -3653,11 +3675,12 @@ impl Node {
timestamp_ms: now,
};
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
let upstream = {
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.lock().await;
storage.get_post_upstream(&post_id).ok().flatten()
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
if let Some(up) = upstream {
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
}
@@ -3693,11 +3716,12 @@ impl Node {
timestamp_ms: now,
};
network.propagate_engagement_diff(&post_id, &diff, &our_node_id).await;
let upstream = {
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.lock().await;
storage.get_post_upstream(&post_id).ok().flatten()
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
if let Some(up) = upstream {
for (up, _prio) in upstreams {
let _ = network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
}
@@ -3919,11 +3943,12 @@ impl Node {
timestamp_ms: now,
};
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
let upstream = {
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.lock().await;
storage.get_post_upstream(&post_id).ok().flatten()
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
if let Some(up) = upstream {
for (up, _prio) in upstreams {
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}
@@ -4038,11 +4063,12 @@ impl Node {
timestamp_ms: now,
};
self.network.propagate_engagement_diff(&post_id, &diff, &self.node_id).await;
let upstream = {
// Phase 6: send to all upstreams
let upstreams = {
let storage = self.storage.lock().await;
storage.get_post_upstream(&post_id).ok().flatten()
storage.get_post_upstreams(&post_id).unwrap_or_default()
};
if let Some(up) = upstream {
for (up, _prio) in upstreams {
let _ = self.network.send_to_peer_uni(&up, crate::protocol::MessageType::BlobHeaderDiff, &diff).await;
}

View file

@@ -201,8 +201,12 @@ pub struct NodeListUpdatePayload {
pub struct PullSyncRequestPayload {
/// Our follows (for the responder to filter)
pub follows: Vec<NodeId>,
/// Post IDs we already have
/// Post IDs we already have (backward compat — empty for v4 senders)
#[serde(default)]
pub have_post_ids: Vec<PostId>,
/// Protocol v4: per-author timestamps (Vec of tuples for serde compat)
#[serde(default)]
pub since_ms: Vec<(NodeId, u64)>,
}
/// Pull-based post sync response

View file

@@ -606,6 +606,47 @@ impl Storage {
)?;
}
// Protocol v4: Add last_sync_ms to follows if missing
let has_last_sync = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('follows') WHERE name='last_sync_ms'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_last_sync == 0 {
self.conn.execute_batch(
"ALTER TABLE follows ADD COLUMN last_sync_ms INTEGER NOT NULL DEFAULT 0;"
)?;
}
// Protocol v4: Add last_engagement_ms and last_check_ms to posts if missing
let has_last_engagement = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('posts') WHERE name='last_engagement_ms'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_last_engagement == 0 {
self.conn.execute_batch(
"ALTER TABLE posts ADD COLUMN last_engagement_ms INTEGER NOT NULL DEFAULT 0;
ALTER TABLE posts ADD COLUMN last_check_ms INTEGER NOT NULL DEFAULT 0;"
)?;
}
// Protocol v4 Phase 6: Migrate post_upstream to multi-upstream (3 max)
let has_priority = self.conn.prepare(
"SELECT COUNT(*) FROM pragma_table_info('post_upstream') WHERE name='priority'"
)?.query_row([], |row| row.get::<_, i64>(0))?;
if has_priority == 0 {
self.conn.execute_batch(
"ALTER TABLE post_upstream RENAME TO post_upstream_old;
CREATE TABLE post_upstream (
post_id BLOB NOT NULL,
peer_node_id BLOB NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
registered_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (post_id, peer_node_id)
);
INSERT INTO post_upstream (post_id, peer_node_id, priority, registered_at)
SELECT post_id, peer_node_id, 0, 0 FROM post_upstream_old;
DROP TABLE post_upstream_old;"
)?;
}
Ok(())
}
@@ -880,6 +921,104 @@ impl Storage {
Ok(ids)
}
// ---- Protocol v4: Per-Author Sync Tracking ----
/// Update the last_sync_ms timestamp for a followed author.
pub fn update_follow_last_sync(&self, node_id: &NodeId, timestamp_ms: u64) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE follows SET last_sync_ms = ?2 WHERE node_id = ?1",
params![node_id.as_slice(), timestamp_ms as i64],
)?;
Ok(())
}
/// Get all follows with their last_sync_ms timestamps.
pub fn get_follows_with_last_sync(&self) -> anyhow::Result<Vec<(NodeId, u64)>> {
let mut stmt = self.conn.prepare("SELECT node_id, last_sync_ms FROM follows")?;
let rows = stmt.query_map([], |row| {
let bytes: Vec<u8> = row.get(0)?;
let ts: i64 = row.get(1)?;
Ok((bytes, ts))
})?;
let mut result = Vec::new();
for row in rows {
let (bytes, ts) = row?;
result.push((blob_to_nodeid(bytes)?, ts as u64));
}
Ok(result)
}
/// Get follows whose last_sync_ms is older than max_age_ms from now.
pub fn get_stale_follows(&self, max_age_ms: u64) -> anyhow::Result<Vec<NodeId>> {
let now = now_ms() as u64;
let cutoff = now.saturating_sub(max_age_ms) as i64;
let mut stmt = self.conn.prepare(
"SELECT node_id FROM follows WHERE last_sync_ms < ?1"
)?;
let rows = stmt.query_map(params![cutoff], |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(bytes)
})?;
let mut ids = Vec::new();
for row in rows {
ids.push(blob_to_nodeid(row?)?);
}
Ok(ids)
}
/// Get posts due for engagement check using tiered frequency:
/// - Active (engagement within 72h): check every 5 min
/// - Recent (engagement within 14d): check every 1 hour
/// - Aging (engagement within 30d): check every 4 hours
/// - Cold (older): check every 24 hours
pub fn get_posts_due_for_engagement_check(&self) -> anyhow::Result<Vec<PostId>> {
let now = now_ms() as u64;
let h72 = now.saturating_sub(72 * 3600 * 1000) as i64;
let d14 = now.saturating_sub(14 * 24 * 3600 * 1000) as i64;
let d30 = now.saturating_sub(30 * 24 * 3600 * 1000) as i64;
let now_i64 = now as i64;
let mut stmt = self.conn.prepare(
"SELECT id FROM posts WHERE last_check_ms < ?1 - CASE
WHEN last_engagement_ms > ?2 THEN 300000
WHEN last_engagement_ms > ?3 THEN 3600000
WHEN last_engagement_ms > ?4 THEN 14400000
ELSE 86400000
END"
)?;
let rows = stmt.query_map(params![now_i64, h72, d14, d30], |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(bytes)
})?;
let mut ids = Vec::new();
for row in rows {
let bytes = row?;
if bytes.len() == 32 {
let mut id = [0u8; 32];
id.copy_from_slice(&bytes);
ids.push(id);
}
}
Ok(ids)
}
/// Update the last_check_ms timestamp for a post.
pub fn update_post_last_check(&self, post_id: &PostId, timestamp_ms: u64) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE posts SET last_check_ms = ?2 WHERE id = ?1",
params![post_id.as_slice(), timestamp_ms as i64],
)?;
Ok(())
}
/// Update the last_engagement_ms timestamp for a post.
pub fn update_post_last_engagement(&self, post_id: &PostId, timestamp_ms: u64) -> anyhow::Result<()> {
self.conn.execute(
"UPDATE posts SET last_engagement_ms = ?2 WHERE id = ?1",
params![post_id.as_slice(), timestamp_ms as i64],
)?;
Ok(())
}
// ---- Peers ----
/// Add or update a peer (backward-compat: no addresses)
@@ -3976,30 +4115,75 @@ impl Storage {
Ok(())
}
// --- Engagement: post_upstream ---
// --- Engagement: post_upstream (multi-upstream, 3 max) ---
/// Set the upstream peer for a post (who we got it from).
pub fn set_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
/// Add an upstream peer for a post. INSERT OR IGNORE, cap at 3 per post.
pub fn add_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId, priority: u8) -> anyhow::Result<()> {
// Check current count
let count: i64 = self.conn.prepare(
"SELECT COUNT(*) FROM post_upstream WHERE post_id = ?1"
)?.query_row(params![post_id.as_slice()], |row| row.get(0))?;
if count >= 3 {
return Ok(()); // Already at cap
}
let now = now_ms();
self.conn.execute(
"INSERT INTO post_upstream (post_id, peer_node_id) VALUES (?1, ?2)
ON CONFLICT(post_id) DO UPDATE SET peer_node_id = excluded.peer_node_id",
"INSERT OR IGNORE INTO post_upstream (post_id, peer_node_id, priority, registered_at)
VALUES (?1, ?2, ?3, ?4)",
params![post_id.as_slice(), peer_node_id.as_slice(), priority as i64, now],
)?;
Ok(())
}
/// Get all upstream peers for a post, ordered by priority ASC (0 = primary).
pub fn get_post_upstreams(&self, post_id: &PostId) -> anyhow::Result<Vec<(NodeId, u8)>> {
let mut stmt = self.conn.prepare(
"SELECT peer_node_id, priority FROM post_upstream WHERE post_id = ?1 ORDER BY priority ASC"
)?;
let rows = stmt.query_map(params![post_id.as_slice()], |row| {
let bytes: Vec<u8> = row.get(0)?;
let prio: i64 = row.get(1)?;
Ok((bytes, prio as u8))
})?;
let mut result = Vec::new();
for row in rows {
let (bytes, prio) = row?;
if let Ok(nid) = <[u8; 32]>::try_from(bytes.as_slice()) {
result.push((nid, prio));
}
}
Ok(result)
}
/// Get the primary (lowest priority) upstream peer for a post.
/// Backward-compatible wrapper for code that only needs a single upstream.
pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result<Option<NodeId>> {
let upstreams = self.get_post_upstreams(post_id)?;
Ok(upstreams.into_iter().next().map(|(nid, _)| nid))
}
/// Remove a specific upstream peer for a post.
pub fn remove_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
self.conn.execute(
"DELETE FROM post_upstream WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Get the upstream peer for a post.
pub fn get_post_upstream(&self, post_id: &PostId) -> anyhow::Result<Option<NodeId>> {
let result = self.conn.query_row(
"SELECT peer_node_id FROM post_upstream WHERE post_id = ?1",
/// Promote an upstream peer to primary (priority 0), pushing others up.
pub fn promote_post_upstream(&self, post_id: &PostId, peer_node_id: &NodeId) -> anyhow::Result<()> {
// Shift all priorities up by 1
self.conn.execute(
"UPDATE post_upstream SET priority = priority + 1 WHERE post_id = ?1",
params![post_id.as_slice()],
|row| row.get::<_, Vec<u8>>(0),
);
match result {
Ok(bytes) => Ok(bytes.try_into().ok()),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
)?;
// Set the promoted peer to priority 0
self.conn.execute(
"UPDATE post_upstream SET priority = 0 WHERE post_id = ?1 AND peer_node_id = ?2",
params![post_id.as_slice(), peer_node_id.as_slice()],
)?;
Ok(())
}
/// Count downstream peers for a post.