hyperactor_mesh/
mesh_admin.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mesh-level admin surface for topology introspection and reference
10//! walking.
11//!
12//! This module defines `MeshAdminAgent`, an actor that exposes a
13//! uniform, reference-based HTTP API over an entire host mesh. Every
14//! addressable entity in the mesh is represented as a `NodePayload`
15//! and resolved via an opaque reference string.
16//!
17//! Incoming HTTP requests are bridged into the actor message loop
18//! using `ResolveReferenceMessage`, ensuring that all topology
19//! resolution and data collection happens through actor messaging.
20//! The agent fans out to `HostAgent` instances to fetch host,
21//! proc, and actor details, then normalizes them into a single
22//! tree-shaped model (`NodeProperties` + children references)
23//! suitable for topology-agnostic clients such as the admin TUI.
24//!
25//! # Schema strategy
26//!
27//! The external API contract is schema-first: the JSON Schema
28//! (Draft 2020-12) served at `GET /v1/schema` is the
29//! authoritative definition of the response shape, derived
30//! directly from the Rust types (`NodePayload`,
31//! `NodeProperties`, `FailureInfo`) via `schemars::JsonSchema`.
32//! The error envelope schema is at `GET /v1/schema/error`.
33//!
34//! This follows the "Admin Gateway Pattern" RFC
35//! ([doc](https://fburl.com/1dvah88uutaiyesebojouen2)):
36//! schema is the product; transports and tooling are projections.
37//!
38//! ## Schema generation pipeline
39//!
40//! 1. `#[derive(JsonSchema)]` on `NodePayload`, `NodeProperties`,
41//!    `FailureInfo`, `ApiError`, `ApiErrorEnvelope`.
42//! 2. `schemars::schema_for!(T)` produces a `Schema` value at
43//!    runtime (Draft 2020-12).
44//! 3. The `serve_schema` / `serve_error_schema` handlers inject a
45//!    `$id` field (SC-4) and serve the result as JSON.
46//! 4. Snapshot tests in `introspect::tests` compare the raw
47//!    schemars output (without `$id`) against checked-in golden
48//!    files to detect drift (SC-2).
49//! 5. Validation tests confirm that real `NodePayload` samples
50//!    pass schema validation (SC-3).
51//!
52//! ## Regenerating snapshots
53//!
54//! After intentional type changes to `NodePayload`,
55//! `NodeProperties`, `FailureInfo`, `ApiError`, or
56//! `ApiErrorEnvelope`, regenerate the golden files:
57//!
58//! ```sh
59//! buck run fbcode//monarch/hyperactor_mesh:generate_api_artifacts \
60//!   @fbcode//mode/dev-nosan -- \
61//!   fbcode/monarch/hyperactor_mesh/src/testdata
62//! ```
63//!
64//! Or via cargo:
65//!
66//! ```sh
67//! cargo run -p hyperactor_mesh --bin generate_api_artifacts -- \
68//!   hyperactor_mesh/src/testdata
69//! ```
70//!
71//! Then re-run tests to confirm the new snapshot passes.
72//!
73//! ## Schema invariants (SC-*)
74//!
75//! - **SC-1 (schema-derived):** Schema is derived from Rust
76//!   types via `schemars::JsonSchema`, not hand-written.
77//! - **SC-2 (schema-snapshot-stability):** Schema changes must
78//!   be explicit — a snapshot test catches unintentional drift.
79//! - **SC-3 (schema-payload-conformance):** Real `NodePayload`
80//!   instances validate against the generated schema.
81//! - **SC-4 (schema-version-identity):** Served schemas carry a
82//!   `$id` tied to the API version (e.g.
83//!   `https://monarch.meta.com/schemas/v1/node_payload`).
84//! - **SC-5 (route-precedence):** Literal schema routes are
85//!   matched by specificity before the `{*reference}` wildcard
86//!   (axum 0.8 specificity-based routing).
87//!
88//! Note on `ApiError.details`: the derived schema is maximally
89//! permissive for `details` (any valid JSON). This is intentional
90//! for v1 — `details` is a domain-specific escape hatch.
91//! Consumers must not assume a fixed shape.
92//!
93//! # Introspection visibility policy
94//!
95//! Admin tooling only displays **introspectable** nodes: entities
96//! that are reachable via actor messaging and respond to
97//! [`IntrospectMessage`]. Infrastructure procs that are
98//! **non-routable** are intentionally **opaque** to introspection and
99//! are omitted from the navigation graph.
100//!
101//! ## Definitions
102//!
103//! **Routable** — an entity is routable if the system can address it
104//! via the routing layer and successfully deliver a message to it
105//! using a `Reference` / `ActorId` (i.e., there exists a live mailbox
106//! sender reachable through normal routing). Practical test: "can I
107//! send `IntrospectMessage::Query` to it and get a reply?"
108//!
109//! **Non-routable** — an entity is non-routable if it has no
110//! externally reachable mailbox sender in the routing layer, so
111//! message delivery is impossible by construction (even if you know
112//! its name). Examples: `hyperactor_runtime[0]`, `mailbox_server[N]`,
113//! `local[N]` — these use `PanickingMailboxSender` and are never
114//! bound to the router.
115//!
116//! **Introspectable** — tooling can obtain a `NodePayload` for this
117//! node by sending `IntrospectMessage` to a routable actor.
118//!
119//! **Opaque** — the node exists but is not introspectable via
120//! messaging; tooling cannot observe it through the introspection
121//! protocol.
122//!
123//! ## Proc visibility
124//!
125//! A proc is not directly introspected; actors are. Tooling
126//! synthesizes proc-level nodes by grouping introspectable actors by
127//! `ProcId`.
128//!
129//! A proc is visible iff there exists at least one actor on that proc
130//! whose `ActorId` is deliverable via the routing layer (i.e., the
131//! actor has a bound mailbox sender reachable through normal routing)
132//! and responds to `IntrospectMessage`.
133//!
134//! The rule is: **if an entity is routable via the mesh routing layer
135//! (i.e., tooling can deliver `IntrospectMessage::Query` to one of its
136//! actors), then it is introspectable and appears in the admin graph.**
137//!
138//! ## Navigation identity invariants (NI-*)
139//!
140//! Every `NodePayload` in the topology tree satisfies:
141//!
142//! - **NI-1 (identity = reference):** A node's `identity` field must
143//!   equal the reference string used to resolve it. If the TUI asks
144//!   for reference `R`, `payload.identity == R`.
145//!
146//! - **NI-2 (parent coherence):** A node's `parent` field must equal
147//!   the `identity` of the node it appears under. If node `P` lists
148//!   `R` in its `children`, then `R.parent == Some(P.identity)`.
149//!
150//! Together these ensure that the TUI can correlate responses to tree
151//! nodes, and that upward/downward navigation is consistent.
152//!
153//! ## Proc-resolution invariants (SP-*)
154//!
155//! When a proc reference is resolved, the returned `NodePayload`
156//! satisfies:
157//!
158//! - **SP-1 (identity):** The identity matches the ProcId reference
159//!   from the parent's children list.
160//! - **SP-2 (properties):** The properties are `NodeProperties::Proc`.
161//! - **SP-3 (parent):** The parent is set to the HostId format
162//!   (`"host:<actor_id>"`).
163//! - **SP-4 (as_of):** The `as_of` field is present and non-empty.
164//!
165//! Enforced by `test_system_proc_identity`.
166//!
167//! ## Proc-agent invariants (PA-*)
168//!
169//! - **PA-1 (live children):** Proc-node children used by admin/TUI
170//!   must be derived from live proc state at query time. No
171//!   additional publish event is required for a newly spawned actor
172//!   to appear.
173//!
174//! Enforced by `test_proc_children_reflect_directly_spawned_actors`.
175//!
176//! ## Robustness invariant (MA-R1)
177//!
178//! - **MA-R1 (no-crash):** `MeshAdminAgent` must never crash the OS
179//!   process it resides in. Every handler catches errors and converts
180//!   them into structured error payloads
181//!   (`ResolveReferenceResponse(Err(..))`, `NodeProperties::Error`,
182//!   etc.) rather than propagating panics or unwinding. Failed reply
183//!   sends (the caller went away) are silently swallowed.
184//!
185//! ## TLS transport invariant (MA-T1)
186//!
187//! - **MA-T1 (tls):** At Meta (`fbcode_build`), the admin HTTP
188//!   server **requires** mutual TLS. At startup it probes for
189//!   certificates via `try_tls_acceptor` with client cert
190//!   enforcement enabled. If no usable certificate bundle is found,
191//!   `init()` returns an error — no plain HTTP fallback. In OSS,
192//!   TLS is best-effort with plain HTTP fallback.
193//!
194//! - **MA-T2 (scheme-in-url):** The URL returned by `GetAdminAddr`
195//!   is always `https://host:port` or `http://host:port`, never a
196//!   bare `host:port`. All callers receive and use this full URL
197//!   directly.
198//!
199//! ## Client host invariants (CH-*)
200//!
201//! Let **A** denote the observed host mesh (the host mesh for which
202//! this `MeshAdminAgent` was spawned), and let **C** denote the
203//! process-global singleton client host mesh in the caller process
204//! (whose local proc hosts the root client actor).
205//!
206//! - **CH-1 (deduplication):** When C ∈ A, the client host appears
207//!   exactly once in the admin host list (deduplicated by `HostAgent`
208//!   `ActorId` identity). When C ∉ A, `spawn_admin` includes C
209//!   alongside A's hosts so the admin introspects C as a normal host
210//!   subtree, not as a standalone proc.
211//!
212//! - **CH-2 (reachability):** In both cases, the root client actor
213//!   is reachable through the standard host → proc → actor walk.
214//!
215//! - **CH-3 (ordering):** `spawn_admin` requires `cx: &impl
216//!   context::Actor` (the caller's root client instance). Constructing
217//!   that instance initializes C. Therefore C is available when
218//!   `spawn_admin` executes. Any refactor must preserve this ordering.
219//!
220//! **Mechanism:** [`HostMeshRef::spawn_admin`] reads C from the
221//! caller process (via `try_this_host()`), merges it with A's host
222//! list, deduplicates by `HostAgent` `ActorId`, and sends the merged
223//! list in `SpawnMeshAdmin`. This works for same-process and
224//! cross-process setups because merge+dedup happens in the caller
225//! process before sending the spawn request.
226//!
227//! ## MAST resolution invariants (MC-*)
228//!
229//! CLI-based `mast_conda:///` resolution (OSS-compatible fallback):
230//!
231//! - **MC-1 (cli-contract):** `mast get-status --json <job>` must
232//!   exit 0 and produce valid JSON. Missing binary → distinct error.
233//!   Non-zero exit → includes exit code and stderr. Malformed JSON →
234//!   parse error.
235//! - **MC-2 (head-hostname):** `head_hostname` extracts the first
236//!   hostname by ascending task index from the last attempt of each
237//!   task group.
238//! - **MC-3 (fqdn-idempotent):** `qualify_fqdn` passes through
239//!   hostnames containing a dot. Short hostnames are qualified via
240//!   `getaddrinfo(AI_CANONNAME)`. Failure falls back to the raw
241//!   hostname.
242//! - **MC-4 (fqdn-nonblocking):** `qualify_fqdn` runs the blocking
243//!   `getaddrinfo` syscall via `spawn_blocking`.
244//! - **MC-5 (admin-port):** `resolve_admin_port` uses the explicit
245//!   override when provided, otherwise reads the port from
246//!   `MESH_ADMIN_ADDR` config.
247//!
248//! Enforced by `test_head_hostname_*`, `test_qualify_fqdn_*`,
249//! `test_resolve_mast_*`, `test_resolve_admin_port_*`.
250
251use std::collections::HashMap;
252use std::io;
253use std::sync::Arc;
254use std::time::Duration;
255
256use async_trait::async_trait;
257use axum::Json;
258use axum::Router;
259use axum::extract::Path as AxumPath;
260use axum::extract::State;
261use axum::http::StatusCode;
262use axum::response::IntoResponse;
263use axum::routing::get;
264use hyperactor::Actor;
265use hyperactor::ActorHandle;
266use hyperactor::Context;
267use hyperactor::HandleClient;
268use hyperactor::Handler;
269use hyperactor::Instance;
270use hyperactor::RefClient;
271use hyperactor::channel::try_tls_acceptor;
272use hyperactor::introspect::IntrospectMessage;
273use hyperactor::introspect::IntrospectResult;
274use hyperactor::mailbox::open_once_port;
275use hyperactor::reference as hyperactor_reference;
276use serde::Deserialize;
277use serde::Serialize;
278use serde_json::Value;
279use tokio::net::TcpListener;
280use tokio_rustls::TlsAcceptor;
281use typeuri::Named;
282
283use crate::host_mesh::host_agent::HostAgent;
284use crate::host_mesh::host_agent::HostId;
285use crate::introspect::NodePayload;
286use crate::introspect::NodeProperties;
287use crate::introspect::to_node_payload;
288
289/// Send an `IntrospectMessage` to an actor and receive the reply.
290/// Encapsulates open_once_port + send + timeout + error handling.
291async fn query_introspect(
292    cx: &hyperactor::Context<'_, MeshAdminAgent>,
293    actor_id: &hyperactor_reference::ActorId,
294    view: hyperactor::introspect::IntrospectView,
295    timeout: Duration,
296    err_ctx: &str,
297) -> Result<IntrospectResult, anyhow::Error> {
298    let introspect_port =
299        hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(actor_id);
300    let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
301    introspect_port.send(
302        cx,
303        IntrospectMessage::Query {
304            view,
305            reply: reply_handle.bind(),
306        },
307    )?;
308    tokio::time::timeout(timeout, reply_rx.recv())
309        .await
310        .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
311        .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
312}
313
314/// Send an `IntrospectMessage::QueryChild` to an actor.
315async fn query_child_introspect(
316    cx: &hyperactor::Context<'_, MeshAdminAgent>,
317    actor_id: &hyperactor_reference::ActorId,
318    child_ref: hyperactor_reference::Reference,
319    timeout: Duration,
320    err_ctx: &str,
321) -> Result<IntrospectResult, anyhow::Error> {
322    let introspect_port =
323        hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(actor_id);
324    let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
325    introspect_port.send(
326        cx,
327        IntrospectMessage::QueryChild {
328            child_ref,
329            reply: reply_handle.bind(),
330        },
331    )?;
332    tokio::time::timeout(timeout, reply_rx.recv())
333        .await
334        .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
335        .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
336}
337
/// Actor name used when spawning the mesh admin agent.
pub const MESH_ADMIN_ACTOR_NAME: &str = "mesh_admin";

/// Actor name for the HTTP bridge client mailbox on the service proc.
///
/// Unlike `MESH_ADMIN_ACTOR_NAME`, this is not a full actor: it is a
/// client-mode `Instance<()>` created via
/// `Proc::introspectable_instance()` and driven by Axum's Tokio task
/// pool rather than an actor message loop. A separate instance is
/// required because `MeshAdminAgent`'s own `Instance<Self>` is only
/// accessible inside its message loop and cannot be shared with
/// external tasks. This instance gives the HTTP handlers a routable
/// proc identity so they can open one-shot reply ports
/// (`open_once_port`) to receive responses from `MeshAdminAgent`.
///
/// Unlike a plain `instance()`, this uses
/// `Proc::introspectable_instance()` so the bridge responds to
/// `IntrospectMessage::Query` and appears as a navigable node in the
/// mesh TUI rather than causing a 504 when selected.
pub const MESH_ADMIN_BRIDGE_NAME: &str = "mesh_admin_bridge";

/// Timeout for targeted queries that hit a single, specific host.
/// Kept short so a slow or dying actor cannot block the
/// single-threaded MeshAdminAgent message loop (which serializes
/// all resolve requests, including the fast root refresh).
const SINGLE_HOST_TIMEOUT: Duration = Duration::from_secs(3);

/// Timeout for QueryChild snapshot lookups in resolve_actor_node.
///
/// QueryChild is handled by a synchronous callback on the target
/// actor's IntrospectMessage port — it either returns a terminated
/// snapshot immediately or returns Error { "not_found" } immediately.
/// There is no async work behind it, so SINGLE_HOST_TIMEOUT is far
/// too generous. A short budget here ensures the total time for
/// resolve_actor_node (QueryChild + live actor Query) stays well
/// under SINGLE_HOST_TIMEOUT, preventing cascading 504s when the
/// outer bridge timeout fires before the inner work completes.
const QUERY_CHILD_TIMEOUT: Duration = Duration::from_millis(100);

/// Read `MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT` from config at call time.
///
/// Looked up on every call rather than cached at startup, so the
/// effective value tracks the global config.
fn resolve_actor_timeout() -> Duration {
    hyperactor_config::global::get(crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT)
}

/// Read `MESH_ADMIN_MAX_CONCURRENT_RESOLVES` from config at call time.
///
/// Looked up on every call rather than cached at startup, so the
/// effective value tracks the global config.
fn max_concurrent_resolves() -> usize {
    hyperactor_config::global::get(crate::config::MESH_ADMIN_MAX_CONCURRENT_RESOLVES)
}
386
/// Structured error response following the gateway RFC envelope
/// pattern.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiError {
    /// Machine-readable error code (e.g. "not_found", "bad_request").
    /// Drives the HTTP status mapping in this type's `IntoResponse`
    /// impl; unrecognized codes map to 500.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Additional context about the error. Schema is permissive
    /// (any valid JSON) — `details` is a domain-specific escape
    /// hatch. Do not assume a fixed shape.
    /// Omitted from the serialized JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<Value>,
}
401
/// Wrapper for the structured error envelope.
///
/// Serializes as `{"error": {...}}` — the shape whose schema is
/// served at `GET /v1/schema/error`.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiErrorEnvelope {
    /// The wrapped error payload.
    pub error: ApiError,
}
407
408impl ApiError {
409    /// Create a "not_found" error.
410    pub fn not_found(message: impl Into<String>, details: Option<Value>) -> Self {
411        Self {
412            code: "not_found".to_string(),
413            message: message.into(),
414            details,
415        }
416    }
417
418    /// Create a "bad_request" error.
419    pub fn bad_request(message: impl Into<String>, details: Option<Value>) -> Self {
420        Self {
421            code: "bad_request".to_string(),
422            message: message.into(),
423            details,
424        }
425    }
426}
427
428impl IntoResponse for ApiError {
429    fn into_response(self) -> axum::response::Response {
430        let status = match self.code.as_str() {
431            "not_found" => StatusCode::NOT_FOUND,
432            "bad_request" => StatusCode::BAD_REQUEST,
433            "gateway_timeout" => StatusCode::GATEWAY_TIMEOUT,
434            "service_unavailable" => StatusCode::SERVICE_UNAVAILABLE,
435            _ => StatusCode::INTERNAL_SERVER_ERROR,
436        };
437        let envelope = ApiErrorEnvelope { error: self };
438        (status, Json(envelope)).into_response()
439    }
440}
441
/// Response payload for `MeshAdminMessage::GetAdminAddr`.
///
/// `addr` is `None` until the admin HTTP server has successfully
/// bound a listening socket during `MeshAdminAgent::init`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct MeshAdminAddrResponse {
    /// Full admin URL including scheme (`https://host:port` or
    /// `http://host:port`, per invariant MA-T2), or `None` before the
    /// server is bound.
    pub addr: Option<String>,
}
wirevalue::register_type!(MeshAdminAddrResponse);
451
/// Messages handled by the `MeshAdminAgent`.
///
/// These are mesh-admin control-plane queries (as opposed to topology
/// resolution). They’re wirevalue-serializable and come with
/// generated client/ref helpers via `HandleClient`/`RefClient`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum MeshAdminMessage {
    /// Return the HTTP admin server address that this agent bound in
    /// `init`.
    ///
    /// The reply contains `None` if the server hasn't started yet.
    /// When present, the address is a full URL including scheme
    /// (invariant MA-T2).
    GetAdminAddr {
        /// One-shot reply port receiving a `MeshAdminAddrResponse`.
        #[reply]
        reply: hyperactor_reference::OncePortRef<MeshAdminAddrResponse>,
    },
}
wirevalue::register_type!(MeshAdminMessage);
479
/// Newtype wrapper around `Result<NodePayload, String>` for the
/// resolve reply port (`OncePortRef` requires `Named`).
///
/// `Ok` carries the resolved node; `Err` carries a human-readable
/// failure message.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ResolveReferenceResponse(pub Result<NodePayload, String>);
wirevalue::register_type!(ResolveReferenceResponse);
485
/// Message for resolving an opaque reference string into a
/// `NodePayload`.
///
/// This is the primary “navigation” request used by the admin HTTP
/// bridge: the caller provides a reference (e.g. `"root"`, a `ProcId`
/// string, or an `ActorId` string) and the `MeshAdminAgent` returns a
/// uniformly shaped `NodePayload` plus child references to continue
/// walking the topology.
///
/// The work happens inside the admin actor's message loop so
/// resolution can:
/// - parse and validate the reference format,
/// - dispatch to the right host/proc/actor via existing admin
///   queries, and
/// - return a structured payload without blocking HTTP handlers on
///   mesh logic.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum ResolveReferenceMessage {
    /// Resolve `reference_string` to a `NodePayload`.
    ///
    /// The reply is a `ResolveReferenceResponse` wrapping
    /// `Ok(NodePayload)` on success or `Err(message)` on failure.
    Resolve {
        /// Opaque reference string identifying a root/host/proc/actor
        /// node.
        reference_string: String,
        /// Reply port receiving the resolution result.
        #[reply]
        reply: hyperactor_reference::OncePortRef<ResolveReferenceResponse>,
    },
}
wirevalue::register_type!(ResolveReferenceMessage);
528
/// Actor that serves a mesh-level admin HTTP endpoint.
///
/// `MeshAdminAgent` is the mesh-wide aggregation point for
/// introspection: it holds `hyperactor_reference::ActorRef<HostAgent>` handles for each
/// host, and answers admin queries by forwarding targeted requests to
/// the appropriate host agent and assembling a uniform `NodePayload`
/// response for the client.
///
/// The agent also exposes an HTTP server (spawned from `init`) and
/// supports reference-based navigation (`GET /v1/{reference}`) by
/// resolving opaque reference strings into typed `NodeProperties`
/// plus child references.
#[hyperactor::export(handlers = [MeshAdminMessage, ResolveReferenceMessage])]
pub struct MeshAdminAgent {
    /// Map of host address string → `HostAgent` reference used to
    /// fan out our target admin queries.
    hosts: HashMap<String, hyperactor_reference::ActorRef<HostAgent>>,

    /// Reverse index: `HostAgent` `ActorId` → host address
    /// string.
    ///
    /// The host agent itself is an actor that can appear in multiple
    /// places (e.g., as a host node and as a child actor under a
    /// system proc). This index lets reference resolution treat that
    /// `ActorId` as a *Host* node (via `resolve_host_node`) rather
    /// than a generic *Actor* node, avoiding cycles / dropped nodes
    /// in clients like the TUI.
    host_agents_by_actor_id: HashMap<hyperactor_reference::ActorId, String>,

    /// `ActorId` of the process-global root client (`client[0]` on
    /// the singleton Host's `local_proc`), exposed as a first-class
    /// child of the root node. Routable and introspectable via the
    /// blanket `Handler<IntrospectMessage>`.
    root_client_actor_id: Option<hyperactor_reference::ActorId>,

    /// This agent's own `ActorId`, captured during `init`. Used to
    /// include the admin proc as a visible node in the introspection
    /// tree (the principle: "if you can send it a message, you can
    /// introspect it").
    self_actor_id: Option<hyperactor_reference::ActorId>,

    // -- HTTP server address fields --
    //
    // The admin HTTP server has three address representations:
    //
    //   1. `admin_addr_override` — caller-supplied bind address
    //      (constructor param). When `None`, `init` reads
    //      `MESH_ADMIN_ADDR` from config instead.
    //
    //   2. `admin_addr` — the actual `SocketAddr` the OS assigned
    //      after `TcpListener::bind`. Populated during `init`.
    //
    //   3. `admin_host` — human-friendly URL with the machine
    //      hostname (not the raw IP) so it works with DNS and TLS
    //      certificate validation. Returned via `GetAdminAddr`.
    /// Caller-supplied bind address. When `None`, `init` reads
    /// `MESH_ADMIN_ADDR` from config.
    admin_addr_override: Option<std::net::SocketAddr>,

    /// Actual bound address after `TcpListener::bind`, populated
    /// during `init`.
    admin_addr: Option<std::net::SocketAddr>,

    /// Hostname-based URL (e.g. `"https://myhost.facebook.com:1729"`)
    /// for the admin HTTP server. Returned via `GetAdminAddr`.
    admin_host: Option<String>,

    /// When the mesh was started — RFC 3339 / ISO-8601 timestamp,
    /// captured in `new()` via `chrono::Utc::now().to_rfc3339()`.
    started_at: String,

    /// Username who started the mesh. Read in `new()` from the
    /// `USER` (or `USERNAME`) env var, falling back to `"unknown"`.
    started_by: String,
}
602
603impl MeshAdminAgent {
604    /// Construct a `MeshAdminAgent` from a list of `(host_addr,
605    /// host_agent_ref)` pairs and an optional root client `ActorId`.
606    ///
607    /// Builds both:
608    /// - `hosts`: the forward map used to route admin queries to the
609    ///   correct `HostAgent`, and
610    /// - `host_agents_by_actor_id`: a reverse index used during
611    ///   reference resolution to recognize host-agent `ActorId`s and
612    ///   resolve them as `NodeProperties::Host` rather than as
613    ///   generic actors.
614    ///
615    /// When `root_client_actor_id` is `Some`, the root client appears
616    /// as a first-class child of the root node in the introspection
617    /// tree.
618    ///
619    /// The HTTP listen address is initialized to `None` and populated
620    /// during `init()` after the server socket is bound.
621    pub fn new(
622        hosts: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>,
623        root_client_actor_id: Option<hyperactor_reference::ActorId>,
624        admin_addr: Option<std::net::SocketAddr>,
625    ) -> Self {
626        let host_agents_by_actor_id: HashMap<hyperactor_reference::ActorId, String> = hosts
627            .iter()
628            .map(|(addr, agent_ref)| (agent_ref.actor_id().clone(), addr.clone()))
629            .collect();
630
631        // Capture start time and username
632        let started_at = chrono::Utc::now().to_rfc3339();
633        let started_by = std::env::var("USER")
634            .or_else(|_| std::env::var("USERNAME"))
635            .unwrap_or_else(|_| "unknown".to_string());
636
637        Self {
638            hosts: hosts.into_iter().collect(),
639            host_agents_by_actor_id,
640            root_client_actor_id,
641            self_actor_id: None,
642            admin_addr_override: admin_addr,
643            admin_addr: None,
644            admin_host: None,
645            started_at,
646            started_by,
647        }
648    }
649}
650
// Manual Debug impl: summarizes rather than dumps — `hosts` is shown
// as its key list and `host_agents_by_actor_id` as a count, keeping
// the output compact. Field order here is part of the rendered
// string, so keep it stable.
impl std::fmt::Debug for MeshAdminAgent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MeshAdminAgent")
            .field("hosts", &self.hosts.keys().collect::<Vec<_>>())
            .field("host_agents", &self.host_agents_by_actor_id.len())
            .field("root_client_actor_id", &self.root_client_actor_id)
            .field("self_actor_id", &self.self_actor_id)
            .field("admin_addr", &self.admin_addr)
            .field("admin_host", &self.admin_host)
            .field("started_at", &self.started_at)
            .field("started_by", &self.started_by)
            .finish()
    }
}
665
/// Shared state for the reference-based `/v1/{*reference}` bridge
/// route.
///
/// The HTTP handler itself is intentionally thin and does not perform
/// any routing logic. Instead, it forwards each request into the
/// `MeshAdminAgent` actor via `ResolveReferenceMessage`, ensuring
/// resolution happens inside the actor message loop (with access to
/// actor messaging, timeouts, and indices).
struct BridgeState {
    /// Reference to the `MeshAdminAgent` actor that performs
    /// reference resolution.
    admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent>,
    /// Dedicated client mailbox on system_proc for HTTP bridge reply
    /// ports. Using a separate `Instance<()>` avoids sharing the
    /// actor's own mailbox with the HTTP bridge and ensures the
    /// bridge context is routable via system_proc's frontend address.
    ///
    /// NOTE(history): an earlier version cloned the admin actor's own
    /// instance (`bridge_cx: Instance<MeshAdminAgent>` via
    /// `this.clone_for_py()`); the dedicated `Instance<()>` replaced
    /// that approach.
    bridge_cx: Instance<()>,
    /// Limits the number of in-flight resolve requests to prevent
    /// introspection queries from overwhelming the shared tokio
    /// runtime and starving user actor workloads.
    resolve_semaphore: tokio::sync::Semaphore,
    /// Keep the handle alive so the bridge mailbox is not dropped.
    _bridge_handle: ActorHandle<()>,
}
693
/// A TCP listener that performs a TLS handshake on each accepted
/// connection before handing it to axum.
///
/// Implements [`axum::serve::Listener`] so it can be passed directly
/// to [`axum::serve`].  Per the trait contract, `accept` handles
/// errors internally (logging + retrying) and never returns `Err`.
struct TlsListener {
    /// Underlying TCP listener accepting raw connections.
    tcp: TcpListener,
    /// TLS acceptor that runs the handshake on each accepted stream.
    acceptor: TlsAcceptor,
}
704
705impl axum::serve::Listener for TlsListener {
706    type Io = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
707    type Addr = std::net::SocketAddr;
708
709    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
710        loop {
711            let (stream, addr) = match self.tcp.accept().await {
712                Ok(conn) => conn,
713                Err(e) => {
714                    tracing::warn!("TCP accept error: {}", e);
715                    continue;
716                }
717            };
718
719            match self.acceptor.accept(stream).await {
720                Ok(tls_stream) => return (tls_stream, addr),
721                Err(e) => {
722                    tracing::warn!("TLS handshake failed from {}: {}", addr, e);
723                    continue;
724                }
725            }
726        }
727    }
728
729    fn local_addr(&self) -> io::Result<Self::Addr> {
730        self.tcp.local_addr()
731    }
732}
733
#[async_trait]
impl Actor for MeshAdminAgent {
    /// Initializes the mesh admin agent and its HTTP server.
    ///
    /// 1. Binds well-known message ports (`proc.spawn()` does not
    ///    call `bind()` — unlike `gspawn` — so the actor must do it
    ///    itself before becoming reachable).
    /// 2. Binds a TCP listener (ephemeral or fixed port).
    /// 3. Builds a TLS acceptor (explicit env vars, then Meta default
    ///    paths). At Meta (`fbcode_build`), mTLS is mandatory and
    ///    init fails if no certs are found. In OSS, falls back to
    ///    plain HTTP.
    /// 4. Creates a dedicated `Instance<()>` client mailbox on
    ///    system_proc for the HTTP bridge's reply ports, keeping
    ///    bridge traffic off the actor's own mailbox.
    /// 5. Spawns the axum server in a background task (HTTPS with
    ///    mTLS at Meta, HTTPS or HTTP in OSS depending on step 3).
    ///
    /// The hostname-based listen address is stored in `admin_host` so
    /// it can be returned via `GetAdminAddr`. The scheme (`https://`
    /// or `http://`) is included so clients know which protocol to
    /// use.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Bind well-known ports before the HTTP server is spawned, so
        // messages (including Undeliverable bounces) can be delivered
        // as soon as the admin is reachable.
        this.bind::<Self>();
        this.set_system();
        self.self_actor_id = Some(this.self_id().clone());

        // An explicit override, when present, takes precedence over
        // the configured MESH_ADMIN_ADDR.
        let bind_addr = match self.admin_addr_override {
            Some(addr) => addr,
            None => hyperactor_config::global::get_cloned(crate::config::MESH_ADMIN_ADDR)
                .parse_socket_addr()
                .map_err(|e| anyhow::anyhow!("invalid MESH_ADMIN_ADDR config: {}", e))?,
        };
        let listener = TcpListener::bind(bind_addr).await?;
        let bound_addr = listener.local_addr()?;
        // Report the hostname (e.g. Tupperware container name) + port
        // rather than a raw IP, so the address works with DNS and TLS
        // certificate validation.
        let host = hostname::get()
            .unwrap_or_else(|_| "localhost".into())
            .into_string()
            .unwrap_or_else(|_| "localhost".to_string());
        self.admin_addr = Some(bound_addr);

        // At Meta: mTLS is mandatory — fail if no certs are found.
        // In OSS: TLS is best-effort with plain HTTP fallback.
        // See MA-T1 in module doc.
        let enforce_mtls = cfg!(fbcode_build);
        let tls_acceptor = try_tls_acceptor(enforce_mtls);

        if enforce_mtls && tls_acceptor.is_none() {
            return Err(anyhow::anyhow!(
                "mesh admin requires mTLS but no TLS certificates found; \
                 set HYPERACTOR_TLS_CERT/KEY/CA or ensure Meta cert paths exist \
                 (/var/facebook/x509_identities/server.pem, /var/facebook/rootcanal/ca.pem)"
            ));
        }

        // The advertised scheme must match the transport chosen below.
        let scheme = if tls_acceptor.is_some() {
            "https"
        } else {
            "http"
        };
        self.admin_host = Some(format!("{}://{}:{}", scheme, host, bound_addr.port()));

        // Create a dedicated client mailbox on system_proc for the
        // HTTP bridge's reply ports. This avoids sharing the admin
        // actor's own mailbox with async HTTP handlers.
        let (bridge_cx, bridge_handle) = this
            .proc()
            .introspectable_instance(MESH_ADMIN_BRIDGE_NAME)?;
        bridge_cx.set_system();
        let bridge_state = Arc::new(BridgeState {
            admin_ref: hyperactor_reference::ActorRef::attest(this.self_id().clone()),
            bridge_cx,
            resolve_semaphore: tokio::sync::Semaphore::new(max_concurrent_resolves()),
            // Held only to keep the bridge actor alive for the
            // lifetime of this state.
            _bridge_handle: bridge_handle,
        });
        let router = create_mesh_admin_router(bridge_state);

        if let Some(acceptor) = tls_acceptor {
            let tls_listener = TlsListener {
                tcp: listener,
                acceptor,
            };
            tokio::spawn(async move {
                if let Err(e) = axum::serve(tls_listener, router).await {
                    tracing::error!("mesh admin server (mTLS) error: {}", e);
                }
            });
        } else {
            // OSS fallback: plain HTTP (only reachable when !fbcode_build).
            tokio::spawn(async move {
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("mesh admin server error: {}", e);
                }
            });
        }

        tracing::info!(
            "mesh admin server listening on {}",
            self.admin_host.as_deref().unwrap_or("unknown")
        );
        Ok(())
    }

    /// Swallow undeliverable message bounces instead of crashing.
    ///
    /// The admin agent sends `IntrospectMessage` to actors that may
    /// not have the introspection port bound (e.g. actors spawned
    /// via `cx.spawn()` whose `#[export]` list does not include it).
    /// When the message cannot be delivered, the routing layer
    /// bounces an `Undeliverable` back to the sender. The default
    /// `Actor::handle_undeliverable_message` calls `bail!()`, which
    /// would kill this admin agent and — via supervision cascade —
    /// take down the entire admin process with `exit(1)`.
    ///
    /// Since the admin agent is best-effort infrastructure, an
    /// undeliverable introspection probe is not a fatal error.
    async fn handle_undeliverable_message(
        &mut self,
        _cx: &Instance<Self>,
        hyperactor::mailbox::Undeliverable(envelope): hyperactor::mailbox::Undeliverable<
            hyperactor::mailbox::MessageEnvelope,
        >,
    ) -> Result<(), anyhow::Error> {
        tracing::debug!(
            "admin agent: undeliverable message to {} (port not bound?), ignoring",
            envelope.dest(),
        );
        Ok(())
    }
}
870
871/// Manual Handler impl — swallows `reply.send()` failures so the
872/// admin agent stays alive when the HTTP caller disconnects.
873#[async_trait]
874impl Handler<MeshAdminMessage> for MeshAdminAgent {
875    /// Dispatches `MeshAdminMessage` variants.
876    ///
877    /// Reply-send failures are swallowed because a dropped receiver
878    /// (e.g. the HTTP bridge timed out) is not an error — the caller
879    /// simply went away. Propagating the failure would crash the admin
880    /// agent and take down the entire process.
881    async fn handle(
882        &mut self,
883        cx: &Context<Self>,
884        msg: MeshAdminMessage,
885    ) -> Result<(), anyhow::Error> {
886        match msg {
887            MeshAdminMessage::GetAdminAddr { reply } => {
888                let resp = MeshAdminAddrResponse {
889                    addr: self.admin_host.clone(),
890                };
891                if let Err(e) = reply.send(cx, resp) {
892                    tracing::debug!("GetAdminAddr reply failed (caller gone?): {e}");
893                }
894            }
895        }
896        Ok(())
897    }
898}
899
900/// Manual Handler impl — swallows `reply.send()` failures so the
901/// admin agent stays alive when the HTTP caller disconnects.
902#[async_trait]
903impl Handler<ResolveReferenceMessage> for MeshAdminAgent {
904    /// Dispatches `ResolveReferenceMessage` variants.
905    ///
906    /// The inner `resolve_reference` call never returns `Err` to the
907    /// handler — failures are captured in the response payload.
908    /// Reply-send failures are swallowed for the same reason as
909    /// `MeshAdminMessage`: a dropped receiver means the caller (HTTP
910    /// bridge) went away, which must not crash the admin agent.
911    async fn handle(
912        &mut self,
913        cx: &Context<Self>,
914        msg: ResolveReferenceMessage,
915    ) -> Result<(), anyhow::Error> {
916        match msg {
917            ResolveReferenceMessage::Resolve {
918                reference_string,
919                reply,
920            } => {
921                let response = ResolveReferenceResponse(
922                    self.resolve_reference(cx, &reference_string)
923                        .await
924                        .map_err(|e| format!("{:#}", e)),
925                );
926                if let Err(e) = reply.send(cx, response) {
927                    tracing::debug!("Resolve reply failed (caller gone?): {e}");
928                }
929            }
930        }
931        Ok(())
932    }
933}
934
impl MeshAdminAgent {
    /// Core resolver for the reference-based admin API.
    ///
    /// Parses the caller-provided `reference_string` (or handles the
    /// special `"root"` case), then dispatches to
    /// `resolve_host_node`, `resolve_proc_node`, or
    /// `resolve_actor_node` to assemble a fully-populated
    /// `NodePayload` (properties + child references).
    ///
    /// The returned payload satisfies the **navigation identity
    /// invariant** (see module docs): `payload.identity ==
    /// reference_string`, and `payload.parent` equals the identity of
    /// the node this one appears under.
    ///
    /// Note: this returns `Err` for internal use; the public
    /// `resolve` handler converts failures into
    /// `ResolveReferenceResponse(Err(..))` so the actor never
    /// crashes on lookup errors.
    async fn resolve_reference(
        &self,
        cx: &Context<'_, Self>,
        reference_string: &str,
    ) -> Result<NodePayload, anyhow::Error> {
        if reference_string == "root" {
            return Ok(self.build_root_payload());
        }

        // Host refs use the "host:<actor_id>" format so they are
        // unambiguous from plain actor references. The same
        // HostAgent ActorId can appear both as a host (from root)
        // and as an actor (from a proc's children list).
        if let Ok(host_id) = reference_string.parse::<HostId>() {
            return self.resolve_host_node(cx, &host_id.0).await;
        }

        let reference: hyperactor_reference::Reference = reference_string
            .parse()
            .map_err(|e| anyhow::anyhow!("invalid reference '{}': {}", reference_string, e))?;

        match &reference {
            hyperactor_reference::Reference::Proc(proc_id) => {
                // Try the host-managed path first (uses ProcAgent,
                // sees all actors). Fall back to the standalone
                // anchor path for procs truly off-mesh (A != C).
                match self.resolve_proc_node(cx, proc_id).await {
                    Ok(payload) => Ok(payload),
                    Err(_) if self.standalone_proc_anchor(proc_id).is_some() => {
                        self.resolve_standalone_proc_node(cx, proc_id).await
                    }
                    Err(e) => Err(e),
                }
            }
            hyperactor_reference::Reference::Actor(actor_id) => {
                self.resolve_actor_node(cx, actor_id).await
            }
            _ => Err(anyhow::anyhow!(
                "unsupported reference type: {}",
                reference_string
            )),
        }
    }

    /// Returns the known actors on standalone procs — procs not
    /// managed by any host but whose actors are routable and
    /// introspectable. Each proc appears as a root child; the
    /// actor is the "anchor" used to discover the proc's contents.
    ///
    /// The root client is no longer standalone: spawn_admin registers
    /// C (the bootstrap host) as a normal host entry (A/C invariant).
    /// Currently there are no standalone procs, so this yields
    /// nothing; the plumbing is kept so the resolver paths below
    /// remain exercised when one is added.
    fn standalone_proc_actors(&self) -> impl Iterator<Item = &hyperactor_reference::ActorId> {
        std::iter::empty()
    }

    /// If `proc_id` belongs to a standalone proc, return the anchor
    /// actor on that proc. Returns `None` for host-managed procs.
    fn standalone_proc_anchor(
        &self,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Option<&hyperactor_reference::ActorId> {
        self.standalone_proc_actors()
            .find(|actor_id| *actor_id.proc_id() == *proc_id)
    }

    /// Returns true if `actor_id` lives on a standalone proc.
    fn is_standalone_proc_actor(&self, actor_id: &hyperactor_reference::ActorId) -> bool {
        self.standalone_proc_actors()
            .any(|a| *a.proc_id() == *actor_id.proc_id())
    }

    /// Construct the synthetic root node for the reference tree.
    ///
    /// The root is not a real actor/proc; it's a convenience node
    /// that anchors navigation. Its children are the configured
    /// `HostAgent` actor IDs (as reference strings) plus any
    /// standalone procs (root client proc, admin proc).
    fn build_root_payload(&self) -> NodePayload {
        let children: Vec<String> = self
            .hosts
            .values()
            .map(|agent| HostId(agent.actor_id().clone()).to_string())
            .collect();
        // The root has no system children; the attr is still set so
        // the payload shape stays uniform across node types.
        let system_children: Vec<String> = Vec::new();
        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "root".to_string());
        attrs.set(crate::introspect::NUM_HOSTS, self.hosts.len());
        // started_at is stored as an RFC 3339 string; skip the attr
        // if it does not parse rather than failing the whole payload.
        if let Ok(t) = humantime::parse_rfc3339(&self.started_at) {
            attrs.set(crate::introspect::STARTED_AT, t);
        }
        attrs.set(crate::introspect::STARTED_BY, self.started_by.clone());
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
        NodePayload {
            identity: "root".to_string(),
            properties: crate::introspect::derive_properties(&attrs_json),
            children,
            parent: None,
            as_of: hyperactor::introspect::format_timestamp(std::time::SystemTime::now()),
        }
    }

    /// Resolve a `HostAgent` actor reference into a host-level
    /// `NodePayload`.
    ///
    /// Sends `IntrospectMessage::Query` directly to the
    /// `HostAgent`, which returns a `NodePayload` with
    /// `NodeProperties::Host` and the host's children. The resolver
    /// overrides `parent` to `"root"` since the host agent
    /// doesn't know its position in the navigation tree.
    async fn resolve_host_node(
        &self,
        cx: &Context<'_, Self>,
        actor_id: &hyperactor_reference::ActorId,
    ) -> Result<NodePayload, anyhow::Error> {
        let result = query_introspect(
            cx,
            actor_id,
            hyperactor::introspect::IntrospectView::Entity,
            SINGLE_HOST_TIMEOUT,
            "querying host agent",
        )
        .await?;
        Ok(crate::introspect::to_node_payload_with(
            result,
            HostId(actor_id.clone()).to_string(),
            Some("root".to_string()),
        ))
    }

    /// Resolve a `ProcId` reference into a proc-level `NodePayload`.
    ///
    /// First tries `IntrospectMessage::QueryChild` against the owning
    /// `HostAgent` (which recognizes service and local procs). If
    /// that returns an error payload, falls back to `ProcAgent` for
    /// user procs by querying
    /// `QueryChild(hyperactor_reference::Reference::Proc(proc_id))`
    /// on `<proc_id>/proc_agent[0]`.
    ///
    /// See PA-1 in module doc.
    async fn resolve_proc_node(
        &self,
        cx: &Context<'_, Self>,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Result<NodePayload, anyhow::Error> {
        // Hosts are keyed by address string; the proc's address
        // identifies its owning host.
        let host_addr = proc_id.addr().to_string();

        let agent = self
            .hosts
            .get(&host_addr)
            .ok_or_else(|| anyhow::anyhow!("host not found: {}", host_addr))?;

        // Try the host agent's QueryChild first.
        let result = query_child_introspect(
            cx,
            agent.actor_id(),
            hyperactor_reference::Reference::Proc(proc_id.clone()),
            QUERY_CHILD_TIMEOUT,
            "querying proc details",
        )
        .await?;

        // If the host recognized the proc, use its response directly.
        // No identity/parent normalization — QueryChild sets them correctly.
        let payload = to_node_payload(result);
        if !matches!(payload.properties, NodeProperties::Error { .. }) {
            return Ok(payload);
        }

        // Fall back to querying the ProcAgent directly (user procs).
        let mesh_agent_id = proc_id.actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0);
        let result = query_child_introspect(
            cx,
            &mesh_agent_id,
            hyperactor_reference::Reference::Proc(proc_id.clone()),
            resolve_actor_timeout(),
            "querying proc mesh agent",
        )
        .await?;

        Ok(crate::introspect::to_node_payload_with(
            result,
            proc_id.to_string(),
            Some(HostId(agent.actor_id().clone()).to_string()),
        ))
    }

    /// Resolve a standalone proc into a proc-level `NodePayload`.
    ///
    /// Standalone procs (e.g. the admin proc) are not managed by any
    /// `HostAgent`, so
    /// `resolve_proc_node` cannot resolve them. Instead, we query the
    /// anchor actor on the proc for its introspection data, collect
    /// its supervision children, and build a synthetic proc node.
    ///
    /// Special case: when the anchor actor is this agent itself, we
    /// build the children list directly (just `[self]`) to avoid a
    /// self-deadlock — the actor loop cannot process an
    /// `IntrospectMessage` it sends to itself while handling a
    /// resolve request.
    async fn resolve_standalone_proc_node(
        &self,
        cx: &Context<'_, Self>,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Result<NodePayload, anyhow::Error> {
        let actor_id = self
            .standalone_proc_anchor(proc_id)
            .ok_or_else(|| anyhow::anyhow!("no anchor actor for standalone proc {}", proc_id))?;

        let (children, system_children) = if self.self_actor_id.as_ref() == Some(actor_id) {
            // Self-proc: we are the only actor; no message needed.
            // The admin agent is a system actor.
            let self_ref = actor_id.to_string();
            (vec![self_ref.clone()], vec![self_ref])
        } else {
            // Query the anchor actor for its supervision children.
            let actor_result = query_introspect(
                cx,
                actor_id,
                hyperactor::introspect::IntrospectView::Actor,
                SINGLE_HOST_TIMEOUT,
                &format!("querying anchor actor on {}", proc_id),
            )
            .await?;
            // No identity/parent normalization — only reading properties for is_system check.
            let actor_payload = to_node_payload(actor_result);
            // Check if anchor actor is system.
            let anchor_ref = actor_id.to_string();
            let anchor_is_system = matches!(
                &actor_payload.properties,
                NodeProperties::Actor {
                    is_system: true,
                    ..
                }
            );

            let mut children = vec![anchor_ref.clone()];
            let mut system_children = Vec::new();
            if anchor_is_system {
                system_children.push(anchor_ref);
            }

            // Query each supervision child to check is_system.
            // Best-effort: a child that fails to answer is treated as
            // non-system but still listed as a child.
            for child_ref in actor_payload.children {
                if let Ok(child_actor_id) = child_ref.parse::<hyperactor_reference::ActorId>() {
                    let child_is_system = if let Ok(r) = query_introspect(
                        cx,
                        &child_actor_id,
                        hyperactor::introspect::IntrospectView::Actor,
                        resolve_actor_timeout(),
                        "querying child actor is_system",
                    )
                    .await
                    {
                        let p = to_node_payload(r);
                        matches!(
                            &p.properties,
                            NodeProperties::Actor {
                                is_system: true,
                                ..
                            }
                        )
                    } else {
                        false
                    };
                    if child_is_system {
                        system_children.push(child_ref.clone());
                    }
                }
                children.push(child_ref);
            }
            (children, system_children)
        };

        let proc_name = proc_id.name().to_string();

        // Build attrs for standalone proc.
        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
        attrs.set(crate::introspect::PROC_NAME, proc_name.clone());
        attrs.set(crate::introspect::NUM_ACTORS, children.len());
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

        Ok(NodePayload {
            identity: proc_id.to_string(),
            properties: crate::introspect::derive_properties(&attrs_json),
            children,
            as_of: hyperactor::introspect::format_timestamp(std::time::SystemTime::now()),
            parent: Some("root".to_string()),
        })
    }

    /// Resolve a non-host-agent `ActorId` reference into an
    /// actor-level `NodePayload`.
    ///
    /// Sends `IntrospectMessage::Query` directly to the target actor
    /// via `PortRef::attest_message_port`. The blanket handler
    /// returns a `NodePayload` with `NodeProperties::Actor` (or a
    /// domain-specific override like `NodeProperties::Proc` for
    /// `ProcAgent`).
    ///
    /// The resolver sets `parent` based on the actor's position
    /// in the topology: if the actor lives in a system proc, the
    /// parent is the system proc ref; otherwise it's the proc's
    /// `ProcId` string.
    async fn resolve_actor_node(
        &self,
        cx: &Context<'_, Self>,
        actor_id: &hyperactor_reference::ActorId,
    ) -> Result<NodePayload, anyhow::Error> {
        // Self-resolution: we cannot send IntrospectMessage to our
        // own actor loop while handling a resolve request (deadlock).
        // Use introspect_payload() to snapshot our own state
        // directly.
        let result = if self.self_actor_id.as_ref() == Some(actor_id) {
            cx.introspect_payload()
        } else if self.is_standalone_proc_actor(actor_id) {
            // Standalone procs have no ProcAgent — query directly.
            query_introspect(
                cx,
                actor_id,
                hyperactor::introspect::IntrospectView::Actor,
                SINGLE_HOST_TIMEOUT,
                &format!("querying actor {}", actor_id),
            )
            .await?
        } else {
            // Check terminated snapshots first — fast, no ambiguity.
            let proc_id = actor_id.proc_id();
            let mesh_agent_id = proc_id.actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0);
            let terminated = query_child_introspect(
                cx,
                &mesh_agent_id,
                hyperactor_reference::Reference::Actor(actor_id.clone()),
                QUERY_CHILD_TIMEOUT,
                "querying terminated snapshot",
            )
            .await
            .ok()
            // An Error payload means "no snapshot"; treat it as None
            // and fall through to the live query.
            .filter(|r| {
                let p = crate::introspect::derive_properties(&r.attrs);
                !matches!(p, NodeProperties::Error { .. })
            });

            match terminated {
                Some(snapshot) => snapshot,
                None => {
                    // Not terminated — query the live actor.
                    query_introspect(
                        cx,
                        actor_id,
                        hyperactor::introspect::IntrospectView::Actor,
                        resolve_actor_timeout(),
                        &format!("querying actor {}", actor_id),
                    )
                    .await?
                }
            }
        };
        let mut payload = to_node_payload(result);

        // Actors on standalone procs: parent is the proc.
        if self.is_standalone_proc_actor(actor_id) {
            payload.parent = Some(actor_id.proc_id().to_string());
            return Ok(payload);
        }

        // Set parent based on topology. If the actor returns Proc
        // properties (ProcAgent override), its parent is the host
        // agent. Otherwise, it's a regular actor and its parent is
        // the proc.
        let proc_id = actor_id.proc_id();
        match &payload.properties {
            NodeProperties::Proc { .. } => {
                // ProcAgent: parent is the host agent.
                let host_addr = proc_id.addr().to_string();
                if let Some(agent) = self.hosts.get(&host_addr) {
                    payload.parent = Some(HostId(agent.actor_id().clone()).to_string());
                }
            }
            _ => {
                // Regular actor: parent is the proc node, whose
                // identity is the ProcId string (same for system and
                // user procs).
                payload.parent = Some(proc_id.to_string());
            }
        }

        Ok(payload)
    }
}
1349
1350/// Build the Axum router for the mesh admin HTTP server.
1351///
1352/// Routes:
1353/// - `GET /v1/schema` — JSON Schema (Draft 2020-12) for `NodePayload`.
1354/// - `GET /v1/schema/error` — JSON Schema for `ApiErrorEnvelope`.
1355/// - `GET /v1/openapi.json` — OpenAPI 3.1 spec (embeds JSON Schemas).
1356/// - `GET /v1/tree` — ASCII topology dump.
1357/// - `GET /v1/{*reference}` — JSON `NodePayload` for a single reference.
1358/// - `GET /SKILL.md` — agent-facing API documentation (markdown).
1359fn create_mesh_admin_router(bridge_state: Arc<BridgeState>) -> Router {
1360    Router::new()
1361        .route("/SKILL.md", get(serve_skill_md))
1362        // Literal paths matched by specificity before wildcard (SC-5).
1363        .route("/v1/schema", get(serve_schema))
1364        .route("/v1/schema/error", get(serve_error_schema))
1365        .route("/v1/openapi.json", get(serve_openapi))
1366        .route("/v1/tree", get(tree_dump))
1367        .route("/v1/{*reference}", get(resolve_reference_bridge))
1368        .with_state(bridge_state)
1369}
1370
/// Raw markdown template for the SKILL.md API document.
///
/// Contains a `{base}` placeholder that `serve_skill_md` substitutes
/// with the request's base URL so examples are copy-pasteable.
const SKILL_MD_TEMPLATE: &str = include_str!("mesh_admin_skill.md");
1373
1374/// Extract base URL from request headers.
1375///
1376/// Defaults to `https` when `x-forwarded-proto` is absent — the
1377/// admin server uses TLS in production, so `http` is the wrong
1378/// default for direct connections.
1379fn extract_base_url(headers: &axum::http::HeaderMap) -> String {
1380    let host = headers
1381        .get(axum::http::header::HOST)
1382        .and_then(|v| v.to_str().ok())
1383        .unwrap_or("localhost");
1384    let scheme = headers
1385        .get("x-forwarded-proto")
1386        .and_then(|v| v.to_str().ok())
1387        .unwrap_or("https");
1388    format!("{scheme}://{host}")
1389}
1390
1391/// Serves the self-describing API document with the base URL
1392/// interpolated so examples are copy-pasteable.
1393async fn serve_skill_md(headers: axum::http::HeaderMap) -> impl axum::response::IntoResponse {
1394    let base = extract_base_url(&headers);
1395    let body = SKILL_MD_TEMPLATE.replace("{base}", &base);
1396    (
1397        [(
1398            axum::http::header::CONTENT_TYPE,
1399            "text/markdown; charset=utf-8",
1400        )],
1401        body,
1402    )
1403}
1404
1405/// Build a JSON Schema value with a `$id` field.
1406fn schema_with_id<T: schemars::JsonSchema>(id: &str) -> Result<serde_json::Value, ApiError> {
1407    let schema = schemars::schema_for!(T);
1408    let mut value = serde_json::to_value(schema).map_err(|e| ApiError {
1409        code: "internal_error".to_string(),
1410        message: format!("failed to serialize schema: {e}"),
1411        details: None,
1412    })?;
1413    if let Some(obj) = value.as_object_mut() {
1414        obj.insert("$id".into(), serde_json::Value::String(id.into()));
1415    }
1416    Ok(value)
1417}
1418
1419/// JSON Schema for the `NodePayload` response type.
1420async fn serve_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1421    Ok(axum::response::Json(schema_with_id::<
1422        crate::introspect::NodePayload,
1423    >(
1424        "https://monarch.meta.com/schemas/v1/node_payload",
1425    )?))
1426}
1427
1428/// JSON Schema for the `ApiErrorEnvelope` error response.
1429async fn serve_error_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1430    Ok(axum::response::Json(schema_with_id::<ApiErrorEnvelope>(
1431        "https://monarch.meta.com/schemas/v1/error",
1432    )?))
1433}
1434
1435/// Hoist `$defs` from a schemars-generated schema into a shared
1436/// map and rewrite internal `$ref` pointers from `#/$defs/X` to
1437/// `#/components/schemas/X` so OpenAPI tools can resolve them.
1438fn hoist_defs(
1439    schema: &mut serde_json::Value,
1440    shared: &mut serde_json::Map<String, serde_json::Value>,
1441) {
1442    if let Some(obj) = schema.as_object_mut() {
1443        if let Some(defs) = obj.remove("$defs") {
1444            if let Some(defs_map) = defs.as_object() {
1445                for (k, v) in defs_map {
1446                    shared.insert(k.clone(), v.clone());
1447                }
1448            }
1449        }
1450        // Also remove $schema from embedded schemas — it's
1451        // only valid at the root of a JSON Schema document,
1452        // not inside an OpenAPI components/schemas entry.
1453        obj.remove("$schema");
1454    }
1455    rewrite_refs(schema);
1456}
1457
1458/// Recursively rewrite `$ref: "#/$defs/X"` →
1459/// `$ref: "#/components/schemas/X"`.
1460fn rewrite_refs(value: &mut serde_json::Value) {
1461    match value {
1462        serde_json::Value::Object(map) => {
1463            if let Some(serde_json::Value::String(r)) = map.get_mut("$ref") {
1464                if r.starts_with("#/$defs/") {
1465                    *r = r.replace("#/$defs/", "#/components/schemas/");
1466                }
1467            }
1468            for v in map.values_mut() {
1469                rewrite_refs(v);
1470            }
1471        }
1472        serde_json::Value::Array(arr) => {
1473            for v in arr {
1474                rewrite_refs(v);
1475            }
1476        }
1477        _ => {}
1478    }
1479}
1480
/// Build the OpenAPI 3.1 spec, embedding schemars-derived JSON
/// Schemas into `components/schemas`.
///
/// Assembled at runtime so the spec always matches the compiled-in
/// Rust types. NOTE(review): the `paths` listed below should mirror
/// the routes registered on the axum router — confirm when adding
/// or removing endpoints.
pub fn build_openapi_spec() -> serde_json::Value {
    // Draft 2020-12 schemas derived directly from the Rust types.
    let mut node_schema =
        serde_json::to_value(schemars::schema_for!(crate::introspect::NodePayload))
            .expect("NodePayload schema must be serializable");
    let mut error_schema = serde_json::to_value(schemars::schema_for!(ApiErrorEnvelope))
        .expect("ApiErrorEnvelope schema must be serializable");

    // Hoist $defs into a shared components/schemas map so
    // OpenAPI tools can resolve references.
    let mut shared_schemas = serde_json::Map::new();
    hoist_defs(&mut node_schema, &mut shared_schemas);
    hoist_defs(&mut error_schema, &mut shared_schemas);
    shared_schemas.insert("NodePayload".into(), node_schema);
    shared_schemas.insert("ApiErrorEnvelope".into(), error_schema);

    // Shared builder for error responses: every error status points
    // at the same ApiErrorEnvelope schema, varying only the
    // human-readable description.
    let error_response = |desc: &str| -> serde_json::Value {
        serde_json::json!({
            "description": desc,
            "content": {
                "application/json": {
                    "schema": { "$ref": "#/components/schemas/ApiErrorEnvelope" }
                }
            }
        })
    };

    // 200 response shared by /v1/root and /v1/{reference}.
    let success_payload = serde_json::json!({
        "description": "Resolved NodePayload",
        "content": {
            "application/json": {
                "schema": { "$ref": "#/components/schemas/NodePayload" }
            }
        }
    });

    serde_json::json!({
        "openapi": "3.1.0",
        "info": {
            "title": "Monarch Mesh Admin API",
            "version": "1.0.0",
            "description": "Reference-walking introspection API for a Monarch actor mesh. See the Admin Gateway Pattern RFC."
        },
        "paths": {
            "/v1/root": {
                "get": {
                    "summary": "Fetch root node",
                    "operationId": "getRoot",
                    "responses": {
                        "200": success_payload,
                        "500": error_response("Internal error"),
                        "503": error_response("Service unavailable (at capacity, retry with backoff)"),
                        "504": error_response("Gateway timeout (downstream host unresponsive)")
                    }
                }
            },
            "/v1/{reference}": {
                "get": {
                    "summary": "Resolve a reference to a NodePayload",
                    "operationId": "resolveReference",
                    "parameters": [{
                        "name": "reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded opaque reference string",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": success_payload,
                        "400": error_response("Bad request (malformed reference)"),
                        "404": error_response("Reference not found"),
                        "500": error_response("Internal error"),
                        "503": error_response("Service unavailable (at capacity, retry with backoff)"),
                        "504": error_response("Gateway timeout (downstream host unresponsive)")
                    }
                }
            },
            "/v1/schema": {
                "get": {
                    "summary": "JSON Schema for NodePayload (Draft 2020-12)",
                    "operationId": "getSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            },
            "/v1/schema/error": {
                "get": {
                    "summary": "JSON Schema for ApiErrorEnvelope (Draft 2020-12)",
                    "operationId": "getErrorSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            },
            "/v1/tree": {
                "get": {
                    "summary": "ASCII topology dump (debug)",
                    "operationId": "getTree",
                    "responses": {
                        "200": {
                            "description": "Human-readable topology tree",
                            "content": { "text/plain": {} }
                        }
                    }
                }
            }
        },
        "components": {
            "schemas": serde_json::Value::Object(shared_schemas)
        }
    })
}
1601
1602/// OpenAPI 3.1 spec for the mesh admin API.
1603async fn serve_openapi() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1604    Ok(axum::response::Json(build_openapi_spec()))
1605}
1606
/// Resolve an opaque reference string to a `NodePayload` via the
/// actor-based resolver.
///
/// Implements `GET /v1/{*reference}` for the reference-walking client
/// (e.g. the TUI):
/// - Decodes the wildcard path segment into the original reference
///   string (Axum does not percent-decode `{*reference}` captures).
/// - Sends `ResolveReferenceMessage::Resolve` to `MeshAdminAgent` and
///   awaits the reply.
/// - Maps resolver failures into appropriate `ApiError`s
///   (`bad_request`, `not_found`, `gateway_timeout`, or
///   `internal_error`).
async fn resolve_reference_bridge(
    State(state): State<Arc<BridgeState>>,
    AxumPath(reference): AxumPath<String>,
) -> Result<Json<NodePayload>, ApiError> {
    // Axum's wildcard may include a leading slash; strip it.
    let reference = reference.trim_start_matches('/');
    if reference.is_empty() {
        return Err(ApiError::bad_request("empty reference", None));
    }
    // Percent-decode; the only decode failure mode surfaced here is
    // non-UTF-8 decoded bytes → 400.
    let reference = urlencoding::decode(reference)
        .map(|cow| cow.into_owned())
        .map_err(|_| {
            ApiError::bad_request(
                "malformed percent-encoding: decoded bytes are not valid UTF-8",
                None,
            )
        })?;

    // Limit concurrent resolves to avoid starving user workloads
    // that share this tokio runtime. The permit is held (RAII) until
    // this handler returns; try_acquire never blocks — at capacity we
    // fail fast with 503 so clients can back off.
    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
        tracing::warn!("mesh admin: rejecting resolve request (503): too many concurrent requests");
        ApiError {
            code: "service_unavailable".to_string(),
            message: "too many concurrent introspection requests".to_string(),
            details: None,
        }
    })?;

    // Bridge into the actor runtime: all resolution goes through
    // MeshAdminAgent messaging via the bridge instance.
    let cx = &state.bridge_cx;
    let resolve_start = std::time::Instant::now();
    // Two error layers: the outer timeout Err maps to 504
    // (gateway_timeout); the inner messaging Err maps to 500
    // (internal_error).
    let response = tokio::time::timeout(
        SINGLE_HOST_TIMEOUT,
        state.admin_ref.resolve(cx, reference.clone()),
    )
    .await
    .map_err(|_| {
        tracing::warn!(
            reference = %reference,
            elapsed_ms = resolve_start.elapsed().as_millis() as u64,
            "mesh admin: resolve timed out (gateway_timeout)",
        );
        ApiError {
            code: "gateway_timeout".to_string(),
            message: "timed out resolving reference".to_string(),
            details: None,
        }
    })?
    .map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to resolve reference: {}", e),
        details: None,
    })?;

    // The reply wraps a Result: Err carries the resolver's failure
    // string, which is surfaced verbatim as a 404 (not found).
    match response.0 {
        Ok(payload) => Ok(Json(payload)),
        Err(error) => Err(ApiError::not_found(error, None)),
    }
}
1678
/// Timeout for the tree dump fan-out. Kept short so that slow or dead
/// hosts don't block the response.
///
/// Applied independently to each `resolve` call in `tree_dump`, so a
/// full walk over many hosts/procs can take longer than this in
/// aggregate.
const TREE_TIMEOUT: Duration = Duration::from_secs(10);
1682
1683/// `GET /v1/tree` — ASCII topology dump.
1684///
1685/// Walks the reference graph starting from `"root"`, resolving each
1686/// host and its proc children, and formats the result as a
1687/// human-readable ASCII tree suitable for quick `curl` inspection.
1688/// Each line includes a clickable URL for drilling into that node via
1689/// the reference API. Built on top of the same
1690/// `ResolveReferenceMessage` protocol used by the TUI.
1691///
1692/// Output format:
1693/// ```text
1694/// unix:@hash  ->  https://host:port/v1/...  (or http:// in OSS)
1695/// ├── service  ->  https://host:port/v1/...
1696/// │   ├── agent[0]  ->  https://host:port/v1/...
1697/// │   └── client[0]  ->  https://host:port/v1/...
1698/// ├── local  ->  https://host:port/v1/...
1699/// └── philosophers_0  ->  https://host:port/v1/...
1700///     ├── agent[0]  ->  https://host:port/v1/...
1701///     └── philosopher[0]  ->  https://host:port/v1/...
1702/// ```
1703async fn tree_dump(
1704    State(state): State<Arc<BridgeState>>,
1705    headers: axum::http::header::HeaderMap,
1706) -> Result<String, ApiError> {
1707    // Limit concurrent resolves to avoid starving user workloads.
1708    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
1709        tracing::warn!(
1710            "mesh admin: rejecting tree_dump request (503): too many concurrent requests"
1711        );
1712        ApiError {
1713            code: "service_unavailable".to_string(),
1714            message: "too many concurrent introspection requests".to_string(),
1715            details: None,
1716        }
1717    })?;
1718
1719    let cx = &state.bridge_cx;
1720
1721    // Build base URL from the Host header for clickable links.
1722    let host = headers
1723        .get("host")
1724        .and_then(|v| v.to_str().ok())
1725        .unwrap_or("localhost");
1726    let scheme = headers
1727        .get("x-forwarded-proto")
1728        .and_then(|v| v.to_str().ok())
1729        .unwrap_or("http");
1730    let base_url = format!("{}://{}", scheme, host);
1731
1732    // Resolve root.
1733    let root_resp = tokio::time::timeout(
1734        TREE_TIMEOUT,
1735        state.admin_ref.resolve(cx, "root".to_string()),
1736    )
1737    .await
1738    .map_err(|_| ApiError {
1739        code: "gateway_timeout".to_string(),
1740        message: "timed out resolving root".to_string(),
1741        details: None,
1742    })?
1743    .map_err(|e| ApiError {
1744        code: "internal_error".to_string(),
1745        message: format!("failed to resolve root: {}", e),
1746        details: None,
1747    })?;
1748
1749    let root = root_resp.0.map_err(|e| ApiError {
1750        code: "internal_error".to_string(),
1751        message: e,
1752        details: None,
1753    })?;
1754
1755    let mut output = String::new();
1756
1757    // Resolve each root child. Hosts get the full host→proc→actor
1758    // subtree; non-host children (e.g. the root client actor) are
1759    // rendered as single leaf lines.
1760    for child_ref in &root.children {
1761        let resp =
1762            tokio::time::timeout(TREE_TIMEOUT, state.admin_ref.resolve(cx, child_ref.clone()))
1763                .await;
1764
1765        let payload = match resp {
1766            Ok(Ok(r)) => r.0.ok(),
1767            _ => None,
1768        };
1769
1770        match payload {
1771            Some(node) if matches!(node.properties, NodeProperties::Host { .. }) => {
1772                // Host header: show the addr from NodeProperties::Host.
1773                let header = match &node.properties {
1774                    NodeProperties::Host { addr, .. } => addr.clone(),
1775                    _ => child_ref.clone(),
1776                };
1777                let host_url = format!("{}/v1/{}", base_url, urlencoding::encode(child_ref));
1778                output.push_str(&format!("{}  ->  {}\n", header, host_url));
1779
1780                // Proc children with box-drawing connectors.
1781                let num_procs = node.children.len();
1782                for (i, proc_ref) in node.children.iter().enumerate() {
1783                    let is_last_proc = i == num_procs - 1;
1784                    let proc_connector = if is_last_proc {
1785                        "└── "
1786                    } else {
1787                        "├── "
1788                    };
1789                    let proc_name = derive_tree_label(proc_ref);
1790                    let proc_url = format!("{}/v1/{}", base_url, urlencoding::encode(proc_ref));
1791                    output.push_str(&format!(
1792                        "{}{}  ->  {}\n",
1793                        proc_connector, proc_name, proc_url
1794                    ));
1795
1796                    // Resolve the proc to get its actor children.
1797                    let proc_resp = tokio::time::timeout(
1798                        TREE_TIMEOUT,
1799                        state.admin_ref.resolve(cx, proc_ref.clone()),
1800                    )
1801                    .await;
1802                    let proc_payload = match proc_resp {
1803                        Ok(Ok(r)) => r.0.ok(),
1804                        _ => None,
1805                    };
1806                    if let Some(proc_node) = proc_payload {
1807                        let num_actors = proc_node.children.len();
1808                        let child_prefix = if is_last_proc { "    " } else { "│   " };
1809                        for (j, actor_ref) in proc_node.children.iter().enumerate() {
1810                            let actor_connector = if j == num_actors - 1 {
1811                                "└── "
1812                            } else {
1813                                "├── "
1814                            };
1815                            let actor_label = derive_actor_label(actor_ref);
1816                            let actor_url =
1817                                format!("{}/v1/{}", base_url, urlencoding::encode(actor_ref));
1818                            output.push_str(&format!(
1819                                "{}{}{}  ->  {}\n",
1820                                child_prefix, actor_connector, actor_label, actor_url
1821                            ));
1822                        }
1823                    }
1824                }
1825                output.push('\n');
1826            }
1827            Some(node) if matches!(node.properties, NodeProperties::Proc { .. }) => {
1828                // Non-host proc (e.g. root client proc). Render as a
1829                // proc-level subtree with its actor children.
1830                let proc_name = match &node.properties {
1831                    NodeProperties::Proc { proc_name, .. } => proc_name.clone(),
1832                    _ => child_ref.clone(),
1833                };
1834                let proc_url = format!("{}/v1/{}", base_url, urlencoding::encode(child_ref));
1835                output.push_str(&format!("{}  ->  {}\n", proc_name, proc_url));
1836
1837                let num_actors = node.children.len();
1838                for (j, actor_ref) in node.children.iter().enumerate() {
1839                    let actor_connector = if j == num_actors - 1 {
1840                        "└── "
1841                    } else {
1842                        "├── "
1843                    };
1844                    let actor_label = derive_actor_label(actor_ref);
1845                    let actor_url = format!("{}/v1/{}", base_url, urlencoding::encode(actor_ref));
1846                    output.push_str(&format!(
1847                        "{}{}  ->  {}\n",
1848                        actor_connector, actor_label, actor_url
1849                    ));
1850                }
1851                output.push('\n');
1852            }
1853            Some(_node) => {
1854                // Non-host root child (e.g. root client actor).
1855                // Render as a single leaf line.
1856                let label = derive_actor_label(child_ref);
1857                let url = format!("{}/v1/{}", base_url, urlencoding::encode(child_ref));
1858                output.push_str(&format!("{}  ->  {}\n\n", label, url));
1859            }
1860            _ => {
1861                output.push_str(&format!("{} (unreachable)\n\n", child_ref));
1862            }
1863        }
1864    }
1865    Ok(output)
1866}
1867
/// Derive a short display label from a reference string for the ASCII
/// tree.
///
/// Extracts the proc name — the meaningful identifier for tree
/// display — from the various reference formats emitted by
/// `HostAgent`'s children list:
///
/// - System proc ref `"[system] unix:@hash,service"` → `"service"`
/// - ProcAgent ActorId `"unix:@hash,my_proc,agent[0]"` →
///   `"my_proc"`
/// - Bare ProcId `"unix:@hash,my_proc"` → `"my_proc"`
///
/// Note: `ActorId::Display` for `ProcId` uses commas as
/// separators (`proc_id,actor_name[idx]`), not slashes.
fn derive_tree_label(reference: &str) -> String {
    // Both the ActorId form ("addr,proc_name,actor[idx]") and the
    // ProcId form ("addr,proc_name") carry the proc name in the
    // second comma-separated segment. When there is no comma at all,
    // fall back to the raw reference.
    reference
        .splitn(3, ',')
        .nth(1)
        .map_or_else(|| reference.to_string(), str::to_string)
}
1896
/// Derive a short display label for an actor reference.
///
/// Actor references are `ActorId` strings in the format
/// `"transport!addr,proc_name,actor_name[idx]"`. This extracts the
/// actor name with index (e.g. `"philosopher[0]"`).
fn derive_actor_label(reference: &str) -> String {
    let mut segments = reference.splitn(3, ',');
    let first = segments.next();
    let second = segments.next();
    let third = segments.next();
    match (first, second, third) {
        // "addr,proc_name,actor[idx]" → actor[idx]
        (_, _, Some(actor)) => actor.to_string(),
        // "addr,name" → name
        (_, Some(name), None) => name.to_string(),
        // No separators — show the reference verbatim.
        _ => reference.to_string(),
    }
}
1912
// -- CLI-based mast_conda:/// resolution --
//
// See module doc for MC-1..MC-5 invariants.
//
// The structs below model only the fields of `mast get-status
// --json` output that head-node resolution needs; any other JSON
// fields in the response are ignored during deserialization.

/// Top-level response from `mast get-status --json`.
#[derive(serde::Deserialize)]
struct MastStatusResponse {
    // The most recent job execution attempt; earlier attempts are
    // not present in this model.
    #[serde(rename = "latestAttempt")]
    latest_attempt: MastAttempt,
}

/// A single execution attempt containing task groups.
#[derive(serde::Deserialize)]
struct MastAttempt {
    // Map values are ordered attempt lists; `head_hostname` reads
    // only the last entry of each. NOTE(review): keys are presumably
    // task group names — confirm against the MAST CLI output.
    #[serde(rename = "taskGroupExecutionAttempts")]
    task_groups: std::collections::HashMap<String, Vec<MastTaskGroup>>,
}

/// A task group execution attempt containing per-task attempts.
#[derive(serde::Deserialize)]
struct MastTaskGroup {
    // Keyed by task index as a string (parsed as i64 in
    // `head_hostname`); values are ordered attempt lists.
    #[serde(rename = "taskExecutionAttempts")]
    tasks: std::collections::HashMap<String, Vec<MastTaskAttempt>>,
}

/// A single task execution attempt with an optional hostname.
#[derive(serde::Deserialize)]
struct MastTaskAttempt {
    // Absent when the attempt has not been placed on a host.
    hostname: Option<String>,
}
1943
1944/// Extract the head node hostname from a parsed MAST status response
1945/// (MC-2).
1946///
1947/// For each task group, the last attempt is selected. Within that
1948/// attempt, each task's last execution attempt provides the hostname.
1949/// Task indices (the map keys) are parsed as integers and sorted
1950/// ascending; the first hostname is the head node.
1951fn head_hostname(response: &MastStatusResponse) -> Result<String, String> {
1952    let mut hosts: Vec<(i64, String)> = Vec::new();
1953    for attempts in response.latest_attempt.task_groups.values() {
1954        let group = match attempts.last() {
1955            Some(g) => g,
1956            None => continue,
1957        };
1958        for (index_str, task_attempts) in &group.tasks {
1959            let attempt = match task_attempts.last() {
1960                Some(a) => a,
1961                None => continue,
1962            };
1963            if let Some(ref hostname) = attempt.hostname {
1964                let index = index_str.parse::<i64>().unwrap_or(i64::MAX);
1965                hosts.push((index, hostname.clone()));
1966            }
1967        }
1968    }
1969    hosts.sort_by_key(|(idx, _)| *idx);
1970    hosts
1971        .into_iter()
1972        .next()
1973        .map(|(_, h)| h)
1974        .ok_or_else(|| "no hostnames found in MAST response".to_string())
1975}
1976
1977/// Qualify a short hostname to an FQDN via
1978/// `getaddrinfo(AI_CANONNAME)` (MC-3,
1979/// MC-4).
1980///
1981/// Called via `spawn_blocking` to avoid blocking tokio workers. Falls
1982/// back to the raw hostname on any failure.
1983async fn qualify_fqdn(hostname: &str) -> String {
1984    if hostname.contains('.') {
1985        return hostname.to_string();
1986    }
1987    let owned = hostname.to_string();
1988    let fallback = owned.clone();
1989    tokio::task::spawn_blocking(move || qualify_fqdn_blocking(&owned))
1990        .await
1991        .unwrap_or(fallback)
1992}
1993
/// Blocking half of `qualify_fqdn`: canonicalize `hostname` via
/// `getaddrinfo` with `AI_CANONNAME`.
///
/// Returns the resolver's canonical name, or the input hostname
/// unchanged on any failure path (interior NUL in the name, lookup
/// error, null result, missing or non-UTF-8 canonical name). Never
/// errors — callers rely on the fallback behavior.
fn qualify_fqdn_blocking(hostname: &str) -> String {
    use std::ffi::CStr;
    use std::ffi::CString;
    use std::ptr;

    // CString::new fails only if the name contains an interior NUL;
    // fall back to the raw name in that case.
    let c_host = match CString::new(hostname) {
        Ok(c) => c,
        Err(_) => return hostname.to_string(),
    };
    // SAFETY: zeroed addrinfo is a valid hints struct (all-zero is
    // AF_UNSPEC with no flags, which we then override).
    let mut hints: libc::addrinfo = unsafe { std::mem::zeroed() };
    hints.ai_flags = libc::AI_CANONNAME;
    hints.ai_family = libc::AF_UNSPEC;

    let mut result: *mut libc::addrinfo = ptr::null_mut();
    // SAFETY: c_host is a valid NUL-terminated string, hints is
    // initialized, and result is a valid out-pointer.
    let rc = unsafe { libc::getaddrinfo(c_host.as_ptr(), ptr::null(), &hints, &mut result) };
    if rc != 0 || result.is_null() {
        return hostname.to_string();
    }

    // RAII guard ensures freeaddrinfo is called on every return path
    // below (including early returns).
    struct AddrInfoGuard(*mut libc::addrinfo);
    impl Drop for AddrInfoGuard {
        fn drop(&mut self) {
            // SAFETY: self.0 was returned by a successful getaddrinfo.
            unsafe { libc::freeaddrinfo(self.0) }
        }
    }
    let _guard = AddrInfoGuard(result);

    // getaddrinfo sets ai_canonname only on the first addrinfo entry
    // when AI_CANONNAME was requested.
    // SAFETY: result is non-null and was returned by getaddrinfo.
    let canon = unsafe { (*result).ai_canonname };
    if canon.is_null() {
        return hostname.to_string();
    }
    // SAFETY: canon is non-null and NUL-terminated per getaddrinfo
    // contract.
    unsafe { CStr::from_ptr(canon) }
        .to_str()
        .unwrap_or(hostname)
        .to_string()
}
2039
2040/// Resolve admin port from an explicit override or `MESH_ADMIN_ADDR`
2041/// config (MC-5).
2042///
2043/// When `port_override` is `Some`, that port is used directly. When
2044/// `None`, the port is read from the `MESH_ADMIN_ADDR` configuration
2045/// attribute (parsed as a `SocketAddr`).
2046fn resolve_admin_port(port_override: Option<u16>) -> Result<u16, anyhow::Error> {
2047    match port_override {
2048        Some(p) => Ok(p),
2049        None => {
2050            let config_addr = hyperactor_config::global::get_cloned(crate::config::MESH_ADMIN_ADDR);
2051            Ok(config_addr
2052                .parse_socket_addr()
2053                .map_err(|e| anyhow::anyhow!("invalid MESH_ADMIN_ADDR config: {}", e))?
2054                .port())
2055        }
2056    }
2057}
2058
/// Resolve a `mast_conda:///<job-name>` handle into an
/// `https://<fqdn>:<port>` base URL by shelling out to the `mast` CLI
/// (MC-1).
///
/// This is the OSS-compatible counterpart to
/// `hyperactor_meta::mesh_admin::resolve_mast_handle`. The `cmd`
/// parameter names the CLI binary; production passes `"mast"`, tests
/// substitute a synthetic command.
async fn try_resolve_mast_handle(
    handle: &str,
    port_override: Option<u16>,
    cmd: &str,
) -> anyhow::Result<String> {
    // Resolve the target port first so config errors surface before
    // shelling out to the CLI.
    let port = resolve_admin_port(port_override)?;
    let job_name = handle
        .strip_prefix("mast_conda:///")
        .ok_or_else(|| anyhow::anyhow!("expected mast_conda:/// prefix, got '{}'", handle))?;

    // `<cmd> get-status --json <job>` prints the status document on
    // stdout. Distinguish a missing binary (actionable install hint)
    // from other spawn failures.
    let output = match tokio::process::Command::new(cmd)
        .args(["get-status", "--json", job_name])
        .output()
        .await
    {
        Ok(o) => o,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            anyhow::bail!(
                "MAST CLI not found; install via `sudo feature install --persist mast_cli`"
            );
        }
        Err(e) => {
            anyhow::bail!("failed to run '{}': {}", cmd, e);
        }
    };

    // Non-zero exit: surface the CLI's stderr (trimmed) to the caller.
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        anyhow::bail!(
            "'mast get-status' exited with {}: {}",
            output.status,
            stderr.trim()
        );
    }

    let response: MastStatusResponse = serde_json::from_slice(&output.stdout)
        .map_err(|e| anyhow::anyhow!("failed to parse mast JSON output: {}", e))?;

    // Pick the head node (MC-2), qualify it to an FQDN (MC-3/MC-4),
    // then assemble the https base URL.
    let hostname =
        head_hostname(&response).map_err(|e| anyhow::anyhow!("MAST job '{}': {}", job_name, e))?;

    let fqdn = qualify_fqdn(&hostname).await;
    Ok(format!("https://{}:{}", fqdn, port))
}
2111
/// Resolve a `mast_conda:///<job-name>` handle into an
/// `https://<fqdn>:<port>` base URL using the `mast` CLI.
///
/// This is the CLI/OSS path. The thrift-based equivalent lives in
/// `hyperactor_meta::mesh_admin::resolve_mast_handle`. Binaries
/// match on `MastResolver` to select which to call.
///
/// # Errors
///
/// Fails if the admin port cannot be resolved, the handle lacks the
/// `mast_conda:///` prefix, the `mast` CLI is missing or exits
/// non-zero, its JSON output cannot be parsed, or no hostname is
/// found in the status response.
pub async fn resolve_mast_handle(
    handle: &str,
    port_override: Option<u16>,
) -> anyhow::Result<String> {
    // Production always uses the real `mast` binary; tests exercise
    // try_resolve_mast_handle directly with a substitute command.
    try_resolve_mast_handle(handle, port_override, "mast").await
}
2124
2125#[cfg(test)]
2126mod tests {
2127    use std::net::SocketAddr;
2128
2129    use hyperactor::channel::ChannelAddr;
2130    use hyperactor::testing::ids::test_proc_id_with_addr;
2131
2132    use super::*;
2133
2134    // Integration tests that spawn MeshAdminAgent must pass
2135    // `Some("[::]:0".parse().unwrap())` as the admin_addr to get an
2136    // ephemeral port. The default (`None`) reads MESH_ADMIN_ADDR
2137    // config which is `[::]:1729` — a fixed port that causes bind
2138    // conflicts when tests run concurrently.
2139
2140    /// Minimal introspectable actor for tests. The `#[export]`
2141    /// attribute generates `Named + Referable + Binds` so that
2142    /// `handle.bind()` registers the `IntrospectMessage` port for
2143    /// remote delivery.
2144    #[derive(Debug)]
2145    #[hyperactor::export(handlers = [])]
2146    struct TestIntrospectableActor;
2147    impl Actor for TestIntrospectableActor {}
2148
2149    // Verifies that MeshAdminAgent::build_root_payload constructs the
2150    // expected root node: identity/root metadata, correct Root
2151    // properties (num_hosts), and child links populated with the
2152    // stringified IDs of the configured host mesh-agent ActorRefs.
2153    #[test]
2154    fn test_build_root_payload() {
2155        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
2156        let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();
2157
2158        let proc1 = test_proc_id_with_addr(ChannelAddr::Tcp(addr1), "host1");
2159        let proc2 = test_proc_id_with_addr(ChannelAddr::Tcp(addr2), "host2");
2160
2161        let actor_id1 = proc1.actor_id("mesh_agent", 0);
2162        let actor_id2 = proc2.actor_id("mesh_agent", 0);
2163
2164        let ref1: hyperactor_reference::ActorRef<HostAgent> =
2165            hyperactor_reference::ActorRef::attest(actor_id1.clone());
2166        let ref2: hyperactor_reference::ActorRef<HostAgent> =
2167            hyperactor_reference::ActorRef::attest(actor_id2.clone());
2168
2169        let agent = MeshAdminAgent::new(
2170            vec![("host_a".to_string(), ref1), ("host_b".to_string(), ref2)],
2171            None,
2172            None,
2173        );
2174
2175        let payload = agent.build_root_payload();
2176        assert_eq!(payload.identity, "root");
2177        assert_eq!(payload.parent, None);
2178        assert!(matches!(
2179            payload.properties,
2180            NodeProperties::Root { num_hosts: 2, .. }
2181        ));
2182        assert_eq!(payload.children.len(), 2);
2183        // Children should be host: reference strings.
2184        assert!(
2185            payload
2186                .children
2187                .contains(&HostId(actor_id1.clone()).to_string())
2188        );
2189        assert!(
2190            payload
2191                .children
2192                .contains(&HostId(actor_id2.clone()).to_string())
2193        );
2194
2195        // Verify root properties derived from attrs.
2196        match &payload.properties {
2197            NodeProperties::Root {
2198                num_hosts,
2199                started_by,
2200                ..
2201            } => {
2202                assert_eq!(*num_hosts, 2);
2203                assert!(!started_by.is_empty());
2204            }
2205            other => panic!("expected Root, got {:?}", other),
2206        }
2207    }
2208
2209    // End-to-end smoke test for MeshAdminAgent::resolve that walks
2210    // the reference tree: root → host → system proc → host-agent
2211    // cross-reference. Verifies the reverse index routes the
2212    // HostAgent ActorId to NodeProperties::Host (not Actor),
2213    // preventing the TUI's cycle detection from dropping that node.
    #[tokio::test]
    async fn test_resolve_reference_tree_walk() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // -- 1. Stand up a local in-process Host with a HostAgent --
        // Use Unix transport for all procs — Local transport does not
        // support cross-proc message routing.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // Capture the addr and system proc now: `host` is moved into
        // HostAgentMode::Local in the spawn below.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // -- 2. Spawn MeshAdminAgent on a separate proc --
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        // The admin proc has no supervision coordinator by default.
        // Without one, actor teardown triggers std::process::exit(1).
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    None,
                    // "[::]:0" = ephemeral port for the admin HTTP listener.
                    Some("[::]:0".parse().unwrap()),
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // -- 3. Create a bare client instance for sending messages --
        // Only a mailbox is needed for reply ports — no actor message
        // loop required.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // -- 4. Resolve "root" --
        let root_resp = admin_ref
            .resolve(&client, "root".to_string())
            .await
            .unwrap();
        // First tuple field carries the resolved NodePayload.
        let root = root_resp.0.unwrap();
        assert_eq!(root.identity, "root");
        assert!(matches!(
            root.properties,
            NodeProperties::Root { num_hosts: 1, .. }
        ));
        assert_eq!(root.parent, None);
        assert_eq!(root.children.len(), 1); // host only (admin proc no longer standalone)

        // -- 5. Resolve the host child --
        let _host_agent_id_str = host_agent_ref.actor_id().to_string();
        // Host references are HostId-wrapped (host: prefix) to stay
        // distinct from plain ActorId references.
        let host_ref_str = HostId(host_agent_ref.actor_id().clone()).to_string();
        let host_child_ref_str = root
            .children
            .iter()
            .find(|c| **c == host_ref_str)
            .expect("root children should contain the host agent (as host: ref)");
        let host_resp = admin_ref
            .resolve(&client, host_child_ref_str.clone())
            .await
            .unwrap();
        let host_node = host_resp.0.unwrap();
        assert_eq!(host_node.identity, *host_child_ref_str);
        assert!(
            matches!(host_node.properties, NodeProperties::Host { .. }),
            "expected Host properties, got {:?}",
            host_node.properties
        );
        assert_eq!(host_node.parent, Some("root".to_string()));
        // A local host always has at least the system and local procs.
        assert!(
            !host_node.children.is_empty(),
            "host should have at least one proc child"
        );

        // -- 6. Resolve a system proc child --
        // System proc children are ProcId strings (the "[system] "
        // prefix is stripped by resolve_host_node).
        let proc_ref_str = &host_node.children[0];
        let proc_resp = admin_ref
            .resolve(&client, proc_ref_str.clone())
            .await
            .unwrap();
        let proc_node = proc_resp.0.unwrap();
        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties, got {:?}",
            proc_node.properties
        );
        assert_eq!(proc_node.parent, Some(host_child_ref_str.clone()));
        // The system proc should have at least the "host_agent" actor.
        assert!(
            !proc_node.children.is_empty(),
            "proc should have at least one actor child"
        );

        // -- 7. Cross-reference: system proc child is the host agent --
        //
        // The service proc's actor (agent[0]) IS the HostAgent, so
        // it appears both as a host node (from root, via HostId) and
        // as an actor (from a proc's children list, via plain ActorId).
        // HostId in root children makes resolution unambiguous: host
        // refs get Entity view, plain actor refs get Actor view.

        // The system proc must list the host agent among its children.
        let host_agent_id_str = host_agent_ref.actor_id().to_string();
        assert!(
            proc_node.children.contains(&host_agent_id_str),
            "system proc children {:?} should contain the host agent {}",
            proc_node.children,
            host_agent_id_str
        );

        // Resolve that child reference as a plain actor (no host: prefix).
        let xref_resp = admin_ref
            .resolve(&client, host_agent_id_str.clone())
            .await
            .unwrap();
        let xref_node = xref_resp.0.unwrap();

        // When resolved as a plain actor reference, it must return
        // Actor properties (not Host), because it has no host: prefix.
        assert!(
            matches!(xref_node.properties, NodeProperties::Actor { .. }),
            "host agent child resolved as plain actor should be Actor, got {:?}",
            xref_node.properties
        );
    }
2363
2364    // Verifies MeshAdminAgent::resolve returns NodeProperties::Proc
2365    // for all proc children. Spawns a user proc via
2366    // CreateOrUpdate<ProcSpec>, resolves all host proc-children, and
2367    // asserts every proc returns Proc properties.
2368    #[tokio::test]
2369    async fn test_proc_properties_for_all_procs() {
2370        use std::time::Duration;
2371
2372        use hyperactor::Proc;
2373        use hyperactor::channel::ChannelTransport;
2374        use hyperactor::host::Host;
2375        use hyperactor::host::LocalProcManager;
2376
2377        use crate::Name;
2378        use crate::host_mesh::host_agent::HostAgentMode;
2379        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
2380        use crate::proc_agent::ProcAgent;
2381        use crate::resource;
2382        use crate::resource::ProcSpec;
2383        use crate::resource::Rank;
2384
2385        // Stand up a local in-process Host with a HostAgent.
2386        let spawn: ProcManagerSpawnFn =
2387            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
2388        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
2389        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
2390            Host::new(manager, ChannelTransport::Unix.any())
2391                .await
2392                .unwrap();
2393        let host_addr = host.addr().clone();
2394        let system_proc = host.system_proc().clone();
2395        let host_agent_handle = system_proc
2396            .spawn(
2397                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
2398                HostAgent::new(HostAgentMode::Local(host)),
2399            )
2400            .unwrap();
2401        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
2402        let host_addr_str = host_addr.to_string();
2403
2404        // Spawn MeshAdminAgent on a separate proc.
2405        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
2406        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
2407        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
2408        let admin_handle = admin_proc
2409            .spawn(
2410                MESH_ADMIN_ACTOR_NAME,
2411                MeshAdminAgent::new(
2412                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
2413                    None,
2414                    Some("[::]:0".parse().unwrap()),
2415                ),
2416            )
2417            .unwrap();
2418        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
2419
2420        // Create a bare client instance for sending messages.
2421        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
2422        let (client, _handle) = client_proc.instance("client").unwrap();
2423
2424        // Spawn a user proc via CreateOrUpdate<ProcSpec>.
2425        let user_proc_name = Name::new("user_proc").unwrap();
2426        host_agent_ref
2427            .send(
2428                &client,
2429                resource::CreateOrUpdate {
2430                    name: user_proc_name.clone(),
2431                    rank: Rank::new(0),
2432                    spec: ProcSpec::default(),
2433                },
2434            )
2435            .unwrap();
2436
2437        // Wait for the user proc to boot.
2438        tokio::time::sleep(Duration::from_secs(2)).await;
2439
2440        // Resolve the host to get its children (system + user procs).
2441        let host_ref_string = HostId(host_agent_ref.actor_id().clone()).to_string();
2442        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
2443        let host_node = host_resp.0.unwrap();
2444
2445        // The host should have at least 3 children: system proc,
2446        // local proc, and our user proc.
2447        assert!(
2448            host_node.children.len() >= 3,
2449            "expected at least 3 proc children (2 system + 1 user), got {}",
2450            host_node.children.len()
2451        );
2452
2453        // Resolve each proc child and verify it has Proc properties.
2454        let user_proc_name_str = user_proc_name.to_string();
2455        let mut found_system = false;
2456        let mut found_user = false;
2457        for child_ref_str in &host_node.children {
2458            let resp = admin_ref
2459                .resolve(&client, child_ref_str.clone())
2460                .await
2461                .unwrap();
2462            let node = resp.0.unwrap();
2463            if let NodeProperties::Proc { proc_name, .. } = &node.properties {
2464                if proc_name.contains(&user_proc_name_str) {
2465                    found_user = true;
2466                } else {
2467                    found_system = true;
2468                }
2469                // Properties derived from attrs — verified by derive_properties tests.
2470            } else {
2471                // Host agent cross-reference — skip.
2472            }
2473        }
2474        assert!(
2475            found_system,
2476            "should have resolved at least one system proc"
2477        );
2478        assert!(found_user, "should have resolved the user proc");
2479    }
2480
2481    // Verifies that build_root_payload lists only the host as a
2482    // child. The root client is visible under its host's local proc,
2483    // not at root level.
2484    #[test]
2485    fn test_build_root_payload_with_root_client() {
2486        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
2487        let proc1 = hyperactor_reference::ProcId::with_name(ChannelAddr::Tcp(addr1), "host1");
2488        let actor_id1 = hyperactor_reference::ActorId::root(proc1, "mesh_agent".to_string());
2489        let ref1: hyperactor_reference::ActorRef<HostAgent> =
2490            hyperactor_reference::ActorRef::attest(actor_id1.clone());
2491
2492        let client_proc_id =
2493            hyperactor_reference::ProcId::with_name(ChannelAddr::Tcp(addr1), "local");
2494        let client_actor_id = client_proc_id.actor_id("client", 0);
2495
2496        let agent = MeshAdminAgent::new(
2497            vec![("host_a".to_string(), ref1)],
2498            Some(client_actor_id.clone()),
2499            None,
2500        );
2501
2502        let payload = agent.build_root_payload();
2503        assert!(matches!(
2504            payload.properties,
2505            NodeProperties::Root { num_hosts: 1, .. }
2506        ));
2507        // Only the host; root client is under host → local proc.
2508        assert_eq!(payload.children.len(), 1);
2509        assert!(
2510            payload
2511                .children
2512                .contains(&HostId(actor_id1.clone()).to_string())
2513        );
2514    }
2515
2516    // Verifies that the root client actor is visible through the
2517    // host → local proc → actor path, not as a standalone child of
2518    // root.
    #[tokio::test]
    async fn test_resolve_root_client_actor() {
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Stand up a local in-process Host with a HostAgent.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // Capture addr and system proc before `host` is moved below.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();

        // Spawn the root client on the host's local proc (before
        // moving the host into HostAgentMode).
        let local_proc = host.local_proc();
        let local_proc_id = local_proc.proc_id().clone();
        let root_client_handle = local_proc.spawn("client", TestIntrospectableActor).unwrap();
        let root_client_ref: hyperactor_reference::ActorRef<TestIntrospectableActor> =
            root_client_handle.bind();
        let root_client_actor_id = root_client_ref.actor_id().clone();

        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // Spawn MeshAdminAgent with the root client ActorId.
        let admin_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        // Without a supervision coordinator, actor teardown on the
        // admin proc would call std::process::exit(1).
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    Some(root_client_actor_id.clone()),
                    Some("[::]:0".parse().unwrap()),
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Client for sending messages.
        let client_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Resolve "root" — should contain only the host.
        let root_resp = admin_ref
            .resolve(&client, "root".to_string())
            .await
            .unwrap();
        let root = root_resp.0.unwrap();
        let host_id_str = HostId(host_agent_ref.actor_id().clone()).to_string();
        assert!(
            root.children.contains(&host_id_str),
            "root children {:?} should contain host {}",
            root.children,
            host_id_str
        );

        // Resolve the host — should list the local proc in children.
        let host_resp = admin_ref
            .resolve(&client, host_id_str.clone())
            .await
            .unwrap();
        let host_node = host_resp.0.unwrap();
        let local_proc_str = local_proc_id.to_string();
        assert!(
            host_node.children.contains(&local_proc_str),
            "host children {:?} should contain local proc {}",
            host_node.children,
            local_proc_str
        );

        // Resolve the local proc — should contain the root client actor.
        let proc_resp = admin_ref
            .resolve(&client, local_proc_str.clone())
            .await
            .unwrap();
        let proc_node = proc_resp.0.unwrap();
        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties, got {:?}",
            proc_node.properties
        );
        assert!(
            proc_node
                .children
                .contains(&root_client_actor_id.to_string()),
            "local proc children {:?} should contain root client actor {}",
            proc_node.children,
            root_client_actor_id
        );

        // Resolve the root client actor — parent should be the local proc.
        let client_resp = admin_ref
            .resolve(&client, root_client_actor_id.to_string())
            .await
            .unwrap();
        let client_node = client_resp.0.unwrap();
        assert!(
            matches!(client_node.properties, NodeProperties::Actor { .. }),
            "expected Actor properties, got {:?}",
            client_node.properties
        );
        assert_eq!(
            client_node.parent,
            Some(local_proc_str),
            "root client parent should be the local proc"
        );
    }
2645
2646    // Verifies that the SKILL.md template contains the canonical
2647    // strings that agents and tests rely on. Prevents silent drift or
2648    // accidental removal.
2649    #[test]
2650    fn test_skill_md_contains_canonical_strings() {
2651        let template = SKILL_MD_TEMPLATE;
2652        assert!(
2653            template.contains("GET {base}/v1/root"),
2654            "SKILL.md must document the root endpoint"
2655        );
2656        assert!(
2657            template.contains("GET {base}/v1/{reference}"),
2658            "SKILL.md must document the reference endpoint"
2659        );
2660        assert!(
2661            template.contains("NodePayload"),
2662            "SKILL.md must mention the NodePayload response type"
2663        );
2664        assert!(
2665            template.contains("GET {base}/SKILL.md"),
2666            "SKILL.md must document itself"
2667        );
2668        assert!(
2669            template.contains("{base}"),
2670            "SKILL.md must use {{base}} placeholder for interpolation"
2671        );
2672    }
2673
2674    // Verifies the navigation identity invariant (see module docs):
2675    //
2676    // 1. payload.identity == reference_string used to resolve it.
2677    // 2. For each child reference C of a resolved node P,
2678    //    resolve(C).parent == P.identity.
2679    //
2680    // Walks the entire tree starting from root, checking both
2681    // properties at every reachable node.
2682    #[tokio::test]
2683    async fn test_navigation_identity_invariant() {
2684        use hyperactor::Proc;
2685        use hyperactor::channel::ChannelTransport;
2686        use hyperactor::host::Host;
2687        use hyperactor::host::LocalProcManager;
2688
2689        use crate::host_mesh::host_agent::HostAgentMode;
2690        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
2691        use crate::proc_agent::ProcAgent;
2692
2693        // Stand up a local host with a HostAgent.
2694        let spawn: ProcManagerSpawnFn =
2695            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
2696        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
2697        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
2698            Host::new(manager, ChannelTransport::Unix.any())
2699                .await
2700                .unwrap();
2701        let host_addr = host.addr().clone();
2702        let system_proc = host.system_proc().clone();
2703        let host_agent_handle = system_proc
2704            .spawn(
2705                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
2706                HostAgent::new(HostAgentMode::Local(host)),
2707            )
2708            .unwrap();
2709        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
2710        let host_addr_str = host_addr.to_string();
2711
2712        // Spawn MeshAdminAgent on a separate proc.
2713        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
2714        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
2715        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
2716        let admin_handle = admin_proc
2717            .spawn(
2718                MESH_ADMIN_ACTOR_NAME,
2719                MeshAdminAgent::new(
2720                    vec![(host_addr_str, host_agent_ref)],
2721                    None,
2722                    Some("[::]:0".parse().unwrap()),
2723                ),
2724            )
2725            .unwrap();
2726        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
2727
2728        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
2729        let (client, _handle) = client_proc.instance("client").unwrap();
2730
2731        // Walk the tree breadth-first, checking the invariant at every node.
2732        // Each entry is (reference_string, expected_parent_identity).
2733        let mut queue: std::collections::VecDeque<(String, Option<String>)> =
2734            std::collections::VecDeque::new();
2735        queue.push_back(("root".to_string(), None));
2736
2737        let mut visited = std::collections::HashSet::new();
2738        while let Some((ref_str, expected_parent)) = queue.pop_front() {
2739            if !visited.insert(ref_str.clone()) {
2740                continue;
2741            }
2742
2743            let resp = admin_ref.resolve(&client, ref_str.clone()).await.unwrap();
2744            let node = resp.0.unwrap();
2745
2746            // NI-1: identity matches the reference used.
2747            assert_eq!(
2748                node.identity, ref_str,
2749                "identity mismatch: resolved '{}' but payload.identity = '{}'",
2750                ref_str, node.identity
2751            );
2752
2753            // NI-2: parent matches the parent node's identity.
2754            assert_eq!(
2755                node.parent, expected_parent,
2756                "parent mismatch for '{}': expected {:?}, got {:?}",
2757                ref_str, expected_parent, node.parent
2758            );
2759
2760            // Enqueue children with this node's identity as their
2761            // expected parent.
2762            for child_ref in &node.children {
2763                if !visited.contains(child_ref) {
2764                    queue.push_back((child_ref.clone(), Some(node.identity.clone())));
2765                }
2766            }
2767        }
2768
2769        // Sanity: we should have visited at least root, host, a
2770        // proc, and an actor.
2771        assert!(
2772            visited.len() >= 4,
2773            "expected at least 4 nodes in the tree, visited {}",
2774            visited.len()
2775        );
2776    }
2777
2778    // Exercises SP-1..SP-4 for host/proc payloads.
2779    #[tokio::test]
2780    async fn test_system_proc_identity() {
2781        use hyperactor::Proc;
2782        use hyperactor::channel::ChannelTransport;
2783        use hyperactor::host::Host;
2784        use hyperactor::host::LocalProcManager;
2785
2786        use crate::host_mesh::host_agent::HostAgentMode;
2787        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
2788        use crate::proc_agent::ProcAgent;
2789
2790        // -- 1. Stand up a local in-process Host with a HostAgent --
2791        let spawn: ProcManagerSpawnFn =
2792            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
2793        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
2794        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
2795            Host::new(manager, ChannelTransport::Unix.any())
2796                .await
2797                .unwrap();
2798        let host_addr = host.addr().clone();
2799        let system_proc = host.system_proc().clone();
2800        let system_proc_id = system_proc.proc_id().clone();
2801        let host_agent_handle = system_proc
2802            .spawn(
2803                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
2804                HostAgent::new(HostAgentMode::Local(host)),
2805            )
2806            .unwrap();
2807        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
2808        let host_addr_str = host_addr.to_string();
2809
2810        // -- 2. Spawn MeshAdminAgent on a separate proc --
2811        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
2812        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
2813        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
2814        let admin_handle = admin_proc
2815            .spawn(
2816                MESH_ADMIN_ACTOR_NAME,
2817                MeshAdminAgent::new(
2818                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
2819                    None,
2820                    Some("[::]:0".parse().unwrap()),
2821                ),
2822            )
2823            .unwrap();
2824        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
2825
2826        // -- 3. Create a bare client instance for sending messages --
2827        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
2828        let (client, _handle) = client_proc.instance("client").unwrap();
2829
2830        // -- 4. Resolve the host to get its children --
2831        let host_ref_str = HostId(host_agent_ref.actor_id().clone()).to_string();
2832        let host_resp = admin_ref
2833            .resolve(&client, host_ref_str.clone())
2834            .await
2835            .unwrap();
2836        let host_node = host_resp.0.unwrap();
2837        assert!(
2838            !host_node.children.is_empty(),
2839            "host should have at least one proc child"
2840        );
2841
2842        // -- 5. Find a system proc child via system_children --
2843        let system_children = match &host_node.properties {
2844            NodeProperties::Host {
2845                system_children, ..
2846            } => system_children.clone(),
2847            other => panic!("expected Host properties, got {:?}", other),
2848        };
2849        // Procs are never system — host system_children should be empty.
2850        assert!(
2851            system_children.is_empty(),
2852            "host system_children should be empty (procs are never system), got {:?}",
2853            system_children
2854        );
2855        // Verify host properties derived from attrs.
2856        assert!(
2857            matches!(&host_node.properties, NodeProperties::Host { .. }),
2858            "expected Host properties"
2859        );
2860
2861        // -- 6. Verify host children contain the system proc --
2862        let expected_system_ref = system_proc_id.to_string();
2863        assert!(
2864            host_node.children.contains(&expected_system_ref),
2865            "host children {:?} should contain the system proc ref '{}'",
2866            host_node.children,
2867            expected_system_ref
2868        );
2869
2870        // -- 7. Resolve a proc child --
2871        let proc_child_ref = &host_node.children[0];
2872        let proc_resp = admin_ref
2873            .resolve(&client, proc_child_ref.clone())
2874            .await
2875            .unwrap();
2876        let proc_node = proc_resp.0.unwrap();
2877
2878        assert_eq!(
2879            proc_node.identity, *proc_child_ref,
2880            "identity must match the proc ref from the host's children list"
2881        );
2882
2883        assert!(
2884            matches!(proc_node.properties, NodeProperties::Proc { .. }),
2885            "expected NodeProperties::Proc, got {:?}",
2886            proc_node.properties
2887        );
2888
2889        assert_eq!(
2890            proc_node.parent,
2891            Some(host_ref_str.clone()),
2892            "proc parent should be the host reference (host:<actor_id>)"
2893        );
2894
2895        assert!(
2896            !proc_node.as_of.is_empty(),
2897            "as_of should be present and non-empty"
2898        );
2899
2900        // Verify proc properties derived from attrs.
2901        assert!(
2902            matches!(&proc_node.properties, NodeProperties::Proc { .. }),
2903            "expected Proc properties"
2904        );
2905    }
2906
2907    // -- MAST CLI resolver tests --
2908    //
2909    // These tests exercise the invariants introduced by the CLI-based
2910    // MAST handle resolution. Each test names the invariant it
2911    // establishes.
2912
2913    /// Helper: build a `MastStatusResponse` from a list of
2914    /// `(task_index, hostname)` pairs in one task group.
2915    fn mast_response_from_hosts(hosts: &[(i64, &str)]) -> super::MastStatusResponse {
2916        let mut tasks = std::collections::HashMap::new();
2917        for (idx, host) in hosts {
2918            tasks.insert(
2919                idx.to_string(),
2920                vec![super::MastTaskAttempt {
2921                    hostname: Some(host.to_string()),
2922                }],
2923            );
2924        }
2925        super::MastStatusResponse {
2926            latest_attempt: super::MastAttempt {
2927                task_groups: std::collections::HashMap::from([(
2928                    "trainers".to_string(),
2929                    vec![super::MastTaskGroup { tasks }],
2930                )]),
2931            },
2932        }
2933    }
2934
2935    // MC-2: single group, tasks sorted by ascending index.
2936    #[test]
2937    fn test_head_hostname_single_group() {
2938        let response = mast_response_from_hosts(&[(2, "host2"), (0, "host0"), (1, "host1")]);
2939        let head = super::head_hostname(&response).unwrap();
2940        assert_eq!(head, "host0");
2941    }
2942
2943    // MC-2: last attempt selected per task.
2944    #[test]
2945    fn test_head_hostname_last_attempt_wins() {
2946        let mut tasks = std::collections::HashMap::new();
2947        tasks.insert(
2948            "0".to_string(),
2949            vec![
2950                super::MastTaskAttempt {
2951                    hostname: Some("old_host".to_string()),
2952                },
2953                super::MastTaskAttempt {
2954                    hostname: Some("new_host".to_string()),
2955                },
2956            ],
2957        );
2958        let response = super::MastStatusResponse {
2959            latest_attempt: super::MastAttempt {
2960                task_groups: std::collections::HashMap::from([(
2961                    "trainers".to_string(),
2962                    vec![super::MastTaskGroup { tasks }],
2963                )]),
2964            },
2965        };
2966        let head = super::head_hostname(&response).unwrap();
2967        assert_eq!(head, "new_host");
2968    }
2969
2970    // MC-2: multiple groups merged and sorted.
2971    #[test]
2972    fn test_head_hostname_multiple_groups() {
2973        let mut tasks_a = std::collections::HashMap::new();
2974        tasks_a.insert(
2975            "1".to_string(),
2976            vec![super::MastTaskAttempt {
2977                hostname: Some("host_a1".to_string()),
2978            }],
2979        );
2980        let mut tasks_b = std::collections::HashMap::new();
2981        tasks_b.insert(
2982            "0".to_string(),
2983            vec![super::MastTaskAttempt {
2984                hostname: Some("host_b0".to_string()),
2985            }],
2986        );
2987        let response = super::MastStatusResponse {
2988            latest_attempt: super::MastAttempt {
2989                task_groups: std::collections::HashMap::from([
2990                    (
2991                        "group_a".to_string(),
2992                        vec![super::MastTaskGroup { tasks: tasks_a }],
2993                    ),
2994                    (
2995                        "group_b".to_string(),
2996                        vec![super::MastTaskGroup { tasks: tasks_b }],
2997                    ),
2998                ]),
2999            },
3000        };
3001        let head = super::head_hostname(&response).unwrap();
3002        assert_eq!(head, "host_b0");
3003    }
3004
3005    // MC-2: no hostnames → error.
3006    #[test]
3007    fn test_head_hostname_empty() {
3008        let response = super::MastStatusResponse {
3009            latest_attempt: super::MastAttempt {
3010                task_groups: std::collections::HashMap::new(),
3011            },
3012        };
3013        assert!(super::head_hostname(&response).is_err());
3014    }
3015
3016    // MC-2: hostname field is None (task not yet
3017    // allocated) → skipped.
3018    #[test]
3019    fn test_head_hostname_skips_unallocated() {
3020        let mut tasks = std::collections::HashMap::new();
3021        tasks.insert(
3022            "0".to_string(),
3023            vec![super::MastTaskAttempt { hostname: None }],
3024        );
3025        tasks.insert(
3026            "1".to_string(),
3027            vec![super::MastTaskAttempt {
3028                hostname: Some("allocated_host".to_string()),
3029            }],
3030        );
3031        let response = super::MastStatusResponse {
3032            latest_attempt: super::MastAttempt {
3033                task_groups: std::collections::HashMap::from([(
3034                    "trainers".to_string(),
3035                    vec![super::MastTaskGroup { tasks }],
3036                )]),
3037            },
3038        };
3039        let head = super::head_hostname(&response).unwrap();
3040        assert_eq!(head, "allocated_host");
3041    }
3042
3043    // MC-3: hostname with dot passes through
3044    // unchanged (no DNS lookup).
3045    #[tokio::test]
3046    async fn test_qualify_fqdn_already_qualified() {
3047        let fqdn = super::qualify_fqdn("fake.nonexistent.tld").await;
3048        assert_eq!(fqdn, "fake.nonexistent.tld");
3049    }
3050
3051    // MC-3, MC-4: short hostname
3052    // goes through getaddrinfo via spawn_blocking. The result is
3053    // environment-dependent, but must be non-empty and the call
3054    // must complete without hanging.
3055    #[tokio::test]
3056    async fn test_qualify_fqdn_short_hostname() {
3057        let fqdn = super::qualify_fqdn("localhost").await;
3058        assert!(!fqdn.is_empty(), "qualify_fqdn returned empty string");
3059    }
3060
3061    // MC-3: nonexistent short hostname falls back
3062    // to the raw input.
3063    #[tokio::test]
3064    async fn test_qualify_fqdn_nonexistent_fallback() {
3065        let input = "__nonexistent_host_for_test";
3066        let fqdn = super::qualify_fqdn(input).await;
3067        assert_eq!(fqdn, input);
3068    }
3069
3070    // MC-5: explicit override is used directly.
3071    #[test]
3072    fn test_resolve_admin_port_override() {
3073        assert_eq!(super::resolve_admin_port(Some(8080)).unwrap(), 8080);
3074    }
3075
3076    // MC-5: falls back to MESH_ADMIN_ADDR config
3077    // (default [::]:1729).
3078    #[test]
3079    fn test_resolve_admin_port_from_config() {
3080        let port = super::resolve_admin_port(None).unwrap();
3081        assert_eq!(port, 1729);
3082    }
3083
3084    // MC-1: missing binary produces a "not found" error.
3085    #[tokio::test]
3086    async fn test_cli_missing_binary() {
3087        let result = super::try_resolve_mast_handle(
3088            "mast_conda:///test-job",
3089            Some(1729),
3090            "__nonexistent_mast_test_bin",
3091        )
3092        .await;
3093        let err = format!("{:#}", result.unwrap_err());
3094        assert!(
3095            err.contains("not found"),
3096            "expected 'not found' in error, got: {}",
3097            err
3098        );
3099    }
3100
3101    /// Write a shell script to a temp directory and return the
3102    /// directory guard and script path.
3103    fn write_test_script(content: &str) -> (tempfile::TempDir, String) {
3104        use std::os::unix::fs::PermissionsExt;
3105        let dir = tempfile::tempdir().unwrap();
3106        let script_path = dir.path().join("mast_stub.sh");
3107        std::fs::write(&script_path, content).unwrap();
3108        std::fs::set_permissions(&script_path, PermissionsExt::from_mode(0o755)).unwrap();
3109        let path_str = script_path.to_str().unwrap().to_string();
3110        (dir, path_str)
3111    }
3112
3113    // MC-1: valid JSON, happy path end-to-end.
3114    #[tokio::test]
3115    async fn test_cli_happy_path() {
3116        let json = r#"{"latestAttempt":{"taskGroupExecutionAttempts":{"trainers":[{"taskExecutionAttempts":{"0":[{"hostname":"devgpu042"}]}}]}}}"#;
3117        let (_dir, script_path) = write_test_script(&format!("#!/bin/sh\necho '{}'\n", json));
3118        let url =
3119            super::try_resolve_mast_handle("mast_conda:///test-job", Some(1729), &script_path)
3120                .await
3121                .unwrap();
3122        assert!(url.starts_with("https://"), "url: {}", url);
3123        assert!(url.ends_with(":1729"), "url: {}", url);
3124    }
3125
3126    // MC-1: malformed JSON produces a parse error.
3127    #[tokio::test]
3128    async fn test_cli_malformed_json() {
3129        let (_dir, script_path) = write_test_script("#!/bin/sh\necho 'not json'\n");
3130        let result =
3131            super::try_resolve_mast_handle("mast_conda:///test-job", Some(1729), &script_path)
3132                .await;
3133        let err = format!("{:#}", result.unwrap_err());
3134        assert!(
3135            err.contains("failed to parse"),
3136            "expected parse error, got: {}",
3137            err
3138        );
3139    }
3140
3141    // MC-1: non-zero exit includes code + stderr.
3142    #[tokio::test]
3143    async fn test_cli_nonzero_exit() {
3144        let (_dir, script_path) =
3145            write_test_script("#!/bin/sh\necho >&2 'job not found'\nexit 42\n");
3146        let result =
3147            super::try_resolve_mast_handle("mast_conda:///test-job", Some(1729), &script_path)
3148                .await;
3149        let err = format!("{:#}", result.unwrap_err());
3150        assert!(
3151            err.contains("job not found"),
3152            "expected stderr in error, got: {}",
3153            err
3154        );
3155    }
3156
3157    // MC-1: handle without mast_conda:/// prefix is
3158    // rejected with a clear error.
3159    #[tokio::test]
3160    async fn test_cli_missing_prefix() {
3161        let result = super::try_resolve_mast_handle("bad_handle", Some(1729), "mast").await;
3162        let err = format!("{:#}", result.unwrap_err());
3163        assert!(
3164            err.contains("expected mast_conda:/// prefix"),
3165            "expected prefix error, got: {}",
3166            err
3167        );
3168    }
3169
3170    // MC-2: a task group with zero attempts is skipped.
3171    #[test]
3172    fn test_head_hostname_empty_attempts_vec() {
3173        let response = super::MastStatusResponse {
3174            latest_attempt: super::MastAttempt {
3175                task_groups: std::collections::HashMap::from([(
3176                    "trainers".to_string(),
3177                    vec![], // no attempts in this group
3178                )]),
3179            },
3180        };
3181        assert!(super::head_hostname(&response).is_err());
3182    }
3183
3184    // MC-2: non-numeric task index keys sort last
3185    // (i64::MAX fallback).
3186    #[test]
3187    fn test_head_hostname_non_numeric_index() {
3188        let mut tasks = std::collections::HashMap::new();
3189        tasks.insert(
3190            "abc".to_string(),
3191            vec![super::MastTaskAttempt {
3192                hostname: Some("host_abc".to_string()),
3193            }],
3194        );
3195        tasks.insert(
3196            "0".to_string(),
3197            vec![super::MastTaskAttempt {
3198                hostname: Some("host_0".to_string()),
3199            }],
3200        );
3201        let response = super::MastStatusResponse {
3202            latest_attempt: super::MastAttempt {
3203                task_groups: std::collections::HashMap::from([(
3204                    "trainers".to_string(),
3205                    vec![super::MastTaskGroup { tasks }],
3206                )]),
3207            },
3208        };
3209        let head = super::head_hostname(&response).unwrap();
3210        assert_eq!(head, "host_0");
3211    }
3212
    // Verifies that GET /v1/{proc_id} reflects actors spawned directly
    // on a proc — bypassing ProcAgent's gspawn message and therefore
    // never triggering publish_introspect_properties — so that the
    // resolved children list is always derived from live proc state.
    //
    // Regression guard for the bug introduced in 9a08d559: the switch
    // from a live handle_introspect to a cached publish model made
    // supervision-spawned actors (e.g. every sieve actor after
    // sieve[0]) invisible to the TUI.
    //
    // Exercises PA-1 (see module doc). See also
    // proc_agent::tests::test_query_child_proc_returns_live_children.
    #[tokio::test]
    async fn test_proc_children_reflect_directly_spawned_actors() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;

        use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
        use crate::host_mesh::host_agent::HostAgent;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
        use crate::proc_agent::ProcAgent;

        // Stand up a HostMeshAgent. The user proc gets its own
        // ephemeral address; we register that address in
        // MeshAdminAgent so resolve_proc_node can look it up.
        // HostMeshAgent won't know the user proc (it wasn't spawned
        // through it), so QueryChild returns Error and resolve falls
        // back to querying proc_agent[0] via QueryChild(Proc) — the
        // path being tested.
        let spawn_fn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn_fn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // The host agent lives on the host's system proc and serves
        // QueryChild requests from the admin agent.
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();

        // User proc: own ephemeral Unix socket, own ProcAgent.
        let user_proc =
            Proc::direct(ChannelTransport::Unix.any(), "user_proc".to_string()).unwrap();
        let user_proc_addr = user_proc.proc_id().addr().to_string();
        let agent_handle = ProcAgent::boot_v1(user_proc.clone(), None).unwrap();
        // Block until the proc agent is fully initialized (Idle)
        // before the admin agent is allowed to query it.
        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        // MeshAdminAgent: register the user proc's addr as a "host"
        // pointing to host_agent_ref. That agent doesn't know the
        // user proc, so QueryChild → Error → fallback to proc_agent.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(user_proc_addr, host_agent_ref.clone())],
                    None,
                    // Bind the HTTP surface to an ephemeral port; the
                    // test talks to the agent via messaging, not HTTP.
                    Some("[::]:0".parse().unwrap()),
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate client proc/instance to drive resolve() calls.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        // Resolve the user proc via MeshAdminAgent. HostMeshAgent
        // returns Error for QueryChild → fallback to proc_agent[0]
        // QueryChild(Reference::Proc) → live NodeProperties::Proc.
        let user_proc_ref = user_proc.proc_id().to_string();
        let resp = admin_ref
            .resolve(&client, user_proc_ref.clone())
            .await
            .unwrap();
        let node = resp.0.unwrap();
        assert!(
            matches!(node.properties, NodeProperties::Proc { .. }),
            "expected Proc, got {:?}",
            node.properties
        );
        // Baseline child count before the direct spawn below.
        let initial_count = node.children.len();
        assert!(
            node.children
                .iter()
                .any(|c| c.contains(PROC_AGENT_ACTOR_NAME)),
            "initial children {:?} should contain proc_agent",
            node.children
        );

        // Spawn an actor directly on the user proc, bypassing gspawn.
        // This simulates how sieve[0] spawns sieve[1], sieve[2], etc.
        user_proc
            .spawn("extra_actor", TestIntrospectableActor)
            .unwrap();

        // Resolve again — the new actor must appear immediately
        // without any republish, proving PA-1 is satisfied.
        let resp2 = admin_ref
            .resolve(&client, user_proc_ref.clone())
            .await
            .unwrap();
        let node2 = resp2.0.unwrap();
        assert!(
            matches!(node2.properties, NodeProperties::Proc { .. }),
            "expected Proc, got {:?}",
            node2.properties
        );
        assert!(
            node2.children.iter().any(|c| c.contains("extra_actor")),
            "after direct spawn, children {:?} should contain extra_actor",
            node2.children
        );
        assert!(
            node2.children.len() > initial_count,
            "expected at least {} children after direct spawn, got {:?}",
            initial_count + 1,
            node2.children
        );
    }
3348}