1use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::env::VarError;
18use std::fmt;
19use std::fs::OpenOptions;
20use std::future;
21use std::io;
22use std::io::Write;
23use std::path::Path;
24use std::path::PathBuf;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::Mutex;
28use std::sync::OnceLock;
29use std::sync::Weak;
30use std::time::Duration;
31use std::time::SystemTime;
32
33use async_trait::async_trait;
34use base64::prelude::*;
35use futures::StreamExt;
36use futures::stream;
37use humantime::format_duration;
38use hyperactor::ActorHandle;
39use hyperactor::channel;
40use hyperactor::channel::ChannelAddr;
41use hyperactor::channel::ChannelError;
42use hyperactor::channel::ChannelTransport;
43use hyperactor::channel::Rx;
44use hyperactor::channel::Tx;
45use hyperactor::context;
46use hyperactor::host::Host;
47use hyperactor::host::HostError;
48use hyperactor::host::ProcHandle;
49use hyperactor::host::ProcManager;
50use hyperactor::host::TerminateSummary;
51use hyperactor::mailbox::IntoBoxedMailboxSender;
52use hyperactor::mailbox::MailboxClient;
53use hyperactor::mailbox::MailboxServer;
54use hyperactor::mailbox::MailboxServerHandle;
55use hyperactor::proc::Proc;
56use hyperactor::reference as hyperactor_reference;
57use hyperactor_config::CONFIG;
58use hyperactor_config::ConfigAttr;
59use hyperactor_config::attrs::Attrs;
60use hyperactor_config::attrs::declare_attrs;
61use hyperactor_config::global::override_or_global;
62use serde::Deserialize;
63use serde::Serialize;
64use tempfile::TempDir;
65use tokio::process::Command;
66use tokio::sync::oneshot;
67use tokio::sync::watch;
68use tracing::Instrument;
69use tracing::Level;
70use typeuri::Named;
71
72use crate::config::MESH_PROC_LAUNCHER_KIND;
73use crate::host_mesh::host_agent::HostAgent;
74use crate::host_mesh::host_agent::HostAgentMode;
75use crate::logging::OutputTarget;
76use crate::logging::StreamFwder;
77use crate::proc_agent::ProcAgent;
78use crate::proc_launcher::LaunchOptions;
79use crate::proc_launcher::NativeProcLauncher;
80use crate::proc_launcher::ProcExitKind;
81use crate::proc_launcher::ProcExitResult;
82use crate::proc_launcher::ProcLauncher;
83use crate::proc_launcher::ProcLauncherError;
84use crate::proc_launcher::StdioHandling;
85#[cfg(target_os = "linux")]
86use crate::proc_launcher::SystemdProcLauncher;
87use crate::proc_launcher::format_process_name;
88use crate::resource;
89
90mod mailbox;
91
declare_attrs! {
    /// Whether child procs' output is forwarded through the mesh logging
    /// pipeline (presumably stdout/stderr forwarding — TODO confirm against
    /// the log-forwarding consumer). Off by default.
    ///
    /// Environment variable: `HYPERACTOR_MESH_ENABLE_LOG_FORWARDING`;
    /// Python/config key: `enable_log_forwarding`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
        Some("enable_log_forwarding".to_string()),
    ))
    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

    /// Whether the proc manager captures child output to files. When set,
    /// `BootstrapProcManager::new` tries to create a
    /// `crate::logging::FileAppender`. Off by default.
    ///
    /// Environment variable: `HYPERACTOR_MESH_ENABLE_FILE_CAPTURE`;
    /// Python/config key: `enable_file_capture`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
        Some("enable_file_capture".to_string()),
    ))
    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

    /// Number of trailing log lines to retain per child (presumably the
    /// source of `ProcStatus::Stopped::stderr_tail` — TODO confirm).
    /// Defaults to 0.
    ///
    /// Environment variable: `HYPERACTOR_MESH_TAIL_LOG_LINES`;
    /// Python/config key: `tail_log_lines`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
        Some("tail_log_lines".to_string()),
    ))
    pub attr MESH_TAIL_LOG_LINES: usize = 0;

    /// Whether a bootstrapped proc calls `install_pdeathsig_kill` so that it
    /// is killed when its parent dies. On by default.
    ///
    /// Environment variable: `HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG`;
    /// Python/config key: `mesh_bootstrap_enable_pdeathsig`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
        Some("mesh_bootstrap_enable_pdeathsig".to_string()),
    ))
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    /// Upper bound on how many procs are terminated concurrently
    /// (presumably during host-wide shutdown — TODO confirm). Defaults to 16.
    ///
    /// Environment variable: `HYPERACTOR_MESH_TERMINATE_CONCURRENCY`;
    /// Python/config key: `mesh_terminate_concurrency`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
        Some("mesh_terminate_concurrency".to_string()),
    ))
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    /// Per-proc graceful termination timeout (presumably used before
    /// escalating to a hard kill — TODO confirm). Defaults to 10 seconds.
    ///
    /// Environment variable: `HYPERACTOR_MESH_TERMINATE_TIMEOUT`;
    /// Python/config key: `mesh_terminate_timeout`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
        Some("mesh_terminate_timeout".to_string()),
    ))
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
179
/// Name of the environment variable that presumably carries the allocator's
/// bootstrap channel address to a child process (TODO confirm with the
/// allocator side).
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Name of the environment variable that presumably carries the child's
/// bootstrap index (the `usize` tag in `Process2Allocator`) — TODO confirm.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Name of the environment variable carrying the client's trace id
/// (presumably for correlating traces across processes — TODO confirm).
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";

/// Name of the environment variable used for the bootstrap log channel
/// (usage not visible in this chunk).
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";

/// Name of the environment variable holding a base64-encoded JSON
/// `Bootstrap` value (the same name `Bootstrap::to_env` / `get_from_env` use).
pub(crate) const BOOTSTRAP_MODE_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_MODE";
/// Name of the environment variable holding the child's process name
/// (presumably as produced by `format_process_name` — TODO confirm).
pub(crate) const PROCESS_NAME_ENV: &str = "HYPERACTOR_PROCESS_NAME";
191
/// A message from a bootstrapped process to its allocator.
///
/// Field 0 is the sender's bootstrap index. `exit_if_missed_heartbeat` sends
/// `Process2Allocator(bootstrap_index, Heartbeat)`. Field 1 is the payload.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
wirevalue::register_type!(Process2Allocator);
198
/// Payload of a [`Process2Allocator`] message.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Process2AllocatorMessage {
    /// Initial greeting. The address is presumably where the process can be
    /// reached by the allocator (TODO confirm against the allocator).
    Hello(ChannelAddr),

    /// A proc was started in this process. The fields carry:
    /// - the proc's id;
    /// - a reference to its `ProcAgent`;
    /// - a channel address (presumably the proc's serving address — TODO
    ///   confirm).
    StartedProc(
        hyperactor_reference::ProcId,
        hyperactor_reference::ActorRef<ProcAgent>,
        ChannelAddr,
    ),

    /// Liveness signal, sent periodically by `exit_if_missed_heartbeat`.
    Heartbeat,
}
wirevalue::register_type!(Process2AllocatorMessage);
220
/// Commands sent from an allocator to a bootstrapped process.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Allocator2Process {
    /// Start a proc with the given id, using the given channel transport.
    StartProc(hyperactor_reference::ProcId, ChannelTransport),

    /// Stop the process's procs, then exit. The `i32` is presumably the exit
    /// code (TODO confirm with the handler).
    StopAndExit(i32),

    /// Exit immediately. The `i32` is presumably the exit code (TODO
    /// confirm with the handler).
    Exit(i32),
}
wirevalue::register_type!(Allocator2Process);
237
238async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
239 let tx = match channel::dial(bootstrap_addr.clone()) {
240 Ok(tx) => tx,
241
242 Err(err) => {
243 tracing::error!(
244 "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
245 bootstrap_addr,
246 err
247 );
248 std::process::exit(1);
249 }
250 };
251 tracing::info!(
252 "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
253 );
254 loop {
255 tokio::time::sleep(Duration::from_secs(5)).await;
256
257 let result = tx
258 .send(Process2Allocator(
259 bootstrap_index,
260 Process2AllocatorMessage::Heartbeat,
261 ))
262 .await;
263
264 if let Err(err) = result {
265 tracing::error!(
266 "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
267 bootstrap_addr,
268 err
269 );
270 std::process::exit(1);
271 }
272 }
273}
274
/// Unwraps a `Result`, or returns early with the error converted into an
/// `anyhow::Error`.
///
/// Unlike `?`, the early return yields a bare `anyhow::Error` rather than
/// `Err(..)`. The macro is therefore only usable inside functions whose
/// return type is `anyhow::Error` itself. A trailing comma after the
/// expression is accepted.
#[macro_export]
macro_rules! ok {
    ($result:expr $(,)?) => {
        match $result {
            Err(err) => return ::anyhow::Error::from(err),
            Ok(inner) => inner,
        }
    };
}
284
/// Suspends the calling task forever.
///
/// The returned future never resolves. It can therefore stand in for a value
/// of any type `R`, for example to satisfy a function's return type after all
/// useful work has been handed off to other tasks.
async fn halt<R>() -> R {
    future::pending::<R>().await
}
289
/// Handle returned by [`host`] that lets the caller wait for the host to shut
/// down.
///
/// The sending half of `rx` is handed to the [`HostAgent`] (see [`host`]).
/// It presumably sends the host's [`MailboxServerHandle`] when shutdown is
/// requested (TODO confirm in `HostAgent`). [`HostShutdownHandle::join`] then
/// stops that server.
pub struct HostShutdownHandle {
    /// Receives the mailbox server handle to stop once the host shuts down.
    rx: tokio::sync::oneshot::Receiver<MailboxServerHandle>,
    /// If set, [`HostShutdownHandle::join`] calls `std::process::exit(0)`
    /// after the mailbox server has stopped.
    exit_on_shutdown: bool,
}
303
304impl HostShutdownHandle {
305 pub async fn join(self) {
308 match self.rx.await {
309 Ok(mailbox_handle) => {
310 mailbox_handle.stop("host shutting down");
311 let _ = mailbox_handle.await;
312 }
313 Err(_) => {} }
315 if self.exit_on_shutdown {
316 std::process::exit(0);
317 }
318 }
319}
320
321pub async fn host(
331 addr: ChannelAddr,
332 command: Option<BootstrapCommand>,
333 config: Option<Attrs>,
334 exit_on_shutdown: bool,
335) -> anyhow::Result<(ActorHandle<HostAgent>, HostShutdownHandle)> {
336 if let Some(attrs) = config {
337 hyperactor_config::global::set(hyperactor_config::global::Source::Runtime, attrs);
338 tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
339 } else {
340 tracing::debug!("bootstrap: no config snapshot provided (Host)");
341 }
342
343 let command = match command {
344 Some(command) => command,
345 None => BootstrapCommand::current()?,
346 };
347 let manager = BootstrapProcManager::new(command)?;
348
349 let host = Host::new(manager, addr).await?;
350 let addr = host.addr().clone();
351
352 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<MailboxServerHandle>();
357
358 let system_proc = host.system_proc().clone();
359 let host_mesh_agent = system_proc.spawn::<HostAgent>(
360 "host_agent",
361 HostAgent::new(HostAgentMode::Process {
362 host,
363 shutdown_tx: Some(shutdown_tx),
364 }),
365 )?;
366
367 tracing::info!(
368 "serving host at {}, agent: {}",
369 addr,
370 host_mesh_agent.bind::<HostAgent>()
371 );
372
373 Ok((
374 host_mesh_agent,
375 HostShutdownHandle {
376 rx: shutdown_rx,
377 exit_on_shutdown,
378 },
379 ))
380}
381
/// Describes how a freshly started mesh process should bootstrap itself.
///
/// A `Bootstrap` travels to child processes in the
/// `HYPERACTOR_MESH_BOOTSTRAP_MODE` environment variable, as base64-encoded
/// JSON (see `Bootstrap::to_env` and `Bootstrap::get_from_env`).
/// `Bootstrap::bootstrap` executes it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Bootstrap {
    /// Run a single proc on behalf of a host.
    Proc {
        /// Identity of the proc to create.
        proc_id: hyperactor_reference::ProcId,
        /// A `MailboxClient` is dialed to this address and handed to the
        /// proc's local dialer as its fallback sender.
        backend_addr: ChannelAddr,
        /// Once serving, the proc sends `(serving address, ProcAgent ref)`
        /// here.
        callback_addr: ChannelAddr,
        /// Directory where the proc serves its unix socket. The socket path
        /// is `<socket_dir_path>/<proc name>`.
        socket_dir_path: PathBuf,
        /// Optional configuration snapshot, installed as the
        /// `ClientOverride` layer before the proc starts.
        config: Option<Attrs>,
    },

    /// Run a host (see [`host`]).
    Host {
        /// Address the host serves on.
        addr: ChannelAddr,
        /// Command used to launch child procs. When `None`, the current
        /// binary is re-executed (`BootstrapCommand::current`).
        command: Option<BootstrapCommand>,
        /// Optional configuration snapshot, installed as the `Runtime`
        /// layer.
        config: Option<Attrs>,
        /// If set, the process exits with status 0 once the host has shut
        /// down.
        exit_on_shutdown: bool,
    },

    /// Bootstrap via `bootstrap_v0_proc_mesh`, the v0 proc-mesh protocol.
    /// This is the `Default` variant.
    V0ProcMesh {
        /// Optional configuration snapshot forwarded to
        /// `bootstrap_v0_proc_mesh`.
        config: Option<Attrs>,
    },
}
438
439impl Default for Bootstrap {
440 fn default() -> Self {
441 Bootstrap::V0ProcMesh { config: None }
442 }
443}
444
445impl Bootstrap {
446 #[allow(clippy::result_large_err)]
449 pub(crate) fn to_env_safe_string(&self) -> crate::Result<String> {
450 Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
451 }
452
453 #[allow(clippy::result_large_err)]
455 pub(crate) fn from_env_safe_string(str: &str) -> crate::Result<Self> {
456 let data = BASE64_STANDARD.decode(str)?;
457 let data = std::str::from_utf8(&data)?;
458 Ok(serde_json::from_str(data)?)
459 }
460
461 pub fn get_from_env() -> anyhow::Result<Option<Self>> {
464 match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
465 Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
466 Ok(mode) => Ok(Some(mode)),
467 Err(e) => {
468 Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
469 }
470 },
471 Err(VarError::NotPresent) => Ok(None),
472 Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
473 }
474 }
475
476 pub fn to_env(&self, cmd: &mut Command) {
478 cmd.env(
479 "HYPERACTOR_MESH_BOOTSTRAP_MODE",
480 self.to_env_safe_string().unwrap(),
481 );
482 }
483
484 pub async fn bootstrap(self) -> anyhow::Result<i32> {
488 tracing::info!(
489 "bootstrapping mesh process: {}",
490 serde_json::to_string(&self).unwrap()
491 );
492
493 if Debug::is_active() {
494 let mut buf = Vec::new();
495 writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
496 #[cfg(unix)]
497 writeln!(
498 &mut buf,
499 "\tparent pid: {}",
500 std::os::unix::process::parent_id()
501 )
502 .unwrap();
503 writeln!(
504 &mut buf,
505 "\tconfig: {}",
506 serde_json::to_string(&self).unwrap()
507 )
508 .unwrap();
509 match std::env::current_exe() {
510 Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
511 Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
512 }
513 writeln!(&mut buf, "\targs:").unwrap();
514 for arg in std::env::args() {
515 writeln!(&mut buf, "\t\t{}", arg).unwrap();
516 }
517 writeln!(&mut buf, "\tenv:").unwrap();
518 for (key, val) in std::env::vars() {
519 writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
520 }
521 let _ = Debug.write(&buf);
522 if let Ok(s) = std::str::from_utf8(&buf) {
523 tracing::info!("{}", s);
524 } else {
525 tracing::info!("{:?}", buf);
526 }
527 }
528
529 match self {
530 Bootstrap::Proc {
531 proc_id,
532 backend_addr,
533 callback_addr,
534 socket_dir_path,
535 config,
536 } => {
537 let entered = tracing::span!(
538 Level::INFO,
539 "proc_bootstrap",
540 %proc_id,
541 %backend_addr,
542 %callback_addr,
543 socket_dir_path = %socket_dir_path.display(),
544 )
545 .entered();
546 if let Some(attrs) = config {
547 hyperactor_config::global::set(
548 hyperactor_config::global::Source::ClientOverride,
549 attrs,
550 );
551 tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
552 } else {
553 tracing::debug!("bootstrap: no config snapshot provided (Proc)");
554 }
555
556 if hyperactor_config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
557 let _ = install_pdeathsig_kill();
562 } else {
563 eprintln!("(bootstrap) PDEATHSIG disabled via config");
564 }
565
566 let (local_addr, name) = (proc_id.addr().clone(), proc_id.name());
567 let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
569 let serve_addr = serve_addr.parse().unwrap();
570
571 let proc_sender = mailbox::LocalProcDialer::new(
576 local_addr.clone(),
577 socket_dir_path,
578 MailboxClient::dial(backend_addr)?,
579 );
580
581 let proc = Proc::configured(proc_id.clone(), proc_sender.into_boxed());
582
583 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<i32>();
584 let agent_handle = ProcAgent::boot_v1(proc.clone(), Some(shutdown_tx))
585 .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;
586
587 let span = entered.exit();
588
589 let (proc_addr, proc_rx) = channel::serve(serve_addr)?;
592 let mailbox_handle = proc.clone().serve(proc_rx);
593 channel::dial(callback_addr)?
594 .send((proc_addr, agent_handle.bind::<ProcAgent>()))
595 .instrument(span)
596 .await
597 .map_err(ChannelError::from)?;
598
599 let exit_code = shutdown_rx.await.unwrap_or(1);
602 mailbox_handle.stop("process shutting down");
603 let _ = mailbox_handle.await;
604 tracing::info!("bootstrap shutting down with exit code {}", exit_code);
605 Ok(exit_code)
608 }
609 Bootstrap::Host {
610 addr,
611 command,
612 config,
613 exit_on_shutdown,
614 } => {
615 let (_agent_handle, shutdown) =
616 host(addr, command, config, exit_on_shutdown).await?;
617 shutdown.join().await;
618 halt().await
619 }
620 Bootstrap::V0ProcMesh { config } => Err(bootstrap_v0_proc_mesh(config).await),
621 }
622 }
623
624 pub async fn bootstrap_or_die(self) -> ! {
627 let exit_code = match self.bootstrap().await {
628 Ok(exit_code) => exit_code,
629 Err(err) => {
630 tracing::error!("failed to bootstrap mesh process: {}", err);
631 1
632 }
633 };
634 std::process::exit(exit_code);
635 }
636}
637
/// Arranges for the current process to receive `SIGKILL` when its parent
/// dies.
///
/// Only Linux does anything here. On other platforms this is a no-op that
/// returns `Ok(())`.
///
/// If the parent pid changes between the two `getppid` calls, the process
/// exits immediately with status 0. That change means the parent died while
/// this was being set up.
///
/// # Errors
/// Returns the OS error if `prctl(PR_SET_PDEATHSIG)` fails.
pub fn install_pdeathsig_kill() -> io::Result<()> {
    #[cfg(target_os = "linux")]
    {
        // SAFETY: `getppid` takes no arguments, has no preconditions and
        // always succeeds.
        let ppid_before = unsafe { libc::getppid() };

        // SAFETY: `PR_SET_PDEATHSIG` only reads the integer signal argument.
        // Failure is reported through the return value, checked below.
        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
        if rc != 0 {
            return Err(io::Error::last_os_error());
        }

        // SAFETY: as above, `getppid` has no preconditions.
        let ppid_after = unsafe { libc::getppid() };
        // If we were reparented between the two reads, the original parent
        // is already gone. The death signal presumably will not be delivered
        // for it, so exit now instead of lingering as an orphan.
        // NOTE(review): a parent that died before `ppid_before` was read is
        // not detected by this comparison.
        if ppid_before != ppid_after {
            std::process::exit(0);
        }
    }
    Ok(())
}
669
/// Lifecycle state of a bootstrapped proc, as tracked by
/// [`BootstrapProcHandle`].
///
/// `Starting`, `Running`, `Ready` and `Stopping` are live states. `Stopped`,
/// `Killed` and `Failed` are terminal (see [`ProcStatus::is_exit`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProcStatus {
    /// Initial state of every handle.
    Starting,
    /// The proc is running. `started_at` is when this was recorded.
    Running { started_at: SystemTime },
    /// The proc has reported in and is serving.
    Ready {
        /// Carried over from `Running`. If the proc went straight from
        /// `Starting`, this is the time of the transition.
        started_at: SystemTime,
        /// Address the proc serves on.
        addr: ChannelAddr,
        /// Reference to the proc's `ProcAgent`.
        agent: hyperactor_reference::ActorRef<ProcAgent>,
    },
    /// Shutdown was requested. `started_at` is preserved from the previous
    /// state. If the proc was still `Starting`, it is the time of the
    /// request.
    Stopping { started_at: SystemTime },
    /// The process exited with `exit_code`.
    Stopped {
        /// The process's exit code.
        exit_code: i32,
        /// Presumably the last captured lines of stderr (TODO confirm with
        /// the exit monitor).
        stderr_tail: Vec<String>,
    },
    /// The process was terminated by `signal`. `core_dumped` presumably
    /// reports whether a core file was produced.
    Killed { signal: i32, core_dumped: bool },
    /// The proc could not be launched or monitored. `reason` is a
    /// human-readable explanation.
    Failed { reason: String },
}
717
718impl ProcStatus {
719 #[inline]
723 pub fn is_exit(&self) -> bool {
724 matches!(
725 self,
726 ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
727 )
728 }
729}
730
731impl std::fmt::Display for ProcStatus {
732 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
733 match self {
734 ProcStatus::Starting => write!(f, "Starting"),
735 ProcStatus::Running { started_at } => {
736 let uptime = started_at
737 .elapsed()
738 .map(|d| format!(" up {}", format_duration(d)))
739 .unwrap_or_default();
740 write!(f, "Running{uptime}")
741 }
742 ProcStatus::Ready {
743 started_at, addr, ..
744 } => {
745 let uptime = started_at
746 .elapsed()
747 .map(|d| format!(" up {}", format_duration(d)))
748 .unwrap_or_default();
749 write!(f, "Ready at {addr}{uptime}")
750 }
751 ProcStatus::Stopping { started_at } => {
752 let uptime = started_at
753 .elapsed()
754 .map(|d| format!(" up {}", format_duration(d)))
755 .unwrap_or_default();
756 write!(f, "Stopping{uptime}")
757 }
758 ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
759 ProcStatus::Killed {
760 signal,
761 core_dumped,
762 } => {
763 if *core_dumped {
764 write!(f, "Killed(sig={signal}, core)")
765 } else {
766 write!(f, "Killed(sig={signal})")
767 }
768 }
769 ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
770 }
771 }
772}
773
/// Why a proc never became [`ProcStatus::Ready`]. Returned by
/// `BootstrapProcHandle::ready_inner`.
#[derive(Debug, Clone)]
pub enum ReadyError {
    /// The proc reached a terminal state (see [`ProcStatus::is_exit`])
    /// before becoming ready. Carries that state.
    Terminal(ProcStatus),
    /// The status watch channel closed before the proc became ready.
    ChannelClosed,
}
782
783impl std::fmt::Display for ReadyError {
784 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
785 match self {
786 ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
787 ReadyError::ChannelClosed => write!(f, "status channel closed"),
788 }
789 }
790}
791impl std::error::Error for ReadyError {}
792
/// Host-side handle to a proc launched by the bootstrap proc manager.
///
/// The handle tracks the proc's [`ProcStatus`]:
/// - live states: `Starting`, `Running`, `Ready`, `Stopping`;
/// - terminal states: `Stopped`, `Killed`, `Failed`.
///
/// Every accepted transition is published on a `tokio::sync::watch` channel.
/// Termination is delegated to the [`ProcLauncher`] that started the proc.
///
/// Clones share all state: the status and forwarders are behind `Arc`s, and
/// `tx`/`rx` are clones of the same channel.
#[derive(Clone)]
pub struct BootstrapProcHandle {
    /// Id of the proc this handle tracks.
    proc_id: hyperactor_reference::ProcId,

    /// Current status. It backs `status()`, `addr()` and `agent_ref()`, and
    /// is only mutated through `transition`.
    status: Arc<std::sync::Mutex<ProcStatus>>,

    /// Launcher used for terminate/kill. It is held weakly so the handle does
    /// not keep the launcher alive. If it is gone, terminate/kill only wait
    /// for the exit status.
    launcher: Weak<dyn ProcLauncher>,

    /// Forwarder for the child's stdout. Installed by `set_stream_monitors`
    /// and removed by `take_stream_monitors`.
    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Forwarder for the child's stderr. Installed and removed like
    /// `stdout_fwder`.
    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Publishes each accepted status transition. It sends while the
    /// `status` lock is held.
    tx: tokio::sync::watch::Sender<ProcStatus>,

    /// Template receiver; `watch()` hands out clones of it. Holding it also
    /// keeps at least one receiver alive, so `tx.send` does not fail for lack
    /// of receivers.
    rx: tokio::sync::watch::Receiver<ProcStatus>,
}
874
875impl fmt::Debug for BootstrapProcHandle {
876 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877 let status = self.status.lock().expect("status mutex poisoned").clone();
878 f.debug_struct("BootstrapProcHandle")
879 .field("proc_id", &self.proc_id)
880 .field("status", &status)
881 .field("launcher", &"<dyn ProcLauncher>")
882 .field("tx", &"<watch::Sender>")
883 .field("rx", &"<watch::Receiver>")
884 .finish()
887 }
888}
889
890impl BootstrapProcHandle {
892 pub(crate) fn new(
903 proc_id: hyperactor_reference::ProcId,
904 launcher: Weak<dyn ProcLauncher>,
905 ) -> Self {
906 let (tx, rx) = watch::channel(ProcStatus::Starting);
907 Self {
908 proc_id,
909 status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
910 launcher,
911 stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
912 stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
913 tx,
914 rx,
915 }
916 }
917
918 #[inline]
920 pub fn proc_id(&self) -> &hyperactor_reference::ProcId {
921 &self.proc_id
922 }
923
924 #[inline]
937 pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
938 self.rx.clone()
939 }
940
941 #[inline]
959 pub async fn changed(&self) {
960 let _ = self.watch().changed().await;
961 }
962
963 #[must_use]
976 pub fn status(&self) -> ProcStatus {
977 self.status.lock().expect("status mutex poisoned").clone()
981 }
982
983 fn transition<F>(&self, f: F) -> bool
989 where
990 F: FnOnce(&mut ProcStatus) -> bool,
991 {
992 let mut guard = self.status.lock().expect("status mutex poisoned");
993 let _before = guard.clone();
994 let changed = f(&mut guard);
995 if changed {
996 let _ = self.tx.send(guard.clone());
998 }
999 changed
1000 }
1001
1002 pub(crate) fn mark_running(&self, started_at: SystemTime) -> bool {
1012 self.transition(|st| match *st {
1013 ProcStatus::Starting => {
1014 *st = ProcStatus::Running { started_at };
1015 true
1016 }
1017 _ => {
1018 tracing::warn!(
1019 "illegal transition: {:?} -> Running; leaving status unchanged",
1020 *st
1021 );
1022 false
1023 }
1024 })
1025 }
1026
1027 pub(crate) fn mark_ready(
1039 &self,
1040 addr: ChannelAddr,
1041 agent: hyperactor_reference::ActorRef<ProcAgent>,
1042 ) -> bool {
1043 tracing::info!(proc_id = %self.proc_id, %addr, "{} ready at {}", self.proc_id, addr);
1044 self.transition(|st| match st {
1045 ProcStatus::Starting => {
1046 *st = ProcStatus::Ready {
1049 started_at: std::time::SystemTime::now(),
1050 addr,
1051 agent,
1052 };
1053 true
1054 }
1055 ProcStatus::Running { started_at } => {
1056 let started_at = *started_at;
1057 *st = ProcStatus::Ready {
1058 started_at,
1059 addr,
1060 agent,
1061 };
1062 true
1063 }
1064 _ => {
1065 tracing::warn!(
1066 "illegal transition: {:?} -> Ready; leaving status unchanged",
1067 st
1068 );
1069 false
1070 }
1071 })
1072 }
1073
1074 pub(crate) fn mark_stopping(&self) -> bool {
1078 let now = std::time::SystemTime::now();
1079
1080 self.transition(|st| match *st {
1081 ProcStatus::Running { started_at } => {
1082 *st = ProcStatus::Stopping { started_at };
1083 true
1084 }
1085 ProcStatus::Ready { started_at, .. } => {
1086 *st = ProcStatus::Stopping { started_at };
1087 true
1088 }
1089 ProcStatus::Starting => {
1090 *st = ProcStatus::Stopping { started_at: now };
1091 true
1092 }
1093 _ => false,
1094 })
1095 }
1096
1097 pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
1100 self.transition(|st| match *st {
1101 ProcStatus::Starting
1102 | ProcStatus::Running { .. }
1103 | ProcStatus::Ready { .. }
1104 | ProcStatus::Stopping { .. } => {
1105 *st = ProcStatus::Stopped {
1106 exit_code,
1107 stderr_tail,
1108 };
1109 true
1110 }
1111 _ => {
1112 tracing::warn!(
1113 "illegal transition: {:?} -> Stopped; leaving status unchanged",
1114 *st
1115 );
1116 false
1117 }
1118 })
1119 }
1120
1121 pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1124 self.transition(|st| match *st {
1125 ProcStatus::Starting
1126 | ProcStatus::Running { .. }
1127 | ProcStatus::Ready { .. }
1128 | ProcStatus::Stopping { .. } => {
1129 *st = ProcStatus::Killed {
1130 signal,
1131 core_dumped,
1132 };
1133 true
1134 }
1135 _ => {
1136 tracing::warn!(
1137 "illegal transition: {:?} -> Killed; leaving status unchanged",
1138 *st
1139 );
1140 false
1141 }
1142 })
1143 }
1144
1145 pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1148 self.transition(|st| match *st {
1149 ProcStatus::Starting
1150 | ProcStatus::Running { .. }
1151 | ProcStatus::Ready { .. }
1152 | ProcStatus::Stopping { .. } => {
1153 *st = ProcStatus::Failed {
1154 reason: reason.into(),
1155 };
1156 true
1157 }
1158 _ => {
1159 tracing::warn!(
1160 "illegal transition: {:?} -> Failed; leaving status unchanged",
1161 *st
1162 );
1163 false
1164 }
1165 })
1166 }
1167
1168 #[must_use]
1187 pub async fn wait_inner(&self) -> ProcStatus {
1188 let mut rx = self.watch();
1189 loop {
1190 let st = rx.borrow().clone();
1191 if st.is_exit() {
1192 return st;
1193 }
1194 if rx.changed().await.is_err() {
1196 return st;
1197 }
1198 }
1199 }
1200
1201 pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1220 let mut rx = self.watch();
1221 loop {
1222 let st = rx.borrow().clone();
1223 match &st {
1224 ProcStatus::Ready { .. } => return Ok(()),
1225 s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1226 _non_terminal => {
1227 if rx.changed().await.is_err() {
1228 return Err(ReadyError::ChannelClosed);
1229 }
1230 }
1231 }
1232 }
1233 }
1234
1235 pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1236 *self
1237 .stdout_fwder
1238 .lock()
1239 .expect("stdout_tailer mutex poisoned") = out;
1240 *self
1241 .stderr_fwder
1242 .lock()
1243 .expect("stderr_tailer mutex poisoned") = err;
1244 }
1245
1246 fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1247 let out = self
1248 .stdout_fwder
1249 .lock()
1250 .expect("stdout_tailer mutex poisoned")
1251 .take();
1252 let err = self
1253 .stderr_fwder
1254 .lock()
1255 .expect("stderr_tailer mutex poisoned")
1256 .take();
1257 (out, err)
1258 }
1259
1260 pub(crate) async fn wait_or_brutally_kill(&self, timeout: Duration) {
1267 match tokio::time::timeout(timeout, self.wait_inner()).await {
1268 Ok(st) if st.is_exit() => return,
1269 _ => {}
1270 }
1271
1272 let _ = self.mark_stopping();
1273
1274 if let Some(launcher) = self.launcher.upgrade() {
1275 if let Err(e) = launcher.terminate(&self.proc_id, timeout).await {
1276 tracing::warn!(
1277 proc_id = %self.proc_id,
1278 error = %e,
1279 "wait_or_brutally_kill: launcher terminate failed, trying kill"
1280 );
1281 let _ = launcher.kill(&self.proc_id).await;
1282 }
1283 }
1284
1285 let _ = self.wait_inner().await;
1286 }
1287
1288 async fn send_stop_all(
1292 &self,
1293 cx: &impl context::Actor,
1294 agent: hyperactor_reference::ActorRef<ProcAgent>,
1295 timeout: Duration,
1296 reason: &str,
1297 ) -> anyhow::Result<ProcStatus> {
1298 let mut agent_port = agent.port();
1305 agent_port.return_undeliverable(false);
1306 agent_port.send(
1307 cx,
1308 resource::StopAll {
1309 reason: reason.to_string(),
1310 },
1311 )?;
1312 match tokio::time::timeout(timeout, self.wait()).await {
1315 Ok(Ok(st)) => Ok(st),
1316 Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1317 Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1318 }
1319 }
1320}
1321
1322#[async_trait]
1323impl hyperactor::host::ProcHandle for BootstrapProcHandle {
1324 type Agent = ProcAgent;
1325 type TerminalStatus = ProcStatus;
1326
1327 #[inline]
1328 fn proc_id(&self) -> &hyperactor_reference::ProcId {
1329 &self.proc_id
1330 }
1331
1332 #[inline]
1333 fn addr(&self) -> Option<ChannelAddr> {
1334 match &*self.status.lock().expect("status mutex poisoned") {
1335 ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1336 _ => None,
1337 }
1338 }
1339
1340 #[inline]
1341 fn agent_ref(&self) -> Option<hyperactor_reference::ActorRef<Self::Agent>> {
1342 match &*self.status.lock().expect("status mutex poisoned") {
1343 ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1344 _ => None,
1345 }
1346 }
1347
1348 async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
1359 match self.ready_inner().await {
1360 Ok(()) => Ok(()),
1361 Err(ReadyError::Terminal(status)) => {
1362 Err(hyperactor::host::ReadyError::Terminal(status))
1363 }
1364 Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
1365 }
1366 }
1367
1368 async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
1376 let status = self.wait_inner().await;
1377 if status.is_exit() {
1378 Ok(status)
1379 } else {
1380 Err(hyperactor::host::WaitError::ChannelClosed)
1381 }
1382 }
1383
1384 async fn terminate(
1405 &self,
1406 cx: &impl context::Actor,
1407 timeout: Duration,
1408 reason: &str,
1409 ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1410 let st0 = self.status();
1412 if st0.is_exit() {
1413 tracing::debug!(?st0, "terminate(): already terminal");
1414 return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1415 }
1416
1417 let agent = self.agent_ref();
1420 if let Some(agent) = agent {
1421 match self.send_stop_all(cx, agent.clone(), timeout, reason).await {
1422 Ok(st) => return Ok(st),
1423 Err(e) => {
1424 tracing::warn!(
1426 "ProcAgent {} could not successfully stop all actors: {}",
1427 agent.actor_id(),
1428 e,
1429 );
1430 }
1431 }
1432 }
1433
1434 let _ = self.mark_stopping();
1436
1437 tracing::info!(proc_id = %self.proc_id, ?timeout, "terminate(): delegating to launcher");
1439 if let Some(launcher) = self.launcher.upgrade() {
1440 if let Err(e) = launcher.terminate(&self.proc_id, timeout).await {
1441 tracing::warn!(proc_id = %self.proc_id, error=%e, "terminate(): launcher termination failed");
1442 return Err(hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1443 "launcher termination failed: {}",
1444 e
1445 )));
1446 }
1447 } else {
1448 tracing::debug!(proc_id = %self.proc_id, "terminate(): launcher gone, proc cleanup in progress");
1450 }
1451
1452 let st = self.wait_inner().await;
1454 if st.is_exit() {
1455 tracing::info!(proc_id = %self.proc_id, ?st, "terminate(): exited");
1456 Ok(st)
1457 } else {
1458 Err(hyperactor::host::TerminateError::ChannelClosed)
1459 }
1460 }
1461
1462 async fn kill(
1480 &self,
1481 ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1482 let st0 = self.status();
1484 if st0.is_exit() {
1485 return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1486 }
1487
1488 tracing::info!(proc_id = %self.proc_id, "kill(): delegating to launcher");
1490 if let Some(launcher) = self.launcher.upgrade() {
1491 if let Err(e) = launcher.kill(&self.proc_id).await {
1492 tracing::warn!(proc_id = %self.proc_id, error=%e, "kill(): launcher kill failed");
1493 return Err(hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1494 "launcher kill failed: {}",
1495 e
1496 )));
1497 }
1498 } else {
1499 tracing::debug!(proc_id = %self.proc_id, "kill(): launcher gone, proc cleanup in progress");
1501 }
1502
1503 let st = self.wait_inner().await;
1505 if st.is_exit() {
1506 Ok(st)
1507 } else {
1508 Err(hyperactor::host::TerminateError::ChannelClosed)
1509 }
1510 }
1511}
1512
/// A command line used to launch a new mesh process.
///
/// `Hash`, `PartialEq` and `Eq` are implemented manually. `HashMap` (the
/// type of `env`) does not implement `Hash`, so the `Hash` impl hashes the
/// `env` entries in sorted order.
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default)]
pub struct BootstrapCommand {
    /// Path of the executable to run.
    pub program: PathBuf,
    /// Optional override for `argv[0]`.
    pub arg0: Option<String>,
    /// Arguments passed after `argv[0]`.
    pub args: Vec<String>,
    /// Environment variables set on the child via `Command::env`.
    /// `BootstrapCommand::new` does not clear the inherited environment.
    pub env: HashMap<String, String>,
}
wirevalue::register_type!(BootstrapCommand);
1522
1523impl std::hash::Hash for BootstrapCommand {
1524 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1525 self.program.hash(state);
1526 self.arg0.hash(state);
1527 self.args.hash(state);
1528 let mut pairs: Vec<_> = self.env.iter().collect();
1529 pairs.sort();
1530 pairs.hash(state);
1531 }
1532}
1533
1534impl PartialEq for BootstrapCommand {
1535 fn eq(&self, other: &Self) -> bool {
1536 self.program == other.program
1537 && self.arg0 == other.arg0
1538 && self.args == other.args
1539 && self.env == other.env
1540 }
1541}
1542
1543impl Eq for BootstrapCommand {}
1544
1545impl BootstrapCommand {
1546 pub fn current() -> io::Result<Self> {
1549 let mut args: VecDeque<String> = std::env::args().collect();
1550 let arg0 = args.pop_front();
1551
1552 Ok(Self {
1553 program: std::env::current_exe()?,
1554 arg0,
1555 args: args.into(),
1556 env: std::env::vars().collect(),
1557 })
1558 }
1559
1560 pub fn new(&self) -> Command {
1563 let mut cmd = Command::new(&self.program);
1564 if let Some(arg0) = &self.arg0 {
1565 cmd.arg0(arg0);
1566 }
1567 for arg in &self.args {
1568 cmd.arg(arg);
1569 }
1570 for (k, v) in &self.env {
1571 cmd.env(k, v);
1572 }
1573 cmd
1574 }
1575
1576 #[cfg(test)]
1583 #[cfg(fbcode_build)]
1584 pub(crate) fn test() -> Self {
1585 Self {
1586 program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1587 arg0: None,
1588 args: vec![],
1589 env: HashMap::new(),
1590 }
1591 }
1592}
1593
1594impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1595 fn from(s: T) -> Self {
1597 Self {
1598 program: s.into(),
1599 arg0: None,
1600 args: vec![],
1601 env: HashMap::new(),
1602 }
1603 }
1604}
1605
/// Which [`ProcLauncher`] implementation the proc manager uses by default.
///
/// The value is parsed from the `MESH_PROC_LAUNCHER_KIND` config value
/// (see `BootstrapProcManager::launcher`). Parsing is case-insensitive and
/// ignores surrounding whitespace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LauncherKind {
    /// Launch procs with [`NativeProcLauncher`]. An empty string also selects
    /// this variant.
    Native,
    /// Launch procs with `SystemdProcLauncher`. Linux only.
    #[cfg(target_os = "linux")]
    Systemd,
}
1638
1639impl FromStr for LauncherKind {
1640 type Err = io::Error;
1641
1642 fn from_str(s: &str) -> Result<Self, Self::Err> {
1651 match s.trim().to_ascii_lowercase().as_str() {
1652 "" | "native" => Ok(Self::Native),
1653 #[cfg(target_os = "linux")]
1654 "systemd" => Ok(Self::Systemd),
1655 other => Err(io::Error::new(
1656 io::ErrorKind::InvalidInput,
1657 format!(
1658 "unknown proc launcher kind {other:?}; expected 'native'{}",
1659 if cfg!(target_os = "linux") {
1660 " or 'systemd'"
1661 } else {
1662 ""
1663 }
1664 ),
1665 )),
1666 }
1667 }
1668}
1669
/// Launches and tracks procs as child processes.
///
/// Each child is started by running a [`BootstrapCommand`] through a
/// [`ProcLauncher`]. [`host`] uses this type as the proc manager of its
/// [`Host`].
pub struct BootstrapProcManager {
    /// Launcher used to start procs. It is set either explicitly via
    /// `set_launcher`, or lazily to the configured default on the first call
    /// to `launcher()`.
    launcher: OnceLock<Arc<dyn ProcLauncher>>,

    /// Command used to bootstrap child procs.
    command: BootstrapCommand,

    /// Handles for procs launched by this manager, keyed by proc id.
    children: Arc<tokio::sync::Mutex<HashMap<hyperactor_reference::ProcId, BootstrapProcHandle>>>,

    /// Output file appender. Present only when `MESH_ENABLE_FILE_CAPTURE` is
    /// enabled and the appender could be created.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Temporary directory obtained from `runtime_dir()`. As a `TempDir`, it
    /// is removed when the manager is dropped. It presumably holds the
    /// children's unix sockets (TODO confirm at the spawn site).
    socket_dir: TempDir,
}
1720
1721impl BootstrapProcManager {
1722 pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1729 let file_appender = if hyperactor_config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1730 match crate::logging::FileAppender::new() {
1731 Some(fm) => {
1732 tracing::info!("file appender created successfully");
1733 Some(Arc::new(fm))
1734 }
1735 None => {
1736 tracing::warn!("failed to create file appender");
1737 None
1738 }
1739 }
1740 } else {
1741 None
1742 };
1743
1744 Ok(Self {
1745 launcher: OnceLock::new(),
1746 command,
1747 children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1748 file_appender,
1749 socket_dir: runtime_dir()?,
1750 })
1751 }
1752
1753 pub fn set_launcher(&self, launcher: Arc<dyn ProcLauncher>) -> Result<(), ProcLauncherError> {
1762 self.launcher.set(launcher).map_err(|_| {
1763 ProcLauncherError::Other(
1764 "launcher already initialized; call set_proc_launcher before first spawn".into(),
1765 )
1766 })
1767 }
1768
1769 pub fn launcher(&self) -> &Arc<dyn ProcLauncher> {
1775 self.launcher.get_or_init(|| {
1776 let kind_str = hyperactor_config::global::get_cloned(MESH_PROC_LAUNCHER_KIND);
1777 let kind: LauncherKind = kind_str.parse().unwrap_or(LauncherKind::Native);
1778 tracing::info!(kind = ?kind, config_value = %kind_str, "using default proc launcher");
1779 match kind {
1780 LauncherKind::Native => Arc::new(NativeProcLauncher::new()),
1781 #[cfg(target_os = "linux")]
1782 LauncherKind::Systemd => Arc::new(SystemdProcLauncher::new()),
1783 }
1784 })
1785 }
1786
1787 pub fn command(&self) -> &BootstrapCommand {
1789 &self.command
1790 }
1791
1792 pub fn socket_dir(&self) -> &Path {
1794 self.socket_dir.path()
1795 }
1796
1797 pub async fn status(&self, proc_id: &hyperactor_reference::ProcId) -> Option<ProcStatus> {
1808 self.children.lock().await.get(proc_id).map(|h| h.status())
1809 }
1810
1811 pub async fn watch(
1814 &self,
1815 proc_id: &hyperactor_reference::ProcId,
1816 ) -> Option<tokio::sync::watch::Receiver<ProcStatus>> {
1817 self.children.lock().await.get(proc_id).map(|h| h.watch())
1818 }
1819
1820 pub(crate) async fn request_stop(
1827 &self,
1828 cx: &impl context::Actor,
1829 proc: &hyperactor_reference::ProcId,
1830 timeout: Duration,
1831 reason: &str,
1832 ) {
1833 let handle = {
1834 let guard = self.children.lock().await;
1835 guard.get(proc).cloned()
1836 };
1837
1838 let Some(handle) = handle else { return };
1839
1840 let status = handle.status();
1841 if status.is_exit() || matches!(status, ProcStatus::Stopping { .. }) {
1842 return;
1843 }
1844
1845 if let Some(agent) = handle.agent_ref() {
1846 let mut agent_port = agent.port();
1847 agent_port.return_undeliverable(false);
1848 let _ = agent_port.send(
1849 cx,
1850 resource::StopAll {
1851 reason: reason.to_string(),
1852 },
1853 );
1854 }
1855
1856 let _ = handle.mark_stopping();
1857 tokio::spawn(async move {
1858 handle.wait_or_brutally_kill(timeout).await;
1859 });
1860 }
1861
1862 fn spawn_exit_monitor(
1863 &self,
1864 proc_id: hyperactor_reference::ProcId,
1865 handle: BootstrapProcHandle,
1866 exit_rx: tokio::sync::oneshot::Receiver<ProcExitResult>,
1867 ) {
1868 tokio::spawn(async move {
1869 let exit_result = match exit_rx.await {
1871 Ok(res) => res,
1872 Err(_) => {
1873 let _ = handle.mark_failed("exit_rx sender dropped unexpectedly");
1875 tracing::error!(
1876 name = "ProcStatus",
1877 status = "Exited::ChannelDropped",
1878 %proc_id,
1879 "exit channel closed without result"
1880 );
1881 return;
1882 }
1883 };
1884
1885 let mut stderr_tail: Vec<String> = Vec::new();
1889 let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
1890
1891 if let Some(t) = stderr_mon {
1892 let (lines, _bytes) = t.abort().await;
1893 stderr_tail = lines;
1894 }
1895 if let Some(t) = stdout_mon {
1896 let (_lines, _bytes) = t.abort().await;
1897 }
1898
1899 if stderr_tail.is_empty() {
1901 if let Some(tail) = exit_result.stderr_tail {
1902 stderr_tail = tail;
1903 }
1904 }
1905
1906 let tail_str = if stderr_tail.is_empty() {
1907 None
1908 } else {
1909 Some(stderr_tail.join("\n"))
1910 };
1911
1912 match exit_result.kind {
1913 ProcExitKind::Exited { code } => {
1914 let _ = handle.mark_stopped(code, stderr_tail);
1915 tracing::info!(
1916 name = "ProcStatus",
1917 status = "Exited::ExitWithCode",
1918 %proc_id,
1919 exit_code = code,
1920 tail = tail_str,
1921 "proc exited with code {code}"
1922 );
1923 }
1924 ProcExitKind::Signaled {
1925 signal,
1926 core_dumped,
1927 } => {
1928 let _ = handle.mark_killed(signal, core_dumped);
1929 tracing::info!(
1930 name = "ProcStatus",
1931 status = "Exited::KilledBySignal",
1932 %proc_id,
1933 tail = tail_str,
1934 "killed by signal {signal}"
1935 );
1936 }
1937 ProcExitKind::Failed { reason } => {
1938 let _ = handle.mark_failed(&reason);
1939 tracing::info!(
1940 name = "ProcStatus",
1941 status = "Exited::Failed",
1942 %proc_id,
1943 tail = tail_str,
1944 "proc failed: {reason}"
1945 );
1946 }
1947 }
1948 });
1949 }
1950}
1951
1952pub use crate::proc_launcher::ProcBind;
1953
/// Per-spawn settings consumed by `BootstrapProcManager`'s `spawn`.
pub struct BootstrapProcConfig {
    /// Passed to this proc's stdout/stderr `StreamFwder`s. NOTE(review):
    /// presumably it tags forwarded output with the creating rank; confirm.
    pub create_rank: usize,

    /// Config overrides for this proc. They take precedence over global config
    /// for log forwarding, file capture and tail size. They are also shipped
    /// to the child in its bootstrap payload.
    pub client_config_override: Attrs,

    /// Optional binding request, forwarded unchanged to the launcher as
    /// `LaunchOptions::proc_bind`.
    pub proc_bind: Option<ProcBind>,

    /// When `Some`, replaces the manager's default bootstrap command for this
    /// proc.
    pub bootstrap_command: Option<BootstrapCommand>,
}
1971
#[async_trait]
impl ProcManager for BootstrapProcManager {
    type Handle = BootstrapProcHandle;

    type Config = BootstrapProcConfig;

    /// Channel transport used by this manager: always `ChannelTransport::Unix`.
    fn transport(&self) -> ChannelTransport {
        ChannelTransport::Unix
    }

    /// Launches a child process for `proc_id` and returns its handle without
    /// waiting for it to become ready.
    ///
    /// Steps, in order:
    /// 1. Serve a Unix callback channel. The child reports its address and
    ///    `ProcAgent` ref on it after bootstrapping.
    /// 2. Resolve log forwarding, file capture and tail size. Values in
    ///    `config.client_config_override` win; global config is the fallback.
    /// 3. Encode a `Bootstrap::Proc` payload and hand it to the launcher along
    ///    with the command and stdio options.
    /// 4. Attach stdout/stderr forwarders if the launcher captured stdio.
    /// 5. Mark the handle `Running` and register it in `children`. Then spawn
    ///    the exit monitor and a task that waits for the bootstrap callback
    ///    (`Ready`) or records its failure.
    ///
    /// # Errors
    ///
    /// - `HostError::ProcessConfigurationFailure` if the payload cannot be
    ///   encoded.
    /// - `HostError::ProcessSpawnFailure` if the launcher fails.
    /// - Any error from serving the callback channel.
    #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string()))]
    async fn spawn(
        &self,
        proc_id: hyperactor_reference::ProcId,
        backend_addr: ChannelAddr,
        config: BootstrapProcConfig,
    ) -> Result<Self::Handle, HostError> {
        // The child dials this address once bootstrapped to report readiness.
        let (callback_addr, mut callback_rx) =
            channel::serve::<(ChannelAddr, hyperactor_reference::ActorRef<ProcAgent>)>(
                ChannelAddr::any(ChannelTransport::Unix),
            )?;

        // Per-proc overrides take precedence over the global config.
        let overrides = &config.client_config_override;
        let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
        let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
        let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
        // Request stdio from the launcher only if something will consume it.
        let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;

        let mode = Bootstrap::Proc {
            proc_id: proc_id.clone(),
            backend_addr,
            callback_addr,
            socket_dir_path: self.socket_dir.path().to_owned(),
            config: Some(config.client_config_override.clone()),
        };

        let bootstrap_payload = mode
            .to_env_safe_string()
            .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?;

        let opts = LaunchOptions {
            bootstrap_payload,
            process_name: format_process_name(&proc_id),
            command: config
                .bootstrap_command
                .as_ref()
                .unwrap_or(&self.command)
                .clone(),
            want_stdio: need_stdio,
            tail_lines: tail_size,
            log_channel: if enable_forwarding {
                Some(ChannelAddr::any(ChannelTransport::Unix))
            } else {
                None
            },
            proc_bind: config.proc_bind.clone(),
        };

        let launch_result = self
            .launcher()
            .launch(&proc_id, opts.clone())
            .await
            .map_err(|e| {
                // Keep the launcher's io::Error when it has one; otherwise
                // wrap the error's message.
                let io_err = match e {
                    ProcLauncherError::Launch(io_err) => io_err,
                    other => std::io::Error::other(other.to_string()),
                };
                HostError::ProcessSpawnFailure(proc_id.clone(), io_err)
            })?;

        // Only captured (piped) stdio gets forwarders. Inherited or
        // launcher-managed stdio is left alone.
        let (out_fwder, err_fwder) = match launch_result.stdio {
            StdioHandling::Captured { stdout, stderr } => {
                let (file_stdout, file_stderr) = if enable_file_capture {
                    match self.file_appender.as_deref() {
                        Some(fm) => (
                            Some(fm.addr_for(OutputTarget::Stdout)),
                            Some(fm.addr_for(OutputTarget::Stderr)),
                        ),
                        None => {
                            tracing::warn!("enable_file_capture=true but no FileAppender");
                            (None, None)
                        }
                    }
                } else {
                    (None, None)
                };

                let out = StreamFwder::start(
                    stdout,
                    file_stdout,
                    OutputTarget::Stdout,
                    tail_size,
                    opts.log_channel.clone(),
                    &proc_id,
                    config.create_rank,
                );
                let err = StreamFwder::start(
                    stderr,
                    file_stderr,
                    OutputTarget::Stderr,
                    tail_size,
                    opts.log_channel.clone(),
                    &proc_id,
                    config.create_rank,
                );
                (Some(out), Some(err))
            }
            StdioHandling::Inherited | StdioHandling::ManagedByLauncher => {
                if !need_stdio {
                    tracing::info!(
                        %proc_id, enable_forwarding, enable_file_capture, tail_size,
                        "child stdio NOT captured (forwarding/file_capture/tail all disabled)"
                    );
                }
                (None, None)
            }
        };

        // The handle holds only a weak reference to the launcher.
        let handle = BootstrapProcHandle::new(proc_id.clone(), Arc::downgrade(self.launcher()));
        handle.mark_running(launch_result.started_at);
        handle.set_stream_monitors(out_fwder, err_fwder);

        {
            let mut children = self.children.lock().await;
            children.insert(proc_id.clone(), handle.clone());
        }

        self.spawn_exit_monitor(proc_id.clone(), handle.clone(), launch_result.exit_rx);

        // Wait for the child's bootstrap callback; Ready on success, Failed
        // on a receive error. NOTE(review): if the child exits without ever
        // calling back, this task appears to wait on `callback_rx`
        // indefinitely; confirm whether that is acceptable.
        let h = handle.clone();
        tokio::spawn(async move {
            match callback_rx.recv().await {
                Ok((addr, agent)) => {
                    let _ = h.mark_ready(addr, agent);
                }
                Err(e) => {
                    let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
                }
            }
        });

        Ok(handle)
    }
}
2159
2160#[async_trait]
2161impl hyperactor::host::SingleTerminate for BootstrapProcManager {
2162 async fn terminate_proc(
2172 &self,
2173 cx: &impl context::Actor,
2174 proc: &hyperactor_reference::ProcId,
2175 timeout: Duration,
2176 reason: &str,
2177 ) -> Result<
2178 (
2179 Vec<hyperactor_reference::ActorId>,
2180 Vec<hyperactor_reference::ActorId>,
2181 ),
2182 anyhow::Error,
2183 > {
2184 let proc_handle: Option<BootstrapProcHandle> = {
2186 let mut guard = self.children.lock().await;
2187 guard.remove(proc)
2188 };
2189
2190 if let Some(h) = proc_handle {
2191 h.terminate(cx, timeout, reason)
2192 .await
2193 .map(|_| (Vec::new(), Vec::new()))
2194 .map_err(|e| e.into())
2195 } else {
2196 Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2197 }
2198 }
2199}
2200
2201#[async_trait]
2202impl hyperactor::host::BulkTerminate for BootstrapProcManager {
2203 async fn terminate_all(
2217 &self,
2218 cx: &impl context::Actor,
2219 timeout: Duration,
2220 max_in_flight: usize,
2221 reason: &str,
2222 ) -> TerminateSummary {
2223 let handles: Vec<BootstrapProcHandle> = {
2226 let mut guard = self.children.lock().await;
2227 guard.drain().map(|(_, v)| v).collect()
2228 };
2229
2230 let attempted = handles.len();
2231 let mut ok = 0usize;
2232
2233 let results = stream::iter(handles.into_iter().map(|h| async move {
2234 match h.terminate(cx, timeout, reason).await {
2235 Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
2236 true
2238 }
2239 Err(e) => {
2240 tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2241 false
2242 }
2243 }
2244 }))
2245 .buffer_unordered(max_in_flight.max(1))
2246 .collect::<Vec<bool>>()
2247 .await;
2248
2249 for r in results {
2250 if r {
2251 ok += 1;
2252 }
2253 }
2254
2255 TerminateSummary {
2256 attempted,
2257 ok,
2258 failed: attempted.saturating_sub(ok),
2259 }
2260 }
2261}
2262
2263pub async fn bootstrap() -> anyhow::Result<i32> {
2280 let boot = Bootstrap::get_from_env()?.unwrap_or_else(Bootstrap::default);
2281 boot.bootstrap().await
2282}
2283
/// Runs a "v0" proc-mesh child process. The process connects back to an
/// allocator, starts procs on request, and continues until told to exit.
///
/// If `config` is `Some`, it is installed as the `ClientOverride` layer of the
/// global config before anything else runs.
///
/// The allocator address and this process's index come from the
/// `BOOTSTRAP_ADDR_ENV` and `BOOTSTRAP_INDEX_ENV` environment variables. This
/// function only returns an error. The `StopAndExit`/`Exit` messages end the
/// process via `std::process::exit`, so the return value is whatever error
/// ended the message loop.
async fn bootstrap_v0_proc_mesh(config: Option<Attrs>) -> anyhow::Error {
    if let Some(attrs) = config {
        hyperactor_config::global::set(hyperactor_config::global::Source::ClientOverride, attrs);
        tracing::debug!("bootstrap: installed ClientOverride config snapshot (V0ProcMesh)");
    } else {
        tracing::debug!("bootstrap: no config snapshot provided (V0ProcMesh)");
    }
    tracing::info!(
        "bootstrap_v0_proc_mesh config:\n{}",
        hyperactor_config::global::attrs()
    );

    /// The allocator message loop. It returns only on error; the exit
    /// messages terminate the process directly.
    pub async fn go() -> Result<(), anyhow::Error> {
        // Procs started so far. This list is shared with the cleanup future
        // registered below so those procs can be destroyed.
        let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
        let procs_for_cleanup = procs.clone();
        // The guard is kept alive for the whole of `go`.
        let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
            for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
                if let Err(err) = proc_to_stop
                    .destroy_and_wait::<()>(
                        Duration::from_millis(10),
                        None,
                        "execute cleanup callback",
                    )
                    .await
                {
                    tracing::error!(
                        "error while stopping proc {}: {}",
                        proc_to_stop.proc_id(),
                        err
                    );
                }
            }
        }));

        let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
            .parse()?;
        let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
            .parse()?;
        // Listen on the same transport the allocator uses.
        let listen_addr = ChannelAddr::any(bootstrap_addr.transport());

        let entered = tracing::span!(
            Level::INFO,
            "bootstrap_v0_proc_mesh",
            %bootstrap_addr,
            %bootstrap_index,
            %listen_addr,
        )
        .entered();

        let (serve_addr, mut rx) = channel::serve(listen_addr)?;
        let tx = channel::dial(bootstrap_addr.clone())?;

        // Announce ourselves. An undeliverable Hello is returned on `rtx`.
        let (rtx, mut return_channel) = oneshot::channel();
        tx.try_post(
            Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
            rtx,
        );
        // NOTE(review): presumably exits this process if the allocator's
        // heartbeats stop; see `exit_if_missed_heartbeat`.
        tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));

        let _ = entered.exit();

        let mut the_msg;

        // Wait for the first allocator message. If the Hello comes back as
        // undeliverable first, give up. If the return channel closes without
        // a message, keep waiting on `rx`.
        tokio::select! {
            msg = rx.recv() => {
                the_msg = msg;
            }
            returned_msg = &mut return_channel => {
                match returned_msg {
                    Ok(msg) => {
                        return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
                    }
                    Err(_) => {
                        the_msg = rx.recv().await;
                    }
                }
            }
        }
        loop {
            // A receive error ends the loop and becomes `go`'s error.
            match the_msg? {
                Allocator2Process::StartProc(proc_id, listen_transport) => {
                    let span = tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport);
                    let (proc, mesh_agent) = ProcAgent::bootstrap(proc_id.clone())
                        .instrument(span.clone())
                        .await?;
                    let entered = span.entered();
                    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
                    let handle = proc.clone().serve(proc_rx);
                    // NOTE(review): the server handle is dropped right away.
                    // This assumes dropping a MailboxServerHandle does not
                    // stop the server; confirm.
                    drop(handle);
                    let span = entered.exit();
                    // Report the new proc's agent and address to the allocator.
                    tx.send(Process2Allocator(
                        bootstrap_index,
                        Process2AllocatorMessage::StartedProc(
                            proc_id.clone(),
                            mesh_agent.bind(),
                            proc_addr,
                        ),
                    ))
                    .instrument(span)
                    .await?;
                    procs.lock().await.push(proc);
                }
                Allocator2Process::StopAndExit(code) => {
                    // Destroy every started proc (errors are logged), then exit.
                    tracing::info!("stopping procs with code {code}");
                    {
                        for proc_to_stop in procs.lock().await.iter_mut() {
                            if let Err(err) = proc_to_stop
                                .destroy_and_wait::<()>(
                                    Duration::from_millis(10),
                                    None,
                                    "stop and exit",
                                )
                                .await
                            {
                                tracing::error!(
                                    "error while stopping proc {}: {}",
                                    proc_to_stop.proc_id(),
                                    err
                                );
                            }
                        }
                    }
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
                Allocator2Process::Exit(code) => {
                    // Exit immediately without stopping procs.
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
            }
            the_msg = rx.recv().await;
        }
    }

    // `go` never returns Ok: the loop only exits through `?` or process exit.
    go().await.unwrap_err()
}
2432
2433pub async fn bootstrap_or_die() -> ! {
2436 match bootstrap().await {
2437 Ok(exit_code) => std::process::exit(exit_code),
2438 Err(err) => {
2439 let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2440 tracing::error!("failed to bootstrap mesh process: {}", err);
2441 std::process::exit(1);
2442 }
2443 }
2444}
2445
2446#[derive(enum_as_inner::EnumAsInner)]
2447enum DebugSink {
2448 File(std::fs::File),
2449 Sink,
2450}
2451
2452impl DebugSink {
2453 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2454 match self {
2455 DebugSink::File(f) => f.write(buf),
2456 DebugSink::Sink => Ok(buf.len()),
2457 }
2458 }
2459 fn flush(&mut self) -> io::Result<()> {
2460 match self {
2461 DebugSink::File(f) => f.flush(),
2462 DebugSink::Sink => Ok(()),
2463 }
2464 }
2465}
2466
2467fn debug_sink() -> &'static Mutex<DebugSink> {
2468 static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2469 DEBUG_SINK.get_or_init(|| {
2470 let debug_path = {
2471 let mut p = std::env::temp_dir();
2472 if let Ok(user) = std::env::var("USER") {
2473 p.push(user);
2474 }
2475 std::fs::create_dir_all(&p).ok();
2476 p.push("monarch-bootstrap-debug.log");
2477 p
2478 };
2479 let sink = if debug_path.exists() {
2480 match OpenOptions::new()
2481 .append(true)
2482 .create(true)
2483 .open(debug_path.clone())
2484 {
2485 Ok(f) => DebugSink::File(f),
2486 Err(_e) => {
2487 eprintln!(
2488 "failed to open {} for bootstrap debug logging",
2489 debug_path.display()
2490 );
2491 DebugSink::Sink
2492 }
2493 }
2494 } else {
2495 DebugSink::Sink
2496 };
2497 Mutex::new(sink)
2498 })
2499}
2500
2501const DEBUG_TO_STDERR: bool = false;
2503
2504struct Debug;
2507
2508impl Debug {
2509 fn is_active() -> bool {
2510 DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2511 }
2512}
2513
2514impl Write for Debug {
2515 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2516 let res = debug_sink().lock().unwrap().write(buf);
2517 if DEBUG_TO_STDERR {
2518 let n = match res {
2519 Ok(n) => n,
2520 Err(_) => buf.len(),
2521 };
2522 let _ = io::stderr().write_all(&buf[..n]);
2523 }
2524
2525 res
2526 }
2527 fn flush(&mut self) -> io::Result<()> {
2528 let res = debug_sink().lock().unwrap().flush();
2529 if DEBUG_TO_STDERR {
2530 let _ = io::stderr().flush();
2531 }
2532 res
2533 }
2534}
2535
2536fn runtime_dir() -> io::Result<TempDir> {
2539 match std::env::var_os("XDG_RUNTIME_DIR") {
2540 Some(runtime_dir) => {
2541 let path = PathBuf::from(runtime_dir);
2542 tempfile::tempdir_in(path)
2543 }
2544 None => tempfile::tempdir(),
2545 }
2546}
2547
2548#[cfg(test)]
2549mod tests {
2550 use std::path::PathBuf;
2551
2552 use hyperactor::RemoteSpawn;
2553 use hyperactor::channel::ChannelAddr;
2554 use hyperactor::channel::ChannelTransport;
2555 use hyperactor::channel::TcpMode;
2556 use hyperactor::context::Mailbox as _;
2557 use hyperactor::host::ProcHandle;
2558 use hyperactor::reference as hyperactor_reference;
2559 use hyperactor::testing::ids::test_proc_id;
2560 use hyperactor::testing::ids::test_proc_id_with_addr;
2561 use hyperactor_config::Flattrs;
2562 use ndslice::Extent;
2563 use ndslice::ViewExt;
2564 use ndslice::extent;
2565 use tokio::process::Command;
2566
2567 use super::*;
2568 use crate::ActorMesh;
2569 use crate::alloc::AllocSpec;
2570 use crate::alloc::Allocator;
2571 use crate::alloc::ProcessAllocator;
2572 use crate::host_mesh::HostMesh;
2573 use crate::testactor;
2574 use crate::testing;
2575
2576 #[test]
2577 fn test_bootstrap_mode_env_string_none_config_proc() {
2578 let values = [
2579 Bootstrap::default(),
2580 Bootstrap::Proc {
2581 proc_id: test_proc_id("foo_0"),
2582 backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
2583 callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2584 socket_dir_path: PathBuf::from("notexist"),
2585 config: None,
2586 },
2587 ];
2588
2589 for value in values {
2590 let safe = value.to_env_safe_string().unwrap();
2591 let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2592
2593 let safe2 = round.to_env_safe_string().unwrap();
2596 assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2597
2598 match (&value, &round) {
2600 (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. }) => {}
2601 (
2602 Bootstrap::V0ProcMesh { config: None },
2603 Bootstrap::V0ProcMesh { config: None },
2604 ) => {}
2605 _ => panic!("decoded variant mismatch: got {:?}", round),
2606 }
2607 }
2608 }
2609
2610 #[test]
2611 fn test_bootstrap_mode_env_string_none_config_host() {
2612 let value = Bootstrap::Host {
2613 addr: ChannelAddr::any(ChannelTransport::Unix),
2614 command: None,
2615 config: None,
2616 exit_on_shutdown: false,
2617 };
2618
2619 let safe = value.to_env_safe_string().unwrap();
2620 let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2621
2622 let safe2 = round.to_env_safe_string().unwrap();
2624 assert_eq!(safe, safe2);
2625
2626 match round {
2628 Bootstrap::Host { config: None, .. } => {}
2629 other => panic!("expected Host with None config, got {:?}", other),
2630 }
2631 }
2632
2633 #[test]
2634 fn test_bootstrap_mode_env_string_invalid() {
2635 assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2637 }
2638
2639 #[test]
2640 fn test_bootstrap_config_snapshot_roundtrip() {
2641 let mut attrs = Attrs::new();
2643 attrs[MESH_TAIL_LOG_LINES] = 123;
2644 attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2645
2646 let socket_dir = runtime_dir().unwrap();
2647
2648 {
2650 let original = Bootstrap::Proc {
2651 proc_id: test_proc_id("foo_42"),
2652 backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2653 callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2654 config: Some(attrs.clone()),
2655 socket_dir_path: socket_dir.path().to_owned(),
2656 };
2657 let env_str = original.to_env_safe_string().expect("encode bootstrap");
2658 let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2659 match &decoded {
2660 Bootstrap::Proc { config, .. } => {
2661 let cfg = config.as_ref().expect("expected Some(attrs)");
2662 assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2663 assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2664 }
2665 other => panic!("unexpected variant after roundtrip: {:?}", other),
2666 }
2667 }
2668
2669 {
2671 let original = Bootstrap::Host {
2672 addr: ChannelAddr::any(ChannelTransport::Unix),
2673 command: None,
2674 config: Some(attrs.clone()),
2675 exit_on_shutdown: false,
2676 };
2677 let env_str = original.to_env_safe_string().expect("encode bootstrap");
2678 let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2679 match &decoded {
2680 Bootstrap::Host { config, .. } => {
2681 let cfg = config.as_ref().expect("expected Some(attrs)");
2682 assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2683 assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2684 }
2685 other => panic!("unexpected variant after roundtrip: {:?}", other),
2686 }
2687 }
2688
2689 {
2691 let original = Bootstrap::V0ProcMesh {
2692 config: Some(attrs.clone()),
2693 };
2694 let env_str = original.to_env_safe_string().expect("encode bootstrap");
2695 let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2696 match &decoded {
2697 Bootstrap::V0ProcMesh { config } => {
2698 let cfg = config.as_ref().expect("expected Some(attrs)");
2699 assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2700 assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2701 }
2702 other => panic!("unexpected variant after roundtrip: {:?}", other),
2703 }
2704 }
2705 }
2706
    /// End-to-end check that a `LogMessage::Log` posted on the bootstrap log
    /// channel reaches `LogClientActor` through `LogForwardActor`, as observed
    /// via the logging test tap.
    #[tokio::test]
    async fn test_v1_child_logging() {
        use hyperactor::channel;
        use hyperactor::mailbox::BoxedMailboxSender;
        use hyperactor::mailbox::DialMailboxRouter;
        use hyperactor::mailbox::MailboxServer;
        use hyperactor::proc::Proc;

        use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
        use crate::logging::LogClientActor;
        use crate::logging::LogClientMessageClient;
        use crate::logging::LogForwardActor;
        use crate::logging::LogMessage;
        use crate::logging::OutputTarget;
        use crate::logging::test_tap;

        // A client proc served on a fresh Unix channel and registered with
        // the router.
        let router = DialMailboxRouter::new();
        let (proc_addr, proc_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let proc = Proc::configured(
            test_proc_id("client_0"),
            BoxedMailboxSender::new(router.clone()),
        );
        proc.clone().serve(proc_rx);
        router.bind(test_proc_id("client_0").into(), proc_addr.clone());
        let (client, _handle) = proc.instance("client").unwrap();

        // Capture the lines the log client emits.
        let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        test_tap::install(tap_tx);

        // NOTE(review): presumably LogForwardActor::new reads
        // BOOTSTRAP_LOG_CHANNEL to decide where to listen, so it is set before
        // the forwarder is created. Confirm.
        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        // SAFETY: NOTE(review): `set_var` is only sound when no other thread
        // reads or writes the environment concurrently. Tests in this binary
        // may run on parallel threads, so this relies on no concurrent env
        // access. Confirm, or serialize env-mutating tests.
        unsafe {
            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
        }

        let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
        let log_client: hyperactor_reference::ActorRef<LogClientActor> =
            proc.spawn("log_client", log_client_actor).unwrap().bind();
        log_client.set_aggregate(&client, None).await.unwrap();

        let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
            .await
            .unwrap();
        let _log_forwarder: hyperactor_reference::ActorRef<LogForwardActor> = proc
            .spawn("log_forwarder", log_forwarder_actor)
            .unwrap()
            .bind();

        // Act as a child process posting a line on the log channel.
        let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();

        tx.post(LogMessage::Log {
            hostname: "testhost".into(),
            proc_id: "testproc[0]".into(),
            output_target: OutputTarget::Stdout,
            payload: wirevalue::Any::serialize(&"hello from child".to_string()).unwrap(),
        });

        let line = tokio::time::timeout(Duration::from_secs(2), tap_rx.recv())
            .await
            .expect("timed out waiting for log line")
            .expect("tap channel closed unexpectedly");
        assert!(
            line.contains("hello from child"),
            "log line did not appear via LogClientActor; got: {line}"
        );
    }
2783
2784 mod proc_handle {
2785
2786 use std::sync::Arc;
2787 use std::time::Duration;
2788
2789 use async_trait::async_trait;
2790 use hyperactor::host::ProcHandle;
2791 use hyperactor::reference as hyperactor_reference;
2792 use hyperactor::testing::ids::test_proc_id;
2793
2794 use super::super::*;
2795 use crate::proc_launcher::LaunchOptions;
2796 use crate::proc_launcher::LaunchResult;
2797 use crate::proc_launcher::ProcLauncher;
2798 use crate::proc_launcher::ProcLauncherError;
2799
2800 struct TestProcLauncher;
2808
2809 #[async_trait]
2810 impl ProcLauncher for TestProcLauncher {
2811 async fn launch(
2812 &self,
2813 _proc_id: &hyperactor_reference::ProcId,
2814 _opts: LaunchOptions,
2815 ) -> Result<LaunchResult, ProcLauncherError> {
2816 panic!("TestProcLauncher::launch should not be called in unit tests");
2817 }
2818
2819 async fn terminate(
2820 &self,
2821 _proc_id: &hyperactor_reference::ProcId,
2822 _timeout: Duration,
2823 ) -> Result<(), ProcLauncherError> {
2824 panic!("TestProcLauncher::terminate should not be called in unit tests");
2825 }
2826
2827 async fn kill(
2828 &self,
2829 _proc_id: &hyperactor_reference::ProcId,
2830 ) -> Result<(), ProcLauncherError> {
2831 panic!("TestProcLauncher::kill should not be called in unit tests");
2832 }
2833 }
2834
2835 fn handle_for_test() -> BootstrapProcHandle {
2842 let proc_id = test_proc_id("0");
2843 let launcher: Arc<dyn ProcLauncher> = Arc::new(TestProcLauncher);
2844 BootstrapProcHandle::new(proc_id, Arc::downgrade(&launcher))
2845 }
2846
2847 #[tokio::test]
2848 async fn starting_to_running_ok() {
2849 let h = handle_for_test();
2850 assert!(matches!(h.status(), ProcStatus::Starting));
2851 let child_started_at = std::time::SystemTime::now();
2852 assert!(h.mark_running(child_started_at));
2853 match h.status() {
2854 ProcStatus::Running { started_at } => {
2855 assert_eq!(started_at, child_started_at);
2856 }
2857 other => panic!("expected Running, got {other:?}"),
2858 }
2859 }
2860
2861 #[tokio::test]
2862 async fn running_to_stopping_to_stopped_ok() {
2863 let h = handle_for_test();
2864 let child_started_at = std::time::SystemTime::now();
2865 assert!(h.mark_running(child_started_at));
2866 assert!(h.mark_stopping());
2867 assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2868 assert!(h.mark_stopped(0, Vec::new()));
2869 assert!(matches!(
2870 h.status(),
2871 ProcStatus::Stopped { exit_code: 0, .. }
2872 ));
2873 }
2874
2875 #[tokio::test]
2876 async fn running_to_killed_ok() {
2877 let h = handle_for_test();
2878 let child_started_at = std::time::SystemTime::now();
2879 assert!(h.mark_running(child_started_at));
2880 assert!(h.mark_killed(9, true));
2881 assert!(matches!(
2882 h.status(),
2883 ProcStatus::Killed {
2884 signal: 9,
2885 core_dumped: true
2886 }
2887 ));
2888 }
2889
2890 #[tokio::test]
2891 async fn running_to_failed_ok() {
2892 let h = handle_for_test();
2893 let child_started_at = std::time::SystemTime::now();
2894 assert!(h.mark_running(child_started_at));
2895 assert!(h.mark_failed("bootstrap error"));
2896 match h.status() {
2897 ProcStatus::Failed { reason } => {
2898 assert_eq!(reason, "bootstrap error");
2899 }
2900 other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2901 }
2902 }
2903
2904 #[tokio::test]
2905 async fn illegal_transitions_are_rejected() {
2906 let h = handle_for_test();
2907 let child_started_at = std::time::SystemTime::now();
2908 assert!(h.mark_running(child_started_at));
2910 assert!(!h.mark_running(std::time::SystemTime::now()));
2911 assert!(matches!(h.status(), ProcStatus::Running { .. }));
2912 assert!(h.mark_stopping());
2914 assert!(h.mark_stopped(0, Vec::new()));
2915 assert!(!h.mark_running(child_started_at));
2916 assert!(!h.mark_killed(9, false));
2917 assert!(!h.mark_failed("nope"));
2918
2919 assert!(matches!(
2920 h.status(),
2921 ProcStatus::Stopped { exit_code: 0, .. }
2922 ));
2923 }
2924
2925 #[tokio::test]
2926 async fn transitions_from_ready_are_legal() {
2927 let h = handle_for_test();
2928 let addr = ChannelAddr::any(ChannelTransport::Unix);
2929 let t0 = std::time::SystemTime::now();
2931 assert!(h.mark_running(t0));
2932 let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2935 let actor_id = proc_id.actor_id("proc_agent", 0);
2936 let agent_ref: hyperactor_reference::ActorRef<ProcAgent> =
2937 hyperactor_reference::ActorRef::attest(actor_id);
2938 assert!(h.mark_ready(addr, agent_ref));
2940 assert!(h.mark_stopping());
2941 assert!(h.mark_stopped(0, Vec::new()));
2942 }
2943
2944 #[tokio::test]
2945 async fn ready_to_killed_is_legal() {
2946 let h = handle_for_test();
2947 let addr = ChannelAddr::any(ChannelTransport::Unix);
2948 let t0 = std::time::SystemTime::now();
2950 assert!(h.mark_running(t0));
2951 let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2954 let actor_id = proc_id.actor_id("proc_agent", 0);
2955 let agent: hyperactor_reference::ActorRef<ProcAgent> =
2956 hyperactor_reference::ActorRef::attest(actor_id);
2957 assert!(h.mark_ready(addr, agent));
2959 assert!(h.mark_killed(9, false));
2961 }
2962
2963 #[tokio::test]
2964 async fn mark_failed_from_stopping_is_allowed() {
2965 let h = handle_for_test();
2966
2967 assert!(h.mark_stopping(), "precondition: to Stopping");
2969
2970 assert!(
2972 h.mark_failed("boom"),
2973 "mark_failed() should succeed from Stopping"
2974 );
2975 match h.status() {
2976 ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2977 other => panic!("expected Failed(\"boom\"), got {other:?}"),
2978 }
2979 }
2980 }
2981
2982 struct TestLauncher;
2988
2989 #[async_trait::async_trait]
2990 impl crate::proc_launcher::ProcLauncher for TestLauncher {
2991 async fn launch(
2992 &self,
2993 _proc_id: &hyperactor_reference::ProcId,
2994 _opts: crate::proc_launcher::LaunchOptions,
2995 ) -> Result<crate::proc_launcher::LaunchResult, crate::proc_launcher::ProcLauncherError>
2996 {
2997 panic!("TestLauncher::launch should not be called in unit tests");
2998 }
2999
3000 async fn terminate(
3001 &self,
3002 _proc_id: &hyperactor_reference::ProcId,
3003 _timeout: std::time::Duration,
3004 ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
3005 panic!("TestLauncher::terminate should not be called in unit tests");
3006 }
3007
3008 async fn kill(
3009 &self,
3010 _proc_id: &hyperactor_reference::ProcId,
3011 ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
3012 panic!("TestLauncher::kill should not be called in unit tests");
3013 }
3014 }
3015
3016 fn test_handle(proc_id: hyperactor_reference::ProcId) -> BootstrapProcHandle {
3017 let launcher: std::sync::Arc<dyn crate::proc_launcher::ProcLauncher> =
3018 std::sync::Arc::new(TestLauncher);
3019 BootstrapProcHandle::new(proc_id, std::sync::Arc::downgrade(&launcher))
3020 }
3021
3022 #[tokio::test]
3023 async fn watch_notifies_on_status_changes() {
3024 let proc_id = test_proc_id("1");
3025 let handle = test_handle(proc_id);
3026 let mut rx = handle.watch();
3027
3028 let now = std::time::SystemTime::now();
3030 assert!(handle.mark_running(now));
3031 rx.changed().await.ok(); match &*rx.borrow() {
3033 ProcStatus::Running { started_at } => {
3034 assert_eq!(*started_at, now);
3035 }
3036 s => panic!("expected Running, got {s:?}"),
3037 }
3038
3039 assert!(handle.mark_stopped(0, Vec::new()));
3041 rx.changed().await.ok(); assert!(matches!(
3043 &*rx.borrow(),
3044 ProcStatus::Stopped { exit_code: 0, .. }
3045 ));
3046 }
3047
3048 #[tokio::test]
3049 async fn ready_errs_if_process_exits_before_running() {
3050 let proc_id =
3051 test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "early-exit");
3052 let handle = test_handle(proc_id);
3053
3054 assert!(handle.mark_stopped(7, Vec::new()));
3057
3058 match handle.ready_inner().await {
3060 Ok(()) => panic!("ready() unexpectedly succeeded"),
3061 Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
3062 assert_eq!(exit_code, 7)
3063 }
3064 Err(other) => panic!("expected Stopped(7), got {other:?}"),
3065 }
3066 }
3067
3068 #[tokio::test]
3069 async fn status_unknown_proc_is_none() {
3070 let manager = BootstrapProcManager::new(BootstrapCommand {
3071 program: PathBuf::from("/bin/true"),
3072 ..Default::default()
3073 })
3074 .unwrap();
3075 let unknown = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "nope");
3076 assert!(manager.status(&unknown).await.is_none());
3077 }
3078
3079 #[tokio::test]
3080 async fn handle_ready_allows_waiters() {
3081 let proc_id = test_proc_id("42");
3082 let handle = test_handle(proc_id.clone());
3083
3084 let started_at = std::time::SystemTime::now();
3085 assert!(handle.mark_running(started_at));
3086
3087 let actor_id = proc_id.actor_id("proc_agent", 0);
3088 let agent_ref: hyperactor_reference::ActorRef<ProcAgent> =
3089 hyperactor_reference::ActorRef::attest(actor_id);
3090
3091 let ready_addr = ChannelAddr::any(ChannelTransport::Unix);
3094
3095 assert!(handle.mark_ready(ready_addr.clone(), agent_ref));
3097 handle
3098 .ready_inner()
3099 .await
3100 .expect("ready_inner() should complete after Ready");
3101
3102 match handle.status() {
3105 ProcStatus::Ready {
3106 started_at: t,
3107 addr: a,
3108 ..
3109 } => {
3110 assert_eq!(t, started_at);
3111 assert_eq!(a, ready_addr);
3112 }
3113 other => panic!("expected Ready, got {other:?}"),
3114 }
3115 }
3116
/// `Display` for `Running` names the state and renders the elapsed uptime.
#[test]
fn display_running_includes_uptime() {
    let status = ProcStatus::Running {
        started_at: std::time::SystemTime::now() - Duration::from_secs(42),
    };

    let rendered = status.to_string();
    for needle in ["Running", "42s"] {
        assert!(rendered.contains(needle));
    }
}
3126
/// `Display` for `Ready` names the state and includes the proc's address.
#[test]
fn display_ready_includes_addr() {
    let started_at = std::time::SystemTime::now() - Duration::from_secs(5);
    let addr = ChannelAddr::any(ChannelTransport::Unix);
    let agent_id = test_proc_id_with_addr(addr.clone(), "proc")
        .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0);

    let rendered = ProcStatus::Ready {
        started_at,
        addr: addr.clone(),
        agent: hyperactor_reference::ActorRef::attest(agent_id),
    }
    .to_string();

    assert!(rendered.contains(&addr.to_string()));
    assert!(rendered.contains("Ready"));
}
3146
/// `Display` for `Stopped` names the state and includes the exit code.
#[test]
fn display_stopped_includes_exit_code() {
    let rendered = ProcStatus::Stopped {
        exit_code: 7,
        stderr_tail: Vec::new(),
    }
    .to_string();

    assert!(rendered.contains("Stopped"));
    assert!(rendered.contains("7"));
}
3157
/// Smoke test: formatting the remaining `ProcStatus` variants with `Display`
/// does not panic.
#[test]
fn display_other_variants_does_not_panic() {
    let agent_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "x")
        .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0);

    let samples = [
        ProcStatus::Starting,
        ProcStatus::Stopping {
            started_at: std::time::SystemTime::now(),
        },
        ProcStatus::Ready {
            started_at: std::time::SystemTime::now(),
            addr: ChannelAddr::any(ChannelTransport::Unix),
            agent: hyperactor_reference::ActorRef::attest(agent_id),
        },
        ProcStatus::Killed {
            signal: 9,
            core_dumped: false,
        },
        ProcStatus::Failed {
            reason: "boom".into(),
        },
    ];

    for status in samples {
        let _ = format!("{status}");
    }
}
3186
3187 #[tokio::test]
3188 async fn proc_handle_ready_ok_through_trait() {
3189 let proc_id =
3190 test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-ready-ok");
3191 let handle = test_handle(proc_id.clone());
3192
3193 let t0 = std::time::SystemTime::now();
3195 assert!(handle.mark_running(t0));
3196
3197 let addr = ChannelAddr::any(ChannelTransport::Unix);
3199 let agent: hyperactor_reference::ActorRef<ProcAgent> =
3200 hyperactor_reference::ActorRef::attest(proc_id.actor_id("proc_agent", 0));
3201 assert!(handle.mark_ready(addr, agent));
3202
3203 let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3205 assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3206 }
3207
3208 #[tokio::test]
3209 async fn proc_handle_wait_returns_terminal_status() {
3210 let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-wait");
3211 let handle = test_handle(proc_id);
3212
3213 assert!(handle.mark_stopped(0, Vec::new()));
3215
3216 let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3218 .await
3219 .expect("wait should return Ok(terminal)");
3220
3221 match st {
3222 ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3223 other => panic!("expected Stopped(0), got {other:?}"),
3224 }
3225 }
3226
3227 #[tokio::test]
3228 async fn ready_wrapper_maps_terminal_to_trait_error() {
3229 let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "wrap");
3230 let handle = test_handle(proc_id);
3231
3232 assert!(handle.mark_stopped(7, Vec::new()));
3233
3234 match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3235 Ok(()) => panic!("expected Err"),
3236 Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3237 exit_code, ..
3238 })) => {
3239 assert_eq!(exit_code, 7);
3240 }
3241 Err(e) => panic!("unexpected error: {e:?}"),
3242 }
3243 }
3244
3245 async fn make_proc_id_and_backend_addr(
3255 instance: &hyperactor::Instance<()>,
3256 _tag: &str,
3257 ) -> (hyperactor_reference::ProcId, ChannelAddr) {
3258 let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3261
3262 instance.proc().clone().serve(rx);
3266
3267 let proc_id = test_proc_id_with_addr(ChannelTransport::Unix.any(), "proc");
3270 (proc_id, backend_addr)
3271 }
3272
/// `terminate()` on a ready bootstrap child finishes within twice its grace
/// period. It reports either a clean exit (code 0) or death by SIGTERM.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_terminate_graceful() {
    let root =
        hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
    let (instance, _client_handle) = root.instance("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
    let child_config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
        proc_bind: None,
        bootstrap_command: None,
    };
    let child = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), child_config)
        .await
        .expect("spawn bootstrap child");

    child.ready().await.expect("ready");

    let grace = Duration::from_secs(2);
    // Allow twice the grace period before declaring the future hung.
    let outcome = tokio::time::timeout(
        grace * 2,
        child.terminate(&instance, grace, "test terminate"),
    )
    .await;

    match outcome {
        Err(_) => panic!("terminate() future hung"),
        Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
        Ok(Ok(ProcStatus::Stopped { exit_code, .. })) => {
            assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
        }
        Ok(Ok(ProcStatus::Killed { signal, .. })) => {
            assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
        }
        Ok(Ok(other)) => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
    }
}
3339
/// `kill()` on a ready bootstrap child completes within 5s and reports that
/// the process was killed by SIGKILL.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_kill_forced() {
    let root =
        hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
    let (instance, _client_handle) = root.instance("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;

    let child_config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
        proc_bind: None,
        bootstrap_command: None,
    };
    let child = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), child_config)
        .await
        .expect("spawn bootstrap child");

    child.ready().await.expect("ready");

    let budget = Duration::from_secs(5);
    match tokio::time::timeout(budget, child.kill()).await {
        Err(_) => panic!("kill() future hung"),
        Ok(Err(e)) => panic!("kill() failed: {e:?}"),
        Ok(Ok(ProcStatus::Killed { signal, .. })) => {
            assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {signal}");
        }
        Ok(Ok(other)) => panic!("expected Killed status after kill(); got: {other:?}"),
    }
}
3390
/// End-to-end smoke test. Allocates a one-replica host mesh from the
/// `bootstrap` test binary, spawns a proc and a `TestActor`, and checks that
/// the actor answers `GetActorId` with its own id. Then shuts the host down.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_canonical_simple() {
    // NOTE(review): `set_var` is `unsafe` because other threads may read the
    // process environment concurrently. This test assumes no such reader
    // exists. The variable is never restored, so it stays set for any later
    // test in the same process — confirm that is acceptable.
    unsafe {
        std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
    }
    let instance = testing::instance();

    let bootstrap_bin = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
    let mut process_allocator = ProcessAllocator::new(Command::new(bootstrap_bin));
    let spec = AllocSpec {
        extent: extent!(replicas = 1),
        constraints: Default::default(),
        proc_name: None,
        transport: ChannelTransport::Unix,
        proc_allocation_mode: Default::default(),
    };
    let allocation = process_allocator.allocate(spec).await.unwrap();

    let mut host_mesh = HostMesh::allocate(&instance, Box::new(allocation), "test", None)
        .await
        .unwrap();

    let proc_mesh = host_mesh
        .spawn(&instance, "p0", Extent::unity(), None)
        .await
        .unwrap();

    let actor_mesh: ActorMesh<testactor::TestActor> =
        proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

    // Round-trip a GetActorId through the mesh. The single actor should report
    // its own id.
    let (reply_port, mut replies) = instance.mailbox().open_port();
    actor_mesh
        .cast(&instance, testactor::GetActorId(reply_port.bind()))
        .unwrap();
    let (reported_id, _seq) = replies.recv().await.unwrap();
    let expected_id = actor_mesh.values().next().unwrap().actor_id().clone();
    assert_eq!(reported_id, expected_id);

    host_mesh.shutdown(&instance).await.expect("host shutdown");
}
3505
/// Runs the same allocate/spawn/cast flow as `bootstrap_canonical_simple`.
/// After the host shuts down, it polls systemd over the D-Bus session bus and
/// asserts that the proc's unit is either gone or no longer active/running.
///
/// NOTE(review): nothing visible in this function selects the systemd
/// launcher. It presumably relies on configuration or environment set
/// elsewhere — confirm.
#[tokio::test]
#[cfg(all(fbcode_build, target_os = "linux"))]
async fn bootstrap_canonical_simple_systemd_launcher() {
    let instance = testing::instance();

    // `config` (the global config lock) and `_flush_guard` are named bindings,
    // so both live until the end of the test. The FORWARDER_FLUSH_TIMEOUT
    // override (600s) is presumably reverted when the guard drops.
    let config = hyperactor_config::global::lock();
    let _flush_guard = config.override_key(
        hyperactor::config::FORWARDER_FLUSH_TIMEOUT,
        std::time::Duration::from_secs(600),
    );

    // Allocate a single-replica host mesh from the bootstrap test binary.
    let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
        "monarch/hyperactor_mesh/bootstrap",
    )));
    let alloc = allocator
        .allocate(AllocSpec {
            extent: extent!(replicas = 1),
            constraints: Default::default(),
            proc_name: None,
            transport: ChannelTransport::Unix,
            proc_allocation_mode: Default::default(),
        })
        .await
        .unwrap();

    let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
        .await
        .unwrap();

    let proc_mesh = host_mesh
        .spawn(&instance, "p0", Extent::unity(), None)
        .await
        .unwrap();

    let actor_mesh: ActorMesh<testactor::TestActor> =
        proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

    // The single TestActor must answer GetActorId with its own id.
    let (port, mut rx) = instance.mailbox().open_port();
    actor_mesh
        .cast(&instance, testactor::GetActorId(port.bind()))
        .unwrap();
    let (got_id, _) = rx.recv().await.unwrap();
    assert_eq!(
        got_id,
        actor_mesh.values().next().unwrap().actor_id().clone()
    );

    use crate::proc_launcher::SystemdProcLauncher;
    use crate::systemd::SystemdManagerProxy;
    use crate::systemd::SystemdUnitProxy;

    // Derive the unit name for the (single) proc before shutting the host down.
    let proc_id = proc_mesh.proc_ids().next().expect("one proc");
    let expected_unit = SystemdProcLauncher::unit_name(&proc_id);

    host_mesh.shutdown(&instance).await.expect("host shutdown");

    let conn = zbus::Connection::session().await.expect("D-Bus session");
    let manager = SystemdManagerProxy::new(&conn)
        .await
        .expect("manager proxy");

    // Poll up to 100 times, sleeping 100ms between attempts (~10s total).
    // The unit counts as settled when the lookup fails, or when it is not
    // (active, running) and not "activating".
    let mut ok = false;
    for _ in 0..100 {
        match manager.get_unit(&expected_unit).await {
            // Any lookup error is treated as "unit is gone".
            Err(_) => {
                ok = true;
                break;
            }
            Ok(path) => {
                // If the unit proxy cannot be built, fall through and retry
                // after the sleep.
                if let Ok(unit) = SystemdUnitProxy::builder(&conn)
                    .path(path)
                    .unwrap()
                    .build()
                    .await
                {
                    // NOTE(review): a failed property read yields the default
                    // value (presumably an empty string). That value satisfies
                    // the settled check below.
                    let active = unit.active_state().await.unwrap_or_default();
                    let sub = unit.sub_state().await.unwrap_or_default();
                    if !(active == "active" && sub == "running") && active != "activating" {
                        ok = true;
                        break;
                    }
                }
            }
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert!(
        ok,
        "unit should be gone or quiescent (not running) after shutdown"
    );
}
3630
/// `host()` starts a host using the test bootstrap command. Its agent returns
/// a local proc on which a new client instance can be created.
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_host_bootstrap() {
    use crate::host_mesh::host_agent::GetLocalProcClient;
    use crate::proc_agent::NewClientInstanceClient;

    // Scratch proc/instance used as the messaging context for the calls below.
    let scratch_proc = Proc::local();
    let (cx, _) = scratch_proc.instance("temp").unwrap();

    let listen_addr = ChannelAddr::any(ChannelTransport::Unix);
    let host_handle = host(listen_addr, Some(BootstrapCommand::test()), None, false)
        .await
        .unwrap();

    let local_proc = host_handle.0.get_local_proc(&cx).await.unwrap();
    let _client = local_proc.new_client_instance(&cx).await.unwrap();
}
3657
3658 use std::time::Duration;
3664
3665 use crate::proc_launcher::LaunchOptions;
3666 use crate::proc_launcher::LaunchResult;
3667 use crate::proc_launcher::ProcExitKind;
3668 use crate::proc_launcher::ProcExitResult;
3669 use crate::proc_launcher::ProcLauncher;
3670 use crate::proc_launcher::ProcLauncherError;
3671 use crate::proc_launcher::StdioHandling;
3672
3673 #[allow(dead_code)]
3676 struct DummyLauncher {
3677 marker: u64,
3679 }
3680
3681 impl DummyLauncher {
3682 fn new(marker: u64) -> Self {
3683 Self { marker }
3684 }
3685
3686 #[allow(dead_code)]
3687 fn marker(&self) -> u64 {
3688 self.marker
3689 }
3690 }
3691
3692 #[async_trait::async_trait]
3693 impl ProcLauncher for DummyLauncher {
3694 async fn launch(
3695 &self,
3696 _proc_id: &hyperactor_reference::ProcId,
3697 _opts: LaunchOptions,
3698 ) -> Result<LaunchResult, ProcLauncherError> {
3699 let (tx, rx) = tokio::sync::oneshot::channel();
3700 let _ = tx.send(ProcExitResult {
3702 kind: ProcExitKind::Exited { code: 0 },
3703 stderr_tail: Some(vec![]),
3704 });
3705 Ok(LaunchResult {
3706 pid: None,
3707 started_at: std::time::SystemTime::now(),
3708 stdio: StdioHandling::ManagedByLauncher,
3709 exit_rx: rx,
3710 })
3711 }
3712
3713 async fn terminate(
3714 &self,
3715 _proc_id: &hyperactor_reference::ProcId,
3716 _timeout: Duration,
3717 ) -> Result<(), ProcLauncherError> {
3718 Ok(())
3719 }
3720
3721 async fn kill(
3722 &self,
3723 _proc_id: &hyperactor_reference::ProcId,
3724 ) -> Result<(), ProcLauncherError> {
3725 Ok(())
3726 }
3727 }
3728
/// `launcher()` returns exactly the launcher object installed by
/// `set_launcher`.
#[test]
#[cfg(fbcode_build)]
fn test_set_launcher_then_get() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(42));
    // Compare data addresses only. `*const dyn ProcLauncher` is a wide pointer,
    // and `==` on it also compares vtable pointers. Vtables are not guaranteed
    // to be unique, so that comparison could spuriously report two references
    // to the same object as different.
    let custom_addr = Arc::as_ptr(&custom).cast::<()>();

    manager.set_launcher(custom).unwrap();

    let got_addr = Arc::as_ptr(manager.launcher()).cast::<()>();

    assert_eq!(
        custom_addr, got_addr,
        "launcher() should return the same Arc that was set"
    );
}
3750
/// After `launcher()` has been read, a later `set_launcher` is rejected with an
/// error mentioning "already initialized".
#[test]
#[cfg(fbcode_build)]
fn test_get_launcher_then_set_fails() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    // Reading the launcher first is expected to initialize the launcher slot.
    let _ = manager.launcher();

    let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(99));
    let err = manager
        .set_launcher(custom)
        .expect_err("set_launcher should fail after launcher() was called");

    let err_msg = err.to_string();
    assert!(
        err_msg.contains("already initialized"),
        "error should mention 'already initialized', got: {err_msg}"
    );
}
3779
/// Only the first `set_launcher` call succeeds. A second is rejected with an
/// error mentioning "already initialized".
#[test]
#[cfg(fbcode_build)]
fn test_set_launcher_twice_fails() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let first: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(1));
    let second: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(2));

    manager.set_launcher(first).unwrap();

    let err = manager
        .set_launcher(second)
        .expect_err("second set_launcher should fail");

    let err_msg = err.to_string();
    assert!(
        err_msg.contains("already initialized"),
        "error should mention 'already initialized', got: {err_msg}"
    );
}
3805
/// On a freshly constructed manager, the first `set_launcher` call is
/// accepted.
#[test]
#[cfg(fbcode_build)]
fn test_launcher_initially_empty() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
    let launcher: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(123));

    let accepted = manager.set_launcher(launcher).is_ok();
    assert!(accepted, "set_launcher should succeed on fresh manager");
}
3823
/// Repeated `launcher()` calls hand back the same shared launcher instance.
#[test]
#[cfg(fbcode_build)]
fn test_launcher_idempotent() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let (a, b) = (manager.launcher(), manager.launcher());
    assert!(
        Arc::ptr_eq(a, b),
        "launcher() should return the same Arc on repeated calls"
    );
}
3840}