hyperactor_mesh/
mesh_admin.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Mesh-level admin surface for topology introspection and reference
10//! walking.
11//!
12//! This module defines `MeshAdminAgent`, an actor that exposes a
13//! uniform, reference-based HTTP API over an entire host mesh. Every
14//! addressable entity in the mesh is represented as a `NodePayload`
15//! and resolved via typed `NodeRef` references (parsed from HTTP
16//! path strings at the request boundary).
17//!
18//! Incoming HTTP requests are bridged into the actor message loop
19//! using `ResolveReferenceMessage`, ensuring that all topology
20//! resolution and data collection happens through actor messaging.
21//! The agent fans out to `HostAgent` instances to fetch host,
22//! proc, and actor details, then normalizes them into a single
23//! tree-shaped model (`NodeProperties` + children references)
24//! suitable for topology-agnostic clients such as the admin TUI.
25//!
26//! # Schema strategy
27//!
28//! The external API contract is schema-first: the JSON Schema
29//! (Draft 2020-12) served at `GET /v1/schema` is the
30//! authoritative definition of the response shape. The error
31//! envelope schema is at `GET /v1/schema/error`.
32//!
33//! Schema and OpenAPI are derived from the HTTP boundary DTO types
34//! in [`crate::introspect::dto`] (`NodePayloadDto`,
35//! `NodePropertiesDto`, `FailureInfoDto`) via
36//! `schemars::JsonSchema`. The domain types (`NodePayload`,
37//! `NodeProperties`, `FailureInfo`) do not carry `JsonSchema` —
38//! they own the typed internal model; the DTOs own the wire
39//! contract.
40//!
41//! This follows the "Admin Gateway Pattern" RFC
42//! ([doc](https://fburl.com/1dvah88uutaiyesebojouen2)):
43//! schema is the product; transports and tooling are projections.
44//!
45//! ## Schema generation pipeline
46//!
47//! 1. `#[derive(JsonSchema)]` on `NodePayloadDto`,
48//!    `NodePropertiesDto`, `FailureInfoDto`, `ApiError`,
49//!    `ApiErrorEnvelope`.
50//! 2. `schemars::schema_for!(T)` produces a `Schema` value at
51//!    runtime (Draft 2020-12).
52//! 3. The `serve_schema` / `serve_error_schema` handlers inject a
53//!    `$id` field (SC-4) and serve the result as JSON.
54//! 4. Snapshot tests in `introspect::tests` compare the raw
55//!    schemars output (without `$id`) against checked-in golden
56//!    files to detect drift (SC-2).
57//! 5. Validation tests construct domain payloads, convert to DTOs,
58//!    and confirm the serialized DTOs pass schema validation
59//!    (SC-3).
60//!
61//! ## Regenerating snapshots
62//!
63//! After intentional changes to the DTO types
64//! (`NodePayloadDto`, `NodePropertiesDto`, `FailureInfoDto`),
65//! `ApiError`, or `ApiErrorEnvelope`, regenerate the golden
66//! files:
67//!
68//! ```sh
69//! buck run fbcode//monarch/hyperactor_mesh:generate_api_artifacts \
70//!   @fbcode//mode/dev-nosan -- \
71//!   fbcode/monarch/hyperactor_mesh/src/testdata
72//! ```
73//!
74//! Or via cargo:
75//!
76//! ```sh
77//! cargo run -p hyperactor_mesh --bin generate_api_artifacts -- \
78//!   hyperactor_mesh/src/testdata
79//! ```
80//!
81//! Then re-run tests to confirm the new snapshot passes.
82//!
83//! ## Schema invariants (SC-*)
84//!
85//! - **SC-1 (schema-derived):** Schema is derived from the DTO
86//!   types via `schemars::JsonSchema`, not hand-written.
87//! - **SC-2 (schema-snapshot-stability):** Schema changes must
88//!   be explicit — a snapshot test catches unintentional drift.
89//! - **SC-3 (schema-payload-conformance):** Domain payloads
90//!   converted to DTOs validate against the generated schema.
91//! - **SC-4 (schema-version-identity):** Served schemas carry a
92//!   `$id` tied to the API version (e.g.
93//!   `https://monarch.meta.com/schemas/v1/node_payload`).
94//! - **SC-5 (route-precedence):** Literal schema routes are
95//!   matched by specificity before the `{*reference}` wildcard
96//!   (axum 0.8 specificity-based routing).
97//!
98//! Note on `ApiError.details`: the derived schema is maximally
99//! permissive for `details` (any valid JSON). This is intentional
100//! for v1 — `details` is a domain-specific escape hatch.
101//! Consumers must not assume a fixed shape.
102//!
103//! # Introspection visibility policy
104//!
105//! Admin tooling only displays **introspectable** nodes: entities
106//! that are reachable via actor messaging and respond to
107//! [`IntrospectMessage`]. Infrastructure procs that are
108//! **non-routable** are intentionally **opaque** to introspection and
109//! are omitted from the navigation graph.
110//!
111//! ## Definitions
112//!
113//! **Routable** — an entity is routable if the system can address it
114//! via the routing layer and successfully deliver a message to it
115//! using a `Reference` / `ActorId` (i.e., there exists a live mailbox
116//! sender reachable through normal routing). Practical test: "can I
117//! send `IntrospectMessage::Query` to it and get a reply?"
118//!
119//! **Non-routable** — an entity is non-routable if it has no
120//! externally reachable mailbox sender in the routing layer, so
121//! message delivery is impossible by construction (even if you know
122//! its name). Examples: `hyperactor_runtime[0]`, `mailbox_server[N]`,
123//! `local[N]` — these use `PanickingMailboxSender` and are never
124//! bound to the router.
125//!
126//! **Introspectable** — tooling can obtain a `NodePayload` for this
127//! node by sending `IntrospectMessage` to a routable actor.
128//!
129//! **Opaque** — the node exists but is not introspectable via
130//! messaging; tooling cannot observe it through the introspection
131//! protocol.
132//!
133//! ## Proc visibility
134//!
135//! A proc is not directly introspected; actors are. Tooling
136//! synthesizes proc-level nodes by grouping introspectable actors by
137//! `ProcId`.
138//!
139//! A proc is visible iff there exists at least one actor on that proc
140//! whose `ActorId` is deliverable via the routing layer (i.e., the
141//! actor has a bound mailbox sender reachable through normal routing)
142//! and responds to `IntrospectMessage`.
143//!
144//! The rule is: **if an entity is routable via the mesh routing layer
145//! (i.e., tooling can deliver `IntrospectMessage::Query` to one of its
146//! actors), then it is introspectable and appears in the admin graph.**
147//!
148//! ## Navigation identity invariants (NI-*)
149//!
150//! Every `NodePayload` in the topology tree satisfies:
151//!
152//! - **NI-1 (identity = reference):** A node's `identity: NodeRef`
153//!   must correspond to the reference used to resolve it. The
154//!   display form of `identity` round-trips through `NodeRef::from_str`.
155//!
156//! - **NI-2 (parent = containment parent):** A node's
157//!   `parent: Option<NodeRef>` records its canonical containment
158//!   parent, not the inverse of every navigation edge. Specifically:
159//!   root → `None`, host → `Root`, proc → `Host(…)`,
160//!   actor → `Proc(…)`. An actor's parent is always its owning proc,
161//!   even when the actor also appears as a child of another actor via
162//!   supervision.
163//!
164//! - **NI-3 (children = navigation graph):** A node's `children`
165//!   is the admin navigation graph. Actor-to-actor supervision links
166//!   coexist with proc→actor membership links without changing
167//!   `parent`. The same actor may therefore appear in `children` of
168//!   both its proc and its supervising actor.
169//!
170//! Together these ensure that the TUI can correlate responses to tree
171//! nodes, and that upward/downward navigation is consistent.
172//!
173//! ## Link-classification invariants (LC-*)
174//!
175//! These describe which nodes emit `system_children` and
176//! `stopped_children` classification sets, and what those sets
177//! contain.
178//!
179//! - **LC-1 (root system_children empty):** Root payloads always
180//!   emit `system_children: vec![]`. Root children are host nodes,
181//!   which are not classified as system.
182//!
183//! - **LC-2 (host system_children empty):** Host payloads always
184//!   emit `system_children: vec![]`. Host children are procs, which
185//!   are not classified as system — only actors carry the system
186//!   classification.
187//!
188//! - **LC-3 (proc system_children subset):** Proc payloads emit
189//!   `system_children ⊆ children`, containing only `NodeRef::Actor`
190//!   refs where `cell.is_system()` is true.
191//!
192//! - **LC-4 (proc stopped_children subset):** Proc payloads emit
193//!   `stopped_children ⊆ children`, containing only
194//!   `NodeRef::Actor` refs for terminated actors retained for
195//!   post-mortem inspection.
196//!
197//! - **LC-5 (actor/error no classification sets):** Actor and Error
198//!   payloads do not carry `system_children` or `stopped_children`.
199//!
200//! ## Proc-resolution invariants (SP-*)
201//!
202//! When a proc reference is resolved, the returned `NodePayload`
203//! satisfies:
204//!
205//! - **SP-1 (identity):** The identity matches the ProcId reference
206//!   from the parent's children list.
207//! - **SP-2 (properties):** The properties are `NodeProperties::Proc`.
208//! - **SP-3 (parent):** The parent is `NodeRef::Host(actor_id)`.
209//! - **SP-4 (as_of):** The `as_of` field is present and valid
210//!   (internally `SystemTime`; serialized as ISO 8601 string over
211//!   the HTTP JSON API per HB-1).
212//!
213//! Enforced by `test_system_proc_identity`.
214//!
215//! ## Proc-agent invariants (PA-*)
216//!
217//! - **PA-1 (live children):** Proc-node children used by admin/TUI
218//!   must be derived from live proc state at query time. No
219//!   additional publish event is required for a newly spawned actor
220//!   to appear.
221//!
222//! Enforced by `test_proc_children_reflect_directly_spawned_actors`.
223//!
224//! ## Robustness invariant (MA-R1)
225//!
226//! - **MA-R1 (no-crash):** `MeshAdminAgent` must never crash the OS
227//!   process it resides in. Every handler catches errors and converts
228//!   them into structured error payloads
229//!   (`ResolveReferenceResponse(Err(..))`, `NodeProperties::Error`,
230//!   etc.) rather than propagating panics or unwinding. Failed reply
231//!   sends (the caller went away) are silently swallowed.
232//!
233//! ## TLS transport invariant (MA-T1)
234//!
235//! - **MA-T1 (tls):** At Meta (`fbcode_build`), the admin HTTP
236//!   server **requires** mutual TLS. At startup it probes for
237//!   certificates via `try_tls_acceptor` with client cert
238//!   enforcement enabled. If no usable certificate bundle is found,
239//!   `init()` returns an error — no plain HTTP fallback. In OSS,
240//!   TLS is best-effort with plain HTTP fallback.
241//!
242//! - **MA-T2 (scheme-in-url):** The URL returned by `GetAdminAddr`
243//!   is always `https://host:port` or `http://host:port`, never a
244//!   bare `host:port`. All callers receive and use this full URL
245//!   directly.
246//!
247//! ## Client host invariants (CH-*)
248//!
249//! Let **A** denote the aggregated host set (the union of hosts
250//! from all meshes passed to [`host_mesh::spawn_admin`],
251//! deduplicated by `HostAgent` `ActorId` — see SA-3), and let
252//! **C** denote the process-global singleton client host mesh in
253//! the caller process (whose local proc hosts the root client
254//! actor).
255//!
256//! - **CH-1 (deduplication):** When C ∈ A, the client host appears
257//!   exactly once in the admin host list (deduplicated by `HostAgent`
258//!   `ActorId` identity). When C ∉ A, `spawn_admin` includes C
259//!   alongside A's hosts so the admin introspects C as a normal host
260//!   subtree, not as a standalone proc.
261//!
262//! - **CH-2 (reachability):** In both cases, the root client actor
263//!   is reachable through the standard host → proc → actor walk.
264//!
265//! - **CH-3 (ordering):** C must be initialized before
266//!   `spawn_admin` executes. In Rust, calling `context()` /
267//!   `this_host()` / `this_proc()` triggers `GLOBAL_CONTEXT`
268//!   bootstrap, which initializes C. In Python, `bootstrap_host()`
269//!   calls `register_client_host()` before any actor code runs.
270//!   Either path ensures C is available by the time `spawn_admin`
271//!   reads it via `try_this_host()`. Any refactor must preserve
272//!   this ordering.
273//!
274//! - **CH-4 (runtime-agnostic client-host discovery):** `spawn_admin`
275//!   discovers C via `try_this_host()`, which checks two sources
276//!   in order: the Rust `GLOBAL_CONTEXT` (initialized via
277//!   `context()` / `this_host()` / `this_proc()`) and the
278//!   externally registered client host (set by
279//!   `register_client_host()` from Python's `bootstrap_host()`).
280//!   Aggregation logic must not branch on which source provided C.
281//!
282//! **Mechanism:** [`host_mesh::spawn_admin`] aggregates hosts from
283//! all input meshes (SA-3), reads C from the caller process (via
284//! `try_this_host()`), merges it with the aggregated set (SA-6),
285//! deduplicates by `HostAgent` `ActorId`, and spawns the
286//! `MeshAdminAgent` on the caller's local proc via
287//! `cx.instance().proc().spawn(...)`. Placement now follows the
288//! caller context rather than mesh topology.
289//!
290//! ## Spawn/aggregation invariants (SA-*)
291//!
292//! [`host_mesh::spawn_admin`] aggregates hosts from one or more
293//! meshes into a single admin host set.
294//!
295//! - **SA-1 (non-empty mesh set):** The input must yield at least
296//!   one mesh.
297//! - **SA-2 (non-empty hosts):** Every input mesh must contain at
298//!   least one host.
299//! - **SA-3 (host-agent identity dedup):** The admin host set is
300//!   the ordered union of host agents from all input meshes,
301//!   deduplicated by `HostAgent` `ActorId` in first-seen order.
302//! - **SA-4 (single-mesh degeneracy):** `spawn_admin([mesh], ...)`
303//!   is behaviorally equivalent to the former `mesh.spawn_admin(...)`.
304//!   Established by existing single-mesh integration tests (e.g.
305//!   `dining_philosophers`); no dedicated unit test.
306//! - **SA-5 (caller-local placement):** The admin is spawned on the
307//!   caller's local proc — the `Proc` of the actor context passed to
308//!   `spawn_admin()`. In common remote launch flows, the caller is
309//!   typically the root client/control process.
310//! - **SA-6 (client-host merge after aggregation):** Client-host
311//!   inclusion/dedup (CH-1) operates on the already-aggregated host
312//!   set, not per-mesh independently.
313//!
314//! ## MAST resolution (disabled)
315//!
316//! `mast_conda:///` resolution is disabled. The old topology-based
317//! resolution assumed the admin lived on the first mesh head host,
318//! which is no longer true after SA-5 changed to caller-local
319//! placement. All resolution paths now return explicit errors.
320//! A publication-based discovery mechanism will replace this in a
321//! future change. Until then, discover the admin URL from
322//! startup output or another launch-time publication.
323//!
324//! ## Admin self-identification invariants (AI-*)
325//!
326//! - **AI-1 (live identity):** `GET /v1/admin` returns the live
327//!   admin actor identity as `AdminInfo`.
328//! - **AI-2 (reported proc):** `proc_id` reports the hosting proc.
329//!   Placement equality (SA-5) is proved by unit tests; integration
330//!   tests validate that `proc_id` is populated and well-formed.
331//! - **AI-3 (url consistency):** `url` matches `GetAdminAddr`.
332//!
333//! The relationship between `host` and `url` (formerly AI-4) is
334//! now a constructor guarantee of [`AdminInfo::new`] rather than a
335//! live invariant. It is not in this registry.
336
337use std::collections::HashMap;
338use std::io;
339use std::sync::Arc;
340use std::time::Duration;
341
342use async_trait::async_trait;
343use axum::Json;
344use axum::Router;
345use axum::extract::Path as AxumPath;
346use axum::extract::State;
347use axum::http::StatusCode;
348use axum::response::IntoResponse;
349use axum::routing::get;
350use axum::routing::post;
351use hyperactor::Actor;
352use hyperactor::ActorHandle;
353use hyperactor::Context;
354use hyperactor::HandleClient;
355use hyperactor::Handler;
356use hyperactor::Instance;
357use hyperactor::RefClient;
358use hyperactor::channel::try_tls_acceptor;
359use hyperactor::host::SERVICE_PROC_NAME;
360use hyperactor::introspect::IntrospectMessage;
361use hyperactor::introspect::IntrospectResult;
362use hyperactor::introspect::IntrospectView;
363use hyperactor::mailbox::open_once_port;
364use hyperactor::reference as hyperactor_reference;
365use serde::Deserialize;
366use serde::Serialize;
367use serde_json::Value;
368use tokio::net::TcpListener;
369use tokio_rustls::TlsAcceptor;
370use typeuri::Named;
371
372use crate::config_dump::ConfigDump;
373use crate::config_dump::ConfigDumpResult;
374use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
375use crate::host_mesh::host_agent::HostAgent;
376use crate::introspect::NodePayload;
377use crate::introspect::NodeProperties;
378use crate::introspect::dto::NodePayloadDto;
379use crate::introspect::to_node_payload;
380use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
381use crate::pyspy::PySpyDump;
382use crate::pyspy::PySpyOpts;
383use crate::pyspy::PySpyResult;
384
385/// Send an `IntrospectMessage` to an actor and receive the reply.
386/// Encapsulates open_once_port + send + timeout + error handling.
387async fn query_introspect(
388    cx: &hyperactor::Context<'_, MeshAdminAgent>,
389    actor_id: &hyperactor_reference::ActorId,
390    view: hyperactor::introspect::IntrospectView,
391    timeout: Duration,
392    err_ctx: &str,
393) -> Result<IntrospectResult, anyhow::Error> {
394    let introspect_port =
395        hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(actor_id);
396    let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
397    let mut reply_ref = reply_handle.bind();
398    reply_ref.return_undeliverable(false);
399    introspect_port.send(
400        cx,
401        IntrospectMessage::Query {
402            view,
403            reply: reply_ref,
404        },
405    )?;
406    tokio::time::timeout(timeout, reply_rx.recv())
407        .await
408        .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
409        .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
410}
411
412/// Send an `IntrospectMessage::QueryChild` to an actor.
413async fn query_child_introspect(
414    cx: &hyperactor::Context<'_, MeshAdminAgent>,
415    actor_id: &hyperactor_reference::ActorId,
416    child_ref: hyperactor_reference::Reference,
417    timeout: Duration,
418    err_ctx: &str,
419) -> Result<IntrospectResult, anyhow::Error> {
420    let introspect_port =
421        hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(actor_id);
422    let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
423    let mut reply_ref = reply_handle.bind();
424    reply_ref.return_undeliverable(false);
425    introspect_port.send(
426        cx,
427        IntrospectMessage::QueryChild {
428            child_ref,
429            reply: reply_ref,
430        },
431    )?;
432    tokio::time::timeout(timeout, reply_rx.recv())
433        .await
434        .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
435        .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
436}
437
/// Actor name used when spawning the mesh admin agent
/// (`MeshAdminAgent`).
pub const MESH_ADMIN_ACTOR_NAME: &str = "mesh_admin";

/// Actor name for the HTTP bridge client mailbox on the service proc.
///
/// Unlike `MESH_ADMIN_ACTOR_NAME`, this is not a full actor: it is a
/// client-mode `Instance<()>` created via
/// `Proc::introspectable_instance()` and driven by Axum's Tokio task
/// pool rather than an actor message loop. A separate instance is
/// required because `MeshAdminAgent`'s own `Instance<Self>` is only
/// accessible inside its message loop and cannot be shared with
/// external tasks. This instance gives the HTTP handlers a routable
/// proc identity so they can open one-shot reply ports
/// (`open_once_port`) to receive responses from `MeshAdminAgent`.
///
/// Unlike a plain `instance()`, this uses
/// `Proc::introspectable_instance()` so the bridge responds to
/// `IntrospectMessage::Query` and appears as a navigable node in the
/// mesh TUI rather than causing a 504 when selected.
pub const MESH_ADMIN_BRIDGE_NAME: &str = "mesh_admin_bridge";
458
/// Structured error response following the gateway RFC envelope
/// pattern.
///
/// Served over HTTP wrapped in [`ApiErrorEnvelope`]; the
/// `IntoResponse` impl maps `code` onto an HTTP status.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiError {
    /// Machine-readable error code (e.g. "not_found", "bad_request").
    /// Recognized codes map to matching HTTP statuses; unrecognized
    /// codes are served as 500 (see the `IntoResponse` impl).
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Additional context about the error. Schema is permissive
    /// (any valid JSON) — `details` is a domain-specific escape
    /// hatch. Do not assume a fixed shape.
    ///
    /// Omitted entirely from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<Value>,
}
473
/// Wrapper for the structured error envelope.
///
/// Wire shape is `{"error": {...}}`: clients unwrap the single
/// `error` field to reach the [`ApiError`].
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiErrorEnvelope {
    /// The wrapped structured error.
    pub error: ApiError,
}
479
480impl ApiError {
481    /// Create a "not_found" error.
482    pub fn not_found(message: impl Into<String>, details: Option<Value>) -> Self {
483        Self {
484            code: "not_found".to_string(),
485            message: message.into(),
486            details,
487        }
488    }
489
490    /// Create a "bad_request" error.
491    pub fn bad_request(message: impl Into<String>, details: Option<Value>) -> Self {
492        Self {
493            code: "bad_request".to_string(),
494            message: message.into(),
495            details,
496        }
497    }
498}
499
500impl IntoResponse for ApiError {
501    fn into_response(self) -> axum::response::Response {
502        let status = match self.code.as_str() {
503            "not_found" => StatusCode::NOT_FOUND,
504            "bad_request" => StatusCode::BAD_REQUEST,
505            "gateway_timeout" => StatusCode::GATEWAY_TIMEOUT,
506            "service_unavailable" => StatusCode::SERVICE_UNAVAILABLE,
507            _ => StatusCode::INTERNAL_SERVER_ERROR,
508        };
509        let envelope = ApiErrorEnvelope { error: self };
510        (status, Json(envelope)).into_response()
511    }
512}
513
/// Response payload for `MeshAdminMessage::GetAdminAddr`.
///
/// `addr` is `None` until the admin HTTP server has successfully
/// bound a listening socket during `MeshAdminAgent::init`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct MeshAdminAddrResponse {
    /// Full admin URL including scheme (`https://host:port` or
    /// `http://host:port`, per MA-T2); `None` before the server is
    /// bound.
    pub addr: Option<String>,
}
wirevalue::register_type!(MeshAdminAddrResponse);
523
/// Messages handled by the `MeshAdminAgent`.
///
/// These are mesh-admin control-plane queries (as opposed to topology
/// resolution, which goes through `ResolveReferenceMessage`). They
/// are wirevalue-serializable and come with generated client/ref
/// helpers via `HandleClient`/`RefClient`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum MeshAdminMessage {
    /// Return the HTTP admin server address that this agent bound in
    /// `init`.
    ///
    /// The reply contains `None` if the server hasn't started yet;
    /// when populated it is a full URL including scheme (MA-T2).
    GetAdminAddr {
        #[reply]
        reply: hyperactor_reference::OncePortRef<MeshAdminAddrResponse>,
    },
}
wirevalue::register_type!(MeshAdminMessage);
551
/// Newtype wrapper around `Result<NodePayload, String>` for the
/// resolve reply port (`OncePortRef` requires `Named`).
///
/// `Err` carries a human-readable message describing why resolution
/// failed — a structured payload rather than a panic, per MA-R1.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ResolveReferenceResponse(pub Result<NodePayload, String>);
wirevalue::register_type!(ResolveReferenceResponse);
557
/// Message for resolving a reference (string from HTTP path) into a
/// `NodePayload`.
///
/// This is the primary “navigation” request used by the admin HTTP
/// bridge: the caller provides a reference (e.g. `"root"`, a `ProcId`
/// string, or an `ActorId` string) and the `MeshAdminAgent` returns a
/// uniformly shaped `NodePayload` plus child references to continue
/// walking the topology.
///
/// The work happens inside the admin actor's message loop so
/// resolution can:
/// - parse and validate the reference format,
/// - dispatch to the right host/proc/actor via existing admin
///   queries, and
/// - return a structured payload without blocking HTTP handlers on
///   mesh logic.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum ResolveReferenceMessage {
    /// Resolve `reference_string` to a `NodePayload`.
    ///
    /// On success the reply carries `ResolveReferenceResponse(Ok(payload))`;
    /// on failure it carries `ResolveReferenceResponse(Err(message))`
    /// with a human-readable error string (errors are structured
    /// payloads, never panics — see MA-R1).
    Resolve {
        /// Reference string from the HTTP path, parsed into a typed
        /// `NodeRef` at the resolve boundary.
        reference_string: String,
        /// Reply port receiving the resolution result.
        #[reply]
        reply: hyperactor_reference::OncePortRef<ResolveReferenceResponse>,
    },
}
wirevalue::register_type!(ResolveReferenceMessage);
600
/// Actor that serves a mesh-level admin HTTP endpoint.
///
/// `MeshAdminAgent` is the mesh-wide aggregation point for
/// introspection: it holds `hyperactor_reference::ActorRef<HostAgent>` handles for each
/// host, and answers admin queries by forwarding targeted requests to
/// the appropriate host agent and assembling a uniform `NodePayload`
/// response for the client.
///
/// The agent also exposes an HTTP server (spawned from `init`) and
/// supports reference-based navigation (`GET /v1/{reference}`) by
/// resolving HTTP path references into typed `NodePayload` values
/// plus child references.
#[hyperactor::export(handlers = [MeshAdminMessage, ResolveReferenceMessage])]
pub struct MeshAdminAgent {
    /// Map of host address string → `HostAgent` reference used to
    /// fan out targeted admin queries to the right host.
    hosts: HashMap<String, hyperactor_reference::ActorRef<HostAgent>>,

    /// Reverse index: `HostAgent` `ActorId` → host address
    /// string.
    ///
    /// The host agent itself is an actor that can appear in multiple
    /// places (e.g., as a host node and as a child actor under a
    /// system proc). This index lets reference resolution treat that
    /// `ActorId` as a *Host* node (via `resolve_host_node`) rather
    /// than a generic *Actor* node, avoiding cycles / dropped nodes
    /// in clients like the TUI.
    host_agents_by_actor_id: HashMap<hyperactor_reference::ActorId, String>,

    /// `ActorId` of the process-global root client (`client[0]` on
    /// the singleton Host's `local_proc`), exposed as a first-class
    /// child of the root node. Routable and introspectable via the
    /// blanket `Handler<IntrospectMessage>`.
    root_client_actor_id: Option<hyperactor_reference::ActorId>,

    /// This agent's own `ActorId`, captured during `init`. Used to
    /// include the admin proc as a visible node in the introspection
    /// tree (the principle: "if you can send it a message, you can
    /// introspect it").
    self_actor_id: Option<hyperactor_reference::ActorId>,

    // -- HTTP server address fields --
    //
    // The admin HTTP server has three address representations:
    //
    //   1. `admin_addr_override` — caller-supplied bind address
    //      (constructor param). When `None`, `init` reads
    //      `MESH_ADMIN_ADDR` from config instead.
    //
    //   2. `admin_addr` — the actual `SocketAddr` the OS assigned
    //      after `TcpListener::bind`. Populated during `init`.
    //
    //   3. `admin_host` — human-friendly URL with the machine
    //      hostname (not the raw IP) so it works with DNS and TLS
    //      certificate validation. Returned via `GetAdminAddr`.
    /// Caller-supplied bind address. When `None`, `init` reads
    /// `MESH_ADMIN_ADDR` from config.
    admin_addr_override: Option<std::net::SocketAddr>,

    /// Actual bound address after `TcpListener::bind`, populated
    /// during `init`. `None` until the server socket is bound.
    admin_addr: Option<std::net::SocketAddr>,

    /// Hostname-based URL (e.g. `"https://myhost.facebook.com:1729"`)
    /// for the admin HTTP server. Returned via `GetAdminAddr`.
    admin_host: Option<String>,

    /// Base URL of the Monarch dashboard. Passed at construction.
    /// Used by proxy routes that forward requests to the dashboard's
    /// `/api/*` endpoints.
    telemetry_url: Option<String>,

    /// When the mesh was started (ISO-8601 timestamp), captured at
    /// construction time.
    started_at: String,

    /// Username who started the mesh (from `USER`/`USERNAME` env
    /// vars; `"unknown"` when neither is set).
    started_by: String,
}
679
680impl MeshAdminAgent {
681    /// Construct a `MeshAdminAgent` from a list of `(host_addr,
682    /// host_agent_ref)` pairs and an optional root client `ActorId`.
683    ///
684    /// Builds both:
685    /// - `hosts`: the forward map used to route admin queries to the
686    ///   correct `HostAgent`, and
687    /// - `host_agents_by_actor_id`: a reverse index used during
688    ///   reference resolution to recognize host-agent `ActorId`s and
689    ///   resolve them as `NodeProperties::Host` rather than as
690    ///   generic actors.
691    ///
692    /// When `root_client_actor_id` is `Some`, the root client appears
693    /// as a first-class child of the root node in the introspection
694    /// tree.
695    ///
696    /// The HTTP listen address is initialized to `None` and populated
697    /// during `init()` after the server socket is bound.
698    pub fn new(
699        hosts: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>,
700        root_client_actor_id: Option<hyperactor_reference::ActorId>,
701        admin_addr: Option<std::net::SocketAddr>,
702        telemetry_url: Option<String>,
703    ) -> Self {
704        let host_agents_by_actor_id: HashMap<hyperactor_reference::ActorId, String> = hosts
705            .iter()
706            .map(|(addr, agent_ref)| (agent_ref.actor_id().clone(), addr.clone()))
707            .collect();
708
709        // Capture start time and username
710        let started_at = chrono::Utc::now().to_rfc3339();
711        let started_by = std::env::var("USER")
712            .or_else(|_| std::env::var("USERNAME"))
713            .unwrap_or_else(|_| "unknown".to_string());
714
715        Self {
716            hosts: hosts.into_iter().collect(),
717            host_agents_by_actor_id,
718            root_client_actor_id,
719            self_actor_id: None,
720            admin_addr_override: admin_addr,
721            admin_addr: None,
722            admin_host: None,
723            telemetry_url,
724            started_at,
725            started_by,
726        }
727    }
728}
729
// Hand-written `Debug` that keeps log output compact: the two maps
// are summarized (host addresses only, and a count for the reverse
// index) instead of printing every `ActorRef`. Note that
// `admin_addr_override` and `telemetry_url` are not included in the
// output.
impl std::fmt::Debug for MeshAdminAgent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MeshAdminAgent")
            // Host addresses only; the `ActorRef` values are noisy.
            .field("hosts", &self.hosts.keys().collect::<Vec<_>>())
            // Reverse index summarized as an entry count.
            .field("host_agents", &self.host_agents_by_actor_id.len())
            .field("root_client_actor_id", &self.root_client_actor_id)
            .field("self_actor_id", &self.self_actor_id)
            .field("admin_addr", &self.admin_addr)
            .field("admin_host", &self.admin_host)
            .field("started_at", &self.started_at)
            .field("started_by", &self.started_by)
            .finish()
    }
}
744
/// Self-identification payload returned by `GET /v1/admin`.
///
/// Construct via [`AdminInfo::new`]. AI-1, AI-2, AI-3 are live
/// invariants. The relationship between `host` and `url` is a
/// constructor guarantee — `AdminInfo::new()` rejects URLs with no
/// host, so `host` always derives from `url` at construction.
///
/// Serialized on the wire via `serde` and exposed in the JSON Schema
/// via `schemars::JsonSchema`.
#[derive(Debug, Clone, Serialize, Deserialize, schemars::JsonSchema)]
pub struct AdminInfo {
    /// Stringified `ActorId` of the `MeshAdminAgent`.
    pub actor_id: String,
    /// Stringified `ProcId` of the proc hosting `MeshAdminAgent`.
    pub proc_id: String,
    /// Hostname the admin HTTP server bound on (derived from `url`).
    pub host: String,
    /// Full admin URL (e.g. `"https://myhost.facebook.com:1729"`).
    pub url: String,
}
762
763impl AdminInfo {
764    /// Construct from identity components and a full admin URL.
765    ///
766    /// Parses `url` strictly using the `url` crate. Returns an error
767    /// if the URL is invalid or has no host component. `host` is
768    /// derived from the parsed URL — the relationship between `host`
769    /// and `url` holds by construction, not by test.
770    pub fn new(actor_id: String, proc_id: String, url: String) -> anyhow::Result<Self> {
771        let parsed = url::Url::parse(&url)
772            .map_err(|e| anyhow::anyhow!("invalid admin URL '{}': {}", url, e))?;
773        let host = parsed
774            .host_str()
775            .ok_or_else(|| anyhow::anyhow!("admin URL '{}' has no host", url))?
776            .to_string();
777        Ok(Self {
778            actor_id,
779            proc_id,
780            host,
781            url,
782        })
783    }
784}
785
/// Shared state for the reference-based `/v1/{*reference}` bridge
/// route.
///
/// The HTTP handler itself is intentionally thin and does not perform
/// any routing logic. Instead, it forwards each request into the
/// `MeshAdminAgent` actor via `ResolveReferenceMessage`, ensuring
/// resolution happens inside the actor message loop (with access to
/// actor messaging, timeouts, and indices).
struct BridgeState {
    /// Reference to the `MeshAdminAgent` actor that performs
    /// reference resolution.
    admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent>,
    /// Dedicated client mailbox on system_proc for HTTP bridge reply
    /// ports. Using a separate `Instance<()>` avoids sharing the
    /// actor's own mailbox with the HTTP bridge and ensures the
    /// bridge context is routable via system_proc's frontend address.
    // NOTE(history): a previous approach used `this.clone_for_py()`,
    // which cloned the admin actor's own Instance:
    //   bridge_cx: Instance<MeshAdminAgent>,
    bridge_cx: Instance<()>,
    /// Limits the number of in-flight resolve requests to prevent
    /// introspection queries from overwhelming the shared tokio
    /// runtime and starving user actor workloads.
    resolve_semaphore: tokio::sync::Semaphore,
    /// Keep the handle alive so the bridge mailbox is not dropped.
    _bridge_handle: ActorHandle<()>,
    /// Base URL of the Monarch dashboard (e.g.
    /// `"http://localhost:5000"`). Passed from `MeshAdminAgent` at
    /// init time. Used by proxy routes that forward requests to the
    /// dashboard's `/api/*` endpoints.
    telemetry_url: Option<String>,
    /// Shared HTTP client for outbound proxy requests to the
    /// dashboard. Reuses connection pool across requests.
    http_client: reqwest::Client,
    /// Self-identification metadata, populated during admin init.
    admin_info: AdminInfo,
}
823
/// A TCP listener that performs a TLS handshake on each accepted
/// connection before handing it to axum.
///
/// Implements [`axum::serve::Listener`] so it can be passed directly
/// to [`axum::serve`].  Per the trait contract, `accept` handles
/// errors internally (logging + retrying) and never returns `Err`.
struct TlsListener {
    /// Raw TCP listener producing plaintext connections.
    tcp: TcpListener,
    /// Performs the server-side TLS handshake on each accepted stream.
    acceptor: TlsAcceptor,
}
834
835impl axum::serve::Listener for TlsListener {
836    type Io = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
837    type Addr = std::net::SocketAddr;
838
839    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
840        loop {
841            let (stream, addr) = match self.tcp.accept().await {
842                Ok(conn) => conn,
843                Err(e) => {
844                    tracing::warn!("TCP accept error: {}", e);
845                    continue;
846                }
847            };
848
849            match self.acceptor.accept(stream).await {
850                Ok(tls_stream) => return (tls_stream, addr),
851                Err(e) => {
852                    tracing::warn!("TLS handshake failed from {}: {}", addr, e);
853                    continue;
854                }
855            }
856        }
857    }
858
859    fn local_addr(&self) -> io::Result<Self::Addr> {
860        self.tcp.local_addr()
861    }
862}
863
#[async_trait]
impl Actor for MeshAdminAgent {
    /// Initializes the mesh admin agent and its HTTP server.
    ///
    /// 1. Binds well-known message ports (`proc.spawn()` does not
    ///    call `bind()` — unlike `gspawn` — so the actor must do it
    ///    itself before becoming reachable).
    /// 2. Binds a TCP listener (ephemeral or fixed port).
    /// 3. Builds a TLS acceptor (explicit env vars, then Meta default
    ///    paths). At Meta (`fbcode_build`), mTLS is mandatory and
    ///    init fails if no certs are found. In OSS, falls back to
    ///    plain HTTP.
    /// 4. Creates a dedicated `Instance<()>` client mailbox on
    ///    system_proc for the HTTP bridge's reply ports, keeping
    ///    bridge traffic off the actor's own mailbox.
    /// 5. Spawns the axum server in a background task (HTTPS with
    ///    mTLS at Meta, HTTPS or HTTP in OSS depending on step 3).
    ///
    /// The hostname-based listen address is stored in `admin_host` so
    /// it can be returned via `GetAdminAddr`. The scheme (`https://`
    /// or `http://`) is included so clients know which protocol to
    /// use.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Bind well-known ports before the HTTP server is spawned, so
        // messages (including Undeliverable bounces) can be delivered
        // as soon as the admin is reachable.
        this.bind::<Self>();
        this.set_system();
        self.self_actor_id = Some(this.self_id().clone());

        // Explicit constructor override wins; otherwise fall back to
        // the MESH_ADMIN_ADDR global config value.
        let bind_addr = match self.admin_addr_override {
            Some(addr) => addr,
            None => hyperactor_config::global::get_cloned(crate::config::MESH_ADMIN_ADDR)
                .parse_socket_addr()
                .map_err(|e| anyhow::anyhow!("invalid MESH_ADMIN_ADDR config: {}", e))?,
        };
        let listener = TcpListener::bind(bind_addr).await?;
        let bound_addr = listener.local_addr()?;
        // Report the hostname (e.g. Tupperware container name) + port
        // rather than a raw IP, so the address works with DNS and TLS
        // certificate validation.
        let host = hostname::get()
            .unwrap_or_else(|_| "localhost".into())
            .into_string()
            .unwrap_or_else(|_| "localhost".to_string());
        self.admin_addr = Some(bound_addr);

        // At Meta: mTLS is mandatory — fail if no certs are found.
        // In OSS: TLS is best-effort with plain HTTP fallback.
        // See MA-T1 in module doc.
        let enforce_mtls = cfg!(fbcode_build);
        let tls_acceptor = try_tls_acceptor(enforce_mtls);

        if enforce_mtls && tls_acceptor.is_none() {
            return Err(anyhow::anyhow!(
                "mesh admin requires mTLS but no TLS certificates found; \
                 set HYPERACTOR_TLS_CERT/KEY/CA or ensure Meta cert paths exist \
                 (/var/facebook/x509_identities/server.pem, /var/facebook/rootcanal/ca.pem)"
            ));
        }

        // Record the scheme in the advertised address so clients know
        // which protocol to speak.
        let scheme = if tls_acceptor.is_some() {
            "https"
        } else {
            "http"
        };
        self.admin_host = Some(format!("{}://{}:{}", scheme, host, bound_addr.port()));

        // Create a dedicated client mailbox on system_proc for the
        // HTTP bridge's reply ports. This avoids sharing the admin
        // actor's own mailbox with async HTTP handlers.
        let (bridge_cx, bridge_handle) = this
            .proc()
            .introspectable_instance(MESH_ADMIN_BRIDGE_NAME)?;
        bridge_cx.set_system();
        let admin_url = self
            .admin_host
            .clone()
            .unwrap_or_else(|| "unknown".to_string());
        let bridge_state = Arc::new(BridgeState {
            admin_ref: hyperactor_reference::ActorRef::attest(this.self_id().clone()),
            bridge_cx,
            resolve_semaphore: tokio::sync::Semaphore::new(hyperactor_config::global::get(
                crate::config::MESH_ADMIN_MAX_CONCURRENT_RESOLVES,
            )),
            _bridge_handle: bridge_handle,
            telemetry_url: self.telemetry_url.clone(),
            http_client: reqwest::Client::new(),
            admin_info: AdminInfo::new(
                this.self_id().to_string(),
                this.self_id().proc_id().to_string(),
                admin_url,
            )?,
        });
        let router = create_mesh_admin_router(bridge_state);

        if let Some(acceptor) = tls_acceptor {
            let tls_listener = TlsListener {
                tcp: listener,
                acceptor,
            };
            tokio::spawn(async move {
                if let Err(e) = axum::serve(tls_listener, router).await {
                    tracing::error!("mesh admin server (mTLS) error: {}", e);
                }
            });
        } else {
            // OSS fallback: plain HTTP (only reachable when !fbcode_build).
            tokio::spawn(async move {
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("mesh admin server error: {}", e);
                }
            });
        }

        tracing::info!(
            "mesh admin server listening on {}",
            self.admin_host.as_deref().unwrap_or("unknown")
        );
        Ok(())
    }

    /// Swallow undeliverable message bounces instead of crashing.
    ///
    /// The admin agent sends `IntrospectMessage` to actors that may
    /// not have the introspection port bound (e.g. actors spawned
    /// via `cx.spawn()` whose `#[export]` list does not include it).
    /// When the message cannot be delivered, the routing layer
    /// bounces an `Undeliverable` back to the sender. The default
    /// `Actor::handle_undeliverable_message` calls `bail!()`, which
    /// would kill this admin agent and — via supervision cascade —
    /// take down the entire admin process with `exit(1)`.
    ///
    /// Since the admin agent is best-effort infrastructure, an
    /// undeliverable introspection probe is not a fatal error.
    async fn handle_undeliverable_message(
        &mut self,
        _cx: &Instance<Self>,
        hyperactor::mailbox::Undeliverable(envelope): hyperactor::mailbox::Undeliverable<
            hyperactor::mailbox::MessageEnvelope,
        >,
    ) -> Result<(), anyhow::Error> {
        tracing::debug!(
            "admin agent: undeliverable message to {} (port not bound?), ignoring",
            envelope.dest(),
        );
        Ok(())
    }
}
1013
1014/// Manual Handler impl — swallows `reply.send()` failures so the
1015/// admin agent stays alive when the HTTP caller disconnects.
1016#[async_trait]
1017impl Handler<MeshAdminMessage> for MeshAdminAgent {
1018    /// Dispatches `MeshAdminMessage` variants.
1019    ///
1020    /// Reply-send failures are swallowed because a dropped receiver
1021    /// (e.g. the HTTP bridge timed out) is not an error — the caller
1022    /// simply went away. Propagating the failure would crash the admin
1023    /// agent and take down the entire process.
1024    async fn handle(
1025        &mut self,
1026        cx: &Context<Self>,
1027        msg: MeshAdminMessage,
1028    ) -> Result<(), anyhow::Error> {
1029        match msg {
1030            MeshAdminMessage::GetAdminAddr { reply } => {
1031                let resp = MeshAdminAddrResponse {
1032                    addr: self.admin_host.clone(),
1033                };
1034                if let Err(e) = reply.send(cx, resp) {
1035                    tracing::debug!("GetAdminAddr reply failed (caller gone?): {e}");
1036                }
1037            }
1038        }
1039        Ok(())
1040    }
1041}
1042
1043/// Manual Handler impl — swallows `reply.send()` failures so the
1044/// admin agent stays alive when the HTTP caller disconnects.
1045#[async_trait]
1046impl Handler<ResolveReferenceMessage> for MeshAdminAgent {
1047    /// Dispatches `ResolveReferenceMessage` variants.
1048    ///
1049    /// The inner `resolve_reference` call never returns `Err` to the
1050    /// handler — failures are captured in the response payload.
1051    /// Reply-send failures are swallowed for the same reason as
1052    /// `MeshAdminMessage`: a dropped receiver means the caller (HTTP
1053    /// bridge) went away, which must not crash the admin agent.
1054    async fn handle(
1055        &mut self,
1056        cx: &Context<Self>,
1057        msg: ResolveReferenceMessage,
1058    ) -> Result<(), anyhow::Error> {
1059        match msg {
1060            ResolveReferenceMessage::Resolve {
1061                reference_string,
1062                reply,
1063            } => {
1064                let response = ResolveReferenceResponse(
1065                    self.resolve_reference(cx, &reference_string)
1066                        .await
1067                        .map_err(|e| format!("{:#}", e)),
1068                );
1069                if let Err(e) = reply.send(cx, response) {
1070                    tracing::debug!("Resolve reply failed (caller gone?): {e}");
1071                }
1072            }
1073        }
1074        Ok(())
1075    }
1076}
1077
impl MeshAdminAgent {
    /// Core resolver for the reference-based admin API.
    ///
    /// Parses the caller-provided `reference_string` (or handles the
    /// special `"root"` case), then dispatches to
    /// `resolve_host_node`, `resolve_proc_node`, or
    /// `resolve_actor_node` to assemble a fully-populated
    /// `NodePayload` (properties + child references).
    ///
    /// The returned payload satisfies the **navigation identity
    /// invariant** (see module docs): `payload.identity ==
    /// reference_string`, and `payload.parent` equals the identity of
    /// the node this one appears under.
    ///
    /// Note: this returns `Err` for internal use; the public
    /// `resolve` handler converts failures into
    /// `ResolveReferenceResponse(Err(..))` so the actor never crashes
    /// on lookup errors.
    async fn resolve_reference(
        &self,
        cx: &Context<'_, Self>,
        reference_string: &str,
    ) -> Result<NodePayload, anyhow::Error> {
        let node_ref: crate::introspect::NodeRef = reference_string
            .parse()
            .map_err(|e| anyhow::anyhow!("invalid reference '{}': {}", reference_string, e))?;

        match &node_ref {
            crate::introspect::NodeRef::Root => Ok(self.build_root_payload()),
            crate::introspect::NodeRef::Host(actor_id) => {
                self.resolve_host_node(cx, actor_id).await
            }
            crate::introspect::NodeRef::Proc(proc_id) => {
                match self.resolve_proc_node(cx, proc_id).await {
                    Ok(payload) => Ok(payload),
                    // Host-managed lookup failed; if this is a known
                    // standalone proc, fall back to its anchor actor.
                    Err(_) if self.standalone_proc_anchor(proc_id).is_some() => {
                        self.resolve_standalone_proc_node(cx, proc_id).await
                    }
                    Err(e) => Err(e),
                }
            }
            crate::introspect::NodeRef::Actor(actor_id) => {
                self.resolve_actor_node(cx, actor_id).await
            }
        }
    }

    /// Returns the known actors on standalone procs — procs not
    /// managed by any host but whose actors are routable and
    /// introspectable. Each proc appears as a root child; the
    /// actor is the "anchor" used to discover the proc's contents.
    ///
    /// The root client is no longer standalone: spawn_admin registers
    /// C (the bootstrap host) as a normal host entry (A/C invariant).
    ///
    /// Currently always empty; kept as the single extension point for
    /// future standalone-proc support.
    fn standalone_proc_actors(&self) -> impl Iterator<Item = &hyperactor_reference::ActorId> {
        std::iter::empty()
    }

    /// If `proc_id` belongs to a standalone proc, return the anchor
    /// actor on that proc. Returns `None` for host-managed procs.
    fn standalone_proc_anchor(
        &self,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Option<&hyperactor_reference::ActorId> {
        self.standalone_proc_actors()
            .find(|actor_id| *actor_id.proc_id() == *proc_id)
    }

    /// Returns true if `actor_id` lives on a standalone proc.
    fn is_standalone_proc_actor(&self, actor_id: &hyperactor_reference::ActorId) -> bool {
        self.standalone_proc_actors()
            .any(|a| *a.proc_id() == *actor_id.proc_id())
    }

    /// Construct the synthetic root node for the reference tree.
    ///
    /// The root is not a real actor/proc; it's a convenience node
    /// that anchors navigation. Its children are `NodeRef::Host`
    /// entries for each configured `HostAgent`.
    fn build_root_payload(&self) -> NodePayload {
        use crate::introspect::NodeRef;

        let children: Vec<NodeRef> = self
            .hosts
            .values()
            .map(|agent| NodeRef::Host(agent.actor_id().clone()))
            .collect();
        let system_children: Vec<NodeRef> = Vec::new(); // LC-1
        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "root".to_string());
        attrs.set(crate::introspect::NUM_HOSTS, self.hosts.len());
        // NOTE(review): `humantime::parse_rfc3339` only accepts the
        // `Z`-suffixed RFC 3339 form; if `started_at` carries a
        // numeric offset (as chrono's plain `to_rfc3339` produces),
        // this branch silently skips STARTED_AT — confirm the
        // producer's format in `new()`.
        if let Ok(t) = humantime::parse_rfc3339(&self.started_at) {
            attrs.set(crate::introspect::STARTED_AT, t);
        }
        attrs.set(crate::introspect::STARTED_BY, self.started_by.clone());
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
        NodePayload {
            identity: NodeRef::Root,
            properties: crate::introspect::derive_properties(&attrs_json),
            children,
            parent: None,
            as_of: std::time::SystemTime::now(),
        }
    }

    /// Resolve a `HostAgent` actor reference into a host-level
    /// `NodePayload`.
    ///
    /// Sends `IntrospectMessage::Query` directly to the
    /// `HostAgent`, which returns a `NodePayload` with
    /// `NodeProperties::Host` and the host's children. The resolver
    /// overrides `parent` to `"root"` since the host agent
    /// doesn't know its position in the navigation tree.
    async fn resolve_host_node(
        &self,
        cx: &Context<'_, Self>,
        actor_id: &hyperactor_reference::ActorId,
    ) -> Result<NodePayload, anyhow::Error> {
        let result = query_introspect(
            cx,
            actor_id,
            hyperactor::introspect::IntrospectView::Entity,
            hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
            "querying host agent",
        )
        .await?;
        Ok(crate::introspect::to_node_payload_with(
            result,
            crate::introspect::NodeRef::Host(actor_id.clone()),
            Some(crate::introspect::NodeRef::Root),
        ))
    }

    /// Resolve a `ProcId` reference into a proc-level `NodePayload`.
    ///
    /// First tries `IntrospectMessage::QueryChild` against the owning
    /// `HostAgent` (which recognizes service and local procs). If
    /// that returns an error payload, falls back to `ProcAgent` for
    /// user procs by querying
    /// `QueryChild(hyperactor_reference::Reference::Proc(proc_id))`
    /// on `<proc_id>/proc_agent[0]`.
    ///
    /// See PA-1 in module doc.
    async fn resolve_proc_node(
        &self,
        cx: &Context<'_, Self>,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Result<NodePayload, anyhow::Error> {
        // The proc's address identifies its owning host entry.
        let host_addr = proc_id.addr().to_string();

        let agent = self
            .hosts
            .get(&host_addr)
            .ok_or_else(|| anyhow::anyhow!("host not found: {}", host_addr))?;

        // Try the host agent's QueryChild first.
        let result = query_child_introspect(
            cx,
            agent.actor_id(),
            hyperactor_reference::Reference::Proc(proc_id.clone()),
            hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT),
            "querying proc details",
        )
        .await?;

        // If the host recognized the proc, normalize identity and parent.
        // The host's QueryChild returns IntrospectRef::Actor(self_id) as
        // parent, which lifts to NodeRef::Actor. We need NodeRef::Host.
        let payload = crate::introspect::to_node_payload_with(
            result,
            crate::introspect::NodeRef::Proc(proc_id.clone()),
            Some(crate::introspect::NodeRef::Host(agent.actor_id().clone())),
        );
        if !matches!(payload.properties, NodeProperties::Error { .. }) {
            return Ok(payload);
        }

        // Fall back to querying the ProcAgent directly (user procs).
        let mesh_agent_id = proc_id.actor_id(PROC_AGENT_ACTOR_NAME, 0);
        let result = query_child_introspect(
            cx,
            &mesh_agent_id,
            hyperactor_reference::Reference::Proc(proc_id.clone()),
            hyperactor_config::global::get(crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT),
            "querying proc mesh agent",
        )
        .await?;

        Ok(crate::introspect::to_node_payload_with(
            result,
            crate::introspect::NodeRef::Proc(proc_id.clone()),
            Some(crate::introspect::NodeRef::Host(agent.actor_id().clone())),
        ))
    }

    /// Resolve a standalone proc into a proc-level `NodePayload`.
    ///
    /// Standalone procs (e.g. the admin proc) are not managed by any
    /// `HostAgent`, so `resolve_proc_node` cannot resolve them.
    /// Instead, we query the anchor actor on the proc for its
    /// introspection data, collect its supervision children, and
    /// build a synthetic proc node.
    ///
    /// Special case: when the anchor actor is this agent itself, we
    /// build the children list directly (just `[self]`) to avoid a
    /// self-deadlock — the actor loop cannot process an
    /// `IntrospectMessage` it sends to itself while handling a
    /// resolve request.
    async fn resolve_standalone_proc_node(
        &self,
        cx: &Context<'_, Self>,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Result<NodePayload, anyhow::Error> {
        let actor_id = self
            .standalone_proc_anchor(proc_id)
            .ok_or_else(|| anyhow::anyhow!("no anchor actor for standalone proc {}", proc_id))?;

        use crate::introspect::NodeRef;

        let (children, system_children) = if self.self_actor_id.as_ref() == Some(actor_id) {
            // Self-anchor: report just ourselves, without messaging
            // (avoids the self-deadlock described above).
            let self_ref = NodeRef::Actor(actor_id.clone());
            (vec![self_ref.clone()], vec![self_ref])
        } else {
            let actor_result = query_introspect(
                cx,
                actor_id,
                hyperactor::introspect::IntrospectView::Actor,
                hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
                &format!("querying anchor actor on {}", proc_id),
            )
            .await?;
            let actor_payload = to_node_payload(actor_result);
            let anchor_ref = NodeRef::Actor(actor_id.clone());
            let anchor_is_system = matches!(
                &actor_payload.properties,
                NodeProperties::Actor {
                    is_system: true,
                    ..
                }
            );

            let mut children = vec![anchor_ref.clone()];
            let mut system_children = Vec::new();
            if anchor_is_system {
                system_children.push(anchor_ref);
            }

            // Classify each supervision child by querying its own
            // introspection payload for the `is_system` flag; query
            // failures default to non-system (best-effort).
            for child_ref in actor_payload.children {
                let child_actor_id = match &child_ref {
                    NodeRef::Actor(id) => Some(id),
                    _ => None,
                };
                if let Some(child_actor_id) = child_actor_id {
                    let child_is_system = if let Ok(r) = query_introspect(
                        cx,
                        child_actor_id,
                        hyperactor::introspect::IntrospectView::Actor,
                        hyperactor_config::global::get(
                            crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT,
                        ),
                        "querying child actor is_system",
                    )
                    .await
                    {
                        let p = to_node_payload(r);
                        matches!(
                            &p.properties,
                            NodeProperties::Actor {
                                is_system: true,
                                ..
                            }
                        )
                    } else {
                        false
                    };
                    if child_is_system {
                        system_children.push(child_ref.clone());
                    }
                }
                children.push(child_ref);
            }
            (children, system_children)
        };

        let proc_name = proc_id.name().to_string();

        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
        attrs.set(crate::introspect::PROC_NAME, proc_name.clone());
        attrs.set(crate::introspect::NUM_ACTORS, children.len());
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

        Ok(NodePayload {
            identity: NodeRef::Proc(proc_id.clone()),
            properties: crate::introspect::derive_properties(&attrs_json),
            children,
            as_of: std::time::SystemTime::now(),
            parent: Some(NodeRef::Root),
        })
    }

    /// Resolve a non-host-agent `ActorId` reference into an
    /// actor-level `NodePayload`.
    ///
    /// Sends `IntrospectMessage::Query` directly to the target actor
    /// via `PortRef::attest_message_port`. The blanket handler
    /// returns a `NodePayload` with `NodeProperties::Actor` (or a
    /// domain-specific override like `NodeProperties::Proc` for
    /// `ProcAgent`).
    ///
    /// The resolver sets `parent` based on the actor's position
    /// in the topology: if the actor lives in a system proc, the
    /// parent is the system proc ref; otherwise it's the proc's
    /// `ProcId` string.
    async fn resolve_actor_node(
        &self,
        cx: &Context<'_, Self>,
        actor_id: &hyperactor_reference::ActorId,
    ) -> Result<NodePayload, anyhow::Error> {
        // Self-resolution: we cannot send IntrospectMessage to our
        // own actor loop while handling a resolve request (deadlock).
        // Use introspect_payload() to snapshot our own state
        // directly.
        let result = if self.self_actor_id.as_ref() == Some(actor_id) {
            cx.introspect_payload()
        } else if self.is_standalone_proc_actor(actor_id) {
            // Standalone procs have no ProcAgent — query directly.
            query_introspect(
                cx,
                actor_id,
                hyperactor::introspect::IntrospectView::Actor,
                hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
                &format!("querying actor {}", actor_id),
            )
            .await?
        } else {
            // Check terminated snapshots first — fast, no ambiguity.
            let proc_id = actor_id.proc_id();
            let mesh_agent_id = proc_id.actor_id(PROC_AGENT_ACTOR_NAME, 0);
            let terminated = query_child_introspect(
                cx,
                &mesh_agent_id,
                hyperactor_reference::Reference::Actor(actor_id.clone()),
                hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT),
                "querying terminated snapshot",
            )
            .await
            .ok()
            // Only accept the snapshot when it carries real (non-error)
            // properties; otherwise fall through to the live query.
            .filter(|r| {
                let p = crate::introspect::derive_properties(&r.attrs);
                !matches!(p, NodeProperties::Error { .. })
            });

            match terminated {
                Some(snapshot) => snapshot,
                None => {
                    // Not terminated — query the live actor.
                    query_introspect(
                        cx,
                        actor_id,
                        hyperactor::introspect::IntrospectView::Actor,
                        hyperactor_config::global::get(
                            crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT,
                        ),
                        &format!("querying actor {}", actor_id),
                    )
                    .await?
                }
            }
        };
        let mut payload = to_node_payload(result);

        // Standalone-proc actors always parent to their proc node.
        if self.is_standalone_proc_actor(actor_id) {
            payload.parent = Some(crate::introspect::NodeRef::Proc(actor_id.proc_id().clone()));
            return Ok(payload);
        }

        let proc_id = actor_id.proc_id();
        match &payload.properties {
            // ProcAgents answer with Proc properties; parent them to
            // their owning host entry when we know it.
            NodeProperties::Proc { .. } => {
                let host_addr = proc_id.addr().to_string();
                if let Some(agent) = self.hosts.get(&host_addr) {
                    payload.parent =
                        Some(crate::introspect::NodeRef::Host(agent.actor_id().clone()));
                }
            }
            _ => {
                payload.parent = Some(crate::introspect::NodeRef::Proc(proc_id.clone()));
            }
        }

        Ok(payload)
    }
}
1475
1476/// Build the Axum router for the mesh admin HTTP server.
1477///
1478/// Routes:
1479/// - `GET /v1/schema` — JSON Schema (Draft 2020-12) for `NodePayload`.
1480/// - `GET /v1/schema/error` — JSON Schema for `ApiErrorEnvelope`.
1481/// - `GET /v1/openapi.json` — OpenAPI 3.1 spec (embeds JSON Schemas).
1482/// - `GET /v1/tree` — ASCII topology dump.
1483/// - `POST /v1/query` — proxy SQL query to the dashboard server.
1484/// - `GET /v1/pyspy/{*proc_reference}` — py-spy stack dump for a proc.
1485/// - `POST /v1/pyspy_dump/{*proc_reference}` — py-spy dump + store in Datafusion.
1486/// - `GET /v1/config/{*proc_reference}` — config snapshot for a proc.
1487/// - `GET /v1/admin` — admin self-identification (`AdminInfo`).
1488/// - `GET /v1/{*reference}` — JSON `NodePayload` for a single reference.
1489/// - `GET /SKILL.md` — agent-facing API documentation (markdown).
1490fn create_mesh_admin_router(bridge_state: Arc<BridgeState>) -> Router {
1491    Router::new()
1492        .route("/SKILL.md", get(serve_skill_md))
1493        // Literal paths matched by specificity before wildcard (SC-5).
1494        .route("/v1/admin", get(serve_admin_info))
1495        .route("/v1/schema", get(serve_schema))
1496        .route("/v1/schema/admin", get(serve_admin_schema))
1497        .route("/v1/schema/error", get(serve_error_schema))
1498        .route("/v1/openapi.json", get(serve_openapi))
1499        .route("/v1/tree", get(tree_dump))
1500        .route("/v1/query", post(query_proxy))
1501        .route("/v1/pyspy/{*proc_reference}", get(pyspy_bridge))
1502        .route(
1503            "/v1/pyspy_dump/{*proc_reference}",
1504            post(pyspy_dump_and_store),
1505        )
1506        .route("/v1/config/{*proc_reference}", get(config_bridge))
1507        .route("/v1/{*reference}", get(resolve_reference_bridge))
1508        .with_state(bridge_state)
1509}
1510
/// Raw markdown template for the SKILL.md API document.
///
/// Contains a `{base}` placeholder that `serve_skill_md` replaces
/// with the request-derived base URL before serving, so the examples
/// in the document are copy-pasteable against the live server.
const SKILL_MD_TEMPLATE: &str = include_str!("mesh_admin_skill.md");
1513
1514/// Extract base URL from request headers.
1515///
1516/// Defaults to `https` when `x-forwarded-proto` is absent — the
1517/// admin server uses TLS in production, so `http` is the wrong
1518/// default for direct connections.
1519fn extract_base_url(headers: &axum::http::HeaderMap) -> String {
1520    let host = headers
1521        .get(axum::http::header::HOST)
1522        .and_then(|v| v.to_str().ok())
1523        .unwrap_or("localhost");
1524    let scheme = headers
1525        .get("x-forwarded-proto")
1526        .and_then(|v| v.to_str().ok())
1527        .unwrap_or("https");
1528    format!("{scheme}://{host}")
1529}
1530
1531/// Self-identification endpoint: returns `AdminInfo` (AI-1..AI-3;
1532/// AI-4 is a constructor guarantee of `AdminInfo::new()`).
1533async fn serve_admin_info(
1534    State(state): State<Arc<BridgeState>>,
1535) -> axum::response::Json<AdminInfo> {
1536    axum::response::Json(state.admin_info.clone())
1537}
1538
1539/// JSON Schema for `AdminInfo`.
1540async fn serve_admin_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1541    Ok(axum::response::Json(schema_with_id::<AdminInfo>(
1542        "https://monarch.meta.com/schemas/v1/admin_info",
1543    )?))
1544}
1545
1546/// Serves the self-describing API document with the base URL
1547/// interpolated so examples are copy-pasteable.
1548async fn serve_skill_md(headers: axum::http::HeaderMap) -> impl axum::response::IntoResponse {
1549    let base = extract_base_url(&headers);
1550    let body = SKILL_MD_TEMPLATE.replace("{base}", &base);
1551    (
1552        [(
1553            axum::http::header::CONTENT_TYPE,
1554            "text/markdown; charset=utf-8",
1555        )],
1556        body,
1557    )
1558}
1559
1560/// Build a JSON Schema value with a `$id` field.
1561fn schema_with_id<T: schemars::JsonSchema>(id: &str) -> Result<serde_json::Value, ApiError> {
1562    let schema = schemars::schema_for!(T);
1563    let mut value = serde_json::to_value(schema).map_err(|e| ApiError {
1564        code: "internal_error".to_string(),
1565        message: format!("failed to serialize schema: {e}"),
1566        details: None,
1567    })?;
1568    if let Some(obj) = value.as_object_mut() {
1569        obj.insert("$id".into(), serde_json::Value::String(id.into()));
1570    }
1571    Ok(value)
1572}
1573
1574/// JSON Schema for the `NodePayload` response type.
1575async fn serve_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1576    Ok(axum::response::Json(schema_with_id::<NodePayloadDto>(
1577        "https://monarch.meta.com/schemas/v1/node_payload",
1578    )?))
1579}
1580
1581/// JSON Schema for the `ApiErrorEnvelope` error response.
1582async fn serve_error_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1583    Ok(axum::response::Json(schema_with_id::<ApiErrorEnvelope>(
1584        "https://monarch.meta.com/schemas/v1/error",
1585    )?))
1586}
1587
1588/// Hoist `$defs` from a schemars-generated schema into a shared
1589/// map and rewrite internal `$ref` pointers from `#/$defs/X` to
1590/// `#/components/schemas/X` so OpenAPI tools can resolve them.
1591fn hoist_defs(
1592    schema: &mut serde_json::Value,
1593    shared: &mut serde_json::Map<String, serde_json::Value>,
1594) {
1595    if let Some(obj) = schema.as_object_mut() {
1596        if let Some(defs) = obj.remove("$defs") {
1597            if let Some(defs_map) = defs.as_object() {
1598                for (k, v) in defs_map {
1599                    shared.insert(k.clone(), v.clone());
1600                }
1601            }
1602        }
1603        // Also remove $schema from embedded schemas — it's
1604        // only valid at the root of a JSON Schema document,
1605        // not inside an OpenAPI components/schemas entry.
1606        obj.remove("$schema");
1607    }
1608    rewrite_refs(schema);
1609}
1610
1611/// Recursively rewrite `$ref: "#/$defs/X"` →
1612/// `$ref: "#/components/schemas/X"`.
1613fn rewrite_refs(value: &mut serde_json::Value) {
1614    match value {
1615        serde_json::Value::Object(map) => {
1616            if let Some(serde_json::Value::String(r)) = map.get_mut("$ref") {
1617                if r.starts_with("#/$defs/") {
1618                    *r = r.replace("#/$defs/", "#/components/schemas/");
1619                }
1620            }
1621            for v in map.values_mut() {
1622                rewrite_refs(v);
1623            }
1624        }
1625        serde_json::Value::Array(arr) => {
1626            for v in arr {
1627                rewrite_refs(v);
1628            }
1629        }
1630        _ => {}
1631    }
1632}
1633
/// Build the OpenAPI 3.1 spec, embedding schemars-derived JSON
/// Schemas into `components/schemas`.
///
/// Each component schema is generated from the corresponding type's
/// `schemars::JsonSchema` derive; `$defs` are hoisted into one shared
/// `components/schemas` map and `$ref` pointers rewritten so standard
/// OpenAPI tooling can resolve them.
///
/// NOTE(review): `/SKILL.md` and `/v1/openapi.json` are absent from
/// `paths` even though the router serves them — confirm this is
/// intentional.
pub fn build_openapi_spec() -> serde_json::Value {
    // Generate each component schema. The `expect`s encode an
    // invariant: a schemars-derived schema always serializes.
    let mut node_schema = serde_json::to_value(schemars::schema_for!(NodePayloadDto))
        .expect("NodePayload schema must be serializable");
    let mut error_schema = serde_json::to_value(schemars::schema_for!(ApiErrorEnvelope))
        .expect("ApiErrorEnvelope schema must be serializable");
    let mut pyspy_schema = serde_json::to_value(schemars::schema_for!(PySpyResult))
        .expect("PySpyResult schema must be serializable");
    let mut query_request_schema = serde_json::to_value(schemars::schema_for!(QueryRequest))
        .expect("QueryRequest schema must be serializable");
    let mut query_response_schema = serde_json::to_value(schemars::schema_for!(QueryResponse))
        .expect("QueryResponse schema must be serializable");
    let mut pyspy_dump_response_schema =
        serde_json::to_value(schemars::schema_for!(PyspyDumpAndStoreResponse))
            .expect("PyspyDumpAndStoreResponse schema must be serializable");
    let mut admin_info_schema = serde_json::to_value(schemars::schema_for!(AdminInfo))
        .expect("AdminInfo schema must be serializable");

    // Hoist $defs into a shared components/schemas map so
    // OpenAPI tools can resolve references.
    let mut shared_schemas = serde_json::Map::new();
    hoist_defs(&mut node_schema, &mut shared_schemas);
    hoist_defs(&mut error_schema, &mut shared_schemas);
    hoist_defs(&mut pyspy_schema, &mut shared_schemas);
    hoist_defs(&mut query_request_schema, &mut shared_schemas);
    hoist_defs(&mut query_response_schema, &mut shared_schemas);
    hoist_defs(&mut pyspy_dump_response_schema, &mut shared_schemas);
    hoist_defs(&mut admin_info_schema, &mut shared_schemas);
    shared_schemas.insert("NodePayload".into(), node_schema);
    shared_schemas.insert("ApiErrorEnvelope".into(), error_schema);
    shared_schemas.insert("PySpyResult".into(), pyspy_schema);
    shared_schemas.insert("QueryRequest".into(), query_request_schema);
    shared_schemas.insert("QueryResponse".into(), query_response_schema);
    shared_schemas.insert(
        "PyspyDumpAndStoreResponse".into(),
        pyspy_dump_response_schema,
    );
    shared_schemas.insert("AdminInfo".into(), admin_info_schema);

    // Rewrite any remaining $defs refs in the hoisted component schemas.
    // (`hoist_defs` only rewrites the top-level schema it is given; the
    // definitions moved into `shared_schemas` still carry `#/$defs/...`
    // refs of their own.)
    for value in shared_schemas.values_mut() {
        rewrite_refs(value);
    }

    // Reusable response-object fragment referencing the error envelope
    // component schema; parameterized only by the description text.
    let error_response = |desc: &str| -> serde_json::Value {
        serde_json::json!({
            "description": desc,
            "content": {
                "application/json": {
                    "schema": { "$ref": "#/components/schemas/ApiErrorEnvelope" }
                }
            }
        })
    };

    // Shared 200-response fragment for endpoints returning a NodePayload.
    let success_payload = serde_json::json!({
        "description": "Resolved NodePayload",
        "content": {
            "application/json": {
                "schema": { "$ref": "#/components/schemas/NodePayload" }
            }
        }
    });

    let mut spec = serde_json::json!({
        "openapi": "3.1.0",
        "info": {
            "title": "Monarch Mesh Admin API",
            "version": "1.0.0",
            "description": "Reference-walking introspection API for a Monarch actor mesh. See the Admin Gateway Pattern RFC."
        },
        "paths": {
            "/v1/root": {
                "get": {
                    "summary": "Fetch root node",
                    "operationId": "getRoot",
                    "responses": {
                        "200": success_payload,
                        "500": error_response("Internal error"),
                        "503": error_response("Service unavailable (at capacity, retry with backoff)"),
                        "504": error_response("Gateway timeout (downstream host unresponsive)")
                    }
                }
            },
            "/v1/{reference}": {
                "get": {
                    "summary": "Resolve a reference to a NodePayload",
                    "operationId": "resolveReference",
                    "parameters": [{
                        "name": "reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded opaque reference string",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": success_payload,
                        "400": error_response("Bad request (malformed reference)"),
                        "404": error_response("Reference not found"),
                        "500": error_response("Internal error"),
                        "503": error_response("Service unavailable (at capacity, retry with backoff)"),
                        "504": error_response("Gateway timeout (downstream host unresponsive)")
                    }
                }
            },
            "/v1/schema": {
                "get": {
                    "summary": "JSON Schema for NodePayload (Draft 2020-12)",
                    "operationId": "getSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            },
            "/v1/schema/error": {
                "get": {
                    "summary": "JSON Schema for ApiErrorEnvelope (Draft 2020-12)",
                    "operationId": "getErrorSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            },
            "/v1/admin": {
                "get": {
                    "summary": "Admin self-identification (placement, identity, URL)",
                    "operationId": "getAdminInfo",
                    "description": "Returns the admin actor's identity, proc placement, hostname, and URL. Used for placement verification and operational discovery.",
                    "responses": {
                        "200": {
                            "description": "AdminInfo — admin actor placement metadata",
                            "content": {
                                "application/json": {
                                    "schema": { "$ref": "#/components/schemas/AdminInfo" }
                                }
                            }
                        }
                    }
                }
            },
            "/v1/tree": {
                "get": {
                    "summary": "ASCII topology dump (debug)",
                    "operationId": "getTree",
                    "responses": {
                        "200": {
                            "description": "Human-readable topology tree",
                            "content": { "text/plain": {} }
                        }
                    }
                }
            },
            "/v1/config/{proc_reference}": {
                "get": {
                    "summary": "Config snapshot for a proc",
                    "operationId": "getConfig",
                    "description": "Returns the effective CONFIG-marked configuration entries from the target process. Routes to ProcAgent (worker procs) or HostAgent (service proc).",
                    "parameters": [{
                        "name": "proc_reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded proc reference (ProcId)",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": {
                            "description": "ConfigDumpResult — sorted list of config entries",
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "object",
                                        "properties": {
                                            "entries": {
                                                "type": "array",
                                                "items": {
                                                    "type": "object",
                                                    "properties": {
                                                        "name": { "type": "string" },
                                                        "value": { "type": "string" },
                                                        "default_value": { "type": ["string", "null"] },
                                                        "source": { "type": "string" },
                                                        "changed_from_default": { "type": "boolean" },
                                                        "env_var": { "type": ["string", "null"] }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        },
                        "404": error_response("Proc not found or handler not reachable"),
                        "500": error_response("Internal error"),
                        "504": error_response("Gateway timeout")
                    }
                }
            },
            "/v1/pyspy/{proc_reference}": {
                "get": {
                    "summary": "Py-spy stack dump for a proc",
                    "operationId": "getPyspy",
                    "description": "Runs py-spy against the target process and returns structured stack traces. Routes to ProcAgent (worker procs) or HostAgent (service proc).",
                    "parameters": [{
                        "name": "proc_reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded proc reference (ProcId)",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": {
                            "description": "PySpyResult — one of Ok, BinaryNotFound, or Failed",
                            "content": {
                                "application/json": {
                                    "schema": { "$ref": "#/components/schemas/PySpyResult" }
                                }
                            }
                        },
                        "400": error_response("Bad request (malformed proc reference)"),
                        "404": error_response("Proc not found or handler not reachable"),
                        "500": error_response("Internal error"),
                        "504": error_response("Gateway timeout")
                    }
                }
            },
            "/v1/query": {
                "post": {
                    "summary": "Proxy SQL query to the telemetry dashboard",
                    "operationId": "queryProxy",
                    "description": "Forwards a SQL query to the Monarch dashboard's DataFusion engine. Requires telemetry_url to be configured.",
                    "requestBody": {
                        "required": true,
                        "content": {
                            "application/json": {
                                "schema": { "$ref": "#/components/schemas/QueryRequest" }
                            }
                        }
                    },
                    "responses": {
                        "200": {
                            "description": "Query results",
                            "content": {
                                "application/json": {
                                    "schema": { "$ref": "#/components/schemas/QueryResponse" }
                                }
                            }
                        },
                        "400": error_response("Bad request (invalid SQL or missing sql field)"),
                        "404": error_response("Dashboard not configured"),
                        "500": error_response("Internal error"),
                        "504": error_response("Gateway timeout")
                    }
                }
            },
            "/v1/pyspy_dump/{proc_reference}": {
                "post": {
                    "summary": "Trigger py-spy dump and store in telemetry",
                    "operationId": "pyspyDumpAndStore",
                    "description": "Runs py-spy against the target process, stores the result in the dashboard's DataFusion pyspy tables, and returns the dump_id.",
                    "parameters": [{
                        "name": "proc_reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded proc reference (ProcId)",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": {
                            "description": "Dump stored successfully",
                            "content": {
                                "application/json": {
                                    "schema": { "$ref": "#/components/schemas/PyspyDumpAndStoreResponse" }
                                }
                            }
                        },
                        "400": error_response("Bad request (malformed proc reference)"),
                        "404": error_response("Proc or dashboard not found"),
                        "500": error_response("Internal error"),
                        "504": error_response("Gateway timeout")
                    }
                }
            }
        },
        "components": {
            "schemas": serde_json::Value::Object(shared_schemas)
        }
    });

    // Insert /v1/schema/admin outside the json! macro to avoid
    // hitting the serde_json recursion limit.
    if let Some(paths) = spec.pointer_mut("/paths").and_then(|v| v.as_object_mut()) {
        paths.insert(
            "/v1/schema/admin".into(),
            serde_json::json!({
                "get": {
                    "summary": "JSON Schema for AdminInfo (Draft 2020-12)",
                    "operationId": "getAdminSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            }),
        );
    }

    spec
}
1951
1952/// OpenAPI 3.1 spec for the mesh admin API.
1953async fn serve_openapi() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1954    Ok(axum::response::Json(build_openapi_spec()))
1955}
1956
1957/// Validate and parse a raw proc reference path segment into a
1958/// decoded reference string and `ProcId`. Extracted for testability.
1959fn parse_pyspy_proc_reference(
1960    raw: &str,
1961) -> Result<(String, hyperactor_reference::ProcId), ApiError> {
1962    let trimmed = raw.trim_start_matches('/');
1963    if trimmed.is_empty() {
1964        return Err(ApiError::bad_request("empty proc reference", None));
1965    }
1966    let decoded = urlencoding::decode(trimmed)
1967        .map(|cow| cow.into_owned())
1968        .map_err(|_| {
1969            ApiError::bad_request(
1970                "malformed percent-encoding: decoded bytes are not valid UTF-8",
1971                None,
1972            )
1973        })?;
1974    let proc_id: hyperactor_reference::ProcId = decoded
1975        .parse()
1976        .map_err(|e| ApiError::bad_request(format!("invalid proc reference: {}", e), None))?;
1977    Ok((decoded, proc_id))
1978}
1979
1980/// Probe whether an actor is reachable by sending a lightweight
1981/// introspect query bounded by `MESH_ADMIN_QUERY_CHILD_TIMEOUT`.
1982///
1983/// Returns `Ok(true)` if the actor responds, `Ok(false)` if the
1984/// actor is absent or unresponsive (timeout / recv error).
1985/// Returns `Err(ApiError)` on bridge-side send failure — a real
1986/// infrastructure problem, not an absent actor.
1987async fn probe_actor(
1988    cx: &Instance<()>,
1989    agent_id: &hyperactor_reference::ActorId,
1990) -> Result<bool, ApiError> {
1991    let port = hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(agent_id);
1992    let (handle, rx) = open_once_port::<IntrospectResult>(cx);
1993    port.send(
1994        cx,
1995        IntrospectMessage::Query {
1996            view: IntrospectView::Entity,
1997            reply: handle.bind(),
1998        },
1999    )
2000    .map_err(|e| {
2001        tracing::warn!(
2002            name = "pyspy_probe_send_failed",
2003            %agent_id,
2004            error = %e,
2005        );
2006        ApiError {
2007            code: "internal_error".to_string(),
2008            message: format!("failed to send probe to {}: {}", agent_id, e),
2009            details: None,
2010        }
2011    })?;
2012
2013    let timeout = hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT);
2014    match tokio::time::timeout(timeout, rx.recv()).await {
2015        Ok(Ok(_)) => Ok(true),
2016        Ok(Err(e)) => {
2017            tracing::debug!(
2018                name = "pyspy_probe_recv_failed",
2019                %agent_id,
2020                error = %e,
2021            );
2022            Ok(false)
2023        }
2024        Err(_elapsed) => {
2025            tracing::debug!(
2026                name = "pyspy_probe_timeout",
2027                %agent_id,
2028            );
2029            Ok(false)
2030        }
2031    }
2032}
2033
/// Core py-spy dump logic shared by `pyspy_bridge` and
/// `pyspy_dump_and_store`.
///
/// Parses the proc reference, routes to the appropriate actor,
/// probes for reachability, sends `PySpyDump`, and returns the
/// result.
///
/// # Errors
///
/// - `bad_request` — the proc reference fails to parse.
/// - `not_found` — the routed agent actor does not answer the probe.
/// - `gateway_timeout` — the dump itself exceeds the bridge timeout.
/// - `internal_error` — bridge-side send or receive failure.
async fn do_pyspy_dump(
    state: &BridgeState,
    raw_proc_reference: &str,
) -> Result<PySpyResult, ApiError> {
    let (proc_reference, proc_id) = parse_pyspy_proc_reference(raw_proc_reference)?;

    // PS-12: route by proc name — service proc → HostAgent, all others → ProcAgent.
    let agent_id = if proc_id.base_name() == SERVICE_PROC_NAME {
        proc_id.actor_id(HOST_MESH_AGENT_ACTOR_NAME, 0)
    } else {
        proc_id.actor_id(PROC_AGENT_ACTOR_NAME, 0)
    };

    // PS-13: defensive probe — verify the target actor is reachable
    // before committing to the full py-spy timeout.
    let cx = &state.bridge_cx;
    if !probe_actor(cx, &agent_id).await? {
        return Err(ApiError::not_found(
            format!(
                "proc {} does not have a reachable py-spy handler (expected {} actor)",
                proc_reference,
                // Mirror the routing decision above so the error names
                // the actor that was actually probed.
                if proc_id.base_name() == SERVICE_PROC_NAME {
                    HOST_MESH_AGENT_ACTOR_NAME
                } else {
                    PROC_AGENT_ACTOR_NAME
                },
            ),
            None,
        ));
    }

    let port = hyperactor_reference::PortRef::<PySpyDump>::attest_message_port(&agent_id);
    let (reply_handle, reply_rx) = open_once_port::<PySpyResult>(cx);
    // Mark the reply port non-returnable. Same rationale as config_bridge:
    // a timed-out admin client must not crash the observed actor.
    let mut reply_ref = reply_handle.bind();
    reply_ref.return_undeliverable(false);
    // Native frames are essential for diagnosing hangs in C
    // extensions and CUDA calls — the primary py-spy use case in
    // Monarch. These defaults match the old hyperactor_multiprocess
    // battle-tested diagnostics.
    port.send(
        cx,
        PySpyDump {
            opts: PySpyOpts {
                threads: false,
                native: true,
                native_all: true,
                nonblocking: false,
            },
            result: reply_ref,
        },
    )
    .map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to send PySpyDump: {}", e),
        details: None,
    })?;

    // Two error layers below: the outer `map_err` handles the timeout
    // elapsing (→ 504 gateway_timeout); the inner one handles a reply
    // channel failure (→ 500 internal_error).
    tokio::time::timeout(
        hyperactor_config::global::get(crate::config::MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT),
        reply_rx.recv(),
    )
    .await
    .map_err(|_| {
        tracing::warn!(
            proc_reference = %proc_reference,
            "mesh admin: py-spy dump timed out (gateway_timeout)",
        );
        ApiError {
            code: "gateway_timeout".to_string(),
            message: format!("timed out waiting for py-spy dump from {}", proc_reference),
            details: None,
        }
    })?
    .map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to receive PySpyResult: {}", e),
        details: None,
    })
}
2121
2122/// HTTP bridge for py-spy stack dump requests.
2123///
2124/// Parses the proc reference, routes to the appropriate actor
2125/// (ProcAgent on worker procs, HostAgent on the service proc),
2126/// probes for reachability, and sends `PySpyDump` directly.
2127/// See PS-12, PS-13 in `introspect` module doc.
2128async fn pyspy_bridge(
2129    State(state): State<Arc<BridgeState>>,
2130    AxumPath(proc_reference): AxumPath<String>,
2131) -> Result<Json<PySpyResult>, ApiError> {
2132    Ok(Json(do_pyspy_dump(&state, &proc_reference).await?))
2133}
2134
/// Request body for `POST /v1/query`.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct QueryRequest {
    /// SQL query string, forwarded as-is to the dashboard's
    /// `/api/query` endpoint.
    pub sql: String,
}
2141
/// Response body from `POST /v1/query`.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct QueryResponse {
    /// Query result rows — opaque JSON, passed through from the
    /// dashboard without reshaping.
    pub rows: serde_json::Value,
}
2148
/// Request body sent to the dashboard's `/api/pyspy_dump` endpoint.
///
/// Serialize-only: produced by this bridge, consumed by the
/// dashboard; never deserialized here.
#[derive(Debug, Serialize)]
struct StorePyspyDumpRequest {
    // Identifier under which the dump is stored (echoed back to the
    // client as `PyspyDumpAndStoreResponse::dump_id`).
    dump_id: String,
    // Proc reference string the dump was taken from.
    proc_ref: String,
    // Presumably the PySpyResult serialized as a JSON string — the
    // field name suggests so; confirm against the dashboard API.
    pyspy_result_json: String,
}
2156
/// Response body from `POST /v1/pyspy_dump/{*proc_reference}`.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PyspyDumpAndStoreResponse {
    /// Unique identifier for the stored dump; use it to query the
    /// dashboard's pyspy tables.
    pub dump_id: String,
}
2163
2164/// Resolve the telemetry URL from bridge state, returning an
2165/// `ApiError` if not configured.
2166fn require_telemetry_url(state: &BridgeState) -> Result<&str, ApiError> {
2167    state.telemetry_url.as_deref().ok_or_else(|| {
2168        ApiError::not_found("dashboard not configured (no telemetry_url provided)", None)
2169    })
2170}
2171
2172/// Proxy SQL queries to the Monarch dashboard's `/api/query`
2173/// endpoint.
2174///
2175/// Requires `telemetry_url` to be set. The request body must
2176/// contain a `sql` field. The dashboard response rows are returned
2177/// verbatim.
2178async fn query_proxy(
2179    State(state): State<Arc<BridgeState>>,
2180    axum::Json(body): axum::Json<QueryRequest>,
2181) -> Result<axum::Json<QueryResponse>, ApiError> {
2182    let telemetry_url = require_telemetry_url(&state)?;
2183
2184    let resp = state
2185        .http_client
2186        .post(format!("{}/api/query", telemetry_url))
2187        .json(&body)
2188        .send()
2189        .await
2190        .map_err(|e| ApiError {
2191            code: "proxy_error".to_string(),
2192            message: format!("failed to proxy query to dashboard: {}", e),
2193            details: None,
2194        })?;
2195
2196    let status = resp.status();
2197    let resp_body = resp.bytes().await.map_err(|e| ApiError {
2198        code: "proxy_error".to_string(),
2199        message: format!("failed to read dashboard response: {}", e),
2200        details: None,
2201    })?;
2202
2203    if !status.is_success() {
2204        // Try to extract error message from dashboard response.
2205        let msg = serde_json::from_slice::<serde_json::Value>(&resp_body)
2206            .ok()
2207            .and_then(|v| v.get("error")?.as_str().map(String::from))
2208            .unwrap_or_else(|| format!("dashboard returned HTTP {status}"));
2209        let code = if status.is_client_error() {
2210            "bad_request"
2211        } else {
2212            "proxy_error"
2213        };
2214        return Err(ApiError {
2215            code: code.to_string(),
2216            message: msg,
2217            details: None,
2218        });
2219    }
2220
2221    let result: QueryResponse = serde_json::from_slice(&resp_body).map_err(|e| ApiError {
2222        code: "proxy_error".to_string(),
2223        message: format!("failed to parse dashboard response: {}", e),
2224        details: None,
2225    })?;
2226
2227    Ok(axum::Json(result))
2228}
2229
2230/// Trigger a py-spy dump and store the result in the dashboard's
2231/// DataFusion pyspy tables.
2232///
2233/// 1. Performs a py-spy dump via `do_pyspy_dump` (same as
2234///    `pyspy_bridge`).
2235/// 2. POSTs the serialized result to the dashboard's
2236///    `/api/pyspy_dump` endpoint for persistent storage.
2237/// 3. Returns the generated dump id.
2238async fn pyspy_dump_and_store(
2239    State(state): State<Arc<BridgeState>>,
2240    AxumPath(proc_reference): AxumPath<String>,
2241) -> Result<axum::Json<PyspyDumpAndStoreResponse>, ApiError> {
2242    let telemetry_url = require_telemetry_url(&state)?;
2243    let pyspy_result = do_pyspy_dump(&state, &proc_reference).await?;
2244
2245    let dump_id = uuid::Uuid::new_v4().to_string();
2246    let pyspy_json = serde_json::to_string(&pyspy_result).map_err(|e| ApiError {
2247        code: "internal_error".to_string(),
2248        message: format!("failed to serialize PySpyResult: {}", e),
2249        details: None,
2250    })?;
2251
2252    let store_body = StorePyspyDumpRequest {
2253        dump_id: dump_id.clone(),
2254        proc_ref: proc_reference,
2255        pyspy_result_json: pyspy_json,
2256    };
2257
2258    let store_resp = state
2259        .http_client
2260        .post(format!("{}/api/pyspy_dump", telemetry_url))
2261        .json(&store_body)
2262        .send()
2263        .await
2264        .map_err(|e| ApiError {
2265            code: "proxy_error".to_string(),
2266            message: format!("failed to store pyspy dump in dashboard: {}", e),
2267            details: None,
2268        })?;
2269
2270    if !store_resp.status().is_success() {
2271        return Err(ApiError {
2272            code: "proxy_error".to_string(),
2273            message: format!(
2274                "dashboard rejected pyspy dump store: HTTP {}",
2275                store_resp.status()
2276            ),
2277            details: None,
2278        });
2279    }
2280
2281    Ok(axum::Json(PyspyDumpAndStoreResponse { dump_id }))
2282}
2283
/// HTTP bridge for config dump requests.
///
/// Parses the proc reference, routes to the appropriate actor
/// (ProcAgent on worker procs, HostAgent on the service proc), and
/// sends `ConfigDump` directly — deliberately with no preflight
/// reachability probe (see the inline note below).
/// See CFG-4 in `admin_tui/main.rs`.
///
/// # Errors
///
/// - Propagates any `ApiError` from `parse_pyspy_proc_reference`.
/// - `internal_error` — the `ConfigDump` send or the reply receive
///   failed.
/// - `gateway_timeout` — no reply arrived within
///   `MESH_ADMIN_CONFIG_DUMP_BRIDGE_TIMEOUT`.
async fn config_bridge(
    State(state): State<Arc<BridgeState>>,
    AxumPath(proc_reference): AxumPath<String>,
) -> Result<Json<ConfigDumpResult>, ApiError> {
    let (proc_reference, proc_id) = parse_pyspy_proc_reference(&proc_reference)?;

    // Route by proc name — service proc → HostAgent, all others → ProcAgent.
    let agent_id = if proc_id.base_name() == SERVICE_PROC_NAME {
        proc_id.actor_id(HOST_MESH_AGENT_ACTOR_NAME, 0)
    } else {
        proc_id.actor_id(PROC_AGENT_ACTOR_NAME, 0)
    };

    // No preflight probe. The previous probe_actor() call used
    // MESH_ADMIN_QUERY_CHILD_TIMEOUT (100ms) and mapped timeout to 404
    // "not_found", which misclassifies a live but busy actor as absent.
    // The ConfigDump send and its own bridge timeout handle both the
    // absent and busy cases correctly.
    let cx = &state.bridge_cx;

    // Construct a typed `ConfigDump` port reference for the chosen
    // agent by attestation, plus a one-shot reply port for the result.
    let port = hyperactor_reference::PortRef::<ConfigDump>::attest_message_port(&agent_id);
    let (reply_handle, reply_rx) = open_once_port::<ConfigDumpResult>(cx);
    // Mark the reply port non-returnable. If the bridge times out and
    // drops the receiver, the late reply from HostAgent/ProcAgent is
    // silently dropped instead of bouncing an Undeliverable back to
    // the observed actor (which would crash it via the default fatal
    // handle_undeliverable_message).
    let mut reply_ref = reply_handle.bind();
    reply_ref.return_undeliverable(false);

    port.send(cx, ConfigDump { result: reply_ref })
        .map_err(|e| ApiError {
            code: "internal_error".to_string(),
            message: format!("failed to send ConfigDump: {}", e),
            details: None,
        })?;

    // Config dumps go through the actor message queue (not the introspection
    // callback path). Use the dedicated bridge timeout.
    // Two-stage error mapping: the outer map_err handles the elapsed
    // timeout (→ gateway_timeout); the inner handles a receive failure
    // on the reply port (→ internal_error).
    let bridge_timeout =
        hyperactor_config::global::get(crate::config::MESH_ADMIN_CONFIG_DUMP_BRIDGE_TIMEOUT);
    let wire_result = tokio::time::timeout(bridge_timeout, reply_rx.recv())
        .await
        .map_err(|_| {
            tracing::warn!(
                proc_reference = %proc_reference,
                "mesh admin: config dump timed out (gateway_timeout)",
            );
            ApiError {
                code: "gateway_timeout".to_string(),
                message: format!("timed out waiting for config dump from {}", proc_reference),
                details: None,
            }
        })?
        .map_err(|e| ApiError {
            code: "internal_error".to_string(),
            message: format!("failed to receive ConfigDumpResult: {}", e),
            details: None,
        })?;

    Ok(Json(wire_result))
}
2352
2353/// Resolve an opaque reference string to a `NodePayload` via the
2354/// actor-based resolver.
2355///
2356/// Implements `GET /v1/{*reference}` for the reference-walking client
2357/// (e.g. the TUI):
2358/// - Decodes the wildcard path segment into the original reference
2359///   string (Axum does not percent-decode `{*reference}` captures).
2360/// - Sends `ResolveReferenceMessage::Resolve` to `MeshAdminAgent` and
2361///   awaits the reply.
2362/// - Maps resolver failures into appropriate `ApiError`s
2363///   (`bad_request`, `not_found`, `gateway_timeout`, or
2364///   `internal_error`).
2365async fn resolve_reference_bridge(
2366    State(state): State<Arc<BridgeState>>,
2367    AxumPath(reference): AxumPath<String>,
2368) -> Result<Json<NodePayloadDto>, ApiError> {
2369    // Axum's wildcard may include a leading slash; strip it.
2370    let reference = reference.trim_start_matches('/');
2371    if reference.is_empty() {
2372        return Err(ApiError::bad_request("empty reference", None));
2373    }
2374    let reference = urlencoding::decode(reference)
2375        .map(|cow| cow.into_owned())
2376        .map_err(|_| {
2377            ApiError::bad_request(
2378                "malformed percent-encoding: decoded bytes are not valid UTF-8",
2379                None,
2380            )
2381        })?;
2382
2383    // Limit concurrent resolves to avoid starving user workloads
2384    // that share this tokio runtime.
2385    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
2386        tracing::warn!("mesh admin: rejecting resolve request (503): too many concurrent requests");
2387        ApiError {
2388            code: "service_unavailable".to_string(),
2389            message: "too many concurrent introspection requests".to_string(),
2390            details: None,
2391        }
2392    })?;
2393
2394    let cx = &state.bridge_cx;
2395    let resolve_start = std::time::Instant::now();
2396    let response = tokio::time::timeout(
2397        hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
2398        state.admin_ref.resolve(cx, reference.clone()),
2399    )
2400    .await
2401    .map_err(|_| {
2402        tracing::warn!(
2403            reference = %reference,
2404            elapsed_ms = resolve_start.elapsed().as_millis() as u64,
2405            "mesh admin: resolve timed out (gateway_timeout)",
2406        );
2407        ApiError {
2408            code: "gateway_timeout".to_string(),
2409            message: "timed out resolving reference".to_string(),
2410            details: None,
2411        }
2412    })?
2413    .map_err(|e| ApiError {
2414        code: "internal_error".to_string(),
2415        message: format!("failed to resolve reference: {}", e),
2416        details: None,
2417    })?;
2418
2419    match response.0 {
2420        Ok(payload) => Ok(Json(NodePayloadDto::from(payload))),
2421        Err(error) => Err(ApiError::not_found(error, None)),
2422    }
2423}
2424
2425// TODO: MESH_ADMIN_TREE_TIMEOUT is applied per-call, not as a total
2426// budget. On a mesh with N hosts and M procs, the worst case is
2427// N*(1+M) sequential calls each up to 10s. This should use a single
2428// deadline for the entire walk.
2429/// `GET /v1/tree` — ASCII topology dump.
2430///
2431/// Walks the reference graph starting from `"root"`, resolving each
2432/// host and its proc children, and formats the result as a
2433/// human-readable ASCII tree suitable for quick `curl` inspection.
2434/// Each line includes a clickable URL for drilling into that node via
2435/// the reference API. Built on top of the same
2436/// `ResolveReferenceMessage` protocol used by the TUI.
2437///
2438/// Output format:
2439/// ```text
2440/// unix:@hash  ->  https://host:port/v1/...  (or http:// in OSS)
2441/// ├── service  ->  https://host:port/v1/...
2442/// │   ├── agent[0]  ->  https://host:port/v1/...
2443/// │   └── client[0]  ->  https://host:port/v1/...
2444/// ├── local  ->  https://host:port/v1/...
2445/// └── philosophers_0  ->  https://host:port/v1/...
2446///     ├── agent[0]  ->  https://host:port/v1/...
2447///     └── philosopher[0]  ->  https://host:port/v1/...
2448/// ```
2449async fn tree_dump(
2450    State(state): State<Arc<BridgeState>>,
2451    headers: axum::http::header::HeaderMap,
2452) -> Result<String, ApiError> {
2453    // Limit concurrent resolves to avoid starving user workloads.
2454    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
2455        tracing::warn!(
2456            "mesh admin: rejecting tree_dump request (503): too many concurrent requests"
2457        );
2458        ApiError {
2459            code: "service_unavailable".to_string(),
2460            message: "too many concurrent introspection requests".to_string(),
2461            details: None,
2462        }
2463    })?;
2464
2465    let cx = &state.bridge_cx;
2466
2467    // Build base URL from the Host header for clickable links.
2468    let host = headers
2469        .get("host")
2470        .and_then(|v| v.to_str().ok())
2471        .unwrap_or("localhost");
2472    let scheme = headers
2473        .get("x-forwarded-proto")
2474        .and_then(|v| v.to_str().ok())
2475        .unwrap_or("http");
2476    let base_url = format!("{}://{}", scheme, host);
2477
2478    // Resolve root.
2479    let root_resp = tokio::time::timeout(
2480        hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
2481        state.admin_ref.resolve(cx, "root".to_string()),
2482    )
2483    .await
2484    .map_err(|_| ApiError {
2485        code: "gateway_timeout".to_string(),
2486        message: "timed out resolving root".to_string(),
2487        details: None,
2488    })?
2489    .map_err(|e| ApiError {
2490        code: "internal_error".to_string(),
2491        message: format!("failed to resolve root: {}", e),
2492        details: None,
2493    })?;
2494
2495    let root = root_resp.0.map_err(|e| ApiError {
2496        code: "internal_error".to_string(),
2497        message: e,
2498        details: None,
2499    })?;
2500
2501    let mut output = String::new();
2502
2503    // Resolve each root child. Hosts get the full host→proc→actor
2504    // subtree; non-host children (e.g. the root client actor) are
2505    // rendered as single leaf lines.
2506    for child_ref in &root.children {
2507        let child_ref_str = child_ref.to_string();
2508        let resp = tokio::time::timeout(
2509            hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
2510            state.admin_ref.resolve(cx, child_ref_str.clone()),
2511        )
2512        .await;
2513
2514        let payload = match resp {
2515            Ok(Ok(r)) => r.0.ok(),
2516            _ => None,
2517        };
2518
2519        match payload {
2520            Some(node) if matches!(node.properties, NodeProperties::Host { .. }) => {
2521                let header = match &node.properties {
2522                    NodeProperties::Host { addr, .. } => addr.clone(),
2523                    _ => child_ref_str.clone(),
2524                };
2525                let host_url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
2526                output.push_str(&format!("{}  ->  {}\n", header, host_url));
2527
2528                let num_procs = node.children.len();
2529                for (i, proc_ref) in node.children.iter().enumerate() {
2530                    let proc_ref_str = proc_ref.to_string();
2531                    let is_last_proc = i == num_procs - 1;
2532                    let proc_connector = if is_last_proc {
2533                        "└── "
2534                    } else {
2535                        "├── "
2536                    };
2537                    let proc_name = derive_tree_label(proc_ref);
2538                    let proc_url =
2539                        format!("{}/v1/{}", base_url, urlencoding::encode(&proc_ref_str));
2540                    output.push_str(&format!(
2541                        "{}{}  ->  {}\n",
2542                        proc_connector, proc_name, proc_url
2543                    ));
2544
2545                    let proc_resp = tokio::time::timeout(
2546                        hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
2547                        state.admin_ref.resolve(cx, proc_ref_str),
2548                    )
2549                    .await;
2550                    let proc_payload = match proc_resp {
2551                        Ok(Ok(r)) => r.0.ok(),
2552                        _ => None,
2553                    };
2554                    if let Some(proc_node) = proc_payload {
2555                        let num_actors = proc_node.children.len();
2556                        let child_prefix = if is_last_proc { "    " } else { "│   " };
2557                        for (j, actor_ref) in proc_node.children.iter().enumerate() {
2558                            let actor_ref_str = actor_ref.to_string();
2559                            let actor_connector = if j == num_actors - 1 {
2560                                "└── "
2561                            } else {
2562                                "├── "
2563                            };
2564                            let actor_label = derive_actor_label(actor_ref);
2565                            let actor_url =
2566                                format!("{}/v1/{}", base_url, urlencoding::encode(&actor_ref_str));
2567                            output.push_str(&format!(
2568                                "{}{}{}  ->  {}\n",
2569                                child_prefix, actor_connector, actor_label, actor_url
2570                            ));
2571                        }
2572                    }
2573                }
2574                output.push('\n');
2575            }
2576            Some(node) if matches!(node.properties, NodeProperties::Proc { .. }) => {
2577                let proc_name = match &node.properties {
2578                    NodeProperties::Proc { proc_name, .. } => proc_name.clone(),
2579                    _ => child_ref_str.clone(),
2580                };
2581                let proc_url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
2582                output.push_str(&format!("{}  ->  {}\n", proc_name, proc_url));
2583
2584                let num_actors = node.children.len();
2585                for (j, actor_ref) in node.children.iter().enumerate() {
2586                    let actor_ref_str = actor_ref.to_string();
2587                    let actor_connector = if j == num_actors - 1 {
2588                        "└── "
2589                    } else {
2590                        "├── "
2591                    };
2592                    let actor_label = derive_actor_label(actor_ref);
2593                    let actor_url =
2594                        format!("{}/v1/{}", base_url, urlencoding::encode(&actor_ref_str));
2595                    output.push_str(&format!(
2596                        "{}{}  ->  {}\n",
2597                        actor_connector, actor_label, actor_url
2598                    ));
2599                }
2600                output.push('\n');
2601            }
2602            Some(_node) => {
2603                let label = derive_actor_label(child_ref);
2604                let url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
2605                output.push_str(&format!("{}  ->  {}\n\n", label, url));
2606            }
2607            _ => {
2608                output.push_str(&format!("{} (unreachable)\n\n", child_ref));
2609            }
2610        }
2611    }
2612    Ok(output)
2613}
2614
2615/// Derive a short display label from a reference string for the ASCII
2616/// tree.
2617///
2618/// Extracts the proc name — the meaningful identifier for tree
2619/// display — from the various reference formats emitted by
2620/// `HostAgent`'s children list:
2621///
2622/// - System proc ref `"[system] unix:@hash,service"` → `"service"`
2623/// - ProcAgent ActorId `"unix:@hash,my_proc,agent[0]"` →
2624///   `"my_proc"`
2625/// - Bare ProcId `"unix:@hash,my_proc"` → `"my_proc"`
2626///
2627/// Note: `ActorId::Display` for `ProcId` uses commas as
2628/// separators (`proc_id,actor_name[idx]`), not slashes.
2629fn derive_tree_label(node_ref: &crate::introspect::NodeRef) -> String {
2630    match node_ref {
2631        crate::introspect::NodeRef::Root => "root".to_string(),
2632        crate::introspect::NodeRef::Host(id) => id.proc_id().name().to_string(),
2633        crate::introspect::NodeRef::Proc(id) => id.name().to_string(),
2634        crate::introspect::NodeRef::Actor(id) => {
2635            format!("{}{}", id.name(), format_args!("[{}]", id.pid()))
2636        }
2637    }
2638}
2639
2640fn derive_actor_label(node_ref: &crate::introspect::NodeRef) -> String {
2641    match node_ref {
2642        crate::introspect::NodeRef::Root => "root".to_string(),
2643        crate::introspect::NodeRef::Host(id) => id.name().to_string(),
2644        crate::introspect::NodeRef::Proc(id) => id.name().to_string(),
2645        crate::introspect::NodeRef::Actor(id) => {
2646            format!("{}[{}]", id.name(), id.pid())
2647        }
2648    }
2649}
2650
2651// -- Admin handle type discrimination --
2652
/// A handle scheme that requires a publication-based lookup to resolve
/// to a concrete admin URL.
///
/// Constructed by [`AdminHandle::parse`] (for `mast_conda:///`
/// inputs) and by the [`resolve_mast_handle`] compatibility shim.
///
/// Only `Mast` is defined today. The nested-enum shape allows future
/// scheduler-specific variants (Slurm, K8s, etc.) to be added without
/// changing `AdminHandle`.
#[non_exhaustive]
pub enum PublishedHandle {
    /// `mast_conda:///<job-name>` — requires publication-based discovery.
    /// The full original handle string is retained.
    Mast(String),
}
2664
2665impl PublishedHandle {
2666    /// Resolve a published handle to a concrete admin URL.
2667    ///
2668    /// All published-handle schemes return an explicit error today.
2669    /// When real publication lookup is implemented, dispatch by variant here.
2670    pub async fn resolve(self, _port_override: Option<u16>) -> anyhow::Result<String> {
2671        anyhow::bail!(
2672            "publication-based admin handle resolution is not yet implemented: \
2673             mesh admin placement has moved to the caller's local proc. \
2674             Discover the admin URL from startup output or another \
2675             launch-time publication instead."
2676        )
2677    }
2678}
2679
/// A handle for locating the mesh admin server.
///
/// Parse a user-supplied address string with [`AdminHandle::parse`]
/// and resolve it to a concrete URL with [`AdminHandle::resolve`].
#[non_exhaustive]
pub enum AdminHandle {
    /// Already-resolved URL (e.g. `https://host:1729`).
    Url(String),
    /// Handle that requires a publication lookup. Currently unresolvable
    /// (resolution returns an explicit error).
    Published(PublishedHandle),
    /// Scheme or format that is not recognized; carries the original
    /// input for the error message produced by `resolve`.
    Unsupported(String),
}
2693
2694impl AdminHandle {
2695    /// Parse an address string into an `AdminHandle`.
2696    ///
2697    /// Uses `url` crate parsing. Known publication-handle prefixes
2698    /// (`mast_conda:///`) are classified as `Published`. `http`/`https`
2699    /// scheme URLs are `Url`. Bare `host:port` inputs (no scheme) are
2700    /// inferred as `https://host:port` and classified as `Url` — this
2701    /// preserves existing TUI behavior where `--addr myhost:1729` is a
2702    /// valid input. Everything else is `Unsupported`.
2703    pub fn parse(addr: &str) -> Self {
2704        // Check known publication handle prefixes first.
2705        if addr.starts_with("mast_conda:///") {
2706            return AdminHandle::Published(PublishedHandle::Mast(addr.to_string()));
2707        }
2708        // Strict URL parse — only http/https accepted.
2709        if let Ok(parsed) = url::Url::parse(addr) {
2710            if matches!(parsed.scheme(), "http" | "https") {
2711                return AdminHandle::Url(addr.to_string());
2712            }
2713        }
2714        // Infer https:// for bare host:port inputs (e.g. "myhost:1729").
2715        // This preserves the TUI's documented --addr behavior.
2716        let with_scheme = format!("https://{}", addr);
2717        if let Ok(parsed) = url::Url::parse(&with_scheme) {
2718            if parsed.host_str().is_some() && parsed.port().is_some() {
2719                return AdminHandle::Url(with_scheme);
2720            }
2721        }
2722        AdminHandle::Unsupported(addr.to_string())
2723    }
2724
2725    /// Resolve to a concrete admin base URL.
2726    ///
2727    /// `port_override` is retained to preserve existing call surfaces
2728    /// but is intentionally unused until real publication lookup is
2729    /// implemented.
2730    pub async fn resolve(self, port_override: Option<u16>) -> anyhow::Result<String> {
2731        match self {
2732            AdminHandle::Url(url) => Ok(url),
2733            AdminHandle::Published(h) => h.resolve(port_override).await,
2734            AdminHandle::Unsupported(s) => anyhow::bail!(
2735                "unrecognized admin handle '{}': expected https://host:port or mast_conda:///job",
2736                s
2737            ),
2738        }
2739    }
2740}
2741
2742/// Resolve a `mast_conda:///<job-name>` handle into an admin base URL.
2743///
2744/// **Disabled.** Mesh admin placement has moved to the caller's local
2745/// proc. Delegates to [`AdminHandle::Published`] + [`PublishedHandle::resolve`].
2746/// Kept as a stable API shim; do not remove.
2747pub async fn resolve_mast_handle(
2748    handle: &str,
2749    port_override: Option<u16>,
2750) -> anyhow::Result<String> {
2751    AdminHandle::Published(PublishedHandle::Mast(handle.to_string()))
2752        .resolve(port_override)
2753        .await
2754}
2755
2756#[cfg(test)]
2757mod tests {
2758    use std::net::SocketAddr;
2759
2760    use hyperactor::channel::ChannelAddr;
2761    use hyperactor::testing::ids::test_proc_id_with_addr;
2762
2763    use super::*;
2764
2765    // Integration tests that spawn MeshAdminAgent must pass
2766    // `Some("[::]:0".parse().unwrap())` as the admin_addr to get an
2767    // ephemeral port. The default (`None`) reads MESH_ADMIN_ADDR
2768    // config which is `[::]:1729` — a fixed port that causes bind
2769    // conflicts when tests run concurrently.
2770
    /// Minimal introspectable actor for tests. The `#[export]`
    /// attribute generates `Named + Referable + Binds` so that
    /// `handle.bind()` registers the `IntrospectMessage` port for
    /// remote delivery.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [])]
    struct TestIntrospectableActor;
    // Default `Actor` impl suffices: no custom message handlers are
    // needed for these tests.
    impl Actor for TestIntrospectableActor {}
2779
2780    // Verifies that MeshAdminAgent::build_root_payload constructs the
2781    // expected root node: identity/root metadata, correct Root
2782    // properties (num_hosts), and child links populated with the
2783    // stringified IDs of the configured host mesh-agent ActorRefs.
2784    #[test]
2785    fn test_build_root_payload() {
2786        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
2787        let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();
2788
2789        let proc1 = test_proc_id_with_addr(ChannelAddr::Tcp(addr1), "host1");
2790        let proc2 = test_proc_id_with_addr(ChannelAddr::Tcp(addr2), "host2");
2791
2792        let actor_id1 = proc1.actor_id("mesh_agent", 0);
2793        let actor_id2 = proc2.actor_id("mesh_agent", 0);
2794
2795        let ref1: hyperactor_reference::ActorRef<HostAgent> =
2796            hyperactor_reference::ActorRef::attest(actor_id1.clone());
2797        let ref2: hyperactor_reference::ActorRef<HostAgent> =
2798            hyperactor_reference::ActorRef::attest(actor_id2.clone());
2799
2800        let agent = MeshAdminAgent::new(
2801            vec![("host_a".to_string(), ref1), ("host_b".to_string(), ref2)],
2802            None,
2803            None,
2804            None,
2805        );
2806
2807        let payload = agent.build_root_payload();
2808        assert_eq!(payload.identity, crate::introspect::NodeRef::Root);
2809        assert_eq!(payload.parent, None);
2810        assert!(matches!(
2811            payload.properties,
2812            NodeProperties::Root { num_hosts: 2, .. }
2813        ));
2814        assert_eq!(payload.children.len(), 2);
2815        assert!(
2816            payload
2817                .children
2818                .contains(&crate::introspect::NodeRef::Host(actor_id1.clone()))
2819        );
2820        assert!(
2821            payload
2822                .children
2823                .contains(&crate::introspect::NodeRef::Host(actor_id2.clone()))
2824        );
2825
2826        // Verify root properties derived from attrs.
2827        match &payload.properties {
2828            NodeProperties::Root {
2829                num_hosts,
2830                started_by,
2831                system_children,
2832                ..
2833            } => {
2834                assert_eq!(*num_hosts, 2);
2835                assert!(!started_by.is_empty());
2836                // LC-1: root system_children is always empty.
2837                assert!(
2838                    system_children.is_empty(),
2839                    "LC-1: root system_children must be empty"
2840                );
2841            }
2842            other => panic!("expected Root, got {:?}", other),
2843        }
2844    }
2845
2846    // End-to-end smoke test for MeshAdminAgent::resolve that walks
2847    // the reference tree: root → host → system proc → host-agent
2848    // cross-reference. Verifies the reverse index routes the
2849    // HostAgent ActorId to NodeProperties::Host (not Actor),
2850    // preventing the TUI's cycle detection from dropping that node.
2851    #[tokio::test]
2852    async fn test_resolve_reference_tree_walk() {
2853        use hyperactor::Proc;
2854        use hyperactor::channel::ChannelTransport;
2855        use hyperactor::host::Host;
2856        use hyperactor::host::LocalProcManager;
2857
2858        use crate::host_mesh::host_agent::HostAgentMode;
2859        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
2860        use crate::proc_agent::ProcAgent;
2861
2862        // -- 1. Stand up a local in-process Host with a HostAgent --
2863        // Use Unix transport for all procs — Local transport does not
2864        // support cross-proc message routing.
2865        let spawn: ProcManagerSpawnFn =
2866            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
2867        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
2868        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
2869            Host::new(manager, ChannelTransport::Unix.any())
2870                .await
2871                .unwrap();
2872        let host_addr = host.addr().clone();
2873        let system_proc = host.system_proc().clone();
2874        let host_agent_handle = system_proc
2875            .spawn(
2876                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
2877                HostAgent::new(HostAgentMode::Local(host)),
2878            )
2879            .unwrap();
2880        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
2881        let host_addr_str = host_addr.to_string();
2882
2883        // -- 2. Spawn MeshAdminAgent on a dedicated test proc --
2884        // NOTE: This does not conform to SA-5 (caller-local placement).
2885        // Production uses host_mesh::spawn_admin(). This is a white-box
2886        // test of admin behavior, not placement.
2887        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
2888        // The admin proc has no supervision coordinator by default.
2889        // Without one, actor teardown triggers std::process::exit(1).
2890        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
2891        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
2892        let admin_handle = admin_proc
2893            .spawn(
2894                MESH_ADMIN_ACTOR_NAME,
2895                MeshAdminAgent::new(
2896                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
2897                    None,
2898                    Some("[::]:0".parse().unwrap()),
2899                    None,
2900                ),
2901            )
2902            .unwrap();
2903        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
2904
2905        // -- 3. Create a bare client instance for sending messages --
2906        // Only a mailbox is needed for reply ports — no actor message
2907        // loop required.
2908        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
2909        let (client, _handle) = client_proc.instance("client").unwrap();
2910
2911        // -- 4. Resolve "root" --
2912        let root_resp = admin_ref
2913            .resolve(&client, "root".to_string())
2914            .await
2915            .unwrap();
2916        let root = root_resp.0.unwrap();
2917        assert_eq!(root.identity, crate::introspect::NodeRef::Root);
2918        assert!(matches!(
2919            root.properties,
2920            NodeProperties::Root { num_hosts: 1, .. }
2921        ));
2922        assert_eq!(root.parent, None);
2923        assert_eq!(root.children.len(), 1); // host only (admin proc no longer standalone)
2924
2925        // -- 5. Resolve the host child --
2926        let expected_host_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone());
2927        let host_child_ref = root
2928            .children
2929            .iter()
2930            .find(|c| **c == expected_host_ref)
2931            .expect("root children should contain the host agent (as Host ref)");
2932        let host_ref_string = host_child_ref.to_string();
2933        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
2934        let host_node = host_resp.0.unwrap();
2935        assert_eq!(host_node.identity, expected_host_ref);
2936        assert!(
2937            matches!(host_node.properties, NodeProperties::Host { .. }),
2938            "expected Host properties, got {:?}",
2939            host_node.properties
2940        );
2941        assert_eq!(host_node.parent, Some(crate::introspect::NodeRef::Root));
2942        assert!(
2943            !host_node.children.is_empty(),
2944            "host should have at least one proc child"
2945        );
2946        // LC-2: host system_children is always empty.
2947        match &host_node.properties {
2948            NodeProperties::Host {
2949                system_children, ..
2950            } => {
2951                assert!(
2952                    system_children.is_empty(),
2953                    "LC-2: host system_children must be empty"
2954                );
2955            }
2956            other => panic!("expected Host, got {:?}", other),
2957        }
2958
2959        // -- 6. Resolve a system proc child --
2960        let proc_ref = &host_node.children[0];
2961        let proc_ref_str = proc_ref.to_string();
2962        let proc_resp = admin_ref.resolve(&client, proc_ref_str).await.unwrap();
2963        let proc_node = proc_resp.0.unwrap();
2964        assert!(
2965            matches!(proc_node.properties, NodeProperties::Proc { .. }),
2966            "expected Proc properties, got {:?}",
2967            proc_node.properties
2968        );
2969        assert_eq!(proc_node.parent, Some(expected_host_ref.clone()));
2970        // The system proc should have at least the "host_agent" actor.
2971        assert!(
2972            !proc_node.children.is_empty(),
2973            "proc should have at least one actor child"
2974        );
2975
2976        // -- 7. Cross-reference: system proc child is the host agent --
2977        //
2978        // The service proc's actor (agent[0]) IS the HostAgent, so
2979        // it appears both as a host node (from root, via NodeRef::Host)
2980        // and as an actor (from a proc's children list, via NodeRef::Actor).
2981        // NodeRef::Host in root children makes resolution unambiguous:
2982        // host refs get Entity view, plain actor refs get Actor view.
2983
2984        // The system proc must list the host agent among its children.
2985        let host_agent_node_ref =
2986            crate::introspect::NodeRef::Actor(host_agent_ref.actor_id().clone());
2987        assert!(
2988            proc_node.children.contains(&host_agent_node_ref),
2989            "system proc children {:?} should contain the host agent {:?}",
2990            proc_node.children,
2991            host_agent_node_ref
2992        );
2993
2994        // Resolve that child reference as a plain actor (no host: prefix).
2995        let xref_resp = admin_ref
2996            .resolve(&client, host_agent_ref.actor_id().to_string())
2997            .await
2998            .unwrap();
2999        let xref_node = xref_resp.0.unwrap();
3000
3001        // When resolved as a plain actor reference, it must return
3002        // Actor properties (not Host), because it has no host: prefix.
3003        assert!(
3004            matches!(xref_node.properties, NodeProperties::Actor { .. }),
3005            "host agent child resolved as plain actor should be Actor, got {:?}",
3006            xref_node.properties
3007        );
3008    }
3009
3010    // Verifies MeshAdminAgent::resolve returns NodeProperties::Proc
3011    // for all proc children. Spawns a user proc via
3012    // CreateOrUpdate<ProcSpec>, resolves all host proc-children, and
3013    // asserts every proc returns Proc properties.
3014    #[tokio::test]
3015    async fn test_proc_properties_for_all_procs() {
3016        use std::time::Duration;
3017
3018        use hyperactor::Proc;
3019        use hyperactor::channel::ChannelTransport;
3020        use hyperactor::host::Host;
3021        use hyperactor::host::LocalProcManager;
3022
3023        use crate::Name;
3024        use crate::host_mesh::host_agent::HostAgentMode;
3025        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3026        use crate::proc_agent::ProcAgent;
3027        use crate::resource;
3028        use crate::resource::ProcSpec;
3029        use crate::resource::Rank;
3030
3031        // Stand up a local in-process Host with a HostAgent.
3032        let spawn: ProcManagerSpawnFn =
3033            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3034        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3035        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3036            Host::new(manager, ChannelTransport::Unix.any())
3037                .await
3038                .unwrap();
3039        let host_addr = host.addr().clone();
3040        let system_proc = host.system_proc().clone();
3041        let host_agent_handle = system_proc
3042            .spawn(
3043                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3044                HostAgent::new(HostAgentMode::Local(host)),
3045            )
3046            .unwrap();
3047        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
3048        let host_addr_str = host_addr.to_string();
3049
3050        // Spawn MeshAdminAgent on a dedicated test proc.
3051        // NOTE: Does not conform to SA-5 (caller-local placement).
3052        // Production uses host_mesh::spawn_admin(). White-box test setup.
3053        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3054        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3055        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3056        let admin_handle = admin_proc
3057            .spawn(
3058                MESH_ADMIN_ACTOR_NAME,
3059                MeshAdminAgent::new(
3060                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
3061                    None,
3062                    Some("[::]:0".parse().unwrap()),
3063                    None,
3064                ),
3065            )
3066            .unwrap();
3067        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
3068
3069        // Create a bare client instance for sending messages.
3070        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3071        let (client, _handle) = client_proc.instance("client").unwrap();
3072
3073        // Spawn a user proc via CreateOrUpdate<ProcSpec>.
3074        let user_proc_name = Name::new("user_proc").unwrap();
3075        host_agent_ref
3076            .send(
3077                &client,
3078                resource::CreateOrUpdate {
3079                    name: user_proc_name.clone(),
3080                    rank: Rank::new(0),
3081                    spec: ProcSpec::default(),
3082                },
3083            )
3084            .unwrap();
3085
3086        // Wait for the user proc to boot.
3087        tokio::time::sleep(Duration::from_secs(2)).await;
3088
3089        // Resolve the host to get its children (system + user procs).
3090        let host_ref_string =
3091            crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone()).to_string();
3092        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
3093        let host_node = host_resp.0.unwrap();
3094
3095        // The host should have at least 3 children: system proc,
3096        // local proc, and our user proc.
3097        assert!(
3098            host_node.children.len() >= 3,
3099            "expected at least 3 proc children (2 system + 1 user), got {}",
3100            host_node.children.len()
3101        );
3102
3103        // Resolve each proc child and verify it has Proc properties.
3104        let user_proc_name_str = user_proc_name.to_string();
3105        let mut found_system = false;
3106        let mut found_user = false;
3107        for child_ref in &host_node.children {
3108            let resp = admin_ref
3109                .resolve(&client, child_ref.to_string())
3110                .await
3111                .unwrap();
3112            let node = resp.0.unwrap();
3113            if let NodeProperties::Proc { proc_name, .. } = &node.properties {
3114                if proc_name.contains(&user_proc_name_str) {
3115                    found_user = true;
3116                } else {
3117                    found_system = true;
3118                }
3119                // Properties derived from attrs — verified by derive_properties tests.
3120            } else {
3121                // Host agent cross-reference — skip.
3122            }
3123        }
3124        assert!(
3125            found_system,
3126            "should have resolved at least one system proc"
3127        );
3128        assert!(found_user, "should have resolved the user proc");
3129    }
3130
3131    // Verifies that build_root_payload lists only the host as a
3132    // child. The root client is visible under its host's local proc,
3133    // not at root level.
3134    #[test]
3135    fn test_build_root_payload_with_root_client() {
3136        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
3137        let proc1 = hyperactor_reference::ProcId::with_name(ChannelAddr::Tcp(addr1), "host1");
3138        let actor_id1 = hyperactor_reference::ActorId::root(proc1, "mesh_agent".to_string());
3139        let ref1: hyperactor_reference::ActorRef<HostAgent> =
3140            hyperactor_reference::ActorRef::attest(actor_id1.clone());
3141
3142        let client_proc_id =
3143            hyperactor_reference::ProcId::with_name(ChannelAddr::Tcp(addr1), "local");
3144        let client_actor_id = client_proc_id.actor_id("client", 0);
3145
3146        let agent = MeshAdminAgent::new(
3147            vec![("host_a".to_string(), ref1)],
3148            Some(client_actor_id.clone()),
3149            None,
3150            None,
3151        );
3152
3153        let payload = agent.build_root_payload();
3154        assert!(matches!(
3155            payload.properties,
3156            NodeProperties::Root { num_hosts: 1, .. }
3157        ));
3158        // Only the host; root client is under host → local proc.
3159        assert_eq!(payload.children.len(), 1);
3160        assert!(
3161            payload
3162                .children
3163                .contains(&crate::introspect::NodeRef::Host(actor_id1.clone()))
3164        );
3165    }
3166
    // Verifies that the root client actor is visible through the
    // host → local proc → actor path, not as a standalone child of
    // root.
    //
    // Walks: root → host → local proc → root client actor, and
    // checks parent linkage at the final hop.
    #[tokio::test]
    async fn test_resolve_root_client_actor() {
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Stand up a local in-process Host with a HostAgent.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();

        // Spawn the root client on the host's local proc (before
        // moving the host into HostAgentMode).
        let local_proc = host.local_proc();
        let local_proc_id = local_proc.proc_id().clone();
        let root_client_handle = local_proc.spawn("client", TestIntrospectableActor).unwrap();
        let root_client_ref: hyperactor_reference::ActorRef<TestIntrospectableActor> =
            root_client_handle.bind();
        let root_client_actor_id = root_client_ref.actor_id().clone();

        // HostAgent takes ownership of the host from here on.
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // Spawn MeshAdminAgent on a dedicated test proc with the root
        // client ActorId. NOTE: Does not conform to SA-5 (caller-local
        // placement). Production uses host_mesh::spawn_admin().
        // White-box test of root-client visibility, not placement.
        let admin_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        // Without a supervision coordinator, actor teardown on the
        // admin proc would trigger std::process::exit(1).
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    Some(root_client_actor_id.clone()),
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Client for sending messages.
        let client_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Resolve "root" — should contain only the host.
        let root_resp = admin_ref
            .resolve(&client, "root".to_string())
            .await
            .unwrap();
        let root = root_resp.0.unwrap();
        let host_node_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone());
        assert!(
            root.children.contains(&host_node_ref),
            "root children {:?} should contain host {:?}",
            root.children,
            host_node_ref
        );

        // Resolve the host — should list the local proc in children.
        let host_resp = admin_ref
            .resolve(&client, host_node_ref.to_string())
            .await
            .unwrap();
        let host_node = host_resp.0.unwrap();
        let local_proc_node_ref = crate::introspect::NodeRef::Proc(local_proc_id.clone());
        assert!(
            host_node.children.contains(&local_proc_node_ref),
            "host children {:?} should contain local proc {:?}",
            host_node.children,
            local_proc_node_ref
        );

        // Resolve the local proc — should contain the root client actor.
        let proc_resp = admin_ref
            .resolve(&client, local_proc_id.to_string())
            .await
            .unwrap();
        let proc_node = proc_resp.0.unwrap();
        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties, got {:?}",
            proc_node.properties
        );
        let root_client_node_ref = crate::introspect::NodeRef::Actor(root_client_actor_id.clone());
        assert!(
            proc_node.children.contains(&root_client_node_ref),
            "local proc children {:?} should contain root client actor {:?}",
            proc_node.children,
            root_client_node_ref
        );

        // Resolve the root client actor — parent should be the local proc.
        let client_resp = admin_ref
            .resolve(&client, root_client_actor_id.to_string())
            .await
            .unwrap();
        let client_node = client_resp.0.unwrap();
        assert!(
            matches!(client_node.properties, NodeProperties::Actor { .. }),
            "expected Actor properties, got {:?}",
            client_node.properties
        );
        assert_eq!(
            client_node.parent,
            Some(local_proc_node_ref),
            "root client parent should be the local proc"
        );
    }
3299
3300    // Verifies that the SKILL.md template contains the canonical
3301    // strings that agents and tests rely on. Prevents silent drift or
3302    // accidental removal.
3303    #[test]
3304    fn test_skill_md_contains_canonical_strings() {
3305        let template = SKILL_MD_TEMPLATE;
3306        assert!(
3307            template.contains("GET {base}/v1/root"),
3308            "SKILL.md must document the root endpoint"
3309        );
3310        assert!(
3311            template.contains("GET {base}/v1/{reference}"),
3312            "SKILL.md must document the reference endpoint"
3313        );
3314        assert!(
3315            template.contains("NodePayload"),
3316            "SKILL.md must mention the NodePayload response type"
3317        );
3318        assert!(
3319            template.contains("GET {base}/SKILL.md"),
3320            "SKILL.md must document itself"
3321        );
3322        assert!(
3323            template.contains("{base}"),
3324            "SKILL.md must use {{base}} placeholder for interpolation"
3325        );
3326    }
3327
    // Verifies the navigation identity invariant (see module docs):
    //
    // 1. payload.identity == reference_string used to resolve it.
    // 2. For each child reference C of a resolved node P,
    //    resolve(C).parent == P.identity.
    //
    // Walks the entire tree starting from root, checking both
    // properties at every reachable node.
    #[tokio::test]
    async fn test_navigation_identity_invariant() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Stand up a local host with a HostAgent.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // Spawn MeshAdminAgent on a dedicated test proc.
        // NOTE: Does not conform to SA-5 (caller-local placement).
        // Production uses host_mesh::spawn_admin(). White-box test setup.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        // Without a supervision coordinator, actor teardown on the
        // admin proc would trigger std::process::exit(1).
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str, host_agent_ref)],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Bare client instance: only a mailbox is needed for reply ports.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Walk the tree breadth-first, checking the invariant at every node.
        // Each entry is (reference_string, expected_parent_identity).
        let mut queue: std::collections::VecDeque<(String, Option<crate::introspect::NodeRef>)> =
            std::collections::VecDeque::new();
        queue.push_back(("root".to_string(), None));

        // Dedup by reference string so each reference is resolved
        // exactly once, even if listed as a child of multiple nodes.
        let mut visited = std::collections::HashSet::new();
        while let Some((ref_str, expected_parent)) = queue.pop_front() {
            if !visited.insert(ref_str.clone()) {
                continue;
            }

            let resp = admin_ref.resolve(&client, ref_str.clone()).await.unwrap();
            let node = resp.0.unwrap();

            // NI-1: identity display matches the reference used.
            assert_eq!(
                node.identity.to_string(),
                ref_str,
                "identity mismatch: resolved '{}' but payload.identity = '{}'",
                ref_str,
                node.identity
            );

            // NI-2: parent matches the parent node's identity.
            assert_eq!(
                node.parent, expected_parent,
                "parent mismatch for '{}': expected {:?}, got {:?}",
                ref_str, expected_parent, node.parent
            );

            // Enqueue children with this node's identity as their
            // expected parent. (The contains() pre-check only trims
            // queue growth; the visited.insert above is the real
            // duplicate guard.)
            for child_ref in &node.children {
                let child_str = child_ref.to_string();
                if !visited.contains(&child_str) {
                    queue.push_back((child_str, Some(node.identity.clone())));
                }
            }
        }

        // Sanity: we should have visited at least root, host, a
        // proc, and an actor.
        assert!(
            visited.len() >= 4,
            "expected at least 4 nodes in the tree, visited {}",
            visited.len()
        );
    }
3437
3438    // Exercises SP-1..SP-4 for host/proc payloads.
3439    #[tokio::test]
3440    async fn test_system_proc_identity() {
3441        use hyperactor::Proc;
3442        use hyperactor::channel::ChannelTransport;
3443        use hyperactor::host::Host;
3444        use hyperactor::host::LocalProcManager;
3445
3446        use crate::host_mesh::host_agent::HostAgentMode;
3447        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3448        use crate::proc_agent::ProcAgent;
3449
3450        // -- 1. Stand up a local in-process Host with a HostAgent --
3451        let spawn: ProcManagerSpawnFn =
3452            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3453        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3454        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3455            Host::new(manager, ChannelTransport::Unix.any())
3456                .await
3457                .unwrap();
3458        let host_addr = host.addr().clone();
3459        let system_proc = host.system_proc().clone();
3460        let system_proc_id = system_proc.proc_id().clone();
3461        let host_agent_handle = system_proc
3462            .spawn(
3463                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3464                HostAgent::new(HostAgentMode::Local(host)),
3465            )
3466            .unwrap();
3467        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
3468        let host_addr_str = host_addr.to_string();
3469
3470        // -- 2. Spawn MeshAdminAgent on a dedicated test proc --
3471        // NOTE: This does not conform to SA-5 (caller-local placement).
3472        // Production uses host_mesh::spawn_admin(). This is a white-box
3473        // test of admin behavior, not placement.
3474        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3475        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3476        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3477        let admin_handle = admin_proc
3478            .spawn(
3479                MESH_ADMIN_ACTOR_NAME,
3480                MeshAdminAgent::new(
3481                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
3482                    None,
3483                    Some("[::]:0".parse().unwrap()),
3484                    None,
3485                ),
3486            )
3487            .unwrap();
3488        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
3489
3490        // -- 3. Create a bare client instance for sending messages --
3491        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3492        let (client, _handle) = client_proc.instance("client").unwrap();
3493
3494        // -- 4. Resolve the host to get its children --
3495        let host_ref_str =
3496            crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone()).to_string();
3497        let host_resp = admin_ref
3498            .resolve(&client, host_ref_str.clone())
3499            .await
3500            .unwrap();
3501        let host_node = host_resp.0.unwrap();
3502        assert!(
3503            !host_node.children.is_empty(),
3504            "host should have at least one proc child"
3505        );
3506
3507        // -- 5. Find a system proc child via system_children --
3508        let system_children = match &host_node.properties {
3509            NodeProperties::Host {
3510                system_children, ..
3511            } => system_children.clone(),
3512            other => panic!("expected Host properties, got {:?}", other),
3513        };
3514        // Procs are never system — host system_children should be empty.
3515        assert!(
3516            system_children.is_empty(),
3517            "host system_children should be empty (procs are never system), got {:?}",
3518            system_children
3519        );
3520        // Verify host properties derived from attrs.
3521        assert!(
3522            matches!(&host_node.properties, NodeProperties::Host { .. }),
3523            "expected Host properties"
3524        );
3525
3526        // -- 6. Verify host children contain the system proc --
3527        let expected_system_ref = crate::introspect::NodeRef::Proc(system_proc_id.clone());
3528        assert!(
3529            host_node.children.contains(&expected_system_ref),
3530            "host children {:?} should contain the system proc ref {:?}",
3531            host_node.children,
3532            expected_system_ref
3533        );
3534
3535        // -- 7. Resolve a proc child --
3536        let proc_child_ref = &host_node.children[0];
3537        let proc_resp = admin_ref
3538            .resolve(&client, proc_child_ref.to_string())
3539            .await
3540            .unwrap();
3541        let proc_node = proc_resp.0.unwrap();
3542
3543        assert_eq!(
3544            proc_node.identity, *proc_child_ref,
3545            "identity must match the proc ref from the host's children list"
3546        );
3547
3548        assert!(
3549            matches!(proc_node.properties, NodeProperties::Proc { .. }),
3550            "expected NodeProperties::Proc, got {:?}",
3551            proc_node.properties
3552        );
3553
3554        let host_node_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone());
3555        assert_eq!(
3556            proc_node.parent,
3557            Some(host_node_ref),
3558            "proc parent should be the host reference"
3559        );
3560
3561        // as_of is a SystemTime — just verify it's not the epoch.
3562        assert!(
3563            proc_node.as_of > std::time::UNIX_EPOCH,
3564            "as_of should be after the epoch"
3565        );
3566
3567        // Verify proc properties derived from attrs.
3568        assert!(
3569            matches!(&proc_node.properties, NodeProperties::Proc { .. }),
3570            "expected Proc properties"
3571        );
3572    }
3573
3574    // -- AdminHandle / PublishedHandle tests --
3575
3576    // AdminHandle::parse — all four cases.
3577    #[test]
3578    fn test_admin_handle_parse_https_url() {
3579        let h = super::AdminHandle::parse("https://myhost:1729");
3580        assert!(matches!(h, super::AdminHandle::Url(u) if u == "https://myhost:1729"));
3581    }
3582
3583    #[test]
3584    fn test_admin_handle_parse_bare_host_port() {
3585        // Bare host:port → inferred as https://host:port.
3586        let h = super::AdminHandle::parse("myhost:1729");
3587        assert!(
3588            matches!(h, super::AdminHandle::Url(ref u) if u == "https://myhost:1729"),
3589            "bare host:port should become https://host:port, got: {:?}",
3590            matches!(h, super::AdminHandle::Url(_))
3591        );
3592    }
3593
3594    #[test]
3595    fn test_admin_handle_parse_mast() {
3596        let h = super::AdminHandle::parse("mast_conda:///my-job");
3597        assert!(matches!(
3598            h,
3599            super::AdminHandle::Published(super::PublishedHandle::Mast(_))
3600        ));
3601    }
3602
3603    #[test]
3604    fn test_admin_handle_parse_unsupported() {
3605        // Bare hostname with no port → Unsupported (no port, scheme inference fails).
3606        let h = super::AdminHandle::parse("junk_hostname_no_port");
3607        assert!(matches!(h, super::AdminHandle::Unsupported(_)));
3608    }
3609
3610    #[tokio::test]
3611    async fn test_admin_handle_resolve_url_returns_url() {
3612        let h = super::AdminHandle::parse("https://myhost:1729");
3613        let result = h.resolve(None).await.unwrap();
3614        assert_eq!(result, "https://myhost:1729");
3615    }
3616
3617    #[tokio::test]
3618    async fn test_admin_handle_resolve_published_returns_error() {
3619        let h = super::AdminHandle::parse("mast_conda:///test-job");
3620        let err = format!("{:#}", h.resolve(Some(1729)).await.unwrap_err());
3621        assert!(
3622            err.contains("not yet implemented"),
3623            "expected 'not yet implemented' in error, got: {}",
3624            err
3625        );
3626    }
3627
3628    #[tokio::test]
3629    async fn test_admin_handle_resolve_unsupported_returns_error() {
3630        let h = super::AdminHandle::parse("junk_hostname_no_port");
3631        let err = format!("{:#}", h.resolve(None).await.unwrap_err());
3632        assert!(
3633            err.contains("unrecognized admin handle"),
3634            "expected 'unrecognized admin handle' in error, got: {}",
3635            err
3636        );
3637    }
3638
3639    // resolve_mast_handle delegates to PublishedHandle::resolve.
3640    // Error text changed from "disabled" to "not yet implemented".
3641    #[tokio::test]
3642    async fn test_resolve_mast_handle_returns_not_yet_implemented_error() {
3643        let result = super::resolve_mast_handle("mast_conda:///test-job", Some(1729)).await;
3644        let err = format!("{:#}", result.unwrap_err());
3645        assert!(
3646            err.contains("not yet implemented"),
3647            "expected 'not yet implemented' in error, got: {}",
3648            err
3649        );
3650    }
3651
3652    // -- AdminInfo::new() constructor tests --
3653
3654    // Constructor guarantee: valid https URL produces correct host.
3655    #[test]
3656    fn test_admin_info_new_derives_host_from_url() {
3657        let info = super::AdminInfo::new(
3658            "actor".to_string(),
3659            "proc".to_string(),
3660            "https://myhost.example.com:1729".to_string(),
3661        )
3662        .unwrap();
3663        assert_eq!(info.host, "myhost.example.com");
3664        assert_eq!(info.url, "https://myhost.example.com:1729");
3665    }
3666
3667    // Constructor guarantee: invalid URL is rejected.
3668    #[test]
3669    fn test_admin_info_new_rejects_invalid_url() {
3670        let result = super::AdminInfo::new(
3671            "actor".to_string(),
3672            "proc".to_string(),
3673            "not a url".to_string(),
3674        );
3675        assert!(result.is_err(), "invalid URL must be rejected");
3676    }
3677
3678    // Constructor guarantee: URL with no host is rejected.
3679    #[test]
3680    fn test_admin_info_new_rejects_url_without_host() {
3681        // data: URLs have no host component.
3682        let result = super::AdminInfo::new(
3683            "actor".to_string(),
3684            "proc".to_string(),
3685            "data:text/plain,hello".to_string(),
3686        );
3687        assert!(result.is_err(), "URL without host must be rejected");
3688    }
3689
3690    // -- Placement test (SA-5) --
3691
3692    // Exercises the real public entrypoint and checks SA-5 via
3693    // ActorRef reachability on the caller proc.
    #[tokio::test]
    async fn test_spawn_admin_places_on_caller_proc() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;

        use crate::host_mesh::HostMesh;

        // 1. Stand up a local in-process host mesh.
        let host_mesh = HostMesh::local().await.unwrap();

        // 2. Create a separate caller proc with an actor instance.
        //    The coordinator result is bound to `_supervision` so it
        //    stays alive for the whole test (dropping it early would
        //    presumably tear down supervision — TODO confirm).
        let caller_proc = Proc::direct(ChannelTransport::Unix.any(), "caller".to_string()).unwrap();
        let _supervision = ProcSupervisionCoordinator::set(&caller_proc).await.unwrap();
        let (caller_cx, _caller_handle) = caller_proc.instance("caller").unwrap();

        // 3. Call the real public entrypoint. "[::]:0" requests an
        //    ephemeral port; spawn_admin returns the bound URL.
        let admin_url = crate::host_mesh::spawn_admin(
            [&host_mesh],
            &caller_cx,
            Some("[::]:0".parse().unwrap()),
            None,
        )
        .await
        .unwrap();

        assert!(!admin_url.is_empty(), "spawn_admin must return a URL");

        // 4. Prove the admin is on caller_proc: construct an ActorRef
        //    targeting "mesh_admin[0]" on caller_proc and send it a
        //    GetAdminAddr message. If the admin were on a different
        //    proc, this message would be undeliverable.
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> =
            hyperactor_reference::ActorRef::attest(
                caller_proc.proc_id().actor_id(MESH_ADMIN_ACTOR_NAME, 0),
            );
        // Probe from a distinct proc so reachability is demonstrated
        // across procs rather than trivially within the caller.
        let probe_proc = Proc::direct(ChannelTransport::Unix.any(), "probe".to_string()).unwrap();
        let (probe_cx, _probe_handle) = probe_proc.instance("probe").unwrap();
        let resp = admin_ref.get_admin_addr(&probe_cx).await.unwrap();
        // The admin must report the same URL spawn_admin returned.
        assert_eq!(
            resp.addr.as_deref(),
            Some(admin_url.as_str()),
            "SA-5: admin on caller_proc must respond to GetAdminAddr"
        );
    }
3739
3740    // AI-1..AI-3: GET /v1/admin HTTP route test requires TLS certs
3741    // (fbcode_build enforces mTLS). Covered by integration tests in
3742    // fbcode//monarch/hyperactor_mesh/test/mesh_admin_integration.
3743    // AI-4 is a constructor guarantee tested via AdminInfo::new() above.
3744
3745    // Verifies that GET /v1/{proc_id} reflects actors spawned directly
3746    // on a proc — bypassing ProcAgent's gspawn message and therefore
3747    // never triggering publish_introspect_properties — so that the
3748    // resolved children list is always derived from live proc state.
3749    //
3750    // Regression guard for the bug introduced in 9a08d559: the switch
3751    // from a live handle_introspect to a cached publish model made
3752    // supervision-spawned actors (e.g. every sieve actor after
3753    // sieve[0]) invisible to the TUI.
3754    //
3755    // Exercises PA-1 (see module doc). See also
3756    // proc_agent::tests::test_query_child_proc_returns_live_children.
    #[tokio::test]
    async fn test_proc_children_reflect_directly_spawned_actors() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;

        use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
        use crate::host_mesh::host_agent::HostAgent;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
        use crate::proc_agent::ProcAgent;

        // Stand up a HostMeshAgent. The user proc gets its own
        // ephemeral address; we register that address in
        // MeshAdminAgent so resolve_proc_node can look it up.
        // HostMeshAgent won't know the user proc (it wasn't spawned
        // through it), so QueryChild returns Error and resolve falls
        // back to querying proc_agent[0] via QueryChild(Proc) — the
        // path being tested.
        let spawn_fn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn_fn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();

        // User proc: own ephemeral Unix socket, own ProcAgent.
        let user_proc =
            Proc::direct(ChannelTransport::Unix.any(), "user_proc".to_string()).unwrap();
        let user_proc_addr = user_proc.proc_id().addr().to_string();
        let agent_handle = ProcAgent::boot_v1(user_proc.clone(), None).unwrap();
        // Wait until the agent reaches Idle so it can answer
        // QueryChild before we resolve through it.
        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        // MeshAdminAgent: register the user proc's addr as a "host"
        // pointing to host_agent_ref. That agent doesn't know the
        // user proc, so QueryChild → Error → fallback to proc_agent.
        // NOTE: Does not conform to SA-5 (caller-local placement).
        // White-box test of proc-agent fallback, not placement.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(user_proc_addr, host_agent_ref.clone())],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate client proc to drive the resolve calls.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        // Resolve the user proc via MeshAdminAgent. HostMeshAgent
        // returns Error for QueryChild → fallback to proc_agent[0]
        // QueryChild(Reference::Proc) → live NodeProperties::Proc.
        let user_proc_ref = user_proc.proc_id().to_string();
        let resp = admin_ref
            .resolve(&client, user_proc_ref.clone())
            .await
            .unwrap();
        let node = resp.0.unwrap();
        assert!(
            matches!(node.properties, NodeProperties::Proc { .. }),
            "expected Proc, got {:?}",
            node.properties
        );
        // Baseline child count before the direct spawn, so we can
        // later assert the count strictly grew.
        let initial_count = node.children.len();
        assert!(
            node.children
                .iter()
                .any(|c| c.to_string().contains(PROC_AGENT_ACTOR_NAME)),
            "initial children {:?} should contain proc_agent",
            node.children
        );

        // Spawn an actor directly on the user proc, bypassing gspawn.
        // This simulates how sieve[0] spawns sieve[1], sieve[2], etc.
        user_proc
            .spawn("extra_actor", TestIntrospectableActor)
            .unwrap();

        // Resolve again — the new actor must appear immediately
        // without any republish, proving PA-1 is satisfied.
        let resp2 = admin_ref
            .resolve(&client, user_proc_ref.clone())
            .await
            .unwrap();
        let node2 = resp2.0.unwrap();
        assert!(
            matches!(node2.properties, NodeProperties::Proc { .. }),
            "expected Proc, got {:?}",
            node2.properties
        );
        assert!(
            node2
                .children
                .iter()
                .any(|c| c.to_string().contains("extra_actor")),
            "after direct spawn, children {:?} should contain extra_actor",
            node2.children
        );
        assert!(
            node2.children.len() > initial_count,
            "expected at least {} children after direct spawn, got {:?}",
            initial_count + 1,
            node2.children
        );
    }
3886
3887    // -- pyspy bridge input validation tests --
3888    //
3889    // Tests for the v1 proc-reference strictness contract (see
3890    // introspect module doc): the py-spy bridge accepts only
3891    // ProcId-form references and rejects other forms as bad_request.
3892
3893    #[test]
3894    fn pyspy_parse_empty_reference() {
3895        // v1 contract: empty input → bad_request.
3896        let err = parse_pyspy_proc_reference("").unwrap_err();
3897        assert_eq!(err.code, "bad_request");
3898        assert!(err.message.contains("empty"));
3899    }
3900
3901    #[test]
3902    fn pyspy_parse_slash_only() {
3903        // v1 contract: slash-only (axum wildcard artifact) → bad_request.
3904        let err = parse_pyspy_proc_reference("/").unwrap_err();
3905        assert_eq!(err.code, "bad_request");
3906        assert!(err.message.contains("empty"));
3907    }
3908
3909    #[test]
3910    fn pyspy_parse_malformed_percent_encoding() {
3911        // v1 contract: malformed encoding → bad_request.
3912        // %FF%FE is not valid UTF-8.
3913        let err = parse_pyspy_proc_reference("%FF%FE").unwrap_err();
3914        assert_eq!(err.code, "bad_request");
3915        assert!(err.message.contains("percent-encoding"));
3916    }
3917
3918    #[test]
3919    fn pyspy_parse_invalid_proc_id() {
3920        // v1 contract: non-ProcId reference → bad_request.
3921        let err = parse_pyspy_proc_reference("not-a-valid-proc-id").unwrap_err();
3922        assert_eq!(err.code, "bad_request");
3923        assert!(err.message.contains("invalid proc reference"));
3924    }
3925
3926    #[test]
3927    fn pyspy_parse_valid_proc_reference() {
3928        // v1 contract: valid ProcId → accepted.
3929        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
3930        let proc_id = test_proc_id_with_addr(ChannelAddr::Tcp(addr), "myproc");
3931        let proc_id_str = proc_id.to_string();
3932
3933        let (decoded, parsed) = parse_pyspy_proc_reference(&proc_id_str).unwrap();
3934        assert_eq!(decoded, proc_id_str);
3935        assert_eq!(parsed, proc_id);
3936    }
3937
3938    #[test]
3939    fn pyspy_parse_strips_leading_slash() {
3940        // v1 contract: leading slash from axum wildcard is stripped.
3941        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
3942        let proc_id = test_proc_id_with_addr(ChannelAddr::Tcp(addr), "myproc");
3943        let with_slash = format!("/{}", proc_id);
3944
3945        let (_, parsed) = parse_pyspy_proc_reference(&with_slash).unwrap();
3946        assert_eq!(parsed, proc_id);
3947    }
3948}