itsgoin/crates/core/src/network.rs
Scott Reimers 60463d1817 Phase 2d (0.6.1-beta): route manifest + blob ops through file_holders
Switch ALL propagation-decision reads to the flat holder set.

push_manifest_to_downstream now targets file_holders instead of
blob_downstream. ManifestPush receive-side relay likewise — known
holders fan out to up to 5 most-recent peers instead of a directional
tree.

Blob delete notices: single flat fan-out to file_holders; the legacy
upstream_node tree-healing field is emitted as None (wire-stable via
serde default) and ignored on receive — the post-0.6 flat model
doesn't need sender-role distinction. send_blob_delete_notices keeps
its Option<&Upstream> parameter as an unused placeholder for signature
stability with the call sites in this commit.

Other reads migrated:
- blob fetch cascade: step 2 now tries "known holders" (up to 5)
  instead of a single upstream
- manifest refresh: downstream_count reported from file_holder_count
- web/http post holder enumeration
- Worm search post/blob holder fallback (both connection.rs paths)
- DeleteRecord fan-out rewires to file_holders
- Under-replication replication check: < 2 holders

Storage additions:
- get_file_holder_count(file_id)
- remove_file_holder(file_id, peer_id)

Legacy upstream/downstream writes are still happening from Phase 2b;
those + the tables themselves go in 2e.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 21:09:45 -04:00

2471 lines
100 KiB
Rust

use std::collections::HashSet;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use crate::activity::{ActivityCategory, ActivityLevel, ActivityLog};
use crate::blob::BlobStore;
use crate::connection::{initial_exchange_accept, initial_exchange_connect, ConnHandle, ConnectionActor, ConnectionManager, ExchangeResult};
use crate::content::verify_post_id;
use crate::protocol::{
read_message_type, read_payload, write_typed_message, AudienceRequestPayload,
AudienceResponsePayload, BlobRequestPayload, BlobResponsePayload, DeleteRecordPayload,
MessageType, PostNotificationPayload, PostPushPayload, ProfileUpdatePayload,
PullSyncRequestPayload, PullSyncResponsePayload, RefuseRedirectPayload,
SocialAddressUpdatePayload, SocialDisconnectNoticePayload, SyncPost, ALPN_V2,
};
use crate::storage::StoragePool;
use crate::types::{
DeleteRecord, DeviceProfile, DeviceRole, NodeId, PeerSlotKind, PeerWithAddress, Post, PostId,
PostVisibility, PublicProfile, SessionReachMethod, WormResult,
};
/// The network layer: manages the iroh endpoint and mesh connections
pub struct Network {
/// The iroh QUIC endpoint this node binds, dials and accepts on.
endpoint: iroh::Endpoint,
/// Shared persistent storage pool (peers, posts, file holders).
storage: Arc<StoragePool>,
/// Our node id: raw bytes of the endpoint's public key.
our_node_id: NodeId,
/// Whether this node acts as an anchor (publicly reachable);
/// may be cleared at runtime via `clear_anchor` (e.g. UPnP lease loss).
is_anchor: Arc<AtomicBool>,
/// Legacy shared connection manager; `conn_handle` is the preferred access path.
conn_mgr: Arc<Mutex<ConnectionManager>>,
/// Actor-based handle (Phase 1+): replaces conn_mgr.lock() at call sites
conn_handle: ConnHandle,
/// Growth loop signal sender (set by start_growth_loop)
growth_tx: tokio::sync::Mutex<Option<tokio::sync::mpsc::Sender<()>>>,
/// Shared activity log; written best-effort with `try_lock` (never blocks).
activity_log: Arc<std::sync::Mutex<ActivityLog>>,
/// UPnP mapping result (None if no mapping or on mobile)
upnp_mapping: Option<crate::upnp::UpnpMapping>,
/// Whether UPnP TCP mapping succeeded (for HTTP serving)
has_upnp_tcp: bool,
/// Whether this node has a public IPv6 address
has_public_v6: bool,
/// Stable bind address (from --bind flag), passed to ConnectionManager for anchor advertised address
bind_addr: Option<SocketAddr>,
/// CDN replication role: determines budget limits and pull ordering
device_role: DeviceRole,
/// True if an anchor reported this identity is already connected from elsewhere
pub duplicate_detected: Arc<AtomicBool>,
}
/// Returns true when `ip` looks globally routable.
///
/// IPv4: rejects loopback, RFC1918 private, link-local, broadcast and the
/// unspecified address. IPv6: rejects only loopback and unspecified
/// (deliberately permissive — everything else is treated as public).
fn is_public_ip(ip: IpAddr) -> bool {
    match ip {
        // De Morgan'd form of the all-negations conjunction: public iff
        // the address falls in none of the special-use categories.
        IpAddr::V4(v4) => !(v4.is_loopback()
            || v4.is_private()
            || v4.is_link_local()
            || v4.is_broadcast()
            || v4.is_unspecified()),
        IpAddr::V6(v6) => !(v6.is_loopback() || v6.is_unspecified()),
    }
}
/// Filter out addresses that are never useful to share (loopback, link-local, unspecified,
/// Docker bridge). Keeps common LAN addresses (192.168.x, 10.x) for same-WiFi discovery.
/// Excludes 172.17-31.x (Docker/container bridges) to avoid topology disclosure.
pub(crate) fn is_shareable_addr(addr: &SocketAddr) -> bool {
    match addr.ip() {
        IpAddr::V4(v4) => {
            if v4.is_loopback() || v4.is_link_local() || v4.is_unspecified() {
                return false;
            }
            // Exclude Docker bridge range (172.17.0.0 - 172.31.255.255).
            // Bug fix: the previous check (`octets[1] >= 17` with no upper
            // bound) also dropped public 172.32.0.0-172.255.255.255 space,
            // contradicting the documented intent above.
            let octets = v4.octets();
            if octets[0] == 172 && (17..=31).contains(&octets[1]) {
                return false;
            }
            true
        }
        IpAddr::V6(v6) => !v6.is_loopback() && !v6.is_unspecified(),
    }
}
/// Filter to only globally-routable addresses — used for relay introductions
/// that cross network boundaries (Docker bridge IPs, private LANs are useless here).
///
/// Thin wrapper applying [`is_public_ip`] to the socket's IP; the port
/// plays no part in routability.
pub(crate) fn is_publicly_routable(addr: &SocketAddr) -> bool {
is_public_ip(addr.ip())
}
impl Network {
/// Bind the iroh endpoint and assemble the network layer.
///
/// Performs, in order: endpoint construction (relay disabled, mDNS-only
/// discovery), best-effort UPnP mapping (desktop, no --bind), per-family
/// public-address detection, anchor auto-detection, STUN NAT
/// classification, ConnectionManager + actor-handle construction, TCP
/// UPnP mapping for HTTP serving, and CDN device-role selection.
///
/// Parameters:
/// - `secret_key`: identity key the endpoint binds with
/// - `storage`: shared storage pool
/// - `bind_addr`: explicit --bind address; when set, IP transports are
///   reset to this address and UPnP/STUN are skipped (server deployment)
/// - `secret_seed`: 32-byte seed forwarded to the ConnectionManager
/// - `blob_store`: CDN blob store handed to the ConnectionManager
/// - `profile`: device profile handed to the ConnectionManager
/// - `activity_log`: shared UI activity log (best-effort try_lock writes)
///
/// Errors: propagated from endpoint binding or an invalid `bind_addr`.
pub async fn new(
secret_key: iroh::SecretKey,
storage: Arc<StoragePool>,
bind_addr: Option<SocketAddr>,
secret_seed: [u8; 32],
blob_store: Arc<BlobStore>,
profile: DeviceProfile,
activity_log: Arc<std::sync::Mutex<ActivityLog>>,
) -> anyhow::Result<Self> {
let mut builder = iroh::Endpoint::builder()
.secret_key(secret_key)
.relay_mode(iroh::RelayMode::Disabled)
.alpns(vec![ALPN_V2.to_vec()])
.clear_address_lookup(); // Remove default pkarr + DNS (no dns.iroh.link publishing)
// mDNS LAN discovery only: enables automatic peer discovery on local network
builder = builder.address_lookup(
iroh::address_lookup::MdnsAddressLookupBuilder::default(),
);
if let Some(addr) = bind_addr {
builder = builder
.clear_ip_transports()
.bind_addr(addr)
.map_err(|e| anyhow::anyhow!("{}", e))?;
}
let endpoint = builder.bind().await?;
// The endpoint's public-key bytes double as our NodeId everywhere.
let our_node_id = *endpoint.id().as_bytes();
// Best-effort UPnP port mapping (desktop only, skip if --bind was used)
let is_mobile = cfg!(target_os = "android") || cfg!(target_os = "ios");
let upnp_mapping = if !is_mobile && bind_addr.is_none() {
let bound_port = endpoint.bound_sockets().first()
.map(|s| s.port()).unwrap_or(0);
crate::upnp::try_upnp_mapping(bound_port).await
} else {
None
};
// Per-family public address detection (before anchor/STUN decisions)
let has_public_v4_bound = if !is_mobile {
endpoint.bound_sockets().iter()
.any(|s| matches!(s.ip(), std::net::IpAddr::V4(_)) && is_public_ip(s.ip()))
|| endpoint.addr().ip_addrs()
.any(|s| matches!(s.ip(), std::net::IpAddr::V4(_)) && is_public_ip(s.ip()))
} else {
false
};
let has_public_v6 = endpoint.addr().ip_addrs()
.any(|s| matches!(s.ip(), std::net::IpAddr::V6(_)) && is_publicly_routable(&s));
// Auto-detect anchor mode: desktop/server with any public IP (v4 or v6).
// Mobile devices have carrier IPs that look public but are behind CGNAT.
// A node can be an IPv6 anchor while needing NAT traversal on IPv4.
let is_anchor = Arc::new(AtomicBool::new(false));
if !is_mobile {
if has_public_v4_bound {
is_anchor.store(true, Ordering::Relaxed);
info!("Detected public IPv4, running as anchor");
} else if has_public_v6 {
is_anchor.store(true, Ordering::Relaxed);
info!("Detected public IPv6 (no public IPv4), running as anchor (v6)");
}
if !is_anchor.load(Ordering::Relaxed) {
// UPnP success → publicly reachable on IPv4, auto-anchor
if let Some(ref mapping) = upnp_mapping {
is_anchor.store(true, Ordering::Relaxed);
info!("UPnP: {} → :{}, auto-anchor enabled",
mapping.external_addr, mapping.local_port);
// try_lock: logging must never block startup; entry dropped if contended.
if let Ok(mut log) = activity_log.try_lock() {
log.log(
ActivityLevel::Info,
ActivityCategory::Connection,
format!("UPnP mapping acquired: {} → :{}, auto-anchor enabled",
mapping.external_addr, mapping.local_port),
None,
);
}
}
}
}
// STUN-based NAT type detection.
// Always run STUN unless --bind is set (explicit server deployment).
// Previously skipped for all anchors, but an IPv6-only anchor still needs
// STUN to classify its IPv4 NAT for hole punching with v4-only peers.
let (nat_type, nat_mapping) = if bind_addr.is_some() {
(crate::types::NatType::Public, crate::types::NatMapping::EndpointIndependent)
} else {
let bound_port = endpoint.bound_sockets().first()
.map(|s| s.port()).unwrap_or(0);
let (detected, mapping) = crate::stun::detect_nat_type(bound_port).await;
// UPnP success overrides to Public
if upnp_mapping.is_some() && detected != crate::types::NatType::Public {
info!("NAT type override: {} → Public (UPnP mapped)", detected);
(crate::types::NatType::Public, crate::types::NatMapping::EndpointIndependent)
} else {
(detected, mapping)
}
};
// Final per-family public determination:
// has_public_v4 is true only if we actually have a bound/UPnP/--bind public IPv4.
// NOT derived from nat_type (which could be Public from STUN on a different family).
let has_public_v4 = has_public_v4_bound
|| upnp_mapping.is_some()
|| bind_addr.map_or(false, |a| matches!(a.ip(), std::net::IpAddr::V4(_)) && is_public_ip(a.ip()));
let public_detail = match (has_public_v4, has_public_v6) {
(true, true) => "public (v4+v6)",
(true, false) => "public (v4 only)",
(false, true) => "public (v6 only)",
(false, false) => "not public",
};
info!("NAT type: {}, mapping: {}, {}", nat_type, nat_mapping, public_detail);
if let Ok(mut log) = activity_log.try_lock() {
log.log(
ActivityLevel::Info,
ActivityCategory::Connection,
format!("NAT: {}, mapping: {}, {}", nat_type, nat_mapping, public_detail),
None,
);
}
let upnp_external_addr = upnp_mapping.as_ref().map(|m| m.external_addr);
let conn_mgr = ConnectionManager::new(
endpoint.clone(),
Arc::clone(&storage),
our_node_id,
Arc::clone(&is_anchor),
secret_seed,
blob_store,
profile,
Arc::clone(&activity_log),
upnp_external_addr,
bind_addr,
nat_type,
nat_mapping,
);
let conn_mgr = Arc::new(Mutex::new(conn_mgr));
// Spawn actor wrapping the same Arc<Mutex<CM>> (Phase 1: additive)
let conn_handle = ConnectionActor::spawn_with_arc(Arc::clone(&conn_mgr)).await;
// TCP UPnP mapping for HTTP post delivery (only if UDP UPnP succeeded)
let has_upnp_tcp = if let Some(ref mapping) = upnp_mapping {
crate::upnp::try_upnp_tcp_mapping(mapping.local_port, mapping.external_addr.port()).await
} else {
false
};
info!(
node_id = %endpoint.id(),
anchor = is_anchor.load(Ordering::Relaxed),
http_capable = has_upnp_tcp || has_public_v6 || bind_addr.is_some(),
"Network started (v2)"
);
// Determine CDN replication role from device characteristics
let device_role = if is_mobile {
DeviceRole::Intermittent
} else if is_anchor.load(Ordering::Relaxed) {
DeviceRole::Persistent
} else {
DeviceRole::Available
};
info!(role = %device_role, "CDN replication role determined");
conn_handle.set_device_role(device_role);
Ok(Self {
endpoint,
storage,
our_node_id,
is_anchor,
conn_mgr,
conn_handle,
growth_tx: tokio::sync::Mutex::new(None),
activity_log,
upnp_mapping,
has_upnp_tcp,
has_public_v6,
bind_addr,
device_role,
duplicate_detected: Arc::new(AtomicBool::new(false)),
})
}
/// Append an entry to the shared activity log.
///
/// Uses `try_lock` so logging never blocks the caller; the entry is
/// silently dropped if the log is currently locked elsewhere.
fn log_activity(&self, level: ActivityLevel, cat: ActivityCategory, msg: String, peer: Option<NodeId>) {
    if let Ok(mut log) = self.activity_log.try_lock() {
        // `peer` is already Option<NodeId>; the previous `.map(|p| p)`
        // was a redundant identity map (clippy::map_identity).
        log.log(level, cat, msg, peer);
    }
}
/// Our node id as raw bytes (copy of the endpoint's public key).
pub fn node_id_bytes(&self) -> NodeId {
self.our_node_id
}
/// Borrow the underlying iroh endpoint.
pub fn endpoint(&self) -> &iroh::Endpoint {
&self.endpoint
}
/// Typed endpoint id (same identity as `node_id_bytes`).
pub fn endpoint_id(&self) -> iroh::EndpointId {
self.endpoint.id()
}
/// Full endpoint address record (id plus known socket addresses).
pub fn endpoint_addr(&self) -> iroh::EndpointAddr {
self.endpoint.addr()
}
/// Sockets the endpoint is actually bound on.
pub fn bound_sockets(&self) -> Vec<SocketAddr> {
self.endpoint.bound_sockets()
}
/// Our addresses as string list (for manifests and CDN metadata).
/// Filters out loopback and link-local (never useful to share).
/// UPnP external address is prepended if available.
pub fn our_addresses(&self) -> Vec<String> {
let mut addrs: Vec<String> = Vec::new();
// UPnP external address first (most useful for remote peers)
if let Some(ref mapping) = self.upnp_mapping {
addrs.push(mapping.external_addr.to_string());
}
for sock in self.endpoint.bound_sockets()
.iter()
.filter(|s| is_shareable_addr(s))
{
let s = sock.to_string();
if !addrs.contains(&s) {
addrs.push(s);
}
}
for sock in self.endpoint.addr().ip_addrs() {
if !is_shareable_addr(sock) {
continue;
}
let s = sock.to_string();
if !addrs.contains(&s) {
addrs.push(s);
}
}
addrs
}
/// Get the UPnP mapping, if one was successfully acquired.
pub fn upnp_mapping(&self) -> Option<&crate::upnp::UpnpMapping> {
self.upnp_mapping.as_ref()
}
/// Clear anchor status (e.g. after UPnP lease loss).
pub fn clear_anchor(&self) {
self.is_anchor.store(false, Ordering::Relaxed);
}
/// Whether this node is currently acting as an anchor.
pub fn is_anchor(&self) -> bool {
self.is_anchor.load(Ordering::Relaxed)
}
/// Get the CDN replication device role.
pub fn device_role(&self) -> DeviceRole {
self.device_role
}
/// Get the explicit bind address (from --bind flag), if any.
pub fn bind_addr(&self) -> Option<std::net::SocketAddr> {
self.bind_addr
}
/// Whether this node can serve HTTP (has TCP reachability).
pub fn is_http_capable(&self) -> bool {
self.has_upnp_tcp || self.has_public_v6 || self.bind_addr.is_some()
}
/// Get the port to bind the HTTP TCP listener on (same as QUIC).
pub fn bound_port(&self) -> u16 {
if let Some(bind) = self.bind_addr {
bind.port()
} else {
// No --bind: report the first actually-bound socket's port (0 if none).
self.endpoint.bound_sockets().first()
.map(|s| s.port())
.unwrap_or(0)
}
}
/// Whether UPnP TCP mapping is active.
pub fn has_upnp_tcp(&self) -> bool {
self.has_upnp_tcp
}
/// Whether this node has a public IPv6 address.
pub fn has_public_v6(&self) -> bool {
self.has_public_v6
}
/// Whether this node has UPnP mapping (UDP or TCP).
pub fn has_upnp(&self) -> bool {
self.upnp_mapping.is_some() || self.has_upnp_tcp
}
/// Get external HTTP address string for InitialExchange advertisement.
pub fn http_addr(&self) -> Option<String> {
if let Some(ref mapping) = self.upnp_mapping {
return Some(mapping.external_addr.to_string());
}
if let Some(bind) = self.bind_addr {
// Don't advertise 0.0.0.0 — use the observed public IP with the bind port
let ip = bind.ip();
if ip.is_unspecified() {
// Try to find a publicly-routable address from the endpoint
if let Some(sock) = self.endpoint.bound_sockets().first() {
if !sock.ip().is_unspecified() && !sock.ip().is_loopback() {
return Some(std::net::SocketAddr::new(sock.ip(), bind.port()).to_string());
}
}
// Fall back to STUN-observed address if available
return None;
}
return Some(bind.to_string());
}
// For public IPv6, use the actual public address from iroh + bound port
if self.has_public_v6 {
let port = self.endpoint.bound_sockets().first().map(|s| s.port()).unwrap_or(0);
if let Some(public_v6) = self.endpoint.addr().ip_addrs()
.find(|s| matches!(s.ip(), std::net::IpAddr::V6(_)) && is_publicly_routable(s))
{
return Some(std::net::SocketAddr::new(public_v6.ip(), port).to_string());
}
}
None
}
/// Get the connection manager arc (for direct access when needed).
/// NOTE(review): legacy path — new call sites should prefer `conn_handle`.
pub fn conn_mgr_arc(&self) -> &Arc<Mutex<ConnectionManager>> {
&self.conn_mgr
}
/// Actor-based handle for connection management.
pub fn conn_handle(&self) -> &ConnHandle {
&self.conn_handle
}
// ---- Accept loop ----
/// Run the connection accept loop. Connections start as ephemeral;
/// only InitialExchange triggers mesh slot allocation.
///
/// Applies per-IP exponential backoff after repeated accept failures
/// (3+ failures → 2^(n-3)s silent block, capped around 256s) and prunes
/// stale failure entries every 60s. Returns Ok when the endpoint stops
/// yielding incoming connections.
pub async fn run_accept_loop(self: Arc<Self>) -> anyhow::Result<()> {
info!("Accepting incoming connections (v3 ephemeral)...");
// Rate limit: track auth failures per source IP
let fail_tracker: Arc<tokio::sync::Mutex<std::collections::HashMap<std::net::IpAddr, (u32, tokio::time::Instant)>>> =
Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new()));
// Cleanup stale entries every 60s
let ft_cleanup: Arc<tokio::sync::Mutex<std::collections::HashMap<std::net::IpAddr, (u32, tokio::time::Instant)>>> = Arc::clone(&fail_tracker);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
interval.tick().await;
let mut ft = ft_cleanup.lock().await;
let now = tokio::time::Instant::now();
// Drop entries whose last failure was more than 5 minutes ago.
ft.retain(|_, (_, last)| now.duration_since(*last).as_secs() < 300);
}
});
while let Some(incoming) = self.endpoint.accept().await {
let this = Arc::clone(&self);
let remote_sock = crate::connection::normalize_addr(incoming.remote_address());
let ft = Arc::clone(&fail_tracker);
// Check if this IP is rate-limited before spawning
{
let tracker = ft.lock().await;
if let Some((count, last)) = tracker.get(&remote_sock.ip()) {
let elapsed = tokio::time::Instant::now().duration_since(*last);
// Exponential backoff: block for 2^(failures-3) seconds after 3+ failures
if *count >= 3 {
let block_secs = 1u64 << (*count - 3).min(8); // max ~256s
if elapsed.as_secs() < block_secs {
// Silently drop — don't even log to avoid log spam
continue;
}
}
}
}
tokio::spawn(async move {
match incoming.await {
Ok(conn) => {
let remote = conn.remote_id();
let remote_node_id = *remote.as_bytes();
// Successful connection — clear failure count
{
let mut tracker = ft.lock().await;
tracker.remove(&remote_sock.ip());
}
// Store peer with their address
{
let storage = this.storage.get().await;
let _ = storage.upsert_peer(&remote_node_id, &[remote_sock], None);
}
info!(%remote, addr = %remote_sock, "Accepted connection (ephemeral)");
this.log_activity(ActivityLevel::Info, ActivityCategory::Connection, format!("Incoming connection from {}", &hex::encode(remote_node_id)[..8]), Some(remote_node_id));
info!(peer = hex::encode(remote_node_id), "Starting incoming connection handler loop");
// Run the ephemeral connection handler
Self::handle_incoming_connection(
Arc::clone(&this.conn_mgr),
this.conn_handle.clone(),
Arc::clone(&this.storage),
conn,
remote_node_id,
remote_sock,
)
.await;
}
Err(e) => {
// Track auth failure for this IP
let mut tracker = ft.lock().await;
let entry = tracker.entry(remote_sock.ip())
.or_insert((0, tokio::time::Instant::now()));
entry.0 += 1;
entry.1 = tokio::time::Instant::now();
if entry.0 <= 3 {
warn!(error = %e, addr = %remote_sock.ip(), failures = entry.0, "Failed to accept connection");
}
// After 3 failures, stop logging (rate limited silently)
}
}
});
}
Ok(())
}
/// Handle an incoming connection. Runs a stream loop that dispatches
/// uni/bi-streams. InitialExchange triggers mesh slot upgrade.
/// All other message types work ephemerally.
///
/// On successful upgrade the select loop is exited and ownership of the
/// remaining streams passes to `ConnectionManager::run_mesh_streams`;
/// otherwise the connection stays ephemeral until its streams close.
async fn handle_incoming_connection(
conn_mgr: Arc<Mutex<ConnectionManager>>,
conn_handle: ConnHandle,
storage: Arc<StoragePool>,
conn: iroh::endpoint::Connection,
remote_node_id: NodeId,
remote_sock: SocketAddr,
) {
// Shared flag set by try_mesh_upgrade; checked after the loop exits.
let is_mesh = Arc::new(AtomicBool::new(false));
loop {
tokio::select! {
uni_result = conn.accept_uni() => {
match uni_result {
Ok(mut recv) => {
// Uni-streams are fire-and-forget: handle each on its own task.
let cm = Arc::clone(&conn_mgr);
let remote = remote_node_id;
tokio::spawn(async move {
if let Err(e) = ConnectionManager::handle_uni_stream(&cm, &mut recv, remote).await {
debug!(peer = hex::encode(remote), error = %e, "Uni-stream handler failed");
}
});
}
Err(_) => break,
}
}
bi_result = conn.accept_bi() => {
match bi_result {
Ok((send, mut recv)) => {
info!(peer = hex::encode(remote_node_id), "Accepted bi-stream from peer");
// The message type must be read here (not in the spawned task)
// to decide between mesh upgrade and ephemeral handling.
let msg_type = match read_message_type(&mut recv).await {
Ok(mt) => mt,
Err(e) => {
debug!(peer = hex::encode(remote_node_id), error = %e, "Failed to read message type");
continue;
}
};
if msg_type == MessageType::InitialExchange {
info!(peer = hex::encode(remote_node_id), "Received InitialExchange bi-stream, attempting mesh upgrade");
// Try mesh upgrade (uses ConnHandle, no conn_mgr lock)
let upgraded = Self::try_mesh_upgrade(
&conn_handle,
&storage,
&conn,
remote_node_id,
remote_sock,
send,
recv,
&is_mesh,
).await;
if upgraded {
// Now run the mesh stream loop for remaining streams.
// The connection is now managed by ConnectionManager.
// We break out and run run_mesh_streams instead.
break;
}
// If not upgraded (refused), continue handling ephemeral streams
} else {
// Handle as ephemeral bi-stream
let cm = Arc::clone(&conn_mgr);
let remote = remote_node_id;
tokio::spawn(async move {
if let Err(e) = ConnectionManager::handle_bi_stream_typed(
&cm, recv, send, remote, msg_type
).await {
debug!(peer = hex::encode(remote), error = %e, "Bi-stream handler failed");
}
});
}
}
Err(_) => break,
}
}
}
}
if is_mesh.load(Ordering::Relaxed) {
// Upgraded to mesh: run the mesh stream loop
let last_activity = conn_handle.get_peer_last_activity(&remote_node_id).await
.unwrap_or_else(|| Arc::new(std::sync::atomic::AtomicU64::new(0)));
ConnectionManager::run_mesh_streams(
conn_mgr.clone(),
conn,
remote_node_id,
last_activity,
).await;
} else {
// Ephemeral connection ended — no cleanup needed
debug!(peer = hex::encode(remote_node_id), "Ephemeral connection closed");
}
}
/// Attempt to upgrade an incoming connection to a mesh slot.
/// Returns true if upgraded, false if refused (sends RefuseRedirect).
/// Uses ConnHandle for all state access — no direct conn_mgr lock.
///
/// On refusal the peer still gets a session entry (so it remains
/// reachable ephemerally) plus a suggested redirect peer. On success
/// the peer is persisted as a mesh peer and the initial exchange is run;
/// `is_mesh` is only set after the exchange completes.
async fn try_mesh_upgrade(
conn_handle: &ConnHandle,
storage: &Arc<StoragePool>,
conn: &iroh::endpoint::Connection,
remote_node_id: NodeId,
remote_sock: SocketAddr,
send: iroh::endpoint::SendStream,
recv: iroh::endpoint::RecvStream,
is_mesh: &AtomicBool,
) -> bool {
// Try to allocate a slot
let accepted = conn_handle.accept_connection(
conn.clone(), remote_node_id, Some(remote_sock),
).await;
info!(peer = hex::encode(remote_node_id), accepted, "try_mesh_upgrade: accept_connection result");
if !accepted {
// Slots full: refuse, but hand the peer an alternative to try.
let redirect = conn_handle.pick_random_redirect_peer(&remote_node_id).await;
let payload = RefuseRedirectPayload {
reason: "slots full".to_string(),
redirect,
};
let mut send = send;
// Best-effort send of the refusal; errors intentionally ignored.
let _ = write_typed_message(&mut send, MessageType::RefuseRedirect, &payload).await;
let _ = send.finish();
debug!(peer = hex::encode(remote_node_id), "Refused mesh connection (slots full)");
conn_handle.add_session(remote_node_id, conn.clone(), crate::types::SessionReachMethod::Direct, Some(remote_sock)).await;
conn_handle.log_activity(ActivityLevel::Info, ActivityCategory::Connection, format!("Refused {} mesh (slots full), added session", &hex::encode(remote_node_id)[..8]), Some(remote_node_id));
return false;
}
{
// Persist peer + mesh slot, and refresh any social route record.
let s = storage.get().await;
let _ = s.upsert_peer(&remote_node_id, &[remote_sock], None);
let _ = s.add_mesh_peer(&remote_node_id, PeerSlotKind::Local, 0);
if s.has_social_route(&remote_node_id).unwrap_or(false) {
let _ = s.touch_social_route_connect(
&remote_node_id,
&[remote_sock],
crate::types::ReachMethod::Inbound,
);
}
}
// Handle initial exchange — no conn_mgr lock needed
let our_node_id = conn_handle.our_node_id().await;
let anchor_addr = conn_handle.build_anchor_advertised_addr().await;
let our_nat_type = conn_handle.nat_type().await;
let our_http_capable = conn_handle.is_http_capable();
let our_http_addr = conn_handle.http_addr();
match initial_exchange_accept(storage, &our_node_id, send, recv, remote_node_id, anchor_addr, Some(remote_sock), our_nat_type, our_http_capable, our_http_addr, conn_handle.device_role(), None, false).await {
Ok(()) => {
info!(peer = hex::encode(remote_node_id), "Initial exchange complete (upgraded to mesh)");
conn_handle.log_activity(ActivityLevel::Info, ActivityCategory::Connection, format!("Upgraded {} to mesh", &hex::encode(remote_node_id)[..8]), Some(remote_node_id));
}
Err(e) => {
error!(peer = hex::encode(remote_node_id), error = ?e, "Initial exchange failed");
return false;
}
}
// Wake the growth loop so it can re-evaluate slot usage.
conn_handle.notify_growth();
is_mesh.store(true, Ordering::Relaxed);
true
}
// ---- Connection management ----
/// Connect to a peer and establish a mesh connection.
///
/// No-op (Ok) if the target is ourselves or already connected. QUIC
/// dialing and the initial exchange both run without holding the
/// conn_mgr lock. On acceptance the per-connection mesh stream loop is
/// spawned; on refusal the registered connection is torn down and, if a
/// redirect peer was suggested, it is stored and the growth loop is
/// nudged before returning an error.
///
/// Errors: QUIC connect failure, exchange failure, or "mesh refused".
pub async fn connect_to_peer(
&self,
peer_id: NodeId,
addr: iroh::EndpointAddr,
) -> anyhow::Result<()> {
// Never connect to ourselves
if peer_id == self.our_node_id {
return Ok(());
}
// Check if already connected
if self.conn_handle.is_connected(&peer_id).await {
return Ok(());
}
// Store addresses so they're available during initial exchange
let addrs: Vec<std::net::SocketAddr> = addr.ip_addrs().copied().collect();
if !addrs.is_empty() {
let storage = self.storage.get().await;
let _ = storage.upsert_peer(&peer_id, &addrs, None);
}
// QUIC connect OUTSIDE the conn_mgr lock with 15s timeout
let conn = ConnectionManager::connect_to_unlocked(&self.endpoint, addr).await?;
// Register the established connection
self.conn_handle.register_connection(peer_id, conn.clone(), addrs, PeerSlotKind::Local).await;
let anchor_addr = self.conn_handle.build_anchor_advertised_addr().await;
let our_nat_type = self.conn_handle.nat_type().await;
// Initial exchange WITHOUT holding conn_mgr lock
match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, peer_id, anchor_addr, our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await? {
ExchangeResult::Accepted { duplicate_active } => {
if duplicate_active {
// Anchor says this identity is live elsewhere — surface the flag.
self.duplicate_detected.store(true, std::sync::atomic::Ordering::Relaxed);
warn!(peer = hex::encode(peer_id), "Duplicate identity detected by anchor — this identity is active elsewhere");
self.log_activity(ActivityLevel::Warn, ActivityCategory::Connection, "Duplicate identity active on network".into(), None);
}
// Spawn the per-connection stream loop
let conn_data = self.conn_handle.get_connection_map().await;
if let Some((_, conn, _, last_activity)) = conn_data.into_iter().find(|(nid, _, _, _)| *nid == peer_id) {
let conn_mgr = Arc::clone(&self.conn_mgr);
tokio::spawn(async move {
ConnectionManager::run_mesh_streams(conn_mgr, conn, peer_id, last_activity).await;
});
}
Ok(())
}
ExchangeResult::Refused { redirect } => {
// Remove the connection we just registered
self.conn_handle.disconnect_peer(&peer_id).await;
// Try one redirect if provided
if let Some(ref redir) = redirect {
info!(
peer = hex::encode(peer_id),
redirect = &redir.n,
"Mesh refused, trying redirect peer"
);
if let Ok(redirect_bytes) = hex::decode(&redir.n) {
if let Ok(redir_id) = <[u8; 32]>::try_from(redirect_bytes.as_slice()) {
if redir_id != self.our_node_id {
// Store the redirect target; the growth loop will dial it.
let addrs: Vec<SocketAddr> = redir.a.iter()
.filter_map(|a| a.parse::<SocketAddr>().ok())
.collect();
let s = self.storage.get().await;
let _ = s.upsert_peer(&redir_id, &addrs, None);
drop(s);
self.conn_handle.notify_growth();
}
}
}
}
anyhow::bail!("mesh refused: slots full");
}
}
}
/// Pull from all connected peers.
///
/// Iterates the current mesh peer set, pulling posts/visibility from
/// each; per-peer failures are logged at debug level and skipped.
/// Engagement data is fetched opportunistically after each success.
pub async fn pull_from_all(&self) -> anyhow::Result<PullStats> {
    let mut totals = PullStats {
        peers_pulled: 0,
        posts_received: 0,
        visibility_updates: 0,
    };
    for peer_id in self.conn_handle.connected_peers().await {
        // pull_from_peer performs its I/O without holding the conn_mgr lock.
        match self.pull_from_peer(&peer_id).await {
            Ok(pulled) => {
                totals.posts_received += pulled.posts_received;
                totals.visibility_updates += pulled.visibility_updates;
                totals.peers_pulled += 1;
                // Opportunistic engagement fetch; errors intentionally ignored.
                let _ = self.conn_handle.fetch_engagement_from_peer(&peer_id).await;
                if pulled.posts_received > 0 {
                    info!(
                        peer = hex::encode(peer_id),
                        posts = pulled.posts_received,
                        "Pulled posts"
                    );
                }
            }
            Err(e) => {
                debug!(
                    peer = hex::encode(peer_id),
                    error = %e,
                    "Pull failed"
                );
            }
        }
    }
    Ok(totals)
}
/// Broadcast routing diff to all connected peers.
/// Uses ConnHandle to get diff data, then sends outside the lock.
/// Returns the number of peers that received the diff.
pub async fn broadcast_diff(&self) -> anyhow::Result<usize> {
    use crate::protocol::{NodeListUpdatePayload, write_typed_message, MessageType};
    let snapshot = self.conn_handle.get_diff_data().await;
    // Nothing changed since the last diff — skip the broadcast entirely.
    let nothing_to_send = snapshot.n1_added.is_empty()
        && snapshot.n1_removed.is_empty()
        && snapshot.n2_added.is_empty()
        && snapshot.n2_removed.is_empty();
    if nothing_to_send {
        return Ok(0);
    }
    let payload = NodeListUpdatePayload {
        seq: snapshot.diff_seq,
        n1_added: snapshot.n1_added,
        n1_removed: snapshot.n1_removed,
        n2_added: snapshot.n2_added,
        n2_removed: snapshot.n2_removed,
    };
    let mut delivered = 0usize;
    for (peer_id, conn) in &snapshot.connections {
        // One uni-stream per peer; any failing step counts as a miss.
        let outcome: anyhow::Result<()> = async {
            let mut send = conn.open_uni().await?;
            write_typed_message(&mut send, MessageType::NodeListUpdate, &payload).await?;
            send.finish()?;
            Ok(())
        }
        .await;
        match outcome {
            Ok(()) => delivered += 1,
            Err(_) => debug!(peer = hex::encode(peer_id), "Failed to send routing diff"),
        }
    }
    Ok(delivered)
}
/// Broadcast full N1/N2 state to all mesh peers (periodic catch-up for missed diffs).
/// The full state is expressed as "everything added, nothing removed".
pub async fn broadcast_full_state(&self) -> anyhow::Result<usize> {
    use crate::protocol::{NodeListUpdatePayload, write_typed_message, MessageType};
    let snapshot = self.conn_handle.get_diff_data().await;
    let n1 = self.conn_handle.connected_peers().await;
    let n2 = {
        // Scope the storage handle so it drops before any network I/O.
        let storage = self.storage.get().await;
        storage.build_n2_share().unwrap_or_default()
    };
    if n1.is_empty() && n2.is_empty() {
        return Ok(0);
    }
    let payload = NodeListUpdatePayload {
        seq: snapshot.diff_seq,
        n1_added: n1,
        n1_removed: vec![],
        n2_added: n2,
        n2_removed: vec![],
    };
    let mut delivered = 0usize;
    for (_peer_id, conn) in &snapshot.connections {
        let outcome: anyhow::Result<()> = async {
            let mut send = conn.open_uni().await?;
            write_typed_message(&mut send, MessageType::NodeListUpdate, &payload).await?;
            send.finish()?;
            Ok(())
        }
        .await;
        if outcome.is_ok() {
            delivered += 1;
        }
    }
    Ok(delivered)
}
/// Send a post notification to all audience members (ephemeral-capable).
/// Returns how many audience members were notified.
pub async fn notify_post(&self, post_id: &crate::types::PostId, author: &NodeId) -> usize {
    // Fan out through the generic audience sender with an inline payload.
    self.send_to_audience(
        MessageType::PostNotification,
        &PostNotificationPayload {
            post_id: *post_id,
            author: *author,
        },
    )
    .await
}
/// Push a profile update to all audience members (ephemeral-capable).
/// Returns the number of peers the update was written to.
pub async fn push_profile(&self, profile: &PublicProfile) -> usize {
    // Sanitize: if public_visible=false, strip display_name/bio before pushing.
    let sanitized = if profile.public_visible {
        profile.clone()
    } else {
        let mut p = profile.clone();
        p.display_name = String::new();
        p.bio = String::new();
        p
    };
    let payload = ProfileUpdatePayload {
        profiles: vec![sanitized],
    };
    // Push to all connected mesh peers (not just audience) so name changes propagate immediately.
    let conns = self.conn_handle.get_connection_map().await;
    let mut delivered = 0;
    for (_peer_id, conn, _, _) in &conns {
        let mut send = match conn.open_uni().await {
            Ok(s) => s,
            Err(_) => continue,
        };
        if write_typed_message(&mut send, MessageType::ProfileUpdate, &payload)
            .await
            .is_ok()
        {
            let _ = send.finish();
            delivered += 1;
        }
    }
    delivered
}
/// Push a circle profile update to all connected mesh peers.
/// Returns the number of peers the update was written to.
pub async fn push_circle_profile(
    &self,
    payload: &crate::protocol::CircleProfileUpdatePayload,
) -> usize {
    // Snapshot connections first so no lock is held during I/O.
    let conns = self.conn_handle.get_connection_map().await;
    let mut delivered = 0;
    for (peer_id, conn, _, _) in &conns {
        match conn.open_uni().await {
            Ok(mut send) => {
                if write_typed_message(&mut send, MessageType::CircleProfileUpdate, payload)
                    .await
                    .is_ok()
                {
                    let _ = send.finish();
                    delivered += 1;
                }
            }
            Err(_) => {
                tracing::debug!(peer = hex::encode(peer_id), "Failed to open uni for circle profile push");
            }
        }
    }
    delivered
}
/// Push a delete record to all audience members (ephemeral-capable).
/// Returns how many audience members received the record.
pub async fn push_delete(&self, record: &DeleteRecord) -> usize {
    // The wire format carries a batch vector; we send a single record.
    self.send_to_audience(
        MessageType::DeleteRecord,
        &DeleteRecordPayload {
            records: vec![record.clone()],
        },
    )
    .await
}
/// Push a disconnect notice to all audience members (ephemeral-capable).
/// Returns how many audience members were notified.
pub async fn push_disconnect_to_audience(&self, disconnected_peer: &NodeId) -> usize {
    self.send_to_audience(
        MessageType::SocialDisconnectNotice,
        &SocialDisconnectNoticePayload {
            node_id: *disconnected_peer,
        },
    )
    .await
}
/// Push a social address update to all audience members (ephemeral-capable).
/// Returns how many audience members were notified.
pub async fn push_address_update_to_audience(
    &self,
    node_id: &NodeId,
    addresses: &[String],
    peer_addresses: &[PeerWithAddress],
) -> usize {
    // Clone the borrowed slices into the owned payload the wire format expects.
    let payload = SocialAddressUpdatePayload {
        node_id: *node_id,
        addresses: addresses.to_vec(),
        peer_addresses: peer_addresses.to_vec(),
    };
    self.send_to_audience(MessageType::SocialAddressUpdate, &payload).await
}
/// Push a visibility update to all connected peers.
/// Gets connections snapshot, sends I/O outside the lock.
/// Returns the number of peers the update reached.
pub async fn push_visibility(&self, update: &crate::types::VisibilityUpdate) -> usize {
    use crate::protocol::{VisibilityUpdatePayload, write_typed_message, MessageType};
    let payload = VisibilityUpdatePayload {
        updates: vec![update.clone()],
    };
    let conns = self.conn_handle.get_connection_map().await;
    let mut delivered = 0;
    for (peer_id, conn, _, _) in &conns {
        // One uni-stream per peer; any failing step counts as a miss.
        let outcome: anyhow::Result<()> = async {
            let mut send = conn.open_uni().await?;
            write_typed_message(&mut send, MessageType::VisibilityUpdate, &payload).await?;
            send.finish()?;
            Ok(())
        }
        .await;
        if outcome.is_ok() {
            delivered += 1;
        } else {
            debug!(peer = hex::encode(peer_id), "Failed to push visibility update");
        }
    }
    delivered
}
/// Push an updated manifest to all known holders of the file (flat set,
/// up to 5 most-recent). Replaces the legacy downstream-tree push.
///
/// Returns the number of holders the push succeeded for. Every
/// successful target has its holder record touched (`Sent` direction)
/// so the most-recent ordering stays fresh.
///
/// Improvement: the storage lock used to be re-acquired once per
/// successful send inside the loop; successes are now recorded in one
/// batch under a single acquisition after the sends complete.
pub async fn push_manifest_to_downstream(
    &self,
    cid: &[u8; 32],
    manifest: &crate::types::CdnManifest,
) -> usize {
    let holders = {
        let storage = self.storage.get().await;
        storage.get_file_holders(cid).unwrap_or_default()
    };
    let payload = crate::protocol::ManifestPushPayload {
        manifests: vec![crate::protocol::ManifestPushEntry {
            cid: *cid,
            manifest: manifest.clone(),
        }],
    };
    // Send outside the storage lock; remember which peers accepted.
    let mut delivered = Vec::new();
    for (peer, peer_addrs) in &holders {
        if self.send_to_peer_uni(peer, MessageType::ManifestPush, &payload).await.is_ok() {
            delivered.push((peer, peer_addrs));
        }
    }
    let sent = delivered.len();
    if !delivered.is_empty() {
        // One lock acquisition for all touch updates.
        let storage = self.storage.get().await;
        for (peer, peer_addrs) in delivered {
            let _ = storage.touch_file_holder(
                cid,
                peer,
                peer_addrs,
                crate::storage::HolderDirection::Sent,
            );
        }
    }
    sent
}
/// Send blob delete notices to all known holders of a file.
/// The second argument is an unused placeholder kept only for signature
/// stability; the flat-holder model has no separate upstream role.
/// Returns the number of holders successfully notified.
pub async fn send_blob_delete_notices(
    &self,
    cid: &[u8; 32],
    holders: &[(NodeId, Vec<String>)],
    _legacy_upstream: Option<&(NodeId, Vec<String>)>,
) -> usize {
    // upstream_node stays None on the wire; receivers ignore it in the
    // flat model.
    let notice = crate::protocol::BlobDeleteNoticePayload {
        cid: *cid,
        upstream_node: None,
    };
    let mut notified = 0;
    for (holder, _addrs) in holders {
        if self
            .send_to_peer_uni(holder, MessageType::BlobDeleteNotice, &notice)
            .await
            .is_ok()
        {
            notified += 1;
        }
    }
    notified
}
/// Ask `upstream` whether it holds a manifest for `cid` newer than
/// `current_updated_at`. Yields `Some(manifest)` only when the peer
/// flags an update, `None` otherwise.
pub async fn request_manifest_refresh(
    &self,
    cid: &[u8; 32],
    upstream: &NodeId,
    current_updated_at: u64,
) -> anyhow::Result<Option<crate::types::CdnManifest>> {
    let request = crate::protocol::ManifestRefreshRequestPayload {
        cid: *cid,
        current_updated_at,
    };
    let reply: crate::protocol::ManifestRefreshResponsePayload = self
        .send_to_peer_bi(
            upstream,
            MessageType::ManifestRefreshRequest,
            &request,
            MessageType::ManifestRefreshResponse,
        )
        .await?;
    // Only surface the manifest when the peer explicitly reported an update.
    Ok(if reply.updated { reply.manifest } else { None })
}
/// Ask `target` to add us to their audience. Uni-stream; rides a
/// persistent connection if one exists, otherwise an ephemeral one.
pub async fn send_audience_request(&self, target: &NodeId) -> anyhow::Result<()> {
    let request = AudienceRequestPayload {
        requester: self.our_node_id,
    };
    self.send_to_peer_uni(target, MessageType::AudienceRequest, &request)
        .await
}
/// Answer a peer's audience request with approval or denial.
/// Uni-stream; persistent connection if available, ephemeral otherwise.
pub async fn send_audience_response(&self, target: &NodeId, approved: bool) -> anyhow::Result<()> {
    let reply = AudienceResponsePayload {
        responder: self.our_node_id,
        approved,
    };
    self.send_to_peer_uni(target, MessageType::AudienceResponse, &reply)
        .await
}
/// Push a public post to every audience member (persistent connection
/// when available, ephemeral otherwise). Non-public posts are never
/// pushed. Returns the number of members reached.
pub async fn push_to_audience(
    &self,
    post_id: &crate::types::PostId,
    post: &Post,
    visibility: &PostVisibility,
) -> usize {
    // Only Public posts fan out to the audience.
    if !matches!(visibility, PostVisibility::Public) {
        return 0;
    }
    let members: Vec<NodeId> = match self.storage.get().await.list_audience_members() {
        Ok(m) => m,
        Err(_) => return 0,
    };
    let payload = PostPushPayload {
        post: SyncPost {
            id: *post_id,
            post: post.clone(),
            visibility: visibility.clone(),
        },
    };
    let mut reached = 0;
    for member in &members {
        if self
            .send_to_peer_uni(member, MessageType::PostPush, &payload)
            .await
            .is_ok()
        {
            reached += 1;
        }
    }
    reached
}
/// Deliver a group key to one specific peer over a uni-stream.
/// Returns true when the send succeeded.
pub async fn push_group_key(
    &self,
    peer: &NodeId,
    payload: &crate::protocol::GroupKeyDistributePayload,
) -> bool {
    let outcome = self
        .send_to_peer_uni(peer, MessageType::GroupKeyDistribute, payload)
        .await;
    outcome.is_ok()
}
/// Send a social checkin to a peer and return their checkin reply.
///
/// Uses the persistent mesh connection when one exists; otherwise falls
/// back to an ephemeral bi-stream via `send_to_peer_bi`. Both request
/// and reply use the `SocialCheckin` message type.
///
/// Improvement: the checkin payload (our node id + addresses) was
/// previously constructed twice — once per branch — with identical
/// contents; it is now built a single time up front.
pub async fn send_social_checkin(
    &self,
    peer_id: &NodeId,
    our_addresses: &[String],
    our_peer_addresses: &[crate::types::PeerWithAddress],
) -> anyhow::Result<crate::protocol::SocialCheckinPayload> {
    let payload = crate::protocol::SocialCheckinPayload {
        node_id: self.our_node_id,
        addresses: our_addresses.to_vec(),
        peer_addresses: our_peer_addresses.to_vec(),
    };
    // Persistent path — fetch the connection first so no lock is held
    // during the request/reply round trip.
    if let Some(conn) = self.conn_handle.get_connection(peer_id).await {
        let (mut send, mut recv) = conn.open_bi().await?;
        write_typed_message(&mut send, MessageType::SocialCheckin, &payload).await?;
        send.finish()?;
        let msg_type = read_message_type(&mut recv).await?;
        if msg_type != MessageType::SocialCheckin {
            anyhow::bail!("expected SocialCheckin reply, got {:?}", msg_type);
        }
        let reply: crate::protocol::SocialCheckinPayload =
            read_payload(&mut recv, 64 * 1024 * 1024).await?;
        return Ok(reply);
    }
    // Ephemeral fallback.
    self.send_to_peer_bi(
        peer_id,
        MessageType::SocialCheckin,
        &payload,
        MessageType::SocialCheckin,
    )
    .await
}
/// Fan-out "worm" search for a peer that lies beyond our N3 horizon.
/// Delegates to the connection handle.
pub async fn worm_lookup(&self, target: &NodeId) -> anyhow::Result<Option<WormResult>> {
    let handle = &self.conn_handle;
    handle.worm_lookup(target).await
}
/// Content-aware worm search: locate a post or blob somewhere in the
/// network. Delegates to the connection handle.
pub async fn content_search(
    &self,
    target: &NodeId,
    post_id: Option<PostId>,
    blob_id: Option<[u8; 32]>,
) -> anyhow::Result<Option<WormResult>> {
    let handle = &self.conn_handle;
    handle.content_search(target, post_id, blob_id).await
}
/// Fetch one post from a specific holder peer.
/// Delegates to the connection handle.
pub async fn post_fetch(
    &self,
    holder: &NodeId,
    post_id: &PostId,
) -> anyhow::Result<Option<crate::protocol::SyncPost>> {
    let handle = &self.conn_handle;
    handle.post_fetch(holder, post_id).await
}
/// Ask a holder peer to open a NAT pinhole toward a browser's IP so the
/// browser can fetch a post directly. Delegates to the connection handle.
pub async fn tcp_punch(
    &self,
    holder: &NodeId,
    browser_ip: String,
    post_id: &PostId,
) -> anyhow::Result<Option<String>> {
    let handle = &self.conn_handle;
    handle.tcp_punch(holder, browser_ip, post_id).await
}
/// True when a mesh connection to `peer_id` is currently active.
pub async fn is_connected(&self, peer_id: &NodeId) -> bool {
    let handle = &self.conn_handle;
    handle.is_connected(peer_id).await
}
/// True when an active (non-mesh) session exists with `peer_id`.
pub async fn has_session(&self, peer_id: &NodeId) -> bool {
    let handle = &self.conn_handle;
    handle.has_session(peer_id).await
}
/// NodeIds of every currently connected mesh peer.
pub async fn connected_peers(&self) -> Vec<NodeId> {
    let handle = &self.conn_handle;
    handle.connected_peers().await
}
/// Per-connection display info: peer id, slot kind, and an activity
/// timestamp/counter (see the handle for its exact meaning).
pub async fn connection_info(&self) -> Vec<(NodeId, PeerSlotKind, u64)> {
    let handle = &self.conn_handle;
    handle.connection_info().await
}
/// Number of active mesh connections (delegates to the connection handle).
pub async fn connection_count(&self) -> usize {
    self.conn_handle.connection_count().await
}
/// True when a mesh connection to `peer_id` is currently active.
/// NOTE(review): behaviorally identical to `is_connected` — kept as a
/// separate public alias; consider consolidating call sites.
pub async fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
    self.conn_handle.is_connected(peer_id).await
}
/// True when the peer is reachable over either a mesh connection or a
/// session connection.
pub async fn is_peer_connected_or_session(&self, peer_id: &NodeId) -> bool {
    let handle = &self.conn_handle;
    handle.is_connected_or_session(peer_id).await
}
/// NodeIds of every peer we currently hold a session connection with.
pub async fn session_peer_ids(&self) -> Vec<NodeId> {
    let handle = &self.conn_handle;
    handle.session_peer_ids().await
}
/// Connect to an anchor peer, falling back to a session connection when
/// the anchor's mesh slots are full. Unlike `connect_to_peer()`, a
/// "mesh refused" outcome still leaves a usable session, which is what
/// anchor registration and referral requests need.
pub async fn connect_to_anchor(
    &self,
    peer_id: NodeId,
    addr: iroh::EndpointAddr,
) -> anyhow::Result<()> {
    // Already reachable (mesh or session)? Nothing to do.
    if self.conn_handle.is_connected_or_session(&peer_id).await {
        return Ok(());
    }
    let mesh_attempt = self.connect_to_peer(peer_id, addr.clone()).await;
    match mesh_attempt {
        Ok(()) => Ok(()),
        // NOTE(review): refusal detection substring-matches the error
        // text — fragile if the message wording ever changes.
        Err(e) if e.to_string().contains("mesh refused") => {
            // Anchor refused mesh — reconnect as a session so
            // registration can still proceed.
            let conn = ConnectionManager::connect_to_unlocked(&self.endpoint, addr).await?;
            self.conn_handle
                .add_session(peer_id, conn, crate::types::SessionReachMethod::Direct, None)
                .await;
            self.conn_handle.log_activity(
                ActivityLevel::Info,
                ActivityCategory::Anchor,
                format!("Anchor {} mesh full, using session", &hex::encode(peer_id)[..8]),
                Some(peer_id),
            );
            Ok(())
        }
        Err(e) => Err(e),
    }
}
/// Register an already-established QUIC connection as a mesh peer.
/// Sends InitialExchange first — if the remote accepts (responds with
/// InitialExchange), registers as mesh and spawns the stream loop.
/// If the remote refuses (RefuseRedirect), falls back to session.
/// Used by the growth loop after a successful hole punch.
///
/// Errors: meshing with self, a failed exchange, or (deliberately) a
/// refusal — the refusal error signals the caller that the connection
/// was kept, but only as a session.
pub async fn register_as_mesh(
    &self,
    peer_id: NodeId,
    conn: iroh::endpoint::Connection,
) -> anyhow::Result<()> {
    if peer_id == self.our_node_id {
        anyhow::bail!("cannot mesh with self");
    }
    let anchor_addr = self.conn_handle.build_anchor_advertised_addr().await;
    let our_nat_type = self.conn_handle.nat_type().await;
    match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, peer_id, anchor_addr, our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await {
        Ok(ExchangeResult::Accepted { .. }) => {
            // Order matters: register the connection first, then persist
            // the mesh-peer record, then spawn the stream loop.
            self.conn_handle.register_connection(peer_id, conn.clone(), vec![], PeerSlotKind::Local).await;
            {
                // Scoped so the storage guard drops before further awaits.
                let s = self.storage.get().await;
                let _ = s.add_mesh_peer(&peer_id, PeerSlotKind::Local, 0);
            }
            // Spawn the per-connection stream loop
            // (re-fetch the map to obtain the shared last_activity handle
            // that register_connection created).
            let conn_data = self.conn_handle.get_connection_map().await;
            if let Some((_, conn, _, last_activity)) = conn_data.into_iter().find(|(nid, _, _, _)| *nid == peer_id) {
                let conn_mgr = Arc::clone(&self.conn_mgr);
                tokio::spawn(async move {
                    ConnectionManager::run_mesh_streams(conn_mgr, conn, peer_id, last_activity).await;
                });
            }
            Ok(())
        }
        Ok(ExchangeResult::Refused { redirect }) => {
            // Keep the hole-punched connection as a session rather than
            // dropping it; report refusal to the caller via the error.
            let redir_info = redirect.as_ref().map(|r| r.n.clone());
            info!(peer = hex::encode(peer_id), redirect = ?redir_info, "Mesh refused after hole punch, keeping as session");
            self.conn_handle.add_session(peer_id, conn, crate::types::SessionReachMethod::HolePunch, None).await;
            anyhow::bail!("mesh refused: slots full");
        }
        Err(e) => Err(e),
    }
}
/// Probe every mesh connection after an app resume (e.g. a phone waking
/// from sleep). Each peer gets a keepalive with a 3-second timeout;
/// connections that fail are removed immediately, after which recovery
/// (everything dead) or growth (some alive) is signaled.
/// Returns how many connections were removed.
pub async fn wake_health_check(&self) -> usize {
    use crate::protocol::MessageType;
    let snapshot: Vec<(NodeId, iroh::endpoint::Connection)> = self
        .conn_handle
        .get_connection_map()
        .await
        .into_iter()
        .map(|(nid, conn, _, _)| (nid, conn))
        .collect();
    if snapshot.is_empty() {
        return 0;
    }
    let total = snapshot.len();
    self.log_activity(
        ActivityLevel::Info,
        ActivityCategory::Connection,
        format!("Wake health check: probing {} connections", total),
        None,
    );
    let mut failed: Vec<NodeId> = Vec::new();
    for (nid, conn) in &snapshot {
        // A recorded close reason means the connection is already gone —
        // no need to probe it.
        if conn.close_reason().is_some() {
            failed.push(*nid);
            continue;
        }
        let probe = async {
            let mut send = conn.open_uni().await?;
            send.write_all(&[MessageType::MeshKeepalive.as_byte()]).await?;
            send.finish()?;
            anyhow::Ok(())
        };
        let alive = matches!(
            tokio::time::timeout(std::time::Duration::from_secs(3), probe).await,
            Ok(Ok(()))
        );
        if !alive {
            failed.push(*nid);
        }
    }
    for nid in &failed {
        self.conn_handle.disconnect_peer(nid).await;
    }
    let removed = failed.len();
    let remaining = total - removed;
    self.log_activity(
        ActivityLevel::Info,
        ActivityCategory::Connection,
        format!("Wake health check: {removed} dead, {remaining} alive"),
        None,
    );
    if remaining == 0 {
        self.conn_handle.notify_recovery();
    } else {
        self.conn_handle.notify_growth();
    }
    removed
}
/// Rebalance connection slots and spawn stream loops for new connections.
///
/// Flow: the handle (dis)connects peers per slot policy; each fresh
/// connection then runs InitialExchange (outside the conn_mgr lock);
/// finally a per-connection mesh stream loop is spawned for each peer
/// that is still connected.
pub async fn rebalance(&self) -> anyhow::Result<()> {
    let newly_connected = self.conn_handle.rebalance_slots().await?;
    // Do initial exchange for newly connected (outside conn_mgr lock)
    let anchor_addr = self.conn_handle.build_anchor_advertised_addr().await;
    let our_nat_type = self.conn_handle.nat_type().await;
    for peer_id in &newly_connected {
        let conn = self.conn_handle.get_connection(peer_id).await;
        if let Some(conn) = conn {
            match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, *peer_id, anchor_addr.clone(), our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await {
                Ok(ExchangeResult::Accepted { .. }) => {}
                Ok(ExchangeResult::Refused { redirect }) => {
                    debug!(peer = hex::encode(peer_id), "Auto-connect refused, disconnecting");
                    self.conn_handle.disconnect_peer(peer_id).await;
                    // A refusal may carry a redirect peer: persist that
                    // peer's parseable addresses so later attempts can
                    // try it instead.
                    if let Some(ref redir) = redirect {
                        if let Ok(redir_bytes) = hex::decode(&redir.n) {
                            if let Ok(redir_id) = <[u8; 32]>::try_from(redir_bytes.as_slice()) {
                                let addrs: Vec<SocketAddr> = redir.a.iter()
                                    .filter_map(|a| a.parse::<SocketAddr>().ok())
                                    .collect();
                                let _ = self.storage.get().await.upsert_peer(&redir_id, &addrs, None);
                            }
                        }
                    }
                }
                Err(e) => {
                    // Exchange failure is non-fatal for the rebalance as
                    // a whole; the connection is left to its own fate.
                    debug!(peer = hex::encode(peer_id), error = %e, "Auto-connect initial exchange failed");
                }
            }
        }
    }
    // Spawn run_mesh_streams for each newly connected peer
    // (the map is re-fetched so refused/disconnected peers are absent).
    let conn_data = self.conn_handle.get_connection_map().await;
    for peer_id in &newly_connected {
        if let Some((_, conn, _, last_activity)) = conn_data.iter().find(|(nid, _, _, _)| nid == peer_id) {
            let conn = conn.clone();
            let last_activity = Arc::clone(last_activity);
            let conn_mgr = Arc::clone(&self.conn_mgr);
            let pid = *peer_id;
            tokio::spawn(async move {
                ConnectionManager::run_mesh_streams(conn_mgr, conn, pid, last_activity).await;
            });
        }
    }
    Ok(())
}
// ---- Reactive growth loop ----
/// Install the growth-loop signal sender on both this handle and the
/// underlying ConnectionManager.
pub async fn set_growth_tx(&self, tx: tokio::sync::mpsc::Sender<()>) {
    self.conn_handle.set_growth_tx(tx.clone()).await;
    let mut slot = self.growth_tx.lock().await;
    *slot = Some(tx);
}
/// Install the recovery-loop signal sender on the ConnectionManager.
pub async fn set_recovery_tx(&self, tx: tokio::sync::mpsc::Sender<()>) {
    let handle = &self.conn_handle;
    handle.set_recovery_tx(tx).await;
}
/// Nudge the growth loop awake. Best-effort: the signal is silently
/// dropped if the channel is full or no sender is installed yet.
pub async fn notify_growth(&self) {
    let guard = self.growth_tx.lock().await;
    if let Some(tx) = guard.as_ref() {
        let _ = tx.try_send(());
    }
}
/// Run the growth loop: wakes on signal, sequentially connects to the most
/// diverse N2 candidate, then re-scores with updated knowledge. Each new
/// connection triggers InitialExchange which updates N2/N3 tables, so the
/// next pick uses that expanded view.
///
/// Inner-loop exit conditions: no free local slots, no candidates left,
/// or three consecutive failures (back-off). The outer loop only ends
/// when the signal channel closes.
pub async fn run_growth_loop(
    self: Arc<Self>,
    mut rx: tokio::sync::mpsc::Receiver<()>,
) {
    loop {
        // Block until signaled
        if rx.recv().await.is_none() {
            break; // Channel closed
        }
        // Drain any queued signals (coalesce)
        while rx.try_recv().is_ok() {}
        let mut consecutive_failures: u32 = 0;
        loop {
            // Check slots + pick candidate via ConnHandle (no lock contention)
            let available = self.conn_handle.available_local_slots().await;
            if available == 0 {
                debug!("Growth loop: local slots full");
                self.log_activity(ActivityLevel::Info, ActivityCategory::Growth, "Local slots full".into(), None);
                break;
            }
            // Take only the top-scored candidate; the loop re-scores on
            // every pass so later picks reflect newly learned topology.
            let candidate = {
                let scored = self.conn_handle.score_n2_candidates().await;
                scored.into_iter().next().map(|(nid, score)| (nid, score))
            };
            let (candidate_id, score) = match candidate {
                // NOTE(review): this `continue` neither bumps
                // consecutive_failures nor skips past self in the scored
                // list — if the scorer ever keeps returning our own id
                // first, this inner loop busy-spins. Confirm the scorer
                // excludes self.
                Some((nid, score)) if nid == self.our_node_id => {
                    debug!("Growth loop: skipping self as candidate");
                    continue;
                }
                Some(c) => c,
                None => {
                    debug!("Growth loop: no N2 candidates available");
                    self.log_activity(ActivityLevel::Info, ActivityCategory::Growth, "No N2 candidates available".into(), None);
                    break;
                }
            };
            debug!(
                peer = hex::encode(candidate_id),
                score = format!("{:.2}", score),
                "Growth loop: trying diverse peer"
            );
            self.log_activity(ActivityLevel::Info, ActivityCategory::Growth, format!("Trying candidate {} (score {:.1})", &hex::encode(candidate_id)[..8], score), Some(candidate_id));
            // Resolve address via ConnHandle (no lock during I/O)
            let addr_str = {
                let local_addr = self.conn_handle.resolve_peer_addr_local(&candidate_id).await;
                if let Some(endpoint_addr) = local_addr {
                    endpoint_addr.ip_addrs().next().map(|a| a.to_string())
                } else {
                    // Network resolution: get reporter connections, resolve outside lock.
                    // Reporters are connected peers that listed the candidate
                    // in their N2/N3 — they may know its address.
                    let reporters_and_conns = {
                        let storage = self.storage.get().await;
                        let n2 = storage.find_in_n2(&candidate_id).unwrap_or_default();
                        let n3 = storage.find_in_n3(&candidate_id).unwrap_or_default();
                        drop(storage);
                        let conn_map = self.conn_handle.get_connection_map().await;
                        let reporter_set: std::collections::HashSet<NodeId> = n2.into_iter().chain(n3).collect();
                        conn_map.into_iter()
                            .filter(|(nid, _, _, _)| reporter_set.contains(nid))
                            .map(|(_, conn, _, _)| conn)
                            .collect::<Vec<_>>()
                    };
                    // Ask each reporter in turn; first answer wins.
                    let mut resolved = None;
                    for conn in reporters_and_conns {
                        let result: anyhow::Result<Option<String>> = async {
                            let (mut send, mut recv) = conn.open_bi().await?;
                            let req = crate::protocol::AddressRequestPayload { target: candidate_id };
                            write_typed_message(&mut send, MessageType::AddressRequest, &req).await?;
                            send.finish()?;
                            let _resp_type = read_message_type(&mut recv).await?;
                            let resp: crate::protocol::AddressResponsePayload =
                                read_payload(&mut recv, 4096).await?;
                            Ok(resp.address)
                        }.await;
                        if let Ok(Some(addr)) = result {
                            resolved = Some(addr);
                            break;
                        }
                    }
                    resolved
                }
            };
            let addr_str = match addr_str {
                Some(a) => a,
                None => {
                    debug!(
                        peer = hex::encode(candidate_id),
                        "Growth loop: no address, marking unreachable"
                    );
                    self.log_activity(ActivityLevel::Warn, ActivityCategory::Growth, format!("No address for {}", &hex::encode(candidate_id)[..8]), Some(candidate_id));
                    self.conn_handle.mark_unreachable(&candidate_id);
                    consecutive_failures += 1;
                    if consecutive_failures >= 3 {
                        debug!("Growth loop: 3 consecutive failures, backing off");
                        self.log_activity(ActivityLevel::Warn, ActivityCategory::Growth, "3 failures, backing off".into(), None);
                        break;
                    }
                    continue;
                }
            };
            // Build EndpointAddr and connect
            let endpoint_id = match iroh::EndpointId::from_bytes(&candidate_id) {
                Ok(eid) => eid,
                Err(_) => {
                    // Malformed candidate id counts as a failure too.
                    consecutive_failures += 1;
                    if consecutive_failures >= 3 { break; }
                    continue;
                }
            };
            let mut addr = iroh::EndpointAddr::from(endpoint_id);
            if let Ok(sock) = addr_str.parse::<SocketAddr>() {
                addr = addr.with_ip_addr(sock);
            }
            match self.connect_to_peer(candidate_id, addr).await {
                Ok(()) => {
                    info!(
                        peer = hex::encode(candidate_id),
                        score = format!("{:.2}", score),
                        "Growth loop: connected to diverse peer"
                    );
                    self.log_activity(ActivityLevel::Info, ActivityCategory::Growth, format!("Connected directly (score {:.1})", score), Some(candidate_id));
                    consecutive_failures = 0;
                    // Broadcast diff so peers learn about our new connection
                    let _ = self.broadcast_diff().await;
                    // Brief pause to let InitialExchange update N2/N3 before next pick
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                }
                Err(e) => {
                    warn!(
                        peer = hex::encode(candidate_id),
                        error = %e,
                        "Growth loop: direct connect failed, trying introduction"
                    );
                    self.log_activity(ActivityLevel::Warn, ActivityCategory::Growth, format!("Direct connect failed: {}", e), Some(candidate_id));
                    // Find N2 reporter(s) who told us about this peer — they can introduce us
                    let reporters = {
                        let storage = self.storage.get().await;
                        storage.find_in_n2(&candidate_id).unwrap_or_default()
                    };
                    let mut introduced = false;
                    for reporter in reporters {
                        // Reporter must be a connected peer (our N1)
                        if !self.conn_handle.is_connected(&reporter).await {
                            continue;
                        }
                        info!(
                            peer = hex::encode(candidate_id),
                            introducer = hex::encode(reporter),
                            "Growth loop: requesting introduction"
                        );
                        self.log_activity(ActivityLevel::Info, ActivityCategory::Growth, format!("Requesting introduction via {}", &hex::encode(reporter)[..8]), Some(candidate_id));
                        match self.connect_via_introduction_as_mesh(candidate_id, reporter).await {
                            Ok(()) => {
                                info!(
                                    peer = hex::encode(candidate_id),
                                    score = format!("{:.2}", score),
                                    "Growth loop: mesh connected via introduction"
                                );
                                self.log_activity(ActivityLevel::Info, ActivityCategory::Growth, format!("Mesh connected via introduction (score {:.1})", score), Some(candidate_id));
                                introduced = true;
                                break;
                            }
                            Err(e) => {
                                warn!(
                                    peer = hex::encode(candidate_id),
                                    introducer = hex::encode(reporter),
                                    error = %e,
                                    "Growth loop: introduction failed"
                                );
                                self.log_activity(ActivityLevel::Warn, ActivityCategory::Growth, format!("Introduction failed: {}", e), Some(candidate_id));
                            }
                        }
                    }
                    if introduced {
                        consecutive_failures = 0;
                        let _ = self.broadcast_diff().await;
                        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    } else {
                        self.conn_handle.mark_unreachable(&candidate_id);
                        consecutive_failures += 1;
                        if consecutive_failures >= 3 {
                            debug!("Growth loop: 3 consecutive failures, backing off");
                            self.log_activity(ActivityLevel::Warn, ActivityCategory::Growth, "3 failures, backing off".into(), None);
                            break;
                        }
                    }
                }
            }
        }
    }
}
// ---- Audience-targeted + ephemeral helpers ----
/// Fan a uni-stream message out to every audience member (persistent
/// connection when available, ephemeral otherwise). Returns the number
/// of successful sends; a storage failure yields 0.
async fn send_to_audience<T: Serialize>(&self, msg_type: MessageType, payload: &T) -> usize {
    let members: Vec<NodeId> = match self.storage.get().await.list_audience_members() {
        Ok(m) => m,
        Err(_) => return 0,
    };
    let mut delivered = 0;
    for member in &members {
        if self.send_to_peer_uni(member, msg_type, payload).await.is_ok() {
            delivered += 1;
        }
    }
    delivered
}
/// Pull posts from a peer (persistent if available, ephemeral otherwise).
///
/// Protocol v4 flow: send our follow list plus per-follow `since_ms`
/// watermarks, receive new posts and visibility updates. A post is
/// stored only if it isn't locally deleted and its id verifies against
/// the content; each accepted author's sync watermark is advanced to
/// "now". Visibility updates are applied only when the updater is the
/// post's author.
///
/// Errors: transport/framing failures, an unexpected reply type, or a
/// storage error mid-loop abort the pull via `?`.
pub async fn pull_from_peer(&self, peer_id: &NodeId) -> anyhow::Result<PullStats> {
    let conn = self.get_connection(peer_id).await?;
    // Read follow state up front so the storage guard drops before I/O.
    let (our_follows, follows_sync) = {
        let storage = self.storage.get().await;
        (
            storage.list_follows()?,
            storage.get_follows_with_last_sync().unwrap_or_default(),
        )
    };
    let (mut send, mut recv) = conn.open_bi().await?;
    write_typed_message(
        &mut send,
        MessageType::PullSyncRequest,
        &PullSyncRequestPayload {
            follows: our_follows,
            have_post_ids: vec![], // v4: empty, using since_ms instead
            since_ms: follows_sync,
        },
    )
    .await?;
    send.finish()?;
    let msg_type = read_message_type(&mut recv).await?;
    if msg_type != MessageType::PullSyncResponse {
        anyhow::bail!("expected PullSyncResponse, got {:?}", msg_type);
    }
    // Generous 64 MiB cap — a backlog of posts can be sizeable.
    let response: PullSyncResponsePayload = read_payload(&mut recv, 64 * 1024 * 1024).await?;
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let storage = self.storage.get().await;
    let mut posts_received = 0;
    let mut vis_updates = 0;
    for sp in &response.posts {
        // Skip tombstoned posts and anything whose id fails verification.
        if !storage.is_deleted(&sp.id)? && verify_post_id(&sp.id, &sp.post) {
            if storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility)? {
                posts_received += 1;
            }
            // Protocol v4: update last_sync_ms for the author
            let _ = storage.update_follow_last_sync(&sp.post.author, now_ms);
        }
    }
    for vu in response.visibility_updates {
        // Only the original author may change a post's visibility.
        if let Some(post) = storage.get_post(&vu.post_id)? {
            if post.author == vu.author {
                if storage.update_post_visibility(&vu.post_id, &vu.visibility)? {
                    vis_updates += 1;
                }
            }
        }
    }
    Ok(PullStats {
        peers_pulled: 1,
        posts_received,
        visibility_updates: vis_updates,
    })
}
/// Fire one typed message at a peer over a uni-stream. Reuses a
/// persistent connection when one exists; otherwise `get_connection`
/// dials ephemerally.
pub async fn send_to_peer_uni<T: Serialize>(
    &self,
    peer_id: &NodeId,
    msg_type: MessageType,
    payload: &T,
) -> anyhow::Result<()> {
    let conn = self.get_connection(peer_id).await?;
    let mut stream = conn.open_uni().await?;
    write_typed_message(&mut stream, msg_type, payload).await?;
    stream.finish()?;
    Ok(())
}
/// Send a typed request over a bi-stream and decode the typed response.
/// Fails if the peer answers with a different message type than expected.
pub async fn send_to_peer_bi<T: Serialize, R: DeserializeOwned>(
    &self,
    peer_id: &NodeId,
    msg_type: MessageType,
    payload: &T,
    expected_response: MessageType,
) -> anyhow::Result<R> {
    let conn = self.get_connection(peer_id).await?;
    let (mut send, mut recv) = conn.open_bi().await?;
    write_typed_message(&mut send, msg_type, payload).await?;
    send.finish()?;
    let got = read_message_type(&mut recv).await?;
    if got != expected_response {
        anyhow::bail!("expected {:?}, got {:?}", expected_response, got);
    }
    // Response bodies are capped at 10 MiB.
    let body: R = read_payload(&mut recv, 10 * 1024 * 1024).await?;
    Ok(body)
}
/// Ask a peer to hold the given posts; returns the subset of IDs they
/// accepted. The entire round trip is bounded by a 10-second timeout.
pub async fn send_replication_request(
    &self,
    peer_id: &NodeId,
    post_ids: Vec<PostId>,
    priority: u8,
) -> anyhow::Result<Vec<PostId>> {
    use crate::protocol::{ReplicationRequestPayload, ReplicationResponsePayload};
    let request = ReplicationRequestPayload { post_ids, priority };
    let exchange = self.send_to_peer_bi::<_, ReplicationResponsePayload>(
        peer_id,
        MessageType::ReplicationRequest,
        &request,
        MessageType::ReplicationResponse,
    );
    let response = tokio::time::timeout(std::time::Duration::from_secs(10), exchange)
        .await
        .map_err(|_| anyhow::anyhow!("replication request timed out"))??;
    Ok(response.accepted)
}
/// Fetch a blob by CID from one peer, discarding the CDN metadata.
/// Returns `None` when the peer reports the blob as not found.
pub async fn fetch_blob(
    &self,
    cid: &[u8; 32],
    from_peer: &NodeId,
) -> anyhow::Result<Option<Vec<u8>>> {
    let (data, _meta) = self.fetch_blob_full(cid, from_peer).await?;
    Ok(data)
}
/// Fetch a blob from a peer, returning both the (hash-verified) bytes
/// and the full response so callers can use manifest / CDN fields.
/// The data slot is `None` when the peer reports the blob as not found.
pub async fn fetch_blob_full(
    &self,
    cid: &[u8; 32],
    from_peer: &NodeId,
) -> anyhow::Result<(Option<Vec<u8>>, BlobResponsePayload)> {
    use base64::Engine;
    let conn = self.get_connection(from_peer).await?;
    let (mut send, mut recv) = conn.open_bi().await?;
    let request = BlobRequestPayload {
        cid: *cid,
        requester_addresses: self.our_addresses(),
    };
    write_typed_message(&mut send, MessageType::BlobRequest, &request).await?;
    send.finish()?;
    let reply_type = read_message_type(&mut recv).await?;
    if reply_type != MessageType::BlobResponse {
        anyhow::bail!("expected BlobResponse, got {:?}", reply_type);
    }
    // 15MB limit for base64 overhead on 10MB blobs + manifest
    let response: BlobResponsePayload = read_payload(&mut recv, 15 * 1024 * 1024).await?;
    if !response.found {
        return Ok((None, response));
    }
    let bytes = base64::engine::general_purpose::STANDARD
        .decode(&response.data_b64)
        .map_err(|e| anyhow::anyhow!("invalid base64 in blob response: {}", e))?;
    // Refuse any payload whose hash doesn't match the requested CID.
    if !crate::blob::verify_blob(cid, &bytes) {
        anyhow::bail!("blob CID mismatch from peer {}", hex::encode(from_peer));
    }
    Ok((Some(bytes), response))
}
/// Get a connection to a peer — mesh, session, direct (if reachable), then relay.
///
/// Escalation order (stops at the first success):
/// 1. existing mesh connection
/// 2. existing session connection (touching its activity timestamp)
/// 3. fresh direct connect (skipped when the peer is marked unreachable)
/// 4. re-check for a connection the peer may have opened toward us
/// 5. relay introduction + hole punch, per relay candidate
/// Errors only after every path is exhausted.
async fn get_connection(&self, peer_id: &NodeId) -> anyhow::Result<iroh::endpoint::Connection> {
    // 1. Check mesh connections
    if let Some(conn) = self.conn_handle.get_connection(peer_id).await {
        return Ok(conn);
    }
    // 2. Check session connections (touch last_active_at)
    if let Some(conn) = self.conn_handle.get_any_connection(peer_id).await {
        self.conn_handle.touch_session(peer_id).await;
        return Ok(conn);
    }
    // 3. Try direct connect — skip if peer is known unreachable
    if !self.conn_handle.is_likely_unreachable(peer_id).await {
        let addr = self.conn_handle.resolve_peer_addr_local(peer_id).await;
        if let Some(addr) = addr {
            // Bounded at 5s so an unresponsive address doesn't stall the
            // whole cascade.
            match tokio::time::timeout(
                std::time::Duration::from_secs(5),
                self.endpoint.connect(addr, ALPN_V2),
            ).await {
                Ok(Ok(conn)) => {
                    self.conn_handle.mark_reachable(peer_id);
                    return Ok(conn);
                }
                Ok(Err(e)) => {
                    debug!(peer = hex::encode(peer_id), error = %e, "Direct ephemeral connect failed, trying relay");
                    self.conn_handle.mark_unreachable(peer_id);
                }
                Err(_) => {
                    debug!(peer = hex::encode(peer_id), "Direct ephemeral connect timed out, trying relay");
                    self.conn_handle.mark_unreachable(peer_id);
                }
            }
        }
    }
    // 4. Re-check connection — peer may have connected to us while we were trying direct
    if let Some(conn) = self.conn_handle.get_any_connection(peer_id).await {
        debug!(peer = hex::encode(peer_id), "Peer connected to us while we were trying — using existing connection");
        return Ok(conn);
    }
    // 5. Try relay introduction + hole punch (no lock during I/O)
    let relay_candidates = self.conn_handle.find_relays_for(peer_id).await;
    if relay_candidates.is_empty() {
        anyhow::bail!(
            "cannot reach peer {} (no direct path and no relay candidates)",
            hex::encode(peer_id)
        );
    }
    for (relay_peer, ttl) in &relay_candidates {
        debug!(
            target = hex::encode(peer_id),
            relay = hex::encode(relay_peer),
            ttl,
            "get_connection: trying relay introduction"
        );
        let intro_result = tokio::time::timeout(
            std::time::Duration::from_secs(15),
            self.send_relay_introduce_standalone(relay_peer, peer_id, *ttl),
        ).await;
        // Re-check: peer may have connected while relay intro was in flight
        if let Some(conn) = self.conn_handle.get_any_connection(peer_id).await {
            debug!(peer = hex::encode(peer_id), "Peer connected during relay intro — using existing");
            return Ok(conn);
        }
        match intro_result {
            Ok(Ok(ref result)) if result.accepted => {
                let our_profile = self.conn_handle.our_nat_profile().await;
                // Prefer fresh NAT profile from relay response over stale stored profile
                let peer_profile = if result.nat_mapping.is_some() || result.nat_filtering.is_some() {
                    let mapping = result.nat_mapping.as_deref()
                        .map(crate::types::NatMapping::from_str_label)
                        .unwrap_or(crate::types::NatMapping::Unknown);
                    let filtering = result.nat_filtering.as_deref()
                        .map(crate::types::NatFiltering::from_str_label)
                        .unwrap_or(crate::types::NatFiltering::Unknown);
                    let fresh = crate::types::NatProfile::new(mapping, filtering);
                    // Persist the fresh profile for future attempts.
                    let s = self.storage.get().await;
                    let _ = s.set_peer_nat_profile(peer_id, &fresh);
                    fresh
                } else {
                    let s = self.storage.get().await;
                    s.get_peer_nat_profile(peer_id)
                };
                if let Some(conn) = crate::connection::hole_punch_with_scanning(&self.endpoint, peer_id, &result.target_addresses, our_profile, peer_profile).await {
                    // Keep the punched connection as a session for reuse.
                    self.conn_handle.add_session(*peer_id, conn.clone(), SessionReachMethod::HolePunch, None).await;
                    info!(peer = hex::encode(peer_id), "get_connection: connected via hole punch");
                    return Ok(conn);
                }
                debug!(
                    target = hex::encode(peer_id),
                    relay = hex::encode(relay_peer),
                    "Relay intro accepted but hole punch failed (all addrs)"
                );
            }
            Ok(Ok(result)) => {
                debug!(
                    target = hex::encode(peer_id),
                    relay = hex::encode(relay_peer),
                    reason = ?result.reject_reason,
                    "Relay intro rejected, trying next relay"
                );
            }
            Ok(Err(e)) => {
                debug!(relay = hex::encode(relay_peer), error = %e, "Relay intro failed, trying next relay");
            }
            Err(_) => {
                debug!(relay = hex::encode(relay_peer), "Relay intro timed out, trying next relay");
            }
        }
    }
    anyhow::bail!(
        "cannot reach peer {} (direct unreachable, {} relay candidates exhausted)",
        hex::encode(peer_id),
        relay_candidates.len()
    )
}
/// Build an `EndpointAddr` for a peer from addresses stored in the DB.
/// Returns `None` only when the peer id is not a valid endpoint id; a
/// peer with no stored record still yields a bare address.
pub async fn addr_from_storage(&self, peer_id: &NodeId) -> Option<iroh::EndpointAddr> {
    let endpoint_id = iroh::EndpointId::from_bytes(peer_id).ok()?;
    let base = iroh::EndpointAddr::from(endpoint_id);
    let storage = self.storage.get().await;
    let addr = match storage.get_peer_record(peer_id) {
        Ok(Some(record)) => record
            .addresses
            .iter()
            .fold(base, |acc, sock| acc.with_ip_addr(*sock)),
        _ => base,
    };
    Some(addr)
}
// ---- Anchor referral delegation ----
/// Ask an anchor peer (over mesh or session) for peer referrals.
/// The conn_mgr lock is not held during network I/O.
pub async fn request_anchor_referrals(
    &self,
    anchor: &NodeId,
) -> anyhow::Result<Vec<crate::protocol::AnchorReferral>> {
    let conn = match self.conn_handle.get_any_connection(anchor).await {
        Some(c) => c,
        None => anyhow::bail!("anchor peer not connected (mesh or session)"),
    };
    let endpoint = self.conn_handle.endpoint().await;
    let our_addrs: Vec<String> = endpoint
        .addr()
        .ip_addrs()
        .map(|s| s.to_string())
        .collect();
    let request = crate::protocol::AnchorReferralRequestPayload {
        requester: self.our_node_id,
        requester_addresses: our_addrs,
    };
    let (mut send, mut recv) = conn.open_bi().await?;
    crate::protocol::write_typed_message(
        &mut send,
        crate::protocol::MessageType::AnchorReferralRequest,
        &request,
    )
    .await?;
    send.finish()?;
    let reply_type = crate::protocol::read_message_type(&mut recv).await?;
    if reply_type != crate::protocol::MessageType::AnchorReferralResponse {
        anyhow::bail!("expected AnchorReferralResponse, got {:?}", reply_type);
    }
    let response: crate::protocol::AnchorReferralResponsePayload =
        crate::protocol::read_payload(&mut recv, 4096).await?;
    // Touch the session so the idle reaper doesn't drop it.
    // NOTE(review): unlike touch_session(), this call is not awaited —
    // presumably a sync method on the handle; confirm.
    self.conn_handle.touch_session_if_exists(anchor);
    Ok(response.referrals)
}
/// Request a NAT filter probe from an anchor.
///
/// Determines our inbound NAT filtering: if the anchor can reach us back we
/// record `Open`, otherwise `PortRestricted`. (NOTE(review): despite the
/// older wording elsewhere, this code does not distinguish address-restricted
/// from port-restricted — unreachable always maps to `PortRestricted`.)
/// Does NOT hold conn_mgr lock during the network wait.
///
/// # Errors
/// Fails on missing connection, stream I/O errors, an unexpected reply
/// message type, or a 10-second response timeout.
pub async fn request_nat_filter_probe(&self, anchor: &NodeId) -> anyhow::Result<()> {
let conn = self.conn_handle.get_any_connection(anchor).await
.ok_or_else(|| anyhow::anyhow!("no connection to anchor"))?;
let payload = crate::protocol::NatFilterProbePayload {
node_id: self.our_node_id,
};
info!(anchor = hex::encode(anchor), "Sending NAT filter probe request");
let (mut send, mut recv) = conn.open_bi().await?;
crate::protocol::write_typed_message(&mut send, crate::protocol::MessageType::NatFilterProbe, &payload).await?;
send.finish()?;
info!("NAT filter probe request sent, waiting for response (up to 10s)");
// The anchor needs time to dial us back before answering, so give the
// whole read (type tag + payload) a 10s budget.
let result: crate::protocol::NatFilterProbeResultPayload = tokio::time::timeout(
std::time::Duration::from_secs(10),
async {
let msg_type = crate::protocol::read_message_type(&mut recv).await?;
if msg_type != crate::protocol::MessageType::NatFilterProbeResult {
anyhow::bail!("expected NatFilterProbeResult, got {:?}", msg_type);
}
crate::protocol::read_payload(&mut recv, 256).await
}
).await.map_err(|_| anyhow::anyhow!("NAT filter probe response timed out after 10s"))??;
// Reachable => endpoint-independent (open) filtering; otherwise assume
// the conservative port-restricted case.
let filtering = if result.reachable {
crate::types::NatFiltering::Open
} else {
crate::types::NatFiltering::PortRestricted
};
self.conn_handle.set_nat_filtering(filtering);
info!(
filtering = %filtering,
reachable = result.reachable,
"NAT filtering determined via anchor probe"
);
self.conn_handle.log_activity(
crate::activity::ActivityLevel::Info,
crate::activity::ActivityCategory::Connection,
format!("NAT filtering: {} (anchor probe)", filtering),
None,
);
Ok(())
}
/// Register our address with an anchor peer (mesh or session).
/// Also runs NAT filter probe if filtering is still Unknown.
/// Does NOT hold conn_mgr lock during network I/O.
///
/// Address ordering matters: the most dialable address goes first. The
/// UPnP external address is inserted at the front, then the stable anchor
/// advertised address is inserted in front of that, so the final order is
/// anchor-advertised, UPnP external, then local endpoint addresses.
///
/// Registration itself is fire-and-forget on a uni stream; errors only
/// surface from connection/stream setup.
pub async fn send_anchor_register(&self, anchor: &NodeId) -> anyhow::Result<()> {
let conn = self.conn_handle.get_any_connection(anchor).await
.ok_or_else(|| anyhow::anyhow!("anchor peer not connected (mesh or session)"))?;
let endpoint = self.conn_handle.endpoint().await;
let mut our_addrs: Vec<String> = endpoint.addr().ip_addrs()
.map(|s| s.to_string())
.collect();
// Prepend UPnP external address (most useful for remote peers)
if let Some(ext) = self.conn_handle.upnp_external_addr().await {
let ext_str = ext.to_string();
if !our_addrs.contains(&ext_str) {
our_addrs.insert(0, ext_str);
}
}
// Prepend stable anchor advertised address
if let Some(anchor_addr) = self.conn_handle.build_anchor_advertised_addr().await {
if !our_addrs.contains(&anchor_addr) {
our_addrs.insert(0, anchor_addr);
}
}
let payload = crate::protocol::AnchorRegisterPayload {
node_id: self.our_node_id,
addresses: our_addrs,
};
// Uni-directional stream: no acknowledgement is expected.
let mut send = conn.open_uni().await?;
crate::protocol::write_typed_message(&mut send, crate::protocol::MessageType::AnchorRegister, &payload).await?;
send.finish()?;
// Touch session to prevent idle reaping
self.conn_handle.touch_session_if_exists(anchor);
debug!(anchor = hex::encode(anchor), "Registered with anchor");
// Also run NAT filter probe if filtering is still Unknown
let needs_probe = self.conn_handle.nat_filtering().await == crate::types::NatFiltering::Unknown;
if needs_probe {
// Best-effort: a failed probe must not fail registration.
if let Err(e) = self.request_nat_filter_probe(anchor).await {
debug!(error = %e, "NAT filter probe request failed");
}
}
Ok(())
}
/// Check if a peer is a known anchor.
///
/// Any storage read error is treated as "not an anchor".
pub async fn is_anchor_peer(&self, node_id: &NodeId) -> bool {
    self.storage
        .get()
        .await
        .is_peer_anchor(node_id)
        .unwrap_or(false)
}
/// Ask a peer to introduce us to a target peer (matchmaking).
/// The introducer notifies the target so both sides can attempt simultaneous
/// hole punch. No byte relay — if hole punch fails, connection doesn't happen.
/// When `as_mesh` is true, registers the result as a mesh connection (with
/// initial exchange + stream loop). Otherwise registers as a session.
///
/// This variant registers the result as a session (`as_mesh = false`);
/// see [`Self::connect_via_introduction_as_mesh`] for the mesh variant.
pub async fn connect_via_introduction(
&self,
target: NodeId,
introducer: NodeId,
) -> anyhow::Result<()> {
self.connect_via_introduction_inner(target, introducer, false).await
}
/// Like connect_via_introduction but registers as mesh (initial exchange + stream loop).
/// Used by the growth loop where the goal is structural mesh diversity.
///
/// Thin wrapper: delegates with `as_mesh = true`.
pub async fn connect_via_introduction_as_mesh(
&self,
target: NodeId,
introducer: NodeId,
) -> anyhow::Result<()> {
self.connect_via_introduction_inner(target, introducer, true).await
}
/// Shared implementation for introduction-based connects.
///
/// Sequence: guard against self/already-connected, ask `introducer` to
/// notify `target` (15s budget), then attempt a hole punch against the
/// addresses the introducer returned (30s window inside
/// `hole_punch_with_scanning`). On success the connection is registered
/// either as mesh or as a hole-punched session depending on `as_mesh`.
async fn connect_via_introduction_inner(
&self,
target: NodeId,
introducer: NodeId,
as_mesh: bool,
) -> anyhow::Result<()> {
use crate::connection::hole_punch_with_scanning;
use crate::types::SessionReachMethod;
if target == self.our_node_id {
anyhow::bail!("cannot introduce to self");
}
// Already connected (mesh or session) — nothing to do.
if self.conn_handle.is_connected(&target).await {
return Ok(());
}
// The introduce round-trip gets its own 15s budget, separate from the
// hole-punch window below.
let result = tokio::time::timeout(
std::time::Duration::from_secs(15),
self.send_relay_introduce_standalone(&introducer, &target, 0),
).await;
let result = match result {
Ok(Ok(r)) if r.accepted => r,
Ok(Ok(r)) => anyhow::bail!("introduction rejected: {}", r.reject_reason.unwrap_or_default()),
Ok(Err(e)) => anyhow::bail!("introduction failed: {}", e),
Err(_) => anyhow::bail!("introduction timed out"),
};
info!(
target = hex::encode(target),
addrs = ?result.target_addresses,
"Introduction accepted, hole punching (30s window)"
);
// NAT profiles (ours + the target's stored one, if any) steer the
// punch strategy inside hole_punch_with_scanning.
let our_profile = self.conn_handle.our_nat_profile().await;
let peer_profile = {
let s = self.storage.get().await;
s.get_peer_nat_profile(&target)
};
match hole_punch_with_scanning(&self.endpoint, &target, &result.target_addresses, our_profile, peer_profile).await {
Some(conn) => {
if as_mesh {
// Mesh path: register_as_mesh runs the initial exchange +
// stream loop on top of the raw connection.
self.conn_handle.mark_reachable(&target);
self.register_as_mesh(target, conn).await?;
} else {
self.conn_handle.add_session(target, conn, SessionReachMethod::HolePunch, None).await;
self.conn_handle.mark_reachable(&target);
}
Ok(())
}
None => anyhow::bail!("hole punch failed after 30s"),
}
}
/// Send a relay introduce request (standalone — no lock during I/O).
///
/// Asks `relay_peer` to introduce us to `target`. We advertise only our
/// publicly routable addresses (with any UPnP external address first,
/// since it is the most likely to be dialable) plus our NAT profile so
/// both sides can attempt a simultaneous hole punch. `ttl` is forwarded
/// verbatim in the payload.
///
/// # Errors
/// Fails if the relay is not connected, stream I/O fails, or the relay
/// replies with an unexpected message type.
pub async fn send_relay_introduce_standalone(
    &self,
    relay_peer: &NodeId,
    target: &NodeId,
    ttl: u8,
) -> anyhow::Result<crate::protocol::RelayIntroduceResultPayload> {
    let conn = self.conn_handle.get_connection(relay_peer).await
        .ok_or_else(|| anyhow::anyhow!("relay peer not connected"))?;
    let intro_id: crate::connection::IntroId = rand::random();
    // Only addresses a remote peer could actually dial are worth sending.
    let mut our_addrs: Vec<String> = self.endpoint.addr().ip_addrs()
        .filter(|s| is_publicly_routable(s))
        .map(|s| s.to_string())
        .collect();
    if let Some(ref mapping) = self.upnp_mapping {
        let ext_str = mapping.external_addr.to_string();
        if !our_addrs.contains(&ext_str) {
            our_addrs.insert(0, ext_str);
        }
    }
    let our_profile = self.conn_handle.our_nat_profile().await;
    let payload = crate::protocol::RelayIntroducePayload {
        intro_id,
        target: *target,
        requester: self.our_node_id,
        requester_addresses: our_addrs,
        ttl,
        nat_mapping: Some(our_profile.mapping.to_string()),
        nat_filtering: Some(our_profile.filtering.to_string()),
    };
    let (mut send, mut recv) = conn.open_bi().await?;
    write_typed_message(&mut send, MessageType::RelayIntroduce, &payload).await?;
    send.finish()?;
    let msg_type = read_message_type(&mut recv).await?;
    if msg_type != MessageType::RelayIntroduceResult {
        anyhow::bail!("expected RelayIntroduceResult, got {:?}", msg_type);
    }
    // The result is small (accept flag, optional reject reason, a handful
    // of addresses). 64 KiB bounds the allocation a misbehaving relay can
    // force on us — the previous 64 MiB limit was a blob-sized copy-paste
    // and a memory-DoS vector.
    let result: crate::protocol::RelayIntroduceResultPayload = read_payload(&mut recv, 64 * 1024).await?;
    Ok(result)
}
pub async fn shutdown(self) -> anyhow::Result<()> {
// Remove UPnP port mapping before closing endpoint
if let Some(ref mapping) = self.upnp_mapping {
crate::upnp::remove_upnp_mapping(mapping.external_addr.port()).await;
}
self.endpoint.close().await;
Ok(())
}
/// Shutdown via Arc reference — closes the endpoint, causing all background tasks to exit.
///
/// Best-effort removes the UPnP port mapping first so the router does not
/// keep forwarding traffic to a dead endpoint.
pub async fn shutdown_ref(&self) {
if let Some(ref mapping) = self.upnp_mapping {
crate::upnp::remove_upnp_mapping(mapping.external_addr.port()).await;
}
self.endpoint.close().await;
}
/// Propagate an engagement diff to known holders of a post (flat set,
/// up to 5 most-recent). Excludes the sender to avoid loops.
///
/// Returns the number of peers the diff was successfully sent to.
/// Assumes `get_file_holders` returns holders most-recent first — TODO
/// confirm ordering against the storage layer.
pub async fn propagate_engagement_diff(
    &self,
    post_id: &crate::types::PostId,
    payload: &crate::protocol::BlobHeaderDiffPayload,
    exclude_peer: &crate::types::NodeId,
) -> usize {
    // Snapshot the holder set first so the storage guard is not held
    // across network sends.
    let holders = {
        let storage = self.storage.get().await;
        storage.get_file_holders(post_id).unwrap_or_default()
    };
    let mut sent = 0;
    for (peer, _addrs) in &holders {
        // Enforce the documented fan-out cap of 5 (the old loop sent to
        // every holder, contradicting the "up to 5 most-recent" contract).
        if sent >= 5 {
            break;
        }
        if peer == exclude_peer {
            continue; // never echo a diff back to whoever sent it to us
        }
        if self.send_to_peer_uni(peer, MessageType::BlobHeaderDiff, payload).await.is_ok() {
            sent += 1;
        }
    }
    sent
}
}
/// Summary counters for a pull sweep across peers.
pub struct PullStats {
/// Peers successfully pulled from.
pub peers_pulled: usize,
/// Total posts received across all pulls.
pub posts_received: usize,
/// Visibility records updated during the sweep.
pub visibility_updates: usize,
}
/// Decide whether a post should be sent to a requesting peer.
///
/// Authors always receive their own posts. For anyone else:
/// - `Public`: sent only if the requester follows the author.
/// - `Encrypted`: sent only if the requester is a listed recipient.
/// - `GroupEncrypted`: sent only if the requester is a known member of
///   the post's group.
pub fn should_send_post(
    post: &Post,
    visibility: &PostVisibility,
    requester: &NodeId,
    requester_follows: &HashSet<NodeId>,
    group_members: &std::collections::HashMap<crate::types::GroupId, HashSet<NodeId>>,
) -> bool {
    // Own posts bypass every visibility rule.
    let is_author = &post.author == requester;
    if is_author {
        return true;
    }
    match visibility {
        PostVisibility::Public => requester_follows.contains(&post.author),
        PostVisibility::Encrypted { recipients } => recipients
            .iter()
            .any(|wrapped| &wrapped.recipient == requester),
        PostVisibility::GroupEncrypted { group_id, .. } => group_members
            .get(group_id)
            .map_or(false, |members| members.contains(requester)),
    }
}
#[cfg(test)]
mod tests {
//! Unit tests for `should_send_post` visibility filtering: author
//! bypass, public/follow gating, encrypted recipient lists, and
//! group-membership checks.
use super::*;
use std::collections::HashMap;
use crate::types::WrappedKey;
// Deterministic 32-byte node id built from a single repeated byte.
fn make_node_id(byte: u8) -> NodeId {
[byte; 32]
}
// Minimal post by `author` with fixed content/timestamp.
fn make_post(author: NodeId) -> Post {
Post {
author,
content: "test".to_string(),
attachments: vec![],
timestamp_ms: 1000,
}
}
// Empty group-membership map for tests that don't exercise groups.
fn empty_groups() -> HashMap<crate::types::GroupId, HashSet<NodeId>> {
HashMap::new()
}
// Author bypass: own posts are sent even with no follows.
#[test]
fn own_post_always_sent() {
let requester = make_node_id(1);
let post = make_post(requester);
let follows = HashSet::new();
assert!(should_send_post(&post, &PostVisibility::Public, &requester, &follows, &empty_groups()));
}
#[test]
fn public_post_followed_author_sent() {
let author = make_node_id(1);
let requester = make_node_id(2);
let post = make_post(author);
let follows: HashSet<NodeId> = [author].into_iter().collect();
assert!(should_send_post(&post, &PostVisibility::Public, &requester, &follows, &empty_groups()));
}
// Public posts are follow-gated: a non-follower gets nothing.
#[test]
fn public_post_unfollowed_author_filtered() {
let author = make_node_id(1);
let requester = make_node_id(2);
let post = make_post(author);
let follows = HashSet::new();
assert!(!should_send_post(&post, &PostVisibility::Public, &requester, &follows, &empty_groups()));
}
#[test]
fn encrypted_post_is_recipient_sent() {
let author = make_node_id(1);
let requester = make_node_id(2);
let post = make_post(author);
let visibility = PostVisibility::Encrypted {
recipients: vec![WrappedKey {
recipient: requester,
wrapped_cek: vec![0u8; 60],
}],
};
let follows = HashSet::new();
assert!(should_send_post(&post, &visibility, &requester, &follows, &empty_groups()));
}
// Encrypted posts go only to listed recipients, regardless of follows.
#[test]
fn encrypted_post_not_recipient_filtered() {
let author = make_node_id(1);
let requester = make_node_id(2);
let other = make_node_id(3);
let post = make_post(author);
let visibility = PostVisibility::Encrypted {
recipients: vec![WrappedKey {
recipient: other,
wrapped_cek: vec![0u8; 60],
}],
};
let follows = HashSet::new();
assert!(!should_send_post(&post, &visibility, &requester, &follows, &empty_groups()));
}
#[test]
fn group_encrypted_member_sent() {
let author = make_node_id(1);
let requester = make_node_id(2);
let post = make_post(author);
let group_id = [42u8; 32];
let visibility = PostVisibility::GroupEncrypted {
group_id,
epoch: 1,
wrapped_cek: vec![0u8; 60],
};
let follows = HashSet::new();
let mut groups = HashMap::new();
groups.insert(group_id, [requester].into_iter().collect());
assert!(should_send_post(&post, &visibility, &requester, &follows, &groups));
}
// Group posts require membership; a known group with other members
// still filters a non-member requester.
#[test]
fn group_encrypted_non_member_filtered() {
let author = make_node_id(1);
let requester = make_node_id(2);
let other = make_node_id(3);
let post = make_post(author);
let group_id = [42u8; 32];
let visibility = PostVisibility::GroupEncrypted {
group_id,
epoch: 1,
wrapped_cek: vec![0u8; 60],
};
let follows = HashSet::new();
let mut groups = HashMap::new();
groups.insert(group_id, [other].into_iter().collect());
assert!(!should_send_post(&post, &visibility, &requester, &follows, &groups));
}
}