veecle_telemetry/collector/
mod.rs

1//! Telemetry data collection and export infrastructure.
2//!
3//! This module provides the core infrastructure for collecting telemetry data and exporting it
4//! to various backends.
5//! It includes the global collector singleton, export trait, and various
6//! built-in exporters.
7//!
8//! # Global Collector
9//!
//! The collector uses a global singleton pattern to ensure telemetry data is collected
//! consistently across the entire application.
//! The collector must be initialized once using [`set_exporter`] before any telemetry data
//! can be collected.
//! Until then, [`get_collector`] returns a no-op collector.
14//!
15//! # Export Trait
16//!
17//! The [`Export`] trait defines the interface for exporting telemetry data.
18//! Custom exporters can be implemented by providing an implementation of this trait.
19//!
20//! # Built-in Exporters
21//!
//! - [`ConsoleJsonExporter`] - Exports telemetry data as JSON to stdout
//! - [`ConsolePrettyExporter`] - Exports telemetry data to the console in a pretty-printed format
//! - [`TestExporter`] - Collects telemetry data in memory for testing purposes
24
25#[cfg(feature = "std")]
26mod json_exporter;
27#[cfg(feature = "std")]
28mod pretty_exporter;
29#[cfg(feature = "std")]
30mod test_exporter;
31
32use core::fmt::Debug;
33#[cfg(feature = "enable")]
34use core::sync::atomic::{AtomicUsize, Ordering};
35use core::{error, fmt};
36
37#[cfg(feature = "std")]
38pub use json_exporter::ConsoleJsonExporter;
39#[cfg(feature = "std")]
40pub use pretty_exporter::ConsolePrettyExporter;
41#[cfg(feature = "std")]
42#[doc(hidden)]
43pub use test_exporter::TestExporter;
44
45use crate::protocol::InstanceMessage;
46#[cfg(feature = "enable")]
47pub use crate::protocol::ProcessId;
48#[cfg(feature = "enable")]
49use crate::protocol::{
50    LogMessage, SpanAddEventMessage, SpanAddLinkMessage, SpanCloseMessage, SpanCreateMessage,
51    SpanEnterMessage, SpanExitMessage, SpanSetAttributeMessage, TelemetryMessage, ThreadId,
52    TracingMessage,
53};
54
/// Trait for exporting telemetry data to external systems.
///
/// Implementors of this trait define how telemetry data should be exported,
/// whether to files, network endpoints, or other destinations.
///
/// Implementors must also implement [`Debug`].
/// Because [`Export::export`] takes `&self`, an exporter that needs to update its own state
/// while exporting has to rely on interior mutability.
///
/// # Examples
///
/// ```rust
/// use veecle_telemetry::collector::Export;
/// use veecle_telemetry::protocol::InstanceMessage;
///
/// #[derive(Debug)]
/// struct CustomExporter;
///
/// impl Export for CustomExporter {
///     fn export(&self, message: InstanceMessage<'_>) {
///         // Custom export logic here
///         println!("Exporting: {:?}", message);
///     }
/// }
/// ```
pub trait Export: Debug {
    /// Exports a telemetry message.
    ///
    /// This method is called for each telemetry message that needs to be exported.
    /// The implementation should handle the message appropriately based on its type.
    ///
    /// The message is passed by value and the method returns nothing, so the signature offers
    /// no way to report a failure back to the caller.
    fn export(&self, message: InstanceMessage<'_>);
}
83
/// The global telemetry collector.
///
/// This structure forwards telemetry messages to the configured exporter.
/// With the `enable` feature active it stores the `ProcessId` and the exporter it was created
/// with; without that feature it has no fields.
///
/// The collector is typically accessed through the [`get_collector`] function rather
/// than being constructed directly.
#[derive(Debug)]
pub struct Collector {
    // All state lives in `CollectorInner` so the whole payload can be compiled out at once.
    #[cfg(feature = "enable")]
    inner: CollectorInner,
}
97
/// State held by a [`Collector`] when the `enable` feature is active.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct CollectorInner {
    /// Process ID used to build the `ThreadId` attached to messages the collector creates.
    process_id: ProcessId,

    /// Destination that every collected message is handed to.
    exporter: &'static (dyn Export + Sync),
}
105
/// Exporter that discards every message it receives.
///
/// Used as the exporter of the placeholder collectors that exist before `set_exporter`
/// has installed a real one.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct NopExporter;

#[cfg(feature = "enable")]
impl Export for NopExporter {
    /// Intentionally does nothing with the message.
    fn export(&self, _message: InstanceMessage<'_>) {}
}
114
// The GLOBAL_COLLECTOR static holds the collector installed by `set_exporter`. It is protected by
// the GLOBAL_INIT static which determines whether GLOBAL_COLLECTOR has been initialized.
// Until then it contains a placeholder backed by the no-op exporter.
#[cfg(feature = "enable")]
static mut GLOBAL_COLLECTOR: Collector = Collector {
    inner: CollectorInner {
        process_id: ProcessId::from_raw(0),
        exporter: &NO_EXPORTER,
    },
};
// Collector handed out by `get_collector` while GLOBAL_COLLECTOR is not initialized, and always
// when the `enable` feature is off (in which case it has no fields).
static NO_COLLECTOR: Collector = Collector {
    #[cfg(feature = "enable")]
    inner: CollectorInner {
        process_id: ProcessId::from_raw(0),
        exporter: &NO_EXPORTER,
    },
};
// Shared no-op exporter referenced by both placeholder collectors above.
#[cfg(feature = "enable")]
static NO_EXPORTER: NopExporter = NopExporter;
133
// Initialization state of GLOBAL_COLLECTOR. Starts at 0, which is the value of UNINITIALIZED.
#[cfg(feature = "enable")]
static GLOBAL_INIT: AtomicUsize = AtomicUsize::new(0);

// There are three different states that we care about:
// - the collector is uninitialized
// - the collector is initializing (set_exporter has been called but GLOBAL_COLLECTOR hasn't been set yet)
// - the collector is active
//
// `set_exporter` moves the state UNINITIALIZED -> INITIALIZING -> INITIALIZED;
// `get_collector` only reads GLOBAL_COLLECTOR once it observes INITIALIZED.
#[cfg(feature = "enable")]
const UNINITIALIZED: usize = 0;
#[cfg(feature = "enable")]
const INITIALIZING: usize = 1;
#[cfg(feature = "enable")]
const INITIALIZED: usize = 2;
147
/// Installs the global collector, built from the given exporter and [`ProcessId`].
///
/// A [`ProcessId`] should never be re-used as it's used to collect metadata about the execution and to generate
/// [`SpanContext`]s which need to be globally unique.
///
/// # Errors
///
/// Returns [`SetExporterError`] if initialization was already claimed by an earlier call.
/// In that case nothing is written and the given exporter is not installed.
///
/// [`SpanContext`]: crate::SpanContext
#[cfg(feature = "enable")]
pub fn set_exporter(
    process_id: ProcessId,
    exporter: &'static (dyn Export + Sync),
) -> Result<(), SetExporterError> {
    // Only the caller that wins the UNINITIALIZED -> INITIALIZING transition may install
    // the collector; every other caller is rejected.
    let claim = GLOBAL_INIT.compare_exchange(
        UNINITIALIZED,
        INITIALIZING,
        Ordering::Acquire,
        Ordering::Relaxed,
    );
    if claim.is_err() {
        return Err(SetExporterError(()));
    }

    // SAFETY: the successful compare-exchange above makes this the only writer, and
    // `get_collector` does not read `GLOBAL_COLLECTOR` before it observes `INITIALIZED`,
    // which is stored only after this write.
    unsafe { GLOBAL_COLLECTOR = Collector::new(process_id, exporter) }
    // The `Release` store publishes the write above to `Acquire` loads of `GLOBAL_INIT`.
    GLOBAL_INIT.store(INITIALIZED, Ordering::Release);

    Ok(())
}
177
178/// Returns a reference to the collector.
179///
180/// If an exporter has not been set, a no-op implementation is returned.
181pub fn get_collector() -> &'static Collector {
182    #[cfg(not(feature = "enable"))]
183    {
184        &NO_COLLECTOR
185    }
186
187    // Acquire memory ordering guarantees that current thread would see any
188    // memory writes that happened before store of the value
189    // into `GLOBAL_INIT` with memory ordering `Release` or stronger.
190    //
191    // Since the value `INITIALIZED` is written only after `GLOBAL_COLLECTOR` was
192    // initialized, observing it after `Acquire` load here makes both
193    // write to the `GLOBAL_COLLECTOR` static and initialization of the exporter
194    // internal state synchronized with current thread.
195    #[cfg(feature = "enable")]
196    if GLOBAL_INIT.load(Ordering::Acquire) != INITIALIZED {
197        &NO_COLLECTOR
198    } else {
199        // SAFETY: this is guarded by the atomic
200        unsafe {
201            #[expect(clippy::deref_addrof, reason = "false positive")]
202            &*&raw const GLOBAL_COLLECTOR
203        }
204    }
205}
206
/// Error returned by [`set_exporter`] when [`set_exporter`] has already been called.
///
/// [`set_exporter`]: fn.set_exporter.html
#[derive(Debug)]
pub struct SetExporterError(());

impl SetExporterError {
    /// Text written by the `Display` implementation.
    const MESSAGE: &'static str = "a global exporter has already been set";
}

impl fmt::Display for SetExporterError {
    /// Writes the fixed error message; width and padding options are not applied.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(Self::MESSAGE)
    }
}

impl error::Error for SetExporterError {}
224
#[cfg(feature = "enable")]
impl Collector {
    /// Builds a collector that tags its messages with `process_id` and hands them to `exporter`.
    fn new(process_id: ProcessId, exporter: &'static (dyn Export + Sync)) -> Self {
        let inner = CollectorInner {
            process_id,
            exporter,
        };
        Self { inner }
    }

    /// Returns the process ID this collector was created with.
    #[inline]
    pub(crate) fn process_id(&self) -> ProcessId {
        self.inner.process_id
    }

    /// Exports a telemetry message that was produced outside of this collector.
    ///
    /// External systems can use this method to feed their own telemetry messages into the
    /// collector pipeline.
    /// The message is passed unchanged to the configured exporter.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use core::num::NonZeroU64;
    /// use veecle_telemetry::collector::get_collector;
    /// use veecle_telemetry::protocol::{
    ///     ThreadId,
    ///     ProcessId,
    ///     InstanceMessage,
    ///     TelemetryMessage,
    ///     TimeSyncMessage,
    /// };
    ///
    /// let collector = get_collector();
    /// let message = InstanceMessage {
    ///     thread_id: ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
    ///     message: TelemetryMessage::TimeSync(TimeSyncMessage {
    ///         local_timestamp: 0,
    ///         since_epoch: 0,
    ///     }),
    /// };
    /// collector.collect_external(message);
    /// ```
    #[inline]
    pub fn collect_external(&self, message: InstanceMessage<'_>) {
        self.inner.exporter.export(message);
    }

    /// Exports a `TracingMessage::CreateSpan` for `span`.
    #[inline]
    pub(crate) fn new_span(&self, span: SpanCreateMessage<'_>) {
        self.tracing_message(TracingMessage::CreateSpan(span));
    }

    /// Exports a `TracingMessage::EnterSpan` for `enter`.
    #[inline]
    pub(crate) fn enter_span(&self, enter: SpanEnterMessage) {
        self.tracing_message(TracingMessage::EnterSpan(enter));
    }

    /// Exports a `TracingMessage::ExitSpan` for `exit`.
    #[inline]
    pub(crate) fn exit_span(&self, exit: SpanExitMessage) {
        self.tracing_message(TracingMessage::ExitSpan(exit));
    }

    /// Exports a `TracingMessage::CloseSpan` for `span`.
    #[inline]
    pub(crate) fn close_span(&self, span: SpanCloseMessage) {
        self.tracing_message(TracingMessage::CloseSpan(span));
    }

    /// Exports a `TracingMessage::AddEvent` for `event`.
    #[inline]
    pub(crate) fn span_event(&self, event: SpanAddEventMessage<'_>) {
        self.tracing_message(TracingMessage::AddEvent(event));
    }

    /// Exports a `TracingMessage::AddLink` for `link`.
    #[inline]
    pub(crate) fn span_link(&self, link: SpanAddLinkMessage) {
        self.tracing_message(TracingMessage::AddLink(link));
    }

    /// Exports a `TracingMessage::SetAttribute` for `attribute`.
    #[inline]
    pub(crate) fn span_attribute(&self, attribute: SpanSetAttributeMessage<'_>) {
        self.tracing_message(TracingMessage::SetAttribute(attribute));
    }

    /// Exports `log` as a `TelemetryMessage::Log` attributed to the calling thread.
    #[inline]
    pub(crate) fn log_message(&self, log: LogMessage<'_>) {
        self.export_from_current_thread(TelemetryMessage::Log(log));
    }

    /// Exports `message` as a `TelemetryMessage::Tracing` attributed to the calling thread.
    #[inline]
    fn tracing_message(&self, message: TracingMessage<'_>) {
        self.export_from_current_thread(TelemetryMessage::Tracing(message));
    }

    /// Pairs `message` with `ThreadId::current` for this collector's process ID and passes
    /// the resulting `InstanceMessage` to the exporter.
    #[inline]
    fn export_from_current_thread(&self, message: TelemetryMessage<'_>) {
        let thread_id = ThreadId::current(self.inner.process_id);
        self.inner
            .exporter
            .export(InstanceMessage { thread_id, message });
    }
}
323        });
324    }
325}