use mqttClient library

This commit is contained in:
Laila van Reenen 2025-01-30 11:48:28 +01:00
parent 92699cb37f
commit 2d7c7a76a9
Signed by: LailaTheElf
GPG Key ID: 8A3EF0226518C12D
3 changed files with 122 additions and 212 deletions

10
Cargo.lock generated
View File

@ -340,12 +340,22 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "mqtt-client"
version = "2.0.0"
source = "git+https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git?tag=v2.0.0#c924081133beb158fa71edc34d44ab254641293a"
dependencies = [
"crossbeam",
"rumqttc",
]
[[package]] [[package]]
name = "mqttClock" name = "mqttClock"
version = "1.1.0" version = "1.1.0"
dependencies = [ dependencies = [
"chrono", "chrono",
"crossbeam", "crossbeam",
"mqtt-client",
"rumqttc", "rumqttc",
"serde", "serde",
"serde_yaml", "serde_yaml",

View File

@ -9,3 +9,4 @@ crossbeam = "0.8.4"
rumqttc = "0.24.0" rumqttc = "0.24.0"
serde = { version = "1.0.217", features = ["derive"] } serde = { version = "1.0.217", features = ["derive"] }
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
mqtt-client = { tag = "v2.0.0", git = "https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git" }

View File

@ -1,101 +1,120 @@
use std::time::Duration; use std::{fs, thread, time::Duration};
use std::thread;
use std::fs;
use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS};
use chrono::{Local, Timelike, Datelike};
use crossbeam::channel::{unbounded, Sender};
use serde::Deserialize; use serde::Deserialize;
use chrono::{Local, Timelike, Datelike};
struct MqttMessage { use mqtt_client::mqtt_client::{QoS, Sender, Receiver, MqttMessage, MqttTool};
topic: String,
payload: String
}
fn send_publish(publish: &Sender<MqttMessage>, message: MqttMessage) { fn get_u16_from_i32(value: i32) -> Option<u16> {
match publish.send(message) { match u16::try_from(value) {
Err(n) => println!("ERROR: faild to send publish ({:?})", n), Ok(n) => Some(n),
Ok(_n) => {} Err(_) => None
} }
} }
fn mqtt_clock(publish: Sender<MqttMessage>) { fn get_u16_from_u32(value: u32) -> Option<u16> {
// init last values with invalid values match u16::try_from(value) {
let mut last_year: i32 = 65535; Ok(n) => Some(n),
let mut last_month: u32 = 65535; Err(_) => None
let mut last_dom: u32 = 65535; }
let mut last_dow: u32 = 65535; }
let mut last_iso_week: u32 = 65535;
let mut last_iso_year: i32 = 65535;
let mut last_hour: u32 = 65535;
let mut last_minute: u32 = 65535;
let mut last_second: u32 = 65535;
loop { struct Clock {
let datetime = Local::now(); tx: Sender<MqttMessage>,
if last_second != datetime.second() { last_year: Option<u16>,
last_second = datetime.second(); last_month: Option<u16>,
send_publish(&publish, { MqttMessage { last_dom: Option<u16>,
topic: String::from("clock/time/second"), last_dow: Option<u16>,
payload: last_second.to_string() last_iso_week: Option<u16>,
}}); last_iso_year: Option<u16>,
if last_minute != datetime.minute() { last_hour: Option<u16>,
last_minute = datetime.minute(); last_minute: Option<u16>,
send_publish(&publish, { MqttMessage { last_second: Option<u16>
topic: String::from("clock/time/minute"), }
payload: last_minute.to_string() impl Clock {
}}); fn tx(&self, topic: String, value: Option<u16>, retain: bool) {
if last_hour != datetime.hour() { match value {
last_hour = datetime.hour(); None => {},
send_publish(&publish, { MqttMessage { Some(v) => {
topic: String::from("clock/time/hour"), let message = MqttMessage {
payload: last_hour.to_string() topic,
}}); payload: v.to_string(),
if last_dom != datetime.day() { retain: retain,
last_dom = datetime.day(); qos: QoS::AtMostOnce,
send_publish(&publish, { MqttMessage {topic: String::from("clock/date/dom"), };
payload: last_dom.to_string() match self.tx.send(message) {
}}); Err(n) => println!("ERROR: faild to send publish ({:?})", n),
if last_iso_week != datetime.iso_week().week() { Ok(_) => {}
last_iso_week = datetime.iso_week().week();
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/isoWeek"),
payload: last_iso_week.to_string()
}});
if last_iso_year != datetime.iso_week().year() {
last_iso_year = datetime.iso_week().year();
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/isoYear"),
payload: last_iso_year.to_string()
}});
}
}
if last_dow != datetime.weekday() as u32 {
last_dow = datetime.weekday() as u32;
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/dow"),
payload: last_dow.to_string()
}});
}
if last_month != datetime.month() {
last_month = datetime.month();
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/month"),
payload: last_month.to_string()
}});
if last_year != datetime.year() {
last_year = datetime.year();
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/year"),
payload: last_year.to_string()
}});
}
}
}
} }
} }
} }
thread::sleep(Duration::from_millis(500)); }
fn time(&mut self) {
let datetime = Local::now();
if self.last_second != get_u16_from_u32(datetime.second()) {
self.last_second = get_u16_from_u32(datetime.second());
self.tx(String::from("clock/time/second"), self.last_second, false);
if self.last_minute != get_u16_from_u32(datetime.minute()) {
self.last_minute = get_u16_from_u32(datetime.minute());
self.tx(String::from("clock/time/minute"), self.last_minute, false);
if self.last_hour != get_u16_from_u32(datetime.hour()) {
self.last_hour = get_u16_from_u32(datetime.hour());
self.tx(String::from("clock/time/hour"), self.last_hour, false);
self.date(datetime);
}
}
}
}
fn date(&mut self, datetime: chrono::DateTime<Local>) {
if self.last_dom != get_u16_from_u32(datetime.day()) {
self.last_dom = get_u16_from_u32(datetime.day());
self.tx(String::from("clock/date/dom"), self.last_dom, true);
if self.last_iso_week != get_u16_from_u32(datetime.iso_week().week()) {
self.last_iso_week = get_u16_from_u32(datetime.iso_week().week());
self.tx(String::from("clock/date/isoWeek"), self.last_iso_week, true);
if self.last_iso_year != get_u16_from_i32(datetime.iso_week().year()) {
self.last_iso_year = get_u16_from_i32(datetime.iso_week().year());
self.tx(String::from("clock/date/isoYear"), self.last_iso_year, true);
}
}
if self.last_dow != Some(datetime.weekday() as u16) {
self.last_dow = Some(datetime.weekday() as u16);
self.tx(String::from("clock/date/dow"), self.last_dow, true);
}
if self.last_month != get_u16_from_u32(datetime.month()) {
self.last_month = get_u16_from_u32(datetime.month());
self.tx(String::from("clock/date/month"), self.last_month, true);
if self.last_year != get_u16_from_i32(datetime.year()) {
self.last_year = get_u16_from_i32(datetime.year());
self.tx(String::from("clock/date/year"), self.last_year, true);
}
}
}
}
}
impl MqttTool for Clock {
fn new(_client: rumqttc::Client, tx: Sender<MqttMessage>) -> Clock {
Clock {
tx,
last_year: None,
last_month: None,
last_dom: None,
last_dow: None,
last_iso_week: None,
last_iso_year: None,
last_hour: None,
last_minute: None,
last_second: None
}
}
fn run(&mut self, _rx: Receiver<MqttMessage>) {
loop {
self.time();
thread::sleep(Duration::from_millis(500));
}
} }
} }
@ -152,133 +171,13 @@ fn read_config() -> Result<Settings,SettingsError> {
} }
fn main() { fn main() {
let (mqtt_publish, mqtt_publish_rx) = unbounded::<MqttMessage>();
// get setting // get setting
let conf_ok: bool;
let conf: Settings;
match read_config() { match read_config() {
Ok(n) => { Ok(conf) =>
conf = n; mqtt_client::mqtt_client::run::<Clock>(conf.mqtt.host, conf.mqtt.port, conf.mqtt.client, conf.mqtt.user, conf.mqtt.pass),
conf_ok = true; Err(_) => {}
},
Err(_) => {
conf = Settings {
mqtt: SettingsMQTT {host:String::new(),port:0,client:String::new(),user:String::new(),pass:String::new()}
};
conf_ok = false;
}
} }
if conf_ok { println!("INFO : main: exit");
let mut mqttoptions = MqttOptions::new(conf.mqtt.client, conf.mqtt.host, conf.mqtt.port);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass);
let (client, mut connection) = Client::new(mqttoptions, 10);
// thread publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
.spawn(move || {
loop {
let message = mqtt_publish_rx.recv();
match message {
Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e),
Ok(msg) => {
// println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
}
}
}
}
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
// treath mqtt clock
let mqtt_clock_stat = thread::Builder::new()
.name("mqtt_clock".to_string())
.spawn(move || {
mqtt_clock(mqtt_publish);
});
match mqtt_clock_stat {
Err(_n) => println!("ERROR: main: faild to start mqtt clock thread"),
Ok(_n) => {}
}
for (_i, notification) in connection.iter().enumerate() {
let mut delay: bool = false;
match notification {
Err(e) => match e {
rumqttc::ConnectionError::MqttState(state) => match state {
rumqttc::StateError::Io(e) => {
println!("ERROR: mqtt: Io ({}) {}", e.kind(), e);
delay = true;
},
rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"),
rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e),
rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"),
rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"),
rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"),
rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"),
rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e),
rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max }
=> println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max),
},
rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"),
rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"),
rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()),
rumqttc::ConnectionError::ConnectionRefused(code) => {
println!("ERROR: mqtt: ConnectionRefused {:?}", code);
delay = true;
},
rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet),
rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"),
rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error),
},
Ok(event) => {
match event {
Event::Outgoing(n) => match n {
Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"),
Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"),
Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"),
Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"),
Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"),
Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"),
Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"),
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
},
Event::Incoming(n) => match n {
Packet::Connect(_) => println!("INFO : mqtt: connected"),
Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"),
Packet::Publish(_) => {},
Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"),
Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"),
Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"),
Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"),
Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"),
Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"),
Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"),
Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"),
Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"),
Packet::Disconnect => println!("INFO : mqtt: disconected"),
}
}
}
}
if delay {
thread::sleep(Duration::from_millis(500));
}
}
println!("INFO : main: exit");
}
} }