1use hyperactor::Actor;
12use hyperactor::Context;
13use hyperactor::Named;
14use hyperactor::RemoteHandles;
15use hyperactor::RemoteMessage;
16use hyperactor::actor::Referable;
17use hyperactor::attrs::Attrs;
18use hyperactor::data::Serialized;
19use hyperactor::declare_attrs;
20use hyperactor::message::Castable;
21use hyperactor::message::ErasedUnbound;
22use hyperactor::message::IndexedErasedUnbound;
23use hyperactor::reference::ActorId;
24use ndslice::Extent;
25use ndslice::Point;
26use ndslice::Shape;
27use ndslice::Slice;
28use ndslice::selection::Selection;
29use ndslice::selection::routing::RoutingFrame;
30use serde::Deserialize;
31use serde::Serialize;
32
33use crate::reference::ActorMeshId;
34
35#[derive(Serialize, Deserialize, Debug, Clone)]
40pub struct Uslice {
41 pub slice: Slice,
43 pub selection: Selection,
45}
46
47#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
49pub struct CastMessageEnvelope {
50 actor_mesh_id: ActorMeshId,
52 sender: ActorId,
54 dest_port: DestinationPort,
57 data: ErasedUnbound,
59 shape: Shape,
61}
62
63impl CastMessageEnvelope {
64 pub fn new<A, M>(
66 actor_mesh_id: ActorMeshId,
67 sender: ActorId,
68 shape: Shape,
69 message: M,
70 ) -> Result<Self, anyhow::Error>
71 where
72 A: Referable + RemoteHandles<IndexedErasedUnbound<M>>,
73 M: Castable + RemoteMessage,
74 {
75 let data = ErasedUnbound::try_from_message(message)?;
76 let actor_name = match &actor_mesh_id {
77 ActorMeshId::V0(_, actor_name) => actor_name.clone(),
78 ActorMeshId::V1(name) => name.to_string(),
79 };
80 Ok(Self {
81 actor_mesh_id,
82 sender,
83 dest_port: DestinationPort::new::<A, M>(actor_name),
84 data,
85 shape,
86 })
87 }
88
89 pub fn from_serialized(
93 actor_mesh_id: ActorMeshId,
94 sender: ActorId,
95 dest_port: DestinationPort,
96 shape: Shape,
97 data: Serialized,
98 ) -> Self {
99 Self {
100 actor_mesh_id,
101 sender,
102 dest_port,
103 data: ErasedUnbound::new(data),
104 shape,
105 }
106 }
107
108 pub(crate) fn sender(&self) -> &ActorId {
109 &self.sender
110 }
111
112 pub(crate) fn dest_port(&self) -> &DestinationPort {
113 &self.dest_port
114 }
115
116 pub(crate) fn data(&self) -> &ErasedUnbound {
117 &self.data
118 }
119
120 pub(crate) fn data_mut(&mut self) -> &mut ErasedUnbound {
121 &mut self.data
122 }
123
124 pub(crate) fn shape(&self) -> &Shape {
125 &self.shape
126 }
127
128 pub(crate) fn relative_rank(&self, rank_on_root_mesh: usize) -> anyhow::Result<usize> {
131 let shape = self.shape();
132 let coords = shape.slice().coordinates(rank_on_root_mesh).map_err(|e| {
133 anyhow::anyhow!(
134 "fail to calculate coords for root rank {} due to error: {}; shape is {:?}",
135 rank_on_root_mesh,
136 e,
137 shape,
138 )
139 })?;
140 let extent =
141 Extent::new(shape.labels().to_vec(), shape.slice().sizes().to_vec()).map_err(|e| {
142 anyhow::anyhow!(
143 "fail to calculate extent for root rank {} due to error: {}; shape is {}",
144 rank_on_root_mesh,
145 e,
146 shape,
147 )
148 })?;
149 let point = extent.point(coords).map_err(|e| {
150 anyhow::anyhow!(
151 "fail to calculate point for root rank {} due to error: {}; extent is {}, shape is {}",
152 rank_on_root_mesh,
153 e,
154 extent,
155 shape,
156 )
157 })?;
158 Ok(point.rank())
159 }
160
161 pub(crate) fn stream_key(&self) -> (ActorMeshId, ActorId) {
165 (self.actor_mesh_id.clone(), self.sender.clone())
166 }
167}
168
169#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
175pub struct DestinationPort {
176 actor_name: String,
178 port: u64,
181}
182
183impl DestinationPort {
184 pub fn new<A, M>(actor_name: String) -> Self
186 where
187 A: Referable + RemoteHandles<IndexedErasedUnbound<M>>,
188 M: Castable + RemoteMessage,
189 {
190 Self {
191 actor_name,
192 port: IndexedErasedUnbound::<M>::port(),
193 }
194 }
195
196 pub fn port(&self) -> u64 {
198 self.port
199 }
200
201 pub fn actor_name(&self) -> &str {
203 &self.actor_name
204 }
205}
206
207#[derive(Serialize, Deserialize, Debug, Clone, Named)]
209pub struct CastMessage {
210 pub dest: Uslice,
212 pub message: CastMessageEnvelope,
214}
215
216#[derive(Serialize, Deserialize, Debug, Clone, Named)]
220pub(crate) struct ForwardMessage {
221 pub(crate) sender: ActorId,
223 pub(crate) dests: Vec<RoutingFrame>,
225 pub(crate) seq: usize,
227 pub(crate) last_seq: usize,
229 pub(crate) message: CastMessageEnvelope,
231}
232
233declare_attrs! {
234 pub attr CAST_ORIGINATING_SENDER: ActorId;
236
237 pub attr CAST_POINT: Point;
239}
240
241pub fn set_cast_info_on_headers(headers: &mut Attrs, cast_point: Point, sender: ActorId) {
242 headers.set(CAST_POINT, cast_point);
243 headers.set(CAST_ORIGINATING_SENDER, sender);
244}
245
246pub trait CastInfo {
247 fn cast_point(&self) -> Point;
252 fn sender(&self) -> &ActorId;
253}
254
255impl<A: Actor> CastInfo for Context<'_, A> {
256 fn cast_point(&self) -> Point {
257 match self.headers().get(CAST_POINT) {
258 Some(point) => point.clone(),
259 None => Extent::unity().point_of_rank(0).unwrap(),
260 }
261 }
262
263 fn sender(&self) -> &ActorId {
264 self.headers()
265 .get(CAST_ORIGINATING_SENDER)
266 .expect("has sender header")
267 }
268}