pub mod mqtt_client { use std::time::Duration; use std::thread; use rumqttc::{Connection, Event, MqttOptions, Outgoing, Packet}; use crossbeam::channel::unbounded; pub use rumqttc::Client; pub use crossbeam::channel::{Receiver, Sender}; pub use rumqttc::QoS; pub trait MqttTool { fn new(client: Client, tx: Sender, config: S) -> Self; fn run(&mut self, rx: Receiver); } pub struct MqttMessage { pub topic: String, pub payload: String, pub retain: bool, pub qos: QoS, } pub fn run>(host: String, port: u16, client: String, user: String, pass: String, config: S) where S: 'static { let (tx_sender, tx_recever) = unbounded::(); let (rx_sender, rx_recever) = unbounded::(); println!("INFO : mqtt client: run"); let mut mqttoptions = MqttOptions::new(client, host, port); mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_credentials(user, pass); let (client, connection) = Client::new(mqttoptions, 10); let client_publisher = client.clone(); // thread publisher let publisher = thread::Builder::new() .name("publisher".to_string()) .spawn(move || { publisher(tx_recever, client_publisher); }); match publisher { Err(_n) => println!("ERROR: mqtt client: failed to create publisher thread"), Ok(_n) => {} } // thread mqtt let mqtt = thread::Builder::new() .name("mqtt".to_string()) .spawn(move || { handeler::(connection, rx_sender); }); match mqtt { Err(_n) => println!("ERROR: mqtt client: failed to create mqtt thread"), Ok(_) => {} } // thread tool runner let tool_runner = thread::Builder::new() .name("tool runner".to_string()) .spawn(move || { let mut tool = T::new(client, tx_sender, config); tool.run(rx_recever); println!("WARN : rool_runner: tool has ended"); }); match tool_runner { Err(_n) => println!("ERROR: mqtt client: failed to create publisher thread"), Ok(runner) => match runner.join() { Err(_) => println!("ERROR: mqtt client: failed to join tool runner thread"), Ok(_) => {} } } println!("INFO : mqtt client: exit"); } pub(self) fn publisher(rx: Receiver, client: rumqttc::Client) { loop { let message = rx.recv(); match message { Err(e) => { println!("WARN : publisher: {}", e); println!("INFO : publisher: exit, no one can send messages"); return }, Ok(msg) => { println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload); match client.publish(msg.topic, msg.qos, msg.retain, msg.payload) { Err(_n) => println!("ERROR: publisher: failed to publish"), Ok(_n) => {} } } } } } pub(self) fn handeler>(mut connection: Connection, rx: Sender) { for (_i, notification) in connection.iter().enumerate() { let mut delay: bool = false; match notification { Err(e) => match e { rumqttc::ConnectionError::MqttState(state) => match state { rumqttc::StateError::Io(e) => { println!("ERROR: mqtt: State io {}: {}", e.kind(), e); delay = true; }, rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), rumqttc::StateError::Unsolicited(e) => println!("ERROR: mqtt: Unsolicited {}", e), rumqttc::StateError::AwaitPingResp => println!("ERROR: mqtt: AwaitPingResp"), rumqttc::StateError::WrongPacket => println!("ERROR: mqtt: WrongPacket"), rumqttc::StateError::CollisionTimeout => println!("ERROR: mqtt: CollisionTimeout"), rumqttc::StateError::EmptySubscription => println!("ERROR: mqtt: EmptySubscription"), rumqttc::StateError::Deserialization(e) => println!("ERROR: mqtt: Deserialization {}", e), rumqttc::StateError::OutgoingPacketTooLarge { pkt_size: size, max } => println!("ERROR: mqtt: OutgoingPacketTooLarge, packet is {}; max is {}", size, max), }, rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), rumqttc::ConnectionError::Io(e) => { println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()); delay = true; }, rumqttc::ConnectionError::ConnectionRefused(code) => { println!("ERROR: mqtt: ConnectionRefused {:?}", code); delay = true; }, rumqttc::ConnectionError::NotConnAck(packet) => println!("ERROR: mqtt: NotConnAck {:?}", packet), rumqttc::ConnectionError::RequestsDone => println!("ERROR: mqtt: RequestsDone"), rumqttc::ConnectionError::Tls(error) => println!("ERROR: mqtt: Tls {}", error), }, Ok(event) => { match event { Event::Outgoing(n) => match n { Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"), Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"), Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"), Outgoing::PubAck(_n) => {}, //println!("INFO : mqtt_recive: out PubAck"), Outgoing::PubRec(_n) => {}, //println!("INFO : mqtt_recive: out PubRec"), Outgoing::PubRel(_n) => {}, //println!("INFO : mqtt_recive: out PubRel"), Outgoing::PubComp(_n) => {}, //println!("INFO : mqtt_recive: out PubComp"), Outgoing::PingReq => {}, //println!("INFO : mqtt_recive: out PingReq"), Outgoing::PingResp => {}, //println!("INFO : mqtt_recive: out PingResp"), Outgoing::Disconnect => {}, //println!("INFO : mqtt_recive: out Disconnect"), Outgoing::AwaitAck(_n) => {} //println!("INFO : mqtt_recive: out AwaitAck") }, Event::Incoming(n) => match n { Packet::Connect(_) => println!("INFO : mqtt: connected"), Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), Packet::Publish(msg) => { let mut payload: String = String::from(""); match String::from_utf8(msg.payload.to_vec()) { Err(e) => println!("ERROR: pqtt_recive: failed to decode payload ({})", e), Ok(v) => payload = v }; if payload.len() > 0 { let message = MqttMessage { topic: msg.topic, payload: payload, retain: msg.retain, qos: msg.qos, }; match rx.send(message) { Err(n) => println!("ERROR: faild to send incomming message to tool ({:?})", n), Ok(_n) => {} } } }, Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"), Packet::PubRec(_) => {}, //println!("INFO : mqtt_recive: in PubRec"), Packet::PubRel(_) => {}, //println!("INFO : mqtt_recive: in PubRel"), Packet::PubComp(_) => {}, //println!("INFO : mqtt_recive: in PubComp"), Packet::Subscribe(_) => {}, //println!("INFO : mqtt_recive: in Subscribe"), Packet::SubAck(_) => {}, //println!("INFO : mqtt_recive: in SubAck"), Packet::Unsubscribe(_) => {}, //println!("INFO : mqtt_recive: in Unsubscribe"), Packet::UnsubAck(_) => {}, //println!("INFO : mqtt_recive: in UnsubAck"), Packet::PingReq => {}, //println!("INFO : mqtt_recive: in PingReq"), Packet::PingResp => {}, //println!("INFO : mqtt_recive: in PingResp"), Packet::Disconnect => println!("INFO : mqtt: disconected"), } } } } if delay { thread::sleep(Duration::from_millis(500)); } } } }