hyperactor_mesh/
actor_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(dead_code)] // until used publically
10
11use std::collections::BTreeSet;
12use std::ops::Deref;
13
14use async_trait::async_trait;
15use hyperactor::Actor;
16use hyperactor::ActorRef;
17use hyperactor::Bind;
18use hyperactor::GangId;
19use hyperactor::GangRef;
20use hyperactor::Message;
21use hyperactor::Named;
22use hyperactor::PortHandle;
23use hyperactor::RemoteHandles;
24use hyperactor::RemoteMessage;
25use hyperactor::Unbind;
26use hyperactor::WorldId;
27use hyperactor::actor::RemoteActor;
28use hyperactor::attrs::Attrs;
29use hyperactor::attrs::declare_attrs;
30use hyperactor::cap;
31use hyperactor::cap::CanSend;
32use hyperactor::mailbox::MailboxSenderError;
33use hyperactor::mailbox::PortReceiver;
34use hyperactor::message::Castable;
35use hyperactor::message::IndexedErasedUnbound;
36use hyperactor::supervision::ActorSupervisionEvent;
37use ndslice::Range;
38use ndslice::Selection;
39use ndslice::Shape;
40use ndslice::ShapeError;
41use ndslice::SliceError;
42use ndslice::selection;
43use ndslice::selection::EvalOpts;
44use ndslice::selection::ReifySlice;
45use ndslice::selection::normal;
46use serde::Deserialize;
47use serde::Serialize;
48use tokio::sync::mpsc;
49
50use crate::CommActor;
51use crate::Mesh;
52use crate::comm::multicast::CastMessage;
53use crate::comm::multicast::CastMessageEnvelope;
54use crate::comm::multicast::Uslice;
55use crate::metrics;
56use crate::proc_mesh::ProcMesh;
57use crate::reference::ActorMeshId;
58use crate::reference::ActorMeshRef;
59use crate::reference::ProcMeshId;
60
declare_attrs! {
    /// Which mesh this message was cast to. Used for undeliverable message
    /// handling, where the CastMessageEnvelope is serialized, and its content
    /// cannot be inspected.
    ///
    /// Set by `actor_mesh_cast` on the headers of the `CastMessage` it sends
    /// to the comm actor.
    pub attr CAST_ACTOR_MESH_ID: ActorMeshId;
}
67
68/// Common implementation for `ActorMesh`s and `ActorMeshRef`s to cast
69/// an `M`-typed message
70#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
71pub(crate) fn actor_mesh_cast<A, M>(
72    caps: &impl cap::CanSend,
73    actor_mesh_id: ActorMeshId,
74    comm_actor_ref: &ActorRef<CommActor>,
75    selection_of_root: Selection,
76    root_mesh_shape: &Shape,
77    cast_mesh_shape: &Shape,
78    message: M,
79) -> Result<(), CastError>
80where
81    A: RemoteActor + RemoteHandles<IndexedErasedUnbound<M>>,
82    M: Castable + RemoteMessage,
83{
84    let _ = metrics::ACTOR_MESH_CAST_DURATION.start(hyperactor::kv_pairs!(
85        "message_type" => M::typename(),
86        "message_variant" => message.arm().unwrap_or_default(),
87    ));
88
89    let message = CastMessageEnvelope::new::<A, M>(
90        actor_mesh_id.clone(),
91        caps.actor_id().clone(),
92        cast_mesh_shape.clone(),
93        message,
94    )?;
95    let cast_message = CastMessage {
96        // Note: `dest` is on the root mesh' shape, which could be different
97        // from the cast mesh's shape if the cast is on a view, e.g. a sliced
98        // mesh.
99        dest: Uslice {
100            slice: root_mesh_shape.slice().clone(),
101            selection: selection_of_root,
102        },
103        message,
104    };
105
106    let mut headers = Attrs::new();
107    headers.set(CAST_ACTOR_MESH_ID, actor_mesh_id);
108
109    comm_actor_ref
110        .port()
111        .send_with_headers(caps, headers, cast_message)?;
112
113    Ok(())
114}
115
116#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
117pub(crate) fn cast_to_sliced_mesh<A, M>(
118    caps: &impl cap::CanSend,
119    actor_mesh_id: ActorMeshId,
120    comm_actor_ref: &ActorRef<CommActor>,
121    sel_of_sliced: &Selection,
122    message: M,
123    sliced_shape: &Shape,
124    root_mesh_shape: &Shape,
125) -> Result<(), CastError>
126where
127    A: RemoteActor + RemoteHandles<IndexedErasedUnbound<M>>,
128    M: Castable + RemoteMessage,
129{
130    let root_slice = root_mesh_shape.slice();
131
132    // Casting to `*`?
133    let sel_of_root = if selection::normalize(sel_of_sliced) == normal::NormalizedSelection::True {
134        // Reify this view into base.
135        root_slice.reify_slice(sliced_shape.slice())?
136    } else {
137        // No, fall back on `of_ranks`.
138        let ranks = sel_of_sliced
139            .eval(&EvalOpts::strict(), sliced_shape.slice())?
140            .collect::<BTreeSet<_>>();
141        Selection::of_ranks(root_slice, &ranks)?
142    };
143
144    // Cast.
145    actor_mesh_cast::<A, M>(
146        caps,
147        actor_mesh_id,
148        comm_actor_ref,
149        sel_of_root,
150        root_mesh_shape,
151        sliced_shape,
152        message,
153    )
154}
155
156/// A mesh of actors, all of which reside on the same [`ProcMesh`].
157#[async_trait]
158pub trait ActorMesh: Mesh<Id = ActorMeshId> {
159    /// The type of actor in the mesh.
160    type Actor: RemoteActor;
161
162    /// Cast an `M`-typed message to the ranks selected by `sel` in
163    /// this ActorMesh.
164    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
165    fn cast<M>(
166        &self,
167        sender: &impl CanSend,
168        selection: Selection,
169        message: M,
170    ) -> Result<(), CastError>
171    where
172        Self::Actor: RemoteHandles<IndexedErasedUnbound<M>>,
173        M: Castable + RemoteMessage,
174    {
175        actor_mesh_cast::<Self::Actor, M>(
176            sender,                        // send capability
177            self.id(),                     // actor mesh id (destination mesh)
178            self.proc_mesh().comm_actor(), // comm actor
179            selection,                     // the selected actors
180            self.shape(),                  // root mesh shape
181            self.shape(),                  // cast mesh shape
182            message,                       // the message
183        )
184    }
185
186    /// The ProcMesh on top of which this actor mesh is spawned.
187    fn proc_mesh(&self) -> &ProcMesh;
188
189    /// The name given to the actors in this mesh.
190    fn name(&self) -> &str;
191
192    fn world_id(&self) -> &WorldId {
193        self.proc_mesh().world_id()
194    }
195
196    /// Iterate over all `ActorRef<Self::Actor>` in this mesh.
197    fn iter_actor_refs(&self) -> impl Iterator<Item = ActorRef<Self::Actor>> {
198        let gang: GangRef<Self::Actor> =
199            GangId(self.proc_mesh().world_id().clone(), self.name().to_string()).into();
200        self.shape().slice().iter().map(move |rank| gang.rank(rank))
201    }
202
203    async fn stop(&self) -> Result<(), anyhow::Error> {
204        self.proc_mesh().stop_actor_by_name(self.name()).await
205    }
206
207    /// Get a serializeable reference to this mesh similar to ActorHandle::bind
208    fn bind(&self) -> ActorMeshRef<Self::Actor> {
209        ActorMeshRef::attest(
210            ActorMeshId(
211                ProcMeshId(self.world_id().to_string()),
212                self.name().to_string(),
213            ),
214            self.shape().clone(),
215            self.proc_mesh().comm_actor().clone(),
216        )
217    }
218}
219
/// Abstracts over shared and borrowed references to a [`ProcMesh`].
/// Given a shared ProcMesh, we can obtain a `RootActorMesh<'static, _>`
/// for it, useful when lifetime must be managed dynamically.
enum ProcMeshRef<'a> {
    /// The proc mesh is held through a boxed smart pointer (anything that
    /// derefs to a `ProcMesh`), so this variant carries no borrowed lifetime.
    Shared(Box<dyn Deref<Target = ProcMesh> + Sync + Send>),
    /// The reference is borrowed with a parameterized
    /// lifetime.
    Borrowed(&'a ProcMesh),
}
230
231impl Deref for ProcMeshRef<'_> {
232    type Target = ProcMesh;
233
234    fn deref(&self) -> &Self::Target {
235        match self {
236            Self::Shared(p) => p,
237            Self::Borrowed(p) => p, // p: &ProcMesh
238        }
239    }
240}
241
/// A mesh of actor instances. ActorMeshes are obtained by spawning an
/// actor on a [`ProcMesh`].
pub struct RootActorMesh<'a, A: RemoteActor> {
    // The proc mesh hosting the actors: either borrowed for `'a` or held
    // through a shared smart pointer.
    proc_mesh: ProcMeshRef<'a>,
    // The name the actors were spawned under. Combined with the proc mesh id
    // to form this mesh's `ActorMeshId`.
    name: String,
    // Actor references looked up by rank in `Mesh::get`.
    pub(crate) ranks: Vec<ActorRef<A>>, // temporary until we remove `ArcActorMesh`.
    // The receiver of supervision events. It is None if it has been transferred to
    // an actor event observer.
    actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
}
252
253impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
254    pub(crate) fn new(
255        proc_mesh: &'a ProcMesh,
256        name: String,
257        actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
258        ranks: Vec<ActorRef<A>>,
259    ) -> Self {
260        Self {
261            proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
262            name,
263            ranks,
264            actor_supervision_rx: Some(actor_supervision_rx),
265        }
266    }
267
268    pub(crate) fn new_shared<D: Deref<Target = ProcMesh> + Send + Sync + 'static>(
269        proc_mesh: D,
270        name: String,
271        actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
272        ranks: Vec<ActorRef<A>>,
273    ) -> Self {
274        Self {
275            proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
276            name,
277            ranks,
278            actor_supervision_rx: Some(actor_supervision_rx),
279        }
280    }
281
282    /// Open a port on this ActorMesh.
283    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
284        self.proc_mesh.client().open_port()
285    }
286
287    /// An event stream of actor events. Each RootActorMesh can produce only one such
288    /// stream, returning None after the first call.
289    pub fn events(&mut self) -> Option<ActorSupervisionEvents> {
290        self.actor_supervision_rx
291            .take()
292            .map(|actor_supervision_rx| ActorSupervisionEvents {
293                actor_supervision_rx,
294                mesh_id: self.id(),
295            })
296    }
297}
298
/// Supervision event stream for actor mesh. It emits actor supervision events.
pub struct ActorSupervisionEvents {
    // The receiver of supervision events from proc mesh.
    actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    // The id (not just the name) of the actor mesh these events belong to.
    // It appears in the log message emitted when the stream closes.
    mesh_id: ActorMeshId,
}
306
307impl ActorSupervisionEvents {
308    pub async fn next(&mut self) -> Option<ActorSupervisionEvent> {
309        let result = self.actor_supervision_rx.recv().await;
310        if result.is_none() {
311            tracing::info!(
312                "supervision stream for actor mesh {:?} was closed!",
313                self.mesh_id
314            );
315        }
316        result
317    }
318}
319
320#[async_trait]
321impl<'a, A: RemoteActor> Mesh for RootActorMesh<'a, A> {
322    type Node = ActorRef<A>;
323    type Id = ActorMeshId;
324    type Sliced<'b>
325        = SlicedActorMesh<'b, A>
326    where
327        'a: 'b;
328
329    fn shape(&self) -> &Shape {
330        self.proc_mesh.shape()
331    }
332
333    fn select<R: Into<Range>>(
334        &self,
335        label: &str,
336        range: R,
337    ) -> Result<Self::Sliced<'_>, ShapeError> {
338        Ok(SlicedActorMesh(self, self.shape().select(label, range)?))
339    }
340
341    fn get(&self, rank: usize) -> Option<ActorRef<A>> {
342        self.ranks.get(rank).cloned()
343    }
344
345    fn id(&self) -> Self::Id {
346        ActorMeshId(self.proc_mesh.id(), self.name.clone())
347    }
348}
349
350impl<A: RemoteActor> ActorMesh for RootActorMesh<'_, A> {
351    type Actor = A;
352
353    fn proc_mesh(&self) -> &ProcMesh {
354        &self.proc_mesh
355    }
356
357    fn name(&self) -> &str {
358        &self.name
359    }
360}
361
/// A view onto part of a [`RootActorMesh`].
///
/// Field `.0` is the root mesh the view was sliced from. Field `.1` is the
/// narrowed shape of the view, produced by `Shape::select` on the root's
/// shape.
pub struct SlicedActorMesh<'a, A: RemoteActor>(&'a RootActorMesh<'a, A>, Shape);
363
364impl<'a, A: RemoteActor> SlicedActorMesh<'a, A> {
365    pub fn new(actor_mesh: &'a RootActorMesh<'a, A>, shape: Shape) -> Self {
366        Self(actor_mesh, shape)
367    }
368
369    pub fn shape(&self) -> &Shape {
370        &self.1
371    }
372}
373
374#[async_trait]
375impl<A: RemoteActor> Mesh for SlicedActorMesh<'_, A> {
376    type Node = ActorRef<A>;
377    type Id = ActorMeshId;
378    type Sliced<'b>
379        = SlicedActorMesh<'b, A>
380    where
381        Self: 'b;
382
383    fn shape(&self) -> &Shape {
384        &self.1
385    }
386
387    fn select<R: Into<Range>>(
388        &self,
389        label: &str,
390        range: R,
391    ) -> Result<Self::Sliced<'_>, ShapeError> {
392        Ok(Self(self.0, self.1.select(label, range)?))
393    }
394
395    fn get(&self, _index: usize) -> Option<ActorRef<A>> {
396        unimplemented!()
397    }
398
399    fn id(&self) -> Self::Id {
400        self.0.id()
401    }
402}
403
404impl<A: RemoteActor> ActorMesh for SlicedActorMesh<'_, A> {
405    type Actor = A;
406
407    fn proc_mesh(&self) -> &ProcMesh {
408        &self.0.proc_mesh
409    }
410
411    fn name(&self) -> &str {
412        &self.0.name
413    }
414
415    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
416    fn cast<M>(&self, sender: &impl CanSend, sel: Selection, message: M) -> Result<(), CastError>
417    where
418        Self::Actor: RemoteHandles<IndexedErasedUnbound<M>>,
419        M: Castable + RemoteMessage,
420    {
421        cast_to_sliced_mesh::<A, M>(
422            /*caps=*/ sender,
423            /*actor_mesh_id=*/ self.id(),
424            /*comm_actor_ref*/ self.proc_mesh().comm_actor(),
425            /*sel_of_sliced=*/ &sel,
426            /*message=*/ message,
427            /*sliced_shape=*/ self.shape(),
428            /*root_mesh_shape=*/ self.0.shape(),
429        )
430    }
431}
432
/// The type of error of casting operations.
#[derive(Debug, thiserror::Error)]
pub enum CastError {
    /// A selection could not be applied; carries the selection and the
    /// underlying shape error.
    #[error("invalid selection {0}: {1}")]
    InvalidSelection(Selection, ShapeError),

    /// A send failed. The `usize` is the rank the message was destined for.
    #[error("send on rank {0}: {1}")]
    MailboxSenderError(usize, MailboxSenderError),

    /// A send failed without rank context. Produced via `From`, e.g. by `?`
    /// on a send that returns `MailboxSenderError` (presumably the comm-actor
    /// send in `actor_mesh_cast`; verify).
    #[error(transparent)]
    RootMailboxSenderError(#[from] MailboxSenderError),

    /// A shape operation failed.
    #[error(transparent)]
    ShapeError(#[from] ShapeError),

    /// A slice operation failed (presumably while translating a selection
    /// between a view and its root; verify).
    #[error(transparent)]
    SliceError(#[from] SliceError),

    /// Serializing the message failed.
    #[error(transparent)]
    SerializationError(#[from] bincode::Error),

    /// Any other error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
457
458// This has to be compiled outside of test mode because the bootstrap binary
459// is not built in test mode, and requires access to TestActor.
460pub(crate) mod test_util {
461    use std::collections::VecDeque;
462    use std::fmt;
463    use std::fmt::Debug;
464    use std::sync::Arc;
465
466    use anyhow::ensure;
467    use hyperactor::Context;
468    use hyperactor::Handler;
469    use hyperactor::PortRef;
470    use ndslice::extent;
471
472    use super::*;
473    use crate::comm::multicast::CastInfo;
474
475    // This can't be defined under a `#[cfg(test)]` because there needs to
476    // be an entry in the spawnable actor registry in the executable
477    // 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
478    // mesh test suite.
479    #[derive(Debug, Default, Actor)]
480    #[hyperactor::export(
481        spawn = true,
482        handlers = [
483            Echo { cast = true },
484            GetRank { cast = true },
485            Error { cast = true },
486            Relay,
487        ],
488    )]
489    pub struct TestActor;
490
491    /// Request message to retrieve the actor's rank.
492    ///
493    /// The `bool` in the tuple controls the outcome of the handler:
494    /// - If `true`, the handler will send the rank and return
495    ///   `Ok(())`.
496    /// - If `false`, the handler will still send the rank, but return
497    ///   an error (`Err(...)`).
498    ///
499    /// This is useful for testing both successful and failing
500    /// responses from a single message type.
501    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
502    pub struct GetRank(pub bool, #[binding(include)] pub PortRef<usize>);
503
504    #[async_trait]
505    impl Handler<GetRank> for TestActor {
506        async fn handle(
507            &mut self,
508            cx: &Context<Self>,
509            GetRank(ok, reply): GetRank,
510        ) -> Result<(), anyhow::Error> {
511            let (rank, _) = cx.cast_info();
512            reply.send(cx, rank)?;
513            anyhow::ensure!(ok, "intentional error!"); // If `!ok` exit with `Err()`.
514            Ok(())
515        }
516    }
517
518    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
519    pub struct Echo(pub String, #[binding(include)] pub PortRef<String>);
520
521    #[async_trait]
522    impl Handler<Echo> for TestActor {
523        async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
524            let Echo(message, reply_port) = message;
525            reply_port.send(cx, message)?;
526            Ok(())
527        }
528    }
529
530    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
531    pub struct Error(pub String);
532
533    #[async_trait]
534    impl Handler<Error> for TestActor {
535        async fn handle(
536            &mut self,
537            _cx: &Context<Self>,
538            Error(error): Error,
539        ) -> Result<(), anyhow::Error> {
540            Err(anyhow::anyhow!("{}", error))
541        }
542    }
543
544    #[derive(Debug, Serialize, Deserialize, Named, Clone)]
545    pub struct Relay(pub usize, pub VecDeque<PortRef<Relay>>);
546
547    #[async_trait]
548    impl Handler<Relay> for TestActor {
549        async fn handle(
550            &mut self,
551            cx: &Context<Self>,
552            Relay(count, mut hops): Relay,
553        ) -> Result<(), anyhow::Error> {
554            ensure!(!hops.is_empty(), "relay must have at least one hop");
555            let next = hops.pop_front().unwrap();
556            next.send(cx, Relay(count + 1, hops))?;
557            Ok(())
558        }
559    }
560
561    // -- ProxyActor
562
563    #[hyperactor::export(
564        spawn = true,
565        handlers = [
566            Echo,
567        ],
568    )]
569    pub struct ProxyActor {
570        proc_mesh: Arc<ProcMesh>,
571        actor_mesh: RootActorMesh<'static, TestActor>,
572    }
573
574    impl fmt::Debug for ProxyActor {
575        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576            f.debug_struct("ProxyActor")
577                .field("proc_mesh", &"...")
578                .field("actor_mesh", &"...")
579                .finish()
580        }
581    }
582
583    #[async_trait]
584    impl Actor for ProxyActor {
585        type Params = ();
586
587        async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
588            // The actor creates a mesh.
589            use std::sync::Arc;
590
591            use crate::alloc::AllocSpec;
592            use crate::alloc::Allocator;
593            use crate::alloc::LocalAllocator;
594
595            let mut allocator = LocalAllocator;
596            let alloc = allocator
597                .allocate(AllocSpec {
598                    extent: extent! { replica = 1 },
599                    constraints: Default::default(),
600                })
601                .await
602                .unwrap();
603            let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
604            let leaked: &'static Arc<ProcMesh> = Box::leak(Box::new(proc_mesh));
605            let actor_mesh: RootActorMesh<'static, TestActor> =
606                leaked.spawn("echo", &()).await.unwrap();
607            Ok(Self {
608                proc_mesh: Arc::clone(leaked),
609                actor_mesh,
610            })
611        }
612    }
613
614    #[async_trait]
615    impl Handler<Echo> for ProxyActor {
616        async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
617            if std::env::var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK").is_err() {
618                // test_proxy_mesh
619
620                let actor = self.actor_mesh.get(0).unwrap();
621
622                // For now, we reply directly to the client.
623                // We will support directly wiring up the meshes later.
624                let (tx, mut rx) = cx.open_port();
625
626                actor.send(cx, Echo(message.0, tx.bind()))?;
627                message.1.send(cx, rx.recv().await.unwrap())?;
628
629                Ok(())
630            } else {
631                // test_router_undeliverable_return
632
633                let actor: ActorRef<_> = self.actor_mesh.get(0).unwrap();
634                let (tx, mut rx) = cx.open_port::<String>();
635                actor.send(cx, Echo(message.0, tx.bind()))?;
636
637                use tokio::time::Duration;
638                use tokio::time::timeout;
639                #[allow(clippy::disallowed_methods)]
640                match timeout(Duration::from_secs(1), rx.recv()).await {
641                    Ok(_) => message
642                        .1
643                        .send(cx, "the impossible happened".to_owned())
644                        .unwrap(),
645                    _ => (),
646                }
647
648                Ok(())
649            }
650        }
651    }
652}
653
654#[cfg(test)]
655mod tests {
656    use std::sync::Arc;
657
658    use hyperactor::ActorId;
659    use hyperactor::PortRef;
660    use hyperactor::ProcId;
661    use hyperactor::WorldId;
662    use hyperactor::attrs::Attrs;
663    use timed_test::async_timed_test;
664
665    use super::*;
666    use crate::proc_mesh::ProcEvent;
667
668    // These tests are parametric over allocators.
669    #[macro_export]
670    macro_rules! actor_mesh_test_suite {
671        ($allocator:expr_2021) => {
672            use std::assert_matches::assert_matches;
673
674            use ndslice::extent;
675            use $crate::alloc::AllocSpec;
676            use $crate::alloc::Allocator;
677            use $crate::assign::Ranks;
678            use $crate::sel_from_shape;
679            use $crate::sel;
680            use $crate::comm::multicast::set_cast_info_on_headers;
681            use $crate::proc_mesh::SharedSpawnable;
682            use std::collections::VecDeque;
683            use hyperactor::data::Serialized;
684
685            use super::*;
686            use super::test_util::*;
687
688            #[tokio::test]
689            async fn test_proxy_mesh() {
690                use super::test_util::*;
691                use $crate::alloc::AllocSpec;
692                use $crate::alloc::Allocator;
693
694                use ndslice::extent;
695
696                let alloc = $allocator
697                    .allocate(AllocSpec {
698                        extent: extent! { replica = 1 },
699                        constraints: Default::default(),
700                    })
701                    .await
702                    .unwrap();
703                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
704                let actor_mesh: RootActorMesh<'_, ProxyActor> = proc_mesh.spawn("proxy", &()).await.unwrap();
705                let proxy_actor = actor_mesh.get(0).unwrap();
706                let (tx, mut rx) = actor_mesh.open_port::<String>();
707                proxy_actor.send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind())).unwrap();
708
709                #[allow(clippy::disallowed_methods)]
710                match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
711                    Ok(msg) => assert_eq!(&msg.unwrap(), "hello!"),
712                    Err(_) =>  assert!(false),
713                }
714            }
715
716            #[tokio::test]
717            async fn test_basic() {
718                let alloc = $allocator
719                    .allocate(AllocSpec {
720                        extent: extent!(replica = 4),
721                        constraints: Default::default(),
722                    })
723                    .await
724                    .unwrap();
725
726                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
727                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
728                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
729                actor_mesh
730                    .cast(proc_mesh.client(), sel!(*), Echo("Hello".to_string(), reply_handle.bind()))
731                    .unwrap();
732                for _ in 0..4 {
733                    assert_eq!(&reply_receiver.recv().await.unwrap(), "Hello");
734                }
735            }
736
737            #[tokio::test]
738            async fn test_ping_pong() {
739                use hyperactor::test_utils::pingpong::PingPongActor;
740                use hyperactor::test_utils::pingpong::PingPongMessage;
741                use hyperactor::test_utils::pingpong::PingPongActorParams;
742
743                let alloc = $allocator
744                    .allocate(AllocSpec {
745                        extent: extent!(replica = 2),
746                        constraints: Default::default(),
747                    })
748                    .await
749                    .unwrap();
750                let mesh = ProcMesh::allocate(alloc).await.unwrap();
751
752                let (undeliverable_msg_tx, _) = mesh.client().open_port();
753                let ping_pong_actor_params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
754                let actor_mesh: RootActorMesh<PingPongActor> = mesh
755                    .spawn::<PingPongActor>("ping-pong", &ping_pong_actor_params)
756                    .await
757                    .unwrap();
758
759                let ping: ActorRef<PingPongActor> = actor_mesh.get(0).unwrap();
760                let pong: ActorRef<PingPongActor> = actor_mesh.get(1).unwrap();
761                let (done_tx, done_rx) = mesh.client().open_once_port();
762                ping.send(mesh.client(), PingPongMessage(4, pong.clone(), done_tx.bind())).unwrap();
763
764                assert!(done_rx.recv().await.unwrap());
765            }
766
767            #[tokio::test]
768            async fn test_pingpong_full_mesh() {
769                use hyperactor::test_utils::pingpong::PingPongActor;
770                use hyperactor::test_utils::pingpong::PingPongActorParams;
771                use hyperactor::test_utils::pingpong::PingPongMessage;
772
773                use futures::future::join_all;
774
775                const X: usize = 3;
776                const Y: usize = 3;
777                const Z: usize = 3;
778                let alloc = $allocator
779                    .allocate(AllocSpec {
780                        extent: extent!(x = X, y = Y, z = Z),
781                        constraints: Default::default(),
782                    })
783                    .await
784                    .unwrap();
785
786                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
787                let (undeliverable_tx, _undeliverable_rx) = proc_mesh.client().open_port();
788                let params = PingPongActorParams::new(Some(undeliverable_tx.bind()), None);
789                let actor_mesh = proc_mesh.spawn::<PingPongActor>("pingpong", &params).await.unwrap();
790                let slice = actor_mesh.shape().slice();
791
792                let mut futures = Vec::new();
793                for rank in slice.iter() {
794                    let actor = actor_mesh.get(rank).unwrap();
795                    let coords = (&slice.coordinates(rank).unwrap()[..]).try_into().unwrap();
796                    let sizes = (&slice.sizes())[..].try_into().unwrap();
797                    let neighbors = ndslice::utils::stencil::moore_neighbors::<3>();
798                    for neighbor_coords in ndslice::utils::apply_stencil(&coords, sizes, &neighbors) {
799                        if let Ok(neighbor_rank) = slice.location(&neighbor_coords) {
800                            let neighbor = actor_mesh.get(neighbor_rank).unwrap();
801                            let (done_tx, done_rx) = proc_mesh.client().open_once_port();
802                            actor
803                                .send(
804                                    proc_mesh.client(),
805                                    PingPongMessage(4, neighbor.clone(), done_tx.bind()),
806                                )
807                                .unwrap();
808                            futures.push(done_rx.recv());
809                        }
810                    }
811                }
812                let results = join_all(futures).await;
813                assert_eq!(results.len(), 316); // 5180 messages
814                for result in results {
815                    assert_eq!(result.unwrap(), true);
816                }
817            }
818
819            #[tokio::test]
820            async fn test_cast() {
821                let alloc = $allocator
822                    .allocate(AllocSpec {
823                        extent: extent!(replica = 2, host = 2, gpu = 8),
824                        constraints: Default::default(),
825                    })
826                    .await
827                    .unwrap();
828
829                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
830                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
831                let dont_simulate_error = true;
832                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
833                actor_mesh
834                    .cast(proc_mesh.client(), sel!(*), GetRank(dont_simulate_error, reply_handle.bind()))
835                    .unwrap();
836                let mut ranks = Ranks::new(actor_mesh.shape().slice().len());
837                while !ranks.is_full() {
838                    let rank = reply_receiver.recv().await.unwrap();
839                    assert!(ranks.insert(rank, rank).is_none(), "duplicate rank {rank}");
840                }
841                // Retrieve all GPUs on replica=0, host=0
842                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
843                actor_mesh
844                    .cast(
845                        proc_mesh.client(),
846                        sel_from_shape!(actor_mesh.shape(), replica = 0, host = 0),
847                        GetRank(dont_simulate_error, reply_handle.bind()),
848                    )
849                    .unwrap();
850                let mut ranks = Ranks::new(8);
851                while !ranks.is_full() {
852                    let rank = reply_receiver.recv().await.unwrap();
853                    assert!(ranks.insert(rank, rank).is_none(), "duplicate rank {rank}");
854                }
855            }
856
857            #[tokio::test]
858            async fn test_inter_actor_comms() {
859                let alloc = $allocator
860                    .allocate(AllocSpec {
861                        // Sizes intentionally small to keep the time
862                        // required for this test in the process case
863                        // reasonable (< 60s).
864                        extent: extent!(replica = 2, host = 2, gpu = 8),
865                        constraints: Default::default(),
866                    })
867                    .await
868                    .unwrap();
869
870                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
871                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
872
873                // Bounce the message through all actors and return it to the sender (us).
874                let mut hops: VecDeque<_> = actor_mesh.iter().map(|actor| actor.port()).collect();
875                let (handle, mut rx) = proc_mesh.client().open_port();
876                hops.push_back(handle.bind());
877                hops.pop_front()
878                    .unwrap()
879                    .send(proc_mesh.client(), Relay(0, hops))
880                    .unwrap();
881                assert_matches!(
882                    rx.recv().await.unwrap(),
883                    Relay(count, hops)
884                        if count == actor_mesh.shape().slice().len()
885                        && hops.is_empty());
886            }
887
888            #[tokio::test]
889            async fn test_inter_proc_mesh_comms() {
890                let mut meshes = Vec::new();
891                for _ in 0..2 {
892                    let alloc = $allocator
893                        .allocate(AllocSpec {
894                            extent: extent!(replica = 1),
895                            constraints: Default::default(),
896                        })
897                        .await
898                        .unwrap();
899
900                    let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
901                    let proc_mesh_clone = Arc::clone(&proc_mesh);
902                    let actor_mesh : RootActorMesh<TestActor> = proc_mesh_clone.spawn("echo", &()).await.unwrap();
903                    meshes.push((proc_mesh, actor_mesh));
904                }
905
906                let mut hops: VecDeque<_> = meshes
907                    .iter()
908                    .flat_map(|(_proc_mesh, actor_mesh)| actor_mesh.iter())
909                    .map(|actor| actor.port())
910                    .collect();
911                let num_hops = hops.len();
912
913                let client = meshes[0].0.client();
914                let (handle, mut rx) = client.open_port();
915                hops.push_back(handle.bind());
916                hops.pop_front()
917                    .unwrap()
918                    .send(client, Relay(0, hops))
919                    .unwrap();
920                assert_matches!(
921                    rx.recv().await.unwrap(),
922                    Relay(count, hops)
923                        if count == num_hops
924                        && hops.is_empty());
925            }
926
927            #[async_timed_test(timeout_secs = 60)]
928            async fn test_actor_mesh_cast() {
929                // Verify a full broadcast in the mesh. Send a message
930                // to every actor and check each actor receives it.
931
932                use $crate::sel;
933                use $crate::comm::test_utils::TestActor as CastTestActor;
934                use $crate::comm::test_utils::TestActorParams as CastTestActorParams;
935                use $crate::comm::test_utils::TestMessage as CastTestMessage;
936
937                let extent = extent!(replica = 4, host = 4, gpu = 4);
938                let num_actors = extent.len();
939                let alloc = $allocator
940                    .allocate(AllocSpec {
941                        extent,
942                        constraints: Default::default(),
943                    })
944                    .await
945                    .unwrap();
946
947                let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
948
949                let (tx, mut rx) = hyperactor::mailbox::open_port(proc_mesh.client());
950                let params = CastTestActorParams{ forward_port: tx.bind() };
951                let actor_mesh: RootActorMesh<CastTestActor> = proc_mesh.spawn("actor", &params).await.unwrap();
952
953                actor_mesh.cast(proc_mesh.client(), sel!(*), CastTestMessage::Forward("abc".to_string())).unwrap();
954
955                for _ in 0..num_actors {
956                    assert_eq!(rx.recv().await.unwrap(), CastTestMessage::Forward("abc".to_string()));
957                }
958
959                // Attempt to avoid this intermittent fatal error.
960                // ⚠ Fatal: monarch/hyperactor_mesh:hyperactor_mesh-unittest - \
961                //            actor_mesh::tests::sim::test_actor_mesh_cast (2.5s)
962                // Test appears to have passed but the binary exited with a non-zero exit code.
963                proc_mesh.events().unwrap().into_alloc().stop_and_wait().await.unwrap();
964            }
965
966            #[tokio::test]
967            async fn test_delivery_failure() {
968                let alloc = $allocator
969                    .allocate(AllocSpec {
970                        extent: extent!(replica = 1 ),
971                        constraints: Default::default(),
972                    })
973                    .await
974                    .unwrap();
975
976                let name = alloc.name().to_string();
977                let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
978                let mut events = mesh.events().unwrap();
979
980                // Send a message to a non-existent actor (the proc however exists).
981                let unmonitored_reply_to = mesh.client().open_port::<usize>().0.bind();
982                let bad_actor = ActorRef::<TestActor>::attest(ActorId(ProcId::Ranked(WorldId(name.clone()), 0), "foo".into(), 0));
983                bad_actor.send(mesh.client(), GetRank(true, unmonitored_reply_to)).unwrap();
984
985                // The message will be returned!
986                assert_matches!(
987                    events.next().await.unwrap(),
988                    ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
989                );
990
991                // TODO: Stop the proc.
992            }
993
994            #[tokio::test]
995            async fn test_send_with_headers() {
996                let alloc = $allocator
997                    .allocate(AllocSpec {
998                        extent: extent!(replica = 1 ),
999                        constraints: Default::default(),
1000                    })
1001                    .await
1002                    .unwrap();
1003
1004                let mesh = ProcMesh::allocate(alloc).await.unwrap();
1005                let (reply_port_handle, mut reply_port_receiver) = mesh.client().open_port::<usize>();
1006                let reply_port = reply_port_handle.bind();
1007
1008                let actor_mesh: RootActorMesh<TestActor> = mesh.spawn("test", &()).await.unwrap();
1009                let actor_ref = actor_mesh.get(0).unwrap();
1010                let mut headers = Attrs::new();
1011                set_cast_info_on_headers(&mut headers, 0, Shape::unity(), mesh.client().actor_id().clone());
1012                actor_ref.send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone())).unwrap();
1013                assert_eq!(0, reply_port_receiver.recv().await.unwrap());
1014
1015                set_cast_info_on_headers(&mut headers, 1, Shape::unity(), mesh.client().actor_id().clone());
1016                actor_ref.port()
1017                    .send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone()))
1018                    .unwrap();
1019                assert_eq!(1, reply_port_receiver.recv().await.unwrap());
1020
1021                set_cast_info_on_headers(&mut headers, 2, Shape::unity(), mesh.client().actor_id().clone());
1022                actor_ref.actor_id()
1023                    .port_id(GetRank::port())
1024                    .send_with_headers(
1025                        mesh.client(),
1026                        &Serialized::serialize(&GetRank(true, reply_port)).unwrap(),
1027                        headers
1028                    );
1029                assert_eq!(2, reply_port_receiver.recv().await.unwrap());
1030                // TODO: Stop the proc.
1031            }
1032        }
1033    }
1034
1035    mod local {
1036        use crate::alloc::local::LocalAllocator;
1037
1038        actor_mesh_test_suite!(LocalAllocator);
1039
1040        #[tokio::test]
1041        async fn test_send_failure() {
1042            use hyperactor::test_utils::pingpong::PingPongActor;
1043            use hyperactor::test_utils::pingpong::PingPongActorParams;
1044            use hyperactor::test_utils::pingpong::PingPongMessage;
1045
1046            use crate::alloc::ProcStopReason;
1047            use crate::proc_mesh::ProcEvent;
1048
1049            let config = hyperactor::config::global::lock();
1050            let _guard = config.override_key(
1051                hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1052                tokio::time::Duration::from_secs(1),
1053            );
1054
1055            let alloc = LocalAllocator
1056                .allocate(AllocSpec {
1057                    extent: extent!(replica = 2),
1058                    constraints: Default::default(),
1059                })
1060                .await
1061                .unwrap();
1062            let monkey = alloc.chaos_monkey();
1063            let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1064            let mut events = mesh.events().unwrap();
1065
1066            let ping_pong_actor_params = PingPongActorParams::new(
1067                Some(PortRef::attest_message_port(mesh.client().actor_id())),
1068                None,
1069            );
1070            let actor_mesh: RootActorMesh<PingPongActor> = mesh
1071                .spawn::<PingPongActor>("ping-pong", &ping_pong_actor_params)
1072                .await
1073                .unwrap();
1074
1075            let ping: ActorRef<PingPongActor> = actor_mesh.get(0).unwrap();
1076            let pong: ActorRef<PingPongActor> = actor_mesh.get(1).unwrap();
1077
1078            // Kill ping.
1079            monkey(0, ProcStopReason::Killed(0, false));
1080            assert_matches!(
1081                events.next().await.unwrap(),
1082                ProcEvent::Stopped(0, ProcStopReason::Killed(0, false))
1083            );
1084
1085            // Try to send a message to 'ping'. Since 'ping's mailbox
1086            // is stopped, the send will timeout and fail.
1087            let (unmonitored_done_tx, _) = mesh.client().open_once_port();
1088            ping.send(
1089                mesh.client(),
1090                PingPongMessage(1, pong.clone(), unmonitored_done_tx.bind()),
1091            )
1092            .unwrap();
1093
1094            // The message will be returned!
1095            assert_matches!(
1096                events.next().await.unwrap(),
1097                ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
1098            );
1099
1100            // Get 'pong' to send 'ping' a message. Since 'ping's
1101            // mailbox is stopped, the send will timeout and fail.
1102            let (unmonitored_done_tx, _) = mesh.client().open_once_port();
1103            pong.send(
1104                mesh.client(),
1105                PingPongMessage(1, ping.clone(), unmonitored_done_tx.bind()),
1106            )
1107            .unwrap();
1108
1109            // The message will be returned!
1110            assert_matches!(
1111                events.next().await.unwrap(),
1112                ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
1113            );
1114        }
1115
1116        #[tokio::test]
1117        async fn test_cast_failure() {
1118            use crate::alloc::ProcStopReason;
1119            use crate::proc_mesh::ProcEvent;
1120            use crate::sel;
1121
1122            let alloc = LocalAllocator
1123                .allocate(AllocSpec {
1124                    extent: extent!(replica = 1),
1125                    constraints: Default::default(),
1126                })
1127                .await
1128                .unwrap();
1129
1130            let stop = alloc.stopper();
1131            let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1132            let mut events = mesh.events().unwrap();
1133
1134            let actor_mesh = mesh
1135                .spawn::<TestActor>("reply-then-fail", &())
1136                .await
1137                .unwrap();
1138
1139            // `GetRank` with `false` means exit with error after
1140            // replying with rank.
1141            let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1142            actor_mesh
1143                .cast(mesh.client(), sel!(*), GetRank(false, reply_handle.bind()))
1144                .unwrap();
1145            let rank = reply_receiver.recv().await.unwrap();
1146            assert_eq!(rank, 0);
1147
1148            // The above is expected to trigger a proc crash.
1149            assert_matches!(
1150                events.next().await.unwrap(),
1151                ProcEvent::Crashed(0, reason) if reason.contains("intentional error!")
1152            );
1153
1154            // Cast the message.
1155            let (reply_handle, _) = actor_mesh.open_port();
1156            actor_mesh
1157                .cast(mesh.client(), sel!(*), GetRank(false, reply_handle.bind()))
1158                .unwrap();
1159
1160            // The message will be returned!
1161            assert_matches!(
1162                events.next().await.unwrap(),
1163                ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
1164            );
1165
1166            // Stop the mesh.
1167            stop();
1168            assert_matches!(
1169                events.next().await.unwrap(),
1170                ProcEvent::Stopped(0, ProcStopReason::Stopped),
1171            );
1172            assert!(events.next().await.is_none());
1173        }
1174
1175        #[tracing_test::traced_test]
1176        #[tokio::test]
1177        async fn test_stop_actor_mesh() {
1178            use hyperactor::test_utils::pingpong::PingPongActor;
1179            use hyperactor::test_utils::pingpong::PingPongActorParams;
1180            use hyperactor::test_utils::pingpong::PingPongMessage;
1181
1182            let config = hyperactor::config::global::lock();
1183            let _guard = config.override_key(
1184                hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1185                tokio::time::Duration::from_secs(1),
1186            );
1187
1188            let alloc = LocalAllocator
1189                .allocate(AllocSpec {
1190                    extent: extent!(replica = 2),
1191                    constraints: Default::default(),
1192                })
1193                .await
1194                .unwrap();
1195            let mesh = ProcMesh::allocate(alloc).await.unwrap();
1196
1197            let ping_pong_actor_params = PingPongActorParams::new(
1198                Some(PortRef::attest_message_port(mesh.client().actor_id())),
1199                None,
1200            );
1201            let mesh_one: RootActorMesh<PingPongActor> = mesh
1202                .spawn::<PingPongActor>("mesh_one", &ping_pong_actor_params)
1203                .await
1204                .unwrap();
1205
1206            let mesh_two: RootActorMesh<PingPongActor> = mesh
1207                .spawn::<PingPongActor>("mesh_two", &ping_pong_actor_params)
1208                .await
1209                .unwrap();
1210
1211            mesh_two.stop().await.unwrap();
1212
1213            let ping_two: ActorRef<PingPongActor> = mesh_two.get(0).unwrap();
1214            let pong_two: ActorRef<PingPongActor> = mesh_two.get(1).unwrap();
1215
1216            assert!(logs_contain(&format!(
1217                "stopped actor {}",
1218                ping_two.actor_id()
1219            )));
1220            assert!(logs_contain(&format!(
1221                "stopped actor {}",
1222                pong_two.actor_id()
1223            )));
1224
1225            // Other actor meshes on this proc mesh should still be up and running
1226            let ping_one: ActorRef<PingPongActor> = mesh_one.get(0).unwrap();
1227            let pong_one: ActorRef<PingPongActor> = mesh_one.get(1).unwrap();
1228            let (done_tx, done_rx) = mesh.client().open_once_port();
1229            pong_one
1230                .send(
1231                    mesh.client(),
1232                    PingPongMessage(1, ping_one.clone(), done_tx.bind()),
1233                )
1234                .unwrap();
1235            assert!(done_rx.recv().await.is_ok());
1236        }
1237    } // mod local
1238
1239    mod process {
1240        use tokio::process::Command;
1241
1242        use crate::alloc::process::ProcessAllocator;
1243
1244        fn process_allocator() -> ProcessAllocator {
1245            ProcessAllocator::new(Command::new(
1246                buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap(),
1247            ))
1248        }
1249
1250        #[cfg(fbcode_build)] // we use an external binary, produced by buck
1251        actor_mesh_test_suite!(process_allocator());
1252
1253        // Set this test only for `mod process` because it leverages CODEC_MAX_FRAME_LENGTH
1254        // to trigger the timeout error, and CODEC_MAX_FRAME_LENGTH only applies
1255        // to NetTx/NetRx.
1256        #[cfg(fbcode_build)]
1257        #[async_timed_test(timeout_secs = 60)]
1258        async fn test_fail_sending_to_comm_actor_during_cast() {
1259            let max_frame_length = 1000 * 1000 * 1000;
1260            // Use temporary config for this test
1261            let config = hyperactor::config::global::lock();
1262            // Set to 1 sec because we want to timeout fast.
1263            let _guard1 = config.override_key(
1264                hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1265                Duration::from_secs(1),
1266            );
1267            let _guard2 =
1268                config.override_key(hyperactor::config::CODEC_MAX_FRAME_LENGTH, max_frame_length);
1269
1270            // One rank is enough for this test, because the timeout failure
1271            // occurred in the `client->1st comm actor` channel, and thus will
1272            // not be sent further to the rest of the network.
1273            let extent = extent! {replica = 1 };
1274            let alloc = process_allocator()
1275                .allocate(AllocSpec {
1276                    extent,
1277                    constraints: Default::default(),
1278                })
1279                .await
1280                .unwrap();
1281
1282            let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1283            let mut proc_events = proc_mesh.events().unwrap();
1284            let mut actor_mesh: RootActorMesh<TestActor> =
1285                { proc_mesh.spawn("echo", &()).await.unwrap() };
1286            let (reply_handle, _reply_receiver) = actor_mesh.open_port();
1287            let payload = "a".repeat(max_frame_length + 1);
1288            // Since the payload size is larger than the max frame length,
1289            // the message will fail to send.
1290            assert!(payload.len() > max_frame_length);
1291            actor_mesh
1292                .cast(
1293                    proc_mesh.client(),
1294                    sel!(*),
1295                    Echo(payload, reply_handle.bind()),
1296                )
1297                .unwrap();
1298
1299            // The undeliverable message will be turned into a proc event.
1300            // Part of proc event's handling logic will forward the event
1301            // to the mesh's event.
1302            {
1303                let event = proc_events.next().await.unwrap();
1304                assert_matches!(event, ProcEvent::Crashed(_, _),);
1305            }
1306            {
1307                let mut actor_mesh_events = actor_mesh.events().unwrap();
1308                let event = actor_mesh_events.next().await.unwrap();
1309                assert_eq!(event.actor_id.name(), &actor_mesh.name);
1310            }
1311        }
1312
1313        // Set this test only for `mod process` because it relies on a
1314        // trick to emulate router failure that only works when using
1315        // non-local allocators.
1316        #[cfg(fbcode_build)]
1317        #[tokio::test]
1318        async fn test_router_undeliverable_return() {
1319            // Test that an undeliverable message received by a
1320            // router results in actor mesh supervision events.
1321            use ndslice::extent;
1322
1323            use super::test_util::*;
1324            use crate::alloc::AllocSpec;
1325            use crate::alloc::Allocator;
1326
1327            let alloc = process_allocator()
1328                .allocate(AllocSpec {
1329                    extent: extent! { replica = 1 },
1330                    constraints: Default::default(),
1331                })
1332                .await
1333                .unwrap();
1334
1335            // SAFETY: Not multithread safe.
1336            unsafe { std::env::set_var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK", "1") };
1337
1338            let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1339            let mut proc_events = proc_mesh.events().unwrap();
1340            let mut actor_mesh: RootActorMesh<'_, ProxyActor> =
1341                { proc_mesh.spawn("proxy", &()).await.unwrap() };
1342            let mut actor_events = actor_mesh.events().unwrap();
1343
1344            let proxy_actor = actor_mesh.get(0).unwrap();
1345            let (tx, mut rx) = actor_mesh.open_port::<String>();
1346            proxy_actor
1347                .send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind()))
1348                .unwrap();
1349
1350            #[allow(clippy::disallowed_methods)]
1351            match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
1352                Ok(_) => panic!("the impossible happened"),
1353                Err(_) => {
1354                    assert_matches!(
1355                        proc_events.next().await.unwrap(),
1356                        ProcEvent::Crashed(0, reason) if reason.contains("undeliverable")
1357                    );
1358                    assert_eq!(
1359                        actor_events.next().await.unwrap().actor_id.name(),
1360                        &actor_mesh.name
1361                    );
1362                }
1363            }
1364
1365            // SAFETY: Not multithread safe.
1366            unsafe { std::env::remove_var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK") };
1367        }
1368    }
1369
1370    mod sim {
1371        use crate::alloc::sim::SimAllocator;
1372
1373        actor_mesh_test_suite!(SimAllocator::new_and_start_simnet());
1374    }
1375}