Merge branch 'embassy-rs:main' into multi-signal

This commit is contained in:
Peter Krull
2024-09-23 19:02:59 +02:00
committed by GitHub
936 changed files with 72495 additions and 17825 deletions

View File

@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
- Add LazyLock sync primitive.
## 0.6.0 - 2024-05-29
- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `Channel`.
- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `PriorityChannel`.
- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`.
- Made `PubSubBehavior` sealed
- If you called `.publish_immediate(...)` on the queue directly before, then now call `.immediate_publisher().publish_immediate(...)`
- Add OnceLock sync primitive.
- Add constructor for DynamicChannel
- Add ready_to_receive functions to Channel and Receiver.
## 0.5.0 - 2023-12-04
- Add a PriorityChannel.
@@ -35,7 +50,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Remove unnecessary uses of `atomic-polyfill`
- Add `#[must_use]` to all futures.
## 0.1.0 - 2022-08-26
- First release

View File

@@ -1,6 +1,6 @@
[package]
name = "embassy-sync"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
description = "no-std, no-alloc synchronization primitives with async support"
repository = "https://github.com/embassy-rs/embassy"
@@ -20,7 +20,7 @@ src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/
target = "thumbv7em-none-eabi"
[features]
std = []
std = ["critical-section/std"]
turbowakers = []
[dependencies]

View File

@@ -5,7 +5,7 @@ An [Embassy](https://embassy.dev) project.
Synchronization primitives and data structures with async support:
- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer.
- [`PriorityChannel`](channel::priority::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are sifted to the front of the channel.
- [`PriorityChannel`](channel::priority_channel::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are shifted to the front of the channel.
- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast (publish-subscribe) channel. Each message is received by all consumers.
- [`Signal`](signal::Signal) - Signalling latest value to a single consumer.
- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks.
@@ -13,6 +13,7 @@ Synchronization primitives and data structures with async support:
- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`.
- [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API.
- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s.
- [`LazyLock`](lazy_lock::LazyLock) - A value which is initialized on the first access.
## Interoperability

View File

@@ -1,31 +1,7 @@
use std::env;
#[path = "./build_common.rs"]
mod common;
fn main() {
println!("cargo:rerun-if-changed=build.rs");
let target = env::var("TARGET").unwrap();
if target.starts_with("thumbv6m-") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv6m");
} else if target.starts_with("thumbv7m-") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv7m");
} else if target.starts_with("thumbv7em-") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv7m");
println!("cargo:rustc-cfg=armv7em"); // (not currently used)
} else if target.starts_with("thumbv8m.base") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv8m");
println!("cargo:rustc-cfg=armv8m_base");
} else if target.starts_with("thumbv8m.main") {
println!("cargo:rustc-cfg=cortex_m");
println!("cargo:rustc-cfg=armv8m");
println!("cargo:rustc-cfg=armv8m_main");
}
if target.ends_with("-eabihf") {
println!("cargo:rustc-cfg=has_fpu");
}
let mut cfgs = common::CfgSet::new();
common::set_target_cfgs(&mut cfgs);
}

View File

@@ -0,0 +1,94 @@
// NOTE: this file is copy-pasted between several Embassy crates, because there is no
// straightforward way to share this code:
// - it cannot be placed into the root of the repo and linked from each build.rs using `#[path =
// "../build_common.rs"]`, because `cargo publish` requires that all files published with a crate
// reside in the crate's directory,
// - it cannot be symlinked from `embassy-xxx/build_common.rs` to `../build_common.rs`, because
// symlinks don't work on Windows.
use std::collections::HashSet;
use std::env;
/// Helper for emitting cargo instruction for enabling configs (`cargo:rustc-cfg=X`) and declaring
/// them (`cargo:rust-check-cfg=cfg(X)`).
/// Tracks which cargo configs have been enabled and declared, emitting each
/// instruction (`cargo:rustc-cfg=…` / `cargo:rustc-check-cfg=…`) at most once.
#[derive(Debug)]
pub struct CfgSet {
    enabled: HashSet<String>,
    declared: HashSet<String>,
}
impl CfgSet {
    /// Create an empty set with nothing enabled or declared yet.
    pub fn new() -> Self {
        Self {
            enabled: HashSet::new(),
            declared: HashSet::new(),
        }
    }
    /// Enable a config, which can then be used in `#[cfg(...)]` for conditional compilation.
    ///
    /// All configs that can potentially be enabled should be unconditionally
    /// declared using [`Self::declare()`].
    pub fn enable(&mut self, cfg: impl AsRef<str>) {
        let name = cfg.as_ref();
        // Only print the cargo instruction the first time this name is seen.
        let newly_added = self.enabled.insert(name.to_owned());
        if newly_added {
            println!("cargo:rustc-cfg={}", name);
        }
    }
    /// Enable every config in `cfgs`.
    pub fn enable_all(&mut self, cfgs: &[impl AsRef<str>]) {
        cfgs.iter().for_each(|c| self.enable(c.as_ref()));
    }
    /// Declare a valid config for conditional compilation, without enabling it.
    ///
    /// This enables rustc to check that the configs in `#[cfg(...)]` attributes are valid.
    pub fn declare(&mut self, cfg: impl AsRef<str>) {
        let name = cfg.as_ref();
        if self.declared.insert(name.to_owned()) {
            println!("cargo:rustc-check-cfg=cfg({})", name);
        }
    }
    /// Declare every config in `cfgs`.
    pub fn declare_all(&mut self, cfgs: &[impl AsRef<str>]) {
        cfgs.iter().for_each(|c| self.declare(c.as_ref()));
    }
    /// Declare `cfg`, and additionally enable it when `enable` is true.
    pub fn set(&mut self, cfg: impl Into<String>, enable: bool) {
        let name: String = cfg.into();
        if enable {
            self.enable(name.as_str());
        }
        self.declare(name);
    }
}
/// Sets configs that describe the target platform.
pub fn set_target_cfgs(cfgs: &mut CfgSet) {
    let target = env::var("TARGET").unwrap();
    // Map each Cortex-M target-triple prefix to the cfgs it enables.
    static FAMILIES: &[(&str, &[&str])] = &[
        ("thumbv6m-", &["cortex_m", "armv6m"]),
        ("thumbv7m-", &["cortex_m", "armv7m"]),
        ("thumbv7em-", &["cortex_m", "armv7m", "armv7em"]),
        ("thumbv8m.base", &["cortex_m", "armv8m", "armv8m_base"]),
        ("thumbv8m.main", &["cortex_m", "armv8m", "armv8m_main"]),
    ];
    for &(prefix, names) in FAMILIES {
        if target.starts_with(prefix) {
            cfgs.enable_all(names);
            break;
        }
    }
    // Declare every name that could have been enabled above, so rustc's
    // check-cfg pass accepts them on every target.
    cfgs.declare_all(&[
        "cortex_m",
        "armv6m",
        "armv7m",
        "armv7em",
        "armv8m",
        "armv8m_base",
        "armv8m_main",
    ]);
    cfgs.set("has_fpu", target.ends_with("-eabihf"));
}

View File

@@ -42,7 +42,7 @@ where
M: RawMutex,
{
fn clone(&self) -> Self {
Sender { channel: self.channel }
*self
}
}
@@ -81,7 +81,7 @@ pub struct DynamicSender<'ch, T> {
impl<'ch, T> Clone for DynamicSender<'ch, T> {
fn clone(&self) -> Self {
DynamicSender { channel: self.channel }
*self
}
}
@@ -135,7 +135,7 @@ where
M: RawMutex,
{
fn clone(&self) -> Self {
Receiver { channel: self.channel }
*self
}
}
@@ -152,6 +152,13 @@ where
self.channel.receive()
}
/// Is a value ready to be received in the channel
///
/// See [`Channel::ready_to_receive()`].
pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
self.channel.ready_to_receive()
}
/// Attempt to immediately receive the next value.
///
/// See [`Channel::try_receive()`]
@@ -181,7 +188,7 @@ pub struct DynamicReceiver<'ch, T> {
impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
fn clone(&self) -> Self {
DynamicReceiver { channel: self.channel }
*self
}
}
@@ -246,6 +253,26 @@ where
}
}
/// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
where
    M: RawMutex,
{
    // Borrow of the channel being waited on; the future holds no other state.
    channel: &'ch Channel<M, T, N>,
}
impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
where
    M: RawMutex,
{
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Delegates entirely to the channel; resolves without consuming a message.
        self.channel.poll_ready_to_receive(cx)
    }
}
/// Future returned by [`DynamicReceiver::receive`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DynamicReceiveFuture<'ch, T> {
@@ -263,6 +290,12 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
}
}
impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
    /// Convert a statically-typed receive future into its dynamically
    /// dispatched counterpart, keeping the same channel reference.
    fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
        Self { channel: value.channel }
    }
}
/// Future returned by [`Channel::send`] and [`Sender::send`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendFuture<'ch, M, T, const N: usize>
@@ -321,6 +354,15 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
    /// Convert a statically-typed send future into its dynamically dispatched
    /// counterpart; the pending message is carried over unchanged.
    fn from(value: SendFuture<'ch, M, T, N>) -> Self {
        Self {
            channel: value.channel,
            message: value.message,
        }
    }
}
pub(crate) trait DynamicChannel<T> {
fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
@@ -434,6 +476,22 @@ impl<T, const N: usize> ChannelState<T, N> {
Poll::Pending
}
}
    /// Drop all queued messages, leaving the channel empty.
    fn clear(&mut self) {
        self.queue.clear();
    }
    /// Number of messages currently queued.
    fn len(&self) -> usize {
        self.queue.len()
    }
    /// True if no messages are queued.
    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
    /// True if the queue has reached its capacity.
    fn is_full(&self) -> bool {
        self.queue.is_full()
    }
}
/// A bounded channel for communicating between asynchronous tasks
@@ -507,6 +565,16 @@ where
Receiver { channel: self }
}
/// Get a sender for this channel using dynamic dispatch.
pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
DynamicSender { channel: self }
}
/// Get a receiver for this channel using dynamic dispatch.
pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
DynamicReceiver { channel: self }
}
/// Send a value, waiting until there is capacity.
///
/// Sending completes when the value has been pushed to the channel's queue.
@@ -540,6 +608,14 @@ where
ReceiveFuture { channel: self }
}
/// Is a value ready to be received in the channel
///
/// If there are no messages in the channel's buffer, this method will
/// wait until there is at least one
pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
ReceiveReadyFuture { channel: self }
}
/// Attempt to immediately receive a message.
///
/// This method will either receive a message from the channel immediately or return an error
@@ -547,6 +623,38 @@ where
pub fn try_receive(&self) -> Result<T, TryReceiveError> {
self.lock(|c| c.try_receive())
}
/// Returns the maximum number of elements the channel can hold.
pub const fn capacity(&self) -> usize {
N
}
/// Returns the free capacity of the channel.
///
/// This is equivalent to `capacity() - len()`
pub fn free_capacity(&self) -> usize {
N - self.len()
}
/// Clears all elements in the channel.
pub fn clear(&self) {
self.lock(|c| c.clear());
}
/// Returns the number of elements currently in the channel.
pub fn len(&self) -> usize {
self.lock(|c| c.len())
}
/// Returns whether the channel is empty.
pub fn is_empty(&self) -> bool {
self.lock(|c| c.is_empty())
}
/// Returns whether the channel is full.
pub fn is_full(&self) -> bool {
self.lock(|c| c.is_full())
}
}
/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
@@ -648,7 +756,7 @@ mod tests {
}
#[test]
fn dynamic_dispatch() {
fn dynamic_dispatch_into() {
let c = Channel::<NoopRawMutex, u32, 3>::new();
let s: DynamicSender<'_, u32> = c.sender().into();
let r: DynamicReceiver<'_, u32> = c.receiver().into();
@@ -657,6 +765,16 @@ mod tests {
assert_eq!(r.try_receive().unwrap(), 1);
}
#[test]
fn dynamic_dispatch_constructor() {
let c = Channel::<NoopRawMutex, u32, 3>::new();
let s = c.dyn_sender();
let r = c.dyn_receiver();
assert!(s.try_send(1).is_ok());
assert_eq!(r.try_receive().unwrap(), 1);
}
#[futures_test::test]
async fn receiver_receives_given_try_send_async() {
let executor = ThreadPool::new().unwrap();

View File

@@ -1,11 +1,12 @@
#![macro_use]
#![allow(unused_macros)]
#![allow(unused)]
use core::fmt::{Debug, Display, LowerHex};
#[cfg(all(feature = "defmt", feature = "log"))]
compile_error!("You may not enable both `defmt` and `log` features.");
#[collapse_debuginfo(yes)]
macro_rules! assert {
($($x:tt)*) => {
{
@@ -17,6 +18,7 @@ macro_rules! assert {
};
}
#[collapse_debuginfo(yes)]
macro_rules! assert_eq {
($($x:tt)*) => {
{
@@ -28,6 +30,7 @@ macro_rules! assert_eq {
};
}
#[collapse_debuginfo(yes)]
macro_rules! assert_ne {
($($x:tt)*) => {
{
@@ -39,6 +42,7 @@ macro_rules! assert_ne {
};
}
#[collapse_debuginfo(yes)]
macro_rules! debug_assert {
($($x:tt)*) => {
{
@@ -50,6 +54,7 @@ macro_rules! debug_assert {
};
}
#[collapse_debuginfo(yes)]
macro_rules! debug_assert_eq {
($($x:tt)*) => {
{
@@ -61,6 +66,7 @@ macro_rules! debug_assert_eq {
};
}
#[collapse_debuginfo(yes)]
macro_rules! debug_assert_ne {
($($x:tt)*) => {
{
@@ -72,6 +78,7 @@ macro_rules! debug_assert_ne {
};
}
#[collapse_debuginfo(yes)]
macro_rules! todo {
($($x:tt)*) => {
{
@@ -83,20 +90,19 @@ macro_rules! todo {
};
}
#[cfg(not(feature = "defmt"))]
#[collapse_debuginfo(yes)]
macro_rules! unreachable {
($($x:tt)*) => {
::core::unreachable!($($x)*)
};
}
#[cfg(feature = "defmt")]
macro_rules! unreachable {
($($x:tt)*) => {
::defmt::unreachable!($($x)*)
{
#[cfg(not(feature = "defmt"))]
::core::unreachable!($($x)*);
#[cfg(feature = "defmt")]
::defmt::unreachable!($($x)*);
}
};
}
#[collapse_debuginfo(yes)]
macro_rules! panic {
($($x:tt)*) => {
{
@@ -108,6 +114,7 @@ macro_rules! panic {
};
}
#[collapse_debuginfo(yes)]
macro_rules! trace {
($s:literal $(, $x:expr)* $(,)?) => {
{
@@ -121,6 +128,7 @@ macro_rules! trace {
};
}
#[collapse_debuginfo(yes)]
macro_rules! debug {
($s:literal $(, $x:expr)* $(,)?) => {
{
@@ -134,6 +142,7 @@ macro_rules! debug {
};
}
#[collapse_debuginfo(yes)]
macro_rules! info {
($s:literal $(, $x:expr)* $(,)?) => {
{
@@ -147,6 +156,7 @@ macro_rules! info {
};
}
#[collapse_debuginfo(yes)]
macro_rules! warn {
($s:literal $(, $x:expr)* $(,)?) => {
{
@@ -160,6 +170,7 @@ macro_rules! warn {
};
}
#[collapse_debuginfo(yes)]
macro_rules! error {
($s:literal $(, $x:expr)* $(,)?) => {
{
@@ -174,6 +185,7 @@ macro_rules! error {
}
#[cfg(feature = "defmt")]
#[collapse_debuginfo(yes)]
macro_rules! unwrap {
($($x:tt)*) => {
::defmt::unwrap!($($x)*)
@@ -181,6 +193,7 @@ macro_rules! unwrap {
}
#[cfg(not(feature = "defmt"))]
#[collapse_debuginfo(yes)]
macro_rules! unwrap {
($arg:expr) => {
match $crate::fmt::Try::into_result($arg) {
@@ -229,7 +242,6 @@ impl<T, E> Try for Result<T, E> {
}
}
#[allow(unused)]
pub(crate) struct Bytes<'a>(pub &'a [u8]);
impl<'a> Debug for Bytes<'a> {

View File

@@ -0,0 +1,152 @@
//! Synchronization primitive for initializing a value once, allowing others to get a reference to the value.
use core::cell::UnsafeCell;
use core::mem::ManuallyDrop;
use core::sync::atomic::{AtomicBool, Ordering};
/// The `LazyLock` is a synchronization primitive that allows for
/// initializing a value once, and allowing others to obtain a
/// reference to the value. This is useful for lazy initialization of
/// a static value.
///
/// # Example
/// ```
/// use futures_executor::block_on;
/// use embassy_sync::lazy_lock::LazyLock;
///
/// // Define a static value that will be lazily initialized
/// // at runtime at the first access.
/// static VALUE: LazyLock<u32> = LazyLock::new(|| 20);
///
/// let reference = VALUE.get();
/// assert_eq!(reference, &20);
/// ```
pub struct LazyLock<T, F = fn() -> T> {
    // True once `data` holds `value` rather than `f`.
    init: AtomicBool,
    // Starts out holding the init function; replaced by the computed value
    // on first access.
    data: UnsafeCell<Data<T, F>>,
}
// Storage that is either the not-yet-called init function or the computed
// value; `LazyLock::init` records which variant is live.
union Data<T, F> {
    value: ManuallyDrop<T>,
    f: ManuallyDrop<F>,
}
// SAFETY: initialization is guarded by a critical section plus the `init` flag.
// NOTE(review): this impl has no `Send`/`Sync` bounds on `T`/`F`, unlike
// `std::sync::LazyLock` — confirm that sharing a `LazyLock` of a non-`Sync`
// `T` across contexts is intentional here.
unsafe impl<T, F> Sync for LazyLock<T, F> {}
impl<T, F: FnOnce() -> T> LazyLock<T, F> {
    /// Create a new uninitialized `LazyLock`.
    pub const fn new(init_fn: F) -> Self {
        Self {
            init: AtomicBool::new(false),
            data: UnsafeCell::new(Data {
                f: ManuallyDrop::new(init_fn),
            }),
        }
    }
    /// Get a reference to the underlying value, initializing it if it
    /// has not been done already.
    #[inline]
    pub fn get(&self) -> &T {
        self.ensure_init_fast();
        // SAFETY: ensure_init_fast() guarantees `value` is now the live
        // union variant.
        unsafe { &(*self.data.get()).value }
    }
    /// Consume the `LazyLock`, returning the underlying value. The
    /// initialization function will be called if it has not been
    /// already.
    #[inline]
    pub fn into_inner(self) -> T {
        self.ensure_init_fast();
        // Wrap `self` in ManuallyDrop so its Drop impl does not run; the
        // value is moved out by hand below.
        let this = ManuallyDrop::new(self);
        // SAFETY: `this` is never used again, so reading `data` out of it
        // does not duplicate ownership.
        let data = unsafe { core::ptr::read(&this.data) }.into_inner();
        // SAFETY: ensure_init_fast() guarantees `value` is the live variant.
        ManuallyDrop::into_inner(unsafe { data.value })
    }
    /// Initialize the `LazyLock` if it has not been initialized yet.
    /// This function is a fast track to [`Self::ensure_init`]
    /// which does not require a critical section in most cases when
    /// the value has been initialized already.
    /// When this function returns, `self.data` is guaranteed to be
    /// initialized and visible on the current core.
    #[inline]
    fn ensure_init_fast(&self) {
        if !self.init.load(Ordering::Acquire) {
            self.ensure_init();
        }
    }
    /// Initialize the `LazyLock` if it has not been initialized yet.
    /// When this function returns, `self.data` is guaranteed to be
    /// initialized and visible on the current core.
    fn ensure_init(&self) {
        critical_section::with(|_| {
            // Re-check under the critical section: initialization may have
            // happened between the fast-path load and entering here.
            if !self.init.load(Ordering::Acquire) {
                let data = unsafe { &mut *self.data.get() };
                // SAFETY: `init` is false, so `f` is still the live union
                // variant; take it out before overwriting with the value.
                let f = unsafe { ManuallyDrop::take(&mut data.f) };
                let value = f();
                data.value = ManuallyDrop::new(value);
                // Release publishes the value write before the flag flip.
                self.init.store(true, Ordering::Release);
            }
        });
    }
}
impl<T, F> Drop for LazyLock<T, F> {
    fn drop(&mut self) {
        // Exactly one union variant is live, selected by `init`: drop the
        // computed value if initialization ran, otherwise the unused closure.
        if self.init.load(Ordering::Acquire) {
            // SAFETY: `init == true` means `value` is the live variant.
            unsafe { ManuallyDrop::drop(&mut self.data.get_mut().value) };
        } else {
            // SAFETY: `init == false` means `f` is still the live variant.
            unsafe { ManuallyDrop::drop(&mut self.data.get_mut().f) };
        }
    }
}
#[cfg(test)]
mod tests {
    use core::sync::atomic::{AtomicU32, Ordering};
    use super::*;
    #[test]
    fn test_lazy_lock() {
        // First access through a static runs the init closure.
        static VALUE: LazyLock<u32> = LazyLock::new(|| 20);
        let reference = VALUE.get();
        assert_eq!(reference, &20);
    }
    #[test]
    fn test_lazy_lock_into_inner() {
        // into_inner() forces initialization if it has not happened yet.
        let lazy: LazyLock<u32> = LazyLock::new(|| 20);
        let value = lazy.into_inner();
        assert_eq!(value, 20);
    }
    // Counts drops of `DropCheck` instances across the test below.
    static DROP_CHECKER: AtomicU32 = AtomicU32::new(0);
    struct DropCheck;
    impl Drop for DropCheck {
        fn drop(&mut self) {
            DROP_CHECKER.fetch_add(1, Ordering::Acquire);
        }
    }
    #[test]
    fn test_lazy_drop() {
        // Dropping an initialized lock must drop the value exactly once...
        let lazy: LazyLock<DropCheck> = LazyLock::new(|| DropCheck);
        assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 0);
        lazy.get();
        drop(lazy);
        assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1);
        // ...and dropping an uninitialized lock must drop the closure's
        // captures (here, `dropper`) without ever calling the closure.
        let dropper = DropCheck;
        let lazy_fn: LazyLock<u32, _> = LazyLock::new(move || {
            let _a = dropper;
            20
        });
        assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1);
        drop(lazy_fn);
        assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 2);
    }
}

View File

@@ -1,4 +1,4 @@
#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
#![cfg_attr(not(feature = "std"), no_std)]
#![allow(async_fn_in_trait)]
#![allow(clippy::new_without_default)]
#![doc = include_str!("../README.md")]
@@ -12,10 +12,13 @@ mod ring_buffer;
pub mod blocking_mutex;
pub mod channel;
pub mod lazy_lock;
pub mod mutex;
pub mod once_lock;
pub mod pipe;
pub mod priority_channel;
pub mod pubsub;
pub mod semaphore;
pub mod signal;
pub mod waitqueue;
pub mod watch;

View File

@@ -5,6 +5,7 @@ use core::cell::{RefCell, UnsafeCell};
use core::future::poll_fn;
use core::ops::{Deref, DerefMut};
use core::task::Poll;
use core::{fmt, mem};
use crate::blocking_mutex::raw::RawMutex;
use crate::blocking_mutex::Mutex as BlockingMutex;
@@ -128,12 +129,49 @@ where
}
}
impl<M: RawMutex, T> From<T> for Mutex<M, T> {
    /// Wrap a value in a new unlocked mutex.
    fn from(from: T) -> Self {
        Self::new(from)
    }
}
impl<M, T> Default for Mutex<M, T>
where
    M: RawMutex,
    T: ?Sized + Default,
{
    /// Create an unlocked mutex holding `T::default()`.
    fn default() -> Self {
        Self::new(Default::default())
    }
}
impl<M, T> fmt::Debug for Mutex<M, T>
where
    M: RawMutex,
    T: ?Sized + fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_struct("Mutex");
        // Use try_lock so Debug never blocks: print the value when the mutex
        // is free, or a "<locked>" placeholder when someone holds it.
        match self.try_lock() {
            Ok(value) => {
                d.field("inner", &&*value);
            }
            Err(TryLockError) => {
                d.field("inner", &format_args!("<locked>"));
            }
        }
        d.finish_non_exhaustive()
    }
}
/// Async mutex guard.
///
/// Owning an instance of this type indicates having
/// successfully locked the mutex, and grants access to the contents.
///
/// Dropping it unlocks the mutex.
#[clippy::has_significant_drop]
pub struct MutexGuard<'a, M, T>
where
M: RawMutex,
@@ -142,6 +180,25 @@ where
mutex: &'a Mutex<M, T>,
}
impl<'a, M, T> MutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized,
{
    /// Returns a locked view over a portion of the locked data.
    ///
    /// The returned [`MappedMutexGuard`] keeps the mutex locked; the lock is
    /// released when that guard is dropped.
    pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
        let mutex = this.mutex;
        // SAFETY: holding the guard means exclusive access to the mutex
        // contents, so handing `fun` a unique reference is sound.
        let value = fun(unsafe { &mut *this.mutex.inner.get() });
        // Don't run the `drop` method for MutexGuard. The ownership of the underlying
        // locked state is being moved to the returned MappedMutexGuard.
        mem::forget(this);
        MappedMutexGuard {
            state: &mutex.state,
            value,
        }
    }
}
impl<'a, M, T> Drop for MutexGuard<'a, M, T>
where
M: RawMutex,
@@ -180,3 +237,155 @@ where
unsafe { &mut *(self.mutex.inner.get()) }
}
}
impl<'a, M, T> fmt::Debug for MutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized + fmt::Debug,
{
    /// Formats the guarded value (not the guard itself).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&**self, f)
    }
}
impl<'a, M, T> fmt::Display for MutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized + fmt::Display,
{
    /// Formats the guarded value (not the guard itself).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&**self, f)
    }
}
/// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`] or
/// [`MappedMutexGuard::map`].
///
/// This can be used to hold a subfield of the protected data.
#[clippy::has_significant_drop]
pub struct MappedMutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized,
{
    // Shared lock state of the originating mutex; used by Drop to unlock.
    state: &'a BlockingMutex<M, RefCell<State>>,
    // Raw pointer into the mutex-protected data; valid for as long as the
    // lock is held, i.e. until this guard is dropped.
    value: *mut T,
}
impl<'a, M, T> MappedMutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized,
{
    /// Returns a locked view over a portion of the locked data.
    pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
        let state = this.state;
        // SAFETY: this guard has exclusive access to the data it points to,
        // so handing `fun` a unique reference is sound.
        let value = fun(unsafe { &mut *this.value });
        // Don't run the `drop` method for MutexGuard. The ownership of the underlying
        // locked state is being moved to the returned MappedMutexGuard.
        mem::forget(this);
        MappedMutexGuard { state, value }
    }
}
impl<'a, M, T> Deref for MappedMutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized,
{
    type Target = T;
    fn deref(&self) -> &Self::Target {
        // Safety: the MutexGuard represents exclusive access to the contents
        // of the mutex, so it's OK to get it.
        unsafe { &*self.value }
    }
}
impl<'a, M, T> DerefMut for MappedMutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Safety: the MutexGuard represents exclusive access to the contents
        // of the mutex, so it's OK to get it.
        unsafe { &mut *self.value }
    }
}
impl<'a, M, T> Drop for MappedMutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized,
{
    fn drop(&mut self) {
        // Mark the mutex unlocked and wake a task waiting to lock it.
        self.state.lock(|s| {
            let mut s = unwrap!(s.try_borrow_mut());
            s.locked = false;
            s.waker.wake();
        })
    }
}
// SAFETY: the guard holds only a raw pointer into the mutex-protected data;
// access to the pointee stays serialized by the lock, so `T: Send` is the
// requirement for moving the guard.
unsafe impl<M, T> Send for MappedMutexGuard<'_, M, T>
where
    M: RawMutex + Sync,
    T: Send + ?Sized,
{
}
// SAFETY: a shared `&MappedMutexGuard` only hands out `&T`, so `T: Sync` is
// required and sufficient.
unsafe impl<M, T> Sync for MappedMutexGuard<'_, M, T>
where
    M: RawMutex + Sync,
    T: Sync + ?Sized,
{
}
impl<'a, M, T> fmt::Debug for MappedMutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized + fmt::Debug,
{
    /// Formats the mapped value (not the guard itself).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&**self, f)
    }
}
impl<'a, M, T> fmt::Display for MappedMutexGuard<'a, M, T>
where
    M: RawMutex,
    T: ?Sized + fmt::Display,
{
    /// Formats the mapped value (not the guard itself).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&**self, f)
    }
}
#[cfg(test)]
mod tests {
    use crate::blocking_mutex::raw::NoopRawMutex;
    use crate::mutex::{Mutex, MutexGuard};
    #[futures_test::test]
    async fn mapped_guard_releases_lock_when_dropped() {
        let mutex: Mutex<NoopRawMutex, [i32; 2]> = Mutex::new([0, 1]);
        // Writes through a mapped guard must land in the protected data...
        {
            let guard = mutex.lock().await;
            assert_eq!(*guard, [0, 1]);
            let mut mapped = MutexGuard::map(guard, |this| &mut this[1]);
            assert_eq!(*mapped, 1);
            *mapped = 2;
        }
        // ...and dropping the mapped guard must release the lock, or the
        // second lock() below would never resolve.
        {
            let guard = mutex.lock().await;
            assert_eq!(*guard, [0, 2]);
            let mut mapped = MutexGuard::map(guard, |this| &mut this[1]);
            assert_eq!(*mapped, 2);
            *mapped = 3;
        }
        assert_eq!(*mutex.lock().await, [0, 3]);
    }
}

View File

@@ -0,0 +1,236 @@
//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value.
use core::cell::Cell;
use core::future::poll_fn;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicBool, Ordering};
use core::task::Poll;
/// The `OnceLock` is a synchronization primitive that allows for
/// initializing a value once, and allowing others to `.await` a
/// reference to the value. This is useful for lazy initialization of
/// a static value.
///
/// **Note**: this implementation uses a busy loop to poll the value,
/// which is not as efficient as registering a dedicated `Waker`.
/// However, if the usecase for it is to initialize a static variable
/// relatively early in the program life cycle, it should be fine.
///
/// # Example
/// ```
/// use futures_executor::block_on;
/// use embassy_sync::once_lock::OnceLock;
///
/// // Define a static value that will be lazily initialized
/// static VALUE: OnceLock<u32> = OnceLock::new();
///
/// let f = async {
///
///     // Initialize the value
///     let reference = VALUE.get_or_init(|| 20);
///     assert_eq!(reference, &20);
///
///     // Wait for the value to be initialized
///     // and get a static reference to it
///     assert_eq!(VALUE.get().await, &20);
///
/// };
/// block_on(f)
/// ```
pub struct OnceLock<T> {
    // True once `data` holds an initialized value.
    init: AtomicBool,
    // Value storage; only valid to read after `init` has been set to true.
    data: Cell<MaybeUninit<T>>,
}
// SAFETY: initialization is serialized through a critical section and
// published via the `init` flag.
// NOTE(review): unlike `std::sync::OnceLock`, there are no `Send`/`Sync`
// bounds on `T` here — confirm that is intentional.
unsafe impl<T> Sync for OnceLock<T> {}
impl<T> OnceLock<T> {
    /// Create a new uninitialized `OnceLock`.
    pub const fn new() -> Self {
        Self {
            init: AtomicBool::new(false),
            data: Cell::new(MaybeUninit::zeroed()),
        }
    }
    /// Get a reference to the underlying value, waiting for it to be set.
    /// If the value is already set, this will return immediately.
    pub async fn get(&self) -> &T {
        // Busy-polls by waking itself each time; see the type-level note.
        poll_fn(|cx| match self.try_get() {
            Some(data) => Poll::Ready(data),
            None => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        })
        .await
    }
    /// Try to get a reference to the underlying value if it exists.
    pub fn try_get(&self) -> Option<&T> {
        // `Acquire` pairs with the `Release` stores below: a reader that
        // observes `init == true` also observes the fully-written value,
        // even on multi-core targets. A `Relaxed` load would not provide
        // that happens-before edge for readers outside the critical section.
        if self.init.load(Ordering::Acquire) {
            // SAFETY: `init == true` means the value has been written.
            Some(unsafe { self.get_ref_unchecked() })
        } else {
            None
        }
    }
    /// Set the underlying value. If the value is already set, this will return an error with the given value.
    pub fn init(&self, value: T) -> Result<(), T> {
        // Critical section is required to ensure that the value is
        // not simultaneously initialized elsewhere at the same time.
        critical_section::with(|_| {
            // If the value is not set, set it and return Ok.
            if !self.init.load(Ordering::Acquire) {
                self.data.set(MaybeUninit::new(value));
                // `Release` publishes the value write before the flag flip.
                self.init.store(true, Ordering::Release);
                Ok(())
            // Otherwise return an error with the given value.
            } else {
                Err(value)
            }
        })
    }
    /// Get a reference to the underlying value, initializing it if it does not exist.
    pub fn get_or_init<F>(&self, f: F) -> &T
    where
        F: FnOnce() -> T,
    {
        // Critical section is required to ensure that the value is
        // not simultaneously initialized elsewhere at the same time.
        critical_section::with(|_| {
            // If the value is not set, set it.
            if !self.init.load(Ordering::Acquire) {
                self.data.set(MaybeUninit::new(f()));
                // `Release` publishes the value write before the flag flip.
                self.init.store(true, Ordering::Release);
            }
        });
        // Return a reference to the value.
        // SAFETY: the block above guarantees the flag is set by now.
        unsafe { self.get_ref_unchecked() }
    }
    /// Consume the `OnceLock`, returning the underlying value if it was initialized.
    pub fn into_inner(self) -> Option<T> {
        if self.init.load(Ordering::Acquire) {
            // SAFETY: `init == true` means the value has been written.
            Some(unsafe { self.data.into_inner().assume_init() })
        } else {
            None
        }
    }
    /// Take the underlying value if it was initialized, uninitializing the `OnceLock` in the process.
    pub fn take(&mut self) -> Option<T> {
        // If the value is set, uninitialize the lock and return the value.
        critical_section::with(|_| {
            if self.init.load(Ordering::Acquire) {
                // SAFETY: `init == true` means the value has been written.
                let val = unsafe { self.data.replace(MaybeUninit::zeroed()).assume_init() };
                self.init.store(false, Ordering::Release);
                Some(val)
            // Otherwise return None.
            } else {
                None
            }
        })
    }
    /// Check if the value has been set.
    pub fn is_set(&self) -> bool {
        self.init.load(Ordering::Acquire)
    }
    /// Get a reference to the underlying value.
    /// # Safety
    /// Must only be used if a value has been set.
    unsafe fn get_ref_unchecked(&self) -> &T {
        (*self.data.as_ptr()).assume_init_ref()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn once_lock() {
        // init() succeeds once, then fails and hands the value back.
        let lock = OnceLock::new();
        assert_eq!(lock.try_get(), None);
        assert_eq!(lock.is_set(), false);
        let v = 42;
        assert_eq!(lock.init(v), Ok(()));
        assert_eq!(lock.is_set(), true);
        assert_eq!(lock.try_get(), Some(&v));
        assert_eq!(lock.try_get(), Some(&v));
        let v = 43;
        assert_eq!(lock.init(v), Err(v));
        assert_eq!(lock.is_set(), true);
        assert_eq!(lock.try_get(), Some(&42));
    }
    #[test]
    fn once_lock_get_or_init() {
        // Only the first closure runs; later ones are ignored.
        let lock = OnceLock::new();
        assert_eq!(lock.try_get(), None);
        assert_eq!(lock.is_set(), false);
        let v = lock.get_or_init(|| 42);
        assert_eq!(v, &42);
        assert_eq!(lock.is_set(), true);
        assert_eq!(lock.try_get(), Some(&42));
        let v = lock.get_or_init(|| 43);
        assert_eq!(v, &42);
        assert_eq!(lock.is_set(), true);
        assert_eq!(lock.try_get(), Some(&42));
    }
    #[test]
    fn once_lock_static() {
        // A static OnceLock yields 'static references.
        static LOCK: OnceLock<i32> = OnceLock::new();
        let v: &'static i32 = LOCK.get_or_init(|| 42);
        assert_eq!(v, &42);
        let v: &'static i32 = LOCK.get_or_init(|| 43);
        assert_eq!(v, &42);
    }
    #[futures_test::test]
    async fn once_lock_async() {
        // get().await resolves immediately once the value has been set.
        static LOCK: OnceLock<i32> = OnceLock::new();
        assert!(LOCK.init(42).is_ok());
        let v: &'static i32 = LOCK.get().await;
        assert_eq!(v, &42);
    }
    #[test]
    fn once_lock_into_inner() {
        let lock: OnceLock<i32> = OnceLock::new();
        let v = lock.get_or_init(|| 42);
        assert_eq!(v, &42);
        assert_eq!(lock.into_inner(), Some(42));
    }
    #[test]
    fn once_lock_take_init() {
        // take() empties the lock so it can be initialized again.
        let mut lock: OnceLock<i32> = OnceLock::new();
        assert_eq!(lock.get_or_init(|| 42), &42);
        assert_eq!(lock.is_set(), true);
        assert_eq!(lock.take(), Some(42));
        assert_eq!(lock.is_set(), false);
        assert_eq!(lock.get_or_init(|| 43), &43);
        assert_eq!(lock.is_set(), true);
    }
}

View File

@@ -25,7 +25,7 @@ where
M: RawMutex,
{
fn clone(&self) -> Self {
Writer { pipe: self.pipe }
*self
}
}

View File

@@ -33,7 +33,7 @@ where
M: RawMutex,
{
fn clone(&self) -> Self {
Sender { channel: self.channel }
*self
}
}
@@ -101,7 +101,7 @@ where
M: RawMutex,
{
fn clone(&self) -> Self {
Receiver { channel: self.channel }
*self
}
}
@@ -314,6 +314,22 @@ where
Poll::Pending
}
}
    /// Drop all queued messages, leaving the channel empty.
    fn clear(&mut self) {
        self.queue.clear();
    }
    /// Number of messages currently queued.
    fn len(&self) -> usize {
        self.queue.len()
    }
    /// True if no messages are queued.
    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
    /// True if the queue has reached its capacity.
    // Compares len to capacity directly — presumably the backing queue type
    // offers no `is_full` of its own; confirm against its API.
    fn is_full(&self) -> bool {
        self.queue.len() == self.queue.capacity()
    }
}
/// A bounded channel for communicating between asynchronous tasks
@@ -323,7 +339,7 @@ where
/// buffer is full, attempts to `send` new messages will wait until a message is
/// received from the channel.
///
/// Sent data may be reordered based on their priorty within the channel.
/// Sent data may be reordered based on their priority within the channel.
/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`]
/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`.
pub struct PriorityChannel<M, T, K, const N: usize>
@@ -433,6 +449,38 @@ where
pub fn try_receive(&self) -> Result<T, TryReceiveError> {
self.lock(|c| c.try_receive())
}
/// Returns the maximum number of elements the channel can hold.
pub const fn capacity(&self) -> usize {
N
}
/// Returns the free capacity of the channel.
///
/// This is equivalent to `capacity() - len()`
pub fn free_capacity(&self) -> usize {
N - self.len()
}
/// Clears all elements in the channel.
pub fn clear(&self) {
self.lock(|c| c.clear());
}
/// Returns the number of elements currently in the channel.
pub fn len(&self) -> usize {
self.lock(|c| c.len())
}
/// Returns whether the channel is empty.
pub fn is_empty(&self) -> bool {
self.lock(|c| c.is_empty())
}
/// Returns whether the channel is full.
pub fn is_full(&self) -> bool {
self.lock(|c| c.is_full())
}
}
/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the

View File

@@ -160,9 +160,60 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
DynImmediatePublisher(ImmediatePub::new(self))
}
/// Returns the maximum number of elements the channel can hold.
pub const fn capacity(&self) -> usize {
CAP
}
/// Returns the free capacity of the channel.
///
/// This is equivalent to `capacity() - len()`
pub fn free_capacity(&self) -> usize {
CAP - self.len()
}
/// Clears all elements in the channel.
pub fn clear(&self) {
self.inner.lock(|inner| inner.borrow_mut().clear());
}
/// Returns the number of elements currently in the channel.
pub fn len(&self) -> usize {
self.inner.lock(|inner| inner.borrow().len())
}
/// Returns whether the channel is empty.
pub fn is_empty(&self) -> bool {
self.inner.lock(|inner| inner.borrow().is_empty())
}
/// Returns whether the channel is full.
pub fn is_full(&self) -> bool {
self.inner.lock(|inner| inner.borrow().is_full())
}
}
impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T>
impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
for PubSubChannel<M, T, CAP, SUBS, PUBS>
{
fn publish_immediate(&self, message: T) {
self.inner.lock(|s| {
let mut s = s.borrow_mut();
s.publish_immediate(message)
})
}
fn capacity(&self) -> usize {
self.capacity()
}
fn is_full(&self) -> bool {
self.is_full()
}
}
impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
for PubSubChannel<M, T, CAP, SUBS, PUBS>
{
fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
@@ -214,20 +265,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
})
}
fn publish_immediate(&self, message: T) {
self.inner.lock(|s| {
let mut s = s.borrow_mut();
s.publish_immediate(message)
})
}
fn space(&self) -> usize {
self.inner.lock(|s| {
let s = s.borrow();
s.queue.capacity() - s.queue.len()
})
}
fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
self.inner.lock(|s| {
let mut s = s.borrow_mut();
@@ -241,6 +278,22 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
s.unregister_publisher()
})
}
fn free_capacity(&self) -> usize {
self.free_capacity()
}
fn clear(&self) {
self.clear();
}
fn len(&self) -> usize {
self.len()
}
fn is_empty(&self) -> bool {
self.is_empty()
}
}
/// Internal state for the PubSub channel
@@ -366,10 +419,26 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
fn unregister_publisher(&mut self) {
self.publisher_count -= 1;
}
fn clear(&mut self) {
self.queue.clear();
}
fn len(&self) -> usize {
self.queue.len()
}
fn is_empty(&self) -> bool {
self.queue.is_empty()
}
fn is_full(&self) -> bool {
self.queue.is_full()
}
}
/// Error type for the [PubSubChannel]
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error {
/// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
@@ -380,12 +449,10 @@ pub enum Error {
MaximumPublishersReached,
}
/// 'Middle level' behaviour of the pubsub channel.
/// This trait is used so that Sub and Pub can be generic over the channel.
pub trait PubSubBehavior<T> {
trait SealedPubSubBehavior<T> {
/// Try to get a message from the queue with the given message id.
///
/// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
/// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers.
fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
/// Get the amount of messages that are between the given the next_message_id and the most recent message.
@@ -397,11 +464,19 @@ pub trait PubSubBehavior<T> {
/// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
/// Publish a message immediately
fn publish_immediate(&self, message: T);
/// Returns the free capacity of the channel.
///
/// This is equivalent to `capacity() - len()`
fn free_capacity(&self) -> usize;
/// The amount of messages that can still be published without having to wait or without having to lag the subscribers
fn space(&self) -> usize;
/// Clears all elements in the channel.
fn clear(&self);
/// Returns the number of elements currently in the channel.
fn len(&self) -> usize;
/// Returns whether the channel is empty.
fn is_empty(&self) -> bool;
/// Let the channel know that a subscriber has dropped
fn unregister_subscriber(&self, subscriber_next_message_id: u64);
@@ -410,6 +485,20 @@ pub trait PubSubBehavior<T> {
fn unregister_publisher(&self);
}
/// 'Middle level' behaviour of the pubsub channel.
/// This trait is used so that Sub and Pub can be generic over the channel.
#[allow(private_bounds)]
pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
/// Publish a message immediately
fn publish_immediate(&self, message: T);
/// Returns the maximum number of elements the channel can hold.
fn capacity(&self) -> usize;
/// Returns whether the channel is full.
fn is_full(&self) -> bool;
}
/// The result of the subscriber wait procedure
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -542,6 +631,7 @@ mod tests {
assert_eq!(pub0.try_publish(0), Ok(()));
assert_eq!(pub0.try_publish(0), Ok(()));
assert_eq!(pub0.try_publish(0), Ok(()));
assert!(pub0.is_full());
assert_eq!(pub0.try_publish(0), Err(0));
drop(sub0);
@@ -574,32 +664,42 @@ mod tests {
}
#[futures_test::test]
async fn correct_space() {
async fn correct_len() {
let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
let mut sub0 = channel.subscriber().unwrap();
let mut sub1 = channel.subscriber().unwrap();
let pub0 = channel.publisher().unwrap();
assert_eq!(pub0.space(), 4);
assert!(sub0.is_empty());
assert!(sub1.is_empty());
assert!(pub0.is_empty());
assert_eq!(pub0.free_capacity(), 4);
assert_eq!(pub0.len(), 0);
pub0.publish(42).await;
assert_eq!(pub0.space(), 3);
assert_eq!(pub0.free_capacity(), 3);
assert_eq!(pub0.len(), 1);
pub0.publish(42).await;
assert_eq!(pub0.space(), 2);
assert_eq!(pub0.free_capacity(), 2);
assert_eq!(pub0.len(), 2);
sub0.next_message().await;
sub0.next_message().await;
assert_eq!(pub0.space(), 2);
assert_eq!(pub0.free_capacity(), 2);
assert_eq!(pub0.len(), 2);
sub1.next_message().await;
assert_eq!(pub0.space(), 3);
assert_eq!(pub0.free_capacity(), 3);
assert_eq!(pub0.len(), 1);
sub1.next_message().await;
assert_eq!(pub0.space(), 4);
assert_eq!(pub0.free_capacity(), 4);
assert_eq!(pub0.len(), 0);
}
#[futures_test::test]
@@ -610,29 +710,29 @@ mod tests {
let mut sub0 = channel.subscriber().unwrap();
let mut sub1 = channel.subscriber().unwrap();
assert_eq!(4, pub0.space());
assert_eq!(4, pub0.free_capacity());
pub0.publish(1).await;
pub0.publish(2).await;
assert_eq!(2, channel.space());
assert_eq!(2, channel.free_capacity());
assert_eq!(1, sub0.try_next_message_pure().unwrap());
assert_eq!(2, sub0.try_next_message_pure().unwrap());
assert_eq!(2, channel.space());
assert_eq!(2, channel.free_capacity());
drop(sub0);
assert_eq!(2, channel.space());
assert_eq!(2, channel.free_capacity());
assert_eq!(1, sub1.try_next_message_pure().unwrap());
assert_eq!(3, channel.space());
assert_eq!(3, channel.free_capacity());
drop(sub1);
assert_eq!(4, channel.space());
assert_eq!(4, channel.free_capacity());
}
struct CloneCallCounter(usize);

View File

@@ -43,12 +43,36 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
self.channel.publish_with_context(message, None)
}
/// The amount of messages that can still be published without having to wait or without having to lag the subscribers
/// Returns the maximum number of elements the ***channel*** can hold.
pub fn capacity(&self) -> usize {
self.channel.capacity()
}
/// Returns the free capacity of the ***channel***.
///
/// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
/// So checking doesn't give any guarantees.*
pub fn space(&self) -> usize {
self.channel.space()
/// This is equivalent to `capacity() - len()`
pub fn free_capacity(&self) -> usize {
self.channel.free_capacity()
}
/// Clears all elements in the ***channel***.
pub fn clear(&self) {
self.channel.clear();
}
/// Returns the number of elements currently in the ***channel***.
pub fn len(&self) -> usize {
self.channel.len()
}
/// Returns whether the ***channel*** is empty.
pub fn is_empty(&self) -> bool {
self.channel.is_empty()
}
/// Returns whether the ***channel*** is full.
pub fn is_full(&self) -> bool {
self.channel.is_full()
}
}
@@ -124,12 +148,36 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
self.channel.publish_with_context(message, None)
}
/// The amount of messages that can still be published without having to wait or without having to lag the subscribers
/// Returns the maximum number of elements the ***channel*** can hold.
pub fn capacity(&self) -> usize {
self.channel.capacity()
}
/// Returns the free capacity of the ***channel***.
///
/// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
/// So checking doesn't give any guarantees.*
pub fn space(&self) -> usize {
self.channel.space()
/// This is equivalent to `capacity() - len()`
pub fn free_capacity(&self) -> usize {
self.channel.free_capacity()
}
/// Clears all elements in the ***channel***.
pub fn clear(&self) {
self.channel.clear();
}
/// Returns the number of elements currently in the ***channel***.
pub fn len(&self) -> usize {
self.channel.len()
}
/// Returns whether the ***channel*** is empty.
pub fn is_empty(&self) -> bool {
self.channel.is_empty()
}
/// Returns whether the ***channel*** is full.
pub fn is_full(&self) -> bool {
self.channel.is_full()
}
}

View File

@@ -65,10 +65,44 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
}
}
/// The amount of messages this subscriber hasn't received yet
/// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
/// for this subscriber.
pub fn available(&self) -> u64 {
self.channel.available(self.next_message_id)
}
/// Returns the maximum number of elements the ***channel*** can hold.
pub fn capacity(&self) -> usize {
self.channel.capacity()
}
/// Returns the free capacity of the ***channel***.
///
/// This is equivalent to `capacity() - len()`
pub fn free_capacity(&self) -> usize {
self.channel.free_capacity()
}
/// Clears all elements in the ***channel***.
pub fn clear(&self) {
self.channel.clear();
}
/// Returns the number of elements currently in the ***channel***.
/// See [Self::available] for how many messages are available for this subscriber.
pub fn len(&self) -> usize {
self.channel.len()
}
/// Returns whether the ***channel*** is empty.
pub fn is_empty(&self) -> bool {
self.channel.is_empty()
}
/// Returns whether the ***channel*** is full.
pub fn is_full(&self) -> bool {
self.channel.is_full()
}
}
impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {

View File

@@ -0,0 +1,772 @@
//! A synchronization primitive for controlling access to a pool of resources.
use core::cell::{Cell, RefCell};
use core::convert::Infallible;
use core::future::{poll_fn, Future};
use core::task::{Poll, Waker};
use heapless::Deque;
use crate::blocking_mutex::raw::RawMutex;
use crate::blocking_mutex::Mutex;
use crate::waitqueue::WakerRegistration;
/// An asynchronous semaphore.
///
/// A semaphore tracks a number of permits, typically representing a pool of shared resources.
/// Users can acquire permits to synchronize access to those resources. The semaphore does not
/// contain the resources themselves, only the count of available permits.
pub trait Semaphore: Sized {
    /// The error returned when the semaphore is unable to acquire the requested permits.
    type Error;

    /// Asynchronously acquire one or more permits from the semaphore.
    async fn acquire(&self, permits: usize) -> Result<SemaphoreReleaser<'_, Self>, Self::Error>;

    /// Try to immediately acquire one or more permits from the semaphore.
    fn try_acquire(&self, permits: usize) -> Option<SemaphoreReleaser<'_, Self>>;

    /// Asynchronously acquire all permits controlled by the semaphore.
    ///
    /// This method will wait until at least `min` permits are available, then acquire all available permits
    /// from the semaphore. Note that other tasks may have already acquired some permits which could be released
    /// back to the semaphore at any time. The number of permits actually acquired may be determined by calling
    /// [`SemaphoreReleaser::permits`].
    async fn acquire_all(&self, min: usize) -> Result<SemaphoreReleaser<'_, Self>, Self::Error>;

    /// Try to immediately acquire all available permits from the semaphore, if at least `min` permits are available.
    fn try_acquire_all(&self, min: usize) -> Option<SemaphoreReleaser<'_, Self>>;

    /// Release `permits` back to the semaphore, making them available to be acquired.
    fn release(&self, permits: usize);

    /// Reset the number of available permits in the semaphore to `permits`.
    fn set(&self, permits: usize);
}
/// A representation of a number of acquired permits.
///
/// The acquired permits will be released back to the [`Semaphore`] when this is dropped.
pub struct SemaphoreReleaser<'a, S: Semaphore> {
    semaphore: &'a S,
    permits: usize,
}

impl<'a, S: Semaphore> Drop for SemaphoreReleaser<'a, S> {
    fn drop(&mut self) {
        // Hand the permits back to the semaphore they were acquired from.
        self.semaphore.release(self.permits);
    }
}

impl<'a, S: Semaphore> SemaphoreReleaser<'a, S> {
    /// The number of acquired permits.
    pub fn permits(&self) -> usize {
        self.permits
    }

    /// Prevent the acquired permits from being released on drop.
    ///
    /// Returns the number of acquired permits.
    pub fn disarm(self) -> usize {
        let permits = self.permits;
        // Skip `Drop`, so the permits are never returned to the semaphore.
        core::mem::forget(self);
        permits
    }
}
/// A greedy [`Semaphore`] implementation.
///
/// Tasks can acquire permits as soon as they become available, even if another task
/// is waiting on a larger number of permits.
pub struct GreedySemaphore<M: RawMutex> {
    // Permit count plus a single registered waker, guarded by the blocking mutex.
    state: Mutex<M, Cell<SemaphoreState>>,
}

impl<M: RawMutex> Default for GreedySemaphore<M> {
    /// Creates a semaphore with zero available permits.
    fn default() -> Self {
        Self::new(0)
    }
}
impl<M: RawMutex> GreedySemaphore<M> {
    /// Create a new `Semaphore`.
    pub const fn new(permits: usize) -> Self {
        Self {
            state: Mutex::new(Cell::new(SemaphoreState {
                permits,
                waker: WakerRegistration::new(),
            })),
        }
    }

    /// Read the current permit count (test helper).
    #[cfg(test)]
    fn permits(&self) -> usize {
        self.state.lock(|cell| {
            // The state lives in a `Cell`, so temporarily swap in `EMPTY` to inspect
            // it, then swap the real state back.
            let state = cell.replace(SemaphoreState::EMPTY);
            let permits = state.permits;
            cell.replace(state);
            permits
        })
    }

    /// Core acquire logic shared by all acquire flavors.
    ///
    /// Attempts to take `permits` permits (or everything available when `acquire_all`
    /// is set). On success returns a [`SemaphoreReleaser`]; otherwise registers `waker`
    /// (if given) to be notified when permits are released, and stays `Pending`.
    fn poll_acquire(
        &self,
        permits: usize,
        acquire_all: bool,
        waker: Option<&Waker>,
    ) -> Poll<Result<SemaphoreReleaser<'_, Self>, Infallible>> {
        self.state.lock(|cell| {
            let mut state = cell.replace(SemaphoreState::EMPTY);
            if let Some(permits) = state.take(permits, acquire_all) {
                cell.set(state);
                Poll::Ready(Ok(SemaphoreReleaser {
                    semaphore: self,
                    permits,
                }))
            } else {
                // Not enough permits: remember the waiter (when polled from a task)
                // before putting the state back.
                if let Some(waker) = waker {
                    state.register(waker);
                }
                cell.set(state);
                Poll::Pending
            }
        })
    }
}
impl<M: RawMutex> Semaphore for GreedySemaphore<M> {
    /// Acquisition can never fail for the greedy semaphore; callers simply wait.
    type Error = Infallible;

    async fn acquire(&self, permits: usize) -> Result<SemaphoreReleaser<'_, Self>, Self::Error> {
        poll_fn(|cx| self.poll_acquire(permits, false, Some(cx.waker()))).await
    }

    fn try_acquire(&self, permits: usize) -> Option<SemaphoreReleaser<'_, Self>> {
        // No waker supplied, so a `Pending` result simply becomes `None`.
        match self.poll_acquire(permits, false, None) {
            Poll::Ready(Ok(n)) => Some(n),
            _ => None,
        }
    }

    async fn acquire_all(&self, min: usize) -> Result<SemaphoreReleaser<'_, Self>, Self::Error> {
        poll_fn(|cx| self.poll_acquire(min, true, Some(cx.waker()))).await
    }

    fn try_acquire_all(&self, min: usize) -> Option<SemaphoreReleaser<'_, Self>> {
        match self.poll_acquire(min, true, None) {
            Poll::Ready(Ok(n)) => Some(n),
            _ => None,
        }
    }

    fn release(&self, permits: usize) {
        if permits > 0 {
            self.state.lock(|cell| {
                let mut state = cell.replace(SemaphoreState::EMPTY);
                state.permits += permits;
                // Notify the registered waiter that more permits are available.
                state.wake();
                cell.set(state);
            });
        }
    }

    fn set(&self, permits: usize) {
        self.state.lock(|cell| {
            let mut state = cell.replace(SemaphoreState::EMPTY);
            // Only wake the waiter when the permit count is increasing.
            if permits > state.permits {
                state.wake();
            }
            state.permits = permits;
            cell.set(state);
        });
    }
}
/// Shared state of a [`GreedySemaphore`]: the current permit count plus the
/// registration slot for a single waiting task.
struct SemaphoreState {
    permits: usize,
    waker: WakerRegistration,
}

impl SemaphoreState {
    /// Placeholder value used to temporarily swap the real state out of its `Cell`.
    const EMPTY: SemaphoreState = SemaphoreState {
        permits: 0,
        waker: WakerRegistration::new(),
    };

    /// Register `w` to be woken once permits become available.
    fn register(&mut self, w: &Waker) {
        self.waker.register(w);
    }

    /// Try to take `permits` permits, or every available permit when
    /// `acquire_all` is set.
    ///
    /// Returns the number of permits actually taken, or `None` when fewer than
    /// `permits` are currently available.
    fn take(&mut self, mut permits: usize, acquire_all: bool) -> Option<usize> {
        if self.permits < permits {
            return None;
        }
        if acquire_all {
            permits = self.permits;
        }
        self.permits -= permits;
        Some(permits)
    }

    /// Wake the registered waiter, if any.
    fn wake(&mut self) {
        self.waker.wake();
    }
}
/// A fair [`Semaphore`] implementation.
///
/// Tasks are allowed to acquire permits in FIFO order. A task waiting to acquire
/// a large number of permits will prevent other tasks from acquiring any permits
/// until its request is satisfied.
///
/// Up to `N` tasks may attempt to acquire permits concurrently. If additional
/// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned.
pub struct FairSemaphore<M, const N: usize>
where
    M: RawMutex,
{
    // Permit count plus the FIFO wait queue, guarded by the blocking mutex.
    state: Mutex<M, RefCell<FairSemaphoreState<N>>>,
}

impl<M, const N: usize> Default for FairSemaphore<M, N>
where
    M: RawMutex,
{
    /// Creates a semaphore with zero available permits.
    fn default() -> Self {
        Self::new(0)
    }
}
impl<M, const N: usize> FairSemaphore<M, N>
where
    M: RawMutex,
{
    /// Create a new `FairSemaphore`.
    pub const fn new(permits: usize) -> Self {
        Self {
            state: Mutex::new(RefCell::new(FairSemaphoreState::new(permits))),
        }
    }

    /// Read the current permit count (test helper).
    #[cfg(test)]
    fn permits(&self) -> usize {
        self.state.lock(|cell| cell.borrow().permits)
    }

    /// Core acquire logic shared by all acquire flavors.
    ///
    /// `cx` carries the caller's wait-queue ticket slot and its waker; `None` means a
    /// non-blocking try. When the result is `Pending`, a ticket is allocated (or its
    /// waker refreshed) and written back through the `&mut Option<usize>` so the
    /// calling future can cancel the ticket on drop.
    fn poll_acquire(
        &self,
        permits: usize,
        acquire_all: bool,
        cx: Option<(&mut Option<usize>, &Waker)>,
    ) -> Poll<Result<SemaphoreReleaser<'_, Self>, WaitQueueFull>> {
        // The caller's existing ticket, if it has already joined the wait queue.
        let ticket = cx.as_ref().map(|(x, _)| **x).unwrap_or(None);
        self.state.lock(|cell| {
            let mut state = cell.borrow_mut();
            if let Some(permits) = state.take(ticket, permits, acquire_all) {
                Poll::Ready(Ok(SemaphoreReleaser {
                    semaphore: self,
                    permits,
                }))
            } else if let Some((ticket_ref, waker)) = cx {
                // Join (or stay in) the wait queue; fails when all `N` slots are taken.
                match state.register(ticket, waker) {
                    Ok(ticket) => {
                        *ticket_ref = Some(ticket);
                        Poll::Pending
                    }
                    Err(err) => Poll::Ready(Err(err)),
                }
            } else {
                // Non-blocking try with no waker: just report "not now".
                Poll::Pending
            }
        })
    }
}

/// An error indicating the [`FairSemaphore`]'s wait queue is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct WaitQueueFull;
impl<M: RawMutex, const N: usize> Semaphore for FairSemaphore<M, N> {
    type Error = WaitQueueFull;

    // `acquire`/`acquire_all` return hand-rolled futures (instead of being `async fn`s)
    // so that the future's `Drop` impl can cancel its wait-queue ticket; see
    // `FairAcquire`/`FairAcquireAll` below.
    fn acquire(&self, permits: usize) -> impl Future<Output = Result<SemaphoreReleaser<'_, Self>, Self::Error>> {
        FairAcquire {
            sema: self,
            permits,
            ticket: None,
        }
    }

    fn try_acquire(&self, permits: usize) -> Option<SemaphoreReleaser<'_, Self>> {
        // No waker context, so a `Pending` result simply becomes `None`.
        match self.poll_acquire(permits, false, None) {
            Poll::Ready(Ok(x)) => Some(x),
            _ => None,
        }
    }

    fn acquire_all(&self, min: usize) -> impl Future<Output = Result<SemaphoreReleaser<'_, Self>, Self::Error>> {
        FairAcquireAll {
            sema: self,
            min,
            ticket: None,
        }
    }

    fn try_acquire_all(&self, min: usize) -> Option<SemaphoreReleaser<'_, Self>> {
        match self.poll_acquire(min, true, None) {
            Poll::Ready(Ok(x)) => Some(x),
            _ => None,
        }
    }

    fn release(&self, permits: usize) {
        if permits > 0 {
            self.state.lock(|cell| {
                let mut state = cell.borrow_mut();
                state.permits += permits;
                // Wake the task at the front of the wait queue.
                state.wake();
            });
        }
    }

    fn set(&self, permits: usize) {
        self.state.lock(|cell| {
            let mut state = cell.borrow_mut();
            // Only wake the front waiter when the permit count is increasing.
            if permits > state.permits {
                state.wake();
            }
            state.permits = permits;
        });
    }
}
/// Future returned by [`FairSemaphore::acquire`].
///
/// Holds the caller's wait-queue ticket; dropping the future cancels the ticket
/// so a cancelled waiter does not block the tasks queued behind it.
struct FairAcquire<'a, M: RawMutex, const N: usize> {
    sema: &'a FairSemaphore<M, N>,
    permits: usize,
    ticket: Option<usize>,
}

impl<'a, M: RawMutex, const N: usize> Drop for FairAcquire<'a, M, N> {
    fn drop(&mut self) {
        // Clear our slot in the wait queue (no-op if we never got a ticket).
        self.sema
            .state
            .lock(|cell| cell.borrow_mut().cancel(self.ticket.take()))
    }
}

impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquire<'a, M, N> {
    type Output = Result<SemaphoreReleaser<'a, FairSemaphore<M, N>>, WaitQueueFull>;

    fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
        // Delegate to the semaphore, passing our ticket slot so it can be filled in.
        self.sema
            .poll_acquire(self.permits, false, Some((&mut self.ticket, cx.waker())))
    }
}
/// Future returned by [`FairSemaphore::acquire_all`].
///
/// Like `FairAcquire`, but acquires every available permit once at least `min`
/// are available. Dropping the future cancels its wait-queue ticket.
struct FairAcquireAll<'a, M: RawMutex, const N: usize> {
    sema: &'a FairSemaphore<M, N>,
    min: usize,
    ticket: Option<usize>,
}

impl<'a, M: RawMutex, const N: usize> Drop for FairAcquireAll<'a, M, N> {
    fn drop(&mut self) {
        // Clear our slot in the wait queue (no-op if we never got a ticket).
        self.sema
            .state
            .lock(|cell| cell.borrow_mut().cancel(self.ticket.take()))
    }
}

impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquireAll<'a, M, N> {
    type Output = Result<SemaphoreReleaser<'a, FairSemaphore<M, N>>, WaitQueueFull>;

    fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
        // Delegate to the semaphore with `acquire_all = true`.
        self.sema
            .poll_acquire(self.min, true, Some((&mut self.ticket, cx.waker())))
    }
}
/// Internal state of a [`FairSemaphore`].
///
/// `wakers` is a FIFO queue of waiting tasks; `next_ticket` is the ticket number
/// belonging to the waiter at the front of the queue (ticket numbers use wrapping
/// arithmetic). A waiter that cancels sets its slot to `None`; such slots are
/// reclaimed lazily from the front of the queue.
struct FairSemaphoreState<const N: usize> {
    permits: usize,
    next_ticket: usize,
    wakers: Deque<Option<Waker>, N>,
}

impl<const N: usize> FairSemaphoreState<N> {
    /// Create a new empty instance
    const fn new(permits: usize) -> Self {
        Self {
            permits,
            next_ticket: 0,
            wakers: Deque::new(),
        }
    }

    /// Register a waker. If the queue is full the function returns an error
    fn register(&mut self, ticket: Option<usize>, w: &Waker) -> Result<usize, WaitQueueFull> {
        self.pop_canceled();
        match ticket {
            None => {
                // New waiter: its ticket is the one just past the current queue tail.
                let ticket = self.next_ticket.wrapping_add(self.wakers.len());
                self.wakers.push_back(Some(w.clone())).or(Err(WaitQueueFull))?;
                Ok(ticket)
            }
            Some(ticket) => {
                // Existing waiter re-polled: refresh the waker stored in its slot.
                self.set_waker(ticket, Some(w.clone()));
                Ok(ticket)
            }
        }
    }

    /// Cancel a waiter by clearing its waker slot; the slot itself is reclaimed lazily.
    fn cancel(&mut self, ticket: Option<usize>) {
        if let Some(ticket) = ticket {
            self.set_waker(ticket, None);
        }
    }

    /// Store `waker` into the slot belonging to `ticket`, if it is still queued.
    fn set_waker(&mut self, ticket: usize, waker: Option<Waker>) {
        // Position of this ticket relative to the queue front (wrapping subtraction).
        let i = ticket.wrapping_sub(self.next_ticket);
        if i < self.wakers.len() {
            // The deque's storage is a ring buffer, so slot `i` lives in one of two
            // contiguous halves.
            let (a, b) = self.wakers.as_mut_slices();
            let x = if i < a.len() { &mut a[i] } else { &mut b[i - a.len()] };
            *x = waker;
        }
    }

    /// Attempt to take permits on behalf of `ticket` (or a ticketless try-acquire).
    ///
    /// Fails when fewer than `permits` permits are available, or when the caller is
    /// not at the front of the wait queue.
    fn take(&mut self, ticket: Option<usize>, mut permits: usize, acquire_all: bool) -> Option<usize> {
        self.pop_canceled();
        if permits > self.permits {
            return None;
        }
        // Fairness: only the front-of-queue ticket may acquire, and a ticketless
        // caller may acquire only when nobody is waiting.
        match ticket {
            Some(n) if n != self.next_ticket => return None,
            None if !self.wakers.is_empty() => return None,
            _ => (),
        }
        if acquire_all {
            permits = self.permits;
        }
        self.permits -= permits;
        if ticket.is_some() {
            // Leave the queue; if permits remain, give the next waiter a chance.
            self.pop();
            if self.permits > 0 {
                self.wake();
            }
        }
        Some(permits)
    }

    /// Drop canceled (`None`) entries from the front of the queue.
    fn pop_canceled(&mut self) {
        while let Some(None) = self.wakers.front() {
            self.pop();
        }
    }

    /// Panics if `self.wakers` is empty
    fn pop(&mut self) {
        self.wakers.pop_front().unwrap();
        self.next_ticket = self.next_ticket.wrapping_add(1);
    }

    /// Wake the waiter at the front of the queue, if any.
    fn wake(&mut self) {
        self.pop_canceled();
        if let Some(Some(waker)) = self.wakers.front() {
            waker.wake_by_ref();
        }
    }
}
// Unit tests covering both semaphore flavors; the `greedy`/`fairness`/`wakers`
// tests exercise the behavioral differences between the two implementations.
#[cfg(test)]
mod tests {
    mod greedy {
        use core::pin::pin;

        use futures_util::poll;

        use super::super::*;
        use crate::blocking_mutex::raw::NoopRawMutex;

        #[test]
        fn try_acquire() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            let a = semaphore.try_acquire(1).unwrap();
            assert_eq!(a.permits(), 1);
            assert_eq!(semaphore.permits(), 2);
            // Dropping the releaser returns its permits.
            core::mem::drop(a);
            assert_eq!(semaphore.permits(), 3);
        }

        #[test]
        fn disarm() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            let a = semaphore.try_acquire(1).unwrap();
            // Disarmed permits are never returned to the semaphore.
            assert_eq!(a.disarm(), 1);
            assert_eq!(semaphore.permits(), 2);
        }

        #[futures_test::test]
        async fn acquire() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            let a = semaphore.acquire(1).await.unwrap();
            assert_eq!(a.permits(), 1);
            assert_eq!(semaphore.permits(), 2);
            core::mem::drop(a);
            assert_eq!(semaphore.permits(), 3);
        }

        #[test]
        fn try_acquire_all() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            // `min = 1`, but all 3 available permits are taken.
            let a = semaphore.try_acquire_all(1).unwrap();
            assert_eq!(a.permits(), 3);
            assert_eq!(semaphore.permits(), 0);
        }

        #[futures_test::test]
        async fn acquire_all() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            let a = semaphore.acquire_all(1).await.unwrap();
            assert_eq!(a.permits(), 3);
            assert_eq!(semaphore.permits(), 0);
        }

        #[test]
        fn release() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            assert_eq!(semaphore.permits(), 3);
            semaphore.release(2);
            assert_eq!(semaphore.permits(), 5);
        }

        #[test]
        fn set() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            assert_eq!(semaphore.permits(), 3);
            semaphore.set(2);
            assert_eq!(semaphore.permits(), 2);
        }

        #[test]
        fn contested() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            let a = semaphore.try_acquire(1).unwrap();
            let b = semaphore.try_acquire(3);
            assert!(b.is_none());
            core::mem::drop(a);
            let b = semaphore.try_acquire(3);
            assert!(b.is_some());
        }

        #[futures_test::test]
        async fn greedy() {
            let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
            let a = semaphore.try_acquire(1).unwrap();
            let b_fut = semaphore.acquire(3);
            let mut b_fut = pin!(b_fut);
            let b = poll!(b_fut.as_mut());
            assert!(b.is_pending());
            // Succeed even though `b` is waiting
            let c = semaphore.try_acquire(1);
            assert!(c.is_some());
            let b = poll!(b_fut.as_mut());
            assert!(b.is_pending());
            core::mem::drop(a);
            let b = poll!(b_fut.as_mut());
            assert!(b.is_pending());
            core::mem::drop(c);
            let b = poll!(b_fut.as_mut());
            assert!(b.is_ready());
        }
    }

    mod fair {
        use core::pin::pin;
        use core::time::Duration;

        use futures_executor::ThreadPool;
        use futures_timer::Delay;
        use futures_util::poll;
        use futures_util::task::SpawnExt;
        use static_cell::StaticCell;

        use super::super::*;
        use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

        #[test]
        fn try_acquire() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            let a = semaphore.try_acquire(1).unwrap();
            assert_eq!(a.permits(), 1);
            assert_eq!(semaphore.permits(), 2);
            core::mem::drop(a);
            assert_eq!(semaphore.permits(), 3);
        }

        #[test]
        fn disarm() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            let a = semaphore.try_acquire(1).unwrap();
            assert_eq!(a.disarm(), 1);
            assert_eq!(semaphore.permits(), 2);
        }

        #[futures_test::test]
        async fn acquire() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            let a = semaphore.acquire(1).await.unwrap();
            assert_eq!(a.permits(), 1);
            assert_eq!(semaphore.permits(), 2);
            core::mem::drop(a);
            assert_eq!(semaphore.permits(), 3);
        }

        #[test]
        fn try_acquire_all() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            let a = semaphore.try_acquire_all(1).unwrap();
            assert_eq!(a.permits(), 3);
            assert_eq!(semaphore.permits(), 0);
        }

        #[futures_test::test]
        async fn acquire_all() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            let a = semaphore.acquire_all(1).await.unwrap();
            assert_eq!(a.permits(), 3);
            assert_eq!(semaphore.permits(), 0);
        }

        #[test]
        fn release() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            assert_eq!(semaphore.permits(), 3);
            semaphore.release(2);
            assert_eq!(semaphore.permits(), 5);
        }

        #[test]
        fn set() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            assert_eq!(semaphore.permits(), 3);
            semaphore.set(2);
            assert_eq!(semaphore.permits(), 2);
        }

        #[test]
        fn contested() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            let a = semaphore.try_acquire(1).unwrap();
            let b = semaphore.try_acquire(3);
            assert!(b.is_none());
            core::mem::drop(a);
            let b = semaphore.try_acquire(3);
            assert!(b.is_some());
        }

        #[futures_test::test]
        async fn fairness() {
            let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
            let a = semaphore.try_acquire(1);
            assert!(a.is_some());
            let b_fut = semaphore.acquire(3);
            let mut b_fut = pin!(b_fut);
            let b = poll!(b_fut.as_mut()); // Poll `b_fut` once so it is registered
            assert!(b.is_pending());
            let c = semaphore.try_acquire(1);
            assert!(c.is_none());
            let c_fut = semaphore.acquire(1);
            let mut c_fut = pin!(c_fut);
            let c = poll!(c_fut.as_mut()); // Poll `c_fut` once so it is registered
            assert!(c.is_pending()); // `c` is blocked behind `b`
            // The wait queue only holds 2 entries (`b` and `c`), so a third waiter fails.
            let d = semaphore.acquire(1).await;
            assert!(matches!(d, Err(WaitQueueFull)));
            core::mem::drop(a);
            let c = poll!(c_fut.as_mut());
            assert!(c.is_pending()); // `c` is still blocked behind `b`
            let b = poll!(b_fut.as_mut());
            assert!(b.is_ready());
            let c = poll!(c_fut.as_mut());
            assert!(c.is_pending()); // `c` is still blocked behind `b`
            core::mem::drop(b);
            let c = poll!(c_fut.as_mut());
            assert!(c.is_ready());
        }

        #[futures_test::test]
        async fn wakers() {
            let executor = ThreadPool::new().unwrap();

            static SEMAPHORE: StaticCell<FairSemaphore<CriticalSectionRawMutex, 2>> = StaticCell::new();
            let semaphore = &*SEMAPHORE.init(FairSemaphore::new(3));

            let a = semaphore.try_acquire(2);
            assert!(a.is_some());

            let b_task = executor
                .spawn_with_handle(async move { semaphore.acquire(2).await })
                .unwrap();

            // Wait until `b_task` has actually registered in the wait queue.
            while semaphore.state.lock(|x| x.borrow().wakers.is_empty()) {
                Delay::new(Duration::from_millis(50)).await;
            }

            let c_task = executor
                .spawn_with_handle(async move { semaphore.acquire(1).await })
                .unwrap();

            core::mem::drop(a);

            // FIFO order: `b` completes before `c`.
            let b = b_task.await.unwrap();
            assert_eq!(b.permits(), 2);

            let c = c_task.await.unwrap();
            assert_eq!(c.permits(), 1);
        }
    }
}

View File

@@ -65,7 +65,7 @@ where
}
}
impl<M, T: Send> Signal<M, T>
impl<M, T> Signal<M, T>
where
M: RawMutex,
{
@@ -125,7 +125,7 @@ where
})
}
/// non-blocking method to check whether this signal has been signaled.
/// non-blocking method to check whether this signal has been signaled. This does not clear the signal.
pub fn signaled(&self) -> bool {
self.state.lock(|cell| {
let state = cell.replace(State::None);

View File

@@ -14,7 +14,7 @@ impl<const N: usize> MultiWakerRegistration<N> {
}
/// Register a waker. If the buffer is full the function returns it in the error
pub fn register<'a>(&mut self, w: &'a Waker) {
pub fn register(&mut self, w: &Waker) {
// If we already have some waker that wakes the same task as `w`, do nothing.
// This avoids cloning wakers, and avoids unnecessary mass-wakes.
for w2 in &self.wakers {

View File

@@ -1,10 +1,7 @@
//! A zero-copy queue for sending values between asynchronous tasks.
//!
//! It can be used concurrently by multiple producers (senders) and multiple
//! consumers (receivers), i.e. it is an "MPMC channel".
//!
//! Receivers are competing for messages. So a message that is received by
//! one receiver is not received by any other.
//! It can be used concurrently by a producer (sender) and a
//! consumer (receiver), i.e. it is an "SPSC channel".
//!
//! This queue takes a Mutex type so that various
//! targets can be attained. For example, a ThreadModeMutex can be used