From eb46bec9d2602ce048b573e0a81b34f0491474d9 Mon Sep 17 00:00:00 2001 From: LailaTheElf Date: Wed, 22 Jan 2025 15:47:07 +0100 Subject: [PATCH] create first automation --- src/main.rs | 174 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 113 insertions(+), 61 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1fd8d4c..b288735 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,37 @@ -use std::time::Duration; +use std::{time::Duration}; use std::thread; -use rumqttc::{MqttOptions, Client, QoS}; +use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS}; use chrono::{Local, Timelike, Datelike}; -use crossbeam::channel::unbounded; +use crossbeam::channel::{unbounded, Sender}; struct MqttMessage { topic: String, payload: String } -fn mqtt_clock(publish: crossbeam::channel::Sender) { + +fn send_publish(publish: &Sender, message: MqttMessage) { + match publish.send(message) { + Err(n) => println!("ERROR: faild to send publish ({:?})", n), + Ok(_n) => {} + } +} + +fn lamp_on(publish: &Sender) { + send_publish(publish, { MqttMessage { + topic: String::from("/cool/devices/lamp-01/set"), + payload: String::from("ON") + }}); +} + +fn mqtt_automation(publish: &Sender, message: MqttMessage) { + if message.topic.eq("clock/hour") && message.payload.eq("16") { + lamp_on(publish); + } +} + +fn mqtt_clock(publish: Sender) { // let clock_topic: String = String::from("clock"); // init last values with invalid values @@ -27,100 +48,68 @@ fn mqtt_clock(publish: crossbeam::channel::Sender) { loop { let datetime = Local::now(); - println!("DEBUG: mqtt_clock: {}", datetime.format("%Y-%m-%d %H:%M:%S")); - println!("DEBUG: mqtt_clock: last_second={}", last_second); if last_second != datetime.second() { last_second = datetime.second(); - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/second"), payload: last_second.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send second"), - Ok(_n) => println!("DEBUG: mqtt_clock: send second") - } + }}); if last_minute != datetime.minute() { last_minute = datetime.minute(); - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/minute"), payload: last_minute.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send minute"), - Ok(_n) => {} - } + }}); if last_hour != datetime.hour() { last_hour = datetime.hour(); // last_hour12 = datetime.hour12(); - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/hour"), payload: last_hour.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send hour"), - Ok(_n) => {} - } - // match publish.send({ MqttMessage { + }}); + // send_publish(&publish, { MqttMessage { // topic: String::from("clock/hour12"), // payload: last_hour12.to_string() - // }}) { - // Err(_n) => println!("ERROR: mqtt_clock: faild to send hour12"), - // Ok(_n) => {} - // } + // }}); if last_dom != datetime.day() { last_dom = datetime.day(); - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/dom"), payload: last_dom.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send dom"), - Ok(_n) => {} - } + }}); if last_iso_week != datetime.iso_week().week() { last_iso_week = datetime.iso_week().week(); - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/isoWeek"), payload: last_iso_week.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send iso week"), - Ok(_n) => {} - } + }}); if last_iso_year != datetime.iso_week().year() { last_iso_year = datetime.iso_week().year(); - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/isoYear"), payload: last_iso_year.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send iso year"), - Ok(_n) => {} - } + }}); } } if last_dow != datetime.weekday() as u32 { last_dow = datetime.weekday() as u32; - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/dow"), payload: last_dow.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send dow"), - Ok(_n) => {} - } + }}); } if last_month != datetime.month() { last_month = datetime.month(); - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/month"), payload: last_month.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send month"), - Ok(_n) => {} - } + }}); if last_year != datetime.year() { last_year = datetime.year(); - match publish.send({ MqttMessage { + send_publish(&publish, { MqttMessage { topic: String::from("clock/year"), payload: last_year.to_string() - }}) { - Err(_n) => println!("ERROR: mqtt_clock: faild to send year"), - Ok(_n) => {} - } + }}); } } } @@ -131,15 +120,66 @@ fn mqtt_clock(publish: crossbeam::channel::Sender) { } } +fn mqtt_recive(publish: &Sender, e: Event) { + match e { + Event::Outgoing(n) => match n { + + Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), + Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), + Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), + Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), + Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), + Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), + Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), + Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), + Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), + Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), + Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") + }, + Event::Incoming(n) => match n { + Packet::Connect(_connect) => {}, //println!("INFO : mqtt_recive: in Connect"), + Packet::ConnAck(_connack) => {}, //println!("INFO : mqtt_recive: in ConnAck"), + Packet::Publish(msg) => { + // println!("INFO : mqtt_recive: in msg {:?}", msg); + + let payload = match String::from_utf8(msg.payload.to_vec()) { + Err(e) => panic!("ERROR: pqtt_recive: faild to decode payload ({})", e), + Ok(v) => v + }; + + let message: MqttMessage = { MqttMessage { + topic: msg.topic, + payload: payload + }}; + + mqtt_automation(&publish, message); + }, + Packet::PubAck(_puback) => {}, //println!("INFO : mqtt_recive: in PubAck"), + Packet::PubRec(_pubrec) => {}, //println!("INFO : mqtt_recive: in PubRec"), + Packet::PubRel(_pubrel) => {}, //println!("INFO : mqtt_recive: in PubRel"), + Packet::PubComp(_pubcomp) => {}, //println!("INFO : mqtt_recive: in PubComp"), + Packet::Subscribe(_subscribe) => {}, //println!("INFO : mqtt_recive: in Subscribe"), + Packet::SubAck(_suback) => {}, //println!("INFO : mqtt_recive: in SubAck"), + Packet::Unsubscribe(_unsubscribe) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), + Packet::UnsubAck(_unsuback) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), + Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), + Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), + Packet::Disconnect => {}, //println!("INFO : mqtt_recive in Disconnect:") + } + } +} fn main() { let (mqtt_publish, mqtt_publish_rx) = unbounded::(); let mut mqttoptions = MqttOptions::new("rumqtt-sync", "10.1.2.2", 1883); mqttoptions.set_keep_alive(Duration::from_secs(5)); - let (client, _connection) = Client::new(mqttoptions, 10); - // client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap(); - + mqttoptions.set_credentials("rustClient", "test"); + let (client, mut connection) = Client::new(mqttoptions, 10); + + client.subscribe("/cool/devices/#", QoS::AtMostOnce).unwrap(); + client.subscribe("clock/#", QoS::AtMostOnce).unwrap(); + // treath publisher let publisher = thread::Builder::new() .name("publisher".to_string()) @@ -149,7 +189,7 @@ fn main() { match message { Err(_err) => println!("ERROR: publisher: faild to receve an message"), Ok(msg) => { - println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload); + println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload); match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { Err(_n) => println!("ERROR: publisher: faild to publish"), Ok(_n) => {} @@ -158,6 +198,10 @@ fn main() { } } }); + match publisher { + Err(_n) => println!("ERROR: main: fait to create publisher thread"), + Ok(_n) => {} + } // treath mqtt clock let mqtt_publish_clock = mqtt_publish.clone(); @@ -168,6 +212,14 @@ fn main() { }); match mqtt_clock_stat { Err(_n) => println!("ERROR: mqtt clock: faild to start thread"), - Ok(n) => n.join().expect("msg") + Ok(_n) => {} + } + + for (_i, notification) in connection.iter().enumerate() { + // println!("Notification = {:?}", notification); + match notification { + Err(n) => println!("ERROR: mqtt: {}", n), + Ok(n) => mqtt_recive(&mqtt_publish, n) + } } }