Compare commits
	
		
			7 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 49e8adf2eb | |||
| f51a08fb50 | |||
| f186e43733 | |||
| d2d31b2bc1 | |||
| e7a46b82a5 | |||
| 858e6f2c63 | |||
| c924081133 | 
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -246,7 +246,7 @@ dependencies = [ | ||||
| 
 | ||||
| [[package]] | ||||
| name = "mqtt-client" | ||||
| version = "1.0.0" | ||||
| version = "4.0.0" | ||||
| dependencies = [ | ||||
|  "crossbeam", | ||||
|  "rumqttc", | ||||
|  | ||||
| @ -1,6 +1,6 @@ | ||||
| [package] | ||||
| name = "mqtt-client" | ||||
| version = "1.0.0" | ||||
| version = "4.0.0" | ||||
| edition = "2021" | ||||
| 
 | ||||
| [dependencies] | ||||
|  | ||||
							
								
								
									
										97
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										97
									
								
								src/lib.rs
									
									
									
									
									
								
							| @ -3,21 +3,36 @@ pub mod mqtt_client { | ||||
|     use std::time::Duration; | ||||
|     use std::thread; | ||||
| 
 | ||||
|     use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet, QoS}; | ||||
|     use crossbeam::channel::{unbounded, Receiver, Sender}; | ||||
|     use rumqttc::{Connection, Event, MqttOptions, Outgoing, Packet}; | ||||
|     use crossbeam::channel::unbounded; | ||||
|     
 | ||||
|     pub use rumqttc::Client; | ||||
|     pub use crossbeam::channel::{Receiver, Sender}; | ||||
|     pub use rumqttc::QoS; | ||||
| 
 | ||||
|     pub trait MqttTool { | ||||
|         fn new(client: Client, tx: Sender<MqttMessage>) -> Self; | ||||
|         fn rx(&mut self, messaage: MqttMessage); | ||||
|     pub trait MqttTool<S: std::marker::Send> { | ||||
|         fn new(tx: Sender<MqttMessage>, config: S, client: Client) -> Self; | ||||
|         fn run(&mut self, event_recever: Receiver<MqttEvent>); | ||||
|     } | ||||
| 
 | ||||
|     pub struct MqttMessage { | ||||
|         pub topic: String, | ||||
|         pub payload: String | ||||
|         pub payload: String, | ||||
|         pub retain: bool, | ||||
|         pub qos: QoS, | ||||
|     } | ||||
| 
 | ||||
|     pub fn run<T: MqttTool>(host: String, port: u16, client: String, user: String, pass: String) { | ||||
|         let (tx, rx) = unbounded::<MqttMessage>(); | ||||
|     pub enum MqttEvent { | ||||
|         Connected, | ||||
|         Disconnected, | ||||
|         Message(MqttMessage) | ||||
|     } | ||||
| 
 | ||||
|     pub fn run<S: std::marker::Send, T: MqttTool<S>>(host: String, port: u16, client: String, user: String, pass: String, config: S) where S: 'static { | ||||
|         let (tx_sender, tx_recever) = unbounded::<MqttMessage>(); | ||||
|         let (event_sender, event_recever) = unbounded::<MqttEvent>(); | ||||
| 
 | ||||
|         println!("INFO : mqtt client: run"); | ||||
| 
 | ||||
|         let mut mqttoptions = MqttOptions::new(client, host, port); | ||||
|         mqttoptions.set_keep_alive(Duration::from_secs(5)); | ||||
| @ -30,10 +45,10 @@ pub mod mqtt_client { | ||||
|         let publisher = thread::Builder::new() | ||||
|             .name("publisher".to_string()) | ||||
|             .spawn(move || { | ||||
|                 publisher(rx, client_publisher); | ||||
|                 publisher(tx_recever, client_publisher); | ||||
|             }); | ||||
|         match publisher { | ||||
|             Err(_n) => println!("ERROR: main: fait to create publisher thread"), | ||||
|             Err(_n) => println!("ERROR: mqtt client: failed to create publisher thread"), | ||||
|             Ok(_n) => {} | ||||
|         } | ||||
| 
 | ||||
| @ -41,27 +56,43 @@ pub mod mqtt_client { | ||||
|         let mqtt = thread::Builder::new() | ||||
|             .name("mqtt".to_string()) | ||||
|             .spawn(move || { | ||||
|                 let tool = T::new(client, tx); | ||||
|                 handeler::<T>(connection, tool); | ||||
|                 handeler::<S, T>(connection, event_sender); | ||||
|             }); | ||||
|         match mqtt { | ||||
|             Err(_n) => println!("ERROR: main: fait to create mqtt thread"), | ||||
|             Ok(join) => match join.join() { | ||||
|                 Err(_) => println!("ERROR: main: failt to join mqtt thread"), | ||||
|             Err(_n) => println!("ERROR: mqtt client: failed to create mqtt thread"), | ||||
|             Ok(_) => {} | ||||
|         } | ||||
| 
 | ||||
|         // thread tool runner
 | ||||
|         let tool_runner = thread::Builder::new() | ||||
|             .name("tool runner".to_string()) | ||||
|             .spawn(move || { | ||||
|                 let mut tool = T::new(tx_sender, config, client); | ||||
|                 tool.run(event_recever); | ||||
|             }); | ||||
|         match tool_runner { | ||||
|             Err(_n) => println!("ERROR: mqtt client: failed to create publisher thread"), | ||||
|             Ok(runner) => match runner.join() { | ||||
|                 Err(_) => println!("ERROR: mqtt client: failed to join tool runner thread"), | ||||
|                 Ok(_) => {} | ||||
|             } | ||||
|         } | ||||
|         println!("INFO : mqtt client: exit"); | ||||
|     } | ||||
| 
 | ||||
|     pub(self) fn publisher(rx: Receiver<MqttMessage>, client: rumqttc::Client) { | ||||
|         loop { | ||||
|             let message = rx.recv(); | ||||
|             match message { | ||||
|                 Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e), | ||||
|                 Err(e) => { | ||||
|                     println!("WARN : publisher: {}", e); | ||||
|                     println!("INFO : publisher: exit, no one can send messages"); | ||||
|                     return | ||||
|                 }, | ||||
|                 Ok(msg) => { | ||||
|                     println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload); | ||||
|                     match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { | ||||
|                         Err(_n) => println!("ERROR: publisher: faild to publish"), | ||||
|                     match client.publish(msg.topic, msg.qos, msg.retain, msg.payload) { | ||||
|                         Err(_n) => println!("ERROR: publisher: failed to publish"), | ||||
|                         Ok(_n) => {} | ||||
|                     } | ||||
|                 } | ||||
| @ -69,14 +100,14 @@ pub mod mqtt_client { | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub(self) fn handeler<T: MqttTool>(mut connection: Connection, mut tool: T) { | ||||
|     pub(self) fn handeler<S: std::marker::Send,T: MqttTool<S>>(mut connection: Connection, rx: Sender<MqttEvent>) { | ||||
|         for (_i, notification) in connection.iter().enumerate() { | ||||
|             let mut delay: bool = false; | ||||
|             match notification { | ||||
|                 Err(e) => match e { | ||||
|                     rumqttc::ConnectionError::MqttState(state) => match state { | ||||
|                         rumqttc::StateError::Io(e) => { | ||||
|                             println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); | ||||
|                             println!("ERROR: mqtt: State io {}: {}", e.kind(), e); | ||||
|                             delay = true; | ||||
|                         }, | ||||
|                         rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), | ||||
| @ -91,7 +122,10 @@ pub mod mqtt_client { | ||||
|                     }, | ||||
|                     rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), | ||||
|                     rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), | ||||
|                     rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), | ||||
|                     rumqttc::ConnectionError::Io(e) => { | ||||
|                         println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()); | ||||
|                         delay = true; | ||||
|                     }, | ||||
|                     rumqttc::ConnectionError::ConnectionRefused(code) => { | ||||
|                         println!("ERROR: mqtt: ConnectionRefused {:?}", code); | ||||
|                         delay = true; | ||||
| @ -103,7 +137,6 @@ pub mod mqtt_client { | ||||
|                 Ok(event) => { | ||||
|                     match event { | ||||
|                         Event::Outgoing(n) => match n { | ||||
| 
 | ||||
|                             Outgoing::Publish(_n) => {}, //println!("INFO : mqtt_recive: out Publish"),
 | ||||
|                             Outgoing::Subscribe(_n) => {}, //println!("INFO : mqtt_recive: out Subscribe"),
 | ||||
|                             Outgoing::Unsubscribe(_n) => {}, //println!("INFO : mqtt_recive: out Unsubscribe"),
 | ||||
| @ -118,20 +151,28 @@ pub mod mqtt_client { | ||||
|                         }, | ||||
|                         Event::Incoming(n) => match n { | ||||
|                             Packet::Connect(_) => println!("INFO : mqtt: connected"), | ||||
|                             Packet::ConnAck(_) => println!("INFO : mqtt: connaction acknolaged"), | ||||
|                             Packet::ConnAck(_) => { | ||||
|                                 println!("INFO : mqtt: connaction acknolaged"); | ||||
|                                 let _ = rx.send(MqttEvent::Connected); | ||||
|                             }, | ||||
|                             Packet::Publish(msg) => { | ||||
|                                 let mut payload: String = String::from(""); | ||||
|                                 match String::from_utf8(msg.payload.to_vec()) { | ||||
|                                     Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e), | ||||
|                                     Err(e) => println!("ERROR: pqtt_recive: failed to decode payload ({})", e), | ||||
|                                     Ok(v) => payload = v | ||||
|                                 }; | ||||
|                                 if payload.len() > 0 { | ||||
|                                     let message: MqttMessage = { MqttMessage { | ||||
|                                     let message = MqttMessage { | ||||
|                                         topic: msg.topic, | ||||
|                                         payload: payload | ||||
|                                     }}; | ||||
|                                         payload: payload, | ||||
|                                         retain: msg.retain, | ||||
|                                         qos: msg.qos, | ||||
|                                     }; | ||||
|                     
 | ||||
|                                     tool.rx(message); | ||||
|                                     match rx.send(MqttEvent::Message(message)) { | ||||
|                                         Err(n) => println!("ERROR: faild to send incomming message to tool ({:?})", n), | ||||
|                                         Ok(_n) => {} | ||||
|                                     } | ||||
|                                 } | ||||
|                             }, | ||||
|                             Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user