1mod buffer;
28mod raw_pool_buf_io;
29
30use std::collections::VecDeque;
31use std::ops::Deref;
32use std::ops::DerefMut;
33use std::sync::atomic::AtomicUsize;
34use std::sync::atomic::Ordering;
35
36use crossbeam::queue::SegQueue;
37
38pub use crate::buffer::*;
39pub use crate::raw_pool_buf_io::*;
40
41#[derive(Debug)]
43pub struct Pool<const S: usize, T: 'static> {
44 queues: [QueueShard<T>; S],
46 next_shard: AtomicUsize,
48}
49
50#[derive(Debug)]
51struct QueueShard<T> {
52 queue: SegQueue<T>,
54 elem_cnt: AtomicUsize,
56 trim: usize,
59 max: usize,
61}
62
63impl<T> QueueShard<T> {
64 const fn new(trim: usize, max: usize) -> Self {
65 QueueShard {
66 queue: SegQueue::new(),
67 elem_cnt: AtomicUsize::new(0),
68 trim,
69 max,
70 }
71 }
72}
73
74#[derive(Debug)]
76pub struct Pooled<T: Default + Reuse + 'static> {
77 inner: T,
78 pool: &'static QueueShard<T>,
79}
80
81impl<T: Default + Reuse> Pooled<T> {
82 pub fn into_inner(mut self) -> T {
83 std::mem::take(&mut self.inner)
84 }
85}
86
87impl<T: Default + Reuse> Drop for Pooled<T> {
88 fn drop(&mut self) {
89 let QueueShard {
90 queue,
91 elem_cnt,
92 trim,
93 max,
94 } = self.pool;
95 if self.inner.reuse(*trim) {
96 if elem_cnt.fetch_add(1, Ordering::Acquire) < *max {
97 queue.push(std::mem::take(&mut self.inner));
100 return;
101 }
102 elem_cnt.fetch_sub(1, Ordering::Release);
105 }
106 }
108}
109
110macro_rules! array_impl_new_queues {
114 {$n:expr, $t:ident $($ts:ident)*} => {
115 impl<$t: Default + Reuse> Pool<{$n}, $t> {
116 #[allow(dead_code)]
117 pub const fn new(limit: usize, trim: usize) -> Self {
118 let limit = limit / $n;
119 Pool {
120 queues: [QueueShard::new(trim, limit), $(QueueShard::<$ts>::new(trim, limit)),*],
121 next_shard: AtomicUsize::new(0),
122 }
123 }
124 }
125
126 array_impl_new_queues!{($n - 1), $($ts)*}
127 };
128 {$n:expr,} => { };
129}
130
131array_impl_new_queues! { 32, T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T }
132
133impl<const S: usize, T: Default + Reuse> Pool<S, T> {
134 pub fn get(&'static self) -> Pooled<T> {
137 let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
138 let shard = &self.queues[shard];
139 let inner = match shard.queue.pop() {
140 Some(el) => {
141 shard.elem_cnt.fetch_sub(1, Ordering::Relaxed);
142 el
143 },
144 None => Default::default(),
145 };
146
147 Pooled { inner, pool: shard }
148 }
149
150 pub fn get_empty(&'static self) -> Pooled<T> {
154 let shard = self.next_shard.load(Ordering::Relaxed) % S;
155 let shard = &self.queues[shard];
156
157 Pooled {
158 inner: Default::default(),
159 pool: shard,
160 }
161 }
162
163 pub fn get_with(&'static self, f: impl Fn(&mut T)) -> Pooled<T> {
166 let mut pooled = self.get();
167 f(&mut pooled);
168 pooled
169 }
170
171 pub fn from_owned(&'static self, inner: T) -> Pooled<T> {
172 let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
173 let shard = &self.queues[shard];
174 Pooled { inner, pool: shard }
175 }
176}
177
178impl<'a, const S: usize, T: Default + Extend<&'a u8> + Reuse> Pool<S, T> {
179 pub fn with_slice(&'static self, v: &'a [u8]) -> Pooled<T> {
181 let mut buf = self.get();
182 buf.deref_mut().extend(v);
183 buf
184 }
185}
186
187impl<T: Default + Reuse> Deref for Pooled<T> {
188 type Target = T;
189
190 fn deref(&self) -> &Self::Target {
191 &self.inner
192 }
193}
194
195impl<T: Default + Reuse> DerefMut for Pooled<T> {
196 fn deref_mut(&mut self) -> &mut Self::Target {
197 &mut self.inner
198 }
199}
200
201pub trait Reuse {
205 fn reuse(&mut self, trim: usize) -> bool;
206}
207
208impl Reuse for Vec<u8> {
209 fn reuse(&mut self, trim: usize) -> bool {
210 self.clear();
211 self.shrink_to(trim);
212 self.capacity() > 0
213 }
214}
215
216impl Reuse for VecDeque<u8> {
217 fn reuse(&mut self, val: usize) -> bool {
218 self.clear();
219 self.shrink_to(val);
220 self.capacity() > 0
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227
228 #[test]
229 fn test_sharding() {
230 const SHARDS: usize = 3;
231 const MAX_IN_SHARD: usize = 2;
232
233 let pool = Box::leak(Box::new(Pool::<SHARDS, Vec<u8>>::new(
234 SHARDS * MAX_IN_SHARD,
235 4,
236 )));
237
238 let bufs = (0..SHARDS * 4).map(|_| pool.get()).collect::<Vec<_>>();
239
240 for shard in pool.queues.iter() {
241 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
242 }
243
244 for (i, buf) in bufs.iter().enumerate() {
245 assert!(buf.is_empty());
246 assert_eq!(
248 buf.pool as *const _,
249 &pool.queues[i % SHARDS] as *const _
250 );
251 }
252
253 for shard in pool.queues.iter() {
255 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
256 }
257
258 drop(bufs);
262
263 let bufs = (0..SHARDS * 4)
265 .map(|_| pool.get_with(|b| b.extend(&[0, 1])))
266 .collect::<Vec<_>>();
267
268 for (i, buf) in bufs.iter().enumerate() {
269 assert_eq!(
271 buf.pool as *const _,
272 &pool.queues[i % SHARDS] as *const _
273 );
274 assert_eq!(&buf[..], &[0, 1]);
276 }
277
278 drop(bufs);
279
280 for shard in pool.queues.iter() {
281 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
282 }
283
284 let bufs = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
286
287 for (i, buf) in bufs.iter().enumerate() {
288 assert!(buf.is_empty());
290 assert_eq!(
292 buf.pool as *const _,
293 &pool.queues[i % SHARDS] as *const _
294 );
295 }
296
297 for shard in pool.queues.iter() {
298 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
299 }
300
301 let bufs2 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
303 for shard in pool.queues.iter() {
304 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
305 }
306
307 let bufs3 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
309 for shard in pool.queues.iter() {
310 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
311 }
312
313 drop(bufs);
315 for shard in pool.queues.iter() {
316 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
317 }
318
319 drop(bufs2);
320 for shard in pool.queues.iter() {
321 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
322 }
323
324 drop(bufs3);
325 for shard in pool.queues.iter() {
326 assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
328 }
329 }
330}