veecle_os_runtime/memory_pool.rs
1//! An interrupt/thread-safe memory pool.
2//!
3//! The memory pool allows using static, stack or heap memory to store `SIZE` instances of `T`.
4//! [`MemoryPool::chunk`] provides [`Chunk`]s to interact with instances of `T`.
5//! [`Chunk`] is a pointer type, which means it is cheap to move.
6//! This makes the memory pool well suited for moving data between actors without copying.
7//! The memory pool is especially useful for large chunks of data or data that is expensive to move.
8//!
9//! [`Chunk`]s are automatically made available for re-use on drop.
10//!
11//! [`Chunk`]s can be created by:
12//! - [`MemoryPool::reserve`] and [`MemoryPoolToken::init`], which uses the provided value of `T` to initialize the
13//! chunk. [`MemoryPool::chunk`] combines both into a single method call.
14//! - [`MemoryPool::reserve`] and [`MemoryPoolToken::init_in_place`] to initialize `T` in place.
15//!
16//! # Example
17//!
18//! ```
19//! use veecle_os_runtime::{ExclusiveReader, Never, Writer};
20//! use veecle_os_runtime::memory_pool::{Chunk, MemoryPool};
21//! use veecle_os_runtime::Storable;
22//!
23//! #[derive(Debug)]
24//! pub struct Data;
25//!
26//! impl Storable for Data {
27//! type DataType = Chunk<'static, u8>;
28//! }
29//!
30//! #[veecle_os_runtime::actor]
31//! async fn exclusive_read_actor(mut reader: ExclusiveReader<'_, Data>) -> Never {
32//! loop {
33//! if let Some(chunk) = reader.take() {
34//! println!("Chunk received: {:?}", chunk);
35//! println!("Chunk content: {:?}", *chunk);
36//! } else {
37//! reader.wait_for_update().await;
38//! }
39//! }
40//! }
41//!
42//! #[veecle_os_runtime::actor]
43//! async fn write_actor(
44//! mut writer: Writer<'_, Data>,
45//! #[init_context] pool: &'static MemoryPool<u8, 5>,
46//! ) -> Never {
47//! for index in 0..10 {
48//! writer.write(pool.chunk(index).unwrap()).await;
49//! }
50//! # // Exit the application to allow doc-tests to complete.
51//! # std::process::exit(0);
52//! }
53//!
54//! static POOL: MemoryPool<u8, 5> = MemoryPool::new();
55//!
56//! # futures::executor::block_on(
57//! #
58//! veecle_os_runtime::execute! {
59//! actors: [
60//! ExclusiveReadActor,
61//! WriteActor: &POOL,
62//! ]
63//! }
64//! # );
65//! ```
66
67use core::cell::UnsafeCell;
68use core::fmt;
69use core::fmt::{Debug, Formatter};
70use core::mem::MaybeUninit;
71use core::ops::{Deref, DerefMut};
72use core::sync::atomic::{AtomicBool, Ordering};
73
74/// Interrupt- and thread-safe memory pool.
75///
76/// See [module-level documentation][self] for more information.
77#[derive(Debug)]
78pub struct MemoryPool<T, const SIZE: usize> {
79 chunks: [MemoryPoolInner<T>; SIZE],
80}
81
82impl<T, const SIZE: usize> Default for MemoryPool<T, SIZE> {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl<T, const SIZE: usize> MemoryPool<T, SIZE> {
89 /// Creates a new [`MemoryPool`].
90 ///
91 /// `SIZE` is required to be larger than 0.
92 pub const fn new() -> Self {
93 const {
94 assert!(SIZE > 0, "empty ObjectPool");
95 }
96
97 Self {
98 chunks: [const { MemoryPoolInner::new() }; SIZE],
99 }
100 }
101
102 /// Reserves an element in the [`MemoryPool`].
103 ///
104 /// Returns `None` if no element is available.
105 ///
106 /// The returned token has to be initialized via [`MemoryPoolToken::init`] before use.
107 /// See [`MemoryPool::chunk`] for a convenience wrapper combining reserving and initializing a [`Chunk`].
108 pub fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
109 self.chunks.iter().find_map(|chunk| chunk.reserve())
110 }
111
112 /// Retrieves a [`Chunk`] from the [`MemoryPool`] and initializes it with `init_value`.
113 ///
114 /// Returns `Err(init_value)` if no more [`Chunk`]s are available.
115 ///
116 /// Convenience wrapper combining [`MemoryPool::reserve`] and [`MemoryPoolToken::init].
117 pub fn chunk(&self, init_value: T) -> Result<Chunk<'_, T>, T> {
118 // We need to split reserving and initializing of the `Chunk` because we cannot copy the `init_value` into
119 // every `reserve` call.
120 let token = self.reserve();
121
122 if let Some(token) = token {
123 Ok(token.init(init_value))
124 } else {
125 Err(init_value)
126 }
127 }
128
129 /// Calculates the amount of chunks currently available.
130 ///
131 /// Due to accesses from interrupts and/or other threads, this value might not be correct.
132 /// Only intended for metrics.
133 pub fn chunks_available(&self) -> usize {
134 self.chunks
135 .iter()
136 .map(|chunk| usize::from(chunk.is_available()))
137 .sum()
138 }
139}
140
141// SAFETY: All accesses to the `MemoryPool` are done through the `MemoryPool::chunk` method which is synchronized by
142// atomics.
143unsafe impl<T, const N: usize> Sync for MemoryPool<T, N> {}
144
/// A single element of a [`MemoryPool`]: storage for one `T` together with the atomic synchronizing access to it.
#[derive(Debug)]
struct MemoryPoolInner<T> {
    /// Storage for the `T` instance, which may be uninitialized.
    data: UnsafeCell<MaybeUninit<T>>,
    /// `true` while the element can be reserved.
    available: AtomicBool,
}
151
152impl<T> MemoryPoolInner<T> {
153 /// Creates a new `MemoryPoolInner`.
154 ///
155 /// Marked available and uninitialized.
156 const fn new() -> Self {
157 Self {
158 data: UnsafeCell::new(MaybeUninit::uninit()),
159 available: AtomicBool::new(true),
160 }
161 }
162
163 /// Reserves this [`MemoryPoolInner`].
164 fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
165 if self.available.swap(false, Ordering::AcqRel) {
166 Some(MemoryPoolToken { inner: Some(self) })
167 } else {
168 None
169 }
170 }
171
172 /// Returns `true` if the [`MemoryPoolInner`] is currently available.
173 fn is_available(&self) -> bool {
174 self.available.load(Ordering::Acquire)
175 }
176}
177
178/// A token reserving an element in a [`MemoryPool`] which can be initialized to create a [`Chunk`].
179#[derive(Debug)]
180pub struct MemoryPoolToken<'a, T> {
181 inner: Option<&'a MemoryPoolInner<T>>,
182}
183
184impl<'a, T> MemoryPoolToken<'a, T> {
185 /// Consumes the [`MemoryPoolToken.inner`][field@MemoryPoolToken::inner] to prevent [`MemoryPoolToken`]'s drop
186 /// implementation from making the element available.
187 fn consume(&mut self) -> (&'a mut MaybeUninit<T>, &'a AtomicBool) {
188 let Some(inner) = self.inner.take() else {
189 unreachable!("`MemoryPoolToken` should only be consumed once");
190 };
191
192 let inner_data = {
193 let inner_data_ptr = inner.data.get();
194 // SAFETY:
195 // - `UnsafeCell` has the same layout as its content, thus the `chunk_ptr` points to an aligned and valid
196 // value of `MaybeUninit<T>`.
197 // - We ensure via the `ChunkMetadata` that only this single mutable reference to the content of the
198 // `UnsafeCell` exists.
199 unsafe { inner_data_ptr.as_mut() }
200 .expect("pointer to the contents of an `UnsafeCell` should not be null")
201 };
202
203 (inner_data, &inner.available)
204 }
205
206 /// Consumes and turns the [`MemoryPoolToken`] into an initialized [`Chunk`].
207 pub fn init(mut self, init_value: T) -> Chunk<'a, T> {
208 let (inner_data, available) = self.consume();
209
210 inner_data.write(init_value);
211
212 // SAFETY:
213 // `inner_data` has be initialized by writing the `init_value`.
214 unsafe { Chunk::new(inner_data, available) }
215 }
216
217 /// Initializes a [`Chunk`] in place via `init_function`.
218 ///
219 /// # Safety
220 ///
221 /// `init_function` must initialize the passed parameter to a valid `T` before the function returns.
222 pub unsafe fn init_in_place(
223 mut self,
224 init_function: impl FnOnce(&mut MaybeUninit<T>),
225 ) -> Chunk<'a, T> {
226 let (inner_data, available) = self.consume();
227
228 init_function(inner_data);
229
230 // SAFETY:
231 // `inner_data` has be initialized by `init_function`.
232 unsafe { Chunk::new(inner_data, available) }
233 }
234}
235
236impl<T> Drop for MemoryPoolToken<'_, T> {
237 fn drop(&mut self) {
238 if let Some(inner) = self.inner.take() {
239 inner.available.store(true, Ordering::Release);
240 }
241 }
242}
243
/// A pointer type giving access to one instance of `T` stored in a [`MemoryPool`].
///
/// See [module-level documentation][self] for more information.
pub struct Chunk<'a, T> {
    // Stored as `&mut MaybeUninit<T>` rather than `&mut T`: the `Drop` implementation has to drop the `T`, and
    // doing that through a `&mut T` would leave a dangling reference behind.
    // `MaybeUninit` allows dropping the content while only holding a reference.
    inner: &'a mut MaybeUninit<T>,
    // Only kept so the chunk can be marked as available again on drop.
    token: &'a AtomicBool,
}
255
256// Required so `Chunk` can be used in `yoke::Yoke` as the cart.
257// SAFETY: While `Chunk` has a reference to its assigned memory location in the `MemoryPool`,
258// the address of that memory cannot change as a reference to the `MemoryPool` instance is held.
259// With that, the address returned by the `Deref` and `DerefMut` implementations
260// are stable for the duration of the lifetime of `Chunk`.
261unsafe impl<'a, T> stable_deref_trait::StableDeref for Chunk<'a, T> {}
262
263impl<T> Debug for Chunk<'_, T>
264where
265 T: Debug,
266{
267 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
268 Debug::fmt(&**self, f)
269 }
270}
271
272impl<'a, T> Chunk<'a, T> {
273 /// Creates a new [`Chunk`].
274 ///
275 /// # Safety
276 ///
277 /// The `chunk` must be initialized.
278 unsafe fn new(chunk: &'a mut MaybeUninit<T>, token: &'a AtomicBool) -> Self {
279 Self {
280 inner: chunk,
281 token,
282 }
283 }
284}
285
286impl<T> Deref for Chunk<'_, T> {
287 type Target = T;
288
289 fn deref(&self) -> &Self::Target {
290 // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
291 // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
292 // initialized here.
293 unsafe { self.inner.assume_init_ref() }
294 }
295}
296
297impl<T> DerefMut for Chunk<'_, T> {
298 fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
299 // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
300 // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
301 // initialized here.
302 unsafe { self.inner.assume_init_mut() }
303 }
304}
305
306impl<T> Drop for Chunk<'_, T> {
307 fn drop(&mut self) {
308 // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
309 // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
310 // initialized here.
311 unsafe { self.inner.assume_init_drop() };
312 debug_assert!(
313 !self.token.swap(true, Ordering::AcqRel),
314 "chunk was made available a second time"
315 );
316 }
317}
318
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod test {
    use std::format;
    use std::sync::atomic::AtomicUsize;

    use super::*;

    /// The pool hands out at most `SIZE` chunks and the chunks do not share their data.
    #[test]
    fn pool() {
        static POOL: MemoryPool<[u8; 10], 2> = MemoryPool::new();

        let mut first = POOL.chunk([0; 10]).unwrap();
        let second = POOL.chunk([0; 10]).unwrap();
        // Both elements are in use, so a third chunk cannot be created.
        assert!(POOL.chunk([0; 10]).is_err());

        assert_eq!(first[0], 0);
        first[0] += 1;
        assert_eq!(first[0], 1);
        // Writing through one chunk leaves the other one untouched.
        assert_eq!(second[0], 0);
    }

    /// Values are dropped exactly once, either with their chunk or when they are handed back by a full pool.
    #[test]
    fn drop_test() {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);

        #[derive(Debug)]
        pub struct Dropper {}

        impl Drop for Dropper {
            fn drop(&mut self) {
                COUNTER.fetch_add(1, Ordering::Relaxed);
            }
        }

        let drops = || COUNTER.load(Ordering::Relaxed);

        {
            let pool: MemoryPool<Dropper, 2> = MemoryPool::new();

            // The chunk is dropped immediately.
            let _ = pool.chunk(Dropper {});
            assert_eq!(drops(), 1);

            {
                let _dropper1 = pool.chunk(Dropper {}).unwrap();
                let _dropper2 = pool.chunk(Dropper {}).unwrap();
                // The rejected value is handed back and dropped right away.
                assert!(pool.chunk(Dropper {}).is_err());
            }
            assert_eq!(drops(), 4);

            let _ = pool.chunk(Dropper {});
            assert_eq!(drops(), 5);
        }

        // After dropping `pool`, there were no additional drops of the contained type.
        assert_eq!(drops(), 5);
    }

    /// An un-initialized token returns its element to the pool on drop.
    #[test]
    fn drop_memory_pool_token() {
        let pool = MemoryPool::<usize, 1>::new();
        assert_eq!(pool.chunks_available(), 1);

        let token = pool.reserve().unwrap();
        assert_eq!(pool.chunks_available(), 0);

        drop(token);
        assert_eq!(pool.chunks_available(), 1);
    }

    /// The available count follows chunks being created and dropped.
    #[test]
    fn chunks_available() {
        let pool = MemoryPool::<usize, 2>::new();
        assert_eq!(pool.chunks_available(), 2);

        {
            let _first = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 1);
            let _second = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 0);
        }

        assert_eq!(pool.chunks_available(), 2);
    }

    #[test]
    fn reserve_init() {
        let pool = MemoryPool::<usize, 2>::new();
        let chunk = pool.reserve().unwrap().init(2);
        assert_eq!(*chunk, 2);
    }

    #[test]
    fn reserve_init_in_place() {
        let pool = MemoryPool::<usize, 2>::new();
        let token = pool.reserve().unwrap();
        // SAFETY: The passed closure initializes the chunk correctly.
        let chunk = unsafe {
            token.init_in_place(|storage| {
                storage.write(2);
            })
        };
        assert_eq!(*chunk, 2);
    }

    /// Consuming a token a second time is a bug and has to panic.
    #[test]
    #[should_panic(expected = "`MemoryPoolToken` should only be consumed once")]
    fn consume_none() {
        let pool = MemoryPool::<usize, 2>::new();
        let mut token = pool.reserve().unwrap();
        let _ = token.consume();
        let _ = token.consume();
    }

    /// Ensures the `MemoryPool` and `Chunk` don't lose their `Send` & `Sync` auto trait implementations when
    /// refactoring.
    #[test]
    fn send_sync() {
        fn send<T: Send>() {}
        fn sync<T: Sync>() {}

        send::<MemoryPool<[u8; 10], 2>>();
        sync::<MemoryPool<[u8; 10], 2>>();

        send::<Chunk<[u8; 10]>>();
        sync::<Chunk<[u8; 10]>>();
    }

    /// The debug output of a chunk is the debug output of its content.
    #[test]
    fn debug_chunk() {
        let pool = MemoryPool::<usize, 2>::new();
        let zero = pool.chunk(0).unwrap();
        assert_eq!(format!("{zero:?}"), "0");
    }

    #[test]
    fn default_memory_pool() {
        let pool: MemoryPool<usize, 2> = MemoryPool::default();
        assert_eq!(pool.chunks_available(), 2);
    }
}