Communication over channels.
Channels can be used to communicate data between running tasks. The channel is essentially a wait queue, allowing tasks with multiple producers and a single receiver. A channel is constructed in the init
task and backed by statically allocated memory. Send and receive endpoints are distributed to software tasks:
...
const CAPACITY: usize = 5;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
sender1::spawn(s.clone()).unwrap();
sender2::spawn(s.clone()).unwrap();
...
In this case the channel holds data of u32
type with a capacity of 5 elements.
Channels can also be used from hardware tasks, but only in a non-async
manner using the Try API.
Sending data
The send
method post a message on the channel as shown below:
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1");
sender.send(1).await.unwrap();
}
Receiving data
The receiver can await
incoming messages:
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
while let Ok(val) = receiver.recv().await {
hprintln!("Receiver got: {}", val);
...
}
}
Channels are implemented using a small (global) Critical Section (CS) for protection against race-conditions. The user must provide an CS implementation. Compiling the examples given the --features test-critical-section
gives one possible implementation.
For a complete example:
//! examples/async-channel.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {}
const CAPACITY: usize = 5;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
sender1::spawn(s.clone()).unwrap();
sender2::spawn(s.clone()).unwrap();
sender3::spawn(s).unwrap();
(Shared {}, Local {})
}
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
while let Ok(val) = receiver.recv().await {
hprintln!("Receiver got: {}", val);
if val == 3 {
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
}
}
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1");
sender.send(1).await.unwrap();
}
#[task]
async fn sender2(_c: sender2::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 2 sending: 2");
sender.send(2).await.unwrap();
}
#[task]
async fn sender3(_c: sender3::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 3 sending: 3");
sender.send(3).await.unwrap();
}
}
$ cargo xtask qemu --verbose --example async-channel --features test-critical-section
Sender 1 sending: 1
Sender 2 sending: 2
Sender 3 sending: 3
Receiver got: 1
Receiver got: 2
Receiver got: 3
Also sender endpoint can be awaited. In case the channel capacity has not yet been reached, await
-ing the sender can progress immediately, while in the case the capacity is reached, the sender is blocked until there is free space in the queue. In this way data is never lost.
In the following example the CAPACITY
has been reduced to 1, forcing sender tasks to wait until the data in the channel has been received.
//! examples/async-channel-done.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {}
const CAPACITY: usize = 1;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
sender1::spawn(s.clone()).unwrap();
sender2::spawn(s.clone()).unwrap();
sender3::spawn(s).unwrap();
(Shared {}, Local {})
}
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
while let Ok(val) = receiver.recv().await {
hprintln!("Receiver got: {}", val);
if val == 3 {
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
}
}
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1");
sender.send(1).await.unwrap();
hprintln!("Sender 1 done");
}
#[task]
async fn sender2(_c: sender2::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 2 sending: 2");
sender.send(2).await.unwrap();
hprintln!("Sender 2 done");
}
#[task]
async fn sender3(_c: sender3::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 3 sending: 3");
sender.send(3).await.unwrap();
hprintln!("Sender 3 done");
}
}
Looking at the output, we find that Sender 2
will wait until the data sent by Sender 1
as been received.
NOTICE Software tasks at the same priority are executed asynchronously to each other, thus NO strict order can be assumed. (The presented order here applies only to the current implementation, and may change between RTIC framework releases.)
$ cargo xtask qemu --verbose --example async-channel-done --features test-critical-section
Sender 1 sending: 1
Sender 1 done
Sender 2 sending: 2
Sender 3 sending: 3
Receiver got: 1
Sender 2 done
Receiver got: 2
Sender 3 done
Receiver got: 3
Error handling
In case all senders have been dropped await
-ing on an empty receiver channel results in an error. This allows to gracefully implement different types of shutdown operations.
//! examples/async-channel-no-sender.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {}
const CAPACITY: usize = 1;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (_s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
(Shared {}, Local {})
}
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
hprintln!("Receiver got: {:?}", receiver.recv().await);
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
}
$ cargo xtask qemu --verbose --example async-channel-no-sender --features test-critical-section
Receiver got: Err(NoSender)
Similarly, await
-ing on a send channel results in an error in case the receiver has been dropped. This allows to gracefully implement application level error handling.
The resulting error returns the data back to the sender, allowing the sender to take appropriate action (e.g., storing the data to later retry sending it).
//! examples/async-channel-no-receiver.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {}
const CAPACITY: usize = 1;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, _r) = make_channel!(u32, CAPACITY);
sender1::spawn(s.clone()).unwrap();
(Shared {}, Local {})
}
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1 {:?}", sender.send(1).await);
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
}
$ cargo xtask qemu --verbose --example async-channel-no-receiver --features test-critical-section
Sender 1 sending: 1 Err(NoReceiver(1))
Try API
Using the Try API, you can send or receive data from or to a channel without requiring that the operation succeeds, and in non-async
contexts.
This API is exposed through Receiver::try_recv
and Sender::try_send
.
//! examples/async-channel-try.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {
sender: Sender<'static, u32, CAPACITY>,
}
const CAPACITY: usize = 1;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
sender1::spawn(s.clone()).unwrap();
(Shared {}, Local { sender: s.clone() })
}
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
while let Ok(val) = receiver.recv().await {
hprintln!("Receiver got: {}", val);
}
}
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1");
sender.send(1).await.unwrap();
hprintln!("Sender 1 try sending: 2 {:?}", sender.try_send(2));
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
// This interrupt is never triggered, but is used to demonstrate that
// one can (try to) send data into a channel from a hardware task.
#[task(binds = GPIOA, local = [sender])]
fn hw_task(cx: hw_task::Context) {
cx.local.sender.try_send(3).ok();
}
}
$ cargo xtask qemu --verbose --example async-channel-try --features test-critical-section
Sender 1 sending: 1
Sender 1 try sending: 2 Err(Full(2))