Compare commits

..

6 Commits
v1.0.1 ... main

Author SHA1 Message Date
a5bca9058a
fix time format 2025-05-05 16:33:56 +02:00
ed1d701d94
add date and time object publishes 2025-05-05 16:19:25 +02:00
77e45e1d2f
bumb mqttClient lib version 2025-01-31 21:18:55 +01:00
1edc7870c4
bump version 2025-01-30 11:59:57 +01:00
2d7c7a76a9
use mqttClient library 2025-01-30 11:48:28 +01:00
92699cb37f
fix double double client bug 2025-01-25 14:07:53 +01:00
5 changed files with 195 additions and 184 deletions

12
Cargo.lock generated
View File

@ -340,12 +340,22 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "mqtt-client"
version = "4.0.0"
source = "git+https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git?tag=v4.0.0#49e8adf2eb768fcd147f0d6508a2f32eed86a641"
dependencies = [
"crossbeam",
"rumqttc",
]
[[package]] [[package]]
name = "mqttClock" name = "mqttClock"
version = "1.0.0" version = "1.3.0"
dependencies = [ dependencies = [
"chrono", "chrono",
"crossbeam", "crossbeam",
"mqtt-client",
"rumqttc", "rumqttc",
"serde", "serde",
"serde_yaml", "serde_yaml",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "mqttClock" name = "mqttClock"
version = "1.0.0" version = "1.3.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
@ -9,3 +9,4 @@ crossbeam = "0.8.4"
rumqttc = "0.24.0" rumqttc = "0.24.0"
serde = { version = "1.0.217", features = ["derive"] } serde = { version = "1.0.217", features = ["derive"] }
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
mqtt-client = { tag = "v4.0.0", git = "https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git" }

7
build.sh Normal file
View File

@ -0,0 +1,7 @@
#!/bin/sh
cross build --target aarch64-unknown-linux-gnu --release
cargo build --release
cp target/aarch64-unknown-linux-gnu/release/mqttClock mqttClock-aarch64
cp target/release/mqttClock mqttClock-x86_64

View File

@ -1,5 +1,6 @@
mqtt: mqtt:
host: "localhost" host: "localhost"
port: 1883 port: 1883
client: "mqttClock"
user: "mqttClock" user: "mqttClock"
pass: "password" pass: "password"

View File

@ -1,101 +1,175 @@
use std::time::Duration; use std::{fs, time::Duration};
use std::thread;
use std::fs;
use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS};
use chrono::{Local, Timelike, Datelike};
use crossbeam::channel::{unbounded, Sender};
use serde::Deserialize; use serde::Deserialize;
use chrono::{Local, Timelike, Datelike};
struct MqttMessage { use mqtt_client::{mqtt_client, MqttMessage, MqttEvent, Sender, Receiver, QoS, Client, MqttTool};
topic: String,
payload: String
}
fn send_publish(publish: &Sender<MqttMessage>, message: MqttMessage) { fn get_u16_from_i32(value: i32) -> Option<u16> {
match publish.send(message) { match u16::try_from(value) {
Err(n) => println!("ERROR: faild to send publish ({:?})", n), Ok(n) => Some(n),
Ok(_n) => {} Err(_) => None
} }
} }
fn mqtt_clock(publish: Sender<MqttMessage>) { fn get_u16_from_u32(value: u32) -> Option<u16> {
// init last values with invalid values match u16::try_from(value) {
let mut last_year: i32 = 65535; Ok(n) => Some(n),
let mut last_month: u32 = 65535; Err(_) => None
let mut last_dom: u32 = 65535; }
let mut last_dow: u32 = 65535; }
let mut last_iso_week: u32 = 65535;
let mut last_iso_year: i32 = 65535;
let mut last_hour: u32 = 65535;
let mut last_minute: u32 = 65535;
let mut last_second: u32 = 65535;
loop { struct Clock {
let datetime = Local::now(); tx: Sender<MqttMessage>,
if last_second != datetime.second() { // client: Client,
last_second = datetime.second(); connected: bool,
send_publish(&publish, { MqttMessage {
topic: String::from("clock/time/second"), last_year: Option<u16>,
payload: last_second.to_string() last_month: Option<u16>,
}}); last_dom: Option<u16>,
if last_minute != datetime.minute() { last_dow: Option<u16>,
last_minute = datetime.minute(); last_iso_week: Option<u16>,
send_publish(&publish, { MqttMessage { last_iso_year: Option<u16>,
topic: String::from("clock/time/minute"), last_hour: Option<u16>,
payload: last_minute.to_string() last_minute: Option<u16>,
}}); last_second: Option<u16>
if last_hour != datetime.hour() { }
last_hour = datetime.hour(); impl Clock {
send_publish(&publish, { MqttMessage { fn tx(&self, topic: String, value: Option<u16>, retain: bool) {
topic: String::from("clock/time/hour"), match value {
payload: last_hour.to_string() None => {},
}}); Some(v) => {
if last_dom != datetime.day() { let message = MqttMessage {
last_dom = datetime.day(); topic,
send_publish(&publish, { MqttMessage {topic: String::from("clock/date/dom"), payload: v.to_string(),
payload: last_dom.to_string() retain: retain,
}}); qos: QoS::AtMostOnce,
if last_iso_week != datetime.iso_week().week() { };
last_iso_week = datetime.iso_week().week(); match self.tx.send(message) {
send_publish(&publish, { MqttMessage { Err(n) => println!("ERROR: faild to send publish ({:?})", n),
topic: String::from("clock/date/isoWeek"), Ok(_) => {}
payload: last_iso_week.to_string()
}});
if last_iso_year != datetime.iso_week().year() {
last_iso_year = datetime.iso_week().year();
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/isoYear"),
payload: last_iso_year.to_string()
}});
}
}
if last_dow != datetime.weekday() as u32 {
last_dow = datetime.weekday() as u32;
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/dow"),
payload: last_dow.to_string()
}});
}
if last_month != datetime.month() {
last_month = datetime.month();
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/month"),
payload: last_month.to_string()
}});
if last_year != datetime.year() {
last_year = datetime.year();
send_publish(&publish, { MqttMessage {
topic: String::from("clock/date/year"),
payload: last_year.to_string()
}});
}
}
}
} }
} }
} }
thread::sleep(Duration::from_millis(500)); }
fn tx_time(&self) {
let second = self.last_second;
let minute = self.last_minute;
let hour = self.last_hour;
if let (Some(second), Some(minute), Some(hour)) = (second, minute, hour) {
let payload =
format!("{{\"second\":{},\"minute\":{},\"hour\":{},\"time\":\"{}:{:02}:{:02}\"}}",
second, minute, hour, hour, minute, second);
let message = MqttMessage {
topic: String::from("clock/time/time"),
payload,
retain: false,
qos: QoS::AtMostOnce,
};
match self.tx.send(message) {
Err(n) =>
println!("ERROR: faild to send publish ({:?})", n),
Ok(_) => {}
}
}
}
fn time(&mut self) {
let datetime = Local::now();
if self.last_second != get_u16_from_u32(datetime.second()) {
self.last_second = get_u16_from_u32(datetime.second());
if self.last_minute != get_u16_from_u32(datetime.minute()) {
self.last_minute = get_u16_from_u32(datetime.minute());
if self.last_hour != get_u16_from_u32(datetime.hour()) {
self.last_hour = get_u16_from_u32(datetime.hour());
self.tx(String::from("clock/time/hour"), self.last_hour, true);
self.date(datetime);
}
self.tx(String::from("clock/time/minute"), self.last_minute, false);
}
self.tx(String::from("clock/time/second"), self.last_second, false);
self.tx_time();
}
}
fn date(&mut self, datetime: chrono::DateTime<Local>) {
if self.last_dom != get_u16_from_u32(datetime.day()) {
self.last_dom = get_u16_from_u32(datetime.day());
if self.last_iso_week != get_u16_from_u32(datetime.iso_week().week()) {
self.last_iso_week = get_u16_from_u32(datetime.iso_week().week());
if self.last_iso_year != get_u16_from_i32(datetime.iso_week().year()) {
self.last_iso_year = get_u16_from_i32(datetime.iso_week().year());
self.tx(String::from("clock/date/isoYear"), self.last_iso_year, true);
}
self.tx(String::from("clock/date/isoWeek"), self.last_iso_week, true);
}
self.tx(String::from("clock/date/dom"), self.last_dom, true);
if self.last_month != get_u16_from_u32(datetime.month()) {
self.last_month = get_u16_from_u32(datetime.month());
if self.last_year != get_u16_from_i32(datetime.year()) {
self.last_year = get_u16_from_i32(datetime.year());
self.tx(String::from("clock/date/year"), self.last_year, true);
}
self.tx(String::from("clock/date/month"), self.last_month, true);
}
self.last_dow = Some(datetime.weekday() as u16);
self.tx(String::from("clock/date/dow"), self.last_dow, true);
}
}
fn init(&mut self) {
self.last_year = None;
self.last_month = None;
self.last_dom = None;
self.last_dow = None;
self.last_iso_week = None;
self.last_iso_year = None;
self.last_hour = None;
}
}
impl MqttTool<i8> for Clock {
fn new(tx: Sender<MqttMessage>, _config: i8, _client: Client) -> Clock {
Clock {
tx,
// client,
connected: false,
last_year: None,
last_month: None,
last_dom: None,
last_dow: None,
last_iso_week: None,
last_iso_year: None,
last_hour: None,
last_minute: None,
last_second: None
}
}
fn run(&mut self, event: Receiver<MqttEvent>) {
loop {
match event.recv_timeout(Duration::from_millis(500)) {
Ok(e) => {
match e {
MqttEvent::Connected => {
self.init();
self.connected = true;
},
MqttEvent::Disconnected => {
self.connected = false;
},
MqttEvent::Message(_) => {/* nothing to do, no subscriptions */}
}
},
Err(_) => {},
}
if self.connected {
self.time();
}
}
} }
} }
@ -104,6 +178,7 @@ fn mqtt_clock(publish: Sender<MqttMessage>) {
struct SettingsMQTT { struct SettingsMQTT {
host: String, host: String,
port: u16, port: u16,
client: String,
user: String, user: String,
pass: String pass: String
} }
@ -151,103 +226,20 @@ fn read_config() -> Result<Settings,SettingsError> {
} }
fn main() { fn main() {
let (mqtt_publish, mqtt_publish_rx) = unbounded::<MqttMessage>();
// get setting // get setting
let conf_ok: bool;
let conf: Settings;
match read_config() { match read_config() {
Ok(n) => { Ok(conf) =>
conf = n; mqtt_client::run::<i8, Clock>(
conf_ok = true; conf.mqtt.host,
}, conf.mqtt.port,
Err(_) => { conf.mqtt.client,
conf = Settings { conf.mqtt.user,
mqtt: SettingsMQTT {host:String::new(),port:0,user:String::new(),pass:String::new()} conf.mqtt.pass,
}; 0
conf_ok = false; ),
} Err(_) => {}
} }
if conf_ok { println!("INFO : main: exit");
let mut mqttoptions = MqttOptions::new("rumqtt-sync", conf.mqtt.host, conf.mqtt.port);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass);
let (client, mut connection) = Client::new(mqttoptions, 10);
// thread publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
.spawn(move || {
loop {
let message = mqtt_publish_rx.recv();
match message {
Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e),
Ok(msg) => {
// println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
}
}
}
}
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
// treath mqtt clock
let mqtt_clock_stat = thread::Builder::new()
.name("mqtt_clock".to_string())
.spawn(move || {
mqtt_clock(mqtt_publish);
});
match mqtt_clock_stat {
Err(_n) => println!("ERROR: main: faild to start mqtt clock thread"),
Ok(_n) => {}
}
for (_i, notification) in connection.iter().enumerate() {
match notification {
Err(e) => println!("ERROR: mqtt: {}", e),
Ok(event) => {
match event {
Event::Outgoing(n) => match n {
Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"),
Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"),
Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"),
Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"),
Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"),
Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"),
Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"),
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
},
Event::Incoming(n) => match n {
Packet::Connect(_) => println!("INFO : mqtt: connected"), //println!("INFO : mqtt_recive: in Connect"),
Packet::ConnAck(_) => println!("INFO : mqtt: conn ack"), //println!("INFO : mqtt_recive: in ConnAck"),
Packet::Publish(_) => {},
Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"),
Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"),
Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"),
Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"),
Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"),
Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"),
Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"),
Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"),
Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"),
Packet::Disconnect => println!("INFO : mqtt: disconected"),
}
}
}
}
}
println!("INFO : main: exit");
}
} }