diff --git a/Cargo.lock b/Cargo.lock index ff9851c..36ef5ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,21 +17,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - -[[package]] -name = "android_system_properties" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" -dependencies = [ - "libc", -] - [[package]] name = "autocfg" version = "1.4.0" @@ -59,12 +44,6 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" -[[package]] -name = "bumpalo" -version = "3.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" - [[package]] name = "bytes" version = "1.9.0" @@ -86,20 +65,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" -dependencies = [ - "android-tzdata", - "iana-time-zone", - "js-sys", - "num-traits", - "wasm-bindgen", - "windows-targets", -] - [[package]] name = "core-foundation" version = "0.9.4" @@ -243,29 +208,6 @@ version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" -[[package]] -name = "iana-time-zone" -version = "0.1.61" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" -dependencies = [ - "android_system_properties", - "core-foundation-sys", - "iana-time-zone-haiku", - "js-sys", - "wasm-bindgen", - "windows-core", -] - -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" -dependencies = [ - "cc", -] - [[package]] name = "indexmap" version = "2.7.1" @@ -283,14 +225,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] -name = "js-sys" -version = "0.3.77" +name = "json" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" -dependencies = [ - "once_cell", - "wasm-bindgen", -] +checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" [[package]] name = "libc" @@ -341,25 +279,16 @@ dependencies = [ ] [[package]] -name = "mqttClock" -version = "1.0.0" +name = "mqttAutomation" +version = "0.1.0" dependencies = [ - "chrono", "crossbeam", + "json", "rumqttc", "serde", "serde_yaml", ] -[[package]] -name = "num-traits" -version = "0.2.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" -dependencies = [ - "autocfg", -] - [[package]] name = "object" version = "0.36.7" @@ -369,12 +298,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "once_cell" -version = "1.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" - [[package]] name = "openssl-probe" version = "0.1.5" @@ -503,12 +426,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "rustversion" -version = "1.0.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" - [[package]] name = "ryu" version = "1.0.18" @@ -719,73 +636,6 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" -[[package]] -name = "wasm-bindgen" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" -dependencies = [ - "cfg-if", - "once_cell", - "rustversion", - "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" -dependencies = [ - "bumpalo", - "log", - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-backend", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "windows-core" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" -dependencies = [ - "windows-targets", -] - [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 6ed7ad3..8427262 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [package] -name = "mqttClock" -version = "1.0.0" +name = "mqttAutomation" +version = "0.1.0" edition = "2021" [dependencies] -chrono = "0.4.39" crossbeam = "0.8.4" +json = "0.12.4" rumqttc = "0.24.0" serde = { version = "1.0.217", features = ["derive"] } serde_yaml = "0.9.34" diff --git a/mqttClock.yml b/mqttClock.yml index ee73005..1a42470 100644 --- a/mqttClock.yml +++ b/mqttClock.yml @@ -1,5 +1,5 @@ mqtt: host: "localhost" port: 1883 - user: "mqttClock" + user: "mqttAutomation" pass: "password" diff --git a/src/main.rs b/src/main.rs index de5c5fe..3780da5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,6 @@ use std::thread; use std::fs; use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS}; -use chrono::{Local, Timelike, Datelike}; use crossbeam::channel::{unbounded, Sender}; use serde::Deserialize; @@ -12,6 +11,62 @@ struct MqttMessage { payload: String } + +fn json_get_value(value: json::JsonValue, mut path: Vec) -> json::JsonValue { + if path.len() == 0 { + return value + } + match value { + json::JsonValue::Object(obj) => { + let key = path[0].clone(); + path.remove(0); + json_get_value(obj[key].clone(), path) + }, + json::JsonValue::Array(a) => { + let key = path[0].clone(); + match key.parse::() { + Ok(i) => { + if i < a.len() { + json_get_value(a[i].clone(), path) + } else { + json::JsonValue::Null + } + }, + Err(_) => json::JsonValue::Null + } + }, + json::JsonValue::String(_) => json::JsonValue::Null, + json::JsonValue::Short(_) => json::JsonValue::Null, + json::JsonValue::Number(_) => json::JsonValue::Null, + json::JsonValue::Boolean(_) => json::JsonValue::Null, + json::JsonValue::Null => json::JsonValue::Null, + } +} + +#[derive(Debug)] +enum JsonGetError { + Null, + InvalidType, + ConvertionFaild +} + +fn json_get_u32(value: json::JsonValue, path: Vec) -> Result { + match json_get_value(value, path) { + json::JsonValue::Object(_) => Err(JsonGetError::InvalidType), + json::JsonValue::Array(_) => Err(JsonGetError::InvalidType), + json::JsonValue::String(_) => Err(JsonGetError::InvalidType), + json::JsonValue::Short(_) => Err(JsonGetError::InvalidType), + json::JsonValue::Number(num) => { + match u32::try_from(num) { + Err(_) => Err(JsonGetError::ConvertionFaild), + Ok(n) => Ok(n) + } + }, + json::JsonValue::Boolean(_) => Err(JsonGetError::InvalidType), + json::JsonValue::Null => Err(JsonGetError::Null), + } +} + fn send_publish(publish: &Sender, message: MqttMessage) { match publish.send(message) { Err(n) => println!("ERROR: faild to send publish ({:?})", n), @@ -19,84 +74,50 @@ fn send_publish(publish: &Sender, message: MqttMessage) { } } -fn mqtt_clock(publish: Sender) { - // init last values with invalid values - let mut last_year: i32 = 65535; - let mut last_month: u32 = 65535; - let mut last_dom: u32 = 65535; - let mut last_dow: u32 = 65535; - let mut last_iso_week: u32 = 65535; - let mut last_iso_year: i32 = 65535; - let mut last_hour: u32 = 65535; - let mut last_minute: u32 = 65535; - let mut last_second: u32 = 65535; +fn lamp01_set(publish: &Sender, state: bool) { + let payload: String; + if state { + payload = String::from("ON"); + } else { + payload = String::from("OFF"); + } + send_publish(publish, { MqttMessage { + topic: String::from("/cool/devices/lamp-01/set"), + payload: payload + }}); +} - loop { - let datetime = Local::now(); - if last_second != datetime.second() { - last_second = datetime.second(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/time/second"), - payload: last_second.to_string() - }}); - if last_minute != datetime.minute() { - last_minute = datetime.minute(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/time/minute"), - payload: last_minute.to_string() - }}); - if last_hour != datetime.hour() { - last_hour = datetime.hour(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/time/hour"), - payload: last_hour.to_string() - }}); - if last_dom != datetime.day() { - last_dom = datetime.day(); - send_publish(&publish, { MqttMessage {topic: String::from("clock/date/dom"), - payload: last_dom.to_string() - }}); - if last_iso_week != datetime.iso_week().week() { - last_iso_week = datetime.iso_week().week(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/isoWeek"), - payload: last_iso_week.to_string() - }}); - if last_iso_year != datetime.iso_week().year() { - last_iso_year = datetime.iso_week().year(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/isoYear"), - payload: last_iso_year.to_string() - }}); - } +fn mqtt_automation(publish: &Sender, message: MqttMessage) { + println!("DEBUG: mqtt_automation: {}: {}", message.topic, message.payload); + if message.topic.eq("clock/hour") && message.payload.eq("7") { + + lamp01_set(publish, true); + + } else if message.topic.eq("/cool/devices/KNMITemp/values") { + + match json::parse(&message.payload) { + // Err(e) => println!("ERROR: mqtt_automation: KNMITemp: invalid json ({})", e), + Err(e) => println!("ERROR: mqtt_automation: KNMITemp faild to parse json ({})", e), + Ok(payload) => { + match json_get_u32(payload, Vec::from([String::from("gr")])) { + Ok(gr) => { + println!("DEBUG: mqtt_automation: KNMITemp: gr = {}", gr); + if gr > 30 { + lamp01_set(publish, false); } - if last_dow != datetime.weekday() as u32 { - last_dow = datetime.weekday() as u32; - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/dow"), - payload: last_dow.to_string() - }}); - } - if last_month != datetime.month() { - last_month = datetime.month(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/month"), - payload: last_month.to_string() - }}); - if last_year != datetime.year() { - last_year = datetime.year(); - send_publish(&publish, { MqttMessage { - topic: String::from("clock/date/year"), - payload: last_year.to_string() - }}); - } - } - } + }, + Err(e) => print!("ERROR: mqtt_automation: KNMITemp: {:?}", e) } } } - thread::sleep(Duration::from_millis(500)); + } + // else if message.topic.eq("clock/dow") { + // match u8::try_from(message.payload) { + // Err(e) => println!("ERROR: mqtt_automation: clock/dow has invalid payload ({:?})", e), + // Ok(n) => automation_dow = n + // } + // } } @@ -174,7 +195,7 @@ fn main() { mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass); let (client, mut connection) = Client::new(mqttoptions, 10); - + // thread publisher let publisher = thread::Builder::new() .name("publisher".to_string()) @@ -197,21 +218,38 @@ fn main() { Err(_n) => println!("ERROR: main: fait to create publisher thread"), Ok(_n) => {} } - - // treath mqtt clock - let mqtt_clock_stat = thread::Builder::new() - .name("mqtt_clock".to_string()) - .spawn(move || { - mqtt_clock(mqtt_publish); - }); - match mqtt_clock_stat { - Err(_n) => println!("ERROR: main: faild to start mqtt clock thread"), - Ok(_n) => {} - } for (_i, notification) in connection.iter().enumerate() { + let mut delay: bool = false; match notification { - Err(e) => println!("ERROR: mqtt: {}", e), + // Err(e) => println!("ERROR: mqtt: {}", e), + Err(e) => match e { + rumqttc::ConnectionError::MqttState(state) => match state { + rumqttc::StateError::Io(e) => { + println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); + delay = true; + }, + rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), + rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e), + rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"), + rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"), + rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"), + rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"), + rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e), + rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max } + => println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max), + }, + rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), + rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), + rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), + rumqttc::ConnectionError::ConnectionRefused(code) => { + println!("ERROR: mqtt: ConnectionRefused {:?}", code); + delay = true; + }, + rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet), + rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"), + rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error), + }, Ok(event) => { match event { Event::Outgoing(n) => match n { @@ -229,9 +267,21 @@ fn main() { Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") }, Event::Incoming(n) => match n { - Packet::Connect(_) => println!("INFO : mqtt: connected"), //println!("INFO : mqtt_recive: in Connect"), - Packet::ConnAck(_) => println!("INFO : mqtt: conn ack"), //println!("INFO : mqtt_recive: in ConnAck"), - Packet::Publish(_) => {}, + Packet::Connect(_) => println!("INFO : mqtt: connected"), + Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), + Packet::Publish(msg) => { + let payload = match String::from_utf8(msg.payload.to_vec()) { + Err(e) => panic!("ERROR: pqtt_recive: faild to decode payload ({})", e), + Ok(v) => v + }; + + let message: MqttMessage = { MqttMessage { + topic: msg.topic, + payload: payload + }}; + + mqtt_automation(&mqtt_publish, message); + }, Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"), @@ -247,6 +297,9 @@ fn main() { } } } + if delay { + thread::sleep(Duration::from_millis(500)); + } } println!("INFO : main: exit"); }