1use hyperactor::Actor;
10use hyperactor::Handler;
11use hyperactor::accum::StreamingReducerOpts;
12use hyperactor::channel::ChannelTransport;
13use hyperactor::host::Host;
14use hyperactor::host::LocalProcManager;
15use hyperactor::host::SERVICE_PROC_NAME;
16use hyperactor_config::CONFIG;
17use hyperactor_config::ConfigAttr;
18use hyperactor_config::attrs::declare_attrs;
19use ndslice::view::CollectMeshExt;
20
21use crate::supervision::MeshFailure;
22
23pub mod host_agent;
24
25use std::collections::HashSet;
26use std::hash::Hash;
27use std::ops::Deref;
28use std::ops::DerefMut;
29use std::str::FromStr;
30use std::sync::Arc;
31use std::time::Duration;
32
33use hyperactor::channel::ChannelAddr;
34use hyperactor::context;
35use hyperactor::reference as hyperactor_reference;
36use ndslice::Extent;
37use ndslice::Region;
38use ndslice::ViewExt;
39use ndslice::extent;
40use ndslice::view;
41use ndslice::view::Ranked;
42use ndslice::view::RegionParseError;
43use serde::Deserialize;
44use serde::Serialize;
45use typeuri::Named;
46
47use crate::Bootstrap;
48use crate::Name;
49use crate::ProcMesh;
50use crate::ProcMeshRef;
51use crate::ValueMesh;
52use crate::alloc::Alloc;
53use crate::bootstrap::BootstrapCommand;
54use crate::bootstrap::BootstrapProcManager;
55use crate::bootstrap::ProcBind;
56use crate::host_mesh::host_agent::DrainHostClient;
57pub use crate::host_mesh::host_agent::HostAgent;
58use crate::host_mesh::host_agent::HostAgentMode;
59use crate::host_mesh::host_agent::HostMeshAgentProcMeshTrampoline;
60use crate::host_mesh::host_agent::ProcManagerSpawnFn;
61use crate::host_mesh::host_agent::ProcState;
62use crate::host_mesh::host_agent::SetClientConfigClient;
63use crate::host_mesh::host_agent::ShutdownHostClient;
64use crate::mesh_admin::MeshAdminMessageClient;
65use crate::mesh_controller::HostMeshController;
66use crate::mesh_controller::ProcMeshController;
67use crate::proc_agent::ProcAgent;
68use crate::proc_mesh::ProcRef;
69use crate::resource;
70use crate::resource::CreateOrUpdateClient;
71use crate::resource::GetRankStatus;
72use crate::resource::GetRankStatusClient;
73use crate::resource::RankedValues;
74use crate::resource::Status;
75use crate::resource::WaitRankStatusClient;
76use crate::transport::DEFAULT_TRANSPORT;
77
/// Actor-name prefix for per-mesh `HostMeshController` actors; the mesh's
/// name is appended when the controller is spawned.
pub const HOST_MESH_CONTROLLER_NAME: &str = "host_mesh_controller";

/// Actor-name prefix for per-mesh `ProcMeshController` actors; the mesh's
/// name is appended when the controller is spawned.
pub const PROC_MESH_CONTROLLER_NAME: &str = "proc_mesh_controller";
83
// Configurable idle timeouts used by host-mesh proc lifecycle operations.
// Each attr may be overridden through the environment variable / config key
// named in its `ConfigAttr` metadata. (Plain `//` comments are used here so
// no tokens are added to the macro input.)
declare_attrs! {
    // Maximum idle time tolerated while waiting for procs to spawn.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()),
        Some("mesh_proc_spawn_max_idle".to_string()),
    ))
    pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    // Maximum idle time tolerated while waiting for procs to stop.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()),
        Some("proc_stop_max_idle".to_string()),
    ))
    pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30);

    // Maximum idle time tolerated while querying proc state.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()),
        Some("get_proc_state_max_idle".to_string()),
    ))
    pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1);
}
109
/// A reference to a single host, identified by the `ChannelAddr` it is
/// reachable at. Cheap to clone, hashable, and serializable for the wire.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(ChannelAddr);
wirevalue::register_type!(HostRef);
114
115impl HostRef {
116 fn mesh_agent(&self) -> hyperactor_reference::ActorRef<HostAgent> {
118 hyperactor_reference::ActorRef::attest(
119 self.service_proc()
120 .actor_id(host_agent::HOST_MESH_AGENT_ACTOR_NAME, 0),
121 )
122 }
123
124 fn named_proc(&self, name: &Name) -> hyperactor_reference::ProcId {
126 hyperactor_reference::ProcId::with_name(self.0.clone(), name.to_string())
127 }
128
129 fn service_proc(&self) -> hyperactor_reference::ProcId {
131 hyperactor_reference::ProcId::with_name(self.0.clone(), SERVICE_PROC_NAME)
132 }
133
134 pub(crate) async fn shutdown(
153 &self,
154 cx: &impl hyperactor::context::Actor,
155 ) -> anyhow::Result<()> {
156 let agent = self.mesh_agent();
157 let terminate_timeout =
158 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
159 let max_in_flight =
160 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
161 agent
162 .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
163 .await?;
164 Ok(())
165 }
166
167 pub(crate) async fn drain(
171 &self,
172 cx: &impl hyperactor::context::Actor,
173 host_mesh_name: Option<Name>,
174 ) -> anyhow::Result<()> {
175 let agent = self.mesh_agent();
176 let terminate_timeout =
177 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
178 let max_in_flight =
179 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
180 agent
181 .drain_host(
182 cx,
183 terminate_timeout,
184 max_in_flight.clamp(1, 256),
185 host_mesh_name,
186 )
187 .await?;
188 Ok(())
189 }
190}
191
192impl TryFrom<hyperactor_reference::ActorRef<HostAgent>> for HostRef {
193 type Error = crate::Error;
194
195 fn try_from(value: hyperactor_reference::ActorRef<HostAgent>) -> Result<Self, crate::Error> {
196 let proc_id = value.actor_id().proc_id();
197 Ok(HostRef(proc_id.addr().clone()))
198 }
199}
200
201impl std::fmt::Display for HostRef {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 self.0.fmt(f)
204 }
205}
206
207impl FromStr for HostRef {
208 type Err = <ChannelAddr as FromStr>::Err;
209
210 fn from_str(s: &str) -> Result<Self, Self::Err> {
211 Ok(HostRef(ChannelAddr::from_str(s)?))
212 }
213}
214
/// An owned host mesh: a named, shaped collection of hosts together with the
/// allocation that backs them.
pub struct HostMesh {
    // Mesh name; also used to derive controller actor names and telemetry ids.
    name: Name,
    // Shape of the mesh.
    extent: Extent,
    // How the hosts were obtained (and therefore how they are torn down).
    allocation: HostMeshAllocation,
    // The reference view of this mesh, exposed via `Deref`/`AsRef`.
    current_ref: HostMeshRef,
}
231
// How a `HostMesh`'s hosts were allocated; consulted during shutdown and in
// the drop guard's best-effort cleanup.
#[allow(dead_code)]
enum HostMeshAllocation {
    // Hosts booted on top of an underlying `ProcMesh` (via the trampoline in
    // `HostMesh::allocate`); the proc mesh is stopped on shutdown.
    ProcMesh {
        proc_mesh: ProcMesh,
        proc_mesh_ref: ProcMeshRef,
        hosts: Vec<HostRef>,
    },
    // Hosts owned directly (e.g. produced by `HostMesh::take`).
    Owned { hosts: Vec<HostRef> },
}
271
272impl HostMesh {
273 fn notify_created(&self) {
275 let name_str = self.name.to_string();
276 let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
277
278 hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
279 id: mesh_id_hash,
280 timestamp: std::time::SystemTime::now(),
281 class: "Host".to_string(),
282 given_name: self.name.name().to_string(),
283 full_name: name_str,
284 shape_json: serde_json::to_string(&self.extent).unwrap_or_default(),
285 parent_mesh_id: None,
286 parent_view_json: None,
287 });
288
289 let now = std::time::SystemTime::now();
292 for (rank, host) in self.current_ref.hosts().iter().enumerate() {
293 let actor = host.mesh_agent();
294 hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
295 id: hyperactor_telemetry::hash_to_u64(actor.actor_id()),
296 timestamp: now,
297 mesh_id: mesh_id_hash,
298 rank: rank as u64,
299 full_name: actor.actor_id().to_string(),
300 display_name: None,
301 });
302 }
303 }
304
305 pub async fn local() -> crate::Result<HostMesh> {
328 Self::local_with_bootstrap(BootstrapCommand::current()?).await
329 }
330
331 pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> crate::Result<HostMesh> {
339 if let Ok(Some(boot)) = Bootstrap::get_from_env() {
340 let result = boot.bootstrap().await;
341 if let Err(err) = result {
342 tracing::error!("failed to bootstrap local host mesh process: {}", err);
343 }
344 std::process::exit(1);
345 }
346
347 let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
348
349 let manager = BootstrapProcManager::new(bootstrap_cmd)?;
350 let host = Host::new(manager, addr).await?;
351 let addr = host.addr().clone();
352 let system_proc = host.system_proc().clone();
353 let host_mesh_agent = system_proc
354 .spawn(
355 "host_agent",
356 HostAgent::new(HostAgentMode::Process {
357 host,
358 shutdown_tx: None,
359 }),
360 )
361 .map_err(crate::Error::SingletonActorSpawnError)?;
362 host_mesh_agent.bind::<HostAgent>();
363
364 let host = HostRef(addr);
365 let host_mesh_ref = HostMeshRef::new(
366 Name::new("local").unwrap(),
367 extent!(hosts = 1).into(),
368 vec![host],
369 )?;
370 Ok(HostMesh::take(host_mesh_ref))
371 }
372
373 pub async fn local_in_process() -> crate::Result<HostMesh> {
383 let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
384 Ok(HostMesh::take(Self::local_n_in_process(vec![addr]).await?))
385 }
386
387 pub(crate) async fn local_n_in_process(addrs: Vec<ChannelAddr>) -> crate::Result<HostMeshRef> {
396 let n = addrs.len();
397 let mut host_refs = Vec::with_capacity(n);
398 for addr in addrs {
399 host_refs.push(Self::create_in_process_host(addr).await?);
400 }
401 HostMeshRef::new(
402 Name::new("local").unwrap(),
403 extent!(hosts = n).into(),
404 host_refs,
405 )
406 }
407
408 async fn create_in_process_host(addr: ChannelAddr) -> crate::Result<HostRef> {
411 let spawn: ProcManagerSpawnFn =
412 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
413 let manager = LocalProcManager::new(spawn);
414 let host = Host::new(manager, addr).await?;
415 let addr = host.addr().clone();
416 let system_proc = host.system_proc().clone();
417 let host_mesh_agent = system_proc
418 .spawn(
419 host_agent::HOST_MESH_AGENT_ACTOR_NAME,
420 HostAgent::new(HostAgentMode::Local(host)),
421 )
422 .map_err(crate::Error::SingletonActorSpawnError)?;
423 host_mesh_agent.bind::<HostAgent>();
424 Ok(HostRef(addr))
425 }
426
427 pub async fn process(extent: Extent, command: BootstrapCommand) -> crate::Result<HostMesh> {
438 if let Ok(Some(boot)) = Bootstrap::get_from_env() {
439 let result = boot.bootstrap().await;
440 if let Err(err) = result {
441 tracing::error!("failed to bootstrap process host mesh process: {}", err);
442 }
443 std::process::exit(1);
444 }
445
446 let bind_spec = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT);
447 let mut hosts = Vec::with_capacity(extent.num_ranks());
448 for _ in 0..extent.num_ranks() {
449 let addr = bind_spec.binding_addr();
451 let bootstrap = Bootstrap::Host {
452 addr: addr.clone(),
453 command: Some(command.clone()),
454 config: Some(hyperactor_config::global::attrs()),
455 exit_on_shutdown: false,
456 };
457
458 let mut cmd = command.new();
459 bootstrap.to_env(&mut cmd);
460 cmd.spawn()?;
461 hosts.push(HostRef(addr));
462 }
463
464 let host_mesh_ref = HostMeshRef::new(Name::new("process").unwrap(), extent.into(), hosts)?;
465 Ok(HostMesh::take(host_mesh_ref))
466 }
467
468 pub async fn allocate<C: context::Actor>(
512 cx: &C,
513 alloc: Box<dyn Alloc + Send + Sync>,
514 name: &str,
515 bootstrap_params: Option<BootstrapCommand>,
516 ) -> crate::Result<Self>
517 where
518 C::A: Handler<MeshFailure>,
519 {
520 Self::allocate_inner(cx, alloc, Name::new(name)?, bootstrap_params).await
521 }
522
523 #[hyperactor::instrument(fields(host_mesh=name.to_string()))]
525 async fn allocate_inner<C: context::Actor>(
526 cx: &C,
527 alloc: Box<dyn Alloc + Send + Sync>,
528 name: Name,
529 bootstrap_params: Option<BootstrapCommand>,
530 ) -> crate::Result<Self>
531 where
532 C::A: Handler<MeshFailure>,
533 {
534 tracing::info!(name = "HostMeshStatus", status = "Allocate::Attempt");
535 let transport = alloc.transport();
536 let extent = alloc.extent().clone();
537 let is_local = alloc.is_local();
538 let proc_mesh = ProcMesh::allocate(cx, alloc, name.name()).await?;
539
540 let (mesh_agents, mut mesh_agents_rx) = cx.mailbox().open_port();
545 let trampoline_name = Name::new("host_mesh_trampoline").unwrap();
546 let _trampoline_actor_mesh = proc_mesh
547 .spawn_with_name::<HostMeshAgentProcMeshTrampoline, C>(
548 cx,
549 trampoline_name,
550 &(transport, mesh_agents.bind(), bootstrap_params, is_local),
551 None,
552 true,
554 )
555 .await?;
556
557 let mut hosts = Vec::new();
559 for _rank in 0..extent.num_ranks() {
560 let mesh_agent = mesh_agents_rx.recv().await?;
561
562 let addr = mesh_agent.actor_id().proc_id().addr().clone();
563
564 let host_ref = HostRef(addr);
565 if host_ref.mesh_agent() != mesh_agent {
566 return Err(crate::Error::HostMeshAgentConfigurationError(
567 mesh_agent.actor_id().clone(),
568 format!(
569 "expected mesh agent actor id to be {}",
570 host_ref.mesh_agent().actor_id()
571 ),
572 ));
573 }
574 hosts.push(host_ref);
575 }
576
577 let proc_mesh_ref = proc_mesh.clone();
578 let mesh = Self {
579 name: name.clone(),
580 extent: extent.clone(),
581 allocation: HostMeshAllocation::ProcMesh {
582 proc_mesh,
583 proc_mesh_ref,
584 hosts: hosts.clone(),
585 },
586 current_ref: HostMeshRef::new(name, extent.into(), hosts).unwrap(),
587 };
588
589 let controller = HostMeshController::new(mesh.deref().clone());
592 let controller_name = format!("{}_{}", HOST_MESH_CONTROLLER_NAME, mesh.name());
595 let controller_handle = controller
596 .spawn_with_name(cx, &controller_name)
597 .map_err(|e| crate::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
598 let _: hyperactor::reference::ActorRef<HostMeshController> = controller_handle.bind();
603
604 tracing::info!(name = "HostMeshStatus", status = "Allocate::Created");
605
606 mesh.notify_created();
607
608 Ok(mesh)
609 }
610
611 pub fn take(mesh: HostMeshRef) -> Self {
618 let region = mesh.region().clone();
619 let hosts: Vec<HostRef> = mesh.values().collect();
620
621 let current_ref = HostMeshRef::new(mesh.name.clone(), region.clone(), hosts.clone())
622 .expect("region/hosts cardinality must match");
623
624 let result = Self {
625 name: mesh.name,
626 extent: region.extent().clone(),
627 allocation: HostMeshAllocation::Owned { hosts },
628 current_ref,
629 };
630 result.notify_created();
631 result
632 }
633
634 pub async fn attach(
645 cx: &impl context::Actor,
646 name: Name,
647 addresses: Vec<ChannelAddr>,
648 ) -> crate::Result<Self> {
649 let mesh_ref = HostMeshRef::from_hosts(name, addresses);
650 let config = hyperactor_config::global::propagatable_attrs();
651 mesh_ref.push_config(cx, config).await;
652 Ok(Self::take(mesh_ref))
653 }
654
655 #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
666 pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
667 let t0 = std::time::Instant::now();
668 tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
669
670 let results = futures::future::join_all(
673 self.current_ref
674 .values()
675 .map(|host| async move { host.drain(cx, None).await }),
676 )
677 .await;
678 let phase1_ms = t0.elapsed().as_millis();
679 for result in &results {
680 if let Err(e) = result {
681 tracing::warn!(
682 name = "HostMeshStatus",
683 status = "Shutdown::Drain::Failed",
684 error = %e,
685 "drain failed on a host"
686 );
687 }
688 }
689
690 let t1 = std::time::Instant::now();
692 let results = futures::future::join_all(self.current_ref.values().map(|host| async move {
693 let result = host.shutdown(cx).await;
694 (host, result)
695 }))
696 .await;
697 let phase2_ms = t1.elapsed().as_millis();
698 let total_ms = t0.elapsed().as_millis();
699 let mut failed_hosts = vec![];
700 for (host, result) in &results {
701 if let Err(e) = result {
702 tracing::warn!(
703 name = "HostMeshStatus",
704 status = "Shutdown::Host::Failed",
705 host = %host,
706 error = %e,
707 "host shutdown failed"
708 );
709 failed_hosts.push(host);
710 }
711 }
712 if failed_hosts.is_empty() {
713 tracing::info!(
714 name = "HostMeshStatus",
715 status = "Shutdown::Success",
716 phase1_ms,
717 phase2_ms,
718 total_ms,
719 );
720 } else {
721 tracing::error!(
722 name = "HostMeshStatus",
723 status = "Shutdown::Failed",
724 phase1_ms,
725 phase2_ms,
726 total_ms,
727 "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
728 failed_hosts
729 );
730 }
731
732 match &mut self.allocation {
733 HostMeshAllocation::ProcMesh { proc_mesh, .. } => {
734 proc_mesh.stop(cx, "host mesh shutdown".to_string()).await?;
735 }
736 HostMeshAllocation::Owned { .. } => {}
737 }
738 Ok(())
739 }
740
741 pub fn shutdown_guard(self) -> HostMeshShutdownGuard {
744 HostMeshShutdownGuard(self)
745 }
746
747 #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
754 pub async fn stop(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
755 let t0 = std::time::Instant::now();
756 tracing::info!(name = "HostMeshStatus", status = "Stop::Attempt");
757
758 let mesh_name = self.name.clone();
759 let results = futures::future::join_all(self.current_ref.values().map(|host| {
760 let mesh_name = Some(mesh_name.clone());
761 async move { host.drain(cx, mesh_name).await }
762 }))
763 .await;
764 let total_ms = t0.elapsed().as_millis();
765 let mut failed_hosts = vec![];
766 for (i, result) in results.iter().enumerate() {
767 if let Err(e) = result {
768 tracing::warn!(
769 name = "HostMeshStatus",
770 status = "Stop::Drain::Failed",
771 error = %e,
772 "drain failed on a host"
773 );
774 failed_hosts.push(i);
775 }
776 }
777 if failed_hosts.is_empty() {
778 tracing::info!(name = "HostMeshStatus", status = "Stop::Success", total_ms,);
779 } else {
780 tracing::error!(
781 name = "HostMeshStatus",
782 status = "Stop::Failed",
783 total_ms,
784 "host mesh stop failed; check the logs of the failed hosts for details: {:?}",
785 failed_hosts
786 );
787 }
788
789 self.allocation = HostMeshAllocation::Owned { hosts: vec![] };
793
794 Ok(())
795 }
796}
797
798impl HostMesh {
799 pub fn set_bootstrap(&mut self, cmd: BootstrapCommand) {
804 self.current_ref = self.current_ref.clone().with_bootstrap(cmd);
805 }
806}
807
/// A `HostMesh` dereferences to its current `HostMeshRef` view.
impl Deref for HostMesh {
    type Target = HostMeshRef;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
815
816impl AsRef<HostMeshRef> for HostMesh {
817 fn as_ref(&self) -> &HostMeshRef {
818 self
819 }
820}
821
/// Identity `AsRef`, so APIs can accept `impl AsRef<HostMeshRef>` uniformly
/// for both `HostMesh` and `HostMeshRef`.
impl AsRef<HostMeshRef> for HostMeshRef {
    fn as_ref(&self) -> &HostMeshRef {
        self
    }
}
827
828pub struct HostMeshShutdownGuard(pub HostMesh);
830
/// The guard transparently dereferences to the wrapped `HostMesh`.
impl Deref for HostMeshShutdownGuard {
    type Target = HostMesh;

    fn deref(&self) -> &HostMesh {
        &self.0
    }
}
838
/// Mutable access to the wrapped `HostMesh`.
impl DerefMut for HostMeshShutdownGuard {
    fn deref_mut(&mut self) -> &mut HostMesh {
        &mut self.0
    }
}
844
impl Drop for HostMeshShutdownGuard {
    // Best-effort cleanup: when the guard is dropped, try to shut down every
    // host in the wrapped mesh from a freshly constructed ephemeral proc.
    // All failures are logged and swallowed — `drop` cannot propagate errors.
    fn drop(&mut self) {
        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.0.name,
            status = "Dropping",
        );
        // Snapshot the hosts regardless of how they were allocated.
        let hosts: Vec<HostRef> = match &self.0.allocation {
            HostMeshAllocation::ProcMesh { hosts, .. } | HostMeshAllocation::Owned { hosts } => {
                hosts.clone()
            }
        };

        // Shutdown is async, so it can only be attempted when a Tokio runtime
        // is still available to spawn the cleanup task on.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let mesh_name = self.0.name.clone();
            let allocation_label = match &self.0.allocation {
                HostMeshAllocation::ProcMesh { .. } => "proc_mesh",
                HostMeshAllocation::Owned { .. } => "owned",
            }
            .to_string();

            handle.spawn(async move {
                let span = tracing::info_span!(
                    "hostmesh_drop_cleanup",
                    host_mesh = %mesh_name,
                    allocation = %allocation_label,
                    hosts = hosts.len(),
                );
                let _g = span.enter();

                // An ephemeral proc + instance provides the actor context
                // needed to message each host's agent during cleanup.
                match hyperactor::Proc::direct(
                    ChannelTransport::Unix.any(),
                    "hostmesh-drop".to_string(),
                )
                {
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            "failed to construct ephemeral Proc for drop-cleanup; \
                            relying on PDEATHSIG/manager Drop"
                        );
                    }
                    Ok(proc) => {
                        match proc.instance("drop") {
                            Err(e) => {
                                tracing::warn!(
                                    error = %e,
                                    "failed to create ephemeral instance for drop-cleanup; \
                                    relying on PDEATHSIG/manager Drop"
                                );
                            }
                            Ok((instance, _guard)) => {
                                // Shut hosts down sequentially, tallying
                                // outcomes for the summary log line.
                                let mut attempted = 0usize;
                                let mut ok = 0usize;
                                let mut err = 0usize;

                                for host in hosts {
                                    attempted += 1;
                                    tracing::debug!(host = %host, "drop-cleanup: shutdown start");
                                    match host.shutdown(&instance).await {
                                        Ok(()) => {
                                            ok += 1;
                                            tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
                                        }
                                        Err(e) => {
                                            err += 1;
                                            tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
                                        }
                                    }
                                }

                                tracing::info!(
                                    attempted, ok, err,
                                    "hostmesh drop-cleanup summary"
                                );
                            }
                        }
                    }
                }
            });
        } else {
            // No runtime: nothing async can run, so just warn loudly.
            tracing::warn!(
                host_mesh = %self.0.name,
                hosts = hosts.len(),
                "HostMesh dropped without a Tokio runtime; skipping \
                best-effort shutdown. This indicates that .shutdown() \
                on this mesh has not been called before program exit \
                (perhaps due to a missing call to \
                'monarch.actor.shutdown_context()'?) This in turn can \
                lead to backtrace output due to folly SIGTERM \
                handlers."
            );
        }

        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.0.name,
            status = "Dropped",
        );
    }
}
969
970pub(crate) fn mesh_to_rankedvalues_with_default<T, F>(
979 mesh: &ValueMesh<T>,
980 default: T,
981 is_sentinel: F,
982 len: usize,
983) -> RankedValues<T>
984where
985 T: Eq + Clone + 'static,
986 F: Fn(&T) -> bool,
987{
988 let mut out = RankedValues::from((0..len, default));
989 for (i, s) in mesh.values().enumerate() {
990 if !is_sentinel(&s) {
991 out.merge_from(RankedValues::from((i..i + 1, s)));
992 }
993 }
994 out
995}
996
/// A serializable reference to a host mesh: a name, a region describing its
/// shape, and the hosts in rank order.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    name: Name,
    region: Region,
    // Hosts in rank order; `Arc` keeps clones of the reference cheap.
    ranks: Arc<Vec<HostRef>>,
    // Optional bootstrap command used when spawning procs on these hosts;
    // `serde(default)` keeps older serialized forms deserializable.
    #[serde(default)]
    pub bootstrap_command: Option<BootstrapCommand>,
}
wirevalue::register_type!(HostMeshRef);
1026
1027impl HostMeshRef {
1028 #[allow(clippy::result_large_err)]
1031 fn new(name: Name, region: Region, ranks: Vec<HostRef>) -> crate::Result<Self> {
1032 if region.num_ranks() != ranks.len() {
1033 return Err(crate::Error::InvalidRankCardinality {
1034 expected: region.num_ranks(),
1035 actual: ranks.len(),
1036 });
1037 }
1038 Ok(Self {
1039 name,
1040 region,
1041 ranks: Arc::new(ranks),
1042 bootstrap_command: None,
1043 })
1044 }
1045
1046 pub fn from_hosts(name: Name, hosts: Vec<ChannelAddr>) -> Self {
1049 Self {
1050 name,
1051 region: extent!(hosts = hosts.len()).into(),
1052 ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
1053 bootstrap_command: None,
1054 }
1055 }
1056
1057 pub fn from_host_agents(
1059 name: Name,
1060 agents: Vec<hyperactor_reference::ActorRef<HostAgent>>,
1061 ) -> crate::Result<Self> {
1062 Ok(Self {
1063 name,
1064 region: extent!(hosts = agents.len()).into(),
1065 ranks: Arc::new(
1066 agents
1067 .into_iter()
1068 .map(HostRef::try_from)
1069 .collect::<crate::Result<_>>()?,
1070 ),
1071 bootstrap_command: None,
1072 })
1073 }
1074
1075 pub fn from_host_agent(
1077 name: Name,
1078 agent: hyperactor_reference::ActorRef<HostAgent>,
1079 ) -> crate::Result<Self> {
1080 Ok(Self {
1081 name,
1082 region: Extent::unity().into(),
1083 ranks: Arc::new(vec![HostRef::try_from(agent)?]),
1084 bootstrap_command: None,
1085 })
1086 }
1087
1088 pub fn with_bootstrap(self, cmd: BootstrapCommand) -> Self {
1091 Self {
1092 bootstrap_command: Some(cmd),
1093 ..self
1094 }
1095 }
1096
1097 pub(crate) fn host_entries(&self) -> Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> {
1101 self.ranks
1102 .iter()
1103 .map(|h| (h.0.to_string(), h.mesh_agent()))
1104 .collect()
1105 }
1106
1107 pub(crate) async fn push_config(
1116 &self,
1117 cx: &impl context::Actor,
1118 attrs: hyperactor_config::attrs::Attrs,
1119 ) {
1120 let timeout = hyperactor_config::global::get(crate::config::MESH_ATTACH_CONFIG_TIMEOUT);
1121 let hosts: Vec<_> = self.values().collect();
1122 let num_hosts = hosts.len();
1123
1124 let barrier = futures::future::join_all(hosts.into_iter().map(|host| {
1125 let attrs = attrs.clone();
1126 let agent_id = host.mesh_agent().actor_id().clone();
1127 async move {
1128 match host.mesh_agent().set_client_config(cx, attrs).await {
1129 Ok(()) => {
1130 tracing::debug!(host = %agent_id, "host agent config installed");
1131 true
1132 }
1133 Err(e) => {
1134 tracing::warn!(
1135 host = %agent_id,
1136 error = %e,
1137 "failed to push client config to host agent, \
1138 continuing without it",
1139 );
1140 false
1141 }
1142 }
1143 }
1144 }));
1145
1146 match tokio::time::timeout(timeout, barrier).await {
1147 Ok(results) => {
1148 let success = results.iter().filter(|&&r| r).count();
1149 let failed = num_hosts - success;
1150 tracing::info!(
1151 success = success,
1152 failed = failed,
1153 "push_config barrier complete",
1154 );
1155 }
1156 Err(_) => {
1157 tracing::warn!(
1158 num_hosts = num_hosts,
1159 timeout_secs = timeout.as_secs(),
1160 "push_config barrier timed out, some hosts may not \
1161 have received client config",
1162 );
1163 }
1164 }
1165 }
1166
1167 #[allow(clippy::result_large_err)]
1179 pub async fn spawn<C: context::Actor>(
1180 &self,
1181 cx: &C,
1182 name: &str,
1183 per_host: Extent,
1184 proc_bind: Option<Vec<ProcBind>>,
1185 ) -> crate::Result<ProcMesh>
1186 where
1187 C::A: Handler<MeshFailure>,
1188 {
1189 self.spawn_inner(cx, Name::new(name)?, per_host, proc_bind)
1190 .await
1191 }
1192
1193 #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
1194 async fn spawn_inner<C: context::Actor>(
1195 &self,
1196 cx: &C,
1197 proc_mesh_name: Name,
1198 per_host: Extent,
1199 proc_bind: Option<Vec<ProcBind>>,
1200 ) -> crate::Result<ProcMesh>
1201 where
1202 C::A: Handler<MeshFailure>,
1203 {
1204 tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
1205 tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
1206 let result = self
1207 .spawn_inner_inner(cx, proc_mesh_name, per_host, proc_bind)
1208 .await;
1209 match &result {
1210 Ok(_) => {
1211 tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
1212 tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
1213 }
1214 Err(error) => {
1215 tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
1216 tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
1217 }
1218 }
1219 result
1220 }
1221
1222 async fn spawn_inner_inner<C: context::Actor>(
1223 &self,
1224 cx: &C,
1225 proc_mesh_name: Name,
1226 per_host: Extent,
1227 proc_bind: Option<Vec<ProcBind>>,
1228 ) -> crate::Result<ProcMesh>
1229 where
1230 C::A: Handler<MeshFailure>,
1231 {
1232 let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
1233 let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
1234 if !per_host_labels
1235 .intersection(&host_labels)
1236 .collect::<Vec<_>>()
1237 .is_empty()
1238 {
1239 return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
1240 "per_host dims overlap with existing dims when spawning proc mesh"
1241 )));
1242 }
1243 if let Some(proc_bind) = proc_bind.as_ref() {
1244 if proc_bind.len() != per_host.num_ranks() {
1245 return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
1246 "proc_bind length does not match per_host extent"
1247 )));
1248 }
1249 }
1250
1251 let extent = self
1252 .region
1253 .extent()
1254 .concat(&per_host)
1255 .map_err(|err| crate::Error::ConfigurationError(err.into()))?;
1256
1257 let region: Region = extent.clone().into();
1258
1259 tracing::info!(
1260 name = "ProcMeshStatus",
1261 status = "Spawn::Attempt",
1262 %region,
1263 "spawning proc mesh"
1264 );
1265
1266 let mut procs = Vec::new();
1267 let num_ranks = region.num_ranks();
1268 let (port, rx) = cx.mailbox().open_accum_port_opts(
1271 crate::StatusMesh::from_single(region.clone(), Status::NotExist),
1272 StreamingReducerOpts {
1273 max_update_interval: Some(Duration::from_millis(50)),
1274 initial_update_interval: None,
1275 },
1276 );
1277
1278 let mut proc_names = Vec::new();
1285 let client_config_override = hyperactor_config::global::propagatable_attrs();
1286 for (host_rank, host) in self.ranks.iter().enumerate() {
1287 for per_host_rank in 0..per_host.num_ranks() {
1288 let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
1289 let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank))?;
1290 proc_names.push(proc_name.clone());
1291 let bind = proc_bind.as_ref().map(|v| v[per_host_rank].clone());
1292 let proc_spec = resource::ProcSpec {
1293 client_config_override: client_config_override.clone(),
1294 bootstrap_command: self.bootstrap_command.clone(),
1295 proc_bind: bind,
1296 host_mesh_name: Some(self.name.clone()),
1297 };
1298 host.mesh_agent()
1299 .create_or_update(
1300 cx,
1301 proc_name.clone(),
1302 resource::Rank::new(create_rank),
1303 proc_spec,
1304 )
1305 .await
1306 .map_err(|e| {
1307 crate::Error::HostMeshAgentConfigurationError(
1308 host.mesh_agent().actor_id().clone(),
1309 format!("failed while creating proc: {}", e),
1310 )
1311 })?;
1312 let mut reply_port = port.bind();
1313 reply_port.return_undeliverable(false);
1316 host.mesh_agent()
1317 .get_rank_status(cx, proc_name.clone(), reply_port)
1318 .await
1319 .map_err(|e| {
1320 crate::Error::HostMeshAgentConfigurationError(
1321 host.mesh_agent().actor_id().clone(),
1322 format!("failed while querying proc status: {}", e),
1323 )
1324 })?;
1325 let proc_id = host.named_proc(&proc_name);
1326 tracing::info!(
1327 name = "ProcMeshStatus",
1328 status = "Spawn::CreatingProc",
1329 %proc_id,
1330 rank = create_rank,
1331 );
1332 procs.push(ProcRef::new(
1333 proc_id,
1334 create_rank,
1335 hyperactor_reference::ActorRef::attest(
1337 host.named_proc(&proc_name)
1338 .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
1339 ),
1340 ));
1341 }
1342 }
1343
1344 let start_time = tokio::time::Instant::now();
1345
1346 match GetRankStatus::wait(
1349 rx,
1350 num_ranks,
1351 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1352 region.clone(), )
1354 .await
1355 {
1356 Ok(statuses) => {
1357 if let Some((rank, status)) = statuses
1360 .values()
1361 .enumerate()
1362 .find(|(_, s)| s.is_terminating())
1363 {
1364 let proc_name = &proc_names[rank];
1365 let host_rank = rank / per_host.num_ranks();
1366 let mesh_agent = self.ranks[host_rank].mesh_agent();
1367 let (reply_tx, mut reply_rx) = cx.mailbox().open_port();
1368 let mut reply_tx = reply_tx.bind();
1369 reply_tx.return_undeliverable(false);
1372 mesh_agent
1373 .send(
1374 cx,
1375 resource::GetState {
1376 name: proc_name.clone(),
1377 reply: reply_tx,
1378 },
1379 )
1380 .map_err(|e| {
1381 crate::Error::SendingError(mesh_agent.actor_id().clone(), e.into())
1382 })?;
1383 let state = match tokio::time::timeout(
1384 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1385 reply_rx.recv(),
1386 )
1387 .await
1388 {
1389 Ok(Ok(state)) => state,
1390 _ => resource::State {
1391 name: proc_name.clone(),
1392 status,
1393 state: None,
1394 generation: 0,
1395 timestamp: std::time::SystemTime::now(),
1396 },
1397 };
1398
1399 tracing::error!(
1400 name = "ProcMeshStatus",
1401 status = "Spawn::GetRankStatus",
1402 rank = host_rank,
1403 "rank {} is terminating with state: {}",
1404 host_rank,
1405 state
1406 );
1407
1408 return Err(crate::Error::ProcCreationError {
1409 state: Box::new(state),
1410 host_rank,
1411 mesh_agent,
1412 });
1413 }
1414 }
1415 Err(complete) => {
1416 tracing::error!(
1417 name = "ProcMeshStatus",
1418 status = "Spawn::GetRankStatus",
1419 "timeout after {:?} when waiting for procs being created",
1420 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1421 );
1422 let legacy = mesh_to_rankedvalues_with_default(
1425 &complete,
1426 Status::Timeout(start_time.elapsed()),
1427 Status::is_not_exist,
1428 num_ranks,
1429 );
1430 return Err(crate::Error::ProcSpawnError { statuses: legacy });
1431 }
1432 }
1433
1434 let mesh =
1435 ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await;
1436 if let Ok(ref mesh) = mesh {
1437 let controller = ProcMeshController::new(mesh.deref().clone());
1440 let controller_name = format!("{}_{}", PROC_MESH_CONTROLLER_NAME, mesh.name());
1443 let controller_handle = controller
1444 .spawn_with_name(cx, &controller_name)
1445 .map_err(|e| crate::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
1446 let _: hyperactor::reference::ActorRef<ProcMeshController> = controller_handle.bind();
1451 }
1452 mesh
1453 }
1454
    /// Returns the name of this host mesh.
    pub fn name(&self) -> &Name {
        &self.name
    }
1459
    /// Returns the hosts of this mesh, in rank order.
    pub fn hosts(&self) -> &[HostRef] {
        &self.ranks
    }
1464
    /// Stops every proc of `proc_mesh_name` running on this host mesh.
    ///
    /// For each proc, a `resource::Stop` message is sent to the owning
    /// host's mesh agent; stop confirmations are accumulated into a
    /// status mesh over `region` via an accumulating port. The call then
    /// waits (bounded by `PROC_STOP_MAX_IDLE`) for all `num_ranks` ranks
    /// to report a terminated status.
    ///
    /// Errors if a proc name fails to parse, if sending/waiting fails,
    /// if some rank never reaches a terminated status, or if the overall
    /// wait times out.
    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
    pub(crate) async fn stop_proc_mesh(
        &self,
        cx: &impl hyperactor::context::Actor,
        proc_mesh_name: &Name,
        procs: impl IntoIterator<Item = hyperactor_reference::ProcId>,
        region: Region,
        reason: String,
    ) -> anyhow::Result<()> {
        let mut proc_names = Vec::new();
        let num_ranks = region.num_ranks();
        // Accumulating port: every rank starts at `NotExist`; agents
        // stream status updates in, coalesced at most every 50ms.
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );
        for proc_id in procs.into_iter() {
            let (addr, proc_name) = (proc_id.addr().clone(), proc_id.name().to_string());
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());

            let host = HostRef(addr);
            host.mesh_agent().send(
                cx,
                resource::Stop {
                    name: proc_name.clone(),
                    reason: reason.clone(),
                },
            )?;
            // NOTE(review): this awaits per proc inside the loop;
            // presumably `wait_rank_status` just registers interest in
            // the rank's status (reported to `port`) rather than
            // blocking until `Stopped` — confirm, since the latter
            // would serialize shutdown across procs.
            host.mesh_agent()
                .wait_rank_status(cx, proc_name, Status::Stopped, port.bind())
                .await?;

            tracing::info!(
                name = "ProcMeshStatus",
                %proc_id,
                status = "Stop::Sent",
            );
        }
        tracing::info!(
            name = "HostMeshStatus",
            status = "ProcMesh::Stop::Sent",
            "sending Stop to proc mesh for {} procs: {}",
            proc_names.len(),
            proc_names
                .iter()
                .map(|n| n.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        );

        let start_time = tokio::time::Instant::now();

        // Wait for all ranks to report in (or time out).
        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_STOP_MAX_IDLE),
            region.clone(), )
        .await
        {
            Ok(statuses) => {
                // All ranks reported: verify each reached a terminal state.
                let all_stopped = statuses.values().all(|s| s.is_terminated());
                if !all_stopped {
                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "FailedToStop",
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    );
                    return Err(anyhow::anyhow!(
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    ));
                }
                tracing::info!(name = "ProcMeshStatus", status = "Stopped");
            }
            Err(complete) => {
                // Timed out: mark still-missing ranks as `Timeout` in the
                // legacy ranked-values view and surface that in the error.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "StoppingTimeout",
                    "failed to terminate proc mesh before timeout: {:?}",
                    legacy,
                );
                return Err(anyhow::anyhow!(
                    "failed to terminate proc mesh {} before timeout: {:?}",
                    proc_mesh_name,
                    legacy
                ));
            }
        }
        Ok(())
    }
1577
    /// Queries the current `resource::State` of each given proc and
    /// assembles the results into a `ValueMesh` over `region`.
    ///
    /// A `GetState` request is sent to every proc's host agent; replies
    /// arrive on a shared port in arbitrary order and are keyed back to
    /// their rank via `state.state.create_rank`. If any single reply
    /// takes longer than `GET_PROC_STATE_MAX_IDLE`, all ranks that have
    /// not yet replied are filled in with a synthetic `Timeout` state.
    ///
    /// Errors if a proc name fails to parse, a send fails, a reply
    /// carries no inner state (`NotExist`), or the mesh cannot be
    /// assembled over `region`.
    #[allow(clippy::result_large_err)]
    pub(crate) async fn proc_states(
        &self,
        cx: &impl context::Actor,
        procs: impl IntoIterator<Item = hyperactor_reference::ProcId>,
        region: Region,
    ) -> crate::Result<ValueMesh<resource::State<ProcState>>> {
        let (tx, mut rx) = cx.mailbox().open_port();

        let mut num_ranks = 0;
        let procs: Vec<hyperactor_reference::ProcId> = procs.into_iter().collect();
        // `proc_names[i]` corresponds to the i-th proc in iteration
        // order; used below to label synthesized timeout states.
        // NOTE(review): the timeout backfill indexes `proc_names[rank]`
        // with `create_rank` values — this assumes `create_rank` equals
        // the proc's position in `procs`; confirm against the agent.
        let mut proc_names = Vec::new();
        for proc_id in procs.iter() {
            num_ranks += 1;
            let (addr, proc_name) = (proc_id.addr().clone(), proc_id.name().to_string());

            let host = HostRef(addr);
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());
            let mut reply = tx.bind();
            // Don't bounce undeliverable replies back; timeouts below
            // handle unresponsive agents instead.
            reply.return_undeliverable(false);
            host.mesh_agent()
                .send(
                    cx,
                    resource::GetState {
                        name: proc_name,
                        reply,
                    },
                )
                .map_err(|e| {
                    crate::Error::CallError(host.mesh_agent().actor_id().clone(), e.into())
                })?;
        }

        // Collect one reply per rank; replies may arrive in any order.
        let mut states = Vec::with_capacity(num_ranks);
        let timeout = hyperactor_config::global::get(GET_PROC_STATE_MAX_IDLE);
        for _ in 0..num_ranks {
            let state = tokio::time::timeout(timeout, rx.recv()).await;
            if let Ok(state) = state {
                let state = state?;
                match state.state {
                    Some(ref inner) => {
                        states.push((inner.create_rank, state));
                    }
                    None => {
                        return Err(crate::Error::NotExist(state.name));
                    }
                }
            } else {
                // Timed out waiting for the next reply: synthesize a
                // `Timeout` state for every rank that has not answered,
                // then stop waiting entirely.
                tracing::warn!(
                    "Timeout waiting for response from host mesh agent for proc_states after {:?}",
                    timeout
                );
                let all_ranks = (0..num_ranks).collect::<HashSet<_>>();
                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
                // Invariant: ranks reported so far are distinct and in
                // 0..num_ranks, so the two set sizes must add up.
                assert_eq!(leftover_ranks.len(), num_ranks - states.len());
                while states.len() < num_ranks {
                    let rank = *leftover_ranks
                        .pop()
                        .expect("leftover ranks should not be empty");
                    states.push((
                        rank,
                        resource::State {
                            name: proc_names[rank].clone(),
                            status: resource::Status::Timeout(timeout),
                            state: None,
                            generation: 0,
                            timestamp: std::time::SystemTime::now(),
                        },
                    ));
                }
                break;
            }
        }
        // Order by rank so the collected mesh lines up with `region`.
        states.sort_by_key(|(rank, _)| *rank);
        let vm = states
            .into_iter()
            .map(|(_, state)| state)
            .collect_mesh::<ValueMesh<_>>(region)?;
        Ok(vm)
    }
1680}
1681
/// An insertion-ordered set of host agent entries, deduplicated by the
/// agent's `ActorId`.
struct HostSet {
    // ActorIds observed so far; used only for deduplication.
    seen: HashSet<hyperactor_reference::ActorId>,
    // (address, agent) pairs in first-seen order.
    entries: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>,
}
1693
1694impl HostSet {
1695 fn new() -> Self {
1696 Self {
1697 seen: HashSet::new(),
1698 entries: Vec::new(),
1699 }
1700 }
1701
1702 fn insert(&mut self, addr: String, agent_ref: hyperactor_reference::ActorRef<HostAgent>) {
1705 if self.seen.insert(agent_ref.actor_id().clone()) {
1706 self.entries.push((addr, agent_ref));
1707 }
1708 }
1709
1710 fn extend_from_mesh(&mut self, mesh: &HostMeshRef) {
1712 for h in mesh.hosts() {
1713 self.insert(h.0.to_string(), h.mesh_agent());
1714 }
1715 }
1716
1717 fn into_vec(self) -> Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> {
1718 self.entries
1719 }
1720}
1721
1722fn aggregate_hosts(
1729 meshes: &[impl AsRef<HostMeshRef>],
1730 client_host_entries: Option<Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>>,
1731) -> Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> {
1732 let mut set = HostSet::new();
1733
1734 for mesh in meshes {
1736 set.extend_from_mesh(mesh.as_ref());
1737 }
1738
1739 if let Some(entries) = client_host_entries {
1741 for (addr, agent_ref) in entries {
1742 set.insert(addr, agent_ref);
1743 }
1744 }
1745
1746 set.into_vec()
1747}
1748
1749pub async fn spawn_admin(
1759 meshes: impl IntoIterator<Item = impl AsRef<HostMeshRef>>,
1760 cx: &impl hyperactor::context::Actor,
1761 admin_addr: Option<std::net::SocketAddr>,
1762 telemetry_url: Option<String>,
1763) -> anyhow::Result<String> {
1764 let meshes: Vec<_> = meshes.into_iter().collect();
1765 anyhow::ensure!(!meshes.is_empty(), "at least one mesh is required (SA-1)");
1766 for (i, mesh) in meshes.iter().enumerate() {
1767 anyhow::ensure!(
1768 !mesh.as_ref().hosts().is_empty(),
1769 "mesh at index {} has no hosts (SA-2)",
1770 i,
1771 );
1772 }
1773
1774 let client_entries =
1775 crate::global_context::try_this_host().map(|client_host| client_host.host_entries());
1776 let hosts = aggregate_hosts(&meshes, client_entries);
1777
1778 let root_client_id = cx.mailbox().actor_id().clone();
1779
1780 let local_proc = cx.instance().proc();
1783 let agent_handle = local_proc.spawn(
1784 crate::mesh_admin::MESH_ADMIN_ACTOR_NAME,
1785 crate::mesh_admin::MeshAdminAgent::new(
1786 hosts,
1787 Some(root_client_id),
1788 admin_addr,
1789 telemetry_url,
1790 ),
1791 )?;
1792 let response = agent_handle.get_admin_addr(cx).await?;
1793 let addr = response
1794 .addr
1795 .ok_or_else(|| anyhow::anyhow!("mesh admin agent did not report an address"))?;
1796
1797 Ok(addr)
1798}
1799
impl view::Ranked for HostMeshRef {
    type Item = HostRef;

    /// The region (shape) this mesh ranges over.
    fn region(&self) -> &Region {
        &self.region
    }

    /// Returns the host at `rank`, or `None` if out of range.
    fn get(&self, rank: usize) -> Option<&Self::Item> {
        self.ranks.get(rank)
    }
}
1811
1812impl view::RankedSliceable for HostMeshRef {
1813 fn sliced(&self, region: Region) -> Self {
1814 let ranks = self
1815 .region()
1816 .remap(®ion)
1817 .unwrap()
1818 .map(|index| self.get(index).unwrap().clone());
1819 Self {
1820 bootstrap_command: self.bootstrap_command.clone(),
1821 ..Self::new(self.name.clone(), region, ranks.collect()).unwrap()
1822 }
1823 }
1824}
1825
1826impl std::fmt::Display for HostMeshRef {
1827 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1828 write!(f, "{}:", self.name)?;
1829 for (rank, host) in self.ranks.iter().enumerate() {
1830 if rank > 0 {
1831 write!(f, ",")?;
1832 }
1833 write!(f, "{}", host)?;
1834 }
1835 write!(f, "@{}", self.region)
1836 }
1837}
1838
/// Errors produced when parsing a `HostMeshRef` from its string form
/// (`name:host1,host2,...@region`).
#[derive(thiserror::Error, Debug)]
pub enum HostMeshRefParseError {
    /// The region suffix (after `@`) failed to parse.
    #[error(transparent)]
    RegionParseError(#[from] RegionParseError),

    /// No `@region` suffix was present.
    #[error("invalid host mesh ref: missing region")]
    MissingRegion,

    /// No `name:` prefix was present.
    #[error("invalid host mesh ref: missing name")]
    MissingName,

    /// The mesh name failed to parse.
    #[error(transparent)]
    InvalidName(#[from] crate::NameParseError),

    /// Construction of the ref failed; boxed to keep this enum small.
    #[error(transparent)]
    InvalidHostMeshRef(#[from] Box<crate::Error>),

    /// Any other error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1860
1861impl From<crate::Error> for HostMeshRefParseError {
1862 fn from(err: crate::Error) -> Self {
1863 Self::InvalidHostMeshRef(Box::new(err))
1864 }
1865}
1866
1867impl FromStr for HostMeshRef {
1868 type Err = HostMeshRefParseError;
1869
1870 fn from_str(s: &str) -> Result<Self, Self::Err> {
1871 let (name, rest) = s
1872 .split_once(':')
1873 .ok_or(HostMeshRefParseError::MissingName)?;
1874
1875 let name = Name::from_str(name)?;
1876
1877 let (hosts, region) = rest
1878 .split_once('@')
1879 .ok_or(HostMeshRefParseError::MissingRegion)?;
1880 let hosts = hosts
1881 .split(',')
1882 .map(|host| host.trim())
1883 .map(|host| host.parse::<HostRef>())
1884 .collect::<Result<Vec<_>, _>>()?;
1885 let region = region.parse()?;
1886 Ok(HostMeshRef::new(name, region, hosts)?)
1887 }
1888}
1889
1890#[cfg(test)]
1891mod tests {
1892 use std::assert_matches::assert_matches;
1893 use std::collections::HashSet;
1894 use std::collections::VecDeque;
1895
1896 use hyperactor::config::ENABLE_DEST_ACTOR_REORDERING_BUFFER;
1897 use hyperactor::context::Mailbox as _;
1898 use hyperactor_config::attrs::Attrs;
1899 use itertools::Itertools;
1900 use ndslice::ViewExt;
1901 use ndslice::extent;
1902 use timed_test::async_timed_test;
1903 use tokio::process::Command;
1904
1905 use super::*;
1906 use crate::ActorMesh;
1907 use crate::Bootstrap;
1908 use crate::bootstrap::MESH_TAIL_LOG_LINES;
1909 use crate::comm::ENABLE_NATIVE_V1_CASTING;
1910 use crate::resource::Status;
1911 use crate::testactor;
1912 use crate::testactor::GetConfigAttrs;
1913 use crate::testactor::SetConfigAttrs;
1914 use crate::testing;
1915
1916 #[test]
1917 fn test_host_mesh_subset() {
1918 let hosts: HostMeshRef = "test:local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
1919 .parse()
1920 .unwrap();
1921 assert_eq!(
1922 hosts.range("replica", 1).unwrap().to_string(),
1923 "test:local:3,local:4@2+replica=1/2,host=2/1"
1924 );
1925 }
1926
1927 #[test]
1928 fn test_host_mesh_ref_parse_roundtrip() {
1929 let host_mesh_ref = HostMeshRef::new(
1930 Name::new("test").unwrap(),
1931 extent!(replica = 2, host = 2).into(),
1932 vec![
1933 "tcp:127.0.0.1:123".parse().unwrap(),
1934 "tcp:127.0.0.1:123".parse().unwrap(),
1935 "tcp:127.0.0.1:123".parse().unwrap(),
1936 "tcp:127.0.0.1:123".parse().unwrap(),
1937 ],
1938 )
1939 .unwrap();
1940
1941 assert_eq!(
1942 host_mesh_ref.to_string().parse::<HostMeshRef>().unwrap(),
1943 host_mesh_ref
1944 );
1945 }
1946
    /// End-to-end allocation scenario shared by the reorder-buffer
    /// tests: allocates a 4-replica host mesh, spawns two proc meshes
    /// of different extents, casts to their actor meshes, routes a
    /// message through every ordered pair of actors, then shuts down.
    #[cfg(fbcode_build)]
    async fn execute_allocate(config: &hyperactor_config::global::ConfigLock) {
        let poll = Duration::from_secs(3);
        let get_actor = Duration::from_mins(1);
        let get_proc = Duration::from_mins(1);
        let slack = Duration::from_secs(57);

        let _pdeath_sig =
            config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _poll = config.override_key(crate::mesh_controller::SUPERVISION_POLL_FREQUENCY, poll);
        let _get_actor = config.override_key(crate::proc_mesh::GET_ACTOR_STATE_MAX_IDLE, get_actor);
        let _get_proc = config.override_key(crate::host_mesh::GET_PROC_STATE_MAX_IDLE, get_proc);

        // The watchdog budget covers one poll cycle plus both state
        // queries, with extra slack.
        let _watchdog = config.override_key(
            crate::actor_mesh::SUPERVISION_WATCHDOG_TIMEOUT,
            poll + get_actor + get_proc + slack,
        );

        let instance = testing::instance();

        for alloc in testing::allocs(extent!(replicas = 4)).await {
            let mut host_mesh = HostMesh::allocate(instance, alloc, "test", None)
                .await
                .unwrap();

            // One proc per host (unity extent).
            let proc_mesh1 = host_mesh
                .spawn(instance, "test_1", Extent::unity(), None)
                .await
                .unwrap();

            let actor_mesh1: ActorMesh<testactor::TestActor> =
                proc_mesh1.spawn(instance, "test", &()).await.unwrap();

            // Six procs per host: 4 replicas x 3 gpus x 2 extra = 24.
            let proc_mesh2 = host_mesh
                .spawn(instance, "test_2", extent!(gpus = 3, extra = 2), None)
                .await
                .unwrap();
            assert_eq!(
                proc_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(proc_mesh2.values().count(), 24);

            let actor_mesh2: ActorMesh<testactor::TestActor> =
                proc_mesh2.spawn(instance, "test", &()).await.unwrap();
            assert_eq!(
                actor_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(actor_mesh2.values().count(), 24);

            // A cloned ref must enumerate the same hosts as the owner.
            let host_mesh_ref: HostMeshRef = host_mesh.clone();
            assert_eq!(
                host_mesh_ref.iter().collect::<Vec<_>>(),
                host_mesh.iter().collect::<Vec<_>>(),
            );

            // Cast to each mesh and check every actor replies exactly once.
            for actor_mesh in [&actor_mesh1, &actor_mesh2] {
                let (port, mut rx) = instance.mailbox().open_port();
                actor_mesh
                    .cast(instance, testactor::GetActorId(port.bind()))
                    .unwrap();

                let mut expected_actor_ids: HashSet<_> = actor_mesh
                    .values()
                    .map(|actor_ref| actor_ref.actor_id().clone())
                    .collect();

                while !expected_actor_ids.is_empty() {
                    let (actor_id, _seq) = rx.recv().await.unwrap();
                    assert!(
                        expected_actor_ids.remove(&actor_id),
                        "got {actor_id}, expect {expected_actor_ids:?}"
                    );
                }
            }

            // Route a Forward message through every ordered pair of
            // actors across both meshes, ending at a local port, and
            // check the visitation order matches the itinerary.
            let mut to_visit: VecDeque<_> = actor_mesh1
                .values()
                .chain(actor_mesh2.values())
                .map(|actor_ref| actor_ref.port())
                .permutations(2)
                .flatten()
                .collect();

            let expect_visited: Vec<_> = to_visit.clone().into();

            let (last, mut last_rx) = instance.mailbox().open_port();
            to_visit.push_back(last.bind());

            let forward = testactor::Forward {
                to_visit,
                visited: Vec::new(),
            };
            let first = forward.to_visit.front().unwrap().clone();
            first.send(instance, forward).unwrap();

            let forward = last_rx.recv().await.unwrap();
            assert_eq!(forward.visited, expect_visited);

            let _ = host_mesh.shutdown(&instance).await;
        }
    }
2061
    /// Runs the allocation scenario with native v1 casting disabled.
    /// NOTE(review): despite the name, this does not override
    /// ENABLE_DEST_ACTOR_REORDERING_BUFFER — presumably it relies on
    /// that flag's default being off; confirm.
    #[async_timed_test(timeout_secs = 600)]
    #[cfg(fbcode_build)]
    async fn test_allocate_dest_reorder_buffer_off() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
        execute_allocate(&config).await;
    }
2069
    /// Runs the allocation scenario with native v1 casting and the
    /// destination-actor reordering buffer both enabled.
    #[async_timed_test(timeout_secs = 600)]
    #[cfg(fbcode_build)]
    async fn test_allocate_dest_reorder_buffer_on() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        execute_allocate(&config).await;
    }
2078
2079 fn free_localhost_addr() -> ChannelAddr {
2085 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2086 ChannelAddr::Tcp(listener.local_addr().unwrap())
2087 }
2088
    /// Scenario for hosts that are started *outside* the mesh (two
    /// bootstrap child processes), then addressed via a
    /// `HostMeshRef::from_hosts` ref: spawn a proc mesh and actor mesh
    /// on them, verify the shape, and shut the hosts down.
    #[cfg(fbcode_build)]
    async fn execute_extrinsic_allocation(config: &hyperactor_config::global::ConfigLock) {
        let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        // Launch one bootstrap host process per address; kill_on_drop
        // ensures cleanup if the test panics.
        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                command: None, config: None,
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }

        let instance = testing::instance();
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let proc_mesh = host_mesh
            .spawn(&testing::instance(), "test", Extent::unity(), None)
            .await
            .unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
            .spawn(&testing::instance(), "test", &())
            .await
            .unwrap();

        testactor::assert_mesh_shape(actor_mesh).await;

        // Take ownership of the extrinsic ref so shutdown reaches the
        // external host processes.
        HostMesh::take(host_mesh)
            .shutdown(&instance)
            .await
            .expect("hosts shutdown");
    }
2131
    /// Extrinsic allocation with native v1 casting disabled.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_extrinsic_allocation_v0() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
        execute_extrinsic_allocation(&config).await;
    }
2139
    /// Extrinsic allocation with native v1 casting and the
    /// destination-actor reordering buffer enabled.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_extrinsic_allocation_v1() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
        let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        execute_extrinsic_allocation(&config).await;
    }
2148
    /// Spawning onto hosts whose proc command is `false` (exits with
    /// code 1 immediately) must surface a `ProcCreationError` whose
    /// status records the terminal exit.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_failing_proc_allocation() {
        let lock = hyperactor_config::global::lock();
        let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        // Each bootstrap host is configured to launch procs with
        // `false`, so every proc fails at startup.
        let mut children = Vec::new();
        for host in hosts.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = Bootstrap::Host {
                addr: host.clone(),
                config: None,
                command: Some(BootstrapCommand::from("false")),
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let instance = testing::instance();

        let err = host_mesh
            .spawn(&instance, "test", Extent::unity(), None)
            .await
            .unwrap_err();
        assert_matches!(
            err,
            crate::Error::ProcCreationError { state, .. }
            if matches!(state.status, resource::Status::Failed(ref msg) if msg.contains("failed to configure process: Ready(Terminal(Stopped { exit_code: 1"))
        );
    }
2187
    /// One host launches procs via `sleep 60` (hangs past the 20s
    /// spawn idle budget) while the other behaves normally: the spawn
    /// must fail with per-rank statuses [Timeout, Running].
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_halting_proc_allocation() {
        let config = hyperactor_config::global::lock();
        let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(20));

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

        let hosts = vec![free_localhost_addr(), free_localhost_addr()];

        let mut children = Vec::new();

        for (index, host) in hosts.iter().enumerate() {
            let mut cmd = Command::new(program.clone());
            // Host 0 hangs (sleep 60); host 1 uses the default command.
            let command = if index == 0 {
                let mut command = BootstrapCommand::from("sleep");
                command.args.push("60".to_string());
                Some(command)
            } else {
                None
            };
            let boot = Bootstrap::Host {
                addr: host.clone(),
                config: None,
                command,
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(true);
            children.push(cmd.spawn().unwrap());
        }
        let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);

        let instance = testing::instance();

        let err = host_mesh
            .spawn(&instance, "test", Extent::unity(), None)
            .await
            .unwrap_err();
        let statuses = err.into_proc_spawn_error().unwrap();
        assert_matches!(
            &statuses.materialized_iter(2).cloned().collect::<Vec<_>>()[..],
            &[Status::Timeout(_), Status::Running]
        );
    }
2233
    /// Client-side config overrides must propagate to remote actors:
    /// after casting `SetConfigAttrs`, the overridden key reads back
    /// with the new value while untouched keys keep their original
    /// override.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_client_config_override() {
        let config = hyperactor_config::global::lock();
        let _guard1 = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _guard2 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(2),
        );
        let _guard3 = config.override_key(
            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
            Duration::from_mins(1),
        );

        // SAFETY-relevant: mutating the environment is unsound in the
        // presence of concurrent env reads; done here so stray env vars
        // cannot shadow the programmatic overrides above.
        unsafe {
            std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT");
            std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT");
        }

        let instance = testing::instance();

        let mut hm = testing::host_mesh(2).await;
        let proc_mesh = hm
            .spawn(instance, "test", Extent::unity(), None)
            .await
            .unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(instance, "test", &()).await.unwrap();

        // Override HOST_SPAWN_READY_TIMEOUT to 3 minutes on the remote
        // actors (serialized attrs travel over the cast).
        let mut attrs_override = Attrs::new();
        attrs_override.set(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(3),
        );
        actor_mesh
            .cast(
                instance,
                SetConfigAttrs(bincode::serialize(&attrs_override).unwrap()),
            )
            .unwrap();

        // Read the attrs back from (at least) one actor.
        let (tx, mut rx) = instance.open_port();
        actor_mesh
            .cast(instance, GetConfigAttrs(tx.bind()))
            .unwrap();
        let actual_attrs = rx.recv().await.unwrap();
        let actual_attrs = bincode::deserialize::<Attrs>(&actual_attrs).unwrap();

        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(3)
        );
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::MESSAGE_DELIVERY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(1)
        );

        let _ = hm.shutdown(instance).await;
    }
2302
2303 #[tokio::test]
2306 async fn test_sa1_empty_mesh_set_rejected() {
2307 let instance = testing::instance();
2308 let result = spawn_admin(std::iter::empty::<&HostMeshRef>(), instance, None, None).await;
2309 let err = result.unwrap_err().to_string();
2310 assert!(err.contains("SA-1"), "expected SA-1 error, got: {err}");
2311 }
2312
2313 #[tokio::test]
2314 async fn test_sa2_empty_hosts_rejected() {
2315 let instance = testing::instance();
2316 let mesh = HostMeshRef::from_hosts(Name::new("empty").unwrap(), vec![]);
2317 let result = spawn_admin([&mesh], instance, None, None).await;
2318 let err = result.unwrap_err().to_string();
2319 assert!(err.contains("SA-2"), "expected SA-2 error, got: {err}");
2320 }
2321
2322 #[test]
2327 fn test_sa3_host_set_insert_idempotent() {
2328 let addr_a: ChannelAddr = "tcp:127.0.0.1:2001".parse().unwrap();
2329 let addr_b: ChannelAddr = "tcp:127.0.0.1:2002".parse().unwrap();
2330
2331 let ref_a = HostRef(addr_a.clone()).mesh_agent();
2332 let ref_b = HostRef(addr_b.clone()).mesh_agent();
2333
2334 let mut set = HostSet::new();
2335 set.insert(addr_a.to_string(), ref_a.clone());
2336 set.insert(addr_b.to_string(), ref_b.clone());
2337 set.insert("duplicate_addr".to_string(), ref_a.clone());
2339
2340 let result = set.into_vec();
2341 assert_eq!(
2342 result.len(),
2343 2,
2344 "SA-3: duplicate ActorId must not add entry"
2345 );
2346 assert_eq!(
2347 result[0].0,
2348 addr_a.to_string(),
2349 "SA-3: first-seen order preserved"
2350 );
2351 assert_eq!(
2352 result[1].0,
2353 addr_b.to_string(),
2354 "SA-3: first-seen order preserved"
2355 );
2356 }
2357
2358 #[test]
2359 fn test_sa3_aggregate_hosts_dedup() {
2360 let addr_a: ChannelAddr = "tcp:127.0.0.1:1001".parse().unwrap();
2361 let addr_b: ChannelAddr = "tcp:127.0.0.1:1002".parse().unwrap();
2362 let addr_c: ChannelAddr = "tcp:127.0.0.1:1003".parse().unwrap();
2363
2364 let mesh_a = HostMeshRef::from_hosts(
2366 Name::new("mesh_a").unwrap(),
2367 vec![addr_a.clone(), addr_b.clone()],
2368 );
2369 let mesh_b = HostMeshRef::from_hosts(
2371 Name::new("mesh_b").unwrap(),
2372 vec![addr_b.clone(), addr_c.clone()],
2373 );
2374
2375 let result = aggregate_hosts(&[&mesh_a, &mesh_b], None);
2376
2377 assert_eq!(result.len(), 3, "expected 3 hosts, got {:?}", result);
2379
2380 let addrs: Vec<String> = result.iter().map(|(a, _)| a.clone()).collect();
2382 assert_eq!(addrs[0], addr_a.to_string());
2383 assert_eq!(addrs[1], addr_b.to_string());
2384 assert_eq!(addrs[2], addr_c.to_string());
2385 }
2386
2387 #[test]
2390 fn test_sa6_ch1_client_host_dedup() {
2391 let addr_a: ChannelAddr = "tcp:127.0.0.1:1001".parse().unwrap();
2392 let addr_b: ChannelAddr = "tcp:127.0.0.1:1002".parse().unwrap();
2393
2394 let mesh = HostMeshRef::from_hosts(
2395 Name::new("mesh").unwrap(),
2396 vec![addr_a.clone(), addr_b.clone()],
2397 );
2398
2399 let client_ref = HostRef(addr_a.clone()).mesh_agent();
2401 let client_entries = vec![("client_addr".to_string(), client_ref)];
2402
2403 let result = aggregate_hosts(&[&mesh], Some(client_entries));
2404
2405 assert_eq!(result.len(), 2, "expected 2 hosts, got {:?}", result);
2407 let addrs: Vec<String> = result.iter().map(|(a, _)| a.clone()).collect();
2408 assert_eq!(addrs[0], addr_a.to_string());
2409 assert_eq!(addrs[1], addr_b.to_string());
2410 }
2411}