hyperactor_mesh/host_mesh/
host_agent.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! The mesh agent actor that manages a host.
10
11// EnumAsInner generates code that triggers a false positive
12// unused_assignments lint on struct variant fields. #[allow] on the
13// enum itself doesn't propagate into derive-macro-generated code, so
14// the suppression must be at module scope.
15#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::fmt;
19use std::pin::Pin;
20use std::str::FromStr;
21use std::sync::OnceLock;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use hyperactor::Actor;
26use hyperactor::ActorHandle;
27use hyperactor::Context;
28use hyperactor::HandleClient;
29use hyperactor::Handler;
30use hyperactor::Instance;
31use hyperactor::PortHandle;
32use hyperactor::Proc;
33use hyperactor::RefClient;
34use hyperactor::channel::ChannelTransport;
35use hyperactor::context;
36use hyperactor::context::Mailbox as _;
37use hyperactor::host::Host;
38use hyperactor::host::HostError;
39use hyperactor::host::LOCAL_PROC_NAME;
40use hyperactor::host::LocalProcManager;
41use hyperactor::host::SERVICE_PROC_NAME;
42use hyperactor::mailbox::MailboxServerHandle;
43use hyperactor::mailbox::PortSender as _;
44use hyperactor::reference as hyperactor_reference;
45use hyperactor_config::Flattrs;
46use hyperactor_config::attrs::Attrs;
47use serde::Deserialize;
48use serde::Serialize;
49use tokio::time::Duration;
50use typeuri::Named;
51
52use crate::Name;
53use crate::bootstrap;
54use crate::bootstrap::BootstrapCommand;
55use crate::bootstrap::BootstrapProcConfig;
56use crate::bootstrap::BootstrapProcManager;
57use crate::mesh_admin::MeshAdminMessageClient;
58use crate::proc_agent::ProcAgent;
59use crate::resource;
60use crate::resource::ProcSpec;
61
/// Typed host-node identifier for mesh admin navigation.
///
/// Wraps a [`hyperactor_reference::ActorId`] (the `HostAgent`'s actor id)
/// and renders, via its `Display` impl, with a `host:` prefix so that the
/// admin resolver can distinguish host-level references from plain actor
/// references.
/// The same `HostAgent` `ActorId` can appear as both a host
/// (from root's children) and as an actor (from a proc's children);
/// `HostId` makes the host case unambiguous.
///
/// The textual form is parsed back by the `FromStr` impl, which requires
/// the same prefix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct HostId(pub hyperactor_reference::ActorId);
72
/// Prefix used by [`HostId`] for display/parse round-tripping.
///
/// `Display` writes it ahead of the wrapped actor id; `FromStr` requires it
/// and strips it before parsing the remainder as an actor id.
const HOST_ID_PREFIX: &str = "host:";
75
76impl fmt::Display for HostId {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        write!(f, "{HOST_ID_PREFIX}{}", self.0)
79    }
80}
81
82impl FromStr for HostId {
83    type Err = anyhow::Error;
84
85    fn from_str(s: &str) -> Result<Self, Self::Err> {
86        let inner = s
87            .strip_prefix(HOST_ID_PREFIX)
88            .ok_or_else(|| anyhow::anyhow!("not a host reference: {}", s))?;
89        let actor_id: hyperactor_reference::ActorId = inner
90            .parse()
91            .map_err(|e| anyhow::anyhow!("invalid actor id in host ref '{}': {}", s, e))?;
92        Ok(HostId(actor_id))
93    }
94}
95
/// Boxed, pinned, `Send` future returned by a [`ProcManagerSpawnFn`]; it
/// resolves to the spawned [`ProcAgent`] handle or an error.
pub(crate) type ProcManagerSpawnFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<ActorHandle<ProcAgent>>> + Send>>;
/// Boxed `Send + Sync` spawn function used as the type parameter of
/// [`LocalProcManager`] in [`HostAgentMode::Local`]: given a [`Proc`], it
/// returns a [`ProcManagerSpawnFuture`].
pub(crate) type ProcManagerSpawnFn = Box<dyn Fn(Proc) -> ProcManagerSpawnFuture + Send + Sync>;
99
/// Represents the different ways a [`Host`] can be managed by an agent.
///
/// A host can either:
/// - [`Process`](HostAgentMode::Process) — a host running as an external OS
///   process, managed by [`BootstrapProcManager`].
/// - [`Local`](HostAgentMode::Local) — a host running in-process, managed by
///   [`LocalProcManager`] with a custom spawn function.
///
/// This abstraction lets the same `HostAgent` work across both
/// out-of-process and in-process execution modes.
#[derive(EnumAsInner)]
pub enum HostAgentMode {
    /// A host whose procs are managed by a [`BootstrapProcManager`].
    Process {
        /// The managed host.
        host: Host<BootstrapProcManager>,
        /// If set, the ShutdownHost handler sends the frontend mailbox server
        /// handle back to the bootstrap loop via this channel once shutdown is
        /// complete, so the caller can drain it and exit.
        shutdown_tx: Option<tokio::sync::oneshot::Sender<MailboxServerHandle>>,
    },
    /// A host whose procs are managed by a [`LocalProcManager`] driven by a
    /// [`ProcManagerSpawnFn`].
    Local(Host<LocalProcManager<ProcManagerSpawnFn>>),
}
121
122impl HostAgentMode {
123    pub(crate) fn addr(&self) -> &hyperactor::channel::ChannelAddr {
124        #[allow(clippy::match_same_arms)]
125        match self {
126            HostAgentMode::Process { host, .. } => host.addr(),
127            HostAgentMode::Local(host) => host.addr(),
128        }
129    }
130
131    pub(crate) fn system_proc(&self) -> &Proc {
132        #[allow(clippy::match_same_arms)]
133        match self {
134            HostAgentMode::Process { host, .. } => host.system_proc(),
135            HostAgentMode::Local(host) => host.system_proc(),
136        }
137    }
138
139    pub(crate) fn local_proc(&self) -> &Proc {
140        #[allow(clippy::match_same_arms)]
141        match self {
142            HostAgentMode::Process { host, .. } => host.local_proc(),
143            HostAgentMode::Local(host) => host.local_proc(),
144        }
145    }
146
147    /// Non-blocking stop: send the stop signal and spawn a background
148    /// task for cleanup. Returns immediately without blocking the
149    /// actor.
150    async fn request_stop(
151        &self,
152        cx: &impl context::Actor,
153        proc: &hyperactor_reference::ProcId,
154        timeout: Duration,
155        reason: &str,
156    ) {
157        match self {
158            HostAgentMode::Process { host, .. } => {
159                host.manager().request_stop(cx, proc, timeout, reason).await;
160            }
161            HostAgentMode::Local(host) => {
162                host.manager().request_stop(proc, timeout, reason).await;
163            }
164        }
165    }
166
167    /// Query a proc's lifecycle state, returning both the coarse
168    /// `resource::Status` used by the resource protocol and the
169    /// detailed `bootstrap::ProcStatus` (when available) for callers
170    /// that need process-level detail such as PIDs or exit codes.
171    async fn proc_status(
172        &self,
173        proc_id: &hyperactor_reference::ProcId,
174    ) -> (resource::Status, Option<bootstrap::ProcStatus>) {
175        match self {
176            HostAgentMode::Process { host, .. } => match host.manager().status(proc_id).await {
177                Some(proc_status) => (proc_status.clone().into(), Some(proc_status)),
178                None => (resource::Status::Unknown, None),
179            },
180            HostAgentMode::Local(host) => {
181                let status = match host.manager().local_proc_status(proc_id).await {
182                    Some(hyperactor::host::LocalProcStatus::Stopping) => resource::Status::Stopping,
183                    Some(hyperactor::host::LocalProcStatus::Stopped) => resource::Status::Stopped,
184                    None => resource::Status::Running,
185                };
186                (status, None)
187            }
188        }
189    }
190
191    /// The bootstrap command used by the process manager, if any.
192    fn bootstrap_command(&self) -> Option<BootstrapCommand> {
193        match self {
194            HostAgentMode::Process { host, .. } => Some(host.manager().command().clone()),
195            HostAgentMode::Local(_) => None,
196        }
197    }
198}
199
/// Outcome of one proc creation request, as recorded in
/// `HostAgent::created` by the `CreateOrUpdate` handler.
#[derive(Debug)]
pub(crate) struct ProcCreationState {
    /// Rank supplied with the creation request.
    pub(crate) rank: usize,
    /// Result of the spawn: the new proc's id and a reference to its
    /// [`ProcAgent`] on success, or the [`HostError`] returned by the host.
    pub(crate) created: Result<
        (
            hyperactor_reference::ProcId,
            hyperactor_reference::ActorRef<ProcAgent>,
        ),
        HostError,
    >,
}
211
/// Actor name used when spawning the host mesh agent ([`HostAgent`]) on the
/// system proc.
pub const HOST_MESH_AGENT_ACTOR_NAME: &str = "host_agent";
214
/// A mesh agent is responsible for managing a host in a [`HostMesh`],
/// through the resource behaviors defined in [`crate::resource`].
#[hyperactor::export(
    handlers=[
        resource::CreateOrUpdate<ProcSpec>,
        resource::Stop,
        resource::GetState<ProcState>,
        resource::GetRankStatus { cast = true },
        resource::List,
        ShutdownHost,
        SpawnMeshAdmin,
        SetClientConfig,
    ]
)]
pub struct HostAgent {
    /// The managed host. `Some` from construction until the `ShutdownHost`
    /// handler takes it, after which it stays `None`.
    pub(crate) host: Option<HostAgentMode>,
    /// Creation outcome of every proc requested through `CreateOrUpdate`,
    /// keyed by proc name. Failed spawns are recorded too.
    pub(crate) created: HashMap<Name, ProcCreationState>,
    /// Lazily initialized ProcAgent on the host's local proc.
    /// Boots on first [`GetLocalProc`] (LP-1 — see
    /// `hyperactor::host::LOCAL_PROC_NAME`).
    local_mesh_agent: OnceLock<anyhow::Result<ActorHandle<ProcAgent>>>,
    /// Handle to the host's frontend mailbox server, set during `init` after
    /// `this.bind::<Self>()` ensures the actor port is registered before the
    /// mailbox starts routing messages. Sent back to the bootstrap loop via
    /// `shutdown_tx` when the host shuts down so the caller can drain it.
    mailbox_handle: Option<MailboxServerHandle>,
}
242
243impl HostAgent {
244    /// Create a new host mesh agent running in the provided mode.
245    pub fn new(host: HostAgentMode) -> Self {
246        Self {
247            host: Some(host),
248            created: HashMap::new(),
249            local_mesh_agent: OnceLock::new(),
250            mailbox_handle: None,
251        }
252    }
253
254    /// Publish the current host properties and children list for
255    /// introspection. Called from init and after each state change
256    /// (proc created/stopped).
257    fn publish_introspect_properties(&self, cx: &Instance<Self>) {
258        let host = match self.host.as_ref() {
259            Some(h) => h,
260            None => return, // host shut down
261        };
262
263        let addr = host.addr().to_string();
264        let mut children = Vec::new();
265        let system_children = Vec::new();
266
267        // Procs are not system — only actors are. Both service and
268        // local appear as regular children; 's' in the TUI toggles
269        // actor visibility, not proc visibility.
270        let sys_ref = host.system_proc().proc_id().to_string();
271        let local_ref = host.local_proc().proc_id().to_string();
272        children.push(sys_ref);
273        children.push(local_ref);
274
275        // User procs.
276        for state in self.created.values() {
277            if let Ok((proc_id, _agent_ref)) = &state.created {
278                children.push(proc_id.to_string());
279            }
280        }
281
282        let num_procs = children.len();
283
284        let mut attrs = hyperactor_config::Attrs::new();
285        attrs.set(crate::introspect::NODE_TYPE, "host".to_string());
286        attrs.set(crate::introspect::ADDR, addr);
287        attrs.set(crate::introspect::NUM_PROCS, num_procs);
288        attrs.set(hyperactor::introspect::CHILDREN, children);
289        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
290        cx.publish_attrs(attrs);
291    }
292}
293
294#[async_trait]
295impl Actor for HostAgent {
296    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
297        // Serve the host now that the agent is initialized. Make sure our port is
298        // bound before serving.
299        this.bind::<Self>();
300        match self.host.as_mut().unwrap() {
301            HostAgentMode::Process { host, .. } => {
302                self.mailbox_handle = host.serve();
303                let (directory, file) = hyperactor_telemetry::log_file_path(
304                    hyperactor_telemetry::env::Env::current(),
305                    None,
306                )
307                .unwrap();
308                eprintln!(
309                    "Monarch internal logs are being written to {}/{}.log; execution id {}",
310                    directory,
311                    file,
312                    hyperactor_telemetry::env::execution_id(),
313                );
314            }
315            HostAgentMode::Local(host) => {
316                host.serve();
317            }
318        };
319        this.set_system();
320        self.publish_introspect_properties(this);
321
322        // Register callback for QueryChild — resolves system procs
323        // that are not independently addressable actors.
324        let host = self.host.as_ref().expect("host present");
325        let system_proc = host.system_proc().clone();
326        let local_proc = host.local_proc().clone();
327        let self_id = this.self_id().clone();
328        this.set_query_child_handler(move |child_ref| {
329            use hyperactor::introspect::IntrospectResult;
330
331            let proc = match child_ref {
332                hyperactor::reference::Reference::Proc(proc_id) => {
333                    if *proc_id == *system_proc.proc_id() {
334                        Some((&system_proc, SERVICE_PROC_NAME))
335                    } else if *proc_id == *local_proc.proc_id() {
336                        Some((&local_proc, LOCAL_PROC_NAME))
337                    } else {
338                        None
339                    }
340                }
341                _ => None,
342            };
343
344            match proc {
345                Some((proc, label)) => {
346                    // Use all_instance_keys() instead of
347                    // all_actor_ids() to avoid holding DashMap shard
348                    // read locks while doing Weak::upgrade() +
349                    // watch::borrow() + is_terminal() per entry.
350                    // Under rapid actor churn the per-entry work in
351                    // all_actor_ids() causes convoy starvation with
352                    // concurrent insert/remove operations, stalling
353                    // the spawn/exit path. all_instance_keys() just
354                    // clones keys — microseconds per shard. The
355                    // is_system check uses individual point lookups
356                    // outside the iteration. Stale keys (terminal
357                    // actors) may appear but are harmless — the TUI
358                    // handles "not found" gracefully.
359                    let all_keys = proc.all_instance_keys();
360                    let mut actors = Vec::with_capacity(all_keys.len());
361                    let mut system_actors = Vec::new();
362                    for id in all_keys {
363                        let ref_str = id.to_string();
364                        if proc.get_instance(&id).is_some_and(|cell| cell.is_system()) {
365                            system_actors.push(ref_str.clone());
366                        }
367                        actors.push(ref_str);
368                    }
369                    // Build attrs for this proc node.
370                    let mut attrs = hyperactor_config::Attrs::new();
371                    attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
372                    attrs.set(crate::introspect::PROC_NAME, label.to_string());
373                    attrs.set(crate::introspect::NUM_ACTORS, actors.len());
374                    attrs.set(crate::introspect::SYSTEM_CHILDREN, system_actors.clone());
375                    let attrs_json =
376                        serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
377
378                    IntrospectResult {
379                        identity: proc.proc_id().to_string(),
380                        attrs: attrs_json,
381                        children: actors,
382                        parent: Some(HostId(self_id.clone()).to_string()),
383                        as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
384                            .to_string(),
385                    }
386                }
387                None => {
388                    let mut error_attrs = hyperactor_config::Attrs::new();
389                    error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
390                    error_attrs.set(
391                        hyperactor::introspect::ERROR_MESSAGE,
392                        format!("child {} not found", child_ref),
393                    );
394                    IntrospectResult {
395                        identity: String::new(),
396                        attrs: serde_json::to_string(&error_attrs)
397                            .unwrap_or_else(|_| "{}".to_string()),
398                        children: Vec::new(),
399                        parent: None,
400                        as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
401                            .to_string(),
402                    }
403                }
404            }
405        });
406
407        Ok(())
408    }
409}
410
411impl fmt::Debug for HostAgent {
412    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413        f.debug_struct("HostAgent")
414            .field("host", &"..")
415            .field("created", &self.created)
416            .finish()
417    }
418}
419
420#[async_trait]
421impl Handler<resource::CreateOrUpdate<ProcSpec>> for HostAgent {
422    #[tracing::instrument("HostAgent::CreateOrUpdate", level = "info", skip_all, fields(name=%create_or_update.name))]
423    async fn handle(
424        &mut self,
425        cx: &Context<Self>,
426        create_or_update: resource::CreateOrUpdate<ProcSpec>,
427    ) -> anyhow::Result<()> {
428        if self.created.contains_key(&create_or_update.name) {
429            // Already created: there is no update.
430            return Ok(());
431        }
432
433        let host = self.host.as_mut().expect("host present");
434        let created = match host {
435            HostAgentMode::Process { host, .. } => {
436                host.spawn(
437                    create_or_update.name.clone().to_string(),
438                    BootstrapProcConfig {
439                        create_rank: create_or_update.rank.unwrap(),
440                        client_config_override: create_or_update
441                            .spec
442                            .client_config_override
443                            .clone(),
444                    },
445                )
446                .await
447            }
448            HostAgentMode::Local(host) => {
449                host.spawn(create_or_update.name.clone().to_string(), ())
450                    .await
451            }
452        };
453
454        if let Err(e) = &created {
455            tracing::error!("failed to spawn proc {}: {}", create_or_update.name, e);
456        }
457        self.created.insert(
458            create_or_update.name.clone(),
459            ProcCreationState {
460                rank: create_or_update.rank.unwrap(),
461                created,
462            },
463        );
464
465        self.publish_introspect_properties(cx);
466        Ok(())
467    }
468}
469
470#[async_trait]
471impl Handler<resource::Stop> for HostAgent {
472    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
473        tracing::info!(
474            name = "HostMeshAgentStatus",
475            proc_name = %message.name,
476            reason = %message.reason,
477            "stopping proc"
478        );
479        let host = self
480            .host
481            .as_ref()
482            .ok_or(anyhow::anyhow!("HostAgent has already shut down"))?;
483        let timeout = hyperactor_config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
484
485        if let Some(ProcCreationState {
486            created: Ok((proc_id, _)),
487            ..
488        }) = self.created.get(&message.name)
489        {
490            host.request_stop(cx, proc_id, timeout, &message.reason)
491                .await;
492        }
493
494        self.publish_introspect_properties(cx);
495        Ok(())
496    }
497}
498
499#[async_trait]
500impl Handler<resource::GetRankStatus> for HostAgent {
501    async fn handle(
502        &mut self,
503        cx: &Context<Self>,
504        get_rank_status: resource::GetRankStatus,
505    ) -> anyhow::Result<()> {
506        use crate::StatusOverlay;
507        use crate::resource::Status;
508
509        let (rank, status) = match self.created.get(&get_rank_status.name) {
510            Some(ProcCreationState {
511                rank,
512                created: Ok((proc_id, _mesh_agent)),
513            }) => {
514                let status = match self.host.as_ref() {
515                    Some(host) => host.proc_status(proc_id).await.0,
516                    None => Status::Stopped,
517                };
518                (*rank, status)
519            }
520            Some(ProcCreationState {
521                rank,
522                created: Err(e),
523                ..
524            }) => (*rank, Status::Failed(e.to_string())),
525            None => (usize::MAX, Status::NotExist),
526        };
527
528        let overlay = if rank == usize::MAX {
529            StatusOverlay::new()
530        } else {
531            StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
532                .expect("valid single-run overlay")
533        };
534        let result = get_rank_status.reply.send(cx, overlay);
535        // Ignore errors, because returning Err from here would cause the HostAgent
536        // to be stopped, which would take down the entire host. This only means
537        // some actor that requested the rank status failed to receive it.
538        if let Err(e) = result {
539            tracing::warn!(
540                actor = %cx.self_id(),
541                "failed to send GetRankStatus reply to {} due to error: {}",
542                get_rank_status.reply.port_id().actor_id(),
543                e
544            );
545        }
546        Ok(())
547    }
548}
549
/// Message asking the [`HostAgent`] to terminate the host's child procs and
/// give up its host.
///
/// The handler acks first, then terminates the children.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct ShutdownHost {
    /// Grace window: send SIGTERM and wait this long before
    /// escalating.
    pub timeout: std::time::Duration,
    /// Max number of children to terminate concurrently on this host.
    /// The handler clamps this to `1..=256` for process-backed hosts and
    /// passes it through unchanged for local hosts.
    pub max_in_flight: usize,
    /// Ack that the agent finished shutdown work (best-effort).
    #[reply]
    pub ack: hyperactor::reference::PortRef<()>,
}
wirevalue::register_type!(ShutdownHost);
562
563#[async_trait]
564impl Handler<ShutdownHost> for HostAgent {
565    async fn handle(&mut self, cx: &Context<Self>, msg: ShutdownHost) -> anyhow::Result<()> {
566        // Ack immediately so caller can stop waiting.
567        let (return_handle, mut return_receiver) = cx.mailbox().open_port();
568        cx.mailbox()
569            .serialize_and_send(&msg.ack, (), return_handle)?;
570
571        let mut shutdown_tx = None;
572        if let Some(host_mode) = self.host.take() {
573            match host_mode {
574                HostAgentMode::Process {
575                    host,
576                    shutdown_tx: tx,
577                } => {
578                    let summary = host
579                        .terminate_children(
580                            cx,
581                            msg.timeout,
582                            msg.max_in_flight.clamp(1, 256),
583                            "shutdown host",
584                        )
585                        .await;
586                    tracing::info!(?summary, "terminated children on host");
587                    shutdown_tx = tx;
588                }
589                HostAgentMode::Local(host) => {
590                    let summary = host
591                        .terminate_children(cx, msg.timeout, msg.max_in_flight, "shutdown host")
592                        .await;
593                    tracing::info!(?summary, "terminated children on local host");
594                }
595            }
596        }
597
598        // If message is returned, it means it ack was not sent successfully.
599        if return_receiver.recv().await.is_ok() {
600            tracing::warn!("failed to send ack");
601        }
602
603        // Drop the host to release any resources that somehow survived.
604        let _ = self.host.take();
605
606        if let Some(tx) = shutdown_tx {
607            tracing::info!(
608                proc_id = %cx.self_id().proc_id(),
609                actor_id = %cx.self_id(),
610                "host is shut down, sending mailbox handle to bootstrap for draining"
611            );
612            if let Some(handle) = self.mailbox_handle.take() {
613                let _ = tx.send(handle);
614            }
615        }
616
617        Ok(())
618    }
619}
620
/// State payload returned for a successfully created proc by the
/// `resource::GetState<ProcState>` handler.
#[derive(Debug, Clone, PartialEq, Eq, Named, Serialize, Deserialize)]
pub struct ProcState {
    /// Id of the proc.
    pub proc_id: hyperactor_reference::ProcId,
    /// Rank recorded when the proc was created.
    pub create_rank: usize,
    /// Reference to the proc's [`ProcAgent`].
    pub mesh_agent: hyperactor_reference::ActorRef<ProcAgent>,
    /// Bootstrap command of the host's process manager; `None` for local
    /// hosts or once the host has shut down.
    pub bootstrap_command: Option<BootstrapCommand>,
    /// Detailed process status when available; `None` for local hosts or
    /// once the host has shut down.
    pub proc_status: Option<bootstrap::ProcStatus>,
}
wirevalue::register_type!(ProcState);
630
631#[async_trait]
632impl Handler<resource::GetState<ProcState>> for HostAgent {
633    async fn handle(
634        &mut self,
635        cx: &Context<Self>,
636        get_state: resource::GetState<ProcState>,
637    ) -> anyhow::Result<()> {
638        let state = match self.created.get(&get_state.name) {
639            Some(ProcCreationState {
640                rank,
641                created: Ok((proc_id, mesh_agent)),
642            }) => {
643                let (status, proc_status, bootstrap_command) = match self.host.as_ref() {
644                    Some(host) => {
645                        let (status, proc_status) = host.proc_status(proc_id).await;
646                        (status, proc_status, host.bootstrap_command())
647                    }
648                    None => (resource::Status::Stopped, None, None),
649                };
650                resource::State {
651                    name: get_state.name.clone(),
652                    status,
653                    state: Some(ProcState {
654                        proc_id: proc_id.clone(),
655                        create_rank: *rank,
656                        mesh_agent: mesh_agent.clone(),
657                        bootstrap_command,
658                        proc_status,
659                    }),
660                }
661            }
662            Some(ProcCreationState {
663                created: Err(e), ..
664            }) => resource::State {
665                name: get_state.name.clone(),
666                status: resource::Status::Failed(e.to_string()),
667                state: None,
668            },
669            None => resource::State {
670                name: get_state.name.clone(),
671                status: resource::Status::NotExist,
672                state: None,
673            },
674        };
675
676        let result = get_state.reply.send(cx, state);
677        // Ignore errors, because returning Err from here would cause the HostAgent
678        // to be stopped, which would take down the entire host. This only means
679        // some actor that requested the state of a proc failed to receive it.
680        if let Err(e) = result {
681            tracing::warn!(
682                actor = %cx.self_id(),
683                "failed to send GetState reply to {} due to error: {}",
684                get_state.reply.port_id().actor_id(),
685                e
686            );
687        }
688        Ok(())
689    }
690}
691
692#[async_trait]
693impl Handler<resource::List> for HostAgent {
694    async fn handle(&mut self, cx: &Context<Self>, list: resource::List) -> anyhow::Result<()> {
695        list.reply
696            .send(cx, self.created.keys().cloned().collect())?;
697        Ok(())
698    }
699}
700
/// Message to spawn a [`crate::mesh_admin::MeshAdminAgent`] on this host's
/// system proc.
///
/// The handler spawns the admin agent, queries its HTTP address via
/// `GetAdminAddr`, and replies with the address string.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct SpawnMeshAdmin {
    /// All hosts in the mesh as `(address, agent_ref)` pairs. Passed
    /// through to `MeshAdminAgent::new` so the admin can fan out
    /// introspection queries to every host.
    pub hosts: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>,

    /// `ActorId` of the process-global root client, exposed as a
    /// child node in the admin introspection tree. `None` if no root
    /// client is available.
    pub root_client_actor_id: Option<hyperactor_reference::ActorId>,

    /// Explicit bind address for the admin HTTP server. When `None`,
    /// the server reads `MESH_ADMIN_ADDR` from config.
    pub admin_addr: Option<std::net::SocketAddr>,

    /// Reply port for the admin HTTP address string (e.g.
    /// `"myhost.facebook.com:8080"`).
    #[reply]
    pub addr: hyperactor::reference::PortRef<String>,
}
wirevalue::register_type!(SpawnMeshAdmin);
727
728#[async_trait]
729impl Handler<SpawnMeshAdmin> for HostAgent {
730    /// Spawns a [`MeshAdminAgent`] on this host's system proc, waits
731    /// for its HTTP server to bind, and replies with the listen
732    /// address.
733    async fn handle(&mut self, cx: &Context<Self>, msg: SpawnMeshAdmin) -> anyhow::Result<()> {
734        let proc = self
735            .host
736            .as_ref()
737            .ok_or_else(|| anyhow::anyhow!("host is not available"))?
738            .system_proc();
739
740        let agent_handle = proc.spawn(
741            crate::mesh_admin::MESH_ADMIN_ACTOR_NAME,
742            crate::mesh_admin::MeshAdminAgent::new(
743                msg.hosts,
744                msg.root_client_actor_id,
745                msg.admin_addr,
746            ),
747        )?;
748        let response = agent_handle.get_admin_addr(cx).await?;
749        let addr_str = response
750            .addr
751            .ok_or_else(|| anyhow::anyhow!("mesh admin agent did not report an address"))?;
752
753        msg.addr.send(cx, addr_str)?;
754        Ok(())
755    }
756}
757
/// Push client configuration overrides to this host agent's process.
///
/// The attrs are installed as `Source::ClientOverride` (lowest explicit
/// priority), so the host's own env vars and file config take precedence.
/// This message is idempotent — sending the same attrs twice replaces
/// the layer wholesale.
///
/// Request-reply: the reply acts as a barrier confirming the config
/// is installed. The caller should await with a timeout and treat
/// timeout as best-effort (log warning, continue).
#[derive(Debug, Named, Handler, RefClient, HandleClient, Serialize, Deserialize)]
pub struct SetClientConfig {
    /// Attributes installed as the `ClientOverride` config layer.
    pub attrs: Attrs,
    /// Reply port signalled with `()` after the attrs have been installed.
    #[reply]
    pub done: hyperactor_reference::PortRef<()>,
}
wirevalue::register_type!(SetClientConfig);
775
776#[async_trait]
777impl Handler<SetClientConfig> for HostAgent {
778    async fn handle(&mut self, cx: &Context<Self>, msg: SetClientConfig) -> anyhow::Result<()> {
779        // Use `set` (not `create_or_merge`) because `push_config` always
780        // sends a complete `propagatable_attrs()` snapshot. Replacing the
781        // layer wholesale is intentional and idempotent.
782        hyperactor_config::global::set(
783            hyperactor_config::global::Source::ClientOverride,
784            msg.attrs,
785        );
786        tracing::debug!("installed client config override on host agent");
787        msg.done.send(cx, ())?;
788        Ok(())
789    }
790}
791
/// Boot the ProcAgent on the host's local proc (LP-1).
///
/// The local proc starts empty; this message activates it by spawning
/// a `ProcAgent` (once, via `OnceLock`). Called by
/// `monarch_hyperactor::bootstrap_host` when setting up the Python
/// `this_proc()` singleton.
///
/// See also: `hyperactor::host::LOCAL_PROC_NAME`.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetLocalProc {
    /// Reply port. On success the `HostAgent` handler sends a clone of the
    /// local proc's `ProcAgent` handle here.
    #[reply]
    pub proc_mesh_agent: PortHandle<ActorHandle<ProcAgent>>,
}
805
806#[async_trait]
807impl Handler<GetLocalProc> for HostAgent {
808    async fn handle(
809        &mut self,
810        cx: &Context<Self>,
811        GetLocalProc { proc_mesh_agent }: GetLocalProc,
812    ) -> anyhow::Result<()> {
813        let agent = self.local_mesh_agent.get_or_init(|| {
814            ProcAgent::boot_v1(self.host.as_ref().unwrap().local_proc().clone(), None)
815        });
816
817        match agent {
818            Err(e) => anyhow::bail!("error booting local proc: {}", e),
819            Ok(agent) => proc_mesh_agent.send(cx, agent.clone())?,
820        };
821
822        Ok(())
823    }
824}
825
/// A trampoline actor that spawns a [`Host`], and sends a reference to the
/// corresponding [`HostAgent`] to the provided reply port.
///
/// This is used to bootstrap host meshes from proc meshes.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers=[GetHostMeshAgent]
)]
pub(crate) struct HostMeshAgentProcMeshTrampoline {
    /// Handle to the [`HostAgent`] spawned on the host's system proc when
    /// this trampoline is constructed.
    host_mesh_agent: ActorHandle<HostAgent>,
    /// Port that receives the bound [`HostAgent`] reference during `init`.
    reply_port: hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
}
839
840#[async_trait]
841impl Actor for HostMeshAgentProcMeshTrampoline {
842    async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
843        self.reply_port.send(this, self.host_mesh_agent.bind())?;
844        Ok(())
845    }
846}
847
848#[async_trait]
849impl hyperactor::RemoteSpawn for HostMeshAgentProcMeshTrampoline {
850    type Params = (
851        ChannelTransport,
852        hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
853        Option<BootstrapCommand>,
854        bool, /* local? */
855    );
856
857    async fn new(
858        (transport, reply_port, command, local): Self::Params,
859        _environment: Flattrs,
860    ) -> anyhow::Result<Self> {
861        let host = if local {
862            let spawn: ProcManagerSpawnFn =
863                Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
864            let manager = LocalProcManager::new(spawn);
865            let host = Host::new(manager, transport.any()).await?;
866            HostAgentMode::Local(host)
867        } else {
868            let command = match command {
869                Some(command) => command,
870                None => BootstrapCommand::current()?,
871            };
872            tracing::info!("booting host with proc command {:?}", command);
873            let manager = BootstrapProcManager::new(command).unwrap();
874            let host = Host::new(manager, transport.any()).await?;
875            HostAgentMode::Process {
876                host,
877                shutdown_tx: None,
878            }
879        };
880
881        let system_proc = host.system_proc().clone();
882        let host_mesh_agent =
883            system_proc.spawn(HOST_MESH_AGENT_ACTOR_NAME, HostAgent::new(host))?;
884
885        Ok(Self {
886            host_mesh_agent,
887            reply_port,
888        })
889    }
890}
891
/// Request for a reference to the [`HostAgent`] held by a
/// [`HostMeshAgentProcMeshTrampoline`].
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient)]
pub struct GetHostMeshAgent {
    /// Reply port. The trampoline's handler sends the bound [`HostAgent`]
    /// reference here.
    #[reply]
    pub host_mesh_agent: hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
}
wirevalue::register_type!(GetHostMeshAgent);
898
899#[async_trait]
900impl Handler<GetHostMeshAgent> for HostMeshAgentProcMeshTrampoline {
901    async fn handle(
902        &mut self,
903        cx: &Context<Self>,
904        get_host_mesh_agent: GetHostMeshAgent,
905    ) -> anyhow::Result<()> {
906        get_host_mesh_agent
907            .host_mesh_agent
908            .send(cx, self.host_mesh_agent.bind())?;
909        Ok(())
910    }
911}
912
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use hyperactor::Proc;
    use hyperactor::channel::ChannelTransport;

    use super::*;
    use crate::bootstrap::ProcStatus;
    use crate::resource::CreateOrUpdateClient;
    use crate::resource::GetStateClient;

    /// Creates a proc through a process-mode `HostAgent` and checks the
    /// state the agent reports for it.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_basic() {
        let manager = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
        let host = Host::new(manager, ChannelTransport::Unix.any())
            .await
            .unwrap();

        // Grab what is needed from the host before it moves into the agent.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let mode = HostAgentMode::Process {
            host,
            shutdown_tx: None,
        };
        let host_agent = system_proc
            .spawn(HOST_MESH_AGENT_ACTOR_NAME, HostAgent::new(mode))
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        let name = Name::new("proc1").unwrap();

        // Step 1: create the proc.
        host_agent
            .create_or_update(
                &client,
                name.clone(),
                resource::Rank::new(0),
                ProcSpec::default(),
            )
            .await
            .unwrap();

        // The proc itself should be direct addressed, with its name directly.
        let expected_proc_id =
            hyperactor_reference::ProcId::with_name(host_addr.clone(), name.to_string());
        // The mesh agent should run in the same proc, under the name
        // "proc_agent".
        let expected_mesh_agent = hyperactor_reference::ActorRef::attest(
            hyperactor_reference::ProcId::with_name(host_addr.clone(), name.to_string())
                .actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0),
        );
        let expected_command = Some(BootstrapCommand::test());

        // Step 2: query its state and compare against the expectations.
        assert_matches!(
            host_agent.get_state(&client, name.clone()).await.unwrap(),
            resource::State {
                name: resource_name,
                status: resource::Status::Running,
                state: Some(ProcState {
                    proc_id,
                    mesh_agent,
                    bootstrap_command,
                    proc_status: Some(ProcStatus::Ready {
                        started_at: _,
                        addr: _,
                        agent: proc_status_mesh_agent,
                    }),
                    ..
                }),
            } if name == resource_name
              && proc_id == expected_proc_id
              && mesh_agent == expected_mesh_agent
              && bootstrap_command == expected_command
              && mesh_agent == proc_status_mesh_agent
        );
    }
}