add retain and qos to message and use crossbeam channel for subscriptions
This commit is contained in:
		
							parent
							
								
									f0b3431145
								
							
						
					
					
						commit
						c924081133
					
				
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -246,7 +246,7 @@ dependencies = [ | |||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "mqtt-client" | name = "mqtt-client" | ||||||
| version = "1.0.0" | version = "2.0.0" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "crossbeam", |  "crossbeam", | ||||||
|  "rumqttc", |  "rumqttc", | ||||||
|  | |||||||
| @ -1,6 +1,6 @@ | |||||||
| [package] | [package] | ||||||
| name = "mqtt-client" | name = "mqtt-client" | ||||||
| version = "1.0.0" | version = "2.0.0" | ||||||
| edition = "2021" | edition = "2021" | ||||||
| 
 | 
 | ||||||
| [dependencies] | [dependencies] | ||||||
|  | |||||||
							
								
								
									
										67
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										67
									
								
								src/lib.rs
									
									
									
									
									
								
							| @ -3,21 +3,27 @@ pub mod mqtt_client { | |||||||
|     use std::time::Duration; |     use std::time::Duration; | ||||||
|     use std::thread; |     use std::thread; | ||||||
| 
 | 
 | ||||||
|     use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet, QoS}; |     use rumqttc::{Client, Connection, Event, MqttOptions, Outgoing, Packet}; | ||||||
|     use crossbeam::channel::{unbounded, Receiver, Sender}; |     use crossbeam::channel::unbounded; | ||||||
|  | 
 | ||||||
|  |     pub use crossbeam::channel::{Receiver, Sender}; | ||||||
|  |     pub use rumqttc::QoS; | ||||||
| 
 | 
 | ||||||
|     pub trait MqttTool { |     pub trait MqttTool { | ||||||
|         fn new(client: Client, tx: Sender<MqttMessage>) -> Self; |         fn new(client: Client, tx: Sender<MqttMessage>) -> Self; | ||||||
|         fn rx(&mut self, messaage: MqttMessage); |         fn run(&mut self, rx: Receiver<MqttMessage>); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub struct MqttMessage { |     pub struct MqttMessage { | ||||||
|         pub topic: String, |         pub topic: String, | ||||||
|         pub payload: String |         pub payload: String, | ||||||
|  |         pub retain: bool, | ||||||
|  |         pub qos: QoS, | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub fn run<T: MqttTool>(host: String, port: u16, client: String, user: String, pass: String) { |     pub fn run<T: MqttTool>(host: String, port: u16, client: String, user: String, pass: String) { | ||||||
|         let (tx, rx) = unbounded::<MqttMessage>(); |         let (tx_sender, tx_recever) = unbounded::<MqttMessage>(); | ||||||
|  |         let (rx_sender, rx_recever) = unbounded::<MqttMessage>(); | ||||||
| 
 | 
 | ||||||
|         let mut mqttoptions = MqttOptions::new(client, host, port); |         let mut mqttoptions = MqttOptions::new(client, host, port); | ||||||
|         mqttoptions.set_keep_alive(Duration::from_secs(5)); |         mqttoptions.set_keep_alive(Duration::from_secs(5)); | ||||||
| @ -30,10 +36,22 @@ pub mod mqtt_client { | |||||||
|         let publisher = thread::Builder::new() |         let publisher = thread::Builder::new() | ||||||
|             .name("publisher".to_string()) |             .name("publisher".to_string()) | ||||||
|             .spawn(move || { |             .spawn(move || { | ||||||
|                 publisher(rx, client_publisher); |                 publisher(tx_recever, client_publisher); | ||||||
|             }); |             }); | ||||||
|         match publisher { |         match publisher { | ||||||
|             Err(_n) => println!("ERROR: main: fait to create publisher thread"), |             Err(_n) => println!("ERROR: main: failed to create publisher thread"), | ||||||
|  |             Ok(_n) => {} | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         // thread tool runner
 | ||||||
|  |         let tool_runner = thread::Builder::new() | ||||||
|  |             .name("tool runner".to_string()) | ||||||
|  |             .spawn(move || { | ||||||
|  |                 let mut tool = T::new(client, tx_sender); | ||||||
|  |                 tool.run(rx_recever); | ||||||
|  |             }); | ||||||
|  |         match tool_runner { | ||||||
|  |             Err(_n) => println!("ERROR: main: failed to create publisher thread"), | ||||||
|             Ok(_n) => {} |             Ok(_n) => {} | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
| @ -41,13 +59,12 @@ pub mod mqtt_client { | |||||||
|         let mqtt = thread::Builder::new() |         let mqtt = thread::Builder::new() | ||||||
|             .name("mqtt".to_string()) |             .name("mqtt".to_string()) | ||||||
|             .spawn(move || { |             .spawn(move || { | ||||||
|                 let tool = T::new(client, tx); |                 handeler::<T>(connection, rx_sender); | ||||||
|                 handeler::<T>(connection, tool); |  | ||||||
|             }); |             }); | ||||||
|         match mqtt { |         match mqtt { | ||||||
|             Err(_n) => println!("ERROR: main: fait to create mqtt thread"), |             Err(_n) => println!("ERROR: main: failed to create mqtt thread"), | ||||||
|             Ok(join) => match join.join() { |             Ok(join) => match join.join() { | ||||||
|                 Err(_) => println!("ERROR: main: failt to join mqtt thread"), |                 Err(_) => println!("ERROR: main: failed to join mqtt thread"), | ||||||
|                 Ok(_) => {} |                 Ok(_) => {} | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| @ -57,11 +74,11 @@ pub mod mqtt_client { | |||||||
|         loop { |         loop { | ||||||
|             let message = rx.recv(); |             let message = rx.recv(); | ||||||
|             match message { |             match message { | ||||||
|                 Err(e) => println!("ERROR: publisher: faild to receve an message ({})", e), |                 Err(e) => println!("ERROR: publisher: failed to receve an message ({})", e), | ||||||
|                 Ok(msg) => { |                 Ok(msg) => { | ||||||
|                     println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload); |                     println!("INFO : publisher: topic={}; payload={}", msg.topic, msg.payload); | ||||||
|                     match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { |                     match client.publish(msg.topic, QoS::AtMostOnce, false, msg.payload) { | ||||||
|                         Err(_n) => println!("ERROR: publisher: faild to publish"), |                         Err(_n) => println!("ERROR: publisher: failed to publish"), | ||||||
|                         Ok(_n) => {} |                         Ok(_n) => {} | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
| @ -69,14 +86,14 @@ pub mod mqtt_client { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub(self) fn handeler<T: MqttTool>(mut connection: Connection, mut tool: T) { |     pub(self) fn handeler<T: MqttTool>(mut connection: Connection, rx: Sender<MqttMessage>) { | ||||||
|         for (_i, notification) in connection.iter().enumerate() { |         for (_i, notification) in connection.iter().enumerate() { | ||||||
|             let mut delay: bool = false; |             let mut delay: bool = false; | ||||||
|             match notification { |             match notification { | ||||||
|                 Err(e) => match e { |                 Err(e) => match e { | ||||||
|                     rumqttc::ConnectionError::MqttState(state) => match state { |                     rumqttc::ConnectionError::MqttState(state) => match state { | ||||||
|                         rumqttc::StateError::Io(e) => { |                         rumqttc::StateError::Io(e) => { | ||||||
|                             println!("ERROR: mqtt: Io ({}) {}", e.kind(), e); |                             println!("ERROR: mqtt: State io {}: {}", e.kind(), e); | ||||||
|                             delay = true; |                             delay = true; | ||||||
|                         }, |                         }, | ||||||
|                         rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), |                         rumqttc::StateError::InvalidState => println!("ERROR: mqtt: InvalidState"), | ||||||
| @ -91,7 +108,10 @@ pub mod mqtt_client { | |||||||
|                     }, |                     }, | ||||||
|                     rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), |                     rumqttc::ConnectionError::NetworkTimeout => println!("ERROR: mqtt: NetworkTimeout"), | ||||||
|                     rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), |                     rumqttc::ConnectionError::FlushTimeout => println!("ERROR: mqtt: FlushTimeout"), | ||||||
|                     rumqttc::ConnectionError::Io(e) => println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()), |                     rumqttc::ConnectionError::Io(e) => { | ||||||
|  |                         println!("ERROR: mqtt: Io ({}) {}", e.kind(), e.to_string()); | ||||||
|  |                         delay = true; | ||||||
|  |                     }, | ||||||
|                     rumqttc::ConnectionError::ConnectionRefused(code) => { |                     rumqttc::ConnectionError::ConnectionRefused(code) => { | ||||||
|                         println!("ERROR: mqtt: ConnectionRefused {:?}", code); |                         println!("ERROR: mqtt: ConnectionRefused {:?}", code); | ||||||
|                         delay = true; |                         delay = true; | ||||||
| @ -122,16 +142,21 @@ pub mod mqtt_client { | |||||||
|                             Packet::Publish(msg) => { |                             Packet::Publish(msg) => { | ||||||
|                                 let mut payload: String = String::from(""); |                                 let mut payload: String = String::from(""); | ||||||
|                                 match String::from_utf8(msg.payload.to_vec()) { |                                 match String::from_utf8(msg.payload.to_vec()) { | ||||||
|                                     Err(e) => println!("ERROR: pqtt_recive: faild to decode payload ({})", e), |                                     Err(e) => println!("ERROR: pqtt_recive: failed to decode payload ({})", e), | ||||||
|                                     Ok(v) => payload = v |                                     Ok(v) => payload = v | ||||||
|                                 }; |                                 }; | ||||||
|                                 if payload.len() > 0 { |                                 if payload.len() > 0 { | ||||||
|                                     let message: MqttMessage = { MqttMessage { |                                     let message = MqttMessage { | ||||||
|                                         topic: msg.topic, |                                         topic: msg.topic, | ||||||
|                                         payload: payload |                                         payload: payload, | ||||||
|                                     }}; |                                         retain: msg.retain, | ||||||
|  |                                         qos: msg.qos, | ||||||
|  |                                     }; | ||||||
|                     
 |                     
 | ||||||
|                                     tool.rx(message); |                                     match rx.send(message) { | ||||||
|  |                                         Err(n) => println!("ERROR: faild to send incomming message to tool ({:?})", n), | ||||||
|  |                                         Ok(_n) => {} | ||||||
|  |                                     } | ||||||
|                                 } |                                 } | ||||||
|                             }, |                             }, | ||||||
|                             Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
 |                             Packet::PubAck(_) => {}, //println!("INFO : mqtt_recive: in PubAck"),
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user