Skip to main content

veecle_telemetry/
id.rs

1// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
2// Copyright 2025 Veecle GmbH.
3//
4// This file has been modified from the original TiKV implementation.
5
6//! Unique identifiers for traces and spans.
7//!
8//! This module provides the core identifier types used throughout the telemetry system
9//! to uniquely identify traces and spans in distributed tracing scenarios.
10//!
11//! # Core Types
12//!
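//! - [`ProcessId`]: An identifier that uniquely identifies a process globally.
//! - [`ThreadId`]: A combination of process id and a process-unique thread id.
//! - [`SpanId`]: An identifier that uniquely identifies a span within a process.
//! - [`SpanContext`]: A combination of process id and span id that uniquely identifies a span globally.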
15
16use core::fmt;
17use core::num::NonZeroU64;
18use core::str::FromStr;
19use rand::RngExt;
20
/// Identifies a process with a value that is unique across all processes.
///
/// [`ThreadId`]s and [`SpanContext`]s are only guaranteed to be unique relative to the process
/// they belong to, so this id supplies the globally-unique context around them. On a normal
/// operating system that context is the process; on other systems it should be the closest
/// equivalent, e.g. for most embedded setups a distinct value for every restart of the system.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProcessId(u128);
29
30impl ProcessId {
31    /// Uses a random number generator to generate the [`ProcessId`].
32    pub fn random(rng: &mut impl rand::Rng) -> Self {
33        Self(rng.random())
34    }
35
36    /// Creates a [`ProcessId`] from a raw value
37    ///
38    /// Extra care needs to be taken that this is not a constant value or re-used in any way.
39    ///
40    /// When possible prefer using [`ProcessId::random`].
41    pub const fn from_raw(raw: u128) -> Self {
42        Self(raw)
43    }
44
45    /// Returns the raw value of this id.
46    pub fn to_raw(self) -> u128 {
47        self.0
48    }
49}
50
51impl fmt::Display for ProcessId {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        write!(f, "{:032x}", self.0)
54    }
55}
56
57impl FromStr for ProcessId {
58    type Err = core::num::ParseIntError;
59
60    fn from_str(s: &str) -> Result<Self, Self::Err> {
61        u128::from_str_radix(s, 16).map(ProcessId)
62    }
63}
64
65impl serde::Serialize for ProcessId {
66    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
67    where
68        S: serde::Serializer,
69    {
70        let mut hex_bytes = [0u8; size_of::<u128>() * 2];
71        hex::encode_to_slice(self.0.to_le_bytes(), &mut hex_bytes).unwrap();
72
73        serializer.serialize_str(str::from_utf8(&hex_bytes).unwrap())
74    }
75}
76
77impl<'de> serde::Deserialize<'de> for ProcessId {
78    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
79    where
80        D: serde::Deserializer<'de>,
81    {
82        let bytes: [u8; size_of::<u128>()] = hex::serde::deserialize(deserializer)?;
83
84        Ok(ProcessId(u128::from_le_bytes(bytes)))
85    }
86}
87
88/// A globally-unique id identifying a thread within a specific process.
89///
90/// The primary purpose of this id is to allow the consumer of telemetry messages to associate
91/// spans with the callstack they came from to reconstruct parent-child relationships. On a normal
92/// operating system this is the thread, on other systems it should be whatever is the closest
93/// equivalent, e.g. for FreeRTOS it would be a task. On a single threaded bare-metal system it
94/// would be a constant as there is only the one callstack.
95#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
96pub struct ThreadId {
97    /// The globally-unique id for the process this thread is within.
98    pub process: ProcessId,
99
100    /// The process-unique id for this thread within the process.
101    raw: NonZeroU64,
102}
103
104impl ThreadId {
105    /// Creates a [`ThreadId`] from a raw value.
106    ///
107    /// Extra care needs to be taken that this is not a constant value or re-used within this
108    /// process in any way.
109    pub const fn from_raw(process: ProcessId, raw: NonZeroU64) -> Self {
110        Self { process, raw }
111    }
112
113    /// Returns the raw value of this id.
114    pub fn raw(&self) -> NonZeroU64 {
115        self.raw
116    }
117}
118
119impl fmt::Display for ThreadId {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        let Self { process, raw } = self;
122        write!(f, "{process}:{raw:016x}")
123    }
124}
125
/// The ways in which parsing a [`ThreadId`] from a string can fail.
#[derive(Debug, Clone)]
pub enum ParseThreadIdError {
    /// No `:` separator was found in the string.
    MissingSeparator,

    /// The part before the separator is not a valid [`ProcessId`].
    InvalidProcessId(core::num::ParseIntError),

    /// The part after the separator is not a valid thread id.
    InvalidThreadId(core::num::ParseIntError),

    /// The thread id parsed successfully but its value was zero.
    ZeroThreadId,
}
141
142impl fmt::Display for ParseThreadIdError {
143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144        match self {
145            Self::MissingSeparator => f.write_str("missing ':' separator"),
146            Self::InvalidProcessId(_) => f.write_str("failed to parse process id"),
147            Self::InvalidThreadId(_) => f.write_str("failed to parse thread id"),
148            Self::ZeroThreadId => f.write_str("zero thread id"),
149        }
150    }
151}
152
153impl core::error::Error for ParseThreadIdError {
154    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
155        match self {
156            Self::MissingSeparator => None,
157            Self::InvalidProcessId(error) => Some(error),
158            Self::InvalidThreadId(error) => Some(error),
159            Self::ZeroThreadId => None,
160        }
161    }
162}
163
164impl FromStr for ThreadId {
165    type Err = ParseThreadIdError;
166
167    fn from_str(s: &str) -> Result<Self, Self::Err> {
168        let Some((process, thread)) = s.split_once(":") else {
169            return Err(ParseThreadIdError::MissingSeparator);
170        };
171        let process = ProcessId::from_str(process).map_err(ParseThreadIdError::InvalidProcessId)?;
172        let thread = NonZeroU64::new(
173            u64::from_str_radix(thread, 16).map_err(ParseThreadIdError::InvalidThreadId)?,
174        )
175        .ok_or(ParseThreadIdError::ZeroThreadId)?;
176        Ok(Self::from_raw(process, thread))
177    }
178}
179
180impl serde::Serialize for ThreadId {
181    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
182    where
183        S: serde::Serializer,
184    {
185        let mut bytes = [0u8; 49];
186
187        hex::encode_to_slice(self.process.to_raw().to_le_bytes(), &mut bytes[..32]).unwrap();
188        bytes[32] = b':';
189        hex::encode_to_slice(self.raw().get().to_le_bytes(), &mut bytes[33..]).unwrap();
190
191        serializer.serialize_str(str::from_utf8(&bytes).unwrap())
192    }
193}
194
195impl<'de> serde::Deserialize<'de> for ThreadId {
196    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
197    where
198        D: serde::Deserializer<'de>,
199    {
200        use serde::de::Error;
201
202        let string = <&str>::deserialize(deserializer)?;
203
204        if string.len() != 49 {
205            return Err(D::Error::invalid_length(
206                string.len(),
207                &"expected 49 byte string",
208            ));
209        }
210
211        let bytes = string.as_bytes();
212
213        if bytes[32] != b':' {
214            return Err(D::Error::invalid_value(
215                serde::de::Unexpected::Str(string),
216                &"expected : separator at byte 32",
217            ));
218        }
219
220        let mut process = [0; 16];
221        hex::decode_to_slice(&bytes[..32], &mut process).map_err(D::Error::custom)?;
222        let process = ProcessId::from_raw(u128::from_le_bytes(process));
223
224        let mut thread = [0; 8];
225        hex::decode_to_slice(&bytes[33..], &mut thread).map_err(D::Error::custom)?;
226        let thread = NonZeroU64::new(u64::from_le_bytes(thread))
227            .ok_or_else(|| D::Error::custom("zero thread id"))?;
228
229        Ok(Self::from_raw(process, thread))
230    }
231}
232
/// Identifies a span with a value that is unique within its process.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SpanId(pub u64);
236
#[cfg(feature = "enable")]
impl SpanId {
    /// Creates a non-zero [`SpanId`].
    ///
    /// Ids are handed out from a shared atomic counter that starts at one.
    #[inline]
    #[doc(hidden)]
    pub fn next_id() -> Self {
        use core::sync::atomic::{AtomicU64, Ordering};

        static NEXT: AtomicU64 = AtomicU64::new(1);

        let id = NEXT.fetch_add(1, Ordering::Relaxed);
        Self(id)
    }
}
248
249impl fmt::Display for SpanId {
250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251        write!(f, "{:016x}", self.0)
252    }
253}
254
255impl FromStr for SpanId {
256    type Err = core::num::ParseIntError;
257
258    fn from_str(s: &str) -> Result<Self, Self::Err> {
259        u64::from_str_radix(s, 16).map(SpanId)
260    }
261}
262
263impl serde::Serialize for SpanId {
264    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
265    where
266        S: serde::Serializer,
267    {
268        let mut hex_bytes = [0u8; size_of::<u64>() * 2];
269        hex::encode_to_slice(self.0.to_le_bytes(), &mut hex_bytes).unwrap();
270
271        serializer.serialize_str(str::from_utf8(&hex_bytes).unwrap())
272    }
273}
274
275impl<'de> serde::Deserialize<'de> for SpanId {
276    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
277    where
278        D: serde::Deserializer<'de>,
279    {
280        let bytes: [u8; size_of::<u64>()] = hex::serde::deserialize(deserializer)?;
281
282        Ok(SpanId(u64::from_le_bytes(bytes)))
283    }
284}
285
286/// A struct representing the context of a span, including its [`ProcessId`] and [`SpanId`].
287#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
288pub struct SpanContext {
289    /// The id of the process this span belongs to.
290    pub process_id: ProcessId,
291    /// The unique id of this span.
292    pub span_id: SpanId,
293}
294
295impl SpanContext {
296    /// Creates a new `SpanContext` with the given [`ProcessId`] and [`SpanId`].
297    ///
298    /// # Examples
299    ///
300    /// ```
301    /// use veecle_telemetry::{ProcessId, SpanId, SpanContext};
302    ///
303    /// let span_context = SpanContext::new(ProcessId::from_raw(12), SpanId(13));
304    /// ```
305    pub fn new(process_id: ProcessId, span_id: SpanId) -> Self {
306        Self {
307            process_id,
308            span_id,
309        }
310    }
311}
312
313impl fmt::Display for SpanContext {
314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315        let Self {
316            process_id,
317            span_id,
318        } = self;
319        write!(f, "{process_id}:{span_id}")
320    }
321}
322
/// The ways in which parsing a [`SpanContext`] from a string can fail.
#[derive(Debug, Clone)]
pub enum ParseSpanContextError {
    /// No `:` separator was found in the string.
    MissingSeparator,

    /// The part before the separator is not a valid [`ProcessId`].
    InvalidProcessId(core::num::ParseIntError),

    /// The part after the separator is not a valid [`SpanId`].
    InvalidSpanId(core::num::ParseIntError),
}
335
336impl fmt::Display for ParseSpanContextError {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        match self {
339            Self::MissingSeparator => f.write_str("missing ':' separator"),
340            Self::InvalidProcessId(_) => f.write_str("failed to parse process id"),
341            Self::InvalidSpanId(_) => f.write_str("failed to parse span id"),
342        }
343    }
344}
345
346impl core::error::Error for ParseSpanContextError {
347    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
348        match self {
349            Self::MissingSeparator => None,
350            Self::InvalidProcessId(error) => Some(error),
351            Self::InvalidSpanId(error) => Some(error),
352        }
353    }
354}
355
356impl FromStr for SpanContext {
357    type Err = ParseSpanContextError;
358
359    fn from_str(s: &str) -> Result<Self, Self::Err> {
360        let Some((process_id, span_id)) = s.split_once(":") else {
361            return Err(ParseSpanContextError::MissingSeparator);
362        };
363        let process_id =
364            ProcessId::from_str(process_id).map_err(ParseSpanContextError::InvalidProcessId)?;
365        let span_id = SpanId::from_str(span_id).map_err(ParseSpanContextError::InvalidSpanId)?;
366        Ok(Self {
367            process_id,
368            span_id,
369        })
370    }
371}
372
373impl serde::Serialize for SpanContext {
374    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
375    where
376        S: serde::Serializer,
377    {
378        let mut bytes = [0u8; 49];
379
380        hex::encode_to_slice(self.process_id.to_raw().to_le_bytes(), &mut bytes[..32]).unwrap();
381        bytes[32] = b':';
382        hex::encode_to_slice(self.span_id.0.to_le_bytes(), &mut bytes[33..]).unwrap();
383
384        serializer.serialize_str(str::from_utf8(&bytes).unwrap())
385    }
386}
387
388impl<'de> serde::Deserialize<'de> for SpanContext {
389    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
390    where
391        D: serde::Deserializer<'de>,
392    {
393        use serde::de::Error;
394
395        let string = <&str>::deserialize(deserializer)?;
396
397        if string.len() != 49 {
398            return Err(D::Error::invalid_length(
399                string.len(),
400                &"expected 49 byte string",
401            ));
402        }
403
404        let bytes = string.as_bytes();
405
406        if bytes[32] != b':' {
407            return Err(D::Error::invalid_value(
408                serde::de::Unexpected::Str(string),
409                &"expected : separator at byte 32",
410            ));
411        }
412
413        let mut process = [0; 16];
414        hex::decode_to_slice(&bytes[..32], &mut process).map_err(D::Error::custom)?;
415
416        let mut span = [0; 8];
417        hex::decode_to_slice(&bytes[33..], &mut span).map_err(D::Error::custom)?;
418
419        Ok(Self {
420            process_id: ProcessId::from_raw(u128::from_le_bytes(process)),
421            span_id: SpanId(u64::from_le_bytes(span)),
422        })
423    }
424}
425
#[cfg(all(test, feature = "std"))]
mod tests {
    use std::format;

    use test_case::test_case;

    use super::*;

    /// Ids generated concurrently on many threads must never collide.
    #[test]
    #[cfg(feature = "enable")] // `SpanId::next_id` only exists with the `enable` feature.
    #[cfg(not(miri))] // VERY slow with Miri.
    // The intermediate `collect` spawns every thread before the first one is joined; without it
    // the lazy iterator would spawn and join the threads one at a time.
    #[allow(clippy::needless_collect)]
    fn unique_id() {
        use std::collections::HashSet;
        use std::vec::Vec;

        let handles = std::iter::repeat_with(|| {
            std::thread::spawn(|| {
                std::iter::repeat_with(SpanId::next_id)
                    .take(1000)
                    .collect::<Vec<_>>()
            })
        })
        .take(32)
        .collect::<Vec<_>>();

        let ids = handles
            .into_iter()
            .flat_map(|handle| handle.join().unwrap())
            .collect::<HashSet<_>>();

        assert_eq!(ids.len(), 32 * 1000);
    }

    /// Consecutive ids are non-zero and distinct.
    #[test]
    #[cfg(feature = "enable")] // `SpanId::next_id` only exists with the `enable` feature.
    fn span_id_next_id_produces_non_zero_values() {
        use std::collections::HashSet;

        let mut unique_ids = HashSet::new();

        for id in (0..100).map(|_| SpanId::next_id()) {
            assert_ne!(id.0, 0, "SpanId::next_id() should not produce zero values");
            assert!(
                unique_ids.insert(id.0),
                "SpanId::next_id() should produce unique values"
            );
        }
    }

    /// Checks the serde (little-endian hex) and `Display`/`FromStr` (big-endian hex)
    /// representations of every id type, in both directions.
    #[test_case(SpanId(0), r#""0000000000000000""#, "0000000000000000")]
    #[test_case(SpanId(1), r#""0100000000000000""#, "0000000000000001")]
    #[test_case(SpanId(0x123), r#""2301000000000000""#, "0000000000000123")]
    #[test_case(SpanId(u64::MAX), r#""ffffffffffffffff""#, "ffffffffffffffff")]
    #[test_case(
        SpanId(0xFEDCBA9876543210),
        r#""1032547698badcfe""#,
        "fedcba9876543210"
    )]
    #[test_case(
        ProcessId::from_raw(0),
        r#""00000000000000000000000000000000""#,
        "00000000000000000000000000000000"
    )]
    #[test_case(
        ProcessId::from_raw(1),
        r#""01000000000000000000000000000000""#,
        "00000000000000000000000000000001"
    )]
    #[test_case(
        ProcessId::from_raw(0x123),
        r#""23010000000000000000000000000000""#,
        "00000000000000000000000000000123"
    )]
    #[test_case(
        ProcessId::from_raw(0x123456789ABCDEF0FEDCBA9876543210),
        r#""1032547698badcfef0debc9a78563412""#,
        "123456789abcdef0fedcba9876543210"
    )]
    #[test_case(
        ProcessId::from_raw(u128::MAX),
        r#""ffffffffffffffffffffffffffffffff""#,
        "ffffffffffffffffffffffffffffffff"
    )]
    #[test_case(
        ThreadId::from_raw(ProcessId::from_raw(0), NonZeroU64::new(1).unwrap()),
        r#""00000000000000000000000000000000:0100000000000000""#,
        "00000000000000000000000000000000:0000000000000001"
    )]
    #[test_case(
        ThreadId::from_raw(ProcessId::from_raw(0x123), NonZeroU64::new(0x456).unwrap()),
        r#""23010000000000000000000000000000:5604000000000000""#,
        "00000000000000000000000000000123:0000000000000456"
    )]
    #[test_case(
        ThreadId::from_raw(ProcessId::from_raw(u128::MAX), NonZeroU64::new(u64::MAX).unwrap()),
        r#""ffffffffffffffffffffffffffffffff:ffffffffffffffff""#,
        "ffffffffffffffffffffffffffffffff:ffffffffffffffff"
    )]
    #[test_case(
        ThreadId::from_raw(
            ProcessId::from_raw(0x123456789ABCDEF0FEDCBA9876543210),
            NonZeroU64::new(0xFEDCBA9876543210).unwrap(),
        ),
        r#""1032547698badcfef0debc9a78563412:1032547698badcfe""#,
        "123456789abcdef0fedcba9876543210:fedcba9876543210"
    )]
    #[test_case(
        SpanContext::new(ProcessId::from_raw(0), SpanId(0)),
        r#""00000000000000000000000000000000:0000000000000000""#,
        "00000000000000000000000000000000:0000000000000000"
    )]
    #[test_case(
        SpanContext::new(ProcessId::from_raw(0x123), SpanId(0x456)),
        r#""23010000000000000000000000000000:5604000000000000""#,
        "00000000000000000000000000000123:0000000000000456"
    )]
    #[test_case(
        SpanContext::new(ProcessId::from_raw(u128::MAX), SpanId(u64::MAX)),
        r#""ffffffffffffffffffffffffffffffff:ffffffffffffffff""#,
        "ffffffffffffffffffffffffffffffff:ffffffffffffffff"
    )]
    #[test_case(
        SpanContext::new(
            ProcessId::from_raw(0x123456789ABCDEF0FEDCBA9876543210),
            SpanId(0xFEDCBA9876543210)
        ),
        r#""1032547698badcfef0debc9a78563412:1032547698badcfe""#,
        "123456789abcdef0fedcba9876543210:fedcba9876543210"
    )]
    fn serialization<T>(value: T, expected_json: &str, expected_display: &str)
    where
        T: fmt::Display
            + serde::Serialize
            + FromStr<Err: fmt::Debug>
            + serde::de::DeserializeOwned
            + fmt::Debug
            + Eq,
    {
        assert_eq!(serde_json::to_string(&value).unwrap(), expected_json);
        assert_eq!(value, serde_json::from_str::<T>(expected_json).unwrap());

        assert_eq!(format!("{value}"), expected_display);
        assert_eq!(value, T::from_str(expected_display).unwrap());
    }

    #[test_case("")]
    #[test_case("xyz")]
    fn span_id_from_str_error(input: &str) {
        assert!(SpanId::from_str(input).is_err());
    }

    #[test_case("")]
    #[test_case("ffffffffffffffffffffffffffffffff0")]
    #[test_case("xyz")]
    fn process_id_from_str_error(input: &str) {
        assert!(ProcessId::from_str(input).is_err());
    }

    #[test_case("")]
    #[test_case("00000000000000000000000000000000")]
    #[test_case("00000000000000000000000000000000:0000000000000000")]
    #[test_case("00000000000000000000000000000000:xyz")]
    #[test_case("00000000000000000000000000000001:")]
    #[test_case(":0000000000000001")]
    #[test_case("xyz")]
    #[test_case("xyz:0000000000000001")]
    fn thread_id_from_str_error(input: &str) {
        assert!(ThreadId::from_str(input).is_err());
    }

    #[test_case("")]
    #[test_case("00000000000000000000000000000000")]
    #[test_case("00000000000000000000000000000000:xyz")]
    #[test_case("00000000000000000000000000000001:")]
    #[test_case(":0000000000000000")]
    #[test_case("xyz")]
    #[test_case("xyz:0000000000000000")]
    fn span_context_from_str_error(input: &str) {
        assert!(SpanContext::from_str(input).is_err());
    }
}