convert to library

This commit is contained in:
Laila van Reenen 2025-01-25 20:06:28 +01:00
parent c7fc8f00e2
commit 2d2b0026ad
Signed by: LailaTheElf
GPG Key ID: 8A3EF0226518C12D
5 changed files with 164 additions and 353 deletions

172
Cargo.lock generated
View File

@ -17,21 +17,6 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "autocfg"
version = "1.4.0"
@ -59,12 +44,6 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36"
[[package]]
name = "bumpalo"
version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "bytes"
version = "1.9.0"
@ -86,20 +65,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -231,45 +196,6 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "iana-time-zone"
version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "js-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "json"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd"
[[package]]
name = "libc"
version = "0.2.169"
@ -322,21 +248,10 @@ dependencies = [
name = "mqttclient"
version = "0.1.0"
dependencies = [
"chrono",
"crossbeam",
"json",
"rumqttc",
]
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "object"
version = "0.36.7"
@ -346,17 +261,11 @@ dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
[[package]]
name = "openssl-probe"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "pin-project-lite"
@ -480,12 +389,6 @@ dependencies = [
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4"
[[package]]
name = "schannel"
version = "0.1.27"
@ -635,9 +538,9 @@ dependencies = [
[[package]]
name = "unicode-ident"
version = "1.0.14"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
checksum = "11cd88e12b17c6494200a9c1b683a04fcac9573ed74cd1b62aeb2727c5592243"
[[package]]
name = "untrusted"
@ -651,73 +554,6 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
dependencies = [
"bumpalo",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
]
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.52.0"

View File

@ -4,7 +4,8 @@ version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.39"
crossbeam = "0.8.4"
json = "0.12.4"
rumqttc = "0.24.0"
[lib]
path = "src/lib.rs"

View File

@ -1,20 +0,0 @@
# systemd service file to start relayClient
[Unit]
Description=MQTT client
#After=network.target
After=mosquitto.service
[Service]
Type=simple
User=<username>
Group=<username>
WorkingDirectory=/home/<username>
ExecStart=/home/<username>/.local/bin/relayClient
Restart=on-failure
RestartSec=20
SyslogIdentifier=relayClient
[Install]
WantedBy=multi-user.target

157
src/lib.rs Normal file
View File

@ -0,0 +1,157 @@
mod mqtt_client {
use std::time::Duration;
use std::thread;
use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet, QoS};
use crossbeam::channel::{unbounded, Receiver, Sender};
pub trait MqttTool {
fn new(client: Client, tx: Sender<MqttMessage>) -> Self;
fn rx(&self, messaage: MqttMessage);
}
pub struct MqttMessage {
topic: String,
payload: String
}
pub fn run<T: MqttTool>(host: String, port: u16, client: String, user: String, pass: String) {
let (tx, rx) = unbounded::<MqttMessage>();
let mut mqttoptions = MqttOptions::new(client, host, port);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(user, pass);
let (client, connection) = Client::new(mqttoptions, 10);
let client_publisher = client.clone();
// thread publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
.spawn(move || {
publisher(rx, client_publisher);
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
// thread mqtt
let mqtt = thread::Builder::new()
.name("mqtt".to_string())
.spawn(move || {
let tool = T::new(client, tx);
handeler::<T>(connection, tool);
});
match mqtt {
Err(_n) => println!("ERROR: main: fait to create mqtt thread"),
Ok(join) => match join.join() {
Err(_) => println!("ERROR: main: failt to join mqtt thread"),
Ok(_) => {}
}
}
}
pub(self) fn publisher(rx: Receiver<MqttMessage>, client: rumqttc::Client) {
loop {
let message = rx.recv();
match message {
Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e),
Ok(msg) => {
println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
}
}
}
}
}
pub(self) fn handeler<T: MqttTool>(mut connection: Connection, tool: T) {
for (_i, notification) in connection.iter().enumerate() {
let mut delay: bool = false;
match notification {
Err(e) => match e {
rumqttc::ConnectionError::MqttState(state) => match state {
rumqttc::StateError::Io(e) => {
println!("ERROR: mqtt: Io ({}) {}", e.kind(), e);
delay = true;
},
rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"),
rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e),
rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"),
rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"),
rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"),
rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"),
rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e),
rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max }
=> println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max),
},
rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"),
rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"),
rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()),
rumqttc::ConnectionError::ConnectionRefused(code) => {
println!("ERROR: mqtt: ConnectionRefused {:?}", code);
delay = true;
},
rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet),
rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"),
rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error),
},
Ok(event) => {
match event {
Event::Outgoing(n) => match n {
Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"),
Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"),
Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"),
Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"),
Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"),
Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"),
Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"),
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
},
Event::Incoming(n) => match n {
Packet::Connect(_) => println!("INFO : mqtt: connected"),
Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"),
Packet::Publish(msg) => {
let mut payload: String = String::from("");
match String::from_utf8(msg.payload.to_vec()) {
Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e),
Ok(v) => payload = v
};
if payload.len() > 0 {
let message: MqttMessage = { MqttMessage {
topic: msg.topic,
payload: payload
}};
tool.rx(message);
}
},
Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"),
Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"),
Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"),
Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"),
Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"),
Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"),
Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"),
Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"),
Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"),
Packet::Disconnect => println!("INFO : mqtt: disconected"),
}
}
}
}
if delay {
thread::sleep(Duration::from_millis(500));
}
}
}
}

View File

@ -1,163 +0,0 @@
use std::time::Duration;
use std::thread;
use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS};
use crossbeam::channel::{unbounded, Sender};
struct MqttMessage {
topic: String,
payload: String
}
fn send_publish(publish: &Sender<MqttMessage>, message: MqttMessage) {
match publish.send(message) {
Err(n) => println!("ERROR: faild to send publish ({:?})", n),
Ok(_n) => {}
}
}
fn lamp01_set(publish: &Sender<MqttMessage>, state: bool) {
let payload: String;
if state {
payload = String::from("ON");
} else {
payload = String::from("OFF");
}
send_publish(publish, { MqttMessage {
topic: String::from("/cool/devices/lamp-01/set"),
payload: payload
}});
}
fn mqtt_automation(publish: &Sender<MqttMessage>, message: MqttMessage) {
println!("DEBUG: mqtt_automation: {}: {}", message.topic, message.payload);
if message.topic.eq("clock/hour") && message.payload.eq("7") {
lamp01_set(publish, true);
} else if message.topic.eq("/cool/devices/KNMITemp/values") {
match json::parse(&message.payload) {
Err(e) => println!("ERROR: mqtt_automation: KNMITemp: invalid json ({})", e),
Ok(payload) => {
match payload {
json::JsonValue::Object(obj) => {
match obj["gr"] {
json::JsonValue::Number(number) => {
let gr = match u16::try_from(number) {
Err(_) => 0,
Ok(n) => n
};
println!("DEBUG: mqtt_automation: KNMITemp: gr = {}", gr);
if gr > 30 {
lamp01_set(publish, false);
}
}
json::JsonValue::Null => {},json::JsonValue::Short(_) => {},json::JsonValue::String(_) => {},json::JsonValue::Boolean(_) => {},json::JsonValue::Object(_) => {},json::JsonValue::Array(_) => {},
}
}
json::JsonValue::Null => {},json::JsonValue::Short(_) => {},json::JsonValue::String(_) => {},json::JsonValue::Number(_) => {},json::JsonValue::Boolean(_) => {},json::JsonValue::Array(_) => {},
}
}
}
}
// else if message.topic.eq("clock/dow") {
// match u8::try_from(message.payload) {
// Err(e) => println!("ERROR: mqtt_automation: clock/dow has invalid payload ({:?})", e),
// Ok(n) => automation_dow = n
// }
// }
}
fn mqtt_recive(publish: &Sender<MqttMessage>, e: Event) {
match e {
Event::Outgoing(n) => match n {
Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"),
Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"),
Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"),
Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"),
Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"),
Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"),
Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"),
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
},
Event::Incoming(n) => match n {
Packet::Connect(_connect) => {}, //println!("INFO : mqtt_recive: in Connect"),
Packet::ConnAck(_connack) => {}, //println!("INFO : mqtt_recive: in ConnAck"),
Packet::Publish(msg) => {
let payload = match String::from_utf8(msg.payload.to_vec()) {
Err(e) => panic!("ERROR: pqtt_recive: faild to decode payload ({})", e),
Ok(v) => v
};
let message: MqttMessage = { MqttMessage {
topic: msg.topic,
payload: payload
}};
mqtt_automation(&publish, message);
},
Packet::PubAck(_puback) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_pubrec) => {}, //println!("INFO : mqtt_recive: in PubRec"),
Packet::PubRel(_pubrel) => {}, //println!("INFO : mqtt_recive: in PubRel"),
Packet::PubComp(_pubcomp) => {}, //println!("INFO : mqtt_recive: in PubComp"),
Packet::Subscribe(_subscribe) => {}, //println!("INFO : mqtt_recive: in Subscribe"),
Packet::SubAck(_suback) => {}, //println!("INFO : mqtt_recive: in SubAck"),
Packet::Unsubscribe(_unsubscribe) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"),
Packet::UnsubAck(_unsuback) => {}, //println!("INFO : mqtt_recive: in UnsubAck"),
Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"),
Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"),
Packet::Disconnect => {}, //println!("INFO : mqtt_recive in Disconnect:")
}
}
}
fn main() {
let (mqtt_publish, mqtt_publish_rx) = unbounded::<MqttMessage>();
let mut mqttoptions = MqttOptions::new("rumqtt-sync", "10.1.2.2", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials("rustClient", "test");
let (client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("/cool/devices/KNMITemp/values", QoS::AtMostOnce).unwrap();
client.subscribe("clock/hour", QoS::AtMostOnce).unwrap();
// thread publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
.spawn(move || {
loop {
let message = mqtt_publish_rx.recv();
match message {
Err(_err) => println!("ERROR: publisher: faild to receve an message"),
Ok(msg) => {
println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
}
}
}
}
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
for (_i, notification) in connection.iter().enumerate() {
// println!("Notification = {:?}", notification);
match notification {
Err(n) => println!("ERROR: mqtt: {}", n),
Ok(n) => mqtt_recive(&mqtt_publish, n)
}
}
}