Move integrated timer queue into time-queue-driver
This commit is contained in:
		
							parent
							
								
									dc18ee29a0
								
							
						
					
					
						commit
						d45ea43892
					
				@ -10,7 +10,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 | 
				
			|||||||
- embassy-executor no longer provides an `embassy-time-queue-driver` implementation
 | 
					- embassy-executor no longer provides an `embassy-time-queue-driver` implementation
 | 
				
			||||||
- Added `TaskRef::executor` to obtain a reference to a task's executor
 | 
					- Added `TaskRef::executor` to obtain a reference to a task's executor
 | 
				
			||||||
- integrated-timers are no longer processed when polling the executor.
 | 
					- integrated-timers are no longer processed when polling the executor.
 | 
				
			||||||
- `raw::timer_queue::TimerQueue` is now public.
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
## 0.6.3 - 2024-11-12
 | 
					## 0.6.3 - 2024-11-12
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -1,99 +1,25 @@
 | 
				
			|||||||
//! Timer queue operations.
 | 
					//! Timer queue operations.
 | 
				
			||||||
use core::cmp::min;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
use super::util::SyncUnsafeCell;
 | 
					use core::cell::Cell;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use super::TaskRef;
 | 
					use super::TaskRef;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// An item in the timer queue.
 | 
					/// An item in the timer queue.
 | 
				
			||||||
pub struct TimerQueueItem {
 | 
					pub struct TimerQueueItem {
 | 
				
			||||||
    next: SyncUnsafeCell<Option<TaskRef>>,
 | 
					    /// The next item in the queue.
 | 
				
			||||||
    expires_at: SyncUnsafeCell<u64>,
 | 
					    pub next: Cell<Option<TaskRef>>,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// The time at which this item expires.
 | 
				
			||||||
 | 
					    pub expires_at: Cell<u64>,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					unsafe impl Sync for TimerQueueItem {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl TimerQueueItem {
 | 
					impl TimerQueueItem {
 | 
				
			||||||
    pub(crate) const fn new() -> Self {
 | 
					    pub(crate) const fn new() -> Self {
 | 
				
			||||||
        Self {
 | 
					        Self {
 | 
				
			||||||
            next: SyncUnsafeCell::new(None),
 | 
					            next: Cell::new(None),
 | 
				
			||||||
            expires_at: SyncUnsafeCell::new(0),
 | 
					            expires_at: Cell::new(0),
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
/// A timer queue, with items integrated into tasks.
 | 
					 | 
				
			||||||
pub struct TimerQueue {
 | 
					 | 
				
			||||||
    head: SyncUnsafeCell<Option<TaskRef>>,
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
impl TimerQueue {
 | 
					 | 
				
			||||||
    /// Creates a new timer queue.
 | 
					 | 
				
			||||||
    pub const fn new() -> Self {
 | 
					 | 
				
			||||||
        Self {
 | 
					 | 
				
			||||||
            head: SyncUnsafeCell::new(None),
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    /// Schedules a task to run at a specific time.
 | 
					 | 
				
			||||||
    ///
 | 
					 | 
				
			||||||
    /// If this function returns `true`, the called should find the next expiration time and set
 | 
					 | 
				
			||||||
    /// a new alarm for that time.
 | 
					 | 
				
			||||||
    pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool {
 | 
					 | 
				
			||||||
        unsafe {
 | 
					 | 
				
			||||||
            let item = p.timer_queue_item();
 | 
					 | 
				
			||||||
            if item.next.get().is_none() {
 | 
					 | 
				
			||||||
                // If not in the queue, add it and update.
 | 
					 | 
				
			||||||
                let prev = self.head.replace(Some(p));
 | 
					 | 
				
			||||||
                item.next.set(prev);
 | 
					 | 
				
			||||||
            } else if at <= item.expires_at.get() {
 | 
					 | 
				
			||||||
                // If expiration is sooner than previously set, update.
 | 
					 | 
				
			||||||
            } else {
 | 
					 | 
				
			||||||
                // Task does not need to be updated.
 | 
					 | 
				
			||||||
                return false;
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            item.expires_at.set(at);
 | 
					 | 
				
			||||||
            true
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    /// Dequeues expired timers and returns the next alarm time.
 | 
					 | 
				
			||||||
    ///
 | 
					 | 
				
			||||||
    /// The provided callback will be called for each expired task. Tasks that never expire
 | 
					 | 
				
			||||||
    /// will be removed, but the callback will not be called.
 | 
					 | 
				
			||||||
    pub fn next_expiration(&mut self, now: u64) -> u64 {
 | 
					 | 
				
			||||||
        let mut next_expiration = u64::MAX;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        self.retain(|p| {
 | 
					 | 
				
			||||||
            let item = p.timer_queue_item();
 | 
					 | 
				
			||||||
            let expires = unsafe { item.expires_at.get() };
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if expires <= now {
 | 
					 | 
				
			||||||
                // Timer expired, process task.
 | 
					 | 
				
			||||||
                super::wake_task(p);
 | 
					 | 
				
			||||||
                false
 | 
					 | 
				
			||||||
            } else {
 | 
					 | 
				
			||||||
                // Timer didn't yet expire, or never expires.
 | 
					 | 
				
			||||||
                next_expiration = min(next_expiration, expires);
 | 
					 | 
				
			||||||
                expires != u64::MAX
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        });
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        next_expiration
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
 | 
					 | 
				
			||||||
        unsafe {
 | 
					 | 
				
			||||||
            let mut prev = &self.head;
 | 
					 | 
				
			||||||
            while let Some(p) = prev.get() {
 | 
					 | 
				
			||||||
                let item = p.timer_queue_item();
 | 
					 | 
				
			||||||
                if f(p) {
 | 
					 | 
				
			||||||
                    // Skip to next
 | 
					 | 
				
			||||||
                    prev = &item.next;
 | 
					 | 
				
			||||||
                } else {
 | 
					 | 
				
			||||||
                    // Remove it
 | 
					 | 
				
			||||||
                    prev.set(item.next.get());
 | 
					 | 
				
			||||||
                    item.next.set(None);
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -54,9 +54,4 @@ impl<T> SyncUnsafeCell<T> {
 | 
				
			|||||||
    {
 | 
					    {
 | 
				
			||||||
        *self.value.get()
 | 
					        *self.value.get()
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					 | 
				
			||||||
    #[cfg(feature = "integrated-timers")]
 | 
					 | 
				
			||||||
    pub unsafe fn replace(&self, value: T) -> T {
 | 
					 | 
				
			||||||
        core::mem::replace(&mut *self.value.get(), value)
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -22,9 +22,9 @@
 | 
				
			|||||||
//! );
 | 
					//! );
 | 
				
			||||||
//! ```
 | 
					//! ```
 | 
				
			||||||
//!
 | 
					//!
 | 
				
			||||||
//! You can also use the `queue_generic` or the `embassy_executor::raw::timer_queue` modules to
 | 
					//! You can also use the `queue_generic` or the `queue_integrated` modules to implement your own
 | 
				
			||||||
//! implement your own timer queue. These modules contain queue implementations which you can wrap
 | 
					//! timer queue. These modules contain queue implementations which you can wrap and tailor to
 | 
				
			||||||
//! and tailor to your needs.
 | 
					//! your needs.
 | 
				
			||||||
//!
 | 
					//!
 | 
				
			||||||
//! If you are providing an embassy-executor implementation besides a timer queue, you can choose to
 | 
					//! If you are providing an embassy-executor implementation besides a timer queue, you can choose to
 | 
				
			||||||
//! expose the `integrated-timers` feature in your implementation. This feature stores timer items
 | 
					//! expose the `integrated-timers` feature in your implementation. This feature stores timer items
 | 
				
			||||||
@ -49,7 +49,10 @@
 | 
				
			|||||||
//! embassy_time_queue_driver::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{});
 | 
					//! embassy_time_queue_driver::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{});
 | 
				
			||||||
//! ```
 | 
					//! ```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg(not(feature = "integrated-timers"))]
 | 
				
			||||||
pub mod queue_generic;
 | 
					pub mod queue_generic;
 | 
				
			||||||
 | 
					#[cfg(feature = "integrated-timers")]
 | 
				
			||||||
 | 
					pub mod queue_integrated;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use core::cell::RefCell;
 | 
					use core::cell::RefCell;
 | 
				
			||||||
use core::task::Waker;
 | 
					use core::task::Waker;
 | 
				
			||||||
@ -89,7 +92,7 @@ macro_rules! timer_queue_impl {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[cfg(feature = "integrated-timers")]
 | 
					#[cfg(feature = "integrated-timers")]
 | 
				
			||||||
type InnerQueue = embassy_executor::raw::timer_queue::TimerQueue;
 | 
					type InnerQueue = queue_integrated::TimerQueue;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[cfg(not(feature = "integrated-timers"))]
 | 
					#[cfg(not(feature = "integrated-timers"))]
 | 
				
			||||||
type InnerQueue = queue_generic::Queue;
 | 
					type InnerQueue = queue_generic::Queue;
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										78
									
								
								embassy-time-queue-driver/src/queue_integrated.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										78
									
								
								embassy-time-queue-driver/src/queue_integrated.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,78 @@
 | 
				
			|||||||
 | 
					//! Timer queue operations.
 | 
				
			||||||
 | 
					use core::cell::Cell;
 | 
				
			||||||
 | 
					use core::cmp::min;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use embassy_executor::raw::TaskRef;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/// A timer queue, with items integrated into tasks.
 | 
				
			||||||
 | 
					pub struct TimerQueue {
 | 
				
			||||||
 | 
					    head: Cell<Option<TaskRef>>,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl TimerQueue {
 | 
				
			||||||
 | 
					    /// Creates a new timer queue.
 | 
				
			||||||
 | 
					    pub const fn new() -> Self {
 | 
				
			||||||
 | 
					        Self { head: Cell::new(None) }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// Schedules a task to run at a specific time.
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// If this function returns `true`, the called should find the next expiration time and set
 | 
				
			||||||
 | 
					    /// a new alarm for that time.
 | 
				
			||||||
 | 
					    pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool {
 | 
				
			||||||
 | 
					        let item = p.timer_queue_item();
 | 
				
			||||||
 | 
					        if item.next.get().is_none() {
 | 
				
			||||||
 | 
					            // If not in the queue, add it and update.
 | 
				
			||||||
 | 
					            let prev = self.head.replace(Some(p));
 | 
				
			||||||
 | 
					            item.next.set(prev);
 | 
				
			||||||
 | 
					        } else if at <= item.expires_at.get() {
 | 
				
			||||||
 | 
					            // If expiration is sooner than previously set, update.
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            // Task does not need to be updated.
 | 
				
			||||||
 | 
					            return false;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        item.expires_at.set(at);
 | 
				
			||||||
 | 
					        true
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// Dequeues expired timers and returns the next alarm time.
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// The provided callback will be called for each expired task. Tasks that never expire
 | 
				
			||||||
 | 
					    /// will be removed, but the callback will not be called.
 | 
				
			||||||
 | 
					    pub fn next_expiration(&mut self, now: u64) -> u64 {
 | 
				
			||||||
 | 
					        let mut next_expiration = u64::MAX;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.retain(|p| {
 | 
				
			||||||
 | 
					            let item = p.timer_queue_item();
 | 
				
			||||||
 | 
					            let expires = item.expires_at.get();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if expires <= now {
 | 
				
			||||||
 | 
					                // Timer expired, process task.
 | 
				
			||||||
 | 
					                embassy_executor::raw::wake_task(p);
 | 
				
			||||||
 | 
					                false
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                // Timer didn't yet expire, or never expires.
 | 
				
			||||||
 | 
					                next_expiration = min(next_expiration, expires);
 | 
				
			||||||
 | 
					                expires != u64::MAX
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        next_expiration
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
 | 
				
			||||||
 | 
					        let mut prev = &self.head;
 | 
				
			||||||
 | 
					        while let Some(p) = prev.get() {
 | 
				
			||||||
 | 
					            let item = p.timer_queue_item();
 | 
				
			||||||
 | 
					            if f(p) {
 | 
				
			||||||
 | 
					                // Skip to next
 | 
				
			||||||
 | 
					                prev = &item.next;
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                // Remove it
 | 
				
			||||||
 | 
					                prev.set(item.next.get());
 | 
				
			||||||
 | 
					                item.next.set(None);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user