1use crate::linked_list::{self, Link, LinkedList};
4use crate::TimeoutError;
5
6use core::future::Future;
7use core::pin::Pin;
8use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use core::task::{Poll, Waker};
10
11mod backend;
12mod tick_type;
13pub use backend::TimerQueueBackend;
14pub use tick_type::TimerQueueTicks;
15
16struct WaitingWaker<Backend: TimerQueueBackend> {
18 waker: Waker,
19 release_at: Backend::Ticks,
20 was_popped: AtomicBool,
21}
22
23impl<Backend: TimerQueueBackend> Clone for WaitingWaker<Backend> {
24 fn clone(&self) -> Self {
25 Self {
26 waker: self.waker.clone(),
27 release_at: self.release_at,
28 was_popped: AtomicBool::new(self.was_popped.load(Ordering::Relaxed)),
29 }
30 }
31}
32
33impl<Backend: TimerQueueBackend> PartialEq for WaitingWaker<Backend> {
34 fn eq(&self, other: &Self) -> bool {
35 self.release_at == other.release_at
36 }
37}
38
39impl<Backend: TimerQueueBackend> PartialOrd for WaitingWaker<Backend> {
40 fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
41 Some(self.release_at.compare(other.release_at))
42 }
43}
44
45pub struct TimerQueue<Backend: TimerQueueBackend> {
61 queue: LinkedList<WaitingWaker<Backend>>,
62 initialized: AtomicBool,
63}
64
65impl<Backend: TimerQueueBackend> Default for TimerQueue<Backend> {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
72 pub const fn new() -> Self {
74 Self {
75 queue: LinkedList::new(),
76 initialized: AtomicBool::new(false),
77 }
78 }
79
80 #[inline(always)]
82 pub fn now(&self) -> Backend::Ticks {
83 Backend::now()
84 }
85
86 pub fn initialize(&self, backend: Backend) {
88 self.initialized.store(true, Ordering::SeqCst);
89
90 core::mem::forget(backend);
92 }
93
94 pub unsafe fn on_monotonic_interrupt(&self) {
101 Backend::clear_compare_flag();
102 Backend::on_interrupt();
103
104 loop {
105 let mut release_at = None;
106 let head = self.queue.pop_if(|head| {
107 release_at = Some(head.release_at);
108
109 let should_pop = Backend::now().is_at_least(head.release_at);
110 head.was_popped.store(should_pop, Ordering::Relaxed);
111
112 should_pop
113 });
114
115 match (head, release_at) {
116 (Some(link), _) => {
117 link.waker.wake();
118 }
119 (None, Some(instant)) => {
120 Backend::enable_timer();
121 Backend::set_compare(instant);
122
123 if Backend::now().is_at_least(instant) {
124 continue;
127 }
128
129 break;
130 }
131 (None, None) => {
132 Backend::disable_timer();
134
135 break;
136 }
137 }
138 }
139 }
140
141 pub fn timeout_at<F: Future>(
143 &self,
144 instant: Backend::Ticks,
145 future: F,
146 ) -> Timeout<'_, Backend, F> {
147 Timeout {
148 delay: Delay::<Backend> {
149 instant,
150 queue: &self.queue,
151 link_ptr: None,
152 marker: AtomicUsize::new(0),
153 },
154 future,
155 }
156 }
157
158 #[inline]
160 pub fn timeout_after<F: Future>(
161 &self,
162 duration: Backend::Ticks,
163 future: F,
164 ) -> Timeout<'_, Backend, F> {
165 let now = Backend::now();
166 let mut timeout = now.wrapping_add(duration);
167 if now != timeout {
168 timeout = timeout.wrapping_add(Backend::Ticks::ONE_TICK);
169 }
170
171 self.timeout_at(timeout, future)
174 }
175
176 #[inline]
178 pub fn delay(&self, duration: Backend::Ticks) -> Delay<'_, Backend> {
179 let now = Backend::now();
180 let mut timeout = now.wrapping_add(duration);
181 if now != timeout {
182 timeout = timeout.wrapping_add(Backend::Ticks::ONE_TICK);
183 }
184
185 self.delay_until(timeout)
188 }
189
190 pub fn delay_until(&self, instant: Backend::Ticks) -> Delay<'_, Backend> {
192 if !self.initialized.load(Ordering::Relaxed) {
193 panic!(
194 "The timer queue is not initialized with a monotonic, you need to run `initialize`"
195 );
196 }
197 Delay::<Backend> {
198 instant,
199 queue: &self.queue,
200 link_ptr: None,
201 marker: AtomicUsize::new(0),
202 }
203 }
204}
205
206pub struct Delay<'q, Backend: TimerQueueBackend> {
208 instant: Backend::Ticks,
209 queue: &'q LinkedList<WaitingWaker<Backend>>,
210 link_ptr: Option<linked_list::Link<WaitingWaker<Backend>>>,
211 marker: AtomicUsize,
212}
213
214impl<Backend: TimerQueueBackend> Future for Delay<'_, Backend> {
215 type Output = ();
216
217 fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
218 let this = unsafe { self.get_unchecked_mut() };
220
221 if Backend::now().is_at_least(this.instant) {
222 return Poll::Ready(());
223 }
224
225 let link = &mut this.link_ptr;
228 if link.is_none() {
229 let link_ref = link.insert(Link::new(WaitingWaker {
230 waker: cx.waker().clone(),
231 release_at: this.instant,
232 was_popped: AtomicBool::new(false),
233 }));
234
235 let (head_updated, addr) = unsafe { this.queue.insert(Pin::new_unchecked(link_ref)) };
242 this.marker.store(addr, Ordering::Relaxed);
243 if head_updated {
244 Backend::pend_interrupt()
245 }
246 }
247
248 Poll::Pending
249 }
250}
251
252impl<Backend: TimerQueueBackend> Drop for Delay<'_, Backend> {
253 fn drop(&mut self) {
254 match self.link_ptr.as_ref() {
257 None => return,
258 Some(link) if link.val.was_popped.load(Ordering::Relaxed) => return,
260 _ => {}
261 }
262 self.queue.delete(self.marker.load(Ordering::Relaxed));
263 }
264}
265
266pub struct Timeout<'q, Backend: TimerQueueBackend, F> {
268 delay: Delay<'q, Backend>,
269 future: F,
270}
271
272impl<Backend: TimerQueueBackend, F: Future> Future for Timeout<'_, Backend, F> {
273 type Output = Result<F::Output, TimeoutError>;
274
275 fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
276 let inner = unsafe { self.get_unchecked_mut() };
277
278 {
279 let f = unsafe { Pin::new_unchecked(&mut inner.future) };
280 if let Poll::Ready(v) = f.poll(cx) {
281 return Poll::Ready(Ok(v));
282 }
283 }
284
285 {
286 let d = unsafe { Pin::new_unchecked(&mut inner.delay) };
287 if d.poll(cx).is_ready() {
288 return Poll::Ready(Err(TimeoutError));
289 }
290 }
291
292 Poll::Pending
293 }
294}