P4: messages, topics, pubkey

Backend
- api/messages.rs covers send/pop/reply/status with an externally
  tagged MessageDestination enum that matches the daemon's
  {ip|pk: ...} body shape; pop_message uses an inflated request
  timeout to outlast the long-poll window
- api/topics.rs implements default action, topic CRUD, sources
  whitelist, and forward-socket get/set/remove. POST /topics ships
  the raw base64 string as the body (not JSON); path segments are
  percent-encoded inline (topics contain '/' and '+')
- api/pubkey.rs resolves an overlay IPv6 to a hex public key
- poller spawns a third long-poll loop on /messages?peek=false
  that fans every inbound message into a 200-deep ring buffer and
  emits messages://incoming for the UI

Frontend
- messages store: live inbox via the event, persisted outbox via
  tauri-plugin-store keyed under outbox.json
- ComposeMessage form: ip/pk toggle, optional UTF-8 topic and
  payload that get base64-encoded with a TextEncoder-based helper
- MessageList renders printable payloads decoded; binary payloads
  fall back to a "(N bytes binary)" hint
- Topics view: split layout with whitelist on the left, per-topic
  sources/forward editor on the right; default-action toggle is
  surfaced at the top
This commit is contained in:
syoul
2026-04-25 23:10:21 +02:00
parent 95e7cb4bd3
commit f28d0e1338
17 changed files with 1449 additions and 22 deletions

View File

@@ -0,0 +1,132 @@
use crate::api::MyceliumClient;
use crate::error::{AppError, AppResult};
use serde::{Deserialize, Serialize};
/// Destination of an outgoing message: either a fully resolved overlay IPv6
/// or the recipient's public key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum MessageDestination {
#[serde(rename = "ip")]
Ip(String),
#[serde(rename = "pk")]
PublicKey(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushMessageBody {
pub dst: MessageDestination,
/// base64-encoded topic bytes (≤ 340 chars).
pub topic: String,
/// base64-encoded payload bytes.
pub payload: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PushMessageReceipt {
/// 16-char hex id assigned by the daemon.
pub id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IncomingMessage {
pub id: String,
pub src_ip: String,
pub src_pk: String,
pub dst_ip: String,
pub dst_pk: String,
pub topic: String,
pub payload: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageStatus {
/// Pass-through of the daemon's response. We deliberately keep it as a
/// JSON Value because the upstream schema isn't pinned in the spec and
/// fields can be added between releases.
#[serde(flatten)]
pub raw: serde_json::Value,
}
impl MyceliumClient {
pub async fn send_message(&self, body: &PushMessageBody) -> AppResult<PushMessageReceipt> {
let resp = self
.http()
.post(self.url("/messages"))
.json(body)
.send()
.await?;
Self::parse(resp).await
}
/// Long-poll the daemon for an incoming message. `timeout` is seconds and
/// must be ≥ 0; the daemon returns 204/empty when nothing arrives within
/// the window. Caller is responsible for swallowing the resulting Err.
pub async fn pop_message(
&self,
peek: bool,
timeout: u64,
topic: Option<&str>,
) -> AppResult<Option<IncomingMessage>> {
let mut req = self.http().get(self.url("/messages"));
// The daemon expects query params; we hand-build to avoid url crate.
let mut q: Vec<(&str, String)> =
vec![("peek", peek.to_string()), ("timeout", timeout.to_string())];
if let Some(t) = topic {
q.push(("topic", t.to_string()));
}
req = req.query(&q);
// The long-poll can run nearly as long as `timeout`; loosen the
// client-default request timeout for this single call.
req = req.timeout(std::time::Duration::from_secs(
timeout.saturating_add(5).max(15),
));
let resp = req.send().await?;
let status = resp.status();
if status == reqwest::StatusCode::NO_CONTENT {
return Ok(None);
}
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
return Err(AppError::DaemonStatus {
status: status.as_u16(),
body,
});
}
// Some implementations also return 200 with empty body to signal
// "nothing to read". Try to parse and tolerate empty.
let bytes = resp.bytes().await?;
if bytes.is_empty() {
return Ok(None);
}
let msg: IncomingMessage = serde_json::from_slice(&bytes)
.map_err(|e| AppError::Other(format!("failed to parse incoming message: {e}")))?;
Ok(Some(msg))
}
pub async fn reply_message(
&self,
id: &str,
body: &PushMessageBody,
) -> AppResult<PushMessageReceipt> {
let resp = self
.http()
.post(self.url(&format!("/messages/reply/{id}")))
.json(body)
.send()
.await?;
Self::parse(resp).await
}
pub async fn message_status(&self, id: &str) -> AppResult<MessageStatus> {
let resp = self
.http()
.get(self.url(&format!("/messages/status/{id}")))
.send()
.await?;
Self::parse(resp).await
}
}

View File

@@ -1,6 +1,9 @@
pub mod admin;
pub mod messages;
pub mod peers;
pub mod pubkey;
pub mod routes;
pub mod topics;
use crate::error::{AppError, AppResult};
use reqwest::{Client, Response};

View File

@@ -93,13 +93,9 @@ fn url_encode_path_segment(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for b in s.bytes() {
match b {
b'a'..=b'z'
| b'A'..=b'Z'
| b'0'..=b'9'
| b'-'
| b'_'
| b'.'
| b'~' => out.push(b as char),
b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
out.push(b as char)
}
_ => out.push_str(&format!("%{:02X}", b)),
}
}

View File

@@ -0,0 +1,21 @@
use crate::api::MyceliumClient;
use crate::error::AppResult;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PubkeyLookup {
#[serde(rename = "NodePubKey")]
pub node_pub_key: String,
}
impl MyceliumClient {
/// Resolve the public key of an overlay node from its IPv6 address.
pub async fn lookup_pubkey(&self, ip: &str) -> AppResult<PubkeyLookup> {
let resp = self
.http()
.get(self.url(&format!("/pubkey/{ip}")))
.send()
.await?;
Self::parse(resp).await
}
}

View File

@@ -37,17 +37,29 @@ pub struct RoutesSnapshot {
impl MyceliumClient {
pub async fn routes_selected(&self) -> AppResult<Vec<Route>> {
let r = self.http().get(self.url("/admin/routes/selected")).send().await?;
let r = self
.http()
.get(self.url("/admin/routes/selected"))
.send()
.await?;
Self::parse(r).await
}
pub async fn routes_fallback(&self) -> AppResult<Vec<Route>> {
let r = self.http().get(self.url("/admin/routes/fallback")).send().await?;
let r = self
.http()
.get(self.url("/admin/routes/fallback"))
.send()
.await?;
Self::parse(r).await
}
pub async fn routes_queried(&self) -> AppResult<Vec<QueriedSubnet>> {
let r = self.http().get(self.url("/admin/routes/queried")).send().await?;
let r = self
.http()
.get(self.url("/admin/routes/queried"))
.send()
.await?;
Self::parse(r).await
}

158
src-tauri/src/api/topics.rs Normal file
View File

@@ -0,0 +1,158 @@
use crate::api::MyceliumClient;
use crate::error::AppResult;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DefaultAction {
pub accept: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceBody {
pub subnet: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ForwardBody {
pub socket_path: String,
}
impl MyceliumClient {
pub async fn topics_default_get(&self) -> AppResult<DefaultAction> {
let resp = self
.http()
.get(self.url("/messages/topics/default"))
.send()
.await?;
Self::parse(resp).await
}
pub async fn topics_default_set(&self, action: &DefaultAction) -> AppResult<()> {
let resp = self
.http()
.put(self.url("/messages/topics/default"))
.json(action)
.send()
.await?;
Self::check_status(resp).await
}
pub async fn topics_list(&self) -> AppResult<Vec<String>> {
let resp = self.http().get(self.url("/messages/topics")).send().await?;
Self::parse(resp).await
}
/// The daemon expects the raw base64 topic as the request body, not
/// wrapped in JSON.
pub async fn topic_add(&self, topic_b64: &str) -> AppResult<()> {
let resp = self
.http()
.post(self.url("/messages/topics"))
.header(reqwest::header::CONTENT_TYPE, "text/plain")
.body(topic_b64.to_string())
.send()
.await?;
Self::check_status(resp).await
}
pub async fn topic_remove(&self, topic_b64: &str) -> AppResult<()> {
let encoded = encode_segment(topic_b64);
let resp = self
.http()
.delete(self.url(&format!("/messages/topics/{encoded}")))
.send()
.await?;
Self::check_status(resp).await
}
pub async fn topic_sources_list(&self, topic_b64: &str) -> AppResult<Vec<String>> {
let encoded = encode_segment(topic_b64);
let resp = self
.http()
.get(self.url(&format!("/messages/topics/{encoded}/sources")))
.send()
.await?;
Self::parse(resp).await
}
pub async fn topic_source_add(&self, topic_b64: &str, subnet: &str) -> AppResult<()> {
let encoded = encode_segment(topic_b64);
let resp = self
.http()
.post(self.url(&format!("/messages/topics/{encoded}/sources")))
.json(&SourceBody {
subnet: subnet.to_string(),
})
.send()
.await?;
Self::check_status(resp).await
}
pub async fn topic_source_remove(&self, topic_b64: &str, subnet: &str) -> AppResult<()> {
let topic = encode_segment(topic_b64);
let sub = encode_segment(subnet);
let resp = self
.http()
.delete(self.url(&format!("/messages/topics/{topic}/sources/{sub}")))
.send()
.await?;
Self::check_status(resp).await
}
/// Returns Ok(None) if no forward is configured (the daemon yields
/// `null`).
pub async fn topic_forward_get(&self, topic_b64: &str) -> AppResult<Option<String>> {
let encoded = encode_segment(topic_b64);
let resp = self
.http()
.get(self.url(&format!("/messages/topics/{encoded}/forward")))
.send()
.await?;
let v: serde_json::Value = Self::parse(resp).await?;
Ok(match v {
serde_json::Value::Null => None,
serde_json::Value::String(s) => Some(s),
other => Some(other.to_string()),
})
}
pub async fn topic_forward_set(&self, topic_b64: &str, socket_path: &str) -> AppResult<()> {
let encoded = encode_segment(topic_b64);
let resp = self
.http()
.put(self.url(&format!("/messages/topics/{encoded}/forward")))
.json(&ForwardBody {
socket_path: socket_path.to_string(),
})
.send()
.await?;
Self::check_status(resp).await
}
pub async fn topic_forward_remove(&self, topic_b64: &str) -> AppResult<()> {
let encoded = encode_segment(topic_b64);
let resp = self
.http()
.delete(self.url(&format!("/messages/topics/{encoded}/forward")))
.send()
.await?;
Self::check_status(resp).await
}
}
/// Percent-encodes path segments. Topics are base64 (which contains `/` and
/// `+` and may end with `=`); subnets carry `/` and `:`. We encode anything
/// outside the unreserved set.
fn encode_segment(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for b in s.bytes() {
match b {
b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
out.push(b as char)
}
_ => out.push_str(&format!("%{:02X}", b)),
}
}
out
}

View File

@@ -1,6 +1,11 @@
use crate::api::admin::NodeInfo;
use crate::api::messages::{
IncomingMessage, MessageDestination, MessageStatus, PushMessageBody, PushMessageReceipt,
};
use crate::api::peers::{AggregatedStats, PeerInfo};
use crate::api::pubkey::PubkeyLookup;
use crate::api::routes::RoutesSnapshot;
use crate::api::topics::DefaultAction;
use crate::api::MyceliumClient;
use crate::error::{AppError, AppResult};
use crate::sidecar::SidecarConfig;
@@ -99,3 +104,146 @@ pub async fn peers_stats(state: State<'_, AppState>) -> AppResult<AggregatedStat
pub async fn routes_snapshot(state: State<'_, AppState>) -> AppResult<RoutesSnapshot> {
require_client(&state)?.routes_snapshot().await
}
// ─── Messages ────────────────────────────────────────────────────────────────
#[tauri::command]
pub async fn send_message(
state: State<'_, AppState>,
destination: MessageDestination,
topic_b64: String,
payload_b64: String,
) -> AppResult<PushMessageReceipt> {
let body = PushMessageBody {
dst: destination,
topic: topic_b64,
payload: payload_b64,
};
require_client(&state)?.send_message(&body).await
}
#[tauri::command]
pub async fn reply_message(
state: State<'_, AppState>,
id: String,
destination: MessageDestination,
topic_b64: String,
payload_b64: String,
) -> AppResult<PushMessageReceipt> {
let body = PushMessageBody {
dst: destination,
topic: topic_b64,
payload: payload_b64,
};
require_client(&state)?.reply_message(&id, &body).await
}
#[tauri::command]
pub async fn message_status(state: State<'_, AppState>, id: String) -> AppResult<MessageStatus> {
require_client(&state)?.message_status(&id).await
}
#[tauri::command]
pub fn inbox_messages(state: State<'_, AppState>) -> Vec<IncomingMessage> {
state.poller.inbox_snapshot()
}
#[tauri::command]
pub fn inbox_clear(state: State<'_, AppState>) {
state.poller.clear_inbox();
}
// ─── Topics ──────────────────────────────────────────────────────────────────
#[tauri::command]
pub async fn topics_default_get(state: State<'_, AppState>) -> AppResult<DefaultAction> {
require_client(&state)?.topics_default_get().await
}
#[tauri::command]
pub async fn topics_default_set(state: State<'_, AppState>, accept: bool) -> AppResult<()> {
require_client(&state)?
.topics_default_set(&DefaultAction { accept })
.await
}
#[tauri::command]
pub async fn topics_list(state: State<'_, AppState>) -> AppResult<Vec<String>> {
require_client(&state)?.topics_list().await
}
#[tauri::command]
pub async fn topic_add(state: State<'_, AppState>, topic_b64: String) -> AppResult<()> {
if topic_b64.trim().is_empty() {
return Err(AppError::BadInput("topic must not be empty".into()));
}
require_client(&state)?.topic_add(topic_b64.trim()).await
}
#[tauri::command]
pub async fn topic_remove(state: State<'_, AppState>, topic_b64: String) -> AppResult<()> {
require_client(&state)?.topic_remove(&topic_b64).await
}
#[tauri::command]
pub async fn topic_sources_list(
state: State<'_, AppState>,
topic_b64: String,
) -> AppResult<Vec<String>> {
require_client(&state)?.topic_sources_list(&topic_b64).await
}
#[tauri::command]
pub async fn topic_source_add(
state: State<'_, AppState>,
topic_b64: String,
subnet: String,
) -> AppResult<()> {
require_client(&state)?
.topic_source_add(&topic_b64, &subnet)
.await
}
#[tauri::command]
pub async fn topic_source_remove(
state: State<'_, AppState>,
topic_b64: String,
subnet: String,
) -> AppResult<()> {
require_client(&state)?
.topic_source_remove(&topic_b64, &subnet)
.await
}
#[tauri::command]
pub async fn topic_forward_get(
state: State<'_, AppState>,
topic_b64: String,
) -> AppResult<Option<String>> {
require_client(&state)?.topic_forward_get(&topic_b64).await
}
#[tauri::command]
pub async fn topic_forward_set(
state: State<'_, AppState>,
topic_b64: String,
socket_path: String,
) -> AppResult<()> {
require_client(&state)?
.topic_forward_set(&topic_b64, &socket_path)
.await
}
#[tauri::command]
pub async fn topic_forward_remove(state: State<'_, AppState>, topic_b64: String) -> AppResult<()> {
require_client(&state)?
.topic_forward_remove(&topic_b64)
.await
}
// ─── Pubkey ──────────────────────────────────────────────────────────────────
#[tauri::command]
pub async fn lookup_pubkey(state: State<'_, AppState>, ip: String) -> AppResult<PubkeyLookup> {
require_client(&state)?.lookup_pubkey(&ip).await
}

View File

@@ -40,6 +40,23 @@ pub fn run() {
commands::peer_remove,
commands::peers_stats,
commands::routes_snapshot,
commands::send_message,
commands::reply_message,
commands::message_status,
commands::inbox_messages,
commands::inbox_clear,
commands::topics_default_get,
commands::topics_default_set,
commands::topics_list,
commands::topic_add,
commands::topic_remove,
commands::topic_sources_list,
commands::topic_source_add,
commands::topic_source_remove,
commands::topic_forward_get,
commands::topic_forward_set,
commands::topic_forward_remove,
commands::lookup_pubkey,
])
.run(tauri::generate_context!())
.expect("error while running tauri application");

View File

@@ -1,6 +1,8 @@
use crate::api::messages::IncomingMessage;
use crate::api::peers;
use crate::sidecar::SidecarHandle;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tauri::{AppHandle, Emitter};
@@ -9,10 +11,15 @@ use tracing::warn;
const PEERS_INTERVAL: Duration = Duration::from_secs(3);
const ROUTES_INTERVAL: Duration = Duration::from_secs(5);
const INBOX_LONG_POLL_SECS: u64 = 30;
const INBOX_RETRY_BACKOFF: Duration = Duration::from_secs(2);
const INBOX_CAPACITY: usize = 200;
pub struct Poller {
peers_handle: Mutex<Option<JoinHandle<()>>>,
routes_handle: Mutex<Option<JoinHandle<()>>>,
inbox_handle: Mutex<Option<JoinHandle<()>>>,
inbox: Mutex<VecDeque<IncomingMessage>>,
}
impl Poller {
@@ -20,30 +27,49 @@ impl Poller {
Arc::new(Self {
peers_handle: Mutex::new(None),
routes_handle: Mutex::new(None),
inbox_handle: Mutex::new(None),
inbox: Mutex::new(VecDeque::with_capacity(INBOX_CAPACITY)),
})
}
/// Spawn the two background loops. Cancels any previously-running tasks
/// so consecutive `start_daemon` calls don't leak handles.
pub fn start(self: &Arc<Self>, app: AppHandle, sidecar: Arc<SidecarHandle>) {
self.stop();
*self.peers_handle.lock() = Some(spawn_peers_loop(app.clone(), Arc::clone(&sidecar)));
*self.routes_handle.lock() = Some(spawn_routes_loop(app, sidecar));
*self.routes_handle.lock() = Some(spawn_routes_loop(app.clone(), Arc::clone(&sidecar)));
*self.inbox_handle.lock() = Some(spawn_inbox_loop(
app,
Arc::clone(&sidecar),
Arc::clone(self),
));
}
pub fn stop(&self) {
if let Some(h) = self.peers_handle.lock().take() {
h.abort();
for slot in [&self.peers_handle, &self.routes_handle, &self.inbox_handle] {
if let Some(h) = slot.lock().take() {
h.abort();
}
}
if let Some(h) = self.routes_handle.lock().take() {
h.abort();
}
pub fn inbox_snapshot(&self) -> Vec<IncomingMessage> {
self.inbox.lock().iter().cloned().collect()
}
pub fn clear_inbox(&self) {
self.inbox.lock().clear();
}
fn push_inbox(&self, msg: IncomingMessage) {
let mut buf = self.inbox.lock();
if buf.len() >= INBOX_CAPACITY {
buf.pop_front();
}
buf.push_back(msg);
}
}
fn spawn_peers_loop(app: AppHandle, sidecar: Arc<SidecarHandle>) -> JoinHandle<()> {
tokio::spawn(async move {
// Tick once immediately so the UI doesn't wait the full interval.
let mut first = true;
loop {
if !first {
@@ -87,3 +113,31 @@ fn spawn_routes_loop(app: AppHandle, sidecar: Arc<SidecarHandle>) -> JoinHandle<
}
})
}
fn spawn_inbox_loop(
app: AppHandle,
sidecar: Arc<SidecarHandle>,
me: Arc<Poller>,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let Some(client) = sidecar.client() else {
break;
};
// Each iteration is a fresh long-poll. The daemon answers as
// soon as a message arrives, or returns an empty body / 204
// when the timeout window elapses.
match client.pop_message(false, INBOX_LONG_POLL_SECS, None).await {
Ok(Some(msg)) => {
me.push_inbox(msg.clone());
let _ = app.emit("messages://incoming", &msg);
}
Ok(None) => {} // window expired, loop
Err(e) => {
warn!(error = %e, "inbox: pop_message failed");
tokio::time::sleep(INBOX_RETRY_BACKOFF).await;
}
}
}
})
}