// rtic_sync/channel.rs

1//! An async aware MPSC channel that can be used on no-alloc systems.
2
3use crate::unsafecell::UnsafeCell;
4use core::{
5    future::poll_fn,
6    mem::MaybeUninit,
7    pin::Pin,
8    ptr,
9    sync::atomic::{fence, Ordering},
10    task::{Poll, Waker},
11};
12#[doc(hidden)]
13pub use critical_section;
14use heapless::Deque;
15use rtic_common::{
16    dropper::OnDrop, wait_queue::DoublyLinkedList, wait_queue::Link,
17    waker_registration::CriticalSectionWakerRegistration as WakerRegistration,
18};
19
20#[cfg(feature = "defmt-03")]
21use crate::defmt;
22
23type WaitQueueData = (Waker, SlotPtr);
24type WaitQueue = DoublyLinkedList<WaitQueueData>;
25
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
///
/// This channel uses critical sections, however these are extremely small and all `memcpy`
/// operations of `T` are done without critical sections.
pub struct Channel<T, const N: usize> {
    // Here are all indexes that are not used in `slots` and ready to be allocated.
    freeq: UnsafeCell<Deque<u8, N>>,
    // Here are wakers and indexes to slots that are ready to be dequeued by the receiver.
    readyq: UnsafeCell<Deque<u8, N>>,
    // Waker for the receiver.
    receiver_waker: WakerRegistration,
    // Storage for N `T`s, so we don't memcpy around a lot of `T`s.
    slots: [UnsafeCell<MaybeUninit<T>>; N],
    // If there is no room in the queue a `Sender` can wait for there to be space in the queue.
    wait_queue: WaitQueue,
    // Keep track of the receiver.
    receiver_dropped: UnsafeCell<bool>,
    // Keep track of the number of senders.
    num_senders: UnsafeCell<usize>,
}
46
// SAFETY: access to the interior `UnsafeCell`s is serialized through critical
// sections (see the `cs_access!`-generated accessors below).
unsafe impl<T, const N: usize> Send for Channel<T, N> {}

// SAFETY: same argument as for `Send` — shared access is mediated by critical sections.
unsafe impl<T, const N: usize> Sync for Channel<T, N> {}
50
/// Generates an accessor method named `$name` that hands out `&mut` access to the
/// `UnsafeCell` field of the same name, gated on a held critical section.
macro_rules! cs_access {
    ($name:ident, $type:ty) => {
        /// Access the value mutably.
        ///
        /// SAFETY: this function must not be called recursively within `f`.
        unsafe fn $name<F, R>(&self, _cs: critical_section::CriticalSection, f: F) -> R
        where
            F: FnOnce(&mut $type) -> R,
        {
            let v = self.$name.get_mut();
            // SAFETY: we have exclusive access due to the critical section.
            let v = unsafe { v.deref() };
            f(v)
        }
    };
}
67
impl<T, const N: usize> Default for Channel<T, N> {
    /// Equivalent to [`Channel::new`].
    fn default() -> Self {
        Self::new()
    }
}
73
impl<T, const N: usize> Channel<T, N> {
    // Slot indexes are stored in `u8` deques (`freeq`/`readyq`), so the capacity
    // must fit in one byte.
    const _CHECK: () = assert!(N < 256, "This queue support a maximum of 255 entries");

    /// Create a new channel.
    #[cfg(not(loom))]
    pub const fn new() -> Self {
        Self {
            freeq: UnsafeCell::new(Deque::new()),
            readyq: UnsafeCell::new(Deque::new()),
            receiver_waker: WakerRegistration::new(),
            slots: [const { UnsafeCell::new(MaybeUninit::uninit()) }; N],
            wait_queue: WaitQueue::new(),
            receiver_dropped: UnsafeCell::new(false),
            num_senders: UnsafeCell::new(0),
        }
    }

    /// Create a new channel.
    ///
    /// Non-`const` loom variant — NOTE(review): presumably because loom's cell types
    /// cannot be constructed in const context; confirm against loom's docs.
    #[cfg(loom)]
    pub fn new() -> Self {
        Self {
            freeq: UnsafeCell::new(Deque::new()),
            readyq: UnsafeCell::new(Deque::new()),
            receiver_waker: WakerRegistration::new(),
            slots: core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit())),
            wait_queue: WaitQueue::new(),
            receiver_dropped: UnsafeCell::new(false),
            num_senders: UnsafeCell::new(0),
        }
    }

    /// Split the queue into a `Sender`/`Receiver` pair.
    pub fn split(&mut self) -> (Sender<'_, T, N>, Receiver<'_, T, N>) {
        let freeq = self.freeq.as_mut();

        // Fill free queue: initially every slot index is available to senders.
        for idx in 0..N as u8 {
            // NOTE(assert): `split`-ing does not put `freeq` into a known-empty
            // state, so `debug_assert` is not good enough.
            assert!(!freeq.is_full());

            // SAFETY: This safe as the loop goes from 0 to the capacity of the underlying queue.
            unsafe {
                freeq.push_back_unchecked(idx);
            }
        }

        debug_assert!(freeq.is_full());

        // There is now 1 sender
        *self.num_senders.as_mut() = 1;

        (Sender(self), Receiver(self))
    }

    // Critical-section-guarded accessors for the `UnsafeCell` fields above.
    cs_access!(freeq, Deque<u8, N>);
    cs_access!(readyq, Deque<u8, N>);
    cs_access!(receiver_dropped, bool);
    cs_access!(num_senders, usize);

    /// Return free slot `slot` to the channel.
    ///
    /// This will do one of two things:
    /// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`.
    /// 2. else, insert `slot` into `self.freeq`.
    ///
    /// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`].
    unsafe fn return_free_slot(&self, slot: u8) {
        critical_section::with(|cs| {
            // See also the matching `fence(Ordering::SeqCst)` in `Sender::send_footer`.
            fence(Ordering::SeqCst);

            // If someone is waiting in the `wait_queue`, wake the first one up & hand it the free slot.
            if let Some((wait_head, mut freeq_slot)) = self.wait_queue.pop() {
                // SAFETY: `freeq_slot` is valid for writes: we are in a critical
                // section & the `SlotPtr` lives for at least the duration of the wait queue link.
                unsafe { freeq_slot.replace(Some(slot), cs) };
                wait_head.wake();
            } else {
                // SAFETY: `self.freeq` is not called recursively.
                unsafe {
                    self.freeq(cs, |freeq| {
                        assert!(!freeq.is_full());
                        // SAFETY: `freeq` is not full.
                        freeq.push_back_unchecked(slot);
                    });
                }
            }
        })
    }
}
164
/// Creates a split channel with `'static` lifetime.
#[macro_export]
#[cfg(not(loom))]
macro_rules! make_channel {
    ($type:ty, $size:expr) => {{
        static mut CHANNEL: $crate::channel::Channel<$type, $size> =
            $crate::channel::Channel::new();

        // Guards against the same expansion being executed twice, which would hand
        // out a second mutable reference to `CHANNEL`.
        static CHECK: $crate::portable_atomic::AtomicU8 = $crate::portable_atomic::AtomicU8::new(0);

        // The load+store pair is race-free because it runs inside a critical section.
        $crate::channel::critical_section::with(|_| {
            if CHECK.load(::core::sync::atomic::Ordering::Relaxed) != 0 {
                panic!("call to the same `make_channel` instance twice");
            }

            CHECK.store(1, ::core::sync::atomic::Ordering::Relaxed);
        });

        // SAFETY: This is safe as we hide the static mut from others to access it.
        // Only this point is where the mutable access happens.
        #[allow(static_mut_refs)]
        unsafe {
            CHANNEL.split()
        }
    }};
}
191
192// -------- Sender
193
/// Error state for when the receiver has been dropped.
#[cfg_attr(feature = "defmt-03", derive(defmt::Format))]
pub struct NoReceiver<T>(pub T);

/// Errors that [`Sender::try_send`] can have.
#[cfg_attr(feature = "defmt-03", derive(defmt::Format))]
pub enum TrySendError<T> {
    /// Error state for when the receiver has been dropped.
    NoReceiver(T),
    /// Error state when the queue is full.
    Full(T),
}
206
207impl<T> core::fmt::Debug for NoReceiver<T>
208where
209    T: core::fmt::Debug,
210{
211    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
212        write!(f, "NoReceiver({:?})", self.0)
213    }
214}
215
216impl<T> core::fmt::Debug for TrySendError<T>
217where
218    T: core::fmt::Debug,
219{
220    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
221        match self {
222            TrySendError::NoReceiver(v) => write!(f, "NoReceiver({v:?})"),
223            TrySendError::Full(v) => write!(f, "Full({v:?})"),
224        }
225    }
226}
227
228impl<T> PartialEq for TrySendError<T>
229where
230    T: PartialEq,
231{
232    fn eq(&self, other: &Self) -> bool {
233        match (self, other) {
234            (TrySendError::NoReceiver(v1), TrySendError::NoReceiver(v2)) => v1.eq(v2),
235            (TrySendError::NoReceiver(_), TrySendError::Full(_)) => false,
236            (TrySendError::Full(_), TrySendError::NoReceiver(_)) => false,
237            (TrySendError::Full(v1), TrySendError::Full(v2)) => v1.eq(v2),
238        }
239    }
240}
241
/// A `Sender` can send to the channel and can be cloned.
pub struct Sender<'a, T, const N: usize>(&'a Channel<T, N>);

// SAFETY: the referenced `Channel` is `Sync`, so the sender may move between threads.
unsafe impl<T, const N: usize> Send for Sender<'_, T, N> {}
246
/// This is needed to make the async closure in `send` accept that we "share"
/// the link possible between threads.
#[derive(Clone)]
struct LinkPtr(*mut Option<Link<WaitQueueData>>);

impl LinkPtr {
    /// This will dereference the pointer stored within and give out an `&mut`.
    ///
    /// SAFETY (caller): the pointee must be alive and no other `&mut` to it may exist.
    unsafe fn get(&mut self) -> &mut Option<Link<WaitQueueData>> {
        &mut *self.0
    }
}

// SAFETY: `send` coordinates all cross-thread access to the pointee (see the
// SAFETY comments at each `get` call site).
unsafe impl Send for LinkPtr {}

// SAFETY: as above.
unsafe impl Sync for LinkPtr {}
262
/// This is needed to make the async closure in `send` accept that we "share"
/// the link possible between threads.
#[derive(Clone)]
struct SlotPtr(*mut Option<u8>);

impl SlotPtr {
    /// Replace the value of this slot with `new_value`, and return
    /// the old value.
    ///
    /// SAFETY: the pointer in this `SlotPtr` must be valid for writes.
    unsafe fn replace(
        &mut self,
        new_value: Option<u8>,
        _cs: critical_section::CriticalSection,
    ) -> Option<u8> {
        // SAFETY: the critical section guarantees exclusive access, and the
        // caller guarantees that the pointer is valid.
        self.replace_exclusive(new_value)
    }

    /// Replace the value of this slot with `new_value`, and return
    /// the old value.
    ///
    /// SAFETY: the pointer in this `SlotPtr` must be valid for writes, and the caller must guarantee exclusive
    /// access to the underlying value.
    unsafe fn replace_exclusive(&mut self, new_value: Option<u8>) -> Option<u8> {
        // SAFETY: the caller has ensured that we have exclusive access & that
        // the pointer is valid.
        unsafe { core::ptr::replace(self.0, new_value) }
    }
}

// SAFETY: `send` and `return_free_slot` coordinate all cross-thread access to the
// pointee (see the SAFETY comments at each `replace`/`replace_exclusive` call site).
unsafe impl Send for SlotPtr {}

// SAFETY: as above.
unsafe impl Sync for SlotPtr {}
298
299impl<T, const N: usize> core::fmt::Debug for Sender<'_, T, N> {
300    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
301        write!(f, "Sender")
302    }
303}
304
// defmt counterpart of the `Debug` impl above.
#[cfg(feature = "defmt-03")]
impl<T, const N: usize> defmt::Format for Sender<'_, T, N> {
    fn format(&self, f: defmt::Formatter) {
        defmt::write!(f, "Sender",)
    }
}
311
impl<T, const N: usize> Sender<'_, T, N> {
    /// Complete a send: write `val` into slot `idx`, publish `idx` to the ready
    /// queue, and wake the receiver.
    #[inline(always)]
    fn send_footer(&mut self, idx: u8, val: T) {
        // Write the value to the slots, note; this memcpy is not under a critical section.
        unsafe {
            let first_element = self.0.slots.get_unchecked(idx as usize).get_mut();
            let ptr = first_element.deref().as_mut_ptr();
            ptr::write(ptr, val)
        }

        // Write the value into the ready queue.
        critical_section::with(|cs| {
            // SAFETY: `self.0.readyq` is not called recursively.
            unsafe {
                self.0.readyq(cs, |readyq| {
                    assert!(!readyq.is_full());
                    // SAFETY: ready is not full.
                    readyq.push_back_unchecked(idx);
                });
            }
        });

        // Make the slot write above visible before waking the receiver.
        fence(Ordering::SeqCst);

        // If there is a receiver waker, wake it.
        self.0.receiver_waker.wake();
    }

    /// Try to send a value, non-blocking. If the channel is full this will return an error.
    pub fn try_send(&mut self, val: T) -> Result<(), TrySendError<T>> {
        // If the wait queue is not empty, we can't try to push into the queue.
        // (Waiting senders have priority for any freed slot.)
        if !self.0.wait_queue.is_empty() {
            return Err(TrySendError::Full(val));
        }

        // No receiver available.
        if self.is_closed() {
            return Err(TrySendError::NoReceiver(val));
        }

        let free_slot = critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.freeq` is not called recursively.
            self.0.freeq(cs, |q| q.pop_front())
        });

        let idx = if let Some(idx) = free_slot {
            idx
        } else {
            return Err(TrySendError::Full(val));
        };

        self.send_footer(idx, val);

        Ok(())
    }

    /// Send a value. If there is no place left in the queue this will wait until there is.
    /// If the receiver does not exist this will return an error.
    pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> {
        // `free_slot_ptr` receives a slot index from `return_free_slot`; `link_ptr`
        // holds our entry in the channel's wait queue while we are parked.
        let mut free_slot_ptr: Option<u8> = None;
        let mut link_ptr: Option<Link<WaitQueueData>> = None;

        // Make this future `Drop`-safe.
        // SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
        let mut link_ptr = LinkPtr(core::ptr::addr_of_mut!(link_ptr));
        // SAFETY(freed_slot): Shadow the original definition of `free_slot_ptr` so we can't abuse it.
        let mut free_slot_ptr = SlotPtr(core::ptr::addr_of_mut!(free_slot_ptr));

        let mut link_ptr2 = link_ptr.clone();
        let mut free_slot_ptr2 = free_slot_ptr.clone();
        // Cleanup that runs when this future is dropped (including cancellation).
        let dropper = OnDrop::new(|| {
            // SAFETY: We only run this closure and dereference the pointer if we have
            // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
            // of this pointer is in the `poll_fn`.
            if let Some(link) = unsafe { link_ptr2.get() } {
                link.remove_from_list(&self.0.wait_queue);
            }

            // Return our potentially-unused free slot.
            // Since we are certain that our link has been removed from the list (either
            // pop-ed or removed just above), we have exclusive access to the free slot pointer.
            if let Some(freed_slot) = unsafe { free_slot_ptr2.replace_exclusive(None) } {
                // SAFETY: freed slot is passed to us from `return_free_slot`, which either
                // directly (through `try_recv`), or indirectly (through another `return_free_slot`)
                // comes from `readyq`.
                unsafe { self.0.return_free_slot(freed_slot) };
            }
        });

        let idx = poll_fn(|cx| {
            //  Do all this in one critical section, else there can be race conditions
            critical_section::with(|cs| {
                if self.is_closed() {
                    return Poll::Ready(Err(()));
                }

                let wq_empty = self.0.wait_queue.is_empty();
                // SAFETY: `self.0.freeq` is not called recursively.
                let freeq_empty = unsafe { self.0.freeq(cs, |q| q.is_empty()) };

                // SAFETY: This pointer is only dereferenced here and on drop of the future
                // which happens outside this `poll_fn`'s stack frame.
                let link = unsafe { link_ptr.get() };

                // We are already in the wait queue.
                if let Some(queue_link) = link {
                    if queue_link.is_popped() {
                        // SAFETY: `free_slot_ptr` is valid for writes until the end of this future.
                        let slot = unsafe { free_slot_ptr.replace(None, cs) };

                        // Our link was popped, so it is most definitely not in the list.
                        // We can safely & correctly `take` it to prevent ourselves from
                        // redundantly attempting to remove it from the list a 2nd time.
                        link.take();

                        // If our link is popped, then:
                        // 1. We were popped by `return_free_lot` and provided us with a slot.
                        // 2. We were popped by `Receiver::drop` and it did not provide us with a slot, and the channel is closed.
                        if let Some(slot) = slot {
                            Poll::Ready(Ok(slot))
                        } else {
                            Poll::Ready(Err(()))
                        }
                    } else {
                        Poll::Pending
                    }
                }
                // We are not in the wait queue, but others are, or there is currently no free
                // slot available.
                else if !wq_empty || freeq_empty {
                    // Place the link in the wait queue.
                    let link_ref =
                        link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));

                    // SAFETY(new_unchecked): The address to the link is stable as it is defined
                    // outside this stack frame.
                    // SAFETY(push): `link_ref` lifetime comes from `link_ptr` and `free_slot_ptr` that
                    // are shadowed and we make sure in `dropper` that the link is removed from the queue
                    // before dropping `link_ptr` AND `dropper` makes sure that the shadowed
                    // `ptr`s live until the end of the stack frame.
                    unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };

                    Poll::Pending
                }
                // We are not in the wait queue, no one else is waiting, and there is a free slot available.
                else {
                    // SAFETY: `self.0.freeq` is not called recursively.
                    unsafe {
                        self.0.freeq(cs, |freeq| {
                            assert!(!freeq.is_empty());
                            // SAFETY: `freeq` is non-empty
                            let slot = freeq.pop_back_unchecked();
                            Poll::Ready(Ok(slot))
                        })
                    }
                }
            })
        })
        .await;

        // Make sure the link is removed from the queue.
        drop(dropper);

        if let Ok(idx) = idx {
            self.send_footer(idx, val);

            Ok(())
        } else {
            Err(NoReceiver(val))
        }
    }

    /// Returns true if there is no `Receiver`s.
    pub fn is_closed(&self) -> bool {
        critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.receiver_dropped` is not called recursively.
            self.0.receiver_dropped(cs, |v| *v)
        })
    }

    /// Is the queue full.
    ///
    /// The queue is full exactly when no free slots remain.
    pub fn is_full(&self) -> bool {
        critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.freeq` is not called recursively.
            self.0.freeq(cs, |v| v.is_empty())
        })
    }

    /// Is the queue empty.
    ///
    /// The queue is empty exactly when every slot is free.
    pub fn is_empty(&self) -> bool {
        critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.freeq` is not called recursively.
            self.0.freeq(cs, |v| v.is_full())
        })
    }
}
508
impl<T, const N: usize> Drop for Sender<'_, T, N> {
    fn drop(&mut self) {
        // Count down the reference counter
        let num_senders = critical_section::with(|cs| {
            unsafe {
                // SAFETY: `self.0.num_senders` is not called recursively.
                self.0.num_senders(cs, |s| {
                    *s -= 1;
                    *s
                })
            }
        });

        // If there are no senders, wake the receiver to do error handling.
        if num_senders == 0 {
            self.0.receiver_waker.wake();
        }
    }
}
528
impl<T, const N: usize> Clone for Sender<'_, T, N> {
    fn clone(&self) -> Self {
        // Count up the reference counter
        critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.num_senders` is not called recursively.
            self.0.num_senders(cs, |v| *v += 1);
        });

        // All senders share the same channel reference.
        Self(self.0)
    }
}
540
541// -------- Receiver
542
/// A receiver of the channel. There can only be one receiver at any time.
pub struct Receiver<'a, T, const N: usize>(&'a Channel<T, N>);

// SAFETY: the referenced `Channel` is `Sync`, so the receiver may move between threads.
unsafe impl<T, const N: usize> Send for Receiver<'_, T, N> {}
547
548impl<T, const N: usize> core::fmt::Debug for Receiver<'_, T, N> {
549    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
550        write!(f, "Receiver")
551    }
552}
553
// defmt counterpart of the `Debug` impl above.
#[cfg(feature = "defmt-03")]
impl<T, const N: usize> defmt::Format for Receiver<'_, T, N> {
    fn format(&self, f: defmt::Formatter) {
        defmt::write!(f, "Receiver",)
    }
}
560
/// Possible receive errors.
#[cfg_attr(feature = "defmt-03", derive(defmt::Format))]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ReceiveError {
    /// Error state for when all senders have been dropped.
    NoSender,
    /// Error state for when the queue is empty.
    Empty,
}
570
impl<T, const N: usize> Receiver<'_, T, N> {
    /// Receives a value if there is one in the channel, non-blocking.
    pub fn try_recv(&mut self) -> Result<T, ReceiveError> {
        // Try to get a ready slot.
        let ready_slot = critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.readyq` is not called recursively.
            self.0.readyq(cs, |q| q.pop_front())
        });

        if let Some(rs) = ready_slot {
            // Read the value from the slots, note; this memcpy is not under a critical section.
            let r = unsafe {
                let first_element = self.0.slots.get_unchecked(rs as usize).get_mut();
                let ptr = first_element.deref().as_ptr();
                ptr::read(ptr)
            };

            // Return the index to the free queue after we've read the value.
            // SAFETY: `rs` comes directly from `readyq`.
            unsafe { self.0.return_free_slot(rs) };

            Ok(r)
        } else if self.is_closed() {
            Err(ReceiveError::NoSender)
        } else {
            Err(ReceiveError::Empty)
        }
    }

    /// Receives a value, waiting if the queue is empty.
    /// If all senders are dropped this will error with `NoSender`.
    pub async fn recv(&mut self) -> Result<T, ReceiveError> {
        // There was nothing in the queue, setup the waiting.
        poll_fn(|cx| {
            // Register waker.
            // TODO: Should it happen here or after the if? This might cause a spurious wake.
            self.0.receiver_waker.register(cx.waker());

            // Try to dequeue.
            match self.try_recv() {
                Ok(val) => {
                    return Poll::Ready(Ok(val));
                }
                Err(ReceiveError::NoSender) => {
                    return Poll::Ready(Err(ReceiveError::NoSender));
                }
                // `Empty`: stay pending until a sender wakes us.
                _ => {}
            }

            Poll::Pending
        })
        .await
    }

    /// Returns true if there are no `Sender`s.
    pub fn is_closed(&self) -> bool {
        critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.num_senders` is not called recursively.
            self.0.num_senders(cs, |v| *v == 0)
        })
    }

    /// Is the queue full.
    pub fn is_full(&self) -> bool {
        critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.readyq` is not called recursively.
            self.0.readyq(cs, |v| v.is_full())
        })
    }

    /// Is the queue empty.
    pub fn is_empty(&self) -> bool {
        critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.readyq` is not called recursively.
            self.0.readyq(cs, |v| v.is_empty())
        })
    }
}
649
impl<T, const N: usize> Drop for Receiver<'_, T, N> {
    fn drop(&mut self) {
        // Mark the receiver as dropped and wake all waiters
        critical_section::with(|cs| unsafe {
            // SAFETY: `self.0.receiver_dropped` is not called recursively.
            self.0.receiver_dropped(cs, |v| *v = true);
        });

        // Waiting senders are woken without being handed a slot; they observe the
        // closed channel and resolve with an error.
        while let Some((waker, _)) = self.0.wait_queue.pop() {
            waker.wake();
        }
    }
}
663
#[cfg(test)]
#[cfg(not(loom))]
mod tests {
    use cassette::Cassette;

    use super::*;

    #[test]
    fn empty() {
        let (mut s, mut r) = make_channel!(u32, 10);

        assert!(s.is_empty());
        assert!(r.is_empty());

        s.try_send(1).unwrap();

        assert!(!s.is_empty());
        assert!(!r.is_empty());

        r.try_recv().unwrap();

        assert!(s.is_empty());
        assert!(r.is_empty());
    }

    #[test]
    fn full() {
        let (mut s, mut r) = make_channel!(u32, 3);

        for _ in 0..3 {
            assert!(!s.is_full());
            assert!(!r.is_full());

            s.try_send(1).unwrap();
        }

        assert!(s.is_full());
        assert!(r.is_full());

        for _ in 0..3 {
            r.try_recv().unwrap();

            assert!(!s.is_full());
            assert!(!r.is_full());
        }
    }

    // Fixed typo in the test name (`send_recieve` -> `send_receive`).
    #[test]
    fn send_receive() {
        let (mut s, mut r) = make_channel!(u32, 10);

        for i in 0..10 {
            s.try_send(i).unwrap();
        }

        assert_eq!(s.try_send(11), Err(TrySendError::Full(11)));

        for i in 0..10 {
            assert_eq!(r.try_recv().unwrap(), i);
        }

        assert_eq!(r.try_recv(), Err(ReceiveError::Empty));
    }

    #[test]
    fn closed_recv() {
        let (s, mut r) = make_channel!(u32, 10);

        drop(s);

        assert!(r.is_closed());

        assert_eq!(r.try_recv(), Err(ReceiveError::NoSender));
    }

    #[test]
    fn closed_sender() {
        let (mut s, r) = make_channel!(u32, 10);

        drop(r);

        assert!(s.is_closed());

        assert_eq!(s.try_send(11), Err(TrySendError::NoReceiver(11)));
    }

    // Not a `#[test]` itself: called twice by `double_make_channel` to trigger
    // the double-use guard inside `make_channel!`.
    fn make() {
        let _ = make_channel!(u32, 10);
    }

    #[test]
    #[should_panic]
    fn double_make_channel() {
        make();
        make();
    }

    #[test]
    fn tuple_channel() {
        let _ = make_channel!((i32, u32), 10);
    }

    // Test-only helper: run `f` on the channel's free queue inside a critical section.
    fn freeq<const N: usize, T, F, R>(channel: &Channel<T, N>, f: F) -> R
    where
        F: FnOnce(&mut Deque<u8, N>) -> R,
    {
        critical_section::with(|cs| unsafe { channel.freeq(cs, f) })
    }

    #[test]
    fn dropping_waked_send_returns_freeq_item() {
        let (mut tx, mut rx) = make_channel!(u8, 1);

        tx.try_send(0).unwrap();
        assert!(freeq(&rx.0, |q| q.is_empty()));

        // Running this in a separate thread scope to ensure that `pinned_future` is dropped fully.
        //
        // Calling drop explicitly gets hairy because dropping things behind a `Pin` is not easy.
        std::thread::scope(|scope| {
            scope.spawn(|| {
                let pinned_future = core::pin::pin!(tx.send(1));
                let mut future = Cassette::new(pinned_future);

                future.poll_on();

                assert!(freeq(&rx.0, |q| q.is_empty()));
                assert!(!rx.0.wait_queue.is_empty());

                assert_eq!(rx.try_recv(), Ok(0));

                assert!(freeq(&rx.0, |q| q.is_empty()));
            });
        });

        assert!(!freeq(&rx.0, |q| q.is_empty()));

        // Make sure that rx & tx are alive until here for good measure.
        drop((tx, rx));
    }
}
805
#[cfg(not(loom))]
#[cfg(test)]
mod tokio_tests {
    /// Spawn many concurrent senders against a small queue and check every
    /// distinct value arrives exactly once.
    #[tokio::test]
    async fn stress_channel() {
        const NUM_RUNS: usize = 1_000;
        const QUEUE_SIZE: usize = 10;

        let (s, mut r) = make_channel!(u32, QUEUE_SIZE);
        let mut v = std::vec::Vec::new();

        for i in 0..NUM_RUNS {
            let mut s = s.clone();

            v.push(tokio::spawn(async move {
                s.send(i as _).await.unwrap();
            }));
        }

        let mut map = std::collections::BTreeSet::new();

        for _ in 0..NUM_RUNS {
            map.insert(r.recv().await.unwrap());
        }

        // All NUM_RUNS distinct values were received (the set deduplicates).
        assert_eq!(map.len(), NUM_RUNS);

        for v in v {
            v.await.unwrap();
        }
    }
}
838
#[cfg(test)]
#[cfg(loom)]
mod loom_test {
    use cassette::Cassette;
    use loom::thread;

    #[macro_export]
    #[allow(missing_docs)]
    macro_rules! make_loom_channel {
        ($type:ty, $size:expr) => {{
            let channel: crate::channel::Channel<$type, $size> = super::Channel::new();
            // Leaked to get a `'static` channel; acceptable in a loom model run.
            let boxed = Box::new(channel);
            let boxed = Box::leak(boxed);

            // SAFETY: This is safe as we hide the static mut from others to access it.
            // Only this point is where the mutable access happens.
            boxed.split()
        }};
    }

    // This test tests the following scenarios:
    // 1. Receiver is dropped while concurrent senders are waiting to send.
    // 2. Concurrent senders are competing for the same free slot.
    #[test]
    pub fn concurrent_send_while_full_and_drop() {
        loom::model(|| {
            let (mut tx, mut rx) = make_loom_channel!([u8; 20], 1);
            let mut cloned = tx.clone();

            // Fill the single-slot channel so both `send`s below must wait.
            tx.try_send([1; 20]).unwrap();

            let handle1 = thread::spawn(move || {
                let future = std::pin::pin!(tx.send([1; 20]));
                let mut future = Cassette::new(future);
                // Poll at most twice: first poll may park in the wait queue.
                if future.poll_on().is_none() {
                    future.poll_on();
                }
            });

            rx.try_recv().ok();

            let future = std::pin::pin!(cloned.send([1; 20]));
            let mut future = Cassette::new(future);
            if future.poll_on().is_none() {
                future.poll_on();
            }

            drop(rx);

            handle1.join().unwrap();
        });
    }
}
891}