veecle_telemetry/collector/
collector.rs

1use core::fmt::Debug;
2
3use super::{Export, ProcessId};
4
5#[cfg(feature = "enable")]
6use crate::protocol::transient::{
7    InstanceMessage, KeyValue, LogMessage, Severity, SpanAddEventMessage, SpanAddLinkMessage,
8    SpanCloseMessage, SpanContext, SpanCreateMessage, SpanEnterMessage, SpanExitMessage, SpanId,
9    SpanSetAttributeMessage, TelemetryMessage, ThreadId, TracingMessage,
10};
11
/// The global telemetry collector.
///
/// This structure manages the collection and export of telemetry data.
/// It carries the ID of the process it was created for, stamps outgoing messages with a
/// timestamp and a thread ID, and hands them to the configured exporter.
///
/// When the `enable` feature is turned off the collector holds no state at all.
///
/// The collector is typically accessed through the [`get_collector`][super::get_collector] function rather
/// than being constructed directly.
#[derive(Debug)]
pub struct Collector {
    // All state lives behind the `enable` feature so that a disabled build carries an empty
    // struct.
    #[cfg(feature = "enable")]
    inner: CollectorInner,
}
25
/// State of an enabled [`Collector`].
///
/// Only compiled when the `enable` feature is active.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct CollectorInner {
    /// Process identifier; combined with the raw thread ID to build every `ThreadId`.
    process_id: ProcessId,
    /// Destination every collected message is handed to.
    exporter: &'static (dyn Export + Sync),
    /// Clock used to timestamp span and log messages.
    ///
    /// NOTE(review): the value is stored in `*_unix_nano` fields, so this presumably returns
    /// nanoseconds since the Unix epoch; confirm against the caller that supplies it.
    now_fn: fn() -> u64,
    /// Source of the raw, non-zero thread ID (presumably of the calling thread; confirm).
    thread_id_fn: fn() -> core::num::NonZeroU64,
}
34
35impl Collector {
36    pub(super) const fn new(
37        process_id: ProcessId,
38        exporter: &'static (dyn Export + Sync),
39        now_fn: fn() -> u64,
40        thread_id_fn: fn() -> core::num::NonZeroU64,
41    ) -> Self {
42        #[cfg(not(feature = "enable"))]
43        let _ = (process_id, exporter, now_fn, thread_id_fn);
44
45        Self {
46            #[cfg(feature = "enable")]
47            inner: CollectorInner {
48                process_id,
49                exporter,
50                now_fn,
51                thread_id_fn,
52            },
53        }
54    }
55
56    #[inline]
57    #[cfg(feature = "enable")]
58    pub(crate) fn process_id(&self) -> ProcessId {
59        self.inner.process_id
60    }
61
62    #[inline]
63    #[cfg(feature = "enable")]
64    pub(crate) fn now(&self) -> u64 {
65        (self.inner.now_fn)()
66    }
67
68    #[inline]
69    #[cfg(feature = "enable")]
70    pub(crate) fn thread_id(&self) -> ThreadId {
71        ThreadId::from_raw(self.inner.process_id, (self.inner.thread_id_fn)())
72    }
73
74    /// Collects and exports an external telemetry message.
75    ///
76    /// This method allows external systems to inject telemetry messages into the
77    /// collector pipeline.
78    /// The message will be exported using the configured exporter.
79    ///
80    /// # Examples
81    ///
82    /// ```rust
83    /// use core::num::NonZeroU64;
84    /// use veecle_telemetry::collector::get_collector;
85    /// use veecle_telemetry::protocol::transient::{
86    ///     ThreadId,
87    ///     ProcessId,
88    ///     InstanceMessage,
89    ///     TelemetryMessage,
90    ///     TimeSyncMessage,
91    /// };
92    ///
93    /// let collector = get_collector();
94    /// let message = InstanceMessage {
95    ///     thread_id: ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
96    ///     message: TelemetryMessage::TimeSync(TimeSyncMessage {
97    ///         local_timestamp: 0,
98    ///         since_epoch: 0,
99    ///     }),
100    /// };
101    /// collector.collect_external(message);
102    /// ```
103    #[inline]
104    #[cfg(feature = "enable")]
105    pub fn collect_external(&self, message: InstanceMessage<'_>) {
106        self.inner.exporter.export(message);
107    }
108
109    #[inline]
110    #[cfg(feature = "enable")]
111    pub(crate) fn new_span<'a>(
112        &self,
113        span_id: SpanId,
114        name: &'a str,
115        attributes: &'a [KeyValue<'a>],
116    ) {
117        self.tracing_message(TracingMessage::CreateSpan(SpanCreateMessage {
118            span_id,
119            name,
120            start_time_unix_nano: self.now(),
121            attributes,
122        }));
123    }
124
125    #[inline]
126    #[cfg(feature = "enable")]
127    pub(crate) fn enter_span(&self, span_id: SpanId) {
128        self.tracing_message(TracingMessage::EnterSpan(SpanEnterMessage {
129            span_id,
130            time_unix_nano: self.now(),
131        }));
132    }
133
134    #[inline]
135    #[cfg(feature = "enable")]
136    pub(crate) fn exit_span(&self, span_id: SpanId) {
137        self.tracing_message(TracingMessage::ExitSpan(SpanExitMessage {
138            span_id,
139            time_unix_nano: self.now(),
140        }));
141    }
142
143    #[inline]
144    #[cfg(feature = "enable")]
145    pub(crate) fn close_span(&self, span_id: SpanId) {
146        self.tracing_message(TracingMessage::CloseSpan(SpanCloseMessage {
147            span_id,
148            end_time_unix_nano: self.now(),
149        }));
150    }
151
152    #[inline]
153    #[cfg(feature = "enable")]
154    pub(crate) fn span_event<'a>(
155        &self,
156        span_id: Option<SpanId>,
157        name: &'a str,
158        attributes: &'a [KeyValue<'a>],
159    ) {
160        self.tracing_message(TracingMessage::AddEvent(SpanAddEventMessage {
161            span_id,
162            name,
163            time_unix_nano: self.now(),
164            attributes,
165        }));
166    }
167
168    #[inline]
169    #[cfg(feature = "enable")]
170    pub(crate) fn span_link(&self, span_id: Option<SpanId>, link: SpanContext) {
171        self.tracing_message(TracingMessage::AddLink(SpanAddLinkMessage {
172            span_id,
173            link,
174        }));
175    }
176
177    #[inline]
178    #[cfg(feature = "enable")]
179    pub(crate) fn span_attribute<'a>(&self, span_id: Option<SpanId>, attribute: KeyValue<'a>) {
180        self.tracing_message(TracingMessage::SetAttribute(SpanSetAttributeMessage {
181            span_id,
182            attribute,
183        }));
184    }
185
186    #[inline]
187    #[cfg(feature = "enable")]
188    pub(crate) fn log_message<'a>(
189        &self,
190        severity: Severity,
191        body: &'a str,
192        attributes: &'a [KeyValue<'a>],
193    ) {
194        self.inner.exporter.export(InstanceMessage {
195            thread_id: self.thread_id(),
196            message: TelemetryMessage::Log(LogMessage {
197                time_unix_nano: self.now(),
198                severity,
199                body,
200                attributes,
201            }),
202        });
203    }
204
205    #[inline]
206    #[cfg(feature = "enable")]
207    fn tracing_message(&self, message: TracingMessage<'_>) {
208        self.inner.exporter.export(InstanceMessage {
209            thread_id: self.thread_id(),
210            message: TelemetryMessage::Tracing(message),
211        });
212    }
213}