//! A synchronization primitive for passing the latest value to **multiple** receivers. use core::cell::RefCell; use core::future::{poll_fn, Future}; use core::marker::PhantomData; use core::ops::{Deref, DerefMut}; use core::task::{Context, Poll}; use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::waitqueue::MultiWakerRegistration; /// The `Watch` is a single-slot signaling primitive that allows multiple receivers to concurrently await /// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers, /// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous /// value when a new one is sent, without waiting for all receivers to read the previous value. /// /// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks /// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are /// always provided with the latest value. /// /// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] /// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`] /// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the /// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel. /// ``` /// /// use futures_executor::block_on; /// use embassy_sync::watch::Watch; /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; /// /// let f = async { /// /// static WATCH: Watch = Watch::new(); /// /// // Obtain receivers and sender /// let mut rcv0 = WATCH.receiver().unwrap(); /// let mut rcv1 = WATCH.dyn_receiver().unwrap(); /// let mut snd = WATCH.sender(); /// /// // No more receivers, and no update /// assert!(WATCH.receiver().is_none()); /// assert_eq!(rcv1.try_changed(), None); /// /// snd.send(10); /// /// // Receive the new value (async or try) /// assert_eq!(rcv0.changed().await, 10); /// assert_eq!(rcv1.try_changed(), Some(10)); /// /// // No update /// assert_eq!(rcv0.try_changed(), None); /// assert_eq!(rcv1.try_changed(), None); /// /// snd.send(20); /// /// // Using `get` marks the value as seen /// assert_eq!(rcv1.get().await, 20); /// assert_eq!(rcv1.try_changed(), None); /// /// // But `get` also returns when unchanged /// assert_eq!(rcv1.get().await, 20); /// assert_eq!(rcv1.get().await, 20); /// /// }; /// block_on(f); /// ``` pub struct Watch { mutex: Mutex>>, } struct WatchState { data: Option, current_id: u64, wakers: MultiWakerRegistration, receiver_count: usize, } trait SealedWatchBehavior { /// Poll the `Watch` for the current value, making it as seen. fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; /// Poll the `Watch` for the value if it matches the predicate function /// `f`, making it as seen. fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll; /// Poll the `Watch` for a changed value, marking it as seen, if an id is given. fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. fn try_changed(&self, id: &mut u64) -> Option; /// Poll the `Watch` for a changed value that matches the predicate function /// `f`, marking it as seen. fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll; /// Tries to retrieve the value of the `Watch` if it has changed and matches the /// predicate function `f`, marking it as seen. fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option; /// Used when a receiver is dropped to decrement the receiver count. /// /// ## This method should not be called by the user. fn drop_receiver(&self); /// Clears the value of the `Watch`. fn clear(&self); /// Sends a new value to the `Watch`. fn send(&self, val: T); /// Modify the value of the `Watch` using a closure. Returns `false` if the /// `Watch` does not already contain a value. fn send_modify(&self, f: &mut dyn Fn(&mut Option)); /// Modify the value of the `Watch` using a closure. Returns `false` if the /// `Watch` does not already contain a value. fn send_if_modified(&self, f: &mut dyn Fn(&mut Option) -> bool); } /// A trait representing the 'inner' behavior of the `Watch`. #[allow(private_bounds)] pub trait WatchBehavior: SealedWatchBehavior { /// Tries to get the value of the `Watch`, marking it as seen, if an id is given. fn try_get(&self, id: Option<&mut u64>) -> Option; /// Tries to get the value of the `Watch` if it matches the predicate function /// `f`, marking it as seen. fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option; /// Checks if the `Watch` is been initialized with a value. fn contains_value(&self) -> bool; } impl SealedWatchBehavior for Watch { fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); match &s.data { Some(data) => { *id = s.current_id; Poll::Ready(data.clone()) } None => { s.wakers.register(cx.waker()); Poll::Pending } } }) } fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); match s.data { Some(ref data) if f(data) => { *id = s.current_id; Poll::Ready(data.clone()) } _ => { s.wakers.register(cx.waker()); Poll::Pending } } }) } fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); match (&s.data, s.current_id > *id) { (Some(data), true) => { *id = s.current_id; Poll::Ready(data.clone()) } _ => { s.wakers.register(cx.waker()); Poll::Pending } } }) } fn try_changed(&self, id: &mut u64) -> Option { self.mutex.lock(|state| { let s = state.borrow(); match s.current_id > *id { true => { *id = s.current_id; s.data.clone() } false => None, } }) } fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); match (&s.data, s.current_id > *id) { (Some(data), true) if f(data) => { *id = s.current_id; Poll::Ready(data.clone()) } _ => { s.wakers.register(cx.waker()); Poll::Pending } } }) } fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option { self.mutex.lock(|state| { let s = state.borrow(); match (&s.data, s.current_id > *id) { (Some(data), true) if f(data) => { *id = s.current_id; s.data.clone() } _ => None, } }) } fn drop_receiver(&self) { self.mutex.lock(|state| { let mut s = state.borrow_mut(); s.receiver_count -= 1; }) } fn clear(&self) { self.mutex.lock(|state| { let mut s = state.borrow_mut(); s.data = None; }) } fn send(&self, val: T) { self.mutex.lock(|state| { let mut s = state.borrow_mut(); s.data = Some(val); s.current_id += 1; s.wakers.wake(); }) } fn send_modify(&self, f: &mut dyn Fn(&mut Option)) { self.mutex.lock(|state| { let mut s = state.borrow_mut(); f(&mut s.data); s.current_id += 1; s.wakers.wake(); }) } fn send_if_modified(&self, f: &mut dyn Fn(&mut Option) -> bool) { self.mutex.lock(|state| { let mut s = state.borrow_mut(); if f(&mut s.data) { s.current_id += 1; s.wakers.wake(); } }) } } impl WatchBehavior for Watch { fn try_get(&self, id: Option<&mut u64>) -> Option { self.mutex.lock(|state| { let s = state.borrow(); if let Some(id) = id { *id = s.current_id; } s.data.clone() }) } fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option { self.mutex.lock(|state| { let s = state.borrow(); match s.data { Some(ref data) if f(data) => { if let Some(id) = id { *id = s.current_id; } Some(data.clone()) } _ => None, } }) } fn contains_value(&self) -> bool { self.mutex.lock(|state| state.borrow().data.is_some()) } } impl Watch { /// Create a new `Watch` channel. pub const fn new() -> Self { Self { mutex: Mutex::new(RefCell::new(WatchState { data: None, current_id: 0, wakers: MultiWakerRegistration::new(), receiver_count: 0, })), } } /// Create a new `Watch` channel with default data. pub const fn new_with(data: T) -> Self { Self { mutex: Mutex::new(RefCell::new(WatchState { data: Some(data), current_id: 0, wakers: MultiWakerRegistration::new(), receiver_count: 0, })), } } /// Create a new [`Sender`] for the `Watch`. pub fn sender(&self) -> Sender<'_, M, T, N> { Sender(Snd::new(self)) } /// Create a new [`DynSender`] for the `Watch`. pub fn dyn_sender(&self) -> DynSender<'_, T> { DynSender(Snd::new(self)) } /// Try to create a new [`Receiver`] for the `Watch`. If the /// maximum number of receivers has been reached, `None` is returned. pub fn receiver(&self) -> Option> { self.mutex.lock(|state| { let mut s = state.borrow_mut(); if s.receiver_count < N { s.receiver_count += 1; Some(Receiver(Rcv::new(self, 0))) } else { None } }) } /// Try to create a new [`DynReceiver`] for the `Watch`. If the /// maximum number of receivers has been reached, `None` is returned. pub fn dyn_receiver(&self) -> Option> { self.mutex.lock(|state| { let mut s = state.borrow_mut(); if s.receiver_count < N { s.receiver_count += 1; Some(DynReceiver(Rcv::new(self, 0))) } else { None } }) } /// Try to create a new [`AnonReceiver`] for the `Watch`. pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> { AnonReceiver(AnonRcv::new(self, 0)) } /// Try to create a new [`DynAnonReceiver`] for the `Watch`. pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> { DynAnonReceiver(AnonRcv::new(self, 0)) } /// Returns the message ID of the latest message sent to the `Watch`. /// /// This counter is monotonic, and is incremented every time a new message is sent. pub fn get_msg_id(&self) -> u64 { self.mutex.lock(|state| state.borrow().current_id) } /// Tries to get the value of the `Watch`. pub fn try_get(&self) -> Option { WatchBehavior::try_get(self, None) } /// Tries to get the value of the `Watch` if it matches the predicate function `f`. pub fn try_get_and(&self, mut f: F) -> Option where F: Fn(&T) -> bool, { WatchBehavior::try_get_and(self, None, &mut f) } } /// A receiver can `.await` a change in the `Watch` value. pub struct Snd<'a, T: Clone, W: WatchBehavior + ?Sized> { watch: &'a W, _phantom: PhantomData, } impl<'a, T: Clone, W: WatchBehavior + ?Sized> Clone for Snd<'a, T, W> { fn clone(&self) -> Self { Self { watch: self.watch, _phantom: PhantomData, } } } impl<'a, T: Clone, W: WatchBehavior + ?Sized> Snd<'a, T, W> { /// Creates a new `Receiver` with a reference to the `Watch`. fn new(watch: &'a W) -> Self { Self { watch, _phantom: PhantomData, } } /// Sends a new value to the `Watch`. pub fn send(&self, val: T) { self.watch.send(val) } /// Clears the value of the `Watch`. /// This will cause calls to [`Rcv::get`] to be pending. pub fn clear(&self) { self.watch.clear() } /// Tries to retrieve the value of the `Watch`. pub fn try_get(&self) -> Option { self.watch.try_get(None) } /// Tries to peek the current value of the `Watch` if it matches the predicate /// function `f`. pub fn try_get_and(&self, mut f: F) -> Option where F: Fn(&T) -> bool, { self.watch.try_get_and(None, &mut f) } /// Returns true if the `Watch` contains a value. pub fn contains_value(&self) -> bool { self.watch.contains_value() } /// Modify the value of the `Watch` using a closure. pub fn send_modify(&self, mut f: F) where F: Fn(&mut Option), { self.watch.send_modify(&mut f) } /// Modify the value of the `Watch` using a closure. The closure must return /// `true` if the value was modified, which notifies all receivers. pub fn send_if_modified(&self, mut f: F) where F: Fn(&mut Option) -> bool, { self.watch.send_if_modified(&mut f) } } /// A sender of a `Watch` channel. /// /// For a simpler type definition, consider [`DynSender`] at the expense of /// some runtime performance due to dynamic dispatch. pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch>); impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> { fn clone(&self) -> Self { Self(self.0.clone()) } } impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> { /// Converts the `Sender` into a [`DynSender`]. pub fn as_dyn(self) -> DynSender<'a, T> { DynSender(Snd::new(self.watch)) } } impl<'a, M: RawMutex, T: Clone, const N: usize> Into> for Sender<'a, M, T, N> { fn into(self) -> DynSender<'a, T> { self.as_dyn() } } impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> { type Target = Snd<'a, T, Watch>; fn deref(&self) -> &Self::Target { &self.0 } } impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } /// A sender which holds a **dynamic** reference to a `Watch` channel. /// /// This is an alternative to [`Sender`] with a simpler type definition, pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior + 'a>); impl<'a, T: Clone> Clone for DynSender<'a, T> { fn clone(&self) -> Self { Self(self.0.clone()) } } impl<'a, T: Clone> Deref for DynSender<'a, T> { type Target = Snd<'a, T, dyn WatchBehavior + 'a>; fn deref(&self) -> &Self::Target { &self.0 } } impl<'a, T: Clone> DerefMut for DynSender<'a, T> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } /// A receiver can `.await` a change in the `Watch` value. pub struct Rcv<'a, T: Clone, W: WatchBehavior + ?Sized> { watch: &'a W, at_id: u64, _phantom: PhantomData, } impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { /// Creates a new `Receiver` with a reference to the `Watch`. fn new(watch: &'a W, at_id: u64) -> Self { Self { watch, at_id, _phantom: PhantomData, } } /// Returns the current value of the `Watch` once it is initialized, marking it as seen. /// /// **Note**: Futures do nothing unless you `.await` or poll them. pub fn get(&mut self) -> impl Future + '_ { poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)) } /// Tries to get the current value of the `Watch` without waiting, marking it as seen. pub fn try_get(&mut self) -> Option { self.watch.try_get(Some(&mut self.at_id)) } /// Returns the value of the `Watch` if it matches the predicate function `f`, /// or waits for it to match, marking it as seen. /// /// **Note**: Futures do nothing unless you `.await` or poll them. pub async fn get_and(&mut self, mut f: F) -> T where F: Fn(&T) -> bool, { poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await } /// Tries to get the current value of the `Watch` if it matches the predicate /// function `f` without waiting, marking it as seen. pub fn try_get_and(&mut self, mut f: F) -> Option where F: Fn(&T) -> bool, { self.watch.try_get_and(Some(&mut self.at_id), &mut f) } /// Waits for the `Watch` to change and returns the new value, marking it as seen. /// /// **Note**: Futures do nothing unless you `.await` or poll them. pub async fn changed(&mut self) -> T { poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await } /// Tries to get the new value of the watch without waiting, marking it as seen. pub fn try_changed(&mut self) -> Option { self.watch.try_changed(&mut self.at_id) } /// Waits for the `Watch` to change to a value which satisfies the predicate /// function `f` and returns the new value, marking it as seen. /// /// **Note**: Futures do nothing unless you `.await` or poll them. pub async fn changed_and(&mut self, mut f: F) -> T where F: Fn(&T) -> bool, { poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await } /// Tries to get the new value of the watch which satisfies the predicate /// function `f` and returns the new value without waiting, marking it as seen. pub fn try_changed_and(&mut self, mut f: F) -> Option where F: Fn(&T) -> bool, { self.watch.try_changed_and(&mut self.at_id, &mut f) } /// Checks if the `Watch` contains a value. If this returns true, /// then awaiting [`Rcv::get`] will return immediately. pub fn contains_value(&self) -> bool { self.watch.contains_value() } } impl<'a, T: Clone, W: WatchBehavior + ?Sized> Drop for Rcv<'a, T, W> { fn drop(&mut self) { self.watch.drop_receiver(); } } /// A anonymous receiver can NOT `.await` a change in the `Watch` value. pub struct AnonRcv<'a, T: Clone, W: WatchBehavior + ?Sized> { watch: &'a W, at_id: u64, _phantom: PhantomData, } impl<'a, T: Clone, W: WatchBehavior + ?Sized> AnonRcv<'a, T, W> { /// Creates a new `Receiver` with a reference to the `Watch`. fn new(watch: &'a W, at_id: u64) -> Self { Self { watch, at_id, _phantom: PhantomData, } } /// Tries to get the current value of the `Watch` without waiting, marking it as seen. pub fn try_get(&mut self) -> Option { self.watch.try_get(Some(&mut self.at_id)) } /// Tries to get the current value of the `Watch` if it matches the predicate /// function `f` without waiting, marking it as seen. pub fn try_get_and(&mut self, mut f: F) -> Option where F: Fn(&T) -> bool, { self.watch.try_get_and(Some(&mut self.at_id), &mut f) } /// Tries to get the new value of the watch without waiting, marking it as seen. pub fn try_changed(&mut self) -> Option { self.watch.try_changed(&mut self.at_id) } /// Tries to get the new value of the watch which satisfies the predicate /// function `f` and returns the new value without waiting, marking it as seen. pub fn try_changed_and(&mut self, mut f: F) -> Option where F: Fn(&T) -> bool, { self.watch.try_changed_and(&mut self.at_id, &mut f) } /// Checks if the `Watch` contains a value. If this returns true, /// then awaiting [`Rcv::get`] will return immediately. pub fn contains_value(&self) -> bool { self.watch.contains_value() } } /// A receiver of a `Watch` channel. pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch>); impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { /// Converts the `Receiver` into a [`DynReceiver`]. pub fn as_dyn(self) -> DynReceiver<'a, T> { let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id)); core::mem::forget(self); // Ensures the destructor is not called rcv } } impl<'a, M: RawMutex, T: Clone, const N: usize> Into> for Receiver<'a, M, T, N> { fn into(self) -> DynReceiver<'a, T> { self.as_dyn() } } impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { type Target = Rcv<'a, T, Watch>; fn deref(&self) -> &Self::Target { &self.0 } } impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } /// A receiver which holds a **dynamic** reference to a `Watch` channel. /// /// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of /// some runtime performance due to dynamic dispatch. pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior + 'a>); impl<'a, T: Clone> Deref for DynReceiver<'a, T> { type Target = Rcv<'a, T, dyn WatchBehavior + 'a>; fn deref(&self) -> &Self::Target { &self.0 } } impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } /// A receiver of a `Watch` channel that cannot `.await` values. pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch>); impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> { /// Converts the `Receiver` into a [`DynReceiver`]. pub fn as_dyn(self) -> DynAnonReceiver<'a, T> { let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id)); core::mem::forget(self); // Ensures the destructor is not called rcv } } impl<'a, M: RawMutex, T: Clone, const N: usize> Into> for AnonReceiver<'a, M, T, N> { fn into(self) -> DynAnonReceiver<'a, T> { self.as_dyn() } } impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> { type Target = AnonRcv<'a, T, Watch>; fn deref(&self) -> &Self::Target { &self.0 } } impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } /// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel. /// /// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of /// some runtime performance due to dynamic dispatch. pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior + 'a>); impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> { type Target = AnonRcv<'a, T, dyn WatchBehavior + 'a>; fn deref(&self) -> &Self::Target { &self.0 } } impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } #[cfg(test)] mod tests { use futures_executor::block_on; use super::Watch; use crate::blocking_mutex::raw::CriticalSectionRawMutex; #[test] fn multiple_sends() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender let mut rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); // Not initialized assert_eq!(rcv.try_changed(), None); // Receive the new value snd.send(10); assert_eq!(rcv.changed().await, 10); // Receive another value snd.send(20); assert_eq!(rcv.try_changed(), Some(20)); // No update assert_eq!(rcv.try_changed(), None); }; block_on(f); } #[test] fn all_try_get() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender let mut rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); // Not initialized assert_eq!(WATCH.try_get(), None); assert_eq!(rcv.try_get(), None); assert_eq!(snd.try_get(), None); // Receive the new value snd.send(10); assert_eq!(WATCH.try_get(), Some(10)); assert_eq!(rcv.try_get(), Some(10)); assert_eq!(snd.try_get(), Some(10)); assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10)); assert_eq!(rcv.try_get_and(|x| x > &5), Some(10)); assert_eq!(snd.try_get_and(|x| x > &5), Some(10)); assert_eq!(WATCH.try_get_and(|x| x < &5), None); assert_eq!(rcv.try_get_and(|x| x < &5), None); assert_eq!(snd.try_get_and(|x| x < &5), None); }; block_on(f); } #[test] fn once_lock_like() { let f = async { static CONFIG0: u8 = 10; static CONFIG1: u8 = 20; static WATCH: Watch = Watch::new(); // Obtain receiver and sender let mut rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); // Not initialized assert_eq!(rcv.try_changed(), None); // Receive the new value snd.send(&CONFIG0); let rcv0 = rcv.changed().await; assert_eq!(rcv0, &10); // Receive another value snd.send(&CONFIG1); let rcv1 = rcv.try_changed(); assert_eq!(rcv1, Some(&20)); // No update assert_eq!(rcv.try_changed(), None); // Ensure similarity with original static assert_eq!(rcv0, &CONFIG0); assert_eq!(rcv1, Some(&CONFIG1)); }; block_on(f); } #[test] fn sender_modify() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender let mut rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); // Receive the new value snd.send(10); assert_eq!(rcv.try_changed(), Some(10)); // Modify the value inplace snd.send_modify(|opt| { if let Some(inner) = opt { *inner += 5; } }); // Get the modified value assert_eq!(rcv.try_changed(), Some(15)); assert_eq!(rcv.try_changed(), None); }; block_on(f); } #[test] fn predicate_fn() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender let mut rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); snd.send(15); assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); assert_eq!(rcv.try_get_and(|x| x < &5), None); assert!(rcv.try_changed().is_none()); snd.send(20); assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20)); assert_eq!(rcv.try_changed_and(|x| x > &5), None); snd.send(25); assert_eq!(rcv.try_changed_and(|x| x < &5), None); assert_eq!(rcv.try_changed(), Some(25)); snd.send(30); assert_eq!(rcv.changed_and(|x| x > &5).await, 30); assert_eq!(rcv.get_and(|x| x > &5).await, 30); }; block_on(f); } #[test] fn receive_after_create() { let f = async { static WATCH: Watch = Watch::new(); // Obtain sender and send value let snd = WATCH.sender(); snd.send(10); // Obtain receiver and receive value let mut rcv = WATCH.receiver().unwrap(); assert_eq!(rcv.try_changed(), Some(10)); }; block_on(f); } #[test] fn max_receivers_drop() { let f = async { static WATCH: Watch = Watch::new(); // Try to create 3 receivers (only 2 can exist at once) let rcv0 = WATCH.receiver(); let rcv1 = WATCH.receiver(); let rcv2 = WATCH.receiver(); // Ensure the first two are successful and the third is not assert!(rcv0.is_some()); assert!(rcv1.is_some()); assert!(rcv2.is_none()); // Drop the first receiver drop(rcv0); // Create another receiver and ensure it is successful let rcv3 = WATCH.receiver(); assert!(rcv3.is_some()); }; block_on(f); } #[test] fn multiple_receivers() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receivers and sender let mut rcv0 = WATCH.receiver().unwrap(); let mut rcv1 = WATCH.anon_receiver(); let snd = WATCH.sender(); // No update for both assert_eq!(rcv0.try_changed(), None); assert_eq!(rcv1.try_changed(), None); // Send a new value snd.send(0); // Both receivers receive the new value assert_eq!(rcv0.try_changed(), Some(0)); assert_eq!(rcv1.try_changed(), Some(0)); }; block_on(f); } #[test] fn clone_senders() { let f = async { // Obtain different ways to send static WATCH: Watch = Watch::new(); let snd0 = WATCH.sender(); let snd1 = snd0.clone(); // Obtain Receiver let mut rcv = WATCH.receiver().unwrap().as_dyn(); // Send a value from first sender snd0.send(10); assert_eq!(rcv.try_changed(), Some(10)); // Send a value from second sender snd1.send(20); assert_eq!(rcv.try_changed(), Some(20)); }; block_on(f); } #[test] fn use_dynamics() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender let mut anon_rcv = WATCH.dyn_anon_receiver(); let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); let dyn_snd = WATCH.dyn_sender(); // Send a value dyn_snd.send(10); // Ensure the dynamic receiver receives the value assert_eq!(anon_rcv.try_changed(), Some(10)); assert_eq!(dyn_rcv.try_changed(), Some(10)); assert_eq!(dyn_rcv.try_changed(), None); }; block_on(f); } #[test] fn convert_to_dyn() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender let anon_rcv = WATCH.anon_receiver(); let rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); // Convert to dynamic let mut dyn_anon_rcv = anon_rcv.as_dyn(); let mut dyn_rcv = rcv.as_dyn(); let dyn_snd = snd.as_dyn(); // Send a value dyn_snd.send(10); // Ensure the dynamic receiver receives the value assert_eq!(dyn_anon_rcv.try_changed(), Some(10)); assert_eq!(dyn_rcv.try_changed(), Some(10)); assert_eq!(dyn_rcv.try_changed(), None); }; block_on(f); } #[test] fn dynamic_receiver_count() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender let rcv0 = WATCH.receiver(); let rcv1 = WATCH.receiver(); let rcv2 = WATCH.receiver(); // Ensure the first two are successful and the third is not assert!(rcv0.is_some()); assert!(rcv1.is_some()); assert!(rcv2.is_none()); // Convert to dynamic let dyn_rcv0 = rcv0.unwrap().as_dyn(); // Drop the (now dynamic) receiver drop(dyn_rcv0); // Create another receiver and ensure it is successful let rcv3 = WATCH.receiver(); let rcv4 = WATCH.receiver(); assert!(rcv3.is_some()); assert!(rcv4.is_none()); }; block_on(f); } #[test] fn contains_value() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender let rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); // check if the watch contains a value assert_eq!(rcv.contains_value(), false); assert_eq!(snd.contains_value(), false); // Send a value snd.send(10); // check if the watch contains a value assert_eq!(rcv.contains_value(), true); assert_eq!(snd.contains_value(), true); }; block_on(f); } }