diff --git a/Cargo.lock b/Cargo.lock index bfc0eef..d632eff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -278,12 +278,22 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mqtt-client" +version = "1.0.0" +source = "git+https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git?tag=v1.0.0#f0b34311457b26a0a3b3a7027845b407a3ca47ee" +dependencies = [ + "crossbeam", + "rumqttc", +] + [[package]] name = "mqttAutomation" version = "1.1.0" dependencies = [ "crossbeam", "json", + "mqtt-client", "rumqttc", "serde", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index 013043d..2b3efab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,4 @@ json = "0.12.4" rumqttc = "0.24.0" serde = { version = "1.0.217", features = ["derive"] } serde_yaml = "0.9.34" +mqtt-client = { tag = "v1.0.0", git = "https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git" } diff --git a/src/main.rs b/src/main.rs index b3519a5..1fd42ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,9 @@ -use std::time::Duration; -use std::thread; use std::fs; -use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet, QoS}; -use crossbeam::channel::{unbounded, Sender}; +use crossbeam::channel::Sender; use serde::Deserialize; -struct MqttMessage { - topic: String, - payload: String -} +use mqtt_client::mqtt_client; mod json_parser { pub enum Error { @@ -93,38 +87,49 @@ mod json_parser { } struct Automation { - publish: Sender, + tx: Sender, clock_dow: u8 } impl Automation { - fn new(publish: Sender) -> Automation { - Automation { - publish, - clock_dow: u8::MAX - } - } - - pub(self) fn tx(&self, message: MqttMessage) { - match self.publish.send(message) { + fn tx(&self, message: mqtt_client::MqttMessage) { + match self.tx.send(message) { Err(n) => println!("ERROR: faild to send publish ({:?})", n), Ok(_n) => {} } } - pub(self) fn lamp01_set(&self, state: bool) { + fn lamp01_set(&self, state: bool) { let payload: String; if state { payload = String::from("ON"); } else { payload = String::from("OFF"); } - self.tx({ MqttMessage { + self.tx({ mqtt_client::MqttMessage { topic: String::from("/cool/devices/lamp-01/set"), payload: payload }}); } +} +impl mqtt_client::MqttTool for Automation { + fn new(client: rumqttc::Client, tx: Sender) -> Automation { - fn rx(&mut self, message: MqttMessage) { + match client.subscribe("clock/hour", rumqttc::QoS::AtMostOnce) { + Err(e) => println!("ERROR: main: faild to subscribe to clock/hour ({})", e), + Ok(_) => {} + } + match client.subscribe("/cool/devices/KNMITemp/values", rumqttc::QoS::AtMostOnce) { + Err(e) => println!("ERROR: main: faild to subscribe to KNMITemp/values ({})", e), + Ok(_) => {} + } + + Automation { + tx, + clock_dow: u8::MAX + } + } + + fn rx(&mut self, message: mqtt_client::MqttMessage) { println!("INFO : mqtt_automation: {}: {}", message.topic, message.payload); if message.topic.eq("clock/hour") { @@ -207,163 +212,14 @@ fn read_config() -> Result { } } -fn mqtt_hadeler(mut connection: Connection, mqtt_publish: Sender) { - let mut automation: Automation = Automation::new(mqtt_publish); - for (_i, notification) in connection.iter().enumerate() { - let mut delay: bool = false; - match notification { - Err(e) => match e { - rumqttc::ConnectionError::MqttState(state) => match state { - rumqttc::StateError::Io(e) => { - println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); - delay = true; - }, - rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), - rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e), - rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"), - rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"), - rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"), - rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"), - rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e), - rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max } - => println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max), - }, - rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), - rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), - rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), - rumqttc::ConnectionError::ConnectionRefused(code) => { - println!("ERROR: mqtt: ConnectionRefused {:?}", code); - delay = true; - }, - rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet), - rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"), - rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error), - }, - Ok(event) => { - match event { - Event::Outgoing(n) => match n { - - Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), - Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), - Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), - Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), - Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), - Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), - Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), - Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), - Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), - Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), - Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") - }, - Event::Incoming(n) => match n { - Packet::Connect(_) => println!("INFO : mqtt: connected"), - Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), - Packet::Publish(msg) => { - let mut payload: String = String::from(""); - match String::from_utf8(msg.payload.to_vec()) { - Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e), - Ok(v) => payload = v - }; - if payload.len() > 0 { - let message: MqttMessage = { MqttMessage { - topic: msg.topic, - payload: payload - }}; - - automation.rx(message); - } - }, - Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), - Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), - Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"), - Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"), - Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"), - Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"), - Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), - Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), - Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), - Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), - Packet::Disconnect => println!("INFO : mqtt: disconected"), - } - } - } - } - if delay { - thread::sleep(Duration::from_millis(500)); - } - } -} fn main() { - let (mqtt_publish, mqtt_publish_rx) = unbounded::(); - // get setting - let conf_ok: bool; - let conf: Settings; match read_config() { - Ok(n) => { - conf = n; - conf_ok = true; - }, - Err(_) => { - conf = Settings { - mqtt: SettingsMQTT {host:String::new(),port:0,client:String::new(),user:String::new(),pass:String::new()} - }; - conf_ok = false; - } + Ok(conf) => + mqtt_client::run::(conf.mqtt.host, conf.mqtt.port, conf.mqtt.client, conf.mqtt.user, conf.mqtt.pass), + Err(_) => {} } - if conf_ok { - let mut mqttoptions = MqttOptions::new(conf.mqtt.client, conf.mqtt.host, conf.mqtt.port); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass); - let (client, connection) = Client::new(mqttoptions, 10); - - match client.subscribe("clock/hour", QoS::AtMostOnce) { - Err(e) => println!("ERROR: main: faild to subscribe to clock/hour ({})", e), - Ok(_) => {} - } - match client.subscribe("/cool/devices/KNMITemp/values", QoS::AtMostOnce) { - Err(e) => println!("ERROR: main: faild to subscribe to .../KNMITemp/values ({})", e), - Ok(_) => {} - } - - // thread publisher - let publisher = thread::Builder::new() - .name("publisher".to_string()) - .spawn(move || { - loop { - let message = mqtt_publish_rx.recv(); - match message { - Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e), - Ok(msg) => { - println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload); - match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { - Err(_n) => println!("ERROR: publisher: faild to publish"), - Ok(_n) => {} - } - } - } - } - }); - match publisher { - Err(_n) => println!("ERROR: main: fait to create publisher thread"), - Ok(_n) => {} - } - - // thread mqtt - let mqtt = thread::Builder::new() - .name("mqtt".to_string()) - .spawn(move || { - mqtt_hadeler(connection, mqtt_publish); - }); - match mqtt { - Err(_n) => println!("ERROR: main: fait to create mqtt thread"), - Ok(join) => match join.join() { - Err(_) => println!("ERROR: main: failt to join mqtt thread"), - Ok(_) => {} - } - } - println!("INFO : main: exit"); - } + println!("INFO : main: exit"); }