v0.4.2: Welcome screen, status ticker, notifications, text scaling, networking fixes
Welcome screen with staggered counters while backend bootstraps. Header status ticker for new posts/messages/reactions/comments/connection changes. Notification fallback chain (Tauri plugin → Web API → notify-rust). Responsive text scaling (Small/Normal/Large, persisted). Diagnostics moved to popover with on-demand connections. Share details lightbox with QR code. Connect string prefers external address. Stale N1 fix (disconnected routes excluded). Replication handler actively fetches posts+blobs from requester. Hole punch registers remote address for relay. Replication semaphore (3 concurrent). Peer labels show truncated node ID. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
79922a9208
commit
6004cae8a8
10 changed files with 446 additions and 95 deletions
|
|
@ -3818,13 +3818,16 @@ impl ConnectionManager {
|
|||
};
|
||||
tokio::spawn(async move {
|
||||
if let Some(conn) = hole_punch_with_scanning(&endpoint, &requester, &requester_addrs, our_nat_profile, peer_nat_profile).await {
|
||||
// Register as session so the connection is actually used
|
||||
// Register as session with the peer's address for relay introduction
|
||||
let remote_sock = requester_addrs.iter()
|
||||
.filter_map(|a| a.parse::<std::net::SocketAddr>().ok())
|
||||
.find(|s| crate::network::is_publicly_routable(s));
|
||||
let mut cm = conn_mgr_arc.lock().await;
|
||||
if cm.is_connected(&requester) {
|
||||
// Initiator already connected to us (their punch succeeded first)
|
||||
return;
|
||||
}
|
||||
cm.add_session(requester, conn, SessionReachMethod::HolePunch, None);
|
||||
cm.add_session(requester, conn, SessionReachMethod::HolePunch, remote_sock);
|
||||
cm.mark_reachable(&requester);
|
||||
cm.log_activity(
|
||||
ActivityLevel::Info,
|
||||
|
|
@ -5664,6 +5667,12 @@ impl ConnectionManager {
|
|||
write_typed_message(&mut send, MessageType::BlobHeaderResponse, &response).await?;
|
||||
}
|
||||
MessageType::ReplicationRequest => {
|
||||
// Limit to 3 concurrent replication handlers to prevent overload
|
||||
static REPLICATION_SEMAPHORE: std::sync::LazyLock<tokio::sync::Semaphore> =
|
||||
std::sync::LazyLock::new(|| tokio::sync::Semaphore::new(3));
|
||||
let _permit = REPLICATION_SEMAPHORE.acquire().await
|
||||
.map_err(|_| anyhow::anyhow!("replication semaphore closed"))?;
|
||||
|
||||
let payload: ReplicationRequestPayload = read_payload(&mut recv, MAX_PAYLOAD).await?;
|
||||
let (accepted, rejected, needs_pull) = {
|
||||
let cm = conn_mgr.lock().await;
|
||||
|
|
@ -5711,9 +5720,84 @@ impl ConnectionManager {
|
|||
needs_pull = needs_pull_count,
|
||||
"Handled replication request"
|
||||
);
|
||||
// Posts we accepted but don't have will be fetched on the next pull cycle
|
||||
// from the requester (they have these posts since they asked us to hold them).
|
||||
// No explicit pull spawn needed — the periodic pull cycle handles it.
|
||||
// Actively fetch posts we accepted but don't have from the requester
|
||||
if !needs_pull.is_empty() {
|
||||
let cm_arc = conn_mgr.clone();
|
||||
let sender = remote_node_id;
|
||||
tokio::spawn(async move {
|
||||
let conn = {
|
||||
let cm = cm_arc.lock().await;
|
||||
cm.connections_ref().get(&sender).map(|pc| pc.connection.clone())
|
||||
.or_else(|| cm.sessions.get(&sender).map(|sc| sc.connection.clone()))
|
||||
};
|
||||
let Some(conn) = conn else { return };
|
||||
let mut fetched = 0usize;
|
||||
for post_id in &needs_pull {
|
||||
// PostFetch without holding any lock
|
||||
let result: anyhow::Result<Option<crate::protocol::SyncPost>> = async {
|
||||
let (mut send, mut recv) = conn.open_bi().await?;
|
||||
let req = crate::protocol::PostFetchRequestPayload { post_id: *post_id };
|
||||
write_typed_message(&mut send, MessageType::PostFetchRequest, &req).await?;
|
||||
send.finish()?;
|
||||
let msg_type = read_message_type(&mut recv).await?;
|
||||
if msg_type != MessageType::PostFetchResponse {
|
||||
return Ok(None);
|
||||
}
|
||||
let resp: crate::protocol::PostFetchResponsePayload = read_payload(&mut recv, MAX_PAYLOAD).await?;
|
||||
Ok(resp.post)
|
||||
}.await;
|
||||
|
||||
if let Ok(Some(sp)) = result {
|
||||
if crate::content::verify_post_id(&sp.id, &sp.post) {
|
||||
let attachments = sp.post.attachments.clone();
|
||||
let post_author = sp.post.author;
|
||||
let cm = cm_arc.lock().await;
|
||||
let storage = cm.storage.lock().await;
|
||||
let _ = storage.store_post_with_visibility(&sp.id, &sp.post, &sp.visibility);
|
||||
let prio = storage.get_post_upstreams(&sp.id).map(|v| v.len() as u8).unwrap_or(0);
|
||||
let _ = storage.add_post_upstream(&sp.id, &sender, prio);
|
||||
let blob_store = cm.blob_store.clone();
|
||||
drop(storage);
|
||||
drop(cm);
|
||||
fetched += 1;
|
||||
|
||||
// Fetch blobs for this post from the requester
|
||||
for att in &attachments {
|
||||
if blob_store.has(&att.cid) { continue; }
|
||||
let blob_result: anyhow::Result<()> = async {
|
||||
let (mut bs, mut br) = conn.open_bi().await?;
|
||||
let req = BlobRequestPayload {
|
||||
cid: att.cid,
|
||||
requester_addresses: vec![],
|
||||
};
|
||||
write_typed_message(&mut bs, MessageType::BlobRequest, &req).await?;
|
||||
bs.finish()?;
|
||||
let mt = read_message_type(&mut br).await?;
|
||||
if mt != MessageType::BlobResponse { return Ok(()); }
|
||||
let resp: BlobResponsePayload = read_payload(&mut br, MAX_PAYLOAD).await?;
|
||||
if resp.found {
|
||||
use base64::Engine;
|
||||
let data = base64::engine::general_purpose::STANDARD.decode(resp.data_b64.as_bytes())?;
|
||||
blob_store.store(&att.cid, &data)?;
|
||||
let cm = cm_arc.lock().await;
|
||||
let storage = cm.storage.lock().await;
|
||||
let _ = storage.record_blob(&att.cid, post_id, &post_author, data.len() as u64, &att.mime_type, att.size_bytes);
|
||||
let _ = storage.add_post_upstream(&att.cid, &sender, 0);
|
||||
}
|
||||
Ok(())
|
||||
}.await;
|
||||
if let Err(e) = blob_result {
|
||||
debug!(cid = hex::encode(att.cid), error = %e, "Replication blob fetch failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if fetched > 0 {
|
||||
debug!(fetched, peer = hex::encode(sender), "Fetched replicated posts from requester");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
other => {
|
||||
warn!(msg_type = ?other, "Unexpected message type on bi-stream");
|
||||
|
|
@ -7189,7 +7273,9 @@ impl ConnectionActor {
|
|||
}
|
||||
if let Ok(routes) = storage.list_social_routes() {
|
||||
for route in &routes {
|
||||
set.insert(route.node_id);
|
||||
if route.status == crate::types::SocialStatus::Online {
|
||||
set.insert(route.node_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
for nid in &sticky_peers {
|
||||
|
|
|
|||
|
|
@ -2883,10 +2883,12 @@ impl Storage {
|
|||
for (nid, _, _) in mesh_peers {
|
||||
ids.insert(nid);
|
||||
}
|
||||
// Add social routes
|
||||
// Add only ONLINE social routes (not disconnected)
|
||||
let routes = self.list_social_routes()?;
|
||||
for route in routes {
|
||||
ids.insert(route.node_id);
|
||||
if route.status == crate::types::SocialStatus::Online {
|
||||
ids.insert(route.node_id);
|
||||
}
|
||||
}
|
||||
Ok(ids.into_iter().collect())
|
||||
}
|
||||
|
|
@ -4870,9 +4872,16 @@ mod tests {
|
|||
preferred_tree: vec![],
|
||||
}).unwrap();
|
||||
|
||||
// Disconnected routes should NOT be in N1 share
|
||||
let n1 = s.build_n1_share().unwrap();
|
||||
assert!(n1.contains(&peer_a));
|
||||
assert!(n1.contains(&follow_b));
|
||||
assert!(!n1.contains(&follow_b), "Disconnected social route should not be in N1");
|
||||
|
||||
// Set to Online — now it should be included
|
||||
s.set_social_route_status(&follow_b, SocialStatus::Online).unwrap();
|
||||
let n1 = s.build_n1_share().unwrap();
|
||||
assert!(n1.contains(&peer_a));
|
||||
assert!(n1.contains(&follow_b), "Online social route should be in N1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue