tokio_quiche/metrics/tokio_task.rs
1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8// * Redistributions of source code must retain the above copyright notice,
9// this list of conditions and the following disclaimer.
10//
11// * Redistributions in binary form must reproduce the above copyright
12// notice, this list of conditions and the following disclaimer in the
13// documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Instrumentation and metrics for spawned tokio tasks.
28//!
29//! Currently, this is implemented by creating wrapper futures and wakers for
30//! the future inside of a spawned task. Ideally we would be able to move at
31//! least some of this work into tokio proper at some point, but this should be
32//! sufficient for now.
33//!
34//! This does *not* rely on the tokio-metrics crate, as that has more overhead
35//! than we would like.
36
37use crate::metrics::Metrics;
38use foundations::telemetry::TelemetryContext;
39use pin_project::pin_project;
40use std::future::Future;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::sync::Mutex;
44use std::task::Context;
45use std::task::Poll;
46use std::task::Wake;
47use std::task::Waker;
48use std::time::Instant;
49use task_killswitch::spawn_with_killswitch as killswitch_spawn;
50use tokio::task::JoinHandle;
51
52/// An instrumented future.
53///
54/// It's important to keep overhead low here, especially where contention is
55/// concerned.
56#[pin_project]
57struct Instrumented<F, M> {
58 #[pin]
59 future: F,
60 name: Arc<str>,
61 timer: Arc<Mutex<Option<Instant>>>,
62 metrics: M,
63}
64
65/// An instrumented waker for our instrumented future.
66///
67/// It's very important to keep overhead low here, especially where contention
68/// is concerned.
69struct InstrumentedWaker {
70 timer: Arc<Mutex<Option<Instant>>>,
71 waker: Waker,
72}
73
74impl Wake for InstrumentedWaker {
75 fn wake(self: Arc<Self>) {
76 self.wake_by_ref()
77 }
78
79 fn wake_by_ref(self: &Arc<Self>) {
80 // let's scope the guard's lifespan in case the inner waker is slow
81 // this is still highly unlikely to be contended ever
82 {
83 let mut guard = self.timer.lock().unwrap();
84
85 if guard.is_none() {
86 *guard = Some(Instant::now())
87 }
88 }
89
90 self.waker.wake_by_ref();
91 }
92}
93
94impl<F, M> Instrumented<F, M>
95where
96 M: Metrics,
97{
98 fn new(name: &str, metrics: M, future: F) -> Self {
99 let name = Arc::from(name);
100
101 Self {
102 future,
103 name,
104 metrics,
105 timer: Arc::new(Mutex::new(Some(Instant::now()))),
106 }
107 }
108}
109
110impl<F: Future, M: Metrics> Future for Instrumented<F, M> {
111 type Output = F::Output;
112
113 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
114 let total_timer = Instant::now();
115
116 // if we were to hold the lock over the poll boundary, self-wakes would
117 // deadlock us, so we won't do that.
118 //
119 // this is unlikely to be contended much otherwise.
120 let maybe_schedule_timer = self.timer.lock().unwrap().take();
121
122 // for various reasons related to how rust does lifetime things, we will
123 // not acquire the lock in the if statement
124 if let Some(schedule_timer) = maybe_schedule_timer {
125 let elapsed = schedule_timer.elapsed();
126
127 self.metrics
128 .tokio_runtime_task_schedule_delay_histogram(&self.name)
129 .observe(elapsed.as_nanos() as u64);
130 }
131
132 let projected = self.project();
133
134 let waker = Waker::from(Arc::new(InstrumentedWaker {
135 timer: Arc::clone(projected.timer),
136 waker: cx.waker().clone(),
137 }));
138
139 let mut new_cx = Context::from_waker(&waker);
140
141 let timer = Instant::now();
142
143 let output = projected.future.poll(&mut new_cx);
144
145 let elapsed = timer.elapsed();
146
147 projected
148 .metrics
149 .tokio_runtime_task_poll_duration_histogram(projected.name)
150 .observe(elapsed.as_nanos() as u64);
151
152 let total_elapsed = total_timer.elapsed();
153
154 projected
155 .metrics
156 .tokio_runtime_task_total_poll_time_micros(projected.name)
157 .inc_by(total_elapsed.as_micros() as u64);
158
159 output
160 }
161}
162
163/// Spawn a potentially instrumented task.
164///
165/// Depending on whether the `tokio-task-metrics` feature is enabled, this may
166/// instrument the task and collect metrics for it.
167pub fn spawn<M, T>(name: &str, metrics: M, future: T) -> JoinHandle<T::Output>
168where
169 T: Future + Send + 'static,
170 T::Output: Send + 'static,
171 M: Metrics,
172{
173 let ctx = TelemetryContext::current();
174
175 if cfg!(feature = "tokio-task-metrics") {
176 tokio::spawn(Instrumented::new(name, metrics, ctx.apply(future)))
177 } else {
178 tokio::spawn(ctx.apply(future))
179 }
180}
181
182/// Spawn a potentially instrumented, long-lived task. Integrates with
183/// [task-killswitch](task_killswitch).
184///
185/// Depending on whether the `tokio-task-metrics` feature is enabled, this may
186/// instrument the task and collect metrics for it.
187pub fn spawn_with_killswitch<M, T>(name: &str, metrics: M, future: T)
188where
189 T: Future<Output = ()> + Send + 'static,
190 M: Metrics,
191{
192 let ctx = TelemetryContext::current();
193
194 if cfg!(feature = "tokio-task-metrics") {
195 killswitch_spawn(Instrumented::new(name, metrics, ctx.apply(future)))
196 } else {
197 killswitch_spawn(ctx.apply(future))
198 }
199}