1use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::env::VarError;
18use std::fmt;
19use std::fs::OpenOptions;
20use std::future;
21use std::io;
22use std::io::Write;
23use std::path::Path;
24use std::path::PathBuf;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::sync::Mutex;
28use std::sync::OnceLock;
29use std::sync::Weak;
30use std::time::Duration;
31use std::time::SystemTime;
32
33use anyhow::Context;
34use async_trait::async_trait;
35use base64::prelude::*;
36use futures::StreamExt;
37use futures::stream;
38use humantime::format_duration;
39use hyperactor::ActorAddr;
40use hyperactor::ActorHandle;
41use hyperactor::ActorRef;
42use hyperactor::Endpoint as _;
43use hyperactor::ProcAddr;
44use hyperactor::channel;
45use hyperactor::channel::ChannelAddr;
46use hyperactor::channel::ChannelError;
47use hyperactor::channel::ChannelTransport;
48use hyperactor::channel::Rx;
49use hyperactor::channel::Tx;
50use hyperactor::context;
51use hyperactor::mailbox::IntoBoxedMailboxSender;
52use hyperactor::mailbox::MailboxClient;
53use hyperactor::mailbox::MailboxServer;
54use hyperactor::mailbox::MailboxServerHandle;
55use hyperactor::proc::Proc;
56use hyperactor_config::CONFIG;
57use hyperactor_config::ConfigAttr;
58use hyperactor_config::attrs::Attrs;
59use hyperactor_config::attrs::declare_attrs;
60use hyperactor_config::global::override_or_global;
61use serde::Deserialize;
62use serde::Serialize;
63use tempfile::TempDir;
64use tokio::process::Command;
65use tokio::sync::watch;
66use tracing::Instrument;
67use tracing::Level;
68use typeuri::Named;
69
70use crate::config::MESH_PROC_LAUNCHER_KIND;
71use crate::host::BulkTerminate;
72use crate::host::Host;
73use crate::host::HostError;
74use crate::host::ProcHandle;
75use crate::host::ProcManager;
76use crate::host::ReadyError as HostReadyError;
77use crate::host::SingleTerminate;
78use crate::host::TerminateError;
79use crate::host::TerminateSummary;
80use crate::host::WaitError;
81use crate::host_mesh::host_agent::HostAgent;
82use crate::host_mesh::host_agent::HostAgentMode;
83use crate::logging::OutputTarget;
84use crate::logging::StreamFwder;
85use crate::proc_agent::ProcAgent;
86use crate::proc_launcher::LaunchOptions;
87use crate::proc_launcher::NativeProcLauncher;
88use crate::proc_launcher::ProcExitKind;
89use crate::proc_launcher::ProcExitResult;
90use crate::proc_launcher::ProcLauncher;
91use crate::proc_launcher::ProcLauncherError;
92use crate::proc_launcher::StdioHandling;
93#[cfg(target_os = "linux")]
94use crate::proc_launcher::SystemdProcLauncher;
95use crate::proc_launcher::format_process_name;
96use crate::resource;
97
98mod mailbox;
99
declare_attrs! {
    // Whether log forwarding is enabled for bootstrapped procs.
    // Config names: `HYPERACTOR_MESH_ENABLE_LOG_FORWARDING` / `enable_log_forwarding`.
    // Default: `false`.
    // NOTE(review): consumers are not visible in this section of the file.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
        Some("enable_log_forwarding".to_string()),
    ))
    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

    // When `true`, `BootstrapProcManager::new` tries to create a
    // `FileAppender` for captured output (failure is logged, not fatal).
    // Config names: `HYPERACTOR_MESH_ENABLE_FILE_CAPTURE` / `enable_file_capture`.
    // Default: `false`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
        Some("enable_file_capture".to_string()),
    ))
    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

    // Number of trailing log lines to keep (presumably used when reporting
    // a proc's output tail — confirm against the logging module).
    // Config names: `HYPERACTOR_MESH_TAIL_LOG_LINES` / `tail_log_lines`.
    // Default: `0`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
        Some("tail_log_lines".to_string()),
    ))
    pub attr MESH_TAIL_LOG_LINES: usize = 0;

    // When `true`, `Bootstrap::Proc` calls `install_pdeathsig_kill` so the
    // child receives SIGKILL if its parent dies (Linux only); when `false`
    // a notice is printed to stderr instead.
    // Config names: `HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG` /
    // `mesh_bootstrap_enable_pdeathsig`. Default: `true`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
        Some("mesh_bootstrap_enable_pdeathsig".to_string()),
    ))
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    // Concurrency limit for terminating procs (presumably for bulk
    // termination — usage is not visible in this section).
    // Config names: `HYPERACTOR_MESH_TERMINATE_CONCURRENCY` /
    // `mesh_terminate_concurrency`. Default: `16`.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
        Some("mesh_terminate_concurrency".to_string()),
    ))
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    // Timeout used when terminating procs (usage is not visible in this
    // section).
    // Config names: `HYPERACTOR_MESH_TERMINATE_TIMEOUT` / `mesh_terminate_timeout`.
    // Default: 10 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
        Some("mesh_terminate_timeout".to_string()),
    ))
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
187
/// Name of the `MONARCH_CLIENT_TRACE_ID` environment variable.
/// NOTE(review): presumably carries the client's trace id to children;
/// readers/writers are not visible in this section — confirm.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";

/// Name of the `BOOTSTRAP_LOG_CHANNEL` environment variable.
/// NOTE(review): usage is not visible here; presumably it carries a log
/// channel address to bootstrapped children — confirm.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";

/// Environment variable that holds the Base64/JSON-encoded `Bootstrap`
/// mode (the same variable `Bootstrap::get_from_env` reads).
pub(crate) const BOOTSTRAP_MODE_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_MODE";
/// Environment variable for a process name.
/// NOTE(review): usage is not visible here — confirm who sets/reads it.
pub(crate) const PROCESS_NAME_ENV: &str = "HYPERACTOR_PROCESS_NAME";
197
/// Evaluate a `Result`, yielding its `Ok` value or `return`ing its error
/// converted into an `anyhow::Error`.
///
/// Unlike `?`, the early return produces a bare `anyhow::Error` rather than
/// `Err(..)`, so this macro is meant for functions whose return type is
/// `anyhow::Error` itself.
#[macro_export]
macro_rules! ok {
    ($expr:expr $(,)?) => {
        match $expr {
            ::std::result::Result::Err(err) => return ::anyhow::Error::from(err),
            ::std::result::Result::Ok(value) => value,
        }
    };
}
207
/// A future that never completes.
///
/// Because it never resolves, it can stand in for a value of any type `R`,
/// e.g. as the tail of an `async fn` that should park forever.
pub async fn halt<R>() -> R {
    future::pending::<R>().await
}
212
/// Handle returned by `host` for waiting on the host to shut down.
///
/// Use `HostShutdownHandle::join` to wait for the host agent to hand back
/// its mailbox server, stop it, and optionally exit the process.
pub struct HostShutdownHandle {
    // Receives the host's `MailboxServerHandle`; the sending half is given
    // to the `HostAgent` (presumably sent when the agent shuts the host
    // down — confirm in `host_agent`).
    rx: tokio::sync::oneshot::Receiver<MailboxServerHandle>,
    // When `true`, `join` calls `std::process::exit(0)` once the mailbox
    // server has been stopped (or the sender was dropped).
    exit_on_shutdown: bool,
}
226
227impl HostShutdownHandle {
228 pub async fn join(self) {
231 match self.rx.await {
232 Ok(mailbox_handle) => {
233 mailbox_handle.stop("host shutting down");
234 let _ = mailbox_handle.await;
235 }
236 Err(_) => {} }
238 if self.exit_on_shutdown {
239 std::process::exit(0);
240 }
241 }
242}
243
244pub async fn host(
256 addr: ChannelAddr,
257 command: Option<BootstrapCommand>,
258 config: Option<Attrs>,
259 exit_on_shutdown: bool,
260 listener: Option<std::net::TcpListener>,
261) -> anyhow::Result<(ActorHandle<HostAgent>, HostShutdownHandle)> {
262 if let Some(attrs) = config {
263 hyperactor_config::global::set(hyperactor_config::global::Source::Runtime, attrs);
264 tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
265 } else {
266 tracing::debug!("bootstrap: no config snapshot provided (Host)");
267 }
268
269 let command = match command {
270 Some(command) => command,
271 None => BootstrapCommand::current()?,
272 };
273 let manager = BootstrapProcManager::new(command)?;
274
275 let host = Host::new_with_default(manager, addr, None, listener).await?;
276 let addr = host.addr().clone();
277
278 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<MailboxServerHandle>();
283
284 let system_proc = host.system_proc().clone();
285 let host_mesh_agent = system_proc.spawn::<HostAgent>(
286 "host_agent",
287 HostAgent::new(HostAgentMode::Process {
288 host,
289 shutdown_tx: Some(shutdown_tx),
290 }),
291 )?;
292
293 tracing::info!(
294 "serving host at {}, agent: {}",
295 addr,
296 host_mesh_agent.bind::<HostAgent>()
297 );
298
299 Ok((
300 host_mesh_agent,
301 HostShutdownHandle {
302 rx: shutdown_rx,
303 exit_on_shutdown,
304 },
305 ))
306}
307
/// How a mesh process should bootstrap itself.
///
/// The parent encodes this (JSON, then Base64) into the
/// `HYPERACTOR_MESH_BOOTSTRAP_MODE` environment variable with
/// `Bootstrap::to_env`; the child decodes it with `Bootstrap::get_from_env`
/// and runs `Bootstrap::bootstrap`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Bootstrap {
    /// Run a single proc that reports back to the process that launched it.
    Proc {
        /// Address/identity of the proc being bootstrapped.
        proc_id: ProcAddr,
        /// Address dialed by the proc's outbound `MailboxClient`.
        backend_addr: ChannelAddr,
        /// Address to which the proc sends its serving address and
        /// `ProcAgent` reference once it is up.
        callback_addr: ChannelAddr,
        /// Directory used for the proc's local socket address and for
        /// dialing sibling procs locally.
        socket_dir_path: PathBuf,
        /// Optional config snapshot, installed as the `ClientOverride`
        /// layer before the proc starts.
        config: Option<Attrs>,
    },

    /// Run a host that launches and manages procs.
    Host {
        /// Address the host serves on.
        addr: ChannelAddr,
        /// Command used to launch procs; `None` means the current
        /// executable (`BootstrapCommand::current`).
        command: Option<BootstrapCommand>,
        /// Optional config snapshot, installed as the `Runtime` layer.
        config: Option<Attrs>,
        /// Whether the process exits once the host has shut down.
        exit_on_shutdown: bool,
    },
}
355
356impl Bootstrap {
357 #[allow(clippy::result_large_err)]
360 pub(crate) fn to_env_safe_string(&self) -> crate::Result<String> {
361 Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
362 }
363
364 #[allow(clippy::result_large_err)]
366 pub(crate) fn from_env_safe_string(str: &str) -> crate::Result<Self> {
367 let data = BASE64_STANDARD.decode(str)?;
368 let data = std::str::from_utf8(&data)?;
369 Ok(serde_json::from_str(data)?)
370 }
371
372 pub fn get_from_env() -> anyhow::Result<Option<Self>> {
375 match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
376 Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
377 Ok(mode) => Ok(Some(mode)),
378 Err(e) => {
379 Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
380 }
381 },
382 Err(VarError::NotPresent) => Ok(None),
383 Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
384 }
385 }
386
387 pub fn to_env(&self, cmd: &mut Command) {
389 cmd.env(
390 "HYPERACTOR_MESH_BOOTSTRAP_MODE",
391 self.to_env_safe_string().unwrap(),
392 );
393 }
394
395 pub async fn bootstrap(self) -> anyhow::Result<i32> {
399 tracing::info!(
400 "bootstrapping mesh process: {}",
401 serde_json::to_string(&self).unwrap()
402 );
403
404 if Debug::is_active() {
405 let mut buf = Vec::new();
406 writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
407 #[cfg(unix)]
408 writeln!(
409 &mut buf,
410 "\tparent pid: {}",
411 std::os::unix::process::parent_id()
412 )
413 .unwrap();
414 writeln!(
415 &mut buf,
416 "\tconfig: {}",
417 serde_json::to_string(&self).unwrap()
418 )
419 .unwrap();
420 match std::env::current_exe() {
421 Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
422 Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
423 }
424 writeln!(&mut buf, "\targs:").unwrap();
425 for arg in std::env::args() {
426 writeln!(&mut buf, "\t\t{}", arg).unwrap();
427 }
428 writeln!(&mut buf, "\tenv:").unwrap();
429 for (key, val) in std::env::vars() {
430 writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
431 }
432 let _ = Debug.write(&buf);
433 if let Ok(s) = std::str::from_utf8(&buf) {
434 tracing::info!("{}", s);
435 } else {
436 tracing::info!("{:?}", buf);
437 }
438 }
439
440 match self {
441 Bootstrap::Proc {
442 proc_id,
443 backend_addr,
444 callback_addr,
445 socket_dir_path,
446 config,
447 } => {
448 let entered = tracing::span!(
449 Level::INFO,
450 "proc_bootstrap",
451 %proc_id,
452 %backend_addr,
453 %callback_addr,
454 socket_dir_path = %socket_dir_path.display(),
455 )
456 .entered();
457 if let Some(attrs) = config {
458 hyperactor_config::global::set(
459 hyperactor_config::global::Source::ClientOverride,
460 attrs,
461 );
462 tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
463 } else {
464 tracing::debug!("bootstrap: no config snapshot provided (Proc)");
465 }
466
467 if hyperactor_config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
468 let _ = install_pdeathsig_kill();
473 } else {
474 eprintln!("(bootstrap) PDEATHSIG disabled via config");
475 }
476
477 let local_addr = proc_id.addr().clone();
478 let (serve_addr, _) = local_proc_addr(&socket_dir_path, proc_id.id())?;
479
480 let proc_sender = mailbox::LocalProcDialer::new(
485 local_addr.clone(),
486 socket_dir_path,
487 MailboxClient::dial(backend_addr)?,
488 );
489
490 let proc = Proc::configured(proc_id.clone(), proc_sender.into_boxed());
491
492 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<i32>();
493 let agent_handle = ProcAgent::boot_v1(proc.clone(), Some(shutdown_tx))
494 .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;
495
496 let span = entered.exit();
497
498 let (proc_addr, proc_rx) = channel::serve(serve_addr)?;
501 let mailbox_handle = proc.clone().serve(proc_rx);
502 channel::dial(callback_addr)?
503 .send((proc_addr, agent_handle.bind::<ProcAgent>()))
504 .instrument(span)
505 .await
506 .map_err(ChannelError::from)?;
507
508 let exit_code = shutdown_rx.await.unwrap_or(1);
511 mailbox_handle.stop("process shutting down");
512 let _ = mailbox_handle.await;
513 tracing::info!("bootstrap shutting down with exit code {}", exit_code);
514 Ok(exit_code)
517 }
518 Bootstrap::Host {
519 addr,
520 command,
521 config,
522 exit_on_shutdown,
523 } => {
524 let (_agent_handle, shutdown) =
525 host(addr, command, config, exit_on_shutdown, None).await?;
526 shutdown.join().await;
527 halt().await
528 }
529 }
530 }
531
532 pub async fn bootstrap_or_die(self) -> ! {
535 let exit_code = match self.bootstrap().await {
536 Ok(exit_code) => exit_code,
537 Err(err) => {
538 tracing::error!("failed to bootstrap mesh process: {}", err);
539 1
540 }
541 };
542 std::process::exit(exit_code);
543 }
544}
545
/// Arrange for this process to receive `SIGKILL` when its parent dies.
///
/// On Linux this uses `prctl(PR_SET_PDEATHSIG, SIGKILL)`; on other platforms
/// it does nothing and returns `Ok(())`.
///
/// If the parent pid changes across the `prctl` call (i.e. the process was
/// reparented before the signal was armed), the process exits immediately
/// with status 0.
///
/// # Errors
/// Returns the OS error if `prctl` fails.
pub fn install_pdeathsig_kill() -> io::Result<()> {
    #[cfg(target_os = "linux")]
    {
        // SAFETY: `getppid` takes no arguments and does not touch caller memory.
        let ppid_before = unsafe { libc::getppid() };

        // SAFETY: `prctl(PR_SET_PDEATHSIG, sig)` takes only integer arguments;
        // no pointers are passed.
        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
        if rc != 0 {
            return Err(io::Error::last_os_error());
        }

        // SAFETY: `getppid` takes no arguments and does not touch caller memory.
        let ppid_after = unsafe { libc::getppid() };
        // The death signal only fires for a parent that dies after it is
        // armed. If we were reparented in between, the original parent is
        // already gone and the signal will never arrive, so exit now.
        if ppid_before != ppid_after {
            std::process::exit(0);
        }
    }
    Ok(())
}
577
/// Lifecycle status of a bootstrapped proc, as tracked by
/// `BootstrapProcHandle`.
///
/// `Starting`, `Running`, `Ready`, and `Stopping` are live states;
/// `Stopped`, `Killed`, and `Failed` are terminal (see `ProcStatus::is_exit`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProcStatus {
    /// Initial state of a newly created handle.
    Starting,
    /// Set by `mark_running`; `started_at` is the time supplied by the caller.
    Running { started_at: SystemTime },
    /// The proc is serving and its agent is reachable.
    Ready {
        /// Carried over from `Running`, or the transition time when the proc
        /// went straight from `Starting` to `Ready`.
        started_at: SystemTime,
        /// Address the proc is serving on.
        addr: ChannelAddr,
        /// Reference to the proc's `ProcAgent`.
        agent: ActorRef<ProcAgent>,
    },
    /// A stop was requested; `started_at` is preserved from the previous
    /// state, or is the request time if the proc was still `Starting`.
    Stopping { started_at: SystemTime },
    /// The process exited with a status code.
    Stopped {
        /// Process exit code.
        exit_code: i32,
        /// Trailing stderr lines captured at exit (may be empty).
        stderr_tail: Vec<String>,
    },
    /// The process was terminated by `signal`; `core_dumped` reports whether
    /// a core dump was produced.
    Killed { signal: i32, core_dumped: bool },
    /// The proc failed for `reason` (e.g. the exit channel was dropped or
    /// the launcher reported a failure).
    Failed { reason: String },
}
625
626impl ProcStatus {
627 #[inline]
631 pub fn is_exit(&self) -> bool {
632 matches!(
633 self,
634 ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
635 )
636 }
637}
638
639impl std::fmt::Display for ProcStatus {
640 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641 match self {
642 ProcStatus::Starting => write!(f, "Starting"),
643 ProcStatus::Running { started_at } => {
644 let uptime = started_at
645 .elapsed()
646 .map(|d| format!(" up {}", format_duration(d)))
647 .unwrap_or_default();
648 write!(f, "Running{uptime}")
649 }
650 ProcStatus::Ready {
651 started_at, addr, ..
652 } => {
653 let uptime = started_at
654 .elapsed()
655 .map(|d| format!(" up {}", format_duration(d)))
656 .unwrap_or_default();
657 write!(f, "Ready at {addr}{uptime}")
658 }
659 ProcStatus::Stopping { started_at } => {
660 let uptime = started_at
661 .elapsed()
662 .map(|d| format!(" up {}", format_duration(d)))
663 .unwrap_or_default();
664 write!(f, "Stopping{uptime}")
665 }
666 ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
667 ProcStatus::Killed {
668 signal,
669 core_dumped,
670 } => {
671 if *core_dumped {
672 write!(f, "Killed(sig={signal}, core)")
673 } else {
674 write!(f, "Killed(sig={signal})")
675 }
676 }
677 ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
678 }
679 }
680}
681
/// Why waiting for a proc to become `Ready` failed.
#[derive(Debug, Clone)]
pub enum ReadyError {
    /// The proc reached a terminal status (`ProcStatus::is_exit`) before
    /// `Ready`; carries that status.
    Terminal(ProcStatus),
    /// The status watch channel closed before `Ready` was observed.
    ChannelClosed,
}
690
691impl std::fmt::Display for ReadyError {
692 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
693 match self {
694 ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
695 ReadyError::ChannelClosed => write!(f, "status channel closed"),
696 }
697 }
698}
699impl std::error::Error for ReadyError {}
700
/// Shared, cloneable handle to a proc launched by `BootstrapProcManager`.
///
/// Status lives in two places that `transition` updates together: a mutex
/// for synchronous reads, and a `watch` channel for async waiters.
#[derive(Clone)]
pub struct BootstrapProcHandle {
    /// Address of the proc this handle tracks.
    proc_id: ProcAddr,

    /// Current status; the copy read synchronously by `status()`.
    status: Arc<std::sync::Mutex<ProcStatus>>,

    /// Launcher that started the proc, held weakly so the handle does not
    /// keep it alive; used for terminate/kill escalation.
    launcher: Weak<dyn ProcLauncher>,

    /// Forwarder for the child's stdout, if one was installed.
    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Forwarder for the child's stderr, if one was installed; the lines it
    /// returns on abort become the `stderr_tail` recorded at exit.
    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,

    /// Publishes every accepted status transition.
    tx: tokio::sync::watch::Sender<ProcStatus>,

    /// Receiver cloned by `watch()`; holding it also keeps the channel open
    /// for as long as any clone of the handle lives.
    rx: tokio::sync::watch::Receiver<ProcStatus>,
}
782
783impl fmt::Debug for BootstrapProcHandle {
784 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
785 let status = self.status.lock().expect("status mutex poisoned").clone();
786 f.debug_struct("BootstrapProcHandle")
787 .field("proc_id", &self.proc_id)
788 .field("status", &status)
789 .field("launcher", &"<dyn ProcLauncher>")
790 .field("tx", &"<watch::Sender>")
791 .field("rx", &"<watch::Receiver>")
792 .finish()
795 }
796}
797
798impl BootstrapProcHandle {
800 pub(crate) fn new(proc_id: ProcAddr, launcher: Weak<dyn ProcLauncher>) -> Self {
811 let (tx, rx) = watch::channel(ProcStatus::Starting);
812 Self {
813 proc_id,
814 status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
815 launcher,
816 stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
817 stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
818 tx,
819 rx,
820 }
821 }
822
823 #[inline]
825 pub fn proc_addr(&self) -> &ProcAddr {
826 &self.proc_id
827 }
828
829 #[inline]
842 pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
843 self.rx.clone()
844 }
845
846 #[inline]
864 pub async fn changed(&self) {
865 let _ = self.watch().changed().await;
866 }
867
868 #[must_use]
881 pub fn status(&self) -> ProcStatus {
882 self.status.lock().expect("status mutex poisoned").clone()
886 }
887
888 fn transition<F>(&self, f: F) -> bool
894 where
895 F: FnOnce(&mut ProcStatus) -> bool,
896 {
897 let mut guard = self.status.lock().expect("status mutex poisoned");
898 let _before = guard.clone();
899 let changed = f(&mut guard);
900 if changed {
901 let _ = self.tx.send(guard.clone());
903 }
904 changed
905 }
906
907 pub(crate) fn mark_running(&self, started_at: SystemTime) -> bool {
917 self.transition(|st| match *st {
918 ProcStatus::Starting => {
919 *st = ProcStatus::Running { started_at };
920 true
921 }
922 _ => {
923 tracing::warn!(
924 "illegal transition: {:?} -> Running; leaving status unchanged",
925 *st
926 );
927 false
928 }
929 })
930 }
931
932 pub(crate) fn mark_ready(&self, addr: ChannelAddr, agent: ActorRef<ProcAgent>) -> bool {
944 tracing::info!(proc_id = %self.proc_id, %addr, "{} ready at {}", self.proc_id, addr);
945 self.transition(|st| match st {
946 ProcStatus::Starting => {
947 *st = ProcStatus::Ready {
950 started_at: std::time::SystemTime::now(),
951 addr,
952 agent,
953 };
954 true
955 }
956 ProcStatus::Running { started_at } => {
957 let started_at = *started_at;
958 *st = ProcStatus::Ready {
959 started_at,
960 addr,
961 agent,
962 };
963 true
964 }
965 _ => {
966 tracing::warn!(
967 "illegal transition: {:?} -> Ready; leaving status unchanged",
968 st
969 );
970 false
971 }
972 })
973 }
974
975 pub(crate) fn mark_stopping(&self) -> bool {
979 let now = std::time::SystemTime::now();
980
981 self.transition(|st| match *st {
982 ProcStatus::Running { started_at } => {
983 *st = ProcStatus::Stopping { started_at };
984 true
985 }
986 ProcStatus::Ready { started_at, .. } => {
987 *st = ProcStatus::Stopping { started_at };
988 true
989 }
990 ProcStatus::Starting => {
991 *st = ProcStatus::Stopping { started_at: now };
992 true
993 }
994 _ => false,
995 })
996 }
997
998 pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
1001 self.transition(|st| match *st {
1002 ProcStatus::Starting
1003 | ProcStatus::Running { .. }
1004 | ProcStatus::Ready { .. }
1005 | ProcStatus::Stopping { .. } => {
1006 *st = ProcStatus::Stopped {
1007 exit_code,
1008 stderr_tail,
1009 };
1010 true
1011 }
1012 _ => {
1013 tracing::warn!(
1014 "illegal transition: {:?} -> Stopped; leaving status unchanged",
1015 *st
1016 );
1017 false
1018 }
1019 })
1020 }
1021
1022 pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1025 self.transition(|st| match *st {
1026 ProcStatus::Starting
1027 | ProcStatus::Running { .. }
1028 | ProcStatus::Ready { .. }
1029 | ProcStatus::Stopping { .. } => {
1030 *st = ProcStatus::Killed {
1031 signal,
1032 core_dumped,
1033 };
1034 true
1035 }
1036 _ => {
1037 tracing::warn!(
1038 "illegal transition: {:?} -> Killed; leaving status unchanged",
1039 *st
1040 );
1041 false
1042 }
1043 })
1044 }
1045
1046 pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1049 self.transition(|st| match *st {
1050 ProcStatus::Starting
1051 | ProcStatus::Running { .. }
1052 | ProcStatus::Ready { .. }
1053 | ProcStatus::Stopping { .. } => {
1054 *st = ProcStatus::Failed {
1055 reason: reason.into(),
1056 };
1057 true
1058 }
1059 _ => {
1060 tracing::warn!(
1061 "illegal transition: {:?} -> Failed; leaving status unchanged",
1062 *st
1063 );
1064 false
1065 }
1066 })
1067 }
1068
1069 #[must_use]
1088 pub async fn wait_inner(&self) -> ProcStatus {
1089 let mut rx = self.watch();
1090 loop {
1091 let st = rx.borrow().clone();
1092 if st.is_exit() {
1093 return st;
1094 }
1095 if rx.changed().await.is_err() {
1097 return st;
1098 }
1099 }
1100 }
1101
1102 pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1121 let mut rx = self.watch();
1122 loop {
1123 let st = rx.borrow().clone();
1124 match &st {
1125 ProcStatus::Ready { .. } => return Ok(()),
1126 s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1127 _non_terminal => {
1128 if rx.changed().await.is_err() {
1129 return Err(ReadyError::ChannelClosed);
1130 }
1131 }
1132 }
1133 }
1134 }
1135
1136 pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1137 *self
1138 .stdout_fwder
1139 .lock()
1140 .expect("stdout_tailer mutex poisoned") = out;
1141 *self
1142 .stderr_fwder
1143 .lock()
1144 .expect("stderr_tailer mutex poisoned") = err;
1145 }
1146
1147 fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1148 let out = self
1149 .stdout_fwder
1150 .lock()
1151 .expect("stdout_tailer mutex poisoned")
1152 .take();
1153 let err = self
1154 .stderr_fwder
1155 .lock()
1156 .expect("stderr_tailer mutex poisoned")
1157 .take();
1158 (out, err)
1159 }
1160
1161 pub(crate) async fn wait_or_brutally_kill(&self, timeout: Duration) {
1168 match tokio::time::timeout(timeout, self.wait_inner()).await {
1169 Ok(st) if st.is_exit() => return,
1170 _ => {}
1171 }
1172
1173 let _ = self.mark_stopping();
1174
1175 if let Some(launcher) = self.launcher.upgrade() {
1176 let ref_proc_id: ProcAddr = self.proc_id.clone();
1177 if let Err(e) = launcher.terminate(&ref_proc_id, timeout).await {
1178 tracing::warn!(
1179 proc_id = %self.proc_id,
1180 error = %e,
1181 "wait_or_brutally_kill: launcher terminate failed, trying kill"
1182 );
1183 let _ = launcher.kill(&ref_proc_id).await;
1184 }
1185 }
1186
1187 let _ = self.wait_inner().await;
1188 }
1189
1190 async fn send_stop_all(
1194 &self,
1195 cx: &impl context::Actor,
1196 agent: ActorRef<ProcAgent>,
1197 timeout: Duration,
1198 reason: &str,
1199 ) -> anyhow::Result<ProcStatus> {
1200 let mut agent_port = agent.port();
1207 agent_port.return_undeliverable(false);
1208 agent_port.post(
1209 cx,
1210 resource::StopAll {
1211 reason: reason.to_string(),
1212 },
1213 );
1214 match tokio::time::timeout(timeout, self.wait()).await {
1217 Ok(Ok(st)) => Ok(st),
1218 Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1219 Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1220 }
1221 }
1222}
1223
1224#[async_trait]
1225impl ProcHandle for BootstrapProcHandle {
1226 type Agent = ProcAgent;
1227 type TerminalStatus = ProcStatus;
1228
1229 #[inline]
1230 fn proc_addr(&self) -> &ProcAddr {
1231 &self.proc_id
1232 }
1233
1234 #[inline]
1235 fn addr(&self) -> Option<ChannelAddr> {
1236 match &*self.status.lock().expect("status mutex poisoned") {
1237 ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1238 _ => None,
1239 }
1240 }
1241
1242 #[inline]
1243 fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1244 match &*self.status.lock().expect("status mutex poisoned") {
1245 ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1246 _ => None,
1247 }
1248 }
1249
1250 async fn ready(&self) -> Result<(), HostReadyError<Self::TerminalStatus>> {
1261 match self.ready_inner().await {
1262 Ok(()) => Ok(()),
1263 Err(ReadyError::Terminal(status)) => Err(HostReadyError::Terminal(status)),
1264 Err(ReadyError::ChannelClosed) => Err(HostReadyError::ChannelClosed),
1265 }
1266 }
1267
1268 async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1276 let status = self.wait_inner().await;
1277 if status.is_exit() {
1278 Ok(status)
1279 } else {
1280 Err(WaitError::ChannelClosed)
1281 }
1282 }
1283
1284 async fn terminate(
1305 &self,
1306 cx: &impl context::Actor,
1307 timeout: Duration,
1308 reason: &str,
1309 ) -> Result<ProcStatus, TerminateError<Self::TerminalStatus>> {
1310 let st0 = self.status();
1312 if st0.is_exit() {
1313 tracing::debug!(?st0, "terminate(): already terminal");
1314 return Err(TerminateError::AlreadyTerminated(st0));
1315 }
1316
1317 let agent = self.agent_ref();
1320 if let Some(agent) = agent {
1321 match self.send_stop_all(cx, agent.clone(), timeout, reason).await {
1322 Ok(st) => return Ok(st),
1323 Err(e) => {
1324 tracing::warn!(
1326 "ProcAgent {} could not successfully stop all actors: {}",
1327 agent.actor_addr(),
1328 e,
1329 );
1330 }
1331 }
1332 }
1333
1334 let _ = self.mark_stopping();
1336
1337 tracing::info!(proc_id = %self.proc_id, ?timeout, "terminate(): delegating to launcher");
1339 let ref_proc_id: ProcAddr = self.proc_id.clone();
1340 if let Some(launcher) = self.launcher.upgrade() {
1341 if let Err(e) = launcher.terminate(&ref_proc_id, timeout).await {
1342 tracing::warn!(proc_id = %self.proc_id, error=%e, "terminate(): launcher termination failed");
1343 return Err(TerminateError::Io(anyhow::anyhow!(
1344 "launcher termination failed: {}",
1345 e
1346 )));
1347 }
1348 } else {
1349 tracing::debug!(proc_id = %self.proc_id, "terminate(): launcher gone, proc cleanup in progress");
1351 }
1352
1353 let st = self.wait_inner().await;
1355 if st.is_exit() {
1356 tracing::info!(proc_id = %self.proc_id, ?st, "terminate(): exited");
1357 Ok(st)
1358 } else {
1359 Err(TerminateError::ChannelClosed)
1360 }
1361 }
1362
1363 async fn kill(&self) -> Result<ProcStatus, TerminateError<Self::TerminalStatus>> {
1381 let st0 = self.status();
1383 if st0.is_exit() {
1384 return Err(TerminateError::AlreadyTerminated(st0));
1385 }
1386
1387 tracing::info!(proc_id = %self.proc_id, "kill(): delegating to launcher");
1389 let ref_proc_id: ProcAddr = self.proc_id.clone();
1390 if let Some(launcher) = self.launcher.upgrade() {
1391 if let Err(e) = launcher.kill(&ref_proc_id).await {
1392 tracing::warn!(proc_id = %self.proc_id, error=%e, "kill(): launcher kill failed");
1393 return Err(TerminateError::Io(anyhow::anyhow!(
1394 "launcher kill failed: {}",
1395 e
1396 )));
1397 }
1398 } else {
1399 tracing::debug!(proc_id = %self.proc_id, "kill(): launcher gone, proc cleanup in progress");
1401 }
1402
1403 let st = self.wait_inner().await;
1405 if st.is_exit() {
1406 Ok(st)
1407 } else {
1408 Err(TerminateError::ChannelClosed)
1409 }
1410 }
1411}
1412
/// Describes how to launch a process: the program, an optional `argv[0]`
/// override, arguments, and environment variables.
///
/// `PartialEq`/`Eq`/`Hash` are implemented manually in this module so that
/// hashing the `env` map does not depend on `HashMap` iteration order.
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default)]
pub struct BootstrapCommand {
    /// Executable to run (passed to `Command::new`).
    pub program: PathBuf,
    /// When set, passed to `Command::arg0` to override `argv[0]`.
    pub arg0: Option<String>,
    /// Arguments passed after `argv[0]`.
    pub args: Vec<String>,
    /// Environment variables set on the command (on top of whatever the
    /// spawned command otherwise inherits).
    pub env: HashMap<String, String>,
}
// Register with `wirevalue` (presumably so the type can travel in
// type-erased wire values — confirm in the wirevalue crate).
wirevalue::register_type!(BootstrapCommand);
1422
1423impl std::hash::Hash for BootstrapCommand {
1424 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1425 self.program.hash(state);
1426 self.arg0.hash(state);
1427 self.args.hash(state);
1428 let mut pairs: Vec<_> = self.env.iter().collect();
1429 pairs.sort();
1430 pairs.hash(state);
1431 }
1432}
1433
1434impl PartialEq for BootstrapCommand {
1435 fn eq(&self, other: &Self) -> bool {
1436 self.program == other.program
1437 && self.arg0 == other.arg0
1438 && self.args == other.args
1439 && self.env == other.env
1440 }
1441}
1442
1443impl Eq for BootstrapCommand {}
1444
1445impl BootstrapCommand {
1446 pub fn current() -> io::Result<Self> {
1449 let mut args: VecDeque<String> = std::env::args().collect();
1450 let arg0 = args.pop_front();
1451
1452 Ok(Self {
1453 program: std::env::current_exe()?,
1454 arg0,
1455 args: args.into(),
1456 env: std::env::vars().collect(),
1457 })
1458 }
1459
1460 pub fn new(&self) -> Command {
1463 let mut cmd = Command::new(&self.program);
1464 if let Some(arg0) = &self.arg0 {
1465 cmd.arg0(arg0);
1466 }
1467 for arg in &self.args {
1468 cmd.arg(arg);
1469 }
1470 for (k, v) in &self.env {
1471 cmd.env(k, v);
1472 }
1473 cmd
1474 }
1475
1476 #[cfg(test)]
1483 #[cfg(fbcode_build)]
1484 pub(crate) fn test() -> Self {
1485 Self {
1486 program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1487 arg0: None,
1488 args: vec![],
1489 env: HashMap::new(),
1490 }
1491 }
1492}
1493
1494impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1495 fn from(s: T) -> Self {
1497 Self {
1498 program: s.into(),
1499 arg0: None,
1500 args: vec![],
1501 env: HashMap::new(),
1502 }
1503 }
1504}
1505
/// Which `ProcLauncher` implementation `BootstrapProcManager` creates by
/// default; parsed from the `MESH_PROC_LAUNCHER_KIND` config value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LauncherKind {
    /// Use `NativeProcLauncher`.
    Native,
    /// Use `SystemdProcLauncher` (available on Linux only).
    #[cfg(target_os = "linux")]
    Systemd,
}

impl FromStr for LauncherKind {
    type Err = io::Error;

    /// Parse a launcher kind, ignoring surrounding whitespace and ASCII case.
    ///
    /// An empty string selects `Native`. `"systemd"` is accepted only on
    /// Linux.
    ///
    /// # Errors
    /// `io::ErrorKind::InvalidInput` for any other value.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.trim().to_ascii_lowercase();
        match normalized.as_str() {
            "" | "native" => return Ok(Self::Native),
            #[cfg(target_os = "linux")]
            "systemd" => return Ok(Self::Systemd),
            _ => {}
        }

        let expected = if cfg!(target_os = "linux") {
            "'native' or 'systemd'"
        } else {
            "'native'"
        };
        Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("unknown proc launcher kind {normalized:?}; expected {expected}"),
        ))
    }
}
1569
/// A `ProcManager` that launches procs as child processes described by a
/// `BootstrapCommand`, using a pluggable `ProcLauncher`.
pub struct BootstrapProcManager {
    /// Launcher used to spawn procs. Set explicitly via `set_launcher`, or
    /// initialized lazily from `MESH_PROC_LAUNCHER_KIND` on the first call
    /// to `launcher()`.
    launcher: OnceLock<Arc<dyn ProcLauncher>>,

    /// Command used to start child processes.
    command: BootstrapCommand,

    /// Handles for the procs this manager knows about, keyed by address.
    children: Arc<tokio::sync::Mutex<HashMap<ProcAddr, BootstrapProcHandle>>>,

    /// File appender for captured output; `Some` only when
    /// `MESH_ENABLE_FILE_CAPTURE` is set and creating it succeeded.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Directory for per-proc sockets, created by `runtime_dir()`
    /// (a `TempDir`, so presumably removed when the manager is dropped).
    socket_dir: TempDir,
}
1620
1621impl BootstrapProcManager {
1622 pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1629 let file_appender = if hyperactor_config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1630 match crate::logging::FileAppender::new() {
1631 Some(fm) => {
1632 tracing::info!("file appender created successfully");
1633 Some(Arc::new(fm))
1634 }
1635 None => {
1636 tracing::warn!("failed to create file appender");
1637 None
1638 }
1639 }
1640 } else {
1641 None
1642 };
1643
1644 Ok(Self {
1645 launcher: OnceLock::new(),
1646 command,
1647 children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1648 file_appender,
1649 socket_dir: runtime_dir()?,
1650 })
1651 }
1652
1653 pub fn set_launcher(&self, launcher: Arc<dyn ProcLauncher>) -> Result<(), ProcLauncherError> {
1662 self.launcher.set(launcher).map_err(|_| {
1663 ProcLauncherError::Other(
1664 "launcher already initialized; call set_proc_launcher before first spawn".into(),
1665 )
1666 })
1667 }
1668
1669 pub fn launcher(&self) -> &Arc<dyn ProcLauncher> {
1675 self.launcher.get_or_init(|| {
1676 let kind_str = hyperactor_config::global::get_cloned(MESH_PROC_LAUNCHER_KIND);
1677 let kind: LauncherKind = kind_str.parse().unwrap_or(LauncherKind::Native);
1678 tracing::info!(kind = ?kind, config_value = %kind_str, "using default proc launcher");
1679 match kind {
1680 LauncherKind::Native => Arc::new(NativeProcLauncher::new()),
1681 #[cfg(target_os = "linux")]
1682 LauncherKind::Systemd => Arc::new(SystemdProcLauncher::new()),
1683 }
1684 })
1685 }
1686
1687 pub fn command(&self) -> &BootstrapCommand {
1689 &self.command
1690 }
1691
1692 pub fn socket_dir(&self) -> &Path {
1694 self.socket_dir.path()
1695 }
1696
1697 pub async fn status(&self, proc_id: &ProcAddr) -> Option<ProcStatus> {
1708 self.children.lock().await.get(proc_id).map(|h| h.status())
1709 }
1710
1711 pub async fn watch(
1714 &self,
1715 proc_id: &ProcAddr,
1716 ) -> Option<tokio::sync::watch::Receiver<ProcStatus>> {
1717 self.children.lock().await.get(proc_id).map(|h| h.watch())
1718 }
1719
1720 pub(crate) async fn request_stop(
1727 &self,
1728 cx: &impl context::Actor,
1729 proc: &ProcAddr,
1730 timeout: Duration,
1731 reason: &str,
1732 ) {
1733 let handle = {
1734 let guard = self.children.lock().await;
1735 guard.get(proc).cloned()
1736 };
1737
1738 let Some(handle) = handle else { return };
1739
1740 let status = handle.status();
1741 if status.is_exit() || matches!(status, ProcStatus::Stopping { .. }) {
1742 return;
1743 }
1744
1745 if let Some(agent) = handle.agent_ref() {
1746 let mut agent_port = agent.port();
1747 agent_port.return_undeliverable(false);
1748 let _ = agent_port.post(
1749 cx,
1750 resource::StopAll {
1751 reason: reason.to_string(),
1752 },
1753 );
1754 }
1755
1756 let _ = handle.mark_stopping();
1757 tokio::spawn(async move {
1758 handle.wait_or_brutally_kill(timeout).await;
1759 });
1760 }
1761
1762 fn spawn_exit_monitor(
1763 &self,
1764 proc_id: ProcAddr,
1765 handle: BootstrapProcHandle,
1766 exit_rx: tokio::sync::oneshot::Receiver<ProcExitResult>,
1767 ) {
1768 tokio::spawn(async move {
1769 let exit_result = match exit_rx.await {
1771 Ok(res) => res,
1772 Err(_) => {
1773 let _ = handle.mark_failed("exit_rx sender dropped unexpectedly");
1775 tracing::error!(
1776 name = "ProcStatus",
1777 status = "Exited::ChannelDropped",
1778 %proc_id,
1779 "exit channel closed without result"
1780 );
1781 return;
1782 }
1783 };
1784
1785 let mut stderr_tail: Vec<String> = Vec::new();
1789 let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
1790
1791 if let Some(t) = stderr_mon {
1792 let (lines, _bytes) = t.abort().await;
1793 stderr_tail = lines;
1794 }
1795 if let Some(t) = stdout_mon {
1796 let (_lines, _bytes) = t.abort().await;
1797 }
1798
1799 if stderr_tail.is_empty()
1801 && let Some(tail) = exit_result.stderr_tail
1802 {
1803 stderr_tail = tail;
1804 }
1805
1806 let tail_str = if stderr_tail.is_empty() {
1807 None
1808 } else {
1809 Some(stderr_tail.join("\n"))
1810 };
1811
1812 match exit_result.kind {
1813 ProcExitKind::Exited { code } => {
1814 let _ = handle.mark_stopped(code, stderr_tail);
1815 tracing::info!(
1816 name = "ProcStatus",
1817 status = "Exited::ExitWithCode",
1818 %proc_id,
1819 exit_code = code,
1820 tail = tail_str,
1821 "proc exited with code {code}"
1822 );
1823 }
1824 ProcExitKind::Signaled {
1825 signal,
1826 core_dumped,
1827 } => {
1828 let _ = handle.mark_killed(signal, core_dumped);
1829 tracing::info!(
1830 name = "ProcStatus",
1831 status = "Exited::KilledBySignal",
1832 %proc_id,
1833 tail = tail_str,
1834 "killed by signal {signal}"
1835 );
1836 }
1837 ProcExitKind::Failed { reason } => {
1838 let _ = handle.mark_failed(&reason);
1839 tracing::info!(
1840 name = "ProcStatus",
1841 status = "Exited::Failed",
1842 %proc_id,
1843 tail = tail_str,
1844 "proc failed: {reason}"
1845 );
1846 }
1847 }
1848 });
1849 }
1850}
1851
// Re-exported for use in `BootstrapProcConfig::proc_bind`.
pub use crate::proc_launcher::ProcBind;

/// Per-spawn configuration accepted by `BootstrapProcManager::spawn`.
pub struct BootstrapProcConfig {
    /// Rank of the proc being created; handed to the stdout/stderr forwarders.
    pub create_rank: usize,

    /// Client-supplied config attributes. When resolving stdio settings (log
    /// forwarding, file capture, tail size) these take precedence over the global
    /// config, and the whole set is forwarded to the child in its bootstrap payload.
    pub client_config_override: Attrs,

    /// Optional proc binding, passed through unchanged to the launcher as
    /// `LaunchOptions::proc_bind`.
    pub proc_bind: Option<ProcBind>,
    /// Command to launch this proc with; when `None`, the manager's default
    /// command is used.
    pub bootstrap_command: Option<BootstrapCommand>,
}
1871
1872#[async_trait]
1873impl ProcManager for BootstrapProcManager {
1874 type Handle = BootstrapProcHandle;
1875
1876 type Config = BootstrapProcConfig;
1877
1878 fn transport(&self) -> ChannelTransport {
1885 ChannelTransport::Unix
1886 }
1887
1888 #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string()))]
1915 async fn spawn(
1916 &self,
1917 proc_id: ProcAddr,
1918 backend_addr: ChannelAddr,
1919 config: BootstrapProcConfig,
1920 ) -> Result<Self::Handle, HostError> {
1921 let (callback_addr, mut callback_rx) = channel::serve::<(ChannelAddr, ActorRef<ProcAgent>)>(
1922 ChannelAddr::any(ChannelTransport::Unix),
1923 )?;
1924
1925 let overrides = &config.client_config_override;
1927 let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
1928 let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
1929 let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
1930 let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1931
1932 let mode = Bootstrap::Proc {
1933 proc_id: proc_id.clone(),
1934 backend_addr,
1935 callback_addr,
1936 socket_dir_path: self.socket_dir.path().to_owned(),
1937 config: Some(config.client_config_override.clone()),
1938 };
1939
1940 let bootstrap_payload = mode
1942 .to_env_safe_string()
1943 .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?;
1944
1945 let opts = LaunchOptions {
1946 bootstrap_payload,
1947 process_name: format_process_name(&proc_id.clone()),
1948 command: config
1949 .bootstrap_command
1950 .as_ref()
1951 .unwrap_or(&self.command)
1952 .clone(),
1953 want_stdio: need_stdio,
1954 tail_lines: tail_size,
1955 log_channel: if enable_forwarding {
1956 Some(ChannelAddr::any(ChannelTransport::Unix))
1957 } else {
1958 None
1959 },
1960 proc_bind: config.proc_bind.clone(),
1961 };
1962
1963 tracing::info!(proc_id = %proc_id, "launching proc with opts={opts:?}");
1965 let ref_proc_id: ProcAddr = proc_id.clone();
1966 let launch_result = self
1967 .launcher()
1968 .launch(&ref_proc_id, opts.clone())
1969 .await
1970 .map_err(|e| {
1971 let io_err = match e {
1972 ProcLauncherError::Launch(io_err) => io_err,
1973 other => std::io::Error::other(other.to_string()),
1974 };
1975 HostError::ProcessSpawnFailure(
1976 proc_id.clone(),
1977 format!("{:?}", opts.command),
1978 io_err,
1979 )
1980 })?;
1981
1982 let (out_fwder, err_fwder) = match launch_result.stdio {
1984 StdioHandling::Captured { stdout, stderr } => {
1985 let (file_stdout, file_stderr) = if enable_file_capture {
1986 match self.file_appender.as_deref() {
1987 Some(fm) => (
1988 Some(fm.addr_for(OutputTarget::Stdout)),
1989 Some(fm.addr_for(OutputTarget::Stderr)),
1990 ),
1991 None => {
1992 tracing::warn!("enable_file_capture=true but no FileAppender");
1993 (None, None)
1994 }
1995 }
1996 } else {
1997 (None, None)
1998 };
1999
2000 let out = StreamFwder::start(
2001 stdout,
2002 file_stdout,
2003 OutputTarget::Stdout,
2004 tail_size,
2005 opts.log_channel.clone(),
2006 &ref_proc_id,
2007 config.create_rank,
2008 );
2009 let err = StreamFwder::start(
2010 stderr,
2011 file_stderr,
2012 OutputTarget::Stderr,
2013 tail_size,
2014 opts.log_channel.clone(),
2015 &ref_proc_id,
2016 config.create_rank,
2017 );
2018 (Some(out), Some(err))
2019 }
2020 StdioHandling::Inherited | StdioHandling::ManagedByLauncher => {
2021 if !need_stdio {
2022 tracing::info!(
2023 %proc_id, enable_forwarding, enable_file_capture, tail_size,
2024 "child stdio NOT captured (forwarding/file_capture/tail all disabled)"
2025 );
2026 }
2027 (None, None)
2028 }
2029 };
2030
2031 let handle = BootstrapProcHandle::new(proc_id.clone(), Arc::downgrade(self.launcher()));
2033 handle.mark_running(launch_result.started_at);
2034 handle.set_stream_monitors(out_fwder, err_fwder);
2035
2036 {
2038 let mut children = self.children.lock().await;
2039 children.insert(proc_id.clone(), handle.clone());
2040 }
2041
2042 self.spawn_exit_monitor(proc_id.clone(), handle.clone(), launch_result.exit_rx);
2045
2046 let h = handle.clone();
2048 tokio::spawn(async move {
2049 match callback_rx.recv().await {
2050 Ok((addr, agent)) => {
2051 let _ = h.mark_ready(addr, agent);
2052 }
2053 Err(e) => {
2054 let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
2056 }
2057 }
2058 });
2059
2060 Ok(handle)
2062 }
2063}
2064
2065#[async_trait]
2066impl SingleTerminate for BootstrapProcManager {
2067 async fn terminate_proc(
2077 &self,
2078 cx: &impl context::Actor,
2079 proc: &ProcAddr,
2080 timeout: Duration,
2081 reason: &str,
2082 ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error> {
2083 let proc_handle: Option<BootstrapProcHandle> = {
2085 let mut guard = self.children.lock().await;
2086 guard.remove(proc)
2087 };
2088
2089 if let Some(h) = proc_handle {
2090 h.terminate(cx, timeout, reason)
2091 .await
2092 .map(|_| (Vec::new(), Vec::new()))
2093 .map_err(|e| e.into())
2094 } else {
2095 Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2096 }
2097 }
2098}
2099
2100#[async_trait]
2101impl BulkTerminate for BootstrapProcManager {
2102 async fn terminate_all(
2116 &self,
2117 cx: &impl context::Actor,
2118 timeout: Duration,
2119 max_in_flight: usize,
2120 reason: &str,
2121 ) -> TerminateSummary {
2122 let handles: Vec<BootstrapProcHandle> = {
2125 let mut guard = self.children.lock().await;
2126 guard.drain().map(|(_, v)| v).collect()
2127 };
2128
2129 let attempted = handles.len();
2130 let mut ok = 0usize;
2131
2132 let results = stream::iter(handles.into_iter().map(|h| async move {
2133 match h.terminate(cx, timeout, reason).await {
2134 Ok(_) | Err(TerminateError::AlreadyTerminated(_)) => {
2135 true
2137 }
2138 Err(e) => {
2139 tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2140 false
2141 }
2142 }
2143 }))
2144 .buffer_unordered(max_in_flight.max(1))
2145 .collect::<Vec<bool>>()
2146 .await;
2147
2148 for r in results {
2149 if r {
2150 ok += 1;
2151 }
2152 }
2153
2154 TerminateSummary {
2155 attempted,
2156 ok,
2157 failed: attempted.saturating_sub(ok),
2158 }
2159 }
2160}
2161
2162pub async fn bootstrap() -> anyhow::Result<i32> {
2179 let Some(boot) = Bootstrap::get_from_env()? else {
2180 anyhow::bail!(
2181 "bootstrap: no bootstrap mode configured (HYPERACTOR_MESH_BOOTSTRAP_MODE unset)"
2182 );
2183 };
2184 boot.bootstrap().await
2185}
2186
2187pub async fn bootstrap_or_die() -> ! {
2190 match bootstrap().await {
2191 Ok(exit_code) => std::process::exit(exit_code),
2192 Err(err) => {
2193 let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2194 tracing::error!("failed to bootstrap mesh process: {}", err);
2195 std::process::exit(1);
2196 }
2197 }
2198}
2199
/// Destination for bootstrap debug output written through the `Debug` writer.
///
/// `EnumAsInner` derives helpers such as `is_file()`, which `Debug::is_active`
/// uses to tell whether a debug log file is open.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    /// Output is written to this (append-mode) file.
    File(std::fs::File),
    /// Output is discarded.
    Sink,
}
2205
2206impl DebugSink {
2207 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2208 match self {
2209 DebugSink::File(f) => f.write(buf),
2210 DebugSink::Sink => Ok(buf.len()),
2211 }
2212 }
2213 fn flush(&mut self) -> io::Result<()> {
2214 match self {
2215 DebugSink::File(f) => f.flush(),
2216 DebugSink::Sink => Ok(()),
2217 }
2218 }
2219}
2220
2221fn debug_sink() -> &'static Mutex<DebugSink> {
2222 static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2223 DEBUG_SINK.get_or_init(|| {
2224 let debug_path = {
2225 let mut p = std::env::temp_dir();
2226 if let Ok(user) = std::env::var("USER") {
2227 p.push(user);
2228 }
2229 std::fs::create_dir_all(&p).ok();
2230 p.push("monarch-bootstrap-debug.log");
2231 p
2232 };
2233 let sink = if debug_path.exists() {
2234 match OpenOptions::new()
2235 .append(true)
2236 .create(true)
2237 .open(debug_path.clone())
2238 {
2239 Ok(f) => DebugSink::File(f),
2240 Err(_e) => {
2241 eprintln!(
2242 "failed to open {} for bootstrap debug logging",
2243 debug_path.display()
2244 );
2245 DebugSink::Sink
2246 }
2247 }
2248 } else {
2249 DebugSink::Sink
2250 };
2251 Mutex::new(sink)
2252 })
2253}
2254
/// Compile-time switch. When `true`, bytes written through `Debug` are also mirrored
/// to stderr, and `Debug::is_active` always returns `true`.
const DEBUG_TO_STDERR: bool = false;

/// Zero-sized writer for bootstrap debug output; writes go to the shared sink
/// returned by `debug_sink()`.
struct Debug;
2261
2262impl Debug {
2263 fn is_active() -> bool {
2264 DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2265 }
2266}
2267
2268impl Write for Debug {
2269 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2270 let res = debug_sink().lock().unwrap().write(buf);
2271 if DEBUG_TO_STDERR {
2272 let n = match res {
2273 Ok(n) => n,
2274 Err(_) => buf.len(),
2275 };
2276 let _ = io::stderr().write_all(&buf[..n]);
2277 }
2278
2279 res
2280 }
2281 fn flush(&mut self) -> io::Result<()> {
2282 let res = debug_sink().lock().unwrap().flush();
2283 if DEBUG_TO_STDERR {
2284 let _ = io::stderr().flush();
2285 }
2286 res
2287 }
2288}
2289
2290pub(crate) fn local_proc_addr(
2292 socket_dir: &Path,
2293 proc_id: &hyperactor::id::ProcId,
2294) -> anyhow::Result<(ChannelAddr, PathBuf)> {
2295 let path = proc_id.to_path_elem(socket_dir);
2296 let addr = std::os::unix::net::SocketAddr::from_pathname(path.clone())
2297 .with_context(|| {
2298 format!(
2299 "constructing unix socket address for proc {proc_id} \
2300 at {} ({} bytes); path must fit within SUN_LEN \
2301 (108 on Linux, 104 on macOS)",
2302 path.display(),
2303 path.as_os_str().len()
2304 )
2305 })?
2306 .into();
2307 Ok((addr, path))
2308}
2309
2310fn runtime_dir() -> io::Result<TempDir> {
2314 if let Some(runtime_dir) = std::env::var_os("XDG_RUNTIME_DIR") {
2315 let path = PathBuf::from(runtime_dir);
2316 if path.is_dir() {
2317 return tempfile::tempdir_in(path);
2318 }
2319 }
2320 tempfile::tempdir()
2321}
2322
2323#[cfg(test)]
2324mod tests {
2325 use std::path::PathBuf;
2326
2327 use hyperactor::RemoteSpawn;
2328 use hyperactor::channel::ChannelAddr;
2329 use hyperactor::channel::ChannelTransport;
2330 use hyperactor::channel::TcpMode;
2331 use hyperactor::testing::ids::test_proc_id;
2332 use hyperactor::testing::ids::test_proc_id_with_addr;
2333 use hyperactor_config::Flattrs;
2334
2335 use super::*;
2336
2337 #[test]
2338 fn test_bootstrap_mode_env_string_none_config_proc() {
2339 let value = Bootstrap::Proc {
2340 proc_id: test_proc_id("foo_0"),
2341 backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
2342 callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2343 socket_dir_path: PathBuf::from("notexist"),
2344 config: None,
2345 };
2346
2347 let safe = value.to_env_safe_string().unwrap();
2348 let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2349
2350 let safe2 = round.to_env_safe_string().unwrap();
2353 assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2354
2355 match round {
2357 Bootstrap::Proc { config: None, .. } => {}
2358 other => panic!("expected Proc with None config, got {:?}", other),
2359 }
2360 }
2361
2362 #[test]
2363 fn test_bootstrap_mode_env_string_none_config_host() {
2364 let value = Bootstrap::Host {
2365 addr: ChannelAddr::any(ChannelTransport::Unix),
2366 command: None,
2367 config: None,
2368 exit_on_shutdown: false,
2369 };
2370
2371 let safe = value.to_env_safe_string().unwrap();
2372 let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2373
2374 let safe2 = round.to_env_safe_string().unwrap();
2376 assert_eq!(safe, safe2);
2377
2378 match round {
2380 Bootstrap::Host { config: None, .. } => {}
2381 other => panic!("expected Host with None config, got {:?}", other),
2382 }
2383 }
2384
2385 #[test]
2386 fn test_bootstrap_mode_env_string_invalid() {
2387 assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2389 }
2390
2391 #[test]
2392 fn test_bootstrap_config_snapshot_roundtrip() {
2393 let mut attrs = Attrs::new();
2395 attrs[MESH_TAIL_LOG_LINES] = 123;
2396 attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2397
2398 let socket_dir = runtime_dir().unwrap();
2399
2400 {
2402 let original = Bootstrap::Proc {
2403 proc_id: test_proc_id("foo_42"),
2404 backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2405 callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2406 config: Some(attrs.clone()),
2407 socket_dir_path: socket_dir.path().to_owned(),
2408 };
2409 let env_str = original.to_env_safe_string().expect("encode bootstrap");
2410 let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2411 match &decoded {
2412 Bootstrap::Proc { config, .. } => {
2413 let cfg = config.as_ref().expect("expected Some(attrs)");
2414 assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2415 assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2416 }
2417 other => panic!("unexpected variant after roundtrip: {:?}", other),
2418 }
2419 }
2420
2421 {
2423 let original = Bootstrap::Host {
2424 addr: ChannelAddr::any(ChannelTransport::Unix),
2425 command: None,
2426 config: Some(attrs.clone()),
2427 exit_on_shutdown: false,
2428 };
2429 let env_str = original.to_env_safe_string().expect("encode bootstrap");
2430 let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2431 match &decoded {
2432 Bootstrap::Host { config, .. } => {
2433 let cfg = config.as_ref().expect("expected Some(attrs)");
2434 assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2435 assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2436 }
2437 other => panic!("unexpected variant after roundtrip: {:?}", other),
2438 }
2439 }
2440 }
2441
2442 #[tokio::test]
2443 async fn test_v1_child_logging() {
2444 use hyperactor::channel;
2445 use hyperactor::mailbox::BoxedMailboxSender;
2446 use hyperactor::mailbox::DialMailboxRouter;
2447 use hyperactor::mailbox::MailboxServer;
2448 use hyperactor::proc::Proc;
2449
2450 use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
2451 use crate::logging::LogClientActor;
2452 use crate::logging::LogClientMessageClient;
2453 use crate::logging::LogForwardActor;
2454 use crate::logging::LogMessage;
2455 use crate::logging::OutputTarget;
2456 use crate::logging::test_tap;
2457
2458 let router = DialMailboxRouter::new();
2459 let (proc_addr, proc_rx) =
2460 channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
2461 let proc = Proc::configured(
2462 test_proc_id("client_0"),
2463 BoxedMailboxSender::new(router.clone()),
2464 );
2465 proc.clone().serve(proc_rx);
2466 let proc_ref: ProcAddr = test_proc_id("client_0");
2467 router.bind(proc_ref, proc_addr.clone());
2468 let (client, _handle) = proc.client("client").unwrap();
2469
2470 let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
2471 test_tap::install(tap_tx);
2472
2473 let log_channel = ChannelAddr::any(ChannelTransport::Unix);
2474 unsafe {
2476 std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
2477 }
2478
2479 let log_client_actor = LogClientActor::new((), Flattrs::default()).await.unwrap();
2482 let log_client: ActorRef<LogClientActor> =
2483 proc.spawn("log_client", log_client_actor).unwrap().bind();
2484 log_client.set_aggregate(&client, None).await.unwrap();
2485
2486 let log_forwarder_actor = LogForwardActor::new(log_client.clone(), Flattrs::default())
2489 .await
2490 .unwrap();
2491 let _log_forwarder: ActorRef<LogForwardActor> = proc
2492 .spawn("log_forwarder", log_forwarder_actor)
2493 .unwrap()
2494 .bind();
2495
2496 let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();
2499
2500 tx.post(LogMessage::Log {
2503 hostname: "testhost".into(),
2504 proc_id: "testproc[0]".into(),
2505 output_target: OutputTarget::Stdout,
2506 payload: wirevalue::Any::serialize(&"hello from child".to_string()).unwrap(),
2507 });
2508
2509 let line = tokio::time::timeout(Duration::from_secs(2), tap_rx.recv())
2511 .await
2512 .expect("timed out waiting for log line")
2513 .expect("tap channel closed unexpectedly");
2514 assert!(
2515 line.contains("hello from child"),
2516 "log line did not appear via LogClientActor; got: {line}"
2517 );
2518 }
2519
2520 mod proc_handle {
2521
2522 use std::sync::Arc;
2523 use std::time::Duration;
2524
2525 use async_trait::async_trait;
2526 use hyperactor::ActorRef;
2527 use hyperactor::ProcAddr;
2528 use hyperactor::testing::ids::test_proc_id;
2529
2530 use super::super::*;
2531 use crate::host::ProcHandle;
2532 use crate::proc_launcher::LaunchOptions;
2533 use crate::proc_launcher::LaunchResult;
2534 use crate::proc_launcher::ProcLauncher;
2535 use crate::proc_launcher::ProcLauncherError;
2536
2537 struct TestProcLauncher;
2545
2546 #[async_trait]
2547 impl ProcLauncher for TestProcLauncher {
2548 async fn launch(
2549 &self,
2550 _proc_id: &ProcAddr,
2551 _opts: LaunchOptions,
2552 ) -> Result<LaunchResult, ProcLauncherError> {
2553 panic!("TestProcLauncher::launch should not be called in unit tests");
2554 }
2555
2556 async fn terminate(
2557 &self,
2558 _proc_id: &ProcAddr,
2559 _timeout: Duration,
2560 ) -> Result<(), ProcLauncherError> {
2561 panic!("TestProcLauncher::terminate should not be called in unit tests");
2562 }
2563
2564 async fn kill(&self, _proc_id: &ProcAddr) -> Result<(), ProcLauncherError> {
2565 panic!("TestProcLauncher::kill should not be called in unit tests");
2566 }
2567 }
2568
2569 fn handle_for_test() -> BootstrapProcHandle {
2576 let proc_id: ProcAddr = test_proc_id("0");
2577 let launcher: Arc<dyn ProcLauncher> = Arc::new(TestProcLauncher);
2578 BootstrapProcHandle::new(proc_id, Arc::downgrade(&launcher))
2579 }
2580
2581 #[tokio::test]
2582 async fn starting_to_running_ok() {
2583 let h = handle_for_test();
2584 assert!(matches!(h.status(), ProcStatus::Starting));
2585 let child_started_at = std::time::SystemTime::now();
2586 assert!(h.mark_running(child_started_at));
2587 match h.status() {
2588 ProcStatus::Running { started_at } => {
2589 assert_eq!(started_at, child_started_at);
2590 }
2591 other => panic!("expected Running, got {other:?}"),
2592 }
2593 }
2594
2595 #[tokio::test]
2596 async fn running_to_stopping_to_stopped_ok() {
2597 let h = handle_for_test();
2598 let child_started_at = std::time::SystemTime::now();
2599 assert!(h.mark_running(child_started_at));
2600 assert!(h.mark_stopping());
2601 assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2602 assert!(h.mark_stopped(0, Vec::new()));
2603 assert!(matches!(
2604 h.status(),
2605 ProcStatus::Stopped { exit_code: 0, .. }
2606 ));
2607 }
2608
2609 #[tokio::test]
2610 async fn running_to_killed_ok() {
2611 let h = handle_for_test();
2612 let child_started_at = std::time::SystemTime::now();
2613 assert!(h.mark_running(child_started_at));
2614 assert!(h.mark_killed(9, true));
2615 assert!(matches!(
2616 h.status(),
2617 ProcStatus::Killed {
2618 signal: 9,
2619 core_dumped: true
2620 }
2621 ));
2622 }
2623
2624 #[tokio::test]
2625 async fn running_to_failed_ok() {
2626 let h = handle_for_test();
2627 let child_started_at = std::time::SystemTime::now();
2628 assert!(h.mark_running(child_started_at));
2629 assert!(h.mark_failed("bootstrap error"));
2630 match h.status() {
2631 ProcStatus::Failed { reason } => {
2632 assert_eq!(reason, "bootstrap error");
2633 }
2634 other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2635 }
2636 }
2637
2638 #[tokio::test]
2639 async fn illegal_transitions_are_rejected() {
2640 let h = handle_for_test();
2641 let child_started_at = std::time::SystemTime::now();
2642 assert!(h.mark_running(child_started_at));
2644 assert!(!h.mark_running(std::time::SystemTime::now()));
2645 assert!(matches!(h.status(), ProcStatus::Running { .. }));
2646 assert!(h.mark_stopping());
2648 assert!(h.mark_stopped(0, Vec::new()));
2649 assert!(!h.mark_running(child_started_at));
2650 assert!(!h.mark_killed(9, false));
2651 assert!(!h.mark_failed("nope"));
2652
2653 assert!(matches!(
2654 h.status(),
2655 ProcStatus::Stopped { exit_code: 0, .. }
2656 ));
2657 }
2658
2659 #[tokio::test]
2660 async fn transitions_from_ready_are_legal() {
2661 let h = handle_for_test();
2662 let addr = ChannelAddr::any(ChannelTransport::Unix);
2663 let t0 = std::time::SystemTime::now();
2665 assert!(h.mark_running(t0));
2666 let proc_id = <BootstrapProcHandle as ProcHandle>::proc_addr(&h);
2669 let actor_id = proc_id.actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME);
2670 let agent_ref: ActorRef<ProcAgent> = ActorRef::attest(actor_id);
2671 assert!(h.mark_ready(addr, agent_ref));
2673 assert!(h.mark_stopping());
2674 assert!(h.mark_stopped(0, Vec::new()));
2675 }
2676
2677 #[tokio::test]
2678 async fn ready_to_killed_is_legal() {
2679 let h = handle_for_test();
2680 let addr = ChannelAddr::any(ChannelTransport::Unix);
2681 let t0 = std::time::SystemTime::now();
2683 assert!(h.mark_running(t0));
2684 let proc_id = <BootstrapProcHandle as ProcHandle>::proc_addr(&h);
2687 let actor_id = proc_id.actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME);
2688 let agent: ActorRef<ProcAgent> = ActorRef::attest(actor_id);
2689 assert!(h.mark_ready(addr, agent));
2691 assert!(h.mark_killed(9, false));
2693 }
2694
2695 #[tokio::test]
2696 async fn mark_failed_from_stopping_is_allowed() {
2697 let h = handle_for_test();
2698
2699 assert!(h.mark_stopping(), "precondition: to Stopping");
2701
2702 assert!(
2704 h.mark_failed("boom"),
2705 "mark_failed() should succeed from Stopping"
2706 );
2707 match h.status() {
2708 ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2709 other => panic!("expected Failed(\"boom\"), got {other:?}"),
2710 }
2711 }
2712 }
2713
2714 struct TestLauncher;
2720
2721 #[async_trait::async_trait]
2722 impl crate::proc_launcher::ProcLauncher for TestLauncher {
2723 async fn launch(
2724 &self,
2725 _proc_id: &ProcAddr,
2726 _opts: crate::proc_launcher::LaunchOptions,
2727 ) -> Result<crate::proc_launcher::LaunchResult, crate::proc_launcher::ProcLauncherError>
2728 {
2729 panic!("TestLauncher::launch should not be called in unit tests");
2730 }
2731
2732 async fn terminate(
2733 &self,
2734 _proc_id: &ProcAddr,
2735 _timeout: std::time::Duration,
2736 ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
2737 panic!("TestLauncher::terminate should not be called in unit tests");
2738 }
2739
2740 async fn kill(
2741 &self,
2742 _proc_id: &ProcAddr,
2743 ) -> Result<(), crate::proc_launcher::ProcLauncherError> {
2744 panic!("TestLauncher::kill should not be called in unit tests");
2745 }
2746 }
2747
2748 fn test_handle(proc_id: ProcAddr) -> BootstrapProcHandle {
2749 let launcher: std::sync::Arc<dyn crate::proc_launcher::ProcLauncher> =
2750 std::sync::Arc::new(TestLauncher);
2751 BootstrapProcHandle::new(proc_id, std::sync::Arc::downgrade(&launcher))
2752 }
2753
2754 #[tokio::test]
2755 async fn watch_notifies_on_status_changes() {
2756 let proc_id = test_proc_id("1");
2757 let handle = test_handle(proc_id);
2758 let mut rx = handle.watch();
2759
2760 let now = std::time::SystemTime::now();
2762 assert!(handle.mark_running(now));
2763 rx.changed().await.ok(); match &*rx.borrow() {
2765 ProcStatus::Running { started_at } => {
2766 assert_eq!(*started_at, now);
2767 }
2768 s => panic!("expected Running, got {s:?}"),
2769 }
2770
2771 assert!(handle.mark_stopped(0, Vec::new()));
2773 rx.changed().await.ok(); assert!(matches!(
2775 &*rx.borrow(),
2776 ProcStatus::Stopped { exit_code: 0, .. }
2777 ));
2778 }
2779
2780 #[tokio::test]
2781 async fn ready_errs_if_process_exits_before_running() {
2782 let proc_id =
2783 test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "early-exit");
2784 let handle = test_handle(proc_id);
2785
2786 assert!(handle.mark_stopped(7, Vec::new()));
2789
2790 match handle.ready_inner().await {
2792 Ok(()) => panic!("ready() unexpectedly succeeded"),
2793 Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
2794 assert_eq!(exit_code, 7)
2795 }
2796 Err(other) => panic!("expected Stopped(7), got {other:?}"),
2797 }
2798 }
2799
2800 #[tokio::test]
2801 async fn status_unknown_proc_is_none() {
2802 let manager = BootstrapProcManager::new(BootstrapCommand {
2803 program: PathBuf::from("/bin/true"),
2804 ..Default::default()
2805 })
2806 .unwrap();
2807 let unknown = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "nope");
2808 assert!(manager.status(&unknown).await.is_none());
2809 }
2810
2811 #[tokio::test]
2812 async fn handle_ready_allows_waiters() {
2813 let proc_id = test_proc_id("42");
2814 let handle = test_handle(proc_id.clone());
2815
2816 let started_at = std::time::SystemTime::now();
2817 assert!(handle.mark_running(started_at));
2818
2819 let actor_id = proc_id.actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME);
2820 let agent_ref: ActorRef<ProcAgent> = ActorRef::attest(actor_id);
2821
2822 let ready_addr = ChannelAddr::any(ChannelTransport::Unix);
2825
2826 assert!(handle.mark_ready(ready_addr.clone(), agent_ref));
2828 handle
2829 .ready_inner()
2830 .await
2831 .expect("ready_inner() should complete after Ready");
2832
2833 match handle.status() {
2836 ProcStatus::Ready {
2837 started_at: t,
2838 addr: a,
2839 ..
2840 } => {
2841 assert_eq!(t, started_at);
2842 assert_eq!(a, ready_addr);
2843 }
2844 other => panic!("expected Ready, got {other:?}"),
2845 }
2846 }
2847
2848 #[test]
2849 fn display_running_includes_uptime() {
2850 let started_at = std::time::SystemTime::now() - Duration::from_secs(42);
2851 let st = ProcStatus::Running { started_at };
2852
2853 let s = format!("{}", st);
2854 assert!(s.contains("Running"));
2855 assert!(s.contains("42s"));
2856 }
2857
2858 #[test]
2859 fn display_ready_includes_addr() {
2860 let started_at = std::time::SystemTime::now() - Duration::from_secs(5);
2861 let addr = ChannelAddr::any(ChannelTransport::Unix);
2862 let agent = ActorRef::attest(
2863 test_proc_id_with_addr(addr.clone(), "proc")
2864 .actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME),
2865 );
2866
2867 let st = ProcStatus::Ready {
2868 started_at,
2869 addr: addr.clone(),
2870 agent,
2871 };
2872
2873 let s = format!("{}", st);
2874 assert!(s.contains(&addr.to_string())); assert!(s.contains("Ready"));
2876 }
2877
2878 #[test]
2879 fn display_stopped_includes_exit_code() {
2880 let st = ProcStatus::Stopped {
2881 exit_code: 7,
2882 stderr_tail: Vec::new(),
2883 };
2884 let s = format!("{}", st);
2885 assert!(s.contains("Stopped"));
2886 assert!(s.contains("7"));
2887 }
2888
2889 #[test]
2890 fn display_other_variants_does_not_panic() {
2891 let samples = vec![
2892 ProcStatus::Starting,
2893 ProcStatus::Stopping {
2894 started_at: std::time::SystemTime::now(),
2895 },
2896 ProcStatus::Ready {
2897 started_at: std::time::SystemTime::now(),
2898 addr: ChannelAddr::any(ChannelTransport::Unix),
2899 agent: ActorRef::attest(
2900 test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "x")
2901 .actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME),
2902 ),
2903 },
2904 ProcStatus::Killed {
2905 signal: 9,
2906 core_dumped: false,
2907 },
2908 ProcStatus::Failed {
2909 reason: "boom".into(),
2910 },
2911 ];
2912
2913 for st in samples {
2914 let _ = format!("{}", st); }
2916 }
2917
2918 #[tokio::test]
2919 async fn proc_handle_ready_ok_through_trait() {
2920 let proc_id =
2921 test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-ready-ok");
2922 let handle = test_handle(proc_id.clone());
2923
2924 let t0 = std::time::SystemTime::now();
2926 assert!(handle.mark_running(t0));
2927
2928 let addr = ChannelAddr::any(ChannelTransport::Unix);
2930 let agent: ActorRef<ProcAgent> =
2931 ActorRef::attest(proc_id.actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME));
2932 assert!(handle.mark_ready(addr, agent));
2933
2934 let r = <BootstrapProcHandle as ProcHandle>::ready(&handle).await;
2936 assert!(r.is_ok(), "expected Ok(()), got {r:?}");
2937 }
2938
2939 #[tokio::test]
2940 async fn proc_handle_wait_returns_terminal_status() {
2941 let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "ph-wait");
2942 let handle = test_handle(proc_id);
2943
2944 assert!(handle.mark_stopped(0, Vec::new()));
2946
2947 let st = <BootstrapProcHandle as ProcHandle>::wait(&handle)
2949 .await
2950 .expect("wait should return Ok(terminal)");
2951
2952 match st {
2953 ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
2954 other => panic!("expected Stopped(0), got {other:?}"),
2955 }
2956 }
2957
2958 #[tokio::test]
2959 async fn ready_wrapper_maps_terminal_to_trait_error() {
2960 let proc_id = test_proc_id_with_addr(ChannelAddr::any(ChannelTransport::Unix), "wrap");
2961 let handle = test_handle(proc_id);
2962
2963 assert!(handle.mark_stopped(7, Vec::new()));
2964
2965 match <BootstrapProcHandle as ProcHandle>::ready(&handle).await {
2966 Ok(()) => panic!("expected Err"),
2967 Err(HostReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
2968 assert_eq!(exit_code, 7);
2969 }
2970 Err(e) => panic!("unexpected error: {e:?}"),
2971 }
2972 }
2973
2974 #[cfg(fbcode_build)]
2984 async fn make_proc_id_and_backend_addr(
2985 instance: &hyperactor::Instance<()>,
2986 _tag: &str,
2987 ) -> (ProcAddr, ChannelAddr) {
2988 let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
2991
2992 instance.proc().clone().serve(rx);
2996
2997 let proc_id = test_proc_id_with_addr(ChannelTransport::Unix.any(), "proc");
3000 (proc_id, backend_addr)
3001 }
3002
/// Spawns a bootstrap child, waits for readiness, then requests a graceful
/// shutdown with a 2s grace period.
///
/// Two outcomes are accepted: a clean exit (code 0) or death by `SIGTERM`.
/// The whole `terminate()` call is bounded by twice the grace period, so a
/// hang fails the test instead of blocking it.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_terminate_graceful() {
    let root =
        hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
    let (instance, _client_handle) = root.client("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
    let child_config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
        proc_bind: None,
        bootstrap_command: None,
    };
    let child = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), child_config)
        .await
        .expect("spawn bootstrap child");

    child.ready().await.expect("ready");

    let grace = Duration::from_secs(2);
    let terminated = tokio::time::timeout(
        grace * 2,
        child.terminate(&instance, grace, "test terminate"),
    )
    .await;

    // Outer Err = our timeout fired; inner Err = terminate() itself failed.
    let status = match terminated {
        Err(_) => panic!("terminate() future hung"),
        Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
        Ok(Ok(status)) => status,
    };

    match status {
        ProcStatus::Stopped { exit_code, .. } => {
            assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
        }
        ProcStatus::Killed { signal, .. } => {
            assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
        }
        other => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
    }
}
3069
/// Spawns a bootstrap child, waits for readiness, then force-kills it.
///
/// The resulting status must be `Killed` with `SIGKILL`. The `kill()` call is
/// bounded by a 5s timeout, so a hang fails the test.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_kill_forced() {
    let root =
        hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
    let (instance, _client_handle) = root.client("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;

    let child_config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
        proc_bind: None,
        bootstrap_command: None,
    };
    let child = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), child_config)
        .await
        .expect("spawn bootstrap child");

    child.ready().await.expect("ready");

    let kill_timeout = Duration::from_secs(5);

    // Outer Err = our timeout fired; inner Err = kill() itself failed.
    let status = match tokio::time::timeout(kill_timeout, child.kill()).await {
        Err(_) => panic!("kill() future hung"),
        Ok(Err(e)) => panic!("kill() failed: {e:?}"),
        Ok(Ok(status)) => status,
    };

    match status {
        ProcStatus::Killed { signal, .. } => {
            assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {signal}");
        }
        other => panic!("expected Killed status after kill(); got: {other:?}"),
    }
}
3120
/// Brings up a host via `host(...)` on a Unix channel with the test bootstrap
/// command.
///
/// It then asks the first element of the returned handle for its local proc
/// and checks that this proc can create a new client instance. Each step
/// simply `unwrap()`s, so any failure panics the test.
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_host_bootstrap() {
    use crate::host_mesh::host_agent::GetLocalProcClient;
    use crate::proc_agent::NewClientInstanceClient;

    // An isolated proc supplies the client context for the calls below.
    let scratch_proc = Proc::isolated();
    let (scratch_client, _) = scratch_proc.client("temp").unwrap();

    let host_handle = host(
        ChannelAddr::any(ChannelTransport::Unix),
        Some(BootstrapCommand::test()),
        None,
        false,
        None,
    )
    .await
    .unwrap();

    let local_proc = host_handle
        .0
        .get_local_proc(&scratch_client)
        .await
        .unwrap();
    let _local_instance = local_proc
        .new_client_instance(&scratch_client)
        .await
        .unwrap();
}
3148
3149 use std::time::Duration;
3155
3156 use crate::proc_launcher::LaunchOptions;
3157 use crate::proc_launcher::LaunchResult;
3158 use crate::proc_launcher::ProcExitKind;
3159 use crate::proc_launcher::ProcExitResult;
3160 use crate::proc_launcher::ProcLauncher;
3161 use crate::proc_launcher::ProcLauncherError;
3162 use crate::proc_launcher::StdioHandling;
3163
/// Minimal `ProcLauncher` stand-in used by the launcher-slot tests in this
/// module.
///
/// `marker` only lets a test build a distinguishable instance (the tests pass
/// 42, 99, 1, 2 and 123). The trait implementation never reads it.
// `dead_code` is allowed because every test that constructs this type is
// gated on `#[cfg(fbcode_build)]`. Without that cfg the type and its field
// would otherwise be reported as unused.
#[allow(dead_code)]
struct DummyLauncher {
    marker: u64,
}
3171
3172 impl DummyLauncher {
3173 #[allow(dead_code)]
3174 fn new(marker: u64) -> Self {
3175 Self { marker }
3176 }
3177
3178 #[allow(dead_code)]
3179 fn marker(&self) -> u64 {
3180 self.marker
3181 }
3182 }
3183
3184 #[async_trait::async_trait]
3185 impl ProcLauncher for DummyLauncher {
3186 async fn launch(
3187 &self,
3188 _proc_id: &ProcAddr,
3189 _opts: LaunchOptions,
3190 ) -> Result<LaunchResult, ProcLauncherError> {
3191 let (tx, rx) = tokio::sync::oneshot::channel();
3192 let _ = tx.send(ProcExitResult {
3194 kind: ProcExitKind::Exited { code: 0 },
3195 stderr_tail: Some(vec![]),
3196 });
3197 Ok(LaunchResult {
3198 pid: None,
3199 started_at: std::time::SystemTime::now(),
3200 stdio: StdioHandling::ManagedByLauncher,
3201 exit_rx: rx,
3202 })
3203 }
3204
3205 async fn terminate(
3206 &self,
3207 _proc_id: &ProcAddr,
3208 _timeout: Duration,
3209 ) -> Result<(), ProcLauncherError> {
3210 Ok(())
3211 }
3212
3213 async fn kill(&self, _proc_id: &ProcAddr) -> Result<(), ProcLauncherError> {
3214 Ok(())
3215 }
3216 }
3217
/// A launcher installed with `set_launcher` is exactly the one that
/// `launcher()` later returns (same allocation, not merely an equal value).
#[test]
#[cfg(fbcode_build)]
fn test_set_launcher_then_get() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let custom: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(42));
    // Keep our own handle so identity can be checked after `custom` is moved
    // into the manager.
    let expected = Arc::clone(&custom);

    manager.set_launcher(custom).unwrap();

    let got = manager.launcher();

    // Compare allocations with `Arc::ptr_eq` rather than comparing
    // `*const dyn ProcLauncher` values. Wide-pointer equality also compares
    // vtable pointers, and those are not guaranteed unique per type (they can
    // be duplicated across codegen units), so it could fail spuriously.
    assert!(
        Arc::ptr_eq(&expected, got),
        "launcher() should return the same Arc that was set"
    );
}
3239
/// After `launcher()` has been called, a later `set_launcher` must be
/// rejected with an error mentioning "already initialized".
#[test]
#[cfg(fbcode_build)]
fn test_get_launcher_then_set_fails() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    // Touch the launcher first; the returned reference itself is not needed.
    let _ = manager.launcher();

    let late: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(99));
    let err = match manager.set_launcher(late) {
        Ok(_) => panic!("set_launcher should fail after launcher() was called"),
        Err(e) => e,
    };

    let rendered = err.to_string();
    assert!(
        rendered.contains("already initialized"),
        "error should mention 'already initialized', got: {rendered}"
    );
}
3268
/// The launcher slot accepts only one `set_launcher`. A second attempt must
/// be rejected with an error mentioning "already initialized".
#[test]
#[cfg(fbcode_build)]
fn test_set_launcher_twice_fails() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let (initial, replacement): (Arc<dyn ProcLauncher>, Arc<dyn ProcLauncher>) = (
        Arc::new(DummyLauncher::new(1)),
        Arc::new(DummyLauncher::new(2)),
    );

    manager.set_launcher(initial).unwrap();

    let err = match manager.set_launcher(replacement) {
        Ok(_) => panic!("second set_launcher should fail"),
        Err(e) => e,
    };

    let rendered = err.to_string();
    assert!(
        rendered.contains("already initialized"),
        "error should mention 'already initialized', got: {rendered}"
    );
}
3294
/// A freshly constructed manager has no launcher installed yet, so the first
/// `set_launcher` call succeeds.
#[test]
#[cfg(fbcode_build)]
fn test_launcher_initially_empty() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let launcher: Arc<dyn ProcLauncher> = Arc::new(DummyLauncher::new(123));
    let accepted = manager.set_launcher(launcher).is_ok();

    assert!(accepted, "set_launcher should succeed on fresh manager");
}
3312
/// Repeated `launcher()` calls hand back the very same `Arc` allocation.
#[test]
#[cfg(fbcode_build)]
fn test_launcher_idempotent() {
    let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let (first_call, second_call) = (manager.launcher(), manager.launcher());

    assert!(
        Arc::ptr_eq(first_call, second_call),
        "launcher() should return the same Arc on repeated calls"
    );
}
3329}