Merge pull request #3644 from bugadani/lock_once

Only lock once to wake a task
This commit is contained in:
Dario Nieuwenhuis 2024-12-16 14:46:08 +00:00 committed by GitHub
commit 47e96beff4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 73 additions and 40 deletions

View File

@ -386,11 +386,11 @@ impl SyncExecutor {
/// - `task` must be set up to run in this executor. /// - `task` must be set up to run in this executor.
/// - `task` must NOT be already enqueued (in this executor or another one). /// - `task` must NOT be already enqueued (in this executor or another one).
#[inline(always)] #[inline(always)]
unsafe fn enqueue(&self, task: TaskRef) { unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
#[cfg(feature = "trace")] #[cfg(feature = "trace")]
trace::task_ready_begin(self, &task); trace::task_ready_begin(self, &task);
if self.run_queue.enqueue(task) { if self.run_queue.enqueue(task, l) {
self.pender.pend(); self.pender.pend();
} }
} }
@ -401,7 +401,9 @@ impl SyncExecutor {
#[cfg(feature = "trace")] #[cfg(feature = "trace")]
trace::task_new(self, &task); trace::task_new(self, &task);
self.enqueue(task); state::locked(|l| {
self.enqueue(task, l);
})
} }
/// # Safety /// # Safety
@ -544,13 +546,13 @@ impl Executor {
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task(task: TaskRef) { pub fn wake_task(task: TaskRef) {
let header = task.header(); let header = task.header();
if header.state.run_enqueue() { header.state.run_enqueue(|l| {
// We have just marked the task as scheduled, so enqueue it. // We have just marked the task as scheduled, so enqueue it.
unsafe { unsafe {
let executor = header.executor.get().unwrap_unchecked(); let executor = header.executor.get().unwrap_unchecked();
executor.enqueue(task); executor.enqueue(task, l);
} }
} });
} }
/// Wake a task by `TaskRef` without calling pend. /// Wake a task by `TaskRef` without calling pend.
@ -558,11 +560,11 @@ pub fn wake_task(task: TaskRef) {
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task_no_pend(task: TaskRef) { pub fn wake_task_no_pend(task: TaskRef) {
let header = task.header(); let header = task.header();
if header.state.run_enqueue() { header.state.run_enqueue(|l| {
// We have just marked the task as scheduled, so enqueue it. // We have just marked the task as scheduled, so enqueue it.
unsafe { unsafe {
let executor = header.executor.get().unwrap_unchecked(); let executor = header.executor.get().unwrap_unchecked();
executor.run_queue.enqueue(task); executor.run_queue.enqueue(task, l);
} }
} });
} }

View File

@ -45,7 +45,7 @@ impl RunQueue {
/// ///
/// `item` must NOT be already enqueued in any queue. /// `item` must NOT be already enqueued in any queue.
#[inline(always)] #[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
let mut was_empty = false; let mut was_empty = false;
self.head self.head

View File

@ -44,13 +44,11 @@ impl RunQueue {
/// ///
/// `item` must NOT be already enqueued in any queue. /// `item` must NOT be already enqueued in any queue.
#[inline(always)] #[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
critical_section::with(|cs| { let prev = self.head.borrow(cs).replace(Some(task));
let prev = self.head.borrow(cs).replace(Some(task)); task.header().run_queue_item.next.borrow(cs).set(prev);
task.header().run_queue_item.next.borrow(cs).set(prev);
prev.is_none() prev.is_none()
})
} }
/// Empty the queue, then call `on_task` for each task that was in the queue. /// Empty the queue, then call `on_task` for each task that was in the queue.

View File

@ -2,6 +2,15 @@ use core::sync::atomic::{AtomicU32, Ordering};
use super::timer_queue::TimerEnqueueOperation; use super::timer_queue::TimerEnqueueOperation;
pub(crate) struct Token(());
/// Creates a token and passes it to the closure.
///
/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
pub(crate) fn locked(f: impl FnOnce(Token)) {
f(Token(()));
}
/// Task is spawned (has a future) /// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0; pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue /// Task is in the executor run queue
@ -34,10 +43,12 @@ impl State {
self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
} }
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)] #[inline(always)]
pub fn run_enqueue(&self) -> bool { pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
self.state if self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
// If already scheduled, or if not started, // If already scheduled, or if not started,
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
@ -48,6 +59,9 @@ impl State {
} }
}) })
.is_ok() .is_ok()
{
locked(f);
}
} }
/// Unmark the task as run-queued. Return whether the task is spawned. /// Unmark the task as run-queued. Return whether the task is spawned.

View File

@ -3,6 +3,15 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
use super::timer_queue::TimerEnqueueOperation; use super::timer_queue::TimerEnqueueOperation;
pub(crate) struct Token(());
/// Creates a token and passes it to the closure.
///
/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
pub(crate) fn locked(f: impl FnOnce(Token)) {
f(Token(()));
}
// Must be kept in sync with the layout of `State`! // Must be kept in sync with the layout of `State`!
pub(crate) const STATE_SPAWNED: u32 = 1 << 0; pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
@ -57,9 +66,10 @@ impl State {
self.spawned.store(false, Ordering::Relaxed); self.spawned.store(false, Ordering::Relaxed);
} }
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)] #[inline(always)]
pub fn run_enqueue(&self) -> bool { pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
unsafe { unsafe {
loop { loop {
let state: u32; let state: u32;
@ -67,14 +77,15 @@ impl State {
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
asm!("clrex", options(nomem, nostack)); asm!("clrex", options(nomem, nostack));
return false; return;
} }
let outcome: usize; let outcome: usize;
let new_state = state | STATE_RUN_QUEUED; let new_state = state | STATE_RUN_QUEUED;
asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack)); asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
if outcome == 0 { if outcome == 0 {
return true; locked(f);
return;
} }
} }
} }

View File

@ -1,6 +1,7 @@
use core::cell::Cell; use core::cell::Cell;
use critical_section::Mutex; pub(crate) use critical_section::{with as locked, CriticalSection as Token};
use critical_section::{CriticalSection, Mutex};
use super::timer_queue::TimerEnqueueOperation; use super::timer_queue::TimerEnqueueOperation;
@ -23,13 +24,15 @@ impl State {
} }
fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
critical_section::with(|cs| { critical_section::with(|cs| self.update_with_cs(cs, f))
let s = self.state.borrow(cs); }
let mut val = s.get();
let r = f(&mut val); fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R {
s.set(val); let s = self.state.borrow(cs);
r let mut val = s.get();
}) let r = f(&mut val);
s.set(val);
r
} }
/// If task is idle, mark it as spawned + run_queued and return true. /// If task is idle, mark it as spawned + run_queued and return true.
@ -51,17 +54,22 @@ impl State {
self.update(|s| *s &= !STATE_SPAWNED); self.update(|s| *s &= !STATE_SPAWNED);
} }
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)] #[inline(always)]
pub fn run_enqueue(&self) -> bool { pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
self.update(|s| { critical_section::with(|cs| {
if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { if self.update_with_cs(cs, |s| {
false if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
} else { false
*s |= STATE_RUN_QUEUED; } else {
true *s |= STATE_RUN_QUEUED;
true
}
}) {
f(cs);
} }
}) });
} }
/// Unmark the task as run-queued. Return whether the task is spawned. /// Unmark the task as run-queued. Return whether the task is spawned.