rtic_sync/
signal.rs

1//! A "latest only" value store with unlimited writers and async waiting.
2
3use core::{cell::UnsafeCell, future::poll_fn, task::Poll};
4use rtic_common::waker_registration::CriticalSectionWakerRegistration;
5
/// Internal slot state of a signal.
///
/// Plays the same role as an `Option`: it records whether a value
/// is currently stored or not.
#[derive(Copy, Clone)]
enum Store<T> {
    /// A value is currently stored.
    Set(T),
    /// Nothing is currently stored.
    Unset,
}
13
14/// A "latest only" value store with unlimited writers and async waiting.
15pub struct Signal<T: Copy> {
16    waker: CriticalSectionWakerRegistration,
17    store: UnsafeCell<Store<T>>,
18}
19
20impl<T> core::fmt::Debug for Signal<T>
21where
22    T: core::marker::Copy,
23{
24    fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
25        fmt.write_fmt(format_args!(
26            "Signal<{}>{{ .. }}",
27            core::any::type_name::<T>()
28        ))
29    }
30}
31
32impl<T: Copy> Default for Signal<T> {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38unsafe impl<T: Copy> Send for Signal<T> {}
39unsafe impl<T: Copy> Sync for Signal<T> {}
40
41impl<T: Copy> Signal<T> {
42    /// Create a new signal.
43    pub const fn new() -> Self {
44        Self {
45            waker: CriticalSectionWakerRegistration::new(),
46            store: UnsafeCell::new(Store::Unset),
47        }
48    }
49
50    /// Split the signal into a writer and reader.
51    pub fn split(&self) -> (SignalWriter<T>, SignalReader<T>) {
52        (SignalWriter { parent: self }, SignalReader { parent: self })
53    }
54}
55
56/// Facilitates the writing of values to a Signal.
57#[derive(Clone)]
58pub struct SignalWriter<'a, T: Copy> {
59    parent: &'a Signal<T>,
60}
61
62impl<T> core::fmt::Debug for SignalWriter<'_, T>
63where
64    T: core::marker::Copy,
65{
66    fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
67        fmt.write_fmt(format_args!(
68            "SignalWriter<{}>{{ .. }}",
69            core::any::type_name::<T>()
70        ))
71    }
72}
73
74impl<T: Copy> SignalWriter<'_, T> {
75    /// Write a raw Store value to the Signal.
76    fn write_inner(&mut self, value: Store<T>) {
77        critical_section::with(|_| {
78            // SAFETY: in a cs: exclusive access
79            unsafe { self.parent.store.get().replace(value) };
80        });
81
82        self.parent.waker.wake();
83    }
84
85    /// Write a value to the Signal.
86    pub fn write(&mut self, value: T) {
87        self.write_inner(Store::Set(value));
88    }
89
90    /// Clear the stored value in the Signal (if any).
91    pub fn clear(&mut self) {
92        self.write_inner(Store::Unset);
93    }
94}
95
96/// Facilitates the async reading of values from the Signal.
97pub struct SignalReader<'a, T: Copy> {
98    parent: &'a Signal<T>,
99}
100
101impl<T> core::fmt::Debug for SignalReader<'_, T>
102where
103    T: core::marker::Copy,
104{
105    fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
106        fmt.write_fmt(format_args!(
107            "SignalReader<{}>{{ .. }}",
108            core::any::type_name::<T>()
109        ))
110    }
111}
112
113impl<T: Copy> SignalReader<'_, T> {
114    /// Immediately read and evict the latest value stored in the Signal.
115    fn take(&mut self) -> Store<T> {
116        critical_section::with(|_| {
117            // SAFETY: in a cs: exclusive access
118            unsafe { self.parent.store.get().replace(Store::Unset) }
119        })
120    }
121
122    /// Returns a pending value if present, or None if no value is available.
123    ///
124    /// Upon read, the stored value is evicted.
125    pub fn try_read(&mut self) -> Option<T> {
126        match self.take() {
127            Store::Unset => None,
128            Store::Set(value) => Some(value),
129        }
130    }
131
132    /// Wait for a new value to be written and read it.
133    ///
134    /// If a value is already pending it will be returned immediately.
135    ///
136    /// Upon read, the stored value is evicted.
137    pub async fn wait(&mut self) -> T {
138        poll_fn(|ctx| {
139            self.parent.waker.register(ctx.waker());
140            match self.take() {
141                Store::Unset => Poll::Pending,
142                Store::Set(value) => Poll::Ready(value),
143            }
144        })
145        .await
146    }
147
148    /// Wait for a new value to be written and read it.
149    ///
150    /// If a value is already pending, it will be evicted and a new
151    /// value must be written for the wait to resolve.
152    ///
153    /// Upon read, the stored value is evicted.
154    pub async fn wait_fresh(&mut self) -> T {
155        self.take();
156        self.wait().await
157    }
158}
159
/// Declares a `static` signal for the given value type and evaluates to its
/// `(writer, reader)` pair, both borrowing the signal for `'static`.
///
/// The `static` lives inside the block the macro expands to.
#[macro_export]
macro_rules! make_signal {
    ($Value:ty) => {{
        static SIGNAL: $crate::signal::Signal<$Value> = $crate::signal::Signal::<$Value>::new();

        SIGNAL.split()
    }};
}
169
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use static_cell::StaticCell;

    /// A signal that was never written to yields nothing.
    #[test]
    fn empty() {
        let (_writer, mut reader) = make_signal!(u32);

        assert_eq!(reader.try_read(), None);
    }

    /// A written value can be read back.
    #[test]
    fn ping_pong() {
        let (mut writer, mut reader) = make_signal!(u32);

        writer.write(0xde);
        assert_eq!(reader.try_read(), Some(0xde));
    }

    /// Only the last of several writes is observable.
    #[test]
    fn latest() {
        let (mut writer, mut reader) = make_signal!(u32);

        for value in [0xde, 0xad, 0xbe, 0xef] {
            writer.write(value);
        }
        assert_eq!(reader.try_read(), Some(0xef));
    }

    /// Reading removes the value from the signal.
    #[test]
    fn consumption() {
        let (mut writer, mut reader) = make_signal!(u32);

        writer.write(0xaa);
        assert_eq!(reader.try_read(), Some(0xaa));
        assert_eq!(reader.try_read(), None);
    }

    /// `wait` resolves with a value that was written beforehand.
    #[tokio::test]
    async fn pending() {
        let (mut writer, mut reader) = make_signal!(u32);

        writer.write(0xaa);

        let received = reader.wait().await;
        assert_eq!(received, 0xaa);
    }

    /// `wait_fresh` ignores a value written beforehand and resolves with the next one.
    #[tokio::test]
    async fn waiting() {
        static READER: StaticCell<SignalReader<u32>> = StaticCell::new();
        let (mut writer, reader) = make_signal!(u32);

        writer.write(0xaa);

        let reader = READER.init(reader);
        let handle = tokio::spawn(reader.wait_fresh());

        // Give the executor a chance to poll the reader future once.
        tokio::task::yield_now().await;
        // The stale value must not have resolved it.
        assert!(!handle.is_finished());

        writer.write(0xab);

        assert!(matches!(handle.await, Ok(0xab)));
    }
}