diff --git a/Cargo.lock b/Cargo.lock index f83d931..c7a0d6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,6 +172,12 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "flume" version = "0.11.1" @@ -231,6 +237,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "hashbrown" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" + [[package]] name = "iana-time-zone" version = "0.1.61" @@ -254,6 +266,22 @@ dependencies = [ "cc", ] +[[package]] +name = "indexmap" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" +dependencies = [ + "equivalent", + "hashbrown", +] + +[[package]] +name = "itoa" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" + [[package]] name = "js-sys" version = "0.3.77" @@ -319,6 +347,8 @@ dependencies = [ "chrono", "crossbeam", "rumqttc", + "serde", + "serde_yaml", ] [[package]] @@ -479,6 +509,12 @@ version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + [[package]] name = "schannel" version = "0.1.27" @@ -517,6 +553,39 @@ dependencies = [ "libc", ] +[[package]] +name = "serde" +version = "1.0.217" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.217" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "shlex" version = "1.3.0" @@ -632,6 +701,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 2780daf..9025740 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,5 @@ edition = "2021" chrono = "0.4.39" crossbeam = "0.8.4" rumqttc = "0.24.0" +serde = { version = "1.0.217", features = ["derive"] } +serde_yaml = "0.9.34" diff --git a/mqttClock.yml b/mqttClock.yml new file mode 100644 index 0000000..ee73005 --- /dev/null +++ b/mqttClock.yml @@ -0,0 +1,5 @@ +mqtt: + host: "localhost" + port: 1883 + user: "mqttClock" + pass: "password" diff --git a/src/main.rs b/src/main.rs index 11eac12..de5c5fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,11 @@ use std::time::Duration; use std::thread; +use std::fs; -use rumqttc::{Client, MqttOptions, QoS}; +use rumqttc::{Client, Event, MqttOptions, Outgoing, Packet, QoS}; use chrono::{Local, Timelike, Datelike}; use crossbeam::channel::{unbounded, Sender}; +use serde::Deserialize; struct MqttMessage { topic: String, @@ -97,50 +99,155 @@ fn mqtt_clock(publish: Sender) { } } + +#[derive(Deserialize)] +struct SettingsMQTT { + host: String, + port: u16, + user: String, + pass: String +} + +#[derive(Deserialize)] +struct Settings { + mqtt: SettingsMQTT +} + +enum SettingsError { + ReadError, + SyntaxError +} + +fn read_config() -> Result { + let mut config_str: String = String::from("error"); + match fs::read_to_string("./mqttClock.yml") { + Err(_) => { + println!("INFO : read_config: could not find './mqttClock.yml'. try '~/.config/mqttClock.yml"); + match fs::read_to_string("~/.config/mqttClock.yml") { + Err(_) => { + println!("INFO : read_config: could not find '~/.config/mqttClock.yml'. try '/etc/mqttClock.yml"); + match fs::read_to_string("/etc/mqttClock.yml") { + Err(_) => println!("ERROR: read_config: could not find any config file"), + Ok(str) => config_str = str + } + }, + Ok(str) => config_str = str + } + }, + Ok(str) => config_str = str + } + + if config_str.eq("error") { + Err::(SettingsError::ReadError) + } else { + match serde_yaml::from_str::(&config_str) { + Ok(n) => Ok(n), + Err(e) => { + println!("ERROR: read_config: syntax error: {:?}", e); + Err(SettingsError::SyntaxError) + } + } + } +} + fn main() { let (mqtt_publish, mqtt_publish_rx) = unbounded::(); - let mut mqttoptions = MqttOptions::new("rumqtt-sync", "10.1.2.2", 1883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - mqttoptions.set_credentials("rustClient", "test"); - let (client, mut connection) = Client::new(mqttoptions, 10); + // get setting + let conf_ok: bool; + let conf: Settings; + match read_config() { + Ok(n) => { + conf = n; + conf_ok = true; + }, + Err(_) => { + conf = Settings { + mqtt: SettingsMQTT {host:String::new(),port:0,user:String::new(),pass:String::new()} + }; + conf_ok = false; + } + } - // treath publisher - let publisher = thread::Builder::new() - .name("publisher".to_string()) - .spawn(move || { - loop { - let message = mqtt_publish_rx.recv(); - match message { - Err(_err) => println!("ERROR: publisher: faild to receve an message"), - Ok(msg) => { - println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload); - match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { - Err(_n) => println!("ERROR: publisher: faild to publish"), - Ok(_n) => {} + if conf_ok { + let mut mqttoptions = MqttOptions::new("rumqtt-sync", conf.mqtt.host, conf.mqtt.port); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass); + let (client, mut connection) = Client::new(mqttoptions, 10); + + // thread publisher + let publisher = thread::Builder::new() + .name("publisher".to_string()) + .spawn(move || { + loop { + let message = mqtt_publish_rx.recv(); + match message { + Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e), + Ok(msg) => { + // println!("DEBUG: publisher: topic={}; payload={}", msg.topic, msg.payload); + match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { + Err(_n) => println!("ERROR: publisher: faild to publish"), + Ok(_n) => {} + } + } + } + } + }); + match publisher { + Err(_n) => println!("ERROR: main: fait to create publisher thread"), + Ok(_n) => {} + } + + // treath mqtt clock + let mqtt_clock_stat = thread::Builder::new() + .name("mqtt_clock".to_string()) + .spawn(move || { + mqtt_clock(mqtt_publish); + }); + match mqtt_clock_stat { + Err(_n) => println!("ERROR: main: faild to start mqtt clock thread"), + Ok(_n) => {} + } + + for (_i, notification) in connection.iter().enumerate() { + match notification { + Err(e) => println!("ERROR: mqtt: {}", e), + Ok(event) => { + match event { + Event::Outgoing(n) => match n { + + Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), + Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), + Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), + Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), + Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), + Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), + Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), + Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), + Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), + Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), + Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") + }, + Event::Incoming(n) => match n { + Packet::Connect(_) => println!("INFO : mqtt: connected"), //println!("INFO : mqtt_recive: in Connect"), + Packet::ConnAck(_) => println!("INFO : mqtt: conn ack"), //println!("INFO : mqtt_recive: in ConnAck"), + Packet::Publish(_) => {}, + Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), + Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), + Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"), + Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"), + Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"), + Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"), + Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), + Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), + Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), + Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), + Packet::Disconnect => println!("INFO : mqtt: disconected"), } } } } - }); - match publisher { - Err(_n) => println!("ERROR: main: fait to create publisher thread"), - Ok(_n) => {} - } - - // treath mqtt clock - let mqtt_publish_clock = mqtt_publish.clone(); - let mqtt_clock_stat = thread::Builder::new() - .name("mqtt_clock".to_string()) - .spawn(move || { - mqtt_clock(mqtt_publish_clock); - }); - match mqtt_clock_stat { - Err(_n) => println!("ERROR: mqtt clock: faild to start thread"), - Ok(_n) => {} - } - - loop { - let _ = connection.iter().enumerate(); + } + println!("INFO : main: exit"); } }