veecle_os_runtime/
actor.rs

1//! Smallest unit of work within a runtime instance.
2use core::convert::Infallible;
3use core::pin::Pin;
4
5#[doc(inline)]
6pub use veecle_os_runtime_macros::actor;
7
8use crate::datastore::{ExclusiveReader, InitializedReader, Reader, Storable, Writer};
9use crate::datastore::{Slot, generational};
10
// Private module for the sealing marker: `Sealed` is `pub`, but the module is not. The traits in this file that name
// it as a supertrait (`StoreRequest`, `IsActorResult`) can therefore only be implemented for types that also have a
// `Sealed` implementation.
mod sealed {
    /// Marker trait without any items.
    pub trait Sealed {}
}
14
15/// Actor interface.
16///
17/// The [`Actor`] trait allows writing actors that communicate within a runtime.
/// It lets you define an initial context, which will be available for the whole life of the actor;
19/// a constructor method, with all the [`StoreRequest`] types it needs to communicate with other actors;
20/// and also the [`Actor::run`] method.
21///
22/// # Usage
23///
24/// Add the `Actor` implementing types to the actor list in [`veecle_os::runtime::execute!`](crate::execute!) when
25/// constructing a runtime instance.
26///
27/// The [`Actor::run`] method implements the actor's event loop.
28/// To yield back to the executor, every event loop must contain at least one `await`.
29/// Otherwise, the endless loop of the actor will block the executor and other actors.
30///
31/// ## Macros
32///
33/// The [`actor`][macro@crate::actor::actor] attribute macro can be used to implement actors.
34/// The function the macro is applied to is converted into the event loop.
35/// See its documentation for more details.
36///
37/// ### Example
38///
39/// ```rust
40/// # use std::convert::Infallible;
41/// # use std::fmt::Debug;
42/// #
43/// # use veecle_os_runtime::{Storable, Reader, Writer};
44/// #
45/// # #[derive(Debug, Default, Storable)]
46/// # pub struct Foo;
47/// #
48/// # #[derive(Debug, Default, Storable)]
49/// # pub struct Bar;
50/// #
51/// # pub struct Ctx;
52///
53/// #[veecle_os_runtime::actor]
54/// async fn my_actor(
55///     reader: Reader<'_, Foo>,
56///     writer: Writer<'_, Bar>,
57///     #[init_context] ctx: Ctx,
58/// ) -> Infallible {
59///     loop {
60///         // Do something here.
61///     }
62/// }
63/// ```
64///
65/// This will create a new struct called `MyActor` which implements [`Actor`], letting you register it into a runtime.
66///
67/// ## Manual
68///
69/// For cases where the macro is not sufficient, the [`Actor`] trait can also be implemented manually:
70///
71/// ```rust
72/// # use std::convert::Infallible;
73/// # use std::fmt::Debug;
74/// #
75/// # use veecle_os_runtime::{Storable, Reader, Writer, Actor};
76/// #
77/// # #[derive(Debug, Default, Storable)]
78/// # pub struct Foo;
79/// #
80/// # #[derive(Debug, Default, Storable)]
81/// # pub struct Bar;
82/// #
83/// # pub struct Ctx;
84///
85/// struct MyActor<'a> {
86///     reader: Reader<'a, Foo>,
87///     writer: Writer<'a, Bar>,
88///     context: Ctx,
89/// }
90///
91/// impl<'a> Actor<'a> for MyActor<'a> {
92///     type StoreRequest = (Reader<'a, Foo>, Writer<'a, Bar>);
93///     type InitContext = Ctx;
94///     type Error = Infallible;
95///
96///     fn new((reader, writer): Self::StoreRequest, context: Self::InitContext) -> Self {
97///         Self {
98///             reader,
99///             writer,
100///             context,
101///         }
102///     }
103///
104///     async fn run(mut self) -> Result<Infallible, Self::Error> {
105///         loop {
106///             // Do something here.
107///         }
108///     }
109/// }
110/// ```
111pub trait Actor<'a> {
112    /// [`Reader`]s and [`Writer`]s this actor requires.
113    type StoreRequest: StoreRequest<'a>;
114
115    /// Context that needs to be passed to the actor at initialisation.
116    type InitContext;
117
118    /// Error that this actor might return while running.
119    ///
120    /// This error is treated as fatal, if any actor returns an error the whole runtime will shutdown.
121    type Error: core::error::Error;
122
123    /// Creates a new instance of the struct implementing [`Actor`].
124    ///
125    /// See the [crate documentation][crate] for examples.
126    fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;
127
128    /// Runs the [`Actor`] event loop.
129    ///
130    /// See the [crate documentation][crate] for examples.
131    fn run(
132        self,
133    ) -> impl core::future::Future<Output = Result<core::convert::Infallible, Self::Error>>;
134}
135
/// Allows requesting a (nearly) arbitrary amount of [`Reader`]s and [`Writer`]s in an [`Actor`].
///
/// This trait is not intended for direct usage by users; it is sealed through the private `sealed::Sealed` supertrait.
// Developer notes: This works by using type inference via `DatastoreExt::reader` etc. to request `Reader`s etc. from
// the `Datastore`.
pub trait StoreRequest<'a>: sealed::Sealed {
    /// Requests an instance of `Self` from the [`Datastore`].
    ///
    /// The method is `async` because a request may have to wait: the [`InitializedReader`] implementation awaits
    /// inside `request`.
    #[doc(hidden)]
    #[allow(async_fn_in_trait, reason = "it's actually private so it's fine")]
    async fn request(datastore: Pin<&'a impl Datastore>) -> Self;
}
147
// `()` is sealed so that it can implement `StoreRequest` (the request of an actor that needs no readers or writers).
impl sealed::Sealed for () {}
149
150/// Internal trait to abstract out type-erased and concrete data stores.
151pub trait Datastore {
152    /// Returns a generational source tracking the global datastore generation.
153    ///
154    /// This is used to ensure that every reader has had (or will have) a chance to read a value before a writer may
155    /// overwrite it.
156    fn source(self: Pin<&Self>) -> Pin<&generational::Source>;
157
158    #[expect(
159        rustdoc::private_intra_doc_links,
160        reason = "`rustdoc` is buggy with links from `pub` but unreachable types"
161    )]
162    /// Returns a reference to the slot for a specific type.
163    ///
164    /// # Panics
165    ///
166    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
167    #[expect(private_interfaces, reason = "the methods are internal")]
168    fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
169    where
170        T: Storable + 'static;
171}
172
173impl<S> Datastore for Pin<&S>
174where
175    S: Datastore,
176{
177    fn source(self: Pin<&Self>) -> Pin<&generational::Source> {
178        Pin::into_inner(self).source()
179    }
180
181    #[expect(private_interfaces, reason = "the methods are internal")]
182    fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
183    where
184        T: Storable + 'static,
185    {
186        Pin::into_inner(self).slot()
187    }
188}
189
190pub(crate) trait DatastoreExt<'a>: Copy {
191    #[cfg(test)]
192    /// Increments the global datastore generation.
193    ///
194    /// Asserts that every reader has had (or will have) a chance to read a value before a writer may overwrite it.
195    fn increment_generation(self);
196
197    /// Returns the [`Reader`] for a specific slot.
198    ///
199    /// # Panics
200    ///
201    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
202    fn reader<T>(self) -> Reader<'a, T>
203    where
204        T: Storable + 'static;
205
206    /// Returns the [`ExclusiveReader`] for a specific slot.
207    ///
208    /// Exclusivity of the reader is not guaranteed by this method and must be ensured via other means (e.g.
209    /// [`crate::execute::validate_actors`]).
210    ///
211    /// # Panics
212    ///
213    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
214    fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
215    where
216        T: Storable + 'static;
217
218    /// Returns the [`Writer`] for a specific slot.
219    ///
220    /// # Panics
221    ///
222    /// * If the [`Writer`] for this slot has already been acquired.
223    ///
224    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
225    fn writer<T>(self) -> Writer<'a, T>
226    where
227        T: Storable + 'static;
228}
229
230impl<'a, S> DatastoreExt<'a> for Pin<&'a S>
231where
232    S: Datastore,
233{
234    #[cfg(test)]
235    #[cfg_attr(coverage_nightly, coverage(off))]
236    fn increment_generation(self) {
237        self.source().increment_generation()
238    }
239
240    fn reader<T>(self) -> Reader<'a, T>
241    where
242        T: Storable + 'static,
243    {
244        Reader::from_slot(self.slot::<T>())
245    }
246
247    fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
248    where
249        T: Storable + 'static,
250    {
251        ExclusiveReader::from_slot(self.slot::<T>())
252    }
253
254    fn writer<T>(self) -> Writer<'a, T>
255    where
256        T: Storable + 'static,
257    {
258        Writer::new(self.source().waiter(), self.slot::<T>())
259    }
260}
261
262/// Implements a no-op for Actors that do not read or write any values.
263impl<'a> StoreRequest<'a> for () {
264    async fn request(_store: Pin<&'a impl Datastore>) -> Self {}
265}
266
267impl<T> sealed::Sealed for Reader<'_, T> where T: Storable + 'static {}
268
269impl<'a, T> StoreRequest<'a> for Reader<'a, T>
270where
271    T: Storable + 'static,
272{
273    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
274        datastore.reader()
275    }
276}
277
278impl<T> sealed::Sealed for ExclusiveReader<'_, T> where T: Storable + 'static {}
279
280impl<'a, T> StoreRequest<'a> for ExclusiveReader<'a, T>
281where
282    T: Storable + 'static,
283{
284    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
285        datastore.exclusive_reader()
286    }
287}
288
289impl<T> sealed::Sealed for InitializedReader<'_, T> where T: Storable + 'static {}
290
291impl<'a, T> StoreRequest<'a> for InitializedReader<'a, T>
292where
293    T: Storable + 'static,
294{
295    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
296        Reader::from_slot(datastore.slot()).wait_init().await
297    }
298}
299
300impl<T> sealed::Sealed for Writer<'_, T> where T: Storable + 'static {}
301
302impl<'a, T> StoreRequest<'a> for Writer<'a, T>
303where
304    T: Storable + 'static,
305{
306    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
307        datastore.writer()
308    }
309}
310
311/// Implements [`StoreRequest`] for provided types.
312macro_rules! impl_request_helper {
313    ($t:ident) => {
314        #[cfg_attr(docsrs, doc(fake_variadic))]
315        /// This trait is implemented for tuples up to seven items long.
316        impl<'a, $t> sealed::Sealed for ($t,) { }
317
318        #[cfg_attr(docsrs, doc(fake_variadic))]
319        /// This trait is implemented for tuples up to seven items long.
320        impl<'a, $t> StoreRequest<'a> for ($t,)
321        where
322            $t: StoreRequest<'a>,
323        {
324            async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
325                (<$t as StoreRequest>::request(datastore).await,)
326            }
327        }
328    };
329
330    (@impl $($t:ident)*) => {
331        #[cfg_attr(docsrs, doc(hidden))]
332        impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
333        where
334            $($t: sealed::Sealed),*
335        { }
336
337        #[cfg_attr(docsrs, doc(hidden))]
338        impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
339        where
340            $($t: StoreRequest<'a>),*
341        {
342            async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
343                // join! is necessary here to avoid argument-order-dependence with the #[actor] macro.
344                // This ensures that any `InitializedReaders` in self correctly track the generation at which they were
345                // first ready, so that the first `wait_for_update` sees the value that caused them to become
346                // initialized.
347                // See `multi_request_order_independence` for the verification of this.
348                futures::join!($( <$t as StoreRequest>::request(datastore), )*)
349            }
350        }
351    };
352
353    ($head:ident $($rest:ident)*) => {
354        impl_request_helper!(@impl $head $($rest)*);
355        impl_request_helper!($($rest)*);
356    };
357}
358
359impl_request_helper!(Z Y X W V U T);
360
/// Macro helper to allow actors to return either a [`Result`] type or [`Infallible`] (and eventually [`!`]).
///
/// The trait is sealed through the private `sealed::Sealed` supertrait.
#[diagnostic::on_unimplemented(
    message = "#[veecle_os_runtime::actor] functions should return either a `Result<Infallible, _>` or `Infallible`",
    label = "not a valid actor return type"
)]
pub trait IsActorResult: sealed::Sealed {
    /// The error type this result converts into.
    type Error;

    /// Converts the value into an actual [`Result`] value.
    fn into_result(self) -> Result<Infallible, Self::Error>;
}
373
374impl<E> sealed::Sealed for Result<Infallible, E> {}
375
376impl<E> IsActorResult for Result<Infallible, E> {
377    type Error = E;
378
379    fn into_result(self) -> Result<Infallible, E> {
380        self
381    }
382}
383
384impl sealed::Sealed for Infallible {}
385
386impl IsActorResult for Infallible {
387    type Error = Infallible;
388
389    fn into_result(self) -> Result<Infallible, Self::Error> {
390        match self {}
391    }
392}
393
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll};

    use futures::future::FutureExt;

    use crate::actor::{DatastoreExt, StoreRequest};
    use crate::cons::{Cons, Nil};
    use crate::datastore::{InitializedReader, Storable};

    /// Verifies that a tuple request behaves the same regardless of the order in which its `InitializedReader`s are
    /// listed.
    #[test]
    fn multi_request_order_independence() {
        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct A;

        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct B;

        let datastore = pin!(crate::execute::make_store::<Cons<A, Cons<B, Nil>>>());

        let mut writer_a = datastore.as_ref().writer::<A>();
        let mut writer_b = datastore.as_ref().writer::<B>();

        // The same pair of readers is requested twice, in opposite orders. Both requests should resolve during the
        // generation where the later of the two values is first written.
        let mut a_then_b = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(
            datastore.as_ref()
        ));
        let mut b_then_a = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(
            datastore.as_ref()
        ));

        let (a_then_b_waker, a_then_b_wakes) = futures_test::task::new_count_waker();
        let (b_then_a_waker, b_then_a_wakes) = futures_test::task::new_count_waker();

        let mut a_then_b_cx = Context::from_waker(&a_then_b_waker);
        let mut b_then_a_cx = Context::from_waker(&b_then_a_waker);

        // Nothing has been written yet, so neither request can be ready.
        assert!(a_then_b.as_mut().poll(&mut a_then_b_cx).is_pending());
        assert!(b_then_a.as_mut().poll(&mut b_then_a_cx).is_pending());

        let a_then_b_wakes_before = a_then_b_wakes.get();
        let b_then_a_wakes_before = b_then_a_wakes.get();

        datastore.as_ref().increment_generation();

        writer_a.write(A).now_or_never().unwrap();

        // When the first value is written, each future may or may not wake up, but if they do we need to poll them.
        if a_then_b_wakes.get() > a_then_b_wakes_before {
            assert!(a_then_b.as_mut().poll(&mut a_then_b_cx).is_pending());
        }
        if b_then_a_wakes.get() > b_then_a_wakes_before {
            assert!(b_then_a.as_mut().poll(&mut b_then_a_cx).is_pending());
        }

        let a_then_b_wakes_before = a_then_b_wakes.get();
        let b_then_a_wakes_before = b_then_a_wakes.get();

        datastore.as_ref().increment_generation();

        writer_b.write(B).now_or_never().unwrap();

        // When the second value is written, both futures _must_ wake up and complete.
        assert!(a_then_b_wakes.get() > a_then_b_wakes_before);
        assert!(b_then_a_wakes.get() > b_then_a_wakes_before);

        let (mut first_a, mut first_b) = match a_then_b.as_mut().poll(&mut a_then_b_cx) {
            Poll::Ready(readers) => readers,
            Poll::Pending => panic!("request 1 was not ready"),
        };

        // The second request lists `B` first, so its tuple is `(reader of B, reader of A)`.
        let (mut second_b, mut second_a) = match b_then_a.as_mut().poll(&mut b_then_a_cx) {
            Poll::Ready(readers) => readers,
            Poll::Pending => panic!("request 2 was not ready"),
        };

        // All readers should see an update, since they've just been initialized but not `wait_for_update`d.
        assert!(first_a.wait_for_update().now_or_never().is_some());
        assert!(first_b.wait_for_update().now_or_never().is_some());

        assert!(second_b.wait_for_update().now_or_never().is_some());
        assert!(second_a.wait_for_update().now_or_never().is_some());
    }
}