rtic/export/
executor.rs

1use super::atomic::{AtomicBool, AtomicPtr, Ordering};
2use core::{
3    cell::UnsafeCell,
4    future::Future,
5    mem::{self, ManuallyDrop, MaybeUninit},
6    pin::Pin,
7    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
8};
9
/// Vtable used for wakers whose data pointer is really a plain `fn()`.
///
/// Waking by value and waking by reference share one implementation, and
/// dropping is a no-op, so such a waker owns no resources.
static WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    waker_clone, // clone
    waker_wake,  // wake
    waker_wake,  // wake_by_ref
    waker_drop,  // drop
);

/// Clones a waker by reusing the same data pointer with the same vtable.
unsafe fn waker_clone(data: *const ()) -> RawWaker {
    RawWaker::new(data, &WAKER_VTABLE)
}

/// Wakes by calling the function stored in the waker's data pointer.
///
/// # Safety
///
/// `data` must have been produced by casting a valid `fn()` to `*const ()`.
unsafe fn waker_wake(data: *const ()) {
    // The only thing we need from a waker is the function to call to pend the async
    // dispatcher.
    // SAFETY: per this function's contract, `data` is a `fn()` in disguise.
    let pend_dispatcher = unsafe { mem::transmute::<*const (), fn()>(data) };
    pend_dispatcher();
}

/// Dropping a waker does nothing: the data pointer owns no resources.
unsafe fn waker_drop(_data: *const ()) {
    // nop
}
27
28//============
29// AsyncTaskExecutor
30
31/// Pointer to executor holder.
32pub struct AsyncTaskExecutorPtr {
33    // Void pointer.
34    ptr: AtomicPtr<()>,
35}
36
37impl AsyncTaskExecutorPtr {
38    pub const fn new() -> Self {
39        Self {
40            ptr: AtomicPtr::new(core::ptr::null_mut()),
41        }
42    }
43
44    #[inline(always)]
45    pub fn set_in_main<F: Future>(&self, executor: &ManuallyDrop<AsyncTaskExecutor<F>>) {
46        self.ptr.store(executor as *const _ as _, Ordering::Relaxed);
47    }
48
49    #[inline(always)]
50    pub fn get(&self) -> *const () {
51        self.ptr.load(Ordering::Relaxed)
52    }
53}
54
55impl Default for AsyncTaskExecutorPtr {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61/// Executor for an async task.
62pub struct AsyncTaskExecutor<F: Future> {
63    // `task` is protected by the `running` flag.
64    task: UnsafeCell<MaybeUninit<F>>,
65    running: AtomicBool,
66    pending: AtomicBool,
67}
68
69unsafe impl<F: Future> Sync for AsyncTaskExecutor<F> {}
70
71macro_rules! new_n_args {
72    ($name:ident, $($t:ident),*) => {
73        #[inline(always)]
74        pub fn $name<$($t,)* Fun: Fn($($t,)*) -> F>(_f: Fun) -> Self {
75            Self::new()
76        }
77    };
78}
79
80macro_rules! from_ptr_n_args {
81    ($name:ident, $($t:ident),*) => {
82        #[inline(always)]
83        pub unsafe fn $name<$($t,)* Fun: Fn($($t,)*) -> F>(_f: Fun, ptr: &AsyncTaskExecutorPtr) -> &Self {
84            &*(ptr.get() as *const _)
85        }
86    };
87}
88
89impl<F: Future> Default for AsyncTaskExecutor<F> {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl<F: Future> AsyncTaskExecutor<F> {
96    /// Create a new executor.
97    #[inline(always)]
98    pub const fn new() -> Self {
99        Self {
100            task: UnsafeCell::new(MaybeUninit::uninit()),
101            running: AtomicBool::new(false),
102            pending: AtomicBool::new(false),
103        }
104    }
105
106    // Support for up to 16 arguments on async functions. Should be
107    // enough for now, else extend this list.
108    new_n_args!(new_0_args,);
109    new_n_args!(new_1_args, A1);
110    new_n_args!(new_2_args, A1, A2);
111    new_n_args!(new_3_args, A1, A2, A3);
112    new_n_args!(new_4_args, A1, A2, A3, A4);
113    new_n_args!(new_5_args, A1, A2, A3, A4, A5);
114    new_n_args!(new_6_args, A1, A2, A3, A4, A5, A6);
115    new_n_args!(new_7_args, A1, A2, A3, A4, A5, A6, A7);
116    new_n_args!(new_8_args, A1, A2, A3, A4, A5, A6, A7, A8);
117    new_n_args!(new_9_args, A1, A2, A3, A4, A5, A6, A7, A8, A9);
118    new_n_args!(new_10_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10);
119    new_n_args!(new_11_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11);
120    #[rustfmt::skip]
121    new_n_args!(new_12_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12);
122    #[rustfmt::skip]
123    new_n_args!(new_13_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13);
124    #[rustfmt::skip]
125    new_n_args!(new_14_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14);
126    #[rustfmt::skip]
127    new_n_args!(new_15_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15);
128    #[rustfmt::skip]
129    new_n_args!(new_16_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16);
130
131    from_ptr_n_args!(from_ptr_0_args,);
132    from_ptr_n_args!(from_ptr_1_args, A1);
133    from_ptr_n_args!(from_ptr_2_args, A1, A2);
134    from_ptr_n_args!(from_ptr_3_args, A1, A2, A3);
135    from_ptr_n_args!(from_ptr_4_args, A1, A2, A3, A4);
136    from_ptr_n_args!(from_ptr_5_args, A1, A2, A3, A4, A5);
137    from_ptr_n_args!(from_ptr_6_args, A1, A2, A3, A4, A5, A6);
138    from_ptr_n_args!(from_ptr_7_args, A1, A2, A3, A4, A5, A6, A7);
139    from_ptr_n_args!(from_ptr_8_args, A1, A2, A3, A4, A5, A6, A7, A8);
140    from_ptr_n_args!(from_ptr_9_args, A1, A2, A3, A4, A5, A6, A7, A8, A9);
141    from_ptr_n_args!(from_ptr_10_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10);
142    #[rustfmt::skip]
143    from_ptr_n_args!(from_ptr_11_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11);
144    #[rustfmt::skip]
145    from_ptr_n_args!(from_ptr_12_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12);
146    #[rustfmt::skip]
147    from_ptr_n_args!(from_ptr_13_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13);
148    #[rustfmt::skip]
149    from_ptr_n_args!(from_ptr_14_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14);
150    #[rustfmt::skip]
151    from_ptr_n_args!(from_ptr_15_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15);
152    #[rustfmt::skip]
153    from_ptr_n_args!(from_ptr_16_args, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16);
154
155    /// Check if there is an active task in the executor.
156    #[inline(always)]
157    pub fn is_running(&self) -> bool {
158        self.running.load(Ordering::Relaxed)
159    }
160
161    /// Checks if a waker has pended the executor and simultaneously clears the flag.
162    #[inline(always)]
163    fn check_and_clear_pending(&self) -> bool {
164        // Ordering::Acquire to enforce that update of task is visible to poll
165        self.pending
166            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
167            .is_ok()
168    }
169
170    // Used by wakers to indicate that the executor needs to run.
171    #[inline(always)]
172    pub fn set_pending(&self) {
173        self.pending.store(true, Ordering::Release);
174    }
175
176    /// Allocate the executor. To use with `spawn`.
177    #[inline(always)]
178    pub unsafe fn try_allocate(&self) -> bool {
179        // Try to reserve the executor for a future.
180        self.running
181            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
182            .is_ok()
183    }
184
185    /// Spawn a future
186    #[inline(always)]
187    pub unsafe fn spawn(&self, future: F) {
188        // This unsafe is protected by `running` being false and the atomic setting it to true.
189        unsafe {
190            self.task.get().write(MaybeUninit::new(future));
191        }
192        self.set_pending();
193    }
194
195    /// Poll the future in the executor.
196    #[inline(always)]
197    pub fn poll(&self, wake: fn()) {
198        if self.is_running() && self.check_and_clear_pending() {
199            let waker = unsafe { Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE)) };
200            let mut cx = Context::from_waker(&waker);
201            let future = unsafe { Pin::new_unchecked(&mut *(self.task.get() as *mut F)) };
202
203            match future.poll(&mut cx) {
204                Poll::Ready(_) => {
205                    self.running.store(false, Ordering::Release);
206                }
207                Poll::Pending => {}
208            }
209        }
210    }
211}