v0.4.3: Lock contention overhaul, StoragePool, mobile bottom nav, text scaling

Eliminate all conn_mgr lock holds during network I/O across 14 actor commands
and bi-stream handlers. PostFetch, TcpPunch, PullFromPeer, FetchEngagement,
ResolveAddress, AnchorProbe use brief locks for data gathering only. WormLookup,
ContentSearch, WormQuery use connection snapshots for lock-free cascade fan-out.
RelayIntroduce extracts forwarding data under brief lock, does I/O outside.
BlobRequest, PostFetchRequest, ManifestRefresh use Arc clones instead of conn_mgr
lock. ConnectionActor hoists shared Arcs (storage, blob_store, endpoint) for
lock-free access. ResolveAddress adds 5s per-query timeout (was unbounded).

Initial exchange failure now aborts mesh upgrade (was silently continuing with
broken connection). connect_to_peer/connect_to_anchor use consistent 15s timeout.
Rebalance connects outside the lock via pending_connects pattern.

StoragePool: 8 concurrent SQLite connections in WAL mode replace single
Mutex<Storage>. Reads run fully parallel; writes serialize at SQLite level only.
PRAGMA busy_timeout=5000 for graceful write contention.

Mobile bottom nav bar (<=768px) with icon tabs. Text sizes: XS/S/M/L/XL
(75%/100%/125%/150%/200%), default M. localStorage persistence for instant
restore. Toast repositioned above mobile nav.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Scott Reimers 2026-03-22 21:35:38 -04:00
parent f17535d61d
commit 43adbbdf7d
15 changed files with 1546 additions and 618 deletions

File diff suppressed because it is too large Load diff

View file

@ -12,7 +12,7 @@ use tokio::sync::Mutex;
use tracing::{debug, info};
use crate::blob::BlobStore;
use crate::storage::Storage;
use crate::storage::{Storage, StoragePool};
use crate::types::PostVisibility;
/// Connection budget: 5 content slots, 15 redirect slots, 1 per IP.
@ -104,7 +104,7 @@ impl HttpBudget {
/// Run the HTTP server on the given port. Blocks forever.
pub async fn run_http_server(
port: u16,
storage: Arc<Mutex<Storage>>,
storage: Arc<StoragePool>,
blob_store: Arc<BlobStore>,
downstream_addrs: Arc<Mutex<HashMap<[u8; 32], Vec<SocketAddr>>>>,
) -> anyhow::Result<()> {
@ -180,7 +180,7 @@ async fn handle_connection(
mut stream: TcpStream,
_ip: IpAddr,
slot: SlotKind,
storage: &Arc<Mutex<Storage>>,
storage: &Arc<StoragePool>,
blob_store: &Arc<BlobStore>,
downstream_addrs: &Arc<Mutex<HashMap<[u8; 32], Vec<SocketAddr>>>>,
) {
@ -281,12 +281,12 @@ fn validate_hex64(s: &str) -> Option<[u8; 32]> {
async fn serve_post(
stream: &mut TcpStream,
post_id: &[u8; 32],
storage: &Arc<Mutex<Storage>>,
storage: &Arc<StoragePool>,
blob_store: &Arc<BlobStore>,
) -> bool {
// Look up post + visibility
let result = {
let store = storage.lock().await;
let store = storage.get().await;
store.get_post_with_visibility(post_id)
};
@ -301,7 +301,7 @@ async fn serve_post(
// Look up author name
let author_name = {
let store = storage.lock().await;
let store = storage.get().await;
store
.get_profile(&post.author)
.ok()
@ -321,12 +321,12 @@ async fn serve_post(
async fn serve_blob(
stream: &mut TcpStream,
blob_id: &[u8; 32],
storage: &Arc<Mutex<Storage>>,
storage: &Arc<StoragePool>,
blob_store: &Arc<BlobStore>,
) -> bool {
// Verify this blob belongs to a public post
let (mime_type, _post_id) = {
let store = storage.lock().await;
let store = storage.get().await;
match find_public_blob_info(&store, blob_id) {
Some(info) => info,
None => return false, // not found or not public — hard close
@ -367,12 +367,12 @@ fn find_public_blob_info(store: &Storage, blob_id: &[u8; 32]) -> Option<(String,
async fn try_redirect(
stream: &mut TcpStream,
post_id: &[u8; 32],
storage: &Arc<Mutex<Storage>>,
storage: &Arc<StoragePool>,
_downstream_addrs: &Arc<Mutex<HashMap<[u8; 32], Vec<SocketAddr>>>>,
) -> bool {
// Get downstream peers for this post
let downstream_peers = {
let store = storage.lock().await;
let store = storage.get().await;
// Verify post exists and is public first
match store.get_post_with_visibility(post_id) {
Ok(Some((_, PostVisibility::Public))) => {}
@ -383,7 +383,7 @@ async fn try_redirect(
// Get addresses for downstream peers
let candidates: Vec<SocketAddr> = {
let store = storage.lock().await;
let store = storage.get().await;
let mut addrs = Vec::new();
for peer_id in &downstream_peers {
if let Ok(Some(peer)) = store.get_peer_record(peer_id) {

View file

@ -18,7 +18,7 @@ use crate::protocol::{
PullSyncRequestPayload, PullSyncResponsePayload, RefuseRedirectPayload,
SocialAddressUpdatePayload, SocialDisconnectNoticePayload, SyncPost, ALPN_V2,
};
use crate::storage::Storage;
use crate::storage::{Storage, StoragePool};
use crate::types::{
DeleteRecord, DeviceProfile, DeviceRole, NodeId, PeerSlotKind, PeerWithAddress, Post, PostId,
PostVisibility, PublicProfile, SessionReachMethod, WormResult,
@ -27,7 +27,7 @@ use crate::types::{
/// The network layer: manages the iroh endpoint and mesh connections
pub struct Network {
endpoint: iroh::Endpoint,
storage: Arc<Mutex<Storage>>,
storage: Arc<StoragePool>,
our_node_id: NodeId,
is_anchor: Arc<AtomicBool>,
conn_mgr: Arc<Mutex<ConnectionManager>>,
@ -79,7 +79,7 @@ pub(crate) fn is_publicly_routable(addr: &SocketAddr) -> bool {
impl Network {
pub async fn new(
secret_key: iroh::SecretKey,
storage: Arc<Mutex<Storage>>,
storage: Arc<StoragePool>,
bind_addr: Option<SocketAddr>,
secret_seed: [u8; 32],
blob_store: Arc<BlobStore>,
@ -469,7 +469,7 @@ impl Network {
// Store peer with their address
{
let storage = this.storage.lock().await;
let storage = this.storage.get().await;
let _ = storage.upsert_peer(&remote_node_id, &[remote_sock], None);
}
@ -512,7 +512,7 @@ impl Network {
async fn handle_incoming_connection(
conn_mgr: Arc<Mutex<ConnectionManager>>,
conn_handle: ConnHandle,
storage: Arc<Mutex<Storage>>,
storage: Arc<StoragePool>,
conn: iroh::endpoint::Connection,
remote_node_id: NodeId,
remote_sock: SocketAddr,
@ -608,7 +608,7 @@ impl Network {
/// Uses ConnHandle for all state access — no direct conn_mgr lock.
async fn try_mesh_upgrade(
conn_handle: &ConnHandle,
storage: &Arc<Mutex<Storage>>,
storage: &Arc<StoragePool>,
conn: &iroh::endpoint::Connection,
remote_node_id: NodeId,
remote_sock: SocketAddr,
@ -638,7 +638,7 @@ impl Network {
}
{
let s = storage.lock().await;
let s = storage.get().await;
let _ = s.upsert_peer(&remote_node_id, &[remote_sock], None);
let _ = s.add_mesh_peer(&remote_node_id, PeerSlotKind::Local, 0);
if s.has_social_route(&remote_node_id).unwrap_or(false) {
@ -663,6 +663,7 @@ impl Network {
}
Err(e) => {
error!(peer = hex::encode(remote_node_id), error = ?e, "Initial exchange failed");
return false;
}
}
@ -692,13 +693,12 @@ impl Network {
// Store addresses so they're available during initial exchange
let addrs: Vec<std::net::SocketAddr> = addr.ip_addrs().copied().collect();
if !addrs.is_empty() {
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
let _ = storage.upsert_peer(&peer_id, &addrs, None);
}
// QUIC connect OUTSIDE the conn_mgr lock — this can take 60+ seconds
// on unreachable peers and must not block other tasks
let conn = self.endpoint.connect(addr, ALPN_V2).await?;
// QUIC connect OUTSIDE the conn_mgr lock with 15s timeout
let conn = ConnectionManager::connect_to_unlocked(&self.endpoint, addr).await?;
// Register the established connection
self.conn_handle.register_connection(peer_id, conn.clone(), addrs, PeerSlotKind::Local).await;
@ -734,7 +734,7 @@ impl Network {
let addrs: Vec<SocketAddr> = redir.a.iter()
.filter_map(|a| a.parse::<SocketAddr>().ok())
.collect();
let s = self.storage.lock().await;
let s = self.storage.get().await;
let _ = s.upsert_peer(&redir_id, &addrs, None);
drop(s);
self.conn_handle.notify_growth();
@ -837,7 +837,7 @@ impl Network {
// Build full state: all current N1 and N2 as "added", nothing removed
let all_n1 = self.conn_handle.connected_peers().await;
let all_n2 = {
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
storage.build_n2_share().unwrap_or_default()
};
@ -891,7 +891,7 @@ impl Network {
}
PostVisibility::GroupEncrypted { group_id, .. } => {
// Push to all group members
match self.storage.lock().await.get_all_group_members() {
match self.storage.get().await.get_all_group_members() {
Ok(map) => map.get(group_id).cloned().unwrap_or_default().into_iter().collect(),
Err(_) => return 0,
}
@ -1041,7 +1041,7 @@ impl Network {
manifest: &crate::types::CdnManifest,
) -> usize {
let downstream = {
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
storage.get_blob_downstream(cid).unwrap_or_default()
};
let payload = crate::protocol::ManifestPushPayload {
@ -1156,7 +1156,7 @@ impl Network {
}
let audience_members: Vec<NodeId> = {
match self.storage.lock().await.list_audience_members() {
match self.storage.get().await.list_audience_members() {
Ok(m) => m,
Err(_) => return 0,
}
@ -1318,7 +1318,7 @@ impl Network {
Ok(()) => Ok(()),
Err(e) if e.to_string().contains("mesh refused") => {
// Anchor refused mesh — reconnect as session for registration
let conn = self.endpoint.connect(addr, ALPN_V2).await?;
let conn = ConnectionManager::connect_to_unlocked(&self.endpoint, addr).await?;
self.conn_handle.add_session(peer_id, conn, crate::types::SessionReachMethod::Direct, None).await;
self.conn_handle.log_activity(
ActivityLevel::Info,
@ -1352,7 +1352,7 @@ impl Network {
Ok(ExchangeResult::Accepted) => {
self.conn_handle.register_connection(peer_id, conn.clone(), vec![], PeerSlotKind::Local).await;
{
let s = self.storage.lock().await;
let s = self.storage.get().await;
let _ = s.add_mesh_peer(&peer_id, PeerSlotKind::Local, 0);
}
@ -1464,7 +1464,7 @@ impl Network {
let addrs: Vec<SocketAddr> = redir.a.iter()
.filter_map(|a| a.parse::<SocketAddr>().ok())
.collect();
let _ = self.storage.lock().await.upsert_peer(&redir_id, &addrs, None);
let _ = self.storage.get().await.upsert_peer(&redir_id, &addrs, None);
}
}
}
@ -1568,7 +1568,7 @@ impl Network {
} else {
// Network resolution: get reporter connections, resolve outside lock
let reporters_and_conns = {
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
let n2 = storage.find_in_n2(&candidate_id).unwrap_or_default();
let n3 = storage.find_in_n3(&candidate_id).unwrap_or_default();
drop(storage);
@ -1660,7 +1660,7 @@ impl Network {
// Find N2 reporter(s) who told us about this peer — they can introduce us
let reporters = {
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
storage.find_in_n2(&candidate_id).unwrap_or_default()
};
@ -1724,7 +1724,7 @@ impl Network {
/// Send a uni-stream message to all audience members (persistent if available, ephemeral otherwise).
async fn send_to_audience<T: Serialize>(&self, msg_type: MessageType, payload: &T) -> usize {
let audience: Vec<NodeId> = match self.storage.lock().await.list_audience_members() {
let audience: Vec<NodeId> = match self.storage.get().await.list_audience_members() {
Ok(m) => m,
Err(_) => return 0,
};
@ -1741,7 +1741,7 @@ impl Network {
pub async fn pull_from_peer(&self, peer_id: &NodeId) -> anyhow::Result<PullStats> {
let conn = self.get_connection(peer_id).await?;
let (our_follows, follows_sync) = {
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
(
storage.list_follows()?,
storage.get_follows_with_last_sync().unwrap_or_default(),
@ -1768,7 +1768,7 @@ impl Network {
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
let mut posts_received = 0;
let mut vis_updates = 0;
for sp in &response.posts {
@ -1968,7 +1968,7 @@ impl Network {
Ok(Ok(result)) if result.accepted => {
let our_profile = self.conn_handle.our_nat_profile().await;
let peer_profile = {
let s = self.storage.lock().await;
let s = self.storage.get().await;
s.get_peer_nat_profile(peer_id)
};
if let Some(conn) = crate::connection::hole_punch_with_scanning(&self.endpoint, peer_id, &result.target_addresses, our_profile, peer_profile).await {
@ -2010,7 +2010,7 @@ impl Network {
pub async fn addr_from_storage(&self, peer_id: &NodeId) -> Option<iroh::EndpointAddr> {
let endpoint_id = iroh::EndpointId::from_bytes(peer_id).ok()?;
let mut addr = iroh::EndpointAddr::from(endpoint_id);
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
if let Ok(Some(rec)) = storage.get_peer_record(peer_id) {
for sock in &rec.addresses {
addr = addr.with_ip_addr(*sock);
@ -2158,7 +2158,7 @@ impl Network {
/// Check if a peer is a known anchor.
pub async fn is_anchor_peer(&self, node_id: &NodeId) -> bool {
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
storage.is_peer_anchor(node_id).unwrap_or(false)
}
@ -2222,7 +2222,7 @@ impl Network {
let our_profile = self.conn_handle.our_nat_profile().await;
let peer_profile = {
let s = self.storage.lock().await;
let s = self.storage.get().await;
s.get_peer_nat_profile(&target)
};
@ -2301,7 +2301,7 @@ impl Network {
exclude_peer: &crate::types::NodeId,
) -> usize {
let downstream = {
let storage = self.storage.lock().await;
let storage = self.storage.get().await;
storage.get_post_downstream(post_id).unwrap_or_default()
};
let mut sent = 0;

File diff suppressed because it is too large Load diff

View file

@ -287,7 +287,7 @@ pub struct RefuseRedirectPayload {
}
/// Worm lookup query (bi-stream) — searches for nodes, posts, or blobs
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WormQueryPayload {
pub worm_id: WormId,
pub target: NodeId,

View file

@ -30,6 +30,44 @@ pub struct Storage {
conn: Connection,
}
/// Pool of Storage connections for concurrent SQLite access in WAL mode.
/// Each connection is independently locked — readers don't block each other.
/// Uses tokio::sync::Mutex so guards are Send (safe across .await points).
pub struct StoragePool {
    slots: Vec<tokio::sync::Mutex<Storage>>,
    // Rotating start index for `get()`, so the try_lock scan and — more
    // importantly — the all-busy fallback wait spread across every slot
    // instead of all queueing on slot 0.
    next: std::sync::atomic::AtomicUsize,
}

// Number of pooled SQLite connections. Reads run fully parallel across
// these; writes still serialize inside SQLite itself (WAL single-writer).
const STORAGE_POOL_SIZE: usize = 8;

impl StoragePool {
    /// Create a pool of Storage connections to the same database.
    ///
    /// The first connection goes through `Storage::open`, which performs
    /// schema init + migration (NOTE(review): presumably it also sets its
    /// own WAL/busy_timeout pragmas — confirm in `Storage::open`). The
    /// remaining connections only enable WAL mode and a 5s busy timeout,
    /// since the schema already exists.
    pub fn open(path: impl AsRef<std::path::Path>) -> anyhow::Result<Self> {
        let path = path.as_ref();
        let mut slots = Vec::with_capacity(STORAGE_POOL_SIZE);

        // First connection does schema init + migration.
        slots.push(tokio::sync::Mutex::new(Storage::open(path)?));

        // Additional connections just open + set pragmas (schema already exists).
        for _ in 1..STORAGE_POOL_SIZE {
            let conn = Connection::open(path)?;
            conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
            slots.push(tokio::sync::Mutex::new(Storage { conn }));
        }

        Ok(Self {
            slots,
            next: std::sync::atomic::AtomicUsize::new(0),
        })
    }

    /// Get an available Storage connection.
    ///
    /// Scans the slots with `try_lock`, starting from a rotating offset; if
    /// every slot is busy (rare under normal load), awaits the slot at the
    /// offset. Rotating the start index prevents the fallback from always
    /// parking on the same slot, which would make slot 0 a contention
    /// hotspot while other slots free up unnoticed.
    pub async fn get(&self) -> tokio::sync::MutexGuard<'_, Storage> {
        use std::sync::atomic::Ordering;

        let n = self.slots.len();
        // Relaxed is fine: the counter only distributes load; it guards
        // no other memory and its exact value never matters.
        let start = self.next.fetch_add(1, Ordering::Relaxed) % n;

        for i in 0..n {
            if let Ok(guard) = self.slots[(start + i) % n].try_lock() {
                return guard;
            }
        }
        // All busy — await the rotated slot so waiters fan out evenly.
        self.slots[start].lock().await
    }
}
/// Current schema version. Bump this when making schema or data changes
/// that require migration. Old databases with a lower version will be migrated.
/// If the gap is too large (major version mismatch), the DB is reset instead.

View file

@ -126,7 +126,7 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
// Single lock: gather holders, local post, AND author name if local
let (holders, local_post, local_author_name) = {
let store = node.storage.lock().await;
let store = node.storage.get().await;
let mut holders = Vec::new();
if let Some(author) = author_id {
@ -190,7 +190,7 @@ async fn serve_post(stream: &mut TcpStream, path: &str, node: &Arc<Node>, browse
Ok(Ok(Some(sync_post))) => {
// Single lock: store post AND get author name
let author_name = {
let store = node.storage.lock().await;
let store = node.storage.get().await;
let _ = store.store_post_with_visibility(
&sync_post.id, &sync_post.post, &sync_post.visibility,
);
@ -230,7 +230,7 @@ async fn try_redirect(
use crate::types::NatMapping;
let post_hex = hex::encode(post_id);
let store = node.storage.lock().await;
let store = node.storage.get().await;
// Classify holders into tiers
let mut direct_candidates: Vec<(NodeId, String)> = Vec::new(); // http_addr known
@ -354,7 +354,7 @@ async fn serve_blob(stream: &mut TcpStream, path: &str, node: &Arc<Node>) {
// Check blobs table first, then scan post attachments (for posts stored via PostFetch
// which don't populate the blobs table).
let (mime_type, author_id) = {
let store = node.storage.lock().await;
let store = node.storage.get().await;
// Try blobs table first
if let Some(mime) = find_public_blob_mime(&store, &blob_id) {
let author = store.get_blob_post_id(&blob_id).ok().flatten().and_then(|pid| {