create first automation

This commit is contained in:
Laila van Reenen 2025-01-22 15:47:07 +01:00
parent d77a3da68d
commit eb46bec9d2
Signed by: LailaTheElf
GPG Key ID: 8A3EF0226518C12D

View File

@ -1,16 +1,37 @@
use std::time::Duration;
use std::{time::Duration};
use std::thread;
use rumqttc::{MqttOptions, Client, QoS};
use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS};
use chrono::{Local, Timelike, Datelike};
use crossbeam::channel::unbounded;
use crossbeam::channel::{unbounded, Sender};
struct MqttMessage {
topic: String,
payload: String
}
fn mqtt_clock(publish: crossbeam::channel::Sender<MqttMessage>) {
fn send_publish(publish: &Sender<MqttMessage>, message: MqttMessage) {
match publish.send(message) {
Err(n) => println!("ERROR: faild to send publish ({:?})", n),
Ok(_n) => {}
}
}
fn lamp_on(publish: &Sender<MqttMessage>) {
send_publish(publish, { MqttMessage {
topic: String::from("/cool/devices/lamp-01/set"),
payload: String::from("ON")
}});
}
fn mqtt_automation(publish: &Sender<MqttMessage>, message: MqttMessage) {
if message.topic.eq("clock/hour") && message.payload.eq("16") {
lamp_on(publish);
}
}
fn mqtt_clock(publish: Sender<MqttMessage>) {
// let clock_topic: String = String::from("clock");
// init last values with invalid values
@ -27,100 +48,68 @@ fn mqtt_clock(publish: crossbeam::channel::Sender<MqttMessage>) {
loop {
let datetime = Local::now();
println!("DEBUG: mqtt_clock: {}", datetime.format("%Y-%m-%d %H:%M:%S"));
println!("DEBUG: mqtt_clock: last_second={}", last_second);
if last_second != datetime.second() {
last_second = datetime.second();
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/second"),
payload: last_second.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send second"),
Ok(_n) => println!("DEBUG: mqtt_clock: send second")
}
}});
if last_minute != datetime.minute() {
last_minute = datetime.minute();
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/minute"),
payload: last_minute.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send minute"),
Ok(_n) => {}
}
}});
if last_hour != datetime.hour() {
last_hour = datetime.hour();
// last_hour12 = datetime.hour12();
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/hour"),
payload: last_hour.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send hour"),
Ok(_n) => {}
}
// match publish.send({ MqttMessage {
}});
// send_publish(&publish, { MqttMessage {
// topic: String::from("clock/hour12"),
// payload: last_hour12.to_string()
// }}) {
// Err(_n) => println!("ERROR: mqtt_clock: faild to send hour12"),
// Ok(_n) => {}
// }
// }});
if last_dom != datetime.day() {
last_dom = datetime.day();
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/dom"),
payload: last_dom.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send dom"),
Ok(_n) => {}
}
}});
if last_iso_week != datetime.iso_week().week() {
last_iso_week = datetime.iso_week().week();
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/isoWeek"),
payload: last_iso_week.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send iso week"),
Ok(_n) => {}
}
}});
if last_iso_year != datetime.iso_week().year() {
last_iso_year = datetime.iso_week().year();
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/isoYear"),
payload: last_iso_year.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send iso year"),
Ok(_n) => {}
}
}});
}
}
if last_dow != datetime.weekday() as u32 {
last_dow = datetime.weekday() as u32;
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/dow"),
payload: last_dow.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send dow"),
Ok(_n) => {}
}
}});
}
if last_month != datetime.month() {
last_month = datetime.month();
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/month"),
payload: last_month.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send month"),
Ok(_n) => {}
}
}});
if last_year != datetime.year() {
last_year = datetime.year();
match publish.send({ MqttMessage {
send_publish(&publish, { MqttMessage {
topic: String::from("clock/year"),
payload: last_year.to_string()
}}) {
Err(_n) => println!("ERROR: mqtt_clock: faild to send year"),
Ok(_n) => {}
}
}});
}
}
}
@ -131,15 +120,66 @@ fn mqtt_clock(publish: crossbeam::channel::Sender<MqttMessage>) {
}
}
fn mqtt_recive(publish: &Sender<MqttMessage>, e: Event) {
match e {
Event::Outgoing(n) => match n {
Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"),
Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"),
Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"),
Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"),
Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"),
Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"),
Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"),
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
},
Event::Incoming(n) => match n {
Packet::Connect(_connect) => {}, //println!("INFO : mqtt_recive: in Connect"),
Packet::ConnAck(_connack) => {}, //println!("INFO : mqtt_recive: in ConnAck"),
Packet::Publish(msg) => {
// println!("INFO : mqtt_recive: in msg {:?}", msg);
let payload = match String::from_utf8(msg.payload.to_vec()) {
Err(e) => panic!("ERROR: pqtt_recive: faild to decode payload ({})", e),
Ok(v) => v
};
let message: MqttMessage = { MqttMessage {
topic: msg.topic,
payload: payload
}};
mqtt_automation(&publish, message);
},
Packet::PubAck(_puback) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_pubrec) => {}, //println!("INFO : mqtt_recive: in PubRec"),
Packet::PubRel(_pubrel) => {}, //println!("INFO : mqtt_recive: in PubRel"),
Packet::PubComp(_pubcomp) => {}, //println!("INFO : mqtt_recive: in PubComp"),
Packet::Subscribe(_subscribe) => {}, //println!("INFO : mqtt_recive: in Subscribe"),
Packet::SubAck(_suback) => {}, //println!("INFO : mqtt_recive: in SubAck"),
Packet::Unsubscribe(_unsubscribe) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"),
Packet::UnsubAck(_unsuback) => {}, //println!("INFO : mqtt_recive: in UnsubAck"),
Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"),
Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"),
Packet::Disconnect => {}, //println!("INFO : mqtt_recive in Disconnect:")
}
}
}
fn main() {
let (mqtt_publish, mqtt_publish_rx) = unbounded::<MqttMessage>();
let mut mqttoptions = MqttOptions::new("rumqtt-sync", "10.1.2.2", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (client, _connection) = Client::new(mqttoptions, 10);
// client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
mqttoptions.set_credentials("rustClient", "test");
let (client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("/cool/devices/#", QoS::AtMostOnce).unwrap();
client.subscribe("clock/#", QoS::AtMostOnce).unwrap();
// treath publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
@ -149,7 +189,7 @@ fn main() {
match message {
Err(_err) => println!("ERROR: publisher: faild to receve an message"),
Ok(msg) => {
println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload);
println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
@ -158,6 +198,10 @@ fn main() {
}
}
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
// treath mqtt clock
let mqtt_publish_clock = mqtt_publish.clone();
@ -168,6 +212,14 @@ fn main() {
});
match mqtt_clock_stat {
Err(_n) => println!("ERROR: mqtt clock: faild to start thread"),
Ok(n) => n.join().expect("msg")
Ok(_n) => {}
}
for (_i, notification) in connection.iter().enumerate() {
// println!("Notification = {:?}", notification);
match notification {
Err(n) => println!("ERROR: mqtt: {}", n),
Ok(n) => mqtt_recive(&mqtt_publish, n)
}
}
}