Merge #1187
1187: executor: Minor refactoring r=Dirbaio a=GrantM11235 The third commit may be slightly more controversial than the first two. Personally, I think it makes the code more readable and easier to reason about, but I can drop it if you disagree. Co-authored-by: Grant Miller <GrantM11235@gmail.com>
This commit is contained in:
		
						commit
						7d8e6649b7
					
				@ -15,10 +15,10 @@ mod waker;
 | 
			
		||||
 | 
			
		||||
use core::cell::Cell;
 | 
			
		||||
use core::future::Future;
 | 
			
		||||
use core::mem;
 | 
			
		||||
use core::pin::Pin;
 | 
			
		||||
use core::ptr::NonNull;
 | 
			
		||||
use core::task::{Context, Poll};
 | 
			
		||||
use core::{mem, ptr};
 | 
			
		||||
 | 
			
		||||
use atomic_polyfill::{AtomicU32, Ordering};
 | 
			
		||||
use critical_section::CriticalSection;
 | 
			
		||||
@ -46,8 +46,8 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
 | 
			
		||||
pub(crate) struct TaskHeader {
 | 
			
		||||
    pub(crate) state: AtomicU32,
 | 
			
		||||
    pub(crate) run_queue_item: RunQueueItem,
 | 
			
		||||
    pub(crate) executor: Cell<*const Executor>,         // Valid if state != 0
 | 
			
		||||
    pub(crate) poll_fn: UninitCell<unsafe fn(TaskRef)>, // Valid if STATE_SPAWNED
 | 
			
		||||
    pub(crate) executor: Cell<Option<&'static Executor>>,
 | 
			
		||||
    poll_fn: Cell<Option<unsafe fn(TaskRef)>>,
 | 
			
		||||
 | 
			
		||||
    #[cfg(feature = "integrated-timers")]
 | 
			
		||||
    pub(crate) expires_at: Cell<Instant>,
 | 
			
		||||
@ -55,22 +55,6 @@ pub(crate) struct TaskHeader {
 | 
			
		||||
    pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl TaskHeader {
 | 
			
		||||
    const fn new() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            state: AtomicU32::new(0),
 | 
			
		||||
            run_queue_item: RunQueueItem::new(),
 | 
			
		||||
            executor: Cell::new(ptr::null()),
 | 
			
		||||
            poll_fn: UninitCell::uninit(),
 | 
			
		||||
 | 
			
		||||
            #[cfg(feature = "integrated-timers")]
 | 
			
		||||
            expires_at: Cell::new(Instant::from_ticks(0)),
 | 
			
		||||
            #[cfg(feature = "integrated-timers")]
 | 
			
		||||
            timer_queue_item: timer_queue::TimerQueueItem::new(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
 | 
			
		||||
#[derive(Clone, Copy)]
 | 
			
		||||
pub struct TaskRef {
 | 
			
		||||
@ -128,7 +112,18 @@ impl<F: Future + 'static> TaskStorage<F> {
 | 
			
		||||
    /// Create a new TaskStorage, in not-spawned state.
 | 
			
		||||
    pub const fn new() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            raw: TaskHeader::new(),
 | 
			
		||||
            raw: TaskHeader {
 | 
			
		||||
                state: AtomicU32::new(0),
 | 
			
		||||
                run_queue_item: RunQueueItem::new(),
 | 
			
		||||
                executor: Cell::new(None),
 | 
			
		||||
                // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
 | 
			
		||||
                poll_fn: Cell::new(None),
 | 
			
		||||
 | 
			
		||||
                #[cfg(feature = "integrated-timers")]
 | 
			
		||||
                expires_at: Cell::new(Instant::from_ticks(0)),
 | 
			
		||||
                #[cfg(feature = "integrated-timers")]
 | 
			
		||||
                timer_queue_item: timer_queue::TimerQueueItem::new(),
 | 
			
		||||
            },
 | 
			
		||||
            future: UninitCell::uninit(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -147,26 +142,14 @@ impl<F: Future + 'static> TaskStorage<F> {
 | 
			
		||||
    /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
 | 
			
		||||
    /// on a different executor.
 | 
			
		||||
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
 | 
			
		||||
        if self.spawn_mark_used() {
 | 
			
		||||
            return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) };
 | 
			
		||||
        let task = AvailableTask::claim(self);
 | 
			
		||||
        match task {
 | 
			
		||||
            Some(task) => {
 | 
			
		||||
                let task = task.initialize(future);
 | 
			
		||||
                unsafe { SpawnToken::<F>::new(task) }
 | 
			
		||||
            }
 | 
			
		||||
            None => SpawnToken::new_failed(),
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        SpawnToken::<F>::new_failed()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn spawn_mark_used(&'static self) -> bool {
 | 
			
		||||
        let state = STATE_SPAWNED | STATE_RUN_QUEUED;
 | 
			
		||||
        self.raw
 | 
			
		||||
            .state
 | 
			
		||||
            .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
 | 
			
		||||
            .is_ok()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> TaskRef {
 | 
			
		||||
        // Initialize the task
 | 
			
		||||
        self.raw.poll_fn.write(Self::poll);
 | 
			
		||||
        self.future.write(future());
 | 
			
		||||
        TaskRef::new(self)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    unsafe fn poll(p: TaskRef) {
 | 
			
		||||
@ -191,6 +174,28 @@ impl<F: Future + 'static> TaskStorage<F> {
 | 
			
		||||
 | 
			
		||||
unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
 | 
			
		||||
 | 
			
		||||
struct AvailableTask<F: Future + 'static> {
 | 
			
		||||
    task: &'static TaskStorage<F>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<F: Future + 'static> AvailableTask<F> {
 | 
			
		||||
    fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
 | 
			
		||||
        task.raw
 | 
			
		||||
            .state
 | 
			
		||||
            .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
 | 
			
		||||
            .ok()
 | 
			
		||||
            .map(|_| Self { task })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn initialize(self, future: impl FnOnce() -> F) -> TaskRef {
 | 
			
		||||
        unsafe {
 | 
			
		||||
            self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
 | 
			
		||||
            self.task.future.write(future());
 | 
			
		||||
        }
 | 
			
		||||
        TaskRef::new(self.task)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Raw storage that can hold up to N tasks of the same type.
 | 
			
		||||
///
 | 
			
		||||
/// This is essentially a `[TaskStorage<F>; N]`.
 | 
			
		||||
@ -214,13 +219,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
 | 
			
		||||
    /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
 | 
			
		||||
    /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
 | 
			
		||||
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
 | 
			
		||||
        for task in &self.pool {
 | 
			
		||||
            if task.spawn_mark_used() {
 | 
			
		||||
                return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) };
 | 
			
		||||
        let task = self.pool.iter().find_map(AvailableTask::claim);
 | 
			
		||||
        match task {
 | 
			
		||||
            Some(task) => {
 | 
			
		||||
                let task = task.initialize(future);
 | 
			
		||||
                unsafe { SpawnToken::<F>::new(task) }
 | 
			
		||||
            }
 | 
			
		||||
            None => SpawnToken::new_failed(),
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        SpawnToken::<F>::new_failed()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
 | 
			
		||||
@ -262,13 +268,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
 | 
			
		||||
        // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
 | 
			
		||||
        // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
 | 
			
		||||
 | 
			
		||||
        for task in &self.pool {
 | 
			
		||||
            if task.spawn_mark_used() {
 | 
			
		||||
                return SpawnToken::<FutFn>::new(task.spawn_initialize(future));
 | 
			
		||||
        let task = self.pool.iter().find_map(AvailableTask::claim);
 | 
			
		||||
        match task {
 | 
			
		||||
            Some(task) => {
 | 
			
		||||
                let task = task.initialize(future);
 | 
			
		||||
                unsafe { SpawnToken::<FutFn>::new(task) }
 | 
			
		||||
            }
 | 
			
		||||
            None => SpawnToken::new_failed(),
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        SpawnToken::<FutFn>::new_failed()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -353,7 +360,7 @@ impl Executor {
 | 
			
		||||
    /// In this case, the task's Future must be Send. This is because this is effectively
 | 
			
		||||
    /// sending the task to the executor thread.
 | 
			
		||||
    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
 | 
			
		||||
        task.header().executor.set(self);
 | 
			
		||||
        task.header().executor.set(Some(self));
 | 
			
		||||
 | 
			
		||||
        #[cfg(feature = "rtos-trace")]
 | 
			
		||||
        trace::task_new(task.as_ptr() as u32);
 | 
			
		||||
@ -405,7 +412,7 @@ impl Executor {
 | 
			
		||||
                trace::task_exec_begin(p.as_ptr() as u32);
 | 
			
		||||
 | 
			
		||||
                // Run the task
 | 
			
		||||
                task.poll_fn.read()(p);
 | 
			
		||||
                task.poll_fn.get().unwrap_unchecked()(p);
 | 
			
		||||
 | 
			
		||||
                #[cfg(feature = "rtos-trace")]
 | 
			
		||||
                trace::task_exec_end();
 | 
			
		||||
@ -462,7 +469,7 @@ pub fn wake_task(task: TaskRef) {
 | 
			
		||||
 | 
			
		||||
        // We have just marked the task as scheduled, so enqueue it.
 | 
			
		||||
        unsafe {
 | 
			
		||||
            let executor = &*header.executor.get();
 | 
			
		||||
            let executor = header.executor.get().unwrap_unchecked();
 | 
			
		||||
            executor.enqueue(cs, task);
 | 
			
		||||
        }
 | 
			
		||||
    })
 | 
			
		||||
 | 
			
		||||
@ -25,9 +25,3 @@ impl<T> UninitCell<T> {
 | 
			
		||||
        ptr::drop_in_place(self.as_mut_ptr())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: Copy> UninitCell<T> {
 | 
			
		||||
    pub unsafe fn read(&self) -> T {
 | 
			
		||||
        ptr::read(self.as_mut_ptr())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -89,10 +89,10 @@ impl Spawner {
 | 
			
		||||
    ///
 | 
			
		||||
    /// Panics if the current executor is not an Embassy executor.
 | 
			
		||||
    pub async fn for_current_executor() -> Self {
 | 
			
		||||
        poll_fn(|cx| unsafe {
 | 
			
		||||
        poll_fn(|cx| {
 | 
			
		||||
            let task = raw::task_from_waker(cx.waker());
 | 
			
		||||
            let executor = task.header().executor.get();
 | 
			
		||||
            Poll::Ready(Self::new(&*executor))
 | 
			
		||||
            let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
 | 
			
		||||
            Poll::Ready(Self::new(executor))
 | 
			
		||||
        })
 | 
			
		||||
        .await
 | 
			
		||||
    }
 | 
			
		||||
@ -165,10 +165,10 @@ impl SendSpawner {
 | 
			
		||||
    ///
 | 
			
		||||
    /// Panics if the current executor is not an Embassy executor.
 | 
			
		||||
    pub async fn for_current_executor() -> Self {
 | 
			
		||||
        poll_fn(|cx| unsafe {
 | 
			
		||||
        poll_fn(|cx| {
 | 
			
		||||
            let task = raw::task_from_waker(cx.waker());
 | 
			
		||||
            let executor = task.header().executor.get();
 | 
			
		||||
            Poll::Ready(Self::new(&*executor))
 | 
			
		||||
            let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
 | 
			
		||||
            Poll::Ready(Self::new(executor))
 | 
			
		||||
        })
 | 
			
		||||
        .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user