Skip to main content

hyperactor_mesh/
mesh_admin.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mesh-level admin surface for topology introspection and reference
10//! walking.
11//!
12//! This module defines `MeshAdminAgent`, an actor that exposes a
13//! uniform, reference-based HTTP API over an entire host mesh. Every
14//! addressable entity in the mesh is represented as a `NodePayload`
15//! and resolved via typed `NodeRef` references (parsed from HTTP
16//! path strings at the request boundary).
17//!
18//! Incoming HTTP requests are bridged into the actor message loop
19//! using `ResolveReferenceMessage`, ensuring that all topology
20//! resolution and data collection happens through actor messaging.
21//! The agent fans out to `HostAgent` instances to fetch host,
22//! proc, and actor details, then normalizes them into a single
23//! tree-shaped model (`NodeProperties` + children references)
24//! suitable for topology-agnostic clients such as the admin TUI.
25//!
26//! # Schema strategy
27//!
28//! The external API contract is schema-first: the JSON Schema
29//! (Draft 2020-12) served at `GET /v1/schema` is the
30//! authoritative definition of the response shape. The error
31//! envelope schema is at `GET /v1/schema/error`.
32//!
33//! Schema and OpenAPI are derived from the HTTP boundary DTO types
34//! in [`crate::introspect::dto`] (`NodePayloadDto`,
35//! `NodePropertiesDto`, `FailureInfoDto`) via
36//! `schemars::JsonSchema`. The domain types (`NodePayload`,
37//! `NodeProperties`, `FailureInfo`) do not carry `JsonSchema` —
38//! they own the typed internal model; the DTOs own the wire
39//! contract.
40//!
41//! This follows the "Admin Gateway Pattern" RFC
42//! ([doc](https://fburl.com/1dvah88uutaiyesebojouen2)):
43//! schema is the product; transports and tooling are projections.
44//!
45//! ## Schema generation pipeline
46//!
47//! 1. `#[derive(JsonSchema)]` on `NodePayloadDto`,
48//!    `NodePropertiesDto`, `FailureInfoDto`, `ApiError`,
49//!    `ApiErrorEnvelope`.
50//! 2. `schemars::schema_for!(T)` produces a `Schema` value at
51//!    runtime (Draft 2020-12).
52//! 3. The `serve_schema` / `serve_error_schema` handlers inject a
53//!    `$id` field (SC-4) and serve the result as JSON.
54//! 4. Snapshot tests in `introspect::tests` compare the raw
55//!    schemars output (without `$id`) against checked-in golden
56//!    files to detect drift (SC-2).
57//! 5. Validation tests construct domain payloads, convert to DTOs,
58//!    and confirm the serialized DTOs pass schema validation
59//!    (SC-3).
60//!
61//! ## Regenerating snapshots
62//!
63//! After intentional changes to the DTO types
64//! (`NodePayloadDto`, `NodePropertiesDto`, `FailureInfoDto`),
65//! `ApiError`, or `ApiErrorEnvelope`, regenerate the golden
66//! files:
67//!
68//! ```sh
69//! buck run fbcode//monarch/hyperactor_mesh:generate_api_artifacts \
70//!   @fbcode//mode/dev-nosan -- \
71//!   fbcode/monarch/hyperactor_mesh/src/testdata
72//! ```
73//!
74//! Or via cargo:
75//!
76//! ```sh
77//! cargo run -p hyperactor_mesh --bin generate_api_artifacts -- \
78//!   hyperactor_mesh/src/testdata
79//! ```
80//!
81//! Then re-run tests to confirm the new snapshot passes.
82//!
83//! ## Schema invariants (SC-*)
84//!
85//! - **SC-1 (schema-derived):** Schema is derived from the DTO
86//!   types via `schemars::JsonSchema`, not hand-written.
87//! - **SC-2 (schema-snapshot-stability):** Schema changes must
88//!   be explicit — a snapshot test catches unintentional drift.
89//! - **SC-3 (schema-payload-conformance):** Domain payloads
90//!   converted to DTOs validate against the generated schema.
91//! - **SC-4 (schema-version-identity):** Served schemas carry a
92//!   `$id` tied to the API version (e.g.
93//!   `https://monarch.meta.com/schemas/v1/node_payload`).
94//! - **SC-5 (route-precedence):** Literal schema routes are
95//!   matched by specificity before the `{*reference}` wildcard
96//!   (axum 0.8 specificity-based routing).
97//!
98//! Note on `ApiError.details`: the derived schema is maximally
99//! permissive for `details` (any valid JSON). This is intentional
100//! for v1 — `details` is a domain-specific escape hatch.
101//! Consumers must not assume a fixed shape.
102//!
103//! # Introspection visibility policy
104//!
105//! Admin tooling only displays **introspectable** nodes: entities
106//! that are reachable via actor messaging and respond to
107//! [`IntrospectMessage`]. Infrastructure procs that are
108//! **non-routable** are intentionally **opaque** to introspection and
109//! are omitted from the navigation graph.
110//!
111//! ## Definitions
112//!
//! **Routable** — an entity is routable if the system can address it
//! via the routing layer and successfully deliver a message to it
//! using an `Addr` / `ActorAddr` (i.e., there exists a live mailbox
//! sender reachable through normal routing). Practical test: "can I
//! send `IntrospectMessage::Query` to it and get a reply?"
118//!
119//! **Non-routable** — an entity is non-routable if it has no
120//! externally reachable mailbox sender in the routing layer, so
121//! message delivery is impossible by construction (even if you know
122//! its name). Examples: `hyperactor_runtime[0]`, `mailbox_server[N]`,
123//! `local[N]` — these use `PanickingMailboxSender` and are never
124//! bound to the router.
125//!
126//! **Introspectable** — tooling can obtain a `NodePayload` for this
127//! node by sending `IntrospectMessage` to a routable actor.
128//!
129//! **Opaque** — the node exists but is not introspectable via
130//! messaging; tooling cannot observe it through the introspection
131//! protocol.
132//!
133//! ## Proc visibility
134//!
135//! A proc is not directly introspected; actors are. Tooling
136//! synthesizes proc-level nodes by grouping introspectable actors by
137//! `ProcAddr`.
138//!
139//! A proc is visible iff there exists at least one actor on that proc
140//! whose `ActorAddr` is deliverable via the routing layer (i.e., the
141//! actor has a bound mailbox sender reachable through normal routing)
142//! and responds to `IntrospectMessage`.
143//!
144//! The rule is: **if an entity is routable via the mesh routing layer
145//! (i.e., tooling can deliver `IntrospectMessage::Query` to one of its
146//! actors), then it is introspectable and appears in the admin graph.**
147//!
148//! ## Navigation identity invariants (NI-*)
149//!
150//! Every `NodePayload` in the topology tree satisfies:
151//!
152//! - **NI-1 (identity = reference):** A node's `identity: NodeRef`
153//!   must correspond to the reference used to resolve it. The
154//!   display form of `identity` round-trips through `NodeRef::from_str`.
155//!
156//! - **NI-2 (parent = containment parent):** A node's
157//!   `parent: Option<NodeRef>` records its canonical containment
158//!   parent, not the inverse of every navigation edge. Specifically:
159//!   root → `None`, host → `Root`, proc → `Host(…)`,
160//!   actor → `Proc(…)`. An actor's parent is always its owning proc,
161//!   even when the actor also appears as a child of another actor via
162//!   supervision.
163//!
164//! - **NI-3 (children = navigation graph):** A node's `children`
165//!   is the admin navigation graph. Actor-to-actor supervision links
166//!   coexist with proc→actor membership links without changing
167//!   `parent`. The same actor may therefore appear in `children` of
168//!   both its proc and its supervising actor.
169//!
170//! Together these ensure that the TUI can correlate responses to tree
171//! nodes, and that upward/downward navigation is consistent.
172//!
173//! ## Link-classification invariants (LC-*)
174//!
175//! These describe which nodes emit `system_children` and
176//! `stopped_children` classification sets, and what those sets
177//! contain.
178//!
179//! - **LC-1 (root system_children empty):** Root payloads always
180//!   emit `system_children: vec![]`. Root children are host nodes,
181//!   which are not classified as system.
182//!
183//! - **LC-2 (host system_children empty):** Host payloads always
184//!   emit `system_children: vec![]`. Host children are procs, which
185//!   are not classified as system — only actors carry the system
186//!   classification.
187//!
188//! - **LC-3 (proc system_children subset):** Proc payloads emit
189//!   `system_children ⊆ children`, containing only `NodeRef::Actor`
190//!   refs where `cell.is_system()` is true.
191//!
192//! - **LC-4 (proc stopped_children subset):** Proc payloads emit
193//!   `stopped_children ⊆ children`, containing only
194//!   `NodeRef::Actor` refs for terminated actors retained for
195//!   post-mortem inspection.
196//!
197//! - **LC-5 (actor/error no classification sets):** Actor and Error
198//!   payloads do not carry `system_children` or `stopped_children`.
199//!
200//! ## Proc-resolution invariants (SP-*)
201//!
202//! When a proc reference is resolved, the returned `NodePayload`
203//! satisfies:
204//!
205//! - **SP-1 (identity):** The identity matches the ProcAddr reference
206//!   from the parent's children list.
207//! - **SP-2 (properties):** The properties are `NodeProperties::Proc`.
208//! - **SP-3 (parent):** The parent is `NodeRef::Host(actor_id)`.
209//! - **SP-4 (as_of):** The `as_of` field is present and valid
210//!   (internally `SystemTime`; serialized as ISO 8601 string over
211//!   the HTTP JSON API per HB-1).
212//!
213//! Enforced by `test_system_proc_identity`.
214//!
215//! ## Proc-agent invariants (PA-*)
216//!
217//! - **PA-1 (live children):** Proc-node children used by admin/TUI
218//!   must be derived from live proc state at query time. No
219//!   additional publish event is required for a newly spawned actor
220//!   to appear.
221//!
222//! Enforced by `test_proc_children_reflect_directly_spawned_actors`.
223//!
224//! ## Robustness invariant (MA-R1)
225//!
226//! - **MA-R1 (no-crash):** `MeshAdminAgent` must never crash the OS
227//!   process it resides in. Every handler catches errors and converts
228//!   them into structured error payloads
229//!   (`ResolveReferenceResponse(Err(..))`, `NodeProperties::Error`,
230//!   etc.) rather than propagating panics or unwinding. Failed reply
231//!   sends (the caller went away) are silently swallowed.
232//!
233//! ## TLS transport invariant (MA-T1)
234//!
235//! - **MA-T1 (tls):** At Meta (`fbcode_build`), the admin HTTP
236//!   server **requires** mutual TLS. At startup it probes for
237//!   certificates via `try_tls_acceptor` with client cert
238//!   enforcement enabled. If no usable certificate bundle is found,
239//!   `init()` returns an error — no plain HTTP fallback. In OSS,
240//!   TLS is best-effort with plain HTTP fallback.
241//!
242//! - **MA-T2 (scheme-in-url):** The URL returned by `GetAdminAddr`
243//!   is always `https://host:port` or `http://host:port`, never a
244//!   bare `host:port`. All callers receive and use this full URL
245//!   directly.
246//!
247//! ## Client host invariants (CH-*)
248//!
249//! Let **A** denote the aggregated host set (the union of hosts
250//! from all meshes passed to [`host_mesh::spawn_admin`],
251//! deduplicated by `HostAgent` `ActorAddr` — see SA-3), and let
252//! **C** denote the process-global singleton client host mesh in
253//! the caller process (whose local proc hosts the root client
254//! actor).
255//!
256//! - **CH-1 (deduplication):** When C ∈ A, the client host appears
257//!   exactly once in the admin host list (deduplicated by `HostAgent`
258//!   `ActorAddr` identity). When C ∉ A, `spawn_admin` includes C
259//!   alongside A's hosts so the admin introspects C as a normal host
260//!   subtree, not as a standalone proc.
261//!
262//! - **CH-2 (reachability):** In both cases, the root client actor
263//!   is reachable through the standard host → proc → actor walk.
264//!
265//! - **CH-3 (ordering):** C must be initialized before
266//!   `spawn_admin` executes. In Rust, calling `context()` /
267//!   `this_host()` / `this_proc()` triggers `GLOBAL_CONTEXT`
268//!   bootstrap, which initializes C. In Python, `bootstrap_host()`
269//!   calls `register_client_host()` before any actor code runs.
270//!   Either path ensures C is available by the time `spawn_admin`
271//!   reads it via `try_this_host()`. Any refactor must preserve
272//!   this ordering.
273//!
274//! - **CH-4 (runtime-agnostic client-host discovery):** `spawn_admin`
275//!   discovers C via `try_this_host()`, which checks two sources
276//!   in order: the Rust `GLOBAL_CONTEXT` (initialized via
277//!   `context()` / `this_host()` / `this_proc()`) and the
278//!   externally registered client host (set by
279//!   `register_client_host()` from Python's `bootstrap_host()`).
280//!   Aggregation logic must not branch on which source provided C.
281//!
282//! **Mechanism:** [`host_mesh::spawn_admin`] aggregates hosts from
283//! all input meshes (SA-3), reads C from the caller process (via
284//! `try_this_host()`), merges it with the aggregated set (SA-6),
285//! deduplicates by `HostAgent` `ActorAddr`, and spawns the
286//! `MeshAdminAgent` on the caller's local proc via
287//! `cx.instance().proc().spawn(...)`. Placement now follows the
288//! caller context rather than mesh topology.
289//!
290//! ## Spawn/aggregation invariants (SA-*)
291//!
292//! [`host_mesh::spawn_admin`] aggregates hosts from one or more
293//! meshes into a single admin host set.
294//!
295//! - **SA-1 (non-empty mesh set):** The input must yield at least
296//!   one mesh.
297//! - **SA-2 (non-empty hosts):** Every input mesh must contain at
298//!   least one host.
299//! - **SA-3 (host-agent identity dedup):** The admin host set is
300//!   the ordered union of host agents from all input meshes,
301//!   deduplicated by `HostAgent` `ActorAddr` in first-seen order.
302//! - **SA-4 (single-mesh degeneracy):** `spawn_admin([mesh], ...)`
303//!   is behaviorally equivalent to the former `mesh.spawn_admin(...)`.
304//!   Established by existing single-mesh integration tests (e.g.
305//!   `dining_philosophers`); no dedicated unit test.
306//! - **SA-5 (caller-local placement):** The admin is spawned on the
307//!   caller's local proc — the `Proc` of the actor context passed to
308//!   `spawn_admin()`. In common remote launch flows, the caller is
309//!   typically the root client/control process.
310//! - **SA-6 (client-host merge after aggregation):** Client-host
311//!   inclusion/dedup (CH-1) operates on the already-aggregated host
312//!   set, not per-mesh independently.
313//!
314//! ## MAST resolution (disabled)
315//!
316//! `mast_conda:///` resolution is disabled. The old topology-based
317//! resolution assumed the admin lived on the first mesh head host,
318//! which is no longer true after SA-5 changed to caller-local
319//! placement. All resolution paths now return explicit errors.
320//! A publication-based discovery mechanism will replace this in a
321//! future change. Until then, discover the admin URL from
322//! startup output or another launch-time publication.
323//!
324//! ## Admin self-identification invariants (AI-*)
325//!
326//! - **AI-1 (live identity):** `GET /v1/admin` returns the live
327//!   admin actor identity as `AdminInfo`.
328//! - **AI-2 (reported proc):** `proc_id` reports the hosting proc.
329//!   Placement equality (SA-5) is proved by unit tests; integration
330//!   tests validate that `proc_id` is populated and well-formed.
331//! - **AI-3 (url consistency):** `url` matches `GetAdminAddr`.
332//!
333//! The relationship between `host` and `url` (formerly AI-4) is
334//! now a constructor guarantee of [`AdminInfo::new`] rather than a
335//! live invariant. It is not in this registry.
336
337use std::collections::HashMap;
338use std::io;
339use std::sync::Arc;
340use std::time::Duration;
341
342use async_trait::async_trait;
343use axum::Json;
344use axum::Router;
345use axum::extract::Path as AxumPath;
346use axum::extract::State;
347use axum::http::StatusCode;
348use axum::response::IntoResponse;
349use axum::routing::get;
350use axum::routing::post;
351use hyperactor::Actor;
352use hyperactor::ActorHandle;
353use hyperactor::ActorRef;
354use hyperactor::Context;
355use hyperactor::Endpoint as _;
356use hyperactor::HandleClient;
357use hyperactor::Handler;
358use hyperactor::Instance;
359use hyperactor::OncePortRef;
360use hyperactor::ProcAddr;
361use hyperactor::RefClient;
362use hyperactor::channel::try_tls_acceptor;
363use hyperactor::introspect::IntrospectMessage;
364use hyperactor::introspect::IntrospectResult;
365use hyperactor::introspect::IntrospectView;
366use hyperactor::mailbox::open_once_port;
367use serde::Deserialize;
368use serde::Serialize;
369use serde_json::Value;
370use tokio::net::TcpListener;
371use tokio_rustls::TlsAcceptor;
372use typeuri::Named;
373
374use crate::config_dump::ConfigDump;
375use crate::config_dump::ConfigDumpResult;
376use crate::host::SERVICE_PROC_NAME;
377use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
378use crate::host_mesh::host_agent::HostAgent;
379use crate::introspect::NodePayload;
380use crate::introspect::NodeProperties;
381use crate::introspect::dto::NodePayloadDto;
382use crate::introspect::to_node_payload;
383use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
384use crate::proc_agent::ProcAgent;
385use crate::pyspy::PySpyDump;
386use crate::pyspy::PySpyOpts;
387use crate::pyspy::PySpyProfile;
388use crate::pyspy::PySpyProfileOpts;
389use crate::pyspy::PySpyProfileResult;
390use crate::pyspy::PySpyResult;
391use crate::pyspy::ValidatedProfileRequest;
392
393/// Send an `IntrospectMessage` to an actor and receive the reply.
394/// Encapsulates open_once_port + send + timeout + error handling.
395async fn query_introspect(
396    cx: &hyperactor::Context<'_, MeshAdminAgent>,
397    actor_id: &hyperactor::ActorAddr,
398    view: hyperactor::introspect::IntrospectView,
399    timeout: Duration,
400    err_ctx: &str,
401) -> Result<IntrospectResult, anyhow::Error> {
402    let introspect_port = hyperactor::PortRef::<IntrospectMessage>::attest_handler_port(actor_id);
403    let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
404    let mut reply_ref = reply_handle.bind();
405    reply_ref.return_undeliverable(false);
406    introspect_port.post(
407        cx,
408        IntrospectMessage::Query {
409            view,
410            reply: reply_ref,
411        },
412    );
413    tokio::time::timeout(timeout, reply_rx.recv())
414        .await
415        .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
416        .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
417}
418
419/// Send an `IntrospectMessage::QueryChild` to an actor.
420async fn query_child_introspect(
421    cx: &hyperactor::Context<'_, MeshAdminAgent>,
422    actor_id: &hyperactor::ActorAddr,
423    child_ref: hyperactor::Addr,
424    timeout: Duration,
425    err_ctx: &str,
426) -> Result<IntrospectResult, anyhow::Error> {
427    let introspect_port = hyperactor::PortRef::<IntrospectMessage>::attest_handler_port(actor_id);
428    let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
429    let mut reply_ref = reply_handle.bind();
430    reply_ref.return_undeliverable(false);
431    introspect_port.post(
432        cx,
433        IntrospectMessage::QueryChild {
434            child_ref,
435            reply: reply_ref,
436        },
437    );
438    tokio::time::timeout(timeout, reply_rx.recv())
439        .await
440        .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
441        .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
442}
443
/// Actor name used when spawning the mesh admin agent
/// (`MeshAdminAgent`).
pub const MESH_ADMIN_ACTOR_NAME: &str = "mesh_admin";

/// Actor name for the HTTP bridge client mailbox on the service proc.
///
/// Unlike `MESH_ADMIN_ACTOR_NAME`, this is not a full actor: it is a
/// client-mode `Instance<()>` created via
/// `Proc::introspectable_instance()` and driven by Axum's Tokio task
/// pool rather than an actor message loop. A separate instance is
/// required because `MeshAdminAgent`'s own `Instance<Self>` is only
/// accessible inside its message loop and cannot be shared with
/// external tasks. This instance gives the HTTP handlers a routable
/// proc identity so they can open one-shot reply ports
/// (`open_once_port`) to receive responses from `MeshAdminAgent`.
///
/// Unlike a plain `client()`, this uses
/// `Proc::introspectable_instance()` so the bridge responds to
/// `IntrospectMessage::Query` and appears as a navigable node in the
/// mesh TUI rather than causing a 504 when selected.
pub const MESH_ADMIN_BRIDGE_NAME: &str = "mesh_admin_bridge";
464
// NOTE: this type derives `schemars::JsonSchema`, and schemars copies
// `///` doc comments into the generated schema as description text.
// Rewording any doc comment below therefore changes the served schema
// and the checked-in golden snapshot (SC-2). Put maintainer-only notes
// in plain `//` comments like this one.
/// Structured error response following the gateway RFC envelope
/// pattern.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiError {
    /// Machine-readable error code (e.g. "not_found", "bad_request").
    // The `IntoResponse` impl for this type picks the HTTP status from
    // this string.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Additional context about the error. Schema is permissive
    /// (any valid JSON) — `details` is a domain-specific escape
    /// hatch. Do not assume a fixed shape.
    // When `None`, the key is omitted from the serialized JSON rather
    // than emitted as `null`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<Value>,
}
479
// NOTE: derives `schemars::JsonSchema`; `///` doc comments here feed
// the generated schema text, so maintainer notes are kept in `//`
// comments to avoid changing the schema snapshot.
/// Wrapper for the structured error envelope.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiErrorEnvelope {
    // The wrapped error; serializes under the single top-level key
    // `error`. Built by `ApiError::into_response`.
    pub error: ApiError,
}
485
486impl ApiError {
487    /// Create a "not_found" error.
488    pub fn not_found(message: impl Into<String>, details: Option<Value>) -> Self {
489        Self {
490            code: "not_found".to_string(),
491            message: message.into(),
492            details,
493        }
494    }
495
496    /// Create a "bad_request" error.
497    pub fn bad_request(message: impl Into<String>, details: Option<Value>) -> Self {
498        Self {
499            code: "bad_request".to_string(),
500            message: message.into(),
501            details,
502        }
503    }
504}
505
506impl IntoResponse for ApiError {
507    fn into_response(self) -> axum::response::Response {
508        let status = match self.code.as_str() {
509            "not_found" => StatusCode::NOT_FOUND,
510            "bad_request" => StatusCode::BAD_REQUEST,
511            "gateway_timeout" => StatusCode::GATEWAY_TIMEOUT,
512            "service_unavailable" => StatusCode::SERVICE_UNAVAILABLE,
513            _ => StatusCode::INTERNAL_SERVER_ERROR,
514        };
515        let envelope = ApiErrorEnvelope { error: self };
516        (status, Json(envelope)).into_response()
517    }
518}
519
/// Response payload for `MeshAdminMessage::GetAdminAddr`.
///
/// `addr` is `None` until the admin HTTP server has successfully
/// bound a listening socket during `MeshAdminAgent::init`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct MeshAdminAddrResponse {
    /// Admin server URL, or `None` if the server is not bound yet.
    /// Per MA-T2 this is a full URL including the scheme
    /// (`https://host:port` or `http://host:port`), never a bare
    /// `host:port`.
    pub addr: Option<String>,
}
// Register the type with `wirevalue` so it can travel as a message
// payload.
wirevalue::register_type!(MeshAdminAddrResponse);
529
/// Messages handled by the `MeshAdminAgent`.
///
/// These are mesh-admin control-plane queries (as opposed to topology
/// resolution, which goes through `ResolveReferenceMessage`). They’re
/// wirevalue-serializable and come with generated client/ref helpers
/// via `HandleClient`/`RefClient`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum MeshAdminMessage {
    /// Return the HTTP admin server address that this agent bound in
    /// `init`.
    ///
    /// The reply contains `None` if the server hasn't started yet.
    /// When present, the address is a full URL including the scheme
    /// (MA-T2).
    GetAdminAddr {
        /// Reply port receiving the `MeshAdminAddrResponse`.
        #[reply]
        reply: OncePortRef<MeshAdminAddrResponse>,
    },
}
wirevalue::register_type!(MeshAdminMessage);
557
/// Newtype wrapper around `Result<NodePayload, String>` for the
/// resolve reply port (`OncePortRef` requires `Named`).
///
/// `Ok` carries the resolved `NodePayload`; `Err` carries an error
/// message as a plain string.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ResolveReferenceResponse(pub Result<NodePayload, String>);
wirevalue::register_type!(ResolveReferenceResponse);
563
/// Message for resolving a reference (string from HTTP path) into a
/// `NodePayload`.
///
/// This is the primary “navigation” request used by the admin HTTP
/// bridge: the caller provides a reference (e.g. `"root"`, a `ProcAddr`
/// string, or an `ActorAddr` string) and the `MeshAdminAgent` returns a
/// uniformly shaped `NodePayload` plus child references to continue
/// walking the topology.
///
/// The work happens inside the admin actor's message loop so
/// resolution can:
/// - parse and validate the reference format,
/// - dispatch to the right host/proc/actor via existing admin
///   queries, and
/// - return a structured payload without blocking HTTP handlers on
///   mesh logic.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum ResolveReferenceMessage {
    /// Resolve `reference_string` to a `NodePayload`.
    ///
    /// On success the reply is `ResolveReferenceResponse(Ok(payload))`;
    /// on failure it is `ResolveReferenceResponse(Err(message))`, where
    /// `message` is an error string.
    Resolve {
        /// Addr string from the HTTP path, parsed into a typed
        /// `NodeRef` at the resolve boundary.
        reference_string: String,
        /// Reply port receiving the resolution result.
        #[reply]
        reply: OncePortRef<ResolveReferenceResponse>,
    },
}
wirevalue::register_type!(ResolveReferenceMessage);
606
/// Actor that serves a mesh-level admin HTTP endpoint.
///
/// `MeshAdminAgent` is the mesh-wide aggregation point for
/// introspection: it holds `ActorRef<HostAgent>` handles for each
/// host, and answers admin queries by forwarding targeted requests to
/// the appropriate host agent and assembling a uniform `NodePayload`
/// response for the client.
///
/// The agent also exposes an HTTP server (spawned from `init`) and
/// supports reference-based navigation (`GET /v1/{reference}`) by
/// resolving HTTP path references into typed `NodePayload` values
/// plus child references.
#[hyperactor::export(handlers = [MeshAdminMessage, ResolveReferenceMessage])]
pub struct MeshAdminAgent {
    /// Map of host address string → `HostAgent` reference, used to
    /// fan out targeted admin queries to the right host.
    hosts: HashMap<String, ActorRef<HostAgent>>,

    /// Reverse index: `HostAgent` `ActorAddr` → host address
    /// string.
    ///
    /// The host agent itself is an actor that can appear in multiple
    /// places (e.g., as a host node and as a child actor under a
    /// system proc). This index lets reference resolution treat that
    /// `ActorAddr` as a *Host* node (via `resolve_host_node`) rather
    /// than a generic *Actor* node, avoiding cycles / dropped nodes
    /// in clients like the TUI.
    host_agents_by_actor_id: HashMap<hyperactor::ActorAddr, String>,

    /// `ActorAddr` of the process-global root client (`client[0]` on
    /// the singleton Host's `local_proc`), exposed as a first-class
    /// child of the root node. Routable and introspectable via the
    /// blanket `Handler<IntrospectMessage>`.
    root_client_actor_id: Option<hyperactor::ActorAddr>,

    /// This agent's own `ActorAddr`, captured during `init` (`None`
    /// until then). Used to include the admin proc as a visible node
    /// in the introspection tree (the principle: "if you can send it
    /// a message, you can introspect it").
    self_actor_id: Option<hyperactor::ActorAddr>,

    // -- HTTP server address fields --
    //
    // The admin HTTP server has three address representations:
    //
    //   1. `admin_addr_override` — caller-supplied bind address
    //      (constructor param). When `None`, `init` reads
    //      `MESH_ADMIN_ADDR` from config instead.
    //
    //   2. `admin_addr` — the actual `SocketAddr` the OS assigned
    //      after `TcpListener::bind`. Populated during `init`.
    //
    //   3. `admin_host` — human-friendly URL with the machine
    //      hostname (not the raw IP) so it works with DNS and TLS
    //      certificate validation. Returned via `GetAdminAddr`.

    /// Caller-supplied bind address (the `admin_addr` argument of
    /// `new`). When `None`, `init` reads `MESH_ADMIN_ADDR` from
    /// config.
    admin_addr_override: Option<std::net::SocketAddr>,

    /// Actual bound address after `TcpListener::bind`, populated
    /// during `init`.
    admin_addr: Option<std::net::SocketAddr>,

    /// Hostname-based URL (e.g. `"https://myhost.facebook.com:1729"`)
    /// for the admin HTTP server. Returned via `GetAdminAddr`.
    admin_host: Option<String>,

    /// Base URL of the Monarch dashboard. Passed at construction.
    /// Used by proxy routes that forward requests to the dashboard's
    /// `/api/*` endpoints.
    telemetry_url: Option<String>,

    /// When the mesh was started, as an RFC 3339 (ISO-8601) UTC
    /// timestamp. Captured in `new`, i.e. at the moment this agent
    /// value is constructed.
    started_at: String,

    /// Username who started the mesh. Read in `new` from the `USER`
    /// environment variable, then `USERNAME`, falling back to
    /// `"unknown"`.
    started_by: String,
}
685
686impl MeshAdminAgent {
687    /// Construct a `MeshAdminAgent` from a list of `(host_addr,
688    /// host_agent_ref)` pairs and an optional root client `ActorAddr`.
689    ///
690    /// Builds both:
691    /// - `hosts`: the forward map used to route admin queries to the
692    ///   correct `HostAgent`, and
693    /// - `host_agents_by_actor_id`: a reverse index used during
694    ///   reference resolution to recognize host-agent `ActorAddr`s and
695    ///   resolve them as `NodeProperties::Host` rather than as
696    ///   generic actors.
697    ///
698    /// When `root_client_actor_id` is `Some`, the root client appears
699    /// as a first-class child of the root node in the introspection
700    /// tree.
701    ///
702    /// The HTTP listen address is initialized to `None` and populated
703    /// during `init()` after the server socket is bound.
704    pub fn new(
705        hosts: Vec<(String, ActorRef<HostAgent>)>,
706        root_client_actor_id: Option<hyperactor::ActorAddr>,
707        admin_addr: Option<std::net::SocketAddr>,
708        telemetry_url: Option<String>,
709    ) -> Self {
710        let host_agents_by_actor_id: HashMap<hyperactor::ActorAddr, String> = hosts
711            .iter()
712            .map(|(addr, agent_ref)| (agent_ref.actor_addr().clone(), addr.clone()))
713            .collect();
714
715        // Capture start time and username
716        let started_at = chrono::Utc::now().to_rfc3339();
717        let started_by = std::env::var("USER")
718            .or_else(|_| std::env::var("USERNAME"))
719            .unwrap_or_else(|_| "unknown".to_string());
720
721        Self {
722            hosts: hosts.into_iter().collect(),
723            host_agents_by_actor_id,
724            root_client_actor_id,
725            self_actor_id: None,
726            admin_addr_override: admin_addr,
727            admin_addr: None,
728            admin_host: None,
729            telemetry_url,
730            started_at,
731            started_by,
732        }
733    }
734}
735
736impl std::fmt::Debug for MeshAdminAgent {
737    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
738        f.debug_struct("MeshAdminAgent")
739            .field("hosts", &self.hosts.keys().collect::<Vec<_>>())
740            .field("host_agents", &self.host_agents_by_actor_id.len())
741            .field("root_client_actor_id", &self.root_client_actor_id)
742            .field("self_actor_id", &self.self_actor_id)
743            .field("admin_addr", &self.admin_addr)
744            .field("admin_host", &self.admin_host)
745            .field("started_at", &self.started_at)
746            .field("started_by", &self.started_by)
747            .finish()
748    }
749}
750
/// Self-identification payload returned by `GET /v1/admin`.
///
/// Construct via [`AdminInfo::new`]. AI-1, AI-2, AI-3 are live
/// invariants. The relationship between `host` and `url` is a
/// constructor guarantee — `AdminInfo::new()` rejects URLs with no
/// host, so `host` always derives from `url` at construction.
// NOTE(review): this type derives `schemars::JsonSchema`, and schemars
// uses `///` doc comments as schema descriptions. Editing the doc
// comments on this struct or its fields therefore presumably changes the
// generated schema — confirm against any schema snapshot before rewording.
//
// The fields are `pub`, so the `host`/`url` relationship is only
// guaranteed for values produced by `AdminInfo::new` (or deserialized
// from such values); nothing here prevents later mutation.
#[derive(Debug, Clone, Serialize, Deserialize, schemars::JsonSchema)]
pub struct AdminInfo {
    /// Stringified `ActorAddr` of the `MeshAdminAgent`.
    pub actor_id: String,
    /// Stringified `ProcAddr` of the proc hosting `MeshAdminAgent`.
    pub proc_id: String,
    /// Hostname the admin HTTP server bound on (derived from `url`).
    pub host: String,
    /// Full admin URL (e.g. `"https://myhost.facebook.com:1729"`).
    pub url: String,
}
768
769impl AdminInfo {
770    /// Construct from identity components and a full admin URL.
771    ///
772    /// Parses `url` strictly using the `url` crate. Returns an error
773    /// if the URL is invalid or has no host component. `host` is
774    /// derived from the parsed URL — the relationship between `host`
775    /// and `url` holds by construction, not by test.
776    pub fn new(actor_id: String, proc_id: String, url: String) -> anyhow::Result<Self> {
777        let parsed = url::Url::parse(&url)
778            .map_err(|e| anyhow::anyhow!("invalid admin URL '{}': {}", url, e))?;
779        let host = parsed
780            .host_str()
781            .ok_or_else(|| anyhow::anyhow!("admin URL '{}' has no host", url))?
782            .to_string();
783        Ok(Self {
784            actor_id,
785            proc_id,
786            host,
787            url,
788        })
789    }
790}
791
/// Shared state for the reference-based `/v1/{*reference}` bridge
/// route.
///
/// The HTTP handler itself is intentionally thin and does not perform
/// any routing logic. Instead, it forwards each request into the
/// `MeshAdminAgent` actor via `ResolveReferenceMessage`, ensuring
/// resolution happens inside the actor message loop (with access to
/// actor messaging, timeouts, and indices).
///
/// Built once in `MeshAdminAgent::init` and handed to the router
/// wrapped in an `Arc`.
struct BridgeState {
    /// Addr to the `MeshAdminAgent` actor that performs
    /// reference resolution.
    admin_ref: ActorRef<MeshAdminAgent>,
    /// Dedicated client mailbox on system_proc for HTTP bridge reply
    /// ports. Using a separate `Instance<()>` avoids sharing the
    /// actor's own mailbox with the HTTP bridge and ensures the
    /// bridge context is routable via system_proc's frontend address.
    // Historical note: the previous approach used `this.clone_for_py()`,
    // which cloned the admin actor's own Instance:
    //   bridge_cx: Instance<MeshAdminAgent>,
    bridge_cx: Instance<()>,
    /// Limits the number of in-flight resolve requests to prevent
    /// introspection queries from overwhelming the shared tokio
    /// runtime and starving user actor workloads.
    ///
    /// The permit count is read from
    /// `MESH_ADMIN_MAX_CONCURRENT_RESOLVES` at init time.
    resolve_semaphore: tokio::sync::Semaphore,
    /// Keep the handle alive so the bridge mailbox is not dropped.
    _bridge_handle: ActorHandle<()>,
    /// Base URL of the Monarch dashboard (e.g.
    /// `"http://localhost:5000"`). Passed from `MeshAdminAgent` at
    /// init time. Used by proxy routes that forward requests to the
    /// dashboard's `/api/*` endpoints.
    telemetry_url: Option<String>,
    /// Shared HTTP client for outbound proxy requests to the
    /// dashboard. Reuses connection pool across requests.
    http_client: reqwest::Client,
    /// Self-identification metadata, populated during admin init.
    admin_info: AdminInfo,
}
829
830/// Build an HTTP client for outbound proxy requests to the dashboard.
831///
832/// Loads the root CA via the same bundle-probing logic used by every
833/// other mesh-admin client so the reqwest client can verify the
834/// dashboard's TLS cert. Falls back to the default trust store (and
835/// a default client) when no bundle is available.
836fn build_http_client() -> reqwest::Client {
837    use std::io::Read;
838
839    if let Some(bundle) = hyperactor::channel::try_tls_pem_bundle() {
840        let mut ca_bytes = Vec::new();
841        if let Ok(mut reader) = bundle.ca.reader()
842            && reader.read_to_end(&mut ca_bytes).is_ok()
843        {
844            let (builder, ca_installed) = crate::mesh_admin_client::add_tls(
845                reqwest::Client::builder(),
846                &ca_bytes,
847                None,
848                None,
849            );
850            if ca_installed {
851                if let Ok(client) = builder.build() {
852                    return client;
853                }
854                tracing::warn!(
855                    "mesh admin: failed to build reqwest client with root CA; \
856                         falling back to default trust store"
857                );
858            }
859        }
860    }
861    reqwest::Client::new()
862}
863
/// A TCP listener that performs a TLS handshake on each accepted
/// connection before handing it to axum.
///
/// Implements [`axum::serve::Listener`] so it can be passed directly
/// to [`axum::serve`].  Per the trait contract, `accept` handles
/// errors internally (logging + retrying) and never returns `Err`.
struct TlsListener {
    /// Underlying listener that yields raw TCP connections; also the
    /// source of the reported local address.
    tcp: TcpListener,
    /// Performs the server-side TLS handshake on each accepted stream.
    acceptor: TlsAcceptor,
}
874
875impl axum::serve::Listener for TlsListener {
876    type Io = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
877    type Addr = std::net::SocketAddr;
878
879    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
880        loop {
881            let (stream, addr) = match self.tcp.accept().await {
882                Ok(conn) => conn,
883                Err(e) => {
884                    tracing::warn!("TCP accept error: {}", e);
885                    continue;
886                }
887            };
888
889            match self.acceptor.accept(stream).await {
890                Ok(tls_stream) => return (tls_stream, addr),
891                Err(e) => {
892                    tracing::warn!("TLS handshake failed from {}: {}", addr, e);
893                    continue;
894                }
895            }
896        }
897    }
898
899    fn local_addr(&self) -> io::Result<Self::Addr> {
900        self.tcp.local_addr()
901    }
902}
903
#[async_trait]
impl Actor for MeshAdminAgent {
    /// Initializes the mesh admin agent and its HTTP server.
    ///
    /// 1. Binds well-known handler ports (`proc.spawn()` does not
    ///    call `bind()` — unlike `gspawn` — so the actor must do it
    ///    itself before becoming reachable).
    /// 2. Binds a TCP listener (ephemeral or fixed port).
    /// 3. Builds a TLS acceptor (explicit env vars, then Meta default
    ///    paths). At Meta (`fbcode_build`), mTLS is mandatory and
    ///    init fails if no certs are found. In OSS, falls back to
    ///    plain HTTP.
    /// 4. Creates a dedicated `Instance<()>` client mailbox on
    ///    system_proc for the HTTP bridge's reply ports, keeping
    ///    bridge traffic off the actor's own mailbox.
    /// 5. Spawns the axum server in a background task (HTTPS with
    ///    mTLS at Meta, HTTPS or HTTP in OSS depending on step 3).
    ///
    /// The hostname-based listen address is stored in `admin_host` so
    /// it can be returned via `GetAdminAddr`. The scheme (`https://`
    /// or `http://`) is included so clients know which protocol to
    /// use.
    ///
    /// # Errors
    ///
    /// Returns an error when the configured `MESH_ADMIN_ADDR` does not
    /// parse, the socket cannot be bound, mTLS is enforced but no
    /// acceptor could be built, the bridge instance cannot be created,
    /// or `AdminInfo::new` rejects the advertised URL.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Bind well-known ports before the HTTP server is spawned, so
        // messages (including Undeliverable bounces) can be delivered
        // as soon as the admin is reachable.
        this.bind::<Self>();
        this.set_system();
        self.self_actor_id = Some(this.self_addr().clone());

        // A constructor-supplied address takes precedence over the
        // global `MESH_ADMIN_ADDR` configuration.
        let bind_addr = match self.admin_addr_override {
            Some(addr) => addr,
            None => hyperactor_config::global::get_cloned(crate::config::MESH_ADMIN_ADDR)
                .parse_socket_addr()
                .map_err(|e| anyhow::anyhow!("invalid MESH_ADMIN_ADDR config: {}", e))?,
        };
        let listener = TcpListener::bind(bind_addr).await?;
        // Re-read the address from the socket: with an ephemeral port
        // request, this is where the actual port becomes known.
        let bound_addr = listener.local_addr()?;
        self.admin_addr = Some(bound_addr);

        // At Meta: mTLS is mandatory — fail if no certs are found.
        // In OSS: TLS is best-effort with plain HTTP fallback.
        // See MA-T1 in module doc.
        let enforce_mtls = cfg!(fbcode_build);
        let tls_acceptor = try_tls_acceptor(enforce_mtls);

        if enforce_mtls && tls_acceptor.is_none() {
            return Err(anyhow::anyhow!(
                "mesh admin requires mTLS but no TLS certificates found; \
                 set HYPERACTOR_TLS_CERT/KEY/CA or ensure Meta cert paths exist \
                 (/var/facebook/x509_identities/server.pem, /var/facebook/rootcanal/ca.pem)"
            ));
        }

        // The advertised scheme follows whether a TLS acceptor exists.
        let scheme = if tls_acceptor.is_some() {
            "https"
        } else {
            "http"
        };

        // Build the host portion of the admin URL.
        //
        // Explicit bind (loopback, specific IP): honour the caller's
        // choice — they bound that address intentionally.
        //
        // Wildcard bind: choose an advertised host that the loaded
        // TLS certificate actually authorizes. Extract SANs from the
        // cert and pick the first candidate that matches. This avoids
        // emitting a URL that fails TLS verification.
        let host = if !bound_addr.ip().is_unspecified() {
            let ip = bound_addr.ip();
            if ip.is_loopback() {
                // Both IPv4 and IPv6 loopback are advertised by name.
                "localhost".to_string()
            } else if let std::net::IpAddr::V6(v6) = ip {
                // IPv6 literals must be bracketed inside a URL authority.
                format!("[{}]", v6)
            } else {
                ip.to_string()
            }
        } else {
            advertised_host::from_cert_sans()
        };
        self.admin_host = Some(format!("{}://{}:{}", scheme, host, bound_addr.port()));

        // Create a dedicated client mailbox on system_proc for the
        // HTTP bridge's reply ports. This avoids sharing the admin
        // actor's own mailbox with async HTTP handlers.
        let (bridge_cx, bridge_handle) = this
            .proc()
            .introspectable_instance(MESH_ADMIN_BRIDGE_NAME)?;
        bridge_cx.set_system();
        // `admin_host` was assigned just above, so the "unknown"
        // fallback is not expected to be taken. If it ever were,
        // `AdminInfo::new` below would presumably reject it as an
        // invalid URL and `init` would fail.
        let admin_url = self
            .admin_host
            .clone()
            .unwrap_or_else(|| "unknown".to_string());
        let bridge_state = Arc::new(BridgeState {
            admin_ref: ActorRef::attest(this.self_addr().clone()),
            bridge_cx,
            resolve_semaphore: tokio::sync::Semaphore::new(hyperactor_config::global::get(
                crate::config::MESH_ADMIN_MAX_CONCURRENT_RESOLVES,
            )),
            _bridge_handle: bridge_handle,
            telemetry_url: self.telemetry_url.clone(),
            http_client: build_http_client(),
            admin_info: AdminInfo::new(
                this.self_addr().to_string(),
                this.self_addr().proc_addr().to_string(),
                admin_url,
            )?,
        });
        let router = create_mesh_admin_router(bridge_state);

        // The server runs in a detached task: the `JoinHandle` is
        // dropped, so a server error is only logged, never propagated.
        if let Some(acceptor) = tls_acceptor {
            let tls_listener = TlsListener {
                tcp: listener,
                acceptor,
            };
            tokio::spawn(async move {
                if let Err(e) = axum::serve(tls_listener, router).await {
                    tracing::error!("mesh admin server (mTLS) error: {}", e);
                }
            });
        } else {
            // OSS fallback: plain HTTP (only reachable when !fbcode_build).
            tokio::spawn(async move {
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("mesh admin server error: {}", e);
                }
            });
        }

        tracing::info!(
            "mesh admin server listening on {}",
            self.admin_host.as_deref().unwrap_or("unknown")
        );
        Ok(())
    }

    /// Swallow undeliverable message bounces instead of crashing.
    ///
    /// The admin agent sends `IntrospectMessage` to actors that may
    /// not have the introspection port bound (e.g. actors spawned
    /// via `cx.spawn()` whose `#[export]` list does not include it).
    /// When the message cannot be delivered, the routing layer
    /// bounces an `Undeliverable` back to the sender. The default
    /// `Actor::handle_undeliverable_message` calls `bail!()`, which
    /// would kill this admin agent and — via supervision cascade —
    /// take down the entire admin process with `exit(1)`.
    ///
    /// Since the admin agent is best-effort infrastructure, an
    /// undeliverable introspection probe is not a fatal error.
    ///
    /// Both variants are logged at `debug` level and the handler
    /// always returns `Ok(())`.
    async fn handle_undeliverable_message(
        &mut self,
        _cx: &Instance<Self>,
        undeliverable: hyperactor::mailbox::Undeliverable<hyperactor::mailbox::MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        match undeliverable {
            // The original envelope came back to us.
            hyperactor::mailbox::Undeliverable::Message(envelope) => {
                tracing::debug!(
                    "admin agent: undeliverable message to {} (port not bound?), ignoring",
                    envelope.dest(),
                );
            }
            // Only the destination and an error are reported.
            hyperactor::mailbox::Undeliverable::Lost(lost) => {
                tracing::debug!(
                    "admin agent: lost message to {} ({}), ignoring",
                    lost.dest,
                    lost.error,
                );
            }
        }
        Ok(())
    }
}
1077
1078/// Manual Handler impl — swallows `reply.send()` failures so the
1079/// admin agent stays alive when the HTTP caller disconnects.
1080#[async_trait]
1081impl Handler<MeshAdminMessage> for MeshAdminAgent {
1082    /// Dispatches `MeshAdminMessage` variants.
1083    ///
1084    /// Reply-send failures are swallowed because a dropped receiver
1085    /// (e.g. the HTTP bridge timed out) is not an error — the caller
1086    /// simply went away. Propagating the failure would crash the admin
1087    /// agent and take down the entire process.
1088    async fn handle(
1089        &mut self,
1090        cx: &Context<Self>,
1091        msg: MeshAdminMessage,
1092    ) -> Result<(), anyhow::Error> {
1093        match msg {
1094            MeshAdminMessage::GetAdminAddr { reply } => {
1095                let resp = MeshAdminAddrResponse {
1096                    addr: self.admin_host.clone(),
1097                };
1098                reply.post(cx, resp);
1099            }
1100        }
1101        Ok(())
1102    }
1103}
1104
1105/// Manual Handler impl — swallows `reply.send()` failures so the
1106/// admin agent stays alive when the HTTP caller disconnects.
1107#[async_trait]
1108impl Handler<ResolveReferenceMessage> for MeshAdminAgent {
1109    /// Dispatches `ResolveReferenceMessage` variants.
1110    ///
1111    /// The inner `resolve_reference` call never returns `Err` to the
1112    /// handler — failures are captured in the response payload.
1113    /// Reply-send failures are swallowed for the same reason as
1114    /// `MeshAdminMessage`: a dropped receiver means the caller (HTTP
1115    /// bridge) went away, which must not crash the admin agent.
1116    async fn handle(
1117        &mut self,
1118        cx: &Context<Self>,
1119        msg: ResolveReferenceMessage,
1120    ) -> Result<(), anyhow::Error> {
1121        match msg {
1122            ResolveReferenceMessage::Resolve {
1123                reference_string,
1124                reply,
1125            } => {
1126                let response = ResolveReferenceResponse(
1127                    self.resolve_reference(cx, &reference_string)
1128                        .await
1129                        .map_err(|e| format!("{:#}", e)),
1130                );
1131                reply.post(cx, response);
1132            }
1133        }
1134        Ok(())
1135    }
1136}
1137
1138impl MeshAdminAgent {
1139    /// Core resolver for the reference-based admin API.
1140    ///
1141    /// Parses the caller-provided `reference_string` (or handles the
1142    /// special `"root"` case), then dispatches to
1143    /// `resolve_host_node`, `resolve_proc_node`, or
1144    /// `resolve_actor_node` to assemble a fully-populated
1145    /// `NodePayload` (properties + child references).
1146    ///
1147    /// The returned payload satisfies the **navigation identity
1148    /// invariant** (see module docs): `payload.identity ==
1149    /// reference_string`, and `payload.parent` equals the identity of
1150    /// the node this one appears under.
1151    ///
1152    /// Note: this returns `Err` for internal use; the public
1153    /// `resolve` handler converts failures into
1154    /// `ResolveReferenceResponse(Err(..))` so the actor never crashes
1155    /// on
1156    /// lookup errors.
1157    async fn resolve_reference(
1158        &self,
1159        cx: &Context<'_, Self>,
1160        reference_string: &str,
1161    ) -> Result<NodePayload, anyhow::Error> {
1162        let node_ref: crate::introspect::NodeRef = reference_string
1163            .parse()
1164            .map_err(|e| anyhow::anyhow!("invalid reference '{}': {}", reference_string, e))?;
1165
1166        match &node_ref {
1167            crate::introspect::NodeRef::Root => Ok(self.build_root_payload()),
1168            crate::introspect::NodeRef::Host(actor_id) => {
1169                self.resolve_host_node(cx, actor_id).await
1170            }
1171            crate::introspect::NodeRef::Proc(proc_id) => {
1172                match self.resolve_proc_node(cx, proc_id).await {
1173                    Ok(payload) => Ok(payload),
1174                    Err(_) if self.standalone_proc_anchor(proc_id).is_some() => {
1175                        self.resolve_standalone_proc_node(cx, proc_id).await
1176                    }
1177                    Err(e) => Err(e),
1178                }
1179            }
1180            crate::introspect::NodeRef::Actor(actor_id) => {
1181                self.resolve_actor_node(cx, actor_id).await
1182            }
1183        }
1184    }
1185
1186    /// Returns the known actors on standalone procs — procs not
1187    /// managed by any host but whose actors are routable and
1188    /// introspectable. Each proc appears as a root child; the
1189    /// actor is the "anchor" used to discover the proc's contents.
1190    ///
1191    /// The root client is no longer standalone: spawn_admin registers
1192    /// C (the bootstrap host) as a normal host entry (A/C invariant).
1193    fn standalone_proc_actors(&self) -> impl Iterator<Item = &hyperactor::ActorAddr> {
1194        std::iter::empty()
1195    }
1196
1197    /// If `proc_id` belongs to a standalone proc, return the anchor
1198    /// actor on that proc. Returns `None` for host-managed procs.
1199    fn standalone_proc_anchor(&self, proc_id: &ProcAddr) -> Option<&hyperactor::ActorAddr> {
1200        self.standalone_proc_actors()
1201            .find(|actor_id| actor_id.proc_addr() == *proc_id)
1202    }
1203
1204    /// Returns true if `actor_id` lives on a standalone proc.
1205    fn is_standalone_proc_actor(&self, actor_id: &hyperactor::ActorAddr) -> bool {
1206        self.standalone_proc_actors()
1207            .any(|a| a.proc_addr() == actor_id.proc_addr())
1208    }
1209
1210    /// Construct the synthetic root node for the reference tree.
1211    ///
1212    /// The root is not a real actor/proc; it's a convenience node
1213    /// that anchors navigation. Its children are `NodeRef::Host`
1214    /// entries for each configured `HostAgent`.
1215    fn build_root_payload(&self) -> NodePayload {
1216        use crate::introspect::NodeRef;
1217
1218        let children: Vec<NodeRef> = self
1219            .hosts
1220            .values()
1221            .map(|agent| NodeRef::Host(agent.actor_addr().clone()))
1222            .collect();
1223        let system_children: Vec<NodeRef> = Vec::new(); // LC-1
1224        let mut attrs = hyperactor_config::Attrs::new();
1225        attrs.set(crate::introspect::NODE_TYPE, "root".to_string());
1226        attrs.set(crate::introspect::NUM_HOSTS, self.hosts.len());
1227        if let Ok(t) = humantime::parse_rfc3339(&self.started_at) {
1228            attrs.set(crate::introspect::STARTED_AT, t);
1229        }
1230        attrs.set(crate::introspect::STARTED_BY, self.started_by.clone());
1231        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
1232        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
1233        NodePayload {
1234            identity: NodeRef::Root,
1235            properties: crate::introspect::derive_properties(&attrs_json),
1236            children,
1237            parent: None,
1238            as_of: std::time::SystemTime::now(),
1239        }
1240    }
1241
1242    /// Resolve a `HostAgent` actor reference into a host-level
1243    /// `NodePayload`.
1244    ///
1245    /// Sends `IntrospectMessage::Query` directly to the
1246    /// `HostAgent`, which returns a `NodePayload` with
1247    /// `NodeProperties::Host` and the host's children. The resolver
1248    /// overrides `parent` to `"root"` since the host agent
1249    /// doesn't know its position in the navigation tree.
1250    async fn resolve_host_node(
1251        &self,
1252        cx: &Context<'_, Self>,
1253        actor_id: &hyperactor::ActorAddr,
1254    ) -> Result<NodePayload, anyhow::Error> {
1255        let result = query_introspect(
1256            cx,
1257            actor_id,
1258            hyperactor::introspect::IntrospectView::Entity,
1259            hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
1260            "querying host agent",
1261        )
1262        .await?;
1263        Ok(crate::introspect::to_node_payload_with(
1264            result,
1265            crate::introspect::NodeRef::Host(actor_id.clone()),
1266            Some(crate::introspect::NodeRef::Root),
1267        ))
1268    }
1269
1270    /// Resolve a `ProcAddr` reference into a proc-level `NodePayload`.
1271    ///
1272    /// First tries `IntrospectMessage::QueryChild` against the owning
1273    /// `HostAgent` (which recognizes service and local procs). If
1274    /// that returns an error payload, falls back to `ProcAgent` for
1275    /// user procs by querying
1276    /// `QueryChild(hyperactor::Addr::Proc(proc_id))`
1277    /// on `<proc_id>/proc_agent[0]`.
1278    ///
1279    /// See PA-1 in module doc.
1280    async fn resolve_proc_node(
1281        &self,
1282        cx: &Context<'_, Self>,
1283        proc_id: &ProcAddr,
1284    ) -> Result<NodePayload, anyhow::Error> {
1285        let host_addr = proc_id.addr().to_string();
1286
1287        let agent = self
1288            .hosts
1289            .get(&host_addr)
1290            .ok_or_else(|| anyhow::anyhow!("host not found: {}", host_addr))?;
1291
1292        // Try the host agent's QueryChild first.
1293        let result = query_child_introspect(
1294            cx,
1295            agent.actor_addr(),
1296            hyperactor::Addr::Proc(proc_id.clone()),
1297            hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT),
1298            "querying proc details",
1299        )
1300        .await?;
1301
1302        // If the host recognized the proc, normalize identity and parent.
1303        // The host's QueryChild returns IntrospectRef::Actor(self_id) as
1304        // parent, which lifts to NodeRef::Actor. We need NodeRef::Host.
1305        let payload = crate::introspect::to_node_payload_with(
1306            result,
1307            crate::introspect::NodeRef::Proc(proc_id.clone()),
1308            Some(crate::introspect::NodeRef::Host(agent.actor_addr().clone())),
1309        );
1310        if !matches!(payload.properties, NodeProperties::Error { .. }) {
1311            return Ok(payload);
1312        }
1313
1314        // Fall back to querying the ProcAgent directly (user procs).
1315        let mesh_agent_id = proc_id.actor_addr(PROC_AGENT_ACTOR_NAME);
1316        let result = query_child_introspect(
1317            cx,
1318            &mesh_agent_id,
1319            hyperactor::Addr::Proc(proc_id.clone()),
1320            hyperactor_config::global::get(crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT),
1321            "querying proc mesh agent",
1322        )
1323        .await?;
1324
1325        Ok(crate::introspect::to_node_payload_with(
1326            result,
1327            crate::introspect::NodeRef::Proc(proc_id.clone()),
1328            Some(crate::introspect::NodeRef::Host(agent.actor_addr().clone())),
1329        ))
1330    }
1331
1332    /// Resolve a standalone proc into a proc-level `NodePayload`.
1333    ///
1334    /// Standalone procs (e.g. the admin proc) are not managed by any
1335    /// `HostAgent`, so
1336    /// `resolve_proc_node` cannot resolve them. Instead, we query the
1337    /// anchor actor on the proc for its introspection data, collect
1338    /// its supervision children, and build a synthetic proc node.
1339    ///
1340    /// Special case: when the anchor actor is this agent itself, we
1341    /// build the children list directly (just `[self]`) to avoid a
1342    /// self-deadlock — the actor loop cannot process an
1343    /// `IntrospectMessage` it sends to itself while handling a
1344    /// resolve request.
1345    async fn resolve_standalone_proc_node(
1346        &self,
1347        cx: &Context<'_, Self>,
1348        proc_id: &ProcAddr,
1349    ) -> Result<NodePayload, anyhow::Error> {
1350        let actor_id = self
1351            .standalone_proc_anchor(proc_id)
1352            .ok_or_else(|| anyhow::anyhow!("no anchor actor for standalone proc {}", proc_id))?;
1353
1354        use crate::introspect::NodeRef;
1355
1356        let (children, system_children) = if self.self_actor_id.as_ref() == Some(actor_id) {
1357            let self_ref = NodeRef::Actor(actor_id.clone());
1358            (vec![self_ref.clone()], vec![self_ref])
1359        } else {
1360            let actor_result = query_introspect(
1361                cx,
1362                actor_id,
1363                hyperactor::introspect::IntrospectView::Actor,
1364                hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
1365                &format!("querying anchor actor on {}", proc_id),
1366            )
1367            .await?;
1368            let actor_payload = to_node_payload(actor_result);
1369            let anchor_ref = NodeRef::Actor(actor_id.clone());
1370            let anchor_is_system = matches!(
1371                &actor_payload.properties,
1372                NodeProperties::Actor {
1373                    is_system: true,
1374                    ..
1375                }
1376            );
1377
1378            let mut children = vec![anchor_ref.clone()];
1379            let mut system_children = Vec::new();
1380            if anchor_is_system {
1381                system_children.push(anchor_ref);
1382            }
1383
1384            for child_ref in actor_payload.children {
1385                let child_actor_id = match &child_ref {
1386                    NodeRef::Actor(id) => Some(id),
1387                    _ => None,
1388                };
1389                if let Some(child_actor_id) = child_actor_id {
1390                    let child_is_system = if let Ok(r) = query_introspect(
1391                        cx,
1392                        child_actor_id,
1393                        hyperactor::introspect::IntrospectView::Actor,
1394                        hyperactor_config::global::get(
1395                            crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT,
1396                        ),
1397                        "querying child actor is_system",
1398                    )
1399                    .await
1400                    {
1401                        let p = to_node_payload(r);
1402                        matches!(
1403                            &p.properties,
1404                            NodeProperties::Actor {
1405                                is_system: true,
1406                                ..
1407                            }
1408                        )
1409                    } else {
1410                        false
1411                    };
1412                    if child_is_system {
1413                        system_children.push(child_ref.clone());
1414                    }
1415                }
1416                children.push(child_ref);
1417            }
1418            (children, system_children)
1419        };
1420
1421        let proc_name = proc_id
1422            .label()
1423            .map(|l| l.as_str().to_string())
1424            .unwrap_or_else(|| proc_id.id().to_string());
1425
1426        let mut attrs = hyperactor_config::Attrs::new();
1427        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
1428        attrs.set(crate::introspect::PROC_NAME, proc_name.clone());
1429        attrs.set(crate::introspect::NUM_ACTORS, children.len());
1430        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
1431        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
1432
1433        Ok(NodePayload {
1434            identity: NodeRef::Proc(proc_id.clone()),
1435            properties: crate::introspect::derive_properties(&attrs_json),
1436            children,
1437            as_of: std::time::SystemTime::now(),
1438            parent: Some(NodeRef::Root),
1439        })
1440    }
1441
1442    /// Resolve a non-host-agent `ActorAddr` reference into an
1443    /// actor-level `NodePayload`.
1444    ///
1445    /// Sends `IntrospectMessage::Query` directly to the target actor
1446    /// via `PortRef::attest_handler_port`. The blanket handler
1447    /// returns a `NodePayload` with `NodeProperties::Actor` (or a
1448    /// domain-specific override like `NodeProperties::Proc` for
1449    /// `ProcAgent`).
1450    ///
1451    /// The resolver sets `parent` based on the actor's position
1452    /// in the topology: if the actor lives in a system proc, the
1453    /// parent is the system proc ref; otherwise it's the proc's
1454    /// `ProcAddr` string.
1455    async fn resolve_actor_node(
1456        &self,
1457        cx: &Context<'_, Self>,
1458        actor_id: &hyperactor::ActorAddr,
1459    ) -> Result<NodePayload, anyhow::Error> {
1460        // Self-resolution: we cannot send IntrospectMessage to our
1461        // own actor loop while handling a resolve request (deadlock).
1462        // Use introspect_payload() to snapshot our own state
1463        // directly.
1464        let result = if self.self_actor_id.as_ref() == Some(actor_id) {
1465            cx.introspect_payload()
1466        } else if self.is_standalone_proc_actor(actor_id) {
1467            // Standalone procs have no ProcAgent — query directly.
1468            query_introspect(
1469                cx,
1470                actor_id,
1471                hyperactor::introspect::IntrospectView::Actor,
1472                hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
1473                &format!("querying actor {}", actor_id),
1474            )
1475            .await?
1476        } else {
1477            // Check terminated snapshots first — fast, no ambiguity.
1478            let proc_id = actor_id.proc_addr();
1479            let mesh_agent_id = proc_id.actor_addr(PROC_AGENT_ACTOR_NAME);
1480            let terminated = query_child_introspect(
1481                cx,
1482                &mesh_agent_id,
1483                hyperactor::Addr::Actor(actor_id.clone()),
1484                hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT),
1485                "querying terminated snapshot",
1486            )
1487            .await
1488            .ok()
1489            .filter(|r| {
1490                let p = crate::introspect::derive_properties(&r.attrs);
1491                !matches!(p, NodeProperties::Error { .. })
1492            });
1493
1494            match terminated {
1495                Some(snapshot) => snapshot,
1496                None => {
1497                    // Not terminated — query the live actor.
1498                    query_introspect(
1499                        cx,
1500                        actor_id,
1501                        hyperactor::introspect::IntrospectView::Actor,
1502                        hyperactor_config::global::get(
1503                            crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT,
1504                        ),
1505                        &format!("querying actor {}", actor_id),
1506                    )
1507                    .await?
1508                }
1509            }
1510        };
1511        let mut payload = to_node_payload(result);
1512
1513        if self.is_standalone_proc_actor(actor_id) {
1514            payload.parent = Some(crate::introspect::NodeRef::Proc(actor_id.proc_addr()));
1515            return Ok(payload);
1516        }
1517
1518        let proc_id = actor_id.proc_addr();
1519        match &payload.properties {
1520            NodeProperties::Proc { .. } => {
1521                let host_addr = proc_id.addr().to_string();
1522                if let Some(agent) = self.hosts.get(&host_addr) {
1523                    payload.parent =
1524                        Some(crate::introspect::NodeRef::Host(agent.actor_addr().clone()));
1525                }
1526            }
1527            _ => {
1528                payload.parent = Some(crate::introspect::NodeRef::Proc(proc_id.clone()));
1529            }
1530        }
1531
1532        Ok(payload)
1533    }
1534}
1535
1536/// Build the Axum router for the mesh admin HTTP server.
1537///
1538/// Routes:
1539/// - `GET /v1/schema` — JSON Schema (Draft 2020-12) for `NodePayload`.
1540/// - `GET /v1/schema/error` — JSON Schema for `ApiErrorEnvelope`.
1541/// - `GET /v1/openapi.json` — OpenAPI 3.1 spec (embeds JSON Schemas).
1542/// - `GET /v1/tree` — ASCII topology dump.
1543/// - `POST /v1/query` — proxy SQL query to the dashboard server.
1544/// - `GET /v1/pyspy/{*proc_reference}` — py-spy stack dump for a proc.
1545/// - `POST /v1/pyspy_dump/{*proc_reference}` — py-spy dump + store in Datafusion.
1546/// - `POST /v1/pyspy_profile_svg/{*proc_reference}` — py-spy profile → SVG flamegraph.
1547/// - `GET /v1/config/{*proc_reference}` — config snapshot for a proc.
1548/// - `GET /v1/admin` — admin self-identification (`AdminInfo`).
1549/// - `GET /v1/{*reference}` — JSON `NodePayload` for a single reference.
1550/// - `GET /SKILL.md` — agent-facing API documentation (markdown).
1551fn create_mesh_admin_router(bridge_state: Arc<BridgeState>) -> Router {
1552    Router::new()
1553        .route("/SKILL.md", get(serve_skill_md))
1554        // Literal paths matched by specificity before wildcard (SC-5).
1555        .route("/v1/admin", get(serve_admin_info))
1556        .route("/v1/schema", get(serve_schema))
1557        .route("/v1/schema/admin", get(serve_admin_schema))
1558        .route("/v1/schema/error", get(serve_error_schema))
1559        .route("/v1/openapi.json", get(serve_openapi))
1560        .route("/v1/tree", get(tree_dump))
1561        .route("/v1/query", post(query_proxy))
1562        .route("/v1/pyspy/{*proc_reference}", get(pyspy_bridge))
1563        .route(
1564            "/v1/pyspy_dump/{*proc_reference}",
1565            post(pyspy_dump_and_store),
1566        )
1567        .route(
1568            "/v1/pyspy_profile_svg/{*proc_reference}",
1569            post(pyspy_profile_svg),
1570        )
1571        .route("/v1/config/{*proc_reference}", get(config_bridge))
1572        .route("/v1/{*reference}", get(resolve_reference_bridge))
1573        .with_state(bridge_state)
1574}
1575
/// Raw markdown template for the SKILL.md API document, embedded at
/// compile time from `mesh_admin_skill.md`. `serve_skill_md` replaces
/// the `{base}` placeholder with the request's base URL before
/// serving it.
const SKILL_MD_TEMPLATE: &str = include_str!("mesh_admin_skill.md");
1578
1579/// Extract base URL from request headers.
1580///
1581/// Defaults to `https` when `x-forwarded-proto` is absent — the
1582/// admin server uses TLS in production, so `http` is the wrong
1583/// default for direct connections.
1584fn extract_base_url(headers: &axum::http::HeaderMap) -> String {
1585    let host = headers
1586        .get(axum::http::header::HOST)
1587        .and_then(|v| v.to_str().ok())
1588        .unwrap_or("localhost");
1589    let scheme = headers
1590        .get("x-forwarded-proto")
1591        .and_then(|v| v.to_str().ok())
1592        .unwrap_or("https");
1593    format!("{scheme}://{host}")
1594}
1595
1596/// Self-identification endpoint: returns `AdminInfo` (AI-1..AI-3;
1597/// AI-4 is a constructor guarantee of `AdminInfo::new()`).
1598async fn serve_admin_info(
1599    State(state): State<Arc<BridgeState>>,
1600) -> axum::response::Json<AdminInfo> {
1601    axum::response::Json(state.admin_info.clone())
1602}
1603
1604/// JSON Schema for `AdminInfo`.
1605async fn serve_admin_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1606    Ok(axum::response::Json(schema_with_id::<AdminInfo>(
1607        "https://monarch.meta.com/schemas/v1/admin_info",
1608    )?))
1609}
1610
1611/// Serves the self-describing API document with the base URL
1612/// interpolated so examples are copy-pasteable.
1613async fn serve_skill_md(headers: axum::http::HeaderMap) -> impl axum::response::IntoResponse {
1614    let base = extract_base_url(&headers);
1615    let body = SKILL_MD_TEMPLATE.replace("{base}", &base);
1616    (
1617        [(
1618            axum::http::header::CONTENT_TYPE,
1619            "text/markdown; charset=utf-8",
1620        )],
1621        body,
1622    )
1623}
1624
1625/// Build a JSON Schema value with a `$id` field.
1626fn schema_with_id<T: schemars::JsonSchema>(id: &str) -> Result<serde_json::Value, ApiError> {
1627    let schema = schemars::schema_for!(T);
1628    let mut value = serde_json::to_value(schema).map_err(|e| ApiError {
1629        code: "internal_error".to_string(),
1630        message: format!("failed to serialize schema: {e}"),
1631        details: None,
1632    })?;
1633    if let Some(obj) = value.as_object_mut() {
1634        obj.insert("$id".into(), serde_json::Value::String(id.into()));
1635    }
1636    Ok(value)
1637}
1638
1639/// JSON Schema for the `NodePayload` response type.
1640async fn serve_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1641    Ok(axum::response::Json(schema_with_id::<NodePayloadDto>(
1642        "https://monarch.meta.com/schemas/v1/node_payload",
1643    )?))
1644}
1645
1646/// JSON Schema for the `ApiErrorEnvelope` error response.
1647async fn serve_error_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1648    Ok(axum::response::Json(schema_with_id::<ApiErrorEnvelope>(
1649        "https://monarch.meta.com/schemas/v1/error",
1650    )?))
1651}
1652
1653/// Hoist `$defs` from a schemars-generated schema into a shared
1654/// map and rewrite internal `$ref` pointers from `#/$defs/X` to
1655/// `#/components/schemas/X` so OpenAPI tools can resolve them.
1656fn hoist_defs(
1657    schema: &mut serde_json::Value,
1658    shared: &mut serde_json::Map<String, serde_json::Value>,
1659) {
1660    if let Some(obj) = schema.as_object_mut() {
1661        if let Some(defs) = obj.remove("$defs")
1662            && let Some(defs_map) = defs.as_object()
1663        {
1664            for (k, v) in defs_map {
1665                shared.insert(k.clone(), v.clone());
1666            }
1667        }
1668        // Also remove $schema from embedded schemas — it's
1669        // only valid at the root of a JSON Schema document,
1670        // not inside an OpenAPI components/schemas entry.
1671        obj.remove("$schema");
1672    }
1673    rewrite_refs(schema);
1674}
1675
1676/// Recursively rewrite `$ref: "#/$defs/X"` →
1677/// `$ref: "#/components/schemas/X"`.
1678fn rewrite_refs(value: &mut serde_json::Value) {
1679    match value {
1680        serde_json::Value::Object(map) => {
1681            if let Some(serde_json::Value::String(r)) = map.get_mut("$ref")
1682                && r.starts_with("#/$defs/")
1683            {
1684                *r = r.replace("#/$defs/", "#/components/schemas/");
1685            }
1686            for v in map.values_mut() {
1687                rewrite_refs(v);
1688            }
1689        }
1690        serde_json::Value::Array(arr) => {
1691            for v in arr {
1692                rewrite_refs(v);
1693            }
1694        }
1695        _ => {}
1696    }
1697}
1698
/// Build the OpenAPI 3.1 spec, embedding schemars-derived JSON
/// Schemas into `components/schemas`.
///
/// Steps:
/// 1. Generate a JSON Schema value for each wire type exposed by
///    the API.
/// 2. Hoist every schema's `$defs` into one shared map via
///    `hoist_defs`, then register each top-level schema under its
///    component name. Top-level entries are inserted after all
///    hoisting, so they replace any hoisted definition that has the
///    same key.
/// 3. Rewrite any `#/$defs/...` references still present in the
///    collected component schemas.
/// 4. Assemble the `paths` section. Two paths are inserted after the
///    main `json!` literal to stay under the macro recursion limit.
///
/// # Panics
///
/// Panics if any generated schema cannot be converted into a
/// `serde_json::Value`.
pub fn build_openapi_spec() -> serde_json::Value {
    // One schema per wire type. `NodePayloadDto` is published under
    // the component name `NodePayload` further below.
    let mut node_schema = serde_json::to_value(schemars::schema_for!(NodePayloadDto))
        .expect("NodePayload schema must be serializable");
    let mut error_schema = serde_json::to_value(schemars::schema_for!(ApiErrorEnvelope))
        .expect("ApiErrorEnvelope schema must be serializable");
    let mut pyspy_schema = serde_json::to_value(schemars::schema_for!(PySpyResult))
        .expect("PySpyResult schema must be serializable");
    let mut query_request_schema = serde_json::to_value(schemars::schema_for!(QueryRequest))
        .expect("QueryRequest schema must be serializable");
    let mut query_response_schema = serde_json::to_value(schemars::schema_for!(QueryResponse))
        .expect("QueryResponse schema must be serializable");
    let mut pyspy_dump_response_schema =
        serde_json::to_value(schemars::schema_for!(PyspyDumpAndStoreResponse))
            .expect("PyspyDumpAndStoreResponse schema must be serializable");
    let mut admin_info_schema = serde_json::to_value(schemars::schema_for!(AdminInfo))
        .expect("AdminInfo schema must be serializable");
    let mut profile_opts_schema = serde_json::to_value(schemars::schema_for!(PySpyProfileOpts))
        .expect("PySpyProfileOpts schema must be serializable");

    // Hoist $defs into a shared components/schemas map so
    // OpenAPI tools can resolve references.
    let mut shared_schemas = serde_json::Map::new();
    hoist_defs(&mut node_schema, &mut shared_schemas);
    hoist_defs(&mut error_schema, &mut shared_schemas);
    hoist_defs(&mut pyspy_schema, &mut shared_schemas);
    hoist_defs(&mut query_request_schema, &mut shared_schemas);
    hoist_defs(&mut query_response_schema, &mut shared_schemas);
    hoist_defs(&mut pyspy_dump_response_schema, &mut shared_schemas);
    hoist_defs(&mut admin_info_schema, &mut shared_schemas);
    hoist_defs(&mut profile_opts_schema, &mut shared_schemas);
    // Register the top-level schemas only after all hoisting is done,
    // so these names take precedence over same-named hoisted defs.
    shared_schemas.insert("NodePayload".into(), node_schema);
    shared_schemas.insert("ApiErrorEnvelope".into(), error_schema);
    shared_schemas.insert("PySpyResult".into(), pyspy_schema);
    shared_schemas.insert("QueryRequest".into(), query_request_schema);
    shared_schemas.insert("QueryResponse".into(), query_response_schema);
    shared_schemas.insert(
        "PyspyDumpAndStoreResponse".into(),
        pyspy_dump_response_schema,
    );
    shared_schemas.insert("AdminInfo".into(), admin_info_schema);
    shared_schemas.insert("PySpyProfileOpts".into(), profile_opts_schema);

    // Rewrite any remaining $defs refs in the hoisted component schemas.
    for value in shared_schemas.values_mut() {
        rewrite_refs(value);
    }

    // Builds an error response object whose body is the shared
    // `ApiErrorEnvelope` component.
    let error_response = |desc: &str| -> serde_json::Value {
        serde_json::json!({
            "description": desc,
            "content": {
                "application/json": {
                    "schema": { "$ref": "#/components/schemas/ApiErrorEnvelope" }
                }
            }
        })
    };

    // Shared 200 response for the endpoints that return a `NodePayload`.
    let success_payload = serde_json::json!({
        "description": "Resolved NodePayload",
        "content": {
            "application/json": {
                "schema": { "$ref": "#/components/schemas/NodePayload" }
            }
        }
    });

    let mut spec = serde_json::json!({
        "openapi": "3.1.0",
        "info": {
            "title": "Monarch Mesh Admin API",
            "version": "1.0.0",
            "description": "Address-walking introspection API for a Monarch actor mesh. See the Admin Gateway Pattern RFC."
        },
        "paths": {
            "/v1/root": {
                "get": {
                    "summary": "Fetch root node",
                    "operationId": "getRoot",
                    "responses": {
                        "200": success_payload,
                        "500": error_response("Internal error"),
                        "503": error_response("Service unavailable (at capacity, retry with backoff)"),
                        "504": error_response("Gateway timeout (downstream host unresponsive)")
                    }
                }
            },
            "/v1/{reference}": {
                "get": {
                    "summary": "Resolve a reference to a NodePayload",
                    "operationId": "resolveReference",
                    "parameters": [{
                        "name": "reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded opaque reference string",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": success_payload,
                        "400": error_response("Bad request (malformed reference)"),
                        "404": error_response("Address not found"),
                        "500": error_response("Internal error"),
                        "503": error_response("Service unavailable (at capacity, retry with backoff)"),
                        "504": error_response("Gateway timeout (downstream host unresponsive)")
                    }
                }
            },
            "/v1/schema": {
                "get": {
                    "summary": "JSON Schema for NodePayload (Draft 2020-12)",
                    "operationId": "getSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            },
            "/v1/schema/error": {
                "get": {
                    "summary": "JSON Schema for ApiErrorEnvelope (Draft 2020-12)",
                    "operationId": "getErrorSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            },
            "/v1/admin": {
                "get": {
                    "summary": "Admin self-identification (placement, identity, URL)",
                    "operationId": "getAdminInfo",
                    "description": "Returns the admin actor's identity, proc placement, hostname, and URL. Used for placement verification and operational discovery.",
                    "responses": {
                        "200": {
                            "description": "AdminInfo — admin actor placement metadata",
                            "content": {
                                "application/json": {
                                    "schema": { "$ref": "#/components/schemas/AdminInfo" }
                                }
                            }
                        }
                    }
                }
            },
            "/v1/tree": {
                "get": {
                    "summary": "ASCII topology dump (debug)",
                    "operationId": "getTree",
                    "responses": {
                        "200": {
                            "description": "Human-readable topology tree",
                            "content": { "text/plain": {} }
                        }
                    }
                }
            },
            "/v1/config/{proc_reference}": {
                "get": {
                    "summary": "Config snapshot for a proc",
                    "operationId": "getConfig",
                    "description": "Returns the effective CONFIG-marked configuration entries from the target process. Routes to ProcAgent (worker procs) or HostAgent (service proc).",
                    "parameters": [{
                        "name": "proc_reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded proc reference (ProcAddr)",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": {
                            "description": "ConfigDumpResult — sorted list of config entries",
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "object",
                                        "properties": {
                                            "entries": {
                                                "type": "array",
                                                "items": {
                                                    "type": "object",
                                                    "properties": {
                                                        "name": { "type": "string" },
                                                        "value": { "type": "string" },
                                                        "default_value": { "type": ["string", "null"] },
                                                        "source": { "type": "string" },
                                                        "changed_from_default": { "type": "boolean" },
                                                        "env_var": { "type": ["string", "null"] }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        },
                        "404": error_response("Proc not found or handler not reachable"),
                        "500": error_response("Internal error"),
                        "504": error_response("Gateway timeout")
                    }
                }
            },
            "/v1/pyspy/{proc_reference}": {
                "get": {
                    "summary": "Py-spy stack dump for a proc",
                    "operationId": "getPyspy",
                    "description": "Runs py-spy against the target process and returns structured stack traces. Routes to ProcAgent (worker procs) or HostAgent (service proc).",
                    "parameters": [{
                        "name": "proc_reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded proc reference (ProcAddr)",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": {
                            "description": "PySpyResult — one of Ok, BinaryNotFound, or Failed",
                            "content": {
                                "application/json": {
                                    "schema": { "$ref": "#/components/schemas/PySpyResult" }
                                }
                            }
                        },
                        "400": error_response("Bad request (malformed proc reference)"),
                        "404": error_response("Proc not found or handler not reachable"),
                        "500": error_response("Internal error"),
                        "504": error_response("Gateway timeout")
                    }
                }
            },
            "/v1/query": {
                "post": {
                    "summary": "Proxy SQL query to the telemetry dashboard",
                    "operationId": "queryProxy",
                    "description": "Forwards a SQL query to the Monarch dashboard's DataFusion engine. Requires telemetry_url to be configured.",
                    "requestBody": {
                        "required": true,
                        "content": {
                            "application/json": {
                                "schema": { "$ref": "#/components/schemas/QueryRequest" }
                            }
                        }
                    },
                    "responses": {
                        "200": {
                            "description": "Query results",
                            "content": {
                                "application/json": {
                                    "schema": { "$ref": "#/components/schemas/QueryResponse" }
                                }
                            }
                        },
                        "400": error_response("Bad request (invalid SQL or missing sql field)"),
                        "404": error_response("Dashboard not configured"),
                        "500": error_response("Internal error"),
                        "504": error_response("Gateway timeout")
                    }
                }
            },
            "/v1/pyspy_dump/{proc_reference}": {
                "post": {
                    "summary": "Trigger py-spy dump and store in telemetry",
                    "operationId": "pyspyDumpAndStore",
                    "description": "Runs py-spy against the target process, stores the result in the dashboard's DataFusion pyspy tables, and returns the dump_id.",
                    "parameters": [{
                        "name": "proc_reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded proc reference (ProcAddr)",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": {
                            "description": "Dump stored successfully",
                            "content": {
                                "application/json": {
                                    "schema": { "$ref": "#/components/schemas/PyspyDumpAndStoreResponse" }
                                }
                            }
                        },
                        "400": error_response("Bad request (malformed proc reference)"),
                        "404": error_response("Proc or dashboard not found"),
                        "500": error_response("Internal error"),
                        "504": error_response("Gateway timeout")
                    }
                }
            }
        },
        "components": {
            "schemas": serde_json::Value::Object(shared_schemas)
        }
    });

    // Insert paths outside the json! macro to avoid hitting the
    // serde_json recursion limit.
    if let Some(paths) = spec.pointer_mut("/paths").and_then(|v| v.as_object_mut()) {
        paths.insert(
            "/v1/schema/admin".into(),
            serde_json::json!({
                "get": {
                    "summary": "JSON Schema for AdminInfo (Draft 2020-12)",
                    "operationId": "getAdminSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            }),
        );
        paths.insert(
            "/v1/pyspy_profile_svg/{proc_reference}".into(),
            serde_json::json!({
                "post": {
                    "summary": "Profile a proc and return SVG flamegraph",
                    "operationId": "pyspyProfileSvg",
                    "description": "Runs py-spy record against the target process for the requested duration and returns an SVG flamegraph. Timeout scales with duration_s.",
                    "parameters": [{
                        "name": "proc_reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded proc reference (ProcAddr)",
                        "schema": { "type": "string" }
                    }],
                    "requestBody": {
                        "required": true,
                        "content": {
                            "application/json": {
                                "schema": { "$ref": "#/components/schemas/PySpyProfileOpts" }
                            }
                        }
                    },
                    "responses": {
                        "200": {
                            "description": "SVG flamegraph",
                            "content": { "image/svg+xml": {} }
                        },
                        "400": error_response("Bad request (invalid duration/rate or malformed proc reference)"),
                        "404": error_response("Proc not found or handler not reachable"),
                        "500": error_response("Internal error (profile failed or SVG generation failed)"),
                        "503": error_response("Service unavailable (py-spy not available on target host)"),
                        "504": error_response("Gateway timeout (subprocess timed out)")
                    }
                }
            }),
        );
    }

    spec
}
2056
2057/// OpenAPI 3.1 spec for the mesh admin API.
2058async fn serve_openapi() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
2059    Ok(axum::response::Json(build_openapi_spec()))
2060}
2061
2062/// Validate and parse a raw proc reference path segment into a
2063/// decoded reference string and `ProcAddr`. Extracted for testability.
2064fn parse_proc_reference(raw: &str) -> Result<(String, ProcAddr), ApiError> {
2065    let trimmed = raw.trim_start_matches('/');
2066    if trimmed.is_empty() {
2067        return Err(ApiError::bad_request("empty proc reference", None));
2068    }
2069    let decoded = urlencoding::decode(trimmed)
2070        .map(|cow| cow.into_owned())
2071        .map_err(|_| {
2072            ApiError::bad_request(
2073                "malformed percent-encoding: decoded bytes are not valid UTF-8",
2074                None,
2075            )
2076        })?;
2077    let proc_id: ProcAddr = decoded
2078        .parse()
2079        .map_err(|e| ApiError::bad_request(format!("invalid proc reference: {}", e), None))?;
2080    Ok((decoded, proc_id))
2081}
2082
2083/// Probe whether an actor is reachable by sending a lightweight
2084/// introspect query bounded by `MESH_ADMIN_QUERY_CHILD_TIMEOUT`.
2085///
2086/// Returns `Ok(true)` if the actor responds, `Ok(false)` if the
2087/// actor is absent or unresponsive (timeout / recv error).
2088async fn probe_actor(
2089    cx: &Instance<()>,
2090    agent_id: &hyperactor::ActorAddr,
2091) -> Result<bool, ApiError> {
2092    let port = hyperactor::PortRef::<IntrospectMessage>::attest_handler_port(agent_id);
2093    let (handle, rx) = open_once_port::<IntrospectResult>(cx);
2094    port.post(
2095        cx,
2096        IntrospectMessage::Query {
2097            view: IntrospectView::Entity,
2098            reply: handle.bind(),
2099        },
2100    );
2101
2102    let timeout = hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT);
2103    match tokio::time::timeout(timeout, rx.recv()).await {
2104        Ok(Ok(_)) => Ok(true),
2105        Ok(Err(e)) => {
2106            tracing::debug!(
2107                name = "pyspy_probe_recv_failed",
2108                %agent_id,
2109                error = %e,
2110            );
2111            Ok(false)
2112        }
2113        Err(_elapsed) => {
2114            tracing::debug!(
2115                name = "pyspy_probe_timeout",
2116                %agent_id,
2117            );
2118            Ok(false)
2119        }
2120    }
2121}
2122
/// Typed proc-handler target behind the core py-spy logic shared by
/// `pyspy_bridge` and `pyspy_dump_and_store`. Private to this module.
///
/// A target is either a host agent or a proc agent. The single
/// minting point is `route_proc_handler` via `ActorRef::attest`.
/// After minting, all sends go through the typed `ActorRef`.
enum ResolvedProcHandler {
    /// Requests are handled by a `HostAgent`.
    Host(ActorRef<HostAgent>),
    /// Requests are handled by a `ProcAgent`.
    Proc(ActorRef<ProcAgent>),
}
2133
2134impl ResolvedProcHandler {
2135    fn agent_id(&self) -> hyperactor::ActorAddr {
2136        match self {
2137            Self::Host(r) => r.actor_addr().clone(),
2138            Self::Proc(r) => r.actor_addr().clone(),
2139        }
2140    }
2141
2142    async fn pyspy_dump(
2143        &self,
2144        cx: &impl hyperactor::context::Actor,
2145        opts: PySpyOpts,
2146        timeout: std::time::Duration,
2147    ) -> Result<PySpyResult, ApiError> {
2148        let (reply_handle, reply_rx) = open_once_port::<PySpyResult>(cx);
2149        let mut reply_ref = reply_handle.bind();
2150        reply_ref.return_undeliverable(false);
2151        let msg = PySpyDump {
2152            opts,
2153            result: reply_ref,
2154        };
2155        match self {
2156            Self::Host(r) => r.post(cx, msg),
2157            Self::Proc(r) => r.post(cx, msg),
2158        };
2159        tokio::time::timeout(timeout, reply_rx.recv())
2160            .await
2161            .map_err(|_| ApiError {
2162                code: "gateway_timeout".to_string(),
2163                message: "timed out waiting for py-spy dump".to_string(),
2164                details: None,
2165            })?
2166            .map_err(|e| ApiError {
2167                code: "internal_error".to_string(),
2168                message: format!("failed to receive PySpyResult: {}", e),
2169                details: None,
2170            })
2171    }
2172
2173    async fn pyspy_profile(
2174        &self,
2175        cx: &impl hyperactor::context::Actor,
2176        request: ValidatedProfileRequest,
2177        timeout: std::time::Duration,
2178    ) -> Result<PySpyProfileResult, ApiError> {
2179        let (reply_handle, reply_rx) = open_once_port::<PySpyProfileResult>(cx);
2180        let mut reply_ref = reply_handle.bind();
2181        reply_ref.return_undeliverable(false);
2182        let msg = PySpyProfile {
2183            request,
2184            result: reply_ref,
2185        };
2186        match self {
2187            Self::Host(r) => r.post(cx, msg),
2188            Self::Proc(r) => r.post(cx, msg),
2189        };
2190        tokio::time::timeout(timeout, reply_rx.recv())
2191            .await
2192            .map_err(|_| ApiError {
2193                code: "gateway_timeout".to_string(),
2194                message: "timed out waiting for py-spy profile".to_string(),
2195                details: None,
2196            })?
2197            .map_err(|e| ApiError {
2198                code: "internal_error".to_string(),
2199                message: format!("failed to receive PySpyProfileResult: {}", e),
2200                details: None,
2201            })
2202    }
2203
2204    async fn config_dump(
2205        &self,
2206        cx: &impl hyperactor::context::Actor,
2207        timeout: std::time::Duration,
2208    ) -> Result<ConfigDumpResult, ApiError> {
2209        let (reply_handle, reply_rx) = open_once_port::<ConfigDumpResult>(cx);
2210        let mut reply_ref = reply_handle.bind();
2211        reply_ref.return_undeliverable(false);
2212        let msg = ConfigDump { result: reply_ref };
2213        match self {
2214            Self::Host(r) => r.post(cx, msg),
2215            Self::Proc(r) => r.post(cx, msg),
2216        };
2217        tokio::time::timeout(timeout, reply_rx.recv())
2218            .await
2219            .map_err(|_| ApiError {
2220                code: "gateway_timeout".to_string(),
2221                message: "timed out waiting for config dump".to_string(),
2222                details: None,
2223            })?
2224            .map_err(|e| ApiError {
2225                code: "internal_error".to_string(),
2226                message: format!("failed to receive ConfigDumpResult: {}", e),
2227                details: None,
2228            })
2229    }
2230}
2231
2232/// Parse + route + attest. No probe. The single `ActorRef::attest`
2233/// minting point. Used by `config_bridge` which intentionally skips
2234/// the probe (CFG-4).
2235fn route_proc_handler(raw_proc_reference: &str) -> Result<ResolvedProcHandler, ApiError> {
2236    let (_proc_reference, proc_id) = parse_proc_reference(raw_proc_reference)?;
2237    let is_service = proc_id
2238        .uid()
2239        .as_singleton()
2240        .is_some_and(|label| label.as_str() == SERVICE_PROC_NAME);
2241    if is_service {
2242        let agent_id = proc_id.actor_addr(HOST_MESH_AGENT_ACTOR_NAME);
2243        Ok(ResolvedProcHandler::Host(ActorRef::attest(agent_id)))
2244    } else {
2245        let agent_id = proc_id.actor_addr(PROC_AGENT_ACTOR_NAME);
2246        Ok(ResolvedProcHandler::Proc(ActorRef::attest(agent_id)))
2247    }
2248}
2249
2250/// Parse + route + attest + probe (PS-13).
2251async fn resolve_proc_handler(
2252    state: &BridgeState,
2253    raw_proc_reference: &str,
2254) -> Result<ResolvedProcHandler, ApiError> {
2255    let handler = route_proc_handler(raw_proc_reference)?;
2256    let cx = &state.bridge_cx;
2257    if !probe_actor(cx, &handler.agent_id()).await? {
2258        return Err(ApiError::not_found(
2259            format!(
2260                "proc does not have a reachable handler ({})",
2261                raw_proc_reference,
2262            ),
2263            None,
2264        ));
2265    }
2266    Ok(handler)
2267}
2268
2269async fn do_pyspy_dump(
2270    state: &BridgeState,
2271    raw_proc_reference: &str,
2272) -> Result<PySpyResult, ApiError> {
2273    let handler = resolve_proc_handler(state, raw_proc_reference).await?;
2274    let timeout = hyperactor_config::global::get(crate::config::MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT);
2275    handler
2276        .pyspy_dump(
2277            &state.bridge_cx,
2278            PySpyOpts {
2279                threads: false,
2280                native: true,
2281                native_all: true,
2282                nonblocking: false,
2283            },
2284            timeout,
2285        )
2286        .await
2287}
2288
2289/// HTTP bridge for py-spy stack dump requests.
2290///
2291/// Parses the proc reference, routes to the appropriate actor
2292/// (ProcAgent on worker procs, HostAgent on the service proc),
2293/// probes for reachability, and sends `PySpyDump` directly.
2294/// See PS-12, PS-13 in `introspect` module doc.
2295async fn pyspy_bridge(
2296    State(state): State<Arc<BridgeState>>,
2297    AxumPath(proc_reference): AxumPath<String>,
2298) -> Result<Json<PySpyResult>, ApiError> {
2299    Ok(Json(do_pyspy_dump(&state, &proc_reference).await?))
2300}
2301
2302async fn do_pyspy_profile(
2303    state: &BridgeState,
2304    raw_proc_reference: &str,
2305    opts: PySpyProfileOpts,
2306) -> Result<PySpyProfileResult, ApiError> {
2307    let max_duration =
2308        hyperactor_config::global::get(crate::config::MESH_ADMIN_PYSPY_MAX_PROFILE_DURATION);
2309    let request = ValidatedProfileRequest::try_new(&opts, max_duration)
2310        .map_err(|msg| ApiError::bad_request(msg, None))?;
2311    let bridge_timeout = request.bridge_timeout();
2312    let handler = resolve_proc_handler(state, raw_proc_reference).await?;
2313    handler
2314        .pyspy_profile(&state.bridge_cx, request, bridge_timeout)
2315        .await
2316}
2317
2318/// HTTP bridge for py-spy profile SVG requests.
2319///
2320/// Accepts `PySpyProfileOpts` as JSON POST body, profiles the target
2321/// process, and returns raw SVG.
2322async fn pyspy_profile_svg(
2323    State(state): State<Arc<BridgeState>>,
2324    AxumPath(proc_reference): AxumPath<String>,
2325    Json(opts): Json<PySpyProfileOpts>,
2326) -> Result<axum::response::Response, ApiError> {
2327    let result = do_pyspy_profile(&state, &proc_reference, opts).await?;
2328    match result {
2329        PySpyProfileResult::Ok { svg, .. } => Ok(axum::response::Response::builder()
2330            .header("content-type", "image/svg+xml")
2331            .body(axum::body::Body::from(svg))
2332            .unwrap()),
2333        PySpyProfileResult::BinaryNotFound { searched } => Err(ApiError {
2334            code: "service_unavailable".to_string(),
2335            message: format!(
2336                "py-spy not available on target host; searched: {}",
2337                searched.join(", ")
2338            ),
2339            details: None,
2340        }),
2341        PySpyProfileResult::TimedOut {
2342            timeout_s, stderr, ..
2343        } => Err(ApiError {
2344            code: "gateway_timeout".to_string(),
2345            message: format!(
2346                "py-spy record subprocess timed out after {}s: {}",
2347                timeout_s,
2348                stderr.trim()
2349            ),
2350            details: None,
2351        }),
2352        PySpyProfileResult::ExitFailure { stderr, .. } => Err(ApiError {
2353            code: "profile_failed".to_string(),
2354            message: stderr,
2355            details: None,
2356        }),
2357        PySpyProfileResult::OutputMissing { pid, binary } => Err(ApiError {
2358            code: "profile_output_unusable".to_string(),
2359            message: format!("py-spy exited 0 but SVG file is missing (pid {pid}, {binary})"),
2360            details: None,
2361        }),
2362        PySpyProfileResult::OutputEmpty { pid, binary } => Err(ApiError {
2363            code: "profile_output_unusable".to_string(),
2364            message: format!("py-spy exited 0 but SVG output is empty (pid {pid}, {binary})"),
2365            details: None,
2366        }),
2367        PySpyProfileResult::OutputReadFailure { error, .. } => Err(ApiError {
2368            code: "internal_error".to_string(),
2369            message: format!("failed to read SVG output: {error}"),
2370            details: None,
2371        }),
2372        PySpyProfileResult::WorkerSpawnFailure { error } => Err(ApiError {
2373            code: "internal_error".to_string(),
2374            message: format!("failed to spawn profile worker actor: {error}"),
2375            details: None,
2376        }),
2377        PySpyProfileResult::SubprocessSpawnFailure { error, .. } => Err(ApiError {
2378            code: "internal_error".to_string(),
2379            message: format!("failed to execute py-spy: {error}"),
2380            details: None,
2381        }),
2382        PySpyProfileResult::WaitFailure { error, .. } => Err(ApiError {
2383            code: "internal_error".to_string(),
2384            message: format!("failed to wait for child: {error}"),
2385            details: None,
2386        }),
2387        PySpyProfileResult::TempDirFailure { error, .. } => Err(ApiError {
2388            code: "internal_error".to_string(),
2389            message: format!("failed to create temp dir: {error}"),
2390            details: None,
2391        }),
2392    }
2393}
2394
// `query_proxy` deserializes this from the incoming JSON body and
// forwards it unchanged, as JSON, to the dashboard's `/api/query`.
// NOTE(review): this type derives `schemars::JsonSchema`, which
// presumably publishes the `///` text below as schema descriptions;
// treat edits to it as wire-contract changes.
/// Request body for `POST /v1/query`.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct QueryRequest {
    /// SQL query string.
    pub sql: String,
}
2401
// `query_proxy` parses the dashboard's successful response body into
// this type and returns it to the caller as JSON.
// NOTE(review): this type derives `schemars::JsonSchema`, which
// presumably publishes the `///` text below as schema descriptions;
// treat edits to it as wire-contract changes.
/// Response body from `POST /v1/query`.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct QueryResponse {
    /// Query result rows.
    // Kept as an untyped JSON value; this module does not inspect it.
    pub rows: serde_json::Value,
}
2408
/// Request body sent to the dashboard's `/api/pyspy_dump` endpoint.
///
/// Built by `pyspy_dump_and_store` after a successful dump.
#[derive(Debug, Serialize)]
struct StorePyspyDumpRequest {
    /// Identifier for this dump: a freshly generated UUID v4 string.
    dump_id: String,
    /// Proc reference exactly as captured from the request path.
    proc_ref: String,
    /// The `PySpyResult` serialized to a JSON string.
    pyspy_result_json: String,
}
2416
// Returned by `pyspy_dump_and_store` once the dashboard has accepted
// the dump.
// NOTE(review): this type derives `schemars::JsonSchema`, which
// presumably publishes the `///` text below as schema descriptions;
// treat edits to it as wire-contract changes.
/// Response body from `POST /v1/pyspy_dump/{*proc_reference}`.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PyspyDumpAndStoreResponse {
    /// Unique identifier for the stored dump.
    pub dump_id: String,
}
2423
2424/// Resolve the telemetry URL from bridge state, returning an
2425/// `ApiError` if not configured.
2426fn require_telemetry_url(state: &BridgeState) -> Result<&str, ApiError> {
2427    state.telemetry_url.as_deref().ok_or_else(|| {
2428        ApiError::not_found("dashboard not configured (no telemetry_url provided)", None)
2429    })
2430}
2431
2432/// Proxy SQL queries to the Monarch dashboard's `/api/query`
2433/// endpoint.
2434///
2435/// Requires `telemetry_url` to be set. The request body must
2436/// contain a `sql` field. The dashboard response rows are returned
2437/// verbatim.
2438async fn query_proxy(
2439    State(state): State<Arc<BridgeState>>,
2440    axum::Json(body): axum::Json<QueryRequest>,
2441) -> Result<axum::Json<QueryResponse>, ApiError> {
2442    let telemetry_url = require_telemetry_url(&state)?;
2443
2444    let resp = state
2445        .http_client
2446        .post(format!("{}/api/query", telemetry_url))
2447        .json(&body)
2448        .send()
2449        .await
2450        .map_err(|e| ApiError {
2451            code: "proxy_error".to_string(),
2452            message: format!("failed to proxy query to dashboard: {}", e),
2453            details: None,
2454        })?;
2455
2456    let status = resp.status();
2457    let resp_body = resp.bytes().await.map_err(|e| ApiError {
2458        code: "proxy_error".to_string(),
2459        message: format!("failed to read dashboard response: {}", e),
2460        details: None,
2461    })?;
2462
2463    if !status.is_success() {
2464        // Try to extract error message from dashboard response.
2465        let msg = serde_json::from_slice::<serde_json::Value>(&resp_body)
2466            .ok()
2467            .and_then(|v| v.get("error")?.as_str().map(String::from))
2468            .unwrap_or_else(|| format!("dashboard returned HTTP {status}"));
2469        let code = if status.is_client_error() {
2470            "bad_request"
2471        } else {
2472            "proxy_error"
2473        };
2474        return Err(ApiError {
2475            code: code.to_string(),
2476            message: msg,
2477            details: None,
2478        });
2479    }
2480
2481    let result: QueryResponse = serde_json::from_slice(&resp_body).map_err(|e| ApiError {
2482        code: "proxy_error".to_string(),
2483        message: format!("failed to parse dashboard response: {}", e),
2484        details: None,
2485    })?;
2486
2487    Ok(axum::Json(result))
2488}
2489
2490/// Trigger a py-spy dump and store the result in the dashboard's
2491/// DataFusion pyspy tables.
2492///
2493/// 1. Performs a py-spy dump via `do_pyspy_dump` (same as
2494///    `pyspy_bridge`).
2495/// 2. POSTs the serialized result to the dashboard's
2496///    `/api/pyspy_dump` endpoint for persistent storage.
2497/// 3. Returns the generated dump id.
2498async fn pyspy_dump_and_store(
2499    State(state): State<Arc<BridgeState>>,
2500    AxumPath(proc_reference): AxumPath<String>,
2501) -> Result<axum::Json<PyspyDumpAndStoreResponse>, ApiError> {
2502    let telemetry_url = require_telemetry_url(&state)?;
2503    let pyspy_result = do_pyspy_dump(&state, &proc_reference).await?;
2504
2505    let dump_id = uuid::Uuid::new_v4().to_string();
2506    let pyspy_json = serde_json::to_string(&pyspy_result).map_err(|e| ApiError {
2507        code: "internal_error".to_string(),
2508        message: format!("failed to serialize PySpyResult: {}", e),
2509        details: None,
2510    })?;
2511
2512    let store_body = StorePyspyDumpRequest {
2513        dump_id: dump_id.clone(),
2514        proc_ref: proc_reference,
2515        pyspy_result_json: pyspy_json,
2516    };
2517
2518    let store_resp = state
2519        .http_client
2520        .post(format!("{}/api/pyspy_dump", telemetry_url))
2521        .json(&store_body)
2522        .send()
2523        .await
2524        .map_err(|e| ApiError {
2525            code: "proxy_error".to_string(),
2526            message: format!("failed to store pyspy dump in dashboard: {}", e),
2527            details: None,
2528        })?;
2529
2530    if !store_resp.status().is_success() {
2531        return Err(ApiError {
2532            code: "proxy_error".to_string(),
2533            message: format!(
2534                "dashboard rejected pyspy dump store: HTTP {}",
2535                store_resp.status()
2536            ),
2537            details: None,
2538        });
2539    }
2540
2541    Ok(axum::Json(PyspyDumpAndStoreResponse { dump_id }))
2542}
2543
2544/// HTTP bridge for config dump requests.
2545///
2546/// Config dump bridge. No preflight probe — the send + bridge
2547/// timeout handles both absent and busy actors correctly (CFG-4).
2548async fn config_bridge(
2549    State(state): State<Arc<BridgeState>>,
2550    AxumPath(proc_reference): AxumPath<String>,
2551) -> Result<Json<ConfigDumpResult>, ApiError> {
2552    let handler = route_proc_handler(&proc_reference)?;
2553    let timeout =
2554        hyperactor_config::global::get(crate::config::MESH_ADMIN_CONFIG_DUMP_BRIDGE_TIMEOUT);
2555    let result = handler.config_dump(&state.bridge_cx, timeout).await?;
2556    Ok(Json(result))
2557}
2558
2559/// Resolve an opaque reference string to a `NodePayload` via the
2560/// actor-based resolver.
2561///
2562/// Implements `GET /v1/{*reference}` for the reference-walking client
2563/// (e.g. the TUI):
2564/// - Decodes the wildcard path segment into the original reference
2565///   string (Axum does not percent-decode `{*reference}` captures).
2566/// - Sends `ResolveReferenceMessage::Resolve` to `MeshAdminAgent` and
2567///   awaits the reply.
2568/// - Maps resolver failures into appropriate `ApiError`s
2569///   (`bad_request`, `not_found`, `gateway_timeout`, or
2570///   `internal_error`).
2571async fn resolve_reference_bridge(
2572    State(state): State<Arc<BridgeState>>,
2573    AxumPath(reference): AxumPath<String>,
2574) -> Result<Json<NodePayloadDto>, ApiError> {
2575    // Axum's wildcard may include a leading slash; strip it.
2576    let reference = reference.trim_start_matches('/');
2577    if reference.is_empty() {
2578        return Err(ApiError::bad_request("empty reference", None));
2579    }
2580    let reference = urlencoding::decode(reference)
2581        .map(|cow| cow.into_owned())
2582        .map_err(|_| {
2583            ApiError::bad_request(
2584                "malformed percent-encoding: decoded bytes are not valid UTF-8",
2585                None,
2586            )
2587        })?;
2588
2589    // Limit concurrent resolves to avoid starving user workloads
2590    // that share this tokio runtime.
2591    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
2592        tracing::warn!("mesh admin: rejecting resolve request (503): too many concurrent requests");
2593        ApiError {
2594            code: "service_unavailable".to_string(),
2595            message: "too many concurrent introspection requests".to_string(),
2596            details: None,
2597        }
2598    })?;
2599
2600    let cx = &state.bridge_cx;
2601    let resolve_start = std::time::Instant::now();
2602    let response = tokio::time::timeout(
2603        hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
2604        state.admin_ref.resolve(cx, reference.clone()),
2605    )
2606    .await
2607    .map_err(|_| {
2608        tracing::warn!(
2609            reference = %reference,
2610            elapsed_ms = resolve_start.elapsed().as_millis() as u64,
2611            "mesh admin: resolve timed out (gateway_timeout)",
2612        );
2613        ApiError {
2614            code: "gateway_timeout".to_string(),
2615            message: "timed out resolving reference".to_string(),
2616            details: None,
2617        }
2618    })?
2619    .map_err(|e| ApiError {
2620        code: "internal_error".to_string(),
2621        message: format!("failed to resolve reference: {}", e),
2622        details: None,
2623    })?;
2624
2625    match response.0 {
2626        Ok(payload) => Ok(Json(NodePayloadDto::from(payload))),
2627        Err(error) => Err(ApiError::not_found(error, None)),
2628    }
2629}
2630
2631// TODO: MESH_ADMIN_TREE_TIMEOUT is applied per-call, not as a total
2632// budget. On a mesh with N hosts and M procs, the worst case is
2633// N*(1+M) sequential calls each up to 10s. This should use a single
2634// deadline for the entire walk.
2635/// `GET /v1/tree` — ASCII topology dump.
2636///
2637/// Walks the reference graph starting from `"root"`, resolving each
2638/// host and its proc children, and formats the result as a
2639/// human-readable ASCII tree suitable for quick `curl` inspection.
2640/// Each line includes a clickable URL for drilling into that node via
2641/// the reference API. Built on top of the same
2642/// `ResolveReferenceMessage` protocol used by the TUI.
2643///
2644/// Output format:
2645/// ```text
2646/// unix:@hash  ->  https://host:port/v1/...  (or http:// in OSS)
2647/// ├── service  ->  https://host:port/v1/...
2648/// │   ├── agent[0]  ->  https://host:port/v1/...
2649/// │   └── client[0]  ->  https://host:port/v1/...
2650/// ├── local  ->  https://host:port/v1/...
2651/// └── philosophers_0  ->  https://host:port/v1/...
2652///     ├── agent[0]  ->  https://host:port/v1/...
2653///     └── philosopher[0]  ->  https://host:port/v1/...
2654/// ```
2655async fn tree_dump(
2656    State(state): State<Arc<BridgeState>>,
2657    headers: axum::http::header::HeaderMap,
2658) -> Result<String, ApiError> {
2659    // Limit concurrent resolves to avoid starving user workloads.
2660    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
2661        tracing::warn!(
2662            "mesh admin: rejecting tree_dump request (503): too many concurrent requests"
2663        );
2664        ApiError {
2665            code: "service_unavailable".to_string(),
2666            message: "too many concurrent introspection requests".to_string(),
2667            details: None,
2668        }
2669    })?;
2670
2671    let cx = &state.bridge_cx;
2672
2673    // Build base URL from the Host header for clickable links.
2674    let host = headers
2675        .get("host")
2676        .and_then(|v| v.to_str().ok())
2677        .unwrap_or("localhost");
2678    let scheme = headers
2679        .get("x-forwarded-proto")
2680        .and_then(|v| v.to_str().ok())
2681        .unwrap_or("http");
2682    let base_url = format!("{}://{}", scheme, host);
2683
2684    // Resolve root.
2685    let root_resp = tokio::time::timeout(
2686        hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
2687        state.admin_ref.resolve(cx, "root".to_string()),
2688    )
2689    .await
2690    .map_err(|_| ApiError {
2691        code: "gateway_timeout".to_string(),
2692        message: "timed out resolving root".to_string(),
2693        details: None,
2694    })?
2695    .map_err(|e| ApiError {
2696        code: "internal_error".to_string(),
2697        message: format!("failed to resolve root: {}", e),
2698        details: None,
2699    })?;
2700
2701    let root = root_resp.0.map_err(|e| ApiError {
2702        code: "internal_error".to_string(),
2703        message: e,
2704        details: None,
2705    })?;
2706
2707    let mut output = String::new();
2708
2709    // Resolve each root child. Hosts get the full host→proc→actor
2710    // subtree; non-host children (e.g. the root client actor) are
2711    // rendered as single leaf lines.
2712    for child_ref in &root.children {
2713        let child_ref_str = child_ref.to_string();
2714        let resp = tokio::time::timeout(
2715            hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
2716            state.admin_ref.resolve(cx, child_ref_str.clone()),
2717        )
2718        .await;
2719
2720        let payload = match resp {
2721            Ok(Ok(r)) => r.0.ok(),
2722            _ => None,
2723        };
2724
2725        match payload {
2726            Some(node) if matches!(node.properties, NodeProperties::Host { .. }) => {
2727                let header = match &node.properties {
2728                    NodeProperties::Host { addr, .. } => addr.clone(),
2729                    _ => child_ref_str.clone(),
2730                };
2731                let host_url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
2732                output.push_str(&format!("{}  ->  {}\n", header, host_url));
2733
2734                let num_procs = node.children.len();
2735                for (i, proc_ref) in node.children.iter().enumerate() {
2736                    let proc_ref_str = proc_ref.to_string();
2737                    let is_last_proc = i == num_procs - 1;
2738                    let proc_connector = if is_last_proc {
2739                        "└── "
2740                    } else {
2741                        "├── "
2742                    };
2743                    let proc_name = derive_tree_label(proc_ref);
2744                    let proc_url =
2745                        format!("{}/v1/{}", base_url, urlencoding::encode(&proc_ref_str));
2746                    output.push_str(&format!(
2747                        "{}{}  ->  {}\n",
2748                        proc_connector, proc_name, proc_url
2749                    ));
2750
2751                    let proc_resp = tokio::time::timeout(
2752                        hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
2753                        state.admin_ref.resolve(cx, proc_ref_str),
2754                    )
2755                    .await;
2756                    let proc_payload = match proc_resp {
2757                        Ok(Ok(r)) => r.0.ok(),
2758                        _ => None,
2759                    };
2760                    if let Some(proc_node) = proc_payload {
2761                        let num_actors = proc_node.children.len();
2762                        let child_prefix = if is_last_proc { "    " } else { "│   " };
2763                        for (j, actor_ref) in proc_node.children.iter().enumerate() {
2764                            let actor_ref_str = actor_ref.to_string();
2765                            let actor_connector = if j == num_actors - 1 {
2766                                "└── "
2767                            } else {
2768                                "├── "
2769                            };
2770                            let actor_label = derive_actor_label(actor_ref);
2771                            let actor_url =
2772                                format!("{}/v1/{}", base_url, urlencoding::encode(&actor_ref_str));
2773                            output.push_str(&format!(
2774                                "{}{}{}  ->  {}\n",
2775                                child_prefix, actor_connector, actor_label, actor_url
2776                            ));
2777                        }
2778                    }
2779                }
2780                output.push('\n');
2781            }
2782            Some(node) if matches!(node.properties, NodeProperties::Proc { .. }) => {
2783                let proc_name = match &node.properties {
2784                    NodeProperties::Proc { proc_name, .. } => proc_name.clone(),
2785                    _ => child_ref_str.clone(),
2786                };
2787                let proc_url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
2788                output.push_str(&format!("{}  ->  {}\n", proc_name, proc_url));
2789
2790                let num_actors = node.children.len();
2791                for (j, actor_ref) in node.children.iter().enumerate() {
2792                    let actor_ref_str = actor_ref.to_string();
2793                    let actor_connector = if j == num_actors - 1 {
2794                        "└── "
2795                    } else {
2796                        "├── "
2797                    };
2798                    let actor_label = derive_actor_label(actor_ref);
2799                    let actor_url =
2800                        format!("{}/v1/{}", base_url, urlencoding::encode(&actor_ref_str));
2801                    output.push_str(&format!(
2802                        "{}{}  ->  {}\n",
2803                        actor_connector, actor_label, actor_url
2804                    ));
2805                }
2806                output.push('\n');
2807            }
2808            Some(_node) => {
2809                let label = derive_actor_label(child_ref);
2810                let url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
2811                output.push_str(&format!("{}  ->  {}\n\n", label, url));
2812            }
2813            _ => {
2814                output.push_str(&format!("{} (unreachable)\n\n", child_ref));
2815            }
2816        }
2817    }
2818    Ok(output)
2819}
2820
2821/// Derive a short display label from a reference string for the ASCII
2822/// tree.
2823///
2824/// Extracts the proc name — the meaningful identifier for tree
2825/// display — from the various reference formats emitted by
2826/// `HostAgent`'s children list:
2827///
2828/// - System proc ref `"[system] unix:@hash,service"` → `"service"`
2829/// - ProcAgent ActorAddr `"unix:@hash,my_proc,agent[0]"` →
2830///   `"my_proc"`
2831/// - Bare ProcAddr `"unix:@hash,my_proc"` → `"my_proc"`
2832///
2833/// Note: `ActorAddr::Display` for `ProcAddr` uses commas as
2834/// separators (`proc_id,actor_name[idx]`), not slashes.
2835fn derive_tree_label(node_ref: &crate::introspect::NodeRef) -> String {
2836    match node_ref {
2837        crate::introspect::NodeRef::Root => "root".to_string(),
2838        crate::introspect::NodeRef::Host(id) => id.proc_addr().id().to_string(),
2839        crate::introspect::NodeRef::Proc(id) => id.id().to_string(),
2840        crate::introspect::NodeRef::Actor(id) => {
2841            format!("{}[{}]", id.log_name(), id.uid())
2842        }
2843    }
2844}
2845
2846fn derive_actor_label(node_ref: &crate::introspect::NodeRef) -> String {
2847    match node_ref {
2848        crate::introspect::NodeRef::Root => "root".to_string(),
2849        crate::introspect::NodeRef::Host(id) => id.log_name().to_string(),
2850        crate::introspect::NodeRef::Proc(id) => id.id().to_string(),
2851        crate::introspect::NodeRef::Actor(id) => {
2852            format!("{}[{}]", id.log_name(), id.uid())
2853        }
2854    }
2855}
2856
2857// -- Admin handle type discrimination --
2858
/// A handle scheme that requires a publication-based lookup to resolve
/// to a concrete admin URL.
///
/// Only `Mast` is defined today. The nested-enum shape allows future
/// scheduler-specific variants (Slurm, K8s, etc.) to be added without
/// changing `AdminHandle`.
#[non_exhaustive]
pub enum PublishedHandle {
    /// `mast_conda:///<job-name>` — requires publication-based discovery.
    ///
    /// `AdminHandle::parse` stores the full original address string
    /// here, including the `mast_conda:///` prefix.
    Mast(String),
}
2870
2871impl PublishedHandle {
2872    /// Resolve a published handle to a concrete admin URL.
2873    ///
2874    /// All published-handle schemes return an explicit error today.
2875    /// When real publication lookup is implemented, dispatch by variant here.
2876    pub async fn resolve(self, _port_override: Option<u16>) -> anyhow::Result<String> {
2877        anyhow::bail!(
2878            "publication-based admin handle resolution is not yet implemented: \
2879             mesh admin placement has moved to the caller's local proc. \
2880             Discover the admin URL from startup output or another \
2881             launch-time publication instead."
2882        )
2883    }
2884}
2885
/// A handle for locating the mesh admin server.
///
/// Parse a user-supplied address string with [`AdminHandle::parse`]
/// and resolve it to a concrete URL with [`AdminHandle::resolve`].
/// Parsing never fails: input that cannot be classified is kept as
/// `Unsupported` and only rejected at resolve time.
#[non_exhaustive]
pub enum AdminHandle {
    /// Already-resolved URL (e.g. `https://host:1729`).
    Url(String),
    /// Handle that requires a publication lookup. Currently unresolvable.
    Published(PublishedHandle),
    /// Scheme or format that is not recognized. Holds the original
    /// input so the resolve error can quote it.
    Unsupported(String),
}
2899
2900impl AdminHandle {
2901    /// Parse an address string into an `AdminHandle`.
2902    ///
2903    /// Uses `url` crate parsing. Known publication-handle prefixes
2904    /// (`mast_conda:///`) are classified as `Published`. `http`/`https`
2905    /// scheme URLs are `Url`. Bare `host:port` inputs (no scheme) are
2906    /// inferred as `https://host:port` and classified as `Url` — this
2907    /// preserves existing TUI behavior where `--addr myhost:1729` is a
2908    /// valid input. Everything else is `Unsupported`.
2909    pub fn parse(addr: &str) -> Self {
2910        // Check known publication handle prefixes first.
2911        if addr.starts_with("mast_conda:///") {
2912            return AdminHandle::Published(PublishedHandle::Mast(addr.to_string()));
2913        }
2914        // Strict URL parse — only http/https accepted.
2915        if let Ok(parsed) = url::Url::parse(addr)
2916            && matches!(parsed.scheme(), "http" | "https")
2917        {
2918            return AdminHandle::Url(addr.to_string());
2919        }
2920        // Infer https:// for bare host:port inputs (e.g. "myhost:1729").
2921        // This preserves the TUI's documented --addr behavior.
2922        let with_scheme = format!("https://{}", addr);
2923        if let Ok(parsed) = url::Url::parse(&with_scheme)
2924            && parsed.host_str().is_some()
2925            && parsed.port().is_some()
2926        {
2927            return AdminHandle::Url(with_scheme);
2928        }
2929        AdminHandle::Unsupported(addr.to_string())
2930    }
2931
2932    /// Resolve to a concrete admin base URL.
2933    ///
2934    /// `port_override` is retained to preserve existing call surfaces
2935    /// but is intentionally unused until real publication lookup is
2936    /// implemented.
2937    pub async fn resolve(self, port_override: Option<u16>) -> anyhow::Result<String> {
2938        match self {
2939            AdminHandle::Url(url) => Ok(url),
2940            AdminHandle::Published(h) => h.resolve(port_override).await,
2941            AdminHandle::Unsupported(s) => anyhow::bail!(
2942                "unrecognized admin handle '{}': expected https://host:port or mast_conda:///job",
2943                s
2944            ),
2945        }
2946    }
2947}
2948
2949/// Resolve a `mast_conda:///<job-name>` handle into an admin base URL.
2950///
2951/// **Disabled.** Mesh admin placement has moved to the caller's local
2952/// proc. Delegates to [`AdminHandle::Published`] + [`PublishedHandle::resolve`].
2953/// Kept as a stable API shim; do not remove.
2954pub async fn resolve_mast_handle(
2955    handle: &str,
2956    port_override: Option<u16>,
2957) -> anyhow::Result<String> {
2958    AdminHandle::Published(PublishedHandle::Mast(handle.to_string()))
2959        .resolve(port_override)
2960        .await
2961}
2962
2963/// Cert-aware advertised host selection for wildcard binds.
2964///
2965/// The server advertises one URL. For wildcard binds, this module
2966/// generates candidate hosts from environment sources and picks
2967/// the first candidate covered by the loaded server cert's SAN
2968/// set. This ensures the advertised URL is always consistent with
2969/// the certificate the server presents.
2970mod advertised_host {
2971    use std::net::IpAddr;
2972
2973    /// An identity that can appear as a cert SAN entry.
2974    #[derive(Debug, PartialEq, Eq)]
2975    pub(super) enum SanIdentity {
2976        Ip(IpAddr),
2977        Dns(String),
2978    }
2979
2980    /// Choose the advertised host for a wildcard-bind admin URL.
2981    ///
2982    /// Candidates (in preference order):
2983    /// 1. `hostname::get()` (preferred — human-readable, no
2984    ///    brackets in URLs)
2985    /// 2. `host_ipv6_address()` (Meta: TW metadata → fbwhoami →
2986    ///    local_ipv6)
2987    ///
2988    /// The first candidate whose identity is covered by the loaded
2989    /// server cert's SANs wins. If no cert is available or no
2990    /// candidate matches, falls back to hostname.
2991    pub(super) fn from_cert_sans() -> String {
2992        let hostname = hostname::get()
2993            .unwrap_or_else(|_| "localhost".into())
2994            .into_string()
2995            .unwrap_or_else(|_| "localhost".to_string());
2996
2997        // (url_display_form, identity_to_match_against_cert)
2998        // Prefer DNS names over IPs — more readable, no brackets
2999        // in URLs, more stable across container restarts.
3000        let mut candidates: Vec<(String, SanIdentity)> = Vec::new();
3001
3002        // Candidate 1: hostname (preferred if cert covers it).
3003        candidates.push((hostname.clone(), SanIdentity::Dns(hostname.clone())));
3004
3005        // Candidate 2: host IPv6 address (Meta environments).
3006        #[cfg(fbcode_build)]
3007        if let Ok(ip_str) = hyperactor::meta::host_ip::host_ipv6_address()
3008            && let Ok(ip) = ip_str.parse::<IpAddr>()
3009        {
3010            candidates.push((format!("[{}]", ip), SanIdentity::Ip(ip)));
3011        }
3012
3013        let cert_sans = load_cert_sans();
3014        let chosen = pick_candidate(&candidates, &cert_sans, &hostname);
3015
3016        if chosen != hostname && !cert_sans.is_empty() {
3017            tracing::info!("admin URL host '{}' matches cert SAN", chosen);
3018        } else if !cert_sans.is_empty() && !candidates.iter().any(|(_, id)| cert_sans.contains(id))
3019        {
3020            tracing::warn!(
3021                "no admin URL candidate matched cert SANs; falling back to hostname '{}'",
3022                hostname,
3023            );
3024        }
3025
3026        chosen
3027    }
3028
3029    /// Extract SAN entries from the server cert PEM bundle.
3030    ///
3031    /// Loads the same cert bundle that `try_tls_acceptor` uses,
3032    /// parses the leaf cert with `x509_parser`, and returns SAN
3033    /// DNS names and IP addresses. Returns empty if no cert is
3034    /// available or parsing fails.
3035    fn load_cert_sans() -> Vec<SanIdentity> {
3036        use std::io::BufReader;
3037
3038        use x509_parser::prelude::*;
3039
3040        let bundle = match hyperactor::channel::try_tls_pem_bundle() {
3041            Some(b) => b,
3042            None => return Vec::new(),
3043        };
3044
3045        let cert_pem = match bundle.cert.reader() {
3046            Ok(r) => {
3047                let mut buf = Vec::new();
3048                if std::io::Read::read_to_end(&mut BufReader::new(r), &mut buf).is_err() {
3049                    return Vec::new();
3050                }
3051                buf
3052            }
3053            Err(_) => return Vec::new(),
3054        };
3055
3056        let mut cursor = &cert_pem[..];
3057        let certs: Vec<_> = rustls_pemfile::certs(&mut cursor)
3058            .filter_map(|r| r.ok())
3059            .collect();
3060
3061        let leaf_der = match certs.first() {
3062            Some(c) => c,
3063            None => return Vec::new(),
3064        };
3065
3066        let (_, cert) = match X509Certificate::from_der(leaf_der.as_ref()) {
3067            Ok(parsed) => parsed,
3068            Err(e) => {
3069                tracing::warn!("failed to parse leaf cert for SAN extraction: {}", e);
3070                return Vec::new();
3071            }
3072        };
3073
3074        let mut sans = Vec::new();
3075        if let Ok(Some(san_ext)) = cert.subject_alternative_name() {
3076            for name in &san_ext.value.general_names {
3077                match name {
3078                    GeneralName::DNSName(dns) => {
3079                        sans.push(SanIdentity::Dns(dns.to_string()));
3080                    }
3081                    GeneralName::IPAddress(bytes) => {
3082                        let ip = match bytes.len() {
3083                            4 => IpAddr::from(<[u8; 4]>::try_from(*bytes).unwrap()),
3084                            16 => IpAddr::from(<[u8; 16]>::try_from(*bytes).unwrap()),
3085                            _ => continue,
3086                        };
3087                        sans.push(SanIdentity::Ip(ip));
3088                    }
3089                    _ => {}
3090                }
3091            }
3092        }
3093
3094        sans
3095    }
3096
3097    /// Pick the first candidate covered by the given SAN set.
3098    /// Extracted from `from_cert_sans` for direct unit testing.
3099    fn pick_candidate(
3100        candidates: &[(String, SanIdentity)],
3101        cert_sans: &[SanIdentity],
3102        fallback: &str,
3103    ) -> String {
3104        if cert_sans.is_empty() {
3105            return fallback.to_string();
3106        }
3107        for (url_host, identity) in candidates {
3108            if cert_sans.iter().any(|san| san == identity) {
3109                return url_host.clone();
3110            }
3111        }
3112        fallback.to_string()
3113    }
3114
3115    #[cfg(test)]
3116    mod tests {
3117        use std::net::IpAddr;
3118        use std::net::Ipv4Addr;
3119        use std::net::Ipv6Addr;
3120
3121        use super::*;
3122
3123        #[test]
3124        fn cert_covers_hostname_only_picks_hostname() {
3125            let candidates = vec![
3126                ("myhost".to_string(), SanIdentity::Dns("myhost".to_string())),
3127                (
3128                    "[::1]".to_string(),
3129                    SanIdentity::Ip(IpAddr::V6(Ipv6Addr::LOCALHOST)),
3130                ),
3131            ];
3132            let sans = vec![SanIdentity::Dns("myhost".to_string())];
3133            assert_eq!(pick_candidate(&candidates, &sans, "fallback"), "myhost");
3134        }
3135
3136        #[test]
3137        fn cert_covers_ip_only_picks_ip() {
3138            let ip = IpAddr::V6("2803:6084:3894:2b36:b5d3:11ef:400:0".parse().unwrap());
3139            let candidates = vec![
3140                ("myhost".to_string(), SanIdentity::Dns("myhost".to_string())),
3141                (format!("[{}]", ip), SanIdentity::Ip(ip)),
3142            ];
3143            let sans = vec![SanIdentity::Ip(ip)];
3144            assert_eq!(
3145                pick_candidate(&candidates, &sans, "fallback"),
3146                format!("[{}]", ip)
3147            );
3148        }
3149
3150        #[test]
3151        fn cert_covers_both_prefers_hostname() {
3152            let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
3153            let candidates = vec![
3154                ("myhost".to_string(), SanIdentity::Dns("myhost".to_string())),
3155                ("10.0.0.1".to_string(), SanIdentity::Ip(ip)),
3156            ];
3157            let sans = vec![SanIdentity::Dns("myhost".to_string()), SanIdentity::Ip(ip)];
3158            assert_eq!(pick_candidate(&candidates, &sans, "fallback"), "myhost");
3159        }
3160
3161        #[test]
3162        fn no_sans_returns_fallback() {
3163            let candidates = vec![("myhost".to_string(), SanIdentity::Dns("myhost".to_string()))];
3164            assert_eq!(pick_candidate(&candidates, &[], "fallback"), "fallback");
3165        }
3166
3167        #[test]
3168        fn no_candidate_matches_returns_fallback() {
3169            let candidates = vec![("myhost".to_string(), SanIdentity::Dns("myhost".to_string()))];
3170            let sans = vec![SanIdentity::Dns("otherhost".to_string())];
3171            assert_eq!(pick_candidate(&candidates, &sans, "fallback"), "fallback");
3172        }
3173    }
3174}
3175
3176#[cfg(test)]
3177mod tests {
3178    use std::net::SocketAddr;
3179
3180    use hyperactor::channel::ChannelAddr;
3181    use hyperactor::id::Label;
3182    use hyperactor::testing::ids::test_proc_id_with_addr;
3183
3184    use super::*;
3185    use crate::mesh_id::ResourceId;
3186
3187    // Integration tests that spawn MeshAdminAgent must pass
3188    // `Some("[::]:0".parse().unwrap())` as the admin_addr to get an
3189    // ephemeral port. The default (`None`) reads MESH_ADMIN_ADDR
3190    // config which is `[::]:1729` — a fixed port that causes bind
3191    // conflicts when tests run concurrently.
3192
3193    /// Minimal introspectable actor for tests. The `#[export]`
3194    /// attribute generates `Named + Referable + Binds` so that
3195    /// `handle.bind()` registers the `IntrospectMessage` port for
3196    /// remote delivery.
3197    #[derive(Debug)]
3198    #[hyperactor::export(handlers = [])]
3199    struct TestIntrospectableActor;
3200    impl Actor for TestIntrospectableActor {}
3201
3202    // Verifies that MeshAdminAgent::build_root_payload constructs the
3203    // expected root node: identity/root metadata, correct Root
3204    // properties (num_hosts), and child links populated with the
3205    // stringified IDs of the configured host mesh-agent ActorRefs.
3206    #[test]
3207    fn test_build_root_payload() {
3208        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
3209        let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();
3210
3211        let proc1 = test_proc_id_with_addr(ChannelAddr::Tcp(addr1), "host1");
3212        let proc2 = test_proc_id_with_addr(ChannelAddr::Tcp(addr2), "host2");
3213
3214        let actor_id1 = proc1.actor_addr("mesh_agent");
3215        let actor_id2 = proc2.actor_addr("mesh_agent");
3216
3217        let ref1: ActorRef<HostAgent> = ActorRef::attest(actor_id1.clone());
3218        let ref2: ActorRef<HostAgent> = ActorRef::attest(actor_id2.clone());
3219
3220        let agent = MeshAdminAgent::new(
3221            vec![("host_a".to_string(), ref1), ("host_b".to_string(), ref2)],
3222            None,
3223            None,
3224            None,
3225        );
3226
3227        let payload = agent.build_root_payload();
3228        assert_eq!(payload.identity, crate::introspect::NodeRef::Root);
3229        assert_eq!(payload.parent, None);
3230        assert!(matches!(
3231            payload.properties,
3232            NodeProperties::Root { num_hosts: 2, .. }
3233        ));
3234        assert_eq!(payload.children.len(), 2);
3235        assert!(
3236            payload
3237                .children
3238                .contains(&crate::introspect::NodeRef::Host(actor_id1.clone()))
3239        );
3240        assert!(
3241            payload
3242                .children
3243                .contains(&crate::introspect::NodeRef::Host(actor_id2.clone()))
3244        );
3245
3246        // Verify root properties derived from attrs.
3247        match &payload.properties {
3248            NodeProperties::Root {
3249                num_hosts,
3250                started_by,
3251                system_children,
3252                ..
3253            } => {
3254                assert_eq!(*num_hosts, 2);
3255                assert!(!started_by.is_empty());
3256                // LC-1: root system_children is always empty.
3257                assert!(
3258                    system_children.is_empty(),
3259                    "LC-1: root system_children must be empty"
3260                );
3261            }
3262            other => panic!("expected Root, got {:?}", other),
3263        }
3264    }
3265
3266    // End-to-end smoke test for MeshAdminAgent::resolve that walks
3267    // the reference tree: root → host → system proc → host-agent
3268    // cross-reference. Verifies the reverse index routes the
3269    // HostAgent ActorAddr to NodeProperties::Host (not Actor),
3270    // preventing the TUI's cycle detection from dropping that node.
3271    #[tokio::test]
3272    async fn test_resolve_reference_tree_walk() {
3273        use hyperactor::Proc;
3274        use hyperactor::channel::ChannelTransport;
3275
3276        use crate::host::Host;
3277        use crate::host::LocalProcManager;
3278        use crate::host_mesh::host_agent::HostAgentMode;
3279        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3280        use crate::proc_agent::ProcAgent;
3281
3282        // -- 1. Stand up a local in-process Host with a HostAgent --
3283        // Use Unix transport for all procs — Local transport does not
3284        // support cross-proc message routing.
3285        let spawn: ProcManagerSpawnFn =
3286            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3287        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3288        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3289            Host::new(manager, ChannelTransport::Unix.any())
3290                .await
3291                .unwrap();
3292        let host_addr = host.addr().clone();
3293        let system_proc = host.system_proc().clone();
3294        let host_agent_handle = system_proc
3295            .spawn(
3296                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3297                HostAgent::new(HostAgentMode::Local(host)),
3298            )
3299            .unwrap();
3300        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
3301        let host_addr_str = host_addr.to_string();
3302
3303        // -- 2. Spawn MeshAdminAgent on a dedicated test proc --
3304        // NOTE: This does not conform to SA-5 (caller-local placement).
3305        // Production uses host_mesh::spawn_admin(). This is a white-box
3306        // test of admin behavior, not placement.
3307        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3308        // The admin proc has no supervision coordinator by default.
3309        // Without one, actor teardown triggers std::process::exit(1).
3310        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3311        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3312        let admin_handle = admin_proc
3313            .spawn(
3314                MESH_ADMIN_ACTOR_NAME,
3315                MeshAdminAgent::new(
3316                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
3317                    None,
3318                    Some("[::]:0".parse().unwrap()),
3319                    None,
3320                ),
3321            )
3322            .unwrap();
3323        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();
3324
3325        // -- 3. Create a bare client instance for sending messages --
3326        // Only a mailbox is needed for reply ports — no actor message
3327        // loop required.
3328        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3329        let (client, _handle) = client_proc.client("client").unwrap();
3330
3331        // -- 4. Resolve "root" --
3332        let root_resp = admin_ref
3333            .resolve(&client, "root".to_string())
3334            .await
3335            .unwrap();
3336        let root = root_resp.0.unwrap();
3337        assert_eq!(root.identity, crate::introspect::NodeRef::Root);
3338        assert!(matches!(
3339            root.properties,
3340            NodeProperties::Root { num_hosts: 1, .. }
3341        ));
3342        assert_eq!(root.parent, None);
3343        assert_eq!(root.children.len(), 1); // host only (admin proc no longer standalone)
3344
3345        // -- 5. Resolve the host child --
3346        let expected_host_ref =
3347            crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone());
3348        let host_child_ref = root
3349            .children
3350            .iter()
3351            .find(|c| **c == expected_host_ref)
3352            .expect("root children should contain the host agent (as Host ref)");
3353        let host_ref_string = host_child_ref.to_string();
3354        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
3355        let host_node = host_resp.0.unwrap();
3356        assert_eq!(host_node.identity, expected_host_ref);
3357        assert!(
3358            matches!(host_node.properties, NodeProperties::Host { .. }),
3359            "expected Host properties, got {:?}",
3360            host_node.properties
3361        );
3362        assert_eq!(host_node.parent, Some(crate::introspect::NodeRef::Root));
3363        assert!(
3364            !host_node.children.is_empty(),
3365            "host should have at least one proc child"
3366        );
3367        // LC-2: host system_children is always empty.
3368        match &host_node.properties {
3369            NodeProperties::Host {
3370                system_children, ..
3371            } => {
3372                assert!(
3373                    system_children.is_empty(),
3374                    "LC-2: host system_children must be empty"
3375                );
3376            }
3377            other => panic!("expected Host, got {:?}", other),
3378        }
3379
3380        // -- 6. Resolve a system proc child --
3381        let proc_ref = &host_node.children[0];
3382        let proc_ref_str = proc_ref.to_string();
3383        let proc_resp = admin_ref.resolve(&client, proc_ref_str).await.unwrap();
3384        let proc_node = proc_resp.0.unwrap();
3385        assert!(
3386            matches!(proc_node.properties, NodeProperties::Proc { .. }),
3387            "expected Proc properties, got {:?}",
3388            proc_node.properties
3389        );
3390        assert_eq!(proc_node.parent, Some(expected_host_ref.clone()));
3391        // The system proc should have at least the "host_agent" actor.
3392        assert!(
3393            !proc_node.children.is_empty(),
3394            "proc should have at least one actor child"
3395        );
3396
3397        // -- 7. Cross-reference: system proc child is the host agent --
3398        //
3399        // The service proc's actor (agent[0]) IS the HostAgent, so
3400        // it appears both as a host node (from root, via NodeRef::Host)
3401        // and as an actor (from a proc's children list, via NodeRef::Actor).
3402        // NodeRef::Host in root children makes resolution unambiguous:
3403        // host refs get Entity view, plain actor refs get Actor view.
3404
3405        // The system proc must list the host agent among its children.
3406        let host_agent_node_ref =
3407            crate::introspect::NodeRef::Actor(host_agent_ref.actor_addr().clone());
3408        assert!(
3409            proc_node.children.contains(&host_agent_node_ref),
3410            "system proc children {:?} should contain the host agent {:?}",
3411            proc_node.children,
3412            host_agent_node_ref
3413        );
3414
3415        // Resolve that child reference as a plain actor (no host: prefix).
3416        let xref_resp = admin_ref
3417            .resolve(&client, host_agent_ref.actor_addr().to_string())
3418            .await
3419            .unwrap();
3420        let xref_node = xref_resp.0.unwrap();
3421
3422        // When resolved as a plain actor reference, it must return
3423        // Actor properties (not Host), because it has no host: prefix.
3424        assert!(
3425            matches!(xref_node.properties, NodeProperties::Actor { .. }),
3426            "host agent child resolved as plain actor should be Actor, got {:?}",
3427            xref_node.properties
3428        );
3429    }
3430
3431    // Verifies MeshAdminAgent::resolve returns NodeProperties::Proc
3432    // for all proc children. Spawns a user proc via
3433    // CreateOrUpdate<ProcSpec>, resolves all host proc-children, and
3434    // asserts every proc returns Proc properties.
3435    #[tokio::test]
3436    async fn test_proc_properties_for_all_procs() {
3437        use std::time::Duration;
3438
3439        use hyperactor::Proc;
3440        use hyperactor::channel::ChannelTransport;
3441        use hyperactor::id::Label;
3442
3443        use crate::host::Host;
3444        use crate::host::LocalProcManager;
3445        use crate::host_mesh::host_agent::HostAgentMode;
3446        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3447        use crate::proc_agent::ProcAgent;
3448        use crate::resource;
3449        use crate::resource::ProcSpec;
3450        use crate::resource::Rank;
3451
3452        // Stand up a local in-process Host with a HostAgent.
3453        let spawn: ProcManagerSpawnFn =
3454            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3455        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3456        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3457            Host::new(manager, ChannelTransport::Unix.any())
3458                .await
3459                .unwrap();
3460        let host_addr = host.addr().clone();
3461        let system_proc_id: ProcAddr = host.system_proc().proc_addr().clone();
3462        let local_proc_id: ProcAddr = host.local_proc().proc_addr().clone();
3463        let system_proc = host.system_proc().clone();
3464        let host_agent_handle = system_proc
3465            .spawn(
3466                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3467                HostAgent::new(HostAgentMode::Local(host)),
3468            )
3469            .unwrap();
3470        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
3471        let host_addr_str = host_addr.to_string();
3472
3473        // Spawn MeshAdminAgent on a dedicated test proc.
3474        // NOTE: Does not conform to SA-5 (caller-local placement).
3475        // Production uses host_mesh::spawn_admin(). White-box test setup.
3476        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3477        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3478        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3479        let admin_handle = admin_proc
3480            .spawn(
3481                MESH_ADMIN_ACTOR_NAME,
3482                MeshAdminAgent::new(
3483                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
3484                    None,
3485                    Some("[::]:0".parse().unwrap()),
3486                    None,
3487                ),
3488            )
3489            .unwrap();
3490        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();
3491
3492        // Create a bare client instance for sending messages.
3493        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3494        let (client, _handle) = client_proc.client("client").unwrap();
3495
3496        // Spawn a user proc via CreateOrUpdate<ProcSpec>.
3497        let user_proc_name = ResourceId::instance(Label::new("user-proc").unwrap());
3498        host_agent_ref.post(
3499            &client,
3500            resource::CreateOrUpdate {
3501                id: user_proc_name.clone(),
3502                rank: Rank::new(0),
3503                spec: ProcSpec::default(),
3504            },
3505        );
3506
3507        // Wait for the user proc to boot.
3508        tokio::time::sleep(Duration::from_secs(2)).await;
3509
3510        // Resolve the host to get its children (system + user procs).
3511        let host_ref_string =
3512            crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone()).to_string();
3513        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
3514        let host_node = host_resp.0.unwrap();
3515
3516        // The host should have at least 3 children: system proc,
3517        // local proc, and our user proc.
3518        assert!(
3519            host_node.children.len() >= 3,
3520            "expected at least 3 proc children (2 system + 1 user), got {}",
3521            host_node.children.len()
3522        );
3523
3524        // Resolve each proc child and verify it has Proc properties.
3525        let mut found_system = false;
3526        let mut found_user = false;
3527        for child_ref in &host_node.children {
3528            let resp = admin_ref
3529                .resolve(&client, child_ref.to_string())
3530                .await
3531                .unwrap();
3532            let node = resp.0.unwrap();
3533            if let NodeProperties::Proc { .. } = &node.properties {
3534                if matches!(
3535                    child_ref,
3536                    crate::introspect::NodeRef::Proc(proc_id)
3537                        if proc_id != &system_proc_id && proc_id != &local_proc_id
3538                ) {
3539                    found_user = true;
3540                } else {
3541                    found_system = true;
3542                }
3543                // Properties derived from attrs — verified by derive_properties tests.
3544            } else {
3545                // Host agent cross-reference — skip.
3546            }
3547        }
3548        assert!(
3549            found_system,
3550            "should have resolved at least one system proc"
3551        );
3552        assert!(found_user, "should have resolved the user proc");
3553    }
3554
3555    // Verifies that build_root_payload lists only the host as a
3556    // child. The root client is visible under its host's local proc,
3557    // not at root level.
3558    #[test]
3559    fn test_build_root_payload_with_root_client() {
3560        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
3561        let proc1 = ResourceId::proc_addr_from_name(ChannelAddr::Tcp(addr1), "host1");
3562        let actor_id1 = hyperactor::ActorAddr::root(proc1, Label::new("mesh_agent").unwrap());
3563        let ref1: ActorRef<HostAgent> = ActorRef::attest(actor_id1.clone());
3564
3565        let client_proc_id = ResourceId::proc_addr_from_name(ChannelAddr::Tcp(addr1), "local");
3566        let client_actor_id = client_proc_id.actor_addr("client");
3567
3568        let agent = MeshAdminAgent::new(
3569            vec![("host_a".to_string(), ref1)],
3570            Some(client_actor_id.clone()),
3571            None,
3572            None,
3573        );
3574
3575        let payload = agent.build_root_payload();
3576        assert!(matches!(
3577            payload.properties,
3578            NodeProperties::Root { num_hosts: 1, .. }
3579        ));
3580        // Only the host; root client is under host → local proc.
3581        assert_eq!(payload.children.len(), 1);
3582        assert!(
3583            payload
3584                .children
3585                .contains(&crate::introspect::NodeRef::Host(actor_id1.clone()))
3586        );
3587    }
3588
3589    // Verifies that the root client actor is visible through the
3590    // host → local proc → actor path, not as a standalone child of
3591    // root.
3592    #[tokio::test]
3593    async fn test_resolve_root_client_actor() {
3594        use hyperactor::channel::ChannelTransport;
3595
3596        use crate::host::Host;
3597        use crate::host::LocalProcManager;
3598        use crate::host_mesh::host_agent::HostAgentMode;
3599        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3600        use crate::proc_agent::ProcAgent;
3601
3602        // Stand up a local in-process Host with a HostAgent.
3603        let spawn: ProcManagerSpawnFn =
3604            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3605        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3606        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3607            Host::new(manager, ChannelTransport::Unix.any())
3608                .await
3609                .unwrap();
3610        let host_addr = host.addr().clone();
3611        let system_proc = host.system_proc().clone();
3612
3613        // Spawn the root client on the host's local proc (before
3614        // moving the host into HostAgentMode).
3615        let local_proc = host.local_proc();
3616        let local_proc_id = local_proc.proc_addr().clone();
3617        let root_client_handle = local_proc.spawn("client", TestIntrospectableActor).unwrap();
3618        let root_client_ref: ActorRef<TestIntrospectableActor> = root_client_handle.bind();
3619        let root_client_actor_id = root_client_ref.actor_addr().clone();
3620
3621        let host_agent_handle = system_proc
3622            .spawn(
3623                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3624                HostAgent::new(HostAgentMode::Local(host)),
3625            )
3626            .unwrap();
3627        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
3628        let host_addr_str = host_addr.to_string();
3629
3630        // Spawn MeshAdminAgent on a dedicated test proc with the root
3631        // client ActorAddr. NOTE: Does not conform to SA-5 (caller-local
3632        // placement). Production uses host_mesh::spawn_admin().
3633        // White-box test of root-client visibility, not placement.
3634        let admin_proc =
3635            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3636        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3637        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3638        let admin_handle = admin_proc
3639            .spawn(
3640                MESH_ADMIN_ACTOR_NAME,
3641                MeshAdminAgent::new(
3642                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
3643                    Some(root_client_actor_id.clone()),
3644                    Some("[::]:0".parse().unwrap()),
3645                    None,
3646                ),
3647            )
3648            .unwrap();
3649        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();
3650
3651        // Client for sending messages.
3652        let client_proc =
3653            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3654        let (client, _handle) = client_proc.client("client").unwrap();
3655
3656        // Resolve "root" — should contain only the host.
3657        let root_resp = admin_ref
3658            .resolve(&client, "root".to_string())
3659            .await
3660            .unwrap();
3661        let root = root_resp.0.unwrap();
3662        let host_node_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone());
3663        assert!(
3664            root.children.contains(&host_node_ref),
3665            "root children {:?} should contain host {:?}",
3666            root.children,
3667            host_node_ref
3668        );
3669
3670        // Resolve the host — should list the local proc in children.
3671        let host_resp = admin_ref
3672            .resolve(&client, host_node_ref.to_string())
3673            .await
3674            .unwrap();
3675        let host_node = host_resp.0.unwrap();
3676        let local_proc_node_ref = crate::introspect::NodeRef::Proc(local_proc_id.clone());
3677        assert!(
3678            host_node.children.contains(&local_proc_node_ref),
3679            "host children {:?} should contain local proc {:?}",
3680            host_node.children,
3681            local_proc_node_ref
3682        );
3683
3684        // Resolve the local proc — should contain the root client actor.
3685        let proc_resp = admin_ref
3686            .resolve(&client, local_proc_id.to_string())
3687            .await
3688            .unwrap();
3689        let proc_node = proc_resp.0.unwrap();
3690        assert!(
3691            matches!(proc_node.properties, NodeProperties::Proc { .. }),
3692            "expected Proc properties, got {:?}",
3693            proc_node.properties
3694        );
3695        let root_client_node_ref = crate::introspect::NodeRef::Actor(root_client_actor_id.clone());
3696        assert!(
3697            proc_node.children.contains(&root_client_node_ref),
3698            "local proc children {:?} should contain root client actor {:?}",
3699            proc_node.children,
3700            root_client_node_ref
3701        );
3702
3703        // Resolve the root client actor — parent should be the local proc.
3704        let client_resp = admin_ref
3705            .resolve(&client, root_client_actor_id.to_string())
3706            .await
3707            .unwrap();
3708        let client_node = client_resp.0.unwrap();
3709        assert!(
3710            matches!(client_node.properties, NodeProperties::Actor { .. }),
3711            "expected Actor properties, got {:?}",
3712            client_node.properties
3713        );
3714        assert_eq!(
3715            client_node.parent,
3716            Some(local_proc_node_ref),
3717            "root client parent should be the local proc"
3718        );
3719    }
3720
3721    // Verifies that the SKILL.md template contains the canonical
3722    // strings that agents and tests rely on. Prevents silent drift or
3723    // accidental removal.
3724    #[test]
3725    fn test_skill_md_contains_canonical_strings() {
3726        let template = SKILL_MD_TEMPLATE;
3727        assert!(
3728            template.contains("GET {base}/v1/root"),
3729            "SKILL.md must document the root endpoint"
3730        );
3731        assert!(
3732            template.contains("GET {base}/v1/{reference}"),
3733            "SKILL.md must document the reference endpoint"
3734        );
3735        assert!(
3736            template.contains("NodePayload"),
3737            "SKILL.md must mention the NodePayload response type"
3738        );
3739        assert!(
3740            template.contains("GET {base}/SKILL.md"),
3741            "SKILL.md must document itself"
3742        );
3743        assert!(
3744            template.contains("{base}"),
3745            "SKILL.md must use {{base}} placeholder for interpolation"
3746        );
3747    }
3748
3749    // Verifies the navigation identity invariant (see module docs):
3750    //
3751    // 1. payload.identity == reference_string used to resolve it.
3752    // 2. For each child reference C of a resolved node P,
3753    //    resolve(C).parent == P.identity.
3754    //
3755    // Walks the entire tree starting from root, checking both
3756    // properties at every reachable node.
3757    #[tokio::test]
3758    async fn test_navigation_identity_invariant() {
3759        use hyperactor::Proc;
3760        use hyperactor::channel::ChannelTransport;
3761
3762        use crate::host::Host;
3763        use crate::host::LocalProcManager;
3764        use crate::host_mesh::host_agent::HostAgentMode;
3765        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3766        use crate::proc_agent::ProcAgent;
3767
3768        // Stand up a local host with a HostAgent.
3769        let spawn: ProcManagerSpawnFn =
3770            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3771        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3772        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3773            Host::new(manager, ChannelTransport::Unix.any())
3774                .await
3775                .unwrap();
3776        let host_addr = host.addr().clone();
3777        let system_proc = host.system_proc().clone();
3778        let host_agent_handle = system_proc
3779            .spawn(
3780                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3781                HostAgent::new(HostAgentMode::Local(host)),
3782            )
3783            .unwrap();
3784        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
3785        let host_addr_str = host_addr.to_string();
3786
3787        // Spawn MeshAdminAgent on a dedicated test proc.
3788        // NOTE: Does not conform to SA-5 (caller-local placement).
3789        // Production uses host_mesh::spawn_admin(). White-box test setup.
3790        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3791        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3792        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3793        let admin_handle = admin_proc
3794            .spawn(
3795                MESH_ADMIN_ACTOR_NAME,
3796                MeshAdminAgent::new(
3797                    vec![(host_addr_str, host_agent_ref)],
3798                    None,
3799                    Some("[::]:0".parse().unwrap()),
3800                    None,
3801                ),
3802            )
3803            .unwrap();
3804        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();
3805
3806        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3807        let (client, _handle) = client_proc.client("client").unwrap();
3808
3809        // Walk the tree breadth-first, checking the invariant at every node.
3810        // Each entry is (reference_string, expected_parent_identity).
3811        let mut queue: std::collections::VecDeque<(String, Option<crate::introspect::NodeRef>)> =
3812            std::collections::VecDeque::new();
3813        queue.push_back(("root".to_string(), None));
3814
3815        let mut visited = std::collections::HashSet::new();
3816        while let Some((ref_str, expected_parent)) = queue.pop_front() {
3817            if !visited.insert(ref_str.clone()) {
3818                continue;
3819            }
3820
3821            let resp = admin_ref.resolve(&client, ref_str.clone()).await.unwrap();
3822            let node = resp.0.unwrap();
3823
3824            // NI-1: identity display matches the reference used.
3825            assert_eq!(
3826                node.identity.to_string(),
3827                ref_str,
3828                "identity mismatch: resolved '{}' but payload.identity = '{}'",
3829                ref_str,
3830                node.identity
3831            );
3832
3833            // NI-2: parent matches the parent node's identity.
3834            assert_eq!(
3835                node.parent, expected_parent,
3836                "parent mismatch for '{}': expected {:?}, got {:?}",
3837                ref_str, expected_parent, node.parent
3838            );
3839
3840            // Enqueue children with this node's identity as their
3841            // expected parent.
3842            for child_ref in &node.children {
3843                let child_str = child_ref.to_string();
3844                if !visited.contains(&child_str) {
3845                    queue.push_back((child_str, Some(node.identity.clone())));
3846                }
3847            }
3848        }
3849
3850        // Sanity: we should have visited at least root, host, a
3851        // proc, and an actor.
3852        assert!(
3853            visited.len() >= 4,
3854            "expected at least 4 nodes in the tree, visited {}",
3855            visited.len()
3856        );
3857    }
3858
3859    // Exercises SP-1..SP-4 for host/proc payloads.
3860    #[tokio::test]
3861    async fn test_system_proc_identity() {
3862        use hyperactor::Proc;
3863        use hyperactor::channel::ChannelTransport;
3864
3865        use crate::host::Host;
3866        use crate::host::LocalProcManager;
3867        use crate::host_mesh::host_agent::HostAgentMode;
3868        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3869        use crate::proc_agent::ProcAgent;
3870
3871        // -- 1. Stand up a local in-process Host with a HostAgent --
3872        let spawn: ProcManagerSpawnFn =
3873            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3874        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3875        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3876            Host::new(manager, ChannelTransport::Unix.any())
3877                .await
3878                .unwrap();
3879        let host_addr = host.addr().clone();
3880        let system_proc = host.system_proc().clone();
3881        let system_proc_id = system_proc.proc_addr().clone();
3882        let host_agent_handle = system_proc
3883            .spawn(
3884                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3885                HostAgent::new(HostAgentMode::Local(host)),
3886            )
3887            .unwrap();
3888        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
3889        let host_addr_str = host_addr.to_string();
3890
3891        // -- 2. Spawn MeshAdminAgent on a dedicated test proc --
3892        // NOTE: This does not conform to SA-5 (caller-local placement).
3893        // Production uses host_mesh::spawn_admin(). This is a white-box
3894        // test of admin behavior, not placement.
3895        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3896        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3897        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3898        let admin_handle = admin_proc
3899            .spawn(
3900                MESH_ADMIN_ACTOR_NAME,
3901                MeshAdminAgent::new(
3902                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
3903                    None,
3904                    Some("[::]:0".parse().unwrap()),
3905                    None,
3906                ),
3907            )
3908            .unwrap();
3909        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();
3910
3911        // -- 3. Create a bare client instance for sending messages --
3912        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3913        let (client, _handle) = client_proc.client("client").unwrap();
3914
3915        // -- 4. Resolve the host to get its children --
3916        let host_ref_str =
3917            crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone()).to_string();
3918        let host_resp = admin_ref
3919            .resolve(&client, host_ref_str.clone())
3920            .await
3921            .unwrap();
3922        let host_node = host_resp.0.unwrap();
3923        assert!(
3924            !host_node.children.is_empty(),
3925            "host should have at least one proc child"
3926        );
3927
3928        // -- 5. Find a system proc child via system_children --
3929        let system_children = match &host_node.properties {
3930            NodeProperties::Host {
3931                system_children, ..
3932            } => system_children.clone(),
3933            other => panic!("expected Host properties, got {:?}", other),
3934        };
3935        // Procs are never system — host system_children should be empty.
3936        assert!(
3937            system_children.is_empty(),
3938            "host system_children should be empty (procs are never system), got {:?}",
3939            system_children
3940        );
3941        // Verify host properties derived from attrs.
3942        assert!(
3943            matches!(&host_node.properties, NodeProperties::Host { .. }),
3944            "expected Host properties"
3945        );
3946
3947        // -- 6. Verify host children contain the system proc --
3948        let expected_system_ref = crate::introspect::NodeRef::Proc(system_proc_id.clone());
3949        assert!(
3950            host_node.children.contains(&expected_system_ref),
3951            "host children {:?} should contain the system proc ref {:?}",
3952            host_node.children,
3953            expected_system_ref
3954        );
3955
3956        // -- 7. Resolve a proc child --
3957        let proc_child_ref = &host_node.children[0];
3958        let proc_resp = admin_ref
3959            .resolve(&client, proc_child_ref.to_string())
3960            .await
3961            .unwrap();
3962        let proc_node = proc_resp.0.unwrap();
3963
3964        assert_eq!(
3965            proc_node.identity, *proc_child_ref,
3966            "identity must match the proc ref from the host's children list"
3967        );
3968
3969        assert!(
3970            matches!(proc_node.properties, NodeProperties::Proc { .. }),
3971            "expected NodeProperties::Proc, got {:?}",
3972            proc_node.properties
3973        );
3974
3975        let host_node_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone());
3976        assert_eq!(
3977            proc_node.parent,
3978            Some(host_node_ref),
3979            "proc parent should be the host reference"
3980        );
3981
3982        // as_of is a SystemTime — just verify it's not the epoch.
3983        assert!(
3984            proc_node.as_of > std::time::UNIX_EPOCH,
3985            "as_of should be after the epoch"
3986        );
3987
3988        // Verify proc properties derived from attrs.
3989        assert!(
3990            matches!(&proc_node.properties, NodeProperties::Proc { .. }),
3991            "expected Proc properties"
3992        );
3993    }
3994
3995    // -- AdminHandle / PublishedHandle tests --
3996
3997    // AdminHandle::parse — all four cases.
3998    #[test]
3999    fn test_admin_handle_parse_https_url() {
4000        let h = super::AdminHandle::parse("https://myhost:1729");
4001        assert!(matches!(h, super::AdminHandle::Url(u) if u == "https://myhost:1729"));
4002    }
4003
4004    #[test]
4005    fn test_admin_handle_parse_bare_host_port() {
4006        // Bare host:port → inferred as https://host:port.
4007        let h = super::AdminHandle::parse("myhost:1729");
4008        assert!(
4009            matches!(h, super::AdminHandle::Url(ref u) if u == "https://myhost:1729"),
4010            "bare host:port should become https://host:port, got: {:?}",
4011            matches!(h, super::AdminHandle::Url(_))
4012        );
4013    }
4014
4015    #[test]
4016    fn test_admin_handle_parse_mast() {
4017        let h = super::AdminHandle::parse("mast_conda:///my-job");
4018        assert!(matches!(
4019            h,
4020            super::AdminHandle::Published(super::PublishedHandle::Mast(_))
4021        ));
4022    }
4023
4024    #[test]
4025    fn test_admin_handle_parse_unsupported() {
4026        // Bare hostname with no port → Unsupported (no port, scheme inference fails).
4027        let h = super::AdminHandle::parse("junk_hostname_no_port");
4028        assert!(matches!(h, super::AdminHandle::Unsupported(_)));
4029    }
4030
4031    #[tokio::test]
4032    async fn test_admin_handle_resolve_url_returns_url() {
4033        let h = super::AdminHandle::parse("https://myhost:1729");
4034        let result = h.resolve(None).await.unwrap();
4035        assert_eq!(result, "https://myhost:1729");
4036    }
4037
4038    #[tokio::test]
4039    async fn test_admin_handle_resolve_published_returns_error() {
4040        let h = super::AdminHandle::parse("mast_conda:///test-job");
4041        let err = format!("{:#}", h.resolve(Some(1729)).await.unwrap_err());
4042        assert!(
4043            err.contains("not yet implemented"),
4044            "expected 'not yet implemented' in error, got: {}",
4045            err
4046        );
4047    }
4048
4049    #[tokio::test]
4050    async fn test_admin_handle_resolve_unsupported_returns_error() {
4051        let h = super::AdminHandle::parse("junk_hostname_no_port");
4052        let err = format!("{:#}", h.resolve(None).await.unwrap_err());
4053        assert!(
4054            err.contains("unrecognized admin handle"),
4055            "expected 'unrecognized admin handle' in error, got: {}",
4056            err
4057        );
4058    }
4059
4060    // resolve_mast_handle delegates to PublishedHandle::resolve.
4061    // Error text changed from "disabled" to "not yet implemented".
4062    #[tokio::test]
4063    async fn test_resolve_mast_handle_returns_not_yet_implemented_error() {
4064        let result = super::resolve_mast_handle("mast_conda:///test-job", Some(1729)).await;
4065        let err = format!("{:#}", result.unwrap_err());
4066        assert!(
4067            err.contains("not yet implemented"),
4068            "expected 'not yet implemented' in error, got: {}",
4069            err
4070        );
4071    }
4072
4073    // -- AdminInfo::new() constructor tests --
4074
4075    // Constructor guarantee: valid https URL produces correct host.
4076    #[test]
4077    fn test_admin_info_new_derives_host_from_url() {
4078        let info = super::AdminInfo::new(
4079            "actor".to_string(),
4080            "proc".to_string(),
4081            "https://myhost.example.com:1729".to_string(),
4082        )
4083        .unwrap();
4084        assert_eq!(info.host, "myhost.example.com");
4085        assert_eq!(info.url, "https://myhost.example.com:1729");
4086    }
4087
4088    // Constructor guarantee: invalid URL is rejected.
4089    #[test]
4090    fn test_admin_info_new_rejects_invalid_url() {
4091        let result = super::AdminInfo::new(
4092            "actor".to_string(),
4093            "proc".to_string(),
4094            "not a url".to_string(),
4095        );
4096        assert!(result.is_err(), "invalid URL must be rejected");
4097    }
4098
4099    // Constructor guarantee: URL with no host is rejected.
4100    #[test]
4101    fn test_admin_info_new_rejects_url_without_host() {
4102        // data: URLs have no host component.
4103        let result = super::AdminInfo::new(
4104            "actor".to_string(),
4105            "proc".to_string(),
4106            "data:text/plain,hello".to_string(),
4107        );
4108        assert!(result.is_err(), "URL without host must be rejected");
4109    }
4110
4111    // -- Placement test (SA-5) --
4112
4113    // Exercises the real public entrypoint and checks SA-5 via
4114    // ActorRef reachability on the caller proc.
4115    #[tokio::test]
4116    async fn test_spawn_admin_places_on_caller_proc() {
4117        use hyperactor::Proc;
4118        use hyperactor::channel::ChannelTransport;
4119        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
4120
4121        use crate::host_mesh::HostMesh;
4122
4123        // 1. Stand up a local in-process host mesh.
4124        let host_mesh = HostMesh::local().await.unwrap();
4125
4126        // 2. Create a separate caller proc with an actor instance.
4127        let caller_proc = Proc::direct(ChannelTransport::Unix.any(), "caller".to_string()).unwrap();
4128        let _supervision = ProcSupervisionCoordinator::set(&caller_proc).await.unwrap();
4129        let (caller_cx, _caller_handle) = caller_proc.client("caller").unwrap();
4130
4131        // 3. Call the real public entrypoint.
4132        let admin_ref = crate::host_mesh::spawn_admin(
4133            [&host_mesh],
4134            &caller_cx,
4135            Some("[::]:0".parse().unwrap()),
4136            None,
4137        )
4138        .await
4139        .unwrap();
4140
4141        // 4. Prove the returned ActorRef is usable: fetch the URL
4142        //    via get_admin_addr. This also proves the admin is on
4143        //    caller_proc (undeliverable if not).
4144        let admin_url = admin_ref
4145            .get_admin_addr(&caller_cx)
4146            .await
4147            .unwrap()
4148            .addr
4149            .expect("SA-5: admin must report an address");
4150        assert!(
4151            !admin_url.is_empty(),
4152            "spawn_admin ref must yield a non-empty URL"
4153        );
4154    }
4155
4156    // AI-1..AI-3: GET /v1/admin HTTP route test requires TLS certs
4157    // (fbcode_build enforces mTLS). Covered by integration tests in
4158    // fbcode//monarch/hyperactor_mesh/test/mesh_admin_integration.
4159    // AI-4 is a constructor guarantee tested via AdminInfo::new() above.
4160
4161    // Verifies that GET /v1/{proc_id} reflects actors spawned directly
4162    // on a proc — bypassing ProcAgent's gspawn message and therefore
4163    // never triggering publish_introspect_properties — so that the
4164    // resolved children list is always derived from live proc state.
4165    //
4166    // Regression guard for the bug introduced in 9a08d559: the switch
4167    // from a live handle_introspect to a cached publish model made
4168    // supervision-spawned actors (e.g. every sieve actor after
4169    // sieve[0]) invisible to the TUI.
4170    //
4171    // Exercises PA-1 (see module doc). See also
4172    // proc_agent::tests::test_query_child_proc_returns_live_children.
4173    #[tokio::test]
4174    async fn test_proc_children_reflect_directly_spawned_actors() {
4175        use hyperactor::Proc;
4176        use hyperactor::actor::ActorStatus;
4177        use hyperactor::channel::ChannelTransport;
4178        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
4179
4180        use crate::host::Host;
4181        use crate::host::LocalProcManager;
4182        use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
4183        use crate::host_mesh::host_agent::HostAgent;
4184        use crate::host_mesh::host_agent::HostAgentMode;
4185        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
4186        use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
4187        use crate::proc_agent::ProcAgent;
4188
4189        // Stand up a HostMeshAgent. The user proc gets its own
4190        // ephemeral address; we register that address in
4191        // MeshAdminAgent so resolve_proc_node can look it up.
4192        // HostMeshAgent won't know the user proc (it wasn't spawned
4193        // through it), so QueryChild returns Error and resolve falls
4194        // back to querying proc_agent[0] via QueryChild(Proc) — the
4195        // path being tested.
4196        let spawn_fn: ProcManagerSpawnFn =
4197            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
4198        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn_fn);
4199        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
4200            Host::new(manager, ChannelTransport::Unix.any())
4201                .await
4202                .unwrap();
4203        let system_proc = host.system_proc().clone();
4204        let host_agent_handle = system_proc
4205            .spawn(
4206                HOST_MESH_AGENT_ACTOR_NAME,
4207                HostAgent::new(HostAgentMode::Local(host)),
4208            )
4209            .unwrap();
4210        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
4211
4212        // User proc: own ephemeral Unix socket, own ProcAgent.
4213        let user_proc =
4214            Proc::direct(ChannelTransport::Unix.any(), "user_proc".to_string()).unwrap();
4215        let user_proc_addr = user_proc.proc_addr().addr().to_string();
4216        let agent_handle = ProcAgent::boot_v1(user_proc.clone(), None).unwrap();
4217        agent_handle
4218            .status()
4219            .wait_for(|s| matches!(s, ActorStatus::Idle))
4220            .await
4221            .unwrap();
4222
4223        // MeshAdminAgent: register the user proc's addr as a "host"
4224        // pointing to host_agent_ref. That agent doesn't know the
4225        // user proc, so QueryChild → Error → fallback to proc_agent.
4226        // NOTE: Does not conform to SA-5 (caller-local placement).
4227        // White-box test of proc-agent fallback, not placement.
4228        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
4229        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
4230        let admin_handle = admin_proc
4231            .spawn(
4232                MESH_ADMIN_ACTOR_NAME,
4233                MeshAdminAgent::new(
4234                    vec![(user_proc_addr, host_agent_ref.clone())],
4235                    None,
4236                    Some("[::]:0".parse().unwrap()),
4237                    None,
4238                ),
4239            )
4240            .unwrap();
4241        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();
4242
4243        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
4244        let (client, _client_handle) = client_proc.client("client").unwrap();
4245
4246        // Resolve the user proc via MeshAdminAgent. HostMeshAgent
4247        // returns Error for QueryChild → fallback to proc_agent[0]
4248        // QueryChild(Addr::Proc) → live NodeProperties::Proc.
4249        let user_proc_ref = user_proc.proc_addr().to_string();
4250        let resp = admin_ref
4251            .resolve(&client, user_proc_ref.clone())
4252            .await
4253            .unwrap();
4254        let node = resp.0.unwrap();
4255        assert!(
4256            matches!(node.properties, NodeProperties::Proc { .. }),
4257            "expected Proc, got {:?}",
4258            node.properties
4259        );
4260        let initial_count = node.children.len();
4261        assert!(
4262            node.children
4263                .iter()
4264                .any(|c| c.to_string().contains(PROC_AGENT_ACTOR_NAME)),
4265            "initial children {:?} should contain proc_agent",
4266            node.children
4267        );
4268
4269        // Spawn an actor directly on the user proc, bypassing gspawn.
4270        // This simulates how sieve[0] spawns sieve[1], sieve[2], etc.
4271        user_proc
4272            .spawn("extra_actor", TestIntrospectableActor)
4273            .unwrap();
4274
4275        // Resolve again — the new actor must appear immediately
4276        // without any republish, proving PA-1 is satisfied.
4277        let resp2 = admin_ref
4278            .resolve(&client, user_proc_ref.clone())
4279            .await
4280            .unwrap();
4281        let node2 = resp2.0.unwrap();
4282        assert!(
4283            matches!(node2.properties, NodeProperties::Proc { .. }),
4284            "expected Proc, got {:?}",
4285            node2.properties
4286        );
4287        assert!(
4288            node2
4289                .children
4290                .iter()
4291                .any(|c| c.to_string().contains("extra_actor")),
4292            "after direct spawn, children {:?} should contain extra_actor",
4293            node2.children
4294        );
4295        assert!(
4296            node2.children.len() > initial_count,
4297            "expected at least {} children after direct spawn, got {:?}",
4298            initial_count + 1,
4299            node2.children
4300        );
4301    }
4302
4303    // -- pyspy bridge input validation tests --
4304    //
4305    // Tests for the v1 proc-reference strictness contract (see
4306    // introspect module doc): the py-spy bridge accepts only
4307    // ProcAddr-form references and rejects other forms as bad_request.
4308
4309    #[test]
4310    fn pyspy_parse_empty_reference() {
4311        // v1 contract: empty input → bad_request.
4312        let err = parse_proc_reference("").unwrap_err();
4313        assert_eq!(err.code, "bad_request");
4314        assert!(err.message.contains("empty"));
4315    }
4316
4317    #[test]
4318    fn pyspy_parse_slash_only() {
4319        // v1 contract: slash-only (axum wildcard artifact) → bad_request.
4320        let err = parse_proc_reference("/").unwrap_err();
4321        assert_eq!(err.code, "bad_request");
4322        assert!(err.message.contains("empty"));
4323    }
4324
4325    #[test]
4326    fn pyspy_parse_malformed_percent_encoding() {
4327        // v1 contract: malformed encoding → bad_request.
4328        // %FF%FE is not valid UTF-8.
4329        let err = parse_proc_reference("%FF%FE").unwrap_err();
4330        assert_eq!(err.code, "bad_request");
4331        assert!(err.message.contains("percent-encoding"));
4332    }
4333
4334    #[test]
4335    fn pyspy_parse_invalid_proc_id() {
4336        // v1 contract: non-ProcAddr reference → bad_request.
4337        let err = parse_proc_reference("not-a-valid-proc-id").unwrap_err();
4338        assert_eq!(err.code, "bad_request");
4339        assert!(err.message.contains("invalid proc reference"));
4340    }
4341
4342    #[test]
4343    fn pyspy_parse_valid_proc_reference() {
4344        // v1 contract: valid ProcAddr → accepted.
4345        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
4346        let proc_id = test_proc_id_with_addr(ChannelAddr::Tcp(addr), "myproc");
4347        let proc_id_str = proc_id.to_string();
4348
4349        let (decoded, parsed) = parse_proc_reference(&proc_id_str).unwrap();
4350        assert_eq!(decoded, proc_id_str);
4351        assert_eq!(parsed, proc_id);
4352    }
4353
4354    #[test]
4355    fn pyspy_parse_strips_leading_slash() {
4356        // v1 contract: leading slash from axum wildcard is stripped.
4357        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
4358        let proc_id = test_proc_id_with_addr(ChannelAddr::Tcp(addr), "myproc");
4359        let with_slash = format!("/{}", proc_id);
4360
4361        let (_, parsed) = parse_proc_reference(&with_slash).unwrap();
4362        assert_eq!(parsed, proc_id);
4363    }
4364
4365    /// PS-12: service proc routes to HostAgent.
4366    #[test]
4367    fn route_proc_handler_service_proc_yields_host() {
4368        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
4369        let proc_id = ResourceId::proc_addr_from_name(ChannelAddr::Tcp(addr), SERVICE_PROC_NAME);
4370        let handler = route_proc_handler(&proc_id.to_string()).unwrap();
4371        assert!(
4372            matches!(handler, ResolvedProcHandler::Host(_)),
4373            "service proc should resolve to Host variant"
4374        );
4375    }
4376
4377    /// PS-12: non-service proc routes to ProcAgent.
4378    #[test]
4379    fn route_proc_handler_worker_proc_yields_proc() {
4380        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
4381        let proc_id = test_proc_id_with_addr(ChannelAddr::Tcp(addr), "worker_0");
4382        let handler = route_proc_handler(&proc_id.to_string()).unwrap();
4383        assert!(
4384            matches!(handler, ResolvedProcHandler::Proc(_)),
4385            "non-service proc should resolve to Proc variant"
4386        );
4387    }
4388
4389    /// PS-12: a labeled instance named "service" is still a normal proc.
4390    #[test]
4391    fn route_proc_handler_service_instance_yields_proc() {
4392        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
4393        let proc_id =
4394            ResourceId::proc_addr_from_name(ChannelAddr::Tcp(addr), "service-deadbeefdeadbeef");
4395        let handler = route_proc_handler(&proc_id.to_string()).unwrap();
4396        assert!(
4397            matches!(handler, ResolvedProcHandler::Proc(_)),
4398            "service-labeled instance proc should resolve to Proc variant"
4399        );
4400    }
4401}