diff --git a/Cargo.lock b/Cargo.lock index 3312b8e..073b626 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,21 +17,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - -[[package]] -name = "android_system_properties" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" -dependencies = [ - "libc", -] - [[package]] name = "autocfg" version = "1.4.0" @@ -59,12 +44,6 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" -[[package]] -name = "bumpalo" -version = "3.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" - [[package]] name = "bytes" version = "1.9.0" @@ -86,20 +65,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" -dependencies = [ - "android-tzdata", - "iana-time-zone", - "js-sys", - "num-traits", - "wasm-bindgen", - "windows-targets", -] - [[package]] name = "core-foundation" version = "0.9.4" @@ -231,45 +196,6 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" -[[package]] -name = "iana-time-zone" -version = "0.1.61" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" -dependencies = [ - "android_system_properties", - "core-foundation-sys", - "iana-time-zone-haiku", - "js-sys", - "wasm-bindgen", - "windows-core", -] - -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" -dependencies = [ - "cc", -] - -[[package]] -name = "js-sys" -version = "0.3.77" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" -dependencies = [ - "once_cell", - "wasm-bindgen", -] - -[[package]] -name = "json" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" - [[package]] name = "libc" version = "0.2.169" @@ -322,21 +248,10 @@ dependencies = [ name = "mqttclient" version = "0.1.0" dependencies = [ - "chrono", "crossbeam", - "json", "rumqttc", ] -[[package]] -name = "num-traits" -version = "0.2.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" -dependencies = [ - "autocfg", -] - [[package]] name = "object" version = "0.36.7" @@ -346,17 +261,11 @@ dependencies = [ "memchr", ] -[[package]] -name = "once_cell" -version = "1.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" - [[package]] name = "openssl-probe" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "pin-project-lite" @@ -480,12 +389,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "rustversion" -version = "1.0.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" - [[package]] name = "schannel" version = "0.1.27" @@ -635,9 +538,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +checksum = "11cd88e12b17c6494200a9c1b683a04fcac9573ed74cd1b62aeb2727c5592243" [[package]] name = "untrusted" @@ -651,73 +554,6 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" -[[package]] -name = "wasm-bindgen" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" -dependencies = [ - "cfg-if", - "once_cell", - "rustversion", - "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" -dependencies = [ - "bumpalo", - "log", - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-backend", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "windows-core" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" -dependencies = [ - "windows-targets", -] - [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 68c3ccd..a52e299 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] -chrono = "0.4.39" crossbeam = "0.8.4" -json = "0.12.4" rumqttc = "0.24.0" + +[lib] +path = "src/lib.rs" diff --git a/mqttClient.service b/mqttClient.service deleted file mode 100644 index cbed63b..0000000 --- a/mqttClient.service +++ /dev/null @@ -1,20 +0,0 @@ -# systemd service file to start relayClient - -[Unit] -Description=MQTT client -#After=network.target -After=mosquitto.service - -[Service] -Type=simple -User= -Group= -WorkingDirectory=/home/ - -ExecStart=/home//.local/bin/relayClient -Restart=on-failure -RestartSec=20 -SyslogIdentifier=relayClient - -[Install] -WantedBy=multi-user.target diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a0b6930 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,157 @@ + +mod mqtt_client { + use std::time::Duration; + use std::thread; + + use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet, QoS}; + use crossbeam::channel::{unbounded, Receiver, Sender}; + + pub trait MqttTool { + fn new(client: Client, tx: Sender) -> Self; + fn rx(&self, messaage: MqttMessage); + } + + pub struct MqttMessage { + topic: String, + payload: String + } + + pub fn run(host: String, port: u16, client: String, user: String, pass: String) { + let (tx, rx) = unbounded::(); + + let mut mqttoptions = MqttOptions::new(client, host, port); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + mqttoptions.set_credentials(user, pass); + let (client, connection) = Client::new(mqttoptions, 10); + + let client_publisher = client.clone(); + + // thread publisher + let publisher = thread::Builder::new() + .name("publisher".to_string()) + .spawn(move || { + publisher(rx, client_publisher); + }); + match publisher { + Err(_n) => println!("ERROR: main: fait to create publisher thread"), + Ok(_n) => {} + } + + // thread mqtt + let mqtt = thread::Builder::new() + .name("mqtt".to_string()) + .spawn(move || { + let tool = T::new(client, tx); + handeler::(connection, tool); + }); + match mqtt { + Err(_n) => println!("ERROR: main: fait to create mqtt thread"), + Ok(join) => match join.join() { + Err(_) => println!("ERROR: main: failt to join mqtt thread"), + Ok(_) => {} + } + } + } + + pub(self) fn publisher(rx: Receiver, client: rumqttc::Client) { + loop { + let message = rx.recv(); + match message { + Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e), + Ok(msg) => { + println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload); + match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { + Err(_n) => println!("ERROR: publisher: faild to publish"), + Ok(_n) => {} + } + } + } + } + } + + pub(self) fn handeler(mut connection: Connection, tool: T) { + for (_i, notification) in connection.iter().enumerate() { + let mut delay: bool = false; + match notification { + Err(e) => match e { + rumqttc::ConnectionError::MqttState(state) => match state { + rumqttc::StateError::Io(e) => { + println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); + delay = true; + }, + rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), + rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e), + rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"), + rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"), + rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"), + rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"), + rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e), + rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max } + => println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max), + }, + rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), + rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), + rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), + rumqttc::ConnectionError::ConnectionRefused(code) => { + println!("ERROR: mqtt: ConnectionRefused {:?}", code); + delay = true; + }, + rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet), + rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"), + rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error), + }, + Ok(event) => { + match event { + Event::Outgoing(n) => match n { + + Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), + Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), + Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), + Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), + Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), + Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), + Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), + Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), + Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), + Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), + Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") + }, + Event::Incoming(n) => match n { + Packet::Connect(_) => println!("INFO : mqtt: connected"), + Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), + Packet::Publish(msg) => { + let mut payload: String = String::from(""); + match String::from_utf8(msg.payload.to_vec()) { + Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e), + Ok(v) => payload = v + }; + if payload.len() > 0 { + let message: MqttMessage = { MqttMessage { + topic: msg.topic, + payload: payload + }}; + + tool.rx(message); + } + }, + Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), + Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), + Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"), + Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"), + Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"), + Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"), + Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), + Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), + Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), + Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), + Packet::Disconnect => println!("INFO : mqtt: disconected"), + } + } + } + } + if delay { + thread::sleep(Duration::from_millis(500)); + } + } + } +} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index f9a289e..0000000 --- a/src/main.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::time::Duration; -use std::thread; - -use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS}; -use crossbeam::channel::{unbounded, Sender}; - -struct MqttMessage { - topic: String, - payload: String -} - - -fn send_publish(publish: &Sender, message: MqttMessage) { - match publish.send(message) { - Err(n) => println!("ERROR: faild to send publish ({:?})", n), - Ok(_n) => {} - } -} - -fn lamp01_set(publish: &Sender, state: bool) { - let payload: String; - if state { - payload = String::from("ON"); - } else { - payload = String::from("OFF"); - } - send_publish(publish, { MqttMessage { - topic: String::from("/cool/devices/lamp-01/set"), - payload: payload - }}); -} - -fn mqtt_automation(publish: &Sender, message: MqttMessage) { - println!("DEBUG: mqtt_automation: {}: {}", message.topic, message.payload); - if message.topic.eq("clock/hour") && message.payload.eq("7") { - - lamp01_set(publish, true); - - } else if message.topic.eq("/cool/devices/KNMITemp/values") { - - match json::parse(&message.payload) { - Err(e) => println!("ERROR: mqtt_automation: KNMITemp: invalid json ({})", e), - Ok(payload) => { - match payload { - json::JsonValue::Object(obj) => { - match obj["gr"] { - json::JsonValue::Number(number) => { - let gr = match u16::try_from(number) { - Err(_) => 0, - Ok(n) => n - }; - println!("DEBUG: mqtt_automation: KNMITemp: gr = {}", gr); - if gr > 30 { - lamp01_set(publish, false); - } - } - json::JsonValue::Null => {},json::JsonValue::Short(_) => {},json::JsonValue::String(_) => {},json::JsonValue::Boolean(_) => {},json::JsonValue::Object(_) => {},json::JsonValue::Array(_) => {}, - } - } - json::JsonValue::Null => {},json::JsonValue::Short(_) => {},json::JsonValue::String(_) => {},json::JsonValue::Number(_) => {},json::JsonValue::Boolean(_) => {},json::JsonValue::Array(_) => {}, - } - - } - } - - } - // else if message.topic.eq("clock/dow") { - // match u8::try_from(message.payload) { - // Err(e) => println!("ERROR: mqtt_automation: clock/dow has invalid payload ({:?})", e), - // Ok(n) => automation_dow = n - // } - // } -} - -fn mqtt_recive(publish: &Sender, e: Event) { - match e { - Event::Outgoing(n) => match n { - - Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), - Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), - Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), - Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), - Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), - Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), - Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), - Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), - Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), - Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), - Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") - }, - Event::Incoming(n) => match n { - Packet::Connect(_connect) => {}, //println!("INFO : mqtt_recive: in Connect"), - Packet::ConnAck(_connack) => {}, //println!("INFO : mqtt_recive: in ConnAck"), - Packet::Publish(msg) => { - let payload = match String::from_utf8(msg.payload.to_vec()) { - Err(e) => panic!("ERROR: pqtt_recive: faild to decode payload ({})", e), - Ok(v) => v - }; - - let message: MqttMessage = { MqttMessage { - topic: msg.topic, - payload: payload - }}; - - mqtt_automation(&publish, message); - }, - Packet::PubAck(_puback) => {}, //println!("INFO : mqtt_recive: in PubAck"), - Packet::PubRec(_pubrec) => {}, //println!("INFO : mqtt_recive: in PubRec"), - Packet::PubRel(_pubrel) => {}, //println!("INFO : mqtt_recive: in PubRel"), - Packet::PubComp(_pubcomp) => {}, //println!("INFO : mqtt_recive: in PubComp"), - Packet::Subscribe(_subscribe) => {}, //println!("INFO : mqtt_recive: in Subscribe"), - Packet::SubAck(_suback) => {}, //println!("INFO : mqtt_recive: in SubAck"), - Packet::Unsubscribe(_unsubscribe) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), - Packet::UnsubAck(_unsuback) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), - Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), - Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), - Packet::Disconnect => {}, //println!("INFO : mqtt_recive in Disconnect:") - } - } -} - -fn main() { - let (mqtt_publish, mqtt_publish_rx) = unbounded::(); - - let mut mqttoptions = MqttOptions::new("rumqtt-sync", "10.1.2.2", 1883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - mqttoptions.set_credentials("rustClient", "test"); - let (client, mut connection) = Client::new(mqttoptions, 10); - - client.subscribe("/cool/devices/KNMITemp/values", QoS::AtMostOnce).unwrap(); - client.subscribe("clock/hour", QoS::AtMostOnce).unwrap(); - - // thread publisher - let publisher = thread::Builder::new() - .name("publisher".to_string()) - .spawn(move || { - loop { - let message = mqtt_publish_rx.recv(); - match message { - Err(_err) => println!("ERROR: publisher: faild to receve an message"), - Ok(msg) => { - println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload); - match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { - Err(_n) => println!("ERROR: publisher: faild to publish"), - Ok(_n) => {} - } - } - } - } - }); - match publisher { - Err(_n) => println!("ERROR: main: fait to create publisher thread"), - Ok(_n) => {} - } - - for (_i, notification) in connection.iter().enumerate() { - // println!("Notification = {:?}", notification); - match notification { - Err(n) => println!("ERROR: mqtt: {}", n), - Ok(n) => mqtt_recive(&mqtt_publish, n) - } - } -}