diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 075d8a254..b4adfe01b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -93,6 +93,78 @@ pub(crate) struct TaskHeader { pub(crate) name: Option<&'static str>, #[cfg(feature = "trace")] pub(crate) id: u32, + #[cfg(feature = "trace")] + all_tasks_next: AtomicPtr, +} + +/// A thread-safe tracker for all tasks in the system +/// +/// This struct uses an intrusive linked list approach to track all tasks +/// without additional memory allocations. It maintains a global list of +/// tasks that can be traversed to find all currently existing tasks. +#[cfg(feature = "trace")] +pub struct TaskTracker { + head: AtomicPtr, +} + +#[cfg(feature = "trace")] +impl TaskTracker { + /// Creates a new empty task tracker + /// + /// Initializes a tracker with no tasks in its list. + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(core::ptr::null_mut()), + } + } + + /// Adds a task to the tracker + /// + /// This method inserts a task at the head of the intrusive linked list. + /// The operation is thread-safe and lock-free, using atomic operations + /// to ensure consistency even when called from different contexts. + /// + /// # Arguments + /// * `task` - The task reference to add to the tracker + pub fn add(&self, task: TaskRef) { + let task_ptr = task.as_ptr() as *mut TaskHeader; + + loop { + let current_head = self.head.load(Ordering::Acquire); + unsafe { + (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed); + } + + if self + .head + .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + + /// Performs an operation on each task in the tracker + /// + /// This method traverses the entire list of tasks and calls the provided + /// function for each task. This allows inspecting or processing all tasks + /// in the system without modifying the tracker's structure. + /// + /// # Arguments + /// * `f` - A function to call for each task in the tracker + pub fn for_each(&self, mut f: F) + where + F: FnMut(TaskRef), + { + let mut current = self.head.load(Ordering::Acquire); + while !current.is_null() { + let task = unsafe { TaskRef::from_ptr(current) }; + f(task); + + current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) }; + } + } } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. @@ -173,7 +245,7 @@ impl TaskRef { #[cfg(feature = "trace")] pub fn id(&self) -> u32 { self.header().id - } + } /// Set the ID for a task #[cfg(feature = "trace")] @@ -228,6 +300,8 @@ impl TaskStorage { name: None, #[cfg(feature = "trace")] id: 0, + #[cfg(feature = "trace")] + all_tasks_next: AtomicPtr::new(core::ptr::null_mut()), }, future: UninitCell::uninit(), } diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 28be79cee..81c8a0024 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -81,107 +81,19 @@ #![allow(unused)] -use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; use core::cell::UnsafeCell; use core::sync::atomic::{AtomicUsize, Ordering}; use rtos_trace::TaskInfo; -const MAX_TASKS: usize = 1000; - -/// Represents a task being tracked in the task registry. +/// Global task tracker instance /// -/// Contains the task's unique identifier and optional name. -#[derive(Clone)] -pub struct TrackedTask { - task_id: u32, -} - -/// A thread-safe registry for tracking tasks in the system. -/// -/// This registry maintains a list of active tasks with their IDs and optional names. -/// It supports registering, unregistering, and querying information about tasks. -/// The registry has a fixed capacity of `MAX_TASKS`. -pub struct TaskRegistry { - tasks: [UnsafeCell>; MAX_TASKS], - count: AtomicUsize, -} - -impl TaskRegistry { - /// Creates a new empty task registry. - /// - /// This initializes a registry that can track up to `MAX_TASKS` tasks. - pub const fn new() -> Self { - const EMPTY: UnsafeCell> = UnsafeCell::new(None); - Self { - tasks: [EMPTY; MAX_TASKS], - count: AtomicUsize::new(0), - } - } - - /// Registers a new task in the registry. - /// - /// # Arguments - /// * `task_id` - Unique identifier for the task - /// * `name` - Optional name for the task - /// - /// # Note - /// If the registry is full, the task will not be registered. - pub fn register(&self, task_id: u32) { - let count = self.count.load(Ordering::Relaxed); - if count < MAX_TASKS { - for i in 0..MAX_TASKS { - unsafe { - let slot = &self.tasks[i]; - let slot_ref = &mut *slot.get(); - if slot_ref.is_none() { - *slot_ref = Some(TrackedTask { task_id }); - self.count.fetch_add(1, Ordering::Relaxed); - break; - } - } - } - } - } - - /// Removes a task from the registry. - /// - /// # Arguments - /// * `task_id` - Unique identifier of the task to remove - pub fn unregister(&self, task_id: u32) { - for i in 0..MAX_TASKS { - unsafe { - let slot = &self.tasks[i]; - let slot_ref = &mut *slot.get(); - if let Some(task) = slot_ref { - if task.task_id == task_id { - *slot_ref = None; - self.count.fetch_sub(1, Ordering::Relaxed); - break; - } - } - } - } - } - - /// Returns an iterator over all registered tasks. - /// - /// This allows accessing information about all tasks currently in the registry. - pub fn get_all_tasks(&self) -> impl Iterator + '_ { - (0..MAX_TASKS).filter_map(move |i| unsafe { - let slot = &self.tasks[i]; - (*slot.get()).clone() - }) - } -} - -unsafe impl Sync for TaskRegistry {} -unsafe impl Send for TaskRegistry {} - -/// Global task registry instance used for tracking all tasks in the system. -/// -/// This provides a centralized registry accessible from anywhere in the application. -pub static TASK_REGISTRY: TaskRegistry = TaskRegistry::new(); +/// This static provides access to the global task tracker which maintains +/// a list of all tasks in the system. It's automatically updated by the +/// task lifecycle hooks in the trace module. +#[cfg(feature = "trace")] +pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); #[cfg(not(feature = "rtos-trace"))] extern "Rust" { @@ -262,6 +174,9 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[cfg(feature = "rtos-trace")] rtos_trace::trace::task_new(task.as_ptr() as u32); + + #[cfg(feature = "rtos-trace")] + TASK_TRACKER.add(*task); } #[inline] @@ -272,8 +187,6 @@ pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { unsafe { _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) } - - TASK_REGISTRY.unregister(task_id); } #[inline] @@ -316,20 +229,93 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { rtos_trace::trace::system_idle(); } +/// Returns an iterator over all active tasks in the system +/// +/// This function provides a convenient way to iterate over all tasks +/// that are currently tracked in the system. The returned iterator +/// yields each task in the global task tracker. +/// +/// # Returns +/// An iterator that yields `TaskRef` items for each task +#[cfg(feature = "trace")] +pub fn get_all_active_tasks() -> impl Iterator + 'static { + struct TaskIterator<'a> { + tracker: &'a TaskTracker, + current: *mut TaskHeader, + } + + impl<'a> Iterator for TaskIterator<'a> { + type Item = TaskRef; + + fn next(&mut self) -> Option { + if self.current.is_null() { + return None; + } + + let task = unsafe { TaskRef::from_ptr(self.current) }; + self.current = unsafe { (*self.current).all_tasks_next.load(Ordering::Acquire) }; + + Some(task) + } + } + + TaskIterator { + tracker: &TASK_TRACKER, + current: TASK_TRACKER.head.load(Ordering::Acquire), + } +} + +/// Get all active tasks, filtered by a predicate function +#[cfg(feature = "trace")] +pub fn filter_active_tasks(predicate: F) -> impl Iterator + 'static +where + F: Fn(&TaskRef) -> bool + 'static, +{ + get_all_active_tasks().filter(move |task| predicate(task)) +} + +/// Count the number of active tasks +#[cfg(feature = "trace")] +pub fn count_active_tasks() -> usize { + let mut count = 0; + TASK_TRACKER.for_each(|_| count += 1); + count +} + +/// Perform an action on each active task +#[cfg(feature = "trace")] +pub fn with_all_active_tasks(f: F) +where + F: FnMut(TaskRef), +{ + TASK_TRACKER.for_each(f); +} + +/// Get tasks by name +#[cfg(feature = "trace")] +pub fn get_tasks_by_name(name: &'static str) -> impl Iterator + 'static { + filter_active_tasks(move |task| task.name() == Some(name)) +} + +/// Get tasks by ID +#[cfg(feature = "trace")] +pub fn get_task_by_id(id: u32) -> Option { + filter_active_tasks(move |task| task.id() == id).next() +} + #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { - for task in TASK_REGISTRY.get_all_tasks() { - let task_ref = unsafe { TaskRef::from_ptr(task.task_id as *const TaskHeader) }; - let name = task_ref.name().unwrap_or("unnamed\0"); + with_all_active_tasks(|task| { + let name = task.name().unwrap_or("unnamed task\0"); let info = rtos_trace::TaskInfo { name, priority: 0, stack_base: 0, stack_size: 0, }; - rtos_trace::trace::task_send_info(task.task_id, info); - } + rtos_trace::trace::task_send_info(task.as_id(), info); + }); } fn time() -> u64 { const fn gcd(a: u64, b: u64) -> u64 { diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 7f907346d..5e42f01bf 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -5,8 +5,6 @@ use core::sync::atomic::Ordering; use core::task::Poll; use super::raw; -#[cfg(feature = "rtos-trace")] -use super::raw::trace::TASK_REGISTRY; /// Token to spawn a newly-created task in an executor. /// @@ -173,7 +171,6 @@ impl Spawner { Some(task) => { task.set_name(Some(name)); let task_id = task.as_ptr() as u32; - TASK_REGISTRY.register(task_id); task.set_id(task_id); unsafe { self.executor.spawn(task) };