veecle_os_runtime/memory_pool.rs

//! An interrupt/thread-safe memory pool.
//!
//! The memory pool allows using static, stack, or heap memory to store `SIZE` instances of `T`.
//! [`MemoryPool::chunk`] provides [`Chunk`]s to interact with instances of `T`.
//! [`Chunk`] is a pointer type, which means it is cheap to move.
//! This makes the memory pool well suited for moving data between actors without copying.
//! The memory pool is especially useful for large chunks of data or data that is expensive to move.
//!
//! [`Chunk`]s are automatically made available for re-use on drop.
//!
//! [`Chunk`]s can be created by:
//! - [`MemoryPool::reserve`] and [`MemoryPoolToken::init`], which initialize the chunk with the provided value of
//!   `T`. [`MemoryPool::chunk`] combines both into a single method call.
//! - [`MemoryPool::reserve`] and [`MemoryPoolToken::init_in_place`] to initialize `T` in place.
//!
//! # Example
//!
//! ```
//! use veecle_os_runtime::{ExclusiveReader, Never, Writer};
//! use veecle_os_runtime::memory_pool::{Chunk, MemoryPool};
//! use veecle_os_runtime::Storable;
//!
//! #[derive(Debug)]
//! pub struct Data;
//!
//! impl Storable for Data {
//!     type DataType = Chunk<'static, u8>;
//! }
//!
//! #[veecle_os_runtime::actor]
//! async fn exclusive_read_actor(mut reader: ExclusiveReader<'_, Data>) -> Never {
//!     loop {
//!         if let Some(chunk) = reader.take() {
//!             println!("Chunk received: {:?}", chunk);
//!             println!("Chunk content: {:?}", *chunk);
//!         } else {
//!             reader.wait_for_update().await;
//!         }
//!     }
//! }
//!
//! #[veecle_os_runtime::actor]
//! async fn write_actor(
//!     mut writer: Writer<'_, Data>,
//!     #[init_context] pool: &'static MemoryPool<u8, 5>,
//! ) -> Never {
//!     for index in 0..10 {
//!         writer.write(pool.chunk(index).unwrap()).await;
//!     }
//! #   // Exit the application to allow doc-tests to complete.
//! #   std::process::exit(0);
//! }
//!
//! static POOL: MemoryPool<u8, 5> = MemoryPool::new();
//!
//! # futures::executor::block_on(
//! #
//! veecle_os_runtime::execute! {
//!     actors: [
//!         ExclusiveReadActor,
//!         WriteActor: &POOL,
//!     ]
//! }
//! # );
//! ```

use core::cell::UnsafeCell;
use core::fmt;
use core::fmt::{Debug, Formatter};
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::{AtomicBool, Ordering};

/// Interrupt- and thread-safe memory pool.
///
/// See [module-level documentation][self] for more information.
#[derive(Debug)]
pub struct MemoryPool<T, const SIZE: usize> {
    chunks: [MemoryPoolInner<T>; SIZE],
}

impl<T, const SIZE: usize> Default for MemoryPool<T, SIZE> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T, const SIZE: usize> MemoryPool<T, SIZE> {
    /// Creates a new [`MemoryPool`].
    ///
    /// `SIZE` is required to be larger than 0.
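    ///
    /// # Example
    ///
    /// A minimal sketch; since `new` is `const`, a pool can also live in a `static`:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// static POOL: MemoryPool<u8, 4> = MemoryPool::new();
    /// assert_eq!(POOL.chunks_available(), 4);
    /// ```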
    pub const fn new() -> Self {
        const {
            assert!(SIZE > 0, "empty MemoryPool");
        }

        Self {
            chunks: [const { MemoryPoolInner::new() }; SIZE],
        }
    }

    /// Reserves an element in the [`MemoryPool`].
    ///
    /// Returns `None` if no element is available.
    ///
    /// The returned token has to be initialized via [`MemoryPoolToken::init`] or
    /// [`MemoryPoolToken::init_in_place`] before use.
    /// See [`MemoryPool::chunk`] for a convenience wrapper combining reserving and initializing a [`Chunk`].
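    ///
    /// # Example
    ///
    /// A minimal sketch of the two-step reserve-then-initialize flow:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u32, 1>::new();
    /// let token = pool.reserve().expect("one element is available");
    /// // The element stays reserved while the token is alive.
    /// assert!(pool.reserve().is_none());
    /// let chunk = token.init(42);
    /// assert_eq!(*chunk, 42);
    /// ```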
    pub fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
        self.chunks.iter().find_map(|chunk| chunk.reserve())
    }

    /// Retrieves a [`Chunk`] from the [`MemoryPool`] and initializes it with `init_value`.
    ///
    /// Returns `Err(init_value)` if no more [`Chunk`]s are available.
    ///
    /// Convenience wrapper combining [`MemoryPool::reserve`] and [`MemoryPoolToken::init`].
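    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u32, 1>::new();
    /// let chunk = pool.chunk(7).expect("one chunk is available");
    /// // The pool is exhausted, so the value is handed back unchanged.
    /// assert_eq!(pool.chunk(8).unwrap_err(), 8);
    /// ```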
    pub fn chunk(&self, init_value: T) -> Result<Chunk<'_, T>, T> {
        // We need to split reserving and initializing of the `Chunk` because we cannot copy the `init_value` into
        // every `reserve` call.
        let token = self.reserve();

        if let Some(token) = token {
            Ok(token.init(init_value))
        } else {
            Err(init_value)
        }
    }

    /// Returns the number of chunks currently available.
    ///
    /// Because interrupts and/or other threads may reserve or release chunks concurrently, the returned value may
    /// already be outdated when it is read.
    /// Only intended for metrics.
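    ///
    /// # Example
    ///
    /// A minimal single-threaded sketch, where the count is exact:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u32, 2>::new();
    /// assert_eq!(pool.chunks_available(), 2);
    ///
    /// let chunk = pool.chunk(1).unwrap();
    /// assert_eq!(pool.chunks_available(), 1);
    ///
    /// // Dropping the chunk returns it to the pool.
    /// drop(chunk);
    /// assert_eq!(pool.chunks_available(), 2);
    /// ```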
    pub fn chunks_available(&self) -> usize {
        self.chunks
            .iter()
            .map(|chunk| usize::from(chunk.is_available()))
            .sum()
    }
}

// SAFETY: All accesses to the pool's elements go through `MemoryPool::reserve` (directly or via `MemoryPool::chunk`),
// which synchronizes them through the per-element `available` atomic.
unsafe impl<T, const SIZE: usize> Sync for MemoryPool<T, SIZE> {}

/// Container for the `T` instance and synchronization atomic for the [`MemoryPool`].
#[derive(Debug)]
struct MemoryPoolInner<T> {
    data: UnsafeCell<MaybeUninit<T>>,
    available: AtomicBool,
}

impl<T> MemoryPoolInner<T> {
    /// Creates a new `MemoryPoolInner`.
    ///
    /// Marked available and uninitialized.
    const fn new() -> Self {
        Self {
            data: UnsafeCell::new(MaybeUninit::uninit()),
            available: AtomicBool::new(true),
        }
    }

    /// Reserves this [`MemoryPoolInner`].
    fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
        if self.available.swap(false, Ordering::AcqRel) {
            Some(MemoryPoolToken { inner: Some(self) })
        } else {
            None
        }
    }

    /// Returns `true` if the [`MemoryPoolInner`] is currently available.
    fn is_available(&self) -> bool {
        self.available.load(Ordering::Acquire)
    }
}

/// A token reserving an element in a [`MemoryPool`] which can be initialized to create a [`Chunk`].
#[derive(Debug)]
pub struct MemoryPoolToken<'a, T> {
    inner: Option<&'a MemoryPoolInner<T>>,
}

impl<'a, T> MemoryPoolToken<'a, T> {
    /// Consumes the [`MemoryPoolToken.inner`][field@MemoryPoolToken::inner] to prevent [`MemoryPoolToken`]'s drop
    /// implementation from making the element available.
    fn consume(&mut self) -> (&'a mut MaybeUninit<T>, &'a AtomicBool) {
        let Some(inner) = self.inner.take() else {
            unreachable!("`MemoryPoolToken` should only be consumed once");
        };

        let inner_data = {
            let inner_data_ptr = inner.data.get();
            // SAFETY:
            // - `UnsafeCell` has the same layout as its content, thus `inner_data_ptr` points to an aligned and
            //   valid value of `MaybeUninit<T>`.
            // - The `available` atomic ensures that only this single mutable reference to the content of the
            //   `UnsafeCell` exists.
            unsafe { inner_data_ptr.as_mut() }
                .expect("pointer to the contents of an `UnsafeCell` should not be null")
        };

        (inner_data, &inner.available)
    }

    /// Consumes and turns the [`MemoryPoolToken`] into an initialized [`Chunk`].
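    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<&str, 1>::new();
    /// let token = pool.reserve().unwrap();
    /// let chunk = token.init("hello");
    /// assert_eq!(*chunk, "hello");
    /// ```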
    pub fn init(mut self, init_value: T) -> Chunk<'a, T> {
        let (inner_data, available) = self.consume();

        inner_data.write(init_value);

        // SAFETY:
        // `inner_data` has been initialized by writing the `init_value`.
        unsafe { Chunk::new(inner_data, available) }
    }

    /// Initializes a [`Chunk`] in place via `init_function`.
    ///
    /// # Safety
    ///
    /// `init_function` must initialize the passed parameter to a valid `T` before the function returns.
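    ///
    /// # Example
    ///
    /// A minimal sketch; useful when constructing `T` on the stack first should be avoided:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<[u8; 4], 1>::new();
    /// let token = pool.reserve().unwrap();
    /// // SAFETY: The closure fully initializes the value before returning.
    /// let chunk = unsafe {
    ///     token.init_in_place(|slot| {
    ///         slot.write([0; 4]);
    ///     })
    /// };
    /// assert_eq!(*chunk, [0; 4]);
    /// ```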
    pub unsafe fn init_in_place(
        mut self,
        init_function: impl FnOnce(&mut MaybeUninit<T>),
    ) -> Chunk<'a, T> {
        let (inner_data, available) = self.consume();

        init_function(inner_data);

        // SAFETY:
        // `inner_data` has been initialized by `init_function`.
        unsafe { Chunk::new(inner_data, available) }
    }
}

impl<T> Drop for MemoryPoolToken<'_, T> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.take() {
            inner.available.store(true, Ordering::Release);
        }
    }
}

/// A pointer type pointing to an instance of `T` in a [`MemoryPool`].
///
/// See [module-level documentation][self] for more information.
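///
/// # Example
///
/// A minimal sketch; [`Chunk`] dereferences to the stored value:
///
/// ```
/// use veecle_os_runtime::memory_pool::MemoryPool;
///
/// let pool = MemoryPool::<u32, 1>::new();
/// let mut chunk = pool.chunk(1).unwrap();
/// *chunk += 1;
/// assert_eq!(*chunk, 2);
/// ```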
pub struct Chunk<'a, T> {
    // We're using `&mut MaybeUninit<T>` instead of `&mut T` to be able to drop `T` without going through a pointer
    // while only having a reference.
    // We cannot drop the contents of a reference without creating a dangling reference in the `Drop` implementation.
    inner: &'a mut MaybeUninit<T>,
    // Only held to ensure the chunk is made available on drop.
    token: &'a AtomicBool,
}

// Required so `Chunk` can be used in `yoke::Yoke` as the cart.
// SAFETY: While `Chunk` has a reference to its assigned memory location in the `MemoryPool`,
// the address of that memory cannot change as a reference to the `MemoryPool` instance is held.
// With that, the address returned by the `Deref` and `DerefMut` implementations
// is stable for the duration of the lifetime of `Chunk`.
unsafe impl<'a, T> stable_deref_trait::StableDeref for Chunk<'a, T> {}

impl<T> Debug for Chunk<'_, T>
where
    T: Debug,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        Debug::fmt(&**self, f)
    }
}

impl<'a, T> Chunk<'a, T> {
    /// Creates a new [`Chunk`].
    ///
    /// # Safety
    ///
    /// The `chunk` must be initialized.
    unsafe fn new(chunk: &'a mut MaybeUninit<T>, token: &'a AtomicBool) -> Self {
        Self {
            inner: chunk,
            token,
        }
    }
}

impl<T> Deref for Chunk<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
        // initialized here.
        unsafe { self.inner.assume_init_ref() }
    }
}

impl<T> DerefMut for Chunk<'_, T> {
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
        // initialized here.
        unsafe { self.inner.assume_init_mut() }
    }
}

impl<T> Drop for Chunk<'_, T> {
    fn drop(&mut self) {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped here and cannot be un-initialized by any `Chunk` method, thus it is initialized at this
        // point.
        unsafe { self.inner.assume_init_drop() };
        // The swap must happen unconditionally; performing it inside `debug_assert!` would skip it in release builds
        // and permanently leak the element.
        let was_available = self.token.swap(true, Ordering::AcqRel);
        debug_assert!(!was_available, "chunk was made available a second time");
    }
}

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod test {
    use std::format;
    use std::sync::atomic::AtomicUsize;

    use super::*;

    #[test]
    fn pool() {
        static POOL: MemoryPool<[u8; 10], 2> = MemoryPool::new();

        let mut chunk = POOL.chunk([0; 10]).unwrap();
        let chunk1 = POOL.chunk([0; 10]).unwrap();
        assert!(POOL.chunk([0; 10]).is_err());
        assert_eq!(chunk[0], 0);
        chunk[0] += 1;
        assert_eq!(chunk[0], 1);
        assert_eq!(chunk1[0], 0);
    }

    #[test]
    fn drop_test() {
        #[derive(Debug)]
        pub struct Dropper {}
        impl Drop for Dropper {
            fn drop(&mut self) {
                COUNTER.fetch_add(1, Ordering::Relaxed);
            }
        }

        static COUNTER: AtomicUsize = AtomicUsize::new(0);

        {
            let pool: MemoryPool<Dropper, 2> = MemoryPool::new();

            let _ = pool.chunk(Dropper {});
            assert_eq!(COUNTER.load(Ordering::Relaxed), 1);

            {
                let _dropper1 = pool.chunk(Dropper {}).unwrap();
                let _dropper2 = pool.chunk(Dropper {}).unwrap();
                assert!(pool.chunk(Dropper {}).is_err());
            }
            assert_eq!(COUNTER.load(Ordering::Relaxed), 4);
            let _ = pool.chunk(Dropper {});
            assert_eq!(COUNTER.load(Ordering::Relaxed), 5);
        }

        // After dropping `pool`, there were no additional drops of the contained type.
        assert_eq!(COUNTER.load(Ordering::Relaxed), 5);
    }

    #[test]
    fn drop_memory_pool_token() {
        let pool = MemoryPool::<usize, 1>::new();
        assert_eq!(pool.chunks_available(), 1);
        {
            let _token = pool.reserve().unwrap();
            assert_eq!(pool.chunks_available(), 0);
        }
        assert_eq!(pool.chunks_available(), 1);
    }

    #[test]
    fn chunks_available() {
        let pool = MemoryPool::<usize, 2>::new();
        assert_eq!(pool.chunks_available(), 2);
        {
            let _chunk = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 1);
            let _chunk = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 0);
        }
        assert_eq!(pool.chunks_available(), 2);
    }

    #[test]
    fn reserve_init() {
        let pool = MemoryPool::<usize, 2>::new();
        let token = pool.reserve().unwrap();
        let chunk = token.init(2);
        assert_eq!(*chunk, 2);
    }

    #[test]
    fn reserve_init_in_place() {
        let pool = MemoryPool::<usize, 2>::new();
        let token = pool.reserve().unwrap();
        // SAFETY: The passed closure initializes the chunk correctly.
        let chunk = unsafe {
            token.init_in_place(|m| {
                m.write(2);
            })
        };
        assert_eq!(*chunk, 2);
    }

    #[test]
    #[should_panic(expected = "`MemoryPoolToken` should only be consumed once")]
    fn consume_none() {
        let pool = MemoryPool::<usize, 2>::new();
        let mut token = pool.reserve().unwrap();
        let _ = token.consume();
        let _ = token.consume();
    }

    /// Ensures the `MemoryPool` and `Chunk` don't lose their `Send` & `Sync` auto trait implementations when
    /// refactoring.
    #[test]
    fn send_sync() {
        fn send<T: Send>() {}
        fn sync<T: Sync>() {}

        send::<MemoryPool<[u8; 10], 2>>();
        sync::<MemoryPool<[u8; 10], 2>>();

        send::<Chunk<[u8; 10]>>();
        sync::<Chunk<[u8; 10]>>();
    }

    #[test]
    fn debug_chunk() {
        let pool = MemoryPool::<usize, 2>::new();
        let chunk = pool.chunk(0).unwrap();
        assert_eq!(format!("{chunk:?}"), "0");
    }

    #[test]
    fn default_memory_pool() {
        let pool: MemoryPool<usize, 2> = MemoryPool::default();
        assert_eq!(pool.chunks_available(), 2);
    }
}