veecle_os_runtime/datastore/
exclusive_reader.rs1use core::cell::Ref;
2use core::fmt::Debug;
3use core::marker::PhantomData;
4use core::pin::Pin;
5
6use crate::datastore::Storable;
7use crate::datastore::slot::{self, Slot};
8
9#[derive(Debug)]
59pub struct ExclusiveReader<'a, T>
60where
61 T: Storable + 'static,
62{
63 waiter: slot::Waiter<'a, T>,
64
65 marker: PhantomData<fn(T)>,
66}
67
68impl<T> ExclusiveReader<'_, T>
69where
70 T: Storable + 'static,
71{
72 #[veecle_telemetry::instrument]
78 pub fn read<U>(&self, f: impl FnOnce(Option<&T::DataType>) -> U) -> U {
79 self.waiter.read(|value| {
80 let value = value.as_ref();
81
82 veecle_telemetry::trace!("Slot read", value = format_args!("{value:?}"));
83
84 f(value)
85 })
86 }
87
88 pub fn take(&mut self) -> Option<T::DataType> {
90 let span = veecle_telemetry::span!("take");
91 let _guard = span.enter();
92
93 let value = self.waiter.take(span.context());
94
95 veecle_telemetry::trace!("Slot value taken", value = format_args!("{value:?}"));
96
97 value
98 }
99
100 pub fn read_cloned(&self) -> Option<T::DataType>
105 where
106 T::DataType: Clone,
107 {
108 self.read(|t| t.cloned())
109 }
110
111 #[veecle_telemetry::instrument]
119 pub async fn wait_for_update(&mut self) -> &mut Self {
120 self.waiter.wait().await;
121 self.waiter.update_generation();
122 self
123 }
124}
125
126impl<'a, T> ExclusiveReader<'a, T>
127where
128 T: Storable + 'static,
129{
130 pub(crate) fn from_slot(slot: Pin<&'a Slot<T>>) -> Self {
132 ExclusiveReader {
133 waiter: slot.waiter(),
134 marker: PhantomData,
135 }
136 }
137}
138
139impl<T> super::combined_readers::Sealed for ExclusiveReader<'_, T> where T: Storable {}
140
141impl<T> super::combined_readers::CombinableReader for ExclusiveReader<'_, T>
142where
143 T: Storable,
144{
145 type ToBeRead = Option<T::DataType>;
146
147 fn borrow(&self) -> Ref<'_, Self::ToBeRead> {
148 self.waiter.borrow()
149 }
150
151 async fn wait_for_update(&mut self) {
152 self.wait_for_update().await;
153 }
154}
155
156#[cfg(test)]
157#[cfg_attr(coverage_nightly, coverage(off))]
158mod tests {
159 use core::pin::pin;
160 use futures::FutureExt;
161
162 use crate::datastore::{ExclusiveReader, Slot, Storable, Writer, generational};
163
164 #[test]
165 fn read() {
166 #[derive(Eq, PartialEq, Debug, Clone, Storable)]
167 #[storable(crate = crate)]
168 struct Sensor(u8);
169
170 let source = pin!(generational::Source::new());
171 let slot = pin!(Slot::<Sensor>::new());
172
173 let reader = ExclusiveReader::from_slot(slot.as_ref());
174 let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
175
176 assert_eq!(reader.read(|x| x.cloned()), None);
177 assert_eq!(reader.read_cloned(), None);
178
179 source.as_ref().increment_generation();
180 writer.write(Sensor(1)).now_or_never().unwrap();
181
182 assert_eq!(
183 reader.read(|x: Option<&Sensor>| x.cloned()),
184 Some(Sensor(1))
185 );
186 assert_eq!(reader.read_cloned(), Some(Sensor(1)));
187 }
188
189 #[test]
190 fn take() {
191 #[derive(Eq, PartialEq, Debug, Clone, Storable)]
192 #[storable(crate = crate)]
193 struct Sensor(u8);
194
195 let source = pin!(generational::Source::new());
196 let slot = pin!(Slot::<Sensor>::new());
197
198 let mut reader = ExclusiveReader::from_slot(slot.as_ref());
199 let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
200
201 assert_eq!(reader.take(), None);
202 source.as_ref().increment_generation();
203 writer.write(Sensor(10)).now_or_never().unwrap();
204 assert_eq!(reader.take(), Some(Sensor(10)));
205 assert_eq!(reader.take(), None);
206 }
207
208 #[test]
209 fn wait_for_update() {
210 #[derive(Eq, PartialEq, Debug, Clone, Storable)]
211 #[storable(crate = crate)]
212 struct Sensor(u8);
213
214 let source = pin!(generational::Source::new());
215 let slot = pin!(Slot::<Sensor>::new());
216
217 let mut reader = ExclusiveReader::from_slot(slot.as_ref());
218 let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
219
220 assert!(reader.wait_for_update().now_or_never().is_none());
221
222 source.as_ref().increment_generation();
223 writer.write(Sensor(1)).now_or_never().unwrap();
224
225 reader
226 .wait_for_update()
227 .now_or_never()
228 .unwrap()
229 .read(|x| assert_eq!(x, Some(&Sensor(1))));
230 }
231}