Added sender types, support for dropping receivers, converting to dyn-types, revised tests.
This commit is contained in:
parent
6defb4fed9
commit
ae2f109921
@ -1,4 +1,4 @@
|
|||||||
//! A synchronization primitive for passing the latest value to **multiple** tasks.
|
//! A synchronization primitive for passing the latest value to **multiple** receivers.
|
||||||
|
|
||||||
use core::cell::RefCell;
|
use core::cell::RefCell;
|
||||||
use core::future::poll_fn;
|
use core::future::poll_fn;
|
||||||
@ -10,22 +10,17 @@ use crate::blocking_mutex::raw::RawMutex;
|
|||||||
use crate::blocking_mutex::Mutex;
|
use crate::blocking_mutex::Mutex;
|
||||||
use crate::waitqueue::MultiWakerRegistration;
|
use crate::waitqueue::MultiWakerRegistration;
|
||||||
|
|
||||||
/// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s.
|
/// The `Watch` is a single-slot signaling primitive that allows multiple receivers to concurrently await
|
||||||
|
/// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers,
|
||||||
|
/// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous
|
||||||
|
/// value when a new one is sent, without waiting for all receivers to read the previous value.
|
||||||
///
|
///
|
||||||
/// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to
|
/// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks
|
||||||
/// `.await` the latest value, and all receive it.
|
/// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are
|
||||||
|
/// always provided with the latest value.
|
||||||
///
|
///
|
||||||
/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except
|
/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
|
||||||
/// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead
|
/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained and passed to the relevant parts of the program.
|
||||||
/// of waiting for the receivers to pop the previous value.
|
|
||||||
///
|
|
||||||
/// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other
|
|
||||||
/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for
|
|
||||||
/// [`Receiver`]s to "lose" stale values.
|
|
||||||
///
|
|
||||||
/// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared
|
|
||||||
/// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`]
|
|
||||||
/// with [`Watch::receiver`] which has async methods.
|
|
||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
/// use futures_executor::block_on;
|
/// use futures_executor::block_on;
|
||||||
@ -36,18 +31,18 @@ use crate::waitqueue::MultiWakerRegistration;
|
|||||||
///
|
///
|
||||||
/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
||||||
///
|
///
|
||||||
/// // Obtain Receivers
|
/// // Obtain receivers and sender
|
||||||
/// let mut rcv0 = WATCH.receiver().unwrap();
|
/// let mut rcv0 = WATCH.receiver().unwrap();
|
||||||
/// let mut rcv1 = WATCH.receiver().unwrap();
|
/// let mut rcv1 = WATCH.dyn_receiver().unwrap();
|
||||||
/// assert!(WATCH.receiver().is_err());
|
/// let mut snd = WATCH.sender();
|
||||||
///
|
///
|
||||||
|
/// // No more receivers, and no update
|
||||||
|
/// assert!(WATCH.receiver().is_err());
|
||||||
/// assert_eq!(rcv1.try_changed(), None);
|
/// assert_eq!(rcv1.try_changed(), None);
|
||||||
///
|
///
|
||||||
/// WATCH.write(10);
|
/// snd.send(10);
|
||||||
/// assert_eq!(WATCH.try_peek(), Some(10));
|
|
||||||
///
|
///
|
||||||
///
|
/// // Receive the new value (async or try)
|
||||||
/// // Receive the new value
|
|
||||||
/// assert_eq!(rcv0.changed().await, 10);
|
/// assert_eq!(rcv0.changed().await, 10);
|
||||||
/// assert_eq!(rcv1.try_changed(), Some(10));
|
/// assert_eq!(rcv1.try_changed(), Some(10));
|
||||||
///
|
///
|
||||||
@ -55,13 +50,14 @@ use crate::waitqueue::MultiWakerRegistration;
|
|||||||
/// assert_eq!(rcv0.try_changed(), None);
|
/// assert_eq!(rcv0.try_changed(), None);
|
||||||
/// assert_eq!(rcv1.try_changed(), None);
|
/// assert_eq!(rcv1.try_changed(), None);
|
||||||
///
|
///
|
||||||
/// WATCH.write(20);
|
/// snd.send(20);
|
||||||
///
|
///
|
||||||
/// // Defference `between` peek `get`.
|
/// // Peek does not mark the value as seen
|
||||||
/// assert_eq!(rcv0.peek().await, 20);
|
/// assert_eq!(rcv0.peek().await, 20);
|
||||||
/// assert_eq!(rcv1.get().await, 20);
|
|
||||||
///
|
|
||||||
/// assert_eq!(rcv0.try_changed(), Some(20));
|
/// assert_eq!(rcv0.try_changed(), Some(20));
|
||||||
|
///
|
||||||
|
/// // Get marks the value as seen
|
||||||
|
/// assert_eq!(rcv1.get().await, 20);
|
||||||
/// assert_eq!(rcv1.try_changed(), None);
|
/// assert_eq!(rcv1.try_changed(), None);
|
||||||
///
|
///
|
||||||
/// };
|
/// };
|
||||||
@ -80,30 +76,57 @@ struct WatchState<const N: usize, T: Clone> {
|
|||||||
|
|
||||||
/// A trait representing the 'inner' behavior of the `Watch`.
|
/// A trait representing the 'inner' behavior of the `Watch`.
|
||||||
pub trait WatchBehavior<T: Clone> {
|
pub trait WatchBehavior<T: Clone> {
|
||||||
|
/// Sends a new value to the `Watch`.
|
||||||
|
fn send(&self, val: T);
|
||||||
|
|
||||||
|
/// Clears the value of the `Watch`.
|
||||||
|
fn clear(&self);
|
||||||
|
|
||||||
/// Poll the `Watch` for the current value, **without** making it as seen.
|
/// Poll the `Watch` for the current value, **without** making it as seen.
|
||||||
fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>;
|
fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>;
|
||||||
|
|
||||||
/// Tries to peek the value of the `Watch`, **without** marking it as seen.
|
/// Tries to peek the value of the `Watch`, **without** marking it as seen.
|
||||||
fn inner_try_peek(&self) -> Option<T>;
|
fn try_peek(&self) -> Option<T>;
|
||||||
|
|
||||||
/// Poll the `Watch` for the current value, making it as seen.
|
/// Poll the `Watch` for the current value, making it as seen.
|
||||||
fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
|
fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
|
||||||
|
|
||||||
/// Tries to get the value of the `Watch`, marking it as seen.
|
/// Tries to get the value of the `Watch`, marking it as seen.
|
||||||
fn inner_try_get(&self, id: &mut u64) -> Option<T>;
|
fn try_get(&self, id: &mut u64) -> Option<T>;
|
||||||
|
|
||||||
/// Poll the `Watch` for a changed value, marking it as seen.
|
/// Poll the `Watch` for a changed value, marking it as seen.
|
||||||
fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
|
fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
|
||||||
|
|
||||||
/// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
|
/// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
|
||||||
fn inner_try_changed(&self, id: &mut u64) -> Option<T>;
|
fn try_changed(&self, id: &mut u64) -> Option<T>;
|
||||||
|
|
||||||
/// Checks if the `Watch` is been initialized with a value.
|
/// Checks if the `Watch` is been initialized with a value.
|
||||||
fn inner_contains_value(&self) -> bool;
|
fn contains_value(&self) -> bool;
|
||||||
|
|
||||||
|
/// Used when a receiver is dropped to decrement the receiver count.
|
||||||
|
///
|
||||||
|
/// ## This method should not be called by the user.
|
||||||
|
fn drop_receiver(&self);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
|
impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
|
||||||
fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> {
|
fn send(&self, val: T) {
|
||||||
|
self.mutex.lock(|state| {
|
||||||
|
let mut s = state.borrow_mut();
|
||||||
|
s.data = Some(val);
|
||||||
|
s.current_id += 1;
|
||||||
|
s.wakers.wake();
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear(&self) {
|
||||||
|
self.mutex.lock(|state| {
|
||||||
|
let mut s = state.borrow_mut();
|
||||||
|
s.data = None;
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> {
|
||||||
self.mutex.lock(|state| {
|
self.mutex.lock(|state| {
|
||||||
let mut s = state.borrow_mut();
|
let mut s = state.borrow_mut();
|
||||||
match &s.data {
|
match &s.data {
|
||||||
@ -116,11 +139,11 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inner_try_peek(&self) -> Option<T> {
|
fn try_peek(&self) -> Option<T> {
|
||||||
self.mutex.lock(|state| state.borrow().data.clone())
|
self.mutex.lock(|state| state.borrow().data.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
|
fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
|
||||||
self.mutex.lock(|state| {
|
self.mutex.lock(|state| {
|
||||||
let mut s = state.borrow_mut();
|
let mut s = state.borrow_mut();
|
||||||
match &s.data {
|
match &s.data {
|
||||||
@ -136,7 +159,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inner_try_get(&self, id: &mut u64) -> Option<T> {
|
fn try_get(&self, id: &mut u64) -> Option<T> {
|
||||||
self.mutex.lock(|state| {
|
self.mutex.lock(|state| {
|
||||||
let s = state.borrow();
|
let s = state.borrow();
|
||||||
*id = s.current_id;
|
*id = s.current_id;
|
||||||
@ -144,7 +167,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
|
fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
|
||||||
self.mutex.lock(|state| {
|
self.mutex.lock(|state| {
|
||||||
let mut s = state.borrow_mut();
|
let mut s = state.borrow_mut();
|
||||||
match (&s.data, s.current_id > *id) {
|
match (&s.data, s.current_id > *id) {
|
||||||
@ -160,7 +183,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inner_try_changed(&self, id: &mut u64) -> Option<T> {
|
fn try_changed(&self, id: &mut u64) -> Option<T> {
|
||||||
self.mutex.lock(|state| {
|
self.mutex.lock(|state| {
|
||||||
let s = state.borrow();
|
let s = state.borrow();
|
||||||
match s.current_id > *id {
|
match s.current_id > *id {
|
||||||
@ -173,9 +196,16 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inner_contains_value(&self) -> bool {
|
fn contains_value(&self) -> bool {
|
||||||
self.mutex.lock(|state| state.borrow().data.is_some())
|
self.mutex.lock(|state| state.borrow().data.is_some())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn drop_receiver(&self) {
|
||||||
|
self.mutex.lock(|state| {
|
||||||
|
let mut s = state.borrow_mut();
|
||||||
|
s.receiver_count -= 1;
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -198,14 +228,14 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write a new value to the `Watch`.
|
/// Create a new [`Receiver`] for the `Watch`.
|
||||||
pub fn write(&self, val: T) {
|
pub fn sender(&self) -> Sender<'_, M, T, N> {
|
||||||
self.mutex.lock(|state| {
|
Sender(Snd::new(self))
|
||||||
let mut s = state.borrow_mut();
|
}
|
||||||
s.data = Some(val);
|
|
||||||
s.current_id += 1;
|
/// Create a new [`DynReceiver`] for the `Watch`.
|
||||||
s.wakers.wake();
|
pub fn dyn_sender(&self) -> DynSender<'_, T> {
|
||||||
})
|
DynSender(Snd::new(self))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new [`Receiver`] for the `Watch`.
|
/// Create a new [`Receiver`] for the `Watch`.
|
||||||
@ -214,7 +244,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
|
|||||||
let mut s = state.borrow_mut();
|
let mut s = state.borrow_mut();
|
||||||
if s.receiver_count < N {
|
if s.receiver_count < N {
|
||||||
s.receiver_count += 1;
|
s.receiver_count += 1;
|
||||||
Ok(Receiver(Rcv::new(self)))
|
Ok(Receiver(Rcv::new(self, 0)))
|
||||||
} else {
|
} else {
|
||||||
Err(Error::MaximumReceiversReached)
|
Err(Error::MaximumReceiversReached)
|
||||||
}
|
}
|
||||||
@ -227,29 +257,121 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
|
|||||||
let mut s = state.borrow_mut();
|
let mut s = state.borrow_mut();
|
||||||
if s.receiver_count < N {
|
if s.receiver_count < N {
|
||||||
s.receiver_count += 1;
|
s.receiver_count += 1;
|
||||||
Ok(DynReceiver(Rcv::new(self)))
|
Ok(DynReceiver(Rcv::new(self, 0)))
|
||||||
} else {
|
} else {
|
||||||
Err(Error::MaximumReceiversReached)
|
Err(Error::MaximumReceiversReached)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A receiver can `.await` a change in the `Watch` value.
|
||||||
|
pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
|
||||||
|
watch: &'a W,
|
||||||
|
_phantom: PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
watch: self.watch,
|
||||||
|
_phantom: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
|
||||||
|
/// Creates a new `Receiver` with a reference to the `Watch`.
|
||||||
|
fn new(watch: &'a W) -> Self {
|
||||||
|
Self {
|
||||||
|
watch,
|
||||||
|
_phantom: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends a new value to the `Watch`.
|
||||||
|
pub fn send(&self, val: T) {
|
||||||
|
self.watch.send(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clears the value of the `Watch`.
|
||||||
|
/// This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending.
|
||||||
|
pub fn clear(&self) {
|
||||||
|
self.watch.clear()
|
||||||
|
}
|
||||||
|
|
||||||
/// Tries to retrieve the value of the `Watch`.
|
/// Tries to retrieve the value of the `Watch`.
|
||||||
pub fn try_peek(&self) -> Option<T> {
|
pub fn try_peek(&self) -> Option<T> {
|
||||||
self.inner_try_peek()
|
self.watch.try_peek()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true if the `Watch` contains a value.
|
/// Returns true if the `Watch` contains a value.
|
||||||
pub fn contains_value(&self) -> bool {
|
pub fn contains_value(&self) -> bool {
|
||||||
self.inner_contains_value()
|
self.watch.contains_value()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending.
|
/// A sender of a `Watch` channel.
|
||||||
pub fn clear(&self) {
|
///
|
||||||
self.mutex.lock(|state| {
|
/// For a simpler type definition, consider [`DynSender`] at the expense of
|
||||||
let mut s = state.borrow_mut();
|
/// some runtime performance due to dynamic dispatch.
|
||||||
s.data = None;
|
pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
|
||||||
})
|
|
||||||
|
impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self(self.0.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
|
||||||
|
/// Converts the `Sender` into a [`DynSender`].
|
||||||
|
pub fn as_dyn(self) -> DynSender<'a, T> {
|
||||||
|
DynSender(Snd::new(self.watch))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
|
||||||
|
fn into(self) -> DynSender<'a, T> {
|
||||||
|
self.as_dyn()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
|
||||||
|
type Target = Snd<'a, T, Watch<M, T, N>>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A sender which holds a **dynamic** reference to a `Watch` channel.
|
||||||
|
///
|
||||||
|
/// This is an alternative to [`Sender`] with a simpler type definition,
|
||||||
|
pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
|
||||||
|
|
||||||
|
impl<'a, T: Clone> Clone for DynSender<'a, T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self(self.0.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Clone> Deref for DynSender<'a, T> {
|
||||||
|
type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -262,59 +384,83 @@ pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
|
|||||||
|
|
||||||
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
|
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
|
||||||
/// Creates a new `Receiver` with a reference to the `Watch`.
|
/// Creates a new `Receiver` with a reference to the `Watch`.
|
||||||
fn new(watch: &'a W) -> Self {
|
fn new(watch: &'a W, at_id: u64) -> Self {
|
||||||
Self {
|
Self {
|
||||||
watch,
|
watch,
|
||||||
at_id: 0,
|
at_id,
|
||||||
_phantom: PhantomData,
|
_phantom: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen.
|
/// Returns the current value of the `Watch` once it is initialized, **without** marking it as seen.
|
||||||
|
///
|
||||||
|
/// **Note**: Futures do nothing unless you `.await` or poll them.
|
||||||
pub async fn peek(&self) -> T {
|
pub async fn peek(&self) -> T {
|
||||||
poll_fn(|cx| self.watch.inner_poll_peek(cx)).await
|
poll_fn(|cx| self.watch.poll_peek(cx)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen.
|
/// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen.
|
||||||
pub fn try_peek(&self) -> Option<T> {
|
pub fn try_peek(&self) -> Option<T> {
|
||||||
self.watch.inner_try_peek()
|
self.watch.try_peek()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the current value of the `Watch` if it is initialized, marking it as seen.
|
/// Returns the current value of the `Watch` once it is initialized, marking it as seen.
|
||||||
|
///
|
||||||
|
/// **Note**: Futures do nothing unless you `.await` or poll them.
|
||||||
pub async fn get(&mut self) -> T {
|
pub async fn get(&mut self) -> T {
|
||||||
poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await
|
poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tries to get the current value of the `Watch` without waiting, marking it as seen.
|
/// Tries to get the current value of the `Watch` without waiting, marking it as seen.
|
||||||
pub fn try_get(&mut self) -> Option<T> {
|
pub fn try_get(&mut self) -> Option<T> {
|
||||||
self.watch.inner_try_get(&mut self.at_id)
|
self.watch.try_get(&mut self.at_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Waits for the `Watch` to change and returns the new value, marking it as seen.
|
/// Waits for the `Watch` to change and returns the new value, marking it as seen.
|
||||||
|
///
|
||||||
|
/// **Note**: Futures do nothing unless you `.await` or poll them.
|
||||||
pub async fn changed(&mut self) -> T {
|
pub async fn changed(&mut self) -> T {
|
||||||
poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await
|
poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tries to get the new value of the watch without waiting, marking it as seen.
|
/// Tries to get the new value of the watch without waiting, marking it as seen.
|
||||||
pub fn try_changed(&mut self) -> Option<T> {
|
pub fn try_changed(&mut self) -> Option<T> {
|
||||||
self.watch.inner_try_changed(&mut self.at_id)
|
self.watch.try_changed(&mut self.at_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks if the `Watch` contains a value. If this returns true,
|
/// Checks if the `Watch` contains a value. If this returns true,
|
||||||
/// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately.
|
/// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately.
|
||||||
pub fn contains_value(&self) -> bool {
|
pub fn contains_value(&self) -> bool {
|
||||||
self.watch.inner_contains_value()
|
self.watch.contains_value()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.watch.drop_receiver();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A receiver of a `Watch` channel.
|
/// A receiver of a `Watch` channel.
|
||||||
pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
|
pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
|
||||||
|
|
||||||
/// A receiver which holds a **reference** to a `Watch` channel.
|
impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
|
||||||
///
|
/// Converts the `Receiver` into a [`DynReceiver`].
|
||||||
/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
|
pub fn as_dyn(self) -> DynReceiver<'a, T> {
|
||||||
/// some runtime performance due to dynamic dispatch.
|
// We need to increment the receiver count since the original
|
||||||
pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
|
// receiver is being dropped, which decrements the count.
|
||||||
|
self.watch.mutex.lock(|state| {
|
||||||
|
state.borrow_mut().receiver_count += 1;
|
||||||
|
});
|
||||||
|
DynReceiver(Rcv::new(self.0.watch, self.at_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
|
||||||
|
fn into(self) -> DynReceiver<'a, T> {
|
||||||
|
self.as_dyn()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
|
impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
|
||||||
type Target = Rcv<'a, T, Watch<M, T, N>>;
|
type Target = Rcv<'a, T, Watch<M, T, N>>;
|
||||||
@ -330,6 +476,12 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A receiver which holds a **dynamic** reference to a `Watch` channel.
|
||||||
|
///
|
||||||
|
/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
|
||||||
|
/// some runtime performance due to dynamic dispatch.
|
||||||
|
pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
|
||||||
|
|
||||||
impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
|
impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
|
||||||
type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
|
type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
|
||||||
|
|
||||||
@ -348,167 +500,242 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use futures_executor::block_on;
|
use futures_executor::block_on;
|
||||||
|
|
||||||
use super::*;
|
use super::Watch;
|
||||||
use crate::blocking_mutex::raw::CriticalSectionRawMutex;
|
use crate::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn multiple_writes() {
|
fn multiple_sends() {
|
||||||
let f = async {
|
let f = async {
|
||||||
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
|
||||||
|
|
||||||
// Obtain Receivers
|
// Obtain receiver and sender
|
||||||
let mut rcv0 = WATCH.receiver().unwrap();
|
let mut rcv = WATCH.receiver().unwrap();
|
||||||
let mut rcv1 = WATCH.dyn_receiver().unwrap();
|
let snd = WATCH.sender();
|
||||||
|
|
||||||
WATCH.write(10);
|
// Not initialized
|
||||||
|
assert_eq!(rcv.try_changed(), None);
|
||||||
|
|
||||||
// Receive the new value
|
// Receive the new value
|
||||||
assert_eq!(rcv0.changed().await, 10);
|
snd.send(10);
|
||||||
assert_eq!(rcv1.changed().await, 10);
|
assert_eq!(rcv.changed().await, 10);
|
||||||
|
|
||||||
|
// Receive another value
|
||||||
|
snd.send(20);
|
||||||
|
assert_eq!(rcv.try_changed(), Some(20));
|
||||||
|
|
||||||
// No update
|
// No update
|
||||||
assert_eq!(rcv0.try_changed(), None);
|
assert_eq!(rcv.try_changed(), None);
|
||||||
assert_eq!(rcv1.try_changed(), None);
|
|
||||||
|
|
||||||
WATCH.write(20);
|
|
||||||
|
|
||||||
assert_eq!(rcv0.changed().await, 20);
|
|
||||||
assert_eq!(rcv1.changed().await, 20);
|
|
||||||
};
|
};
|
||||||
block_on(f);
|
block_on(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn max_receivers() {
|
fn receive_after_create() {
|
||||||
let f = async {
|
let f = async {
|
||||||
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
|
||||||
|
|
||||||
// Obtain Receivers
|
// Obtain sender and send value
|
||||||
let _ = WATCH.receiver().unwrap();
|
let snd = WATCH.sender();
|
||||||
let _ = WATCH.receiver().unwrap();
|
snd.send(10);
|
||||||
assert!(WATCH.receiver().is_err());
|
|
||||||
|
// Obtain receiver and receive value
|
||||||
|
let mut rcv = WATCH.receiver().unwrap();
|
||||||
|
assert_eq!(rcv.try_changed(), Some(10));
|
||||||
};
|
};
|
||||||
block_on(f);
|
block_on(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn receive_initial() {
|
fn max_receivers_drop() {
|
||||||
let f = async {
|
let f = async {
|
||||||
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
||||||
|
|
||||||
// Obtain Receivers
|
// Try to create 3 receivers (only 2 can exist at once)
|
||||||
|
let rcv0 = WATCH.receiver();
|
||||||
|
let rcv1 = WATCH.receiver();
|
||||||
|
let rcv2 = WATCH.receiver();
|
||||||
|
|
||||||
|
// Ensure the first two are successful and the third is not
|
||||||
|
assert!(rcv0.is_ok());
|
||||||
|
assert!(rcv1.is_ok());
|
||||||
|
assert!(rcv2.is_err());
|
||||||
|
|
||||||
|
// Drop the first receiver
|
||||||
|
drop(rcv0);
|
||||||
|
|
||||||
|
// Create another receiver and ensure it is successful
|
||||||
|
let rcv3 = WATCH.receiver();
|
||||||
|
assert!(rcv3.is_ok());
|
||||||
|
};
|
||||||
|
block_on(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn multiple_receivers() {
|
||||||
|
let f = async {
|
||||||
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
||||||
|
|
||||||
|
// Obtain receivers and sender
|
||||||
let mut rcv0 = WATCH.receiver().unwrap();
|
let mut rcv0 = WATCH.receiver().unwrap();
|
||||||
let mut rcv1 = WATCH.receiver().unwrap();
|
let mut rcv1 = WATCH.receiver().unwrap();
|
||||||
|
let snd = WATCH.sender();
|
||||||
|
|
||||||
assert_eq!(rcv0.contains_value(), false);
|
// No update for both
|
||||||
|
|
||||||
assert_eq!(rcv0.try_changed(), None);
|
assert_eq!(rcv0.try_changed(), None);
|
||||||
assert_eq!(rcv1.try_changed(), None);
|
assert_eq!(rcv1.try_changed(), None);
|
||||||
|
|
||||||
WATCH.write(0);
|
// Send a new value
|
||||||
|
snd.send(0);
|
||||||
assert_eq!(rcv0.contains_value(), true);
|
|
||||||
|
|
||||||
|
// Both receivers receive the new value
|
||||||
assert_eq!(rcv0.try_changed(), Some(0));
|
assert_eq!(rcv0.try_changed(), Some(0));
|
||||||
assert_eq!(rcv1.try_changed(), Some(0));
|
assert_eq!(rcv1.try_changed(), Some(0));
|
||||||
};
|
};
|
||||||
block_on(f);
|
block_on(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn clone_senders() {
|
||||||
|
let f = async {
|
||||||
|
// Obtain different ways to send
|
||||||
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
|
||||||
|
let snd0 = WATCH.sender();
|
||||||
|
let snd1 = snd0.clone();
|
||||||
|
|
||||||
|
// Obtain Receiver
|
||||||
|
let mut rcv = WATCH.receiver().unwrap().as_dyn();
|
||||||
|
|
||||||
|
// Send a value from first sender
|
||||||
|
snd0.send(10);
|
||||||
|
assert_eq!(rcv.try_changed(), Some(10));
|
||||||
|
|
||||||
|
// Send a value from second sender
|
||||||
|
snd1.send(20);
|
||||||
|
assert_eq!(rcv.try_changed(), Some(20));
|
||||||
|
};
|
||||||
|
block_on(f);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn peek_get_changed() {
|
fn peek_get_changed() {
|
||||||
let f = async {
|
let f = async {
|
||||||
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
||||||
|
|
||||||
// Obtain Receivers
|
// Obtain receiver and sender
|
||||||
let mut rcv0 = WATCH.receiver().unwrap();
|
let mut rcv = WATCH.receiver().unwrap();
|
||||||
|
let snd = WATCH.sender();
|
||||||
|
|
||||||
WATCH.write(10);
|
// Send a value
|
||||||
|
snd.send(10);
|
||||||
|
|
||||||
// Ensure peek does not mark as seen
|
// Ensure peek does not mark as seen
|
||||||
assert_eq!(rcv0.peek().await, 10);
|
assert_eq!(rcv.peek().await, 10);
|
||||||
assert_eq!(rcv0.try_changed(), Some(10));
|
assert_eq!(rcv.try_changed(), Some(10));
|
||||||
assert_eq!(rcv0.try_changed(), None);
|
assert_eq!(rcv.try_changed(), None);
|
||||||
assert_eq!(rcv0.peek().await, 10);
|
assert_eq!(rcv.try_peek(), Some(10));
|
||||||
|
|
||||||
WATCH.write(20);
|
// Send a value
|
||||||
|
snd.send(20);
|
||||||
|
|
||||||
// Ensure get does mark as seen
|
// Ensure get does mark as seen
|
||||||
assert_eq!(rcv0.get().await, 20);
|
assert_eq!(rcv.get().await, 20);
|
||||||
assert_eq!(rcv0.try_changed(), None);
|
assert_eq!(rcv.try_changed(), None);
|
||||||
assert_eq!(rcv0.try_get(), Some(20));
|
assert_eq!(rcv.try_get(), Some(20));
|
||||||
};
|
};
|
||||||
block_on(f);
|
block_on(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn count_ids() {
|
fn use_dynamics() {
|
||||||
let f = async {
|
let f = async {
|
||||||
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
||||||
|
|
||||||
// Obtain Receivers
|
// Obtain receiver and sender
|
||||||
let mut rcv0 = WATCH.receiver().unwrap();
|
let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
|
||||||
let mut rcv1 = WATCH.receiver().unwrap();
|
let dyn_snd = WATCH.dyn_sender();
|
||||||
|
|
||||||
let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id);
|
// Send a value
|
||||||
|
dyn_snd.send(10);
|
||||||
|
|
||||||
WATCH.write(10);
|
// Ensure the dynamic receiver receives the value
|
||||||
|
assert_eq!(dyn_rcv.try_changed(), Some(10));
|
||||||
assert_eq!(rcv0.changed().await, 10);
|
assert_eq!(dyn_rcv.try_changed(), None);
|
||||||
assert_eq!(rcv1.changed().await, 10);
|
|
||||||
|
|
||||||
assert_eq!(rcv0.try_changed(), None);
|
|
||||||
assert_eq!(rcv1.try_changed(), None);
|
|
||||||
|
|
||||||
WATCH.write(20);
|
|
||||||
WATCH.write(20);
|
|
||||||
WATCH.write(20);
|
|
||||||
|
|
||||||
assert_eq!(rcv0.changed().await, 20);
|
|
||||||
assert_eq!(rcv1.changed().await, 20);
|
|
||||||
|
|
||||||
assert_eq!(rcv0.try_changed(), None);
|
|
||||||
assert_eq!(rcv1.try_changed(), None);
|
|
||||||
|
|
||||||
assert_eq!(get_id(), 4);
|
|
||||||
};
|
};
|
||||||
block_on(f);
|
block_on(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn peek_still_await() {
|
fn convert_to_dyn() {
|
||||||
let f = async {
|
let f = async {
|
||||||
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
||||||
|
|
||||||
// Obtain Receivers
|
// Obtain receiver and sender
|
||||||
let mut rcv0 = WATCH.receiver().unwrap();
|
let rcv = WATCH.receiver().unwrap();
|
||||||
let mut rcv1 = WATCH.receiver().unwrap();
|
let snd = WATCH.sender();
|
||||||
|
|
||||||
WATCH.write(10);
|
// Convert to dynamic
|
||||||
|
let mut dyn_rcv = rcv.as_dyn();
|
||||||
|
let dyn_snd = snd.as_dyn();
|
||||||
|
|
||||||
assert_eq!(rcv0.peek().await, 10);
|
// Send a value
|
||||||
assert_eq!(rcv1.try_peek(), Some(10));
|
dyn_snd.send(10);
|
||||||
|
|
||||||
assert_eq!(rcv0.changed().await, 10);
|
// Ensure the dynamic receiver receives the value
|
||||||
assert_eq!(rcv1.changed().await, 10);
|
assert_eq!(dyn_rcv.try_changed(), Some(10));
|
||||||
|
assert_eq!(dyn_rcv.try_changed(), None);
|
||||||
};
|
};
|
||||||
block_on(f);
|
block_on(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn peek_with_static() {
|
fn dynamic_receiver_count() {
|
||||||
let f = async {
|
let f = async {
|
||||||
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
||||||
|
|
||||||
// Obtain Receivers
|
// Obtain receiver and sender
|
||||||
let rcv0 = WATCH.receiver().unwrap();
|
let rcv0 = WATCH.receiver();
|
||||||
let rcv1 = WATCH.receiver().unwrap();
|
let rcv1 = WATCH.receiver();
|
||||||
|
let rcv2 = WATCH.receiver();
|
||||||
|
|
||||||
WATCH.write(20);
|
// Ensure the first two are successful and the third is not
|
||||||
|
assert!(rcv0.is_ok());
|
||||||
|
assert!(rcv1.is_ok());
|
||||||
|
assert!(rcv2.is_err());
|
||||||
|
|
||||||
assert_eq!(rcv0.peek().await, 20);
|
// Convert to dynamic
|
||||||
assert_eq!(rcv1.peek().await, 20);
|
let dyn_rcv0 = rcv0.unwrap().as_dyn();
|
||||||
assert_eq!(WATCH.try_peek(), Some(20));
|
|
||||||
|
// Drop the (now dynamic) receiver
|
||||||
|
drop(dyn_rcv0);
|
||||||
|
|
||||||
|
// Create another receiver and ensure it is successful
|
||||||
|
let rcv3 = WATCH.receiver();
|
||||||
|
let rcv4 = WATCH.receiver();
|
||||||
|
assert!(rcv3.is_ok());
|
||||||
|
assert!(rcv4.is_err());
|
||||||
|
};
|
||||||
|
block_on(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn contains_value() {
|
||||||
|
let f = async {
|
||||||
|
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
|
||||||
|
|
||||||
|
// Obtain receiver and sender
|
||||||
|
let rcv = WATCH.receiver().unwrap();
|
||||||
|
let snd = WATCH.sender();
|
||||||
|
|
||||||
|
// check if the watch contains a value
|
||||||
|
assert_eq!(rcv.contains_value(), false);
|
||||||
|
assert_eq!(snd.contains_value(), false);
|
||||||
|
|
||||||
|
// Send a value
|
||||||
|
snd.send(10);
|
||||||
|
|
||||||
|
// check if the watch contains a value
|
||||||
|
assert_eq!(rcv.contains_value(), true);
|
||||||
|
assert_eq!(snd.contains_value(), true);
|
||||||
};
|
};
|
||||||
block_on(f);
|
block_on(f);
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user