1#![allow(clippy::result_large_err)]
47
48use hyperactor::Actor;
49use hyperactor::ActorRef;
50use hyperactor::Endpoint as _;
51use hyperactor::Handler;
52use hyperactor::accum::StreamingReducerOpts;
53use hyperactor::channel::ChannelTransport;
54use hyperactor::id::Label;
55use hyperactor_config::CONFIG;
56use hyperactor_config::ConfigAttr;
57use hyperactor_config::attrs::declare_attrs;
58use ndslice::view::CollectMeshExt;
59
60use crate::mesh_admin::MeshAdminAgent;
61use crate::supervision::MeshFailure;
62
63pub mod host_agent;
64
65use std::collections::HashSet;
66use std::hash::Hash;
67use std::ops::Deref;
68use std::ops::DerefMut;
69use std::str::FromStr;
70use std::sync::Arc;
71use std::time::Duration;
72
73use hyperactor::ActorAddr;
74use hyperactor::ProcAddr;
75use hyperactor::channel::ChannelAddr;
76use hyperactor::context;
77use ndslice::Extent;
78use ndslice::Region;
79use ndslice::ViewExt;
80use ndslice::extent;
81use ndslice::view;
82use ndslice::view::Ranked;
83use ndslice::view::RegionParseError;
84use serde::Deserialize;
85use serde::Serialize;
86use tracing::Instrument;
87use typeuri::Named;
88
89use crate::Bootstrap;
90use crate::ProcMesh;
91use crate::ValueMesh;
92use crate::bootstrap::BootstrapCommand;
93use crate::bootstrap::BootstrapProcManager;
94use crate::bootstrap::ProcBind;
95use crate::host::Host;
96use crate::host::LocalProcManager;
97use crate::host::SERVICE_PROC_NAME;
98use crate::host_mesh::host_agent::DrainHostClient;
99pub use crate::host_mesh::host_agent::HostAgent;
100use crate::host_mesh::host_agent::HostAgentMode;
101use crate::host_mesh::host_agent::ProcManagerSpawnFn;
102use crate::host_mesh::host_agent::ProcState;
103use crate::host_mesh::host_agent::ShutdownHostClient;
104use crate::mesh_controller::ProcMeshController;
105use crate::mesh_id::HostMeshId;
106use crate::mesh_id::ProcMeshId;
107use crate::mesh_id::ResourceId;
108use crate::proc_agent::ProcAgent;
109use crate::proc_mesh::ProcMeshRef;
110use crate::resource;
111use crate::resource::CreateOrUpdateClient;
112use crate::resource::GetRankStatus;
113use crate::resource::GetRankStatusClient;
114use crate::resource::RankedValues;
115use crate::resource::Status;
116use crate::resource::WaitRankStatusClient;
117use crate::transport::DEFAULT_TRANSPORT;
118
/// Name prefix for the `ProcMeshController` actor spawned for a proc mesh.
/// The spawn path in this file appends `_<mesh id>` to form the actor name.
pub const PROC_MESH_CONTROLLER_NAME: &str = "proc_mesh_controller";
121
// NOTE(review): the two `ConfigAttr::new` arguments look like an environment
// variable name and an alternate configuration key -- confirm against
// `ConfigAttr`. The key strings are part of the runtime configuration
// surface and must not be renamed casually.
declare_attrs! {
    /// Idle budget used while spawning procs: it is passed to
    /// `GetRankStatus::wait` when waiting for spawn status updates, and is
    /// also the timeout for the follow-up `GetState` query on a failed rank.
    /// Defaults to 30 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()),
        Some("mesh_proc_spawn_max_idle".to_string()),
    ))
    pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    /// Idle budget passed to `GetRankStatus::wait` while waiting for procs
    /// to stop. Defaults to 30 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()),
        Some("proc_stop_max_idle".to_string()),
    ))
    pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30);

    /// Idle budget for proc-state queries. Defaults to one minute.
    // NOTE(review): no use of this attribute is visible in this part of the
    // file; presumably it bounds the proc-state query path -- confirm.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()),
        Some("get_proc_state_max_idle".to_string()),
    ))
    pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1);
}
147
/// A reference to a single host, represented by the host's channel address.
///
/// The address is all that is needed to derive the host's service proc and
/// its host agent actor (see the inherent methods on this type).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(pub(crate) ChannelAddr);
wirevalue::register_type!(HostRef);
152
153impl HostRef {
154 pub(crate) fn mesh_agent(&self) -> ActorRef<HostAgent> {
156 ActorRef::attest(
157 self.service_proc()
158 .actor_addr(host_agent::HOST_MESH_AGENT_ACTOR_NAME),
159 )
160 }
161
162 fn named_proc(&self, id: &ResourceId) -> ProcAddr {
164 id.proc_addr(self.0.clone())
165 }
166
167 fn service_proc(&self) -> ProcAddr {
169 ResourceId::proc_addr_from_name(self.0.clone(), SERVICE_PROC_NAME)
170 }
171
172 pub(crate) async fn shutdown(
191 &self,
192 cx: &impl hyperactor::context::Actor,
193 ) -> anyhow::Result<()> {
194 let agent = self.mesh_agent();
195 let terminate_timeout =
196 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
197 let max_in_flight =
198 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
199 agent
200 .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
201 .await?;
202 Ok(())
203 }
204
205 pub(crate) async fn drain(
209 &self,
210 cx: &impl hyperactor::context::Actor,
211 host_mesh_id: Option<crate::mesh_id::HostMeshId>,
212 ) -> anyhow::Result<()> {
213 let agent = self.mesh_agent();
214 let terminate_timeout =
215 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
216 let max_in_flight =
217 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
218 agent
219 .drain_host(
220 cx,
221 terminate_timeout,
222 max_in_flight.clamp(1, 256),
223 host_mesh_id,
224 )
225 .await?;
226 Ok(())
227 }
228}
229
230impl TryFrom<ActorRef<HostAgent>> for HostRef {
231 type Error = crate::Error;
232
233 fn try_from(value: ActorRef<HostAgent>) -> Result<Self, crate::Error> {
234 let proc_id = value.actor_addr().proc_addr();
235 Ok(HostRef(proc_id.addr().clone()))
236 }
237}
238
/// Formats a host reference by delegating to the wrapped channel address.
impl std::fmt::Display for HostRef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegates to the address's own `fmt`; which formatting trait this
        // resolves to depends on the traits in scope for this file.
        self.0.fmt(f)
    }
}
244
245impl FromStr for HostRef {
246 type Err = <ChannelAddr as FromStr>::Err;
247
248 fn from_str(s: &str) -> Result<Self, Self::Err> {
249 Ok(HostRef(ChannelAddr::from_str(s)?))
250 }
251}
252
/// Why pushing client configuration to a single host failed.
#[derive(Debug, thiserror::Error)]
pub enum ConfigPushFailure {
    /// The request could not be sent; carries the underlying mailbox error.
    // NOTE(review): this variant is not constructed in the visible part of
    // this file -- confirm where it is produced.
    #[error("send failed: {0}")]
    SendFailed(#[source] Box<hyperactor::mailbox::MailboxSenderError>),

    /// No reply arrived within the per-host timeout.
    #[error("reply timed out after MESH_ATTACH_CONFIG_TIMEOUT")]
    ReplyTimedOut,

    /// Receiving on the reply port failed before any reply was delivered.
    #[error("reply channel closed before reply")]
    ReplyChannelClosed,
}
283
/// Aggregate error returned when a configuration push fails on one or more
/// hosts.
#[derive(Debug)]
pub struct ConfigPushError {
    /// One entry per failed host, pairing the host with its failure reason.
    pub failures: Vec<(HostRef, ConfigPushFailure)>,
}
296
297impl std::fmt::Display for ConfigPushError {
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 write!(
300 f,
301 "config push failed during attach on {} host(s):",
302 self.failures.len()
303 )?;
304 for (host, failure) in &self.failures {
305 write!(f, "\n - {}: {}", host, failure)?;
306 }
307 Ok(())
308 }
309}
310
impl std::error::Error for ConfigPushError {
    /// Always returns `None`: this error aggregates a list of per-host
    /// failures, which are reported through `Display` and the public
    /// `failures` field rather than as a single source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}
319
320async fn push_config_to_host(
330 cx: &impl context::Actor,
331 host: &HostRef,
332 attrs: hyperactor_config::attrs::Attrs,
333 per_host_timeout: Duration,
334) -> Result<(), ConfigPushFailure> {
335 use crate::host_mesh::host_agent::SetClientConfig;
336
337 let (reply_handle, mut reply_receiver) = hyperactor::mailbox::open_port::<()>(cx);
338 let reply_ref = reply_handle.bind();
339 let msg = SetClientConfig {
340 attrs,
341 done: reply_ref,
342 };
343
344 let mut request_port = host.mesh_agent().port::<SetClientConfig>();
345 request_port.return_undeliverable(false);
351
352 request_port.post(cx, msg);
353
354 match tokio::time::timeout(per_host_timeout, reply_receiver.recv()).await {
355 Ok(Ok(())) => Ok(()),
356 Ok(Err(_recv_err)) => Err(ConfigPushFailure::ReplyChannelClosed),
357 Err(_elapsed) => Err(ConfigPushFailure::ReplyTimedOut),
358 }
359}
360
/// An owned mesh of hosts.
///
/// Dereferences to [`HostMeshRef`] (via `current_ref`) for all read-only mesh
/// operations.
pub struct HostMesh {
    /// Identifier of this mesh.
    id: HostMeshId,
    /// Shape of the mesh, taken from the region it was created with.
    extent: Extent,
    /// Hosts this mesh is responsible for shutting down. Cleared by `stop`;
    /// read by the shutdown guard's `Drop` for best-effort cleanup.
    owned_hosts: Vec<HostRef>,
    /// The reference view of this mesh handed out through `Deref`/`AsRef`.
    current_ref: HostMeshRef,
}
379
380impl HostMesh {
381 fn notify_created(&self) {
383 let name_str = self.id.to_string();
384 let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
385
386 hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
387 id: mesh_id_hash,
388 timestamp: std::time::SystemTime::now(),
389 class: "Host".to_string(),
390 given_name: self
391 .id
392 .display_label()
393 .map(|l| l.as_str())
394 .unwrap_or("unnamed")
395 .to_string(),
396 full_name: name_str,
397 shape_json: serde_json::to_string(&self.extent).unwrap_or_default(),
398 parent_mesh_id: None,
399 parent_view_json: None,
400 });
401
402 let now = std::time::SystemTime::now();
405 for (rank, host) in self.current_ref.hosts().iter().enumerate() {
406 let actor = host.mesh_agent();
407 hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
408 id: hyperactor_telemetry::hash_to_u64(&actor.actor_addr()),
409 timestamp: now,
410 mesh_id: mesh_id_hash,
411 rank: rank as u64,
412 full_name: actor.actor_addr().to_string(),
413 display_name: None,
414 });
415 }
416 }
417
418 pub async fn local() -> crate::Result<HostMesh> {
441 Self::local_with_bootstrap(BootstrapCommand::current()?).await
442 }
443
444 pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> crate::Result<HostMesh> {
452 if let Ok(Some(boot)) = Bootstrap::get_from_env() {
453 let result = boot.bootstrap().await;
454 if let Err(err) = result {
455 tracing::error!("failed to bootstrap local host mesh process: {}", err);
456 }
457 std::process::exit(1);
458 }
459
460 let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
461
462 let manager = BootstrapProcManager::new(bootstrap_cmd)?;
463 let host = Host::new(manager, addr).await?;
464 let addr = host.addr().clone();
465 let system_proc = host.system_proc().clone();
466 let host_mesh_agent = system_proc
467 .spawn(
468 "host_agent",
469 HostAgent::new(HostAgentMode::Process {
470 host,
471 shutdown_tx: None,
472 }),
473 )
474 .map_err(crate::Error::SingletonActorSpawnError)?;
475 host_mesh_agent.bind::<HostAgent>();
476
477 let host = HostRef(addr);
478 let host_mesh_ref = HostMeshRef::new(
479 HostMeshId::instance(Label::new("local").unwrap()),
480 extent!(hosts = 1).into(),
481 vec![host],
482 )?;
483 Ok(HostMesh::take(host_mesh_ref))
484 }
485
486 pub async fn local_in_process() -> crate::Result<HostMesh> {
496 let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
497 Ok(HostMesh::take(Self::local_n_in_process(vec![addr]).await?))
498 }
499
500 pub(crate) async fn local_n_in_process(addrs: Vec<ChannelAddr>) -> crate::Result<HostMeshRef> {
509 let n = addrs.len();
510 let mut host_refs = Vec::with_capacity(n);
511 for addr in addrs {
512 host_refs.push(Self::create_in_process_host(addr).await?);
513 }
514 HostMeshRef::new(
515 HostMeshId::instance(Label::new("local").unwrap()),
516 extent!(hosts = n).into(),
517 host_refs,
518 )
519 }
520
521 async fn create_in_process_host(addr: ChannelAddr) -> crate::Result<HostRef> {
524 let spawn: ProcManagerSpawnFn =
525 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
526 let manager = LocalProcManager::new(spawn);
527 let host = Host::new(manager, addr).await?;
528 let addr = host.addr().clone();
529 let system_proc = host.system_proc().clone();
530 let host_mesh_agent = system_proc
531 .spawn(
532 host_agent::HOST_MESH_AGENT_ACTOR_NAME,
533 HostAgent::new(HostAgentMode::Local(host)),
534 )
535 .map_err(crate::Error::SingletonActorSpawnError)?;
536 host_mesh_agent.bind::<HostAgent>();
537 Ok(HostRef(addr))
538 }
539
540 pub async fn process(extent: Extent, command: BootstrapCommand) -> crate::Result<HostMesh> {
551 if let Ok(Some(boot)) = Bootstrap::get_from_env() {
552 let result = boot.bootstrap().await;
553 if let Err(err) = result {
554 tracing::error!("failed to bootstrap process host mesh process: {}", err);
555 }
556 std::process::exit(1);
557 }
558
559 let bind_spec = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT);
560 let mut hosts = Vec::with_capacity(extent.num_ranks());
561 for _ in 0..extent.num_ranks() {
562 let addr = bind_spec.binding_addr();
564 let bootstrap = Bootstrap::Host {
565 addr: addr.clone(),
566 command: Some(command.clone()),
567 config: Some(hyperactor_config::global::attrs()),
568 exit_on_shutdown: false,
569 };
570
571 let mut cmd = command.new();
572 bootstrap.to_env(&mut cmd);
573 cmd.spawn()?;
574 hosts.push(HostRef(addr));
575 }
576
577 let host_mesh_ref = HostMeshRef::new(
578 HostMeshId::instance(Label::new("process").unwrap()),
579 extent.into(),
580 hosts,
581 )?;
582 Ok(HostMesh::take(host_mesh_ref))
583 }
584 pub fn take(mesh: HostMeshRef) -> Self {
591 let region = mesh.region().clone();
592 let hosts: Vec<HostRef> = mesh.values().collect();
593
594 let current_ref = HostMeshRef::new(mesh.id.clone(), region.clone(), hosts.clone())
595 .expect("region/hosts cardinality must match");
596
597 let result = Self {
598 id: mesh.id,
599 extent: region.extent().clone(),
600 owned_hosts: hosts,
601 current_ref,
602 };
603 result.notify_created();
604 result
605 }
606
607 pub async fn attach(
621 cx: &impl context::Actor,
622 id: HostMeshId,
623 addresses: Vec<ChannelAddr>,
624 ) -> crate::Result<Self> {
625 let mesh_ref = HostMeshRef::from_hosts(id, addresses);
626 let config = hyperactor_config::global::propagatable_attrs();
627 mesh_ref.push_config(cx, config).await?;
628 Ok(Self::take(mesh_ref))
629 }
630
631 #[hyperactor::instrument(fields(host_mesh=self.id.to_string()))]
642 pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
643 let t0 = std::time::Instant::now();
644 tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
645
646 let results = futures::future::join_all(
649 self.current_ref
650 .values()
651 .map(|host| async move { host.drain(cx, None).await }),
652 )
653 .await;
654 let phase1_ms = t0.elapsed().as_millis();
655 for result in &results {
656 if let Err(e) = result {
657 tracing::warn!(
658 name = "HostMeshStatus",
659 status = "Shutdown::Drain::Failed",
660 error = %e,
661 "drain failed on a host"
662 );
663 }
664 }
665
666 let t1 = std::time::Instant::now();
668 let results = futures::future::join_all(self.current_ref.values().map(|host| async move {
669 let result = host.shutdown(cx).await;
670 (host, result)
671 }))
672 .await;
673 let phase2_ms = t1.elapsed().as_millis();
674 let total_ms = t0.elapsed().as_millis();
675 let mut failed_hosts = vec![];
676 for (host, result) in &results {
677 if let Err(e) = result {
678 tracing::warn!(
679 name = "HostMeshStatus",
680 status = "Shutdown::Host::Failed",
681 host = %host,
682 error = %e,
683 "host shutdown failed"
684 );
685 failed_hosts.push(host);
686 }
687 }
688 if failed_hosts.is_empty() {
689 tracing::info!(
690 name = "HostMeshStatus",
691 status = "Shutdown::Success",
692 phase1_ms,
693 phase2_ms,
694 total_ms,
695 );
696 } else {
697 tracing::error!(
698 name = "HostMeshStatus",
699 status = "Shutdown::Failed",
700 phase1_ms,
701 phase2_ms,
702 total_ms,
703 "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
704 failed_hosts
705 );
706 }
707
708 Ok(())
709 }
710
    /// Wraps this mesh in a [`HostMeshShutdownGuard`], whose `Drop`
    /// implementation attempts a best-effort shutdown of the owned hosts.
    pub fn shutdown_guard(self) -> HostMeshShutdownGuard {
        HostMeshShutdownGuard(self)
    }
716
717 #[hyperactor::instrument(fields(host_mesh=self.id.to_string()))]
724 pub async fn stop(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
725 let t0 = std::time::Instant::now();
726 tracing::info!(name = "HostMeshStatus", status = "Stop::Attempt");
727
728 let mesh_id = self.id.clone();
729 let results = futures::future::join_all(self.current_ref.values().map(|host| {
730 let mesh_id = Some(mesh_id.clone());
731 async move { host.drain(cx, mesh_id).await }
732 }))
733 .await;
734 let total_ms = t0.elapsed().as_millis();
735 let mut failed_hosts = vec![];
736 for (i, result) in results.iter().enumerate() {
737 if let Err(e) = result {
738 tracing::warn!(
739 name = "HostMeshStatus",
740 status = "Stop::Drain::Failed",
741 error = %e,
742 "drain failed on a host"
743 );
744 failed_hosts.push(i);
745 }
746 }
747 if failed_hosts.is_empty() {
748 tracing::info!(name = "HostMeshStatus", status = "Stop::Success", total_ms,);
749 } else {
750 tracing::error!(
751 name = "HostMeshStatus",
752 status = "Stop::Failed",
753 total_ms,
754 "host mesh stop failed; check the logs of the failed hosts for details: {:?}",
755 failed_hosts
756 );
757 }
758
759 self.owned_hosts.clear();
762
763 Ok(())
764 }
765}
766
767impl HostMesh {
768 pub fn set_bootstrap(&mut self, cmd: BootstrapCommand) {
773 self.current_ref = self.current_ref.clone().with_bootstrap(cmd);
774 }
775}
776
777impl Deref for HostMesh {
778 type Target = HostMeshRef;
779
780 fn deref(&self) -> &Self::Target {
781 &self.current_ref
782 }
783}
784
785impl AsRef<HostMeshRef> for HostMesh {
786 fn as_ref(&self) -> &HostMeshRef {
787 self
788 }
789}
790
/// Identity conversion, so APIs taking `impl AsRef<HostMeshRef>` accept both
/// owned meshes and plain references.
impl AsRef<HostMeshRef> for HostMeshRef {
    fn as_ref(&self) -> &HostMeshRef {
        self
    }
}
796
/// Wrapper around an owned [`HostMesh`] that attempts a best-effort shutdown
/// of the mesh's owned hosts when dropped.
pub struct HostMeshShutdownGuard(pub HostMesh);
799
800impl Deref for HostMeshShutdownGuard {
801 type Target = HostMesh;
802
803 fn deref(&self) -> &HostMesh {
804 &self.0
805 }
806}
807
808impl DerefMut for HostMeshShutdownGuard {
809 fn deref_mut(&mut self) -> &mut HostMesh {
810 &mut self.0
811 }
812}
813
/// Best-effort cleanup: when the guard is dropped, try to shut down every
/// host the wrapped mesh still owns.
///
/// `drop` cannot await, so the work is spawned as a task on the current Tokio
/// runtime and `drop` returns without waiting for it. If there is no current
/// runtime, the cleanup is skipped and a warning is logged.
impl Drop for HostMeshShutdownGuard {
    fn drop(&mut self) {
        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.0.id,
            status = "Dropping",
        );
        // Snapshot the hosts so the spawned task does not borrow `self`.
        let hosts: Vec<HostRef> = self.0.owned_hosts.clone();

        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let mesh_id = self.0.id.clone();
            let span = tracing::info_span!(
                "hostmesh_drop_cleanup",
                host_mesh = %mesh_id,
                hosts = hosts.len(),
            );

            handle.spawn(
                async move {
                    // The shutdown calls need an actor context, so build a
                    // throwaway proc and client instance just for cleanup.
                    match hyperactor::Proc::direct(
                        ChannelTransport::Unix.any(),
                        "hostmesh-drop".to_string(),
                    ) {
                        Err(e) => {
                            tracing::warn!(
                                error = %e,
                                "failed to construct ephemeral Proc for drop-cleanup; \
                                relying on PDEATHSIG/manager Drop"
                            );
                        }
                        Ok(proc) => match proc.client("drop") {
                            Err(e) => {
                                tracing::warn!(
                                    error = %e,
                                    "failed to create ephemeral instance for drop-cleanup; \
                                    relying on PDEATHSIG/manager Drop"
                                );
                            }
                            // `_guard` is held until the end of this arm, so
                            // it outlives every shutdown call below.
                            Ok((instance, _guard)) => {
                                let mut attempted = 0usize;
                                let mut ok = 0usize;
                                let mut err = 0usize;

                                // Hosts are shut down one at a time; a
                                // failure on one host does not stop the rest.
                                for host in hosts {
                                    attempted += 1;
                                    tracing::debug!(host = %host, "drop-cleanup: shutdown start");
                                    match host.shutdown(&instance).await {
                                        Ok(()) => {
                                            ok += 1;
                                            tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
                                        }
                                        Err(e) => {
                                            err += 1;
                                            tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
                                        }
                                    }
                                }

                                tracing::info!(
                                    attempted, ok, err,
                                    "hostmesh drop-cleanup summary"
                                );
                            }
                        },
                    }
                }
                .instrument(span),
            );
        } else {
            tracing::warn!(
                host_mesh = %self.0.id,
                hosts = hosts.len(),
                "HostMesh dropped without a Tokio runtime; skipping \
                best-effort shutdown. This indicates that .shutdown() \
                on this mesh has not been called before program exit \
                (perhaps due to a missing call to \
                'monarch.actor.shutdown_context()'?) This in turn can \
                lead to backtrace output due to folly SIGTERM \
                handlers."
            );
        }

        // Logged as soon as the cleanup task has been scheduled (or skipped),
        // not when the cleanup itself has finished.
        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.0.id,
            status = "Dropped",
        );
    }
}
926
927pub(crate) fn mesh_to_rankedvalues_with_default<T, F>(
936 mesh: &ValueMesh<T>,
937 default: T,
938 is_sentinel: F,
939 len: usize,
940) -> RankedValues<T>
941where
942 T: Eq + Clone + 'static,
943 F: Fn(&T) -> bool,
944{
945 let mut out = RankedValues::from((0..len, default));
946 for (i, s) in mesh.values().enumerate() {
947 if !is_sentinel(&s) {
948 out.merge_from(RankedValues::from((i..i + 1, s)));
949 }
950 }
951 out
952}
953
/// A cloneable, serializable reference to a mesh of hosts.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    /// Identifier of the referenced mesh.
    id: HostMeshId,
    /// Region describing the mesh's shape; the constructor `new` enforces
    /// that its rank count equals `ranks.len()`.
    region: Region,
    /// The hosts, in rank order. Shared, so clones of the reference are cheap.
    ranks: Arc<Vec<HostRef>>,
    /// Bootstrap command used when spawning procs without a per-rank
    /// bootstrap function. Defaults to `None` when absent from serialized
    /// data.
    #[serde(default)]
    pub bootstrap_command: Option<BootstrapCommand>,
}
984
/// Callback that chooses the bootstrap command for an individual proc, given
/// the proc's point in the combined (hosts x per-host) extent. Returning an
/// error aborts the spawn with a configuration error.
pub type PerRankBootstrapFn = dyn Fn(view::Point) -> anyhow::Result<BootstrapCommand> + Send + Sync;
wirevalue::register_type!(HostMeshRef);
992
993impl HostMeshRef {
994 #[allow(clippy::result_large_err)]
997 fn new(id: HostMeshId, region: Region, ranks: Vec<HostRef>) -> crate::Result<Self> {
998 if region.num_ranks() != ranks.len() {
999 return Err(crate::Error::InvalidRankCardinality {
1000 expected: region.num_ranks(),
1001 actual: ranks.len(),
1002 });
1003 }
1004 Ok(Self {
1005 id,
1006 region,
1007 ranks: Arc::new(ranks),
1008 bootstrap_command: None,
1009 })
1010 }
1011
1012 pub fn from_hosts(id: HostMeshId, hosts: Vec<ChannelAddr>) -> Self {
1015 Self {
1016 id,
1017 region: extent!(hosts = hosts.len()).into(),
1018 ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
1019 bootstrap_command: None,
1020 }
1021 }
1022
1023 pub fn from_host_agents(
1025 id: HostMeshId,
1026 agents: Vec<ActorRef<HostAgent>>,
1027 ) -> crate::Result<Self> {
1028 Ok(Self {
1029 id,
1030 region: extent!(hosts = agents.len()).into(),
1031 ranks: Arc::new(
1032 agents
1033 .into_iter()
1034 .map(HostRef::try_from)
1035 .collect::<crate::Result<_>>()?,
1036 ),
1037 bootstrap_command: None,
1038 })
1039 }
1040
1041 pub fn from_host_agent(id: HostMeshId, agent: ActorRef<HostAgent>) -> crate::Result<Self> {
1043 Ok(Self {
1044 id,
1045 region: Extent::unity().into(),
1046 ranks: Arc::new(vec![HostRef::try_from(agent)?]),
1047 bootstrap_command: None,
1048 })
1049 }
1050
1051 pub fn with_bootstrap(self, cmd: BootstrapCommand) -> Self {
1054 Self {
1055 bootstrap_command: Some(cmd),
1056 ..self
1057 }
1058 }
1059
1060 pub(crate) fn host_entries(&self) -> Vec<(String, ActorRef<HostAgent>)> {
1064 self.ranks
1065 .iter()
1066 .map(|h| (h.0.to_string(), h.mesh_agent()))
1067 .collect()
1068 }
1069
1070 pub(crate) async fn push_config(
1082 &self,
1083 cx: &impl context::Actor,
1084 attrs: hyperactor_config::attrs::Attrs,
1085 ) -> Result<(), ConfigPushError> {
1086 let timeout = hyperactor_config::global::get(crate::config::MESH_ATTACH_CONFIG_TIMEOUT);
1087 let hosts: Vec<_> = self.values().collect();
1088
1089 let per_host = hosts.into_iter().map(|host| {
1090 let attrs = attrs.clone();
1091 async move {
1092 (
1093 host.clone(),
1094 push_config_to_host(cx, &host, attrs, timeout).await,
1095 )
1096 }
1097 });
1098
1099 let results = futures::future::join_all(per_host).await;
1100
1101 let mut failures = Vec::new();
1102 let mut success = 0_usize;
1103 for (host, outcome) in results {
1104 match outcome {
1105 Ok(()) => {
1106 success += 1;
1107 tracing::debug!(host = %host, "host agent config installed");
1108 }
1109 Err(failure) => {
1110 tracing::warn!(host = %host, error = %failure, "config push failed");
1111 failures.push((host, failure));
1112 }
1113 }
1114 }
1115
1116 if failures.is_empty() {
1117 tracing::info!(success, "push_config complete");
1118 Ok(())
1119 } else {
1120 tracing::info!(
1121 success,
1122 failed = failures.len(),
1123 "push_config complete with failures",
1124 );
1125 Err(ConfigPushError { failures })
1126 }
1127 }
1128
1129 #[allow(clippy::result_large_err)]
1147 pub async fn spawn<C: context::Actor>(
1148 &self,
1149 cx: &C,
1150 name: &str,
1151 per_host: Extent,
1152 proc_bind: Option<Vec<ProcBind>>,
1153 per_rank_bootstrap: Option<Box<PerRankBootstrapFn>>,
1154 ) -> crate::Result<ProcMesh>
1155 where
1156 C::A: Handler<MeshFailure>,
1157 {
1158 self.spawn_inner(
1159 cx,
1160 ProcMeshId::instance(Label::strip(name)),
1161 per_host,
1162 proc_bind,
1163 per_rank_bootstrap,
1164 )
1165 .await
1166 }
1167
1168 #[hyperactor::instrument(fields(host_mesh=self.id.to_string(), proc_mesh=proc_mesh_id.to_string()))]
1169 async fn spawn_inner<C: context::Actor>(
1170 &self,
1171 cx: &C,
1172 proc_mesh_id: ProcMeshId,
1173 per_host: Extent,
1174 proc_bind: Option<Vec<ProcBind>>,
1175 per_rank_bootstrap: Option<Box<PerRankBootstrapFn>>,
1176 ) -> crate::Result<ProcMesh>
1177 where
1178 C::A: Handler<MeshFailure>,
1179 {
1180 tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
1181 tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
1182 let result = self
1183 .spawn_inner_inner(cx, proc_mesh_id, per_host, proc_bind, per_rank_bootstrap)
1184 .await;
1185 match &result {
1186 Ok(_) => {
1187 tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
1188 tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
1189 }
1190 Err(error) => {
1191 tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
1192 tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
1193 }
1194 }
1195 result
1196 }
1197
1198 async fn spawn_inner_inner<C: context::Actor>(
1199 &self,
1200 cx: &C,
1201 proc_mesh_id: ProcMeshId,
1202 per_host: Extent,
1203 proc_bind: Option<Vec<ProcBind>>,
1204 per_rank_bootstrap: Option<Box<PerRankBootstrapFn>>,
1205 ) -> crate::Result<ProcMesh>
1206 where
1207 C::A: Handler<MeshFailure>,
1208 {
1209 let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
1210 let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
1211 if !per_host_labels
1212 .intersection(&host_labels)
1213 .collect::<Vec<_>>()
1214 .is_empty()
1215 {
1216 return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
1217 "per_host dims overlap with existing dims when spawning proc mesh"
1218 )));
1219 }
1220 if let Some(proc_bind) = proc_bind.as_ref()
1221 && proc_bind.len() != per_host.num_ranks()
1222 {
1223 return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
1224 "proc_bind length does not match per_host extent"
1225 )));
1226 }
1227
1228 let extent = self
1229 .region
1230 .extent()
1231 .concat(&per_host)
1232 .map_err(|err| crate::Error::ConfigurationError(err.into()))?;
1233
1234 let region: Region = extent.clone().into();
1235
1236 tracing::info!(
1237 name = "ProcMeshStatus",
1238 status = "Spawn::Attempt",
1239 %region,
1240 "spawning proc mesh"
1241 );
1242
1243 let mut procs = Vec::new();
1244 let num_ranks = region.num_ranks();
1245 let (port, rx) = cx.mailbox().open_accum_port_opts(
1248 crate::StatusMesh::from_single(region.clone(), Status::NotExist),
1249 StreamingReducerOpts {
1250 max_update_interval: Some(Duration::from_millis(50)),
1251 initial_update_interval: None,
1252 },
1253 );
1254
1255 let mut proc_names = Vec::new();
1262 let client_config_override = hyperactor_config::global::propagatable_attrs();
1263 for (host_rank, host) in self.ranks.iter().enumerate() {
1264 for per_host_rank in 0..per_host.num_ranks() {
1265 let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
1266 let proc_name = ResourceId::instance(Label::strip(&format!(
1267 "{}-{}",
1268 proc_mesh_id
1269 .display_label()
1270 .map(|l| l.as_str())
1271 .unwrap_or("unnamed"),
1272 per_host_rank
1273 )));
1274 proc_names.push(proc_name.clone());
1275 let bind = proc_bind.as_ref().map(|v| v[per_host_rank].clone());
1276 let bootstrap_command = match per_rank_bootstrap.as_ref() {
1277 Some(f) => Some(
1278 f(extent
1279 .point_of_rank(create_rank)
1280 .expect("rank in combined extent"))
1281 .map_err(crate::Error::ConfigurationError)?,
1282 ),
1283 None => self.bootstrap_command.clone(),
1284 };
1285 let proc_spec = resource::ProcSpec {
1286 client_config_override: client_config_override.clone(),
1287 bootstrap_command,
1288 proc_bind: bind,
1289 host_mesh_id: Some(self.id.clone()),
1290 };
1291 host.mesh_agent()
1292 .create_or_update(
1293 cx,
1294 proc_name.clone(),
1295 resource::Rank::new(create_rank),
1296 proc_spec,
1297 )
1298 .await
1299 .map_err(|e| {
1300 crate::Error::HostMeshAgentConfigurationError(
1301 host.mesh_agent().actor_addr().clone(),
1302 format!("failed while creating proc: {}", e),
1303 )
1304 })?;
1305 let mut reply_port = port.bind();
1306 reply_port.return_undeliverable(false);
1309 host.mesh_agent()
1310 .get_rank_status(cx, proc_name.clone(), reply_port)
1311 .await
1312 .map_err(|e| {
1313 crate::Error::HostMeshAgentConfigurationError(
1314 host.mesh_agent().actor_addr().clone(),
1315 format!("failed while querying proc status: {}", e),
1316 )
1317 })?;
1318 let proc_id = host.named_proc(&proc_name);
1319 tracing::info!(
1320 name = "ProcMeshStatus",
1321 status = "Spawn::CreatingProc",
1322 %proc_id,
1323 rank = create_rank,
1324 );
1325 procs.push(crate::proc_mesh::ProcRef::new(
1326 proc_id,
1327 create_rank,
1328 ActorRef::attest(
1330 host.named_proc(&proc_name)
1331 .actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME),
1332 ),
1333 ));
1334 }
1335 }
1336
1337 let start_time = tokio::time::Instant::now();
1338
1339 match GetRankStatus::wait(
1342 rx,
1343 num_ranks,
1344 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1345 region.clone(), )
1347 .await
1348 {
1349 Ok(statuses) => {
1350 if let Some((rank, status)) = statuses
1353 .values()
1354 .enumerate()
1355 .find(|(_, s)| s.is_terminating())
1356 {
1357 let proc_name = &proc_names[rank];
1358 let host_rank = rank / per_host.num_ranks();
1359 let mesh_agent = self.ranks[host_rank].mesh_agent();
1360 let (reply_tx, mut reply_rx) = cx.mailbox().open_port();
1361 let mut reply_tx = reply_tx.bind();
1362 reply_tx.return_undeliverable(false);
1365 mesh_agent.post(
1366 cx,
1367 resource::GetState {
1368 id: proc_name.clone(),
1369 reply: reply_tx,
1370 },
1371 );
1372 let state = match tokio::time::timeout(
1373 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1374 reply_rx.recv(),
1375 )
1376 .await
1377 {
1378 Ok(Ok(state)) => state,
1379 _ => resource::State {
1380 id: proc_name.clone(),
1381 status,
1382 state: None,
1383 generation: 0,
1384 timestamp: std::time::SystemTime::now(),
1385 },
1386 };
1387
1388 tracing::error!(
1389 name = "ProcMeshStatus",
1390 status = "Spawn::GetRankStatus",
1391 rank = host_rank,
1392 "rank {} is terminating with state: {}",
1393 host_rank,
1394 state
1395 );
1396
1397 return Err(crate::Error::ProcCreationError {
1398 state: Box::new(state),
1399 host_rank,
1400 mesh_agent,
1401 });
1402 }
1403 }
1404 Err(complete) => {
1405 tracing::error!(
1406 name = "ProcMeshStatus",
1407 status = "Spawn::GetRankStatus",
1408 "timeout after {:?} when waiting for procs being created",
1409 hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
1410 );
1411 let legacy = mesh_to_rankedvalues_with_default(
1414 &complete,
1415 Status::Timeout(start_time.elapsed()),
1416 Status::is_not_exist,
1417 num_ranks,
1418 );
1419 return Err(crate::Error::ProcSpawnError { statuses: legacy });
1420 }
1421 }
1422
1423 let mut mesh = ProcMesh::create(cx, proc_mesh_id, extent, self.clone(), procs).await;
1424 if let Ok(ref mut mesh) = mesh {
1425 let mesh_ref: ProcMeshRef = (**mesh).clone();
1429 let region = ndslice::view::Ranked::region(&mesh_ref).clone();
1430 let initial_statuses: crate::ValueMesh<resource::Status> =
1431 std::iter::repeat_n(resource::Status::Running, region.num_ranks())
1432 .collect_mesh::<crate::ValueMesh<_>>(region)?;
1433 let controller = ProcMeshController::new(mesh_ref, None, None, initial_statuses);
1434 let controller_name = format!("{}_{}", PROC_MESH_CONTROLLER_NAME, mesh.id());
1437 let controller_handle =
1438 controller
1439 .spawn_with_name(cx, &controller_name)
1440 .map_err(|e| {
1441 crate::Error::ControllerActorSpawnError(mesh.id().resource_id().clone(), e)
1442 })?;
1443 let controller_ref: ActorRef<ProcMeshController> = controller_handle.bind();
1448 mesh.set_controller(Some(controller_ref));
1449 }
1450 mesh
1451 }
1452
    /// Returns the identifier of this host mesh.
    pub fn id(&self) -> &HostMeshId {
        &self.id
    }
1457
1458 pub fn hosts(&self) -> &[HostRef] {
1460 &self.ranks
1461 }
1462
    /// Asks the hosts to stop the given procs and waits for the resulting
    /// statuses.
    ///
    /// For every proc in `procs`, a [`resource::Stop`] carrying `reason` is
    /// posted to the mesh agent of the host at the proc's address, followed by
    /// an awaited `wait_rank_status` call for `Status::Stopped` that replies on
    /// a shared accumulating port. After all requests are out, the accumulated
    /// statuses are awaited through `GetRankStatus::wait`, configured with the
    /// `PROC_STOP_MAX_IDLE` config value.
    ///
    /// # Arguments
    ///
    /// * `cx` - actor context; its mailbox provides the accumulating port and
    ///   it is passed to every send.
    /// * `proc_mesh_id` - id of the proc mesh being stopped; only used in
    ///   tracing output.
    /// * `procs` - addresses of the procs to stop.
    /// * `region` - region of the status mesh; its rank count is the number of
    ///   statuses waited for.
    /// * `reason` - copied into every `Stop` message.
    ///
    /// # Errors
    ///
    /// * `Error::CallError` if a `wait_rank_status` call fails.
    /// * `Error::ProcMeshStopError` if the wait completes with any status for
    ///   which `is_terminated()` is false, or if the wait returns its error
    ///   variant (logged as `StoppingTimeout`).
    #[hyperactor::instrument(fields(host_mesh=self.id.to_string(), proc_mesh=proc_mesh_id.to_string()))]
    pub(crate) async fn stop_proc_mesh(
        &self,
        cx: &impl hyperactor::context::Actor,
        proc_mesh_id: &ProcMeshId,
        procs: impl IntoIterator<Item = ProcAddr>,
        region: Region,
        reason: String,
    ) -> crate::Result<crate::StatusMesh> {
        // Resource ids of every proc a Stop was sent to; only used for the
        // summary log line below.
        let mut proc_names = Vec::new();
        let num_ranks = region.num_ranks();
        // Accumulating port shared by all per-proc status subscriptions. The
        // accumulator starts from a mesh built with `Status::NotExist`
        // (presumably one entry per rank of `region` — confirm against
        // `StatusMesh::from_single`).
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );
        for proc_id in procs.into_iter() {
            let addr = proc_id.addr().clone();
            let proc_resource_id = ResourceId::new(proc_id.uid().clone(), proc_id.label().cloned());
            proc_names.push(proc_resource_id.clone());

            // The proc's address is used as the address of the host whose
            // mesh agent manages it.
            let host = HostRef(addr);
            // Post the stop request first, then subscribe to the proc's status
            // so the accumulating port receives the outcome.
            host.mesh_agent().post(
                cx,
                resource::Stop {
                    id: proc_resource_id.clone(),
                    reason: reason.clone(),
                },
            );
            host.mesh_agent()
                .wait_rank_status(cx, proc_resource_id, Status::Stopped, port.bind())
                .await
                .map_err(|e| crate::Error::CallError(host.mesh_agent().actor_addr().clone(), e))?;

            tracing::info!(
                name = "ProcMeshStatus",
                %proc_id,
                status = "Stop::Sent",
            );
        }
        tracing::info!(
            name = "HostMeshStatus",
            status = "ProcMesh::Stop::Sent",
            "sending Stop to proc mesh for {} procs: {}",
            proc_names.len(),
            proc_names
                .iter()
                .map(|n| n.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        );

        // Measured from after all requests were sent; reported in the
        // `Status::Timeout` default on the error path.
        let start_time = tokio::time::Instant::now();

        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_STOP_MAX_IDLE),
            region.clone(),
        )
        .await
        {
            Ok(statuses) => {
                // A completed wait is only a success if every rank reports a
                // terminated status.
                let all_stopped = statuses.values().all(|s| s.is_terminated());
                if !all_stopped {
                    // NOTE(review): presumably converts the mesh to the legacy
                    // ranked-values form, substituting the default where
                    // `is_not_exist` matches — confirm against the helper.
                    let legacy = mesh_to_rankedvalues_with_default(
                        &statuses,
                        Status::NotExist,
                        Status::is_not_exist,
                        num_ranks,
                    );
                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "FailedToStop",
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    );
                    return Err(crate::Error::ProcMeshStopError { statuses: legacy });
                }
                tracing::info!(name = "ProcMeshStatus", status = "Stopped");
                Ok(statuses)
            }
            Err(complete) => {
                // The wait gave up; `complete` holds what was accumulated so
                // far. `Status::Timeout` with the elapsed time is passed as the
                // default for the conversion.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "StoppingTimeout",
                    "failed to terminate proc mesh {} before timeout: {:?}",
                    proc_mesh_id,
                    legacy,
                );
                Err(crate::Error::ProcMeshStopError { statuses: legacy })
            }
        }
    }
1586
1587 #[allow(clippy::result_large_err)]
1595 pub(crate) async fn proc_states(
1596 &self,
1597 cx: &impl context::Actor,
1598 procs: impl IntoIterator<Item = ProcAddr>,
1599 region: Region,
1600 keepalive: Option<std::time::SystemTime>,
1601 ) -> crate::Result<ValueMesh<resource::State<ProcState>>> {
1602 let (tx, mut rx) = cx.mailbox().open_port();
1603
1604 let mut num_ranks = 0;
1605 let procs: Vec<ProcAddr> = procs.into_iter().collect();
1606 let mut proc_names = Vec::new();
1607 for proc_id in procs.iter() {
1608 num_ranks += 1;
1609 let addr = proc_id.addr().clone();
1610
1611 let host = HostRef(addr);
1614 let proc_resource_id = ResourceId::new(proc_id.uid().clone(), proc_id.label().cloned());
1615 proc_names.push(proc_resource_id.clone());
1616 let mut reply = tx.bind();
1617 reply.return_undeliverable(false);
1620 let mut send_port = host.mesh_agent().port();
1621 send_port.return_undeliverable(false);
1625 let get_state = resource::GetState {
1626 id: proc_resource_id,
1627 reply,
1628 };
1629 if let Some(expires_after) = keepalive {
1630 let mut keepalive_port = host.mesh_agent().port();
1631 keepalive_port.return_undeliverable(false);
1632 keepalive_port.post(
1633 cx,
1634 resource::KeepaliveGetState {
1635 expires_after,
1636 get_state,
1637 },
1638 );
1639 } else {
1640 send_port.post(cx, get_state);
1641 }
1642 }
1643
1644 let mut states = Vec::with_capacity(num_ranks);
1645 let timeout = hyperactor_config::global::get(GET_PROC_STATE_MAX_IDLE);
1646 for _ in 0..num_ranks {
1647 let state = tokio::time::timeout(timeout, rx.recv()).await;
1653 if let Ok(state) = state {
1654 let state = state?;
1656 match state.state {
1657 Some(ref inner) => {
1658 states.push((inner.create_rank, state));
1659 }
1660 None => {
1661 return Err(crate::Error::NotExist(state.id));
1662 }
1663 }
1664 } else {
1665 tracing::warn!(
1668 "Timeout waiting for response from host mesh agent for proc_states after {:?}",
1669 timeout
1670 );
1671 let all_ranks = (0..num_ranks).collect::<HashSet<_>>();
1672 let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
1673 let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
1674 assert_eq!(leftover_ranks.len(), num_ranks - states.len());
1675 while states.len() < num_ranks {
1676 let rank = *leftover_ranks
1677 .pop()
1678 .expect("leftover ranks should not be empty");
1679 states.push((
1680 rank,
1682 resource::State {
1683 id: proc_names[rank].clone(),
1684 status: resource::Status::Timeout(timeout),
1685 state: None,
1686 generation: 0,
1687 timestamp: std::time::SystemTime::now(),
1688 },
1689 ));
1690 }
1691 break;
1692 }
1693 }
1694 states.sort_by_key(|(rank, _)| *rank);
1698 let vm = states
1699 .into_iter()
1700 .map(|(_, state)| state)
1701 .collect_mesh::<ValueMesh<_>>(region)?;
1702 Ok(vm)
1703 }
1704}
1705
/// Insertion-ordered collection of host entries, deduplicated by the
/// `ActorAddr` of each entry's `HostAgent` reference.
struct HostSet {
    /// Agent addresses already recorded; `insert` consults this to reject
    /// duplicates.
    seen: HashSet<ActorAddr>,
    /// Accepted `(address string, agent reference)` pairs, in the order they
    /// were first inserted.
    entries: Vec<(String, ActorRef<HostAgent>)>,
}
1717
1718impl HostSet {
1719 fn new() -> Self {
1720 Self {
1721 seen: HashSet::new(),
1722 entries: Vec::new(),
1723 }
1724 }
1725
1726 fn insert(&mut self, addr: String, agent_ref: ActorRef<HostAgent>) {
1729 if self.seen.insert(agent_ref.actor_addr().clone()) {
1730 self.entries.push((addr, agent_ref));
1731 }
1732 }
1733
1734 fn extend_from_mesh(&mut self, mesh: &HostMeshRef) {
1736 for h in mesh.hosts() {
1737 self.insert(h.0.to_string(), h.mesh_agent());
1738 }
1739 }
1740
1741 fn into_vec(self) -> Vec<(String, ActorRef<HostAgent>)> {
1742 self.entries
1743 }
1744}
1745
1746fn aggregate_hosts(
1753 meshes: &[impl AsRef<HostMeshRef>],
1754 client_host_entries: Option<Vec<(String, ActorRef<HostAgent>)>>,
1755) -> Vec<(String, ActorRef<HostAgent>)> {
1756 let mut set = HostSet::new();
1757
1758 for mesh in meshes {
1760 set.extend_from_mesh(mesh.as_ref());
1761 }
1762
1763 if let Some(entries) = client_host_entries {
1765 for (addr, agent_ref) in entries {
1766 set.insert(addr, agent_ref);
1767 }
1768 }
1769
1770 set.into_vec()
1771}
1772
1773pub async fn spawn_admin(
1787 meshes: impl IntoIterator<Item = impl AsRef<HostMeshRef>>,
1788 cx: &impl hyperactor::context::Actor,
1789 admin_addr: Option<std::net::SocketAddr>,
1790 telemetry_url: Option<String>,
1791) -> anyhow::Result<ActorRef<MeshAdminAgent>> {
1792 let meshes: Vec<_> = meshes.into_iter().collect();
1793 anyhow::ensure!(!meshes.is_empty(), "at least one mesh is required (SA-1)");
1794 for (i, mesh) in meshes.iter().enumerate() {
1795 anyhow::ensure!(
1796 !mesh.as_ref().hosts().is_empty(),
1797 "mesh at index {} has no hosts (SA-2)",
1798 i,
1799 );
1800 }
1801
1802 let client_entries =
1803 crate::global_context::try_this_host().map(|client_host| client_host.host_entries());
1804 let hosts = aggregate_hosts(&meshes, client_entries);
1805
1806 let root_client_id = cx.mailbox().actor_addr().clone();
1807
1808 let local_proc = cx.instance().proc();
1811 let agent_handle = local_proc.spawn(
1812 crate::mesh_admin::MESH_ADMIN_ACTOR_NAME,
1813 crate::mesh_admin::MeshAdminAgent::new(
1814 hosts,
1815 Some(root_client_id),
1816 admin_addr,
1817 telemetry_url,
1818 ),
1819 )?;
1820 let admin_ref = agent_handle.bind();
1821 Ok(admin_ref)
1822}
1823
/// Exposes the mesh as a ranked view whose items are the [`HostRef`]s stored
/// in `ranks`.
impl view::Ranked for HostMeshRef {
    type Item = HostRef;

    /// Returns the region this mesh is laid out over (the `region` field).
    fn region(&self) -> &Region {
        &self.region
    }

    /// Returns the host at position `rank` in `ranks`, or `None` when `rank`
    /// is out of bounds.
    fn get(&self, rank: usize) -> Option<&Self::Item> {
        self.ranks.get(rank)
    }
}
1835
1836impl view::RankedSliceable for HostMeshRef {
1837 fn sliced(&self, region: Region) -> Self {
1838 let ranks = self
1839 .region()
1840 .remap(®ion)
1841 .unwrap()
1842 .map(|index| self.get(index).unwrap().clone());
1843 Self {
1844 bootstrap_command: self.bootstrap_command.clone(),
1845 ..Self::new(self.id.clone(), region, ranks.collect()).unwrap()
1846 }
1847 }
1848}
1849
1850impl std::fmt::Display for HostMeshRef {
1851 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1852 write!(f, "{}:", self.id)?;
1853 for (rank, host) in self.ranks.iter().enumerate() {
1854 if rank > 0 {
1855 write!(f, ",")?;
1856 }
1857 write!(f, "{}", host)?;
1858 }
1859 write!(f, "@{}", self.region)
1860 }
1861}
1862
/// Errors produced when parsing a [`HostMeshRef`] from its string form.
#[derive(thiserror::Error, Debug)]
pub enum HostMeshRefParseError {
    /// A region parse error, reported transparently.
    #[error(transparent)]
    RegionParseError(#[from] RegionParseError),

    /// The input has no `@` separating the host list from the region.
    #[error("invalid host mesh ref: missing region")]
    MissingRegion,

    /// The input has no `:` separating the id from the rest.
    #[error("invalid host mesh ref: missing id")]
    MissingId,

    /// A resource-id parse error, reported transparently.
    #[error(transparent)]
    InvalidId(#[from] crate::mesh_id::ResourceIdParseError),

    /// A crate-level error, boxed and reported transparently.
    #[error(transparent)]
    InvalidHostMeshRef(#[from] Box<crate::Error>),

    /// Any other error, carried as an `anyhow::Error`.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1884
1885impl From<crate::Error> for HostMeshRefParseError {
1886 fn from(err: crate::Error) -> Self {
1887 Self::InvalidHostMeshRef(Box::new(err))
1888 }
1889}
1890
1891impl FromStr for HostMeshRef {
1892 type Err = HostMeshRefParseError;
1893
1894 fn from_str(s: &str) -> Result<Self, Self::Err> {
1895 let (id_str, rest) = s.split_once(':').ok_or(HostMeshRefParseError::MissingId)?;
1896
1897 let id = HostMeshId::from_str(id_str)?;
1898
1899 let (hosts, region) = rest
1900 .split_once('@')
1901 .ok_or(HostMeshRefParseError::MissingRegion)?;
1902 let hosts = hosts
1903 .split(',')
1904 .map(|host| host.trim())
1905 .map(|host| host.parse::<HostRef>())
1906 .collect::<Result<Vec<_>, _>>()?;
1907 let region = region.parse()?;
1908 Ok(HostMeshRef::new(id, region, hosts)?)
1909 }
1910}
1911
1912#[cfg(test)]
1913mod tests {
1914 #[cfg(fbcode_build)]
1915 use std::assert_matches;
1916
1917 #[cfg(fbcode_build)]
1918 use hyperactor::config::ENABLE_DEST_ACTOR_REORDERING_BUFFER;
1919 #[cfg(fbcode_build)]
1920 use hyperactor_config::attrs::Attrs;
1921 use ndslice::ViewExt;
1922 use ndslice::extent;
1923 #[cfg(fbcode_build)]
1924 use timed_test::assert_no_process_leak;
1925 #[cfg(fbcode_build)]
1926 use tokio::process::Command;
1927
1928 use super::*;
1929 #[cfg(fbcode_build)]
1930 use crate::ActorMesh;
1931 #[cfg(fbcode_build)]
1932 use crate::Bootstrap;
1933 #[cfg(fbcode_build)]
1934 use crate::bootstrap::MESH_TAIL_LOG_LINES;
1935 #[cfg(fbcode_build)]
1936 use crate::comm::ENABLE_NATIVE_V1_CASTING;
1937 #[cfg(fbcode_build)]
1938 use crate::resource::Status;
1939 #[cfg(fbcode_build)]
1940 use crate::testactor;
1941 #[cfg(fbcode_build)]
1942 use crate::testactor::GetConfigAttrs;
1943 #[cfg(fbcode_build)]
1944 use crate::testactor::SetConfigAttrs;
1945 use crate::testing;
1946
1947 #[test]
1948 fn test_host_mesh_subset() {
1949 let hosts: HostMeshRef = "test:local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
1950 .parse()
1951 .unwrap();
1952 assert_eq!(
1953 hosts.range("replica", 1).unwrap().to_string(),
1954 "test:local:3,local:4@2+replica=1/2,host=2/1"
1955 );
1956 }
1957
1958 #[test]
1959 fn test_host_mesh_ref_parse_roundtrip() {
1960 let host_mesh_ref = HostMeshRef::new(
1961 HostMeshId::singleton(Label::new("test").unwrap()),
1962 extent!(replica = 2, host = 2).into(),
1963 vec![
1964 "tcp:127.0.0.1:123".parse().unwrap(),
1965 "tcp:127.0.0.1:123".parse().unwrap(),
1966 "tcp:127.0.0.1:123".parse().unwrap(),
1967 "tcp:127.0.0.1:123".parse().unwrap(),
1968 ],
1969 )
1970 .unwrap();
1971
1972 let parsed: HostMeshRef = host_mesh_ref.to_string().parse().unwrap();
1973 assert_eq!(parsed.id().to_string(), host_mesh_ref.id().to_string());
1974 assert_eq!(parsed.region(), host_mesh_ref.region());
1975 assert_eq!(parsed.hosts(), host_mesh_ref.hosts());
1976 assert_eq!(parsed.bootstrap_command, host_mesh_ref.bootstrap_command);
1977 }
1978
1979 fn free_localhost_addr() -> ChannelAddr {
1985 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1986 ChannelAddr::Tcp(listener.local_addr().unwrap())
1987 }
1988
1989 #[cfg(fbcode_build)]
1990 async fn execute_extrinsic_allocation(config: &hyperactor_config::global::ConfigLock) {
1991 let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
1992
1993 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1994
1995 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
1996
1997 let mut children = Vec::new();
1998 for host in hosts.iter() {
1999 let mut cmd = Command::new(program.clone());
2000 let boot = Bootstrap::Host {
2001 addr: host.clone(),
2002 command: None, config: None,
2004 exit_on_shutdown: false,
2005 };
2006 boot.to_env(&mut cmd);
2007 cmd.kill_on_drop(true);
2008 children.push(cmd.spawn().unwrap());
2009 }
2010
2011 let instance = testing::instance();
2012 let host_mesh =
2013 HostMeshRef::from_hosts(HostMeshId::singleton(Label::new("test").unwrap()), hosts);
2014
2015 let proc_mesh = host_mesh
2016 .spawn(&testing::instance(), "test", Extent::unity(), None, None)
2017 .await
2018 .unwrap();
2019
2020 let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
2021 .spawn(&testing::instance(), "test", &())
2022 .await
2023 .unwrap();
2024
2025 testactor::assert_mesh_shape(actor_mesh).await;
2026
2027 HostMesh::take(host_mesh)
2028 .shutdown(&instance)
2029 .await
2030 .expect("hosts shutdown");
2031 }
2032
2033 #[tokio::test]
2034 #[cfg(fbcode_build)]
2035 async fn test_extrinsic_allocation_v0() {
2036 let config = hyperactor_config::global::lock();
2037 let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
2038 execute_extrinsic_allocation(&config).await;
2039 }
2040
2041 #[tokio::test]
2042 #[cfg(fbcode_build)]
2043 async fn test_extrinsic_allocation_v1() {
2044 let config = hyperactor_config::global::lock();
2045 let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
2046 let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
2047 execute_extrinsic_allocation(&config).await;
2048 }
2049
2050 #[tokio::test]
2051 #[cfg(fbcode_build)]
2052 async fn test_failing_proc_allocation() {
2053 let lock = hyperactor_config::global::lock();
2054 let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);
2055
2056 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
2057
2058 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
2059
2060 let mut children = Vec::new();
2061 for host in hosts.iter() {
2062 let mut cmd = Command::new(program.clone());
2063 let boot = Bootstrap::Host {
2064 addr: host.clone(),
2065 config: None,
2066 command: Some(BootstrapCommand::from("false")),
2068 exit_on_shutdown: false,
2069 };
2070 boot.to_env(&mut cmd);
2071 cmd.kill_on_drop(true);
2072 children.push(cmd.spawn().unwrap());
2073 }
2074 let host_mesh =
2075 HostMeshRef::from_hosts(HostMeshId::singleton(Label::new("test").unwrap()), hosts);
2076
2077 let instance = testing::instance();
2078
2079 let err = host_mesh
2080 .spawn(&instance, "test", Extent::unity(), None, None)
2081 .await
2082 .unwrap_err();
2083 assert_matches!(
2084 err,
2085 crate::Error::ProcCreationError { state, .. }
2086 if matches!(state.status, resource::Status::Failed(ref msg) if msg.contains("failed to configure process: Ready(Terminal(Stopped { exit_code: 1"))
2087 );
2088 }
2089
2090 #[cfg(fbcode_build)]
2091 #[assert_no_process_leak]
2092 #[tokio::test]
2093 async fn test_halting_proc_allocation() {
2094 let config = hyperactor_config::global::lock();
2095 let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(20));
2096
2097 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
2098
2099 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
2100
2101 let mut children = Vec::new();
2102
2103 for (index, host) in hosts.iter().enumerate() {
2104 let mut cmd = Command::new(program.clone());
2105 let command = if index == 0 {
2106 let mut command = BootstrapCommand::from("sleep");
2107 command.args.push("60".to_string());
2108 Some(command)
2109 } else {
2110 None
2111 };
2112 let boot = Bootstrap::Host {
2113 addr: host.clone(),
2114 config: None,
2115 command,
2116 exit_on_shutdown: false,
2117 };
2118 boot.to_env(&mut cmd);
2119 cmd.kill_on_drop(true);
2120 children.push(cmd.spawn().unwrap());
2121 }
2122 let host_mesh =
2123 HostMeshRef::from_hosts(HostMeshId::singleton(Label::new("test").unwrap()), hosts);
2124
2125 let instance = testing::instance();
2126
2127 let err = host_mesh
2128 .spawn(&instance, "test", Extent::unity(), None, None)
2129 .await
2130 .unwrap_err();
2131 let statuses = err.into_proc_spawn_error().unwrap();
2132 assert_matches!(
2133 &statuses.materialized_iter(2).cloned().collect::<Vec<_>>()[..],
2134 &[Status::Timeout(_), Status::Running]
2135 );
2136 }
2137
    /// Checks which config attributes an actor on a spawned proc reports.
    ///
    /// The client overrides `HOST_SPAWN_READY_TIMEOUT` (2 min) and
    /// `MESSAGE_DELIVERY_TIMEOUT` (1 min), spawns a two-host mesh with one
    /// `TestActor` per proc, casts a `SetConfigAttrs` that sets
    /// `HOST_SPAWN_READY_TIMEOUT` to 3 min, and then reads back one
    /// `GetConfigAttrs` reply. The reply must show 3 min for
    /// `HOST_SPAWN_READY_TIMEOUT` and 1 min for `MESSAGE_DELIVERY_TIMEOUT`.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_client_config_override() {
        let config = hyperactor_config::global::lock();
        let _guard1 = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _guard2 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(2),
        );
        let _guard3 = config.override_key(
            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
            Duration::from_mins(1),
        );
        let _guard4 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_mins(2));

        // NOTE(review): the variables are presumably removed so that values
        // inherited from the environment cannot shadow the overrides above —
        // confirm. `remove_var` needs `unsafe` here; verify that no other
        // thread reads or writes the process environment at this point.
        unsafe {
            std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT");
            std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT");
        }

        let instance = testing::instance();

        let mut hm = testing::host_mesh(2).await;
        let proc_mesh = hm
            .spawn(instance, "test", Extent::unity(), None, None)
            .await
            .unwrap();
        // Sanity check: two procs with distinct ids.
        let proc_ids = proc_mesh
            .proc_ids()
            .map(|proc_addr| proc_addr.id().clone())
            .collect::<Vec<_>>();
        let unique_proc_ids = proc_ids.iter().collect::<std::collections::HashSet<_>>();

        assert_eq!(proc_ids.len(), 2);
        assert_eq!(unique_proc_ids.len(), proc_ids.len());

        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(instance, "test", &()).await.unwrap();

        // Push a different HOST_SPAWN_READY_TIMEOUT to the actors, encoded
        // with bincode's legacy configuration.
        let mut attrs_override = Attrs::new();
        attrs_override.set(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(3),
        );
        actor_mesh
            .cast(
                instance,
                SetConfigAttrs(
                    bincode::serde::encode_to_vec(&attrs_override, bincode::config::legacy())
                        .unwrap(),
                ),
            )
            .unwrap();

        // Ask the actors for their attributes; only the first reply to arrive
        // is inspected.
        let (tx, mut rx) = instance.open_port();
        actor_mesh
            .cast(instance, GetConfigAttrs(tx.bind()))
            .unwrap();
        let actual_attrs = rx.recv().await.unwrap();
        let actual_attrs =
            bincode::serde::decode_from_slice::<Attrs, _>(&actual_attrs, bincode::config::legacy())
                .map(|(v, _)| v)
                .unwrap();

        // The value set through `SetConfigAttrs` is reported...
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(3)
        );
        // ...and the client-side override of the other key is visible too.
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::MESSAGE_DELIVERY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(1)
        );

        // Best-effort cleanup: the shutdown result is deliberately ignored.
        let _ = hm.shutdown(instance).await;
    }
2221
2222 #[tokio::test]
2240 async fn test_attach_fails_closed_on_unreachable_host() {
2241 let config = hyperactor_config::global::lock();
2242 let _guard = config.override_key(
2245 crate::config::MESH_ATTACH_CONFIG_TIMEOUT,
2246 Duration::from_millis(500),
2247 );
2248
2249 let instance = testing::instance();
2250
2251 let unreachable = free_localhost_addr();
2256
2257 let id = HostMeshId::instance(Label::new("hm_test").unwrap());
2258 let result = HostMesh::attach(instance, id, vec![unreachable.clone()]).await;
2259
2260 let err = match result {
2262 Ok(_) => panic!("HM-2: attach must fail when a host is unreachable"),
2263 Err(e) => e,
2264 };
2265
2266 let push_err = match err {
2268 crate::Error::ConfigPushFailed(e) => e,
2269 other => panic!("expected ConfigPushFailed, got: {other:?}"),
2270 };
2271 assert_eq!(push_err.failures.len(), 1);
2272 let (failed_host, _failure) = &push_err.failures[0];
2273 assert_eq!(
2274 failed_host,
2275 &HostRef(unreachable),
2276 "HM-4: failure entry must identify the unreachable host"
2277 );
2278 }
2290
2291 #[tokio::test]
2292 async fn test_sa1_empty_mesh_set_rejected() {
2293 let instance = testing::instance();
2294 let result = spawn_admin(std::iter::empty::<&HostMeshRef>(), instance, None, None).await;
2295 let err = result.unwrap_err().to_string();
2296 assert!(err.contains("SA-1"), "expected SA-1 error, got: {err}");
2297 }
2298
2299 #[tokio::test]
2300 async fn test_sa2_empty_hosts_rejected() {
2301 let instance = testing::instance();
2302 let mesh =
2303 HostMeshRef::from_hosts(HostMeshId::singleton(Label::new("empty").unwrap()), vec![]);
2304 let result = spawn_admin([&mesh], instance, None, None).await;
2305 let err = result.unwrap_err().to_string();
2306 assert!(err.contains("SA-2"), "expected SA-2 error, got: {err}");
2307 }
2308
2309 #[test]
2314 fn test_sa3_host_set_insert_idempotent() {
2315 let addr_a: ChannelAddr = "tcp:127.0.0.1:2001".parse().unwrap();
2316 let addr_b: ChannelAddr = "tcp:127.0.0.1:2002".parse().unwrap();
2317
2318 let ref_a = HostRef(addr_a.clone()).mesh_agent();
2319 let ref_b = HostRef(addr_b.clone()).mesh_agent();
2320
2321 let mut set = HostSet::new();
2322 set.insert(addr_a.to_string(), ref_a.clone());
2323 set.insert(addr_b.to_string(), ref_b.clone());
2324 set.insert("duplicate_addr".to_string(), ref_a.clone());
2326
2327 let result = set.into_vec();
2328 assert_eq!(
2329 result.len(),
2330 2,
2331 "SA-3: duplicate ActorAddr must not add entry"
2332 );
2333 assert_eq!(
2334 result[0].0,
2335 addr_a.to_string(),
2336 "SA-3: first-seen order preserved"
2337 );
2338 assert_eq!(
2339 result[1].0,
2340 addr_b.to_string(),
2341 "SA-3: first-seen order preserved"
2342 );
2343 }
2344
2345 #[test]
2346 fn test_sa3_aggregate_hosts_dedup() {
2347 let addr_a: ChannelAddr = "tcp:127.0.0.1:1001".parse().unwrap();
2348 let addr_b: ChannelAddr = "tcp:127.0.0.1:1002".parse().unwrap();
2349 let addr_c: ChannelAddr = "tcp:127.0.0.1:1003".parse().unwrap();
2350
2351 let mesh_a = HostMeshRef::from_hosts(
2353 HostMeshId::singleton(Label::new("mesh-a").unwrap()),
2354 vec![addr_a.clone(), addr_b.clone()],
2355 );
2356 let mesh_b = HostMeshRef::from_hosts(
2358 HostMeshId::singleton(Label::new("mesh-b").unwrap()),
2359 vec![addr_b.clone(), addr_c.clone()],
2360 );
2361
2362 let result = aggregate_hosts(&[&mesh_a, &mesh_b], None);
2363
2364 assert_eq!(result.len(), 3, "expected 3 hosts, got {:?}", result);
2366
2367 let addrs: Vec<String> = result.iter().map(|(a, _)| a.clone()).collect();
2369 assert_eq!(addrs[0], addr_a.to_string());
2370 assert_eq!(addrs[1], addr_b.to_string());
2371 assert_eq!(addrs[2], addr_c.to_string());
2372 }
2373
2374 #[test]
2377 fn test_sa6_ch1_client_host_dedup() {
2378 let addr_a: ChannelAddr = "tcp:127.0.0.1:1001".parse().unwrap();
2379 let addr_b: ChannelAddr = "tcp:127.0.0.1:1002".parse().unwrap();
2380
2381 let mesh = HostMeshRef::from_hosts(
2382 HostMeshId::singleton(Label::new("mesh").unwrap()),
2383 vec![addr_a.clone(), addr_b.clone()],
2384 );
2385
2386 let client_ref = HostRef(addr_a.clone()).mesh_agent();
2388 let client_entries = vec![("client_addr".to_string(), client_ref)];
2389
2390 let result = aggregate_hosts(&[&mesh], Some(client_entries));
2391
2392 assert_eq!(result.len(), 2, "expected 2 hosts, got {:?}", result);
2394 let addrs: Vec<String> = result.iter().map(|(a, _)| a.clone()).collect();
2395 assert_eq!(addrs[0], addr_a.to_string());
2396 assert_eq!(addrs[1], addr_b.to_string());
2397 }
2398}