fix double double client bug

This commit is contained in:
Laila van Reenen 2025-01-25 14:07:53 +01:00
parent 132e774ebf
commit 92699cb37f
Signed by: LailaTheElf
GPG Key ID: 8A3EF0226518C12D
4 changed files with 39 additions and 7 deletions

2
Cargo.lock generated
View File

@ -342,7 +342,7 @@ dependencies = [
[[package]] [[package]]
name = "mqttClock" name = "mqttClock"
version = "1.0.0" version = "1.1.0"
dependencies = [ dependencies = [
"chrono", "chrono",
"crossbeam", "crossbeam",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "mqttClock" name = "mqttClock"
version = "1.0.0" version = "1.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -1,5 +1,6 @@
mqtt: mqtt:
host: "localhost" host: "localhost"
port: 1883 port: 1883
client: "mqttClock"
user: "mqttClock" user: "mqttClock"
pass: "password" pass: "password"

View File

@ -104,6 +104,7 @@ fn mqtt_clock(publish: Sender<MqttMessage>) {
struct SettingsMQTT { struct SettingsMQTT {
host: String, host: String,
port: u16, port: u16,
client: String,
user: String, user: String,
pass: String pass: String
} }
@ -163,14 +164,14 @@ fn main() {
}, },
Err(_) => { Err(_) => {
conf = Settings { conf = Settings {
mqtt: SettingsMQTT {host:String::new(),port:0,user:String::new(),pass:String::new()} mqtt: SettingsMQTT {host:String::new(),port:0,client:String::new(),user:String::new(),pass:String::new()}
}; };
conf_ok = false; conf_ok = false;
} }
} }
if conf_ok { if conf_ok {
let mut mqttoptions = MqttOptions::new("rumqtt-sync", conf.mqtt.host, conf.mqtt.port); let mut mqttoptions = MqttOptions::new(conf.mqtt.client, conf.mqtt.host, conf.mqtt.port);
mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass); mqttoptions.set_credentials(conf.mqtt.user, conf.mqtt.pass);
let (client, mut connection) = Client::new(mqttoptions, 10); let (client, mut connection) = Client::new(mqttoptions, 10);
@ -210,8 +211,35 @@ fn main() {
} }
for (_i, notification) in connection.iter().enumerate() { for (_i, notification) in connection.iter().enumerate() {
let mut delay: bool = false;
match notification { match notification {
Err(e) => println!("ERROR: mqtt: {}", e), Err(e) => match e {
rumqttc::ConnectionError::MqttState(state) => match state {
rumqttc::StateError::Io(e) => {
println!("ERROR: mqtt: Io ({}) {}", e.kind(), e);
delay = true;
},
rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"),
rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e),
rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"),
rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"),
rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"),
rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"),
rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e),
rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max }
=> println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max),
},
rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"),
rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"),
rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()),
rumqttc::ConnectionError::ConnectionRefused(code) => {
println!("ERROR: mqtt: ConnectionRefused {:?}", code);
delay = true;
},
rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet),
rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"),
rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error),
},
Ok(event) => { Ok(event) => {
match event { match event {
Event::Outgoing(n) => match n { Event::Outgoing(n) => match n {
@ -229,8 +257,8 @@ fn main() {
Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck")
}, },
Event::Incoming(n) => match n { Event::Incoming(n) => match n {
Packet::Connect(_) => println!("INFO : mqtt: connected"), //println!("INFO : mqtt_recive: in Connect"), Packet::Connect(_) => println!("INFO : mqtt: connected"),
Packet::ConnAck(_) => println!("INFO : mqtt: conn ack"), //println!("INFO : mqtt_recive: in ConnAck"), Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"),
Packet::Publish(_) => {}, Packet::Publish(_) => {},
Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"),
@ -247,6 +275,9 @@ fn main() {
} }
} }
} }
if delay {
thread::sleep(Duration::from_millis(500));
}
} }
println!("INFO : main: exit"); println!("INFO : main: exit");
} }