heapless/
mpmc.rs

1//! A fixed capacity Multiple-Producer Multiple-Consumer (MPMC) lock-free queue
2//!
3//! NOTE: This module is not available on targets that do *not* support CAS operations and are not
4//! emulated by the [`atomic_polyfill`](https://crates.io/crates/atomic-polyfill) crate (e.g.,
5//! MSP430).
6//!
7//! # Example
8//!
9//! This queue can be constructed in "const context". Placing it in a `static` variable lets *all*
10//! contexts (interrupts / threads / `main`) safely enqueue and dequeue items from it.
11//!
12//! ``` ignore
13//! #![no_main]
14//! #![no_std]
15//!
16//! use panic_semihosting as _;
17//!
18//! use cortex_m::{asm, peripheral::syst::SystClkSource};
19//! use cortex_m_rt::{entry, exception};
20//! use cortex_m_semihosting::hprintln;
21//! use heapless::mpmc::Q2;
22//!
23//! static Q: Q2<u8> = Q2::new();
24//!
25//! #[entry]
26//! fn main() -> ! {
27//!     if let Some(p) = cortex_m::Peripherals::take() {
28//!         let mut syst = p.SYST;
29//!
30//!         // configures the system timer to trigger a SysTick exception every second
31//!         syst.set_clock_source(SystClkSource::Core);
32//!         syst.set_reload(12_000_000);
33//!         syst.enable_counter();
34//!         syst.enable_interrupt();
35//!     }
36//!
37//!     loop {
38//!         if let Some(x) = Q.dequeue() {
39//!             hprintln!("{}", x).ok();
40//!         } else {
41//!             asm::wfi();
42//!         }
43//!     }
44//! }
45//!
46//! #[exception]
47//! fn SysTick() {
48//!     static mut COUNT: u8 = 0;
49//!
50//!     Q.enqueue(*COUNT).ok();
51//!     *COUNT += 1;
52//! }
53//! ```
54//!
55//! # Benchmark
56//!
57//! Measured on a ARM Cortex-M3 core running at 8 MHz and with zero Flash wait cycles
58//!
59//! N| `Q8::<u8>::enqueue().ok()` (`z`) | `Q8::<u8>::dequeue()` (`z`) |
60//! -|----------------------------------|-----------------------------|
61//! 0|34                                |35                           |
62//! 1|52                                |53                           |
63//! 2|69                                |71                           |
64//!
65//! - `N` denotes the number of *interruptions*. On Cortex-M, an interruption consists of an
66//!   interrupt handler preempting the would-be atomic section of the `enqueue` / `dequeue`
67//!   operation. Note that it does *not* matter if the higher priority handler uses the queue or
68//!   not.
69//! - All execution times are in clock cycles. 1 clock cycle = 125 ns.
70//! - Execution time is *dependent* of `mem::size_of::<T>()`. Both operations include one
71//! `memcpy(T)` in their successful path.
72//! - The optimization level is indicated in parentheses.
73//! - The numbers reported correspond to the successful path (i.e. `Some` is returned by `dequeue`
74//! and `Ok` is returned by `enqueue`).
75//!
76//! # Portability
77//!
78//! This module requires CAS atomic instructions which are not available on all architectures
79//! (e.g.  ARMv6-M (`thumbv6m-none-eabi`) and MSP430 (`msp430-none-elf`)). These atomics can be
80//! emulated however with [`atomic_polyfill`](https://crates.io/crates/atomic-polyfill), which is
81//! enabled with the `cas` feature and is enabled by default for `thumbv6m-none-eabi` and `riscv32`
82//! targets. MSP430 is currently not supported by
83//! [`atomic_polyfill`](https://crates.io/crates/atomic-polyfill).
84//!
85//! # References
86//!
87//! This is an implementation of Dmitry Vyukov's ["Bounded MPMC queue"][0] minus the cache padding.
88//!
89//! [0]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
90
91use core::{cell::UnsafeCell, mem::MaybeUninit};
92
93#[cfg(all(feature = "mpmc_large", not(cas_atomic_polyfill)))]
94type AtomicTargetSize = core::sync::atomic::AtomicUsize;
95#[cfg(all(feature = "mpmc_large", cas_atomic_polyfill))]
96type AtomicTargetSize = atomic_polyfill::AtomicUsize;
97#[cfg(all(not(feature = "mpmc_large"), not(cas_atomic_polyfill)))]
98type AtomicTargetSize = core::sync::atomic::AtomicU8;
99#[cfg(all(not(feature = "mpmc_large"), cas_atomic_polyfill))]
100type AtomicTargetSize = atomic_polyfill::AtomicU8;
101
102#[cfg(not(cas_atomic_polyfill))]
103type Ordering = core::sync::atomic::Ordering;
104#[cfg(cas_atomic_polyfill)]
105type Ordering = atomic_polyfill::Ordering;
106
107#[cfg(feature = "mpmc_large")]
108type IntSize = usize;
109#[cfg(not(feature = "mpmc_large"))]
110type IntSize = u8;
111
112/// MPMC queue with a capability for 2 elements.
113pub type Q2<T> = MpMcQueue<T, 2>;
114
115/// MPMC queue with a capability for 4 elements.
116pub type Q4<T> = MpMcQueue<T, 4>;
117
118/// MPMC queue with a capability for 8 elements.
119pub type Q8<T> = MpMcQueue<T, 8>;
120
121/// MPMC queue with a capability for 16 elements.
122pub type Q16<T> = MpMcQueue<T, 16>;
123
124/// MPMC queue with a capability for 32 elements.
125pub type Q32<T> = MpMcQueue<T, 32>;
126
127/// MPMC queue with a capability for 64 elements.
128pub type Q64<T> = MpMcQueue<T, 64>;
129
130/// MPMC queue with a capacity for N elements
131/// N must be a power of 2
132/// The max value of N is u8::MAX - 1 if `mpmc_large` feature is not enabled.
133pub struct MpMcQueue<T, const N: usize> {
134    buffer: UnsafeCell<[Cell<T>; N]>,
135    dequeue_pos: AtomicTargetSize,
136    enqueue_pos: AtomicTargetSize,
137}
138
139impl<T, const N: usize> MpMcQueue<T, N> {
140    const MASK: IntSize = (N - 1) as IntSize;
141    const EMPTY_CELL: Cell<T> = Cell::new(0);
142
143    const ASSERT: [(); 1] = [()];
144
145    /// Creates an empty queue
146    pub const fn new() -> Self {
147        // Const assert
148        crate::sealed::greater_than_1::<N>();
149        crate::sealed::power_of_two::<N>();
150
151        // Const assert on size.
152        Self::ASSERT[!(N < (IntSize::MAX as usize)) as usize];
153
154        let mut cell_count = 0;
155
156        let mut result_cells: [Cell<T>; N] = [Self::EMPTY_CELL; N];
157        while cell_count != N {
158            result_cells[cell_count] = Cell::new(cell_count);
159            cell_count += 1;
160        }
161
162        Self {
163            buffer: UnsafeCell::new(result_cells),
164            dequeue_pos: AtomicTargetSize::new(0),
165            enqueue_pos: AtomicTargetSize::new(0),
166        }
167    }
168
169    /// Returns the item in the front of the queue, or `None` if the queue is empty
170    pub fn dequeue(&self) -> Option<T> {
171        unsafe { dequeue(self.buffer.get() as *mut _, &self.dequeue_pos, Self::MASK) }
172    }
173
174    /// Adds an `item` to the end of the queue
175    ///
176    /// Returns back the `item` if the queue is full
177    pub fn enqueue(&self, item: T) -> Result<(), T> {
178        unsafe {
179            enqueue(
180                self.buffer.get() as *mut _,
181                &self.enqueue_pos,
182                Self::MASK,
183                item,
184            )
185        }
186    }
187}
188
189impl<T, const N: usize> Default for MpMcQueue<T, N> {
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
195unsafe impl<T, const N: usize> Sync for MpMcQueue<T, N> where T: Send {}
196
197struct Cell<T> {
198    data: MaybeUninit<T>,
199    sequence: AtomicTargetSize,
200}
201
202impl<T> Cell<T> {
203    const fn new(seq: usize) -> Self {
204        Self {
205            data: MaybeUninit::uninit(),
206            sequence: AtomicTargetSize::new(seq as IntSize),
207        }
208    }
209}
210
211unsafe fn dequeue<T>(
212    buffer: *mut Cell<T>,
213    dequeue_pos: &AtomicTargetSize,
214    mask: IntSize,
215) -> Option<T> {
216    let mut pos = dequeue_pos.load(Ordering::Relaxed);
217
218    let mut cell;
219    loop {
220        cell = buffer.add(usize::from(pos & mask));
221        let seq = (*cell).sequence.load(Ordering::Acquire);
222        let dif = (seq as i8).wrapping_sub((pos.wrapping_add(1)) as i8);
223
224        if dif == 0 {
225            if dequeue_pos
226                .compare_exchange_weak(
227                    pos,
228                    pos.wrapping_add(1),
229                    Ordering::Relaxed,
230                    Ordering::Relaxed,
231                )
232                .is_ok()
233            {
234                break;
235            }
236        } else if dif < 0 {
237            return None;
238        } else {
239            pos = dequeue_pos.load(Ordering::Relaxed);
240        }
241    }
242
243    let data = (*cell).data.as_ptr().read();
244    (*cell)
245        .sequence
246        .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
247    Some(data)
248}
249
250unsafe fn enqueue<T>(
251    buffer: *mut Cell<T>,
252    enqueue_pos: &AtomicTargetSize,
253    mask: IntSize,
254    item: T,
255) -> Result<(), T> {
256    let mut pos = enqueue_pos.load(Ordering::Relaxed);
257
258    let mut cell;
259    loop {
260        cell = buffer.add(usize::from(pos & mask));
261        let seq = (*cell).sequence.load(Ordering::Acquire);
262        let dif = (seq as i8).wrapping_sub(pos as i8);
263
264        if dif == 0 {
265            if enqueue_pos
266                .compare_exchange_weak(
267                    pos,
268                    pos.wrapping_add(1),
269                    Ordering::Relaxed,
270                    Ordering::Relaxed,
271                )
272                .is_ok()
273            {
274                break;
275            }
276        } else if dif < 0 {
277            return Err(item);
278        } else {
279            pos = enqueue_pos.load(Ordering::Relaxed);
280        }
281    }
282
283    (*cell).data.as_mut_ptr().write(item);
284    (*cell)
285        .sequence
286        .store(pos.wrapping_add(1), Ordering::Release);
287    Ok(())
288}
289
290#[cfg(test)]
291mod tests {
292    use super::Q2;
293
294    #[test]
295    fn sanity() {
296        let q = Q2::new();
297        q.enqueue(0).unwrap();
298        q.enqueue(1).unwrap();
299        assert!(q.enqueue(2).is_err());
300
301        assert_eq!(q.dequeue(), Some(0));
302        assert_eq!(q.dequeue(), Some(1));
303        assert_eq!(q.dequeue(), None);
304    }
305
306    #[test]
307    fn drain_at_pos255() {
308        let q = Q2::new();
309        for _ in 0..255 {
310            assert!(q.enqueue(0).is_ok());
311            assert_eq!(q.dequeue(), Some(0));
312        }
313        // this should not block forever
314        assert_eq!(q.dequeue(), None);
315    }
316
317    #[test]
318    fn full_at_wrapped_pos0() {
319        let q = Q2::new();
320        for _ in 0..254 {
321            assert!(q.enqueue(0).is_ok());
322            assert_eq!(q.dequeue(), Some(0));
323        }
324        assert!(q.enqueue(0).is_ok());
325        assert!(q.enqueue(0).is_ok());
326        // this should not block forever
327        assert!(q.enqueue(0).is_err());
328    }
329}