diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index d88c76db5..f02985564 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -12,6 +12,7 @@ mod ring_buffer; pub mod blocking_mutex; pub mod channel; +pub mod multi_signal; pub mod mutex; pub mod pipe; pub mod priority_channel; diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs new file mode 100644 index 000000000..db858f269 --- /dev/null +++ b/embassy-sync/src/multi_signal.rs @@ -0,0 +1,285 @@ +//! A synchronization primitive for passing the latest value to **multiple** tasks. +use core::{ + cell::RefCell, + marker::PhantomData, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::Future; + +use crate::{ + blocking_mutex::{raw::RawMutex, Mutex}, + waitqueue::MultiWakerRegistration, +}; + +/// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s. +/// +/// Similar to a [`Signal`](crate::signal::Signal), except `MultiSignal` allows for multiple tasks to +/// `.await` the latest value, and all receive it. +/// +/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except +/// "sending" to it (calling [`MultiSignal::write`]) will immediately overwrite the previous value instead +/// of waiting for the receivers to pop the previous value. +/// +/// `MultiSignal` is useful when a single task is responsible for updating a value or "state", which multiple other +/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for +/// [`Receiver`]s to "lose" stale values. +/// +/// Anyone with a reference to the MultiSignal can update or peek the value. MultiSignals are generally declared +/// as `static`s and then borrowed as required to either [`MultiSignal::peek`] the value or obtain a [`Receiver`] +/// with [`MultiSignal::receiver`] which has async methods. +/// ``` +/// +/// use futures_executor::block_on; +/// use embassy_sync::multi_signal::MultiSignal; +/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +/// +/// let f = async { +/// +/// static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); +/// +/// // Obtain Receivers +/// let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); +/// let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); +/// assert!(SOME_SIGNAL.receiver().is_err()); +/// +/// SOME_SIGNAL.write(10); +/// +/// // Receive the new value +/// assert_eq!(rcv0.changed().await, 10); +/// assert_eq!(rcv1.try_changed(), Some(10)); +/// +/// // No update +/// assert_eq!(rcv0.try_changed(), None); +/// assert_eq!(rcv1.try_changed(), None); +/// +/// SOME_SIGNAL.write(20); +/// +/// // Receive new value with predicate +/// assert_eq!(rcv0.changed_and(|x|x>&10).await, 20); +/// assert_eq!(rcv1.try_changed_and(|x|x>&30), None); +/// +/// // Anyone can peek the current value +/// assert_eq!(rcv0.peek(), 20); +/// assert_eq!(rcv1.peek(), 20); +/// assert_eq!(SOME_SIGNAL.peek(), 20); +/// assert_eq!(SOME_SIGNAL.peek_and(|x|x>&30), None); +/// }; +/// block_on(f); +/// ``` +pub struct MultiSignal<'a, M: RawMutex, T: Clone, const N: usize> { + mutex: Mutex>>, + _phantom: PhantomData<&'a ()>, +} + +struct MultiSignalState { + data: T, + current_id: u64, + wakers: MultiWakerRegistration, + receiver_count: usize, +} + +#[derive(Debug)] +/// An error that can occur when a `MultiSignal` returns a `Result`. +pub enum Error { + /// The maximum number of [`Receiver`](crate::multi_signal::Receiver) has been reached. + MaximumReceiversReached, +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> { + /// Create a new `MultiSignal` initialized with the given value. + pub const fn new(init: T) -> Self { + Self { + mutex: Mutex::new(RefCell::new(MultiSignalState { + data: init, + current_id: 1, + wakers: MultiWakerRegistration::new(), + receiver_count: 0, + })), + _phantom: PhantomData, + } + } + + /// Get a [`Receiver`] for the `MultiSignal`. + pub fn receiver(&'a self) -> Result, Error> { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + if s.receiver_count < N { + s.receiver_count += 1; + Ok(Receiver(Rcv::new(self))) + } else { + Err(Error::MaximumReceiversReached) + } + }) + } + + /// Update the value of the `MultiSignal`. + pub fn write(&self, data: T) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = data; + s.current_id += 1; + s.wakers.wake(); + }) + } + + /// Peek the current value of the `MultiSignal`. + pub fn peek(&self) -> T { + self.mutex.lock(|state| state.borrow().data.clone()) + } + + /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`. + pub fn peek_and(&self, f: fn(&T) -> bool) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + if f(&s.data) { + Some(s.data.clone()) + } else { + None + } + }) + } + + /// Get the ID of the current value of the `MultiSignal`. + /// This method is mostly for testing purposes. + #[allow(dead_code)] + fn get_id(&self) -> u64 { + self.mutex.lock(|state| state.borrow().current_id) + } + + /// Poll the `MultiSignal` with an optional context. + fn get_with_context(&'a self, waker: &mut Rcv<'a, M, T, N>, cx: Option<&mut Context>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match (s.current_id > waker.at_id, waker.predicate) { + (true, None) => { + waker.at_id = s.current_id; + Poll::Ready(s.data.clone()) + } + (true, Some(f)) if f(&s.data) => { + waker.at_id = s.current_id; + Poll::Ready(s.data.clone()) + } + _ => { + if let Some(cx) = cx { + s.wakers.register(cx.waker()); + } + Poll::Pending + } + } + }) + } +} + +/// A receiver is able to `.await` a changed `MultiSignal` value. +pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { + multi_sig: &'a MultiSignal<'a, M, T, N>, + predicate: Option bool>, + at_id: u64, +} + +// f: Option bool> +impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { + /// Create a new `Receiver` with a reference the given `MultiSignal`. + fn new(multi_sig: &'a MultiSignal<'a, M, T, N>) -> Self { + Self { + multi_sig, + predicate: None, + at_id: 0, + } + } + + /// Wait for a change to the value of the corresponding `MultiSignal`. + pub fn changed<'s>(&'s mut self) -> ReceiverFuture<'s, 'a, M, T, N> { + self.predicate = None; + ReceiverFuture { subscriber: self } + } + + /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. + pub fn changed_and<'s>(&'s mut self, f: fn(&T) -> bool) -> ReceiverFuture<'s, 'a, M, T, N> { + self.predicate = Some(f); + ReceiverFuture { subscriber: self } + } + + /// Try to get a changed value of the corresponding `MultiSignal`. + pub fn try_changed(&mut self) -> Option { + self.multi_sig.mutex.lock(|state| { + let s = state.borrow(); + match s.current_id > self.at_id { + true => { + self.at_id = s.current_id; + Some(s.data.clone()) + } + false => None, + } + }) + } + + /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`. + pub fn try_changed_and(&mut self, f: fn(&T) -> bool) -> Option { + self.multi_sig.mutex.lock(|state| { + let s = state.borrow(); + match s.current_id > self.at_id && f(&s.data) { + true => { + self.at_id = s.current_id; + Some(s.data.clone()) + } + false => None, + } + }) + } + + /// Peek the current value of the corresponding `MultiSignal`. + pub fn peek(&self) -> T { + self.multi_sig.peek() + } + + /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`. + pub fn peek_and(&self, f: fn(&T) -> bool) -> Option { + self.multi_sig.peek_and(f) + } + + /// Check if the value of the corresponding `MultiSignal` has changed. + pub fn has_changed(&mut self) -> bool { + self.multi_sig + .mutex + .lock(|state| state.borrow().current_id > self.at_id) + } +} + +/// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value. +pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>); + +impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { + type Target = Rcv<'a, M, T, N>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Future for the `Receiver` wait action +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReceiverFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> { + subscriber: &'s mut Rcv<'a, M, T, N>, +} + +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverFuture<'s, 'a, M, T, N> { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.subscriber + .multi_sig + .get_with_context(&mut self.subscriber, Some(cx)) + } +} + +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverFuture<'s, 'a, M, T, N> {}