diff --git a/src/main.rs b/src/main.rs index 56cf4f9..b3519a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use std::time::Duration; use std::thread; use std::fs; -use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS}; +use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet, QoS}; use crossbeam::channel::{unbounded, Sender}; use serde::Deserialize; @@ -11,113 +11,148 @@ struct MqttMessage { payload: String } - -fn json_get_value(value: json::JsonValue, mut path: Vec) -> json::JsonValue { - if path.len() == 0 { - return value +mod json_parser { + pub enum Error { + Null, + InvalidType, + ConvertionFaild, + JsonParseError(String) } - match value { - json::JsonValue::Object(obj) => { - let key = path[0].clone(); - path.remove(0); - json_get_value(obj[key].clone(), path) - }, - json::JsonValue::Array(a) => { - let key = path[0].clone(); - match key.parse::() { - Ok(i) => { - if i < a.len() { - json_get_value(a[i].clone(), path) - } else { - json::JsonValue::Null - } - }, - Err(_) => json::JsonValue::Null - } - }, - json::JsonValue::String(_) => json::JsonValue::Null, - json::JsonValue::Short(_) => json::JsonValue::Null, - json::JsonValue::Number(_) => json::JsonValue::Null, - json::JsonValue::Boolean(_) => json::JsonValue::Null, - json::JsonValue::Null => json::JsonValue::Null, - } -} - -#[derive(Debug)] -enum JsonGetError { - Null, - InvalidType, - ConvertionFaild -} - -fn json_get_u32(value: json::JsonValue, path: Vec) -> Result { - match json_get_value(value, path) { - json::JsonValue::Object(_) => Err(JsonGetError::InvalidType), - json::JsonValue::Array(_) => Err(JsonGetError::InvalidType), - json::JsonValue::String(_) => Err(JsonGetError::InvalidType), - json::JsonValue::Short(_) => Err(JsonGetError::InvalidType), - json::JsonValue::Number(num) => { - match u32::try_from(num) { - Err(_) => Err(JsonGetError::ConvertionFaild), - Ok(n) => Ok(n) - } - }, - json::JsonValue::Boolean(_) => Err(JsonGetError::InvalidType), - json::JsonValue::Null => Err(JsonGetError::Null), - } -} - -fn send_publish(publish: &Sender, message: MqttMessage) { - match publish.send(message) { - Err(n) => println!("ERROR: faild to send publish ({:?})", n), - Ok(_n) => {} - } -} - -fn lamp01_set(publish: &Sender, state: bool) { - let payload: String; - if state { - payload = String::from("ON"); - } else { - payload = String::from("OFF"); - } - send_publish(publish, { MqttMessage { - topic: String::from("/cool/devices/lamp-01/set"), - payload: payload - }}); -} - -fn mqtt_automation(publish: &Sender, message: MqttMessage) { - println!("DEBUG: mqtt_automation: {}: {}", message.topic, message.payload); - if message.topic.eq("clock/hour") && message.payload.eq("7") { - - lamp01_set(publish, true); - - } else if message.topic.eq("/cool/devices/KNMITemp/values") { - - match json::parse(&message.payload) { - // Err(e) => println!("ERROR: mqtt_automation: KNMITemp: invalid json ({})", e), - Err(e) => println!("ERROR: mqtt_automation: KNMITemp faild to parse json ({})", e), - Ok(payload) => { - match json_get_u32(payload, Vec::from([String::from("gr")])) { - Ok(gr) => { - println!("DEBUG: mqtt_automation: KNMITemp: gr = {}", gr); - if gr > 30 { - lamp01_set(publish, false); - } - }, - Err(e) => print!("ERROR: mqtt_automation: KNMITemp: {:?}", e) - } + impl Error { + pub fn to_string(&self) -> String { + match self { + Error::Null => String::from("path not found"), + Error::InvalidType => String::from("invalid type"), + Error::ConvertionFaild => String::from("type convertion faild"), + Error::JsonParseError(s) => s.to_string(), } } - } - // else if message.topic.eq("clock/dow") { - // match u8::try_from(message.payload) { - // Err(e) => println!("ERROR: mqtt_automation: clock/dow has invalid payload ({:?})", e), - // Ok(n) => automation_dow = n - // } - // } + + pub enum Json { + Value(json::JsonValue), + Text(String) + } + + pub fn get_value(value: json::JsonValue, mut path: Vec) -> json::JsonValue { + if path.len() == 0 { + return value + } + match value { + json::JsonValue::Object(obj) => { + let key = path[0].clone(); + path.remove(0); + get_value(obj[key].clone(), path) + }, + json::JsonValue::Array(a) => { + let key = path[0].clone(); + match key.parse::() { + Ok(i) => { + if i < a.len() { + get_value(a[i].clone(), path) + } else { + json::JsonValue::Null + } + }, + Err(_) => json::JsonValue::Null + } + }, + json::JsonValue::String(_) => json::JsonValue::Null, + json::JsonValue::Short(_) => json::JsonValue::Null, + json::JsonValue::Number(_) => json::JsonValue::Null, + json::JsonValue::Boolean(_) => json::JsonValue::Null, + json::JsonValue::Null => json::JsonValue::Null, + } + } + + pub fn get_u32(data: Json, path: Vec) -> Result { + match data { + Json::Value(value) => match get_value(value, path) { + json::JsonValue::Object(_) => Err(Error::InvalidType), + json::JsonValue::Array(_) => Err(Error::InvalidType), + json::JsonValue::String(_) => Err(Error::InvalidType), + json::JsonValue::Short(_) => Err(Error::InvalidType), + json::JsonValue::Number(num) => { + match u32::try_from(num) { + Err(_) => Err(Error::ConvertionFaild), + Ok(n) => Ok(n) + } + }, + json::JsonValue::Boolean(_) => Err(Error::InvalidType), + json::JsonValue::Null => Err(Error::Null), + } + Json::Text(data) => match json::parse(&data) { + Err(e) => { + Err(Error::JsonParseError(e.to_string())) + }, + Ok(value) => get_u32(Json::Value(value), path) + } + } + + } +} + +struct Automation { + publish: Sender, + clock_dow: u8 +} +impl Automation { + fn new(publish: Sender) -> Automation { + Automation { + publish, + clock_dow: u8::MAX + } + } + + pub(self) fn tx(&self, message: MqttMessage) { + match self.publish.send(message) { + Err(n) => println!("ERROR: faild to send publish ({:?})", n), + Ok(_n) => {} + } + } + + pub(self) fn lamp01_set(&self, state: bool) { + let payload: String; + if state { + payload = String::from("ON"); + } else { + payload = String::from("OFF"); + } + self.tx({ MqttMessage { + topic: String::from("/cool/devices/lamp-01/set"), + payload: payload + }}); + } + + fn rx(&mut self, message: MqttMessage) { + println!("INFO : mqtt_automation: {}: {}", message.topic, message.payload); + if message.topic.eq("clock/hour") { + + if message.payload.eq("7") && (self.clock_dow >= 1 && self.clock_dow <= 7) { + self.lamp01_set(true); + } + + } else if message.topic.eq("/cool/devices/KNMITemp/values") { + + match json_parser::get_u32(json_parser::Json::Text(message.payload), Vec::from([String::from("gr")])) { + Ok(gr) => { + if gr > 30 { + self.lamp01_set(false); + } + }, + Err(e) => print!("ERROR: mqtt_automation: KNMITemp: {}", e.to_string()) + } + + } + else if message.topic.eq("clock/dow") { + + match message.payload.parse::() { + Err(e) => println!("ERROR: mqtt_automation: clock/dow has invalid payload ({:?})", e), + Ok(n) => self.clock_dow = n + } + + } + } } @@ -172,6 +207,93 @@ fn read_config() -> Result { } } +fn mqtt_hadeler(mut connection: Connection, mqtt_publish: Sender) { + let mut automation: Automation = Automation::new(mqtt_publish); + for (_i, notification) in connection.iter().enumerate() { + let mut delay: bool = false; + match notification { + Err(e) => match e { + rumqttc::ConnectionError::MqttState(state) => match state { + rumqttc::StateError::Io(e) => { + println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); + delay = true; + }, + rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), + rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e), + rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"), + rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"), + rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"), + rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"), + rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e), + rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max } + => println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max), + }, + rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), + rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), + rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), + rumqttc::ConnectionError::ConnectionRefused(code) => { + println!("ERROR: mqtt: ConnectionRefused {:?}", code); + delay = true; + }, + rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet), + rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"), + rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error), + }, + Ok(event) => { + match event { + Event::Outgoing(n) => match n { + + Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), + Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), + Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), + Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), + Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), + Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), + Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), + Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), + Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), + Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), + Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") + }, + Event::Incoming(n) => match n { + Packet::Connect(_) => println!("INFO : mqtt: connected"), + Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), + Packet::Publish(msg) => { + let mut payload: String = String::from(""); + match String::from_utf8(msg.payload.to_vec()) { + Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e), + Ok(v) => payload = v + }; + if payload.len() > 0 { + let message: MqttMessage = { MqttMessage { + topic: msg.topic, + payload: payload + }}; + + automation.rx(message); + } + }, + Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), + Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), + Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"), + Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"), + Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"), + Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"), + Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), + Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), + Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), + Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), + Packet::Disconnect => println!("INFO : mqtt: disconected"), + } + } + } + } + if delay { + thread::sleep(Duration::from_millis(500)); + } + } +} + fn main() { let (mqtt_publish, mqtt_publish_rx) = unbounded::(); @@ -195,7 +317,7 @@ fn main() { let mut mqttoptions = MqttOptions::new(conf.mqtt.client, conf.mqtt.host, conf.mqtt.port); mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass); - let (client, mut connection) = Client::new(mqttoptions, 10); + let (client, connection) = Client::new(mqttoptions, 10); match client.subscribe("clock/hour", QoS::AtMostOnce) { Err(e) => println!("ERROR: main: faild to subscribe to clock/hour ({})", e), @@ -215,7 +337,7 @@ fn main() { match message { Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e), Ok(msg) => { - println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload); + println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload); match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { Err(_n) => println!("ERROR: publisher: faild to publish"), Ok(_n) => {} @@ -229,87 +351,17 @@ fn main() { Ok(_n) => {} } - for (_i, notification) in connection.iter().enumerate() { - let mut delay: bool = false; - match notification { - Err(e) => match e { - rumqttc::ConnectionError::MqttState(state) => match state { - rumqttc::StateError::Io(e) => { - println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); - delay = true; - }, - rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), - rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e), - rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"), - rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"), - rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"), - rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"), - rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e), - rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max } - => println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max), - }, - rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), - rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), - rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), - rumqttc::ConnectionError::ConnectionRefused(code) => { - println!("ERROR: mqtt: ConnectionRefused {:?}", code); - delay = true; - }, - rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet), - rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"), - rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error), - }, - Ok(event) => { - match event { - Event::Outgoing(n) => match n { - - Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), - Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), - Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), - Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), - Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), - Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), - Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), - Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), - Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), - Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), - Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") - }, - Event::Incoming(n) => match n { - Packet::Connect(_) => println!("INFO : mqtt: connected"), - Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), - Packet::Publish(msg) => { - let mut payload: String = String::from(""); - match String::from_utf8(msg.payload.to_vec()) { - Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e), - Ok(v) => payload = v - }; - if payload.len() > 0 { - let message: MqttMessage = { MqttMessage { - topic: msg.topic, - payload: payload - }}; - - mqtt_automation(&mqtt_publish, message); - } - }, - Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), - Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), - Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"), - Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"), - Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"), - Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"), - Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), - Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), - Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), - Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), - Packet::Disconnect => println!("INFO : mqtt: disconected"), - } - } - } - } - if delay { - thread::sleep(Duration::from_millis(500)); + // thread mqtt + let mqtt = thread::Builder::new() + .name("mqtt".to_string()) + .spawn(move || { + mqtt_hadeler(connection, mqtt_publish); + }); + match mqtt { + Err(_n) => println!("ERROR: main: fait to create mqtt thread"), + Ok(join) => match join.join() { + Err(_) => println!("ERROR: main: failt to join mqtt thread"), + Ok(_) => {} } } println!("INFO : main: exit");