1use hyperactor::Actor;
10use hyperactor::Handler;
11use hyperactor::accum::ReducerOpts;
12use hyperactor::channel::ChannelTransport;
13use hyperactor::clock::Clock;
14use hyperactor::clock::RealClock;
15use hyperactor::host::Host;
16use hyperactor_config::CONFIG;
17use hyperactor_config::ConfigAttr;
18use hyperactor_config::attrs::declare_attrs;
19use ndslice::view::CollectMeshExt;
20
21use crate::supervision::MeshFailure;
22
23pub mod mesh_agent;
24
25use std::collections::HashSet;
26use std::ops::Deref;
27use std::str::FromStr;
28use std::sync::Arc;
29use std::time::Duration;
30
31use hyperactor::ActorRef;
32use hyperactor::ProcId;
33use hyperactor::channel::ChannelAddr;
34use hyperactor::context;
35use ndslice::Extent;
36use ndslice::Region;
37use ndslice::ViewExt;
38use ndslice::extent;
39use ndslice::view;
40use ndslice::view::Ranked;
41use ndslice::view::RegionParseError;
42use serde::Deserialize;
43use serde::Serialize;
44use typeuri::Named;
45
46use crate::Bootstrap;
47use crate::alloc::Alloc;
48use crate::bootstrap::BootstrapCommand;
49use crate::bootstrap::BootstrapProcManager;
50use crate::proc_mesh::DEFAULT_TRANSPORT;
51use crate::resource;
52use crate::resource::CreateOrUpdateClient;
53use crate::resource::GetRankStatus;
54use crate::resource::GetRankStatusClient;
55use crate::resource::ProcSpec;
56use crate::resource::RankedValues;
57use crate::resource::Status;
58use crate::v1;
59use crate::v1::Name;
60use crate::v1::ProcMesh;
61use crate::v1::ProcMeshRef;
62use crate::v1::ValueMesh;
63use crate::v1::host_mesh::mesh_agent::HostAgentMode;
64pub use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
65use crate::v1::host_mesh::mesh_agent::HostMeshAgentProcMeshTrampoline;
66use crate::v1::host_mesh::mesh_agent::ProcState;
67use crate::v1::host_mesh::mesh_agent::ShutdownHostClient;
68use crate::v1::mesh_controller::HostMeshController;
69use crate::v1::mesh_controller::ProcMeshController;
70use crate::v1::proc_mesh::ProcRef;
71
// Configuration knobs bounding how long host-mesh operations wait on
// host mesh agents before giving up.
declare_attrs! {
    // Wait limit passed to `GetRankStatus::wait` while waiting for procs to be
    // created in `HostMeshRef::spawn`; also the timeout for the follow-up
    // `GetState` query issued when a spawned proc is found terminating.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()),
        py_name: Some("mesh_proc_spawn_max_idle".to_string()),
    })
    pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    // Wait limit passed to `GetRankStatus::wait` while waiting for procs to
    // reach a terminating status in `HostMeshRef::stop_proc_mesh`.
    // NOTE(review): unlike PROC_SPAWN_MAX_IDLE, this py_name (and the one
    // below) has no `mesh_` prefix; left as-is since Python callers may rely
    // on the current names.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()),
        py_name: Some("proc_stop_max_idle".to_string()),
    })
    pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30);

    // Per-reply timeout used by `HostMeshRef::proc_states` while collecting
    // `GetState` replies; ranks that have not replied when it fires are
    // reported with a `Timeout` status.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()),
        py_name: Some("get_proc_state_max_idle".to_string()),
    })
    pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1);
}
95
/// A reference to a single host, identified by the channel address at which
/// the host is served. Procs on the host are addressed directly at this
/// address (see `HostRef::named_proc` and `HostRef::service_proc`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(ChannelAddr);
// Register the type with `wirevalue` (presumably so it can be carried as a
// typed wire value — confirm against the wirevalue crate).
wirevalue::register_type!(HostRef);
100
101impl HostRef {
102 fn mesh_agent(&self) -> ActorRef<HostMeshAgent> {
104 ActorRef::attest(self.service_proc().actor_id("agent", 0))
105 }
106
107 fn named_proc(&self, name: &Name) -> ProcId {
109 ProcId::Direct(self.0.clone(), name.to_string())
110 }
111
112 fn service_proc(&self) -> ProcId {
114 ProcId::Direct(self.0.clone(), "service".to_string())
115 }
116
117 pub(crate) async fn shutdown(
136 &self,
137 cx: &impl hyperactor::context::Actor,
138 ) -> anyhow::Result<()> {
139 let agent = self.mesh_agent();
140 let terminate_timeout =
141 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
142 let max_in_flight =
143 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
144 agent
145 .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
146 .await?;
147 Ok(())
148 }
149}
150
151impl TryFrom<ActorRef<HostMeshAgent>> for HostRef {
152 type Error = v1::Error;
153
154 fn try_from(value: ActorRef<HostMeshAgent>) -> Result<Self, v1::Error> {
155 let proc_id = value.actor_id().proc_id();
156 match proc_id.as_direct() {
157 Some((addr, _)) => Ok(HostRef(addr.clone())),
158 None => Err(v1::Error::RankedProc(proc_id.clone())),
159 }
160 }
161}
162
/// Formats a `HostRef` as its underlying `ChannelAddr`.
impl std::fmt::Display for HostRef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to the address's own formatting. `FromStr for HostRef`
        // parses via `ChannelAddr::from_str`, so this presumably round-trips
        // as long as ChannelAddr's Display/FromStr do.
        self.0.fmt(f)
    }
}
168
169impl FromStr for HostRef {
170 type Err = <ChannelAddr as FromStr>::Err;
171
172 fn from_str(s: &str) -> Result<Self, Self::Err> {
173 Ok(HostRef(ChannelAddr::from_str(s)?))
174 }
175}
176
/// An owning mesh of hosts.
///
/// Unlike `HostMeshRef`, a `HostMesh` is responsible for its hosts:
/// `HostMesh::shutdown` shuts them down explicitly, and dropping the mesh
/// triggers a best-effort shutdown (see the `Drop` impl). It dereferences to
/// its current `HostMeshRef`.
// NOTE(review): dead_code is presumably allowed because some fields (e.g.
// `extent`) are not read anywhere — confirm.
#[allow(dead_code)]
pub struct HostMesh {
    // Name of the mesh (also the name of `current_ref`).
    name: Name,
    // Extent of the mesh's host region.
    extent: Extent,
    // How the hosts were obtained; decides the extra teardown on shutdown.
    allocation: HostMeshAllocation,
    // The reference this mesh dereferences to.
    current_ref: HostMeshRef,
}
196
/// How the hosts of a `HostMesh` were obtained. This determines what
/// teardown `HostMesh::shutdown` performs beyond shutting down the hosts.
#[allow(dead_code)]
enum HostMeshAllocation {
    /// Hosts were started inside the procs of an allocated `ProcMesh`
    /// (see `HostMesh::allocate`); that proc mesh is stopped on shutdown.
    ProcMesh {
        proc_mesh: ProcMesh,
        proc_mesh_ref: ProcMeshRef,
        hosts: Vec<HostRef>,
    },
    /// Hosts were taken over from an existing `HostMeshRef`
    /// (see `HostMesh::take`); only the hosts themselves are shut down.
    Owned { hosts: Vec<HostRef> },
}
236
237impl HostMesh {
238 pub async fn local() -> v1::Result<HostMesh> {
261 Self::local_with_bootstrap(BootstrapCommand::current()?).await
262 }
263
    /// Creates a single-host mesh (`hosts = 1`, named "local") whose host runs
    /// in the current process and launches procs with `bootstrap_cmd`.
    ///
    /// If the current process was itself launched with a bootstrap
    /// configuration in its environment (`Bootstrap::get_from_env()` returns
    /// `Ok(Some(_))`), this runs that bootstrap instead. When `bootstrap()`
    /// returns, its result is logged as an error and the process exits with
    /// status 1.
    ///
    /// # Errors
    /// Propagates failures from creating the proc manager or the host, and
    /// from spawning the host's mesh agent.
    pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> v1::Result<HostMesh> {
        // A bootstrapped child must not build its own host mesh: run the
        // bootstrap and exit.
        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
            let err = boot.bootstrap().await;
            tracing::error!("failed to bootstrap local host mesh process: {}", err);
            std::process::exit(1);
        }

        let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();

        let manager = BootstrapProcManager::new(bootstrap_cmd)?;
        let host = Host::new(manager, addr).await?;
        // Re-read the address from the host itself; presumably this is the
        // concrete bound address rather than the requested binding address.
        let addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        // The agent takes ownership of the `Host`. It is spawned as "agent",
        // matching the `agent[0]` id that `HostRef::mesh_agent` attests
        // (assuming the system proc is the host's "service" proc — confirm).
        let host_mesh_agent = system_proc
            .spawn("agent", HostMeshAgent::new(HostAgentMode::Process(host)))
            .map_err(v1::Error::SingletonActorSpawnError)?;
        host_mesh_agent.bind::<HostMeshAgent>();

        let host = HostRef(addr);
        let host_mesh_ref = HostMeshRef::new(
            Name::new("local").unwrap(),
            extent!(hosts = 1).into(),
            vec![host],
        )?;
        Ok(HostMesh::take(host_mesh_ref))
    }
297
    /// Creates a mesh of `extent.num_ranks()` hosts (named "process"), each
    /// running in a freshly spawned child process launched from `command`.
    ///
    /// As in `local_with_bootstrap`, a process that was itself launched with
    /// a bootstrap configuration runs it and exits with status 1 instead.
    ///
    /// The children are only spawned, not awaited, and their handles are
    /// not retained.
    ///
    /// # Errors
    /// Propagates failures to spawn a child process, or a cardinality
    /// mismatch from `HostMeshRef::new`.
    pub async fn process(extent: Extent, command: BootstrapCommand) -> v1::Result<HostMesh> {
        if let Ok(Some(boot)) = Bootstrap::get_from_env() {
            let err = boot.bootstrap().await;
            tracing::error!("failed to bootstrap process host mesh process: {}", err);
            std::process::exit(1);
        }

        let bind_spec = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT);
        let mut hosts = Vec::with_capacity(extent.num_ranks());
        for _ in 0..extent.num_ranks() {
            // NOTE(review): the same address is handed to the child and
            // recorded in the HostRef; if `binding_addr()` yields a wildcard or
            // port-0 address, the recorded HostRef may not be dialable — confirm.
            let addr = bind_spec.binding_addr();
            let bootstrap = Bootstrap::Host {
                addr: addr.clone(),
                command: Some(command.clone()),
                // Forward the parent's current configuration to the child.
                config: Some(hyperactor_config::global::attrs()),
            };

            // The bootstrap configuration travels to the child via its environment.
            let mut cmd = command.new();
            bootstrap.to_env(&mut cmd);
            cmd.spawn()?;
            hosts.push(HostRef(addr));
        }

        let host_mesh_ref = HostMeshRef::new(Name::new("process").unwrap(), extent.into(), hosts)?;
        Ok(HostMesh::take(host_mesh_ref))
    }
335
336 pub async fn allocate<C: context::Actor>(
380 cx: &C,
381 alloc: Box<dyn Alloc + Send + Sync>,
382 name: &str,
383 bootstrap_params: Option<BootstrapCommand>,
384 ) -> v1::Result<Self>
385 where
386 C::A: Handler<MeshFailure>,
387 {
388 Self::allocate_inner(cx, alloc, Name::new(name)?, bootstrap_params).await
389 }
390
    /// Allocates a `ProcMesh` from `alloc`, starts a host in each of its procs,
    /// and returns the resulting `HostMesh`.
    ///
    /// A `HostMeshAgentProcMeshTrampoline` actor is spawned on every proc and
    /// handed a port on which it is expected to report the
    /// `ActorRef<HostMeshAgent>` of the host it starts. One report is awaited
    /// per rank of the allocation's extent, and each agent must live at the
    /// well-known id `HostRef::mesh_agent` expects. A `HostMeshController` is
    /// spawned for the new mesh before it is returned.
    ///
    /// # Errors
    /// * proc-mesh allocation or trampoline spawn failures;
    /// * receive errors while waiting for agent reports;
    /// * `HostMeshAgentConfigurationError` if an agent is not directly
    ///   addressed or not at the expected actor id;
    /// * `ControllerActorSpawnError` if the controller cannot be spawned.
    #[hyperactor::instrument(fields(host_mesh=name.to_string()))]
    async fn allocate_inner<C: context::Actor>(
        cx: &C,
        alloc: Box<dyn Alloc + Send + Sync>,
        name: Name,
        bootstrap_params: Option<BootstrapCommand>,
    ) -> v1::Result<Self>
    where
        C::A: Handler<MeshFailure>,
    {
        tracing::info!(name = "HostMeshStatus", status = "Allocate::Attempt");
        // Read everything needed from `alloc` before it is consumed below.
        let transport = alloc.transport();
        let extent = alloc.extent().clone();
        let is_local = alloc.is_local();
        let proc_mesh = ProcMesh::allocate(cx, alloc, name.name()).await?;

        // Port on which each trampoline reports its host's mesh agent.
        let (mesh_agents, mut mesh_agents_rx) = cx.mailbox().open_port();
        let trampoline_name = Name::new("host_mesh_trampoline").unwrap();
        let _trampoline_actor_mesh = proc_mesh
            .spawn_with_name::<HostMeshAgentProcMeshTrampoline, C>(
                cx,
                trampoline_name,
                &(transport, mesh_agents.bind(), bootstrap_params, is_local),
                None,
                true,
            )
            .await?;

        let mut hosts = Vec::new();
        // Reports may arrive in any order; hosts are recorded in arrival order.
        for _rank in 0..extent.num_ranks() {
            let mesh_agent = mesh_agents_rx.recv().await?;

            let Some((addr, _)) = mesh_agent.actor_id().proc_id().as_direct() else {
                return Err(v1::Error::HostMeshAgentConfigurationError(
                    mesh_agent.actor_id().clone(),
                    "host mesh agent must be a direct actor".to_string(),
                ));
            };

            // The agent must be exactly where `HostRef::mesh_agent` will look
            // for it, otherwise later calls through the HostRef would miss it.
            let host_ref = HostRef(addr.clone());
            if host_ref.mesh_agent() != mesh_agent {
                return Err(v1::Error::HostMeshAgentConfigurationError(
                    mesh_agent.actor_id().clone(),
                    format!(
                        "expected mesh agent actor id to be {}",
                        host_ref.mesh_agent().actor_id()
                    ),
                ));
            }
            hosts.push(host_ref);
        }

        // Presumably resolves to ProcMeshRef through ProcMesh's Deref — confirm.
        let proc_mesh_ref = proc_mesh.clone();
        let mesh = Self {
            name: name.clone(),
            extent: extent.clone(),
            allocation: HostMeshAllocation::ProcMesh {
                proc_mesh,
                proc_mesh_ref,
                hosts: hosts.clone(),
            },
            // Cannot fail: exactly `extent.num_ranks()` hosts were collected.
            current_ref: HostMeshRef::new(name, extent.into(), hosts).unwrap(),
        };

        // If this fails, `mesh` is dropped on return, which triggers the
        // best-effort host shutdown in `HostMesh`'s `Drop` impl.
        let controller = HostMeshController::new(mesh.deref().clone());
        controller
            .spawn(cx)
            .map_err(|e| v1::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;

        tracing::info!(name = "HostMeshStatus", status = "Allocate::Created");
        Ok(mesh)
    }
472
473 pub fn take(mesh: HostMeshRef) -> Self {
480 let region = mesh.region().clone();
481 let hosts: Vec<HostRef> = mesh.values().collect();
482
483 let current_ref = HostMeshRef::new(mesh.name.clone(), region.clone(), hosts.clone())
484 .expect("region/hosts cardinality must match");
485
486 Self {
487 name: mesh.name,
488 extent: region.extent().clone(),
489 allocation: HostMeshAllocation::Owned { hosts },
490 current_ref,
491 }
492 }
493
494 #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
504 pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
505 tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
506 let mut failed_hosts = vec![];
507 for host in self.current_ref.values() {
508 if let Err(e) = host.shutdown(cx).await {
509 tracing::warn!(
510 name = "HostMeshStatus",
511 status = "Shutdown::Host::Failed",
512 host = %host,
513 error = %e,
514 "host shutdown failed"
515 );
516 failed_hosts.push(host);
517 }
518 }
519 if failed_hosts.is_empty() {
520 tracing::info!(name = "HostMeshStatus", status = "Shutdown::Success");
521 } else {
522 tracing::error!(
523 name = "HostMeshStatus",
524 status = "Shutdown::Failed",
525 "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
526 failed_hosts
527 );
528 }
529
530 match &mut self.allocation {
531 HostMeshAllocation::ProcMesh { proc_mesh, .. } => {
532 proc_mesh.stop(cx).await?;
533 }
534 HostMeshAllocation::Owned { .. } => {}
535 }
536 Ok(())
537 }
538}
539
/// Lets a `HostMesh` be used wherever a `&HostMeshRef` is expected, exposing
/// its current reference (ranks, region, `spawn`, ...).
impl Deref for HostMesh {
    type Target = HostMeshRef;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
547
548impl Drop for HostMesh {
549 fn drop(&mut self) {
567 tracing::info!(
568 name = "HostMeshStatus",
569 host_mesh = %self.name,
570 status = "Dropping",
571 );
572 let hosts: Vec<HostRef> = match &self.allocation {
574 HostMeshAllocation::ProcMesh { hosts, .. } | HostMeshAllocation::Owned { hosts } => {
575 hosts.clone()
576 }
577 };
578
579 if let Ok(handle) = tokio::runtime::Handle::try_current() {
581 let mesh_name = self.name.clone();
582 let allocation_label = match &self.allocation {
583 HostMeshAllocation::ProcMesh { .. } => "proc_mesh",
584 HostMeshAllocation::Owned { .. } => "owned",
585 }
586 .to_string();
587
588 handle.spawn(async move {
589 let span = tracing::info_span!(
590 "hostmesh_drop_cleanup",
591 host_mesh = %mesh_name,
592 allocation = %allocation_label,
593 hosts = hosts.len(),
594 );
595 let _g = span.enter();
596
597 match hyperactor::Proc::direct(
600 ChannelTransport::Unix.any(),
601 "hostmesh-drop".to_string(),
602 )
603 {
604 Err(e) => {
605 tracing::warn!(
606 error = %e,
607 "failed to construct ephemeral Proc for drop-cleanup; \
608 relying on PDEATHSIG/manager Drop"
609 );
610 }
611 Ok(proc) => {
612 match proc.instance("drop") {
613 Err(e) => {
614 tracing::warn!(
615 error = %e,
616 "failed to create ephemeral instance for drop-cleanup; \
617 relying on PDEATHSIG/manager Drop"
618 );
619 }
620 Ok((instance, _guard)) => {
621 let mut attempted = 0usize;
622 let mut ok = 0usize;
623 let mut err = 0usize;
624
625 for host in hosts {
626 attempted += 1;
627 tracing::debug!(host = %host, "drop-cleanup: shutdown start");
628 match host.shutdown(&instance).await {
629 Ok(()) => {
630 ok += 1;
631 tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
632 }
633 Err(e) => {
634 err += 1;
635 tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
636 }
637 }
638 }
639
640 tracing::info!(
641 attempted, ok, err,
642 "hostmesh drop-cleanup summary"
643 );
644 }
645 }
646 }
647 }
648 });
649 } else {
650 tracing::warn!(
653 host_mesh = %self.name,
654 hosts = hosts.len(),
655 "HostMesh dropped without a Tokio runtime; skipping \
656 best-effort shutdown. This indicates that .shutdown() \
657 on this mesh has not been called before program exit \
658 (perhaps due to a missing call to \
659 'monarch.actor.shutdown_context()'?) This in turn can \
660 lead to backtrace output due to folly SIGTERM \
661 handlers."
662 );
663 }
664
665 tracing::info!(
666 name = "HostMeshStatus",
667 host_mesh = %self.name,
668 status = "Dropped",
669 );
670 }
671}
672
673pub(crate) fn mesh_to_rankedvalues_with_default<T, F>(
682 mesh: &ValueMesh<T>,
683 default: T,
684 is_sentinel: F,
685 len: usize,
686) -> RankedValues<T>
687where
688 T: Eq + Clone + 'static,
689 F: Fn(&T) -> bool,
690{
691 let mut out = RankedValues::from((0..len, default));
692 for (i, s) in mesh.values().enumerate() {
693 if !is_sentinel(&s) {
694 out.merge_from(RankedValues::from((i..i + 1, s)));
695 }
696 }
697 out
698}
699
/// A reference to a mesh of hosts: a named `Region` with one `HostRef` per
/// rank of that region.
///
/// The host list is shared through an `Arc`, so clones do not copy it.
/// Unlike `HostMesh`, dropping a `HostMeshRef` does not shut down its hosts.
/// Its string form is `name:host0,host1,...@region` (see the `Display` and
/// `FromStr` impls).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    name: Name,
    region: Region,
    // Invariant (checked by `HostMeshRef::new`): ranks.len() == region.num_ranks().
    ranks: Arc<Vec<HostRef>>,
}
wirevalue::register_type!(HostMeshRef);
725
726impl HostMeshRef {
727 #[allow(clippy::result_large_err)]
730 fn new(name: Name, region: Region, ranks: Vec<HostRef>) -> v1::Result<Self> {
731 if region.num_ranks() != ranks.len() {
732 return Err(v1::Error::InvalidRankCardinality {
733 expected: region.num_ranks(),
734 actual: ranks.len(),
735 });
736 }
737 Ok(Self {
738 name,
739 region,
740 ranks: Arc::new(ranks),
741 })
742 }
743
744 pub fn from_hosts(name: Name, hosts: Vec<ChannelAddr>) -> Self {
747 Self {
748 name,
749 region: extent!(hosts = hosts.len()).into(),
750 ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
751 }
752 }
753
754 pub fn from_host_agents(name: Name, agents: Vec<ActorRef<HostMeshAgent>>) -> v1::Result<Self> {
756 Ok(Self {
757 name,
758 region: extent!(hosts = agents.len()).into(),
759 ranks: Arc::new(
760 agents
761 .into_iter()
762 .map(HostRef::try_from)
763 .collect::<v1::Result<_>>()?,
764 ),
765 })
766 }
767
768 pub fn from_host_agent(name: Name, agent: ActorRef<HostMeshAgent>) -> v1::Result<Self> {
770 Ok(Self {
771 name,
772 region: Extent::unity().into(),
773 ranks: Arc::new(vec![HostRef::try_from(agent)?]),
774 })
775 }
776
777 #[allow(clippy::result_large_err)]
783 pub async fn spawn<C: context::Actor>(
784 &self,
785 cx: &C,
786 name: &str,
787 per_host: Extent,
788 ) -> v1::Result<ProcMesh>
789 where
790 C::A: Handler<MeshFailure>,
791 {
792 self.spawn_inner(cx, Name::new(name)?, per_host).await
793 }
794
795 #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
796 async fn spawn_inner<C: context::Actor>(
797 &self,
798 cx: &C,
799 proc_mesh_name: Name,
800 per_host: Extent,
801 ) -> v1::Result<ProcMesh>
802 where
803 C::A: Handler<MeshFailure>,
804 {
805 tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
806 tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
807 let result = self.spawn_inner_inner(cx, proc_mesh_name, per_host).await;
808 match &result {
809 Ok(_) => {
810 tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
811 tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
812 }
813 Err(error) => {
814 tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
815 tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
816 }
817 }
818 result
819 }
820
821 async fn spawn_inner_inner<C: context::Actor>(
822 &self,
823 cx: &C,
824 proc_mesh_name: Name,
825 per_host: Extent,
826 ) -> v1::Result<ProcMesh>
827 where
828 C::A: Handler<MeshFailure>,
829 {
830 let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
831 let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
832 if !per_host_labels
833 .intersection(&host_labels)
834 .collect::<Vec<_>>()
835 .is_empty()
836 {
837 return Err(v1::Error::ConfigurationError(anyhow::anyhow!(
838 "per_host dims overlap with existing dims when spawning proc mesh"
839 )));
840 }
841
842 let extent = self
843 .region
844 .extent()
845 .concat(&per_host)
846 .map_err(|err| v1::Error::ConfigurationError(err.into()))?;
847
848 let region: Region = extent.clone().into();
849
850 tracing::info!(
851 name = "ProcMeshStatus",
852 status = "Spawn::Attempt",
853 %region,
854 "spawning proc mesh"
855 );
856
857 let mut procs = Vec::new();
858 let num_ranks = region.num_ranks();
859 let (port, rx) = cx.mailbox().open_accum_port_opts(
862 crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
863 Some(ReducerOpts {
864 max_update_interval: Some(Duration::from_millis(50)),
865 initial_update_interval: None,
866 }),
867 );
868
869 let mut proc_names = Vec::new();
876 let client_config_override = hyperactor_config::global::attrs();
877 for (host_rank, host) in self.ranks.iter().enumerate() {
878 for per_host_rank in 0..per_host.num_ranks() {
879 let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
880 let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank))?;
881 proc_names.push(proc_name.clone());
882 host.mesh_agent()
883 .create_or_update(
884 cx,
885 proc_name.clone(),
886 resource::Rank::new(create_rank),
887 ProcSpec::new(client_config_override.clone()),
888 )
889 .await
890 .map_err(|e| {
891 v1::Error::HostMeshAgentConfigurationError(
892 host.mesh_agent().actor_id().clone(),
893 format!("failed while creating proc: {}", e),
894 )
895 })?;
896 let mut reply_port = port.bind();
897 reply_port.return_undeliverable(false);
900 host.mesh_agent()
901 .get_rank_status(cx, proc_name.clone(), reply_port)
902 .await
903 .map_err(|e| {
904 v1::Error::HostMeshAgentConfigurationError(
905 host.mesh_agent().actor_id().clone(),
906 format!("failed while querying proc status: {}", e),
907 )
908 })?;
909 let proc_id = host.named_proc(&proc_name);
910 tracing::info!(
911 name = "ProcMeshStatus",
912 status = "Spawn::CreatingProc",
913 %proc_id,
914 rank = create_rank,
915 );
916 procs.push(ProcRef::new(
917 proc_id,
918 create_rank,
919 ActorRef::attest(host.named_proc(&proc_name).actor_id("agent", 0)),
921 ));
922 }
923 }
924
925 let start_time = RealClock.now();
926
927 match GetRankStatus::wait(
930 rx,
931 num_ranks,
932 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
933 region.clone(), )
935 .await
936 {
937 Ok(statuses) => {
938 if let Some((rank, status)) = statuses
941 .values()
942 .enumerate()
943 .find(|(_, s)| s.is_terminating())
944 {
945 let proc_name = &proc_names[rank];
946 let host_rank = rank / per_host.num_ranks();
947 let mesh_agent = self.ranks[host_rank].mesh_agent();
948 let (reply_tx, mut reply_rx) = cx.mailbox().open_port();
949 let mut reply_tx = reply_tx.bind();
950 reply_tx.return_undeliverable(false);
953 mesh_agent
954 .send(
955 cx,
956 resource::GetState {
957 name: proc_name.clone(),
958 reply: reply_tx,
959 },
960 )
961 .map_err(|e| {
962 v1::Error::SendingError(mesh_agent.actor_id().clone(), e.into())
963 })?;
964 let state = match RealClock
965 .timeout(
966 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
967 reply_rx.recv(),
968 )
969 .await
970 {
971 Ok(Ok(state)) => state,
972 _ => resource::State {
973 name: proc_name.clone(),
974 status,
975 state: None,
976 },
977 };
978
979 tracing::error!(
980 name = "ProcMeshStatus",
981 status = "Spawn::GetRankStatus",
982 rank = host_rank,
983 "rank {} is terminating with state: {}",
984 host_rank,
985 state
986 );
987
988 return Err(v1::Error::ProcCreationError {
989 state: Box::new(state),
990 host_rank,
991 mesh_agent,
992 });
993 }
994 }
995 Err(complete) => {
996 tracing::error!(
997 name = "ProcMeshStatus",
998 status = "Spawn::GetRankStatus",
999 "timeout after {:?} when waiting for procs being created",
1000 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1001 );
1002 let legacy = mesh_to_rankedvalues_with_default(
1005 &complete,
1006 Status::Timeout(start_time.elapsed()),
1007 Status::is_not_exist,
1008 num_ranks,
1009 );
1010 return Err(v1::Error::ProcSpawnError { statuses: legacy });
1011 }
1012 }
1013
1014 let mesh =
1015 ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await;
1016 if let Ok(ref mesh) = mesh {
1017 let controller = ProcMeshController::new(mesh.deref().clone());
1020 controller
1021 .spawn(cx)
1022 .map_err(|e| v1::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
1023 }
1024 mesh
1025 }
1026
    /// Returns the name of this host mesh.
    pub fn name(&self) -> &Name {
        &self.name
    }
1031
    /// Stops the procs of proc mesh `proc_mesh_name` that live on this mesh's
    /// hosts and waits for all of them to reach a terminating status.
    ///
    /// For each proc, a `resource::Stop` is sent to the owning host's mesh
    /// agent and a rank-status subscription is registered. All updates feed
    /// one accumulator port shaped like `region`. The wait is bounded by
    /// `PROC_STOP_MAX_IDLE`.
    ///
    /// # Errors
    /// * a proc id that is not directly addressed, or whose name does not
    ///   parse as a `Name`;
    /// * send / status-subscription failures;
    /// * some rank not terminating, or the wait timing out.
    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
    pub(crate) async fn stop_proc_mesh(
        &self,
        cx: &impl hyperactor::context::Actor,
        proc_mesh_name: &Name,
        procs: impl IntoIterator<Item = ProcId>,
        region: Region,
    ) -> anyhow::Result<()> {
        // Collected only for the summary log line below.
        let mut proc_names = Vec::new();
        let num_ranks = region.num_ranks();
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
            Some(ReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            }),
        );
        for proc_id in procs.into_iter() {
            let Some((addr, proc_name)) = proc_id.as_direct() else {
                return Err(anyhow::anyhow!(
                    "host mesh proc {} must be direct addressed",
                    proc_id,
                ));
            };
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());

            // The proc's address identifies the host whose agent manages it.
            let host = HostRef(addr.clone());
            host.mesh_agent().send(
                cx,
                resource::Stop {
                    name: proc_name.clone(),
                },
            )?;
            // NOTE(review): unlike `spawn_inner_inner`, this reply port does
            // not call `return_undeliverable(false)`; confirm that is intended.
            host.mesh_agent()
                .get_rank_status(cx, proc_name, port.bind())
                .await?;

            tracing::info!(
                name = "ProcMeshStatus",
                %proc_id,
                status = "Stop::Sent",
            );
        }
        tracing::info!(
            name = "HostMeshStatus",
            status = "ProcMesh::Stop::Sent",
            "sending Stop to proc mesh for {} procs: {}",
            proc_names.len(),
            proc_names
                .iter()
                .map(|n| n.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        );

        let start_time = RealClock.now();

        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_STOP_MAX_IDLE),
            region.clone(),
        )
        .await
        {
            Ok(statuses) => {
                // Success requires every rank to be terminating.
                let all_stopped = statuses.values().all(|s| s.is_terminating());
                if !all_stopped {
                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "FailedToStop",
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    );
                    return Err(anyhow::anyhow!(
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    ));
                }
                tracing::info!(name = "ProcMeshStatus", status = "Stopped");
            }
            Err(complete) => {
                // Ranks that never reported are shown as timed out.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "StoppingTimeout",
                    "failed to terminate proc mesh before timeout: {:?}",
                    legacy,
                );
                return Err(anyhow::anyhow!(
                    "failed to terminate proc mesh {} before timeout: {:?}",
                    proc_mesh_name,
                    legacy
                ));
            }
        }
        Ok(())
    }
1147
    /// Queries the state of each proc in `procs` from its host's mesh agent
    /// and returns the states as a `ValueMesh` over `region`.
    ///
    /// One `GetState` request is sent per proc. Each reply is awaited with a
    /// per-reply timeout of `GET_PROC_STATE_MAX_IDLE`. On the first timeout,
    /// every rank that has not yet replied is filled in with a
    /// `Status::Timeout` state and collection stops.
    ///
    /// # Errors
    /// * a proc id that is not directly addressed, or whose name does not parse;
    /// * a send failure (`CallError`) or a receive error;
    /// * `NotExist` if an agent replies without state for a proc;
    /// * a failure to assemble the replies into a mesh over `region`.
    #[allow(clippy::result_large_err)]
    pub(crate) async fn proc_states(
        &self,
        cx: &impl context::Actor,
        procs: impl IntoIterator<Item = ProcId>,
        region: Region,
    ) -> v1::Result<ValueMesh<resource::State<ProcState>>> {
        // All replies share a single port.
        let (tx, mut rx) = cx.mailbox().open_port();

        let mut num_ranks = 0;
        let procs: Vec<ProcId> = procs.into_iter().collect();
        // Positional: proc_names[i] is the name of procs[i].
        let mut proc_names = Vec::new();
        for proc_id in procs.iter() {
            num_ranks += 1;
            let Some((addr, proc_name)) = proc_id.as_direct() else {
                return Err(v1::Error::ConfigurationError(anyhow::anyhow!(
                    "host mesh proc {} must be direct addressed",
                    proc_id,
                )));
            };

            let host = HostRef(addr.clone());
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());
            let mut reply = tx.bind();
            reply.return_undeliverable(false);
            host.mesh_agent()
                .send(
                    cx,
                    resource::GetState {
                        name: proc_name,
                        reply,
                    },
                )
                .map_err(|e| {
                    v1::Error::CallError(host.mesh_agent().actor_id().clone(), e.into())
                })?;
        }

        // Replies arrive in any order; each is tagged with its create rank
        // and sorted below.
        let mut states = Vec::with_capacity(num_ranks);
        let timeout = hyperactor_config::global::get(GET_PROC_STATE_MAX_IDLE);
        for _ in 0..num_ranks {
            let state = RealClock.timeout(timeout, rx.recv()).await;
            if let Ok(state) = state {
                let state = state?;
                match state.state {
                    Some(ref inner) => {
                        states.push((inner.create_rank, state));
                    }
                    None => {
                        return Err(v1::Error::NotExist(state.name));
                    }
                }
            } else {
                tracing::warn!(
                    "Timeout waiting for response from host mesh agent for proc_states after {:?}",
                    timeout
                );
                // NOTE(review): this treats `create_rank` values as positions in
                // 0..num_ranks (and indexes `proc_names` with them). If `procs`
                // can be a slice of a larger mesh whose create ranks are not
                // 0-based/contiguous, the assert below could fire and names
                // could be mismatched — confirm the invariant with callers.
                let all_ranks = (0..num_ranks).collect::<HashSet<_>>();
                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
                assert_eq!(leftover_ranks.len(), num_ranks - states.len());
                while states.len() < num_ranks {
                    let rank = *leftover_ranks
                        .pop()
                        .expect("leftover ranks should not be empty");
                    states.push((
                        rank,
                        resource::State {
                            name: proc_names[rank].clone(),
                            status: resource::Status::Timeout(timeout),
                            state: None,
                        },
                    ));
                }
                break;
            }
        }
        // Order by rank so that values line up with `region`.
        states.sort_by_key(|(rank, _)| *rank);
        let vm = states
            .into_iter()
            .map(|(_, state)| state)
            .collect_mesh::<ValueMesh<_>>(region)?;
        Ok(vm)
    }
1253}
1254
1255impl view::Ranked for HostMeshRef {
1256 type Item = HostRef;
1257
1258 fn region(&self) -> &Region {
1259 &self.region
1260 }
1261
1262 fn get(&self, rank: usize) -> Option<&Self::Item> {
1263 self.ranks.get(rank)
1264 }
1265}
1266
1267impl view::RankedSliceable for HostMeshRef {
1268 fn sliced(&self, region: Region) -> Self {
1269 let ranks = self
1270 .region()
1271 .remap(®ion)
1272 .unwrap()
1273 .map(|index| self.get(index).unwrap().clone());
1274 Self::new(self.name.clone(), region, ranks.collect()).unwrap()
1275 }
1276}
1277
1278impl std::fmt::Display for HostMeshRef {
1279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1280 write!(f, "{}:", self.name)?;
1281 for (rank, host) in self.ranks.iter().enumerate() {
1282 if rank > 0 {
1283 write!(f, ",")?;
1284 }
1285 write!(f, "{}", host)?;
1286 }
1287 write!(f, "@{}", self.region)
1288 }
1289}
1290
/// Errors returned when parsing a `HostMeshRef` from its string form
/// `name:host0,host1,...@region`.
#[derive(thiserror::Error, Debug)]
pub enum HostMeshRefParseError {
    /// The part after `@` is not a valid region.
    #[error(transparent)]
    RegionParseError(#[from] RegionParseError),

    /// The input contains no `@` separating hosts from the region.
    #[error("invalid host mesh ref: missing region")]
    MissingRegion,

    /// The input contains no `:` separating the name from the hosts.
    #[error("invalid host mesh ref: missing name")]
    MissingName,

    /// The part before the first `:` is not a valid `Name`.
    #[error(transparent)]
    InvalidName(#[from] v1::NameParseError),

    /// The parts parsed but do not form a valid reference (e.g. the host
    /// count does not match the region).
    #[error(transparent)]
    InvalidHostMeshRef(#[from] Box<v1::Error>),

    /// Any other error converted via `anyhow` (presumably including host
    /// address parse errors — confirm against `ChannelAddr::Err`).
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1312
1313impl From<v1::Error> for HostMeshRefParseError {
1314 fn from(err: v1::Error) -> Self {
1315 Self::InvalidHostMeshRef(Box::new(err))
1316 }
1317}
1318
1319impl FromStr for HostMeshRef {
1320 type Err = HostMeshRefParseError;
1321
1322 fn from_str(s: &str) -> Result<Self, Self::Err> {
1323 let (name, rest) = s
1324 .split_once(':')
1325 .ok_or(HostMeshRefParseError::MissingName)?;
1326
1327 let name = Name::from_str(name)?;
1328
1329 let (hosts, region) = rest
1330 .split_once('@')
1331 .ok_or(HostMeshRefParseError::MissingRegion)?;
1332 let hosts = hosts
1333 .split(',')
1334 .map(|host| host.trim())
1335 .map(|host| host.parse::<HostRef>())
1336 .collect::<Result<Vec<_>, _>>()?;
1337 let region = region.parse()?;
1338 Ok(HostMeshRef::new(name, region, hosts)?)
1339 }
1340}
1341
1342#[cfg(test)]
1343mod tests {
1344 use std::assert_matches::assert_matches;
1345 use std::collections::HashSet;
1346 use std::collections::VecDeque;
1347
1348 use hyperactor::context::Mailbox as _;
1349 use hyperactor_config::attrs::Attrs;
1350 use itertools::Itertools;
1351 use ndslice::ViewExt;
1352 use ndslice::extent;
1353 use tokio::process::Command;
1354
1355 use super::*;
1356 use crate::Bootstrap;
1357 use crate::bootstrap::MESH_TAIL_LOG_LINES;
1358 use crate::resource::Status;
1359 use crate::v1::ActorMesh;
1360 use crate::v1::testactor;
1361 use crate::v1::testactor::GetConfigAttrs;
1362 use crate::v1::testactor::SetConfigAttrs;
1363 use crate::v1::testing;
1364
1365 #[test]
1366 fn test_host_mesh_subset() {
1367 let hosts: HostMeshRef = "test:local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
1368 .parse()
1369 .unwrap();
1370 assert_eq!(
1371 hosts.range("replica", 1).unwrap().to_string(),
1372 "test:local:3,local:4@2+replica=1/2,host=2/1"
1373 );
1374 }
1375
1376 #[test]
1377 fn test_host_mesh_ref_parse_roundtrip() {
1378 let host_mesh_ref = HostMeshRef::new(
1379 Name::new("test").unwrap(),
1380 extent!(replica = 2, host = 2).into(),
1381 vec![
1382 "tcp:127.0.0.1:123".parse().unwrap(),
1383 "tcp:127.0.0.1:123".parse().unwrap(),
1384 "tcp:127.0.0.1:123".parse().unwrap(),
1385 "tcp:127.0.0.1:123".parse().unwrap(),
1386 ],
1387 )
1388 .unwrap();
1389
1390 assert_eq!(
1391 host_mesh_ref.to_string().parse::<HostMeshRef>().unwrap(),
1392 host_mesh_ref
1393 );
1394 }
1395
1396 #[tokio::test]
1397 #[cfg(fbcode_build)]
1398 async fn test_allocate() {
1399 let config = hyperactor_config::global::lock();
1400 let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
1401
1402 let instance = testing::instance();
1403
1404 for alloc in testing::allocs(extent!(replicas = 4)).await {
1405 let mut host_mesh = HostMesh::allocate(instance, alloc, "test", None)
1406 .await
1407 .unwrap();
1408
1409 let proc_mesh1 = host_mesh
1410 .spawn(instance, "test_1", Extent::unity())
1411 .await
1412 .unwrap();
1413
1414 let actor_mesh1: ActorMesh<testactor::TestActor> =
1415 proc_mesh1.spawn(instance, "test", &()).await.unwrap();
1416
1417 let proc_mesh2 = host_mesh
1418 .spawn(instance, "test_2", extent!(gpus = 3, extra = 2))
1419 .await
1420 .unwrap();
1421 assert_eq!(
1422 proc_mesh2.extent(),
1423 extent!(replicas = 4, gpus = 3, extra = 2)
1424 );
1425 assert_eq!(proc_mesh2.values().count(), 24);
1426
1427 let actor_mesh2: ActorMesh<testactor::TestActor> =
1428 proc_mesh2.spawn(instance, "test", &()).await.unwrap();
1429 assert_eq!(
1430 actor_mesh2.extent(),
1431 extent!(replicas = 4, gpus = 3, extra = 2)
1432 );
1433 assert_eq!(actor_mesh2.values().count(), 24);
1434
1435 let host_mesh_ref: HostMeshRef = host_mesh.clone();
1437 assert_eq!(
1439 host_mesh_ref.iter().collect::<Vec<_>>(),
1440 host_mesh.iter().collect::<Vec<_>>(),
1441 );
1442
1443 for actor_mesh in [&actor_mesh1, &actor_mesh2] {
1445 let (port, mut rx) = instance.mailbox().open_port();
1446 actor_mesh
1447 .cast(instance, testactor::GetActorId(port.bind()))
1448 .unwrap();
1449
1450 let mut expected_actor_ids: HashSet<_> = actor_mesh
1451 .values()
1452 .map(|actor_ref| actor_ref.actor_id().clone())
1453 .collect();
1454
1455 while !expected_actor_ids.is_empty() {
1456 let actor_id = rx.recv().await.unwrap();
1457 assert!(
1458 expected_actor_ids.remove(&actor_id),
1459 "got {actor_id}, expect {expected_actor_ids:?}"
1460 );
1461 }
1462 }
1463
1464 let mut to_visit: VecDeque<_> = actor_mesh1
1468 .values()
1469 .chain(actor_mesh2.values())
1470 .map(|actor_ref| actor_ref.port())
1471 .permutations(2)
1473 .flatten()
1475 .collect();
1476
1477 let expect_visited: Vec<_> = to_visit.clone().into();
1478
1479 let (last, mut last_rx) = instance.mailbox().open_port();
1481 to_visit.push_back(last.bind());
1482
1483 let forward = testactor::Forward {
1484 to_visit,
1485 visited: Vec::new(),
1486 };
1487 let first = forward.to_visit.front().unwrap().clone();
1488 first.send(instance, forward).unwrap();
1489
1490 let forward = last_rx.recv().await.unwrap();
1491 assert_eq!(forward.visited, expect_visited);
1492
1493 let _ = host_mesh.shutdown(&instance).await;
1494 }
1495 }
1496
1497 fn free_localhost_addr() -> ChannelAddr {
1503 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1504 ChannelAddr::Tcp(listener.local_addr().unwrap())
1505 }
1506
1507 #[tokio::test]
1508 #[cfg(fbcode_build)]
1509 async fn test_extrinsic_allocation() {
1510 let config = hyperactor_config::global::lock();
1511 let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
1512
1513 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1514
1515 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
1516
1517 let mut children = Vec::new();
1518 for host in hosts.iter() {
1519 let mut cmd = Command::new(program.clone());
1520 let boot = Bootstrap::Host {
1521 addr: host.clone(),
1522 command: None, config: None,
1524 };
1525 boot.to_env(&mut cmd);
1526 cmd.kill_on_drop(true);
1527 children.push(cmd.spawn().unwrap());
1528 }
1529
1530 let instance = testing::instance();
1531 let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);
1532
1533 let proc_mesh = host_mesh
1534 .spawn(&testing::instance(), "test", Extent::unity())
1535 .await
1536 .unwrap();
1537
1538 let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
1539 .spawn(&testing::instance(), "test", &())
1540 .await
1541 .unwrap();
1542
1543 testactor::assert_mesh_shape(actor_mesh).await;
1544
1545 HostMesh::take(host_mesh)
1546 .shutdown(&instance)
1547 .await
1548 .expect("hosts shutdown");
1549 }
1550
1551 #[tokio::test]
1552 #[cfg(fbcode_build)]
1553 async fn test_failing_proc_allocation() {
1554 let lock = hyperactor_config::global::lock();
1555 let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);
1556
1557 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1558
1559 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
1560
1561 let mut children = Vec::new();
1562 for host in hosts.iter() {
1563 let mut cmd = Command::new(program.clone());
1564 let boot = Bootstrap::Host {
1565 addr: host.clone(),
1566 config: None,
1567 command: Some(BootstrapCommand::from("false")),
1569 };
1570 boot.to_env(&mut cmd);
1571 cmd.kill_on_drop(true);
1572 children.push(cmd.spawn().unwrap());
1573 }
1574 let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);
1575
1576 let instance = testing::instance();
1577
1578 let err = host_mesh
1579 .spawn(&instance, "test", Extent::unity())
1580 .await
1581 .unwrap_err();
1582 assert_matches!(
1583 err,
1584 v1::Error::ProcCreationError { state, .. }
1585 if matches!(state.status, resource::Status::Failed(ref msg) if msg.contains("failed to configure process: Ready(Terminal(Stopped { exit_code: 1"))
1586 );
1587 }
1588
1589 #[tokio::test]
1590 #[cfg(fbcode_build)]
1591 async fn test_halting_proc_allocation() {
1592 let config = hyperactor_config::global::lock();
1593 let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(20));
1594
1595 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1596
1597 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
1598
1599 let mut children = Vec::new();
1600
1601 for (index, host) in hosts.iter().enumerate() {
1602 let mut cmd = Command::new(program.clone());
1603 let command = if index == 0 {
1604 let mut command = BootstrapCommand::from("sleep");
1605 command.args.push("60".to_string());
1606 Some(command)
1607 } else {
1608 None
1609 };
1610 let boot = Bootstrap::Host {
1611 addr: host.clone(),
1612 config: None,
1613 command,
1614 };
1615 boot.to_env(&mut cmd);
1616 cmd.kill_on_drop(true);
1617 children.push(cmd.spawn().unwrap());
1618 }
1619 let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);
1620
1621 let instance = testing::instance();
1622
1623 let err = host_mesh
1624 .spawn(&instance, "test", Extent::unity())
1625 .await
1626 .unwrap_err();
1627 let statuses = err.into_proc_spawn_error().unwrap();
1628 assert_matches!(
1629 &statuses.materialized_iter(2).cloned().collect::<Vec<_>>()[..],
1630 &[Status::Timeout(_), Status::Running]
1631 );
1632 }
1633
1634 #[tokio::test]
1635 #[cfg(fbcode_build)]
1636 async fn test_client_config_override() {
1637 let config = hyperactor_config::global::lock();
1638 let _guard1 = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
1639 let _guard2 = config.override_key(
1640 hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1641 Duration::from_mins(2),
1642 );
1643 let _guard3 = config.override_key(
1644 hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
1645 Duration::from_mins(1),
1646 );
1647
1648 unsafe {
1653 std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT");
1654 std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT");
1655 }
1656
1657 let instance = testing::instance();
1658
1659 let proc_meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await;
1660 let proc_mesh = proc_meshes.get(1).unwrap();
1661
1662 let actor_mesh: ActorMesh<testactor::TestActor> =
1663 proc_mesh.spawn(instance, "test", &()).await.unwrap();
1664
1665 let mut attrs_override = Attrs::new();
1666 attrs_override.set(
1667 hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1668 Duration::from_mins(3),
1669 );
1670 actor_mesh
1671 .cast(
1672 instance,
1673 SetConfigAttrs(bincode::serialize(&attrs_override).unwrap()),
1674 )
1675 .unwrap();
1676
1677 let (tx, mut rx) = instance.open_port();
1678 actor_mesh
1679 .cast(instance, GetConfigAttrs(tx.bind()))
1680 .unwrap();
1681 let actual_attrs = rx.recv().await.unwrap();
1682 let actual_attrs = bincode::deserialize::<Attrs>(&actual_attrs).unwrap();
1683
1684 assert_eq!(
1685 *actual_attrs
1686 .get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT)
1687 .unwrap(),
1688 Duration::from_mins(3)
1689 );
1690 assert_eq!(
1691 *actual_attrs
1692 .get(hyperactor::config::MESSAGE_DELIVERY_TIMEOUT)
1693 .unwrap(),
1694 Duration::from_mins(1)
1695 );
1696 }
1697}