From e6001e66f8b21483a707d0015f9d278556759a7c Mon Sep 17 00:00:00 2001 From: 1-rafael-1 Date: Tue, 31 Dec 2024 15:54:42 +0100 Subject: [PATCH] Improve orchestrate_tasks example with shared state and better documentation Add mutex-protected shared system state Improve task coordination and signaling Add more documentation --- examples/rp/src/bin/orchestrate_tasks.rs | 322 +++++++++++------------ 1 file changed, 161 insertions(+), 161 deletions(-) diff --git a/examples/rp/src/bin/orchestrate_tasks.rs b/examples/rp/src/bin/orchestrate_tasks.rs index 0e21d5833..6f209da95 100644 --- a/examples/rp/src/bin/orchestrate_tasks.rs +++ b/examples/rp/src/bin/orchestrate_tasks.rs @@ -1,20 +1,18 @@ //! This example demonstrates some approaches to communicate between tasks in order to orchestrate the state of the system. //! +//! The system consists of several tasks: +//! - Three tasks that generate random numbers at different intervals (simulating i.e. sensor readings) +//! - A task that monitors USB power connection (hardware event handling) +//! - A task that reads system voltage (ADC sampling) +//! - A consumer task that processes all this information +//! +//! The system maintains state in a single place, wrapped in a Mutex. +//! //! We demonstrate how to: -//! - use a channel to send messages between tasks, in this case here in order to have one task control the state of the system. -//! - use a signal to terminate a task. -//! - use command channels to send commands to another task. -//! - use different ways to receive messages, from a straightforwar awaiting on one channel to a more complex awaiting on multiple futures. -//! -//! There are more patterns to orchestrate tasks, this is just one example. -//! -//! We will use these tasks to generate example "state information": -//! - a task that generates random numbers in intervals of 60s -//! - a task that generates random numbers in intervals of 30s -//! - a task that generates random numbers in intervals of 90s -//! - a task that notifies about being attached/disattached from usb power -//! - a task that measures vsys voltage in intervals of 30s -//! - a task that consumes the state information and reacts to it +//! - use a mutex to maintain shared state between tasks +//! - use a channel to send events between tasks +//! - use an orchestrator task to coordinate tasks and handle state transitions +//! - use signals to notify about state changes and terminate tasks #![no_std] #![no_main] @@ -28,15 +26,12 @@ use embassy_rp::clocks::RoscRng; use embassy_rp::gpio::{Input, Pull}; use embassy_rp::{bind_interrupts, peripherals}; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; -use embassy_sync::{channel, signal}; +use embassy_sync::{channel, mutex::Mutex, signal}; use embassy_time::{Duration, Timer}; use rand::RngCore; use {defmt_rtt as _, panic_probe as _}; -// This is just some preparation, see example `assign_resources.rs` for more information on this. We prep the rresources that we will be using in different tasks. -// **Note**: This will not work with a board that has a wifi chip, because the wifi chip uses pins 24 and 29 for its own purposes. A way around this in software -// is not trivial, at least if you intend to use wifi, too. Workaround is to wire from vsys and vbus pins to appropriate pins on the board through a voltage divider. Then use those pins. -// For this example it will not matter much, the concept of what we are showing remains valid. +// Hardware resource assignment. See other examples for different ways of doing this. assign_resources! { vsys: Vsys { adc: ADC, @@ -47,228 +42,233 @@ assign_resources! { }, } +// Interrupt binding - required for hardware peripherals like ADC bind_interrupts!(struct Irqs { ADC_IRQ_FIFO => InterruptHandler; }); -/// This is the type of Events that we will send from the worker tasks to the orchestrating task. +/// Events that worker tasks send to the orchestrator enum Events { - UsbPowered(bool), - VsysVoltage(f32), - FirstRandomSeed(u32), - SecondRandomSeed(u32), - ThirdRandomSeed(u32), - ResetFirstRandomSeed, + UsbPowered(bool), // USB connection state changed + VsysVoltage(f32), // New voltage reading + FirstRandomSeed(u32), // Random number from 30s timer + SecondRandomSeed(u32), // Random number from 60s timer + ThirdRandomSeed(u32), // Random number from 90s timer + ResetFirstRandomSeed, // Signal to reset the first counter } -/// This is the type of Commands that we will send from the orchestrating task to the worker tasks. -/// Note that we are lazy here and only have one command, you might want to have more. +/// Commands that can control task behavior. +/// Currently only used to stop tasks, but could be extended for other controls. enum Commands { - /// This command will stop the appropriate worker task + /// Signals a task to stop execution Stop, } -/// This is the state of the system, we will use this to orchestrate the system. This is a simple example, in a real world application this would be more complex. -#[derive(Default, Debug, Clone, Format)] +/// The central state of our system, shared between tasks. +#[derive(Clone, Format)] struct State { usb_powered: bool, vsys_voltage: f32, first_random_seed: u32, second_random_seed: u32, third_random_seed: u32, + first_random_seed_task_running: bool, times_we_got_first_random_seed: u8, maximum_times_we_want_first_random_seed: u8, } +/// A formatted view of the system status, used for logging. Used for the below `get_system_summary` fn. +#[derive(Format)] +struct SystemStatus { + power_source: &'static str, + voltage: f32, +} + impl State { - fn new() -> Self { + const fn new() -> Self { Self { usb_powered: false, vsys_voltage: 0.0, first_random_seed: 0, second_random_seed: 0, third_random_seed: 0, + first_random_seed_task_running: false, times_we_got_first_random_seed: 0, maximum_times_we_want_first_random_seed: 3, } } + + /// Returns a formatted summary of power state and voltage. + /// Shows how to create methods that work with shared state. + fn get_system_summary(&self) -> SystemStatus { + SystemStatus { + power_source: if self.usb_powered { + "USB powered" + } else { + "Battery powered" + }, + voltage: self.vsys_voltage, + } + } } -/// Channel for the events that we want the orchestrator to react to, all state events are of the type Enum Events. -/// We use a channel with an arbitrary size of 10, the precise size of the queue depends on your use case. This depends on how many events we -/// expect to be generated in a given time frame and how fast the orchestrator can react to them. And then if we rather want the senders to wait for -/// new slots in the queue or if we want the orchestrator to have a backlog of events to process. In this case here we expect to always be enough slots -/// in the queue, so the worker tasks can in all nominal cases send their events and continue with their work without waiting. -/// For the events we - in this case here - do not want to loose any events, so a channel is a good choice. See embassy_sync docs for other options. +/// The shared state protected by a mutex +static SYSTEM_STATE: Mutex = Mutex::new(State::new()); + +/// Channel for events from worker tasks to the orchestrator static EVENT_CHANNEL: channel::Channel = channel::Channel::new(); -/// Signal for stopping the first random signal task. We use a signal here, because we need no queue. It is suffiient to have one signal active. +/// Signal used to stop the first random number task static STOP_FIRST_RANDOM_SIGNAL: signal::Signal = signal::Signal::new(); -/// Channel for the state that we want the consumer task to react to. We use a channel here, because we want to have a queue of state changes, although -/// we want the queue to be of size 1, because we want to finish rwacting to the state change before the next one comes in. This is just a design choice -/// and depends on your use case. -static CONSUMER_CHANNEL: channel::Channel = channel::Channel::new(); +/// Signal for notifying about state changes +static STATE_CHANGED: signal::Signal = signal::Signal::new(); -// And now we can put all this into use - -/// This is the main task, that will not do very much besides spawning the other tasks. This is a design choice, you could do the -/// orchestrating here. This is to show that we do not need a main loop here, the system will run indefinitely as long as at least one task is running. #[embassy_executor::main] async fn main(spawner: Spawner) { - // initialize the peripherals let p = embassy_rp::init(Default::default()); - // split the resources, for convenience - see above let r = split_resources! {p}; - // spawn the tasks spawner.spawn(orchestrate(spawner)).unwrap(); spawner.spawn(random_60s(spawner)).unwrap(); spawner.spawn(random_90s(spawner)).unwrap(); + // `random_30s` is not spawned here, butin the orchestrate task depending on state spawner.spawn(usb_power(spawner, r.vbus)).unwrap(); spawner.spawn(vsys_voltage(spawner, r.vsys)).unwrap(); spawner.spawn(consumer(spawner)).unwrap(); } -/// This is the task handling the system state and orchestrating the other tasks. WEe can regard this as the "main loop" of the system. +/// Main task that processes all events and updates system state. #[embassy_executor::task] -async fn orchestrate(_spawner: Spawner) { - let mut state = State::new(); - - // we need to have a receiver for the events +async fn orchestrate(spawner: Spawner) { let receiver = EVENT_CHANNEL.receiver(); - // and we need a sender for the consumer task - let state_sender = CONSUMER_CHANNEL.sender(); - loop { - // we await on the receiver, this will block until a new event is available - // as an alternative to this, we could also await on multiple channels, this would block until at least one of the channels has an event - // see the embassy_futures docs: https://docs.embassy.dev/embassy-futures/git/default/select/index.html - // The task random_30s does a select, if you want to have a look at that. - // Another reason to use select may also be that we want to have a timeout, so we can react to the absence of events within a time frame. - // We keep it simple here. + // Do nothing until we receive any event let event = receiver.receive().await; - // react to the events - match event { - Events::UsbPowered(usb_powered) => { - // update the state and/or react to the event here - state.usb_powered = usb_powered; - info!("Usb powered: {}", usb_powered); - } - Events::VsysVoltage(voltage) => { - // update the state and/or react to the event here - state.vsys_voltage = voltage; - info!("Vsys voltage: {}", voltage); - } - Events::FirstRandomSeed(seed) => { - // update the state and/or react to the event here - state.first_random_seed = seed; - // here we change some meta state, we count how many times we got the first random seed - state.times_we_got_first_random_seed += 1; - info!( - "First random seed: {}, and that was iteration {} of receiving this.", - seed, &state.times_we_got_first_random_seed - ); - } - Events::SecondRandomSeed(seed) => { - // update the state and/or react to the event here - state.second_random_seed = seed; - info!("Second random seed: {}", seed); - } - Events::ThirdRandomSeed(seed) => { - // update the state and/or react to the event here - state.third_random_seed = seed; - info!("Third random seed: {}", seed); - } - Events::ResetFirstRandomSeed => { - // update the state and/or react to the event here - state.times_we_got_first_random_seed = 0; - state.first_random_seed = 0; - info!("Resetting the first random seed counter"); - } - } - // we now have an altered state - // there is a crate for detecting field changes on crates.io (https://crates.io/crates/fieldset) that might be useful here - // for now we just keep it simple + // Scope in which we want to lock the system state. As an alternative we could also call `drop` on the state + { + let mut state = SYSTEM_STATE.lock().await; - // we send the state to the consumer task - // since the channel has a size of 1, this will block until the consumer task has received the state, which is what we want here in this example - // **Note:** It is bad design to send too much data between tasks, with no clear definition of what "too much" is. In this example we send the - // whole state, in a real world application you might want to send only the data, that is relevant to the consumer task AND only when it has changed. - // We keep it simple here. - state_sender.send(state.clone()).await; - } -} - -/// This task will consume the state information and react to it. This is a simple example, in a real world application this would be more complex -/// and we could have multiple consumer tasks, each reacting to different parts of the state. -#[embassy_executor::task] -async fn consumer(spawner: Spawner) { - // we need to have a receiver for the state - let receiver = CONSUMER_CHANNEL.receiver(); - let sender = EVENT_CHANNEL.sender(); - loop { - // we await on the receiver, this will block until a new state is available - let state = receiver.receive().await; - // react to the state, in this case here we just log it - info!("The consumer has reveived this state: {:?}", &state); - - // here we react to the state, in this case here we want to start or stop the first random signal task depending on the state of the system - match state.times_we_got_first_random_seed { - max if max == state.maximum_times_we_want_first_random_seed => { - info!("Stopping the first random signal task"); - // we send a command to the task - STOP_FIRST_RANDOM_SIGNAL.signal(Commands::Stop); - // we notify the orchestrator that we have sent the command - sender.send(Events::ResetFirstRandomSeed).await; - } - 0 => { - // we start the task, which presents us with an interesting problem, because we may return here before the task has started - // here we just try and log if the task has started, in a real world application you might want to handle this more gracefully - info!("Starting the first random signal task"); - match spawner.spawn(random_30s(spawner)) { - Ok(_) => info!("Successfully spawned random_30s task"), - Err(e) => info!("Failed to spawn random_30s task: {:?}", e), + match event { + Events::UsbPowered(usb_powered) => { + state.usb_powered = usb_powered; + info!("Usb powered: {}", usb_powered); + info!("System summary: {}", state.get_system_summary()); + } + Events::VsysVoltage(voltage) => { + state.vsys_voltage = voltage; + info!("Vsys voltage: {}", voltage); + } + Events::FirstRandomSeed(seed) => { + state.first_random_seed = seed; + state.times_we_got_first_random_seed += 1; + info!( + "First random seed: {}, and that was iteration {} of receiving this.", + seed, &state.times_we_got_first_random_seed + ); + } + Events::SecondRandomSeed(seed) => { + state.second_random_seed = seed; + info!("Second random seed: {}", seed); + } + Events::ThirdRandomSeed(seed) => { + state.third_random_seed = seed; + info!("Third random seed: {}", seed); + } + Events::ResetFirstRandomSeed => { + state.times_we_got_first_random_seed = 0; + state.first_random_seed = 0; + info!("Resetting the first random seed counter"); } } - _ => {} + + // Handle task orchestration based on state + // Just placed as an example here, could be hooked into the event system, puton a timer, ... + match state.times_we_got_first_random_seed { + max if max == state.maximum_times_we_want_first_random_seed => { + info!("Stopping the first random signal task"); + STOP_FIRST_RANDOM_SIGNAL.signal(Commands::Stop); + EVENT_CHANNEL.sender().send(Events::ResetFirstRandomSeed).await; + } + 0 => { + let respawn_first_random_seed_task = !state.first_random_seed_task_running; + // Deliberately dropping the Mutex lock here to release it before a lengthy operation + drop(state); + if respawn_first_random_seed_task { + info!("(Re)-Starting the first random signal task"); + spawner.spawn(random_30s(spawner)).unwrap(); + } + } + _ => {} + } } + + STATE_CHANGED.signal(()); } } -/// This task will generate random numbers in intervals of 30s -/// The task will terminate after it has received a command signal to stop, see the orchestrate task for that. -/// Note that we are not spawning this task from main, as we will show how such a task can be spawned and closed dynamically. +/// Task that monitors state changes and logs system status. +#[embassy_executor::task] +async fn consumer(_spawner: Spawner) { + loop { + // Wait for state change notification + STATE_CHANGED.wait().await; + + let state = SYSTEM_STATE.lock().await; + info!( + "State update - {} | Seeds - First: {} (count: {}/{}, running: {}), Second: {}, Third: {}", + state.get_system_summary(), + state.first_random_seed, + state.times_we_got_first_random_seed, + state.maximum_times_we_want_first_random_seed, + state.first_random_seed_task_running, + state.second_random_seed, + state.third_random_seed + ); + } +} + +/// Task that generates random numbers every 30 seconds until stopped. +/// Shows how to handle both timer events and stop signals. +/// As an example of some routine we want to be on or off depending on other needs. #[embassy_executor::task] async fn random_30s(_spawner: Spawner) { + { + let mut state = SYSTEM_STATE.lock().await; + state.first_random_seed_task_running = true; + } + let mut rng = RoscRng; let sender = EVENT_CHANNEL.sender(); + loop { - // we either await on the timer or the signal, whichever comes first. - let futures = select(Timer::after(Duration::from_secs(30)), STOP_FIRST_RANDOM_SIGNAL.wait()).await; - match futures { + // Wait for either 30s timer or stop signal (like select() in Go) + match select(Timer::after(Duration::from_secs(30)), STOP_FIRST_RANDOM_SIGNAL.wait()).await { Either::First(_) => { - // we received are operating on the timer info!("30s are up, generating random number"); let random_number = rng.next_u32(); sender.send(Events::FirstRandomSeed(random_number)).await; } Either::Second(_) => { - // we received the signal to stop info!("Received signal to stop, goodbye!"); + + let mut state = SYSTEM_STATE.lock().await; + state.first_random_seed_task_running = false; + break; } } } } -/// This task will generate random numbers in intervals of 60s +/// Task that generates random numbers every 60 seconds. As an example of some routine. #[embassy_executor::task] async fn random_60s(_spawner: Spawner) { let mut rng = RoscRng; let sender = EVENT_CHANNEL.sender(); + loop { Timer::after(Duration::from_secs(60)).await; let random_number = rng.next_u32(); @@ -276,11 +276,12 @@ async fn random_60s(_spawner: Spawner) { } } -/// This task will generate random numbers in intervals of 90s +/// Task that generates random numbers every 90 seconds. . As an example of some routine. #[embassy_executor::task] async fn random_90s(_spawner: Spawner) { let mut rng = RoscRng; let sender = EVENT_CHANNEL.sender(); + loop { Timer::after(Duration::from_secs(90)).await; let random_number = rng.next_u32(); @@ -288,31 +289,30 @@ async fn random_90s(_spawner: Spawner) { } } -/// This task will notify if we are connected to usb power +/// Task that monitors USB power connection. As an example of some Interrupt somewhere. #[embassy_executor::task] pub async fn usb_power(_spawner: Spawner, r: Vbus) { let mut vbus_in = Input::new(r.pin_24, Pull::None); let sender = EVENT_CHANNEL.sender(); + loop { sender.send(Events::UsbPowered(vbus_in.is_high())).await; vbus_in.wait_for_any_edge().await; } } -/// This task will measure the vsys voltage in intervals of 30s +/// Task that reads system voltage through ADC. As an example of some continuous sensor reading. #[embassy_executor::task] pub async fn vsys_voltage(_spawner: Spawner, r: Vsys) { let mut adc = Adc::new(r.adc, Irqs, Config::default()); let vsys_in = r.pin_29; let mut channel = Channel::new_pin(vsys_in, Pull::None); let sender = EVENT_CHANNEL.sender(); + loop { - // read the adc value + Timer::after(Duration::from_secs(30)).await; let adc_value = adc.read(&mut channel).await.unwrap(); - // convert the adc value to voltage. - // 3.3 is the reference voltage, 3.0 is the factor for the inbuilt voltage divider and 4096 is the resolution of the adc let voltage = (adc_value as f32) * 3.3 * 3.0 / 4096.0; sender.send(Events::VsysVoltage(voltage)).await; - Timer::after(Duration::from_secs(30)).await; } }