veecle_telemetry/
protocol.rs

1//! Telemetry protocol types and message definitions.
2//!
3//! This module defines the core data structures used for telemetry message exchange.
4//! It includes message types for logging, tracing, and time synchronization, as well
5//! as supporting types for execution tracking and attribute management.
6//!
7//! # Message Types
8//!
9//! The protocol supports several categories of telemetry messages:
10//!
11//! - **Log Messages** - Structured logging with severity levels and attributes
12//! - **Tracing Messages** - Distributed tracing with spans, events, and links
13//! - **Time Sync Messages** - Time synchronization between systems
14//!
15//! # Thread Tracking
16//!
17//! Each message is associated with an [`ThreadId`] that uniquely identifies
18//! the thread it came from (globally unique across all processes).
19//! This allows telemetry data from multiple threads to be correlated and analyzed separately.
20//!
21//! # Serialization
22//!
23//! All protocol types implement [`serde::Serialize`] and optionally [`serde::Deserialize`]
24//! (when the `alloc` feature is enabled) for easy serialization to various formats.
25
26#[cfg(feature = "alloc")]
27use alloc::vec::Vec;
28use core::fmt;
29use core::num::NonZeroU64;
30use core::str::FromStr;
31
32use serde::{Deserialize, Serialize};
33
34use crate::SpanContext;
35pub use crate::id::{ProcessId, SpanId};
36#[cfg(feature = "alloc")]
37use crate::to_static::ToStatic;
38use crate::types::{ListType, StringType, list_from_slice};
39use crate::value::KeyValue;
40
41/// A specialised form of [`list_from_slice`] for attributes.
42pub fn attribute_list_from_slice<'a>(slice: &'a [KeyValue<'a>]) -> AttributeListType<'a> {
43    list_from_slice::<KeyValue<'a>>(slice)
44}
45
46/// Type alias for a list of attributes.
47pub type AttributeListType<'a> = ListType<'a, KeyValue<'a>>;
48
49#[cfg(feature = "alloc")]
50impl ToStatic for AttributeListType<'_> {
51    type Static = AttributeListType<'static>;
52
53    fn to_static(&self) -> Self::Static {
54        self.iter()
55            .map(|item| item.to_static())
56            .collect::<Vec<_>>()
57            .into()
58    }
59}
60
61/// A globally-unique id identifying a thread within a specific process.
62///
63/// The primary purpose of this id is to allow the consumer of telemetry messages to associate
64/// spans with the callstack they came from to reconstruct parent-child relationships. On a normal
65/// operating system this is the thread, on other systems it should be whatever is the closest
66/// equivalent, e.g. for FreeRTOS it would be a task. On a single threaded bare-metal system it
67/// would be a constant as there is only the one callstack.
68#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
69pub struct ThreadId {
70    /// The globally-unique id for the process this thread is within.
71    pub process: ProcessId,
72
73    /// The process-unique id for this thread within the process.
74    raw: NonZeroU64,
75}
76
77impl ThreadId {
78    /// Creates a [`ThreadId`] from a raw value.
79    ///
80    /// Extra care needs to be taken that this is not a constant value or re-used within this
81    /// process in any way.
82    pub const fn from_raw(process: ProcessId, raw: NonZeroU64) -> Self {
83        Self { process, raw }
84    }
85
86    /// Creates a [`ThreadId`] for the current thread, using OS specific means to acquire it.
87    #[cfg(feature = "enable")]
88    pub(crate) fn current(process: ProcessId) -> Self {
89        #[cfg_attr(not(feature = "std"), expect(unreachable_code))]
90        Self::from_raw(process, {
91            #[cfg(feature = "std")]
92            {
93                use veecle_osal_std::thread::{Thread, ThreadAbstraction};
94                Thread::current_thread_id()
95            }
96
97            #[cfg(not(feature = "std"))]
98            {
99                panic!("not yet supported")
100            }
101        })
102    }
103
104    /// Returns the raw value of this id.
105    pub fn raw(&self) -> NonZeroU64 {
106        self.raw
107    }
108}
109
110impl fmt::Display for ThreadId {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        let Self { process, raw } = self;
113        write!(f, "{process}:{raw:032x}")
114    }
115}
116
117/// Errors that can occur while parsing [`ThreadId`] from a string.
118#[derive(Clone, Debug)]
119pub enum ParseThreadIdError {
120    /// The string is missing a `:` separator.
121    MissingSeparator,
122
123    /// The embedded [`ProcessId`] failed to parse.
124    InvalidProcessId(core::num::ParseIntError),
125
126    /// The embedded [`ThreadId`] failed to parse.
127    InvalidThreadId(core::num::ParseIntError),
128
129    /// The embedded [`ThreadId`] had a zero value.
130    ZeroThreadId,
131}
132
133impl fmt::Display for ParseThreadIdError {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        match self {
136            Self::MissingSeparator => f.write_str("missing ':' separator"),
137            Self::InvalidProcessId(_) => f.write_str("failed to parse process id"),
138            Self::InvalidThreadId(_) => f.write_str("failed to parse thread id"),
139            Self::ZeroThreadId => f.write_str("zero thread id"),
140        }
141    }
142}
143
144impl core::error::Error for ParseThreadIdError {
145    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
146        match self {
147            Self::MissingSeparator => None,
148            Self::InvalidProcessId(error) => Some(error),
149            Self::InvalidThreadId(error) => Some(error),
150            Self::ZeroThreadId => None,
151        }
152    }
153}
154
155impl FromStr for ThreadId {
156    type Err = ParseThreadIdError;
157
158    fn from_str(s: &str) -> Result<Self, Self::Err> {
159        let Some((process, thread)) = s.split_once(":") else {
160            return Err(ParseThreadIdError::MissingSeparator);
161        };
162        let process = ProcessId::from_str(process).map_err(ParseThreadIdError::InvalidProcessId)?;
163        let thread = NonZeroU64::new(
164            u64::from_str_radix(thread, 16).map_err(ParseThreadIdError::InvalidThreadId)?,
165        )
166        .ok_or(ParseThreadIdError::ZeroThreadId)?;
167        Ok(Self::from_raw(process, thread))
168    }
169}
170
171impl serde::Serialize for ThreadId {
172    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
173    where
174        S: serde::Serializer,
175    {
176        let mut bytes = [0u8; 49];
177
178        hex::encode_to_slice(self.process.to_raw().to_le_bytes(), &mut bytes[..32]).unwrap();
179        bytes[32] = b':';
180        hex::encode_to_slice(self.raw().get().to_le_bytes(), &mut bytes[33..]).unwrap();
181
182        serializer.serialize_str(str::from_utf8(&bytes).unwrap())
183    }
184}
185
186impl<'de> serde::Deserialize<'de> for ThreadId {
187    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
188    where
189        D: serde::Deserializer<'de>,
190    {
191        use serde::de::Error;
192
193        let string = <&str>::deserialize(deserializer)?;
194
195        if string.len() != 49 {
196            return Err(D::Error::invalid_length(
197                string.len(),
198                &"expected 49 byte string",
199            ));
200        }
201
202        let bytes = string.as_bytes();
203
204        if bytes[32] != b':' {
205            return Err(D::Error::invalid_value(
206                serde::de::Unexpected::Str(string),
207                &"expected : separator at byte 32",
208            ));
209        }
210
211        let mut process = [0; 16];
212        hex::decode_to_slice(&bytes[..32], &mut process).map_err(D::Error::custom)?;
213        let process = ProcessId::from_raw(u128::from_le_bytes(process));
214
215        let mut thread = [0; 8];
216        hex::decode_to_slice(&bytes[33..], &mut thread).map_err(D::Error::custom)?;
217        let thread = NonZeroU64::new(u64::from_le_bytes(thread))
218            .ok_or_else(|| D::Error::custom("zero thread id"))?;
219
220        Ok(Self::from_raw(process, thread))
221    }
222}
223
224/// A telemetry message associated with a specific execution thread.
225///
226/// This structure wraps a telemetry message with its execution context,
227/// allowing messages from different executions to be properly correlated.
228#[derive(Clone, Debug, Serialize)]
229#[cfg_attr(feature = "alloc", derive(Deserialize))]
230pub struct InstanceMessage<'a> {
231    /// The thread this message belongs to
232    pub thread_id: ThreadId,
233
234    /// The telemetry message content
235    #[serde(borrow)]
236    pub message: TelemetryMessage<'a>,
237}
238
239#[cfg(feature = "alloc")]
240impl ToStatic for InstanceMessage<'_> {
241    type Static = InstanceMessage<'static>;
242
243    fn to_static(&self) -> Self::Static {
244        InstanceMessage {
245            thread_id: self.thread_id,
246            message: self.message.to_static(),
247        }
248    }
249}
250
251/// An enumeration of all possible telemetry message types.
252///
253/// This enum represents the different categories of telemetry data that can be
254/// collected and exported by the system.
255#[derive(Clone, Debug, Serialize)]
256#[cfg_attr(feature = "alloc", derive(Deserialize))]
257pub enum TelemetryMessage<'a> {
258    /// A structured log message with severity and attributes
259    Log(#[serde(borrow)] LogMessage<'a>),
260    /// A time synchronization message for clock coordination
261    TimeSync(TimeSyncMessage),
262    /// A distributed tracing message (spans, events, links)
263    Tracing(#[serde(borrow)] TracingMessage<'a>),
264}
265
266#[cfg(feature = "alloc")]
267impl ToStatic for TelemetryMessage<'_> {
268    type Static = TelemetryMessage<'static>;
269
270    fn to_static(&self) -> Self::Static {
271        match self {
272            TelemetryMessage::Log(msg) => TelemetryMessage::Log(msg.to_static()),
273            TelemetryMessage::TimeSync(msg) => TelemetryMessage::TimeSync(msg.clone()),
274            TelemetryMessage::Tracing(msg) => TelemetryMessage::Tracing(msg.to_static()),
275        }
276    }
277}
278
279/// Log message severity levels.
280///
281/// These levels follow standard logging conventions, ordered from most verbose
282/// to most critical.
283#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
284pub enum Severity {
285    /// The "trace" level.
286    ///
287    /// Designates very low priority, often extremely verbose, information.
288    Trace,
289    /// The "debug" level.
290    ///
291    /// Designates lower priority information.
292    Debug,
293    /// The "info" level.
294    ///
295    /// Designates useful information.
296    Info,
297    /// The "warn" level.
298    ///
299    /// Designates hazardous situations.
300    Warn,
301    /// The "error" level.
302    ///
303    /// Designates very serious errors.
304    Error,
305    /// The "fatal" level.
306    ///
307    /// Designates critical failures that might crash the program.
308    Fatal,
309}
310
311/// A structured log message with severity, timestamp, and attributes.
312///
313/// Log messages can be optionally correlated with traces by including trace and span IDs when available.
314#[derive(Clone, Debug, Serialize)]
315#[cfg_attr(feature = "alloc", derive(Deserialize))]
316pub struct LogMessage<'a> {
317    /// Timestamp in nanoseconds since Unix epoch (or system start)
318    pub time_unix_nano: u64,
319    /// The severity level of this log message
320    pub severity: Severity,
321
322    /// The message body
323    #[serde(borrow)]
324    pub body: StringType<'a>,
325
326    /// Key-value attributes providing additional context
327    #[serde(borrow)]
328    pub attributes: AttributeListType<'a>,
329}
330
331#[cfg(feature = "alloc")]
332impl ToStatic for LogMessage<'_> {
333    type Static = LogMessage<'static>;
334
335    fn to_static(&self) -> Self::Static {
336        LogMessage {
337            time_unix_nano: self.time_unix_nano,
338            severity: self.severity,
339            body: self.body.to_static(),
340            attributes: self.attributes.to_static(),
341        }
342    }
343}
344
345/// A time synchronization message for coordinating clocks between systems.
346///
347/// This message contains both local time and absolute time information,
348/// allowing downstream systems to correlate local timestamps with real-world time.
349#[derive(Clone, Debug, Serialize, Deserialize)]
350pub struct TimeSyncMessage {
351    /// Local timestamp in system-specific units
352    pub local_timestamp: u64,
353    /// Time since Unix epoch in nanoseconds
354    pub since_epoch: u64,
355}
356
357/// Messages related to distributed tracing operations.
358///
359/// This enum encompasses all the different types of tracing messages that can be
360/// generated during span lifecycle management and tracing operations.
361#[derive(Clone, Debug, Serialize)]
362#[cfg_attr(feature = "alloc", derive(Deserialize))]
363pub enum TracingMessage<'a> {
364    /// A new span has been created
365    CreateSpan(#[serde(borrow)] SpanCreateMessage<'a>),
366    /// A span has been entered (made current)
367    EnterSpan(SpanEnterMessage),
368    /// A span has been exited (no longer current)
369    ExitSpan(SpanExitMessage),
370    /// A span has been closed (completed)
371    CloseSpan(SpanCloseMessage),
372    /// An event has been added to a span
373    AddEvent(#[serde(borrow)] SpanAddEventMessage<'a>),
374    /// A link has been added to a span
375    AddLink(SpanAddLinkMessage),
376    /// An attribute has been set on a span
377    SetAttribute(#[serde(borrow)] SpanSetAttributeMessage<'a>),
378}
379
380#[cfg(feature = "alloc")]
381impl ToStatic for TracingMessage<'_> {
382    type Static = TracingMessage<'static>;
383
384    fn to_static(&self) -> Self::Static {
385        match self {
386            TracingMessage::CreateSpan(msg) => TracingMessage::CreateSpan(msg.to_static()),
387            TracingMessage::EnterSpan(msg) => TracingMessage::EnterSpan(*msg),
388            TracingMessage::ExitSpan(msg) => TracingMessage::ExitSpan(*msg),
389            TracingMessage::CloseSpan(msg) => TracingMessage::CloseSpan(*msg),
390            TracingMessage::AddEvent(msg) => TracingMessage::AddEvent(msg.to_static()),
391            TracingMessage::AddLink(msg) => TracingMessage::AddLink(*msg),
392            TracingMessage::SetAttribute(msg) => TracingMessage::SetAttribute(msg.to_static()),
393        }
394    }
395}
396
397/// Message indicating the creation of a new span.
398///
399/// This message provides all the information needed to create a new span
400/// in the trace, including its identity, timing, and initial attributes.
401#[derive(Clone, Debug, Serialize)]
402#[cfg_attr(feature = "alloc", derive(Deserialize))]
403pub struct SpanCreateMessage<'a> {
404    /// The unique identifier (within the associated process) for this span.
405    pub span_id: SpanId,
406
407    /// The name of the span
408    #[serde(borrow)]
409    pub name: StringType<'a>,
410
411    /// Timestamp when the span was started
412    pub start_time_unix_nano: u64,
413
414    /// Initial attributes attached to the span
415    #[serde(borrow)]
416    pub attributes: AttributeListType<'a>,
417}
418
419#[cfg(feature = "alloc")]
420impl ToStatic for SpanCreateMessage<'_> {
421    type Static = SpanCreateMessage<'static>;
422
423    fn to_static(&self) -> Self::Static {
424        SpanCreateMessage {
425            span_id: self.span_id,
426            name: self.name.to_static(),
427            start_time_unix_nano: self.start_time_unix_nano,
428            attributes: self.attributes.to_static(),
429        }
430    }
431}
432
433/// Message indicating a span has been entered.
434#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
435pub struct SpanEnterMessage {
436    /// The span being entered
437    pub span_id: SpanId,
438
439    /// Timestamp when the span was entered
440    pub time_unix_nano: u64,
441}
442
443/// Message indicating a span has been exited.
444#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
445pub struct SpanExitMessage {
446    /// The span being exited
447    pub span_id: SpanId,
448
449    /// Timestamp when the span was exited
450    pub time_unix_nano: u64,
451}
452
453/// Message indicating a span has been closed (completed).
454#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
455pub struct SpanCloseMessage {
456    /// The span being closed
457    pub span_id: SpanId,
458
459    /// Timestamp when the span was closed
460    pub end_time_unix_nano: u64,
461}
462
463/// Message indicating an attribute has been set on a span.
464#[derive(Clone, Debug, Serialize, Deserialize)]
465pub struct SpanSetAttributeMessage<'a> {
466    /// The span the attribute is being set on, if [`None`] then this applies to the "current span"
467    /// as determined by tracking [`SpanEnterMessage`] and [`SpanExitMessage`] pairs.
468    pub span_id: Option<SpanId>,
469
470    /// The attribute being set
471    #[serde(borrow)]
472    pub attribute: KeyValue<'a>,
473}
474
475#[cfg(feature = "alloc")]
476impl ToStatic for SpanSetAttributeMessage<'_> {
477    type Static = SpanSetAttributeMessage<'static>;
478
479    fn to_static(&self) -> Self::Static {
480        SpanSetAttributeMessage {
481            span_id: self.span_id,
482            attribute: self.attribute.to_static(),
483        }
484    }
485}
486
487/// Message indicating an event has been added to a span.
488#[derive(Clone, Debug, Serialize)]
489#[cfg_attr(feature = "alloc", derive(Deserialize))]
490pub struct SpanAddEventMessage<'a> {
491    /// The span the event is being added to, if [`None`] then this applies to the "current span"
492    /// as determined by tracking [`SpanEnterMessage`] and [`SpanExitMessage`] pairs.
493    pub span_id: Option<SpanId>,
494
495    /// The name of the event
496    #[serde(borrow)]
497    pub name: StringType<'a>,
498
499    /// Timestamp when the event occurred
500    pub time_unix_nano: u64,
501
502    /// Attributes providing additional context for the event
503    #[serde(borrow)]
504    pub attributes: AttributeListType<'a>,
505}
506
507#[cfg(feature = "alloc")]
508impl ToStatic for SpanAddEventMessage<'_> {
509    type Static = SpanAddEventMessage<'static>;
510
511    fn to_static(&self) -> Self::Static {
512        SpanAddEventMessage {
513            span_id: self.span_id,
514            name: self.name.to_static(),
515            time_unix_nano: self.time_unix_nano,
516            attributes: self.attributes.to_static(),
517        }
518    }
519}
520
521/// Message indicating a link has been added to a span.
522///
523/// Links connect spans across different traces, representing relationships
524/// that are not parent-child hierarchies.
525#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
526pub struct SpanAddLinkMessage {
527    /// The span the link is being added to, if [`None`] then this applies to the "current span"
528    /// as determined by tracking [`SpanEnterMessage`] and [`SpanExitMessage`] pairs.
529    pub span_id: Option<SpanId>,
530
531    /// The span context being linked to
532    pub link: SpanContext,
533}
534
535#[cfg(test)]
536#[cfg_attr(coverage_nightly, coverage(off))]
537mod tests {
538    use alloc::format;
539    #[cfg(feature = "alloc")]
540    use alloc::string::String;
541
542    use super::*;
543
544    #[test]
545    fn thread_id_format_from_str_roundtrip() {
546        let test_cases = [
547            ThreadId::from_raw(ProcessId::from_raw(0), NonZeroU64::new(1).unwrap()),
548            ThreadId::from_raw(
549                ProcessId::from_raw(0x123456789ABCDEF0FEDCBA9876543210),
550                NonZeroU64::new(0xFEDCBA9876543210).unwrap(),
551            ),
552            ThreadId::from_raw(
553                ProcessId::from_raw(u128::MAX),
554                NonZeroU64::new(u64::MAX).unwrap(),
555            ),
556            ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
557        ];
558
559        for thread_id in test_cases {
560            let formatted = format!("{thread_id}");
561            let parsed = formatted.parse::<ThreadId>().unwrap();
562            assert_eq!(
563                thread_id,
564                parsed,
565                "Failed roundtrip for {:#x}:{:#x}",
566                thread_id.process.to_raw(),
567                thread_id.raw(),
568            );
569        }
570    }
571
572    #[test]
573    fn thread_id_serde_roundtrip() {
574        let test_cases = [
575            ThreadId::from_raw(ProcessId::from_raw(0), NonZeroU64::new(1).unwrap()),
576            ThreadId::from_raw(
577                ProcessId::from_raw(0x123456789ABCDEF0FEDCBA9876543210),
578                NonZeroU64::new(0xFEDCBA9876543210).unwrap(),
579            ),
580            ThreadId::from_raw(
581                ProcessId::from_raw(u128::MAX),
582                NonZeroU64::new(u64::MAX).unwrap(),
583            ),
584            ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
585        ];
586
587        for original in test_cases {
588            let json = serde_json::to_string(&original).unwrap();
589            let deserialized: ThreadId = serde_json::from_str(&json).unwrap();
590            assert_eq!(original, deserialized);
591        }
592    }
593
594    #[test]
595    fn string_type_conversions() {
596        let static_str: StringType<'static> = "static".into();
597
598        let _event = SpanAddEventMessage {
599            span_id: Some(SpanId(0)),
600            name: static_str,
601            time_unix_nano: 0,
602            attributes: attribute_list_from_slice(&[]),
603        };
604
605        let borrowed_str: StringType = "borrowed".into();
606
607        let _event = SpanAddEventMessage {
608            span_id: Some(SpanId(0)),
609            name: borrowed_str,
610            time_unix_nano: 0,
611            attributes: attribute_list_from_slice(&[]),
612        };
613    }
614
615    #[cfg(any(feature = "std", feature = "alloc"))]
616    #[test]
617    fn string_type_with_owned_strings() {
618        let string = String::from("owned");
619        let owned: StringType<'static> = StringType::from(string);
620
621        let _event = SpanAddEventMessage {
622            span_id: Some(SpanId(0)),
623            name: owned,
624            time_unix_nano: 0,
625            attributes: attribute_list_from_slice(&[]),
626        };
627    }
628
629    #[cfg(feature = "alloc")]
630    #[test]
631    fn to_static_conversion() {
632        use alloc::string::String;
633
634        use crate::value::Value;
635
636        // Create some data with non-static lifetime
637        let borrowed_name_str = "test_span";
638        let borrowed_name: StringType = borrowed_name_str.into();
639
640        let owned_key = String::from("test_key");
641        let owned_value = String::from("test_value");
642        let attribute = KeyValue {
643            key: owned_key.as_str().into(),
644            value: Value::String(owned_value.as_str().into()),
645        };
646
647        let attributes = [attribute];
648        let span_event = SpanAddEventMessage {
649            span_id: Some(SpanId(0)),
650            name: borrowed_name,
651            time_unix_nano: 0,
652            attributes: attribute_list_from_slice(&attributes),
653        };
654
655        let tracing_message = TracingMessage::AddEvent(span_event);
656        let telemetry_message = TelemetryMessage::Tracing(tracing_message);
657        let instance_message = InstanceMessage {
658            thread_id: ThreadId::from_raw(ProcessId::from_raw(999), NonZeroU64::new(111).unwrap()),
659            message: telemetry_message,
660        };
661
662        let static_message: InstanceMessage<'static> = instance_message.to_static();
663
664        // Verify the conversion worked - the static message should have the same data
665        if let TelemetryMessage::Tracing(TracingMessage::AddEvent(span_event)) =
666            &static_message.message
667        {
668            assert_eq!(span_event.name.as_ref(), "test_span");
669        } else {
670            panic!("Expected CreateSpan message");
671        }
672    }
673}