1use hyperactor::Actor;
10use hyperactor::Handler;
11use hyperactor::accum::StreamingReducerOpts;
12use hyperactor::channel::ChannelTransport;
13use hyperactor::host::Host;
14use hyperactor::host::LocalProcManager;
15use hyperactor::host::SERVICE_PROC_NAME;
16use hyperactor_config::CONFIG;
17use hyperactor_config::ConfigAttr;
18use hyperactor_config::attrs::declare_attrs;
19use ndslice::view::CollectMeshExt;
20
21use crate::supervision::MeshFailure;
22
23pub mod host_agent;
24
25use std::collections::HashSet;
26use std::hash::Hash;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30use std::time::Duration;
31
32use hyperactor::channel::ChannelAddr;
33use hyperactor::context;
34use hyperactor::reference as hyperactor_reference;
35use ndslice::Extent;
36use ndslice::Region;
37use ndslice::ViewExt;
38use ndslice::extent;
39use ndslice::view;
40use ndslice::view::Ranked;
41use ndslice::view::RegionParseError;
42use serde::Deserialize;
43use serde::Serialize;
44use typeuri::Named;
45
46use crate::Bootstrap;
47use crate::Name;
48use crate::ProcMesh;
49use crate::ProcMeshRef;
50use crate::ValueMesh;
51use crate::alloc::Alloc;
52use crate::bootstrap::BootstrapCommand;
53use crate::bootstrap::BootstrapProcManager;
54pub use crate::host_mesh::host_agent::HostAgent;
55use crate::host_mesh::host_agent::HostAgentMode;
56use crate::host_mesh::host_agent::HostMeshAgentProcMeshTrampoline;
57use crate::host_mesh::host_agent::ProcManagerSpawnFn;
58use crate::host_mesh::host_agent::ProcState;
59use crate::host_mesh::host_agent::SetClientConfigClient;
60use crate::host_mesh::host_agent::ShutdownHostClient;
61use crate::host_mesh::host_agent::SpawnMeshAdminClient;
62use crate::mesh_controller::HostMeshController;
63use crate::mesh_controller::ProcMeshController;
64use crate::proc_agent::ProcAgent;
65use crate::proc_mesh::ProcRef;
66use crate::resource;
67use crate::resource::CreateOrUpdateClient;
68use crate::resource::GetRankStatus;
69use crate::resource::GetRankStatusClient;
70use crate::resource::ProcSpec;
71use crate::resource::RankedValues;
72use crate::resource::Status;
73use crate::transport::DEFAULT_TRANSPORT;
74
/// Prefix of the actor name under which a host mesh's `HostMeshController`
/// is spawned; the mesh name is appended as `"{prefix}_{mesh_name}"`.
pub const HOST_MESH_CONTROLLER_NAME: &str = "host_mesh_controller";

/// Prefix of the actor name under which a proc mesh's `ProcMeshController`
/// is spawned; the mesh name is appended as `"{prefix}_{mesh_name}"`.
pub const PROC_MESH_CONTROLLER_NAME: &str = "proc_mesh_controller";
80
// Configuration attributes used by this module.
//
// NOTE(review): the two strings passed to `ConfigAttr::new` look like an
// environment variable name and a configuration key, respectively — confirm
// against `hyperactor_config`.
declare_attrs! {
    // Idle timeout handed to `GetRankStatus::wait` while waiting for newly
    // spawned procs to report status; also bounds the follow-up `GetState`
    // query issued when a proc is found terminating. Defaults to 30 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()),
        Some("mesh_proc_spawn_max_idle".to_string()),
    ))
    pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30);

    // Idle timeout handed to `GetRankStatus::wait` while waiting for procs
    // to stop. Defaults to 30 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()),
        Some("proc_stop_max_idle".to_string()),
    ))
    pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30);

    // Timeout applied to each reply awaited while collecting proc states.
    // Defaults to 1 minute.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()),
        Some("get_proc_state_max_idle".to_string()),
    ))
    pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1);
}
106
/// A reference to a single host, identified solely by the host's channel
/// address. Procs and the host agent on the host are derived from this
/// address by name.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostRef(ChannelAddr);
wirevalue::register_type!(HostRef);
111
112impl HostRef {
113 fn mesh_agent(&self) -> hyperactor_reference::ActorRef<HostAgent> {
115 hyperactor_reference::ActorRef::attest(
116 self.service_proc()
117 .actor_id(host_agent::HOST_MESH_AGENT_ACTOR_NAME, 0),
118 )
119 }
120
121 fn named_proc(&self, name: &Name) -> hyperactor_reference::ProcId {
123 hyperactor_reference::ProcId::with_name(self.0.clone(), name.to_string())
124 }
125
126 fn service_proc(&self) -> hyperactor_reference::ProcId {
128 hyperactor_reference::ProcId::with_name(self.0.clone(), SERVICE_PROC_NAME)
129 }
130
131 pub(crate) async fn shutdown(
150 &self,
151 cx: &impl hyperactor::context::Actor,
152 ) -> anyhow::Result<()> {
153 let agent = self.mesh_agent();
154 let terminate_timeout =
155 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_TIMEOUT);
156 let max_in_flight =
157 hyperactor_config::global::get(crate::bootstrap::MESH_TERMINATE_CONCURRENCY);
158 agent
159 .shutdown_host(cx, terminate_timeout, max_in_flight.clamp(1, 256))
160 .await?;
161 Ok(())
162 }
163}
164
165impl TryFrom<hyperactor_reference::ActorRef<HostAgent>> for HostRef {
166 type Error = crate::Error;
167
168 fn try_from(value: hyperactor_reference::ActorRef<HostAgent>) -> Result<Self, crate::Error> {
169 let proc_id = value.actor_id().proc_id();
170 Ok(HostRef(proc_id.addr().clone()))
171 }
172}
173
174impl std::fmt::Display for HostRef {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 self.0.fmt(f)
177 }
178}
179
180impl FromStr for HostRef {
181 type Err = <ChannelAddr as FromStr>::Err;
182
183 fn from_str(s: &str) -> Result<Self, Self::Err> {
184 Ok(HostRef(ChannelAddr::from_str(s)?))
185 }
186}
187
/// An owned mesh of hosts.
///
/// `HostMesh` dereferences to its [`HostMeshRef`] (`current_ref`), and its
/// `Drop` implementation attempts a best-effort shutdown of every host it
/// tracks.
#[allow(dead_code)]
pub struct HostMesh {
    /// The mesh's name.
    name: Name,
    /// The extent (shape) of the mesh.
    extent: Extent,
    /// How the hosts were obtained; see [`HostMeshAllocation`].
    allocation: HostMeshAllocation,
    /// The reference view of this mesh, returned by `Deref`.
    current_ref: HostMeshRef,
}
207
/// Records how the hosts of a [`HostMesh`] came into being.
#[allow(dead_code)]
enum HostMeshAllocation {
    /// The hosts were brought up on top of a proc mesh (see
    /// `HostMesh::allocate`). The proc mesh is kept so that it can be stopped
    /// when the host mesh is shut down.
    ProcMesh {
        /// The underlying proc mesh.
        proc_mesh: ProcMesh,
        /// A reference to the underlying proc mesh.
        proc_mesh_ref: ProcMeshRef,
        /// The hosts in this mesh.
        hosts: Vec<HostRef>,
    },
    /// The hosts were taken over from an existing [`HostMeshRef`] (see
    /// `HostMesh::take`); there is no backing proc mesh.
    Owned { hosts: Vec<HostRef> },
}
247
248impl HostMesh {
249 fn notify_created(&self) {
251 let name_str = self.name.to_string();
252 let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
253
254 hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
255 id: mesh_id_hash,
256 timestamp: std::time::SystemTime::now(),
257 class: "Host".to_string(),
258 given_name: self.name.name().to_string(),
259 full_name: name_str,
260 shape_json: serde_json::to_string(&self.extent).unwrap_or_default(),
261 parent_mesh_id: None,
262 parent_view_json: None,
263 });
264
265 let now = std::time::SystemTime::now();
268 for (rank, host) in self.current_ref.hosts().iter().enumerate() {
269 let actor = host.mesh_agent();
270 hyperactor_telemetry::notify_actor_created(hyperactor_telemetry::ActorEvent {
271 id: hyperactor_telemetry::hash_to_u64(actor.actor_id()),
272 timestamp: now,
273 mesh_id: mesh_id_hash,
274 rank: rank as u64,
275 full_name: actor.actor_id().to_string(),
276 display_name: None,
277 });
278 }
279 }
280
281 pub async fn local() -> crate::Result<HostMesh> {
304 Self::local_with_bootstrap(BootstrapCommand::current()?).await
305 }
306
307 pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> crate::Result<HostMesh> {
315 if let Ok(Some(boot)) = Bootstrap::get_from_env() {
316 let result = boot.bootstrap().await;
317 if let Err(err) = result {
318 tracing::error!("failed to bootstrap local host mesh process: {}", err);
319 }
320 std::process::exit(1);
321 }
322
323 let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
324
325 let manager = BootstrapProcManager::new(bootstrap_cmd)?;
326 let host = Host::new(manager, addr).await?;
327 let addr = host.addr().clone();
328 let system_proc = host.system_proc().clone();
329 let host_mesh_agent = system_proc
330 .spawn(
331 "host_agent",
332 HostAgent::new(HostAgentMode::Process {
333 host,
334 shutdown_tx: None,
335 }),
336 )
337 .map_err(crate::Error::SingletonActorSpawnError)?;
338 host_mesh_agent.bind::<HostAgent>();
339
340 let host = HostRef(addr);
341 let host_mesh_ref = HostMeshRef::new(
342 Name::new("local").unwrap(),
343 extent!(hosts = 1).into(),
344 vec![host],
345 )?;
346 Ok(HostMesh::take(host_mesh_ref))
347 }
348
349 pub async fn local_in_process() -> crate::Result<HostMesh> {
359 let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).binding_addr();
360 Ok(HostMesh::take(Self::local_n_in_process(vec![addr]).await?))
361 }
362
363 pub(crate) async fn local_n_in_process(addrs: Vec<ChannelAddr>) -> crate::Result<HostMeshRef> {
372 let n = addrs.len();
373 let mut host_refs = Vec::with_capacity(n);
374 for addr in addrs {
375 host_refs.push(Self::create_in_process_host(addr).await?);
376 }
377 HostMeshRef::new(
378 Name::new("local").unwrap(),
379 extent!(hosts = n).into(),
380 host_refs,
381 )
382 }
383
384 async fn create_in_process_host(addr: ChannelAddr) -> crate::Result<HostRef> {
387 let spawn: ProcManagerSpawnFn =
388 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
389 let manager = LocalProcManager::new(spawn);
390 let host = Host::new(manager, addr).await?;
391 let addr = host.addr().clone();
392 let system_proc = host.system_proc().clone();
393 let host_mesh_agent = system_proc
394 .spawn(
395 host_agent::HOST_MESH_AGENT_ACTOR_NAME,
396 HostAgent::new(HostAgentMode::Local(host)),
397 )
398 .map_err(crate::Error::SingletonActorSpawnError)?;
399 host_mesh_agent.bind::<HostAgent>();
400 Ok(HostRef(addr))
401 }
402
403 pub async fn process(extent: Extent, command: BootstrapCommand) -> crate::Result<HostMesh> {
414 if let Ok(Some(boot)) = Bootstrap::get_from_env() {
415 let result = boot.bootstrap().await;
416 if let Err(err) = result {
417 tracing::error!("failed to bootstrap process host mesh process: {}", err);
418 }
419 std::process::exit(1);
420 }
421
422 let bind_spec = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT);
423 let mut hosts = Vec::with_capacity(extent.num_ranks());
424 for _ in 0..extent.num_ranks() {
425 let addr = bind_spec.binding_addr();
427 let bootstrap = Bootstrap::Host {
428 addr: addr.clone(),
429 command: Some(command.clone()),
430 config: Some(hyperactor_config::global::attrs()),
431 exit_on_shutdown: false,
432 };
433
434 let mut cmd = command.new();
435 bootstrap.to_env(&mut cmd);
436 cmd.spawn()?;
437 hosts.push(HostRef(addr));
438 }
439
440 let host_mesh_ref = HostMeshRef::new(Name::new("process").unwrap(), extent.into(), hosts)?;
441 Ok(HostMesh::take(host_mesh_ref))
442 }
443
444 pub async fn allocate<C: context::Actor>(
488 cx: &C,
489 alloc: Box<dyn Alloc + Send + Sync>,
490 name: &str,
491 bootstrap_params: Option<BootstrapCommand>,
492 ) -> crate::Result<Self>
493 where
494 C::A: Handler<MeshFailure>,
495 {
496 Self::allocate_inner(cx, alloc, Name::new(name)?, bootstrap_params).await
497 }
498
499 #[hyperactor::instrument(fields(host_mesh=name.to_string()))]
501 async fn allocate_inner<C: context::Actor>(
502 cx: &C,
503 alloc: Box<dyn Alloc + Send + Sync>,
504 name: Name,
505 bootstrap_params: Option<BootstrapCommand>,
506 ) -> crate::Result<Self>
507 where
508 C::A: Handler<MeshFailure>,
509 {
510 tracing::info!(name = "HostMeshStatus", status = "Allocate::Attempt");
511 let transport = alloc.transport();
512 let extent = alloc.extent().clone();
513 let is_local = alloc.is_local();
514 let proc_mesh = ProcMesh::allocate(cx, alloc, name.name()).await?;
515
516 let (mesh_agents, mut mesh_agents_rx) = cx.mailbox().open_port();
521 let trampoline_name = Name::new("host_mesh_trampoline").unwrap();
522 let _trampoline_actor_mesh = proc_mesh
523 .spawn_with_name::<HostMeshAgentProcMeshTrampoline, C>(
524 cx,
525 trampoline_name,
526 &(transport, mesh_agents.bind(), bootstrap_params, is_local),
527 None,
528 true,
530 )
531 .await?;
532
533 let mut hosts = Vec::new();
535 for _rank in 0..extent.num_ranks() {
536 let mesh_agent = mesh_agents_rx.recv().await?;
537
538 let addr = mesh_agent.actor_id().proc_id().addr().clone();
539
540 let host_ref = HostRef(addr);
541 if host_ref.mesh_agent() != mesh_agent {
542 return Err(crate::Error::HostMeshAgentConfigurationError(
543 mesh_agent.actor_id().clone(),
544 format!(
545 "expected mesh agent actor id to be {}",
546 host_ref.mesh_agent().actor_id()
547 ),
548 ));
549 }
550 hosts.push(host_ref);
551 }
552
553 let proc_mesh_ref = proc_mesh.clone();
554 let mesh = Self {
555 name: name.clone(),
556 extent: extent.clone(),
557 allocation: HostMeshAllocation::ProcMesh {
558 proc_mesh,
559 proc_mesh_ref,
560 hosts: hosts.clone(),
561 },
562 current_ref: HostMeshRef::new(name, extent.into(), hosts).unwrap(),
563 };
564
565 let controller = HostMeshController::new(mesh.deref().clone());
568 let controller_name = format!("{}_{}", HOST_MESH_CONTROLLER_NAME, mesh.name());
571 let controller_handle = controller
572 .spawn_with_name(cx, &controller_name)
573 .map_err(|e| crate::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
574 let _: hyperactor::reference::ActorRef<HostMeshController> = controller_handle.bind();
579
580 tracing::info!(name = "HostMeshStatus", status = "Allocate::Created");
581
582 mesh.notify_created();
583
584 Ok(mesh)
585 }
586
587 pub fn take(mesh: HostMeshRef) -> Self {
594 let region = mesh.region().clone();
595 let hosts: Vec<HostRef> = mesh.values().collect();
596
597 let current_ref = HostMeshRef::new(mesh.name.clone(), region.clone(), hosts.clone())
598 .expect("region/hosts cardinality must match");
599
600 let result = Self {
601 name: mesh.name,
602 extent: region.extent().clone(),
603 allocation: HostMeshAllocation::Owned { hosts },
604 current_ref,
605 };
606 result.notify_created();
607 result
608 }
609
610 pub async fn attach(
623 cx: &impl context::Actor,
624 name: Name,
625 addresses: Vec<ChannelAddr>,
626 ) -> crate::Result<Self> {
627 let mesh_ref = HostMeshRef::from_hosts(name, addresses);
628 let config = hyperactor_config::global::propagatable_attrs();
629 mesh_ref.push_config(cx, config).await;
630 Ok(Self::take(mesh_ref))
631 }
632
633 #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
643 pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
644 tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
645 let mut failed_hosts = vec![];
646 for host in self.current_ref.values() {
647 if let Err(e) = host.shutdown(cx).await {
648 tracing::warn!(
649 name = "HostMeshStatus",
650 status = "Shutdown::Host::Failed",
651 host = %host,
652 error = %e,
653 "host shutdown failed"
654 );
655 failed_hosts.push(host);
656 }
657 }
658 if failed_hosts.is_empty() {
659 tracing::info!(name = "HostMeshStatus", status = "Shutdown::Success");
660 } else {
661 tracing::error!(
662 name = "HostMeshStatus",
663 status = "Shutdown::Failed",
664 "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
665 failed_hosts
666 );
667 }
668
669 match &mut self.allocation {
670 HostMeshAllocation::ProcMesh { proc_mesh, .. } => {
671 proc_mesh.stop(cx, "host mesh shutdown".to_string()).await?;
672 }
673 HostMeshAllocation::Owned { .. } => {}
674 }
675 Ok(())
676 }
677}
678
/// A `HostMesh` dereferences to its current [`HostMeshRef`], so all
/// reference-level operations are available on the owned mesh.
impl Deref for HostMesh {
    type Target = HostMeshRef;

    fn deref(&self) -> &Self::Target {
        &self.current_ref
    }
}
686
/// Best-effort cleanup when a `HostMesh` is dropped.
///
/// If a Tokio runtime is available, a detached task is spawned that creates
/// a temporary proc and instance and asks each host to shut down, logging
/// per-host results and a summary. If no runtime is available, the cleanup
/// is skipped and a warning is logged.
impl Drop for HostMesh {
    fn drop(&mut self) {
        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.name,
            status = "Dropping",
        );
        // Snapshot the hosts so the spawned task owns its own copy.
        let hosts: Vec<HostRef> = match &self.allocation {
            HostMeshAllocation::ProcMesh { hosts, .. } | HostMeshAllocation::Owned { hosts } => {
                hosts.clone()
            }
        };

        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            // Owned copies of everything the detached task logs.
            let mesh_name = self.name.clone();
            let allocation_label = match &self.allocation {
                HostMeshAllocation::ProcMesh { .. } => "proc_mesh",
                HostMeshAllocation::Owned { .. } => "owned",
            }
            .to_string();

            // The task is detached: its handle is dropped and never awaited.
            handle.spawn(async move {
                let span = tracing::info_span!(
                    "hostmesh_drop_cleanup",
                    host_mesh = %mesh_name,
                    allocation = %allocation_label,
                    hosts = hosts.len(),
                );
                let _g = span.enter();

                // A temporary proc on a Unix transport provides the context
                // needed to send the shutdown requests.
                match hyperactor::Proc::direct(
                    ChannelTransport::Unix.any(),
                    "hostmesh-drop".to_string(),
                )
                {
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            "failed to construct ephemeral Proc for drop-cleanup; \
                            relying on PDEATHSIG/manager Drop"
                        );
                    }
                    Ok(proc) => {
                        match proc.instance("drop") {
                            Err(e) => {
                                tracing::warn!(
                                    error = %e,
                                    "failed to create ephemeral instance for drop-cleanup; \
                                    relying on PDEATHSIG/manager Drop"
                                );
                            }
                            Ok((instance, _guard)) => {
                                // Counters for the summary log line below.
                                let mut attempted = 0usize;
                                let mut ok = 0usize;
                                let mut err = 0usize;

                                // Hosts are shut down one at a time; a
                                // failure does not stop the remaining ones.
                                for host in hosts {
                                    attempted += 1;
                                    tracing::debug!(host = %host, "drop-cleanup: shutdown start");
                                    match host.shutdown(&instance).await {
                                        Ok(()) => {
                                            ok += 1;
                                            tracing::debug!(host = %host, "drop-cleanup: shutdown ok");
                                        }
                                        Err(e) => {
                                            err += 1;
                                            tracing::warn!(host = %host, error = %e, "drop-cleanup: shutdown failed");
                                        }
                                    }
                                }

                                tracing::info!(
                                    attempted, ok, err,
                                    "hostmesh drop-cleanup summary"
                                );
                            }
                        }
                    }
                }
            });
        } else {
            // Without a runtime there is nothing to run the async shutdown on.
            tracing::warn!(
                host_mesh = %self.name,
                hosts = hosts.len(),
                "HostMesh dropped without a Tokio runtime; skipping \
                best-effort shutdown. This indicates that .shutdown() \
                on this mesh has not been called before program exit \
                (perhaps due to a missing call to \
                'monarch.actor.shutdown_context()'?) This in turn can \
                lead to backtrace output due to folly SIGTERM \
                handlers."
            );
        }

        // Logged as soon as cleanup has been scheduled (or skipped); the
        // spawned task may still be running at this point.
        tracing::info!(
            name = "HostMeshStatus",
            host_mesh = %self.name,
            status = "Dropped",
        );
    }
}
811
812pub(crate) fn mesh_to_rankedvalues_with_default<T, F>(
821 mesh: &ValueMesh<T>,
822 default: T,
823 is_sentinel: F,
824 len: usize,
825) -> RankedValues<T>
826where
827 T: Eq + Clone + 'static,
828 F: Fn(&T) -> bool,
829{
830 let mut out = RankedValues::from((0..len, default));
831 for (i, s) in mesh.values().enumerate() {
832 if !is_sentinel(&s) {
833 out.merge_from(RankedValues::from((i..i + 1, s)));
834 }
835 }
836 out
837}
838
/// A cloneable, serializable reference to a mesh of hosts.
///
/// The reference pairs a region with one `HostRef` per rank; constructing
/// one through `HostMeshRef::new` enforces that the two agree in size.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)]
pub struct HostMeshRef {
    /// The mesh's name.
    name: Name,
    /// The region (shape) this reference covers.
    region: Region,
    /// The hosts, indexed by rank; shared so that clones are cheap.
    ranks: Arc<Vec<HostRef>>,
}
wirevalue::register_type!(HostMeshRef);
864
865impl HostMeshRef {
866 #[allow(clippy::result_large_err)]
869 fn new(name: Name, region: Region, ranks: Vec<HostRef>) -> crate::Result<Self> {
870 if region.num_ranks() != ranks.len() {
871 return Err(crate::Error::InvalidRankCardinality {
872 expected: region.num_ranks(),
873 actual: ranks.len(),
874 });
875 }
876 Ok(Self {
877 name,
878 region,
879 ranks: Arc::new(ranks),
880 })
881 }
882
883 pub fn from_hosts(name: Name, hosts: Vec<ChannelAddr>) -> Self {
886 Self {
887 name,
888 region: extent!(hosts = hosts.len()).into(),
889 ranks: Arc::new(hosts.into_iter().map(HostRef).collect()),
890 }
891 }
892
893 pub fn from_host_agents(
895 name: Name,
896 agents: Vec<hyperactor_reference::ActorRef<HostAgent>>,
897 ) -> crate::Result<Self> {
898 Ok(Self {
899 name,
900 region: extent!(hosts = agents.len()).into(),
901 ranks: Arc::new(
902 agents
903 .into_iter()
904 .map(HostRef::try_from)
905 .collect::<crate::Result<_>>()?,
906 ),
907 })
908 }
909
910 pub fn from_host_agent(
912 name: Name,
913 agent: hyperactor_reference::ActorRef<HostAgent>,
914 ) -> crate::Result<Self> {
915 Ok(Self {
916 name,
917 region: Extent::unity().into(),
918 ranks: Arc::new(vec![HostRef::try_from(agent)?]),
919 })
920 }
921
922 pub(crate) fn host_entries(&self) -> Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> {
926 self.ranks
927 .iter()
928 .map(|h| (h.0.to_string(), h.mesh_agent()))
929 .collect()
930 }
931
932 pub(crate) async fn push_config(
941 &self,
942 cx: &impl context::Actor,
943 attrs: hyperactor_config::attrs::Attrs,
944 ) {
945 let timeout = hyperactor_config::global::get(crate::config::MESH_ATTACH_CONFIG_TIMEOUT);
946 let hosts: Vec<_> = self.values().collect();
947 let num_hosts = hosts.len();
948
949 let barrier = futures::future::join_all(hosts.into_iter().map(|host| {
950 let attrs = attrs.clone();
951 let agent_id = host.mesh_agent().actor_id().clone();
952 async move {
953 match host.mesh_agent().set_client_config(cx, attrs).await {
954 Ok(()) => {
955 tracing::debug!(host = %agent_id, "host agent config installed");
956 true
957 }
958 Err(e) => {
959 tracing::warn!(
960 host = %agent_id,
961 error = %e,
962 "failed to push client config to host agent, \
963 continuing without it",
964 );
965 false
966 }
967 }
968 }
969 }));
970
971 match tokio::time::timeout(timeout, barrier).await {
972 Ok(results) => {
973 let success = results.iter().filter(|&&r| r).count();
974 let failed = num_hosts - success;
975 tracing::info!(
976 success = success,
977 failed = failed,
978 "push_config barrier complete",
979 );
980 }
981 Err(_) => {
982 tracing::warn!(
983 num_hosts = num_hosts,
984 timeout_secs = timeout.as_secs(),
985 "push_config barrier timed out, some hosts may not \
986 have received client config",
987 );
988 }
989 }
990 }
991
992 #[allow(clippy::result_large_err)]
998 pub async fn spawn<C: context::Actor>(
999 &self,
1000 cx: &C,
1001 name: &str,
1002 per_host: Extent,
1003 ) -> crate::Result<ProcMesh>
1004 where
1005 C::A: Handler<MeshFailure>,
1006 {
1007 self.spawn_inner(cx, Name::new(name)?, per_host).await
1008 }
1009
1010 #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
1011 async fn spawn_inner<C: context::Actor>(
1012 &self,
1013 cx: &C,
1014 proc_mesh_name: Name,
1015 per_host: Extent,
1016 ) -> crate::Result<ProcMesh>
1017 where
1018 C::A: Handler<MeshFailure>,
1019 {
1020 tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
1021 tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
1022 let result = self.spawn_inner_inner(cx, proc_mesh_name, per_host).await;
1023 match &result {
1024 Ok(_) => {
1025 tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
1026 tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
1027 }
1028 Err(error) => {
1029 tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
1030 tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
1031 }
1032 }
1033 result
1034 }
1035
    /// Creates `per_host.num_ranks()` procs on every host of this mesh and
    /// assembles them into a `ProcMesh` whose extent is this mesh's extent
    /// concatenated with `per_host`.
    ///
    /// For each proc, the host agent is asked to create it and then to
    /// report its status on a shared accumulating port. Once all requests
    /// are sent, the statuses are awaited (bounded by `PROC_SPAWN_MAX_IDLE`).
    /// On success a `ProcMeshController` is spawned for the new mesh.
    ///
    /// # Errors
    /// - `ConfigurationError` if `per_host` shares a dimension label with
    ///   this mesh, or the extents cannot be concatenated.
    /// - `HostMeshAgentConfigurationError` if a create or status request to a
    ///   host agent fails.
    /// - `ProcCreationError` if some proc reports a terminating status.
    /// - `ProcSpawnError` if waiting for statuses fails (logged as a timeout).
    /// - `ControllerActorSpawnError` if the controller cannot be spawned.
    async fn spawn_inner_inner<C: context::Actor>(
        &self,
        cx: &C,
        proc_mesh_name: Name,
        per_host: Extent,
    ) -> crate::Result<ProcMesh>
    where
        C::A: Handler<MeshFailure>,
    {
        // The per-host dimensions are appended to the host dimensions, so
        // the two label sets must not overlap.
        let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
        let host_labels = self.region.labels().iter().collect::<HashSet<_>>();
        if !per_host_labels
            .intersection(&host_labels)
            .collect::<Vec<_>>()
            .is_empty()
        {
            return Err(crate::Error::ConfigurationError(anyhow::anyhow!(
                "per_host dims overlap with existing dims when spawning proc mesh"
            )));
        }

        let extent = self
            .region
            .extent()
            .concat(&per_host)
            .map_err(|err| crate::Error::ConfigurationError(err.into()))?;

        let region: Region = extent.clone().into();

        tracing::info!(
            name = "ProcMeshStatus",
            status = "Spawn::Attempt",
            %region,
            "spawning proc mesh"
        );

        let mut procs = Vec::new();
        let num_ranks = region.num_ranks();
        // Accumulating port for per-rank statuses; every rank starts out as
        // `NotExist` until its host agent reports otherwise.
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );

        let mut proc_names = Vec::new();
        // The client's propagatable configuration is sent with every proc spec.
        let client_config_override = hyperactor_config::global::propagatable_attrs();
        for (host_rank, host) in self.ranks.iter().enumerate() {
            for per_host_rank in 0..per_host.num_ranks() {
                // Rank of this proc within the full proc mesh (hosts major).
                let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
                let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank))?;
                proc_names.push(proc_name.clone());
                host.mesh_agent()
                    .create_or_update(
                        cx,
                        proc_name.clone(),
                        resource::Rank::new(create_rank),
                        ProcSpec::new(client_config_override.clone()),
                    )
                    .await
                    .map_err(|e| {
                        crate::Error::HostMeshAgentConfigurationError(
                            host.mesh_agent().actor_id().clone(),
                            format!("failed while creating proc: {}", e),
                        )
                    })?;
                let mut reply_port = port.bind();
                // NOTE(review): presumably suppresses undeliverable-message
                // returns for status replies — confirm against hyperactor.
                reply_port.return_undeliverable(false);
                host.mesh_agent()
                    .get_rank_status(cx, proc_name.clone(), reply_port)
                    .await
                    .map_err(|e| {
                        crate::Error::HostMeshAgentConfigurationError(
                            host.mesh_agent().actor_id().clone(),
                            format!("failed while querying proc status: {}", e),
                        )
                    })?;
                let proc_id = host.named_proc(&proc_name);
                tracing::info!(
                    name = "ProcMeshStatus",
                    status = "Spawn::CreatingProc",
                    %proc_id,
                    rank = create_rank,
                );
                procs.push(ProcRef::new(
                    proc_id,
                    create_rank,
                    // The proc agent reference is attested, not verified.
                    hyperactor_reference::ActorRef::attest(
                        host.named_proc(&proc_name)
                            .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
                    ),
                ));
            }
        }

        let start_time = tokio::time::Instant::now();

        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
            region.clone(),
        )
        .await
        {
            Ok(statuses) => {
                // Report the first rank found in a terminating state, if any.
                if let Some((rank, status)) = statuses
                    .values()
                    .enumerate()
                    .find(|(_, s)| s.is_terminating())
                {
                    let proc_name = &proc_names[rank];
                    let host_rank = rank / per_host.num_ranks();
                    let mesh_agent = self.ranks[host_rank].mesh_agent();
                    // Ask the host agent for the proc's full state to enrich
                    // the error.
                    let (reply_tx, mut reply_rx) = cx.mailbox().open_port();
                    let mut reply_tx = reply_tx.bind();
                    reply_tx.return_undeliverable(false);
                    mesh_agent
                        .send(
                            cx,
                            resource::GetState {
                                name: proc_name.clone(),
                                reply: reply_tx,
                            },
                        )
                        .map_err(|e| {
                            crate::Error::SendingError(mesh_agent.actor_id().clone(), e.into())
                        })?;
                    // If the state cannot be fetched in time, fall back to a
                    // state carrying only the status already observed.
                    let state = match tokio::time::timeout(
                        hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
                        reply_rx.recv(),
                    )
                    .await
                    {
                        Ok(Ok(state)) => state,
                        _ => resource::State {
                            name: proc_name.clone(),
                            status,
                            state: None,
                        },
                    };

                    // Note: the rank logged and reported here is the host rank.
                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "Spawn::GetRankStatus",
                        rank = host_rank,
                        "rank {} is terminating with state: {}",
                        host_rank,
                        state
                    );

                    return Err(crate::Error::ProcCreationError {
                        state: Box::new(state),
                        host_rank,
                        mesh_agent,
                    });
                }
            }
            Err(complete) => {
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "Spawn::GetRankStatus",
                    "timeout after {:?} when waiting for procs being created",
                    hyperactor_config::global::get(PROC_SPAWN_MAX_IDLE),
                );
                // Ranks still reported as `NotExist` are presented as timed out.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                return Err(crate::Error::ProcSpawnError { statuses: legacy });
            }
        }

        let mesh =
            ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await;
        if let Ok(ref mesh) = mesh {
            let controller = ProcMeshController::new(mesh.deref().clone());
            let controller_name = format!("{}_{}", PROC_MESH_CONTROLLER_NAME, mesh.name());
            let controller_handle = controller
                .spawn_with_name(cx, &controller_name)
                .map_err(|e| crate::Error::ControllerActorSpawnError(mesh.name().clone(), e))?;
            // Bind the controller's handlers; the resulting reference is not kept.
            let _: hyperactor::reference::ActorRef<ProcMeshController> = controller_handle.bind();
        }
        mesh
    }
1251
    /// The name of this host mesh.
    pub fn name(&self) -> &Name {
        &self.name
    }
1256
    /// The hosts of this mesh as a slice, indexed by rank.
    pub fn hosts(&self) -> &[HostRef] {
        &self.ranks
    }
1261
1262 pub async fn spawn_admin(
1271 &self,
1272 cx: &impl hyperactor::context::Actor,
1273 admin_addr: Option<std::net::SocketAddr>,
1274 ) -> anyhow::Result<String> {
1275 let mut hosts: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)> = self
1276 .ranks
1277 .iter()
1278 .map(|h| (h.0.to_string(), h.mesh_agent()))
1279 .collect();
1280
1281 if let Some(client_host) = crate::global_context::try_this_host() {
1284 for (addr, agent_ref) in client_host.host_entries() {
1285 let agent_id = agent_ref.actor_id();
1286 if !hosts
1287 .iter()
1288 .any(|(_, existing)| existing.actor_id() == agent_id)
1289 {
1290 hosts.push((addr, agent_ref));
1291 }
1292 }
1293 }
1294
1295 let root_client_id = cx.mailbox().actor_id().clone();
1296
1297 let head_agent = self.ranks[0].mesh_agent();
1298 let addr = head_agent
1299 .spawn_mesh_admin(cx, hosts, Some(root_client_id), admin_addr)
1300 .await?;
1301
1302 Ok(addr)
1303 }
1304
    /// Stops the given procs of proc mesh `proc_mesh_name` and waits for
    /// them to report a terminating status.
    ///
    /// For each proc, a `Stop` message (carrying `reason`) is sent to the
    /// agent of the host at the proc's address, followed by a status query
    /// that replies on a shared accumulating port. The statuses are then
    /// awaited, bounded by `PROC_STOP_MAX_IDLE`.
    ///
    /// # Errors
    /// Fails if a proc name cannot be parsed as a `Name`, if a message cannot
    /// be sent, if any proc reports a non-terminating status, or if waiting
    /// for statuses fails (reported as a timeout).
    #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
    pub(crate) async fn stop_proc_mesh(
        &self,
        cx: &impl hyperactor::context::Actor,
        proc_mesh_name: &Name,
        procs: impl IntoIterator<Item = hyperactor_reference::ProcId>,
        region: Region,
        reason: String,
    ) -> anyhow::Result<()> {
        let mut proc_names = Vec::new();
        let num_ranks = region.num_ranks();
        // Accumulating port for per-rank statuses; every rank starts out as
        // `NotExist` until its host agent reports otherwise.
        let (port, rx) = cx.mailbox().open_accum_port_opts(
            crate::StatusMesh::from_single(region.clone(), Status::NotExist),
            StreamingReducerOpts {
                max_update_interval: Some(Duration::from_millis(50)),
                initial_update_interval: None,
            },
        );
        for proc_id in procs.into_iter() {
            let (addr, proc_name) = (proc_id.addr().clone(), proc_id.name().to_string());
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());

            // The host is derived from the proc's own address rather than
            // looked up in `self.ranks`.
            let host = HostRef(addr);
            host.mesh_agent().send(
                cx,
                resource::Stop {
                    name: proc_name.clone(),
                    reason: reason.clone(),
                },
            )?;
            host.mesh_agent()
                .get_rank_status(cx, proc_name, port.bind())
                .await?;

            tracing::info!(
                name = "ProcMeshStatus",
                %proc_id,
                status = "Stop::Sent",
            );
        }
        tracing::info!(
            name = "HostMeshStatus",
            status = "ProcMesh::Stop::Sent",
            "sending Stop to proc mesh for {} procs: {}",
            proc_names.len(),
            proc_names
                .iter()
                .map(|n| n.to_string())
                .collect::<Vec<_>>()
                .join(", ")
        );

        let start_time = tokio::time::Instant::now();

        match GetRankStatus::wait(
            rx,
            num_ranks,
            hyperactor_config::global::get(PROC_STOP_MAX_IDLE),
            region.clone(),
        )
        .await
        {
            Ok(statuses) => {
                // Every rank must have reached a terminating status.
                let all_stopped = statuses.values().all(|s| s.is_terminating());
                if !all_stopped {
                    tracing::error!(
                        name = "ProcMeshStatus",
                        status = "FailedToStop",
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    );
                    return Err(anyhow::anyhow!(
                        "failed to terminate proc mesh: {:?}",
                        statuses,
                    ));
                }
                tracing::info!(name = "ProcMeshStatus", status = "Stopped");
            }
            Err(complete) => {
                // Ranks still reported as `NotExist` are presented as timed out.
                let legacy = mesh_to_rankedvalues_with_default(
                    &complete,
                    Status::Timeout(start_time.elapsed()),
                    Status::is_not_exist,
                    num_ranks,
                );
                tracing::error!(
                    name = "ProcMeshStatus",
                    status = "StoppingTimeout",
                    "failed to terminate proc mesh before timeout: {:?}",
                    legacy,
                );
                return Err(anyhow::anyhow!(
                    "failed to terminate proc mesh {} before timeout: {:?}",
                    proc_mesh_name,
                    legacy
                ));
            }
        }
        Ok(())
    }
1417
    /// Queries the state of each proc in `procs` from the agent of the host
    /// at that proc's address, and returns the states as a `ValueMesh` over
    /// `region`, ordered by rank.
    ///
    /// Each reply is awaited for at most `GET_PROC_STATE_MAX_IDLE`. On the
    /// first timeout, collection stops and every rank in `0..num_ranks` that
    /// has not replied is filled in with a `Status::Timeout` state.
    ///
    /// # Errors
    /// Fails if a proc name cannot be parsed as a `Name`, if a request cannot
    /// be sent (`CallError`), if receiving a reply fails, if a reply carries
    /// no inner state (`NotExist`), or if the mesh cannot be assembled.
    ///
    /// # Panics
    /// In the timeout path, panics if the number of unanswered ranks in
    /// `0..num_ranks` does not equal the number of missing replies.
    /// NOTE(review): this assumes every reply's `create_rank` is unique and
    /// lies within `0..num_ranks` — confirm for subsets of a larger mesh.
    #[allow(clippy::result_large_err)]
    pub(crate) async fn proc_states(
        &self,
        cx: &impl context::Actor,
        procs: impl IntoIterator<Item = hyperactor_reference::ProcId>,
        region: Region,
    ) -> crate::Result<ValueMesh<resource::State<ProcState>>> {
        // All replies arrive on this single port.
        let (tx, mut rx) = cx.mailbox().open_port();

        let mut num_ranks = 0;
        let procs: Vec<hyperactor_reference::ProcId> = procs.into_iter().collect();
        let mut proc_names = Vec::new();
        for proc_id in procs.iter() {
            num_ranks += 1;
            let (addr, proc_name) = (proc_id.addr().clone(), proc_id.name().to_string());

            // The host is derived from the proc's own address rather than
            // looked up in `self.ranks`.
            let host = HostRef(addr);
            let proc_name = proc_name.parse::<Name>()?;
            proc_names.push(proc_name.clone());
            let mut reply = tx.bind();
            // NOTE(review): presumably suppresses undeliverable-message
            // returns for state replies — confirm against hyperactor.
            reply.return_undeliverable(false);
            host.mesh_agent()
                .send(
                    cx,
                    resource::GetState {
                        name: proc_name,
                        reply,
                    },
                )
                .map_err(|e| {
                    crate::Error::CallError(host.mesh_agent().actor_id().clone(), e.into())
                })?;
        }

        let mut states = Vec::with_capacity(num_ranks);
        let timeout = hyperactor_config::global::get(GET_PROC_STATE_MAX_IDLE);
        for _ in 0..num_ranks {
            let state = tokio::time::timeout(timeout, rx.recv()).await;
            if let Ok(state) = state {
                // The reply arrived in time; a receive error is still fatal.
                let state = state?;
                match state.state {
                    Some(ref inner) => {
                        // Replies arrive in any order; key each by the rank
                        // the proc was created with.
                        states.push((inner.create_rank, state));
                    }
                    None => {
                        return Err(crate::Error::NotExist(state.name));
                    }
                }
            } else {
                tracing::warn!(
                    "Timeout waiting for response from host mesh agent for proc_states after {:?}",
                    timeout
                );
                // Back-fill every rank that has not answered with a timeout
                // state, then stop waiting for further replies.
                let all_ranks = (0..num_ranks).collect::<HashSet<_>>();
                let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::<HashSet<_>>();
                let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::<Vec<_>>();
                assert_eq!(leftover_ranks.len(), num_ranks - states.len());
                while states.len() < num_ranks {
                    let rank = *leftover_ranks
                        .pop()
                        .expect("leftover ranks should not be empty");
                    states.push((
                        rank,
                        resource::State {
                            name: proc_names[rank].clone(),
                            status: resource::Status::Timeout(timeout),
                            state: None,
                        },
                    ));
                }
                break;
            }
        }
        // Restore rank order before building the mesh.
        states.sort_by_key(|(rank, _)| *rank);
        let vm = states
            .into_iter()
            .map(|(_, state)| state)
            .collect_mesh::<ValueMesh<_>>(region)?;
        Ok(vm)
    }
1518}
1519
1520impl view::Ranked for HostMeshRef {
1521 type Item = HostRef;
1522
1523 fn region(&self) -> &Region {
1524 &self.region
1525 }
1526
1527 fn get(&self, rank: usize) -> Option<&Self::Item> {
1528 self.ranks.get(rank)
1529 }
1530}
1531
1532impl view::RankedSliceable for HostMeshRef {
1533 fn sliced(&self, region: Region) -> Self {
1534 let ranks = self
1535 .region()
1536 .remap(®ion)
1537 .unwrap()
1538 .map(|index| self.get(index).unwrap().clone());
1539 Self::new(self.name.clone(), region, ranks.collect()).unwrap()
1540 }
1541}
1542
1543impl std::fmt::Display for HostMeshRef {
1544 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1545 write!(f, "{}:", self.name)?;
1546 for (rank, host) in self.ranks.iter().enumerate() {
1547 if rank > 0 {
1548 write!(f, ",")?;
1549 }
1550 write!(f, "{}", host)?;
1551 }
1552 write!(f, "@{}", self.region)
1553 }
1554}
1555
/// Errors that can occur while parsing a [`HostMeshRef`] from a string.
#[derive(thiserror::Error, Debug)]
pub enum HostMeshRefParseError {
    /// The region portion of the input failed to parse.
    #[error(transparent)]
    RegionParseError(#[from] RegionParseError),

    /// The input has no region part.
    #[error("invalid host mesh ref: missing region")]
    MissingRegion,

    /// The input has no name part.
    #[error("invalid host mesh ref: missing name")]
    MissingName,

    /// The name portion of the input is not a valid [`Name`].
    #[error(transparent)]
    InvalidName(#[from] crate::NameParseError),

    /// A crate-level error, boxed to keep this enum small.
    #[error(transparent)]
    InvalidHostMeshRef(#[from] Box<crate::Error>),

    /// Any other failure surfaced as an `anyhow::Error`.
    // NOTE(review): presumably this is where host address parse failures
    // land — confirm against `HostRef`'s `FromStr` error type.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
1577
1578impl From<crate::Error> for HostMeshRefParseError {
1579 fn from(err: crate::Error) -> Self {
1580 Self::InvalidHostMeshRef(Box::new(err))
1581 }
1582}
1583
1584impl FromStr for HostMeshRef {
1585 type Err = HostMeshRefParseError;
1586
1587 fn from_str(s: &str) -> Result<Self, Self::Err> {
1588 let (name, rest) = s
1589 .split_once(':')
1590 .ok_or(HostMeshRefParseError::MissingName)?;
1591
1592 let name = Name::from_str(name)?;
1593
1594 let (hosts, region) = rest
1595 .split_once('@')
1596 .ok_or(HostMeshRefParseError::MissingRegion)?;
1597 let hosts = hosts
1598 .split(',')
1599 .map(|host| host.trim())
1600 .map(|host| host.parse::<HostRef>())
1601 .collect::<Result<Vec<_>, _>>()?;
1602 let region = region.parse()?;
1603 Ok(HostMeshRef::new(name, region, hosts)?)
1604 }
1605}
1606
1607#[cfg(test)]
1608mod tests {
1609 use std::assert_matches::assert_matches;
1610 use std::collections::HashSet;
1611 use std::collections::VecDeque;
1612
1613 use hyperactor::config::ENABLE_DEST_ACTOR_REORDERING_BUFFER;
1614 use hyperactor::context::Mailbox as _;
1615 use hyperactor_config::attrs::Attrs;
1616 use itertools::Itertools;
1617 use ndslice::ViewExt;
1618 use ndslice::extent;
1619 use timed_test::async_timed_test;
1620 use tokio::process::Command;
1621
1622 use super::*;
1623 use crate::ActorMesh;
1624 use crate::Bootstrap;
1625 use crate::bootstrap::MESH_TAIL_LOG_LINES;
1626 use crate::comm::ENABLE_NATIVE_V1_CASTING;
1627 use crate::resource::Status;
1628 use crate::testactor;
1629 use crate::testactor::GetConfigAttrs;
1630 use crate::testactor::SetConfigAttrs;
1631 use crate::testing;
1632
1633 #[test]
1634 fn test_host_mesh_subset() {
1635 let hosts: HostMeshRef = "test:local:1,local:2,local:3,local:4@replica=2/2,host=2/1"
1636 .parse()
1637 .unwrap();
1638 assert_eq!(
1639 hosts.range("replica", 1).unwrap().to_string(),
1640 "test:local:3,local:4@2+replica=1/2,host=2/1"
1641 );
1642 }
1643
1644 #[test]
1645 fn test_host_mesh_ref_parse_roundtrip() {
1646 let host_mesh_ref = HostMeshRef::new(
1647 Name::new("test").unwrap(),
1648 extent!(replica = 2, host = 2).into(),
1649 vec![
1650 "tcp:127.0.0.1:123".parse().unwrap(),
1651 "tcp:127.0.0.1:123".parse().unwrap(),
1652 "tcp:127.0.0.1:123".parse().unwrap(),
1653 "tcp:127.0.0.1:123".parse().unwrap(),
1654 ],
1655 )
1656 .unwrap();
1657
1658 assert_eq!(
1659 host_mesh_ref.to_string().parse::<HostMeshRef>().unwrap(),
1660 host_mesh_ref
1661 );
1662 }
1663
    /// Shared driver for the allocation tests: for every alloc returned by
    /// `testing::allocs`, build a host mesh, spawn two proc meshes and an
    /// actor mesh on each, and check shapes, casting and point-to-point
    /// forwarding before shutting the hosts down.
    ///
    /// `config` is the already-held global config lock; all overrides made
    /// here are guards that live until this function returns.
    #[cfg(fbcode_build)]
    async fn execute_allocate(config: &hyperactor_config::global::ConfigLock) {
        // Timing knobs. The watchdog below is set to their sum:
        // 3s + 60s + 60s + 57s = 180s.
        let poll = Duration::from_secs(3);
        let get_actor = Duration::from_mins(1);
        let get_proc = Duration::from_mins(1);
        let slack = Duration::from_secs(57);

        // Each guard is bound to a named local so the override stays in
        // effect for the rest of the function.
        let _pdeath_sig =
            config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _poll = config.override_key(crate::mesh_controller::SUPERVISION_POLL_FREQUENCY, poll);
        let _get_actor = config.override_key(crate::proc_mesh::GET_ACTOR_STATE_MAX_IDLE, get_actor);
        let _get_proc = config.override_key(crate::host_mesh::GET_PROC_STATE_MAX_IDLE, get_proc);

        let _watchdog = config.override_key(
            crate::actor_mesh::SUPERVISION_WATCHDOG_TIMEOUT,
            poll + get_actor + get_proc + slack,
        );

        let instance = testing::instance();

        for alloc in testing::allocs(extent!(replicas = 4)).await {
            let mut host_mesh = HostMesh::allocate(instance, alloc, "test", None)
                .await
                .unwrap();

            // First proc mesh: unity extent per host.
            let proc_mesh1 = host_mesh
                .spawn(instance, "test_1", Extent::unity())
                .await
                .unwrap();

            let actor_mesh1: ActorMesh<testactor::TestActor> =
                proc_mesh1.spawn(instance, "test", &()).await.unwrap();

            // Second proc mesh: 3 x 2 procs per host, so 4 * 3 * 2 = 24 total.
            let proc_mesh2 = host_mesh
                .spawn(instance, "test_2", extent!(gpus = 3, extra = 2))
                .await
                .unwrap();
            assert_eq!(
                proc_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(proc_mesh2.values().count(), 24);

            let actor_mesh2: ActorMesh<testactor::TestActor> =
                proc_mesh2.spawn(instance, "test", &()).await.unwrap();
            assert_eq!(
                actor_mesh2.extent(),
                extent!(replicas = 4, gpus = 3, extra = 2)
            );
            assert_eq!(actor_mesh2.values().count(), 24);

            // The mesh and a cloned reference to it must enumerate the same hosts.
            let host_mesh_ref: HostMeshRef = host_mesh.clone();
            assert_eq!(
                host_mesh_ref.iter().collect::<Vec<_>>(),
                host_mesh.iter().collect::<Vec<_>>(),
            );

            // Cast to each actor mesh and check that every actor answers
            // exactly once with its own id.
            for actor_mesh in [&actor_mesh1, &actor_mesh2] {
                let (port, mut rx) = instance.mailbox().open_port();
                actor_mesh
                    .cast(instance, testactor::GetActorId(port.bind()))
                    .unwrap();

                let mut expected_actor_ids: HashSet<_> = actor_mesh
                    .values()
                    .map(|actor_ref| actor_ref.actor_id().clone())
                    .collect();

                while !expected_actor_ids.is_empty() {
                    let (actor_id, _seq) = rx.recv().await.unwrap();
                    assert!(
                        expected_actor_ids.remove(&actor_id),
                        "got {actor_id}, expect {expected_actor_ids:?}"
                    );
                }
            }

            // Build a visiting sequence out of every ordered pair of actor
            // ports across both meshes, flattened into one queue.
            let mut to_visit: VecDeque<_> = actor_mesh1
                .values()
                .chain(actor_mesh2.values())
                .map(|actor_ref| actor_ref.port())
                .permutations(2)
                .flatten()
                .collect();

            // Snapshot taken before the client's own port is appended.
            let expect_visited: Vec<_> = to_visit.clone().into();

            // The final hop is a port owned by this test, so the message
            // comes back here once the chain is exhausted.
            let (last, mut last_rx) = instance.mailbox().open_port();
            to_visit.push_back(last.bind());

            let forward = testactor::Forward {
                to_visit,
                visited: Vec::new(),
            };
            let first = forward.to_visit.front().unwrap().clone();
            first.send(instance, forward).unwrap();

            // NOTE(review): presumably each actor records the hop in
            // `visited` before forwarding — see `testactor::Forward`.
            let forward = last_rx.recv().await.unwrap();
            assert_eq!(forward.visited, expect_visited);

            // The shutdown result is deliberately ignored.
            let _ = host_mesh.shutdown(&instance).await;
        }
    }
1778
1779 #[async_timed_test(timeout_secs = 600)]
1780 #[cfg(fbcode_build)]
1781 async fn test_allocate_dest_reorder_buffer_off() {
1782 let config = hyperactor_config::global::lock();
1783 let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
1784 execute_allocate(&config).await;
1785 }
1786
1787 #[async_timed_test(timeout_secs = 600)]
1788 #[cfg(fbcode_build)]
1789 async fn test_allocate_dest_reorder_buffer_on() {
1790 let config = hyperactor_config::global::lock();
1791 let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
1792 let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1793 execute_allocate(&config).await;
1794 }
1795
1796 fn free_localhost_addr() -> ChannelAddr {
1802 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1803 ChannelAddr::Tcp(listener.local_addr().unwrap())
1804 }
1805
1806 #[cfg(fbcode_build)]
1807 async fn execute_extrinsic_allocation(config: &hyperactor_config::global::ConfigLock) {
1808 let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
1809
1810 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1811
1812 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
1813
1814 let mut children = Vec::new();
1815 for host in hosts.iter() {
1816 let mut cmd = Command::new(program.clone());
1817 let boot = Bootstrap::Host {
1818 addr: host.clone(),
1819 command: None, config: None,
1821 exit_on_shutdown: false,
1822 };
1823 boot.to_env(&mut cmd);
1824 cmd.kill_on_drop(true);
1825 children.push(cmd.spawn().unwrap());
1826 }
1827
1828 let instance = testing::instance();
1829 let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);
1830
1831 let proc_mesh = host_mesh
1832 .spawn(&testing::instance(), "test", Extent::unity())
1833 .await
1834 .unwrap();
1835
1836 let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
1837 .spawn(&testing::instance(), "test", &())
1838 .await
1839 .unwrap();
1840
1841 testactor::assert_mesh_shape(actor_mesh).await;
1842
1843 HostMesh::take(host_mesh)
1844 .shutdown(&instance)
1845 .await
1846 .expect("hosts shutdown");
1847 }
1848
1849 #[tokio::test]
1850 #[cfg(fbcode_build)]
1851 async fn test_extrinsic_allocation_v0() {
1852 let config = hyperactor_config::global::lock();
1853 let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, false);
1854 execute_extrinsic_allocation(&config).await;
1855 }
1856
1857 #[tokio::test]
1858 #[cfg(fbcode_build)]
1859 async fn test_extrinsic_allocation_v1() {
1860 let config = hyperactor_config::global::lock();
1861 let _guard = config.override_key(ENABLE_NATIVE_V1_CASTING, true);
1862 let _guard1 = config.override_key(ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1863 execute_extrinsic_allocation(&config).await;
1864 }
1865
1866 #[tokio::test]
1867 #[cfg(fbcode_build)]
1868 async fn test_failing_proc_allocation() {
1869 let lock = hyperactor_config::global::lock();
1870 let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);
1871
1872 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1873
1874 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
1875
1876 let mut children = Vec::new();
1877 for host in hosts.iter() {
1878 let mut cmd = Command::new(program.clone());
1879 let boot = Bootstrap::Host {
1880 addr: host.clone(),
1881 config: None,
1882 command: Some(BootstrapCommand::from("false")),
1884 exit_on_shutdown: false,
1885 };
1886 boot.to_env(&mut cmd);
1887 cmd.kill_on_drop(true);
1888 children.push(cmd.spawn().unwrap());
1889 }
1890 let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);
1891
1892 let instance = testing::instance();
1893
1894 let err = host_mesh
1895 .spawn(&instance, "test", Extent::unity())
1896 .await
1897 .unwrap_err();
1898 assert_matches!(
1899 err,
1900 crate::Error::ProcCreationError { state, .. }
1901 if matches!(state.status, resource::Status::Failed(ref msg) if msg.contains("failed to configure process: Ready(Terminal(Stopped { exit_code: 1"))
1902 );
1903 }
1904
1905 #[tokio::test]
1906 #[cfg(fbcode_build)]
1907 async fn test_halting_proc_allocation() {
1908 let config = hyperactor_config::global::lock();
1909 let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(20));
1910
1911 let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1912
1913 let hosts = vec![free_localhost_addr(), free_localhost_addr()];
1914
1915 let mut children = Vec::new();
1916
1917 for (index, host) in hosts.iter().enumerate() {
1918 let mut cmd = Command::new(program.clone());
1919 let command = if index == 0 {
1920 let mut command = BootstrapCommand::from("sleep");
1921 command.args.push("60".to_string());
1922 Some(command)
1923 } else {
1924 None
1925 };
1926 let boot = Bootstrap::Host {
1927 addr: host.clone(),
1928 config: None,
1929 command,
1930 exit_on_shutdown: false,
1931 };
1932 boot.to_env(&mut cmd);
1933 cmd.kill_on_drop(true);
1934 children.push(cmd.spawn().unwrap());
1935 }
1936 let host_mesh = HostMeshRef::from_hosts(Name::new("test").unwrap(), hosts);
1937
1938 let instance = testing::instance();
1939
1940 let err = host_mesh
1941 .spawn(&instance, "test", Extent::unity())
1942 .await
1943 .unwrap_err();
1944 let statuses = err.into_proc_spawn_error().unwrap();
1945 assert_matches!(
1946 &statuses.materialized_iter(2).cloned().collect::<Vec<_>>()[..],
1947 &[Status::Timeout(_), Status::Running]
1948 );
1949 }
1950
    /// Check how config values seen by a remote actor combine the client's
    /// overrides with attributes set on the actor afterwards.
    ///
    /// The client overrides `HOST_SPAWN_READY_TIMEOUT` (2 min) and
    /// `MESSAGE_DELIVERY_TIMEOUT` (1 min), then sends the actors a
    /// `SetConfigAttrs` carrying `HOST_SPAWN_READY_TIMEOUT` = 3 min. The
    /// attrs read back must show 3 min for the former and 1 min for the
    /// latter.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_client_config_override() {
        let config = hyperactor_config::global::lock();
        let _guard1 = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
        let _guard2 = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(2),
        );
        let _guard3 = config.override_key(
            hyperactor::config::MESSAGE_DELIVERY_TIMEOUT,
            Duration::from_mins(1),
        );

        // Clear the matching environment variables before any process is
        // spawned, so the values under test are not taken from the
        // environment.
        //
        // SAFETY: NOTE(review) — `remove_var` is unsafe because mutating the
        // process environment can race with other threads reading it.
        // Presumably this is acceptable here because the test holds the
        // global config lock; confirm no other thread touches the
        // environment concurrently.
        unsafe {
            std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT");
            std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT");
        }

        let instance = testing::instance();

        let mut hm = testing::host_mesh(2).await;
        let proc_mesh = hm.spawn(instance, "test", Extent::unity()).await.unwrap();

        let actor_mesh: ActorMesh<testactor::TestActor> =
            proc_mesh.spawn(instance, "test", &()).await.unwrap();

        // Push a new value for one of the two keys to the actors.
        let mut attrs_override = Attrs::new();
        attrs_override.set(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_mins(3),
        );
        actor_mesh
            .cast(
                instance,
                SetConfigAttrs(bincode::serialize(&attrs_override).unwrap()),
            )
            .unwrap();

        // Read the attrs back; only the first reply is inspected.
        let (tx, mut rx) = instance.open_port();
        actor_mesh
            .cast(instance, GetConfigAttrs(tx.bind()))
            .unwrap();
        let actual_attrs = rx.recv().await.unwrap();
        let actual_attrs = bincode::deserialize::<Attrs>(&actual_attrs).unwrap();

        // The key set through `SetConfigAttrs` reflects the new value.
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(3)
        );
        // The untouched key still matches the client's override.
        assert_eq!(
            *actual_attrs
                .get(hyperactor::config::MESSAGE_DELIVERY_TIMEOUT)
                .unwrap(),
            Duration::from_mins(1)
        );

        // The shutdown result is deliberately ignored.
        let _ = hm.shutdown(instance).await;
    }
2016}