From 0fe76885ca98722e84f1e123d4de7ffd6e059e49 Mon Sep 17 00:00:00 2001 From: Tanner Sommers Date: Wed, 6 Mar 2024 14:02:55 -0500 Subject: [PATCH] MQTT Listner --- src-tauri/Cargo.lock | 1 + src-tauri/Cargo.toml | 1 + src-tauri/src/commands/bambu/mod.rs | 89 ++++++- src-tauri/src/handlers/bambu/mod.rs | 385 +++++++++++++++++++++++++++- src-tauri/src/main.rs | 11 +- 5 files changed, 473 insertions(+), 14 deletions(-) diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 2dc777b..1207fdf 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -67,6 +67,7 @@ name = "app" version = "0.1.0" dependencies = [ "dirs", + "futures", "jsonwebtoken", "lazy_static", "paho-mqtt", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 63ff15a..5081216 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -24,6 +24,7 @@ lazy_static = "1.4.0" tokio = { version = "1.36.0", features = ["full"] } jsonwebtoken = "9.2.0" paho-mqtt = "0.12.3" +futures = "0.3.30" [features] # this feature is used for production builds or when `devPath` points to the filesystem and the built-in dev server is disabled. diff --git a/src-tauri/src/commands/bambu/mod.rs b/src-tauri/src/commands/bambu/mod.rs index 9ef536c..f7a8a42 100644 --- a/src-tauri/src/commands/bambu/mod.rs +++ b/src-tauri/src/commands/bambu/mod.rs @@ -1,11 +1,12 @@ -use std::borrow::Borrow; - -use crate::handlers::bambu::{BambuClient, BambuDevice}; +use crate::handlers::bambu::{BambuClient, BambuDevice, BambuMQTTClient}; use lazy_static::lazy_static; use serde_json::json; +use std::borrow::Borrow; +use tokio::sync::Mutex; lazy_static! { static ref BAMBU_CLIENT: BambuClient = BambuClient::new(); + static ref BAMBU_MQTT_CLIENT: Mutex = Mutex::new(BambuMQTTClient::new()); } #[tauri::command] @@ -89,3 +90,85 @@ pub async fn discover_devices(devices: Vec) -> Result Err(e.to_string()), } } + +#[tauri::command] +pub async fn init_mqtt_worker() -> Result { + println!("[commands::bambu::init_mqtt_worker] initializing mqtt worker"); + + let result: Result<(), ()> = async { + let mut client = BAMBU_MQTT_CLIENT.lock().await; + client.initialize().await; + + Ok(()) + } + .await; + + match result { + Ok(_) => Ok("".to_string()), + Err(_) => Err("Failed to initialize mqtt worker".to_string()), + } +} + +#[tauri::command] +pub async fn watch_device(device: BambuDevice) -> Result { + println!( + "[commands::bambu::watch_device] watching device: {:?}", + device.name + ); + + let result: Result<(), std::io::Error> = async { + let mut client = BAMBU_MQTT_CLIENT.lock().await; + client.watch_device(device).await + } + .await; + + match result { + Ok(_) => Ok("".to_string()), // Return an empty string on success + Err(e) => { + println!("[commands::bambu::watch_device] error watching: {:?}", e); + Err(e.to_string()) // Return the error as is + } + } +} + +#[tauri::command] +pub async fn unwatch_device(device: BambuDevice) -> Result { + println!( + "[commands::bambu::unwatch_device] unwatching device: {:?}", + device.name + ); + + let result: Result<(), std::io::Error> = async { + let mut client = BAMBU_MQTT_CLIENT.lock().await; + client.unwatch_device(device).await + } + .await; + + match result { + Ok(_) => Ok("".to_string()), // Return an empty string on success + Err(e) => { + println!( + "[commands::bambu::unwatch_device] error unwatching: {:?}", + e + ); + Err(e.to_string()) // Return the error as is + } + } +} + +#[tauri::command] +pub async fn deinit_mqtt_worker() -> Result { + println!("[commands::bambu::deinit_mqtt_worker] deinitializing mqtt worker"); + + let result: Result<(), ()> = async { + let mut client = BAMBU_MQTT_CLIENT.lock().await; + client.deinitialize().await; + Ok(()) + } + .await; + + match result { + Ok(_) => Ok("".to_string()), + Err(_) => Err("Failed to deinitialize mqtt worker".to_string()), + } +} diff --git a/src-tauri/src/handlers/bambu/mod.rs b/src-tauri/src/handlers/bambu/mod.rs index 3678759..080d757 100644 --- a/src-tauri/src/handlers/bambu/mod.rs +++ b/src-tauri/src/handlers/bambu/mod.rs @@ -2,6 +2,7 @@ use super::ssdp::SsdpMessage; use crate::constants; use crate::handlers::ssdp::SsdpListener; +use futures::{StreamExt, TryFutureExt}; use serde_json::{json, Number}; use std::time::Duration; use tokio::sync::Mutex; @@ -11,6 +12,13 @@ pub struct BambuClient { jwt: Mutex>, } +pub struct BambuMQTTClient { + watched_devices: Vec<(BambuDevice, paho_mqtt::AsyncClient)>, + device_watch_threads: Vec<(BambuDevice, tokio::task::JoinHandle<()>)>, + device_updater_thread: Option>, + is_initialized: bool, +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct BambuUserResponse { token: String, @@ -61,15 +69,15 @@ impl std::fmt::Display for BambuDeviceResponse { #[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] pub struct BambuDevice { - dev_id: String, - name: String, - online: bool, - ip: Option, - print_status: String, - dev_model_name: String, - dev_product_name: String, - dev_access_code: String, - nozzle_diameter: Number, + pub dev_id: String, + pub name: String, + pub online: bool, + pub ip: Option, + pub print_status: String, + pub dev_model_name: String, + pub dev_product_name: String, + pub dev_access_code: String, + pub nozzle_diameter: Number, } #[derive(Debug)] @@ -87,6 +95,365 @@ impl std::fmt::Display for BambuLoginError { } } +impl BambuMQTTClient { + pub fn new() -> BambuMQTTClient { + BambuMQTTClient { + watched_devices: vec![], + device_watch_threads: vec![], + device_updater_thread: None, + is_initialized: false, + } + } + + pub async fn initialize(&mut self) { + if self.is_initialized { + return; + } + + let watched_devices = self.watched_devices.clone(); + + // Create a thread to update the device statuses + let device_upd_thread = tokio::spawn(async move { + let mut seq_id = 1; + + loop { + // Update the device statuses + println!("[BambuMQTTClient::task::device_updater] Updating device statuses ..."); + + // For each watched device, resend the status request. Assume we are at seq id 1, and increment by 1 each time + for (device, client) in watched_devices.iter() { + println!( + "[BambuMQTTClient::task::device_updater] Sending status request for device: {}", + device.name, + ); + + let request_topic = format!("device/{}/request", device.dev_id); + let status_topic_payload = json!({ + "pushing": { + "sequence_id": seq_id, + "command": "pushall", + "version": 1, + "push_target": 1 + } + }); + + let request_msg: Result = async { + let msg = serde_json::to_string(&status_topic_payload).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Failed to serialize status topic payload for device: {}: {}", + device.name, e + ), + ) + })?; + + Ok(paho_mqtt::Message::new(request_topic, msg.as_bytes(), 1)) + } + .await; + + if request_msg.is_err() { + println!( + "[BambuMQTTClient::task::device_updater] Failed to create request message for device: {}: {}", + device.name, + request_msg.unwrap_err() + ); + + continue; + } + + let _ = client + .publish(request_msg.unwrap()) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "[BambuMQTTClient::task::device_updater] Failed to publish to status topic for device: {}: {}", + device.name, e + ), + ) + }) + .await; + } + + // Sleep for 5 minutes before updating again + seq_id += 1; + tokio::time::sleep(Duration::from_secs(300)).await; + } + }); + + self.device_updater_thread = Some(device_upd_thread); + self.is_initialized = true; + + println!("[BambuMQTTClient::initialize] Successfully initialized BambuMQTTClient"); + } + + pub async fn deinitialize(&mut self) { + if !self.is_initialized { + return; + } + + // Kill the device updater thread + if let Some(handle) = self.device_updater_thread.take() { + handle.abort(); + } + + // Unwatch all devices + let unwatchResult = self + .unwatch_all_devices() + .map_err(|e| { + println!( + "[BambuMQTTClient::deinitialize] Failed to unwatch all devices: {}", + e + ) + }) + .await; + + if unwatchResult.is_ok() { + println!("[BambuMQTTClient::deinitialize] Successfully deinitialized BambuMQTTClient"); + } else { + println!( + "[BambuMQTTClient::deinitialize] Failed to deinitialize BambuMQTTClient cleanly, error: {:?}", + unwatchResult.unwrap_err() + ); + } + + self.is_initialized = false; + } + + pub async fn watch_device(&mut self, device: BambuDevice) -> Result<(), std::io::Error> { + let device_ip = match &device.ip { + Some(ip) => ip, + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Expected device: {} to have an IP address, but none was found.", + device.name + ), + )); + } + }; + + let ssl_options = paho_mqtt::SslOptions::new(); + + let connection_opts = paho_mqtt::ConnectOptionsBuilder::new() + .user_name("bblp") + .password(device.dev_access_code.clone()) + .ssl_options(ssl_options) + .keep_alive_interval(std::time::Duration::from_secs(30)) + .finalize(); + + let client = + paho_mqtt::AsyncClient::new(format!("mqtts://{}:8883", device_ip)).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to create MQTT client: {}", e), + ) + })?; + + let mut connected = false; + + // Attempt to connect to the MQTT broker 3 times + for i in 0..3 { + match client.connect(connection_opts.clone()).await { + Ok(_) => { + println!( + "[BambuMQTTClient::watch_device] Successfully connected to MQTT broker at {} for device: {}", + device_ip, device.name + ); + + connected = true; + break; + } + Err(e) => { + println!( + "[BambuMQTTClient::watch_device] Failed to connect to MQTT broker at {}: {} for device: {}. (Attempt {} of 3) Retrying in 5 seconds ...", + device_ip, e, device.name, i + 1 + ); + } + } + + // Sleep for 5 seconds before retrying + tokio::time::sleep(Duration::from_secs(5)).await; + } + + if !connected { + return Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!( + "Failed to connect to MQTT broker at {} for device: {} after 3 attempts", + device_ip, device.name + ), + )); + } + + // Clone the client for use in the closure + let mut client_clone = client.clone(); + let device_clone = device.clone(); + + // Create yet another clone of the device to pass into vec + // This is utterly ridiculous, but it's the only way to get the device into the vec without rust bitching + let device_vec_clone = device.clone(); + + // Subscribe to the device's status topic + let status_topic = format!("device/{}/report", device.dev_id); + let request_topic = format!("device/{}/request", device.dev_id); + let status_topic_payload = json!({ + "pushing": { + "sequence_id": "0", + "command": "pushall", + "version": 1, + "push_target": 1 + } + }); + + let request_msg = paho_mqtt::Message::new( + request_topic, + serde_json::to_string(&status_topic_payload) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Failed to serialize status topic payload for device: {}: {}", + device.name, e + ), + ) + })? + .as_bytes(), + 1, + ); + + client + .subscribe(status_topic, 1) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Failed to subscribe to status topic for device: {}: {}", + device.name, e + ), + ) + }) + .await?; + + client + .publish(request_msg) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Failed to publish to status topic for device: {}: {}", + device.name, e + ), + ) + }) + .await?; + + // Create a thread to watch the devices messages + let device_watch_thread = tokio::spawn(async move { + let mut stream = client_clone.get_stream(100); + + while let Some(msg) = stream.next().await { + // Ensure we have a message + let msg = msg.ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "[BambuMQTTClient::task::device_watch] Expected a message from device: {}, but none was found.", + device_clone.name, + ), + ) + }); + + match msg { + Ok(msg) => { + println!( + "[BambuMQTTClient::task::device_watch] Received message from device: {}: {}", + device_clone.name, msg.payload_str() + ); + } + Err(e) => { + println!( + "[BambuMQTTClient::task::device_watch] Failed to receive message from device: {}: {}", + device_clone.name, e + ); + } + } + } + }); + + // Save our client and threads + self.watched_devices.push((device, client)); + self.device_watch_threads + .push((device_vec_clone, device_watch_thread)); + + Ok(()) + } + + pub async fn unwatch_device(&mut self, device: BambuDevice) -> Result<(), std::io::Error> { + let device_ip = match &device.ip { + Some(ip) => ip, + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Expected device: {} to have an IP address, but none was found.", + device.name + ), + )); + } + }; + + // Find the device in the watched devices + let device_index = self + .watched_devices + .iter() + .position(|(d, _)| d.dev_id == device.dev_id); + + if let Some(index) = device_index { + let (device, client) = self.watched_devices.remove(index); + + // Disconnect the client + client.disconnect(None).await.map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Failed to disconnect from MQTT broker at {} for device: {}: {}", + device_ip, device.name, e + ), + ) + })?; + + // Kill the thread and remove it from the list + let (_, handle) = self.device_watch_threads.remove(index); + handle.abort(); + } else { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Expected to find device: {} in the watched devices, but none was found.", + device.name + ), + )); + } + + println!( + "[BambuMQTTClient::unwatch_device] Successfully unwatched device: {}", + device.name + ); + Ok(()) + } + + pub async fn unwatch_all_devices(&mut self) -> Result<(), std::io::Error> { + for (device, _) in self.watched_devices.clone() { + self.unwatch_device(device).await?; + } + + Ok(()) + } +} + impl BambuClient { pub fn new() -> BambuClient { BambuClient { diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 47f3d7a..f33d609 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -4,7 +4,10 @@ mod commands; mod constants; mod handlers; -use commands::bambu::{discover_devices, fetch_devices, get_jwt, login_to_bambu, set_jwt}; +use commands::bambu::{ + deinit_mqtt_worker, discover_devices, fetch_devices, get_jwt, init_mqtt_worker, login_to_bambu, + set_jwt, unwatch_device, watch_device, +}; use commands::config::{get_config, init_config, save_config}; use commands::util::quit; @@ -20,7 +23,11 @@ async fn main() { set_jwt, get_jwt, fetch_devices, - discover_devices + discover_devices, + init_mqtt_worker, + deinit_mqtt_worker, + watch_device, + unwatch_device ]) .run(tauri::generate_context!()) .expect("error while running tauri application");