tokio_quiche/metrics/
tokio_task.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Instrumentation and metrics for spawned tokio tasks.
28//!
29//! Currently, this is implemented by creating wrapper futures and wakers for
30//! the future inside of a spawned task. Ideally we would be able to move at
31//! least some of this work into tokio proper at some point, but this should be
32//! sufficient for now.
33//!
34//! This does *not* rely on the tokio-metrics crate, as that has more overhead
35//! than we would like.
36
37use crate::metrics::Metrics;
38use foundations::telemetry::TelemetryContext;
39use pin_project::pin_project;
40use std::future::Future;
41use std::pin::pin;
42use std::pin::Pin;
43use std::sync::Arc;
44use std::sync::Mutex;
45use std::task::Context;
46use std::task::Poll;
47use std::task::Wake;
48use std::task::Waker;
49use std::time::Instant;
50use task_killswitch::spawn_with_killswitch as killswitch_spawn;
51use tokio::task::JoinHandle;
52
53/// An instrumented future.
54///
55/// It's important to keep overhead low here, especially where contention is
56/// concerned.
57#[pin_project]
58struct Instrumented<F, M> {
59    #[pin]
60    future: F,
61    name: Arc<str>,
62    timer: Arc<Mutex<Option<Instant>>>,
63    metrics: M,
64}
65
/// Waker handed to the wrapped future in place of the caller's waker.
///
/// Each wake is forwarded to the original waker. In addition, the first wake
/// that finds the shared timestamp slot empty stamps it with the current time.
///
/// Wakes may arrive from anywhere, so this path must stay cheap and
/// essentially uncontended.
struct InstrumentedWaker {
    /// Timestamp slot shared with the instrumented future.
    timer: Arc<Mutex<Option<Instant>>>,
    /// The waker that was supplied to the instrumented future's `poll`.
    waker: Waker,
}

impl Wake for InstrumentedWaker {
    fn wake(self: Arc<Self>) {
        // Consuming and by-reference wakes share one implementation.
        Self::wake_by_ref(&self)
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // Keep the guard in its own scope so the lock is released before the
        // inner waker runs, in case that waker is slow. Only an empty slot is
        // stamped: an already-recorded wake time is left untouched.
        {
            let mut slot = self.timer.lock().unwrap();
            slot.get_or_insert_with(Instant::now);
        }

        self.waker.wake_by_ref();
    }
}
94
95impl<F, M> Instrumented<F, M>
96where
97    M: Metrics,
98{
99    fn new(name: &str, metrics: M, future: F) -> Self {
100        let name = Arc::from(name);
101
102        Self {
103            future,
104            name,
105            metrics,
106            timer: Arc::new(Mutex::new(Some(Instant::now()))),
107        }
108    }
109}
110
111impl<F: Future, M: Metrics> Future for Instrumented<F, M> {
112    type Output = F::Output;
113
114    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115        let total_timer = Instant::now();
116
117        // if we were to hold the lock over the poll boundary, self-wakes would
118        // deadlock us, so we won't do that.
119        //
120        // this is unlikely to be contended much otherwise.
121        let maybe_schedule_timer = self.timer.lock().unwrap().take();
122
123        // for various reasons related to how rust does lifetime things, we will
124        // not acquire the lock in the if statement
125        if let Some(schedule_timer) = maybe_schedule_timer {
126            let elapsed = schedule_timer.elapsed();
127
128            self.metrics
129                .tokio_runtime_task_schedule_delay_histogram(&self.name)
130                .observe(elapsed.as_nanos() as u64);
131        }
132
133        let projected = self.project();
134
135        let waker = Waker::from(Arc::new(InstrumentedWaker {
136            timer: Arc::clone(projected.timer),
137            waker: cx.waker().clone(),
138        }));
139
140        let mut new_cx = Context::from_waker(&waker);
141
142        let timer = Instant::now();
143
144        let output = projected.future.poll(&mut new_cx);
145
146        let elapsed = timer.elapsed();
147
148        projected
149            .metrics
150            .tokio_runtime_task_poll_duration_histogram(projected.name)
151            .observe(elapsed.as_nanos() as u64);
152
153        let total_elapsed = total_timer.elapsed();
154
155        projected
156            .metrics
157            .tokio_runtime_task_total_poll_time_micros(projected.name)
158            .inc_by(total_elapsed.as_micros() as u64);
159
160        output
161    }
162}
163
164/// Spawn a potentially instrumented task.
165///
166/// Depending on whether the `tokio-task-metrics` feature is enabled, this may
167/// instrument the task and collect metrics for it.
168pub fn spawn<M, T>(name: &str, metrics: M, future: T) -> JoinHandle<T::Output>
169where
170    T: Future + Send + 'static,
171    T::Output: Send + 'static,
172    M: Metrics,
173{
174    let ctx = TelemetryContext::current();
175
176    if cfg!(feature = "tokio-task-metrics") {
177        tokio::spawn(Instrumented::new(name, metrics, ctx.apply(future)))
178    } else {
179        tokio::spawn(ctx.apply(future))
180    }
181}
182
183/// Spawn a potentially instrumented, long-lived task. Integrates with
184/// [task-killswitch](task_killswitch).
185///
186/// Depending on whether the `tokio-task-metrics` feature is enabled, this may
187/// instrument the task and collect metrics for it.
188pub fn spawn_with_killswitch<M, T>(name: &str, metrics: M, future: T)
189where
190    T: Future<Output = ()> + Send + 'static,
191    M: Metrics,
192{
193    let ctx = TelemetryContext::current();
194
195    if cfg!(feature = "tokio-task-metrics") {
196        killswitch_spawn(Instrumented::new(name, metrics, ctx.apply(future)))
197    } else {
198        killswitch_spawn(ctx.apply(future))
199    }
200}