feat: add support for channel peek
Add support for peeking into the front of the channel if the value implements Clone. This can be useful in single-receiver situations where you don't want to remove the item from the queue until you've successfully processed it.
This commit is contained in:
		
							parent
							
								
									645883d874
								
							
						
					
					
						commit
						042abc805a
					
				| @ -208,6 +208,16 @@ where | |||||||
|         self.channel.try_receive() |         self.channel.try_receive() | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     /// Peek at the next value without removing it from the queue.
 | ||||||
|  |     ///
 | ||||||
|  |     /// See [`Channel::try_peek()`]
 | ||||||
|  |     pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.channel.try_peek() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Allows a poll_fn to poll until the channel is ready to receive
 |     /// Allows a poll_fn to poll until the channel is ready to receive
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// See [`Channel::poll_ready_to_receive()`]
 |     /// See [`Channel::poll_ready_to_receive()`]
 | ||||||
| @ -293,6 +303,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> { | |||||||
|         self.channel.try_receive_with_context(None) |         self.channel.try_receive_with_context(None) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     /// Peek at the next value without removing it from the queue.
 | ||||||
|  |     ///
 | ||||||
|  |     /// See [`Channel::try_peek()`]
 | ||||||
|  |     pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.channel.try_peek_with_context(None) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Allows a poll_fn to poll until the channel is ready to receive
 |     /// Allows a poll_fn to poll until the channel is ready to receive
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// See [`Channel::poll_ready_to_receive()`]
 |     /// See [`Channel::poll_ready_to_receive()`]
 | ||||||
| @ -463,6 +483,10 @@ pub(crate) trait DynamicChannel<T> { | |||||||
| 
 | 
 | ||||||
|     fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; |     fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; | ||||||
| 
 | 
 | ||||||
|  |     fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone; | ||||||
|  | 
 | ||||||
|     fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; |     fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; | ||||||
|     fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; |     fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; | ||||||
| 
 | 
 | ||||||
| @ -505,6 +529,31 @@ impl<T, const N: usize> ChannelState<T, N> { | |||||||
|         self.try_receive_with_context(None) |         self.try_receive_with_context(None) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.try_peek_with_context(None) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         if self.queue.is_full() { | ||||||
|  |             self.senders_waker.wake(); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         if let Some(message) = self.queue.front() { | ||||||
|  |             Ok(message.clone()) | ||||||
|  |         } else { | ||||||
|  |             if let Some(cx) = cx { | ||||||
|  |                 self.receiver_waker.register(cx.waker()); | ||||||
|  |             } | ||||||
|  |             Err(TryReceiveError::Empty) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |     fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | ||||||
|         if self.queue.is_full() { |         if self.queue.is_full() { | ||||||
|             self.senders_waker.wake(); |             self.senders_waker.wake(); | ||||||
| @ -634,6 +683,13 @@ where | |||||||
|         self.lock(|c| c.try_receive_with_context(cx)) |         self.lock(|c| c.try_receive_with_context(cx)) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.lock(|c| c.try_peek_with_context(cx)) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Poll the channel for the next message
 |     /// Poll the channel for the next message
 | ||||||
|     pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |     pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||||||
|         self.lock(|c| c.poll_receive(cx)) |         self.lock(|c| c.poll_receive(cx)) | ||||||
| @ -722,6 +778,17 @@ where | |||||||
|         self.lock(|c| c.try_receive()) |         self.lock(|c| c.try_receive()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     /// Peek at the next value without removing it from the queue.
 | ||||||
|  |     ///
 | ||||||
|  |     /// This method will either receive a copy of the message from the channel immediately or return
 | ||||||
|  |     /// an error if the channel is empty.
 | ||||||
|  |     pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.lock(|c| c.try_peek()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Returns the maximum number of elements the channel can hold.
 |     /// Returns the maximum number of elements the channel can hold.
 | ||||||
|     pub const fn capacity(&self) -> usize { |     pub const fn capacity(&self) -> usize { | ||||||
|         N |         N | ||||||
| @ -769,6 +836,13 @@ where | |||||||
|         Channel::try_receive_with_context(self, cx) |         Channel::try_receive_with_context(self, cx) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         Channel::try_peek_with_context(self, cx) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |     fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | ||||||
|         Channel::poll_ready_to_send(self, cx) |         Channel::poll_ready_to_send(self, cx) | ||||||
|     } |     } | ||||||
| @ -851,6 +925,8 @@ mod tests { | |||||||
|     fn simple_send_and_receive() { |     fn simple_send_and_receive() { | ||||||
|         let c = Channel::<NoopRawMutex, u32, 3>::new(); |         let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||||||
|         assert!(c.try_send(1).is_ok()); |         assert!(c.try_send(1).is_ok()); | ||||||
|  |         assert_eq!(c.try_peek().unwrap(), 1); | ||||||
|  |         assert_eq!(c.try_peek().unwrap(), 1); | ||||||
|         assert_eq!(c.try_receive().unwrap(), 1); |         assert_eq!(c.try_receive().unwrap(), 1); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| @ -881,6 +957,8 @@ mod tests { | |||||||
|         let r = c.dyn_receiver(); |         let r = c.dyn_receiver(); | ||||||
| 
 | 
 | ||||||
|         assert!(s.try_send(1).is_ok()); |         assert!(s.try_send(1).is_ok()); | ||||||
|  |         assert_eq!(r.try_peek().unwrap(), 1); | ||||||
|  |         assert_eq!(r.try_peek().unwrap(), 1); | ||||||
|         assert_eq!(r.try_receive().unwrap(), 1); |         assert_eq!(r.try_receive().unwrap(), 1); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -175,6 +175,16 @@ where | |||||||
|         self.channel.try_receive() |         self.channel.try_receive() | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     /// Peek at the next value without removing it from the queue.
 | ||||||
|  |     ///
 | ||||||
|  |     /// See [`PriorityChannel::try_peek()`]
 | ||||||
|  |     pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.channel.try_peek_with_context(None) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Allows a poll_fn to poll until the channel is ready to receive
 |     /// Allows a poll_fn to poll until the channel is ready to receive
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// See [`PriorityChannel::poll_ready_to_receive()`]
 |     /// See [`PriorityChannel::poll_ready_to_receive()`]
 | ||||||
| @ -343,6 +353,31 @@ where | |||||||
|         self.try_receive_with_context(None) |         self.try_receive_with_context(None) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.try_peek_with_context(None) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         if self.queue.len() == self.queue.capacity() { | ||||||
|  |             self.senders_waker.wake(); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         if let Some(message) = self.queue.peek() { | ||||||
|  |             Ok(message.clone()) | ||||||
|  |         } else { | ||||||
|  |             if let Some(cx) = cx { | ||||||
|  |                 self.receiver_waker.register(cx.waker()); | ||||||
|  |             } | ||||||
|  |             Err(TryReceiveError::Empty) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |     fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | ||||||
|         if self.queue.len() == self.queue.capacity() { |         if self.queue.len() == self.queue.capacity() { | ||||||
|             self.senders_waker.wake(); |             self.senders_waker.wake(); | ||||||
| @ -478,6 +513,13 @@ where | |||||||
|         self.lock(|c| c.try_receive_with_context(cx)) |         self.lock(|c| c.try_receive_with_context(cx)) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.lock(|c| c.try_peek_with_context(cx)) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Poll the channel for the next message
 |     /// Poll the channel for the next message
 | ||||||
|     pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |     pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||||||
|         self.lock(|c| c.poll_receive(cx)) |         self.lock(|c| c.poll_receive(cx)) | ||||||
| @ -548,6 +590,17 @@ where | |||||||
|         self.lock(|c| c.try_receive()) |         self.lock(|c| c.try_receive()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     /// Peek at the next value without removing it from the queue.
 | ||||||
|  |     ///
 | ||||||
|  |     /// This method will either receive a copy of the message from the channel immediately or return
 | ||||||
|  |     /// an error if the channel is empty.
 | ||||||
|  |     pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         self.lock(|c| c.try_peek()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Removes elements from the channel based on the given predicate.
 |     /// Removes elements from the channel based on the given predicate.
 | ||||||
|     pub fn remove_if<F>(&self, predicate: F) |     pub fn remove_if<F>(&self, predicate: F) | ||||||
|     where |     where | ||||||
| @ -617,6 +670,13 @@ where | |||||||
|         PriorityChannel::try_receive_with_context(self, cx) |         PriorityChannel::try_receive_with_context(self, cx) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||||||
|  |     where | ||||||
|  |         T: Clone, | ||||||
|  |     { | ||||||
|  |         PriorityChannel::try_peek_with_context(self, cx) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |     fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | ||||||
|         PriorityChannel::poll_ready_to_send(self, cx) |         PriorityChannel::poll_ready_to_send(self, cx) | ||||||
|     } |     } | ||||||
| @ -705,6 +765,8 @@ mod tests { | |||||||
|     fn simple_send_and_receive() { |     fn simple_send_and_receive() { | ||||||
|         let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); |         let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); | ||||||
|         assert!(c.try_send(1).is_ok()); |         assert!(c.try_send(1).is_ok()); | ||||||
|  |         assert_eq!(c.try_peek().unwrap(), 1); | ||||||
|  |         assert_eq!(c.try_peek().unwrap(), 1); | ||||||
|         assert_eq!(c.try_receive().unwrap(), 1); |         assert_eq!(c.try_receive().unwrap(), 1); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| @ -725,6 +787,8 @@ mod tests { | |||||||
|         let r: DynamicReceiver<'_, u32> = c.receiver().into(); |         let r: DynamicReceiver<'_, u32> = c.receiver().into(); | ||||||
| 
 | 
 | ||||||
|         assert!(s.try_send(1).is_ok()); |         assert!(s.try_send(1).is_ok()); | ||||||
|  |         assert_eq!(r.try_peek().unwrap(), 1); | ||||||
|  |         assert_eq!(r.try_peek().unwrap(), 1); | ||||||
|         assert_eq!(r.try_receive().unwrap(), 1); |         assert_eq!(r.try_receive().unwrap(), 1); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user