buffer_pool/lib.rs

// Copyright (C) 2025, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod buffer;
mod raw_pool_buf_io;

use std::collections::VecDeque;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use crossbeam::queue::SegQueue;

pub use crate::buffer::*;
pub use crate::raw_pool_buf_io::*;

/// A sharded pool of elements.
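///
/// # Example
///
/// A minimal usage sketch (not compiled as a doc-test; the `buffer_pool`
/// crate path and the `POOL` static are illustrative assumptions, not part of
/// this module):
///
/// ```ignore
/// use buffer_pool::Pool;
///
/// // An 8-shard pool of `Vec<u8>`, keeping at most 64 buffers in total and
/// // trimming oversized reused buffers back down to 4 KiB of capacity.
/// static POOL: Pool<8, Vec<u8>> = Pool::new(64, 4096);
///
/// let mut buf = POOL.get();
/// buf.extend_from_slice(b"hello");
/// // Dropping the handle runs `Reuse::reuse`; the buffer goes back to its
/// // shard because it still has capacity after trimming.
/// drop(buf);
/// ```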
#[derive(Debug)]
pub struct Pool<const S: usize, T: 'static> {
    /// List of distinct shards to reduce contention.
    queues: [QueueShard<T>; S],
    /// The index of the next shard to use, in round-robin order.
    next_shard: AtomicUsize,
}

#[derive(Debug)]
struct QueueShard<T> {
    /// The inner queue of pooled values.
    queue: SegQueue<T>,
    /// The number of elements currently stored in this shard.
    elem_cnt: AtomicUsize,
    /// The value to use when calling [`Reuse::reuse`]. Typically the capacity
    /// to keep in a reused buffer.
    trim: usize,
    /// The max number of values to keep in the shard.
    max: usize,
}

impl<T> QueueShard<T> {
    const fn new(trim: usize, max: usize) -> Self {
        QueueShard {
            queue: SegQueue::new(),
            elem_cnt: AtomicUsize::new(0),
            trim,
            max,
        }
    }
}

/// A value borrowed from the [`Pool`] that can be dereferenced to `T`.
#[derive(Debug)]
pub struct Pooled<T: Default + Reuse + 'static> {
    inner: T,
    pool: &'static QueueShard<T>,
}

impl<T: Default + Reuse> Pooled<T> {
    pub fn into_inner(mut self) -> T {
        std::mem::take(&mut self.inner)
    }
}

impl<T: Default + Reuse> Drop for Pooled<T> {
    fn drop(&mut self) {
        let QueueShard {
            queue,
            elem_cnt,
            trim,
            max,
        } = self.pool;
        if self.inner.reuse(*trim) {
            if elem_cnt.fetch_add(1, Ordering::Acquire) < *max {
                // If returning the element to the queue would not exceed the
                // max number of elements, return it.
                queue.push(std::mem::take(&mut self.inner));
                return;
            }
            // There was no room for the buffer, so restore the count to its
            // previous value and drop it.
            elem_cnt.fetch_sub(1, Ordering::Release);
        }
        // If the item did not qualify for return, drop it.
    }
}

// Currently there is no way to const-init an array whose element type does not
// implement Copy, so this macro generates initializers for up to 32 shards. If
// const Default is ever stabilized this will all go away.
macro_rules! array_impl_new_queues {
    {$n:expr, $t:ident $($ts:ident)*} => {
        impl<$t: Default + Reuse> Pool<{$n}, $t> {
            #[allow(dead_code)]
            pub const fn new(limit: usize, trim: usize) -> Self {
                let limit = limit / $n;
                Pool {
                    queues: [QueueShard::new(trim, limit), $(QueueShard::<$ts>::new(trim, limit)),*],
                    next_shard: AtomicUsize::new(0),
                }
            }
        }

        array_impl_new_queues!{($n - 1), $($ts)*}
    };
    {$n:expr,} => {  };
}

array_impl_new_queues! { 32, T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T }

impl<const S: usize, T: Default + Reuse> Pool<S, T> {
    /// Get a value from the pool, or create a new default value if the
    /// assigned shard is currently empty.
    pub fn get(&'static self) -> Pooled<T> {
        let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
        let shard = &self.queues[shard];
        let inner = match shard.queue.pop() {
            Some(el) => {
                shard.elem_cnt.fetch_sub(1, Ordering::Relaxed);
                el
            },
            None => Default::default(),
        };

        Pooled { inner, pool: shard }
    }

    /// Create a new default value assigned to a shard of the pool. If it ends
    /// up being expanded and is eligible for reuse, it will return to the pool
    /// when dropped; otherwise it will simply be dropped.
    pub fn get_empty(&'static self) -> Pooled<T> {
        let shard = self.next_shard.load(Ordering::Relaxed) % S;
        let shard = &self.queues[shard];

        Pooled {
            inner: Default::default(),
            pool: shard,
        }
    }

    /// Get a value from the pool and apply the provided transformation on
    /// it before returning.
    pub fn get_with(&'static self, f: impl Fn(&mut T)) -> Pooled<T> {
        let mut pooled = self.get();
        f(&mut pooled);
        pooled
    }

    pub fn from_owned(&'static self, inner: T) -> Pooled<T> {
        let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
        let shard = &self.queues[shard];
        Pooled { inner, pool: shard }
    }
}

impl<'a, const S: usize, T: Default + Extend<&'a u8> + Reuse> Pool<S, T> {
    /// Get a value from the pool and extend it with the provided slice.
    pub fn with_slice(&'static self, v: &'a [u8]) -> Pooled<T> {
        let mut buf = self.get();
        buf.deref_mut().extend(v);
        buf
    }
}

impl<T: Default + Reuse> Deref for Pooled<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<T: Default + Reuse> DerefMut for Pooled<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

/// A trait that prepares an item to be returned to the pool, for example by
/// clearing it. [`Reuse::reuse`] returns `true` if the item should be returned
/// to the pool, or `false` if it should be dropped instead.
pub trait Reuse {
    fn reuse(&mut self, trim: usize) -> bool;
}

impl Reuse for Vec<u8> {
    fn reuse(&mut self, trim: usize) -> bool {
        self.clear();
        self.shrink_to(trim);
        self.capacity() > 0
    }
}

impl Reuse for VecDeque<u8> {
    fn reuse(&mut self, val: usize) -> bool {
        self.clear();
        self.shrink_to(val);
        self.capacity() > 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
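
    // A minimal lifecycle sketch (an illustrative addition, not part of the
    // original test suite): borrow a buffer, fill it, and check that it goes
    // back to one of the shards on drop because it still has capacity.
    #[test]
    fn usage_sketch() {
        // 2 shards, at most 4 pooled buffers in total, trim capacity to 16.
        let pool = Box::leak(Box::new(Pool::<2, Vec<u8>>::new(4, 16)));

        let mut buf = pool.with_slice(b"hello");
        buf.push(b'!');
        assert_eq!(&buf[..], b"hello!");

        // Dropping the handle runs `Reuse::reuse`, which clears the buffer;
        // it is kept because its capacity is non-zero.
        drop(buf);
        let pooled: usize = pool
            .queues
            .iter()
            .map(|shard| shard.elem_cnt.load(Ordering::Relaxed))
            .sum();
        assert_eq!(pooled, 1);

        // An already-owned value can be adopted into the pool as well.
        let adopted = pool.from_owned(vec![1, 2, 3]);
        assert_eq!(adopted.len(), 3);
    }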

    #[test]
    fn test_sharding() {
        const SHARDS: usize = 3;
        const MAX_IN_SHARD: usize = 2;

        let pool = Box::leak(Box::new(Pool::<SHARDS, Vec<u8>>::new(
            SHARDS * MAX_IN_SHARD,
            4,
        )));

        let bufs = (0..SHARDS * 4).map(|_| pool.get()).collect::<Vec<_>>();

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }

        for (i, buf) in bufs.iter().enumerate() {
            assert!(buf.is_empty());
            // Check the buffer is sharded properly.
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
        }

        // Shards are still empty.
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }

        // Now drop the buffers. They will not go into the pool because they
        // have no capacity, so `reuse` returns false. What is the point in
        // pooling empty buffers?
        drop(bufs);

        // Get buffers with capacity next.
        let bufs = (0..SHARDS * 4)
            .map(|_| pool.get_with(|b| b.extend(&[0, 1])))
            .collect::<Vec<_>>();

        for (i, buf) in bufs.iter().enumerate() {
            // Check the buffer is sharded properly.
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
            // Check that the buffer was properly extended
            assert_eq!(&buf[..], &[0, 1]);
        }

        drop(bufs);

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }

        // Now get buffers again, this time they should come from the pool.
        let bufs = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();

        for (i, buf) in bufs.iter().enumerate() {
            // Check that the buffer was properly cleared.
            assert!(buf.is_empty());
            // Check the buffer is sharded properly.
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
        }

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
        }

        // Get more buffers from the pool.
        let bufs2 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }

        // Get even more buffers.
        let bufs3 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }

        // Now begin dropping.
        drop(bufs);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
        }

        drop(bufs2);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }

        drop(bufs3);
        for shard in pool.queues.iter() {
            // Can't get over limit.
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }
    }
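
    // An illustrative check of the `Reuse` impl for `Vec<u8>` (a sketch added
    // for clarity, not part of the original test suite): a buffer without
    // capacity opts out of pooling, while a buffer with capacity is cleared,
    // trimmed and reported as reusable.
    #[test]
    fn reuse_sketch() {
        // A freshly created, never-grown vector has no capacity worth keeping.
        let mut empty: Vec<u8> = Vec::new();
        assert!(!empty.reuse(4));

        // A grown vector is cleared and its excess capacity is trimmed towards
        // the requested value, so it reports itself as reusable.
        let mut buf: Vec<u8> = Vec::with_capacity(1024);
        buf.extend_from_slice(&[0u8; 100]);
        assert!(buf.reuse(4));
        assert!(buf.is_empty());
        assert!(buf.capacity() >= 4);
    }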
}