veecle_os_runtime/
actor.rs

//! Smallest unit of work within a runtime instance.
2use core::pin::Pin;
3
4use crate::Never;
5use crate::cons::{Cons, Nil};
6use crate::datastore::{ExclusiveReader, InitializedReader, Reader, SlotTrait, Storable, Writer};
7use crate::datastore::{Slot, generational};
8#[doc(inline)]
9pub use veecle_os_runtime_macros::actor;
10
/// Private home of the marker trait used as a supertrait by this module's sealed public traits.
mod sealed {
    /// Marker trait; since the surrounding module is private, code outside of it cannot implement this trait.
    pub trait Sealed {}
}
14
15/// Actor interface.
16///
17/// The [`Actor`] trait allows writing actors that communicate within a runtime.
18/// It allows to define an initial context, which will be available for the whole life of the actor;
19/// a constructor method, with all the [`StoreRequest`] types it needs to communicate with other actors;
20/// and also the [`Actor::run`] method.
21///
22/// # Usage
23///
24/// Add the `Actor` implementing types to the actor list in [`veecle_os::runtime::execute!`](crate::execute!) when
25/// constructing a runtime instance.
26///
27/// The [`Actor::run`] method implements the actor's event loop.
28/// To yield back to the executor, every event loop must contain at least one `await`.
29/// Otherwise, the endless loop of the actor will block the executor and other actors.
30///
31/// ## Macros
32///
33/// The [`actor`][macro@crate::actor::actor] attribute macro can be used to implement actors.
34/// The function the macro is applied to is converted into the event loop.
35/// See its documentation for more details.
36///
37/// ### Example
38///
39/// ```rust
40/// # use std::fmt::Debug;
41/// #
42/// # use veecle_os_runtime::{Never, Storable, Reader, Writer};
43/// #
44/// # #[derive(Debug, Default, Storable)]
45/// # pub struct Foo;
46/// #
47/// # #[derive(Debug, Default, Storable)]
48/// # pub struct Bar;
49/// #
50/// # pub struct Ctx;
51///
52/// #[veecle_os_runtime::actor]
53/// async fn my_actor(
54///     reader: Reader<'_, Foo>,
55///     writer: Writer<'_, Bar>,
56///     #[init_context] ctx: Ctx,
57/// ) -> Never {
58///     loop {
59///         // Do something here.
60///     }
61/// }
62/// ```
63///
64/// This will create a new struct called `MyActor` which implements [`Actor`], letting you register it into a runtime.
65///
66/// ## Manual
67///
68/// For cases where the macro is not sufficient, the [`Actor`] trait can also be implemented manually:
69///
70/// ```rust
71/// # use std::fmt::Debug;
72/// #
73/// # use veecle_os_runtime::{Never, Storable, Reader, Writer, Actor};
74/// # use veecle_os_runtime::__exports::{AppendCons, DefinesSlot};
75/// #
76/// # #[derive(Debug, Default, Storable)]
77/// # pub struct Foo;
78/// #
79/// # #[derive(Debug, Default, Storable)]
80/// # pub struct Bar;
81/// #
82/// # pub struct Ctx;
83///
84/// struct MyActor<'a> {
85///     reader: Reader<'a, Foo>,
86///     writer: Writer<'a, Bar>,
87///     context: Ctx,
88/// }
89///
90/// impl<'a> Actor<'a> for MyActor<'a> {
91///     type StoreRequest = (Reader<'a, Foo>, Writer<'a, Bar>);
92///     type InitContext = Ctx;
93///     type Error = Never;
94///     type Slots = <<Reader<'a, Foo> as DefinesSlot>::Slot as AppendCons<<Writer<'a, Bar> as DefinesSlot>::Slot>>::Result;
95///
96///     fn new((reader, writer): Self::StoreRequest, context: Self::InitContext) -> Self {
97///         Self {
98///             reader,
99///             writer,
100///             context,
101///         }
102///     }
103///
104///     async fn run(mut self) -> Result<Never, Self::Error> {
105///         loop {
106///             // Do something here.
107///         }
108///     }
109/// }
110/// ```
111pub trait Actor<'a> {
112    /// [`Reader`]s and [`Writer`]s this actor requires.
113    type StoreRequest: StoreRequest<'a>;
114
115    /// Context that needs to be passed to the actor at initialisation.
116    type InitContext;
117
118    /// Cons list of slots required by this actor.
119    ///
120    /// This is a type-level cons list of `Slot<T>` types.
121    type Slots;
122
123    /// Error that this actor might return while running.
124    ///
125    /// This error is treated as fatal, if any actor returns an error the whole runtime will shutdown.
126    type Error: core::error::Error;
127
128    /// Creates a new instance of the struct implementing [`Actor`].
129    ///
130    /// See the [crate documentation][crate] for examples.
131    fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;
132
133    /// Runs the [`Actor`] event loop.
134    ///
135    /// See the [crate documentation][crate] for examples.
136    fn run(self) -> impl core::future::Future<Output = Result<Never, Self::Error>>;
137}
138
139/// Allows requesting a (nearly) arbitrary amount of [`Reader`]s and [`Writer`]s in an [`Actor`].
140///
141/// This trait is not intended for direct usage by users.
142// Developer notes: This works by using type inference via `Datastore::reader` etc. to request `Reader`s etc. from the
143// `Datastore`.
144pub trait StoreRequest<'a>: sealed::Sealed {
145    /// Requests an instance of `Self` from the [`Datastore`].
146    ///
147    /// # Panics
148    ///
149    /// * If there is no [`Slot`] for one of the types in `Self` in the [`Datastore`].
150    ///
151    /// `requestor` will be included in the panic message for context.
152    #[doc(hidden)]
153    #[allow(async_fn_in_trait, reason = "it's actually private so it's fine")]
154    async fn request(datastore: Pin<&'a impl Datastore>, requestor: &'static str) -> Self;
155}
156
157impl sealed::Sealed for () {}
158
159#[doc(hidden)]
160/// Internal trait to abstract out type-erased and concrete data stores.
161pub trait Datastore {
162    /// Returns a generational source tracking the global datastore generation.
163    ///
164    /// This is used to ensure that every reader has had (or will have) a chance to read a value before a writer may
165    /// overwrite it.
166    fn source(self: Pin<&Self>) -> Pin<&generational::Source>;
167
168    /// Returns a reference to a slot of a specific type.
169    ///
170    /// # Panics
171    ///
172    /// If there is no slot of type `S` in the datastore.
173    ///
174    /// `requestor` will be included in the panic message for context.
175    #[expect(private_bounds, reason = "the methods are internal")]
176    fn slot<S>(self: Pin<&Self>, requestor: &'static str) -> Pin<&S>
177    where
178        S: SlotTrait;
179}
180
181pub(crate) trait DatastoreExt<'a>: Copy {
182    #[cfg(test)]
183    /// Increments the global datastore generation.
184    ///
185    /// Asserts that every reader has had (or will have) a chance to read a value before a writer may overwrite it.
186    fn increment_generation(self);
187
188    /// Returns the [`Reader`] for a specific slot.
189    ///
190    /// # Panics
191    ///
192    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
193    ///
194    /// `requestor` will be included in the panic message for context.
195    fn reader<T>(self, requestor: &'static str) -> Reader<'a, T>
196    where
197        T: Storable + 'static;
198
199    /// Returns the [`ExclusiveReader`] for a specific slot.
200    ///
201    /// Exclusivity of the reader is not guaranteed by this method and must be ensured via other means (e.g.
202    /// [`crate::execute::make_store_and_validate`]).
203    ///
204    /// # Panics
205    ///
206    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
207    ///
208    /// `requestor` will be included in the panic message for context.
209    fn exclusive_reader<T>(self, requestor: &'static str) -> ExclusiveReader<'a, T>
210    where
211        T: Storable + 'static;
212
213    /// Returns the [`Writer`] for a specific slot.
214    ///
215    /// # Panics
216    ///
217    /// * If the [`Writer`] for this slot has already been acquired.
218    ///
219    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
220    ///
221    /// `requestor` will be included in the panic message for context.
222    fn writer<T>(self, requestor: &'static str) -> Writer<'a, T>
223    where
224        T: Storable + 'static;
225}
226
227impl<'a, S> DatastoreExt<'a> for Pin<&'a S>
228where
229    S: Datastore,
230{
231    #[cfg(test)]
232    #[cfg_attr(coverage_nightly, coverage(off))]
233    fn increment_generation(self) {
234        self.source().increment_generation()
235    }
236
237    fn reader<T>(self, requestor: &'static str) -> Reader<'a, T>
238    where
239        T: Storable + 'static,
240    {
241        Reader::from_slot(self.slot::<Slot<T>>(requestor))
242    }
243
244    fn exclusive_reader<T>(self, requestor: &'static str) -> ExclusiveReader<'a, T>
245    where
246        T: Storable + 'static,
247    {
248        ExclusiveReader::from_slot(self.slot::<Slot<T>>(requestor))
249    }
250
251    fn writer<T>(self, requestor: &'static str) -> Writer<'a, T>
252    where
253        T: Storable + 'static,
254    {
255        Writer::new(self.source().waiter(), self.slot::<Slot<T>>(requestor))
256    }
257}
258
259/// Implements a no-op for Actors that do not read or write any values.
260impl<'a> StoreRequest<'a> for () {
261    async fn request(_store: Pin<&'a impl Datastore>, _requestor: &'static str) -> Self {}
262}
263
264impl<T> sealed::Sealed for Reader<'_, T> where T: Storable + 'static {}
265
266impl<'a, T> StoreRequest<'a> for Reader<'a, T>
267where
268    T: Storable + 'static,
269{
270    async fn request(datastore: Pin<&'a impl Datastore>, requestor: &'static str) -> Self {
271        datastore.reader(requestor)
272    }
273}
274
275impl<T> sealed::Sealed for ExclusiveReader<'_, T> where T: Storable + 'static {}
276
277impl<'a, T> StoreRequest<'a> for ExclusiveReader<'a, T>
278where
279    T: Storable + 'static,
280{
281    async fn request(datastore: Pin<&'a impl Datastore>, requestor: &'static str) -> Self {
282        datastore.exclusive_reader(requestor)
283    }
284}
285
286impl<T> sealed::Sealed for InitializedReader<'_, T> where T: Storable + 'static {}
287
288impl<'a, T> StoreRequest<'a> for InitializedReader<'a, T>
289where
290    T: Storable + 'static,
291{
292    async fn request(datastore: Pin<&'a impl Datastore>, requestor: &'static str) -> Self {
293        Reader::from_slot(datastore.slot(requestor))
294            .wait_init()
295            .await
296    }
297}
298
299impl<T> sealed::Sealed for Writer<'_, T> where T: Storable + 'static {}
300
301impl<'a, T> StoreRequest<'a> for Writer<'a, T>
302where
303    T: Storable + 'static,
304{
305    async fn request(datastore: Pin<&'a impl Datastore>, requestor: &'static str) -> Self {
306        datastore.writer(requestor)
307    }
308}
309
310/// Implements [`StoreRequest`] for provided types.
311macro_rules! impl_request_helper {
312    ($t:ident) => {
313        #[cfg_attr(docsrs, doc(fake_variadic))]
314        /// This trait is implemented for tuples up to seven items long.
315        impl<'a, $t> sealed::Sealed for ($t,) { }
316
317        #[cfg_attr(docsrs, doc(fake_variadic))]
318        /// This trait is implemented for tuples up to seven items long.
319        impl<'a, $t> StoreRequest<'a> for ($t,)
320        where
321            $t: StoreRequest<'a>,
322        {
323            async fn request(datastore: Pin<&'a impl Datastore>, requestor: &'static str) -> Self {
324                (<$t as StoreRequest>::request(datastore, requestor).await,)
325            }
326        }
327    };
328
329    (@impl $($t:ident)*) => {
330        #[cfg_attr(docsrs, doc(hidden))]
331        impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
332        where
333            $($t: sealed::Sealed),*
334        { }
335
336        #[cfg_attr(docsrs, doc(hidden))]
337        impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
338        where
339            $($t: StoreRequest<'a>),*
340        {
341            async fn request(datastore: Pin<&'a impl Datastore>, requestor: &'static str) -> Self {
342                // join! is necessary here to avoid argument-order-dependence with the #[actor] macro.
343                // This ensures that any `InitializedReaders` in self correctly track the generation at which they were
344                // first ready, so that the first `wait_for_update` sees the value that caused them to become
345                // initialized.
346                // See `multi_request_order_independence` for the verification of this.
347                futures::join!($( <$t as StoreRequest>::request(datastore, requestor), )*)
348            }
349        }
350    };
351
352    ($head:ident $($rest:ident)*) => {
353        impl_request_helper!(@impl $head $($rest)*);
354        impl_request_helper!($($rest)*);
355    };
356}
357
358impl_request_helper!(Z Y X W V U T);
359
360/// Macro helper to allow actors to return either a [`Result`] type or [`Never`] (and eventually [`!`]).
361#[diagnostic::on_unimplemented(
362    message = "#[veecle_os_runtime::actor] functions should return either a `Result<Never, _>` or `Never`",
363    label = "not a valid actor return type"
364)]
365pub trait IsActorResult: sealed::Sealed {
366    /// The error type this result converts into.
367    type Error;
368
369    /// Convert the result into an actual [`Result`] value.
370    fn into_result(self) -> Result<Never, Self::Error>;
371}
372
373impl<E> sealed::Sealed for Result<Never, E> {}
374
375impl<E> IsActorResult for Result<Never, E> {
376    type Error = E;
377
378    fn into_result(self) -> Result<Never, E> {
379        self
380    }
381}
382
383impl sealed::Sealed for Never {}
384
385impl IsActorResult for Never {
386    type Error = Never;
387
388    fn into_result(self) -> Result<Never, Self::Error> {
389        match self {}
390    }
391}
392
/// Tells whether a reader/writer is the side that defines a slot.
///
/// Exactly one side of a reader/writer combo defines the slot.
/// In a [`Writer`] - [`Reader`] combination that side is the `Writer`, because it is the unique side.
/// It follows that every possible combination needs a side that defines the slot.
#[doc(hidden)]
pub trait DefinesSlot {
    /// Slot cons list contributed by this store request type.
    ///
    /// This is [`Nil`] for readers and `Cons<Slot<T>, Nil>` for writers.
    type Slot;
}
405
406impl<'a, T> DefinesSlot for Writer<'a, T>
407where
408    T: Storable,
409{
410    type Slot = Cons<Slot<T>, Nil>;
411}
412
413impl<'a, T> DefinesSlot for Reader<'a, T>
414where
415    T: Storable,
416{
417    type Slot = Nil;
418}
419
420impl<'a, T> DefinesSlot for ExclusiveReader<'a, T>
421where
422    T: Storable,
423{
424    type Slot = Nil;
425}
426
427impl<'a, T> DefinesSlot for InitializedReader<'a, T>
428where
429    T: Storable,
430{
431    type Slot = Nil;
432}
433
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll};

    use futures::future::FutureExt;

    use crate::actor::{DatastoreExt, StoreRequest};
    use crate::cons::{Cons, Nil};
    use crate::datastore::{InitializedReader, Slot, Storable};

    /// Requests the same two `InitializedReader`s as a tuple in both orders and checks that both requests complete
    /// in the generation of the second write, with every reader still seeing a pending update.
    #[test]
    fn multi_request_order_independence() {
        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct A;

        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct B;

        type Slots = Cons<Slot<A>, Cons<Slot<B>, Nil>>;

        let datastore = pin!(crate::execute::make_store::<Slots>());
        let store = datastore.as_ref();

        let mut writer_a = store.writer::<A>("a_writer");
        let mut writer_b = store.writer::<B>("b_writer");

        // Whatever order the two requests name their readers in, both should resolve during the generation in which
        // the later of the two values is first written.
        let mut forward = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(
            store,
            "request_1",
        ));
        let mut reversed = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(
            store,
            "request_2",
        ));

        let (forward_waker, forward_wakes) = futures_test::task::new_count_waker();
        let (reversed_waker, reversed_wakes) = futures_test::task::new_count_waker();

        let mut forward_cx = Context::from_waker(&forward_waker);
        let mut reversed_cx = Context::from_waker(&reversed_waker);

        // Nothing has been written yet, so neither request can complete.
        assert!(forward.as_mut().poll(&mut forward_cx).is_pending());
        assert!(reversed.as_mut().poll(&mut reversed_cx).is_pending());

        let forward_wakes_before = forward_wakes.get();
        let reversed_wakes_before = reversed_wakes.get();

        store.increment_generation();

        writer_a.write(A).now_or_never().unwrap();

        // After the first write each future may or may not have been woken; any that was needs to be polled again.
        if forward_wakes.get() > forward_wakes_before {
            assert!(forward.as_mut().poll(&mut forward_cx).is_pending());
        }
        if reversed_wakes.get() > reversed_wakes_before {
            assert!(reversed.as_mut().poll(&mut reversed_cx).is_pending());
        }

        let forward_wakes_before = forward_wakes.get();
        let reversed_wakes_before = reversed_wakes.get();

        store.increment_generation();

        writer_b.write(B).now_or_never().unwrap();

        // After the second write both futures _must_ have been woken and must complete.
        assert!(forward_wakes.get() > forward_wakes_before);
        assert!(reversed_wakes.get() > reversed_wakes_before);

        let Poll::Ready((mut forward_first, mut forward_second)) =
            forward.as_mut().poll(&mut forward_cx)
        else {
            panic!("request 1 was not ready")
        };

        let Poll::Ready((mut reversed_first, mut reversed_second)) =
            reversed.as_mut().poll(&mut reversed_cx)
        else {
            panic!("request 2 was not ready")
        };

        // Every reader was just initialized and never `wait_for_update`d, so each one should report an update.
        assert!(forward_first.wait_for_update().now_or_never().is_some());
        assert!(forward_second.wait_for_update().now_or_never().is_some());

        assert!(reversed_first.wait_for_update().now_or_never().is_some());
        assert!(reversed_second.wait_for_update().now_or_never().is_some());
    }
}