use mqtt-client library

This commit is contained in:
Laila van Reenen 2025-01-25 20:57:31 +01:00
parent 931fe5e3cc
commit bf03ecda12
Signed by: LailaTheElf
GPG Key ID: 8A3EF0226518C12D
3 changed files with 41 additions and 174 deletions

10
Cargo.lock generated
View File

@ -278,12 +278,22 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "mqtt-client"
version = "1.0.0"
source = "git+https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git?tag=v1.0.0#f0b34311457b26a0a3b3a7027845b407a3ca47ee"
dependencies = [
"crossbeam",
"rumqttc",
]
[[package]]
name = "mqttAutomation"
version = "1.1.0"
dependencies = [
"crossbeam",
"json",
"mqtt-client",
"rumqttc",
"serde",
"serde_yaml",

View File

@ -9,3 +9,4 @@ json = "0.12.4"
rumqttc = "0.24.0"
serde = { version = "1.0.217", features = ["derive"] }
serde_yaml = "0.9.34"
mqtt-client = { tag = "v1.0.0", git = "https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git" }

View File

@ -1,15 +1,9 @@
use std::time::Duration;
use std::thread;
use std::fs;
use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet, QoS};
use crossbeam::channel::{unbounded, Sender};
use crossbeam::channel::Sender;
use serde::Deserialize;
struct MqttMessage {
topic: String,
payload: String
}
use mqtt_client::mqtt_client;
mod json_parser {
pub enum Error {
@ -93,38 +87,49 @@ mod json_parser {
}
struct Automation {
publish: Sender<MqttMessage>,
tx: Sender<mqtt_client::MqttMessage>,
clock_dow: u8
}
impl Automation {
fn new(publish: Sender<MqttMessage>) -> Automation {
Automation {
publish,
clock_dow: u8::MAX
}
}
pub(self) fn tx(&self, message: MqttMessage) {
match self.publish.send(message) {
fn tx(&self, message: mqtt_client::MqttMessage) {
match self.tx.send(message) {
Err(n) => println!("ERROR: faild to send publish ({:?})", n),
Ok(_n) => {}
}
}
pub(self) fn lamp01_set(&self, state: bool) {
fn lamp01_set(&self, state: bool) {
let payload: String;
if state {
payload = String::from("ON");
} else {
payload = String::from("OFF");
}
self.tx({ MqttMessage {
self.tx({ mqtt_client::MqttMessage {
topic: String::from("/cool/devices/lamp-01/set"),
payload: payload
}});
}
}
impl mqtt_client::MqttTool for Automation {
fn new(client: rumqttc::Client, tx: Sender<mqtt_client::MqttMessage>) -> Automation {
fn rx(&mut self, message: MqttMessage) {
match client.subscribe("clock/hour", rumqttc::QoS::AtMostOnce) {
Err(e) => println!("ERROR: main: faild to subscribe to clock/hour ({})", e),
Ok(_) => {}
}
match client.subscribe("/cool/devices/KNMITemp/values", rumqttc::QoS::AtMostOnce) {
Err(e) => println!("ERROR: main: faild to subscribe to KNMITemp/values ({})", e),
Ok(_) => {}
}
Automation {
tx,
clock_dow: u8::MAX
}
}
fn rx(&mut self, message: mqtt_client::MqttMessage) {
println!("INFO : mqtt_automation: {}: {}", message.topic, message.payload);
if message.topic.eq("clock/hour") {
@ -207,163 +212,14 @@ fn read_config() -> Result<Settings,SettingsError> {
}
}
fn mqtt_hadeler(mut connection: Connection, mqtt_publish: Sender<MqttMessage>) {
let mut automation: Automation = Automation::new(mqtt_publish);
for (_i, notification) in connection.iter().enumerate() {
let mut delay: bool = false;
match notification {
Err(e) => match e {
rumqttc::ConnectionError::MqttState(state) => match state {
rumqttc::StateError::Io(e) => {
println!("ERROR: mqtt: Io ({}) {}", e.kind(), e);
delay = true;
},
rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"),
rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e),
rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"),
rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"),
rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"),
rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"),
rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e),
rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max }
=> println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max),
},
rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"),
rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"),
rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()),
rumqttc::ConnectionError::ConnectionRefused(code) => {
println!("ERROR: mqtt: ConnectionRefused {:?}", code);
delay = true;
},
rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet),
rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"),
rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error),
},
Ok(event) => {
match event {
Event::Outgoing(n) => match n {
Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"),
Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"),
Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"),
Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"),
Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"),
Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"),
Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"),
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
},
Event::Incoming(n) => match n {
Packet::Connect(_) => println!("INFO : mqtt: connected"),
Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"),
Packet::Publish(msg) => {
let mut payload: String = String::from("");
match String::from_utf8(msg.payload.to_vec()) {
Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e),
Ok(v) => payload = v
};
if payload.len() > 0 {
let message: MqttMessage = { MqttMessage {
topic: msg.topic,
payload: payload
}};
automation.rx(message);
}
},
Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"),
Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"),
Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"),
Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"),
Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"),
Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"),
Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"),
Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"),
Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"),
Packet::Disconnect => println!("INFO : mqtt: disconected"),
}
}
}
}
if delay {
thread::sleep(Duration::from_millis(500));
}
}
}
fn main() {
let (mqtt_publish, mqtt_publish_rx) = unbounded::<MqttMessage>();
// get setting
let conf_ok: bool;
let conf: Settings;
match read_config() {
Ok(n) => {
conf = n;
conf_ok = true;
},
Err(_) => {
conf = Settings {
mqtt: SettingsMQTT {host:String::new(),port:0,client:String::new(),user:String::new(),pass:String::new()}
};
conf_ok = false;
}
Ok(conf) =>
mqtt_client::run::<Automation>(conf.mqtt.host, conf.mqtt.port, conf.mqtt.client, conf.mqtt.user, conf.mqtt.pass),
Err(_) => {}
}
if conf_ok {
let mut mqttoptions = MqttOptions::new(conf.mqtt.client, conf.mqtt.host, conf.mqtt.port);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass);
let (client, connection) = Client::new(mqttoptions, 10);
match client.subscribe("clock/hour", QoS::AtMostOnce) {
Err(e) => println!("ERROR: main: faild to subscribe to clock/hour ({})", e),
Ok(_) => {}
}
match client.subscribe("/cool/devices/KNMITemp/values", QoS::AtMostOnce) {
Err(e) => println!("ERROR: main: faild to subscribe to .../KNMITemp/values ({})", e),
Ok(_) => {}
}
// thread publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
.spawn(move || {
loop {
let message = mqtt_publish_rx.recv();
match message {
Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e),
Ok(msg) => {
println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
}
}
}
}
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
// thread mqtt
let mqtt = thread::Builder::new()
.name("mqtt".to_string())
.spawn(move || {
mqtt_hadeler(connection, mqtt_publish);
});
match mqtt {
Err(_n) => println!("ERROR: main: fait to create mqtt thread"),
Ok(join) => match join.join() {
Err(_) => println!("ERROR: main: failt to join mqtt thread"),
Ok(_) => {}
}
}
println!("INFO : main: exit");
}
println!("INFO : main: exit");
}