1#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fmt;
20use std::pin::Pin;
21use std::sync::OnceLock;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use hyperactor::Actor;
26use hyperactor::ActorHandle;
27use hyperactor::ActorRef;
28use hyperactor::Addr;
29use hyperactor::Context;
30use hyperactor::Endpoint as _;
31use hyperactor::HandleClient;
32use hyperactor::Handler;
33use hyperactor::Instance;
34use hyperactor::PortHandle;
35use hyperactor::PortRef;
36use hyperactor::Proc;
37use hyperactor::ProcAddr;
38use hyperactor::RefClient;
39use hyperactor::RemoteEndpoint as _;
40use hyperactor::context;
41use hyperactor::mailbox::MailboxServerHandle;
42use hyperactor_config::Flattrs;
43use hyperactor_config::attrs::Attrs;
44use serde::Deserialize;
45use serde::Serialize;
46use tokio::time::Duration;
47use typeuri::Named;
48
49use crate::bootstrap;
50use crate::bootstrap::BootstrapCommand;
51use crate::bootstrap::BootstrapProcConfig;
52use crate::bootstrap::BootstrapProcManager;
53use crate::config_dump::ConfigDump;
54use crate::config_dump::ConfigDumpResult;
55use crate::host::Host;
56use crate::host::HostError;
57use crate::host::LOCAL_PROC_NAME;
58use crate::host::LocalProcManager;
59use crate::host::SERVICE_PROC_NAME;
60use crate::host::SingleTerminate;
61use crate::mesh_id::HostMeshId;
62use crate::mesh_id::ResourceId;
63use crate::proc_agent::ProcAgent;
64use crate::pyspy::PySpyDump;
65use crate::pyspy::PySpyProfile;
66use crate::pyspy::PySpyProfileWorker;
67use crate::pyspy::PySpyWorker;
68use crate::resource;
69use crate::resource::ProcSpec;
70
/// Boxed, sendable future that resolves to the handle of a spawned
/// [`ProcAgent`], or to an error.
pub(crate) type ProcManagerSpawnFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<ActorHandle<ProcAgent>>> + Send>>;
/// Boxed, thread-safe callback that takes a [`Proc`] and returns a
/// [`ProcManagerSpawnFuture`].
pub(crate) type ProcManagerSpawnFn = Box<dyn Fn(Proc) -> ProcManagerSpawnFuture + Send + Sync>;
74
/// The host owned by a `HostAgent`, distinguished by the proc manager
/// that backs it.
#[derive(EnumAsInner)]
pub enum HostAgentMode {
    /// A host whose procs are managed by a [`BootstrapProcManager`].
    Process {
        /// The managed host.
        host: Host<BootstrapProcManager>,
        /// One-shot sender for the host's [`MailboxServerHandle`]. The
        /// `ShutdownHost` handler in this file sends the stored mailbox
        /// handle through it once the host has been shut down.
        shutdown_tx: Option<tokio::sync::oneshot::Sender<MailboxServerHandle>>,
    },
    /// A host whose procs are managed by a [`LocalProcManager`].
    Local(Host<LocalProcManager<ProcManagerSpawnFn>>),
}
96
97impl HostAgentMode {
98 pub(crate) fn addr(&self) -> &hyperactor::channel::ChannelAddr {
99 #[allow(clippy::match_same_arms)]
100 match self {
101 HostAgentMode::Process { host, .. } => host.addr(),
102 HostAgentMode::Local(host) => host.addr(),
103 }
104 }
105
106 pub(crate) fn system_proc(&self) -> &Proc {
107 #[allow(clippy::match_same_arms)]
108 match self {
109 HostAgentMode::Process { host, .. } => host.system_proc(),
110 HostAgentMode::Local(host) => host.system_proc(),
111 }
112 }
113
114 pub(crate) fn local_proc(&self) -> &Proc {
115 #[allow(clippy::match_same_arms)]
116 match self {
117 HostAgentMode::Process { host, .. } => host.local_proc(),
118 HostAgentMode::Local(host) => host.local_proc(),
119 }
120 }
121
122 async fn request_stop(
126 &self,
127 cx: &impl context::Actor,
128 proc: &ProcAddr,
129 timeout: Duration,
130 reason: &str,
131 ) {
132 match self {
133 HostAgentMode::Process { host, .. } => {
134 host.manager().request_stop(cx, proc, timeout, reason).await;
135 }
136 HostAgentMode::Local(host) => {
137 host.manager().request_stop(proc, timeout, reason).await;
138 }
139 }
140 }
141
142 async fn proc_status(
147 &self,
148 proc_id: &ProcAddr,
149 ) -> (resource::Status, Option<bootstrap::ProcStatus>) {
150 match self {
151 HostAgentMode::Process { host, .. } => match host.manager().status(proc_id).await {
152 Some(proc_status) => (proc_status.clone().into(), Some(proc_status)),
153 None => (resource::Status::Unknown, None),
154 },
155 HostAgentMode::Local(host) => {
156 let status = match host.manager().local_proc_status(proc_id).await {
157 Some(crate::host::LocalProcStatus::Stopping) => resource::Status::Stopping,
158 Some(crate::host::LocalProcStatus::Stopped) => resource::Status::Stopped,
159 None => resource::Status::Running,
160 };
161 (status, None)
162 }
163 }
164 }
165
166 fn bootstrap_command(&self) -> Option<BootstrapCommand> {
168 match self {
169 HostAgentMode::Process { host, .. } => Some(host.manager().command().clone()),
170 HostAgentMode::Local(_) => None,
171 }
172 }
173}
174
/// Bookkeeping the `HostAgent` keeps for every proc it was asked to create.
#[derive(Debug)]
pub(crate) struct ProcCreationState {
    /// Rank taken from the `CreateOrUpdate` request that created the proc.
    pub(crate) rank: usize,
    /// Host mesh the proc was created for, copied from the proc spec.
    /// Selective draining matches procs on this value.
    pub(crate) host_mesh_id: Option<HostMeshId>,
    /// Outcome of the spawn: the proc's address and a reference to its
    /// agent, or the error that prevented creation.
    pub(crate) created: Result<(ProcAddr, ActorRef<ProcAgent>), HostError>,
    /// Keepalive deadline. Set by `KeepaliveGetState`; the `SelfCheck`
    /// handler stops procs whose deadline has passed. `None` means no
    /// deadline is armed.
    pub(crate) expiry_time: Option<std::time::SystemTime>,
}
185
/// Actor name string for the host agent.
// NOTE(review): presumably the name the `HostAgent` actor is spawned under;
// the spawn site is outside this file section, so confirm there.
pub const HOST_MESH_AGENT_ACTOR_NAME: &str = "host_agent";
188
/// Lifecycle state of a `HostAgent`. The host itself lives inside the
/// state while the agent owns it.
enum HostAgentState {
    /// The agent owns the host. This is the initial state, and the state
    /// restored when a full drain completes.
    Detached(HostAgentMode),
    /// The agent owns the host. Entered from `Detached` when a
    /// `CreateOrUpdate` records a proc into a previously empty table.
    Attached(HostAgentMode),
    /// A full drain is in progress; the host has been moved into a
    /// `DrainWorker` and is handed back via `DrainComplete`.
    Draining,
    /// The host has been shut down by `ShutdownHost`.
    Shutdown,
}
203
/// Internal message telling the `HostAgent` to re-evaluate the status of
/// one proc and answer any waiters that are now satisfied.
#[derive(Debug, Serialize, Deserialize, Named)]
struct ProcStatusChanged {
    /// Resource id of the proc whose status should be re-checked.
    id: ResourceId,
}
212
/// Message posted by a `DrainWorker` when it has finished, returning
/// ownership of the host to the `HostAgent`.
struct DrainComplete {
    /// The host that was lent to the worker for the drain.
    host: HostAgentMode,
    /// Reply port of the original `DrainHost` request; acknowledged by the
    /// `DrainComplete` handler.
    ack: PortRef<()>,
}
219
/// Child actor spawned by the `DrainHost` handler to carry out a full
/// drain. All of its work happens in `init`: it terminates the host's
/// children and then posts `DrainComplete` back to the spawning agent.
#[hyperactor::export(handlers = [])]
struct DrainWorker {
    /// Host to drain; taken out (left as `None`) when it is handed back.
    host: Option<HostAgentMode>,
    /// Timeout forwarded to `terminate_children`.
    timeout: Duration,
    /// Concurrency limit forwarded to `terminate_children`.
    max_in_flight: usize,
    /// Reply port of the originating `DrainHost`; forwarded inside
    /// `DrainComplete`.
    ack: Option<PortRef<()>>,
    /// Port on which `DrainComplete` is posted.
    done_notify: PortHandle<DrainComplete>,
}
232
233#[async_trait]
234impl Actor for DrainWorker {
235 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
236 if let Some(host) = self.host.as_mut() {
237 match host {
238 HostAgentMode::Process { host, .. } => {
239 host.terminate_children(
240 this,
241 self.timeout,
242 self.max_in_flight.clamp(1, 256),
243 "drain host",
244 )
245 .await;
246 }
247 HostAgentMode::Local(host) => {
248 host.terminate_children(this, self.timeout, self.max_in_flight, "drain host")
249 .await;
250 }
251 }
252 }
253
254 if let (Some(host), Some(ack)) = (self.host.take(), self.ack.take()) {
257 let _ = self.done_notify.post(this, DrainComplete { host, ack });
258 }
259
260 Ok(())
261 }
262}
263
264impl fmt::Debug for DrainWorker {
265 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266 f.debug_struct("DrainWorker")
267 .field("timeout", &self.timeout)
268 .field("max_in_flight", &self.max_in_flight)
269 .finish()
270 }
271}
272
/// Actor that manages the procs of a single host: it creates and stops
/// procs, answers status queries, and drains or shuts down the host.
#[hyperactor::export(
    handlers=[
        resource::CreateOrUpdate<ProcSpec>,
        resource::Stop,
        resource::GetState<ProcState>,
        resource::KeepaliveGetState<ProcState>,
        resource::StreamState<ProcState> { cast = true },
        resource::GetRankStatus { cast = true },
        resource::WaitRankStatus { cast = true },
        resource::List,
        ShutdownHost,
        DrainHost,
        SetClientConfig,
        ProcStatusChanged,
        PySpyDump,
        PySpyProfile,
        ConfigDump,
        crate::proc_agent::SelfCheck,
    ]
)]
pub struct HostAgent {
    /// Lifecycle state; owns the host while `Detached` or `Attached`.
    state: HostAgentState,
    /// One entry per proc the agent was asked to create, including failed
    /// creations.
    pub(crate) created: HashMap<ResourceId, ProcCreationState>,
    /// Parked `WaitRankStatus` requests per proc, stored as
    /// `(min_status, rank, reply)`. A rank of `usize::MAX` marks a waiter
    /// that arrived before the proc was created; it is filled in by
    /// `CreateOrUpdate`.
    pending_proc_waiters:
        HashMap<ResourceId, Vec<(resource::Status, usize, PortRef<crate::StatusOverlay>)>>,
    /// Procs for which `start_watch_bridge` has already been invoked.
    watching: HashSet<ResourceId>,
    /// Port for `ProcStatusChanged` self-notifications; set in `init`.
    proc_status_port: Option<PortHandle<ProcStatusChanged>>,
    // NOTE(review): not read or written in the visible part of this file
    // beyond construction; presumably a lazily spawned agent for the local
    // proc — confirm at its use site.
    local_mesh_agent: OnceLock<anyhow::Result<ActorHandle<ProcAgent>>>,
    /// Mailbox server handle returned by `serve()` in process mode; handed
    /// over through `shutdown_tx` on `ShutdownHost`.
    mailbox_handle: Option<MailboxServerHandle>,
}
316
317impl HostAgent {
318 pub fn new(host: HostAgentMode) -> Self {
320 Self {
321 state: HostAgentState::Detached(host),
322 created: HashMap::new(),
323 pending_proc_waiters: HashMap::new(),
324 watching: HashSet::new(),
325 proc_status_port: None,
326 local_mesh_agent: OnceLock::new(),
327 mailbox_handle: None,
328 }
329 }
330
331 fn min_proc_status(&self) -> resource::Status {
334 match &self.state {
335 HostAgentState::Detached(_) | HostAgentState::Attached(_) => resource::Status::Running,
336 HostAgentState::Draining => resource::Status::Stopping,
337 HostAgentState::Shutdown => resource::Status::Stopped,
338 }
339 }
340
341 fn host(&self) -> Option<&HostAgentMode> {
342 match &self.state {
343 HostAgentState::Detached(h) | HostAgentState::Attached(h) => Some(h),
344 _ => None,
345 }
346 }
347
348 fn host_mut(&mut self) -> Option<&mut HostAgentMode> {
349 match &mut self.state {
350 HostAgentState::Detached(h) | HostAgentState::Attached(h) => Some(h),
351 _ => None,
352 }
353 }
354
355 async fn drain(
361 &mut self,
362 cx: &Context<'_, Self>,
363 timeout: std::time::Duration,
364 max_in_flight: usize,
365 ) {
366 if let Some(host_mode) = self.host_mut() {
367 match host_mode {
368 HostAgentMode::Process { host, .. } => {
369 let summary = host
370 .terminate_children(cx, timeout, max_in_flight.clamp(1, 256), "stop host")
371 .await;
372 tracing::info!(?summary, "terminated children on host");
373 }
374 HostAgentMode::Local(host) => {
375 let summary = host
376 .terminate_children(cx, timeout, max_in_flight, "stop host")
377 .await;
378 tracing::info!(?summary, "terminated children on local host");
379 }
380 }
381 }
382 self.created.clear();
383 }
384
385 async fn drain_by_mesh_name(
389 &mut self,
390 cx: &Context<'_, Self>,
391 timeout: std::time::Duration,
392 filter: Option<&HostMeshId>,
393 ) {
394 let matching_ids: Vec<ResourceId> = self
395 .created
396 .iter()
397 .filter(|(_, state)| state.host_mesh_id.as_ref() == filter)
398 .map(|(id, _)| id.clone())
399 .collect();
400
401 if let Some(host_mode) = self.host() {
402 for id in &matching_ids {
403 if let Some(ProcCreationState {
404 created: Ok((proc_id, _)),
405 ..
406 }) = self.created.get(id)
407 {
408 match host_mode {
409 HostAgentMode::Process { host, .. } => {
410 let _ = host
411 .terminate_proc(cx, proc_id, timeout, "selective drain")
412 .await;
413 }
414 HostAgentMode::Local(host) => {
415 let _ = host
416 .terminate_proc(cx, proc_id, timeout, "selective drain")
417 .await;
418 }
419 }
420 }
421 }
422 }
423
424 for id in &matching_ids {
427 self.created.remove(id);
428 self.watching.remove(id);
429 self.pending_proc_waiters.remove(id);
430 }
431
432 tracing::info!(
433 count = matching_ids.len(),
434 filter = ?filter,
435 "selectively drained procs",
436 );
437 }
438
439 fn publish_introspect_properties(&self, cx: &Instance<Self>) {
443 let host = match self.host() {
444 Some(h) => h,
445 None => return, };
447
448 let addr = host.addr().to_string();
449 let mut children: Vec<hyperactor::introspect::IntrospectRef> = Vec::new();
450 let system_children: Vec<crate::introspect::NodeRef> = Vec::new(); children.push(hyperactor::introspect::IntrospectRef::Proc(
456 host.system_proc().proc_addr().clone(),
457 ));
458 children.push(hyperactor::introspect::IntrospectRef::Proc(
459 host.local_proc().proc_addr().clone(),
460 ));
461
462 for state in self.created.values() {
464 if let Ok((proc_id, _agent_ref)) = &state.created {
465 children.push(hyperactor::introspect::IntrospectRef::Proc(proc_id.clone()));
466 }
467 }
468
469 let num_procs = children.len();
470
471 let mut attrs = hyperactor_config::Attrs::new();
472 attrs.set(crate::introspect::NODE_TYPE, "host".to_string());
473 attrs.set(crate::introspect::ADDR, addr);
474 attrs.set(crate::introspect::NUM_PROCS, num_procs);
475 attrs.set(hyperactor::introspect::CHILDREN, children);
476 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
477 let memory = crate::introspect::ProcessMemoryStats::read_from_procfs();
482 memory.to_attrs(&mut attrs);
483 cx.publish_attrs(attrs);
484 }
485}
486
487#[async_trait]
488impl Actor for HostAgent {
489 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
490 this.bind::<Self>();
493 match self.host_mut().unwrap() {
494 HostAgentMode::Process { host, .. } => {
495 self.mailbox_handle = Some(host.serve()?);
496 let (directory, file) = hyperactor_telemetry::log_file_path(
497 hyperactor_telemetry::env::Env::current(),
498 None,
499 )
500 .unwrap();
501 eprintln!(
502 "Monarch internal logs are being written to {}/{}.log; execution id {}",
503 directory,
504 file,
505 hyperactor_telemetry::env::execution_id(),
506 );
507 }
508 HostAgentMode::Local(host) => {
509 host.serve()?;
510 }
511 };
512 this.set_system();
513 self.publish_introspect_properties(this);
514
515 let host = self.host().expect("host present");
518 let system_proc = host.system_proc().clone();
519 let local_proc = host.local_proc().clone();
520 let self_id = this.self_addr().clone();
521 this.set_query_child_handler(move |child_ref| {
522 use hyperactor::introspect::IntrospectResult;
523
524 let proc = match child_ref {
525 Addr::Proc(proc_ref) => {
526 if *proc_ref == system_proc.proc_addr() {
527 Some((&system_proc, SERVICE_PROC_NAME))
528 } else if *proc_ref == local_proc.proc_addr() {
529 Some((&local_proc, LOCAL_PROC_NAME))
530 } else {
531 None
532 }
533 }
534 _ => None,
535 };
536
537 match proc {
538 Some((proc, label)) => {
539 let all_keys = proc.all_instance_keys();
553 let mut actors: Vec<hyperactor::introspect::IntrospectRef> =
554 Vec::with_capacity(all_keys.len());
555 let mut system_actors: Vec<crate::introspect::NodeRef> = Vec::new();
556 for id in all_keys {
557 if let Some(cell) = proc.get_instance_by_id(&id) {
558 let actor_addr = cell.actor_addr().clone();
559 if cell.is_system() {
560 system_actors
561 .push(crate::introspect::NodeRef::Actor(actor_addr.clone()));
562 }
563 actors.push(hyperactor::introspect::IntrospectRef::Actor(actor_addr));
564 }
565 }
566 let mut attrs = hyperactor_config::Attrs::new();
567 attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
568 attrs.set(crate::introspect::PROC_NAME, label.to_string());
569 attrs.set(crate::introspect::NUM_ACTORS, actors.len());
570 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_actors.clone());
571 let memory = crate::introspect::ProcessMemoryStats::read_from_procfs();
575 memory.to_attrs(&mut attrs);
576 attrs.set(
577 crate::introspect::ACTOR_WORK_QUEUE_DEPTH_TOTAL,
578 proc.queue_depth_total(),
579 );
580 let mut queue_max: u64 = 0;
582 for aid in proc.all_instance_keys() {
583 if let Some(cell) = proc.get_instance_by_id(&aid) {
584 queue_max = queue_max.max(cell.queue_depth());
585 }
586 }
587 attrs.set(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_MAX, queue_max);
588 attrs.set(
589 crate::introspect::ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK,
590 proc.queue_depth_high_water_mark(),
591 );
592 attrs.set(
593 crate::introspect::LAST_NONZERO_QUEUE_DEPTH_AGE_MS,
594 proc.last_nonzero_queue_depth_age_ms(),
595 );
596 let attrs_json =
597 serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
598
599 IntrospectResult {
600 identity: hyperactor::introspect::IntrospectRef::Proc(
601 proc.proc_addr().clone(),
602 ),
603 attrs: attrs_json,
604 children: actors,
605 parent: Some(hyperactor::introspect::IntrospectRef::Actor(
606 self_id.clone(),
607 )),
608 as_of: std::time::SystemTime::now(),
609 }
610 }
611 None => {
612 let mut error_attrs = hyperactor_config::Attrs::new();
613 error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
614 error_attrs.set(
615 hyperactor::introspect::ERROR_MESSAGE,
616 format!("child {} not found", child_ref),
617 );
618 let identity = match child_ref {
619 Addr::Proc(p) => hyperactor::introspect::IntrospectRef::Proc(p.clone()),
620 Addr::Actor(a) => hyperactor::introspect::IntrospectRef::Actor(a.clone()),
621 Addr::Port(p) => {
622 hyperactor::introspect::IntrospectRef::Actor(p.actor_addr())
623 }
624 };
625 IntrospectResult {
626 identity,
627 attrs: serde_json::to_string(&error_attrs)
628 .unwrap_or_else(|_| "{}".to_string()),
629 children: Vec::new(),
630 parent: None,
631 as_of: std::time::SystemTime::now(),
632 }
633 }
634 }
635 });
636
637 self.proc_status_port = Some(this.port::<ProcStatusChanged>());
638
639 if let Some(delay) = hyperactor_config::global::get(crate::proc_agent::MESH_ORPHAN_TIMEOUT)
643 {
644 this.post_after(this, crate::proc_agent::SelfCheck::default(), delay);
645 }
646
647 Ok(())
648 }
649}
650
651impl fmt::Debug for HostAgent {
652 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
653 f.debug_struct("HostAgent")
654 .field("host", &"..")
655 .field("created", &self.created)
656 .finish()
657 }
658}
659
660#[async_trait]
661impl Handler<resource::CreateOrUpdate<ProcSpec>> for HostAgent {
662 #[tracing::instrument("HostAgent::CreateOrUpdate", level = "info", skip_all, fields(id=%create_or_update.id))]
663 async fn handle(
664 &mut self,
665 cx: &Context<Self>,
666 create_or_update: resource::CreateOrUpdate<ProcSpec>,
667 ) -> anyhow::Result<()> {
668 if self.created.contains_key(&create_or_update.id) {
669 return Ok(());
671 }
672
673 let host = match self.host_mut() {
674 Some(h) => h,
675 None => {
676 tracing::warn!(
677 id = %create_or_update.id,
678 "ignoring CreateOrUpdate: HostAgent has already shut down"
679 );
680 return Ok(());
681 }
682 };
683 let created = match host {
684 HostAgentMode::Process { host, .. } => {
685 host.spawn(
686 create_or_update.id.to_string(),
687 BootstrapProcConfig {
688 create_rank: create_or_update.rank.unwrap(),
689 client_config_override: create_or_update
690 .spec
691 .client_config_override
692 .clone(),
693 proc_bind: create_or_update.spec.proc_bind.clone(),
694 bootstrap_command: create_or_update.spec.bootstrap_command.clone(),
695 },
696 )
697 .await
698 }
699 HostAgentMode::Local(host) => host.spawn(create_or_update.id.to_string(), ()).await,
700 };
701
702 let rank = create_or_update.rank.unwrap();
703
704 if let Err(e) = &created {
705 tracing::error!("failed to spawn proc {}: {}", create_or_update.id, e);
706 }
707 let was_empty = self.created.is_empty();
708 self.created.insert(
709 create_or_update.id.clone(),
710 ProcCreationState {
711 rank,
712 host_mesh_id: create_or_update.spec.host_mesh_id.clone(),
713 created,
714 expiry_time: None,
715 },
716 );
717
718 if was_empty && let HostAgentState::Detached(_) = &self.state {
720 let host = match std::mem::replace(&mut self.state, HostAgentState::Shutdown) {
721 HostAgentState::Detached(h) => h,
722 _ => unreachable!(),
723 };
724 self.state = HostAgentState::Attached(host);
725 }
726
727 let proc_id = self
733 .created
734 .get(&create_or_update.id)
735 .and_then(|s| s.created.as_ref().ok())
736 .map(|(pid, _)| pid.clone());
737
738 if let Some(waiters) = self.pending_proc_waiters.get_mut(&create_or_update.id) {
739 for (_, waiter_rank, _) in waiters.iter_mut() {
740 if *waiter_rank == usize::MAX {
741 *waiter_rank = rank;
742 }
743 }
744 }
745
746 if self.pending_proc_waiters.contains_key(&create_or_update.id) {
748 if let Some(proc_id) = &proc_id {
749 self.start_watch_bridge(&create_or_update.id, proc_id).await;
750 }
751 self.notify_proc_status_changed(&create_or_update.id);
752 }
753
754 self.publish_introspect_properties(cx);
755 Ok(())
756 }
757}
758
759#[async_trait]
760impl Handler<resource::Stop> for HostAgent {
761 async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
762 tracing::info!(
763 name = "HostMeshAgentStatus",
764 proc_id = %message.id,
765 reason = %message.reason,
766 "stopping proc"
767 );
768 let host = match self.host() {
769 Some(h) => h,
770 None => {
771 tracing::debug!(
773 proc_id = %message.id,
774 "ignoring Stop: HostAgent has already shut down"
775 );
776 return Ok(());
777 }
778 };
779 let timeout = hyperactor_config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
780
781 if let Some(ProcCreationState {
782 created: Ok((proc_id, _)),
783 ..
784 }) = self.created.get(&message.id)
785 {
786 host.request_stop(cx, proc_id, timeout, &message.reason)
787 .await;
788 }
789
790 self.notify_proc_status_changed(&message.id);
792
793 self.publish_introspect_properties(cx);
794 Ok(())
795 }
796}
797
798#[async_trait]
799impl Handler<resource::GetRankStatus> for HostAgent {
800 async fn handle(
801 &mut self,
802 cx: &Context<Self>,
803 get_rank_status: resource::GetRankStatus,
804 ) -> anyhow::Result<()> {
805 use crate::StatusOverlay;
806 use crate::resource::Status;
807
808 let (rank, status) = match self.created.get(&get_rank_status.id) {
809 Some(ProcCreationState {
810 rank,
811 created: Ok((proc_id, _mesh_agent)),
812 ..
813 }) => {
814 let raw_status = match self.host() {
815 Some(host) => host.proc_status(proc_id).await.0,
816 None => resource::Status::Unknown,
817 };
818 (*rank, raw_status.clamp_min(self.min_proc_status()))
819 }
820 Some(ProcCreationState {
821 rank,
822 created: Err(e),
823 ..
824 }) => (*rank, Status::Failed(e.to_string())),
825 None => (usize::MAX, Status::NotExist),
826 };
827
828 let overlay = if rank == usize::MAX {
829 StatusOverlay::new()
830 } else {
831 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
832 .expect("valid single-run overlay")
833 };
834 get_rank_status.reply.post(cx, overlay);
835 Ok(())
836 }
837}
838
839#[async_trait]
840impl Handler<resource::WaitRankStatus> for HostAgent {
841 async fn handle(
842 &mut self,
843 cx: &Context<Self>,
844 msg: resource::WaitRankStatus,
845 ) -> anyhow::Result<()> {
846 use crate::StatusOverlay;
847 use crate::resource::Status;
848
849 match self.created.get(&msg.id) {
850 Some(ProcCreationState {
851 rank,
852 created: Ok((proc_id, _)),
853 ..
854 }) => {
855 let rank = *rank;
856 let status = match self.host() {
857 Some(host) => host.proc_status(proc_id).await.0,
858 None => Status::Stopped,
859 };
860
861 if status >= msg.min_status {
863 let overlay = StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
864 .expect("valid single-run overlay");
865 let _ = msg.reply.post(cx, overlay);
866 return Ok(());
867 }
868
869 self.pending_proc_waiters
871 .entry(msg.id.clone())
872 .or_default()
873 .push((msg.min_status, rank, msg.reply));
874
875 let proc_id = proc_id.clone();
876 self.start_watch_bridge(&msg.id, &proc_id).await;
877 }
878 Some(ProcCreationState {
879 rank,
880 created: Err(e),
881 ..
882 }) => {
883 let overlay = StatusOverlay::try_from_runs(vec![(
885 *rank..(*rank + 1),
886 Status::Failed(e.to_string()),
887 )])
888 .expect("valid single-run overlay");
889 let _ = msg.reply.post(cx, overlay);
890 }
891 None => {
892 self.pending_proc_waiters
896 .entry(msg.id.clone())
897 .or_default()
898 .push((msg.min_status, usize::MAX, msg.reply));
899 }
900 }
901
902 Ok(())
903 }
904}
905
906#[async_trait]
907impl Handler<ProcStatusChanged> for HostAgent {
908 async fn handle(&mut self, cx: &Context<Self>, msg: ProcStatusChanged) -> anyhow::Result<()> {
909 use crate::StatusOverlay;
910 use crate::resource::Status;
911
912 let status = match self.created.get(&msg.id) {
913 Some(ProcCreationState {
914 created: Ok((proc_id, _)),
915 ..
916 }) => match self.host() {
917 Some(host) => host.proc_status(proc_id).await.0,
918 None => Status::Stopped,
919 },
920 Some(ProcCreationState {
921 created: Err(_), ..
922 }) => {
923 return Ok(());
925 }
926 None => {
927 return Ok(());
929 }
930 };
931
932 let Some(waiters) = self.pending_proc_waiters.get_mut(&msg.id) else {
933 return Ok(());
934 };
935
936 let remaining = std::mem::take(waiters);
937 for (min_status, rank, reply) in remaining {
938 if status >= min_status {
939 let overlay =
940 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status.clone())])
941 .expect("valid single-run overlay");
942 let _ = reply.post(cx, overlay);
943 } else {
944 waiters.push((min_status, rank, reply));
945 }
946 }
947
948 if waiters.is_empty() {
949 self.pending_proc_waiters.remove(&msg.id);
950 }
951
952 Ok(())
953 }
954}
955
956impl HostAgent {
957 fn notify_proc_status_changed(&self, id: &ResourceId) {
959 if let Some(port) = &self.proc_status_port {
960 let client = Instance::<()>::self_client();
961 let _ = port.post(client, ProcStatusChanged { id: id.clone() });
962 }
963 }
964
965 async fn start_watch_bridge(&mut self, id: &ResourceId, proc_id: &ProcAddr) {
968 if self.watching.contains(id) {
969 return;
970 }
971 self.watching.insert(id.clone());
972
973 let port = match &self.proc_status_port {
974 Some(p) => p.clone(),
975 None => return,
976 };
977
978 match self.host() {
979 Some(HostAgentMode::Process { host, .. }) => {
980 if let Some(rx) = host.manager().watch(proc_id).await {
981 start_proc_watch(port, rx, id.clone(), |s| s.clone().into());
982 }
983 }
984 Some(HostAgentMode::Local(host)) => {
985 if let Some(rx) = host.manager().watch(proc_id).await {
986 start_proc_watch(port, rx, id.clone(), |s| (*s).into());
987 }
988 }
989 None => {}
990 }
991 }
992}
993
994fn start_proc_watch<S>(
997 port: PortHandle<ProcStatusChanged>,
998 mut rx: tokio::sync::watch::Receiver<S>,
999 id: ResourceId,
1000 to_status: impl Fn(&S) -> resource::Status + Send + 'static,
1001) where
1002 S: Send + Sync + 'static,
1003{
1004 let client = Instance::<()>::self_client();
1007 tokio::spawn(async move {
1008 loop {
1009 match rx.changed().await {
1010 Ok(()) => {
1011 let status = to_status(&*rx.borrow());
1012 let terminated = status.is_terminated();
1013 let _ = port.post(client, ProcStatusChanged { id: id.clone() });
1014 if terminated {
1015 return;
1016 }
1017 }
1018 Err(_) => {
1019 let _ = port.post(client, ProcStatusChanged { id: id.clone() });
1020 return;
1021 }
1022 }
1023 }
1024 });
1025}
1026
/// Request to shut the host down: drain any tracked procs, acknowledge,
/// and move the agent to its `Shutdown` state.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct ShutdownHost {
    /// Timeout forwarded to the host's `terminate_children`.
    pub timeout: std::time::Duration,
    /// Concurrency limit forwarded to `terminate_children` (clamped to
    /// `1..=256` in process mode).
    pub max_in_flight: usize,
    /// Port acknowledged once draining has finished.
    #[reply]
    pub ack: hyperactor::PortRef<()>,
}
wirevalue::register_type!(ShutdownHost);
1039
/// Request to terminate procs on the host without shutting the host down.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct DrainHost {
    /// Timeout forwarded to the terminate calls.
    pub timeout: std::time::Duration,
    /// Concurrency limit for a full drain. Not used by a selective drain.
    pub max_in_flight: usize,
    /// When `Some`, only procs created for this host mesh are drained
    /// (selective drain); when `None`, the whole host is drained.
    pub host_mesh_id: Option<HostMeshId>,
    /// Port acknowledged when the request has been handled.
    #[reply]
    pub ack: hyperactor::PortRef<()>,
}
wirevalue::register_type!(DrainHost);
1056
1057#[async_trait]
1058impl Handler<DrainHost> for HostAgent {
1059 async fn handle(&mut self, cx: &Context<Self>, msg: DrainHost) -> anyhow::Result<()> {
1060 if msg.host_mesh_id.is_some() {
1061 self.drain_by_mesh_name(cx, msg.timeout, msg.host_mesh_id.as_ref())
1063 .await;
1064 msg.ack.post(cx, ());
1065 return Ok(());
1066 }
1067
1068 let host = match std::mem::replace(&mut self.state, HostAgentState::Draining) {
1070 HostAgentState::Attached(h) => h,
1071 other @ (HostAgentState::Detached(_) | HostAgentState::Draining) => {
1072 self.state = other;
1074 msg.ack.post(cx, ());
1075 return Ok(());
1076 }
1077 HostAgentState::Shutdown => {
1078 self.state = HostAgentState::Shutdown;
1079 msg.ack.post(cx, ());
1080 return Ok(());
1081 }
1082 };
1083
1084 let done_port = cx.port::<DrainComplete>();
1093
1094 cx.spawn_with_name(
1095 "drain_worker",
1096 DrainWorker {
1097 host: Some(host),
1098 timeout: msg.timeout,
1099 max_in_flight: msg.max_in_flight,
1100 ack: Some(msg.ack),
1101 done_notify: done_port,
1102 },
1103 )?;
1104
1105 Ok(())
1106 }
1107}
1108
1109#[async_trait]
1110impl Handler<DrainComplete> for HostAgent {
1111 async fn handle(&mut self, cx: &Context<Self>, msg: DrainComplete) -> anyhow::Result<()> {
1112 self.state = HostAgentState::Detached(msg.host);
1113 self.created.clear();
1114 msg.ack.post(cx, ());
1115 Ok(())
1116 }
1117}
1118
1119#[async_trait]
1120impl Handler<ShutdownHost> for HostAgent {
1121 async fn handle(&mut self, cx: &Context<Self>, msg: ShutdownHost) -> anyhow::Result<()> {
1122 if !self.created.is_empty() {
1129 self.drain(cx, msg.timeout, msg.max_in_flight).await;
1130 }
1131
1132 msg.ack.post(cx, ());
1135
1136 match std::mem::replace(&mut self.state, HostAgentState::Shutdown) {
1139 HostAgentState::Detached(HostAgentMode::Process {
1140 shutdown_tx: Some(tx),
1141 ..
1142 })
1143 | HostAgentState::Attached(HostAgentMode::Process {
1144 shutdown_tx: Some(tx),
1145 ..
1146 }) => {
1147 tracing::info!(
1148 proc_id = %cx.self_addr().proc_addr(),
1149 actor_id = %cx.self_addr(),
1150 "host is shut down, sending mailbox handle to bootstrap for draining"
1151 );
1152 if let Some(handle) = self.mailbox_handle.take() {
1153 let _ = tx.send(handle);
1154 }
1155 }
1156 _ => {}
1157 }
1158
1159 Ok(())
1160 }
1161}
1162
/// Per-proc state returned by the `GetState`, `KeepaliveGetState` and
/// `StreamState` handlers for a successfully created proc.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Named,
    Serialize,
    Deserialize,
    hyperactor::Bind,
    hyperactor::Unbind
)]
pub struct ProcState {
    /// Address of the proc.
    pub proc_id: ProcAddr,
    /// Rank the proc was created with.
    pub create_rank: usize,
    /// Reference to the proc's agent actor.
    pub mesh_agent: ActorRef<ProcAgent>,
    /// Bootstrap command of the host's proc manager; `None` for a local
    /// host or when the agent does not own its host.
    pub bootstrap_command: Option<BootstrapCommand>,
    /// Detailed bootstrap status; only available in process mode.
    pub proc_status: Option<bootstrap::ProcStatus>,
}
wirevalue::register_type!(ProcState);
1182
1183#[async_trait]
1184impl Handler<resource::GetState<ProcState>> for HostAgent {
1185 async fn handle(
1186 &mut self,
1187 cx: &Context<Self>,
1188 get_state: resource::GetState<ProcState>,
1189 ) -> anyhow::Result<()> {
1190 let state = match self.created.get(&get_state.id) {
1191 Some(ProcCreationState {
1192 rank,
1193 created: Ok((proc_id, mesh_agent)),
1194 ..
1195 }) => {
1196 let (raw_status, proc_status, bootstrap_command) = match self.host() {
1197 Some(host) => {
1198 let (status, proc_status) = host.proc_status(proc_id).await;
1199 (status, proc_status, host.bootstrap_command())
1200 }
1201 None => (resource::Status::Unknown, None, None),
1202 };
1203 let status = raw_status.clamp_min(self.min_proc_status());
1204 resource::State {
1205 id: get_state.id.clone(),
1206 status,
1207 state: Some(ProcState {
1208 proc_id: proc_id.clone(),
1209 create_rank: *rank,
1210 mesh_agent: mesh_agent.clone(),
1211 bootstrap_command,
1212 proc_status,
1213 }),
1214 generation: 0,
1215 timestamp: std::time::SystemTime::now(),
1216 }
1217 }
1218 Some(ProcCreationState {
1219 created: Err(e), ..
1220 }) => resource::State {
1221 id: get_state.id.clone(),
1222 status: resource::Status::Failed(e.to_string()),
1223 state: None,
1224 generation: 0,
1225 timestamp: std::time::SystemTime::now(),
1226 },
1227 None => resource::State {
1228 id: get_state.id.clone(),
1229 status: resource::Status::NotExist,
1230 state: None,
1231 generation: 0,
1232 timestamp: std::time::SystemTime::now(),
1233 },
1234 };
1235
1236 get_state.reply.post(cx, state);
1237 Ok(())
1238 }
1239}
1240
1241#[async_trait]
1242impl Handler<crate::proc_agent::SelfCheck> for HostAgent {
1243 async fn handle(
1244 &mut self,
1245 cx: &Context<Self>,
1246 _: crate::proc_agent::SelfCheck,
1247 ) -> anyhow::Result<()> {
1248 let Some(duration) = hyperactor_config::global::get(crate::proc_agent::MESH_ORPHAN_TIMEOUT)
1253 else {
1254 return Ok(());
1255 };
1256 let now = std::time::SystemTime::now();
1257 let timeout = hyperactor_config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
1258
1259 let expired: Vec<ResourceId> = self
1260 .created
1261 .iter()
1262 .filter_map(|(id, state)| {
1263 let expiry = state.expiry_time?;
1264 if now > expiry { Some(id.clone()) } else { None }
1265 })
1266 .collect();
1267
1268 if !expired.is_empty() {
1269 tracing::info!(
1270 "stopping {} orphaned procs past their keepalive expiry",
1271 expired.len(),
1272 );
1273 }
1274
1275 for id in expired {
1276 if let Some(ProcCreationState {
1277 created: Ok((proc_id, _)),
1278 ..
1279 }) = self.created.get(&id)
1280 {
1281 let proc_id = proc_id.clone();
1282 if let Some(host) = self.host() {
1283 host.request_stop(cx, &proc_id, timeout, "orphaned").await;
1284 }
1285 if let Some(state) = self.created.get_mut(&id) {
1287 state.expiry_time = None;
1288 }
1289 }
1290 }
1291
1292 cx.post_after(cx, crate::proc_agent::SelfCheck::default(), duration);
1293 Ok(())
1294 }
1295}
1296
1297#[async_trait]
1298impl Handler<resource::List> for HostAgent {
1299 async fn handle(&mut self, cx: &Context<Self>, list: resource::List) -> anyhow::Result<()> {
1300 list.reply.post(cx, self.created.keys().cloned().collect());
1301 Ok(())
1302 }
1303}
1304
1305#[async_trait]
1306impl Handler<resource::KeepaliveGetState<ProcState>> for HostAgent {
1307 async fn handle(
1308 &mut self,
1309 cx: &Context<Self>,
1310 message: resource::KeepaliveGetState<ProcState>,
1311 ) -> anyhow::Result<()> {
1312 if let Some(state) = self.created.get_mut(&message.get_state.id) {
1317 state.expiry_time = Some(message.expires_after);
1318 }
1319 <Self as Handler<resource::GetState<ProcState>>>::handle(self, cx, message.get_state).await
1320 }
1321}
1322
1323#[async_trait]
1324impl Handler<resource::StreamState<ProcState>> for HostAgent {
1325 async fn handle(
1326 &mut self,
1327 cx: &Context<Self>,
1328 stream_state: resource::StreamState<ProcState>,
1329 ) -> anyhow::Result<()> {
1330 let state = match self.created.get(&stream_state.id) {
1333 Some(ProcCreationState {
1334 rank,
1335 created: Ok((proc_id, mesh_agent)),
1336 ..
1337 }) => {
1338 let (raw_status, proc_status, bootstrap_command) = match self.host() {
1339 Some(host) => {
1340 let (status, proc_status) = host.proc_status(proc_id).await;
1341 (status, proc_status, host.bootstrap_command())
1342 }
1343 None => (resource::Status::Unknown, None, None),
1344 };
1345 let status = raw_status.clamp_min(self.min_proc_status());
1346 resource::State {
1347 id: stream_state.id.clone(),
1348 status,
1349 state: Some(ProcState {
1350 proc_id: proc_id.clone(),
1351 create_rank: *rank,
1352 mesh_agent: mesh_agent.clone(),
1353 bootstrap_command,
1354 proc_status,
1355 }),
1356 generation: 0,
1357 timestamp: std::time::SystemTime::now(),
1358 }
1359 }
1360 Some(ProcCreationState {
1361 created: Err(e), ..
1362 }) => resource::State {
1363 id: stream_state.id.clone(),
1364 status: resource::Status::Failed(e.to_string()),
1365 state: None,
1366 generation: 0,
1367 timestamp: std::time::SystemTime::now(),
1368 },
1369 None => resource::State {
1370 id: stream_state.id.clone(),
1371 status: resource::Status::NotExist,
1372 state: None,
1373 generation: 0,
1374 timestamp: std::time::SystemTime::now(),
1375 },
1376 };
1377
1378 let mut headers = Flattrs::new();
1379 headers.set(crate::proc_agent::STREAM_STATE_SUBSCRIBER, true);
1380 stream_state
1381 .subscriber
1382 .post_with_headers(cx, headers, state);
1383 Ok(())
1384 }
1385}
1386
/// Message asking a [`HostAgent`] to install a set of configuration
/// attributes.
///
/// The `HostAgent` handler passes `attrs` to `hyperactor_config::global::set`
/// with `Source::ClientOverride` and then posts `()` on `done`.
#[derive(Debug, Named, Handler, RefClient, HandleClient, Serialize, Deserialize)]
pub struct SetClientConfig {
    /// Configuration attributes to install as the client override.
    pub attrs: Attrs,
    /// Reply port; receives `()` once the attributes have been installed.
    #[reply]
    pub done: PortRef<()>,
}
wirevalue::register_type!(SetClientConfig);
1405
1406#[async_trait]
1407impl Handler<SetClientConfig> for HostAgent {
1408 async fn handle(&mut self, cx: &Context<Self>, msg: SetClientConfig) -> anyhow::Result<()> {
1409 hyperactor_config::global::set(
1413 hyperactor_config::global::Source::ClientOverride,
1414 msg.attrs,
1415 );
1416 tracing::debug!("installed client config override on host agent");
1417 msg.done.post(cx, ());
1418 Ok(())
1419 }
1420}
1421
/// Message asking a [`HostAgent`] for the [`ProcAgent`] handle of the host's
/// local proc.
///
/// NOTE(review): this type derives neither `Serialize` nor `Named` and
/// replies through a `PortHandle`, so it is presumably intended for
/// in-process callers only — confirm.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetLocalProc {
    /// Reply port; receives a clone of the local proc's `ProcAgent` handle.
    #[reply]
    pub proc_mesh_agent: PortHandle<ActorHandle<ProcAgent>>,
}
1435
1436#[async_trait]
1437impl Handler<GetLocalProc> for HostAgent {
1438 async fn handle(
1439 &mut self,
1440 cx: &Context<Self>,
1441 GetLocalProc { proc_mesh_agent }: GetLocalProc,
1442 ) -> anyhow::Result<()> {
1443 let host = self
1444 .host()
1445 .ok_or_else(|| anyhow::anyhow!("HostAgent has already shut down"))?;
1446 let agent = self
1447 .local_mesh_agent
1448 .get_or_init(|| ProcAgent::boot_v1(host.local_proc().clone(), None));
1449
1450 match agent {
1451 Err(e) => anyhow::bail!("error booting local proc: {}", e),
1452 Ok(agent) => proc_mesh_agent.post(cx, agent.clone()),
1453 };
1454
1455 Ok(())
1456 }
1457}
1458
1459#[async_trait]
1460impl Handler<PySpyDump> for HostAgent {
1461 async fn handle(
1462 &mut self,
1463 cx: &Context<Self>,
1464 message: PySpyDump,
1465 ) -> Result<(), anyhow::Error> {
1466 PySpyWorker::spawn_and_forward(cx, message.opts, message.result)
1467 }
1468}
1469
1470#[async_trait]
1471impl Handler<PySpyProfile> for HostAgent {
1472 async fn handle(
1473 &mut self,
1474 cx: &Context<Self>,
1475 message: PySpyProfile,
1476 ) -> Result<(), anyhow::Error> {
1477 PySpyProfileWorker::spawn_and_forward(cx, message.request, message.result)
1478 }
1479}
1480
1481#[async_trait]
1482impl Handler<ConfigDump> for HostAgent {
1483 async fn handle(
1484 &mut self,
1485 cx: &Context<Self>,
1486 message: ConfigDump,
1487 ) -> Result<(), anyhow::Error> {
1488 let entries = hyperactor_config::global::config_entries();
1489 message.result.post(cx, ConfigDumpResult { entries });
1490 Ok(())
1491 }
1492}
1493
// Tests for `HostAgent`. Each test builds its own `Host` backed by a
// `BootstrapProcManager` using `BootstrapCommand::test()`, spawns a
// `HostAgent` on the host's system proc, and drives it from a separate
// client proc. Only compiled for test builds with the `fbcode_build` cfg.
#[cfg(all(test, fbcode_build))]
mod tests {
    use std::assert_matches;

    use hyperactor::ActorAddr;
    use hyperactor::Proc;
    use hyperactor::channel::ChannelTransport;
    use hyperactor::id::Label;

    use super::*;
    use crate::bootstrap::ProcStatus;
    use crate::mesh_id::ResourceId;
    use crate::resource::CreateOrUpdateClient;
    use crate::resource::GetStateClient;
    use crate::resource::WaitRankStatusClient;

    /// Creates one proc through the agent and checks that `get_state`
    /// reports it as `Running` with the expected proc id, mesh agent
    /// reference, bootstrap command and `Ready` proc status.
    #[tokio::test]
    async fn test_basic() {
        let host = Host::new(
            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
            ChannelTransport::Unix.any(),
        )
        .await
        .unwrap();

        // Capture what we need from the host before it is moved into the agent.
        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Process {
                    host,
                    shutdown_tx: None,
                }),
            )
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.client("client").unwrap();

        let id = ResourceId::instance(Label::new("proc1").unwrap());

        // Ask the agent to create the proc at rank 0 with a default spec.
        host_agent
            .create_or_update(
                &client,
                id.clone(),
                resource::Rank::new(0),
                ProcSpec::default(),
            )
            .await
            .unwrap();
        // The guard ties the reported fields back to the resource id and the
        // host address: the proc id and agent reference must be the ones
        // derived from `id` and `host_addr`, and the agent reported inside
        // the `Ready` status must equal the one in `ProcState`.
        assert_matches!(
            host_agent.get_state(&client, id.clone()).await.unwrap(),
            resource::State {
                id: resource_id,
                status: resource::Status::Running,
                state: Some(ProcState {
                    proc_id,
                    mesh_agent,
                    bootstrap_command,
                    proc_status: Some(ProcStatus::Ready { started_at: _, addr: _, agent: proc_status_mesh_agent}),
                    ..
                }),
                ..
            } if id == resource_id
              && proc_id == id.proc_addr(host_addr.clone())
              && mesh_agent == ActorRef::attest(id.proc_addr(host_addr.clone()).actor_addr(crate::proc_agent::PROC_AGENT_ACTOR_NAME)) && bootstrap_command == Some(BootstrapCommand::test())
              && mesh_agent == proc_status_mesh_agent
        );
    }

    /// Issues `wait_rank_status(Running)` after `create_or_update` has
    /// returned and expects a non-empty overlay within 5 seconds.
    #[tokio::test]
    async fn test_wait_rank_status_already_running() {
        let host = Host::new(
            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
            ChannelTransport::Unix.any(),
        )
        .await
        .unwrap();

        let system_proc = host.system_proc().clone();
        let host_agent = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Process {
                    host,
                    shutdown_tx: None,
                }),
            )
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.client("client").unwrap();

        let id = ResourceId::instance(Label::new("proc1").unwrap());
        host_agent
            .create_or_update(
                &client,
                id.clone(),
                resource::Rank::new(0),
                ProcSpec::default(),
            )
            .await
            .unwrap();

        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
        // The proc was created above, so the wait is registered afterwards.
        host_agent
            .wait_rank_status(&client, id, resource::Status::Running, port.bind())
            .await
            .unwrap();

        let overlay = tokio::time::timeout(Duration::from_secs(5), rx.recv())
            .await
            .expect("reply timed out")
            .expect("reply channel closed");
        assert!(!overlay.is_empty(), "expected non-empty overlay");
    }

    /// Registers a `wait_rank_status(Stopped)` waiter on a created proc,
    /// then stops the proc and expects a non-empty overlay within 30
    /// seconds.
    #[tokio::test]
    async fn test_wait_rank_status_stop() {
        let host = Host::new(
            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
            ChannelTransport::Unix.any(),
        )
        .await
        .unwrap();

        let system_proc = host.system_proc().clone();
        let host_agent = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Process {
                    host,
                    shutdown_tx: None,
                }),
            )
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.client("client").unwrap();

        let id = ResourceId::instance(Label::new("proc1").unwrap());
        host_agent
            .create_or_update(
                &client,
                id.clone(),
                resource::Rank::new(0),
                ProcSpec::default(),
            )
            .await
            .unwrap();

        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
        // Register the waiter first; the stop request is only sent afterwards.
        host_agent
            .wait_rank_status(&client, id.clone(), resource::Status::Stopped, port.bind())
            .await
            .unwrap();

        // Request the stop; the waiter's reply is awaited below.
        crate::resource::StopClient::stop(&host_agent, &client, id, "test".to_string())
            .await
            .unwrap();

        // Generous timeout for the overlay reply following the stop request.
        let overlay = tokio::time::timeout(Duration::from_secs(30), rx.recv())
            .await
            .expect("reply timed out — proc did not reach Stopped")
            .expect("reply channel closed");
        assert!(!overlay.is_empty(), "expected non-empty overlay");
    }

    /// Registers a `wait_rank_status(Running)` waiter for an id that has not
    /// been created yet, then creates the proc and expects a non-empty
    /// overlay within 10 seconds.
    #[tokio::test]
    async fn test_wait_rank_status_before_proc_exists() {
        let host = Host::new(
            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
            ChannelTransport::Unix.any(),
        )
        .await
        .unwrap();

        let system_proc = host.system_proc().clone();
        let host_agent = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Process {
                    host,
                    shutdown_tx: None,
                }),
            )
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.client("client").unwrap();

        let id = ResourceId::instance(Label::new("proc1").unwrap());

        let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
        // No `create_or_update` has been sent for `id` at this point.
        host_agent
            .wait_rank_status(&client, id.clone(), resource::Status::Running, port.bind())
            .await
            .unwrap();

        // Creating the proc now is what should cause the earlier waiter to
        // receive its reply.
        host_agent
            .create_or_update(&client, id, resource::Rank::new(0), ProcSpec::default())
            .await
            .unwrap();

        let overlay = tokio::time::timeout(Duration::from_secs(10), rx.recv())
            .await
            .expect("reply timed out — waiter was not flushed after CreateOrUpdate")
            .expect("reply channel closed");
        assert!(!overlay.is_empty(), "expected non-empty overlay");
    }

    /// Creates two procs tagged with different host mesh ids, drains with
    /// `Some(mesh_a)`, and checks that only the proc tagged `mesh-a` is
    /// gone while the `mesh-b` proc is still `Running`.
    #[tokio::test]
    async fn test_drain_scoped_to_host_mesh_id() {
        let host = Host::new(
            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
            ChannelTransport::Unix.any(),
        )
        .await
        .unwrap();

        let system_proc = host.system_proc().clone();
        let host_agent = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Process {
                    host,
                    shutdown_tx: None,
                }),
            )
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.client("client").unwrap();

        let mesh_a = HostMeshId::instance(Label::new("mesh-a").unwrap());
        let mesh_b = HostMeshId::instance(Label::new("mesh-b").unwrap());
        let proc_a_id = ResourceId::instance(Label::new("proc-a").unwrap());
        let proc_b_id = ResourceId::instance(Label::new("proc-b").unwrap());

        // proc-a is tagged with mesh-a.
        let spec_a = ProcSpec {
            host_mesh_id: Some(mesh_a.clone()),
            ..Default::default()
        };
        host_agent
            .create_or_update(&client, proc_a_id.clone(), resource::Rank::new(0), spec_a)
            .await
            .unwrap();

        // proc-b is tagged with mesh-b.
        let spec_b = ProcSpec {
            host_mesh_id: Some(mesh_b.clone()),
            ..Default::default()
        };
        host_agent
            .create_or_update(&client, proc_b_id.clone(), resource::Rank::new(1), spec_b)
            .await
            .unwrap();

        // Precondition: both procs report Running before the drain.
        assert_matches!(
            host_agent
                .get_state(&client, proc_a_id.clone())
                .await
                .unwrap(),
            resource::State {
                status: resource::Status::Running,
                ..
            }
        );
        assert_matches!(
            host_agent
                .get_state(&client, proc_b_id.clone())
                .await
                .unwrap(),
            resource::State {
                status: resource::Status::Running,
                ..
            }
        );

        // Drain restricted to mesh-a.
        // NOTE(review): the meaning of the `16` argument is not visible in
        // this chunk (presumably a concurrency limit) — confirm against
        // the drain message definition.
        host_agent
            .drain_host(&client, Duration::from_secs(5), 16, Some(mesh_a.clone()))
            .await
            .unwrap();

        // proc-a is no longer known to the agent after the scoped drain.
        assert_matches!(
            host_agent
                .get_state(&client, proc_a_id.clone())
                .await
                .unwrap(),
            resource::State {
                status: resource::Status::NotExist,
                ..
            }
        );

        // proc-b belongs to mesh-b and must be unaffected.
        assert_matches!(
            host_agent
                .get_state(&client, proc_b_id.clone())
                .await
                .unwrap(),
            resource::State {
                status: resource::Status::Running,
                ..
            }
        );
    }

    /// Creates two procs tagged with different host mesh ids, drains with
    /// `None` as the mesh filter, and checks that both procs are reported
    /// as `NotExist` afterwards.
    #[tokio::test]
    async fn test_drain_none_drains_all() {
        let host = Host::new(
            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
            ChannelTransport::Unix.any(),
        )
        .await
        .unwrap();

        let system_proc = host.system_proc().clone();
        let host_agent = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Process {
                    host,
                    shutdown_tx: None,
                }),
            )
            .unwrap();

        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.client("client").unwrap();

        let mesh_a = HostMeshId::instance(Label::new("mesh-a").unwrap());
        let mesh_b = HostMeshId::instance(Label::new("mesh-b").unwrap());
        let proc_a_id = ResourceId::instance(Label::new("proc-a").unwrap());
        let proc_b_id = ResourceId::instance(Label::new("proc-b").unwrap());

        let spec_a = ProcSpec {
            host_mesh_id: Some(mesh_a),
            ..Default::default()
        };
        host_agent
            .create_or_update(&client, proc_a_id.clone(), resource::Rank::new(0), spec_a)
            .await
            .unwrap();

        let spec_b = ProcSpec {
            host_mesh_id: Some(mesh_b),
            ..Default::default()
        };
        host_agent
            .create_or_update(&client, proc_b_id.clone(), resource::Rank::new(1), spec_b)
            .await
            .unwrap();

        // Drain with no mesh filter.
        host_agent
            .drain_host(&client, Duration::from_secs(5), 16, None)
            .await
            .unwrap();

        // Both procs, regardless of their mesh tag, must be gone.
        assert_matches!(
            host_agent.get_state(&client, proc_a_id).await.unwrap(),
            resource::State {
                status: resource::Status::NotExist,
                ..
            }
        );
        assert_matches!(
            host_agent.get_state(&client, proc_b_id).await.unwrap(),
            resource::State {
                status: resource::Status::NotExist,
                ..
            }
        );
    }

    /// Sends `IntrospectMessage::QueryChild` for the system proc to the host
    /// agent's introspection port and polls until the returned attrs show a
    /// work-queue-depth high-water mark above zero; at that point the
    /// last-nonzero-queue-depth age must be `Some`. Fails if that does not
    /// happen within 10 seconds.
    #[tokio::test]
    async fn test_service_proc_query_child_has_queue_stats() {
        use hyperactor::actor::ActorStatus;
        use hyperactor::introspect::IntrospectMessage;
        use hyperactor::introspect::IntrospectResult;

        let host = Host::new(
            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
            ChannelTransport::Unix.any(),
        )
        .await
        .unwrap();

        let system_proc = host.system_proc().clone();
        let host_agent = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Process {
                    host,
                    shutdown_tx: None,
                }),
            )
            .unwrap();

        // Wait for the agent to report `Idle` before sending it any work.
        host_agent
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        let client_proc =
            Proc::direct(ChannelTransport::Unix.any(), "qd_client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.client("client").unwrap();

        let name = ResourceId::instance(Label::new("qd_test_proc").unwrap());
        // Send the agent a message to process.
        // NOTE(review): presumably this is what makes the queue depth
        // non-zero — confirm.
        host_agent
            .create_or_update(
                &client,
                name.clone(),
                resource::Rank::new(0),
                ProcSpec::default(),
            )
            .await
            .unwrap();

        // Address the host agent's `IntrospectMessage` handler port directly.
        let agent_ref = system_proc
            .proc_addr()
            .actor_addr(HOST_MESH_AGENT_ACTOR_NAME);
        let agent_id: ActorAddr = agent_ref;
        let port = PortRef::<IntrospectMessage>::attest_handler_port(&agent_id);

        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(10);
        // Poll every 100 ms until the watermark is observed or the deadline
        // passes.
        loop {
            let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
            port.post(
                &client,
                IntrospectMessage::QueryChild {
                    child_ref: Addr::Proc(system_proc.proc_addr().clone()),
                    reply: reply_port.bind(),
                },
            );
            let payload = tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx.recv())
                .await
                .expect("QueryChild timed out")
                .expect("reply channel closed");

            // The attrs arrive as a JSON string inside the result payload.
            let attrs: hyperactor_config::Attrs =
                serde_json::from_str(&payload.attrs).expect("valid attrs JSON");

            // A missing watermark attribute is treated as zero.
            let hwm = attrs
                .get(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK)
                .copied()
                .unwrap_or(0);
            let last_nonzero: Option<u64> = attrs
                .get(crate::introspect::LAST_NONZERO_QUEUE_DEPTH_AGE_MS)
                .copied()
                .flatten();

            if hwm > 0 {
                // A non-zero watermark must come with a last-nonzero age.
                assert!(
                    last_nonzero.is_some(),
                    "last-nonzero should be Some when watermark is {hwm}",
                );
                break;
            }

            assert!(
                tokio::time::Instant::now() < deadline,
                "timed out waiting for service proc watermark > 0",
            );
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }
}