hyperactor_mesh/v1/actor_mesh.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

use std::fmt;
use std::hash::Hash;
use std::hash::Hasher;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::OnceLock as OnceCell;

use hyperactor::ActorRef;
use hyperactor::RemoteHandles;
use hyperactor::RemoteMessage;
use hyperactor::actor::Referable;
use hyperactor::attrs::Attrs;
use hyperactor::context;
use hyperactor::message::Castable;
use hyperactor::message::IndexedErasedUnbound;
use hyperactor::message::Unbound;
use hyperactor_mesh_macros::sel;
use ndslice::Selection;
use ndslice::ViewExt as _;
use ndslice::view;
use ndslice::view::Region;
use ndslice::view::View;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;

use crate::CommActor;
use crate::actor_mesh as v0_actor_mesh;
use crate::comm::multicast;
use crate::proc_mesh::mesh_agent::ActorState;
use crate::reference::ActorMeshId;
use crate::resource;
use crate::v1;
use crate::v1::Error;
use crate::v1::Name;
use crate::v1::ProcMeshRef;
use crate::v1::ValueMesh;

/// An ActorMesh is a collection of ranked A-typed actors.
///
/// Bound note: `A: Referable` because the mesh stores/returns
/// `ActorRef<A>`, which is only defined for `A: Referable`.
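///
/// A minimal usage sketch (hedged: `MyActor` and `message` are
/// placeholders; `proc_mesh` and `instance` follow the patterns used
/// in the tests at the bottom of this file):
///
/// ```ignore
/// // Spawn one A-typed actor per proc in the proc mesh.
/// let mesh: ActorMesh<MyActor> = proc_mesh.spawn(instance, "workers", &()).await?;
/// // Broadcast a message to every rank.
/// mesh.cast(instance, message)?;
/// // Stop the actors on all procs.
/// mesh.stop(instance).await?;
/// ```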
#[derive(Debug)]
pub struct ActorMesh<A: Referable> {
    proc_mesh: ProcMeshRef,
    name: Name,
    current_ref: ActorMeshRef<A>,
}

// `A: Referable` for the same reason as the struct: the mesh holds
// `ActorRef<A>`.
impl<A: Referable> ActorMesh<A> {
    pub(crate) fn new(proc_mesh: ProcMeshRef, name: Name) -> Self {
        let current_ref =
            ActorMeshRef::with_page_size(name.clone(), proc_mesh.clone(), DEFAULT_PAGE);

        Self {
            proc_mesh,
            name,
            current_ref,
        }
    }

    pub fn name(&self) -> &Name {
        &self.name
    }

    /// Detach this mesh from the lifetime of `self`, and return its reference.
    pub(crate) fn detach(self) -> ActorMeshRef<A> {
        self.current_ref.clone()
    }

    /// Stop actors on this mesh across all procs.
    pub async fn stop(&self, cx: &impl context::Actor) -> v1::Result<()> {
        self.proc_mesh()
            .stop_actor_by_name(cx, self.name.clone())
            .await
    }
}

impl<A: Referable> fmt::Display for ActorMesh<A> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.current_ref)
    }
}

impl<A: Referable> Deref for ActorMesh<A> {
    type Target = ActorMeshRef<A>;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}

/// Manual implementation of Clone because `A` doesn't need to implement Clone
/// but we still want to be able to clone the ActorMesh.
impl<A: Referable> Clone for ActorMesh<A> {
    fn clone(&self) -> Self {
        Self {
            proc_mesh: self.proc_mesh.clone(),
            name: self.name.clone(),
            current_ref: self.current_ref.clone(),
        }
    }
}

impl<A: Referable> Drop for ActorMesh<A> {
    fn drop(&mut self) {
        tracing::info!(
            name = "ActorMeshStatus",
            actor_name = %self.name,
            status = "Dropped",
        );
    }
}

/// Influences paging behavior for the lazy cache. Smaller pages
/// reduce over-allocation for sparse access; larger pages reduce the
/// number of heap allocations for contiguous scans.
const DEFAULT_PAGE: usize = 1024;

/// A lazily materialized page of ActorRefs.
struct Page<A: Referable> {
    slots: Box<[OnceCell<ActorRef<A>>]>,
}

impl<A: Referable> Page<A> {
    fn new(len: usize) -> Self {
        let mut v = Vec::with_capacity(len);
        for _ in 0..len {
            v.push(OnceCell::new());
        }
        Self {
            slots: v.into_boxed_slice(),
        }
    }
}

/// A reference to a stable snapshot of an [`ActorMesh`].
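///
/// Actor references are materialized lazily: the rank space is split
/// into fixed-size pages, and both the page table and the individual
/// `ActorRef<A>` slots are allocated only on first access (see
/// `materialize`). A sketch of the indexing, assuming the default page
/// size of 1024 and an illustrative mesh of 2500 ranks:
///
/// ```ignore
/// // number of pages = ceil(2500 / 1024) = 3          (ensure_pages)
/// // for rank 2050:
/// //   page_ix  = 2050 / 1024 = 2
/// //   local_ix = 2050 % 1024 = 2
/// // The last page is partial: it holds 2500 - 2 * 1024 = 452 slots.
/// ```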
pub struct ActorMeshRef<A: Referable> {
    proc_mesh: ProcMeshRef,
    name: Name,

    /// Lazily allocated collection of pages:
    /// - The outer `OnceCell` defers creating the vector until first
    ///   use.
    /// - The `Vec` holds slots for multiple pages.
    /// - Each slot is itself a `OnceCell<Box<Page<A>>>`, so that each
    ///   page can be initialized on demand.
    /// - A `Page<A>` is a boxed slice of `OnceCell<ActorRef<A>>`,
    ///   i.e. the actual storage for actor references within that
    ///   page.
    pages: OnceCell<Vec<OnceCell<Box<Page<A>>>>>,
    // Page size knob (not serialized; resets to DEFAULT_PAGE after deserialization).
    page_size: usize,

    _phantom: PhantomData<A>,
}

impl<A: Referable> ActorMeshRef<A> {
    /// Cast a message to all the actors in this mesh.
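    ///
    /// A hedged sketch of a broadcast, modeled on the tests below
    /// (`instance` is the sending actor context; `GetActorId` is the
    /// test actor's reply-port message):
    ///
    /// ```ignore
    /// let (port, mut rx) = mailbox::open_port(instance);
    /// // Every rank receives its own copy of the message.
    /// actor_mesh.cast(instance, testactor::GetActorId(port.bind()))?;
    /// let id = rx.recv().await?;
    /// ```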
    #[allow(clippy::result_large_err)]
    pub fn cast<M>(&self, cx: &impl context::Actor, message: M) -> v1::Result<()>
    where
        A: RemoteHandles<M> + RemoteHandles<IndexedErasedUnbound<M>>,
        M: Castable + RemoteMessage + Clone, // Clone is required until we are fully onto comm actor
    {
        self.cast_with_selection(cx, sel!(*), message)
    }

    /// Cast a message to the actors in this mesh according to the provided selection.
    /// This should *only* be used for temporary support for selections in the tensor
    /// engine. If you use this for anything else, you will be fired (you too, OSS
    /// contributor).
    #[allow(clippy::result_large_err)]
    pub(crate) fn cast_for_tensor_engine_only_do_not_use<M>(
        &self,
        cx: &impl context::Actor,
        sel: Selection,
        message: M,
    ) -> v1::Result<()>
    where
        A: RemoteHandles<M> + RemoteHandles<IndexedErasedUnbound<M>>,
        M: Castable + RemoteMessage + Clone, // Clone is required until we are fully onto comm actor
    {
        self.cast_with_selection(cx, sel, message)
    }

    #[allow(clippy::result_large_err)]
    fn cast_with_selection<M>(
        &self,
        cx: &impl context::Actor,
        sel: Selection,
        message: M,
    ) -> v1::Result<()>
    where
        A: RemoteHandles<M> + RemoteHandles<IndexedErasedUnbound<M>>,
        M: Castable + RemoteMessage + Clone, // Clone is required until we are fully onto comm actor
    {
        if let Some(root_comm_actor) = self.proc_mesh.root_comm_actor() {
            self.cast_v0(cx, message, sel, root_comm_actor)
        } else {
            for (point, actor) in self.iter() {
                let create_rank = point.rank();
                let mut headers = Attrs::new();
                headers.set(
                    multicast::CAST_ORIGINATING_SENDER,
                    cx.instance().self_id().clone(),
                );
                headers.set(multicast::CAST_POINT, point);

                // Make sure that we re-bind ranks, as these may be used for
                // bootstrapping comm actors.
                let mut unbound = Unbound::try_from_message(message.clone())
                    .map_err(|e| Error::CastingError(self.name.clone(), e))?;
                unbound
                    .visit_mut::<resource::Rank>(|resource::Rank(rank)| {
                        *rank = Some(create_rank);
                        Ok(())
                    })
                    .map_err(|e| Error::CastingError(self.name.clone(), e))?;
                let rebound_message = unbound
                    .bind()
                    .map_err(|e| Error::CastingError(self.name.clone(), e))?;
                actor
                    .send_with_headers(cx, headers, rebound_message)
                    .map_err(|e| Error::SendingError(actor.actor_id().clone(), Box::new(e)))?;
            }
            Ok(())
        }
    }

    #[allow(clippy::result_large_err)]
    fn cast_v0<M>(
        &self,
        cx: &impl context::Actor,
        message: M,
        sel: Selection,
        root_comm_actor: &ActorRef<CommActor>,
    ) -> v1::Result<()>
    where
        A: RemoteHandles<IndexedErasedUnbound<M>>,
        M: Castable + RemoteMessage + Clone, // Clone is required until we are fully onto comm actor
    {
        let cast_mesh_shape = view::Ranked::region(self).into();
        let actor_mesh_id = ActorMeshId::V1(self.name.clone());
        match &self.proc_mesh.root_region {
            Some(root_region) => {
                let root_mesh_shape = root_region.into();
                v0_actor_mesh::cast_to_sliced_mesh::<A, M>(
                    cx,
                    actor_mesh_id,
                    root_comm_actor,
                    &sel,
                    message,
                    &cast_mesh_shape,
                    &root_mesh_shape,
                )
                .map_err(|e| Error::CastingError(self.name.clone(), e.into()))
            }
            None => v0_actor_mesh::actor_mesh_cast::<A, M>(
                cx,
                actor_mesh_id,
                root_comm_actor,
                sel,
                &cast_mesh_shape,
                &cast_mesh_shape,
                message,
            )
            .map_err(|e| Error::CastingError(self.name.clone(), e.into())),
        }
    }

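    /// Query the owning proc mesh for the current state of each actor
    /// in this mesh, returned as a [`ValueMesh`] of per-rank states.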
    #[allow(clippy::result_large_err)]
    pub async fn actor_states(
        &self,
        cx: &impl context::Actor,
    ) -> v1::Result<ValueMesh<resource::State<ActorState>>> {
        self.proc_mesh.actor_states(cx, self.name.clone()).await
    }
}

impl<A: Referable> ActorMeshRef<A> {
    pub(crate) fn new(name: Name, proc_mesh: ProcMeshRef) -> Self {
        Self::with_page_size(name, proc_mesh, DEFAULT_PAGE)
    }

    pub fn name(&self) -> &Name {
        &self.name
    }

    pub(crate) fn with_page_size(name: Name, proc_mesh: ProcMeshRef, page_size: usize) -> Self {
        Self {
            proc_mesh,
            name,
            pages: OnceCell::new(),
            page_size: page_size.max(1),
            _phantom: PhantomData,
        }
    }

    pub fn proc_mesh(&self) -> &ProcMeshRef {
        &self.proc_mesh
    }

    #[inline]
    fn len(&self) -> usize {
        view::Ranked::region(&self.proc_mesh).num_ranks()
    }

    fn ensure_pages(&self) -> &Vec<OnceCell<Box<Page<A>>>> {
        let n = self.len().div_ceil(self.page_size); // ⌈len / page_size⌉
        self.pages
            .get_or_init(|| (0..n).map(|_| OnceCell::new()).collect())
    }

    fn materialize(&self, rank: usize) -> Option<&ActorRef<A>> {
        let len = self.len();
        if rank >= len {
            return None;
        }
        let p = self.page_size;
        let page_ix = rank / p;
        let local_ix = rank % p;

        let pages = self.ensure_pages();
        let page = pages[page_ix].get_or_init(|| {
            // Last page may be partial.
            let base = page_ix * p;
            let remaining = len - base;
            let page_len = remaining.min(p);
            Box::new(Page::<A>::new(page_len))
        });

        Some(page.slots[local_ix].get_or_init(|| {
            // Invariant: `proc_mesh` and this view share the same
            // dense rank space:
            //   - ranks are contiguous [0, self.len()) with no gaps
            //     or reordering
            //   - for every rank r, `proc_mesh.get(r)` is Some(..)
            // Therefore we can index `proc_mesh` with `rank`
            // directly.
            debug_assert!(rank < self.len(), "rank must be within [0, len)");
            debug_assert!(
                self.proc_mesh.get(rank).is_some(),
                "proc_mesh must be dense/aligned with this view"
            );
            let proc_ref = self.proc_mesh.get(rank).expect("rank in-bounds");
            proc_ref.attest(&self.name)
        }))
    }
}

impl<A: Referable> Clone for ActorMeshRef<A> {
    fn clone(&self) -> Self {
        Self {
            proc_mesh: self.proc_mesh.clone(),
            name: self.name.clone(),
            pages: OnceCell::new(), // Don't clone the cache.
            page_size: self.page_size,
            _phantom: PhantomData,
        }
    }
}

impl<A: Referable> fmt::Display for ActorMeshRef<A> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{}@{}", self.name, A::typename(), self.proc_mesh)
    }
}

impl<A: Referable> PartialEq for ActorMeshRef<A> {
    fn eq(&self, other: &Self) -> bool {
        self.proc_mesh == other.proc_mesh && self.name == other.name
    }
}
impl<A: Referable> Eq for ActorMeshRef<A> {}

impl<A: Referable> Hash for ActorMeshRef<A> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.proc_mesh.hash(state);
        self.name.hash(state);
    }
}

impl<A: Referable> fmt::Debug for ActorMeshRef<A> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ActorMeshRef")
            .field("proc_mesh", &self.proc_mesh)
            .field("name", &self.name)
            .field("page_size", &self.page_size)
            .finish_non_exhaustive() // Don't print the cache.
    }
}

// Implement Serialize manually, without requiring A: Serialize.
impl<A: Referable> Serialize for ActorMeshRef<A> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Serialize only the fields that don't depend on A.
        (&self.proc_mesh, &self.name).serialize(serializer)
    }
}

// Implement Deserialize manually, without requiring A: Deserialize.
impl<'de, A: Referable> Deserialize<'de> for ActorMeshRef<A> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let (proc_mesh, name) = <(ProcMeshRef, Name)>::deserialize(deserializer)?;
        Ok(ActorMeshRef::with_page_size(name, proc_mesh, DEFAULT_PAGE))
    }
}
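// A hedged sketch of the round-trip behavior (the codec named here is
// an assumption; any serde format behaves the same way):
//
//     let bytes = bincode::serialize(&amr)?;              // wire form: (proc_mesh, name)
//     let restored: ActorMeshRef<A> = bincode::deserialize(&bytes)?;
//     assert_eq!(restored, amr);                          // identity survives (PartialEq ignores the cache)
//     // The lazy page cache and `page_size` are not carried over: the
//     // restored ref starts empty with DEFAULT_PAGE.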

impl<A: Referable> view::Ranked for ActorMeshRef<A> {
    type Item = ActorRef<A>;

    #[inline]
    fn region(&self) -> &Region {
        view::Ranked::region(&self.proc_mesh)
    }

    #[inline]
    fn get(&self, rank: usize) -> Option<&Self::Item> {
        self.materialize(rank)
    }
}

impl<A: Referable> view::RankedSliceable for ActorMeshRef<A> {
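    /// Restrict this reference to `region`, which must be a subset of
    /// the current region. The result keeps the same name and page
    /// size but starts with a fresh, empty cache.
    ///
    /// A sketch, modeled on the tests below (the `"replicas"` label is
    /// an assumption taken from the test extents):
    ///
    /// ```ignore
    /// // Keep replicas 1 and 2 of a `replicas = 4` mesh.
    /// let sliced = actor_mesh.range("replicas", 1..3).expect("slice should be valid");
    /// sliced.cast(instance, message)?;
    /// ```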
    fn sliced(&self, region: Region) -> Self {
        debug_assert!(region.is_subset(view::Ranked::region(self)));
        let proc_mesh = self.proc_mesh.subset(region).unwrap();
        Self::with_page_size(self.name.clone(), proc_mesh, self.page_size)
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashSet;

    use hyperactor::actor::ActorStatus;
    use hyperactor::clock::Clock;
    use hyperactor::clock::RealClock;
    use hyperactor::context::Mailbox as _;
    use hyperactor::mailbox;
    use ndslice::Extent;
    use ndslice::ViewExt;
    use ndslice::extent;
    use ndslice::view::Ranked;
    use timed_test::async_timed_test;
    use tokio::time::Duration;

    use super::ActorMesh;
    use crate::proc_mesh::mesh_agent::ActorState;
    use crate::resource;
    use crate::v1::ActorMeshRef;
    use crate::v1::Name;
    use crate::v1::ProcMesh;
    use crate::v1::proc_mesh::GET_ACTOR_STATE_MAX_IDLE;
    use crate::v1::testactor;
    use crate::v1::testing;

    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_actor_mesh_ref_lazy_materialization() {
        // 1) Bring up procs and spawn actors.
        let instance = testing::instance().await;
        // Small mesh so the test runs fast, but > page_size so we
        // cross a page boundary.
        let extent = extent!(replicas = 3, hosts = 2); // 6 ranks
        let pm: ProcMesh = testing::proc_meshes(instance, extent.clone())
            .await
            .into_iter()
            .next()
            .expect("at least one proc mesh");
        let am: ActorMesh<testactor::TestActor> = pm.spawn(instance, "test", &()).await.unwrap();

        // 2) Build our ActorMeshRef with a tiny page size (2) to
        // force multiple pages:
        // page 0: ranks [0,1], page 1: [2,3], page 2: [4,5]
        let page_size = 2;
        let amr: ActorMeshRef<testactor::TestActor> =
            ActorMeshRef::with_page_size(am.name.clone(), pm.clone(), page_size);
        assert_eq!(amr.extent(), extent);
        assert_eq!(amr.region().num_ranks(), 6);

        // 3) Within-rank pointer stability (OnceLock caches &ActorRef)
        let p0_a = amr.get(0).expect("rank 0 exists") as *const _;
        let p0_b = amr.get(0).expect("rank 0 exists") as *const _;
        assert_eq!(p0_a, p0_b, "same rank should return same cached pointer");

        // 4) Same page, different rank (both materialize fine)
        let p1_a = amr.get(1).expect("rank 1 exists") as *const _;
        let p1_b = amr.get(1).expect("rank 1 exists") as *const _;
        assert_eq!(p1_a, p1_b, "same rank should return same cached pointer");
        // They're different ranks, so the pointers are different
        // (distinct OnceLocks in the page)
        assert_ne!(p0_a, p1_a, "different ranks have different cache slots");

        // 5) Cross a page boundary (rank 2 is in a different page than rank 0/1)
        let p2_a = amr.get(2).expect("rank 2 exists") as *const _;
        let p2_b = amr.get(2).expect("rank 2 exists") as *const _;
        assert_eq!(p2_a, p2_b, "same rank should return same cached pointer");
        assert_ne!(p0_a, p2_a, "different pages have different cache slots");

        // 6) Clone should drop the cache but keep identity (actor_id)
        let amr_clone = amr.clone();
        let orig_id_0 = amr.get(0).unwrap().actor_id().clone();
        let clone_id_0 = amr_clone.get(0).unwrap().actor_id().clone();
        assert_eq!(orig_id_0, clone_id_0, "clone preserves identity");
        let p0_clone = amr_clone.get(0).unwrap() as *const _;
        assert_ne!(
            p0_a, p0_clone,
            "cloned ActorMeshRef has a fresh cache (different pointer)"
        );

        // 7) Slicing preserves page_size and clears cache
        // (RankedSliceable::sliced)
        let sliced = amr.range("replicas", 1..).expect("slice should be valid"); // leaves 4 ranks
        assert_eq!(sliced.region().num_ranks(), 4);
        // First access materializes a new cache for the sliced view.
        let sp0_a = sliced.get(0).unwrap() as *const _;
        let sp0_b = sliced.get(0).unwrap() as *const _;
        assert_eq!(sp0_a, sp0_b, "sliced view has its own cache slot per rank");
        // Cross-page inside the slice too (page_size = 2 => pages are
        // [0..2), [2..4)).
        let sp2 = sliced.get(2).unwrap() as *const _;
        assert_ne!(sp0_a, sp2, "sliced view crosses its own page boundary");

        // 8) Hash/Eq ignore cache state; identical identity collapses
        // to one set entry.
        let mut set = HashSet::new();
        set.insert(amr.clone());
        set.insert(amr.clone());
        assert_eq!(set.len(), 1, "cache state must not affect Hash/Eq");

        // 9) As a sanity check, send to a couple of ranks to ensure
        // the refs are indeed usable/live.
        let (port, mut rx) = mailbox::open_port(instance);
        // Send to rank 0 and rank 3 (extent 3x2 => at least 4 ranks
        // exist).
        amr.get(0)
            .expect("rank 0 exists")
            .send(instance, testactor::GetActorId(port.bind()))
            .expect("send to rank 0 should succeed");
        amr.get(3)
            .expect("rank 3 exists")
            .send(instance, testactor::GetActorId(port.bind()))
            .expect("send to rank 3 should succeed");
        let id_a = RealClock
            .timeout(Duration::from_secs(3), rx.recv())
            .await
            .expect("timed out waiting for first reply")
            .expect("channel closed before first reply");
        let id_b = RealClock
            .timeout(Duration::from_secs(3), rx.recv())
            .await
            .expect("timed out waiting for second reply")
            .expect("channel closed before second reply");
        assert_ne!(id_a, id_b, "two different ranks responded");
    }

    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_actor_states_with_panic() {
        hyperactor_telemetry::initialize_logging_for_test();

        let instance = testing::instance().await;
        // Listen for supervision events sent to the parent instance.
        let (supervision_port, mut supervision_receiver) =
            instance.open_port::<resource::State<ActorState>>();
        let supervisor = supervision_port.bind();
        let num_replicas = 4;
        let meshes = testing::proc_meshes(instance, extent!(replicas = num_replicas)).await;
        let proc_mesh = &meshes[1];
        let child_name = Name::new("child");

        let actor_mesh = proc_mesh
            .spawn_with_name::<testactor::TestActor>(instance, child_name.clone(), &())
            .await
            .unwrap();

        actor_mesh
            .cast(
                instance,
                testactor::CauseSupervisionEvent(testactor::SupervisionEventType::Panic),
            )
            .unwrap();

        // Wait for the cast message to cause a panic on all actors.
        // We can't use a reply port because the handler for the message will,
        // by definition, never complete and send a reply.
        #[allow(clippy::disallowed_methods)]
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

        // Now that all ranks have panicked, fetch the actor states and
        // forward each reported state to the supervision port.
        let supervision_task = tokio::spawn(async move {
            let events = actor_mesh.actor_states(&instance).await.unwrap();
            for state in events.values() {
                supervisor.send(instance, state.clone()).unwrap();
            }
        });
        // Make sure the task itself completes without panicking.
        supervision_task.await.unwrap();

        for _ in 0..num_replicas {
            let state = RealClock
                .timeout(Duration::from_secs(10), supervision_receiver.recv())
                .await
                .expect("timeout")
                .unwrap();
            if let resource::Status::Failed(s) = state.status {
                assert!(s.contains("supervision events"));
            } else {
                panic!("Not failed: {:?}", state.status);
            }
            if let Some(ref inner) = state.state {
                assert!(!inner.supervision_events.is_empty());
                for event in &inner.supervision_events {
                    println!("receiving event: {:?}", event);
                    assert_eq!(event.actor_id.name(), format!("{}", child_name.clone()));
                    assert_matches!(event.actor_status, ActorStatus::Failed(_));
                }
            }
        }
    }

    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_actor_states_with_process_exit() {
        hyperactor_telemetry::initialize_logging_for_test();

        let config = hyperactor::config::global::lock();
        let _guard = config.override_key(GET_ACTOR_STATE_MAX_IDLE, Duration::from_secs(1));

        let instance = testing::instance().await;
        // Listen for supervision events sent to the parent instance.
        let (supervision_port, mut supervision_receiver) =
            instance.open_port::<resource::State<ActorState>>();
        let supervisor = supervision_port.bind();
        let num_replicas = 4;
        let meshes = testing::proc_meshes(instance, extent!(replicas = num_replicas)).await;
        let proc_mesh = &meshes[1];
        let child_name = Name::new("child");

        let actor_mesh = proc_mesh
            .spawn_with_name::<testactor::TestActor>(instance, child_name.clone(), &())
            .await
            .unwrap();

        actor_mesh
            .cast(
                instance,
                testactor::CauseSupervisionEvent(testactor::SupervisionEventType::ProcessExit(1)),
            )
            .unwrap();

        // Wait for the cast message to cause a process exit on all actors.
        // We can't use a reply port because the handler for the message will,
        // by definition, never complete and send a reply.
        #[allow(clippy::disallowed_methods)]
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

        // Now that all processes have exited, fetch the actor states and
        // forward each reported state to the supervision port.
        let supervision_task = tokio::spawn(async move {
            let events = actor_mesh.actor_states(&instance).await.unwrap();
            for state in events.values() {
                supervisor.send(instance, state.clone()).unwrap();
            }
        });
        // Make sure the task itself completes without panicking.
        RealClock
            .timeout(Duration::from_secs(10), supervision_task)
            .await
            .expect("timeout")
            .unwrap();

        for _ in 0..num_replicas {
            let state = RealClock
                .timeout(Duration::from_secs(10), supervision_receiver.recv())
                .await
                .expect("timeout")
                .unwrap();
            assert_matches!(state.status, resource::Status::Timeout(_));
            let events = state
                .state
                .expect("state should be present")
                .supervision_events;
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].actor_status, ActorStatus::Stopped);
        }
    }

    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_actor_states_on_sliced_mesh() {
        hyperactor_telemetry::initialize_logging_for_test();

        let instance = testing::instance().await;
        // Listen for supervision events sent to the parent instance.
        let (supervision_port, mut supervision_receiver) =
            instance.open_port::<resource::State<ActorState>>();
        let supervisor = supervision_port.bind();
        let num_replicas = 4;
        let meshes = testing::proc_meshes(instance, extent!(replicas = num_replicas)).await;
        let proc_mesh = &meshes[1];
        let child_name = Name::new("child");

        let actor_mesh = proc_mesh
            .spawn_with_name::<testactor::TestActor>(instance, child_name.clone(), &())
            .await
            .unwrap();
        let sliced = actor_mesh
            .range("replicas", 1..3)
            .expect("slice should be valid");
        let sliced_replicas = sliced.len();

        sliced
            .cast(
                instance,
                testactor::CauseSupervisionEvent(testactor::SupervisionEventType::Panic),
            )
            .unwrap();

        // Wait for the cast message to cause a panic on the actors in the slice.
        // We can't use a reply port because the handler for the message will,
        // by definition, never complete and send a reply.
        #[allow(clippy::disallowed_methods)]
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

        // Now that the sliced ranks have panicked, fetch the actor states and
        // forward each reported state to the supervision port.
        let supervision_task = tokio::spawn(async move {
            let events = sliced.actor_states(&instance).await.unwrap();
            for state in events.values() {
                supervisor.send(instance, state.clone()).unwrap();
            }
        });
        // Make sure the task itself completes without panicking.
        RealClock
            .timeout(Duration::from_secs(10), supervision_task)
            .await
            .expect("timeout")
            .unwrap();

        for _ in 0..sliced_replicas {
            let state = RealClock
                .timeout(Duration::from_secs(10), supervision_receiver.recv())
                .await
                .expect("timeout")
                .unwrap();
            if let resource::Status::Failed(s) = state.status {
                assert!(s.contains("supervision events"));
            } else {
                panic!("Not failed: {:?}", state.status);
            }
            if let Some(ref inner) = state.state {
                assert!(!inner.supervision_events.is_empty());
                for event in &inner.supervision_events {
                    assert_eq!(event.actor_id.name(), format!("{}", child_name.clone()));
                    assert_matches!(event.actor_status, ActorStatus::Failed(_));
                }
            }
        }
    }

    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_cast() {
        let config = hyperactor::config::global::lock();
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let instance = testing::instance().await;
        let host_mesh = testing::host_mesh(extent!(host = 4)).await;
        let proc_mesh = host_mesh
            .spawn(instance, "test", Extent::unity())
            .await
            .unwrap();
        let actor_mesh = proc_mesh
            .spawn::<testactor::TestActor>(instance, "test", &())
            .await
            .unwrap();

        let (cast_info, mut cast_info_rx) = instance.mailbox().open_port();
        actor_mesh
            .cast(
                instance,
                testactor::GetCastInfo {
                    cast_info: cast_info.bind(),
                },
            )
            .unwrap();

        let mut point_to_actor: HashSet<_> = actor_mesh.iter().collect();
        while !point_to_actor.is_empty() {
            let (point, origin_actor_ref, sender_actor_id) = cast_info_rx.recv().await.unwrap();
            let key = (point, origin_actor_ref);
            assert!(
                point_to_actor.remove(&key),
                "key {:?} not present or removed twice",
                key
            );
            assert_eq!(&sender_actor_id, instance.self_id());
        }

        let _ = host_mesh.shutdown(&instance).await;
    }
}