1#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fmt;
20use std::pin::Pin;
21use std::sync::OnceLock;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use hyperactor::Actor;
26use hyperactor::ActorHandle;
27use hyperactor::Context;
28use hyperactor::HandleClient;
29use hyperactor::Handler;
30use hyperactor::Instance;
31use hyperactor::PortHandle;
32use hyperactor::Proc;
33use hyperactor::RefClient;
34use hyperactor::channel::ChannelTransport;
35use hyperactor::context;
36use hyperactor::host::Host;
37use hyperactor::host::HostError;
38use hyperactor::host::LOCAL_PROC_NAME;
39use hyperactor::host::LocalProcManager;
40use hyperactor::host::SERVICE_PROC_NAME;
41use hyperactor::host::SingleTerminate;
42use hyperactor::mailbox::MailboxServerHandle;
43use hyperactor::reference as hyperactor_reference;
44use hyperactor_config::Flattrs;
45use hyperactor_config::attrs::Attrs;
46use serde::Deserialize;
47use serde::Serialize;
48use tokio::time::Duration;
49use typeuri::Named;
50
51use crate::Name;
52use crate::bootstrap;
53use crate::bootstrap::BootstrapCommand;
54use crate::bootstrap::BootstrapProcConfig;
55use crate::bootstrap::BootstrapProcManager;
56use crate::config_dump::ConfigDump;
57use crate::config_dump::ConfigDumpResult;
58use crate::proc_agent::ProcAgent;
59use crate::pyspy::PySpyDump;
60use crate::pyspy::PySpyWorker;
61use crate::resource;
62use crate::resource::ProcSpec;
63
/// Boxed, pinned, `Send` future that resolves to the `ProcAgent` handle
/// produced by a proc-manager spawn callback, or to an `anyhow` error.
pub(crate) type ProcManagerSpawnFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<ActorHandle<ProcAgent>>> + Send>>;
/// Boxed `Send + Sync` callback that takes a [`Proc`] and returns a
/// [`ProcManagerSpawnFuture`].
pub(crate) type ProcManagerSpawnFn = Box<dyn Fn(Proc) -> ProcManagerSpawnFuture + Send + Sync>;
67
/// The two kinds of host a `HostAgent` can own.
#[derive(EnumAsInner)]
pub enum HostAgentMode {
    /// A host whose procs are managed by a [`BootstrapProcManager`].
    Process {
        /// The underlying host.
        host: Host<BootstrapProcManager>,
        /// Optional one-shot channel; when present, the `ShutdownHost`
        /// handler sends the mailbox server handle through it.
        shutdown_tx: Option<tokio::sync::oneshot::Sender<MailboxServerHandle>>,
    },
    /// A host whose procs are managed by a [`LocalProcManager`] driven by a
    /// [`ProcManagerSpawnFn`].
    Local(Host<LocalProcManager<ProcManagerSpawnFn>>),
}
89
90impl HostAgentMode {
91 pub(crate) fn addr(&self) -> &hyperactor::channel::ChannelAddr {
92 #[allow(clippy::match_same_arms)]
93 match self {
94 HostAgentMode::Process { host, .. } => host.addr(),
95 HostAgentMode::Local(host) => host.addr(),
96 }
97 }
98
99 pub(crate) fn system_proc(&self) -> &Proc {
100 #[allow(clippy::match_same_arms)]
101 match self {
102 HostAgentMode::Process { host, .. } => host.system_proc(),
103 HostAgentMode::Local(host) => host.system_proc(),
104 }
105 }
106
107 pub(crate) fn local_proc(&self) -> &Proc {
108 #[allow(clippy::match_same_arms)]
109 match self {
110 HostAgentMode::Process { host, .. } => host.local_proc(),
111 HostAgentMode::Local(host) => host.local_proc(),
112 }
113 }
114
115 async fn request_stop(
119 &self,
120 cx: &impl context::Actor,
121 proc: &hyperactor_reference::ProcId,
122 timeout: Duration,
123 reason: &str,
124 ) {
125 match self {
126 HostAgentMode::Process { host, .. } => {
127 host.manager().request_stop(cx, proc, timeout, reason).await;
128 }
129 HostAgentMode::Local(host) => {
130 host.manager().request_stop(proc, timeout, reason).await;
131 }
132 }
133 }
134
135 async fn proc_status(
140 &self,
141 proc_id: &hyperactor_reference::ProcId,
142 ) -> (resource::Status, Option<bootstrap::ProcStatus>) {
143 match self {
144 HostAgentMode::Process { host, .. } => match host.manager().status(proc_id).await {
145 Some(proc_status) => (proc_status.clone().into(), Some(proc_status)),
146 None => (resource::Status::Unknown, None),
147 },
148 HostAgentMode::Local(host) => {
149 let status = match host.manager().local_proc_status(proc_id).await {
150 Some(hyperactor::host::LocalProcStatus::Stopping) => resource::Status::Stopping,
151 Some(hyperactor::host::LocalProcStatus::Stopped) => resource::Status::Stopped,
152 None => resource::Status::Running,
153 };
154 (status, None)
155 }
156 }
157 }
158
159 fn bootstrap_command(&self) -> Option<BootstrapCommand> {
161 match self {
162 HostAgentMode::Process { host, .. } => Some(host.manager().command().clone()),
163 HostAgentMode::Local(_) => None,
164 }
165 }
166}
167
/// Bookkeeping the host agent keeps for every proc it was asked to create.
#[derive(Debug)]
pub(crate) struct ProcCreationState {
    /// Rank supplied with the creation request.
    pub(crate) rank: usize,
    /// Host mesh name taken from the proc spec, if any; used to select procs
    /// during a selective drain.
    pub(crate) host_mesh_name: Option<crate::Name>,
    /// Outcome of the spawn: the proc id and its agent reference on success,
    /// or the host error on failure.
    pub(crate) created: Result<
        (
            hyperactor_reference::ProcId,
            hyperactor_reference::ActorRef<ProcAgent>,
        ),
        HostError,
    >,
}
180
/// Actor name under which the `HostAgent` is spawned on the host's system
/// proc.
pub const HOST_MESH_AGENT_ACTOR_NAME: &str = "host_agent";
183
/// Lifecycle state of a `HostAgent`. The host is only reachable in the
/// `Detached` and `Attached` states.
enum HostAgentState {
    /// Holds the host; this is the state `HostAgent::new` starts in and the
    /// state restored once a drain completes.
    Detached(HostAgentMode),
    /// Holds the host; entered from `Detached` when the first proc is
    /// recorded in `created`.
    Attached(HostAgentMode),
    /// The host has been handed to a `DrainWorker`; no host is held here.
    Draining,
    /// Set by the `ShutdownHost` handler; no host is held.
    Shutdown,
}
198
/// Message telling the host agent to re-check the status of the named proc
/// and answer any pending waiters whose threshold is now met.
#[derive(Debug, Serialize, Deserialize, Named)]
struct ProcStatusChanged {
    /// Name of the proc to re-check.
    name: Name,
}
207
/// Message a `DrainWorker` sends back to the host agent once it has finished
/// terminating children.
struct DrainComplete {
    /// The host being returned to the agent.
    host: HostAgentMode,
    /// Reply port of the original `DrainHost` request, acknowledged by the
    /// `DrainComplete` handler.
    ack: hyperactor_reference::PortRef<()>,
}
214
/// Child actor that terminates a host's children in its `init` and then
/// hands the host back through `done_notify`. It exports no handlers.
#[hyperactor::export(handlers = [])]
struct DrainWorker {
    /// Host to drain; taken (left as `None`) once the drain has run.
    host: Option<HostAgentMode>,
    /// Timeout passed to `terminate_children`.
    timeout: Duration,
    /// Concurrency limit passed to `terminate_children`.
    max_in_flight: usize,
    /// Reply port of the `DrainHost` request; forwarded in `DrainComplete`.
    ack: Option<hyperactor_reference::PortRef<()>>,
    /// Port on which the `DrainComplete` message is sent.
    done_notify: PortHandle<DrainComplete>,
}
227
228#[async_trait]
229impl Actor for DrainWorker {
230 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
231 if let Some(host) = self.host.as_mut() {
232 match host {
233 HostAgentMode::Process { host, .. } => {
234 host.terminate_children(
235 this,
236 self.timeout,
237 self.max_in_flight.clamp(1, 256),
238 "drain host",
239 )
240 .await;
241 }
242 HostAgentMode::Local(host) => {
243 host.terminate_children(this, self.timeout, self.max_in_flight, "drain host")
244 .await;
245 }
246 }
247 }
248
249 if let (Some(host), Some(ack)) = (self.host.take(), self.ack.take()) {
252 let _ = self.done_notify.send(this, DrainComplete { host, ack });
253 }
254
255 Ok(())
256 }
257}
258
259impl fmt::Debug for DrainWorker {
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 f.debug_struct("DrainWorker")
262 .field("timeout", &self.timeout)
263 .field("max_in_flight", &self.max_in_flight)
264 .finish()
265 }
266}
267
/// Actor that owns a host and creates, stops, queries and drains the procs
/// on it in response to the exported messages below.
#[hyperactor::export(
    handlers=[
        resource::CreateOrUpdate<ProcSpec>,
        resource::Stop,
        resource::GetState<ProcState>,
        resource::GetRankStatus { cast = true },
        resource::WaitRankStatus { cast = true },
        resource::List,
        ShutdownHost,
        DrainHost,
        SetClientConfig,
        ProcStatusChanged,
        PySpyDump,
        ConfigDump,
    ]
)]
pub struct HostAgent {
    /// Lifecycle state; holds the host while it is reachable.
    state: HostAgentState,
    /// Creation record for every proc requested so far, keyed by name.
    pub(crate) created: HashMap<Name, ProcCreationState>,
    /// Waiters parked by `WaitRankStatus`, keyed by proc name. Each entry is
    /// (minimum status to report, rank, reply port). A rank of `usize::MAX`
    /// is a placeholder for a proc that did not exist when the waiter was
    /// registered; it is filled in when the proc is created.
    pending_proc_waiters: HashMap<
        Name,
        Vec<(
            resource::Status,
            usize,
            hyperactor_reference::PortRef<crate::StatusOverlay>,
        )>,
    >,
    /// Names of procs for which a status watch bridge has been requested.
    watching: HashSet<Name>,
    /// Port for `ProcStatusChanged` self-notifications; set during `init`.
    proc_status_port: Option<PortHandle<ProcStatusChanged>>,
    /// Result of booting the agent on the local proc, initialized on the
    /// first `GetLocalProc` request.
    local_mesh_agent: OnceLock<anyhow::Result<ActorHandle<ProcAgent>>>,
    /// Handle returned by `host.serve()` for process-backed hosts; passed to
    /// `shutdown_tx` when the host shuts down.
    mailbox_handle: Option<MailboxServerHandle>,
}
313
314impl HostAgent {
315 pub fn new(host: HostAgentMode) -> Self {
317 Self {
318 state: HostAgentState::Detached(host),
319 created: HashMap::new(),
320 pending_proc_waiters: HashMap::new(),
321 watching: HashSet::new(),
322 proc_status_port: None,
323 local_mesh_agent: OnceLock::new(),
324 mailbox_handle: None,
325 }
326 }
327
328 fn min_proc_status(&self) -> resource::Status {
331 match &self.state {
332 HostAgentState::Detached(_) | HostAgentState::Attached(_) => resource::Status::Running,
333 HostAgentState::Draining => resource::Status::Stopping,
334 HostAgentState::Shutdown => resource::Status::Stopped,
335 }
336 }
337
338 fn host(&self) -> Option<&HostAgentMode> {
339 match &self.state {
340 HostAgentState::Detached(h) | HostAgentState::Attached(h) => Some(h),
341 _ => None,
342 }
343 }
344
345 fn host_mut(&mut self) -> Option<&mut HostAgentMode> {
346 match &mut self.state {
347 HostAgentState::Detached(h) | HostAgentState::Attached(h) => Some(h),
348 _ => None,
349 }
350 }
351
352 async fn drain(
358 &mut self,
359 cx: &Context<'_, Self>,
360 timeout: std::time::Duration,
361 max_in_flight: usize,
362 ) {
363 if let Some(host_mode) = self.host_mut() {
364 match host_mode {
365 HostAgentMode::Process { host, .. } => {
366 let summary = host
367 .terminate_children(cx, timeout, max_in_flight.clamp(1, 256), "stop host")
368 .await;
369 tracing::info!(?summary, "terminated children on host");
370 }
371 HostAgentMode::Local(host) => {
372 let summary = host
373 .terminate_children(cx, timeout, max_in_flight, "stop host")
374 .await;
375 tracing::info!(?summary, "terminated children on local host");
376 }
377 }
378 }
379 self.created.clear();
380 }
381
382 async fn drain_by_mesh_name(
386 &mut self,
387 cx: &Context<'_, Self>,
388 timeout: std::time::Duration,
389 filter: Option<&crate::Name>,
390 ) {
391 let matching_names: Vec<crate::Name> = self
392 .created
393 .iter()
394 .filter(|(_, state)| state.host_mesh_name.as_ref() == filter)
395 .map(|(name, _)| name.clone())
396 .collect();
397
398 if let Some(host_mode) = self.host() {
399 for name in &matching_names {
400 if let Some(ProcCreationState {
401 created: Ok((proc_id, _)),
402 ..
403 }) = self.created.get(name)
404 {
405 match host_mode {
406 HostAgentMode::Process { host, .. } => {
407 let _ = host
408 .terminate_proc(cx, proc_id, timeout, "selective drain")
409 .await;
410 }
411 HostAgentMode::Local(host) => {
412 let _ = host
413 .terminate_proc(cx, proc_id, timeout, "selective drain")
414 .await;
415 }
416 }
417 }
418 }
419 }
420
421 for name in &matching_names {
423 self.created.remove(name);
424 }
425
426 tracing::info!(
427 count = matching_names.len(),
428 filter = ?filter,
429 "selectively drained procs",
430 );
431 }
432
433 fn publish_introspect_properties(&self, cx: &Instance<Self>) {
437 let host = match self.host() {
438 Some(h) => h,
439 None => return, };
441
442 let addr = host.addr().to_string();
443 let mut children: Vec<hyperactor::introspect::IntrospectRef> = Vec::new();
444 let system_children: Vec<crate::introspect::NodeRef> = Vec::new(); children.push(hyperactor::introspect::IntrospectRef::Proc(
450 host.system_proc().proc_id().clone(),
451 ));
452 children.push(hyperactor::introspect::IntrospectRef::Proc(
453 host.local_proc().proc_id().clone(),
454 ));
455
456 for state in self.created.values() {
458 if let Ok((proc_id, _agent_ref)) = &state.created {
459 children.push(hyperactor::introspect::IntrospectRef::Proc(proc_id.clone()));
460 }
461 }
462
463 let num_procs = children.len();
464
465 let mut attrs = hyperactor_config::Attrs::new();
466 attrs.set(crate::introspect::NODE_TYPE, "host".to_string());
467 attrs.set(crate::introspect::ADDR, addr);
468 attrs.set(crate::introspect::NUM_PROCS, num_procs);
469 attrs.set(hyperactor::introspect::CHILDREN, children);
470 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
471 cx.publish_attrs(attrs);
472 }
473}
474
475#[async_trait]
476impl Actor for HostAgent {
477 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
478 this.bind::<Self>();
481 match self.host_mut().unwrap() {
482 HostAgentMode::Process { host, .. } => {
483 self.mailbox_handle = host.serve();
484 let (directory, file) = hyperactor_telemetry::log_file_path(
485 hyperactor_telemetry::env::Env::current(),
486 None,
487 )
488 .unwrap();
489 eprintln!(
490 "Monarch internal logs are being written to {}/{}.log; execution id {}",
491 directory,
492 file,
493 hyperactor_telemetry::env::execution_id(),
494 );
495 }
496 HostAgentMode::Local(host) => {
497 host.serve();
498 }
499 };
500 this.set_system();
501 self.publish_introspect_properties(this);
502
503 let host = self.host().expect("host present");
506 let system_proc = host.system_proc().clone();
507 let local_proc = host.local_proc().clone();
508 let self_id = this.self_id().clone();
509 this.set_query_child_handler(move |child_ref| {
510 use hyperactor::introspect::IntrospectResult;
511
512 let proc = match child_ref {
513 hyperactor::reference::Reference::Proc(proc_id) => {
514 if *proc_id == *system_proc.proc_id() {
515 Some((&system_proc, SERVICE_PROC_NAME))
516 } else if *proc_id == *local_proc.proc_id() {
517 Some((&local_proc, LOCAL_PROC_NAME))
518 } else {
519 None
520 }
521 }
522 _ => None,
523 };
524
525 match proc {
526 Some((proc, label)) => {
527 let all_keys = proc.all_instance_keys();
541 let mut actors: Vec<hyperactor::introspect::IntrospectRef> =
542 Vec::with_capacity(all_keys.len());
543 let mut system_actors: Vec<crate::introspect::NodeRef> = Vec::new();
544 for id in all_keys {
545 if proc.get_instance(&id).is_some_and(|cell| cell.is_system()) {
546 system_actors.push(crate::introspect::NodeRef::Actor(id.clone()));
547 }
548 actors.push(hyperactor::introspect::IntrospectRef::Actor(id));
549 }
550 let mut attrs = hyperactor_config::Attrs::new();
551 attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
552 attrs.set(crate::introspect::PROC_NAME, label.to_string());
553 attrs.set(crate::introspect::NUM_ACTORS, actors.len());
554 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_actors.clone());
555 let attrs_json =
556 serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
557
558 IntrospectResult {
559 identity: hyperactor::introspect::IntrospectRef::Proc(
560 proc.proc_id().clone(),
561 ),
562 attrs: attrs_json,
563 children: actors,
564 parent: Some(hyperactor::introspect::IntrospectRef::Actor(
565 self_id.clone(),
566 )),
567 as_of: std::time::SystemTime::now(),
568 }
569 }
570 None => {
571 let mut error_attrs = hyperactor_config::Attrs::new();
572 error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
573 error_attrs.set(
574 hyperactor::introspect::ERROR_MESSAGE,
575 format!("child {} not found", child_ref),
576 );
577 let identity = match child_ref {
578 hyperactor::reference::Reference::Proc(id) => {
579 hyperactor::introspect::IntrospectRef::Proc(id.clone())
580 }
581 hyperactor::reference::Reference::Actor(id) => {
582 hyperactor::introspect::IntrospectRef::Actor(id.clone())
583 }
584 hyperactor::reference::Reference::Port(id) => {
585 hyperactor::introspect::IntrospectRef::Actor(id.actor_id().clone())
586 }
587 };
588 IntrospectResult {
589 identity,
590 attrs: serde_json::to_string(&error_attrs)
591 .unwrap_or_else(|_| "{}".to_string()),
592 children: Vec::new(),
593 parent: None,
594 as_of: std::time::SystemTime::now(),
595 }
596 }
597 }
598 });
599
600 self.proc_status_port = Some(this.port::<ProcStatusChanged>());
601
602 Ok(())
603 }
604}
605
606impl fmt::Debug for HostAgent {
607 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
608 f.debug_struct("HostAgent")
609 .field("host", &"..")
610 .field("created", &self.created)
611 .finish()
612 }
613}
614
615#[async_trait]
616impl Handler<resource::CreateOrUpdate<ProcSpec>> for HostAgent {
617 #[tracing::instrument("HostAgent::CreateOrUpdate", level = "info", skip_all, fields(name=%create_or_update.name))]
618 async fn handle(
619 &mut self,
620 cx: &Context<Self>,
621 create_or_update: resource::CreateOrUpdate<ProcSpec>,
622 ) -> anyhow::Result<()> {
623 if self.created.contains_key(&create_or_update.name) {
624 return Ok(());
626 }
627
628 let host = match self.host_mut() {
629 Some(h) => h,
630 None => {
631 tracing::warn!(
632 name = %create_or_update.name,
633 "ignoring CreateOrUpdate: HostAgent has already shut down"
634 );
635 return Ok(());
636 }
637 };
638 let created = match host {
639 HostAgentMode::Process { host, .. } => {
640 host.spawn(
641 create_or_update.name.clone().to_string(),
642 BootstrapProcConfig {
643 create_rank: create_or_update.rank.unwrap(),
644 client_config_override: create_or_update
645 .spec
646 .client_config_override
647 .clone(),
648 proc_bind: create_or_update.spec.proc_bind.clone(),
649 bootstrap_command: create_or_update.spec.bootstrap_command.clone(),
650 },
651 )
652 .await
653 }
654 HostAgentMode::Local(host) => {
655 host.spawn(create_or_update.name.clone().to_string(), ())
656 .await
657 }
658 };
659
660 let rank = create_or_update.rank.unwrap();
661
662 if let Err(e) = &created {
663 tracing::error!("failed to spawn proc {}: {}", create_or_update.name, e);
664 }
665 let was_empty = self.created.is_empty();
666 self.created.insert(
667 create_or_update.name.clone(),
668 ProcCreationState {
669 rank,
670 host_mesh_name: create_or_update.spec.host_mesh_name.clone(),
671 created,
672 },
673 );
674
675 if was_empty {
677 if let HostAgentState::Detached(_) = &self.state {
678 let host = match std::mem::replace(&mut self.state, HostAgentState::Shutdown) {
679 HostAgentState::Detached(h) => h,
680 _ => unreachable!(),
681 };
682 self.state = HostAgentState::Attached(host);
683 }
684 }
685
686 let proc_id = self
692 .created
693 .get(&create_or_update.name)
694 .and_then(|s| s.created.as_ref().ok())
695 .map(|(pid, _)| pid.clone());
696
697 if let Some(waiters) = self.pending_proc_waiters.get_mut(&create_or_update.name) {
698 for (_, waiter_rank, _) in waiters.iter_mut() {
699 if *waiter_rank == usize::MAX {
700 *waiter_rank = rank;
701 }
702 }
703 }
704
705 if self
707 .pending_proc_waiters
708 .contains_key(&create_or_update.name)
709 {
710 if let Some(proc_id) = &proc_id {
711 self.start_watch_bridge(&create_or_update.name, proc_id)
712 .await;
713 }
714 self.notify_proc_status_changed(&create_or_update.name);
715 }
716
717 self.publish_introspect_properties(cx);
718 Ok(())
719 }
720}
721
722#[async_trait]
723impl Handler<resource::Stop> for HostAgent {
724 async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
725 tracing::info!(
726 name = "HostMeshAgentStatus",
727 proc_name = %message.name,
728 reason = %message.reason,
729 "stopping proc"
730 );
731 let host = match self.host() {
732 Some(h) => h,
733 None => {
734 tracing::debug!(
736 proc_name = %message.name,
737 "ignoring Stop: HostAgent has already shut down"
738 );
739 return Ok(());
740 }
741 };
742 let timeout = hyperactor_config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
743
744 if let Some(ProcCreationState {
745 created: Ok((proc_id, _)),
746 ..
747 }) = self.created.get(&message.name)
748 {
749 host.request_stop(cx, proc_id, timeout, &message.reason)
750 .await;
751 }
752
753 self.notify_proc_status_changed(&message.name);
755
756 self.publish_introspect_properties(cx);
757 Ok(())
758 }
759}
760
761#[async_trait]
762impl Handler<resource::GetRankStatus> for HostAgent {
763 async fn handle(
764 &mut self,
765 cx: &Context<Self>,
766 get_rank_status: resource::GetRankStatus,
767 ) -> anyhow::Result<()> {
768 use crate::StatusOverlay;
769 use crate::resource::Status;
770
771 let (rank, status) = match self.created.get(&get_rank_status.name) {
772 Some(ProcCreationState {
773 rank,
774 created: Ok((proc_id, _mesh_agent)),
775 ..
776 }) => {
777 let raw_status = match self.host() {
778 Some(host) => host.proc_status(proc_id).await.0,
779 None => resource::Status::Unknown,
780 };
781 (*rank, raw_status.clamp_min(self.min_proc_status()))
782 }
783 Some(ProcCreationState {
784 rank,
785 created: Err(e),
786 ..
787 }) => (*rank, Status::Failed(e.to_string())),
788 None => (usize::MAX, Status::NotExist),
789 };
790
791 let overlay = if rank == usize::MAX {
792 StatusOverlay::new()
793 } else {
794 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
795 .expect("valid single-run overlay")
796 };
797 let result = get_rank_status.reply.send(cx, overlay);
798 if let Err(e) = result {
802 tracing::warn!(
803 actor = %cx.self_id(),
804 "failed to send GetRankStatus reply to {} due to error: {}",
805 get_rank_status.reply.port_id().actor_id(),
806 e
807 );
808 }
809 Ok(())
810 }
811}
812
#[async_trait]
impl Handler<resource::WaitRankStatus> for HostAgent {
    /// Replies once the named proc's status is at least `msg.min_status`.
    ///
    /// If the threshold is already met (or the spawn failed) the reply is
    /// sent immediately; otherwise the request is parked in
    /// `pending_proc_waiters`.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        msg: resource::WaitRankStatus,
    ) -> anyhow::Result<()> {
        use crate::StatusOverlay;
        use crate::resource::Status;

        match self.created.get(&msg.name) {
            Some(ProcCreationState {
                rank,
                created: Ok((proc_id, _)),
                ..
            }) => {
                let rank = *rank;
                // With no host held, the proc is treated as stopped.
                let status = match self.host() {
                    Some(host) => host.proc_status(proc_id).await.0,
                    None => Status::Stopped,
                };

                if status >= msg.min_status {
                    // Threshold already met: reply now. Send errors are ignored.
                    let overlay = StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
                        .expect("valid single-run overlay");
                    let _ = msg.reply.send(cx, overlay);
                    return Ok(());
                }

                // Not there yet: park the waiter and make sure status changes
                // for this proc are being bridged back to this actor.
                self.pending_proc_waiters
                    .entry(msg.name.clone())
                    .or_default()
                    .push((msg.min_status, rank, msg.reply));

                // Clone so the borrow of `self.created` ends before the
                // `&mut self` call below.
                let proc_id = proc_id.clone();
                self.start_watch_bridge(&msg.name, &proc_id).await;
            }
            Some(ProcCreationState {
                rank,
                created: Err(e),
                ..
            }) => {
                // Spawn failed: report `Failed` immediately.
                let overlay = StatusOverlay::try_from_runs(vec![(
                    *rank..(*rank + 1),
                    Status::Failed(e.to_string()),
                )])
                .expect("valid single-run overlay");
                let _ = msg.reply.send(cx, overlay);
            }
            None => {
                // Proc not created yet: park the waiter with a placeholder
                // rank (`usize::MAX`) that is replaced when the proc is created.
                self.pending_proc_waiters
                    .entry(msg.name.clone())
                    .or_default()
                    .push((msg.min_status, usize::MAX, msg.reply));
            }
        }

        Ok(())
    }
}
879
880#[async_trait]
881impl Handler<ProcStatusChanged> for HostAgent {
882 async fn handle(&mut self, cx: &Context<Self>, msg: ProcStatusChanged) -> anyhow::Result<()> {
883 use crate::StatusOverlay;
884 use crate::resource::Status;
885
886 let status = match self.created.get(&msg.name) {
887 Some(ProcCreationState {
888 created: Ok((proc_id, _)),
889 ..
890 }) => match self.host() {
891 Some(host) => host.proc_status(proc_id).await.0,
892 None => Status::Stopped,
893 },
894 Some(ProcCreationState {
895 created: Err(_), ..
896 }) => {
897 return Ok(());
899 }
900 None => {
901 return Ok(());
903 }
904 };
905
906 let Some(waiters) = self.pending_proc_waiters.get_mut(&msg.name) else {
907 return Ok(());
908 };
909
910 let remaining = std::mem::take(waiters);
911 for (min_status, rank, reply) in remaining {
912 if status >= min_status {
913 let overlay =
914 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status.clone())])
915 .expect("valid single-run overlay");
916 let _ = reply.send(cx, overlay);
917 } else {
918 waiters.push((min_status, rank, reply));
919 }
920 }
921
922 if waiters.is_empty() {
923 self.pending_proc_waiters.remove(&msg.name);
924 }
925
926 Ok(())
927 }
928}
929
930impl HostAgent {
931 fn notify_proc_status_changed(&self, name: &Name) {
933 if let Some(port) = &self.proc_status_port {
934 let client = Instance::<()>::self_client();
935 let _ = port.send(client, ProcStatusChanged { name: name.clone() });
936 }
937 }
938
939 async fn start_watch_bridge(&mut self, name: &Name, proc_id: &hyperactor_reference::ProcId) {
942 if self.watching.contains(name) {
943 return;
944 }
945 self.watching.insert(name.clone());
946
947 let port = match &self.proc_status_port {
948 Some(p) => p.clone(),
949 None => return,
950 };
951
952 match self.host() {
953 Some(HostAgentMode::Process { host, .. }) => {
954 if let Some(rx) = host.manager().watch(proc_id).await {
955 start_proc_watch(port, rx, name.clone(), |s| s.clone().into());
956 }
957 }
958 Some(HostAgentMode::Local(host)) => {
959 if let Some(rx) = host.manager().watch(proc_id).await {
960 start_proc_watch(port, rx, name.clone(), |s| (*s).into());
961 }
962 }
963 None => {}
964 }
965 }
966}
967
968fn start_proc_watch<S>(
971 port: PortHandle<ProcStatusChanged>,
972 mut rx: tokio::sync::watch::Receiver<S>,
973 name: Name,
974 to_status: impl Fn(&S) -> resource::Status + Send + 'static,
975) where
976 S: Send + Sync + 'static,
977{
978 let client = Instance::<()>::self_client();
981 tokio::spawn(async move {
982 loop {
983 match rx.changed().await {
984 Ok(()) => {
985 let status = to_status(&*rx.borrow());
986 let terminated = status.is_terminated();
987 let _ = port.send(client, ProcStatusChanged { name: name.clone() });
988 if terminated {
989 return;
990 }
991 }
992 Err(_) => {
993 let _ = port.send(client, ProcStatusChanged { name: name.clone() });
994 return;
995 }
996 }
997 }
998 });
999}
1000
/// Request to shut the host down: remaining procs are drained, the request
/// is acknowledged, and the agent moves to its `Shutdown` state.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct ShutdownHost {
    /// Timeout passed to the drain of remaining procs.
    pub timeout: std::time::Duration,
    /// Concurrency limit passed to the drain of remaining procs.
    pub max_in_flight: usize,
    /// Port acknowledged once draining has finished.
    #[reply]
    pub ack: hyperactor::reference::PortRef<()>,
}
wirevalue::register_type!(ShutdownHost);
1013
/// Request to terminate procs on the host without shutting the host down.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct DrainHost {
    /// Timeout passed to proc termination.
    pub timeout: std::time::Duration,
    /// Concurrency limit used by a full drain; a selective drain does not
    /// use it.
    pub max_in_flight: usize,
    /// When set, only procs recorded with this host mesh name are drained;
    /// when `None`, the whole host is drained.
    pub host_mesh_name: Option<crate::Name>,
    /// Port acknowledged when the drain is done (or was a no-op).
    #[reply]
    pub ack: hyperactor::reference::PortRef<()>,
}
wirevalue::register_type!(DrainHost);
1030
1031#[async_trait]
1032impl Handler<DrainHost> for HostAgent {
1033 async fn handle(&mut self, cx: &Context<Self>, msg: DrainHost) -> anyhow::Result<()> {
1034 if msg.host_mesh_name.is_some() {
1035 self.drain_by_mesh_name(cx, msg.timeout, msg.host_mesh_name.as_ref())
1037 .await;
1038 msg.ack.send(cx, ())?;
1039 return Ok(());
1040 }
1041
1042 let host = match std::mem::replace(&mut self.state, HostAgentState::Draining) {
1044 HostAgentState::Attached(h) => h,
1045 other @ (HostAgentState::Detached(_) | HostAgentState::Draining) => {
1046 self.state = other;
1048 msg.ack.send(cx, ())?;
1049 return Ok(());
1050 }
1051 HostAgentState::Shutdown => {
1052 self.state = HostAgentState::Shutdown;
1053 msg.ack.send(cx, ())?;
1054 return Ok(());
1055 }
1056 };
1057
1058 let done_port = cx.port::<DrainComplete>();
1067
1068 cx.spawn_with_name(
1069 "drain_worker",
1070 DrainWorker {
1071 host: Some(host),
1072 timeout: msg.timeout,
1073 max_in_flight: msg.max_in_flight,
1074 ack: Some(msg.ack),
1075 done_notify: done_port,
1076 },
1077 )?;
1078
1079 Ok(())
1080 }
1081}
1082
1083#[async_trait]
1084impl Handler<DrainComplete> for HostAgent {
1085 async fn handle(&mut self, cx: &Context<Self>, msg: DrainComplete) -> anyhow::Result<()> {
1086 self.state = HostAgentState::Detached(msg.host);
1087 self.created.clear();
1088 msg.ack.send(cx, ())?;
1089 Ok(())
1090 }
1091}
1092
1093#[async_trait]
1094impl Handler<ShutdownHost> for HostAgent {
1095 async fn handle(&mut self, cx: &Context<Self>, msg: ShutdownHost) -> anyhow::Result<()> {
1096 if !self.created.is_empty() {
1103 self.drain(cx, msg.timeout, msg.max_in_flight).await;
1104 }
1105
1106 msg.ack.send(cx, ())?;
1109
1110 match std::mem::replace(&mut self.state, HostAgentState::Shutdown) {
1113 HostAgentState::Detached(HostAgentMode::Process {
1114 shutdown_tx: Some(tx),
1115 ..
1116 })
1117 | HostAgentState::Attached(HostAgentMode::Process {
1118 shutdown_tx: Some(tx),
1119 ..
1120 }) => {
1121 tracing::info!(
1122 proc_id = %cx.self_id().proc_id(),
1123 actor_id = %cx.self_id(),
1124 "host is shut down, sending mailbox handle to bootstrap for draining"
1125 );
1126 if let Some(handle) = self.mailbox_handle.take() {
1127 let _ = tx.send(handle);
1128 }
1129 }
1130 _ => {}
1131 }
1132
1133 Ok(())
1134 }
1135}
1136
/// State payload returned by `GetState<ProcState>` for a proc that was
/// spawned successfully.
#[derive(Debug, Clone, PartialEq, Eq, Named, Serialize, Deserialize)]
pub struct ProcState {
    /// Id of the proc.
    pub proc_id: hyperactor_reference::ProcId,
    /// Rank recorded when the proc was created.
    pub create_rank: usize,
    /// Reference to the proc's agent actor.
    pub mesh_agent: hyperactor_reference::ActorRef<ProcAgent>,
    /// Bootstrap command of a process-backed host; `None` for a local host
    /// or when no host is held.
    pub bootstrap_command: Option<BootstrapCommand>,
    /// Detailed bootstrap status when the host provides one.
    pub proc_status: Option<bootstrap::ProcStatus>,
}
wirevalue::register_type!(ProcState);
1146
1147#[async_trait]
1148impl Handler<resource::GetState<ProcState>> for HostAgent {
1149 async fn handle(
1150 &mut self,
1151 cx: &Context<Self>,
1152 get_state: resource::GetState<ProcState>,
1153 ) -> anyhow::Result<()> {
1154 let state = match self.created.get(&get_state.name) {
1155 Some(ProcCreationState {
1156 rank,
1157 created: Ok((proc_id, mesh_agent)),
1158 ..
1159 }) => {
1160 let (raw_status, proc_status, bootstrap_command) = match self.host() {
1161 Some(host) => {
1162 let (status, proc_status) = host.proc_status(proc_id).await;
1163 (status, proc_status, host.bootstrap_command())
1164 }
1165 None => (resource::Status::Unknown, None, None),
1166 };
1167 let status = raw_status.clamp_min(self.min_proc_status());
1168 resource::State {
1169 name: get_state.name.clone(),
1170 status,
1171 state: Some(ProcState {
1172 proc_id: proc_id.clone(),
1173 create_rank: *rank,
1174 mesh_agent: mesh_agent.clone(),
1175 bootstrap_command,
1176 proc_status,
1177 }),
1178 generation: 0,
1179 timestamp: std::time::SystemTime::now(),
1180 }
1181 }
1182 Some(ProcCreationState {
1183 created: Err(e), ..
1184 }) => resource::State {
1185 name: get_state.name.clone(),
1186 status: resource::Status::Failed(e.to_string()),
1187 state: None,
1188 generation: 0,
1189 timestamp: std::time::SystemTime::now(),
1190 },
1191 None => resource::State {
1192 name: get_state.name.clone(),
1193 status: resource::Status::NotExist,
1194 state: None,
1195 generation: 0,
1196 timestamp: std::time::SystemTime::now(),
1197 },
1198 };
1199
1200 let result = get_state.reply.send(cx, state);
1201 if let Err(e) = result {
1205 tracing::warn!(
1206 actor = %cx.self_id(),
1207 "failed to send GetState reply to {} due to error: {}",
1208 get_state.reply.port_id().actor_id(),
1209 e
1210 );
1211 }
1212 Ok(())
1213 }
1214}
1215
1216#[async_trait]
1217impl Handler<resource::List> for HostAgent {
1218 async fn handle(&mut self, cx: &Context<Self>, list: resource::List) -> anyhow::Result<()> {
1219 list.reply
1220 .send(cx, self.created.keys().cloned().collect())?;
1221 Ok(())
1222 }
1223}
1224
/// Request to install `attrs` as the `ClientOverride` layer of the global
/// configuration.
#[derive(Debug, Named, Handler, RefClient, HandleClient, Serialize, Deserialize)]
pub struct SetClientConfig {
    /// Attributes to install.
    pub attrs: Attrs,
    /// Port acknowledged after the attributes have been installed.
    #[reply]
    pub done: hyperactor_reference::PortRef<()>,
}
wirevalue::register_type!(SetClientConfig);
1242
1243#[async_trait]
1244impl Handler<SetClientConfig> for HostAgent {
1245 async fn handle(&mut self, cx: &Context<Self>, msg: SetClientConfig) -> anyhow::Result<()> {
1246 hyperactor_config::global::set(
1250 hyperactor_config::global::Source::ClientOverride,
1251 msg.attrs,
1252 );
1253 tracing::debug!("installed client config override on host agent");
1254 msg.done.send(cx, ())?;
1255 Ok(())
1256 }
1257}
1258
/// Request for the `ProcAgent` handle of the host's local proc. The reply
/// port is a `PortHandle` and the type is not serializable.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetLocalProc {
    /// Port on which the agent handle is returned.
    #[reply]
    pub proc_mesh_agent: PortHandle<ActorHandle<ProcAgent>>,
}
1272
1273#[async_trait]
1274impl Handler<GetLocalProc> for HostAgent {
1275 async fn handle(
1276 &mut self,
1277 cx: &Context<Self>,
1278 GetLocalProc { proc_mesh_agent }: GetLocalProc,
1279 ) -> anyhow::Result<()> {
1280 let host = self
1281 .host()
1282 .ok_or_else(|| anyhow::anyhow!("HostAgent has already shut down"))?;
1283 let agent = self
1284 .local_mesh_agent
1285 .get_or_init(|| ProcAgent::boot_v1(host.local_proc().clone(), None));
1286
1287 match agent {
1288 Err(e) => anyhow::bail!("error booting local proc: {}", e),
1289 Ok(agent) => proc_mesh_agent.send(cx, agent.clone())?,
1290 };
1291
1292 Ok(())
1293 }
1294}
1295
1296#[async_trait]
1297impl Handler<PySpyDump> for HostAgent {
1298 async fn handle(
1299 &mut self,
1300 cx: &Context<Self>,
1301 message: PySpyDump,
1302 ) -> Result<(), anyhow::Error> {
1303 PySpyWorker::spawn_and_forward(cx, message.opts, message.result)
1304 }
1305}
1306
1307#[async_trait]
1308impl Handler<ConfigDump> for HostAgent {
1309 async fn handle(
1310 &mut self,
1311 cx: &Context<Self>,
1312 message: ConfigDump,
1313 ) -> Result<(), anyhow::Error> {
1314 let entries = hyperactor_config::global::config_entries();
1315 if let Err(e) = message.result.send(cx, ConfigDumpResult { entries }) {
1318 tracing::warn!("HostAgent: ConfigDump reply undeliverable (caller timed out): {e}",);
1319 }
1320 Ok(())
1321 }
1322}
1323
/// Remotely spawnable actor that boots a host with its `HostAgent` and
/// reports the agent's reference back on `reply_port` during `init`.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers=[GetHostMeshAgent]
)]
pub(crate) struct HostMeshAgentProcMeshTrampoline {
    /// Handle to the host agent spawned when this actor was constructed.
    host_mesh_agent: ActorHandle<HostAgent>,
    /// Port on which the host agent's reference is sent during `init`.
    reply_port: hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
}
1337
1338#[async_trait]
1339impl Actor for HostMeshAgentProcMeshTrampoline {
1340 async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
1341 self.reply_port.send(this, self.host_mesh_agent.bind())?;
1342 Ok(())
1343 }
1344}
1345
1346#[async_trait]
1347impl hyperactor::RemoteSpawn for HostMeshAgentProcMeshTrampoline {
1348 type Params = (
1349 ChannelTransport,
1350 hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
1351 Option<BootstrapCommand>,
1352 bool, );
1354
1355 async fn new(
1356 (transport, reply_port, command, local): Self::Params,
1357 _environment: Flattrs,
1358 ) -> anyhow::Result<Self> {
1359 let host = if local {
1360 let spawn: ProcManagerSpawnFn =
1361 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
1362 let manager = LocalProcManager::new(spawn);
1363 let host = Host::new(manager, transport.any()).await?;
1364 HostAgentMode::Local(host)
1365 } else {
1366 let command = match command {
1367 Some(command) => command,
1368 None => BootstrapCommand::current()?,
1369 };
1370 tracing::info!("booting host with proc command {:?}", command);
1371 let manager = BootstrapProcManager::new(command).unwrap();
1372 let host = Host::new(manager, transport.any()).await?;
1373 HostAgentMode::Process {
1374 host,
1375 shutdown_tx: None,
1376 }
1377 };
1378
1379 let system_proc = host.system_proc().clone();
1380 let host_mesh_agent =
1381 system_proc.spawn(HOST_MESH_AGENT_ACTOR_NAME, HostAgent::new(host))?;
1382
1383 Ok(Self {
1384 host_mesh_agent,
1385 reply_port,
1386 })
1387 }
1388}
1389
/// Serializable request for a reference to the [`HostAgent`] held by a
/// [`HostMeshAgentProcMeshTrampoline`].
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient)]
pub struct GetHostMeshAgent {
    /// Reply port on which the trampoline sends the bound agent reference.
    #[reply]
    pub host_mesh_agent: hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
}
// Registers the message type with `wirevalue`.
wirevalue::register_type!(GetHostMeshAgent);
1396
1397#[async_trait]
1398impl Handler<GetHostMeshAgent> for HostMeshAgentProcMeshTrampoline {
1399 async fn handle(
1400 &mut self,
1401 cx: &Context<Self>,
1402 get_host_mesh_agent: GetHostMeshAgent,
1403 ) -> anyhow::Result<()> {
1404 get_host_mesh_agent
1405 .host_mesh_agent
1406 .send(cx, self.host_mesh_agent.bind())?;
1407 Ok(())
1408 }
1409}
1410
1411#[cfg(test)]
1412mod tests {
1413 use std::assert_matches::assert_matches;
1414
1415 use hyperactor::Proc;
1416 use hyperactor::channel::ChannelTransport;
1417
1418 use super::*;
1419 use crate::bootstrap::ProcStatus;
1420 use crate::resource::CreateOrUpdateClient;
1421 use crate::resource::GetStateClient;
1422 use crate::resource::StopClient;
1423 use crate::resource::WaitRankStatusClient;
1424
1425 #[tokio::test]
1426 #[cfg(fbcode_build)]
1427 async fn test_basic() {
1428 let host = Host::new(
1429 BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1430 ChannelTransport::Unix.any(),
1431 )
1432 .await
1433 .unwrap();
1434
1435 let host_addr = host.addr().clone();
1436 let system_proc = host.system_proc().clone();
1437 let host_agent = system_proc
1438 .spawn(
1439 HOST_MESH_AGENT_ACTOR_NAME,
1440 HostAgent::new(HostAgentMode::Process {
1441 host,
1442 shutdown_tx: None,
1443 }),
1444 )
1445 .unwrap();
1446
1447 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1448 let (client, _client_handle) = client_proc.instance("client").unwrap();
1449
1450 let name = Name::new("proc1").unwrap();
1451
1452 host_agent
1455 .create_or_update(
1456 &client,
1457 name.clone(),
1458 resource::Rank::new(0),
1459 ProcSpec::default(),
1460 )
1461 .await
1462 .unwrap();
1463 assert_matches!(
1464 host_agent.get_state(&client, name.clone()).await.unwrap(),
1465 resource::State {
1466 name: resource_name,
1467 status: resource::Status::Running,
1468 state: Some(ProcState {
1469 proc_id,
1471 mesh_agent,
1474 bootstrap_command,
1475 proc_status: Some(ProcStatus::Ready { started_at: _, addr: _, agent: proc_status_mesh_agent}),
1476 ..
1477 }),
1478 ..
1479 } if name == resource_name
1480 && proc_id == hyperactor_reference::ProcId::with_name(host_addr.clone(), name.to_string())
1481 && mesh_agent == hyperactor_reference::ActorRef::attest(hyperactor_reference::ProcId::with_name(host_addr.clone(), name.to_string()).actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0)) && bootstrap_command == Some(BootstrapCommand::test())
1482 && mesh_agent == proc_status_mesh_agent
1483 );
1484 }
1485
1486 #[tokio::test]
1488 #[cfg(fbcode_build)]
1489 async fn test_wait_rank_status_already_running() {
1490 let host = Host::new(
1491 BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1492 ChannelTransport::Unix.any(),
1493 )
1494 .await
1495 .unwrap();
1496
1497 let system_proc = host.system_proc().clone();
1498 let host_agent = system_proc
1499 .spawn(
1500 HOST_MESH_AGENT_ACTOR_NAME,
1501 HostAgent::new(HostAgentMode::Process {
1502 host,
1503 shutdown_tx: None,
1504 }),
1505 )
1506 .unwrap();
1507
1508 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1509 let (client, _client_handle) = client_proc.instance("client").unwrap();
1510
1511 let name = Name::new("proc1").unwrap();
1512 host_agent
1513 .create_or_update(
1514 &client,
1515 name.clone(),
1516 resource::Rank::new(0),
1517 ProcSpec::default(),
1518 )
1519 .await
1520 .unwrap();
1521
1522 let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1524 host_agent
1525 .wait_rank_status(&client, name, resource::Status::Running, port.bind())
1526 .await
1527 .unwrap();
1528
1529 let overlay = tokio::time::timeout(Duration::from_secs(5), rx.recv())
1530 .await
1531 .expect("reply timed out")
1532 .expect("reply channel closed");
1533 assert!(!overlay.is_empty(), "expected non-empty overlay");
1534 }
1535
1536 #[tokio::test]
1539 #[cfg(fbcode_build)]
1540 async fn test_wait_rank_status_stop() {
1541 let host = Host::new(
1542 BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1543 ChannelTransport::Unix.any(),
1544 )
1545 .await
1546 .unwrap();
1547
1548 let system_proc = host.system_proc().clone();
1549 let host_agent = system_proc
1550 .spawn(
1551 HOST_MESH_AGENT_ACTOR_NAME,
1552 HostAgent::new(HostAgentMode::Process {
1553 host,
1554 shutdown_tx: None,
1555 }),
1556 )
1557 .unwrap();
1558
1559 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1560 let (client, _client_handle) = client_proc.instance("client").unwrap();
1561
1562 let name = Name::new("proc1").unwrap();
1563 host_agent
1564 .create_or_update(
1565 &client,
1566 name.clone(),
1567 resource::Rank::new(0),
1568 ProcSpec::default(),
1569 )
1570 .await
1571 .unwrap();
1572
1573 let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1575 host_agent
1576 .wait_rank_status(
1577 &client,
1578 name.clone(),
1579 resource::Status::Stopped,
1580 port.bind(),
1581 )
1582 .await
1583 .unwrap();
1584
1585 host_agent
1587 .stop(&client, name, "test".to_string())
1588 .await
1589 .unwrap();
1590
1591 let overlay = tokio::time::timeout(Duration::from_secs(30), rx.recv())
1593 .await
1594 .expect("reply timed out — proc did not reach Stopped")
1595 .expect("reply channel closed");
1596 assert!(!overlay.is_empty(), "expected non-empty overlay");
1597 }
1598
1599 #[tokio::test]
1602 #[cfg(fbcode_build)]
1603 async fn test_wait_rank_status_before_proc_exists() {
1604 let host = Host::new(
1605 BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1606 ChannelTransport::Unix.any(),
1607 )
1608 .await
1609 .unwrap();
1610
1611 let system_proc = host.system_proc().clone();
1612 let host_agent = system_proc
1613 .spawn(
1614 HOST_MESH_AGENT_ACTOR_NAME,
1615 HostAgent::new(HostAgentMode::Process {
1616 host,
1617 shutdown_tx: None,
1618 }),
1619 )
1620 .unwrap();
1621
1622 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1623 let (client, _client_handle) = client_proc.instance("client").unwrap();
1624
1625 let name = Name::new("proc1").unwrap();
1626
1627 let (port, mut rx) = client.open_port::<crate::StatusOverlay>();
1629 host_agent
1630 .wait_rank_status(
1631 &client,
1632 name.clone(),
1633 resource::Status::Running,
1634 port.bind(),
1635 )
1636 .await
1637 .unwrap();
1638
1639 host_agent
1642 .create_or_update(&client, name, resource::Rank::new(0), ProcSpec::default())
1643 .await
1644 .unwrap();
1645
1646 let overlay = tokio::time::timeout(Duration::from_secs(10), rx.recv())
1647 .await
1648 .expect("reply timed out — waiter was not flushed after CreateOrUpdate")
1649 .expect("reply channel closed");
1650 assert!(!overlay.is_empty(), "expected non-empty overlay");
1651 }
1652
1653 #[tokio::test]
1656 #[cfg(fbcode_build)]
1657 async fn test_drain_scoped_to_host_mesh_name() {
1658 let host = Host::new(
1659 BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1660 ChannelTransport::Unix.any(),
1661 )
1662 .await
1663 .unwrap();
1664
1665 let system_proc = host.system_proc().clone();
1666 let host_agent = system_proc
1667 .spawn(
1668 HOST_MESH_AGENT_ACTOR_NAME,
1669 HostAgent::new(HostAgentMode::Process {
1670 host,
1671 shutdown_tx: None,
1672 }),
1673 )
1674 .unwrap();
1675
1676 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1677 let (client, _client_handle) = client_proc.instance("client").unwrap();
1678
1679 let mesh_a = crate::Name::new("mesh_a").unwrap();
1680 let mesh_b = crate::Name::new("mesh_b").unwrap();
1681 let proc_a = crate::Name::new("proc_a").unwrap();
1682 let proc_b = crate::Name::new("proc_b").unwrap();
1683
1684 let mut spec_a = ProcSpec::default();
1686 spec_a.host_mesh_name = Some(mesh_a.clone());
1687 host_agent
1688 .create_or_update(&client, proc_a.clone(), resource::Rank::new(0), spec_a)
1689 .await
1690 .unwrap();
1691
1692 let mut spec_b = ProcSpec::default();
1694 spec_b.host_mesh_name = Some(mesh_b.clone());
1695 host_agent
1696 .create_or_update(&client, proc_b.clone(), resource::Rank::new(1), spec_b)
1697 .await
1698 .unwrap();
1699
1700 assert_matches!(
1702 host_agent.get_state(&client, proc_a.clone()).await.unwrap(),
1703 resource::State {
1704 status: resource::Status::Running,
1705 ..
1706 }
1707 );
1708 assert_matches!(
1709 host_agent.get_state(&client, proc_b.clone()).await.unwrap(),
1710 resource::State {
1711 status: resource::Status::Running,
1712 ..
1713 }
1714 );
1715
1716 host_agent
1718 .drain_host(&client, Duration::from_secs(5), 16, Some(mesh_a.clone()))
1719 .await
1720 .unwrap();
1721
1722 assert_matches!(
1724 host_agent.get_state(&client, proc_a.clone()).await.unwrap(),
1725 resource::State {
1726 status: resource::Status::NotExist,
1727 ..
1728 }
1729 );
1730
1731 assert_matches!(
1733 host_agent.get_state(&client, proc_b.clone()).await.unwrap(),
1734 resource::State {
1735 status: resource::Status::Running,
1736 ..
1737 }
1738 );
1739 }
1740
1741 #[tokio::test]
1744 #[cfg(fbcode_build)]
1745 async fn test_drain_none_drains_all() {
1746 let host = Host::new(
1747 BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
1748 ChannelTransport::Unix.any(),
1749 )
1750 .await
1751 .unwrap();
1752
1753 let system_proc = host.system_proc().clone();
1754 let host_agent = system_proc
1755 .spawn(
1756 HOST_MESH_AGENT_ACTOR_NAME,
1757 HostAgent::new(HostAgentMode::Process {
1758 host,
1759 shutdown_tx: None,
1760 }),
1761 )
1762 .unwrap();
1763
1764 let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1765 let (client, _client_handle) = client_proc.instance("client").unwrap();
1766
1767 let mesh_a = crate::Name::new("mesh_a").unwrap();
1768 let mesh_b = crate::Name::new("mesh_b").unwrap();
1769 let proc_a = crate::Name::new("proc_a").unwrap();
1770 let proc_b = crate::Name::new("proc_b").unwrap();
1771
1772 let mut spec_a = ProcSpec::default();
1773 spec_a.host_mesh_name = Some(mesh_a);
1774 host_agent
1775 .create_or_update(&client, proc_a.clone(), resource::Rank::new(0), spec_a)
1776 .await
1777 .unwrap();
1778
1779 let mut spec_b = ProcSpec::default();
1780 spec_b.host_mesh_name = Some(mesh_b);
1781 host_agent
1782 .create_or_update(&client, proc_b.clone(), resource::Rank::new(1), spec_b)
1783 .await
1784 .unwrap();
1785
1786 host_agent
1788 .drain_host(&client, Duration::from_secs(5), 16, None)
1789 .await
1790 .unwrap();
1791
1792 assert_matches!(
1794 host_agent.get_state(&client, proc_a).await.unwrap(),
1795 resource::State {
1796 status: resource::Status::NotExist,
1797 ..
1798 }
1799 );
1800 assert_matches!(
1801 host_agent.get_state(&client, proc_b).await.unwrap(),
1802 resource::State {
1803 status: resource::Status::NotExist,
1804 ..
1805 }
1806 );
1807 }
1808}