1#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::fmt;
19use std::pin::Pin;
20use std::str::FromStr;
21use std::sync::OnceLock;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use hyperactor::Actor;
26use hyperactor::ActorHandle;
27use hyperactor::Context;
28use hyperactor::HandleClient;
29use hyperactor::Handler;
30use hyperactor::Instance;
31use hyperactor::PortHandle;
32use hyperactor::Proc;
33use hyperactor::RefClient;
34use hyperactor::channel::ChannelTransport;
35use hyperactor::context;
36use hyperactor::context::Mailbox as _;
37use hyperactor::host::Host;
38use hyperactor::host::HostError;
39use hyperactor::host::LOCAL_PROC_NAME;
40use hyperactor::host::LocalProcManager;
41use hyperactor::host::SERVICE_PROC_NAME;
42use hyperactor::mailbox::MailboxServerHandle;
43use hyperactor::mailbox::PortSender as _;
44use hyperactor::reference as hyperactor_reference;
45use hyperactor_config::Flattrs;
46use hyperactor_config::attrs::Attrs;
47use serde::Deserialize;
48use serde::Serialize;
49use tokio::time::Duration;
50use typeuri::Named;
51
52use crate::Name;
53use crate::bootstrap;
54use crate::bootstrap::BootstrapCommand;
55use crate::bootstrap::BootstrapProcConfig;
56use crate::bootstrap::BootstrapProcManager;
57use crate::mesh_admin::MeshAdminMessageClient;
58use crate::proc_agent::ProcAgent;
59use crate::resource;
60use crate::resource::ProcSpec;
61
/// Identifies a host by the [`hyperactor_reference::ActorId`] of its host agent.
///
/// Rendered as `host:<actor id>` by its `Display` impl and parsed back from
/// that form by its `FromStr` impl.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct HostId(pub hyperactor_reference::ActorId);
72
/// Prefix that distinguishes a serialized [`HostId`] from a bare actor id.
const HOST_ID_PREFIX: &str = "host:";
75
76impl fmt::Display for HostId {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 write!(f, "{HOST_ID_PREFIX}{}", self.0)
79 }
80}
81
82impl FromStr for HostId {
83 type Err = anyhow::Error;
84
85 fn from_str(s: &str) -> Result<Self, Self::Err> {
86 let inner = s
87 .strip_prefix(HOST_ID_PREFIX)
88 .ok_or_else(|| anyhow::anyhow!("not a host reference: {}", s))?;
89 let actor_id: hyperactor_reference::ActorId = inner
90 .parse()
91 .map_err(|e| anyhow::anyhow!("invalid actor id in host ref '{}': {}", s, e))?;
92 Ok(HostId(actor_id))
93 }
94}
95
/// Boxed future returned by a [`ProcManagerSpawnFn`]; resolves to the handle
/// of the [`ProcAgent`] booted on the proc.
pub(crate) type ProcManagerSpawnFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<ActorHandle<ProcAgent>>> + Send>>;
/// Spawn callback parameterizing the [`LocalProcManager`] used by local hosts:
/// it is handed a [`Proc`] and is expected to boot that proc's agent.
pub(crate) type ProcManagerSpawnFn = Box<dyn Fn(Proc) -> ProcManagerSpawnFuture + Send + Sync>;
99
/// The kind of host a [`HostAgent`] manages.
#[derive(EnumAsInner)]
pub enum HostAgentMode {
    /// Procs are launched with a [`BootstrapCommand`] by a [`BootstrapProcManager`].
    Process {
        host: Host<BootstrapProcManager>,
        /// When set, receives the host's mailbox server handle after the host
        /// has been shut down, so that the receiver can drain it.
        shutdown_tx: Option<tokio::sync::oneshot::Sender<MailboxServerHandle>>,
    },
    /// Procs are managed by a [`LocalProcManager`] that boots each proc's
    /// agent through a [`ProcManagerSpawnFn`].
    Local(Host<LocalProcManager<ProcManagerSpawnFn>>),
}
121
122impl HostAgentMode {
123 pub(crate) fn addr(&self) -> &hyperactor::channel::ChannelAddr {
124 #[allow(clippy::match_same_arms)]
125 match self {
126 HostAgentMode::Process { host, .. } => host.addr(),
127 HostAgentMode::Local(host) => host.addr(),
128 }
129 }
130
131 pub(crate) fn system_proc(&self) -> &Proc {
132 #[allow(clippy::match_same_arms)]
133 match self {
134 HostAgentMode::Process { host, .. } => host.system_proc(),
135 HostAgentMode::Local(host) => host.system_proc(),
136 }
137 }
138
139 pub(crate) fn local_proc(&self) -> &Proc {
140 #[allow(clippy::match_same_arms)]
141 match self {
142 HostAgentMode::Process { host, .. } => host.local_proc(),
143 HostAgentMode::Local(host) => host.local_proc(),
144 }
145 }
146
147 async fn request_stop(
151 &self,
152 cx: &impl context::Actor,
153 proc: &hyperactor_reference::ProcId,
154 timeout: Duration,
155 reason: &str,
156 ) {
157 match self {
158 HostAgentMode::Process { host, .. } => {
159 host.manager().request_stop(cx, proc, timeout, reason).await;
160 }
161 HostAgentMode::Local(host) => {
162 host.manager().request_stop(proc, timeout, reason).await;
163 }
164 }
165 }
166
167 async fn proc_status(
172 &self,
173 proc_id: &hyperactor_reference::ProcId,
174 ) -> (resource::Status, Option<bootstrap::ProcStatus>) {
175 match self {
176 HostAgentMode::Process { host, .. } => match host.manager().status(proc_id).await {
177 Some(proc_status) => (proc_status.clone().into(), Some(proc_status)),
178 None => (resource::Status::Unknown, None),
179 },
180 HostAgentMode::Local(host) => {
181 let status = match host.manager().local_proc_status(proc_id).await {
182 Some(hyperactor::host::LocalProcStatus::Stopping) => resource::Status::Stopping,
183 Some(hyperactor::host::LocalProcStatus::Stopped) => resource::Status::Stopped,
184 None => resource::Status::Running,
185 };
186 (status, None)
187 }
188 }
189 }
190
191 fn bootstrap_command(&self) -> Option<BootstrapCommand> {
193 match self {
194 HostAgentMode::Process { host, .. } => Some(host.manager().command().clone()),
195 HostAgentMode::Local(_) => None,
196 }
197 }
198}
199
/// Outcome of a `CreateOrUpdate` request for one proc, stored by name in
/// `HostAgent::created`.
#[derive(Debug)]
pub(crate) struct ProcCreationState {
    /// Rank taken from the creating request.
    pub(crate) rank: usize,
    /// The spawned proc's id and agent reference, or the error the host
    /// returned when spawning failed.
    pub(crate) created: Result<
        (
            hyperactor_reference::ProcId,
            hyperactor_reference::ActorRef<ProcAgent>,
        ),
        HostError,
    >,
}
211
/// Name under which the [`HostAgent`] actor is spawned on the host's system proc.
pub const HOST_MESH_AGENT_ACTOR_NAME: &str = "host_agent";
214
/// Actor that manages a single host.
///
/// It serves the host, spawns and stops procs on it, and reports their
/// state. Once a `ShutdownHost` has been handled, `host` is `None`.
#[hyperactor::export(
    handlers=[
        resource::CreateOrUpdate<ProcSpec>,
        resource::Stop,
        resource::GetState<ProcState>,
        resource::GetRankStatus { cast = true },
        resource::List,
        ShutdownHost,
        SpawnMeshAdmin,
        SetClientConfig,
    ]
)]
pub struct HostAgent {
    /// The managed host; `None` after shutdown.
    pub(crate) host: Option<HostAgentMode>,
    /// Every proc creation attempt (successful or not), keyed by proc name.
    pub(crate) created: HashMap<Name, ProcCreationState>,
    /// Agent of the host's local proc, booted on the first `GetLocalProc`.
    /// The cached result includes a boot failure.
    local_mesh_agent: OnceLock<anyhow::Result<ActorHandle<ProcAgent>>>,
    /// Handle returned by serving a `Process` host in `init`; passed to
    /// `shutdown_tx` when the host is shut down.
    mailbox_handle: Option<MailboxServerHandle>,
}
242
243impl HostAgent {
244 pub fn new(host: HostAgentMode) -> Self {
246 Self {
247 host: Some(host),
248 created: HashMap::new(),
249 local_mesh_agent: OnceLock::new(),
250 mailbox_handle: None,
251 }
252 }
253
254 fn publish_introspect_properties(&self, cx: &Instance<Self>) {
258 let host = match self.host.as_ref() {
259 Some(h) => h,
260 None => return, };
262
263 let addr = host.addr().to_string();
264 let mut children = Vec::new();
265 let system_children = Vec::new();
266
267 let sys_ref = host.system_proc().proc_id().to_string();
271 let local_ref = host.local_proc().proc_id().to_string();
272 children.push(sys_ref);
273 children.push(local_ref);
274
275 for state in self.created.values() {
277 if let Ok((proc_id, _agent_ref)) = &state.created {
278 children.push(proc_id.to_string());
279 }
280 }
281
282 let num_procs = children.len();
283
284 let mut attrs = hyperactor_config::Attrs::new();
285 attrs.set(crate::introspect::NODE_TYPE, "host".to_string());
286 attrs.set(crate::introspect::ADDR, addr);
287 attrs.set(crate::introspect::NUM_PROCS, num_procs);
288 attrs.set(hyperactor::introspect::CHILDREN, children);
289 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
290 cx.publish_attrs(attrs);
291 }
292}
293
294#[async_trait]
295impl Actor for HostAgent {
296 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
297 this.bind::<Self>();
300 match self.host.as_mut().unwrap() {
301 HostAgentMode::Process { host, .. } => {
302 self.mailbox_handle = host.serve();
303 let (directory, file) = hyperactor_telemetry::log_file_path(
304 hyperactor_telemetry::env::Env::current(),
305 None,
306 )
307 .unwrap();
308 eprintln!(
309 "Monarch internal logs are being written to {}/{}.log; execution id {}",
310 directory,
311 file,
312 hyperactor_telemetry::env::execution_id(),
313 );
314 }
315 HostAgentMode::Local(host) => {
316 host.serve();
317 }
318 };
319 this.set_system();
320 self.publish_introspect_properties(this);
321
322 let host = self.host.as_ref().expect("host present");
325 let system_proc = host.system_proc().clone();
326 let local_proc = host.local_proc().clone();
327 let self_id = this.self_id().clone();
328 this.set_query_child_handler(move |child_ref| {
329 use hyperactor::introspect::IntrospectResult;
330
331 let proc = match child_ref {
332 hyperactor::reference::Reference::Proc(proc_id) => {
333 if *proc_id == *system_proc.proc_id() {
334 Some((&system_proc, SERVICE_PROC_NAME))
335 } else if *proc_id == *local_proc.proc_id() {
336 Some((&local_proc, LOCAL_PROC_NAME))
337 } else {
338 None
339 }
340 }
341 _ => None,
342 };
343
344 match proc {
345 Some((proc, label)) => {
346 let all_keys = proc.all_instance_keys();
360 let mut actors = Vec::with_capacity(all_keys.len());
361 let mut system_actors = Vec::new();
362 for id in all_keys {
363 let ref_str = id.to_string();
364 if proc.get_instance(&id).is_some_and(|cell| cell.is_system()) {
365 system_actors.push(ref_str.clone());
366 }
367 actors.push(ref_str);
368 }
369 let mut attrs = hyperactor_config::Attrs::new();
371 attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
372 attrs.set(crate::introspect::PROC_NAME, label.to_string());
373 attrs.set(crate::introspect::NUM_ACTORS, actors.len());
374 attrs.set(crate::introspect::SYSTEM_CHILDREN, system_actors.clone());
375 let attrs_json =
376 serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
377
378 IntrospectResult {
379 identity: proc.proc_id().to_string(),
380 attrs: attrs_json,
381 children: actors,
382 parent: Some(HostId(self_id.clone()).to_string()),
383 as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
384 .to_string(),
385 }
386 }
387 None => {
388 let mut error_attrs = hyperactor_config::Attrs::new();
389 error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
390 error_attrs.set(
391 hyperactor::introspect::ERROR_MESSAGE,
392 format!("child {} not found", child_ref),
393 );
394 IntrospectResult {
395 identity: String::new(),
396 attrs: serde_json::to_string(&error_attrs)
397 .unwrap_or_else(|_| "{}".to_string()),
398 children: Vec::new(),
399 parent: None,
400 as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
401 .to_string(),
402 }
403 }
404 }
405 });
406
407 Ok(())
408 }
409}
410
411impl fmt::Debug for HostAgent {
412 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413 f.debug_struct("HostAgent")
414 .field("host", &"..")
415 .field("created", &self.created)
416 .finish()
417 }
418}
419
420#[async_trait]
421impl Handler<resource::CreateOrUpdate<ProcSpec>> for HostAgent {
422 #[tracing::instrument("HostAgent::CreateOrUpdate", level = "info", skip_all, fields(name=%create_or_update.name))]
423 async fn handle(
424 &mut self,
425 cx: &Context<Self>,
426 create_or_update: resource::CreateOrUpdate<ProcSpec>,
427 ) -> anyhow::Result<()> {
428 if self.created.contains_key(&create_or_update.name) {
429 return Ok(());
431 }
432
433 let host = self.host.as_mut().expect("host present");
434 let created = match host {
435 HostAgentMode::Process { host, .. } => {
436 host.spawn(
437 create_or_update.name.clone().to_string(),
438 BootstrapProcConfig {
439 create_rank: create_or_update.rank.unwrap(),
440 client_config_override: create_or_update
441 .spec
442 .client_config_override
443 .clone(),
444 },
445 )
446 .await
447 }
448 HostAgentMode::Local(host) => {
449 host.spawn(create_or_update.name.clone().to_string(), ())
450 .await
451 }
452 };
453
454 if let Err(e) = &created {
455 tracing::error!("failed to spawn proc {}: {}", create_or_update.name, e);
456 }
457 self.created.insert(
458 create_or_update.name.clone(),
459 ProcCreationState {
460 rank: create_or_update.rank.unwrap(),
461 created,
462 },
463 );
464
465 self.publish_introspect_properties(cx);
466 Ok(())
467 }
468}
469
470#[async_trait]
471impl Handler<resource::Stop> for HostAgent {
472 async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
473 tracing::info!(
474 name = "HostMeshAgentStatus",
475 proc_name = %message.name,
476 reason = %message.reason,
477 "stopping proc"
478 );
479 let host = self
480 .host
481 .as_ref()
482 .ok_or(anyhow::anyhow!("HostAgent has already shut down"))?;
483 let timeout = hyperactor_config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
484
485 if let Some(ProcCreationState {
486 created: Ok((proc_id, _)),
487 ..
488 }) = self.created.get(&message.name)
489 {
490 host.request_stop(cx, proc_id, timeout, &message.reason)
491 .await;
492 }
493
494 self.publish_introspect_properties(cx);
495 Ok(())
496 }
497}
498
499#[async_trait]
500impl Handler<resource::GetRankStatus> for HostAgent {
501 async fn handle(
502 &mut self,
503 cx: &Context<Self>,
504 get_rank_status: resource::GetRankStatus,
505 ) -> anyhow::Result<()> {
506 use crate::StatusOverlay;
507 use crate::resource::Status;
508
509 let (rank, status) = match self.created.get(&get_rank_status.name) {
510 Some(ProcCreationState {
511 rank,
512 created: Ok((proc_id, _mesh_agent)),
513 }) => {
514 let status = match self.host.as_ref() {
515 Some(host) => host.proc_status(proc_id).await.0,
516 None => Status::Stopped,
517 };
518 (*rank, status)
519 }
520 Some(ProcCreationState {
521 rank,
522 created: Err(e),
523 ..
524 }) => (*rank, Status::Failed(e.to_string())),
525 None => (usize::MAX, Status::NotExist),
526 };
527
528 let overlay = if rank == usize::MAX {
529 StatusOverlay::new()
530 } else {
531 StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
532 .expect("valid single-run overlay")
533 };
534 let result = get_rank_status.reply.send(cx, overlay);
535 if let Err(e) = result {
539 tracing::warn!(
540 actor = %cx.self_id(),
541 "failed to send GetRankStatus reply to {} due to error: {}",
542 get_rank_status.reply.port_id().actor_id(),
543 e
544 );
545 }
546 Ok(())
547 }
548}
549
/// Request to shut down the host managed by a [`HostAgent`], terminating its
/// child procs.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct ShutdownHost {
    /// Timeout forwarded to the host's `terminate_children`.
    pub timeout: std::time::Duration,
    /// Upper bound on in-flight terminations, forwarded (possibly clamped by
    /// the handler) to the host's `terminate_children`.
    pub max_in_flight: usize,
    /// Acknowledged at the start of handling, before any child is terminated.
    #[reply]
    pub ack: hyperactor::reference::PortRef<()>,
}
wirevalue::register_type!(ShutdownHost);
562
563#[async_trait]
564impl Handler<ShutdownHost> for HostAgent {
565 async fn handle(&mut self, cx: &Context<Self>, msg: ShutdownHost) -> anyhow::Result<()> {
566 let (return_handle, mut return_receiver) = cx.mailbox().open_port();
568 cx.mailbox()
569 .serialize_and_send(&msg.ack, (), return_handle)?;
570
571 let mut shutdown_tx = None;
572 if let Some(host_mode) = self.host.take() {
573 match host_mode {
574 HostAgentMode::Process {
575 host,
576 shutdown_tx: tx,
577 } => {
578 let summary = host
579 .terminate_children(
580 cx,
581 msg.timeout,
582 msg.max_in_flight.clamp(1, 256),
583 "shutdown host",
584 )
585 .await;
586 tracing::info!(?summary, "terminated children on host");
587 shutdown_tx = tx;
588 }
589 HostAgentMode::Local(host) => {
590 let summary = host
591 .terminate_children(cx, msg.timeout, msg.max_in_flight, "shutdown host")
592 .await;
593 tracing::info!(?summary, "terminated children on local host");
594 }
595 }
596 }
597
598 if return_receiver.recv().await.is_ok() {
600 tracing::warn!("failed to send ack");
601 }
602
603 let _ = self.host.take();
605
606 if let Some(tx) = shutdown_tx {
607 tracing::info!(
608 proc_id = %cx.self_id().proc_id(),
609 actor_id = %cx.self_id(),
610 "host is shut down, sending mailbox handle to bootstrap for draining"
611 );
612 if let Some(handle) = self.mailbox_handle.take() {
613 let _ = tx.send(handle);
614 }
615 }
616
617 Ok(())
618 }
619}
620
/// Detailed state of a proc created through a [`HostAgent`], returned by
/// `GetState`.
#[derive(Debug, Clone, PartialEq, Eq, Named, Serialize, Deserialize)]
pub struct ProcState {
    /// Id of the proc.
    pub proc_id: hyperactor_reference::ProcId,
    /// Rank the proc was created with.
    pub create_rank: usize,
    /// Reference to the proc's [`ProcAgent`].
    pub mesh_agent: hyperactor_reference::ActorRef<ProcAgent>,
    /// Bootstrap command of a process host. `None` for local hosts and after
    /// the host has shut down.
    pub bootstrap_command: Option<BootstrapCommand>,
    /// Detailed status from the bootstrap proc manager. `None` for local
    /// hosts, after shutdown, or when the manager has no status for the proc.
    pub proc_status: Option<bootstrap::ProcStatus>,
}
wirevalue::register_type!(ProcState);
630
631#[async_trait]
632impl Handler<resource::GetState<ProcState>> for HostAgent {
633 async fn handle(
634 &mut self,
635 cx: &Context<Self>,
636 get_state: resource::GetState<ProcState>,
637 ) -> anyhow::Result<()> {
638 let state = match self.created.get(&get_state.name) {
639 Some(ProcCreationState {
640 rank,
641 created: Ok((proc_id, mesh_agent)),
642 }) => {
643 let (status, proc_status, bootstrap_command) = match self.host.as_ref() {
644 Some(host) => {
645 let (status, proc_status) = host.proc_status(proc_id).await;
646 (status, proc_status, host.bootstrap_command())
647 }
648 None => (resource::Status::Stopped, None, None),
649 };
650 resource::State {
651 name: get_state.name.clone(),
652 status,
653 state: Some(ProcState {
654 proc_id: proc_id.clone(),
655 create_rank: *rank,
656 mesh_agent: mesh_agent.clone(),
657 bootstrap_command,
658 proc_status,
659 }),
660 }
661 }
662 Some(ProcCreationState {
663 created: Err(e), ..
664 }) => resource::State {
665 name: get_state.name.clone(),
666 status: resource::Status::Failed(e.to_string()),
667 state: None,
668 },
669 None => resource::State {
670 name: get_state.name.clone(),
671 status: resource::Status::NotExist,
672 state: None,
673 },
674 };
675
676 let result = get_state.reply.send(cx, state);
677 if let Err(e) = result {
681 tracing::warn!(
682 actor = %cx.self_id(),
683 "failed to send GetState reply to {} due to error: {}",
684 get_state.reply.port_id().actor_id(),
685 e
686 );
687 }
688 Ok(())
689 }
690}
691
692#[async_trait]
693impl Handler<resource::List> for HostAgent {
694 async fn handle(&mut self, cx: &Context<Self>, list: resource::List) -> anyhow::Result<()> {
695 list.reply
696 .send(cx, self.created.keys().cloned().collect())?;
697 Ok(())
698 }
699}
700
/// Request to spawn a mesh admin agent on the host's system proc and report
/// the address it exposes.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient, HandleClient)]
pub struct SpawnMeshAdmin {
    /// Host agents handed to the admin agent. NOTE(review): the `String`
    /// appears to be a per-host label; confirm against `MeshAdminAgent::new`.
    pub hosts: Vec<(String, hyperactor_reference::ActorRef<HostAgent>)>,

    /// Forwarded unchanged to `MeshAdminAgent::new`.
    pub root_client_actor_id: Option<hyperactor_reference::ActorId>,

    /// Forwarded unchanged to `MeshAdminAgent::new`. Presumably `None` lets
    /// the agent pick an address; confirm.
    pub admin_addr: Option<std::net::SocketAddr>,

    /// Receives the address reported by the spawned admin agent.
    #[reply]
    pub addr: hyperactor::reference::PortRef<String>,
}
wirevalue::register_type!(SpawnMeshAdmin);
727
728#[async_trait]
729impl Handler<SpawnMeshAdmin> for HostAgent {
730 async fn handle(&mut self, cx: &Context<Self>, msg: SpawnMeshAdmin) -> anyhow::Result<()> {
734 let proc = self
735 .host
736 .as_ref()
737 .ok_or_else(|| anyhow::anyhow!("host is not available"))?
738 .system_proc();
739
740 let agent_handle = proc.spawn(
741 crate::mesh_admin::MESH_ADMIN_ACTOR_NAME,
742 crate::mesh_admin::MeshAdminAgent::new(
743 msg.hosts,
744 msg.root_client_actor_id,
745 msg.admin_addr,
746 ),
747 )?;
748 let response = agent_handle.get_admin_addr(cx).await?;
749 let addr_str = response
750 .addr
751 .ok_or_else(|| anyhow::anyhow!("mesh admin agent did not report an address"))?;
752
753 msg.addr.send(cx, addr_str)?;
754 Ok(())
755 }
756}
757
/// Request to install client-supplied configuration on the host agent.
#[derive(Debug, Named, Handler, RefClient, HandleClient, Serialize, Deserialize)]
pub struct SetClientConfig {
    /// Attributes installed into the global config under
    /// `Source::ClientOverride`.
    pub attrs: Attrs,
    /// Replied to once the configuration has been installed.
    #[reply]
    pub done: hyperactor_reference::PortRef<()>,
}
wirevalue::register_type!(SetClientConfig);
775
776#[async_trait]
777impl Handler<SetClientConfig> for HostAgent {
778 async fn handle(&mut self, cx: &Context<Self>, msg: SetClientConfig) -> anyhow::Result<()> {
779 hyperactor_config::global::set(
783 hyperactor_config::global::Source::ClientOverride,
784 msg.attrs,
785 );
786 tracing::debug!("installed client config override on host agent");
787 msg.done.send(cx, ())?;
788 Ok(())
789 }
790}
791
/// Request for the [`ProcAgent`] of the host's local proc, booting it on first
/// use.
///
/// Not serializable (its reply is a [`PortHandle`]), so it is meant for
/// senders with a local handle to the agent.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetLocalProc {
    /// Receives a handle to the local proc's agent.
    #[reply]
    pub proc_mesh_agent: PortHandle<ActorHandle<ProcAgent>>,
}
805
806#[async_trait]
807impl Handler<GetLocalProc> for HostAgent {
808 async fn handle(
809 &mut self,
810 cx: &Context<Self>,
811 GetLocalProc { proc_mesh_agent }: GetLocalProc,
812 ) -> anyhow::Result<()> {
813 let agent = self.local_mesh_agent.get_or_init(|| {
814 ProcAgent::boot_v1(self.host.as_ref().unwrap().local_proc().clone(), None)
815 });
816
817 match agent {
818 Err(e) => anyhow::bail!("error booting local proc: {}", e),
819 Ok(agent) => proc_mesh_agent.send(cx, agent.clone())?,
820 };
821
822 Ok(())
823 }
824}
825
/// Remotely spawnable actor that creates a host together with its
/// [`HostAgent`].
///
/// It reports the agent's reference on `reply_port` when initialized, and
/// answers `GetHostMeshAgent` afterwards.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers=[GetHostMeshAgent]
)]
pub(crate) struct HostMeshAgentProcMeshTrampoline {
    /// Handle to the host agent spawned in `RemoteSpawn::new`.
    host_mesh_agent: ActorHandle<HostAgent>,
    /// Receives the host agent's reference during `init`.
    reply_port: hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
}
839
840#[async_trait]
841impl Actor for HostMeshAgentProcMeshTrampoline {
842 async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
843 self.reply_port.send(this, self.host_mesh_agent.bind())?;
844 Ok(())
845 }
846}
847
848#[async_trait]
849impl hyperactor::RemoteSpawn for HostMeshAgentProcMeshTrampoline {
850 type Params = (
851 ChannelTransport,
852 hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
853 Option<BootstrapCommand>,
854 bool, );
856
857 async fn new(
858 (transport, reply_port, command, local): Self::Params,
859 _environment: Flattrs,
860 ) -> anyhow::Result<Self> {
861 let host = if local {
862 let spawn: ProcManagerSpawnFn =
863 Box::new(|proc| Box::pin(std::future::ready(ProcAgent::boot_v1(proc, None))));
864 let manager = LocalProcManager::new(spawn);
865 let host = Host::new(manager, transport.any()).await?;
866 HostAgentMode::Local(host)
867 } else {
868 let command = match command {
869 Some(command) => command,
870 None => BootstrapCommand::current()?,
871 };
872 tracing::info!("booting host with proc command {:?}", command);
873 let manager = BootstrapProcManager::new(command).unwrap();
874 let host = Host::new(manager, transport.any()).await?;
875 HostAgentMode::Process {
876 host,
877 shutdown_tx: None,
878 }
879 };
880
881 let system_proc = host.system_proc().clone();
882 let host_mesh_agent =
883 system_proc.spawn(HOST_MESH_AGENT_ACTOR_NAME, HostAgent::new(host))?;
884
885 Ok(Self {
886 host_mesh_agent,
887 reply_port,
888 })
889 }
890}
891
/// Request for the reference of the [`HostAgent`] spawned by a
/// `HostMeshAgentProcMeshTrampoline`.
#[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient)]
pub struct GetHostMeshAgent {
    /// Receives the host agent's reference.
    #[reply]
    pub host_mesh_agent: hyperactor_reference::PortRef<hyperactor_reference::ActorRef<HostAgent>>,
}
wirevalue::register_type!(GetHostMeshAgent);
898
899#[async_trait]
900impl Handler<GetHostMeshAgent> for HostMeshAgentProcMeshTrampoline {
901 async fn handle(
902 &mut self,
903 cx: &Context<Self>,
904 get_host_mesh_agent: GetHostMeshAgent,
905 ) -> anyhow::Result<()> {
906 get_host_mesh_agent
907 .host_mesh_agent
908 .send(cx, self.host_mesh_agent.bind())?;
909 Ok(())
910 }
911}
912
/// Tests for `HostAgent` driven through its message clients.
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use hyperactor::Proc;
    use hyperactor::channel::ChannelTransport;

    use super::*;
    use crate::bootstrap::ProcStatus;
    use crate::resource::CreateOrUpdateClient;
    use crate::resource::GetStateClient;

    /// Spawns a `HostAgent` over a process host and creates one proc through
    /// it. Then checks that `GetState` reports the proc as `Running`, with
    /// the expected proc id, agent reference, and bootstrap command, and a
    /// `Ready` status whose agent matches.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_basic() {
        let host = Host::new(
            BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
            ChannelTransport::Unix.any(),
        )
        .await
        .unwrap();

        let host_addr = host.addr().clone();
        let system_proc = host.system_proc().clone();
        let host_agent = system_proc
            .spawn(
                HOST_MESH_AGENT_ACTOR_NAME,
                HostAgent::new(HostAgentMode::Process {
                    host,
                    shutdown_tx: None,
                }),
            )
            .unwrap();

        // A separate client proc/instance to send requests from.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        let name = Name::new("proc1").unwrap();

        host_agent
            .create_or_update(
                &client,
                name.clone(),
                resource::Rank::new(0),
                ProcSpec::default(),
            )
            .await
            .unwrap();
        // The proc id is derived from the host address and the proc name; its
        // agent is the `PROC_AGENT_ACTOR_NAME` actor (index 0) on that proc.
        assert_matches!(
            host_agent.get_state(&client, name.clone()).await.unwrap(),
            resource::State {
                name: resource_name,
                status: resource::Status::Running,
                state: Some(ProcState {
                    proc_id,
                    mesh_agent,
                    bootstrap_command,
                    proc_status: Some(ProcStatus::Ready { started_at: _, addr: _, agent: proc_status_mesh_agent}),
                    ..
                }),
            } if name == resource_name
                && proc_id == hyperactor_reference::ProcId::with_name(host_addr.clone(), name.to_string())
                && mesh_agent == hyperactor_reference::ActorRef::attest(hyperactor_reference::ProcId::with_name(host_addr.clone(), name.to_string()).actor_id(crate::proc_agent::PROC_AGENT_ACTOR_NAME, 0)) && bootstrap_command == Some(BootstrapCommand::test())
                && mesh_agent == proc_status_mesh_agent
        );
    }
}