Sync fixes, hole punch address family filtering, ManifestPush blob fetch

Per-peer sync (People tab Sync button) now resets last_sync_ms to 0 so
the responder sends ALL posts, not just posts newer than last sync.

ManifestPush post discovery now fetches blobs alongside discovered posts
while the connection is live, instead of waiting for the next prefetch cycle.

Hole punch: filter_reachable_families() checks endpoint bound sockets and
removes addresses the local endpoint can't reach (IPv4-only won't try IPv6).
Applied to hole_punch_single, hole_punch_parallel, hole_punch_with_scanning.

Relay introduce paths switched from is_publicly_routable to is_shareable_addr
so LAN addresses (192.168.x.x) are included for same-WiFi hole punching.
is_shareable_addr made pub(crate).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-03-31 17:00:43 -04:00
parent 9e87679c39
commit 1030dc21a7
3 changed files with 90 additions and 15 deletions

View file

@ -88,7 +88,9 @@ pub(crate) async fn hole_punch_parallel(
) -> Option<iroh::endpoint::Connection> {
use crate::protocol::ALPN_V2;
let addrs: Vec<iroh::EndpointAddr> = addresses
// Filter to address families this endpoint can actually reach
let reachable = filter_reachable_families(endpoint, addresses);
let addrs: Vec<iroh::EndpointAddr> = reachable
.iter()
.filter_map(|addr_str| {
let sock = normalize_addr(addr_str.parse::<std::net::SocketAddr>().ok()?);
@ -187,8 +189,9 @@ pub(crate) async fn hole_punch_with_scanning(
return hole_punch_parallel(endpoint, target, addresses).await;
}
// Parse anchor-observed address (first in list, injected by relay)
let observed_addr = addresses.first()
// Filter to reachable families, then use observed address (first in list, injected by relay)
let reachable = filter_reachable_families(endpoint, addresses);
let observed_addr = reachable.first()
.and_then(|a| a.parse::<std::net::SocketAddr>().ok())
.map(|s| normalize_addr(s));
@ -419,7 +422,9 @@ async fn hole_punch_single(
target: &NodeId,
addresses: &[String],
) -> Option<iroh::endpoint::Connection> {
let addr = addresses.first()
// Filter to reachable families first, then take the first address
let reachable = filter_reachable_families(endpoint, addresses);
let addr = reachable.first()
.and_then(|a| a.parse::<std::net::SocketAddr>().ok())
.map(|s| normalize_addr(s))?;
let eid = iroh::EndpointId::from_bytes(target).ok()?;
@ -438,6 +443,29 @@ async fn hole_punch_single(
}
/// Filter addresses to only families the local endpoint can reach.
/// An IPv4-only endpoint (e.g., VPN) shouldn't waste time trying IPv6 addresses.
fn filter_reachable_families(endpoint: &iroh::Endpoint, addresses: &[String]) -> Vec<String> {
let sockets = endpoint.bound_sockets();
let has_v4 = sockets.iter().any(|s| s.is_ipv4());
let has_v6 = sockets.iter().any(|s| s.is_ipv6());
// If we have both families (or can't determine), pass everything through
if (has_v4 && has_v6) || (!has_v4 && !has_v6) {
return addresses.to_vec();
}
addresses.iter().filter(|a| {
if let Ok(sock) = a.parse::<std::net::SocketAddr>() {
let normalized = normalize_addr(sock);
match normalized {
std::net::SocketAddr::V4(_) => has_v4,
std::net::SocketAddr::V6(_) => has_v6,
}
} else {
true // can't parse — keep it, let the connect attempt sort it out
}
}).cloned().collect()
}
/// Normalize IPv4-mapped IPv6 addresses (e.g. [::ffff:1.2.3.4]:port) to plain IPv4.
/// Dual-stack servers report IPv4 peers as mapped-v6 but v4-only clients can't reach them.
pub fn normalize_addr(addr: std::net::SocketAddr) -> std::net::SocketAddr {
@ -3827,8 +3855,9 @@ impl ConnectionManager {
.ok_or_else(|| anyhow::anyhow!("relay peer not connected"))?;
let intro_id: IntroId = rand::random();
// Include LAN addresses — peers may be on same WiFi behind same NAT
let mut our_addrs: Vec<String> = self.endpoint.addr().ip_addrs()
.filter(|s| crate::network::is_publicly_routable(s))
.filter(|s| crate::network::is_shareable_addr(s))
.map(|s| s.to_string())
.collect();
// Prepend UPnP external address if available
@ -3890,9 +3919,9 @@ impl ConnectionManager {
// Are WE the target?
if payload.target == self.our_node_id {
// Respond with our globally-routable addresses only (no Docker bridge / private IPs)
// Include LAN addresses — peers may be on same WiFi
let mut our_addrs: Vec<String> = self.endpoint.addr().ip_addrs()
.filter(|s| crate::network::is_publicly_routable(s))
.filter(|s| crate::network::is_shareable_addr(s))
.map(|s| s.to_string())
.collect();
// Prepend UPnP external address if available
@ -3913,9 +3942,9 @@ impl ConnectionManager {
write_typed_message(&mut send, MessageType::RelayIntroduceResult, &result).await?;
send.finish()?;
// Hole punch: filter to routable addresses only (skip Docker bridge IPs etc.)
// Hole punch: filter to shareable addresses (includes LAN, skips Docker/loopback)
let routable_requester_addrs: Vec<String> = payload.requester_addresses.iter()
.filter(|a| a.parse::<std::net::SocketAddr>().map_or(false, |s| crate::network::is_publicly_routable(&s)))
.filter(|a| a.parse::<std::net::SocketAddr>().map_or(false, |s| crate::network::is_shareable_addr(&s)))
.cloned()
.collect();
info!(
@ -3943,7 +3972,7 @@ impl ConnectionManager {
// Register as session with the peer's address for relay introduction
let remote_sock = requester_addrs.iter()
.filter_map(|a| a.parse::<std::net::SocketAddr>().ok())
.find(|s| crate::network::is_publicly_routable(s));
.find(|s| crate::network::is_shareable_addr(s));
let mut cm = conn_mgr_arc.lock().await;
if cm.is_connected(&requester) {
// Initiator already connected to us (their punch succeeded first)
@ -5218,8 +5247,46 @@ impl ConnectionManager {
_ => {}
}
}
// Fetch blobs for discovered posts while connection is live
if fetched > 0 {
debug!(discovered = fetched, "ManifestPush post discovery");
debug!(discovered = fetched, "ManifestPush post discovery — fetching blobs");
for (post_id, author) in &discovery_posts {
let attachments = {
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
storage.get_post(post_id).ok().flatten()
.map(|p| p.attachments.clone())
.unwrap_or_default()
};
let blob_store = {
let cm = cm_arc.lock().await;
Arc::clone(&cm.blob_store)
};
for att in &attachments {
if blob_store.has(&att.cid) { continue; }
let blob_result: anyhow::Result<()> = async {
let (mut bs, mut br) = conn.open_bi().await?;
let req = BlobRequestPayload { cid: att.cid, requester_addresses: vec![] };
write_typed_message(&mut bs, MessageType::BlobRequest, &req).await?;
bs.finish()?;
let mt = read_message_type(&mut br).await?;
if mt != MessageType::BlobResponse { return Ok(()); }
let resp: BlobResponsePayload = read_payload(&mut br, MAX_PAYLOAD).await?;
if resp.found {
use base64::Engine;
let data = base64::engine::general_purpose::STANDARD.decode(resp.data_b64.as_bytes())?;
blob_store.store(&att.cid, &data)?;
let cm = cm_arc.lock().await;
let storage = cm.storage.get().await;
let _ = storage.record_blob(&att.cid, post_id, author, data.len() as u64, &att.mime_type, att.size_bytes);
}
Ok(())
}.await;
if let Err(e) = blob_result {
debug!(cid = hex::encode(att.cid), error = %e, "ManifestPush blob fetch failed");
}
}
}
}
});
}
@ -5761,8 +5828,9 @@ impl ConnectionManager {
// Are we the target?
if payload.target == cm.our_node_id {
// Gather our addresses, then handle outside lock
// Include LAN addresses (192.168.x.x) — peers may be on the same WiFi
let mut our_addrs: Vec<String> = cm.endpoint.addr().ip_addrs()
.filter(|s| crate::network::is_publicly_routable(s)).map(|s| s.to_string()).collect();
.filter(|s| crate::network::is_shareable_addr(s)).map(|s| s.to_string()).collect();
if let Some(ref ext) = cm.upnp_external_addr {
let ext_str = ext.to_string();
if !our_addrs.contains(&ext_str) { our_addrs.insert(0, ext_str); }
@ -5807,13 +5875,14 @@ impl ConnectionManager {
};
write_typed_message(&mut send, MessageType::RelayIntroduceResult, &result).await?;
send.finish()?;
// Accept LAN addresses too — peers may be on same WiFi
let routable_addrs: Vec<String> = payload.requester_addresses.iter()
.filter(|a| a.parse::<std::net::SocketAddr>().map_or(false, |s| crate::network::is_publicly_routable(&s)))
.filter(|a| a.parse::<std::net::SocketAddr>().map_or(false, |s| crate::network::is_shareable_addr(&s)))
.cloned().collect();
let requester = payload.requester;
tokio::spawn(async move {
if let Some(conn) = hole_punch_with_scanning(&endpoint, &requester, &routable_addrs, our_nat_profile, peer_nat_profile).await {
let remote_sock = routable_addrs.iter().filter_map(|a| a.parse::<std::net::SocketAddr>().ok()).find(|s| crate::network::is_publicly_routable(s));
let remote_sock = routable_addrs.iter().filter_map(|a| a.parse::<std::net::SocketAddr>().ok()).find(|s| crate::network::is_shareable_addr(s));
let mut cm = cm_arc.lock().await;
if cm.is_connected(&requester) { return; }
cm.add_session(requester, conn, SessionReachMethod::HolePunch, remote_sock);

View file

@ -63,7 +63,7 @@ fn is_public_ip(ip: IpAddr) -> bool {
/// Filter out addresses that are never useful to share (loopback, link-local, unspecified).
/// Keeps LAN addresses (192.168.x, 10.x, 172.16-31.x) since peers might be on the same LAN.
fn is_shareable_addr(addr: &SocketAddr) -> bool {
pub(crate) fn is_shareable_addr(addr: &SocketAddr) -> bool {
match addr.ip() {
IpAddr::V4(v4) => !v4.is_loopback() && !v4.is_link_local() && !v4.is_unspecified(),
IpAddr::V6(v6) => !v6.is_loopback() && !v6.is_unspecified(),

View file

@ -2421,6 +2421,12 @@ impl Node {
/// Connect to a peer and establish a mesh connection
pub async fn sync_with(&self, peer_id: NodeId) -> anyhow::Result<()> {
self.connect_by_node_id(peer_id).await?;
// Reset last_sync_ms for this author so the responder sends ALL posts,
// not just posts newer than our last sync timestamp.
{
let storage = self.storage.get().await;
let _ = storage.update_follow_last_sync(&peer_id, 0);
}
let stats = self.network.conn_handle().pull_from_peer(&peer_id).await?;
// Also fetch engagement data (reactions, comments) for posts we hold
let engagement = self.network.conn_handle().fetch_engagement_from_peer(&peer_id).await.unwrap_or(0);