1pub trait Named: Sized + 'static {
14 fn typename() -> &'static str;
17
18 fn typehash() -> u64 {
21 cityhasher::hash(Self::typename())
24 }
25
26 fn typeid() -> TypeId {
29 TypeId::of::<Self>()
30 }
31
32 fn port() -> u64 {
35 Self::typehash() | (1 << 63)
36 }
37
38 fn arm(&self) -> Option<&'static str> {
41 None
42 }
43
44 unsafe fn arm_unchecked(self_: *const ()) -> Option<&'static str> {
47 unsafe { &*(self_ as *const Self) }.arm()
49 }
50}
51
52#[doc(hidden)]
53pub trait NamedDumpable: Named + Serialize + for<'de> Deserialize<'de> {
57 fn dump(data: Serialized) -> Result<serde_json::Value, anyhow::Error>;
59}
60
61impl<T: Named + Serialize + for<'de> Deserialize<'de>> NamedDumpable for T {
62 fn dump(data: Serialized) -> Result<serde_json::Value, anyhow::Error> {
63 let value = data.deserialized::<Self>()?;
64 Ok(serde_json::to_value(value)?)
65 }
66}
67
68macro_rules! impl_basic {
69 ($t:ty) => {
70 impl Named for $t {
71 fn typename() -> &'static str {
72 stringify!($t)
73 }
74 }
75 };
76}
77
78impl_basic!(());
79impl_basic!(bool);
80impl_basic!(i8);
81impl_basic!(u8);
82impl_basic!(i16);
83impl_basic!(u16);
84impl_basic!(i32);
85impl_basic!(u32);
86impl_basic!(i64);
87impl_basic!(u64);
88impl_basic!(i128);
89impl_basic!(u128);
90impl_basic!(isize);
91impl_basic!(usize);
92impl_basic!(f32);
93impl_basic!(f64);
94impl_basic!(String);
95
96impl Named for &'static str {
97 fn typename() -> &'static str {
98 "&str"
99 }
100}
101
102impl Named for std::time::Duration {
103 fn typename() -> &'static str {
104 "std::time::Duration"
105 }
106}
107
108impl Named for bytes::Bytes {
109 fn typename() -> &'static str {
110 "bytes::Bytes"
111 }
112}
113
/// Builds a type name from `$format_string` and the `typename()` of each
/// `$args` type, interning the result so that the same `&'static str` is
/// returned for every later call with the same `$key` type.
///
/// The formatted string is leaked on first use; the cache is keyed by the
/// `TypeId` of `$key`. `typename` must be callable on each `$args` type at
/// the expansion site.
#[doc(hidden)]
#[macro_export]
macro_rules! intern_typename {
    ($key:ty, $format_string:expr_2021, $($args:ty),+) => {
        {
            static CACHE: std::sync::LazyLock<$crate::dashmap::DashMap<std::any::TypeId, &'static str>> =
                std::sync::LazyLock::new($crate::dashmap::DashMap::new);

            let key = std::any::TypeId::of::<$key>();

            // Fast path. The map guard is a temporary of this statement, so
            // no lock is held once `cached` is bound.
            let cached: Option<&'static str> = CACHE.get(&key).map(|hit| *hit.value());

            match cached {
                Some(typename) => typename,
                None => {
                    // Format the name BEFORE taking the entry. A `static`
                    // inside a generic function is shared by all of its
                    // instantiations, so the argument `typename()` calls can
                    // re-enter this very map (e.g. `Vec<Vec<T>>`). Doing that
                    // while holding the entry's shard lock can deadlock.
                    let rendered: String =
                        format!($format_string, $(<$args>::typename()),+);

                    let interned: &'static str = match CACHE.entry(key) {
                        $crate::dashmap::mapref::entry::Entry::Vacant(slot) => {
                            let leaked: &'static str = rendered.leak();
                            slot.insert(leaked);
                            leaked
                        }
                        // Another thread interned the name first; reuse it
                        // and let `rendered` drop without leaking.
                        $crate::dashmap::mapref::entry::Entry::Occupied(slot) => *slot.get(),
                    };
                    interned
                }
            }
        }
    };
}
135use std::any::TypeId;
136use std::collections::HashMap;
137use std::fmt;
138use std::io::Cursor;
139use std::sync::LazyLock;
140
141use enum_as_inner::EnumAsInner;
142pub use intern_typename;
143use serde::Deserialize;
144use serde::Serialize;
145use serde::de::DeserializeOwned;
146
// Expands to a format-string literal with one `{}` placeholder per
// identifier, separated by ", " (e.g. three identifiers yield
// "{}, {}, {}"). Identifiers must each be followed by a comma.
macro_rules! tuple_format_string {
    // Last identifier: a single placeholder, no trailing separator.
    ($a:ident,) => { "{}" };
    // One placeholder plus separator, then recurse on the remainder.
    ($a:ident, $($rest_a:ident,)+) => { concat!("{}, ", tuple_format_string!($($rest_a,)+)) };
}
151
// Drops the first identifier and hands the remaining ones back to
// `impl_tuple!`, so that each shorter tuple arity gets an impl too.
macro_rules! impl_tuple_peel {
    ($name:ident, $($other:ident,)*) => (impl_tuple! { $($other,)* })
}
155
// Implements `Named` for the tuple made of the given type parameters and,
// through `impl_tuple_peel!`, for every shorter tuple formed by dropping
// leading parameters. The name is the parenthesized, comma-separated list
// of element type names, interned via `intern_typename!`.
macro_rules! impl_tuple {
    // No parameters left: recursion ends.
    () => ();
    ( $($name:ident,)+ ) => (
        impl<$($name:Named + 'static),+> Named for ($($name,)+) {
            fn typename() -> &'static str {
                intern_typename!(Self, concat!("(", tuple_format_string!($($name,)+), ")"), $($name),+)
            }
        }
        impl_tuple_peel! { $($name,)+ }
    )
}

// Twelve parameters: generates impls for tuples of arity 12 down to 1.
impl_tuple! { E, D, C, B, A, Z, Y, X, W, V, U, T, }
169
170impl<T: Named + 'static> Named for Option<T> {
171 fn typename() -> &'static str {
172 intern_typename!(Self, "Option<{}>", T)
173 }
174}
175
176impl<T: Named + 'static> Named for Vec<T> {
177 fn typename() -> &'static str {
178 intern_typename!(Self, "Vec<{}>", T)
179 }
180}
181
182impl<T: Named + 'static, E: Named + 'static> Named for Result<T, E> {
183 fn typename() -> &'static str {
184 intern_typename!(Self, "Result<{}, {}>", T, E)
185 }
186}
187
188static SHAPE_CACHED_TYPEHASH: LazyLock<u64> =
189 LazyLock::new(|| cityhasher::hash(<ndslice::shape::Shape as Named>::typename()));
190
191impl Named for ndslice::shape::Shape {
192 fn typename() -> &'static str {
193 "ndslice::shape::Shape"
194 }
195
196 fn typehash() -> u64 {
197 *SHAPE_CACHED_TYPEHASH
198 }
199}
200
/// A registry record describing one type through a set of function
/// pointers. Records are collected with `inventory` and indexed by type
/// hash and by `TypeId`; `register_type!` builds one from a type's
/// `Named` / `NamedDumpable` implementations.
#[doc(hidden)]
#[derive(Debug)]
pub struct TypeInfo {
    /// Returns the type's name.
    pub typename: fn() -> &'static str,
    /// Returns the type's 64-bit hash.
    pub typehash: fn() -> u64,
    /// Returns the type's Rust `TypeId`.
    pub typeid: fn() -> TypeId,
    /// Returns the type's port value.
    pub port: fn() -> u64,
    /// Converts a serialized value of this type to JSON, when a dumper is
    /// available.
    pub dump: Option<fn(Serialized) -> Result<serde_json::Value, anyhow::Error>>,
    /// Returns the arm name of a value given a type-erased pointer to it.
    /// Unsafe: the pointer must refer to a value of the described type.
    pub arm_unchecked: unsafe fn(*const ()) -> Option<&'static str>,
}
218
219#[allow(dead_code)]
220impl TypeInfo {
221 pub(crate) fn get(typehash: u64) -> Option<&'static TypeInfo> {
223 TYPE_INFO.get(&typehash).map(|v| &**v)
224 }
225
226 pub(crate) fn get_by_typeid(typeid: TypeId) -> Option<&'static TypeInfo> {
228 TYPE_INFO_BY_TYPE_ID.get(&typeid).map(|v| &**v)
229 }
230
231 pub(crate) fn of<T: ?Sized + 'static>() -> Option<&'static TypeInfo> {
233 Self::get_by_typeid(TypeId::of::<T>())
234 }
235
236 pub(crate) fn typename(&self) -> &'static str {
237 (self.typename)()
238 }
239 pub(crate) fn typehash(&self) -> u64 {
240 (self.typehash)()
241 }
242 pub(crate) fn typeid(&self) -> TypeId {
243 (self.typeid)()
244 }
245 pub(crate) fn port(&self) -> u64 {
246 (self.port)()
247 }
248 pub(crate) fn dump(&self, data: Serialized) -> Result<serde_json::Value, anyhow::Error> {
249 if let Some(dump) = self.dump {
250 (dump)(data)
251 } else {
252 anyhow::bail!("binary does not have dumper for {}", self.typehash())
253 }
254 }
255 pub(crate) unsafe fn arm_unchecked(&self, value: *const ()) -> Option<&'static str> {
256 unsafe { (self.arm_unchecked)(value) }
258 }
259}
260
261inventory::collect!(TypeInfo);
262
263static TYPE_INFO: LazyLock<HashMap<u64, &'static TypeInfo>> = LazyLock::new(|| {
265 inventory::iter::<TypeInfo>()
266 .map(|entry| (entry.typehash(), entry))
267 .collect()
268});
269
270static TYPE_INFO_BY_TYPE_ID: LazyLock<HashMap<std::any::TypeId, &'static TypeInfo>> =
272 LazyLock::new(|| {
273 TYPE_INFO
274 .values()
275 .map(|info| (info.typeid(), &**info))
276 .collect()
277 });
278
/// Registers `$type` in the type registry by submitting a `TypeInfo`
/// record built from its `Named` and `NamedDumpable` implementations.
///
/// Paths are spelled with `$crate` so the expansion resolves to this
/// crate regardless of the name under which the caller imported it.
#[macro_export]
macro_rules! register_type {
    ($type:ty) => {
        $crate::submit! {
            $crate::data::TypeInfo {
                typename: <$type as $crate::data::Named>::typename,
                typehash: <$type as $crate::data::Named>::typehash,
                typeid: <$type as $crate::data::Named>::typeid,
                port: <$type as $crate::data::Named>::port,
                dump: Some(<$type as $crate::data::NamedDumpable>::dump),
                arm_unchecked: <$type as $crate::data::Named>::arm_unchecked,
            }
        }
    };
}
299
/// The payload bytes held by `Serialized`, tagged with the encoding that
/// produced them.
#[derive(Clone, Serialize, Deserialize, PartialEq, EnumAsInner)]
enum Encoded {
    /// Bytes produced by `bincode`.
    Bincode(bytes::Bytes),
    /// Bytes holding JSON text.
    Json(bytes::Bytes),
}
307
308impl Encoded {
309 pub fn len(&self) -> usize {
311 match &self {
312 Encoded::Bincode(data) => data.len(),
313 Encoded::Json(data) => data.len(),
314 }
315 }
316
317 pub fn is_empty(&self) -> bool {
319 match &self {
320 Encoded::Bincode(data) => data.is_empty(),
321 Encoded::Json(data) => data.is_empty(),
322 }
323 }
324
325 pub fn crc(&self) -> u32 {
327 match &self {
328 Encoded::Bincode(data) => crc32fast::hash(data),
329 Encoded::Json(data) => crc32fast::hash(data),
330 }
331 }
332}
333
334impl std::fmt::Debug for Encoded {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 match self {
337 Encoded::Bincode(data) => write!(f, "Encoded::Bincode({})", HexFmt(data)),
338 Encoded::Json(data) => write!(f, "Encoded::Json({})", HexFmt(data)),
339 }
340 }
341}
342
/// An encoded value together with an optional hash identifying its type.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Serialized {
    /// The encoded payload.
    encoded: Encoded,
    /// The `Named::typehash` of the encoded value's type; `None` when the
    /// value was created with `serialize_anon`.
    typehash: Option<u64>,
}
357
358impl std::fmt::Display for Serialized {
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 match self.dump() {
361 Ok(value) => {
362 let typename = self.typename().unwrap();
364 let basename = typename.split("::").last().unwrap_or(typename);
366 write!(f, "{}{}", basename, JsonFmt(&value))
367 }
368 Err(_) => write!(f, "{:?}", self.encoded),
369 }
370 }
371}
372
373impl Serialized {
374 pub fn serialize<T: Serialize + Named>(value: &T) -> Result<Self, bincode::Error> {
376 Ok(Self {
377 encoded: Encoded::Bincode(bincode::serialize(value)?.into()),
378 typehash: Some(T::typehash()),
379 })
380 }
381
382 pub fn serialize_anon<T: Serialize>(value: &T) -> Result<Self, bincode::Error> {
384 Ok(Self {
385 encoded: Encoded::Bincode(bincode::serialize(value)?.into()),
386 typehash: None,
387 })
388 }
389
390 pub fn deserialized<T: DeserializeOwned>(&self) -> Result<T, anyhow::Error> {
392 match &self.encoded {
393 Encoded::Bincode(data) => bincode::deserialize(data).map_err(anyhow::Error::from),
394 Encoded::Json(data) => serde_json::from_slice(data).map_err(anyhow::Error::from),
395 }
396 }
397
398 pub fn transcode_to_json(self) -> Result<Self, Self> {
401 match self.encoded {
402 Encoded::Bincode(_) => {
403 let json_value = match self.dump() {
404 Ok(json_value) => json_value,
405 Err(_) => return Err(self),
406 };
407 let json_data = match serde_json::to_vec(&json_value) {
408 Ok(json_data) => json_data,
409 Err(_) => return Err(self),
410 };
411 Ok(Self {
412 encoded: Encoded::Json(json_data.into()),
413 typehash: self.typehash,
414 })
415 }
416 Encoded::Json(_) => Ok(self),
417 }
418 }
419
420 pub fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
423 match &self.encoded {
424 Encoded::Bincode(_) => {
425 let Some(typehash) = self.typehash() else {
426 anyhow::bail!("serialized value does not contain a typehash");
427 };
428 let Some(typeinfo) = TYPE_INFO.get(&typehash) else {
429 anyhow::bail!("binary does not have typeinfo for {}", typehash);
430 };
431 typeinfo.dump(self.clone())
432 }
433 Encoded::Json(data) => serde_json::from_slice(data).map_err(anyhow::Error::from),
434 }
435 }
436
437 pub fn typehash(&self) -> Option<u64> {
439 self.typehash
440 }
441
442 pub fn typename(&self) -> Option<&'static str> {
444 self.typehash
445 .and_then(|typehash| TYPE_INFO.get(&typehash).map(|typeinfo| typeinfo.typename()))
446 }
447
448 pub fn prefix<T: DeserializeOwned>(&self) -> Result<T, anyhow::Error> {
453 match &self.encoded {
454 Encoded::Bincode(data) => bincode::deserialize(data).map_err(anyhow::Error::from),
455 _ => anyhow::bail!("only bincode supports prefix emplacement"),
456 }
457 }
458
459 pub fn emplace_prefix<T: Serialize + DeserializeOwned>(
462 &mut self,
463 prefix: T,
464 ) -> Result<(), anyhow::Error> {
465 let data = match &self.encoded {
466 Encoded::Bincode(data) => data,
467 _ => anyhow::bail!("only bincode supports prefix emplacement"),
468 };
469
470 let mut cursor = Cursor::new(data.clone());
475 let _prefix: T = bincode::deserialize_from(&mut cursor).unwrap();
476 let position = cursor.position() as usize;
477 let suffix = &cursor.into_inner()[position..];
478 let mut data = bincode::serialize(&prefix)?;
479 data.extend_from_slice(suffix);
480 self.encoded = Encoded::Bincode(data.into());
481
482 Ok(())
483 }
484
485 pub fn len(&self) -> usize {
487 self.encoded.len()
488 }
489
490 pub fn is_empty(&self) -> bool {
492 self.encoded.is_empty()
493 }
494
495 pub fn crc(&self) -> u32 {
497 self.encoded.crc()
498 }
499}
500
501const MAX_BYTE_PREVIEW_LENGTH: usize = 8;
502
503fn display_bytes_as_hash(f: &mut impl std::fmt::Write, bytes: &[u8]) -> std::fmt::Result {
504 let hash = crc32fast::hash(bytes);
505 write!(f, "CRC:{:x}", hash)?;
506 for &byte in bytes.iter().take(MAX_BYTE_PREVIEW_LENGTH) {
508 write!(f, " {:x}", byte)?;
509 }
510 if bytes.len() > MAX_BYTE_PREVIEW_LENGTH {
511 write!(f, " [...{} bytes]", bytes.len() - MAX_BYTE_PREVIEW_LENGTH)?;
512 }
513 Ok(())
514}
515
516pub struct HexFmt<'a>(pub &'a [u8]);
518
519impl<'a> std::fmt::Display for HexFmt<'a> {
520 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521 display_bytes_as_hash(f, self.0)
523 }
524}
525
526pub struct JsonFmt<'a>(pub &'a serde_json::Value);
529
530const MAX_JSON_VALUE_DISPLAY_LENGTH: usize = 8;
531
532impl<'a> std::fmt::Display for JsonFmt<'a> {
533 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534 fn truncate_and_hash(value_str: &str) -> String {
537 let truncated_str = &value_str[..MAX_JSON_VALUE_DISPLAY_LENGTH];
538 let mut result = truncated_str.to_string();
539 result.push_str(&format!("[...{} chars] ", value_str.len()));
540 display_bytes_as_hash(&mut result, value_str.as_bytes()).unwrap();
541 result
542 }
543
544 fn truncate_json_values(value: &serde_json::Value) -> serde_json::Value {
546 match value {
547 serde_json::Value::String(s) => {
548 if s.len() > MAX_JSON_VALUE_DISPLAY_LENGTH {
549 serde_json::Value::String(truncate_and_hash(s))
550 } else {
551 value.clone()
552 }
553 }
554 serde_json::Value::Array(arr) => {
555 let array_str = serde_json::to_string(arr).unwrap();
556 if array_str.len() > MAX_JSON_VALUE_DISPLAY_LENGTH {
557 serde_json::Value::String(truncate_and_hash(&array_str))
558 } else {
559 value.clone()
560 }
561 }
562 serde_json::Value::Object(obj) => {
563 let truncated_obj: serde_json::Map<_, _> = obj
564 .iter()
565 .map(|(k, v)| (k.clone(), truncate_json_values(v)))
566 .collect();
567 serde_json::Value::Object(truncated_obj)
568 }
569 _ => value.clone(),
570 }
571 }
572
573 let truncated = truncate_json_values(self.0);
574 write!(f, "{}", truncated)
575 }
576}
577
#[cfg(test)]
mod tests {

    use serde::Deserialize;
    use serde::Serialize;

    use super::*;
    // `register_type!` expands to `hyperactor::...` paths, so the crate
    // must be reachable under that name from within its own tests.
    use crate as hyperactor;
    use crate::Named;

    #[derive(Named)]
    struct TestStruct;

    // Names of primitives, containers, tuples, and derived types.
    #[test]
    fn test_names() {
        assert_eq!(String::typename(), "String");
        assert_eq!(Option::<String>::typename(), "Option<String>");
        assert_eq!(Vec::<String>::typename(), "Vec<String>");
        assert_eq!(Vec::<Vec::<String>>::typename(), "Vec<Vec<String>>");
        assert_eq!(
            Vec::<Vec::<Vec::<String>>>::typename(),
            "Vec<Vec<Vec<String>>>"
        );
        assert_eq!(
            <(u64, String, Option::<isize>)>::typename(),
            "(u64, String, Option<isize>)"
        );
        assert_eq!(
            TestStruct::typename(),
            "hyperactor::data::tests::TestStruct"
        );
        assert_eq!(
            Vec::<TestStruct>::typename(),
            "Vec<hyperactor::data::tests::TestStruct>"
        );
    }

    // Pins the hash and port of `String`, and checks that differently
    // nested container types hash differently.
    #[test]
    fn test_ports() {
        assert_eq!(String::typehash(), 3947244799002047352u64);
        assert_eq!(String::port(), 13170616835856823160u64);
        assert_ne!(
            Vec::<Vec::<Vec::<String>>>::typehash(),
            Vec::<Vec::<Vec::<Vec::<String>>>>::typehash(),
        );
    }

    #[derive(Named, Serialize, Deserialize, PartialEq, Eq, Debug)]
    struct TestDumpStruct {
        a: String,
        b: u64,
        c: Option<i32>,
    }
    // Registration makes the type resolvable by hash for `dump`/`typename`.
    crate::register_type!(TestDumpStruct);

    // A registered type can be transcoded to JSON, dumped, and displayed
    // identically from both the bincode and the JSON encoding.
    #[test]
    fn test_dump_struct() {
        let data = TestDumpStruct {
            a: "hello".to_string(),
            b: 1234,
            c: Some(5678),
        };
        let serialized = Serialized::serialize(&data).unwrap();
        let serialized_json = serialized.clone().transcode_to_json().unwrap();

        assert!(serialized.encoded.is_bincode());
        assert!(serialized_json.encoded.is_json());

        let json_string =
            String::from_utf8(serialized_json.encoded.as_json().unwrap().to_vec().clone()).unwrap();
        assert_eq!(json_string, "{\"a\":\"hello\",\"b\":1234,\"c\":5678}");

        for serialized in [serialized, serialized_json] {
            // The type name is recoverable from either encoding.
            assert_eq!(
                serialized.typename(),
                Some("hyperactor::data::tests::TestDumpStruct")
            );

            let json = serialized.dump().unwrap();
            assert_eq!(
                json,
                serde_json::json!({
                    "a": "hello",
                    "b": 1234,
                    "c": 5678,
                })
            );

            assert_eq!(
                format!("{}", serialized),
                "TestDumpStruct{\"a\":\"hello\",\"b\":1234,\"c\":5678}",
            );
        }
    }

    // The leading field of a bincode payload can be read and replaced
    // without disturbing the fields that follow it.
    #[test]
    fn test_emplace_prefix() {
        let data = TestDumpStruct {
            a: "hello".to_string(),
            b: 1234,
            c: Some(5678),
        };

        let mut ser = Serialized::serialize(&data).unwrap();
        assert_eq!(ser.prefix::<String>().unwrap(), "hello".to_string());

        ser.emplace_prefix("hello, world, 123!".to_string())
            .unwrap();

        assert_eq!(
            ser.deserialized::<TestDumpStruct>().unwrap(),
            TestDumpStruct {
                a: "hello, world, 123!".to_string(),
                b: 1234,
                c: Some(5678),
            }
        );
    }

    // For a derived enum, `arm` reports the variant name for tuple, unit,
    // empty-tuple, and struct variants alike.
    #[test]
    fn test_arms() {
        #[derive(Named)]
        enum TestArm {
            #[allow(dead_code)]
            A(u32),
            B,
            C(),
            D {
                #[allow(dead_code)]
                a: u32,
                #[allow(dead_code)]
                b: String,
            },
        }

        assert_eq!(TestArm::A(1234).arm(), Some("A"));
        assert_eq!(TestArm::B.arm(), Some("B"));
        assert_eq!(TestArm::C().arm(), Some("C"));
        assert_eq!(
            TestArm::D {
                a: 1234,
                b: "hello".to_string()
            }
            .arm(),
            Some("D")
        );
    }

    // `HexFmt` output: CRC, up to eight leading bytes, omitted-byte count.
    #[test]
    fn display_hex() {
        assert_eq!(
            format!("{}", HexFmt("hello world".as_bytes())),
            "CRC:d4a1185 68 65 6c 6c 6f 20 77 6f [...3 bytes]"
        );
        assert_eq!(format!("{}", HexFmt("".as_bytes())), "CRC:0");
        assert_eq!(
            format!("{}", HexFmt("a very long string that is long".as_bytes())),
            "CRC:c7e24f62 61 20 76 65 72 79 20 6c [...23 bytes]"
        );
    }

    #[test]
    fn test_json_fmt() {
        let json_value = serde_json::json!({
            "name": "test",
            "number": 42,
            "nested": {
                "key": "value"
            }
        });
        // Short values are printed unchanged.
        assert_eq!(
            format!("{}", JsonFmt(&json_value)),
            "{\"name\":\"test\",\"nested\":{\"key\":\"value\"},\"number\":42}",
        );

        let empty_json = serde_json::json!({});
        assert_eq!(format!("{}", JsonFmt(&empty_json)), "{}");

        let simple_array = serde_json::json!([1, 2, 3]);
        assert_eq!(format!("{}", JsonFmt(&simple_array)), "[1,2,3]");

        // A long string is truncated and summarized.
        let long_string_json = serde_json::json!({
            "long_string": "a".repeat(MAX_JSON_VALUE_DISPLAY_LENGTH * 5)
        });
        assert_eq!(
            format!("{}", JsonFmt(&long_string_json)),
            "{\"long_string\":\"aaaaaaaa[...40 chars] CRC:c95b8a25 61 61 61 61 61 61 61 61 [...32 bytes]\"}"
        );

        // A long array is replaced by a summary string of its JSON text.
        let long_array_json =
            serde_json::json!((1..=(MAX_JSON_VALUE_DISPLAY_LENGTH + 4)).collect::<Vec<_>>());
        assert_eq!(
            format!("{}", JsonFmt(&long_array_json)),
            "\"[1,2,3,4[...28 chars] CRC:e5c881af 5b 31 2c 32 2c 33 2c 34 [...20 bytes]\""
        );

        // Truncation is applied recursively inside nested objects.
        let nested_json = serde_json::json!({
            "simple_number": 42,
            "simple_bool": true,
            "outer": {
                "long_string": "a".repeat(MAX_JSON_VALUE_DISPLAY_LENGTH + 10),
                "long_array": (1..=(MAX_JSON_VALUE_DISPLAY_LENGTH + 4)).collect::<Vec<_>>(),
                "inner": {
                    "simple_value": "short",
                }
            }
        });
        println!("{}", JsonFmt(&nested_json));
        assert_eq!(
            format!("{}", JsonFmt(&nested_json)),
            "{\"outer\":{\"inner\":{\"simple_value\":\"short\"},\"long_array\":\"[1,2,3,4[...28 chars] CRC:e5c881af 5b 31 2c 32 2c 33 2c 34 [...20 bytes]\",\"long_string\":\"aaaaaaaa[...18 chars] CRC:b8ac0e31 61 61 61 61 61 61 61 61 [...10 bytes]\"},\"simple_bool\":true,\"simple_number\":42}",
        );
    }
}