hyperactor_mesh/
actor_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(dead_code)] // until used publically
10
11use std::collections::BTreeSet;
12use std::ops::Deref;
13use std::sync::OnceLock;
14
15use async_trait::async_trait;
16use hyperactor::Actor;
17use hyperactor::ActorRef;
18use hyperactor::Bind;
19use hyperactor::GangId;
20use hyperactor::GangRef;
21use hyperactor::Message;
22use hyperactor::Named;
23use hyperactor::PortHandle;
24use hyperactor::RemoteHandles;
25use hyperactor::RemoteMessage;
26use hyperactor::Unbind;
27use hyperactor::WorldId;
28use hyperactor::actor::Referable;
29use hyperactor::attrs::Attrs;
30use hyperactor::attrs::declare_attrs;
31use hyperactor::config;
32use hyperactor::context;
33use hyperactor::mailbox::MailboxSenderError;
34use hyperactor::mailbox::PortReceiver;
35use hyperactor::message::Castable;
36use hyperactor::message::IndexedErasedUnbound;
37use hyperactor::supervision::ActorSupervisionEvent;
38use ndslice::Range;
39use ndslice::Selection;
40use ndslice::Shape;
41use ndslice::ShapeError;
42use ndslice::SliceError;
43use ndslice::View;
44use ndslice::reshape::Limit;
45use ndslice::reshape::ReshapeError;
46use ndslice::reshape::ReshapeSliceExt;
47use ndslice::reshape::reshape_selection;
48use ndslice::selection;
49use ndslice::selection::EvalOpts;
50use ndslice::selection::ReifySlice;
51use ndslice::selection::normal;
52use ndslice::view::ViewExt;
53use serde::Deserialize;
54use serde::Serialize;
55use serde_multipart::Part;
56use tokio::sync::mpsc;
57
58use crate::CommActor;
59use crate::Mesh;
60use crate::comm::multicast::CastMessage;
61use crate::comm::multicast::CastMessageEnvelope;
62use crate::comm::multicast::Uslice;
63use crate::config::MAX_CAST_DIMENSION_SIZE;
64use crate::metrics;
65use crate::proc_mesh::ProcMesh;
66use crate::reference::ActorMeshId;
67use crate::reference::ActorMeshRef;
68use crate::v1;
69
declare_attrs! {
    /// Which mesh this message was cast to. Used for undeliverable message
    /// handling, where the CastMessageEnvelope is serialized, and its content
    /// cannot be inspected.
    ///
    /// Set on the message headers by `actor_mesh_cast` before the cast is
    /// handed to the comm actor.
    pub attr CAST_ACTOR_MESH_ID: ActorMeshId;
}
76
77/// Common implementation for `ActorMesh`s and `ActorMeshRef`s to cast
78/// an `M`-typed message
79#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
80pub(crate) fn actor_mesh_cast<A, M>(
81    cx: &impl context::Actor,
82    actor_mesh_id: ActorMeshId,
83    comm_actor_ref: &ActorRef<CommActor>,
84    selection_of_root: Selection,
85    root_mesh_shape: &Shape,
86    cast_mesh_shape: &Shape,
87    message: M,
88) -> Result<(), CastError>
89where
90    A: Referable + RemoteHandles<IndexedErasedUnbound<M>>,
91    M: Castable + RemoteMessage,
92{
93    let _ = metrics::ACTOR_MESH_CAST_DURATION.start(hyperactor::kv_pairs!(
94        "message_type" => M::typename(),
95        "message_variant" => message.arm().unwrap_or_default(),
96    ));
97
98    let message = CastMessageEnvelope::new::<A, M>(
99        actor_mesh_id.clone(),
100        cx.mailbox().actor_id().clone(),
101        cast_mesh_shape.clone(),
102        message,
103    )?;
104
105    // Mesh's shape might have large extents on some dimensions. Those
106    // dimensions would cause large fanout in our comm actor
107    // implementation. To avoid that, we reshape it by increasing
108    // dimensionality and limiting the extent of each dimension. Note
109    // the reshape is only visible to the internal algorithm. The
110    // shape that user sees maintains intact.
111    //
112    // For example, a typical shape is [hosts=1024, gpus=8]. By using
113    // limit 8, it becomes [8, 8, 8, 2, 8] during casting. In other
114    // words, it adds 3 extra layers to the comm actor tree, while
115    // keeping the fanout in each layer per dimension at 8 or smaller.
116    //
117    // An important note here is that max dimension size != max fanout.
118    // Rank 0 must send a message to all ranks at index 0 for every dimension.
119    // If our reshaped shape is [8, 8, 8, 2, 8], rank 0 must send
120    // 7 + 7 + 7 + 1 + 7 = 21 messages.
121
122    let slice_of_root = root_mesh_shape.slice();
123
124    let max_cast_dimension_size = config::global::get(MAX_CAST_DIMENSION_SIZE);
125
126    let slice_of_cast = slice_of_root.reshape_with_limit(Limit::from(max_cast_dimension_size));
127
128    let selection_of_cast =
129        reshape_selection(selection_of_root, root_mesh_shape.slice(), &slice_of_cast)?;
130
131    let cast_message = CastMessage {
132        dest: Uslice {
133            slice: slice_of_cast,
134            selection: selection_of_cast,
135        },
136        message,
137    };
138
139    let mut headers = Attrs::new();
140    headers.set(CAST_ACTOR_MESH_ID, actor_mesh_id);
141
142    comm_actor_ref
143        .port()
144        .send_with_headers(cx, headers, cast_message)?;
145
146    Ok(())
147}
148
149#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
150pub(crate) fn cast_to_sliced_mesh<A, M>(
151    cx: &impl context::Actor,
152    actor_mesh_id: ActorMeshId,
153    comm_actor_ref: &ActorRef<CommActor>,
154    sel_of_sliced: &Selection,
155    message: M,
156    sliced_shape: &Shape,
157    root_mesh_shape: &Shape,
158) -> Result<(), CastError>
159where
160    A: Referable + RemoteHandles<IndexedErasedUnbound<M>>,
161    M: Castable + RemoteMessage,
162{
163    let root_slice = root_mesh_shape.slice();
164
165    // Casting to `*`?
166    let sel_of_root = if selection::normalize(sel_of_sliced) == normal::NormalizedSelection::True {
167        // Reify this view into base.
168        root_slice.reify_slice(sliced_shape.slice())?
169    } else {
170        // No, fall back on `of_ranks`.
171        let ranks = sel_of_sliced
172            .eval(&EvalOpts::strict(), sliced_shape.slice())?
173            .collect::<BTreeSet<_>>();
174        Selection::of_ranks(root_slice, &ranks)?
175    };
176
177    // Cast.
178    actor_mesh_cast::<A, M>(
179        cx,
180        actor_mesh_id,
181        comm_actor_ref,
182        sel_of_root,
183        root_mesh_shape,
184        sliced_shape,
185        message,
186    )
187}
188
189/// A mesh of actors, all of which reside on the same [`ProcMesh`].
190#[async_trait]
191pub trait ActorMesh: Mesh<Id = ActorMeshId> {
192    /// The type of actor in the mesh.
193    type Actor: Referable;
194
195    /// Cast an `M`-typed message to the ranks selected by `sel` in
196    /// this ActorMesh.
197    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
198    fn cast<M>(
199        &self,
200        cx: &impl context::Actor,
201        selection: Selection,
202        message: M,
203    ) -> Result<(), CastError>
204    where
205        Self::Actor: RemoteHandles<M> + RemoteHandles<IndexedErasedUnbound<M>>,
206        M: Castable + RemoteMessage + Clone,
207    {
208        if let Some(v1) = self.v1() {
209            return v1
210                .cast_for_tensor_engine_only_do_not_use(cx, selection, message)
211                .map_err(anyhow::Error::from)
212                .map_err(CastError::from);
213        }
214        actor_mesh_cast::<Self::Actor, M>(
215            cx,                            // actor context
216            self.id(),                     // actor mesh id (destination mesh)
217            self.proc_mesh().comm_actor(), // comm actor
218            selection,                     // the selected actors
219            self.shape(),                  // root mesh shape
220            self.shape(),                  // cast mesh shape
221            message,                       // the message
222        )
223    }
224
225    /// The ProcMesh on top of which this actor mesh is spawned.
226    fn proc_mesh(&self) -> &ProcMesh;
227
228    /// The name given to the actors in this mesh.
229    fn name(&self) -> &str;
230
231    fn world_id(&self) -> &WorldId {
232        self.proc_mesh().world_id()
233    }
234
235    /// Iterate over all `ActorRef<Self::Actor>` in this mesh.
236    fn iter_actor_refs(&self) -> Box<dyn Iterator<Item = ActorRef<Self::Actor>>> {
237        if let Some(v1) = self.v1() {
238            // We collect() here to ensure that the data are owned. Since this is a short-lived
239            // shim, we'll live with it.
240            return Box::new(
241                v1.iter()
242                    .map(|(_point, actor_ref)| actor_ref.clone())
243                    .collect::<Vec<_>>()
244                    .into_iter(),
245            );
246        }
247        let gang: GangRef<Self::Actor> = GangRef::attest(GangId(
248            self.proc_mesh().world_id().clone(),
249            self.name().to_string(),
250        ));
251        Box::new(self.shape().slice().iter().map(move |rank| gang.rank(rank)))
252    }
253
254    async fn stop(&self, cx: &impl context::Actor) -> Result<(), anyhow::Error> {
255        self.proc_mesh().stop_actor_by_name(cx, self.name()).await
256    }
257
258    /// Get a serializeable reference to this mesh similar to ActorHandle::bind
259    fn bind(&self) -> ActorMeshRef<Self::Actor> {
260        ActorMeshRef::attest(
261            self.id(),
262            self.shape().clone(),
263            self.proc_mesh().comm_actor().clone(),
264        )
265    }
266
267    /// Retrieves the v1 mesh for this v0 ActorMesh, if it is available.
268    fn v1(&self) -> Option<v1::ActorMeshRef<Self::Actor>>;
269}
270
/// Abstracts over shared and borrowed references to a [`ProcMesh`].
/// Given a shared ProcMesh, we can obtain a `RootActorMesh<'static, _>`
/// for it, useful when lifetime must be managed dynamically.
enum ProcMeshRef<'a> {
    /// The proc mesh is held through a boxed, type-erased
    /// `Deref<Target = ProcMesh>` handle, so no borrow lifetime is involved.
    Shared(Box<dyn Deref<Target = ProcMesh> + Sync + Send>),
    /// The proc mesh is borrowed for the lifetime `'a`.
    Borrowed(&'a ProcMesh),
}
281
282impl Deref for ProcMeshRef<'_> {
283    type Target = ProcMesh;
284
285    fn deref(&self) -> &Self::Target {
286        match self {
287            Self::Shared(p) => p,
288            Self::Borrowed(p) => p, // p: &ProcMesh
289        }
290    }
291}
292
/// A mesh of actor instances. ActorMeshes are obtained by spawning an
/// actor on a [`ProcMesh`].
///
/// Generic bound: `A: Referable` — this type hands out typed
/// `ActorRef<A>` handles (see `ranks`), and `ActorRef` is only
/// defined for `A: Referable`.
pub struct RootActorMesh<'a, A: Referable> {
    /// The v0 or v1 backing of this mesh.
    inner: ActorMeshKind<'a, A>,
    /// Shape of the mesh, computed on first call to `Mesh::shape` and cached.
    shape: OnceLock<Shape>,
    /// Proc mesh converted from the v1 mesh. Only populated for v1-backed
    /// meshes, by `ActorMesh::proc_mesh`.
    proc_mesh: OnceLock<ProcMesh>,
    /// Mesh name copied from the v1 mesh. Only populated for v1-backed
    /// meshes, by `ActorMesh::name`.
    name: OnceLock<String>,
}
305
/// The backing implementation of a [`RootActorMesh`].
enum ActorMeshKind<'a, A: Referable> {
    /// A v0 mesh of actors spawned on a [`ProcMesh`].
    V0 {
        /// The proc mesh the actors live on.
        proc_mesh: ProcMeshRef<'a>,
        /// The name given to the actors in this mesh.
        name: String,
        // One `ActorRef` per rank, indexed by rank (see `Mesh::get`).
        ranks: Vec<ActorRef<A>>, // temporary until we remove `ArcActorMesh`.
        // The receiver of supervision events. It is taken (left as `None`) by
        // the first call to `RootActorMesh::events`.
        actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
    },

    /// A wrapper around a v1 actor mesh reference.
    V1(v1::ActorMeshRef<A>),
}
318
319impl<'a, A: Referable> From<v1::ActorMeshRef<A>> for RootActorMesh<'a, A> {
320    fn from(actor_mesh: v1::ActorMeshRef<A>) -> Self {
321        Self {
322            inner: ActorMeshKind::V1(actor_mesh),
323            shape: OnceLock::new(),
324            proc_mesh: OnceLock::new(),
325            name: OnceLock::new(),
326        }
327    }
328}
329
330impl<'a, A: Referable> From<v1::ActorMesh<A>> for RootActorMesh<'a, A> {
331    fn from(actor_mesh: v1::ActorMesh<A>) -> Self {
332        actor_mesh.detach().into()
333    }
334}
335
336impl<'a, A: Referable> RootActorMesh<'a, A> {
337    pub(crate) fn new(
338        proc_mesh: &'a ProcMesh,
339        name: String,
340        actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
341        ranks: Vec<ActorRef<A>>,
342    ) -> Self {
343        Self {
344            inner: ActorMeshKind::V0 {
345                proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
346                name,
347                ranks,
348                actor_supervision_rx: Some(actor_supervision_rx),
349            },
350            shape: OnceLock::new(),
351            proc_mesh: OnceLock::new(),
352            name: OnceLock::new(),
353        }
354    }
355
356    pub fn new_v1(actor_mesh: v1::ActorMeshRef<A>) -> Self {
357        Self {
358            inner: ActorMeshKind::V1(actor_mesh),
359            shape: OnceLock::new(),
360            proc_mesh: OnceLock::new(),
361            name: OnceLock::new(),
362        }
363    }
364
365    pub(crate) fn new_shared<D: Deref<Target = ProcMesh> + Send + Sync + 'static>(
366        proc_mesh: D,
367        name: String,
368        actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
369        ranks: Vec<ActorRef<A>>,
370    ) -> Self {
371        Self {
372            inner: ActorMeshKind::V0 {
373                proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
374                name,
375                ranks,
376                actor_supervision_rx: Some(actor_supervision_rx),
377            },
378            shape: OnceLock::new(),
379            proc_mesh: OnceLock::new(),
380            name: OnceLock::new(),
381        }
382    }
383
384    /// Open a port on this ActorMesh.
385    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
386        match &self.inner {
387            ActorMeshKind::V0 { proc_mesh, .. } => proc_mesh.client().open_port(),
388            ActorMeshKind::V1(_actor_mesh) => unimplemented!("unsupported operation"),
389        }
390    }
391
392    /// An event stream of actor events. Each RootActorMesh can produce only one such
393    /// stream, returning None after the first call.
394    pub fn events(&mut self) -> Option<ActorSupervisionEvents> {
395        match &mut self.inner {
396            ActorMeshKind::V0 {
397                actor_supervision_rx,
398                ..
399            } => actor_supervision_rx
400                .take()
401                .map(|actor_supervision_rx| ActorSupervisionEvents {
402                    actor_supervision_rx,
403                    mesh_id: self.id(),
404                }),
405            ActorMeshKind::V1(_actor_mesh) => unimplemented!("unsupported operation"),
406        }
407    }
408
409    /// Access the ranks field (temporary until we remove `ArcActorMesh`).
410    #[cfg(test)]
411    pub(crate) fn ranks(&self) -> &Vec<ActorRef<A>> {
412        match &self.inner {
413            ActorMeshKind::V0 { ranks, .. } => ranks,
414            ActorMeshKind::V1(_actor_mesh) => unimplemented!("unsupported operation"),
415        }
416    }
417}
418
/// Supervision event stream for actor mesh. It emits actor supervision events.
pub struct ActorSupervisionEvents {
    // The receiver of supervision events from proc mesh.
    actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    // The ID of the actor mesh this stream belongs to; included in the log
    // message emitted when the stream closes.
    mesh_id: ActorMeshId,
}
426
427impl ActorSupervisionEvents {
428    pub async fn next(&mut self) -> Option<ActorSupervisionEvent> {
429        let result = self.actor_supervision_rx.recv().await;
430        if result.is_none() {
431            tracing::info!(
432                "supervision stream for actor mesh {:?} was closed!",
433                self.mesh_id
434            );
435        }
436        result
437    }
438}
439
440#[async_trait]
441impl<'a, A: Referable> Mesh for RootActorMesh<'a, A> {
442    type Node = ActorRef<A>;
443    type Id = ActorMeshId;
444    type Sliced<'b>
445        = SlicedActorMesh<'b, A>
446    where
447        'a: 'b;
448
449    fn shape(&self) -> &Shape {
450        self.shape.get_or_init(|| match &self.inner {
451            ActorMeshKind::V0 { proc_mesh, .. } => proc_mesh.shape().clone(),
452            ActorMeshKind::V1(actor_mesh) => actor_mesh.region().into(),
453        })
454    }
455
456    fn select<R: Into<Range>>(
457        &self,
458        label: &str,
459        range: R,
460    ) -> Result<Self::Sliced<'_>, ShapeError> {
461        Ok(SlicedActorMesh(self, self.shape().select(label, range)?))
462    }
463
464    fn get(&self, rank: usize) -> Option<ActorRef<A>> {
465        match &self.inner {
466            ActorMeshKind::V0 { ranks, .. } => ranks.get(rank).cloned(),
467            ActorMeshKind::V1(actor_mesh) => actor_mesh.get(rank),
468        }
469    }
470
471    fn id(&self) -> Self::Id {
472        match &self.inner {
473            ActorMeshKind::V0 {
474                proc_mesh, name, ..
475            } => ActorMeshId::V0(proc_mesh.id(), name.clone()),
476            ActorMeshKind::V1(actor_mesh) => ActorMeshId::V1(actor_mesh.name().clone()),
477        }
478    }
479}
480
481impl<A: Referable> ActorMesh for RootActorMesh<'_, A> {
482    type Actor = A;
483
484    fn proc_mesh(&self) -> &ProcMesh {
485        match &self.inner {
486            ActorMeshKind::V0 { proc_mesh, .. } => proc_mesh,
487            ActorMeshKind::V1(actor_mesh) => self
488                .proc_mesh
489                .get_or_init(|| actor_mesh.proc_mesh().clone().into()),
490        }
491    }
492
493    fn name(&self) -> &str {
494        match &self.inner {
495            ActorMeshKind::V0 { name, .. } => name,
496            ActorMeshKind::V1(actor_mesh) => {
497                self.name.get_or_init(|| actor_mesh.name().to_string())
498            }
499        }
500    }
501
502    fn v1(&self) -> Option<v1::ActorMeshRef<Self::Actor>> {
503        match &self.inner {
504            ActorMeshKind::V0 { .. } => None,
505            ActorMeshKind::V1(actor_mesh) => Some(actor_mesh.clone()),
506        }
507    }
508}
509
510pub struct SlicedActorMesh<'a, A: Referable>(&'a RootActorMesh<'a, A>, Shape);
511
512impl<'a, A: Referable> SlicedActorMesh<'a, A> {
513    pub fn new(actor_mesh: &'a RootActorMesh<'a, A>, shape: Shape) -> Self {
514        Self(actor_mesh, shape)
515    }
516
517    pub fn shape(&self) -> &Shape {
518        &self.1
519    }
520}
521
522#[async_trait]
523impl<A: Referable> Mesh for SlicedActorMesh<'_, A> {
524    type Node = ActorRef<A>;
525    type Id = ActorMeshId;
526    type Sliced<'b>
527        = SlicedActorMesh<'b, A>
528    where
529        Self: 'b;
530
531    fn shape(&self) -> &Shape {
532        &self.1
533    }
534
535    fn select<R: Into<Range>>(
536        &self,
537        label: &str,
538        range: R,
539    ) -> Result<Self::Sliced<'_>, ShapeError> {
540        Ok(Self(self.0, self.1.select(label, range)?))
541    }
542
543    fn get(&self, _index: usize) -> Option<ActorRef<A>> {
544        unimplemented!()
545    }
546
547    fn id(&self) -> Self::Id {
548        self.0.id()
549    }
550}
551
552impl<A: Referable> ActorMesh for SlicedActorMesh<'_, A> {
553    type Actor = A;
554
555    fn proc_mesh(&self) -> &ProcMesh {
556        self.0.proc_mesh()
557    }
558
559    fn name(&self) -> &str {
560        self.0.name()
561    }
562
563    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
564    fn cast<M>(&self, cx: &impl context::Actor, sel: Selection, message: M) -> Result<(), CastError>
565    where
566        Self::Actor: RemoteHandles<IndexedErasedUnbound<M>>,
567        M: Castable + RemoteMessage,
568    {
569        cast_to_sliced_mesh::<A, M>(
570            /*cx=*/ cx,
571            /*actor_mesh_id=*/ self.id(),
572            /*comm_actor_ref*/ self.proc_mesh().comm_actor(),
573            /*sel_of_sliced=*/ &sel,
574            /*message=*/ message,
575            /*sliced_shape=*/ self.shape(),
576            /*root_mesh_shape=*/ self.0.shape(),
577        )
578    }
579
580    fn v1(&self) -> Option<v1::ActorMeshRef<Self::Actor>> {
581        self.0
582            .v1()
583            .map(|actor_mesh| actor_mesh.subset(self.shape().into()).unwrap())
584    }
585}
586
/// The type of error of casting operations.
#[derive(Debug, thiserror::Error)]
pub enum CastError {
    /// A selection paired with the [`ShapeError`] reported for it.
    #[error("invalid selection {0}: {1}")]
    InvalidSelection(Selection, ShapeError),

    /// A mailbox send failure associated with a rank (the `usize` field).
    #[error("send on rank {0}: {1}")]
    MailboxSenderError(usize, MailboxSenderError),

    /// A selection that is not supported, described by the string.
    #[error("unsupported selection: {0}")]
    SelectionNotSupported(String),

    /// A [`MailboxSenderError`] converted via `?`, with no rank attached.
    #[error(transparent)]
    RootMailboxSenderError(#[from] MailboxSenderError),

    /// A [`ShapeError`] converted via `?`.
    #[error(transparent)]
    ShapeError(#[from] ShapeError),

    /// A [`SliceError`] converted via `?`.
    #[error(transparent)]
    SliceError(#[from] SliceError),

    /// A `bincode` error converted via `?`.
    #[error(transparent)]
    SerializationError(#[from] bincode::Error),

    /// Any other error. The v1 path of `ActorMesh::cast` maps its errors here.
    #[error(transparent)]
    Other(#[from] anyhow::Error),

    /// A [`ReshapeError`] converted via `?` (presumably from
    /// `reshape_selection` in `actor_mesh_cast`).
    #[error(transparent)]
    ReshapeError(#[from] ReshapeError),
}
617
618// This has to be compiled outside of test mode because the bootstrap binary
619// is not built in test mode, and requires access to TestActor.
620pub(crate) mod test_util {
621    use std::collections::VecDeque;
622    use std::fmt;
623    use std::fmt::Debug;
624    use std::sync::Arc;
625
626    use anyhow::ensure;
627    use hyperactor::Context;
628    use hyperactor::Handler;
629    use hyperactor::Instance;
630    use hyperactor::PortRef;
631    use ndslice::extent;
632
633    use super::*;
634    use crate::comm::multicast::CastInfo;
635
636    // This can't be defined under a `#[cfg(test)]` because there needs to
637    // be an entry in the spawnable actor registry in the executable
638    // 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
639    // mesh test suite.
640    #[derive(Debug, Default, Actor)]
641    #[hyperactor::export(
642        spawn = true,
643        handlers = [
644            Echo { cast = true },
645            Payload { cast = true },
646            GetRank { cast = true },
647            Error { cast = true },
648            Relay,
649        ],
650    )]
651    pub struct TestActor;
652
653    /// Request message to retrieve the actor's rank.
654    ///
655    /// The `bool` in the tuple controls the outcome of the handler:
656    /// - If `true`, the handler will send the rank and return
657    ///   `Ok(())`.
658    /// - If `false`, the handler will still send the rank, but return
659    ///   an error (`Err(...)`).
660    ///
661    /// This is useful for testing both successful and failing
662    /// responses from a single message type.
663    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
664    pub struct GetRank(pub bool, #[binding(include)] pub PortRef<usize>);
665
666    #[async_trait]
667    impl Handler<GetRank> for TestActor {
668        async fn handle(
669            &mut self,
670            cx: &Context<Self>,
671            GetRank(ok, reply): GetRank,
672        ) -> Result<(), anyhow::Error> {
673            let point = cx.cast_point();
674            reply.send(cx, point.rank())?;
675            anyhow::ensure!(ok, "intentional error!"); // If `!ok` exit with `Err()`.
676            Ok(())
677        }
678    }
679
680    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
681    pub struct Echo(pub String, #[binding(include)] pub PortRef<String>);
682
683    #[async_trait]
684    impl Handler<Echo> for TestActor {
685        async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
686            let Echo(message, reply_port) = message;
687            reply_port.send(cx, message)?;
688            Ok(())
689        }
690    }
691
692    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
693    pub struct Payload {
694        pub part: Part,
695        #[binding(include)]
696        pub reply_port: PortRef<()>,
697    }
698
699    #[async_trait]
700    impl Handler<Payload> for TestActor {
701        async fn handle(
702            &mut self,
703            cx: &Context<Self>,
704            message: Payload,
705        ) -> Result<(), anyhow::Error> {
706            let Payload { reply_port, .. } = message;
707            reply_port.send(cx, ())?;
708            Ok(())
709        }
710    }
711
712    #[derive(Debug, Serialize, Deserialize, Named, Clone, Bind, Unbind)]
713    pub struct Error(pub String);
714
715    #[async_trait]
716    impl Handler<Error> for TestActor {
717        async fn handle(
718            &mut self,
719            _cx: &Context<Self>,
720            Error(error): Error,
721        ) -> Result<(), anyhow::Error> {
722            Err(anyhow::anyhow!("{}", error))
723        }
724    }
725
726    #[derive(Debug, Serialize, Deserialize, Named, Clone)]
727    pub struct Relay(pub usize, pub VecDeque<PortRef<Relay>>);
728
729    #[async_trait]
730    impl Handler<Relay> for TestActor {
731        async fn handle(
732            &mut self,
733            cx: &Context<Self>,
734            Relay(count, mut hops): Relay,
735        ) -> Result<(), anyhow::Error> {
736            ensure!(!hops.is_empty(), "relay must have at least one hop");
737            let next = hops.pop_front().unwrap();
738            next.send(cx, Relay(count + 1, hops))?;
739            Ok(())
740        }
741    }
742
743    // -- ProxyActor
744
745    #[hyperactor::export(
746        spawn = true,
747        handlers = [
748            Echo,
749        ],
750    )]
751    pub struct ProxyActor {
752        proc_mesh: &'static Arc<ProcMesh>,
753        actor_mesh: Option<RootActorMesh<'static, TestActor>>,
754    }
755
756    impl fmt::Debug for ProxyActor {
757        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758            f.debug_struct("ProxyActor")
759                .field("proc_mesh", &"...")
760                .field("actor_mesh", &"...")
761                .finish()
762        }
763    }
764
765    #[async_trait]
766    impl Actor for ProxyActor {
767        type Params = ();
768
769        async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
770            // The actor creates a mesh.
771            use std::sync::Arc;
772
773            use hyperactor::channel::ChannelTransport;
774
775            use crate::alloc::AllocSpec;
776            use crate::alloc::Allocator;
777            use crate::alloc::LocalAllocator;
778
779            let mut allocator = LocalAllocator;
780            let alloc = allocator
781                .allocate(AllocSpec {
782                    extent: extent! { replica = 1 },
783                    constraints: Default::default(),
784                    proc_name: None,
785                    transport: ChannelTransport::Local,
786                    proc_allocation_mode: Default::default(),
787                })
788                .await
789                .unwrap();
790            let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
791            let leaked: &'static Arc<ProcMesh> = Box::leak(Box::new(proc_mesh));
792            Ok(Self {
793                proc_mesh: leaked,
794                actor_mesh: None,
795            })
796        }
797
798        async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
799            self.actor_mesh = Some(self.proc_mesh.spawn(this, "echo", &()).await?);
800            Ok(())
801        }
802    }
803
804    #[async_trait]
805    impl Handler<Echo> for ProxyActor {
806        async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
807            if std::env::var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK").is_err() {
808                // test_proxy_mesh
809
810                let actor = self.actor_mesh.as_ref().unwrap().get(0).unwrap();
811
812                // For now, we reply directly to the client.
813                // We will support directly wiring up the meshes later.
814                let (tx, mut rx) = cx.open_port();
815
816                actor.send(cx, Echo(message.0, tx.bind()))?;
817                message.1.send(cx, rx.recv().await.unwrap())?;
818
819                Ok(())
820            } else {
821                // test_router_undeliverable_return
822
823                let actor: ActorRef<_> = self.actor_mesh.as_ref().unwrap().get(0).unwrap();
824                let (tx, mut rx) = cx.open_port::<String>();
825                actor.send(cx, Echo(message.0, tx.bind()))?;
826
827                use tokio::time::Duration;
828                use tokio::time::timeout;
829                #[allow(clippy::disallowed_methods)]
830                if timeout(Duration::from_secs(1), rx.recv()).await.is_ok() {
831                    message
832                        .1
833                        .send(cx, "the impossible happened".to_owned())
834                        .unwrap()
835                }
836
837                Ok(())
838            }
839        }
840    }
841}
842
843#[cfg(test)]
844mod tests {
845    use std::sync::Arc;
846
847    use hyperactor::ActorId;
848    use hyperactor::PortRef;
849    use hyperactor::ProcId;
850    use hyperactor::WorldId;
851    use hyperactor::attrs::Attrs;
852    use hyperactor::data::Encoding;
853    use timed_test::async_timed_test;
854
855    use super::*;
856    use crate::proc_mesh::ProcEvent;
857
858    // These tests are parametric over allocators.
859    #[macro_export]
860    macro_rules! actor_mesh_test_suite {
861        ($allocator:expr) => {
862            use std::assert_matches::assert_matches;
863
864            use ndslice::extent;
865            use $crate::alloc::AllocSpec;
866            use $crate::alloc::Allocator;
867            use $crate::assign::Ranks;
868            use $crate::sel_from_shape;
869            use $crate::sel;
870            use $crate::comm::multicast::set_cast_info_on_headers;
871            use $crate::proc_mesh::SharedSpawnable;
872            use std::collections::VecDeque;
873            use hyperactor::data::Serialized;
874            use $crate::proc_mesh::default_transport;
875
876            use super::*;
877            use super::test_util::*;
878
879            #[tokio::test]
880            async fn test_proxy_mesh() {
881                use super::test_util::*;
882                use $crate::alloc::AllocSpec;
883                use $crate::alloc::Allocator;
884
885                use ndslice::extent;
886
887                let alloc = $allocator
888                    .allocate(AllocSpec {
889                        extent: extent! { replica = 1 },
890                        constraints: Default::default(),
891                        proc_name: None,
892                        transport: default_transport(),
893                        proc_allocation_mode: Default::default(),
894                    })
895                    .await
896                    .unwrap();
897                let instance = $crate::v1::testing::instance().await;
898                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
899                let actor_mesh: RootActorMesh<'_, ProxyActor> = proc_mesh.spawn(&instance, "proxy", &()).await.unwrap();
900                let proxy_actor = actor_mesh.get(0).unwrap();
901                let (tx, mut rx) = actor_mesh.open_port::<String>();
902                proxy_actor.send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind())).unwrap();
903
904                #[allow(clippy::disallowed_methods)]
905                match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
906                    Ok(msg) => assert_eq!(&msg.unwrap(), "hello!"),
907                    Err(_) =>  assert!(false),
908                }
909            }
910
911            #[tokio::test]
912            async fn test_basic() {
913                let alloc = $allocator
914                    .allocate(AllocSpec {
915                        extent: extent!(replica = 4),
916                        constraints: Default::default(),
917                        proc_name: None,
918                        transport: default_transport(),
919                        proc_allocation_mode: Default::default(),
920                    })
921                    .await
922                    .unwrap();
923
924                let instance = $crate::v1::testing::instance().await;
925                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
926                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn(&instance, "echo", &()).await.unwrap();
927                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
928                actor_mesh
929                    .cast(proc_mesh.client(), sel!(*), Echo("Hello".to_string(), reply_handle.bind()))
930                    .unwrap();
931                for _ in 0..4 {
932                    assert_eq!(&reply_receiver.recv().await.unwrap(), "Hello");
933                }
934            }
935
936            #[tokio::test]
937            async fn test_ping_pong() {
938                use hyperactor::test_utils::pingpong::PingPongActor;
939                use hyperactor::test_utils::pingpong::PingPongMessage;
940                use hyperactor::test_utils::pingpong::PingPongActorParams;
941
942                let alloc = $allocator
943                    .allocate(AllocSpec {
944                        extent: extent!(replica = 2),
945                        constraints: Default::default(),
946                        proc_name: None,
947                        transport: default_transport(),
948                        proc_allocation_mode: Default::default(),
949                    })
950                    .await
951                    .unwrap();
952                let instance = $crate::v1::testing::instance().await;
953                let mesh = ProcMesh::allocate(alloc).await.unwrap();
954
955                let (undeliverable_msg_tx, _) = mesh.client().open_port();
956                let ping_pong_actor_params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
957                let actor_mesh: RootActorMesh<PingPongActor> = mesh
958                    .spawn::<PingPongActor>(&instance, "ping-pong", &ping_pong_actor_params)
959                    .await
960                    .unwrap();
961
962                let ping: ActorRef<PingPongActor> = actor_mesh.get(0).unwrap();
963                let pong: ActorRef<PingPongActor> = actor_mesh.get(1).unwrap();
964                let (done_tx, done_rx) = mesh.client().open_once_port();
965                ping.send(mesh.client(), PingPongMessage(4, pong.clone(), done_tx.bind())).unwrap();
966
967                assert!(done_rx.recv().await.unwrap());
968            }
969
970            #[tokio::test]
971            async fn test_pingpong_full_mesh() {
972                use hyperactor::test_utils::pingpong::PingPongActor;
973                use hyperactor::test_utils::pingpong::PingPongActorParams;
974                use hyperactor::test_utils::pingpong::PingPongMessage;
975
976                use futures::future::join_all;
977
978                const X: usize = 3;
979                const Y: usize = 3;
980                const Z: usize = 3;
981                let alloc = $allocator
982                    .allocate(AllocSpec {
983                        extent: extent!(x = X, y = Y, z = Z),
984                        constraints: Default::default(),
985                        proc_name: None,
986                        transport: default_transport(),
987                        proc_allocation_mode: Default::default(),
988                    })
989                    .await
990                    .unwrap();
991
992                let instance = $crate::v1::testing::instance().await;
993                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
994                let (undeliverable_tx, _undeliverable_rx) = proc_mesh.client().open_port();
995                let params = PingPongActorParams::new(Some(undeliverable_tx.bind()), None);
996                let actor_mesh = proc_mesh.spawn::<PingPongActor>(&instance, "pingpong", &params).await.unwrap();
997                let slice = actor_mesh.shape().slice();
998
999                let mut futures = Vec::new();
1000                for rank in slice.iter() {
1001                    let actor = actor_mesh.get(rank).unwrap();
1002                    let coords = (&slice.coordinates(rank).unwrap()[..]).try_into().unwrap();
1003                    let sizes = (&slice.sizes())[..].try_into().unwrap();
1004                    let neighbors = ndslice::utils::stencil::moore_neighbors::<3>();
1005                    for neighbor_coords in ndslice::utils::apply_stencil(&coords, sizes, &neighbors) {
1006                        if let Ok(neighbor_rank) = slice.location(&neighbor_coords) {
1007                            let neighbor = actor_mesh.get(neighbor_rank).unwrap();
1008                            let (done_tx, done_rx) = proc_mesh.client().open_once_port();
1009                            actor
1010                                .send(
1011                                    proc_mesh.client(),
1012                                    PingPongMessage(4, neighbor.clone(), done_tx.bind()),
1013                                )
1014                                .unwrap();
1015                            futures.push(done_rx.recv());
1016                        }
1017                    }
1018                }
1019                let results = join_all(futures).await;
1020                assert_eq!(results.len(), 316); // 5180 messages
1021                for result in results {
1022                    assert_eq!(result.unwrap(), true);
1023                }
1024            }
1025
1026            #[tokio::test]
1027            async fn test_cast() {
1028                let alloc = $allocator
1029                    .allocate(AllocSpec {
1030                        extent: extent!(replica = 2, host = 2, gpu = 8),
1031                        constraints: Default::default(),
1032                        proc_name: None,
1033                        transport: default_transport(),
1034                        proc_allocation_mode: Default::default(),
1035                    })
1036                    .await
1037                    .unwrap();
1038
1039                let instance = $crate::v1::testing::instance().await;
1040                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1041                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn(&instance, "echo", &()).await.unwrap();
1042                let dont_simulate_error = true;
1043                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1044                actor_mesh
1045                    .cast(proc_mesh.client(), sel!(*), GetRank(dont_simulate_error, reply_handle.bind()))
1046                    .unwrap();
1047                let mut ranks = Ranks::new(actor_mesh.shape().slice().len());
1048                while !ranks.is_full() {
1049                    let rank = reply_receiver.recv().await.unwrap();
1050                    assert!(ranks.insert(rank, rank).is_none(), "duplicate rank {rank}");
1051                }
1052                // Retrieve all GPUs on replica=0, host=0
1053                let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1054                actor_mesh
1055                    .cast(
1056                        proc_mesh.client(),
1057                        sel_from_shape!(actor_mesh.shape(), replica = 0, host = 0),
1058                        GetRank(dont_simulate_error, reply_handle.bind()),
1059                    )
1060                    .unwrap();
1061                let mut ranks = Ranks::new(8);
1062                while !ranks.is_full() {
1063                    let rank = reply_receiver.recv().await.unwrap();
1064                    assert!(ranks.insert(rank, rank).is_none(), "duplicate rank {rank}");
1065                }
1066            }
1067
1068            #[tokio::test]
1069            async fn test_inter_actor_comms() {
1070                let alloc = $allocator
1071                    .allocate(AllocSpec {
1072                        // Sizes intentionally small to keep the time
1073                        // required for this test in the process case
1074                        // reasonable (< 60s).
1075                        extent: extent!(replica = 2, host = 2, gpu = 8),
1076                        constraints: Default::default(),
1077                        proc_name: None,
1078                        transport: default_transport(),
1079                        proc_allocation_mode: Default::default(),
1080                    })
1081                    .await
1082                    .unwrap();
1083
1084                let instance = $crate::v1::testing::instance().await;
1085                let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1086                let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn(&instance, "echo", &()).await.unwrap();
1087
1088                // Bounce the message through all actors and return it to the sender (us).
1089                let mut hops: VecDeque<_> = actor_mesh.iter().map(|actor| actor.port()).collect();
1090                let (handle, mut rx) = proc_mesh.client().open_port();
1091                hops.push_back(handle.bind());
1092                hops.pop_front()
1093                    .unwrap()
1094                    .send(proc_mesh.client(), Relay(0, hops))
1095                    .unwrap();
1096                assert_matches!(
1097                    rx.recv().await.unwrap(),
1098                    Relay(count, hops)
1099                        if count == actor_mesh.shape().slice().len()
1100                        && hops.is_empty());
1101            }
1102
1103            #[tokio::test]
1104            async fn test_inter_proc_mesh_comms() {
1105                let mut meshes = Vec::new();
1106                let instance = $crate::v1::testing::instance().await;
1107                for _ in 0..2 {
1108                    let alloc = $allocator
1109                        .allocate(AllocSpec {
1110                            extent: extent!(replica = 1),
1111                            constraints: Default::default(),
1112                            proc_name: None,
1113                            transport: default_transport(),
1114                            proc_allocation_mode: Default::default(),
1115                        })
1116                        .await
1117                        .unwrap();
1118
1119                    let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
1120                    let proc_mesh_clone = Arc::clone(&proc_mesh);
1121                    let actor_mesh : RootActorMesh<TestActor> = proc_mesh_clone.spawn(&instance, "echo", &()).await.unwrap();
1122                    meshes.push((proc_mesh, actor_mesh));
1123                }
1124
1125                let mut hops: VecDeque<_> = meshes
1126                    .iter()
1127                    .flat_map(|(_proc_mesh, actor_mesh)| actor_mesh.iter())
1128                    .map(|actor| actor.port())
1129                    .collect();
1130                let num_hops = hops.len();
1131
1132                let client = meshes[0].0.client();
1133                let (handle, mut rx) = client.open_port();
1134                hops.push_back(handle.bind());
1135                hops.pop_front()
1136                    .unwrap()
1137                    .send(client, Relay(0, hops))
1138                    .unwrap();
1139                assert_matches!(
1140                    rx.recv().await.unwrap(),
1141                    Relay(count, hops)
1142                        if count == num_hops
1143                        && hops.is_empty());
1144            }
1145
1146            #[async_timed_test(timeout_secs = 60)]
1147            async fn test_actor_mesh_cast() {
1148                // Verify a full broadcast in the mesh. Send a message
1149                // to every actor and check each actor receives it.
1150
1151                use $crate::sel;
1152                use $crate::comm::test_utils::TestActor as CastTestActor;
1153                use $crate::comm::test_utils::TestActorParams as CastTestActorParams;
1154                use $crate::comm::test_utils::TestMessage as CastTestMessage;
1155
1156                let extent = extent!(replica = 4, host = 4, gpu = 4);
1157                let num_actors = extent.len();
1158                let alloc = $allocator
1159                    .allocate(AllocSpec {
1160                        extent,
1161                        constraints: Default::default(),
1162                        proc_name: None,
1163                        transport: default_transport(),
1164                        proc_allocation_mode: Default::default(),
1165                    })
1166                    .await
1167                    .unwrap();
1168
1169                let instance = $crate::v1::testing::instance().await;
1170                let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1171
1172                let (tx, mut rx) = hyperactor::mailbox::open_port(proc_mesh.client());
1173                let params = CastTestActorParams{ forward_port: tx.bind() };
1174                let actor_mesh: RootActorMesh<CastTestActor> = proc_mesh.spawn(&instance, "actor", &params).await.unwrap();
1175
1176                actor_mesh.cast(proc_mesh.client(), sel!(*), CastTestMessage::Forward("abc".to_string())).unwrap();
1177
1178                for _ in 0..num_actors {
1179                    assert_eq!(rx.recv().await.unwrap(), CastTestMessage::Forward("abc".to_string()));
1180                }
1181
1182                // Attempt to avoid this intermittent fatal error.
1183                // ⚠ Fatal: monarch/hyperactor_mesh:hyperactor_mesh-unittest - \
1184                //            actor_mesh::tests::sim::test_actor_mesh_cast (2.5s)
1185                // Test appears to have passed but the binary exited with a non-zero exit code.
1186                proc_mesh.events().unwrap().into_alloc().stop_and_wait().await.unwrap();
1187            }
1188
1189            #[tokio::test]
1190            async fn test_delivery_failure() {
1191                let alloc = $allocator
1192                    .allocate(AllocSpec {
1193                        extent: extent!(replica = 1 ),
1194                        constraints: Default::default(),
1195                        proc_name: None,
1196                        transport: default_transport(),
1197                        proc_allocation_mode: Default::default(),
1198                    })
1199                    .await
1200                    .unwrap();
1201
1202                let name = alloc.name().to_string();
1203                let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1204                let mut events = mesh.events().unwrap();
1205
1206                // Send a message to a non-existent actor (the proc however exists).
1207                let unmonitored_reply_to = mesh.client().open_port::<usize>().0.bind();
1208                let bad_actor = ActorRef::<TestActor>::attest(ActorId(ProcId::Ranked(WorldId(name.clone()), 0), "foo".into(), 0));
1209                bad_actor.send(mesh.client(), GetRank(true, unmonitored_reply_to)).unwrap();
1210
1211                // The message will be returned!
1212                assert_matches!(
1213                    events.next().await.unwrap(),
1214                    ProcEvent::Crashed(0, reason) if reason.contains("message not delivered")
1215                );
1216
1217                // TODO: Stop the proc.
1218            }
1219
1220            #[tokio::test]
1221            async fn test_send_with_headers() {
1222                let extent = extent!(replica = 3);
1223                let alloc = $allocator
1224                    .allocate(AllocSpec {
1225                        extent: extent.clone(),
1226                        constraints: Default::default(),
1227                        proc_name: None,
1228                        transport: default_transport(),
1229                        proc_allocation_mode: Default::default(),
1230                    })
1231                    .await
1232                    .unwrap();
1233
1234                let instance = $crate::v1::testing::instance().await;
1235                let mesh = ProcMesh::allocate(alloc).await.unwrap();
1236                let (reply_port_handle, mut reply_port_receiver) = mesh.client().open_port::<usize>();
1237                let reply_port = reply_port_handle.bind();
1238
1239                let actor_mesh: RootActorMesh<TestActor> = mesh.spawn(&instance, "test", &()).await.unwrap();
1240                let actor_ref = actor_mesh.get(0).unwrap();
1241                let mut headers = Attrs::new();
1242                set_cast_info_on_headers(&mut headers, extent.point_of_rank(0).unwrap(), mesh.client().self_id().clone());
1243                actor_ref.send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone())).unwrap();
1244                assert_eq!(0, reply_port_receiver.recv().await.unwrap());
1245
1246                set_cast_info_on_headers(&mut headers, extent.point_of_rank(1).unwrap(), mesh.client().self_id().clone());
1247                actor_ref.port()
1248                    .send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone()))
1249                    .unwrap();
1250                assert_eq!(1, reply_port_receiver.recv().await.unwrap());
1251
1252                set_cast_info_on_headers(&mut headers, extent.point_of_rank(2).unwrap(), mesh.client().self_id().clone());
1253                actor_ref.actor_id()
1254                    .port_id(GetRank::port())
1255                    .send_with_headers(
1256                        mesh.client(),
1257                        Serialized::serialize(&GetRank(true, reply_port)).unwrap(),
1258                        headers
1259                    );
1260                assert_eq!(2, reply_port_receiver.recv().await.unwrap());
1261                // TODO: Stop the proc.
1262            }
1263        }
1264    }
1265
1266    mod local {
1267        use hyperactor::channel::ChannelTransport;
1268
1269        use crate::alloc::local::LocalAllocator;
1270
1271        actor_mesh_test_suite!(LocalAllocator);
1272
1273        #[tokio::test]
1274        async fn test_send_failure() {
1275            hyperactor_telemetry::initialize_logging(hyperactor::clock::ClockKind::default());
1276
1277            use hyperactor::test_utils::pingpong::PingPongActor;
1278            use hyperactor::test_utils::pingpong::PingPongActorParams;
1279            use hyperactor::test_utils::pingpong::PingPongMessage;
1280
1281            use crate::alloc::ProcStopReason;
1282            use crate::proc_mesh::ProcEvent;
1283
1284            let config = hyperactor::config::global::lock();
1285            let _guard = config.override_key(
1286                hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1287                tokio::time::Duration::from_secs(1),
1288            );
1289
1290            let alloc = LocalAllocator
1291                .allocate(AllocSpec {
1292                    extent: extent!(replica = 2),
1293                    constraints: Default::default(),
1294                    proc_name: None,
1295                    transport: ChannelTransport::Local,
1296                    proc_allocation_mode: Default::default(),
1297                })
1298                .await
1299                .unwrap();
1300            let instance = crate::v1::testing::instance().await;
1301            let monkey = alloc.chaos_monkey();
1302            let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1303            let mut events = mesh.events().unwrap();
1304
1305            let ping_pong_actor_params = PingPongActorParams::new(
1306                Some(PortRef::attest_message_port(mesh.client().self_id())),
1307                None,
1308            );
1309            let actor_mesh: RootActorMesh<PingPongActor> = mesh
1310                .spawn::<PingPongActor>(&instance, "ping-pong", &ping_pong_actor_params)
1311                .await
1312                .unwrap();
1313
1314            let ping: ActorRef<PingPongActor> = actor_mesh.get(0).unwrap();
1315            let pong: ActorRef<PingPongActor> = actor_mesh.get(1).unwrap();
1316
1317            // Kill ping.
1318            monkey(0, ProcStopReason::Killed(0, false));
1319            assert_matches!(
1320                events.next().await.unwrap(),
1321                ProcEvent::Stopped(0, ProcStopReason::Killed(0, false))
1322            );
1323
1324            // Try to send a message to 'ping'. Since 'ping's mailbox
1325            // is stopped, the send will timeout and fail.
1326            let (unmonitored_done_tx, _) = mesh.client().open_once_port();
1327            ping.send(
1328                mesh.client(),
1329                PingPongMessage(1, pong.clone(), unmonitored_done_tx.bind()),
1330            )
1331            .unwrap();
1332
1333            // The message will be returned!
1334            assert_matches!(
1335                events.next().await.unwrap(),
1336                ProcEvent::Crashed(0, reason) if reason.contains("message not delivered")
1337            );
1338
1339            // Get 'pong' to send 'ping' a message. Since 'ping's
1340            // mailbox is stopped, the send will timeout and fail.
1341            let (unmonitored_done_tx, _) = mesh.client().open_once_port();
1342            pong.send(
1343                mesh.client(),
1344                PingPongMessage(1, ping.clone(), unmonitored_done_tx.bind()),
1345            )
1346            .unwrap();
1347
1348            // The message will be returned!
1349            assert_matches!(
1350                events.next().await.unwrap(),
1351                ProcEvent::Crashed(0, reason) if reason.contains("message not delivered")
1352            );
1353        }
1354
1355        #[tokio::test]
1356        async fn test_cast_failure() {
1357            use crate::alloc::ProcStopReason;
1358            use crate::proc_mesh::ProcEvent;
1359            use crate::sel;
1360
1361            let alloc = LocalAllocator
1362                .allocate(AllocSpec {
1363                    extent: extent!(replica = 1),
1364                    constraints: Default::default(),
1365                    proc_name: None,
1366                    transport: ChannelTransport::Local,
1367                    proc_allocation_mode: Default::default(),
1368                })
1369                .await
1370                .unwrap();
1371            let instance = crate::v1::testing::instance().await;
1372
1373            let stop = alloc.stopper();
1374            let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
1375            let mut events = mesh.events().unwrap();
1376
1377            let actor_mesh = mesh
1378                .spawn::<TestActor>(&instance, "reply-then-fail", &())
1379                .await
1380                .unwrap();
1381
1382            // `GetRank` with `false` means exit with error after
1383            // replying with rank.
1384            let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1385            actor_mesh
1386                .cast(mesh.client(), sel!(*), GetRank(false, reply_handle.bind()))
1387                .unwrap();
1388            let rank = reply_receiver.recv().await.unwrap();
1389            assert_eq!(rank, 0);
1390
1391            // The above is expected to trigger a proc crash.
1392            assert_matches!(
1393                events.next().await.unwrap(),
1394                ProcEvent::Crashed(0, reason) if reason.contains("intentional error!")
1395            );
1396
1397            // Cast the message.
1398            let (reply_handle, _) = actor_mesh.open_port();
1399            actor_mesh
1400                .cast(mesh.client(), sel!(*), GetRank(false, reply_handle.bind()))
1401                .unwrap();
1402
1403            // The message will be returned!
1404            assert_matches!(
1405                events.next().await.unwrap(),
1406                ProcEvent::Crashed(0, reason) if reason.contains("message not delivered")
1407            );
1408
1409            // Stop the mesh.
1410            stop();
1411            assert_matches!(
1412                events.next().await.unwrap(),
1413                ProcEvent::Stopped(0, ProcStopReason::Stopped),
1414            );
1415            assert!(events.next().await.is_none());
1416        }
1417
1418        #[tracing_test::traced_test]
1419        #[tokio::test]
1420        async fn test_stop_actor_mesh() {
1421            use hyperactor::test_utils::pingpong::PingPongActor;
1422            use hyperactor::test_utils::pingpong::PingPongActorParams;
1423            use hyperactor::test_utils::pingpong::PingPongMessage;
1424
1425            let config = hyperactor::config::global::lock();
1426            let _guard = config.override_key(
1427                hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1428                tokio::time::Duration::from_secs(1),
1429            );
1430
1431            let alloc = LocalAllocator
1432                .allocate(AllocSpec {
1433                    extent: extent!(replica = 2),
1434                    constraints: Default::default(),
1435                    proc_name: None,
1436                    transport: ChannelTransport::Local,
1437                    proc_allocation_mode: Default::default(),
1438                })
1439                .await
1440                .unwrap();
1441            let instance = crate::v1::testing::instance().await;
1442            let mesh = ProcMesh::allocate(alloc).await.unwrap();
1443
1444            let ping_pong_actor_params = PingPongActorParams::new(
1445                Some(PortRef::attest_message_port(mesh.client().self_id())),
1446                None,
1447            );
1448            let mesh_one: RootActorMesh<PingPongActor> = mesh
1449                .spawn::<PingPongActor>(&instance, "mesh_one", &ping_pong_actor_params)
1450                .await
1451                .unwrap();
1452
1453            let mesh_two: RootActorMesh<PingPongActor> = mesh
1454                .spawn::<PingPongActor>(&instance, "mesh_two", &ping_pong_actor_params)
1455                .await
1456                .unwrap();
1457
1458            mesh_two.stop(&instance).await.unwrap();
1459
1460            let ping_two: ActorRef<PingPongActor> = mesh_two.get(0).unwrap();
1461            let pong_two: ActorRef<PingPongActor> = mesh_two.get(1).unwrap();
1462
1463            assert!(logs_contain(&format!(
1464                "stopped actor {}",
1465                ping_two.actor_id()
1466            )));
1467            assert!(logs_contain(&format!(
1468                "stopped actor {}",
1469                pong_two.actor_id()
1470            )));
1471
1472            // Other actor meshes on this proc mesh should still be up and running
1473            let ping_one: ActorRef<PingPongActor> = mesh_one.get(0).unwrap();
1474            let pong_one: ActorRef<PingPongActor> = mesh_one.get(1).unwrap();
1475            let (done_tx, done_rx) = mesh.client().open_once_port();
1476            pong_one
1477                .send(
1478                    mesh.client(),
1479                    PingPongMessage(1, ping_one.clone(), done_tx.bind()),
1480                )
1481                .unwrap();
1482            assert!(done_rx.recv().await.is_ok());
1483        }
1484    } // mod local
1485
1486    mod process {
1487
1488        use bytes::Bytes;
1489        use hyperactor::PortId;
1490        use hyperactor::channel::ChannelTransport;
1491        use hyperactor::clock::Clock;
1492        use hyperactor::clock::RealClock;
1493        use hyperactor::mailbox::MessageEnvelope;
1494        use rand::Rng;
1495        use tokio::process::Command;
1496
1497        use crate::alloc::process::ProcessAllocator;
1498
1499        #[cfg(fbcode_build)]
1500        fn process_allocator() -> ProcessAllocator {
1501            ProcessAllocator::new(Command::new(crate::testresource::get(
1502                "monarch/hyperactor_mesh/bootstrap",
1503            )))
1504        }
1505
        // Run the shared actor-mesh test suite with a `ProcessAllocator`.
        // It launches procs from an external `bootstrap` binary produced by
        // Buck, so the suite is only compiled in fbcode builds.
        #[cfg(fbcode_build)]
        actor_mesh_test_suite!(process_allocator());
1508
1509        // This test is concerned with correctly reporting failures
1510        // when message sizes exceed configured limits.
1511        #[cfg(fbcode_build)]
1512        //#[tracing_test::traced_test]
1513        #[async_timed_test(timeout_secs = 30)]
1514        async fn test_oversized_frames() {
1515            // Reproduced from 'net.rs'.
1516            #[derive(Debug, Serialize, Deserialize, PartialEq)]
1517            enum Frame<M> {
1518                Init(u64),
1519                Message(u64, M),
1520            }
1521            // Calculate the frame length for the given message.
1522            fn frame_length(src: &ActorId, dst: &PortId, pay: &Payload) -> usize {
1523                let serialized = Serialized::serialize(pay).unwrap();
1524                let mut headers = Attrs::new();
1525                hyperactor::mailbox::headers::set_send_timestamp(&mut headers);
1526                hyperactor::mailbox::headers::set_rust_message_type::<Payload>(&mut headers);
1527                let envelope = MessageEnvelope::new(src.clone(), dst.clone(), serialized, headers);
1528                let frame = Frame::Message(0u64, envelope);
1529                let message = serde_multipart::serialize_illegal_bincode(&frame).unwrap();
1530                message.frame_len()
1531            }
1532
1533            // This process: short delivery timeout.
1534            let config = hyperactor::config::global::lock();
1535            // This process (write): max frame len for frame writes.
1536            let _guard2 =
1537                config.override_key(hyperactor::config::CODEC_MAX_FRAME_LENGTH, 1024usize);
1538            // Remote process (read): max frame len for frame reads.
1539            // SAFETY: Ok here but not safe for concurrent access.
1540            unsafe {
1541                std::env::set_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", "1024");
1542            };
1543            let _guard3 =
1544                config.override_key(hyperactor::config::DEFAULT_ENCODING, Encoding::Bincode);
1545            let _guard4 = config.override_key(hyperactor::config::CHANNEL_MULTIPART, false);
1546
1547            let alloc = process_allocator()
1548                .allocate(AllocSpec {
1549                    extent: extent!(replica = 1),
1550                    constraints: Default::default(),
1551                    proc_name: None,
1552                    transport: ChannelTransport::Unix,
1553                    proc_allocation_mode: Default::default(),
1554                })
1555                .await
1556                .unwrap();
1557            let instance = crate::v1::testing::instance().await;
1558            let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1559            let mut proc_events = proc_mesh.events().unwrap();
1560            let actor_mesh: RootActorMesh<TestActor> =
1561                proc_mesh.spawn(&instance, "ingest", &()).await.unwrap();
1562            let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1563            let dest = actor_mesh.get(0).unwrap();
1564
1565            // Message sized to exactly max frame length.
1566            let payload = Payload {
1567                part: Part::from(Bytes::from(vec![0u8; 586])),
1568                reply_port: reply_handle.bind(),
1569            };
1570            let frame_len = frame_length(
1571                proc_mesh.client().self_id(),
1572                dest.port::<Payload>().port_id(),
1573                &payload,
1574            );
1575            assert_eq!(frame_len, 1024);
1576
1577            // Send direct. A cast message is > 1024 bytes.
1578            dest.send(proc_mesh.client(), payload).unwrap();
1579            #[allow(clippy::disallowed_methods)]
1580            let result = RealClock
1581                .timeout(Duration::from_secs(2), reply_receiver.recv())
1582                .await;
1583            assert!(result.is_ok(), "Operation should not time out");
1584
1585            // Message sized to max frame length + 1.
1586            let payload = Payload {
1587                part: Part::from(Bytes::from(vec![0u8; 587])),
1588                reply_port: reply_handle.bind(),
1589            };
1590            let frame_len = frame_length(
1591                proc_mesh.client().self_id(),
1592                dest.port::<Payload>().port_id(),
1593                &payload,
1594            );
1595            assert_eq!(frame_len, 1025); // over the max frame len
1596
1597            // Send direct or cast. Either are guaranteed over the
1598            // limit and will fail.
1599            if rand::thread_rng().gen_bool(0.5) {
1600                dest.send(proc_mesh.client(), payload).unwrap();
1601            } else {
1602                actor_mesh
1603                    .cast(proc_mesh.client(), sel!(*), payload)
1604                    .unwrap();
1605            }
1606
1607            // The undeliverable supervision event that happens next
1608            // does not depend on a timeout.
1609            {
1610                let event = proc_events.next().await.unwrap();
1611                assert_matches!(
1612                    event,
1613                    ProcEvent::Crashed(_, _),
1614                    "Should have received crash event"
1615                );
1616            }
1617        }
1618
1619        // Set this test only for `mod process` because it relies on a
1620        // trick to emulate router failure that only works when using
1621        // non-local allocators.
1622        #[cfg(fbcode_build)]
1623        #[tokio::test]
1624        async fn test_router_undeliverable_return() {
1625            // Test that an undeliverable message received by a
1626            // router results in actor mesh supervision events.
1627            use ndslice::extent;
1628
1629            use super::test_util::*;
1630            use crate::alloc::AllocSpec;
1631            use crate::alloc::Allocator;
1632
1633            let alloc = process_allocator()
1634                .allocate(AllocSpec {
1635                    extent: extent! { replica = 1 },
1636                    constraints: Default::default(),
1637                    proc_name: None,
1638                    transport: ChannelTransport::Unix,
1639                    proc_allocation_mode: Default::default(),
1640                })
1641                .await
1642                .unwrap();
1643
1644            // SAFETY: Not multithread safe.
1645            unsafe { std::env::set_var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK", "1") };
1646
1647            let instance = crate::v1::testing::instance().await;
1648            let mut proc_mesh = ProcMesh::allocate(alloc).await.unwrap();
1649            let mut proc_events = proc_mesh.events().unwrap();
1650            let mut actor_mesh: RootActorMesh<'_, ProxyActor> =
1651                { proc_mesh.spawn(&instance, "proxy", &()).await.unwrap() };
1652            let mut actor_events = actor_mesh.events().unwrap();
1653
1654            let proxy_actor = actor_mesh.get(0).unwrap();
1655            let (tx, mut rx) = actor_mesh.open_port::<String>();
1656            proxy_actor
1657                .send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind()))
1658                .unwrap();
1659
1660            #[allow(clippy::disallowed_methods)]
1661            match tokio::time::timeout(tokio::time::Duration::from_secs(3), rx.recv()).await {
1662                Ok(_) => panic!("the impossible happened"),
1663                Err(_) => {
1664                    assert_matches!(
1665                        proc_events.next().await.unwrap(),
1666                        ProcEvent::Crashed(0, reason) if reason.contains("undeliverable")
1667                    );
1668                    assert_eq!(
1669                        actor_events.next().await.unwrap().actor_id.name(),
1670                        actor_mesh.name(),
1671                    );
1672                }
1673            }
1674
1675            // SAFETY: Not multithread safe.
1676            unsafe { std::env::remove_var("HYPERACTOR_MESH_ROUTER_NO_GLOBAL_FALLBACK") };
1677        }
1678    }
1679
    // Instantiates `actor_mesh_test_suite!` with a simulator-backed
    // allocator. The macro is defined outside this view; presumably it
    // expands to the shared actor-mesh tests (TODO confirm).
    mod sim {
        use crate::alloc::sim::SimAllocator;

        // The allocator expression is passed to the suite as-is. Judging by
        // its name, `new_and_start_simnet` also starts the simulated network
        // (not verified here).
        actor_mesh_test_suite!(SimAllocator::new_and_start_simnet());
    }
1685
1686    mod reshape_cast {
1687        use async_trait::async_trait;
1688        use hyperactor::Actor;
1689        use hyperactor::Context;
1690        use hyperactor::Handler;
1691        use hyperactor::channel::ChannelAddr;
1692        use hyperactor::channel::ChannelTransport;
1693        use hyperactor::channel::ChannelTx;
1694        use hyperactor::channel::Rx;
1695        use hyperactor::channel::Tx;
1696        use hyperactor::channel::dial;
1697        use hyperactor::channel::serve;
1698        use hyperactor::clock::Clock;
1699        use hyperactor::clock::RealClock;
1700        use ndslice::Selection;
1701
1702        use crate::Mesh;
1703        use crate::ProcMesh;
1704        use crate::RootActorMesh;
1705        use crate::actor_mesh::ActorMesh;
1706        use crate::alloc::AllocSpec;
1707        use crate::alloc::Allocator;
1708        use crate::alloc::LocalAllocator;
1709        use crate::config::MAX_CAST_DIMENSION_SIZE;
1710
1711        #[derive(Debug)]
1712        #[hyperactor::export(
1713            spawn = true,
1714            handlers = [() { cast = true }],
1715        )]
1716        struct EchoActor(ChannelTx<usize>);
1717
1718        #[async_trait]
1719        impl Actor for EchoActor {
1720            type Params = ChannelAddr;
1721
1722            async fn new(params: ChannelAddr) -> Result<Self, anyhow::Error> {
1723                Ok(Self(dial::<usize>(params)?))
1724            }
1725        }
1726
1727        #[async_trait]
1728        impl Handler<()> for EchoActor {
1729            async fn handle(
1730                &mut self,
1731                cx: &Context<Self>,
1732                _message: (),
1733            ) -> Result<(), anyhow::Error> {
1734                let Self(port) = self;
1735                port.post(cx.self_id().rank());
1736                Ok(())
1737            }
1738        }
1739
1740        async fn validate_cast<A>(
1741            actor_mesh: &A,
1742            caps: &impl hyperactor::context::Actor,
1743            addr: ChannelAddr,
1744            selection: Selection,
1745        ) where
1746            A: ActorMesh<Actor = EchoActor>,
1747        {
1748            let config = hyperactor::config::global::lock();
1749            let _guard = config.override_key(MAX_CAST_DIMENSION_SIZE, 2);
1750
1751            let (_, mut rx) = serve::<usize>(addr).unwrap();
1752
1753            let expected_ranks = selection
1754                .eval(
1755                    &ndslice::selection::EvalOpts::strict(),
1756                    actor_mesh.shape().slice(),
1757                )
1758                .unwrap()
1759                .collect::<std::collections::BTreeSet<_>>();
1760
1761            actor_mesh.cast(caps, selection, ()).unwrap();
1762
1763            let mut received = std::collections::BTreeSet::new();
1764
1765            for _ in 0..(expected_ranks.len()) {
1766                received.insert(
1767                    RealClock
1768                        .timeout(tokio::time::Duration::from_secs(1), rx.recv())
1769                        .await
1770                        .unwrap()
1771                        .unwrap(),
1772                );
1773            }
1774
1775            assert_eq!(received, expected_ranks);
1776        }
1777
1778        use ndslice::strategy::gen_extent;
1779        use ndslice::strategy::gen_selection;
1780        use proptest::prelude::*;
1781        use proptest::test_runner::TestRunner;
1782
1783        fn make_tokio_runtime() -> tokio::runtime::Runtime {
1784            tokio::runtime::Builder::new_multi_thread()
1785                .enable_all()
1786                .worker_threads(2)
1787                .build()
1788                .unwrap()
1789        }
1790
1791        proptest! {
1792            #![proptest_config(ProptestConfig {
1793                cases: 8, ..ProptestConfig::default()
1794            })]
1795            #[test]
1796            fn test_reshaped_actor_mesh_cast(extent in gen_extent(1..=4, 8)) {
1797                let runtime = make_tokio_runtime();
1798                let alloc = runtime.block_on(LocalAllocator
1799                    .allocate(AllocSpec {
1800                        extent,
1801                        constraints: Default::default(),
1802                        proc_name: None,
1803                        transport: ChannelTransport::Local,
1804                        proc_allocation_mode: Default::default(),
1805                    }))
1806                    .unwrap();
1807                let instance = runtime.block_on(crate::v1::testing::instance());
1808                let proc_mesh = runtime.block_on(ProcMesh::allocate(alloc)).unwrap();
1809
1810                let addr = ChannelAddr::any(ChannelTransport::Unix);
1811
1812                let actor_mesh: RootActorMesh<EchoActor> =
1813                    runtime.block_on(proc_mesh.spawn(&instance, "echo", &addr)).unwrap();
1814
1815                let mut runner = TestRunner::default();
1816                let selection = gen_selection(4, actor_mesh.shape().slice().sizes().to_vec(), 0)
1817                    .new_tree(&mut runner)
1818                    .unwrap()
1819                    .current();
1820
1821                runtime.block_on(validate_cast(&actor_mesh, actor_mesh.proc_mesh().client(), addr, selection));
1822            }
1823        }
1824
1825        proptest! {
1826            #![proptest_config(ProptestConfig {
1827                cases: 8, ..ProptestConfig::default()
1828            })]
1829            #[test]
1830            fn test_reshaped_actor_mesh_slice_cast(extent in gen_extent(1..=4, 8)) {
1831                let runtime = make_tokio_runtime();
1832                let alloc = runtime.block_on(LocalAllocator
1833                    .allocate(AllocSpec {
1834                        extent: extent.clone(),
1835                        constraints: Default::default(),
1836                        proc_name: None,
1837                        transport: ChannelTransport::Local,
1838                        proc_allocation_mode: Default::default(),
1839                    }))
1840                    .unwrap();
1841                let instance = runtime.block_on(crate::v1::testing::instance());
1842                let proc_mesh = runtime.block_on(ProcMesh::allocate(alloc)).unwrap();
1843
1844                let addr = ChannelAddr::any(ChannelTransport::Unix);
1845
1846                let actor_mesh: RootActorMesh<EchoActor> =
1847                    runtime.block_on(proc_mesh.spawn(&instance, "echo", &addr)).unwrap();
1848
1849
1850                let first_label = extent.labels().first().unwrap();
1851                let slice = actor_mesh.select(first_label, 0..extent.size(first_label).unwrap()).unwrap();
1852
1853                // Unfortunately we must do things this way due to borrow checker reasons
1854                let slice = if extent.len() >= 2 {
1855                    let label = &extent.labels()[1];
1856                    let size = extent.size(label).unwrap();
1857                    let start = if size > 1 { 1 } else { 0 };
1858                    let end = (if size > 1 { size - 1 } else { 1 }).max(start + 1);
1859                    slice.select(label, start..end).unwrap()
1860                } else {
1861                    slice
1862                };
1863
1864                let slice = if extent.len() >= 3 {
1865                    let label = &extent.labels()[2];
1866                    let size = extent.size(label).unwrap();
1867                    let start = if size > 1 { 1 } else { 0 };
1868                    let end = (if size > 1 { size - 1 } else { 1 }).max(start + 1);
1869                    slice.select(label, start..end).unwrap()
1870                } else {
1871                    slice
1872                };
1873
1874                let slice = if extent.len() >= 4 {
1875                    let label = &extent.labels()[3];
1876                    let size = extent.size(label).unwrap();
1877                    let start = if size > 1 { 1 } else { 0 };
1878                    let end = (if size > 1 { size - 1 } else { 1 }).max(start + 1);
1879                    slice.select(label, start..end).unwrap()
1880                } else {
1881                    slice
1882                };
1883
1884
1885                let mut runner = TestRunner::default();
1886                let selection = gen_selection(4, slice.shape().slice().sizes().to_vec(), 0)
1887                    .new_tree(&mut runner)
1888                    .unwrap()
1889                    .current();
1890
1891                runtime.block_on(validate_cast(
1892                    &slice,
1893                    actor_mesh.proc_mesh().client(),
1894                    addr,
1895                    selection
1896                ));
1897            }
1898        }
1899
1900        proptest! {
1901            #![proptest_config(ProptestConfig {
1902                cases: 8, ..ProptestConfig::default()
1903            })]
1904             #[test]
1905             fn test_reshaped_actor_mesh_cast_with_selection(extent in gen_extent(1..=4, 8)) {
1906                let runtime = make_tokio_runtime();
1907                let alloc = runtime.block_on(LocalAllocator
1908                    .allocate(AllocSpec {
1909                        extent,
1910                        constraints: Default::default(),
1911                        proc_name: None,
1912                        transport: ChannelTransport::Local,
1913                        proc_allocation_mode: Default::default(),
1914                    }))
1915                    .unwrap();
1916                let instance = runtime.block_on(crate::v1::testing::instance());
1917                let proc_mesh = runtime.block_on(ProcMesh::allocate(alloc)).unwrap();
1918
1919                let addr = ChannelAddr::any(ChannelTransport::Unix);
1920
1921                let actor_mesh: RootActorMesh<EchoActor> =
1922                    runtime.block_on(proc_mesh.spawn(&instance, "echo", &addr)).unwrap();
1923
1924                let mut runner = TestRunner::default();
1925                let selection = gen_selection(4, actor_mesh.shape().slice().sizes().to_vec(), 0)
1926                    .new_tree(&mut runner)
1927                    .unwrap()
1928                    .current();
1929
1930                runtime.block_on(validate_cast(
1931                    &actor_mesh,
1932                    actor_mesh.proc_mesh().client(),
1933                    addr,
1934                    selection
1935                ));
1936            }
1937        }
1938    }
1939
1940    mod shim {
1941        use std::collections::HashSet;
1942
1943        use hyperactor::context::Mailbox;
1944        use ndslice::Extent;
1945        use ndslice::extent;
1946
1947        use super::*;
1948        use crate::sel;
1949
1950        #[tokio::test]
1951        #[cfg(fbcode_build)]
1952        async fn test_basic() {
1953            let instance = v1::testing::instance().await;
1954            let host_mesh = v1::testing::host_mesh(extent!(host = 4)).await;
1955            let proc_mesh = host_mesh
1956                .spawn(instance, "test", Extent::unity())
1957                .await
1958                .unwrap();
1959            let actor_mesh = proc_mesh
1960                .spawn::<v1::testactor::TestActor>(instance, "test", &())
1961                .await
1962                .unwrap();
1963
1964            let actor_mesh_v0: RootActorMesh<'_, _> = actor_mesh.clone().into();
1965
1966            let (cast_info, mut cast_info_rx) = instance.mailbox().open_port();
1967            actor_mesh_v0
1968                .cast(
1969                    instance,
1970                    sel!(*),
1971                    v1::testactor::GetCastInfo {
1972                        cast_info: cast_info.bind(),
1973                    },
1974                )
1975                .unwrap();
1976
1977            let mut point_to_actor: HashSet<_> = actor_mesh.iter().collect();
1978            while !point_to_actor.is_empty() {
1979                let (point, origin_actor_ref, sender_actor_id) = cast_info_rx.recv().await.unwrap();
1980                let key = (point, origin_actor_ref);
1981                assert!(
1982                    point_to_actor.remove(&key),
1983                    "key {:?} not present or removed twice",
1984                    key
1985                );
1986                assert_eq!(&sender_actor_id, instance.self_id());
1987            }
1988        }
1989    }
1990}