Merge pull request #3573 from bugadani/enqueue
Unconditionally set `RUN_QUEUED`
This commit is contained in:
		
						commit
						c1120c7138
					
				@ -55,6 +55,7 @@ avr-device = { version = "0.5.3", optional = true }
 | 
				
			|||||||
[dev-dependencies]
 | 
					[dev-dependencies]
 | 
				
			||||||
critical-section = { version = "1.1", features = ["std"] }
 | 
					critical-section = { version = "1.1", features = ["std"] }
 | 
				
			||||||
trybuild = "1.0"
 | 
					trybuild = "1.0"
 | 
				
			||||||
 | 
					embassy-sync = { path = "../embassy-sync" }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[features]
 | 
					[features]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -52,7 +52,7 @@ use super::SpawnToken;
 | 
				
			|||||||
/// ```text
 | 
					/// ```text
 | 
				
			||||||
///    ┌────────────┐   ┌────────────────────────┐
 | 
					///    ┌────────────┐   ┌────────────────────────┐
 | 
				
			||||||
/// ┌─►│Not spawned │◄─6┤Not spawned|Run enqueued│
 | 
					/// ┌─►│Not spawned │◄─6┤Not spawned|Run enqueued│
 | 
				
			||||||
/// │  │            │   │                        │
 | 
					/// │  │            ├7─►│                        │
 | 
				
			||||||
/// │  └─────┬──────┘   └──────▲─────────────────┘
 | 
					/// │  └─────┬──────┘   └──────▲─────────────────┘
 | 
				
			||||||
/// │        1                 │
 | 
					/// │        1                 │
 | 
				
			||||||
/// │        │    ┌────────────┘
 | 
					/// │        │    ┌────────────┘
 | 
				
			||||||
@ -76,6 +76,7 @@ use super::SpawnToken;
 | 
				
			|||||||
/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready`
 | 
					/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready`
 | 
				
			||||||
/// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
 | 
					/// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
 | 
				
			||||||
/// - 6: Task is dequeued and then ignored via `State::run_dequeue`
 | 
					/// - 6: Task is dequeued and then ignored via `State::run_dequeue`
 | 
				
			||||||
 | 
					/// - 7: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
 | 
				
			||||||
pub(crate) struct TaskHeader {
 | 
					pub(crate) struct TaskHeader {
 | 
				
			||||||
    pub(crate) state: State,
 | 
					    pub(crate) state: State,
 | 
				
			||||||
    pub(crate) run_queue_item: RunQueueItem,
 | 
					    pub(crate) run_queue_item: RunQueueItem,
 | 
				
			||||||
 | 
				
			|||||||
@ -44,19 +44,8 @@ impl State {
 | 
				
			|||||||
    /// function if the task was successfully marked.
 | 
					    /// function if the task was successfully marked.
 | 
				
			||||||
    #[inline(always)]
 | 
					    #[inline(always)]
 | 
				
			||||||
    pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
 | 
					    pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
 | 
				
			||||||
        if self
 | 
					        let prev = self.state.fetch_or(STATE_RUN_QUEUED, Ordering::AcqRel);
 | 
				
			||||||
            .state
 | 
					        if prev & STATE_RUN_QUEUED == 0 {
 | 
				
			||||||
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
 | 
					 | 
				
			||||||
                // If already scheduled, or if not started,
 | 
					 | 
				
			||||||
                if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
 | 
					 | 
				
			||||||
                    None
 | 
					 | 
				
			||||||
                } else {
 | 
					 | 
				
			||||||
                    // Mark it as scheduled
 | 
					 | 
				
			||||||
                    Some(state | STATE_RUN_QUEUED)
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            })
 | 
					 | 
				
			||||||
            .is_ok()
 | 
					 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            locked(f);
 | 
					            locked(f);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -1,4 +1,3 @@
 | 
				
			|||||||
use core::arch::asm;
 | 
					 | 
				
			||||||
use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
 | 
					use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Clone, Copy)]
 | 
					#[derive(Clone, Copy)]
 | 
				
			||||||
@ -67,24 +66,10 @@ impl State {
 | 
				
			|||||||
    /// function if the task was successfully marked.
 | 
					    /// function if the task was successfully marked.
 | 
				
			||||||
    #[inline(always)]
 | 
					    #[inline(always)]
 | 
				
			||||||
    pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
 | 
					    pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
 | 
				
			||||||
        unsafe {
 | 
					        let old = self.run_queued.swap(true, Ordering::AcqRel);
 | 
				
			||||||
            loop {
 | 
					 | 
				
			||||||
                let state: u32;
 | 
					 | 
				
			||||||
                asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack));
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
 | 
					        if !old {
 | 
				
			||||||
                    asm!("clrex", options(nomem, nostack));
 | 
					 | 
				
			||||||
                    return;
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                let outcome: usize;
 | 
					 | 
				
			||||||
                let new_state = state | STATE_RUN_QUEUED;
 | 
					 | 
				
			||||||
                asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
 | 
					 | 
				
			||||||
                if outcome == 0 {
 | 
					 | 
				
			||||||
            locked(f);
 | 
					            locked(f);
 | 
				
			||||||
                    return;
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -56,12 +56,9 @@ impl State {
 | 
				
			|||||||
    pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
 | 
					    pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
 | 
				
			||||||
        critical_section::with(|cs| {
 | 
					        critical_section::with(|cs| {
 | 
				
			||||||
            if self.update_with_cs(cs, |s| {
 | 
					            if self.update_with_cs(cs, |s| {
 | 
				
			||||||
                if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
 | 
					                let ok = *s & STATE_RUN_QUEUED == 0;
 | 
				
			||||||
                    false
 | 
					 | 
				
			||||||
                } else {
 | 
					 | 
				
			||||||
                *s |= STATE_RUN_QUEUED;
 | 
					                *s |= STATE_RUN_QUEUED;
 | 
				
			||||||
                    true
 | 
					                ok
 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            }) {
 | 
					            }) {
 | 
				
			||||||
                f(cs);
 | 
					                f(cs);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
				
			|||||||
@ -137,6 +137,139 @@ fn executor_task_self_wake_twice() {
 | 
				
			|||||||
    )
 | 
					    )
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[test]
 | 
				
			||||||
 | 
					fn waking_after_completion_does_not_poll() {
 | 
				
			||||||
 | 
					    use embassy_sync::waitqueue::AtomicWaker;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[task]
 | 
				
			||||||
 | 
					    async fn task1(trace: Trace, waker: &'static AtomicWaker) {
 | 
				
			||||||
 | 
					        poll_fn(|cx| {
 | 
				
			||||||
 | 
					            trace.push("poll task1");
 | 
				
			||||||
 | 
					            waker.register(cx.waker());
 | 
				
			||||||
 | 
					            Poll::Ready(())
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
 | 
					        .await
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    let waker = Box::leak(Box::new(AtomicWaker::new()));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    let (executor, trace) = setup();
 | 
				
			||||||
 | 
					    executor.spawner().spawn(task1(trace.clone(), waker)).unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    unsafe { executor.poll() };
 | 
				
			||||||
 | 
					    waker.wake();
 | 
				
			||||||
 | 
					    unsafe { executor.poll() };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Exited task may be waken but is not polled
 | 
				
			||||||
 | 
					    waker.wake();
 | 
				
			||||||
 | 
					    waker.wake();
 | 
				
			||||||
 | 
					    unsafe { executor.poll() }; // Clears running status
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Can respawn waken-but-dead task
 | 
				
			||||||
 | 
					    executor.spawner().spawn(task1(trace.clone(), waker)).unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    unsafe { executor.poll() };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    assert_eq!(
 | 
				
			||||||
 | 
					        trace.get(),
 | 
				
			||||||
 | 
					        &[
 | 
				
			||||||
 | 
					            "pend",       // spawning a task pends the executor
 | 
				
			||||||
 | 
					            "poll task1", //
 | 
				
			||||||
 | 
					            "pend",       // manual wake, gets cleared by poll
 | 
				
			||||||
 | 
					            "pend",       // manual wake, single pend for two wakes
 | 
				
			||||||
 | 
					            "pend",       // respawning a task pends the executor
 | 
				
			||||||
 | 
					            "poll task1", //
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[test]
 | 
				
			||||||
 | 
					fn waking_with_old_waker_after_respawn() {
 | 
				
			||||||
 | 
					    use embassy_sync::waitqueue::AtomicWaker;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async fn yield_now(trace: Trace) {
 | 
				
			||||||
 | 
					        let mut yielded = false;
 | 
				
			||||||
 | 
					        poll_fn(|cx| {
 | 
				
			||||||
 | 
					            if yielded {
 | 
				
			||||||
 | 
					                Poll::Ready(())
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                trace.push("yield_now");
 | 
				
			||||||
 | 
					                yielded = true;
 | 
				
			||||||
 | 
					                cx.waker().wake_by_ref();
 | 
				
			||||||
 | 
					                Poll::Pending
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
 | 
					        .await
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[task]
 | 
				
			||||||
 | 
					    async fn task1(trace: Trace, waker: &'static AtomicWaker) {
 | 
				
			||||||
 | 
					        yield_now(trace.clone()).await;
 | 
				
			||||||
 | 
					        poll_fn(|cx| {
 | 
				
			||||||
 | 
					            trace.push("poll task1");
 | 
				
			||||||
 | 
					            waker.register(cx.waker());
 | 
				
			||||||
 | 
					            Poll::Ready(())
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
 | 
					        .await;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    let waker = Box::leak(Box::new(AtomicWaker::new()));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    let (executor, trace) = setup();
 | 
				
			||||||
 | 
					    executor.spawner().spawn(task1(trace.clone(), waker)).unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    unsafe { executor.poll() };
 | 
				
			||||||
 | 
					    unsafe { executor.poll() }; // progress to registering the waker
 | 
				
			||||||
 | 
					    waker.wake();
 | 
				
			||||||
 | 
					    unsafe { executor.poll() };
 | 
				
			||||||
 | 
					    // Task has exited
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    assert_eq!(
 | 
				
			||||||
 | 
					        trace.get(),
 | 
				
			||||||
 | 
					        &[
 | 
				
			||||||
 | 
					            "pend",       // spawning a task pends the executor
 | 
				
			||||||
 | 
					            "yield_now",  //
 | 
				
			||||||
 | 
					            "pend",       // yield_now wakes the task
 | 
				
			||||||
 | 
					            "poll task1", //
 | 
				
			||||||
 | 
					            "pend",       // task self-wakes
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					    );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Can respawn task on another executor
 | 
				
			||||||
 | 
					    let (other_executor, other_trace) = setup();
 | 
				
			||||||
 | 
					    other_executor
 | 
				
			||||||
 | 
					        .spawner()
 | 
				
			||||||
 | 
					        .spawn(task1(other_trace.clone(), waker))
 | 
				
			||||||
 | 
					        .unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    unsafe { other_executor.poll() }; // just run to the yield_now
 | 
				
			||||||
 | 
					    waker.wake(); // trigger old waker registration
 | 
				
			||||||
 | 
					    unsafe { executor.poll() };
 | 
				
			||||||
 | 
					    unsafe { other_executor.poll() };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // First executor's trace has not changed
 | 
				
			||||||
 | 
					    assert_eq!(
 | 
				
			||||||
 | 
					        trace.get(),
 | 
				
			||||||
 | 
					        &[
 | 
				
			||||||
 | 
					            "pend",       // spawning a task pends the executor
 | 
				
			||||||
 | 
					            "yield_now",  //
 | 
				
			||||||
 | 
					            "pend",       // yield_now wakes the task
 | 
				
			||||||
 | 
					            "poll task1", //
 | 
				
			||||||
 | 
					            "pend",       // task self-wakes
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					    );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    assert_eq!(
 | 
				
			||||||
 | 
					        other_trace.get(),
 | 
				
			||||||
 | 
					        &[
 | 
				
			||||||
 | 
					            "pend",       // spawning a task pends the executor
 | 
				
			||||||
 | 
					            "yield_now",  //
 | 
				
			||||||
 | 
					            "pend",       // manual wake, gets cleared by poll
 | 
				
			||||||
 | 
					            "poll task1", //
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					    );
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[test]
 | 
					#[test]
 | 
				
			||||||
fn executor_task_cfg_args() {
 | 
					fn executor_task_cfg_args() {
 | 
				
			||||||
    // simulate cfg'ing away argument c
 | 
					    // simulate cfg'ing away argument c
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user