1mod buffer;
28mod raw_pool_buf_io;
29
30use std::collections::VecDeque;
31use std::ops::Deref;
32use std::ops::DerefMut;
33use std::sync::atomic::AtomicUsize;
34use std::sync::atomic::Ordering;
35
36use crossbeam::queue::SegQueue;
37
38pub use crate::buffer::*;
39pub use crate::raw_pool_buf_io::*;
40
/// A sharded object pool: `S` independent lock-free shards selected
/// round-robin, each holding reusable `T` values up to a per-shard cap
/// (set via the macro-generated `new`, which splits `limit` across shards).
#[derive(Debug)]
pub struct Pool<const S: usize, T: 'static> {
    // One lock-free queue per shard; fixed at construction.
    queues: [QueueShard<T>; S],
    // Round-robin cursor; advanced by `get`/`from_owned` (wraps via `% S`).
    next_shard: AtomicUsize,
}
49
/// One shard of the pool: a lock-free queue of reusable values plus the
/// bookkeeping needed to cap its size.
#[derive(Debug)]
struct QueueShard<T> {
    // Reusable elements waiting to be handed out again.
    queue: SegQueue<T>,
    // Approximate number of elements in `queue`; SegQueue has no O(1)
    // len, so this counter is maintained alongside pushes/pops.
    elem_cnt: AtomicUsize,
    // Capacity target passed to `Reuse::reuse` before an element is pooled.
    trim: usize,
    // Maximum number of elements this shard will retain.
    max: usize,
}
62
63impl<T> QueueShard<T> {
64    const fn new(trim: usize, max: usize) -> Self {
65        QueueShard {
66            queue: SegQueue::new(),
67            elem_cnt: AtomicUsize::new(0),
68            trim,
69            max,
70        }
71    }
72}
73
/// RAII handle around a pooled `T`: on drop the value is offered back
/// to the shard it came from (see the `Drop` impl below).
#[derive(Debug)]
pub struct Pooled<T: Default + Reuse + 'static> {
    inner: T,
    // Shard `inner` is returned to when this handle is dropped.
    pool: &'static QueueShard<T>,
}
80
81impl<T: Default + Reuse> Pooled<T> {
82    pub fn into_inner(mut self) -> T {
83        std::mem::take(&mut self.inner)
84    }
85}
86
87impl<T: Default + Reuse> Drop for Pooled<T> {
88    fn drop(&mut self) {
89        let QueueShard {
90            queue,
91            elem_cnt,
92            trim,
93            max,
94        } = self.pool;
95        if self.inner.reuse(*trim) {
96            if elem_cnt.fetch_add(1, Ordering::Acquire) < *max {
97                queue.push(std::mem::take(&mut self.inner));
100                return;
101            }
102            elem_cnt.fetch_sub(1, Ordering::Release);
105        }
106        }
108}
109
// Generates `Pool::<N, T>::new(limit, trim)` const constructors for
// every shard count N from 32 down to 1. Each recursion step peels one
// `T` token off the list, so the array literal contains exactly N
// `QueueShard::new` calls — presumably because `[expr; N]` is not
// usable here for this non-Copy element type; confirm if changing.
macro_rules! array_impl_new_queues {
    {$n:expr, $t:ident $($ts:ident)*} => {
        impl<$t: Default + Reuse> Pool<{$n}, $t> {
            #[allow(dead_code)]
            // `limit` is the total pool capacity, split evenly across
            // the $n shards (integer division; remainder is dropped).
            // `trim` is forwarded to every shard.
            pub const fn new(limit: usize, trim: usize) -> Self {
                let limit = limit / $n;
                Pool {
                    queues: [QueueShard::new(trim, limit), $(QueueShard::<$ts>::new(trim, limit)),*],
                    next_shard: AtomicUsize::new(0),
                }
            }
        }

        // Recurse with one fewer shard / one fewer `T` token.
        array_impl_new_queues!{($n - 1), $($ts)*}
    };
    // Base case: token list exhausted — stop recursing.
    {$n:expr,} => {  };
}

// 32 `T` tokens => `new` impls for every shard count 1..=32.
array_impl_new_queues! { 32, T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T }
132
133impl<const S: usize, T: Default + Reuse> Pool<S, T> {
134    pub fn get(&'static self) -> Pooled<T> {
137        let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
138        let shard = &self.queues[shard];
139        let inner = match shard.queue.pop() {
140            Some(el) => {
141                shard.elem_cnt.fetch_sub(1, Ordering::Relaxed);
142                el
143            },
144            None => Default::default(),
145        };
146
147        Pooled { inner, pool: shard }
148    }
149
150    pub fn get_empty(&'static self) -> Pooled<T> {
154        let shard = self.next_shard.load(Ordering::Relaxed) % S;
155        let shard = &self.queues[shard];
156
157        Pooled {
158            inner: Default::default(),
159            pool: shard,
160        }
161    }
162
163    pub fn get_with(&'static self, f: impl Fn(&mut T)) -> Pooled<T> {
166        let mut pooled = self.get();
167        f(&mut pooled);
168        pooled
169    }
170
171    pub fn from_owned(&'static self, inner: T) -> Pooled<T> {
172        let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
173        let shard = &self.queues[shard];
174        Pooled { inner, pool: shard }
175    }
176}
177
178impl<'a, const S: usize, T: Default + Extend<&'a u8> + Reuse> Pool<S, T> {
179    pub fn with_slice(&'static self, v: &'a [u8]) -> Pooled<T> {
181        let mut buf = self.get();
182        buf.deref_mut().extend(v);
183        buf
184    }
185}
186
// Transparent read access to the pooled value.
impl<T: Default + Reuse> Deref for Pooled<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
194
// Transparent mutable access to the pooled value.
impl<T: Default + Reuse> DerefMut for Pooled<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
200
/// Prepares a value for return to the pool.
pub trait Reuse {
    /// Resets `self` for reuse, trimming excess capacity toward `trim`.
    /// Returns `true` if the value is worth pooling (e.g. it still owns
    /// an allocation), `false` if it should simply be dropped.
    fn reuse(&mut self, trim: usize) -> bool;
}
207
208impl Reuse for Vec<u8> {
209    fn reuse(&mut self, trim: usize) -> bool {
210        self.clear();
211        self.shrink_to(trim);
212        self.capacity() > 0
213    }
214}
215
216impl Reuse for VecDeque<u8> {
217    fn reuse(&mut self, val: usize) -> bool {
218        self.clear();
219        self.shrink_to(val);
220        self.capacity() > 0
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end check of round-robin shard selection, the element
    // counters, and the per-shard `max` cap across get/get_with/drop.
    #[test]
    fn test_sharding() {
        const SHARDS: usize = 3;
        const MAX_IN_SHARD: usize = 2;

        // Leak the pool to satisfy the `&'static self` receivers.
        let pool = Box::leak(Box::new(Pool::<SHARDS, Vec<u8>>::new(
            SHARDS * MAX_IN_SHARD,
            4,
        )));

        let bufs = (0..SHARDS * 4).map(|_| pool.get()).collect::<Vec<_>>();

        // Nothing has been returned yet, so every shard counter is 0.
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }

        // Round-robin: buffer i must point at shard i % SHARDS.
        for (i, buf) in bufs.iter().enumerate() {
            assert!(buf.is_empty());
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
        }

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }

        // These are fresh default Vec<u8>s with zero capacity, so
        // `reuse` rejects them and this drop pools nothing.
        drop(bufs);

        let bufs = (0..SHARDS * 4)
            .map(|_| pool.get_with(|b| b.extend(&[0, 1])))
            .collect::<Vec<_>>();

        for (i, buf) in bufs.iter().enumerate() {
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
            assert_eq!(&buf[..], &[0, 1]);
        }

        // These buffers own capacity now; of the 4 returned per shard,
        // only MAX_IN_SHARD are kept (the cap) and the rest are dropped.
        drop(bufs);

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }

        // One `get` per shard drains one pooled element from each.
        let bufs = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();

        for (i, buf) in bufs.iter().enumerate() {
            assert!(buf.is_empty());
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
        }

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
        }

        // A second round empties the shards...
        let bufs2 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }

        // ...and a third round falls back to `Default::default()`.
        let bufs3 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }

        // Returning the first two batches refills each shard to the cap.
        drop(bufs);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
        }

        drop(bufs2);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }

        // bufs3 are fresh defaults with no capacity, so `reuse` rejects
        // them (and the shards are already full anyway) — counts hold.
        drop(bufs3);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }
    }
}