Skip to main content

hyperactor_mesh/host_mesh/
host_agent.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! The mesh agent actor that manages a host.
10
11// EnumAsInner generates code that triggers a false positive
12// unused_assignments lint on struct variant fields. #[allow] on the
13// enum itself doesn't propagate into derive-macro-generated code, so
14// the suppression must be at module scope.
15#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fmt;
20use std::pin::Pin;
21use std::sync::OnceLock;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use hyperactor::Actor;
26use hyperactor::ActorHandle;
27use hyperactor::ActorRef;
28use hyperactor::Addr;
29use hyperactor::Context;
30use hyperactor::Endpoint as _;
31use hyperactor::HandleClient;
32use hyperactor::Handler;
33use hyperactor::Instance;
34use hyperactor::PortHandle;
35use hyperactor::PortRef;
36use hyperactor::Proc;
37use hyperactor::ProcAddr;
38use hyperactor::RefClient;
39use hyperactor::RemoteEndpoint as _;
40use hyperactor::context;
41use hyperactor::mailbox::MailboxServerHandle;
42use hyperactor_config::Flattrs;
43use hyperactor_config::attrs::Attrs;
44use serde::Deserialize;
45use serde::Serialize;
46use tokio::time::Duration;
47use typeuri::Named;
48
49use crate::bootstrap;
50use crate::bootstrap::BootstrapCommand;
51use crate::bootstrap::BootstrapProcConfig;
52use crate::bootstrap::BootstrapProcManager;
53use crate::config_dump::ConfigDump;
54use crate::config_dump::ConfigDumpResult;
55use crate::host::Host;
56use crate::host::HostError;
57use crate::host::LOCAL_PROC_NAME;
58use crate::host::LocalProcManager;
59use crate::host::SERVICE_PROC_NAME;
60use crate::host::SingleTerminate;
61use crate::mesh_id::HostMeshId;
62use crate::mesh_id::ResourceId;
63use crate::proc_agent::ProcAgent;
64use crate::pyspy::PySpyDump;
65use crate::pyspy::PySpyProfile;
66use crate::pyspy::PySpyProfileWorker;
67use crate::pyspy::PySpyWorker;
68use crate::resource;
69use crate::resource::ProcSpec;
70
/// Boxed, `Send` future produced by a [`ProcManagerSpawnFn`]. It resolves to
/// an `ActorHandle<ProcAgent>` on success or an `anyhow` error.
pub(crate) type ProcManagerSpawnFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<ActorHandle<ProcAgent>>> + Send>>;
/// Boxed spawn callback: given a [`Proc`], returns a
/// [`ProcManagerSpawnFuture`]. This is the spawn-function type plugged into
/// the `LocalProcManager` held by [`HostAgentMode::Local`].
pub(crate) type ProcManagerSpawnFn = Box<dyn Fn(Proc) -> ProcManagerSpawnFuture + Send + Sync>;
74
/// Represents the different ways a [`Host`] can be managed by an agent.
///
/// A host can either be:
/// - [`HostAgentMode::Process`] — a host running as an external OS process,
///   managed by [`BootstrapProcManager`].
/// - [`HostAgentMode::Local`] — a host running in-process, managed by
///   [`LocalProcManager`] with a custom spawn function.
///
/// This abstraction lets the same `HostAgent` work across both
/// out-of-process and in-process execution modes.
#[derive(EnumAsInner)]
pub enum HostAgentMode {
    /// Host whose procs are managed by a [`BootstrapProcManager`].
    Process {
        /// The managed host.
        host: Host<BootstrapProcManager>,
        /// If set, the ShutdownHost handler sends the frontend mailbox server
        /// handle back to the bootstrap loop via this channel once shutdown is
        /// complete, so the caller can drain it and exit.
        shutdown_tx: Option<tokio::sync::oneshot::Sender<MailboxServerHandle>>,
    },
    /// Host whose procs are managed by a [`LocalProcManager`] driven by a
    /// [`ProcManagerSpawnFn`].
    Local(Host<LocalProcManager<ProcManagerSpawnFn>>),
}
96
97impl HostAgentMode {
98    pub(crate) fn addr(&self) -> &hyperactor::channel::ChannelAddr {
99        #[allow(clippy::match_same_arms)]
100        match self {
101            HostAgentMode::Process { host, .. } => host.addr(),
102            HostAgentMode::Local(host) => host.addr(),
103        }
104    }
105
106    pub(crate) fn system_proc(&self) -> &Proc {
107        #[allow(clippy::match_same_arms)]
108        match self {
109            HostAgentMode::Process { host, .. } => host.system_proc(),
110            HostAgentMode::Local(host) => host.system_proc(),
111        }
112    }
113
114    pub(crate) fn local_proc(&self) -> &Proc {
115        #[allow(clippy::match_same_arms)]
116        match self {
117            HostAgentMode::Process { host, .. } => host.local_proc(),
118            HostAgentMode::Local(host) => host.local_proc(),
119        }
120    }
121
122    /// Non-blocking stop: send the stop signal and spawn a background
123    /// task for cleanup. Returns immediately without blocking the
124    /// actor.
125    async fn request_stop(
126        &self,
127        cx: &impl context::Actor,
128        proc: &ProcAddr,
129        timeout: Duration,
130        reason: &str,
131    ) {
132        match self {
133            HostAgentMode::Process { host, .. } => {
134                host.manager().request_stop(cx, proc, timeout, reason).await;
135            }
136            HostAgentMode::Local(host) => {
137                host.manager().request_stop(proc, timeout, reason).await;
138            }
139        }
140    }
141
142    /// Query a proc's lifecycle state, returning both the coarse
143    /// `resource::Status` used by the resource protocol and the
144    /// detailed `bootstrap::ProcStatus` (when available) for callers
145    /// that need process-level detail such as PIDs or exit codes.
146    async fn proc_status(
147        &self,
148        proc_id: &ProcAddr,
149    ) -> (resource::Status, Option<bootstrap::ProcStatus>) {
150        match self {
151            HostAgentMode::Process { host, .. } => match host.manager().status(proc_id).await {
152                Some(proc_status) => (proc_status.clone().into(), Some(proc_status)),
153                None => (resource::Status::Unknown, None),
154            },
155            HostAgentMode::Local(host) => {
156                let status = match host.manager().local_proc_status(proc_id).await {
157                    Some(crate::host::LocalProcStatus::Stopping) => resource::Status::Stopping,
158                    Some(crate::host::LocalProcStatus::Stopped) => resource::Status::Stopped,
159                    None => resource::Status::Running,
160                };
161                (status, None)
162            }
163        }
164    }
165
166    /// The bootstrap command used by the process manager, if any.
167    fn bootstrap_command(&self) -> Option<BootstrapCommand> {
168        match self {
169            HostAgentMode::Process { host, .. } => Some(host.manager().command().clone()),
170            HostAgentMode::Local(_) => None,
171        }
172    }
173}
174
/// Per-proc record the host agent keeps for every proc it was asked to
/// create, including procs whose spawn failed.
#[derive(Debug)]
pub(crate) struct ProcCreationState {
    /// Rank carried by the creation request.
    pub(crate) rank: usize,
    /// Host mesh named in the proc's spec, if any. Selective drains match
    /// against this value.
    pub(crate) host_mesh_id: Option<HostMeshId>,
    /// Outcome of the spawn: the proc's address and a reference to its
    /// `ProcAgent` on success, or the host error on failure.
    pub(crate) created: Result<(ProcAddr, ActorRef<ProcAgent>), HostError>,
    /// "Owner is alive" deadline communicated by the controller via
    /// `KeepaliveGetState`. The host's `SelfCheck` reaper compares against this
    /// and tears down procs whose owner has stopped extending the keepalive.
    pub(crate) expiry_time: Option<std::time::SystemTime>,
}
185
/// Actor name used when spawning the host mesh agent ([`HostAgent`]) on the
/// system proc.
pub const HOST_MESH_AGENT_ACTOR_NAME: &str = "host_agent";
188
/// Lifecycle state of the host managed by [`HostAgent`].
///
/// Only `Detached` and `Attached` carry the [`HostAgentMode`]; in the other
/// states `HostAgent::host()` and `HostAgent::host_mut()` return `None`.
enum HostAgentState {
    /// Waiting for a client to attach. The host is idle and ready
    /// to accept new proc spawn requests.
    Detached(HostAgentMode),
    /// Actively running procs for an attached client. Entered from
    /// `Detached` when a proc is recorded while no procs were tracked.
    Attached(HostAgentMode),
    /// Procs are being drained by a DrainWorker. The host has been
    /// temporarily moved to the worker. The host agent remains
    /// responsive; min_proc_status() returns Stopping.
    Draining,
    /// Host fully shut down. min_proc_status() returns Stopped.
    Shutdown,
}
203
// A mesh agent is responsible for managing a host in a `HostMesh`,
// through the resource behaviors defined in `crate::resource`.
// NOTE(review): the sentence above reads as documentation for `HostAgent`,
// not for the message type below. It is kept as a plain comment so it no
// longer renders as part of `ProcStatusChanged`'s rustdoc.

/// Self-notification sent by bridge tasks when a proc's status changes.
/// Not exported or registered — only used internally via `PortHandle`.
///
/// NOTE(review): this type is listed in `HostAgent`'s `handlers = [...]`
/// export attribute, which seems at odds with "not exported or
/// registered" — confirm which is intended.
#[derive(Debug, Serialize, Deserialize, Named)]
struct ProcStatusChanged {
    /// Resource id of the proc the notification is about (presumably the
    /// same id used as the key in `HostAgent::created` — verify).
    id: ResourceId,
}
212
/// Sent by DrainWorker back to HostAgent when draining completes.
/// Not exported — delivered locally via PortHandle (no serialization).
struct DrainComplete {
    /// The host that was lent to the worker, handed back to the agent.
    host: HostAgentMode,
    /// Port on which the drain is acknowledged. It travels with the host so
    /// the parent can send the ack only after it has restored its own state.
    ack: PortRef<()>,
}
219
/// Child actor whose only job is to run `host.terminate_children()` in
/// its `init()`, return the host and ack to the parent via DrainComplete,
/// and exit. Runs on the same proc as the host agent so it gets its
/// own `Instance` (required by `terminate_children`).
#[hyperactor::export(handlers = [])]
struct DrainWorker {
    /// Host to drain. Wrapped in `Option` so `init()` can move it out and
    /// send it back in `DrainComplete`.
    host: Option<HostAgentMode>,
    /// Timeout passed through to `terminate_children`.
    timeout: Duration,
    /// Concurrency limit passed to `terminate_children`; clamped to
    /// `1..=256` for process-backed hosts, passed as-is for local hosts.
    max_in_flight: usize,
    /// Acknowledgement port forwarded to the parent inside `DrainComplete`.
    /// Wrapped in `Option` so `init()` can move it out.
    ack: Option<PortRef<()>>,
    /// Port on which the worker posts `DrainComplete` when it is done.
    done_notify: PortHandle<DrainComplete>,
}
232
233#[async_trait]
234impl Actor for DrainWorker {
235    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
236        if let Some(host) = self.host.as_mut() {
237            match host {
238                HostAgentMode::Process { host, .. } => {
239                    host.terminate_children(
240                        this,
241                        self.timeout,
242                        self.max_in_flight.clamp(1, 256),
243                        "drain host",
244                    )
245                    .await;
246                }
247                HostAgentMode::Local(host) => {
248                    host.terminate_children(this, self.timeout, self.max_in_flight, "drain host")
249                        .await;
250                }
251            }
252        }
253
254        // Bundle host + ack into DrainComplete so the parent sends the ack
255        // AFTER restoring state (prevents race with ShutdownHost).
256        if let (Some(host), Some(ack)) = (self.host.take(), self.ack.take()) {
257            let _ = self.done_notify.post(this, DrainComplete { host, ack });
258        }
259
260        Ok(())
261    }
262}
263
264impl fmt::Debug for DrainWorker {
265    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266        f.debug_struct("DrainWorker")
267            .field("timeout", &self.timeout)
268            .field("max_in_flight", &self.max_in_flight)
269            .finish()
270    }
271}
272
/// The mesh agent actor that manages a host: it tracks the procs it has
/// been asked to create and serves the resource messages listed in the
/// `handlers` attribute below.
#[hyperactor::export(
    handlers=[
        resource::CreateOrUpdate<ProcSpec>,
        resource::Stop,
        resource::GetState<ProcState>,
        resource::KeepaliveGetState<ProcState>,
        resource::StreamState<ProcState> { cast = true },
        resource::GetRankStatus { cast = true },
        resource::WaitRankStatus { cast = true },
        resource::List,
        ShutdownHost,
        DrainHost,
        SetClientConfig,
        ProcStatusChanged,
        PySpyDump,
        PySpyProfile,
        ConfigDump,
        crate::proc_agent::SelfCheck,
    ]
)]
pub struct HostAgent {
    /// Current lifecycle state. The host itself lives inside the
    /// `Detached` / `Attached` variants.
    state: HostAgentState,
    /// Every proc this agent has been asked to create, keyed by resource
    /// id. Failed spawns are recorded too (as `Err` in the entry).
    pub(crate) created: HashMap<ResourceId, ProcCreationState>,
    /// Pending `WaitRankStatus` waiters, keyed by resource name.
    /// Each entry is `(min_status, rank, reply_port)`. Only touched
    /// from `&mut self` handlers.
    pending_proc_waiters:
        HashMap<ResourceId, Vec<(resource::Status, usize, PortRef<crate::StatusOverlay>)>>,
    /// Procs that already have an active bridge task watching their status.
    watching: HashSet<ResourceId>,
    /// Port handle for sending `ProcStatusChanged` to self. Set in `init()`.
    proc_status_port: Option<PortHandle<ProcStatusChanged>>,
    /// Lazily initialized ProcAgent on the host's local proc.
    /// Boots on first [`GetLocalProc`] (LP-1 — see
    /// `crate::host::LOCAL_PROC_NAME`).
    local_mesh_agent: OnceLock<anyhow::Result<ActorHandle<ProcAgent>>>,
    /// Handle to the host's frontend mailbox server, set during `init` after
    /// `this.bind::<Self>()` ensures the handler port is registered before the
    /// mailbox starts routing messages. Sent back to the bootstrap loop via
    /// `shutdown_tx` when the host shuts down so the caller can
    /// drain it.
    mailbox_handle: Option<MailboxServerHandle>,
}
316
317impl HostAgent {
318    /// Create a new host mesh agent running in the provided mode.
319    pub fn new(host: HostAgentMode) -> Self {
320        Self {
321            state: HostAgentState::Detached(host),
322            created: HashMap::new(),
323            pending_proc_waiters: HashMap::new(),
324            watching: HashSet::new(),
325            proc_status_port: None,
326            local_mesh_agent: OnceLock::new(),
327            mailbox_handle: None,
328        }
329    }
330
331    /// Minimum status floor derived from the host agent's lifecycle.
332    /// Procs on this host cannot be healthier than this.
333    fn min_proc_status(&self) -> resource::Status {
334        match &self.state {
335            HostAgentState::Detached(_) | HostAgentState::Attached(_) => resource::Status::Running,
336            HostAgentState::Draining => resource::Status::Stopping,
337            HostAgentState::Shutdown => resource::Status::Stopped,
338        }
339    }
340
341    fn host(&self) -> Option<&HostAgentMode> {
342        match &self.state {
343            HostAgentState::Detached(h) | HostAgentState::Attached(h) => Some(h),
344            _ => None,
345        }
346    }
347
348    fn host_mut(&mut self) -> Option<&mut HostAgentMode> {
349        match &mut self.state {
350            HostAgentState::Detached(h) | HostAgentState::Attached(h) => Some(h),
351            _ => None,
352        }
353    }
354
355    /// Terminate all tracked children on the host and clear proc state.
356    ///
357    /// The host, system proc, mailbox server, and HostAgent all stay
358    /// alive — only user procs are killed. After this returns the host
359    /// is ready to accept new spawn requests with the same proc names.
360    async fn drain(
361        &mut self,
362        cx: &Context<'_, Self>,
363        timeout: std::time::Duration,
364        max_in_flight: usize,
365    ) {
366        if let Some(host_mode) = self.host_mut() {
367            match host_mode {
368                HostAgentMode::Process { host, .. } => {
369                    let summary = host
370                        .terminate_children(cx, timeout, max_in_flight.clamp(1, 256), "stop host")
371                        .await;
372                    tracing::info!(?summary, "terminated children on host");
373                }
374                HostAgentMode::Local(host) => {
375                    let summary = host
376                        .terminate_children(cx, timeout, max_in_flight, "stop host")
377                        .await;
378                    tracing::info!(?summary, "terminated children on local host");
379                }
380            }
381        }
382        self.created.clear();
383    }
384
385    /// Selectively stop procs belonging to a specific host mesh.
386    /// Only procs whose `host_mesh_id` matches `filter` are stopped;
387    /// all other procs are left running.
388    async fn drain_by_mesh_name(
389        &mut self,
390        cx: &Context<'_, Self>,
391        timeout: std::time::Duration,
392        filter: Option<&HostMeshId>,
393    ) {
394        let matching_ids: Vec<ResourceId> = self
395            .created
396            .iter()
397            .filter(|(_, state)| state.host_mesh_id.as_ref() == filter)
398            .map(|(id, _)| id.clone())
399            .collect();
400
401        if let Some(host_mode) = self.host() {
402            for id in &matching_ids {
403                if let Some(ProcCreationState {
404                    created: Ok((proc_id, _)),
405                    ..
406                }) = self.created.get(id)
407                {
408                    match host_mode {
409                        HostAgentMode::Process { host, .. } => {
410                            let _ = host
411                                .terminate_proc(cx, proc_id, timeout, "selective drain")
412                                .await;
413                        }
414                        HostAgentMode::Local(host) => {
415                            let _ = host
416                                .terminate_proc(cx, proc_id, timeout, "selective drain")
417                                .await;
418                        }
419                    }
420                }
421            }
422        }
423
424        // Remove drained entries and associated state so that
425        // future spawns with the same proc names get fresh watch bridges.
426        for id in &matching_ids {
427            self.created.remove(id);
428            self.watching.remove(id);
429            self.pending_proc_waiters.remove(id);
430        }
431
432        tracing::info!(
433            count = matching_ids.len(),
434            filter = ?filter,
435            "selectively drained procs",
436        );
437    }
438
439    /// Publish the current host properties and child list for
440    /// introspection. Called from init and after each state change
441    /// (proc created/stopped).
442    fn publish_introspect_properties(&self, cx: &Instance<Self>) {
443        let host = match self.host() {
444            Some(h) => h,
445            None => return, // host shut down or stopping
446        };
447
448        let addr = host.addr().to_string();
449        let mut children: Vec<hyperactor::introspect::IntrospectRef> = Vec::new();
450        let system_children: Vec<crate::introspect::NodeRef> = Vec::new(); // LC-2
451
452        // Procs are not system — only actors are. Both service and
453        // local appear as regular children; 's' in the TUI toggles
454        // actor visibility, not proc visibility.
455        children.push(hyperactor::introspect::IntrospectRef::Proc(
456            host.system_proc().proc_addr().clone(),
457        ));
458        children.push(hyperactor::introspect::IntrospectRef::Proc(
459            host.local_proc().proc_addr().clone(),
460        ));
461
462        // User procs.
463        for state in self.created.values() {
464            if let Ok((proc_id, _agent_ref)) = &state.created {
465                children.push(hyperactor::introspect::IntrospectRef::Proc(proc_id.clone()));
466            }
467        }
468
469        let num_procs = children.len();
470
471        let mut attrs = hyperactor_config::Attrs::new();
472        attrs.set(crate::introspect::NODE_TYPE, "host".to_string());
473        attrs.set(crate::introspect::ADDR, addr);
474        attrs.set(crate::introspect::NUM_PROCS, num_procs);
475        attrs.set(hyperactor::introspect::CHILDREN, children);
476        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
477        // PD-*: hosting-process memory stats. This is the same
478        // hosting OS process signal surfaced on the proc path, but
479        // the host path does not attempt to publish proc-local queue
480        // pressure.
481        let memory = crate::introspect::ProcessMemoryStats::read_from_procfs();
482        memory.to_attrs(&mut attrs);
483        cx.publish_attrs(attrs);
484    }
485}
486
487#[async_trait]
488impl Actor for HostAgent {
489    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
490        // Serve the host now that the agent is initialized. Make sure our port is
491        // bound before serving.
492        this.bind::<Self>();
493        match self.host_mut().unwrap() {
494            HostAgentMode::Process { host, .. } => {
495                self.mailbox_handle = Some(host.serve()?);
496                let (directory, file) = hyperactor_telemetry::log_file_path(
497                    hyperactor_telemetry::env::Env::current(),
498                    None,
499                )
500                .unwrap();
501                eprintln!(
502                    "Monarch internal logs are being written to {}/{}.log; execution id {}",
503                    directory,
504                    file,
505                    hyperactor_telemetry::env::execution_id(),
506                );
507            }
508            HostAgentMode::Local(host) => {
509                host.serve()?;
510            }
511        };
512        this.set_system();
513        self.publish_introspect_properties(this);
514
515        // Register callback for QueryChild — resolves system procs
516        // that are not independently addressable actors.
517        let host = self.host().expect("host present");
518        let system_proc = host.system_proc().clone();
519        let local_proc = host.local_proc().clone();
520        let self_id = this.self_addr().clone();
521        this.set_query_child_handler(move |child_ref| {
522            use hyperactor::introspect::IntrospectResult;
523
524            let proc = match child_ref {
525                Addr::Proc(proc_ref) => {
526                    if *proc_ref == system_proc.proc_addr() {
527                        Some((&system_proc, SERVICE_PROC_NAME))
528                    } else if *proc_ref == local_proc.proc_addr() {
529                        Some((&local_proc, LOCAL_PROC_NAME))
530                    } else {
531                        None
532                    }
533                }
534                _ => None,
535            };
536
537            match proc {
538                Some((proc, label)) => {
539                    // Use all_instance_keys() instead of
540                    // all_actor_ids() to avoid holding DashMap shard
541                    // read locks while doing Weak::upgrade() +
542                    // watch::borrow() + is_terminal() per entry.
543                    // Under rapid actor churn the per-entry work in
544                    // all_actor_ids() causes convoy starvation with
545                    // concurrent insert/remove operations, stalling
546                    // the spawn/exit path. all_instance_keys() just
547                    // clones keys — microseconds per shard. Actor
548                    // addresses and the is_system check use individual
549                    // point lookups outside the iteration. Stale keys
550                    // are harmless: if the point lookup fails, the actor
551                    // has already gone away.
552                    let all_keys = proc.all_instance_keys();
553                    let mut actors: Vec<hyperactor::introspect::IntrospectRef> =
554                        Vec::with_capacity(all_keys.len());
555                    let mut system_actors: Vec<crate::introspect::NodeRef> = Vec::new();
556                    for id in all_keys {
557                        if let Some(cell) = proc.get_instance_by_id(&id) {
558                            let actor_addr = cell.actor_addr().clone();
559                            if cell.is_system() {
560                                system_actors
561                                    .push(crate::introspect::NodeRef::Actor(actor_addr.clone()));
562                            }
563                            actors.push(hyperactor::introspect::IntrospectRef::Actor(actor_addr));
564                        }
565                    }
566                    let mut attrs = hyperactor_config::Attrs::new();
567                    attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
568                    attrs.set(crate::introspect::PROC_NAME, label.to_string());
569                    attrs.set(crate::introspect::NUM_ACTORS, actors.len());
570                    attrs.set(crate::introspect::SYSTEM_CHILDREN, system_actors.clone());
571                    // PD-*: include proc debug stats so QueryChild
572                    // results carry real signal. Memory from procfs,
573                    // queue stats from the Proc's runtime accounting.
574                    let memory = crate::introspect::ProcessMemoryStats::read_from_procfs();
575                    memory.to_attrs(&mut attrs);
576                    attrs.set(
577                        crate::introspect::ACTOR_WORK_QUEUE_DEPTH_TOTAL,
578                        proc.queue_depth_total(),
579                    );
580                    // Per-actor max from the live actor scan.
581                    let mut queue_max: u64 = 0;
582                    for aid in proc.all_instance_keys() {
583                        if let Some(cell) = proc.get_instance_by_id(&aid) {
584                            queue_max = queue_max.max(cell.queue_depth());
585                        }
586                    }
587                    attrs.set(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_MAX, queue_max);
588                    attrs.set(
589                        crate::introspect::ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK,
590                        proc.queue_depth_high_water_mark(),
591                    );
592                    attrs.set(
593                        crate::introspect::LAST_NONZERO_QUEUE_DEPTH_AGE_MS,
594                        proc.last_nonzero_queue_depth_age_ms(),
595                    );
596                    let attrs_json =
597                        serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
598
599                    IntrospectResult {
600                        identity: hyperactor::introspect::IntrospectRef::Proc(
601                            proc.proc_addr().clone(),
602                        ),
603                        attrs: attrs_json,
604                        children: actors,
605                        parent: Some(hyperactor::introspect::IntrospectRef::Actor(
606                            self_id.clone(),
607                        )),
608                        as_of: std::time::SystemTime::now(),
609                    }
610                }
611                None => {
612                    let mut error_attrs = hyperactor_config::Attrs::new();
613                    error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
614                    error_attrs.set(
615                        hyperactor::introspect::ERROR_MESSAGE,
616                        format!("child {} not found", child_ref),
617                    );
618                    let identity = match child_ref {
619                        Addr::Proc(p) => hyperactor::introspect::IntrospectRef::Proc(p.clone()),
620                        Addr::Actor(a) => hyperactor::introspect::IntrospectRef::Actor(a.clone()),
621                        Addr::Port(p) => {
622                            hyperactor::introspect::IntrospectRef::Actor(p.actor_addr())
623                        }
624                    };
625                    IntrospectResult {
626                        identity,
627                        attrs: serde_json::to_string(&error_attrs)
628                            .unwrap_or_else(|_| "{}".to_string()),
629                        children: Vec::new(),
630                        parent: None,
631                        as_of: std::time::SystemTime::now(),
632                    }
633                }
634            }
635        });
636
637        self.proc_status_port = Some(this.port::<ProcStatusChanged>());
638
639        // Kick off the SelfCheck reaper if the orphan timeout is configured.
640        // The reaper walks `created` looking for procs whose owner stopped
641        // extending the keepalive and tears them down.
642        if let Some(delay) = hyperactor_config::global::get(crate::proc_agent::MESH_ORPHAN_TIMEOUT)
643        {
644            this.post_after(this, crate::proc_agent::SelfCheck::default(), delay);
645        }
646
647        Ok(())
648    }
649}
650
651impl fmt::Debug for HostAgent {
652    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
653        f.debug_struct("HostAgent")
654            .field("host", &"..")
655            .field("created", &self.created)
656            .finish()
657    }
658}
659
660#[async_trait]
661impl Handler<resource::CreateOrUpdate<ProcSpec>> for HostAgent {
662    #[tracing::instrument("HostAgent::CreateOrUpdate", level = "info", skip_all, fields(id=%create_or_update.id))]
663    async fn handle(
664        &mut self,
665        cx: &Context<Self>,
666        create_or_update: resource::CreateOrUpdate<ProcSpec>,
667    ) -> anyhow::Result<()> {
668        if self.created.contains_key(&create_or_update.id) {
669            // Already created: there is no update.
670            return Ok(());
671        }
672
673        let host = match self.host_mut() {
674            Some(h) => h,
675            None => {
676                tracing::warn!(
677                    id = %create_or_update.id,
678                    "ignoring CreateOrUpdate: HostAgent has already shut down"
679                );
680                return Ok(());
681            }
682        };
683        let created = match host {
684            HostAgentMode::Process { host, .. } => {
685                host.spawn(
686                    create_or_update.id.to_string(),
687                    BootstrapProcConfig {
688                        create_rank: create_or_update.rank.unwrap(),
689                        client_config_override: create_or_update
690                            .spec
691                            .client_config_override
692                            .clone(),
693                        proc_bind: create_or_update.spec.proc_bind.clone(),
694                        bootstrap_command: create_or_update.spec.bootstrap_command.clone(),
695                    },
696                )
697                .await
698            }
699            HostAgentMode::Local(host) => host.spawn(create_or_update.id.to_string(), ()).await,
700        };
701
702        let rank = create_or_update.rank.unwrap();
703
704        if let Err(e) = &created {
705            tracing::error!("failed to spawn proc {}: {}", create_or_update.id, e);
706        }
707        let was_empty = self.created.is_empty();
708        self.created.insert(
709            create_or_update.id.clone(),
710            ProcCreationState {
711                rank,
712                host_mesh_id: create_or_update.spec.host_mesh_id.clone(),
713                created,
714                expiry_time: None,
715            },
716        );
717
718        // Transition Detached → Attached on first proc creation.
719        if was_empty && let HostAgentState::Detached(_) = &self.state {
720            let host = match std::mem::replace(&mut self.state, HostAgentState::Shutdown) {
721                HostAgentState::Detached(h) => h,
722                _ => unreachable!(),
723            };
724            self.state = HostAgentState::Attached(host);
725        }
726
727        // If any WaitRankStatus messages arrived before this proc
728        // existed, their waiters were stashed with a sentinel rank.
729        // Now that we know the real rank, fix them up and start a
730        // watch bridge.
731        // Extract the proc_id before mutably borrowing pending_proc_waiters.
732        let proc_id = self
733            .created
734            .get(&create_or_update.id)
735            .and_then(|s| s.created.as_ref().ok())
736            .map(|(pid, _)| pid.clone());
737
738        if let Some(waiters) = self.pending_proc_waiters.get_mut(&create_or_update.id) {
739            for (_, waiter_rank, _) in waiters.iter_mut() {
740                if *waiter_rank == usize::MAX {
741                    *waiter_rank = rank;
742                }
743            }
744        }
745
746        // Start a bridge and send ourselves an initial check.
747        if self.pending_proc_waiters.contains_key(&create_or_update.id) {
748            if let Some(proc_id) = &proc_id {
749                self.start_watch_bridge(&create_or_update.id, proc_id).await;
750            }
751            self.notify_proc_status_changed(&create_or_update.id);
752        }
753
754        self.publish_introspect_properties(cx);
755        Ok(())
756    }
757}
758
759#[async_trait]
760impl Handler<resource::Stop> for HostAgent {
761    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
762        tracing::info!(
763            name = "HostMeshAgentStatus",
764            proc_id = %message.id,
765            reason = %message.reason,
766            "stopping proc"
767        );
768        let host = match self.host() {
769            Some(h) => h,
770            None => {
771                // Host already shut down; all procs are terminated.
772                tracing::debug!(
773                    proc_id = %message.id,
774                    "ignoring Stop: HostAgent has already shut down"
775                );
776                return Ok(());
777            }
778        };
779        let timeout = hyperactor_config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
780
781        if let Some(ProcCreationState {
782            created: Ok((proc_id, _)),
783            ..
784        }) = self.created.get(&message.id)
785        {
786            host.request_stop(cx, proc_id, timeout, &message.reason)
787                .await;
788        }
789
790        // Status may have changed to Stopping; notify pending waiters.
791        self.notify_proc_status_changed(&message.id);
792
793        self.publish_introspect_properties(cx);
794        Ok(())
795    }
796}
797
798#[async_trait]
799impl Handler<resource::GetRankStatus> for HostAgent {
800    async fn handle(
801        &mut self,
802        cx: &Context<Self>,
803        get_rank_status: resource::GetRankStatus,
804    ) -> anyhow::Result<()> {
805        use crate::StatusOverlay;
806        use crate::resource::Status;
807
808        let (rank, status) = match self.created.get(&get_rank_status.id) {
809            Some(ProcCreationState {
810                rank,
811                created: Ok((proc_id, _mesh_agent)),
812                ..
813            }) => {
814                let raw_status = match self.host() {
815                    Some(host) => host.proc_status(proc_id).await.0,
816                    None => resource::Status::Unknown,
817                };
818                (*rank, raw_status.clamp_min(self.min_proc_status()))
819            }
820            Some(ProcCreationState {
821                rank,
822                created: Err(e),
823                ..
824            }) => (*rank, Status::Failed(e.to_string())),
825            None => (usize::MAX, Status::NotExist),
826        };
827
828        let overlay = if rank == usize::MAX {
829            StatusOverlay::new()
830        } else {
831            StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
832                .expect("valid single-run overlay")
833        };
834        get_rank_status.reply.post(cx, overlay);
835        Ok(())
836    }
837}
838
839#[async_trait]
840impl Handler<resource::WaitRankStatus> for HostAgent {
841    async fn handle(
842        &mut self,
843        cx: &Context<Self>,
844        msg: resource::WaitRankStatus,
845    ) -> anyhow::Result<()> {
846        use crate::StatusOverlay;
847        use crate::resource::Status;
848
849        match self.created.get(&msg.id) {
850            Some(ProcCreationState {
851                rank,
852                created: Ok((proc_id, _)),
853                ..
854            }) => {
855                let rank = *rank;
856                let status = match self.host() {
857                    Some(host) => host.proc_status(proc_id).await.0,
858                    None => Status::Stopped,
859                };
860
861                // If already at or past the requested threshold, reply immediately.
862                if status >= msg.min_status {
863                    let overlay = StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
864                        .expect("valid single-run overlay");
865                    let _ = msg.reply.post(cx, overlay);
866                    return Ok(());
867                }
868
869                // Stash the waiter and start a bridge if we don't have one yet.
870                self.pending_proc_waiters
871                    .entry(msg.id.clone())
872                    .or_default()
873                    .push((msg.min_status, rank, msg.reply));
874
875                let proc_id = proc_id.clone();
876                self.start_watch_bridge(&msg.id, &proc_id).await;
877            }
878            Some(ProcCreationState {
879                rank,
880                created: Err(e),
881                ..
882            }) => {
883                // Creation failed — reply immediately with Failed status.
884                let overlay = StatusOverlay::try_from_runs(vec![(
885                    *rank..(*rank + 1),
886                    Status::Failed(e.to_string()),
887                )])
888                .expect("valid single-run overlay");
889                let _ = msg.reply.post(cx, overlay);
890            }
891            None => {
892                // Proc doesn't exist yet. Stash the waiter with a
893                // sentinel rank; CreateOrUpdate will fill it in and
894                // start the watch bridge.
895                self.pending_proc_waiters
896                    .entry(msg.id.clone())
897                    .or_default()
898                    .push((msg.min_status, usize::MAX, msg.reply));
899            }
900        }
901
902        Ok(())
903    }
904}
905
906#[async_trait]
907impl Handler<ProcStatusChanged> for HostAgent {
908    async fn handle(&mut self, cx: &Context<Self>, msg: ProcStatusChanged) -> anyhow::Result<()> {
909        use crate::StatusOverlay;
910        use crate::resource::Status;
911
912        let status = match self.created.get(&msg.id) {
913            Some(ProcCreationState {
914                created: Ok((proc_id, _)),
915                ..
916            }) => match self.host() {
917                Some(host) => host.proc_status(proc_id).await.0,
918                None => Status::Stopped,
919            },
920            Some(ProcCreationState {
921                created: Err(_), ..
922            }) => {
923                // Already replied with Failed when they were stashed.
924                return Ok(());
925            }
926            None => {
927                // Proc not created yet, nothing to flush.
928                return Ok(());
929            }
930        };
931
932        let Some(waiters) = self.pending_proc_waiters.get_mut(&msg.id) else {
933            return Ok(());
934        };
935
936        let remaining = std::mem::take(waiters);
937        for (min_status, rank, reply) in remaining {
938            if status >= min_status {
939                let overlay =
940                    StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status.clone())])
941                        .expect("valid single-run overlay");
942                let _ = reply.post(cx, overlay);
943            } else {
944                waiters.push((min_status, rank, reply));
945            }
946        }
947
948        if waiters.is_empty() {
949            self.pending_proc_waiters.remove(&msg.id);
950        }
951
952        Ok(())
953    }
954}
955
956impl HostAgent {
957    /// Send a `ProcStatusChanged` self-notification for the given resource id.
958    fn notify_proc_status_changed(&self, id: &ResourceId) {
959        if let Some(port) = &self.proc_status_port {
960            let client = Instance::<()>::self_client();
961            let _ = port.post(client, ProcStatusChanged { id: id.clone() });
962        }
963    }
964
965    /// Start a bridge task that watches a proc's status channel and sends
966    /// `ProcStatusChanged` to self on each change. At most one bridge per proc.
967    async fn start_watch_bridge(&mut self, id: &ResourceId, proc_id: &ProcAddr) {
968        if self.watching.contains(id) {
969            return;
970        }
971        self.watching.insert(id.clone());
972
973        let port = match &self.proc_status_port {
974            Some(p) => p.clone(),
975            None => return,
976        };
977
978        match self.host() {
979            Some(HostAgentMode::Process { host, .. }) => {
980                if let Some(rx) = host.manager().watch(proc_id).await {
981                    start_proc_watch(port, rx, id.clone(), |s| s.clone().into());
982                }
983            }
984            Some(HostAgentMode::Local(host)) => {
985                if let Some(rx) = host.manager().watch(proc_id).await {
986                    start_proc_watch(port, rx, id.clone(), |s| (*s).into());
987                }
988            }
989            None => {}
990        }
991    }
992}
993
994/// Spawn a bridge task that watches a proc's status channel and sends
995/// `ProcStatusChanged` to the actor via the given `PortHandle`.
996fn start_proc_watch<S>(
997    port: PortHandle<ProcStatusChanged>,
998    mut rx: tokio::sync::watch::Receiver<S>,
999    id: ResourceId,
1000    to_status: impl Fn(&S) -> resource::Status + Send + 'static,
1001) where
1002    S: Send + Sync + 'static,
1003{
1004    // TODO: replace Instance::self_client() with a proper mechanism
1005    // for sending to port handles without an actor context.
1006    let client = Instance::<()>::self_client();
1007    tokio::spawn(async move {
1008        loop {
1009            match rx.changed().await {
1010                Ok(()) => {
1011                    let status = to_status(&*rx.borrow());
1012                    let terminated = status.is_terminated();
1013                    let _ = port.post(client, ProcStatusChanged { id: id.clone() });
1014                    if terminated {
1015                        return;
1016                    }
1017                }
1018                Err(_) => {
1019                    let _ = port.post(client, ProcStatusChanged { id: id.clone() });
1020                    return;
1021                }
1022            }
1023        }
1024    });
1025}
1026
1027#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
1028pub struct ShutdownHost {
1029    /// Grace window: send SIGTERM and wait this long before
1030    /// escalating.
1031    pub timeout: std::time::Duration,
1032    /// Max number of children to terminate concurrently on this host.
1033    pub max_in_flight: usize,
1034    /// Ack that the agent finished shutdown work (best-effort).
1035    #[reply]
1036    pub ack: hyperactor::PortRef<()>,
1037}
1038wirevalue::register_type!(ShutdownHost);
1039
1040/// Drain user procs on this host but keep the host, service proc,
1041/// and networking alive. Used during mesh stop/shutdown so that
1042/// forwarder flushes can still reach remote hosts.
1043///
1044/// If `host_mesh_id` is `Some`, only procs belonging to that mesh
1045/// are stopped (selective drain). If `None`, all procs are
1046/// terminated (full drain).
1047#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
1048pub struct DrainHost {
1049    pub timeout: std::time::Duration,
1050    pub max_in_flight: usize,
1051    pub host_mesh_id: Option<HostMeshId>,
1052    #[reply]
1053    pub ack: hyperactor::PortRef<()>,
1054}
1055wirevalue::register_type!(DrainHost);
1056
1057#[async_trait]
1058impl Handler<DrainHost> for HostAgent {
1059    async fn handle(&mut self, cx: &Context<Self>, msg: DrainHost) -> anyhow::Result<()> {
1060        if msg.host_mesh_id.is_some() {
1061            // Selective drain: stop only procs belonging to the named mesh.
1062            self.drain_by_mesh_name(cx, msg.timeout, msg.host_mesh_id.as_ref())
1063                .await;
1064            msg.ack.post(cx, ());
1065            return Ok(());
1066        }
1067
1068        // Full drain: terminate all children.
1069        let host = match std::mem::replace(&mut self.state, HostAgentState::Draining) {
1070            HostAgentState::Attached(h) => h,
1071            other @ (HostAgentState::Detached(_) | HostAgentState::Draining) => {
1072                // Nothing to drain — ack immediately.
1073                self.state = other;
1074                msg.ack.post(cx, ());
1075                return Ok(());
1076            }
1077            HostAgentState::Shutdown => {
1078                self.state = HostAgentState::Shutdown;
1079                msg.ack.post(cx, ());
1080                return Ok(());
1081            }
1082        };
1083
1084        // Do NOT clear `self.created` here: the DrainWorker
1085        // terminates procs asynchronously, and concurrent GetState /
1086        // GetRankStatus queries must still find the entries. With the
1087        // host in Draining state (`self.host()` returns None), those
1088        // handlers already report Status::Stopped for every known
1089        // proc, which is the correct answer while draining is
1090        // in progress.
1091
1092        let done_port = cx.port::<DrainComplete>();
1093
1094        cx.spawn_with_name(
1095            "drain_worker",
1096            DrainWorker {
1097                host: Some(host),
1098                timeout: msg.timeout,
1099                max_in_flight: msg.max_in_flight,
1100                ack: Some(msg.ack),
1101                done_notify: done_port,
1102            },
1103        )?;
1104
1105        Ok(())
1106    }
1107}
1108
1109#[async_trait]
1110impl Handler<DrainComplete> for HostAgent {
1111    async fn handle(&mut self, cx: &Context<Self>, msg: DrainComplete) -> anyhow::Result<()> {
1112        self.state = HostAgentState::Detached(msg.host);
1113        self.created.clear();
1114        msg.ack.post(cx, ());
1115        Ok(())
1116    }
1117}
1118
1119#[async_trait]
1120impl Handler<ShutdownHost> for HostAgent {
1121    async fn handle(&mut self, cx: &Context<Self>, msg: ShutdownHost) -> anyhow::Result<()> {
1122        // Terminate children BEFORE acking, so the caller's networking
1123        // stays alive while children flush their forwarders during
1124        // teardown. If we ack first, the caller proceeds to tear down
1125        // the host proc's networking while children are still running,
1126        // causing their forwarder flushes to hang until
1127        // MESSAGE_DELIVERY_TIMEOUT expires.
1128        if !self.created.is_empty() {
1129            self.drain(cx, msg.timeout, msg.max_in_flight).await;
1130        }
1131
1132        // Ack after children are terminated so the caller does not
1133        // tear down the host's networking prematurely.
1134        msg.ack.post(cx, ());
1135
1136        // Drop the host and signal the bootstrap loop to drain the
1137        // mailbox and exit.
1138        match std::mem::replace(&mut self.state, HostAgentState::Shutdown) {
1139            HostAgentState::Detached(HostAgentMode::Process {
1140                shutdown_tx: Some(tx),
1141                ..
1142            })
1143            | HostAgentState::Attached(HostAgentMode::Process {
1144                shutdown_tx: Some(tx),
1145                ..
1146            }) => {
1147                tracing::info!(
1148                    proc_id = %cx.self_addr().proc_addr(),
1149                    actor_id = %cx.self_addr(),
1150                    "host is shut down, sending mailbox handle to bootstrap for draining"
1151                );
1152                if let Some(handle) = self.mailbox_handle.take() {
1153                    let _ = tx.send(handle);
1154                }
1155            }
1156            _ => {}
1157        }
1158
1159        Ok(())
1160    }
1161}
1162
1163#[derive(
1164    Debug,
1165    Clone,
1166    PartialEq,
1167    Eq,
1168    Named,
1169    Serialize,
1170    Deserialize,
1171    hyperactor::Bind,
1172    hyperactor::Unbind
1173)]
1174pub struct ProcState {
1175    pub proc_id: ProcAddr,
1176    pub create_rank: usize,
1177    pub mesh_agent: ActorRef<ProcAgent>,
1178    pub bootstrap_command: Option<BootstrapCommand>,
1179    pub proc_status: Option<bootstrap::ProcStatus>,
1180}
1181wirevalue::register_type!(ProcState);
1182
1183#[async_trait]
1184impl Handler<resource::GetState<ProcState>> for HostAgent {
1185    async fn handle(
1186        &mut self,
1187        cx: &Context<Self>,
1188        get_state: resource::GetState<ProcState>,
1189    ) -> anyhow::Result<()> {
1190        let state = match self.created.get(&get_state.id) {
1191            Some(ProcCreationState {
1192                rank,
1193                created: Ok((proc_id, mesh_agent)),
1194                ..
1195            }) => {
1196                let (raw_status, proc_status, bootstrap_command) = match self.host() {
1197                    Some(host) => {
1198                        let (status, proc_status) = host.proc_status(proc_id).await;
1199                        (status, proc_status, host.bootstrap_command())
1200                    }
1201                    None => (resource::Status::Unknown, None, None),
1202                };
1203                let status = raw_status.clamp_min(self.min_proc_status());
1204                resource::State {
1205                    id: get_state.id.clone(),
1206                    status,
1207                    state: Some(ProcState {
1208                        proc_id: proc_id.clone(),
1209                        create_rank: *rank,
1210                        mesh_agent: mesh_agent.clone(),
1211                        bootstrap_command,
1212                        proc_status,
1213                    }),
1214                    generation: 0,
1215                    timestamp: std::time::SystemTime::now(),
1216                }
1217            }
1218            Some(ProcCreationState {
1219                created: Err(e), ..
1220            }) => resource::State {
1221                id: get_state.id.clone(),
1222                status: resource::Status::Failed(e.to_string()),
1223                state: None,
1224                generation: 0,
1225                timestamp: std::time::SystemTime::now(),
1226            },
1227            None => resource::State {
1228                id: get_state.id.clone(),
1229                status: resource::Status::NotExist,
1230                state: None,
1231                generation: 0,
1232                timestamp: std::time::SystemTime::now(),
1233            },
1234        };
1235
1236        get_state.reply.post(cx, state);
1237        Ok(())
1238    }
1239}
1240
1241#[async_trait]
1242impl Handler<crate::proc_agent::SelfCheck> for HostAgent {
1243    async fn handle(
1244        &mut self,
1245        cx: &Context<Self>,
1246        _: crate::proc_agent::SelfCheck,
1247    ) -> anyhow::Result<()> {
1248        // Walk procs and tear down any whose owner-supplied keepalive has
1249        // lapsed. Mirrors the proc-agent reaper but at host scope: we
1250        // address the same problem (a controller/client died abruptly)
1251        // for proc-level cleanup so the host doesn't leak children.
1252        let Some(duration) = hyperactor_config::global::get(crate::proc_agent::MESH_ORPHAN_TIMEOUT)
1253        else {
1254            return Ok(());
1255        };
1256        let now = std::time::SystemTime::now();
1257        let timeout = hyperactor_config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
1258
1259        let expired: Vec<ResourceId> = self
1260            .created
1261            .iter()
1262            .filter_map(|(id, state)| {
1263                let expiry = state.expiry_time?;
1264                if now > expiry { Some(id.clone()) } else { None }
1265            })
1266            .collect();
1267
1268        if !expired.is_empty() {
1269            tracing::info!(
1270                "stopping {} orphaned procs past their keepalive expiry",
1271                expired.len(),
1272            );
1273        }
1274
1275        for id in expired {
1276            if let Some(ProcCreationState {
1277                created: Ok((proc_id, _)),
1278                ..
1279            }) = self.created.get(&id)
1280            {
1281                let proc_id = proc_id.clone();
1282                if let Some(host) = self.host() {
1283                    host.request_stop(cx, &proc_id, timeout, "orphaned").await;
1284                }
1285                // Don't reap repeatedly while teardown is in flight.
1286                if let Some(state) = self.created.get_mut(&id) {
1287                    state.expiry_time = None;
1288                }
1289            }
1290        }
1291
1292        cx.post_after(cx, crate::proc_agent::SelfCheck::default(), duration);
1293        Ok(())
1294    }
1295}
1296
1297#[async_trait]
1298impl Handler<resource::List> for HostAgent {
1299    async fn handle(&mut self, cx: &Context<Self>, list: resource::List) -> anyhow::Result<()> {
1300        list.reply.post(cx, self.created.keys().cloned().collect());
1301        Ok(())
1302    }
1303}
1304
1305#[async_trait]
1306impl Handler<resource::KeepaliveGetState<ProcState>> for HostAgent {
1307    async fn handle(
1308        &mut self,
1309        cx: &Context<Self>,
1310        message: resource::KeepaliveGetState<ProcState>,
1311    ) -> anyhow::Result<()> {
1312        // Record the new expiry so the periodic SelfCheck reaper knows the
1313        // owner is still alive. If the owner stops extending the keepalive
1314        // (e.g. its process dies abruptly), the proc will be reaped past
1315        // `expires_after`.
1316        if let Some(state) = self.created.get_mut(&message.get_state.id) {
1317            state.expiry_time = Some(message.expires_after);
1318        }
1319        <Self as Handler<resource::GetState<ProcState>>>::handle(self, cx, message.get_state).await
1320    }
1321}
1322
1323#[async_trait]
1324impl Handler<resource::StreamState<ProcState>> for HostAgent {
1325    async fn handle(
1326        &mut self,
1327        cx: &Context<Self>,
1328        stream_state: resource::StreamState<ProcState>,
1329    ) -> anyhow::Result<()> {
1330        // TODO: register `subscriber` for ongoing updates. For now send the
1331        // current state once so the controller has an initial snapshot.
1332        let state = match self.created.get(&stream_state.id) {
1333            Some(ProcCreationState {
1334                rank,
1335                created: Ok((proc_id, mesh_agent)),
1336                ..
1337            }) => {
1338                let (raw_status, proc_status, bootstrap_command) = match self.host() {
1339                    Some(host) => {
1340                        let (status, proc_status) = host.proc_status(proc_id).await;
1341                        (status, proc_status, host.bootstrap_command())
1342                    }
1343                    None => (resource::Status::Unknown, None, None),
1344                };
1345                let status = raw_status.clamp_min(self.min_proc_status());
1346                resource::State {
1347                    id: stream_state.id.clone(),
1348                    status,
1349                    state: Some(ProcState {
1350                        proc_id: proc_id.clone(),
1351                        create_rank: *rank,
1352                        mesh_agent: mesh_agent.clone(),
1353                        bootstrap_command,
1354                        proc_status,
1355                    }),
1356                    generation: 0,
1357                    timestamp: std::time::SystemTime::now(),
1358                }
1359            }
1360            Some(ProcCreationState {
1361                created: Err(e), ..
1362            }) => resource::State {
1363                id: stream_state.id.clone(),
1364                status: resource::Status::Failed(e.to_string()),
1365                state: None,
1366                generation: 0,
1367                timestamp: std::time::SystemTime::now(),
1368            },
1369            None => resource::State {
1370                id: stream_state.id.clone(),
1371                status: resource::Status::NotExist,
1372                state: None,
1373                generation: 0,
1374                timestamp: std::time::SystemTime::now(),
1375            },
1376        };
1377
1378        let mut headers = Flattrs::new();
1379        headers.set(crate::proc_agent::STREAM_STATE_SUBSCRIBER, true);
1380        stream_state
1381            .subscriber
1382            .post_with_headers(cx, headers, state);
1383        Ok(())
1384    }
1385}
1386
1387/// Push client configuration overrides to this host agent's process.
1388///
1389/// The attrs are installed as `Source::ClientOverride` (lowest explicit
1390/// priority), so the host's own env vars and file config take precedence.
1391/// This message is idempotent — sending the same attrs twice replaces
1392/// the layer wholesale.
1393///
1394/// Request-reply: the reply acts as a barrier confirming the config
1395/// is installed. The fatal-on-failure / best-effort policy is the
1396/// caller's contract, not this message's; for the canonical
1397/// attach-time contract see the HM-* invariants in `host_mesh.rs`.
1398#[derive(Debug, Named, Handler, RefClient, HandleClient, Serialize, Deserialize)]
1399pub struct SetClientConfig {
1400    pub attrs: Attrs,
1401    #[reply]
1402    pub done: PortRef<()>,
1403}
1404wirevalue::register_type!(SetClientConfig);
1405
1406#[async_trait]
1407impl Handler<SetClientConfig> for HostAgent {
1408    async fn handle(&mut self, cx: &Context<Self>, msg: SetClientConfig) -> anyhow::Result<()> {
1409        // Use `set` (not `create_or_merge`) because `push_config` always
1410        // sends a complete `propagatable_attrs()` snapshot. Replacing the
1411        // layer wholesale is intentional and idempotent.
1412        hyperactor_config::global::set(
1413            hyperactor_config::global::Source::ClientOverride,
1414            msg.attrs,
1415        );
1416        tracing::debug!("installed client config override on host agent");
1417        msg.done.post(cx, ());
1418        Ok(())
1419    }
1420}
1421
1422/// Boot the ProcAgent on the host's local proc (LP-1).
1423///
1424/// The local proc starts empty; this message activates it by spawning
1425/// a `ProcAgent` (once, via `OnceLock`). Called by
1426/// `monarch_hyperactor::bootstrap_host` when setting up the Python
1427/// `this_proc()` singleton.
1428///
1429/// See also: `crate::host::LOCAL_PROC_NAME`.
1430#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
1431pub struct GetLocalProc {
1432    #[reply]
1433    pub proc_mesh_agent: PortHandle<ActorHandle<ProcAgent>>,
1434}
1435
1436#[async_trait]
1437impl Handler<GetLocalProc> for HostAgent {
1438    async fn handle(
1439        &mut self,
1440        cx: &Context<Self>,
1441        GetLocalProc { proc_mesh_agent }: GetLocalProc,
1442    ) -> anyhow::Result<()> {
1443        let host = self
1444            .host()
1445            .ok_or_else(|| anyhow::anyhow!("HostAgent has already shut down"))?;
1446        let agent = self
1447            .local_mesh_agent
1448            .get_or_init(|| ProcAgent::boot_v1(host.local_proc().clone(), None));
1449
1450        match agent {
1451            Err(e) => anyhow::bail!("error booting local proc: {}", e),
1452            Ok(agent) => proc_mesh_agent.post(cx, agent.clone()),
1453        };
1454
1455        Ok(())
1456    }
1457}
1458
1459#[async_trait]
1460impl Handler<PySpyDump> for HostAgent {
1461    async fn handle(
1462        &mut self,
1463        cx: &Context<Self>,
1464        message: PySpyDump,
1465    ) -> Result<(), anyhow::Error> {
1466        PySpyWorker::spawn_and_forward(cx, message.opts, message.result)
1467    }
1468}
1469
1470#[async_trait]
1471impl Handler<PySpyProfile> for HostAgent {
1472    async fn handle(
1473        &mut self,
1474        cx: &Context<Self>,
1475        message: PySpyProfile,
1476    ) -> Result<(), anyhow::Error> {
1477        PySpyProfileWorker::spawn_and_forward(cx, message.request, message.result)
1478    }
1479}
1480
1481#[async_trait]
1482impl Handler<ConfigDump> for HostAgent {
1483    async fn handle(
1484        &mut self,
1485        cx: &Context<Self>,
1486        message: ConfigDump,
1487    ) -> Result<(), anyhow::Error> {
1488        let entries = hyperactor_config::global::config_entries();
1489        message.result.post(cx, ConfigDumpResult { entries });
1490        Ok(())
1491    }
1492}
1493
1494#[cfg(all(test, fbcode_build))]
1495mod tests {
1496    use std::assert_matches;
1497
1498    use hyperactor::ActorAddr;
1499    use hyperactor::Proc;
1500    use hyperactor::channel::ChannelTransport;
1501    use hyperactor::id::Label;
1502
1503    use super::*;
1504    use crate::bootstrap::ProcStatus;
1505    use crate::mesh_id::ResourceId;
1506    use crate::resource::CreateOrUpdateClient;
1507    use crate::resource::GetStateClient;
1508    use crate::resource::WaitRankStatusClient;
1509
1510    #[tokio::test]
1511    async fn test_basic() {
1512        let host = Host::new(
1513            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1514            ChannelTransport::Unix.any(),
1515        )
1516        .await
1517        .unwrap();
1518
1519        let host_addr = host.addr().clone();
1520        let system_proc = host.system_proc().clone();
1521        let host_agent = system_proc
1522            .spawn(
1523                HOST_MESH_AGENT_ACTOR_NAME,
1524                HostAgent::new(HostAgentMode::Process {
1525                    host,
1526                    shutdown_tx: None,
1527                }),
1528            )
1529            .unwrap();
1530
1531        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1532        let (client, _client_handle) = client_proc.client("client").unwrap();
1533
1534        let id = ResourceId::instance(Label::new("proc1").unwrap());
1535
1536        // First, create the proc, then query its state:
1537
1538        host_agent
1539            .create_or_update(
1540                &client,
1541                id.clone(),
1542                resource::Rank::new(0),
1543                ProcSpec::default(),
1544            )
1545            .await
1546            .unwrap();
1547        assert_matches!(
1548            host_agent.get_state(&client, id.clone()).await.unwrap(),
1549            resource::State {
1550                id: resource_id,
1551                status: resource::Status::Running,
1552                state: Some(ProcState {
1553                    // The proc itself should be direct addressed, with its name directly.
1554                    proc_id,
1555                    // The mesh agent should run in the same proc, under the name
1556                    // "proc_agent".
1557                    mesh_agent,
1558                    bootstrap_command,
1559                    proc_status: Some(ProcStatus::Ready { started_at: _, addr: _, agent: proc_status_mesh_agent}),
1560                    ..
1561                }),
1562                ..
1563            } if id == resource_id
1564              && proc_id == id.proc_addr(host_addr.clone())
1565              && mesh_agent == ActorRef::attest(id.proc_addr(host_addr.clone()).actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME)) && bootstrap_command == Some(BootstrapCommand::test())
1566              && mesh_agent == proc_status_mesh_agent
1567        );
1568    }
1569
1570    /// WaitRankStatus on a running proc replies immediately with Running.
1571    #[tokio::test]
1572    async fn test_wait_rank_status_already_running() {
1573        let host = Host::new(
1574            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1575            ChannelTransport::Unix.any(),
1576        )
1577        .await
1578        .unwrap();
1579
1580        let system_proc = host.system_proc().clone();
1581        let host_agent = system_proc
1582            .spawn(
1583                HOST_MESH_AGENT_ACTOR_NAME,
1584                HostAgent::new(HostAgentMode::Process {
1585                    host,
1586                    shutdown_tx: None,
1587                }),
1588            )
1589            .unwrap();
1590
1591        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1592        let (client, _client_handle) = client_proc.client("client").unwrap();
1593
1594        let id = ResourceId::instance(Label::new("proc1").unwrap());
1595        host_agent
1596            .create_or_update(
1597                &client,
1598                id.clone(),
1599                resource::Rank::new(0),
1600                ProcSpec::default(),
1601            )
1602            .await
1603            .unwrap();
1604
1605        // Proc is Running; wait for Running should reply immediately.
1606        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1607        host_agent
1608            .wait_rank_status(&client, id, resource::Status::Running, port.bind())
1609            .await
1610            .unwrap();
1611
1612        let overlay = tokio::time::timeout(Duration::from_secs(5), rx.recv())
1613            .await
1614            .expect("reply timed out")
1615            .expect("reply channel closed");
1616        assert!(!overlay.is_empty(), "expected non-empty overlay");
1617    }
1618
1619    /// WaitRankStatus for Stopped, then stop the proc — reply should
1620    /// arrive only after the proc actually stops.
1621    #[tokio::test]
1622    async fn test_wait_rank_status_stop() {
1623        let host = Host::new(
1624            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1625            ChannelTransport::Unix.any(),
1626        )
1627        .await
1628        .unwrap();
1629
1630        let system_proc = host.system_proc().clone();
1631        let host_agent = system_proc
1632            .spawn(
1633                HOST_MESH_AGENT_ACTOR_NAME,
1634                HostAgent::new(HostAgentMode::Process {
1635                    host,
1636                    shutdown_tx: None,
1637                }),
1638            )
1639            .unwrap();
1640
1641        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1642        let (client, _client_handle) = client_proc.client("client").unwrap();
1643
1644        let id = ResourceId::instance(Label::new("proc1").unwrap());
1645        host_agent
1646            .create_or_update(
1647                &client,
1648                id.clone(),
1649                resource::Rank::new(0),
1650                ProcSpec::default(),
1651            )
1652            .await
1653            .unwrap();
1654
1655        // Wait for Stopped — should not reply yet.
1656        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1657        host_agent
1658            .wait_rank_status(&client, id.clone(), resource::Status::Stopped, port.bind())
1659            .await
1660            .unwrap();
1661
1662        // Stop the proc.
1663        crate::resource::StopClient::stop(&host_agent, &client, id, "test".to_string())
1664            .await
1665            .unwrap();
1666
1667        // Now the reply should arrive.
1668        let overlay = tokio::time::timeout(Duration::from_secs(30), rx.recv())
1669            .await
1670            .expect("reply timed out — proc did not reach Stopped")
1671            .expect("reply channel closed");
1672        assert!(!overlay.is_empty(), "expected non-empty overlay");
1673    }
1674
1675    /// WaitRankStatus sent before the proc is created — the waiter is
1676    /// stashed and replied to once CreateOrUpdate runs.
1677    #[tokio::test]
1678    async fn test_wait_rank_status_before_proc_exists() {
1679        let host = Host::new(
1680            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1681            ChannelTransport::Unix.any(),
1682        )
1683        .await
1684        .unwrap();
1685
1686        let system_proc = host.system_proc().clone();
1687        let host_agent = system_proc
1688            .spawn(
1689                HOST_MESH_AGENT_ACTOR_NAME,
1690                HostAgent::new(HostAgentMode::Process {
1691                    host,
1692                    shutdown_tx: None,
1693                }),
1694            )
1695            .unwrap();
1696
1697        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1698        let (client, _client_handle) = client_proc.client("client").unwrap();
1699
1700        let id = ResourceId::instance(Label::new("proc1").unwrap());
1701
1702        // Wait for Running on a proc that doesn't exist yet.
1703        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1704        host_agent
1705            .wait_rank_status(&client, id.clone(), resource::Status::Running, port.bind())
1706            .await
1707            .unwrap();
1708
1709        // Now create the proc — the stashed waiter should get its
1710        // sentinel rank fixed and be flushed once the proc is Running.
1711        host_agent
1712            .create_or_update(&client, id, resource::Rank::new(0), ProcSpec::default())
1713            .await
1714            .unwrap();
1715
1716        let overlay = tokio::time::timeout(Duration::from_secs(10), rx.recv())
1717            .await
1718            .expect("reply timed out — waiter was not flushed after CreateOrUpdate")
1719            .expect("reply channel closed");
1720        assert!(!overlay.is_empty(), "expected non-empty overlay");
1721    }
1722
1723    /// DrainHost with a host_mesh_id filter only stops procs
1724    /// belonging to that mesh; procs from other meshes are unaffected.
1725    #[tokio::test]
1726    async fn test_drain_scoped_to_host_mesh_id() {
1727        let host = Host::new(
1728            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1729            ChannelTransport::Unix.any(),
1730        )
1731        .await
1732        .unwrap();
1733
1734        let system_proc = host.system_proc().clone();
1735        let host_agent = system_proc
1736            .spawn(
1737                HOST_MESH_AGENT_ACTOR_NAME,
1738                HostAgent::new(HostAgentMode::Process {
1739                    host,
1740                    shutdown_tx: None,
1741                }),
1742            )
1743            .unwrap();
1744
1745        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1746        let (client, _client_handle) = client_proc.client("client").unwrap();
1747
1748        let mesh_a = HostMeshId::instance(Label::new("mesh-a").unwrap());
1749        let mesh_b = HostMeshId::instance(Label::new("mesh-b").unwrap());
1750        let proc_a_id = ResourceId::instance(Label::new("proc-a").unwrap());
1751        let proc_b_id = ResourceId::instance(Label::new("proc-b").unwrap());
1752
1753        // Create proc_a belonging to mesh_a.
1754        let spec_a = ProcSpec {
1755            host_mesh_id: Some(mesh_a.clone()),
1756            ..Default::default()
1757        };
1758        host_agent
1759            .create_or_update(&client, proc_a_id.clone(), resource::Rank::new(0), spec_a)
1760            .await
1761            .unwrap();
1762
1763        // Create proc_b belonging to mesh_b.
1764        let spec_b = ProcSpec {
1765            host_mesh_id: Some(mesh_b.clone()),
1766            ..Default::default()
1767        };
1768        host_agent
1769            .create_or_update(&client, proc_b_id.clone(), resource::Rank::new(1), spec_b)
1770            .await
1771            .unwrap();
1772
1773        // Both should be Running.
1774        assert_matches!(
1775            host_agent
1776                .get_state(&client, proc_a_id.clone())
1777                .await
1778                .unwrap(),
1779            resource::State {
1780                status: resource::Status::Running,
1781                ..
1782            }
1783        );
1784        assert_matches!(
1785            host_agent
1786                .get_state(&client, proc_b_id.clone())
1787                .await
1788                .unwrap(),
1789            resource::State {
1790                status: resource::Status::Running,
1791                ..
1792            }
1793        );
1794
1795        // Drain only mesh_a.
1796        host_agent
1797            .drain_host(&client, Duration::from_secs(5), 16, Some(mesh_a.clone()))
1798            .await
1799            .unwrap();
1800
1801        // proc_a should be gone (removed from created).
1802        assert_matches!(
1803            host_agent
1804                .get_state(&client, proc_a_id.clone())
1805                .await
1806                .unwrap(),
1807            resource::State {
1808                status: resource::Status::NotExist,
1809                ..
1810            }
1811        );
1812
1813        // proc_b should still be Running.
1814        assert_matches!(
1815            host_agent
1816                .get_state(&client, proc_b_id.clone())
1817                .await
1818                .unwrap(),
1819            resource::State {
1820                status: resource::Status::Running,
1821                ..
1822            }
1823        );
1824    }
1825
1826    /// DrainHost with host_mesh_id=None drains all procs regardless
1827    /// of their mesh affiliation (backwards compatibility).
1828    #[tokio::test]
1829    async fn test_drain_none_drains_all() {
1830        let host = Host::new(
1831            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1832            ChannelTransport::Unix.any(),
1833        )
1834        .await
1835        .unwrap();
1836
1837        let system_proc = host.system_proc().clone();
1838        let host_agent = system_proc
1839            .spawn(
1840                HOST_MESH_AGENT_ACTOR_NAME,
1841                HostAgent::new(HostAgentMode::Process {
1842                    host,
1843                    shutdown_tx: None,
1844                }),
1845            )
1846            .unwrap();
1847
1848        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1849        let (client, _client_handle) = client_proc.client("client").unwrap();
1850
1851        let mesh_a = HostMeshId::instance(Label::new("mesh-a").unwrap());
1852        let mesh_b = HostMeshId::instance(Label::new("mesh-b").unwrap());
1853        let proc_a_id = ResourceId::instance(Label::new("proc-a").unwrap());
1854        let proc_b_id = ResourceId::instance(Label::new("proc-b").unwrap());
1855
1856        let spec_a = ProcSpec {
1857            host_mesh_id: Some(mesh_a),
1858            ..Default::default()
1859        };
1860        host_agent
1861            .create_or_update(&client, proc_a_id.clone(), resource::Rank::new(0), spec_a)
1862            .await
1863            .unwrap();
1864
1865        let spec_b = ProcSpec {
1866            host_mesh_id: Some(mesh_b),
1867            ..Default::default()
1868        };
1869        host_agent
1870            .create_or_update(&client, proc_b_id.clone(), resource::Rank::new(1), spec_b)
1871            .await
1872            .unwrap();
1873
1874        // Drain all (no filter).
1875        host_agent
1876            .drain_host(&client, Duration::from_secs(5), 16, None)
1877            .await
1878            .unwrap();
1879
1880        // Both should be gone.
1881        assert_matches!(
1882            host_agent.get_state(&client, proc_a_id).await.unwrap(),
1883            resource::State {
1884                status: resource::Status::NotExist,
1885                ..
1886            }
1887        );
1888        assert_matches!(
1889            host_agent.get_state(&client, proc_b_id).await.unwrap(),
1890            resource::State {
1891                status: resource::Status::NotExist,
1892                ..
1893            }
1894        );
1895    }
1896
1897    // PD-6/PD-8 regression: QueryChild(Proc) on the service proc
1898    // returns non-zero queue stats after the host_agent has handled
1899    // messages. Guards against the bug where the HostAgent closure
1900    // defaulted queue stats to zero because it predated Proc-level
1901    // queue accessors.
1902    #[tokio::test]
1903    async fn test_service_proc_query_child_has_queue_stats() {
1904        use hyperactor::actor::ActorStatus;
1905        use hyperactor::introspect::IntrospectMessage;
1906        use hyperactor::introspect::IntrospectResult;
1907
1908        let host = Host::new(
1909            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1910            ChannelTransport::Unix.any(),
1911        )
1912        .await
1913        .unwrap();
1914
1915        let system_proc = host.system_proc().clone();
1916        let host_agent = system_proc
1917            .spawn(
1918                HOST_MESH_AGENT_ACTOR_NAME,
1919                HostAgent::new(HostAgentMode::Process {
1920                    host,
1921                    shutdown_tx: None,
1922                }),
1923            )
1924            .unwrap();
1925
1926        // Wait for HostAgent to finish init.
1927        host_agent
1928            .status()
1929            .wait_for(|s| matches!(s, ActorStatus::Idle))
1930            .await
1931            .unwrap();
1932
1933        let client_proc =
1934            Proc::direct(ChannelTransport::Unix.any(), "qd_client".to_string()).unwrap();
1935        let (client, _client_handle) = client_proc.client("client").unwrap();
1936
1937        // Spawn a proc so the host_agent processes at least one
1938        // CreateOrUpdate message, which goes through the work queue.
1939        let name = ResourceId::instance(Label::new("qd_test_proc").unwrap());
1940        host_agent
1941            .create_or_update(
1942                &client,
1943                name.clone(),
1944                resource::Rank::new(0),
1945                ProcSpec::default(),
1946            )
1947            .await
1948            .unwrap();
1949
1950        // The host_agent has now processed messages on the service
1951        // proc. Query the service proc's introspection.
1952        let agent_ref = system_proc
1953            .proc_addr()
1954            .actor_addr(HOST_MESH_AGENT_ACTOR_NAME);
1955        let agent_id: ActorAddr = agent_ref;
1956        let port = PortRef::<IntrospectMessage>::attest_handler_port(&agent_id);
1957
1958        // Poll until we see non-zero watermark (evidence of queue
1959        // traffic since startup).
1960        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(10);
1961        loop {
1962            let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1963            port.post(
1964                &client,
1965                IntrospectMessage::QueryChild {
1966                    child_ref: Addr::Proc(system_proc.proc_addr().clone()),
1967                    reply: reply_port.bind(),
1968                },
1969            );
1970            let payload = tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx.recv())
1971                .await
1972                .expect("QueryChild timed out")
1973                .expect("reply channel closed");
1974
1975            let attrs: hyperactor_config::Attrs =
1976                serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
1977
1978            let hwm = attrs
1979                .get(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK)
1980                .copied()
1981                .unwrap_or(0);
1982            let last_nonzero: Option<u64> = attrs
1983                .get(crate::introspect::LAST_NONZERO_QUEUE_DEPTH_AGE_MS)
1984                .copied()
1985                .flatten();
1986
1987            if hwm > 0 {
1988                // The service proc's watermark should reflect
1989                // the messages the host_agent processed.
1990                assert!(
1991                    last_nonzero.is_some(),
1992                    "last-nonzero should be Some when watermark is {hwm}",
1993                );
1994                break;
1995            }
1996
1997            assert!(
1998                tokio::time::Instant::now() < deadline,
1999                "timed out waiting for service proc watermark > 0",
2000            );
2001            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2002        }
2003    }
2004}