1use std::collections::HashMap;
338use std::io;
339use std::sync::Arc;
340use std::time::Duration;
341
342use async_trait::async_trait;
343use axum::Json;
344use axum::Router;
345use axum::extract::Path as AxumPath;
346use axum::extract::State;
347use axum::http::StatusCode;
348use axum::response::IntoResponse;
349use axum::routing::get;
350use axum::routing::post;
351use hyperactor::Actor;
352use hyperactor::ActorHandle;
353use hyperactor::Context;
354use hyperactor::HandleClient;
355use hyperactor::Handler;
356use hyperactor::Instance;
357use hyperactor::RefClient;
358use hyperactor::channel::try_tls_acceptor;
359use hyperactor::host::SERVICE_PROC_NAME;
360use hyperactor::introspect::IntrospectMessage;
361use hyperactor::introspect::IntrospectResult;
362use hyperactor::introspect::IntrospectView;
363use hyperactor::mailbox::open_once_port;
364use hyperactor::reference as hyperactor_reference;
365use serde::Deserialize;
366use serde::Serialize;
367use serde_json::Value;
368use tokio::net::TcpListener;
369use tokio_rustls::TlsAcceptor;
370use typeuri::Named;
371
372use crate::config_dump::ConfigDump;
373use crate::config_dump::ConfigDumpResult;
374use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
375use crate::host_mesh::host_agent::HostAgent;
376use crate::introspect::NodePayload;
377use crate::introspect::NodeProperties;
378use crate::introspect::dto::NodePayloadDto;
379use crate::introspect::to_node_payload;
380use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
381use crate::pyspy::PySpyDump;
382use crate::pyspy::PySpyOpts;
383use crate::pyspy::PySpyResult;
384
385async fn query_introspect(
388 cx: &hyperactor::Context<'_, MeshAdminAgent>,
389 actor_id: &hyperactor_reference::ActorId,
390 view: hyperactor::introspect::IntrospectView,
391 timeout: Duration,
392 err_ctx: &str,
393) -> Result<IntrospectResult, anyhow::Error> {
394 let introspect_port =
395 hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(actor_id);
396 let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
397 let mut reply_ref = reply_handle.bind();
398 reply_ref.return_undeliverable(false);
399 introspect_port.send(
400 cx,
401 IntrospectMessage::Query {
402 view,
403 reply: reply_ref,
404 },
405 )?;
406 tokio::time::timeout(timeout, reply_rx.recv())
407 .await
408 .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
409 .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
410}
411
412async fn query_child_introspect(
414 cx: &hyperactor::Context<'_, MeshAdminAgent>,
415 actor_id: &hyperactor_reference::ActorId,
416 child_ref: hyperactor_reference::Reference,
417 timeout: Duration,
418 err_ctx: &str,
419) -> Result<IntrospectResult, anyhow::Error> {
420 let introspect_port =
421 hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(actor_id);
422 let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
423 let mut reply_ref = reply_handle.bind();
424 reply_ref.return_undeliverable(false);
425 introspect_port.send(
426 cx,
427 IntrospectMessage::QueryChild {
428 child_ref,
429 reply: reply_ref,
430 },
431 )?;
432 tokio::time::timeout(timeout, reply_rx.recv())
433 .await
434 .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
435 .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
436}
437
/// Well-known actor name under which the mesh admin agent is registered.
pub const MESH_ADMIN_ACTOR_NAME: &str = "mesh_admin";

/// Name of the introspectable bridge instance spawned in `Actor::init` so
/// HTTP handlers can send/receive actor messages.
pub const MESH_ADMIN_BRIDGE_NAME: &str = "mesh_admin_bridge";
458
/// Machine-readable error returned by the mesh admin HTTP endpoints.
///
/// Serialized inside an `ApiErrorEnvelope`; the `code` string is mapped to an
/// HTTP status by the `IntoResponse` impl for this type.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiError {
    /// Stable machine-readable code, e.g. "not_found" or "bad_request".
    pub code: String,
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional structured context; omitted from the JSON body when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<Value>,
}
473
/// Wire envelope for `ApiError`: errors serialize as `{"error": {...}}`.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiErrorEnvelope {
    /// The wrapped error payload.
    pub error: ApiError,
}
479
480impl ApiError {
481 pub fn not_found(message: impl Into<String>, details: Option<Value>) -> Self {
483 Self {
484 code: "not_found".to_string(),
485 message: message.into(),
486 details,
487 }
488 }
489
490 pub fn bad_request(message: impl Into<String>, details: Option<Value>) -> Self {
492 Self {
493 code: "bad_request".to_string(),
494 message: message.into(),
495 details,
496 }
497 }
498}
499
500impl IntoResponse for ApiError {
501 fn into_response(self) -> axum::response::Response {
502 let status = match self.code.as_str() {
503 "not_found" => StatusCode::NOT_FOUND,
504 "bad_request" => StatusCode::BAD_REQUEST,
505 "gateway_timeout" => StatusCode::GATEWAY_TIMEOUT,
506 "service_unavailable" => StatusCode::SERVICE_UNAVAILABLE,
507 _ => StatusCode::INTERNAL_SERVER_ERROR,
508 };
509 let envelope = ApiErrorEnvelope { error: self };
510 (status, Json(envelope)).into_response()
511 }
512}
513
/// Reply payload for `MeshAdminMessage::GetAdminAddr`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct MeshAdminAddrResponse {
    /// The admin server's base URL ("scheme://host:port"), or `None` if the
    /// HTTP server has not (yet) been started.
    pub addr: Option<String>,
}
wirevalue::register_type!(MeshAdminAddrResponse);
523
/// Control messages handled by `MeshAdminAgent`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum MeshAdminMessage {
    /// Requests the admin HTTP server's base URL.
    GetAdminAddr {
        /// One-shot reply port carrying the address response.
        #[reply]
        reply: hyperactor_reference::OncePortRef<MeshAdminAddrResponse>,
    },
}
wirevalue::register_type!(MeshAdminMessage);
551
/// Reply payload for `ResolveReferenceMessage::Resolve`: the resolved node,
/// or the resolution error stringified for the wire.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ResolveReferenceResponse(pub Result<NodePayload, String>);
wirevalue::register_type!(ResolveReferenceResponse);
557
/// Messages asking the admin agent to resolve a reference string to a node.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum ResolveReferenceMessage {
    /// Resolves `reference_string` (parsed as a `NodeRef`) to a `NodePayload`.
    Resolve {
        /// The reference to resolve, in its string form.
        reference_string: String,
        /// One-shot reply port carrying the resolution result.
        #[reply]
        reply: hyperactor_reference::OncePortRef<ResolveReferenceResponse>,
    },
}
wirevalue::register_type!(ResolveReferenceMessage);
600
/// Actor that serves the mesh admin HTTP(S) API and answers admin queries.
///
/// On `init` it binds an HTTP listener (optionally behind mTLS) and bridges
/// HTTP requests into the actor system; see the `Actor` impl.
#[hyperactor::export(handlers = [MeshAdminMessage, ResolveReferenceMessage])]
pub struct MeshAdminAgent {
    // Host channel address -> host agent ref, one entry per known host.
    hosts: HashMap<String, hyperactor_reference::ActorRef<HostAgent>>,

    // Reverse index: host agent actor id -> its address key in `hosts`.
    host_agents_by_actor_id: HashMap<hyperactor_reference::ActorId, String>,

    // Actor id of the root client, if supplied at construction.
    root_client_actor_id: Option<hyperactor_reference::ActorId>,

    // This agent's own actor id; populated in `Actor::init`.
    self_actor_id: Option<hyperactor_reference::ActorId>,

    // Explicit bind address override; when `None`, the MESH_ADMIN_ADDR
    // config value is used instead.
    admin_addr_override: Option<std::net::SocketAddr>,

    // Actual socket address the listener bound to (set in `init`).
    admin_addr: Option<std::net::SocketAddr>,

    // Externally reachable base URL, e.g. "https://host:1234" (set in `init`).
    admin_host: Option<String>,

    // Optional telemetry endpoint, forwarded to the HTTP bridge state.
    telemetry_url: Option<String>,

    // RFC3339 timestamp captured when the agent was constructed.
    started_at: String,

    // $USER / $USERNAME at construction time, or "unknown".
    started_by: String,
}
679
680impl MeshAdminAgent {
681 pub fn new(
699 hosts: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>,
700 root_client_actor_id: Option<hyperactor_reference::ActorId>,
701 admin_addr: Option<std::net::SocketAddr>,
702 telemetry_url: Option<String>,
703 ) -> Self {
704 let host_agents_by_actor_id: HashMap<hyperactor_reference::ActorId, String> = hosts
705 .iter()
706 .map(|(addr, agent_ref)| (agent_ref.actor_id().clone(), addr.clone()))
707 .collect();
708
709 let started_at = chrono::Utc::now().to_rfc3339();
711 let started_by = std::env::var("USER")
712 .or_else(|_| std::env::var("USERNAME"))
713 .unwrap_or_else(|_| "unknown".to_string());
714
715 Self {
716 hosts: hosts.into_iter().collect(),
717 host_agents_by_actor_id,
718 root_client_actor_id,
719 self_actor_id: None,
720 admin_addr_override: admin_addr,
721 admin_addr: None,
722 admin_host: None,
723 telemetry_url,
724 started_at,
725 started_by,
726 }
727 }
728}
729
/// Manual `Debug` impl: summarizes the maps (host keys only, agent count)
/// instead of dumping full actor refs, and omits `admin_addr_override` and
/// `telemetry_url`.
impl std::fmt::Debug for MeshAdminAgent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MeshAdminAgent")
            .field("hosts", &self.hosts.keys().collect::<Vec<_>>())
            .field("host_agents", &self.host_agents_by_actor_id.len())
            .field("root_client_actor_id", &self.root_client_actor_id)
            .field("self_actor_id", &self.self_actor_id)
            .field("admin_addr", &self.admin_addr)
            .field("admin_host", &self.admin_host)
            .field("started_at", &self.started_at)
            .field("started_by", &self.started_by)
            .finish()
    }
}
744
/// Admin actor self-identification served at `/v1/admin`: identity, proc
/// placement, hostname, and the admin server URL.
#[derive(Debug, Clone, Serialize, Deserialize, schemars::JsonSchema)]
pub struct AdminInfo {
    /// The admin actor's id, stringified.
    pub actor_id: String,
    /// The id of the proc hosting the admin actor, stringified.
    pub proc_id: String,
    /// Hostname extracted from `url` at construction time.
    pub host: String,
    /// Full base URL of the admin HTTP server.
    pub url: String,
}
762
763impl AdminInfo {
764 pub fn new(actor_id: String, proc_id: String, url: String) -> anyhow::Result<Self> {
771 let parsed = url::Url::parse(&url)
772 .map_err(|e| anyhow::anyhow!("invalid admin URL '{}': {}", url, e))?;
773 let host = parsed
774 .host_str()
775 .ok_or_else(|| anyhow::anyhow!("admin URL '{}' has no host", url))?
776 .to_string();
777 Ok(Self {
778 actor_id,
779 proc_id,
780 host,
781 url,
782 })
783 }
784}
785
/// Shared state handed to every axum HTTP handler via `State`.
struct BridgeState {
    // Ref to the `MeshAdminAgent` that owns the HTTP server.
    admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent>,
    // Dedicated instance HTTP handlers use to exchange actor messages.
    bridge_cx: Instance<()>,
    // Caps concurrent reference resolutions; sized from the
    // MESH_ADMIN_MAX_CONCURRENT_RESOLVES config in `Actor::init`.
    resolve_semaphore: tokio::sync::Semaphore,
    // Held only to keep the bridge actor alive for the server's lifetime.
    _bridge_handle: ActorHandle<()>,
    // Optional telemetry endpoint, copied from the agent at startup.
    telemetry_url: Option<String>,
    // Reusable HTTP client for outbound requests from handlers.
    http_client: reqwest::Client,
    // Admin self-identification served at `/v1/admin`.
    admin_info: AdminInfo,
}
823
/// TCP listener wrapped with a TLS acceptor, so axum can serve over (m)TLS.
struct TlsListener {
    // Accepts raw TCP connections.
    tcp: TcpListener,
    // Performs the TLS handshake on each accepted connection.
    acceptor: TlsAcceptor,
}
834
835impl axum::serve::Listener for TlsListener {
836 type Io = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
837 type Addr = std::net::SocketAddr;
838
839 async fn accept(&mut self) -> (Self::Io, Self::Addr) {
840 loop {
841 let (stream, addr) = match self.tcp.accept().await {
842 Ok(conn) => conn,
843 Err(e) => {
844 tracing::warn!("TCP accept error: {}", e);
845 continue;
846 }
847 };
848
849 match self.acceptor.accept(stream).await {
850 Ok(tls_stream) => return (tls_stream, addr),
851 Err(e) => {
852 tracing::warn!("TLS handshake failed from {}: {}", addr, e);
853 continue;
854 }
855 }
856 }
857 }
858
859 fn local_addr(&self) -> io::Result<Self::Addr> {
860 self.tcp.local_addr()
861 }
862}
863
#[async_trait]
impl Actor for MeshAdminAgent {
    /// Binds the agent into the actor system and starts the admin HTTP(S)
    /// server in a background tokio task.
    ///
    /// Sequence: resolve the bind address (explicit override, else the
    /// MESH_ADMIN_ADDR config), bind a TCP listener, decide on TLS (mTLS is
    /// mandatory in fbcode builds), record the externally reachable base
    /// URL, spawn a bridge instance for HTTP handlers, and serve the router.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.bind::<Self>();
        // Mark this agent as a system actor. NOTE(review): assuming
        // `set_system` flags the actor for system-children bookkeeping —
        // confirm against hyperactor docs.
        this.set_system();
        self.self_actor_id = Some(this.self_id().clone());

        // Explicit override wins; otherwise fall back to the configured
        // MESH_ADMIN_ADDR value.
        let bind_addr = match self.admin_addr_override {
            Some(addr) => addr,
            None => hyperactor_config::global::get_cloned(crate::config::MESH_ADMIN_ADDR)
                .parse_socket_addr()
                .map_err(|e| anyhow::anyhow!("invalid MESH_ADMIN_ADDR config: {}", e))?,
        };
        let listener = TcpListener::bind(bind_addr).await?;
        // Read back the bound address: the configured port may be 0
        // (OS-assigned).
        let bound_addr = listener.local_addr()?;
        let host = hostname::get()
            .unwrap_or_else(|_| "localhost".into())
            .into_string()
            .unwrap_or_else(|_| "localhost".to_string());
        self.admin_addr = Some(bound_addr);

        // In fbcode builds mTLS is required; elsewhere TLS is best-effort.
        let enforce_mtls = cfg!(fbcode_build);
        let tls_acceptor = try_tls_acceptor(enforce_mtls);

        if enforce_mtls && tls_acceptor.is_none() {
            return Err(anyhow::anyhow!(
                "mesh admin requires mTLS but no TLS certificates found; \
                set HYPERACTOR_TLS_CERT/KEY/CA or ensure Meta cert paths exist \
                (/var/facebook/x509_identities/server.pem, /var/facebook/rootcanal/ca.pem)"
            ));
        }

        let scheme = if tls_acceptor.is_some() {
            "https"
        } else {
            "http"
        };
        self.admin_host = Some(format!("{}://{}:{}", scheme, host, bound_addr.port()));

        // Spawn a separate introspectable instance that HTTP handlers use to
        // exchange actor messages (handlers run outside this actor's context).
        let (bridge_cx, bridge_handle) = this
            .proc()
            .introspectable_instance(MESH_ADMIN_BRIDGE_NAME)?;
        bridge_cx.set_system();
        let admin_url = self
            .admin_host
            .clone()
            .unwrap_or_else(|| "unknown".to_string());
        let bridge_state = Arc::new(BridgeState {
            admin_ref: hyperactor_reference::ActorRef::attest(this.self_id().clone()),
            bridge_cx,
            // Cap concurrent resolutions per the configured limit.
            resolve_semaphore: tokio::sync::Semaphore::new(hyperactor_config::global::get(
                crate::config::MESH_ADMIN_MAX_CONCURRENT_RESOLVES,
            )),
            // Keep the bridge actor alive as long as this state exists.
            _bridge_handle: bridge_handle,
            telemetry_url: self.telemetry_url.clone(),
            http_client: reqwest::Client::new(),
            admin_info: AdminInfo::new(
                this.self_id().to_string(),
                this.self_id().proc_id().to_string(),
                admin_url,
            )?,
        });
        let router = create_mesh_admin_router(bridge_state);

        // Serve in the background; server errors are logged, not fatal to
        // the actor.
        if let Some(acceptor) = tls_acceptor {
            let tls_listener = TlsListener {
                tcp: listener,
                acceptor,
            };
            tokio::spawn(async move {
                if let Err(e) = axum::serve(tls_listener, router).await {
                    tracing::error!("mesh admin server (mTLS) error: {}", e);
                }
            });
        } else {
            tokio::spawn(async move {
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("mesh admin server error: {}", e);
                }
            });
        }

        tracing::info!(
            "mesh admin server listening on {}",
            self.admin_host.as_deref().unwrap_or("unknown")
        );
        Ok(())
    }

    /// Logs and swallows undeliverable messages instead of escalating.
    ///
    /// Replies sent to callers that have already gone away would otherwise
    /// bounce back here; dropping them keeps the admin agent alive.
    async fn handle_undeliverable_message(
        &mut self,
        _cx: &Instance<Self>,
        hyperactor::mailbox::Undeliverable(envelope): hyperactor::mailbox::Undeliverable<
            hyperactor::mailbox::MessageEnvelope,
        >,
    ) -> Result<(), anyhow::Error> {
        tracing::debug!(
            "admin agent: undeliverable message to {} (port not bound?), ignoring",
            envelope.dest(),
        );
        Ok(())
    }
}
1013
1014#[async_trait]
1017impl Handler<MeshAdminMessage> for MeshAdminAgent {
1018 async fn handle(
1025 &mut self,
1026 cx: &Context<Self>,
1027 msg: MeshAdminMessage,
1028 ) -> Result<(), anyhow::Error> {
1029 match msg {
1030 MeshAdminMessage::GetAdminAddr { reply } => {
1031 let resp = MeshAdminAddrResponse {
1032 addr: self.admin_host.clone(),
1033 };
1034 if let Err(e) = reply.send(cx, resp) {
1035 tracing::debug!("GetAdminAddr reply failed (caller gone?): {e}");
1036 }
1037 }
1038 }
1039 Ok(())
1040 }
1041}
1042
1043#[async_trait]
1046impl Handler<ResolveReferenceMessage> for MeshAdminAgent {
1047 async fn handle(
1055 &mut self,
1056 cx: &Context<Self>,
1057 msg: ResolveReferenceMessage,
1058 ) -> Result<(), anyhow::Error> {
1059 match msg {
1060 ResolveReferenceMessage::Resolve {
1061 reference_string,
1062 reply,
1063 } => {
1064 let response = ResolveReferenceResponse(
1065 self.resolve_reference(cx, &reference_string)
1066 .await
1067 .map_err(|e| format!("{:#}", e)),
1068 );
1069 if let Err(e) = reply.send(cx, response) {
1070 tracing::debug!("Resolve reply failed (caller gone?): {e}");
1071 }
1072 }
1073 }
1074 Ok(())
1075 }
1076}
1077
impl MeshAdminAgent {
    /// Parses `reference_string` into a `NodeRef` and dispatches to the
    /// matching resolver.
    ///
    /// A proc that fails normal (host-agent-routed) resolution falls back to
    /// the standalone-proc path when an anchor actor is known for it.
    async fn resolve_reference(
        &self,
        cx: &Context<'_, Self>,
        reference_string: &str,
    ) -> Result<NodePayload, anyhow::Error> {
        let node_ref: crate::introspect::NodeRef = reference_string
            .parse()
            .map_err(|e| anyhow::anyhow!("invalid reference '{}': {}", reference_string, e))?;

        match &node_ref {
            crate::introspect::NodeRef::Root => Ok(self.build_root_payload()),
            crate::introspect::NodeRef::Host(actor_id) => {
                self.resolve_host_node(cx, actor_id).await
            }
            crate::introspect::NodeRef::Proc(proc_id) => {
                match self.resolve_proc_node(cx, proc_id).await {
                    Ok(payload) => Ok(payload),
                    // Only fall back when the proc has a known anchor actor;
                    // otherwise surface the original error.
                    Err(_) if self.standalone_proc_anchor(proc_id).is_some() => {
                        self.resolve_standalone_proc_node(cx, proc_id).await
                    }
                    Err(e) => Err(e),
                }
            }
            crate::introspect::NodeRef::Actor(actor_id) => {
                self.resolve_actor_node(cx, actor_id).await
            }
        }
    }

    /// Actors that anchor "standalone" procs (procs reachable directly
    /// rather than through a host agent).
    ///
    /// Currently a stub that yields nothing, which makes every
    /// standalone-proc branch below a no-op — presumably a hook for a
    /// future registration mechanism; confirm before relying on it.
    fn standalone_proc_actors(&self) -> impl Iterator<Item = &hyperactor_reference::ActorId> {
        std::iter::empty()
    }

    /// Finds the anchor actor (if any) whose proc id matches `proc_id`.
    fn standalone_proc_anchor(
        &self,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Option<&hyperactor_reference::ActorId> {
        self.standalone_proc_actors()
            .find(|actor_id| *actor_id.proc_id() == *proc_id)
    }

    /// True if `actor_id` lives on a proc that has a standalone anchor.
    fn is_standalone_proc_actor(&self, actor_id: &hyperactor_reference::ActorId) -> bool {
        self.standalone_proc_actors()
            .any(|a| *a.proc_id() == *actor_id.proc_id())
    }

    /// Builds the root node locally (no messaging): children are the known
    /// host agents; attrs carry host count, start metadata, and an (empty)
    /// system-children list, serialized to JSON for `derive_properties`.
    fn build_root_payload(&self) -> NodePayload {
        use crate::introspect::NodeRef;

        let children: Vec<NodeRef> = self
            .hosts
            .values()
            .map(|agent| NodeRef::Host(agent.actor_id().clone()))
            .collect();
        // Root has no system children; the attr is still set so the payload
        // shape stays consistent with other node types.
        let system_children: Vec<NodeRef> = Vec::new();
        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "root".to_string());
        attrs.set(crate::introspect::NUM_HOSTS, self.hosts.len());
        // `started_at` is stored as RFC3339; skip the attr if it fails to
        // parse rather than erroring the whole payload.
        if let Ok(t) = humantime::parse_rfc3339(&self.started_at) {
            attrs.set(crate::introspect::STARTED_AT, t);
        }
        attrs.set(crate::introspect::STARTED_BY, self.started_by.clone());
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
        NodePayload {
            identity: NodeRef::Root,
            properties: crate::introspect::derive_properties(&attrs_json),
            children,
            parent: None,
            as_of: std::time::SystemTime::now(),
        }
    }

    /// Resolves a host node by querying the host agent directly (Entity
    /// view). The resulting node's parent is always Root.
    async fn resolve_host_node(
        &self,
        cx: &Context<'_, Self>,
        actor_id: &hyperactor_reference::ActorId,
    ) -> Result<NodePayload, anyhow::Error> {
        let result = query_introspect(
            cx,
            actor_id,
            hyperactor::introspect::IntrospectView::Entity,
            hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
            "querying host agent",
        )
        .await?;
        Ok(crate::introspect::to_node_payload_with(
            result,
            crate::introspect::NodeRef::Host(actor_id.clone()),
            Some(crate::introspect::NodeRef::Root),
        ))
    }

    /// Resolves a proc node via its host agent; if the host agent returns an
    /// error payload, retries by asking the proc's own mesh agent.
    async fn resolve_proc_node(
        &self,
        cx: &Context<'_, Self>,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Result<NodePayload, anyhow::Error> {
        // Host agents are keyed by the proc's channel address.
        let host_addr = proc_id.addr().to_string();

        let agent = self
            .hosts
            .get(&host_addr)
            .ok_or_else(|| anyhow::anyhow!("host not found: {}", host_addr))?;

        // First attempt: ask the host agent about its child proc.
        let result = query_child_introspect(
            cx,
            agent.actor_id(),
            hyperactor_reference::Reference::Proc(proc_id.clone()),
            hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT),
            "querying proc details",
        )
        .await?;

        let payload = crate::introspect::to_node_payload_with(
            result,
            crate::introspect::NodeRef::Proc(proc_id.clone()),
            Some(crate::introspect::NodeRef::Host(agent.actor_id().clone())),
        );
        if !matches!(payload.properties, NodeProperties::Error { .. }) {
            return Ok(payload);
        }

        // Fallback: query the proc's own mesh agent (slot 0) directly.
        let mesh_agent_id = proc_id.actor_id(PROC_AGENT_ACTOR_NAME, 0);
        let result = query_child_introspect(
            cx,
            &mesh_agent_id,
            hyperactor_reference::Reference::Proc(proc_id.clone()),
            hyperactor_config::global::get(crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT),
            "querying proc mesh agent",
        )
        .await?;

        Ok(crate::introspect::to_node_payload_with(
            result,
            crate::introspect::NodeRef::Proc(proc_id.clone()),
            Some(crate::introspect::NodeRef::Host(agent.actor_id().clone())),
        ))
    }

    /// Resolves a standalone proc (no host agent) by querying its anchor
    /// actor and synthesizing a proc payload from the anchor's children.
    ///
    /// If the anchor is this agent itself, we skip the self-query and report
    /// just ourselves as a (system) child. The synthesized node's parent is
    /// Root, since standalone procs have no host node.
    async fn resolve_standalone_proc_node(
        &self,
        cx: &Context<'_, Self>,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Result<NodePayload, anyhow::Error> {
        let actor_id = self
            .standalone_proc_anchor(proc_id)
            .ok_or_else(|| anyhow::anyhow!("no anchor actor for standalone proc {}", proc_id))?;

        use crate::introspect::NodeRef;

        let (children, system_children) = if self.self_actor_id.as_ref() == Some(actor_id) {
            // Anchor is this agent: it can't message itself, so report
            // itself directly (agents are system actors).
            let self_ref = NodeRef::Actor(actor_id.clone());
            (vec![self_ref.clone()], vec![self_ref])
        } else {
            // Query the anchor to learn its children and system status.
            let actor_result = query_introspect(
                cx,
                actor_id,
                hyperactor::introspect::IntrospectView::Actor,
                hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
                &format!("querying anchor actor on {}", proc_id),
            )
            .await?;
            let actor_payload = to_node_payload(actor_result);
            let anchor_ref = NodeRef::Actor(actor_id.clone());
            let anchor_is_system = matches!(
                &actor_payload.properties,
                NodeProperties::Actor {
                    is_system: true,
                    ..
                }
            );

            let mut children = vec![anchor_ref.clone()];
            let mut system_children = Vec::new();
            if anchor_is_system {
                system_children.push(anchor_ref);
            }

            // Classify each child actor as system/non-system; query failures
            // conservatively count as non-system.
            for child_ref in actor_payload.children {
                let child_actor_id = match &child_ref {
                    NodeRef::Actor(id) => Some(id),
                    _ => None,
                };
                if let Some(child_actor_id) = child_actor_id {
                    let child_is_system = if let Ok(r) = query_introspect(
                        cx,
                        child_actor_id,
                        hyperactor::introspect::IntrospectView::Actor,
                        hyperactor_config::global::get(
                            crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT,
                        ),
                        "querying child actor is_system",
                    )
                    .await
                    {
                        let p = to_node_payload(r);
                        matches!(
                            &p.properties,
                            NodeProperties::Actor {
                                is_system: true,
                                ..
                            }
                        )
                    } else {
                        false
                    };
                    if child_is_system {
                        system_children.push(child_ref.clone());
                    }
                }
                children.push(child_ref);
            }
            (children, system_children)
        };

        let proc_name = proc_id.name().to_string();

        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
        attrs.set(crate::introspect::PROC_NAME, proc_name.clone());
        attrs.set(crate::introspect::NUM_ACTORS, children.len());
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

        Ok(NodePayload {
            identity: NodeRef::Proc(proc_id.clone()),
            properties: crate::introspect::derive_properties(&attrs_json),
            children,
            as_of: std::time::SystemTime::now(),
            // Standalone procs hang directly off the root node.
            parent: Some(NodeRef::Root),
        })
    }

    /// Resolves an actor node.
    ///
    /// Order of attempts: (1) this agent itself — answer locally; (2) an
    /// actor on a standalone proc — query it directly; (3) otherwise ask the
    /// proc's mesh agent for a terminated-actor snapshot first, and fall
    /// back to querying the live actor. Finally fixes up the parent link
    /// (proc node, or host node when the payload itself is a proc).
    async fn resolve_actor_node(
        &self,
        cx: &Context<'_, Self>,
        actor_id: &hyperactor_reference::ActorId,
    ) -> Result<NodePayload, anyhow::Error> {
        let result = if self.self_actor_id.as_ref() == Some(actor_id) {
            // This agent can't message itself; introspect locally.
            cx.introspect_payload()
        } else if self.is_standalone_proc_actor(actor_id) {
            query_introspect(
                cx,
                actor_id,
                hyperactor::introspect::IntrospectView::Actor,
                hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
                &format!("querying actor {}", actor_id),
            )
            .await?
        } else {
            // Prefer the mesh agent's snapshot: it can describe actors that
            // have already terminated. Discard error payloads so we still
            // try the live actor.
            let proc_id = actor_id.proc_id();
            let mesh_agent_id = proc_id.actor_id(PROC_AGENT_ACTOR_NAME, 0);
            let terminated = query_child_introspect(
                cx,
                &mesh_agent_id,
                hyperactor_reference::Reference::Actor(actor_id.clone()),
                hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT),
                "querying terminated snapshot",
            )
            .await
            .ok()
            .filter(|r| {
                let p = crate::introspect::derive_properties(&r.attrs);
                !matches!(p, NodeProperties::Error { .. })
            });

            match terminated {
                Some(snapshot) => snapshot,
                None => {
                    query_introspect(
                        cx,
                        actor_id,
                        hyperactor::introspect::IntrospectView::Actor,
                        hyperactor_config::global::get(
                            crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT,
                        ),
                        &format!("querying actor {}", actor_id),
                    )
                    .await?
                }
            }
        };
        let mut payload = to_node_payload(result);

        if self.is_standalone_proc_actor(actor_id) {
            payload.parent = Some(crate::introspect::NodeRef::Proc(actor_id.proc_id().clone()));
            return Ok(payload);
        }

        let proc_id = actor_id.proc_id();
        match &payload.properties {
            // The payload can itself describe a proc (e.g. a mesh agent);
            // then its parent is the host node, when we know the host.
            NodeProperties::Proc { .. } => {
                let host_addr = proc_id.addr().to_string();
                if let Some(agent) = self.hosts.get(&host_addr) {
                    payload.parent =
                        Some(crate::introspect::NodeRef::Host(agent.actor_id().clone()));
                }
            }
            _ => {
                payload.parent = Some(crate::introspect::NodeRef::Proc(proc_id.clone()));
            }
        }

        Ok(payload)
    }
}
1475
/// Builds the axum router for the admin HTTP API.
///
/// `/v1/{*reference}` is the catch-all reference resolver; the literal
/// `/v1/...` routes above it are matched in preference to the wildcard
/// (NOTE(review): relying on axum's more-specific-route precedence —
/// confirm for the axum version in use).
fn create_mesh_admin_router(bridge_state: Arc<BridgeState>) -> Router {
    Router::new()
        .route("/SKILL.md", get(serve_skill_md)) // templated skill doc
        .route("/v1/admin", get(serve_admin_info)) // admin self-identification
        .route("/v1/schema", get(serve_schema)) // NodePayload JSON schema
        .route("/v1/schema/admin", get(serve_admin_schema)) // AdminInfo schema
        .route("/v1/schema/error", get(serve_error_schema)) // error schema
        .route("/v1/openapi.json", get(serve_openapi)) // OpenAPI spec
        .route("/v1/tree", get(tree_dump)) // ASCII topology dump
        .route("/v1/query", post(query_proxy)) // query proxy
        .route("/v1/pyspy/{*proc_reference}", get(pyspy_bridge)) // py-spy dump
        .route(
            "/v1/pyspy_dump/{*proc_reference}",
            post(pyspy_dump_and_store),
        )
        .route("/v1/config/{*proc_reference}", get(config_bridge)) // config snapshot
        .route("/v1/{*reference}", get(resolve_reference_bridge)) // catch-all resolver
        .with_state(bridge_state)
}
1510
/// Markdown "skill" document served at `/SKILL.md`; contains a `{base}`
/// placeholder substituted with the request's base URL at serve time.
const SKILL_MD_TEMPLATE: &str = include_str!("mesh_admin_skill.md");
1513
1514fn extract_base_url(headers: &axum::http::HeaderMap) -> String {
1520 let host = headers
1521 .get(axum::http::header::HOST)
1522 .and_then(|v| v.to_str().ok())
1523 .unwrap_or("localhost");
1524 let scheme = headers
1525 .get("x-forwarded-proto")
1526 .and_then(|v| v.to_str().ok())
1527 .unwrap_or("https");
1528 format!("{scheme}://{host}")
1529}
1530
/// `GET /v1/admin` — returns the admin actor's self-identification
/// (`AdminInfo`), captured once at startup.
async fn serve_admin_info(
    State(state): State<Arc<BridgeState>>,
) -> axum::response::Json<AdminInfo> {
    axum::response::Json(state.admin_info.clone())
}
1538
/// `GET /v1/schema/admin` — JSON Schema for `AdminInfo`, stamped with a
/// stable `$id`.
async fn serve_admin_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
    Ok(axum::response::Json(schema_with_id::<AdminInfo>(
        "https://monarch.meta.com/schemas/v1/admin_info",
    )?))
}
1545
1546async fn serve_skill_md(headers: axum::http::HeaderMap) -> impl axum::response::IntoResponse {
1549 let base = extract_base_url(&headers);
1550 let body = SKILL_MD_TEMPLATE.replace("{base}", &base);
1551 (
1552 [(
1553 axum::http::header::CONTENT_TYPE,
1554 "text/markdown; charset=utf-8",
1555 )],
1556 body,
1557 )
1558}
1559
1560fn schema_with_id<T: schemars::JsonSchema>(id: &str) -> Result<serde_json::Value, ApiError> {
1562 let schema = schemars::schema_for!(T);
1563 let mut value = serde_json::to_value(schema).map_err(|e| ApiError {
1564 code: "internal_error".to_string(),
1565 message: format!("failed to serialize schema: {e}"),
1566 details: None,
1567 })?;
1568 if let Some(obj) = value.as_object_mut() {
1569 obj.insert("$id".into(), serde_json::Value::String(id.into()));
1570 }
1571 Ok(value)
1572}
1573
/// `GET /v1/schema` — JSON Schema for `NodePayload` (DTO form), stamped with
/// a stable `$id`.
async fn serve_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
    Ok(axum::response::Json(schema_with_id::<NodePayloadDto>(
        "https://monarch.meta.com/schemas/v1/node_payload",
    )?))
}
1580
/// `GET /v1/schema/error` — JSON Schema for `ApiErrorEnvelope`, stamped with
/// a stable `$id`.
async fn serve_error_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
    Ok(axum::response::Json(schema_with_id::<ApiErrorEnvelope>(
        "https://monarch.meta.com/schemas/v1/error",
    )?))
}
1587
1588fn hoist_defs(
1592 schema: &mut serde_json::Value,
1593 shared: &mut serde_json::Map<String, serde_json::Value>,
1594) {
1595 if let Some(obj) = schema.as_object_mut() {
1596 if let Some(defs) = obj.remove("$defs") {
1597 if let Some(defs_map) = defs.as_object() {
1598 for (k, v) in defs_map {
1599 shared.insert(k.clone(), v.clone());
1600 }
1601 }
1602 }
1603 obj.remove("$schema");
1607 }
1608 rewrite_refs(schema);
1609}
1610
1611fn rewrite_refs(value: &mut serde_json::Value) {
1614 match value {
1615 serde_json::Value::Object(map) => {
1616 if let Some(serde_json::Value::String(r)) = map.get_mut("$ref") {
1617 if r.starts_with("#/$defs/") {
1618 *r = r.replace("#/$defs/", "#/components/schemas/");
1619 }
1620 }
1621 for v in map.values_mut() {
1622 rewrite_refs(v);
1623 }
1624 }
1625 serde_json::Value::Array(arr) => {
1626 for v in arr {
1627 rewrite_refs(v);
1628 }
1629 }
1630 _ => {}
1631 }
1632}
1633
1634pub fn build_openapi_spec() -> serde_json::Value {
1637 let mut node_schema = serde_json::to_value(schemars::schema_for!(NodePayloadDto))
1638 .expect("NodePayload schema must be serializable");
1639 let mut error_schema = serde_json::to_value(schemars::schema_for!(ApiErrorEnvelope))
1640 .expect("ApiErrorEnvelope schema must be serializable");
1641 let mut pyspy_schema = serde_json::to_value(schemars::schema_for!(PySpyResult))
1642 .expect("PySpyResult schema must be serializable");
1643 let mut query_request_schema = serde_json::to_value(schemars::schema_for!(QueryRequest))
1644 .expect("QueryRequest schema must be serializable");
1645 let mut query_response_schema = serde_json::to_value(schemars::schema_for!(QueryResponse))
1646 .expect("QueryResponse schema must be serializable");
1647 let mut pyspy_dump_response_schema =
1648 serde_json::to_value(schemars::schema_for!(PyspyDumpAndStoreResponse))
1649 .expect("PyspyDumpAndStoreResponse schema must be serializable");
1650 let mut admin_info_schema = serde_json::to_value(schemars::schema_for!(AdminInfo))
1651 .expect("AdminInfo schema must be serializable");
1652
1653 let mut shared_schemas = serde_json::Map::new();
1656 hoist_defs(&mut node_schema, &mut shared_schemas);
1657 hoist_defs(&mut error_schema, &mut shared_schemas);
1658 hoist_defs(&mut pyspy_schema, &mut shared_schemas);
1659 hoist_defs(&mut query_request_schema, &mut shared_schemas);
1660 hoist_defs(&mut query_response_schema, &mut shared_schemas);
1661 hoist_defs(&mut pyspy_dump_response_schema, &mut shared_schemas);
1662 hoist_defs(&mut admin_info_schema, &mut shared_schemas);
1663 shared_schemas.insert("NodePayload".into(), node_schema);
1664 shared_schemas.insert("ApiErrorEnvelope".into(), error_schema);
1665 shared_schemas.insert("PySpyResult".into(), pyspy_schema);
1666 shared_schemas.insert("QueryRequest".into(), query_request_schema);
1667 shared_schemas.insert("QueryResponse".into(), query_response_schema);
1668 shared_schemas.insert(
1669 "PyspyDumpAndStoreResponse".into(),
1670 pyspy_dump_response_schema,
1671 );
1672 shared_schemas.insert("AdminInfo".into(), admin_info_schema);
1673
1674 for value in shared_schemas.values_mut() {
1676 rewrite_refs(value);
1677 }
1678
1679 let error_response = |desc: &str| -> serde_json::Value {
1680 serde_json::json!({
1681 "description": desc,
1682 "content": {
1683 "application/json": {
1684 "schema": { "$ref": "#/components/schemas/ApiErrorEnvelope" }
1685 }
1686 }
1687 })
1688 };
1689
1690 let success_payload = serde_json::json!({
1691 "description": "Resolved NodePayload",
1692 "content": {
1693 "application/json": {
1694 "schema": { "$ref": "#/components/schemas/NodePayload" }
1695 }
1696 }
1697 });
1698
1699 let mut spec = serde_json::json!({
1700 "openapi": "3.1.0",
1701 "info": {
1702 "title": "Monarch Mesh Admin API",
1703 "version": "1.0.0",
1704 "description": "Reference-walking introspection API for a Monarch actor mesh. See the Admin Gateway Pattern RFC."
1705 },
1706 "paths": {
1707 "/v1/root": {
1708 "get": {
1709 "summary": "Fetch root node",
1710 "operationId": "getRoot",
1711 "responses": {
1712 "200": success_payload,
1713 "500": error_response("Internal error"),
1714 "503": error_response("Service unavailable (at capacity, retry with backoff)"),
1715 "504": error_response("Gateway timeout (downstream host unresponsive)")
1716 }
1717 }
1718 },
1719 "/v1/{reference}": {
1720 "get": {
1721 "summary": "Resolve a reference to a NodePayload",
1722 "operationId": "resolveReference",
1723 "parameters": [{
1724 "name": "reference",
1725 "in": "path",
1726 "required": true,
1727 "description": "URL-encoded opaque reference string",
1728 "schema": { "type": "string" }
1729 }],
1730 "responses": {
1731 "200": success_payload,
1732 "400": error_response("Bad request (malformed reference)"),
1733 "404": error_response("Reference not found"),
1734 "500": error_response("Internal error"),
1735 "503": error_response("Service unavailable (at capacity, retry with backoff)"),
1736 "504": error_response("Gateway timeout (downstream host unresponsive)")
1737 }
1738 }
1739 },
1740 "/v1/schema": {
1741 "get": {
1742 "summary": "JSON Schema for NodePayload (Draft 2020-12)",
1743 "operationId": "getSchema",
1744 "responses": {
1745 "200": {
1746 "description": "JSON Schema document",
1747 "content": { "application/json": {} }
1748 }
1749 }
1750 }
1751 },
1752 "/v1/schema/error": {
1753 "get": {
1754 "summary": "JSON Schema for ApiErrorEnvelope (Draft 2020-12)",
1755 "operationId": "getErrorSchema",
1756 "responses": {
1757 "200": {
1758 "description": "JSON Schema document",
1759 "content": { "application/json": {} }
1760 }
1761 }
1762 }
1763 },
1764 "/v1/admin": {
1765 "get": {
1766 "summary": "Admin self-identification (placement, identity, URL)",
1767 "operationId": "getAdminInfo",
1768 "description": "Returns the admin actor's identity, proc placement, hostname, and URL. Used for placement verification and operational discovery.",
1769 "responses": {
1770 "200": {
1771 "description": "AdminInfo — admin actor placement metadata",
1772 "content": {
1773 "application/json": {
1774 "schema": { "$ref": "#/components/schemas/AdminInfo" }
1775 }
1776 }
1777 }
1778 }
1779 }
1780 },
1781 "/v1/tree": {
1782 "get": {
1783 "summary": "ASCII topology dump (debug)",
1784 "operationId": "getTree",
1785 "responses": {
1786 "200": {
1787 "description": "Human-readable topology tree",
1788 "content": { "text/plain": {} }
1789 }
1790 }
1791 }
1792 },
1793 "/v1/config/{proc_reference}": {
1794 "get": {
1795 "summary": "Config snapshot for a proc",
1796 "operationId": "getConfig",
1797 "description": "Returns the effective CONFIG-marked configuration entries from the target process. Routes to ProcAgent (worker procs) or HostAgent (service proc).",
1798 "parameters": [{
1799 "name": "proc_reference",
1800 "in": "path",
1801 "required": true,
1802 "description": "URL-encoded proc reference (ProcId)",
1803 "schema": { "type": "string" }
1804 }],
1805 "responses": {
1806 "200": {
1807 "description": "ConfigDumpResult — sorted list of config entries",
1808 "content": {
1809 "application/json": {
1810 "schema": {
1811 "type": "object",
1812 "properties": {
1813 "entries": {
1814 "type": "array",
1815 "items": {
1816 "type": "object",
1817 "properties": {
1818 "name": { "type": "string" },
1819 "value": { "type": "string" },
1820 "default_value": { "type": ["string", "null"] },
1821 "source": { "type": "string" },
1822 "changed_from_default": { "type": "boolean" },
1823 "env_var": { "type": ["string", "null"] }
1824 }
1825 }
1826 }
1827 }
1828 }
1829 }
1830 }
1831 },
1832 "404": error_response("Proc not found or handler not reachable"),
1833 "500": error_response("Internal error"),
1834 "504": error_response("Gateway timeout")
1835 }
1836 }
1837 },
1838 "/v1/pyspy/{proc_reference}": {
1839 "get": {
1840 "summary": "Py-spy stack dump for a proc",
1841 "operationId": "getPyspy",
1842 "description": "Runs py-spy against the target process and returns structured stack traces. Routes to ProcAgent (worker procs) or HostAgent (service proc).",
1843 "parameters": [{
1844 "name": "proc_reference",
1845 "in": "path",
1846 "required": true,
1847 "description": "URL-encoded proc reference (ProcId)",
1848 "schema": { "type": "string" }
1849 }],
1850 "responses": {
1851 "200": {
1852 "description": "PySpyResult — one of Ok, BinaryNotFound, or Failed",
1853 "content": {
1854 "application/json": {
1855 "schema": { "$ref": "#/components/schemas/PySpyResult" }
1856 }
1857 }
1858 },
1859 "400": error_response("Bad request (malformed proc reference)"),
1860 "404": error_response("Proc not found or handler not reachable"),
1861 "500": error_response("Internal error"),
1862 "504": error_response("Gateway timeout")
1863 }
1864 }
1865 },
1866 "/v1/query": {
1867 "post": {
1868 "summary": "Proxy SQL query to the telemetry dashboard",
1869 "operationId": "queryProxy",
1870 "description": "Forwards a SQL query to the Monarch dashboard's DataFusion engine. Requires telemetry_url to be configured.",
1871 "requestBody": {
1872 "required": true,
1873 "content": {
1874 "application/json": {
1875 "schema": { "$ref": "#/components/schemas/QueryRequest" }
1876 }
1877 }
1878 },
1879 "responses": {
1880 "200": {
1881 "description": "Query results",
1882 "content": {
1883 "application/json": {
1884 "schema": { "$ref": "#/components/schemas/QueryResponse" }
1885 }
1886 }
1887 },
1888 "400": error_response("Bad request (invalid SQL or missing sql field)"),
1889 "404": error_response("Dashboard not configured"),
1890 "500": error_response("Internal error"),
1891 "504": error_response("Gateway timeout")
1892 }
1893 }
1894 },
1895 "/v1/pyspy_dump/{proc_reference}": {
1896 "post": {
1897 "summary": "Trigger py-spy dump and store in telemetry",
1898 "operationId": "pyspyDumpAndStore",
1899 "description": "Runs py-spy against the target process, stores the result in the dashboard's DataFusion pyspy tables, and returns the dump_id.",
1900 "parameters": [{
1901 "name": "proc_reference",
1902 "in": "path",
1903 "required": true,
1904 "description": "URL-encoded proc reference (ProcId)",
1905 "schema": { "type": "string" }
1906 }],
1907 "responses": {
1908 "200": {
1909 "description": "Dump stored successfully",
1910 "content": {
1911 "application/json": {
1912 "schema": { "$ref": "#/components/schemas/PyspyDumpAndStoreResponse" }
1913 }
1914 }
1915 },
1916 "400": error_response("Bad request (malformed proc reference)"),
1917 "404": error_response("Proc or dashboard not found"),
1918 "500": error_response("Internal error"),
1919 "504": error_response("Gateway timeout")
1920 }
1921 }
1922 }
1923 },
1924 "components": {
1925 "schemas": serde_json::Value::Object(shared_schemas)
1926 }
1927 });
1928
1929 if let Some(paths) = spec.pointer_mut("/paths").and_then(|v| v.as_object_mut()) {
1932 paths.insert(
1933 "/v1/schema/admin".into(),
1934 serde_json::json!({
1935 "get": {
1936 "summary": "JSON Schema for AdminInfo (Draft 2020-12)",
1937 "operationId": "getAdminSchema",
1938 "responses": {
1939 "200": {
1940 "description": "JSON Schema document",
1941 "content": { "application/json": {} }
1942 }
1943 }
1944 }
1945 }),
1946 );
1947 }
1948
1949 spec
1950}
1951
1952async fn serve_openapi() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1954 Ok(axum::response::Json(build_openapi_spec()))
1955}
1956
1957fn parse_pyspy_proc_reference(
1960 raw: &str,
1961) -> Result<(String, hyperactor_reference::ProcId), ApiError> {
1962 let trimmed = raw.trim_start_matches('/');
1963 if trimmed.is_empty() {
1964 return Err(ApiError::bad_request("empty proc reference", None));
1965 }
1966 let decoded = urlencoding::decode(trimmed)
1967 .map(|cow| cow.into_owned())
1968 .map_err(|_| {
1969 ApiError::bad_request(
1970 "malformed percent-encoding: decoded bytes are not valid UTF-8",
1971 None,
1972 )
1973 })?;
1974 let proc_id: hyperactor_reference::ProcId = decoded
1975 .parse()
1976 .map_err(|e| ApiError::bad_request(format!("invalid proc reference: {}", e), None))?;
1977 Ok((decoded, proc_id))
1978}
1979
/// Best-effort reachability probe for an agent actor.
///
/// Sends an `IntrospectMessage::Query` (Entity view) to `agent_id` and waits
/// up to `MESH_ADMIN_QUERY_CHILD_TIMEOUT` for any reply.
///
/// Return semantics are deliberately lenient:
/// - `Ok(true)` — a reply arrived in time (actor is reachable),
/// - `Ok(false)` — the reply channel errored or the wait timed out
///   (treated as "unreachable", not a hard failure),
/// - `Err(_)` — only when the probe message could not even be sent.
async fn probe_actor(
    cx: &Instance<()>,
    agent_id: &hyperactor_reference::ActorId,
) -> Result<bool, ApiError> {
    let port = hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(agent_id);
    let (handle, rx) = open_once_port::<IntrospectResult>(cx);
    port.send(
        cx,
        IntrospectMessage::Query {
            view: IntrospectView::Entity,
            reply: handle.bind(),
        },
    )
    // Send failure is the only hard error: it means we cannot talk to the
    // mesh at all, so it is logged at `warn` (recv failures below are only
    // `debug`, since those are an expected way to detect a dead proc).
    .map_err(|e| {
        tracing::warn!(
            name = "pyspy_probe_send_failed",
            %agent_id,
            error = %e,
        );
        ApiError {
            code: "internal_error".to_string(),
            message: format!("failed to send probe to {}: {}", agent_id, e),
            details: None,
        }
    })?;

    let timeout = hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT);
    match tokio::time::timeout(timeout, rx.recv()).await {
        // Any successful reply counts; the payload itself is ignored.
        Ok(Ok(_)) => Ok(true),
        Ok(Err(e)) => {
            tracing::debug!(
                name = "pyspy_probe_recv_failed",
                %agent_id,
                error = %e,
            );
            Ok(false)
        }
        Err(_elapsed) => {
            tracing::debug!(
                name = "pyspy_probe_timeout",
                %agent_id,
            );
            Ok(false)
        }
    }
}
2033
/// Run a py-spy dump against the proc named by `raw_proc_reference`.
///
/// Routing: the service proc (base name == `SERVICE_PROC_NAME`) is handled
/// by its `HostAgent` (`HOST_MESH_AGENT_ACTOR_NAME`); all other procs by
/// their `ProcAgent` (`PROC_AGENT_ACTOR_NAME`).
///
/// Error mapping: `bad_request` (malformed reference), `not_found` (no
/// reachable handler, detected by an up-front probe), `gateway_timeout`
/// (dump exceeded `MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT`), `internal_error`
/// (send/receive failure).
async fn do_pyspy_dump(
    state: &BridgeState,
    raw_proc_reference: &str,
) -> Result<PySpyResult, ApiError> {
    let (proc_reference, proc_id) = parse_pyspy_proc_reference(raw_proc_reference)?;

    // Pick the agent that owns py-spy duties for this proc (see doc above).
    let agent_id = if proc_id.base_name() == SERVICE_PROC_NAME {
        proc_id.actor_id(HOST_MESH_AGENT_ACTOR_NAME, 0)
    } else {
        proc_id.actor_id(PROC_AGENT_ACTOR_NAME, 0)
    };

    let cx = &state.bridge_cx;
    // Cheap reachability check first, so an unreachable proc yields a clean
    // 404 instead of a dangling dump request that only times out later.
    if !probe_actor(cx, &agent_id).await? {
        return Err(ApiError::not_found(
            format!(
                "proc {} does not have a reachable py-spy handler (expected {} actor)",
                proc_reference,
                if proc_id.base_name() == SERVICE_PROC_NAME {
                    HOST_MESH_AGENT_ACTOR_NAME
                } else {
                    PROC_AGENT_ACTOR_NAME
                },
            ),
            None,
        ));
    }

    let port = hyperactor_reference::PortRef::<PySpyDump>::attest_message_port(&agent_id);
    let (reply_handle, reply_rx) = open_once_port::<PySpyResult>(cx);
    let mut reply_ref = reply_handle.bind();
    // Turn off undeliverable-message returns on the one-shot reply port;
    // the timeout below is the sole failure path for a missing reply.
    reply_ref.return_undeliverable(false);
    port.send(
        cx,
        PySpyDump {
            opts: PySpyOpts {
                threads: false,
                native: true,
                native_all: true,
                nonblocking: false,
            },
            result: reply_ref,
        },
    )
    .map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to send PySpyDump: {}", e),
        details: None,
    })?;

    // Outer map_err: timeout elapsed (gateway_timeout).
    // Inner map_err: the reply channel itself failed (internal_error).
    tokio::time::timeout(
        hyperactor_config::global::get(crate::config::MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT),
        reply_rx.recv(),
    )
    .await
    .map_err(|_| {
        tracing::warn!(
            proc_reference = %proc_reference,
            "mesh admin: py-spy dump timed out (gateway_timeout)",
        );
        ApiError {
            code: "gateway_timeout".to_string(),
            message: format!("timed out waiting for py-spy dump from {}", proc_reference),
            details: None,
        }
    })?
    .map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to receive PySpyResult: {}", e),
        details: None,
    })
}
2121
2122async fn pyspy_bridge(
2129 State(state): State<Arc<BridgeState>>,
2130 AxumPath(proc_reference): AxumPath<String>,
2131) -> Result<Json<PySpyResult>, ApiError> {
2132 Ok(Json(do_pyspy_dump(&state, &proc_reference).await?))
2133}
2134
// Request body for the SQL query proxy endpoint. Plain `//` comments are
// used deliberately: doc comments would become `description` fields in the
// schemars-generated JSON schema, changing the served OpenAPI output.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct QueryRequest {
    // SQL text forwarded verbatim to the dashboard's query engine.
    pub sql: String,
}
2141
// Response body returned by the SQL query proxy. `//` comments on purpose:
// doc comments would alter the schemars-generated schema.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct QueryResponse {
    // Query results as opaque JSON — the dashboard defines the row shape,
    // this layer just passes it through.
    pub rows: serde_json::Value,
}
2148
/// Outbound request body sent to the dashboard's pyspy-dump store endpoint.
/// Serialize-only: this type is never deserialized on this side.
#[derive(Debug, Serialize)]
struct StorePyspyDumpRequest {
    /// Freshly generated UUID identifying this dump.
    dump_id: String,
    /// The (decoded) proc reference the dump was taken from.
    proc_ref: String,
    /// The `PySpyResult`, pre-serialized to a JSON string.
    pyspy_result_json: String,
}
2156
// Response body for the dump-and-store endpoint. `//` comments on purpose:
// doc comments would alter the schemars-generated schema.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PyspyDumpAndStoreResponse {
    // UUID under which the dump was stored in the dashboard.
    pub dump_id: String,
}
2163
2164fn require_telemetry_url(state: &BridgeState) -> Result<&str, ApiError> {
2167 state.telemetry_url.as_deref().ok_or_else(|| {
2168 ApiError::not_found("dashboard not configured (no telemetry_url provided)", None)
2169 })
2170}
2171
/// Axum handler: proxy a SQL query to the telemetry dashboard.
///
/// Requires `telemetry_url` to be configured (`not_found` otherwise). The
/// body is forwarded as-is to `{telemetry_url}/api/query`. On non-success
/// responses the dashboard's own JSON `"error"` string is surfaced when
/// present; 4xx statuses map to `bad_request`, everything else to
/// `proxy_error`.
async fn query_proxy(
    State(state): State<Arc<BridgeState>>,
    axum::Json(body): axum::Json<QueryRequest>,
) -> Result<axum::Json<QueryResponse>, ApiError> {
    let telemetry_url = require_telemetry_url(&state)?;

    let resp = state
        .http_client
        .post(format!("{}/api/query", telemetry_url))
        .json(&body)
        .send()
        .await
        .map_err(|e| ApiError {
            code: "proxy_error".to_string(),
            message: format!("failed to proxy query to dashboard: {}", e),
            details: None,
        })?;

    // Capture status before consuming the response body.
    let status = resp.status();
    let resp_body = resp.bytes().await.map_err(|e| ApiError {
        code: "proxy_error".to_string(),
        message: format!("failed to read dashboard response: {}", e),
        details: None,
    })?;

    if !status.is_success() {
        // Prefer the dashboard's own error string when its body is JSON with
        // an "error" field; otherwise fall back to the raw HTTP status.
        let msg = serde_json::from_slice::<serde_json::Value>(&resp_body)
            .ok()
            .and_then(|v| v.get("error")?.as_str().map(String::from))
            .unwrap_or_else(|| format!("dashboard returned HTTP {status}"));
        let code = if status.is_client_error() {
            "bad_request"
        } else {
            "proxy_error"
        };
        return Err(ApiError {
            code: code.to_string(),
            message: msg,
            details: None,
        });
    }

    let result: QueryResponse = serde_json::from_slice(&resp_body).map_err(|e| ApiError {
        code: "proxy_error".to_string(),
        message: format!("failed to parse dashboard response: {}", e),
        details: None,
    })?;

    Ok(axum::Json(result))
}
2229
/// Axum handler: run a py-spy dump for the proc in the URL path, persist the
/// serialized result in the telemetry dashboard under a fresh UUID, and
/// return that `dump_id`.
///
/// Requires `telemetry_url` (checked up front, before the potentially slow
/// dump). Dump errors propagate from `do_pyspy_dump`; store failures map to
/// `proxy_error`.
async fn pyspy_dump_and_store(
    State(state): State<Arc<BridgeState>>,
    AxumPath(proc_reference): AxumPath<String>,
) -> Result<axum::Json<PyspyDumpAndStoreResponse>, ApiError> {
    let telemetry_url = require_telemetry_url(&state)?;
    let pyspy_result = do_pyspy_dump(&state, &proc_reference).await?;

    let dump_id = uuid::Uuid::new_v4().to_string();
    let pyspy_json = serde_json::to_string(&pyspy_result).map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to serialize PySpyResult: {}", e),
        details: None,
    })?;

    let store_body = StorePyspyDumpRequest {
        dump_id: dump_id.clone(),
        proc_ref: proc_reference,
        pyspy_result_json: pyspy_json,
    };

    let store_resp = state
        .http_client
        .post(format!("{}/api/pyspy_dump", telemetry_url))
        .json(&store_body)
        .send()
        .await
        .map_err(|e| ApiError {
            code: "proxy_error".to_string(),
            message: format!("failed to store pyspy dump in dashboard: {}", e),
            details: None,
        })?;

    if !store_resp.status().is_success() {
        return Err(ApiError {
            code: "proxy_error".to_string(),
            message: format!(
                "dashboard rejected pyspy dump store: HTTP {}",
                store_resp.status()
            ),
            details: None,
        });
    }

    Ok(axum::Json(PyspyDumpAndStoreResponse { dump_id }))
}
2283
/// Axum handler: fetch the effective config entries of a proc.
///
/// Agent routing mirrors the py-spy bridge (service proc -> `HostAgent`,
/// worker proc -> `ProcAgent`). Unlike the py-spy path there is no up-front
/// reachability probe here; an unreachable agent surfaces as a
/// `gateway_timeout` after `MESH_ADMIN_CONFIG_DUMP_BRIDGE_TIMEOUT`.
async fn config_bridge(
    State(state): State<Arc<BridgeState>>,
    AxumPath(proc_reference): AxumPath<String>,
) -> Result<Json<ConfigDumpResult>, ApiError> {
    // Reuses the py-spy reference parser: the path format is identical.
    let (proc_reference, proc_id) = parse_pyspy_proc_reference(&proc_reference)?;

    let agent_id = if proc_id.base_name() == SERVICE_PROC_NAME {
        proc_id.actor_id(HOST_MESH_AGENT_ACTOR_NAME, 0)
    } else {
        proc_id.actor_id(PROC_AGENT_ACTOR_NAME, 0)
    };

    let cx = &state.bridge_cx;

    let port = hyperactor_reference::PortRef::<ConfigDump>::attest_message_port(&agent_id);
    let (reply_handle, reply_rx) = open_once_port::<ConfigDumpResult>(cx);
    let mut reply_ref = reply_handle.bind();
    // Turn off undeliverable-message returns; the timeout below is the sole
    // failure path for a missing reply.
    reply_ref.return_undeliverable(false);

    port.send(cx, ConfigDump { result: reply_ref })
        .map_err(|e| ApiError {
            code: "internal_error".to_string(),
            message: format!("failed to send ConfigDump: {}", e),
            details: None,
        })?;

    let bridge_timeout =
        hyperactor_config::global::get(crate::config::MESH_ADMIN_CONFIG_DUMP_BRIDGE_TIMEOUT);
    // Outer map_err: timeout (gateway_timeout); inner: channel failure.
    let wire_result = tokio::time::timeout(bridge_timeout, reply_rx.recv())
        .await
        .map_err(|_| {
            tracing::warn!(
                proc_reference = %proc_reference,
                "mesh admin: config dump timed out (gateway_timeout)",
            );
            ApiError {
                code: "gateway_timeout".to_string(),
                message: format!("timed out waiting for config dump from {}", proc_reference),
                details: None,
            }
        })?
        .map_err(|e| ApiError {
            code: "internal_error".to_string(),
            message: format!("failed to receive ConfigDumpResult: {}", e),
            details: None,
        })?;

    Ok(Json(wire_result))
}
2352
/// Axum handler: resolve an opaque reference string to a `NodePayloadDto`.
///
/// The path segment is stripped of leading slashes and percent-decoded.
/// Concurrency is bounded by `resolve_semaphore`: at capacity the request
/// is rejected immediately with `service_unavailable` rather than queued.
/// Resolution is bounded by `MESH_ADMIN_SINGLE_HOST_TIMEOUT`
/// (`gateway_timeout` on expiry); a resolver-level failure maps to
/// `not_found`.
async fn resolve_reference_bridge(
    State(state): State<Arc<BridgeState>>,
    AxumPath(reference): AxumPath<String>,
) -> Result<Json<NodePayloadDto>, ApiError> {
    let reference = reference.trim_start_matches('/');
    if reference.is_empty() {
        return Err(ApiError::bad_request("empty reference", None));
    }
    let reference = urlencoding::decode(reference)
        .map(|cow| cow.into_owned())
        .map_err(|_| {
            ApiError::bad_request(
                "malformed percent-encoding: decoded bytes are not valid UTF-8",
                None,
            )
        })?;

    // Held for the rest of the handler; dropping it releases the slot.
    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
        tracing::warn!("mesh admin: rejecting resolve request (503): too many concurrent requests");
        ApiError {
            code: "service_unavailable".to_string(),
            message: "too many concurrent introspection requests".to_string(),
            details: None,
        }
    })?;

    let cx = &state.bridge_cx;
    let resolve_start = std::time::Instant::now();
    let response = tokio::time::timeout(
        hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
        state.admin_ref.resolve(cx, reference.clone()),
    )
    .await
    .map_err(|_| {
        tracing::warn!(
            reference = %reference,
            elapsed_ms = resolve_start.elapsed().as_millis() as u64,
            "mesh admin: resolve timed out (gateway_timeout)",
        );
        ApiError {
            code: "gateway_timeout".to_string(),
            message: "timed out resolving reference".to_string(),
            details: None,
        }
    })?
    .map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to resolve reference: {}", e),
        details: None,
    })?;

    // The resolver returns Result-in-payload: an application-level miss is a
    // 404, distinct from the transport errors handled above.
    match response.0 {
        Ok(payload) => Ok(Json(NodePayloadDto::from(payload))),
        Err(error) => Err(ApiError::not_found(error, None)),
    }
}
2424
/// Axum handler: render a human-readable ASCII topology dump.
///
/// Walks root -> hosts -> procs -> actors by repeatedly resolving child
/// references, emitting one `label -> URL` line per node. URLs are rebuilt
/// from the request's `Host` header and `x-forwarded-proto` (falling back
/// to `localhost` / `http`). Every per-node resolve is bounded by
/// `MESH_ADMIN_TREE_TIMEOUT`; unreachable children are annotated inline
/// rather than failing the whole dump. Concurrency is bounded by
/// `resolve_semaphore` (immediate `service_unavailable` at capacity).
async fn tree_dump(
    State(state): State<Arc<BridgeState>>,
    headers: axum::http::header::HeaderMap,
) -> Result<String, ApiError> {
    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
        tracing::warn!(
            "mesh admin: rejecting tree_dump request (503): too many concurrent requests"
        );
        ApiError {
            code: "service_unavailable".to_string(),
            message: "too many concurrent introspection requests".to_string(),
            details: None,
        }
    })?;

    let cx = &state.bridge_cx;

    // Reconstruct the externally visible base URL from proxy-aware headers
    // so emitted links work from behind a reverse proxy.
    let host = headers
        .get("host")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("localhost");
    let scheme = headers
        .get("x-forwarded-proto")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("http");
    let base_url = format!("{}://{}", scheme, host);

    let root_resp = tokio::time::timeout(
        hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
        state.admin_ref.resolve(cx, "root".to_string()),
    )
    .await
    .map_err(|_| ApiError {
        code: "gateway_timeout".to_string(),
        message: "timed out resolving root".to_string(),
        details: None,
    })?
    .map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to resolve root: {}", e),
        details: None,
    })?;

    let root = root_resp.0.map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: e,
        details: None,
    })?;

    let mut output = String::new();

    for child_ref in &root.children {
        let child_ref_str = child_ref.to_string();
        let resp = tokio::time::timeout(
            hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
            state.admin_ref.resolve(cx, child_ref_str.clone()),
        )
        .await;

        // Collapse timeout + transport + resolver errors into Option: any
        // failure renders the child as "(unreachable)" below.
        let payload = match resp {
            Ok(Ok(r)) => r.0.ok(),
            _ => None,
        };

        match payload {
            // Host child: print the host line, then its procs and each
            // proc's actors (two levels of box-drawing connectors).
            Some(node) if matches!(node.properties, NodeProperties::Host { .. }) => {
                let header = match &node.properties {
                    NodeProperties::Host { addr, .. } => addr.clone(),
                    _ => child_ref_str.clone(),
                };
                let host_url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
                output.push_str(&format!("{} -> {}\n", header, host_url));

                let num_procs = node.children.len();
                for (i, proc_ref) in node.children.iter().enumerate() {
                    let proc_ref_str = proc_ref.to_string();
                    let is_last_proc = i == num_procs - 1;
                    let proc_connector = if is_last_proc {
                        "└── "
                    } else {
                        "├── "
                    };
                    let proc_name = derive_tree_label(proc_ref);
                    let proc_url =
                        format!("{}/v1/{}", base_url, urlencoding::encode(&proc_ref_str));
                    output.push_str(&format!(
                        "{}{} -> {}\n",
                        proc_connector, proc_name, proc_url
                    ));

                    let proc_resp = tokio::time::timeout(
                        hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
                        state.admin_ref.resolve(cx, proc_ref_str),
                    )
                    .await;
                    let proc_payload = match proc_resp {
                        Ok(Ok(r)) => r.0.ok(),
                        _ => None,
                    };
                    // Unreachable procs are silently skipped (no actor
                    // lines), unlike top-level children.
                    if let Some(proc_node) = proc_payload {
                        let num_actors = proc_node.children.len();
                        let child_prefix = if is_last_proc { " " } else { "│ " };
                        for (j, actor_ref) in proc_node.children.iter().enumerate() {
                            let actor_ref_str = actor_ref.to_string();
                            let actor_connector = if j == num_actors - 1 {
                                "└── "
                            } else {
                                "├── "
                            };
                            let actor_label = derive_actor_label(actor_ref);
                            let actor_url =
                                format!("{}/v1/{}", base_url, urlencoding::encode(&actor_ref_str));
                            output.push_str(&format!(
                                "{}{}{} -> {}\n",
                                child_prefix, actor_connector, actor_label, actor_url
                            ));
                        }
                    }
                }
                output.push('\n');
            }
            // Proc child directly under root: print the proc and its actors.
            Some(node) if matches!(node.properties, NodeProperties::Proc { .. }) => {
                let proc_name = match &node.properties {
                    NodeProperties::Proc { proc_name, .. } => proc_name.clone(),
                    _ => child_ref_str.clone(),
                };
                let proc_url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
                output.push_str(&format!("{} -> {}\n", proc_name, proc_url));

                let num_actors = node.children.len();
                for (j, actor_ref) in node.children.iter().enumerate() {
                    let actor_ref_str = actor_ref.to_string();
                    let actor_connector = if j == num_actors - 1 {
                        "└── "
                    } else {
                        "├── "
                    };
                    let actor_label = derive_actor_label(actor_ref);
                    let actor_url =
                        format!("{}/v1/{}", base_url, urlencoding::encode(&actor_ref_str));
                    output.push_str(&format!(
                        "{}{} -> {}\n",
                        actor_connector, actor_label, actor_url
                    ));
                }
                output.push('\n');
            }
            // Any other reachable node: one line, no children walked.
            Some(_node) => {
                let label = derive_actor_label(child_ref);
                let url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
                output.push_str(&format!("{} -> {}\n\n", label, url));
            }
            _ => {
                output.push_str(&format!("{} (unreachable)\n\n", child_ref));
            }
        }
    }
    Ok(output)
}
2614
2615fn derive_tree_label(node_ref: &crate::introspect::NodeRef) -> String {
2630 match node_ref {
2631 crate::introspect::NodeRef::Root => "root".to_string(),
2632 crate::introspect::NodeRef::Host(id) => id.proc_id().name().to_string(),
2633 crate::introspect::NodeRef::Proc(id) => id.name().to_string(),
2634 crate::introspect::NodeRef::Actor(id) => {
2635 format!("{}{}", id.name(), format_args!("[{}]", id.pid()))
2636 }
2637 }
2638}
2639
2640fn derive_actor_label(node_ref: &crate::introspect::NodeRef) -> String {
2641 match node_ref {
2642 crate::introspect::NodeRef::Root => "root".to_string(),
2643 crate::introspect::NodeRef::Host(id) => id.name().to_string(),
2644 crate::introspect::NodeRef::Proc(id) => id.name().to_string(),
2645 crate::introspect::NodeRef::Actor(id) => {
2646 format!("{}[{}]", id.name(), id.pid())
2647 }
2648 }
2649}
2650
/// A launch-time publication through which an admin URL could be
/// discovered. Resolution is not yet implemented (see
/// `PublishedHandle::resolve`).
#[non_exhaustive]
pub enum PublishedHandle {
    /// A MAST job handle (a `mast_conda:///...` address).
    Mast(String),
}
2664
impl PublishedHandle {
    /// Resolve this published handle into a concrete admin URL.
    ///
    /// Currently always fails: publication-based discovery was removed when
    /// mesh admin placement moved to the caller's local proc, and no
    /// replacement lookup exists yet. `_port_override` is accepted for
    /// signature compatibility with `AdminHandle::resolve` but unused.
    pub async fn resolve(self, _port_override: Option<u16>) -> anyhow::Result<String> {
        anyhow::bail!(
            "publication-based admin handle resolution is not yet implemented: \
            mesh admin placement has moved to the caller's local proc. \
            Discover the admin URL from startup output or another \
            launch-time publication instead."
        )
    }
}
2679
/// A parsed admin address, classified by how it can be turned into a URL.
#[non_exhaustive]
pub enum AdminHandle {
    /// A ready-to-use `http`/`https` URL.
    Url(String),
    /// A launch-time publication that must be resolved asynchronously.
    Published(PublishedHandle),
    /// An address in no recognized form; carries the original string for
    /// error reporting.
    Unsupported(String),
}
2693
2694impl AdminHandle {
2695 pub fn parse(addr: &str) -> Self {
2704 if addr.starts_with("mast_conda:///") {
2706 return AdminHandle::Published(PublishedHandle::Mast(addr.to_string()));
2707 }
2708 if let Ok(parsed) = url::Url::parse(addr) {
2710 if matches!(parsed.scheme(), "http" | "https") {
2711 return AdminHandle::Url(addr.to_string());
2712 }
2713 }
2714 let with_scheme = format!("https://{}", addr);
2717 if let Ok(parsed) = url::Url::parse(&with_scheme) {
2718 if parsed.host_str().is_some() && parsed.port().is_some() {
2719 return AdminHandle::Url(with_scheme);
2720 }
2721 }
2722 AdminHandle::Unsupported(addr.to_string())
2723 }
2724
2725 pub async fn resolve(self, port_override: Option<u16>) -> anyhow::Result<String> {
2731 match self {
2732 AdminHandle::Url(url) => Ok(url),
2733 AdminHandle::Published(h) => h.resolve(port_override).await,
2734 AdminHandle::Unsupported(s) => anyhow::bail!(
2735 "unrecognized admin handle '{}': expected https://host:port or mast_conda:///job",
2736 s
2737 ),
2738 }
2739 }
2740}
2741
2742pub async fn resolve_mast_handle(
2748 handle: &str,
2749 port_override: Option<u16>,
2750) -> anyhow::Result<String> {
2751 AdminHandle::Published(PublishedHandle::Mast(handle.to_string()))
2752 .resolve(port_override)
2753 .await
2754}
2755
2756#[cfg(test)]
2757mod tests {
2758 use std::net::SocketAddr;
2759
2760 use hyperactor::channel::ChannelAddr;
2761 use hyperactor::testing::ids::test_proc_id_with_addr;
2762
2763 use super::*;
2764
    /// Minimal exported actor (no message handlers) used only so tests have
    /// an introspectable actor type to spawn.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [])]
    struct TestIntrospectableActor;
    impl Actor for TestIntrospectableActor {}
2779
    /// `build_root_payload` with two hosts must produce a Root node whose
    /// children are the two Host refs and whose Root properties carry the
    /// right host count, a non-empty `started_by`, and (LC-1) an empty
    /// `system_children` list.
    #[test]
    fn test_build_root_payload() {
        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        let proc1 = test_proc_id_with_addr(ChannelAddr::Tcp(addr1), "host1");
        let proc2 = test_proc_id_with_addr(ChannelAddr::Tcp(addr2), "host2");

        let actor_id1 = proc1.actor_id("mesh_agent", 0);
        let actor_id2 = proc2.actor_id("mesh_agent", 0);

        // `attest` fabricates typed refs without spawning real actors — the
        // payload builder only needs ids, not live actors.
        let ref1: hyperactor_reference::ActorRef<HostAgent> =
            hyperactor_reference::ActorRef::attest(actor_id1.clone());
        let ref2: hyperactor_reference::ActorRef<HostAgent> =
            hyperactor_reference::ActorRef::attest(actor_id2.clone());

        let agent = MeshAdminAgent::new(
            vec![("host_a".to_string(), ref1), ("host_b".to_string(), ref2)],
            None,
            None,
            None,
        );

        let payload = agent.build_root_payload();
        assert_eq!(payload.identity, crate::introspect::NodeRef::Root);
        assert_eq!(payload.parent, None);
        assert!(matches!(
            payload.properties,
            NodeProperties::Root { num_hosts: 2, .. }
        ));
        assert_eq!(payload.children.len(), 2);
        assert!(
            payload
                .children
                .contains(&crate::introspect::NodeRef::Host(actor_id1.clone()))
        );
        assert!(
            payload
                .children
                .contains(&crate::introspect::NodeRef::Host(actor_id2.clone()))
        );

        // Destructure the Root properties for the finer-grained checks.
        match &payload.properties {
            NodeProperties::Root {
                num_hosts,
                started_by,
                system_children,
                ..
            } => {
                assert_eq!(*num_hosts, 2);
                assert!(!started_by.is_empty());
                assert!(
                    system_children.is_empty(),
                    "LC-1: root system_children must be empty"
                );
            }
            other => panic!("expected Root, got {:?}", other),
        }
    }
2845
    /// End-to-end walk of the introspection tree through a real (local,
    /// Unix-transport) host: root -> host -> proc -> actor. Also checks
    /// LC-2 (host `system_children` empty) and that resolving the host
    /// agent's actor id directly yields plain `Actor` properties.
    #[tokio::test]
    async fn test_resolve_reference_tree_walk() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Stand up a local host whose procs boot a ProcAgent each.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // Admin agent lives on its own direct proc with supervision set up.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate client proc plays the role of an external caller.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Step 1: root node.
        let root_resp = admin_ref
            .resolve(&client, "root".to_string())
            .await
            .unwrap();
        let root = root_resp.0.unwrap();
        assert_eq!(root.identity, crate::introspect::NodeRef::Root);
        assert!(matches!(
            root.properties,
            NodeProperties::Root { num_hosts: 1, .. }
        ));
        assert_eq!(root.parent, None);
        // NOTE(review): two statements share this line — looks like a
        // formatting artifact; harmless, but worth splitting on next touch.
        assert_eq!(root.children.len(), 1); let expected_host_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone());
        let host_child_ref = root
            .children
            .iter()
            .find(|c| **c == expected_host_ref)
            .expect("root children should contain the host agent (as Host ref)");
        // Step 2: host node.
        let host_ref_string = host_child_ref.to_string();
        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
        let host_node = host_resp.0.unwrap();
        assert_eq!(host_node.identity, expected_host_ref);
        assert!(
            matches!(host_node.properties, NodeProperties::Host { .. }),
            "expected Host properties, got {:?}",
            host_node.properties
        );
        assert_eq!(host_node.parent, Some(crate::introspect::NodeRef::Root));
        assert!(
            !host_node.children.is_empty(),
            "host should have at least one proc child"
        );
        match &host_node.properties {
            NodeProperties::Host {
                system_children, ..
            } => {
                assert!(
                    system_children.is_empty(),
                    "LC-2: host system_children must be empty"
                );
            }
            other => panic!("expected Host, got {:?}", other),
        }

        // Step 3: first proc under the host.
        let proc_ref = &host_node.children[0];
        let proc_ref_str = proc_ref.to_string();
        let proc_resp = admin_ref.resolve(&client, proc_ref_str).await.unwrap();
        let proc_node = proc_resp.0.unwrap();
        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties, got {:?}",
            proc_node.properties
        );
        assert_eq!(proc_node.parent, Some(expected_host_ref.clone()));
        assert!(
            !proc_node.children.is_empty(),
            "proc should have at least one actor child"
        );

        // The host agent actor should appear among the system proc's actors.
        let host_agent_node_ref =
            crate::introspect::NodeRef::Actor(host_agent_ref.actor_id().clone());
        assert!(
            proc_node.children.contains(&host_agent_node_ref),
            "system proc children {:?} should contain the host agent {:?}",
            proc_node.children,
            host_agent_node_ref
        );

        // Step 4: resolving the same actor id directly (not via tree walk)
        // must come back with plain Actor properties.
        let xref_resp = admin_ref
            .resolve(&client, host_agent_ref.actor_id().to_string())
            .await
            .unwrap();
        let xref_node = xref_resp.0.unwrap();

        assert!(
            matches!(xref_node.properties, NodeProperties::Actor { .. }),
            "host agent child resolved as plain actor should be Actor, got {:?}",
            xref_node.properties
        );
    }
3009
3010 #[tokio::test]
3015 async fn test_proc_properties_for_all_procs() {
3016 use std::time::Duration;
3017
3018 use hyperactor::Proc;
3019 use hyperactor::channel::ChannelTransport;
3020 use hyperactor::host::Host;
3021 use hyperactor::host::LocalProcManager;
3022
3023 use crate::Name;
3024 use crate::host_mesh::host_agent::HostAgentMode;
3025 use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3026 use crate::proc_agent::ProcAgent;
3027 use crate::resource;
3028 use crate::resource::ProcSpec;
3029 use crate::resource::Rank;
3030
3031 let spawn: ProcManagerSpawnFn =
3033 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3034 let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3035 let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3036 Host::new(manager, ChannelTransport::Unix.any())
3037 .await
3038 .unwrap();
3039 let host_addr = host.addr().clone();
3040 let system_proc = host.system_proc().clone();
3041 let host_agent_handle = system_proc
3042 .spawn(
3043 crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3044 HostAgent::new(HostAgentMode::Local(host)),
3045 )
3046 .unwrap();
3047 let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
3048 let host_addr_str = host_addr.to_string();
3049
3050 let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3054 use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3055 let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3056 let admin_handle = admin_proc
3057 .spawn(
3058 MESH_ADMIN_ACTOR_NAME,
3059 MeshAdminAgent::new(
3060 vec![(host_addr_str.clone(), host_agent_ref.clone())],
3061 None,
3062 Some("[::]:0".parse().unwrap()),
3063 None,
3064 ),
3065 )
3066 .unwrap();
3067 let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
3068
3069 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3071 let (client, _handle) = client_proc.instance("client").unwrap();
3072
3073 let user_proc_name = Name::new("user_proc").unwrap();
3075 host_agent_ref
3076 .send(
3077 &client,
3078 resource::CreateOrUpdate {
3079 name: user_proc_name.clone(),
3080 rank: Rank::new(0),
3081 spec: ProcSpec::default(),
3082 },
3083 )
3084 .unwrap();
3085
3086 tokio::time::sleep(Duration::from_secs(2)).await;
3088
3089 let host_ref_string =
3091 crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone()).to_string();
3092 let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
3093 let host_node = host_resp.0.unwrap();
3094
3095 assert!(
3098 host_node.children.len() >= 3,
3099 "expected at least 3 proc children (2 system + 1 user), got {}",
3100 host_node.children.len()
3101 );
3102
3103 let user_proc_name_str = user_proc_name.to_string();
3105 let mut found_system = false;
3106 let mut found_user = false;
3107 for child_ref in &host_node.children {
3108 let resp = admin_ref
3109 .resolve(&client, child_ref.to_string())
3110 .await
3111 .unwrap();
3112 let node = resp.0.unwrap();
3113 if let NodeProperties::Proc { proc_name, .. } = &node.properties {
3114 if proc_name.contains(&user_proc_name_str) {
3115 found_user = true;
3116 } else {
3117 found_system = true;
3118 }
3119 } else {
3121 }
3123 }
3124 assert!(
3125 found_system,
3126 "should have resolved at least one system proc"
3127 );
3128 assert!(found_user, "should have resolved the user proc");
3129 }
3130
3131 #[test]
3135 fn test_build_root_payload_with_root_client() {
3136 let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
3137 let proc1 = hyperactor_reference::ProcId::with_name(ChannelAddr::Tcp(addr1), "host1");
3138 let actor_id1 = hyperactor_reference::ActorId::root(proc1, "mesh_agent".to_string());
3139 let ref1: hyperactor_reference::ActorRef<HostAgent> =
3140 hyperactor_reference::ActorRef::attest(actor_id1.clone());
3141
3142 let client_proc_id =
3143 hyperactor_reference::ProcId::with_name(ChannelAddr::Tcp(addr1), "local");
3144 let client_actor_id = client_proc_id.actor_id("client", 0);
3145
3146 let agent = MeshAdminAgent::new(
3147 vec![("host_a".to_string(), ref1)],
3148 Some(client_actor_id.clone()),
3149 None,
3150 None,
3151 );
3152
3153 let payload = agent.build_root_payload();
3154 assert!(matches!(
3155 payload.properties,
3156 NodeProperties::Root { num_hosts: 1, .. }
3157 ));
3158 assert_eq!(payload.children.len(), 1);
3160 assert!(
3161 payload
3162 .children
3163 .contains(&crate::introspect::NodeRef::Host(actor_id1.clone()))
3164 );
3165 }
3166
    /// End-to-end resolution walk when the admin is configured with a
    /// "root client" actor living on the host's local proc:
    /// root -> host -> local proc -> root client actor, checking node
    /// properties and parent links at each hop.
    #[tokio::test]
    async fn test_resolve_root_client_actor() {
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Host with a local proc manager.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();

        // The root client actor is spawned on the host's *local* proc,
        // before the host is moved into the host agent below.
        let local_proc = host.local_proc();
        let local_proc_id = local_proc.proc_id().clone();
        let root_client_handle = local_proc.spawn("client", TestIntrospectableActor).unwrap();
        let root_client_ref: hyperactor_reference::ActorRef<TestIntrospectableActor> =
            root_client_handle.bind();
        let root_client_actor_id = root_client_ref.actor_id().clone();

        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // Admin agent configured with the root client's actor id.
        let admin_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    Some(root_client_actor_id.clone()),
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        let client_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Hop 1: root must list the host.
        let root_resp = admin_ref
            .resolve(&client, "root".to_string())
            .await
            .unwrap();
        let root = root_resp.0.unwrap();
        let host_node_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone());
        assert!(
            root.children.contains(&host_node_ref),
            "root children {:?} should contain host {:?}",
            root.children,
            host_node_ref
        );

        // Hop 2: the host must list the local proc.
        let host_resp = admin_ref
            .resolve(&client, host_node_ref.to_string())
            .await
            .unwrap();
        let host_node = host_resp.0.unwrap();
        let local_proc_node_ref = crate::introspect::NodeRef::Proc(local_proc_id.clone());
        assert!(
            host_node.children.contains(&local_proc_node_ref),
            "host children {:?} should contain local proc {:?}",
            host_node.children,
            local_proc_node_ref
        );

        // Hop 3: the local proc must list the root client actor.
        let proc_resp = admin_ref
            .resolve(&client, local_proc_id.to_string())
            .await
            .unwrap();
        let proc_node = proc_resp.0.unwrap();
        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties, got {:?}",
            proc_node.properties
        );
        let root_client_node_ref = crate::introspect::NodeRef::Actor(root_client_actor_id.clone());
        assert!(
            proc_node.children.contains(&root_client_node_ref),
            "local proc children {:?} should contain root client actor {:?}",
            proc_node.children,
            root_client_node_ref
        );

        // Hop 4: the root client resolves as an Actor whose parent is the
        // local proc.
        let client_resp = admin_ref
            .resolve(&client, root_client_actor_id.to_string())
            .await
            .unwrap();
        let client_node = client_resp.0.unwrap();
        assert!(
            matches!(client_node.properties, NodeProperties::Actor { .. }),
            "expected Actor properties, got {:?}",
            client_node.properties
        );
        assert_eq!(
            client_node.parent,
            Some(local_proc_node_ref),
            "root client parent should be the local proc"
        );
    }
3299
3300 #[test]
3304 fn test_skill_md_contains_canonical_strings() {
3305 let template = SKILL_MD_TEMPLATE;
3306 assert!(
3307 template.contains("GET {base}/v1/root"),
3308 "SKILL.md must document the root endpoint"
3309 );
3310 assert!(
3311 template.contains("GET {base}/v1/{reference}"),
3312 "SKILL.md must document the reference endpoint"
3313 );
3314 assert!(
3315 template.contains("NodePayload"),
3316 "SKILL.md must mention the NodePayload response type"
3317 );
3318 assert!(
3319 template.contains("GET {base}/SKILL.md"),
3320 "SKILL.md must document itself"
3321 );
3322 assert!(
3323 template.contains("{base}"),
3324 "SKILL.md must use {{base}} placeholder for interpolation"
3325 );
3326 }
3327
    /// Walks the whole introspection tree breadth-first from "root" and
    /// checks two invariants at every node: (1) resolving a reference yields
    /// a payload whose identity stringifies back to that same reference, and
    /// (2) each node's reported parent equals the node it was reached from.
    #[tokio::test]
    async fn test_navigation_identity_invariant() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Stand up a local host with its agent on the system proc.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // Admin agent on its own proc (requires a supervision coordinator).
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str, host_agent_ref)],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // BFS queue of (reference string, expected parent ref); the root node
        // has no parent, so it starts with None.
        let mut queue: std::collections::VecDeque<(String, Option<crate::introspect::NodeRef>)> =
            std::collections::VecDeque::new();
        queue.push_back(("root".to_string(), None));

        let mut visited = std::collections::HashSet::new();
        while let Some((ref_str, expected_parent)) = queue.pop_front() {
            // Skip references already checked (children can repeat).
            if !visited.insert(ref_str.clone()) {
                continue;
            }

            let resp = admin_ref.resolve(&client, ref_str.clone()).await.unwrap();
            let node = resp.0.unwrap();

            // Invariant (1): identity round-trips through to_string().
            assert_eq!(
                node.identity.to_string(),
                ref_str,
                "identity mismatch: resolved '{}' but payload.identity = '{}'",
                ref_str,
                node.identity
            );

            // Invariant (2): reported parent is the node we came from.
            assert_eq!(
                node.parent, expected_parent,
                "parent mismatch for '{}': expected {:?}, got {:?}",
                ref_str, expected_parent, node.parent
            );

            // Enqueue unvisited children with this node as expected parent.
            for child_ref in &node.children {
                let child_str = child_ref.to_string();
                if !visited.contains(&child_str) {
                    queue.push_back((child_str, Some(node.identity.clone())));
                }
            }
        }

        // Sanity: the walk must have covered a non-trivial tree
        // (root + host + at least two procs).
        assert!(
            visited.len() >= 4,
            "expected at least 4 nodes in the tree, visited {}",
            visited.len()
        );
    }
3437
3438 #[tokio::test]
3440 async fn test_system_proc_identity() {
3441 use hyperactor::Proc;
3442 use hyperactor::channel::ChannelTransport;
3443 use hyperactor::host::Host;
3444 use hyperactor::host::LocalProcManager;
3445
3446 use crate::host_mesh::host_agent::HostAgentMode;
3447 use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3448 use crate::proc_agent::ProcAgent;
3449
3450 let spawn: ProcManagerSpawnFn =
3452 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3453 let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
3454 let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3455 Host::new(manager, ChannelTransport::Unix.any())
3456 .await
3457 .unwrap();
3458 let host_addr = host.addr().clone();
3459 let system_proc = host.system_proc().clone();
3460 let system_proc_id = system_proc.proc_id().clone();
3461 let host_agent_handle = system_proc
3462 .spawn(
3463 crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
3464 HostAgent::new(HostAgentMode::Local(host)),
3465 )
3466 .unwrap();
3467 let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
3468 let host_addr_str = host_addr.to_string();
3469
3470 let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3475 use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3476 let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3477 let admin_handle = admin_proc
3478 .spawn(
3479 MESH_ADMIN_ACTOR_NAME,
3480 MeshAdminAgent::new(
3481 vec![(host_addr_str.clone(), host_agent_ref.clone())],
3482 None,
3483 Some("[::]:0".parse().unwrap()),
3484 None,
3485 ),
3486 )
3487 .unwrap();
3488 let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
3489
3490 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3492 let (client, _handle) = client_proc.instance("client").unwrap();
3493
3494 let host_ref_str =
3496 crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone()).to_string();
3497 let host_resp = admin_ref
3498 .resolve(&client, host_ref_str.clone())
3499 .await
3500 .unwrap();
3501 let host_node = host_resp.0.unwrap();
3502 assert!(
3503 !host_node.children.is_empty(),
3504 "host should have at least one proc child"
3505 );
3506
3507 let system_children = match &host_node.properties {
3509 NodeProperties::Host {
3510 system_children, ..
3511 } => system_children.clone(),
3512 other => panic!("expected Host properties, got {:?}", other),
3513 };
3514 assert!(
3516 system_children.is_empty(),
3517 "host system_children should be empty (procs are never system), got {:?}",
3518 system_children
3519 );
3520 assert!(
3522 matches!(&host_node.properties, NodeProperties::Host { .. }),
3523 "expected Host properties"
3524 );
3525
3526 let expected_system_ref = crate::introspect::NodeRef::Proc(system_proc_id.clone());
3528 assert!(
3529 host_node.children.contains(&expected_system_ref),
3530 "host children {:?} should contain the system proc ref {:?}",
3531 host_node.children,
3532 expected_system_ref
3533 );
3534
3535 let proc_child_ref = &host_node.children[0];
3537 let proc_resp = admin_ref
3538 .resolve(&client, proc_child_ref.to_string())
3539 .await
3540 .unwrap();
3541 let proc_node = proc_resp.0.unwrap();
3542
3543 assert_eq!(
3544 proc_node.identity, *proc_child_ref,
3545 "identity must match the proc ref from the host's children list"
3546 );
3547
3548 assert!(
3549 matches!(proc_node.properties, NodeProperties::Proc { .. }),
3550 "expected NodeProperties::Proc, got {:?}",
3551 proc_node.properties
3552 );
3553
3554 let host_node_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_id().clone());
3555 assert_eq!(
3556 proc_node.parent,
3557 Some(host_node_ref),
3558 "proc parent should be the host reference"
3559 );
3560
3561 assert!(
3563 proc_node.as_of > std::time::UNIX_EPOCH,
3564 "as_of should be after the epoch"
3565 );
3566
3567 assert!(
3569 matches!(&proc_node.properties, NodeProperties::Proc { .. }),
3570 "expected Proc properties"
3571 );
3572 }
3573
3574 #[test]
3578 fn test_admin_handle_parse_https_url() {
3579 let h = super::AdminHandle::parse("https://myhost:1729");
3580 assert!(matches!(h, super::AdminHandle::Url(u) if u == "https://myhost:1729"));
3581 }
3582
3583 #[test]
3584 fn test_admin_handle_parse_bare_host_port() {
3585 let h = super::AdminHandle::parse("myhost:1729");
3587 assert!(
3588 matches!(h, super::AdminHandle::Url(ref u) if u == "https://myhost:1729"),
3589 "bare host:port should become https://host:port, got: {:?}",
3590 matches!(h, super::AdminHandle::Url(_))
3591 );
3592 }
3593
3594 #[test]
3595 fn test_admin_handle_parse_mast() {
3596 let h = super::AdminHandle::parse("mast_conda:///my-job");
3597 assert!(matches!(
3598 h,
3599 super::AdminHandle::Published(super::PublishedHandle::Mast(_))
3600 ));
3601 }
3602
3603 #[test]
3604 fn test_admin_handle_parse_unsupported() {
3605 let h = super::AdminHandle::parse("junk_hostname_no_port");
3607 assert!(matches!(h, super::AdminHandle::Unsupported(_)));
3608 }
3609
3610 #[tokio::test]
3611 async fn test_admin_handle_resolve_url_returns_url() {
3612 let h = super::AdminHandle::parse("https://myhost:1729");
3613 let result = h.resolve(None).await.unwrap();
3614 assert_eq!(result, "https://myhost:1729");
3615 }
3616
3617 #[tokio::test]
3618 async fn test_admin_handle_resolve_published_returns_error() {
3619 let h = super::AdminHandle::parse("mast_conda:///test-job");
3620 let err = format!("{:#}", h.resolve(Some(1729)).await.unwrap_err());
3621 assert!(
3622 err.contains("not yet implemented"),
3623 "expected 'not yet implemented' in error, got: {}",
3624 err
3625 );
3626 }
3627
3628 #[tokio::test]
3629 async fn test_admin_handle_resolve_unsupported_returns_error() {
3630 let h = super::AdminHandle::parse("junk_hostname_no_port");
3631 let err = format!("{:#}", h.resolve(None).await.unwrap_err());
3632 assert!(
3633 err.contains("unrecognized admin handle"),
3634 "expected 'unrecognized admin handle' in error, got: {}",
3635 err
3636 );
3637 }
3638
3639 #[tokio::test]
3642 async fn test_resolve_mast_handle_returns_not_yet_implemented_error() {
3643 let result = super::resolve_mast_handle("mast_conda:///test-job", Some(1729)).await;
3644 let err = format!("{:#}", result.unwrap_err());
3645 assert!(
3646 err.contains("not yet implemented"),
3647 "expected 'not yet implemented' in error, got: {}",
3648 err
3649 );
3650 }
3651
3652 #[test]
3656 fn test_admin_info_new_derives_host_from_url() {
3657 let info = super::AdminInfo::new(
3658 "actor".to_string(),
3659 "proc".to_string(),
3660 "https://myhost.example.com:1729".to_string(),
3661 )
3662 .unwrap();
3663 assert_eq!(info.host, "myhost.example.com");
3664 assert_eq!(info.url, "https://myhost.example.com:1729");
3665 }
3666
3667 #[test]
3669 fn test_admin_info_new_rejects_invalid_url() {
3670 let result = super::AdminInfo::new(
3671 "actor".to_string(),
3672 "proc".to_string(),
3673 "not a url".to_string(),
3674 );
3675 assert!(result.is_err(), "invalid URL must be rejected");
3676 }
3677
3678 #[test]
3680 fn test_admin_info_new_rejects_url_without_host() {
3681 let result = super::AdminInfo::new(
3683 "actor".to_string(),
3684 "proc".to_string(),
3685 "data:text/plain,hello".to_string(),
3686 );
3687 assert!(result.is_err(), "URL without host must be rejected");
3688 }
3689
    /// spawn_admin must place the MeshAdminAgent on the *caller's* proc:
    /// after spawning, an actor ref attested at
    /// (caller_proc, MESH_ADMIN_ACTOR_NAME) must answer GetAdminAddr with
    /// the URL spawn_admin returned.
    #[tokio::test]
    async fn test_spawn_admin_places_on_caller_proc() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;

        use crate::host_mesh::HostMesh;

        let host_mesh = HostMesh::local().await.unwrap();

        // The caller proc needs a supervision coordinator before spawning.
        let caller_proc = Proc::direct(ChannelTransport::Unix.any(), "caller".to_string()).unwrap();
        let _supervision = ProcSupervisionCoordinator::set(&caller_proc).await.unwrap();
        let (caller_cx, _caller_handle) = caller_proc.instance("caller").unwrap();

        let admin_url = crate::host_mesh::spawn_admin(
            [&host_mesh],
            &caller_cx,
            Some("[::]:0".parse().unwrap()),
            None,
        )
        .await
        .unwrap();

        assert!(!admin_url.is_empty(), "spawn_admin must return a URL");

        // Attest a ref at the expected placement on the caller's proc; if
        // the admin had been spawned anywhere else, the GetAdminAddr message
        // below would go nowhere and the await would fail.
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> =
            hyperactor_reference::ActorRef::attest(
                caller_proc.proc_id().actor_id(MESH_ADMIN_ACTOR_NAME, 0),
            );
        let probe_proc = Proc::direct(ChannelTransport::Unix.any(), "probe".to_string()).unwrap();
        let (probe_cx, _probe_handle) = probe_proc.instance("probe").unwrap();
        let resp = admin_ref.get_admin_addr(&probe_cx).await.unwrap();
        assert_eq!(
            resp.addr.as_deref(),
            Some(admin_url.as_str()),
            "SA-5: admin on caller_proc must respond to GetAdminAddr"
        );
    }
3739
    /// Proc introspection must reflect actors spawned *directly* on a proc,
    /// not just those spawned through the proc agent: resolve a user proc to
    /// record its baseline children, spawn an extra actor directly, then
    /// resolve again and expect the new actor to be listed.
    #[tokio::test]
    async fn test_proc_children_reflect_directly_spawned_actors() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;

        use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
        use crate::host_mesh::host_agent::HostAgent;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
        use crate::proc_agent::ProcAgent;

        // Host + host agent boilerplate.
        let spawn_fn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn_fn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();

        // A standalone user proc with its own proc agent; wait until the
        // agent is Idle so it can answer introspection queries.
        let user_proc =
            Proc::direct(ChannelTransport::Unix.any(), "user_proc".to_string()).unwrap();
        let user_proc_addr = user_proc.proc_id().addr().to_string();
        let agent_handle = ProcAgent::boot_v1(user_proc.clone(), None).unwrap();
        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        // Admin agent keyed by the user proc's address.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(user_proc_addr, host_agent_ref.clone())],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        // First resolve: baseline children must include the proc agent.
        let user_proc_ref = user_proc.proc_id().to_string();
        let resp = admin_ref
            .resolve(&client, user_proc_ref.clone())
            .await
            .unwrap();
        let node = resp.0.unwrap();
        assert!(
            matches!(node.properties, NodeProperties::Proc { .. }),
            "expected Proc, got {:?}",
            node.properties
        );
        let initial_count = node.children.len();
        assert!(
            node.children
                .iter()
                .any(|c| c.to_string().contains(PROC_AGENT_ACTOR_NAME)),
            "initial children {:?} should contain proc_agent",
            node.children
        );

        // Spawn an actor directly on the proc, bypassing the agent.
        // The handle is deliberately dropped; the actor stays registered.
        user_proc
            .spawn("extra_actor", TestIntrospectableActor)
            .unwrap();

        // Second resolve: the directly spawned actor must now be visible and
        // the child count must have grown.
        let resp2 = admin_ref
            .resolve(&client, user_proc_ref.clone())
            .await
            .unwrap();
        let node2 = resp2.0.unwrap();
        assert!(
            matches!(node2.properties, NodeProperties::Proc { .. }),
            "expected Proc, got {:?}",
            node2.properties
        );
        assert!(
            node2
                .children
                .iter()
                .any(|c| c.to_string().contains("extra_actor")),
            "after direct spawn, children {:?} should contain extra_actor",
            node2.children
        );
        assert!(
            node2.children.len() > initial_count,
            "expected at least {} children after direct spawn, got {:?}",
            initial_count + 1,
            node2.children
        );
    }
3886
3887 #[test]
3894 fn pyspy_parse_empty_reference() {
3895 let err = parse_pyspy_proc_reference("").unwrap_err();
3897 assert_eq!(err.code, "bad_request");
3898 assert!(err.message.contains("empty"));
3899 }
3900
3901 #[test]
3902 fn pyspy_parse_slash_only() {
3903 let err = parse_pyspy_proc_reference("/").unwrap_err();
3905 assert_eq!(err.code, "bad_request");
3906 assert!(err.message.contains("empty"));
3907 }
3908
3909 #[test]
3910 fn pyspy_parse_malformed_percent_encoding() {
3911 let err = parse_pyspy_proc_reference("%FF%FE").unwrap_err();
3914 assert_eq!(err.code, "bad_request");
3915 assert!(err.message.contains("percent-encoding"));
3916 }
3917
3918 #[test]
3919 fn pyspy_parse_invalid_proc_id() {
3920 let err = parse_pyspy_proc_reference("not-a-valid-proc-id").unwrap_err();
3922 assert_eq!(err.code, "bad_request");
3923 assert!(err.message.contains("invalid proc reference"));
3924 }
3925
3926 #[test]
3927 fn pyspy_parse_valid_proc_reference() {
3928 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
3930 let proc_id = test_proc_id_with_addr(ChannelAddr::Tcp(addr), "myproc");
3931 let proc_id_str = proc_id.to_string();
3932
3933 let (decoded, parsed) = parse_pyspy_proc_reference(&proc_id_str).unwrap();
3934 assert_eq!(decoded, proc_id_str);
3935 assert_eq!(parsed, proc_id);
3936 }
3937
3938 #[test]
3939 fn pyspy_parse_strips_leading_slash() {
3940 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
3942 let proc_id = test_proc_id_with_addr(ChannelAddr::Tcp(addr), "myproc");
3943 let with_slash = format!("/{}", proc_id);
3944
3945 let (_, parsed) = parse_pyspy_proc_reference(&with_slash).unwrap();
3946 assert_eq!(parsed, proc_id);
3947 }
3948}