Compare commits

...

15 Commits
v1.1.0 ... main

11 changed files with 1147 additions and 313 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
/target /target
/mqttAutomation-*

222
Cargo.lock generated
View File

@ -17,6 +17,21 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.4.0" version = "1.4.0"
@ -44,6 +59,12 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36"
[[package]]
name = "bumpalo"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.9.0" version = "1.9.0"
@ -65,6 +86,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-link",
]
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.4" version = "0.9.4"
@ -208,6 +243,30 @@ version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
[[package]]
name = "iana-time-zone"
version = "0.1.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.7.1" version = "2.7.1"
@ -224,6 +283,16 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "js-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]] [[package]]
name = "json" name = "json"
version = "0.12.4" version = "0.12.4"
@ -279,16 +348,36 @@ dependencies = [
] ]
[[package]] [[package]]
name = "mqttAutomation" name = "mqtt-client"
version = "1.1.0" version = "4.0.0"
source = "git+https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git?tag=v4.0.0#49e8adf2eb768fcd147f0d6508a2f32eed86a641"
dependencies = [ dependencies = [
"crossbeam",
"rumqttc",
]
[[package]]
name = "mqttAutomation"
version = "1.3.2"
dependencies = [
"chrono",
"crossbeam", "crossbeam",
"json", "json",
"mqtt-client",
"rumqttc", "rumqttc",
"serde", "serde",
"serde_yaml", "serde_yaml",
] ]
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "object" name = "object"
version = "0.36.7" version = "0.36.7"
@ -298,6 +387,12 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "once_cell"
version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]] [[package]]
name = "openssl-probe" name = "openssl-probe"
version = "0.1.5" version = "0.1.5"
@ -426,6 +521,12 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "rustversion"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.18" version = "1.0.18"
@ -636,6 +737,123 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
dependencies = [
"bumpalo",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
]
[[package]]
name = "windows-core"
version = "0.61.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-implement"
version = "0.60.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-interface"
version = "0.59.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-link"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
[[package]]
name = "windows-result"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97"
dependencies = [
"windows-link",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.52.0" version = "0.52.0"

View File

@ -1,6 +1,6 @@
[package] [package]
name = "mqttAutomation" name = "mqttAutomation"
version = "1.1.0" version = "1.3.2"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
@ -9,3 +9,5 @@ json = "0.12.4"
rumqttc = "0.24.0" rumqttc = "0.24.0"
serde = { version = "1.0.217", features = ["derive"] } serde = { version = "1.0.217", features = ["derive"] }
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
mqtt-client = { tag = "v4.0.0", git = "https://gitea.finnvanreenen.nl/LailaTheElf/mqttClient.git" }
chrono = "0.4.41"

9
build.sh Normal file
View File

@ -0,0 +1,9 @@
#!/bin/sh
set -e
cross build --target aarch64-unknown-linux-gnu --release
cargo build --release
cp target/aarch64-unknown-linux-gnu/release/mqttAutomation mqttAutomation-aarch64
cp target/release/mqttAutomation mqttAutomation-x86_64

View File

@ -4,3 +4,22 @@ mqtt:
client: "mqttAutomation01" client: "mqttAutomation01"
user: "mqttAutomation" user: "mqttAutomation"
pass: "password" pass: "password"
automation:
base_topic: "/kees/automation/"
alarm_hour: 7
alarm_minute: 0
# auto:
# - trigger:
# !Time
# date:
# weekday: "0,1,2,3,4"
# time:
# hour: "7"
# minute: "0"
# seconds: "0"
# actions:
# - !Publish
# type: publish
# topic: "/cool/devices/lamp-01/set"
# pauload: "ON"

View File

@ -0,0 +1,66 @@
mod triggers;
pub mod automation {
use std::collections::HashMap;
use serde::Deserialize;
pub use crate::automation::auto::triggers::triggers;
mod actions {
use serde::Deserialize;
#[derive(Deserialize, Clone)]
struct Publish {
topic: String,
payload: Option<String>,
retain: Option<bool>
}
#[derive(Deserialize, Clone)]
pub enum Action {
Publish(Publish),
}
}
#[derive(Deserialize, Clone)]
pub struct Automation {
pub trigger: triggers::Trigger,
pub actions: Vec<actions::Action>
}
pub struct Generator {
mqtt_triggers: HashMap<String, triggers::Mqtt>,
time_triggers: HashMap<String, triggers::Datetime>,
}
impl Generator {
pub fn new() -> Generator {
Generator {
mqtt_triggers: HashMap::new(),
time_triggers: HashMap::new()
}
}
pub fn read_automations(&mut self, automations: Vec<Automation>) {
for auto in automations {
self.read_automation_single(auto);
}
}
fn read_automation_single(&mut self, auto: Automation) {
// let auto_cloned = auto.clone();
match auto.trigger {
triggers::Trigger::Mqtt(mqtt) => {
self.mqtt_triggers.insert(mqtt.topic.clone(), mqtt.clone());
},
triggers::Trigger::Time(_datetime) => {
// self.time_triggers.insert(mqtt.topic, auto_cloned);
todo!()
}
}
}
}
}

View File

@ -0,0 +1,39 @@
mod datetime;
pub mod triggers {
use serde::Deserialize;
pub trait TriggerBase {
fn get_next_trigger(&self) -> String;
}
#[derive(Deserialize, Clone)]
enum StringOrU16 {
String(String),
U16(u16)
}
// ===== mqtt
#[derive(Deserialize, Clone)]
pub struct Mqtt {
pub topic: String,
pub payload: Option<String>
}
impl TriggerBase for Mqtt {
fn get_next_trigger(&self) -> String {
self.topic.clone()
}
}
// ===== datetime
pub use crate::automation::auto::triggers::datetime::trigger_datetime::Datetime;
#[derive(Deserialize, Clone)]
pub enum Trigger {
Mqtt(Mqtt),
Time(Datetime),
}
}

View File

@ -0,0 +1,316 @@
pub mod trigger_datetime {
use serde::Deserialize;
use crate::automation::auto::triggers::triggers::TriggerBase;
use chrono::{DateTime, Datelike, Local, NaiveDate, TimeZone, Timelike};
fn get_u16_from_u32(value: u32) -> Option<u16> {
match u16::try_from(value) {
Ok(n) => Some(n),
Err(_) => None
}
}
fn get_u16_from_i32(value: i32) -> Option<u16> {
match u16::try_from(value) {
Ok(n) => Some(n),
Err(_) => None
}
}
#[derive(Debug)]
pub enum CompareError {
InvalidValue,
DivNaN
}
#[derive(Deserialize, Clone)]
struct Date {
year: Option<String>,
month: Option<String>,
date: Option<String>,
weekday: Option<String>
}
#[derive(Deserialize, Clone)]
struct Time {
hour: Option<String>,
minute: Option<String>,
second: Option<String>
}
#[derive(Deserialize, Clone)]
pub struct Datetime {
date: Date,
time: Time
}
impl Datetime {
pub fn first_match(condition: String, start: u16, end: u16) -> Result<Option<u16>, CompareError> {
if condition == "*" {
return Ok(Some(start));
}
if condition.starts_with("*/") {
let div = condition[2..].to_string();
let mut options: Vec<u16> = (start..end).collect();
match div.parse::<u16>() {
Ok(n) => {
options.retain(|x| x % n == 0);
},
Err(_) => {
return Err(CompareError::DivNaN);
},
}
let min = options.iter().min();
match min {
Some(min) => {
return Ok(Some(min.clone()));
},
None => {
return Ok(None);
},
}
}
if condition.split(',').count() > 0 {
let parts = condition.split(',');
let mut options: Vec<u16> = [].to_vec();
for part in parts {
match part.parse::<u16>() {
Ok(n) => {
options.push(n);
},
Err(_) => {
return Err(CompareError::DivNaN);
},
}
}
options.retain(|x| *x >= start && *x <= end);
let min = options.iter().min();
match min {
Some(min) => {
return Ok(Some(min.clone()));
},
None => {
return Ok(None);
},
}
}
match condition.parse::<u16>() {
Ok(n) => {
if n >= start && n <= end {
return Ok(Some(n))
}
else {
return Ok(None);
}
},
Err(_) => {
return Err(CompareError::InvalidValue);
},
}
}
fn check_thing(&self, option: String, start: u16, end: u16) -> Option<u16> {
match self::Datetime::first_match(option, start, end) {
Ok(r) => {
return r
},
Err(e) => {
println!("ERROR: trigger.datetime.find_next: failed to parse month: {:?}", e);
return None
},
}
}
fn get_days_in_month(date: &DateTime<Local>) -> u16 {
let year = date.year();
let month = date.month();
let date = match NaiveDate::from_ymd_opt(year, month + 1, 1) {
Some(d) => d,
None => NaiveDate::from_ymd_opt(year + 1, 1, 1).unwrap()
};
let d = date.signed_duration_since(NaiveDate::from_ymd_opt(year, month, 1).unwrap());
u16::try_from(d.num_days()).unwrap()
}
fn find_next_trigger(&self, from: DateTime<Local>) -> Option<DateTime<Local>> {
let mut next = from.clone();
// year
match &self.date.year {
Some(y) => {
match self::Datetime::first_match(y.to_string(), get_u16_from_i32(next.year()).unwrap(), u16::MAX) {
Ok(r) => {
match r {
Some(y) => {
if get_u16_from_i32(next.year()).unwrap() != y {
next = match Local::with_ymd_and_hms(&Local, i32::from(y), 1, 1, 0, 0, 0) {
chrono::offset::LocalResult::Single(d) => d,
chrono::offset::LocalResult::Ambiguous(d, _) => d,
chrono::offset::LocalResult::None => return None,
}
}
},
None => return None,
}
},
Err(e) => {
println!("ERROR: trigger.datetime.find_next: failed to parse year: {:?}", e);
return None
},
}
},
None => {},
}
// month
match &self.date.month {
Some(opt) => {
let now = get_u16_from_u32(next.month()).unwrap();
match self.check_thing(opt.to_string(), now, 12) {
Some(r) => {
if r > now {
next = match Local::with_ymd_and_hms(&Local, next.year(), u32::from(r), 1, 0, 0, 0) {
chrono::offset::LocalResult::Single(d) => d,
chrono::offset::LocalResult::Ambiguous(d, _) => d,
chrono::offset::LocalResult::None => return None,
}
}
},
None => {
next = match Local::with_ymd_and_hms(&Local, next.year()+1, 1, 1, 0, 0, 0) {
chrono::offset::LocalResult::Single(d) => d,
chrono::offset::LocalResult::Ambiguous(d, _) => d,
chrono::offset::LocalResult::None => return None
};
if (next - Local::now()).num_days() < 2*365 {
return self.find_next_trigger(next);
}
else {
return None;
}
}
}
},
None => {},
}
// date
match &self.date.date {
Some(opt) => {
let now = get_u16_from_u32(next.day()).unwrap();
match self.check_thing(opt.to_string(), now, Datetime::get_days_in_month(&next)) {
Some(r) => {
if r > now {
next = match Local::with_ymd_and_hms(&Local, next.year(), next.month(), u32::from(r), 0, 0, 0) {
chrono::offset::LocalResult::Single(d) => d,
chrono::offset::LocalResult::Ambiguous(d, _) => d,
chrono::offset::LocalResult::None => return None,
}
}
},
None => {
let m = match next.month() {
12 => 1,
munth => munth
};
let y = match m {
1 => next.year()+1,
_ => next.year()
};
next = match Local::with_ymd_and_hms(&Local, y, m, 1, 0, 0, 0) {
chrono::offset::LocalResult::Single(d) => d,
chrono::offset::LocalResult::Ambiguous(d, _) => d,
chrono::offset::LocalResult::None => return None
};
if (next - Local::now()).num_days() < 2*365 {
return self.find_next_trigger(next);
}
else {
return None;
}
}
}
},
None => {},
}
// hour
match &self.time.hour {
Some(opt) => {
let now = get_u16_from_u32(next.hour()).unwrap();
match self.check_thing(opt.to_string(), now, 23) {
Some(r) => {
if r > now {
next = match Local::with_ymd_and_hms(&Local, next.year(), next.month(), next.hour(), u32::from(r), 0, 0) {
chrono::offset::LocalResult::Single(d) => d,
chrono::offset::LocalResult::Ambiguous(d, _) => d,
chrono::offset::LocalResult::None => return None,
}
}
},
None => {
let mut d = next.day() + 1;
if d == u32::from(Datetime::get_days_in_month(&next)) {
d = 1;
}
let m = match d {
1 => next.month()+1,
_ => next.month()
};
let y = match m {
1 => next.year()+1,
_ => next.year()
};
next = match Local::with_ymd_and_hms(&Local, y, m, d, 0, 0, 0) {
chrono::offset::LocalResult::Single(d) => d,
chrono::offset::LocalResult::Ambiguous(d, _) => d,
chrono::offset::LocalResult::None => return None
};
if (next - Local::now()).num_days() < 2*365 {
return self.find_next_trigger(next);
}
else {
return None;
}
}
}
},
None => {},
}
Some(next)
}
}
impl TriggerBase for Datetime {
fn get_next_trigger(&self) -> String {
let next_trigger: Option<DateTime<Local>> = self.find_next_trigger(Local::now());
match next_trigger {
Some(next) => {
let str = next.format("%S").to_string();
print!("datetime string: {str}");
return str
},
None => {
print!("ERROR: trigger.datetime.get_next: No next trigger found");
return "None".to_string();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn datetime_first_match() {
assert_eq!(Datetime::first_match("*".to_string(), 123, 128).unwrap(), Some(123));
assert_eq!(Datetime::first_match("*/4".to_string(), 123, 128).unwrap(), Some(124));
assert_eq!(Datetime::first_match("*/10".to_string(), 123, 128).unwrap(), None);
assert_eq!(Datetime::first_match("125".to_string(), 123, 128).unwrap(), Some(125));
assert_eq!(Datetime::first_match("122,126".to_string(), 123, 128).unwrap(), Some(126));
assert_eq!(Datetime::first_match("122,129".to_string(), 123, 128).unwrap(), None);
}
}
}

80
src/automation/json.rs Normal file
View File

@ -0,0 +1,80 @@
pub mod json_parser {
pub enum Error {
Null,
InvalidType,
ConvertionFaild,
JsonParseError(String)
}
impl Error {
pub fn to_string(&self) -> String {
match self {
Error::Null => String::from("path not found"),
Error::InvalidType => String::from("invalid type"),
Error::ConvertionFaild => String::from("type convertion faild"),
Error::JsonParseError(s) => s.to_string(),
}
}
}
pub enum Json {
Value(json::JsonValue),
Text(String)
}
pub fn get_value(value: json::JsonValue, mut path: Vec<String>) -> json::JsonValue {
if path.len() == 0 {
return value
}
match value {
json::JsonValue::Object(obj) => {
let key = path[0].clone();
path.remove(0);
get_value(obj[key].clone(), path)
},
json::JsonValue::Array(a) => {
let key = path[0].clone();
match key.parse::<usize>() {
Ok(i) => {
if i < a.len() {
get_value(a[i].clone(), path)
} else {
json::JsonValue::Null
}
},
Err(_) => json::JsonValue::Null
}
},
json::JsonValue::String(_) => json::JsonValue::Null,
json::JsonValue::Short(_) => json::JsonValue::Null,
json::JsonValue::Number(_) => json::JsonValue::Null,
json::JsonValue::Boolean(_) => json::JsonValue::Null,
json::JsonValue::Null => json::JsonValue::Null,
}
}
pub fn get_u32(data: Json, path: Vec<String>) -> Result<u32,Error> {
match data {
Json::Value(value) => match get_value(value, path) {
json::JsonValue::Object(_) => Err(Error::InvalidType),
json::JsonValue::Array(_) => Err(Error::InvalidType),
json::JsonValue::String(_) => Err(Error::InvalidType),
json::JsonValue::Short(_) => Err(Error::InvalidType),
json::JsonValue::Number(num) => {
match u32::try_from(num) {
Err(_) => Err(Error::ConvertionFaild),
Ok(n) => Ok(n)
}
},
json::JsonValue::Boolean(_) => Err(Error::InvalidType),
json::JsonValue::Null => Err(Error::Null),
}
Json::Text(data) => match json::parse(&data) {
Err(e) => {
Err(Error::JsonParseError(e.to_string()))
},
Ok(value) => get_u32(Json::Value(value), path)
}
}
}
}

376
src/automation/mod.rs Normal file
View File

@ -0,0 +1,376 @@
mod json;
//AUTO mod auto;
use std::{thread, time::Duration};
use serde::Deserialize;
use mqtt_client::{MqttMessage, MqttEvent, Sender, Receiver, QoS};
use mqtt_client::mqtt_client;
use crate::automation::json::json_parser;
//AUTO use crate::automation::auto::automation::{Automation as AutoConfig, Generator};
#[derive(Deserialize)]
pub struct SettingsConf {
base_topic: String,
alarm_hour: u8,
alarm_minute: u8,
//AUTO auto: Vec<AutoConfig>
}
struct PingStats {
total: u16,
fails: u16,
avg: f32
}
fn ping_stat_update(stats: &mut PingStats, payload: String) -> Option<f32> {
let mut new_value: Option<f32> = None;
if !payload.eq("false") {
match payload.parse::<f32>() {
Ok(value) => new_value = Some(value),
Err(_) => {}
}
}
stats.total += 1;
match new_value {
Some(value) => stats.avg += (value - stats.avg) / f32::from(stats.total - stats.fails),
None => stats.fails += 1
}
new_value
}
pub struct Automation {
tx: Sender<MqttMessage>,
client: mqtt_client::Client,
clock_dow: u8,
clock_hour: u8,
clock_min: u8,
clock_sec: u8,
elfdesktop_last_on: u32,
elfdesktop_state: bool,
ping_elfdesktop: PingStats,
config: SettingsConf,
//AUTO generator: Generator
}
impl Automation {
fn tx(&self, message: MqttMessage) {
match self.tx.send(message) {
Err(n) => println!("ERROR: faild to send publish ({:?})", n),
Ok(_n) => {}
}
}
fn tx_set(&self, topic: String, state: bool) {
let payload: String;
if state {
payload = String::from("ON");
} else {
payload = String::from("OFF");
}
self.tx({ MqttMessage {
topic: topic,
payload: payload,
retain: false,
qos: mqtt_client::QoS::AtMostOnce,
}});
}
fn lamp01_set(&self, state: bool) {
self.tx_set(String::from("/kees/devices/lamp-01/set"), state);
}
fn pc_sw_set(&self, state: bool) {
self.tx_set(String::from("/kees/devices/sw-01/set"), state);
}
fn get_current_time(&self) -> u32 {
self.get_time(None, None, None, None)
}
fn get_time(&self, dow: Option<u8>, hour: Option<u8>, min: Option<u8>, sec: Option<u8>) -> u32 {
let d = match dow {
Some(n) => n,
None => self.clock_dow
};
let h = match hour {
Some(n) => n,
None => self.clock_hour
};
let m = match min {
Some(n) => n,
None => self.clock_min
};
let s = match sec {
Some(n) => n,
None => self.clock_sec
};
((u32::from(d) * 24 + u32::from(h)) * 60 + u32::from(m)) * 60 + u32::from(s)
}
fn alarm_trigger(&self) {
self.lamp01_set(true);
}
fn alarm_send_config(&self) {
self.tx({ MqttMessage {
topic: format!("{}alarm", self.config.base_topic),
payload: format!("{}:{}", self.config.alarm_hour, self.config.alarm_minute),
retain: true,
qos: mqtt_client::QoS::AtMostOnce,
}});
}
fn config_message_in(&mut self, message: MqttMessage, topic: String) {
if topic.starts_with("alarm/set") {
let mut time = message.payload.split(':');
if time.clone().count() != 2 && time.clone().count() != 3 {
println!("ERROR: config_message_in: alarm/set has invalid payload. incorect number of slices ({})", message.payload)
}
else {
let hour_str = time.next().unwrap();
let min_str = time.next().unwrap();
let mut hour: Option<u8> = None;
let mut min: Option<u8> = None;
match hour_str.parse::<u8>() {
Err(_) =>
println!("ERROR: config_message_in: alarm/set has invalid payload. hour is NaN ({})", message.payload),
Ok(n) => hour = Some(n)
}
match min_str.parse::<u8>() {
Err(_) =>
println!("ERROR: config_message_in: alarm/set has invalid payload. min is NaN ({})", message.payload),
Ok(n) => min = Some(n)
}
if hour != None && min != None {
self.config.alarm_hour = hour.unwrap();
self.config.alarm_minute = min.unwrap();
}
}
self.alarm_send_config();
}
}
fn clock_message_in(&mut self, message: MqttMessage) {
if message.topic.eq("clock/time/second") {
match message.payload.parse::<u8>() {
Err(e) =>
println!("ERROR: clock_message_in: clock/time/second has invalid payload ({:?})", e),
Ok(n) => {
self.clock_sec = n;
if n % 20 == 0 {
let json = format!(
"{{ \"deviceID\": 0, \"ping\": {}, \"pong\": {} }}",
self.ping_elfdesktop.avg,
1.0 - (f32::from(self.ping_elfdesktop.fails) / f32::from(self.ping_elfdesktop.total))
);
self.tx({ MqttMessage {
topic: String::from("/kees/db/insert/ping"),
payload: json,
retain: false,
qos: mqtt_client::QoS::AtMostOnce,
}});
self.ping_elfdesktop.total = 0;
self.ping_elfdesktop.fails = 0;
self.ping_elfdesktop.avg = 0.0;
}
}
}
}
else if message.topic.eq("clock/time/minute") {
match message.payload.parse::<u8>() {
Err(e) =>
println!("ERROR: clock_message_in: clock/time/minute has invalid payload ({:?})", e),
Ok(n) => {
self.clock_min = n;
if n == self.config.alarm_minute
&& n != 0
&& self.clock_hour == self.config.alarm_hour
&& self.clock_dow < 5 {
self.alarm_trigger();
}
}
}
}
else if message.topic.eq("clock/time/hour") {
match message.payload.parse::<u8>() {
Err(e) =>
println!("ERROR: clock_message_in: clock/time/hour has invalid payload ({:?})", e),
Ok(n) => {
self.clock_hour = n;
self.alarm_send_config();
if self.config.alarm_minute == 0 && n == self.config.alarm_hour && self.clock_dow < 5 {
self.alarm_trigger();
}
}
}
}
else if message.topic.eq("clock/date/dow") {
match message.payload.parse::<u8>() {
Err(e) =>
println!("ERROR: clock_message_in: clock/date/dow has invalid payload ({:?})", e),
Ok(n) => self.clock_dow = n
}
}
// println!("DEBUG: clock_message_in: current time: {}:{}:{}", self.clock_hour, self.clock_min, self.clock_sec);
}
fn message_in(&mut self, message: MqttMessage) {
// println!("DEBUG : mqtt_automation: {}: {}", message.topic, message.payload);
if message.topic.starts_with(&self.config.base_topic) {
let topic = message.topic[self.config.base_topic.len()..].to_string();
self.config_message_in(message, topic);
}
else if message.topic.starts_with("clock/") {
self.clock_message_in(message);
}
else if message.topic.eq("/kees/devices/KNMITemp/values") {
let payload_json = json_parser::Json::Text(message.payload);
let path = Vec::from([String::from("gr")]);
match json_parser::get_u32(payload_json, path) {
Ok(gr) => {
if gr > 30 {
let alarm_time =
self.get_time(None, Some(self.config.alarm_hour), Some(self.config.alarm_minute), Some(0));
let time_30min =
self.get_time(Some(0), Some(0), Some(29), Some(0));
let time_diff: i64 = i64::from(self.get_current_time()) - i64::from(alarm_time);
if self.clock_dow >= 5 || time_diff < 0 || time_diff > i64::from(time_30min) {
self.lamp01_set(false);
}
}
},
Err(e) =>
print!("ERROR: mqtt_automation: KNMITemp: {}", e.to_string())
}
}
else if message.topic.eq("/kees/ping/elfdesktop") {
let now: u32 = self.get_current_time();
let ping = ping_stat_update(&mut self.ping_elfdesktop, message.payload);
match ping {
None => {
if self.elfdesktop_last_on < now - 3 {
if self.elfdesktop_state {
self.pc_sw_set(false);
self.elfdesktop_state = false;
}
}
},
Some(_) => {
self.elfdesktop_last_on = now;
if !self.elfdesktop_state {
self.pc_sw_set(true);
self.elfdesktop_state = true;
}
}
}
}
}
fn init(&self) {
println!("DEBUG: init");
match self.client.subscribe("clock/time/#", QoS::AtMostOnce) {
Err(e) =>
println!("ERROR: main: faild to subscribe to clock/time/hour ({})", e),
Ok(_) => {}
}
match self.client.subscribe("clock/date/dow", QoS::AtMostOnce) {
Err(e) =>
println!("ERROR: main: faild to subscribe to clock/date/dow ({})", e),
Ok(_) => {}
}
match self.client.subscribe("/kees/automation/#", QoS::AtMostOnce) {
Err(e) =>
println!("ERROR: main: faild to subscribe to automation/alarm/# ({})", e),
Ok(_) => {}
}
match self.client.subscribe("/kees/devices/KNMITemp/values", QoS::AtMostOnce) {
Err(e) =>
println!("ERROR: main: faild to subscribe to KNMITemp/values ({})", e),
Ok(_) => {}
}
match self.client.subscribe("/kees/ping/#", QoS::AtMostOnce) {
Err(e) =>
println!("ERROR: main: faild to subscribe to KNMITemp/values ({})", e),
Ok(_) => {}
}
self.alarm_send_config();
}
}
impl mqtt_client::MqttTool<SettingsConf> for Automation {
fn new(tx: Sender<MqttMessage>, config: SettingsConf, client: mqtt_client::Client) -> Automation {
Automation {
tx,
client,
clock_dow: u8::MAX,
clock_hour: u8::MAX,
clock_min: u8::MAX,
clock_sec: u8::MAX,
elfdesktop_last_on: u32::MAX,
elfdesktop_state: false,
ping_elfdesktop: { PingStats { total: 0, fails: 0, avg: 0.0 } },
config: config,
//AUTO generator: Generator::new()
}
}
fn run(&mut self, rx: Receiver<MqttEvent>) {
loop {
let message = rx.recv();
match message {
Err(e) => {
println!("ERROR: mqttAutomation: failed to receve an message ({})", e);
thread::sleep(Duration::from_millis(500));
},
Ok(event) => {
match event {
MqttEvent::Connected => {
self.init();
},
MqttEvent::Disconnected => {},
MqttEvent::Message(message) => {
self.message_in(message);
},
}
}
}
}
}
}

View File

@ -1,160 +1,10 @@
use std::time::Duration;
use std::thread;
use std::fs; use std::fs;
use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet, QoS};
use crossbeam::channel::{unbounded, Sender};
use serde::Deserialize; use serde::Deserialize;
struct MqttMessage { use mqtt_client::mqtt_client;
topic: String,
payload: String
}
mod json_parser {
pub enum Error {
Null,
InvalidType,
ConvertionFaild,
JsonParseError(String)
}
impl Error {
pub fn to_string(&self) -> String {
match self {
Error::Null => String::from("path not found"),
Error::InvalidType => String::from("invalid type"),
Error::ConvertionFaild => String::from("type convertion faild"),
Error::JsonParseError(s) => s.to_string(),
}
}
}
pub enum Json {
Value(json::JsonValue),
Text(String)
}
pub fn get_value(value: json::JsonValue, mut path: Vec<String>) -> json::JsonValue {
if path.len() == 0 {
return value
}
match value {
json::JsonValue::Object(obj) => {
let key = path[0].clone();
path.remove(0);
get_value(obj[key].clone(), path)
},
json::JsonValue::Array(a) => {
let key = path[0].clone();
match key.parse::<usize>() {
Ok(i) => {
if i < a.len() {
get_value(a[i].clone(), path)
} else {
json::JsonValue::Null
}
},
Err(_) => json::JsonValue::Null
}
},
json::JsonValue::String(_) => json::JsonValue::Null,
json::JsonValue::Short(_) => json::JsonValue::Null,
json::JsonValue::Number(_) => json::JsonValue::Null,
json::JsonValue::Boolean(_) => json::JsonValue::Null,
json::JsonValue::Null => json::JsonValue::Null,
}
}
pub fn get_u32(data: Json, path: Vec<String>) -> Result<u32,Error> {
match data {
Json::Value(value) => match get_value(value, path) {
json::JsonValue::Object(_) => Err(Error::InvalidType),
json::JsonValue::Array(_) => Err(Error::InvalidType),
json::JsonValue::String(_) => Err(Error::InvalidType),
json::JsonValue::Short(_) => Err(Error::InvalidType),
json::JsonValue::Number(num) => {
match u32::try_from(num) {
Err(_) => Err(Error::ConvertionFaild),
Ok(n) => Ok(n)
}
},
json::JsonValue::Boolean(_) => Err(Error::InvalidType),
json::JsonValue::Null => Err(Error::Null),
}
Json::Text(data) => match json::parse(&data) {
Err(e) => {
Err(Error::JsonParseError(e.to_string()))
},
Ok(value) => get_u32(Json::Value(value), path)
}
}
}
}
struct Automation {
publish: Sender<MqttMessage>,
clock_dow: u8
}
impl Automation {
fn new(publish: Sender<MqttMessage>) -> Automation {
Automation {
publish,
clock_dow: u8::MAX
}
}
pub(self) fn tx(&self, message: MqttMessage) {
match self.publish.send(message) {
Err(n) => println!("ERROR: faild to send publish ({:?})", n),
Ok(_n) => {}
}
}
pub(self) fn lamp01_set(&self, state: bool) {
let payload: String;
if state {
payload = String::from("ON");
} else {
payload = String::from("OFF");
}
self.tx({ MqttMessage {
topic: String::from("/cool/devices/lamp-01/set"),
payload: payload
}});
}
fn rx(&mut self, message: MqttMessage) {
println!("INFO : mqtt_automation: {}: {}", message.topic, message.payload);
if message.topic.eq("clock/hour") {
if message.payload.eq("7") && (self.clock_dow >= 1 && self.clock_dow <= 7) {
self.lamp01_set(true);
}
} else if message.topic.eq("/cool/devices/KNMITemp/values") {
match json_parser::get_u32(json_parser::Json::Text(message.payload), Vec::from([String::from("gr")])) {
Ok(gr) => {
if gr > 30 {
self.lamp01_set(false);
}
},
Err(e) => print!("ERROR: mqtt_automation: KNMITemp: {}", e.to_string())
}
}
else if message.topic.eq("clock/dow") {
match message.payload.parse::<u8>() {
Err(e) => println!("ERROR: mqtt_automation: clock/dow has invalid payload ({:?})", e),
Ok(n) => self.clock_dow = n
}
}
}
}
mod automation;
use automation::{Automation, SettingsConf};
#[derive(Deserialize)] #[derive(Deserialize)]
struct SettingsMQTT { struct SettingsMQTT {
@ -164,12 +14,11 @@ struct SettingsMQTT {
user: String, user: String,
pass: String pass: String
} }
#[derive(Deserialize)] #[derive(Deserialize)]
struct Settings { struct Settings {
mqtt: SettingsMQTT mqtt: SettingsMQTT,
automation: SettingsConf
} }
enum SettingsError { enum SettingsError {
ReadError, ReadError,
SyntaxError SyntaxError
@ -184,7 +33,8 @@ fn read_config() -> Result<Settings,SettingsError> {
Err(_) => { Err(_) => {
println!("INFO : read_config: could not find '~/.config/mqttAutomation.yml'. try '/etc/mqttAutomation.yml"); println!("INFO : read_config: could not find '~/.config/mqttAutomation.yml'. try '/etc/mqttAutomation.yml");
match fs::read_to_string("/etc/mqttAutomation.yml") { match fs::read_to_string("/etc/mqttAutomation.yml") {
Err(_) => println!("ERROR: read_config: could not find any config file"), Err(_) =>
println!("ERROR: read_config: could not find any config file"),
Ok(str) => config_str = str Ok(str) => config_str = str
} }
}, },
@ -207,163 +57,21 @@ fn read_config() -> Result<Settings,SettingsError> {
} }
} }
fn mqtt_hadeler(mut connection: Connection, mqtt_publish: Sender<MqttMessage>) {
let mut automation: Automation = Automation::new(mqtt_publish);
for (_i, notification) in connection.iter().enumerate() {
let mut delay: bool = false;
match notification {
Err(e) => match e {
rumqttc::ConnectionError::MqttState(state) => match state {
rumqttc::StateError::Io(e) => {
println!("ERROR: mqtt: Io ({}) {}", e.kind(), e);
delay = true;
},
rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"),
rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e),
rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"),
rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"),
rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"),
rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"),
rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e),
rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max }
=> println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max),
},
rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"),
rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"),
rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()),
rumqttc::ConnectionError::ConnectionRefused(code) => {
println!("ERROR: mqtt: ConnectionRefused {:?}", code);
delay = true;
},
rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet),
rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"),
rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error),
},
Ok(event) => {
match event {
Event::Outgoing(n) => match n {
Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"),
Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"),
Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"),
Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"),
Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"),
Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"),
Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"),
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
},
Event::Incoming(n) => match n {
Packet::Connect(_) => println!("INFO : mqtt: connected"),
Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"),
Packet::Publish(msg) => {
let mut payload: String = String::from("");
match String::from_utf8(msg.payload.to_vec()) {
Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e),
Ok(v) => payload = v
};
if payload.len() > 0 {
let message: MqttMessage = { MqttMessage {
topic: msg.topic,
payload: payload
}};
automation.rx(message);
}
},
Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"),
Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"),
Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"),
Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"),
Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"),
Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"),
Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"),
Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"),
Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"),
Packet::Disconnect => println!("INFO : mqtt: disconected"),
}
}
}
}
if delay {
thread::sleep(Duration::from_millis(500));
}
}
}
fn main() { fn main() {
let (mqtt_publish, mqtt_publish_rx) = unbounded::<MqttMessage>();
// get setting // get setting
let conf_ok: bool;
let conf: Settings;
match read_config() { match read_config() {
Ok(n) => { Ok(conf) =>
conf = n; mqtt_client::run::<SettingsConf, Automation>(
conf_ok = true; conf.mqtt.host,
}, conf.mqtt.port,
Err(_) => { conf.mqtt.client,
conf = Settings { conf.mqtt.user,
mqtt: SettingsMQTT {host:String::new(),port:0,client:String::new(),user:String::new(),pass:String::new()} conf.mqtt.pass,
}; conf.automation
conf_ok = false; ),
} Err(_) => {}
} }
if conf_ok { println!("INFO : main: exit");
let mut mqttoptions = MqttOptions::new(conf.mqtt.client, conf.mqtt.host, conf.mqtt.port);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass);
let (client, connection) = Client::new(mqttoptions, 10);
match client.subscribe("clock/hour", QoS::AtMostOnce) {
Err(e) => println!("ERROR: main: faild to subscribe to clock/hour ({})", e),
Ok(_) => {}
}
match client.subscribe("/cool/devices/KNMITemp/values", QoS::AtMostOnce) {
Err(e) => println!("ERROR: main: faild to subscribe to .../KNMITemp/values ({})", e),
Ok(_) => {}
}
// thread publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
.spawn(move || {
loop {
let message = mqtt_publish_rx.recv();
match message {
Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e),
Ok(msg) => {
println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
}
}
}
}
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
// thread mqtt
let mqtt = thread::Builder::new()
.name("mqtt".to_string())
.spawn(move || {
mqtt_hadeler(connection, mqtt_publish);
});
match mqtt {
Err(_n) => println!("ERROR: main: fait to create mqtt thread"),
Ok(join) => match join.join() {
Err(_) => println!("ERROR: main: failt to join mqtt thread"),
Ok(_) => {}
}
}
println!("INFO : main: exit");
}
} }