1use core::{cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr};
88
89#[cfg(full_atomic_polyfill)]
90use atomic_polyfill::{AtomicUsize, Ordering};
91#[cfg(not(full_atomic_polyfill))]
92use core::sync::atomic::{AtomicUsize, Ordering};
93
94pub struct Queue<T, const N: usize> {
99 pub(crate) head: AtomicUsize,
101
102 pub(crate) tail: AtomicUsize,
104
105 pub(crate) buffer: [UnsafeCell<MaybeUninit<T>>; N],
106}
107
108impl<T, const N: usize> Queue<T, N> {
109 const INIT: UnsafeCell<MaybeUninit<T>> = UnsafeCell::new(MaybeUninit::uninit());
110
111 #[inline]
112 fn increment(val: usize) -> usize {
113 (val + 1) % N
114 }
115
116 pub const fn new() -> Self {
118 crate::sealed::greater_than_1::<N>();
120
121 Queue {
122 head: AtomicUsize::new(0),
123 tail: AtomicUsize::new(0),
124 buffer: [Self::INIT; N],
125 }
126 }
127
128 #[inline]
130 pub const fn capacity(&self) -> usize {
131 N - 1
132 }
133
134 #[inline]
136 pub fn len(&self) -> usize {
137 let current_head = self.head.load(Ordering::Relaxed);
138 let current_tail = self.tail.load(Ordering::Relaxed);
139
140 current_tail.wrapping_sub(current_head).wrapping_add(N) % N
141 }
142
143 #[inline]
145 pub fn is_empty(&self) -> bool {
146 self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
147 }
148
149 #[inline]
151 pub fn is_full(&self) -> bool {
152 Self::increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
153 }
154
155 pub fn iter(&self) -> Iter<'_, T, N> {
157 Iter {
158 rb: self,
159 index: 0,
160 len: self.len(),
161 }
162 }
163
164 pub fn iter_mut(&mut self) -> IterMut<'_, T, N> {
166 let len = self.len();
167 IterMut {
168 rb: self,
169 index: 0,
170 len,
171 }
172 }
173
174 #[inline]
178 pub fn enqueue(&mut self, val: T) -> Result<(), T> {
179 unsafe { self.inner_enqueue(val) }
180 }
181
182 #[inline]
184 pub fn dequeue(&mut self) -> Option<T> {
185 unsafe { self.inner_dequeue() }
186 }
187
188 pub fn peek(&self) -> Option<&T> {
204 if !self.is_empty() {
205 let head = self.head.load(Ordering::Relaxed);
206 Some(unsafe { &*(self.buffer.get_unchecked(head).get() as *const T) })
207 } else {
208 None
209 }
210 }
211
212 unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
216 let current_tail = self.tail.load(Ordering::Relaxed);
217 let next_tail = Self::increment(current_tail);
218
219 if next_tail != self.head.load(Ordering::Acquire) {
220 (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
221 self.tail.store(next_tail, Ordering::Release);
222
223 Ok(())
224 } else {
225 Err(val)
226 }
227 }
228
229 unsafe fn inner_enqueue_unchecked(&self, val: T) {
233 let current_tail = self.tail.load(Ordering::Relaxed);
234
235 (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
236 self.tail
237 .store(Self::increment(current_tail), Ordering::Release);
238 }
239
240 pub unsafe fn enqueue_unchecked(&mut self, val: T) {
249 self.inner_enqueue_unchecked(val)
250 }
251
252 unsafe fn inner_dequeue(&self) -> Option<T> {
256 let current_head = self.head.load(Ordering::Relaxed);
257
258 if current_head == self.tail.load(Ordering::Acquire) {
259 None
260 } else {
261 let v = (self.buffer.get_unchecked(current_head).get() as *const T).read();
262
263 self.head
264 .store(Self::increment(current_head), Ordering::Release);
265
266 Some(v)
267 }
268 }
269
270 unsafe fn inner_dequeue_unchecked(&self) -> T {
274 let current_head = self.head.load(Ordering::Relaxed);
275 let v = (self.buffer.get_unchecked(current_head).get() as *const T).read();
276
277 self.head
278 .store(Self::increment(current_head), Ordering::Release);
279
280 v
281 }
282
283 pub unsafe fn dequeue_unchecked(&mut self) -> T {
290 self.inner_dequeue_unchecked()
291 }
292
293 pub fn split(&mut self) -> (Producer<'_, T, N>, Consumer<'_, T, N>) {
295 (Producer { rb: self }, Consumer { rb: self })
296 }
297}
298
299impl<T, const N: usize> Default for Queue<T, N> {
300 fn default() -> Self {
301 Self::new()
302 }
303}
304
305impl<T, const N: usize> Clone for Queue<T, N>
306where
307 T: Clone,
308{
309 fn clone(&self) -> Self {
310 let mut new: Queue<T, N> = Queue::new();
311
312 for s in self.iter() {
313 unsafe {
314 new.enqueue_unchecked(s.clone());
317 }
318 }
319
320 new
321 }
322}
323
324impl<T, const N: usize, const N2: usize> PartialEq<Queue<T, N2>> for Queue<T, N>
325where
326 T: PartialEq,
327{
328 fn eq(&self, other: &Queue<T, N2>) -> bool {
329 self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2)
330 }
331}
332
333impl<T, const N: usize> Eq for Queue<T, N> where T: Eq {}
334
335pub struct Iter<'a, T, const N: usize> {
337 rb: &'a Queue<T, N>,
338 index: usize,
339 len: usize,
340}
341
342impl<'a, T, const N: usize> Clone for Iter<'a, T, N> {
343 fn clone(&self) -> Self {
344 Self {
345 rb: self.rb,
346 index: self.index,
347 len: self.len,
348 }
349 }
350}
351
352pub struct IterMut<'a, T, const N: usize> {
354 rb: &'a mut Queue<T, N>,
355 index: usize,
356 len: usize,
357}
358
359impl<'a, T, const N: usize> Iterator for Iter<'a, T, N> {
360 type Item = &'a T;
361
362 fn next(&mut self) -> Option<Self::Item> {
363 if self.index < self.len {
364 let head = self.rb.head.load(Ordering::Relaxed);
365
366 let i = (head + self.index) % N;
367 self.index += 1;
368
369 Some(unsafe { &*(self.rb.buffer.get_unchecked(i).get() as *const T) })
370 } else {
371 None
372 }
373 }
374}
375
376impl<'a, T, const N: usize> Iterator for IterMut<'a, T, N> {
377 type Item = &'a mut T;
378
379 fn next(&mut self) -> Option<Self::Item> {
380 if self.index < self.len {
381 let head = self.rb.head.load(Ordering::Relaxed);
382
383 let i = (head + self.index) % N;
384 self.index += 1;
385
386 Some(unsafe { &mut *(self.rb.buffer.get_unchecked(i).get() as *mut T) })
387 } else {
388 None
389 }
390 }
391}
392
393impl<'a, T, const N: usize> DoubleEndedIterator for Iter<'a, T, N> {
394 fn next_back(&mut self) -> Option<Self::Item> {
395 if self.index < self.len {
396 let head = self.rb.head.load(Ordering::Relaxed);
397
398 let i = (head + self.len - 1) % N;
400 self.len -= 1;
401 Some(unsafe { &*(self.rb.buffer.get_unchecked(i).get() as *const T) })
402 } else {
403 None
404 }
405 }
406}
407
408impl<'a, T, const N: usize> DoubleEndedIterator for IterMut<'a, T, N> {
409 fn next_back(&mut self) -> Option<Self::Item> {
410 if self.index < self.len {
411 let head = self.rb.head.load(Ordering::Relaxed);
412
413 let i = (head + self.len - 1) % N;
415 self.len -= 1;
416 Some(unsafe { &mut *(self.rb.buffer.get_unchecked(i).get() as *mut T) })
417 } else {
418 None
419 }
420 }
421}
422
423impl<T, const N: usize> Drop for Queue<T, N> {
424 fn drop(&mut self) {
425 for item in self {
426 unsafe {
427 ptr::drop_in_place(item);
428 }
429 }
430 }
431}
432
433impl<T, const N: usize> fmt::Debug for Queue<T, N>
434where
435 T: fmt::Debug,
436{
437 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438 f.debug_list().entries(self.iter()).finish()
439 }
440}
441
442impl<T, const N: usize> hash::Hash for Queue<T, N>
443where
444 T: hash::Hash,
445{
446 fn hash<H: hash::Hasher>(&self, state: &mut H) {
447 for t in self.iter() {
449 hash::Hash::hash(t, state);
450 }
451 }
452}
453
454impl<T, const N: usize> hash32::Hash for Queue<T, N>
455where
456 T: hash32::Hash,
457{
458 fn hash<H: hash32::Hasher>(&self, state: &mut H) {
459 for t in self.iter() {
461 hash32::Hash::hash(t, state);
462 }
463 }
464}
465
466impl<'a, T, const N: usize> IntoIterator for &'a Queue<T, N> {
467 type Item = &'a T;
468 type IntoIter = Iter<'a, T, N>;
469
470 fn into_iter(self) -> Self::IntoIter {
471 self.iter()
472 }
473}
474
475impl<'a, T, const N: usize> IntoIterator for &'a mut Queue<T, N> {
476 type Item = &'a mut T;
477 type IntoIter = IterMut<'a, T, N>;
478
479 fn into_iter(self) -> Self::IntoIter {
480 self.iter_mut()
481 }
482}
483
484pub struct Consumer<'a, T, const N: usize> {
487 rb: &'a Queue<T, N>,
488}
489
490unsafe impl<'a, T, const N: usize> Send for Consumer<'a, T, N> where T: Send {}
491
492pub struct Producer<'a, T, const N: usize> {
495 rb: &'a Queue<T, N>,
496}
497
498unsafe impl<'a, T, const N: usize> Send for Producer<'a, T, N> where T: Send {}
499
500impl<'a, T, const N: usize> Consumer<'a, T, N> {
501 #[inline]
503 pub fn dequeue(&mut self) -> Option<T> {
504 unsafe { self.rb.inner_dequeue() }
505 }
506
507 #[inline]
512 pub unsafe fn dequeue_unchecked(&mut self) -> T {
513 self.rb.inner_dequeue_unchecked()
514 }
515
516 #[inline]
519 pub fn ready(&self) -> bool {
520 !self.rb.is_empty()
521 }
522
523 #[inline]
525 pub fn len(&self) -> usize {
526 self.rb.len()
527 }
528
529 #[inline]
531 pub fn capacity(&self) -> usize {
532 self.rb.capacity()
533 }
534
535 #[inline]
551 pub fn peek(&self) -> Option<&T> {
552 self.rb.peek()
553 }
554}
555
556impl<'a, T, const N: usize> Producer<'a, T, N> {
557 #[inline]
559 pub fn enqueue(&mut self, val: T) -> Result<(), T> {
560 unsafe { self.rb.inner_enqueue(val) }
561 }
562
563 #[inline]
567 pub unsafe fn enqueue_unchecked(&mut self, val: T) {
568 self.rb.inner_enqueue_unchecked(val)
569 }
570
571 #[inline]
574 pub fn ready(&self) -> bool {
575 !self.rb.is_full()
576 }
577
578 #[inline]
580 pub fn len(&self) -> usize {
581 self.rb.len()
582 }
583
584 #[inline]
586 pub fn capacity(&self) -> usize {
587 self.rb.capacity()
588 }
589}
590
591#[cfg(test)]
592mod tests {
593 use crate::spsc::Queue;
594 use hash32::Hasher;
595
596 #[test]
597 fn full() {
598 let mut rb: Queue<i32, 3> = Queue::new();
599
600 assert_eq!(rb.is_full(), false);
601
602 rb.enqueue(1).unwrap();
603 assert_eq!(rb.is_full(), false);
604
605 rb.enqueue(2).unwrap();
606 assert_eq!(rb.is_full(), true);
607 }
608
609 #[test]
610 fn empty() {
611 let mut rb: Queue<i32, 3> = Queue::new();
612
613 assert_eq!(rb.is_empty(), true);
614
615 rb.enqueue(1).unwrap();
616 assert_eq!(rb.is_empty(), false);
617
618 rb.enqueue(2).unwrap();
619 assert_eq!(rb.is_empty(), false);
620 }
621
622 #[test]
623 #[cfg_attr(miri, ignore)] fn len() {
625 let mut rb: Queue<i32, 3> = Queue::new();
626
627 assert_eq!(rb.len(), 0);
628
629 rb.enqueue(1).unwrap();
630 assert_eq!(rb.len(), 1);
631
632 rb.enqueue(2).unwrap();
633 assert_eq!(rb.len(), 2);
634
635 for _ in 0..1_000_000 {
636 let v = rb.dequeue().unwrap();
637 println!("{}", v);
638 rb.enqueue(v).unwrap();
639 assert_eq!(rb.len(), 2);
640 }
641 }
642
643 #[test]
644 #[cfg_attr(miri, ignore)] fn try_overflow() {
646 const N: usize = 23;
647 let mut rb: Queue<i32, N> = Queue::new();
648
649 for i in 0..N as i32 - 1 {
650 rb.enqueue(i).unwrap();
651 }
652
653 for _ in 0..1_000_000 {
654 for i in 0..N as i32 - 1 {
655 let d = rb.dequeue().unwrap();
656 assert_eq!(d, i);
657 rb.enqueue(i).unwrap();
658 }
659 }
660 }
661
662 #[test]
663 fn sanity() {
664 let mut rb: Queue<i32, 10> = Queue::new();
665
666 let (mut p, mut c) = rb.split();
667
668 assert_eq!(p.ready(), true);
669
670 assert_eq!(c.ready(), false);
671
672 assert_eq!(c.dequeue(), None);
673
674 p.enqueue(0).unwrap();
675
676 assert_eq!(c.dequeue(), Some(0));
677 }
678
679 #[test]
680 fn static_new() {
681 static mut _Q: Queue<i32, 4> = Queue::new();
682 }
683
684 #[test]
685 fn drop() {
686 struct Droppable;
687 impl Droppable {
688 fn new() -> Self {
689 unsafe {
690 COUNT += 1;
691 }
692 Droppable
693 }
694 }
695
696 impl Drop for Droppable {
697 fn drop(&mut self) {
698 unsafe {
699 COUNT -= 1;
700 }
701 }
702 }
703
704 static mut COUNT: i32 = 0;
705
706 {
707 let mut v: Queue<Droppable, 4> = Queue::new();
708 v.enqueue(Droppable::new()).ok().unwrap();
709 v.enqueue(Droppable::new()).ok().unwrap();
710 v.dequeue().unwrap();
711 }
712
713 assert_eq!(unsafe { COUNT }, 0);
714
715 {
716 let mut v: Queue<Droppable, 4> = Queue::new();
717 v.enqueue(Droppable::new()).ok().unwrap();
718 v.enqueue(Droppable::new()).ok().unwrap();
719 }
720
721 assert_eq!(unsafe { COUNT }, 0);
722 }
723
724 #[test]
725 fn iter() {
726 let mut rb: Queue<i32, 4> = Queue::new();
727
728 rb.enqueue(0).unwrap();
729 rb.dequeue().unwrap();
730 rb.enqueue(1).unwrap();
731 rb.enqueue(2).unwrap();
732 rb.enqueue(3).unwrap();
733
734 let mut items = rb.iter();
735
736 assert_eq!(items.next(), Some(&1));
738 assert_eq!(items.next(), Some(&2));
739 assert_eq!(items.next(), Some(&3));
740 assert_eq!(items.next(), None);
741 }
742
743 #[test]
744 fn iter_double_ended() {
745 let mut rb: Queue<i32, 4> = Queue::new();
746
747 rb.enqueue(0).unwrap();
748 rb.enqueue(1).unwrap();
749 rb.enqueue(2).unwrap();
750
751 let mut items = rb.iter();
752
753 assert_eq!(items.next(), Some(&0));
754 assert_eq!(items.next_back(), Some(&2));
755 assert_eq!(items.next(), Some(&1));
756 assert_eq!(items.next(), None);
757 assert_eq!(items.next_back(), None);
758 }
759
760 #[test]
761 fn iter_mut() {
762 let mut rb: Queue<i32, 4> = Queue::new();
763
764 rb.enqueue(0).unwrap();
765 rb.enqueue(1).unwrap();
766 rb.enqueue(2).unwrap();
767
768 let mut items = rb.iter_mut();
769
770 assert_eq!(items.next(), Some(&mut 0));
771 assert_eq!(items.next(), Some(&mut 1));
772 assert_eq!(items.next(), Some(&mut 2));
773 assert_eq!(items.next(), None);
774 }
775
776 #[test]
777 fn iter_mut_double_ended() {
778 let mut rb: Queue<i32, 4> = Queue::new();
779
780 rb.enqueue(0).unwrap();
781 rb.enqueue(1).unwrap();
782 rb.enqueue(2).unwrap();
783
784 let mut items = rb.iter_mut();
785
786 assert_eq!(items.next(), Some(&mut 0));
787 assert_eq!(items.next_back(), Some(&mut 2));
788 assert_eq!(items.next(), Some(&mut 1));
789 assert_eq!(items.next(), None);
790 assert_eq!(items.next_back(), None);
791 }
792
793 #[test]
794 fn wrap_around() {
795 let mut rb: Queue<i32, 4> = Queue::new();
796
797 rb.enqueue(0).unwrap();
798 rb.enqueue(1).unwrap();
799 rb.enqueue(2).unwrap();
800 rb.dequeue().unwrap();
801 rb.dequeue().unwrap();
802 rb.dequeue().unwrap();
803 rb.enqueue(3).unwrap();
804 rb.enqueue(4).unwrap();
805
806 assert_eq!(rb.len(), 2);
807 }
808
809 #[test]
810 fn ready_flag() {
811 let mut rb: Queue<i32, 3> = Queue::new();
812 let (mut p, mut c) = rb.split();
813 assert_eq!(c.ready(), false);
814 assert_eq!(p.ready(), true);
815
816 p.enqueue(0).unwrap();
817
818 assert_eq!(c.ready(), true);
819 assert_eq!(p.ready(), true);
820
821 p.enqueue(1).unwrap();
822
823 assert_eq!(c.ready(), true);
824 assert_eq!(p.ready(), false);
825
826 c.dequeue().unwrap();
827
828 assert_eq!(c.ready(), true);
829 assert_eq!(p.ready(), true);
830
831 c.dequeue().unwrap();
832
833 assert_eq!(c.ready(), false);
834 assert_eq!(p.ready(), true);
835 }
836
837 #[test]
838 fn clone() {
839 let mut rb1: Queue<i32, 4> = Queue::new();
840 rb1.enqueue(0).unwrap();
841 rb1.enqueue(0).unwrap();
842 rb1.dequeue().unwrap();
843 rb1.enqueue(0).unwrap();
844 let rb2 = rb1.clone();
845 assert_eq!(rb1.capacity(), rb2.capacity());
846 assert_eq!(rb1.len(), rb2.len());
847 assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
848 }
849
850 #[test]
851 fn eq() {
852 let mut rb1: Queue<i32, 4> = Queue::new();
855 rb1.enqueue(0).unwrap();
856 rb1.enqueue(0).unwrap();
857 rb1.dequeue().unwrap();
858 rb1.enqueue(0).unwrap();
859 let mut rb2: Queue<i32, 4> = Queue::new();
860 rb2.enqueue(0).unwrap();
861 rb2.enqueue(0).unwrap();
862 assert!(rb1 == rb2);
863 assert!(rb2 == rb1);
865 rb1.enqueue(0).unwrap();
867 assert!(rb1 != rb2);
868 rb2.enqueue(1).unwrap();
869 assert!(rb1 != rb2);
870 assert!(rb1 == rb1);
872 assert!(rb2 == rb2);
873 }
874
875 #[test]
876 fn hash_equality() {
877 let rb1 = {
880 let mut rb1: Queue<i32, 4> = Queue::new();
881 rb1.enqueue(0).unwrap();
882 rb1.enqueue(0).unwrap();
883 rb1.dequeue().unwrap();
884 rb1.enqueue(0).unwrap();
885 rb1
886 };
887 let rb2 = {
888 let mut rb2: Queue<i32, 4> = Queue::new();
889 rb2.enqueue(0).unwrap();
890 rb2.enqueue(0).unwrap();
891 rb2
892 };
893 let hash1 = {
894 let mut hasher1 = hash32::FnvHasher::default();
895 hash32::Hash::hash(&rb1, &mut hasher1);
896 let hash1 = hasher1.finish();
897 hash1
898 };
899 let hash2 = {
900 let mut hasher2 = hash32::FnvHasher::default();
901 hash32::Hash::hash(&rb2, &mut hasher2);
902 let hash2 = hasher2.finish();
903 hash2
904 };
905 assert_eq!(hash1, hash2);
906 }
907}