1#![allow(dead_code)] use std::collections::BTreeSet;
12use std::ops::Deref;
13
14use async_trait::async_trait;
15use hyperactor::Actor;
16use hyperactor::ActorRef;
17use hyperactor::Bind;
18use hyperactor::GangId;
19use hyperactor::GangRef;
20use hyperactor::Message;
21use hyperactor::Named;
22use hyperactor::PortHandle;
23use hyperactor::RemoteHandles;
24use hyperactor::RemoteMessage;
25use hyperactor::Unbind;
26use hyperactor::WorldId;
27use hyperactor::actor::RemoteActor;
28use hyperactor::attrs::Attrs;
29use hyperactor::attrs::declare_attrs;
30use hyperactor::cap;
31use hyperactor::cap::CanSend;
32use hyperactor::mailbox::MailboxSenderError;
33use hyperactor::mailbox::PortReceiver;
34use hyperactor::message::Castable;
35use hyperactor::message::IndexedErasedUnbound;
36use hyperactor::supervision::ActorSupervisionEvent;
37use ndslice::Range;
38use ndslice::Selection;
39use ndslice::Shape;
40use ndslice::ShapeError;
41use ndslice::SliceError;
42use ndslice::selection;
43use ndslice::selection::EvalOpts;
44use ndslice::selection::ReifySlice;
45use ndslice::selection::normal;
46use serde::Deserialize;
47use serde::Serialize;
48use tokio::sync::mpsc;
49
50use crate::CommActor;
51use crate::Mesh;
52use crate::comm::multicast::CastMessage;
53use crate::comm::multicast::CastMessageEnvelope;
54use crate::comm::multicast::Uslice;
55use crate::metrics;
56use crate::proc_mesh::ProcMesh;
57use crate::reference::ActorMeshId;
58use crate::reference::ActorMeshRef;
59use crate::reference::ProcMeshId;
60
// Header attribute naming the actor mesh that a cast message was sent to.
// `actor_mesh_cast` sets it on the headers of the `CastMessage` it sends to
// the comm actor.
declare_attrs! {
    pub attr CAST_ACTOR_MESH_ID: ActorMeshId;
}
67
68#[allow(clippy::result_large_err)] pub(crate) fn actor_mesh_cast<A, M>(
72 caps: &impl cap::CanSend,
73 actor_mesh_id: ActorMeshId,
74 comm_actor_ref: &ActorRef<CommActor>,
75 selection_of_root: Selection,
76 root_mesh_shape: &Shape,
77 cast_mesh_shape: &Shape,
78 message: M,
79) -> Result<(), CastError>
80where
81 A: RemoteActor + RemoteHandles<IndexedErasedUnbound<M>>,
82 M: Castable + RemoteMessage,
83{
84 let _ = metrics::ACTOR_MESH_CAST_DURATION.start(hyperactor::kv_pairs!(
85 "message_type" => M::typename(),
86 "message_variant" => message.arm().unwrap_or_default(),
87 ));
88
89 let message = CastMessageEnvelope::new::<A, M>(
90 actor_mesh_id.clone(),
91 caps.actor_id().clone(),
92 cast_mesh_shape.clone(),
93 message,
94 )?;
95 let cast_message = CastMessage {
96 dest: Uslice {
100 slice: root_mesh_shape.slice().clone(),
101 selection: selection_of_root,
102 },
103 message,
104 };
105
106 let mut headers = Attrs::new();
107 headers.set(CAST_ACTOR_MESH_ID, actor_mesh_id);
108
109 comm_actor_ref
110 .port()
111 .send_with_headers(caps, headers, cast_message)?;
112
113 Ok(())
114}
115
116#[allow(clippy::result_large_err)] pub(crate) fn cast_to_sliced_mesh<A, M>(
118 caps: &impl cap::CanSend,
119 actor_mesh_id: ActorMeshId,
120 comm_actor_ref: &ActorRef<CommActor>,
121 sel_of_sliced: &Selection,
122 message: M,
123 sliced_shape: &Shape,
124 root_mesh_shape: &Shape,
125) -> Result<(), CastError>
126where
127 A: RemoteActor + RemoteHandles<IndexedErasedUnbound<M>>,
128 M: Castable + RemoteMessage,
129{
130 let root_slice = root_mesh_shape.slice();
131
132 let sel_of_root = if selection::normalize(sel_of_sliced) == normal::NormalizedSelection::True {
134 root_slice.reify_slice(sliced_shape.slice())?
136 } else {
137 let ranks = sel_of_sliced
139 .eval(&EvalOpts::strict(), sliced_shape.slice())?
140 .collect::<BTreeSet<_>>();
141 Selection::of_ranks(root_slice, &ranks)?
142 };
143
144 actor_mesh_cast::<A, M>(
146 caps,
147 actor_mesh_id,
148 comm_actor_ref,
149 sel_of_root,
150 root_mesh_shape,
151 sliced_shape,
152 message,
153 )
154}
155
156#[async_trait]
158pub trait ActorMesh: Mesh<Id = ActorMeshId> {
159 type Actor: RemoteActor;
161
162 #[allow(clippy::result_large_err)] fn cast<M>(
166 &self,
167 sender: &impl CanSend,
168 selection: Selection,
169 message: M,
170 ) -> Result<(), CastError>
171 where
172 Self::Actor: RemoteHandles<IndexedErasedUnbound<M>>,
173 M: Castable + RemoteMessage,
174 {
175 actor_mesh_cast::<Self::Actor, M>(
176 sender, self.id(), self.proc_mesh().comm_actor(), selection, self.shape(), self.shape(), message, )
184 }
185
186 fn proc_mesh(&self) -> &ProcMesh;
188
189 fn name(&self) -> &str;
191
192 fn world_id(&self) -> &WorldId {
193 self.proc_mesh().world_id()
194 }
195
196 fn iter_actor_refs(&self) -> impl Iterator<Item = ActorRef<Self::Actor>> {
198 let gang: GangRef<Self::Actor> =
199 GangId(self.proc_mesh().world_id().clone(), self.name().to_string()).into();
200 self.shape().slice().iter().map(move |rank| gang.rank(rank))
201 }
202
203 async fn stop(&self) -> Result<(), anyhow::Error> {
204 self.proc_mesh().stop_actor_by_name(self.name()).await
205 }
206
207 fn bind(&self) -> ActorMeshRef<Self::Actor> {
209 ActorMeshRef::attest(
210 ActorMeshId(
211 ProcMeshId(self.world_id().to_string()),
212 self.name().to_string(),
213 ),
214 self.shape().clone(),
215 self.proc_mesh().comm_actor().clone(),
216 )
217 }
218}
219
220enum ProcMeshRef<'a> {
224 Shared(Box<dyn Deref<Target = ProcMesh> + Sync + Send>),
226 Borrowed(&'a ProcMesh),
229}
230
231impl Deref for ProcMeshRef<'_> {
232 type Target = ProcMesh;
233
234 fn deref(&self) -> &Self::Target {
235 match self {
236 Self::Shared(p) => p,
237 Self::Borrowed(p) => p, }
239 }
240}
241
242pub struct RootActorMesh<'a, A: RemoteActor> {
245 proc_mesh: ProcMeshRef<'a>,
246 name: String,
247 pub(crate) ranks: Vec<ActorRef<A>>, actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
251}
252
253impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
254 pub(crate) fn new(
255 proc_mesh: &'a ProcMesh,
256 name: String,
257 actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
258 ranks: Vec<ActorRef<A>>,
259 ) -> Self {
260 Self {
261 proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
262 name,
263 ranks,
264 actor_supervision_rx: Some(actor_supervision_rx),
265 }
266 }
267
268 pub(crate) fn new_shared<D: Deref<Target = ProcMesh> + Send + Sync + 'static>(
269 proc_mesh: D,
270 name: String,
271 actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
272 ranks: Vec<ActorRef<A>>,
273 ) -> Self {
274 Self {
275 proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
276 name,
277 ranks,
278 actor_supervision_rx: Some(actor_supervision_rx),
279 }
280 }
281
282 pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
284 self.proc_mesh.client().open_port()
285 }
286
287 pub fn events(&mut self) -> Option<ActorSupervisionEvents> {
290 self.actor_supervision_rx
291 .take()
292 .map(|actor_supervision_rx| ActorSupervisionEvents {
293 actor_supervision_rx,
294 mesh_id: self.id(),
295 })
296 }
297}
298
299pub struct ActorSupervisionEvents {
301 actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
303 mesh_id: ActorMeshId,
305}
306
307impl ActorSupervisionEvents {
308 pub async fn next(&mut self) -> Option<ActorSupervisionEvent> {
309 let result = self.actor_supervision_rx.recv().await;
310 if result.is_none() {
311 tracing::info!(
312 "supervision stream for actor mesh {:?} was closed!",
313 self.mesh_id
314 );
315 }
316 result
317 }
318}
319
320#[async_trait]
321impl<'a, A: RemoteActor> Mesh for RootActorMesh<'a, A> {
322 type Node = ActorRef<A>;
323 type Id = ActorMeshId;
324 type Sliced<'b>
325 = SlicedActorMesh<'b, A>
326 where
327 'a: 'b;
328
329 fn shape(&self) -> &Shape {
330 self.proc_mesh.shape()
331 }
332
333 fn select<R: Into<Range>>(
334 &self,
335 label: &str,
336 range: R,
337 ) -> Result<Self::Sliced<'_>, ShapeError> {
338 Ok(SlicedActorMesh(self, self.shape().select(label, range)?))
339 }
340
341 fn get(&self, rank: usize) -> Option<ActorRef<A>> {
342 self.ranks.get(rank).cloned()
343 }
344
345 fn id(&self) -> Self::Id {
346 ActorMeshId(self.proc_mesh.id(), self.name.clone())
347 }
348}
349
350impl<A: RemoteActor> ActorMesh for RootActorMesh<'_, A> {
351 type Actor = A;
352
353 fn proc_mesh(&self) -> &ProcMesh {
354 &self.proc_mesh
355 }
356
357 fn name(&self) -> &str {
358 &self.name
359 }
360}
361
362pub struct SlicedActorMesh<'a, A: RemoteActor>(&'a RootActorMesh<'a, A>, Shape);
363
364impl<'a, A: RemoteActor> SlicedActorMesh<'a, A> {
365 pub fn new(actor_mesh: &'a RootActorMesh<'a, A>, shape: Shape) -> Self {
366 Self(actor_mesh, shape)
367 }
368
369 pub fn shape(&self) -> &Shape {
370 &self.1
371 }
372}
373
374#[async_trait]
375impl<A: RemoteActor> Mesh for SlicedActorMesh<'_, A> {
376 type Node = ActorRef<A>;
377 type Id = ActorMeshId;
378 type Sliced<'b>
379 = SlicedActorMesh<'b, A>
380 where
381 Self: 'b;
382
383 fn shape(&self) -> &Shape {
384 &self.1
385 }
386
387 fn select<R: Into<Range>>(
388 &self,
389 label: &str,
390 range: R,
391 ) -> Result<Self::Sliced<'_>, ShapeError> {
392 Ok(Self(self.0, self.1.select(label, range)?))
393 }
394
395 fn get(&self, _index: usize) -> Option<ActorRef<A>> {
396 unimplemented!()
397 }
398
399 fn id(&self) -> Self::Id {
400 self.0.id()
401 }
402}
403
404impl<A: RemoteActor> ActorMesh for SlicedActorMesh<'_, A> {
405 type Actor = A;
406
407 fn proc_mesh(&self) -> &ProcMesh {
408 &self.0.proc_mesh
409 }
410
411 fn name(&self) -> &str {
412 &self.0.name
413 }
414
415 #[allow(clippy::result_large_err)] fn cast<M>(&self, sender: &impl CanSend, sel: Selection, message: M) -> Result<(), CastError>
417 where
418 Self::Actor: RemoteHandles<IndexedErasedUnbound<M>>,
419 M: Castable + RemoteMessage,
420 {
421 cast_to_sliced_mesh::<A, M>(
422 sender,
423 self.id(),
424 self.proc_mesh().comm_actor(),
425 &sel,
426 message,
427 self.shape(),
428 self.0.shape(),
429 )
430 }
431}
432
/// Errors returned when casting a message to an actor mesh.
#[derive(Debug, thiserror::Error)]
pub enum CastError {
    /// A selection was rejected; carries the selection and the shape error
    /// describing the problem.
    #[error("invalid selection {0}: {1}")]
    InvalidSelection(Selection, ShapeError),

    /// Sending to the actor at the given rank failed.
    #[error("send on rank {0}: {1}")]
    MailboxSenderError(usize, MailboxSenderError),

    /// A send failure not attributed to a rank. Presumably this is what the
    /// `?` on the comm-actor send in `actor_mesh_cast` converts into.
    #[error(transparent)]
    RootMailboxSenderError(#[from] MailboxSenderError),

    /// A shape operation failed.
    #[error(transparent)]
    ShapeError(#[from] ShapeError),

    /// A slice operation failed, e.g. presumably while reifying or evaluating
    /// a selection in `cast_to_sliced_mesh`.
    #[error(transparent)]
    SliceError(#[from] SliceError),

    /// A bincode error, presumably from serializing the message into its
    /// cast envelope.
    #[error(transparent)]
    SerializationError(#[from] bincode::Error),

    /// Any other failure.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
457
458pub(crate) mod test_util {
461 use std::collections::VecDeque;
462 use std::fmt;
463 use std::fmt::Debug;
464 use std::sync::Arc;
465
466 use anyhow::ensure;
467 use hyperactor::Context;
468 use hyperactor::Handler;
469 use hyperactor::PortRef;
470 use ndslice::extent;
471
472 use super::*;
473 use crate::comm::multicast::CastInfo;
474
475 #[derive(Debug, Default, Actor)]
480 #[hyperactor::export(
481 spawn = true,
482 handlers = [
483 Echo { cast = true },
484 GetRank { cast = true },
485 Error { cast = true },
486 Relay,
487 ],
488 )]
489 pub struct TestActor;
490
491 #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
502 pub struct GetRank(pub bool, #[binding(include)] pub PortRef<usize>);
503
504 #[async_trait]
505 impl Handler<GetRank> for TestActor {
506 async fn handle(
507 &mut self,
508 cx: &Context<Self>,
509 GetRank(ok, reply): GetRank,
510 ) -> Result<(), anyhow::Error> {
511 let (rank, _) = cx.cast_info();
512 reply.send(cx, rank)?;
513 anyhow::ensure!(ok, "intentional error!"); Ok(())
515 }
516 }
517
518 #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
519 pub struct Echo(pub String, #[binding(include)] pub PortRef<String>);
520
521 #[async_trait]
522 impl Handler<Echo> for TestActor {
523 async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
524 let Echo(message, reply_port) = message;
525 reply_port.send(cx, message)?;
526 Ok(())
527 }
528 }
529
530 #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
531 pub struct Error(pub String);
532
533 #[async_trait]
534 impl Handler<Error> for TestActor {
535 async fn handle(
536 &mut self,
537 _cx: &Context<Self>,
538 Error(error): Error,
539 ) -> Result<(), anyhow::Error> {
540 Err(anyhow::anyhow!("{}", error))
541 }
542 }
543
544 #[derive(Debug, Serialize, Deserialize, Named, Clone)]
545 pub struct Relay(pub usize, pub VecDeque<PortRef<Relay>>);
546
547 #[async_trait]
548 impl Handler<Relay> for TestActor {
549 async fn handle(
550 &mut self,
551 cx: &Context<Self>,
552 Relay(count, mut hops): Relay,
553 ) -> Result<(), anyhow::Error> {
554 ensure!(!hops.is_empty(), "relay must have at least one hop");
555 let next = hops.pop_front().unwrap();
556 next.send(cx, Relay(count + 1, hops))?;
557 Ok(())
558 }
559 }
560
561 #[hyperactor::export(
564 spawn = true,
565 handlers = [
566 Echo,
567 ],
568 )]
569 pub struct ProxyActor {
570 proc_mesh: Arc<ProcMesh>,
571 actor_mesh: RootActorMesh<'static, TestActor>,
572 }
573
574 impl fmt::Debug for ProxyActor {
575 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576 f.debug_struct("ProxyActor")
577 .field("proc_mesh", &"...")
578 .field("actor_mesh", &"...")
579 .finish()
580 }
581 }
582
583 #[async_trait]
584 impl Actor for ProxyActor {
585 type Params = ();
586
587 async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
588 use std::sync::Arc;
590
591 use crate::alloc::AllocSpec;
592 use crate::alloc::Allocator;
593 use crate::alloc::LocalAllocator;
594
595 let mut allocator = LocalAllocator;
596 let alloc = allocator
597 .allocate(AllocSpec {
598 extent: extent! { replica = 1 },
599 constraints: Default::default(),
600 })
601 .await
602 .unwrap();
603 let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
604 let leaked: &'static Arc<ProcMesh> = Box::leak(Box::new(proc_mesh));
605 let actor_mesh: RootActorMesh<'static, TestActor> =
606 leaked.spawn("echo", &()).await.unwrap();
607 Ok(Self {
608 proc_mesh: Arc::clone(leaked),
609 actor_mesh,
610 })
611 }
612 }
613
614 #[async_trait]
615 impl Handler<Echo> for ProxyActor {
616 async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
617 if std::env::var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK").is_err() {
618 let actor = self.actor_mesh.get(0).unwrap();
621
622 let (tx, mut rx) = cx.open_port();
625
626 actor.send(cx, Echo(message.0, tx.bind()))?;
627 message.1.send(cx, rx.recv().await.unwrap())?;
628
629 Ok(())
630 } else {
631 let actor: ActorRef<_> = self.actor_mesh.get(0).unwrap();
634 let (tx, mut rx) = cx.open_port::<String>();
635 actor.send(cx, Echo(message.0, tx.bind()))?;
636
637 use tokio::time::Duration;
638 use tokio::time::timeout;
639 #[allow(clippy::disallowed_methods)]
640 match timeout(Duration::from_secs(1), rx.recv()).await {
641 Ok(_) => message
642 .1
643 .send(cx, "the impossible happened".to_owned())
644 .unwrap(),
645 _ => (),
646 }
647
648 Ok(())
649 }
650 }
651 }
652}
653
654#[cfg(test)]
655mod tests {
656 use std::sync::Arc;
657
658 use hyperactor::ActorId;
659 use hyperactor::PortRef;
660 use hyperactor::ProcId;
661 use hyperactor::WorldId;
662 use hyperactor::attrs::Attrs;
663 use timed_test::async_timed_test;
664
665 use super::*;
666 use crate::proc_mesh::ProcEvent;
667
/// Defines the allocator-parametric actor mesh test suite.
///
/// Expands, in the invoking module, to a set of async test functions that
/// obtain their procs by calling `.allocate(...)` on `$allocator`.
#[macro_export]
macro_rules! actor_mesh_test_suite {
    ($allocator:expr_2021) => {
        use std::assert_matches::assert_matches;

        use ndslice::extent;
        use $crate::alloc::AllocSpec;
        use $crate::alloc::Allocator;
        use $crate::assign::Ranks;
        use $crate::sel_from_shape;
        use $crate::sel;
        use $crate::comm::multicast::set_cast_info_on_headers;
        use $crate::proc_mesh::SharedSpawnable;
        use std::collections::VecDeque;
        use hyperactor::data::Serialized;

        use super::*;
        use super::test_util::*;

        #[tokio::test]
        async fn test_proxy_mesh() {
            let alloc = $allocator
                .allocate(AllocSpec {
                    extent: extent! { replica = 1 },
                    constraints: Default::default(),
                })
                .await
                .unwrap();
            let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
            let actor_mesh: RootActorMesh<'_, ProxyActor> =
                proc_mesh.spawn("proxy", &()).await.unwrap();
            let proxy_actor = actor_mesh.get(0).unwrap();
            let (tx, mut rx) = actor_mesh.open_port::<String>();
            proxy_actor
                .send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind()))
                .unwrap();

            #[allow(clippy::disallowed_methods)]
            match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
                Ok(msg) => assert_eq!(&msg.unwrap(), "hello!"),
                Err(_) => panic!("timed out waiting for the proxied echo reply"),
            }
        }

        #[tokio::test]
        async fn test_basic() {
            let alloc = $allocator
                .allocate(AllocSpec {
                    extent: extent!(replica = 4),
                    constraints: Default::default(),
                })
                .await
                .unwrap();

            let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
            let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
            let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
            actor_mesh
                .cast(proc_mesh.client(), sel!(*), Echo("Hello".to_string(), reply_handle.bind()))
                .unwrap();
            for _ in 0..4 {
                assert_eq!(&reply_receiver.recv().await.unwrap(), "Hello");
            }
        }

        #[tokio::test]
        async fn test_ping_pong() {
            use hyperactor::test_utils::pingpong::PingPongActor;
            use hyperactor::test_utils::pingpong::PingPongMessage;
            use hyperactor::test_utils::pingpong::PingPongActorParams;

            let alloc = $allocator
                .allocate(AllocSpec {
                    extent: extent!(replica = 2),
                    constraints: Default::default(),
                })
                .await
                .unwrap();
            let mesh = ProcMesh::allocate(alloc).await.unwrap();

            let (undeliverable_msg_tx, _) = mesh.client().open_port();
            let ping_pong_actor_params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
            let actor_mesh: RootActorMesh<PingPongActor> = mesh
                .spawn::<PingPongActor>("ping-pong", &ping_pong_actor_params)
                .await
                .unwrap();

            let ping: ActorRef<PingPongActor> = actor_mesh.get(0).unwrap();
            let pong: ActorRef<PingPongActor> = actor_mesh.get(1).unwrap();
            let (done_tx, done_rx) = mesh.client().open_once_port();
            ping.send(mesh.client(), PingPongMessage(4, pong.clone(), done_tx.bind())).unwrap();

            assert!(done_rx.recv().await.unwrap());
        }

        #[tokio::test]
        async fn test_pingpong_full_mesh() {
            use hyperactor::test_utils::pingpong::PingPongActor;
            use hyperactor::test_utils::pingpong::PingPongActorParams;
            use hyperactor::test_utils::pingpong::PingPongMessage;

            use futures::future::join_all;

            const X: usize = 3;
            const Y: usize = 3;
            const Z: usize = 3;
            let alloc = $allocator
                .allocate(AllocSpec {
                    extent: extent!(x = X, y = Y, z = Z),
                    constraints: Default::default(),
                })
                .await
                .unwrap();

            let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
            let (undeliverable_tx, _undeliverable_rx) = proc_mesh.client().open_port();
            let params = PingPongActorParams::new(Some(undeliverable_tx.bind()), None);
            let actor_mesh = proc_mesh.spawn::<PingPongActor>("pingpong", &params).await.unwrap();
            let slice = actor_mesh.shape().slice();

            let mut futures = Vec::new();
            for rank in slice.iter() {
                let actor = actor_mesh.get(rank).unwrap();
                let coords = (&slice.coordinates(rank).unwrap()[..]).try_into().unwrap();
                let sizes = (&slice.sizes())[..].try_into().unwrap();
                let neighbors = ndslice::utils::stencil::moore_neighbors::<3>();
                for neighbor_coords in ndslice::utils::apply_stencil(&coords, sizes, &neighbors) {
                    if let Ok(neighbor_rank) = slice.location(&neighbor_coords) {
                        let neighbor = actor_mesh.get(neighbor_rank).unwrap();
                        let (done_tx, done_rx) = proc_mesh.client().open_once_port();
                        actor
                            .send(
                                proc_mesh.client(),
                                PingPongMessage(4, neighbor.clone(), done_tx.bind()),
                            )
                            .unwrap();
                        futures.push(done_rx.recv());
                    }
                }
            }
            let results = join_all(futures).await;
            // Each rank pings every in-bounds Moore neighbour. Along a dimension
            // of size n, the in-bounds offsets in {-1, 0, 1} summed over all
            // coordinates total 3n - 2. The product over dimensions counts all
            // (rank, neighbour) pairs including each rank with itself, which is
            // excluded. For 3x3x3 this is 7^3 - 27 = 316.
            let expected = (3 * X - 2) * (3 * Y - 2) * (3 * Z - 2) - X * Y * Z;
            assert_eq!(results.len(), expected);
            for result in results {
                assert!(result.unwrap());
            }
        }

        #[tokio::test]
        async fn test_cast() {
            let alloc = $allocator
                .allocate(AllocSpec {
                    extent: extent!(replica = 2, host = 2, gpu = 8),
                    constraints: Default::default(),
                })
                .await
                .unwrap();

            let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
            let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
            let dont_simulate_error = true;
            let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
            actor_mesh
                .cast(proc_mesh.client(), sel!(*), GetRank(dont_simulate_error, reply_handle.bind()))
                .unwrap();
            let mut ranks = Ranks::new(actor_mesh.shape().slice().len());
            while !ranks.is_full() {
                let rank = reply_receiver.recv().await.unwrap();
                assert!(ranks.insert(rank, rank).is_none(), "duplicate rank {rank}");
            }
            // Cast to a single host: only its 8 gpus should reply.
            let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
            actor_mesh
                .cast(
                    proc_mesh.client(),
                    sel_from_shape!(actor_mesh.shape(), replica = 0, host = 0),
                    GetRank(dont_simulate_error, reply_handle.bind()),
                )
                .unwrap();
            let mut ranks = Ranks::new(8);
            while !ranks.is_full() {
                let rank = reply_receiver.recv().await.unwrap();
                assert!(ranks.insert(rank, rank).is_none(), "duplicate rank {rank}");
            }
        }

        #[tokio::test]
        async fn test_inter_actor_comms() {
            let alloc = $allocator
                .allocate(AllocSpec {
                    extent: extent!(replica = 2, host = 2, gpu = 8),
                    constraints: Default::default(),
                })
                .await
                .unwrap();

            let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
            let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();

            // Relay through every actor in the mesh and finally back to the client.
            let mut hops: VecDeque<_> = actor_mesh.iter().map(|actor| actor.port()).collect();
            let (handle, mut rx) = proc_mesh.client().open_port();
            hops.push_back(handle.bind());
            hops.pop_front()
                .unwrap()
                .send(proc_mesh.client(), Relay(0, hops))
                .unwrap();
            assert_matches!(
                rx.recv().await.unwrap(),
                Relay(count, hops)
                    if count == actor_mesh.shape().slice().len()
                    && hops.is_empty());
        }

        #[tokio::test]
        async fn test_inter_proc_mesh_comms() {
            let mut meshes = Vec::new();
            for _ in 0..2 {
                let alloc = $allocator
                    .allocate(AllocSpec {
                        extent: extent!(replica = 1),
                        constraints: Default::default(),
                    })
                    .await
                    .unwrap();

                let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
                let proc_mesh_clone = Arc::clone(&proc_mesh);
                let actor_mesh: RootActorMesh<TestActor> = proc_mesh_clone.spawn("echo", &()).await.unwrap();
                meshes.push((proc_mesh, actor_mesh));
            }

            let mut hops: VecDeque<_> = meshes
                .iter()
                .flat_map(|(_proc_mesh, actor_mesh)| actor_mesh.iter())
                .map(|actor| actor.port())
                .collect();
            let num_hops = hops.len();

            let client = meshes[0].0.client();
            let (handle, mut rx) = client.open_port();
            hops.push_back(handle.bind());
            hops.pop_front()
                .unwrap()
                .send(client, Relay(0, hops))
                .unwrap();
            assert_matches!(
                rx.recv().await.unwrap(),
                Relay(count, hops)
                    if count == num_hops
                    && hops.is_empty());
        }

        #[async_timed_test(timeout_secs = 60)]
        async fn test_actor_mesh_cast() {
            use $crate::comm::test_utils::TestActor as CastTestActor;
            use $crate::comm::test_utils::TestActorParams as CastTestActorParams;
            use $crate::comm::test_utils::TestMessage as CastTestMessage;

            let extent = extent!(replica = 4, host = 4, gpu = 4);
            let num_actors = extent.len();
            let alloc = $allocator
                .allocate(AllocSpec {
                    extent,
                    constraints: Default::default(),
                })
                .await
                .unwrap();

            let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();

            let (tx, mut rx) = hyperactor::mailbox::open_port(proc_mesh.client());
            let params = CastTestActorParams { forward_port: tx.bind() };
            let actor_mesh: RootActorMesh<CastTestActor> = proc_mesh.spawn("actor", &params).await.unwrap();

            actor_mesh.cast(proc_mesh.client(), sel!(*), CastTestMessage::Forward("abc".to_string())).unwrap();

            for _ in 0..num_actors {
                assert_eq!(rx.recv().await.unwrap(), CastTestMessage::Forward("abc".to_string()));
            }

            proc_mesh.events().unwrap().into_alloc().stop_and_wait().await.unwrap();
        }

        #[tokio::test]
        async fn test_delivery_failure() {
            let alloc = $allocator
                .allocate(AllocSpec {
                    extent: extent!(replica = 1),
                    constraints: Default::default(),
                })
                .await
                .unwrap();

            let name = alloc.name().to_string();
            let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
            let mut events = mesh.events().unwrap();

            // Send to an actor that was never spawned.
            let unmonitored_reply_to = mesh.client().open_port::<usize>().0.bind();
            let bad_actor = ActorRef::<TestActor>::attest(ActorId(ProcId::Ranked(WorldId(name.clone()), 0), "foo".into(), 0));
            bad_actor.send(mesh.client(), GetRank(true, unmonitored_reply_to)).unwrap();

            assert_matches!(
                events.next().await.unwrap(),
                ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
            );
        }

        #[tokio::test]
        async fn test_send_with_headers() {
            let alloc = $allocator
                .allocate(AllocSpec {
                    extent: extent!(replica = 1),
                    constraints: Default::default(),
                })
                .await
                .unwrap();

            let mesh = ProcMesh::allocate(alloc).await.unwrap();
            let (reply_port_handle, mut reply_port_receiver) = mesh.client().open_port::<usize>();
            let reply_port = reply_port_handle.bind();

            let actor_mesh: RootActorMesh<TestActor> = mesh.spawn("test", &()).await.unwrap();
            let actor_ref = actor_mesh.get(0).unwrap();
            let mut headers = Attrs::new();
            set_cast_info_on_headers(&mut headers, 0, Shape::unity(), mesh.client().actor_id().clone());
            actor_ref.send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone())).unwrap();
            assert_eq!(0, reply_port_receiver.recv().await.unwrap());

            set_cast_info_on_headers(&mut headers, 1, Shape::unity(), mesh.client().actor_id().clone());
            actor_ref.port()
                .send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone()))
                .unwrap();
            assert_eq!(1, reply_port_receiver.recv().await.unwrap());

            set_cast_info_on_headers(&mut headers, 2, Shape::unity(), mesh.client().actor_id().clone());
            actor_ref.actor_id()
                .port_id(GetRank::port())
                .send_with_headers(
                    mesh.client(),
                    &Serialized::serialize(&GetRank(true, reply_port)).unwrap(),
                    headers
                );
            assert_eq!(2, reply_port_receiver.recv().await.unwrap());
        }
    }
}
1034
1035 mod local {
1036 use crate::alloc::local::LocalAllocator;
1037
1038 actor_mesh_test_suite!(LocalAllocator);
1039
1040 #[tokio::test]
1041 async fn test_send_failure() {
1042 use hyperactor::test_utils::pingpong::PingPongActor;
1043 use hyperactor::test_utils::pingpong::PingPongActorParams;
1044 use hyperactor::test_utils::pingpong::PingPongMessage;
1045
1046 use crate::alloc::ProcStopReason;
1047 use crate::proc_mesh::ProcEvent;
1048
1049 let config = hyperactor::config::global::lock();
1050 let _guard = config.override_key(
1051 hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1052 tokio::time::Duration::from_secs(1),
1053 );
1054
1055 let alloc = LocalAllocator
1056 .allocate(AllocSpec {
1057 extent: extent!(replica = 2),
1058 constraints: Default::default(),
1059 })
1060 .await
1061 .unwrap();
1062 let monkey = alloc.chaos_monkey();
1063 let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1064 let mut events = mesh.events().unwrap();
1065
1066 let ping_pong_actor_params = PingPongActorParams::new(
1067 Some(PortRef::attest_message_port(mesh.client().actor_id())),
1068 None,
1069 );
1070 let actor_mesh: RootActorMesh<PingPongActor> = mesh
1071 .spawn::<PingPongActor>("ping-pong", &ping_pong_actor_params)
1072 .await
1073 .unwrap();
1074
1075 let ping: ActorRef<PingPongActor> = actor_mesh.get(0).unwrap();
1076 let pong: ActorRef<PingPongActor> = actor_mesh.get(1).unwrap();
1077
1078 monkey(0, ProcStopReason::Killed(0, false));
1080 assert_matches!(
1081 events.next().await.unwrap(),
1082 ProcEvent::Stopped(0, ProcStopReason::Killed(0, false))
1083 );
1084
1085 let (unmonitored_done_tx, _) = mesh.client().open_once_port();
1088 ping.send(
1089 mesh.client(),
1090 PingPongMessage(1, pong.clone(), unmonitored_done_tx.bind()),
1091 )
1092 .unwrap();
1093
1094 assert_matches!(
1096 events.next().await.unwrap(),
1097 ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
1098 );
1099
1100 let (unmonitored_done_tx, _) = mesh.client().open_once_port();
1103 pong.send(
1104 mesh.client(),
1105 PingPongMessage(1, ping.clone(), unmonitored_done_tx.bind()),
1106 )
1107 .unwrap();
1108
1109 assert_matches!(
1111 events.next().await.unwrap(),
1112 ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
1113 );
1114 }
1115
1116 #[tokio::test]
1117 async fn test_cast_failure() {
1118 use crate::alloc::ProcStopReason;
1119 use crate::proc_mesh::ProcEvent;
1120 use crate::sel;
1121
1122 let alloc = LocalAllocator
1123 .allocate(AllocSpec {
1124 extent: extent!(replica = 1),
1125 constraints: Default::default(),
1126 })
1127 .await
1128 .unwrap();
1129
1130 let stop = alloc.stopper();
1131 let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1132 let mut events = mesh.events().unwrap();
1133
1134 let actor_mesh = mesh
1135 .spawn::<TestActor>("reply-then-fail", &())
1136 .await
1137 .unwrap();
1138
1139 let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1142 actor_mesh
1143 .cast(mesh.client(), sel!(*), GetRank(false, reply_handle.bind()))
1144 .unwrap();
1145 let rank = reply_receiver.recv().await.unwrap();
1146 assert_eq!(rank, 0);
1147
1148 assert_matches!(
1150 events.next().await.unwrap(),
1151 ProcEvent::Crashed(0, reason) if reason.contains("intentional error!")
1152 );
1153
1154 let (reply_handle, _) = actor_mesh.open_port();
1156 actor_mesh
1157 .cast(mesh.client(), sel!(*), GetRank(false, reply_handle.bind()))
1158 .unwrap();
1159
1160 assert_matches!(
1162 events.next().await.unwrap(),
1163 ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
1164 );
1165
1166 stop();
1168 assert_matches!(
1169 events.next().await.unwrap(),
1170 ProcEvent::Stopped(0, ProcStopReason::Stopped),
1171 );
1172 assert!(events.next().await.is_none());
1173 }
1174
1175 #[tracing_test::traced_test]
1176 #[tokio::test]
1177 async fn test_stop_actor_mesh() {
1178 use hyperactor::test_utils::pingpong::PingPongActor;
1179 use hyperactor::test_utils::pingpong::PingPongActorParams;
1180 use hyperactor::test_utils::pingpong::PingPongMessage;
1181
1182 let config = hyperactor::config::global::lock();
1183 let _guard = config.override_key(
1184 hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1185 tokio::time::Duration::from_secs(1),
1186 );
1187
1188 let alloc = LocalAllocator
1189 .allocate(AllocSpec {
1190 extent: extent!(replica = 2),
1191 constraints: Default::default(),
1192 })
1193 .await
1194 .unwrap();
1195 let mesh = ProcMesh::allocate(alloc).await.unwrap();
1196
1197 let ping_pong_actor_params = PingPongActorParams::new(
1198 Some(PortRef::attest_message_port(mesh.client().actor_id())),
1199 None,
1200 );
1201 let mesh_one: RootActorMesh<PingPongActor> = mesh
1202 .spawn::<PingPongActor>("mesh_one", &ping_pong_actor_params)
1203 .await
1204 .unwrap();
1205
1206 let mesh_two: RootActorMesh<PingPongActor> = mesh
1207 .spawn::<PingPongActor>("mesh_two", &ping_pong_actor_params)
1208 .await
1209 .unwrap();
1210
1211 mesh_two.stop().await.unwrap();
1212
1213 let ping_two: ActorRef<PingPongActor> = mesh_two.get(0).unwrap();
1214 let pong_two: ActorRef<PingPongActor> = mesh_two.get(1).unwrap();
1215
1216 assert!(logs_contain(&format!(
1217 "stopped actor {}",
1218 ping_two.actor_id()
1219 )));
1220 assert!(logs_contain(&format!(
1221 "stopped actor {}",
1222 pong_two.actor_id()
1223 )));
1224
1225 let ping_one: ActorRef<PingPongActor> = mesh_one.get(0).unwrap();
1227 let pong_one: ActorRef<PingPongActor> = mesh_one.get(1).unwrap();
1228 let (done_tx, done_rx) = mesh.client().open_once_port();
1229 pong_one
1230 .send(
1231 mesh.client(),
1232 PingPongMessage(1, ping_one.clone(), done_tx.bind()),
1233 )
1234 .unwrap();
1235 assert!(done_rx.recv().await.is_ok());
1236 }
1237 } mod process {
1240 use tokio::process::Command;
1241
1242 use crate::alloc::process::ProcessAllocator;
1243
1244 fn process_allocator() -> ProcessAllocator {
1245 ProcessAllocator::new(Command::new(
1246 buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap(),
1247 ))
1248 }
1249
// Instantiate `actor_mesh_test_suite!` with `process_allocator()` as the
// allocator expression; only compiled in fbcode builds.
#[cfg(fbcode_build)]
actor_mesh_test_suite!(process_allocator());
1252
#[cfg(fbcode_build)]
#[async_timed_test(timeout_secs = 60)]
async fn test_fail_sending_to_comm_actor_during_cast() {
    // Codec frame ceiling; the cast payload below is one byte larger.
    // NOTE(review): this allocates a ~1 GB string for the payload; a much
    // smaller ceiling would likely exercise the same failure path more
    // cheaply — confirm nothing else in the test needs frames this large.
    let frame_limit: usize = 1000 * 1000 * 1000;

    let config = hyperactor::config::global::lock();
    let _delivery_timeout_guard = config.override_key(
        hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
        Duration::from_secs(1),
    );
    let _frame_limit_guard =
        config.override_key(hyperactor::config::CODEC_MAX_FRAME_LENGTH, frame_limit);

    let alloc = process_allocator()
        .allocate(AllocSpec {
            extent: extent!(replica = 1),
            constraints: Default::default(),
        })
        .await
        .unwrap();

    let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
    let mut proc_events = proc_mesh.events().unwrap();
    let mut actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();

    // The receiver is bound to a named variable (not `_`) so it stays alive
    // for the rest of the test.
    let (reply_port, _reply_rx) = actor_mesh.open_port();
    let oversized = "a".repeat(frame_limit + 1);
    assert!(oversized.len() > frame_limit);
    actor_mesh
        .cast(proc_mesh.client(), sel!(*), Echo(oversized, reply_port.bind()))
        .unwrap();

    // The proc is expected to report a crash.
    {
        let crash = proc_events.next().await.unwrap();
        assert_matches!(crash, ProcEvent::Crashed(_, _));
    }
    // The actor mesh must then emit a supervision event for one of its actors.
    {
        let mut supervision = actor_mesh.events().unwrap();
        let failure = supervision.next().await.unwrap();
        assert_eq!(failure.actor_id.name(), &actor_mesh.name);
    }
}
1312
#[cfg(fbcode_build)]
#[tokio::test]
async fn test_router_undeliverable_return() {
    use ndslice::extent;

    use super::test_util::*;
    use crate::alloc::AllocSpec;
    use crate::alloc::Allocator;

    // Set before `ProcMesh::allocate` so that (presumably) the spawned procs
    // observe it; judging by its name it disables the mesh router's global
    // fallback, which is what makes the reply below undeliverable.
    const NO_GLOBAL_FALLBACK_VAR: &str = "HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK";

    // Removes the wrapped environment variable when dropped. Previously the
    // variable was only removed by a trailing statement, so any failing
    // assertion (or the `panic!` arm below) leaked the setting into every
    // other test running in the same process.
    struct UnsetEnvOnDrop(&'static str);

    impl Drop for UnsetEnvOnDrop {
        fn drop(&mut self) {
            // SAFETY: `remove_var` is unsound if another thread reads or
            // writes the environment concurrently. This relies on the same
            // assumption the test already makes for `set_var`: nothing else
            // in the process touches the environment while it runs.
            unsafe { std::env::remove_var(self.0) };
        }
    }

    let alloc = process_allocator()
        .allocate(AllocSpec {
            extent: extent! { replica = 1 },
            constraints: Default::default(),
        })
        .await
        .unwrap();

    // SAFETY: same assumption as in `UnsetEnvOnDrop::drop` — no concurrent
    // environment access from other threads while this test runs.
    unsafe { std::env::set_var(NO_GLOBAL_FALLBACK_VAR, "1") };
    let fallback_disabled = UnsetEnvOnDrop(NO_GLOBAL_FALLBACK_VAR);

    let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
    let mut proc_events = proc_mesh.events().unwrap();
    let mut actor_mesh: RootActorMesh<'_, ProxyActor> =
        proc_mesh.spawn("proxy", &()).await.unwrap();
    let mut actor_events = actor_mesh.events().unwrap();

    let proxy_actor = actor_mesh.get(0).unwrap();
    let (tx, mut rx) = actor_mesh.open_port::<String>();
    proxy_actor
        .send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind()))
        .unwrap();

    // The echo reply must never arrive. Instead the proc crashes with an
    // "undeliverable" reason and the actor mesh reports a supervision event.
    #[allow(clippy::disallowed_methods)]
    match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
        Ok(_) => panic!("the impossible happened"),
        Err(_) => {
            assert_matches!(
                proc_events.next().await.unwrap(),
                ProcEvent::Crashed(0, reason) if reason.contains("undeliverable")
            );
            assert_eq!(
                actor_events.next().await.unwrap().actor_id.name(),
                &actor_mesh.name
            );
        }
    }

    // Clear the variable at the same point as before on the success path; on
    // a panic the guard's destructor clears it during unwinding instead.
    drop(fallback_disabled);
}
1368 }
1369
1370 mod sim {
1371 use crate::alloc::sim::SimAllocator;
1372
1373 actor_mesh_test_suite!(SimAllocator::new_and_start_simnet());
1374 }
1375}