v0.3.6: Active CDN replication, device roles, budgets, tombstones, engagement fix, DoS hardening

Active CDN replication:
- All devices proactively replicate recent posts (<72h, <2 replicas) to peers
- Target priority: desktops (300) > anchors (200) > phones (100) + cache_pressure
- ReplicationRequest/Response (0xE1/0xE2) wire messages (payload sketch below)
- 10-min cycle, 2-min initial delay, cap 20 posts per request
- Degrades gracefully on small networks (1 peer = 1 replica, 0 peers = silent skip)
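
A rough sketch of what the 0xE1/0xE2 payloads carry; the field names and
serialization are assumptions, since the wire structs live in the network
layer and are not part of this diff:

    // Hypothetical message shapes (names illustrative; PostId from crate::types).
    pub const MSG_REPLICATION_REQUEST: u8 = 0xE1;
    pub const MSG_REPLICATION_RESPONSE: u8 = 0xE2;

    pub struct ReplicationRequest {
        /// Post IDs the sender wants held elsewhere (at most 20 per request).
        pub post_ids: Vec<PostId>,
    }

    pub struct ReplicationResponse {
        /// Subset of the requested post IDs the peer agreed to replicate.
        pub accepted: Vec<PostId>,
    }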

Device roles & budgets:
- Intermittent (phone), Available (desktop), Persistent (anchor)
- Advertised in InitialExchange, stored per-peer
- Replication budget: phones 100MB/hr, desktops/anchors 200MB/hr (mapping sketched after this list)
- Delivery budget: phones 1GB/hr, desktops 2GB/hr, anchors 1GB/hr
- Hourly auto-reset, enforcement on blob serving
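
A minimal sketch of the role-to-budget mapping; the real DeviceRole lives in
crate::types, and the exact byte constants (decimal MB vs binary MiB) are an
assumption:

    // Sketch only; mirrors the hourly limits listed above.
    pub enum DeviceRole {
        Intermittent, // phone
        Available,    // desktop
        Persistent,   // anchor
    }

    impl DeviceRole {
        /// Bytes we are willing to pull and cache per hour.
        pub fn replication_limit(&self) -> u64 {
            match self {
                DeviceRole::Intermittent => 100 * 1024 * 1024,
                DeviceRole::Available | DeviceRole::Persistent => 200 * 1024 * 1024,
            }
        }

        /// Bytes we are willing to serve to peers per hour.
        pub fn delivery_limit(&self) -> u64 {
            match self {
                DeviceRole::Intermittent => 1024 * 1024 * 1024,
                DeviceRole::Available => 2 * 1024 * 1024 * 1024,
                DeviceRole::Persistent => 1024 * 1024 * 1024,
            }
        }
    }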

Cache management:
- 1GB default cache limit, configurable in settings UI
- Eviction cycle activated (was implemented but never started)
- Share-link priority boost (+100 for 3+ downstream)
- Cache pressure score (0-255) for replication targeting (scaling shown below)
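
The scaling is simple enough to state here; this mirrors compute_cache_pressure
in the diff below, where a higher score means more room to accept replicas:

    /// Age of the lowest-priority evictable blob -> pressure score.
    /// 0 h -> 0, 72 h or more -> 255, linear in between.
    fn pressure_from_age(age_hours: f64) -> u8 {
        if age_hours >= 72.0 {
            255
        } else {
            ((age_hours / 72.0) * 255.0) as u8
        }
    }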

Engagement distribution fix:
- BlobHeader JSON rebuilt after BlobHeaderDiff ops
- Previously, reactions/comments were stored in their tables but the serialized header stayed stale

Tombstone system:
- deleted_at column on reactions and comments
- Tombstones propagate through pull sync (additive merge respects timestamps)
- UI queries filter WHERE deleted_at IS NULL (example query below)
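
Illustrative only (the migrations and the Storage query layer are outside this
diff); the tombstone is a nullable timestamp column that UI reads filter out:

    // Assumed schema and query text; table/column names mirror the notes above.
    const ADD_TOMBSTONE_COLUMNS: &str = "
        ALTER TABLE reactions ADD COLUMN deleted_at INTEGER;
        ALTER TABLE comments  ADD COLUMN deleted_at INTEGER;
    ";

    const VISIBLE_COMMENTS_FOR_POST: &str = "
        SELECT * FROM comments
        WHERE post_id = ?1 AND deleted_at IS NULL
        ORDER BY timestamp_ms ASC;
    ";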

Persistent notifications:
- seen_engagement and seen_messages tables replace in-memory Sets
- Only notify on genuinely unseen content; state survives restarts (check sketched below)
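
A minimal sketch of the intended check, using the Node accessors added in this
commit; the notification hook itself is hypothetical:

    async fn maybe_notify_engagement(
        node: &Node,
        post_id: &PostId,
        react_count: u32,
        comment_count: u32,
    ) -> anyhow::Result<()> {
        // Only surface a notification when counts exceed what was already seen.
        let (seen_reacts, seen_comments) = node.get_seen_engagement(post_id).await?;
        if react_count > seen_reacts || comment_count > seen_comments {
            notify_engagement(post_id, react_count, comment_count); // hypothetical UI hook
            node.set_seen_engagement(post_id, react_count, comment_count).await?;
        }
        Ok(())
    }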

DoS hardening:
- BlobHeaderDiff fan-out: single batched task, max 10 concurrent via JoinSet (pattern sketched below)
- Blob prefetch: cap 20 per cycle, newest first
- PostDownstreamRegister: cap 50 per sync
- Delivery budget enforcement on BlobRequest handler
- Pull preference: non-anchors first to preserve anchor delivery budget
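
The fan-out cap follows the standard JoinSet pattern; a generic sketch (the
real task body builds and sends the batched BlobHeaderDiff per peer):

    use tokio::task::JoinSet;

    /// Keep at most `max_in_flight` sends active instead of spawning one
    /// task per peer. `NodeId` is the crate's peer identifier type.
    async fn fan_out_bounded<F, Fut>(peers: Vec<NodeId>, max_in_flight: usize, send: F)
    where
        F: Fn(NodeId) -> Fut,
        Fut: std::future::Future<Output = ()> + Send + 'static,
    {
        let mut set = JoinSet::new();
        for peer in peers {
            if set.len() >= max_in_flight {
                let _ = set.join_next().await; // free one slot before spawning more
            }
            set.spawn(send(peer));
        }
        while set.join_next().await.is_some() {} // drain the remainder
    }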

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Scott Reimers
Date: 2026-03-20 21:00:28 -04:00
Commit: a7e632de88 (parent b7f2d369fa)
16 changed files with 1254 additions and 158 deletions


@@ -14,8 +14,8 @@ use crate::network::Network;
use crate::storage::Storage;
use crate::types::{
Attachment, AudienceDirection, AudienceRecord, AudienceStatus, Circle, DeleteRecord,
DeviceProfile, NodeId, PeerRecord, PeerSlotKind, PeerWithAddress, Post, PostId, PostVisibility,
PublicProfile, ReachMethod, RevocationMode, SessionReachMethod, SocialRelation,
DeviceProfile, DeviceRole, NodeId, PeerRecord, PeerSlotKind, PeerWithAddress, Post, PostId,
PostVisibility, PublicProfile, ReachMethod, RevocationMode, SessionReachMethod, SocialRelation,
SocialRouteEntry, SocialStatus, VisibilityIntent, VisibilityUpdate, WormResult,
};
@@ -36,6 +36,12 @@ pub struct Node {
pub activity_log: Arc<std::sync::Mutex<ActivityLog>>,
pub last_rebalance_ms: Arc<AtomicU64>,
pub last_anchor_register_ms: Arc<AtomicU64>,
/// CDN replication budget: bytes remaining we're willing to pull and cache this hour
replication_budget_remaining: Arc<AtomicU64>,
/// CDN delivery budget: bytes remaining we're willing to serve this hour
delivery_budget_remaining: Arc<AtomicU64>,
/// Last budget reset timestamp (ms)
budget_last_reset_ms: Arc<AtomicU64>,
}
impl Node {
@@ -421,6 +427,19 @@ impl Node {
}
}
// Initialize CDN replication budgets based on device role
let role = network.device_role();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let replication_budget_remaining = Arc::new(AtomicU64::new(role.replication_limit()));
let delivery_budget_remaining = Arc::new(AtomicU64::new(role.delivery_limit()));
let budget_last_reset_ms = Arc::new(AtomicU64::new(now));
// Set delivery budget on blob store (shared with ConnectionManager)
blob_store.set_delivery_budget(role.delivery_limit());
Ok(Self {
data_dir,
storage,
@@ -433,6 +452,9 @@
activity_log,
last_rebalance_ms,
last_anchor_register_ms,
replication_budget_remaining,
delivery_budget_remaining,
budget_last_reset_ms,
})
}
@@ -454,6 +476,62 @@
self.secret_seed
}
// --- CDN Replication Budget ---
/// Reset budgets if an hour has elapsed since last reset.
fn maybe_reset_budgets(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let last = self.budget_last_reset_ms.load(AtomicOrdering::Relaxed);
if now.saturating_sub(last) >= 3_600_000 {
let role = self.network.device_role();
self.replication_budget_remaining.store(role.replication_limit(), AtomicOrdering::Relaxed);
self.delivery_budget_remaining.store(role.delivery_limit(), AtomicOrdering::Relaxed);
self.budget_last_reset_ms.store(now, AtomicOrdering::Relaxed);
debug!(role = %role, "CDN budgets reset for new hour");
}
}
/// Try to consume replication budget. Returns true if within budget.
pub fn consume_replication_budget(&self, bytes: u64) -> bool {
self.maybe_reset_budgets();
let prev = self.replication_budget_remaining.fetch_update(
AtomicOrdering::Relaxed,
AtomicOrdering::Relaxed,
|current| {
if current >= bytes { Some(current - bytes) } else { None }
},
);
prev.is_ok()
}
/// Try to consume delivery budget. Returns true if within budget.
pub fn consume_delivery_budget(&self, bytes: u64) -> bool {
self.maybe_reset_budgets();
let prev = self.delivery_budget_remaining.fetch_update(
AtomicOrdering::Relaxed,
AtomicOrdering::Relaxed,
|current| {
if current >= bytes { Some(current - bytes) } else { None }
},
);
prev.is_ok()
}
/// Get remaining replication budget bytes.
pub fn replication_budget_remaining(&self) -> u64 {
self.maybe_reset_budgets();
self.replication_budget_remaining.load(AtomicOrdering::Relaxed)
}
/// Get remaining delivery budget bytes.
pub fn delivery_budget_remaining(&self) -> u64 {
self.maybe_reset_budgets();
self.delivery_budget_remaining.load(AtomicOrdering::Relaxed)
}
// ---- Identity export/import ----
pub fn export_identity_hex(&self) -> anyhow::Result<String> {
@@ -1120,21 +1198,27 @@ impl Node {
}
/// Prefetch blobs for recently synced posts from a peer.
/// Queries storage for posts with attachments missing from the local blob store,
/// then fetches each missing blob. Runs outside any locks.
/// Scans recent posts (newest first) for missing blobs, caps at 20 per cycle.
/// Runs outside any locks.
const MAX_PREFETCH_PER_CYCLE: usize = 20;
pub async fn prefetch_blobs_from_peer(&self, peer_id: &NodeId) {
// Gather posts with missing blobs
// Gather posts with missing blobs, newest first, capped
let missing: Vec<(PostId, NodeId, Vec<crate::types::Attachment>)> = {
let storage = self.storage.lock().await;
let post_ids = storage.list_post_ids().unwrap_or_default();
let mut result = Vec::new();
let mut total_missing = 0usize;
// list_post_ids returns newest first typically; cap total missing blobs
for pid in post_ids {
if total_missing >= Self::MAX_PREFETCH_PER_CYCLE { break; }
if let Ok(Some(post)) = storage.get_post(&pid) {
let missing_atts: Vec<_> = post.attachments.iter()
.filter(|a| !self.blob_store.has(&a.cid))
.cloned()
.collect();
if !missing_atts.is_empty() {
total_missing += missing_atts.len();
result.push((pid, post.author, missing_atts));
}
}
@@ -1149,6 +1233,7 @@ impl Node {
let mut fetched = 0usize;
for (post_id, author, attachments) in &missing {
for att in attachments {
if fetched >= Self::MAX_PREFETCH_PER_CYCLE { break; }
match self.fetch_blob_with_fallback(
&att.cid, post_id, author, &att.mime_type, 0,
).await {
@@ -1219,8 +1304,11 @@
Ok(data)
}
/// Fetch a blob with CDN-aware cascade:
/// 1. Local → 2. Existing upstream → 3. Lateral N0-N2 peers → 4. Author → 5. Redirect peers
/// Fetch a blob with CDN-aware cascade, preferring non-anchor sources to save anchor
/// delivery budget:
/// 1. Local → 2. Existing upstream → 3. Lateral peers (non-anchor first)
/// → 4. Replicas → 5. Author → 6. Redirect peers
/// Anchors are deprioritized at each step via storage-level ordering.
pub async fn fetch_blob_with_fallback(
&self,
cid: &[u8; 32],
@@ -1253,13 +1341,14 @@
}
// 3. Lateral N0-N2: mesh peers + N2 peers who have the author's posts
// (sorted by get_lateral_blob_sources: non-anchors first)
let lateral_sources = {
let storage = self.storage.lock().await;
storage.get_lateral_blob_sources(author, post_id).unwrap_or_default()
};
for lateral in lateral_sources {
if lateral == *author {
continue; // Author is step 4
continue; // Author tried separately below
}
match self.network.fetch_blob_full(cid, &lateral).await {
Ok((Some(data), response)) => {
@@ -1282,27 +1371,7 @@
}
}
// 4. Try author (last resort for direct)
match self.fetch_blob_from_peer(cid, author, post_id, author, mime_type, created_at).await {
Ok(Some(data)) => return Ok(Some(data)),
Ok(None) => {}
Err(e) => warn!(error = %e, "blob fetch from author failed"),
}
// 5. Try redirect peers (from any step that returned cdn_redirect_peers)
for rp in &redirect_peers {
if let Ok(nid_bytes) = hex::decode(&rp.n) {
if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
match self.fetch_blob_from_peer(cid, &nid, post_id, author, mime_type, created_at).await {
Ok(Some(data)) => return Ok(Some(data)),
Ok(None) => {}
Err(e) => warn!(peer = &rp.n, error = %e, "redirect blob fetch failed"),
}
}
}
}
// 6. Try replica peers as final fallback (1-hour staleness window)
// 4. Try replica peers (before author — replicas are often closer/cheaper)
let replicas = {
let storage = self.storage.lock().await;
storage.get_replica_peers(post_id, 3_600_000)?
@@ -1315,6 +1384,26 @@
}
}
// 5. Try author
match self.fetch_blob_from_peer(cid, author, post_id, author, mime_type, created_at).await {
Ok(Some(data)) => return Ok(Some(data)),
Ok(None) => {}
Err(e) => warn!(error = %e, "blob fetch from author failed"),
}
// 6. Try redirect peers (from any step that returned cdn_redirect_peers)
for rp in &redirect_peers {
if let Ok(nid_bytes) = hex::decode(&rp.n) {
if let Ok(nid) = <[u8; 32]>::try_from(nid_bytes.as_slice()) {
match self.fetch_blob_from_peer(cid, &nid, post_id, author, mime_type, created_at).await {
Ok(Some(data)) => return Ok(Some(data)),
Ok(None) => {}
Err(e) => warn!(peer = &rp.n, error = %e, "redirect blob fetch failed"),
}
}
}
}
Ok(None)
}
@@ -1722,6 +1811,99 @@ impl Node {
storage.set_setting(key, value)
}
// ---- Cache stats & pressure ----
/// Get cache statistics: (used_bytes, max_bytes, blob_count).
/// max_bytes comes from the `cache_size_bytes` setting (default 1 GB, 0 = unlimited).
pub async fn get_cache_stats(&self) -> anyhow::Result<(u64, u64, u64)> {
let storage = self.storage.lock().await;
let used = storage.total_blob_bytes()?;
let count = storage.count_blobs()?;
let max_str = storage.get_setting("cache_size_bytes")?.unwrap_or_default();
let max: u64 = max_str.parse().unwrap_or(1_073_741_824);
Ok((used, max, count))
}
/// Compute cache pressure score (0-255), used for replication targeting.
/// Higher means more headroom to accept replicated content:
/// 255 = cache empty, only elevated blobs, or lowest-priority blob >72 h old.
/// Scales linearly with the age of the lowest-priority blob: 0 h → 0, 72 h and beyond → 255.
pub async fn compute_cache_pressure(&self) -> anyhow::Result<u8> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64;
let staleness_ms = 3600 * 1000;
let (candidates, follows, audience_members) = {
let storage = self.storage.lock().await;
let candidates = storage.get_eviction_candidates(staleness_ms)?;
let follows = storage.list_follows().unwrap_or_default();
let audience = storage.list_audience_members().unwrap_or_default();
(candidates, follows, audience)
};
if candidates.is_empty() {
return Ok(255); // Empty cache = max willingness to accept
}
// Filter to non-elevated blobs (not pinned, not own content, not followed author)
let non_elevated: Vec<_> = candidates.iter().filter(|c| {
!c.pinned && c.author != self.node_id && !follows.contains(&c.author)
}).collect();
if non_elevated.is_empty() {
return Ok(255); // All blobs are elevated — plenty of room for new content
}
// Find the lowest priority (oldest/least-valuable) blob
let mut min_priority = f64::MAX;
let mut min_created_at = u64::MAX;
for c in &non_elevated {
let priority = self.compute_blob_priority(c, &follows, &audience_members, now);
if priority < min_priority {
min_priority = priority;
min_created_at = c.created_at;
}
}
// Scale based on age of the oldest non-elevated blob
let age_hours = now.saturating_sub(min_created_at) as f64 / (3600.0 * 1000.0);
let pressure = if age_hours >= 72.0 {
255
} else {
((age_hours / 72.0) * 255.0) as u8
};
Ok(pressure)
}
// ---- Seen engagement tracking ----
/// Get seen engagement counts for a post.
pub async fn get_seen_engagement(&self, post_id: &PostId) -> anyhow::Result<(u32, u32)> {
let storage = self.storage.lock().await;
storage.get_seen_engagement(post_id)
}
/// Mark a post's engagement as seen (upsert).
pub async fn set_seen_engagement(&self, post_id: &PostId, react_count: u32, comment_count: u32) -> anyhow::Result<()> {
let storage = self.storage.lock().await;
storage.set_seen_engagement(post_id, react_count, comment_count)
}
/// Get last-read timestamp for a conversation partner.
pub async fn get_last_read_message(&self, partner_id: &NodeId) -> anyhow::Result<u64> {
let storage = self.storage.lock().await;
storage.get_last_read_message(partner_id)
}
/// Mark a conversation as read up to the given timestamp.
pub async fn set_last_read_message(&self, partner_id: &NodeId, timestamp_ms: u64) -> anyhow::Result<()> {
let storage = self.storage.lock().await;
storage.set_last_read_message(partner_id, timestamp_ms)
}
// ---- Delete / Revocation ----
pub async fn delete_post(&self, post_id: &PostId) -> anyhow::Result<()> {
@@ -3295,6 +3477,7 @@ impl Node {
post_id,
timestamp_ms: now,
encrypted_payload,
deleted_at: None,
};
// Store locally
@@ -3409,6 +3592,7 @@ impl Node {
content,
timestamp_ms: now,
signature,
deleted_at: None,
};
let storage = self.storage.lock().await;
@@ -4001,6 +4185,16 @@ pub fn compute_blob_priority_standalone(
) -> f64 {
let pin_boost = if candidate.pinned { 1000.0 } else { 0.0 };
// Share-link popularity boost: high downstream count indicates the blob
// has been shared via share links and is actively being served to others.
let share_boost = if candidate.downstream_count >= 3 {
100.0
} else if candidate.downstream_count >= 1 {
50.0 * candidate.downstream_count as f64 / 3.0
} else {
0.0
};
let relationship = if candidate.author == *our_node_id {
5.0
} else if follows.contains(&candidate.author) && audience_members.contains(&candidate.author) {
@@ -4022,7 +4216,147 @@ impl Node {
let copies_factor = 1.0 / (candidate.peer_copies as f64 + 1.0);
pin_boost + (relationship * heart_recency * freshness * copies_factor)
pin_boost + share_boost + (relationship * heart_recency * freshness * copies_factor)
}
// --- Active Replication Cycle ---
impl Node {
/// Start the active replication cycle: periodically ask peers to hold our
/// under-replicated recent content. All device roles initiate, phones included.
pub fn start_replication_cycle(self: &Arc<Self>, interval_secs: u64) -> tokio::task::JoinHandle<()> {
let node = Arc::clone(self);
tokio::spawn(async move {
// Wait 2 minutes before first cycle (let connections establish)
tokio::time::sleep(std::time::Duration::from_secs(120)).await;
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
loop {
interval.tick().await;
node.run_replication_check().await;
}
})
}
/// Single replication check iteration.
async fn run_replication_check(&self) {
// All devices initiate replication — phones need their content replicated
// before they go to sleep.
// 1. Get own posts < 72h old
let seventy_two_hours_ms = 72u64 * 3600 * 1000;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let since_ms = now_ms.saturating_sub(seventy_two_hours_ms);
let under_replicated: Vec<PostId> = {
let storage = self.storage.lock().await;
let recent_ids = match storage.get_own_recent_post_ids(&self.node_id, since_ms) {
Ok(ids) => ids,
Err(e) => {
debug!(error = %e, "Replication: failed to get own recent posts");
return;
}
};
// 2. Filter to under-replicated (< 2 downstream)
let mut needs_replication = Vec::new();
for pid in &recent_ids {
match storage.get_post_downstream_count(pid) {
Ok(count) if count < 2 => {
needs_replication.push(*pid);
}
_ => {}
}
}
needs_replication
};
// 3. If none need replication, skip silently
if under_replicated.is_empty() {
return;
}
// 4. Score connected peers by device role and cache pressure
let connected = self.network.connected_peers().await;
if connected.is_empty() {
debug!("No peers for replication");
return;
}
// Priority: Available (desktops) > Persistent (anchors) > Intermittent (phones)
let role_priority = |role: &DeviceRole| -> u16 {
match role {
DeviceRole::Available => 300, // desktops — best replication targets
DeviceRole::Persistent => 200, // anchors — good but save for web
DeviceRole::Intermittent => 100, // phones — last resort but still useful
}
};
let suitable_peers: Vec<(NodeId, u16)> = {
let storage = self.storage.lock().await;
let mut candidates = Vec::new();
for peer_id in &connected {
if *peer_id == self.node_id { continue; }
let role_str = storage.get_peer_device_role(peer_id)
.ok()
.flatten()
.unwrap_or_default();
let role = DeviceRole::from_str_label(&role_str);
let pressure = storage.get_peer_cache_pressure(peer_id)
.ok()
.flatten()
.unwrap_or(128) as u16;
// Combined score: role priority + cache pressure
let score = role_priority(&role) + pressure;
candidates.push((*peer_id, score));
}
candidates
};
if suitable_peers.is_empty() {
debug!("No peers available for replication");
return;
}
// Pick best candidate (highest combined score)
let best_peer = suitable_peers
.iter()
.max_by_key(|(_, score)| *score)
.map(|(id, _)| *id)
.unwrap();
// 5. Cap at 20 post IDs per request, one request per cycle
let batch: Vec<PostId> = under_replicated.into_iter().take(20).collect();
let batch_len = batch.len();
// 6. Send ReplicationRequest
match self.network.send_replication_request(&best_peer, batch, 128).await {
Ok(accepted) => {
if accepted.is_empty() {
debug!(
peer = hex::encode(best_peer),
"Replication: peer rejected all posts"
);
} else {
debug!(
peer = hex::encode(best_peer),
accepted = accepted.len(),
requested = batch_len,
"Replication: peer accepted posts"
);
}
}
Err(e) => {
debug!(
peer = hex::encode(best_peer),
error = %e,
"Replication: request failed"
);
}
}
}
}
#[cfg(test)]
@@ -4050,6 +4384,7 @@ mod tests {
last_accessed_at,
pinned,
peer_copies,
downstream_count: 0,
}
}