1//! A generic timer queue for async executors.
23use crate::linked_list::{self, Link, LinkedList};
4use crate::TimeoutError;
56use core::future::Future;
7use core::pin::Pin;
8use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use core::task::{Poll, Waker};
1011mod backend;
12mod tick_type;
13pub use backend::TimerQueueBackend;
14pub use tick_type::TimerQueueTicks;
1516/// Holds a waker and at which time instant this waker shall be awoken.
17struct WaitingWaker<Backend: TimerQueueBackend> {
18 waker: Waker,
19 release_at: Backend::Ticks,
20 was_popped: AtomicBool,
21}
2223impl<Backend: TimerQueueBackend> Clone for WaitingWaker<Backend> {
24fn clone(&self) -> Self {
25Self {
26 waker: self.waker.clone(),
27 release_at: self.release_at,
28 was_popped: AtomicBool::new(self.was_popped.load(Ordering::Relaxed)),
29 }
30 }
31}
3233impl<Backend: TimerQueueBackend> PartialEq for WaitingWaker<Backend> {
34fn eq(&self, other: &Self) -> bool {
35self.release_at == other.release_at
36 }
37}
3839impl<Backend: TimerQueueBackend> PartialOrd for WaitingWaker<Backend> {
40fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
41Some(self.release_at.compare(other.release_at))
42 }
43}
4445/// A generic timer queue for async executors.
46///
47/// # Blocking
48///
49/// The internal priority queue uses global critical sections to manage access. This means that
50/// `await`ing a delay will cause a lock of the entire system for O(n) time. In practice the lock
51/// duration is ~10 clock cycles per element in the queue.
52///
53/// # Safety
54///
55/// This timer queue is based on an intrusive linked list, and by extension the links are stored
56/// on the async stacks of callers. The links are deallocated on `drop` or when the wait is
57/// complete.
58///
59/// Do not call `mem::forget` on an awaited future, or there will be dragons!
60pub struct TimerQueue<Backend: TimerQueueBackend> {
61 queue: LinkedList<WaitingWaker<Backend>>,
62 initialized: AtomicBool,
63}
6465impl<Backend: TimerQueueBackend> Default for TimerQueue<Backend> {
66fn default() -> Self {
67Self::new()
68 }
69}
7071impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
72/// Make a new queue.
73pub const fn new() -> Self {
74Self {
75 queue: LinkedList::new(),
76 initialized: AtomicBool::new(false),
77 }
78 }
7980/// Forwards the `Monotonic::now()` method.
81#[inline(always)]
82pub fn now(&self) -> Backend::Ticks {
83 Backend::now()
84 }
8586/// Takes the initialized monotonic to initialize the TimerQueue.
87pub fn initialize(&self, backend: Backend) {
88self.initialized.store(true, Ordering::SeqCst);
8990// Don't run drop on `Backend`
91core::mem::forget(backend);
92 }
9394/// Call this in the interrupt handler of the hardware timer supporting the `Monotonic`
95 ///
96 /// # Safety
97 ///
98 /// It's always safe to call, but it must only be called from the interrupt of the
99 /// monotonic timer for correct operation.
100pub unsafe fn on_monotonic_interrupt(&self) {
101 Backend::clear_compare_flag();
102 Backend::on_interrupt();
103104loop {
105let mut release_at = None;
106let head = self.queue.pop_if(|head| {
107 release_at = Some(head.release_at);
108109let should_pop = Backend::now().is_at_least(head.release_at);
110 head.was_popped.store(should_pop, Ordering::Relaxed);
111112 should_pop
113 });
114115match (head, release_at) {
116 (Some(link), _) => {
117 link.waker.wake();
118 }
119 (None, Some(instant)) => {
120 Backend::enable_timer();
121 Backend::set_compare(instant);
122123if Backend::now().is_at_least(instant) {
124// The time for the next instant passed while handling it,
125 // continue dequeueing
126continue;
127 }
128129break;
130 }
131 (None, None) => {
132// Queue is empty
133Backend::disable_timer();
134135break;
136 }
137 }
138 }
139 }
140141/// Timeout at a specific time.
142pub fn timeout_at<F: Future>(
143&self,
144 instant: Backend::Ticks,
145 future: F,
146 ) -> Timeout<'_, Backend, F> {
147 Timeout {
148 delay: Delay::<Backend> {
149 instant,
150 queue: &self.queue,
151 link_ptr: None,
152 marker: AtomicUsize::new(0),
153 },
154 future,
155 }
156 }
157158/// Timeout after at least a specific duration.
159#[inline]
160pub fn timeout_after<F: Future>(
161&self,
162 duration: Backend::Ticks,
163 future: F,
164 ) -> Timeout<'_, Backend, F> {
165let now = Backend::now();
166let mut timeout = now.wrapping_add(duration);
167if now != timeout {
168 timeout = timeout.wrapping_add(Backend::Ticks::ONE_TICK);
169 }
170171// Wait for one period longer, because by definition timers have an uncertainty
172 // of one period, so waiting for 'at least' needs to compensate for that.
173self.timeout_at(timeout, future)
174 }
175176/// Delay for at least some duration of time.
177#[inline]
178pub fn delay(&self, duration: Backend::Ticks) -> Delay<'_, Backend> {
179let now = Backend::now();
180let mut timeout = now.wrapping_add(duration);
181if now != timeout {
182 timeout = timeout.wrapping_add(Backend::Ticks::ONE_TICK);
183 }
184185// Wait for one period longer, because by definition timers have an uncertainty
186 // of one period, so waiting for 'at least' needs to compensate for that.
187self.delay_until(timeout)
188 }
189190/// Delay to some specific time instant.
191pub fn delay_until(&self, instant: Backend::Ticks) -> Delay<'_, Backend> {
192if !self.initialized.load(Ordering::Relaxed) {
193panic!(
194"The timer queue is not initialized with a monotonic, you need to run `initialize`"
195);
196 }
197 Delay::<Backend> {
198 instant,
199 queue: &self.queue,
200 link_ptr: None,
201 marker: AtomicUsize::new(0),
202 }
203 }
204}
205206/// Future returned by `delay` and `delay_until`.
207pub struct Delay<'q, Backend: TimerQueueBackend> {
208 instant: Backend::Ticks,
209 queue: &'q LinkedList<WaitingWaker<Backend>>,
210 link_ptr: Option<linked_list::Link<WaitingWaker<Backend>>>,
211 marker: AtomicUsize,
212}
213214impl<Backend: TimerQueueBackend> Future for Delay<'_, Backend> {
215type Output = ();
216217fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
218// SAFETY: We ensure we never move anything out of this.
219let this = unsafe { self.get_unchecked_mut() };
220221if Backend::now().is_at_least(this.instant) {
222return Poll::Ready(());
223 }
224225// SAFETY: this is dereferenced only here and in `drop`. As the queue deletion is done only
226 // in `drop` we can't do this access concurrently with queue removal.
227let link = &mut this.link_ptr;
228if link.is_none() {
229let link_ref = link.insert(Link::new(WaitingWaker {
230 waker: cx.waker().clone(),
231 release_at: this.instant,
232 was_popped: AtomicBool::new(false),
233 }));
234235// SAFETY(new_unchecked): The address to the link is stable as it is defined
236 // outside this stack frame.
237 // SAFETY(insert): `link_ref` lfetime comes from `link_ptr` which itself is owned by
238 // the `Delay` struct. The `Delay::drop` impl ensures that the link is removed from the
239 // queue on drop, which happens before the struct and thus `link_ptr` goes out of
240 // scope.
241let (head_updated, addr) = unsafe { this.queue.insert(Pin::new_unchecked(link_ref)) };
242 this.marker.store(addr, Ordering::Relaxed);
243if head_updated {
244 Backend::pend_interrupt()
245 }
246 }
247248 Poll::Pending
249 }
250}
251252impl<Backend: TimerQueueBackend> Drop for Delay<'_, Backend> {
253fn drop(&mut self) {
254// SAFETY: Drop cannot be run at the same time as poll, so we can't end up
255 // derefencing this concurrently to the one in `poll`.
256match self.link_ptr.as_ref() {
257None => return,
258// If it was popped from the queue there is no need to run delete
259Some(link) if link.val.was_popped.load(Ordering::Relaxed) => return,
260_ => {}
261 }
262self.queue.delete(self.marker.load(Ordering::Relaxed));
263 }
264}
265266/// Future returned by `timeout` and `timeout_at`.
267pub struct Timeout<'q, Backend: TimerQueueBackend, F> {
268 delay: Delay<'q, Backend>,
269 future: F,
270}
271272impl<Backend: TimerQueueBackend, F: Future> Future for Timeout<'_, Backend, F> {
273type Output = Result<F::Output, TimeoutError>;
274275fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
276let inner = unsafe { self.get_unchecked_mut() };
277278 {
279let f = unsafe { Pin::new_unchecked(&mut inner.future) };
280if let Poll::Ready(v) = f.poll(cx) {
281return Poll::Ready(Ok(v));
282 }
283 }
284285 {
286let d = unsafe { Pin::new_unchecked(&mut inner.delay) };
287if d.poll(cx).is_ready() {
288return Poll::Ready(Err(TimeoutError));
289 }
290 }
291292 Poll::Pending
293 }
294}