tokio_quiche/metrics/tokio_task.rs
1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8// * Redistributions of source code must retain the above copyright notice,
9// this list of conditions and the following disclaimer.
10//
11// * Redistributions in binary form must reproduce the above copyright
12// notice, this list of conditions and the following disclaimer in the
13// documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Instrumentation and metrics for spawned tokio tasks.
28//!
29//! Currently, this is implemented by creating wrapper futures and wakers for
30//! the future inside of a spawned task. Ideally we would be able to move at
31//! least some of this work into tokio proper at some point, but this should be
32//! sufficient for now.
33//!
34//! This does *not* rely on the tokio-metrics crate, as that has more overhead
35//! than we would like.
36
37use crate::metrics::Metrics;
38use foundations::telemetry::TelemetryContext;
39use pin_project::pin_project;
40use std::future::Future;
41use std::pin::pin;
42use std::pin::Pin;
43use std::sync::Arc;
44use std::sync::Mutex;
45use std::task::Context;
46use std::task::Poll;
47use std::task::Wake;
48use std::task::Waker;
49use std::time::Instant;
50use task_killswitch::spawn_with_killswitch as killswitch_spawn;
51use tokio::task::JoinHandle;
52
/// An instrumented future.
///
/// Wraps a spawned task's future and records per-poll metrics for it under
/// `name`. It's important to keep overhead low here, especially where
/// contention is concerned.
#[pin_project]
struct Instrumented<F, M> {
    /// The wrapped future; structurally pinned so it can be polled in place.
    #[pin]
    future: F,
    /// Label passed to every metrics accessor for this task.
    name: Arc<str>,
    /// Schedule-delay stamp, shared with every `InstrumentedWaker` created for
    /// this task. `Some(t)` means the task was created or woken at `t` and has
    /// not been polled since; `poll` takes the stamp and records the delay.
    timer: Arc<Mutex<Option<Instant>>>,
    /// Sink for the schedule-delay, poll-duration and total-poll-time metrics.
    metrics: M,
}
65
/// Waker handed to the instrumented future in place of the task's own waker.
///
/// Each wake stamps the shared schedule timer, unless a stamp is already
/// pending, and then forwards to the wrapped waker. This runs on every wakeup,
/// so overhead (and especially lock contention) must stay minimal.
struct InstrumentedWaker {
    timer: Arc<Mutex<Option<Instant>>>,
    waker: Waker,
}

impl InstrumentedWaker {
    /// Records "now" as the schedule time unless an earlier wake already did.
    ///
    /// The guard is a statement temporary, so it is released before the caller
    /// goes on to invoke the (possibly slow) inner waker. Contention here is
    /// expected to be very rare.
    fn stamp_schedule_time(&self) {
        self.timer.lock().unwrap().get_or_insert_with(Instant::now);
    }
}

impl Wake for InstrumentedWaker {
    fn wake(self: Arc<Self>) {
        <Self as Wake>::wake_by_ref(&self);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.stamp_schedule_time();
        self.waker.wake_by_ref();
    }
}
94
95impl<F, M> Instrumented<F, M>
96where
97 M: Metrics,
98{
99 fn new(name: &str, metrics: M, future: F) -> Self {
100 let name = Arc::from(name);
101
102 Self {
103 future,
104 name,
105 metrics,
106 timer: Arc::new(Mutex::new(Some(Instant::now()))),
107 }
108 }
109}
110
111impl<F: Future, M: Metrics> Future for Instrumented<F, M> {
112 type Output = F::Output;
113
114 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115 let total_timer = Instant::now();
116
117 // if we were to hold the lock over the poll boundary, self-wakes would
118 // deadlock us, so we won't do that.
119 //
120 // this is unlikely to be contended much otherwise.
121 let maybe_schedule_timer = self.timer.lock().unwrap().take();
122
123 // for various reasons related to how rust does lifetime things, we will
124 // not acquire the lock in the if statement
125 if let Some(schedule_timer) = maybe_schedule_timer {
126 let elapsed = schedule_timer.elapsed();
127
128 self.metrics
129 .tokio_runtime_task_schedule_delay_histogram(&self.name)
130 .observe(elapsed.as_nanos() as u64);
131 }
132
133 let projected = self.project();
134
135 let waker = Waker::from(Arc::new(InstrumentedWaker {
136 timer: Arc::clone(projected.timer),
137 waker: cx.waker().clone(),
138 }));
139
140 let mut new_cx = Context::from_waker(&waker);
141
142 let timer = Instant::now();
143
144 let output = projected.future.poll(&mut new_cx);
145
146 let elapsed = timer.elapsed();
147
148 projected
149 .metrics
150 .tokio_runtime_task_poll_duration_histogram(projected.name)
151 .observe(elapsed.as_nanos() as u64);
152
153 let total_elapsed = total_timer.elapsed();
154
155 projected
156 .metrics
157 .tokio_runtime_task_total_poll_time_micros(projected.name)
158 .inc_by(total_elapsed.as_micros() as u64);
159
160 output
161 }
162}
163
164/// Spawn a potentially instrumented task.
165///
166/// Depending on whether the `tokio-task-metrics` feature is enabled, this may
167/// instrument the task and collect metrics for it.
168pub fn spawn<M, T>(name: &str, metrics: M, future: T) -> JoinHandle<T::Output>
169where
170 T: Future + Send + 'static,
171 T::Output: Send + 'static,
172 M: Metrics,
173{
174 let ctx = TelemetryContext::current();
175
176 if cfg!(feature = "tokio-task-metrics") {
177 tokio::spawn(Instrumented::new(name, metrics, ctx.apply(future)))
178 } else {
179 tokio::spawn(ctx.apply(future))
180 }
181}
182
183/// Spawn a potentially instrumented, long-lived task. Integrates with
184/// [task-killswitch](task_killswitch).
185///
186/// Depending on whether the `tokio-task-metrics` feature is enabled, this may
187/// instrument the task and collect metrics for it.
188pub fn spawn_with_killswitch<M, T>(name: &str, metrics: M, future: T)
189where
190 T: Future<Output = ()> + Send + 'static,
191 M: Metrics,
192{
193 let ctx = TelemetryContext::current();
194
195 if cfg!(feature = "tokio-task-metrics") {
196 killswitch_spawn(Instrumented::new(name, metrics, ctx.apply(future)))
197 } else {
198 killswitch_spawn(ctx.apply(future))
199 }
200}