1#![allow(dead_code)] use std::collections::BTreeSet;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use async_trait::async_trait;
16use hyperactor::Actor;
17use hyperactor::ActorRef;
18use hyperactor::Bind;
19use hyperactor::GangId;
20use hyperactor::GangRef;
21use hyperactor::Message;
22use hyperactor::Named;
23use hyperactor::PortHandle;
24use hyperactor::RemoteHandles;
25use hyperactor::RemoteMessage;
26use hyperactor::Unbind;
27use hyperactor::WorldId;
28use hyperactor::actor::Referable;
29use hyperactor::attrs::Attrs;
30use hyperactor::attrs::declare_attrs;
31use hyperactor::config;
32use hyperactor::context;
33use hyperactor::mailbox::MailboxSenderError;
34use hyperactor::mailbox::PortReceiver;
35use hyperactor::message::Castable;
36use hyperactor::message::IndexedErasedUnbound;
37use hyperactor::supervision::ActorSupervisionEvent;
38use ndslice::Range;
39use ndslice::Selection;
40use ndslice::Shape;
41use ndslice::ShapeError;
42use ndslice::SliceError;
43use ndslice::View;
44use ndslice::reshape::Limit;
45use ndslice::reshape::ReshapeError;
46use ndslice::reshape::ReshapeSliceExt;
47use ndslice::reshape::reshape_selection;
48use ndslice::selection;
49use ndslice::selection::EvalOpts;
50use ndslice::selection::ReifySlice;
51use ndslice::selection::normal;
52use ndslice::view::ViewExt;
53use serde::Deserialize;
54use serde::Serialize;
55use serde_multipart::Part;
56use tokio::sync::mpsc;
57
58use crate::CommActor;
59use crate::Mesh;
60use crate::comm::multicast::CastMessage;
61use crate::comm::multicast::CastMessageEnvelope;
62use crate::comm::multicast::Uslice;
63use crate::config::MAX_CAST_DIMENSION_SIZE;
64use crate::metrics;
65use crate::proc_mesh::ProcMesh;
66use crate::reference::ActorMeshId;
67use crate::reference::ActorMeshRef;
68use crate::v1;
69
// Declares the `CAST_ACTOR_MESH_ID` header attribute, whose value is an
// `ActorMeshId`. `actor_mesh_cast` sets it on the headers of the cast
// message it sends to the comm actor.
declare_attrs! {
    pub attr CAST_ACTOR_MESH_ID: ActorMeshId;
}
76
77#[allow(clippy::result_large_err)] pub(crate) fn actor_mesh_cast<A, M>(
81 cx: &impl context::Actor,
82 actor_mesh_id: ActorMeshId,
83 comm_actor_ref: &ActorRef<CommActor>,
84 selection_of_root: Selection,
85 root_mesh_shape: &Shape,
86 cast_mesh_shape: &Shape,
87 message: M,
88) -> Result<(), CastError>
89where
90 A: Referable + RemoteHandles<IndexedErasedUnbound<M>>,
91 M: Castable + RemoteMessage,
92{
93 let _ = metrics::ACTOR_MESH_CAST_DURATION.start(hyperactor::kv_pairs!(
94 "message_type" => M::typename(),
95 "message_variant" => message.arm().unwrap_or_default(),
96 ));
97
98 let message = CastMessageEnvelope::new::<A, M>(
99 actor_mesh_id.clone(),
100 cx.mailbox().actor_id().clone(),
101 cast_mesh_shape.clone(),
102 message,
103 )?;
104
105 let slice_of_root = root_mesh_shape.slice();
123
124 let max_cast_dimension_size = config::global::get(MAX_CAST_DIMENSION_SIZE);
125
126 let slice_of_cast = slice_of_root.reshape_with_limit(Limit::from(max_cast_dimension_size));
127
128 let selection_of_cast =
129 reshape_selection(selection_of_root, root_mesh_shape.slice(), &slice_of_cast)?;
130
131 let cast_message = CastMessage {
132 dest: Uslice {
133 slice: slice_of_cast,
134 selection: selection_of_cast,
135 },
136 message,
137 };
138
139 let mut headers = Attrs::new();
140 headers.set(CAST_ACTOR_MESH_ID, actor_mesh_id);
141
142 comm_actor_ref
143 .port()
144 .send_with_headers(cx, headers, cast_message)?;
145
146 Ok(())
147}
148
149#[allow(clippy::result_large_err)] pub(crate) fn cast_to_sliced_mesh<A, M>(
151 cx: &impl context::Actor,
152 actor_mesh_id: ActorMeshId,
153 comm_actor_ref: &ActorRef<CommActor>,
154 sel_of_sliced: &Selection,
155 message: M,
156 sliced_shape: &Shape,
157 root_mesh_shape: &Shape,
158) -> Result<(), CastError>
159where
160 A: Referable + RemoteHandles<IndexedErasedUnbound<M>>,
161 M: Castable + RemoteMessage,
162{
163 let root_slice = root_mesh_shape.slice();
164
165 let sel_of_root = if selection::normalize(sel_of_sliced) == normal::NormalizedSelection::True {
167 root_slice.reify_slice(sliced_shape.slice())?
169 } else {
170 let ranks = sel_of_sliced
172 .eval(&EvalOpts::strict(), sliced_shape.slice())?
173 .collect::<BTreeSet<_>>();
174 Selection::of_ranks(root_slice, &ranks)?
175 };
176
177 actor_mesh_cast::<A, M>(
179 cx,
180 actor_mesh_id,
181 comm_actor_ref,
182 sel_of_root,
183 root_mesh_shape,
184 sliced_shape,
185 message,
186 )
187}
188
189#[async_trait]
191pub trait ActorMesh: Mesh<Id = ActorMeshId> {
192 type Actor: Referable;
194
195 #[allow(clippy::result_large_err)] fn cast<M>(
199 &self,
200 cx: &impl context::Actor,
201 selection: Selection,
202 message: M,
203 ) -> Result<(), CastError>
204 where
205 Self::Actor: RemoteHandles<M> + RemoteHandles<IndexedErasedUnbound<M>>,
206 M: Castable + RemoteMessage + Clone,
207 {
208 if let Some(v1) = self.v1() {
209 return v1
210 .cast_for_tensor_engine_only_do_not_use(cx, selection, message)
211 .map_err(anyhow::Error::from)
212 .map_err(CastError::from);
213 }
214 actor_mesh_cast::<Self::Actor, M>(
215 cx, self.id(), self.proc_mesh().comm_actor(), selection, self.shape(), self.shape(), message, )
223 }
224
225 fn proc_mesh(&self) -> &ProcMesh;
227
228 fn name(&self) -> &str;
230
231 fn world_id(&self) -> &WorldId {
232 self.proc_mesh().world_id()
233 }
234
235 fn iter_actor_refs(&self) -> Box<dyn Iterator<Item = ActorRef<Self::Actor>>> {
237 if let Some(v1) = self.v1() {
238 return Box::new(
241 v1.iter()
242 .map(|(_point, actor_ref)| actor_ref.clone())
243 .collect::<Vec<_>>()
244 .into_iter(),
245 );
246 }
247 let gang: GangRef<Self::Actor> = GangRef::attest(GangId(
248 self.proc_mesh().world_id().clone(),
249 self.name().to_string(),
250 ));
251 Box::new(self.shape().slice().iter().map(move |rank| gang.rank(rank)))
252 }
253
254 async fn stop(&self, cx: &impl context::Actor) -> Result<(), anyhow::Error> {
255 self.proc_mesh().stop_actor_by_name(cx, self.name()).await
256 }
257
258 fn bind(&self) -> ActorMeshRef<Self::Actor> {
260 ActorMeshRef::attest(
261 self.id(),
262 self.shape().clone(),
263 self.proc_mesh().comm_actor().clone(),
264 )
265 }
266
267 fn v1(&self) -> Option<v1::ActorMeshRef<Self::Actor>>;
269}
270
/// A handle to a [`ProcMesh`] that is either owned through a boxed smart
/// pointer or borrowed for the lifetime `'a`. Both variants dereference to
/// the underlying `ProcMesh`.
enum ProcMeshRef<'a> {
    /// An owned, type-erased pointer that dereferences to a `ProcMesh`.
    Shared(Box<dyn Deref<Target = ProcMesh> + Sync + Send>),
    /// A plain borrow of a `ProcMesh`.
    Borrowed(&'a ProcMesh),
}
281
282impl Deref for ProcMeshRef<'_> {
283 type Target = ProcMesh;
284
285 fn deref(&self) -> &Self::Target {
286 match self {
287 Self::Shared(p) => p,
288 Self::Borrowed(p) => p, }
290 }
291}
292
/// An actor mesh that is not a slice of another mesh. It wraps either a v0
/// mesh (proc mesh, name and per-rank actor references) or a v1 actor mesh
/// reference.
pub struct RootActorMesh<'a, A: Referable> {
    /// The underlying v0 or v1 representation.
    inner: ActorMeshKind<'a, A>,
    /// Lazily computed shape; filled on the first call to `shape()`.
    shape: OnceLock<Shape>,
    /// Lazily computed proc mesh; only filled for the v1 representation.
    proc_mesh: OnceLock<ProcMesh>,
    /// Lazily computed name; only filled for the v1 representation.
    name: OnceLock<String>,
}
305
/// The two representations a [`RootActorMesh`] can wrap.
enum ActorMeshKind<'a, A: Referable> {
    /// A v0 mesh described directly by its parts.
    V0 {
        /// The proc mesh the actors run on.
        proc_mesh: ProcMeshRef<'a>,
        /// The name of the actor mesh.
        name: String,
        /// One actor reference per rank; indexed by rank in `get`.
        ranks: Vec<ActorRef<A>>,
        /// Receiver of supervision events; taken (left as `None`) by the
        /// first call to `RootActorMesh::events`.
        actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
    },

    /// A mesh backed by a v1 actor mesh reference.
    V1(v1::ActorMeshRef<A>),
}
318
319impl<'a, A: Referable> From<v1::ActorMeshRef<A>> for RootActorMesh<'a, A> {
320 fn from(actor_mesh: v1::ActorMeshRef<A>) -> Self {
321 Self {
322 inner: ActorMeshKind::V1(actor_mesh),
323 shape: OnceLock::new(),
324 proc_mesh: OnceLock::new(),
325 name: OnceLock::new(),
326 }
327 }
328}
329
330impl<'a, A: Referable> From<v1::ActorMesh<A>> for RootActorMesh<'a, A> {
331 fn from(actor_mesh: v1::ActorMesh<A>) -> Self {
332 actor_mesh.detach().into()
333 }
334}
335
336impl<'a, A: Referable> RootActorMesh<'a, A> {
337 pub(crate) fn new(
338 proc_mesh: &'a ProcMesh,
339 name: String,
340 actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
341 ranks: Vec<ActorRef<A>>,
342 ) -> Self {
343 Self {
344 inner: ActorMeshKind::V0 {
345 proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
346 name,
347 ranks,
348 actor_supervision_rx: Some(actor_supervision_rx),
349 },
350 shape: OnceLock::new(),
351 proc_mesh: OnceLock::new(),
352 name: OnceLock::new(),
353 }
354 }
355
356 pub fn new_v1(actor_mesh: v1::ActorMeshRef<A>) -> Self {
357 Self {
358 inner: ActorMeshKind::V1(actor_mesh),
359 shape: OnceLock::new(),
360 proc_mesh: OnceLock::new(),
361 name: OnceLock::new(),
362 }
363 }
364
365 pub(crate) fn new_shared<D: Deref<Target = ProcMesh> + Send + Sync + 'static>(
366 proc_mesh: D,
367 name: String,
368 actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
369 ranks: Vec<ActorRef<A>>,
370 ) -> Self {
371 Self {
372 inner: ActorMeshKind::V0 {
373 proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
374 name,
375 ranks,
376 actor_supervision_rx: Some(actor_supervision_rx),
377 },
378 shape: OnceLock::new(),
379 proc_mesh: OnceLock::new(),
380 name: OnceLock::new(),
381 }
382 }
383
384 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
386 match &self.inner {
387 ActorMeshKind::V0 { proc_mesh, .. } => proc_mesh.client().open_port(),
388 ActorMeshKind::V1(_actor_mesh) => unimplemented!("unsupported operation"),
389 }
390 }
391
392 pub fn events(&mut self) -> Option<ActorSupervisionEvents> {
395 match &mut self.inner {
396 ActorMeshKind::V0 {
397 actor_supervision_rx,
398 ..
399 } => actor_supervision_rx
400 .take()
401 .map(|actor_supervision_rx| ActorSupervisionEvents {
402 actor_supervision_rx,
403 mesh_id: self.id(),
404 }),
405 ActorMeshKind::V1(_actor_mesh) => unimplemented!("unsupported operation"),
406 }
407 }
408
409 #[cfg(test)]
411 pub(crate) fn ranks(&self) -> &Vec<ActorRef<A>> {
412 match &self.inner {
413 ActorMeshKind::V0 { ranks, .. } => ranks,
414 ActorMeshKind::V1(_actor_mesh) => unimplemented!("unsupported operation"),
415 }
416 }
417}
418
/// The stream of supervision events of an actor mesh, obtained from
/// `RootActorMesh::events`.
pub struct ActorSupervisionEvents {
    /// Channel on which the supervision events arrive.
    actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    /// Id of the mesh the events belong to; used in the log message emitted
    /// when the stream closes.
    mesh_id: ActorMeshId,
}
426
427impl ActorSupervisionEvents {
428 pub async fn next(&mut self) -> Option<ActorSupervisionEvent> {
429 let result = self.actor_supervision_rx.recv().await;
430 if result.is_none() {
431 tracing::info!(
432 "supervision stream for actor mesh {:?} was closed!",
433 self.mesh_id
434 );
435 }
436 result
437 }
438}
439
440#[async_trait]
441impl<'a, A: Referable> Mesh for RootActorMesh<'a, A> {
442 type Node = ActorRef<A>;
443 type Id = ActorMeshId;
444 type Sliced<'b>
445 = SlicedActorMesh<'b, A>
446 where
447 'a: 'b;
448
449 fn shape(&self) -> &Shape {
450 self.shape.get_or_init(|| match &self.inner {
451 ActorMeshKind::V0 { proc_mesh, .. } => proc_mesh.shape().clone(),
452 ActorMeshKind::V1(actor_mesh) => actor_mesh.region().into(),
453 })
454 }
455
456 fn select<R: Into<Range>>(
457 &self,
458 label: &str,
459 range: R,
460 ) -> Result<Self::Sliced<'_>, ShapeError> {
461 Ok(SlicedActorMesh(self, self.shape().select(label, range)?))
462 }
463
464 fn get(&self, rank: usize) -> Option<ActorRef<A>> {
465 match &self.inner {
466 ActorMeshKind::V0 { ranks, .. } => ranks.get(rank).cloned(),
467 ActorMeshKind::V1(actor_mesh) => actor_mesh.get(rank),
468 }
469 }
470
471 fn id(&self) -> Self::Id {
472 match &self.inner {
473 ActorMeshKind::V0 {
474 proc_mesh, name, ..
475 } => ActorMeshId::V0(proc_mesh.id(), name.clone()),
476 ActorMeshKind::V1(actor_mesh) => ActorMeshId::V1(actor_mesh.name().clone()),
477 }
478 }
479}
480
481impl<A: Referable> ActorMesh for RootActorMesh<'_, A> {
482 type Actor = A;
483
484 fn proc_mesh(&self) -> &ProcMesh {
485 match &self.inner {
486 ActorMeshKind::V0 { proc_mesh, .. } => proc_mesh,
487 ActorMeshKind::V1(actor_mesh) => self
488 .proc_mesh
489 .get_or_init(|| actor_mesh.proc_mesh().clone().into()),
490 }
491 }
492
493 fn name(&self) -> &str {
494 match &self.inner {
495 ActorMeshKind::V0 { name, .. } => name,
496 ActorMeshKind::V1(actor_mesh) => {
497 self.name.get_or_init(|| actor_mesh.name().to_string())
498 }
499 }
500 }
501
502 fn v1(&self) -> Option<v1::ActorMeshRef<Self::Actor>> {
503 match &self.inner {
504 ActorMeshKind::V0 { .. } => None,
505 ActorMeshKind::V1(actor_mesh) => Some(actor_mesh.clone()),
506 }
507 }
508}
509
/// A view of a [`RootActorMesh`] restricted to a sub-shape: field `0` is the
/// root mesh, field `1` is the shape of the slice.
pub struct SlicedActorMesh<'a, A: Referable>(&'a RootActorMesh<'a, A>, Shape);
511
512impl<'a, A: Referable> SlicedActorMesh<'a, A> {
513 pub fn new(actor_mesh: &'a RootActorMesh<'a, A>, shape: Shape) -> Self {
514 Self(actor_mesh, shape)
515 }
516
517 pub fn shape(&self) -> &Shape {
518 &self.1
519 }
520}
521
522#[async_trait]
523impl<A: Referable> Mesh for SlicedActorMesh<'_, A> {
524 type Node = ActorRef<A>;
525 type Id = ActorMeshId;
526 type Sliced<'b>
527 = SlicedActorMesh<'b, A>
528 where
529 Self: 'b;
530
531 fn shape(&self) -> &Shape {
532 &self.1
533 }
534
535 fn select<R: Into<Range>>(
536 &self,
537 label: &str,
538 range: R,
539 ) -> Result<Self::Sliced<'_>, ShapeError> {
540 Ok(Self(self.0, self.1.select(label, range)?))
541 }
542
543 fn get(&self, _index: usize) -> Option<ActorRef<A>> {
544 unimplemented!()
545 }
546
547 fn id(&self) -> Self::Id {
548 self.0.id()
549 }
550}
551
552impl<A: Referable> ActorMesh for SlicedActorMesh<'_, A> {
553 type Actor = A;
554
555 fn proc_mesh(&self) -> &ProcMesh {
556 self.0.proc_mesh()
557 }
558
559 fn name(&self) -> &str {
560 self.0.name()
561 }
562
563 #[allow(clippy::result_large_err)] fn cast<M>(&self, cx: &impl context::Actor, sel: Selection, message: M) -> Result<(), CastError>
565 where
566 Self::Actor: RemoteHandles<IndexedErasedUnbound<M>>,
567 M: Castable + RemoteMessage,
568 {
569 cast_to_sliced_mesh::<A, M>(
570 cx,
571 self.id(),
572 self.proc_mesh().comm_actor(),
573 &sel,
574 message,
575 self.shape(),
576 self.0.shape(),
577 )
578 }
579
580 fn v1(&self) -> Option<v1::ActorMeshRef<Self::Actor>> {
581 self.0
582 .v1()
583 .map(|actor_mesh| actor_mesh.subset(self.shape().into()).unwrap())
584 }
585}
586
/// The error type returned by cast operations on actor meshes.
#[derive(Debug, thiserror::Error)]
pub enum CastError {
    /// A selection that is invalid, together with the shape error it caused.
    #[error("invalid selection {0}: {1}")]
    InvalidSelection(Selection, ShapeError),

    /// A send failure attributed to a specific rank.
    #[error("send on rank {0}: {1}")]
    MailboxSenderError(usize, MailboxSenderError),

    /// The selection is not supported; the string describes it.
    #[error("unsupported selection: {0}")]
    SelectionNotSupported(String),

    /// A send failure not attributed to a rank (converted with `?`).
    #[error(transparent)]
    RootMailboxSenderError(#[from] MailboxSenderError),

    /// A shape error (converted with `?`).
    #[error(transparent)]
    ShapeError(#[from] ShapeError),

    /// A slice error (converted with `?`).
    #[error(transparent)]
    SliceError(#[from] SliceError),

    /// A bincode (de)serialization error (converted with `?`).
    #[error(transparent)]
    SerializationError(#[from] bincode::Error),

    /// Any other error, carried as `anyhow::Error`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),

    /// A reshape error (converted with `?`).
    #[error(transparent)]
    ReshapeError(#[from] ReshapeError),
}
617
618pub(crate) mod test_util {
621 use std::collections::VecDeque;
622 use std::fmt;
623 use std::fmt::Debug;
624 use std::sync::Arc;
625
626 use anyhow::ensure;
627 use hyperactor::Context;
628 use hyperactor::Handler;
629 use hyperactor::Instance;
630 use hyperactor::PortRef;
631 use ndslice::extent;
632
633 use super::*;
634 use crate::comm::multicast::CastInfo;
635
    /// A stateless actor used by the tests. It is exported with handlers for
    /// `Echo`, `Payload`, `GetRank` and `Error` (all marked `cast = true`)
    /// and for `Relay`.
    #[derive(Debug, Default, Actor)]
    #[hyperactor::export(
        spawn = true,
        handlers = [
            Echo { cast = true },
            Payload { cast = true },
            GetRank { cast = true },
            Error { cast = true },
            Relay,
        ],
    )]
    pub struct TestActor;
652
    /// Asks an actor for the rank of its cast point, sent to the port in
    /// field `1`. When field `0` is `false`, `TestActor` fails the handler
    /// with an intentional error after replying.
    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
    pub struct GetRank(pub bool, #[binding(include)] pub PortRef<usize>);
665
666 #[async_trait]
667 impl Handler<GetRank> for TestActor {
668 async fn handle(
669 &mut self,
670 cx: &Context<Self>,
671 GetRank(ok, reply): GetRank,
672 ) -> Result<(), anyhow::Error> {
673 let point = cx.cast_point();
674 reply.send(cx, point.rank())?;
675 anyhow::ensure!(ok, "intentional error!"); Ok(())
677 }
678 }
679
    /// Carries a string (field `0`) and the port (field `1`) to which the
    /// handler sends that string back.
    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
    pub struct Echo(pub String, #[binding(include)] pub PortRef<String>);
682
683 #[async_trait]
684 impl Handler<Echo> for TestActor {
685 async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
686 let Echo(message, reply_port) = message;
687 reply_port.send(cx, message)?;
688 Ok(())
689 }
690 }
691
    /// A message carrying a `Part` and a port on which `TestActor`
    /// acknowledges receipt with `()`; the handler ignores `part`.
    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
    pub struct Payload {
        /// The payload; not inspected by the `TestActor` handler.
        pub part: Part,
        /// Port on which the acknowledgement is sent.
        #[binding(include)]
        pub reply_port: PortRef<()>,
    }
698
699 #[async_trait]
700 impl Handler<Payload> for TestActor {
701 async fn handle(
702 &mut self,
703 cx: &Context<Self>,
704 message: Payload,
705 ) -> Result<(), anyhow::Error> {
706 let Payload { reply_port, .. } = message;
707 reply_port.send(cx, ())?;
708 Ok(())
709 }
710 }
711
    /// Makes the `TestActor` handler fail with the contained string as the
    /// error message.
    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
    pub struct Error(pub String);
714
715 #[async_trait]
716 impl Handler<Error> for TestActor {
717 async fn handle(
718 &mut self,
719 _cx: &Context<Self>,
720 Error(error): Error,
721 ) -> Result<(), anyhow::Error> {
722 Err(anyhow::anyhow!("{}", error))
723 }
724 }
725
    /// A message forwarded along a chain of ports: field `0` counts the hops
    /// taken so far, field `1` holds the remaining ports to visit.
    #[derive(Debug, Serialize, Deserialize, Named, Clone)]
    pub struct Relay(pub usize, pub VecDeque<PortRef<Relay>>);
728
729 #[async_trait]
730 impl Handler<Relay> for TestActor {
731 async fn handle(
732 &mut self,
733 cx: &Context<Self>,
734 Relay(count, mut hops): Relay,
735 ) -> Result<(), anyhow::Error> {
736 ensure!(!hops.is_empty(), "relay must have at least one hop");
737 let next = hops.pop_front().unwrap();
738 next.send(cx, Relay(count + 1, hops))?;
739 Ok(())
740 }
741 }
742
    /// An actor that owns its own proc mesh and, once initialized, an actor
    /// mesh of `TestActor`s spawned on it; it handles `Echo` by going
    /// through that inner mesh.
    #[hyperactor::export(
        spawn = true,
        handlers = [
            Echo,
        ],
    )]
    pub struct ProxyActor {
        /// The proc mesh allocated in `new`; leaked there to obtain a
        /// `'static` reference.
        proc_mesh: &'static Arc<ProcMesh>,
        /// The inner actor mesh; `None` until `init` has run.
        actor_mesh: Option<RootActorMesh<'static, TestActor>>,
    }
755
756 impl fmt::Debug for ProxyActor {
757 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758 f.debug_struct("ProxyActor")
759 .field("proc_mesh", &"...")
760 .field("actor_mesh", &"...")
761 .finish()
762 }
763 }
764
765 #[async_trait]
766 impl Actor for ProxyActor {
767 type Params = ();
768
769 async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
770 use std::sync::Arc;
772
773 use hyperactor::channel::ChannelTransport;
774
775 use crate::alloc::AllocSpec;
776 use crate::alloc::Allocator;
777 use crate::alloc::LocalAllocator;
778
779 let mut allocator = LocalAllocator;
780 let alloc = allocator
781 .allocate(AllocSpec {
782 extent: extent! { replica = 1 },
783 constraints: Default::default(),
784 proc_name: None,
785 transport: ChannelTransport::Local,
786 proc_allocation_mode: Default::default(),
787 })
788 .await
789 .unwrap();
790 let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
791 let leaked: &'static Arc<ProcMesh> = Box::leak(Box::new(proc_mesh));
792 Ok(Self {
793 proc_mesh: leaked,
794 actor_mesh: None,
795 })
796 }
797
798 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
799 self.actor_mesh = Some(self.proc_mesh.spawn(this, "echo", &()).await?);
800 Ok(())
801 }
802 }
803
804 #[async_trait]
805 impl Handler<Echo> for ProxyActor {
806 async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
807 if std::env::var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK").is_err() {
808 let actor = self.actor_mesh.as_ref().unwrap().get(0).unwrap();
811
812 let (tx, mut rx) = cx.open_port();
815
816 actor.send(cx, Echo(message.0, tx.bind()))?;
817 message.1.send(cx, rx.recv().await.unwrap())?;
818
819 Ok(())
820 } else {
821 let actor: ActorRef<_> = self.actor_mesh.as_ref().unwrap().get(0).unwrap();
824 let (tx, mut rx) = cx.open_port::<String>();
825 actor.send(cx, Echo(message.0, tx.bind()))?;
826
827 use tokio::time::Duration;
828 use tokio::time::timeout;
829 #[allow(clippy::disallowed_methods)]
830 if timeout(Duration::from_secs(1), rx.recv()).await.is_ok() {
831 message
832 .1
833 .send(cx, "the impossible happened".to_owned())
834 .unwrap()
835 }
836
837 Ok(())
838 }
839 }
840 }
841}
842
843#[cfg(test)]
844mod tests {
845 use std::sync::Arc;
846
847 use hyperactor::ActorId;
848 use hyperactor::PortRef;
849 use hyperactor::ProcId;
850 use hyperactor::WorldId;
851 use hyperactor::attrs::Attrs;
852 use hyperactor::data::Encoding;
853 use timed_test::async_timed_test;
854
855 use super::*;
856 use crate::proc_mesh::ProcEvent;
857
858 #[macro_export]
860 macro_rules! actor_mesh_test_suite {
861 ($allocator:expr) => {
862 use std::assert_matches::assert_matches;
863
864 use ndslice::extent;
865 use $crate::alloc::AllocSpec;
866 use $crate::alloc::Allocator;
867 use $crate::assign::Ranks;
868 use $crate::sel_from_shape;
869 use $crate::sel;
870 use $crate::comm::multicast::set_cast_info_on_headers;
871 use $crate::proc_mesh::SharedSpawnable;
872 use std::collections::VecDeque;
873 use hyperactor::data::Serialized;
874 use $crate::proc_mesh::default_transport;
875
876 use super::*;
877 use super::test_util::*;
878
879 #[tokio::test]
880 async fn test_proxy_mesh() {
881 use super::test_util::*;
882 use $crate::alloc::AllocSpec;
883 use $crate::alloc::Allocator;
884
885 use ndslice::extent;
886
887 let alloc = $allocator
888 .allocate(AllocSpec {
889 extent: extent! { replica = 1 },
890 constraints: Default::default(),
891 proc_name: None,
892 transport: default_transport(),
893 proc_allocation_mode: Default::default(),
894 })
895 .await
896 .unwrap();
897 let instance = $crate::v1::testing::instance().await;
898 let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
899 let actor_mesh: RootActorMesh<'_, ProxyActor> = proc_mesh.spawn(&instance, "proxy", &()).await.unwrap();
900 let proxy_actor = actor_mesh.get(0).unwrap();
901 let (tx, mut rx) = actor_mesh.open_port::<String>();
902 proxy_actor.send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind())).unwrap();
903
904 #[allow(clippy::disallowed_methods)]
905 match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
906 Ok(msg) => assert_eq!(&msg.unwrap(), "hello!"),
907 Err(_) => assert!(false),
908 }
909 }
910
            /// Casts an `Echo` to every actor of a 4-replica mesh and
            /// expects four identical replies.
            #[tokio::test]
            async fn test_basic() {
                let alloc = $allocator
                    .allocate(AllocSpec {
                        extent: extent!(replica = 4),
                        constraints: Default::default(),
                        proc_name: None,
                        transport: default_transport(),
                        proc_allocation_mode: Default::default(),
                    })
                    .await
                    .unwrap();

                let instance = $crate::v1::testing::instance().await;
                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn(&instance, "echo", &()).await.unwrap();
                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
                actor_mesh
                    .cast(proc_mesh.client(), sel!(*), Echo("Hello".to_string(), reply_handle.bind()))
                    .unwrap();
                // One reply per replica.
                for _ in 0..4 {
                    assert_eq!(&reply_receiver.recv().await.unwrap(), "Hello");
                }
            }
935
            /// Starts a ping-pong exchange between the two actors of a
            /// 2-replica mesh and waits for the completion signal.
            #[tokio::test]
            async fn test_ping_pong() {
                use hyperactor::test_utils::pingpong::PingPongActor;
                use hyperactor::test_utils::pingpong::PingPongMessage;
                use hyperactor::test_utils::pingpong::PingPongActorParams;

                let alloc = $allocator
                    .allocate(AllocSpec {
                        extent: extent!(replica = 2),
                        constraints: Default::default(),
                        proc_name: None,
                        transport: default_transport(),
                        proc_allocation_mode: Default::default(),
                    })
                    .await
                    .unwrap();
                let instance = $crate::v1::testing::instance().await;
                let mesh = ProcMesh::allocate(alloc).await.unwrap();

                // The receiving half of the undeliverable port is discarded.
                let (undeliverable_msg_tx, _) = mesh.client().open_port();
                let ping_pong_actor_params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
                let actor_mesh: RootActorMesh<PingPongActor> = mesh
                    .spawn::<PingPongActor>(&instance, "ping-pong", &ping_pong_actor_params)
                    .await
                    .unwrap();

                let ping: ActorRef<PingPongActor> = actor_mesh.get(0).unwrap();
                let pong: ActorRef<PingPongActor> = actor_mesh.get(1).unwrap();
                let (done_tx, done_rx) = mesh.client().open_once_port();
                ping.send(mesh.client(), PingPongMessage(4, pong.clone(), done_tx.bind())).unwrap();

                assert!(done_rx.recv().await.unwrap());
            }
969
970 #[tokio::test]
971 async fn test_pingpong_full_mesh() {
972 use hyperactor::test_utils::pingpong::PingPongActor;
973 use hyperactor::test_utils::pingpong::PingPongActorParams;
974 use hyperactor::test_utils::pingpong::PingPongMessage;
975
976 use futures::future::join_all;
977
978 const X: usize = 3;
979 const Y: usize = 3;
980 const Z: usize = 3;
981 let alloc = $allocator
982 .allocate(AllocSpec {
983 extent: extent!(x = X, y = Y, z = Z),
984 constraints: Default::default(),
985 proc_name: None,
986 transport: default_transport(),
987 proc_allocation_mode: Default::default(),
988 })
989 .await
990 .unwrap();
991
992 let instance = $crate::v1::testing::instance().await;
993 let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
994 let (undeliverable_tx, _undeliverable_rx) = proc_mesh.client().open_port();
995 let params = PingPongActorParams::new(Some(undeliverable_tx.bind()), None);
996 let actor_mesh = proc_mesh.spawn::<PingPongActor>(&instance, "pingpong", ¶ms).await.unwrap();
997 let slice = actor_mesh.shape().slice();
998
999 let mut futures = Vec::new();
1000 for rank in slice.iter() {
1001 let actor = actor_mesh.get(rank).unwrap();
1002 let coords = (&slice.coordinates(rank).unwrap()[..]).try_into().unwrap();
1003 let sizes = (&slice.sizes())[..].try_into().unwrap();
1004 let neighbors = ndslice::utils::stencil::moore_neighbors::<3>();
1005 for neighbor_coords in ndslice::utils::apply_stencil(&coords, sizes, &neighbors) {
1006 if let Ok(neighbor_rank) = slice.location(&neighbor_coords) {
1007 let neighbor = actor_mesh.get(neighbor_rank).unwrap();
1008 let (done_tx, done_rx) = proc_mesh.client().open_once_port();
1009 actor
1010 .send(
1011 proc_mesh.client(),
1012 PingPongMessage(4, neighbor.clone(), done_tx.bind()),
1013 )
1014 .unwrap();
1015 futures.push(done_rx.recv());
1016 }
1017 }
1018 }
1019 let results = join_all(futures).await;
1020 assert_eq!(results.len(), 316); for result in results {
1022 assert_eq!(result.unwrap(), true);
1023 }
1024 }
1025
            /// Casts `GetRank` first to the whole 2x2x8 mesh and then to the
            /// `replica = 0, host = 0` sub-selection, checking that each
            /// rank replies exactly once.
            #[tokio::test]
            async fn test_cast() {
                let alloc = $allocator
                    .allocate(AllocSpec {
                        extent: extent!(replica = 2, host = 2, gpu = 8),
                        constraints: Default::default(),
                        proc_name: None,
                        transport: default_transport(),
                        proc_allocation_mode: Default::default(),
                    })
                    .await
                    .unwrap();

                let instance = $crate::v1::testing::instance().await;
                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn(&instance, "echo", &()).await.unwrap();
                let dont_simulate_error = true;
                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
                actor_mesh
                    .cast(proc_mesh.client(), sel!(*), GetRank(dont_simulate_error, reply_handle.bind()))
                    .unwrap();
                // Collect one reply per rank of the full mesh.
                let mut ranks = Ranks::new(actor_mesh.shape().slice().len());
                while !ranks.is_full() {
                    let rank = reply_receiver.recv().await.unwrap();
                    assert!(ranks.insert(rank, rank).is_none(), "duplicate rank {rank}");
                }
                // Second cast: only the 8 gpus of replica 0, host 0.
                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
                actor_mesh
                    .cast(
                        proc_mesh.client(),
                        sel_from_shape!(actor_mesh.shape(), replica = 0, host = 0),
                        GetRank(dont_simulate_error, reply_handle.bind()),
                    )
                    .unwrap();
                let mut ranks = Ranks::new(8);
                while !ranks.is_full() {
                    let rank = reply_receiver.recv().await.unwrap();
                    assert!(ranks.insert(rank, rank).is_none(), "duplicate rank {rank}");
                }
            }
1067
            /// Sends a `Relay` through every actor of a 2x2x8 mesh in turn
            /// and checks that it arrives back at the client having made one
            /// hop per actor.
            #[tokio::test]
            async fn test_inter_actor_comms() {
                let alloc = $allocator
                    .allocate(AllocSpec {
                        extent: extent!(replica = 2, host = 2, gpu = 8),
                        constraints: Default::default(),
                        proc_name: None,
                        transport: default_transport(),
                        proc_allocation_mode: Default::default(),
                    })
                    .await
                    .unwrap();

                let instance = $crate::v1::testing::instance().await;
                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn(&instance, "echo", &()).await.unwrap();

                // The hop list is every actor's port followed by the
                // client's own port as the final destination.
                let mut hops: VecDeque<_> = actor_mesh.iter().map(|actor| actor.port()).collect();
                let (handle, mut rx) = proc_mesh.client().open_port();
                hops.push_back(handle.bind());
                hops.pop_front()
                    .unwrap()
                    .send(proc_mesh.client(), Relay(0, hops))
                    .unwrap();
                assert_matches!(
                    rx.recv().await.unwrap(),
                    Relay(count, hops)
                        if count == actor_mesh.shape().slice().len()
                        && hops.is_empty());
            }
1102
            /// Sends a `Relay` through the actors of two separately
            /// allocated proc meshes and checks it returns to the client
            /// after visiting all of them.
            #[tokio::test]
            async fn test_inter_proc_mesh_comms() {
                let mut meshes = Vec::new();
                let instance = $crate::v1::testing::instance().await;
                for _ in 0..2 {
                    let alloc = $allocator
                        .allocate(AllocSpec {
                            extent: extent!(replica = 1),
                            constraints: Default::default(),
                            proc_name: None,
                            transport: default_transport(),
                            proc_allocation_mode: Default::default(),
                        })
                        .await
                        .unwrap();

                    let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
                    let proc_mesh_clone = Arc::clone(&proc_mesh);
                    let actor_mesh : RootActorMesh<TestActor> = proc_mesh_clone.spawn(&instance, "echo", &()).await.unwrap();
                    meshes.push((proc_mesh, actor_mesh));
                }

                // Hop list: every actor of every mesh, in mesh order.
                let mut hops: VecDeque<_> = meshes
                    .iter()
                    .flat_map(|(_proc_mesh, actor_mesh)| actor_mesh.iter())
                    .map(|actor| actor.port())
                    .collect();
                let num_hops = hops.len();

                // The first mesh's client is both sender and final receiver.
                let client = meshes[0].0.client();
                let (handle, mut rx) = client.open_port();
                hops.push_back(handle.bind());
                hops.pop_front()
                    .unwrap()
                    .send(client, Relay(0, hops))
                    .unwrap();
                assert_matches!(
                    rx.recv().await.unwrap(),
                    Relay(count, hops)
                        if count == num_hops
                        && hops.is_empty());
            }
1145
1146 #[async_timed_test(timeout_secs = 60)]
1147 async fn test_actor_mesh_cast() {
1148 use $crate::sel;
1152 use $crate::comm::test_utils::TestActor as CastTestActor;
1153 use $crate::comm::test_utils::TestActorParams as CastTestActorParams;
1154 use $crate::comm::test_utils::TestMessage as CastTestMessage;
1155
1156 let extent = extent!(replica = 4, host = 4, gpu = 4);
1157 let num_actors = extent.len();
1158 let alloc = $allocator
1159 .allocate(AllocSpec {
1160 extent,
1161 constraints: Default::default(),
1162 proc_name: None,
1163 transport: default_transport(),
1164 proc_allocation_mode: Default::default(),
1165 })
1166 .await
1167 .unwrap();
1168
1169 let instance = $crate::v1::testing::instance().await;
1170 let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1171
1172 let (tx, mut rx) = hyperactor::mailbox::open_port(proc_mesh.client());
1173 let params = CastTestActorParams{ forward_port: tx.bind() };
1174 let actor_mesh: RootActorMesh<CastTestActor> = proc_mesh.spawn(&instance, "actor", ¶ms).await.unwrap();
1175
1176 actor_mesh.cast(proc_mesh.client(), sel!(*), CastTestMessage::Forward("abc".to_string())).unwrap();
1177
1178 for _ in 0..num_actors {
1179 assert_eq!(rx.recv().await.unwrap(), CastTestMessage::Forward("abc".to_string()));
1180 }
1181
1182 proc_mesh.events().unwrap().into_alloc().stop_and_wait().await.unwrap();
1187 }
1188
            /// Sends a message to an actor id that this test never spawns
            /// and expects the proc mesh to report a crash whose reason
            /// mentions "message not delivered".
            #[tokio::test]
            async fn test_delivery_failure() {
                let alloc = $allocator
                    .allocate(AllocSpec {
                        extent: extent!(replica = 1 ),
                        constraints: Default::default(),
                        proc_name: None,
                        transport: default_transport(),
                        proc_allocation_mode: Default::default(),
                    })
                    .await
                    .unwrap();

                let name = alloc.name().to_string();
                let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
                let mut events = mesh.events().unwrap();

                // The receiving half of this port is dropped right away.
                let unmonitored_reply_to = mesh.client().open_port::<usize>().0.bind();
                // An attested reference to actor "foo" on rank 0 of the
                // allocation's world; nothing with that name is spawned here.
                let bad_actor = ActorRef::<TestActor>::attest(ActorId(ProcId::Ranked(WorldId(name.clone()), 0), "foo".into(), 0));
                bad_actor.send(mesh.client(), GetRank(true, unmonitored_reply_to)).unwrap();

                assert_matches!(
                    events.next().await.unwrap(),
                    ProcEvent::Crashed(0, reason) if reason.contains("message not delivered")
                );

            }
1219
            /// Sends `GetRank` to the same actor three times through three
            /// different send paths, each time with cast-info headers for a
            /// different rank, and checks the reply matches the rank placed
            /// in the headers.
            #[tokio::test]
            async fn test_send_with_headers() {
                let extent = extent!(replica = 3);
                let alloc = $allocator
                    .allocate(AllocSpec {
                        extent: extent.clone(),
                        constraints: Default::default(),
                        proc_name: None,
                        transport: default_transport(),
                        proc_allocation_mode: Default::default(),
                    })
                    .await
                    .unwrap();

                let instance = $crate::v1::testing::instance().await;
                let mesh = ProcMesh::allocate(alloc).await.unwrap();
                let (reply_port_handle, mut reply_port_receiver) = mesh.client().open_port::<usize>();
                let reply_port = reply_port_handle.bind();

                let actor_mesh: RootActorMesh<TestActor> = mesh.spawn(&instance, "test", &()).await.unwrap();
                let actor_ref = actor_mesh.get(0).unwrap();
                let mut headers = Attrs::new();
                // Path 1: send directly on the actor reference.
                set_cast_info_on_headers(&mut headers, extent.point_of_rank(0).unwrap(), mesh.client().self_id().clone());
                actor_ref.send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone())).unwrap();
                assert_eq!(0, reply_port_receiver.recv().await.unwrap());

                // Path 2: send on the actor's typed port.
                set_cast_info_on_headers(&mut headers, extent.point_of_rank(1).unwrap(), mesh.client().self_id().clone());
                actor_ref.port()
                    .send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone()))
                    .unwrap();
                assert_eq!(1, reply_port_receiver.recv().await.unwrap());

                // Path 3: send a pre-serialized message on the port id.
                set_cast_info_on_headers(&mut headers, extent.point_of_rank(2).unwrap(), mesh.client().self_id().clone());
                actor_ref.actor_id()
                    .port_id(GetRank::port())
                    .send_with_headers(
                        mesh.client(),
                        Serialized::serialize(&GetRank(true, reply_port)).unwrap(),
                        headers
                    );
                assert_eq!(2, reply_port_receiver.recv().await.unwrap());
            }
1263 }
1264 }
1265
1266 mod local {
1267 use hyperactor::channel::ChannelTransport;
1268
1269 use crate::alloc::local::LocalAllocator;
1270
// Instantiate the `actor_mesh_test_suite!` tests with `LocalAllocator` as
// the allocator expression.
actor_mesh_test_suite!(LocalAllocator);
1272
1273 #[tokio::test]
1274 async fn test_send_failure() {
1275 hyperactor_telemetry::initialize_logging(hyperactor::clock::ClockKind::default());
1276
1277 use hyperactor::test_utils::pingpong::PingPongActor;
1278 use hyperactor::test_utils::pingpong::PingPongActorParams;
1279 use hyperactor::test_utils::pingpong::PingPongMessage;
1280
1281 use crate::alloc::ProcStopReason;
1282 use crate::proc_mesh::ProcEvent;
1283
1284 let config = hyperactor::config::global::lock();
1285 let _guard = config.override_key(
1286 hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1287 tokio::time::Duration::from_secs(1),
1288 );
1289
1290 let alloc = LocalAllocator
1291 .allocate(AllocSpec {
1292 extent: extent!(replica = 2),
1293 constraints: Default::default(),
1294 proc_name: None,
1295 transport: ChannelTransport::Local,
1296 proc_allocation_mode: Default::default(),
1297 })
1298 .await
1299 .unwrap();
1300 let instance = crate::v1::testing::instance().await;
1301 let monkey = alloc.chaos_monkey();
1302 let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1303 let mut events = mesh.events().unwrap();
1304
1305 let ping_pong_actor_params = PingPongActorParams::new(
1306 Some(PortRef::attest_message_port(mesh.client().self_id())),
1307 None,
1308 );
1309 let actor_mesh: RootActorMesh<PingPongActor> = mesh
1310 .spawn::<PingPongActor>(&instance, "ping-pong", &ping_pong_actor_params)
1311 .await
1312 .unwrap();
1313
1314 let ping: ActorRef<PingPongActor> = actor_mesh.get(0).unwrap();
1315 let pong: ActorRef<PingPongActor> = actor_mesh.get(1).unwrap();
1316
1317 monkey(0, ProcStopReason::Killed(0, false));
1319 assert_matches!(
1320 events.next().await.unwrap(),
1321 ProcEvent::Stopped(0, ProcStopReason::Killed(0, false))
1322 );
1323
1324 let (unmonitored_done_tx, _) = mesh.client().open_once_port();
1327 ping.send(
1328 mesh.client(),
1329 PingPongMessage(1, pong.clone(), unmonitored_done_tx.bind()),
1330 )
1331 .unwrap();
1332
1333 assert_matches!(
1335 events.next().await.unwrap(),
1336 ProcEvent::Crashed(0, reason) if reason.contains("message not delivered")
1337 );
1338
1339 let (unmonitored_done_tx, _) = mesh.client().open_once_port();
1342 pong.send(
1343 mesh.client(),
1344 PingPongMessage(1, ping.clone(), unmonitored_done_tx.bind()),
1345 )
1346 .unwrap();
1347
1348 assert_matches!(
1350 events.next().await.unwrap(),
1351 ProcEvent::Crashed(0, reason) if reason.contains("message not delivered")
1352 );
1353 }
1354
1355 #[tokio::test]
1356 async fn test_cast_failure() {
1357 use crate::alloc::ProcStopReason;
1358 use crate::proc_mesh::ProcEvent;
1359 use crate::sel;
1360
1361 let alloc = LocalAllocator
1362 .allocate(AllocSpec {
1363 extent: extent!(replica = 1),
1364 constraints: Default::default(),
1365 proc_name: None,
1366 transport: ChannelTransport::Local,
1367 proc_allocation_mode: Default::default(),
1368 })
1369 .await
1370 .unwrap();
1371 let instance = crate::v1::testing::instance().await;
1372
1373 let stop = alloc.stopper();
1374 let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1375 let mut events = mesh.events().unwrap();
1376
1377 let actor_mesh = mesh
1378 .spawn::<TestActor>(&instance, "reply-then-fail", &())
1379 .await
1380 .unwrap();
1381
1382 let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1385 actor_mesh
1386 .cast(mesh.client(), sel!(*), GetRank(false, reply_handle.bind()))
1387 .unwrap();
1388 let rank = reply_receiver.recv().await.unwrap();
1389 assert_eq!(rank, 0);
1390
1391 assert_matches!(
1393 events.next().await.unwrap(),
1394 ProcEvent::Crashed(0, reason) if reason.contains("intentional error!")
1395 );
1396
1397 let (reply_handle, _) = actor_mesh.open_port();
1399 actor_mesh
1400 .cast(mesh.client(), sel!(*), GetRank(false, reply_handle.bind()))
1401 .unwrap();
1402
1403 assert_matches!(
1405 events.next().await.unwrap(),
1406 ProcEvent::Crashed(0, reason) if reason.contains("message not delivered")
1407 );
1408
1409 stop();
1411 assert_matches!(
1412 events.next().await.unwrap(),
1413 ProcEvent::Stopped(0, ProcStopReason::Stopped),
1414 );
1415 assert!(events.next().await.is_none());
1416 }
1417
1418 #[tracing_test::traced_test]
1419 #[tokio::test]
1420 async fn test_stop_actor_mesh() {
1421 use hyperactor::test_utils::pingpong::PingPongActor;
1422 use hyperactor::test_utils::pingpong::PingPongActorParams;
1423 use hyperactor::test_utils::pingpong::PingPongMessage;
1424
1425 let config = hyperactor::config::global::lock();
1426 let _guard = config.override_key(
1427 hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1428 tokio::time::Duration::from_secs(1),
1429 );
1430
1431 let alloc = LocalAllocator
1432 .allocate(AllocSpec {
1433 extent: extent!(replica = 2),
1434 constraints: Default::default(),
1435 proc_name: None,
1436 transport: ChannelTransport::Local,
1437 proc_allocation_mode: Default::default(),
1438 })
1439 .await
1440 .unwrap();
1441 let instance = crate::v1::testing::instance().await;
1442 let mesh = ProcMesh::allocate(alloc).await.unwrap();
1443
1444 let ping_pong_actor_params = PingPongActorParams::new(
1445 Some(PortRef::attest_message_port(mesh.client().self_id())),
1446 None,
1447 );
1448 let mesh_one: RootActorMesh<PingPongActor> = mesh
1449 .spawn::<PingPongActor>(&instance, "mesh_one", &ping_pong_actor_params)
1450 .await
1451 .unwrap();
1452
1453 let mesh_two: RootActorMesh<PingPongActor> = mesh
1454 .spawn::<PingPongActor>(&instance, "mesh_two", &ping_pong_actor_params)
1455 .await
1456 .unwrap();
1457
1458 mesh_two.stop(&instance).await.unwrap();
1459
1460 let ping_two: ActorRef<PingPongActor> = mesh_two.get(0).unwrap();
1461 let pong_two: ActorRef<PingPongActor> = mesh_two.get(1).unwrap();
1462
1463 assert!(logs_contain(&format!(
1464 "stopped actor {}",
1465 ping_two.actor_id()
1466 )));
1467 assert!(logs_contain(&format!(
1468 "stopped actor {}",
1469 pong_two.actor_id()
1470 )));
1471
1472 let ping_one: ActorRef<PingPongActor> = mesh_one.get(0).unwrap();
1474 let pong_one: ActorRef<PingPongActor> = mesh_one.get(1).unwrap();
1475 let (done_tx, done_rx) = mesh.client().open_once_port();
1476 pong_one
1477 .send(
1478 mesh.client(),
1479 PingPongMessage(1, ping_one.clone(), done_tx.bind()),
1480 )
1481 .unwrap();
1482 assert!(done_rx.recv().await.is_ok());
1483 }
1484 } mod process {
1487
1488 use bytes::Bytes;
1489 use hyperactor::PortId;
1490 use hyperactor::channel::ChannelTransport;
1491 use hyperactor::clock::Clock;
1492 use hyperactor::clock::RealClock;
1493 use hyperactor::mailbox::MessageEnvelope;
1494 use rand::Rng;
1495 use tokio::process::Command;
1496
1497 use crate::alloc::process::ProcessAllocator;
1498
/// Builds a `ProcessAllocator` whose command is the test resource
/// registered as "monarch/hyperactor_mesh/bootstrap".
#[cfg(fbcode_build)]
fn process_allocator() -> ProcessAllocator {
    let bootstrap = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
    ProcessAllocator::new(Command::new(bootstrap))
}
1505
// Instantiate the `actor_mesh_test_suite!` tests with `process_allocator()`
// as the allocator expression (only when `fbcode_build` is set).
#[cfg(fbcode_build)]
actor_mesh_test_suite!(process_allocator());
1508
/// Checks frame-size enforcement with the maximum frame length set to 1024
/// bytes: a payload that serializes to exactly 1024 bytes must be answered,
/// and one that serializes to 1025 bytes must result in a crash event.
///
/// The limit is applied twice: as a config override, and as the
/// `HYPERACTOR_CODEC_MAX_FRAME_LENGTH` environment variable (presumably so
/// that processes spawned by the allocator inherit it — confirm against the
/// bootstrap binary). The environment variable is restored to its previous
/// state when the test ends, including on panic, so it no longer leaks into
/// other tests running in the same process.
#[cfg(fbcode_build)]
#[async_timed_test(timeout_secs = 30)]
async fn test_oversized_frames() {
    /// Local stand-in used only to compute the on-the-wire frame size.
    // NOTE(review): this is assumed to mirror the channel layer's framing
    // type; the 1024/1025 assertions below only hold if it does.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    enum Frame<M> {
        Init(u64),
        Message(u64, M),
    }

    /// Serializes `pay` into an envelope from `src` to `dst`, wraps it in a
    /// `Frame::Message`, and returns the resulting frame length.
    fn frame_length(src: &ActorId, dst: &PortId, pay: &Payload) -> usize {
        let serialized = Serialized::serialize(pay).unwrap();
        let mut headers = Attrs::new();
        hyperactor::mailbox::headers::set_send_timestamp(&mut headers);
        hyperactor::mailbox::headers::set_rust_message_type::<Payload>(&mut headers);
        let envelope = MessageEnvelope::new(src.clone(), dst.clone(), serialized, headers);
        let frame = Frame::Message(0u64, envelope);
        let message = serde_multipart::serialize_illegal_bincode(&frame).unwrap();
        message.frame_len()
    }

    /// Puts an environment variable back to its prior state when dropped.
    struct RestoreEnvVar {
        key: &'static str,
        previous: Option<std::ffi::OsString>,
    }

    impl Drop for RestoreEnvVar {
        fn drop(&mut self) {
            // SAFETY: same caveat as the `set_var` call below; mutating the
            // environment is not safe against concurrent access from other
            // threads.
            unsafe {
                match self.previous.take() {
                    Some(value) => std::env::set_var(self.key, value),
                    None => std::env::remove_var(self.key),
                }
            }
        }
    }

    const MAX_FRAME_ENV: &str = "HYPERACTOR_CODEC_MAX_FRAME_LENGTH";

    let config = hyperactor::config::global::lock();
    // Limit for this (test) process.
    let _guard2 =
        config.override_key(hyperactor::config::CODEC_MAX_FRAME_LENGTH, 1024usize);
    // Same limit via the environment; remember the old value first so it
    // can be restored when the test finishes.
    let _restore_env = RestoreEnvVar {
        key: MAX_FRAME_ENV,
        previous: std::env::var_os(MAX_FRAME_ENV),
    };
    // SAFETY: not safe against concurrent environment access from other
    // threads; accepted here as in the rest of this test module.
    unsafe {
        std::env::set_var(MAX_FRAME_ENV, "1024");
    };
    let _guard3 =
        config.override_key(hyperactor::config::DEFAULT_ENCODING, Encoding::Bincode);
    let _guard4 = config.override_key(hyperactor::config::CHANNEL_MULTIPART, false);

    let alloc = process_allocator()
        .allocate(AllocSpec {
            extent: extent!(replica = 1),
            constraints: Default::default(),
            proc_name: None,
            transport: ChannelTransport::Unix,
            proc_allocation_mode: Default::default(),
        })
        .await
        .unwrap();
    let instance = crate::v1::testing::instance().await;
    let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
    let mut proc_events = proc_mesh.events().unwrap();
    let actor_mesh: RootActorMesh<TestActor> =
        proc_mesh.spawn(&instance, "ingest", &()).await.unwrap();
    let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
    let dest = actor_mesh.get(0).unwrap();

    // 586 payload bytes produce a frame of exactly 1024 bytes (asserted).
    let payload = Payload {
        part: Part::from(Bytes::from(vec![0u8; 586])),
        reply_port: reply_handle.bind(),
    };
    let frame_len = frame_length(
        proc_mesh.client().self_id(),
        dest.port::<Payload>().port_id(),
        &payload,
    );
    assert_eq!(frame_len, 1024);

    // A frame at the limit must be delivered and answered within 2 seconds.
    dest.send(proc_mesh.client(), payload).unwrap();
    #[allow(clippy::disallowed_methods)]
    let result = RealClock
        .timeout(Duration::from_secs(2), reply_receiver.recv())
        .await;
    assert!(result.is_ok(), "Operation should not time out");

    // One more payload byte pushes the frame to 1025 bytes (asserted).
    let payload = Payload {
        part: Part::from(Bytes::from(vec![0u8; 587])),
        reply_port: reply_handle.bind(),
    };
    let frame_len = frame_length(
        proc_mesh.client().self_id(),
        dest.port::<Payload>().port_id(),
        &payload,
    );
    assert_eq!(frame_len, 1025);

    // Randomly use either a direct send or a cast for the oversized
    // message; both are expected to end in a crash event.
    if rand::thread_rng().gen_bool(0.5) {
        dest.send(proc_mesh.client(), payload).unwrap();
    } else {
        actor_mesh
            .cast(proc_mesh.client(), sel!(*), payload)
            .unwrap();
    }

    {
        let event = proc_events.next().await.unwrap();
        assert_matches!(
            event,
            ProcEvent::Crashed(_, _),
            "Should have received crash event"
        );
    }
}
1618
/// With `HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK` set, an `Echo` request
/// sent to a `ProxyActor` must not produce a reply within 3 seconds.
/// Instead the proc mesh must report an "undeliverable" crash and the actor
/// mesh must report a supervision event for one of its own actors.
///
/// The environment variable is removed by a drop guard, so it is cleaned up
/// even when an assertion in this test panics. On the success path the
/// guard is released at the same point where the variable used to be
/// removed explicitly.
#[cfg(fbcode_build)]
#[tokio::test]
async fn test_router_undeliverable_return() {
    use ndslice::extent;

    use super::test_util::*;
    use crate::alloc::AllocSpec;
    use crate::alloc::Allocator;

    // NOTE(review): judging by its name this switch disables the router's
    // global fallback; confirm against the router implementation.
    const NO_GLOBAL_FALLBACK: &str = "HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK";

    /// Removes the named environment variable when dropped.
    struct RemoveEnvVarOnDrop(&'static str);

    impl Drop for RemoveEnvVarOnDrop {
        fn drop(&mut self) {
            // SAFETY: same caveat as the `set_var` call below; environment
            // mutation is not safe against concurrent access from other
            // threads.
            unsafe { std::env::remove_var(self.0) };
        }
    }

    let alloc = process_allocator()
        .allocate(AllocSpec {
            extent: extent! { replica = 1 },
            constraints: Default::default(),
            proc_name: None,
            transport: ChannelTransport::Unix,
            proc_allocation_mode: Default::default(),
        })
        .await
        .unwrap();

    // SAFETY: not safe against concurrent environment access from other
    // threads; accepted here as in the rest of this test module.
    unsafe { std::env::set_var(NO_GLOBAL_FALLBACK, "1") };
    let no_fallback = RemoveEnvVarOnDrop(NO_GLOBAL_FALLBACK);

    let instance = crate::v1::testing::instance().await;
    let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
    let mut proc_events = proc_mesh.events().unwrap();
    let mut actor_mesh: RootActorMesh<'_, ProxyActor> =
        { proc_mesh.spawn(&instance, "proxy", &()).await.unwrap() };
    let mut actor_events = actor_mesh.events().unwrap();

    let proxy_actor = actor_mesh.get(0).unwrap();
    let (tx, mut rx) = actor_mesh.open_port::<String>();
    proxy_actor
        .send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind()))
        .unwrap();

    #[allow(clippy::disallowed_methods)]
    match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
        // Any reply (or receive error) within the window is a failure.
        Ok(_) => panic!("the impossible happened"),
        // Timing out is the expected outcome; the failure must then be
        // visible on both event streams.
        Err(_) => {
            assert_matches!(
                proc_events.next().await.unwrap(),
                ProcEvent::Crashed(0, reason) if reason.contains("undeliverable")
            );
            assert_eq!(
                actor_events.next().await.unwrap().actor_id.name(),
                actor_mesh.name(),
            );
        }
    }

    // Success path: clear the variable here, exactly where it used to be
    // removed by hand.
    drop(no_fallback);
}
1678 }
1679
// Instantiates the `actor_mesh_test_suite!` tests with the allocator
// returned by `SimAllocator::new_and_start_simnet()`.
mod sim {
    use crate::alloc::sim::SimAllocator;

    actor_mesh_test_suite!(SimAllocator::new_and_start_simnet());
}
1685
1686 mod reshape_cast {
1687 use async_trait::async_trait;
1688 use hyperactor::Actor;
1689 use hyperactor::Context;
1690 use hyperactor::Handler;
1691 use hyperactor::channel::ChannelAddr;
1692 use hyperactor::channel::ChannelTransport;
1693 use hyperactor::channel::ChannelTx;
1694 use hyperactor::channel::Rx;
1695 use hyperactor::channel::Tx;
1696 use hyperactor::channel::dial;
1697 use hyperactor::channel::serve;
1698 use hyperactor::clock::Clock;
1699 use hyperactor::clock::RealClock;
1700 use ndslice::Selection;
1701
1702 use crate::Mesh;
1703 use crate::ProcMesh;
1704 use crate::RootActorMesh;
1705 use crate::actor_mesh::ActorMesh;
1706 use crate::alloc::AllocSpec;
1707 use crate::alloc::Allocator;
1708 use crate::alloc::LocalAllocator;
1709 use crate::config::MAX_CAST_DIMENSION_SIZE;
1710
/// Test actor that reports its own rank over a `usize` channel every time
/// it handles a `()` message. The wrapped `ChannelTx<usize>` is the
/// sending end it posts to.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [() { cast = true }],
)]
struct EchoActor(ChannelTx<usize>);
1717
1718 #[async_trait]
1719 impl Actor for EchoActor {
1720 type Params = ChannelAddr;
1721
1722 async fn new(params: ChannelAddr) -> Result<Self, anyhow::Error> {
1723 Ok(Self(dial::<usize>(params)?))
1724 }
1725 }
1726
1727 #[async_trait]
1728 impl Handler<()> for EchoActor {
1729 async fn handle(
1730 &mut self,
1731 cx: &Context<Self>,
1732 _message: (),
1733 ) -> Result<(), anyhow::Error> {
1734 let Self(port) = self;
1735 port.post(cx.self_id().rank());
1736 Ok(())
1737 }
1738 }
1739
1740 async fn validate_cast<A>(
1741 actor_mesh: &A,
1742 caps: &impl hyperactor::context::Actor,
1743 addr: ChannelAddr,
1744 selection: Selection,
1745 ) where
1746 A: ActorMesh<Actor = EchoActor>,
1747 {
1748 let config = hyperactor::config::global::lock();
1749 let _guard = config.override_key(MAX_CAST_DIMENSION_SIZE, 2);
1750
1751 let (_, mut rx) = serve::<usize>(addr).unwrap();
1752
1753 let expected_ranks = selection
1754 .eval(
1755 &ndslice::selection::EvalOpts::strict(),
1756 actor_mesh.shape().slice(),
1757 )
1758 .unwrap()
1759 .collect::<std::collections::BTreeSet<_>>();
1760
1761 actor_mesh.cast(caps, selection, ()).unwrap();
1762
1763 let mut received = std::collections::BTreeSet::new();
1764
1765 for _ in 0..(expected_ranks.len()) {
1766 received.insert(
1767 RealClock
1768 .timeout(tokio::time::Duration::from_secs(1), rx.recv())
1769 .await
1770 .unwrap()
1771 .unwrap(),
1772 );
1773 }
1774
1775 assert_eq!(received, expected_ranks);
1776 }
1777
1778 use ndslice::strategy::gen_extent;
1779 use ndslice::strategy::gen_selection;
1780 use proptest::prelude::*;
1781 use proptest::test_runner::TestRunner;
1782
1783 fn make_tokio_runtime() -> tokio::runtime::Runtime {
1784 tokio::runtime::Builder::new_multi_thread()
1785 .enable_all()
1786 .worker_threads(2)
1787 .build()
1788 .unwrap()
1789 }
1790
1791 proptest! {
1792 #![proptest_config(ProptestConfig {
1793 cases: 8, ..ProptestConfig::default()
1794 })]
1795 #[test]
1796 fn test_reshaped_actor_mesh_cast(extent in gen_extent(1..=4, 8)) {
1797 let runtime = make_tokio_runtime();
1798 let alloc = runtime.block_on(LocalAllocator
1799 .allocate(AllocSpec {
1800 extent,
1801 constraints: Default::default(),
1802 proc_name: None,
1803 transport: ChannelTransport::Local,
1804 proc_allocation_mode: Default::default(),
1805 }))
1806 .unwrap();
1807 let instance = runtime.block_on(crate::v1::testing::instance());
1808 let proc_mesh = runtime.block_on(ProcMesh::allocate(alloc)).unwrap();
1809
1810 let addr = ChannelAddr::any(ChannelTransport::Unix);
1811
1812 let actor_mesh: RootActorMesh<EchoActor> =
1813 runtime.block_on(proc_mesh.spawn(&instance, "echo", &addr)).unwrap();
1814
1815 let mut runner = TestRunner::default();
1816 let selection = gen_selection(4, actor_mesh.shape().slice().sizes().to_vec(), 0)
1817 .new_tree(&mut runner)
1818 .unwrap()
1819 .current();
1820
1821 runtime.block_on(validate_cast(&actor_mesh, actor_mesh.proc_mesh().client(), addr, selection));
1822 }
1823 }
1824
1825 proptest! {
1826 #![proptest_config(ProptestConfig {
1827 cases: 8, ..ProptestConfig::default()
1828 })]
1829 #[test]
1830 fn test_reshaped_actor_mesh_slice_cast(extent in gen_extent(1..=4, 8)) {
1831 let runtime = make_tokio_runtime();
1832 let alloc = runtime.block_on(LocalAllocator
1833 .allocate(AllocSpec {
1834 extent: extent.clone(),
1835 constraints: Default::default(),
1836 proc_name: None,
1837 transport: ChannelTransport::Local,
1838 proc_allocation_mode: Default::default(),
1839 }))
1840 .unwrap();
1841 let instance = runtime.block_on(crate::v1::testing::instance());
1842 let proc_mesh = runtime.block_on(ProcMesh::allocate(alloc)).unwrap();
1843
1844 let addr = ChannelAddr::any(ChannelTransport::Unix);
1845
1846 let actor_mesh: RootActorMesh<EchoActor> =
1847 runtime.block_on(proc_mesh.spawn(&instance, "echo", &addr)).unwrap();
1848
1849
1850 let first_label = extent.labels().first().unwrap();
1851 let slice = actor_mesh.select(first_label, 0..extent.size(first_label).unwrap()).unwrap();
1852
1853 let slice = if extent.len() >= 2 {
1855 let label = &extent.labels()[1];
1856 let size = extent.size(label).unwrap();
1857 let start = if size > 1 { 1 } else { 0 };
1858 let end = (if size > 1 { size - 1 } else { 1 }).max(start + 1);
1859 slice.select(label, start..end).unwrap()
1860 } else {
1861 slice
1862 };
1863
1864 let slice = if extent.len() >= 3 {
1865 let label = &extent.labels()[2];
1866 let size = extent.size(label).unwrap();
1867 let start = if size > 1 { 1 } else { 0 };
1868 let end = (if size > 1 { size - 1 } else { 1 }).max(start + 1);
1869 slice.select(label, start..end).unwrap()
1870 } else {
1871 slice
1872 };
1873
1874 let slice = if extent.len() >= 4 {
1875 let label = &extent.labels()[3];
1876 let size = extent.size(label).unwrap();
1877 let start = if size > 1 { 1 } else { 0 };
1878 let end = (if size > 1 { size - 1 } else { 1 }).max(start + 1);
1879 slice.select(label, start..end).unwrap()
1880 } else {
1881 slice
1882 };
1883
1884
1885 let mut runner = TestRunner::default();
1886 let selection = gen_selection(4, slice.shape().slice().sizes().to_vec(), 0)
1887 .new_tree(&mut runner)
1888 .unwrap()
1889 .current();
1890
1891 runtime.block_on(validate_cast(
1892 &slice,
1893 actor_mesh.proc_mesh().client(),
1894 addr,
1895 selection
1896 ));
1897 }
1898 }
1899
1900 proptest! {
1901 #![proptest_config(ProptestConfig {
1902 cases: 8, ..ProptestConfig::default()
1903 })]
1904 #[test]
1905 fn test_reshaped_actor_mesh_cast_with_selection(extent in gen_extent(1..=4, 8)) {
1906 let runtime = make_tokio_runtime();
1907 let alloc = runtime.block_on(LocalAllocator
1908 .allocate(AllocSpec {
1909 extent,
1910 constraints: Default::default(),
1911 proc_name: None,
1912 transport: ChannelTransport::Local,
1913 proc_allocation_mode: Default::default(),
1914 }))
1915 .unwrap();
1916 let instance = runtime.block_on(crate::v1::testing::instance());
1917 let proc_mesh = runtime.block_on(ProcMesh::allocate(alloc)).unwrap();
1918
1919 let addr = ChannelAddr::any(ChannelTransport::Unix);
1920
1921 let actor_mesh: RootActorMesh<EchoActor> =
1922 runtime.block_on(proc_mesh.spawn(&instance, "echo", &addr)).unwrap();
1923
1924 let mut runner = TestRunner::default();
1925 let selection = gen_selection(4, actor_mesh.shape().slice().sizes().to_vec(), 0)
1926 .new_tree(&mut runner)
1927 .unwrap()
1928 .current();
1929
1930 runtime.block_on(validate_cast(
1931 &actor_mesh,
1932 actor_mesh.proc_mesh().client(),
1933 addr,
1934 selection
1935 ));
1936 }
1937 }
1938 }
1939
1940 mod shim {
1941 use std::collections::HashSet;
1942
1943 use hyperactor::context::Mailbox;
1944 use ndslice::Extent;
1945 use ndslice::extent;
1946
1947 use super::*;
1948 use crate::sel;
1949
/// Converts a v1 actor mesh into a `RootActorMesh` and casts
/// `GetCastInfo` through the converted mesh. Every (point, actor) pair of
/// the v1 mesh must answer exactly once, and each answer must name this
/// test's instance as the sender.
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_basic() {
    let instance = v1::testing::instance().await;
    let hosts = v1::testing::host_mesh(extent!(host = 4)).await;
    let procs = hosts
        .spawn(instance, "test", Extent::unity())
        .await
        .unwrap();
    let actors_v1 = procs
        .spawn::<v1::testactor::TestActor>(instance, "test", &())
        .await
        .unwrap();

    // View of the same mesh through the older mesh type.
    let actors_v0: RootActorMesh<'_, _> = actors_v1.clone().into();

    let (cast_info, mut cast_info_rx) = instance.mailbox().open_port();
    let request = v1::testactor::GetCastInfo {
        cast_info: cast_info.bind(),
    };
    actors_v0.cast(instance, sel!(*), request).unwrap();

    // Tick off each (point, actor) pair as its answer arrives; a repeated
    // or unknown answer fails the assertion.
    let mut outstanding: HashSet<_> = actors_v1.iter().collect();
    while !outstanding.is_empty() {
        let (point, origin_actor_ref, sender_actor_id) = cast_info_rx.recv().await.unwrap();
        let key = (point, origin_actor_ref);
        assert!(
            outstanding.remove(&key),
            "key {:?} not present or removed twice",
            key
        );
        assert_eq!(&sender_actor_id, instance.self_id());
    }
}
1989 }
1990}