Feed pagination, duplicate identity detection, pkarr leak fix, Android SAF
Feed pagination: - Cursor-based pagination: get_feed_page/get_all_posts_page (20 posts/page) - Batched engagement queries (3 bulk SQL queries instead of 4 per post) - IntersectionObserver for infinite scroll (sentinel at midpoint) - Viewport-based media loading (blobs only load when post enters view) - Pre-fetch next page immediately after current page renders Duplicate identity detection: - Anchor detects when a NodeId is already mesh-connected during initial exchange and sets duplicate_active flag in response - Client skips sync tasks when duplicate detected - Frontend shows red warning banner Privacy: - Fixed pkarr leak: clear_address_lookup() removes default dns.iroh.link publishing. Only mDNS (local network) discovery enabled. Android: - SAF integration via tauri-plugin-android-fs: exports open native "Save As" dialog so users can save to Downloads/Drive/etc. - Download/export paths use app data dir on Android (writable) - File picker gated behind desktop cfg (blocking_pick not on Android) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
5e7eed9638
commit
288b53ffb1
12 changed files with 910 additions and 120 deletions
|
|
@ -74,7 +74,7 @@ pub type IntroId = [u8; 16];
|
|||
|
||||
/// Result of initial exchange: accepted or refused with optional redirect peer.
|
||||
pub enum ExchangeResult {
|
||||
Accepted,
|
||||
Accepted { duplicate_active: bool },
|
||||
Refused { redirect: Option<PeerWithAddress> },
|
||||
}
|
||||
|
||||
|
|
@ -1519,6 +1519,7 @@ impl ConnectionManager {
|
|||
http_addr: self.http_addr.clone(),
|
||||
device_role: None,
|
||||
cache_pressure: None,
|
||||
duplicate_active: None,
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -1658,6 +1659,7 @@ impl ConnectionManager {
|
|||
http_addr: self.http_addr.clone(),
|
||||
device_role: None,
|
||||
cache_pressure: None,
|
||||
duplicate_active: None,
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -3937,7 +3939,7 @@ impl ConnectionManager {
|
|||
let session_conn = session.connection.clone();
|
||||
drop(cm); // release lock before async work
|
||||
match initial_exchange_connect(&storage_clone, &our_node_id, &session_conn, requester, None, our_nat_type, our_http_capable, our_http_addr.clone(), None, None).await {
|
||||
Ok(ExchangeResult::Accepted) => {
|
||||
Ok(ExchangeResult::Accepted { .. }) => {
|
||||
tracing::info!(peer = hex::encode(requester), "Target-side: initial exchange after hole punch");
|
||||
}
|
||||
Ok(ExchangeResult::Refused { .. }) => {
|
||||
|
|
@ -5474,11 +5476,13 @@ impl ConnectionManager {
|
|||
ConnectionManager::handle_pull_request_unlocked(&storage, our_node_id, remote_node_id, recv, send).await?;
|
||||
}
|
||||
MessageType::InitialExchange => {
|
||||
let (storage, our_node_id, anchor_addr, our_nat_type, our_http_capable, our_http_addr) = {
|
||||
let (storage, our_node_id, anchor_addr, our_nat_type, our_http_capable, our_http_addr, is_duplicate) = {
|
||||
let cm = conn_mgr.lock().await;
|
||||
(cm.storage_ref(), *cm.our_node_id(), cm.build_anchor_advertised_addr(), cm.nat_type(), cm.http_capable, cm.http_addr.clone())
|
||||
// Duplicate identity detection: is this NodeId already mesh-connected?
|
||||
let dup = cm.connections.contains_key(&remote_node_id);
|
||||
(cm.storage_ref(), *cm.our_node_id(), cm.build_anchor_advertised_addr(), cm.nat_type(), cm.http_capable, cm.http_addr.clone(), dup)
|
||||
};
|
||||
initial_exchange_accept(&storage, &our_node_id, send, recv, remote_node_id, anchor_addr, None, our_nat_type, our_http_capable, our_http_addr, None, None)
|
||||
initial_exchange_accept(&storage, &our_node_id, send, recv, remote_node_id, anchor_addr, None, our_nat_type, our_http_capable, our_http_addr, None, None, is_duplicate)
|
||||
.await?;
|
||||
}
|
||||
MessageType::AddressRequest => {
|
||||
|
|
@ -8331,6 +8335,7 @@ pub async fn initial_exchange_connect(
|
|||
http_addr: our_http_addr,
|
||||
device_role: our_device_role.map(|r| r.as_str().to_string()),
|
||||
cache_pressure: our_cache_pressure,
|
||||
duplicate_active: None,
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -8349,8 +8354,12 @@ pub async fn initial_exchange_connect(
|
|||
anyhow::bail!("expected InitialExchange, got {:?}", msg_type);
|
||||
}
|
||||
let their_payload: InitialExchangePayload = read_payload(&mut recv, MAX_PAYLOAD).await?;
|
||||
let dup = their_payload.duplicate_active.unwrap_or(false);
|
||||
if dup {
|
||||
tracing::warn!(peer = hex::encode(remote_node_id), "Anchor reports duplicate identity active on network");
|
||||
}
|
||||
process_exchange_payload(storage, our_node_id, &remote_node_id, &their_payload).await?;
|
||||
Ok(ExchangeResult::Accepted)
|
||||
Ok(ExchangeResult::Accepted { duplicate_active: dup })
|
||||
};
|
||||
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(10), exchange_fut).await {
|
||||
|
|
@ -8377,6 +8386,7 @@ pub async fn initial_exchange_accept(
|
|||
our_http_addr: Option<String>,
|
||||
our_device_role: Option<crate::types::DeviceRole>,
|
||||
our_cache_pressure: Option<u8>,
|
||||
duplicate_detected: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let their_payload: InitialExchangePayload = read_payload(&mut recv, MAX_PAYLOAD).await?;
|
||||
|
||||
|
|
@ -8404,9 +8414,14 @@ pub async fn initial_exchange_accept(
|
|||
http_addr: our_http_addr,
|
||||
device_role: our_device_role.map(|r| r.as_str().to_string()),
|
||||
cache_pressure: our_cache_pressure,
|
||||
duplicate_active: if duplicate_detected { Some(true) } else { None },
|
||||
}
|
||||
};
|
||||
|
||||
if duplicate_detected {
|
||||
tracing::warn!(peer = hex::encode(remote_node_id), "Duplicate identity detected — notifying connecting node");
|
||||
}
|
||||
|
||||
write_typed_message(&mut send, MessageType::InitialExchange, &our_payload).await?;
|
||||
send.finish()?;
|
||||
|
||||
|
|
|
|||
|
|
@ -46,6 +46,8 @@ pub struct Network {
|
|||
bind_addr: Option<SocketAddr>,
|
||||
/// CDN replication role: determines budget limits and pull ordering
|
||||
device_role: DeviceRole,
|
||||
/// True if an anchor reported this identity is already connected from elsewhere
|
||||
pub duplicate_detected: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
fn is_public_ip(ip: IpAddr) -> bool {
|
||||
|
|
@ -100,9 +102,10 @@ impl Network {
|
|||
let mut builder = iroh::Endpoint::builder()
|
||||
.secret_key(secret_key)
|
||||
.relay_mode(iroh::RelayMode::Disabled)
|
||||
.alpns(vec![ALPN_V2.to_vec()]);
|
||||
.alpns(vec![ALPN_V2.to_vec()])
|
||||
.clear_address_lookup(); // Remove default pkarr + DNS (no dns.iroh.link publishing)
|
||||
|
||||
// mDNS LAN discovery: enables automatic peer discovery on local network
|
||||
// mDNS LAN discovery only: enables automatic peer discovery on local network
|
||||
builder = builder.address_lookup(
|
||||
iroh::address_lookup::MdnsAddressLookupBuilder::default(),
|
||||
);
|
||||
|
|
@ -271,6 +274,7 @@ impl Network {
|
|||
has_public_v6,
|
||||
bind_addr,
|
||||
device_role,
|
||||
duplicate_detected: Arc::new(AtomicBool::new(false)),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -672,7 +676,7 @@ impl Network {
|
|||
let our_nat_type = conn_handle.nat_type().await;
|
||||
let our_http_capable = conn_handle.is_http_capable();
|
||||
let our_http_addr = conn_handle.http_addr();
|
||||
match initial_exchange_accept(storage, &our_node_id, send, recv, remote_node_id, anchor_addr, Some(remote_sock), our_nat_type, our_http_capable, our_http_addr, conn_handle.device_role(), None).await {
|
||||
match initial_exchange_accept(storage, &our_node_id, send, recv, remote_node_id, anchor_addr, Some(remote_sock), our_nat_type, our_http_capable, our_http_addr, conn_handle.device_role(), None, false).await {
|
||||
Ok(()) => {
|
||||
info!(peer = hex::encode(remote_node_id), "Initial exchange complete (upgraded to mesh)");
|
||||
conn_handle.log_activity(ActivityLevel::Info, ActivityCategory::Connection, format!("Upgraded {} to mesh", &hex::encode(remote_node_id)[..8]), Some(remote_node_id));
|
||||
|
|
@ -723,7 +727,12 @@ impl Network {
|
|||
|
||||
// Initial exchange WITHOUT holding conn_mgr lock
|
||||
match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, peer_id, anchor_addr, our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await? {
|
||||
ExchangeResult::Accepted => {
|
||||
ExchangeResult::Accepted { duplicate_active } => {
|
||||
if duplicate_active {
|
||||
self.duplicate_detected.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
warn!(peer = hex::encode(peer_id), "Duplicate identity detected by anchor — this identity is active elsewhere");
|
||||
self.log_activity(ActivityLevel::Warn, ActivityCategory::Connection, "Duplicate identity active on network".into(), None);
|
||||
}
|
||||
// Spawn the per-connection stream loop
|
||||
let conn_data = self.conn_handle.get_connection_map().await;
|
||||
if let Some((_, conn, _, last_activity)) = conn_data.into_iter().find(|(nid, _, _, _)| *nid == peer_id) {
|
||||
|
|
@ -1365,7 +1374,7 @@ impl Network {
|
|||
let our_nat_type = self.conn_handle.nat_type().await;
|
||||
|
||||
match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, peer_id, anchor_addr, our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await {
|
||||
Ok(ExchangeResult::Accepted) => {
|
||||
Ok(ExchangeResult::Accepted { .. }) => {
|
||||
self.conn_handle.register_connection(peer_id, conn.clone(), vec![], PeerSlotKind::Local).await;
|
||||
{
|
||||
let s = self.storage.get().await;
|
||||
|
|
@ -1470,7 +1479,7 @@ impl Network {
|
|||
let conn = self.conn_handle.get_connection(peer_id).await;
|
||||
if let Some(conn) = conn {
|
||||
match initial_exchange_connect(&self.storage, &self.our_node_id, &conn, *peer_id, anchor_addr.clone(), our_nat_type, self.is_http_capable(), self.http_addr(), Some(self.device_role), None).await {
|
||||
Ok(ExchangeResult::Accepted) => {}
|
||||
Ok(ExchangeResult::Accepted { .. }) => {}
|
||||
Ok(ExchangeResult::Refused { redirect }) => {
|
||||
debug!(peer = hex::encode(peer_id), "Auto-connect refused, disconnecting");
|
||||
self.conn_handle.disconnect_peer(peer_id).await;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tracing::{debug, info, warn};
|
||||
|
|
@ -30,6 +30,8 @@ pub struct Node {
|
|||
pub blob_store: Arc<BlobStore>,
|
||||
secret_seed: [u8; 32],
|
||||
bootstrap_anchors: tokio::sync::Mutex<Vec<(NodeId, iroh::EndpointAddr)>>,
|
||||
/// True if an anchor reported another instance of this identity is already active
|
||||
pub duplicate_detected: Arc<AtomicBool>,
|
||||
#[allow(dead_code)]
|
||||
profile: DeviceProfile,
|
||||
pub activity_log: Arc<std::sync::Mutex<ActivityLog>>,
|
||||
|
|
@ -136,6 +138,7 @@ impl Node {
|
|||
blob_store,
|
||||
secret_seed,
|
||||
bootstrap_anchors: tokio::sync::Mutex::new(Vec::new()),
|
||||
duplicate_detected: Arc::new(AtomicBool::new(false)),
|
||||
profile,
|
||||
activity_log: activity_log_ref,
|
||||
last_rebalance_ms,
|
||||
|
|
@ -927,6 +930,34 @@ impl Node {
|
|||
Ok(self.decrypt_posts(raw, &group_seeds))
|
||||
}
|
||||
|
||||
pub async fn get_feed_page(
|
||||
&self,
|
||||
before_ms: Option<u64>,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<Vec<(PostId, Post, PostVisibility, Option<String>)>> {
|
||||
let (raw, group_seeds) = {
|
||||
let storage = self.storage.get().await;
|
||||
let posts = storage.get_feed_page(before_ms, limit)?;
|
||||
let seeds = storage.get_all_group_seeds_map().unwrap_or_default();
|
||||
(posts, seeds)
|
||||
};
|
||||
Ok(self.decrypt_posts(raw, &group_seeds))
|
||||
}
|
||||
|
||||
pub async fn get_all_posts_page(
|
||||
&self,
|
||||
before_ms: Option<u64>,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<Vec<(PostId, Post, PostVisibility, Option<String>)>> {
|
||||
let (raw, group_seeds) = {
|
||||
let storage = self.storage.get().await;
|
||||
let posts = storage.list_posts_page(before_ms, limit)?;
|
||||
let seeds = storage.get_all_group_seeds_map().unwrap_or_default();
|
||||
(posts, seeds)
|
||||
};
|
||||
Ok(self.decrypt_posts(raw, &group_seeds))
|
||||
}
|
||||
|
||||
fn decrypt_posts(
|
||||
&self,
|
||||
posts: Vec<(PostId, Post, PostVisibility)>,
|
||||
|
|
|
|||
|
|
@ -184,6 +184,9 @@ pub struct InitialExchangePayload {
|
|||
/// CDN cache pressure: 0-255 availability score (255 = lots of capacity)
|
||||
#[serde(default)]
|
||||
pub cache_pressure: Option<u8>,
|
||||
/// Set by anchor when it detects this NodeId is already connected from a different address
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub duplicate_active: Option<bool>,
|
||||
}
|
||||
|
||||
/// Incremental N1/N2 changes
|
||||
|
|
|
|||
|
|
@ -892,6 +892,158 @@ impl Storage {
|
|||
Ok(posts)
|
||||
}
|
||||
|
||||
/// Feed: paginated — posts from followed users, cursor-based by timestamp
|
||||
pub fn get_feed_page(&self, before_ms: Option<u64>, limit: usize) -> anyhow::Result<Vec<(PostId, Post, PostVisibility)>> {
|
||||
let sql = if before_ms.is_some() {
|
||||
"SELECT p.id, p.author, p.content, p.attachments, p.timestamp_ms, p.visibility
|
||||
FROM posts p INNER JOIN follows f ON p.author = f.node_id
|
||||
WHERE p.timestamp_ms < ?1
|
||||
ORDER BY p.timestamp_ms DESC LIMIT ?2"
|
||||
} else {
|
||||
"SELECT p.id, p.author, p.content, p.attachments, p.timestamp_ms, p.visibility
|
||||
FROM posts p INNER JOIN follows f ON p.author = f.node_id
|
||||
ORDER BY p.timestamp_ms DESC LIMIT ?2"
|
||||
};
|
||||
let mut stmt = self.conn.prepare(sql)?;
|
||||
let rows = if let Some(bms) = before_ms {
|
||||
stmt.query_map(rusqlite::params![bms as i64, limit as i64], Self::parse_post_row)?
|
||||
} else {
|
||||
stmt.query_map(rusqlite::params![i64::MAX, limit as i64], Self::parse_post_row)?
|
||||
};
|
||||
Self::collect_posts(rows)
|
||||
}
|
||||
|
||||
/// All posts: paginated — cursor-based by timestamp
|
||||
pub fn list_posts_page(&self, before_ms: Option<u64>, limit: usize) -> anyhow::Result<Vec<(PostId, Post, PostVisibility)>> {
|
||||
let sql = if before_ms.is_some() {
|
||||
"SELECT id, author, content, attachments, timestamp_ms, visibility
|
||||
FROM posts WHERE timestamp_ms < ?1
|
||||
ORDER BY timestamp_ms DESC LIMIT ?2"
|
||||
} else {
|
||||
"SELECT id, author, content, attachments, timestamp_ms, visibility
|
||||
FROM posts ORDER BY timestamp_ms DESC LIMIT ?2"
|
||||
};
|
||||
let mut stmt = self.conn.prepare(sql)?;
|
||||
let rows = if let Some(bms) = before_ms {
|
||||
stmt.query_map(rusqlite::params![bms as i64, limit as i64], Self::parse_post_row)?
|
||||
} else {
|
||||
stmt.query_map(rusqlite::params![i64::MAX, limit as i64], Self::parse_post_row)?
|
||||
};
|
||||
Self::collect_posts(rows)
|
||||
}
|
||||
|
||||
/// Batch: reaction counts for multiple posts at once
|
||||
pub fn get_reaction_counts_batch(&self, post_ids: &[PostId], our_node_id: &NodeId) -> anyhow::Result<std::collections::HashMap<PostId, Vec<(String, u64, bool)>>> {
|
||||
use std::collections::HashMap;
|
||||
let mut result: HashMap<PostId, Vec<(String, u64, bool)>> = HashMap::new();
|
||||
if post_ids.is_empty() { return Ok(result); }
|
||||
let placeholders: String = (0..post_ids.len()).map(|i| format!("?{}", i + 1)).collect::<Vec<_>>().join(",");
|
||||
let sql = format!(
|
||||
"SELECT post_id, emoji, COUNT(*) as cnt, SUM(CASE WHEN reactor = ?{} THEN 1 ELSE 0 END) as my_count
|
||||
FROM reactions WHERE post_id IN ({}) AND deleted_at IS NULL
|
||||
GROUP BY post_id, emoji ORDER BY cnt DESC",
|
||||
post_ids.len() + 1, placeholders
|
||||
);
|
||||
let mut stmt = self.conn.prepare(&sql)?;
|
||||
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = post_ids.iter().map(|id| Box::new(id.to_vec()) as Box<dyn rusqlite::types::ToSql>).collect();
|
||||
params.push(Box::new(our_node_id.to_vec()));
|
||||
let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
|
||||
let rows = stmt.query_map(param_refs.as_slice(), |row| {
|
||||
let pid: Vec<u8> = row.get(0)?;
|
||||
let emoji: String = row.get(1)?;
|
||||
let count: i64 = row.get(2)?;
|
||||
let my_count: i64 = row.get(3)?;
|
||||
Ok((pid, emoji, count as u64, my_count > 0))
|
||||
})?;
|
||||
for row in rows {
|
||||
let (pid, emoji, count, reacted_by_me) = row?;
|
||||
if let Ok(id) = blob_to_postid(pid) {
|
||||
result.entry(id).or_default().push((emoji, count, reacted_by_me));
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Batch: comment counts for multiple posts at once
|
||||
pub fn get_comment_counts_batch(&self, post_ids: &[PostId]) -> anyhow::Result<std::collections::HashMap<PostId, u64>> {
|
||||
use std::collections::HashMap;
|
||||
let mut result: HashMap<PostId, u64> = HashMap::new();
|
||||
if post_ids.is_empty() { return Ok(result); }
|
||||
let placeholders: String = (0..post_ids.len()).map(|i| format!("?{}", i + 1)).collect::<Vec<_>>().join(",");
|
||||
let sql = format!(
|
||||
"SELECT post_id, COUNT(*) FROM comments WHERE post_id IN ({}) AND deleted_at IS NULL GROUP BY post_id",
|
||||
placeholders
|
||||
);
|
||||
let mut stmt = self.conn.prepare(&sql)?;
|
||||
let params: Vec<Box<dyn rusqlite::types::ToSql>> = post_ids.iter().map(|id| Box::new(id.to_vec()) as Box<dyn rusqlite::types::ToSql>).collect();
|
||||
let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
|
||||
let rows = stmt.query_map(param_refs.as_slice(), |row| {
|
||||
let pid: Vec<u8> = row.get(0)?;
|
||||
let count: i64 = row.get(1)?;
|
||||
Ok((pid, count as u64))
|
||||
})?;
|
||||
for row in rows {
|
||||
let (pid, count) = row?;
|
||||
if let Ok(id) = blob_to_postid(pid) {
|
||||
result.insert(id, count);
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Batch: visibility intents for multiple posts at once
|
||||
pub fn get_post_intents_batch(&self, post_ids: &[PostId]) -> anyhow::Result<std::collections::HashMap<PostId, String>> {
|
||||
use std::collections::HashMap;
|
||||
let mut result: HashMap<PostId, String> = HashMap::new();
|
||||
if post_ids.is_empty() { return Ok(result); }
|
||||
let placeholders: String = (0..post_ids.len()).map(|i| format!("?{}", i + 1)).collect::<Vec<_>>().join(",");
|
||||
let sql = format!(
|
||||
"SELECT id, visibility_intent FROM posts WHERE id IN ({})",
|
||||
placeholders
|
||||
);
|
||||
let mut stmt = self.conn.prepare(&sql)?;
|
||||
let params: Vec<Box<dyn rusqlite::types::ToSql>> = post_ids.iter().map(|id| Box::new(id.to_vec()) as Box<dyn rusqlite::types::ToSql>).collect();
|
||||
let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
|
||||
let rows = stmt.query_map(param_refs.as_slice(), |row| {
|
||||
let pid: Vec<u8> = row.get(0)?;
|
||||
let intent: Option<String> = row.get(1)?;
|
||||
Ok((pid, intent.unwrap_or_default()))
|
||||
})?;
|
||||
for row in rows {
|
||||
let (pid, intent) = row?;
|
||||
if let Ok(id) = blob_to_postid(pid) {
|
||||
result.insert(id, intent);
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Helper: parse a post row from a query
|
||||
fn parse_post_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(Vec<u8>, Vec<u8>, String, String, i64, String)> {
|
||||
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?))
|
||||
}
|
||||
|
||||
/// Helper: collect parsed post rows into typed results
|
||||
fn collect_posts(rows: rusqlite::MappedRows<'_, impl FnMut(&rusqlite::Row<'_>) -> rusqlite::Result<(Vec<u8>, Vec<u8>, String, String, i64, String)>>) -> anyhow::Result<Vec<(PostId, Post, PostVisibility)>> {
|
||||
let mut posts = Vec::new();
|
||||
for row in rows {
|
||||
let (id_bytes, author_bytes, content, attachments_json, timestamp_ms, vis_json) = row?;
|
||||
let attachments: Vec<Attachment> = serde_json::from_str(&attachments_json).unwrap_or_default();
|
||||
let visibility: PostVisibility = serde_json::from_str(&vis_json).unwrap_or_default();
|
||||
posts.push((
|
||||
blob_to_postid(id_bytes)?,
|
||||
Post {
|
||||
author: blob_to_nodeid(author_bytes)?,
|
||||
content,
|
||||
attachments,
|
||||
timestamp_ms: timestamp_ms as u64,
|
||||
},
|
||||
visibility,
|
||||
));
|
||||
}
|
||||
Ok(posts)
|
||||
}
|
||||
|
||||
/// All posts with visibility (for sync protocol)
|
||||
pub fn list_posts_with_visibility(&self) -> anyhow::Result<Vec<(PostId, Post, PostVisibility)>> {
|
||||
self.list_posts_reverse_chron()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue