diff --git a/embassy-executor/Cargo.toml b/embassy-executor/Cargo.toml index 2a64b9c83..5effaca65 100644 --- a/embassy-executor/Cargo.toml +++ b/embassy-executor/Cargo.toml @@ -55,6 +55,7 @@ avr-device = { version = "0.5.3", optional = true } [dev-dependencies] critical-section = { version = "1.1", features = ["std"] } trybuild = "1.0" +embassy-sync = { path = "../embassy-sync" } [features] diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 0ac569946..6503b556f 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -52,7 +52,7 @@ use super::SpawnToken; /// ```text /// ┌────────────┐ ┌────────────────────────┐ /// ┌─►│Not spawned │◄─6┤Not spawned|Run enqueued│ -/// │ │ │ │ │ +/// │ │ ├7─►│ │ /// │ └─────┬──────┘ └──────▲─────────────────┘ /// │ 1 │ /// │ │ ┌────────────┘ @@ -76,6 +76,7 @@ use super::SpawnToken; /// - 4: Task exits - `TaskStorage::poll -> Poll::Ready` /// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` /// - 6: Task is dequeued and then ignored via `State::run_dequeue` +/// - 7: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index 6f5266bda..bdd317b53 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -44,19 +44,8 @@ impl State { /// function if the task was successfully marked. #[inline(always)] pub fn run_enqueue(&self, f: impl FnOnce(Token)) { - if self - .state - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { - // If already scheduled, or if not started, - if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { - None - } else { - // Mark it as scheduled - Some(state | STATE_RUN_QUEUED) - } - }) - .is_ok() - { + let prev = self.state.fetch_or(STATE_RUN_QUEUED, Ordering::AcqRel); + if prev & STATE_RUN_QUEUED == 0 { locked(f); } } diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index 4896b33c5..06bf24343 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -72,7 +72,7 @@ impl State { let state: u32; asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack)); - if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { + if state & STATE_RUN_QUEUED != 0 { asm!("clrex", options(nomem, nostack)); return; } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 29b10f6e3..4733af278 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -56,12 +56,9 @@ impl State { pub fn run_enqueue(&self, f: impl FnOnce(Token)) { critical_section::with(|cs| { if self.update_with_cs(cs, |s| { - if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { - false - } else { - *s |= STATE_RUN_QUEUED; - true - } + let ok = *s & STATE_RUN_QUEUED == 0; + *s |= STATE_RUN_QUEUED; + ok }) { f(cs); } diff --git a/embassy-executor/tests/test.rs b/embassy-executor/tests/test.rs index 0ce1f1891..78c49c071 100644 --- a/embassy-executor/tests/test.rs +++ b/embassy-executor/tests/test.rs @@ -137,6 +137,139 @@ fn executor_task_self_wake_twice() { ) } +#[test] +fn waking_after_completion_does_not_poll() { + use embassy_sync::waitqueue::AtomicWaker; + + #[task] + async fn task1(trace: Trace, waker: &'static AtomicWaker) { + poll_fn(|cx| { + trace.push("poll task1"); + waker.register(cx.waker()); + Poll::Ready(()) + }) + .await + } + + let waker = Box::leak(Box::new(AtomicWaker::new())); + + let (executor, trace) = setup(); + executor.spawner().spawn(task1(trace.clone(), waker)).unwrap(); + + unsafe { executor.poll() }; + waker.wake(); + unsafe { executor.poll() }; + + // Exited task may be waken but is not polled + waker.wake(); + waker.wake(); + unsafe { executor.poll() }; // Clears running status + + // Can respawn waken-but-dead task + executor.spawner().spawn(task1(trace.clone(), waker)).unwrap(); + + unsafe { executor.poll() }; + + assert_eq!( + trace.get(), + &[ + "pend", // spawning a task pends the executor + "poll task1", // + "pend", // manual wake, gets cleared by poll + "pend", // manual wake, single pend for two wakes + "pend", // respawning a task pends the executor + "poll task1", // + ] + ) +} + +#[test] +fn waking_with_old_waker_after_respawn() { + use embassy_sync::waitqueue::AtomicWaker; + + async fn yield_now(trace: Trace) { + let mut yielded = false; + poll_fn(|cx| { + if yielded { + Poll::Ready(()) + } else { + trace.push("yield_now"); + yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .await + } + + #[task] + async fn task1(trace: Trace, waker: &'static AtomicWaker) { + yield_now(trace.clone()).await; + poll_fn(|cx| { + trace.push("poll task1"); + waker.register(cx.waker()); + Poll::Ready(()) + }) + .await; + } + + let waker = Box::leak(Box::new(AtomicWaker::new())); + + let (executor, trace) = setup(); + executor.spawner().spawn(task1(trace.clone(), waker)).unwrap(); + + unsafe { executor.poll() }; + unsafe { executor.poll() }; // progress to registering the waker + waker.wake(); + unsafe { executor.poll() }; + // Task has exited + + assert_eq!( + trace.get(), + &[ + "pend", // spawning a task pends the executor + "yield_now", // + "pend", // yield_now wakes the task + "poll task1", // + "pend", // task self-wakes + ] + ); + + // Can respawn task on another executor + let (other_executor, other_trace) = setup(); + other_executor + .spawner() + .spawn(task1(other_trace.clone(), waker)) + .unwrap(); + + unsafe { other_executor.poll() }; // just run to the yield_now + waker.wake(); // trigger old waker registration + unsafe { executor.poll() }; + unsafe { other_executor.poll() }; + + // First executor's trace has not changed + assert_eq!( + trace.get(), + &[ + "pend", // spawning a task pends the executor + "yield_now", // + "pend", // yield_now wakes the task + "poll task1", // + "pend", // task self-wakes + ] + ); + + assert_eq!( + other_trace.get(), + &[ + "pend", // spawning a task pends the executor + "yield_now", // + "pend", // manual wake, gets cleared by poll + "poll task1", // + ] + ); +} + #[test] fn executor_task_cfg_args() { // simulate cfg'ing away argument c