basic functionality

This commit is contained in:
Laila van Reenen 2025-01-22 20:03:51 +01:00
parent 8bf5648398
commit bf5da9739f
Signed by: LailaTheElf
GPG Key ID: 8A3EF0226518C12D
4 changed files with 227 additions and 38 deletions

75
Cargo.lock generated
View File

@ -172,6 +172,12 @@ version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "equivalent"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "flume"
version = "0.11.1"
@ -231,6 +237,12 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
[[package]]
name = "iana-time-zone"
version = "0.1.61"
@ -254,6 +266,22 @@ dependencies = [
"cc",
]
[[package]]
name = "indexmap"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652"
dependencies = [
"equivalent",
"hashbrown",
]
[[package]]
name = "itoa"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "js-sys"
version = "0.3.77"
@ -319,6 +347,8 @@ dependencies = [
"chrono",
"crossbeam",
"rumqttc",
"serde",
"serde_yaml",
]
[[package]]
@ -479,6 +509,12 @@ version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4"
[[package]]
name = "ryu"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "schannel"
version = "0.1.27"
@ -517,6 +553,39 @@ dependencies = [
"libc",
]
[[package]]
name = "serde"
version = "1.0.217"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.217"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_yaml"
version = "0.9.34+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
dependencies = [
"indexmap",
"itoa",
"ryu",
"serde",
"unsafe-libyaml",
]
[[package]]
name = "shlex"
version = "1.3.0"
@ -632,6 +701,12 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
[[package]]
name = "unsafe-libyaml"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]]
name = "untrusted"
version = "0.9.0"

View File

@ -7,3 +7,5 @@ edition = "2021"
chrono = "0.4.39"
crossbeam = "0.8.4"
rumqttc = "0.24.0"
serde = { version = "1.0.217", features = ["derive"] }
serde_yaml = "0.9.34"

5
mqttClock.yml Normal file
View File

@ -0,0 +1,5 @@
mqtt:
host: "localhost"
port: 1883
user: "mqttClock"
pass: "password"

View File

@ -1,9 +1,11 @@
use std::time::Duration;
use std::thread;
use std::fs;
use rumqttc::{Client, MqttOptions, QoS};
use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS};
use chrono::{Local, Timelike, Datelike};
use crossbeam::channel::{unbounded, Sender};
use serde::Deserialize;
struct MqttMessage {
topic: String,
@ -97,50 +99,155 @@ fn mqtt_clock(publish: Sender<MqttMessage>) {
}
}
#[derive(Deserialize)]
struct SettingsMQTT {
host: String,
port: u16,
user: String,
pass: String
}
#[derive(Deserialize)]
struct Settings {
mqtt: SettingsMQTT
}
enum SettingsError {
ReadError,
SyntaxError
}
fn read_config() -> Result<Settings,SettingsError> {
let mut config_str: String = String::from("error");
match fs::read_to_string("./mqttClock.yml") {
Err(_) => {
println!("INFO : read_config: could not find './mqttClock.yml'. try '~/.config/mqttClock.yml");
match fs::read_to_string("~/.config/mqttClock.yml") {
Err(_) => {
println!("INFO : read_config: could not find '~/.config/mqttClock.yml'. try '/etc/mqttClock.yml");
match fs::read_to_string("/etc/mqttClock.yml") {
Err(_) => println!("ERROR: read_config: could not find any config file"),
Ok(str) => config_str = str
}
},
Ok(str) => config_str = str
}
},
Ok(str) => config_str = str
}
if config_str.eq("error") {
Err::<Settings, SettingsError>(SettingsError::ReadError)
} else {
match serde_yaml::from_str::<Settings>(&config_str) {
Ok(n) => Ok(n),
Err(e) => {
println!("ERROR: read_config: syntax error: {:?}", e);
Err(SettingsError::SyntaxError)
}
}
}
}
fn main() {
let (mqtt_publish, mqtt_publish_rx) = unbounded::<MqttMessage>();
let mut mqttoptions = MqttOptions::new("rumqtt-sync", "10.1.2.2", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials("rustClient", "test");
let (client, mut connection) = Client::new(mqttoptions, 10);
// get setting
let conf_ok: bool;
let conf: Settings;
match read_config() {
Ok(n) => {
conf = n;
conf_ok = true;
},
Err(_) => {
conf = Settings {
mqtt: SettingsMQTT {host:String::new(),port:0,user:String::new(),pass:String::new()}
};
conf_ok = false;
}
}
// treath publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
.spawn(move || {
loop {
let message = mqtt_publish_rx.recv();
match message {
Err(_err) => println!("ERROR: publisher: faild to receve an message"),
Ok(msg) => {
println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
if conf_ok {
let mut mqttoptions = MqttOptions::new("rumqtt-sync", conf.mqtt.host, conf.mqtt.port);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass);
let (client, mut connection) = Client::new(mqttoptions, 10);
// thread publisher
let publisher = thread::Builder::new()
.name("publisher".to_string())
.spawn(move || {
loop {
let message = mqtt_publish_rx.recv();
match message {
Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e),
Ok(msg) => {
// println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload);
match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) {
Err(_n) => println!("ERROR: publisher: faild to publish"),
Ok(_n) => {}
}
}
}
}
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
// treath mqtt clock
let mqtt_clock_stat = thread::Builder::new()
.name("mqtt_clock".to_string())
.spawn(move || {
mqtt_clock(mqtt_publish);
});
match mqtt_clock_stat {
Err(_n) => println!("ERROR: main: faild to start mqtt clock thread"),
Ok(_n) => {}
}
for (_i, notification) in connection.iter().enumerate() {
match notification {
Err(e) => println!("ERROR: mqtt: {}", e),
Ok(event) => {
match event {
Event::Outgoing(n) => match n {
Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"),
Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"),
Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"),
Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"),
Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"),
Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"),
Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"),
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
},
Event::Incoming(n) => match n {
Packet::Connect(_) => println!("INFO : mqtt: connected"), //println!("INFO : mqtt_recive: in Connect"),
Packet::ConnAck(_) => println!("INFO : mqtt: conn ack"), //println!("INFO : mqtt_recive: in ConnAck"),
Packet::Publish(_) => {},
Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"),
Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"),
Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"),
Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"),
Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"),
Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"),
Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"),
Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"),
Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"),
Packet::Disconnect => println!("INFO : mqtt: disconected"),
}
}
}
}
});
match publisher {
Err(_n) => println!("ERROR: main: fait to create publisher thread"),
Ok(_n) => {}
}
// treath mqtt clock
let mqtt_publish_clock = mqtt_publish.clone();
let mqtt_clock_stat = thread::Builder::new()
.name("mqtt_clock".to_string())
.spawn(move || {
mqtt_clock(mqtt_publish_clock);
});
match mqtt_clock_stat {
Err(_n) => println!("ERROR: mqtt clock: faild to start thread"),
Ok(_n) => {}
}
loop {
let _ = connection.iter().enumerate();
}
println!("INFO : main: exit");
}
}