use std::fmt;
use std::hash::Hash;
use std::hash::Hasher;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::OnceLock as OnceCell;

use hyperactor::ActorRef;
use hyperactor::RemoteHandles;
use hyperactor::RemoteMessage;
use hyperactor::actor::Referable;
use hyperactor::attrs::Attrs;
use hyperactor::context;
use hyperactor::message::Castable;
use hyperactor::message::IndexedErasedUnbound;
use hyperactor::message::Unbound;
use hyperactor_mesh_macros::sel;
use ndslice::Selection;
use ndslice::ViewExt as _;
use ndslice::view;
use ndslice::view::Region;
use ndslice::view::View;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;

use crate::CommActor;
use crate::actor_mesh as v0_actor_mesh;
use crate::comm::multicast;
use crate::proc_mesh::mesh_agent::ActorState;
use crate::reference::ActorMeshId;
use crate::resource;
use crate::v1;
use crate::v1::Error;
use crate::v1::Name;
use crate::v1::ProcMeshRef;
use crate::v1::ValueMesh;

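/// An owned handle to a named mesh of actors of type `A` spawned on a
/// [`ProcMeshRef`]. It dereferences to an [`ActorMeshRef`], which provides
/// casting and lazy per-rank access to the underlying `ActorRef`s.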
#[derive(Debug)]
pub struct ActorMesh<A: Referable> {
    proc_mesh: ProcMeshRef,
    name: Name,
    current_ref: ActorMeshRef<A>,
}

impl<A: Referable> ActorMesh<A> {
    /// Creates a new mesh handle for the actors named `name` on `proc_mesh`.
    pub(crate) fn new(proc_mesh: ProcMeshRef, name: Name) -> Self {
        let current_ref =
            ActorMeshRef::with_page_size(name.clone(), proc_mesh.clone(), DEFAULT_PAGE);

        Self {
            proc_mesh,
            name,
            current_ref,
        }
    }

    /// The name of this actor mesh.
    pub fn name(&self) -> &Name {
        &self.name
    }

    /// Detaches this mesh, returning an [`ActorMeshRef`] to it.
    pub(crate) fn detach(self) -> ActorMeshRef<A> {
        self.current_ref.clone()
    }

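    /// Stops every actor in this mesh, by name, via the owning proc mesh.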
    pub async fn stop(&self, cx: &impl context::Actor) -> v1::Result<()> {
        self.proc_mesh()
            .stop_actor_by_name(cx, self.name.clone())
            .await
    }
}

impl<A: Referable> fmt::Display for ActorMesh<A> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.current_ref)
    }
}

impl<A: Referable> Deref for ActorMesh<A> {
    type Target = ActorMeshRef<A>;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}

impl<A: Referable> Clone for ActorMesh<A> {
    fn clone(&self) -> Self {
        Self {
            proc_mesh: self.proc_mesh.clone(),
            name: self.name.clone(),
            current_ref: self.current_ref.clone(),
        }
    }
}

impl<A: Referable> Drop for ActorMesh<A> {
    fn drop(&mut self) {
        tracing::info!(
            name = "ActorMeshStatus",
            actor_name = %self.name,
            status = "Dropped",
        );
    }
}

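/// Default number of `ActorRef` slots per cache page in [`ActorMeshRef`].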
const DEFAULT_PAGE: usize = 1024;

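/// A fixed-size page of lazily initialized `ActorRef` slots.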
struct Page<A: Referable> {
    slots: Box<[OnceCell<ActorRef<A>>]>,
}

impl<A: Referable> Page<A> {
    fn new(len: usize) -> Self {
        let mut v = Vec::with_capacity(len);
        for _ in 0..len {
            v.push(OnceCell::new());
        }
        Self {
            slots: v.into_boxed_slice(),
        }
    }
}

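/// A cheap, serializable reference to a named actor mesh on a [`ProcMeshRef`].
///
/// Per-rank `ActorRef`s are materialized lazily on first access and cached in
/// fixed-size pages; clones and deserialized copies start with an empty cache
/// and rebuild it on demand.
///
/// Illustrative sketch (the actor and message types here are assumed, not part
/// of this module):
///
/// ```ignore
/// // Cast a message to every actor in the referenced mesh.
/// mesh_ref.cast(cx, MyMessage::default())?;
///
/// // Or address a single rank directly (`get` comes from `view::Ranked`).
/// if let Some(actor) = mesh_ref.get(0) {
///     actor.send(cx, MyMessage::default())?;
/// }
/// ```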
pub struct ActorMeshRef<A: Referable> {
    proc_mesh: ProcMeshRef,
    name: Name,

    // Lazily built page table: one `OnceCell` per page of `page_size`
    // `ActorRef` slots. Not serialized; rebuilt on demand.
    pages: OnceCell<Vec<OnceCell<Box<Page<A>>>>>,
    page_size: usize,

    _phantom: PhantomData<A>,
}

impl<A: Referable> ActorMeshRef<A> {
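    /// Casts `message` to every actor in the mesh.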
    #[allow(clippy::result_large_err)]
    pub fn cast<M>(&self, cx: &impl context::Actor, message: M) -> v1::Result<()>
    where
        A: RemoteHandles<M> + RemoteHandles<IndexedErasedUnbound<M>>,
        M: Castable + RemoteMessage + Clone,
    {
        self.cast_with_selection(cx, sel!(*), message)
    }

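    /// Casts `message` to the actors chosen by `sel`. This exists solely for
    /// the tensor engine's selection-based casting; other callers should use
    /// [`ActorMeshRef::cast`].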
    #[allow(clippy::result_large_err)]
    pub(crate) fn cast_for_tensor_engine_only_do_not_use<M>(
        &self,
        cx: &impl context::Actor,
        sel: Selection,
        message: M,
    ) -> v1::Result<()>
    where
        A: RemoteHandles<M> + RemoteHandles<IndexedErasedUnbound<M>>,
        M: Castable + RemoteMessage + Clone,
    {
        self.cast_with_selection(cx, sel, message)
    }

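    /// Casts `message` using `sel`: when the proc mesh has a root comm actor,
    /// the cast is routed through the v0 comm-actor path; otherwise the
    /// message is sent directly to each actor in this view.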
    #[allow(clippy::result_large_err)]
    fn cast_with_selection<M>(
        &self,
        cx: &impl context::Actor,
        sel: Selection,
        message: M,
    ) -> v1::Result<()>
    where
        A: RemoteHandles<M> + RemoteHandles<IndexedErasedUnbound<M>>,
        M: Castable + RemoteMessage + Clone,
    {
        if let Some(root_comm_actor) = self.proc_mesh.root_comm_actor() {
            self.cast_v0(cx, message, sel, root_comm_actor)
        } else {
            // Direct path: no comm actor available, so send to each
            // destination actor individually.
            for (point, actor) in self.iter() {
                let create_rank = point.rank();
                // Stamp cast metadata (originating sender, destination point)
                // into the message headers.
                let mut headers = Attrs::new();
                headers.set(
                    multicast::CAST_ORIGINATING_SENDER,
                    cx.instance().self_id().clone(),
                );
                headers.set(multicast::CAST_POINT, point);

                // Rebind the message's `resource::Rank` to the destination
                // rank before delivery.
                let mut unbound = Unbound::try_from_message(message.clone())
                    .map_err(|e| Error::CastingError(self.name.clone(), e))?;
                unbound
                    .visit_mut::<resource::Rank>(|resource::Rank(rank)| {
                        *rank = Some(create_rank);
                        Ok(())
                    })
                    .map_err(|e| Error::CastingError(self.name.clone(), e))?;
                let rebound_message = unbound
                    .bind()
                    .map_err(|e| Error::CastingError(self.name.clone(), e))?;
                actor
                    .send_with_headers(cx, headers, rebound_message)
                    .map_err(|e| Error::SendingError(actor.actor_id().clone(), Box::new(e)))?;
            }
            Ok(())
        }
    }

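    /// Casts `message` through the v0 comm-actor multicast path, slicing the
    /// cast when this view is a sub-region of a larger root region.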
    #[allow(clippy::result_large_err)]
    fn cast_v0<M>(
        &self,
        cx: &impl context::Actor,
        message: M,
        sel: Selection,
        root_comm_actor: &ActorRef<CommActor>,
    ) -> v1::Result<()>
    where
        A: RemoteHandles<IndexedErasedUnbound<M>>,
        M: Castable + RemoteMessage + Clone,
    {
        let cast_mesh_shape = view::Ranked::region(self).into();
        let actor_mesh_id = ActorMeshId::V1(self.name.clone());
        match &self.proc_mesh.root_region {
            Some(root_region) => {
                let root_mesh_shape = root_region.into();
                v0_actor_mesh::cast_to_sliced_mesh::<A, M>(
                    cx,
                    actor_mesh_id,
                    root_comm_actor,
                    &sel,
                    message,
                    &cast_mesh_shape,
                    &root_mesh_shape,
                )
                .map_err(|e| Error::CastingError(self.name.clone(), e.into()))
            }
            None => v0_actor_mesh::actor_mesh_cast::<A, M>(
                cx,
                actor_mesh_id,
                root_comm_actor,
                sel,
                &cast_mesh_shape,
                &cast_mesh_shape,
                message,
            )
            .map_err(|e| Error::CastingError(self.name.clone(), e.into())),
        }
    }

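    /// Returns the current [`resource::State`] of every actor in this mesh, as
    /// reported by the owning proc mesh.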
    #[allow(clippy::result_large_err)]
    pub async fn actor_states(
        &self,
        cx: &impl context::Actor,
    ) -> v1::Result<ValueMesh<resource::State<ActorState>>> {
        self.proc_mesh.actor_states(cx, self.name.clone()).await
    }
}

impl<A: Referable> ActorMeshRef<A> {
    pub(crate) fn new(name: Name, proc_mesh: ProcMeshRef) -> Self {
        Self::with_page_size(name, proc_mesh, DEFAULT_PAGE)
    }

    pub fn name(&self) -> &Name {
        &self.name
    }

    pub(crate) fn with_page_size(name: Name, proc_mesh: ProcMeshRef, page_size: usize) -> Self {
        Self {
            proc_mesh,
            name,
            pages: OnceCell::new(),
            page_size: page_size.max(1),
            _phantom: PhantomData,
        }
    }

    pub fn proc_mesh(&self) -> &ProcMeshRef {
        &self.proc_mesh
    }

    #[inline]
    fn len(&self) -> usize {
        view::Ranked::region(&self.proc_mesh).num_ranks()
    }

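    /// Lazily initializes the page table: one empty slot per page of
    /// `page_size` ranks, covering every rank of the proc mesh.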
    fn ensure_pages(&self) -> &Vec<OnceCell<Box<Page<A>>>> {
        let n = self.len().div_ceil(self.page_size);
        self.pages
            .get_or_init(|| (0..n).map(|_| OnceCell::new()).collect())
    }

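    /// Resolves the `ActorRef` for `rank`, creating and caching it on first
    /// access. Returns `None` if `rank` is out of bounds for this view.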
    fn materialize(&self, rank: usize) -> Option<&ActorRef<A>> {
        let len = self.len();
        if rank >= len {
            return None;
        }
        let p = self.page_size;
        let page_ix = rank / p;
        let local_ix = rank % p;

        let pages = self.ensure_pages();
        let page = pages[page_ix].get_or_init(|| {
            // The final page may be shorter than `page_size`.
            let base = page_ix * p;
            let remaining = len - base;
            let page_len = remaining.min(p);
            Box::new(Page::<A>::new(page_len))
        });

        Some(page.slots[local_ix].get_or_init(|| {
            debug_assert!(rank < self.len(), "rank must be within [0, len)");
            debug_assert!(
                self.proc_mesh.get(rank).is_some(),
                "proc_mesh must be dense/aligned with this view"
            );
            // Construct the reference by attesting to the actor named `name`
            // on the proc at `rank`.
            let proc_ref = self.proc_mesh.get(rank).expect("rank in-bounds");
            proc_ref.attest(&self.name)
        }))
    }
}

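// Cloning preserves identity (proc mesh and name) but resets the lazy
// `ActorRef` cache; the clone rebuilds it on demand.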
impl<A: Referable> Clone for ActorMeshRef<A> {
    fn clone(&self) -> Self {
        Self {
            proc_mesh: self.proc_mesh.clone(),
            name: self.name.clone(),
            pages: OnceCell::new(),
            page_size: self.page_size,
            _phantom: PhantomData,
        }
    }
}

impl<A: Referable> fmt::Display for ActorMeshRef<A> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{}@{}", self.name, A::typename(), self.proc_mesh)
    }
}

impl<A: Referable> PartialEq for ActorMeshRef<A> {
    fn eq(&self, other: &Self) -> bool {
        self.proc_mesh == other.proc_mesh && self.name == other.name
    }
}
impl<A: Referable> Eq for ActorMeshRef<A> {}

impl<A: Referable> Hash for ActorMeshRef<A> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.proc_mesh.hash(state);
        self.name.hash(state);
    }
}

impl<A: Referable> fmt::Debug for ActorMeshRef<A> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ActorMeshRef")
            .field("proc_mesh", &self.proc_mesh)
            .field("name", &self.name)
            .field("page_size", &self.page_size)
            .finish_non_exhaustive()
    }
}

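// Only `(proc_mesh, name)` cross the wire; the page cache is rebuilt lazily on
// the deserializing side.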
impl<A: Referable> Serialize for ActorMeshRef<A> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        (&self.proc_mesh, &self.name).serialize(serializer)
    }
}

impl<'de, A: Referable> Deserialize<'de> for ActorMeshRef<A> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let (proc_mesh, name) = <(ProcMeshRef, Name)>::deserialize(deserializer)?;
        Ok(ActorMeshRef::with_page_size(name, proc_mesh, DEFAULT_PAGE))
    }
}

impl<A: Referable> view::Ranked for ActorMeshRef<A> {
    type Item = ActorRef<A>;

    #[inline]
    fn region(&self) -> &Region {
        view::Ranked::region(&self.proc_mesh)
    }

    #[inline]
    fn get(&self, rank: usize) -> Option<&Self::Item> {
        self.materialize(rank)
    }
}

impl<A: Referable> view::RankedSliceable for ActorMeshRef<A> {
    fn sliced(&self, region: Region) -> Self {
        debug_assert!(region.is_subset(view::Ranked::region(self)));
        let proc_mesh = self.proc_mesh.subset(region).unwrap();
        Self::with_page_size(self.name.clone(), proc_mesh, self.page_size)
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashSet;

    use hyperactor::actor::ActorStatus;
    use hyperactor::clock::Clock;
    use hyperactor::clock::RealClock;
    use hyperactor::context::Mailbox as _;
    use hyperactor::mailbox;
    use ndslice::Extent;
    use ndslice::ViewExt;
    use ndslice::extent;
    use ndslice::view::Ranked;
    use timed_test::async_timed_test;
    use tokio::time::Duration;

    use super::ActorMesh;
    use crate::proc_mesh::mesh_agent::ActorState;
    use crate::resource;
    use crate::v1::ActorMeshRef;
    use crate::v1::Name;
    use crate::v1::ProcMesh;
    use crate::v1::proc_mesh::GET_ACTOR_STATE_MAX_IDLE;
    use crate::v1::testactor;
    use crate::v1::testing;

    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_actor_mesh_ref_lazy_materialization() {
        let instance = testing::instance().await;
        let extent = extent!(replicas = 3, hosts = 2);
        let pm: ProcMesh = testing::proc_meshes(instance, extent.clone())
            .await
            .into_iter()
            .next()
            .expect("at least one proc mesh");
        let am: ActorMesh<testactor::TestActor> = pm.spawn(instance, "test", &()).await.unwrap();

        // A small page size forces the cache to span multiple pages.
        let page_size = 2;
        let amr: ActorMeshRef<testactor::TestActor> =
            ActorMeshRef::with_page_size(am.name.clone(), pm.clone(), page_size);
        assert_eq!(amr.extent(), extent);
        assert_eq!(amr.region().num_ranks(), 6);

        let p0_a = amr.get(0).expect("rank 0 exists") as *const _;
        let p0_b = amr.get(0).expect("rank 0 exists") as *const _;
        assert_eq!(p0_a, p0_b, "same rank should return same cached pointer");

        let p1_a = amr.get(1).expect("rank 1 exists") as *const _;
        let p1_b = amr.get(1).expect("rank 1 exists") as *const _;
        assert_eq!(p1_a, p1_b, "same rank should return same cached pointer");
        assert_ne!(p0_a, p1_a, "different ranks have different cache slots");

        let p2_a = amr.get(2).expect("rank 2 exists") as *const _;
        let p2_b = amr.get(2).expect("rank 2 exists") as *const _;
        assert_eq!(p2_a, p2_b, "same rank should return same cached pointer");
        assert_ne!(p0_a, p2_a, "different pages have different cache slots");

        let amr_clone = amr.clone();
        let orig_id_0 = amr.get(0).unwrap().actor_id().clone();
        let clone_id_0 = amr_clone.get(0).unwrap().actor_id().clone();
        assert_eq!(orig_id_0, clone_id_0, "clone preserves identity");
        let p0_clone = amr_clone.get(0).unwrap() as *const _;
        assert_ne!(
            p0_a, p0_clone,
            "cloned ActorMeshRef has a fresh cache (different pointer)"
        );

        let sliced = amr.range("replicas", 1..).expect("slice should be valid");
        assert_eq!(sliced.region().num_ranks(), 4);
        let sp0_a = sliced.get(0).unwrap() as *const _;
        let sp0_b = sliced.get(0).unwrap() as *const _;
        assert_eq!(sp0_a, sp0_b, "sliced view has its own cache slot per rank");
        let sp2 = sliced.get(2).unwrap() as *const _;
        assert_ne!(sp0_a, sp2, "sliced view crosses its own page boundary");

        let mut set = HashSet::new();
        set.insert(amr.clone());
        set.insert(amr.clone());
        assert_eq!(set.len(), 1, "cache state must not affect Hash/Eq");

        // Materialized refs are live: two distinct ranks answer with distinct
        // actor ids.
        let (port, mut rx) = mailbox::open_port(instance);
        amr.get(0)
            .expect("rank 0 exists")
            .send(instance, testactor::GetActorId(port.bind()))
            .expect("send to rank 0 should succeed");
        amr.get(3)
            .expect("rank 3 exists")
            .send(instance, testactor::GetActorId(port.bind()))
            .expect("send to rank 3 should succeed");
        let id_a = RealClock
            .timeout(Duration::from_secs(3), rx.recv())
            .await
            .expect("timed out waiting for first reply")
            .expect("channel closed before first reply");
        let id_b = RealClock
            .timeout(Duration::from_secs(3), rx.recv())
            .await
            .expect("timed out waiting for second reply")
            .expect("channel closed before second reply");
        assert_ne!(id_a, id_b, "two different ranks responded");
    }

    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_actor_states_with_panic() {
        hyperactor_telemetry::initialize_logging_for_test();

        let instance = testing::instance().await;
        let (supervision_port, mut supervision_receiver) =
            instance.open_port::<resource::State<ActorState>>();
        let supervisor = supervision_port.bind();
        let num_replicas = 4;
        let meshes = testing::proc_meshes(instance, extent!(replicas = num_replicas)).await;
        let proc_mesh = &meshes[1];
        let child_name = Name::new("child");

        let actor_mesh = proc_mesh
            .spawn_with_name::<testactor::TestActor>(instance, child_name.clone(), &())
            .await
            .unwrap();

        actor_mesh
            .cast(
                instance,
                testactor::CauseSupervisionEvent(testactor::SupervisionEventType::Panic),
            )
            .unwrap();

        #[allow(clippy::disallowed_methods)]
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

        let supervision_task = tokio::spawn(async move {
            let events = actor_mesh.actor_states(&instance).await.unwrap();
            for state in events.values() {
                supervisor.send(instance, state.clone()).unwrap();
            }
        });
        supervision_task.await.unwrap();

        for _ in 0..num_replicas {
            let state = RealClock
                .timeout(Duration::from_secs(10), supervision_receiver.recv())
                .await
                .expect("timeout")
                .unwrap();
            if let resource::Status::Failed(s) = state.status {
                assert!(s.contains("supervision events"));
            } else {
                panic!("Not failed: {:?}", state.status);
            }
            if let Some(ref inner) = state.state {
                assert!(!inner.supervision_events.is_empty());
                for event in &inner.supervision_events {
                    println!("receiving event: {:?}", event);
                    assert_eq!(event.actor_id.name(), format!("{}", child_name.clone()));
                    assert_matches!(event.actor_status, ActorStatus::Failed(_));
                }
            }
        }
    }

    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_actor_states_with_process_exit() {
        hyperactor_telemetry::initialize_logging_for_test();

        let config = hyperactor::config::global::lock();
        let _guard = config.override_key(GET_ACTOR_STATE_MAX_IDLE, Duration::from_secs(1));

        let instance = testing::instance().await;
        let (supervision_port, mut supervision_receiver) =
            instance.open_port::<resource::State<ActorState>>();
        let supervisor = supervision_port.bind();
        let num_replicas = 4;
        let meshes = testing::proc_meshes(instance, extent!(replicas = num_replicas)).await;
        let proc_mesh = &meshes[1];
        let child_name = Name::new("child");

        let actor_mesh = proc_mesh
            .spawn_with_name::<testactor::TestActor>(instance, child_name.clone(), &())
            .await
            .unwrap();

        actor_mesh
            .cast(
                instance,
                testactor::CauseSupervisionEvent(testactor::SupervisionEventType::ProcessExit(1)),
            )
            .unwrap();

        #[allow(clippy::disallowed_methods)]
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

        let supervision_task = tokio::spawn(async move {
            let events = actor_mesh.actor_states(&instance).await.unwrap();
            for state in events.values() {
                supervisor.send(instance, state.clone()).unwrap();
            }
        });
        RealClock
            .timeout(Duration::from_secs(10), supervision_task)
            .await
            .expect("timeout")
            .unwrap();

        for _ in 0..num_replicas {
            let state = RealClock
                .timeout(Duration::from_secs(10), supervision_receiver.recv())
                .await
                .expect("timeout")
                .unwrap();
            assert_matches!(state.status, resource::Status::Timeout(_));
            let events = state
                .state
                .expect("state should be present")
                .supervision_events;
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].actor_status, ActorStatus::Stopped);
        }
    }

    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_actor_states_on_sliced_mesh() {
        hyperactor_telemetry::initialize_logging_for_test();

        let instance = testing::instance().await;
        let (supervision_port, mut supervision_receiver) =
            instance.open_port::<resource::State<ActorState>>();
        let supervisor = supervision_port.bind();
        let num_replicas = 4;
        let meshes = testing::proc_meshes(instance, extent!(replicas = num_replicas)).await;
        let proc_mesh = &meshes[1];
        let child_name = Name::new("child");

        let actor_mesh = proc_mesh
            .spawn_with_name::<testactor::TestActor>(instance, child_name.clone(), &())
            .await
            .unwrap();
        let sliced = actor_mesh
            .range("replicas", 1..3)
            .expect("slice should be valid");
        let sliced_replicas = sliced.len();

        sliced
            .cast(
                instance,
                testactor::CauseSupervisionEvent(testactor::SupervisionEventType::Panic),
            )
            .unwrap();

        #[allow(clippy::disallowed_methods)]
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

        let supervision_task = tokio::spawn(async move {
            let events = sliced.actor_states(&instance).await.unwrap();
            for state in events.values() {
                supervisor.send(instance, state.clone()).unwrap();
            }
        });
        RealClock
            .timeout(Duration::from_secs(10), supervision_task)
            .await
            .expect("timeout")
            .unwrap();

        for _ in 0..sliced_replicas {
            let state = RealClock
                .timeout(Duration::from_secs(10), supervision_receiver.recv())
                .await
                .expect("timeout")
                .unwrap();
            if let resource::Status::Failed(s) = state.status {
                assert!(s.contains("supervision events"));
            } else {
                panic!("Not failed: {:?}", state.status);
            }
            if let Some(ref inner) = state.state {
                assert!(!inner.supervision_events.is_empty());
                for event in &inner.supervision_events {
                    assert_eq!(event.actor_id.name(), format!("{}", child_name.clone()));
                    assert_matches!(event.actor_status, ActorStatus::Failed(_));
                }
            }
        }
    }

    #[async_timed_test(timeout_secs = 30)]
    #[cfg(fbcode_build)]
    async fn test_cast() {
        let config = hyperactor::config::global::lock();
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let instance = testing::instance().await;
        let host_mesh = testing::host_mesh(extent!(host = 4)).await;
        let proc_mesh = host_mesh
            .spawn(instance, "test", Extent::unity())
            .await
            .unwrap();
        let actor_mesh = proc_mesh
            .spawn::<testactor::TestActor>(instance, "test", &())
            .await
            .unwrap();

        let (cast_info, mut cast_info_rx) = instance.mailbox().open_port();
        actor_mesh
            .cast(
                instance,
                testactor::GetCastInfo {
                    cast_info: cast_info.bind(),
                },
            )
            .unwrap();

        let mut point_to_actor: HashSet<_> = actor_mesh.iter().collect();
        while !point_to_actor.is_empty() {
            let (point, origin_actor_ref, sender_actor_id) = cast_info_rx.recv().await.unwrap();
            let key = (point, origin_actor_ref);
            assert!(
                point_to_actor.remove(&key),
                "key {:?} not present or removed twice",
                key
            );
            assert_eq!(&sender_actor_id, instance.self_id());
        }

        let _ = host_mesh.shutdown(&instance).await;
    }
}