1use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::env::VarError;
18use std::fmt;
19use std::fs::OpenOptions;
20use std::future;
21use std::io;
22use std::io::Write;
23use std::path::Path;
24use std::path::PathBuf;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::Mutex;
28use std::sync::OnceLock;
29use std::sync::Weak;
30use std::time::Duration;
31use std::time::SystemTime;
32
33use async_trait::async_trait;
34use base64::prelude::*;
35use futures::StreamExt;
36use futures::stream;
37use humantime::format_duration;
38use hyperactor::ActorHandle;
39use hyperactor::channel;
40use hyperactor::channel::ChannelAddr;
41use hyperactor::channel::ChannelError;
42use hyperactor::channel::ChannelTransport;
43use hyperactor::channel::Rx;
44use hyperactor::channel::Tx;
45use hyperactor::context;
46use hyperactor::host::Host;
47use hyperactor::host::HostError;
48use hyperactor::host::ProcHandle;
49use hyperactor::host::ProcManager;
50use hyperactor::host::TerminateSummary;
51use hyperactor::mailbox::IntoBoxedMailboxSender;
52use hyperactor::mailbox::MailboxClient;
53use hyperactor::mailbox::MailboxServer;
54use hyperactor::mailbox::MailboxServerHandle;
55use hyperactor::proc::Proc;
56use hyperactor::reference as hyperactor_reference;
57use hyperactor_config::CONFIG;
58use hyperactor_config::ConfigAttr;
59use hyperactor_config::attrs::Attrs;
60use hyperactor_config::attrs::declare_attrs;
61use hyperactor_config::global::override_or_global;
62use serde::Deserialize;
63use serde::Serialize;
64use tempfile::TempDir;
65use tokio::process::Command;
66use tokio::sync::oneshot;
67use tokio::sync::watch;
68use tracing::Instrument;
69use tracing::Level;
70use typeuri::Named;
71
72use crate::config::MESH_PROC_LAUNCHER_KIND;
73use crate::host_mesh::host_agent::HostAgent;
74use crate::host_mesh::host_agent::HostAgentMode;
75use crate::logging::OutputTarget;
76use crate::logging::StreamFwder;
77use crate::proc_agent::ProcAgent;
78use crate::proc_launcher::LaunchOptions;
79use crate::proc_launcher::NativeProcLauncher;
80use crate::proc_launcher::ProcExitKind;
81use crate::proc_launcher::ProcExitResult;
82use crate::proc_launcher::ProcLauncher;
83use crate::proc_launcher::ProcLauncherError;
84use crate::proc_launcher::StdioHandling;
85#[cfg(target_os = "linux")]
86use crate::proc_launcher::SystemdProcLauncher;
87use crate::proc_launcher::format_process_name;
88use crate::resource;
89
90mod mailbox;
91
declare_attrs! {
    // Configuration attributes for mesh bootstrap. Each attribute is tagged with a
    // `ConfigAttr` built from two optional strings.
    // NOTE(review): the two strings look like an environment-variable name and a
    // short config key respectively — confirm against `hyperactor_config::ConfigAttr`.

    // Boolean switch, `false` by default.
    // NOTE(review): presumably controls forwarding of child process logs — confirm at use sites.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
        Some("enable_log_forwarding".to_string()),
    ))
    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

    // Boolean switch, `false` by default. `BootstrapProcManager::new` only tries to
    // create a `FileAppender` when this is set.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
        Some("enable_file_capture".to_string()),
    ))
    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

    // Line count, `0` by default.
    // NOTE(review): presumably the number of trailing log lines retained per child — confirm.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
        Some("tail_log_lines".to_string()),
    ))
    pub attr MESH_TAIL_LOG_LINES: usize = 0;

    // Boolean switch, `true` by default. When set, `Bootstrap::bootstrap` calls
    // `install_pdeathsig_kill()` for `Bootstrap::Proc`; otherwise it prints a notice
    // to stderr and skips the call.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
        Some("mesh_bootstrap_enable_pdeathsig".to_string()),
    ))
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    // Count, `16` by default.
    // NOTE(review): presumably bounds how many procs are terminated concurrently — confirm.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
        Some("mesh_terminate_concurrency".to_string()),
    ))
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    // Duration, 10 seconds by default.
    // NOTE(review): presumably the per-proc termination deadline — confirm at use sites.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
        Some("mesh_terminate_timeout".to_string()),
    ))
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
179
/// Environment variable name `HYPERACTOR_MESH_BOOTSTRAP_ADDR`.
// NOTE(review): presumably holds the allocator address a bootstrapped process dials — confirm.
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Environment variable name `HYPERACTOR_MESH_INDEX`.
// NOTE(review): presumably holds the bootstrap index of the process — confirm.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Environment variable name `MONARCH_CLIENT_TRACE_ID`.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";

/// Environment variable name `BOOTSTRAP_LOG_CHANNEL`.
// NOTE(review): presumably names the channel used for log forwarding — confirm.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";

/// Environment variable name `HYPERACTOR_MESH_BOOTSTRAP_MODE`; this is the same
/// variable that `Bootstrap::get_from_env` reads and `Bootstrap::to_env` sets.
pub(crate) const BOOTSTRAP_MODE_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_MODE";
/// Environment variable name `HYPERACTOR_PROCESS_NAME`.
pub(crate) const PROCESS_NAME_ENV: &str = "HYPERACTOR_PROCESS_NAME";
191
/// Envelope for messages sent from a bootstrapped process to its allocator.
///
/// The first field is the sender's bootstrap index (see `exit_if_missed_heartbeat`,
/// which fills it with `bootstrap_index`); the second is the payload.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
wirevalue::register_type!(Process2Allocator);
198
/// Payloads carried by [`Process2Allocator`].
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Process2AllocatorMessage {
    /// Carries a channel address.
    // NOTE(review): presumably the address on which the process accepts
    // `Allocator2Process` messages — confirm against the allocator.
    Hello(ChannelAddr),

    /// Carries a proc id, a reference to its `ProcAgent`, and a channel address.
    // NOTE(review): presumably reports a successfully started proc and the
    // address it is served on — confirm.
    StartedProc(
        hyperactor_reference::ProcId,
        hyperactor_reference::ActorRef<ProcAgent>,
        ChannelAddr,
    ),

    /// Liveness ping; `exit_if_missed_heartbeat` sends one every five seconds.
    Heartbeat,
}
wirevalue::register_type!(Process2AllocatorMessage);
220
/// Messages addressed to a bootstrapped process by its allocator.
// NOTE(review): variant semantics below are inferred from the names; the handler
// is not part of this excerpt — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Allocator2Process {
    /// Carries a proc id and a channel transport (presumably: start that proc).
    StartProc(hyperactor_reference::ProcId, ChannelTransport),

    /// Carries an `i32` (presumably: stop running procs, then exit with this code).
    StopAndExit(i32),

    /// Carries an `i32` (presumably: exit immediately with this code).
    Exit(i32),
}
wirevalue::register_type!(Allocator2Process);
237
238async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
239 let tx = match channel::dial(bootstrap_addr.clone()) {
240 Ok(tx) => tx,
241
242 Err(err) => {
243 tracing::error!(
244 "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
245 bootstrap_addr,
246 err
247 );
248 std::process::exit(1);
249 }
250 };
251 tracing::info!(
252 "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
253 );
254 loop {
255 tokio::time::sleep(Duration::from_secs(5)).await;
256
257 let result = tx
258 .send(Process2Allocator(
259 bootstrap_index,
260 Process2AllocatorMessage::Heartbeat,
261 ))
262 .await;
263
264 if let Err(err) = result {
265 tracing::error!(
266 "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
267 bootstrap_addr,
268 err
269 );
270 std::process::exit(1);
271 }
272 }
273}
274
/// Evaluates to the `Ok` payload of `$expr`; on `Err(e)` it *returns*
/// `anyhow::Error::from(e)` from the enclosing function.
///
/// The early return yields a bare `anyhow::Error` (not a `Result`), so this macro
/// is only usable inside functions whose return type is `anyhow::Error`.
#[macro_export]
macro_rules! ok {
    ($expr:expr $(,)?) => {
        match $expr {
            Ok(value) => value,
            // Bail out of the *calling* function with the converted error.
            Err(e) => return ::anyhow::Error::from(e),
        }
    };
}
284
/// Suspends the calling task forever.
///
/// The returned future is never ready, so a value of `R` is never produced; the
/// type parameter only lets the call sit in any expression position.
async fn halt<R>() -> R {
    future::pending::<R>().await
}
289
290pub struct HostShutdownHandle {
296 rx: tokio::sync::oneshot::Receiver<MailboxServerHandle>,
297 exit_on_shutdown: bool,
298}
299
300impl HostShutdownHandle {
301 pub async fn join(self) {
304 match self.rx.await {
305 Ok(mailbox_handle) => {
306 mailbox_handle.stop("host shutting down");
307 let _ = mailbox_handle.await;
308 }
309 Err(_) => {} }
311 if self.exit_on_shutdown {
312 std::process::exit(0);
313 }
314 }
315}
316
317pub async fn host(
327 addr: ChannelAddr,
328 command: Option<BootstrapCommand>,
329 config: Option<Attrs>,
330 exit_on_shutdown: bool,
331) -> anyhow::Result<(ActorHandle<HostAgent>, HostShutdownHandle)> {
332 if let Some(attrs) = config {
333 hyperactor_config::global::set(hyperactor_config::global::Source::Runtime, attrs);
334 tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
335 } else {
336 tracing::debug!("bootstrap: no config snapshot provided (Host)");
337 }
338
339 let command = match command {
340 Some(command) => command,
341 None => BootstrapCommand::current()?,
342 };
343 let manager = BootstrapProcManager::new(command)?;
344
345 let host = Host::new(manager, addr).await?;
346 let addr = host.addr().clone();
347
348 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<MailboxServerHandle>();
353
354 let system_proc = host.system_proc().clone();
355 let host_mesh_agent = system_proc.spawn::<HostAgent>(
356 "host_agent",
357 HostAgent::new(HostAgentMode::Process {
358 host,
359 shutdown_tx: Some(shutdown_tx),
360 }),
361 )?;
362
363 tracing::info!(
364 "serving host at {}, agent: {}",
365 addr,
366 host_mesh_agent.bind::<HostAgent>()
367 );
368
369 Ok((
370 host_mesh_agent,
371 HostShutdownHandle {
372 rx: shutdown_rx,
373 exit_on_shutdown,
374 },
375 ))
376}
377
378#[derive(Clone, Debug, Serialize, Deserialize)]
387pub enum Bootstrap {
388 Proc {
390 proc_id: hyperactor_reference::ProcId,
392 backend_addr: ChannelAddr,
395 callback_addr: ChannelAddr,
397 socket_dir_path: PathBuf,
401 config: Option<Attrs>,
406 },
407
408 Host {
411 addr: ChannelAddr,
413 command: Option<BootstrapCommand>,
416 config: Option<Attrs>,
421 exit_on_shutdown: bool,
423 },
424
425 V0ProcMesh {
427 config: Option<Attrs>,
432 },
433}
434
435impl Default for Bootstrap {
436 fn default() -> Self {
437 Bootstrap::V0ProcMesh { config: None }
438 }
439}
440
441impl Bootstrap {
442 #[allow(clippy::result_large_err)]
445 pub(crate) fn to_env_safe_string(&self) -> crate::Result<String> {
446 Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
447 }
448
449 #[allow(clippy::result_large_err)]
451 pub(crate) fn from_env_safe_string(str: &str) -> crate::Result<Self> {
452 let data = BASE64_STANDARD.decode(str)?;
453 let data = std::str::from_utf8(&data)?;
454 Ok(serde_json::from_str(data)?)
455 }
456
457 pub fn get_from_env() -> anyhow::Result<Option<Self>> {
460 match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
461 Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
462 Ok(mode) => Ok(Some(mode)),
463 Err(e) => {
464 Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
465 }
466 },
467 Err(VarError::NotPresent) => Ok(None),
468 Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
469 }
470 }
471
472 pub fn to_env(&self, cmd: &mut Command) {
474 cmd.env(
475 "HYPERACTOR_MESH_BOOTSTRAP_MODE",
476 self.to_env_safe_string().unwrap(),
477 );
478 }
479
480 pub async fn bootstrap(self) -> anyhow::Result<i32> {
484 tracing::info!(
485 "bootstrapping mesh process: {}",
486 serde_json::to_string(&self).unwrap()
487 );
488
489 if Debug::is_active() {
490 let mut buf = Vec::new();
491 writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
492 #[cfg(unix)]
493 writeln!(
494 &mut buf,
495 "\tparent pid: {}",
496 std::os::unix::process::parent_id()
497 )
498 .unwrap();
499 writeln!(
500 &mut buf,
501 "\tconfig: {}",
502 serde_json::to_string(&self).unwrap()
503 )
504 .unwrap();
505 match std::env::current_exe() {
506 Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
507 Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
508 }
509 writeln!(&mut buf, "\targs:").unwrap();
510 for arg in std::env::args() {
511 writeln!(&mut buf, "\t\t{}", arg).unwrap();
512 }
513 writeln!(&mut buf, "\tenv:").unwrap();
514 for (key, val) in std::env::vars() {
515 writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
516 }
517 let _ = Debug.write(&buf);
518 if let Ok(s) = std::str::from_utf8(&buf) {
519 tracing::info!("{}", s);
520 } else {
521 tracing::info!("{:?}", buf);
522 }
523 }
524
525 match self {
526 Bootstrap::Proc {
527 proc_id,
528 backend_addr,
529 callback_addr,
530 socket_dir_path,
531 config,
532 } => {
533 let entered = tracing::span!(
534 Level::INFO,
535 "proc_bootstrap",
536 %proc_id,
537 %backend_addr,
538 %callback_addr,
539 socket_dir_path = %socket_dir_path.display(),
540 )
541 .entered();
542 if let Some(attrs) = config {
543 hyperactor_config::global::set(
544 hyperactor_config::global::Source::ClientOverride,
545 attrs,
546 );
547 tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
548 } else {
549 tracing::debug!("bootstrap: no config snapshot provided (Proc)");
550 }
551
552 if hyperactor_config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
553 let _ = install_pdeathsig_kill();
558 } else {
559 eprintln!("(bootstrap) PDEATHSIG disabled via config");
560 }
561
562 let (local_addr, name) = (proc_id.addr().clone(), proc_id.name());
563 let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
565 let serve_addr = serve_addr.parse().unwrap();
566
567 let proc_sender = mailbox::LocalProcDialer::new(
572 local_addr.clone(),
573 socket_dir_path,
574 MailboxClient::dial(backend_addr)?,
575 );
576
577 let proc = Proc::configured(proc_id.clone(), proc_sender.into_boxed());
578
579 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<i32>();
580 let agent_handle = ProcAgent::boot_v1(proc.clone(), Some(shutdown_tx))
581 .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;
582
583 let span = entered.exit();
584
585 let (proc_addr, proc_rx) = channel::serve(serve_addr)?;
588 let mailbox_handle = proc.clone().serve(proc_rx);
589 channel::dial(callback_addr)?
590 .send((proc_addr, agent_handle.bind::<ProcAgent>()))
591 .instrument(span)
592 .await
593 .map_err(ChannelError::from)?;
594
595 let exit_code = shutdown_rx.await.unwrap_or(1);
598 mailbox_handle.stop("process shutting down");
599 let _ = mailbox_handle.await;
600 tracing::info!("bootstrap shutting down with exit code {}", exit_code);
601 Ok(exit_code)
604 }
605 Bootstrap::Host {
606 addr,
607 command,
608 config,
609 exit_on_shutdown,
610 } => {
611 let (_agent_handle, shutdown) =
612 host(addr, command, config, exit_on_shutdown).await?;
613 shutdown.join().await;
614 halt().await
615 }
616 Bootstrap::V0ProcMesh { config } => Err(bootstrap_v0_proc_mesh(config).await),
617 }
618 }
619
620 pub async fn bootstrap_or_die(self) -> ! {
623 let exit_code = match self.bootstrap().await {
624 Ok(exit_code) => exit_code,
625 Err(err) => {
626 tracing::error!("failed to bootstrap mesh process: {}", err);
627 1
628 }
629 };
630 std::process::exit(exit_code);
631 }
632}
633
634pub fn install_pdeathsig_kill() -> io::Result<()> {
636 #[cfg(target_os = "linux")]
637 {
638 let ppid_before = unsafe { libc::getppid() };
641
642 let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
645 if rc != 0 {
646 return Err(io::Error::last_os_error());
647 }
648
649 let ppid_after = unsafe { libc::getppid() };
659 if ppid_before != ppid_after {
660 std::process::exit(0);
661 }
662 }
663 Ok(())
664}
665
666#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
681pub enum ProcStatus {
682 Starting,
685 Running { started_at: SystemTime },
688 Ready {
691 started_at: SystemTime,
692 addr: ChannelAddr,
693 agent: hyperactor_reference::ActorRef<ProcAgent>,
694 },
695 Stopping { started_at: SystemTime },
699 Stopped {
702 exit_code: i32,
703 stderr_tail: Vec<String>,
704 },
705 Killed { signal: i32, core_dumped: bool },
708 Failed { reason: String },
712}
713
714impl ProcStatus {
715 #[inline]
719 pub fn is_exit(&self) -> bool {
720 matches!(
721 self,
722 ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
723 )
724 }
725}
726
727impl std::fmt::Display for ProcStatus {
728 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729 match self {
730 ProcStatus::Starting => write!(f, "Starting"),
731 ProcStatus::Running { started_at } => {
732 let uptime = started_at
733 .elapsed()
734 .map(|d| format!(" up {}", format_duration(d)))
735 .unwrap_or_default();
736 write!(f, "Running{uptime}")
737 }
738 ProcStatus::Ready {
739 started_at, addr, ..
740 } => {
741 let uptime = started_at
742 .elapsed()
743 .map(|d| format!(" up {}", format_duration(d)))
744 .unwrap_or_default();
745 write!(f, "Ready at {addr}{uptime}")
746 }
747 ProcStatus::Stopping { started_at } => {
748 let uptime = started_at
749 .elapsed()
750 .map(|d| format!(" up {}", format_duration(d)))
751 .unwrap_or_default();
752 write!(f, "Stopping{uptime}")
753 }
754 ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
755 ProcStatus::Killed {
756 signal,
757 core_dumped,
758 } => {
759 if *core_dumped {
760 write!(f, "Killed(sig={signal}, core)")
761 } else {
762 write!(f, "Killed(sig={signal})")
763 }
764 }
765 ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
766 }
767 }
768}
769
770#[derive(Debug, Clone)]
772pub enum ReadyError {
773 Terminal(ProcStatus),
775 ChannelClosed,
777}
778
779impl std::fmt::Display for ReadyError {
780 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
781 match self {
782 ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
783 ReadyError::ChannelClosed => write!(f, "status channel closed"),
784 }
785 }
786}
787impl std::error::Error for ReadyError {}
788
/// Cloneable handle to a bootstrapped proc.
///
/// All mutable state sits behind `Arc`s or channel endpoints, so clones observe
/// and update the same status.
#[derive(Clone)]
pub struct BootstrapProcHandle {
    /// Identity of the proc this handle refers to.
    proc_id: hyperactor_reference::ProcId,

    /// Current lifecycle status; every update goes through `transition`.
    status: Arc<std::sync::Mutex<ProcStatus>>,

    /// Weak reference to the launcher used for terminate/kill; when it can no
    /// longer be upgraded, the launcher is treated as gone.
    launcher: Weak<dyn ProcLauncher>,

    /// Optional stdout forwarder, installed via `set_stream_monitors`.
    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Optional stderr forwarder, installed via `set_stream_monitors`.
    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Publishes each status change to watchers.
    tx: tokio::sync::watch::Sender<ProcStatus>,

    /// Receiver paired with `tx`; `watch()` hands out clones of it.
    rx: tokio::sync::watch::Receiver<ProcStatus>,
}
870
871impl fmt::Debug for BootstrapProcHandle {
872 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873 let status = self.status.lock().expect("status mutex poisoned").clone();
874 f.debug_struct("BootstrapProcHandle")
875 .field("proc_id", &self.proc_id)
876 .field("status", &status)
877 .field("launcher", &"<dyn ProcLauncher>")
878 .field("tx", &"<watch::Sender>")
879 .field("rx", &"<watch::Receiver>")
880 .finish()
883 }
884}
885
886impl BootstrapProcHandle {
888 pub(crate) fn new(
899 proc_id: hyperactor_reference::ProcId,
900 launcher: Weak<dyn ProcLauncher>,
901 ) -> Self {
902 let (tx, rx) = watch::channel(ProcStatus::Starting);
903 Self {
904 proc_id,
905 status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
906 launcher,
907 stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
908 stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
909 tx,
910 rx,
911 }
912 }
913
914 #[inline]
916 pub fn proc_id(&self) -> &hyperactor_reference::ProcId {
917 &self.proc_id
918 }
919
920 #[inline]
933 pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
934 self.rx.clone()
935 }
936
937 #[inline]
955 pub async fn changed(&self) {
956 let _ = self.watch().changed().await;
957 }
958
959 #[must_use]
972 pub fn status(&self) -> ProcStatus {
973 self.status.lock().expect("status mutex poisoned").clone()
977 }
978
979 fn transition<F>(&self, f: F) -> bool
985 where
986 F: FnOnce(&mut ProcStatus) -> bool,
987 {
988 let mut guard = self.status.lock().expect("status mutex poisoned");
989 let _before = guard.clone();
990 let changed = f(&mut guard);
991 if changed {
992 let _ = self.tx.send(guard.clone());
994 }
995 changed
996 }
997
998 pub(crate) fn mark_running(&self, started_at: SystemTime) -> bool {
1008 self.transition(|st| match *st {
1009 ProcStatus::Starting => {
1010 *st = ProcStatus::Running { started_at };
1011 true
1012 }
1013 _ => {
1014 tracing::warn!(
1015 "illegal transition: {:?} -> Running; leaving status unchanged",
1016 *st
1017 );
1018 false
1019 }
1020 })
1021 }
1022
1023 pub(crate) fn mark_ready(
1035 &self,
1036 addr: ChannelAddr,
1037 agent: hyperactor_reference::ActorRef<ProcAgent>,
1038 ) -> bool {
1039 tracing::info!(proc_id = %self.proc_id, %addr, "{} ready at {}", self.proc_id, addr);
1040 self.transition(|st| match st {
1041 ProcStatus::Starting => {
1042 *st = ProcStatus::Ready {
1045 started_at: std::time::SystemTime::now(),
1046 addr,
1047 agent,
1048 };
1049 true
1050 }
1051 ProcStatus::Running { started_at } => {
1052 let started_at = *started_at;
1053 *st = ProcStatus::Ready {
1054 started_at,
1055 addr,
1056 agent,
1057 };
1058 true
1059 }
1060 _ => {
1061 tracing::warn!(
1062 "illegal transition: {:?} -> Ready; leaving status unchanged",
1063 st
1064 );
1065 false
1066 }
1067 })
1068 }
1069
1070 pub(crate) fn mark_stopping(&self) -> bool {
1074 let now = std::time::SystemTime::now();
1075
1076 self.transition(|st| match *st {
1077 ProcStatus::Running { started_at } => {
1078 *st = ProcStatus::Stopping { started_at };
1079 true
1080 }
1081 ProcStatus::Ready { started_at, .. } => {
1082 *st = ProcStatus::Stopping { started_at };
1083 true
1084 }
1085 ProcStatus::Starting => {
1086 *st = ProcStatus::Stopping { started_at: now };
1087 true
1088 }
1089 _ => false,
1090 })
1091 }
1092
1093 pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
1096 self.transition(|st| match *st {
1097 ProcStatus::Starting
1098 | ProcStatus::Running { .. }
1099 | ProcStatus::Ready { .. }
1100 | ProcStatus::Stopping { .. } => {
1101 *st = ProcStatus::Stopped {
1102 exit_code,
1103 stderr_tail,
1104 };
1105 true
1106 }
1107 _ => {
1108 tracing::warn!(
1109 "illegal transition: {:?} -> Stopped; leaving status unchanged",
1110 *st
1111 );
1112 false
1113 }
1114 })
1115 }
1116
1117 pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1120 self.transition(|st| match *st {
1121 ProcStatus::Starting
1122 | ProcStatus::Running { .. }
1123 | ProcStatus::Ready { .. }
1124 | ProcStatus::Stopping { .. } => {
1125 *st = ProcStatus::Killed {
1126 signal,
1127 core_dumped,
1128 };
1129 true
1130 }
1131 _ => {
1132 tracing::warn!(
1133 "illegal transition: {:?} -> Killed; leaving status unchanged",
1134 *st
1135 );
1136 false
1137 }
1138 })
1139 }
1140
1141 pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1144 self.transition(|st| match *st {
1145 ProcStatus::Starting
1146 | ProcStatus::Running { .. }
1147 | ProcStatus::Ready { .. }
1148 | ProcStatus::Stopping { .. } => {
1149 *st = ProcStatus::Failed {
1150 reason: reason.into(),
1151 };
1152 true
1153 }
1154 _ => {
1155 tracing::warn!(
1156 "illegal transition: {:?} -> Failed; leaving status unchanged",
1157 *st
1158 );
1159 false
1160 }
1161 })
1162 }
1163
1164 #[must_use]
1183 pub async fn wait_inner(&self) -> ProcStatus {
1184 let mut rx = self.watch();
1185 loop {
1186 let st = rx.borrow().clone();
1187 if st.is_exit() {
1188 return st;
1189 }
1190 if rx.changed().await.is_err() {
1192 return st;
1193 }
1194 }
1195 }
1196
1197 pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1216 let mut rx = self.watch();
1217 loop {
1218 let st = rx.borrow().clone();
1219 match &st {
1220 ProcStatus::Ready { .. } => return Ok(()),
1221 s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1222 _non_terminal => {
1223 if rx.changed().await.is_err() {
1224 return Err(ReadyError::ChannelClosed);
1225 }
1226 }
1227 }
1228 }
1229 }
1230
1231 pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1232 *self
1233 .stdout_fwder
1234 .lock()
1235 .expect("stdout_tailer mutex poisoned") = out;
1236 *self
1237 .stderr_fwder
1238 .lock()
1239 .expect("stderr_tailer mutex poisoned") = err;
1240 }
1241
1242 fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1243 let out = self
1244 .stdout_fwder
1245 .lock()
1246 .expect("stdout_tailer mutex poisoned")
1247 .take();
1248 let err = self
1249 .stderr_fwder
1250 .lock()
1251 .expect("stderr_tailer mutex poisoned")
1252 .take();
1253 (out, err)
1254 }
1255
1256 pub(crate) async fn wait_or_brutally_kill(&self, timeout: Duration) {
1263 match tokio::time::timeout(timeout, self.wait_inner()).await {
1264 Ok(st) if st.is_exit() => return,
1265 _ => {}
1266 }
1267
1268 let _ = self.mark_stopping();
1269
1270 if let Some(launcher) = self.launcher.upgrade() {
1271 if let Err(e) = launcher.terminate(&self.proc_id, timeout).await {
1272 tracing::warn!(
1273 proc_id = %self.proc_id,
1274 error = %e,
1275 "wait_or_brutally_kill: launcher terminate failed, trying kill"
1276 );
1277 let _ = launcher.kill(&self.proc_id).await;
1278 }
1279 }
1280
1281 let _ = self.wait_inner().await;
1282 }
1283
1284 async fn send_stop_all(
1288 &self,
1289 cx: &impl context::Actor,
1290 agent: hyperactor_reference::ActorRef<ProcAgent>,
1291 timeout: Duration,
1292 reason: &str,
1293 ) -> anyhow::Result<ProcStatus> {
1294 let mut agent_port = agent.port();
1301 agent_port.return_undeliverable(false);
1302 agent_port.send(
1303 cx,
1304 resource::StopAll {
1305 reason: reason.to_string(),
1306 },
1307 )?;
1308 match tokio::time::timeout(timeout, self.wait()).await {
1311 Ok(Ok(st)) => Ok(st),
1312 Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1313 Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1314 }
1315 }
1316}
1317
1318#[async_trait]
1319impl hyperactor::host::ProcHandle for BootstrapProcHandle {
1320 type Agent = ProcAgent;
1321 type TerminalStatus = ProcStatus;
1322
1323 #[inline]
1324 fn proc_id(&self) -> &hyperactor_reference::ProcId {
1325 &self.proc_id
1326 }
1327
1328 #[inline]
1329 fn addr(&self) -> Option<ChannelAddr> {
1330 match &*self.status.lock().expect("status mutex poisoned") {
1331 ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1332 _ => None,
1333 }
1334 }
1335
1336 #[inline]
1337 fn agent_ref(&self) -> Option<hyperactor_reference::ActorRef<Self::Agent>> {
1338 match &*self.status.lock().expect("status mutex poisoned") {
1339 ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1340 _ => None,
1341 }
1342 }
1343
1344 async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
1355 match self.ready_inner().await {
1356 Ok(()) => Ok(()),
1357 Err(ReadyError::Terminal(status)) => {
1358 Err(hyperactor::host::ReadyError::Terminal(status))
1359 }
1360 Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
1361 }
1362 }
1363
1364 async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
1372 let status = self.wait_inner().await;
1373 if status.is_exit() {
1374 Ok(status)
1375 } else {
1376 Err(hyperactor::host::WaitError::ChannelClosed)
1377 }
1378 }
1379
1380 async fn terminate(
1401 &self,
1402 cx: &impl context::Actor,
1403 timeout: Duration,
1404 reason: &str,
1405 ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1406 let st0 = self.status();
1408 if st0.is_exit() {
1409 tracing::debug!(?st0, "terminate(): already terminal");
1410 return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1411 }
1412
1413 let agent = self.agent_ref();
1416 if let Some(agent) = agent {
1417 match self.send_stop_all(cx, agent.clone(), timeout, reason).await {
1418 Ok(st) => return Ok(st),
1419 Err(e) => {
1420 tracing::warn!(
1422 "ProcAgent {} could not successfully stop all actors: {}",
1423 agent.actor_id(),
1424 e,
1425 );
1426 }
1427 }
1428 }
1429
1430 let _ = self.mark_stopping();
1432
1433 tracing::info!(proc_id = %self.proc_id, ?timeout, "terminate(): delegating to launcher");
1435 if let Some(launcher) = self.launcher.upgrade() {
1436 if let Err(e) = launcher.terminate(&self.proc_id, timeout).await {
1437 tracing::warn!(proc_id = %self.proc_id, error=%e, "terminate(): launcher termination failed");
1438 return Err(hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1439 "launcher termination failed: {}",
1440 e
1441 )));
1442 }
1443 } else {
1444 tracing::debug!(proc_id = %self.proc_id, "terminate(): launcher gone, proc cleanup in progress");
1446 }
1447
1448 let st = self.wait_inner().await;
1450 if st.is_exit() {
1451 tracing::info!(proc_id = %self.proc_id, ?st, "terminate(): exited");
1452 Ok(st)
1453 } else {
1454 Err(hyperactor::host::TerminateError::ChannelClosed)
1455 }
1456 }
1457
1458 async fn kill(
1476 &self,
1477 ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1478 let st0 = self.status();
1480 if st0.is_exit() {
1481 return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1482 }
1483
1484 tracing::info!(proc_id = %self.proc_id, "kill(): delegating to launcher");
1486 if let Some(launcher) = self.launcher.upgrade() {
1487 if let Err(e) = launcher.kill(&self.proc_id).await {
1488 tracing::warn!(proc_id = %self.proc_id, error=%e, "kill(): launcher kill failed");
1489 return Err(hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1490 "launcher kill failed: {}",
1491 e
1492 )));
1493 }
1494 } else {
1495 tracing::debug!(proc_id = %self.proc_id, "kill(): launcher gone, proc cleanup in progress");
1497 }
1498
1499 let st = self.wait_inner().await;
1501 if st.is_exit() {
1502 Ok(st)
1503 } else {
1504 Err(hyperactor::host::TerminateError::ChannelClosed)
1505 }
1506 }
1507}
1508
1509#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
1511pub struct BootstrapCommand {
1512 pub program: PathBuf,
1513 pub arg0: Option<String>,
1514 pub args: Vec<String>,
1515 pub env: HashMap<String, String>,
1516}
1517wirevalue::register_type!(BootstrapCommand);
1518
1519impl BootstrapCommand {
1520 pub fn current() -> io::Result<Self> {
1523 let mut args: VecDeque<String> = std::env::args().collect();
1524 let arg0 = args.pop_front();
1525
1526 Ok(Self {
1527 program: std::env::current_exe()?,
1528 arg0,
1529 args: args.into(),
1530 env: std::env::vars().collect(),
1531 })
1532 }
1533
1534 pub fn new(&self) -> Command {
1537 let mut cmd = Command::new(&self.program);
1538 if let Some(arg0) = &self.arg0 {
1539 cmd.arg0(arg0);
1540 }
1541 for arg in &self.args {
1542 cmd.arg(arg);
1543 }
1544 for (k, v) in &self.env {
1545 cmd.env(k, v);
1546 }
1547 cmd
1548 }
1549
1550 #[cfg(test)]
1557 #[cfg(fbcode_build)]
1558 pub(crate) fn test() -> Self {
1559 Self {
1560 program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1561 arg0: None,
1562 args: vec![],
1563 env: HashMap::new(),
1564 }
1565 }
1566}
1567
1568impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1569 fn from(s: T) -> Self {
1571 Self {
1572 program: s.into(),
1573 arg0: None,
1574 args: vec![],
1575 env: HashMap::new(),
1576 }
1577 }
1578}
1579
/// Which proc launcher implementation to use.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LauncherKind {
    /// The native launcher; also selected for an empty string.
    Native,
    /// The systemd-based launcher (Linux only).
    #[cfg(target_os = "linux")]
    Systemd,
}

impl FromStr for LauncherKind {
    type Err = io::Error;

    /// Parses a launcher kind, ignoring surrounding whitespace and ASCII case.
    ///
    /// `""` and `"native"` map to [`LauncherKind::Native`]; on Linux,
    /// `"systemd"` maps to `LauncherKind::Systemd`.
    ///
    /// # Errors
    ///
    /// Any other input yields an `InvalidInput` error naming the accepted values.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.trim().to_ascii_lowercase();
        match normalized.as_str() {
            "" | "native" => return Ok(Self::Native),
            #[cfg(target_os = "linux")]
            "systemd" => return Ok(Self::Systemd),
            _ => {}
        }

        let alternatives = if cfg!(target_os = "linux") {
            " or 'systemd'"
        } else {
            ""
        };
        Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("unknown proc launcher kind {normalized:?}; expected 'native'{alternatives}"),
        ))
    }
}
1643
/// Manages procs launched from a [`BootstrapCommand`].
pub struct BootstrapProcManager {
    /// Launcher used to start and stop procs. Set explicitly via `set_launcher`,
    /// or initialized lazily from configuration on first use of `launcher()`.
    launcher: OnceLock<Arc<dyn ProcLauncher>>,

    /// Command this manager was created with.
    command: BootstrapCommand,

    /// Handles of known procs, keyed by proc id.
    children: Arc<tokio::sync::Mutex<HashMap<hyperactor_reference::ProcId, BootstrapProcHandle>>>,

    /// File appender for captured output; only present when file capture is
    /// enabled and the appender could be created.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Temporary directory whose path is exposed through `socket_dir()`.
    // NOTE(review): presumably removed when the manager is dropped (`TempDir`) — confirm.
    socket_dir: TempDir,
}
1694
1695impl BootstrapProcManager {
1696 pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1703 let file_appender = if hyperactor_config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1704 match crate::logging::FileAppender::new() {
1705 Some(fm) => {
1706 tracing::info!("file appender created successfully");
1707 Some(Arc::new(fm))
1708 }
1709 None => {
1710 tracing::warn!("failed to create file appender");
1711 None
1712 }
1713 }
1714 } else {
1715 None
1716 };
1717
1718 Ok(Self {
1719 launcher: OnceLock::new(),
1720 command,
1721 children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1722 file_appender,
1723 socket_dir: runtime_dir()?,
1724 })
1725 }
1726
1727 pub fn set_launcher(&self, launcher: Arc<dyn ProcLauncher>) -> Result<(), ProcLauncherError> {
1736 self.launcher.set(launcher).map_err(|_| {
1737 ProcLauncherError::Other(
1738 "launcher already initialized; call set_proc_launcher before first spawn".into(),
1739 )
1740 })
1741 }
1742
1743 pub fn launcher(&self) -> &Arc<dyn ProcLauncher> {
1749 self.launcher.get_or_init(|| {
1750 let kind_str = hyperactor_config::global::get_cloned(MESH_PROC_LAUNCHER_KIND);
1751 let kind: LauncherKind = kind_str.parse().unwrap_or(LauncherKind::Native);
1752 tracing::info!(kind = ?kind, config_value = %kind_str, "using default proc launcher");
1753 match kind {
1754 LauncherKind::Native => Arc::new(NativeProcLauncher::new()),
1755 #[cfg(target_os = "linux")]
1756 LauncherKind::Systemd => Arc::new(SystemdProcLauncher::new()),
1757 }
1758 })
1759 }
1760
/// The bootstrap command this manager was created with.
pub fn command(&self) -> &BootstrapCommand {
    &self.command
}
1765
1766 pub fn socket_dir(&self) -> &Path {
1768 self.socket_dir.path()
1769 }
1770
1771 pub async fn status(&self, proc_id: &hyperactor_reference::ProcId) -> Option<ProcStatus> {
1782 self.children.lock().await.get(proc_id).map(|h| h.status())
1783 }
1784
1785 pub(crate) async fn request_stop(
1792 &self,
1793 cx: &impl context::Actor,
1794 proc: &hyperactor_reference::ProcId,
1795 timeout: Duration,
1796 reason: &str,
1797 ) {
1798 let handle = {
1799 let guard = self.children.lock().await;
1800 guard.get(proc).cloned()
1801 };
1802
1803 let Some(handle) = handle else { return };
1804
1805 let status = handle.status();
1806 if status.is_exit() || matches!(status, ProcStatus::Stopping { .. }) {
1807 return;
1808 }
1809
1810 if let Some(agent) = handle.agent_ref() {
1811 let mut agent_port = agent.port();
1812 agent_port.return_undeliverable(false);
1813 let _ = agent_port.send(
1814 cx,
1815 resource::StopAll {
1816 reason: reason.to_string(),
1817 },
1818 );
1819 }
1820
1821 let _ = handle.mark_stopping();
1822 tokio::spawn(async move {
1823 handle.wait_or_brutally_kill(timeout).await;
1824 });
1825 }
1826
1827 fn spawn_exit_monitor(
1828 &self,
1829 proc_id: hyperactor_reference::ProcId,
1830 handle: BootstrapProcHandle,
1831 exit_rx: tokio::sync::oneshot::Receiver<ProcExitResult>,
1832 ) {
1833 tokio::spawn(async move {
1834 let exit_result = match exit_rx.await {
1836 Ok(res) => res,
1837 Err(_) => {
1838 let _ = handle.mark_failed("exit_rx sender dropped unexpectedly");
1840 tracing::error!(
1841 name = "ProcStatus",
1842 status = "Exited::ChannelDropped",
1843 %proc_id,
1844 "exit channel closed without result"
1845 );
1846 return;
1847 }
1848 };
1849
1850 let mut stderr_tail: Vec<String> = Vec::new();
1854 let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
1855
1856 if let Some(t) = stderr_mon {
1857 let (lines, _bytes) = t.abort().await;
1858 stderr_tail = lines;
1859 }
1860 if let Some(t) = stdout_mon {
1861 let (_lines, _bytes) = t.abort().await;
1862 }
1863
1864 if stderr_tail.is_empty() {
1866 if let Some(tail) = exit_result.stderr_tail {
1867 stderr_tail = tail;
1868 }
1869 }
1870
1871 let tail_str = if stderr_tail.is_empty() {
1872 None
1873 } else {
1874 Some(stderr_tail.join("\n"))
1875 };
1876
1877 match exit_result.kind {
1878 ProcExitKind::Exited { code } => {
1879 let _ = handle.mark_stopped(code, stderr_tail);
1880 tracing::info!(
1881 name = "ProcStatus",
1882 status = "Exited::ExitWithCode",
1883 %proc_id,
1884 exit_code = code,
1885 tail = tail_str,
1886 "proc exited with code {code}"
1887 );
1888 }
1889 ProcExitKind::Signaled {
1890 signal,
1891 core_dumped,
1892 } => {
1893 let _ = handle.mark_killed(signal, core_dumped);
1894 tracing::info!(
1895 name = "ProcStatus",
1896 status = "Exited::KilledBySignal",
1897 %proc_id,
1898 tail = tail_str,
1899 "killed by signal {signal}"
1900 );
1901 }
1902 ProcExitKind::Failed { reason } => {
1903 let _ = handle.mark_failed(&reason);
1904 tracing::info!(
1905 name = "ProcStatus",
1906 status = "Exited::Failed",
1907 %proc_id,
1908 tail = tail_str,
1909 "proc failed: {reason}"
1910 );
1911 }
1912 }
1913 });
1914 }
1915}
1916
1917pub struct BootstrapProcConfig {
1919 pub create_rank: usize,
1921
1922 pub client_config_override: Attrs,
1925}
1926
1927#[async_trait]
1928impl ProcManager for BootstrapProcManager {
1929 type Handle = BootstrapProcHandle;
1930
1931 type Config = BootstrapProcConfig;
1932
1933 fn transport(&self) -> ChannelTransport {
1940 ChannelTransport::Unix
1941 }
1942
1943 #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string()))]
1970 async fn spawn(
1971 &self,
1972 proc_id: hyperactor_reference::ProcId,
1973 backend_addr: ChannelAddr,
1974 config: BootstrapProcConfig,
1975 ) -> Result<Self::Handle, HostError> {
1976 let (callback_addr, mut callback_rx) =
1977 channel::serve::<(ChannelAddr, hyperactor_reference::ActorRef<ProcAgent>)>(
1978 ChannelAddr::any(ChannelTransport::Unix),
1979 )?;
1980
1981 let overrides = &config.client_config_override;
1983 let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
1984 let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
1985 let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
1986 let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1987
1988 let mode = Bootstrap::Proc {
1989 proc_id: proc_id.clone(),
1990 backend_addr,
1991 callback_addr,
1992 socket_dir_path: self.socket_dir.path().to_owned(),
1993 config: Some(config.client_config_override.clone()),
1994 };
1995
1996 let bootstrap_payload = mode
1998 .to_env_safe_string()
1999 .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?;
2000
2001 let opts = LaunchOptions {
2002 bootstrap_payload,
2003 process_name: format_process_name(&proc_id),
2004 command: self.command.clone(),
2005 want_stdio: need_stdio,
2006 tail_lines: tail_size,
2007 log_channel: if enable_forwarding {
2008 Some(ChannelAddr::any(ChannelTransport::Unix))
2009 } else {
2010 None
2011 },
2012 };
2013
2014 let launch_result = self
2016 .launcher()
2017 .launch(&proc_id, opts.clone())
2018 .await
2019 .map_err(|e| {
2020 let io_err = match e {
2021 ProcLauncherError::Launch(io_err) => io_err,
2022 other => std::io::Error::other(other.to_string()),
2023 };
2024 HostError::ProcessSpawnFailure(proc_id.clone(), io_err)
2025 })?;
2026
2027 let (out_fwder, err_fwder) = match launch_result.stdio {
2029 StdioHandling::Captured { stdout, stderr } => {
2030 let (file_stdout, file_stderr) = if enable_file_capture {
2031 match self.file_appender.as_deref() {
2032 Some(fm) => (
2033 Some(fm.addr_for(OutputTarget::Stdout)),
2034 Some(fm.addr_for(OutputTarget::Stderr)),
2035 ),
2036 None => {
2037 tracing::warn!("enable_file_capture=true but no FileAppender");
2038 (None, None)
2039 }
2040 }
2041 } else {
2042 (None, None)
2043 };
2044
2045 let out = StreamFwder::start(
2046 stdout,
2047 file_stdout,
2048 OutputTarget::Stdout,
2049 tail_size,
2050 opts.log_channel.clone(),
2051 &proc_id,
2052 config.create_rank,
2053 );
2054 let err = StreamFwder::start(
2055 stderr,
2056 file_stderr,
2057 OutputTarget::Stderr,
2058 tail_size,
2059 opts.log_channel.clone(),
2060 &proc_id,
2061 config.create_rank,
2062 );
2063 (Some(out), Some(err))
2064 }
2065 StdioHandling::Inherited | StdioHandling::ManagedByLauncher => {
2066 if !need_stdio {
2067 tracing::info!(
2068 %proc_id, enable_forwarding, enable_file_capture, tail_size,
2069 "child stdio NOT captured (forwarding/file_capture/tail all disabled)"
2070 );
2071 }
2072 (None, None)
2073 }
2074 };
2075
2076 let handle = BootstrapProcHandle::new(proc_id.clone(), Arc::downgrade(self.launcher()));
2078 handle.mark_running(launch_result.started_at);
2079 handle.set_stream_monitors(out_fwder, err_fwder);
2080
2081 {
2083 let mut children = self.children.lock().await;
2084 children.insert(proc_id.clone(), handle.clone());
2085 }
2086
2087 self.spawn_exit_monitor(proc_id.clone(), handle.clone(), launch_result.exit_rx);
2090
2091 let h = handle.clone();
2093 tokio::spawn(async move {
2094 match callback_rx.recv().await {
2095 Ok((addr, agent)) => {
2096 let _ = h.mark_ready(addr, agent);
2097 }
2098 Err(e) => {
2099 let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
2101 }
2102 }
2103 });
2104
2105 Ok(handle)
2107 }
2108}
2109
2110#[async_trait]
2111impl hyperactor::host::SingleTerminate for BootstrapProcManager {
2112 async fn terminate_proc(
2122 &self,
2123 cx: &impl context::Actor,
2124 proc: &hyperactor_reference::ProcId,
2125 timeout: Duration,
2126 reason: &str,
2127 ) -> Result<
2128 (
2129 Vec<hyperactor_reference::ActorId>,
2130 Vec<hyperactor_reference::ActorId>,
2131 ),
2132 anyhow::Error,
2133 > {
2134 let proc_handle: Option<BootstrapProcHandle> = {
2136 let mut guard = self.children.lock().await;
2137 guard.remove(proc)
2138 };
2139
2140 if let Some(h) = proc_handle {
2141 h.terminate(cx, timeout, reason)
2142 .await
2143 .map(|_| (Vec::new(), Vec::new()))
2144 .map_err(|e| e.into())
2145 } else {
2146 Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2147 }
2148 }
2149}
2150
2151#[async_trait]
2152impl hyperactor::host::BulkTerminate for BootstrapProcManager {
2153 async fn terminate_all(
2167 &self,
2168 cx: &impl context::Actor,
2169 timeout: Duration,
2170 max_in_flight: usize,
2171 reason: &str,
2172 ) -> TerminateSummary {
2173 let handles: Vec<BootstrapProcHandle> = {
2175 let guard = self.children.lock().await;
2176 guard.values().cloned().collect()
2177 };
2178
2179 let attempted = handles.len();
2180 let mut ok = 0usize;
2181
2182 let results = stream::iter(handles.into_iter().map(|h| async move {
2183 match h.terminate(cx, timeout, reason).await {
2184 Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
2185 true
2187 }
2188 Err(e) => {
2189 tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2190 false
2191 }
2192 }
2193 }))
2194 .buffer_unordered(max_in_flight.max(1))
2195 .collect::<Vec<bool>>()
2196 .await;
2197
2198 for r in results {
2199 if r {
2200 ok += 1;
2201 }
2202 }
2203
2204 TerminateSummary {
2205 attempted,
2206 ok,
2207 failed: attempted.saturating_sub(ok),
2208 }
2209 }
2210}
2211
2212pub async fn bootstrap() -> anyhow::Result<i32> {
2229 let boot = Bootstrap::get_from_env()?.unwrap_or_else(Bootstrap::default);
2230 boot.bootstrap().await
2231}
2232
2233async fn bootstrap_v0_proc_mesh(config: Option<Attrs>) -> anyhow::Error {
2243 if let Some(attrs) = config {
2245 hyperactor_config::global::set(hyperactor_config::global::Source::ClientOverride, attrs);
2246 tracing::debug!("bootstrap: installed ClientOverride config snapshot (V0ProcMesh)");
2247 } else {
2248 tracing::debug!("bootstrap: no config snapshot provided (V0ProcMesh)");
2249 }
2250 tracing::info!(
2251 "bootstrap_v0_proc_mesh config:\n{}",
2252 hyperactor_config::global::attrs()
2253 );
2254
2255 pub async fn go() -> Result<(), anyhow::Error> {
2256 let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
2257 let procs_for_cleanup = procs.clone();
2258 let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
2259 for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
2260 if let Err(err) = proc_to_stop
2261 .destroy_and_wait::<()>(
2262 Duration::from_millis(10),
2263 None,
2264 "execute cleanup callback",
2265 )
2266 .await
2267 {
2268 tracing::error!(
2269 "error while stopping proc {}: {}",
2270 proc_to_stop.proc_id(),
2271 err
2272 );
2273 }
2274 }
2275 }));
2276
2277 let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
2278 .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
2279 .parse()?;
2280 let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
2281 .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
2282 .parse()?;
2283 let listen_addr = ChannelAddr::any(bootstrap_addr.transport());
2284
2285 let entered = tracing::span!(
2286 Level::INFO,
2287 "bootstrap_v0_proc_mesh",
2288 %bootstrap_addr,
2289 %bootstrap_index,
2290 %listen_addr,
2291 )
2292 .entered();
2293
2294 let (serve_addr, mut rx) = channel::serve(listen_addr)?;
2295 let tx = channel::dial(bootstrap_addr.clone())?;
2296
2297 let (rtx, mut return_channel) = oneshot::channel();
2298 tx.try_post(
2299 Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
2300 rtx,
2301 );
2302 tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));
2303
2304 let _ = entered.exit();
2305
2306 let mut the_msg;
2307
2308 tokio::select! {
2309 msg = rx.recv() => {
2310 the_msg = msg;
2311 }
2312 returned_msg = &mut return_channel => {
2313 match returned_msg {
2314 Ok(msg) => {
2315 return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
2316 }
2317 Err(_) => {
2318 the_msg = rx.recv().await;
2319 }
2320 }
2321 }
2322 }
2323 loop {
2324 match the_msg? {
2325 Allocator2Process::StartProc(proc_id, listen_transport) => {
2326 let span = tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport);
2327 let (proc, mesh_agent) = ProcAgent::bootstrap(proc_id.clone())
2328 .instrument(span.clone())
2329 .await?;
2330 let entered = span.entered();
2331 let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
2332 let handle = proc.clone().serve(proc_rx);
2333 drop(handle); let span = entered.exit();
2335 tx.send(Process2Allocator(
2336 bootstrap_index,
2337 Process2AllocatorMessage::StartedProc(
2338 proc_id.clone(),
2339 mesh_agent.bind(),
2340 proc_addr,
2341 ),
2342 ))
2343 .instrument(span)
2344 .await?;
2345 procs.lock().await.push(proc);
2346 }
2347 Allocator2Process::StopAndExit(code) => {
2348 tracing::info!("stopping procs with code {code}");
2349 {
2350 for proc_to_stop in procs.lock().await.iter_mut() {
2351 if let Err(err) = proc_to_stop
2352 .destroy_and_wait::<()>(
2353 Duration::from_millis(10),
2354 None,
2355 "stop and exit",
2356 )
2357 .await
2358 {
2359 tracing::error!(
2360 "error while stopping proc {}: {}",
2361 proc_to_stop.proc_id(),
2362 err
2363 );
2364 }
2365 }
2366 }
2367 tracing::info!("exiting with {code}");
2368 std::process::exit(code);
2369 }
2370 Allocator2Process::Exit(code) => {
2371 tracing::info!("exiting with {code}");
2372 std::process::exit(code);
2373 }
2374 }
2375 the_msg = rx.recv().await;
2376 }
2377 }
2378
2379 go().await.unwrap_err()
2380}
2381
2382pub async fn bootstrap_or_die() -> ! {
2385 match bootstrap().await {
2386 Ok(exit_code) => std::process::exit(exit_code),
2387 Err(err) => {
2388 let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2389 tracing::error!("failed to bootstrap mesh process: {}", err);
2390 std::process::exit(1);
2391 }
2392 }
2393}
2394
2395#[derive(enum_as_inner::EnumAsInner)]
2396enum DebugSink {
2397 File(std::fs::File),
2398 Sink,
2399}
2400
2401impl DebugSink {
2402 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2403 match self {
2404 DebugSink::File(f) => f.write(buf),
2405 DebugSink::Sink => Ok(buf.len()),
2406 }
2407 }
2408 fn flush(&mut self) -> io::Result<()> {
2409 match self {
2410 DebugSink::File(f) => f.flush(),
2411 DebugSink::Sink => Ok(()),
2412 }
2413 }
2414}
2415
2416fn debug_sink() -> &'static Mutex<DebugSink> {
2417 static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2418 DEBUG_SINK.get_or_init(|| {
2419 let debug_path = {
2420 let mut p = std::env::temp_dir();
2421 if let Ok(user) = std::env::var("USER") {
2422 p.push(user);
2423 }
2424 std::fs::create_dir_all(&p).ok();
2425 p.push("monarch-bootstrap-debug.log");
2426 p
2427 };
2428 let sink = if debug_path.exists() {
2429 match OpenOptions::new()
2430 .append(true)
2431 .create(true)
2432 .open(debug_path.clone())
2433 {
2434 Ok(f) => DebugSink::File(f),
2435 Err(_e) => {
2436 eprintln!(
2437 "failed to open {} for bootstrap debug logging",
2438 debug_path.display()
2439 );
2440 DebugSink::Sink
2441 }
2442 }
2443 } else {
2444 DebugSink::Sink
2445 };
2446 Mutex::new(sink)
2447 })
2448}
2449
/// When `true`, bootstrap debug output is additionally copied to stderr.
const DEBUG_TO_STDERR: bool = false;
2452
/// Zero-sized writer that forwards to the process-wide bootstrap debug sink.
struct Debug;
2456
2457impl Debug {
2458 fn is_active() -> bool {
2459 DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2460 }
2461}
2462
2463impl Write for Debug {
2464 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2465 let res = debug_sink().lock().unwrap().write(buf);
2466 if DEBUG_TO_STDERR {
2467 let n = match res {
2468 Ok(n) => n,
2469 Err(_) => buf.len(),
2470 };
2471 let _ = io::stderr().write_all(&buf[..n]);
2472 }
2473
2474 res
2475 }
2476 fn flush(&mut self) -> io::Result<()> {
2477 let res = debug_sink().lock().unwrap().flush();
2478 if DEBUG_TO_STDERR {
2479 let _ = io::stderr().flush();
2480 }
2481 res
2482 }
2483}
2484
2485fn runtime_dir() -> io::Result<TempDir> {
2488 match std::env::var_os("XDG_RUNTIME_DIR") {
2489 Some(runtime_dir) => {
2490 let path = PathBuf::from(runtime_dir);
2491 tempfile::tempdir_in(path)
2492 }
2493 None => tempfile::tempdir(),
2494 }
2495}
2496
2497#[cfg(test)]
2498mod tests {
2499 use std::path::PathBuf;
2500
2501 use hyperactor::RemoteSpawn;
2502 use hyperactor::channel::ChannelAddr;
2503 use hyperactor::channel::ChannelTransport;
2504 use hyperactor::channel::TcpMode;
2505 use hyperactor::context::Mailbox as _;
2506 use hyperactor::host::ProcHandle;
2507 use hyperactor::reference as hyperactor_reference;
2508 use hyperactor::testing::ids::test_proc_id;
2509 use hyperactor::testing::ids::test_proc_id_with_addr;
2510 use hyperactor_config::Flattrs;
2511 use ndslice::Extent;
2512 use ndslice::ViewExt;
2513 use ndslice::extent;
2514 use tokio::process::Command;
2515
2516 use super::*;
2517 use crate::ActorMesh;
2518 use crate::alloc::AllocSpec;
2519 use crate::alloc::Allocator;
2520 use crate::alloc::ProcessAllocator;
2521 use crate::host_mesh::HostMesh;
2522 use crate::testactor;
2523 use crate::testing;
2524
/// Modes without a config snapshot survive an env-string round trip with the
/// same variant and a stable encoding.
#[test]
fn test_bootstrap_mode_env_string_none_config_proc() {
    let cases = [
        Bootstrap::default(),
        Bootstrap::Proc {
            proc_id: test_proc_id("foo_0"),
            backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
            callback_addr: ChannelAddr::any(ChannelTransport::Unix),
            socket_dir_path: PathBuf::from("notexist"),
            config: None,
        },
    ];

    for original in cases {
        let encoded = original.to_env_safe_string().unwrap();
        let decoded = Bootstrap::from_env_safe_string(&encoded).unwrap();

        // Re-encoding the decoded value must reproduce the same string.
        let re_encoded = decoded.to_env_safe_string().unwrap();
        assert_eq!(encoded, re_encoded, "env-safe round-trip should be stable");

        let same_variant = matches!(
            (&original, &decoded),
            (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. })
                | (
                    Bootstrap::V0ProcMesh { config: None },
                    Bootstrap::V0ProcMesh { config: None }
                )
        );
        assert!(same_variant, "decoded variant mismatch: got {:?}", decoded);
    }
}
2558
/// A `Host` mode without a config snapshot round-trips through the env-safe
/// string with a stable encoding and still has no config.
#[test]
fn test_bootstrap_mode_env_string_none_config_host() {
    let original = Bootstrap::Host {
        addr: ChannelAddr::any(ChannelTransport::Unix),
        command: None,
        config: None,
        exit_on_shutdown: false,
    };

    let encoded = original.to_env_safe_string().unwrap();
    let decoded = Bootstrap::from_env_safe_string(&encoded).unwrap();

    let re_encoded = decoded.to_env_safe_string().unwrap();
    assert_eq!(encoded, re_encoded);

    assert!(
        matches!(decoded, Bootstrap::Host { config: None, .. }),
        "expected Host with None config, got {:?}",
        decoded
    );
}
2581
/// Decoding a string that is not a valid env-safe payload must fail.
#[test]
fn test_bootstrap_mode_env_string_invalid() {
    let decoded = Bootstrap::from_env_safe_string("!!!");
    assert!(decoded.is_err());
}
2587
/// A config snapshot attached to each bootstrap mode survives the env-string
/// round trip with its values intact.
#[test]
fn test_bootstrap_config_snapshot_roundtrip() {
    let mut attrs = Attrs::new();
    attrs[MESH_TAIL_LOG_LINES] = 123;
    attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;

    let socket_dir = runtime_dir().unwrap();

    // Encode and decode a mode through its env-safe string form.
    let roundtrip = |original: Bootstrap| -> Bootstrap {
        let env_str = original.to_env_safe_string().expect("encode bootstrap");
        Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap")
    };

    // The decoded snapshot must carry the two values set above.
    let assert_snapshot = |config: &Option<Attrs>| {
        let cfg = config.as_ref().expect("expected Some(attrs)");
        assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
        assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
    };

    // Proc mode.
    let decoded = roundtrip(Bootstrap::Proc {
        proc_id: test_proc_id("foo_42"),
        backend_addr: ChannelAddr::any(ChannelTransport::Unix),
        callback_addr: ChannelAddr::any(ChannelTransport::Unix),
        config: Some(attrs.clone()),
        socket_dir_path: socket_dir.path().to_owned(),
    });
    match &decoded {
        Bootstrap::Proc { config, .. } => assert_snapshot(config),
        other => panic!("unexpected variant after roundtrip: {:?}", other),
    }

    // Host mode.
    let decoded = roundtrip(Bootstrap::Host {
        addr: ChannelAddr::any(ChannelTransport::Unix),
        command: None,
        config: Some(attrs.clone()),
        exit_on_shutdown: false,
    });
    match &decoded {
        Bootstrap::Host { config, .. } => assert_snapshot(config),
        other => panic!("unexpected variant after roundtrip: {:?}", other),
    }

    // V0ProcMesh mode.
    let decoded = roundtrip(Bootstrap::V0ProcMesh {
        config: Some(attrs.clone()),
    });
    match &decoded {
        Bootstrap::V0ProcMesh { config } => assert_snapshot(config),
        other => panic!("unexpected variant after roundtrip: {:?}", other),
    }
}
2655
2656 #[tokio::test]
2657 async fn test_v1_child_logging() {
2658 use hyperactor::channel;
2659 use hyperactor::mailbox::BoxedMailboxSender;
2660 use hyperactor::mailbox::DialMailboxRouter;
2661 use hyperactor::mailbox::MailboxServer;
2662 use hyperactor::proc::Proc;
2663
2664 use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
2665 use crate::logging::LogClientActor;
2666 use crate::logging::LogClientMessageClient;
2667 use crate::logging::LogForwardActor;
2668 use crate::logging::LogMessage;
2669 use crate::logging::OutputTarget;
2670 use crate::logging::test_tap;
2671
2672 let router = DialMailboxRouter::new();
2673 let (proc_addr, proc_rx) =
2674 channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
2675 let proc = Proc::configured(
2676 test_proc_id("client_0"),
2677 BoxedMailboxSender::new(router.clone()),
2678 );
2679 proc.clone().serve(proc_rx);
2680 router.bind(test_proc_id("client_0").into(), proc_addr.clone());
2681 let (client, _handle) = proc.instance("client").unwrap();
2682
2683 let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
2684 test_tap::install(tap_tx);
2685
2686 let log_channel = ChannelAddr::any(ChannelTransport::Unix);
2687 unsafe {
2689 std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
2690 }
2691
2692 let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
2695 let log_client: hyperactor_reference::ActorRef<LogClientActor> =
2696 proc.spawn("log_client", log_client_actor).unwrap().bind();
2697 log_client.set_aggregate(&client, None).await.unwrap();
2698
2699 let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
2702 .await
2703 .unwrap();
2704 let _log_forwarder: hyperactor_reference::ActorRef<LogForwardActor> = proc
2705 .spawn("log_forwarder", log_forwarder_actor)
2706 .unwrap()
2707 .bind();
2708
2709 let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();
2712
2713 tx.post(LogMessage::Log {
2716 hostname: "testhost".into(),
2717 proc_id: "testproc[0]".into(),
2718 output_target: OutputTarget::Stdout,
2719 payload: wirevalue::Any::serialize(&"hello from child".to_string()).unwrap(),
2720 });
2721
2722 let line = tokio::time::timeout(Duration::from_secs(2), tap_rx.recv())
2724 .await
2725 .expect("timed out waiting for log line")
2726 .expect("tap channel closed unexpectedly");
2727 assert!(
2728 line.contains("hello from child"),
2729 "log line did not appear via LogClientActor; got: {line}"
2730 );
2731 }
2732
2733 mod proc_handle {
2734
2735 use std::sync::Arc;
2736 use std::time::Duration;
2737
2738 use async_trait::async_trait;
2739 use hyperactor::host::ProcHandle;
2740 use hyperactor::reference as hyperactor_reference;
2741 use hyperactor::testing::ids::test_proc_id;
2742
2743 use super::super::*;
2744 use crate::proc_launcher::LaunchOptions;
2745 use crate::proc_launcher::LaunchResult;
2746 use crate::proc_launcher::ProcLauncher;
2747 use crate::proc_launcher::ProcLauncherError;
2748
2749 struct TestProcLauncher;
2757
2758 #[async_trait]
2759 impl ProcLauncher for TestProcLauncher {
2760 async fn launch(
2761 &self,
2762 _proc_id: &hyperactor_reference::ProcId,
2763 _opts: LaunchOptions,
2764 ) -> Result<LaunchResult, ProcLauncherError> {
2765 panic!("TestProcLauncher::launch should not be called in unit tests");
2766 }
2767
2768 async fn terminate(
2769 &self,
2770 _proc_id: &hyperactor_reference::ProcId,
2771 _timeout: Duration,
2772 ) -> Result<(), ProcLauncherError> {
2773 panic!("TestProcLauncher::terminate should not be called in unit tests");
2774 }
2775
2776 async fn kill(
2777 &self,
2778 _proc_id: &hyperactor_reference::ProcId,
2779 ) -> Result<(), ProcLauncherError> {
2780 panic!("TestProcLauncher::kill should not be called in unit tests");
2781 }
2782 }
2783
2784 fn handle_for_test() -> BootstrapProcHandle {
2791 let proc_id = test_proc_id("0");
2792 let launcher: Arc<dyn ProcLauncher> = Arc::new(TestProcLauncher);
2793 BootstrapProcHandle::new(proc_id, Arc::downgrade(&launcher))
2794 }
2795
2796 #[tokio::test]
2797 async fn starting_to_running_ok() {
2798 let h = handle_for_test();
2799 assert!(matches!(h.status(), ProcStatus::Starting));
2800 let child_started_at = std::time::SystemTime::now();
2801 assert!(h.mark_running(child_started_at));
2802 match h.status() {
2803 ProcStatus::Running { started_at } => {
2804 assert_eq!(started_at, child_started_at);
2805 }
2806 other => panic!("expected Running, got {other:?}"),
2807 }
2808 }
2809
2810 #[tokio::test]
2811 async fn running_to_stopping_to_stopped_ok() {
2812 let h = handle_for_test();
2813 let child_started_at = std::time::SystemTime::now();
2814 assert!(h.mark_running(child_started_at));
2815 assert!(h.mark_stopping());
2816 assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2817 assert!(h.mark_stopped(0, Vec::new()));
2818 assert!(matches!(
2819 h.status(),
2820 ProcStatus::Stopped { exit_code: 0, .. }
2821 ));
2822 }
2823
2824 #[tokio::test]
2825 async fn running_to_killed_ok() {
2826 let h = handle_for_test();
2827 let child_started_at = std::time::SystemTime::now();
2828 assert!(h.mark_running(child_started_at));
2829 assert!(h.mark_killed(9, true));
2830 assert!(matches!(
2831 h.status(),
2832 ProcStatus::Killed {
2833 signal: 9,
2834 core_dumped: true
2835 }
2836 ));
2837 }
2838
2839 #[tokio::test]
2840 async fn running_to_failed_ok() {
2841 let h = handle_for_test();
2842 let child_started_at = std::time::SystemTime::now();
2843 assert!(h.mark_running(child_started_at));
2844 assert!(h.mark_failed("bootstrap error"));
2845 match h.status() {
2846 ProcStatus::Failed { reason } => {
2847 assert_eq!(reason, "bootstrap error");
2848 }
2849 other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2850 }
2851 }
2852
2853 #[tokio::test]
2854 async fn illegal_transitions_are_rejected() {
2855 let h = handle_for_test();
2856 let child_started_at = std::time::SystemTime::now();
2857 assert!(h.mark_running(child_started_at));
2859 assert!(!h.mark_running(std::time::SystemTime::now()));
2860 assert!(matches!(h.status(), ProcStatus::Running { .. }));
2861 assert!(h.mark_stopping());
2863 assert!(h.mark_stopped(0, Vec::new()));
2864 assert!(!h.mark_running(child_started_at));
2865 assert!(!h.mark_killed(9, false));
2866 assert!(!h.mark_failed("nope"));
2867
2868 assert!(matches!(
2869 h.status(),
2870 ProcStatus::Stopped { exit_code: 0, .. }
2871 ));
2872 }
2873
2874 #[tokio::test]
2875 async fn transitions_from_ready_are_legal() {
2876 let h = handle_for_test();
2877 let addr = ChannelAddr::any(ChannelTransport::Unix);
2878 let t0 = std::time::SystemTime::now();
2880 assert!(h.mark_running(t0));
2881 let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2884 let actor_id = proc_id.actor_id("proc_agent", 0);
2885 let agent_ref: hyperactor_reference::ActorRef<ProcAgent> =
2886 hyperactor_reference::ActorRef::attest(actor_id);
2887 assert!(h.mark_ready(addr, agent_ref));
2889 assert!(h.mark_stopping());
2890 assert!(h.mark_stopped(0, Vec::new()));
2891 }
2892
2893 #[tokio::test]
2894 async fn ready_to_killed_is_legal() {
2895 let h = handle_for_test();
2896 let addr = ChannelAddr::any(ChannelTransport::Unix);
2897 let t0 = std::time::SystemTime::now();
2899 assert!(h.mark_running(t0));
2900 let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2903 let actor_id = proc_id.actor_id("proc_agent", 0);
2904 let agent: hyperactor_reference::ActorRef<ProcAgent> =
2905 hyperactor_reference::ActorRef::attest(actor_id);
2906 assert!(h.mark_ready(addr, agent));
2908 assert!(h.mark_killed(9, false));
2910 }
2911
2912 #[tokio::test]
2913 async fn mark_failed_from_stopping_is_allowed() {
2914 let h = handle_for_test();
2915
2916 assert!(h.mark_stopping(), "precondition: to Stopping");
2918
2919 assert!(
2921 h.mark_failed("boom"),
2922 "mark_failed() should succeed from Stopping"
2923 );
2924 match h.status() {
2925 ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2926 other => panic!("expected Failed(\"boom\"), got {other:?}"),
2927 }
2928 }
2929 }
2930
2931 struct TestLauncher;
2937
2938 #[async_trait::async_trait]
2939 impl crate::proc_launcher::ProcLauncher for TestLauncher {
2940 async fn launch(
2941 &self,
2942 _proc_id: &hyperactor_reference::ProcId,
2943 _opts: crate::proc_launcher::LaunchOptions,
2944 ) -> Result<crate::proc_launcher::LaunchResult, crate::proc_launcher::ProcLauncherError>
2945 {
2946 panic!("TestLauncher::launch should not be called in unit tests");
2947 }
2948
2949 async fn terminate(
2950 &self,
2951 _proc_id: &hyperactor_reference::ProcId,
2952 _timeout: std::time::Duration,
2953 ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
2954 panic!("TestLauncher::terminate should not be called in unit tests");
2955 }
2956
2957 async fn kill(
2958 &self,
2959 _proc_id: &hyperactor_reference::ProcId,
2960 ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
2961 panic!("TestLauncher::kill should not be called in unit tests");
2962 }
2963 }
2964
2965 fn test_handle(proc_id: hyperactor_reference::ProcId) -> BootstrapProcHandle {
2966 let launcher: std::sync::Arc<dyn crate::proc_launcher::ProcLauncher> =
2967 std::sync::Arc::new(TestLauncher);
2968 BootstrapProcHandle::new(proc_id, std::sync::Arc::downgrade(&launcher))
2969 }
2970
2971 #[tokio::test]
2972 async fn watch_notifies_on_status_changes() {
2973 let proc_id = test_proc_id("1");
2974 let handle = test_handle(proc_id);
2975 let mut rx = handle.watch();
2976
2977 let now = std::time::SystemTime::now();
2979 assert!(handle.mark_running(now));
2980 rx.changed().await.ok(); match &*rx.borrow() {
2982 ProcStatus::Running { started_at } => {
2983 assert_eq!(*started_at, now);
2984 }
2985 s => panic!("expected Running, got {s:?}"),
2986 }
2987
2988 assert!(handle.mark_stopped(0, Vec::new()));
2990 rx.changed().await.ok(); assert!(matches!(
2992 &*rx.borrow(),
2993 ProcStatus::Stopped { exit_code: 0, .. }
2994 ));
2995 }
2996
2997 #[tokio::test]
2998 async fn ready_errs_if_process_exits_before_running() {
2999 let proc_id =
3000 test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "early-exit");
3001 let handle = test_handle(proc_id);
3002
3003 assert!(handle.mark_stopped(7, Vec::new()));
3006
3007 match handle.ready_inner().await {
3009 Ok(()) => panic!("ready() unexpectedly succeeded"),
3010 Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
3011 assert_eq!(exit_code, 7)
3012 }
3013 Err(other) => panic!("expected Stopped(7), got {other:?}"),
3014 }
3015 }
3016
3017 #[tokio::test]
3018 async fn status_unknown_proc_is_none() {
3019 let manager = BootstrapProcManager::new(BootstrapCommand {
3020 program: PathBuf::from("/bin/true"),
3021 ..Default::default()
3022 })
3023 .unwrap();
3024 let unknown = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "nope");
3025 assert!(manager.status(&unknown).await.is_none());
3026 }
3027
3028 #[tokio::test]
3029 async fn handle_ready_allows_waiters() {
3030 let proc_id = test_proc_id("42");
3031 let handle = test_handle(proc_id.clone());
3032
3033 let started_at = std::time::SystemTime::now();
3034 assert!(handle.mark_running(started_at));
3035
3036 let actor_id = proc_id.actor_id("proc_agent", 0);
3037 let agent_ref: hyperactor_reference::ActorRef<ProcAgent> =
3038 hyperactor_reference::ActorRef::attest(actor_id);
3039
3040 let ready_addr = ChannelAddr::any(ChannelTransport::Unix);
3043
3044 assert!(handle.mark_ready(ready_addr.clone(), agent_ref));
3046 handle
3047 .ready_inner()
3048 .await
3049 .expect("ready_inner() should complete after Ready");
3050
3051 match handle.status() {
3054 ProcStatus::Ready {
3055 started_at: t,
3056 addr: a,
3057 ..
3058 } => {
3059 assert_eq!(t, started_at);
3060 assert_eq!(a, ready_addr);
3061 }
3062 other => panic!("expected Ready, got {other:?}"),
3063 }
3064 }
3065
/// The `Display` output of `Running` contains the variant name and the
/// uptime derived from `started_at` (42 seconds in the past here).
#[test]
fn display_running_includes_uptime() {
    let uptime = Duration::from_secs(42);
    let started_at = std::time::SystemTime::now() - uptime;
    let status = ProcStatus::Running { started_at };

    let rendered = status.to_string();
    assert!(rendered.contains("Running"));
    assert!(rendered.contains("42s"));
}
3075
/// The `Display` output of `Ready` contains both the channel address and the
/// variant name.
#[test]
fn display_ready_includes_addr() {
    let started_at = std::time::SystemTime::now() - Duration::from_secs(5);
    let addr = ChannelAddr::any(ChannelTransport::Unix);
    let agent_id = test_proc_id_with_addr(addr.clone(), "proc")
        .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0);

    let status = ProcStatus::Ready {
        started_at,
        addr: addr.clone(),
        agent: hyperactor_reference::ActorRef::attest(agent_id),
    };

    let rendered = status.to_string();
    assert!(rendered.contains(&addr.to_string()));
    assert!(rendered.contains("Ready"));
}
3095
/// The `Display` output of `Stopped` contains the variant name and the exit
/// code.
#[test]
fn display_stopped_includes_exit_code() {
    let rendered = ProcStatus::Stopped {
        exit_code: 7,
        stderr_tail: Vec::new(),
    }
    .to_string();

    assert!(rendered.contains("Stopped"));
    assert!(rendered.contains("7"));
}
3106
/// Smoke test: formatting each listed `ProcStatus` variant via `Display`
/// completes without panicking. The rendered text itself is not inspected.
#[test]
fn display_other_variants_does_not_panic() {
    let samples = [
        ProcStatus::Starting,
        ProcStatus::Stopping {
            started_at: std::time::SystemTime::now(),
        },
        ProcStatus::Ready {
            started_at: std::time::SystemTime::now(),
            addr: ChannelAddr::any(ChannelTransport::Unix),
            agent: hyperactor_reference::ActorRef::attest(
                test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "x")
                    .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
            ),
        },
        ProcStatus::Killed {
            signal: 9,
            core_dumped: false,
        },
        ProcStatus::Failed {
            reason: "boom".into(),
        },
    ];

    samples.into_iter().for_each(|status| {
        // Only the absence of a panic matters; discard the string.
        let _ = status.to_string();
    });
}
3135
3136 #[tokio::test]
3137 async fn proc_handle_ready_ok_through_trait() {
3138 let proc_id =
3139 test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-ready-ok");
3140 let handle = test_handle(proc_id.clone());
3141
3142 let t0 = std::time::SystemTime::now();
3144 assert!(handle.mark_running(t0));
3145
3146 let addr = ChannelAddr::any(ChannelTransport::Unix);
3148 let agent: hyperactor_reference::ActorRef<ProcAgent> =
3149 hyperactor_reference::ActorRef::attest(proc_id.actor_id("proc_agent", 0));
3150 assert!(handle.mark_ready(addr, agent));
3151
3152 let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3154 assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3155 }
3156
3157 #[tokio::test]
3158 async fn proc_handle_wait_returns_terminal_status() {
3159 let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-wait");
3160 let handle = test_handle(proc_id);
3161
3162 assert!(handle.mark_stopped(0, Vec::new()));
3164
3165 let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3167 .await
3168 .expect("wait should return Ok(terminal)");
3169
3170 match st {
3171 ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3172 other => panic!("expected Stopped(0), got {other:?}"),
3173 }
3174 }
3175
3176 #[tokio::test]
3177 async fn ready_wrapper_maps_terminal_to_trait_error() {
3178 let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "wrap");
3179 let handle = test_handle(proc_id);
3180
3181 assert!(handle.mark_stopped(7, Vec::new()));
3182
3183 match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3184 Ok(()) => panic!("expected Err"),
3185 Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3186 exit_code, ..
3187 })) => {
3188 assert_eq!(exit_code, 7);
3189 }
3190 Err(e) => panic!("unexpected error: {e:?}"),
3191 }
3192 }
3193
3194 async fn make_proc_id_and_backend_addr(
3204 instance: &hyperactor::Instance<()>,
3205 _tag: &str,
3206 ) -> (hyperactor_reference::ProcId, ChannelAddr) {
3207 let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3210
3211 instance.proc().clone().serve(rx);
3215
3216 let proc_id = test_proc_id_with_addr(ChannelTransport::Unix.any(), "proc");
3219 (proc_id, backend_addr)
3220 }
3221
/// Spawns a real bootstrap child, waits for it to become ready, then calls
/// `terminate` with a 2 s deadline (guarded by a 4 s outer timeout).
///
/// Accepts either a clean `Stopped` with exit code 0 or `Killed` by
/// `SIGTERM`; anything else, an error, or a hang fails the test.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_terminate_graceful() {
    let root =
        hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
    let (instance, _handle) = root.instance("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
    let spawn_config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
    };
    let handle = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), spawn_config)
        .await
        .expect("spawn bootstrap child");

    handle.ready().await.expect("ready");

    let deadline = Duration::from_secs(2);
    let outcome = tokio::time::timeout(
        deadline * 2,
        handle.terminate(&instance, deadline, "test terminate"),
    )
    .await;

    match outcome {
        Err(_) => panic!("terminate() future hung"),
        Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
        Ok(Ok(ProcStatus::Stopped { exit_code, .. })) => {
            assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
        }
        Ok(Ok(ProcStatus::Killed { signal, .. })) => {
            assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
        }
        Ok(Ok(other)) => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
    }
}
3286
/// Spawns a real bootstrap child, waits for it to become ready, then calls
/// `kill()` under a 5 s timeout and expects the terminal status to be
/// `Killed` with `SIGKILL`.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_kill_forced() {
    let root =
        hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
    let (instance, _handle) = root.instance("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;

    let spawn_config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
    };
    let handle = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), spawn_config)
        .await
        .expect("spawn bootstrap child");

    handle.ready().await.expect("ready");

    let deadline = Duration::from_secs(5);
    let outcome = tokio::time::timeout(deadline, handle.kill()).await;

    match outcome {
        Err(_) => panic!("kill() future hung"),
        Ok(Err(e)) => panic!("kill() failed: {e:?}"),
        Ok(Ok(ProcStatus::Killed { signal, .. })) => {
            assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
        }
        Ok(Ok(other)) => panic!("expected Killed status after kill(); got: {other:?}"),
    }
}
3335
/// End-to-end path: allocate one replica with `ProcessAllocator`, build a
/// `HostMesh` on it, spawn a proc mesh and a `TestActor` mesh, ask the actor
/// for its id via a cast, check the reply, and shut the host mesh down.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_canonical_simple() {
    // SAFETY: NOTE(review): `std::env::set_var` is only sound while no other
    // thread reads or writes the process environment; nothing visible in this
    // test enforces that — confirm. The variable is also left set afterwards.
    unsafe {
        std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
    }
    let instance = testing::instance();

    let bootstrap_bin = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
    let mut allocator = ProcessAllocator::new(Command::new(bootstrap_bin));
    let spec = AllocSpec {
        extent: extent!(replicas = 1),
        constraints: Default::default(),
        proc_name: None,
        transport: ChannelTransport::Unix,
        proc_allocation_mode: Default::default(),
    };
    let alloc = allocator.allocate(spec).await.unwrap();

    let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
        .await
        .unwrap();

    let proc_mesh = host_mesh
        .spawn(&instance, "p0", Extent::unity())
        .await
        .unwrap();

    let actor_mesh: ActorMesh<testactor::TestActor> =
        proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

    // Round-trip: cast a request carrying a reply port, then read the answer.
    let (reply_port, mut replies) = instance.mailbox().open_port();
    actor_mesh
        .cast(&instance, testactor::GetActorId(reply_port.bind()))
        .unwrap();
    let (got_id, _seq) = replies.recv().await.unwrap();
    let expected_id = actor_mesh.values().next().unwrap().actor_id().clone();
    assert_eq!(got_id, expected_id);

    host_mesh.shutdown(&instance).await.expect("host shutdown");
}
3450
/// Same end-to-end flow as the plain canonical bootstrap test, but with
/// `MESH_PROC_LAUNCHER_KIND` overridden to `"systemd"`.
///
/// After the host mesh is shut down, the test polls systemd over the D-Bus
/// session bus and requires that the unit named by
/// `SystemdProcLauncher::unit_name` is either unknown to the manager or no
/// longer running/activating.
#[tokio::test]
#[cfg(all(fbcode_build, target_os = "linux"))]
async fn bootstrap_canonical_simple_systemd_launcher() {
    // `_guard` is kept alive to the end of the test.
    // NOTE(review): presumably dropping it reverts the override — confirm.
    let config = hyperactor_config::global::lock();
    let _guard = config.override_key(MESH_PROC_LAUNCHER_KIND, "systemd".to_string());

    let instance = testing::instance();

    let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
        "monarch/hyperactor_mesh/bootstrap",
    )));
    let alloc = allocator
        .allocate(AllocSpec {
            extent: extent!(replicas = 1),
            constraints: Default::default(),
            proc_name: None,
            transport: ChannelTransport::Unix,
            proc_allocation_mode: Default::default(),
        })
        .await
        .unwrap();

    let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
        .await
        .unwrap();

    let proc_mesh = host_mesh
        .spawn(&instance, "p0", Extent::unity())
        .await
        .unwrap();

    let actor_mesh: ActorMesh<testactor::TestActor> =
        proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

    // Round-trip a request through the actor mesh to prove it is live.
    let (port, mut rx) = instance.mailbox().open_port();
    actor_mesh
        .cast(&instance, testactor::GetActorId(port.bind()))
        .unwrap();
    let (got_id, _) = rx.recv().await.unwrap();
    assert_eq!(
        got_id,
        actor_mesh.values().next().unwrap().actor_id().clone()
    );

    use crate::proc_launcher::SystemdProcLauncher;
    use crate::systemd::SystemdManagerProxy;
    use crate::systemd::SystemdUnitProxy;

    // Compute the unit name before shutdown, while the proc mesh is still up.
    let proc_id = proc_mesh.proc_ids().next().expect("one proc");
    let expected_unit = SystemdProcLauncher::unit_name(&proc_id);

    host_mesh.shutdown(&instance).await.expect("host shutdown");

    let conn = zbus::Connection::session().await.expect("D-Bus session");
    let manager = SystemdManagerProxy::new(&conn)
        .await
        .expect("manager proxy");

    // Poll up to 100 times, sleeping 100 ms between attempts.
    let mut ok = false;
    for _ in 0..100 {
        match manager.get_unit(&expected_unit).await {
            // A failed lookup is treated as "the unit is gone".
            Err(_) => {
                ok = true;
                break;
            }
            Ok(path) => {
                // If the unit proxy cannot be built, fall through and retry.
                if let Ok(unit) = SystemdUnitProxy::builder(&conn)
                    .path(path)
                    .unwrap()
                    .build()
                    .await
                {
                    // A failed state query yields the default (empty) value.
                    let active = unit.active_state().await.unwrap_or_default();
                    let sub = unit.sub_state().await.unwrap_or_default();
                    // Accept any state other than active+running or activating.
                    if !(active == "active" && sub == "running") && active != "activating" {
                        ok = true;
                        break;
                    }
                }
            }
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert!(
        ok,
        "unit should be gone or quiescent (not running) after shutdown"
    );
}
3570
/// Boots a host on a Unix channel with the test bootstrap command, fetches
/// its local proc through the host agent, and creates a client instance on
/// that proc. Each step must succeed; nothing further is asserted.
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_host_bootstrap() {
    use crate::host_mesh::host_agent::GetLocalProcClient;
    use crate::proc_agent::NewClientInstanceClient;

    // Throwaway local proc whose instance is used only to issue the requests.
    let temp_proc = Proc::local();
    let (temp_instance, _) = temp_proc.instance("temp").unwrap();

    let listen_addr = ChannelAddr::any(ChannelTransport::Unix);
    let host_handle = host(listen_addr, Some(BootstrapCommand::test()), None, false)
        .await
        .unwrap();

    let local_proc = host_handle
        .0
        .get_local_proc(&temp_instance)
        .await
        .unwrap();
    let _local_instance = local_proc
        .new_client_instance(&temp_instance)
        .await
        .unwrap();
}
3597
3598 use std::time::Duration;
3604
3605 use crate::proc_launcher::LaunchOptions;
3606 use crate::proc_launcher::LaunchResult;
3607 use crate::proc_launcher::ProcExitKind;
3608 use crate::proc_launcher::ProcExitResult;
3609 use crate::proc_launcher::ProcLauncher;
3610 use crate::proc_launcher::ProcLauncherError;
3611 use crate::proc_launcher::StdioHandling;
3612
/// Minimal launcher stand-in for tests; it carries a numeric marker so that
/// instances can be told apart.
// `dead_code` is allowed because `marker` is only read through `marker()`,
// which no visible test calls.
#[allow(dead_code)]
struct DummyLauncher {
    /// Caller-chosen tag stored at construction.
    marker: u64,
}

impl DummyLauncher {
    /// Creates a launcher tagged with `marker`.
    fn new(marker: u64) -> Self {
        DummyLauncher { marker }
    }

    /// Returns the tag passed to [`DummyLauncher::new`].
    // Not called by any visible test, hence the lint exemption.
    #[allow(dead_code)]
    fn marker(&self) -> u64 {
        let DummyLauncher { marker } = self;
        *marker
    }
}
3631
3632 #[async_trait::async_trait]
3633 impl ProcLauncher for DummyLauncher {
3634 async fn launch(
3635 &self,
3636 _proc_id: &hyperactor_reference::ProcId,
3637 _opts: LaunchOptions,
3638 ) -> Result<LaunchResult, ProcLauncherError> {
3639 let (tx, rx) = tokio::sync::oneshot::channel();
3640 let _ = tx.send(ProcExitResult {
3642 kind: ProcExitKind::Exited { code: 0 },
3643 stderr_tail: Some(vec![]),
3644 });
3645 Ok(LaunchResult {
3646 pid: None,
3647 started_at: std::time::SystemTime::now(),
3648 stdio: StdioHandling::ManagedByLauncher,
3649 exit_rx: rx,
3650 })
3651 }
3652
3653 async fn terminate(
3654 &self,
3655 _proc_id: &hyperactor_reference::ProcId,
3656 _timeout: Duration,
3657 ) -> Result<(), ProcLauncherError> {
3658 Ok(())
3659 }
3660
3661 async fn kill(
3662 &self,
3663 _proc_id: &hyperactor_reference::ProcId,
3664 ) -> Result<(), ProcLauncherError> {
3665 Ok(())
3666 }
3667 }
3668
/// A launcher installed with `set_launcher` is the very same allocation that
/// `launcher()` subsequently returns.
#[test]
#[cfg(fbcode_build)]
fn test_set_launcher_then_get() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(42));
    // Capture the pointer before ownership moves into the manager.
    let expected_ptr = Arc::as_ptr(&custom);
    manager.set_launcher(custom).unwrap();

    let actual_ptr = Arc::as_ptr(manager.launcher());

    assert_eq!(
        expected_ptr, actual_ptr,
        "launcher() should return the same Arc that was set"
    );
}
3690
/// Once `launcher()` has been called, a later `set_launcher` must fail with
/// an error mentioning "already initialized".
#[test]
#[cfg(fbcode_build)]
fn test_get_launcher_then_set_fails() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    // Called only for its effect on the manager; the result is unused.
    let _ = manager.launcher();

    let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(99));
    let Err(err) = manager.set_launcher(custom) else {
        panic!("set_launcher should fail after launcher() was called");
    };

    let err_msg = err.to_string();
    assert!(
        err_msg.contains("already initialized"),
        "error should mention 'already initialized', got: {}",
        err_msg
    );
}
3719
/// The first `set_launcher` succeeds; a second one must fail with an error
/// mentioning "already initialized".
#[test]
#[cfg(fbcode_build)]
fn test_set_launcher_twice_fails() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let first: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(1));
    let second: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(2));

    manager.set_launcher(first).unwrap();

    let Err(err) = manager.set_launcher(second) else {
        panic!("second set_launcher should fail");
    };

    let err_msg = err.to_string();
    assert!(
        err_msg.contains("already initialized"),
        "error should mention 'already initialized', got: {}",
        err_msg
    );
}
3745
/// On a freshly constructed manager, `set_launcher` succeeds.
#[test]
#[cfg(fbcode_build)]
fn test_launcher_initially_empty() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
    let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(123));

    assert!(
        manager.set_launcher(custom).is_ok(),
        "set_launcher should succeed on fresh manager"
    );
}
3763
/// Two consecutive `launcher()` calls return references to the same `Arc`
/// allocation.
#[test]
#[cfg(fbcode_build)]
fn test_launcher_idempotent() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let (first, second) = (manager.launcher(), manager.launcher());

    assert!(
        Arc::ptr_eq(first, second),
        "launcher() should return the same Arc on repeated calls"
    );
}
3780}