diff --git a/src-tauri/src/api/messages.rs b/src-tauri/src/api/messages.rs new file mode 100644 index 0000000..71785d0 --- /dev/null +++ b/src-tauri/src/api/messages.rs @@ -0,0 +1,132 @@ +use crate::api::MyceliumClient; +use crate::error::{AppError, AppResult}; +use serde::{Deserialize, Serialize}; + +/// Destination of an outgoing message: either a fully resolved overlay IPv6 +/// or the recipient's public key. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum MessageDestination { + #[serde(rename = "ip")] + Ip(String), + #[serde(rename = "pk")] + PublicKey(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PushMessageBody { + pub dst: MessageDestination, + /// base64-encoded topic bytes (≤ 340 chars). + pub topic: String, + /// base64-encoded payload bytes. + pub payload: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PushMessageReceipt { + /// 16-char hex id assigned by the daemon. + pub id: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct IncomingMessage { + pub id: String, + pub src_ip: String, + pub src_pk: String, + pub dst_ip: String, + pub dst_pk: String, + pub topic: String, + pub payload: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MessageStatus { + /// Pass-through of the daemon's response. We deliberately keep it as a + /// JSON Value because the upstream schema isn't pinned in the spec and + /// fields can be added between releases. + #[serde(flatten)] + pub raw: serde_json::Value, +} + +impl MyceliumClient { + pub async fn send_message(&self, body: &PushMessageBody) -> AppResult { + let resp = self + .http() + .post(self.url("/messages")) + .json(body) + .send() + .await?; + Self::parse(resp).await + } + + /// Long-poll the daemon for an incoming message. `timeout` is seconds and + /// must be ≥ 0; the daemon returns 204/empty when nothing arrives within + /// the window. Caller is responsible for swallowing the resulting Err. + pub async fn pop_message( + &self, + peek: bool, + timeout: u64, + topic: Option<&str>, + ) -> AppResult> { + let mut req = self.http().get(self.url("/messages")); + // The daemon expects query params; we hand-build to avoid url crate. + let mut q: Vec<(&str, String)> = + vec![("peek", peek.to_string()), ("timeout", timeout.to_string())]; + if let Some(t) = topic { + q.push(("topic", t.to_string())); + } + req = req.query(&q); + + // The long-poll can run nearly as long as `timeout`; loosen the + // client-default request timeout for this single call. + req = req.timeout(std::time::Duration::from_secs( + timeout.saturating_add(5).max(15), + )); + let resp = req.send().await?; + let status = resp.status(); + if status == reqwest::StatusCode::NO_CONTENT { + return Ok(None); + } + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + return Err(AppError::DaemonStatus { + status: status.as_u16(), + body, + }); + } + // Some implementations also return 200 with empty body to signal + // "nothing to read". Try to parse and tolerate empty. + let bytes = resp.bytes().await?; + if bytes.is_empty() { + return Ok(None); + } + let msg: IncomingMessage = serde_json::from_slice(&bytes) + .map_err(|e| AppError::Other(format!("failed to parse incoming message: {e}")))?; + Ok(Some(msg)) + } + + pub async fn reply_message( + &self, + id: &str, + body: &PushMessageBody, + ) -> AppResult { + let resp = self + .http() + .post(self.url(&format!("/messages/reply/{id}"))) + .json(body) + .send() + .await?; + Self::parse(resp).await + } + + pub async fn message_status(&self, id: &str) -> AppResult { + let resp = self + .http() + .get(self.url(&format!("/messages/status/{id}"))) + .send() + .await?; + Self::parse(resp).await + } +} diff --git a/src-tauri/src/api/mod.rs b/src-tauri/src/api/mod.rs index 5be78c7..fc96336 100644 --- a/src-tauri/src/api/mod.rs +++ b/src-tauri/src/api/mod.rs @@ -1,6 +1,9 @@ pub mod admin; +pub mod messages; pub mod peers; +pub mod pubkey; pub mod routes; +pub mod topics; use crate::error::{AppError, AppResult}; use reqwest::{Client, Response}; diff --git a/src-tauri/src/api/peers.rs b/src-tauri/src/api/peers.rs index a5f7b56..5eb87ad 100644 --- a/src-tauri/src/api/peers.rs +++ b/src-tauri/src/api/peers.rs @@ -93,13 +93,9 @@ fn url_encode_path_segment(s: &str) -> String { let mut out = String::with_capacity(s.len()); for b in s.bytes() { match b { - b'a'..=b'z' - | b'A'..=b'Z' - | b'0'..=b'9' - | b'-' - | b'_' - | b'.' - | b'~' => out.push(b as char), + b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { + out.push(b as char) + } _ => out.push_str(&format!("%{:02X}", b)), } } diff --git a/src-tauri/src/api/pubkey.rs b/src-tauri/src/api/pubkey.rs new file mode 100644 index 0000000..762a161 --- /dev/null +++ b/src-tauri/src/api/pubkey.rs @@ -0,0 +1,21 @@ +use crate::api::MyceliumClient; +use crate::error::AppResult; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PubkeyLookup { + #[serde(rename = "NodePubKey")] + pub node_pub_key: String, +} + +impl MyceliumClient { + /// Resolve the public key of an overlay node from its IPv6 address. + pub async fn lookup_pubkey(&self, ip: &str) -> AppResult { + let resp = self + .http() + .get(self.url(&format!("/pubkey/{ip}"))) + .send() + .await?; + Self::parse(resp).await + } +} diff --git a/src-tauri/src/api/routes.rs b/src-tauri/src/api/routes.rs index 98fa64b..8388c7f 100644 --- a/src-tauri/src/api/routes.rs +++ b/src-tauri/src/api/routes.rs @@ -37,17 +37,29 @@ pub struct RoutesSnapshot { impl MyceliumClient { pub async fn routes_selected(&self) -> AppResult> { - let r = self.http().get(self.url("/admin/routes/selected")).send().await?; + let r = self + .http() + .get(self.url("/admin/routes/selected")) + .send() + .await?; Self::parse(r).await } pub async fn routes_fallback(&self) -> AppResult> { - let r = self.http().get(self.url("/admin/routes/fallback")).send().await?; + let r = self + .http() + .get(self.url("/admin/routes/fallback")) + .send() + .await?; Self::parse(r).await } pub async fn routes_queried(&self) -> AppResult> { - let r = self.http().get(self.url("/admin/routes/queried")).send().await?; + let r = self + .http() + .get(self.url("/admin/routes/queried")) + .send() + .await?; Self::parse(r).await } diff --git a/src-tauri/src/api/topics.rs b/src-tauri/src/api/topics.rs new file mode 100644 index 0000000..5cb1b4d --- /dev/null +++ b/src-tauri/src/api/topics.rs @@ -0,0 +1,158 @@ +use crate::api::MyceliumClient; +use crate::error::AppResult; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DefaultAction { + pub accept: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SourceBody { + pub subnet: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct ForwardBody { + pub socket_path: String, +} + +impl MyceliumClient { + pub async fn topics_default_get(&self) -> AppResult { + let resp = self + .http() + .get(self.url("/messages/topics/default")) + .send() + .await?; + Self::parse(resp).await + } + + pub async fn topics_default_set(&self, action: &DefaultAction) -> AppResult<()> { + let resp = self + .http() + .put(self.url("/messages/topics/default")) + .json(action) + .send() + .await?; + Self::check_status(resp).await + } + + pub async fn topics_list(&self) -> AppResult> { + let resp = self.http().get(self.url("/messages/topics")).send().await?; + Self::parse(resp).await + } + + /// The daemon expects the raw base64 topic as the request body, not + /// wrapped in JSON. + pub async fn topic_add(&self, topic_b64: &str) -> AppResult<()> { + let resp = self + .http() + .post(self.url("/messages/topics")) + .header(reqwest::header::CONTENT_TYPE, "text/plain") + .body(topic_b64.to_string()) + .send() + .await?; + Self::check_status(resp).await + } + + pub async fn topic_remove(&self, topic_b64: &str) -> AppResult<()> { + let encoded = encode_segment(topic_b64); + let resp = self + .http() + .delete(self.url(&format!("/messages/topics/{encoded}"))) + .send() + .await?; + Self::check_status(resp).await + } + + pub async fn topic_sources_list(&self, topic_b64: &str) -> AppResult> { + let encoded = encode_segment(topic_b64); + let resp = self + .http() + .get(self.url(&format!("/messages/topics/{encoded}/sources"))) + .send() + .await?; + Self::parse(resp).await + } + + pub async fn topic_source_add(&self, topic_b64: &str, subnet: &str) -> AppResult<()> { + let encoded = encode_segment(topic_b64); + let resp = self + .http() + .post(self.url(&format!("/messages/topics/{encoded}/sources"))) + .json(&SourceBody { + subnet: subnet.to_string(), + }) + .send() + .await?; + Self::check_status(resp).await + } + + pub async fn topic_source_remove(&self, topic_b64: &str, subnet: &str) -> AppResult<()> { + let topic = encode_segment(topic_b64); + let sub = encode_segment(subnet); + let resp = self + .http() + .delete(self.url(&format!("/messages/topics/{topic}/sources/{sub}"))) + .send() + .await?; + Self::check_status(resp).await + } + + /// Returns Ok(None) if no forward is configured (the daemon yields + /// `null`). + pub async fn topic_forward_get(&self, topic_b64: &str) -> AppResult> { + let encoded = encode_segment(topic_b64); + let resp = self + .http() + .get(self.url(&format!("/messages/topics/{encoded}/forward"))) + .send() + .await?; + let v: serde_json::Value = Self::parse(resp).await?; + Ok(match v { + serde_json::Value::Null => None, + serde_json::Value::String(s) => Some(s), + other => Some(other.to_string()), + }) + } + + pub async fn topic_forward_set(&self, topic_b64: &str, socket_path: &str) -> AppResult<()> { + let encoded = encode_segment(topic_b64); + let resp = self + .http() + .put(self.url(&format!("/messages/topics/{encoded}/forward"))) + .json(&ForwardBody { + socket_path: socket_path.to_string(), + }) + .send() + .await?; + Self::check_status(resp).await + } + + pub async fn topic_forward_remove(&self, topic_b64: &str) -> AppResult<()> { + let encoded = encode_segment(topic_b64); + let resp = self + .http() + .delete(self.url(&format!("/messages/topics/{encoded}/forward"))) + .send() + .await?; + Self::check_status(resp).await + } +} + +/// Percent-encodes path segments. Topics are base64 (which contains `/` and +/// `+` and may end with `=`); subnets carry `/` and `:`. We encode anything +/// outside the unreserved set. +fn encode_segment(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for b in s.bytes() { + match b { + b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { + out.push(b as char) + } + _ => out.push_str(&format!("%{:02X}", b)), + } + } + out +} diff --git a/src-tauri/src/commands.rs b/src-tauri/src/commands.rs index 6f9273a..ec82733 100644 --- a/src-tauri/src/commands.rs +++ b/src-tauri/src/commands.rs @@ -1,6 +1,11 @@ use crate::api::admin::NodeInfo; +use crate::api::messages::{ + IncomingMessage, MessageDestination, MessageStatus, PushMessageBody, PushMessageReceipt, +}; use crate::api::peers::{AggregatedStats, PeerInfo}; +use crate::api::pubkey::PubkeyLookup; use crate::api::routes::RoutesSnapshot; +use crate::api::topics::DefaultAction; use crate::api::MyceliumClient; use crate::error::{AppError, AppResult}; use crate::sidecar::SidecarConfig; @@ -99,3 +104,146 @@ pub async fn peers_stats(state: State<'_, AppState>) -> AppResult) -> AppResult { require_client(&state)?.routes_snapshot().await } + +// ─── Messages ──────────────────────────────────────────────────────────────── + +#[tauri::command] +pub async fn send_message( + state: State<'_, AppState>, + destination: MessageDestination, + topic_b64: String, + payload_b64: String, +) -> AppResult { + let body = PushMessageBody { + dst: destination, + topic: topic_b64, + payload: payload_b64, + }; + require_client(&state)?.send_message(&body).await +} + +#[tauri::command] +pub async fn reply_message( + state: State<'_, AppState>, + id: String, + destination: MessageDestination, + topic_b64: String, + payload_b64: String, +) -> AppResult { + let body = PushMessageBody { + dst: destination, + topic: topic_b64, + payload: payload_b64, + }; + require_client(&state)?.reply_message(&id, &body).await +} + +#[tauri::command] +pub async fn message_status(state: State<'_, AppState>, id: String) -> AppResult { + require_client(&state)?.message_status(&id).await +} + +#[tauri::command] +pub fn inbox_messages(state: State<'_, AppState>) -> Vec { + state.poller.inbox_snapshot() +} + +#[tauri::command] +pub fn inbox_clear(state: State<'_, AppState>) { + state.poller.clear_inbox(); +} + +// ─── Topics ────────────────────────────────────────────────────────────────── + +#[tauri::command] +pub async fn topics_default_get(state: State<'_, AppState>) -> AppResult { + require_client(&state)?.topics_default_get().await +} + +#[tauri::command] +pub async fn topics_default_set(state: State<'_, AppState>, accept: bool) -> AppResult<()> { + require_client(&state)? + .topics_default_set(&DefaultAction { accept }) + .await +} + +#[tauri::command] +pub async fn topics_list(state: State<'_, AppState>) -> AppResult> { + require_client(&state)?.topics_list().await +} + +#[tauri::command] +pub async fn topic_add(state: State<'_, AppState>, topic_b64: String) -> AppResult<()> { + if topic_b64.trim().is_empty() { + return Err(AppError::BadInput("topic must not be empty".into())); + } + require_client(&state)?.topic_add(topic_b64.trim()).await +} + +#[tauri::command] +pub async fn topic_remove(state: State<'_, AppState>, topic_b64: String) -> AppResult<()> { + require_client(&state)?.topic_remove(&topic_b64).await +} + +#[tauri::command] +pub async fn topic_sources_list( + state: State<'_, AppState>, + topic_b64: String, +) -> AppResult> { + require_client(&state)?.topic_sources_list(&topic_b64).await +} + +#[tauri::command] +pub async fn topic_source_add( + state: State<'_, AppState>, + topic_b64: String, + subnet: String, +) -> AppResult<()> { + require_client(&state)? + .topic_source_add(&topic_b64, &subnet) + .await +} + +#[tauri::command] +pub async fn topic_source_remove( + state: State<'_, AppState>, + topic_b64: String, + subnet: String, +) -> AppResult<()> { + require_client(&state)? + .topic_source_remove(&topic_b64, &subnet) + .await +} + +#[tauri::command] +pub async fn topic_forward_get( + state: State<'_, AppState>, + topic_b64: String, +) -> AppResult> { + require_client(&state)?.topic_forward_get(&topic_b64).await +} + +#[tauri::command] +pub async fn topic_forward_set( + state: State<'_, AppState>, + topic_b64: String, + socket_path: String, +) -> AppResult<()> { + require_client(&state)? + .topic_forward_set(&topic_b64, &socket_path) + .await +} + +#[tauri::command] +pub async fn topic_forward_remove(state: State<'_, AppState>, topic_b64: String) -> AppResult<()> { + require_client(&state)? + .topic_forward_remove(&topic_b64) + .await +} + +// ─── Pubkey ────────────────────────────────────────────────────────────────── + +#[tauri::command] +pub async fn lookup_pubkey(state: State<'_, AppState>, ip: String) -> AppResult { + require_client(&state)?.lookup_pubkey(&ip).await +} diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 2bd696b..7ecdfab 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -40,6 +40,23 @@ pub fn run() { commands::peer_remove, commands::peers_stats, commands::routes_snapshot, + commands::send_message, + commands::reply_message, + commands::message_status, + commands::inbox_messages, + commands::inbox_clear, + commands::topics_default_get, + commands::topics_default_set, + commands::topics_list, + commands::topic_add, + commands::topic_remove, + commands::topic_sources_list, + commands::topic_source_add, + commands::topic_source_remove, + commands::topic_forward_get, + commands::topic_forward_set, + commands::topic_forward_remove, + commands::lookup_pubkey, ]) .run(tauri::generate_context!()) .expect("error while running tauri application"); diff --git a/src-tauri/src/poller.rs b/src-tauri/src/poller.rs index ad53830..ed0908a 100644 --- a/src-tauri/src/poller.rs +++ b/src-tauri/src/poller.rs @@ -1,6 +1,8 @@ +use crate::api::messages::IncomingMessage; use crate::api::peers; use crate::sidecar::SidecarHandle; use parking_lot::Mutex; +use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; use tauri::{AppHandle, Emitter}; @@ -9,10 +11,15 @@ use tracing::warn; const PEERS_INTERVAL: Duration = Duration::from_secs(3); const ROUTES_INTERVAL: Duration = Duration::from_secs(5); +const INBOX_LONG_POLL_SECS: u64 = 30; +const INBOX_RETRY_BACKOFF: Duration = Duration::from_secs(2); +const INBOX_CAPACITY: usize = 200; pub struct Poller { peers_handle: Mutex>>, routes_handle: Mutex>>, + inbox_handle: Mutex>>, + inbox: Mutex>, } impl Poller { @@ -20,30 +27,49 @@ impl Poller { Arc::new(Self { peers_handle: Mutex::new(None), routes_handle: Mutex::new(None), + inbox_handle: Mutex::new(None), + inbox: Mutex::new(VecDeque::with_capacity(INBOX_CAPACITY)), }) } - /// Spawn the two background loops. Cancels any previously-running tasks - /// so consecutive `start_daemon` calls don't leak handles. pub fn start(self: &Arc, app: AppHandle, sidecar: Arc) { self.stop(); *self.peers_handle.lock() = Some(spawn_peers_loop(app.clone(), Arc::clone(&sidecar))); - *self.routes_handle.lock() = Some(spawn_routes_loop(app, sidecar)); + *self.routes_handle.lock() = Some(spawn_routes_loop(app.clone(), Arc::clone(&sidecar))); + *self.inbox_handle.lock() = Some(spawn_inbox_loop( + app, + Arc::clone(&sidecar), + Arc::clone(self), + )); } pub fn stop(&self) { - if let Some(h) = self.peers_handle.lock().take() { - h.abort(); + for slot in [&self.peers_handle, &self.routes_handle, &self.inbox_handle] { + if let Some(h) = slot.lock().take() { + h.abort(); + } } - if let Some(h) = self.routes_handle.lock().take() { - h.abort(); + } + + pub fn inbox_snapshot(&self) -> Vec { + self.inbox.lock().iter().cloned().collect() + } + + pub fn clear_inbox(&self) { + self.inbox.lock().clear(); + } + + fn push_inbox(&self, msg: IncomingMessage) { + let mut buf = self.inbox.lock(); + if buf.len() >= INBOX_CAPACITY { + buf.pop_front(); } + buf.push_back(msg); } } fn spawn_peers_loop(app: AppHandle, sidecar: Arc) -> JoinHandle<()> { tokio::spawn(async move { - // Tick once immediately so the UI doesn't wait the full interval. let mut first = true; loop { if !first { @@ -87,3 +113,31 @@ fn spawn_routes_loop(app: AppHandle, sidecar: Arc) -> JoinHandle< } }) } + +fn spawn_inbox_loop( + app: AppHandle, + sidecar: Arc, + me: Arc, +) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + let Some(client) = sidecar.client() else { + break; + }; + // Each iteration is a fresh long-poll. The daemon answers as + // soon as a message arrives, or returns an empty body / 204 + // when the timeout window elapses. + match client.pop_message(false, INBOX_LONG_POLL_SECS, None).await { + Ok(Some(msg)) => { + me.push_inbox(msg.clone()); + let _ = app.emit("messages://incoming", &msg); + } + Ok(None) => {} // window expired, loop + Err(e) => { + warn!(error = %e, "inbox: pop_message failed"); + tokio::time::sleep(INBOX_RETRY_BACKOFF).await; + } + } + } + }) +} diff --git a/src/components/ComposeMessage.vue b/src/components/ComposeMessage.vue new file mode 100644 index 0000000..6449122 --- /dev/null +++ b/src/components/ComposeMessage.vue @@ -0,0 +1,115 @@ + + +