diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index d72780c..3f97624 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -88,7 +88,9 @@ pub(crate) async fn hole_punch_parallel( ) -> Option { use crate::protocol::ALPN_V2; - let addrs: Vec = addresses + // Filter to address families this endpoint can actually reach + let reachable = filter_reachable_families(endpoint, addresses); + let addrs: Vec = reachable .iter() .filter_map(|addr_str| { let sock = normalize_addr(addr_str.parse::().ok()?); @@ -187,8 +189,9 @@ pub(crate) async fn hole_punch_with_scanning( return hole_punch_parallel(endpoint, target, addresses).await; } - // Parse anchor-observed address (first in list, injected by relay) - let observed_addr = addresses.first() + // Filter to reachable families, then use observed address (first in list, injected by relay) + let reachable = filter_reachable_families(endpoint, addresses); + let observed_addr = reachable.first() .and_then(|a| a.parse::().ok()) .map(|s| normalize_addr(s)); @@ -419,7 +422,9 @@ async fn hole_punch_single( target: &NodeId, addresses: &[String], ) -> Option { - let addr = addresses.first() + // Filter to reachable families first, then take the first address + let reachable = filter_reachable_families(endpoint, addresses); + let addr = reachable.first() .and_then(|a| a.parse::().ok()) .map(|s| normalize_addr(s))?; let eid = iroh::EndpointId::from_bytes(target).ok()?; @@ -438,6 +443,29 @@ async fn hole_punch_single( } +/// Filter addresses to only families the local endpoint can reach. +/// An IPv4-only endpoint (e.g., VPN) shouldn't waste time trying IPv6 addresses. +fn filter_reachable_families(endpoint: &iroh::Endpoint, addresses: &[String]) -> Vec { + let sockets = endpoint.bound_sockets(); + let has_v4 = sockets.iter().any(|s| s.is_ipv4()); + let has_v6 = sockets.iter().any(|s| s.is_ipv6()); + // If we have both families (or can't determine), pass everything through + if (has_v4 && has_v6) || (!has_v4 && !has_v6) { + return addresses.to_vec(); + } + addresses.iter().filter(|a| { + if let Ok(sock) = a.parse::() { + let normalized = normalize_addr(sock); + match normalized { + std::net::SocketAddr::V4(_) => has_v4, + std::net::SocketAddr::V6(_) => has_v6, + } + } else { + true // can't parse — keep it, let the connect attempt sort it out + } + }).cloned().collect() +} + /// Normalize IPv4-mapped IPv6 addresses (e.g. [::ffff:1.2.3.4]:port) to plain IPv4. /// Dual-stack servers report IPv4 peers as mapped-v6 but v4-only clients can't reach them. pub fn normalize_addr(addr: std::net::SocketAddr) -> std::net::SocketAddr { @@ -3827,8 +3855,9 @@ impl ConnectionManager { .ok_or_else(|| anyhow::anyhow!("relay peer not connected"))?; let intro_id: IntroId = rand::random(); + // Include LAN addresses — peers may be on same WiFi behind same NAT let mut our_addrs: Vec = self.endpoint.addr().ip_addrs() - .filter(|s| crate::network::is_publicly_routable(s)) + .filter(|s| crate::network::is_shareable_addr(s)) .map(|s| s.to_string()) .collect(); // Prepend UPnP external address if available @@ -3890,9 +3919,9 @@ impl ConnectionManager { // Are WE the target? if payload.target == self.our_node_id { - // Respond with our globally-routable addresses only (no Docker bridge / private IPs) + // Include LAN addresses — peers may be on same WiFi let mut our_addrs: Vec = self.endpoint.addr().ip_addrs() - .filter(|s| crate::network::is_publicly_routable(s)) + .filter(|s| crate::network::is_shareable_addr(s)) .map(|s| s.to_string()) .collect(); // Prepend UPnP external address if available @@ -3913,9 +3942,9 @@ impl ConnectionManager { write_typed_message(&mut send, MessageType::RelayIntroduceResult, &result).await?; send.finish()?; - // Hole punch: filter to routable addresses only (skip Docker bridge IPs etc.) + // Hole punch: filter to shareable addresses (includes LAN, skips Docker/loopback) let routable_requester_addrs: Vec = payload.requester_addresses.iter() - .filter(|a| a.parse::().map_or(false, |s| crate::network::is_publicly_routable(&s))) + .filter(|a| a.parse::().map_or(false, |s| crate::network::is_shareable_addr(&s))) .cloned() .collect(); info!( @@ -3943,7 +3972,7 @@ impl ConnectionManager { // Register as session with the peer's address for relay introduction let remote_sock = requester_addrs.iter() .filter_map(|a| a.parse::().ok()) - .find(|s| crate::network::is_publicly_routable(s)); + .find(|s| crate::network::is_shareable_addr(s)); let mut cm = conn_mgr_arc.lock().await; if cm.is_connected(&requester) { // Initiator already connected to us (their punch succeeded first) @@ -5218,8 +5247,46 @@ impl ConnectionManager { _ => {} } } + // Fetch blobs for discovered posts while connection is live if fetched > 0 { - debug!(discovered = fetched, "ManifestPush post discovery"); + debug!(discovered = fetched, "ManifestPush post discovery — fetching blobs"); + for (post_id, author) in &discovery_posts { + let attachments = { + let cm = cm_arc.lock().await; + let storage = cm.storage.get().await; + storage.get_post(post_id).ok().flatten() + .map(|p| p.attachments.clone()) + .unwrap_or_default() + }; + let blob_store = { + let cm = cm_arc.lock().await; + Arc::clone(&cm.blob_store) + }; + for att in &attachments { + if blob_store.has(&att.cid) { continue; } + let blob_result: anyhow::Result<()> = async { + let (mut bs, mut br) = conn.open_bi().await?; + let req = BlobRequestPayload { cid: att.cid, requester_addresses: vec![] }; + write_typed_message(&mut bs, MessageType::BlobRequest, &req).await?; + bs.finish()?; + let mt = read_message_type(&mut br).await?; + if mt != MessageType::BlobResponse { return Ok(()); } + let resp: BlobResponsePayload = read_payload(&mut br, MAX_PAYLOAD).await?; + if resp.found { + use base64::Engine; + let data = base64::engine::general_purpose::STANDARD.decode(resp.data_b64.as_bytes())?; + blob_store.store(&att.cid, &data)?; + let cm = cm_arc.lock().await; + let storage = cm.storage.get().await; + let _ = storage.record_blob(&att.cid, post_id, author, data.len() as u64, &att.mime_type, att.size_bytes); + } + Ok(()) + }.await; + if let Err(e) = blob_result { + debug!(cid = hex::encode(att.cid), error = %e, "ManifestPush blob fetch failed"); + } + } + } } }); } @@ -5761,8 +5828,9 @@ impl ConnectionManager { // Are we the target? if payload.target == cm.our_node_id { // Gather our addresses, then handle outside lock + // Include LAN addresses (192.168.x.x) — peers may be on the same WiFi let mut our_addrs: Vec = cm.endpoint.addr().ip_addrs() - .filter(|s| crate::network::is_publicly_routable(s)).map(|s| s.to_string()).collect(); + .filter(|s| crate::network::is_shareable_addr(s)).map(|s| s.to_string()).collect(); if let Some(ref ext) = cm.upnp_external_addr { let ext_str = ext.to_string(); if !our_addrs.contains(&ext_str) { our_addrs.insert(0, ext_str); } @@ -5807,13 +5875,14 @@ impl ConnectionManager { }; write_typed_message(&mut send, MessageType::RelayIntroduceResult, &result).await?; send.finish()?; + // Accept LAN addresses too — peers may be on same WiFi let routable_addrs: Vec = payload.requester_addresses.iter() - .filter(|a| a.parse::().map_or(false, |s| crate::network::is_publicly_routable(&s))) + .filter(|a| a.parse::().map_or(false, |s| crate::network::is_shareable_addr(&s))) .cloned().collect(); let requester = payload.requester; tokio::spawn(async move { if let Some(conn) = hole_punch_with_scanning(&endpoint, &requester, &routable_addrs, our_nat_profile, peer_nat_profile).await { - let remote_sock = routable_addrs.iter().filter_map(|a| a.parse::().ok()).find(|s| crate::network::is_publicly_routable(s)); + let remote_sock = routable_addrs.iter().filter_map(|a| a.parse::().ok()).find(|s| crate::network::is_shareable_addr(s)); let mut cm = cm_arc.lock().await; if cm.is_connected(&requester) { return; } cm.add_session(requester, conn, SessionReachMethod::HolePunch, remote_sock); diff --git a/crates/core/src/network.rs b/crates/core/src/network.rs index 4830873..fe5cb67 100644 --- a/crates/core/src/network.rs +++ b/crates/core/src/network.rs @@ -63,7 +63,7 @@ fn is_public_ip(ip: IpAddr) -> bool { /// Filter out addresses that are never useful to share (loopback, link-local, unspecified). /// Keeps LAN addresses (192.168.x, 10.x, 172.16-31.x) since peers might be on the same LAN. -fn is_shareable_addr(addr: &SocketAddr) -> bool { +pub(crate) fn is_shareable_addr(addr: &SocketAddr) -> bool { match addr.ip() { IpAddr::V4(v4) => !v4.is_loopback() && !v4.is_link_local() && !v4.is_unspecified(), IpAddr::V6(v6) => !v6.is_loopback() && !v6.is_unspecified(), diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 7652c8d..3eba7af 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -2421,6 +2421,12 @@ impl Node { /// Connect to a peer and establish a mesh connection pub async fn sync_with(&self, peer_id: NodeId) -> anyhow::Result<()> { self.connect_by_node_id(peer_id).await?; + // Reset last_sync_ms for this author so the responder sends ALL posts, + // not just posts newer than our last sync timestamp. + { + let storage = self.storage.get().await; + let _ = storage.update_follow_last_sync(&peer_id, 0); + } let stats = self.network.conn_handle().pull_from_peer(&peer_id).await?; // Also fetch engagement data (reactions, comments) for posts we hold let engagement = self.network.conn_handle().fetch_engagement_from_peer(&peer_id).await.unwrap_or(0);