//! Minimal HTTP/1.1 server for serving public posts to browsers. //! Zero external dependencies — raw TCP with tokio. //! Runs alongside the QUIC listener on the same port number (TCP vs UDP). use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::Mutex; use tracing::{debug, info}; use crate::blob::BlobStore; use crate::storage::Storage; use crate::types::PostVisibility; /// Connection budget: 5 content slots, 15 redirect slots, 1 per IP. const MAX_CONTENT_SLOTS: usize = 5; const MAX_REDIRECT_SLOTS: usize = 15; const MAX_TOTAL: usize = MAX_CONTENT_SLOTS + MAX_REDIRECT_SLOTS; const MAX_PER_IP: usize = 1; const HEADER_TIMEOUT_SECS: u64 = 5; /// Static HTML footer appended to every post page. pub const POST_FOOTER: &str = r#""#; /// Tracks active HTTP connections. struct HttpBudget { ip_counts: HashMap, content_slots: usize, redirect_slots: usize, } impl HttpBudget { fn new() -> Self { Self { ip_counts: HashMap::new(), content_slots: 0, redirect_slots: 0, } } fn total(&self) -> usize { self.content_slots + self.redirect_slots } fn try_acquire_content(&mut self, ip: IpAddr) -> bool { if self.total() >= MAX_TOTAL { return false; } if self.content_slots >= MAX_CONTENT_SLOTS { return false; } let count = self.ip_counts.entry(ip).or_insert(0); if *count >= MAX_PER_IP { return false; } *count += 1; self.content_slots += 1; true } fn try_acquire_redirect(&mut self, ip: IpAddr) -> bool { if self.total() >= MAX_TOTAL { return false; } if self.redirect_slots >= MAX_REDIRECT_SLOTS { return false; } let count = self.ip_counts.entry(ip).or_insert(0); if *count >= MAX_PER_IP { return false; } *count += 1; self.redirect_slots += 1; true } fn release_content(&mut self, ip: IpAddr) { self.content_slots = self.content_slots.saturating_sub(1); if let Some(count) = self.ip_counts.get_mut(&ip) { *count = count.saturating_sub(1); if *count == 0 { self.ip_counts.remove(&ip); } } } fn release_redirect(&mut self, ip: IpAddr) { self.redirect_slots = self.redirect_slots.saturating_sub(1); if let Some(count) = self.ip_counts.get_mut(&ip) { *count = count.saturating_sub(1); if *count == 0 { self.ip_counts.remove(&ip); } } } } /// Run the HTTP server on the given port. Blocks forever. pub async fn run_http_server( port: u16, storage: Arc>, blob_store: Arc, downstream_addrs: Arc>>>, ) -> anyhow::Result<()> { let addr: SocketAddr = ([0, 0, 0, 0], port).into(); // Use SO_REUSEADDR + SO_REUSEPORT so TCP punch sockets can share the port let socket = tokio::net::TcpSocket::new_v4()?; socket.set_reuseaddr(true)?; #[cfg(unix)] socket.set_reuseport(true)?; socket.bind(addr)?; let listener = socket.listen(128)?; info!("HTTP server listening on TCP port {}", port); let budget = Arc::new(std::sync::Mutex::new(HttpBudget::new())); loop { let (stream, peer_addr) = match listener.accept().await { Ok(v) => v, Err(e) => { debug!("HTTP accept error: {}", e); continue; } }; let ip = peer_addr.ip(); // Try to acquire a content slot first (keeps connection alive for blob pulls). // If content slots full, try redirect slot. let slot = { let mut b = budget.lock().unwrap(); if b.try_acquire_content(ip) { Some(SlotKind::Content) } else if b.try_acquire_redirect(ip) { Some(SlotKind::Redirect) } else { None } }; let slot = match slot { Some(s) => s, None => { // Over budget — hard close drop(stream); continue; } }; let storage = Arc::clone(&storage); let blob_store = Arc::clone(&blob_store); let budget = Arc::clone(&budget); let downstream_addrs = Arc::clone(&downstream_addrs); tokio::spawn(async move { handle_connection(stream, ip, slot, &storage, &blob_store, &downstream_addrs).await; let mut b = budget.lock().unwrap(); match slot { SlotKind::Content => b.release_content(ip), SlotKind::Redirect => b.release_redirect(ip), } }); } } #[derive(Debug, Clone, Copy)] enum SlotKind { Content, Redirect, } /// Handle one HTTP connection (potentially keep-alive with multiple requests). async fn handle_connection( mut stream: TcpStream, _ip: IpAddr, slot: SlotKind, storage: &Arc>, blob_store: &Arc, downstream_addrs: &Arc>>>, ) { // Keep-alive loop: handle sequential requests on the same connection loop { let mut buf = vec![0u8; 4096]; let n = match tokio::time::timeout( std::time::Duration::from_secs(HEADER_TIMEOUT_SECS), stream.read(&mut buf), ) .await { Ok(Ok(0)) => return, // connection closed Ok(Ok(n)) => n, Ok(Err(_)) | Err(_) => return, // error or timeout — hard close }; let request = &buf[..n]; // Parse method and path from first line let (method, path) = match parse_request_line(request) { Some(v) => v, None => return, // malformed — hard close }; if method != "GET" { return; // only GET — hard close } if let Some(hex) = path.strip_prefix("/p/") { let post_id = match validate_hex64(hex) { Some(id) => id, None => return, // malformed — hard close }; match slot { SlotKind::Content => { if !serve_post(&mut stream, &post_id, storage, blob_store).await { return; } } SlotKind::Redirect => { if !try_redirect(&mut stream, &post_id, storage, downstream_addrs).await { return; } } } } else if let Some(hex) = path.strip_prefix("/b/") { let blob_id = match validate_hex64(hex) { Some(id) => id, None => return, // malformed — hard close }; match slot { SlotKind::Content => { if !serve_blob(&mut stream, &blob_id, storage, blob_store).await { return; } } SlotKind::Redirect => { // Redirect blob requests to the same host as the post // (browser will follow the redirect and pull from there) return; // hard close — blobs only served on content slots } } } else { return; // unknown path — hard close } } } /// Parse "GET /path HTTP/1.x\r\n..." → ("GET", "/path") fn parse_request_line(buf: &[u8]) -> Option<(&str, &str)> { let line_end = buf.iter().position(|&b| b == b'\r' || b == b'\n')?; let line = std::str::from_utf8(&buf[..line_end]).ok()?; let mut parts = line.split(' '); let method = parts.next()?; let path = parts.next()?; // Must have HTTP/1.x version let version = parts.next()?; if !version.starts_with("HTTP/") { return None; } Some((method, path)) } /// Validate a string is exactly 64 lowercase hex chars and decode to 32 bytes. fn validate_hex64(s: &str) -> Option<[u8; 32]> { if s.len() != 64 { return None; } if !s.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()) { return None; } let bytes = hex::decode(s).ok()?; bytes.try_into().ok() } /// Serve a post as HTML. Returns true if connection should stay alive. async fn serve_post( stream: &mut TcpStream, post_id: &[u8; 32], storage: &Arc>, blob_store: &Arc, ) -> bool { // Look up post + visibility let result = { let store = storage.lock().await; store.get_post_with_visibility(post_id) }; let (post, visibility) = match result { Ok(Some((p, v))) => (p, v), _ => return false, // not found — hard close (same as "not public") }; if !matches!(visibility, PostVisibility::Public) { return false; // not public — hard close } // Look up author name let author_name = { let store = storage.lock().await; store .get_profile(&post.author) .ok() .flatten() .map(|p| p.display_name) .unwrap_or_default() }; let _ = blob_store; // blob data served via /b/ route, not inlined // Build HTML let html = render_post_html(&post, post_id, &author_name); write_http_response(stream, 200, "text/html; charset=utf-8", html.as_bytes()).await } /// Serve a blob's raw bytes. Returns true if connection should stay alive. async fn serve_blob( stream: &mut TcpStream, blob_id: &[u8; 32], storage: &Arc>, blob_store: &Arc, ) -> bool { // Verify this blob belongs to a public post let (mime_type, _post_id) = { let store = storage.lock().await; match find_public_blob_info(&store, blob_id) { Some(info) => info, None => return false, // not found or not public — hard close } }; // Read blob data from filesystem let data = match blob_store.get(blob_id) { Ok(Some(data)) => data, _ => return false, // blob not on disk — hard close }; write_http_response(stream, 200, &mime_type, &data).await } /// Find a blob's mime type and verify it belongs to a public post. /// Returns (mime_type, post_id) or None. fn find_public_blob_info(store: &Storage, blob_id: &[u8; 32]) -> Option<(String, [u8; 32])> { // Search posts for one that has this blob as an attachment and is public // Use the blobs table to find which post owns this blob let post_id = store.get_blob_post_id(blob_id).ok()??; let (post, visibility) = store.get_post_with_visibility(&post_id).ok()??; if !matches!(visibility, PostVisibility::Public) { return None; } // Find the mime type from the post's attachments for att in &post.attachments { if att.cid == *blob_id { return Some((att.mime_type.clone(), post_id)); } } // Blob exists but isn't in this post's attachments (shouldn't happen) None } /// Try to 302 redirect to a downstream host that has this post. /// Returns true if redirect was sent, false to hard close. async fn try_redirect( stream: &mut TcpStream, post_id: &[u8; 32], storage: &Arc>, _downstream_addrs: &Arc>>>, ) -> bool { // Get downstream peers for this post let downstream_peers = { let store = storage.lock().await; // Verify post exists and is public first match store.get_post_with_visibility(post_id) { Ok(Some((_, PostVisibility::Public))) => {} _ => return false, // not found or not public — hard close } store.get_post_downstream(post_id).unwrap_or_default() }; // Get addresses for downstream peers let candidates: Vec = { let store = storage.lock().await; let mut addrs = Vec::new(); for peer_id in &downstream_peers { if let Ok(Some(peer)) = store.get_peer_record(peer_id) { for addr in &peer.addresses { if crate::network::is_publicly_routable(addr) { addrs.push(*addr); break; // one address per peer is enough } } } } addrs }; // TCP probe candidates (200ms timeout) and redirect to first live one let post_hex = hex::encode(post_id); for candidate in &candidates { if tcp_probe(candidate, 200).await { let location = format_http_url(candidate, &format!("/p/{}", post_hex)); return write_redirect(stream, &location).await; } } false // all dead — hard close } /// TCP handshake probe with timeout in milliseconds. async fn tcp_probe(addr: &SocketAddr, timeout_ms: u64) -> bool { tokio::time::timeout( std::time::Duration::from_millis(timeout_ms), TcpStream::connect(addr), ) .await .map(|r| r.is_ok()) .unwrap_or(false) } /// Format an HTTP URL for a socket address (handles IPv6 bracket notation). fn format_http_url(addr: &SocketAddr, path: &str) -> String { match addr { SocketAddr::V4(v4) => format!("http://{}:{}{}", v4.ip(), v4.port(), path), SocketAddr::V6(v6) => format!("http://[{}]:{}{}", v6.ip(), v6.port(), path), } } /// Write a 302 redirect response. Returns true on success. async fn write_redirect(stream: &mut TcpStream, location: &str) -> bool { let response = format!( "HTTP/1.1 302 Found\r\nLocation: {}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n", location ); stream.write_all(response.as_bytes()).await.is_ok() } /// Write an HTTP response with status, content type, and body. Returns true on success. async fn write_http_response( stream: &mut TcpStream, status: u16, content_type: &str, body: &[u8], ) -> bool { let status_text = match status { 200 => "OK", _ => "Error", }; let header = format!( "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nAccess-Control-Allow-Origin: *\r\nConnection: keep-alive\r\n\r\n", status, status_text, content_type, body.len() ); if stream.write_all(header.as_bytes()).await.is_err() { return false; } stream.write_all(body).await.is_ok() } /// Render a post as a minimal HTML page. pub fn render_post_html(post: &crate::types::Post, _post_id: &[u8; 32], author_name: &str) -> String { let escaped_content = html_escape(&post.content); let display_name = if author_name.is_empty() { &hex::encode(&post.author)[..12] } else { author_name }; let escaped_name = html_escape(display_name); let mut attachments_html = String::new(); for att in &post.attachments { let cid_hex = hex::encode(&att.cid); if att.mime_type.starts_with("video/") { attachments_html.push_str(&format!( r#""#, cid_hex )); } else { attachments_html.push_str(&format!( r#"attachment"#, cid_hex )); } } let timestamp = post.timestamp_ms / 1000; format!( r#" {name} on ItsGoin
{name}
{content}
{attachments}
{footer} "#, name = escaped_name, content = escaped_content, attachments = attachments_html, ts = timestamp, footer = POST_FOOTER, ) } /// Execute a TCP hole punch: send a SYN toward the browser's IP from our HTTP port. /// The connect will almost certainly fail (browser isn't listening), but the outbound /// SYN creates a NAT mapping allowing the browser's inbound HTTP connection. pub async fn tcp_punch(http_port: u16, browser_ip: std::net::IpAddr) -> bool { use std::net::SocketAddr; // Bind to the same port as our HTTP server (SO_REUSEPORT allows this) let socket = match browser_ip { std::net::IpAddr::V4(_) => tokio::net::TcpSocket::new_v4(), std::net::IpAddr::V6(_) => tokio::net::TcpSocket::new_v6(), }; let socket = match socket { Ok(s) => s, Err(e) => { debug!("TCP punch: failed to create socket: {}", e); return false; } }; let _ = socket.set_reuseaddr(true); #[cfg(unix)] let _ = socket.set_reuseport(true); let local_addr: SocketAddr = match browser_ip { std::net::IpAddr::V4(_) => ([0, 0, 0, 0], http_port).into(), std::net::IpAddr::V6(_) => (std::net::Ipv6Addr::UNSPECIFIED, http_port).into(), }; if let Err(e) = socket.bind(local_addr) { debug!("TCP punch: failed to bind port {}: {}", http_port, e); return false; } // Connect to browser IP on port 80 (destination port doesn't matter for EIM NAT, // the purpose is to create a NAT mapping entry). 500ms timeout — fire and forget. let target: SocketAddr = (browser_ip, 80).into(); let _ = tokio::time::timeout( std::time::Duration::from_millis(500), socket.connect(target), ).await; // Success or failure doesn't matter — the SYN left our NAT debug!(browser_ip = %browser_ip, port = http_port, "TCP punch SYN sent"); true } /// Minimal HTML entity escaping. pub fn html_escape(s: &str) -> String { let mut out = String::with_capacity(s.len()); for c in s.chars() { match c { '<' => out.push_str("<"), '>' => out.push_str(">"), '&' => out.push_str("&"), '"' => out.push_str("""), '\'' => out.push_str("'"), _ => out.push(c), } } out } // --- Share link generation --- /// Encode a list of socket addresses as compact binary, then base64url. /// Per IPv4: [0x04][4 bytes IP][2 bytes port] = 7 bytes /// Per IPv6: [0x06][16 bytes IP][2 bytes port] = 19 bytes pub fn encode_hostlist(hosts: &[SocketAddr]) -> String { let mut buf = Vec::with_capacity(hosts.len() * 19); for host in hosts.iter().take(5) { match host { SocketAddr::V4(v4) => { buf.push(0x04); buf.extend_from_slice(&v4.ip().octets()); buf.extend_from_slice(&v4.port().to_be_bytes()); } SocketAddr::V6(v6) => { buf.push(0x06); buf.extend_from_slice(&v6.ip().octets()); buf.extend_from_slice(&v6.port().to_be_bytes()); } } } base64url_encode(&buf) } /// Decode a base64url-encoded hostlist back to socket addresses. pub fn decode_hostlist(encoded: &str) -> Vec { let buf = match base64url_decode(encoded) { Some(b) => b, None => return Vec::new(), }; let mut addrs = Vec::new(); let mut i = 0; while i < buf.len() { match buf[i] { 0x04 if i + 7 <= buf.len() => { let ip = std::net::Ipv4Addr::new(buf[i + 1], buf[i + 2], buf[i + 3], buf[i + 4]); let port = u16::from_be_bytes([buf[i + 5], buf[i + 6]]); addrs.push(SocketAddr::new(ip.into(), port)); i += 7; } 0x06 if i + 19 <= buf.len() => { let mut octets = [0u8; 16]; octets.copy_from_slice(&buf[i + 1..i + 17]); let ip = std::net::Ipv6Addr::from(octets); let port = u16::from_be_bytes([buf[i + 17], buf[i + 18]]); addrs.push(SocketAddr::new(ip.into(), port)); i += 19; } _ => break, // malformed } } addrs } // --- Minimal base64url implementation (no external dependency) --- const B64_CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"; fn base64url_encode(data: &[u8]) -> String { let mut out = String::with_capacity((data.len() * 4 + 2) / 3); let mut i = 0; while i + 2 < data.len() { let n = ((data[i] as u32) << 16) | ((data[i + 1] as u32) << 8) | data[i + 2] as u32; out.push(B64_CHARS[((n >> 18) & 0x3F) as usize] as char); out.push(B64_CHARS[((n >> 12) & 0x3F) as usize] as char); out.push(B64_CHARS[((n >> 6) & 0x3F) as usize] as char); out.push(B64_CHARS[(n & 0x3F) as usize] as char); i += 3; } let remaining = data.len() - i; if remaining == 2 { let n = ((data[i] as u32) << 16) | ((data[i + 1] as u32) << 8); out.push(B64_CHARS[((n >> 18) & 0x3F) as usize] as char); out.push(B64_CHARS[((n >> 12) & 0x3F) as usize] as char); out.push(B64_CHARS[((n >> 6) & 0x3F) as usize] as char); } else if remaining == 1 { let n = (data[i] as u32) << 16; out.push(B64_CHARS[((n >> 18) & 0x3F) as usize] as char); out.push(B64_CHARS[((n >> 12) & 0x3F) as usize] as char); } out // no padding } fn base64url_decode(s: &str) -> Option> { let mut buf = Vec::with_capacity(s.len() * 3 / 4); let mut accum: u32 = 0; let mut bits: u32 = 0; for c in s.bytes() { let val = match c { b'A'..=b'Z' => c - b'A', b'a'..=b'z' => c - b'a' + 26, b'0'..=b'9' => c - b'0' + 52, b'-' => 62, b'_' => 63, b'=' => continue, // skip padding _ => return None, }; accum = (accum << 6) | val as u32; bits += 6; if bits >= 8 { bits -= 8; buf.push((accum >> bits) as u8); accum &= (1 << bits) - 1; } } Some(buf) } #[cfg(test)] mod tests { use super::*; #[test] fn test_validate_hex64() { let valid = "a".repeat(64); assert!(validate_hex64(&valid).is_some()); let short = "a".repeat(63); assert!(validate_hex64(&short).is_none()); let upper = "A".repeat(64); assert!(validate_hex64(&upper).is_none()); } #[test] fn test_html_escape() { assert_eq!(html_escape("