1use std::any::type_name;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::fmt;
13use std::hash::Hash;
14use std::ops::Deref;
15use std::sync::Arc;
16use std::time::Duration;
17
18use hyperactor::Actor;
19use hyperactor::ActorAddr;
20use hyperactor::ActorRef;
21use hyperactor::Endpoint as _;
22use hyperactor::Handler;
23use hyperactor::ProcAddr;
24use hyperactor::RemoteMessage;
25use hyperactor::RemoteSpawn;
26use hyperactor::accum::StreamingReducerOpts;
27use hyperactor::actor::ActorStatus;
28use hyperactor::actor::Referable;
29use hyperactor::actor::remote::Remote;
30use hyperactor::context;
31use hyperactor::id::Label;
32use hyperactor::supervision::ActorSupervisionEvent;
33use hyperactor_config::CONFIG;
34use hyperactor_config::ConfigAttr;
35use hyperactor_config::attrs::declare_attrs;
36use ndslice::Extent;
37use ndslice::ViewExt as _;
38use ndslice::view;
39use ndslice::view::CollectMeshExt;
40use ndslice::view::Ranked;
41use ndslice::view::Region;
42use serde::Deserialize;
43use serde::Serialize;
44use typeuri::Named;
45
46use crate::ActorMesh;
47use crate::ActorMeshRef;
48use crate::CommActor;
49use crate::Error;
50use crate::HostMeshRef;
51use crate::ValueMesh;
52use crate::comm::CommMeshConfig;
53use crate::host_mesh::host_agent::ProcState;
54use crate::host_mesh::mesh_to_rankedvalues_with_default;
55use crate::mesh_controller::ActorMeshController;
56use crate::mesh_id::ActorMeshId;
57use crate::mesh_id::ProcMeshId;
58use crate::proc_agent;
59use crate::proc_agent::ActorState;
60use crate::proc_agent::ProcAgent;
61use crate::resource;
62use crate::resource::GetRankStatus;
63use crate::resource::Status;
64use crate::supervision::MeshFailure;
65
declare_attrs! {
    // Maximum time to wait without receiving a status update while waiting for
    // an actor mesh to finish spawning. Also reused as the idle bound when
    // waiting for actors to reach `Stopped` in `stop_actor_by_id`.
    // NOTE(review): the two strings appear to be the environment-variable and
    // config-key overrides for this value — confirm against `ConfigAttr::new`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()),
        Some("actor_spawn_max_idle".to_string()),
    ))
    pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    // Maximum time to wait for each individual `GetState` reply in
    // `ProcMeshRef::actor_states*`. Ranks that have not answered by then are
    // reported with a `Timeout` status.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()),
        Some("get_actor_state_max_idle".to_string()),
    ))
    pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_secs(30);
}
83
/// Label of the `CommActor` mesh that `ProcMesh::create` spawns on every proc
/// (as a singleton actor mesh id).
pub const COMM_ACTOR_NAME: &str = "comm";
89
/// A reference to a single proc in a proc mesh: its address, the rank it was
/// assigned when the mesh was created, and the `ProcAgent` running on it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ProcRef {
    /// Address of the proc.
    proc_id: ProcAddr,
    /// Rank of this proc when the mesh was created. `ProcRef`s are cloned
    /// unchanged when a mesh is sliced, so this can differ from the proc's
    /// rank within a sliced view.
    create_rank: usize,
    /// The `ProcAgent` actor on this proc.
    agent: ActorRef<ProcAgent>,
}
99
100impl ProcRef {
101 pub fn new(proc_id: ProcAddr, create_rank: usize, agent: ActorRef<ProcAgent>) -> Self {
103 Self {
104 proc_id,
105 create_rank,
106 agent,
107 }
108 }
109
110 pub fn proc_addr(&self) -> &ProcAddr {
111 &self.proc_id
112 }
113
114 pub(crate) fn actor_addr(&self, id: &ActorMeshId) -> ActorAddr {
115 self.proc_id.actor_addr_uid(id.uid().clone())
116 }
117
118 pub(crate) fn attest<A: Referable>(&self, id: &ActorMeshId) -> ActorRef<A> {
121 ActorRef::attest(self.actor_addr(id))
122 }
123}
124
125#[derive(Debug)]
127pub struct ProcMesh {
128 #[allow(dead_code)]
129 id: ProcMeshId,
130 #[allow(dead_code)]
131 comm_actor_name: Option<ActorMeshId>,
132 current_ref: ProcMeshRef,
133 controller: Option<ActorRef<crate::mesh_controller::ProcMeshController>>,
134}
135
136impl ProcMesh {
137 pub(crate) async fn create<C: context::Actor>(
138 cx: &C,
139 id: ProcMeshId,
140 extent: Extent,
141 hosts: HostMeshRef,
142 ranks: Vec<ProcRef>,
143 ) -> crate::Result<Self>
144 where
145 C::A: Handler<MeshFailure>,
146 {
147 let comm_actor_name = ActorMeshId::singleton(Label::new(COMM_ACTOR_NAME).unwrap());
148
149 let region = extent.into();
150 let ranks = Arc::new(ranks);
151
152 if let Some(first) = ranks.first() {
156 crate::global_context::set_global_supervision_sink(
157 first.agent.port::<ActorSupervisionEvent>(),
158 );
159 }
160
161 let root_comm_actor: ActorRef<CommActor> = ActorRef::attest(
162 ranks
163 .first()
164 .expect("root mesh cannot be empty")
165 .actor_addr(&comm_actor_name),
166 );
167 let current_ref = ProcMeshRef::new(
168 id.clone(),
169 region,
170 ranks,
171 Some(hosts),
172 None, None, )
175 .unwrap();
176
177 {
179 let name_str = id.to_string();
180 let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
181
182 let hm = current_ref
183 .host_mesh
184 .as_ref()
185 .expect("ProcMesh always has a host mesh");
186 let parent_mesh_id = hyperactor_telemetry::hash_to_u64(&hm.id().to_string());
187 let parent_view_json = serde_json::to_string(hm.region())
188 .unwrap_or_else(|e| format!("encountered error when serializing region: {}", e));
189
190 hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
191 id: mesh_id_hash,
192 timestamp: std::time::SystemTime::now(),
193 class: "Proc".to_string(),
194 given_name: id
195 .display_label()
196 .map(|l| l.as_str())
197 .unwrap_or("unnamed")
198 .to_string(),
199 full_name: name_str,
200 shape_json: serde_json::to_string(¤t_ref.region.extent()).unwrap_or_default(),
201 parent_mesh_id: Some(parent_mesh_id),
202 parent_view_json: Some(parent_view_json),
203 });
204
205 let now = std::time::SystemTime::now();
208 for rank in current_ref.ranks.iter() {
209 let actor_id = rank.agent.actor_addr();
210
211 hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
212 id: hyperactor_telemetry::hash_to_u64(&actor_id),
213 timestamp: now,
214 mesh_id: mesh_id_hash,
215 rank: rank.create_rank as u64,
216 full_name: actor_id.to_string(),
217 display_name: None,
218 });
219 }
220 }
221
222 let mut proc_mesh = Self {
223 id,
224 comm_actor_name: Some(comm_actor_name.clone()),
225 current_ref,
226 controller: None,
227 };
228
229 let comm_actor_mesh: ActorMesh<CommActor> = proc_mesh
233 .spawn_with_name(cx, comm_actor_name, &Default::default(), None, true)
234 .await?;
235 let address_book: HashMap<_, _> = comm_actor_mesh
236 .iter()
237 .map(|(point, actor_ref)| (point.rank(), actor_ref))
238 .collect();
239 for (rank, comm_actor) in &address_book {
242 comm_actor.post(cx, CommMeshConfig::new(*rank, address_book.clone()));
243 }
244 proc_mesh.current_ref.root_comm_actor = Some(root_comm_actor);
245
246 Ok(proc_mesh)
247 }
248
249 pub(crate) fn set_controller(
251 &mut self,
252 controller: Option<ActorRef<crate::mesh_controller::ProcMeshController>>,
253 ) {
254 self.controller = controller;
255 }
256
257 pub async fn stop(&mut self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()> {
268 if let Some(controller) = self.controller.take() {
269 let id = self.id.resource_id().clone();
270 controller.post(
271 cx,
272 resource::Stop {
273 id: id.clone(),
274 reason,
275 },
276 );
277
278 let (port, mut rx) = cx.mailbox().open_port();
283 controller.post(
284 cx,
285 resource::GetState::<resource::mesh::State<()>> {
286 id: id.clone(),
287 reply: port.bind(),
288 },
289 );
290
291 let statuses = rx.recv().await?;
292 let Some(state) = &statuses.state else {
293 anyhow::bail!(
294 "non-existent state in GetState reply from controller: {}",
295 controller.actor_addr()
296 );
297 };
298 let all_stopped = state.statuses.values().all(|s| s.is_terminating());
304 if !all_stopped {
305 anyhow::bail!(
306 "proc mesh {} not all procs reached terminating state after stop: {:?}",
307 id,
308 state.statuses,
309 );
310 }
311 return Ok(());
312 }
313
314 let region = self.region.clone();
315 let procs = self.current_ref.proc_ids().collect::<Vec<ProcAddr>>();
316 self.current_ref
319 .host_mesh
320 .as_ref()
321 .expect("ProcMesh always has a host mesh")
322 .stop_proc_mesh(cx, &self.id, procs, region, reason)
323 .await
324 .map(|_| ())
325 .map_err(anyhow::Error::from)
326 }
327
328 #[cfg(test)]
329 pub(crate) fn ranks(&self) -> Arc<Vec<ProcRef>> {
330 Arc::clone(&self.current_ref.ranks)
331 }
332}
333
334impl fmt::Display for ProcMesh {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 write!(f, "{}", self.current_ref)
337 }
338}
339
340impl Deref for ProcMesh {
341 type Target = ProcMeshRef;
342
343 fn deref(&self) -> &Self::Target {
344 &self.current_ref
345 }
346}
347
impl Drop for ProcMesh {
    // Logs a `ProcMeshStatus` event with status "Dropped" for this mesh.
    // This impl only logs; it does not itself stop the procs.
    fn drop(&mut self) {
        tracing::info!(
            name = "ProcMeshStatus",
            proc_mesh = %self.id,
            status = "Dropped",
        );
    }
}
357
/// A serializable reference to a (possibly sliced) proc mesh.
///
/// It holds the mesh id, the region this view covers, one [`ProcRef`] per
/// rank, an optional host mesh, and metadata about the root mesh it was
/// sliced from.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct ProcMeshRef {
    /// Identifier of the mesh; slices keep the same id.
    id: ProcMeshId,
    /// Region covered by this view.
    region: Region,
    /// One `ProcRef` per rank of `region`, indexed by rank within this view.
    ranks: Arc<Vec<ProcRef>>,
    /// Host mesh the procs live on; `None` for references built by `new_singleton`.
    host_mesh: Option<HostMeshRef>,
    /// Region of the root (unsliced) mesh. `None` on the root itself; set by `sliced`.
    pub(crate) root_region: Option<Region>,
    /// Comm actor on the root mesh's first proc. `None` until `ProcMesh::create`
    /// has configured the comm actors, and for singleton references.
    pub(crate) root_comm_actor: Option<ActorRef<CommActor>>,
}
// Registers `ProcMeshRef` with `wirevalue`.
// NOTE(review): presumably needed so it can travel as a typed wire value — confirm.
wirevalue::register_type!(ProcMeshRef);
382
383impl ProcMeshRef {
384 #[allow(clippy::result_large_err)]
386 fn new(
387 id: ProcMeshId,
388 region: Region,
389 ranks: Arc<Vec<ProcRef>>,
390 host_mesh: Option<HostMeshRef>,
391 root_region: Option<Region>,
392 root_comm_actor: Option<ActorRef<CommActor>>,
393 ) -> crate::Result<Self> {
394 if region.num_ranks() != ranks.len() {
395 return Err(crate::Error::InvalidRankCardinality {
396 expected: region.num_ranks(),
397 actual: ranks.len(),
398 });
399 }
400 Ok(Self {
401 id,
402 region,
403 ranks,
404 host_mesh,
405 root_region,
406 root_comm_actor,
407 })
408 }
409
410 pub fn new_singleton(id: ProcMeshId, proc_ref: ProcRef) -> Self {
414 Self {
415 id,
416 region: Extent::unity().into(),
417 ranks: Arc::new(vec![proc_ref]),
418 host_mesh: None,
419 root_region: None,
420 root_comm_actor: None,
421 }
422 }
423
424 pub(crate) fn root_comm_actor(&self) -> Option<&ActorRef<CommActor>> {
425 self.root_comm_actor.as_ref()
426 }
427
428 pub fn id(&self) -> &ProcMeshId {
429 &self.id
430 }
431
432 pub fn host_mesh_id(&self) -> Option<&crate::mesh_id::HostMeshId> {
433 self.host_mesh.as_ref().map(|h| h.id())
434 }
435
436 pub fn hosts(&self) -> Option<&HostMeshRef> {
438 self.host_mesh.as_ref()
439 }
440
441 pub(crate) fn agent_mesh(&self) -> ActorMeshRef<ProcAgent> {
442 let agent_label = self
443 .ranks
444 .first()
445 .unwrap()
446 .agent
447 .actor_addr()
448 .label()
449 .cloned()
450 .unwrap_or_else(|| Label::new(proc_agent::PROC_AGENT_ACTOR_NAME).unwrap());
451 let id = ActorMeshId::singleton(agent_label);
452 ActorMeshRef::new(id, self.clone(), None)
453 }
454
455 pub async fn actor_states(
457 &self,
458 cx: &impl context::Actor,
459 id: ActorMeshId,
460 ) -> crate::Result<ValueMesh<resource::State<ActorState>>> {
461 self.actor_states_with_keepalive(cx, id, None).await
462 }
463
464 pub(crate) async fn actor_states_with_keepalive(
470 &self,
471 cx: &impl context::Actor,
472 id: ActorMeshId,
473 keepalive: Option<std::time::SystemTime>,
474 ) -> crate::Result<ValueMesh<resource::State<ActorState>>> {
475 let agent_mesh = self.agent_mesh();
476 let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
477 let mut port = port.bind();
478 port.return_undeliverable(false);
481 let get_state = resource::GetState::<ActorState> {
484 id: id.resource_id().clone(),
485 reply: port,
486 };
487 if let Some(expires_after) = keepalive {
488 agent_mesh.cast(
489 cx,
490 resource::KeepaliveGetState {
491 expires_after,
492 get_state,
493 },
494 )?;
495 } else {
496 agent_mesh.cast(cx, get_state)?;
497 }
498 let expected = self.ranks.len();
499 let mut states = Vec::with_capacity(expected);
500 let timeout = hyperactor_config::global::get(GET_ACTOR_STATE_MAX_IDLE);
501 for _ in 0..expected {
502 let state = tokio::time::timeout(timeout, rx.recv()).await;
508 if let Ok(state) = state {
509 let state = state?;
511 match state.state {
512 Some(ref inner) => {
513 states.push((inner.create_rank, state));
514 }
515 None => {
516 return Err(Error::NotExist(state.id));
517 }
518 }
519 } else {
520 tracing::error!(
521 "timeout waiting for a message after {:?} from proc mesh agent in mesh {}",
522 timeout,
523 agent_mesh
524 );
525 let all_ranks = (0..self.ranks.len()).collect::<HashSet<_>>();
528 let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
529 let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
530 assert_eq!(leftover_ranks.len(), expected - states.len());
531 while states.len() < expected {
532 let rank = *leftover_ranks
533 .pop()
534 .expect("leftover ranks should not be empty");
535 let agent = agent_mesh.get(rank).expect("agent should exist");
536 let agent_id = agent.actor_addr().clone();
537 states.push((
538 rank,
540 resource::State {
541 id: id.resource_id().clone(),
542 status: resource::Status::Timeout(timeout),
543 generation: u64::MAX,
548 timestamp: std::time::SystemTime::now(),
549 state: Some(ActorState {
550 actor_id: agent_id.clone(),
551 create_rank: rank,
552 supervision_events: vec![ActorSupervisionEvent::new(
553 agent_id,
554 None,
555 ActorStatus::generic_failure(format!(
556 "timeout waiting for message from proc mesh agent while querying for \"{}\". The process likely crashed",
557 id,
558 )),
559 None,
560 )],
561 }),
562 },
563 ));
564 }
565 break;
566 }
567 }
568 states.sort_by_key(|(rank, _)| *rank);
572 let vm = states
573 .into_iter()
574 .map(|(_, state)| state)
575 .collect_mesh::<ValueMesh<_>>(self.region.clone())?;
576 Ok(vm)
577 }
578
579 pub async fn proc_states(
580 &self,
581 cx: &impl context::Actor,
582 keepalive: Option<std::time::SystemTime>,
583 ) -> crate::Result<Option<ValueMesh<resource::State<ProcState>>>> {
584 let names = self.proc_ids().collect::<Vec<ProcAddr>>();
585 if let Some(host_mesh) = &self.host_mesh {
586 Ok(Some(
587 host_mesh
588 .proc_states(cx, names, self.region.clone(), keepalive)
589 .await?,
590 ))
591 } else {
592 Ok(None)
593 }
594 }
595
596 pub(crate) fn proc_ids(&self) -> impl Iterator<Item = ProcAddr> {
598 self.ranks.iter().map(|proc_ref| proc_ref.proc_id.clone())
599 }
600
601 pub async fn spawn<A: RemoteSpawn, C: context::Actor>(
611 &self,
612 cx: &C,
613 name: &str,
614 params: &A::Params,
615 ) -> crate::Result<ActorMesh<A>>
616 where
617 A::Params: RemoteMessage,
618 C::A: Handler<MeshFailure>,
619 {
620 let id = ActorMeshId::instance(Label::strip(name));
622 self.spawn_with_name(cx, id, params, None, false).await
623 }
624
625 pub async fn spawn_service<A: RemoteSpawn, C: context::Actor>(
633 &self,
634 cx: &C,
635 name: &str,
636 params: &A::Params,
637 ) -> crate::Result<ActorMesh<A>>
638 where
639 A::Params: RemoteMessage,
640 C::A: Handler<MeshFailure>,
641 {
642 let id = ActorMeshId::singleton(Label::strip(name));
643 self.spawn_with_name(cx, id, params, None, false).await
644 }
645
646 #[hyperactor::instrument(fields(
664 host_mesh=self.host_mesh_id().map(|id| id.to_string()),
665 proc_mesh=self.id.to_string(),
666 actor_name=name.to_string(),
667 ))]
668 pub async fn spawn_with_name<A: RemoteSpawn, C: context::Actor>(
669 &self,
670 cx: &C,
671 name: ActorMeshId,
672 params: &A::Params,
673 supervision_display_name: Option<String>,
674 is_system_actor: bool,
675 ) -> crate::Result<ActorMesh<A>>
676 where
677 A::Params: RemoteMessage,
678 C::A: Handler<MeshFailure>,
679 {
680 tracing::info!(
681 name = "ProcMeshStatus",
682 status = "ActorMesh::Spawn::Attempt",
683 );
684 tracing::info!(name = "ActorMeshStatus", status = "Spawn::Attempt");
685 let result = self
686 .spawn_with_name_inner(cx, name, params, supervision_display_name, is_system_actor)
687 .await;
688 match &result {
689 Ok(_) => {
690 tracing::info!(
691 name = "ProcMeshStatus",
692 status = "ActorMesh::Spawn::Success",
693 );
694 tracing::info!(name = "ActorMeshStatus", status = "Spawn::Success");
695 }
696 Err(error) => {
697 tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Spawn::Failed", %error);
698 tracing::error!(name = "ActorMeshStatus", status = "Spawn::Failed", %error);
699 }
700 }
701 result
702 }
703
704 async fn spawn_with_name_inner<A: RemoteSpawn, C: context::Actor>(
705 &self,
706 cx: &C,
707 actor_mesh_id: ActorMeshId,
708 params: &A::Params,
709 supervision_display_name: Option<String>,
710 is_system_actor: bool,
711 ) -> crate::Result<ActorMesh<A>>
712 where
713 C::A: Handler<MeshFailure>,
714 {
715 let remote = Remote::collect();
716 let actor_type = remote
720 .name_of::<A>()
721 .ok_or(Error::ActorTypeNotRegistered(type_name::<A>().to_string()))?
722 .to_string();
723
724 let serialized_params = bincode::serde::encode_to_vec(params, bincode::config::legacy())?;
725 let agent_mesh = self.agent_mesh();
726
727 agent_mesh.cast(
728 cx,
729 resource::CreateOrUpdate::<proc_agent::ActorSpec> {
730 id: actor_mesh_id.resource_id().clone(),
731 rank: Default::default(),
732 spec: proc_agent::ActorSpec {
733 actor_type: actor_type.clone(),
734 params_data: serialized_params.clone(),
735 },
736 },
737 )?;
738
739 let region = self.region().clone();
740 let (port, rx) = cx.mailbox().open_accum_port_opts(
750 crate::StatusMesh::from_single(region.clone(), Status::NotExist),
753 StreamingReducerOpts {
754 max_update_interval: Some(Duration::from_millis(50)),
755 initial_update_interval: None,
756 },
757 );
758
759 let mut reply = port.bind();
760 reply.return_undeliverable(false);
763 agent_mesh.cast(
766 cx,
767 resource::GetRankStatus {
768 id: actor_mesh_id.resource_id().clone(),
769 reply,
770 },
771 )?;
772
773 let start_time = tokio::time::Instant::now();
774
775 let (statuses, mut mesh) = match GetRankStatus::wait(
784 rx,
785 self.ranks.len(),
786 hyperactor_config::global::get(ACTOR_SPAWN_MAX_IDLE),
787 region.clone(), )
789 .await
790 {
791 Ok(statuses) => {
792 let has_terminating = statuses.values().any(|s| s.is_terminating());
796 if !has_terminating {
797 Ok((
798 statuses,
799 ActorMesh::new(self.clone(), actor_mesh_id.clone(), None),
800 ))
801 } else {
802 let legacy = mesh_to_rankedvalues_with_default(
803 &statuses,
804 Status::NotExist,
805 Status::is_not_exist,
806 self.ranks.len(),
807 );
808 Err(Error::ActorSpawnError { statuses: legacy })
809 }
810 }
811 Err(complete) => {
812 let elapsed = start_time.elapsed();
815 let legacy = mesh_to_rankedvalues_with_default(
816 &complete,
817 Status::Timeout(elapsed),
818 Status::is_not_exist,
819 self.ranks.len(),
820 );
821 Err(Error::ActorSpawnError { statuses: legacy })
822 }
823 }?;
824 if !is_system_actor {
826 let controller: ActorMeshController<A> = ActorMeshController::new(
829 mesh.deref().clone(),
830 supervision_display_name.clone(),
831 Some(cx.instance().port().bind()),
832 statuses,
833 );
834 let controller_name = format!(
839 "{}_{}",
840 crate::mesh_controller::ACTOR_MESH_CONTROLLER_NAME,
841 mesh.id()
842 );
843 let controller = controller
844 .spawn_with_name(cx, &controller_name)
845 .map_err(|e| {
846 Error::ControllerActorSpawnError(mesh.id().resource_id().clone(), e)
847 })?;
848 mesh.set_controller(Some(controller.bind()));
851 }
852 {
854 let id_str = mesh.id().to_string();
855
856 let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&id_str);
859
860 let parent_mesh_id_hash = hyperactor_telemetry::hash_to_u64(&self.id().to_string());
862
863 hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
864 id: mesh_id_hash,
865 timestamp: std::time::SystemTime::now(),
866 class: supervision_display_name
867 .as_deref()
868 .and_then(python_class_from_supervision_name)
869 .unwrap_or(actor_type),
870 given_name: mesh
871 .id()
872 .display_label()
873 .map(|l| l.as_str())
874 .unwrap_or("unnamed")
875 .to_string(),
876 full_name: id_str,
877 shape_json: serde_json::to_string(&self.region().extent()).unwrap_or_default(),
878 parent_mesh_id: Some(parent_mesh_id_hash),
879 parent_view_json: serde_json::to_string(self.region()).ok(),
880 });
881
882 let now = std::time::SystemTime::now();
886 for (rank, proc_ref) in self.ranks.iter().enumerate() {
887 let display_name = supervision_display_name.as_ref().map(|sdn| {
888 let point = self.region().extent().point_of_rank(rank).unwrap();
889 crate::actor_display_name(sdn, &point)
890 });
891 let actor_id = proc_ref.actor_addr(&actor_mesh_id);
892 hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
893 id: hyperactor_telemetry::hash_to_u64(&actor_id),
894 timestamp: now,
895 mesh_id: mesh_id_hash,
896 rank: rank as u64,
897 full_name: actor_id.to_string(),
898 display_name,
899 });
900 }
901 }
902
903 Ok(mesh)
904 }
905
906 #[hyperactor::instrument(fields(
908 host_mesh = self.host_mesh_id().map(|id| id.to_string()),
909 proc_mesh = self.id.to_string(),
910 actor_mesh = actor_mesh_id.to_string(),
911 ))]
912 pub(crate) async fn stop_actor_by_id(
913 &self,
914 cx: &impl context::Actor,
915 actor_mesh_id: ActorMeshId,
916 reason: String,
917 ) -> crate::Result<ValueMesh<Status>> {
918 tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Attempt");
919 tracing::info!(name = "ActorMeshStatus", status = "Stop::Attempt");
920 let result = self.stop_actor_by_id_inner(cx, actor_mesh_id, reason).await;
921 match &result {
922 Ok(_) => {
923 tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Success");
924 tracing::info!(name = "ActorMeshStatus", status = "Stop::Success");
925 }
926 Err(error) => {
927 tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Failed", %error);
928 tracing::error!(name = "ActorMeshStatus", status = "Stop::Failed", %error);
929 }
930 }
931 result
932 }
933
934 async fn stop_actor_by_id_inner(
935 &self,
936 cx: &impl context::Actor,
937 actor_mesh_id: ActorMeshId,
938 reason: String,
939 ) -> crate::Result<ValueMesh<Status>> {
940 let region = self.region().clone();
941 let agent_mesh = self.agent_mesh();
942 agent_mesh.cast(
943 cx,
944 resource::Stop {
945 id: actor_mesh_id.resource_id().clone(),
946 reason,
947 },
948 )?;
949
950 let (port, rx) = cx.mailbox().open_accum_port_opts(
960 crate::StatusMesh::from_single(region.clone(), Status::NotExist),
963 StreamingReducerOpts {
964 max_update_interval: Some(Duration::from_millis(50)),
965 initial_update_interval: None,
966 },
967 );
968 agent_mesh.cast(
972 cx,
973 resource::WaitRankStatus {
974 id: actor_mesh_id.resource_id().clone(),
975 min_status: Status::Stopped,
976 reply: port.bind(),
977 },
978 )?;
979 let start_time = tokio::time::Instant::now();
980
981 let max_idle_time = hyperactor_config::global::get(ACTOR_SPAWN_MAX_IDLE);
983 match GetRankStatus::wait(
984 rx,
985 self.ranks.len(),
986 max_idle_time,
987 region.clone(), )
989 .await
990 {
991 Ok(statuses) => {
992 let all_stopped = statuses.values().all(|s| s.is_terminating());
996 if all_stopped {
997 Ok(statuses)
998 } else {
999 let legacy = mesh_to_rankedvalues_with_default(
1000 &statuses,
1001 Status::NotExist,
1002 Status::is_not_exist,
1003 self.ranks.len(),
1004 );
1005 Err(Error::ActorStopError { statuses: legacy })
1006 }
1007 }
1008 Err(complete) => {
1009 let legacy = mesh_to_rankedvalues_with_default(
1012 &complete,
1013 Status::Timeout(start_time.elapsed()),
1014 Status::is_not_exist,
1015 self.ranks.len(),
1016 );
1017 Err(Error::ActorStopError { statuses: legacy })
1018 }
1019 }
1020 }
1021}
1022
1023impl fmt::Display for ProcMeshRef {
1024 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1025 write!(f, "{}{{{}}}", self.id, self.region)
1026 }
1027}
1028
1029impl view::Ranked for ProcMeshRef {
1030 type Item = ProcRef;
1031
1032 fn region(&self) -> &Region {
1033 &self.region
1034 }
1035
1036 fn get(&self, rank: usize) -> Option<&Self::Item> {
1037 self.ranks.get(rank)
1038 }
1039}
1040
1041impl view::RankedSliceable for ProcMeshRef {
1042 fn sliced(&self, region: Region) -> Self {
1043 debug_assert!(region.is_subset(view::Ranked::region(self)));
1044 let ranks = self
1045 .region()
1046 .remap(®ion)
1047 .unwrap()
1048 .map(|index| self.get(index).unwrap().clone())
1049 .collect();
1050 Self::new(
1051 self.id.clone(),
1052 region,
1053 Arc::new(ranks),
1054 self.host_mesh.clone(),
1055 Some(self.root_region.as_ref().unwrap_or(&self.region).clone()),
1056 self.root_comm_actor.clone(),
1057 )
1058 .unwrap()
1059 }
1060}
1061
/// Derives a telemetry class name of the form `Python<ClassName>` from a
/// supervision display name such as `instance0.<my_module.MyWorker test_mesh>`.
///
/// The steps are:
/// 1. Take the text between the last `<` and a trailing `>`.
/// 2. Keep its first whitespace-separated token (the qualified class name).
/// 3. Use the part after that token's last `.`.
///
/// Returns `None` when the name does not have this shape: no `<...>` suffix,
/// an unqualified class name (no `.`), or an empty class name (a token ending
/// in `.`). The caller then falls back to the Rust actor type.
fn python_class_from_supervision_name(sdn: &str) -> Option<String> {
    let inner = sdn.rsplit_once('<')?.1.strip_suffix('>')?;
    let qualified = inner.split_whitespace().next()?;
    // Reject an empty class name rather than producing "Python<>".
    let class_name = qualified
        .rsplit_once('.')
        .map(|(_, name)| name)
        .filter(|name| !name.is_empty())?;
    Some(format!("Python<{class_name}>"))
}
1083
#[cfg(test)]
mod tests {
    // Most tests below spawn real host/proc meshes and only build under
    // `fbcode_build`; their imports are gated the same way.
    #[cfg(fbcode_build)]
    use std::ops::Deref;
    #[cfg(fbcode_build)]
    use std::time::Duration;

    #[cfg(fbcode_build)]
    use hyperactor::Instance;
    #[cfg(fbcode_build)]
    use hyperactor::config::ENABLE_DEST_ACTOR_REORDERING_BUFFER;
    #[cfg(fbcode_build)]
    use ndslice::ViewExt as _;
    #[cfg(fbcode_build)]
    use ndslice::extent;
    #[cfg(fbcode_build)]
    use timed_test::assert_no_process_leak;
    #[cfg(fbcode_build)]
    use timed_test::async_timed_test;
    #[cfg(fbcode_build)]
    use uuid::Uuid;

    #[cfg(fbcode_build)]
    use crate::ActorMesh;
    #[cfg(fbcode_build)]
    use crate::comm::ENABLE_NATIVE_V1_CASTING;
    #[cfg(fbcode_build)]
    use crate::host_mesh::PROC_SPAWN_MAX_IDLE;
    #[cfg(fbcode_build)]
    use crate::resource::RankedValues;
    #[cfg(fbcode_build)]
    use crate::resource::Status;
    #[cfg(fbcode_build)]
    use crate::testactor;
    #[cfg(fbcode_build)]
    use crate::testing;

    // Shared body for the spawn tests. It spawns a proc mesh (extent gpus = 1)
    // on a 2-host mesh, spawns a `TestActor` mesh on it, checks the mesh shape,
    // and shuts the host mesh down. Callers set the casting config first.
    #[cfg(fbcode_build)]
    async fn execute_spawn_actor() {
        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});

        let instance = testing::instance();

        let mut hm = testing::host_mesh(2).await;
        let proc_mesh = hm
            .spawn(&instance, "test", extent!(gpus = 1), None, None)
            .await
            .unwrap();
        let actor_mesh = proc_mesh.spawn(instance, "test", &()).await.unwrap();
        testactor::assert_mesh_shape(actor_mesh).await;

        let _ = hm.shutdown(instance).await;
    }

    // Spawn with native v1 casting and the destination reordering buffer enabled.
    #[async_timed_test(timeout_secs = 120)]
    #[cfg(fbcode_build)]
    async fn test_spawn_actor_v1_casting() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        let _guard3 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(120));
        let _guard4 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(120),
        );
        execute_spawn_actor().await;
    }

    // Same as above, with `V1_CAST_POINT_TO_POINT_THRESHOLD` overridden to 1024.
    #[async_timed_test(timeout_secs = 120)]
    #[cfg(fbcode_build)]
    async fn test_spawn_actor_v1_casting_p2p() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        let _guard3 = config.override_key(crate::config::V1_CAST_POINT_TO_POINT_THRESHOLD, 1024);
        let _guard4 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(120));
        let _guard5 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(120),
        );
        execute_spawn_actor().await;
    }

    // Spawn with native v1 casting disabled.
    #[async_timed_test(timeout_secs = 120)]
    #[cfg(fbcode_build)]
    async fn test_spawn_actor_v0_casting() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
        let _guard2 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(120));
        let _guard3 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(120),
        );
        execute_spawn_actor().await;
    }

    // Spawns a `TestActor` mesh on `proc_mesh`, then casts to it once from a
    // fresh, randomly named client. That cast comes from a different sender
    // than `cx`, so it should not consume sequence numbers from `cx`'s session.
    #[cfg(fbcode_build)]
    async fn spawn_for_seq_test(
        cx: &Instance<testing::TestRootClient>,
        proc_mesh: &super::ProcMeshRef,
    ) -> ActorMesh<testactor::TestActor> {
        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(cx, "test", &()).await.unwrap();

        let (instance, _) = cx
            .proc()
            .client(&format!("random_casts_{}", Uuid::now_v7()))
            .unwrap();
        let n = 1;
        for _ in 0..n {
            actor_mesh.cast(&instance, ()).unwrap();
        }
        println!(
            "did {} casts with sequencer session id {}",
            n,
            instance.sequencer().session_id()
        );
        actor_mesh
    }

    // Three actor meshes are spawned concurrently on the same proc mesh. The
    // same sender's first cast to each mesh is expected to carry sequence
    // number 1 at both ranks.
    #[async_timed_test(timeout_secs = 60)]
    #[cfg(fbcode_build)]
    async fn test_seq_from_same_sender_to_different_meshes() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        let _guard3 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
        let _guard4 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(60),
        );

        hyperactor_telemetry::initialize_logging_for_test();
        let instance = testing::instance();
        let session_id = instance.sequencer().session_id();

        let mut hm = testing::host_mesh(2).await;
        let proc_mesh = hm
            .spawn(&instance, "test", extent!(gpus = 1), None, None)
            .await
            .unwrap();
        let proc_mesh_ref = proc_mesh.deref();

        let handles = (0..3)
            .map(|_| {
                let proc_mesh_ref_clone = proc_mesh_ref.clone();
                tokio::spawn(async move {
                    let actor_mesh = spawn_for_seq_test(instance, &proc_mesh_ref_clone).await;
                    let expected_seqs = vec![1; 2];
                    testactor::assert_casting_correctness(
                        &actor_mesh,
                        instance,
                        Some((session_id, expected_seqs)),
                    )
                    .await;
                })
            })
            .collect::<Vec<_>>();
        futures::future::join_all(handles).await;

        let _ = hm.shutdown(instance).await;
    }

    // Casts from one sender to different views of a 3-host actor mesh.
    // Sequence numbers are expected to advance per destination actor:
    // - full mesh: [1, 1, 1];
    // - hosts 1..3: [2, 2];
    // - hosts 0..2: host 0 is at its 2nd cast, host 1 at its 3rd, so [2, 3].
    #[async_timed_test(timeout_secs = 60)]
    #[cfg(fbcode_build)]
    async fn test_seq_from_same_sender_to_different_views() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        let _guard3 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
        let _guard4 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(60),
        );

        hyperactor_telemetry::initialize_logging_for_test();

        let instance = testing::instance();
        let session_id = instance.sequencer().session_id();

        let mut hm = testing::host_mesh(3).await;
        let proc_mesh = hm
            .spawn(&instance, "test", extent!(gpus = 1), None, None)
            .await
            .unwrap();

        let actor_mesh = spawn_for_seq_test(instance, &proc_mesh).await;

        // Full mesh: first cast from `instance` to every actor.
        let expected_seqs = vec![1; 3];
        testactor::assert_casting_correctness(
            &actor_mesh,
            instance,
            Some((session_id, expected_seqs)),
        )
        .await;

        // Hosts 1..3: second cast to each of those two actors.
        let sliced_actor_mesh = actor_mesh.range("hosts", 1..3).unwrap();
        let expected_seqs = vec![2; 2];
        testactor::assert_casting_correctness(
            &sliced_actor_mesh,
            instance,
            Some((session_id, expected_seqs)),
        )
        .await;

        // Hosts 0..2: host 0 receives its 2nd cast, host 1 its 3rd.
        let sliced_actor_mesh = actor_mesh.range("hosts", 0..2).unwrap();
        let expected_seqs = vec![2, 3];
        testactor::assert_casting_correctness(
            &sliced_actor_mesh,
            instance,
            Some((session_id, expected_seqs)),
        )
        .await;

        let _ = hm.shutdown(instance).await;
    }

    // Three distinct senders on one proc each cast to the same 2-rank actor
    // mesh. Each sender's session is expected to start at sequence number 1
    // on both ranks.
    #[async_timed_test(timeout_secs = 60)]
    #[cfg(fbcode_build)]
    async fn test_seq_from_different_senders() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard2 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        let _guard3 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
        let _guard4 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(60),
        );

        hyperactor_telemetry::initialize_logging_for_test();

        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;

        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_0".to_string()).unwrap();
        let instance = proc
            .actor_instance::<testing::TestRootClient>("test_client")
            .unwrap()
            .instance;
        let first_instance = proc
            .actor_instance::<testing::TestRootClient>("first_client")
            .unwrap()
            .instance;
        let second_instance = proc
            .actor_instance::<testing::TestRootClient>("second_client")
            .unwrap()
            .instance;
        let third_instance = proc
            .actor_instance::<testing::TestRootClient>("third_client")
            .unwrap()
            .instance;

        let mut hm = testing::host_mesh(2).await;
        let proc_mesh = hm
            .spawn(&instance, "test", extent!(gpus = 1), None, None)
            .await
            .unwrap();

        let actor_mesh = spawn_for_seq_test(&instance, &proc_mesh).await;

        for inst in [&first_instance, &second_instance, &third_instance] {
            let expected_seqs = vec![1; 2];
            let session_id = inst.sequencer().session_id();
            testactor::assert_casting_correctness(
                &actor_mesh,
                inst,
                Some((session_id, expected_seqs)),
            )
            .await;
        }

        let _ = hm.shutdown(&instance).await;
    }

    // Spawning `FailingCreateTestActor` must fail with an `ActorSpawnError`
    // whose single rank is `Failed("test failure")`.
    #[cfg(fbcode_build)]
    #[assert_no_process_leak]
    #[tokio::test]
    async fn test_failing_spawn_actor() {
        hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});

        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
        let _guard2 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(60),
        );

        let instance = testing::instance();

        let mut hm = testing::host_mesh(1).await;
        let proc_mesh = hm
            .spawn(&instance, "test", extent!(gpus = 1), None, None)
            .await
            .unwrap();
        let err = proc_mesh
            .spawn::<testactor::FailingCreateTestActor, Instance<testing::TestRootClient>>(
                instance,
                "testfail",
                &(),
            )
            .await
            .unwrap_err();
        let statuses = err.into_actor_spawn_error().unwrap();
        assert_eq!(
            statuses,
            RankedValues::from((0..1, Status::Failed("test failure".to_string()))),
        );

        let _ = hm.shutdown(instance).await;
    }

    // Pure unit test of the supervision-name to Python class-name mapping.
    #[test]
    fn test_python_class_from_supervision_name() {
        use super::python_class_from_supervision_name;

        assert_eq!(
            python_class_from_supervision_name("instance0.<my_module.MyWorker test_mesh>"),
            Some("Python<MyWorker>".to_string()),
        );
        assert_eq!(
            python_class_from_supervision_name(
                "instance0.<package.submodule.TrainingActor mesh_0>"
            ),
            Some("Python<TrainingActor>".to_string()),
        );
        // No `<...>` suffix.
        assert_eq!(python_class_from_supervision_name("plain_name"), None,);
        // Unqualified class name (no module path).
        assert_eq!(
            python_class_from_supervision_name("instance0.<NoModule mesh>"),
            None,
        );
    }
}