use crate::api::messages::IncomingMessage; use crate::api::peers; use crate::sidecar::SidecarHandle; use parking_lot::Mutex; use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; use tauri::{AppHandle, Emitter}; use tokio::task::JoinHandle; use tracing::warn; const PEERS_INTERVAL: Duration = Duration::from_secs(3); const ROUTES_INTERVAL: Duration = Duration::from_secs(5); const INBOX_INTERVAL: Duration = Duration::from_secs(2); const INBOX_RETRY_BACKOFF: Duration = Duration::from_secs(2); const INBOX_CAPACITY: usize = 200; pub struct Poller { peers_handle: Mutex>>, routes_handle: Mutex>>, inbox_handle: Mutex>>, inbox: Mutex>, } impl Poller { pub fn new() -> Arc { Arc::new(Self { peers_handle: Mutex::new(None), routes_handle: Mutex::new(None), inbox_handle: Mutex::new(None), inbox: Mutex::new(VecDeque::with_capacity(INBOX_CAPACITY)), }) } pub fn start(self: &Arc, app: AppHandle, sidecar: Arc) { self.stop(); *self.peers_handle.lock() = Some(spawn_peers_loop(app.clone(), Arc::clone(&sidecar))); *self.routes_handle.lock() = Some(spawn_routes_loop(app.clone(), Arc::clone(&sidecar))); *self.inbox_handle.lock() = Some(spawn_inbox_loop( app, Arc::clone(&sidecar), Arc::clone(self), )); } pub fn stop(&self) { for slot in [&self.peers_handle, &self.routes_handle, &self.inbox_handle] { if let Some(h) = slot.lock().take() { h.abort(); } } } pub fn inbox_snapshot(&self) -> Vec { self.inbox.lock().iter().cloned().collect() } pub fn clear_inbox(&self) { self.inbox.lock().clear(); } fn push_inbox(&self, msg: IncomingMessage) { let mut buf = self.inbox.lock(); if buf.len() >= INBOX_CAPACITY { buf.pop_front(); } buf.push_back(msg); } } fn spawn_peers_loop(app: AppHandle, sidecar: Arc) -> JoinHandle<()> { tokio::spawn(async move { let mut first = true; loop { if !first { tokio::time::sleep(PEERS_INTERVAL).await; } first = false; let Some(client) = sidecar.client() else { break; }; match client.list_peers().await { Ok(list) => { let stats = peers::aggregate(&list); let _ = app.emit("peers://updated", &list); let _ = app.emit("stats://updated", &stats); } Err(e) => warn!(error = %e, "poller: list_peers failed"), } } }) } fn spawn_routes_loop(app: AppHandle, sidecar: Arc) -> JoinHandle<()> { tokio::spawn(async move { let mut first = true; loop { if !first { tokio::time::sleep(ROUTES_INTERVAL).await; } first = false; let Some(client) = sidecar.client() else { break; }; match client.routes_snapshot().await { Ok(snap) => { let _ = app.emit("routes://updated", &snap); } Err(e) => warn!(error = %e, "poller: routes_snapshot failed"), } } }) } fn spawn_inbox_loop( app: AppHandle, sidecar: Arc, me: Arc, ) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::time::sleep(INBOX_INTERVAL).await; let Some(client) = sidecar.client() else { break; }; // Short-poll: timeout=0 returns immediately if no message. // We previously used a 30s long-poll, but mycelium 0.6.1's // HTTP server appears to serialise requests behind a single // worker — holding the connection for 30s starved every // other endpoint (peers, routes, admin) until our own // 10s reqwest timeout kicked in. match client.pop_message(false, 0, None).await { Ok(Some(msg)) => { me.push_inbox(msg.clone()); let _ = app.emit("messages://incoming", &msg); } Ok(None) => {} Err(e) => { warn!(error = %e, "inbox: pop_message failed"); tokio::time::sleep(INBOX_RETRY_BACKOFF).await; } } } }) }