diff --git a/Cargo.lock b/Cargo.lock index 5758560..1cef079 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,12 +340,22 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mqtt-client" +version = "2.0.0" +source = "git+https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git?tag=v2.0.0#c924081133beb158fa71edc34d44ab254641293a" +dependencies = [ + "crossbeam", + "rumqttc", +] + [[package]] name = "mqttClock" version = "1.1.0" dependencies = [ "chrono", "crossbeam", + "mqtt-client", "rumqttc", "serde", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index f761fa1..c92ac6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,4 @@ crossbeam = "0.8.4" rumqttc = "0.24.0" serde = { version = "1.0.217", features = ["derive"] } serde_yaml = "0.9.34" +mqtt-client = { tag = "v2.0.0", git = "https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git" } diff --git a/src/main.rs b/src/main.rs index 2540723..e8fb022 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,101 +1,120 @@ -use std::time::Duration; -use std::thread; -use std::fs; +use std::{fs, thread, time::Duration}; -use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS}; -use chrono::{Local, Timelike, Datelike}; -use crossbeam::channel::{unbounded, Sender}; use serde::Deserialize; +use chrono::{Local, Timelike, Datelike}; -struct MqttMessage { - topic: String, - payload: String -} +use mqtt_client::mqtt_client::{QoS, Sender, Receiver, MqttMessage, MqttTool}; -fn send_publish(publish: &Sender, message: MqttMessage) { - match publish.send(message) { - Err(n) => println!("ERROR: faild to send publish ({:?})", n), - Ok(_n) => {} +fn get_u16_from_i32(value: i32) -> Option { + match u16::try_from(value) { + Ok(n) => Some(n), + Err(_) => None } } -fn mqtt_clock(publish: Sender) { - // init last values with invalid values - let mut last_year: i32 = 65535; - let mut last_month: u32 = 65535; - let mut last_dom: u32 = 65535; - let mut last_dow: u32 = 65535; - let mut last_iso_week: u32 = 65535; - let mut last_iso_year: i32 = 65535; - let mut last_hour: u32 = 65535; - let mut last_minute: u32 = 65535; - let mut last_second: u32 = 65535; +fn get_u16_from_u32(value: u32) -> Option { + match u16::try_from(value) { + Ok(n) => Some(n), + Err(_) => None + } +} - loop { - let datetime = Local::now(); - if last_second != datetime.second() { - last_second = datetime.second(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/time/second"), - payload: last_second.to_string() - }}); - if last_minute != datetime.minute() { - last_minute = datetime.minute(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/time/minute"), - payload: last_minute.to_string() - }}); - if last_hour != datetime.hour() { - last_hour = datetime.hour(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/time/hour"), - payload: last_hour.to_string() - }}); - if last_dom != datetime.day() { - last_dom = datetime.day(); - send_publish(&publish, { MqttMessage {topic: String::from("clock/date/dom"), - payload: last_dom.to_string() - }}); - if last_iso_week != datetime.iso_week().week() { - last_iso_week = datetime.iso_week().week(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/isoWeek"), - payload: last_iso_week.to_string() - }}); - if last_iso_year != datetime.iso_week().year() { - last_iso_year = datetime.iso_week().year(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/isoYear"), - payload: last_iso_year.to_string() - }}); - } - } - if last_dow != datetime.weekday() as u32 { - last_dow = datetime.weekday() as u32; - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/dow"), - payload: last_dow.to_string() - }}); - } - if last_month != datetime.month() { - last_month = datetime.month(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/month"), - payload: last_month.to_string() - }}); - if last_year != datetime.year() { - last_year = datetime.year(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/year"), - payload: last_year.to_string() - }}); - } - } - } +struct Clock { + tx: Sender, + last_year: Option, + last_month: Option, + last_dom: Option, + last_dow: Option, + last_iso_week: Option, + last_iso_year: Option, + last_hour: Option, + last_minute: Option, + last_second: Option +} +impl Clock { + fn tx(&self, topic: String, value: Option, retain: bool) { + match value { + None => {}, + Some(v) => { + let message = MqttMessage { + topic, + payload: v.to_string(), + retain: retain, + qos: QoS::AtMostOnce, + }; + match self.tx.send(message) { + Err(n) => println!("ERROR: faild to send publish ({:?})", n), + Ok(_) => {} } } } - thread::sleep(Duration::from_millis(500)); + } + + fn time(&mut self) { + let datetime = Local::now(); + if self.last_second != get_u16_from_u32(datetime.second()) { + self.last_second = get_u16_from_u32(datetime.second()); + self.tx(String::from("clock/time/second"), self.last_second, false); + if self.last_minute != get_u16_from_u32(datetime.minute()) { + self.last_minute = get_u16_from_u32(datetime.minute()); + self.tx(String::from("clock/time/minute"), self.last_minute, false); + if self.last_hour != get_u16_from_u32(datetime.hour()) { + self.last_hour = get_u16_from_u32(datetime.hour()); + self.tx(String::from("clock/time/hour"), self.last_hour, false); + self.date(datetime); + } + } + } + } + + fn date(&mut self, datetime: chrono::DateTime) { + if self.last_dom != get_u16_from_u32(datetime.day()) { + self.last_dom = get_u16_from_u32(datetime.day()); + self.tx(String::from("clock/date/dom"), self.last_dom, true); + if self.last_iso_week != get_u16_from_u32(datetime.iso_week().week()) { + self.last_iso_week = get_u16_from_u32(datetime.iso_week().week()); + self.tx(String::from("clock/date/isoWeek"), self.last_iso_week, true); + if self.last_iso_year != get_u16_from_i32(datetime.iso_week().year()) { + self.last_iso_year = get_u16_from_i32(datetime.iso_week().year()); + self.tx(String::from("clock/date/isoYear"), self.last_iso_year, true); + } + } + if self.last_dow != Some(datetime.weekday() as u16) { + self.last_dow = Some(datetime.weekday() as u16); + self.tx(String::from("clock/date/dow"), self.last_dow, true); + } + if self.last_month != get_u16_from_u32(datetime.month()) { + self.last_month = get_u16_from_u32(datetime.month()); + self.tx(String::from("clock/date/month"), self.last_month, true); + if self.last_year != get_u16_from_i32(datetime.year()) { + self.last_year = get_u16_from_i32(datetime.year()); + self.tx(String::from("clock/date/year"), self.last_year, true); + } + } + } + } +} +impl MqttTool for Clock { + fn new(_client: rumqttc::Client, tx: Sender) -> Clock { + Clock { + tx, + last_year: None, + last_month: None, + last_dom: None, + last_dow: None, + last_iso_week: None, + last_iso_year: None, + last_hour: None, + last_minute: None, + last_second: None + } + } + + fn run(&mut self, _rx: Receiver) { + loop { + self.time(); + thread::sleep(Duration::from_millis(500)); + } } } @@ -152,133 +171,13 @@ fn read_config() -> Result { } fn main() { - let (mqtt_publish, mqtt_publish_rx) = unbounded::(); - // get setting - let conf_ok: bool; - let conf: Settings; match read_config() { - Ok(n) => { - conf = n; - conf_ok = true; - }, - Err(_) => { - conf = Settings { - mqtt: SettingsMQTT {host:String::new(),port:0,client:String::new(),user:String::new(),pass:String::new()} - }; - conf_ok = false; - } + Ok(conf) => + mqtt_client::mqtt_client::run::(conf.mqtt.host, conf.mqtt.port, conf.mqtt.client, conf.mqtt.user, conf.mqtt.pass), + Err(_) => {} } - if conf_ok { - let mut mqttoptions = MqttOptions::new(conf.mqtt.client, conf.mqtt.host, conf.mqtt.port); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass); - let (client, mut connection) = Client::new(mqttoptions, 10); - - // thread publisher - let publisher = thread::Builder::new() - .name("publisher".to_string()) - .spawn(move || { - loop { - let message = mqtt_publish_rx.recv(); - match message { - Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e), - Ok(msg) => { - // println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload); - match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { - Err(_n) => println!("ERROR: publisher: faild to publish"), - Ok(_n) => {} - } - } - } - } - }); - match publisher { - Err(_n) => println!("ERROR: main: fait to create publisher thread"), - Ok(_n) => {} - } - - // treath mqtt clock - let mqtt_clock_stat = thread::Builder::new() - .name("mqtt_clock".to_string()) - .spawn(move || { - mqtt_clock(mqtt_publish); - }); - match mqtt_clock_stat { - Err(_n) => println!("ERROR: main: faild to start mqtt clock thread"), - Ok(_n) => {} - } - - for (_i, notification) in connection.iter().enumerate() { - let mut delay: bool = false; - match notification { - Err(e) => match e { - rumqttc::ConnectionError::MqttState(state) => match state { - rumqttc::StateError::Io(e) => { - println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); - delay = true; - }, - rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), - rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e), - rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"), - rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"), - rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"), - rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"), - rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e), - rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max } - => println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max), - }, - rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), - rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), - rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), - rumqttc::ConnectionError::ConnectionRefused(code) => { - println!("ERROR: mqtt: ConnectionRefused {:?}", code); - delay = true; - }, - rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet), - rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"), - rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error), - }, - Ok(event) => { - match event { - Event::Outgoing(n) => match n { - - Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), - Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), - Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), - Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), - Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), - Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), - Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), - Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), - Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), - Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), - Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") - }, - Event::Incoming(n) => match n { - Packet::Connect(_) => println!("INFO : mqtt: connected"), - Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), - Packet::Publish(_) => {}, - Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), - Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), - Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"), - Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"), - Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"), - Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"), - Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), - Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), - Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), - Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), - Packet::Disconnect => println!("INFO : mqtt: disconected"), - } - } - } - } - if delay { - thread::sleep(Duration::from_millis(500)); - } - } - println!("INFO : main: exit"); - } + println!("INFO : main: exit"); } +