tokio_quiche/metrics/
tokio_task.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
//! Instrumentation and metrics for spawned tokio tasks.
//!
//! Currently, this is implemented by wrapping the future of a spawned task in
//! an instrumented future and handing that future an instrumented waker.
//! Ideally we would be able to move at least some of this work into tokio
//! proper at some point, but this should be sufficient for now.
//!
//! This does *not* rely on the tokio-metrics crate, as that has more overhead
//! than we would like.
36
37use crate::metrics::Metrics;
38use foundations::telemetry::TelemetryContext;
39use pin_project::pin_project;
40use std::future::Future;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::sync::Mutex;
44use std::task::Context;
45use std::task::Poll;
46use std::task::Wake;
47use std::task::Waker;
48use std::time::Instant;
49use task_killswitch::spawn_with_killswitch as killswitch_spawn;
50use tokio::task::JoinHandle;
51
52/// An instrumented future.
53///
54/// It's important to keep overhead low here, especially where contention is
55/// concerned.
56#[pin_project]
57struct Instrumented<F, M> {
58    #[pin]
59    future: F,
60    name: Arc<str>,
61    timer: Arc<Mutex<Option<Instant>>>,
62    metrics: M,
63}
64
65/// An instrumented waker for our instrumented future.
66///
67/// It's very important to keep overhead low here, especially where contention
68/// is concerned.
69struct InstrumentedWaker {
70    timer: Arc<Mutex<Option<Instant>>>,
71    waker: Waker,
72}
73
74impl Wake for InstrumentedWaker {
75    fn wake(self: Arc<Self>) {
76        self.wake_by_ref()
77    }
78
79    fn wake_by_ref(self: &Arc<Self>) {
80        // let's scope the guard's lifespan in case the inner waker is slow
81        // this is still highly unlikely to be contended ever
82        {
83            let mut guard = self.timer.lock().unwrap();
84
85            if guard.is_none() {
86                *guard = Some(Instant::now())
87            }
88        }
89
90        self.waker.wake_by_ref();
91    }
92}
93
94impl<F, M> Instrumented<F, M>
95where
96    M: Metrics,
97{
98    fn new(name: &str, metrics: M, future: F) -> Self {
99        let name = Arc::from(name);
100
101        Self {
102            future,
103            name,
104            metrics,
105            timer: Arc::new(Mutex::new(Some(Instant::now()))),
106        }
107    }
108}
109
110impl<F: Future, M: Metrics> Future for Instrumented<F, M> {
111    type Output = F::Output;
112
113    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
114        let total_timer = Instant::now();
115
116        // if we were to hold the lock over the poll boundary, self-wakes would
117        // deadlock us, so we won't do that.
118        //
119        // this is unlikely to be contended much otherwise.
120        let maybe_schedule_timer = self.timer.lock().unwrap().take();
121
122        // for various reasons related to how rust does lifetime things, we will
123        // not acquire the lock in the if statement
124        if let Some(schedule_timer) = maybe_schedule_timer {
125            let elapsed = schedule_timer.elapsed();
126
127            self.metrics
128                .tokio_runtime_task_schedule_delay_histogram(&self.name)
129                .observe(elapsed.as_nanos() as u64);
130        }
131
132        let projected = self.project();
133
134        let waker = Waker::from(Arc::new(InstrumentedWaker {
135            timer: Arc::clone(projected.timer),
136            waker: cx.waker().clone(),
137        }));
138
139        let mut new_cx = Context::from_waker(&waker);
140
141        let timer = Instant::now();
142
143        let output = projected.future.poll(&mut new_cx);
144
145        let elapsed = timer.elapsed();
146
147        projected
148            .metrics
149            .tokio_runtime_task_poll_duration_histogram(projected.name)
150            .observe(elapsed.as_nanos() as u64);
151
152        let total_elapsed = total_timer.elapsed();
153
154        projected
155            .metrics
156            .tokio_runtime_task_total_poll_time_micros(projected.name)
157            .inc_by(total_elapsed.as_micros() as u64);
158
159        output
160    }
161}
162
163/// Spawn a potentially instrumented task.
164///
165/// Depending on whether the `tokio-task-metrics` feature is enabled, this may
166/// instrument the task and collect metrics for it.
167pub fn spawn<M, T>(name: &str, metrics: M, future: T) -> JoinHandle<T::Output>
168where
169    T: Future + Send + 'static,
170    T::Output: Send + 'static,
171    M: Metrics,
172{
173    let ctx = TelemetryContext::current();
174
175    if cfg!(feature = "tokio-task-metrics") {
176        tokio::spawn(Instrumented::new(name, metrics, ctx.apply(future)))
177    } else {
178        tokio::spawn(ctx.apply(future))
179    }
180}
181
182/// Spawn a potentially instrumented, long-lived task. Integrates with
183/// [task-killswitch](task_killswitch).
184///
185/// Depending on whether the `tokio-task-metrics` feature is enabled, this may
186/// instrument the task and collect metrics for it.
187pub fn spawn_with_killswitch<M, T>(name: &str, metrics: M, future: T)
188where
189    T: Future<Output = ()> + Send + 'static,
190    M: Metrics,
191{
192    let ctx = TelemetryContext::current();
193
194    if cfg!(feature = "tokio-task-metrics") {
195        killswitch_spawn(Instrumented::new(name, metrics, ctx.apply(future)))
196    } else {
197        killswitch_spawn(ctx.apply(future))
198    }
199}