use an intrusive linked list in TaskHeader to track tasks
This commit is contained in:
		
							parent
							
								
									f4e0cbb7cc
								
							
						
					
					
						commit
						6085916714
					
				| @ -93,6 +93,78 @@ pub(crate) struct TaskHeader { | ||||
|     pub(crate) name: Option<&'static str>, | ||||
|     #[cfg(feature = "trace")] | ||||
|     pub(crate) id: u32, | ||||
|     #[cfg(feature = "trace")] | ||||
|     all_tasks_next: AtomicPtr<TaskHeader>, | ||||
| } | ||||
| 
 | ||||
| /// A thread-safe tracker for all tasks in the system
 | ||||
| ///
 | ||||
| /// This struct uses an intrusive linked list approach to track all tasks
 | ||||
| /// without additional memory allocations. It maintains a global list of
 | ||||
| /// tasks that can be traversed to find all currently existing tasks.
 | ||||
| #[cfg(feature = "trace")] | ||||
| pub struct TaskTracker { | ||||
|     head: AtomicPtr<TaskHeader>, | ||||
| } | ||||
| 
 | ||||
| #[cfg(feature = "trace")] | ||||
| impl TaskTracker { | ||||
|     /// Creates a new empty task tracker
 | ||||
|     ///
 | ||||
|     /// Initializes a tracker with no tasks in its list.
 | ||||
|     pub const fn new() -> Self { | ||||
|         Self { | ||||
|             head: AtomicPtr::new(core::ptr::null_mut()), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Adds a task to the tracker
 | ||||
|     ///
 | ||||
|     /// This method inserts a task at the head of the intrusive linked list.
 | ||||
|     /// The operation is thread-safe and lock-free, using atomic operations
 | ||||
|     /// to ensure consistency even when called from different contexts.
 | ||||
|     ///
 | ||||
|     /// # Arguments
 | ||||
|     /// * `task` - The task reference to add to the tracker
 | ||||
|     pub fn add(&self, task: TaskRef) { | ||||
|         let task_ptr = task.as_ptr() as *mut TaskHeader; | ||||
| 
 | ||||
|         loop { | ||||
|             let current_head = self.head.load(Ordering::Acquire); | ||||
|             unsafe { | ||||
|                 (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed); | ||||
|             } | ||||
| 
 | ||||
|             if self | ||||
|                 .head | ||||
|                 .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) | ||||
|                 .is_ok() | ||||
|             { | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Performs an operation on each task in the tracker
 | ||||
|     ///
 | ||||
|     /// This method traverses the entire list of tasks and calls the provided
 | ||||
|     /// function for each task. This allows inspecting or processing all tasks
 | ||||
|     /// in the system without modifying the tracker's structure.
 | ||||
|     ///
 | ||||
|     /// # Arguments
 | ||||
|     /// * `f` - A function to call for each task in the tracker
 | ||||
|     pub fn for_each<F>(&self, mut f: F) | ||||
|     where | ||||
|         F: FnMut(TaskRef), | ||||
|     { | ||||
|         let mut current = self.head.load(Ordering::Acquire); | ||||
|         while !current.is_null() { | ||||
|             let task = unsafe { TaskRef::from_ptr(current) }; | ||||
|             f(task); | ||||
| 
 | ||||
|             current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) }; | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
 | ||||
| @ -228,6 +300,8 @@ impl<F: Future + 'static> TaskStorage<F> { | ||||
|                 name: None, | ||||
|                 #[cfg(feature = "trace")] | ||||
|                 id: 0, | ||||
|                 #[cfg(feature = "trace")] | ||||
|                 all_tasks_next: AtomicPtr::new(core::ptr::null_mut()), | ||||
|             }, | ||||
|             future: UninitCell::uninit(), | ||||
|         } | ||||
|  | ||||
| @ -81,107 +81,19 @@ | ||||
| 
 | ||||
| #![allow(unused)] | ||||
| 
 | ||||
| use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; | ||||
| use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; | ||||
| 
 | ||||
| use core::cell::UnsafeCell; | ||||
| use core::sync::atomic::{AtomicUsize, Ordering}; | ||||
| use rtos_trace::TaskInfo; | ||||
| 
 | ||||
| const MAX_TASKS: usize = 1000; | ||||
| 
 | ||||
| /// Represents a task being tracked in the task registry.
 | ||||
| /// Global task tracker instance
 | ||||
| ///
 | ||||
| /// Contains the task's unique identifier and optional name.
 | ||||
| #[derive(Clone)] | ||||
| pub struct TrackedTask { | ||||
|     task_id: u32, | ||||
| } | ||||
| 
 | ||||
| /// A thread-safe registry for tracking tasks in the system.
 | ||||
| ///
 | ||||
| /// This registry maintains a list of active tasks with their IDs and optional names.
 | ||||
| /// It supports registering, unregistering, and querying information about tasks.
 | ||||
| /// The registry has a fixed capacity of `MAX_TASKS`.
 | ||||
| pub struct TaskRegistry { | ||||
|     tasks: [UnsafeCell<Option<TrackedTask>>; MAX_TASKS], | ||||
|     count: AtomicUsize, | ||||
| } | ||||
| 
 | ||||
| impl TaskRegistry { | ||||
|     /// Creates a new empty task registry.
 | ||||
|     ///
 | ||||
|     /// This initializes a registry that can track up to `MAX_TASKS` tasks.  
 | ||||
|     pub const fn new() -> Self { | ||||
|         const EMPTY: UnsafeCell<Option<TrackedTask>> = UnsafeCell::new(None); | ||||
|         Self { | ||||
|             tasks: [EMPTY; MAX_TASKS], | ||||
|             count: AtomicUsize::new(0), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Registers a new task in the registry.
 | ||||
|     ///
 | ||||
|     /// # Arguments
 | ||||
|     /// * `task_id` - Unique identifier for the task
 | ||||
|     /// * `name` - Optional name for the task
 | ||||
|     ///
 | ||||
|     /// # Note
 | ||||
|     /// If the registry is full, the task will not be registered.
 | ||||
|     pub fn register(&self, task_id: u32) { | ||||
|         let count = self.count.load(Ordering::Relaxed); | ||||
|         if count < MAX_TASKS { | ||||
|             for i in 0..MAX_TASKS { | ||||
|                 unsafe { | ||||
|                     let slot = &self.tasks[i]; | ||||
|                     let slot_ref = &mut *slot.get(); | ||||
|                     if slot_ref.is_none() { | ||||
|                         *slot_ref = Some(TrackedTask { task_id }); | ||||
|                         self.count.fetch_add(1, Ordering::Relaxed); | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Removes a task from the registry.
 | ||||
|     ///
 | ||||
|     /// # Arguments
 | ||||
|     /// * `task_id` - Unique identifier of the task to remove
 | ||||
|     pub fn unregister(&self, task_id: u32) { | ||||
|         for i in 0..MAX_TASKS { | ||||
|             unsafe { | ||||
|                 let slot = &self.tasks[i]; | ||||
|                 let slot_ref = &mut *slot.get(); | ||||
|                 if let Some(task) = slot_ref { | ||||
|                     if task.task_id == task_id { | ||||
|                         *slot_ref = None; | ||||
|                         self.count.fetch_sub(1, Ordering::Relaxed); | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Returns an iterator over all registered tasks.
 | ||||
|     ///
 | ||||
|     /// This allows accessing information about all tasks currently in the registry.
 | ||||
|     pub fn get_all_tasks(&self) -> impl Iterator<Item = TrackedTask> + '_ { | ||||
|         (0..MAX_TASKS).filter_map(move |i| unsafe { | ||||
|             let slot = &self.tasks[i]; | ||||
|             (*slot.get()).clone() | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| unsafe impl Sync for TaskRegistry {} | ||||
| unsafe impl Send for TaskRegistry {} | ||||
| 
 | ||||
| /// Global task registry instance used for tracking all tasks in the system.
 | ||||
| ///
 | ||||
| /// This provides a centralized registry accessible from anywhere in the application.
 | ||||
| pub static TASK_REGISTRY: TaskRegistry = TaskRegistry::new(); | ||||
| /// This static provides access to the global task tracker which maintains
 | ||||
| /// a list of all tasks in the system. It's automatically updated by the
 | ||||
| /// task lifecycle hooks in the trace module.
 | ||||
| #[cfg(feature = "trace")] | ||||
| pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); | ||||
| 
 | ||||
| #[cfg(not(feature = "rtos-trace"))] | ||||
| extern "Rust" { | ||||
| @ -262,6 +174,9 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { | ||||
| 
 | ||||
|     #[cfg(feature = "rtos-trace")] | ||||
|     rtos_trace::trace::task_new(task.as_ptr() as u32); | ||||
| 
 | ||||
|     #[cfg(feature = "rtos-trace")] | ||||
|     TASK_TRACKER.add(*task); | ||||
| } | ||||
| 
 | ||||
| #[inline] | ||||
| @ -272,8 +187,6 @@ pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { | ||||
|     unsafe { | ||||
|         _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) | ||||
|     } | ||||
| 
 | ||||
|     TASK_REGISTRY.unregister(task_id); | ||||
| } | ||||
| 
 | ||||
| #[inline] | ||||
| @ -316,20 +229,93 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { | ||||
|     rtos_trace::trace::system_idle(); | ||||
| } | ||||
| 
 | ||||
| /// Returns an iterator over all active tasks in the system
 | ||||
| ///
 | ||||
| /// This function provides a convenient way to iterate over all tasks
 | ||||
| /// that are currently tracked in the system. The returned iterator
 | ||||
| /// yields each task in the global task tracker.
 | ||||
| ///
 | ||||
| /// # Returns
 | ||||
| /// An iterator that yields `TaskRef` items for each task
 | ||||
| #[cfg(feature = "trace")] | ||||
| pub fn get_all_active_tasks() -> impl Iterator<Item = TaskRef> + 'static { | ||||
|     struct TaskIterator<'a> { | ||||
|         tracker: &'a TaskTracker, | ||||
|         current: *mut TaskHeader, | ||||
|     } | ||||
| 
 | ||||
|     impl<'a> Iterator for TaskIterator<'a> { | ||||
|         type Item = TaskRef; | ||||
| 
 | ||||
|         fn next(&mut self) -> Option<Self::Item> { | ||||
|             if self.current.is_null() { | ||||
|                 return None; | ||||
|             } | ||||
| 
 | ||||
|             let task = unsafe { TaskRef::from_ptr(self.current) }; | ||||
|             self.current = unsafe { (*self.current).all_tasks_next.load(Ordering::Acquire) }; | ||||
| 
 | ||||
|             Some(task) | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     TaskIterator { | ||||
|         tracker: &TASK_TRACKER, | ||||
|         current: TASK_TRACKER.head.load(Ordering::Acquire), | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Get all active tasks, filtered by a predicate function
 | ||||
| #[cfg(feature = "trace")] | ||||
| pub fn filter_active_tasks<F>(predicate: F) -> impl Iterator<Item = TaskRef> + 'static | ||||
| where | ||||
|     F: Fn(&TaskRef) -> bool + 'static, | ||||
| { | ||||
|     get_all_active_tasks().filter(move |task| predicate(task)) | ||||
| } | ||||
| 
 | ||||
| /// Count the number of active tasks
 | ||||
| #[cfg(feature = "trace")] | ||||
| pub fn count_active_tasks() -> usize { | ||||
|     let mut count = 0; | ||||
|     TASK_TRACKER.for_each(|_| count += 1); | ||||
|     count | ||||
| } | ||||
| 
 | ||||
| /// Perform an action on each active task
 | ||||
| #[cfg(feature = "trace")] | ||||
| pub fn with_all_active_tasks<F>(f: F) | ||||
| where | ||||
|     F: FnMut(TaskRef), | ||||
| { | ||||
|     TASK_TRACKER.for_each(f); | ||||
| } | ||||
| 
 | ||||
| /// Get tasks by name
 | ||||
| #[cfg(feature = "trace")] | ||||
| pub fn get_tasks_by_name(name: &'static str) -> impl Iterator<Item = TaskRef> + 'static { | ||||
|     filter_active_tasks(move |task| task.name() == Some(name)) | ||||
| } | ||||
| 
 | ||||
| /// Get tasks by ID
 | ||||
| #[cfg(feature = "trace")] | ||||
| pub fn get_task_by_id(id: u32) -> Option<TaskRef> { | ||||
|     filter_active_tasks(move |task| task.id() == id).next() | ||||
| } | ||||
| 
 | ||||
| #[cfg(feature = "rtos-trace")] | ||||
| impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { | ||||
|     fn task_list() { | ||||
|         for task in TASK_REGISTRY.get_all_tasks() { | ||||
|             let task_ref = unsafe { TaskRef::from_ptr(task.task_id as *const TaskHeader) }; | ||||
|             let name = task_ref.name().unwrap_or("unnamed\0"); | ||||
|         with_all_active_tasks(|task| { | ||||
|             let name = task.name().unwrap_or("unnamed task\0"); | ||||
|             let info = rtos_trace::TaskInfo { | ||||
|                 name, | ||||
|                 priority: 0, | ||||
|                 stack_base: 0, | ||||
|                 stack_size: 0, | ||||
|             }; | ||||
|             rtos_trace::trace::task_send_info(task.task_id, info); | ||||
|         } | ||||
|             rtos_trace::trace::task_send_info(task.as_id(), info); | ||||
|         }); | ||||
|     } | ||||
|     fn time() -> u64 { | ||||
|         const fn gcd(a: u64, b: u64) -> u64 { | ||||
|  | ||||
| @ -5,8 +5,6 @@ use core::sync::atomic::Ordering; | ||||
| use core::task::Poll; | ||||
| 
 | ||||
| use super::raw; | ||||
| #[cfg(feature = "rtos-trace")] | ||||
| use super::raw::trace::TASK_REGISTRY; | ||||
| 
 | ||||
| /// Token to spawn a newly-created task in an executor.
 | ||||
| ///
 | ||||
| @ -173,7 +171,6 @@ impl Spawner { | ||||
|             Some(task) => { | ||||
|                 task.set_name(Some(name)); | ||||
|                 let task_id = task.as_ptr() as u32; | ||||
|                 TASK_REGISTRY.register(task_id); | ||||
|                 task.set_id(task_id); | ||||
| 
 | ||||
|                 unsafe { self.executor.spawn(task) }; | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user