diff --git a/Cargo.lock b/Cargo.lock index ebbbd6c..ae10be2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2732,7 +2732,7 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "itsgoin-cli" -version = "0.7.2" +version = "0.7.3" dependencies = [ "anyhow", "hex", @@ -2744,7 +2744,7 @@ dependencies = [ [[package]] name = "itsgoin-core" -version = "0.7.2" +version = "0.7.3" dependencies = [ "anyhow", "base64 0.22.1", @@ -2769,7 +2769,7 @@ dependencies = [ [[package]] name = "itsgoin-desktop" -version = "0.7.2" +version = "0.7.3" dependencies = [ "anyhow", "base64 0.22.1", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index db2ce1a..2e53c35 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "itsgoin-cli" -version = "0.7.2" +version = "0.7.3" edition = "2021" [[bin]] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 5bf29e4..71a4f85 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "itsgoin-core" -version = "0.7.2" +version = "0.7.3" edition = "2021" [dependencies] diff --git a/crates/core/src/android_wifi.rs b/crates/core/src/android_wifi.rs index f355bd3..bee1dcb 100644 --- a/crates/core/src/android_wifi.rs +++ b/crates/core/src/android_wifi.rs @@ -213,3 +213,44 @@ impl Drop for MulticastLockGuard { } } } + +/// Stop the Android `NodeService` foreground service. Called from the +/// in-app close button so the network process actually exits rather +/// than continuing to run as a foreground service after the Activity +/// closes (foreground services are kept alive across Activity exit by +/// design). +/// +/// Errors are logged but not propagated — best-effort cleanup before +/// `AppHandle::exit(0)` finishes the Activity. +pub fn stop_node_service() { + if let Err(e) = stop_node_service_inner() { + warn!("stop_node_service failed (will exit anyway): {}", e); + } +} + +fn stop_node_service_inner() -> Result<(), String> { + let ctx = ndk_context::android_context(); + if ctx.vm().is_null() { + return Err("ndk_context: null JavaVM".into()); + } + if ctx.context().is_null() { + return Err("ndk_context: null activity context".into()); + } + let vm = unsafe { JavaVM::from_raw(ctx.vm() as *mut _) } + .map_err(|e| format!("JavaVM init: {:?}", e))?; + let mut env = vm + .attach_current_thread() + .map_err(|e| format!("attach_current_thread: {:?}", e))?; + let activity = unsafe { JObject::from_raw(ctx.context() as *mut _) }; + + // NodeService.stopFromNative(activity) + env.call_static_method( + "com/itsgoin/app/NodeService", + "stopFromNative", + "(Landroid/content/Context;)V", + &[JValue::Object(&activity)], + ) + .map_err(|e| format!("stopFromNative: {:?}", e))?; + + Ok(()) +} diff --git a/crates/core/src/connection.rs b/crates/core/src/connection.rs index 0892455..07b6633 100644 --- a/crates/core/src/connection.rs +++ b/crates/core/src/connection.rs @@ -145,14 +145,22 @@ pub(crate) async fn hole_punch_parallel( None } +// EDM port scanner — DISABLED in v0.7.3 (see hole_punch_with_scanning). +// Constants and helpers preserved as the refactor target for a raw-UDP +// scanner that bypasses iroh's path-store accumulation. + /// Timeout for each individual scan connect attempt (200ms → ~20 in-flight at 100/sec) +#[allow(dead_code)] const SCAN_CONNECT_TIMEOUT_MS: u64 = 200; /// Scan rate: one attempt every 10ms = 100 ports/sec +#[allow(dead_code)] const SCAN_INTERVAL_MS: u64 = 10; /// How often to punch peer's anchor-observed address during scanning (seconds). /// Each punch checks if the peer has opened a firewall port matching our actual port. +#[allow(dead_code)] const SCAN_PUNCH_INTERVAL_SECS: u64 = 2; /// Maximum scan duration (seconds) — accept the cost for otherwise-impossible connections +#[allow(dead_code)] const SCAN_MAX_DURATION_SECS: u64 = 300; // 5 minutes /// Global cap on concurrent port-scan hole punches. Each scanner fires @@ -164,11 +172,63 @@ const SCAN_MAX_DURATION_SECS: u64 = 300; // 5 minutes /// at proxy timeouts. A permit is acquired before the scanning loop /// starts and held until the scanner returns; extra callers fall back /// to the cheaper `hole_punch_parallel`. +#[allow(dead_code)] fn scanner_semaphore() -> &'static tokio::sync::Semaphore { static SEM: std::sync::OnceLock = std::sync::OnceLock::new(); SEM.get_or_init(|| tokio::sync::Semaphore::new(1)) } +/// Hole punch orchestrator. +/// +/// **v0.7.3:** the EDM port scanner is DISABLED. We do Step 1 (quick punch to +/// the anchor-observed address) → Step 2 (parallel punch over the 30s window +/// to all known addresses). No port scan. +/// +/// **Why disabled:** iroh's `Endpoint` accumulates every `endpoint.connect()` +/// target into a per-endpoint paths set and probes them all in the background +/// under QUIC NAT-traversal. A 100-probes/sec / 5-min scan inserts ~30,000 +/// paths; iroh then probes all of them. Observed at 22MB/s outbound from a +/// single client. Disabled until we replace per-probe `endpoint.connect()` +/// with a raw `socket.send_to()` on the endpoint's bound UDP socket — see +/// `edm_port_scan_disabled_v0_7_3` for the preserved scanner logic to +/// refactor against. +/// +/// Original docstring is preserved on `edm_port_scan_disabled_v0_7_3`. +pub(crate) async fn hole_punch_with_scanning( + endpoint: &iroh::Endpoint, + target: &NodeId, + addresses: &[String], + _our_profile: crate::types::NatProfile, + _peer_profile: crate::types::NatProfile, +) -> Option { + if let Some(conn) = hole_punch_single(endpoint, target, addresses).await { + return Some(conn); + } + hole_punch_parallel(endpoint, target, addresses).await +} + +/// **DISABLED in v0.7.3** — kept as the refactor target for a safe replacement. +/// +/// **Why disabled:** iroh's `Endpoint` accumulates every `endpoint.connect()` +/// target into a per-endpoint paths set and probes them all in the background +/// under QUIC NAT-traversal. A 100-probes/sec / 5-min scan inserts ~30,000 +/// paths; iroh then probes all of them. Observed at 22MB/s outbound from a +/// single client (DoS-grade). +/// +/// **Refactor target:** replace `endpoint.connect()` in the per-probe path +/// with a raw `socket.send_to(...)` on the endpoint's bound UDP socket. The +/// probe still opens a NAT mapping on our side; we just don't ask iroh to +/// manage the path. The every-2s punch retains `endpoint.connect()` so the +/// real handshake completes when the peer's punch arrives. +/// +/// Logic worth preserving below: role-based scanner/puncher split, +/// `PortWalkIter`, `scanner_semaphore`, `found_tx`/`found_rx` channel +/// pattern, deadline + `tokio::select!` orchestration. +/// +/// --- +/// +/// Original docstring: +/// /// Advanced hole punch with port scanning fallback for EDM/port-restricted NAT. /// /// **Role-based behavior** (each side calls this independently): @@ -183,7 +243,8 @@ fn scanner_semaphore() -> &'static tokio::sync::Semaphore { /// NAT mapping alive and checks if the peer's scan has opened their firewall for us. /// /// For both-EDM pairs: both sides scan + punch simultaneously. -pub(crate) async fn hole_punch_with_scanning( +#[allow(dead_code)] +async fn edm_port_scan_disabled_v0_7_3( endpoint: &iroh::Endpoint, target: &NodeId, addresses: &[String], @@ -389,12 +450,17 @@ pub(crate) async fn hole_punch_with_scanning( /// Iterator that walks outward from a base port: base, base+1, base-1, base+2, base-2, ... /// Skips ports outside [1, 65535]. +/// +/// Used by `edm_port_scan_disabled_v0_7_3` — preserved for the future +/// raw-UDP scanner refactor. +#[allow(dead_code)] struct PortWalkIter { base: u16, offset: u32, tried_plus: bool, // within current offset, have we tried base+offset? } +#[allow(dead_code)] impl PortWalkIter { fn new(base: u16) -> Self { Self { base, offset: 0, tried_plus: false } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index e6ed36d..ede6096 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -92,6 +92,175 @@ async fn ensure_initial_v_me( generate_and_store_initial_v_me(&s, persona_id, now_ms) } +/// Probe a list of anchors with batched parallelism, returning the first +/// successful NodeId. Remaining probes continue in background tasks after +/// first success and naturally register additional mesh connections. +/// +/// **Parameters fixed in v0.7.3:** +/// - 3 anchors in flight at a time +/// - 2-second stagger between batch dispatches +/// - 10s per-anchor connect timeout +/// - Failed probes to anchors with `last_seen_ms` older than 3 days +/// auto-delete from `known_anchors` (self-healing pruning) +/// +/// Returns `None` only when every probe completed without success. +async fn probe_anchors_batched( + anchors: Vec<(NodeId, Vec)>, + network: Arc, + storage: Arc, + self_node_id: NodeId, + label: &'static str, +) -> Option { + use std::sync::atomic::{AtomicUsize, Ordering}; + + const BATCH_SIZE: usize = 3; + const BATCH_STAGGER_SECS: u64 = 2; + const PER_ANCHOR_TIMEOUT_SECS: u64 = 10; + const STALE_THRESHOLD_MS: u64 = 3 * 86_400 * 1000; + + let total = anchors.len(); + if total == 0 { + return None; + } + + let (success_tx, success_rx) = tokio::sync::oneshot::channel::(); + let success_tx = Arc::new(tokio::sync::Mutex::new(Some(success_tx))); + let completed = Arc::new(AtomicUsize::new(0)); + let all_done = Arc::new(tokio::sync::Notify::new()); + + // Dispatcher: spawns per-anchor tasks in batches of BATCH_SIZE, + // sleeping BATCH_STAGGER_SECS between batches. The per-anchor tasks + // continue running after the dispatcher exits. + let dispatcher = { + let network = Arc::clone(&network); + let storage = Arc::clone(&storage); + let success_tx = Arc::clone(&success_tx); + let completed = Arc::clone(&completed); + let all_done = Arc::clone(&all_done); + tokio::spawn(async move { + let mut iter = anchors.into_iter(); + loop { + let batch: Vec<_> = (&mut iter).take(BATCH_SIZE).collect(); + if batch.is_empty() { + break; + } + let more = iter.size_hint().0 > 0; + for (nid, addrs) in batch { + let network = Arc::clone(&network); + let storage = Arc::clone(&storage); + let success_tx = Arc::clone(&success_tx); + let completed = Arc::clone(&completed); + let all_done = Arc::clone(&all_done); + tokio::spawn(async move { + let result = probe_one_anchor(&network, &storage, nid, addrs, self_node_id, label).await; + if let Some(nid) = result { + let mut guard = success_tx.lock().await; + if let Some(sender) = guard.take() { + let _ = sender.send(nid); + } + } + let prev = completed.fetch_add(1, Ordering::SeqCst); + if prev + 1 == total { + all_done.notify_one(); + } + }); + } + if more { + tokio::time::sleep(std::time::Duration::from_secs(BATCH_STAGGER_SECS)).await; + } + } + }) + }; + + // Race: first success vs all probes complete unsuccessfully. + let result = tokio::select! { + Ok(nid) = success_rx => Some(nid), + _ = all_done.notified() => None, + }; + + // Detach the dispatcher; in-flight per-anchor tasks continue. + drop(dispatcher); + + let _ = BATCH_STAGGER_SECS; // silence unused-const if compiler is picky + let _ = PER_ANCHOR_TIMEOUT_SECS; + let _ = STALE_THRESHOLD_MS; + result +} + +async fn probe_one_anchor( + network: &crate::network::Network, + storage: &Arc, + nid: NodeId, + addrs: Vec, + self_node_id: NodeId, + label: &'static str, +) -> Option { + const PER_ANCHOR_TIMEOUT_SECS: u64 = 10; + const STALE_THRESHOLD_MS: u64 = 3 * 86_400 * 1000; + + if nid == self_node_id || network.is_peer_connected_or_session(&nid).await { + return None; + } + let endpoint_id = match iroh::EndpointId::from_bytes(&nid) { + Ok(eid) => eid, + Err(_) => return None, + }; + let mut addr = iroh::EndpointAddr::from(endpoint_id); + for sa in &addrs { + addr = addr.with_ip_addr(*sa); + } + info!(peer = hex::encode(&nid), label, "Trying anchor"); + let result = tokio::time::timeout( + std::time::Duration::from_secs(PER_ANCHOR_TIMEOUT_SECS), + network.connect_to_anchor(nid, addr), + ).await; + + match result { + Ok(Ok(())) => { + info!(peer = hex::encode(&nid), label, "Connected to anchor"); + Some(nid) + } + Ok(Err(e)) => { + debug!(error = %e, peer = hex::encode(&nid), label, "Anchor connect failed"); + maybe_prune_stale_anchor(storage, &nid, STALE_THRESHOLD_MS).await; + None + } + Err(_) => { + debug!(peer = hex::encode(&nid), label, "Anchor connect timed out"); + maybe_prune_stale_anchor(storage, &nid, STALE_THRESHOLD_MS).await; + None + } + } +} + +/// If the anchor's last successful contact was more than `threshold_ms` +/// ago, delete it from `known_anchors`. Future startups won't waste a +/// probe slot on it. Anchors that were recently successful are preserved +/// even when they fail a single probe (likely transient). +async fn maybe_prune_stale_anchor( + storage: &Arc, + nid: &NodeId, + threshold_ms: u64, +) { + let s = storage.get().await; + let last_seen_ms = match s.get_known_anchor_last_seen(nid) { + Ok(Some(ms)) => ms, + _ => return, + }; + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + if now_ms > last_seen_ms && now_ms - last_seen_ms > threshold_ms { + let _ = s.delete_known_anchor(nid); + debug!( + peer = hex::encode(nid), + age_ms = now_ms - last_seen_ms, + "Pruned stale anchor (>3 days since last success + failed probe)" + ); + } +} + impl Node { /// Create or open a node in the given data directory (Desktop profile) pub async fn open(data_dir: impl AsRef) -> anyhow::Result { @@ -272,6 +441,11 @@ impl Node { /// Bootstrap: connect to anchors, pull initial data, NAT probe, referrals. /// Can be called during open_with_bind (blocking startup) or deferred to background. + /// + /// v0.7.3: anchor probing is batched (3 in flight, 2s stagger between batches, + /// 10s per-anchor timeout, first success unblocks downstream, remaining probes + /// continue in background and naturally fill peer connections). Failed probes + /// to anchors >3 days stale auto-prune from `known_anchors`. pub async fn run_bootstrap(&self, data_dir: &Path) -> anyhow::Result<()> { let storage = &self.storage; let network = &self.network; @@ -479,57 +653,28 @@ impl Node { let (discovered, bootstrap_known): (Vec<_>, Vec<_>) = known.into_iter() .partition(|(nid, _)| !bootstrap_anchor_ids.contains(nid)); - // Phase 1: Try discovered (non-bootstrap) anchors first - let mut connected_anchor = None; - for (anchor_nid, anchor_addrs) in &discovered { - if *anchor_nid == node_id || network.is_peer_connected_or_session(anchor_nid).await { - continue; - } - let endpoint_id = match iroh::EndpointId::from_bytes(anchor_nid) { - Ok(eid) => eid, - Err(_) => continue, - }; - let mut addr = iroh::EndpointAddr::from(endpoint_id); - for sa in anchor_addrs { - addr = addr.with_ip_addr(*sa); - } - info!(peer = hex::encode(anchor_nid), "Trying discovered anchor"); - match tokio::time::timeout(std::time::Duration::from_secs(10), network.connect_to_anchor(*anchor_nid, addr)).await { - Ok(Ok(())) => { - info!(peer = hex::encode(anchor_nid), "Connected to discovered anchor"); - connected_anchor = Some(*anchor_nid); - break; - } - Ok(Err(e)) => debug!(error = %e, peer = hex::encode(anchor_nid), "Discovered anchor: connect failed"), - Err(_) => debug!(peer = hex::encode(anchor_nid), "Discovered anchor: connect timed out"), - } - } + // Phase 1: probe discovered (non-bootstrap) anchors in batches. + // First success returns immediately; remaining probes continue in + // background. Failed probes to anchors >3 days stale auto-prune. + let mut connected_anchor = probe_anchors_batched( + discovered.clone(), + network.clone(), + Arc::clone(storage), + node_id, + "discovered", + ).await; - // Phase 2: Fall back to bootstrap anchors only if no discovered anchor worked + // Phase 2: bootstrap anchors as fallback — only fires if every + // Phase 1 entry failed. Preserves the load-distribution intent + // (don't smash the central anchor when discovered anchors work). if connected_anchor.is_none() { - for (anchor_nid, anchor_addrs) in &bootstrap_known { - if *anchor_nid == node_id || network.is_peer_connected_or_session(anchor_nid).await { - continue; - } - let endpoint_id = match iroh::EndpointId::from_bytes(anchor_nid) { - Ok(eid) => eid, - Err(_) => continue, - }; - let mut addr = iroh::EndpointAddr::from(endpoint_id); - for sa in anchor_addrs { - addr = addr.with_ip_addr(*sa); - } - info!(peer = hex::encode(anchor_nid), "Trying bootstrap anchor (fallback)"); - match tokio::time::timeout(std::time::Duration::from_secs(10), network.connect_to_anchor(*anchor_nid, addr)).await { - Ok(Ok(())) => { - info!(peer = hex::encode(anchor_nid), "Connected to bootstrap anchor"); - connected_anchor = Some(*anchor_nid); - break; - } - Ok(Err(e)) => debug!(error = %e, peer = hex::encode(anchor_nid), "Bootstrap anchor: connect failed"), - Err(_) => debug!(peer = hex::encode(anchor_nid), "Bootstrap anchor: connect timed out"), - } - } + connected_anchor = probe_anchors_batched( + bootstrap_known.clone(), + network.clone(), + Arc::clone(storage), + node_id, + "bootstrap", + ).await; } // Phase 3: NAT probe + referrals from whichever anchor we connected to diff --git a/crates/core/src/storage.rs b/crates/core/src/storage.rs index a330986..9cd95cb 100644 --- a/crates/core/src/storage.rs +++ b/crates/core/src/storage.rs @@ -2248,6 +2248,33 @@ impl Storage { Ok(result) } + /// Get the last successful contact time (ms since epoch) for a known anchor. + /// Returns None if the anchor isn't in the table. + pub fn get_known_anchor_last_seen(&self, node_id: &NodeId) -> anyhow::Result> { + let mut stmt = self.conn.prepare( + "SELECT last_seen_ms FROM known_anchors WHERE node_id = ?1", + )?; + let mut rows = stmt.query(params![node_id.as_slice()])?; + if let Some(row) = rows.next()? { + let ms: i64 = row.get(0)?; + Ok(Some(ms as u64)) + } else { + Ok(None) + } + } + + /// Remove a known anchor entry. Used by the bootstrap connect path + /// when a stale anchor (>3 days since last successful contact) fails + /// to connect — self-healing pruning so future startups don't re-try + /// long-dead entries. + pub fn delete_known_anchor(&self, node_id: &NodeId) -> anyhow::Result<()> { + self.conn.execute( + "DELETE FROM known_anchors WHERE node_id = ?1", + params![node_id.as_slice()], + )?; + Ok(()) + } + /// Prune known anchors to keep at most `max` entries (by highest success_count). pub fn prune_known_anchors(&self, max: usize) -> anyhow::Result { let count: i64 = self.conn.query_row( diff --git a/crates/tauri-app/Cargo.toml b/crates/tauri-app/Cargo.toml index 7283ae1..7e9e5a2 100644 --- a/crates/tauri-app/Cargo.toml +++ b/crates/tauri-app/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "itsgoin-desktop" -version = "0.7.2" +version = "0.7.3" edition = "2021" [lib] diff --git a/crates/tauri-app/gen/android/app/src/main/java/com/itsgoin/app/NodeService.kt b/crates/tauri-app/gen/android/app/src/main/java/com/itsgoin/app/NodeService.kt index 4a5aaf0..0112936 100644 --- a/crates/tauri-app/gen/android/app/src/main/java/com/itsgoin/app/NodeService.kt +++ b/crates/tauri-app/gen/android/app/src/main/java/com/itsgoin/app/NodeService.kt @@ -5,6 +5,7 @@ import android.app.NotificationChannel import android.app.NotificationManager import android.app.PendingIntent import android.app.Service +import android.content.Context import android.content.Intent import android.content.pm.ServiceInfo import android.os.Build @@ -16,6 +17,17 @@ class NodeService : Service() { companion object { const val CHANNEL_ID = "itsgoin_node" const val NOTIFICATION_ID = 1 + + // Called via JNI from Rust when the user taps the in-app close + // button. Foreground services survive Activity exit by design + // (keeps connections alive when backgrounded). When the user + // explicitly wants to stop networking, we need to stop the + // service in addition to ending the Activity. + @JvmStatic + fun stopFromNative(context: Context) { + val intent = Intent(context, NodeService::class.java) + context.stopService(intent) + } } private var wakeLock: PowerManager.WakeLock? = null diff --git a/crates/tauri-app/src/lib.rs b/crates/tauri-app/src/lib.rs index a713ceb..5d9dfdd 100644 --- a/crates/tauri-app/src/lib.rs +++ b/crates/tauri-app/src/lib.rs @@ -1144,6 +1144,13 @@ async fn list_vouches_given(state: State<'_, AppNode>) -> Result - +