1use std::collections::HashMap;
10use std::collections::VecDeque;
11use std::env::VarError;
12use std::fmt;
13use std::fs::OpenOptions;
14use std::future;
15use std::io;
16use std::io::Write;
17use std::os::unix::process::ExitStatusExt;
18use std::path::Path;
19use std::path::PathBuf;
20use std::process::Stdio;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::OnceLock;
24use std::time::Duration;
25use std::time::SystemTime;
26
27use async_trait::async_trait;
28use base64::prelude::*;
29use futures::StreamExt;
30use futures::stream;
31use humantime::format_duration;
32use hyperactor::ActorId;
33use hyperactor::ActorRef;
34use hyperactor::Named;
35use hyperactor::ProcId;
36use hyperactor::attrs::Attrs;
37use hyperactor::channel;
38use hyperactor::channel::ChannelAddr;
39use hyperactor::channel::ChannelError;
40use hyperactor::channel::ChannelTransport;
41use hyperactor::channel::Rx;
42use hyperactor::channel::Tx;
43use hyperactor::clock::Clock;
44use hyperactor::clock::RealClock;
45use hyperactor::config::CONFIG;
46use hyperactor::config::ConfigAttr;
47use hyperactor::config::global as config;
48use hyperactor::config::global::override_or_global;
49use hyperactor::context;
50use hyperactor::declare_attrs;
51use hyperactor::host::Host;
52use hyperactor::host::HostError;
53use hyperactor::host::ProcHandle;
54use hyperactor::host::ProcManager;
55use hyperactor::host::TerminateSummary;
56use hyperactor::mailbox::IntoBoxedMailboxSender;
57use hyperactor::mailbox::MailboxClient;
58use hyperactor::mailbox::MailboxServer;
59use hyperactor::proc::Proc;
60use serde::Deserialize;
61use serde::Serialize;
62use tempfile::TempDir;
63use tokio::process::Child;
64use tokio::process::ChildStderr;
65use tokio::process::ChildStdout;
66use tokio::process::Command;
67use tokio::sync::oneshot;
68use tokio::sync::watch;
69use tracing::Instrument;
70use tracing::Level;
71
72use crate::logging::OutputTarget;
73use crate::logging::StreamFwder;
74use crate::proc_mesh::mesh_agent::ProcMeshAgent;
75use crate::resource;
76use crate::v1;
77use crate::v1::host_mesh::mesh_agent::HostAgentMode;
78use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
79
80mod mailbox;
81
// Configuration attributes for mesh bootstrap. Each attribute carries
// `ConfigAttr` metadata naming its environment variable and, where one is
// given, its Python-facing name.
82declare_attrs! {
 // Defaults to `false`. Read in `BootstrapProcManager::spawn`, where it is one
 // of the inputs that decide whether child stdio is needed.
83 @meta(CONFIG = ConfigAttr {
99 env_name: Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
100 py_name: Some("enable_log_forwarding".to_string()),
101 })
102 pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;
103
 // Defaults to `false`. Read in `BootstrapProcManager::new` to decide whether
 // a `FileAppender` is created, and again in `spawn` as a stdio input.
104 @meta(CONFIG = ConfigAttr {
124 env_name: Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
125 py_name: Some("enable_file_capture".to_string()),
126 })
127 pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;
128
 // Defaults to `0`. In `spawn`, any value greater than zero also makes child
 // stdio necessary.
129 @meta(CONFIG = ConfigAttr {
133 env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
134 py_name: Some("tail_log_lines".to_string()),
135 })
136 pub attr MESH_TAIL_LOG_LINES: usize = 0;
137
 // Defaults to `true`. When set, `Bootstrap::bootstrap` calls
 // `install_pdeathsig_kill()` for `Bootstrap::Proc`. No Python name.
138 @meta(CONFIG = ConfigAttr {
145 env_name: Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()),
146 py_name: None,
147 })
148 pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true;
149
 // Defaults to 16. NOTE(review): not read in the visible part of this file;
 // presumably bounds how many procs are terminated concurrently — confirm.
150 @meta(CONFIG = ConfigAttr {
155 env_name: Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()),
156 py_name: None,
157 })
158 pub attr MESH_TERMINATE_CONCURRENCY: usize = 16;
159
 // Defaults to 10 seconds. NOTE(review): not read in the visible part of this
 // file; presumably the per-proc terminate timeout — confirm.
160 @meta(CONFIG = ConfigAttr {
164 env_name: Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()),
165 py_name: None,
166 })
167 pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10);
168}
169
// String constants used by the bootstrap protocol. Their consumers are not in
// the visible part of this file, so the notes below are assumptions to confirm.

// Presumably the environment variable carrying the allocator's bootstrap
// address — TODO confirm against the reader.
170pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
// Presumably the environment variable carrying this process's bootstrap index
// — TODO confirm.
171pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
// Presumably the environment variable carrying the client trace id — TODO confirm.
172pub const CLIENT_TRACE_ID_ENV: &str = "MONARCH_CLIENT_TRACE_ID";
// NOTE(review): crate-private name related to bootstrap log forwarding; whether
// it is an environment variable or a channel label is not visible here.
173pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";
177
/// Message sent from a bootstrapped process to its allocator.
///
/// The first field is the sender's bootstrap index (see
/// `exit_if_missed_heartbeat`, which fills it with `bootstrap_index`); the
/// second is the message payload.
178#[derive(Debug, Clone, Serialize, Deserialize, Named)]
182pub(crate) struct Process2Allocator(pub usize, pub Process2AllocatorMessage);
183
/// Payloads a process can send to the allocator, wrapped in `Process2Allocator`.
184#[derive(Debug, Clone, Serialize, Deserialize, Named)]
186pub(crate) enum Process2AllocatorMessage {
 // Carries a channel address. NOTE(review): presumably the address on which
 // the process accepts `Allocator2Process` messages — confirm with the sender.
187 Hello(ChannelAddr),
191
 // Carries a proc id, a reference to its `ProcMeshAgent`, and a channel
 // address. NOTE(review): presumably reports a successfully started proc.
192 StartedProc(ProcId, ActorRef<ProcMeshAgent>, ChannelAddr),
197
 // Liveness signal; sent every five seconds by `exit_if_missed_heartbeat`.
198 Heartbeat,
199}
200
/// Messages addressed from the allocator to a bootstrapped process.
///
/// NOTE(review): the handler for these messages is outside the visible part of
/// this file; variant notes are inferred from names and payload types.
201#[derive(Debug, Clone, Serialize, Deserialize, Named)]
203pub(crate) enum Allocator2Process {
 // Presumably asks the process to start the given proc on the given transport.
204 StartProc(ProcId, ChannelTransport),
207
 // Presumably asks the process to stop its procs and exit with the given code.
208 StopAndExit(i32),
211
 // Presumably asks the process to exit immediately with the given code.
212 Exit(i32),
215}
216
217async fn exit_if_missed_heartbeat(bootstrap_index: usize, bootstrap_addr: ChannelAddr) {
218 let tx = match channel::dial(bootstrap_addr.clone()) {
219 Ok(tx) => tx,
220
221 Err(err) => {
222 tracing::error!(
223 "Failed to establish heartbeat connection to allocator, exiting! (addr: {:?}): {}",
224 bootstrap_addr,
225 err
226 );
227 std::process::exit(1);
228 }
229 };
230 tracing::info!(
231 "Heartbeat connection established to allocator (idx: {bootstrap_index}, addr: {bootstrap_addr:?})",
232 );
233 loop {
234 RealClock.sleep(Duration::from_secs(5)).await;
235
236 let result = tx
237 .send(Process2Allocator(
238 bootstrap_index,
239 Process2AllocatorMessage::Heartbeat,
240 ))
241 .await;
242
243 if let Err(err) = result {
244 tracing::error!(
245 "Heartbeat failed to allocator, exiting! (addr: {:?}): {}",
246 bootstrap_addr,
247 err
248 );
249 std::process::exit(1);
250 }
251 }
252}
253
/// Unwraps a `Result` inside a function whose return type is `anyhow::Error`.
///
/// On `Ok(value)` the macro evaluates to `value`. On `Err(e)` it returns
/// `anyhow::Error::from(e)` from the enclosing function — note that the error
/// itself is returned, not `Err(..)`, which matches functions such as
/// `Bootstrap::bootstrap` that only ever return on failure.
254#[macro_export]
255macro_rules! ok {
 // Accepts a single expression with an optional trailing comma.
256 ($expr:expr $(,)?) => {
257 match $expr {
258 Ok(value) => value,
259 Err(e) => return ::anyhow::Error::from(e),
260 }
261 };
262}
263
/// Parks the calling task forever.
///
/// The returned future never resolves. The generic return type only exists so
/// the call can be used wherever a value of type `R` is expected.
async fn halt<R>() -> R {
    future::pending::<R>().await
}
268
/// The mode in which a mesh process bootstraps itself.
///
/// A value is serialized to JSON and base64-encoded into the
/// `HYPERACTOR_MESH_BOOTSTRAP_MODE` environment variable by `to_env`, and read
/// back by `get_from_env`. `bootstrap` then acts on the selected variant.
269#[derive(Clone, Default, Debug, Serialize, Deserialize)]
278pub enum Bootstrap {
 // Run a single proc in this process.
279 Proc {
 // Id of the proc to create.
281 proc_id: ProcId,
 // Address dialed with `MailboxClient::dial` to build the proc's sender.
283 backend_addr: ChannelAddr,
 // Address to which the proc's serving address and agent reference are sent
 // once the proc is up.
286 callback_addr: ChannelAddr,
 // Directory in which the proc's unix socket is created
 // (`unix:<socket_dir_path>/<name>`).
288 socket_dir_path: PathBuf,
 // Optional config snapshot, installed as `Source::ClientOverride`.
292 config: Option<Attrs>,
297 },
298
 // Run a host that serves and manages procs.
299 Host {
 // Address passed to `Host::serve`.
302 addr: ChannelAddr,
 // Command used to launch procs; when `None`, `BootstrapCommand::current()`
 // is used.
304 command: Option<BootstrapCommand>,
 // Optional config snapshot, installed as `Source::Runtime`.
307 config: Option<Attrs>,
312 },
313
 // Default mode; delegates to `bootstrap_v0_proc_mesh()`.
314 #[default]
315 V0ProcMesh, }
317
318impl Bootstrap {
319 #[allow(clippy::result_large_err)]
322 fn to_env_safe_string(&self) -> v1::Result<String> {
323 Ok(BASE64_STANDARD.encode(serde_json::to_string(&self)?))
324 }
325
326 #[allow(clippy::result_large_err)]
328 fn from_env_safe_string(str: &str) -> v1::Result<Self> {
329 let data = BASE64_STANDARD.decode(str)?;
330 let data = std::str::from_utf8(&data)?;
331 Ok(serde_json::from_str(data)?)
332 }
333
334 pub fn get_from_env() -> anyhow::Result<Option<Self>> {
337 match std::env::var("HYPERACTOR_MESH_BOOTSTRAP_MODE") {
338 Ok(mode) => match Bootstrap::from_env_safe_string(&mode) {
339 Ok(mode) => Ok(Some(mode)),
340 Err(e) => {
341 Err(anyhow::Error::from(e).context("parsing HYPERACTOR_MESH_BOOTSTRAP_MODE"))
342 }
343 },
344 Err(VarError::NotPresent) => Ok(None),
345 Err(e) => Err(anyhow::Error::from(e).context("reading HYPERACTOR_MESH_BOOTSTRAP_MODE")),
346 }
347 }
348
349 pub fn to_env(&self, cmd: &mut Command) {
351 cmd.env(
352 "HYPERACTOR_MESH_BOOTSTRAP_MODE",
353 self.to_env_safe_string().unwrap(),
354 );
355 }
356
357 pub async fn bootstrap(self) -> anyhow::Error {
360 tracing::info!(
361 "bootstrapping mesh process: {}",
362 serde_json::to_string(&self).unwrap()
363 );
364
365 if Debug::is_active() {
366 let mut buf = Vec::new();
367 writeln!(&mut buf, "bootstrapping {}:", std::process::id()).unwrap();
368 #[cfg(unix)]
369 writeln!(
370 &mut buf,
371 "\tparent pid: {}",
372 std::os::unix::process::parent_id()
373 )
374 .unwrap();
375 writeln!(
376 &mut buf,
377 "\tconfig: {}",
378 serde_json::to_string(&self).unwrap()
379 )
380 .unwrap();
381 match std::env::current_exe() {
382 Ok(path) => writeln!(&mut buf, "\tcurrent_exe: {}", path.display()).unwrap(),
383 Err(e) => writeln!(&mut buf, "\tcurrent_exe: error<{}>", e).unwrap(),
384 }
385 writeln!(&mut buf, "\targs:").unwrap();
386 for arg in std::env::args() {
387 writeln!(&mut buf, "\t\t{}", arg).unwrap();
388 }
389 writeln!(&mut buf, "\tenv:").unwrap();
390 for (key, val) in std::env::vars() {
391 writeln!(&mut buf, "\t\t{}={}", key, val).unwrap();
392 }
393 let _ = Debug.write(&buf);
394 if let Ok(s) = std::str::from_utf8(&buf) {
395 tracing::info!("{}", s);
396 } else {
397 tracing::info!("{:?}", buf);
398 }
399 }
400
401 match self {
402 Bootstrap::Proc {
403 proc_id,
404 backend_addr,
405 callback_addr,
406 socket_dir_path,
407 config,
408 } => {
409 let entered = tracing::span!(
410 Level::INFO,
411 "proc_bootstrap",
412 %proc_id,
413 %backend_addr,
414 %callback_addr,
415 socket_dir_path = %socket_dir_path.display(),
416 )
417 .entered();
418 if let Some(attrs) = config {
419 config::set(config::Source::ClientOverride, attrs);
420 tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)");
421 } else {
422 tracing::debug!("bootstrap: no config snapshot provided (Proc)");
423 }
424
425 if hyperactor::config::global::get(MESH_BOOTSTRAP_ENABLE_PDEATHSIG) {
426 let _ = install_pdeathsig_kill();
431 } else {
432 eprintln!("(bootstrap) PDEATHSIG disabled via config");
433 }
434
435 let (local_addr, name) = ok!(proc_id
436 .as_direct()
437 .ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
438 let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
440 let serve_addr = serve_addr.parse().unwrap();
441
442 let proc_sender = mailbox::LocalProcDialer::new(
447 local_addr.clone(),
448 socket_dir_path,
449 ok!(MailboxClient::dial(backend_addr)),
450 );
451
452 let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
453
454 let span = entered.exit();
455
456 let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
457 .instrument(span.clone())
458 .await
459 .map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
460
461 let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr));
464 proc.clone().serve(proc_rx);
465 ok!(ok!(channel::dial(callback_addr))
466 .send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
467 .instrument(span)
468 .await
469 .map_err(ChannelError::from));
470
471 halt().await
472 }
473 Bootstrap::Host {
474 addr,
475 command,
476 config,
477 } => {
478 if let Some(attrs) = config {
479 config::set(config::Source::Runtime, attrs);
480 tracing::debug!("bootstrap: installed Runtime config snapshot (Host)");
481 } else {
482 tracing::debug!("bootstrap: no config snapshot provided (Host)");
483 }
484
485 let command = match command {
486 Some(command) => command,
487 None => ok!(BootstrapCommand::current()),
488 };
489 let manager = BootstrapProcManager::new(command).unwrap();
490
491 let (host, _handle) = ok!(Host::serve(manager, addr).await);
492 let addr = host.addr().clone();
493 let host_mesh_agent = ok!(host
494 .system_proc()
495 .clone()
496 .spawn::<HostMeshAgent>("agent", HostAgentMode::Process(host))
497 .await);
498
499 tracing::info!(
500 "serving host at {}, agent: {}",
501 addr,
502 host_mesh_agent.bind::<HostMeshAgent>()
503 );
504 halt().await
505 }
506 Bootstrap::V0ProcMesh => bootstrap_v0_proc_mesh().await,
507 }
508 }
509
510 pub async fn bootstrap_or_die(self) -> ! {
513 let err = self.bootstrap().await;
514 tracing::error!("failed to bootstrap mesh process: {}", err);
515 std::process::exit(1)
516 }
517}
518
519pub fn install_pdeathsig_kill() -> io::Result<()> {
521 #[cfg(target_os = "linux")]
522 {
523 let rc = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL as libc::c_int) };
526 if rc != 0 {
527 return Err(io::Error::last_os_error());
528 }
529 }
530 if unsafe { libc::getppid() } == 1 {
537 std::process::exit(0);
538 }
539 Ok(())
540}
541
/// Lifecycle status of a bootstrapped proc, as tracked by `BootstrapProcHandle`.
///
/// `Stopped`, `Killed` and `Failed` are the terminal states (see `is_exit`).
542#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
557pub enum ProcStatus {
 // Initial state of a freshly created handle.
558 Starting,
 // The process has a known pid and start time, but is not yet `Ready`.
561 Running { pid: u32, started_at: SystemTime },
 // The proc is reachable: its address and agent reference are known.
565 Ready {
569 pid: u32,
570 started_at: SystemTime,
571 addr: ChannelAddr,
572 agent: ActorRef<ProcMeshAgent>,
573 },
 // Shutdown has been requested; the process has not been observed to exit.
574 Stopping { pid: u32, started_at: SystemTime },
 // The process exited with an exit code; `stderr_tail` holds the captured
 // tail of its stderr, if any.
578 Stopped {
581 exit_code: i32,
582 stderr_tail: Vec<String>,
583 },
 // The process was terminated by a signal.
584 Killed { signal: i32, core_dumped: bool },
 // The process could not be waited on, or exited with an unknown status.
587 Failed { reason: String },
591}
592
593impl ProcStatus {
594 #[inline]
598 pub fn is_exit(&self) -> bool {
599 matches!(
600 self,
601 ProcStatus::Stopped { .. } | ProcStatus::Killed { .. } | ProcStatus::Failed { .. }
602 )
603 }
604}
605
606impl std::fmt::Display for ProcStatus {
607 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
608 match self {
609 ProcStatus::Starting => write!(f, "Starting"),
610 ProcStatus::Running { pid, started_at } => {
611 let uptime = started_at
612 .elapsed()
613 .map(|d| format!(" up {}", format_duration(d)))
614 .unwrap_or_default();
615 write!(f, "Running[{pid}]{uptime}")
616 }
617 ProcStatus::Ready {
618 pid,
619 started_at,
620 addr,
621 ..
622 } => {
623 let uptime = started_at
624 .elapsed()
625 .map(|d| format!(" up {}", format_duration(d)))
626 .unwrap_or_default();
627 write!(f, "Ready[{pid}] at {addr}{uptime}")
628 }
629 ProcStatus::Stopping { pid, started_at } => {
630 let uptime = started_at
631 .elapsed()
632 .map(|d| format!(" up {}", format_duration(d)))
633 .unwrap_or_default();
634 write!(f, "Stopping[{pid}]{uptime}")
635 }
636 ProcStatus::Stopped { exit_code, .. } => write!(f, "Stopped(exit={exit_code})"),
637 ProcStatus::Killed {
638 signal,
639 core_dumped,
640 } => {
641 if *core_dumped {
642 write!(f, "Killed(sig={signal}, core)")
643 } else {
644 write!(f, "Killed(sig={signal})")
645 }
646 }
647 ProcStatus::Failed { reason } => write!(f, "Failed({reason})"),
648 }
649 }
650}
651
/// Reasons `BootstrapProcHandle::ready_inner` can fail.
652#[derive(Debug, Clone)]
654pub enum ReadyError {
 // The proc reached a terminal status (carried here) before becoming `Ready`.
655 Terminal(ProcStatus),
 // The status watch channel closed before `Ready` or a terminal status was seen.
657 ChannelClosed,
659}
660
661impl std::fmt::Display for ReadyError {
662 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
663 match self {
664 ReadyError::Terminal(st) => write!(f, "proc terminated before running: {st:?}"),
665 ReadyError::ChannelClosed => write!(f, "status channel closed"),
666 }
667 }
668}
669impl std::error::Error for ReadyError {}
670
/// Handle to a proc launched as a child process.
///
/// Cloning is cheap: clones share the same status, child slot and stream
/// forwarder slots through `Arc`s, and the same status watch channel.
671#[derive(Clone)]
710pub struct BootstrapProcHandle {
 // Id of the proc this handle refers to.
711 proc_id: ProcId,
 // Current lifecycle status; changed only through `transition`.
713 status: Arc<std::sync::Mutex<ProcStatus>>,
 // The child process, until `spawn_exit_monitor` takes it to wait on it.
719 child: Arc<std::sync::Mutex<Option<Child>>>,
 // Stdout forwarder, installed by `set_stream_monitors` and removed by
 // `take_stream_monitors`.
723 stdout_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
 // Stderr forwarder; same lifecycle as `stdout_fwder`.
728 stderr_fwder: Arc<std::sync::Mutex<Option<StreamFwder>>>,
 // Sending side of the status watch channel; `transition` publishes here.
731 tx: tokio::sync::watch::Sender<ProcStatus>,
 // Receiving side of the status watch channel; cloned by `watch`.
736 rx: tokio::sync::watch::Receiver<ProcStatus>,
740}
741
742impl fmt::Debug for BootstrapProcHandle {
743 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
744 let status = self.status.lock().expect("status mutex poisoned").clone();
745 f.debug_struct("BootstrapProcHandle")
746 .field("proc_id", &self.proc_id)
747 .field("status", &status)
748 .field("child", &"<child>")
749 .field("tx", &"<watch::Sender>")
750 .field("rx", &"<watch::Receiver>")
751 .finish()
754 }
755}
756
757impl BootstrapProcHandle {
763 pub fn new(proc_id: ProcId, child: Child) -> Self {
776 let (tx, rx) = watch::channel(ProcStatus::Starting);
777 Self {
778 proc_id,
779 status: Arc::new(std::sync::Mutex::new(ProcStatus::Starting)),
780 child: Arc::new(std::sync::Mutex::new(Some(child))),
781 stdout_fwder: Arc::new(std::sync::Mutex::new(None)),
782 stderr_fwder: Arc::new(std::sync::Mutex::new(None)),
783 tx,
784 rx,
785 }
786 }
787
    /// Returns the id of the proc this handle refers to.
788 #[inline]
790 pub fn proc_id(&self) -> &ProcId {
791 &self.proc_id
792 }
793
    /// Returns a new receiver on the status watch channel (a clone of the
    /// handle's own receiver), on which status changes can be observed.
794 #[inline]
807 pub fn watch(&self) -> tokio::sync::watch::Receiver<ProcStatus> {
808 self.rx.clone()
809 }
810
811 #[inline]
829 pub async fn changed(&self) {
830 let _ = self.watch().changed().await;
831 }
832
833 #[inline]
841 pub fn pid(&self) -> Option<u32> {
842 match *self.status.lock().expect("status mutex poisoned") {
843 ProcStatus::Running { pid, .. } | ProcStatus::Ready { pid, .. } => Some(pid),
844 _ => self
845 .child
846 .lock()
847 .expect("child mutex poisoned")
848 .as_ref()
849 .and_then(|c| c.id()),
850 }
851 }
852
853 #[must_use]
866 pub fn status(&self) -> ProcStatus {
867 self.status.lock().expect("status mutex poisoned").clone()
871 }
872
873 fn transition<F>(&self, f: F) -> bool
879 where
880 F: FnOnce(&mut ProcStatus) -> bool,
881 {
882 let mut guard = self.status.lock().expect("status mutex poisoned");
883 let _before = guard.clone();
884 let changed = f(&mut guard);
885 if changed {
886 let _ = self.tx.send(guard.clone());
888 }
889 changed
890 }
891
892 pub(crate) fn mark_running(&self, pid: u32, started_at: SystemTime) -> bool {
904 self.transition(|st| match *st {
905 ProcStatus::Starting => {
906 *st = ProcStatus::Running { pid, started_at };
907 true
908 }
909 _ => {
910 tracing::warn!(
911 "illegal transition: {:?} -> Running; leaving status unchanged",
912 *st
913 );
914 false
915 }
916 })
917 }
918
919 pub(crate) fn mark_ready(
931 &self,
932 pid: u32,
933 started_at: SystemTime,
934 addr: ChannelAddr,
935 agent: ActorRef<ProcMeshAgent>,
936 ) -> bool {
937 tracing::info!(proc_id = %self.proc_id, %addr, "{} running on pid {}", self.proc_id, pid);
938 self.transition(|st| match st {
939 ProcStatus::Starting | ProcStatus::Running { .. } => {
940 *st = ProcStatus::Ready {
941 pid,
942 started_at,
943 addr,
944 agent,
945 };
946 true
947 }
948 _ => {
949 tracing::warn!(
950 "illegal transition: {:?} -> Ready; leaving status unchanged",
951 st
952 );
953 false
954 }
955 })
956 }
957
958 pub(crate) fn mark_stopping(&self) -> bool {
962 let child_pid = self.child_pid_snapshot();
964 let now = hyperactor::clock::RealClock.system_time_now();
965
966 self.transition(|st| match *st {
967 ProcStatus::Running { pid, started_at } => {
968 *st = ProcStatus::Stopping { pid, started_at };
969 true
970 }
971 ProcStatus::Ready {
972 pid, started_at, ..
973 } => {
974 *st = ProcStatus::Stopping { pid, started_at };
975 true
976 }
977 ProcStatus::Starting => {
978 if let Some(pid) = child_pid {
979 *st = ProcStatus::Stopping {
980 pid,
981 started_at: now,
982 };
983 true
984 } else {
985 false
986 }
987 }
988 _ => false,
989 })
990 }
991
992 pub(crate) fn mark_stopped(&self, exit_code: i32, stderr_tail: Vec<String>) -> bool {
995 self.transition(|st| match *st {
996 ProcStatus::Starting
997 | ProcStatus::Running { .. }
998 | ProcStatus::Ready { .. }
999 | ProcStatus::Stopping { .. } => {
1000 *st = ProcStatus::Stopped {
1001 exit_code,
1002 stderr_tail,
1003 };
1004 true
1005 }
1006 _ => {
1007 tracing::warn!(
1008 "illegal transition: {:?} -> Stopped; leaving status unchanged",
1009 *st
1010 );
1011 false
1012 }
1013 })
1014 }
1015
1016 pub(crate) fn mark_killed(&self, signal: i32, core_dumped: bool) -> bool {
1019 self.transition(|st| match *st {
1020 ProcStatus::Starting
1021 | ProcStatus::Running { .. }
1022 | ProcStatus::Ready { .. }
1023 | ProcStatus::Stopping { .. } => {
1024 *st = ProcStatus::Killed {
1025 signal,
1026 core_dumped,
1027 };
1028 true
1029 }
1030 _ => {
1031 tracing::warn!(
1032 "illegal transition: {:?} -> Killed; leaving status unchanged",
1033 *st
1034 );
1035 false
1036 }
1037 })
1038 }
1039
1040 pub(crate) fn mark_failed<S: Into<String>>(&self, reason: S) -> bool {
1043 self.transition(|st| match *st {
1044 ProcStatus::Starting
1045 | ProcStatus::Running { .. }
1046 | ProcStatus::Ready { .. }
1047 | ProcStatus::Stopping { .. } => {
1048 *st = ProcStatus::Failed {
1049 reason: reason.into(),
1050 };
1051 true
1052 }
1053 _ => {
1054 tracing::warn!(
1055 "illegal transition: {:?} -> Failed; leaving status unchanged",
1056 *st
1057 );
1058 false
1059 }
1060 })
1061 }
1062
1063 #[must_use]
1082 pub async fn wait_inner(&self) -> ProcStatus {
1083 let mut rx = self.watch();
1084 loop {
1085 let st = rx.borrow().clone();
1086 if st.is_exit() {
1087 return st;
1088 }
1089 if rx.changed().await.is_err() {
1091 return st;
1092 }
1093 }
1094 }
1095
1096 pub async fn ready_inner(&self) -> Result<(), ReadyError> {
1115 let mut rx = self.watch();
1116 loop {
1117 let st = rx.borrow().clone();
1118 match &st {
1119 ProcStatus::Ready { .. } => return Ok(()),
1120 s if s.is_exit() => return Err(ReadyError::Terminal(st)),
1121 _non_terminal => {
1122 if rx.changed().await.is_err() {
1123 return Err(ReadyError::ChannelClosed);
1124 }
1125 }
1126 }
1127 }
1128 }
1129
1130 fn child_pid_snapshot(&self) -> Option<u32> {
1133 self.child
1134 .lock()
1135 .expect("child mutex poisoned")
1136 .as_ref()
1137 .and_then(|c| c.id())
1138 }
1139
1140 fn signalable_pid(&self) -> Option<i32> {
1145 match &*self.status.lock().expect("status mutex poisoned") {
1146 ProcStatus::Running { pid, .. }
1147 | ProcStatus::Ready { pid, .. }
1148 | ProcStatus::Stopping { pid, .. } => Some(*pid as i32),
1149 _ => self
1150 .child
1151 .lock()
1152 .expect("child mutex poisoned")
1153 .as_ref()
1154 .and_then(|c| c.id())
1155 .map(|p| p as i32),
1156 }
1157 }
1158
1159 fn send_signal(pid: i32, sig: i32) -> Result<(), anyhow::Error> {
1164 let rc = unsafe { libc::kill(pid, sig) };
1167 if rc == 0 {
1168 Ok(())
1169 } else {
1170 let e = std::io::Error::last_os_error();
1171 if let Some(libc::ESRCH) = e.raw_os_error() {
1172 Ok(())
1175 } else {
1176 Err(anyhow::anyhow!("kill({pid}, {sig}) failed: {e}"))
1177 }
1178 }
1179 }
1180
1181 pub fn set_stream_monitors(&self, out: Option<StreamFwder>, err: Option<StreamFwder>) {
1182 *self
1183 .stdout_fwder
1184 .lock()
1185 .expect("stdout_tailer mutex poisoned") = out;
1186 *self
1187 .stderr_fwder
1188 .lock()
1189 .expect("stderr_tailer mutex poisoned") = err;
1190 }
1191
1192 fn take_stream_monitors(&self) -> (Option<StreamFwder>, Option<StreamFwder>) {
1193 let out = self
1194 .stdout_fwder
1195 .lock()
1196 .expect("stdout_tailer mutex poisoned")
1197 .take();
1198 let err = self
1199 .stderr_fwder
1200 .lock()
1201 .expect("stderr_tailer mutex poisoned")
1202 .take();
1203 (out, err)
1204 }
1205
1206 async fn send_stop_all(
1210 &self,
1211 cx: &impl context::Actor,
1212 agent: ActorRef<ProcMeshAgent>,
1213 timeout: Duration,
1214 ) -> anyhow::Result<ProcStatus> {
1215 let mut agent_port = agent.port();
1222 agent_port.return_undeliverable(false);
1223 agent_port.send(cx, resource::StopAll {})?;
1224 match RealClock.timeout(timeout, self.wait()).await {
1227 Ok(Ok(st)) => Ok(st),
1228 Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1229 Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
1230 }
1231 }
1232}
1233
1234#[async_trait]
1235impl hyperactor::host::ProcHandle for BootstrapProcHandle {
1236 type Agent = ProcMeshAgent;
1237 type TerminalStatus = ProcStatus;
1238
    /// Returns the id of the proc this handle refers to.
1239 #[inline]
1240 fn proc_id(&self) -> &ProcId {
1241 &self.proc_id
1242 }
1243
1244 #[inline]
1245 fn addr(&self) -> Option<ChannelAddr> {
1246 match &*self.status.lock().expect("status mutex poisoned") {
1247 ProcStatus::Ready { addr, .. } => Some(addr.clone()),
1248 _ => None,
1249 }
1250 }
1251
1252 #[inline]
1253 fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1254 match &*self.status.lock().expect("status mutex poisoned") {
1255 ProcStatus::Ready { agent, .. } => Some(agent.clone()),
1256 _ => None,
1257 }
1258 }
1259
1260 async fn ready(&self) -> Result<(), hyperactor::host::ReadyError<Self::TerminalStatus>> {
1271 match self.ready_inner().await {
1272 Ok(()) => Ok(()),
1273 Err(ReadyError::Terminal(status)) => {
1274 Err(hyperactor::host::ReadyError::Terminal(status))
1275 }
1276 Err(ReadyError::ChannelClosed) => Err(hyperactor::host::ReadyError::ChannelClosed),
1277 }
1278 }
1279
1280 async fn wait(&self) -> Result<Self::TerminalStatus, hyperactor::host::WaitError> {
1288 let status = self.wait_inner().await;
1289 if status.is_exit() {
1290 Ok(status)
1291 } else {
1292 Err(hyperactor::host::WaitError::ChannelClosed)
1293 }
1294 }
1295
    /// Stops the proc, escalating as needed.
    ///
    /// Steps, in order:
    /// 1. If the proc is already terminal, return `AlreadyTerminated`.
    /// 2. Resolve a signalable pid; fail with `Io` if none is known.
    /// 3. If an agent reference is available (proc is `Ready`), ask it to stop
    ///    all actors and wait up to `timeout` for the process to exit.
    /// 4. Otherwise, or if step 3 fails, mark the proc `Stopping`, send
    ///    `SIGTERM`, and wait up to `timeout`.
    /// 5. On timeout, send `SIGKILL` and wait up to `HARD_WAIT_AFTER_KILL`.
    ///
    /// Returns the terminal status on success. Signal delivery failures are
    /// reported as `Io`; a wait that ends without a terminal status is
    /// reported as `ChannelClosed`.
1296 async fn terminate(
1326 &self,
1327 cx: &impl context::Actor,
1328 timeout: Duration,
1329 ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
    // Upper bound on the wait after SIGKILL has been sent.
1330 const HARD_WAIT_AFTER_KILL: Duration = Duration::from_secs(5);
1331
1332 let st0 = self.status();
1334 if st0.is_exit() {
1335 tracing::debug!(?st0, "terminate(): already terminal");
1336 return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1337 }
1338
    // Resolve the pid before any shutdown attempt.
1339 let pid = self.signalable_pid().ok_or_else(|| {
1341 let st = self.status();
1342 tracing::warn!(?st, "terminate(): no signalable pid");
1343 hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1344 "no signalable pid (state: {:?})",
1345 st
1346 ))
1347 })?;
1348
    // First try a cooperative stop through the agent; this is only possible
    // when the proc is `Ready`, since only then is an agent reference known.
1349 let agent = self.agent_ref();
1354 if let Some(agent) = agent {
1355 match self.send_stop_all(cx, agent.clone(), timeout).await {
1356 Ok(st) => return Ok(st),
1357 Err(e) => {
        // Fall through to signal-based shutdown.
1358 tracing::warn!(
1360 "ProcMeshAgent {} could not successfully stop all actors: {}",
1361 agent.actor_id(),
1362 e,
1363 );
1364 }
1365 }
1366 }
    // The result is ignored: the proc may be in a state from which
    // `Stopping` is not reachable, and signaling proceeds regardless.
1367 let _ = self.mark_stopping();
1370
1371 tracing::info!(pid, ?timeout, "terminate(): sending SIGTERM");
1373 if let Err(e) = Self::send_signal(pid, libc::SIGTERM) {
1374 tracing::warn!(pid, error=%e, "terminate(): SIGTERM delivery failed");
1375 return Err(hyperactor::host::TerminateError::Io(e));
1376 }
1377 tracing::debug!(pid, "terminate(): SIGTERM sent");
1378
1379 match RealClock.timeout(timeout, self.wait_inner()).await {
1381 Ok(st) if st.is_exit() => {
1382 tracing::info!(pid, ?st, "terminate(): exited after SIGTERM");
1383 Ok(st)
1384 }
    // `wait_inner` returned a non-terminal status, which it only does when
    // the status channel has closed.
1385 Ok(non_exit) => {
1386 tracing::warn!(pid, ?non_exit, "terminate(): wait returned non-terminal");
1389 Err(hyperactor::host::TerminateError::ChannelClosed)
1390 }
1391 Err(_elapsed) => {
1392 tracing::warn!(pid, "terminate(): timeout; escalating to SIGKILL");
    // Re-resolve the pid rather than reusing the one captured above.
1394 if let Some(pid2) = self.signalable_pid() {
1395 if let Err(e) = Self::send_signal(pid2, libc::SIGKILL) {
1396 tracing::warn!(pid=pid2, error=%e, "terminate(): SIGKILL delivery failed");
1397 return Err(hyperactor::host::TerminateError::Io(e));
1398 }
1399 tracing::info!(pid = pid2, "terminate(): SIGKILL sent");
1400 } else {
1401 tracing::warn!("terminate(): lost pid before SIGKILL escalation");
1402 }
1403 match RealClock
1405 .timeout(HARD_WAIT_AFTER_KILL, self.wait_inner())
1406 .await
1407 {
1408 Ok(st) if st.is_exit() => {
1409 tracing::info!(?st, "terminate(): exited after SIGKILL");
1410 Ok(st)
1411 }
    // Covers both a second timeout and a non-terminal result.
1412 other => {
1413 tracing::warn!(
1414 ?other,
1415 "terminate(): post-KILL wait did not yield terminal"
1416 );
1417 Err(hyperactor::host::TerminateError::ChannelClosed)
1418 }
1419 }
1420 }
1421 }
1422 }
1423
1424 async fn kill(
1450 &self,
1451 ) -> Result<ProcStatus, hyperactor::host::TerminateError<Self::TerminalStatus>> {
1452 let st0 = self.status();
1460 if st0.is_exit() {
1461 return Err(hyperactor::host::TerminateError::AlreadyTerminated(st0));
1462 }
1463
1464 let pid = self.signalable_pid().ok_or_else(|| {
1466 hyperactor::host::TerminateError::Io(anyhow::anyhow!(
1467 "no signalable pid (state: {:?})",
1468 self.status()
1469 ))
1470 })?;
1471
1472 if let Err(e) = Self::send_signal(pid, libc::SIGKILL) {
1473 return Err(hyperactor::host::TerminateError::Io(e));
1474 }
1475
1476 let st = self.wait_inner().await;
1478 if st.is_exit() {
1479 Ok(st)
1480 } else {
1481 Err(hyperactor::host::TerminateError::ChannelClosed)
1482 }
1483 }
1484}
1485
/// Description of the command line used to launch a bootstrap process.
/// `BootstrapCommand::new` turns it into a `Command`.
1486#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
1488pub struct BootstrapCommand {
 // Path of the executable to run.
1489 pub program: PathBuf,
 // Optional override for the process's `argv[0]`.
1490 pub arg0: Option<String>,
 // Arguments passed after `argv[0]`.
1491 pub args: Vec<String>,
 // Environment variables set on the command.
1492 pub env: HashMap<String, String>,
1493}
1494
1495impl BootstrapCommand {
1496 pub fn current() -> io::Result<Self> {
1499 let mut args: VecDeque<String> = std::env::args().collect();
1500 let arg0 = args.pop_front();
1501
1502 Ok(Self {
1503 program: std::env::current_exe()?,
1504 arg0,
1505 args: args.into(),
1506 env: std::env::vars().collect(),
1507 })
1508 }
1509
1510 pub fn new(&self) -> Command {
1513 let mut cmd = Command::new(&self.program);
1514 if let Some(arg0) = &self.arg0 {
1515 cmd.arg0(arg0);
1516 }
1517 for arg in &self.args {
1518 cmd.arg(arg);
1519 }
1520 for (k, v) in &self.env {
1521 cmd.env(k, v);
1522 }
1523 cmd
1524 }
1525
1526 #[cfg(test)]
1533 #[cfg(fbcode_build)]
1534 pub(crate) fn test() -> Self {
1535 Self {
1536 program: crate::testresource::get("monarch/hyperactor_mesh/bootstrap"),
1537 arg0: None,
1538 args: vec![],
1539 env: HashMap::new(),
1540 }
1541 }
1542}
1543
1544impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1545 fn from(s: T) -> Self {
1547 Self {
1548 program: s.into(),
1549 arg0: None,
1550 args: vec![],
1551 env: HashMap::new(),
1552 }
1553 }
1554}
1555
/// Proc manager that launches procs as child processes using a
/// `BootstrapCommand`.
///
/// On drop, `SIGKILL` is sent to every pid still recorded in `pid_table`.
1556#[derive(Debug)]
1577pub struct BootstrapProcManager {
 // Command used to launch child processes.
1578 command: BootstrapCommand,
 // Handles of managed procs, keyed by proc id.
1580 children: Arc<tokio::sync::Mutex<HashMap<ProcId, BootstrapProcHandle>>>,
 // Pids of managed procs, keyed by proc id. An entry is removed by the exit
 // monitor once the child's wait has completed; whatever remains is killed
 // on drop.
1584 pid_table: Arc<std::sync::Mutex<HashMap<ProcId, u32>>>,
 // File appender, present only when `MESH_ENABLE_FILE_CAPTURE` is set and
 // the appender could be created.
1588 file_appender: Option<Arc<crate::logging::FileAppender>>,
1591
 // Temporary directory holding the procs' unix sockets.
1592 socket_dir: TempDir,
1596}
1597
1598impl Drop for BootstrapProcManager {
1599 fn drop(&mut self) {
1616 if let Ok(table) = self.pid_table.lock() {
1617 for (proc_id, pid) in table.iter() {
1618 unsafe {
1626 libc::kill(*pid as i32, libc::SIGKILL);
1627 }
1628 tracing::info!(
1629 "BootstrapProcManager::drop: sent SIGKILL to pid {} for {:?}",
1630 pid,
1631 proc_id
1632 );
1633 }
1634 }
1635 }
1636}
1637
1638impl BootstrapProcManager {
1639 pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1646 let file_appender = if hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE) {
1647 match crate::logging::FileAppender::new() {
1648 Some(fm) => {
1649 tracing::info!("file appender created successfully");
1650 Some(Arc::new(fm))
1651 }
1652 None => {
1653 tracing::warn!("failed to create file appender");
1654 None
1655 }
1656 }
1657 } else {
1658 None
1659 };
1660
1661 Ok(Self {
1662 command,
1663 children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1664 pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
1665 file_appender,
1666 socket_dir: runtime_dir()?,
1667 })
1668 }
1669
    /// Returns the command this manager uses to launch procs.
1670 pub fn command(&self) -> &BootstrapCommand {
1672 &self.command
1673 }
1674
    /// Returns the path of the manager's temporary socket directory.
1675 pub fn socket_dir(&self) -> &Path {
1677 self.socket_dir.path()
1678 }
1679
1680 pub async fn status(&self, proc_id: &ProcId) -> Option<ProcStatus> {
1691 self.children.lock().await.get(proc_id).map(|h| h.status())
1692 }
1693
    /// Spawns a background task that waits for the proc's child process to
    /// exit and records the outcome on `handle`.
    ///
    /// The child is taken out of the handle, so the handle no longer owns it
    /// afterwards. If the child was already taken, nothing is spawned. When
    /// the child exits, the task stops the stream forwarders, keeps the
    /// stderr tail, marks the handle `Killed`, `Stopped` or `Failed`, removes
    /// the proc's entry from the pid table, and logs the outcome.
1694 fn spawn_exit_monitor(&self, proc_id: ProcId, handle: BootstrapProcHandle) {
1695 let pid_table = Arc::clone(&self.pid_table);
1696
    // Take the child in a short scope so the lock is released before the
    // task is spawned.
1697 let maybe_child = {
1698 let mut guard = handle.child.lock().expect("child mutex");
1699 let taken = guard.take();
1700 debug_assert!(guard.is_none(), "no Child should remain in handles");
1701 taken
1702 };
1703
1704 let Some(mut child) = maybe_child else {
1705 tracing::debug!("proc {proc_id}: child was already taken; skipping wait");
1706 return;
1707 };
1708
1709 tokio::spawn(async move {
1710 let wait_res = child.wait().await;
1711
    // Stop the forwarders once the process has exited; only the stderr
    // lines are kept.
1712 let mut stderr_tail: Vec<String> = Vec::new();
1713 let (stdout_mon, stderr_mon) = handle.take_stream_monitors();
1714
1715 if let Some(t) = stderr_mon {
1716 let (lines, _bytes) = t.abort().await;
1717 stderr_tail = lines;
1718 }
1719 if let Some(t) = stdout_mon {
1720 let (_lines, _bytes) = t.abort().await;
1721 }
1722
    // Joined copy for logging; computed before `stderr_tail` may be moved
    // into `mark_stopped` below.
1723 let tail_str = if stderr_tail.is_empty() {
1724 None
1725 } else {
1726 Some(stderr_tail.join("\n"))
1727 };
1728
    // Removes this proc's pid-table entry and returns the removed pid as a
    // string for logging ("not available" if missing or the lock is poisoned).
1729 let remove_from_pid_table = || {
1730 let pid = if let Ok(mut table) = pid_table.lock() {
1731 table.remove(&proc_id)
1732 } else {
1733 None
1734 };
1735
1736 pid.map_or_else(|| "not available".to_string(), |i| format!("{i}"))
1737 };
1738
1739 match wait_res {
1740 Ok(status) => {
    // Termination by signal is checked first, then an exit code.
1741 if let Some(sig) = status.signal() {
1742 let _ = handle.mark_killed(sig, status.core_dumped());
1743 let pid_str = remove_from_pid_table();
1744 tracing::info!(
1745 name = "ProcStatus",
1746 status = "Exited::KilledBySignal",
1747 %proc_id,
1748 tail = tail_str,
1749 "killed by signal {sig}; proc's pid: {pid_str}"
1750 );
1751 } else if let Some(code) = status.code() {
1752 let _ = handle.mark_stopped(code, stderr_tail);
1753 let pid_str = remove_from_pid_table();
1754 tracing::info!(
1755 name = "ProcStatus",
1756 status = "Exited::ExitWithCode",
1757 %proc_id,
1758 exit_code = code,
1759 tail = tail_str,
1760 "proc exited; proc's pid: {pid_str}"
1761 );
1762 } else {
    // Neither a signal nor an exit code: asserted unreachable in debug
    // builds, recorded as a failure otherwise.
1763 debug_assert!(
1764 false,
1765 "unreachable: process terminated with neither signal nor exit code"
1766 );
1767 let _ = handle.mark_failed("process exited with unknown status");
1768 let pid_str = remove_from_pid_table();
1769 tracing::info!(
1770 name = "ProcStatus",
1771 status = "Exited::Unknown",
1772 %proc_id,
1773 tail = tail_str,
1774 "unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}"
1775 );
1776 }
1777 }
    // Waiting on the child itself failed.
1778 Err(e) => {
1779 let _ = handle.mark_failed(format!("wait_inner() failed: {e}"));
1780 let pid_str = remove_from_pid_table();
1781 tracing::info!(
1782 name = "ProcStatus",
1783 status = "Exited::WaitFailed",
1784 %proc_id,
1785 tail = tail_str,
1786 "proc {proc_id} wait failed; proc's pid: {pid_str}"
1787 );
1788 }
1789 }
1790 });
1791 }
1792}
1793
/// Per-spawn configuration accepted by `BootstrapProcManager::spawn`
/// (it is the `ProcManager::Config` type of that manager).
pub struct BootstrapProcConfig {
    /// Rank value handed to the stdout/stderr `StreamFwder`s that are
    /// started for the spawned child.
    pub create_rank: usize,

    /// Configuration overrides for the child. `spawn` consults them (via
    /// `override_or_global`) for the log-forwarding, file-capture and
    /// tail-size settings, and ships them to the child as the `config`
    /// of its `Bootstrap::Proc` mode.
    pub client_config_override: Attrs,
}
1803
1804#[async_trait]
1805impl ProcManager for BootstrapProcManager {
1806 type Handle = BootstrapProcHandle;
1807
1808 type Config = BootstrapProcConfig;
1809
1810 fn transport(&self) -> ChannelTransport {
1817 ChannelTransport::Unix
1818 }
1819
1820 #[tracing::instrument(skip(self, config))]
1847 async fn spawn(
1848 &self,
1849 proc_id: ProcId,
1850 backend_addr: ChannelAddr,
1851 config: BootstrapProcConfig,
1852 ) -> Result<Self::Handle, HostError> {
1853 let (callback_addr, mut callback_rx) =
1854 channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1855
1856 let overrides = &config.client_config_override;
1858 let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
1859 let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
1860 let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
1861 let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1862
1863 let mode = Bootstrap::Proc {
1864 proc_id: proc_id.clone(),
1865 backend_addr,
1866 callback_addr,
1867 socket_dir_path: self.socket_dir.path().to_owned(),
1868 config: Some(config.client_config_override),
1869 };
1870 let mut cmd = self.command.new();
1871 cmd.env(
1872 "HYPERACTOR_MESH_BOOTSTRAP_MODE",
1873 mode.to_env_safe_string()
1874 .map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
1875 );
1876
1877 if need_stdio {
1878 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1879 } else {
1880 cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
1881 tracing::info!(
1882 %proc_id, enable_forwarding, enable_file_capture, tail_size,
1883 "child stdio NOT captured (forwarding/file_capture/tail all disabled); inheriting parent console"
1884 );
1885 }
1886
1887 let log_channel: Option<ChannelAddr> = if enable_forwarding {
1888 let addr = ChannelAddr::any(ChannelTransport::Unix);
1889 cmd.env(BOOTSTRAP_LOG_CHANNEL, addr.to_string());
1890 Some(addr)
1891 } else {
1892 None
1893 };
1894
1895 let mut child = cmd
1896 .spawn()
1897 .map_err(|e| HostError::ProcessSpawnFailure(proc_id.clone(), e))?;
1898 let pid = child.id().unwrap_or_default();
1899
1900 let (out_fwder, err_fwder) = if need_stdio {
1901 let stdout: ChildStdout = child.stdout.take().expect("stdout piped but missing");
1902 let stderr: ChildStderr = child.stderr.take().expect("stderr piped but missing");
1903
1904 let (file_stdout, file_stderr) = if enable_file_capture {
1905 match self.file_appender.as_deref() {
1906 Some(fm) => (
1907 Some(fm.addr_for(OutputTarget::Stdout)),
1908 Some(fm.addr_for(OutputTarget::Stderr)),
1909 ),
1910 None => {
1911 tracing::warn!("enable_file_capture=true but no FileAppender");
1912 (None, None)
1913 }
1914 }
1915 } else {
1916 (None, None)
1917 };
1918
1919 let out = StreamFwder::start(
1920 stdout,
1921 file_stdout, OutputTarget::Stdout,
1923 tail_size,
1924 log_channel.clone(), pid,
1926 config.create_rank,
1927 );
1928 let err = StreamFwder::start(
1929 stderr,
1930 file_stderr,
1931 OutputTarget::Stderr,
1932 tail_size,
1933 log_channel.clone(),
1934 pid,
1935 config.create_rank,
1936 );
1937 (Some(out), Some(err))
1938 } else {
1939 (None, None)
1940 };
1941
1942 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
1943
1944 if let Some(pid) = handle.pid() {
1945 handle.mark_running(pid, hyperactor::clock::RealClock.system_time_now());
1946 if let Ok(mut table) = self.pid_table.lock() {
1947 table.insert(proc_id.clone(), pid);
1948 }
1949 }
1950
1951 handle.set_stream_monitors(out_fwder, err_fwder);
1952
1953 {
1955 let mut children = self.children.lock().await;
1956 children.insert(proc_id.clone(), handle.clone());
1957 }
1958
1959 self.spawn_exit_monitor(proc_id.clone(), handle.clone());
1962
1963 let h = handle.clone();
1964 let pid_table = Arc::clone(&self.pid_table);
1965 tokio::spawn(async move {
1966 match callback_rx.recv().await {
1967 Ok((addr, agent)) => {
1968 let pid = match h.pid() {
1969 Some(p) => p,
1970 None => {
1971 tracing::warn!("mark_ready called with missing pid; using 0");
1972 0
1973 }
1974 };
1975 let started_at = RealClock.system_time_now();
1976 let _ = h.mark_ready(pid, started_at, addr, agent);
1977 }
1978 Err(e) => {
1979 let _ = h.mark_failed(format!("bootstrap callback failed: {e}"));
1981 if let Ok(mut table) = pid_table.lock() {
1983 table.remove(&proc_id);
1984 }
1985 }
1986 }
1987 });
1988
1989 Ok(handle)
1991 }
1992}
1993
1994#[async_trait]
1995impl hyperactor::host::SingleTerminate for BootstrapProcManager {
1996 async fn terminate_proc(
2006 &self,
2007 cx: &impl context::Actor,
2008 proc: &ProcId,
2009 timeout: Duration,
2010 ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
2011 let proc_handle: Option<BootstrapProcHandle> = {
2013 let mut guard = self.children.lock().await;
2014 guard.remove(proc)
2015 };
2016
2017 if let Some(h) = proc_handle {
2018 h.terminate(cx, timeout)
2019 .await
2020 .map(|_| (Vec::new(), Vec::new()))
2021 .map_err(|e| e.into())
2022 } else {
2023 Err(anyhow::anyhow!("proc doesn't exist: {}", proc))
2024 }
2025 }
2026}
2027
2028#[async_trait]
2029impl hyperactor::host::BulkTerminate for BootstrapProcManager {
2030 async fn terminate_all(
2044 &self,
2045 cx: &impl context::Actor,
2046 timeout: Duration,
2047 max_in_flight: usize,
2048 ) -> TerminateSummary {
2049 let handles: Vec<BootstrapProcHandle> = {
2051 let guard = self.children.lock().await;
2052 guard.values().cloned().collect()
2053 };
2054
2055 let attempted = handles.len();
2056 let mut ok = 0usize;
2057
2058 let results = stream::iter(handles.into_iter().map(|h| async move {
2059 match h.terminate(cx, timeout).await {
2060 Ok(_) | Err(hyperactor::host::TerminateError::AlreadyTerminated(_)) => {
2061 true
2063 }
2064 Err(e) => {
2065 tracing::warn!(error=%e, "terminate_all: failed to terminate child");
2066 false
2067 }
2068 }
2069 }))
2070 .buffer_unordered(max_in_flight.max(1))
2071 .collect::<Vec<bool>>()
2072 .await;
2073
2074 for r in results {
2075 if r {
2076 ok += 1;
2077 }
2078 }
2079
2080 TerminateSummary {
2081 attempted,
2082 ok,
2083 failed: attempted.saturating_sub(ok),
2084 }
2085 }
2086}
2087
2088pub async fn bootstrap() -> anyhow::Error {
2103 let boot = ok!(Bootstrap::get_from_env()).unwrap_or_else(Bootstrap::default);
2104 boot.bootstrap().await
2105}
2106
2107async fn bootstrap_v0_proc_mesh() -> anyhow::Error {
2117 pub async fn go() -> Result<(), anyhow::Error> {
2118 let procs = Arc::new(tokio::sync::Mutex::new(Vec::<Proc>::new()));
2119 let procs_for_cleanup = procs.clone();
2120 let _cleanup_guard = hyperactor::register_signal_cleanup_scoped(Box::pin(async move {
2121 for proc_to_stop in procs_for_cleanup.lock().await.iter_mut() {
2122 if let Err(err) = proc_to_stop
2123 .destroy_and_wait::<()>(Duration::from_millis(10), None)
2124 .await
2125 {
2126 tracing::error!(
2127 "error while stopping proc {}: {}",
2128 proc_to_stop.proc_id(),
2129 err
2130 );
2131 }
2132 }
2133 }));
2134
2135 let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
2136 .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
2137 .parse()?;
2138 let bootstrap_index: usize = std::env::var(BOOTSTRAP_INDEX_ENV)
2139 .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))?
2140 .parse()?;
2141 let listen_addr = ChannelAddr::any(bootstrap_addr.transport());
2142
2143 let entered = tracing::span!(
2144 Level::INFO,
2145 "bootstrap_v0_proc_mesh",
2146 %bootstrap_addr,
2147 %bootstrap_index,
2148 %listen_addr,
2149 )
2150 .entered();
2151
2152 let (serve_addr, mut rx) = channel::serve(listen_addr)?;
2153 let tx = channel::dial(bootstrap_addr.clone())?;
2154
2155 let (rtx, mut return_channel) = oneshot::channel();
2156 tx.try_post(
2157 Process2Allocator(bootstrap_index, Process2AllocatorMessage::Hello(serve_addr)),
2158 rtx,
2159 );
2160 tokio::spawn(exit_if_missed_heartbeat(bootstrap_index, bootstrap_addr));
2161
2162 let _ = entered.exit();
2163
2164 let mut the_msg;
2165
2166 tokio::select! {
2167 msg = rx.recv() => {
2168 the_msg = msg;
2169 }
2170 returned_msg = &mut return_channel => {
2171 match returned_msg {
2172 Ok(msg) => {
2173 return Err(anyhow::anyhow!("Hello message was not delivered:{:?}", msg));
2174 }
2175 Err(_) => {
2176 the_msg = rx.recv().await;
2177 }
2178 }
2179 }
2180 }
2181 loop {
2182 match the_msg? {
2183 Allocator2Process::StartProc(proc_id, listen_transport) => {
2184 let span = tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport);
2185 let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone())
2186 .instrument(span.clone())
2187 .await?;
2188 let entered = span.entered();
2189 let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?;
2190 let handle = proc.clone().serve(proc_rx);
2191 drop(handle); let span = entered.exit();
2193 tx.send(Process2Allocator(
2194 bootstrap_index,
2195 Process2AllocatorMessage::StartedProc(
2196 proc_id.clone(),
2197 mesh_agent.bind(),
2198 proc_addr,
2199 ),
2200 ))
2201 .instrument(span)
2202 .await?;
2203 procs.lock().await.push(proc);
2204 }
2205 Allocator2Process::StopAndExit(code) => {
2206 tracing::info!("stopping procs with code {code}");
2207 {
2208 for proc_to_stop in procs.lock().await.iter_mut() {
2209 if let Err(err) = proc_to_stop
2210 .destroy_and_wait::<()>(Duration::from_millis(10), None)
2211 .await
2212 {
2213 tracing::error!(
2214 "error while stopping proc {}: {}",
2215 proc_to_stop.proc_id(),
2216 err
2217 );
2218 }
2219 }
2220 }
2221 tracing::info!("exiting with {code}");
2222 std::process::exit(code);
2223 }
2224 Allocator2Process::Exit(code) => {
2225 tracing::info!("exiting with {code}");
2226 std::process::exit(code);
2227 }
2228 }
2229 the_msg = rx.recv().await;
2230 }
2231 }
2232
2233 go().await.unwrap_err()
2234}
2235
2236pub async fn bootstrap_or_die() -> ! {
2239 let err = bootstrap().await;
2240 let _ = writeln!(Debug, "failed to bootstrap mesh process: {}", err);
2241 tracing::error!("failed to bootstrap mesh process: {}", err);
2242 std::process::exit(1)
2243}
2244
/// Destination for bootstrap debug output.
// NOTE(review): the `EnumAsInner` derive is presumably what supplies the
// `is_file()` accessor used by `Debug::is_active` — confirm.
#[derive(enum_as_inner::EnumAsInner)]
enum DebugSink {
    /// Write debug output to this open file.
    File(std::fs::File),
    /// Discard debug output.
    Sink,
}
2250
2251impl DebugSink {
2252 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2253 match self {
2254 DebugSink::File(f) => f.write(buf),
2255 DebugSink::Sink => Ok(buf.len()),
2256 }
2257 }
2258 fn flush(&mut self) -> io::Result<()> {
2259 match self {
2260 DebugSink::File(f) => f.flush(),
2261 DebugSink::Sink => Ok(()),
2262 }
2263 }
2264}
2265
2266fn debug_sink() -> &'static Mutex<DebugSink> {
2267 static DEBUG_SINK: OnceLock<Mutex<DebugSink>> = OnceLock::new();
2268 DEBUG_SINK.get_or_init(|| {
2269 let debug_path = {
2270 let mut p = std::env::temp_dir();
2271 if let Ok(user) = std::env::var("USER") {
2272 p.push(user);
2273 }
2274 std::fs::create_dir_all(&p).ok();
2275 p.push("monarch-bootstrap-debug.log");
2276 p
2277 };
2278 let sink = if debug_path.exists() {
2279 match OpenOptions::new()
2280 .append(true)
2281 .create(true)
2282 .open(debug_path.clone())
2283 {
2284 Ok(f) => DebugSink::File(f),
2285 Err(_e) => {
2286 eprintln!(
2287 "failed to open {} for bootstrap debug logging",
2288 debug_path.display()
2289 );
2290 DebugSink::Sink
2291 }
2292 }
2293 } else {
2294 DebugSink::Sink
2295 };
2296 Mutex::new(sink)
2297 })
2298}
2299
/// When `true`, everything written through [`Debug`] is also mirrored to
/// stderr, and `Debug::is_active` reports `true` unconditionally.
const DEBUG_TO_STDERR: bool = false;
2302
/// Zero-sized writer for bootstrap debug output; its `Write` impl
/// forwards to the shared sink returned by `debug_sink()`.
struct Debug;
2306
2307impl Debug {
2308 fn is_active() -> bool {
2309 DEBUG_TO_STDERR || debug_sink().lock().unwrap().is_file()
2310 }
2311}
2312
2313impl Write for Debug {
2314 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2315 let res = debug_sink().lock().unwrap().write(buf);
2316 if DEBUG_TO_STDERR {
2317 let n = match res {
2318 Ok(n) => n,
2319 Err(_) => buf.len(),
2320 };
2321 let _ = io::stderr().write_all(&buf[..n]);
2322 }
2323
2324 res
2325 }
2326 fn flush(&mut self) -> io::Result<()> {
2327 let res = debug_sink().lock().unwrap().flush();
2328 if DEBUG_TO_STDERR {
2329 let _ = io::stderr().flush();
2330 }
2331 res
2332 }
2333}
2334
2335fn runtime_dir() -> io::Result<TempDir> {
2338 match std::env::var_os("XDG_RUNTIME_DIR") {
2339 Some(runtime_dir) => {
2340 let path = PathBuf::from(runtime_dir);
2341 tempfile::tempdir_in(path)
2342 }
2343 None => tempfile::tempdir(),
2344 }
2345}
2346
2347#[cfg(test)]
2348mod tests {
2349 use std::path::PathBuf;
2350 use std::process::Stdio;
2351
2352 use hyperactor::ActorId;
2353 use hyperactor::ActorRef;
2354 use hyperactor::ProcId;
2355 use hyperactor::WorldId;
2356 use hyperactor::channel::ChannelAddr;
2357 use hyperactor::channel::ChannelTransport;
2358 use hyperactor::channel::TcpMode;
2359 use hyperactor::clock::RealClock;
2360 use hyperactor::context::Mailbox as _;
2361 use hyperactor::host::ProcHandle;
2362 use hyperactor::id;
2363 use ndslice::Extent;
2364 use ndslice::ViewExt;
2365 use ndslice::extent;
2366 use tokio::process::Command;
2367
2368 use super::*;
2369 use crate::alloc::AllocSpec;
2370 use crate::alloc::Allocator;
2371 use crate::alloc::ProcessAllocator;
2372 use crate::v1::ActorMesh;
2373 use crate::v1::host_mesh::HostMesh;
2374 use crate::v1::testactor;
2375
2376 fn any_addr_for_test() -> ChannelAddr {
2379 ChannelAddr::any(ChannelTransport::Unix)
2380 }
2381
/// The default mode and a `Proc` mode without config must survive an
/// env-safe encode/decode round trip with the same variant and a stable
/// encoding.
#[test]
fn test_bootstrap_mode_env_string_none_config_proc() {
    let cases = [
        Bootstrap::default(),
        Bootstrap::Proc {
            proc_id: id!(foo[0]),
            backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
            callback_addr: ChannelAddr::any(ChannelTransport::Unix),
            socket_dir_path: PathBuf::from("notexist"),
            config: None,
        },
    ];

    for original in cases {
        let encoded = original.to_env_safe_string().unwrap();
        let decoded = Bootstrap::from_env_safe_string(&encoded).unwrap();

        // Re-encoding the decoded value must reproduce the same string.
        let reencoded = decoded.to_env_safe_string().unwrap();
        assert_eq!(encoded, reencoded, "env-safe round-trip should be stable");

        let same_variant = matches!(
            (&original, &decoded),
            (Bootstrap::Proc { config: None, .. }, Bootstrap::Proc { config: None, .. })
                | (Bootstrap::V0ProcMesh, Bootstrap::V0ProcMesh)
        );
        if !same_variant {
            panic!("decoded variant mismatch: got {:?}", decoded);
        }
    }
}
2412
/// A `Host` mode without config must round-trip through the env-safe
/// encoding and still decode to `Host` with `config: None`.
#[test]
fn test_bootstrap_mode_env_string_none_config_host() {
    let original = Bootstrap::Host {
        addr: ChannelAddr::any(ChannelTransport::Unix),
        command: None,
        config: None,
    };

    let encoded = original.to_env_safe_string().unwrap();
    let decoded = Bootstrap::from_env_safe_string(&encoded).unwrap();

    // Re-encoding the decoded value must reproduce the same string.
    let reencoded = decoded.to_env_safe_string().unwrap();
    assert_eq!(encoded, reencoded);

    if !matches!(decoded, Bootstrap::Host { config: None, .. }) {
        panic!("expected Host with None config, got {:?}", decoded);
    }
}
2434
/// Decoding a string that is not a valid env-safe encoding must fail.
#[test]
fn test_bootstrap_mode_env_string_invalid() {
    let decoded = Bootstrap::from_env_safe_string("!!!");
    assert!(decoded.is_err());
}
2440
/// Config overrides attached to `Proc` and `Host` modes must be
/// preserved by the env-safe encode/decode round trip.
#[test]
fn test_bootstrap_env_roundtrip_with_config_proc_and_host() {
    let mut attrs = Attrs::new();
    attrs[MESH_TAIL_LOG_LINES] = 123;
    attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;

    let socket_dir = runtime_dir().unwrap();

    // Shared check: the decoded config carries both overrides.
    let assert_overrides = |config: &Option<Attrs>| {
        let cfg = config.as_ref().expect("expected Some(attrs)");
        assert_eq!(cfg[MESH_TAIL_LOG_LINES], 123);
        assert!(!cfg[MESH_BOOTSTRAP_ENABLE_PDEATHSIG]);
    };

    // Proc mode.
    let proc_mode = Bootstrap::Proc {
        proc_id: id!(foo[42]),
        backend_addr: ChannelAddr::any(ChannelTransport::Unix),
        callback_addr: ChannelAddr::any(ChannelTransport::Unix),
        config: Some(attrs.clone()),
        socket_dir_path: socket_dir.path().to_owned(),
    };
    let proc_env = proc_mode.to_env_safe_string().expect("encode bootstrap");
    match Bootstrap::from_env_safe_string(&proc_env).expect("decode bootstrap") {
        Bootstrap::Proc { config, .. } => assert_overrides(&config),
        other => panic!("unexpected variant after roundtrip: {:?}", other),
    }

    // Host mode.
    let host_mode = Bootstrap::Host {
        addr: ChannelAddr::any(ChannelTransport::Unix),
        command: None,
        config: Some(attrs.clone()),
    };
    let host_env = host_mode.to_env_safe_string().expect("encode bootstrap");
    match Bootstrap::from_env_safe_string(&host_env).expect("decode bootstrap") {
        Bootstrap::Host { config, .. } => assert_overrides(&config),
        other => panic!("unexpected variant after roundtrip: {:?}", other),
    }
}
2490
2491 #[tokio::test]
2492 async fn test_child_terminated_on_manager_drop() {
2493 use std::path::PathBuf;
2494 use std::process::Stdio;
2495
2496 use tokio::process::Command;
2497
2498 let command = BootstrapCommand {
2500 program: PathBuf::from("/bin/true"),
2501 ..Default::default()
2502 };
2503 let manager = BootstrapProcManager::new(command).unwrap();
2504
2505 let mut cmd = Command::new("/bin/sleep");
2508 cmd.arg("30")
2509 .stdout(Stdio::null())
2510 .stderr(Stdio::null())
2511 .kill_on_drop(true);
2512
2513 let child = cmd.spawn().expect("spawn sleep");
2514 let pid = child.id().expect("pid");
2515
2516 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "test".to_string());
2519 {
2520 let mut children = manager.children.lock().await;
2521 children.insert(
2522 proc_id.clone(),
2523 BootstrapProcHandle::new(proc_id.clone(), child),
2524 );
2525 }
2526
2527 #[cfg(target_os = "linux")]
2529 {
2530 let path = format!("/proc/{}", pid);
2531 assert!(
2532 std::fs::metadata(&path).is_ok(),
2533 "expected /proc/{pid} to exist before drop"
2534 );
2535 }
2536
2537 drop(manager);
2542
2543 #[cfg(target_os = "linux")]
2546 {
2547 let deadline = std::time::Instant::now() + Duration::from_millis(1500);
2548 let proc_dir = format!("/proc/{}", pid);
2549 let status_file = format!("{}/status", proc_dir);
2550
2551 let mut ok = false;
2552 while std::time::Instant::now() < deadline {
2553 match std::fs::read_to_string(&status_file) {
2554 Ok(s) => {
2555 if let Some(state_line) = s.lines().find(|l| l.starts_with("State:")) {
2556 if state_line.contains('Z') {
2557 ok = true;
2559 break;
2560 } else {
2561 }
2563 }
2564 }
2565 Err(_) => {
2566 ok = true;
2568 break;
2569 }
2570 }
2571 RealClock.sleep(Duration::from_millis(100)).await;
2572 }
2573
2574 assert!(ok, "expected /proc/{pid} to be gone or zombie after drop");
2575 }
2576
2577 }
2580
2581 #[tokio::test]
2582 async fn test_v1_child_logging() {
2583 use hyperactor::channel;
2584 use hyperactor::data::Serialized;
2585 use hyperactor::mailbox::BoxedMailboxSender;
2586 use hyperactor::mailbox::DialMailboxRouter;
2587 use hyperactor::mailbox::MailboxServer;
2588 use hyperactor::proc::Proc;
2589
2590 use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL;
2591 use crate::logging::LogClientActor;
2592 use crate::logging::LogClientMessageClient;
2593 use crate::logging::LogForwardActor;
2594 use crate::logging::LogMessage;
2595 use crate::logging::OutputTarget;
2596 use crate::logging::test_tap;
2597
2598 let router = DialMailboxRouter::new();
2599 let (proc_addr, proc_rx) =
2600 channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
2601 let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone()));
2602 proc.clone().serve(proc_rx);
2603 router.bind(id!(client[0]).into(), proc_addr.clone());
2604 let (client, _handle) = proc.instance("client").unwrap();
2605
2606 let (tap_tx, mut tap_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
2607 test_tap::install(tap_tx);
2608
2609 let log_channel = ChannelAddr::any(ChannelTransport::Unix);
2610 unsafe {
2612 std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
2613 }
2614
2615 let log_client: ActorRef<LogClientActor> =
2618 proc.spawn("log_client", ()).await.unwrap().bind();
2619 log_client.set_aggregate(&client, None).await.unwrap();
2620
2621 let _log_forwarder: ActorRef<LogForwardActor> = proc
2624 .spawn("log_forwarder", log_client.clone())
2625 .await
2626 .unwrap()
2627 .bind();
2628
2629 let tx = channel::dial::<LogMessage>(log_channel.clone()).unwrap();
2632
2633 tx.post(LogMessage::Log {
2636 hostname: "testhost".into(),
2637 pid: 12345,
2638 output_target: OutputTarget::Stdout,
2639 payload: Serialized::serialize(&"hello from child".to_string()).unwrap(),
2640 });
2641
2642 let line = RealClock
2644 .timeout(Duration::from_secs(2), tap_rx.recv())
2645 .await
2646 .expect("timed out waiting for log line")
2647 .expect("tap channel closed unexpectedly");
2648 assert!(
2649 line.contains("hello from child"),
2650 "log line did not appear via LogClientActor; got: {line}"
2651 );
2652 }
2653
2654 mod proc_handle {
2655
2656 use hyperactor::ActorId;
2657 use hyperactor::ActorRef;
2658 use hyperactor::ProcId;
2659 use hyperactor::WorldId;
2660 use hyperactor::host::ProcHandle;
2661 use tokio::process::Command;
2662
2663 use super::super::*;
2664 use super::any_addr_for_test;
2665
2666 fn handle_for_test() -> BootstrapProcHandle {
2670 let child = Command::new("sh")
2672 .arg("-c")
2673 .arg("exit 0")
2674 .stdin(std::process::Stdio::null())
2675 .stdout(std::process::Stdio::null())
2676 .stderr(std::process::Stdio::null())
2677 .spawn()
2678 .expect("failed to spawn test child process");
2679
2680 let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
2681 BootstrapProcHandle::new(proc_id, child)
2682 }
2683
2684 #[tokio::test]
2685 async fn starting_to_running_ok() {
2686 let h = handle_for_test();
2687 assert!(matches!(h.status(), ProcStatus::Starting));
2688 let child_pid = h.pid().expect("child should have a pid");
2689 let child_started_at = RealClock.system_time_now();
2690 assert!(h.mark_running(child_pid, child_started_at));
2691 match h.status() {
2692 ProcStatus::Running { pid, started_at } => {
2693 assert_eq!(pid, child_pid);
2694 assert_eq!(started_at, child_started_at);
2695 }
2696 other => panic!("expected Running, got {other:?}"),
2697 }
2698 }
2699
2700 #[tokio::test]
2701 async fn running_to_stopping_to_stopped_ok() {
2702 let h = handle_for_test();
2703 let child_pid = h.pid().expect("child should have a pid");
2704 let child_started_at = RealClock.system_time_now();
2705 assert!(h.mark_running(child_pid, child_started_at));
2706 assert!(h.mark_stopping());
2707 assert!(matches!(h.status(), ProcStatus::Stopping { .. }));
2708 assert!(h.mark_stopped(0, Vec::new()));
2709 assert!(matches!(
2710 h.status(),
2711 ProcStatus::Stopped { exit_code: 0, .. }
2712 ));
2713 }
2714
2715 #[tokio::test]
2716 async fn running_to_killed_ok() {
2717 let h = handle_for_test();
2718 let child_pid = h.pid().expect("child should have a pid");
2719 let child_started_at = RealClock.system_time_now();
2720 assert!(h.mark_running(child_pid, child_started_at));
2721 assert!(h.mark_killed(9, true));
2722 assert!(matches!(
2723 h.status(),
2724 ProcStatus::Killed {
2725 signal: 9,
2726 core_dumped: true
2727 }
2728 ));
2729 }
2730
2731 #[tokio::test]
2732 async fn running_to_failed_ok() {
2733 let h = handle_for_test();
2734 let child_pid = h.pid().expect("child should have a pid");
2735 let child_started_at = RealClock.system_time_now();
2736 assert!(h.mark_running(child_pid, child_started_at));
2737 assert!(h.mark_failed("bootstrap error"));
2738 match h.status() {
2739 ProcStatus::Failed { reason } => {
2740 assert_eq!(reason, "bootstrap error");
2741 }
2742 other => panic!("expected Failed(\"bootstrap error\"), got {other:?}"),
2743 }
2744 }
2745
2746 #[tokio::test]
2747 async fn illegal_transitions_are_rejected() {
2748 let h = handle_for_test();
2749 let child_pid = h.pid().expect("child should have a pid");
2750 let child_started_at = RealClock.system_time_now();
2751 assert!(h.mark_running(child_pid, child_started_at));
2753 assert!(!h.mark_running(child_pid, RealClock.system_time_now()));
2754 match h.status() {
2755 ProcStatus::Running { pid, .. } => assert_eq!(pid, child_pid),
2756 other => panic!("expected Running, got {other:?}"),
2757 }
2758 assert!(h.mark_stopping());
2760 assert!(h.mark_stopped(0, Vec::new()));
2761 assert!(!h.mark_running(child_pid, child_started_at));
2762 assert!(!h.mark_killed(9, false));
2763 assert!(!h.mark_failed("nope"));
2764
2765 assert!(matches!(
2766 h.status(),
2767 ProcStatus::Stopped { exit_code: 0, .. }
2768 ));
2769 }
2770
2771 #[tokio::test]
2772 async fn transitions_from_ready_are_legal() {
2773 let h = handle_for_test();
2774 let addr = any_addr_for_test();
2775 let pid = h.pid().expect("child should have a pid");
2777 let t0 = RealClock.system_time_now();
2778 assert!(h.mark_running(pid, t0));
2779 let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2782 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2783 let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2784 assert!(h.mark_ready(pid, t0, addr, agent_ref));
2786 assert!(h.mark_stopping());
2787 assert!(h.mark_stopped(0, Vec::new()));
2788 }
2789
2790 #[tokio::test]
2791 async fn ready_to_killed_is_legal() {
2792 let h = handle_for_test();
2793 let addr = any_addr_for_test();
2794 let pid = h.pid().expect("child should have a pid");
2796 let t0 = RealClock.system_time_now();
2797 assert!(h.mark_running(pid, t0));
2798 let proc_id = <BootstrapProcHandle as ProcHandle>::proc_id(&h);
2801 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
2802 let agent: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
2803 assert!(h.mark_ready(pid, t0, addr, agent));
2805 assert!(h.mark_killed(9, false));
2807 }
2808
2809 #[tokio::test]
2810 async fn mark_stopping_from_starting_uses_child_pid_when_available() {
2811 let h = handle_for_test();
2812
2813 let child_pid = h
2816 .pid()
2817 .expect("Child::id() should be available in Starting");
2818
2819 assert!(
2822 h.mark_stopping(),
2823 "mark_stopping() should succeed from Starting"
2824 );
2825 match h.status() {
2826 ProcStatus::Stopping { pid, started_at } => {
2827 assert_eq!(pid, child_pid, "Stopping pid should come from Child::id()");
2828 assert!(
2829 started_at <= RealClock.system_time_now(),
2830 "started_at should be sane"
2831 );
2832 }
2833 other => panic!("expected Stopping{{..}}; got {other:?}"),
2834 }
2835 }
2836
2837 #[tokio::test]
2838 async fn mark_stopping_noop_when_no_child_pid_available() {
2839 let h = handle_for_test();
2840
2841 {
2845 let _ = h.child.lock().expect("child mutex").take();
2846 }
2847
2848 assert!(
2851 !h.mark_stopping(),
2852 "mark_stopping() should no-op from Starting when no pid is observable"
2853 );
2854 assert!(matches!(h.status(), ProcStatus::Starting));
2855 }
2856
2857 #[tokio::test]
2858 async fn mark_failed_from_stopping_is_allowed() {
2859 let h = handle_for_test();
2860
2861 assert!(h.mark_stopping(), "precondition: to Stopping");
2864
2865 assert!(
2867 h.mark_failed("boom"),
2868 "mark_failed() should succeed from Stopping"
2869 );
2870 match h.status() {
2871 ProcStatus::Failed { reason } => assert_eq!(reason, "boom"),
2872 other => panic!("expected Failed(\"boom\"), got {other:?}"),
2873 }
2874 }
2875 }
2876
2877 #[tokio::test]
2878 async fn test_exit_monitor_updates_status_on_clean_exit() {
2879 let command = BootstrapCommand {
2880 program: PathBuf::from("/bin/true"),
2881 ..Default::default()
2882 };
2883 let manager = BootstrapProcManager::new(command).unwrap();
2884
2885 let mut cmd = Command::new("true");
2887 cmd.stdout(Stdio::null()).stderr(Stdio::null());
2888 let child = cmd.spawn().expect("spawn true");
2889
2890 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "clean".into());
2891 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2892
2893 {
2895 let mut children = manager.children.lock().await;
2896 children.insert(proc_id.clone(), handle.clone());
2897 }
2898 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2899 {
2900 let guard = handle.child.lock().expect("child mutex");
2901 assert!(
2902 guard.is_none(),
2903 "expected Child to be taken by exit monitor"
2904 );
2905 }
2906
2907 let st = handle.wait_inner().await;
2908 assert!(matches!(st, ProcStatus::Stopped { .. }), "status={st:?}");
2909 }
2910
2911 #[tokio::test]
2912 async fn test_exit_monitor_updates_status_on_kill() {
2913 let command = BootstrapCommand {
2914 program: PathBuf::from("/bin/sleep"),
2915 ..Default::default()
2916 };
2917 let manager = BootstrapProcManager::new(command).unwrap();
2918
2919 let mut cmd = Command::new("/bin/sleep");
2921 cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
2922 let child = cmd.spawn().expect("spawn sleep");
2923 let pid = child.id().expect("pid") as i32;
2924
2925 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "killed".into());
2927 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
2928 {
2929 let mut children = manager.children.lock().await;
2930 children.insert(proc_id.clone(), handle.clone());
2931 }
2932 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
2933 {
2934 let guard = handle.child.lock().expect("child mutex");
2935 assert!(
2936 guard.is_none(),
2937 "expected Child to be taken by exit monitor"
2938 );
2939 }
2940 #[cfg(unix)]
2942 unsafe {
2950 libc::kill(pid, libc::SIGKILL);
2951 }
2952
2953 let st = handle.wait_inner().await;
2954 match st {
2955 ProcStatus::Killed { signal, .. } => assert_eq!(signal, libc::SIGKILL),
2956 other => panic!("expected Killed(SIGKILL), got {other:?}"),
2957 }
2958 }
2959
2960 #[tokio::test]
2961 async fn watch_notifies_on_status_changes() {
2962 let child = Command::new("sh")
2963 .arg("-c")
2964 .arg("sleep 0.1")
2965 .stdin(std::process::Stdio::null())
2966 .stdout(std::process::Stdio::null())
2967 .stderr(std::process::Stdio::null())
2968 .spawn()
2969 .expect("spawn");
2970
2971 let proc_id = ProcId::Ranked(WorldId("test".into()), 1);
2972 let handle = BootstrapProcHandle::new(proc_id, child);
2973 let mut rx = handle.watch();
2974
2975 let pid = handle.pid().unwrap_or(0);
2977 let now = RealClock.system_time_now();
2978 assert!(handle.mark_running(pid, now));
2979 rx.changed().await.ok(); match &*rx.borrow() {
2981 ProcStatus::Running { pid: p, started_at } => {
2982 assert_eq!(*p, pid);
2983 assert_eq!(*started_at, now);
2984 }
2985 s => panic!("expected Running, got {s:?}"),
2986 }
2987
2988 assert!(handle.mark_stopped(0, Vec::new()));
2990 rx.changed().await.ok(); assert!(matches!(
2992 &*rx.borrow(),
2993 ProcStatus::Stopped { exit_code: 0, .. }
2994 ));
2995 }
2996
2997 #[tokio::test]
2998 async fn ready_errs_if_process_exits_before_running() {
2999 let child = Command::new("sh")
3001 .arg("-c")
3002 .arg("exit 7")
3003 .stdin(std::process::Stdio::null())
3004 .stdout(std::process::Stdio::null())
3005 .stderr(std::process::Stdio::null())
3006 .spawn()
3007 .expect("spawn");
3008
3009 let proc_id = ProcId::Direct(
3010 ChannelAddr::any(ChannelTransport::Unix),
3011 "early-exit".into(),
3012 );
3013 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3014
3015 assert!(handle.mark_stopped(7, Vec::new()));
3018
3019 match handle.ready_inner().await {
3021 Ok(()) => panic!("ready() unexpectedly succeeded"),
3022 Err(ReadyError::Terminal(ProcStatus::Stopped { exit_code, .. })) => {
3023 assert_eq!(exit_code, 7)
3024 }
3025 Err(other) => panic!("expected Stopped(7), got {other:?}"),
3026 }
3027 }
3028
3029 #[tokio::test]
3030 async fn status_unknown_proc_is_none() {
3031 let manager = BootstrapProcManager::new(BootstrapCommand {
3032 program: PathBuf::from("/bin/true"),
3033 ..Default::default()
3034 })
3035 .unwrap();
3036 let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
3037 assert!(manager.status(&unknown).await.is_none());
3038 }
3039
3040 #[tokio::test]
3041 async fn exit_monitor_child_already_taken_leaves_status_unchanged() {
3042 let manager = BootstrapProcManager::new(BootstrapCommand {
3043 program: PathBuf::from("/bin/sleep"),
3044 ..Default::default()
3045 })
3046 .unwrap();
3047
3048 let mut cmd = Command::new("/bin/sleep");
3050 cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3051 let child = cmd.spawn().expect("spawn sleep");
3052
3053 let proc_id = ProcId::Direct(
3054 ChannelAddr::any(ChannelTransport::Unix),
3055 "already-taken".into(),
3056 );
3057 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3058
3059 {
3062 let mut children = manager.children.lock().await;
3063 children.insert(proc_id.clone(), handle.clone());
3064 }
3065 {
3066 let _ = handle.child.lock().expect("child mutex").take();
3067 }
3068
3069 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3071
3072 assert!(matches!(
3075 manager.status(&proc_id).await,
3076 Some(ProcStatus::Starting)
3077 ));
3078 }
3079
3080 #[tokio::test]
3081 async fn pid_none_after_exit_monitor_takes_child() {
3082 let manager = BootstrapProcManager::new(BootstrapCommand {
3083 program: PathBuf::from("/bin/sleep"),
3084 ..Default::default()
3085 })
3086 .unwrap();
3087
3088 let mut cmd = Command::new("/bin/sleep");
3089 cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
3090 let child = cmd.spawn().expect("spawn sleep");
3091
3092 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "pid-none".into());
3093 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3094
3095 assert!(handle.pid().is_some());
3097 {
3098 let mut children = manager.children.lock().await;
3099 children.insert(proc_id.clone(), handle.clone());
3100 }
3101 manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
3104
3105 assert!(handle.pid().is_none());
3108 }
3109
3110 #[tokio::test]
3111 async fn starting_may_directly_be_marked_stopped() {
3112 let child = Command::new("sh")
3115 .arg("-c")
3116 .arg("exit 0")
3117 .stdin(std::process::Stdio::null())
3118 .stdout(std::process::Stdio::null())
3119 .stderr(std::process::Stdio::null())
3120 .spawn()
3121 .expect("spawn true");
3122
3123 let proc_id = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "fast-exit".into());
3124 let handle = BootstrapProcHandle::new(proc_id, child);
3125
3126 let mut c = {
3128 let mut guard = handle.child.lock().expect("child mutex");
3129 guard.take()
3130 }
3131 .expect("child already taken");
3132
3133 let status = c.wait().await.expect("wait");
3134 let code = status.code().unwrap_or(0);
3135 assert!(handle.mark_stopped(code, Vec::new()));
3136
3137 assert!(matches!(
3138 handle.status(),
3139 ProcStatus::Stopped { exit_code: 0, .. }
3140 ));
3141 }
3142
3143 #[tokio::test]
3144 async fn handle_ready_allows_waiters() {
3145 let child = Command::new("sh")
3146 .arg("-c")
3147 .arg("sleep 0.1")
3148 .stdin(std::process::Stdio::null())
3149 .stdout(std::process::Stdio::null())
3150 .stderr(std::process::Stdio::null())
3151 .spawn()
3152 .expect("spawn sleep");
3153 let proc_id = ProcId::Ranked(WorldId("test".into()), 42);
3154 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3155
3156 let pid = handle.pid().expect("child should have a pid");
3157 let started_at = RealClock.system_time_now();
3158 assert!(handle.mark_running(pid, started_at));
3159
3160 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3161 let agent_ref: ActorRef<ProcMeshAgent> = ActorRef::attest(actor_id);
3162
3163 let ready_addr = any_addr_for_test();
3166
3167 assert!(handle.mark_ready(pid, started_at, ready_addr.clone(), agent_ref));
3169 handle
3170 .ready_inner()
3171 .await
3172 .expect("ready_inner() should complete after Ready");
3173
3174 match handle.status() {
3177 ProcStatus::Ready {
3178 pid: p,
3179 started_at: t,
3180 addr: a,
3181 ..
3182 } => {
3183 assert_eq!(p, pid);
3184 assert_eq!(t, started_at);
3185 assert_eq!(a, ready_addr);
3186 }
3187 other => panic!("expected Ready, got {other:?}"),
3188 }
3189 }
3190
3191 #[tokio::test]
3192 async fn pid_behavior_across_states_running_ready_then_stopped() {
3193 let child = Command::new("sh")
3194 .arg("-c")
3195 .arg("sleep 0.1")
3196 .stdin(std::process::Stdio::null())
3197 .stdout(std::process::Stdio::null())
3198 .stderr(std::process::Stdio::null())
3199 .spawn()
3200 .expect("spawn");
3201
3202 let proc_id = ProcId::Ranked(WorldId("test".into()), 0);
3203 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3204
3205 let pid = handle.pid().expect("initial Child::id");
3207 let t0 = RealClock.system_time_now();
3208 assert!(handle.mark_running(pid, t0));
3209 assert_eq!(handle.pid(), Some(pid));
3210
3211 let addr = any_addr_for_test();
3213 let agent = {
3214 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3215 ActorRef::<ProcMeshAgent>::attest(actor_id)
3216 };
3217 assert!(handle.mark_ready(pid, t0, addr, agent));
3218 {
3219 let _ = handle.child.lock().expect("child mutex").take();
3220 }
3221 assert_eq!(handle.pid(), Some(pid));
3222
3223 assert!(handle.mark_stopped(0, Vec::new()));
3225 assert_eq!(handle.pid(), None, "pid() should be None once terminal");
3226 }
3227
3228 #[tokio::test]
3229 async fn pid_is_available_in_ready_even_after_child_taken() {
3230 let child = Command::new("sh")
3231 .arg("-c")
3232 .arg("sleep 0.1")
3233 .stdin(std::process::Stdio::null())
3234 .stdout(std::process::Stdio::null())
3235 .stderr(std::process::Stdio::null())
3236 .spawn()
3237 .expect("spawn");
3238
3239 let proc_id = ProcId::Ranked(WorldId("test".into()), 99);
3240 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3241
3242 let pid = handle.pid().expect("child should have pid (via Child::id)");
3244 let started_at = RealClock.system_time_now();
3245 assert!(handle.mark_running(pid, started_at));
3246
3247 let addr = any_addr_for_test();
3249 let agent = {
3250 let actor_id = ActorId(proc_id.clone(), "agent".into(), 0);
3251 ActorRef::<ProcMeshAgent>::attest(actor_id)
3252 };
3253 assert!(handle.mark_ready(pid, started_at, addr, agent));
3254
3255 {
3258 let _ = handle.child.lock().expect("child mutex").take();
3259 }
3260
3261 assert_eq!(handle.pid(), Some(pid), "pid() should be cached in Ready");
3264 }
3265
/// The `Display` output of `Running` mentions the variant name, the pid and
/// the elapsed time since `started_at`.
#[test]
fn display_running_includes_pid_and_uptime() {
    // Back-date the start by 42 seconds so the rendered uptime is predictable.
    let now = RealClock.system_time_now();
    let started_at = now - Duration::from_secs(42);

    let s = ProcStatus::Running {
        pid: 1234,
        started_at,
    }
    .to_string();

    assert!(s.contains("1234"));
    assert!(s.contains("Running"));
    assert!(s.contains("42s"));
}
3279
/// The `Display` output of `Ready` mentions the variant name, the pid and the
/// channel address.
#[test]
fn display_ready_includes_pid_and_addr() {
    let addr = ChannelAddr::any(ChannelTransport::Unix);
    let started_at = RealClock.system_time_now() - Duration::from_secs(5);

    let agent_proc = ProcId::Direct(addr.clone(), "proc".into());
    let agent = ActorRef::attest(agent_proc.actor_id("agent", 0));

    let st = ProcStatus::Ready {
        pid: 4321,
        started_at,
        addr: addr.clone(),
        agent,
    };

    let s = st.to_string();
    assert!(s.contains("4321"));
    assert!(s.contains(&addr.to_string()));
    assert!(s.contains("Ready"));
}
3299
/// The `Display` output of `Stopped` mentions the variant name and the exit
/// code.
#[test]
fn display_stopped_includes_exit_code() {
    let s = ProcStatus::Stopped {
        exit_code: 7,
        stderr_tail: Vec::new(),
    }
    .to_string();

    assert!(s.contains("Stopped"));
    assert!(s.contains("7"));
}
3310
/// Smoke test: formatting each of the listed `ProcStatus` variants through
/// `Display` completes without panicking. The output itself is not checked.
#[test]
fn display_other_variants_does_not_panic() {
    let ready_addr = ChannelAddr::any(ChannelTransport::Unix);
    let ready_agent = ActorRef::attest(
        ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "x".into()).actor_id("agent", 0),
    );

    let samples = [
        ProcStatus::Starting,
        ProcStatus::Stopping {
            pid: 42,
            started_at: RealClock.system_time_now(),
        },
        ProcStatus::Ready {
            pid: 42,
            started_at: RealClock.system_time_now(),
            addr: ready_addr,
            agent: ready_agent,
        },
        ProcStatus::Killed {
            signal: 9,
            core_dumped: false,
        },
        ProcStatus::Failed {
            reason: "boom".into(),
        },
    ];

    for st in &samples {
        // Only the absence of a panic matters here.
        let _ = st.to_string();
    }
}
3341
3342 #[tokio::test]
3343 async fn proc_handle_ready_ok_through_trait() {
3344 let child = Command::new("sh")
3345 .arg("-c")
3346 .arg("sleep 0.1")
3347 .stdin(Stdio::null())
3348 .stdout(Stdio::null())
3349 .stderr(Stdio::null())
3350 .spawn()
3351 .expect("spawn");
3352
3353 let proc_id = ProcId::Direct(any_addr_for_test(), "ph-ready-ok".into());
3354 let handle = BootstrapProcHandle::new(proc_id.clone(), child);
3355
3356 let pid = handle.pid().expect("pid");
3358 let t0 = RealClock.system_time_now();
3359 assert!(handle.mark_running(pid, t0));
3360
3361 let addr = any_addr_for_test();
3363 let agent: ActorRef<ProcMeshAgent> =
3364 ActorRef::attest(ActorId(proc_id.clone(), "agent".into(), 0));
3365 assert!(handle.mark_ready(pid, t0, addr, agent));
3366
3367 let r = <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await;
3369 assert!(r.is_ok(), "expected Ok(()), got {r:?}");
3370 }
3371
3372 #[tokio::test]
3373 async fn proc_handle_wait_returns_terminal_status() {
3374 let child = Command::new("sh")
3375 .arg("-c")
3376 .arg("exit 0")
3377 .stdin(Stdio::null())
3378 .stdout(Stdio::null())
3379 .stderr(Stdio::null())
3380 .spawn()
3381 .expect("spawn");
3382
3383 let proc_id = ProcId::Direct(any_addr_for_test(), "ph-wait".into());
3384 let handle = BootstrapProcHandle::new(proc_id, child);
3385
3386 assert!(handle.mark_stopped(0, Vec::new()));
3388
3389 let st = <BootstrapProcHandle as hyperactor::host::ProcHandle>::wait(&handle)
3391 .await
3392 .expect("wait should return Ok(terminal)");
3393
3394 match st {
3395 ProcStatus::Stopped { exit_code, .. } => assert_eq!(exit_code, 0),
3396 other => panic!("expected Stopped(0), got {other:?}"),
3397 }
3398 }
3399
3400 #[tokio::test]
3401 async fn ready_wrapper_maps_terminal_to_trait_error() {
3402 let child = Command::new("sh")
3403 .arg("-c")
3404 .arg("exit 7")
3405 .stdin(Stdio::null())
3406 .stdout(Stdio::null())
3407 .stderr(Stdio::null())
3408 .spawn()
3409 .expect("spawn");
3410 let proc_id = ProcId::Direct(any_addr_for_test(), "wrap".into());
3411 let handle = BootstrapProcHandle::new(proc_id, child);
3412
3413 assert!(handle.mark_stopped(7, Vec::new()));
3414
3415 match <BootstrapProcHandle as hyperactor::host::ProcHandle>::ready(&handle).await {
3416 Ok(()) => panic!("expected Err"),
3417 Err(hyperactor::host::ReadyError::Terminal(ProcStatus::Stopped {
3418 exit_code, ..
3419 })) => {
3420 assert_eq!(exit_code, 7);
3421 }
3422 Err(e) => panic!("unexpected error: {e:?}"),
3423 }
3424 }
3425
3426 async fn make_proc_id_and_backend_addr(
3436 instance: &hyperactor::Instance<()>,
3437 _tag: &str,
3438 ) -> (ProcId, ChannelAddr) {
3439 let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
3442
3443 instance.proc().clone().serve(rx);
3447
3448 let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
3451 (proc_id, backend_addr)
3452 }
3453
/// Spawns a real bootstrap child (fbcode builds only), waits for it to become
/// ready, then calls `terminate` and accepts either a clean exit (`Stopped`
/// with code 0) or `Killed` by `SIGTERM`.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_terminate_graceful() {
    let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
        .await
        .unwrap();
    let (instance, _handle) = root.instance("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;

    let config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
    };
    let handle = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), config)
        .await
        .expect("spawn bootstrap child");

    handle.ready().await.expect("ready");

    // Give `terminate` its own deadline and bound the whole call by twice
    // that, so a hung future fails the test instead of stalling it.
    let deadline = Duration::from_secs(2);
    let outcome = RealClock
        .timeout(deadline * 2, handle.terminate(&instance, deadline))
        .await;

    match outcome {
        Err(_) => panic!("terminate() future hung"),
        Ok(Err(e)) => panic!("terminate() failed: {e:?}"),
        Ok(Ok(ProcStatus::Stopped { exit_code, .. })) => {
            assert_eq!(exit_code, 0, "expected clean exit; got {exit_code}");
        }
        Ok(Ok(ProcStatus::Killed { signal, .. })) => {
            assert_eq!(signal, libc::SIGTERM, "expected SIGTERM; got {signal}");
        }
        Ok(Ok(other)) => panic!("expected Stopped or Killed(SIGTERM); got {other:?}"),
    }
}
3517
/// Spawns a real bootstrap child (fbcode builds only), waits for it to become
/// ready, then calls `kill` and expects a `Killed` status with `SIGKILL`.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_handle_kill_forced() {
    let root = hyperactor::Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
        .await
        .unwrap();
    let (instance, _handle) = root.instance("client").unwrap();

    let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
    let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;

    let config = BootstrapProcConfig {
        create_rank: 0,
        client_config_override: Attrs::new(),
    };
    let handle = mgr
        .spawn(proc_id.clone(), backend_addr.clone(), config)
        .await
        .expect("spawn bootstrap child");

    handle.ready().await.expect("ready");

    // Bound the call so a hung `kill()` fails the test instead of stalling it.
    let deadline = Duration::from_secs(5);
    let outcome = RealClock.timeout(deadline, handle.kill()).await;

    match outcome {
        Err(_) => panic!("kill() future hung"),
        Ok(Err(e)) => panic!("kill() failed: {e:?}"),
        Ok(Ok(ProcStatus::Killed { signal, .. })) => {
            assert_eq!(signal, libc::SIGKILL, "expected SIGKILL; got {}", signal);
        }
        Ok(Ok(other)) => panic!("expected Killed status after kill(); got: {other:?}"),
    }
}
3567
/// End-to-end smoke test (fbcode builds only): allocates one replica through
/// `ProcessAllocator`, builds a host mesh, a proc mesh and an actor mesh on
/// top of it, casts a `GetActorId` message, checks the reply, and shuts the
/// host mesh down.
#[tokio::test]
#[cfg(fbcode_build)]
async fn bootstrap_canonical_simple() {
    // SAFETY: NOTE(review) — `set_var` is unsafe because it can race with
    // environment access from other threads. Nothing visible here rules that
    // out when tests run in parallel; confirm this is acceptable.
    unsafe {
        std::env::set_var("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG", "false");
    }
    // Root proc and client instance used to drive everything below.
    let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
        .await
        .unwrap();
    let (instance, _handle) = proc.instance("client").unwrap();

    // Allocate a single replica using the bootstrap binary test resource.
    let mut allocator = ProcessAllocator::new(Command::new(crate::testresource::get(
        "monarch/hyperactor_mesh/bootstrap",
    )));
    let alloc = allocator
        .allocate(AllocSpec {
            extent: extent!(replicas = 1),
            constraints: Default::default(),
            proc_name: None,
            transport: ChannelTransport::Unix,
            proc_allocation_mode: Default::default(),
        })
        .await
        .unwrap();

    // Host mesh built from the allocation.
    let host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
        .await
        .unwrap();

    // Proc mesh "p0" spawned on the host mesh.
    let proc_mesh = host_mesh
        .spawn(&instance, "p0", Extent::unity())
        .await
        .unwrap();

    // Actor mesh "a0" of `TestActor` spawned on the proc mesh.
    let actor_mesh: ActorMesh<testactor::TestActor> =
        proc_mesh.spawn(&instance, "a0", &()).await.unwrap();

    // Ask the mesh for its actor id and compare the reply with the id of the
    // first actor in the mesh.
    let (port, mut rx) = instance.mailbox().open_port();
    actor_mesh
        .cast(&instance, testactor::GetActorId(port.bind()))
        .unwrap();
    let got_id = rx.recv().await.unwrap();
    assert_eq!(
        got_id,
        actor_mesh.values().next().unwrap().actor_id().clone()
    );

    host_mesh.shutdown(&instance).await.expect("host shutdown");
}
3686
/// Checks that the stderr tail captured by the stream forwarders is attached
/// to the terminal `Stopped` status: a child that prints two lines to stderr
/// and exits with code 7 must yield `Stopped { exit_code: 7, .. }` whose
/// `stderr_tail` contains both lines.
#[tokio::test]
async fn exit_tail_is_attached_and_logged() {
    hyperactor_telemetry::initialize_logging_for_test();

    // Override the tail length for this test; `_guard` keeps the override
    // alive until the end of the function.
    let lock = hyperactor::config::global::lock();
    let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);

    // The child writes "boom-1" and "boom-2" to stderr, then exits with 7.
    // Both output streams are piped so they can be handed to the forwarders.
    let mut cmd = Command::new("sh");
    cmd.arg("-c")
        .arg("printf 'boom-1\\nboom-2\\n' 1>&2; exit 7")
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    let child = cmd.spawn().expect("spawn");
    // Falls back to 0 if the pid is unavailable; below, `pid` is only passed
    // to `StreamFwder::start`.
    let pid = child.id().unwrap_or_default();

    let proc_id = hyperactor::id!(testproc[0]);
    let handle = BootstrapProcHandle::new(proc_id.clone(), child);

    {
        // Wire up the forwarders while the child mutex is held; the guard is
        // released at the end of this block.
        let mut guard = handle.child.lock().expect("child mutex poisoned");
        if let Some(child) = guard.as_mut() {
            let out = child.stdout.take().expect("child stdout must be piped");
            let err = child.stderr.take().expect("child stderr must be piped");

            let log_channel = ChannelAddr::any(ChannelTransport::Unix);
            let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);

            // One forwarder per stream, sharing the same log channel address.
            let stdout_monitor = StreamFwder::start(
                out,
                None,
                OutputTarget::Stdout,
                tail_size,
                Some(log_channel.clone()),
                pid,
                0,
            );

            let stderr_monitor = StreamFwder::start(
                err,
                None,
                OutputTarget::Stderr,
                tail_size,
                Some(log_channel.clone()),
                pid,
                0,
            );

            handle.set_stream_monitors(Some(stdout_monitor), Some(stderr_monitor));
            tracing::info!("set stream monitors");
        } else {
            panic!("child already taken before wiring tailers");
        }
    }

    // The manager is only needed for its `spawn_exit_monitor`; its own
    // program ("/bin/true") is never spawned in this test.
    let manager = BootstrapProcManager::new(BootstrapCommand {
        program: std::path::PathBuf::from("/bin/true"),
        ..Default::default()
    })
    .unwrap();

    // NOTE(review): presumably gives the forwarders time to read the child's
    // output before the exit monitor starts — confirm the delay is required.
    RealClock.sleep(Duration::from_millis(1000)).await;

    manager.spawn_exit_monitor(proc_id.clone(), handle.clone());

    // Bound the wait so a stuck exit monitor fails the test instead of
    // hanging it.
    let st = RealClock
        .timeout(Duration::from_secs(2), handle.wait_inner())
        .await
        .expect("wait_inner() timed out (exit monitor stuck?)");

    match st {
        ProcStatus::Stopped {
            exit_code,
            stderr_tail,
        } => {
            assert_eq!(
                exit_code, 7,
                "unexpected exit code; stderr_tail={:?}",
                stderr_tail
            );
            // Join the captured lines so each marker can be searched for.
            let tail = stderr_tail.join("\n");
            assert!(tail.contains("boom-1"), "missing boom-1; tail:\n{tail}");
            assert!(tail.contains("boom-2"), "missing boom-2; tail:\n{tail}");
        }
        other => panic!("expected Stopped(7), got {other:?}"),
    }
}
3789}