P2: peers CRUD and aggregated stats

Backend
- api/peers.rs: list/add/remove + aggregate() that derives totals,
  per-state counts, and tx/rx sums in one pass over the peer list
- poller.rs spawns a 3s tokio loop that emits peers://updated and
  stats://updated; cancelled via abort() on stop_daemon
- DELETE peer URL-encodes the endpoint (the path includes ://) with
  a small inline percent-encoder to avoid a url crate dep
- Tauri commands: peers_list, peer_add (with empty-string guard),
  peer_remove, peers_stats

Frontend
- peers store subscribes to the two events and refreshes after
  add/remove for immediate UI feedback
- Peers view renders endpoint, type, color-coded state badge, and
  formatBytes-formatted rx/tx; the four stat cards re-use a
  reusable Stat component
- AddPeerDialog uses radix-vue's Dialog primitive with regex
  validation for tcp:// and quic:// schemes
This commit is contained in:
syoul
2026-04-25 22:56:50 +02:00
parent d737231123
commit c1a81a9065
11 changed files with 608 additions and 8 deletions

View File

@@ -1,4 +1,5 @@
pub mod admin;
pub mod peers;
use crate::error::{AppError, AppResult};
use reqwest::{Client, Response};
@@ -48,7 +49,6 @@ impl MyceliumClient {
}
}
#[allow(dead_code)] // wired in P2 (peers add/remove)
pub(crate) async fn check_status(resp: Response) -> AppResult<()> {
let status = resp.status();
if status.is_success() {

107
src-tauri/src/api/peers.rs Normal file
View File

@@ -0,0 +1,107 @@
use crate::api::MyceliumClient;
use crate::error::AppResult;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerEndpoint {
pub proto: String,
pub socket_addr: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerInfo {
pub endpoint: PeerEndpoint,
#[serde(rename = "type")]
pub kind: String, // "static" | "inbound" | "linkLocalDiscovery"
pub connection_state: String, // "alive" | "connecting" | "dead"
#[serde(default)]
pub tx_bytes: u64,
#[serde(default)]
pub rx_bytes: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct AggregatedStats {
pub peers_total: usize,
pub peers_alive: usize,
pub peers_connecting: usize,
pub peers_dead: usize,
pub tx_bytes: u64,
pub rx_bytes: u64,
}
pub fn aggregate(peers: &[PeerInfo]) -> AggregatedStats {
let mut s = AggregatedStats {
peers_total: peers.len(),
..Default::default()
};
for p in peers {
s.tx_bytes = s.tx_bytes.saturating_add(p.tx_bytes);
s.rx_bytes = s.rx_bytes.saturating_add(p.rx_bytes);
match p.connection_state.as_str() {
"alive" => s.peers_alive += 1,
"connecting" => s.peers_connecting += 1,
"dead" => s.peers_dead += 1,
_ => {}
}
}
s
}
#[derive(Debug, Serialize)]
struct AddPeerBody<'a> {
endpoint: &'a str,
}
impl MyceliumClient {
pub async fn list_peers(&self) -> AppResult<Vec<PeerInfo>> {
let resp = self.http().get(self.url("/admin/peers")).send().await?;
Self::parse(resp).await
}
/// The upstream OpenAPI doesn't pin the request body shape; the daemon
/// accepts `{"endpoint": "<proto>://<addr>:<port>"}`.
pub async fn add_peer(&self, endpoint: &str) -> AppResult<()> {
let resp = self
.http()
.post(self.url("/admin/peers"))
.json(&AddPeerBody { endpoint })
.send()
.await?;
Self::check_status(resp).await
}
pub async fn remove_peer(&self, endpoint: &str) -> AppResult<()> {
// Endpoints look like `tcp://188.40.132.242:9651` — the slashes and
// colons must be percent-encoded for the path segment.
let encoded = url_encode_path_segment(endpoint);
let resp = self
.http()
.delete(self.url(&format!("/admin/peers/{encoded}")))
.send()
.await?;
Self::check_status(resp).await
}
}
fn url_encode_path_segment(s: &str) -> String {
// Minimal ASCII percent-encoder for the chars that would otherwise
// break path parsing. Avoids pulling a full url-encoding crate.
let mut out = String::with_capacity(s.len());
for b in s.bytes() {
match b {
b'a'..=b'z'
| b'A'..=b'Z'
| b'0'..=b'9'
| b'-'
| b'_'
| b'.'
| b'~' => out.push(b as char),
_ => out.push_str(&format!("%{:02X}", b)),
}
}
out
}

View File

@@ -1,4 +1,6 @@
use crate::api::admin::NodeInfo;
use crate::api::peers::{AggregatedStats, PeerInfo};
use crate::api::MyceliumClient;
use crate::error::{AppError, AppResult};
use crate::sidecar::SidecarConfig;
use crate::state::AppState;
@@ -13,8 +15,7 @@ pub struct DaemonStatus {
pub config_path: Option<String>,
}
#[tauri::command]
pub fn daemon_status(state: State<'_, AppState>) -> DaemonStatus {
fn snapshot(state: &AppState) -> DaemonStatus {
let sc = &state.sidecar;
DaemonStatus {
running: sc.is_running(),
@@ -24,6 +25,15 @@ pub fn daemon_status(state: State<'_, AppState>) -> DaemonStatus {
}
}
fn require_client(state: &AppState) -> AppResult<MyceliumClient> {
state.sidecar.client().ok_or(AppError::DaemonNotRunning)
}
#[tauri::command]
pub fn daemon_status(state: State<'_, AppState>) -> DaemonStatus {
snapshot(&state)
}
#[tauri::command]
pub async fn start_daemon(
app: AppHandle,
@@ -32,18 +42,22 @@ pub async fn start_daemon(
) -> AppResult<DaemonStatus> {
let cfg = config.unwrap_or_default();
state.sidecar.start(&app, &cfg).await?;
Ok(daemon_status(state))
state
.poller
.start(app.clone(), std::sync::Arc::clone(&state.sidecar));
Ok(snapshot(&state))
}
#[tauri::command]
pub async fn stop_daemon(state: State<'_, AppState>) -> AppResult<DaemonStatus> {
state.poller.stop();
state.sidecar.stop().await;
Ok(daemon_status(state))
Ok(snapshot(&state))
}
#[tauri::command]
pub async fn node_info(state: State<'_, AppState>) -> AppResult<NodeInfo> {
let client = state.sidecar.client().ok_or(AppError::DaemonNotRunning)?;
let client = require_client(&state)?;
client.node_info().await
}
@@ -51,3 +65,29 @@ pub async fn node_info(state: State<'_, AppState>) -> AppResult<NodeInfo> {
pub fn sidecar_logs(state: State<'_, AppState>) -> Vec<String> {
state.sidecar.logs_snapshot()
}
// ─── Peers ───────────────────────────────────────────────────────────────────
#[tauri::command]
pub async fn peers_list(state: State<'_, AppState>) -> AppResult<Vec<PeerInfo>> {
require_client(&state)?.list_peers().await
}
#[tauri::command]
pub async fn peer_add(state: State<'_, AppState>, endpoint: String) -> AppResult<()> {
if endpoint.trim().is_empty() {
return Err(AppError::BadInput("endpoint must not be empty".into()));
}
require_client(&state)?.add_peer(endpoint.trim()).await
}
#[tauri::command]
pub async fn peer_remove(state: State<'_, AppState>, endpoint: String) -> AppResult<()> {
require_client(&state)?.remove_peer(&endpoint).await
}
#[tauri::command]
pub async fn peers_stats(state: State<'_, AppState>) -> AppResult<AggregatedStats> {
let peers = require_client(&state)?.list_peers().await?;
Ok(crate::api::peers::aggregate(&peers))
}

View File

@@ -2,6 +2,7 @@ pub mod api;
pub mod commands;
pub mod elevation;
pub mod error;
pub mod poller;
pub mod sidecar;
pub mod state;
@@ -34,6 +35,10 @@ pub fn run() {
commands::stop_daemon,
commands::node_info,
commands::sidecar_logs,
commands::peers_list,
commands::peer_add,
commands::peer_remove,
commands::peers_stats,
])
.run(tauri::generate_context!())
.expect("error while running tauri application");

61
src-tauri/src/poller.rs Normal file
View File

@@ -0,0 +1,61 @@
use crate::api::peers;
use crate::sidecar::SidecarHandle;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use tauri::{AppHandle, Emitter};
use tokio::task::JoinHandle;
use tracing::warn;
const PEERS_INTERVAL: Duration = Duration::from_secs(3);
pub struct Poller {
handle: Mutex<Option<JoinHandle<()>>>,
}
impl Poller {
pub fn new() -> Arc<Self> {
Arc::new(Self {
handle: Mutex::new(None),
})
}
/// Spawn a background task that pulls /admin/peers every few seconds and
/// fans the result out as `peers://updated` and an aggregated
/// `stats://updated` event. Cancels any previously-running task.
pub fn start(self: &Arc<Self>, app: AppHandle, sidecar: Arc<SidecarHandle>) {
self.stop();
let h = tokio::spawn(async move {
// First tick is immediate so the UI doesn't wait a full interval
// for the first peer list right after the daemon comes up.
let mut first = true;
loop {
if !first {
tokio::time::sleep(PEERS_INTERVAL).await;
}
first = false;
let Some(client) = sidecar.client() else {
break;
};
match client.list_peers().await {
Ok(list) => {
let stats = peers::aggregate(&list);
let _ = app.emit("peers://updated", &list);
let _ = app.emit("stats://updated", &stats);
}
Err(e) => {
warn!(error = %e, "poller: list_peers failed");
}
}
}
});
*self.handle.lock() = Some(h);
}
pub fn stop(&self) {
if let Some(h) = self.handle.lock().take() {
h.abort();
}
}
}

View File

@@ -1,14 +1,17 @@
use crate::poller::Poller;
use crate::sidecar::SidecarHandle;
use std::sync::Arc;
pub struct AppState {
pub sidecar: Arc<SidecarHandle>,
pub poller: Arc<Poller>,
}
impl AppState {
pub fn new() -> Self {
Self {
sidecar: SidecarHandle::new(),
poller: Poller::new(),
}
}
}