1use std::collections::HashMap;
252use std::io;
253use std::sync::Arc;
254use std::time::Duration;
255
256use async_trait::async_trait;
257use axum::Json;
258use axum::Router;
259use axum::extract::Path as AxumPath;
260use axum::extract::State;
261use axum::http::StatusCode;
262use axum::response::IntoResponse;
263use axum::routing::get;
264use hyperactor::Actor;
265use hyperactor::ActorHandle;
266use hyperactor::Context;
267use hyperactor::HandleClient;
268use hyperactor::Handler;
269use hyperactor::Instance;
270use hyperactor::RefClient;
271use hyperactor::channel::try_tls_acceptor;
272use hyperactor::introspect::IntrospectMessage;
273use hyperactor::introspect::IntrospectResult;
274use hyperactor::mailbox::open_once_port;
275use hyperactor::reference as hyperactor_reference;
276use serde::Deserialize;
277use serde::Serialize;
278use serde_json::Value;
279use tokio::net::TcpListener;
280use tokio_rustls::TlsAcceptor;
281use typeuri::Named;
282
283use crate::host_mesh::host_agent::HostAgent;
284use crate::host_mesh::host_agent::HostId;
285use crate::introspect::NodePayload;
286use crate::introspect::NodeProperties;
287use crate::introspect::to_node_payload;
288
289async fn query_introspect(
292 cx: &hyperactor::Context<'_, MeshAdminAgent>,
293 actor_id: &hyperactor_reference::ActorId,
294 view: hyperactor::introspect::IntrospectView,
295 timeout: Duration,
296 err_ctx: &str,
297) -> Result<IntrospectResult, anyhow::Error> {
298 let introspect_port =
299 hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(actor_id);
300 let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
301 introspect_port.send(
302 cx,
303 IntrospectMessage::Query {
304 view,
305 reply: reply_handle.bind(),
306 },
307 )?;
308 tokio::time::timeout(timeout, reply_rx.recv())
309 .await
310 .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
311 .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
312}
313
314async fn query_child_introspect(
316 cx: &hyperactor::Context<'_, MeshAdminAgent>,
317 actor_id: &hyperactor_reference::ActorId,
318 child_ref: hyperactor_reference::Reference,
319 timeout: Duration,
320 err_ctx: &str,
321) -> Result<IntrospectResult, anyhow::Error> {
322 let introspect_port =
323 hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(actor_id);
324 let (reply_handle, reply_rx) = open_once_port::<IntrospectResult>(cx);
325 introspect_port.send(
326 cx,
327 IntrospectMessage::QueryChild {
328 child_ref,
329 reply: reply_handle.bind(),
330 },
331 )?;
332 tokio::time::timeout(timeout, reply_rx.recv())
333 .await
334 .map_err(|_| anyhow::anyhow!("timed out {}", err_ctx))?
335 .map_err(|e| anyhow::anyhow!("failed to receive {}: {}", err_ctx, e))
336}
337
/// Name constant for the mesh admin agent actor.
///
/// NOTE(review): not referenced in this part of the file; presumably the name
/// the agent is spawned under — confirm at the spawn site.
pub const MESH_ADMIN_ACTOR_NAME: &str = "mesh_admin";

/// Name passed to `introspectable_instance` when `MeshAdminAgent::init`
/// creates the bridge instance used by the HTTP handlers.
pub const MESH_ADMIN_BRIDGE_NAME: &str = "mesh_admin_bridge";

/// Three-second budget used for host-agent queries, for direct queries to
/// standalone/anchor actors, and as the overall deadline of a single HTTP
/// resolve request in `resolve_reference_bridge`.
const SINGLE_HOST_TIMEOUT: Duration = Duration::from_secs(3);

/// Short (100 ms) budget for the first-attempt `QueryChild` probes in
/// `resolve_proc_node` and `resolve_actor_node`; both callers fall back to
/// another query when the probe fails or yields an error payload.
const QUERY_CHILD_TIMEOUT: Duration = Duration::from_millis(100);
376
377fn resolve_actor_timeout() -> Duration {
379 hyperactor_config::global::get(crate::config::MESH_ADMIN_RESOLVE_ACTOR_TIMEOUT)
380}
381
382fn max_concurrent_resolves() -> usize {
384 hyperactor_config::global::get(crate::config::MESH_ADMIN_MAX_CONCURRENT_RESOLVES)
385}
386
// Error body returned by the mesh admin HTTP API.
//
// Plain `//` comments are used on purpose: `///` doc comments on a type that
// derives `schemars::JsonSchema` end up in the generated schema as
// descriptions, which would change the output of the schema endpoints.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiError {
    // Machine-readable error code. The `IntoResponse` impl maps "not_found",
    // "bad_request", "gateway_timeout" and "service_unavailable" to their HTTP
    // statuses; any other code becomes 500.
    pub code: String,
    // Human-readable description of the failure.
    pub message: String,
    // Optional structured context; left out of the serialized JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<Value>,
}
401
// JSON wrapper that nests an `ApiError` under a top-level "error" key; this is
// the shape `ApiError::into_response` serializes.
//
// Plain `//` comments are used on purpose: `///` doc comments would be picked
// up by the `schemars::JsonSchema` derive and alter the generated schema.
#[derive(Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct ApiErrorEnvelope {
    // The wrapped error.
    pub error: ApiError,
}
407
408impl ApiError {
409 pub fn not_found(message: impl Into<String>, details: Option<Value>) -> Self {
411 Self {
412 code: "not_found".to_string(),
413 message: message.into(),
414 details,
415 }
416 }
417
418 pub fn bad_request(message: impl Into<String>, details: Option<Value>) -> Self {
420 Self {
421 code: "bad_request".to_string(),
422 message: message.into(),
423 details,
424 }
425 }
426}
427
428impl IntoResponse for ApiError {
429 fn into_response(self) -> axum::response::Response {
430 let status = match self.code.as_str() {
431 "not_found" => StatusCode::NOT_FOUND,
432 "bad_request" => StatusCode::BAD_REQUEST,
433 "gateway_timeout" => StatusCode::GATEWAY_TIMEOUT,
434 "service_unavailable" => StatusCode::SERVICE_UNAVAILABLE,
435 _ => StatusCode::INTERNAL_SERVER_ERROR,
436 };
437 let envelope = ApiErrorEnvelope { error: self };
438 (status, Json(envelope)).into_response()
439 }
440}
441
/// Reply to `MeshAdminMessage::GetAdminAddr`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct MeshAdminAddrResponse {
    /// The agent's `admin_host` value: a `scheme://host:port` string set by
    /// `MeshAdminAgent::init`, or `None` if it has not been set.
    pub addr: Option<String>,
}
wirevalue::register_type!(MeshAdminAddrResponse);
451
/// Messages for querying the admin agent about itself.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum MeshAdminMessage {
    /// Asks for the URL the admin HTTP server is reachable at.
    GetAdminAddr {
        /// One-shot port the handler answers on with a `MeshAdminAddrResponse`.
        #[reply]
        reply: hyperactor_reference::OncePortRef<MeshAdminAddrResponse>,
    },
}
wirevalue::register_type!(MeshAdminMessage);
479
/// Reply to `ResolveReferenceMessage::Resolve`: the resolved `NodePayload`, or
/// the resolution error rendered as a string (the handler formats the
/// `anyhow::Error` with `{:#}`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ResolveReferenceResponse(pub Result<NodePayload, String>);
wirevalue::register_type!(ResolveReferenceResponse);
485
/// Messages asking the admin agent to resolve a reference string into a
/// `NodePayload`. The HTTP bridge handlers send these to the agent.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub enum ResolveReferenceMessage {
    /// Resolves a single reference.
    Resolve {
        /// Reference to resolve: the literal `"root"`, a string that parses as
        /// a `HostId`, or a string that parses as a proc/actor `Reference`.
        reference_string: String,
        /// One-shot port the handler answers on with a
        /// `ResolveReferenceResponse`.
        #[reply]
        reply: hyperactor_reference::OncePortRef<ResolveReferenceResponse>,
    },
}
wirevalue::register_type!(ResolveReferenceMessage);
528
/// Actor that resolves mesh references into `NodePayload`s and, once
/// initialized, runs the admin HTTP server that exposes them.
#[hyperactor::export(handlers = [MeshAdminMessage, ResolveReferenceMessage])]
pub struct MeshAdminAgent {
    /// Host agents keyed by the string supplied with each agent in `new`;
    /// `resolve_proc_node` looks entries up by `proc_id.addr().to_string()`.
    hosts: HashMap<String, hyperactor_reference::ActorRef<HostAgent>>,

    /// Reverse index of `hosts`: host agent actor id -> key in `hosts`.
    host_agents_by_actor_id: HashMap<hyperactor_reference::ActorId, String>,

    /// Actor id of the root client, if one was supplied to `new`.
    root_client_actor_id: Option<hyperactor_reference::ActorId>,

    /// This agent's own actor id; `None` until `init` runs.
    self_actor_id: Option<hyperactor_reference::ActorId>,

    /// Explicit bind address passed to `new`. When `None`, `init` falls back
    /// to the `MESH_ADMIN_ADDR` config value.
    admin_addr_override: Option<std::net::SocketAddr>,

    /// Address the HTTP listener actually bound to; set by `init`.
    admin_addr: Option<std::net::SocketAddr>,

    /// `scheme://hostname:port` URL of the admin server; set by `init` and
    /// returned by the `GetAdminAddr` handler.
    admin_host: Option<String>,

    /// RFC 3339 timestamp captured when the agent was constructed.
    started_at: String,

    /// Value of `$USER` (or `$USERNAME`) at construction time, or "unknown".
    started_by: String,
}
602
603impl MeshAdminAgent {
604 pub fn new(
622 hosts: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>,
623 root_client_actor_id: Option<hyperactor_reference::ActorId>,
624 admin_addr: Option<std::net::SocketAddr>,
625 ) -> Self {
626 let host_agents_by_actor_id: HashMap<hyperactor_reference::ActorId, String> = hosts
627 .iter()
628 .map(|(addr, agent_ref)| (agent_ref.actor_id().clone(), addr.clone()))
629 .collect();
630
631 let started_at = chrono::Utc::now().to_rfc3339();
633 let started_by = std::env::var("USER")
634 .or_else(|_| std::env::var("USERNAME"))
635 .unwrap_or_else(|_| "unknown".to_string());
636
637 Self {
638 hosts: hosts.into_iter().collect(),
639 host_agents_by_actor_id,
640 root_client_actor_id,
641 self_actor_id: None,
642 admin_addr_override: admin_addr,
643 admin_addr: None,
644 admin_host: None,
645 started_at,
646 started_by,
647 }
648 }
649}
650
651impl std::fmt::Debug for MeshAdminAgent {
652 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653 f.debug_struct("MeshAdminAgent")
654 .field("hosts", &self.hosts.keys().collect::<Vec<_>>())
655 .field("host_agents", &self.host_agents_by_actor_id.len())
656 .field("root_client_actor_id", &self.root_client_actor_id)
657 .field("self_actor_id", &self.self_actor_id)
658 .field("admin_addr", &self.admin_addr)
659 .field("admin_host", &self.admin_host)
660 .field("started_at", &self.started_at)
661 .field("started_by", &self.started_by)
662 .finish()
663 }
664}
665
/// State shared (behind an `Arc`) by the axum route handlers.
struct BridgeState {
    /// Reference to the `MeshAdminAgent`; handlers send it `Resolve` requests.
    admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent>,
    /// Instance created by `init` under `MESH_ADMIN_BRIDGE_NAME`; handlers use
    /// it as the context for their calls to the agent.
    bridge_cx: Instance<()>,
    /// Caps concurrent resolve work. Handlers use `try_acquire` and answer
    /// with 503 when no permit is free. Sized by `max_concurrent_resolves()`.
    resolve_semaphore: tokio::sync::Semaphore,
    /// Handle returned alongside `bridge_cx`. Never read in the visible code;
    /// presumably held so the bridge instance stays alive — TODO confirm.
    _bridge_handle: ActorHandle<()>,
}
693
/// A TCP listener paired with a TLS acceptor, so that `axum::serve` can be
/// handed connections that have already completed the TLS handshake.
struct TlsListener {
    /// Underlying TCP listener.
    tcp: TcpListener,
    /// Acceptor that performs the TLS handshake on each accepted stream.
    acceptor: TlsAcceptor,
}
704
705impl axum::serve::Listener for TlsListener {
706 type Io = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
707 type Addr = std::net::SocketAddr;
708
709 async fn accept(&mut self) -> (Self::Io, Self::Addr) {
710 loop {
711 let (stream, addr) = match self.tcp.accept().await {
712 Ok(conn) => conn,
713 Err(e) => {
714 tracing::warn!("TCP accept error: {}", e);
715 continue;
716 }
717 };
718
719 match self.acceptor.accept(stream).await {
720 Ok(tls_stream) => return (tls_stream, addr),
721 Err(e) => {
722 tracing::warn!("TLS handshake failed from {}: {}", addr, e);
723 continue;
724 }
725 }
726 }
727 }
728
729 fn local_addr(&self) -> io::Result<Self::Addr> {
730 self.tcp.local_addr()
731 }
732}
733
#[async_trait]
impl Actor for MeshAdminAgent {
    /// Starts the admin HTTP server.
    ///
    /// Steps, in order: record this actor's id; choose the bind address
    /// (explicit override, else the `MESH_ADMIN_ADDR` config value); bind the
    /// TCP listener; obtain an optional TLS acceptor; create the bridge
    /// instance and router; spawn the server task (TLS or plain); log the URL.
    ///
    /// # Errors
    ///
    /// Fails if the configured address does not parse, the listener cannot be
    /// bound, mTLS is enforced but no acceptor is available, or the bridge
    /// instance cannot be created.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.bind::<Self>();
        // NOTE(review): presumably flags this instance as a system actor for
        // introspection purposes — confirm against `Instance::set_system`.
        this.set_system();
        self.self_actor_id = Some(this.self_id().clone());

        // An address passed to `new` wins over the global config.
        let bind_addr = match self.admin_addr_override {
            Some(addr) => addr,
            None => hyperactor_config::global::get_cloned(crate::config::MESH_ADMIN_ADDR)
                .parse_socket_addr()
                .map_err(|e| anyhow::anyhow!("invalid MESH_ADMIN_ADDR config: {}", e))?,
        };
        let listener = TcpListener::bind(bind_addr).await?;
        // Read the address back from the listener rather than reusing
        // `bind_addr`; the published URL below uses this port.
        let bound_addr = listener.local_addr()?;
        // Fall back to "localhost" if the hostname is unavailable or not
        // valid UTF-8.
        let host = hostname::get()
            .unwrap_or_else(|_| "localhost".into())
            .into_string()
            .unwrap_or_else(|_| "localhost".to_string());
        self.admin_addr = Some(bound_addr);

        // mTLS is mandatory only when compiled with the `fbcode_build` cfg.
        let enforce_mtls = cfg!(fbcode_build);
        let tls_acceptor = try_tls_acceptor(enforce_mtls);

        if enforce_mtls && tls_acceptor.is_none() {
            return Err(anyhow::anyhow!(
                "mesh admin requires mTLS but no TLS certificates found; \
                 set HYPERACTOR_TLS_CERT/KEY/CA or ensure Meta cert paths exist \
                 (/var/facebook/x509_identities/server.pem, /var/facebook/rootcanal/ca.pem)"
            ));
        }

        // The advertised scheme follows whether a TLS acceptor was obtained.
        let scheme = if tls_acceptor.is_some() {
            "https"
        } else {
            "http"
        };
        self.admin_host = Some(format!("{}://{}:{}", scheme, host, bound_addr.port()));

        // Separate instance on the same proc for the HTTP handlers to use as
        // their context when calling this agent.
        let (bridge_cx, bridge_handle) = this
            .proc()
            .introspectable_instance(MESH_ADMIN_BRIDGE_NAME)?;
        bridge_cx.set_system();
        let bridge_state = Arc::new(BridgeState {
            admin_ref: hyperactor_reference::ActorRef::attest(this.self_id().clone()),
            bridge_cx,
            resolve_semaphore: tokio::sync::Semaphore::new(max_concurrent_resolves()),
            _bridge_handle: bridge_handle,
        });
        let router = create_mesh_admin_router(bridge_state);

        // The server runs in a detached task; `init` does not wait for it, and
        // a server error is only logged.
        if let Some(acceptor) = tls_acceptor {
            let tls_listener = TlsListener {
                tcp: listener,
                acceptor,
            };
            tokio::spawn(async move {
                if let Err(e) = axum::serve(tls_listener, router).await {
                    tracing::error!("mesh admin server (mTLS) error: {}", e);
                }
            });
        } else {
            tokio::spawn(async move {
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("mesh admin server error: {}", e);
                }
            });
        }

        tracing::info!(
            "mesh admin server listening on {}",
            self.admin_host.as_deref().unwrap_or("unknown")
        );
        Ok(())
    }

    /// Logs undeliverable messages at debug level and otherwise ignores them
    /// by returning `Ok(())`, so a failed delivery is not reported as an
    /// error from this handler.
    async fn handle_undeliverable_message(
        &mut self,
        _cx: &Instance<Self>,
        hyperactor::mailbox::Undeliverable(envelope): hyperactor::mailbox::Undeliverable<
            hyperactor::mailbox::MessageEnvelope,
        >,
    ) -> Result<(), anyhow::Error> {
        tracing::debug!(
            "admin agent: undeliverable message to {} (port not bound?), ignoring",
            envelope.dest(),
        );
        Ok(())
    }
}
870
871#[async_trait]
874impl Handler<MeshAdminMessage> for MeshAdminAgent {
875 async fn handle(
882 &mut self,
883 cx: &Context<Self>,
884 msg: MeshAdminMessage,
885 ) -> Result<(), anyhow::Error> {
886 match msg {
887 MeshAdminMessage::GetAdminAddr { reply } => {
888 let resp = MeshAdminAddrResponse {
889 addr: self.admin_host.clone(),
890 };
891 if let Err(e) = reply.send(cx, resp) {
892 tracing::debug!("GetAdminAddr reply failed (caller gone?): {e}");
893 }
894 }
895 }
896 Ok(())
897 }
898}
899
900#[async_trait]
903impl Handler<ResolveReferenceMessage> for MeshAdminAgent {
904 async fn handle(
912 &mut self,
913 cx: &Context<Self>,
914 msg: ResolveReferenceMessage,
915 ) -> Result<(), anyhow::Error> {
916 match msg {
917 ResolveReferenceMessage::Resolve {
918 reference_string,
919 reply,
920 } => {
921 let response = ResolveReferenceResponse(
922 self.resolve_reference(cx, &reference_string)
923 .await
924 .map_err(|e| format!("{:#}", e)),
925 );
926 if let Err(e) = reply.send(cx, response) {
927 tracing::debug!("Resolve reply failed (caller gone?): {e}");
928 }
929 }
930 }
931 Ok(())
932 }
933}
934
935impl MeshAdminAgent {
936 async fn resolve_reference(
955 &self,
956 cx: &Context<'_, Self>,
957 reference_string: &str,
958 ) -> Result<NodePayload, anyhow::Error> {
959 if reference_string == "root" {
960 return Ok(self.build_root_payload());
961 }
962
963 if let Ok(host_id) = reference_string.parse::<HostId>() {
968 return self.resolve_host_node(cx, &host_id.0).await;
969 }
970
971 let reference: hyperactor_reference::Reference = reference_string
972 .parse()
973 .map_err(|e| anyhow::anyhow!("invalid reference '{}': {}", reference_string, e))?;
974
975 match &reference {
976 hyperactor_reference::Reference::Proc(proc_id) => {
977 match self.resolve_proc_node(cx, proc_id).await {
981 Ok(payload) => Ok(payload),
982 Err(_) if self.standalone_proc_anchor(proc_id).is_some() => {
983 self.resolve_standalone_proc_node(cx, proc_id).await
984 }
985 Err(e) => Err(e),
986 }
987 }
988 hyperactor_reference::Reference::Actor(actor_id) => {
989 self.resolve_actor_node(cx, actor_id).await
990 }
991 _ => Err(anyhow::anyhow!(
992 "unsupported reference type: {}",
993 reference_string
994 )),
995 }
996 }
997
    /// Iterates over the actors treated as living on standalone procs.
    ///
    /// Currently yields nothing, so `standalone_proc_anchor` always returns
    /// `None` and `is_standalone_proc_actor` always returns `false`.
    fn standalone_proc_actors(&self) -> impl Iterator<Item = &hyperactor_reference::ActorId> {
        std::iter::empty()
    }
1008
1009 fn standalone_proc_anchor(
1012 &self,
1013 proc_id: &hyperactor_reference::ProcId,
1014 ) -> Option<&hyperactor_reference::ActorId> {
1015 self.standalone_proc_actors()
1016 .find(|actor_id| *actor_id.proc_id() == *proc_id)
1017 }
1018
1019 fn is_standalone_proc_actor(&self, actor_id: &hyperactor_reference::ActorId) -> bool {
1021 self.standalone_proc_actors()
1022 .any(|a| *a.proc_id() == *actor_id.proc_id())
1023 }
1024
1025 fn build_root_payload(&self) -> NodePayload {
1032 let children: Vec<String> = self
1033 .hosts
1034 .values()
1035 .map(|agent| HostId(agent.actor_id().clone()).to_string())
1036 .collect();
1037 let system_children: Vec<String> = Vec::new();
1038 let mut attrs = hyperactor_config::Attrs::new();
1039 attrs.set(crate::introspect::NODE_TYPE, "root".to_string());
1040 attrs.set(crate::introspect::NUM_HOSTS, self.hosts.len());
1041 if let Ok(t) = humantime::parse_rfc3339(&self.started_at) {
1042 attrs.set(crate::introspect::STARTED_AT, t);
1043 }
1044 attrs.set(crate::introspect::STARTED_BY, self.started_by.clone());
1045 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
1046 let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
1047 NodePayload {
1048 identity: "root".to_string(),
1049 properties: crate::introspect::derive_properties(&attrs_json),
1050 children,
1051 parent: None,
1052 as_of: hyperactor::introspect::format_timestamp(std::time::SystemTime::now()),
1053 }
1054 }
1055
1056 async fn resolve_host_node(
1065 &self,
1066 cx: &Context<'_, Self>,
1067 actor_id: &hyperactor_reference::ActorId,
1068 ) -> Result<NodePayload, anyhow::Error> {
1069 let result = query_introspect(
1070 cx,
1071 actor_id,
1072 hyperactor::introspect::IntrospectView::Entity,
1073 SINGLE_HOST_TIMEOUT,
1074 "querying host agent",
1075 )
1076 .await?;
1077 Ok(crate::introspect::to_node_payload_with(
1078 result,
1079 HostId(actor_id.clone()).to_string(),
1080 Some("root".to_string()),
1081 ))
1082 }
1083
1084 async fn resolve_proc_node(
1095 &self,
1096 cx: &Context<'_, Self>,
1097 proc_id: &hyperactor_reference::ProcId,
1098 ) -> Result<NodePayload, anyhow::Error> {
1099 let host_addr = proc_id.addr().to_string();
1100
1101 let agent = self
1102 .hosts
1103 .get(&host_addr)
1104 .ok_or_else(|| anyhow::anyhow!("host not found: {}", host_addr))?;
1105
1106 let result = query_child_introspect(
1108 cx,
1109 agent.actor_id(),
1110 hyperactor_reference::Reference::Proc(proc_id.clone()),
1111 QUERY_CHILD_TIMEOUT,
1112 "querying proc details",
1113 )
1114 .await?;
1115
1116 let payload = to_node_payload(result);
1119 if !matches!(payload.properties, NodeProperties::Error { .. }) {
1120 return Ok(payload);
1121 }
1122
1123 let mesh_agent_id = proc_id.actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0);
1125 let result = query_child_introspect(
1126 cx,
1127 &mesh_agent_id,
1128 hyperactor_reference::Reference::Proc(proc_id.clone()),
1129 resolve_actor_timeout(),
1130 "querying proc mesh agent",
1131 )
1132 .await?;
1133
1134 Ok(crate::introspect::to_node_payload_with(
1135 result,
1136 proc_id.to_string(),
1137 Some(HostId(agent.actor_id().clone()).to_string()),
1138 ))
1139 }
1140
    /// Synthesizes a proc node for a standalone proc by introspecting its
    /// anchor actor.
    ///
    /// The children are the anchor actor followed by the anchor's own
    /// children; `system_children` is the subset of those that report
    /// `is_system: true`. When the anchor is this agent itself, no query is
    /// made and the agent is listed as the only (system) child. The resulting
    /// node's parent is `"root"`.
    ///
    /// # Errors
    ///
    /// Fails if no anchor actor exists for `proc_id` or if the anchor query
    /// fails. Failures while probing individual children are tolerated: such
    /// a child is still listed, just not as a system child.
    async fn resolve_standalone_proc_node(
        &self,
        cx: &Context<'_, Self>,
        proc_id: &hyperactor_reference::ProcId,
    ) -> Result<NodePayload, anyhow::Error> {
        let actor_id = self
            .standalone_proc_anchor(proc_id)
            .ok_or_else(|| anyhow::anyhow!("no anchor actor for standalone proc {}", proc_id))?;

        let (children, system_children) = if self.self_actor_id.as_ref() == Some(actor_id) {
            // The anchor is this agent: answer without sending a query to
            // ourselves, listing the agent as both child and system child.
            let self_ref = actor_id.to_string();
            (vec![self_ref.clone()], vec![self_ref])
        } else {
            let actor_result = query_introspect(
                cx,
                actor_id,
                hyperactor::introspect::IntrospectView::Actor,
                SINGLE_HOST_TIMEOUT,
                &format!("querying anchor actor on {}", proc_id),
            )
            .await?;
            let actor_payload = to_node_payload(actor_result);
            let anchor_ref = actor_id.to_string();
            let anchor_is_system = matches!(
                &actor_payload.properties,
                NodeProperties::Actor {
                    is_system: true,
                    ..
                }
            );

            // The anchor always comes first in the child list.
            let mut children = vec![anchor_ref.clone()];
            let mut system_children = Vec::new();
            if anchor_is_system {
                system_children.push(anchor_ref);
            }

            for child_ref in actor_payload.children {
                // Children whose reference does not parse as an actor id are
                // listed without being probed.
                if let Ok(child_actor_id) = child_ref.parse::<hyperactor_reference::ActorId>() {
                    // One query per child, issued sequentially. A failed
                    // query counts as "not a system actor".
                    let child_is_system = if let Ok(r) = query_introspect(
                        cx,
                        &child_actor_id,
                        hyperactor::introspect::IntrospectView::Actor,
                        resolve_actor_timeout(),
                        "querying child actor is_system",
                    )
                    .await
                    {
                        let p = to_node_payload(r);
                        matches!(
                            &p.properties,
                            NodeProperties::Actor {
                                is_system: true,
                                ..
                            }
                        )
                    } else {
                        false
                    };
                    if child_is_system {
                        system_children.push(child_ref.clone());
                    }
                }
                children.push(child_ref);
            }
            (children, system_children)
        };

        let proc_name = proc_id.name().to_string();

        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
        attrs.set(crate::introspect::PROC_NAME, proc_name.clone());
        attrs.set(crate::introspect::NUM_ACTORS, children.len());
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children.clone());
        // Fall back to an empty JSON object if the attrs cannot be serialized.
        let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

        Ok(NodePayload {
            identity: proc_id.to_string(),
            properties: crate::introspect::derive_properties(&attrs_json),
            children,
            as_of: hyperactor::introspect::format_timestamp(std::time::SystemTime::now()),
            parent: Some("root".to_string()),
        })
    }
1246
    /// Resolves an actor reference to a `NodePayload`.
    ///
    /// Source of the introspection data, in order of precedence:
    /// 1. this agent itself: taken from `cx.introspect_payload()` without
    ///    sending a message;
    /// 2. an actor on a standalone proc: queried directly;
    /// 3. any other actor: the proc's agent is first asked (short timeout) for
    ///    a snapshot of the actor via `QueryChild`; if that fails or yields an
    ///    error payload, the actor is queried directly.
    ///
    /// The payload's parent is then set to the actor's proc, except that a
    /// payload with `Proc` properties is parented to its host when the host is
    /// known.
    ///
    /// # Errors
    ///
    /// Propagates failures of the direct actor queries; a failed snapshot
    /// probe is not an error by itself.
    async fn resolve_actor_node(
        &self,
        cx: &Context<'_, Self>,
        actor_id: &hyperactor_reference::ActorId,
    ) -> Result<NodePayload, anyhow::Error> {
        let result = if self.self_actor_id.as_ref() == Some(actor_id) {
            // Introspecting ourselves: read the payload from the context
            // instead of sending a query to our own mailbox.
            cx.introspect_payload()
        } else if self.is_standalone_proc_actor(actor_id) {
            query_introspect(
                cx,
                actor_id,
                hyperactor::introspect::IntrospectView::Actor,
                SINGLE_HOST_TIMEOUT,
                &format!("querying actor {}", actor_id),
            )
            .await?
        } else {
            let proc_id = actor_id.proc_id();
            let mesh_agent_id = proc_id.actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0);
            // Probe the proc agent first. Any failure, or an answer whose
            // derived properties are an error, is treated as "no snapshot".
            let terminated = query_child_introspect(
                cx,
                &mesh_agent_id,
                hyperactor_reference::Reference::Actor(actor_id.clone()),
                QUERY_CHILD_TIMEOUT,
                "querying terminated snapshot",
            )
            .await
            .ok()
            .filter(|r| {
                let p = crate::introspect::derive_properties(&r.attrs);
                !matches!(p, NodeProperties::Error { .. })
            });

            match terminated {
                Some(snapshot) => snapshot,
                None => {
                    // No usable snapshot: ask the actor itself.
                    query_introspect(
                        cx,
                        actor_id,
                        hyperactor::introspect::IntrospectView::Actor,
                        resolve_actor_timeout(),
                        &format!("querying actor {}", actor_id),
                    )
                    .await?
                }
            }
        };
        let mut payload = to_node_payload(result);

        if self.is_standalone_proc_actor(actor_id) {
            // Standalone-proc actors are always parented to their proc.
            payload.parent = Some(actor_id.proc_id().to_string());
            return Ok(payload);
        }

        let proc_id = actor_id.proc_id();
        match &payload.properties {
            NodeProperties::Proc { .. } => {
                // The answer describes a proc, so its parent is the host.
                // If the host is unknown the parent is left as returned by
                // `to_node_payload`.
                let host_addr = proc_id.addr().to_string();
                if let Some(agent) = self.hosts.get(&host_addr) {
                    payload.parent = Some(HostId(agent.actor_id().clone()).to_string());
                }
            }
            _ => {
                payload.parent = Some(proc_id.to_string());
            }
        }

        Ok(payload)
    }
1348}
1349
1350fn create_mesh_admin_router(bridge_state: Arc<BridgeState>) -> Router {
1360 Router::new()
1361 .route("/SKILL.md", get(serve_skill_md))
1362 .route("/v1/schema", get(serve_schema))
1364 .route("/v1/schema/error", get(serve_error_schema))
1365 .route("/v1/openapi.json", get(serve_openapi))
1366 .route("/v1/tree", get(tree_dump))
1367 .route("/v1/{*reference}", get(resolve_reference_bridge))
1368 .with_state(bridge_state)
1369}
1370
/// Contents of `mesh_admin_skill.md`, embedded at compile time.
/// `serve_skill_md` substitutes the `{base}` placeholder with the request's
/// base URL before serving it.
const SKILL_MD_TEMPLATE: &str = include_str!("mesh_admin_skill.md");
1373
1374fn extract_base_url(headers: &axum::http::HeaderMap) -> String {
1380 let host = headers
1381 .get(axum::http::header::HOST)
1382 .and_then(|v| v.to_str().ok())
1383 .unwrap_or("localhost");
1384 let scheme = headers
1385 .get("x-forwarded-proto")
1386 .and_then(|v| v.to_str().ok())
1387 .unwrap_or("https");
1388 format!("{scheme}://{host}")
1389}
1390
1391async fn serve_skill_md(headers: axum::http::HeaderMap) -> impl axum::response::IntoResponse {
1394 let base = extract_base_url(&headers);
1395 let body = SKILL_MD_TEMPLATE.replace("{base}", &base);
1396 (
1397 [(
1398 axum::http::header::CONTENT_TYPE,
1399 "text/markdown; charset=utf-8",
1400 )],
1401 body,
1402 )
1403}
1404
1405fn schema_with_id<T: schemars::JsonSchema>(id: &str) -> Result<serde_json::Value, ApiError> {
1407 let schema = schemars::schema_for!(T);
1408 let mut value = serde_json::to_value(schema).map_err(|e| ApiError {
1409 code: "internal_error".to_string(),
1410 message: format!("failed to serialize schema: {e}"),
1411 details: None,
1412 })?;
1413 if let Some(obj) = value.as_object_mut() {
1414 obj.insert("$id".into(), serde_json::Value::String(id.into()));
1415 }
1416 Ok(value)
1417}
1418
1419async fn serve_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1421 Ok(axum::response::Json(schema_with_id::<
1422 crate::introspect::NodePayload,
1423 >(
1424 "https://monarch.meta.com/schemas/v1/node_payload",
1425 )?))
1426}
1427
1428async fn serve_error_schema() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1430 Ok(axum::response::Json(schema_with_id::<ApiErrorEnvelope>(
1431 "https://monarch.meta.com/schemas/v1/error",
1432 )?))
1433}
1434
1435fn hoist_defs(
1439 schema: &mut serde_json::Value,
1440 shared: &mut serde_json::Map<String, serde_json::Value>,
1441) {
1442 if let Some(obj) = schema.as_object_mut() {
1443 if let Some(defs) = obj.remove("$defs") {
1444 if let Some(defs_map) = defs.as_object() {
1445 for (k, v) in defs_map {
1446 shared.insert(k.clone(), v.clone());
1447 }
1448 }
1449 }
1450 obj.remove("$schema");
1454 }
1455 rewrite_refs(schema);
1456}
1457
1458fn rewrite_refs(value: &mut serde_json::Value) {
1461 match value {
1462 serde_json::Value::Object(map) => {
1463 if let Some(serde_json::Value::String(r)) = map.get_mut("$ref") {
1464 if r.starts_with("#/$defs/") {
1465 *r = r.replace("#/$defs/", "#/components/schemas/");
1466 }
1467 }
1468 for v in map.values_mut() {
1469 rewrite_refs(v);
1470 }
1471 }
1472 serde_json::Value::Array(arr) => {
1473 for v in arr {
1474 rewrite_refs(v);
1475 }
1476 }
1477 _ => {}
1478 }
1479}
1480
/// Builds the OpenAPI 3.1 document describing the admin HTTP API.
///
/// The `NodePayload` and `ApiErrorEnvelope` JSON Schemas are generated with
/// `schemars`, their `$defs` are hoisted into `components.schemas`, and the
/// path entries reference them by `#/components/schemas/...`.
///
/// # Panics
///
/// Panics if either generated schema cannot be converted to a JSON value.
pub fn build_openapi_spec() -> serde_json::Value {
    let mut node_schema =
        serde_json::to_value(schemars::schema_for!(crate::introspect::NodePayload))
            .expect("NodePayload schema must be serializable");
    let mut error_schema = serde_json::to_value(schemars::schema_for!(ApiErrorEnvelope))
        .expect("ApiErrorEnvelope schema must be serializable");

    // Collect nested definitions from both schemas into one shared map, then
    // add the two top-level schemas themselves under their type names.
    let mut shared_schemas = serde_json::Map::new();
    hoist_defs(&mut node_schema, &mut shared_schemas);
    hoist_defs(&mut error_schema, &mut shared_schemas);
    shared_schemas.insert("NodePayload".into(), node_schema);
    shared_schemas.insert("ApiErrorEnvelope".into(), error_schema);

    // Response object for an error status: a description plus the shared
    // error-envelope schema.
    let error_response = |desc: &str| -> serde_json::Value {
        serde_json::json!({
            "description": desc,
            "content": {
                "application/json": {
                    "schema": { "$ref": "#/components/schemas/ApiErrorEnvelope" }
                }
            }
        })
    };

    // Response object shared by the endpoints that return a `NodePayload`.
    let success_payload = serde_json::json!({
        "description": "Resolved NodePayload",
        "content": {
            "application/json": {
                "schema": { "$ref": "#/components/schemas/NodePayload" }
            }
        }
    });

    serde_json::json!({
        "openapi": "3.1.0",
        "info": {
            "title": "Monarch Mesh Admin API",
            "version": "1.0.0",
            "description": "Reference-walking introspection API for a Monarch actor mesh. See the Admin Gateway Pattern RFC."
        },
        "paths": {
            "/v1/root": {
                "get": {
                    "summary": "Fetch root node",
                    "operationId": "getRoot",
                    "responses": {
                        "200": success_payload,
                        "500": error_response("Internal error"),
                        "503": error_response("Service unavailable (at capacity, retry with backoff)"),
                        "504": error_response("Gateway timeout (downstream host unresponsive)")
                    }
                }
            },
            "/v1/{reference}": {
                "get": {
                    "summary": "Resolve a reference to a NodePayload",
                    "operationId": "resolveReference",
                    "parameters": [{
                        "name": "reference",
                        "in": "path",
                        "required": true,
                        "description": "URL-encoded opaque reference string",
                        "schema": { "type": "string" }
                    }],
                    "responses": {
                        "200": success_payload,
                        "400": error_response("Bad request (malformed reference)"),
                        "404": error_response("Reference not found"),
                        "500": error_response("Internal error"),
                        "503": error_response("Service unavailable (at capacity, retry with backoff)"),
                        "504": error_response("Gateway timeout (downstream host unresponsive)")
                    }
                }
            },
            "/v1/schema": {
                "get": {
                    "summary": "JSON Schema for NodePayload (Draft 2020-12)",
                    "operationId": "getSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            },
            "/v1/schema/error": {
                "get": {
                    "summary": "JSON Schema for ApiErrorEnvelope (Draft 2020-12)",
                    "operationId": "getErrorSchema",
                    "responses": {
                        "200": {
                            "description": "JSON Schema document",
                            "content": { "application/json": {} }
                        }
                    }
                }
            },
            "/v1/tree": {
                "get": {
                    "summary": "ASCII topology dump (debug)",
                    "operationId": "getTree",
                    "responses": {
                        "200": {
                            "description": "Human-readable topology tree",
                            "content": { "text/plain": {} }
                        }
                    }
                }
            }
        },
        "components": {
            "schemas": serde_json::Value::Object(shared_schemas)
        }
    })
}
1601
1602async fn serve_openapi() -> Result<axum::response::Json<serde_json::Value>, ApiError> {
1604 Ok(axum::response::Json(build_openapi_spec()))
1605}
1606
1607async fn resolve_reference_bridge(
1620 State(state): State<Arc<BridgeState>>,
1621 AxumPath(reference): AxumPath<String>,
1622) -> Result<Json<NodePayload>, ApiError> {
1623 let reference = reference.trim_start_matches('/');
1625 if reference.is_empty() {
1626 return Err(ApiError::bad_request("empty reference", None));
1627 }
1628 let reference = urlencoding::decode(reference)
1629 .map(|cow| cow.into_owned())
1630 .map_err(|_| {
1631 ApiError::bad_request(
1632 "malformed percent-encoding: decoded bytes are not valid UTF-8",
1633 None,
1634 )
1635 })?;
1636
1637 let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
1640 tracing::warn!("mesh admin: rejecting resolve request (503): too many concurrent requests");
1641 ApiError {
1642 code: "service_unavailable".to_string(),
1643 message: "too many concurrent introspection requests".to_string(),
1644 details: None,
1645 }
1646 })?;
1647
1648 let cx = &state.bridge_cx;
1649 let resolve_start = std::time::Instant::now();
1650 let response = tokio::time::timeout(
1651 SINGLE_HOST_TIMEOUT,
1652 state.admin_ref.resolve(cx, reference.clone()),
1653 )
1654 .await
1655 .map_err(|_| {
1656 tracing::warn!(
1657 reference = %reference,
1658 elapsed_ms = resolve_start.elapsed().as_millis() as u64,
1659 "mesh admin: resolve timed out (gateway_timeout)",
1660 );
1661 ApiError {
1662 code: "gateway_timeout".to_string(),
1663 message: "timed out resolving reference".to_string(),
1664 details: None,
1665 }
1666 })?
1667 .map_err(|e| ApiError {
1668 code: "internal_error".to_string(),
1669 message: format!("failed to resolve reference: {}", e),
1670 details: None,
1671 })?;
1672
1673 match response.0 {
1674 Ok(payload) => Ok(Json(payload)),
1675 Err(error) => Err(ApiError::not_found(error, None)),
1676 }
1677}
1678
/// Deadline (10 s) applied to each individual resolve call that `tree_dump`
/// makes; it bounds one call, not the request as a whole.
const TREE_TIMEOUT: Duration = Duration::from_secs(10);
1682
1683async fn tree_dump(
1704 State(state): State<Arc<BridgeState>>,
1705 headers: axum::http::header::HeaderMap,
1706) -> Result<String, ApiError> {
1707 let _permit = state.resolve_semaphore.try_acquire().map_err(|_| {
1709 tracing::warn!(
1710 "mesh admin: rejecting tree_dump request (503): too many concurrent requests"
1711 );
1712 ApiError {
1713 code: "service_unavailable".to_string(),
1714 message: "too many concurrent introspection requests".to_string(),
1715 details: None,
1716 }
1717 })?;
1718
1719 let cx = &state.bridge_cx;
1720
1721 let host = headers
1723 .get("host")
1724 .and_then(|v| v.to_str().ok())
1725 .unwrap_or("localhost");
1726 let scheme = headers
1727 .get("x-forwarded-proto")
1728 .and_then(|v| v.to_str().ok())
1729 .unwrap_or("http");
1730 let base_url = format!("{}://{}", scheme, host);
1731
1732 let root_resp = tokio::time::timeout(
1734 TREE_TIMEOUT,
1735 state.admin_ref.resolve(cx, "root".to_string()),
1736 )
1737 .await
1738 .map_err(|_| ApiError {
1739 code: "gateway_timeout".to_string(),
1740 message: "timed out resolving root".to_string(),
1741 details: None,
1742 })?
1743 .map_err(|e| ApiError {
1744 code: "internal_error".to_string(),
1745 message: format!("failed to resolve root: {}", e),
1746 details: None,
1747 })?;
1748
1749 let root = root_resp.0.map_err(|e| ApiError {
1750 code: "internal_error".to_string(),
1751 message: e,
1752 details: None,
1753 })?;
1754
1755 let mut output = String::new();
1756
1757 for child_ref in &root.children {
1761 let resp =
1762 tokio::time::timeout(TREE_TIMEOUT, state.admin_ref.resolve(cx, child_ref.clone()))
1763 .await;
1764
1765 let payload = match resp {
1766 Ok(Ok(r)) => r.0.ok(),
1767 _ => None,
1768 };
1769
1770 match payload {
1771 Some(node) if matches!(node.properties, NodeProperties::Host { .. }) => {
1772 let header = match &node.properties {
1774 NodeProperties::Host { addr, .. } => addr.clone(),
1775 _ => child_ref.clone(),
1776 };
1777 let host_url = format!("{}/v1/{}", base_url, urlencoding::encode(child_ref));
1778 output.push_str(&format!("{} -> {}\n", header, host_url));
1779
1780 let num_procs = node.children.len();
1782 for (i, proc_ref) in node.children.iter().enumerate() {
1783 let is_last_proc = i == num_procs - 1;
1784 let proc_connector = if is_last_proc {
1785 "└── "
1786 } else {
1787 "├── "
1788 };
1789 let proc_name = derive_tree_label(proc_ref);
1790 let proc_url = format!("{}/v1/{}", base_url, urlencoding::encode(proc_ref));
1791 output.push_str(&format!(
1792 "{}{} -> {}\n",
1793 proc_connector, proc_name, proc_url
1794 ));
1795
1796 let proc_resp = tokio::time::timeout(
1798 TREE_TIMEOUT,
1799 state.admin_ref.resolve(cx, proc_ref.clone()),
1800 )
1801 .await;
1802 let proc_payload = match proc_resp {
1803 Ok(Ok(r)) => r.0.ok(),
1804 _ => None,
1805 };
1806 if let Some(proc_node) = proc_payload {
1807 let num_actors = proc_node.children.len();
1808 let child_prefix = if is_last_proc { " " } else { "│ " };
1809 for (j, actor_ref) in proc_node.children.iter().enumerate() {
1810 let actor_connector = if j == num_actors - 1 {
1811 "└── "
1812 } else {
1813 "├── "
1814 };
1815 let actor_label = derive_actor_label(actor_ref);
1816 let actor_url =
1817 format!("{}/v1/{}", base_url, urlencoding::encode(actor_ref));
1818 output.push_str(&format!(
1819 "{}{}{} -> {}\n",
1820 child_prefix, actor_connector, actor_label, actor_url
1821 ));
1822 }
1823 }
1824 }
1825 output.push('\n');
1826 }
1827 Some(node) if matches!(node.properties, NodeProperties::Proc { .. }) => {
1828 let proc_name = match &node.properties {
1831 NodeProperties::Proc { proc_name, .. } => proc_name.clone(),
1832 _ => child_ref.clone(),
1833 };
1834 let proc_url = format!("{}/v1/{}", base_url, urlencoding::encode(child_ref));
1835 output.push_str(&format!("{} -> {}\n", proc_name, proc_url));
1836
1837 let num_actors = node.children.len();
1838 for (j, actor_ref) in node.children.iter().enumerate() {
1839 let actor_connector = if j == num_actors - 1 {
1840 "└── "
1841 } else {
1842 "├── "
1843 };
1844 let actor_label = derive_actor_label(actor_ref);
1845 let actor_url = format!("{}/v1/{}", base_url, urlencoding::encode(actor_ref));
1846 output.push_str(&format!(
1847 "{}{} -> {}\n",
1848 actor_connector, actor_label, actor_url
1849 ));
1850 }
1851 output.push('\n');
1852 }
1853 Some(_node) => {
1854 let label = derive_actor_label(child_ref);
1857 let url = format!("{}/v1/{}", base_url, urlencoding::encode(child_ref));
1858 output.push_str(&format!("{} -> {}\n\n", label, url));
1859 }
1860 _ => {
1861 output.push_str(&format!("{} (unreachable)\n\n", child_ref));
1862 }
1863 }
1864 }
1865 Ok(output)
1866}
1867
/// Label used for proc lines in the tree dump.
///
/// Returns the second comma-separated field of `reference`, or the whole
/// reference unchanged when it contains no comma.
fn derive_tree_label(reference: &str) -> String {
    reference
        .splitn(3, ',')
        .nth(1)
        .unwrap_or(reference)
        .to_string()
}
1896
/// Label used for actor lines in the tree dump.
///
/// Splits `reference` on `,` into at most three fields and returns the last
/// field after the first: the third field when there are three (it keeps any
/// further commas), the second when there are two, and the whole reference
/// when there is no comma at all.
fn derive_actor_label(reference: &str) -> String {
    let mut fields = reference.splitn(3, ',');
    let _first = fields.next();
    let label = match (fields.next(), fields.next()) {
        (_, Some(third)) => third,
        (Some(second), None) => second,
        (None, None) => reference,
    };
    label.to_string()
}
1912
/// Top-level object deserialized from the JSON that `try_resolve_mast_handle`
/// obtains by running `<cmd> get-status --json <job>`. Only the
/// `latestAttempt` key is declared here.
#[derive(serde::Deserialize)]
struct MastStatusResponse {
    /// Value of the `latestAttempt` JSON key.
    #[serde(rename = "latestAttempt")]
    latest_attempt: MastAttempt,
}
1923
/// The `latestAttempt` object of a MAST status response.
#[derive(serde::Deserialize)]
struct MastAttempt {
    /// Value of the `taskGroupExecutionAttempts` JSON key: a map from a
    /// string key to a list of task-group entries. `head_hostname` only looks
    /// at the last entry of each list.
    #[serde(rename = "taskGroupExecutionAttempts")]
    task_groups: std::collections::HashMap<String, Vec<MastTaskGroup>>,
}
1930
/// One task-group entry inside `taskGroupExecutionAttempts`.
#[derive(serde::Deserialize)]
struct MastTaskGroup {
    /// Value of the `taskExecutionAttempts` JSON key: a map from a string key
    /// to a list of task attempts. `head_hostname` parses the key as an `i64`
    /// index and only looks at the last attempt in each list.
    #[serde(rename = "taskExecutionAttempts")]
    tasks: std::collections::HashMap<String, Vec<MastTaskAttempt>>,
}
1937
/// One task attempt inside `taskExecutionAttempts`.
#[derive(serde::Deserialize)]
struct MastTaskAttempt {
    /// Hostname reported for this attempt, if any. Attempts without one are
    /// skipped by `head_hostname`.
    hostname: Option<String>,
}
1943
1944fn head_hostname(response: &MastStatusResponse) -> Result<String, String> {
1952 let mut hosts: Vec<(i64, String)> = Vec::new();
1953 for attempts in response.latest_attempt.task_groups.values() {
1954 let group = match attempts.last() {
1955 Some(g) => g,
1956 None => continue,
1957 };
1958 for (index_str, task_attempts) in &group.tasks {
1959 let attempt = match task_attempts.last() {
1960 Some(a) => a,
1961 None => continue,
1962 };
1963 if let Some(ref hostname) = attempt.hostname {
1964 let index = index_str.parse::<i64>().unwrap_or(i64::MAX);
1965 hosts.push((index, hostname.clone()));
1966 }
1967 }
1968 }
1969 hosts.sort_by_key(|(idx, _)| *idx);
1970 hosts
1971 .into_iter()
1972 .next()
1973 .map(|(_, h)| h)
1974 .ok_or_else(|| "no hostnames found in MAST response".to_string())
1975}
1976
1977async fn qualify_fqdn(hostname: &str) -> String {
1984 if hostname.contains('.') {
1985 return hostname.to_string();
1986 }
1987 let owned = hostname.to_string();
1988 let fallback = owned.clone();
1989 tokio::task::spawn_blocking(move || qualify_fqdn_blocking(&owned))
1990 .await
1991 .unwrap_or(fallback)
1992}
1993
/// Looks up the canonical name of `hostname` with `getaddrinfo` and the
/// `AI_CANONNAME` flag.
///
/// This call blocks; `qualify_fqdn` runs it via `spawn_blocking`.
///
/// Never fails: `hostname` is returned unchanged when it contains an interior
/// NUL, when `getaddrinfo` reports an error or yields no result, when the
/// first result carries no canonical name, or when that name is not valid
/// UTF-8.
fn qualify_fqdn_blocking(hostname: &str) -> String {
    use std::ffi::CStr;
    use std::ffi::CString;
    use std::ptr;

    // `CString::new` rejects interior NUL bytes; such a name cannot be passed
    // to the C API, so fall back to the input.
    let c_host = match CString::new(hostname) {
        Ok(c) => c,
        Err(_) => return hostname.to_string(),
    };
    // SAFETY: `addrinfo` is a plain C struct of integers and raw pointers, so
    // the all-zero bit pattern is a valid value (zero ints, null pointers).
    let mut hints: libc::addrinfo = unsafe { std::mem::zeroed() };
    hints.ai_flags = libc::AI_CANONNAME;
    hints.ai_family = libc::AF_UNSPEC;

    let mut result: *mut libc::addrinfo = ptr::null_mut();
    // SAFETY: `c_host` is a NUL-terminated string that outlives the call, the
    // service argument is allowed to be null, and `hints` / `result` point to
    // live locals of the expected types.
    let rc = unsafe { libc::getaddrinfo(c_host.as_ptr(), ptr::null(), &hints, &mut result) };
    if rc != 0 || result.is_null() {
        return hostname.to_string();
    }

    // Frees the result list on every exit path below.
    struct AddrInfoGuard(*mut libc::addrinfo);
    impl Drop for AddrInfoGuard {
        fn drop(&mut self) {
            // SAFETY: the pointer came from a successful `getaddrinfo` call
            // (checked non-null before the guard is built) and is freed only
            // here, once.
            unsafe { libc::freeaddrinfo(self.0) }
        }
    }
    let _guard = AddrInfoGuard(result);

    // SAFETY: `result` was checked to be non-null above and points to the
    // first `addrinfo` of the list, which stays allocated while `_guard` lives.
    let canon = unsafe { (*result).ai_canonname };
    if canon.is_null() {
        return hostname.to_string();
    }
    // SAFETY: `canon` is non-null and, per the `getaddrinfo` contract, points
    // to a NUL-terminated string owned by the result list. The bytes are
    // copied into an owned `String` before `_guard` frees the list.
    unsafe { CStr::from_ptr(canon) }
        .to_str()
        .unwrap_or(hostname)
        .to_string()
}
2039
2040fn resolve_admin_port(port_override: Option<u16>) -> Result<u16, anyhow::Error> {
2047 match port_override {
2048 Some(p) => Ok(p),
2049 None => {
2050 let config_addr = hyperactor_config::global::get_cloned(crate::config::MESH_ADMIN_ADDR);
2051 Ok(config_addr
2052 .parse_socket_addr()
2053 .map_err(|e| anyhow::anyhow!("invalid MESH_ADMIN_ADDR config: {}", e))?
2054 .port())
2055 }
2056 }
2057}
2058
2059async fn try_resolve_mast_handle(
2068 handle: &str,
2069 port_override: Option<u16>,
2070 cmd: &str,
2071) -> anyhow::Result<String> {
2072 let port = resolve_admin_port(port_override)?;
2073 let job_name = handle
2074 .strip_prefix("mast_conda:///")
2075 .ok_or_else(|| anyhow::anyhow!("expected mast_conda:/// prefix, got '{}'", handle))?;
2076
2077 let output = match tokio::process::Command::new(cmd)
2078 .args(["get-status", "--json", job_name])
2079 .output()
2080 .await
2081 {
2082 Ok(o) => o,
2083 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
2084 anyhow::bail!(
2085 "MAST CLI not found; install via `sudo feature install --persist mast_cli`"
2086 );
2087 }
2088 Err(e) => {
2089 anyhow::bail!("failed to run '{}': {}", cmd, e);
2090 }
2091 };
2092
2093 if !output.status.success() {
2094 let stderr = String::from_utf8_lossy(&output.stderr);
2095 anyhow::bail!(
2096 "'mast get-status' exited with {}: {}",
2097 output.status,
2098 stderr.trim()
2099 );
2100 }
2101
2102 let response: MastStatusResponse = serde_json::from_slice(&output.stdout)
2103 .map_err(|e| anyhow::anyhow!("failed to parse mast JSON output: {}", e))?;
2104
2105 let hostname =
2106 head_hostname(&response).map_err(|e| anyhow::anyhow!("MAST job '{}': {}", job_name, e))?;
2107
2108 let fqdn = qualify_fqdn(&hostname).await;
2109 Ok(format!("https://{}:{}", fqdn, port))
2110}
2111
/// Resolves a `mast_conda:///<job>` handle to the admin URL of the job's
/// head host by invoking the `mast` command-line tool.
///
/// `port_override`, when set, is used as the port; otherwise the port comes
/// from the `MESH_ADMIN_ADDR` configuration.
///
/// # Errors
/// Propagates every failure of `try_resolve_mast_handle`.
pub async fn resolve_mast_handle(
    handle: &str,
    port_override: Option<u16>,
) -> anyhow::Result<String> {
    // The command name is a parameter of the inner function; this public
    // entry point fixes it to "mast".
    try_resolve_mast_handle(handle, port_override, "mast").await
}
2124
2125#[cfg(test)]
2126mod tests {
2127 use std::net::SocketAddr;
2128
2129 use hyperactor::channel::ChannelAddr;
2130 use hyperactor::testing::ids::test_proc_id_with_addr;
2131
2132 use super::*;
2133
    /// Minimal actor for tests: exported with an empty handler list and using
    /// the default `Actor` implementation.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [])]
    struct TestIntrospectableActor;
    impl Actor for TestIntrospectableActor {}
2148
    /// `build_root_payload` for an agent constructed with two hosts: the
    /// payload is the parentless `"root"` node, reports `num_hosts == 2`,
    /// lists both hosts as `HostId` strings, and has a non-empty `started_by`.
    #[test]
    fn test_build_root_payload() {
        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        let proc1 = test_proc_id_with_addr(ChannelAddr::Tcp(addr1), "host1");
        let proc2 = test_proc_id_with_addr(ChannelAddr::Tcp(addr2), "host2");

        let actor_id1 = proc1.actor_id("mesh_agent", 0);
        let actor_id2 = proc2.actor_id("mesh_agent", 0);

        // The refs are attested from ids only; no actor is spawned in this test.
        let ref1: hyperactor_reference::ActorRef<HostAgent> =
            hyperactor_reference::ActorRef::attest(actor_id1.clone());
        let ref2: hyperactor_reference::ActorRef<HostAgent> =
            hyperactor_reference::ActorRef::attest(actor_id2.clone());

        let agent = MeshAdminAgent::new(
            vec![("host_a".to_string(), ref1), ("host_b".to_string(), ref2)],
            None,
            None,
        );

        let payload = agent.build_root_payload();
        assert_eq!(payload.identity, "root");
        assert_eq!(payload.parent, None);
        assert!(matches!(
            payload.properties,
            NodeProperties::Root { num_hosts: 2, .. }
        ));
        assert_eq!(payload.children.len(), 2);
        // Children are expected in `HostId` string form.
        assert!(
            payload
                .children
                .contains(&HostId(actor_id1.clone()).to_string())
        );
        assert!(
            payload
                .children
                .contains(&HostId(actor_id2.clone()).to_string())
        );

        // Destructure once more to reach `started_by`, which the `matches!`
        // assertion above does not inspect.
        match &payload.properties {
            NodeProperties::Root {
                num_hosts,
                started_by,
                ..
            } => {
                assert_eq!(*num_hosts, 2);
                assert!(!started_by.is_empty());
            }
            other => panic!("expected Root, got {:?}", other),
        }
    }
2208
2209 #[tokio::test]
2215 async fn test_resolve_reference_tree_walk() {
2216 use hyperactor::Proc;
2217 use hyperactor::channel::ChannelTransport;
2218 use hyperactor::host::Host;
2219 use hyperactor::host::LocalProcManager;
2220
2221 use crate::host_mesh::host_agent::HostAgentMode;
2222 use crate::host_mesh::host_agent::ProcManagerSpawnFn;
2223 use crate::proc_agent::ProcAgent;
2224
2225 let spawn: ProcManagerSpawnFn =
2229 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
2230 let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
2231 let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
2232 Host::new(manager, ChannelTransport::Unix.any())
2233 .await
2234 .unwrap();
2235 let host_addr = host.addr().clone();
2236 let system_proc = host.system_proc().clone();
2237 let host_agent_handle = system_proc
2238 .spawn(
2239 crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
2240 HostAgent::new(HostAgentMode::Local(host)),
2241 )
2242 .unwrap();
2243 let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
2244 let host_addr_str = host_addr.to_string();
2245
2246 let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
2248 use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
2251 let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
2252 let admin_handle = admin_proc
2253 .spawn(
2254 MESH_ADMIN_ACTOR_NAME,
2255 MeshAdminAgent::new(
2256 vec![(host_addr_str.clone(), host_agent_ref.clone())],
2257 None,
2258 Some("[::]:0".parse().unwrap()),
2259 ),
2260 )
2261 .unwrap();
2262 let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
2263
2264 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
2268 let (client, _handle) = client_proc.instance("client").unwrap();
2269
2270 let root_resp = admin_ref
2272 .resolve(&client, "root".to_string())
2273 .await
2274 .unwrap();
2275 let root = root_resp.0.unwrap();
2276 assert_eq!(root.identity, "root");
2277 assert!(matches!(
2278 root.properties,
2279 NodeProperties::Root { num_hosts: 1, .. }
2280 ));
2281 assert_eq!(root.parent, None);
2282 assert_eq!(root.children.len(), 1); let _host_agent_id_str = host_agent_ref.actor_id().to_string();
2286 let host_ref_str = HostId(host_agent_ref.actor_id().clone()).to_string();
2287 let host_child_ref_str = root
2288 .children
2289 .iter()
2290 .find(|c| **c == host_ref_str)
2291 .expect("root children should contain the host agent (as host: ref)");
2292 let host_resp = admin_ref
2293 .resolve(&client, host_child_ref_str.clone())
2294 .await
2295 .unwrap();
2296 let host_node = host_resp.0.unwrap();
2297 assert_eq!(host_node.identity, *host_child_ref_str);
2298 assert!(
2299 matches!(host_node.properties, NodeProperties::Host { .. }),
2300 "expected Host properties, got {:?}",
2301 host_node.properties
2302 );
2303 assert_eq!(host_node.parent, Some("root".to_string()));
2304 assert!(
2306 !host_node.children.is_empty(),
2307 "host should have at least one proc child"
2308 );
2309
2310 let proc_ref_str = &host_node.children[0];
2314 let proc_resp = admin_ref
2315 .resolve(&client, proc_ref_str.clone())
2316 .await
2317 .unwrap();
2318 let proc_node = proc_resp.0.unwrap();
2319 assert!(
2320 matches!(proc_node.properties, NodeProperties::Proc { .. }),
2321 "expected Proc properties, got {:?}",
2322 proc_node.properties
2323 );
2324 assert_eq!(proc_node.parent, Some(host_child_ref_str.clone()));
2325 assert!(
2327 !proc_node.children.is_empty(),
2328 "proc should have at least one actor child"
2329 );
2330
2331 let host_agent_id_str = host_agent_ref.actor_id().to_string();
2341 assert!(
2342 proc_node.children.contains(&host_agent_id_str),
2343 "system proc children {:?} should contain the host agent {}",
2344 proc_node.children,
2345 host_agent_id_str
2346 );
2347
2348 let xref_resp = admin_ref
2350 .resolve(&client, host_agent_id_str.clone())
2351 .await
2352 .unwrap();
2353 let xref_node = xref_resp.0.unwrap();
2354
2355 assert!(
2358 matches!(xref_node.properties, NodeProperties::Actor { .. }),
2359 "host agent child resolved as plain actor should be Actor, got {:?}",
2360 xref_node.properties
2361 );
2362 }
2363
    /// After a user proc is created through the host agent, every proc child
    /// of the host that resolves to `NodeProperties::Proc` is classified by
    /// name, and both a non-user ("system") proc and the user proc must be
    /// found.
    #[tokio::test]
    async fn test_proc_properties_for_all_procs() {
        use std::time::Duration;

        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::Name;
        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;
        use crate::resource;
        use crate::resource::ProcSpec;
        use crate::resource::Rank;

        // Local host whose proc manager boots procs via `ProcAgent::boot_v1`.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // Captured before `host` is moved into the host agent below.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        // Installed before the admin agent is spawned; the binding keeps the
        // returned value alive until the end of the test.
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    None,
                    Some("[::]:0".parse().unwrap()),
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate client proc from which requests are sent.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Ask the host agent to create a user proc named "user_proc".
        let user_proc_name = Name::new("user_proc").unwrap();
        host_agent_ref
            .send(
                &client,
                resource::CreateOrUpdate {
                    name: user_proc_name.clone(),
                    rank: Rank::new(0),
                    spec: ProcSpec::default(),
                },
            )
            .unwrap();

        // The send above is fire-and-forget; wait a fixed two seconds before
        // inspecting the host. NOTE(review): presumably this gives the proc
        // time to boot — a fixed sleep can be flaky on slow machines.
        tokio::time::sleep(Duration::from_secs(2)).await;

        let host_ref_string = HostId(host_agent_ref.actor_id().clone()).to_string();
        let host_resp = admin_ref.resolve(&client, host_ref_string).await.unwrap();
        let host_node = host_resp.0.unwrap();

        assert!(
            host_node.children.len() >= 3,
            "expected at least 3 proc children (2 system + 1 user), got {}",
            host_node.children.len()
        );

        // Classify each child: a proc whose name contains the user proc name
        // counts as the user proc, any other proc counts as a system proc.
        let user_proc_name_str = user_proc_name.to_string();
        let mut found_system = false;
        let mut found_user = false;
        for child_ref_str in &host_node.children {
            let resp = admin_ref
                .resolve(&client, child_ref_str.clone())
                .await
                .unwrap();
            let node = resp.0.unwrap();
            if let NodeProperties::Proc { proc_name, .. } = &node.properties {
                if proc_name.contains(&user_proc_name_str) {
                    found_user = true;
                } else {
                    found_system = true;
                }
            } else {
                // Children that are not procs are ignored by this test.
            }
        }
        assert!(
            found_system,
            "should have resolved at least one system proc"
        );
        assert!(found_user, "should have resolved the user proc");
    }
2480
    /// `build_root_payload` for an agent constructed with one host and a root
    /// client actor id: the root still reports one host and lists exactly one
    /// child, the host in `HostId` string form.
    #[test]
    fn test_build_root_payload_with_root_client() {
        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let proc1 = hyperactor_reference::ProcId::with_name(ChannelAddr::Tcp(addr1), "host1");
        let actor_id1 = hyperactor_reference::ActorId::root(proc1, "mesh_agent".to_string());
        // Attested from the id only; no actor is spawned in this test.
        let ref1: hyperactor_reference::ActorRef<HostAgent> =
            hyperactor_reference::ActorRef::attest(actor_id1.clone());

        // Root client actor id on a proc named "local" at the same address.
        let client_proc_id =
            hyperactor_reference::ProcId::with_name(ChannelAddr::Tcp(addr1), "local");
        let client_actor_id = client_proc_id.actor_id("client", 0);

        let agent = MeshAdminAgent::new(
            vec![("host_a".to_string(), ref1)],
            Some(client_actor_id.clone()),
            None,
        );

        let payload = agent.build_root_payload();
        assert!(matches!(
            payload.properties,
            NodeProperties::Root { num_hosts: 1, .. }
        ));
        // Exactly one child: the root client does not add a root-level entry.
        assert_eq!(payload.children.len(), 1);
        assert!(
            payload
                .children
                .contains(&HostId(actor_id1.clone()).to_string())
        );
    }
2515
    /// With a root client actor spawned on the host's local proc and passed
    /// to `MeshAdminAgent::new`, the actor is reachable through the normal
    /// tree: root → host → local proc → client actor, and the actor's parent
    /// is the local proc.
    #[tokio::test]
    async fn test_resolve_root_client_actor() {
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Local host whose proc manager boots procs via `ProcAgent::boot_v1`.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // Captured before `host` is moved into the host agent below.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();

        // Spawn the root client actor, named "client", on the host's local proc.
        let local_proc = host.local_proc();
        let local_proc_id = local_proc.proc_id().clone();
        let root_client_handle = local_proc.spawn("client", TestIntrospectableActor).unwrap();
        let root_client_ref: hyperactor_reference::ActorRef<TestIntrospectableActor> =
            root_client_handle.bind();
        let root_client_actor_id = root_client_ref.actor_id().clone();

        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc and is told about the root client.
        let admin_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        // Installed before the admin agent is spawned; the binding keeps the
        // returned value alive until the end of the test.
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    Some(root_client_actor_id.clone()),
                    Some("[::]:0".parse().unwrap()),
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate proc from which the resolve requests are sent.
        let client_proc =
            hyperactor::Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Root lists the host.
        let root_resp = admin_ref
            .resolve(&client, "root".to_string())
            .await
            .unwrap();
        let root = root_resp.0.unwrap();
        let host_id_str = HostId(host_agent_ref.actor_id().clone()).to_string();
        assert!(
            root.children.contains(&host_id_str),
            "root children {:?} should contain host {}",
            root.children,
            host_id_str
        );

        // Host lists the local proc.
        let host_resp = admin_ref
            .resolve(&client, host_id_str.clone())
            .await
            .unwrap();
        let host_node = host_resp.0.unwrap();
        let local_proc_str = local_proc_id.to_string();
        assert!(
            host_node.children.contains(&local_proc_str),
            "host children {:?} should contain local proc {}",
            host_node.children,
            local_proc_str
        );

        // Local proc resolves as a proc and lists the root client actor.
        let proc_resp = admin_ref
            .resolve(&client, local_proc_str.clone())
            .await
            .unwrap();
        let proc_node = proc_resp.0.unwrap();
        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties, got {:?}",
            proc_node.properties
        );
        assert!(
            proc_node
                .children
                .contains(&root_client_actor_id.to_string()),
            "local proc children {:?} should contain root client actor {}",
            proc_node.children,
            root_client_actor_id
        );

        // The root client itself resolves as an actor parented by the local proc.
        let client_resp = admin_ref
            .resolve(&client, root_client_actor_id.to_string())
            .await
            .unwrap();
        let client_node = client_resp.0.unwrap();
        assert!(
            matches!(client_node.properties, NodeProperties::Actor { .. }),
            "expected Actor properties, got {:?}",
            client_node.properties
        );
        assert_eq!(
            client_node.parent,
            Some(local_proc_str),
            "root client parent should be the local proc"
        );
    }
2645
2646 #[test]
2650 fn test_skill_md_contains_canonical_strings() {
2651 let template = SKILL_MD_TEMPLATE;
2652 assert!(
2653 template.contains("GET {base}/v1/root"),
2654 "SKILL.md must document the root endpoint"
2655 );
2656 assert!(
2657 template.contains("GET {base}/v1/{reference}"),
2658 "SKILL.md must document the reference endpoint"
2659 );
2660 assert!(
2661 template.contains("NodePayload"),
2662 "SKILL.md must mention the NodePayload response type"
2663 );
2664 assert!(
2665 template.contains("GET {base}/SKILL.md"),
2666 "SKILL.md must document itself"
2667 );
2668 assert!(
2669 template.contains("{base}"),
2670 "SKILL.md must use {{base}} placeholder for interpolation"
2671 );
2672 }
2673
    /// Breadth-first walk of the whole admin tree starting at `"root"`.
    ///
    /// For every node it checks that the resolved payload's `identity` equals
    /// the reference string used to resolve it, and that its `parent` equals
    /// the identity of the node whose `children` list led to it. At least
    /// four distinct nodes must be visited.
    #[tokio::test]
    async fn test_navigation_identity_invariant() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Local host whose proc manager boots procs via `ProcAgent::boot_v1`.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // Captured before `host` is moved into the host agent below.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        // Installed before the admin agent is spawned; the binding keeps the
        // returned value alive until the end of the test.
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str, host_agent_ref)],
                    None,
                    Some("[::]:0".parse().unwrap()),
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate client proc from which the resolve requests are sent.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Work queue of (reference to resolve, parent identity expected in
        // its payload). The root has no parent.
        let mut queue: std::collections::VecDeque<(String, Option<String>)> =
            std::collections::VecDeque::new();
        queue.push_back(("root".to_string(), None));

        let mut visited = std::collections::HashSet::new();
        while let Some((ref_str, expected_parent)) = queue.pop_front() {
            // A reference queued more than once is only checked the first time.
            if !visited.insert(ref_str.clone()) {
                continue;
            }

            let resp = admin_ref.resolve(&client, ref_str.clone()).await.unwrap();
            let node = resp.0.unwrap();

            assert_eq!(
                node.identity, ref_str,
                "identity mismatch: resolved '{}' but payload.identity = '{}'",
                ref_str, node.identity
            );

            assert_eq!(
                node.parent, expected_parent,
                "parent mismatch for '{}': expected {:?}, got {:?}",
                ref_str, expected_parent, node.parent
            );

            // Queue unseen children with this node as their expected parent.
            for child_ref in &node.children {
                if !visited.contains(child_ref) {
                    queue.push_back((child_ref.clone(), Some(node.identity.clone())));
                }
            }
        }

        assert!(
            visited.len() >= 4,
            "expected at least 4 nodes in the tree, visited {}",
            visited.len()
        );
    }
2777
    /// Checks how the host's system proc appears in the tree: the host node
    /// lists the system proc's `ProcId` string among its children, reports an
    /// empty `system_children`, and its first child resolves to a `Proc` node
    /// whose identity is the child reference, whose parent is the host
    /// reference, and whose `as_of` is non-empty.
    #[tokio::test]
    async fn test_system_proc_identity() {
        use hyperactor::Proc;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::host::Host;
        use hyperactor::host::LocalProcManager;

        use crate::host_mesh::host_agent::HostAgentMode;
        use crate::host_mesh::host_agent::ProcManagerSpawnFn;
        use crate::proc_agent::ProcAgent;

        // Local host whose proc manager boots procs via `ProcAgent::boot_v1`.
        let spawn: ProcManagerSpawnFn =
            Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
        let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn);
        let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
            Host::new(manager, ChannelTransport::Unix.any())
                .await
                .unwrap();
        // Captured before `host` is moved into the host agent below.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let system_proc_id = system_proc.proc_id().clone();
        let host_agent_handle = system_proc
            .spawn(
                crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Local(host)),
            )
            .unwrap();
        let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
        let host_addr_str = host_addr.to_string();

        // The admin agent runs on its own proc.
        let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
        use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
        // Installed before the admin agent is spawned; the binding keeps the
        // returned value alive until the end of the test.
        let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
        let admin_handle = admin_proc
            .spawn(
                MESH_ADMIN_ACTOR_NAME,
                MeshAdminAgent::new(
                    vec![(host_addr_str.clone(), host_agent_ref.clone())],
                    None,
                    Some("[::]:0".parse().unwrap()),
                ),
            )
            .unwrap();
        let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();

        // Separate client proc from which the resolve requests are sent.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _handle) = client_proc.instance("client").unwrap();

        // Resolve the host directly by its `HostId` string form.
        let host_ref_str = HostId(host_agent_ref.actor_id().clone()).to_string();
        let host_resp = admin_ref
            .resolve(&client, host_ref_str.clone())
            .await
            .unwrap();
        let host_node = host_resp.0.unwrap();
        assert!(
            !host_node.children.is_empty(),
            "host should have at least one proc child"
        );

        // Panics unless the node carries `Host` properties.
        let system_children = match &host_node.properties {
            NodeProperties::Host {
                system_children, ..
            } => system_children.clone(),
            other => panic!("expected Host properties, got {:?}", other),
        };
        assert!(
            system_children.is_empty(),
            "host system_children should be empty (procs are never system), got {:?}",
            system_children
        );
        // Already implied by the `match` above.
        assert!(
            matches!(&host_node.properties, NodeProperties::Host { .. }),
            "expected Host properties"
        );

        // The system proc is listed under the host by its `ProcId` string.
        let expected_system_ref = system_proc_id.to_string();
        assert!(
            host_node.children.contains(&expected_system_ref),
            "host children {:?} should contain the system proc ref '{}'",
            host_node.children,
            expected_system_ref
        );

        // Resolve the first listed child and check the proc-level invariants.
        let proc_child_ref = &host_node.children[0];
        let proc_resp = admin_ref
            .resolve(&client, proc_child_ref.clone())
            .await
            .unwrap();
        let proc_node = proc_resp.0.unwrap();

        assert_eq!(
            proc_node.identity, *proc_child_ref,
            "identity must match the proc ref from the host's children list"
        );

        assert!(
            matches!(proc_node.properties, NodeProperties::Proc { .. }),
            "expected NodeProperties::Proc, got {:?}",
            proc_node.properties
        );

        assert_eq!(
            proc_node.parent,
            Some(host_ref_str.clone()),
            "proc parent should be the host reference (host:<actor_id>)"
        );

        assert!(
            !proc_node.as_of.is_empty(),
            "as_of should be present and non-empty"
        );

        // Same check as the `matches!` assertion a few lines up.
        assert!(
            matches!(&proc_node.properties, NodeProperties::Proc { .. }),
            "expected Proc properties"
        );
    }
2906
2907 fn mast_response_from_hosts(hosts: &[(i64, &str)]) -> super::MastStatusResponse {
2916 let mut tasks = std::collections::HashMap::new();
2917 for (idx, host) in hosts {
2918 tasks.insert(
2919 idx.to_string(),
2920 vec![super::MastTaskAttempt {
2921 hostname: Some(host.to_string()),
2922 }],
2923 );
2924 }
2925 super::MastStatusResponse {
2926 latest_attempt: super::MastAttempt {
2927 task_groups: std::collections::HashMap::from([(
2928 "trainers".to_string(),
2929 vec![super::MastTaskGroup { tasks }],
2930 )]),
2931 },
2932 }
2933 }
2934
2935 #[test]
2937 fn test_head_hostname_single_group() {
2938 let response = mast_response_from_hosts(&[(2, "host2"), (0, "host0"), (1, "host1")]);
2939 let head = super::head_hostname(&response).unwrap();
2940 assert_eq!(head, "host0");
2941 }
2942
/// Task "0" carries two attempts; the hostname of the final attempt in the
/// list is the one `head_hostname` is expected to return.
#[test]
fn test_head_hostname_last_attempt_wins() {
    let attempts: Vec<_> = ["old_host", "new_host"]
        .iter()
        .map(|name| super::MastTaskAttempt {
            hostname: Some(name.to_string()),
        })
        .collect();
    let tasks = std::collections::HashMap::from([("0".to_string(), attempts)]);
    let task_groups = std::collections::HashMap::from([(
        "trainers".to_string(),
        vec![super::MastTaskGroup { tasks }],
    )]);
    let response = super::MastStatusResponse {
        latest_attempt: super::MastAttempt { task_groups },
    };
    assert_eq!(super::head_hostname(&response).unwrap(), "new_host");
}
2969
/// Two task groups are present: `group_a` only has task "1" and `group_b`
/// only has task "0". The host of `group_b`'s task "0" is expected as head.
#[test]
fn test_head_hostname_multiple_groups() {
    // Builds the group list for a task group holding one task with one attempt.
    let single_task_group = |index: &str, host: &str| {
        let attempt = super::MastTaskAttempt {
            hostname: Some(host.to_string()),
        };
        let tasks = std::collections::HashMap::from([(index.to_string(), vec![attempt])]);
        vec![super::MastTaskGroup { tasks }]
    };
    let task_groups = std::collections::HashMap::from([
        ("group_a".to_string(), single_task_group("1", "host_a1")),
        ("group_b".to_string(), single_task_group("0", "host_b0")),
    ]);
    let response = super::MastStatusResponse {
        latest_attempt: super::MastAttempt { task_groups },
    };
    assert_eq!(super::head_hostname(&response).unwrap(), "host_b0");
}
3004
/// A response with no task groups at all must make `head_hostname` fail.
#[test]
fn test_head_hostname_empty() {
    let task_groups = std::collections::HashMap::new();
    let response = super::MastStatusResponse {
        latest_attempt: super::MastAttempt { task_groups },
    };
    let outcome = super::head_hostname(&response);
    assert!(outcome.is_err());
}
3015
/// Task "0" has an attempt without a hostname while task "1" has one; the
/// head is expected to be the host of task "1".
#[test]
fn test_head_hostname_skips_unallocated() {
    let unallocated = super::MastTaskAttempt { hostname: None };
    let allocated = super::MastTaskAttempt {
        hostname: Some("allocated_host".to_string()),
    };
    let tasks = std::collections::HashMap::from([
        ("0".to_string(), vec![unallocated]),
        ("1".to_string(), vec![allocated]),
    ]);
    let task_groups = std::collections::HashMap::from([(
        "trainers".to_string(),
        vec![super::MastTaskGroup { tasks }],
    )]);
    let response = super::MastStatusResponse {
        latest_attempt: super::MastAttempt { task_groups },
    };
    assert_eq!(super::head_hostname(&response).unwrap(), "allocated_host");
}
3042
3043 #[tokio::test]
3046 async fn test_qualify_fqdn_already_qualified() {
3047 let fqdn = super::qualify_fqdn("fake.nonexistent.tld").await;
3048 assert_eq!(fqdn, "fake.nonexistent.tld");
3049 }
3050
3051 #[tokio::test]
3056 async fn test_qualify_fqdn_short_hostname() {
3057 let fqdn = super::qualify_fqdn("localhost").await;
3058 assert!(!fqdn.is_empty(), "qualify_fqdn returned empty string");
3059 }
3060
3061 #[tokio::test]
3064 async fn test_qualify_fqdn_nonexistent_fallback() {
3065 let input = "__nonexistent_host_for_test";
3066 let fqdn = super::qualify_fqdn(input).await;
3067 assert_eq!(fqdn, input);
3068 }
3069
/// An explicit port override is returned as-is.
#[test]
fn test_resolve_admin_port_override() {
    let port = super::resolve_admin_port(Some(8080)).unwrap();
    assert_eq!(port, 8080);
}
3075
/// Without an override the resolved port is expected to be 1729
/// (presumably the configured default; confirm against the config source).
#[test]
fn test_resolve_admin_port_from_config() {
    assert_eq!(super::resolve_admin_port(None).unwrap(), 1729);
}
3083
3084 #[tokio::test]
3086 async fn test_cli_missing_binary() {
3087 let result = super::try_resolve_mast_handle(
3088 "mast_conda:///test-job",
3089 Some(1729),
3090 "__nonexistent_mast_test_bin",
3091 )
3092 .await;
3093 let err = format!("{:#}", result.unwrap_err());
3094 assert!(
3095 err.contains("not found"),
3096 "expected 'not found' in error, got: {}",
3097 err
3098 );
3099 }
3100
3101 fn write_test_script(content: &str) -> (tempfile::TempDir, String) {
3104 use std::os::unix::fs::PermissionsExt;
3105 let dir = tempfile::tempdir().unwrap();
3106 let script_path = dir.path().join("mast_stub.sh");
3107 std::fs::write(&script_path, content).unwrap();
3108 std::fs::set_permissions(&script_path, PermissionsExt::from_mode(0o755)).unwrap();
3109 let path_str = script_path.to_str().unwrap().to_string();
3110 (dir, path_str)
3111 }
3112
3113 #[tokio::test]
3115 async fn test_cli_happy_path() {
3116 let json = r#"{"latestAttempt":{"taskGroupExecutionAttempts":{"trainers":[{"taskExecutionAttempts":{"0":[{"hostname":"devgpu042"}]}}]}}}"#;
3117 let (_dir, script_path) = write_test_script(&format!("#!/bin/sh\necho '{}'\n", json));
3118 let url =
3119 super::try_resolve_mast_handle("mast_conda:///test-job", Some(1729), &script_path)
3120 .await
3121 .unwrap();
3122 assert!(url.starts_with("https://"), "url: {}", url);
3123 assert!(url.ends_with(":1729"), "url: {}", url);
3124 }
3125
3126 #[tokio::test]
3128 async fn test_cli_malformed_json() {
3129 let (_dir, script_path) = write_test_script("#!/bin/sh\necho 'not json'\n");
3130 let result =
3131 super::try_resolve_mast_handle("mast_conda:///test-job", Some(1729), &script_path)
3132 .await;
3133 let err = format!("{:#}", result.unwrap_err());
3134 assert!(
3135 err.contains("failed to parse"),
3136 "expected parse error, got: {}",
3137 err
3138 );
3139 }
3140
3141 #[tokio::test]
3143 async fn test_cli_nonzero_exit() {
3144 let (_dir, script_path) =
3145 write_test_script("#!/bin/sh\necho >&2 'job not found'\nexit 42\n");
3146 let result =
3147 super::try_resolve_mast_handle("mast_conda:///test-job", Some(1729), &script_path)
3148 .await;
3149 let err = format!("{:#}", result.unwrap_err());
3150 assert!(
3151 err.contains("job not found"),
3152 "expected stderr in error, got: {}",
3153 err
3154 );
3155 }
3156
3157 #[tokio::test]
3160 async fn test_cli_missing_prefix() {
3161 let result = super::try_resolve_mast_handle("bad_handle", Some(1729), "mast").await;
3162 let err = format!("{:#}", result.unwrap_err());
3163 assert!(
3164 err.contains("expected mast_conda:/// prefix"),
3165 "expected prefix error, got: {}",
3166 err
3167 );
3168 }
3169
/// The "trainers" entry exists but its list is empty; `head_hostname` is
/// expected to report an error.
#[test]
fn test_head_hostname_empty_attempts_vec() {
    let task_groups =
        std::collections::HashMap::from([("trainers".to_string(), Vec::new())]);
    let response = super::MastStatusResponse {
        latest_attempt: super::MastAttempt { task_groups },
    };
    let outcome = super::head_hostname(&response);
    assert!(outcome.is_err());
}
3183
/// One task is keyed by the non-numeric string "abc" and another by "0";
/// the host of task "0" is expected to be chosen as head.
#[test]
fn test_head_hostname_non_numeric_index() {
    // Wraps a hostname in a single-attempt list.
    let one_attempt = |host: &str| {
        vec![super::MastTaskAttempt {
            hostname: Some(host.to_string()),
        }]
    };
    let tasks = std::collections::HashMap::from([
        ("abc".to_string(), one_attempt("host_abc")),
        ("0".to_string(), one_attempt("host_0")),
    ]);
    let task_groups = std::collections::HashMap::from([(
        "trainers".to_string(),
        vec![super::MastTaskGroup { tasks }],
    )]);
    let response = super::MastStatusResponse {
        latest_attempt: super::MastAttempt { task_groups },
    };
    assert_eq!(super::head_hostname(&response).unwrap(), "host_0");
}
3212
3213 #[tokio::test]
3226 async fn test_proc_children_reflect_directly_spawned_actors() {
3227 use hyperactor::Proc;
3228 use hyperactor::actor::ActorStatus;
3229 use hyperactor::channel::ChannelTransport;
3230 use hyperactor::host::Host;
3231 use hyperactor::host::LocalProcManager;
3232 use hyperactor::testing::proc_supervison::ProcSupervisionCoordinator;
3233
3234 use crate::host_mesh::host_agent::HOST_MESH_AGENT_ACTOR_NAME;
3235 use crate::host_mesh::host_agent::HostAgent;
3236 use crate::host_mesh::host_agent::HostAgentMode;
3237 use crate::host_mesh::host_agent::ProcManagerSpawnFn;
3238 use crate::proc_agent::PROC_AGENT_ACTOR_NAME;
3239 use crate::proc_agent::ProcAgent;
3240
3241 let spawn_fn: ProcManagerSpawnFn =
3249 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
3250 let manager: LocalProcManager<ProcManagerSpawnFn> = LocalProcManager::new(spawn_fn);
3251 let host: Host<LocalProcManager<ProcManagerSpawnFn>> =
3252 Host::new(manager, ChannelTransport::Unix.any())
3253 .await
3254 .unwrap();
3255 let system_proc = host.system_proc().clone();
3256 let host_agent_handle = system_proc
3257 .spawn(
3258 HOST_MESH_AGENT_ACTOR_NAME,
3259 HostAgent::new(HostAgentMode::Local(host)),
3260 )
3261 .unwrap();
3262 let host_agent_ref: hyperactor_reference::ActorRef<HostAgent> = host_agent_handle.bind();
3263
3264 let user_proc =
3266 Proc::direct(ChannelTransport::Unix.any(), "user_proc".to_string()).unwrap();
3267 let user_proc_addr = user_proc.proc_id().addr().to_string();
3268 let agent_handle = ProcAgent::boot_v1(user_proc.clone(), None).unwrap();
3269 agent_handle
3270 .status()
3271 .wait_for(|s| matches!(s, ActorStatus::Idle))
3272 .await
3273 .unwrap();
3274
3275 let admin_proc = Proc::direct(ChannelTransport::Unix.any(), "admin".to_string()).unwrap();
3279 let _supervision = ProcSupervisionCoordinator::set(&admin_proc).await.unwrap();
3280 let admin_handle = admin_proc
3281 .spawn(
3282 MESH_ADMIN_ACTOR_NAME,
3283 MeshAdminAgent::new(
3284 vec![(user_proc_addr, host_agent_ref.clone())],
3285 None,
3286 Some("[::]:0".parse().unwrap()),
3287 ),
3288 )
3289 .unwrap();
3290 let admin_ref: hyperactor_reference::ActorRef<MeshAdminAgent> = admin_handle.bind();
3291
3292 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
3293 let (client, _client_handle) = client_proc.instance("client").unwrap();
3294
3295 let user_proc_ref = user_proc.proc_id().to_string();
3299 let resp = admin_ref
3300 .resolve(&client, user_proc_ref.clone())
3301 .await
3302 .unwrap();
3303 let node = resp.0.unwrap();
3304 assert!(
3305 matches!(node.properties, NodeProperties::Proc { .. }),
3306 "expected Proc, got {:?}",
3307 node.properties
3308 );
3309 let initial_count = node.children.len();
3310 assert!(
3311 node.children
3312 .iter()
3313 .any(|c| c.contains(PROC_AGENT_ACTOR_NAME)),
3314 "initial children {:?} should contain proc_agent",
3315 node.children
3316 );
3317
3318 user_proc
3321 .spawn("extra_actor", TestIntrospectableActor)
3322 .unwrap();
3323
3324 let resp2 = admin_ref
3327 .resolve(&client, user_proc_ref.clone())
3328 .await
3329 .unwrap();
3330 let node2 = resp2.0.unwrap();
3331 assert!(
3332 matches!(node2.properties, NodeProperties::Proc { .. }),
3333 "expected Proc, got {:?}",
3334 node2.properties
3335 );
3336 assert!(
3337 node2.children.iter().any(|c| c.contains("extra_actor")),
3338 "after direct spawn, children {:?} should contain extra_actor",
3339 node2.children
3340 );
3341 assert!(
3342 node2.children.len() > initial_count,
3343 "expected at least {} children after direct spawn, got {:?}",
3344 initial_count + 1,
3345 node2.children
3346 );
3347 }
3348}