itsgoin/crates/core/src/protocol.rs
Scott Reimers be253e8001 Our Info panel, hole punch race fix, NAT profiles in relay introduction
- Network Diagnostics: "Our Info" button shows addresses with NAT status,
  device role, UPnP, HTTP capability. Addresses stacked for mobile.
- Hole punch race: re-check for existing connection before and after relay
  introduction to avoid wasting minutes on redundant punch attempts.
- Relay introduction now carries requester/target NAT mapping+filtering
  so hole punch strategy uses fresh profiles instead of stale stored ones.
  Critical for phones that switch between WiFi/cellular/VPN.
- STUN fix: filter DNS results to IPv4 (was resolving to IPv6 first on
  dual-stack, causing silent send failure and "NAT unknown").
- Welcome screen: Ready button with loading bar for instant feed access.
- LAN addresses show just "LAN" (no misleading punchability label).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:57:41 -04:00

1044 lines
36 KiB
Rust

use serde::{Deserialize, Serialize};
use crate::types::{
BlobHeaderDiffOp, CdnManifest, DeleteRecord, GroupEpoch, GroupId, GroupMemberKey, NodeId,
PeerWithAddress, Post, PostId, PostVisibility, PublicProfile, VisibilityUpdate, WormId,
};
/// Single ALPN for Discovery Protocol v3 (N1/N2/N3 architecture).
///
/// NOTE(review): the identifier says `V2` while both the value (`itsgoin/3`)
/// and this description say v3 — presumably the name was kept to avoid
/// touching callers; confirm before renaming.
pub const ALPN_V2: &[u8] = b"itsgoin/3";
/// A post bundled with its visibility metadata for sync.
///
/// Used as the post container inside `PullSyncResponsePayload`,
/// `PostPushPayload` and `PostFetchResponsePayload`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncPost {
/// Identifier of the bundled post
pub id: PostId,
/// The post itself
pub post: Post,
/// Visibility metadata travelling with the post
pub visibility: PostVisibility,
}
/// One-byte tag that prefixes every typed message so a single stream protocol
/// can carry many payload kinds.
///
/// The discriminant of each variant is the exact byte used on the wire
/// (`#[repr(u8)]`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum MessageType {
    // 0x01-0x05
    NodeListUpdate = 0x01,
    InitialExchange = 0x02,
    AddressRequest = 0x03,
    AddressResponse = 0x04,
    RefuseRedirect = 0x05,
    // 0x40-0x45
    PullSyncRequest = 0x40,
    PullSyncResponse = 0x41,
    PostNotification = 0x42,
    PostPush = 0x43,
    AudienceRequest = 0x44,
    AudienceResponse = 0x45,
    // 0x50-0x52
    ProfileUpdate = 0x50,
    DeleteRecord = 0x51,
    VisibilityUpdate = 0x52,
    // 0x60-0x61
    WormQuery = 0x60,
    WormResponse = 0x61,
    // 0x70-0x72
    SocialAddressUpdate = 0x70,
    SocialDisconnectNotice = 0x71,
    SocialCheckin = 0x72,
    // 0x80-0x81 reserved
    // 0x90-0x95
    BlobRequest = 0x90,
    BlobResponse = 0x91,
    ManifestRefreshRequest = 0x92,
    ManifestRefreshResponse = 0x93,
    ManifestPush = 0x94,
    BlobDeleteNotice = 0x95,
    // 0xA0-0xA2
    GroupKeyDistribute = 0xA0,
    GroupKeyRequest = 0xA1,
    GroupKeyResponse = 0xA2,
    // 0xB0-0xB4
    RelayIntroduce = 0xB0,
    RelayIntroduceResult = 0xB1,
    SessionRelay = 0xB2,
    MeshPrefer = 0xB3,
    CircleProfileUpdate = 0xB4,
    // 0xC0-0xC7
    AnchorRegister = 0xC0,
    AnchorReferralRequest = 0xC1,
    AnchorReferralResponse = 0xC2,
    AnchorProbeRequest = 0xC3,
    AnchorProbeResult = 0xC4,
    PortScanHeartbeat = 0xC5,
    NatFilterProbe = 0xC6,
    NatFilterProbeResult = 0xC7,
    // 0xD0-0xD7
    BlobHeaderDiff = 0xD0,
    BlobHeaderRequest = 0xD1,
    BlobHeaderResponse = 0xD2,
    PostDownstreamRegister = 0xD3,
    PostFetchRequest = 0xD4,
    PostFetchResponse = 0xD5,
    TcpPunchRequest = 0xD6,
    TcpPunchResult = 0xD7,
    // 0xE0-0xE2
    MeshKeepalive = 0xE0,
    ReplicationRequest = 0xE1,
    ReplicationResponse = 0xE2,
}

impl MessageType {
    /// Decode a wire byte into its message type.
    ///
    /// Returns `None` for any byte that is not assigned to a variant.
    pub fn from_byte(b: u8) -> Option<Self> {
        // Resolve the variant first; unassigned bytes leave through the
        // fallback arm so the happy path is wrapped in `Some` exactly once.
        let kind = match b {
            0x01 => Self::NodeListUpdate,
            0x02 => Self::InitialExchange,
            0x03 => Self::AddressRequest,
            0x04 => Self::AddressResponse,
            0x05 => Self::RefuseRedirect,
            0x40 => Self::PullSyncRequest,
            0x41 => Self::PullSyncResponse,
            0x42 => Self::PostNotification,
            0x43 => Self::PostPush,
            0x44 => Self::AudienceRequest,
            0x45 => Self::AudienceResponse,
            0x50 => Self::ProfileUpdate,
            0x51 => Self::DeleteRecord,
            0x52 => Self::VisibilityUpdate,
            0x60 => Self::WormQuery,
            0x61 => Self::WormResponse,
            0x70 => Self::SocialAddressUpdate,
            0x71 => Self::SocialDisconnectNotice,
            0x72 => Self::SocialCheckin,
            0x90 => Self::BlobRequest,
            0x91 => Self::BlobResponse,
            0x92 => Self::ManifestRefreshRequest,
            0x93 => Self::ManifestRefreshResponse,
            0x94 => Self::ManifestPush,
            0x95 => Self::BlobDeleteNotice,
            0xA0 => Self::GroupKeyDistribute,
            0xA1 => Self::GroupKeyRequest,
            0xA2 => Self::GroupKeyResponse,
            0xB0 => Self::RelayIntroduce,
            0xB1 => Self::RelayIntroduceResult,
            0xB2 => Self::SessionRelay,
            0xB3 => Self::MeshPrefer,
            0xB4 => Self::CircleProfileUpdate,
            0xC0 => Self::AnchorRegister,
            0xC1 => Self::AnchorReferralRequest,
            0xC2 => Self::AnchorReferralResponse,
            0xC3 => Self::AnchorProbeRequest,
            0xC4 => Self::AnchorProbeResult,
            0xC5 => Self::PortScanHeartbeat,
            0xC6 => Self::NatFilterProbe,
            0xC7 => Self::NatFilterProbeResult,
            0xD0 => Self::BlobHeaderDiff,
            0xD1 => Self::BlobHeaderRequest,
            0xD2 => Self::BlobHeaderResponse,
            0xD3 => Self::PostDownstreamRegister,
            0xD4 => Self::PostFetchRequest,
            0xD5 => Self::PostFetchResponse,
            0xD6 => Self::TcpPunchRequest,
            0xD7 => Self::TcpPunchResult,
            0xE0 => Self::MeshKeepalive,
            0xE1 => Self::ReplicationRequest,
            0xE2 => Self::ReplicationResponse,
            _ => return None,
        };
        Some(kind)
    }

    /// The wire byte for this message type (its `u8` discriminant).
    pub fn as_byte(self) -> u8 {
        self as u8
    }
}
// --- Payload structs ---
/// Initial exchange: N1/N2 node lists + profile + deletes + post_ids + peer addresses
///
/// Every field from `peer_addresses` onward is marked `#[serde(default)]`, so a
/// message that omits it still deserializes; the field then takes its
/// `Default` value (empty list, `None`, or `false`).
#[derive(Debug, Serialize, Deserialize)]
pub struct InitialExchangePayload {
/// Our connections + social contacts NodeIds (no addresses)
pub n1_node_ids: Vec<NodeId>,
/// Our deduplicated N2 NodeIds (no addresses)
pub n2_node_ids: Vec<NodeId>,
/// Our profile
pub profile: Option<PublicProfile>,
/// Our delete records
pub deletes: Vec<DeleteRecord>,
/// Our post IDs (for replica tracking)
pub post_ids: Vec<PostId>,
/// Our N+10:Addresses (connected peers with addresses) for social routing
#[serde(default)]
pub peer_addresses: Vec<PeerWithAddress>,
/// If sender is an anchor, their stable advertised address (e.g. "174.127.120.52:4433")
#[serde(default)]
pub anchor_addr: Option<String>,
/// What the sender sees as the receiver's address (STUN-like observed addr)
#[serde(default)]
pub your_observed_addr: Option<String>,
/// Sender's NAT type ("public", "easy", "hard", "unknown")
#[serde(default)]
pub nat_type: Option<String>,
/// Sender's NAT mapping behavior ("eim", "edm", "unknown")
#[serde(default)]
pub nat_mapping: Option<String>,
/// Sender's NAT filtering behavior ("open", "port_restricted", "unknown")
#[serde(default)]
pub nat_filtering: Option<String>,
/// Whether the sender is running an HTTP server for direct browser access
#[serde(default)]
pub http_capable: bool,
/// External HTTP address if known (e.g. "1.2.3.4:4433")
#[serde(default)]
pub http_addr: Option<String>,
/// CDN replication device role: "intermittent", "available", "persistent"
#[serde(default)]
pub device_role: Option<String>,
/// CDN cache pressure: 0-255 availability score (255 = lots of capacity)
#[serde(default)]
pub cache_pressure: Option<u8>,
}
/// Incremental N1/N2 changes
///
/// All fields are required on the wire (none carry `#[serde(default)]`).
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeListUpdatePayload {
/// Sequence number of this update.
/// NOTE(review): how gaps or reordering are handled is decided by the
/// receiver and is not visible in this file.
pub seq: u64,
/// NodeIds added to the N1 list
pub n1_added: Vec<NodeId>,
/// NodeIds removed from the N1 list
pub n1_removed: Vec<NodeId>,
/// NodeIds added to the N2 list
pub n2_added: Vec<NodeId>,
/// NodeIds removed from the N2 list
pub n2_removed: Vec<NodeId>,
}
/// Pull-based post sync request
///
/// `have_post_ids` and `since_ms` are both `#[serde(default)]`, so either may
/// be absent from the incoming JSON and will then decode as an empty list.
#[derive(Debug, Serialize, Deserialize)]
pub struct PullSyncRequestPayload {
/// Our follows (for the responder to filter)
pub follows: Vec<NodeId>,
/// Post IDs we already have (backward compat — empty for v4 senders)
#[serde(default)]
pub have_post_ids: Vec<PostId>,
/// Protocol v4: per-author timestamps (Vec of tuples for serde compat).
/// Each entry pairs an author's `NodeId` with a millisecond timestamp.
#[serde(default)]
pub since_ms: Vec<(NodeId, u64)>,
}
/// Pull-based post sync response
#[derive(Debug, Serialize, Deserialize)]
pub struct PullSyncResponsePayload {
/// Posts returned to the requester, each bundled with its visibility
pub posts: Vec<SyncPost>,
/// Visibility updates returned alongside the posts
pub visibility_updates: Vec<VisibilityUpdate>,
}
/// Profile update (pushed via uni-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct ProfileUpdatePayload {
/// One or more profiles carried in a single push
pub profiles: Vec<PublicProfile>,
}
/// Delete record (pushed via uni-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleteRecordPayload {
/// One or more delete records carried in a single push
pub records: Vec<DeleteRecord>,
}
/// Visibility update (pushed via uni-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct VisibilityUpdatePayload {
/// One or more visibility updates carried in a single push
pub updates: Vec<VisibilityUpdate>,
}
/// Post notification: lightweight push when a new post is created.
///
/// Carries identifiers only; the post content itself is not included.
#[derive(Debug, Serialize, Deserialize)]
pub struct PostNotificationPayload {
/// ID of the new post
pub post_id: PostId,
/// Author of the new post
pub author: NodeId,
}
/// Audience request: ask a peer to join their audience
#[derive(Debug, Serialize, Deserialize)]
pub struct AudienceRequestPayload {
/// Node asking to join the audience
pub requester: NodeId,
}
/// Audience response: approve or deny an audience request
#[derive(Debug, Serialize, Deserialize)]
pub struct AudienceResponsePayload {
/// Node answering the request
pub responder: NodeId,
/// true = request approved, false = denied
pub approved: bool,
}
/// Post push: full post content pushed directly to a recipient
#[derive(Debug, Serialize, Deserialize)]
pub struct PostPushPayload {
/// The pushed post together with its ID and visibility
pub post: SyncPost,
}
/// Address resolution request (bi-stream: ask reporter for a hop-2 peer's address)
#[derive(Debug, Serialize, Deserialize)]
pub struct AddressRequestPayload {
/// Peer whose address is being requested
pub target: NodeId,
}
/// Address resolution response
///
/// `disconnected_at` and `peer_addresses` are `#[serde(default)]` and may be
/// absent from the incoming JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct AddressResponsePayload {
/// Peer the response is about
pub target: NodeId,
/// Resolved address, or `None` when the responder has none to give
pub address: Option<String>,
/// Set when the target is known-disconnected (requester registered as watcher).
/// NOTE(review): the time unit of this `u64` is not stated here — confirm.
#[serde(default)]
pub disconnected_at: Option<u64>,
/// Target's N+10:Addresses if known
#[serde(default)]
pub peer_addresses: Vec<PeerWithAddress>,
}
/// Refuse mesh connection with optional redirect to another peer
#[derive(Debug, Serialize, Deserialize)]
pub struct RefuseRedirectPayload {
/// Free-form explanation of the refusal
pub reason: String,
/// Alternative peer to try instead, if the refuser offers one
pub redirect: Option<PeerWithAddress>,
}
/// Worm lookup query (bi-stream) — searches for nodes, posts, or blobs
///
/// `needle_peers`, `post_id` and `blob_id` are `#[serde(default)]` and may be
/// absent from the incoming JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WormQueryPayload {
/// Identifier of this worm lookup; echoed in `WormResponsePayload::worm_id`
pub worm_id: WormId,
/// Primary node being searched for
pub target: NodeId,
/// Additional IDs to search for (up to 10 recent_peers of target)
#[serde(default)]
pub needle_peers: Vec<NodeId>,
/// Remaining hop budget.
/// NOTE(review): the decrement/stop rule lives in the handler, not in this file.
pub ttl: u8,
/// Nodes recorded as already visited by this query
/// (presumably for loop avoidance — confirm against the handler)
pub visited: Vec<NodeId>,
/// Optional: also search for a specific post by ID
#[serde(default)]
pub post_id: Option<PostId>,
/// Optional: also search for a specific blob by CID
#[serde(default)]
pub blob_id: Option<[u8; 32]>,
}
/// Worm lookup response (bi-stream, paired with query)
///
/// `found_id`, `wide_referral`, `post_holder` and `blob_holder` are
/// `#[serde(default)]` and may be absent from the incoming JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct WormResponsePayload {
/// Identifier of the worm lookup this response belongs to
pub worm_id: WormId,
/// Whether any needle was found
pub found: bool,
/// Which needle was actually found (target or one of its recent_peers)
#[serde(default)]
pub found_id: Option<NodeId>,
/// Addresses reported for the found node
pub addresses: Vec<String>,
/// Node that reported the result, if any
pub reporter: Option<NodeId>,
/// Hop value reported with the result.
/// NOTE(review): whether this counts from the origin or the reporter is not
/// visible here — confirm.
pub hop: Option<u8>,
/// One random wide-peer referral: (node_id, address) for bloom round
#[serde(default)]
pub wide_referral: Option<(NodeId, String)>,
/// Node that holds the requested post (may differ from found_id)
#[serde(default)]
pub post_holder: Option<NodeId>,
/// Node that holds the requested blob
#[serde(default)]
pub blob_holder: Option<NodeId>,
}
// --- Social routing payloads ---
/// Address update notification: "here's N+10:Addresses for a peer"
///
/// Has the same field layout as `SocialCheckinPayload`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SocialAddressUpdatePayload {
/// Peer the update is about
pub node_id: NodeId,
/// That peer's own addresses
pub addresses: Vec<String>,
/// That peer's N+10:Addresses
pub peer_addresses: Vec<PeerWithAddress>,
}
/// Disconnect notice: "peer X disconnected"
#[derive(Debug, Serialize, Deserialize)]
pub struct SocialDisconnectNoticePayload {
/// The peer that disconnected
pub node_id: NodeId,
}
/// Lightweight keepalive checkin (bidirectional)
///
/// Has the same field layout as `SocialAddressUpdatePayload`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SocialCheckinPayload {
/// Node the checkin is about
pub node_id: NodeId,
/// That node's addresses
pub addresses: Vec<String>,
/// Peers-with-addresses list sent along with the checkin
pub peer_addresses: Vec<PeerWithAddress>,
}
// --- Blob transfer payloads ---
/// Request a blob by CID (bi-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobRequestPayload {
/// 32-byte content ID of the requested blob
pub cid: [u8; 32],
/// Requester's addresses so the host can record downstream.
/// `#[serde(default)]`: decodes as an empty list when omitted.
#[serde(default)]
pub requester_addresses: Vec<String>,
}
/// Blob response: found flag + base64-encoded data + CDN manifest
///
/// Only `cid` and `found` are required on the wire; every other field is
/// `#[serde(default)]`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobResponsePayload {
/// 32-byte content ID this response refers to
pub cid: [u8; 32],
/// Whether the responder has the blob
pub found: bool,
/// Base64-encoded blob bytes (empty if not found)
#[serde(default)]
pub data_b64: String,
/// Author manifest + host info (if available)
#[serde(default)]
pub manifest: Option<CdnManifest>,
/// Whether host accepted requester as downstream
#[serde(default)]
pub cdn_registered: bool,
/// If not registered (host full), try these peers
#[serde(default)]
pub cdn_redirect_peers: Vec<PeerWithAddress>,
}
/// Request a manifest refresh for a CID (bi-stream: ask upstream)
#[derive(Debug, Serialize, Deserialize)]
pub struct ManifestRefreshRequestPayload {
/// 32-byte content ID whose manifest should be refreshed
pub cid: [u8; 32],
/// `updated_at` value of the manifest the requester currently holds.
/// NOTE(review): time unit is not stated in this file — confirm.
pub current_updated_at: u64,
}
/// Manifest refresh response
#[derive(Debug, Serialize, Deserialize)]
pub struct ManifestRefreshResponsePayload {
/// 32-byte content ID this response refers to
pub cid: [u8; 32],
/// Whether the responder reports an update
pub updated: bool,
/// The manifest, when one is supplied
/// (presumably only when `updated` is true — confirm against the handler)
pub manifest: Option<CdnManifest>,
}
/// Push updated manifests to downstream peers (uni-stream)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestPushPayload {
/// Batch of (CID, manifest) entries carried in one push
pub manifests: Vec<ManifestPushEntry>,
}
/// A single manifest push entry (element of `ManifestPushPayload::manifests`)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestPushEntry {
/// 32-byte content ID the manifest belongs to
pub cid: [u8; 32],
/// The manifest being pushed
pub manifest: CdnManifest,
}
/// Notify upstream/downstream that a blob has been deleted (uni-stream)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDeleteNoticePayload {
/// 32-byte content ID of the deleted blob
pub cid: [u8; 32],
/// If sender was upstream and is providing their own upstream for tree healing.
/// `#[serde(default)]`: decodes as `None` when omitted.
#[serde(default)]
pub upstream_node: Option<PeerWithAddress>,
}
// --- Group key distribution payloads ---
/// Admin pushes wrapped group key to a member (uni-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct GroupKeyDistributePayload {
/// Group the key belongs to
pub group_id: GroupId,
/// Name of the circle associated with the group
pub circle_name: String,
/// Key epoch being distributed
pub epoch: GroupEpoch,
/// 32-byte group public key
pub group_public_key: [u8; 32],
/// Admin node of the group
pub admin: NodeId,
/// Per-member key entries
/// (presumably one wrapped key per recipient — confirm against `GroupMemberKey`)
pub member_keys: Vec<GroupMemberKey>,
}
/// Member requests current group key (bi-stream request)
#[derive(Debug, Serialize, Deserialize)]
pub struct GroupKeyRequestPayload {
/// Group whose key is requested
pub group_id: GroupId,
/// Epoch the requester already knows
pub known_epoch: GroupEpoch,
}
/// Admin responds with wrapped key (bi-stream response)
#[derive(Debug, Serialize, Deserialize)]
pub struct GroupKeyResponsePayload {
/// Group the response is about
pub group_id: GroupId,
/// Epoch of the key in this response
pub epoch: GroupEpoch,
/// 32-byte group public key
pub group_public_key: [u8; 32],
/// Admin node of the group
pub admin: NodeId,
/// Key entry for the requester, or `None` when the admin supplies none.
/// NOTE(review): the conditions for `None` are decided by the handler.
pub member_key: Option<GroupMemberKey>,
}
// --- Relay introduction payloads ---
/// Relay introduction identifier for deduplication (16 raw bytes).
///
/// Shared by `RelayIntroducePayload`, `RelayIntroduceResultPayload` and
/// `SessionRelayPayload`.
pub type IntroId = [u8; 16];
/// Request introduction to a target through a relay peer (bi-stream)
///
/// The two NAT fields are optional on the wire in both directions: they are
/// left out of the JSON when `None` (`skip_serializing_if`) and decode as
/// `None` when missing (`default`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayIntroducePayload {
/// Identifier of this introduction attempt
pub intro_id: IntroId,
/// Node the requester wants to reach
pub target: NodeId,
/// Node asking for the introduction
pub requester: NodeId,
/// Addresses of the requester
pub requester_addresses: Vec<String>,
/// Max forwarding hops remaining (0 = relay must know target directly)
pub ttl: u8,
/// Requester's current NAT mapping type (for hole punch strategy)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nat_mapping: Option<String>,
/// Requester's current NAT filtering type
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nat_filtering: Option<String>,
}
/// Target's response to a relay introduction (bi-stream response)
///
/// The two NAT fields are optional on the wire in both directions: they are
/// left out of the JSON when `None` (`skip_serializing_if`) and decode as
/// `None` when missing (`default`).
#[derive(Debug, Serialize, Deserialize)]
pub struct RelayIntroduceResultPayload {
/// Identifier of the introduction being answered
pub intro_id: IntroId,
/// Whether the introduction was accepted
pub accepted: bool,
/// Addresses of the target
pub target_addresses: Vec<String>,
/// Relay is willing to serve as stream relay fallback
pub relay_available: bool,
/// Explanation when the introduction was not accepted
pub reject_reason: Option<String>,
/// Target's current NAT mapping type (for hole punch strategy)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nat_mapping: Option<String>,
/// Target's current NAT filtering type
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nat_filtering: Option<String>,
}
/// Open a relay pipe — intermediary splices two bi-streams (bi-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct SessionRelayPayload {
/// Identifier of the relay introduction this pipe belongs to
pub intro_id: IntroId,
/// Node the pipe should be connected to
pub target: NodeId,
}
/// Mesh preference negotiation (bi-stream: request + response)
///
/// The same struct is used for both directions; which fields are meaningful
/// depends on whether the message is the request or the response.
#[derive(Debug, Serialize, Deserialize)]
pub struct MeshPreferPayload {
/// true = "I want us to be preferred peers" (request)
pub requesting: bool,
/// true = "I agree to be preferred peers" (response only)
pub accepted: bool,
/// Reason for rejection (response only, when accepted=false).
/// `#[serde(default)]`: decodes as `None` when omitted.
#[serde(default)]
pub reject_reason: Option<String>,
}
/// Circle profile update: encrypted profile variant for a circle (uni-stream push)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircleProfileUpdatePayload {
/// Author of the profile
pub author: NodeId,
/// Name of the circle the profile variant is for
pub circle_name: String,
/// Group associated with the circle
pub group_id: GroupId,
/// Group key epoch this update was produced under
/// (presumably the epoch needed to unwrap `wrapped_cek` — confirm)
pub epoch: GroupEpoch,
/// base64(ChaCha20-Poly1305 encrypted JSON of CircleProfile)
pub encrypted_payload: String,
/// 60 bytes: nonce(12) || encrypted_cek(32) || tag(16).
/// The length is not enforced by this type; it is a plain `Vec<u8>`.
pub wrapped_cek: Vec<u8>,
/// Timestamp of the update.
/// NOTE(review): unit not stated here; the unit test uses a millisecond-scale value.
pub updated_at: u64,
}
// --- Anchor referral payloads ---
/// Node registers its address with an anchor (uni-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct AnchorRegisterPayload {
/// The registering node
pub node_id: NodeId,
/// Addresses the node registers
pub addresses: Vec<String>,
}
/// Node requests peer referrals from an anchor (bi-stream request)
#[derive(Debug, Serialize, Deserialize)]
pub struct AnchorReferralRequestPayload {
/// Node asking for referrals
pub requester: NodeId,
/// Addresses of the requester
pub requester_addresses: Vec<String>,
}
/// Anchor responds with peer referrals (bi-stream response)
#[derive(Debug, Serialize, Deserialize)]
pub struct AnchorReferralResponsePayload {
/// Referred peers; may be empty
pub referrals: Vec<AnchorReferral>,
}
/// A single peer referral from an anchor
/// (element of `AnchorReferralResponsePayload::referrals`)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnchorReferral {
/// The referred peer
pub node_id: NodeId,
/// Addresses of the referred peer
pub addresses: Vec<String>,
}
// --- Anchor probe payloads ---
/// Request an anchor self-verification probe (bi-stream: sent to reporter)
///
/// The matching result is `AnchorProbeResultPayload`, correlated by `probe_id`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AnchorProbeRequestPayload {
/// Address to cold-connect to (the candidate's external address)
pub target_addr: String,
/// Stranger from N2 who will perform the cold connect test
pub witness: NodeId,
/// The node requesting verification (us)
pub candidate: NodeId,
/// Candidate's addresses for the witness to deliver result directly
pub candidate_addresses: Vec<String>,
/// Dedup identifier (16 raw bytes)
pub probe_id: [u8; 16],
}
/// Result of an anchor self-verification probe
#[derive(Debug, Serialize, Deserialize)]
pub struct AnchorProbeResultPayload {
/// Identifier copied from the probe request this result answers
pub probe_id: [u8; 16],
/// Whether the probe reports the candidate as reachable
pub reachable: bool,
/// Address observed during the probe, if any
pub observed_addr: Option<String>,
}
/// Port scan heartbeat during scanning hole punch (informational)
#[derive(Debug, Serialize, Deserialize)]
pub struct PortScanHeartbeatPayload {
/// Peer the heartbeat refers to
pub peer: NodeId,
/// Port the peer was seen from (if any)
pub seen_from_port: Option<u16>,
}
/// Request NAT filtering probe from anchor (bi-stream).
/// Anchor will attempt to reach us from a different source port.
/// The outcome is reported in `NatFilterProbeResultPayload`.
#[derive(Debug, Serialize, Deserialize)]
pub struct NatFilterProbePayload {
/// Our node ID
pub node_id: NodeId,
}
/// Result of NAT filtering probe (answer to `NatFilterProbePayload`)
#[derive(Debug, Serialize, Deserialize)]
pub struct NatFilterProbeResultPayload {
/// true = we reached you from a different port (address-restricted / Open)
/// false = could not reach you (port-restricted)
pub reachable: bool,
}
// --- Engagement payloads ---
/// Incremental engagement diff (uni-stream: propagated through post_downstream tree)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobHeaderDiffPayload {
/// Post whose engagement header is being changed
pub post_id: PostId,
/// Author of that post
pub author: NodeId,
/// Diff operations contained in this batch
pub ops: Vec<BlobHeaderDiffOp>,
/// Timestamp of this diff batch (milliseconds, per the field name)
pub timestamp_ms: u64,
}
/// Request full engagement header for a post (bi-stream request)
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobHeaderRequestPayload {
/// Post whose header is requested
pub post_id: PostId,
/// Requester's current header timestamp (0 = never seen)
pub current_updated_at: u64,
}
/// Full engagement header response (bi-stream response)
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobHeaderResponsePayload {
/// Post the response is about
pub post_id: PostId,
/// True if the sender has a newer header than requested
pub updated: bool,
/// JSON-serialized BlobHeader (if updated).
/// `#[serde(default)]`: decodes as `None` when omitted.
#[serde(default)]
pub header_json: Option<String>,
}
/// Register as a downstream holder of a post (uni-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct PostDownstreamRegisterPayload {
/// Post the sender registers for
pub post_id: PostId,
}
/// Request a single post by ID (bi-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct PostFetchRequestPayload {
/// ID of the requested post
pub post_id: PostId,
}
/// Single-post fetch response (bi-stream)
#[derive(Debug, Serialize, Deserialize)]
pub struct PostFetchResponsePayload {
/// ID of the post that was requested
pub post_id: PostId,
/// Whether the responder has the post
pub found: bool,
/// The post with its visibility, when supplied
/// (presumably `Some` only when `found` is true — confirm against the handler)
pub post: Option<SyncPost>,
}
// --- Active CDN Replication payloads ---
/// Request a peer to replicate (cache) specific posts and their blobs (bi-stream)
///
/// Answered with `ReplicationResponsePayload`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReplicationRequestPayload {
/// Posts to replicate (with their blobs)
pub post_ids: Vec<PostId>,
/// 0-255 urgency (higher = more important to cache)
pub priority: u8,
}
/// Response to a replication request (bi-stream)
///
/// Splits the requested post IDs into those accepted and those declined.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReplicationResponsePayload {
/// Posts the peer agreed to hold
pub accepted: Vec<PostId>,
/// Posts the peer declined (over budget or no space)
pub rejected: Vec<PostId>,
}
/// Request a TCP hole punch toward a browser IP (bi-stream).
/// Sent by the anchor to a node that holds a post, so the node's NAT
/// opens a pinhole allowing the browser to connect directly via HTTP.
/// Answered with `TcpPunchResultPayload`.
#[derive(Debug, Serialize, Deserialize)]
pub struct TcpPunchRequestPayload {
/// Browser's public IP (from X-Forwarded-For).
/// Carried as an unparsed string; this type does not validate it.
pub browser_ip: String,
/// Post being requested (for validation — node must hold this post)
pub post_id: PostId,
}
/// Result of a TCP punch attempt (bi-stream response).
#[derive(Debug, Serialize, Deserialize)]
pub struct TcpPunchResultPayload {
/// Whether the punch SYN was sent
pub success: bool,
/// External HTTP address the browser should be redirected to
/// (`None` when the responder has none to offer)
pub http_addr: Option<String>,
}
// --- Wire helpers ---
/// Write a typed message: 1-byte type + length-prefixed JSON payload.
///
/// Wire layout: `[type: u8][len: u32 big-endian][JSON bytes; len]`.
///
/// # Errors
/// Fails if the payload cannot be serialized to JSON, if the serialized
/// payload is longer than `u32::MAX` bytes (the length prefix could not
/// represent it), or if any write to `send` fails.
pub async fn write_typed_message<T: Serialize>(
    send: &mut iroh::endpoint::SendStream,
    msg_type: MessageType,
    payload: &T,
) -> anyhow::Result<()> {
    let bytes = serde_json::to_vec(payload)?;
    // Validate the length BEFORE touching the stream: a plain `as u32` cast
    // would silently wrap for oversized payloads and emit a prefix that does
    // not match the body, desynchronising the receiver.
    let len = u32::try_from(bytes.len()).map_err(|_| {
        anyhow::anyhow!(
            "payload too large to frame: {} bytes (max {})",
            bytes.len(),
            u32::MAX
        )
    })?;
    send.write_all(&[msg_type.as_byte()]).await?;
    send.write_all(&len.to_be_bytes()).await?;
    send.write_all(&bytes).await?;
    Ok(())
}
/// Read the 1-byte message type header from a stream.
///
/// # Errors
/// Fails if the byte cannot be read, or if it does not map to a known
/// [`MessageType`].
pub async fn read_message_type(
    recv: &mut iroh::endpoint::RecvStream,
) -> anyhow::Result<MessageType> {
    let mut header = [0u8; 1];
    recv.read_exact(&mut header).await?;
    let [tag] = header;
    match MessageType::from_byte(tag) {
        Some(kind) => Ok(kind),
        None => Err(anyhow::anyhow!("unknown message type: 0x{:02x}", tag)),
    }
}
/// Read a length-prefixed JSON payload (after type byte has been consumed).
///
/// Reads a 4-byte big-endian length, rejects it if it exceeds `max_size`
/// (before allocating the buffer), then reads exactly that many bytes and
/// decodes them as JSON into `T`.
///
/// # Errors
/// Fails on a short read, an over-limit length, or invalid JSON for `T`.
pub async fn read_payload<T: serde::de::DeserializeOwned>(
    recv: &mut iroh::endpoint::RecvStream,
    max_size: usize,
) -> anyhow::Result<T> {
    let mut prefix = [0u8; 4];
    recv.read_exact(&mut prefix).await?;
    let declared = u32::from_be_bytes(prefix) as usize;
    anyhow::ensure!(
        declared <= max_size,
        "payload too large: {} bytes (max {})",
        declared,
        max_size
    );
    let mut body = vec![0u8; declared];
    recv.read_exact(&mut body).await?;
    let value = serde_json::from_slice(&body)?;
    Ok(value)
}
/// Generic length-prefixed JSON write for any serializable type.
///
/// Wire layout: `[len: u32 big-endian][JSON bytes; len]` (no type byte).
///
/// # Errors
/// Fails if `msg` cannot be serialized to JSON, if the serialized message is
/// longer than `u32::MAX` bytes (the length prefix could not represent it),
/// or if any write to `send` fails.
pub async fn write_framed<T: Serialize>(
    send: &mut iroh::endpoint::SendStream,
    msg: &T,
) -> anyhow::Result<()> {
    let bytes = serde_json::to_vec(msg)?;
    // A plain `as u32` cast would silently wrap for oversized messages and
    // produce a prefix that disagrees with the body; reject them instead,
    // before anything is written to the stream.
    let len = u32::try_from(bytes.len()).map_err(|_| {
        anyhow::anyhow!(
            "framed message too large to frame: {} bytes (max {})",
            bytes.len(),
            u32::MAX
        )
    })?;
    send.write_all(&len.to_be_bytes()).await?;
    send.write_all(&bytes).await?;
    Ok(())
}
/// Generic length-prefixed JSON read for any deserializable type
pub async fn read_framed<T: serde::de::DeserializeOwned>(
recv: &mut iroh::endpoint::RecvStream,
max_size: usize,
) -> anyhow::Result<T> {
let mut len_buf = [0u8; 4];
recv.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf) as usize;
if len > max_size {
anyhow::bail!("framed message too large: {} bytes (max {})", len, max_size);
}
let mut buf = vec![0u8; len];
recv.read_exact(&mut buf).await?;
Ok(serde_json::from_slice(&buf)?)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn message_type_roundtrip() {
    // Every variant must survive `as_byte` -> `from_byte` unchanged.
    let all_variants = [
        MessageType::NodeListUpdate,
        MessageType::InitialExchange,
        MessageType::AddressRequest,
        MessageType::AddressResponse,
        MessageType::RefuseRedirect,
        MessageType::PullSyncRequest,
        MessageType::PullSyncResponse,
        MessageType::PostNotification,
        MessageType::PostPush,
        MessageType::AudienceRequest,
        MessageType::AudienceResponse,
        MessageType::ProfileUpdate,
        MessageType::DeleteRecord,
        MessageType::VisibilityUpdate,
        MessageType::WormQuery,
        MessageType::WormResponse,
        MessageType::SocialAddressUpdate,
        MessageType::SocialDisconnectNotice,
        MessageType::SocialCheckin,
        MessageType::BlobRequest,
        MessageType::BlobResponse,
        MessageType::ManifestRefreshRequest,
        MessageType::ManifestRefreshResponse,
        MessageType::ManifestPush,
        MessageType::BlobDeleteNotice,
        MessageType::GroupKeyDistribute,
        MessageType::GroupKeyRequest,
        MessageType::GroupKeyResponse,
        MessageType::RelayIntroduce,
        MessageType::RelayIntroduceResult,
        MessageType::SessionRelay,
        MessageType::MeshPrefer,
        MessageType::CircleProfileUpdate,
        MessageType::AnchorRegister,
        MessageType::AnchorReferralRequest,
        MessageType::AnchorReferralResponse,
        MessageType::AnchorProbeRequest,
        MessageType::AnchorProbeResult,
        MessageType::PortScanHeartbeat,
        MessageType::NatFilterProbe,
        MessageType::NatFilterProbeResult,
        MessageType::BlobHeaderDiff,
        MessageType::BlobHeaderRequest,
        MessageType::BlobHeaderResponse,
        MessageType::PostDownstreamRegister,
        MessageType::PostFetchRequest,
        MessageType::PostFetchResponse,
        MessageType::TcpPunchRequest,
        MessageType::TcpPunchResult,
        MessageType::MeshKeepalive,
        MessageType::ReplicationRequest,
        MessageType::ReplicationResponse,
    ];
    all_variants.into_iter().for_each(|variant| {
        let wire = variant.as_byte();
        let back = MessageType::from_byte(wire).expect("roundtrip failed");
        assert_eq!(variant, back);
    });
}
#[test]
fn unknown_message_type_returns_none() {
    // Bytes that are not assigned to any variant must not decode.
    for unassigned in [0xFFu8, 0x00, 0x06] {
        assert!(MessageType::from_byte(unassigned).is_none());
    }
}
#[test]
fn blob_delete_notice_payload_roundtrip() {
    use crate::types::PeerWithAddress;

    // Without upstream
    let payload = BlobDeleteNoticePayload {
        cid: [42u8; 32],
        upstream_node: None,
    };
    let json = serde_json::to_string(&payload).unwrap();
    let decoded: BlobDeleteNoticePayload = serde_json::from_str(&json).unwrap();
    assert_eq!(decoded.cid, [42u8; 32]);
    assert!(decoded.upstream_node.is_none());

    // With upstream
    let payload_with_up = BlobDeleteNoticePayload {
        cid: [99u8; 32],
        upstream_node: Some(PeerWithAddress {
            n: hex::encode([1u8; 32]),
            a: vec!["10.0.0.1:4433".to_string()],
        }),
    };
    let json2 = serde_json::to_string(&payload_with_up).unwrap();
    let decoded2: BlobDeleteNoticePayload = serde_json::from_str(&json2).unwrap();
    assert_eq!(decoded2.cid, [99u8; 32]);
    assert!(decoded2.upstream_node.is_some());
    let up = decoded2.upstream_node.unwrap();
    // Both halves of the upstream peer must survive, not just the address.
    assert_eq!(up.n, hex::encode([1u8; 32]));
    assert_eq!(up.a, vec!["10.0.0.1:4433".to_string()]);

    // A sender that omits `upstream_node` entirely must still decode,
    // because the field is `#[serde(default)]`.
    let cid_json = serde_json::to_string(&[7u8; 32]).unwrap();
    let legacy_json = format!("{{\"cid\":{}}}", cid_json);
    let legacy: BlobDeleteNoticePayload = serde_json::from_str(&legacy_json).unwrap();
    assert_eq!(legacy.cid, [7u8; 32]);
    assert!(legacy.upstream_node.is_none());
}
#[test]
fn relay_introduce_payload_roundtrip() {
    let payload = RelayIntroducePayload {
        intro_id: [42u8; 16],
        target: [1u8; 32],
        requester: [2u8; 32],
        requester_addresses: vec!["10.0.0.2:4433".to_string()],
        ttl: 1,
        nat_mapping: Some("eim".to_string()),
        nat_filtering: Some("open".to_string()),
    };
    let json = serde_json::to_string(&payload).unwrap();
    let decoded: RelayIntroducePayload = serde_json::from_str(&json).unwrap();
    assert_eq!(decoded.intro_id, [42u8; 16]);
    assert_eq!(decoded.target, [1u8; 32]);
    assert_eq!(decoded.requester, [2u8; 32]);
    assert_eq!(decoded.requester_addresses, vec!["10.0.0.2:4433".to_string()]);
    assert_eq!(decoded.ttl, 1);
    // The requester's NAT profile must survive the trip.
    assert_eq!(decoded.nat_mapping.as_deref(), Some("eim"));
    assert_eq!(decoded.nat_filtering.as_deref(), Some("open"));

    // Unset NAT fields are left off the wire (`skip_serializing_if`) and a
    // message without them decodes to `None` (`default`), which keeps the
    // payload compatible with peers that do not know these fields.
    let without_nat = RelayIntroducePayload {
        nat_mapping: None,
        nat_filtering: None,
        ..payload
    };
    let json_without_nat = serde_json::to_string(&without_nat).unwrap();
    assert!(!json_without_nat.contains("nat_mapping"));
    assert!(!json_without_nat.contains("nat_filtering"));
    let decoded_without_nat: RelayIntroducePayload =
        serde_json::from_str(&json_without_nat).unwrap();
    assert!(decoded_without_nat.nat_mapping.is_none());
    assert!(decoded_without_nat.nat_filtering.is_none());
    assert_eq!(decoded_without_nat.intro_id, [42u8; 16]);
}
#[test]
fn relay_introduce_result_payload_roundtrip() {
    let payload = RelayIntroduceResultPayload {
        intro_id: [7u8; 16],
        accepted: true,
        target_addresses: vec!["10.0.0.1:4433".to_string(), "192.168.1.1:4433".to_string()],
        relay_available: true,
        reject_reason: None,
        nat_mapping: Some("eim".to_string()),
        nat_filtering: Some("open".to_string()),
    };
    let json = serde_json::to_string(&payload).unwrap();
    let decoded: RelayIntroduceResultPayload = serde_json::from_str(&json).unwrap();
    assert_eq!(decoded.intro_id, [7u8; 16]);
    assert!(decoded.accepted);
    assert_eq!(decoded.target_addresses.len(), 2);
    assert!(decoded.relay_available);
    assert!(decoded.reject_reason.is_none());
    // The target's NAT profile must survive the trip.
    assert_eq!(decoded.nat_mapping.as_deref(), Some("eim"));
    assert_eq!(decoded.nat_filtering.as_deref(), Some("open"));

    // Test rejected case
    let rejected = RelayIntroduceResultPayload {
        intro_id: [8u8; 16],
        accepted: false,
        target_addresses: vec![],
        relay_available: false,
        reject_reason: Some("target not reachable".to_string()),
        nat_mapping: None,
        nat_filtering: None,
    };
    let json2 = serde_json::to_string(&rejected).unwrap();
    // Unset NAT fields are left off the wire (`skip_serializing_if`) ...
    assert!(!json2.contains("nat_mapping"));
    assert!(!json2.contains("nat_filtering"));
    let decoded2: RelayIntroduceResultPayload = serde_json::from_str(&json2).unwrap();
    assert!(!decoded2.accepted);
    // ... and a message without them decodes to `None` (`default`).
    assert!(decoded2.nat_mapping.is_none());
    assert!(decoded2.nat_filtering.is_none());
    assert_eq!(decoded2.reject_reason.unwrap(), "target not reachable");
}
#[test]
fn mesh_prefer_payload_roundtrip() {
    // Encode to JSON and decode straight back.
    fn via_json(msg: &MeshPreferPayload) -> MeshPreferPayload {
        let wire = serde_json::to_string(msg).unwrap();
        serde_json::from_str(&wire).unwrap()
    }

    // Request
    let request_back = via_json(&MeshPreferPayload {
        requesting: true,
        accepted: false,
        reject_reason: None,
    });
    assert!(request_back.requesting);
    assert!(!request_back.accepted);
    assert!(request_back.reject_reason.is_none());

    // Accepted response
    let accept_back = via_json(&MeshPreferPayload {
        requesting: false,
        accepted: true,
        reject_reason: None,
    });
    assert!(!accept_back.requesting);
    assert!(accept_back.accepted);

    // Rejected response
    let reject_back = via_json(&MeshPreferPayload {
        requesting: false,
        accepted: false,
        reject_reason: Some("slots full".to_string()),
    });
    assert!(!reject_back.accepted);
    assert_eq!(reject_back.reject_reason.as_deref(), Some("slots full"));
}
#[test]
fn session_relay_payload_roundtrip() {
    let intro = [55u8; 16];
    let peer = [3u8; 32];
    let wire = serde_json::to_string(&SessionRelayPayload {
        intro_id: intro,
        target: peer,
    })
    .unwrap();
    let back: SessionRelayPayload = serde_json::from_str(&wire).unwrap();
    assert_eq!(back.intro_id, intro);
    assert_eq!(back.target, peer);
}
#[test]
fn circle_profile_update_payload_roundtrip() {
    let original = CircleProfileUpdatePayload {
        author: [1u8; 32],
        circle_name: "friends".to_string(),
        group_id: [2u8; 32],
        epoch: 3,
        encrypted_payload: "base64data==".to_string(),
        wrapped_cek: vec![0u8; 60],
        updated_at: 1700000000000,
    };
    let wire = serde_json::to_string(&original).unwrap();
    let back: CircleProfileUpdatePayload = serde_json::from_str(&wire).unwrap();

    // Identity fields
    assert_eq!(back.author, [1u8; 32]);
    assert_eq!(back.circle_name, "friends");
    assert_eq!(back.group_id, [2u8; 32]);
    assert_eq!(back.epoch, 3);
    // Encrypted content and metadata
    assert_eq!(back.encrypted_payload, "base64data==");
    assert_eq!(back.wrapped_cek.len(), 60);
    assert_eq!(back.updated_at, 1700000000000);
}
#[test]
fn anchor_register_payload_roundtrip() {
    let registered: Vec<String> = ["192.168.1.5:4433", "10.0.0.1:4433"]
        .iter()
        .map(|addr| addr.to_string())
        .collect();
    let original = AnchorRegisterPayload {
        node_id: [1u8; 32],
        addresses: registered,
    };
    let wire = serde_json::to_string(&original).unwrap();
    let back: AnchorRegisterPayload = serde_json::from_str(&wire).unwrap();
    assert_eq!(back.node_id, [1u8; 32]);
    assert_eq!(back.addresses.len(), 2);
    assert_eq!(back.addresses[0], "192.168.1.5:4433");
}
#[test]
fn anchor_referral_request_payload_roundtrip() {
    let original = AnchorReferralRequestPayload {
        requester: [2u8; 32],
        requester_addresses: vec![String::from("10.0.0.2:4433")],
    };
    let wire = serde_json::to_string(&original).unwrap();
    let back: AnchorReferralRequestPayload = serde_json::from_str(&wire).unwrap();
    assert_eq!(back.requester, [2u8; 32]);
    assert_eq!(back.requester_addresses, ["10.0.0.2:4433"]);
}
#[test]
fn anchor_referral_response_payload_roundtrip() {
    // Encode to JSON and decode straight back.
    fn via_json(msg: &AnchorReferralResponsePayload) -> AnchorReferralResponsePayload {
        let wire = serde_json::to_string(msg).unwrap();
        serde_json::from_str(&wire).unwrap()
    }

    let first = AnchorReferral {
        node_id: [3u8; 32],
        addresses: vec!["10.0.0.3:4433".to_string()],
    };
    let second = AnchorReferral {
        node_id: [4u8; 32],
        addresses: vec!["10.0.0.4:4433".to_string(), "192.168.1.4:4433".to_string()],
    };
    let back = via_json(&AnchorReferralResponsePayload {
        referrals: vec![first, second],
    });
    assert_eq!(back.referrals.len(), 2);
    assert_eq!(back.referrals[0].node_id, [3u8; 32]);
    assert_eq!(back.referrals[0].addresses, ["10.0.0.3:4433"]);
    assert_eq!(back.referrals[1].node_id, [4u8; 32]);
    assert_eq!(back.referrals[1].addresses.len(), 2);

    // Empty referrals
    let empty_back = via_json(&AnchorReferralResponsePayload {
        referrals: Vec::new(),
    });
    assert!(empty_back.referrals.is_empty());
}
}