diff --git a/Cargo.lock b/Cargo.lock index ff9851c..5758560 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -342,7 +342,7 @@ dependencies = [ [[package]] name = "mqttClock" -version = "1.0.0" +version = "1.1.0" dependencies = [ "chrono", "crossbeam", diff --git a/Cargo.toml b/Cargo.toml index 6ed7ad3..f761fa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqttClock" -version = "1.0.0" +version = "1.1.0" edition = "2021" [dependencies] diff --git a/mqttClock.yml b/mqttClock.yml index ee73005..ebff895 100644 --- a/mqttClock.yml +++ b/mqttClock.yml @@ -1,5 +1,6 @@ mqtt: host: "localhost" port: 1883 + client: "mqttClock" user: "mqttClock" pass: "password" diff --git a/src/main.rs b/src/main.rs index de5c5fe..2540723 100644 --- a/src/main.rs +++ b/src/main.rs @@ -104,6 +104,7 @@ fn mqtt_clock(publish: Sender) { struct SettingsMQTT { host: String, port: u16, + client: String, user: String, pass: String } @@ -163,14 +164,14 @@ fn main() { }, Err(_) => { conf = Settings { - mqtt: SettingsMQTT {host:String::new(),port:0,user:String::new(),pass:String::new()} + mqtt: SettingsMQTT {host:String::new(),port:0,client:String::new(),user:String::new(),pass:String::new()} }; conf_ok = false; } } if conf_ok { - let mut mqttoptions = MqttOptions::new("rumqtt-sync", conf.mqtt.host, conf.mqtt.port); + let mut mqttoptions = MqttOptions::new(conf.mqtt.client, conf.mqtt.host, conf.mqtt.port); mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass); let (client, mut connection) = Client::new(mqttoptions, 10); @@ -210,8 +211,35 @@ fn main() { } for (_i, notification) in connection.iter().enumerate() { + let mut delay: bool = false; match notification { - Err(e) => println!("ERROR: mqtt: {}", e), + Err(e) => match e { + rumqttc::ConnectionError::MqttState(state) => match state { + rumqttc::StateError::Io(e) => { + println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); + delay = true; + }, + rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), + rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e), + rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"), + rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"), + rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"), + rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"), + rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e), + rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max } + => println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max), + }, + rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), + rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), + rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), + rumqttc::ConnectionError::ConnectionRefused(code) => { + println!("ERROR: mqtt: ConnectionRefused {:?}", code); + delay = true; + }, + rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet), + rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"), + rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error), + }, Ok(event) => { match event { Event::Outgoing(n) => match n { @@ -229,8 +257,8 @@ fn main() { Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") }, Event::Incoming(n) => match n { - Packet::Connect(_) => println!("INFO : mqtt: connected"), //println!("INFO : mqtt_recive: in Connect"), - Packet::ConnAck(_) => println!("INFO : mqtt: conn ack"), //println!("INFO : mqtt_recive: in ConnAck"), + Packet::Connect(_) => println!("INFO : mqtt: connected"), + Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), Packet::Publish(_) => {}, Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), @@ -247,6 +275,9 @@ fn main() { } } } + if delay { + thread::sleep(Duration::from_millis(500)); + } } println!("INFO : main: exit"); }