// hyperactor_mesh/host_mesh/host_agent.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! The mesh agent actor that manages a host.
10
11// EnumAsInner generates code that triggers a false positive
12// unused_assignments lint on struct variant fields. #[allow] on the
13// enum itself doesn't propagate into derive-macro-generated code, so
14// the suppression must be at module scope.
15#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fmt;
20use std::pin::Pin;
21use std::sync::OnceLock;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use hyperactor::Actor;
26use hyperactor::ActorHandle;
27use hyperactor::Context;
28use hyperactor::HandleClient;
29use hyperactor::Handler;
30use hyperactor::Instance;
31use hyperactor::PortHandle;
32use hyperactor::Proc;
33use hyperactor::RefClient;
34use hyperactor::channel::ChannelTransport;
35use hyperactor::context;
36use hyperactor::host::Host;
37use hyperactor::host::HostError;
38use hyperactor::host::LOCAL_PROC_NAME;
39use hyperactor::host::LocalProcManager;
40use hyperactor::host::SERVICE_PROC_NAME;
41use hyperactor::host::SingleTerminate;
42use hyperactor::mailbox::MailboxServerHandle;
43use hyperactor::reference as hyperactor_reference;
44use hyperactor_config::Flattrs;
45use hyperactor_config::attrs::Attrs;
46use serde::Deserialize;
47use serde::Serialize;
48use tokio::time::Duration;
49use typeuri::Named;
50
51use crate::Name;
52use crate::bootstrap;
53use crate::bootstrap::BootstrapCommand;
54use crate::bootstrap::BootstrapProcConfig;
55use crate::bootstrap::BootstrapProcManager;
56use crate::config_dump::ConfigDump;
57use crate::config_dump::ConfigDumpResult;
58use crate::proc_agent::ProcAgent;
59use crate::pyspy::PySpyDump;
60use crate::pyspy::PySpyWorker;
61use crate::resource;
62use crate::resource::ProcSpec;
63
/// Future returned by a [`ProcManagerSpawnFn`], resolving to the handle
/// of the [`ProcAgent`] spawned on the new proc.
pub(crate) type ProcManagerSpawnFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<ActorHandle<ProcAgent>>> + Send>>;
/// Spawn callback used by [`LocalProcManager`]-backed hosts: given a
/// [`Proc`], produces a future that boots a [`ProcAgent`] on it.
pub(crate) type ProcManagerSpawnFn = Box<dyn Fn(Proc) -> ProcManagerSpawnFuture + Send + Sync>;
67
/// Represents the different ways a [`Host`] can be managed by an agent.
///
/// A host can either:
/// - [`Process`] — a host running as an external OS process, managed by
///   [`BootstrapProcManager`].
/// - [`Local`] — a host running in-process, managed by
///   [`LocalProcManager`] with a custom spawn function.
///
/// This abstraction lets the same `HostAgent` work across both
/// out-of-process and in-process execution modes.
#[derive(EnumAsInner)]
pub enum HostAgentMode {
    /// Host whose procs run as external OS processes, managed by a
    /// [`BootstrapProcManager`].
    Process {
        host: Host<BootstrapProcManager>,
        /// If set, the ShutdownHost handler sends the frontend mailbox server
        /// handle back to the bootstrap loop via this channel once shutdown is
        /// complete, so the caller can drain it and exit.
        shutdown_tx: Option<tokio::sync::oneshot::Sender<MailboxServerHandle>>,
    },
    /// Host whose procs run in-process, managed by a
    /// [`LocalProcManager`] driving a [`ProcManagerSpawnFn`].
    Local(Host<LocalProcManager<ProcManagerSpawnFn>>),
}
89
impl HostAgentMode {
    /// Address on which the host's frontend is served.
    pub(crate) fn addr(&self) -> &hyperactor::channel::ChannelAddr {
        #[allow(clippy::match_same_arms)]
        match self {
            HostAgentMode::Process { host, .. } => host.addr(),
            HostAgentMode::Local(host) => host.addr(),
        }
    }

    /// The host's system (service) proc.
    pub(crate) fn system_proc(&self) -> &Proc {
        #[allow(clippy::match_same_arms)]
        match self {
            HostAgentMode::Process { host, .. } => host.system_proc(),
            HostAgentMode::Local(host) => host.system_proc(),
        }
    }

    /// The host's local proc (see `hyperactor::host::LOCAL_PROC_NAME`).
    pub(crate) fn local_proc(&self) -> &Proc {
        #[allow(clippy::match_same_arms)]
        match self {
            HostAgentMode::Process { host, .. } => host.local_proc(),
            HostAgentMode::Local(host) => host.local_proc(),
        }
    }

    /// Non-blocking stop: send the stop signal and spawn a background
    /// task for cleanup. Returns immediately without blocking the
    /// actor.
    async fn request_stop(
        &self,
        cx: &impl context::Actor,
        proc: &hyperactor_reference::ProcId,
        timeout: Duration,
        reason: &str,
    ) {
        // Note: only the process-backed manager takes the actor
        // context; the local manager's request_stop does not.
        match self {
            HostAgentMode::Process { host, .. } => {
                host.manager().request_stop(cx, proc, timeout, reason).await;
            }
            HostAgentMode::Local(host) => {
                host.manager().request_stop(proc, timeout, reason).await;
            }
        }
    }

    /// Query a proc's lifecycle state, returning both the coarse
    /// `resource::Status` used by the resource protocol and the
    /// detailed `bootstrap::ProcStatus` (when available) for callers
    /// that need process-level detail such as PIDs or exit codes.
    async fn proc_status(
        &self,
        proc_id: &hyperactor_reference::ProcId,
    ) -> (resource::Status, Option<bootstrap::ProcStatus>) {
        match self {
            HostAgentMode::Process { host, .. } => match host.manager().status(proc_id).await {
                Some(proc_status) => (proc_status.clone().into(), Some(proc_status)),
                None => (resource::Status::Unknown, None),
            },
            HostAgentMode::Local(host) => {
                // Local procs have no process-level status; a proc with
                // no recorded stop state is treated as running.
                let status = match host.manager().local_proc_status(proc_id).await {
                    Some(hyperactor::host::LocalProcStatus::Stopping) => resource::Status::Stopping,
                    Some(hyperactor::host::LocalProcStatus::Stopped) => resource::Status::Stopped,
                    None => resource::Status::Running,
                };
                (status, None)
            }
        }
    }

    /// The bootstrap command used by the process manager, if any.
    /// Always `None` for in-process (`Local`) hosts.
    fn bootstrap_command(&self) -> Option<BootstrapCommand> {
        match self {
            HostAgentMode::Process { host, .. } => Some(host.manager().command().clone()),
            HostAgentMode::Local(_) => None,
        }
    }
}
167
/// Book-keeping for a proc created via [`resource::CreateOrUpdate`].
#[derive(Debug)]
pub(crate) struct ProcCreationState {
    /// Rank assigned to this proc at creation time.
    pub(crate) rank: usize,
    /// Host mesh this proc belongs to, if any; used by
    /// `drain_by_mesh_name` to selectively stop procs.
    pub(crate) host_mesh_name: Option<crate::Name>,
    /// Outcome of the spawn: the proc id and its agent ref on success,
    /// or the [`HostError`] that prevented creation (reported as
    /// `Status::Failed` by the status handlers).
    pub(crate) created: Result<
        (
            hyperactor_reference::ProcId,
            hyperactor_reference::ActorRef<ProcAgent>,
        ),
        HostError,
    >,
}
180
/// Actor name used when spawning the host mesh agent on the system proc.
pub const HOST_MESH_AGENT_ACTOR_NAME: &str = "host_agent";

/// Lifecycle state of the host managed by [`HostAgent`].
///
/// The agent owns the [`HostAgentMode`] only in the `Detached` and
/// `Attached` states; `host()`/`host_mut()` return `None` otherwise.
enum HostAgentState {
    /// Waiting for a client to attach. The host is idle and ready
    /// to accept new proc spawn requests.
    Detached(HostAgentMode),
    /// Actively running procs for an attached client.
    Attached(HostAgentMode),
    /// Procs are being drained by a DrainWorker. The host has been
    /// temporarily moved to the worker. The host agent remains
    /// responsive; min_proc_status() returns Stopping.
    Draining,
    /// Host fully shut down.
    Shutdown,
}
198
// (The "mesh agent" description formerly attached here belongs to
// `HostAgent`, not this struct.)
/// Self-notification sent by bridge tasks when a proc's status changes.
/// Not exported or registered — only used internally via `PortHandle`.
#[derive(Debug, Serialize, Deserialize, Named)]
struct ProcStatusChanged {
    // Resource name of the proc whose status changed.
    name: Name,
}
207
/// Sent by DrainWorker back to HostAgent when draining completes.
/// Not exported — delivered locally via PortHandle (no serialization).
struct DrainComplete {
    /// The host mode handed back from the worker so the agent can
    /// restore its state before acking.
    host: HostAgentMode,
    /// Reply port acked by the parent after state is restored.
    ack: hyperactor_reference::PortRef<()>,
}
214
/// Child actor whose only job is to run `host.terminate_children()` in
/// its `init()`, return the host and ack to the parent via DrainComplete,
/// and exit. Runs on the same proc as the host agent so it gets its
/// own `Instance` (required by `terminate_children`).
#[hyperactor::export(handlers = [])]
struct DrainWorker {
    /// Host moved here for the duration of the drain; `take`n in
    /// `init` when handed back to the parent.
    host: Option<HostAgentMode>,
    /// Per-child termination timeout passed to `terminate_children`.
    timeout: Duration,
    /// Max concurrent child terminations.
    max_in_flight: usize,
    /// Reply port bundled into DrainComplete; `take`n in `init`.
    ack: Option<hyperactor_reference::PortRef<()>>,
    /// Parent's port for delivering the DrainComplete notification.
    done_notify: PortHandle<DrainComplete>,
}
227
#[async_trait]
impl Actor for DrainWorker {
    /// Performs the entire drain in `init`: terminate all children,
    /// then hand the host and ack back to the parent via `done_notify`.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        if let Some(host) = self.host.as_mut() {
            match host {
                HostAgentMode::Process { host, .. } => {
                    // NOTE(review): only this arm clamps max_in_flight to
                    // 1..=256; the Local arm passes it through unclamped —
                    // confirm the asymmetry is intentional.
                    host.terminate_children(
                        this,
                        self.timeout,
                        self.max_in_flight.clamp(1, 256),
                        "drain host",
                    )
                    .await;
                }
                HostAgentMode::Local(host) => {
                    host.terminate_children(this, self.timeout, self.max_in_flight, "drain host")
                        .await;
                }
            }
        }

        // Bundle host + ack into DrainComplete so the parent sends the ack
        // AFTER restoring state (prevents race with ShutdownHost).
        if let (Some(host), Some(ack)) = (self.host.take(), self.ack.take()) {
            let _ = self.done_notify.send(this, DrainComplete { host, ack });
        }

        Ok(())
    }
}
258
259impl fmt::Debug for DrainWorker {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        f.debug_struct("DrainWorker")
262            .field("timeout", &self.timeout)
263            .field("max_in_flight", &self.max_in_flight)
264            .finish()
265    }
266}
267
/// A mesh agent is responsible for managing a host in a [`HostMesh`],
/// through the resource behaviors defined in [`crate::resource`].
#[hyperactor::export(
    handlers=[
        resource::CreateOrUpdate<ProcSpec>,
        resource::Stop,
        resource::GetState<ProcState>,
        resource::GetRankStatus { cast = true },
        resource::WaitRankStatus { cast = true },
        resource::List,
        ShutdownHost,
        DrainHost,
        SetClientConfig,
        ProcStatusChanged,
        PySpyDump,
        ConfigDump,
    ]
)]
pub struct HostAgent {
    /// Lifecycle state; owns the [`HostAgentMode`] while `Detached` or
    /// `Attached`.
    state: HostAgentState,
    /// Per-proc creation records, keyed by resource name.
    pub(crate) created: HashMap<Name, ProcCreationState>,
    /// Pending `WaitRankStatus` waiters, keyed by resource name.
    /// Each entry is `(min_status, rank, reply_port)`. Only touched
    /// from `&mut self` handlers.
    pending_proc_waiters: HashMap<
        Name,
        Vec<(
            resource::Status,
            usize,
            hyperactor_reference::PortRef<crate::StatusOverlay>,
        )>,
    >,
    /// Procs that already have an active bridge task watching their status.
    watching: HashSet<Name>,
    /// Port handle for sending `ProcStatusChanged` to self. Set in `init()`.
    proc_status_port: Option<PortHandle<ProcStatusChanged>>,
    /// Lazily initialized ProcAgent on the host's local proc.
    /// Boots on first [`GetLocalProc`] (LP-1 — see
    /// `hyperactor::host::LOCAL_PROC_NAME`).
    local_mesh_agent: OnceLock<anyhow::Result<ActorHandle<ProcAgent>>>,
    /// Handle to the host's frontend mailbox server, set during `init` after
    /// `this.bind::<Self>()` ensures the actor port is registered before the
    /// mailbox starts routing messages. Sent back to the bootstrap loop via
    /// `shutdown_tx` when the host shuts down so the caller can
    /// drain it.
    mailbox_handle: Option<MailboxServerHandle>,
}
313
impl HostAgent {
    /// Create a new host mesh agent running in the provided mode.
    ///
    /// The agent starts `Detached`; it transitions to `Attached` when
    /// the first proc is created.
    pub fn new(host: HostAgentMode) -> Self {
        Self {
            state: HostAgentState::Detached(host),
            created: HashMap::new(),
            pending_proc_waiters: HashMap::new(),
            watching: HashSet::new(),
            proc_status_port: None,
            local_mesh_agent: OnceLock::new(),
            mailbox_handle: None,
        }
    }

    /// Minimum status floor derived from the host agent's lifecycle.
    /// Procs on this host cannot be healthier than this.
    fn min_proc_status(&self) -> resource::Status {
        match &self.state {
            HostAgentState::Detached(_) | HostAgentState::Attached(_) => resource::Status::Running,
            HostAgentState::Draining => resource::Status::Stopping,
            HostAgentState::Shutdown => resource::Status::Stopped,
        }
    }

    /// The host mode, while this agent still owns it (`Detached` or
    /// `Attached`); `None` while `Draining` or after `Shutdown`.
    fn host(&self) -> Option<&HostAgentMode> {
        match &self.state {
            HostAgentState::Detached(h) | HostAgentState::Attached(h) => Some(h),
            _ => None,
        }
    }

    /// Mutable variant of [`Self::host`].
    fn host_mut(&mut self) -> Option<&mut HostAgentMode> {
        match &mut self.state {
            HostAgentState::Detached(h) | HostAgentState::Attached(h) => Some(h),
            _ => None,
        }
    }

    /// Terminate all tracked children on the host and clear proc state.
    ///
    /// The host, system proc, mailbox server, and HostAgent all stay
    /// alive — only user procs are killed. After this returns the host
    /// is ready to accept new spawn requests with the same proc names.
    async fn drain(
        &mut self,
        cx: &Context<'_, Self>,
        timeout: std::time::Duration,
        max_in_flight: usize,
    ) {
        if let Some(host_mode) = self.host_mut() {
            match host_mode {
                HostAgentMode::Process { host, .. } => {
                    let summary = host
                        .terminate_children(cx, timeout, max_in_flight.clamp(1, 256), "stop host")
                        .await;
                    tracing::info!(?summary, "terminated children on host");
                }
                HostAgentMode::Local(host) => {
                    // NOTE(review): unlike the Process arm, max_in_flight
                    // is passed through unclamped here — confirm the
                    // asymmetry is intentional.
                    let summary = host
                        .terminate_children(cx, timeout, max_in_flight, "stop host")
                        .await;
                    tracing::info!(?summary, "terminated children on local host");
                }
            }
        }
        self.created.clear();
    }

    /// Selectively stop procs belonging to a specific host mesh.
    /// Only procs whose `host_mesh_name` matches `filter` are stopped;
    /// all other procs are left running. Note that a `None` filter
    /// selects only procs created without a host mesh name — it does
    /// not mean "all procs".
    async fn drain_by_mesh_name(
        &mut self,
        cx: &Context<'_, Self>,
        timeout: std::time::Duration,
        filter: Option<&crate::Name>,
    ) {
        // Snapshot matching names first so the iteration below doesn't
        // borrow `self.created` while terminating.
        let matching_names: Vec<crate::Name> = self
            .created
            .iter()
            .filter(|(_, state)| state.host_mesh_name.as_ref() == filter)
            .map(|(name, _)| name.clone())
            .collect();

        if let Some(host_mode) = self.host() {
            for name in &matching_names {
                // Only successfully-created procs have an id to terminate;
                // failed entries are just dropped from the map below.
                if let Some(ProcCreationState {
                    created: Ok((proc_id, _)),
                    ..
                }) = self.created.get(name)
                {
                    match host_mode {
                        HostAgentMode::Process { host, .. } => {
                            let _ = host
                                .terminate_proc(cx, proc_id, timeout, "selective drain")
                                .await;
                        }
                        HostAgentMode::Local(host) => {
                            let _ = host
                                .terminate_proc(cx, proc_id, timeout, "selective drain")
                                .await;
                        }
                    }
                }
            }
        }

        // Remove drained entries.
        for name in &matching_names {
            self.created.remove(name);
        }

        tracing::info!(
            count = matching_names.len(),
            filter = ?filter,
            "selectively drained procs",
        );
    }

    /// Publish the current host properties and child list for
    /// introspection. Called from init and after each state change
    /// (proc created/stopped).
    fn publish_introspect_properties(&self, cx: &Instance<Self>) {
        let host = match self.host() {
            Some(h) => h,
            None => return, // host shut down or stopping
        };

        let addr = host.addr().to_string();
        let mut children: Vec<hyperactor::introspect::IntrospectRef> = Vec::new();
        let system_children: Vec<crate::introspect::NodeRef> = Vec::new(); // LC-2

        // Procs are not system — only actors are. Both service and
        // local appear as regular children; 's' in the TUI toggles
        // actor visibility, not proc visibility.
        children.push(hyperactor::introspect::IntrospectRef::Proc(
            host.system_proc().proc_id().clone(),
        ));
        children.push(hyperactor::introspect::IntrospectRef::Proc(
            host.local_proc().proc_id().clone(),
        ));

        // User procs.
        for state in self.created.values() {
            if let Ok((proc_id, _agent_ref)) = &state.created {
                children.push(hyperactor::introspect::IntrospectRef::Proc(proc_id.clone()));
            }
        }

        // Counts system + local + user procs (everything pushed above).
        let num_procs = children.len();

        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "host".to_string());
        attrs.set(crate::introspect::ADDR, addr);
        attrs.set(crate::introspect::NUM_PROCS, num_procs);
        attrs.set(hyperactor::introspect::CHILDREN, children);
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
        cx.publish_attrs(attrs);
    }
}
474
#[async_trait]
impl Actor for HostAgent {
    /// Bind the actor port, start serving the host, publish initial
    /// introspection state, and install the QueryChild resolver for
    /// the system/local procs.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Serve the host now that the agent is initialized. Make sure our port is
        // bound before serving.
        this.bind::<Self>();
        // Host is always present here: `new` starts in `Detached`.
        match self.host_mut().unwrap() {
            HostAgentMode::Process { host, .. } => {
                self.mailbox_handle = host.serve();
                // NOTE(review): this unwrap panics if the log file path
                // cannot be resolved — confirm that aborting host
                // bootstrap is the intended behavior in that case.
                let (directory, file) = hyperactor_telemetry::log_file_path(
                    hyperactor_telemetry::env::Env::current(),
                    None,
                )
                .unwrap();
                eprintln!(
                    "Monarch internal logs are being written to {}/{}.log; execution id {}",
                    directory,
                    file,
                    hyperactor_telemetry::env::execution_id(),
                );
            }
            HostAgentMode::Local(host) => {
                host.serve();
            }
        };
        this.set_system();
        self.publish_introspect_properties(this);

        // Register callback for QueryChild — resolves system procs
        // that are not independently addressable actors.
        let host = self.host().expect("host present");
        let system_proc = host.system_proc().clone();
        let local_proc = host.local_proc().clone();
        let self_id = this.self_id().clone();
        this.set_query_child_handler(move |child_ref| {
            use hyperactor::introspect::IntrospectResult;

            // Only the host's own service/local procs resolve here;
            // anything else falls through to the not-found arm below.
            let proc = match child_ref {
                hyperactor::reference::Reference::Proc(proc_id) => {
                    if *proc_id == *system_proc.proc_id() {
                        Some((&system_proc, SERVICE_PROC_NAME))
                    } else if *proc_id == *local_proc.proc_id() {
                        Some((&local_proc, LOCAL_PROC_NAME))
                    } else {
                        None
                    }
                }
                _ => None,
            };

            match proc {
                Some((proc, label)) => {
                    // Use all_instance_keys() instead of
                    // all_actor_ids() to avoid holding DashMap shard
                    // read locks while doing Weak::upgrade() +
                    // watch::borrow() + is_terminal() per entry.
                    // Under rapid actor churn the per-entry work in
                    // all_actor_ids() causes convoy starvation with
                    // concurrent insert/remove operations, stalling
                    // the spawn/exit path. all_instance_keys() just
                    // clones keys — microseconds per shard. The
                    // is_system check uses individual point lookups
                    // outside the iteration. Stale keys (terminal
                    // actors) may appear but are harmless — the TUI
                    // handles "not found" gracefully.
                    let all_keys = proc.all_instance_keys();
                    let mut actors: Vec<hyperactor::introspect::IntrospectRef> =
                        Vec::with_capacity(all_keys.len());
                    let mut system_actors: Vec<crate::introspect::NodeRef> = Vec::new();
                    for id in all_keys {
                        if proc.get_instance(&id).is_some_and(|cell| cell.is_system()) {
                            system_actors.push(crate::introspect::NodeRef::Actor(id.clone()));
                        }
                        actors.push(hyperactor::introspect::IntrospectRef::Actor(id));
                    }
                    let mut attrs = hyperactor_config::Attrs::new();
                    attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
                    attrs.set(crate::introspect::PROC_NAME, label.to_string());
                    attrs.set(crate::introspect::NUM_ACTORS, actors.len());
                    attrs.set(crate::introspect::SYSTEM_CHILDREN, system_actors.clone());
                    // Serialization failure degrades to "{}" rather than
                    // failing the query.
                    let attrs_json =
                        serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

                    IntrospectResult {
                        identity: hyperactor::introspect::IntrospectRef::Proc(
                            proc.proc_id().clone(),
                        ),
                        attrs: attrs_json,
                        children: actors,
                        parent: Some(hyperactor::introspect::IntrospectRef::Actor(
                            self_id.clone(),
                        )),
                        as_of: std::time::SystemTime::now(),
                    }
                }
                None => {
                    // Unknown child: answer with a structured not-found
                    // error instead of failing the handler.
                    let mut error_attrs = hyperactor_config::Attrs::new();
                    error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
                    error_attrs.set(
                        hyperactor::introspect::ERROR_MESSAGE,
                        format!("child {} not found", child_ref),
                    );
                    let identity = match child_ref {
                        hyperactor::reference::Reference::Proc(id) => {
                            hyperactor::introspect::IntrospectRef::Proc(id.clone())
                        }
                        hyperactor::reference::Reference::Actor(id) => {
                            hyperactor::introspect::IntrospectRef::Actor(id.clone())
                        }
                        hyperactor::reference::Reference::Port(id) => {
                            hyperactor::introspect::IntrospectRef::Actor(id.actor_id().clone())
                        }
                    };
                    IntrospectResult {
                        identity,
                        attrs: serde_json::to_string(&error_attrs)
                            .unwrap_or_else(|_| "{}".to_string()),
                        children: Vec::new(),
                        parent: None,
                        as_of: std::time::SystemTime::now(),
                    }
                }
            }
        });

        // Port used by bridge tasks to notify us of proc status changes.
        self.proc_status_port = Some(this.port::<ProcStatusChanged>());

        Ok(())
    }
}
605
606impl fmt::Debug for HostAgent {
607    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
608        f.debug_struct("HostAgent")
609            .field("host", &"..")
610            .field("created", &self.created)
611            .finish()
612    }
613}
614
615#[async_trait]
616impl Handler<resource::CreateOrUpdate<ProcSpec>> for HostAgent {
617    #[tracing::instrument("HostAgent::CreateOrUpdate", level = "info", skip_all, fields(name=%create_or_update.name))]
618    async fn handle(
619        &mut self,
620        cx: &Context<Self>,
621        create_or_update: resource::CreateOrUpdate<ProcSpec>,
622    ) -> anyhow::Result<()> {
623        if self.created.contains_key(&create_or_update.name) {
624            // Already created: there is no update.
625            return Ok(());
626        }
627
628        let host = match self.host_mut() {
629            Some(h) => h,
630            None => {
631                tracing::warn!(
632                    name = %create_or_update.name,
633                    "ignoring CreateOrUpdate: HostAgent has already shut down"
634                );
635                return Ok(());
636            }
637        };
638        let created = match host {
639            HostAgentMode::Process { host, .. } => {
640                host.spawn(
641                    create_or_update.name.clone().to_string(),
642                    BootstrapProcConfig {
643                        create_rank: create_or_update.rank.unwrap(),
644                        client_config_override: create_or_update
645                            .spec
646                            .client_config_override
647                            .clone(),
648                        proc_bind: create_or_update.spec.proc_bind.clone(),
649                        bootstrap_command: create_or_update.spec.bootstrap_command.clone(),
650                    },
651                )
652                .await
653            }
654            HostAgentMode::Local(host) => {
655                host.spawn(create_or_update.name.clone().to_string(), ())
656                    .await
657            }
658        };
659
660        let rank = create_or_update.rank.unwrap();
661
662        if let Err(e) = &created {
663            tracing::error!("failed to spawn proc {}: {}", create_or_update.name, e);
664        }
665        let was_empty = self.created.is_empty();
666        self.created.insert(
667            create_or_update.name.clone(),
668            ProcCreationState {
669                rank,
670                host_mesh_name: create_or_update.spec.host_mesh_name.clone(),
671                created,
672            },
673        );
674
675        // Transition Detached → Attached on first proc creation.
676        if was_empty {
677            if let HostAgentState::Detached(_) = &self.state {
678                let host = match std::mem::replace(&mut self.state, HostAgentState::Shutdown) {
679                    HostAgentState::Detached(h) => h,
680                    _ => unreachable!(),
681                };
682                self.state = HostAgentState::Attached(host);
683            }
684        }
685
686        // If any WaitRankStatus messages arrived before this proc
687        // existed, their waiters were stashed with a sentinel rank.
688        // Now that we know the real rank, fix them up and start a
689        // watch bridge.
690        // Extract the proc_id before mutably borrowing pending_proc_waiters.
691        let proc_id = self
692            .created
693            .get(&create_or_update.name)
694            .and_then(|s| s.created.as_ref().ok())
695            .map(|(pid, _)| pid.clone());
696
697        if let Some(waiters) = self.pending_proc_waiters.get_mut(&create_or_update.name) {
698            for (_, waiter_rank, _) in waiters.iter_mut() {
699                if *waiter_rank == usize::MAX {
700                    *waiter_rank = rank;
701                }
702            }
703        }
704
705        // Start a bridge and send ourselves an initial check.
706        if self
707            .pending_proc_waiters
708            .contains_key(&create_or_update.name)
709        {
710            if let Some(proc_id) = &proc_id {
711                self.start_watch_bridge(&create_or_update.name, proc_id)
712                    .await;
713            }
714            self.notify_proc_status_changed(&create_or_update.name);
715        }
716
717        self.publish_introspect_properties(cx);
718        Ok(())
719    }
720}
721
722#[async_trait]
723impl Handler<resource::Stop> for HostAgent {
724    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
725        tracing::info!(
726            name = "HostMeshAgentStatus",
727            proc_name = %message.name,
728            reason = %message.reason,
729            "stopping proc"
730        );
731        let host = match self.host() {
732            Some(h) => h,
733            None => {
734                // Host already shut down; all procs are terminated.
735                tracing::debug!(
736                    proc_name = %message.name,
737                    "ignoring Stop: HostAgent has already shut down"
738                );
739                return Ok(());
740            }
741        };
742        let timeout = hyperactor_config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
743
744        if let Some(ProcCreationState {
745            created: Ok((proc_id, _)),
746            ..
747        }) = self.created.get(&message.name)
748        {
749            host.request_stop(cx, proc_id, timeout, &message.reason)
750                .await;
751        }
752
753        // Status may have changed to Stopping; notify pending waiters.
754        self.notify_proc_status_changed(&message.name);
755
756        self.publish_introspect_properties(cx);
757        Ok(())
758    }
759}
760
#[async_trait]
impl Handler<resource::GetRankStatus> for HostAgent {
    /// Report the (rank, status) of a single proc as a one-run
    /// [`StatusOverlay`]; unknown names yield an empty overlay.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        get_rank_status: resource::GetRankStatus,
    ) -> anyhow::Result<()> {
        use crate::StatusOverlay;
        use crate::resource::Status;

        let (rank, status) = match self.created.get(&get_rank_status.name) {
            Some(ProcCreationState {
                rank,
                created: Ok((proc_id, _mesh_agent)),
                ..
            }) => {
                // Live proc: get the raw status from the host, then
                // clamp by the agent's lifecycle floor — a proc can't
                // appear healthier than the host itself (see
                // min_proc_status).
                let raw_status = match self.host() {
                    Some(host) => host.proc_status(proc_id).await.0,
                    None => resource::Status::Unknown,
                };
                (*rank, raw_status.clamp_min(self.min_proc_status()))
            }
            Some(ProcCreationState {
                rank,
                created: Err(e),
                ..
            }) => (*rank, Status::Failed(e.to_string())),
            // usize::MAX is the "no rank" sentinel consumed below.
            None => (usize::MAX, Status::NotExist),
        };

        let overlay = if rank == usize::MAX {
            StatusOverlay::new()
        } else {
            StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
                .expect("valid single-run overlay")
        };
        let result = get_rank_status.reply.send(cx, overlay);
        // Ignore errors, because returning Err from here would cause the HostAgent
        // to be stopped, which would take down the entire host. This only means
        // some actor that requested the rank status failed to receive it.
        if let Err(e) = result {
            tracing::warn!(
                actor = %cx.self_id(),
                "failed to send GetRankStatus reply to {} due to error: {}",
                get_rank_status.reply.port_id().actor_id(),
                e
            );
        }
        Ok(())
    }
}
812
#[async_trait]
impl Handler<resource::WaitRankStatus> for HostAgent {
    /// Wait until the named proc reaches (or passes) `msg.min_status`,
    /// then reply with a single-run status overlay.
    ///
    /// Replies immediately when the threshold is already met or when
    /// creation failed; otherwise the reply port is stashed in
    /// `pending_proc_waiters` until a `ProcStatusChanged` notification
    /// flushes it (see `Handler<ProcStatusChanged>`).
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        msg: resource::WaitRankStatus,
    ) -> anyhow::Result<()> {
        use crate::StatusOverlay;
        use crate::resource::Status;

        match self.created.get(&msg.name) {
            Some(ProcCreationState {
                rank,
                created: Ok((proc_id, _)),
                ..
            }) => {
                let rank = *rank;
                let status = match self.host() {
                    Some(host) => host.proc_status(proc_id).await.0,
                    // Host gone (draining or shut down): every proc is
                    // considered stopped.
                    None => Status::Stopped,
                };

                // If already at or past the requested threshold, reply immediately.
                if status >= msg.min_status {
                    let overlay = StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
                        .expect("valid single-run overlay");
                    // Best-effort: the waiter may already be gone.
                    let _ = msg.reply.send(cx, overlay);
                    return Ok(());
                }

                // Stash the waiter and start a bridge if we don't have one yet.
                self.pending_proc_waiters
                    .entry(msg.name.clone())
                    .or_default()
                    .push((msg.min_status, rank, msg.reply));

                // Clone here so the borrow of `self.created` ends before
                // the `&mut self` call below.
                let proc_id = proc_id.clone();
                self.start_watch_bridge(&msg.name, &proc_id).await;
            }
            Some(ProcCreationState {
                rank,
                created: Err(e),
                ..
            }) => {
                // Creation failed — reply immediately with Failed status.
                let overlay = StatusOverlay::try_from_runs(vec![(
                    *rank..(*rank + 1),
                    Status::Failed(e.to_string()),
                )])
                .expect("valid single-run overlay");
                let _ = msg.reply.send(cx, overlay);
            }
            None => {
                // Proc doesn't exist yet. Stash the waiter with a
                // sentinel rank; CreateOrUpdate will fill it in and
                // start the watch bridge.
                self.pending_proc_waiters
                    .entry(msg.name.clone())
                    .or_default()
                    .push((msg.min_status, usize::MAX, msg.reply));
            }
        }

        Ok(())
    }
}
879
880#[async_trait]
881impl Handler<ProcStatusChanged> for HostAgent {
882    async fn handle(&mut self, cx: &Context<Self>, msg: ProcStatusChanged) -> anyhow::Result<()> {
883        use crate::StatusOverlay;
884        use crate::resource::Status;
885
886        let status = match self.created.get(&msg.name) {
887            Some(ProcCreationState {
888                created: Ok((proc_id, _)),
889                ..
890            }) => match self.host() {
891                Some(host) => host.proc_status(proc_id).await.0,
892                None => Status::Stopped,
893            },
894            Some(ProcCreationState {
895                created: Err(_), ..
896            }) => {
897                // Already replied with Failed when they were stashed.
898                return Ok(());
899            }
900            None => {
901                // Proc not created yet, nothing to flush.
902                return Ok(());
903            }
904        };
905
906        let Some(waiters) = self.pending_proc_waiters.get_mut(&msg.name) else {
907            return Ok(());
908        };
909
910        let remaining = std::mem::take(waiters);
911        for (min_status, rank, reply) in remaining {
912            if status >= min_status {
913                let overlay =
914                    StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status.clone())])
915                        .expect("valid single-run overlay");
916                let _ = reply.send(cx, overlay);
917            } else {
918                waiters.push((min_status, rank, reply));
919            }
920        }
921
922        if waiters.is_empty() {
923            self.pending_proc_waiters.remove(&msg.name);
924        }
925
926        Ok(())
927    }
928}
929
impl HostAgent {
    /// Send a `ProcStatusChanged` self-notification for the given proc name.
    ///
    /// No-op when the status port was never set up.
    fn notify_proc_status_changed(&self, name: &Name) {
        if let Some(port) = &self.proc_status_port {
            let client = Instance::<()>::self_client();
            // Best-effort: delivery failure is ignored.
            let _ = port.send(client, ProcStatusChanged { name: name.clone() });
        }
    }

    /// Start a bridge task that watches a proc's status channel and sends
    /// `ProcStatusChanged` to self on each change. At most one bridge per proc.
    ///
    /// NOTE(review): `watching` is marked before the port/host/watch
    /// checks below, so if any of them bail out no later call will retry
    /// the bridge for this proc — confirm this one-shot behavior is
    /// intended.
    async fn start_watch_bridge(&mut self, name: &Name, proc_id: &hyperactor_reference::ProcId) {
        if self.watching.contains(name) {
            return;
        }
        self.watching.insert(name.clone());

        let port = match &self.proc_status_port {
            Some(p) => p.clone(),
            None => return,
        };

        // The two host modes expose different status types on their
        // watch channels; adapt each into `resource::Status`.
        match self.host() {
            Some(HostAgentMode::Process { host, .. }) => {
                if let Some(rx) = host.manager().watch(proc_id).await {
                    start_proc_watch(port, rx, name.clone(), |s| s.clone().into());
                }
            }
            Some(HostAgentMode::Local(host)) => {
                if let Some(rx) = host.manager().watch(proc_id).await {
                    start_proc_watch(port, rx, name.clone(), |s| (*s).into());
                }
            }
            None => {}
        }
    }
}
967
968/// Spawn a bridge task that watches a proc's status channel and sends
969/// `ProcStatusChanged` to the actor via the given `PortHandle`.
970fn start_proc_watch<S>(
971    port: PortHandle<ProcStatusChanged>,
972    mut rx: tokio::sync::watch::Receiver<S>,
973    name: Name,
974    to_status: impl Fn(&S) -> resource::Status + Send + 'static,
975) where
976    S: Send + Sync + 'static,
977{
978    // TODO: replace Instance::self_client() with a proper mechanism
979    // for sending to port handles without an actor context.
980    let client = Instance::<()>::self_client();
981    tokio::spawn(async move {
982        loop {
983            match rx.changed().await {
984                Ok(()) => {
985                    let status = to_status(&*rx.borrow());
986                    let terminated = status.is_terminated();
987                    let _ = port.send(client, ProcStatusChanged { name: name.clone() });
988                    if terminated {
989                        return;
990                    }
991                }
992                Err(_) => {
993                    let _ = port.send(client, ProcStatusChanged { name: name.clone() });
994                    return;
995                }
996            }
997        }
998    });
999}
1000
/// Fully shut down this host: terminate all child procs, ack the
/// caller, then tear the host down (see `Handler<ShutdownHost>`).
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct ShutdownHost {
    /// Grace window: send SIGTERM and wait this long before
    /// escalating.
    pub timeout: std::time::Duration,
    /// Max number of children to terminate concurrently on this host.
    pub max_in_flight: usize,
    /// Ack that the agent finished shutdown work (best-effort).
    #[reply]
    pub ack: hyperactor::reference::PortRef<()>,
}
wirevalue::register_type!(ShutdownHost);
1013
/// Drain user procs on this host but keep the host, service proc,
/// and networking alive. Used during mesh stop/shutdown so that
/// forwarder flushes can still reach remote hosts.
///
/// If `host_mesh_name` is `Some`, only procs belonging to that mesh
/// are stopped (selective drain). If `None`, all procs are
/// terminated (full drain).
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct DrainHost {
    /// Grace window before proc termination is escalated.
    pub timeout: std::time::Duration,
    /// Max number of procs to terminate concurrently.
    pub max_in_flight: usize,
    /// When `Some`, restrict the drain to procs of this mesh.
    pub host_mesh_name: Option<crate::Name>,
    /// Ack sent once the drain has completed (best-effort).
    #[reply]
    pub ack: hyperactor_reference::PortRef<()>,
}
wirevalue::register_type!(DrainHost);
1030
#[async_trait]
impl Handler<DrainHost> for HostAgent {
    /// Drain procs while keeping the host, service proc, and networking
    /// alive. Selective (per-mesh) drains run inline and ack directly;
    /// a full drain hands the host off to a spawned `DrainWorker`, and
    /// the ack is deferred until `DrainComplete` arrives.
    async fn handle(&mut self, cx: &Context<Self>, msg: DrainHost) -> anyhow::Result<()> {
        if msg.host_mesh_name.is_some() {
            // Selective drain: stop only procs belonging to the named mesh.
            self.drain_by_mesh_name(cx, msg.timeout, msg.host_mesh_name.as_ref())
                .await;
            msg.ack.send(cx, ())?;
            return Ok(());
        }

        // Full drain: terminate all children.
        //
        // Optimistically move to Draining; the previous state is put
        // back below when there is no attached host to drain.
        let host = match std::mem::replace(&mut self.state, HostAgentState::Draining) {
            HostAgentState::Attached(h) => h,
            other @ (HostAgentState::Detached(_) | HostAgentState::Draining) => {
                // Nothing to drain — ack immediately.
                self.state = other;
                msg.ack.send(cx, ())?;
                return Ok(());
            }
            HostAgentState::Shutdown => {
                self.state = HostAgentState::Shutdown;
                msg.ack.send(cx, ())?;
                return Ok(());
            }
        };

        // Do NOT clear `self.created` here: the DrainWorker
        // terminates procs asynchronously, and concurrent GetState /
        // GetRankStatus queries must still find the entries. With the
        // host in Draining state (`self.host()` returns None), those
        // handlers already report Status::Stopped for every known
        // proc, which is the correct answer while draining is
        // in progress.

        // The worker reports back on this port (see Handler<DrainComplete>),
        // carrying the detached host and the deferred ack.
        let done_port = cx.port::<DrainComplete>();

        cx.spawn_with_name(
            "drain_worker",
            DrainWorker {
                host: Some(host),
                timeout: msg.timeout,
                max_in_flight: msg.max_in_flight,
                ack: Some(msg.ack),
                done_notify: done_port,
            },
        )?;

        Ok(())
    }
}
1082
1083#[async_trait]
1084impl Handler<DrainComplete> for HostAgent {
1085    async fn handle(&mut self, cx: &Context<Self>, msg: DrainComplete) -> anyhow::Result<()> {
1086        self.state = HostAgentState::Detached(msg.host);
1087        self.created.clear();
1088        msg.ack.send(cx, ())?;
1089        Ok(())
1090    }
1091}
1092
#[async_trait]
impl Handler<ShutdownHost> for HostAgent {
    /// Fully shut down this host: drain all child procs, ack the
    /// caller, then drop the host and hand the mailbox handle to the
    /// bootstrap loop for final draining.
    async fn handle(&mut self, cx: &Context<Self>, msg: ShutdownHost) -> anyhow::Result<()> {
        // Terminate children BEFORE acking, so the caller's networking
        // stays alive while children flush their forwarders during
        // teardown. If we ack first, the caller proceeds to tear down
        // the host proc's networking while children are still running,
        // causing their forwarder flushes to hang until
        // MESSAGE_DELIVERY_TIMEOUT expires.
        if !self.created.is_empty() {
            self.drain(cx, msg.timeout, msg.max_in_flight).await;
        }

        // Ack after children are terminated so the caller does not
        // tear down the host's networking prematurely.
        msg.ack.send(cx, ())?;

        // Drop the host and signal the bootstrap loop to drain the
        // mailbox and exit.
        //
        // Only process-mode hosts with a registered shutdown channel
        // need the explicit signal; every other state simply becomes
        // Shutdown.
        match std::mem::replace(&mut self.state, HostAgentState::Shutdown) {
            HostAgentState::Detached(HostAgentMode::Process {
                shutdown_tx: Some(tx),
                ..
            })
            | HostAgentState::Attached(HostAgentMode::Process {
                shutdown_tx: Some(tx),
                ..
            }) => {
                tracing::info!(
                    proc_id = %cx.self_id().proc_id(),
                    actor_id = %cx.self_id(),
                    "host is shut down, sending mailbox handle to bootstrap for draining"
                );
                // Best-effort: the receiving side may already be gone.
                if let Some(handle) = self.mailbox_handle.take() {
                    let _ = tx.send(handle);
                }
            }
            _ => {}
        }

        Ok(())
    }
}
1136
/// Snapshot of a managed proc, returned via `resource::GetState<ProcState>`.
#[derive(Debug, Clone, PartialEq, Eq, Named, Serialize, Deserialize)]
pub struct ProcState {
    /// The proc's id on this host.
    pub proc_id: hyperactor_reference::ProcId,
    /// Rank assigned when the proc was created.
    pub create_rank: usize,
    /// The agent actor running inside the proc.
    pub mesh_agent: hyperactor_reference::ActorRef<ProcAgent>,
    /// Command used to bootstrap procs, when the host reports one.
    pub bootstrap_command: Option<BootstrapCommand>,
    /// Low-level bootstrap status, when available.
    pub proc_status: Option<bootstrap::ProcStatus>,
}
wirevalue::register_type!(ProcState);
1146
1147#[async_trait]
1148impl Handler<resource::GetState<ProcState>> for HostAgent {
1149    async fn handle(
1150        &mut self,
1151        cx: &Context<Self>,
1152        get_state: resource::GetState<ProcState>,
1153    ) -> anyhow::Result<()> {
1154        let state = match self.created.get(&get_state.name) {
1155            Some(ProcCreationState {
1156                rank,
1157                created: Ok((proc_id, mesh_agent)),
1158                ..
1159            }) => {
1160                let (raw_status, proc_status, bootstrap_command) = match self.host() {
1161                    Some(host) => {
1162                        let (status, proc_status) = host.proc_status(proc_id).await;
1163                        (status, proc_status, host.bootstrap_command())
1164                    }
1165                    None => (resource::Status::Unknown, None, None),
1166                };
1167                let status = raw_status.clamp_min(self.min_proc_status());
1168                resource::State {
1169                    name: get_state.name.clone(),
1170                    status,
1171                    state: Some(ProcState {
1172                        proc_id: proc_id.clone(),
1173                        create_rank: *rank,
1174                        mesh_agent: mesh_agent.clone(),
1175                        bootstrap_command,
1176                        proc_status,
1177                    }),
1178                    generation: 0,
1179                    timestamp: std::time::SystemTime::now(),
1180                }
1181            }
1182            Some(ProcCreationState {
1183                created: Err(e), ..
1184            }) => resource::State {
1185                name: get_state.name.clone(),
1186                status: resource::Status::Failed(e.to_string()),
1187                state: None,
1188                generation: 0,
1189                timestamp: std::time::SystemTime::now(),
1190            },
1191            None => resource::State {
1192                name: get_state.name.clone(),
1193                status: resource::Status::NotExist,
1194                state: None,
1195                generation: 0,
1196                timestamp: std::time::SystemTime::now(),
1197            },
1198        };
1199
1200        let result = get_state.reply.send(cx, state);
1201        // Ignore errors, because returning Err from here would cause the HostAgent
1202        // to be stopped, which would take down the entire host. This only means
1203        // some actor that requested the state of a proc failed to receive it.
1204        if let Err(e) = result {
1205            tracing::warn!(
1206                actor = %cx.self_id(),
1207                "failed to send GetState reply to {} due to error: {}",
1208                get_state.reply.port_id().actor_id(),
1209                e
1210            );
1211        }
1212        Ok(())
1213    }
1214}
1215
1216#[async_trait]
1217impl Handler<resource::List> for HostAgent {
1218    async fn handle(&mut self, cx: &Context<Self>, list: resource::List) -> anyhow::Result<()> {
1219        list.reply
1220            .send(cx, self.created.keys().cloned().collect())?;
1221        Ok(())
1222    }
1223}
1224
/// Push client configuration overrides to this host agent's process.
///
/// The attrs are installed as `Source::ClientOverride` (lowest explicit
/// priority), so the host's own env vars and file config take precedence.
/// This message is idempotent — sending the same attrs twice replaces
/// the layer wholesale.
///
/// Request-reply: the reply acts as a barrier confirming the config
/// is installed. The caller should await with a timeout and treat
/// timeout as best-effort (log warning, continue).
#[derive(Debug, Named, Handler, RefClient, HandleClient, Serialize, Deserialize)]
pub struct SetClientConfig {
    /// Complete snapshot of the client's config attrs to install.
    pub attrs: Attrs,
    /// Barrier reply confirming the config layer is installed.
    #[reply]
    pub done: hyperactor_reference::PortRef<()>,
}
wirevalue::register_type!(SetClientConfig);
1242
1243#[async_trait]
1244impl Handler<SetClientConfig> for HostAgent {
1245    async fn handle(&mut self, cx: &Context<Self>, msg: SetClientConfig) -> anyhow::Result<()> {
1246        // Use `set` (not `create_or_merge`) because `push_config` always
1247        // sends a complete `propagatable_attrs()` snapshot. Replacing the
1248        // layer wholesale is intentional and idempotent.
1249        hyperactor_config::global::set(
1250            hyperactor_config::global::Source::ClientOverride,
1251            msg.attrs,
1252        );
1253        tracing::debug!("installed client config override on host agent");
1254        msg.done.send(cx, ())?;
1255        Ok(())
1256    }
1257}
1258
/// Boot the ProcAgent on the host's local proc (LP-1).
///
/// The local proc starts empty; this message activates it by spawning
/// a `ProcAgent` (once, via `OnceLock`). Called by
/// `monarch_hyperactor::bootstrap_host` when setting up the Python
/// `this_proc()` singleton.
///
/// See also: `hyperactor::host::LOCAL_PROC_NAME`.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetLocalProc {
    /// Receives a handle to the (lazily booted) local `ProcAgent`.
    #[reply]
    pub proc_mesh_agent: PortHandle<ActorHandle<ProcAgent>>,
}
1272
1273#[async_trait]
1274impl Handler<GetLocalProc> for HostAgent {
1275    async fn handle(
1276        &mut self,
1277        cx: &Context<Self>,
1278        GetLocalProc { proc_mesh_agent }: GetLocalProc,
1279    ) -> anyhow::Result<()> {
1280        let host = self
1281            .host()
1282            .ok_or_else(|| anyhow::anyhow!("HostAgent has already shut down"))?;
1283        let agent = self
1284            .local_mesh_agent
1285            .get_or_init(|| ProcAgent::boot_v1(host.local_proc().clone(), None));
1286
1287        match agent {
1288            Err(e) => anyhow::bail!("error booting local proc: {}", e),
1289            Ok(agent) => proc_mesh_agent.send(cx, agent.clone())?,
1290        };
1291
1292        Ok(())
1293    }
1294}
1295
1296#[async_trait]
1297impl Handler<PySpyDump> for HostAgent {
1298    async fn handle(
1299        &mut self,
1300        cx: &Context<Self>,
1301        message: PySpyDump,
1302    ) -> Result<(), anyhow::Error> {
1303        PySpyWorker::spawn_and_forward(cx, message.opts, message.result)
1304    }
1305}
1306
1307#[async_trait]
1308impl Handler<ConfigDump> for HostAgent {
1309    async fn handle(
1310        &mut self,
1311        cx: &Context<Self>,
1312        message: ConfigDump,
1313    ) -> Result<(), anyhow::Error> {
1314        let entries = hyperactor_config::global::config_entries();
1315        // Reply is best-effort: the caller may have timed out and dropped
1316        // the once-port.  That must not crash this actor.
1317        if let Err(e) = message.result.send(cx, ConfigDumpResult { entries }) {
1318            tracing::warn!("HostAgent: ConfigDump reply undeliverable (caller timed out): {e}",);
1319        }
1320        Ok(())
1321    }
1322}
1323
/// A trampoline actor that spawns a [`Host`], and sends a reference to the
/// corresponding [`HostAgent`] to the provided reply port.
///
/// This is used to bootstrap host meshes from proc meshes.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers=[GetHostMeshAgent]
)]
pub(crate) struct HostMeshAgentProcMeshTrampoline {
    // The agent spawned on the new host's system proc.
    host_mesh_agent: ActorHandle<HostAgent>,
    // Where to publish the agent's reference during `init`.
    reply_port: hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
}
1337
#[async_trait]
impl Actor for HostMeshAgentProcMeshTrampoline {
    /// On startup, immediately publish the `HostAgent` reference to the
    /// caller-provided reply port.
    async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
        self.reply_port.send(this, self.host_mesh_agent.bind())?;
        Ok(())
    }
}
1345
1346#[async_trait]
1347impl hyperactor::RemoteSpawn for HostMeshAgentProcMeshTrampoline {
1348    type Params = (
1349        ChannelTransport,
1350        hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
1351        Option<BootstrapCommand>,
1352        bool, /* local? */
1353    );
1354
1355    async fn new(
1356        (transport, reply_port, command, local): Self::Params,
1357        _environment: Flattrs,
1358    ) -> anyhow::Result<Self> {
1359        let host = if local {
1360            let spawn: ProcManagerSpawnFn =
1361                Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
1362            let manager = LocalProcManager::new(spawn);
1363            let host = Host::new(manager, transport.any()).await?;
1364            HostAgentMode::Local(host)
1365        } else {
1366            let command = match command {
1367                Some(command) => command,
1368                None => BootstrapCommand::current()?,
1369            };
1370            tracing::info!("booting host with proc command {:?}", command);
1371            let manager = BootstrapProcManager::new(command).unwrap();
1372            let host = Host::new(manager, transport.any()).await?;
1373            HostAgentMode::Process {
1374                host,
1375                shutdown_tx: None,
1376            }
1377        };
1378
1379        let system_proc = host.system_proc().clone();
1380        let host_mesh_agent =
1381            system_proc.spawn(HOST_MESH_AGENT_ACTOR_NAME, HostAgent::new(host))?;
1382
1383        Ok(Self {
1384            host_mesh_agent,
1385            reply_port,
1386        })
1387    }
1388}
1389
/// Request the `HostAgent` reference held by the trampoline.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient)]
pub struct GetHostMeshAgent {
    /// Receives the agent's reference.
    #[reply]
    pub host_mesh_agent: hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
}
wirevalue::register_type!(GetHostMeshAgent);
1396
1397#[async_trait]
1398impl Handler<GetHostMeshAgent> for HostMeshAgentProcMeshTrampoline {
1399    async fn handle(
1400        &mut self,
1401        cx: &Context<Self>,
1402        get_host_mesh_agent: GetHostMeshAgent,
1403    ) -> anyhow::Result<()> {
1404        get_host_mesh_agent
1405            .host_mesh_agent
1406            .send(cx, self.host_mesh_agent.bind())?;
1407        Ok(())
1408    }
1409}
1410
1411#[cfg(test)]
1412mod tests {
1413    use std::assert_matches::assert_matches;
1414
1415    use hyperactor::Proc;
1416    use hyperactor::channel::ChannelTransport;
1417
1418    use super::*;
1419    use crate::bootstrap::ProcStatus;
1420    use crate::resource::CreateOrUpdateClient;
1421    use crate::resource::GetStateClient;
1422    use crate::resource::StopClient;
1423    use crate::resource::WaitRankStatusClient;
1424
1425    #[tokio::test]
1426    #[cfg(fbcode_build)]
1427    async fn test_basic() {
1428        let host = Host::new(
1429            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1430            ChannelTransport::Unix.any(),
1431        )
1432        .await
1433        .unwrap();
1434
1435        let host_addr = host.addr().clone();
1436        let system_proc = host.system_proc().clone();
1437        let host_agent = system_proc
1438            .spawn(
1439                HOST_MESH_AGENT_ACTOR_NAME,
1440                HostAgent::new(HostAgentMode::Process {
1441                    host,
1442                    shutdown_tx: None,
1443                }),
1444            )
1445            .unwrap();
1446
1447        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1448        let (client, _client_handle) = client_proc.instance("client").unwrap();
1449
1450        let name = Name::new("proc1").unwrap();
1451
1452        // First, create the proc, then query its state:
1453
1454        host_agent
1455            .create_or_update(
1456                &client,
1457                name.clone(),
1458                resource::Rank::new(0),
1459                ProcSpec::default(),
1460            )
1461            .await
1462            .unwrap();
1463        assert_matches!(
1464            host_agent.get_state(&client, name.clone()).await.unwrap(),
1465            resource::State {
1466                name: resource_name,
1467                status: resource::Status::Running,
1468                state: Some(ProcState {
1469                    // The proc itself should be direct addressed, with its name directly.
1470                    proc_id,
1471                    // The mesh agent should run in the same proc, under the name
1472                    // "proc_agent".
1473                    mesh_agent,
1474                    bootstrap_command,
1475                    proc_status: Some(ProcStatus::Ready { started_at: _, addr: _, agent: proc_status_mesh_agent}),
1476                    ..
1477                }),
1478                ..
1479            } if name == resource_name
1480              && proc_id == hyperactor_reference::ProcId::with_name(host_addr.clone(), name.to_string())
1481              && mesh_agent == hyperactor_reference::ActorRef::attest(hyperactor_reference::ProcId::with_name(host_addr.clone(), name.to_string()).actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0)) && bootstrap_command == Some(BootstrapCommand::test())
1482              && mesh_agent == proc_status_mesh_agent
1483        );
1484    }
1485
1486    /// WaitRankStatus on a running proc replies immediately with Running.
1487    #[tokio::test]
1488    #[cfg(fbcode_build)]
1489    async fn test_wait_rank_status_already_running() {
1490        let host = Host::new(
1491            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1492            ChannelTransport::Unix.any(),
1493        )
1494        .await
1495        .unwrap();
1496
1497        let system_proc = host.system_proc().clone();
1498        let host_agent = system_proc
1499            .spawn(
1500                HOST_MESH_AGENT_ACTOR_NAME,
1501                HostAgent::new(HostAgentMode::Process {
1502                    host,
1503                    shutdown_tx: None,
1504                }),
1505            )
1506            .unwrap();
1507
1508        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1509        let (client, _client_handle) = client_proc.instance("client").unwrap();
1510
1511        let name = Name::new("proc1").unwrap();
1512        host_agent
1513            .create_or_update(
1514                &client,
1515                name.clone(),
1516                resource::Rank::new(0),
1517                ProcSpec::default(),
1518            )
1519            .await
1520            .unwrap();
1521
1522        // Proc is Running; wait for Running should reply immediately.
1523        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1524        host_agent
1525            .wait_rank_status(&client, name, resource::Status::Running, port.bind())
1526            .await
1527            .unwrap();
1528
1529        let overlay = tokio::time::timeout(Duration::from_secs(5), rx.recv())
1530            .await
1531            .expect("reply timed out")
1532            .expect("reply channel closed");
1533        assert!(!overlay.is_empty(), "expected non-empty overlay");
1534    }
1535
1536    /// WaitRankStatus for Stopped, then stop the proc — reply should
1537    /// arrive only after the proc actually stops.
1538    #[tokio::test]
1539    #[cfg(fbcode_build)]
1540    async fn test_wait_rank_status_stop() {
1541        let host = Host::new(
1542            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1543            ChannelTransport::Unix.any(),
1544        )
1545        .await
1546        .unwrap();
1547
1548        let system_proc = host.system_proc().clone();
1549        let host_agent = system_proc
1550            .spawn(
1551                HOST_MESH_AGENT_ACTOR_NAME,
1552                HostAgent::new(HostAgentMode::Process {
1553                    host,
1554                    shutdown_tx: None,
1555                }),
1556            )
1557            .unwrap();
1558
1559        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1560        let (client, _client_handle) = client_proc.instance("client").unwrap();
1561
1562        let name = Name::new("proc1").unwrap();
1563        host_agent
1564            .create_or_update(
1565                &client,
1566                name.clone(),
1567                resource::Rank::new(0),
1568                ProcSpec::default(),
1569            )
1570            .await
1571            .unwrap();
1572
1573        // Wait for Stopped — should not reply yet.
1574        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1575        host_agent
1576            .wait_rank_status(
1577                &client,
1578                name.clone(),
1579                resource::Status::Stopped,
1580                port.bind(),
1581            )
1582            .await
1583            .unwrap();
1584
1585        // Stop the proc.
1586        host_agent
1587            .stop(&client, name, "test".to_string())
1588            .await
1589            .unwrap();
1590
1591        // Now the reply should arrive.
1592        let overlay = tokio::time::timeout(Duration::from_secs(30), rx.recv())
1593            .await
1594            .expect("reply timed out — proc did not reach Stopped")
1595            .expect("reply channel closed");
1596        assert!(!overlay.is_empty(), "expected non-empty overlay");
1597    }
1598
1599    /// WaitRankStatus sent before the proc is created — the waiter is
1600    /// stashed and replied to once CreateOrUpdate runs.
1601    #[tokio::test]
1602    #[cfg(fbcode_build)]
1603    async fn test_wait_rank_status_before_proc_exists() {
1604        let host = Host::new(
1605            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1606            ChannelTransport::Unix.any(),
1607        )
1608        .await
1609        .unwrap();
1610
1611        let system_proc = host.system_proc().clone();
1612        let host_agent = system_proc
1613            .spawn(
1614                HOST_MESH_AGENT_ACTOR_NAME,
1615                HostAgent::new(HostAgentMode::Process {
1616                    host,
1617                    shutdown_tx: None,
1618                }),
1619            )
1620            .unwrap();
1621
1622        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1623        let (client, _client_handle) = client_proc.instance("client").unwrap();
1624
1625        let name = Name::new("proc1").unwrap();
1626
1627        // Wait for Running on a proc that doesn't exist yet.
1628        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1629        host_agent
1630            .wait_rank_status(
1631                &client,
1632                name.clone(),
1633                resource::Status::Running,
1634                port.bind(),
1635            )
1636            .await
1637            .unwrap();
1638
1639        // Now create the proc — the stashed waiter should get its
1640        // sentinel rank fixed and be flushed once the proc is Running.
1641        host_agent
1642            .create_or_update(&client, name, resource::Rank::new(0), ProcSpec::default())
1643            .await
1644            .unwrap();
1645
1646        let overlay = tokio::time::timeout(Duration::from_secs(10), rx.recv())
1647            .await
1648            .expect("reply timed out — waiter was not flushed after CreateOrUpdate")
1649            .expect("reply channel closed");
1650        assert!(!overlay.is_empty(), "expected non-empty overlay");
1651    }
1652
1653    /// DrainHost with a host_mesh_name filter only stops procs
1654    /// belonging to that mesh; procs from other meshes are unaffected.
1655    #[tokio::test]
1656    #[cfg(fbcode_build)]
1657    async fn test_drain_scoped_to_host_mesh_name() {
1658        let host = Host::new(
1659            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1660            ChannelTransport::Unix.any(),
1661        )
1662        .await
1663        .unwrap();
1664
1665        let system_proc = host.system_proc().clone();
1666        let host_agent = system_proc
1667            .spawn(
1668                HOST_MESH_AGENT_ACTOR_NAME,
1669                HostAgent::new(HostAgentMode::Process {
1670                    host,
1671                    shutdown_tx: None,
1672                }),
1673            )
1674            .unwrap();
1675
1676        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1677        let (client, _client_handle) = client_proc.instance("client").unwrap();
1678
1679        let mesh_a = crate::Name::new("mesh_a").unwrap();
1680        let mesh_b = crate::Name::new("mesh_b").unwrap();
1681        let proc_a = crate::Name::new("proc_a").unwrap();
1682        let proc_b = crate::Name::new("proc_b").unwrap();
1683
1684        // Create proc_a belonging to mesh_a.
1685        let mut spec_a = ProcSpec::default();
1686        spec_a.host_mesh_name = Some(mesh_a.clone());
1687        host_agent
1688            .create_or_update(&client, proc_a.clone(), resource::Rank::new(0), spec_a)
1689            .await
1690            .unwrap();
1691
1692        // Create proc_b belonging to mesh_b.
1693        let mut spec_b = ProcSpec::default();
1694        spec_b.host_mesh_name = Some(mesh_b.clone());
1695        host_agent
1696            .create_or_update(&client, proc_b.clone(), resource::Rank::new(1), spec_b)
1697            .await
1698            .unwrap();
1699
1700        // Both should be Running.
1701        assert_matches!(
1702            host_agent.get_state(&client, proc_a.clone()).await.unwrap(),
1703            resource::State {
1704                status: resource::Status::Running,
1705                ..
1706            }
1707        );
1708        assert_matches!(
1709            host_agent.get_state(&client, proc_b.clone()).await.unwrap(),
1710            resource::State {
1711                status: resource::Status::Running,
1712                ..
1713            }
1714        );
1715
1716        // Drain only mesh_a.
1717        host_agent
1718            .drain_host(&client, Duration::from_secs(5), 16, Some(mesh_a.clone()))
1719            .await
1720            .unwrap();
1721
1722        // proc_a should be gone (removed from created).
1723        assert_matches!(
1724            host_agent.get_state(&client, proc_a.clone()).await.unwrap(),
1725            resource::State {
1726                status: resource::Status::NotExist,
1727                ..
1728            }
1729        );
1730
1731        // proc_b should still be Running.
1732        assert_matches!(
1733            host_agent.get_state(&client, proc_b.clone()).await.unwrap(),
1734            resource::State {
1735                status: resource::Status::Running,
1736                ..
1737            }
1738        );
1739    }
1740
1741    /// DrainHost with host_mesh_name=None drains all procs regardless
1742    /// of their mesh affiliation (backwards compatibility).
1743    #[tokio::test]
1744    #[cfg(fbcode_build)]
1745    async fn test_drain_none_drains_all() {
1746        let host = Host::new(
1747            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1748            ChannelTransport::Unix.any(),
1749        )
1750        .await
1751        .unwrap();
1752
1753        let system_proc = host.system_proc().clone();
1754        let host_agent = system_proc
1755            .spawn(
1756                HOST_MESH_AGENT_ACTOR_NAME,
1757                HostAgent::new(HostAgentMode::Process {
1758                    host,
1759                    shutdown_tx: None,
1760                }),
1761            )
1762            .unwrap();
1763
1764        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1765        let (client, _client_handle) = client_proc.instance("client").unwrap();
1766
1767        let mesh_a = crate::Name::new("mesh_a").unwrap();
1768        let mesh_b = crate::Name::new("mesh_b").unwrap();
1769        let proc_a = crate::Name::new("proc_a").unwrap();
1770        let proc_b = crate::Name::new("proc_b").unwrap();
1771
1772        let mut spec_a = ProcSpec::default();
1773        spec_a.host_mesh_name = Some(mesh_a);
1774        host_agent
1775            .create_or_update(&client, proc_a.clone(), resource::Rank::new(0), spec_a)
1776            .await
1777            .unwrap();
1778
1779        let mut spec_b = ProcSpec::default();
1780        spec_b.host_mesh_name = Some(mesh_b);
1781        host_agent
1782            .create_or_update(&client, proc_b.clone(), resource::Rank::new(1), spec_b)
1783            .await
1784            .unwrap();
1785
1786        // Drain all (no filter).
1787        host_agent
1788            .drain_host(&client, Duration::from_secs(5), 16, None)
1789            .await
1790            .unwrap();
1791
1792        // Both should be gone.
1793        assert_matches!(
1794            host_agent.get_state(&client, proc_a).await.unwrap(),
1795            resource::State {
1796                status: resource::Status::NotExist,
1797                ..
1798            }
1799        );
1800        assert_matches!(
1801            host_agent.get_state(&client, proc_b).await.unwrap(),
1802            resource::State {
1803                status: resource::Status::NotExist,
1804                ..
1805            }
1806        );
1807    }
1808}