Merge branch 'master' of https://github.com/embassy-rs/embassy into embassy-stm32/uart-eh1
This commit is contained in:
		
						commit
						88a3c360e8
					
				| @ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn available(&self, next_message_id: u64) -> u64 { | ||||||
|  |         self.inner.lock(|s| s.borrow().next_message_id - next_message_id) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |     fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | ||||||
|         self.inner.lock(|s| { |         self.inner.lock(|s| { | ||||||
|             let mut s = s.borrow_mut(); |             let mut s = s.borrow_mut(); | ||||||
| @ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn space(&self) -> usize { | ||||||
|  |         self.inner.lock(|s| { | ||||||
|  |             let s = s.borrow(); | ||||||
|  |             s.queue.capacity() - s.queue.len() | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |     fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | ||||||
|         self.inner.lock(|s| { |         self.inner.lock(|s| { | ||||||
|             let mut s = s.borrow_mut(); |             let mut s = s.borrow_mut(); | ||||||
| @ -388,6 +399,10 @@ pub trait PubSubBehavior<T> { | |||||||
|     /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
 |     /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
 | ||||||
|     fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |     fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | ||||||
| 
 | 
 | ||||||
|  |     /// Get the amount of messages that are between the given the next_message_id and the most recent message.
 | ||||||
|  |     /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged.
 | ||||||
|  |     fn available(&self, next_message_id: u64) -> u64; | ||||||
|  | 
 | ||||||
|     /// Try to publish a message to the queue.
 |     /// Try to publish a message to the queue.
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
 |     /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
 | ||||||
| @ -396,6 +411,9 @@ pub trait PubSubBehavior<T> { | |||||||
|     /// Publish a message immediately
 |     /// Publish a message immediately
 | ||||||
|     fn publish_immediate(&self, message: T); |     fn publish_immediate(&self, message: T); | ||||||
| 
 | 
 | ||||||
|  |     /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
 | ||||||
|  |     fn space(&self) -> usize; | ||||||
|  | 
 | ||||||
|     /// Let the channel know that a subscriber has dropped
 |     /// Let the channel know that a subscriber has dropped
 | ||||||
|     fn unregister_subscriber(&self, subscriber_next_message_id: u64); |     fn unregister_subscriber(&self, subscriber_next_message_id: u64); | ||||||
| 
 | 
 | ||||||
| @ -539,4 +557,59 @@ mod tests { | |||||||
| 
 | 
 | ||||||
|         drop(sub0); |         drop(sub0); | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     #[futures_test::test] | ||||||
|  |     async fn correct_available() { | ||||||
|  |         let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||||||
|  | 
 | ||||||
|  |         let sub0 = channel.subscriber().unwrap(); | ||||||
|  |         let mut sub1 = channel.subscriber().unwrap(); | ||||||
|  |         let pub0 = channel.publisher().unwrap(); | ||||||
|  | 
 | ||||||
|  |         assert_eq!(sub0.available(), 0); | ||||||
|  |         assert_eq!(sub1.available(), 0); | ||||||
|  | 
 | ||||||
|  |         pub0.publish(42).await; | ||||||
|  | 
 | ||||||
|  |         assert_eq!(sub0.available(), 1); | ||||||
|  |         assert_eq!(sub1.available(), 1); | ||||||
|  | 
 | ||||||
|  |         sub1.next_message().await; | ||||||
|  | 
 | ||||||
|  |         assert_eq!(sub1.available(), 0); | ||||||
|  | 
 | ||||||
|  |         pub0.publish(42).await; | ||||||
|  | 
 | ||||||
|  |         assert_eq!(sub0.available(), 2); | ||||||
|  |         assert_eq!(sub1.available(), 1); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     #[futures_test::test] | ||||||
|  |     async fn correct_space() { | ||||||
|  |         let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||||||
|  | 
 | ||||||
|  |         let mut sub0 = channel.subscriber().unwrap(); | ||||||
|  |         let mut sub1 = channel.subscriber().unwrap(); | ||||||
|  |         let pub0 = channel.publisher().unwrap(); | ||||||
|  | 
 | ||||||
|  |         assert_eq!(pub0.space(), 4); | ||||||
|  | 
 | ||||||
|  |         pub0.publish(42).await; | ||||||
|  | 
 | ||||||
|  |         assert_eq!(pub0.space(), 3); | ||||||
|  | 
 | ||||||
|  |         pub0.publish(42).await; | ||||||
|  | 
 | ||||||
|  |         assert_eq!(pub0.space(), 2); | ||||||
|  | 
 | ||||||
|  |         sub0.next_message().await; | ||||||
|  |         sub0.next_message().await; | ||||||
|  | 
 | ||||||
|  |         assert_eq!(pub0.space(), 2); | ||||||
|  | 
 | ||||||
|  |         sub1.next_message().await; | ||||||
|  |         assert_eq!(pub0.space(), 3); | ||||||
|  |         sub1.next_message().await; | ||||||
|  |         assert_eq!(pub0.space(), 4); | ||||||
|  |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||||||
|     pub fn try_publish(&self, message: T) -> Result<(), T> { |     pub fn try_publish(&self, message: T) -> Result<(), T> { | ||||||
|         self.channel.publish_with_context(message, None) |         self.channel.publish_with_context(message, None) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
 | ||||||
|  |     ///
 | ||||||
|  |     /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
 | ||||||
|  |     /// So checking doesn't give any guarantees.*
 | ||||||
|  |     pub fn space(&self) -> usize { | ||||||
|  |         self.channel.space() | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | ||||||
| @ -158,6 +166,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// Future for the publisher wait action
 | /// Future for the publisher wait action
 | ||||||
|  | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||||||
| pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||||||
|     /// The message we need to publish
 |     /// The message we need to publish
 | ||||||
|     message: Option<T>, |     message: Option<T>, | ||||||
|  | |||||||
| @ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     /// The amount of messages this subscriber hasn't received yet
 | ||||||
|  |     pub fn available(&self) -> u64 { | ||||||
|  |         self.channel.available(self.next_message_id) | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | ||||||
| @ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// Future for the subscriber wait action
 | /// Future for the subscriber wait action
 | ||||||
|  | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||||||
| pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||||||
|     subscriber: &'s mut Sub<'a, PSB, T>, |     subscriber: &'s mut Sub<'a, PSB, T>, | ||||||
| } | } | ||||||
|  | |||||||
| @ -54,12 +54,16 @@ impl From<EndpointAddress> for u8 { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl EndpointAddress { | impl EndpointAddress { | ||||||
|     const INBITS: u8 = Direction::In as u8; |     const INBITS: u8 = 0x80; | ||||||
| 
 | 
 | ||||||
|     /// Constructs a new EndpointAddress with the given index and direction.
 |     /// Constructs a new EndpointAddress with the given index and direction.
 | ||||||
|     #[inline] |     #[inline] | ||||||
|     pub fn from_parts(index: usize, dir: Direction) -> Self { |     pub fn from_parts(index: usize, dir: Direction) -> Self { | ||||||
|         EndpointAddress(index as u8 | dir as u8) |         let dir_u8 = match dir { | ||||||
|  |             Direction::Out => 0x00, | ||||||
|  |             Direction::In => Self::INBITS, | ||||||
|  |         }; | ||||||
|  |         EndpointAddress(index as u8 | dir_u8) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /// Gets the direction part of the address.
 |     /// Gets the direction part of the address.
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user