1use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::env::VarError;
12use std::fmt;
13use std::fs::OpenOptions;
14use std::future;
15use std::io;
16use std::io::Write;
17use std::os::unix::process::ExitStatusExt;
18use std::path::PathBuf;
19use std::process::Stdio;
20use std::sync::Arc;
21use std::sync::Mutex;
22use std::sync::OnceLock;
23use std::time::Duration;
24use std::time::SystemTime;
25
26use async_trait::async_trait;
27use base64::prelude::*;
28use futures::StreamExt;
29use futures::stream;
30use humantime::format_duration;
31use hyperactor::ActorRef;
32use hyperactor::Named;
33use hyperactor::ProcId;
34use hyperactor::attrs::Attrs;
35use hyperactor::channel;
36use hyperactor::channel::ChannelAddr;
37use hyperactor::channel::ChannelTransport;
38use hyperactor::channel::Rx;
39use hyperactor::channel::Tx;
40use hyperactor::clock::Clock;
41use hyperactor::clock::RealClock;
42use hyperactor::config::CONFIG_ENV_VAR;
43use hyperactor::config::global as config;
44use hyperactor::declare_attrs;
45use hyperactor::host;
46use hyperactor::host::Host;
47use hyperactor::host::HostError;
48use hyperactor::host::ProcHandle;
49use hyperactor::host::ProcManager;
50use hyperactor::host::TerminateSummary;
51use hyperactor::mailbox::MailboxServer;
52use hyperactor::proc::Proc;
53use serde::Deserialize;
54use serde::Serialize;
55use tokio::process::Child;
56use tokio::process::Command;
57use tokio::sync::oneshot;
58use tokio::sync::watch;
59
60use crate::alloc::logtailer::LogTailer;
61use crate::logging::create_log_writers;
62use crate::proc_mesh::mesh_agent::ProcMeshAgent;
63use crate::v1;
64use crate::v1::host_mesh::mesh_agent::HostAgentMode;
65use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
66
declare_attrs! {
    // Gates the parent-death safety net: when true (the default),
    // `Bootstrap::bootstrap` calls `install_pdeathsig_kill` on the
    // `Bootstrap::Proc` path; when false it only prints a notice to stderr.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string())
    pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;

    // Line budget handed to each `LogTailer` when
    // `BootstrapProcManager::spawn` tees a child's stdout/stderr
    // (default: 100).
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TAIL_LOG_LINES".to_string())
    pub attr MESH_TAIL_LOG_LINES: usize = 100;

    // NOTE(review): not read in this part of the file; presumably the
    // default `max_in_flight` for bulk termination -- confirm at the caller.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string())
    pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;

    // NOTE(review): not read in this part of the file; presumably the
    // default per-child termination timeout -- confirm at the caller.
    @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string())
    pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
}
96
/// Environment variable read by the v0 proc-mesh bootstrap path and parsed
/// as the allocator's bootstrap [`ChannelAddr`].
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
/// Environment variable name for the bootstrap index.
/// NOTE(review): presumably the index this process reports back to the
/// allocator -- the reader is outside this part of the file.
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
/// Environment variable name for the client trace id.
/// NOTE(review): not used in this part of the file; semantics to be confirmed.
pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";
/// Environment variable that `BootstrapProcManager::spawn` sets on each
/// child to the string form of a freshly chosen Unix log channel address.
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";
104
/// Envelope for messages sent from a bootstrapped process to its allocator.
///
/// Field `0` is the sender's bootstrap index (see
/// `exit_if_missed_heartbeat`, which fills it with `bootstrap_index`);
/// field `1` is the actual message.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
110
/// Payloads a bootstrapped process can send to its allocator.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Process2AllocatorMessage {
    /// Initial greeting carrying a channel address.
    /// NOTE(review): presumably the address on which the process accepts
    /// `Allocator2Process` messages -- confirm against the v0 bootstrap loop.
    Hello(ChannelAddr),

    /// Reports a started proc: its id, a reference to its `ProcMeshAgent`,
    /// and a channel address (presumably the proc's address -- confirm).
    StartedProc(ProcId, ActorRef<ProcMeshAgent>, ChannelAddr),

    /// Liveness ping; sent every five seconds by `exit_if_missed_heartbeat`.
    Heartbeat,
}
127
/// Commands sent from the allocator to a bootstrapped process.
///
/// NOTE(review): the handler for these messages is outside this part of the
/// file; the per-variant descriptions below are inferred from the names and
/// payload types and should be confirmed there.
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
pub(crate) enum Allocator2Process {
    /// Request to start a proc with the given id, using the given transport.
    StartProc(ProcId, ChannelTransport),

    /// Presumably: stop running procs, then exit with the given code.
    StopAndExit(i32),

    /// Presumably: exit immediately with the given code.
    Exit(i32),
}
143
144async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
145 let tx = match channel::dial(bootstrap_addr.clone()) {
146 Ok(tx) => tx,
147
148 Err(err) => {
149 tracing::error!(
150 "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
151 bootstrap_addr,
152 err
153 );
154 std::process::exit(1);
155 }
156 };
157 tracing::info!(
158 "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
159 );
160 loop {
161 RealClock.sleep(Duration::from_secs(5)).await;
162
163 let result = tx
164 .send(Process2Allocator(
165 bootstrap_index,
166 Process2AllocatorMessage::Heartbeat,
167 ))
168 .await;
169
170 if let Err(err) = result {
171 tracing::error!(
172 "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
173 bootstrap_addr,
174 err
175 );
176 std::process::exit(1);
177 }
178 }
179}
180
/// Unwraps a `Result` inside a function whose return type is
/// `anyhow::Error` itself (not `Result`).
///
/// On `Ok(value)` the macro evaluates to `value`. On `Err(e)` it makes the
/// enclosing function `return` the error converted with
/// `::anyhow::Error::from(e)`. This mirrors `?` for the bootstrap entry
/// points in this file, which only ever return when something went wrong.
#[macro_export]
macro_rules! ok {
    ($expr:expr $(,)?) => {
        match $expr {
            Ok(value) => value,
            // Early-return the error as the function's (non-Result) return value.
            Err(e) => return ::anyhow::Error::from(e),
        }
    };
}
190
/// Parks the calling task forever.
///
/// The returned future is never ready, so the advertised result type `R`
/// is never actually produced; it exists only so that `halt().await` can
/// stand in for a value of any type in the caller.
async fn halt<R>() -> R {
    future::pending::<R>().await
}
195
/// The mode in which a mesh process bootstraps itself.
///
/// A value of this type is passed to child processes through the
/// `HYPERACTOR_MESH_BOOTSTRAP_MODE` environment variable (base64-wrapped
/// JSON, see `to_env_safe_string` / `get_from_env`).
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub enum Bootstrap {
    /// Bootstrap as a single proc via `host::spawn_proc`.
    Proc {
        /// Id of the proc to run.
        proc_id: ProcId,
        /// Backend address handed to `host::spawn_proc`.
        backend_addr: ChannelAddr,
        /// Callback address handed to `host::spawn_proc`; the spawning
        /// manager listens on it for the proc's address and agent.
        callback_addr: ChannelAddr,
        /// Optional config snapshot; when present it is installed as the
        /// `Runtime` config source before anything else happens.
        config: Option<Attrs>,
    },

    /// Bootstrap as a host serving at `addr`.
    Host {
        /// Address on which the host is served.
        addr: ChannelAddr,
        /// Command used to launch procs on this host; when `None`,
        /// `BootstrapCommand::current()` is used.
        command: Option<BootstrapCommand>,
        /// Optional config snapshot; when present it is installed as the
        /// `Runtime` config source before anything else happens.
        config: Option<Attrs>,
    },

    /// Default mode: run the v0 proc-mesh bootstrap (`bootstrap_v0_proc_mesh`).
    #[default]
    V0ProcMesh,
}
240
241impl Bootstrap {
242 fn to_env_safe_string(&self) -> v1::Result<String> {
245 Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
246 }
247
248 fn from_env_safe_string(str: &str) -> v1::Result<Self> {
250 let data = BASE64_STANDARD.decode(str)?;
251 let data = std::str::from_utf8(&data)?;
252 Ok(serde_json::from_str(data)?)
253 }
254
255 pub fn get_from_env() -> anyhow::Result<Option<Self>> {
258 match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
259 Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
260 Ok(mode) => Ok(Some(mode)),
261 Err(e) => {
262 Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
263 }
264 },
265 Err(VarError::NotPresent) => Ok(None),
266 Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
267 }
268 }
269
270 pub fn to_env(&self, cmd: &mut Command) {
272 cmd.env(
273 "HYPERACTOR_MESH_BOOTSTRAP_MODE",
274 self.to_env_safe_string().unwrap(),
275 );
276 }
277
278 pub async fn bootstrap(self) -> anyhow::Error {
281 tracing::info!(
282 "bootstrapping mesh process: {}",
283 serde_json::to_string(&self).unwrap()
284 );
285
286 if Debug::is_active() {
287 let mut buf = Vec::new();
288 writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
289 #[cfg(unix)]
290 writeln!(
291 &mut buf,
292 "\tparent pid: {}",
293 std::os::unix::process::parent_id()
294 )
295 .unwrap();
296 writeln!(
297 &mut buf,
298 "\tconfig: {}",
299 serde_json::to_string(&self).unwrap()
300 )
301 .unwrap();
302 match std::env::current_exe() {
303 Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
304 Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
305 }
306 writeln!(&mut buf, "\targs:").unwrap();
307 for arg in std::env::args() {
308 writeln!(&mut buf, "\t\t{}", arg).unwrap();
309 }
310 writeln!(&mut buf, "\tenv:").unwrap();
311 for (key, val) in std::env::vars() {
312 writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
313 }
314 let _ = Debug.write(&buf);
315 if let Ok(s) = std::str::from_utf8(&buf) {
316 tracing::info!("{}", s);
317 } else {
318 tracing::info!("{:?}", buf);
319 }
320 }
321
322 match self {
323 Bootstrap::Proc {
324 proc_id,
325 backend_addr,
326 callback_addr,
327 config,
328 } => {
329 if let Some(attrs) = config {
330 config::set(config::Source::Runtime, attrs);
331 tracing::debug!("bootstrap: installed Runtime config snapshot (Proc)");
332 } else {
333 tracing::debug!("bootstrap: no config snapshot provided (Proc)");
334 }
335
336 if hyperactor::config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
337 let _ = install_pdeathsig_kill();
342 } else {
343 eprintln!("(bootstrap) PDEATHSIG disabled via config");
344 }
345
346 let result =
347 host::spawn_proc(proc_id, backend_addr, callback_addr, |proc| async move {
348 ProcMeshAgent::boot_v1(proc).await
349 })
350 .await;
351 match result {
352 Ok(_proc) => halt().await,
353 Err(e) => e.into(),
354 }
355 }
356 Bootstrap::Host {
357 addr,
358 command,
359 config,
360 } => {
361 if let Some(attrs) = config {
362 config::set(config::Source::Runtime, attrs);
363 tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
364 } else {
365 tracing::debug!("bootstrap: no config snapshot provided (Host)");
366 }
367
368 let command = match command {
369 Some(command) => command,
370 None => ok!(BootstrapCommand::current()),
371 };
372 let manager = BootstrapProcManager::new(command);
373 let (host, _handle) = ok!(Host::serve(manager, addr).await);
374 let addr = host.addr().clone();
375 let host_mesh_agent = ok!(host
376 .system_proc()
377 .clone()
378 .spawn::<HostMeshAgent>("agent", HostAgentMode::Process(host))
379 .await);
380
381 tracing::info!(
382 "serving host at {}, agent: {}",
383 addr,
384 host_mesh_agent.bind::<HostMeshAgent>()
385 );
386 halt().await
387 }
388 Bootstrap::V0ProcMesh => bootstrap_v0_proc_mesh().await,
389 }
390 }
391
392 pub async fn bootstrap_or_die(self) -> ! {
395 let err = self.bootstrap().await;
396 tracing::error!("failed to bootstrap mesh process: {}", err);
397 std::process::exit(1)
398 }
399}
400
401pub fn install_pdeathsig_kill() -> io::Result<()> {
403 #[cfg(target_os = "linux")]
404 {
405 let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
408 if rc != 0 {
409 return Err(io::Error::last_os_error());
410 }
411 }
412 if unsafe { libc::getppid() } == 1 {
419 std::process::exit(0);
420 }
421 Ok(())
422}
423
/// Lifecycle state of a bootstrapped child proc, as tracked by
/// `BootstrapProcHandle`.
///
/// `Stopped`, `Killed` and `Failed` are the terminal states (see
/// `ProcStatus::is_exit`); the `mark_*` transitions on the handle never
/// leave a terminal state.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProcStatus {
    /// Initial state of a freshly created handle.
    Starting,
    /// The child process exists: its OS pid and the time it was marked running.
    Running { pid: u32, started_at: SystemTime },
    /// The child called back with its address and agent reference.
    Ready {
        /// OS pid of the child.
        pid: u32,
        /// Timestamp recorded when the proc was marked ready.
        started_at: SystemTime,
        /// Address reported by the child through the bootstrap callback.
        addr: ChannelAddr,
        /// Agent reference reported by the child through the bootstrap callback.
        agent: ActorRef<ProcMeshAgent>,
    },
    /// Termination has been requested but the child has not exited yet.
    Stopping { pid: u32, started_at: SystemTime },
    /// Terminal: the child exited with an exit code.
    Stopped {
        /// Exit code reported by the OS.
        exit_code: i32,
        /// Trailing stderr lines collected by the stderr tailer (may be empty).
        stderr_tail: Vec<String>,
    },
    /// Terminal: the child was terminated by a signal.
    Killed { signal: i32, core_dumped: bool },
    /// Terminal: the proc failed for the given reason (e.g. waiting on the
    /// child failed, or the bootstrap callback failed).
    Failed { reason: String },
}
474
475impl ProcStatus {
476 #[inline]
480 pub fn is_exit(&self) -> bool {
481 matches!(
482 self,
483 ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
484 )
485 }
486}
487
488impl std::fmt::Display for ProcStatus {
489 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
490 match self {
491 ProcStatus::Starting => write!(f, "Starting"),
492 ProcStatus::Running { pid, started_at } => {
493 let uptime = started_at
494 .elapsed()
495 .map(|d| format!(" up {}", format_duration(d)))
496 .unwrap_or_default();
497 write!(f, "Running[{pid}]{uptime}")
498 }
499 ProcStatus::Ready {
500 pid,
501 started_at,
502 addr,
503 ..
504 } => {
505 let uptime = started_at
506 .elapsed()
507 .map(|d| format!(" up {}", format_duration(d)))
508 .unwrap_or_default();
509 write!(f, "Ready[{pid}] at {addr}{uptime}")
510 }
511 ProcStatus::Stopping { pid, started_at } => {
512 let uptime = started_at
513 .elapsed()
514 .map(|d| format!(" up {}", format_duration(d)))
515 .unwrap_or_default();
516 write!(f, "Stopping[{pid}]{uptime}")
517 }
518 ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
519 ProcStatus::Killed {
520 signal,
521 core_dumped,
522 } => {
523 if *core_dumped {
524 write!(f, "Killed(sig={signal}, core)")
525 } else {
526 write!(f, "Killed(sig={signal})")
527 }
528 }
529 ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
530 }
531 }
532}
533
/// Reasons why waiting for a proc to become `Ready` can fail
/// (see `BootstrapProcHandle::ready_inner`).
#[derive(Debug, Clone)]
pub enum ReadyError {
    /// The proc reached the contained terminal status before becoming ready.
    Terminal(ProcStatus),
    /// The status watch channel was closed while waiting.
    ChannelClosed,
}
542
543impl std::fmt::Display for ReadyError {
544 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545 match self {
546 ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
547 ReadyError::ChannelClosed => write!(f, "status channel closed"),
548 }
549 }
550}
551impl std::error::Error for ReadyError {}
552
/// Cloneable handle to one bootstrapped child proc.
///
/// All clones share the same status, child, tailers (through `Arc`s) and
/// the same watch channel, so a transition made through one clone is
/// observed by all of them.
#[derive(Clone)]
pub struct BootstrapProcHandle {
    /// Id of the proc this handle refers to.
    proc_id: ProcId,
    /// Current lifecycle status; only modified through `transition`.
    status: Arc<std::sync::Mutex<ProcStatus>>,
    /// The OS child process; `Some` until the exit monitor takes it in
    /// order to wait on it.
    child: Arc<std::sync::Mutex<Option<Child>>>,
    /// Tailer for the child's stdout; set by `set_tailers`, consumed by
    /// `take_tailers`.
    stdout_tailer: Arc<std::sync::Mutex<Option<LogTailer>>>,
    /// Tailer for the child's stderr; set by `set_tailers`, consumed by
    /// `take_tailers`.
    stderr_tailer: Arc<std::sync::Mutex<Option<LogTailer>>>,
    /// Sender side of the status watch channel; every successful
    /// transition publishes the new status here.
    tx: tokio::sync::watch::Sender<ProcStatus>,
    /// Receiver side of the status watch channel; cloned by `watch()`.
    rx: tokio::sync::watch::Receiver<ProcStatus>,
}
623
624impl fmt::Debug for BootstrapProcHandle {
625 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
626 let status = self.status.lock().expect("status mutex poisoned").clone();
627 f.debug_struct("BootstrapProcHandle")
628 .field("proc_id", &self.proc_id)
629 .field("status", &status)
630 .field("child", &"<child>")
631 .field("tx", &"<watch::Sender>")
632 .field("rx", &"<watch::Receiver>")
633 .finish()
636 }
637}
638
639impl BootstrapProcHandle {
645 pub fn new(proc_id: ProcId, child: Child) -> Self {
658 let (tx, rx) = watch::channel(ProcStatus::Starting);
659 Self {
660 proc_id,
661 status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
662 child: Arc::new(std::sync::Mutex::new(Some(child))),
663 stdout_tailer: Arc::new(std::sync::Mutex::new(None)),
664 stderr_tailer: Arc::new(std::sync::Mutex::new(None)),
665 tx,
666 rx,
667 }
668 }
669
670 #[inline]
672 pub fn proc_id(&self) -> &ProcId {
673 &self.proc_id
674 }
675
676 #[inline]
689 pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
690 self.rx.clone()
691 }
692
693 #[inline]
711 pub async fn changed(&self) {
712 let _ = self.watch().changed().await;
713 }
714
715 #[inline]
723 pub fn pid(&self) -> Option<u32> {
724 match *self.status.lock().expect("status mutex poisoned") {
725 ProcStatus::Running { pid, .. } | ProcStatus::Ready { pid, .. } => Some(pid),
726 _ => self
727 .child
728 .lock()
729 .expect("child mutex poisoned")
730 .as_ref()
731 .and_then(|c| c.id()),
732 }
733 }
734
735 #[must_use]
748 pub fn status(&self) -> ProcStatus {
749 self.status.lock().expect("status mutex poisoned").clone()
753 }
754
755 fn transition<F>(&self, f: F) -> bool
761 where
762 F: FnOnce(&mut ProcStatus) -> bool,
763 {
764 let mut guard = self.status.lock().expect("status mutex poisoned");
765 let _before = guard.clone();
766 let changed = f(&mut guard);
767 if changed {
768 let _ = self.tx.send(guard.clone());
770 }
771 changed
772 }
773
774 pub(crate) fn mark_running(&self, pid: u32, started_at: SystemTime) -> bool {
786 self.transition(|st| match *st {
787 ProcStatus::Starting => {
788 *st = ProcStatus::Running { pid, started_at };
789 true
790 }
791 _ => {
792 tracing::warn!(
793 "illegal transition: {:?} -> Running; leaving status unchanged",
794 *st
795 );
796 false
797 }
798 })
799 }
800
801 pub(crate) fn mark_ready(
813 &self,
814 pid: u32,
815 started_at: SystemTime,
816 addr: ChannelAddr,
817 agent: ActorRef<ProcMeshAgent>,
818 ) -> bool {
819 self.transition(|st| match st {
820 ProcStatus::Starting | ProcStatus::Running { .. } => {
821 *st = ProcStatus::Ready {
822 pid,
823 started_at,
824 addr,
825 agent,
826 };
827 true
828 }
829 _ => {
830 tracing::warn!(
831 "illegal transition: {:?} -> Ready; leaving status unchanged",
832 st
833 );
834 false
835 }
836 })
837 }
838
839 pub(crate) fn mark_stopping(&self) -> bool {
843 let child_pid = self.child_pid_snapshot();
845 let now = hyperactor::clock::RealClock.system_time_now();
846
847 self.transition(|st| match *st {
848 ProcStatus::Running { pid, started_at } => {
849 *st = ProcStatus::Stopping { pid, started_at };
850 true
851 }
852 ProcStatus::Ready {
853 pid, started_at, ..
854 } => {
855 *st = ProcStatus::Stopping { pid, started_at };
856 true
857 }
858 ProcStatus::Starting => {
859 if let Some(pid) = child_pid {
860 *st = ProcStatus::Stopping {
861 pid,
862 started_at: now,
863 };
864 true
865 } else {
866 false
867 }
868 }
869 _ => false,
870 })
871 }
872
873 pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
876 self.transition(|st| match *st {
877 ProcStatus::Starting
878 | ProcStatus::Running { .. }
879 | ProcStatus::Ready { .. }
880 | ProcStatus::Stopping { .. } => {
881 *st = ProcStatus::Stopped {
882 exit_code,
883 stderr_tail,
884 };
885 true
886 }
887 _ => {
888 tracing::warn!(
889 "illegal transition: {:?} -> Stopped; leaving status unchanged",
890 *st
891 );
892 false
893 }
894 })
895 }
896
897 pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
900 self.transition(|st| match *st {
901 ProcStatus::Starting
902 | ProcStatus::Running { .. }
903 | ProcStatus::Ready { .. }
904 | ProcStatus::Stopping { .. } => {
905 *st = ProcStatus::Killed {
906 signal,
907 core_dumped,
908 };
909 true
910 }
911 _ => {
912 tracing::warn!(
913 "illegal transition: {:?} -> Killed; leaving status unchanged",
914 *st
915 );
916 false
917 }
918 })
919 }
920
921 pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
924 self.transition(|st| match *st {
925 ProcStatus::Starting
926 | ProcStatus::Running { .. }
927 | ProcStatus::Ready { .. }
928 | ProcStatus::Stopping { .. } => {
929 *st = ProcStatus::Failed {
930 reason: reason.into(),
931 };
932 true
933 }
934 _ => {
935 tracing::warn!(
936 "illegal transition: {:?} -> Failed; leaving status unchanged",
937 *st
938 );
939 false
940 }
941 })
942 }
943
944 #[must_use]
963 pub async fn wait_inner(&self) -> ProcStatus {
964 let mut rx = self.watch();
965 loop {
966 let st = rx.borrow().clone();
967 if st.is_exit() {
968 return st;
969 }
970 if rx.changed().await.is_err() {
972 return st;
973 }
974 }
975 }
976
977 pub async fn ready_inner(&self) -> Result<(), ReadyError> {
996 let mut rx = self.watch();
997 loop {
998 let st = rx.borrow().clone();
999 match &st {
1000 ProcStatus::Ready { .. } => return Ok(()),
1001 s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1002 _non_terminal => {
1003 if rx.changed().await.is_err() {
1004 return Err(ReadyError::ChannelClosed);
1005 }
1006 }
1007 }
1008 }
1009 }
1010
1011 fn child_pid_snapshot(&self) -> Option<u32> {
1014 self.child
1015 .lock()
1016 .expect("child mutex poisoned")
1017 .as_ref()
1018 .and_then(|c| c.id())
1019 }
1020
1021 fn signalable_pid(&self) -> Option<i32> {
1026 match &*self.status.lock().expect("status mutex poisoned") {
1027 ProcStatus::Running { pid, .. }
1028 | ProcStatus::Ready { pid, .. }
1029 | ProcStatus::Stopping { pid, .. } => Some(*pid as i32),
1030 _ => self
1031 .child
1032 .lock()
1033 .expect("child mutex poisoned")
1034 .as_ref()
1035 .and_then(|c| c.id())
1036 .map(|p| p as i32),
1037 }
1038 }
1039
1040 fn send_signal(pid: i32, sig: i32) -> Result<(), anyhow::Error> {
1045 let rc = unsafe { libc::kill(pid, sig) };
1048 if rc == 0 {
1049 Ok(())
1050 } else {
1051 let e = std::io::Error::last_os_error();
1052 if let Some(libc::ESRCH) = e.raw_os_error() {
1053 Ok(())
1056 } else {
1057 Err(anyhow::anyhow!("kill({pid}, {sig}) failed: {e}"))
1058 }
1059 }
1060 }
1061
1062 pub fn set_tailers(&self, out: Option<LogTailer>, err: Option<LogTailer>) {
1063 *self
1064 .stdout_tailer
1065 .lock()
1066 .expect("stdout_tailer mutex poisoned") = out;
1067 *self
1068 .stderr_tailer
1069 .lock()
1070 .expect("stderr_tailer mutex poisoned") = err;
1071 }
1072
1073 fn take_tailers(&self) -> (Option<LogTailer>, Option<LogTailer>) {
1074 let out = self
1075 .stdout_tailer
1076 .lock()
1077 .expect("stdout_tailer mutex poisoned")
1078 .take();
1079 let err = self
1080 .stderr_tailer
1081 .lock()
1082 .expect("stderr_tailer mutex poisoned")
1083 .take();
1084 (out, err)
1085 }
1086}
1087
/// `ProcHandle` implementation backed by an OS child process: readiness and
/// exit are observed through the status watch channel, and termination is
/// performed with POSIX signals.
#[async_trait]
impl hyperactor::host::ProcHandle for BootstrapProcHandle {
    type Agent = ProcMeshAgent;
    type TerminalStatus = ProcStatus;

    /// Id of the proc this handle refers to.
    #[inline]
    fn proc_id(&self) -> &ProcId {
        &self.proc_id
    }

    /// The proc's address; only available while the status is `Ready`.
    #[inline]
    fn addr(&self) -> Option<ChannelAddr> {
        match &*self.status.lock().expect("status mutex poisoned") {
            ProcStatus::Ready { addr, .. } => Some(addr.clone()),
            _ => None,
        }
    }

    /// The proc's agent reference; only available while the status is `Ready`.
    #[inline]
    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
        match &*self.status.lock().expect("status mutex poisoned") {
            ProcStatus::Ready { agent, .. } => Some(agent.clone()),
            _ => None,
        }
    }

    /// Waits until the proc is `Ready`, translating the local `ReadyError`
    /// into the trait-level `hyperactor::host::ReadyError`.
    async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
        match self.ready_inner().await {
            Ok(()) => Ok(()),
            Err(ReadyError::Terminal(status)) => {
                Err(hyperactor::host::ReadyError::Terminal(status))
            }
            Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
        }
    }

    /// Waits for a terminal status. A non-terminal result from
    /// `wait_inner` means the watch channel closed and is reported as
    /// `WaitError::ChannelClosed`.
    async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
        let status = self.wait_inner().await;
        if status.is_exit() {
            Ok(status)
        } else {
            Err(hyperactor::host::WaitError::ChannelClosed)
        }
    }

    /// Terminates the child: `SIGTERM`, wait up to `timeout`, then
    /// `SIGKILL` and wait up to five more seconds.
    ///
    /// # Errors
    /// * `AlreadyTerminated(status)` if the proc is already terminal.
    /// * `Io` if no pid can be determined or a signal cannot be delivered.
    /// * `ChannelClosed` if a wait ends without a terminal status
    ///   (including the case where the post-`SIGKILL` wait times out).
    async fn terminate(
        &self,
        timeout: Duration,
    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
        // Grace period granted after SIGKILL before giving up.
        const HARD_WAIT_AFTER_KILL: Duration = Duration::from_secs(5);

        let st0 = self.status();
        if st0.is_exit() {
            tracing::debug!(?st0, "terminate(): already terminal");
            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
        }

        // Resolve the pid before flipping the status to `Stopping`.
        let pid = self.signalable_pid().ok_or_else(|| {
            let st = self.status();
            tracing::warn!(?st, "terminate(): no signalable pid");
            hyperactor::host::TerminateError::Io(anyhow::anyhow!(
                "no signalable pid (state: {:?})",
                st
            ))
        })?;

        // Best effort: the transition may be refused (e.g. unknown pid).
        let _ = self.mark_stopping();

        tracing::info!(pid, ?timeout, "terminate(): sending SIGTERM");
        if let Err(e) = Self::send_signal(pid, libc::SIGTERM) {
            tracing::warn!(pid, error=%e, "terminate(): SIGTERM delivery failed");
            return Err(hyperactor::host::TerminateError::Io(e));
        }
        tracing::debug!(pid, "terminate(): SIGTERM sent");

        match RealClock.timeout(timeout, self.wait_inner()).await {
            // Graceful exit within the timeout.
            Ok(st) if st.is_exit() => {
                tracing::info!(pid, ?st, "terminate(): exited after SIGTERM");
                Ok(st)
            }
            // `wait_inner` returned because the watch channel closed.
            Ok(non_exit) => {
                tracing::warn!(pid, ?non_exit, "terminate(): wait returned non-terminal");
                Err(hyperactor::host::TerminateError::ChannelClosed)
            }
            // Timed out: escalate.
            Err(_elapsed) => {
                tracing::warn!(pid, "terminate(): timeout; escalating to SIGKILL");
                // Re-resolve the pid; the status may have changed meanwhile.
                if let Some(pid2) = self.signalable_pid() {
                    if let Err(e) = Self::send_signal(pid2, libc::SIGKILL) {
                        tracing::warn!(pid=pid2, error=%e, "terminate(): SIGKILL delivery failed");
                        return Err(hyperactor::host::TerminateError::Io(e));
                    }
                    tracing::info!(pid = pid2, "terminate(): SIGKILL sent");
                } else {
                    tracing::warn!("terminate(): lost pid before SIGKILL escalation");
                }
                match RealClock
                    .timeout(HARD_WAIT_AFTER_KILL, self.wait_inner())
                    .await
                {
                    Ok(st) if st.is_exit() => {
                        tracing::info!(?st, "terminate(): exited after SIGKILL");
                        Ok(st)
                    }
                    // Either a non-terminal status or a second timeout.
                    other => {
                        tracing::warn!(
                            ?other,
                            "terminate(): post-KILL wait did not yield terminal"
                        );
                        Err(hyperactor::host::TerminateError::ChannelClosed)
                    }
                }
            }
        }
    }

    /// Kills the child with `SIGKILL` and waits (without timeout) for a
    /// terminal status.
    ///
    /// # Errors
    /// * `AlreadyTerminated(status)` if the proc is already terminal.
    /// * `Io` if no pid can be determined or the signal cannot be delivered.
    /// * `ChannelClosed` if the wait ends without a terminal status.
    async fn kill(
        &self,
    ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
        let st0 = self.status();
        if st0.is_exit() {
            return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
        }

        let pid = self.signalable_pid().ok_or_else(|| {
            hyperactor::host::TerminateError::Io(anyhow::anyhow!(
                "no signalable pid (state: {:?})",
                self.status()
            ))
        })?;

        if let Err(e) = Self::send_signal(pid, libc::SIGKILL) {
            return Err(hyperactor::host::TerminateError::Io(e));
        }

        let st = self.wait_inner().await;
        if st.is_exit() {
            Ok(st)
        } else {
            Err(hyperactor::host::TerminateError::ChannelClosed)
        }
    }
}
1319
/// Description of the command used to launch a bootstrapped child process
/// (consumed by `BootstrapProcManager::spawn`).
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct BootstrapCommand {
    /// Path of the executable to run.
    pub program: PathBuf,
    /// Optional override for `argv[0]`; applied with `Command::arg0`.
    pub arg0: Option<String>,
    /// Arguments passed after `argv[0]`.
    pub args: Vec<String>,
    /// Environment variables set on the child in addition to those the
    /// spawner adds itself.
    pub env: HashMap<String, String>,
}
1328
1329impl BootstrapCommand {
1330 pub fn current() -> io::Result<Self> {
1333 let mut args: VecDeque<String> = std::env::args().collect();
1334 let arg0 = args.pop_front();
1335
1336 Ok(Self {
1337 program: std::env::current_exe()?,
1338 arg0,
1339 args: args.into(),
1340 env: std::env::vars().collect(),
1341 })
1342 }
1343
1344 #[cfg(test)]
1351 pub(crate) fn test() -> Self {
1352 Self {
1353 program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1354 arg0: None,
1355 args: vec![],
1356 env: HashMap::new(),
1357 }
1358 }
1359}
1360
1361impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1362 fn from(s: T) -> Self {
1364 Self {
1365 program: s.into(),
1366 arg0: None,
1367 args: vec![],
1368 env: HashMap::new(),
1369 }
1370 }
1371}
1372
/// `ProcManager` that runs every proc as a separate OS process launched
/// from a [`BootstrapCommand`].
#[derive(Debug)]
pub struct BootstrapProcManager {
    /// Command used to launch each child process.
    command: BootstrapCommand,
    /// Handles of all procs spawned by this manager, keyed by proc id.
    children: Arc<tokio::sync::Mutex<HashMap<ProcId, BootstrapProcHandle>>>,
    /// Pids of children believed to be alive. Entries are added on spawn
    /// and removed by the exit monitor / failed bootstrap callback; `Drop`
    /// sends `SIGKILL` to whatever is left. A synchronous mutex is used so
    /// the table can be read from `Drop`.
    pid_table: Arc<std::sync::Mutex<HashMap<ProcId, u32>>>,
}
1406
1407impl Drop for BootstrapProcManager {
1408 fn drop(&mut self) {
1425 if let Ok(table) = self.pid_table.lock() {
1426 for (proc_id, pid) in table.iter() {
1427 unsafe {
1435 libc::kill(*pid as i32, libc::SIGKILL);
1436 }
1437 tracing::info!(
1438 "BootstrapProcManager::drop: sent SIGKILL to pid {} for {:?}",
1439 pid,
1440 proc_id
1441 );
1442 }
1443 }
1444 }
1445}
1446
1447impl BootstrapProcManager {
1448 pub(crate) fn new(command: BootstrapCommand) -> Self {
1455 Self {
1456 command,
1457 children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1458 pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
1459 }
1460 }
1461
1462 pub fn command(&self) -> &BootstrapCommand {
1464 &self.command
1465 }
1466
1467 pub async fn status(&self, proc_id: &ProcId) -> Option<ProcStatus> {
1478 self.children.lock().await.get(proc_id).map(|h| h.status())
1479 }
1480
1481 fn spawn_exit_monitor(&self, proc_id: ProcId, handle: BootstrapProcHandle) {
1482 let pid_table = Arc::clone(&self.pid_table);
1483
1484 let (stdout_tailer, stderr_tailer) = handle.take_tailers();
1485
1486 let maybe_child = {
1487 let mut guard = handle.child.lock().expect("child mutex");
1488 let taken = guard.take();
1489 debug_assert!(guard.is_none(), "no Child should remain in handles");
1490 taken
1491 };
1492
1493 let Some(mut child) = maybe_child else {
1494 tracing::debug!("proc {proc_id}: child was already taken; skipping wait");
1495 return;
1496 };
1497
1498 tokio::spawn(async move {
1499 let wait_res = child.wait().await;
1500
1501 let mut stderr_tail: Vec<String> = Vec::new();
1502 if let Some(t) = stderr_tailer {
1503 let (lines, _bytes) = t.join().await;
1504 stderr_tail = lines;
1505 }
1506 if let Some(t) = stdout_tailer {
1507 let (_lines, _bytes) = t.join().await;
1508 }
1509
1510 match wait_res {
1511 Ok(status) => {
1512 if let Some(sig) = status.signal() {
1513 let _ = handle.mark_killed(sig, status.core_dumped());
1514 if let Ok(mut table) = pid_table.lock() {
1515 table.remove(&proc_id);
1516 }
1517 if stderr_tail.is_empty() {
1518 tracing::debug!("proc {proc_id} killed by signal {sig}");
1519 } else {
1520 let tail = stderr_tail.join("\n");
1521 tracing::debug!(
1522 "proc {proc_id} killed by signal {sig}; stderr tail:\n{tail}"
1523 );
1524 }
1525 } else if let Some(code) = status.code() {
1526 let _ = handle.mark_stopped(code, stderr_tail.clone());
1527 if let Ok(mut table) = pid_table.lock() {
1528 table.remove(&proc_id);
1529 }
1530 let tail_str = if stderr_tail.is_empty() {
1531 None
1532 } else {
1533 Some(stderr_tail.join("\n"))
1534 };
1535 if code == 0 {
1536 tracing::debug!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited");
1537 } else {
1538 tracing::info!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited");
1539 }
1540 } else {
1541 debug_assert!(
1542 false,
1543 "unreachable: process terminated with neither signal nor exit code"
1544 );
1545 tracing::error!(
1546 "proc {proc_id}: unreachable exit status (no code, no signal)"
1547 );
1548 let _ = handle.mark_failed("process exited with unknown status");
1549 if let Ok(mut table) = pid_table.lock() {
1550 table.remove(&proc_id);
1551 }
1552 if stderr_tail.is_empty() {
1553 tracing::warn!("proc {proc_id} unknown exit");
1554 } else {
1555 let tail = stderr_tail.join("\n");
1556 tracing::warn!("proc {proc_id} unknown exit; stderr tail:\n{tail}");
1557 }
1558 }
1559 }
1560 Err(e) => {
1561 let _ = handle.mark_failed(format!("wait_inner() failed: {e}"));
1562 if let Ok(mut table) = pid_table.lock() {
1563 table.remove(&proc_id);
1564 }
1565 if stderr_tail.is_empty() {
1566 tracing::info!("proc {proc_id} wait failed");
1567 } else {
1568 let tail = stderr_tail.join("\n");
1569 tracing::info!("proc {proc_id} wait failed; stderr tail:\n{tail}");
1570 }
1571 }
1572 }
1573 });
1574 }
1575}
1576
/// `ProcManager` implementation that launches each proc as a child process
/// running the manager's [`BootstrapCommand`] in `Bootstrap::Proc` mode.
#[async_trait]
impl ProcManager for BootstrapProcManager {
    type Handle = BootstrapProcHandle;

    /// Children are reached over Unix channels.
    fn transport(&self) -> ChannelTransport {
        ChannelTransport::Unix
    }

    /// Launches a child process for `proc_id` and returns its handle.
    ///
    /// Steps, in order:
    /// 1. serve a Unix callback channel for the child to report back on;
    /// 2. build the command (program, `argv[0]`, args, env), add the
    ///    encoded `Bootstrap::Proc` mode and the log channel, pipe
    ///    stdout/stderr, and spawn it;
    /// 3. create the handle, mark it `Running` and record the pid;
    /// 4. tee the child's stdout/stderr through `LogTailer`s;
    /// 5. register the handle and start the exit monitor (which takes the
    ///    `Child` and the tailers out of the handle);
    /// 6. start a task that waits for the callback and marks the handle
    ///    `Ready`, or `Failed` if the callback channel errors.
    ///
    /// The handle is returned immediately; it is not necessarily `Ready` yet.
    ///
    /// # Errors
    /// * errors from `channel::serve` (converted with `?`);
    /// * `HostError::ProcessConfigurationFailure` if the bootstrap mode
    ///   cannot be encoded;
    /// * `HostError::ProcessSpawnFailure` if the process cannot be spawned.
    async fn spawn(
        &self,
        proc_id: ProcId,
        backend_addr: ChannelAddr,
    ) -> Result<Self::Handle, HostError> {
        let (callback_addr, mut callback_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;

        // Snapshot of the current global config, forwarded to the child.
        let cfg = hyperactor::config::global::attrs();

        let mode = Bootstrap::Proc {
            proc_id: proc_id.clone(),
            backend_addr,
            callback_addr,
            config: Some(cfg),
        };
        let mut cmd = Command::new(&self.command.program);
        if let Some(arg0) = &self.command.arg0 {
            cmd.arg0(arg0);
        }
        for arg in &self.command.args {
            cmd.arg(arg);
        }
        for (k, v) in &self.command.env {
            cmd.env(k, v);
        }
        // Set after the user-provided env so the bootstrap mode always wins.
        cmd.env(
            "HYPERACTOR_MESH_BOOTSTRAP_MODE",
            mode.to_env_safe_string()
                .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
        )
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        cmd.env(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());

        let child = cmd
            .spawn()
            .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
        // Falls back to 0 when the pid is unavailable; only used for the
        // log writers below.
        let pid = child.id().unwrap_or_default();

        let handle = BootstrapProcHandle::new(proc_id.clone(), child);

        if let Some(pid) = handle.pid() {
            handle.mark_running(pid, hyperactor::clock::RealClock.system_time_now());
            if let Ok(mut table) = self.pid_table.lock() {
                table.insert(proc_id.clone(), pid);
            }
        }

        // If dedicated log writers cannot be created, fall back to this
        // process's own stdout/stderr (the creation error is discarded).
        let (out_writer, err_writer) = create_log_writers(0, log_channel.clone(), pid)
            .unwrap_or_else(|_| (Box::new(tokio::io::stdout()), Box::new(tokio::io::stderr())));

        let mut stdout_tailer: Option<LogTailer> = None;
        let mut stderr_tailer: Option<LogTailer> = None;

        {
            // Wire the piped stdio into tailers while the child is still
            // owned by the handle.
            let mut guard = handle.child.lock().expect("child mutex poisoned");
            if let Some(child) = guard.as_mut() {
                let max_tail_lines = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);
                if let Some(out) = child.stdout.take() {
                    stdout_tailer = Some(LogTailer::tee(max_tail_lines, out, out_writer));
                }
                if let Some(err) = child.stderr.take() {
                    stderr_tailer = Some(LogTailer::tee(max_tail_lines, err, err_writer));
                }
                handle.set_tailers(stdout_tailer.take(), stderr_tailer.take());
            } else {
                tracing::debug!("proc {proc_id}: child already taken before wiring IO");
            }
        }

        {
            // Register the handle so `status` / `terminate_all` can find it.
            let mut children = self.children.lock().await;
            children.insert(proc_id.clone(), handle.clone());
        }

        // Must come after the tailers are set: the monitor takes them
        // together with the child.
        self.spawn_exit_monitor(proc_id.clone(), handle.clone());

        let h = handle.clone();
        let pid_table = Arc::clone(&self.pid_table);
        tokio::spawn(async move {
            match callback_rx.recv().await {
                Ok((addr, agent)) => {
                    let pid = match h.pid() {
                        Some(p) => p,
                        None => {
                            tracing::warn!("mark_ready called with missing pid; using 0");
                            0
                        }
                    };
                    // Note: `Ready.started_at` is the time of the callback.
                    let started_at = RealClock.system_time_now();
                    let _ = h.mark_ready(pid, started_at, addr, agent);
                }
                Err(e) => {
                    let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
                    // NOTE(review): the pid is dropped from the table here
                    // although the child may still be running -- confirm
                    // that it is reaped by other means.
                    if let Ok(mut table) = pid_table.lock() {
                        table.remove(&proc_id);
                    }
                }
            }
        });

        Ok(handle)
    }
}
1736
1737#[async_trait]
1738impl hyperactor::host::BulkTerminate for BootstrapProcManager {
1739 async fn terminate_all(&self, timeout: Duration, max_in_flight: usize) -> TerminateSummary {
1753 let handles: Vec<BootstrapProcHandle> = {
1755 let guard = self.children.lock().await;
1756 guard.values().cloned().collect()
1757 };
1758
1759 let attempted = handles.len();
1760 let mut ok = 0usize;
1761
1762 let results = stream::iter(handles.into_iter().map(|h| async move {
1763 match h.terminate(timeout).await {
1764 Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
1765 true
1767 }
1768 Err(e) => {
1769 tracing::warn!(error=%e, "terminate_all: failed to terminate child");
1770 false
1771 }
1772 }
1773 }))
1774 .buffer_unordered(max_in_flight.max(1))
1775 .collect::<Vec<bool>>()
1776 .await;
1777
1778 for r in results {
1779 if r {
1780 ok += 1;
1781 }
1782 }
1783
1784 TerminateSummary {
1785 attempted,
1786 ok,
1787 failed: attempted.saturating_sub(ok),
1788 }
1789 }
1790}
1791
1792pub async fn bootstrap() -> anyhow::Error {
1807 let boot = ok!(Bootstrap::get_from_env()).unwrap_or_else(Bootstrap::default);
1808 boot.bootstrap().await
1809}
1810
/// Run the v0 proc-mesh bootstrap protocol and return the error that ended it.
///
/// The process reads the allocator address (`BOOTSTRAP_ADDR_ENV`) and its
/// own index (`BOOTSTRAP_INDEX_ENV`) from the environment, serves a fresh
/// channel, announces that channel to the allocator with a `Hello` message,
/// and then services `Allocator2Process` messages in a loop. The loop only
/// ends through an error (propagated with `?`) or `std::process::exit`, so
/// the inner `go()` never returns `Ok` and `unwrap_err()` is used on its
/// result.
async fn bootstrap_v0_proc_mesh() -> anyhow::Error {
    /// Fallible body of the bootstrap protocol; see the outer function.
    pub async fn go() -> Result<(), anyhow::Error> {
        // Procs started by this process. Shared with the cleanup future so
        // they can be destroyed from the registered signal cleanup as well.
        let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
        let procs_for_cleanup = procs.clone();
        // The guard is kept alive for the rest of `go()`; presumably
        // dropping it unregisters the cleanup — confirm against
        // `register_signal_cleanup_scoped`.
        let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
            for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
                if let Err(err) = proc_to_stop
                    .destroy_and_wait::<()>(Duration::from_millis(10), None)
                    .await
                {
                    tracing::error!(
                        "error while stopping proc {}: {}",
                        proc_to_stop.proc_id(),
                        err
                    );
                }
            }
        }));

        // Both environment variables are required; a missing or
        // unparsable value aborts the bootstrap with an error.
        let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
            .parse()?;
        let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
            .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
            .parse()?;
        // Listen on the same transport the allocator is reachable on.
        let listen_addr = ChannelAddr::any(bootstrap_addr.transport());
        let (serve_addr, mut rx) = channel::serve(listen_addr)?;
        let tx = channel::dial(bootstrap_addr.clone())?;

        // Announce ourselves. `rtx` is the return path handed to
        // `try_post`; the `select!` below treats a message coming back on
        // it as a failed delivery.
        let (rtx, mut return_channel) = oneshot::channel();
        tx.try_post(
            Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
            rtx,
        )?;
        // Defined elsewhere; presumably exits the process when the
        // allocator stops responding — confirm.
        tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));

        let mut the_msg;

        // Wait for whichever comes first: the first allocator message, or
        // the outcome of the Hello delivery.
        tokio::select! {
            msg = rx.recv() => {
                the_msg = msg;
            }
            returned_msg = &mut return_channel => {
                match returned_msg {
                    // The Hello message came back: it was not delivered.
                    Ok(msg) => {
                        return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
                    }
                    // The return sender was dropped without returning the
                    // message; carry on and wait for the first message.
                    Err(_) => {
                        the_msg = rx.recv().await;
                    }
                }
            }
        }
        loop {
            // NOTE(review): the span is bound to `_`, so it is dropped
            // immediately and never entered — confirm this is intended.
            let _ = hyperactor::tracing::info_span!("wait_for_next_message_from_mesh_agent");
            match the_msg? {
                Allocator2Process::StartProc(proc_id, listen_transport) => {
                    let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone()).await?;
                    let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
                    let handle = proc.clone().serve(proc_rx);
                    // The serve handle is not needed here and is dropped
                    // right away.
                    drop(handle);
                    // Report the started proc, its agent and its address.
                    tx.send(Process2Allocator(
                        bootstrap_index,
                        Process2AllocatorMessage::StartedProc(
                            proc_id.clone(),
                            mesh_agent.bind(),
                            proc_addr,
                        ),
                    ))
                    .await?;
                    procs.lock().await.push(proc);
                }
                Allocator2Process::StopAndExit(code) => {
                    tracing::info!("stopping procs with code {code}");
                    {
                        // Destroy every proc we started before exiting;
                        // failures are logged and do not stop the exit.
                        for proc_to_stop in procs.lock().await.iter_mut() {
                            if let Err(err) = proc_to_stop
                                .destroy_and_wait::<()>(Duration::from_millis(10), None)
                                .await
                            {
                                tracing::error!(
                                    "error while stopping proc {}: {}",
                                    proc_to_stop.proc_id(),
                                    err
                                );
                            }
                        }
                    }
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
                Allocator2Process::Exit(code) => {
                    // Exit immediately without stopping procs first.
                    tracing::info!("exiting with {code}");
                    std::process::exit(code);
                }
            }
            the_msg = rx.recv().await;
        }
    }

    go().await.unwrap_err()
}
1922
1923pub async fn bootstrap_or_die() -> ! {
1926 let err = bootstrap().await;
1927 let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
1928 tracing::error!("failed to bootstrap mesh process: {}", err);
1929 std::process::exit(1)
1930}
1931
/// Destination for bootstrap debug output.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    /// Write debug output to this file.
    File(std::fs::File),
    /// Discard debug output.
    Sink,
}
1937
1938impl DebugSink {
1939 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1940 match self {
1941 DebugSink::File(f) => f.write(buf),
1942 DebugSink::Sink => Ok(buf.len()),
1943 }
1944 }
1945 fn flush(&mut self) -> io::Result<()> {
1946 match self {
1947 DebugSink::File(f) => f.flush(),
1948 DebugSink::Sink => Ok(()),
1949 }
1950 }
1951}
1952
1953fn debug_sink() -> &'static Mutex<DebugSink> {
1954 static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
1955 DEBUG_SINK.get_or_init(|| {
1956 let debug_path = {
1957 let mut p = std::env::temp_dir();
1958 if let Ok(user) = std::env::var("USER") {
1959 p.push(user);
1960 }
1961 std::fs::create_dir_all(&p).ok();
1962 p.push("monarch-bootstrap-debug.log");
1963 p
1964 };
1965 let sink = if debug_path.exists() {
1966 match OpenOptions::new()
1967 .append(true)
1968 .create(true)
1969 .open(debug_path.clone())
1970 {
1971 Ok(f) => DebugSink::File(f),
1972 Err(_e) => {
1973 eprintln!(
1974 "failed to open {} for bootstrap debug logging",
1975 debug_path.display()
1976 );
1977 DebugSink::Sink
1978 }
1979 }
1980 } else {
1981 DebugSink::Sink
1982 };
1983 Mutex::new(sink)
1984 })
1985}
1986
/// When `true`, bootstrap debug output is also mirrored to stderr.
const DEBUG_TO_STDERR: bool = false;
1989
/// Zero-sized writer handle for bootstrap debug output; its `Write`
/// impl forwards to the shared sink returned by `debug_sink()`.
struct Debug;
1993
1994impl Debug {
1995 fn is_active() -> bool {
1996 DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
1997 }
1998}
1999
2000impl Write for Debug {
2001 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2002 let res = debug_sink().lock().unwrap().write(buf);
2003 if DEBUG_TO_STDERR {
2004 let n = match res {
2005 Ok(n) => n,
2006 Err(_) => buf.len(),
2007 };
2008 let _ = io::stderr().write_all(&buf[..n]);
2009 }
2010
2011 res
2012 }
2013 fn flush(&mut self) -> io::Result<()> {
2014 let res = debug_sink().lock().unwrap().flush();
2015 if DEBUG_TO_STDERR {
2016 let _ = io::stderr().flush();
2017 }
2018 res
2019 }
2020}
2021
2022#[cfg(test)]
2023mod tests {
2024 use std::path::PathBuf;
2025 use std::process::Stdio;
2026
2027 use hyperactor::ActorId;
2028 use hyperactor::ActorRef;
2029 use hyperactor::ProcId;
2030 use hyperactor::WorldId;
2031 use hyperactor::channel::ChannelAddr;
2032 use hyperactor::channel::ChannelTransport;
2033 use hyperactor::clock::RealClock;
2034 use hyperactor::context::Mailbox as _;
2035 use hyperactor::host::ProcHandle;
2036 use hyperactor::id;
2037 use ndslice::Extent;
2038 use ndslice::ViewExt;
2039 use ndslice::extent;
2040 use tokio::io;
2041 use tokio::process::Command;
2042
2043 use super::*;
2044 use crate::alloc::AllocSpec;
2045 use crate::alloc::Allocator;
2046 use crate::alloc::ProcessAllocator;
2047 use crate::v1::ActorMesh;
2048 use crate::v1::host_mesh::HostMesh;
2049 use crate::v1::testactor;
2050
2051 fn any_addr_for_test() -> ChannelAddr {
2054 ChannelAddr::any(ChannelTransport::Unix)
2055 }
2056
2057 #[test]
2058 fn test_bootstrap_mode_env_string_none_config_proc() {
2059 let values = [
2060 Bootstrap::default(),
2061 Bootstrap::Proc {
2062 proc_id: id!(foo[0]),
2063 backend_addr: ChannelAddr::any(ChannelTransport::Tcp),
2064 callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2065 config: None,
2066 },
2067 ];
2068
2069 for value in values {
2070 let safe = value.to_env_safe_string().unwrap();
2071 let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2072
2073 let safe2 = round.to_env_safe_string().unwrap();
2076 assert_eq!(safe, safe2, "env-safe round-trip should be stable");
2077
2078 match (&value, &round) {
2080 (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. }) => {}
2081 (Bootstrap::V0ProcMesh, Bootstrap::V0ProcMesh) => {}
2082 _ => panic!("decoded variant mismatch: got {:?}", round),
2083 }
2084 }
2085 }
2086
2087 #[test]
2088 fn test_bootstrap_mode_env_string_none_config_host() {
2089 let value = Bootstrap::Host {
2090 addr: ChannelAddr::any(ChannelTransport::Unix),
2091 command: None,
2092 config: None,
2093 };
2094
2095 let safe = value.to_env_safe_string().unwrap();
2096 let round = Bootstrap::from_env_safe_string(&safe).unwrap();
2097
2098 let safe2 = round.to_env_safe_string().unwrap();
2100 assert_eq!(safe, safe2);
2101
2102 match round {
2104 Bootstrap::Host { config: None, .. } => {}
2105 other => panic!("expected Host with None config, got {:?}", other),
2106 }
2107 }
2108
2109 #[test]
2110 fn test_bootstrap_mode_env_string_invalid() {
2111 assert!(Bootstrap::from_env_safe_string("!!!").is_err());
2113 }
2114
2115 #[test]
2116 fn test_bootstrap_env_roundtrip_with_config_proc_and_host() {
2117 let mut attrs = Attrs::new();
2119 attrs[MESH_TAIL_LOG_LINES] = 123;
2120 attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
2121
2122 {
2124 let original = Bootstrap::Proc {
2125 proc_id: id!(foo[42]),
2126 backend_addr: ChannelAddr::any(ChannelTransport::Unix),
2127 callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2128 config: Some(attrs.clone()),
2129 };
2130 let env_str = original.to_env_safe_string().expect("encode bootstrap");
2131 let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2132 match &decoded {
2133 Bootstrap::Proc { config, .. } => {
2134 let cfg = config.as_ref().expect("expected Some(attrs)");
2135 assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2136 assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2137 }
2138 other => panic!("unexpected variant after roundtrip: {:?}", other),
2139 }
2140 }
2141
2142 {
2144 let original = Bootstrap::Host {
2145 addr: ChannelAddr::any(ChannelTransport::Unix),
2146 command: None,
2147 config: Some(attrs.clone()),
2148 };
2149 let env_str = original.to_env_safe_string().expect("encode bootstrap");
2150 let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
2151 match &decoded {
2152 Bootstrap::Host { config, .. } => {
2153 let cfg = config.as_ref().expect("expected Some(attrs)");
2154 assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
2155 assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
2156 }
2157 other => panic!("unexpected variant after roundtrip: {:?}", other),
2158 }
2159 }
2160 }
2161
    /// Dropping the manager must take its registered children down with it.
    ///
    /// A long-running `sleep` child is registered directly in the
    /// manager's `children` map; after the manager is dropped the test
    /// (on Linux) polls `/proc/<pid>` until the process is gone or is a
    /// zombie, failing after 1.5 seconds.
    #[tokio::test]
    async fn test_child_terminated_on_manager_drop() {
        use std::path::PathBuf;
        use std::process::Stdio;

        use tokio::process::Command;
        use tokio::time::Duration;

        // The manager's own command is not used to spawn anything here.
        let command = BootstrapCommand {
            program: PathBuf::from("/bin/true"),
            ..Default::default()
        };
        let manager = BootstrapProcManager::new(command);

        // A child that would outlive the test unless something kills it.
        let mut cmd = Command::new("/bin/sleep");
        cmd.arg("30")
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .kill_on_drop(true);

        let child = cmd.spawn().expect("spawn sleep");
        let pid = child.id().expect("pid");

        // Hand the child to the manager by inserting a handle directly.
        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "test".to_string());
        {
            let mut children = manager.children.lock().await;
            children.insert(
                proc_id.clone(),
                BootstrapProcHandle::new(proc_id.clone(), child),
            );
        }

        // Sanity check: the process is alive before the drop.
        #[cfg(target_os = "linux")]
        {
            let path = format!("/proc/{}", pid);
            assert!(
                std::fs::metadata(&path).is_ok(),
                "expected /proc/{pid} to exist before drop"
            );
        }

        drop(manager);

        // Poll until the process has disappeared or become a zombie.
        #[cfg(target_os = "linux")]
        {
            let deadline = std::time::Instant::now() + Duration::from_millis(1500);
            let proc_dir = format!("/proc/{}", pid);
            let status_file = format!("{}/status", proc_dir);

            let mut ok = false;
            while std::time::Instant::now() < deadline {
                match std::fs::read_to_string(&status_file) {
                    Ok(s) => {
                        if let Some(state_line) = s.lines().find(|l| l.starts_with("State:")) {
                            if state_line.contains('Z') {
                                // Zombie: the process has exited but has
                                // not been reaped yet.
                                ok = true;
                                break;
                            } else {
                                // Still running; keep polling.
                            }
                        }
                    }
                    Err(_) => {
                        // The status file is unreadable: treated as the
                        // process being gone.
                        ok = true;
                        break;
                    }
                }
                RealClock.sleep(Duration::from_millis(100)).await;
            }

            assert!(ok, "expected /proc/{pid} to be gone or zombie after drop");
        }

    }
2252
    /// A log message posted on the bootstrap log channel shows up in the
    /// logging test tap.
    ///
    /// The test builds a client proc, installs a tap, spawns a
    /// `LogClientActor` and a `LogForwardActor`, then posts a single
    /// `LogMessage::Log` to the channel named by `BOOTSTRAP_LOG_CHANNEL`
    /// and waits up to two seconds for the payload to reach the tap.
    #[tokio::test]
    async fn test_v1_child_logging() {
        use hyperactor::channel;
        use hyperactor::data::Serialized;
        use hyperactor::mailbox::BoxedMailboxSender;
        use hyperactor::mailbox::DialMailboxRouter;
        use hyperactor::mailbox::MailboxServer;
        use hyperactor::proc::Proc;

        use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
        use crate::logging::LogClientActor;
        use crate::logging::LogClientMessageClient;
        use crate::logging::LogForwardActor;
        use crate::logging::LogMessage;
        use crate::logging::OutputTarget;
        use crate::logging::test_tap;

        // A served client proc, reachable through the router.
        let router = DialMailboxRouter::new();
        let (proc_addr, proc_rx) =
            channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
        let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
        proc.clone().serve(proc_rx);
        router.bind(id!(client[0]).into(), proc_addr.clone());
        let (client, _handle) = proc.instance("client").unwrap();

        // The tap receives log lines as strings for the assertion below.
        let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        test_tap::install(tap_tx);

        // Publish the log channel address through the environment before
        // the forwarder is spawned; presumably the forwarder reads this
        // variable to find the channel — confirm.
        let log_channel = ChannelAddr::any(ChannelTransport::Unix);
        // SAFETY: NOTE(review): mutating the process environment is only
        // sound while no other thread accesses it concurrently — confirm
        // this holds under the test harness.
        unsafe {
            std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
        }

        let log_client: ActorRef<LogClientActor> =
            proc.spawn("log_client", ()).await.unwrap().bind();
        log_client.set_aggregate(&client, None).await.unwrap();

        let _log_forwarder: ActorRef<LogForwardActor> = proc
            .spawn("log_forwarder", log_client.clone())
            .await
            .unwrap()
            .bind();

        // Play the role of a child process: dial the log channel and send
        // one stdout line.
        let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();

        tx.post(LogMessage::Log {
            hostname: "testhost".into(),
            pid: 12345,
            output_target: OutputTarget::Stdout,
            payload: Serialized::serialize(&"hello from child".to_string()).unwrap(),
        });

        // The line must reach the tap within the timeout.
        let line = RealClock
            .timeout(Duration::from_secs(2), tap_rx.recv())
            .await
            .expect("timed out waiting for log line")
            .expect("tap channel closed unexpectedly");
        assert!(
            line.contains("hello from child"),
            "log line did not appear via LogClientActor; got: {line}"
        );
    }
2325
    /// State-machine tests for `BootstrapProcHandle`: which `mark_*`
    /// transitions are accepted from which `ProcStatus`.
    mod proc_handle {

        use hyperactor::ActorId;
        use hyperactor::ActorRef;
        use hyperactor::ProcId;
        use hyperactor::WorldId;
        use hyperactor::host::ProcHandle;
        use tokio::process::Command;

        use super::super::*;
        use super::any_addr_for_test;

        /// Build a handle around a real, short-lived child (`sh -c "exit 0"`)
        /// with all standard streams redirected to null.
        fn handle_for_test() -> BootstrapProcHandle {
            let child = Command::new("sh")
                .arg("-c")
                .arg("exit 0")
                .stdin(std::process::Stdio::null())
                .stdout(std::process::Stdio::null())
                .stderr(std::process::Stdio::null())
                .spawn()
                .expect("failed to spawn test child process");

            let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
            BootstrapProcHandle::new(proc_id, child)
        }

        /// A fresh handle is `Starting`; `mark_running` moves it to
        /// `Running` with the given pid and start time.
        #[tokio::test]
        async fn starting_to_running_ok() {
            let h = handle_for_test();
            assert!(matches!(h.status(), ProcStatus::Starting));
            let child_pid = h.pid().expect("child should have a pid");
            let child_started_at = RealClock.system_time_now();
            assert!(h.mark_running(child_pid, child_started_at));
            match h.status() {
                ProcStatus::Running { pid, started_at } => {
                    assert_eq!(pid, child_pid);
                    assert_eq!(started_at, child_started_at);
                }
                other => panic!("expected Running, got {other:?}"),
            }
        }

        /// Running -> Stopping -> Stopped is accepted step by step.
        #[tokio::test]
        async fn running_to_stopping_to_stopped_ok() {
            let h = handle_for_test();
            let child_pid = h.pid().expect("child should have a pid");
            let child_started_at = RealClock.system_time_now();
            assert!(h.mark_running(child_pid, child_started_at));
            assert!(h.mark_stopping());
            assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
            assert!(h.mark_stopped(0, Vec::new()));
            assert!(matches!(
                h.status(),
                ProcStatus::Stopped { exit_code: 0, .. }
            ));
        }

        /// Running -> Killed records the signal and core-dump flag.
        #[tokio::test]
        async fn running_to_killed_ok() {
            let h = handle_for_test();
            let child_pid = h.pid().expect("child should have a pid");
            let child_started_at = RealClock.system_time_now();
            assert!(h.mark_running(child_pid, child_started_at));
            assert!(h.mark_killed(9, true));
            assert!(matches!(
                h.status(),
                ProcStatus::Killed {
                    signal: 9,
                    core_dumped: true
                }
            ));
        }

        /// Running -> Failed records the failure reason.
        #[tokio::test]
        async fn running_to_failed_ok() {
            let h = handle_for_test();
            let child_pid = h.pid().expect("child should have a pid");
            let child_started_at = RealClock.system_time_now();
            assert!(h.mark_running(child_pid, child_started_at));
            assert!(h.mark_failed("bootstrap error"));
            match h.status() {
                ProcStatus::Failed { reason } => {
                    assert_eq!(reason, "bootstrap error");
                }
                other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
            }
        }

        /// Rejected transitions return `false` and leave the status alone.
        #[tokio::test]
        async fn illegal_transitions_are_rejected() {
            let h = handle_for_test();
            let child_pid = h.pid().expect("child should have a pid");
            let child_started_at = RealClock.system_time_now();
            assert!(h.mark_running(child_pid, child_started_at));
            // Running -> Running is rejected.
            assert!(!h.mark_running(child_pid, RealClock.system_time_now()));
            match h.status() {
                ProcStatus::Running { pid, .. } => assert_eq!(pid, child_pid),
                other => panic!("expected Running, got {other:?}"),
            }
            assert!(h.mark_stopping());
            // Once Stopped, no further transition is accepted.
            assert!(h.mark_stopped(0, Vec::new()));
            assert!(!h.mark_running(child_pid, child_started_at));
            assert!(!h.mark_killed(9, false));
            assert!(!h.mark_failed("nope"));

            assert!(matches!(
                h.status(),
                ProcStatus::Stopped { exit_code: 0, .. }
            ));
        }

        /// Ready -> Stopping -> Stopped is accepted.
        #[tokio::test]
        async fn transitions_from_ready_are_legal() {
            let h = handle_for_test();
            let addr = any_addr_for_test();
            let pid = h.pid().expect("child should have a pid");
            let t0 = RealClock.system_time_now();
            assert!(h.mark_running(pid, t0));
            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
            // The agent reference is attested, not backed by a live actor.
            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
            let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
            assert!(h.mark_ready(pid, t0, addr, agent_ref));
            assert!(h.mark_stopping());
            assert!(h.mark_stopped(0, Vec::new()));
        }

        /// Ready -> Killed is accepted.
        #[tokio::test]
        async fn ready_to_killed_is_legal() {
            let h = handle_for_test();
            let addr = any_addr_for_test();
            let pid = h.pid().expect("child should have a pid");
            let t0 = RealClock.system_time_now();
            assert!(h.mark_running(pid, t0));
            let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
            let agent: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
            assert!(h.mark_ready(pid, t0, addr, agent));
            assert!(h.mark_killed(9, false));
        }

        /// From `Starting`, `mark_stopping` succeeds when the child is
        /// still held, and the `Stopping` status carries the child's pid.
        #[tokio::test]
        async fn mark_stopping_from_starting_uses_child_pid_when_available() {
            let h = handle_for_test();

            let child_pid = h
                .pid()
                .expect("Child::id() should be available in Starting");

            assert!(
                h.mark_stopping(),
                "mark_stopping() should succeed from Starting"
            );
            match h.status() {
                ProcStatus::Stopping { pid, started_at } => {
                    assert_eq!(pid, child_pid, "Stopping pid should come from Child::id()");
                    assert!(
                        started_at <= RealClock.system_time_now(),
                        "started_at should be sane"
                    );
                }
                other => panic!("expected Stopping{{..}}; got {other:?}"),
            }
        }

        /// From `Starting` with the child removed, `mark_stopping` is
        /// rejected and the status stays `Starting`.
        #[tokio::test]
        async fn mark_stopping_noop_when_no_child_pid_available() {
            let h = handle_for_test();

            {
                // Take the child out of the handle so no pid is observable.
                let _ = h.child.lock().expect("child mutex").take();
            }

            assert!(
                !h.mark_stopping(),
                "mark_stopping() should no-op from Starting when no pid is observable"
            );
            assert!(matches!(h.status(), ProcStatus::Starting));
        }

        /// Stopping -> Failed is accepted and records the reason.
        #[tokio::test]
        async fn mark_failed_from_stopping_is_allowed() {
            let h = handle_for_test();

            assert!(h.mark_stopping(), "precondition: to Stopping");

            assert!(
                h.mark_failed("boom"),
                "mark_failed() should succeed from Stopping"
            );
            match h.status() {
                ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
                other => panic!("expected Failed(\"boom\"), got {other:?}"),
            }
        }
    }
2548
    /// The exit monitor reports `Stopped` for a child that exits by itself.
    #[tokio::test]
    async fn test_exit_monitor_updates_status_on_clean_exit() {
        let command = BootstrapCommand {
            program: PathBuf::from("/bin/true"),
            ..Default::default()
        };
        let manager = BootstrapProcManager::new(command);

        // `true` exits immediately.
        let mut cmd = Command::new("true");
        cmd.stdout(Stdio::null()).stderr(Stdio::null());
        let child = cmd.spawn().expect("spawn true");

        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "clean".into());
        let handle = BootstrapProcHandle::new(proc_id.clone(), child);

        {
            // Register the handle with the manager before monitoring it.
            let mut children = manager.children.lock().await;
            children.insert(proc_id.clone(), handle.clone());
        }
        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
        {
            // The test expects the monitor to have taken ownership of the
            // `Child` by the time `spawn_exit_monitor` returns.
            let guard = handle.child.lock().expect("child mutex");
            assert!(
                guard.is_none(),
                "expected Child to be taken by exit monitor"
            );
        }

        // Wait for the handle's final status.
        let st = handle.wait_inner().await;
        assert!(matches!(st, ProcStatus::Stopped { .. }), "status={st:?}");
    }
2582
    /// The exit monitor reports `Killed` with `SIGKILL` for a child that
    /// is killed by that signal.
    #[tokio::test]
    async fn test_exit_monitor_updates_status_on_kill() {
        let command = BootstrapCommand {
            program: PathBuf::from("/bin/sleep"),
            ..Default::default()
        };
        let manager = BootstrapProcManager::new(command);

        // A child that stays alive long enough to be signalled.
        let mut cmd = Command::new("/bin/sleep");
        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
        let child = cmd.spawn().expect("spawn sleep");
        let pid = child.id().expect("pid") as i32;

        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "killed".into());
        let handle = BootstrapProcHandle::new(proc_id.clone(), child);
        {
            let mut children = manager.children.lock().await;
            children.insert(proc_id.clone(), handle.clone());
        }
        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
        {
            // The test expects the monitor to have taken ownership of the
            // `Child` by the time `spawn_exit_monitor` returns.
            let guard = handle.child.lock().expect("child mutex");
            assert!(
                guard.is_none(),
                "expected Child to be taken by exit monitor"
            );
        }
        // SAFETY: `libc::kill` takes plain integer arguments; `pid` is the
        // id of the child spawned above.
        #[cfg(unix)]
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }

        let st = handle.wait_inner().await;
        match st {
            ProcStatus::Killed { signal, .. } => assert_eq!(signal, libc::SIGKILL),
            other => panic!("expected Killed(SIGKILL), got {other:?}"),
        }
    }
2631
2632 #[tokio::test]
2633 async fn watch_notifies_on_status_changes() {
2634 let child = Command::new("sh")
2635 .arg("-c")
2636 .arg("sleep 0.1")
2637 .stdin(std::process::Stdio::null())
2638 .stdout(std::process::Stdio::null())
2639 .stderr(std::process::Stdio::null())
2640 .spawn()
2641 .expect("spawn");
2642
2643 let proc_id = ProcId::Ranked(WorldId("test".into()), 1);
2644 let handle = BootstrapProcHandle::new(proc_id, child);
2645 let mut rx = handle.watch();
2646
2647 let pid = handle.pid().unwrap_or(0);
2649 let now = RealClock.system_time_now();
2650 assert!(handle.mark_running(pid, now));
2651 rx.changed().await.ok(); match &*rx.borrow() {
2653 ProcStatus::Running { pid: p, started_at } => {
2654 assert_eq!(*p, pid);
2655 assert_eq!(*started_at, now);
2656 }
2657 s => panic!("expected Running, got {s:?}"),
2658 }
2659
2660 assert!(handle.mark_stopped(0, Vec::new()));
2662 rx.changed().await.ok(); assert!(matches!(
2664 &*rx.borrow(),
2665 ProcStatus::Stopped { exit_code: 0, .. }
2666 ));
2667 }
2668
2669 #[tokio::test]
2670 async fn ready_errs_if_process_exits_before_running() {
2671 let child = Command::new("sh")
2673 .arg("-c")
2674 .arg("exit 7")
2675 .stdin(std::process::Stdio::null())
2676 .stdout(std::process::Stdio::null())
2677 .stderr(std::process::Stdio::null())
2678 .spawn()
2679 .expect("spawn");
2680
2681 let proc_id = ProcId::Direct(
2682 ChannelAddr::any(ChannelTransport::Unix),
2683 "early-exit".into(),
2684 );
2685 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2686
2687 assert!(handle.mark_stopped(7, Vec::new()));
2690
2691 match handle.ready_inner().await {
2693 Ok(()) => panic!("ready() unexpectedly succeeded"),
2694 Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
2695 assert_eq!(exit_code, 7)
2696 }
2697 Err(other) => panic!("expected Stopped(7), got {other:?}"),
2698 }
2699 }
2700
2701 #[tokio::test]
2702 async fn status_unknown_proc_is_none() {
2703 let manager = BootstrapProcManager::new(BootstrapCommand {
2704 program: PathBuf::from("/bin/true"),
2705 ..Default::default()
2706 });
2707 let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
2708 assert!(manager.status(&unknown).await.is_none());
2709 }
2710
    /// If the `Child` has already been removed from the handle, starting
    /// the exit monitor leaves the status at `Starting`.
    #[tokio::test]
    async fn exit_monitor_child_already_taken_leaves_status_unchanged() {
        let manager = BootstrapProcManager::new(BootstrapCommand {
            program: PathBuf::from("/bin/sleep"),
            ..Default::default()
        });

        let mut cmd = Command::new("/bin/sleep");
        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
        let child = cmd.spawn().expect("spawn sleep");

        let proc_id = ProcId::Direct(
            ChannelAddr::any(ChannelTransport::Unix),
            "already-taken".into(),
        );
        let handle = BootstrapProcHandle::new(proc_id.clone(), child);

        {
            // Register the handle so `manager.status` can find it.
            let mut children = manager.children.lock().await;
            children.insert(proc_id.clone(), handle.clone());
        }
        {
            // Remove the child before the monitor gets a chance to take it.
            let _ = handle.child.lock().expect("child mutex").take();
        }

        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());

        // With nothing to wait on, the status must not have moved.
        assert!(matches!(
            manager.status(&proc_id).await,
            Some(ProcStatus::Starting)
        ));
    }
2749
    /// While the handle is still `Starting`, `pid()` is available before
    /// the exit monitor is started and `None` right after it.
    #[tokio::test]
    async fn pid_none_after_exit_monitor_takes_child() {
        let manager = BootstrapProcManager::new(BootstrapCommand {
            program: PathBuf::from("/bin/sleep"),
            ..Default::default()
        });

        let mut cmd = Command::new("/bin/sleep");
        cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
        let child = cmd.spawn().expect("spawn sleep");

        let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "pid-none".into());
        let handle = BootstrapProcHandle::new(proc_id.clone(), child);

        // Before monitoring, the handle still holds the child.
        assert!(handle.pid().is_some());
        {
            let mut children = manager.children.lock().await;
            children.insert(proc_id.clone(), handle.clone());
        }
        manager.spawn_exit_monitor(proc_id.clone(), handle.clone());

        // No `mark_*` call was made in this test, so no pid has been
        // recorded in the status either.
        assert!(handle.pid().is_none());
    }
2778
2779 #[tokio::test]
2780 async fn starting_may_directly_be_marked_stopped() {
2781 let child = Command::new("sh")
2784 .arg("-c")
2785 .arg("exit 0")
2786 .stdin(std::process::Stdio::null())
2787 .stdout(std::process::Stdio::null())
2788 .stderr(std::process::Stdio::null())
2789 .spawn()
2790 .expect("spawn true");
2791
2792 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "fast-exit".into());
2793 let handle = BootstrapProcHandle::new(proc_id, child);
2794
2795 let mut c = {
2797 let mut guard = handle.child.lock().expect("child mutex");
2798 guard.take()
2799 }
2800 .expect("child already taken");
2801
2802 let status = c.wait().await.expect("wait");
2803 let code = status.code().unwrap_or(0);
2804 assert!(handle.mark_stopped(code, Vec::new()));
2805
2806 assert!(matches!(
2807 handle.status(),
2808 ProcStatus::Stopped { exit_code: 0, .. }
2809 ));
2810 }
2811
2812 #[tokio::test]
2813 async fn handle_ready_allows_waiters() {
2814 let child = Command::new("sh")
2815 .arg("-c")
2816 .arg("sleep 0.1")
2817 .stdin(std::process::Stdio::null())
2818 .stdout(std::process::Stdio::null())
2819 .stderr(std::process::Stdio::null())
2820 .spawn()
2821 .expect("spawn sleep");
2822 let proc_id = ProcId::Ranked(WorldId("test".into()), 42);
2823 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2824
2825 let pid = handle.pid().expect("child should have a pid");
2826 let started_at = RealClock.system_time_now();
2827 assert!(handle.mark_running(pid, started_at));
2828
2829 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2830 let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2831
2832 let ready_addr = any_addr_for_test();
2835
2836 assert!(handle.mark_ready(pid, started_at, ready_addr.clone(), agent_ref));
2838 handle
2839 .ready_inner()
2840 .await
2841 .expect("ready_inner() should complete after Ready");
2842
2843 match handle.status() {
2846 ProcStatus::Ready {
2847 pid: p,
2848 started_at: t,
2849 addr: a,
2850 ..
2851 } => {
2852 assert_eq!(p, pid);
2853 assert_eq!(t, started_at);
2854 assert_eq!(a, ready_addr);
2855 }
2856 other => panic!("expected Ready, got {other:?}"),
2857 }
2858 }
2859
    /// `pid()` keeps returning the pid in `Running` and `Ready` (even
    /// after the child is removed from the handle) and returns `None`
    /// once the handle is `Stopped`.
    #[tokio::test]
    async fn pid_behavior_across_states_running_ready_then_stopped() {
        let child = Command::new("sh")
            .arg("-c")
            .arg("sleep 0.1")
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .spawn()
            .expect("spawn");

        let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
        let handle = BootstrapProcHandle::new(proc_id.clone(), child);

        // Running: pid is reported.
        let pid = handle.pid().expect("initial Child::id");
        let t0 = RealClock.system_time_now();
        assert!(handle.mark_running(pid, t0));
        assert_eq!(handle.pid(), Some(pid));

        // Ready: pid is still reported, even once the child is gone from
        // the handle.
        let addr = any_addr_for_test();
        let agent = {
            let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
            ActorRef::<ProcMeshAgent>::attest(actor_id)
        };
        assert!(handle.mark_ready(pid, t0, addr, agent));
        {
            let _ = handle.child.lock().expect("child mutex").take();
        }
        assert_eq!(handle.pid(), Some(pid));

        // Stopped: no pid any more.
        assert!(handle.mark_stopped(0, Vec::new()));
        assert_eq!(handle.pid(), None, "pid() should be None once terminal");
    }
2896
2897 #[tokio::test]
2898 async fn pid_is_available_in_ready_even_after_child_taken() {
2899 let child = Command::new("sh")
2900 .arg("-c")
2901 .arg("sleep 0.1")
2902 .stdin(std::process::Stdio::null())
2903 .stdout(std::process::Stdio::null())
2904 .stderr(std::process::Stdio::null())
2905 .spawn()
2906 .expect("spawn");
2907
2908 let proc_id = ProcId::Ranked(WorldId("test".into()), 99);
2909 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2910
2911 let pid = handle.pid().expect("child should have pid (via Child::id)");
2913 let started_at = RealClock.system_time_now();
2914 assert!(handle.mark_running(pid, started_at));
2915
2916 let addr = any_addr_for_test();
2918 let agent = {
2919 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2920 ActorRef::<ProcMeshAgent>::attest(actor_id)
2921 };
2922 assert!(handle.mark_ready(pid, started_at, addr, agent));
2923
2924 {
2927 let _ = handle.child.lock().expect("child mutex").take();
2928 }
2929
2930 assert_eq!(handle.pid(), Some(pid), "pid() should be cached in Ready");
2933 }
2934
/// The `Display` output of `ProcStatus::Running` must mention the variant
/// name, the pid, and an uptime derived from `started_at`.
#[test]
fn display_running_includes_pid_and_uptime() {
    // Pretend the process started 42 seconds ago.
    let forty_two_secs = Duration::from_secs(42);
    let status = ProcStatus::Running {
        pid: 1234,
        started_at: RealClock.system_time_now() - forty_two_secs,
    };

    let rendered = status.to_string();
    assert!(rendered.contains("1234"));
    assert!(rendered.contains("Running"));
    assert!(rendered.contains("42s"));
}
2948
/// The `Display` output of `ProcStatus::Ready` must mention the variant
/// name, the pid, and the channel address.
#[test]
fn display_ready_includes_pid_and_addr() {
    let started_at = RealClock.system_time_now() - Duration::from_secs(5);
    let addr = ChannelAddr::any(ChannelTransport::Unix);

    // The agent reference is only attested; no actor is actually running.
    let agent_proc = ProcId::Direct(addr.clone(), "proc".into());
    let agent = ActorRef::attest(agent_proc.actor_id("agent", 0));

    let status = ProcStatus::Ready {
        pid: 4321,
        started_at,
        addr: addr.clone(),
        agent,
    };

    let rendered = format!("{}", status);
    assert!(rendered.contains("4321"));
    assert!(rendered.contains(&addr.to_string()));
    assert!(rendered.contains("Ready"));
}
2968
/// The `Display` output of `ProcStatus::Stopped` must mention the variant
/// name and the exit code.
#[test]
fn display_stopped_includes_exit_code() {
    let rendered = ProcStatus::Stopped {
        exit_code: 7,
        stderr_tail: Vec::new(),
    }
    .to_string();

    assert!(rendered.contains("Stopped"));
    assert!(rendered.contains("7"));
}
2979
/// Formats one sample of several `ProcStatus` variants and only requires
/// that `Display` completes without panicking; the text is not inspected.
#[test]
fn display_other_variants_does_not_panic() {
    let stopping = ProcStatus::Stopping {
        pid: 42,
        started_at: RealClock.system_time_now(),
    };

    let ready = ProcStatus::Ready {
        pid: 42,
        started_at: RealClock.system_time_now(),
        addr: ChannelAddr::any(ChannelTransport::Unix),
        agent: ActorRef::attest(
            ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "x".into())
                .actor_id("agent", 0),
        ),
    };

    let killed = ProcStatus::Killed {
        signal: 9,
        core_dumped: false,
    };

    let failed = ProcStatus::Failed {
        reason: "boom".into(),
    };

    for status in [ProcStatus::Starting, stopping, ready, killed, failed] {
        // The rendered string is discarded on purpose.
        let _ = format!("{}", status);
    }
}
3010
3011 #[tokio::test]
3012 async fn proc_handle_ready_ok_through_trait() {
3013 let child = Command::new("sh")
3014 .arg("-c")
3015 .arg("sleep 0.1")
3016 .stdin(Stdio::null())
3017 .stdout(Stdio::null())
3018 .stderr(Stdio::null())
3019 .spawn()
3020 .expect("spawn");
3021
3022 let proc_id = ProcId::Direct(any_addr_for_test(), "ph-ready-ok".into());
3023 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3024
3025 let pid = handle.pid().expect("pid");
3027 let t0 = RealClock.system_time_now();
3028 assert!(handle.mark_running(pid, t0));
3029
3030 let addr = any_addr_for_test();
3032 let agent: ActorRef<ProcMeshAgent> =
3033 ActorRef::attest(ActorId(proc_id.clone(), "agent".into(), 0));
3034 assert!(handle.mark_ready(pid, t0, addr, agent));
3035
3036 let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3038 assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3039 }
3040
3041 #[tokio::test]
3042 async fn proc_handle_wait_returns_terminal_status() {
3043 let child = Command::new("sh")
3044 .arg("-c")
3045 .arg("exit 0")
3046 .stdin(Stdio::null())
3047 .stdout(Stdio::null())
3048 .stderr(Stdio::null())
3049 .spawn()
3050 .expect("spawn");
3051
3052 let proc_id = ProcId::Direct(any_addr_for_test(), "ph-wait".into());
3053 let handle = BootstrapProcHandle::new(proc_id, child);
3054
3055 assert!(handle.mark_stopped(0, Vec::new()));
3057
3058 let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3060 .await
3061 .expect("wait should return Ok(terminal)");
3062
3063 match st {
3064 ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3065 other => panic!("expected Stopped(0), got {other:?}"),
3066 }
3067 }
3068
3069 #[tokio::test]
3070 async fn ready_wrapper_maps_terminal_to_trait_error() {
3071 let child = Command::new("sh")
3072 .arg("-c")
3073 .arg("exit 7")
3074 .stdin(Stdio::null())
3075 .stdout(Stdio::null())
3076 .stderr(Stdio::null())
3077 .spawn()
3078 .expect("spawn");
3079 let proc_id = ProcId::Direct(any_addr_for_test(), "wrap".into());
3080 let handle = BootstrapProcHandle::new(proc_id, child);
3081
3082 assert!(handle.mark_stopped(7, Vec::new()));
3083
3084 match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3085 Ok(()) => panic!("expected Err"),
3086 Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3087 exit_code, ..
3088 })) => {
3089 assert_eq!(exit_code, 7);
3090 }
3091 Err(e) => panic!("unexpected error: {e:?}"),
3092 }
3093 }
3094
3095 async fn make_proc_id_and_backend_addr(
3105 instance: &hyperactor::Instance<()>,
3106 _tag: &str,
3107 ) -> (ProcId, ChannelAddr) {
3108 let proc_id = id!(bootstrap_child[0]);
3109
3110 let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3113
3114 instance.proc().clone().serve(rx);
3118
3119 (proc_id, backend_addr)
3120 }
3121
3122 #[tokio::test]
3123 async fn bootstrap_handle_terminate_graceful() {
3124 let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
3126 .await
3127 .unwrap();
3128 let (instance, _handle) = root.instance("client").unwrap();
3129
3130 let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3131 let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
3132 let handle = mgr
3133 .spawn(proc_id.clone(), backend_addr.clone())
3134 .await
3135 .expect("spawn bootstrap child");
3136
3137 handle.ready().await.expect("ready");
3138
3139 let deadline = Duration::from_secs(2);
3140 match RealClock
3141 .timeout(deadline * 2, handle.terminate(deadline))
3142 .await
3143 {
3144 Err(_) => panic!("terminate() future hung"),
3145 Ok(Ok(st)) => {
3146 match st {
3147 ProcStatus::Stopped { exit_code, .. } => {
3148 assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
3150 }
3151 ProcStatus::Killed { signal, .. } => {
3152 assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
3170 }
3171 other => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
3172 }
3173 }
3174 Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
3175 }
3176 }
3177
3178 #[tokio::test]
3179 async fn bootstrap_handle_kill_forced() {
3180 let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
3182 .await
3183 .unwrap();
3184 let (instance, _handle) = root.instance("client").unwrap();
3185
3186 let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3187
3188 let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
3190
3191 let handle = mgr
3193 .spawn(proc_id.clone(), backend_addr.clone())
3194 .await
3195 .expect("spawn bootstrap child");
3196
3197 handle.ready().await.expect("ready");
3200
3201 let deadline = Duration::from_secs(5);
3204 match RealClock.timeout(deadline, handle.kill()).await {
3205 Err(_) => panic!("kill() future hung"),
3206 Ok(Ok(st)) => {
3207 match st {
3209 ProcStatus::Killed { signal, .. } => {
3210 assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
3212 }
3213 other => panic!("expected Killed status after kill(); got: {other:?}"),
3214 }
3215 }
3216 Ok(Err(e)) => panic!("kill() failed: {e:?}"),
3217 }
3218 }
3219
3220 #[tokio::test]
3221 async fn bootstrap_cannonical_simple() {
3222 unsafe {
3224 std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
3225 }
3226 let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
3228 .await
3229 .unwrap();
3230 let (instance, _handle) = proc.instance("client").unwrap();
3233
3234 let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
3236 "monarch/hyperactor_mesh/bootstrap",
3237 )));
3238 let alloc = allocator
3240 .allocate(AllocSpec {
3241 extent: extent!(replicas = 1),
3242 constraints: Default::default(),
3243 proc_name: None,
3244 transport: ChannelTransport::Unix,
3245 })
3246 .await
3247 .unwrap();
3248
3249 let host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
3276 .await
3277 .unwrap();
3278
3279 let proc_mesh = host_mesh
3294 .spawn(&instance, "p0", Extent::unity())
3295 .await
3296 .unwrap();
3297
3298 let actor_mesh: ActorMesh<testactor::TestActor> =
3314 proc_mesh.spawn(&instance, "a0", &()).await.unwrap();
3315
3316 let (port, mut rx) = instance.mailbox().open_port();
3322 actor_mesh
3323 .cast(&instance, testactor::GetActorId(port.bind()))
3324 .unwrap();
3325 let got_id = rx.recv().await.unwrap();
3326 assert_eq!(
3327 got_id,
3328 actor_mesh.values().next().unwrap().actor_id().clone()
3329 );
3330
3331 host_mesh.shutdown(&instance).await.expect("host shutdown");
3335 }
3336
3337 #[tokio::test]
3338 async fn exit_tail_is_attached_and_logged() {
3339 let mut cmd = Command::new("sh");
3341 cmd.arg("-c")
3342 .arg("printf 'boom-1\\nboom-2\\n' 1>&2; exit 7")
3343 .stdout(Stdio::piped())
3344 .stderr(Stdio::piped());
3345
3346 let child = cmd.spawn().expect("spawn");
3347
3348 let proc_id = hyperactor::id!(testproc[0]);
3351 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3352
3353 {
3356 let mut guard = handle.child.lock().expect("child mutex poisoned");
3358 if let Some(child) = guard.as_mut() {
3359 let out = child.stdout.take().expect("child stdout must be piped");
3360 let err = child.stderr.take().expect("child stderr must be piped");
3361
3362 let out_writer: Box<dyn io::AsyncWrite + Send + Unpin> = Box::new(io::sink());
3365 let err_writer: Box<dyn io::AsyncWrite + Send + Unpin> = Box::new(io::sink());
3366
3367 let max_tail_lines = 3;
3369 let out_tailer = LogTailer::tee(max_tail_lines, out, out_writer);
3370 let err_tailer = LogTailer::tee(max_tail_lines, err, err_writer);
3371
3372 handle.set_tailers(Some(out_tailer), Some(err_tailer));
3375 } else {
3376 panic!("child already taken before wiring tailers");
3377 }
3378 }
3379
3380 let manager = BootstrapProcManager::new(BootstrapCommand {
3383 program: std::path::PathBuf::from("/bin/true"), ..Default::default()
3385 });
3386 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3387
3388 let st = RealClock
3391 .timeout(Duration::from_secs(2), handle.wait_inner())
3392 .await
3393 .expect("wait_inner() timed out (exit monitor stuck?)");
3394 match st {
3395 ProcStatus::Stopped {
3396 exit_code,
3397 stderr_tail,
3398 } => {
3399 assert_eq!(
3400 exit_code, 7,
3401 "unexpected exit code; stderr_tail={:?}",
3402 stderr_tail
3403 );
3404 let tail = stderr_tail.join("\n");
3405 assert!(tail.contains("boom-1"), "missing boom-1; tail:\n{tail}");
3406 assert!(tail.contains("boom-2"), "missing boom-2; tail:\n{tail}");
3407 }
3408 other => panic!("expected Stopped(7), got {other:?}"),
3409 }
3410 }
3411}