rtic_time/
timer_queue.rs

1//! A generic timer queue for async executors.
2
3use crate::linked_list::{self, Link, LinkedList};
4use crate::TimeoutError;
5
6use core::future::Future;
7use core::pin::Pin;
8use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use core::task::{Poll, Waker};
10
11mod backend;
12mod tick_type;
13pub use backend::TimerQueueBackend;
14pub use tick_type::TimerQueueTicks;
15
16/// Holds a waker and at which time instant this waker shall be awoken.
17struct WaitingWaker<Backend: TimerQueueBackend> {
18    waker: Waker,
19    release_at: Backend::Ticks,
20    was_popped: AtomicBool,
21}
22
23impl<Backend: TimerQueueBackend> Clone for WaitingWaker<Backend> {
24    fn clone(&self) -> Self {
25        Self {
26            waker: self.waker.clone(),
27            release_at: self.release_at,
28            was_popped: AtomicBool::new(self.was_popped.load(Ordering::Relaxed)),
29        }
30    }
31}
32
33impl<Backend: TimerQueueBackend> PartialEq for WaitingWaker<Backend> {
34    fn eq(&self, other: &Self) -> bool {
35        self.release_at == other.release_at
36    }
37}
38
39impl<Backend: TimerQueueBackend> PartialOrd for WaitingWaker<Backend> {
40    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
41        Some(self.release_at.compare(other.release_at))
42    }
43}
44
45/// A generic timer queue for async executors.
46///
47/// # Blocking
48///
49/// The internal priority queue uses global critical sections to manage access. This means that
50/// `await`ing a delay will cause a lock of the entire system for O(n) time. In practice the lock
51/// duration is ~10 clock cycles per element in the queue.
52///
53/// # Safety
54///
55/// This timer queue is based on an intrusive linked list, and by extension the links are stored
56/// on the async stacks of callers. The links are deallocated on `drop` or when the wait is
57/// complete.
58///
59/// Do not call `mem::forget` on an awaited future, or there will be dragons!
60pub struct TimerQueue<Backend: TimerQueueBackend> {
61    queue: LinkedList<WaitingWaker<Backend>>,
62    initialized: AtomicBool,
63}
64
65impl<Backend: TimerQueueBackend> Default for TimerQueue<Backend> {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
72    /// Make a new queue.
73    pub const fn new() -> Self {
74        Self {
75            queue: LinkedList::new(),
76            initialized: AtomicBool::new(false),
77        }
78    }
79
80    /// Forwards the `Monotonic::now()` method.
81    #[inline(always)]
82    pub fn now(&self) -> Backend::Ticks {
83        Backend::now()
84    }
85
86    /// Takes the initialized monotonic to initialize the TimerQueue.
87    pub fn initialize(&self, backend: Backend) {
88        self.initialized.store(true, Ordering::SeqCst);
89
90        // Don't run drop on `Backend`
91        core::mem::forget(backend);
92    }
93
94    /// Call this in the interrupt handler of the hardware timer supporting the `Monotonic`
95    ///
96    /// # Safety
97    ///
98    /// It's always safe to call, but it must only be called from the interrupt of the
99    /// monotonic timer for correct operation.
100    pub unsafe fn on_monotonic_interrupt(&self) {
101        Backend::clear_compare_flag();
102        Backend::on_interrupt();
103
104        loop {
105            let mut release_at = None;
106            let head = self.queue.pop_if(|head| {
107                release_at = Some(head.release_at);
108
109                let should_pop = Backend::now().is_at_least(head.release_at);
110                head.was_popped.store(should_pop, Ordering::Relaxed);
111
112                should_pop
113            });
114
115            match (head, release_at) {
116                (Some(link), _) => {
117                    link.waker.wake();
118                }
119                (None, Some(instant)) => {
120                    Backend::enable_timer();
121                    Backend::set_compare(instant);
122
123                    if Backend::now().is_at_least(instant) {
124                        // The time for the next instant passed while handling it,
125                        // continue dequeueing
126                        continue;
127                    }
128
129                    break;
130                }
131                (None, None) => {
132                    // Queue is empty
133                    Backend::disable_timer();
134
135                    break;
136                }
137            }
138        }
139    }
140
141    /// Timeout at a specific time.
142    pub fn timeout_at<F: Future>(
143        &self,
144        instant: Backend::Ticks,
145        future: F,
146    ) -> Timeout<'_, Backend, F> {
147        Timeout {
148            delay: Delay::<Backend> {
149                instant,
150                queue: &self.queue,
151                link_ptr: None,
152                marker: AtomicUsize::new(0),
153            },
154            future,
155        }
156    }
157
158    /// Timeout after at least a specific duration.
159    #[inline]
160    pub fn timeout_after<F: Future>(
161        &self,
162        duration: Backend::Ticks,
163        future: F,
164    ) -> Timeout<'_, Backend, F> {
165        let now = Backend::now();
166        let mut timeout = now.wrapping_add(duration);
167        if now != timeout {
168            timeout = timeout.wrapping_add(Backend::Ticks::ONE_TICK);
169        }
170
171        // Wait for one period longer, because by definition timers have an uncertainty
172        // of one period, so waiting for 'at least' needs to compensate for that.
173        self.timeout_at(timeout, future)
174    }
175
176    /// Delay for at least some duration of time.
177    #[inline]
178    pub fn delay(&self, duration: Backend::Ticks) -> Delay<'_, Backend> {
179        let now = Backend::now();
180        let mut timeout = now.wrapping_add(duration);
181        if now != timeout {
182            timeout = timeout.wrapping_add(Backend::Ticks::ONE_TICK);
183        }
184
185        // Wait for one period longer, because by definition timers have an uncertainty
186        // of one period, so waiting for 'at least' needs to compensate for that.
187        self.delay_until(timeout)
188    }
189
190    /// Delay to some specific time instant.
191    pub fn delay_until(&self, instant: Backend::Ticks) -> Delay<'_, Backend> {
192        if !self.initialized.load(Ordering::Relaxed) {
193            panic!(
194                "The timer queue is not initialized with a monotonic, you need to run `initialize`"
195            );
196        }
197        Delay::<Backend> {
198            instant,
199            queue: &self.queue,
200            link_ptr: None,
201            marker: AtomicUsize::new(0),
202        }
203    }
204}
205
206/// Future returned by `delay` and `delay_until`.
207pub struct Delay<'q, Backend: TimerQueueBackend> {
208    instant: Backend::Ticks,
209    queue: &'q LinkedList<WaitingWaker<Backend>>,
210    link_ptr: Option<linked_list::Link<WaitingWaker<Backend>>>,
211    marker: AtomicUsize,
212}
213
214impl<Backend: TimerQueueBackend> Future for Delay<'_, Backend> {
215    type Output = ();
216
217    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
218        // SAFETY: We ensure we never move anything out of this.
219        let this = unsafe { self.get_unchecked_mut() };
220
221        if Backend::now().is_at_least(this.instant) {
222            return Poll::Ready(());
223        }
224
225        // SAFETY: this is dereferenced only here and in `drop`. As the queue deletion is done only
226        // in `drop` we can't do this access concurrently with queue removal.
227        let link = &mut this.link_ptr;
228        if link.is_none() {
229            let link_ref = link.insert(Link::new(WaitingWaker {
230                waker: cx.waker().clone(),
231                release_at: this.instant,
232                was_popped: AtomicBool::new(false),
233            }));
234
235            // SAFETY(new_unchecked): The address to the link is stable as it is defined
236            // outside this stack frame.
237            // SAFETY(insert): `link_ref` lfetime comes from `link_ptr` which itself is owned by
238            // the `Delay` struct. The `Delay::drop` impl ensures that the link is removed from the
239            // queue on drop, which happens before the struct and thus `link_ptr` goes out of
240            // scope.
241            let (head_updated, addr) = unsafe { this.queue.insert(Pin::new_unchecked(link_ref)) };
242            this.marker.store(addr, Ordering::Relaxed);
243            if head_updated {
244                Backend::pend_interrupt()
245            }
246        }
247
248        Poll::Pending
249    }
250}
251
252impl<Backend: TimerQueueBackend> Drop for Delay<'_, Backend> {
253    fn drop(&mut self) {
254        // SAFETY: Drop cannot be run at the same time as poll, so we can't end up
255        // derefencing this concurrently to the one in `poll`.
256        match self.link_ptr.as_ref() {
257            None => return,
258            // If it was popped from the queue there is no need to run delete
259            Some(link) if link.val.was_popped.load(Ordering::Relaxed) => return,
260            _ => {}
261        }
262        self.queue.delete(self.marker.load(Ordering::Relaxed));
263    }
264}
265
266/// Future returned by `timeout` and `timeout_at`.
267pub struct Timeout<'q, Backend: TimerQueueBackend, F> {
268    delay: Delay<'q, Backend>,
269    future: F,
270}
271
272impl<Backend: TimerQueueBackend, F: Future> Future for Timeout<'_, Backend, F> {
273    type Output = Result<F::Output, TimeoutError>;
274
275    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
276        let inner = unsafe { self.get_unchecked_mut() };
277
278        {
279            let f = unsafe { Pin::new_unchecked(&mut inner.future) };
280            if let Poll::Ready(v) = f.poll(cx) {
281                return Poll::Ready(Ok(v));
282            }
283        }
284
285        {
286            let d = unsafe { Pin::new_unchecked(&mut inner.delay) };
287            if d.poll(cx).is_ready() {
288                return Poll::Ready(Err(TimeoutError));
289            }
290        }
291
292        Poll::Pending
293    }
294}