Refactor integrated-timers
This commit is contained in:
@@ -42,29 +42,6 @@ defmt-timestamp-uptime-tus = ["defmt"]
|
||||
## Create a `MockDriver` that can be manually advanced for testing purposes.
|
||||
mock-driver = ["tick-hz-1_000_000"]
|
||||
|
||||
#! ### Generic Queue
|
||||
|
||||
## Create a global, generic queue that can be used with any executor.
|
||||
## To use this you must have a time driver provided.
|
||||
generic-queue = []
|
||||
|
||||
#! The following features set how many timers are used for the generic queue. At most one
|
||||
#! `generic-queue-*` feature can be enabled. If none is enabled, a default of 64 timers is used.
|
||||
#!
|
||||
#! When using embassy-time from libraries, you should *not* enable any `generic-queue-*` feature, to allow the
|
||||
#! end user to pick.
|
||||
|
||||
## Generic Queue with 8 timers
|
||||
generic-queue-8 = ["generic-queue"]
|
||||
## Generic Queue with 16 timers
|
||||
generic-queue-16 = ["generic-queue"]
|
||||
## Generic Queue with 32 timers
|
||||
generic-queue-32 = ["generic-queue"]
|
||||
## Generic Queue with 64 timers
|
||||
generic-queue-64 = ["generic-queue"]
|
||||
## Generic Queue with 128 timers
|
||||
generic-queue-128 = ["generic-queue"]
|
||||
|
||||
#! ### Tick Rate
|
||||
#!
|
||||
#! At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used.
|
||||
@@ -419,7 +396,6 @@ embedded-hal-async = { version = "1.0" }
|
||||
futures-util = { version = "0.3.17", default-features = false }
|
||||
critical-section = "1.1"
|
||||
cfg-if = "1.0.0"
|
||||
heapless = "0.8"
|
||||
|
||||
document-features = "0.2.7"
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use core::cell::RefCell;
|
||||
|
||||
use critical_section::Mutex as CsMutex;
|
||||
use embassy_time_driver::{AlarmHandle, Driver};
|
||||
use embassy_time_driver::Driver;
|
||||
|
||||
use crate::{Duration, Instant};
|
||||
|
||||
@@ -60,15 +60,13 @@ impl MockDriver {
|
||||
|
||||
let now = inner.now.as_ticks();
|
||||
|
||||
inner
|
||||
.alarm
|
||||
.as_mut()
|
||||
.filter(|alarm| alarm.timestamp <= now)
|
||||
.map(|alarm| {
|
||||
alarm.timestamp = u64::MAX;
|
||||
if inner.alarm.timestamp <= now {
|
||||
inner.alarm.timestamp = u64::MAX;
|
||||
|
||||
(alarm.callback, alarm.ctx)
|
||||
})
|
||||
Some((inner.alarm.callback, inner.alarm.ctx))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
@@ -76,68 +74,48 @@ impl MockDriver {
|
||||
(callback)(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Driver for MockDriver {
|
||||
fn now(&self) -> u64 {
|
||||
critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks()
|
||||
}
|
||||
|
||||
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
|
||||
/// Configures a callback to be called when the alarm fires.
|
||||
pub fn set_alarm_callback(&self, callback: fn(*mut ()), ctx: *mut ()) {
|
||||
critical_section::with(|cs| {
|
||||
let mut inner = self.0.borrow_ref_mut(cs);
|
||||
|
||||
if inner.alarm.is_some() {
|
||||
None
|
||||
} else {
|
||||
inner.alarm.replace(AlarmState::new());
|
||||
|
||||
Some(AlarmHandle::new(0))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
|
||||
critical_section::with(|cs| {
|
||||
let mut inner = self.0.borrow_ref_mut(cs);
|
||||
|
||||
let Some(alarm) = inner.alarm.as_mut() else {
|
||||
panic!("Alarm not allocated");
|
||||
};
|
||||
|
||||
alarm.callback = callback;
|
||||
alarm.ctx = ctx;
|
||||
inner.alarm.callback = callback;
|
||||
inner.alarm.ctx = ctx;
|
||||
});
|
||||
}
|
||||
|
||||
fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
/// Sets the alarm to fire at the specified timestamp.
|
||||
pub fn set_alarm(&self, timestamp: u64) -> bool {
|
||||
critical_section::with(|cs| {
|
||||
let mut inner = self.0.borrow_ref_mut(cs);
|
||||
|
||||
if timestamp <= inner.now.as_ticks() {
|
||||
false
|
||||
} else {
|
||||
let Some(alarm) = inner.alarm.as_mut() else {
|
||||
panic!("Alarm not allocated");
|
||||
};
|
||||
|
||||
alarm.timestamp = timestamp;
|
||||
inner.alarm.timestamp = timestamp;
|
||||
true
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Driver for MockDriver {
|
||||
fn now(&self) -> u64 {
|
||||
critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks()
|
||||
}
|
||||
}
|
||||
|
||||
struct InnerMockDriver {
|
||||
now: Instant,
|
||||
alarm: Option<AlarmState>,
|
||||
alarm: AlarmState,
|
||||
}
|
||||
|
||||
impl InnerMockDriver {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
now: Instant::from_ticks(0),
|
||||
alarm: None,
|
||||
alarm: AlarmState::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -189,8 +167,7 @@ mod tests {
|
||||
setup();
|
||||
|
||||
let driver = MockDriver::get();
|
||||
let alarm = unsafe { AlarmHandle::new(0) };
|
||||
assert_eq!(false, driver.set_alarm(alarm, driver.now()));
|
||||
assert_eq!(false, driver.set_alarm(driver.now()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -199,23 +176,11 @@ mod tests {
|
||||
setup();
|
||||
|
||||
let driver = MockDriver::get();
|
||||
let alarm = unsafe { driver.allocate_alarm() }.expect("No alarms available");
|
||||
static mut CALLBACK_CALLED: bool = false;
|
||||
let ctx = &mut () as *mut ();
|
||||
driver.set_alarm_callback(alarm, |_| unsafe { CALLBACK_CALLED = true }, ctx);
|
||||
driver.set_alarm(alarm, driver.now() + 1);
|
||||
driver.set_alarm_callback(|_| unsafe { CALLBACK_CALLED = true }, core::ptr::null_mut());
|
||||
driver.set_alarm(driver.now() + 1);
|
||||
assert_eq!(false, unsafe { CALLBACK_CALLED });
|
||||
driver.advance(Duration::from_secs(1));
|
||||
assert_eq!(true, unsafe { CALLBACK_CALLED });
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_allocate_alarm() {
|
||||
setup();
|
||||
|
||||
let driver = MockDriver::get();
|
||||
assert!(unsafe { driver.allocate_alarm() }.is_some());
|
||||
assert!(unsafe { driver.allocate_alarm() }.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,53 +1,38 @@
|
||||
use core::sync::atomic::{AtomicU8, Ordering};
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::sync::{Condvar, Mutex, Once};
|
||||
use std::time::{Duration as StdDuration, Instant as StdInstant};
|
||||
use std::{mem, ptr, thread};
|
||||
use std::{ptr, thread};
|
||||
|
||||
use critical_section::Mutex as CsMutex;
|
||||
use embassy_time_driver::{AlarmHandle, Driver};
|
||||
|
||||
const ALARM_COUNT: usize = 4;
|
||||
use embassy_time_driver::Driver;
|
||||
use embassy_time_queue_driver::GlobalTimerQueue;
|
||||
|
||||
struct AlarmState {
|
||||
timestamp: u64,
|
||||
|
||||
// This is really a Option<(fn(*mut ()), *mut ())>
|
||||
// but fn pointers aren't allowed in const yet
|
||||
callback: *const (),
|
||||
ctx: *mut (),
|
||||
}
|
||||
|
||||
unsafe impl Send for AlarmState {}
|
||||
|
||||
impl AlarmState {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
timestamp: u64::MAX,
|
||||
callback: ptr::null(),
|
||||
ctx: ptr::null_mut(),
|
||||
}
|
||||
Self { timestamp: u64::MAX }
|
||||
}
|
||||
}
|
||||
|
||||
struct TimeDriver {
|
||||
alarm_count: AtomicU8,
|
||||
|
||||
once: Once,
|
||||
// The STD Driver implementation requires the alarms' mutex to be reentrant, which the STD Mutex isn't
|
||||
// The STD Driver implementation requires the alarm's mutex to be reentrant, which the STD Mutex isn't
|
||||
// Fortunately, mutexes based on the `critical-section` crate are reentrant, because the critical sections
|
||||
// themselves are reentrant
|
||||
alarms: UninitCell<CsMutex<RefCell<[AlarmState; ALARM_COUNT]>>>,
|
||||
alarm: UninitCell<CsMutex<RefCell<AlarmState>>>,
|
||||
zero_instant: UninitCell<StdInstant>,
|
||||
signaler: UninitCell<Signaler>,
|
||||
}
|
||||
|
||||
embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
|
||||
alarm_count: AtomicU8::new(0),
|
||||
|
||||
once: Once::new(),
|
||||
alarms: UninitCell::uninit(),
|
||||
alarm: UninitCell::uninit(),
|
||||
zero_instant: UninitCell::uninit(),
|
||||
signaler: UninitCell::uninit(),
|
||||
});
|
||||
@@ -55,8 +40,8 @@ embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
|
||||
impl TimeDriver {
|
||||
fn init(&self) {
|
||||
self.once.call_once(|| unsafe {
|
||||
self.alarms
|
||||
.write(CsMutex::new(RefCell::new([const { AlarmState::new() }; ALARM_COUNT])));
|
||||
self.alarm
|
||||
.write(CsMutex::new(RefCell::new(const { AlarmState::new() })));
|
||||
self.zero_instant.write(StdInstant::now());
|
||||
self.signaler.write(Signaler::new());
|
||||
|
||||
@@ -70,36 +55,13 @@ impl TimeDriver {
|
||||
let now = DRIVER.now();
|
||||
|
||||
let next_alarm = critical_section::with(|cs| {
|
||||
let alarms = unsafe { DRIVER.alarms.as_ref() }.borrow(cs);
|
||||
loop {
|
||||
let pending = alarms
|
||||
.borrow_mut()
|
||||
.iter_mut()
|
||||
.find(|alarm| alarm.timestamp <= now)
|
||||
.map(|alarm| {
|
||||
alarm.timestamp = u64::MAX;
|
||||
let mut alarm = unsafe { DRIVER.alarm.as_ref() }.borrow_ref_mut(cs);
|
||||
if alarm.timestamp <= now {
|
||||
alarm.timestamp = u64::MAX;
|
||||
|
||||
(alarm.callback, alarm.ctx)
|
||||
});
|
||||
|
||||
if let Some((callback, ctx)) = pending {
|
||||
// safety:
|
||||
// - we can ignore the possiblity of `f` being unset (null) because of the safety contract of `allocate_alarm`.
|
||||
// - other than that we only store valid function pointers into alarm.callback
|
||||
let f: fn(*mut ()) = unsafe { mem::transmute(callback) };
|
||||
f(ctx);
|
||||
} else {
|
||||
// No alarm due
|
||||
break;
|
||||
}
|
||||
TIMER_QUEUE_DRIVER.dispatch();
|
||||
}
|
||||
|
||||
alarms
|
||||
.borrow()
|
||||
.iter()
|
||||
.map(|alarm| alarm.timestamp)
|
||||
.min()
|
||||
.unwrap_or(u64::MAX)
|
||||
alarm.timestamp
|
||||
});
|
||||
|
||||
// Ensure we don't overflow
|
||||
@@ -110,6 +72,17 @@ impl TimeDriver {
|
||||
unsafe { DRIVER.signaler.as_ref() }.wait_until(until);
|
||||
}
|
||||
}
|
||||
|
||||
fn set_alarm(&self, timestamp: u64) -> bool {
|
||||
self.init();
|
||||
critical_section::with(|cs| {
|
||||
let mut alarm = unsafe { self.alarm.as_ref() }.borrow_ref_mut(cs);
|
||||
alarm.timestamp = timestamp;
|
||||
unsafe { self.signaler.as_ref() }.signal();
|
||||
});
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl Driver for TimeDriver {
|
||||
@@ -119,43 +92,6 @@ impl Driver for TimeDriver {
|
||||
let zero = unsafe { self.zero_instant.read() };
|
||||
StdInstant::now().duration_since(zero).as_micros() as u64
|
||||
}
|
||||
|
||||
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
|
||||
let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
|
||||
if x < ALARM_COUNT as u8 {
|
||||
Some(x + 1)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
match id {
|
||||
Ok(id) => Some(AlarmHandle::new(id)),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
|
||||
self.init();
|
||||
critical_section::with(|cs| {
|
||||
let mut alarms = unsafe { self.alarms.as_ref() }.borrow_ref_mut(cs);
|
||||
let alarm = &mut alarms[alarm.id() as usize];
|
||||
alarm.callback = callback as *const ();
|
||||
alarm.ctx = ctx;
|
||||
});
|
||||
}
|
||||
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
self.init();
|
||||
critical_section::with(|cs| {
|
||||
let mut alarms = unsafe { self.alarms.as_ref() }.borrow_ref_mut(cs);
|
||||
let alarm = &mut alarms[alarm.id() as usize];
|
||||
alarm.timestamp = timestamp;
|
||||
unsafe { self.signaler.as_ref() }.signal();
|
||||
});
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
struct Signaler {
|
||||
@@ -228,3 +164,8 @@ impl<T: Copy> UninitCell<T> {
|
||||
ptr::read(self.as_mut_ptr())
|
||||
}
|
||||
}
|
||||
|
||||
embassy_time_queue_driver::timer_queue_impl!(
|
||||
static TIMER_QUEUE_DRIVER: GlobalTimerQueue
|
||||
= GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration))
|
||||
);
|
||||
|
||||
@@ -1,28 +1,22 @@
|
||||
use core::sync::atomic::{AtomicU8, Ordering};
|
||||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::ptr;
|
||||
use std::sync::{Mutex, Once};
|
||||
|
||||
use embassy_time_driver::{AlarmHandle, Driver};
|
||||
use embassy_time_driver::Driver;
|
||||
use embassy_time_queue_driver::GlobalTimerQueue;
|
||||
use wasm_bindgen::prelude::*;
|
||||
use wasm_timer::Instant as StdInstant;
|
||||
|
||||
const ALARM_COUNT: usize = 4;
|
||||
|
||||
struct AlarmState {
|
||||
token: Option<f64>,
|
||||
closure: Option<Closure<dyn FnMut() + 'static>>,
|
||||
}
|
||||
|
||||
unsafe impl Send for AlarmState {}
|
||||
|
||||
impl AlarmState {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
token: None,
|
||||
closure: None,
|
||||
}
|
||||
Self { token: None }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,66 +27,32 @@ extern "C" {
|
||||
}
|
||||
|
||||
struct TimeDriver {
|
||||
alarm_count: AtomicU8,
|
||||
|
||||
once: Once,
|
||||
alarms: UninitCell<Mutex<[AlarmState; ALARM_COUNT]>>,
|
||||
alarm: UninitCell<Mutex<AlarmState>>,
|
||||
zero_instant: UninitCell<StdInstant>,
|
||||
closure: UninitCell<Closure<dyn FnMut()>>,
|
||||
}
|
||||
|
||||
embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
|
||||
alarm_count: AtomicU8::new(0),
|
||||
once: Once::new(),
|
||||
alarms: UninitCell::uninit(),
|
||||
alarm: UninitCell::uninit(),
|
||||
zero_instant: UninitCell::uninit(),
|
||||
closure: UninitCell::uninit()
|
||||
});
|
||||
|
||||
impl TimeDriver {
|
||||
fn init(&self) {
|
||||
self.once.call_once(|| unsafe {
|
||||
self.alarms
|
||||
.write(Mutex::new([const { AlarmState::new() }; ALARM_COUNT]));
|
||||
self.alarm.write(Mutex::new(const { AlarmState::new() }));
|
||||
self.zero_instant.write(StdInstant::now());
|
||||
self.closure
|
||||
.write(Closure::new(Box::new(|| TIMER_QUEUE_DRIVER.dispatch())));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Driver for TimeDriver {
|
||||
fn now(&self) -> u64 {
|
||||
fn set_alarm(&self, timestamp: u64) -> bool {
|
||||
self.init();
|
||||
|
||||
let zero = unsafe { self.zero_instant.read() };
|
||||
StdInstant::now().duration_since(zero).as_micros() as u64
|
||||
}
|
||||
|
||||
unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
|
||||
let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
|
||||
if x < ALARM_COUNT as u8 {
|
||||
Some(x + 1)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
match id {
|
||||
Ok(id) => Some(AlarmHandle::new(id)),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
|
||||
self.init();
|
||||
let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
|
||||
let alarm = &mut alarms[alarm.id() as usize];
|
||||
alarm.closure.replace(Closure::new(move || {
|
||||
callback(ctx);
|
||||
}));
|
||||
}
|
||||
|
||||
fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
|
||||
self.init();
|
||||
let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
|
||||
let alarm = &mut alarms[alarm.id() as usize];
|
||||
let mut alarm = unsafe { self.alarm.as_ref() }.lock().unwrap();
|
||||
if let Some(token) = alarm.token {
|
||||
clearTimeout(token);
|
||||
}
|
||||
@@ -102,13 +62,22 @@ impl Driver for TimeDriver {
|
||||
false
|
||||
} else {
|
||||
let timeout = (timestamp - now) as u32;
|
||||
alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000));
|
||||
alarm.token = Some(setTimeout(unsafe { self.closure.as_ref() }, timeout / 1000));
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Driver for TimeDriver {
|
||||
fn now(&self) -> u64 {
|
||||
self.init();
|
||||
|
||||
let zero = unsafe { self.zero_instant.read() };
|
||||
StdInstant::now().duration_since(zero).as_micros() as u64
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
|
||||
unsafe impl<T> Send for UninitCell<T> {}
|
||||
unsafe impl<T> Sync for UninitCell<T> {}
|
||||
@@ -139,3 +108,8 @@ impl<T: Copy> UninitCell<T> {
|
||||
ptr::read(self.as_mut_ptr())
|
||||
}
|
||||
}
|
||||
|
||||
embassy_time_queue_driver::timer_queue_impl!(
|
||||
static TIMER_QUEUE_DRIVER: GlobalTimerQueue
|
||||
= GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration))
|
||||
);
|
||||
|
||||
@@ -25,8 +25,6 @@ pub use driver_mock::MockDriver;
|
||||
mod driver_std;
|
||||
#[cfg(feature = "wasm")]
|
||||
mod driver_wasm;
|
||||
#[cfg(feature = "generic-queue")]
|
||||
mod queue_generic;
|
||||
|
||||
pub use delay::{block_for, Delay};
|
||||
pub use duration::Duration;
|
||||
|
||||
@@ -1,346 +0,0 @@
|
||||
use core::cell::RefCell;
|
||||
use core::cmp::{min, Ordering};
|
||||
use core::task::Waker;
|
||||
|
||||
use critical_section::Mutex;
|
||||
use embassy_time_driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
|
||||
use embassy_time_queue_driver::TimerQueue;
|
||||
use heapless::Vec;
|
||||
|
||||
use crate::Instant;
|
||||
|
||||
#[cfg(feature = "generic-queue-8")]
|
||||
const QUEUE_SIZE: usize = 8;
|
||||
#[cfg(feature = "generic-queue-16")]
|
||||
const QUEUE_SIZE: usize = 16;
|
||||
#[cfg(feature = "generic-queue-32")]
|
||||
const QUEUE_SIZE: usize = 32;
|
||||
#[cfg(feature = "generic-queue-64")]
|
||||
const QUEUE_SIZE: usize = 64;
|
||||
#[cfg(feature = "generic-queue-128")]
|
||||
const QUEUE_SIZE: usize = 128;
|
||||
#[cfg(not(any(
|
||||
feature = "generic-queue-8",
|
||||
feature = "generic-queue-16",
|
||||
feature = "generic-queue-32",
|
||||
feature = "generic-queue-64",
|
||||
feature = "generic-queue-128"
|
||||
)))]
|
||||
const QUEUE_SIZE: usize = 64;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Timer {
|
||||
at: Instant,
|
||||
waker: Waker,
|
||||
}
|
||||
|
||||
impl PartialEq for Timer {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.at == other.at
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for Timer {}
|
||||
|
||||
impl PartialOrd for Timer {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
self.at.partial_cmp(&other.at)
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for Timer {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.at.cmp(&other.at)
|
||||
}
|
||||
}
|
||||
|
||||
struct InnerQueue {
|
||||
queue: Vec<Timer, QUEUE_SIZE>,
|
||||
alarm: AlarmHandle,
|
||||
}
|
||||
|
||||
impl InnerQueue {
|
||||
fn schedule_wake(&mut self, at: Instant, waker: &Waker) {
|
||||
self.queue
|
||||
.iter_mut()
|
||||
.find(|timer| timer.waker.will_wake(waker))
|
||||
.map(|timer| {
|
||||
timer.at = min(timer.at, at);
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
let mut timer = Timer {
|
||||
waker: waker.clone(),
|
||||
at,
|
||||
};
|
||||
|
||||
loop {
|
||||
match self.queue.push(timer) {
|
||||
Ok(()) => break,
|
||||
Err(e) => timer = e,
|
||||
}
|
||||
|
||||
self.queue.pop().unwrap().waker.wake();
|
||||
}
|
||||
});
|
||||
|
||||
// Don't wait for the alarm callback to trigger and directly
|
||||
// dispatch all timers that are already due
|
||||
//
|
||||
// Then update the alarm if necessary
|
||||
self.dispatch();
|
||||
}
|
||||
|
||||
fn dispatch(&mut self) {
|
||||
loop {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut next_alarm = Instant::MAX;
|
||||
|
||||
let mut i = 0;
|
||||
while i < self.queue.len() {
|
||||
let timer = &self.queue[i];
|
||||
if timer.at <= now {
|
||||
let timer = self.queue.swap_remove(i);
|
||||
timer.waker.wake();
|
||||
} else {
|
||||
next_alarm = min(next_alarm, timer.at);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if self.update_alarm(next_alarm) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_alarm(&mut self, next_alarm: Instant) -> bool {
|
||||
if next_alarm == Instant::MAX {
|
||||
true
|
||||
} else {
|
||||
set_alarm(self.alarm, next_alarm.as_ticks())
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_alarm(&mut self) {
|
||||
self.dispatch();
|
||||
}
|
||||
}
|
||||
|
||||
struct Queue {
|
||||
inner: Mutex<RefCell<Option<InnerQueue>>>,
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
inner: Mutex::new(RefCell::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
|
||||
critical_section::with(|cs| {
|
||||
let mut inner = self.inner.borrow_ref_mut(cs);
|
||||
|
||||
inner
|
||||
.get_or_insert_with(|| {
|
||||
let handle = unsafe { allocate_alarm() }.unwrap();
|
||||
set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
|
||||
InnerQueue {
|
||||
queue: Vec::new(),
|
||||
alarm: handle,
|
||||
}
|
||||
})
|
||||
.schedule_wake(at, waker)
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_alarm(&self) {
|
||||
critical_section::with(|cs| self.inner.borrow_ref_mut(cs).as_mut().unwrap().handle_alarm())
|
||||
}
|
||||
|
||||
fn handle_alarm_callback(ctx: *mut ()) {
|
||||
unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
|
||||
}
|
||||
}
|
||||
|
||||
impl TimerQueue for Queue {
|
||||
fn schedule_wake(&'static self, at: u64, waker: &Waker) {
|
||||
Queue::schedule_wake(self, Instant::from_ticks(at), waker);
|
||||
}
|
||||
}
|
||||
|
||||
embassy_time_queue_driver::timer_queue_impl!(static QUEUE: Queue = Queue::new());
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "mock-driver")]
|
||||
mod tests {
|
||||
use core::sync::atomic::{AtomicBool, Ordering};
|
||||
use core::task::Waker;
|
||||
use std::sync::Arc;
|
||||
use std::task::Wake;
|
||||
|
||||
use serial_test::serial;
|
||||
|
||||
use crate::driver_mock::MockDriver;
|
||||
use crate::queue_generic::QUEUE;
|
||||
use crate::{Duration, Instant};
|
||||
|
||||
struct TestWaker {
|
||||
pub awoken: AtomicBool,
|
||||
}
|
||||
|
||||
impl Wake for TestWaker {
|
||||
fn wake(self: Arc<Self>) {
|
||||
self.awoken.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn wake_by_ref(self: &Arc<Self>) {
|
||||
self.awoken.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
fn test_waker() -> (Arc<TestWaker>, Waker) {
|
||||
let arc = Arc::new(TestWaker {
|
||||
awoken: AtomicBool::new(false),
|
||||
});
|
||||
let waker = Waker::from(arc.clone());
|
||||
|
||||
(arc, waker)
|
||||
}
|
||||
|
||||
fn setup() {
|
||||
MockDriver::get().reset();
|
||||
critical_section::with(|cs| *QUEUE.inner.borrow_ref_mut(cs) = None);
|
||||
}
|
||||
|
||||
fn queue_len() -> usize {
|
||||
critical_section::with(|cs| {
|
||||
QUEUE
|
||||
.inner
|
||||
.borrow_ref(cs)
|
||||
.as_ref()
|
||||
.map(|inner| inner.queue.iter().count())
|
||||
.unwrap_or(0)
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_schedule() {
|
||||
setup();
|
||||
|
||||
assert_eq!(queue_len(), 0);
|
||||
|
||||
let (flag, waker) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(1), &waker);
|
||||
|
||||
assert!(!flag.awoken.load(Ordering::Relaxed));
|
||||
assert_eq!(queue_len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_schedule_same() {
|
||||
setup();
|
||||
|
||||
let (_flag, waker) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(1), &waker);
|
||||
|
||||
assert_eq!(queue_len(), 1);
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(1), &waker);
|
||||
|
||||
assert_eq!(queue_len(), 1);
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(100), &waker);
|
||||
|
||||
assert_eq!(queue_len(), 1);
|
||||
|
||||
let (_flag2, waker2) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(100), &waker2);
|
||||
|
||||
assert_eq!(queue_len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_trigger() {
|
||||
setup();
|
||||
|
||||
let (flag, waker) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(100), &waker);
|
||||
|
||||
assert!(!flag.awoken.load(Ordering::Relaxed));
|
||||
|
||||
MockDriver::get().advance(Duration::from_secs(99));
|
||||
|
||||
assert!(!flag.awoken.load(Ordering::Relaxed));
|
||||
|
||||
assert_eq!(queue_len(), 1);
|
||||
|
||||
MockDriver::get().advance(Duration::from_secs(1));
|
||||
|
||||
assert!(flag.awoken.load(Ordering::Relaxed));
|
||||
|
||||
assert_eq!(queue_len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_immediate_trigger() {
|
||||
setup();
|
||||
|
||||
let (flag, waker) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(100), &waker);
|
||||
|
||||
MockDriver::get().advance(Duration::from_secs(50));
|
||||
|
||||
let (flag2, waker2) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(40), &waker2);
|
||||
|
||||
assert!(!flag.awoken.load(Ordering::Relaxed));
|
||||
assert!(flag2.awoken.load(Ordering::Relaxed));
|
||||
assert_eq!(queue_len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_queue_overflow() {
|
||||
setup();
|
||||
|
||||
for i in 1..super::QUEUE_SIZE {
|
||||
let (flag, waker) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(310), &waker);
|
||||
|
||||
assert_eq!(queue_len(), i);
|
||||
assert!(!flag.awoken.load(Ordering::Relaxed));
|
||||
}
|
||||
|
||||
let (flag, waker) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(300), &waker);
|
||||
|
||||
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||
assert!(!flag.awoken.load(Ordering::Relaxed));
|
||||
|
||||
let (flag2, waker2) = test_waker();
|
||||
|
||||
QUEUE.schedule_wake(Instant::from_secs(305), &waker2);
|
||||
|
||||
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||
assert!(flag.awoken.load(Ordering::Relaxed));
|
||||
|
||||
let (_flag3, waker3) = test_waker();
|
||||
QUEUE.schedule_wake(Instant::from_secs(320), &waker3);
|
||||
assert_eq!(queue_len(), super::QUEUE_SIZE);
|
||||
assert!(flag2.awoken.load(Ordering::Relaxed));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user