1use hyperactor::Actor;
12use hyperactor::Context;
13use hyperactor::Named;
14use hyperactor::RemoteHandles;
15use hyperactor::RemoteMessage;
16use hyperactor::actor::RemoteActor;
17use hyperactor::attrs::Attrs;
18use hyperactor::data::Serialized;
19use hyperactor::declare_attrs;
20use hyperactor::message::Castable;
21use hyperactor::message::ErasedUnbound;
22use hyperactor::message::IndexedErasedUnbound;
23use hyperactor::reference::ActorId;
24use ndslice::Extent;
25use ndslice::Shape;
26use ndslice::Slice;
27use ndslice::selection::Selection;
28use ndslice::selection::routing::RoutingFrame;
29use serde::Deserialize;
30use serde::Serialize;
31
32use crate::reference::ActorMeshId;
33
34#[derive(Serialize, Deserialize, Debug, Clone)]
39pub struct Uslice {
40 pub slice: Slice,
42 pub selection: Selection,
44}
45
46#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
48pub struct CastMessageEnvelope {
49 actor_mesh_id: ActorMeshId,
51 sender: ActorId,
53 dest_port: DestinationPort,
56 data: ErasedUnbound,
58 shape: Shape,
60}
61
62impl CastMessageEnvelope {
63 pub fn new<A, M>(
65 actor_mesh_id: ActorMeshId,
66 sender: ActorId,
67 shape: Shape,
68 message: M,
69 ) -> Result<Self, anyhow::Error>
70 where
71 A: RemoteActor + RemoteHandles<IndexedErasedUnbound<M>>,
72 M: Castable + RemoteMessage,
73 {
74 let data = ErasedUnbound::try_from_message(message)?;
75 let actor_name = actor_mesh_id.1.to_string();
76 Ok(Self {
77 actor_mesh_id,
78 sender,
79 dest_port: DestinationPort::new::<A, M>(actor_name),
80 data,
81 shape,
82 })
83 }
84
85 pub fn from_serialized(
89 actor_mesh_id: ActorMeshId,
90 sender: ActorId,
91 dest_port: DestinationPort,
92 shape: Shape,
93 data: Serialized,
94 ) -> Self {
95 Self {
96 actor_mesh_id,
97 sender,
98 dest_port,
99 data: ErasedUnbound::new(data),
100 shape,
101 }
102 }
103
104 pub(crate) fn sender(&self) -> &ActorId {
105 &self.sender
106 }
107
108 pub(crate) fn dest_port(&self) -> &DestinationPort {
109 &self.dest_port
110 }
111
112 pub(crate) fn data(&self) -> &ErasedUnbound {
113 &self.data
114 }
115
116 pub(crate) fn data_mut(&mut self) -> &mut ErasedUnbound {
117 &mut self.data
118 }
119
120 pub(crate) fn shape(&self) -> &Shape {
121 &self.shape
122 }
123
124 pub(crate) fn relative_rank(&self, rank_on_root_mesh: usize) -> anyhow::Result<usize> {
127 let shape = self.shape();
128 let coords = shape.slice().coordinates(rank_on_root_mesh).map_err(|e| {
129 anyhow::anyhow!(
130 "fail to calculate coords for root rank {} due to error: {}; shape is {:?}",
131 rank_on_root_mesh,
132 e,
133 shape,
134 )
135 })?;
136 let extent =
137 Extent::new(shape.labels().to_vec(), shape.slice().sizes().to_vec()).map_err(|e| {
138 anyhow::anyhow!(
139 "fail to calculate extent for root rank {} due to error: {}; shape is {}",
140 rank_on_root_mesh,
141 e,
142 shape,
143 )
144 })?;
145 let point = extent.point(coords).map_err(|e| {
146 anyhow::anyhow!(
147 "fail to calculate point for root rank {} due to error: {}; extent is {}, shape is {}",
148 rank_on_root_mesh,
149 e,
150 extent,
151 shape,
152 )
153 })?;
154 Ok(point.rank())
155 }
156
157 pub(crate) fn stream_key(&self) -> (ActorMeshId, ActorId) {
161 (self.actor_mesh_id.clone(), self.sender.clone())
162 }
163}
164
165#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
171pub struct DestinationPort {
172 actor_name: String,
174 port: u64,
177}
178
179impl DestinationPort {
180 pub fn new<A, M>(actor_name: String) -> Self
182 where
183 A: RemoteActor + RemoteHandles<IndexedErasedUnbound<M>>,
184 M: Castable + RemoteMessage,
185 {
186 Self {
187 actor_name,
188 port: IndexedErasedUnbound::<M>::port(),
189 }
190 }
191
192 pub fn port(&self) -> u64 {
194 self.port
195 }
196
197 pub fn actor_name(&self) -> &str {
199 &self.actor_name
200 }
201}
202
203#[derive(Serialize, Deserialize, Debug, Clone, Named)]
205pub struct CastMessage {
206 pub dest: Uslice,
208 pub message: CastMessageEnvelope,
210}
211
212#[derive(Serialize, Deserialize, Debug, Clone, Named)]
216pub(crate) struct ForwardMessage {
217 pub(crate) sender: ActorId,
219 pub(crate) dests: Vec<RoutingFrame>,
221 pub(crate) seq: usize,
223 pub(crate) last_seq: usize,
225 pub(crate) message: CastMessageEnvelope,
227}
228
229declare_attrs! {
230 attr CAST_RANK: usize;
233 attr CAST_SHAPE: Shape;
236 pub attr CAST_ORIGINATING_SENDER: ActorId;
238}
239
240pub fn set_cast_info_on_headers(
241 headers: &mut Attrs,
242 cast_rank: usize,
243 cast_shape: Shape,
244 sender: ActorId,
245) {
246 headers.set(CAST_RANK, cast_rank);
247 headers.set(CAST_SHAPE, cast_shape);
248 headers.set(CAST_ORIGINATING_SENDER, sender);
249}
250
251pub trait CastInfo {
252 fn cast_info(&self) -> (usize, Shape);
257 fn sender(&self) -> &ActorId;
258}
259
260impl<A: Actor> CastInfo for Context<'_, A> {
261 fn cast_info(&self) -> (usize, Shape) {
262 let headers = self.headers();
263 match (headers.get(CAST_RANK), headers.get(CAST_SHAPE)) {
264 (Some(rank), Some(shape)) => (*rank, shape.clone()),
265 (None, None) => (0, Shape::unity()),
266 _ => panic!("Expected either both rank and shape or neither"),
267 }
268 }
269 fn sender(&self) -> &ActorId {
270 self.headers()
271 .get(CAST_ORIGINATING_SENDER)
272 .expect("has sender header")
273 }
274}