Expose new length functions in the subs and pubs
This commit is contained in:
		
							parent
							
								
									2a4a714060
								
							
						
					
					
						commit
						a76082b104
					
				| @ -248,13 +248,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     fn space(&self) -> usize { |  | ||||||
|         self.inner.lock(|s| { |  | ||||||
|             let s = s.borrow(); |  | ||||||
|             s.queue.capacity() - s.queue.len() |  | ||||||
|         }) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |     fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | ||||||
|         self.inner.lock(|s| { |         self.inner.lock(|s| { | ||||||
|             let mut s = s.borrow_mut(); |             let mut s = s.borrow_mut(); | ||||||
| @ -268,6 +261,26 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||||||
|             s.unregister_publisher() |             s.unregister_publisher() | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     fn capacity(&self) -> usize { | ||||||
|  |         self.capacity() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn free_capacity(&self) -> usize { | ||||||
|  |         self.free_capacity() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn len(&self) -> usize { | ||||||
|  |         self.len() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn is_empty(&self) -> bool { | ||||||
|  |         self.is_empty() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn is_full(&self) -> bool { | ||||||
|  |         self.is_full() | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// Internal state for the PubSub channel
 | /// Internal state for the PubSub channel
 | ||||||
| @ -439,8 +452,22 @@ trait SealedPubSubBehavior<T> { | |||||||
|     /// Publish a message immediately
 |     /// Publish a message immediately
 | ||||||
|     fn publish_immediate(&self, message: T); |     fn publish_immediate(&self, message: T); | ||||||
| 
 | 
 | ||||||
|     /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
 |     /// Returns the maximum number of elements the channel can hold.
 | ||||||
|     fn space(&self) -> usize; |     fn capacity(&self) -> usize; | ||||||
|  | 
 | ||||||
|  |     /// Returns the free capacity of the channel.
 | ||||||
|  |     ///
 | ||||||
|  |     /// This is equivalent to `capacity() - len()`
 | ||||||
|  |     fn free_capacity(&self) -> usize; | ||||||
|  | 
 | ||||||
|  |     /// Returns the number of elements currently in the channel.
 | ||||||
|  |     fn len(&self) -> usize; | ||||||
|  | 
 | ||||||
|  |     /// Returns whether the channel is empty.
 | ||||||
|  |     fn is_empty(&self) -> bool; | ||||||
|  | 
 | ||||||
|  |     /// Returns whether the channel is full.
 | ||||||
|  |     fn is_full(&self) -> bool; | ||||||
| 
 | 
 | ||||||
|     /// Let the channel know that a subscriber has dropped
 |     /// Let the channel know that a subscriber has dropped
 | ||||||
|     fn unregister_subscriber(&self, subscriber_next_message_id: u64); |     fn unregister_subscriber(&self, subscriber_next_message_id: u64); | ||||||
| @ -588,6 +615,7 @@ mod tests { | |||||||
|         assert_eq!(pub0.try_publish(0), Ok(())); |         assert_eq!(pub0.try_publish(0), Ok(())); | ||||||
|         assert_eq!(pub0.try_publish(0), Ok(())); |         assert_eq!(pub0.try_publish(0), Ok(())); | ||||||
|         assert_eq!(pub0.try_publish(0), Ok(())); |         assert_eq!(pub0.try_publish(0), Ok(())); | ||||||
|  |         assert!(pub0.is_full()); | ||||||
|         assert_eq!(pub0.try_publish(0), Err(0)); |         assert_eq!(pub0.try_publish(0), Err(0)); | ||||||
| 
 | 
 | ||||||
|         drop(sub0); |         drop(sub0); | ||||||
| @ -620,32 +648,42 @@ mod tests { | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     #[futures_test::test] |     #[futures_test::test] | ||||||
|     async fn correct_space() { |     async fn correct_len() { | ||||||
|         let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); |         let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||||||
| 
 | 
 | ||||||
|         let mut sub0 = channel.subscriber().unwrap(); |         let mut sub0 = channel.subscriber().unwrap(); | ||||||
|         let mut sub1 = channel.subscriber().unwrap(); |         let mut sub1 = channel.subscriber().unwrap(); | ||||||
|         let pub0 = channel.publisher().unwrap(); |         let pub0 = channel.publisher().unwrap(); | ||||||
| 
 | 
 | ||||||
|         assert_eq!(pub0.space(), 4); |         assert!(sub0.is_empty()); | ||||||
|  |         assert!(sub1.is_empty()); | ||||||
|  |         assert!(pub0.is_empty()); | ||||||
|  |         assert_eq!(pub0.free_capacity(), 4); | ||||||
|  |         assert_eq!(pub0.len(), 0); | ||||||
| 
 | 
 | ||||||
|         pub0.publish(42).await; |         pub0.publish(42).await; | ||||||
| 
 | 
 | ||||||
|         assert_eq!(pub0.space(), 3); |         assert_eq!(pub0.free_capacity(), 3); | ||||||
|  |         assert_eq!(pub0.len(), 1); | ||||||
| 
 | 
 | ||||||
|         pub0.publish(42).await; |         pub0.publish(42).await; | ||||||
| 
 | 
 | ||||||
|         assert_eq!(pub0.space(), 2); |         assert_eq!(pub0.free_capacity(), 2); | ||||||
|  |         assert_eq!(pub0.len(), 2); | ||||||
| 
 | 
 | ||||||
|         sub0.next_message().await; |         sub0.next_message().await; | ||||||
|         sub0.next_message().await; |         sub0.next_message().await; | ||||||
| 
 | 
 | ||||||
|         assert_eq!(pub0.space(), 2); |         assert_eq!(pub0.free_capacity(), 2); | ||||||
|  |         assert_eq!(pub0.len(), 2); | ||||||
| 
 | 
 | ||||||
|         sub1.next_message().await; |         sub1.next_message().await; | ||||||
|         assert_eq!(pub0.space(), 3); |         assert_eq!(pub0.free_capacity(), 3); | ||||||
|  |         assert_eq!(pub0.len(), 1); | ||||||
|  | 
 | ||||||
|         sub1.next_message().await; |         sub1.next_message().await; | ||||||
|         assert_eq!(pub0.space(), 4); |         assert_eq!(pub0.free_capacity(), 4); | ||||||
|  |         assert_eq!(pub0.len(), 0); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     #[futures_test::test] |     #[futures_test::test] | ||||||
| @ -656,29 +694,29 @@ mod tests { | |||||||
|         let mut sub0 = channel.subscriber().unwrap(); |         let mut sub0 = channel.subscriber().unwrap(); | ||||||
|         let mut sub1 = channel.subscriber().unwrap(); |         let mut sub1 = channel.subscriber().unwrap(); | ||||||
| 
 | 
 | ||||||
|         assert_eq!(4, pub0.space()); |         assert_eq!(4, pub0.free_capacity()); | ||||||
| 
 | 
 | ||||||
|         pub0.publish(1).await; |         pub0.publish(1).await; | ||||||
|         pub0.publish(2).await; |         pub0.publish(2).await; | ||||||
| 
 | 
 | ||||||
|         assert_eq!(2, channel.space()); |         assert_eq!(2, channel.free_capacity()); | ||||||
| 
 | 
 | ||||||
|         assert_eq!(1, sub0.try_next_message_pure().unwrap()); |         assert_eq!(1, sub0.try_next_message_pure().unwrap()); | ||||||
|         assert_eq!(2, sub0.try_next_message_pure().unwrap()); |         assert_eq!(2, sub0.try_next_message_pure().unwrap()); | ||||||
| 
 | 
 | ||||||
|         assert_eq!(2, channel.space()); |         assert_eq!(2, channel.free_capacity()); | ||||||
| 
 | 
 | ||||||
|         drop(sub0); |         drop(sub0); | ||||||
| 
 | 
 | ||||||
|         assert_eq!(2, channel.space()); |         assert_eq!(2, channel.free_capacity()); | ||||||
| 
 | 
 | ||||||
|         assert_eq!(1, sub1.try_next_message_pure().unwrap()); |         assert_eq!(1, sub1.try_next_message_pure().unwrap()); | ||||||
| 
 | 
 | ||||||
|         assert_eq!(3, channel.space()); |         assert_eq!(3, channel.free_capacity()); | ||||||
| 
 | 
 | ||||||
|         drop(sub1); |         drop(sub1); | ||||||
| 
 | 
 | ||||||
|         assert_eq!(4, channel.space()); |         assert_eq!(4, channel.free_capacity()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     struct CloneCallCounter(usize); |     struct CloneCallCounter(usize); | ||||||
|  | |||||||
| @ -43,12 +43,31 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||||||
|         self.channel.publish_with_context(message, None) |         self.channel.publish_with_context(message, None) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
 |     /// Returns the maximum number of elements the ***channel*** can hold.
 | ||||||
|  |     pub fn capacity(&self) -> usize { | ||||||
|  |         self.channel.capacity() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the free capacity of the ***channel***.
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
 |     /// This is equivalent to `capacity() - len()`
 | ||||||
|     /// So checking doesn't give any guarantees.*
 |     pub fn free_capacity(&self) -> usize { | ||||||
|     pub fn space(&self) -> usize { |         self.channel.free_capacity() | ||||||
|         self.channel.space() |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the number of elements currently in the ***channel***.
 | ||||||
|  |     pub fn len(&self) -> usize { | ||||||
|  |         self.channel.len() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns whether the ***channel*** is empty.
 | ||||||
|  |     pub fn is_empty(&self) -> bool { | ||||||
|  |         self.channel.is_empty() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns whether the ***channel*** is full.
 | ||||||
|  |     pub fn is_full(&self) -> bool { | ||||||
|  |         self.channel.is_full() | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -124,12 +143,31 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { | |||||||
|         self.channel.publish_with_context(message, None) |         self.channel.publish_with_context(message, None) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
 |     /// Returns the maximum number of elements the ***channel*** can hold.
 | ||||||
|  |     pub fn capacity(&self) -> usize { | ||||||
|  |         self.channel.capacity() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the free capacity of the ***channel***.
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
 |     /// This is equivalent to `capacity() - len()`
 | ||||||
|     /// So checking doesn't give any guarantees.*
 |     pub fn free_capacity(&self) -> usize { | ||||||
|     pub fn space(&self) -> usize { |         self.channel.free_capacity() | ||||||
|         self.channel.space() |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the number of elements currently in the ***channel***.
 | ||||||
|  |     pub fn len(&self) -> usize { | ||||||
|  |         self.channel.len() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns whether the ***channel*** is empty.
 | ||||||
|  |     pub fn is_empty(&self) -> bool { | ||||||
|  |         self.channel.is_empty() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns whether the ***channel*** is full.
 | ||||||
|  |     pub fn is_full(&self) -> bool { | ||||||
|  |         self.channel.is_full() | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -65,10 +65,39 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /// The amount of messages this subscriber hasn't received yet
 |     /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
 | ||||||
|  |     /// for this subscriber.
 | ||||||
|     pub fn available(&self) -> u64 { |     pub fn available(&self) -> u64 { | ||||||
|         self.channel.available(self.next_message_id) |         self.channel.available(self.next_message_id) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the maximum number of elements the ***channel*** can hold.
 | ||||||
|  |     pub fn capacity(&self) -> usize { | ||||||
|  |         self.channel.capacity() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the free capacity of the ***channel***.
 | ||||||
|  |     ///
 | ||||||
|  |     /// This is equivalent to `capacity() - len()`
 | ||||||
|  |     pub fn free_capacity(&self) -> usize { | ||||||
|  |         self.channel.free_capacity() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the number of elements currently in the ***channel***.
 | ||||||
|  |     /// See [Self::available] for how many messages are available for this subscriber.
 | ||||||
|  |     pub fn len(&self) -> usize { | ||||||
|  |         self.channel.len() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns whether the ***channel*** is empty.
 | ||||||
|  |     pub fn is_empty(&self) -> bool { | ||||||
|  |         self.channel.is_empty() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns whether the ***channel*** is full.
 | ||||||
|  |     pub fn is_full(&self) -> bool { | ||||||
|  |         self.channel.is_full() | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user