1use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::env::VarError;
12use std::fmt;
13use std::fs::OpenOptions;
14use std::future;
15use std::io;
16use std::io::Write;
17use std::os::unix::process::ExitStatusExt;
18use std::path::Path;
19use std::path::PathBuf;
20use std::process::Stdio;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::OnceLock;
24use std::time::Duration;
25use std::time::SystemTime;
26
27use async_trait::async_trait;
28use base64::prelude::*;
29use futures::StreamExt;
30use futures::stream;
31use humantime::format_duration;
32use hyperactor::ActorHandle;
33use hyperactor::ActorId;
34use hyperactor::ActorRef;
35use hyperactor::ProcId;
36use hyperactor::channel;
37use hyperactor::channel::ChannelAddr;
38use hyperactor::channel::ChannelError;
39use hyperactor::channel::ChannelTransport;
40use hyperactor::channel::Rx;
41use hyperactor::channel::Tx;
42use hyperactor::clock::Clock;
43use hyperactor::clock::RealClock;
44use hyperactor::context;
45use hyperactor::host::Host;
46use hyperactor::host::HostError;
47use hyperactor::host::ProcHandle;
48use hyperactor::host::ProcManager;
49use hyperactor::host::TerminateSummary;
50use hyperactor::mailbox::BoxableMailboxSender;
51use hyperactor::mailbox::IntoBoxedMailboxSender;
52use hyperactor::mailbox::MailboxClient;
53use hyperactor::mailbox::MailboxServer;
54use hyperactor::proc::Proc;
55use hyperactor_config::CONFIG;
56use hyperactor_config::ConfigAttr;
57use hyperactor_config::attrs::Attrs;
58use hyperactor_config::attrs::declare_attrs;
59use hyperactor_config::global::override_or_global;
60use serde::Deserialize;
61use serde::Serialize;
62use tempfile::TempDir;
63use tokio::process::Child;
64use tokio::process::ChildStderr;
65use tokio::process::ChildStdout;
66use tokio::process::Command;
67use tokio::sync::oneshot;
68use tokio::sync::watch;
69use tracing::Instrument;
70use tracing::Level;
71use typeuri::Named;
72
73use crate::logging::OutputTarget;
74use crate::logging::StreamFwder;
75use crate::proc_mesh::mesh_agent::ProcMeshAgent;
76use crate::resource;
77use crate::v1;
78use crate::v1::host_mesh::mesh_agent::HostAgentMode;
79use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
80
81mod mailbox;
82
// Configuration attributes for mesh bootstrap. Every attribute carries
// `CONFIG` metadata naming the environment variable (`env_name`) and the
// Python-facing key (`py_name`) it is exposed under.
declare_attrs! {
    // Boolean flag, default `false`.
    // NOTE(review): presumably toggles forwarding of child-process logs;
    // it is not read in this part of the file — confirm at the use site.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
        py_name: Some("enable_log_forwarding".to_string()),
    })
    pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

    // Boolean flag, default `false`. `BootstrapProcManager::new` reads it
    // and only attempts to create a `FileAppender` when it is `true`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
        py_name: Some("enable_file_capture".to_string()),
    })
    pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

    // Line count, default `0`.
    // NOTE(review): presumably the number of trailing log lines retained
    // per child stream — confirm at the use site.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
        py_name: Some("tail_log_lines".to_string()),
    })
    pub attr MESH_TAIL_LOG_LINES: usize = 0;

    // Boolean flag, default `true`. `Bootstrap::bootstrap` (Proc mode)
    // calls `install_pdeathsig_kill` only when this is `true`.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
        py_name: Some("mesh_bootstrap_enable_pdeathsig".to_string()),
    })
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    // Default `16`.
    // NOTE(review): presumably bounds how many procs are terminated
    // concurrently — confirm at the use site.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
        py_name: Some("mesh_terminate_concurrency".to_string()),
    })
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    // Default 10 seconds.
    // NOTE(review): presumably the per-proc graceful termination
    // timeout — confirm at the use site.
    @meta(CONFIG = ConfigAttr {
        env_name: Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
        py_name: Some("mesh_terminate_timeout".to_string()),
    })
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
170
/// Name of the environment variable `HYPERACTOR_MESH_BOOTSTRAP_ADDR`.
/// NOTE(review): presumably carries the allocator's bootstrap channel
/// address — the reader is not in this part of the file; confirm.
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Name of the environment variable `HYPERACTOR_MESH_INDEX`.
/// NOTE(review): presumably the index assigned to this process by the
/// allocator — confirm at the use site.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Name of the environment variable `MONARCH_CLIENT_TRACE_ID`.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";
/// The literal key `BOOTSTRAP_LOG_CHANNEL` (crate-private).
/// NOTE(review): presumably also an environment variable name — confirm.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";
178
/// Envelope for messages sent from a bootstrapped process to the
/// allocator: field `0` is the sender's bootstrap index (see
/// `exit_if_missed_heartbeat`, which passes `bootstrap_index` there) and
/// field `1` is the message itself.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
wirevalue::register_type!(Process2Allocator);
185
/// Payload of a [`Process2Allocator`] envelope.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Process2AllocatorMessage {
    /// Carries a channel address.
    /// NOTE(review): presumably the address at which the process accepts
    /// `Allocator2Process` messages — confirm at the sender.
    Hello(ChannelAddr),

    /// Carries a proc id, a reference to its `ProcMeshAgent`, and a
    /// channel address.
    /// NOTE(review): presumably reports a successfully started proc and
    /// the address it serves on — confirm at the sender.
    StartedProc(ProcId, ActorRef<ProcMeshAgent>, ChannelAddr),

    /// Liveness signal; `exit_if_missed_heartbeat` sends one every five
    /// seconds.
    Heartbeat,
}
wirevalue::register_type!(Process2AllocatorMessage);
203
/// Messages in the allocator-to-process direction.
/// NOTE(review): the handler is not in this part of the file; the
/// variant descriptions below are inferred from names — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Allocator2Process {
    /// Carries a proc id and a channel transport; presumably a request
    /// to start that proc using the given transport.
    StartProc(ProcId, ChannelTransport),

    /// Carries an `i32`; presumably stop running procs, then exit with
    /// that code.
    StopAndExit(i32),

    /// Carries an `i32`; presumably exit immediately with that code.
    Exit(i32),
}
wirevalue::register_type!(Allocator2Process);
220
221async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
222 let tx = match channel::dial(bootstrap_addr.clone()) {
223 Ok(tx) => tx,
224
225 Err(err) => {
226 tracing::error!(
227 "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
228 bootstrap_addr,
229 err
230 );
231 std::process::exit(1);
232 }
233 };
234 tracing::info!(
235 "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
236 );
237 loop {
238 RealClock.sleep(Duration::from_secs(5)).await;
239
240 let result = tx
241 .send(Process2Allocator(
242 bootstrap_index,
243 Process2AllocatorMessage::Heartbeat,
244 ))
245 .await;
246
247 if let Err(err) = result {
248 tracing::error!(
249 "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
250 bootstrap_addr,
251 err
252 );
253 std::process::exit(1);
254 }
255 }
256}
257
/// Unwrap a `Result` inside a function whose return type is
/// `anyhow::Error` itself (not `Result<_, anyhow::Error>`).
///
/// Evaluates to the `Ok` value. On `Err(e)`, returns
/// `anyhow::Error::from(e)` from the enclosing function — note the bare
/// error is returned, not wrapped in `Err`.
#[macro_export]
macro_rules! ok {
    ($expr:expr $(,)?) => {
        match $expr {
            Ok(value) => value,
            Err(e) => return ::anyhow::Error::from(e),
        }
    };
}
267
/// Suspend the calling task forever.
///
/// The returned future never resolves, so no value of type `R` is ever
/// produced; callers use it to park after setup has finished.
async fn halt<R>() -> R {
    future::pending::<R>().await
}
272
273pub async fn host(
282 addr: ChannelAddr,
283 command: Option<BootstrapCommand>,
284 config: Option<Attrs>,
285) -> anyhow::Result<ActorHandle<HostMeshAgent>> {
286 if let Some(attrs) = config {
287 hyperactor_config::global::set(hyperactor_config::global::Source::Runtime, attrs);
288 tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
289 } else {
290 tracing::debug!("bootstrap: no config snapshot provided (Host)");
291 }
292
293 let command = match command {
294 Some(command) => command,
295 None => BootstrapCommand::current()?,
296 };
297 let manager = BootstrapProcManager::new(command)?;
298
299 let host = Host::new_with_default(manager, addr, Some(crate::router::global().clone().boxed()))
301 .await?;
302 let addr = host.addr().clone();
303 let system_proc = host.system_proc().clone();
304 let host_mesh_agent = system_proc
305 .spawn::<HostMeshAgent>("agent", HostMeshAgent::new(HostAgentMode::Process(host)))?;
306
307 tracing::info!(
308 "serving host at {}, agent: {}",
309 addr,
310 host_mesh_agent.bind::<HostMeshAgent>()
311 );
312
313 Ok(host_mesh_agent)
314}
315
/// The mode in which a mesh process bootstraps itself; consumed by
/// [`Bootstrap::bootstrap`]. It is passed between processes as a
/// base64-encoded JSON string in `HYPERACTOR_MESH_BOOTSTRAP_MODE`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Bootstrap {
    /// Bootstrap as a single proc.
    Proc {
        /// Id of the proc to create; must be a "direct" id (checked with
        /// `as_direct()` during bootstrap).
        proc_id: ProcId,
        /// Address dialed with `MailboxClient::dial` and handed to the
        /// proc's `LocalProcDialer`.
        backend_addr: ChannelAddr,
        /// Address to which the proc reports its serving address and
        /// agent reference once it is up.
        callback_addr: ChannelAddr,
        /// Directory in which the proc's unix socket is created (the
        /// socket path is this directory joined with the proc name).
        socket_dir_path: PathBuf,
        /// Optional attribute snapshot, installed as `ClientOverride`
        /// configuration.
        config: Option<Attrs>,
    },

    /// Bootstrap as a host; the fields are forwarded to [`host`].
    Host {
        /// Address the host serves on.
        addr: ChannelAddr,
        /// Command used to launch procs; `None` means the current
        /// executable and arguments.
        command: Option<BootstrapCommand>,
        /// Optional attribute snapshot, installed as `Runtime`
        /// configuration.
        config: Option<Attrs>,
    },

    /// Bootstrap through `bootstrap_v0_proc_mesh`. This is the default
    /// mode.
    V0ProcMesh {
        /// Optional attribute snapshot forwarded to
        /// `bootstrap_v0_proc_mesh`.
        config: Option<Attrs>,
    },
}
370
371impl Default for Bootstrap {
372 fn default() -> Self {
373 Bootstrap::V0ProcMesh { config: None }
374 }
375}
376
377impl Bootstrap {
378 #[allow(clippy::result_large_err)]
381 fn to_env_safe_string(&self) -> v1::Result<String> {
382 Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
383 }
384
385 #[allow(clippy::result_large_err)]
387 fn from_env_safe_string(str: &str) -> v1::Result<Self> {
388 let data = BASE64_STANDARD.decode(str)?;
389 let data = std::str::from_utf8(&data)?;
390 Ok(serde_json::from_str(data)?)
391 }
392
393 pub fn get_from_env() -> anyhow::Result<Option<Self>> {
396 match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
397 Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
398 Ok(mode) => Ok(Some(mode)),
399 Err(e) => {
400 Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
401 }
402 },
403 Err(VarError::NotPresent) => Ok(None),
404 Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
405 }
406 }
407
408 pub fn to_env(&self, cmd: &mut Command) {
410 cmd.env(
411 "HYPERACTOR_MESH_BOOTSTRAP_MODE",
412 self.to_env_safe_string().unwrap(),
413 );
414 }
415
416 pub async fn bootstrap(self) -> anyhow::Error {
419 tracing::info!(
420 "bootstrapping mesh process: {}",
421 serde_json::to_string(&self).unwrap()
422 );
423
424 if Debug::is_active() {
425 let mut buf = Vec::new();
426 writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
427 #[cfg(unix)]
428 writeln!(
429 &mut buf,
430 "\tparent pid: {}",
431 std::os::unix::process::parent_id()
432 )
433 .unwrap();
434 writeln!(
435 &mut buf,
436 "\tconfig: {}",
437 serde_json::to_string(&self).unwrap()
438 )
439 .unwrap();
440 match std::env::current_exe() {
441 Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
442 Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
443 }
444 writeln!(&mut buf, "\targs:").unwrap();
445 for arg in std::env::args() {
446 writeln!(&mut buf, "\t\t{}", arg).unwrap();
447 }
448 writeln!(&mut buf, "\tenv:").unwrap();
449 for (key, val) in std::env::vars() {
450 writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
451 }
452 let _ = Debug.write(&buf);
453 if let Ok(s) = std::str::from_utf8(&buf) {
454 tracing::info!("{}", s);
455 } else {
456 tracing::info!("{:?}", buf);
457 }
458 }
459
460 match self {
461 Bootstrap::Proc {
462 proc_id,
463 backend_addr,
464 callback_addr,
465 socket_dir_path,
466 config,
467 } => {
468 let entered = tracing::span!(
469 Level::INFO,
470 "proc_bootstrap",
471 %proc_id,
472 %backend_addr,
473 %callback_addr,
474 socket_dir_path = %socket_dir_path.display(),
475 )
476 .entered();
477 if let Some(attrs) = config {
478 hyperactor_config::global::set(
479 hyperactor_config::global::Source::ClientOverride,
480 attrs,
481 );
482 tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
483 } else {
484 tracing::debug!("bootstrap: no config snapshot provided (Proc)");
485 }
486
487 if hyperactor_config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
488 let _ = install_pdeathsig_kill();
493 } else {
494 eprintln!("(bootstrap) PDEATHSIG disabled via config");
495 }
496
497 let (local_addr, name) = ok!(proc_id
498 .as_direct()
499 .ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
500 let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
502 let serve_addr = serve_addr.parse().unwrap();
503
504 let proc_sender = mailbox::LocalProcDialer::new(
509 local_addr.clone(),
510 socket_dir_path,
511 ok!(MailboxClient::dial(backend_addr)),
512 );
513
514 let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
515
516 let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
517 .map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
518
519 let span = entered.exit();
520
521 let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr));
524 proc.clone().serve(proc_rx);
525 ok!(ok!(channel::dial(callback_addr))
526 .send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
527 .instrument(span)
528 .await
529 .map_err(ChannelError::from));
530
531 halt().await
532 }
533 Bootstrap::Host {
534 addr,
535 command,
536 config,
537 } => {
538 ok!(host(addr, command, config).await);
539 halt().await
540 }
541 Bootstrap::V0ProcMesh { config } => bootstrap_v0_proc_mesh(config).await,
542 }
543 }
544
545 pub async fn bootstrap_or_die(self) -> ! {
548 let err = self.bootstrap().await;
549 tracing::error!("failed to bootstrap mesh process: {}", err);
550 std::process::exit(1)
551 }
552}
553
/// Ask the kernel to send this process `SIGKILL` when its parent dies.
///
/// On Linux this calls `prctl(PR_SET_PDEATHSIG, SIGKILL)`. The parent pid
/// is sampled before and after the call; if it changed in between, the
/// process exits immediately with status 0. On other targets the function
/// does nothing and returns `Ok(())`.
///
/// # Errors
/// Returns the last OS error when `prctl` reports failure.
pub fn install_pdeathsig_kill() -> io::Result<()> {
    #[cfg(target_os = "linux")]
    {
        // SAFETY: `getppid` takes no arguments and touches no memory.
        let ppid_before = unsafe { libc::getppid() };

        // SAFETY: `prctl` is called with a valid option and an integer
        // signal number; no pointers are passed.
        let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
        if rc != 0 {
            return Err(io::Error::last_os_error());
        }

        // SAFETY: as above.
        let ppid_after = unsafe { libc::getppid() };
        // A different parent pid means the parent changed while the
        // signal was being armed.
        // NOTE(review): presumably the original parent died and this
        // process was reparented, so no signal would arrive — confirm.
        if ppid_before != ppid_after {
            std::process::exit(0);
        }
    }
    Ok(())
}
585
/// Lifecycle state of a bootstrapped proc as tracked by
/// [`BootstrapProcHandle`].
///
/// `Stopped`, `Killed` and `Failed` are the terminal states (see
/// [`ProcStatus::is_exit`]); the `mark_*` methods on the handle refuse to
/// leave them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProcStatus {
    /// Initial state of a freshly created handle.
    Starting,
    /// A pid and start time are known, but no address or agent yet.
    Running { pid: u32, started_at: SystemTime },
    /// The proc has an address and an agent reference in addition to its
    /// pid and start time.
    Ready {
        pid: u32,
        started_at: SystemTime,
        addr: ChannelAddr,
        agent: ActorRef<ProcMeshAgent>,
    },
    /// Termination has been requested but the process has not been
    /// observed to exit yet.
    Stopping { pid: u32, started_at: SystemTime },
    /// Terminal: the process exited with `exit_code`. `stderr_tail` holds
    /// the stderr lines returned by the stderr stream monitor, if any.
    Stopped {
        exit_code: i32,
        stderr_tail: Vec<String>,
    },
    /// Terminal: the process was terminated by `signal`; `core_dumped`
    /// reports whether a core dump was produced.
    Killed { signal: i32, core_dumped: bool },
    /// Terminal: waiting on the process failed or yielded an exit status
    /// with neither code nor signal; `reason` describes what happened.
    Failed { reason: String },
}
636
637impl ProcStatus {
638 #[inline]
642 pub fn is_exit(&self) -> bool {
643 matches!(
644 self,
645 ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
646 )
647 }
648}
649
650impl std::fmt::Display for ProcStatus {
651 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652 match self {
653 ProcStatus::Starting => write!(f, "Starting"),
654 ProcStatus::Running { pid, started_at } => {
655 let uptime = started_at
656 .elapsed()
657 .map(|d| format!(" up {}", format_duration(d)))
658 .unwrap_or_default();
659 write!(f, "Running[{pid}]{uptime}")
660 }
661 ProcStatus::Ready {
662 pid,
663 started_at,
664 addr,
665 ..
666 } => {
667 let uptime = started_at
668 .elapsed()
669 .map(|d| format!(" up {}", format_duration(d)))
670 .unwrap_or_default();
671 write!(f, "Ready[{pid}] at {addr}{uptime}")
672 }
673 ProcStatus::Stopping { pid, started_at } => {
674 let uptime = started_at
675 .elapsed()
676 .map(|d| format!(" up {}", format_duration(d)))
677 .unwrap_or_default();
678 write!(f, "Stopping[{pid}]{uptime}")
679 }
680 ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
681 ProcStatus::Killed {
682 signal,
683 core_dumped,
684 } => {
685 if *core_dumped {
686 write!(f, "Killed(sig={signal}, core)")
687 } else {
688 write!(f, "Killed(sig={signal})")
689 }
690 }
691 ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
692 }
693 }
694}
695
/// Why waiting for a proc to become `Ready` failed
/// (see `BootstrapProcHandle::ready_inner`).
#[derive(Debug, Clone)]
pub enum ReadyError {
    /// The proc reached the contained terminal status before it was
    /// observed as `Ready`.
    Terminal(ProcStatus),
    /// The status watch channel closed while the proc was still in a
    /// non-terminal state.
    ChannelClosed,
}
704
705impl std::fmt::Display for ReadyError {
706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707 match self {
708 ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
709 ReadyError::ChannelClosed => write!(f, "status channel closed"),
710 }
711 }
712}
// Marker impl: `ReadyError` wraps no underlying error, so the default
// `source()` (returning `None`) is used.
impl std::error::Error for ReadyError {}
714
/// Cloneable handle to a proc launched as an OS child process.
///
/// All clones share the same state through `Arc`s and a watch channel, so
/// a transition made through one clone is visible through every other.
#[derive(Clone)]
pub struct BootstrapProcHandle {
    /// Id of the proc this handle refers to.
    proc_id: ProcId,
    /// Current lifecycle state; mutated only through `transition`.
    status: Arc<std::sync::Mutex<ProcStatus>>,
    /// The spawned child. `BootstrapProcManager::spawn_exit_monitor`
    /// takes it out (leaving `None`) in order to wait on it.
    child: Arc<std::sync::Mutex<Option<Child>>>,
    /// Forwarder for the child's stdout, if one was installed with
    /// `set_stream_monitors`.
    stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
    /// Forwarder for the child's stderr; the exit monitor uses the lines
    /// it returns as the `stderr_tail` of a `Stopped` status.
    stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
    /// Sender side of the status watch channel; every successful
    /// transition publishes the new status here.
    tx: tokio::sync::watch::Sender<ProcStatus>,
    /// Receiver side of the status watch channel; `watch()` hands out
    /// clones of it.
    rx: tokio::sync::watch::Receiver<ProcStatus>,
}
785
786impl fmt::Debug for BootstrapProcHandle {
787 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
788 let status = self.status.lock().expect("status mutex poisoned").clone();
789 f.debug_struct("BootstrapProcHandle")
790 .field("proc_id", &self.proc_id)
791 .field("status", &status)
792 .field("child", &"<child>")
793 .field("tx", &"<watch::Sender>")
794 .field("rx", &"<watch::Receiver>")
795 .finish()
798 }
799}
800
801impl BootstrapProcHandle {
807 pub fn new(proc_id: ProcId, child: Child) -> Self {
820 let (tx, rx) = watch::channel(ProcStatus::Starting);
821 Self {
822 proc_id,
823 status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
824 child: Arc::new(std::sync::Mutex::new(Some(child))),
825 stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
826 stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
827 tx,
828 rx,
829 }
830 }
831
832 #[inline]
834 pub fn proc_id(&self) -> &ProcId {
835 &self.proc_id
836 }
837
838 #[inline]
851 pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
852 self.rx.clone()
853 }
854
855 #[inline]
873 pub async fn changed(&self) {
874 let _ = self.watch().changed().await;
875 }
876
877 #[inline]
885 pub fn pid(&self) -> Option<u32> {
886 match *self.status.lock().expect("status mutex poisoned") {
887 ProcStatus::Running { pid, .. } | ProcStatus::Ready { pid, .. } => Some(pid),
888 _ => self
889 .child
890 .lock()
891 .expect("child mutex poisoned")
892 .as_ref()
893 .and_then(|c| c.id()),
894 }
895 }
896
897 #[must_use]
910 pub fn status(&self) -> ProcStatus {
911 self.status.lock().expect("status mutex poisoned").clone()
915 }
916
917 fn transition<F>(&self, f: F) -> bool
923 where
924 F: FnOnce(&mut ProcStatus) -> bool,
925 {
926 let mut guard = self.status.lock().expect("status mutex poisoned");
927 let _before = guard.clone();
928 let changed = f(&mut guard);
929 if changed {
930 let _ = self.tx.send(guard.clone());
932 }
933 changed
934 }
935
936 pub(crate) fn mark_running(&self, pid: u32, started_at: SystemTime) -> bool {
948 self.transition(|st| match *st {
949 ProcStatus::Starting => {
950 *st = ProcStatus::Running { pid, started_at };
951 true
952 }
953 _ => {
954 tracing::warn!(
955 "illegal transition: {:?} -> Running; leaving status unchanged",
956 *st
957 );
958 false
959 }
960 })
961 }
962
963 pub(crate) fn mark_ready(
975 &self,
976 pid: u32,
977 started_at: SystemTime,
978 addr: ChannelAddr,
979 agent: ActorRef<ProcMeshAgent>,
980 ) -> bool {
981 tracing::info!(proc_id = %self.proc_id, %addr, "{} running on pid {}", self.proc_id, pid);
982 self.transition(|st| match st {
983 ProcStatus::Starting | ProcStatus::Running { .. } => {
984 *st = ProcStatus::Ready {
985 pid,
986 started_at,
987 addr,
988 agent,
989 };
990 true
991 }
992 _ => {
993 tracing::warn!(
994 "illegal transition: {:?} -> Ready; leaving status unchanged",
995 st
996 );
997 false
998 }
999 })
1000 }
1001
1002 pub(crate) fn mark_stopping(&self) -> bool {
1006 let child_pid = self.child_pid_snapshot();
1008 let now = hyperactor::clock::RealClock.system_time_now();
1009
1010 self.transition(|st| match *st {
1011 ProcStatus::Running { pid, started_at } => {
1012 *st = ProcStatus::Stopping { pid, started_at };
1013 true
1014 }
1015 ProcStatus::Ready {
1016 pid, started_at, ..
1017 } => {
1018 *st = ProcStatus::Stopping { pid, started_at };
1019 true
1020 }
1021 ProcStatus::Starting => {
1022 if let Some(pid) = child_pid {
1023 *st = ProcStatus::Stopping {
1024 pid,
1025 started_at: now,
1026 };
1027 true
1028 } else {
1029 false
1030 }
1031 }
1032 _ => false,
1033 })
1034 }
1035
1036 pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
1039 self.transition(|st| match *st {
1040 ProcStatus::Starting
1041 | ProcStatus::Running { .. }
1042 | ProcStatus::Ready { .. }
1043 | ProcStatus::Stopping { .. } => {
1044 *st = ProcStatus::Stopped {
1045 exit_code,
1046 stderr_tail,
1047 };
1048 true
1049 }
1050 _ => {
1051 tracing::warn!(
1052 "illegal transition: {:?} -> Stopped; leaving status unchanged",
1053 *st
1054 );
1055 false
1056 }
1057 })
1058 }
1059
1060 pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1063 self.transition(|st| match *st {
1064 ProcStatus::Starting
1065 | ProcStatus::Running { .. }
1066 | ProcStatus::Ready { .. }
1067 | ProcStatus::Stopping { .. } => {
1068 *st = ProcStatus::Killed {
1069 signal,
1070 core_dumped,
1071 };
1072 true
1073 }
1074 _ => {
1075 tracing::warn!(
1076 "illegal transition: {:?} -> Killed; leaving status unchanged",
1077 *st
1078 );
1079 false
1080 }
1081 })
1082 }
1083
1084 pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1087 self.transition(|st| match *st {
1088 ProcStatus::Starting
1089 | ProcStatus::Running { .. }
1090 | ProcStatus::Ready { .. }
1091 | ProcStatus::Stopping { .. } => {
1092 *st = ProcStatus::Failed {
1093 reason: reason.into(),
1094 };
1095 true
1096 }
1097 _ => {
1098 tracing::warn!(
1099 "illegal transition: {:?} -> Failed; leaving status unchanged",
1100 *st
1101 );
1102 false
1103 }
1104 })
1105 }
1106
1107 #[must_use]
1126 pub async fn wait_inner(&self) -> ProcStatus {
1127 let mut rx = self.watch();
1128 loop {
1129 let st = rx.borrow().clone();
1130 if st.is_exit() {
1131 return st;
1132 }
1133 if rx.changed().await.is_err() {
1135 return st;
1136 }
1137 }
1138 }
1139
1140 pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1159 let mut rx = self.watch();
1160 loop {
1161 let st = rx.borrow().clone();
1162 match &st {
1163 ProcStatus::Ready { .. } => return Ok(()),
1164 s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1165 _non_terminal => {
1166 if rx.changed().await.is_err() {
1167 return Err(ReadyError::ChannelClosed);
1168 }
1169 }
1170 }
1171 }
1172 }
1173
1174 fn child_pid_snapshot(&self) -> Option<u32> {
1177 self.child
1178 .lock()
1179 .expect("child mutex poisoned")
1180 .as_ref()
1181 .and_then(|c| c.id())
1182 }
1183
1184 fn signalable_pid(&self) -> Option<i32> {
1189 match &*self.status.lock().expect("status mutex poisoned") {
1190 ProcStatus::Running { pid, .. }
1191 | ProcStatus::Ready { pid, .. }
1192 | ProcStatus::Stopping { pid, .. } => Some(*pid as i32),
1193 _ => self
1194 .child
1195 .lock()
1196 .expect("child mutex poisoned")
1197 .as_ref()
1198 .and_then(|c| c.id())
1199 .map(|p| p as i32),
1200 }
1201 }
1202
1203 fn send_signal(pid: i32, sig: i32) -> Result<(), anyhow::Error> {
1208 let rc = unsafe { libc::kill(pid, sig) };
1211 if rc == 0 {
1212 Ok(())
1213 } else {
1214 let e = std::io::Error::last_os_error();
1215 if let Some(libc::ESRCH) = e.raw_os_error() {
1216 Ok(())
1219 } else {
1220 Err(anyhow::anyhow!("kill({pid}, {sig}) failed: {e}"))
1221 }
1222 }
1223 }
1224
1225 pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1226 *self
1227 .stdout_fwder
1228 .lock()
1229 .expect("stdout_tailer mutex poisoned") = out;
1230 *self
1231 .stderr_fwder
1232 .lock()
1233 .expect("stderr_tailer mutex poisoned") = err;
1234 }
1235
1236 fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1237 let out = self
1238 .stdout_fwder
1239 .lock()
1240 .expect("stdout_tailer mutex poisoned")
1241 .take();
1242 let err = self
1243 .stderr_fwder
1244 .lock()
1245 .expect("stderr_tailer mutex poisoned")
1246 .take();
1247 (out, err)
1248 }
1249
1250 async fn send_stop_all(
1254 &self,
1255 cx: &impl context::Actor,
1256 agent: ActorRef<ProcMeshAgent>,
1257 timeout: Duration,
1258 ) -> anyhow::Result<ProcStatus> {
1259 let mut agent_port = agent.port();
1266 agent_port.return_undeliverable(false);
1267 agent_port.send(cx, resource::StopAll {})?;
1268 match RealClock.timeout(timeout, self.wait()).await {
1271 Ok(Ok(st)) => Ok(st),
1272 Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1273 Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1274 }
1275 }
1276}
1277
1278#[async_trait]
1279impl hyperactor::host::ProcHandle for BootstrapProcHandle {
1280 type Agent = ProcMeshAgent;
1281 type TerminalStatus = ProcStatus;
1282
1283 #[inline]
1284 fn proc_id(&self) -> &ProcId {
1285 &self.proc_id
1286 }
1287
1288 #[inline]
1289 fn addr(&self) -> Option<ChannelAddr> {
1290 match &*self.status.lock().expect("status mutex poisoned") {
1291 ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1292 _ => None,
1293 }
1294 }
1295
1296 #[inline]
1297 fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1298 match &*self.status.lock().expect("status mutex poisoned") {
1299 ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1300 _ => None,
1301 }
1302 }
1303
1304 async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
1315 match self.ready_inner().await {
1316 Ok(()) => Ok(()),
1317 Err(ReadyError::Terminal(status)) => {
1318 Err(hyperactor::host::ReadyError::Terminal(status))
1319 }
1320 Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
1321 }
1322 }
1323
1324 async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
1332 let status = self.wait_inner().await;
1333 if status.is_exit() {
1334 Ok(status)
1335 } else {
1336 Err(hyperactor::host::WaitError::ChannelClosed)
1337 }
1338 }
1339
1340 async fn terminate(
1370 &self,
1371 cx: &impl context::Actor,
1372 timeout: Duration,
1373 ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1374 const HARD_WAIT_AFTER_KILL: Duration = Duration::from_secs(5);
1375
1376 let st0 = self.status();
1378 if st0.is_exit() {
1379 tracing::debug!(?st0, "terminate(): already terminal");
1380 return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1381 }
1382
1383 let pid = self.signalable_pid().ok_or_else(|| {
1385 let st = self.status();
1386 tracing::warn!(?st, "terminate(): no signalable pid");
1387 hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1388 "no signalable pid (state: {:?})",
1389 st
1390 ))
1391 })?;
1392
1393 let agent = self.agent_ref();
1398 if let Some(agent) = agent {
1399 match self.send_stop_all(cx, agent.clone(), timeout).await {
1400 Ok(st) => return Ok(st),
1401 Err(e) => {
1402 tracing::warn!(
1404 "ProcMeshAgent {} could not successfully stop all actors: {}",
1405 agent.actor_id(),
1406 e,
1407 );
1408 }
1409 }
1410 }
1411 let _ = self.mark_stopping();
1414
1415 tracing::info!(pid, ?timeout, "terminate(): sending SIGTERM");
1417 if let Err(e) = Self::send_signal(pid, libc::SIGTERM) {
1418 tracing::warn!(pid, error=%e, "terminate(): SIGTERM delivery failed");
1419 return Err(hyperactor::host::TerminateError::Io(e));
1420 }
1421 tracing::debug!(pid, "terminate(): SIGTERM sent");
1422
1423 match RealClock.timeout(timeout, self.wait_inner()).await {
1425 Ok(st) if st.is_exit() => {
1426 tracing::info!(pid, ?st, "terminate(): exited after SIGTERM");
1427 Ok(st)
1428 }
1429 Ok(non_exit) => {
1430 tracing::warn!(pid, ?non_exit, "terminate(): wait returned non-terminal");
1433 Err(hyperactor::host::TerminateError::ChannelClosed)
1434 }
1435 Err(_elapsed) => {
1436 tracing::warn!(pid, "terminate(): timeout; escalating to SIGKILL");
1438 if let Some(pid2) = self.signalable_pid() {
1439 if let Err(e) = Self::send_signal(pid2, libc::SIGKILL) {
1440 tracing::warn!(pid=pid2, error=%e, "terminate(): SIGKILL delivery failed");
1441 return Err(hyperactor::host::TerminateError::Io(e));
1442 }
1443 tracing::info!(pid = pid2, "terminate(): SIGKILL sent");
1444 } else {
1445 tracing::warn!("terminate(): lost pid before SIGKILL escalation");
1446 }
1447 match RealClock
1449 .timeout(HARD_WAIT_AFTER_KILL, self.wait_inner())
1450 .await
1451 {
1452 Ok(st) if st.is_exit() => {
1453 tracing::info!(?st, "terminate(): exited after SIGKILL");
1454 Ok(st)
1455 }
1456 other => {
1457 tracing::warn!(
1458 ?other,
1459 "terminate(): post-KILL wait did not yield terminal"
1460 );
1461 Err(hyperactor::host::TerminateError::ChannelClosed)
1462 }
1463 }
1464 }
1465 }
1466 }
1467
1468 async fn kill(
1494 &self,
1495 ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1496 let st0 = self.status();
1504 if st0.is_exit() {
1505 return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1506 }
1507
1508 let pid = self.signalable_pid().ok_or_else(|| {
1510 hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1511 "no signalable pid (state: {:?})",
1512 self.status()
1513 ))
1514 })?;
1515
1516 if let Err(e) = Self::send_signal(pid, libc::SIGKILL) {
1517 return Err(hyperactor::host::TerminateError::Io(e));
1518 }
1519
1520 let st = self.wait_inner().await;
1522 if st.is_exit() {
1523 Ok(st)
1524 } else {
1525 Err(hyperactor::host::TerminateError::ChannelClosed)
1526 }
1527 }
1528}
1529
/// Description of how to launch a bootstrap process; turned into a
/// `tokio::process::Command` by [`BootstrapCommand::new`].
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct BootstrapCommand {
    /// Path of the executable to run.
    pub program: PathBuf,
    /// Optional override for the process's `argv[0]`.
    pub arg0: Option<String>,
    /// Arguments passed to the program, in order.
    pub args: Vec<String>,
    /// Environment variables set on the command.
    pub env: HashMap<String, String>,
}
wirevalue::register_type!(BootstrapCommand);
1539
1540impl BootstrapCommand {
1541 pub fn current() -> io::Result<Self> {
1544 let mut args: VecDeque<String> = std::env::args().collect();
1545 let arg0 = args.pop_front();
1546
1547 Ok(Self {
1548 program: std::env::current_exe()?,
1549 arg0,
1550 args: args.into(),
1551 env: std::env::vars().collect(),
1552 })
1553 }
1554
1555 pub fn new(&self) -> Command {
1558 let mut cmd = Command::new(&self.program);
1559 if let Some(arg0) = &self.arg0 {
1560 cmd.arg0(arg0);
1561 }
1562 for arg in &self.args {
1563 cmd.arg(arg);
1564 }
1565 for (k, v) in &self.env {
1566 cmd.env(k, v);
1567 }
1568 cmd
1569 }
1570
1571 #[cfg(test)]
1578 #[cfg(fbcode_build)]
1579 pub(crate) fn test() -> Self {
1580 Self {
1581 program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1582 arg0: None,
1583 args: vec![],
1584 env: HashMap::new(),
1585 }
1586 }
1587}
1588
1589impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1590 fn from(s: T) -> Self {
1592 Self {
1593 program: s.into(),
1594 arg0: None,
1595 args: vec![],
1596 env: HashMap::new(),
1597 }
1598 }
1599}
1600
/// Proc manager that runs each proc as an OS child process launched from
/// a [`BootstrapCommand`].
#[derive(Debug)]
pub struct BootstrapProcManager {
    /// Command used to launch procs.
    command: BootstrapCommand,
    /// Handles of procs known to this manager, keyed by proc id; consulted
    /// by `status()`.
    children: Arc<tokio::sync::Mutex<HashMap<ProcId, BootstrapProcHandle>>>,
    /// Pids of child processes, keyed by proc id. The exit monitor removes
    /// an entry once its process has been waited on; `Drop` sends
    /// `SIGKILL` to every pid still present.
    pid_table: Arc<std::sync::Mutex<HashMap<ProcId, u32>>>,
    /// File appender, created only when `MESH_ENABLE_FILE_CAPTURE` is set
    /// and creation succeeds.
    file_appender: Option<Arc<crate::logging::FileAppender>>,

    /// Temporary directory obtained from `runtime_dir()`; exposed through
    /// `socket_dir()`.
    /// NOTE(review): presumably holds the procs' unix sockets — confirm
    /// at the spawn site.
    socket_dir: TempDir,
}
1642
1643impl Drop for BootstrapProcManager {
1644 fn drop(&mut self) {
1661 if let Ok(table) = self.pid_table.lock() {
1662 for (proc_id, pid) in table.iter() {
1663 unsafe {
1671 libc::kill(*pid as i32, libc::SIGKILL);
1672 }
1673 tracing::info!(
1674 "BootstrapProcManager::drop: sent SIGKILL to pid {} for {:?}",
1675 pid,
1676 proc_id
1677 );
1678 }
1679 }
1680 }
1681}
1682
1683impl BootstrapProcManager {
1684 pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1691 let file_appender = if hyperactor_config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1692 match crate::logging::FileAppender::new() {
1693 Some(fm) => {
1694 tracing::info!("file appender created successfully");
1695 Some(Arc::new(fm))
1696 }
1697 None => {
1698 tracing::warn!("failed to create file appender");
1699 None
1700 }
1701 }
1702 } else {
1703 None
1704 };
1705
1706 Ok(Self {
1707 command,
1708 children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1709 pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
1710 file_appender,
1711 socket_dir: runtime_dir()?,
1712 })
1713 }
1714
1715 pub fn command(&self) -> &BootstrapCommand {
1717 &self.command
1718 }
1719
1720 pub fn socket_dir(&self) -> &Path {
1722 self.socket_dir.path()
1723 }
1724
1725 pub async fn status(&self, proc_id: &ProcId) -> Option<ProcStatus> {
1736 self.children.lock().await.get(proc_id).map(|h| h.status())
1737 }
1738
1739 fn spawn_exit_monitor(&self, proc_id: ProcId, handle: BootstrapProcHandle) {
1740 let pid_table = Arc::clone(&self.pid_table);
1741
1742 let maybe_child = {
1743 let mut guard = handle.child.lock().expect("child mutex");
1744 let taken = guard.take();
1745 debug_assert!(guard.is_none(), "no Child should remain in handles");
1746 taken
1747 };
1748
1749 let Some(mut child) = maybe_child else {
1750 tracing::debug!("proc {proc_id}: child was already taken; skipping wait");
1751 return;
1752 };
1753
1754 tokio::spawn(async move {
1755 let wait_res = child.wait().await;
1756
1757 let mut stderr_tail: Vec<String> = Vec::new();
1758 let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
1759
1760 if let Some(t) = stderr_mon {
1761 let (lines, _bytes) = t.abort().await;
1762 stderr_tail = lines;
1763 }
1764 if let Some(t) = stdout_mon {
1765 let (_lines, _bytes) = t.abort().await;
1766 }
1767
1768 let tail_str = if stderr_tail.is_empty() {
1769 None
1770 } else {
1771 Some(stderr_tail.join("\n"))
1772 };
1773
1774 let remove_from_pid_table = || {
1775 let pid = if let Ok(mut table) = pid_table.lock() {
1776 table.remove(&proc_id)
1777 } else {
1778 None
1779 };
1780
1781 pid.map_or_else(|| "not available".to_string(), |i| format!("{i}"))
1782 };
1783
1784 match wait_res {
1785 Ok(status) => {
1786 if let Some(sig) = status.signal() {
1787 let _ = handle.mark_killed(sig, status.core_dumped());
1788 let pid_str = remove_from_pid_table();
1789 tracing::info!(
1790 name = "ProcStatus",
1791 status = "Exited::KilledBySignal",
1792 %proc_id,
1793 tail = tail_str,
1794 "killed by signal {sig}; proc's pid: {pid_str}"
1795 );
1796 } else if let Some(code) = status.code() {
1797 let _ = handle.mark_stopped(code, stderr_tail);
1798 let pid_str = remove_from_pid_table();
1799 tracing::info!(
1800 name = "ProcStatus",
1801 status = "Exited::ExitWithCode",
1802 %proc_id,
1803 exit_code = code,
1804 tail = tail_str,
1805 "proc exited; proc's pid: {pid_str}"
1806 );
1807 } else {
1808 debug_assert!(
1809 false,
1810 "unreachable: process terminated with neither signal nor exit code"
1811 );
1812 let _ = handle.mark_failed("process exited with unknown status");
1813 let pid_str = remove_from_pid_table();
1814 tracing::info!(
1815 name = "ProcStatus",
1816 status = "Exited::Unknown",
1817 %proc_id,
1818 tail = tail_str,
1819 "unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}"
1820 );
1821 }
1822 }
1823 Err(e) => {
1824 let _ = handle.mark_failed(format!("wait_inner() failed: {e}"));
1825 let pid_str = remove_from_pid_table();
1826 tracing::info!(
1827 name = "ProcStatus",
1828 status = "Exited::WaitFailed",
1829 %proc_id,
1830 tail = tail_str,
1831 "proc {proc_id} wait failed; proc's pid: {pid_str}"
1832 );
1833 }
1834 }
1835 });
1836 }
1837}
1838
/// Per-spawn configuration accepted by `BootstrapProcManager::spawn`.
pub struct BootstrapProcConfig {
    /// Rank value handed to the stdout/stderr `StreamFwder`s started for the
    /// spawned child.
    pub create_rank: usize,

    /// Config overrides for this proc. `spawn` consults them (falling back to
    /// global config) for the log-forwarding, file-capture and tail-size
    /// settings, and ships them to the child inside `Bootstrap::Proc`.
    pub client_config_override: Attrs,
}
1848
1849#[async_trait]
1850impl ProcManager for BootstrapProcManager {
1851 type Handle = BootstrapProcHandle;
1852
1853 type Config = BootstrapProcConfig;
1854
1855 fn transport(&self) -> ChannelTransport {
1862 ChannelTransport::Unix
1863 }
1864
1865 #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string()))]
1892 async fn spawn(
1893 &self,
1894 proc_id: ProcId,
1895 backend_addr: ChannelAddr,
1896 config: BootstrapProcConfig,
1897 ) -> Result<Self::Handle, HostError> {
1898 let (callback_addr, mut callback_rx) =
1899 channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1900
1901 let overrides = &config.client_config_override;
1903 let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
1904 let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
1905 let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
1906 let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1907
1908 let mode = Bootstrap::Proc {
1909 proc_id: proc_id.clone(),
1910 backend_addr,
1911 callback_addr,
1912 socket_dir_path: self.socket_dir.path().to_owned(),
1913 config: Some(config.client_config_override),
1914 };
1915 let mut cmd = self.command.new();
1916 cmd.env(
1917 "HYPERACTOR_MESH_BOOTSTRAP_MODE",
1918 mode.to_env_safe_string()
1919 .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
1920 );
1921 cmd.env(
1922 "HYPERACTOR_PROCESS_NAME",
1923 format!(
1924 "proc {} @ {}",
1925 match &proc_id {
1926 ProcId::Direct(_, name) => name.clone(),
1927 ProcId::Ranked(world_id, rank) => format!("{world_id}[{rank}]"),
1928 },
1929 hostname::get()
1930 .unwrap_or_else(|_| "unknown_host".into())
1931 .into_string()
1932 .unwrap_or("unknown_host".to_string())
1933 ),
1934 );
1935
1936 if need_stdio {
1937 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1938 } else {
1939 cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
1940 tracing::info!(
1941 %proc_id, enable_forwarding, enable_file_capture, tail_size,
1942 "child stdio NOT captured (forwarding/file_capture/tail all disabled); inheriting parent console"
1943 );
1944 }
1945
1946 let log_channel: Option<ChannelAddr> = if enable_forwarding {
1947 let addr = ChannelAddr::any(ChannelTransport::Unix);
1948 cmd.env(BOOTSTRAP_LOG_CHANNEL, addr.to_string());
1949 Some(addr)
1950 } else {
1951 None
1952 };
1953
1954 let mut child = cmd
1955 .spawn()
1956 .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
1957 let pid = child.id().unwrap_or_default();
1958
1959 let (out_fwder, err_fwder) = if need_stdio {
1960 let stdout: ChildStdout = child.stdout.take().expect("stdout piped but missing");
1961 let stderr: ChildStderr = child.stderr.take().expect("stderr piped but missing");
1962
1963 let (file_stdout, file_stderr) = if enable_file_capture {
1964 match self.file_appender.as_deref() {
1965 Some(fm) => (
1966 Some(fm.addr_for(OutputTarget::Stdout)),
1967 Some(fm.addr_for(OutputTarget::Stderr)),
1968 ),
1969 None => {
1970 tracing::warn!("enable_file_capture=true but no FileAppender");
1971 (None, None)
1972 }
1973 }
1974 } else {
1975 (None, None)
1976 };
1977
1978 let out = StreamFwder::start(
1979 stdout,
1980 file_stdout, OutputTarget::Stdout,
1982 tail_size,
1983 log_channel.clone(), pid,
1985 config.create_rank,
1986 );
1987 let err = StreamFwder::start(
1988 stderr,
1989 file_stderr,
1990 OutputTarget::Stderr,
1991 tail_size,
1992 log_channel.clone(),
1993 pid,
1994 config.create_rank,
1995 );
1996 (Some(out), Some(err))
1997 } else {
1998 (None, None)
1999 };
2000
2001 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2002
2003 if let Some(pid) = handle.pid() {
2004 handle.mark_running(pid, hyperactor::clock::RealClock.system_time_now());
2005 if let Ok(mut table) = self.pid_table.lock() {
2006 table.insert(proc_id.clone(), pid);
2007 }
2008 }
2009
2010 handle.set_stream_monitors(out_fwder, err_fwder);
2011
2012 {
2014 let mut children = self.children.lock().await;
2015 children.insert(proc_id.clone(), handle.clone());
2016 }
2017
2018 self.spawn_exit_monitor(proc_id.clone(), handle.clone());
2021
2022 let h = handle.clone();
2023 let pid_table = Arc::clone(&self.pid_table);
2024 tokio::spawn(async move {
2025 match callback_rx.recv().await {
2026 Ok((addr, agent)) => {
2027 let pid = match h.pid() {
2028 Some(p) => p,
2029 None => {
2030 tracing::warn!("mark_ready called with missing pid; using 0");
2031 0
2032 }
2033 };
2034 let started_at = RealClock.system_time_now();
2035 let _ = h.mark_ready(pid, started_at, addr, agent);
2036 }
2037 Err(e) => {
2038 let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
2040 if let Ok(mut table) = pid_table.lock() {
2042 table.remove(&proc_id);
2043 }
2044 }
2045 }
2046 });
2047
2048 Ok(handle)
2050 }
2051}
2052
2053#[async_trait]
2054impl hyperactor::host::SingleTerminate for BootstrapProcManager {
2055 async fn terminate_proc(
2065 &self,
2066 cx: &impl context::Actor,
2067 proc: &ProcId,
2068 timeout: Duration,
2069 ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
2070 let proc_handle: Option<BootstrapProcHandle> = {
2072 let mut guard = self.children.lock().await;
2073 guard.remove(proc)
2074 };
2075
2076 if let Some(h) = proc_handle {
2077 h.terminate(cx, timeout)
2078 .await
2079 .map(|_| (Vec::new(), Vec::new()))
2080 .map_err(|e| e.into())
2081 } else {
2082 Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2083 }
2084 }
2085}
2086
2087#[async_trait]
2088impl hyperactor::host::BulkTerminate for BootstrapProcManager {
2089 async fn terminate_all(
2103 &self,
2104 cx: &impl context::Actor,
2105 timeout: Duration,
2106 max_in_flight: usize,
2107 ) -> TerminateSummary {
2108 let handles: Vec<BootstrapProcHandle> = {
2110 let guard = self.children.lock().await;
2111 guard.values().cloned().collect()
2112 };
2113
2114 let attempted = handles.len();
2115 let mut ok = 0usize;
2116
2117 let results = stream::iter(handles.into_iter().map(|h| async move {
2118 match h.terminate(cx, timeout).await {
2119 Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
2120 true
2122 }
2123 Err(e) => {
2124 tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2125 false
2126 }
2127 }
2128 }))
2129 .buffer_unordered(max_in_flight.max(1))
2130 .collect::<Vec<bool>>()
2131 .await;
2132
2133 for r in results {
2134 if r {
2135 ok += 1;
2136 }
2137 }
2138
2139 TerminateSummary {
2140 attempted,
2141 ok,
2142 failed: attempted.saturating_sub(ok),
2143 }
2144 }
2145}
2146
2147pub async fn bootstrap() -> anyhow::Error {
2162 let boot = ok!(Bootstrap::get_from_env()).unwrap_or_else(Bootstrap::default);
2163 boot.bootstrap().await
2164}
2165
/// Runs the V0 proc-mesh bootstrap protocol for this process.
///
/// If `config` is provided it is installed as the `ClientOverride` source of
/// the global config. The effective global attrs are then logged and the
/// protocol loop in the nested `go()` is run. The returned value is the
/// error that ended `go()`.
async fn bootstrap_v0_proc_mesh(config: Option<Attrs>) -> anyhow::Error {
    if let Some(attrs) = config {
        hyperactor_config::global::set(hyperactor_config::global::Source::ClientOverride, attrs);
        tracing::debug!("bootstrap: installed ClientOverride config snapshot (V0ProcMesh)");
    } else {
        tracing::debug!("bootstrap: no config snapshot provided (V0ProcMesh)");
    }
    tracing::info!(
        "bootstrap_v0_proc_mesh config:\n{}",
        hyperactor_config::global::attrs()
    );

    // Protocol body. It only ever returns through an error path; the
    // non-error exits below call `std::process::exit`.
    pub async fn go() -> Result<(), anyhow::Error> {
        // Procs started by this process, shared with the cleanup future so
        // it can destroy them too.
        let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
        let procs_for_cleanup = procs.clone();
        // NOTE(review): presumably the registered future runs on a
        // termination signal while the guard is alive — confirm against
        // `register_signal_cleanup_scoped`.
        let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
            for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
                if let Err(err) = proc_to_stop
                    .destroy_and_wait::<()>(Duration::from_millis(10), None)
                    .await
                {
                    tracing::error!(
                        "error while stopping proc {}: {}",
                        proc_to_stop.proc_id(),
                        err
                    );
                }
            }
        }));

        // The allocator's address and this process's index both come from
        // the environment; a missing or unparsable value is an error.
        let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
            .parse()?;
        let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
            .parse()?;
        // Listen on the same transport the allocator address uses.
        let listen_addr = ChannelAddr::any(bootstrap_addr.transport());

        let entered = tracing::span!(
            Level::INFO,
            "bootstrap_v0_proc_mesh",
            %bootstrap_addr,
            %bootstrap_index,
            %listen_addr,
        )
        .entered();

        let (serve_addr, mut rx) = channel::serve(listen_addr)?;
        let tx = channel::dial(bootstrap_addr.clone())?;

        // Announce ourselves to the allocator. `rtx` is the return path on
        // which an undelivered Hello comes back (see the select below).
        let (rtx, mut return_channel) = oneshot::channel();
        tx.try_post(
            Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
            rtx,
        );
        tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));

        let _ = entered.exit();

        let mut the_msg;

        // Wait for either the first allocator message or the return of the
        // Hello message.
        tokio::select! {
            msg = rx.recv() => {
                the_msg = msg;
            }
            returned_msg = &mut return_channel => {
                match returned_msg {
                    Ok(msg) => {
                        // The Hello came back to us: it was not delivered.
                        return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
                    }
                    Err(_) => {
                        // The return channel closed without handing the
                        // message back; fall through to a normal receive.
                        the_msg = rx.recv().await;
                    }
                }
            }
        }
        // Command loop: handle one allocator message per iteration. A
        // receive error ends the loop through `?`.
        loop {
            match the_msg? {
                Allocator2Process::StartProc(proc_id, listen_transport) => {
                    let span = tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport);
                    let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone())
                        .instrument(span.clone())
                        .await?;
                    // The span is entered only for the synchronous section
                    // and exited again before the next await.
                    let entered = span.entered();
                    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
                    let handle = proc.clone().serve(proc_rx);
                    drop(handle);
                    let span = entered.exit();
                    // Report the started proc, its agent and its address
                    // back to the allocator.
                    tx.send(Process2Allocator(
                        bootstrap_index,
                        Process2AllocatorMessage::StartedProc(
                            proc_id.clone(),
                            mesh_agent.bind(),
                            proc_addr,
                        ),
                    ))
                    .instrument(span)
                    .await?;
                    procs.lock().await.push(proc);
                }
                Allocator2Process::StopAndExit(code) => {
                    tracing::info!("stopping procs with code {code}");
                    {
                        // Destroy every proc started so far; failures are
                        // logged and do not prevent the exit.
                        for proc_to_stop in procs.lock().await.iter_mut() {
                            if let Err(err) = proc_to_stop
                                .destroy_and_wait::<()>(Duration::from_millis(10), None)
                                .await
                            {
                                tracing::error!(
                                    "error while stopping proc {}: {}",
                                    proc_to_stop.proc_id(),
                                    err
                                );
                            }
                        }
                    }
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
                Allocator2Process::Exit(code) => {
                    // Exit immediately, without stopping procs first.
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
            }
            the_msg = rx.recv().await;
        }
    }

    // `unwrap_err` would panic if `go()` ever returned `Ok`; the loop above
    // has no such path.
    go().await.unwrap_err()
}
2306
2307pub async fn bootstrap_or_die() -> ! {
2310 let err = bootstrap().await;
2311 let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2312 tracing::error!("failed to bootstrap mesh process: {}", err);
2313 std::process::exit(1)
2314}
2315
/// Destination for bootstrap debug output.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    /// Output is written to this open file.
    File(std::fs::File),
    /// Output is discarded.
    Sink,
}
2321
2322impl DebugSink {
2323 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2324 match self {
2325 DebugSink::File(f) => f.write(buf),
2326 DebugSink::Sink => Ok(buf.len()),
2327 }
2328 }
2329 fn flush(&mut self) -> io::Result<()> {
2330 match self {
2331 DebugSink::File(f) => f.flush(),
2332 DebugSink::Sink => Ok(()),
2333 }
2334 }
2335}
2336
2337fn debug_sink() -> &'static Mutex<DebugSink> {
2338 static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2339 DEBUG_SINK.get_or_init(|| {
2340 let debug_path = {
2341 let mut p = std::env::temp_dir();
2342 if let Ok(user) = std::env::var("USER") {
2343 p.push(user);
2344 }
2345 std::fs::create_dir_all(&p).ok();
2346 p.push("monarch-bootstrap-debug.log");
2347 p
2348 };
2349 let sink = if debug_path.exists() {
2350 match OpenOptions::new()
2351 .append(true)
2352 .create(true)
2353 .open(debug_path.clone())
2354 {
2355 Ok(f) => DebugSink::File(f),
2356 Err(_e) => {
2357 eprintln!(
2358 "failed to open {} for bootstrap debug logging",
2359 debug_path.display()
2360 );
2361 DebugSink::Sink
2362 }
2363 }
2364 } else {
2365 DebugSink::Sink
2366 };
2367 Mutex::new(sink)
2368 })
2369}
2370
/// When `true`, everything written through `Debug` is also copied to stderr
/// and `Debug::is_active` always reports `true`.
const DEBUG_TO_STDERR: bool = false;
2373
/// Unit-struct writer for bootstrap debug output; its `Write` impl forwards
/// to the shared sink returned by `debug_sink()`.
struct Debug;
2377
2378impl Debug {
2379 fn is_active() -> bool {
2380 DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2381 }
2382}
2383
2384impl Write for Debug {
2385 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2386 let res = debug_sink().lock().unwrap().write(buf);
2387 if DEBUG_TO_STDERR {
2388 let n = match res {
2389 Ok(n) => n,
2390 Err(_) => buf.len(),
2391 };
2392 let _ = io::stderr().write_all(&buf[..n]);
2393 }
2394
2395 res
2396 }
2397 fn flush(&mut self) -> io::Result<()> {
2398 let res = debug_sink().lock().unwrap().flush();
2399 if DEBUG_TO_STDERR {
2400 let _ = io::stderr().flush();
2401 }
2402 res
2403 }
2404}
2405
2406fn runtime_dir() -> io::Result<TempDir> {
2409 match std::env::var_os("XDG_RUNTIME_DIR") {
2410 Some(runtime_dir) => {
2411 let path = PathBuf::from(runtime_dir);
2412 tempfile::tempdir_in(path)
2413 }
2414 None => tempfile::tempdir(),
2415 }
2416}
2417
2418#[cfg(test)]
2419mod tests {
2420 use std::path::PathBuf;
2421 use std::process::Stdio;
2422
2423 use hyperactor::ActorId;
2424 use hyperactor::ActorRef;
2425 use hyperactor::ProcId;
2426 use hyperactor::RemoteSpawn;
2427 use hyperactor::WorldId;
2428 use hyperactor::channel::ChannelAddr;
2429 use hyperactor::channel::ChannelTransport;
2430 use hyperactor::channel::TcpMode;
2431 use hyperactor::clock::RealClock;
2432 use hyperactor::context::Mailbox as _;
2433 use hyperactor::host::ProcHandle;
2434 use hyperactor::id;
2435 use ndslice::Extent;
2436 use ndslice::ViewExt;
2437 use ndslice::extent;
2438 use tokio::process::Command;
2439
2440 use super::*;
2441 use crate::alloc::AllocSpec;
2442 use crate::alloc::Allocator;
2443 use crate::alloc::ProcessAllocator;
2444 use crate::v1::ActorMesh;
2445 use crate::v1::host_mesh::HostMesh;
2446 use crate::v1::testactor;
2447 use crate::v1::testing;
2448
/// Returns an arbitrary Unix-transport channel address for use in tests.
fn any_addr_for_test() -> ChannelAddr {
    ChannelAddr::any(ChannelTransport::Unix)
}
2454
/// The env-safe encoding of a `Bootstrap` without config must round-trip:
/// re-encoding the decoded value yields the same string, and the decoded
/// variant still carries `config: None`.
#[test]
fn test_bootstrap_mode_env_string_none_config_proc() {
    let proc_mode = Bootstrap::Proc {
        proc_id: id!(foo[0]),
        backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
        callback_addr: ChannelAddr::any(ChannelTransport::Unix),
        socket_dir_path: PathBuf::from("notexist"),
        config: None,
    };

    for original in [Bootstrap::default(), proc_mode] {
        let encoded = original.to_env_safe_string().unwrap();
        let decoded = Bootstrap::from_env_safe_string(&encoded).unwrap();

        let reencoded = decoded.to_env_safe_string().unwrap();
        assert_eq!(encoded, reencoded, "env-safe round-trip should be stable");

        // Only these two (original, decoded) pairings are acceptable.
        let same_variant = matches!(
            (&original, &decoded),
            (
                Bootstrap::Proc { config: None, .. },
                Bootstrap::Proc { config: None, .. }
            ) | (
                Bootstrap::V0ProcMesh { config: None },
                Bootstrap::V0ProcMesh { config: None }
            )
        );
        assert!(same_variant, "decoded variant mismatch: got {:?}", decoded);
    }
}
2488
/// A `Bootstrap::Host` without command or config must survive the env-safe
/// encode/decode round-trip with `config` still `None`.
#[test]
fn test_bootstrap_mode_env_string_none_config_host() {
    let host_mode = Bootstrap::Host {
        addr: ChannelAddr::any(ChannelTransport::Unix),
        command: None,
        config: None,
    };

    let encoded = host_mode.to_env_safe_string().unwrap();
    let decoded = Bootstrap::from_env_safe_string(&encoded).unwrap();

    // Re-encoding the decoded value must give back the same string.
    let reencoded = decoded.to_env_safe_string().unwrap();
    assert_eq!(encoded, reencoded);

    if !matches!(&decoded, Bootstrap::Host { config: None, .. }) {
        panic!("expected Host with None config, got {:?}", decoded);
    }
}
2510
/// Decoding a string that is not a valid env-safe encoding must return an
/// error instead of a `Bootstrap`.
#[test]
fn test_bootstrap_mode_env_string_invalid() {
    assert!(Bootstrap::from_env_safe_string("!!!").is_err());
}
2516
/// A config snapshot attached to any `Bootstrap` variant must come back
/// unchanged after an env-safe encode/decode round-trip.
#[test]
fn test_bootstrap_config_snapshot_roundtrip() {
    let mut attrs = Attrs::new();
    attrs[MESH_TAIL_LOG_LINES] = 123;
    attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;

    let socket_dir = runtime_dir().unwrap();

    // Encode to the env-safe string and decode it again.
    let roundtrip = |original: Bootstrap| {
        let env_str = original.to_env_safe_string().expect("encode bootstrap");
        Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap")
    };
    // The decoded snapshot must hold exactly the values set above.
    let assert_snapshot = |config: &Option<Attrs>| {
        let cfg = config.as_ref().expect("expected Some(attrs)");
        assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
        assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
    };

    // Proc variant.
    let decoded = roundtrip(Bootstrap::Proc {
        proc_id: id!(foo[42]),
        backend_addr: ChannelAddr::any(ChannelTransport::Unix),
        callback_addr: ChannelAddr::any(ChannelTransport::Unix),
        config: Some(attrs.clone()),
        socket_dir_path: socket_dir.path().to_owned(),
    });
    match &decoded {
        Bootstrap::Proc { config, .. } => assert_snapshot(config),
        other => panic!("unexpected variant after roundtrip: {:?}", other),
    }

    // Host variant.
    let decoded = roundtrip(Bootstrap::Host {
        addr: ChannelAddr::any(ChannelTransport::Unix),
        command: None,
        config: Some(attrs.clone()),
    });
    match &decoded {
        Bootstrap::Host { config, .. } => assert_snapshot(config),
        other => panic!("unexpected variant after roundtrip: {:?}", other),
    }

    // V0ProcMesh variant.
    let decoded = roundtrip(Bootstrap::V0ProcMesh {
        config: Some(attrs.clone()),
    });
    match &decoded {
        Bootstrap::V0ProcMesh { config } => assert_snapshot(config),
        other => panic!("unexpected variant after roundtrip: {:?}", other),
    }
}
2583
2584 #[tokio::test]
2585 async fn test_child_terminated_on_manager_drop() {
2586 use std::path::PathBuf;
2587 use std::process::Stdio;
2588
2589 use tokio::process::Command;
2590
2591 let command = BootstrapCommand {
2593 program: PathBuf::from("/bin/true"),
2594 ..Default::default()
2595 };
2596 let manager = BootstrapProcManager::new(command).unwrap();
2597
2598 let mut cmd = Command::new("/bin/sleep");
2601 cmd.arg("30")
2602 .stdout(Stdio::null())
2603 .stderr(Stdio::null())
2604 .kill_on_drop(true);
2605
2606 let child = cmd.spawn().expect("spawn sleep");
2607 let pid = child.id().expect("pid");
2608
2609 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "test".to_string());
2612 {
2613 let mut children = manager.children.lock().await;
2614 children.insert(
2615 proc_id.clone(),
2616 BootstrapProcHandle::new(proc_id.clone(), child),
2617 );
2618 }
2619
2620 #[cfg(target_os = "linux")]
2622 {
2623 let path = format!("/proc/{}", pid);
2624 assert!(
2625 std::fs::metadata(&path).is_ok(),
2626 "expected /proc/{pid} to exist before drop"
2627 );
2628 }
2629
2630 drop(manager);
2635
2636 #[cfg(target_os = "linux")]
2639 {
2640 let deadline = std::time::Instant::now() + Duration::from_millis(1500);
2641 let proc_dir = format!("/proc/{}", pid);
2642 let status_file = format!("{}/status", proc_dir);
2643
2644 let mut ok = false;
2645 while std::time::Instant::now() < deadline {
2646 match std::fs::read_to_string(&status_file) {
2647 Ok(s) => {
2648 if let Some(state_line) = s.lines().find(|l| l.starts_with("State:")) {
2649 if state_line.contains('Z') {
2650 ok = true;
2652 break;
2653 } else {
2654 }
2656 }
2657 }
2658 Err(_) => {
2659 ok = true;
2661 break;
2662 }
2663 }
2664 RealClock.sleep(Duration::from_millis(100)).await;
2665 }
2666
2667 assert!(ok, "expected /proc/{pid} to be gone or zombie after drop");
2668 }
2669
2670 }
2673
/// End-to-end check of the log path: a `LogMessage` posted on the bootstrap
/// log channel must show up on the test tap within two seconds.
#[tokio::test]
async fn test_v1_child_logging() {
    use hyperactor::channel;
    use hyperactor::mailbox::BoxedMailboxSender;
    use hyperactor::mailbox::DialMailboxRouter;
    use hyperactor::mailbox::MailboxServer;
    use hyperactor::proc::Proc;

    use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
    use crate::logging::LogClientActor;
    use crate::logging::LogClientMessageClient;
    use crate::logging::LogForwardActor;
    use crate::logging::LogMessage;
    use crate::logging::OutputTarget;
    use crate::logging::test_tap;

    // Set up a client proc that is served on a Unix channel and bound in
    // the router under its own id.
    let router = DialMailboxRouter::new();
    let (proc_addr, proc_rx) =
        channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
    let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
    proc.clone().serve(proc_rx);
    router.bind(id!(client[0]).into(), proc_addr.clone());
    let (client, _handle) = proc.instance("client").unwrap();

    // Install the tap that receives the log lines this test asserts on.
    let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    test_tap::install(tap_tx);

    // Publish the log channel address through the environment before the
    // log actors are created.
    let log_channel = ChannelAddr::any(ChannelTransport::Unix);
    // NOTE(review): this mutates the process-wide environment; assumes no
    // other thread reads or writes the environment concurrently — confirm.
    unsafe {
        std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
    }

    // Log client actor, with aggregation set to `None`.
    let log_client_actor = LogClientActor::new(()).await.unwrap();
    let log_client: ActorRef<LogClientActor> =
        proc.spawn("log_client", log_client_actor).unwrap().bind();
    log_client.set_aggregate(&client, None).await.unwrap();

    // Log forwarder actor, pointed at the log client.
    let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap();
    let _log_forwarder: ActorRef<LogForwardActor> = proc
        .spawn("log_forwarder", log_forwarder_actor)
        .unwrap()
        .bind();

    // Dial the log channel and post one stdout line.
    let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();

    tx.post(LogMessage::Log {
        hostname: "testhost".into(),
        pid: 12345,
        output_target: OutputTarget::Stdout,
        payload: wirevalue::Any::serialize(&"hello from child".to_string()).unwrap(),
    });

    // The line must reach the tap before the timeout.
    let line = RealClock
        .timeout(Duration::from_secs(2), tap_rx.recv())
        .await
        .expect("timed out waiting for log line")
        .expect("tap channel closed unexpectedly");
    assert!(
        line.contains("hello from child"),
        "log line did not appear via LogClientActor; got: {line}"
    );
}
2746
/// Tests for the status transitions exposed by `BootstrapProcHandle`'s
/// `mark_*` methods. Each `mark_*` call returns whether the transition was
/// applied.
mod proc_handle {

    use hyperactor::ActorId;
    use hyperactor::ActorRef;
    use hyperactor::ProcId;
    use hyperactor::WorldId;
    use hyperactor::host::ProcHandle;
    use tokio::process::Command;

    use super::super::*;
    use super::any_addr_for_test;

    /// Builds a handle around a real, short-lived child (`sh -c "exit 0"`)
    /// so that a pid is available to the tests.
    fn handle_for_test() -> BootstrapProcHandle {
        let child = Command::new("sh")
            .arg("-c")
            .arg("exit 0")
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .spawn()
            .expect("failed to spawn test child process");

        let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
        BootstrapProcHandle::new(proc_id, child)
    }

    /// A fresh handle is `Starting`; `mark_running` moves it to `Running`
    /// carrying the given pid and start time.
    #[tokio::test]
    async fn starting_to_running_ok() {
        let h = handle_for_test();
        assert!(matches!(h.status(), ProcStatus::Starting));
        let child_pid = h.pid().expect("child should have a pid");
        let child_started_at = RealClock.system_time_now();
        assert!(h.mark_running(child_pid, child_started_at));
        match h.status() {
            ProcStatus::Running { pid, started_at } => {
                assert_eq!(pid, child_pid);
                assert_eq!(started_at, child_started_at);
            }
            other => panic!("expected Running, got {other:?}"),
        }
    }

    /// Running -> Stopping -> Stopped is accepted, and the exit code is kept.
    #[tokio::test]
    async fn running_to_stopping_to_stopped_ok() {
        let h = handle_for_test();
        let child_pid = h.pid().expect("child should have a pid");
        let child_started_at = RealClock.system_time_now();
        assert!(h.mark_running(child_pid, child_started_at));
        assert!(h.mark_stopping());
        assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
        assert!(h.mark_stopped(0, Vec::new()));
        assert!(matches!(
            h.status(),
            ProcStatus::Stopped { exit_code: 0, .. }
        ));
    }

    /// Running -> Killed is accepted and records signal and core-dump flag.
    #[tokio::test]
    async fn running_to_killed_ok() {
        let h = handle_for_test();
        let child_pid = h.pid().expect("child should have a pid");
        let child_started_at = RealClock.system_time_now();
        assert!(h.mark_running(child_pid, child_started_at));
        assert!(h.mark_killed(9, true));
        assert!(matches!(
            h.status(),
            ProcStatus::Killed {
                signal: 9,
                core_dumped: true
            }
        ));
    }

    /// Running -> Failed is accepted and keeps the reason string.
    #[tokio::test]
    async fn running_to_failed_ok() {
        let h = handle_for_test();
        let child_pid = h.pid().expect("child should have a pid");
        let child_started_at = RealClock.system_time_now();
        assert!(h.mark_running(child_pid, child_started_at));
        assert!(h.mark_failed("bootstrap error"));
        match h.status() {
            ProcStatus::Failed { reason } => {
                assert_eq!(reason, "bootstrap error");
            }
            other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
        }
    }

    /// Transitions that are not allowed return `false` and leave the status
    /// untouched.
    #[tokio::test]
    async fn illegal_transitions_are_rejected() {
        let h = handle_for_test();
        let child_pid = h.pid().expect("child should have a pid");
        let child_started_at = RealClock.system_time_now();
        assert!(h.mark_running(child_pid, child_started_at));
        // Running -> Running is rejected and the status stays Running.
        assert!(!h.mark_running(child_pid, RealClock.system_time_now()));
        match h.status() {
            ProcStatus::Running { pid, .. } => assert_eq!(pid, child_pid),
            other => panic!("expected Running, got {other:?}"),
        }
        // Once Stopped, no further transition is accepted.
        assert!(h.mark_stopping());
        assert!(h.mark_stopped(0, Vec::new()));
        assert!(!h.mark_running(child_pid, child_started_at));
        assert!(!h.mark_killed(9, false));
        assert!(!h.mark_failed("nope"));

        assert!(matches!(
            h.status(),
            ProcStatus::Stopped { exit_code: 0, .. }
        ));
    }

    /// Running -> Ready -> Stopping -> Stopped is accepted.
    #[tokio::test]
    async fn transitions_from_ready_are_legal() {
        let h = handle_for_test();
        let addr = any_addr_for_test();
        let pid = h.pid().expect("child should have a pid");
        let t0 = RealClock.system_time_now();
        assert!(h.mark_running(pid, t0));
        // Fabricate an agent reference in the handle's own proc.
        let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
        let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
        let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
        assert!(h.mark_ready(pid, t0, addr, agent_ref));
        assert!(h.mark_stopping());
        assert!(h.mark_stopped(0, Vec::new()));
    }

    /// Ready -> Killed is accepted.
    #[tokio::test]
    async fn ready_to_killed_is_legal() {
        let h = handle_for_test();
        let addr = any_addr_for_test();
        let pid = h.pid().expect("child should have a pid");
        let t0 = RealClock.system_time_now();
        assert!(h.mark_running(pid, t0));
        // Fabricate an agent reference in the handle's own proc.
        let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
        let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
        let agent: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
        assert!(h.mark_ready(pid, t0, addr, agent));
        assert!(h.mark_killed(9, false));
    }

    /// From `Starting`, `mark_stopping` succeeds while the child's pid is
    /// available, and the `Stopping` status carries that pid.
    #[tokio::test]
    async fn mark_stopping_from_starting_uses_child_pid_when_available() {
        let h = handle_for_test();

        let child_pid = h
            .pid()
            .expect("Child::id() should be available in Starting");

        assert!(
            h.mark_stopping(),
            "mark_stopping() should succeed from Starting"
        );
        match h.status() {
            ProcStatus::Stopping { pid, started_at } => {
                assert_eq!(pid, child_pid, "Stopping pid should come from Child::id()");
                assert!(
                    started_at <= RealClock.system_time_now(),
                    "started_at should be sane"
                );
            }
            other => panic!("expected Stopping{{..}}; got {other:?}"),
        }
    }

    /// From `Starting`, `mark_stopping` is a no-op once the child has been
    /// taken out of the handle.
    #[tokio::test]
    async fn mark_stopping_noop_when_no_child_pid_available() {
        let h = handle_for_test();

        {
            // Take the Child out of the handle and drop it.
            let _ = h.child.lock().expect("child mutex").take();
        }

        assert!(
            !h.mark_stopping(),
            "mark_stopping() should no-op from Starting when no pid is observable"
        );
        assert!(matches!(h.status(), ProcStatus::Starting));
    }

    /// Stopping -> Failed is accepted and keeps the reason string.
    #[tokio::test]
    async fn mark_failed_from_stopping_is_allowed() {
        let h = handle_for_test();

        assert!(h.mark_stopping(), "precondition: to Stopping");

        assert!(
            h.mark_failed("boom"),
            "mark_failed() should succeed from Stopping"
        );
        match h.status() {
            ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
            other => panic!("expected Failed(\"boom\"), got {other:?}"),
        }
    }
}
2969
2970 #[tokio::test]
2971 async fn test_exit_monitor_updates_status_on_clean_exit() {
2972 let command = BootstrapCommand {
2973 program: PathBuf::from("/bin/true"),
2974 ..Default::default()
2975 };
2976 let manager = BootstrapProcManager::new(command).unwrap();
2977
2978 let mut cmd = Command::new("true");
2980 cmd.stdout(Stdio::null()).stderr(Stdio::null());
2981 let child = cmd.spawn().expect("spawn true");
2982
2983 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "clean".into());
2984 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2985
2986 {
2988 let mut children = manager.children.lock().await;
2989 children.insert(proc_id.clone(), handle.clone());
2990 }
2991 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2992 {
2993 let guard = handle.child.lock().expect("child mutex");
2994 assert!(
2995 guard.is_none(),
2996 "expected Child to be taken by exit monitor"
2997 );
2998 }
2999
3000 let st = handle.wait_inner().await;
3001 assert!(matches!(st, ProcStatus::Stopped { .. }), "status={st:?}");
3002 }
3003
3004 #[tokio::test]
3005 async fn test_exit_monitor_updates_status_on_kill() {
3006 let command = BootstrapCommand {
3007 program: PathBuf::from("/bin/sleep"),
3008 ..Default::default()
3009 };
3010 let manager = BootstrapProcManager::new(command).unwrap();
3011
3012 let mut cmd = Command::new("/bin/sleep");
3014 cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3015 let child = cmd.spawn().expect("spawn sleep");
3016 let pid = child.id().expect("pid") as i32;
3017
3018 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "killed".into());
3020 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3021 {
3022 let mut children = manager.children.lock().await;
3023 children.insert(proc_id.clone(), handle.clone());
3024 }
3025 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3026 {
3027 let guard = handle.child.lock().expect("child mutex");
3028 assert!(
3029 guard.is_none(),
3030 "expected Child to be taken by exit monitor"
3031 );
3032 }
3033 #[cfg(unix)]
3035 unsafe {
3043 libc::kill(pid, libc::SIGKILL);
3044 }
3045
3046 let st = handle.wait_inner().await;
3047 match st {
3048 ProcStatus::Killed { signal, .. } => assert_eq!(signal, libc::SIGKILL),
3049 other => panic!("expected Killed(SIGKILL), got {other:?}"),
3050 }
3051 }
3052
3053 #[tokio::test]
3054 async fn watch_notifies_on_status_changes() {
3055 let child = Command::new("sh")
3056 .arg("-c")
3057 .arg("sleep 0.1")
3058 .stdin(std::process::Stdio::null())
3059 .stdout(std::process::Stdio::null())
3060 .stderr(std::process::Stdio::null())
3061 .spawn()
3062 .expect("spawn");
3063
3064 let proc_id = ProcId::Ranked(WorldId("test".into()), 1);
3065 let handle = BootstrapProcHandle::new(proc_id, child);
3066 let mut rx = handle.watch();
3067
3068 let pid = handle.pid().unwrap_or(0);
3070 let now = RealClock.system_time_now();
3071 assert!(handle.mark_running(pid, now));
3072 rx.changed().await.ok(); match &*rx.borrow() {
3074 ProcStatus::Running { pid: p, started_at } => {
3075 assert_eq!(*p, pid);
3076 assert_eq!(*started_at, now);
3077 }
3078 s => panic!("expected Running, got {s:?}"),
3079 }
3080
3081 assert!(handle.mark_stopped(0, Vec::new()));
3083 rx.changed().await.ok(); assert!(matches!(
3085 &*rx.borrow(),
3086 ProcStatus::Stopped { exit_code: 0, .. }
3087 ));
3088 }
3089
3090 #[tokio::test]
3091 async fn ready_errs_if_process_exits_before_running() {
3092 let child = Command::new("sh")
3094 .arg("-c")
3095 .arg("exit 7")
3096 .stdin(std::process::Stdio::null())
3097 .stdout(std::process::Stdio::null())
3098 .stderr(std::process::Stdio::null())
3099 .spawn()
3100 .expect("spawn");
3101
3102 let proc_id = ProcId::Direct(
3103 ChannelAddr::any(ChannelTransport::Unix),
3104 "early-exit".into(),
3105 );
3106 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3107
3108 assert!(handle.mark_stopped(7, Vec::new()));
3111
3112 match handle.ready_inner().await {
3114 Ok(()) => panic!("ready() unexpectedly succeeded"),
3115 Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
3116 assert_eq!(exit_code, 7)
3117 }
3118 Err(other) => panic!("expected Stopped(7), got {other:?}"),
3119 }
3120 }
3121
3122 #[tokio::test]
3123 async fn status_unknown_proc_is_none() {
3124 let manager = BootstrapProcManager::new(BootstrapCommand {
3125 program: PathBuf::from("/bin/true"),
3126 ..Default::default()
3127 })
3128 .unwrap();
3129 let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
3130 assert!(manager.status(&unknown).await.is_none());
3131 }
3132
3133 #[tokio::test]
3134 async fn exit_monitor_child_already_taken_leaves_status_unchanged() {
3135 let manager = BootstrapProcManager::new(BootstrapCommand {
3136 program: PathBuf::from("/bin/sleep"),
3137 ..Default::default()
3138 })
3139 .unwrap();
3140
3141 let mut cmd = Command::new("/bin/sleep");
3143 cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3144 let child = cmd.spawn().expect("spawn sleep");
3145
3146 let proc_id = ProcId::Direct(
3147 ChannelAddr::any(ChannelTransport::Unix),
3148 "already-taken".into(),
3149 );
3150 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3151
3152 {
3155 let mut children = manager.children.lock().await;
3156 children.insert(proc_id.clone(), handle.clone());
3157 }
3158 {
3159 let _ = handle.child.lock().expect("child mutex").take();
3160 }
3161
3162 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3164
3165 assert!(matches!(
3168 manager.status(&proc_id).await,
3169 Some(ProcStatus::Starting)
3170 ));
3171 }
3172
3173 #[tokio::test]
3174 async fn pid_none_after_exit_monitor_takes_child() {
3175 let manager = BootstrapProcManager::new(BootstrapCommand {
3176 program: PathBuf::from("/bin/sleep"),
3177 ..Default::default()
3178 })
3179 .unwrap();
3180
3181 let mut cmd = Command::new("/bin/sleep");
3182 cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3183 let child = cmd.spawn().expect("spawn sleep");
3184
3185 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "pid-none".into());
3186 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3187
3188 assert!(handle.pid().is_some());
3190 {
3191 let mut children = manager.children.lock().await;
3192 children.insert(proc_id.clone(), handle.clone());
3193 }
3194 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3197
3198 assert!(handle.pid().is_none());
3201 }
3202
3203 #[tokio::test]
3204 async fn starting_may_directly_be_marked_stopped() {
3205 let child = Command::new("sh")
3208 .arg("-c")
3209 .arg("exit 0")
3210 .stdin(std::process::Stdio::null())
3211 .stdout(std::process::Stdio::null())
3212 .stderr(std::process::Stdio::null())
3213 .spawn()
3214 .expect("spawn true");
3215
3216 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "fast-exit".into());
3217 let handle = BootstrapProcHandle::new(proc_id, child);
3218
3219 let mut c = {
3221 let mut guard = handle.child.lock().expect("child mutex");
3222 guard.take()
3223 }
3224 .expect("child already taken");
3225
3226 let status = c.wait().await.expect("wait");
3227 let code = status.code().unwrap_or(0);
3228 assert!(handle.mark_stopped(code, Vec::new()));
3229
3230 assert!(matches!(
3231 handle.status(),
3232 ProcStatus::Stopped { exit_code: 0, .. }
3233 ));
3234 }
3235
3236 #[tokio::test]
3237 async fn handle_ready_allows_waiters() {
3238 let child = Command::new("sh")
3239 .arg("-c")
3240 .arg("sleep 0.1")
3241 .stdin(std::process::Stdio::null())
3242 .stdout(std::process::Stdio::null())
3243 .stderr(std::process::Stdio::null())
3244 .spawn()
3245 .expect("spawn sleep");
3246 let proc_id = ProcId::Ranked(WorldId("test".into()), 42);
3247 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3248
3249 let pid = handle.pid().expect("child should have a pid");
3250 let started_at = RealClock.system_time_now();
3251 assert!(handle.mark_running(pid, started_at));
3252
3253 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3254 let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
3255
3256 let ready_addr = any_addr_for_test();
3259
3260 assert!(handle.mark_ready(pid, started_at, ready_addr.clone(), agent_ref));
3262 handle
3263 .ready_inner()
3264 .await
3265 .expect("ready_inner() should complete after Ready");
3266
3267 match handle.status() {
3270 ProcStatus::Ready {
3271 pid: p,
3272 started_at: t,
3273 addr: a,
3274 ..
3275 } => {
3276 assert_eq!(p, pid);
3277 assert_eq!(t, started_at);
3278 assert_eq!(a, ready_addr);
3279 }
3280 other => panic!("expected Ready, got {other:?}"),
3281 }
3282 }
3283
3284 #[tokio::test]
3285 async fn pid_behavior_across_states_running_ready_then_stopped() {
3286 let child = Command::new("sh")
3287 .arg("-c")
3288 .arg("sleep 0.1")
3289 .stdin(std::process::Stdio::null())
3290 .stdout(std::process::Stdio::null())
3291 .stderr(std::process::Stdio::null())
3292 .spawn()
3293 .expect("spawn");
3294
3295 let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
3296 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3297
3298 let pid = handle.pid().expect("initial Child::id");
3300 let t0 = RealClock.system_time_now();
3301 assert!(handle.mark_running(pid, t0));
3302 assert_eq!(handle.pid(), Some(pid));
3303
3304 let addr = any_addr_for_test();
3306 let agent = {
3307 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3308 ActorRef::<ProcMeshAgent>::attest(actor_id)
3309 };
3310 assert!(handle.mark_ready(pid, t0, addr, agent));
3311 {
3312 let _ = handle.child.lock().expect("child mutex").take();
3313 }
3314 assert_eq!(handle.pid(), Some(pid));
3315
3316 assert!(handle.mark_stopped(0, Vec::new()));
3318 assert_eq!(handle.pid(), None, "pid() should be None once terminal");
3319 }
3320
3321 #[tokio::test]
3322 async fn pid_is_available_in_ready_even_after_child_taken() {
3323 let child = Command::new("sh")
3324 .arg("-c")
3325 .arg("sleep 0.1")
3326 .stdin(std::process::Stdio::null())
3327 .stdout(std::process::Stdio::null())
3328 .stderr(std::process::Stdio::null())
3329 .spawn()
3330 .expect("spawn");
3331
3332 let proc_id = ProcId::Ranked(WorldId("test".into()), 99);
3333 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3334
3335 let pid = handle.pid().expect("child should have pid (via Child::id)");
3337 let started_at = RealClock.system_time_now();
3338 assert!(handle.mark_running(pid, started_at));
3339
3340 let addr = any_addr_for_test();
3342 let agent = {
3343 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3344 ActorRef::<ProcMeshAgent>::attest(actor_id)
3345 };
3346 assert!(handle.mark_ready(pid, started_at, addr, agent));
3347
3348 {
3351 let _ = handle.child.lock().expect("child mutex").take();
3352 }
3353
3354 assert_eq!(handle.pid(), Some(pid), "pid() should be cached in Ready");
3357 }
3358
/// The `Display` output of `ProcStatus::Running` mentions the pid, the
/// variant name, and an uptime of `42s` for a start time 42 seconds ago.
#[test]
fn display_running_includes_pid_and_uptime() {
    let status = ProcStatus::Running {
        pid: 1234,
        started_at: RealClock.system_time_now() - Duration::from_secs(42),
    };

    let rendered = status.to_string();
    for needle in ["1234", "Running", "42s"] {
        assert!(rendered.contains(needle));
    }
}
3372
/// The `Display` output of `ProcStatus::Ready` mentions the pid, the
/// address, and the variant name.
#[test]
fn display_ready_includes_pid_and_addr() {
    let addr = ChannelAddr::any(ChannelTransport::Unix);
    let started_at = RealClock.system_time_now() - Duration::from_secs(5);
    let agent = ActorRef::attest(ProcId::Direct(addr.clone(), "proc".into()).actor_id("agent", 0));

    let status = ProcStatus::Ready {
        pid: 4321,
        started_at,
        addr: addr.clone(),
        agent,
    };

    let rendered = status.to_string();
    assert!(rendered.contains("4321"));
    assert!(rendered.contains(&addr.to_string()));
    assert!(rendered.contains("Ready"));
}
3392
/// The `Display` output of `ProcStatus::Stopped` mentions the variant name
/// and the exit code.
#[test]
fn display_stopped_includes_exit_code() {
    let rendered = ProcStatus::Stopped {
        exit_code: 7,
        stderr_tail: Vec::new(),
    }
    .to_string();

    assert!(rendered.contains("Stopped"));
    assert!(rendered.contains('7'));
}
3403
/// Formats a sample of `ProcStatus` variants with `Display`, checking only
/// that formatting completes; the output is discarded.
#[test]
fn display_other_variants_does_not_panic() {
    let samples = [
        ProcStatus::Starting,
        ProcStatus::Stopping {
            pid: 42,
            started_at: RealClock.system_time_now(),
        },
        ProcStatus::Ready {
            pid: 42,
            started_at: RealClock.system_time_now(),
            addr: ChannelAddr::any(ChannelTransport::Unix),
            agent: ActorRef::attest(
                ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "x".into())
                    .actor_id("agent", 0),
            ),
        },
        ProcStatus::Killed {
            signal: 9,
            core_dumped: false,
        },
        ProcStatus::Failed {
            reason: "boom".into(),
        },
    ];

    for status in samples {
        // Only the absence of a panic matters here.
        let _ = status.to_string();
    }
}
3434
3435 #[tokio::test]
3436 async fn proc_handle_ready_ok_through_trait() {
3437 let child = Command::new("sh")
3438 .arg("-c")
3439 .arg("sleep 0.1")
3440 .stdin(Stdio::null())
3441 .stdout(Stdio::null())
3442 .stderr(Stdio::null())
3443 .spawn()
3444 .expect("spawn");
3445
3446 let proc_id = ProcId::Direct(any_addr_for_test(), "ph-ready-ok".into());
3447 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3448
3449 let pid = handle.pid().expect("pid");
3451 let t0 = RealClock.system_time_now();
3452 assert!(handle.mark_running(pid, t0));
3453
3454 let addr = any_addr_for_test();
3456 let agent: ActorRef<ProcMeshAgent> =
3457 ActorRef::attest(ActorId(proc_id.clone(), "agent".into(), 0));
3458 assert!(handle.mark_ready(pid, t0, addr, agent));
3459
3460 let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3462 assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3463 }
3464
3465 #[tokio::test]
3466 async fn proc_handle_wait_returns_terminal_status() {
3467 let child = Command::new("sh")
3468 .arg("-c")
3469 .arg("exit 0")
3470 .stdin(Stdio::null())
3471 .stdout(Stdio::null())
3472 .stderr(Stdio::null())
3473 .spawn()
3474 .expect("spawn");
3475
3476 let proc_id = ProcId::Direct(any_addr_for_test(), "ph-wait".into());
3477 let handle = BootstrapProcHandle::new(proc_id, child);
3478
3479 assert!(handle.mark_stopped(0, Vec::new()));
3481
3482 let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3484 .await
3485 .expect("wait should return Ok(terminal)");
3486
3487 match st {
3488 ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3489 other => panic!("expected Stopped(0), got {other:?}"),
3490 }
3491 }
3492
3493 #[tokio::test]
3494 async fn ready_wrapper_maps_terminal_to_trait_error() {
3495 let child = Command::new("sh")
3496 .arg("-c")
3497 .arg("exit 7")
3498 .stdin(Stdio::null())
3499 .stdout(Stdio::null())
3500 .stderr(Stdio::null())
3501 .spawn()
3502 .expect("spawn");
3503 let proc_id = ProcId::Direct(any_addr_for_test(), "wrap".into());
3504 let handle = BootstrapProcHandle::new(proc_id, child);
3505
3506 assert!(handle.mark_stopped(7, Vec::new()));
3507
3508 match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3509 Ok(()) => panic!("expected Err"),
3510 Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3511 exit_code, ..
3512 })) => {
3513 assert_eq!(exit_code, 7);
3514 }
3515 Err(e) => panic!("unexpected error: {e:?}"),
3516 }
3517 }
3518
3519 async fn make_proc_id_and_backend_addr(
3529 instance: &hyperactor::Instance<()>,
3530 _tag: &str,
3531 ) -> (ProcId, ChannelAddr) {
3532 let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3535
3536 instance.proc().clone().serve(rx);
3540
3541 let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
3544 (proc_id, backend_addr)
3545 }
3546
/// Spawns a bootstrap child through the manager, waits for it to become
/// ready, then calls `terminate` with a 2 second deadline.
///
/// The call must finish within twice that deadline and report either
/// `Stopped` with exit code 0 or `Killed` with `SIGTERM`.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_terminate_graceful() {
    let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
    let (instance, _handle) = root.instance("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
    let config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
    };
    let handle = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), config)
        .await
        .expect("spawn bootstrap child");

    handle.ready().await.expect("ready");

    let deadline = Duration::from_secs(2);
    // The outer timeout guards against `terminate()` itself never returning.
    match RealClock
        .timeout(deadline * 2, handle.terminate(&instance, deadline))
        .await
    {
        Err(_) => panic!("terminate() future hung"),
        Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
        Ok(Ok(ProcStatus::Stopped { exit_code, .. })) => {
            assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
        }
        Ok(Ok(ProcStatus::Killed { signal, .. })) => {
            assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
        }
        Ok(Ok(other)) => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
    }
}
3609
/// Spawns a bootstrap child through the manager, waits for it to become
/// ready, then calls `kill()`.
///
/// The call must finish within 5 seconds and report `Killed` with
/// `SIGKILL`.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_kill_forced() {
    let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string()).unwrap();
    let (instance, _handle) = root.instance("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();

    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;

    let config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
    };
    let handle = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), config)
        .await
        .expect("spawn bootstrap child");

    handle.ready().await.expect("ready");

    let deadline = Duration::from_secs(5);
    // The timeout guards against `kill()` itself never returning.
    match RealClock.timeout(deadline, handle.kill()).await {
        Err(_) => panic!("kill() future hung"),
        Ok(Err(e)) => panic!("kill() failed: {e:?}"),
        Ok(Ok(ProcStatus::Killed { signal, .. })) => {
            assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {signal}");
        }
        Ok(Ok(other)) => panic!("expected Killed status after kill(); got: {other:?}"),
    }
}
3658
/// End-to-end check: allocates one replica with a `ProcessAllocator`, builds
/// a host mesh on it, spawns a proc mesh and a `TestActor` mesh, casts
/// `GetActorId` to the actors, and compares the reply with the mesh's first
/// actor id before shutting the host mesh down.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_canonical_simple() {
    // NOTE(review): mutating the process environment is only sound when no
    // other thread reads or writes it concurrently — TODO confirm for this
    // test binary.
    unsafe {
        std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
    }
    let instance = testing::instance();

    let bootstrap_cmd = Command::new(crate::testresource::get(
        "monarch/hyperactor_mesh/bootstrap",
    ));
    let mut allocator = ProcessAllocator::new(bootstrap_cmd);
    let spec = AllocSpec {
        extent: extent!(replicas = 1),
        constraints: Default::default(),
        proc_name: None,
        transport: ChannelTransport::Unix,
        proc_allocation_mode: Default::default(),
    };
    let allocation = allocator.allocate(spec).await.unwrap();

    let mut host_mesh = HostMesh::allocate(&instance, Box::new(allocation), "test", None)
        .await
        .unwrap();

    let proc_mesh = host_mesh
        .spawn(&instance, "p0", Extent::unity())
        .await
        .unwrap();

    let actor_mesh: ActorMesh<testactor::TestActor> =
        proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

    // Ask the actors for their id and read the reply from a fresh port.
    let (reply_port, mut replies) = instance.mailbox().open_port();
    actor_mesh
        .cast(&instance, testactor::GetActorId(reply_port.bind()))
        .unwrap();
    let reported_id = replies.recv().await.unwrap();
    assert_eq!(
        reported_id,
        actor_mesh.values().next().unwrap().actor_id().clone()
    );

    host_mesh.shutdown(&instance).await.expect("host shutdown");
}
3773
3774 #[tokio::test]
3775 async fn exit_tail_is_attached_and_logged() {
3776 hyperactor_telemetry::initialize_logging_for_test();
3777
3778 let lock = hyperactor_config::global::lock();
3779 let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);
3780
3781 let mut cmd = Command::new("sh");
3783 cmd.arg("-c")
3784 .arg("printf 'boom-1\\nboom-2\\n' 1>&2; exit 7")
3785 .stdout(Stdio::piped())
3786 .stderr(Stdio::piped());
3787
3788 let child = cmd.spawn().expect("spawn");
3789 let pid = child.id().unwrap_or_default();
3790
3791 let proc_id = hyperactor::id!(testproc[0]);
3794 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3795
3796 {
3799 let mut guard = handle.child.lock().expect("child mutex poisoned");
3801 if let Some(child) = guard.as_mut() {
3802 let out = child.stdout.take().expect("child stdout must be piped");
3803 let err = child.stderr.take().expect("child stderr must be piped");
3804
3805 let log_channel = ChannelAddr::any(ChannelTransport::Unix);
3807 let tail_size = hyperactor_config::global::get(MESH_TAIL_LOG_LINES);
3808
3809 let stdout_monitor = StreamFwder::start(
3811 out,
3812 None,
3813 OutputTarget::Stdout,
3814 tail_size,
3815 Some(log_channel.clone()),
3816 pid,
3817 0,
3818 );
3819
3820 let stderr_monitor = StreamFwder::start(
3821 err,
3822 None,
3823 OutputTarget::Stderr,
3824 tail_size,
3825 Some(log_channel.clone()),
3826 pid,
3827 0,
3828 );
3829
3830 handle.set_stream_monitors(Some(stdout_monitor), Some(stderr_monitor));
3832 tracing::info!("set stream monitors");
3833 } else {
3834 panic!("child already taken before wiring tailers");
3835 }
3836 }
3837
3838 let manager = BootstrapProcManager::new(BootstrapCommand {
3841 program: std::path::PathBuf::from("/bin/true"), ..Default::default()
3843 })
3844 .unwrap();
3845
3846 RealClock.sleep(Duration::from_millis(1000)).await;
3848
3849 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3851
3852 let st = RealClock
3855 .timeout(Duration::from_secs(2), handle.wait_inner())
3856 .await
3857 .expect("wait_inner() timed out (exit monitor stuck?)");
3858
3859 match st {
3860 ProcStatus::Stopped {
3861 exit_code,
3862 stderr_tail,
3863 } => {
3864 assert_eq!(
3865 exit_code, 7,
3866 "unexpected exit code; stderr_tail={:?}",
3867 stderr_tail
3868 );
3869 let tail = stderr_tail.join("\n");
3870 assert!(tail.contains("boom-1"), "missing boom-1; tail:\n{tail}");
3871 assert!(tail.contains("boom-2"), "missing boom-2; tail:\n{tail}");
3872 }
3873 other => panic!("expected Stopped(7), got {other:?}"),
3874 }
3875 }
3876
/// Starts a host with the test bootstrap command, fetches its local proc
/// through a temporary client instance, and creates a new client instance on
/// that proc. Each step must succeed; nothing further is asserted.
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_host_bootstrap() {
    use crate::proc_mesh::mesh_agent::NewClientInstanceClient;
    use crate::v1::host_mesh::mesh_agent::GetLocalProcClient;

    // A throwaway local proc provides the instance used to issue requests.
    let temp_proc = Proc::local();
    let (temp_instance, _) = temp_proc.instance("temp").unwrap();

    let host_handle = host(any_addr_for_test(), Some(BootstrapCommand::test()), None)
        .await
        .unwrap();

    let local_proc = host_handle.get_local_proc(&temp_instance).await.unwrap();
    let _local_instance = local_proc
        .new_client_instance(&temp_instance)
        .await
        .unwrap();
}
3898}