1use std::collections::HashMap;
338use std::io;
339use std::sync::Arc;
340use std::time::Duration;
341
342use async_trait::async_trait;
343use axum::Json;
344use axum::Router;
345use axum::extract::Path as AxumPath;
346use axum::extract::State;
347use axum::http::StatusCode;
348use axum::response::IntoResponse;
349use axum::routing::get;
350use axum::routing::post;
351use hyperactor::Actor;
352use hyperactor::ActorHandle;
353use hyperactor::ActorRef;
354use hyperactor::Context;
355use hyperactor::Endpoint as _;
356use hyperactor::HandleClient;
357use hyperactor::Handler;
358use hyperactor::Instance;
359use hyperactor::OncePortRef;
360use hyperactor::ProcAddr;
361use hyperactor::RefClient;
362use hyperactor::channel::try_tls_acceptor;
363use hyperactor::introspect::IntrospectMessage;
364use hyperactor::introspect::IntrospectResult;
365use hyperactor::introspect::IntrospectView;
366use hyperactor::mailbox::open_once_port;
367use serde::Deserialize;
368use serde::Serialize;
369use serde_json::Value;
370use tokio::net::TcpListener;
371use tokio_rustls::TlsAcceptor;
372use typeuri::Named;
373
374use crate::config_dump::ConfigDump;
375use crate::config_dump::ConfigDumpResult;
376use crate::host::SERVICE_PROC_NAME;
377use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
378use crate::host_mesh::host_agent::HostAgent;
379use crate::introspect::NodePayload;
380use crate::introspect::NodeProperties;
381use crate::introspect::dto::NodePayloadDto;
382use crate::introspect::to_node_payload;
383use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
384use crate::proc_agent::ProcAgent;
385use crate::pyspy::PySpyDump;
386use crate::pyspy::PySpyOpts;
387use crate::pyspy::PySpyProfile;
388use crate::pyspy::PySpyProfileOpts;
389use crate::pyspy::PySpyProfileResult;
390use crate::pyspy::PySpyResult;
391use crate::pyspy::ValidatedProfileRequest;
392
393async fn query_introspect(
396 cx: &hyperactor::Context<'_, MeshAdminAgent>,
397 actor_id: &hyperactor::ActorAddr,
398 view: hyperactor::introspect::IntrospectView,
399 timeout: Duration,
400 err_ctx: &str,
401) -> Result<IntrospectResult, anyhow::Error> {
402 let introspect_port = hyperactor::PortRef::<IntrospectMessage>::attest_handler_port(actor_id);
403 let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
404 let mut reply_ref = reply_handle.bind();
405 reply_ref.return_undeliverable(false);
406 introspect_port.post(
407 cx,
408 IntrospectMessage::Query {
409 view,
410 reply: reply_ref,
411 },
412 );
413 tokio::time::timeout(timeout, reply_rx.recv())
414 .await
415 .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
416 .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
417}
418
419async fn query_child_introspect(
421 cx: &hyperactor::Context<'_, MeshAdminAgent>,
422 actor_id: &hyperactor::ActorAddr,
423 child_ref: hyperactor::Addr,
424 timeout: Duration,
425 err_ctx: &str,
426) -> Result<IntrospectResult, anyhow::Error> {
427 let introspect_port = hyperactor::PortRef::<IntrospectMessage>::attest_handler_port(actor_id);
428 let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
429 let mut reply_ref = reply_handle.bind();
430 reply_ref.return_undeliverable(false);
431 introspect_port.post(
432 cx,
433 IntrospectMessage::QueryChild {
434 child_ref,
435 reply: reply_ref,
436 },
437 );
438 tokio::time::timeout(timeout, reply_rx.recv())
439 .await
440 .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
441 .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
442}
443
/// Name associated with the mesh admin actor.
// NOTE(review): presumably the name under which `MeshAdminAgent` is spawned;
// the spawn site is not visible in this file section — confirm.
pub const MESH_ADMIN_ACTOR_NAME: &str = "mesh_admin";

/// Name of the introspectable bridge instance that `MeshAdminAgent::init`
/// creates on its proc for use by the HTTP handlers.
pub const MESH_ADMIN_BRIDGE_NAME: &str = "mesh_admin_bridge";
464
465#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
468pub struct ApiError {
469 pub code: String,
471 pub message: String,
473 #[serde(skip_serializing_if = "Option::is_none")]
477 pub details: Option<Value>,
478}
479
480#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
482pub struct ApiErrorEnvelope {
483 pub error: ApiError,
484}
485
486impl ApiError {
487 pub fn not_found(message: impl Into<String>, details: Option<Value>) -> Self {
489 Self {
490 code: "not_found".to_string(),
491 message: message.into(),
492 details,
493 }
494 }
495
496 pub fn bad_request(message: impl Into<String>, details: Option<Value>) -> Self {
498 Self {
499 code: "bad_request".to_string(),
500 message: message.into(),
501 details,
502 }
503 }
504}
505
506impl IntoResponse for ApiError {
507 fn into_response(self) -> axum::response::Response {
508 let status = match self.code.as_str() {
509 "not_found" => StatusCode::NOT_FOUND,
510 "bad_request" => StatusCode::BAD_REQUEST,
511 "gateway_timeout" => StatusCode::GATEWAY_TIMEOUT,
512 "service_unavailable" => StatusCode::SERVICE_UNAVAILABLE,
513 _ => StatusCode::INTERNAL_SERVER_ERROR,
514 };
515 let envelope = ApiErrorEnvelope { error: self };
516 (status, Json(envelope)).into_response()
517 }
518}
519
520#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
525pub struct MeshAdminAddrResponse {
526 pub addr: Option<String>,
527}
528wirevalue::register_type!(MeshAdminAddrResponse);
529
530#[derive(
536 Debug,
537 Clone,
538 PartialEq,
539 Serialize,
540 Deserialize,
541 Handler,
542 HandleClient,
543 RefClient,
544 Named
545)]
546pub enum MeshAdminMessage {
547 GetAdminAddr {
552 #[reply]
553 reply: OncePortRef<MeshAdminAddrResponse>,
554 },
555}
556wirevalue::register_type!(MeshAdminMessage);
557
558#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
561pub struct ResolveReferenceResponse(pub Result<NodePayload, String>);
562wirevalue::register_type!(ResolveReferenceResponse);
563
564#[derive(
581 Debug,
582 Clone,
583 PartialEq,
584 Serialize,
585 Deserialize,
586 Handler,
587 HandleClient,
588 RefClient,
589 Named
590)]
591pub enum ResolveReferenceMessage {
592 Resolve {
597 reference_string: String,
600 #[reply]
602 reply: OncePortRef<ResolveReferenceResponse>,
603 },
604}
605wirevalue::register_type!(ResolveReferenceMessage);
606
607#[hyperactor::export(handlers = [MeshAdminMessage, ResolveReferenceMessage])]
620pub struct MeshAdminAgent {
621 hosts: HashMap<String, ActorRef<HostAgent>>,
624
625 host_agents_by_actor_id: HashMap<hyperactor::ActorAddr, String>,
635
636 root_client_actor_id: Option<hyperactor::ActorAddr>,
641
642 self_actor_id: Option<hyperactor::ActorAddr>,
647
648 admin_addr_override: Option<std::net::SocketAddr>,
665
666 admin_addr: Option<std::net::SocketAddr>,
669
670 admin_host: Option<String>,
673
674 telemetry_url: Option<String>,
678
679 started_at: String,
681
682 started_by: String,
684}
685
686impl MeshAdminAgent {
687 pub fn new(
705 hosts: Vec<(String, ActorRef<HostAgent>)>,
706 root_client_actor_id: Option<hyperactor::ActorAddr>,
707 admin_addr: Option<std::net::SocketAddr>,
708 telemetry_url: Option<String>,
709 ) -> Self {
710 let host_agents_by_actor_id: HashMap<hyperactor::ActorAddr, String> = hosts
711 .iter()
712 .map(|(addr, agent_ref)| (agent_ref.actor_addr().clone(), addr.clone()))
713 .collect();
714
715 let started_at = chrono::Utc::now().to_rfc3339();
717 let started_by = std::env::var("USER")
718 .or_else(|_| std::env::var("USERNAME"))
719 .unwrap_or_else(|_| "unknown".to_string());
720
721 Self {
722 hosts: hosts.into_iter().collect(),
723 host_agents_by_actor_id,
724 root_client_actor_id,
725 self_actor_id: None,
726 admin_addr_override: admin_addr,
727 admin_addr: None,
728 admin_host: None,
729 telemetry_url,
730 started_at,
731 started_by,
732 }
733 }
734}
735
736impl std::fmt::Debug for MeshAdminAgent {
737 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
738 f.debug_struct("MeshAdminAgent")
739 .field("hosts", &self.hosts.keys().collect::<Vec<_>>())
740 .field("host_agents", &self.host_agents_by_actor_id.len())
741 .field("root_client_actor_id", &self.root_client_actor_id)
742 .field("self_actor_id", &self.self_actor_id)
743 .field("admin_addr", &self.admin_addr)
744 .field("admin_host", &self.admin_host)
745 .field("started_at", &self.started_at)
746 .field("started_by", &self.started_by)
747 .finish()
748 }
749}
750
751#[derive(Debug, Clone, Serialize, Deserialize, schemars::JsonSchema)]
758pub struct AdminInfo {
759 pub actor_id: String,
761 pub proc_id: String,
763 pub host: String,
765 pub url: String,
767}
768
769impl AdminInfo {
770 pub fn new(actor_id: String, proc_id: String, url: String) -> anyhow::Result<Self> {
777 let parsed = url::Url::parse(&url)
778 .map_err(|e| anyhow::anyhow!("invalid admin URL '{}': {}", url, e))?;
779 let host = parsed
780 .host_str()
781 .ok_or_else(|| anyhow::anyhow!("admin URL '{}' has no host", url))?
782 .to_string();
783 Ok(Self {
784 actor_id,
785 proc_id,
786 host,
787 url,
788 })
789 }
790}
791
792struct BridgeState {
801 admin_ref: ActorRef<MeshAdminAgent>,
804 bridge_cx: Instance<()>,
812 resolve_semaphore: tokio::sync::Semaphore,
816 _bridge_handle: ActorHandle<()>,
818 telemetry_url: Option<String>,
823 http_client: reqwest::Client,
826 admin_info: AdminInfo,
828}
829
830fn build_http_client() -> reqwest::Client {
837 use std::io::Read;
838
839 if let Some(bundle) = hyperactor::channel::try_tls_pem_bundle() {
840 let mut ca_bytes = Vec::new();
841 if let Ok(mut reader) = bundle.ca.reader()
842 && reader.read_to_end(&mut ca_bytes).is_ok()
843 {
844 let (builder, ca_installed) = crate::mesh_admin_client::add_tls(
845 reqwest::Client::builder(),
846 &ca_bytes,
847 None,
848 None,
849 );
850 if ca_installed {
851 if let Ok(client) = builder.build() {
852 return client;
853 }
854 tracing::warn!(
855 "mesh admin: failed to build reqwest client with root CA; \
856 falling back to default trust store"
857 );
858 }
859 }
860 }
861 reqwest::Client::new()
862}
863
864struct TlsListener {
871 tcp: TcpListener,
872 acceptor: TlsAcceptor,
873}
874
875impl axum::serve::Listener for TlsListener {
876 type Io = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
877 type Addr = std::net::SocketAddr;
878
879 async fn accept(&mut self) -> (Self::Io, Self::Addr) {
880 loop {
881 let (stream, addr) = match self.tcp.accept().await {
882 Ok(conn) => conn,
883 Err(e) => {
884 tracing::warn!("TCP accept error: {}", e);
885 continue;
886 }
887 };
888
889 match self.acceptor.accept(stream).await {
890 Ok(tls_stream) => return (tls_stream, addr),
891 Err(e) => {
892 tracing::warn!("TLS handshake failed from {}: {}", addr, e);
893 continue;
894 }
895 }
896 }
897 }
898
899 fn local_addr(&self) -> io::Result<Self::Addr> {
900 self.tcp.local_addr()
901 }
902}
903
904#[async_trait]
905impl Actor for MeshAdminAgent {
906 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
927 this.bind::<Self>();
931 this.set_system();
932 self.self_actor_id = Some(this.self_addr().clone());
933
934 let bind_addr = match self.admin_addr_override {
935 Some(addr) => addr,
936 None => hyperactor_config::global::get_cloned(crate::config::MESH_ADMIN_ADDR)
937 .parse_socket_addr()
938 .map_err(|e| anyhow::anyhow!("invalid MESH_ADMIN_ADDR config: {}", e))?,
939 };
940 let listener = TcpListener::bind(bind_addr).await?;
941 let bound_addr = listener.local_addr()?;
942 self.admin_addr = Some(bound_addr);
943
944 let enforce_mtls = cfg!(fbcode_build);
948 let tls_acceptor = try_tls_acceptor(enforce_mtls);
949
950 if enforce_mtls && tls_acceptor.is_none() {
951 return Err(anyhow::anyhow!(
952 "mesh admin requires mTLS but no TLS certificates found; \
953 set HYPERACTOR_TLS_CERT/KEY/CA or ensure Meta cert paths exist \
954 (/var/facebook/x509_identities/server.pem, /var/facebook/rootcanal/ca.pem)"
955 ));
956 }
957
958 let scheme = if tls_acceptor.is_some() {
959 "https"
960 } else {
961 "http"
962 };
963
964 let host = if !bound_addr.ip().is_unspecified() {
974 let ip = bound_addr.ip();
975 if ip.is_loopback() {
976 "localhost".to_string()
977 } else if let std::net::IpAddr::V6(v6) = ip {
978 format!("[{}]", v6)
979 } else {
980 ip.to_string()
981 }
982 } else {
983 advertised_host::from_cert_sans()
984 };
985 self.admin_host = Some(format!("{}://{}:{}", scheme, host, bound_addr.port()));
986
987 let (bridge_cx, bridge_handle) = this
991 .proc()
992 .introspectable_instance(MESH_ADMIN_BRIDGE_NAME)?;
993 bridge_cx.set_system();
994 let admin_url = self
995 .admin_host
996 .clone()
997 .unwrap_or_else(|| "unknown".to_string());
998 let bridge_state = Arc::new(BridgeState {
999 admin_ref: ActorRef::attest(this.self_addr().clone()),
1000 bridge_cx,
1001 resolve_semaphore: tokio::sync::Semaphore::new(hyperactor_config::global::get(
1002 crate::config::MESH_ADMIN_MAX_CONCURRENT_RESOLVES,
1003 )),
1004 _bridge_handle: bridge_handle,
1005 telemetry_url: self.telemetry_url.clone(),
1006 http_client: build_http_client(),
1007 admin_info: AdminInfo::new(
1008 this.self_addr().to_string(),
1009 this.self_addr().proc_addr().to_string(),
1010 admin_url,
1011 )?,
1012 });
1013 let router = create_mesh_admin_router(bridge_state);
1014
1015 if let Some(acceptor) = tls_acceptor {
1016 let tls_listener = TlsListener {
1017 tcp: listener,
1018 acceptor,
1019 };
1020 tokio::spawn(async move {
1021 if let Err(e) = axum::serve(tls_listener, router).await {
1022 tracing::error!("mesh admin server (mTLS) error: {}", e);
1023 }
1024 });
1025 } else {
1026 tokio::spawn(async move {
1028 if let Err(e) = axum::serve(listener, router).await {
1029 tracing::error!("mesh admin server error: {}", e);
1030 }
1031 });
1032 }
1033
1034 tracing::info!(
1035 "mesh admin server listening on {}",
1036 self.admin_host.as_deref().unwrap_or("unknown")
1037 );
1038 Ok(())
1039 }
1040
1041 async fn handle_undeliverable_message(
1055 &mut self,
1056 _cx: &Instance<Self>,
1057 undeliverable: hyperactor::mailbox::Undeliverable<hyperactor::mailbox::MessageEnvelope>,
1058 ) -> Result<(), anyhow::Error> {
1059 match undeliverable {
1060 hyperactor::mailbox::Undeliverable::Message(envelope) => {
1061 tracing::debug!(
1062 "admin agent: undeliverable message to {} (port not bound?), ignoring",
1063 envelope.dest(),
1064 );
1065 }
1066 hyperactor::mailbox::Undeliverable::Lost(lost) => {
1067 tracing::debug!(
1068 "admin agent: lost message to {} ({}), ignoring",
1069 lost.dest,
1070 lost.error,
1071 );
1072 }
1073 }
1074 Ok(())
1075 }
1076}
1077
1078#[async_trait]
1081impl Handler<MeshAdminMessage> for MeshAdminAgent {
1082 async fn handle(
1089 &mut self,
1090 cx: &Context<Self>,
1091 msg: MeshAdminMessage,
1092 ) -> Result<(), anyhow::Error> {
1093 match msg {
1094 MeshAdminMessage::GetAdminAddr { reply } => {
1095 let resp = MeshAdminAddrResponse {
1096 addr: self.admin_host.clone(),
1097 };
1098 reply.post(cx, resp);
1099 }
1100 }
1101 Ok(())
1102 }
1103}
1104
1105#[async_trait]
1108impl Handler<ResolveReferenceMessage> for MeshAdminAgent {
1109 async fn handle(
1117 &mut self,
1118 cx: &Context<Self>,
1119 msg: ResolveReferenceMessage,
1120 ) -> Result<(), anyhow::Error> {
1121 match msg {
1122 ResolveReferenceMessage::Resolve {
1123 reference_string,
1124 reply,
1125 } => {
1126 let response = ResolveReferenceResponse(
1127 self.resolve_reference(cx, &reference_string)
1128 .await
1129 .map_err(|e| format!("{:#}", e)),
1130 );
1131 reply.post(cx, response);
1132 }
1133 }
1134 Ok(())
1135 }
1136}
1137
1138impl MeshAdminAgent {
1139 async fn resolve_reference(
1158 &self,
1159 cx: &Context<'_, Self>,
1160 reference_string: &str,
1161 ) -> Result<NodePayload, anyhow::Error> {
1162 let node_ref: crate::introspect::NodeRef = reference_string
1163 .parse()
1164 .map_err(|e| anyhow::anyhow!("invalid reference '{}': {}", reference_string, e))?;
1165
1166 match &node_ref {
1167 crate::introspect::NodeRef::Root => Ok(self.build_root_payload()),
1168 crate::introspect::NodeRef::Host(actor_id) => {
1169 self.resolve_host_node(cx, actor_id).await
1170 }
1171 crate::introspect::NodeRef::Proc(proc_id) => {
1172 match self.resolve_proc_node(cx, proc_id).await {
1173 Ok(payload) => Ok(payload),
1174 Err(_) if self.standalone_proc_anchor(proc_id).is_some() => {
1175 self.resolve_standalone_proc_node(cx, proc_id).await
1176 }
1177 Err(e) => Err(e),
1178 }
1179 }
1180 crate::introspect::NodeRef::Actor(actor_id) => {
1181 self.resolve_actor_node(cx, actor_id).await
1182 }
1183 }
1184 }
1185
1186 fn standalone_proc_actors(&self) -> impl Iterator<Item = &hyperactor::ActorAddr> {
1194 std::iter::empty()
1195 }
1196
1197 fn standalone_proc_anchor(&self, proc_id: &ProcAddr) -> Option<&hyperactor::ActorAddr> {
1200 self.standalone_proc_actors()
1201 .find(|actor_id| actor_id.proc_addr() == *proc_id)
1202 }
1203
1204 fn is_standalone_proc_actor(&self, actor_id: &hyperactor::ActorAddr) -> bool {
1206 self.standalone_proc_actors()
1207 .any(|a| a.proc_addr() == actor_id.proc_addr())
1208 }
1209
1210 fn build_root_payload(&self) -> NodePayload {
1216 use crate::introspect::NodeRef;
1217
1218 let children: Vec<NodeRef> = self
1219 .hosts
1220 .values()
1221 .map(|agent| NodeRef::Host(agent.actor_addr().clone()))
1222 .collect();
1223 let system_children: Vec<NodeRef> = Vec::new(); let mut attrs = hyperactor_config::Attrs::new();
1225 attrs.set(crate::introspect::NODE_TYPE, "root".to_string());
1226 attrs.set(crate::introspect::NUM_HOSTS, self.hosts.len());
1227 if let Ok(t) = humantime::parse_rfc3339(&self.started_at) {
1228 attrs.set(crate::introspect::STARTED_AT, t);
1229 }
1230 attrs.set(crate::introspect::STARTED_BY, self.started_by.clone());
1231 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
1232 let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
1233 NodePayload {
1234 identity: NodeRef::Root,
1235 properties: crate::introspect::derive_properties(&attrs_json),
1236 children,
1237 parent: None,
1238 as_of: std::time::SystemTime::now(),
1239 }
1240 }
1241
1242 async fn resolve_host_node(
1251 &self,
1252 cx: &Context<'_, Self>,
1253 actor_id: &hyperactor::ActorAddr,
1254 ) -> Result<NodePayload, anyhow::Error> {
1255 let result = query_introspect(
1256 cx,
1257 actor_id,
1258 hyperactor::introspect::IntrospectView::Entity,
1259 hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
1260 "querying host agent",
1261 )
1262 .await?;
1263 Ok(crate::introspect::to_node_payload_with(
1264 result,
1265 crate::introspect::NodeRef::Host(actor_id.clone()),
1266 Some(crate::introspect::NodeRef::Root),
1267 ))
1268 }
1269
1270 async fn resolve_proc_node(
1281 &self,
1282 cx: &Context<'_, Self>,
1283 proc_id: &ProcAddr,
1284 ) -> Result<NodePayload, anyhow::Error> {
1285 let host_addr = proc_id.addr().to_string();
1286
1287 let agent = self
1288 .hosts
1289 .get(&host_addr)
1290 .ok_or_else(|| anyhow::anyhow!("host not found: {}", host_addr))?;
1291
1292 let result = query_child_introspect(
1294 cx,
1295 agent.actor_addr(),
1296 hyperactor::Addr::Proc(proc_id.clone()),
1297 hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT),
1298 "querying proc details",
1299 )
1300 .await?;
1301
1302 let payload = crate::introspect::to_node_payload_with(
1306 result,
1307 crate::introspect::NodeRef::Proc(proc_id.clone()),
1308 Some(crate::introspect::NodeRef::Host(agent.actor_addr().clone())),
1309 );
1310 if !matches!(payload.properties, NodeProperties::Error { .. }) {
1311 return Ok(payload);
1312 }
1313
1314 let mesh_agent_id = proc_id.actor_addr(PROC_AGENT_ACTOR_NAME);
1316 let result = query_child_introspect(
1317 cx,
1318 &mesh_agent_id,
1319 hyperactor::Addr::Proc(proc_id.clone()),
1320 hyperactor_config::global::get(crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT),
1321 "querying proc mesh agent",
1322 )
1323 .await?;
1324
1325 Ok(crate::introspect::to_node_payload_with(
1326 result,
1327 crate::introspect::NodeRef::Proc(proc_id.clone()),
1328 Some(crate::introspect::NodeRef::Host(agent.actor_addr().clone())),
1329 ))
1330 }
1331
1332 async fn resolve_standalone_proc_node(
1346 &self,
1347 cx: &Context<'_, Self>,
1348 proc_id: &ProcAddr,
1349 ) -> Result<NodePayload, anyhow::Error> {
1350 let actor_id = self
1351 .standalone_proc_anchor(proc_id)
1352 .ok_or_else(|| anyhow::anyhow!("no anchor actor for standalone proc {}", proc_id))?;
1353
1354 use crate::introspect::NodeRef;
1355
1356 let (children, system_children) = if self.self_actor_id.as_ref() == Some(actor_id) {
1357 let self_ref = NodeRef::Actor(actor_id.clone());
1358 (vec![self_ref.clone()], vec![self_ref])
1359 } else {
1360 let actor_result = query_introspect(
1361 cx,
1362 actor_id,
1363 hyperactor::introspect::IntrospectView::Actor,
1364 hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
1365 &format!("querying anchor actor on {}", proc_id),
1366 )
1367 .await?;
1368 let actor_payload = to_node_payload(actor_result);
1369 let anchor_ref = NodeRef::Actor(actor_id.clone());
1370 let anchor_is_system = matches!(
1371 &actor_payload.properties,
1372 NodeProperties::Actor {
1373 is_system: true,
1374 ..
1375 }
1376 );
1377
1378 let mut children = vec![anchor_ref.clone()];
1379 let mut system_children = Vec::new();
1380 if anchor_is_system {
1381 system_children.push(anchor_ref);
1382 }
1383
1384 for child_ref in actor_payload.children {
1385 let child_actor_id = match &child_ref {
1386 NodeRef::Actor(id) => Some(id),
1387 _ => None,
1388 };
1389 if let Some(child_actor_id) = child_actor_id {
1390 let child_is_system = if let Ok(r) = query_introspect(
1391 cx,
1392 child_actor_id,
1393 hyperactor::introspect::IntrospectView::Actor,
1394 hyperactor_config::global::get(
1395 crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT,
1396 ),
1397 "querying child actor is_system",
1398 )
1399 .await
1400 {
1401 let p = to_node_payload(r);
1402 matches!(
1403 &p.properties,
1404 NodeProperties::Actor {
1405 is_system: true,
1406 ..
1407 }
1408 )
1409 } else {
1410 false
1411 };
1412 if child_is_system {
1413 system_children.push(child_ref.clone());
1414 }
1415 }
1416 children.push(child_ref);
1417 }
1418 (children, system_children)
1419 };
1420
1421 let proc_name = proc_id
1422 .label()
1423 .map(|l| l.as_str().to_string())
1424 .unwrap_or_else(|| proc_id.id().to_string());
1425
1426 let mut attrs = hyperactor_config::Attrs::new();
1427 attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
1428 attrs.set(crate::introspect::PROC_NAME, proc_name.clone());
1429 attrs.set(crate::introspect::NUM_ACTORS, children.len());
1430 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
1431 let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
1432
1433 Ok(NodePayload {
1434 identity: NodeRef::Proc(proc_id.clone()),
1435 properties: crate::introspect::derive_properties(&attrs_json),
1436 children,
1437 as_of: std::time::SystemTime::now(),
1438 parent: Some(NodeRef::Root),
1439 })
1440 }
1441
1442 async fn resolve_actor_node(
1456 &self,
1457 cx: &Context<'_, Self>,
1458 actor_id: &hyperactor::ActorAddr,
1459 ) -> Result<NodePayload, anyhow::Error> {
1460 let result = if self.self_actor_id.as_ref() == Some(actor_id) {
1465 cx.introspect_payload()
1466 } else if self.is_standalone_proc_actor(actor_id) {
1467 query_introspect(
1469 cx,
1470 actor_id,
1471 hyperactor::introspect::IntrospectView::Actor,
1472 hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
1473 &format!("querying actor {}", actor_id),
1474 )
1475 .await?
1476 } else {
1477 let proc_id = actor_id.proc_addr();
1479 let mesh_agent_id = proc_id.actor_addr(PROC_AGENT_ACTOR_NAME);
1480 let terminated = query_child_introspect(
1481 cx,
1482 &mesh_agent_id,
1483 hyperactor::Addr::Actor(actor_id.clone()),
1484 hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT),
1485 "querying terminated snapshot",
1486 )
1487 .await
1488 .ok()
1489 .filter(|r| {
1490 let p = crate::introspect::derive_properties(&r.attrs);
1491 !matches!(p, NodeProperties::Error { .. })
1492 });
1493
1494 match terminated {
1495 Some(snapshot) => snapshot,
1496 None => {
1497 query_introspect(
1499 cx,
1500 actor_id,
1501 hyperactor::introspect::IntrospectView::Actor,
1502 hyperactor_config::global::get(
1503 crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT,
1504 ),
1505 &format!("querying actor {}", actor_id),
1506 )
1507 .await?
1508 }
1509 }
1510 };
1511 let mut payload = to_node_payload(result);
1512
1513 if self.is_standalone_proc_actor(actor_id) {
1514 payload.parent = Some(crate::introspect::NodeRef::Proc(actor_id.proc_addr()));
1515 return Ok(payload);
1516 }
1517
1518 let proc_id = actor_id.proc_addr();
1519 match &payload.properties {
1520 NodeProperties::Proc { .. } => {
1521 let host_addr = proc_id.addr().to_string();
1522 if let Some(agent) = self.hosts.get(&host_addr) {
1523 payload.parent =
1524 Some(crate::introspect::NodeRef::Host(agent.actor_addr().clone()));
1525 }
1526 }
1527 _ => {
1528 payload.parent = Some(crate::introspect::NodeRef::Proc(proc_id.clone()));
1529 }
1530 }
1531
1532 Ok(payload)
1533 }
1534}
1535
1536fn create_mesh_admin_router(bridge_state: Arc<BridgeState>) -> Router {
1552 Router::new()
1553 .route("/SKILL.md", get(serve_skill_md))
1554 .route("/v1/admin", get(serve_admin_info))
1556 .route("/v1/schema", get(serve_schema))
1557 .route("/v1/schema/admin", get(serve_admin_schema))
1558 .route("/v1/schema/error", get(serve_error_schema))
1559 .route("/v1/openapi.json", get(serve_openapi))
1560 .route("/v1/tree", get(tree_dump))
1561 .route("/v1/query", post(query_proxy))
1562 .route("/v1/pyspy/{*proc_reference}", get(pyspy_bridge))
1563 .route(
1564 "/v1/pyspy_dump/{*proc_reference}",
1565 post(pyspy_dump_and_store),
1566 )
1567 .route(
1568 "/v1/pyspy_profile_svg/{*proc_reference}",
1569 post(pyspy_profile_svg),
1570 )
1571 .route("/v1/config/{*proc_reference}", get(config_bridge))
1572 .route("/v1/{*reference}", get(resolve_reference_bridge))
1573 .with_state(bridge_state)
1574}
1575
1576const SKILL_MD_TEMPLATE: &str = include_str!("mesh_admin_skill.md");
1578
1579fn extract_base_url(headers: &axum::http::HeaderMap) -> String {
1585 let host = headers
1586 .get(axum::http::header::HOST)
1587 .and_then(|v| v.to_str().ok())
1588 .unwrap_or("localhost");
1589 let scheme = headers
1590 .get("x-forwarded-proto")
1591 .and_then(|v| v.to_str().ok())
1592 .unwrap_or("https");
1593 format!("{scheme}://{host}")
1594}
1595
1596async fn serve_admin_info(
1599 State(state): State<Arc<BridgeState>>,
1600) -> axum::response::Json<AdminInfo> {
1601 axum::response::Json(state.admin_info.clone())
1602}
1603
1604async fn serve_admin_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1606 Ok(axum::response::Json(schema_with_id::<AdminInfo>(
1607 "https://monarch.meta.com/schemas/v1/admin_info",
1608 )?))
1609}
1610
1611async fn serve_skill_md(headers: axum::http::HeaderMap) -> impl axum::response::IntoResponse {
1614 let base = extract_base_url(&headers);
1615 let body = SKILL_MD_TEMPLATE.replace("{base}", &base);
1616 (
1617 [(
1618 axum::http::header::CONTENT_TYPE,
1619 "text/markdown; charset=utf-8",
1620 )],
1621 body,
1622 )
1623}
1624
1625fn schema_with_id<T: schemars::JsonSchema>(id: &str) -> Result<serde_json::Value, ApiError> {
1627 let schema = schemars::schema_for!(T);
1628 let mut value = serde_json::to_value(schema).map_err(|e| ApiError {
1629 code: "internal_error".to_string(),
1630 message: format!("failed to serialize schema: {e}"),
1631 details: None,
1632 })?;
1633 if let Some(obj) = value.as_object_mut() {
1634 obj.insert("$id".into(), serde_json::Value::String(id.into()));
1635 }
1636 Ok(value)
1637}
1638
1639async fn serve_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1641 Ok(axum::response::Json(schema_with_id::<NodePayloadDto>(
1642 "https://monarch.meta.com/schemas/v1/node_payload",
1643 )?))
1644}
1645
1646async fn serve_error_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1648 Ok(axum::response::Json(schema_with_id::<ApiErrorEnvelope>(
1649 "https://monarch.meta.com/schemas/v1/error",
1650 )?))
1651}
1652
1653fn hoist_defs(
1657 schema: &mut serde_json::Value,
1658 shared: &mut serde_json::Map<String, serde_json::Value>,
1659) {
1660 if let Some(obj) = schema.as_object_mut() {
1661 if let Some(defs) = obj.remove("$defs")
1662 && let Some(defs_map) = defs.as_object()
1663 {
1664 for (k, v) in defs_map {
1665 shared.insert(k.clone(), v.clone());
1666 }
1667 }
1668 obj.remove("$schema");
1672 }
1673 rewrite_refs(schema);
1674}
1675
1676fn rewrite_refs(value: &mut serde_json::Value) {
1679 match value {
1680 serde_json::Value::Object(map) => {
1681 if let Some(serde_json::Value::String(r)) = map.get_mut("$ref")
1682 && r.starts_with("#/$defs/")
1683 {
1684 *r = r.replace("#/$defs/", "#/components/schemas/");
1685 }
1686 for v in map.values_mut() {
1687 rewrite_refs(v);
1688 }
1689 }
1690 serde_json::Value::Array(arr) => {
1691 for v in arr {
1692 rewrite_refs(v);
1693 }
1694 }
1695 _ => {}
1696 }
1697}
1698
1699pub fn build_openapi_spec() -> serde_json::Value {
1702 let mut node_schema = serde_json::to_value(schemars::schema_for!(NodePayloadDto))
1703 .expect("NodePayload schema must be serializable");
1704 let mut error_schema = serde_json::to_value(schemars::schema_for!(ApiErrorEnvelope))
1705 .expect("ApiErrorEnvelope schema must be serializable");
1706 let mut pyspy_schema = serde_json::to_value(schemars::schema_for!(PySpyResult))
1707 .expect("PySpyResult schema must be serializable");
1708 let mut query_request_schema = serde_json::to_value(schemars::schema_for!(QueryRequest))
1709 .expect("QueryRequest schema must be serializable");
1710 let mut query_response_schema = serde_json::to_value(schemars::schema_for!(QueryResponse))
1711 .expect("QueryResponse schema must be serializable");
1712 let mut pyspy_dump_response_schema =
1713 serde_json::to_value(schemars::schema_for!(PyspyDumpAndStoreResponse))
1714 .expect("PyspyDumpAndStoreResponse schema must be serializable");
1715 let mut admin_info_schema = serde_json::to_value(schemars::schema_for!(AdminInfo))
1716 .expect("AdminInfo schema must be serializable");
1717 let mut profile_opts_schema = serde_json::to_value(schemars::schema_for!(PySpyProfileOpts))
1718 .expect("PySpyProfileOpts schema must be serializable");
1719
1720 let mut shared_schemas = serde_json::Map::new();
1723 hoist_defs(&mut node_schema, &mut shared_schemas);
1724 hoist_defs(&mut error_schema, &mut shared_schemas);
1725 hoist_defs(&mut pyspy_schema, &mut shared_schemas);
1726 hoist_defs(&mut query_request_schema, &mut shared_schemas);
1727 hoist_defs(&mut query_response_schema, &mut shared_schemas);
1728 hoist_defs(&mut pyspy_dump_response_schema, &mut shared_schemas);
1729 hoist_defs(&mut admin_info_schema, &mut shared_schemas);
1730 hoist_defs(&mut profile_opts_schema, &mut shared_schemas);
1731 shared_schemas.insert("NodePayload".into(), node_schema);
1732 shared_schemas.insert("ApiErrorEnvelope".into(), error_schema);
1733 shared_schemas.insert("PySpyResult".into(), pyspy_schema);
1734 shared_schemas.insert("QueryRequest".into(), query_request_schema);
1735 shared_schemas.insert("QueryResponse".into(), query_response_schema);
1736 shared_schemas.insert(
1737 "PyspyDumpAndStoreResponse".into(),
1738 pyspy_dump_response_schema,
1739 );
1740 shared_schemas.insert("AdminInfo".into(), admin_info_schema);
1741 shared_schemas.insert("PySpyProfileOpts".into(), profile_opts_schema);
1742
1743 for value in shared_schemas.values_mut() {
1745 rewrite_refs(value);
1746 }
1747
1748 let error_response = |desc: &str| -> serde_json::Value {
1749 serde_json::json!({
1750 "description": desc,
1751 "content": {
1752 "application/json": {
1753 "schema": { "$ref": "#/components/schemas/ApiErrorEnvelope" }
1754 }
1755 }
1756 })
1757 };
1758
1759 let success_payload = serde_json::json!({
1760 "description": "Resolved NodePayload",
1761 "content": {
1762 "application/json": {
1763 "schema": { "$ref": "#/components/schemas/NodePayload" }
1764 }
1765 }
1766 });
1767
1768 let mut spec = serde_json::json!({
1769 "openapi": "3.1.0",
1770 "info": {
1771 "title": "Monarch Mesh Admin API",
1772 "version": "1.0.0",
1773 "description": "Address-walking introspection API for a Monarch actor mesh. See the Admin Gateway Pattern RFC."
1774 },
1775 "paths": {
1776 "/v1/root": {
1777 "get": {
1778 "summary": "Fetch root node",
1779 "operationId": "getRoot",
1780 "responses": {
1781 "200": success_payload,
1782 "500": error_response("Internal error"),
1783 "503": error_response("Service unavailable (at capacity, retry with backoff)"),
1784 "504": error_response("Gateway timeout (downstream host unresponsive)")
1785 }
1786 }
1787 },
1788 "/v1/{reference}": {
1789 "get": {
1790 "summary": "Resolve a reference to a NodePayload",
1791 "operationId": "resolveReference",
1792 "parameters": [{
1793 "name": "reference",
1794 "in": "path",
1795 "required": true,
1796 "description": "URL-encoded opaque reference string",
1797 "schema": { "type": "string" }
1798 }],
1799 "responses": {
1800 "200": success_payload,
1801 "400": error_response("Bad request (malformed reference)"),
1802 "404": error_response("Address not found"),
1803 "500": error_response("Internal error"),
1804 "503": error_response("Service unavailable (at capacity, retry with backoff)"),
1805 "504": error_response("Gateway timeout (downstream host unresponsive)")
1806 }
1807 }
1808 },
1809 "/v1/schema": {
1810 "get": {
1811 "summary": "JSON Schema for NodePayload (Draft 2020-12)",
1812 "operationId": "getSchema",
1813 "responses": {
1814 "200": {
1815 "description": "JSON Schema document",
1816 "content": { "application/json": {} }
1817 }
1818 }
1819 }
1820 },
1821 "/v1/schema/error": {
1822 "get": {
1823 "summary": "JSON Schema for ApiErrorEnvelope (Draft 2020-12)",
1824 "operationId": "getErrorSchema",
1825 "responses": {
1826 "200": {
1827 "description": "JSON Schema document",
1828 "content": { "application/json": {} }
1829 }
1830 }
1831 }
1832 },
1833 "/v1/admin": {
1834 "get": {
1835 "summary": "Admin self-identification (placement, identity, URL)",
1836 "operationId": "getAdminInfo",
1837 "description": "Returns the admin actor's identity, proc placement, hostname, and URL. Used for placement verification and operational discovery.",
1838 "responses": {
1839 "200": {
1840 "description": "AdminInfo — admin actor placement metadata",
1841 "content": {
1842 "application/json": {
1843 "schema": { "$ref": "#/components/schemas/AdminInfo" }
1844 }
1845 }
1846 }
1847 }
1848 }
1849 },
1850 "/v1/tree": {
1851 "get": {
1852 "summary": "ASCII topology dump (debug)",
1853 "operationId": "getTree",
1854 "responses": {
1855 "200": {
1856 "description": "Human-readable topology tree",
1857 "content": { "text/plain": {} }
1858 }
1859 }
1860 }
1861 },
1862 "/v1/config/{proc_reference}": {
1863 "get": {
1864 "summary": "Config snapshot for a proc",
1865 "operationId": "getConfig",
1866 "description": "Returns the effective CONFIG-marked configuration entries from the target process. Routes to ProcAgent (worker procs) or HostAgent (service proc).",
1867 "parameters": [{
1868 "name": "proc_reference",
1869 "in": "path",
1870 "required": true,
1871 "description": "URL-encoded proc reference (ProcAddr)",
1872 "schema": { "type": "string" }
1873 }],
1874 "responses": {
1875 "200": {
1876 "description": "ConfigDumpResult — sorted list of config entries",
1877 "content": {
1878 "application/json": {
1879 "schema": {
1880 "type": "object",
1881 "properties": {
1882 "entries": {
1883 "type": "array",
1884 "items": {
1885 "type": "object",
1886 "properties": {
1887 "name": { "type": "string" },
1888 "value": { "type": "string" },
1889 "default_value": { "type": ["string", "null"] },
1890 "source": { "type": "string" },
1891 "changed_from_default": { "type": "boolean" },
1892 "env_var": { "type": ["string", "null"] }
1893 }
1894 }
1895 }
1896 }
1897 }
1898 }
1899 }
1900 },
1901 "404": error_response("Proc not found or handler not reachable"),
1902 "500": error_response("Internal error"),
1903 "504": error_response("Gateway timeout")
1904 }
1905 }
1906 },
1907 "/v1/pyspy/{proc_reference}": {
1908 "get": {
1909 "summary": "Py-spy stack dump for a proc",
1910 "operationId": "getPyspy",
1911 "description": "Runs py-spy against the target process and returns structured stack traces. Routes to ProcAgent (worker procs) or HostAgent (service proc).",
1912 "parameters": [{
1913 "name": "proc_reference",
1914 "in": "path",
1915 "required": true,
1916 "description": "URL-encoded proc reference (ProcAddr)",
1917 "schema": { "type": "string" }
1918 }],
1919 "responses": {
1920 "200": {
1921 "description": "PySpyResult — one of Ok, BinaryNotFound, or Failed",
1922 "content": {
1923 "application/json": {
1924 "schema": { "$ref": "#/components/schemas/PySpyResult" }
1925 }
1926 }
1927 },
1928 "400": error_response("Bad request (malformed proc reference)"),
1929 "404": error_response("Proc not found or handler not reachable"),
1930 "500": error_response("Internal error"),
1931 "504": error_response("Gateway timeout")
1932 }
1933 }
1934 },
1935 "/v1/query": {
1936 "post": {
1937 "summary": "Proxy SQL query to the telemetry dashboard",
1938 "operationId": "queryProxy",
1939 "description": "Forwards a SQL query to the Monarch dashboard's DataFusion engine. Requires telemetry_url to be configured.",
1940 "requestBody": {
1941 "required": true,
1942 "content": {
1943 "application/json": {
1944 "schema": { "$ref": "#/components/schemas/QueryRequest" }
1945 }
1946 }
1947 },
1948 "responses": {
1949 "200": {
1950 "description": "Query results",
1951 "content": {
1952 "application/json": {
1953 "schema": { "$ref": "#/components/schemas/QueryResponse" }
1954 }
1955 }
1956 },
1957 "400": error_response("Bad request (invalid SQL or missing sql field)"),
1958 "404": error_response("Dashboard not configured"),
1959 "500": error_response("Internal error"),
1960 "504": error_response("Gateway timeout")
1961 }
1962 }
1963 },
1964 "/v1/pyspy_dump/{proc_reference}": {
1965 "post": {
1966 "summary": "Trigger py-spy dump and store in telemetry",
1967 "operationId": "pyspyDumpAndStore",
1968 "description": "Runs py-spy against the target process, stores the result in the dashboard's DataFusion pyspy tables, and returns the dump_id.",
1969 "parameters": [{
1970 "name": "proc_reference",
1971 "in": "path",
1972 "required": true,
1973 "description": "URL-encoded proc reference (ProcAddr)",
1974 "schema": { "type": "string" }
1975 }],
1976 "responses": {
1977 "200": {
1978 "description": "Dump stored successfully",
1979 "content": {
1980 "application/json": {
1981 "schema": { "$ref": "#/components/schemas/PyspyDumpAndStoreResponse" }
1982 }
1983 }
1984 },
1985 "400": error_response("Bad request (malformed proc reference)"),
1986 "404": error_response("Proc or dashboard not found"),
1987 "500": error_response("Internal error"),
1988 "504": error_response("Gateway timeout")
1989 }
1990 }
1991 }
1992 },
1993 "components": {
1994 "schemas": serde_json::Value::Object(shared_schemas)
1995 }
1996 });
1997
1998 if let Some(paths) = spec.pointer_mut("/paths").and_then(|v| v.as_object_mut()) {
2001 paths.insert(
2002 "/v1/schema/admin".into(),
2003 serde_json::json!({
2004 "get": {
2005 "summary": "JSON Schema for AdminInfo (Draft 2020-12)",
2006 "operationId": "getAdminSchema",
2007 "responses": {
2008 "200": {
2009 "description": "JSON Schema document",
2010 "content": { "application/json": {} }
2011 }
2012 }
2013 }
2014 }),
2015 );
2016 paths.insert(
2017 "/v1/pyspy_profile_svg/{proc_reference}".into(),
2018 serde_json::json!({
2019 "post": {
2020 "summary": "Profile a proc and return SVG flamegraph",
2021 "operationId": "pyspyProfileSvg",
2022 "description": "Runs py-spy record against the target process for the requested duration and returns an SVG flamegraph. Timeout scales with duration_s.",
2023 "parameters": [{
2024 "name": "proc_reference",
2025 "in": "path",
2026 "required": true,
2027 "description": "URL-encoded proc reference (ProcAddr)",
2028 "schema": { "type": "string" }
2029 }],
2030 "requestBody": {
2031 "required": true,
2032 "content": {
2033 "application/json": {
2034 "schema": { "$ref": "#/components/schemas/PySpyProfileOpts" }
2035 }
2036 }
2037 },
2038 "responses": {
2039 "200": {
2040 "description": "SVG flamegraph",
2041 "content": { "image/svg+xml": {} }
2042 },
2043 "400": error_response("Bad request (invalid duration/rate or malformed proc reference)"),
2044 "404": error_response("Proc not found or handler not reachable"),
2045 "500": error_response("Internal error (profile failed or SVG generation failed)"),
2046 "503": error_response("Service unavailable (py-spy not available on target host)"),
2047 "504": error_response("Gateway timeout (subprocess timed out)")
2048 }
2049 }
2050 }),
2051 );
2052 }
2053
2054 spec
2055}
2056
2057async fn serve_openapi() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
2059 Ok(axum::response::Json(build_openapi_spec()))
2060}
2061
2062fn parse_proc_reference(raw: &str) -> Result<(String, ProcAddr), ApiError> {
2065 let trimmed = raw.trim_start_matches('/');
2066 if trimmed.is_empty() {
2067 return Err(ApiError::bad_request("empty proc reference", None));
2068 }
2069 let decoded = urlencoding::decode(trimmed)
2070 .map(|cow| cow.into_owned())
2071 .map_err(|_| {
2072 ApiError::bad_request(
2073 "malformed percent-encoding: decoded bytes are not valid UTF-8",
2074 None,
2075 )
2076 })?;
2077 let proc_id: ProcAddr = decoded
2078 .parse()
2079 .map_err(|e| ApiError::bad_request(format!("invalid proc reference: {}", e), None))?;
2080 Ok((decoded, proc_id))
2081}
2082
2083async fn probe_actor(
2089 cx: &Instance<()>,
2090 agent_id: &hyperactor::ActorAddr,
2091) -> Result<bool, ApiError> {
2092 let port = hyperactor::PortRef::<IntrospectMessage>::attest_handler_port(agent_id);
2093 let (handle, rx) = open_once_port::<IntrospectResult>(cx);
2094 port.post(
2095 cx,
2096 IntrospectMessage::Query {
2097 view: IntrospectView::Entity,
2098 reply: handle.bind(),
2099 },
2100 );
2101
2102 let timeout = hyperactor_config::global::get(crate::config::MESH_ADMIN_QUERY_CHILD_TIMEOUT);
2103 match tokio::time::timeout(timeout, rx.recv()).await {
2104 Ok(Ok(_)) => Ok(true),
2105 Ok(Err(e)) => {
2106 tracing::debug!(
2107 name = "pyspy_probe_recv_failed",
2108 %agent_id,
2109 error = %e,
2110 );
2111 Ok(false)
2112 }
2113 Err(_elapsed) => {
2114 tracing::debug!(
2115 name = "pyspy_probe_timeout",
2116 %agent_id,
2117 );
2118 Ok(false)
2119 }
2120 }
2121}
2122
/// The agent actor that serves per-proc admin requests for one proc.
///
/// Constructed by `route_proc_handler`: the service proc is routed to its
/// `HostAgent`, every other proc to its `ProcAgent`.
enum ResolvedProcHandler {
    /// Reference to a `HostAgent` actor (selected for the service proc).
    Host(ActorRef<HostAgent>),
    /// Reference to a `ProcAgent` actor (selected for all other procs).
    Proc(ActorRef<ProcAgent>),
}
2133
2134impl ResolvedProcHandler {
2135 fn agent_id(&self) -> hyperactor::ActorAddr {
2136 match self {
2137 Self::Host(r) => r.actor_addr().clone(),
2138 Self::Proc(r) => r.actor_addr().clone(),
2139 }
2140 }
2141
2142 async fn pyspy_dump(
2143 &self,
2144 cx: &impl hyperactor::context::Actor,
2145 opts: PySpyOpts,
2146 timeout: std::time::Duration,
2147 ) -> Result<PySpyResult, ApiError> {
2148 let (reply_handle, reply_rx) = open_once_port::<PySpyResult>(cx);
2149 let mut reply_ref = reply_handle.bind();
2150 reply_ref.return_undeliverable(false);
2151 let msg = PySpyDump {
2152 opts,
2153 result: reply_ref,
2154 };
2155 match self {
2156 Self::Host(r) => r.post(cx, msg),
2157 Self::Proc(r) => r.post(cx, msg),
2158 };
2159 tokio::time::timeout(timeout, reply_rx.recv())
2160 .await
2161 .map_err(|_| ApiError {
2162 code: "gateway_timeout".to_string(),
2163 message: "timed out waiting for py-spy dump".to_string(),
2164 details: None,
2165 })?
2166 .map_err(|e| ApiError {
2167 code: "internal_error".to_string(),
2168 message: format!("failed to receive PySpyResult: {}", e),
2169 details: None,
2170 })
2171 }
2172
2173 async fn pyspy_profile(
2174 &self,
2175 cx: &impl hyperactor::context::Actor,
2176 request: ValidatedProfileRequest,
2177 timeout: std::time::Duration,
2178 ) -> Result<PySpyProfileResult, ApiError> {
2179 let (reply_handle, reply_rx) = open_once_port::<PySpyProfileResult>(cx);
2180 let mut reply_ref = reply_handle.bind();
2181 reply_ref.return_undeliverable(false);
2182 let msg = PySpyProfile {
2183 request,
2184 result: reply_ref,
2185 };
2186 match self {
2187 Self::Host(r) => r.post(cx, msg),
2188 Self::Proc(r) => r.post(cx, msg),
2189 };
2190 tokio::time::timeout(timeout, reply_rx.recv())
2191 .await
2192 .map_err(|_| ApiError {
2193 code: "gateway_timeout".to_string(),
2194 message: "timed out waiting for py-spy profile".to_string(),
2195 details: None,
2196 })?
2197 .map_err(|e| ApiError {
2198 code: "internal_error".to_string(),
2199 message: format!("failed to receive PySpyProfileResult: {}", e),
2200 details: None,
2201 })
2202 }
2203
2204 async fn config_dump(
2205 &self,
2206 cx: &impl hyperactor::context::Actor,
2207 timeout: std::time::Duration,
2208 ) -> Result<ConfigDumpResult, ApiError> {
2209 let (reply_handle, reply_rx) = open_once_port::<ConfigDumpResult>(cx);
2210 let mut reply_ref = reply_handle.bind();
2211 reply_ref.return_undeliverable(false);
2212 let msg = ConfigDump { result: reply_ref };
2213 match self {
2214 Self::Host(r) => r.post(cx, msg),
2215 Self::Proc(r) => r.post(cx, msg),
2216 };
2217 tokio::time::timeout(timeout, reply_rx.recv())
2218 .await
2219 .map_err(|_| ApiError {
2220 code: "gateway_timeout".to_string(),
2221 message: "timed out waiting for config dump".to_string(),
2222 details: None,
2223 })?
2224 .map_err(|e| ApiError {
2225 code: "internal_error".to_string(),
2226 message: format!("failed to receive ConfigDumpResult: {}", e),
2227 details: None,
2228 })
2229 }
2230}
2231
2232fn route_proc_handler(raw_proc_reference: &str) -> Result<ResolvedProcHandler, ApiError> {
2236 let (_proc_reference, proc_id) = parse_proc_reference(raw_proc_reference)?;
2237 let is_service = proc_id
2238 .uid()
2239 .as_singleton()
2240 .is_some_and(|label| label.as_str() == SERVICE_PROC_NAME);
2241 if is_service {
2242 let agent_id = proc_id.actor_addr(HOST_MESH_AGENT_ACTOR_NAME);
2243 Ok(ResolvedProcHandler::Host(ActorRef::attest(agent_id)))
2244 } else {
2245 let agent_id = proc_id.actor_addr(PROC_AGENT_ACTOR_NAME);
2246 Ok(ResolvedProcHandler::Proc(ActorRef::attest(agent_id)))
2247 }
2248}
2249
2250async fn resolve_proc_handler(
2252 state: &BridgeState,
2253 raw_proc_reference: &str,
2254) -> Result<ResolvedProcHandler, ApiError> {
2255 let handler = route_proc_handler(raw_proc_reference)?;
2256 let cx = &state.bridge_cx;
2257 if !probe_actor(cx, &handler.agent_id()).await? {
2258 return Err(ApiError::not_found(
2259 format!(
2260 "proc does not have a reachable handler ({})",
2261 raw_proc_reference,
2262 ),
2263 None,
2264 ));
2265 }
2266 Ok(handler)
2267}
2268
2269async fn do_pyspy_dump(
2270 state: &BridgeState,
2271 raw_proc_reference: &str,
2272) -> Result<PySpyResult, ApiError> {
2273 let handler = resolve_proc_handler(state, raw_proc_reference).await?;
2274 let timeout = hyperactor_config::global::get(crate::config::MESH_ADMIN_PYSPY_BRIDGE_TIMEOUT);
2275 handler
2276 .pyspy_dump(
2277 &state.bridge_cx,
2278 PySpyOpts {
2279 threads: false,
2280 native: true,
2281 native_all: true,
2282 nonblocking: false,
2283 },
2284 timeout,
2285 )
2286 .await
2287}
2288
2289async fn pyspy_bridge(
2296 State(state): State<Arc<BridgeState>>,
2297 AxumPath(proc_reference): AxumPath<String>,
2298) -> Result<Json<PySpyResult>, ApiError> {
2299 Ok(Json(do_pyspy_dump(&state, &proc_reference).await?))
2300}
2301
2302async fn do_pyspy_profile(
2303 state: &BridgeState,
2304 raw_proc_reference: &str,
2305 opts: PySpyProfileOpts,
2306) -> Result<PySpyProfileResult, ApiError> {
2307 let max_duration =
2308 hyperactor_config::global::get(crate::config::MESH_ADMIN_PYSPY_MAX_PROFILE_DURATION);
2309 let request = ValidatedProfileRequest::try_new(&opts, max_duration)
2310 .map_err(|msg| ApiError::bad_request(msg, None))?;
2311 let bridge_timeout = request.bridge_timeout();
2312 let handler = resolve_proc_handler(state, raw_proc_reference).await?;
2313 handler
2314 .pyspy_profile(&state.bridge_cx, request, bridge_timeout)
2315 .await
2316}
2317
2318async fn pyspy_profile_svg(
2323 State(state): State<Arc<BridgeState>>,
2324 AxumPath(proc_reference): AxumPath<String>,
2325 Json(opts): Json<PySpyProfileOpts>,
2326) -> Result<axum::response::Response, ApiError> {
2327 let result = do_pyspy_profile(&state, &proc_reference, opts).await?;
2328 match result {
2329 PySpyProfileResult::Ok { svg, .. } => Ok(axum::response::Response::builder()
2330 .header("content-type", "image/svg+xml")
2331 .body(axum::body::Body::from(svg))
2332 .unwrap()),
2333 PySpyProfileResult::BinaryNotFound { searched } => Err(ApiError {
2334 code: "service_unavailable".to_string(),
2335 message: format!(
2336 "py-spy not available on target host; searched: {}",
2337 searched.join(", ")
2338 ),
2339 details: None,
2340 }),
2341 PySpyProfileResult::TimedOut {
2342 timeout_s, stderr, ..
2343 } => Err(ApiError {
2344 code: "gateway_timeout".to_string(),
2345 message: format!(
2346 "py-spy record subprocess timed out after {}s: {}",
2347 timeout_s,
2348 stderr.trim()
2349 ),
2350 details: None,
2351 }),
2352 PySpyProfileResult::ExitFailure { stderr, .. } => Err(ApiError {
2353 code: "profile_failed".to_string(),
2354 message: stderr,
2355 details: None,
2356 }),
2357 PySpyProfileResult::OutputMissing { pid, binary } => Err(ApiError {
2358 code: "profile_output_unusable".to_string(),
2359 message: format!("py-spy exited 0 but SVG file is missing (pid {pid}, {binary})"),
2360 details: None,
2361 }),
2362 PySpyProfileResult::OutputEmpty { pid, binary } => Err(ApiError {
2363 code: "profile_output_unusable".to_string(),
2364 message: format!("py-spy exited 0 but SVG output is empty (pid {pid}, {binary})"),
2365 details: None,
2366 }),
2367 PySpyProfileResult::OutputReadFailure { error, .. } => Err(ApiError {
2368 code: "internal_error".to_string(),
2369 message: format!("failed to read SVG output: {error}"),
2370 details: None,
2371 }),
2372 PySpyProfileResult::WorkerSpawnFailure { error } => Err(ApiError {
2373 code: "internal_error".to_string(),
2374 message: format!("failed to spawn profile worker actor: {error}"),
2375 details: None,
2376 }),
2377 PySpyProfileResult::SubprocessSpawnFailure { error, .. } => Err(ApiError {
2378 code: "internal_error".to_string(),
2379 message: format!("failed to execute py-spy: {error}"),
2380 details: None,
2381 }),
2382 PySpyProfileResult::WaitFailure { error, .. } => Err(ApiError {
2383 code: "internal_error".to_string(),
2384 message: format!("failed to wait for child: {error}"),
2385 details: None,
2386 }),
2387 PySpyProfileResult::TempDirFailure { error, .. } => Err(ApiError {
2388 code: "internal_error".to_string(),
2389 message: format!("failed to create temp dir: {error}"),
2390 details: None,
2391 }),
2392 }
2393}
2394
// Request body accepted by `query_proxy` and forwarded unchanged (as JSON) to
// the dashboard's `/api/query` endpoint.
//
// Plain `//` comments are used on purpose: this type derives
// `schemars::JsonSchema`, and `///` doc comments would become part of the
// generated schema.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct QueryRequest {
    // SQL text of the query.
    pub sql: String,
}
2401
// Response body returned by `query_proxy`; deserialized from the dashboard's
// reply to `/api/query`.
//
// Plain `//` comments are used on purpose: this type derives
// `schemars::JsonSchema`, and `///` doc comments would become part of the
// generated schema.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct QueryResponse {
    // Query result rows as untyped JSON; the shape is whatever the dashboard
    // sends and is not validated here.
    pub rows: serde_json::Value,
}
2408
/// JSON body posted by `pyspy_dump_and_store` to the dashboard's
/// `/api/pyspy_dump` endpoint.
#[derive(Debug, Serialize)]
struct StorePyspyDumpRequest {
    /// Freshly generated UUID (v4) identifying this dump.
    dump_id: String,
    /// Proc reference exactly as extracted from the request path.
    proc_ref: String,
    /// The `PySpyResult` serialized to a JSON string.
    pyspy_result_json: String,
}
2416
// Response body returned by `pyspy_dump_and_store` after the dashboard has
// accepted the dump.
//
// Plain `//` comments are used on purpose: this type derives
// `schemars::JsonSchema`, and `///` doc comments would become part of the
// generated schema.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct PyspyDumpAndStoreResponse {
    // Identifier (UUID v4 string) under which the dump was stored.
    pub dump_id: String,
}
2423
2424fn require_telemetry_url(state: &BridgeState) -> Result<&str, ApiError> {
2427 state.telemetry_url.as_deref().ok_or_else(|| {
2428 ApiError::not_found("dashboard not configured (no telemetry_url provided)", None)
2429 })
2430}
2431
2432async fn query_proxy(
2439 State(state): State<Arc<BridgeState>>,
2440 axum::Json(body): axum::Json<QueryRequest>,
2441) -> Result<axum::Json<QueryResponse>, ApiError> {
2442 let telemetry_url = require_telemetry_url(&state)?;
2443
2444 let resp = state
2445 .http_client
2446 .post(format!("{}/api/query", telemetry_url))
2447 .json(&body)
2448 .send()
2449 .await
2450 .map_err(|e| ApiError {
2451 code: "proxy_error".to_string(),
2452 message: format!("failed to proxy query to dashboard: {}", e),
2453 details: None,
2454 })?;
2455
2456 let status = resp.status();
2457 let resp_body = resp.bytes().await.map_err(|e| ApiError {
2458 code: "proxy_error".to_string(),
2459 message: format!("failed to read dashboard response: {}", e),
2460 details: None,
2461 })?;
2462
2463 if !status.is_success() {
2464 let msg = serde_json::from_slice::<serde_json::Value>(&resp_body)
2466 .ok()
2467 .and_then(|v| v.get("error")?.as_str().map(String::from))
2468 .unwrap_or_else(|| format!("dashboard returned HTTP {status}"));
2469 let code = if status.is_client_error() {
2470 "bad_request"
2471 } else {
2472 "proxy_error"
2473 };
2474 return Err(ApiError {
2475 code: code.to_string(),
2476 message: msg,
2477 details: None,
2478 });
2479 }
2480
2481 let result: QueryResponse = serde_json::from_slice(&resp_body).map_err(|e| ApiError {
2482 code: "proxy_error".to_string(),
2483 message: format!("failed to parse dashboard response: {}", e),
2484 details: None,
2485 })?;
2486
2487 Ok(axum::Json(result))
2488}
2489
2490async fn pyspy_dump_and_store(
2499 State(state): State<Arc<BridgeState>>,
2500 AxumPath(proc_reference): AxumPath<String>,
2501) -> Result<axum::Json<PyspyDumpAndStoreResponse>, ApiError> {
2502 let telemetry_url = require_telemetry_url(&state)?;
2503 let pyspy_result = do_pyspy_dump(&state, &proc_reference).await?;
2504
2505 let dump_id = uuid::Uuid::new_v4().to_string();
2506 let pyspy_json = serde_json::to_string(&pyspy_result).map_err(|e| ApiError {
2507 code: "internal_error".to_string(),
2508 message: format!("failed to serialize PySpyResult: {}", e),
2509 details: None,
2510 })?;
2511
2512 let store_body = StorePyspyDumpRequest {
2513 dump_id: dump_id.clone(),
2514 proc_ref: proc_reference,
2515 pyspy_result_json: pyspy_json,
2516 };
2517
2518 let store_resp = state
2519 .http_client
2520 .post(format!("{}/api/pyspy_dump", telemetry_url))
2521 .json(&store_body)
2522 .send()
2523 .await
2524 .map_err(|e| ApiError {
2525 code: "proxy_error".to_string(),
2526 message: format!("failed to store pyspy dump in dashboard: {}", e),
2527 details: None,
2528 })?;
2529
2530 if !store_resp.status().is_success() {
2531 return Err(ApiError {
2532 code: "proxy_error".to_string(),
2533 message: format!(
2534 "dashboard rejected pyspy dump store: HTTP {}",
2535 store_resp.status()
2536 ),
2537 details: None,
2538 });
2539 }
2540
2541 Ok(axum::Json(PyspyDumpAndStoreResponse { dump_id }))
2542}
2543
2544async fn config_bridge(
2549 State(state): State<Arc<BridgeState>>,
2550 AxumPath(proc_reference): AxumPath<String>,
2551) -> Result<Json<ConfigDumpResult>, ApiError> {
2552 let handler = route_proc_handler(&proc_reference)?;
2553 let timeout =
2554 hyperactor_config::global::get(crate::config::MESH_ADMIN_CONFIG_DUMP_BRIDGE_TIMEOUT);
2555 let result = handler.config_dump(&state.bridge_cx, timeout).await?;
2556 Ok(Json(result))
2557}
2558
2559async fn resolve_reference_bridge(
2572 State(state): State<Arc<BridgeState>>,
2573 AxumPath(reference): AxumPath<String>,
2574) -> Result<Json<NodePayloadDto>, ApiError> {
2575 let reference = reference.trim_start_matches('/');
2577 if reference.is_empty() {
2578 return Err(ApiError::bad_request("empty reference", None));
2579 }
2580 let reference = urlencoding::decode(reference)
2581 .map(|cow| cow.into_owned())
2582 .map_err(|_| {
2583 ApiError::bad_request(
2584 "malformed percent-encoding: decoded bytes are not valid UTF-8",
2585 None,
2586 )
2587 })?;
2588
2589 let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
2592 tracing::warn!("mesh admin: rejecting resolve request (503): too many concurrent requests");
2593 ApiError {
2594 code: "service_unavailable".to_string(),
2595 message: "too many concurrent introspection requests".to_string(),
2596 details: None,
2597 }
2598 })?;
2599
2600 let cx = &state.bridge_cx;
2601 let resolve_start = std::time::Instant::now();
2602 let response = tokio::time::timeout(
2603 hyperactor_config::global::get(crate::config::MESH_ADMIN_SINGLE_HOST_TIMEOUT),
2604 state.admin_ref.resolve(cx, reference.clone()),
2605 )
2606 .await
2607 .map_err(|_| {
2608 tracing::warn!(
2609 reference = %reference,
2610 elapsed_ms = resolve_start.elapsed().as_millis() as u64,
2611 "mesh admin: resolve timed out (gateway_timeout)",
2612 );
2613 ApiError {
2614 code: "gateway_timeout".to_string(),
2615 message: "timed out resolving reference".to_string(),
2616 details: None,
2617 }
2618 })?
2619 .map_err(|e| ApiError {
2620 code: "internal_error".to_string(),
2621 message: format!("failed to resolve reference: {}", e),
2622 details: None,
2623 })?;
2624
2625 match response.0 {
2626 Ok(payload) => Ok(Json(NodePayloadDto::from(payload))),
2627 Err(error) => Err(ApiError::not_found(error, None)),
2628 }
2629}
2630
/// HTTP handler: renders the mesh topology as plain text, one line per node,
/// each followed by `-> <url>` pointing at that node's `/v1/{reference}`
/// endpoint.
///
/// The walk starts at `"root"` and goes at most three levels deep
/// (host -> proc -> actor, or proc -> actor). Every resolve call is bounded
/// by `MESH_ADMIN_TREE_TIMEOUT`. A permit from `resolve_semaphore` is taken
/// without waiting and held for the whole walk.
///
/// # Errors
///
/// - `service_unavailable` when no semaphore permit is available.
/// - `gateway_timeout` when resolving the root times out.
/// - `internal_error` when resolving the root fails or reports an error.
///
/// Failures below the root do not fail the request: an unresolvable root
/// child is printed as `(unreachable)`, and an unresolvable proc under a host
/// is printed without its actors.
async fn tree_dump(
    State(state): State<Arc<BridgeState>>,
    headers: axum::http::header::HeaderMap,
) -> Result<String, ApiError> {
    // Non-blocking acquire: reject immediately rather than queue.
    let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
        tracing::warn!(
            "mesh admin: rejecting tree_dump request (503): too many concurrent requests"
        );
        ApiError {
            code: "service_unavailable".to_string(),
            message: "too many concurrent introspection requests".to_string(),
            details: None,
        }
    })?;

    let cx = &state.bridge_cx;

    // Build the base URL for the emitted links from the request headers,
    // defaulting to `http://localhost` when they are absent or not valid text.
    let host = headers
        .get("host")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("localhost");
    let scheme = headers
        .get("x-forwarded-proto")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("http");
    let base_url = format!("{}://{}", scheme, host);

    // The root must resolve; any failure here aborts the whole request.
    let root_resp = tokio::time::timeout(
        hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
        state.admin_ref.resolve(cx, "root".to_string()),
    )
    .await
    .map_err(|_| ApiError {
        code: "gateway_timeout".to_string(),
        message: "timed out resolving root".to_string(),
        details: None,
    })?
    .map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: format!("failed to resolve root: {}", e),
        details: None,
    })?;

    let root = root_resp.0.map_err(|e| ApiError {
        code: "internal_error".to_string(),
        message: e,
        details: None,
    })?;

    let mut output = String::new();

    for child_ref in &root.children {
        let child_ref_str = child_ref.to_string();
        let resp = tokio::time::timeout(
            hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
            state.admin_ref.resolve(cx, child_ref_str.clone()),
        )
        .await;

        // Timeout, transport failure, and resolve error all collapse to `None`.
        let payload = match resp {
            Ok(Ok(r)) => r.0.ok(),
            _ => None,
        };

        match payload {
            // Host node: print the host line, then its procs and their actors.
            Some(node) if matches!(node.properties, NodeProperties::Host { .. }) => {
                let header = match &node.properties {
                    NodeProperties::Host { addr, .. } => addr.clone(),
                    // Not reachable given the guard above; kept as a fallback.
                    _ => child_ref_str.clone(),
                };
                let host_url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
                output.push_str(&format!("{} -> {}\n", header, host_url));

                let num_procs = node.children.len();
                for (i, proc_ref) in node.children.iter().enumerate() {
                    let proc_ref_str = proc_ref.to_string();
                    // Inside the loop `num_procs >= 1`, so the subtraction
                    // cannot underflow.
                    let is_last_proc = i == num_procs - 1;
                    let proc_connector = if is_last_proc {
                        "└── "
                    } else {
                        "├── "
                    };
                    let proc_name = derive_tree_label(proc_ref);
                    let proc_url =
                        format!("{}/v1/{}", base_url, urlencoding::encode(&proc_ref_str));
                    output.push_str(&format!(
                        "{}{} -> {}\n",
                        proc_connector, proc_name, proc_url
                    ));

                    // Resolve the proc to list its actors; if this fails the
                    // proc line above stands alone.
                    let proc_resp = tokio::time::timeout(
                        hyperactor_config::global::get(crate::config::MESH_ADMIN_TREE_TIMEOUT),
                        state.admin_ref.resolve(cx, proc_ref_str),
                    )
                    .await;
                    let proc_payload = match proc_resp {
                        Ok(Ok(r)) => r.0.ok(),
                        _ => None,
                    };
                    if let Some(proc_node) = proc_payload {
                        let num_actors = proc_node.children.len();
                        // Continue the vertical rule only while more procs follow.
                        let child_prefix = if is_last_proc { " " } else { "│ " };
                        for (j, actor_ref) in proc_node.children.iter().enumerate() {
                            let actor_ref_str = actor_ref.to_string();
                            let actor_connector = if j == num_actors - 1 {
                                "└── "
                            } else {
                                "├── "
                            };
                            let actor_label = derive_actor_label(actor_ref);
                            let actor_url =
                                format!("{}/v1/{}", base_url, urlencoding::encode(&actor_ref_str));
                            output.push_str(&format!(
                                "{}{}{} -> {}\n",
                                child_prefix, actor_connector, actor_label, actor_url
                            ));
                        }
                    }
                }
                // Blank line between top-level entries.
                output.push('\n');
            }
            // Proc node directly under the root: print it and its actors.
            // The children are taken from this payload, so no further
            // resolve call is made.
            Some(node) if matches!(node.properties, NodeProperties::Proc { .. }) => {
                let proc_name = match &node.properties {
                    NodeProperties::Proc { proc_name, .. } => proc_name.clone(),
                    // Not reachable given the guard above; kept as a fallback.
                    _ => child_ref_str.clone(),
                };
                let proc_url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
                output.push_str(&format!("{} -> {}\n", proc_name, proc_url));

                let num_actors = node.children.len();
                for (j, actor_ref) in node.children.iter().enumerate() {
                    let actor_ref_str = actor_ref.to_string();
                    let actor_connector = if j == num_actors - 1 {
                        "└── "
                    } else {
                        "├── "
                    };
                    let actor_label = derive_actor_label(actor_ref);
                    let actor_url =
                        format!("{}/v1/{}", base_url, urlencoding::encode(&actor_ref_str));
                    output.push_str(&format!(
                        "{}{} -> {}\n",
                        actor_connector, actor_label, actor_url
                    ));
                }
                output.push('\n');
            }
            // Any other resolvable node: a single line, no children listed.
            Some(_node) => {
                let label = derive_actor_label(child_ref);
                let url = format!("{}/v1/{}", base_url, urlencoding::encode(&child_ref_str));
                output.push_str(&format!("{} -> {}\n\n", label, url));
            }
            // The child could not be resolved within the timeout.
            _ => {
                output.push_str(&format!("{} (unreachable)\n\n", child_ref));
            }
        }
    }
    Ok(output)
}
2820
2821fn derive_tree_label(node_ref: &crate::introspect::NodeRef) -> String {
2836 match node_ref {
2837 crate::introspect::NodeRef::Root => "root".to_string(),
2838 crate::introspect::NodeRef::Host(id) => id.proc_addr().id().to_string(),
2839 crate::introspect::NodeRef::Proc(id) => id.id().to_string(),
2840 crate::introspect::NodeRef::Actor(id) => {
2841 format!("{}[{}]", id.log_name(), id.uid())
2842 }
2843 }
2844}
2845
2846fn derive_actor_label(node_ref: &crate::introspect::NodeRef) -> String {
2847 match node_ref {
2848 crate::introspect::NodeRef::Root => "root".to_string(),
2849 crate::introspect::NodeRef::Host(id) => id.log_name().to_string(),
2850 crate::introspect::NodeRef::Proc(id) => id.id().to_string(),
2851 crate::introspect::NodeRef::Actor(id) => {
2852 format!("{}[{}]", id.log_name(), id.uid())
2853 }
2854 }
2855}
2856
/// An admin handle that refers to a published location rather than a direct
/// URL.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[non_exhaustive]
pub enum PublishedHandle {
    /// A MAST handle, stored as the original handle string
    /// (`AdminHandle::parse` creates this for inputs starting with
    /// `mast_conda:///`).
    Mast(String),
}
2870
impl PublishedHandle {
    /// Attempts to resolve this published handle to an admin URL.
    ///
    /// # Errors
    ///
    /// Always fails: resolution of published handles is not implemented. The
    /// error message tells the caller to obtain the admin URL from startup
    /// output or another launch-time publication. `_port_override` is
    /// accepted but unused.
    pub async fn resolve(self, _port_override: Option<u16>) -> anyhow::Result<String> {
        anyhow::bail!(
            "publication-based admin handle resolution is not yet implemented: \
            mesh admin placement has moved to the caller's local proc. \
            Discover the admin URL from startup output or another \
            launch-time publication instead."
        )
    }
}
2885
/// A classified admin address, as produced by `AdminHandle::parse`.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[non_exhaustive]
pub enum AdminHandle {
    /// A directly usable URL: either the input itself (when it parsed as an
    /// `http`/`https` URL) or the input with `https://` prepended.
    Url(String),
    /// A handle that must be resolved through a publication mechanism.
    Published(PublishedHandle),
    /// Input that matched no recognized form; holds the original string.
    Unsupported(String),
}
2899
2900impl AdminHandle {
2901 pub fn parse(addr: &str) -> Self {
2910 if addr.starts_with("mast_conda:///") {
2912 return AdminHandle::Published(PublishedHandle::Mast(addr.to_string()));
2913 }
2914 if let Ok(parsed) = url::Url::parse(addr)
2916 && matches!(parsed.scheme(), "http" | "https")
2917 {
2918 return AdminHandle::Url(addr.to_string());
2919 }
2920 let with_scheme = format!("https://{}", addr);
2923 if let Ok(parsed) = url::Url::parse(&with_scheme)
2924 && parsed.host_str().is_some()
2925 && parsed.port().is_some()
2926 {
2927 return AdminHandle::Url(with_scheme);
2928 }
2929 AdminHandle::Unsupported(addr.to_string())
2930 }
2931
2932 pub async fn resolve(self, port_override: Option<u16>) -> anyhow::Result<String> {
2938 match self {
2939 AdminHandle::Url(url) => Ok(url),
2940 AdminHandle::Published(h) => h.resolve(port_override).await,
2941 AdminHandle::Unsupported(s) => anyhow::bail!(
2942 "unrecognized admin handle '{}': expected https://host:port or mast_conda:///job",
2943 s
2944 ),
2945 }
2946 }
2947}
2948
2949pub async fn resolve_mast_handle(
2955 handle: &str,
2956 port_override: Option<u16>,
2957) -> anyhow::Result<String> {
2958 AdminHandle::Published(PublishedHandle::Mast(handle.to_string()))
2959 .resolve(port_override)
2960 .await
2961}
2962
2963mod advertised_host {
2971 use std::net::IpAddr;
2972
2973 #[derive(Debug, PartialEq, Eq)]
2975 pub(super) enum SanIdentity {
2976 Ip(IpAddr),
2977 Dns(String),
2978 }
2979
2980 pub(super) fn from_cert_sans() -> String {
2992 let hostname = hostname::get()
2993 .unwrap_or_else(|_| "localhost".into())
2994 .into_string()
2995 .unwrap_or_else(|_| "localhost".to_string());
2996
2997 let mut candidates: Vec<(String, SanIdentity)> = Vec::new();
3001
3002 candidates.push((hostname.clone(), SanIdentity::Dns(hostname.clone())));
3004
3005 #[cfg(fbcode_build)]
3007 if let Ok(ip_str) = hyperactor::meta::host_ip::host_ipv6_address()
3008 && let Ok(ip) = ip_str.parse::<IpAddr>()
3009 {
3010 candidates.push((format!("[{}]", ip), SanIdentity::Ip(ip)));
3011 }
3012
3013 let cert_sans = load_cert_sans();
3014 let chosen = pick_candidate(&candidates, &cert_sans, &hostname);
3015
3016 if chosen != hostname && !cert_sans.is_empty() {
3017 tracing::info!("admin URL host '{}' matches cert SAN", chosen);
3018 } else if !cert_sans.is_empty() && !candidates.iter().any(|(_, id)| cert_sans.contains(id))
3019 {
3020 tracing::warn!(
3021 "no admin URL candidate matched cert SANs; falling back to hostname '{}'",
3022 hostname,
3023 );
3024 }
3025
3026 chosen
3027 }
3028
3029 fn load_cert_sans() -> Vec<SanIdentity> {
3036 use std::io::BufReader;
3037
3038 use x509_parser::prelude::*;
3039
3040 let bundle = match hyperactor::channel::try_tls_pem_bundle() {
3041 Some(b) => b,
3042 None => return Vec::new(),
3043 };
3044
3045 let cert_pem = match bundle.cert.reader() {
3046 Ok(r) => {
3047 let mut buf = Vec::new();
3048 if std::io::Read::read_to_end(&mut BufReader::new(r), &mut buf).is_err() {
3049 return Vec::new();
3050 }
3051 buf
3052 }
3053 Err(_) => return Vec::new(),
3054 };
3055
3056 let mut cursor = &cert_pem[..];
3057 let certs: Vec<_> = rustls_pemfile::certs(&mut cursor)
3058 .filter_map(|r| r.ok())
3059 .collect();
3060
3061 let leaf_der = match certs.first() {
3062 Some(c) => c,
3063 None => return Vec::new(),
3064 };
3065
3066 let (_, cert) = match X509Certificate::from_der(leaf_der.as_ref()) {
3067 Ok(parsed) => parsed,
3068 Err(e) => {
3069 tracing::warn!("failed to parse leaf cert for SAN extraction: {}", e);
3070 return Vec::new();
3071 }
3072 };
3073
3074 let mut sans = Vec::new();
3075 if let Ok(Some(san_ext)) = cert.subject_alternative_name() {
3076 for name in &san_ext.value.general_names {
3077 match name {
3078 GeneralName::DNSName(dns) => {
3079 sans.push(SanIdentity::Dns(dns.to_string()));
3080 }
3081 GeneralName::IPAddress(bytes) => {
3082 let ip = match bytes.len() {
3083 4 => IpAddr::from(<[u8; 4]>::try_from(*bytes).unwrap()),
3084 16 => IpAddr::from(<[u8; 16]>::try_from(*bytes).unwrap()),
3085 _ => continue,
3086 };
3087 sans.push(SanIdentity::Ip(ip));
3088 }
3089 _ => {}
3090 }
3091 }
3092 }
3093
3094 sans
3095 }
3096
3097 fn pick_candidate(
3100 candidates: &[(String, SanIdentity)],
3101 cert_sans: &[SanIdentity],
3102 fallback: &str,
3103 ) -> String {
3104 if cert_sans.is_empty() {
3105 return fallback.to_string();
3106 }
3107 for (url_host, identity) in candidates {
3108 if cert_sans.iter().any(|san| san == identity) {
3109 return url_host.clone();
3110 }
3111 }
3112 fallback.to_string()
3113 }
3114
3115 #[cfg(test)]
3116 mod tests {
3117 use std::net::IpAddr;
3118 use std::net::Ipv4Addr;
3119 use std::net::Ipv6Addr;
3120
3121 use super::*;
3122
3123 #[test]
3124 fn cert_covers_hostname_only_picks_hostname() {
3125 let candidates = vec![
3126 ("myhost".to_string(), SanIdentity::Dns("myhost".to_string())),
3127 (
3128 "[::1]".to_string(),
3129 SanIdentity::Ip(IpAddr::V6(Ipv6Addr::LOCALHOST)),
3130 ),
3131 ];
3132 let sans = vec![SanIdentity::Dns("myhost".to_string())];
3133 assert_eq!(pick_candidate(&candidates, &sans, "fallback"), "myhost");
3134 }
3135
3136 #[test]
3137 fn cert_covers_ip_only_picks_ip() {
3138 let ip = IpAddr::V6("2803:6084:3894:2b36:b5d3:11ef:400:0".parse().unwrap());
3139 let candidates = vec![
3140 ("myhost".to_string(), SanIdentity::Dns("myhost".to_string())),
3141 (format!("[{}]", ip), SanIdentity::Ip(ip)),
3142 ];
3143 let sans = vec![SanIdentity::Ip(ip)];
3144 assert_eq!(
3145 pick_candidate(&candidates, &sans, "fallback"),
3146 format!("[{}]", ip)
3147 );
3148 }
3149
3150 #[test]
3151 fn cert_covers_both_prefers_hostname() {
3152 let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
3153 let candidates = vec![
3154 ("myhost".to_string(), SanIdentity::Dns("myhost".to_string())),
3155 ("10.0.0.1".to_string(), SanIdentity::Ip(ip)),
3156 ];
3157 let sans = vec![SanIdentity::Dns("myhost".to_string()), SanIdentity::Ip(ip)];
3158 assert_eq!(pick_candidate(&candidates, &sans, "fallback"), "myhost");
3159 }
3160
3161 #[test]
3162 fn no_sans_returns_fallback() {
3163 let candidates = vec![("myhost".to_string(), SanIdentity::Dns("myhost".to_string()))];
3164 assert_eq!(pick_candidate(&candidates, &[], "fallback"), "fallback");
3165 }
3166
3167 #[test]
3168 fn no_candidate_matches_returns_fallback() {
3169 let candidates = vec![("myhost".to_string(), SanIdentity::Dns("myhost".to_string()))];
3170 let sans = vec![SanIdentity::Dns("otherhost".to_string())];
3171 assert_eq!(pick_candidate(&candidates, &sans, "fallback"), "fallback");
3172 }
3173 }
3174}
3175
3176#[cfg(test)]
3177mod tests {
3178 use std::net::SocketAddr;
3179
3180 use hyperactor::channel::ChannelAddr;
3181 use hyperactor::id::Label;
3182 use hyperactor::testing::ids::test_proc_id_with_addr;
3183
3184 use super::*;
3185 use crate::mesh_id::ResourceId;
3186
    /// Minimal test actor: exported with an empty handler list and relying
    /// entirely on the default `Actor` implementation. Tests in this module
    /// spawn it as a stand-in for a real client actor.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [])]
    struct TestIntrospectableActor;
    impl Actor for TestIntrospectableActor {}
3201
3202 #[test]
3207 fn test_build_root_payload() {
3208 let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
3209 let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();
3210
3211 let proc1 = test_proc_id_with_addr(ChannelAddr::Tcp(addr1), "host1");
3212 let proc2 = test_proc_id_with_addr(ChannelAddr::Tcp(addr2), "host2");
3213
3214 let actor_id1 = proc1.actor_addr("mesh_agent");
3215 let actor_id2 = proc2.actor_addr("mesh_agent");
3216
3217 let ref1: ActorRef<HostAgent> = ActorRef::attest(actor_id1.clone());
3218 let ref2: ActorRef<HostAgent> = ActorRef::attest(actor_id2.clone());
3219
3220 let agent = MeshAdminAgent::new(
3221 vec![("host_a".to_string(), ref1), ("host_b".to_string(), ref2)],
3222 None,
3223 None,
3224 None,
3225 );
3226
3227 let payload = agent.build_root_payload();
3228 assert_eq!(payload.identity, crate::introspect::NodeRef::Root);
3229 assert_eq!(payload.parent, None);
3230 assert!(matches!(
3231 payload.properties,
3232 NodeProperties::Root { num_hosts: 2, .. }
3233 ));
3234 assert_eq!(payload.children.len(), 2);
3235 assert!(
3236 payload
3237 .children
3238 .contains(&crate::introspect::NodeRef::Host(actor_id1.clone()))
3239 );
3240 assert!(
3241 payload
3242 .children
3243 .contains(&crate::introspect::NodeRef::Host(actor_id2.clone()))
3244 );
3245
3246 match &payload.properties {
3248 NodeProperties::Root {
3249 num_hosts,
3250 started_by,
3251 system_children,
3252 ..
3253 } => {
3254 assert_eq!(*num_hosts, 2);
3255 assert!(!started_by.is_empty());
3256 assert!(
3258 system_children.is_empty(),
3259 "LC-1: root system_children must be empty"
3260 );
3261 }
3262 other => panic!("expected Root, got {:?}", other),
3263 }
3264 }
3265
    /// Walks the introspection tree through the admin agent's `resolve`
    /// endpoint: root, then host, then the first proc child, then the host
    /// agent actor. At each level it checks identity, parent, node kind and
    /// children.
    #[tokio::test]
    async fn test_resolve_reference_tree_walk() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;

        use crate::host::Host;
        use crate::host::LocalProcManager;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Local host whose proc manager boots a `ProcAgent` on each proc.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // Capture what is needed from `host` before it is moved into the
        // host agent below.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc. The value returned by
        // `ProcSupervisionCoordinator::set` is held until the end of the test.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate proc and client instance used to send the resolve requests.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.client("client").unwrap();

        // Level 0: the root node.
        let root_resp = admin_ref
            .resolve(&client, "root".to_string())
            .await
            .unwrap();
        let root = root_resp.0.unwrap();
        assert_eq!(root.identity, crate::introspect::NodeRef::Root);
        assert!(matches!(
            root.properties,
            NodeProperties::Root { num_hosts: 1, .. }
        ));
        assert_eq!(root.parent, None);
        assert_eq!(root.children.len(), 1);

        // Level 1: the host, reached through the reference string taken from
        // the root's children.
        let expected_host_ref =
            crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone());
        let host_child_ref = root
            .children
            .iter()
            .find(|c| **c == expected_host_ref)
            .expect("root children should contain the host agent (as Host ref)");
        let host_ref_string = host_child_ref.to_string();
        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
        let host_node = host_resp.0.unwrap();
        assert_eq!(host_node.identity, expected_host_ref);
        assert!(
            matches!(host_node.properties, NodeProperties::Host { .. }),
            "expected Host properties, got {:?}",
            host_node.properties
        );
        assert_eq!(host_node.parent, Some(crate::introspect::NodeRef::Root));
        assert!(
            !host_node.children.is_empty(),
            "host should have at least one proc child"
        );
        match &host_node.properties {
            NodeProperties::Host {
                system_children, ..
            } => {
                assert!(
                    system_children.is_empty(),
                    "LC-2: host system_children must be empty"
                );
            }
            other => panic!("expected Host, got {:?}", other),
        }

        // Level 2: the first proc child of the host.
        // NOTE(review): the assertions below require this proc to contain the
        // host agent, i.e. they assume `children[0]` is the system proc —
        // confirm the ordering is guaranteed.
        let proc_ref = &host_node.children[0];
        let proc_ref_str = proc_ref.to_string();
        let proc_resp = admin_ref.resolve(&client, proc_ref_str).await.unwrap();
        let proc_node = proc_resp.0.unwrap();
        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties, got {:?}",
            proc_node.properties
        );
        assert_eq!(proc_node.parent, Some(expected_host_ref.clone()));
        assert!(
            !proc_node.children.is_empty(),
            "proc should have at least one actor child"
        );

        // The host agent's address shows up again here, this time as an
        // `Actor` reference rather than a `Host` reference.
        let host_agent_node_ref =
            crate::introspect::NodeRef::Actor(host_agent_ref.actor_addr().clone());
        assert!(
            proc_node.children.contains(&host_agent_node_ref),
            "system proc children {:?} should contain the host agent {:?}",
            proc_node.children,
            host_agent_node_ref
        );

        // Level 3: resolving the host agent by its plain actor address string
        // must yield an `Actor` node.
        let xref_resp = admin_ref
            .resolve(&client, host_agent_ref.actor_addr().to_string())
            .await
            .unwrap();
        let xref_node = xref_resp.0.unwrap();

        assert!(
            matches!(xref_node.properties, NodeProperties::Actor { .. }),
            "host agent child resolved as plain actor should be Actor, got {:?}",
            xref_node.properties
        );
    }
3430
    /// Creates a user proc on a local host and checks that every child of
    /// the host node resolves, with at least one system/local proc and at
    /// least one user proc resolving to `NodeProperties::Proc`.
    #[tokio::test]
    async fn test_proc_properties_for_all_procs() {
        use std::time::Duration;

        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::id::Label;

        use crate::host::Host;
        use crate::host::LocalProcManager;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;
        use crate::resource;
        use crate::resource::ProcSpec;
        use crate::resource::Rank;

        // Local host whose proc manager boots a `ProcAgent` on each proc.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // Record the addresses of the host's two built-in procs so the user
        // proc can be told apart from them later. This must happen before
        // `host` is moved into the host agent.
        let host_addr = host.addr().clone();
        let system_proc_id: ProcAddr = host.system_proc().proc_addr().clone();
        let local_proc_id: ProcAddr = host.local_proc().proc_addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.client("client").unwrap();

        // Ask the host agent to create a user proc named "user-proc".
        let user_proc_name = ResourceId::instance(Label::new("user-proc").unwrap());
        host_agent_ref.post(
            &client,
            resource::CreateOrUpdate {
                id: user_proc_name.clone(),
                rank: Rank::new(0),
                spec: ProcSpec::default(),
            },
        );

        // NOTE(review): fixed 2-second pause, presumably to let the posted
        // request take effect before the host is queried. A polling wait
        // would be less timing-sensitive.
        tokio::time::sleep(Duration::from_secs(2)).await;

        let host_ref_string =
            crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone()).to_string();
        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
        let host_node = host_resp.0.unwrap();

        assert!(
            host_node.children.len() >= 3,
            "expected at least 3 proc children (2 system + 1 user), got {}",
            host_node.children.len()
        );

        // Resolve every child. A child that resolves to `Proc` counts as a
        // user proc when its reference is a `Proc` whose address is neither
        // the system proc nor the local proc; otherwise it counts as system.
        let mut found_system = false;
        let mut found_user = false;
        for child_ref in &host_node.children {
            let resp = admin_ref
                .resolve(&client, child_ref.to_string())
                .await
                .unwrap();
            let node = resp.0.unwrap();
            if let NodeProperties::Proc { .. } = &node.properties {
                if matches!(
                    child_ref,
                    crate::introspect::NodeRef::Proc(proc_id)
                        if proc_id != &system_proc_id && proc_id != &local_proc_id
                ) {
                    found_user = true;
                } else {
                    found_system = true;
                }
            } else {
                // Children that are not procs are ignored.
            }
        }
        assert!(
            found_system,
            "should have resolved at least one system proc"
        );
        assert!(found_user, "should have resolved the user proc");
    }
3554
3555 #[test]
3559 fn test_build_root_payload_with_root_client() {
3560 let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
3561 let proc1 = ResourceId::proc_addr_from_name(ChannelAddr::Tcp(addr1), "host1");
3562 let actor_id1 = hyperactor::ActorAddr::root(proc1, Label::new("mesh_agent").unwrap());
3563 let ref1: ActorRef<HostAgent> = ActorRef::attest(actor_id1.clone());
3564
3565 let client_proc_id = ResourceId::proc_addr_from_name(ChannelAddr::Tcp(addr1), "local");
3566 let client_actor_id = client_proc_id.actor_addr("client");
3567
3568 let agent = MeshAdminAgent::new(
3569 vec![("host_a".to_string(), ref1)],
3570 Some(client_actor_id.clone()),
3571 None,
3572 None,
3573 );
3574
3575 let payload = agent.build_root_payload();
3576 assert!(matches!(
3577 payload.properties,
3578 NodeProperties::Root { num_hosts: 1, .. }
3579 ));
3580 assert_eq!(payload.children.len(), 1);
3582 assert!(
3583 payload
3584 .children
3585 .contains(&crate::introspect::NodeRef::Host(actor_id1.clone()))
3586 );
3587 }
3588
    /// Spawns a "client" actor on the host's local proc, registers it with
    /// the admin agent as the root client, and checks it is reachable by
    /// walking root → host → local proc → actor, with the local proc as the
    /// actor's parent.
    #[tokio::test]
    async fn test_resolve_root_client_actor() {
        use hyperactor::channel::ChannelTransport;

        use crate::host::Host;
        use crate::host::LocalProcManager;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Local host whose proc manager boots a `ProcAgent` on each proc.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();

        // Spawn the stand-in root client on the host's local proc. This has
        // to happen before `host` is moved into the host agent.
        let local_proc = host.local_proc();
        let local_proc_id = local_proc.proc_addr().clone();
        let root_client_handle = local_proc.spawn("client", TestIntrospectableActor).unwrap();
        let root_client_ref: ActorRef<TestIntrospectableActor> = root_client_handle.bind();
        let root_client_actor_id = root_client_ref.actor_addr().clone();

        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc and is given the root client's
        // address as its second constructor argument.
        let admin_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    Some(root_client_actor_id.clone()),
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate proc and client instance used to send the resolve requests.
        let client_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.client("client").unwrap();

        // Root must list the host.
        let root_resp = admin_ref
            .resolve(&client, "root".to_string())
            .await
            .unwrap();
        let root = root_resp.0.unwrap();
        let host_node_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone());
        assert!(
            root.children.contains(&host_node_ref),
            "root children {:?} should contain host {:?}",
            root.children,
            host_node_ref
        );

        // Host must list the local proc.
        let host_resp = admin_ref
            .resolve(&client, host_node_ref.to_string())
            .await
            .unwrap();
        let host_node = host_resp.0.unwrap();
        let local_proc_node_ref = crate::introspect::NodeRef::Proc(local_proc_id.clone());
        assert!(
            host_node.children.contains(&local_proc_node_ref),
            "host children {:?} should contain local proc {:?}",
            host_node.children,
            local_proc_node_ref
        );

        // Local proc must resolve as a proc and list the root client actor.
        let proc_resp = admin_ref
            .resolve(&client, local_proc_id.to_string())
            .await
            .unwrap();
        let proc_node = proc_resp.0.unwrap();
        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties, got {:?}",
            proc_node.properties
        );
        let root_client_node_ref = crate::introspect::NodeRef::Actor(root_client_actor_id.clone());
        assert!(
            proc_node.children.contains(&root_client_node_ref),
            "local proc children {:?} should contain root client actor {:?}",
            proc_node.children,
            root_client_node_ref
        );

        // The root client must resolve as an actor whose parent is the local
        // proc.
        let client_resp = admin_ref
            .resolve(&client, root_client_actor_id.to_string())
            .await
            .unwrap();
        let client_node = client_resp.0.unwrap();
        assert!(
            matches!(client_node.properties, NodeProperties::Actor { .. }),
            "expected Actor properties, got {:?}",
            client_node.properties
        );
        assert_eq!(
            client_node.parent,
            Some(local_proc_node_ref),
            "root client parent should be the local proc"
        );
    }
3720
3721 #[test]
3725 fn test_skill_md_contains_canonical_strings() {
3726 let template = SKILL_MD_TEMPLATE;
3727 assert!(
3728 template.contains("GET {base}/v1/root"),
3729 "SKILL.md must document the root endpoint"
3730 );
3731 assert!(
3732 template.contains("GET {base}/v1/{reference}"),
3733 "SKILL.md must document the reference endpoint"
3734 );
3735 assert!(
3736 template.contains("NodePayload"),
3737 "SKILL.md must mention the NodePayload response type"
3738 );
3739 assert!(
3740 template.contains("GET {base}/SKILL.md"),
3741 "SKILL.md must document itself"
3742 );
3743 assert!(
3744 template.contains("{base}"),
3745 "SKILL.md must use {{base}} placeholder for interpolation"
3746 );
3747 }
3748
    /// Breadth-first walk of the whole introspection tree starting at
    /// "root". For every node, the identity in the payload must render to
    /// the same string that was used to resolve it, and the node's parent
    /// must be the node through which it was reached.
    #[tokio::test]
    async fn test_navigation_identity_invariant() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;

        use crate::host::Host;
        use crate::host::LocalProcManager;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Local host whose proc manager boots a `ProcAgent` on each proc.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str, host_agent_ref)],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.client("client").unwrap();

        // Work queue of (reference string, parent expected for that node).
        let mut queue: std::collections::VecDeque<(String, Option<crate::introspect::NodeRef>)> =
            std::collections::VecDeque::new();
        queue.push_back(("root".to_string(), None));

        let mut visited = std::collections::HashSet::new();
        while let Some((ref_str, expected_parent)) = queue.pop_front() {
            // A reference reachable from several parents is checked once,
            // against the first parent that enqueued it.
            if !visited.insert(ref_str.clone()) {
                continue;
            }

            let resp = admin_ref.resolve(&client, ref_str.clone()).await.unwrap();
            let node = resp.0.unwrap();

            // The payload's identity must round-trip to the requested string.
            assert_eq!(
                node.identity.to_string(),
                ref_str,
                "identity mismatch: resolved '{}' but payload.identity = '{}'",
                ref_str,
                node.identity
            );

            // The node's parent must be the node it was discovered through.
            assert_eq!(
                node.parent, expected_parent,
                "parent mismatch for '{}': expected {:?}, got {:?}",
                ref_str, expected_parent, node.parent
            );

            for child_ref in &node.children {
                let child_str = child_ref.to_string();
                if !visited.contains(&child_str) {
                    queue.push_back((child_str, Some(node.identity.clone())));
                }
            }
        }

        // Sanity check that the walk actually descended into the tree.
        assert!(
            visited.len() >= 4,
            "expected at least 4 nodes in the tree, visited {}",
            visited.len()
        );
    }
3858
    /// Checks how procs appear under a host node: the host reports no
    /// `system_children`, lists the system proc among its children, and its
    /// first proc child resolves to a `Proc` node whose identity matches the
    /// child reference and whose parent is the host.
    #[tokio::test]
    async fn test_system_proc_identity() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;

        use crate::host::Host;
        use crate::host::LocalProcManager;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Local host whose proc manager boots a `ProcAgent` on each proc.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let system_proc_id = system_proc.proc_addr().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    None,
                    Some("[::]:0".parse().unwrap()),
                    None,
                ),
            )
            .unwrap();
        let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.client("client").unwrap();

        // Resolve the host node directly by its `Host` reference string.
        let host_ref_str =
            crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone()).to_string();
        let host_resp = admin_ref
            .resolve(&client, host_ref_str.clone())
            .await
            .unwrap();
        let host_node = host_resp.0.unwrap();
        assert!(
            !host_node.children.is_empty(),
            "host should have at least one proc child"
        );

        let system_children = match &host_node.properties {
            NodeProperties::Host {
                system_children, ..
            } => system_children.clone(),
            other => panic!("expected Host properties, got {:?}", other),
        };
        assert!(
            system_children.is_empty(),
            "host system_children should be empty (procs are never system), got {:?}",
            system_children
        );
        // NOTE(review): redundant — the match above already panics unless the
        // properties are `Host`.
        assert!(
            matches!(&host_node.properties, NodeProperties::Host { .. }),
            "expected Host properties"
        );

        // The system proc must be listed as a `Proc` child of the host.
        let expected_system_ref = crate::introspect::NodeRef::Proc(system_proc_id.clone());
        assert!(
            host_node.children.contains(&expected_system_ref),
            "host children {:?} should contain the system proc ref {:?}",
            host_node.children,
            expected_system_ref
        );

        // Resolve the first child and check its identity, kind and parent.
        // NOTE(review): this is `children[0]`, which is not necessarily the
        // system proc verified above.
        let proc_child_ref = &host_node.children[0];
        let proc_resp = admin_ref
            .resolve(&client, proc_child_ref.to_string())
            .await
            .unwrap();
        let proc_node = proc_resp.0.unwrap();

        assert_eq!(
            proc_node.identity, *proc_child_ref,
            "identity must match the proc ref from the host's children list"
        );

        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected NodeProperties::Proc, got {:?}",
            proc_node.properties
        );

        let host_node_ref = crate::introspect::NodeRef::Host(host_agent_ref.actor_addr().clone());
        assert_eq!(
            proc_node.parent,
            Some(host_node_ref),
            "proc parent should be the host reference"
        );

        assert!(
            proc_node.as_of > std::time::UNIX_EPOCH,
            "as_of should be after the epoch"
        );

        // NOTE(review): repeats the `Proc` properties check made earlier.
        assert!(
            matches!(&proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties"
        );
    }
3994
3995 #[test]
3999 fn test_admin_handle_parse_https_url() {
4000 let h = super::AdminHandle::parse("https://myhost:1729");
4001 assert!(matches!(h, super::AdminHandle::Url(u) if u == "https://myhost:1729"));
4002 }
4003
4004 #[test]
4005 fn test_admin_handle_parse_bare_host_port() {
4006 let h = super::AdminHandle::parse("myhost:1729");
4008 assert!(
4009 matches!(h, super::AdminHandle::Url(ref u) if u == "https://myhost:1729"),
4010 "bare host:port should become https://host:port, got: {:?}",
4011 matches!(h, super::AdminHandle::Url(_))
4012 );
4013 }
4014
4015 #[test]
4016 fn test_admin_handle_parse_mast() {
4017 let h = super::AdminHandle::parse("mast_conda:///my-job");
4018 assert!(matches!(
4019 h,
4020 super::AdminHandle::Published(super::PublishedHandle::Mast(_))
4021 ));
4022 }
4023
4024 #[test]
4025 fn test_admin_handle_parse_unsupported() {
4026 let h = super::AdminHandle::parse("junk_hostname_no_port");
4028 assert!(matches!(h, super::AdminHandle::Unsupported(_)));
4029 }
4030
4031 #[tokio::test]
4032 async fn test_admin_handle_resolve_url_returns_url() {
4033 let h = super::AdminHandle::parse("https://myhost:1729");
4034 let result = h.resolve(None).await.unwrap();
4035 assert_eq!(result, "https://myhost:1729");
4036 }
4037
4038 #[tokio::test]
4039 async fn test_admin_handle_resolve_published_returns_error() {
4040 let h = super::AdminHandle::parse("mast_conda:///test-job");
4041 let err = format!("{:#}", h.resolve(Some(1729)).await.unwrap_err());
4042 assert!(
4043 err.contains("not yet implemented"),
4044 "expected 'not yet implemented' in error, got: {}",
4045 err
4046 );
4047 }
4048
4049 #[tokio::test]
4050 async fn test_admin_handle_resolve_unsupported_returns_error() {
4051 let h = super::AdminHandle::parse("junk_hostname_no_port");
4052 let err = format!("{:#}", h.resolve(None).await.unwrap_err());
4053 assert!(
4054 err.contains("unrecognized admin handle"),
4055 "expected 'unrecognized admin handle' in error, got: {}",
4056 err
4057 );
4058 }
4059
4060 #[tokio::test]
4063 async fn test_resolve_mast_handle_returns_not_yet_implemented_error() {
4064 let result = super::resolve_mast_handle("mast_conda:///test-job", Some(1729)).await;
4065 let err = format!("{:#}", result.unwrap_err());
4066 assert!(
4067 err.contains("not yet implemented"),
4068 "expected 'not yet implemented' in error, got: {}",
4069 err
4070 );
4071 }
4072
/// `AdminInfo::new` must accept a well-formed `https` URL, expose its
/// hostname (without scheme or port) in `host`, and keep the URL
/// string unchanged in `url`.
#[test]
fn test_admin_info_new_derives_host_from_url() {
    let built = super::AdminInfo::new(
        String::from("actor"),
        String::from("proc"),
        String::from("https://myhost.example.com:1729"),
    );
    let info = built.unwrap();
    assert_eq!(info.host, "myhost.example.com");
    assert_eq!(info.url, "https://myhost.example.com:1729");
}
4087
/// `AdminInfo::new` must return an error when the URL argument is
/// the string `"not a url"`.
#[test]
fn test_admin_info_new_rejects_invalid_url() {
    let outcome = super::AdminInfo::new(
        String::from("actor"),
        String::from("proc"),
        String::from("not a url"),
    );
    let rejected = outcome.is_err();
    assert!(rejected, "invalid URL must be rejected");
}
4098
/// `AdminInfo::new` must return an error for `"data:text/plain,hello"`,
/// a URL-shaped string that carries no host component.
#[test]
fn test_admin_info_new_rejects_url_without_host() {
    let outcome = super::AdminInfo::new(
        String::from("actor"),
        String::from("proc"),
        String::from("data:text/plain,hello"),
    );
    let rejected = outcome.is_err();
    assert!(rejected, "URL without host must be rejected");
}
4110
4111 #[tokio::test]
4116 async fn test_spawn_admin_places_on_caller_proc() {
4117 use hyperactor::Proc;
4118 use hyperactor::channel::ChannelTransport;
4119 use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
4120
4121 use crate::host_mesh::HostMesh;
4122
4123 let host_mesh = HostMesh::local().await.unwrap();
4125
4126 let caller_proc = Proc::direct(ChannelTransport::Unix.any(), "caller".to_string()).unwrap();
4128 let _supervision = ProcSupervisionCoordinator::set(&caller_proc).await.unwrap();
4129 let (caller_cx, _caller_handle) = caller_proc.client("caller").unwrap();
4130
4131 let admin_ref = crate::host_mesh::spawn_admin(
4133 [&host_mesh],
4134 &caller_cx,
4135 Some("[::]:0".parse().unwrap()),
4136 None,
4137 )
4138 .await
4139 .unwrap();
4140
4141 let admin_url = admin_ref
4145 .get_admin_addr(&caller_cx)
4146 .await
4147 .unwrap()
4148 .addr
4149 .expect("SA-5: admin must report an address");
4150 assert!(
4151 !admin_url.is_empty(),
4152 "spawn_admin ref must yield a non-empty URL"
4153 );
4154 }
4155
4156 #[tokio::test]
4174 async fn test_proc_children_reflect_directly_spawned_actors() {
4175 use hyperactor::Proc;
4176 use hyperactor::actor::ActorStatus;
4177 use hyperactor::channel::ChannelTransport;
4178 use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
4179
4180 use crate::host::Host;
4181 use crate::host::LocalProcManager;
4182 use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
4183 use crate::host_mesh::host_agent::HostAgent;
4184 use crate::host_mesh::host_agent::HostAgentMode;
4185 use crate::host_mesh::host_agent::ProcManagerSpawnFn;
4186 use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
4187 use crate::proc_agent::ProcAgent;
4188
4189 let spawn_fn: ProcManagerSpawnFn =
4197 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
4198 let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn_fn);
4199 let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
4200 Host::new(manager, ChannelTransport::Unix.any())
4201 .await
4202 .unwrap();
4203 let system_proc = host.system_proc().clone();
4204 let host_agent_handle = system_proc
4205 .spawn(
4206 HOST_MESH_AGENT_ACTOR_NAME,
4207 HostAgent::new(HostAgentMode::Local(host)),
4208 )
4209 .unwrap();
4210 let host_agent_ref: ActorRef<HostAgent> = host_agent_handle.bind();
4211
4212 let user_proc =
4214 Proc::direct(ChannelTransport::Unix.any(), "user_proc".to_string()).unwrap();
4215 let user_proc_addr = user_proc.proc_addr().addr().to_string();
4216 let agent_handle = ProcAgent::boot_v1(user_proc.clone(), None).unwrap();
4217 agent_handle
4218 .status()
4219 .wait_for(|s| matches!(s, ActorStatus::Idle))
4220 .await
4221 .unwrap();
4222
4223 let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
4229 let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
4230 let admin_handle = admin_proc
4231 .spawn(
4232 MESH_ADMIN_ACTOR_NAME,
4233 MeshAdminAgent::new(
4234 vec![(user_proc_addr, host_agent_ref.clone())],
4235 None,
4236 Some("[::]:0".parse().unwrap()),
4237 None,
4238 ),
4239 )
4240 .unwrap();
4241 let admin_ref: ActorRef<MeshAdminAgent> = admin_handle.bind();
4242
4243 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
4244 let (client, _client_handle) = client_proc.client("client").unwrap();
4245
4246 let user_proc_ref = user_proc.proc_addr().to_string();
4250 let resp = admin_ref
4251 .resolve(&client, user_proc_ref.clone())
4252 .await
4253 .unwrap();
4254 let node = resp.0.unwrap();
4255 assert!(
4256 matches!(node.properties, NodeProperties::Proc { .. }),
4257 "expected Proc, got {:?}",
4258 node.properties
4259 );
4260 let initial_count = node.children.len();
4261 assert!(
4262 node.children
4263 .iter()
4264 .any(|c| c.to_string().contains(PROC_AGENT_ACTOR_NAME)),
4265 "initial children {:?} should contain proc_agent",
4266 node.children
4267 );
4268
4269 user_proc
4272 .spawn("extra_actor", TestIntrospectableActor)
4273 .unwrap();
4274
4275 let resp2 = admin_ref
4278 .resolve(&client, user_proc_ref.clone())
4279 .await
4280 .unwrap();
4281 let node2 = resp2.0.unwrap();
4282 assert!(
4283 matches!(node2.properties, NodeProperties::Proc { .. }),
4284 "expected Proc, got {:?}",
4285 node2.properties
4286 );
4287 assert!(
4288 node2
4289 .children
4290 .iter()
4291 .any(|c| c.to_string().contains("extra_actor")),
4292 "after direct spawn, children {:?} should contain extra_actor",
4293 node2.children
4294 );
4295 assert!(
4296 node2.children.len() > initial_count,
4297 "expected at least {} children after direct spawn, got {:?}",
4298 initial_count + 1,
4299 node2.children
4300 );
4301 }
4302
/// An empty proc reference must be rejected with code `bad_request`
/// and a message that mentions "empty".
#[test]
fn pyspy_parse_empty_reference() {
    let rejection = parse_proc_reference("").unwrap_err();
    let mentions_empty = rejection.message.contains("empty");
    assert_eq!(rejection.code, "bad_request");
    assert!(mentions_empty);
}
4316
/// A reference consisting solely of `/` must be rejected the same way
/// as an empty one: code `bad_request`, message mentioning "empty".
#[test]
fn pyspy_parse_slash_only() {
    let rejection = parse_proc_reference("/").unwrap_err();
    let mentions_empty = rejection.message.contains("empty");
    assert_eq!(rejection.code, "bad_request");
    assert!(mentions_empty);
}
4324
/// The input `"%FF%FE"` must be rejected with code `bad_request` and
/// a message that mentions "percent-encoding".
#[test]
fn pyspy_parse_malformed_percent_encoding() {
    let rejection = parse_proc_reference("%FF%FE").unwrap_err();
    let mentions_encoding = rejection.message.contains("percent-encoding");
    assert_eq!(rejection.code, "bad_request");
    assert!(mentions_encoding);
}
4333
/// The input `"not-a-valid-proc-id"` must be rejected with code
/// `bad_request` and a message that mentions "invalid proc reference".
#[test]
fn pyspy_parse_invalid_proc_id() {
    let rejection = parse_proc_reference("not-a-valid-proc-id").unwrap_err();
    let mentions_invalid = rejection.message.contains("invalid proc reference");
    assert_eq!(rejection.code, "bad_request");
    assert!(mentions_invalid);
}
4341
/// Round-trip check: the string form of a proc id built from a TCP
/// address must parse back to the same id, and the decoded string
/// returned alongside it must equal the input.
#[test]
fn pyspy_parse_valid_proc_reference() {
    let socket = "127.0.0.1:9000".parse::<SocketAddr>().unwrap();
    let expected_id = test_proc_id_with_addr(ChannelAddr::Tcp(socket), "myproc");
    let reference = expected_id.to_string();

    let (decoded, parsed) = parse_proc_reference(&reference).unwrap();
    assert_eq!(decoded, reference);
    assert_eq!(parsed, expected_id);
}
4353
/// A proc reference prefixed with a single `/` must parse to the same
/// proc id as the unprefixed form. The decoded string is not checked
/// here.
#[test]
fn pyspy_parse_strips_leading_slash() {
    let socket = "127.0.0.1:9000".parse::<SocketAddr>().unwrap();
    let expected_id = test_proc_id_with_addr(ChannelAddr::Tcp(socket), "myproc");
    let slash_prefixed = format!("/{}", expected_id);

    let (_, parsed) = parse_proc_reference(&slash_prefixed).unwrap();
    assert_eq!(parsed, expected_id);
}
4364
/// A proc address built with `SERVICE_PROC_NAME` must be routed to
/// the `ResolvedProcHandler::Host` variant.
#[test]
fn route_proc_handler_service_proc_yields_host() {
    let socket = "127.0.0.1:9000".parse::<SocketAddr>().unwrap();
    let service_proc =
        ResourceId::proc_addr_from_name(ChannelAddr::Tcp(socket), SERVICE_PROC_NAME);
    let reference = service_proc.to_string();
    let routed = route_proc_handler(&reference).unwrap();
    assert!(
        matches!(routed, ResolvedProcHandler::Host(_)),
        "service proc should resolve to Host variant"
    );
}
4376
/// A proc id named `"worker_0"` must be routed to the
/// `ResolvedProcHandler::Proc` variant.
#[test]
fn route_proc_handler_worker_proc_yields_proc() {
    let socket = "127.0.0.1:9000".parse::<SocketAddr>().unwrap();
    let worker_proc = test_proc_id_with_addr(ChannelAddr::Tcp(socket), "worker_0");
    let reference = worker_proc.to_string();
    let routed = route_proc_handler(&reference).unwrap();
    assert!(
        matches!(routed, ResolvedProcHandler::Proc(_)),
        "non-service proc should resolve to Proc variant"
    );
}
4388
/// A proc address named `"service-deadbeefdeadbeef"` must be routed
/// to the `ResolvedProcHandler::Proc` variant, not `Host`.
///
/// NOTE(review): presumably this guards against matching the service
/// proc by name prefix alone — confirm against `route_proc_handler`.
#[test]
fn route_proc_handler_service_instance_yields_proc() {
    let socket = "127.0.0.1:9000".parse::<SocketAddr>().unwrap();
    let instance_proc =
        ResourceId::proc_addr_from_name(ChannelAddr::Tcp(socket), "service-deadbeefdeadbeef");
    let reference = instance_proc.to_string();
    let routed = route_proc_handler(&reference).unwrap();
    assert!(
        matches!(routed, ResolvedProcHandler::Proc(_)),
        "service-labeled instance proc should resolve to Proc variant"
    );
}
4401}