veecle_telemetry/collector/mod.rs
1//! Telemetry data collection and export infrastructure.
2//!
3//! This module provides the core infrastructure for collecting telemetry data and exporting it
4//! to various backends.
5//! It includes the global collector singleton, export trait, and various
6//! built-in exporters.
7//!
8//! # Global Collector
9//!
10//! The collector uses a global singleton pattern to ensure telemetry data is collected
11//! consistently across the entire application.
12//! The collector must be initialized once
13//! using [`set_exporter`] before any telemetry data can be collected.
14//!
15//! # Export Trait
16//!
17//! The [`Export`] trait defines the interface for exporting telemetry data.
18//! Custom exporters can be implemented by providing an implementation of this trait.
19//!
20//! # Built-in Exporters
21//!
22//! - [`ConsoleJsonExporter`] - Exports telemetry data as JSON to stdout
23//! - [`TestExporter`] - Collects telemetry data in memory for testing purposes
24
25#[cfg(feature = "std")]
26mod json_exporter;
27#[cfg(feature = "std")]
28mod pretty_exporter;
29#[cfg(feature = "std")]
30mod test_exporter;
31
32use core::fmt::Debug;
33#[cfg(feature = "enable")]
34use core::sync::atomic::{AtomicUsize, Ordering};
35use core::{error, fmt};
36
37#[cfg(feature = "std")]
38pub use json_exporter::ConsoleJsonExporter;
39#[cfg(feature = "std")]
40pub use pretty_exporter::ConsolePrettyExporter;
41#[cfg(feature = "std")]
42#[doc(hidden)]
43pub use test_exporter::TestExporter;
44
45use crate::protocol::InstanceMessage;
46#[cfg(feature = "enable")]
47pub use crate::protocol::ProcessId;
48#[cfg(feature = "enable")]
49use crate::protocol::{
50 LogMessage, SpanAddEventMessage, SpanAddLinkMessage, SpanCloseMessage, SpanCreateMessage,
51 SpanEnterMessage, SpanExitMessage, SpanSetAttributeMessage, TelemetryMessage, ThreadId,
52 TracingMessage,
53};
54
55/// Trait for exporting telemetry data to external systems.
56///
57/// Implementors of this trait define how telemetry data should be exported,
58/// whether to files, network endpoints, or other destinations.
59///
60/// # Examples
61///
62/// ```rust
63/// use veecle_telemetry::collector::Export;
64/// use veecle_telemetry::protocol::InstanceMessage;
65///
66/// #[derive(Debug)]
67/// struct CustomExporter;
68///
69/// impl Export for CustomExporter {
70/// fn export(&self, message: InstanceMessage<'_>) {
71/// // Custom export logic here
72/// println!("Exporting: {:?}", message);
73/// }
74/// }
75/// ```
76pub trait Export: Debug {
77 /// Exports a telemetry message.
78 ///
79 /// This method is called for each telemetry message that needs to be exported.
80 /// The implementation should handle the message appropriately based on its type.
81 fn export(&self, message: InstanceMessage<'_>);
82}
83
/// The global telemetry collector.
///
/// With the `enable` feature active, a collector stores a process id together with the
/// exporter that every collected message is forwarded to.
/// Without that feature the struct has no fields.
///
/// Code normally obtains the collector through [`get_collector`] instead of constructing one
/// itself.
#[derive(Debug)]
pub struct Collector {
    /// Process id and exporter; only present when telemetry is compiled in.
    #[cfg(feature = "enable")]
    inner: CollectorInner,
}
97
/// State of a [`Collector`] that only exists when the `enable` feature is active.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct CollectorInner {
    /// Process id handed to the collector on construction; used to build the `ThreadId`
    /// attached to outgoing messages.
    process_id: ProcessId,

    /// Exporter that receives every message emitted through this collector.
    exporter: &'static (dyn Export + Sync),
}
105
/// Exporter that silently discards every message.
///
/// It backs the collectors that exist before an exporter has been installed.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct NopExporter;

#[cfg(feature = "enable")]
impl Export for NopExporter {
    /// Drops the message without doing anything.
    fn export(&self, _message: InstanceMessage<'_>) {}
}
114
115// The GLOBAL_COLLECTOR static holds a pointer to the global exporter. It is protected by
116// the GLOBAL_INIT static which determines whether GLOBAL_EXPORTER has been initialized.
117#[cfg(feature = "enable")]
118static mut GLOBAL_COLLECTOR: Collector = Collector {
119 inner: CollectorInner {
120 process_id: ProcessId::from_raw(0),
121 exporter: &NO_EXPORTER,
122 },
123};
124static NO_COLLECTOR: Collector = Collector {
125 #[cfg(feature = "enable")]
126 inner: CollectorInner {
127 process_id: ProcessId::from_raw(0),
128 exporter: &NO_EXPORTER,
129 },
130};
131#[cfg(feature = "enable")]
132static NO_EXPORTER: NopExporter = NopExporter;
133
134#[cfg(feature = "enable")]
135static GLOBAL_INIT: AtomicUsize = AtomicUsize::new(0);
136
// GLOBAL_INIT only ever holds one of the three values below.

// No call to `set_exporter` has claimed initialization yet.
#[cfg(feature = "enable")]
const UNINITIALIZED: usize = 0;

// `set_exporter` has been called but GLOBAL_COLLECTOR hasn't been set yet.
#[cfg(feature = "enable")]
const INITIALIZING: usize = 1;

// GLOBAL_COLLECTOR has been written and the collector is active.
#[cfg(feature = "enable")]
const INITIALIZED: usize = 2;
147
/// Initializes the global collector with the given exporter and [`ProcessId`].
///
/// Only the first call can succeed; the installed exporter is never replaced afterwards.
///
/// A [`ProcessId`] should never be re-used as it's used to collect metadata about the execution and to generate
/// [`SpanContext`]s which need to be globally unique.
///
/// # Errors
///
/// Returns [`SetExporterError`] if initialization has already been claimed, whether by a
/// call that completed earlier or by one that is still in progress on another thread.
///
/// [`SpanContext`]: crate::SpanContext
#[cfg(feature = "enable")]
pub fn set_exporter(
    process_id: ProcessId,
    exporter: &'static (dyn Export + Sync),
) -> Result<(), SetExporterError> {
    // Try to move UNINITIALIZED -> INITIALIZING; whoever wins owns the one and only write.
    let claim = GLOBAL_INIT.compare_exchange(
        UNINITIALIZED,
        INITIALIZING,
        Ordering::Acquire,
        Ordering::Relaxed,
    );
    if claim.is_err() {
        return Err(SetExporterError(()));
    }

    // SAFETY: the compare-exchange above succeeds for a single caller only, because nothing in
    // this module ever resets `GLOBAL_INIT` to `UNINITIALIZED`. Readers in `get_collector`
    // do not touch `GLOBAL_COLLECTOR` until they observe `INITIALIZED`, which is stored below.
    unsafe {
        GLOBAL_COLLECTOR = Collector::new(process_id, exporter);
    }

    // Publish the write above to threads that load `GLOBAL_INIT` with `Acquire`.
    GLOBAL_INIT.store(INITIALIZED, Ordering::Release);

    Ok(())
}
177
178/// Returns a reference to the collector.
179///
180/// If an exporter has not been set, a no-op implementation is returned.
181pub fn get_collector() -> &'static Collector {
182 #[cfg(not(feature = "enable"))]
183 {
184 &NO_COLLECTOR
185 }
186
187 // Acquire memory ordering guarantees that current thread would see any
188 // memory writes that happened before store of the value
189 // into `GLOBAL_INIT` with memory ordering `Release` or stronger.
190 //
191 // Since the value `INITIALIZED` is written only after `GLOBAL_COLLECTOR` was
192 // initialized, observing it after `Acquire` load here makes both
193 // write to the `GLOBAL_COLLECTOR` static and initialization of the exporter
194 // internal state synchronized with current thread.
195 #[cfg(feature = "enable")]
196 if GLOBAL_INIT.load(Ordering::Acquire) != INITIALIZED {
197 &NO_COLLECTOR
198 } else {
199 // SAFETY: this is guarded by the atomic
200 unsafe {
201 #[expect(clippy::deref_addrof, reason = "false positive")]
202 &*&raw const GLOBAL_COLLECTOR
203 }
204 }
205}
206
/// Error returned by [`set_exporter`] when [`set_exporter`] has already been called.
///
/// The error carries no data; its [`Display`](fmt::Display) output is a fixed message.
///
/// [`set_exporter`]: fn.set_exporter.html
#[derive(Debug)]
pub struct SetExporterError(());

impl SetExporterError {
    /// Text written by the `Display` implementation.
    const MESSAGE: &'static str = "a global exporter has already been set";
}

impl fmt::Display for SetExporterError {
    /// Writes the fixed error message to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(Self::MESSAGE)
    }
}

impl error::Error for SetExporterError {}
224
#[cfg(feature = "enable")]
impl Collector {
    /// Builds a collector that reports for `process_id` and forwards to `exporter`.
    fn new(process_id: ProcessId, exporter: &'static (dyn Export + Sync)) -> Self {
        let inner = CollectorInner {
            process_id,
            exporter,
        };
        Self { inner }
    }

    /// Returns the process id this collector was created with.
    #[inline]
    pub(crate) fn process_id(&self) -> ProcessId {
        self.inner.process_id
    }

    /// Collects and exports an external telemetry message.
    ///
    /// Use this to inject messages produced by an external system into the collector
    /// pipeline.
    /// The message is handed to the configured exporter unchanged.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use core::num::NonZeroU64;
    /// use veecle_telemetry::collector::get_collector;
    /// use veecle_telemetry::protocol::{
    ///     ThreadId,
    ///     ProcessId,
    ///     InstanceMessage,
    ///     TelemetryMessage,
    ///     TimeSyncMessage,
    /// };
    ///
    /// let collector = get_collector();
    /// let message = InstanceMessage {
    ///     thread_id: ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
    ///     message: TelemetryMessage::TimeSync(TimeSyncMessage {
    ///         local_timestamp: 0,
    ///         since_epoch: 0,
    ///     }),
    /// };
    /// collector.collect_external(message);
    /// ```
    #[inline]
    pub fn collect_external(&self, message: InstanceMessage<'_>) {
        self.inner.exporter.export(message);
    }

    /// Emits a `CreateSpan` tracing message.
    #[inline]
    pub(crate) fn new_span(&self, span: SpanCreateMessage<'_>) {
        self.tracing_message(TracingMessage::CreateSpan(span));
    }

    /// Emits an `EnterSpan` tracing message.
    #[inline]
    pub(crate) fn enter_span(&self, enter: SpanEnterMessage) {
        self.tracing_message(TracingMessage::EnterSpan(enter));
    }

    /// Emits an `ExitSpan` tracing message.
    #[inline]
    pub(crate) fn exit_span(&self, exit: SpanExitMessage) {
        self.tracing_message(TracingMessage::ExitSpan(exit));
    }

    /// Emits a `CloseSpan` tracing message.
    #[inline]
    pub(crate) fn close_span(&self, span: SpanCloseMessage) {
        self.tracing_message(TracingMessage::CloseSpan(span));
    }

    /// Emits an `AddEvent` tracing message.
    #[inline]
    pub(crate) fn span_event(&self, event: SpanAddEventMessage<'_>) {
        self.tracing_message(TracingMessage::AddEvent(event));
    }

    /// Emits an `AddLink` tracing message.
    #[inline]
    pub(crate) fn span_link(&self, link: SpanAddLinkMessage) {
        self.tracing_message(TracingMessage::AddLink(link));
    }

    /// Emits a `SetAttribute` tracing message.
    #[inline]
    pub(crate) fn span_attribute(&self, attribute: SpanSetAttributeMessage<'_>) {
        self.tracing_message(TracingMessage::SetAttribute(attribute));
    }

    /// Exports `log` as a `TelemetryMessage::Log`, stamped with the `ThreadId` returned by
    /// `ThreadId::current` for this collector's process id.
    #[inline]
    pub(crate) fn log_message(&self, log: LogMessage<'_>) {
        let thread_id = ThreadId::current(self.inner.process_id);
        let message = TelemetryMessage::Log(log);
        self.inner
            .exporter
            .export(InstanceMessage { thread_id, message });
    }

    /// Exports `message` as a `TelemetryMessage::Tracing`, stamped with the `ThreadId`
    /// returned by `ThreadId::current` for this collector's process id.
    ///
    /// All span-related methods of the collector funnel through here.
    #[inline]
    fn tracing_message(&self, message: TracingMessage<'_>) {
        let thread_id = ThreadId::current(self.inner.process_id);
        let message = TelemetryMessage::Tracing(message);
        self.inner
            .exporter
            .export(InstanceMessage { thread_id, message });
    }
}
325}