1use std::collections::HashMap;
52use std::collections::HashSet;
53use std::fmt;
54use std::marker::PhantomData;
55use std::str::FromStr;
56use std::sync::Arc;
57use std::time::Duration;
58
59use async_trait::async_trait;
60use futures::Future;
61use futures::StreamExt;
62use futures::stream;
63use hyperactor::Actor;
64use hyperactor::ActorAddr;
65use hyperactor::ActorHandle;
66use hyperactor::ActorRef;
67use hyperactor::Addr;
68use hyperactor::AttachRequest;
69use hyperactor::BootstrapAssignment;
70use hyperactor::Host2Client;
71use hyperactor::PortHandle;
72use hyperactor::Proc;
73use hyperactor::ProcAddr;
74use hyperactor::actor::Binds;
75use hyperactor::actor::Referable;
76use hyperactor::channel;
77use hyperactor::channel::ChannelAddr;
78use hyperactor::channel::ChannelError;
79use hyperactor::channel::ChannelRx;
80use hyperactor::channel::ChannelTransport;
81use hyperactor::channel::Rx;
82use hyperactor::channel::ServerError;
83use hyperactor::channel::Tx;
84use hyperactor::context;
85use hyperactor::mailbox::BoxableMailboxSender;
86use hyperactor::mailbox::BoxedMailboxSender;
87use hyperactor::mailbox::DialMailboxRouter;
88use hyperactor::mailbox::IntoBoxedMailboxSender as _;
89use hyperactor::mailbox::MailboxClient;
90use hyperactor::mailbox::MailboxRouter;
91use hyperactor::mailbox::MailboxSender;
92use hyperactor::mailbox::MailboxServer;
93use hyperactor::mailbox::MailboxServerError;
94use hyperactor::mailbox::MailboxServerHandle;
95use hyperactor::mailbox::MessageEnvelope;
96use hyperactor::mailbox::Undeliverable;
97pub use hyperactor::proc::LEGACY_LOCAL_PROC_NAME as LOCAL_PROC_NAME;
106pub use hyperactor::proc::LEGACY_SERVICE_PROC_NAME as SERVICE_PROC_NAME;
110use tokio::process::Child;
111use tokio::process::Command;
112use tokio::sync::Mutex;
113use tokio::sync::watch;
114use tokio::task::JoinSet;
115
116use crate::mesh_id::ResourceId;
117
118#[derive(Clone)]
123struct AttachSender(channel::duplex::DuplexTx<Host2Client>);
124
125#[async_trait]
126impl MailboxSender for AttachSender {
127 fn post_unchecked(
128 &self,
129 envelope: MessageEnvelope,
130 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
131 ) {
132 self.0.post(Host2Client::Envelope(envelope));
133 }
134}
135
136#[derive(Debug, thiserror::Error)]
138pub enum HostError {
139 #[error(transparent)]
141 ChannelError(#[from] ChannelError),
142
143 #[error(transparent)]
145 ServerError(#[from] ServerError),
146
147 #[error("host is already serving")]
149 AlreadyServing,
150
151 #[error("proc '{0}' already exists")]
153 ProcExists(String),
154
155 #[error("proc '{0}' (command: {1}) failed to spawn process: {2}")]
157 ProcessSpawnFailure(ProcAddr, String, #[source] std::io::Error),
158
159 #[error("proc '{0}' failed to configure process: {1}")]
161 ProcessConfigurationFailure(ProcAddr, #[source] anyhow::Error),
162
163 #[error("failed to spawn agent on proc '{0}': {1}")]
165 AgentSpawnFailure(ProcAddr, #[source] anyhow::Error),
166
167 #[error("parameter '{0}' missing: {1}")]
169 MissingParameter(String, std::env::VarError),
170
171 #[error("parameter '{0}' invalid: {1}")]
173 InvalidParameter(String, anyhow::Error),
174}
175
/// A host owns a set of named procs spawned through a [`ProcManager`] `M`
/// and routes messages between them, two in-process procs (service and
/// local), and external peers connecting on the frontend address.
pub struct Host<M> {
    /// Names of procs spawned via `Host::spawn`; used to reject duplicates.
    procs: HashSet<String>,
    /// Address the host's frontend listens on.
    frontend_addr: ChannelAddr,
    /// Address handed to spawned procs as their forwarder address.
    backend_addr: ChannelAddr,
    /// In-process routes: the service and local procs' muxers, plus routes
    /// for attached duplex clients.
    router: MailboxRouter,
    /// Dialing routes to spawned procs, with an optional default sender.
    dial_router: DialMailboxRouter,
    /// Manager used to spawn child procs.
    manager: M,
    /// The legacy service pseudo-singleton proc hosted in this process.
    service_proc: Proc,
    /// The legacy local pseudo-singleton proc hosted in this process.
    local_proc: Proc,
    /// Frontend listener; `Some` until `Host::serve` takes it.
    frontend: Option<Frontend>,
}
194
/// The host's frontend listener, held until `Host::serve` starts serving it.
enum Frontend {
    /// Chosen when the transport supports duplex connections. Each accepted
    /// connection can also receive `Host2Client` messages back from the host
    /// (used for attach clients).
    Duplex(channel::duplex::DuplexServer<MessageEnvelope, Host2Client>),
    /// Chosen otherwise: a one-way receiver of message envelopes.
    Simplex(ChannelRx<MessageEnvelope>),
}
209
210impl<M: ProcManager> Host<M> {
211 pub async fn new(manager: M, addr: ChannelAddr) -> Result<Self, HostError> {
215 Self::new_with_default(manager, addr, None, None).await
216 }
217
218 #[hyperactor::instrument(fields(addr=addr.to_string()))]
223 pub async fn new_with_default(
224 manager: M,
225 addr: ChannelAddr,
226 default_sender: Option<BoxedMailboxSender>,
227 listener: Option<std::net::TcpListener>,
228 ) -> Result<Self, HostError> {
229 let (frontend_addr, frontend) = if addr.transport().supports_duplex() {
233 let server = channel::duplex::serve::<MessageEnvelope, Host2Client>(addr, listener)?;
234 let frontend_addr = server.addr().clone();
235 (frontend_addr, Frontend::Duplex(server))
236 } else {
237 let (frontend_addr, frontend_rx) = channel::serve_with_listener(addr, listener)?;
238 (frontend_addr, Frontend::Simplex(frontend_rx))
239 };
240 let dial_router = match default_sender {
244 Some(d) => DialMailboxRouter::new_with_default(d),
245 None => DialMailboxRouter::new(),
246 };
247 let router = MailboxRouter::new();
248
249 let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;
251
252 let combined = router.fallback(dial_router.boxed());
257 let service_proc =
258 Proc::legacy_service_pseudo_singleton(frontend_addr.clone(), combined.clone());
259 let local_proc = Proc::legacy_local_pseudo_singleton(frontend_addr.clone(), combined);
260 let service_proc_id = service_proc.proc_addr().clone();
261 let local_proc_id = local_proc.proc_addr().clone();
262
263 router.bind(
268 Addr::from(service_proc_id.clone()),
269 service_proc.muxer().clone(),
270 );
271 router.bind(
272 Addr::from(local_proc_id.clone()),
273 local_proc.muxer().clone(),
274 );
275
276 tracing::info!(
277 frontend_addr = frontend_addr.to_string(),
278 backend_addr = backend_addr.to_string(),
279 service_proc_id = service_proc_id.to_string(),
280 local_proc_id = local_proc_id.to_string(),
281 "serving host"
282 );
283
284 let host = Host {
285 procs: HashSet::new(),
286 frontend_addr,
287 backend_addr,
288 router,
289 dial_router,
290 manager,
291 service_proc,
292 local_proc,
293 frontend: Some(frontend),
294 };
295
296 let _backend_handle = host.forwarder().serve(backend_rx);
302
303 Ok(host)
304 }
305
306 pub fn serve(&mut self) -> Result<MailboxServerHandle, HostError> {
316 let frontend = self.frontend.take().ok_or(HostError::AlreadyServing)?;
317 let forwarder = self.forwarder();
318 Ok(match frontend {
319 Frontend::Duplex(server) => spawn_duplex_accept_loop(
320 server,
321 self.frontend_addr.clone(),
322 self.router.clone(),
323 forwarder,
324 ),
325 Frontend::Simplex(rx) => forwarder.serve(rx),
326 })
327 }
328
329 pub fn manager(&self) -> &M {
331 &self.manager
332 }
333
334 pub fn addr(&self) -> &ChannelAddr {
336 &self.frontend_addr
337 }
338
339 pub fn system_proc(&self) -> &Proc {
342 &self.service_proc
343 }
344
345 pub fn local_proc(&self) -> &Proc {
350 &self.local_proc
351 }
352
353 pub async fn spawn(
358 &mut self,
359 name: String,
360 config: M::Config,
361 ) -> Result<(ProcAddr, ActorRef<ManagerAgent<M>>), HostError> {
362 if self.procs.contains(&name) {
363 return Err(HostError::ProcExists(name));
364 }
365
366 let proc_id = ResourceId::proc_addr_from_name(self.frontend_addr.clone(), &name);
367 let handle = self
368 .manager
369 .spawn(proc_id.clone(), self.backend_addr.clone(), config)
370 .await?;
371
372 let to: Duration =
374 hyperactor_config::global::get(hyperactor::config::HOST_SPAWN_READY_TIMEOUT);
375 let ready = if to == Duration::from_secs(0) {
376 ReadyProc::ensure(&handle).await
377 } else {
378 match tokio::time::timeout(to, ReadyProc::ensure(&handle)).await {
379 Ok(result) => result,
380 Err(_elapsed) => Err(ReadyProcError::Timeout),
381 }
382 }
383 .map_err(|e| {
384 HostError::ProcessConfigurationFailure(proc_id.clone(), anyhow::anyhow!("{e:?}"))
385 })?;
386
387 self.dial_router
388 .bind(Addr::from(proc_id.clone()), ready.addr().clone());
389 self.procs.insert(name.clone());
390
391 Ok((proc_id, ready.agent_ref().clone()))
392 }
393
394 fn forwarder(&self) -> BoxedMailboxSender {
398 self.router.fallback(self.dial_router.boxed())
399 }
400}
401
402fn spawn_duplex_accept_loop(
407 server: channel::duplex::DuplexServer<MessageEnvelope, Host2Client>,
408 frontend_addr: ChannelAddr,
409 router: MailboxRouter,
410 forwarder: BoxedMailboxSender,
411) -> MailboxServerHandle {
412 let (stopped_tx, stopped_rx) = watch::channel(false);
413 let join_handle = tokio::spawn(async move {
414 duplex_accept_loop(server, frontend_addr, router, forwarder, stopped_rx).await;
415 Ok::<(), MailboxServerError>(())
416 });
417 MailboxServerHandle::from_parts(join_handle, stopped_tx)
418}
419
420async fn wait_for_stop(mut stopped_rx: watch::Receiver<bool>) {
425 let ok = stopped_rx.wait_for(|stopped| *stopped).await.is_ok();
426 if !ok {
427 std::future::pending::<()>().await;
428 }
429}
430
431struct PrependRx<R> {
435 first: Option<MessageEnvelope>,
436 inner: R,
437}
438
439#[async_trait]
440impl<R: channel::Rx<MessageEnvelope> + Send> channel::Rx<MessageEnvelope> for PrependRx<R> {
441 async fn recv(&mut self) -> Result<MessageEnvelope, ChannelError> {
442 if let Some(msg) = self.first.take() {
443 return Ok(msg);
444 }
445 self.inner.recv().await
446 }
447
448 fn addr(&self) -> ChannelAddr {
449 self.inner.addr()
450 }
451
452 async fn join(self) {
453 self.inner.join().await
454 }
455}
456
457async fn duplex_accept_loop(
491 mut duplex_server: channel::duplex::DuplexServer<MessageEnvelope, Host2Client>,
492 frontend_addr: ChannelAddr,
493 router: MailboxRouter,
494 forwarder: BoxedMailboxSender,
495 stopped_rx: watch::Receiver<bool>,
496) {
497 let mut tasks = JoinSet::new();
498 loop {
499 let accept = tokio::select! {
500 result = duplex_server.accept() => result,
501 () = wait_for_stop(stopped_rx.clone()) => break,
502 };
503 let (mut duplex_rx, duplex_tx) = match accept {
504 Ok(pair) => pair,
505 Err(e) => {
506 tracing::info!(
507 frontend_addr = frontend_addr.to_string(),
508 error = %e,
509 "duplex accept loop ended"
510 );
511 break;
512 }
513 };
514
515 let first_msg = match duplex_rx.recv().await {
517 Ok(msg) => msg,
518 Err(e) => {
519 tracing::info!(error = %e, "duplex connection closed before first message");
520 continue;
521 }
522 };
523
524 let is_attach = first_msg.deserialized::<AttachRequest>().is_ok();
525
526 if is_attach {
527 let proc_id = ProcAddr::instance(frontend_addr.clone(), "remote");
533
534 let assignment = BootstrapAssignment {
535 proc_id: proc_id.clone(),
536 };
537 tracing::info!(
538 proc_id = proc_id.to_string(),
539 "duplex accepted attach connection"
540 );
541 duplex_tx.post(Host2Client::Bootstrap(assignment));
542
543 router.bind(Addr::from(proc_id.clone()), AttachSender(duplex_tx));
544
545 let mut handle = forwarder.clone().serve(duplex_rx);
546 let cleanup_router = router.clone();
547 let conn_stop = stopped_rx.clone();
548 tasks.spawn(async move {
549 tokio::select! {
550 _ = &mut handle => {}
551 () = wait_for_stop(conn_stop) => {
552 handle.stop("host duplex cancel");
553 let _ = handle.await;
554 }
555 }
556 cleanup_router.unbind(&Addr::from(proc_id.clone()));
557 tracing::info!(
558 proc_id = proc_id.to_string(),
559 "attach connection closed, removed route"
560 );
561 });
562 } else {
563 let fwd = forwarder.clone();
570 let conn_stop = stopped_rx.clone();
571 tasks.spawn(async move {
572 let _keep_alive = duplex_tx;
573 let rx = PrependRx {
574 first: Some(first_msg),
575 inner: duplex_rx,
576 };
577 let mut handle = fwd.serve(rx);
578 tokio::select! {
579 _ = &mut handle => {}
580 () = wait_for_stop(conn_stop) => {
581 handle.stop("host frontend cancel");
582 let _ = handle.await;
583 }
584 }
585 });
586 }
587 }
588
589 while tasks.join_next().await.is_some() {}
590
591 duplex_server.join().await;
598}
599
/// Why a proc handle failed to report readiness (see `ProcHandle::ready`).
#[derive(Debug, Clone)]
pub enum ReadyError<TerminalStatus> {
    /// The proc is in a terminal state instead of being ready; carries that
    /// status.
    Terminal(TerminalStatus),
    /// The channel used to observe the proc closed before readiness was known.
    ChannelClosed,
}

/// Why [`ReadyProc::ensure`] (or the host's bounded wait around it) failed.
#[derive(Debug, Clone)]
pub enum ReadyProcError<TerminalStatus> {
    /// The host's readiness timeout elapsed first.
    Timeout,
    /// The handle's `ready()` call failed.
    Ready(ReadyError<TerminalStatus>),
    /// The handle was ready but reported no channel address.
    MissingAddr,
    /// The handle was ready but reported no agent reference.
    MissingAgentRef,
}

impl<T> From<ReadyError<T>> for ReadyProcError<T> {
    /// Wraps a readiness failure as [`ReadyProcError::Ready`].
    fn from(cause: ReadyError<T>) -> Self {
        Self::Ready(cause)
    }
}

/// Why waiting for a proc's terminal status failed (see `ProcHandle::wait`).
#[derive(Debug, Clone)]
pub enum WaitError {
    /// The channel used to observe the proc closed.
    ChannelClosed,
}
635
636#[derive(Debug)]
649pub enum TerminateError<TerminalStatus> {
650 Unsupported,
652 AlreadyTerminated(TerminalStatus),
655 ChannelClosed,
657 Io(anyhow::Error),
660}
661
662impl<T: fmt::Debug> fmt::Display for TerminateError<T> {
663 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
664 match self {
665 TerminateError::Unsupported => write!(f, "terminate/kill unsupported by manager"),
666 TerminateError::AlreadyTerminated(st) => {
667 write!(f, "proc already terminated (status: {st:?})")
668 }
669 TerminateError::ChannelClosed => {
670 write!(f, "lifecycle channel closed; cannot observe state")
671 }
672 TerminateError::Io(err) => write!(f, "I/O error during terminate/kill: {err}"),
673 }
674 }
675}
676
677impl<T: fmt::Debug> std::error::Error for TerminateError<T> {
678 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
679 match self {
680 TerminateError::Io(err) => Some(err.root_cause()),
681 _ => None,
682 }
683 }
684}
685
/// Outcome counts for a bulk termination (see `BulkTerminate::terminate_all`).
#[derive(Debug)]
pub struct TerminateSummary {
    /// Number of procs termination was attempted on.
    pub attempted: usize,
    /// Number of procs that terminated successfully.
    pub ok: usize,
    /// Number of procs whose termination failed.
    pub failed: usize,
}

impl fmt::Display for TerminateSummary {
    /// Renders as `attempted=<n> ok=<n> failed=<n>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            attempted,
            ok,
            failed,
        } = self;
        write!(f, "attempted={attempted} ok={ok} failed={failed}")
    }
}
723
/// Terminate a single proc by id.
#[async_trait::async_trait]
pub trait SingleTerminate: Send + Sync {
    /// Terminates `proc`, allowing up to `timeout` for it to shut down;
    /// `reason` is passed along for diagnostics.
    ///
    /// Returns a pair of actor-address lists taken from the proc's teardown.
    /// NOTE(review): presumably (actors stopped cleanly, actors that had to be
    /// aborted); confirm against `Proc::destroy_and_wait`. The local manager
    /// returns that call's result unchanged.
    ///
    /// # Errors
    /// Implementation-defined. The local manager errors if `proc` is not
    /// registered or teardown fails.
    async fn terminate_proc(
        &self,
        cx: &impl context::Actor,
        proc: &ProcAddr,
        timeout: std::time::Duration,
        reason: &str,
    ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error>;
}
755
/// Terminate every proc a manager is responsible for.
#[async_trait::async_trait]
pub trait BulkTerminate: Send + Sync {
    /// Terminates all procs, allowing each up to `timeout`.
    ///
    /// `max_in_flight` bounds how many terminations run concurrently (the
    /// local manager treats 0 as 1). `reason` is passed along for diagnostics.
    /// Failures are counted in the returned [`TerminateSummary`] rather than
    /// returned as an error.
    async fn terminate_all(
        &self,
        cx: &impl context::Actor,
        timeout: std::time::Duration,
        max_in_flight: usize,
        reason: &str,
    ) -> TerminateSummary;
}
797
798impl<M: ProcManager + BulkTerminate> Host<M> {
801 pub async fn terminate_children(
815 &mut self,
816 cx: &impl context::Actor,
817 timeout: Duration,
818 max_in_flight: usize,
819 reason: &str,
820 ) -> TerminateSummary {
821 let summary = self
822 .manager
823 .terminate_all(cx, timeout, max_in_flight, reason)
824 .await;
825 for name in self.procs.drain() {
828 let proc_ref = ResourceId::proc_addr_from_name(self.frontend_addr.clone(), &name);
829 self.dial_router.unbind(&Addr::from(proc_ref));
830 }
831 summary
832 }
833}
834
835#[async_trait::async_trait]
836impl<M: ProcManager + SingleTerminate> SingleTerminate for Host<M> {
837 async fn terminate_proc(
838 &self,
839 cx: &impl context::Actor,
840 proc: &ProcAddr,
841 timeout: Duration,
842 reason: &str,
843 ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error> {
844 self.manager.terminate_proc(cx, proc, timeout, reason).await
845 }
846}
847
848pub struct ReadyProc<'a, H: ProcHandle> {
856 handle: &'a H,
857 addr: ChannelAddr,
858 agent_ref: ActorRef<H::Agent>,
859}
860
861impl<'a, H: ProcHandle> ReadyProc<'a, H> {
862 pub async fn ensure(
870 handle: &'a H,
871 ) -> Result<ReadyProc<'a, H>, ReadyProcError<H::TerminalStatus>> {
872 handle.ready().await?;
873 let addr = handle.addr().ok_or(ReadyProcError::MissingAddr)?;
874 let agent_ref = handle.agent_ref().ok_or(ReadyProcError::MissingAgentRef)?;
875 Ok(ReadyProc {
876 handle,
877 addr,
878 agent_ref,
879 })
880 }
881
882 pub fn proc_addr(&self) -> &ProcAddr {
884 self.handle.proc_addr()
885 }
886
887 pub fn addr(&self) -> &ChannelAddr {
889 &self.addr
890 }
891
892 pub fn agent_ref(&self) -> &ActorRef<H::Agent> {
894 &self.agent_ref
895 }
896}
897
/// Handle to a proc spawned by a [`ProcManager`].
///
/// Exposes the proc's identity, serving address and agent, plus lifecycle
/// operations: waiting for readiness, waiting for a terminal status, and
/// graceful (`terminate`) or forceful (`kill`) shutdown.
#[async_trait]
pub trait ProcHandle: Clone + Send + Sync + 'static {
    /// The agent actor spawned in the proc; `Host::spawn` returns a
    /// reference to it.
    type Agent: Actor + Referable;

    /// Manager-specific description of how the proc ended (the in-crate
    /// local and process handles both use `()`).
    type TerminalStatus: std::fmt::Debug + Clone + Send + Sync + 'static;

    /// The proc's id.
    fn proc_addr(&self) -> &ProcAddr;

    /// Channel address the proc serves on, if known. After readiness the
    /// host binds this in its dial router.
    fn addr(&self) -> Option<ChannelAddr>;

    /// Reference to the proc's agent, if known.
    fn agent_ref(&self) -> Option<ActorRef<Self::Agent>>;

    /// Resolves once the proc is ready, or fails with a [`ReadyError`].
    async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>>;

    /// Waits for the proc's terminal status.
    async fn wait(&self) -> Result<Self::TerminalStatus, WaitError>;

    /// Requests graceful termination, allowing up to `timeout`; `reason` is
    /// passed along for diagnostics.
    async fn terminate(
        &self,
        cx: &impl context::Actor,
        timeout: Duration,
        reason: &str,
    ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;

    /// Stops the proc forcefully (the local implementation tears down with a
    /// zero timeout).
    async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>>;
}
1004
/// Spawns procs on behalf of a [`Host`].
#[async_trait]
pub trait ProcManager {
    /// Handle returned for each spawned proc.
    type Handle: ProcHandle;

    /// Per-spawn configuration passed through `Host::spawn`; defaults to `()`.
    type Config = ();

    /// Transport for the host's backend channel. `Host::new_with_default`
    /// serves the backend on `ChannelAddr::any(manager.transport())`.
    fn transport(&self) -> ChannelTransport;

    /// Spawns a proc with id `proc_id`. The proc's outbound messages are sent
    /// to `forwarder_addr`, which the host sets to its backend address.
    async fn spawn(
        &self,
        proc_id: ProcAddr,
        forwarder_addr: ChannelAddr,
        config: Self::Config,
    ) -> Result<Self::Handle, HostError>;
}
1036
/// The agent actor type of procs spawned by manager `M`.
pub type ManagerAgent<M> = <<M as ProcManager>::Handle as ProcHandle>::Agent;

/// Stop progress of a proc that `LocalProcManager::request_stop` has begun
/// stopping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalProcStatus {
    /// Stop requested; teardown is in progress.
    Stopping,
    /// Teardown has finished. `request_stop` publishes this once
    /// `destroy_and_wait` returns, whether or not it succeeded.
    Stopped,
}
1062
/// A [`ProcManager`] that runs every proc inside the current process.
pub struct LocalProcManager<S> {
    /// Live procs by id. Shared with every `LocalHandle` this manager hands out.
    procs: Arc<Mutex<HashMap<ProcAddr, Proc>>>,
    /// Status senders for procs that `request_stop` has started stopping.
    stopping: Arc<Mutex<HashMap<ProcAddr, tokio::sync::watch::Sender<LocalProcStatus>>>>,
    /// Callback that spawns the agent actor into a newly created proc.
    spawn: S,
}
1080
1081impl<S> LocalProcManager<S> {
1082 pub fn new(spawn: S) -> Self {
1085 Self {
1086 procs: Arc::new(Mutex::new(HashMap::new())),
1087 stopping: Arc::new(Mutex::new(HashMap::new())),
1088 spawn,
1089 }
1090 }
1091
1092 pub async fn request_stop(&self, proc: &ProcAddr, timeout: Duration, reason: &str) {
1099 {
1100 let guard = self.stopping.lock().await;
1101 if guard.contains_key(proc) {
1102 return;
1103 }
1104 }
1105
1106 let mut proc_handle = {
1107 let mut guard = self.procs.lock().await;
1108 match guard.remove(proc) {
1109 Some(p) => p,
1110 None => return,
1111 }
1112 };
1113
1114 let proc_ref: ProcAddr = proc_handle.proc_addr().clone();
1115 let (tx, _) = tokio::sync::watch::channel(LocalProcStatus::Stopping);
1116 self.stopping.lock().await.insert(proc_ref.clone(), tx);
1117
1118 let stopping = Arc::clone(&self.stopping);
1119 let reason = reason.to_string();
1120 tokio::spawn(async move {
1121 if let Err(e) = proc_handle.destroy_and_wait(timeout, &reason).await {
1122 tracing::warn!(error = %e, "request_stop(local): destroy_and_wait failed");
1123 }
1124 if let Some(tx) = stopping.lock().await.get(&proc_ref) {
1125 let _ = tx.send(LocalProcStatus::Stopped);
1126 }
1127 });
1128 }
1129
1130 pub async fn local_proc_status(&self, proc: &ProcAddr) -> Option<LocalProcStatus> {
1135 self.stopping.lock().await.get(proc).map(|tx| *tx.borrow())
1136 }
1137
1138 pub async fn watch(
1143 &self,
1144 proc: &ProcAddr,
1145 ) -> Option<tokio::sync::watch::Receiver<LocalProcStatus>> {
1146 self.stopping
1147 .lock()
1148 .await
1149 .get(proc)
1150 .map(|tx| tx.subscribe())
1151 }
1152}
1153
1154#[async_trait]
1155impl<S> BulkTerminate for LocalProcManager<S>
1156where
1157 S: Send + Sync,
1158{
1159 async fn terminate_all(
1160 &self,
1161 _cx: &impl context::Actor,
1162 timeout: std::time::Duration,
1163 max_in_flight: usize,
1164 reason: &str,
1165 ) -> TerminateSummary {
1166 let procs: Vec<Proc> = {
1169 let mut guard = self.procs.lock().await;
1170 guard.drain().map(|(_, v)| v).collect()
1171 };
1172
1173 let attempted = procs.len();
1174
1175 let results = stream::iter(procs.into_iter().map(|mut p| async move {
1176 match p.destroy_and_wait(timeout, reason).await {
1178 Ok(_) => true,
1179 Err(e) => {
1180 tracing::warn!(error=%e, "terminate_all(local): destroy_and_wait failed");
1181 false
1182 }
1183 }
1184 }))
1185 .buffer_unordered(max_in_flight.max(1))
1186 .collect::<Vec<bool>>()
1187 .await;
1188
1189 let ok = results.into_iter().filter(|b| *b).count();
1190
1191 TerminateSummary {
1192 attempted,
1193 ok,
1194 failed: attempted.saturating_sub(ok),
1195 }
1196 }
1197}
1198
1199#[async_trait::async_trait]
1200impl<S> SingleTerminate for LocalProcManager<S>
1201where
1202 S: Send + Sync,
1203{
1204 async fn terminate_proc(
1205 &self,
1206 _cx: &impl context::Actor,
1207 proc: &ProcAddr,
1208 timeout: std::time::Duration,
1209 reason: &str,
1210 ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error> {
1211 let procs: Option<Proc> = {
1213 let mut guard = self.procs.lock().await;
1214 guard.remove(proc)
1215 };
1216 if let Some(mut p) = procs {
1217 p.destroy_and_wait(timeout, reason).await
1218 } else {
1219 Err(anyhow::anyhow!("proc {} doesn't exist", proc))
1220 }
1221 }
1222}
1223
1224pub struct LocalHandle<A: Actor + Referable> {
1242 proc_id: ProcAddr,
1243 addr: ChannelAddr,
1244 agent_ref: ActorRef<A>,
1245 procs: Arc<Mutex<HashMap<ProcAddr, Proc>>>,
1246}
1247
1248impl<A: Actor + Referable> Clone for LocalHandle<A> {
1250 fn clone(&self) -> Self {
1251 Self {
1252 proc_id: self.proc_id.clone(),
1253 addr: self.addr.clone(),
1254 agent_ref: self.agent_ref.clone(),
1255 procs: Arc::clone(&self.procs),
1256 }
1257 }
1258}
1259
1260#[async_trait]
1261impl<A: Actor + Referable> ProcHandle for LocalHandle<A> {
1262 type Agent = A;
1265 type TerminalStatus = ();
1266
1267 fn proc_addr(&self) -> &ProcAddr {
1268 &self.proc_id
1269 }
1270
1271 fn addr(&self) -> Option<ChannelAddr> {
1272 Some(self.addr.clone())
1273 }
1274
1275 fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1276 Some(self.agent_ref.clone())
1277 }
1278
1279 async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1282 Ok(())
1283 }
1284 async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1288 Ok(())
1289 }
1290
1291 async fn terminate(
1292 &self,
1293 _cx: &impl context::Actor,
1294 timeout: Duration,
1295 reason: &str,
1296 ) -> Result<(), TerminateError<Self::TerminalStatus>> {
1297 let mut proc = {
1298 let guard = self.procs.lock().await;
1299 match guard.get(self.proc_addr()) {
1300 Some(p) => p.clone(),
1301 None => {
1302 return Err(TerminateError::AlreadyTerminated(()));
1305 }
1306 }
1307 };
1308
1309 let _ = proc
1312 .destroy_and_wait(timeout, reason)
1313 .await
1314 .map_err(TerminateError::Io)?;
1315
1316 Ok(())
1317 }
1318
1319 async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
1320 let mut proc = {
1323 let guard = self.procs.lock().await;
1324 match guard.get(self.proc_addr()) {
1325 Some(p) => p.clone(),
1326 None => return Err(TerminateError::AlreadyTerminated(())),
1327 }
1328 };
1329
1330 let _ = proc
1331 .destroy_and_wait(Duration::from_millis(0), "kill")
1332 .await
1333 .map_err(TerminateError::Io)?;
1334
1335 Ok(())
1336 }
1337}
1338
1339#[async_trait]
1354impl<A, S, F> ProcManager for LocalProcManager<S>
1355where
1356 A: Actor + Referable + Binds<A>,
1357 F: Future<Output = anyhow::Result<ActorHandle<A>>> + Send,
1358 S: Fn(Proc) -> F + Sync,
1359{
1360 type Handle = LocalHandle<A>;
1361
1362 fn transport(&self) -> ChannelTransport {
1363 ChannelTransport::Local
1364 }
1365
1366 #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1367 async fn spawn(
1368 &self,
1369 proc_id: ProcAddr,
1370 forwarder_addr: ChannelAddr,
1371 _config: (),
1372 ) -> Result<Self::Handle, HostError> {
1373 let transport = forwarder_addr.transport();
1374 let proc = Proc::configured(
1375 proc_id.clone(),
1376 MailboxClient::dial(forwarder_addr)?.into_boxed(),
1377 );
1378 let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
1379 self.procs
1380 .lock()
1381 .await
1382 .insert(proc_id.clone(), proc.clone());
1383 let _handle = proc.clone().serve(rx);
1384 let agent_handle = (self.spawn)(proc)
1385 .await
1386 .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1387
1388 Ok(LocalHandle {
1389 proc_id,
1390 addr: proc_addr,
1391 agent_ref: agent_handle.bind(),
1392 procs: Arc::clone(&self.procs),
1393 })
1394 }
1395}
1396
/// A [`ProcManager`] that runs each proc in its own OS process.
///
/// Each child is launched from `program` with the `HYPERACTOR_HOST_*`
/// environment variables set. The child reports its address and agent back
/// over a callback channel.
pub struct ProcessProcManager<A> {
    /// Executable launched for every spawned proc.
    program: std::path::PathBuf,
    /// Child processes by proc id. They are spawned with `kill_on_drop(true)`.
    children: Arc<Mutex<HashMap<ProcAddr, Child>>>,
    /// Marks the agent actor type without storing one.
    _phantom: PhantomData<A>,
}
1425
1426impl<A> ProcessProcManager<A> {
1427 pub fn new(program: std::path::PathBuf) -> Self {
1430 Self {
1431 program,
1432 children: Arc::new(Mutex::new(HashMap::new())),
1433 _phantom: PhantomData,
1434 }
1435 }
1436}
1437
impl<A> Drop for ProcessProcManager<A> {
    fn drop(&mut self) {
        // Intentionally empty. Children are spawned with `kill_on_drop(true)`
        // and owned by `children`, so dropping that map (when the last `Arc`
        // to it goes away) is what kills them.
    }
}
1445
1446#[derive(Debug)]
1472pub struct ProcessHandle<A: Actor + Referable> {
1473 proc_id: ProcAddr,
1474 addr: ChannelAddr,
1475 agent_ref: ActorRef<A>,
1476}
1477
1478impl<A: Actor + Referable> Clone for ProcessHandle<A> {
1480 fn clone(&self) -> Self {
1481 Self {
1482 proc_id: self.proc_id.clone(),
1483 addr: self.addr.clone(),
1484 agent_ref: self.agent_ref.clone(),
1485 }
1486 }
1487}
1488
1489#[async_trait]
1490impl<A: Actor + Referable> ProcHandle for ProcessHandle<A> {
1491 type Agent = A;
1494 type TerminalStatus = ();
1495
1496 fn proc_addr(&self) -> &ProcAddr {
1497 &self.proc_id
1498 }
1499
1500 fn addr(&self) -> Option<ChannelAddr> {
1501 Some(self.addr.clone())
1502 }
1503
1504 fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1505 Some(self.agent_ref.clone())
1506 }
1507
1508 async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1512 Ok(())
1513 }
1514 async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1517 Ok(())
1518 }
1519
1520 async fn terminate(
1521 &self,
1522 _cx: &impl context::Actor,
1523 _deadline: Duration,
1524 _reason: &str,
1525 ) -> Result<(), TerminateError<Self::TerminalStatus>> {
1526 Err(TerminateError::Unsupported)
1527 }
1528
1529 async fn kill(&self) -> Result<(), TerminateError<Self::TerminalStatus>> {
1530 Err(TerminateError::Unsupported)
1531 }
1532}
1533
1534#[async_trait]
1535impl<A> ProcManager for ProcessProcManager<A>
1536where
1537 A: Actor + Referable + Sync,
1540{
1541 type Handle = ProcessHandle<A>;
1542
1543 fn transport(&self) -> ChannelTransport {
1544 ChannelTransport::Unix
1545 }
1546
1547 #[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=forwarder_addr.to_string()))]
1548 async fn spawn(
1549 &self,
1550 proc_id: ProcAddr,
1551 forwarder_addr: ChannelAddr,
1552 _config: (),
1553 ) -> Result<Self::Handle, HostError> {
1554 let (callback_addr, mut callback_rx) =
1555 channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1556
1557 let mut cmd = Command::new(&self.program);
1558 cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
1559 cmd.env("HYPERACTOR_HOST_BACKEND_ADDR", forwarder_addr.to_string());
1560 cmd.env("HYPERACTOR_HOST_CALLBACK_ADDR", callback_addr.to_string());
1561
1562 cmd.kill_on_drop(true);
1573
1574 let child = cmd.spawn().map_err(|e| {
1575 HostError::ProcessSpawnFailure(proc_id.clone(), self.program.display().to_string(), e)
1576 })?;
1577
1578 {
1581 let mut children = self.children.lock().await;
1582 children.insert(proc_id.clone(), child);
1583 }
1584
1585 let (proc_addr, agent_ref) = callback_rx.recv().await?;
1587
1588 Ok(ProcessHandle {
1597 proc_id,
1598 addr: proc_addr,
1599 agent_ref,
1600 })
1601 }
1602}
1603
1604impl<A> ProcessProcManager<A>
1605where
1606 A: Actor + Referable + Binds<A>,
1609{
1610 pub async fn boot_proc<S, F>(spawn: S) -> Result<Proc, HostError>
1615 where
1616 S: FnOnce(Proc) -> F,
1617 F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1618 {
1619 let proc_id: ProcAddr = Self::parse_env("HYPERACTOR_HOST_PROC_ID")?;
1620 let backend_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_BACKEND_ADDR")?;
1621 let callback_addr: ChannelAddr = Self::parse_env("HYPERACTOR_HOST_CALLBACK_ADDR")?;
1622 spawn_proc(proc_id, backend_addr, callback_addr, spawn).await
1623 }
1624
1625 fn parse_env<T, E>(key: &str) -> Result<T, HostError>
1626 where
1627 T: FromStr<Err = E>,
1628 E: Into<anyhow::Error>,
1629 {
1630 std::env::var(key)
1631 .map_err(|e| HostError::MissingParameter(key.to_string(), e))?
1632 .parse()
1633 .map_err(|e: E| HostError::InvalidParameter(key.to_string(), e.into()))
1634 }
1635}
1636
1637#[hyperactor::instrument(fields(proc_id=proc_id.to_string(), addr=backend_addr.to_string(), callback_addr=callback_addr.to_string()))]
1642pub async fn spawn_proc<A, S, F>(
1643 proc_id: ProcAddr,
1644 backend_addr: ChannelAddr,
1645 callback_addr: ChannelAddr,
1646 spawn: S,
1647) -> Result<Proc, HostError>
1648where
1649 A: Actor + Referable + Binds<A>,
1652 S: FnOnce(Proc) -> F,
1653 F: Future<Output = Result<ActorHandle<A>, anyhow::Error>>,
1654{
1655 let backend_transport = backend_addr.transport();
1656 let proc = Proc::configured(
1657 proc_id.clone(),
1658 MailboxClient::dial(backend_addr)?.into_boxed(),
1659 );
1660
1661 let agent_handle = spawn(proc.clone())
1662 .await
1663 .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
1664
1665 let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
1668 proc.clone().serve(proc_rx);
1669 let agent_ref: ActorRef<A> = agent_handle.bind::<A>();
1670 channel::dial::<(ChannelAddr, ActorRef<A>)>(callback_addr)?
1671 .send((proc_addr, agent_ref))
1672 .await
1673 .map_err(ChannelError::from)?;
1674
1675 Ok(proc)
1676}
1677
1678pub mod testing {
1681 use async_trait::async_trait;
1682 use hyperactor::Actor;
1683 use hyperactor::ActorAddr;
1684 use hyperactor::Context;
1685 use hyperactor::Endpoint as _;
1686 use hyperactor::Handler;
1687 use hyperactor::OncePortRef;
1688 #[derive(Debug, Default)]
1691 #[hyperactor::export(handlers = [OncePortRef<ActorAddr>])]
1692 pub struct EchoActor;
1693
1694 impl Actor for EchoActor {}
1695
1696 #[async_trait]
1697 impl Handler<OncePortRef<ActorAddr>> for EchoActor {
1698 async fn handle(
1699 &mut self,
1700 cx: &Context<Self>,
1701 reply: OncePortRef<ActorAddr>,
1702 ) -> Result<(), anyhow::Error> {
1703 reply.post(cx, cx.self_addr().clone());
1704 Ok(())
1705 }
1706 }
1707}
1708
1709#[cfg(test)]
1710mod tests {
1711 use std::sync::Arc;
1712 use std::time::Duration;
1713
1714 use async_trait::async_trait;
1715 use hyperactor::Actor;
1716 use hyperactor::Context;
1717 use hyperactor::Endpoint as _;
1718 use hyperactor::Handler;
1719 use hyperactor::Instance;
1720 use hyperactor::OncePortRef;
1721 use hyperactor::PortRef;
1722 use hyperactor::channel::ChannelTransport;
1723 use hyperactor::channel::Tx;
1724 use hyperactor::channel::TxStatus;
1725 use hyperactor::context::Mailbox;
1726 use hyperactor::mailbox::Undeliverable;
1727 use hyperactor::port::Port;
1728 use tokio::sync::mpsc;
1729
1730 use super::testing::EchoActor;
1731 use super::*;
1732
1733 type SendTo = PortRef<String>;
1737
1738 #[derive(Debug)]
1741 #[hyperactor::export(handlers = [SendTo])]
1742 struct UndeliverableCollector {
1743 tx: mpsc::UnboundedSender<Undeliverable<MessageEnvelope>>,
1744 }
1745
1746 #[async_trait]
1747 impl Actor for UndeliverableCollector {
1748 async fn handle_undeliverable_message(
1749 &mut self,
1750 _cx: &Instance<Self>,
1751 message: Undeliverable<MessageEnvelope>,
1752 ) -> Result<(), anyhow::Error> {
1753 let _ = self.tx.send(message);
1754 Ok(())
1755 }
1756 }
1757
1758 #[async_trait]
1759 impl Handler<SendTo> for UndeliverableCollector {
1760 async fn handle(&mut self, cx: &Context<Self>, dest: SendTo) -> Result<(), anyhow::Error> {
1761 dest.post(cx, "into-the-void".to_string());
1762 Ok(())
1763 }
1764 }
1765
    /// End-to-end routing through a host backed by `LocalProcManager`.
    ///
    /// Checks three things:
    /// - spawned procs get name-derived ids;
    /// - messages flow between two spawned procs;
    /// - messages flow between a spawned proc and the host's system proc, in
    ///   both directions.
    #[tokio::test]
    async fn test_basic() {
        let proc_manager =
            LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
        // Keep a handle on the manager's registry so the test can look procs up.
        let procs = Arc::clone(&proc_manager.procs);
        let mut host = Host::new(proc_manager, ChannelAddr::any(ChannelTransport::Unix))
            .await
            .unwrap();

        // Proc ids are derived from the host address and the proc name.
        let (proc_id1, _ref) = host.spawn("proc1".to_string(), ()).await.unwrap();
        assert_eq!(
            proc_id1,
            ResourceId::proc_addr_from_name(host.addr().clone(), "proc1")
        );
        assert!(procs.lock().await.contains_key(&proc_id1));

        let (proc_id2, _ref) = host.spawn("proc2".to_string(), ()).await.unwrap();
        assert!(procs.lock().await.contains_key(&proc_id2));

        let proc1 = procs.lock().await.get(&proc_id1).unwrap().clone();
        let proc2 = procs.lock().await.get(&proc_id2).unwrap().clone();

        // One client actor in each spawned proc.
        let (instance1, _handle) = proc1.client("client").unwrap();
        let (instance2, _handle) = proc2.client("client").unwrap();

        let (port, mut rx) = instance1.mailbox().open_port();

        // proc2 -> proc1: instance2 posts to a port owned by instance1.
        port.bind().post(&instance2, "hello".to_string());
        assert_eq!(rx.recv().await.unwrap(), "hello".to_string());

        let (system_actor, _handle) = host.system_proc().client("test").unwrap();

        // System proc -> proc1.
        port.bind()
            .post(&system_actor, "hello from the system proc".to_string());
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the system proc".to_string()
        );

        // System proc -> itself, on a port owned by the system actor.
        let (port, mut rx) = system_actor.mailbox().open_port();
        port.bind()
            .post(&system_actor, "hello from the system".to_string());
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the system".to_string()
        );

        // proc1 -> system proc.
        port.bind()
            .post(&instance1, "hello from the instance1".to_string());
        assert_eq!(
            rx.recv().await.unwrap(),
            "hello from the instance1".to_string()
        );
    }
1825
1826 #[tokio::test]
1827 #[cfg_attr(not(fbcode_build), ignore)]
1829 async fn test_process_proc_manager() {
1830 hyperactor_telemetry::initialize_logging(hyperactor_telemetry::DefaultTelemetryClock {});
1831
1832 let process_manager = ProcessProcManager::<EchoActor>::new(
1834 buck_resources::get("monarch/hyperactor_mesh/host_bootstrap").unwrap(),
1835 );
1836 let mut host = Host::new(process_manager, ChannelAddr::any(ChannelTransport::Unix))
1837 .await
1838 .unwrap();
1839
1840 host.serve().unwrap();
1843
1844 assert!(matches!(host.addr().transport(), ChannelTransport::Unix));
1846 let (proc1, echo1) = host.spawn("proc1".to_string(), ()).await.unwrap();
1847 let (proc2, echo2) = host.spawn("proc2".to_string(), ()).await.unwrap();
1848 assert_eq!(echo1.actor_addr().proc_addr(), proc1);
1849 assert_eq!(echo2.actor_addr().proc_addr(), proc2);
1850
1851 let dup = host.spawn("proc1".to_string(), ()).await;
1853 assert!(matches!(dup, Err(HostError::ProcExists(_))));
1854
1855 let client = Proc::direct(
1862 ChannelAddr::any(host.addr().transport()),
1863 "test".to_string(),
1864 )
1865 .unwrap();
1866 let (client_inst, _h) = client.client("test").unwrap();
1867 let (port, rx) = client_inst.mailbox().open_once_port();
1868 echo1.post(&client_inst, port.bind());
1869 let id = tokio::time::timeout(Duration::from_secs(5), rx.recv())
1870 .await
1871 .unwrap()
1872 .unwrap();
1873 assert_eq!(id, *echo1.actor_addr());
1874
1875 let (port2, rx2) = client_inst.mailbox().open_once_port();
1883 echo2.post(&client_inst, port2.bind());
1884 let id2 = tokio::time::timeout(Duration::from_secs(5), rx2.recv())
1885 .await
1886 .unwrap()
1887 .unwrap();
1888 assert_eq!(id2, *echo2.actor_addr());
1889
1890 let (sys_inst, _h) = host.system_proc().client("sys-client").unwrap();
1899 let (port3, rx3) = client_inst.mailbox().open_once_port();
1900 echo1.post(&sys_inst, port3.bind());
1903 let id3 = tokio::time::timeout(Duration::from_secs(5), rx3.recv())
1904 .await
1905 .unwrap()
1906 .unwrap();
1907 assert_eq!(id3, *echo1.actor_addr());
1908 }
1909
1910 #[tokio::test]
1911 async fn local_ready_and_wait_are_immediate() {
1912 let addr = ChannelAddr::any(ChannelTransport::Local);
1914 let proc_ref = ResourceId::proc_addr_from_name(addr.clone(), "p");
1915 let actor_ref = proc_ref.actor_addr("host_agent");
1916 let agent_ref = ActorRef::<()>::attest(actor_ref);
1917 let h = LocalHandle::<()> {
1918 proc_id: proc_ref,
1919 addr,
1920 agent_ref,
1921 procs: Arc::new(Mutex::new(HashMap::new())),
1922 };
1923
1924 assert!(h.ready().await.is_ok());
1926
1927 assert!(h.wait().await.is_ok());
1929
1930 let (r1, r2) = tokio::join!(h.ready(), h.ready());
1932 assert!(r1.is_ok() && r2.is_ok());
1933 }
1934
/// Scripted outcome reported by `TestHandle::ready()`.
#[derive(Debug, Clone, Copy)]
enum ReadyMode {
    /// Succeed after sleeping for the given duration (no sleep when zero).
    OkAfter(Duration),
    /// Fail with `ReadyError::Terminal(())`.
    ErrTerminal,
    /// Fail with `ReadyError::ChannelClosed`.
    ErrChannelClosed,
}
1944
/// Scripted `ProcHandle` produced by `TestManager`. No proc is started behind
/// it; its answers come straight from these fields.
#[derive(Debug, Clone)]
struct TestHandle {
    /// Returned by `proc_addr()`.
    id: ProcAddr,
    /// Returned by `addr()` unless `omit_addr` is set.
    addr: ChannelAddr,
    /// Returned by `agent_ref()` unless `omit_agent` is set.
    agent: ActorRef<()>,
    /// Outcome reported by `ready()`.
    mode: ReadyMode,
    /// When true, `addr()` returns `None`.
    omit_addr: bool,
    /// When true, `agent_ref()` returns `None`.
    omit_agent: bool,
}
1954
1955 #[async_trait::async_trait]
1956 impl ProcHandle for TestHandle {
1957 type Agent = ();
1958 type TerminalStatus = ();
1959
1960 fn proc_addr(&self) -> &ProcAddr {
1961 &self.id
1962 }
1963
1964 fn addr(&self) -> Option<ChannelAddr> {
1965 if self.omit_addr {
1966 None
1967 } else {
1968 Some(self.addr.clone())
1969 }
1970 }
1971
1972 fn agent_ref(&self) -> Option<ActorRef<Self::Agent>> {
1973 if self.omit_agent {
1974 None
1975 } else {
1976 Some(self.agent.clone())
1977 }
1978 }
1979
1980 async fn ready(&self) -> Result<(), ReadyError<Self::TerminalStatus>> {
1981 match self.mode {
1982 ReadyMode::OkAfter(d) => {
1983 if !d.is_zero() {
1984 tokio::time::sleep(d).await;
1985 }
1986 Ok(())
1987 }
1988 ReadyMode::ErrTerminal => Err(ReadyError::Terminal(())),
1989 ReadyMode::ErrChannelClosed => Err(ReadyError::ChannelClosed),
1990 }
1991 }
1992 async fn wait(&self) -> Result<Self::TerminalStatus, WaitError> {
1993 Ok(())
1994 }
1995 async fn terminate(
1996 &self,
1997 _cx: &impl context::Actor,
1998 _timeout: Duration,
1999 _reason: &str,
2000 ) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
2001 Err(TerminateError::Unsupported)
2002 }
2003 async fn kill(&self) -> Result<Self::TerminalStatus, TerminateError<Self::TerminalStatus>> {
2004 Err(TerminateError::Unsupported)
2005 }
2006 }
2007
/// `ProcManager` that hands out `TestHandle`s configured from these fields,
/// letting tests script what `Host::spawn` observes.
#[derive(Debug, Clone)]
struct TestManager {
    /// Copied into every spawned handle's `mode`.
    mode: ReadyMode,
    /// Copied into every spawned handle's `omit_addr`.
    omit_addr: bool,
    /// Copied into every spawned handle's `omit_agent`.
    omit_agent: bool,
    /// Returned by `ProcManager::transport`.
    transport: ChannelTransport,
}
2015
2016 impl TestManager {
2017 fn local(mode: ReadyMode) -> Self {
2018 Self {
2019 mode,
2020 omit_addr: false,
2021 omit_agent: false,
2022 transport: ChannelTransport::Local,
2023 }
2024 }
2025 fn with_omissions(mut self, addr: bool, agent: bool) -> Self {
2026 self.omit_addr = addr;
2027 self.omit_agent = agent;
2028 self
2029 }
2030 }
2031
2032 #[async_trait::async_trait]
2033 impl ProcManager for TestManager {
2034 type Handle = TestHandle;
2035
2036 fn transport(&self) -> ChannelTransport {
2037 self.transport.clone()
2038 }
2039
2040 async fn spawn(
2041 &self,
2042 proc_id: ProcAddr,
2043 forwarder_addr: ChannelAddr,
2044 _config: (),
2045 ) -> Result<Self::Handle, HostError> {
2046 let agent = ActorRef::<()>::attest(proc_id.actor_addr("host_agent"));
2047 Ok(TestHandle {
2048 id: proc_id,
2049 addr: forwarder_addr,
2050 agent,
2051 mode: self.mode,
2052 omit_addr: self.omit_addr,
2053 omit_agent: self.omit_agent,
2054 })
2055 }
2056 }
2057
2058 #[tokio::test]
2059 async fn host_spawn_times_out_when_configured() {
2060 let cfg = hyperactor_config::global::lock();
2061 let _g = cfg.override_key(
2062 hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
2063 Duration::from_millis(10),
2064 );
2065
2066 let mut host = Host::new(
2067 TestManager::local(ReadyMode::OkAfter(Duration::from_millis(50))),
2068 ChannelAddr::any(ChannelTransport::Local),
2069 )
2070 .await
2071 .unwrap();
2072
2073 let err = host.spawn("t".into(), ()).await.expect_err("must time out");
2074 assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2075 }
2076
2077 #[tokio::test]
2078 async fn host_spawn_timeout_zero_disables_and_succeeds() {
2079 let cfg = hyperactor_config::global::lock();
2080 let _g = cfg.override_key(
2081 hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
2082 Duration::from_secs(0),
2083 );
2084
2085 let mut host = Host::new(
2086 TestManager::local(ReadyMode::OkAfter(Duration::from_millis(20))),
2087 ChannelAddr::any(ChannelTransport::Local),
2088 )
2089 .await
2090 .unwrap();
2091
2092 let (pid, agent) = host.spawn("ok".into(), ()).await.expect("must succeed");
2093 assert_eq!(agent.actor_addr().proc_addr(), pid);
2094 assert!(host.procs.contains("ok"));
2095 }
2096
2097 #[tokio::test]
2098 async fn host_spawn_maps_channel_closed_ready_error_to_config_failure() {
2099 let mut host = Host::new(
2100 TestManager::local(ReadyMode::ErrChannelClosed),
2101 ChannelAddr::any(ChannelTransport::Local),
2102 )
2103 .await
2104 .unwrap();
2105
2106 let err = host.spawn("p".into(), ()).await.expect_err("must fail");
2107 assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2108 }
2109
2110 #[tokio::test]
2111 async fn host_spawn_maps_terminal_ready_error_to_config_failure() {
2112 let mut host = Host::new(
2113 TestManager::local(ReadyMode::ErrTerminal),
2114 ChannelAddr::any(ChannelTransport::Local),
2115 )
2116 .await
2117 .unwrap();
2118
2119 let err = host.spawn("p".into(), ()).await.expect_err("must fail");
2120 assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2121 }
2122
2123 #[tokio::test]
2124 async fn host_spawn_fails_if_ready_but_missing_addr() {
2125 let mut host = Host::new(
2126 TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(true, false),
2127 ChannelAddr::any(ChannelTransport::Local),
2128 )
2129 .await
2130 .unwrap();
2131
2132 let err = host
2133 .spawn("no-addr".into(), ())
2134 .await
2135 .expect_err("must fail");
2136 assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2137 }
2138
2139 #[tokio::test]
2140 async fn host_spawn_fails_if_ready_but_missing_agent() {
2141 let mut host = Host::new(
2142 TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(false, true),
2143 ChannelAddr::any(ChannelTransport::Local),
2144 )
2145 .await
2146 .unwrap();
2147
2148 let err = host
2149 .spawn("no-agent".into(), ())
2150 .await
2151 .expect_err("must fail");
2152 assert!(matches!(err, HostError::ProcessConfigurationFailure(_, _)));
2153 }
2154
2155 #[tokio::test]
2156 async fn test_duplex_remote_proc() {
2157 let proc_manager =
2159 LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
2160 let mut host = Host::new_with_default(
2161 proc_manager,
2162 ChannelAddr::any(ChannelTransport::Unix),
2163 None,
2164 None,
2165 )
2166 .await
2167 .unwrap();
2168 host.serve().unwrap();
2169
2170 let remote_proc = Proc::attach_to_host(host.addr().clone()).await.unwrap();
2171 assert_eq!(remote_proc.proc_addr().addr(), host.addr());
2172
2173 let (system_inst, _h) = host.system_proc().client("test-sender").unwrap();
2176 let (remote_inst, _rh) = remote_proc.client("remote-client").unwrap();
2177
2178 let (remote_port, mut remote_rx) = remote_inst.mailbox().open_port();
2179 let remote_port = remote_port.bind();
2180
2181 remote_port.post(&system_inst, "hello-to-remote".to_string());
2182
2183 let arrived: String = tokio::time::timeout(Duration::from_secs(5), remote_rx.recv())
2184 .await
2185 .expect("timed out waiting for message on remote rx")
2186 .expect("recv failed");
2187 assert_eq!(arrived, "hello-to-remote");
2188
2189 let (host_port, mut host_rx) = system_inst.mailbox().open_port();
2192 let host_port = host_port.bind();
2193
2194 host_port.post(&remote_inst, "hello-from-remote".to_string());
2195
2196 let arrived: String = tokio::time::timeout(Duration::from_secs(5), host_rx.recv())
2197 .await
2198 .expect("timed out waiting for inbound message")
2199 .expect("recv failed");
2200 assert_eq!(arrived, "hello-from-remote");
2201 }
2202
2203 #[tokio::test]
2204 async fn test_duplex_undeliverable_from_client() {
2205 let proc_manager =
2210 LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
2211 let mut host = Host::new_with_default(
2212 proc_manager,
2213 ChannelAddr::any(ChannelTransport::Unix),
2214 None,
2215 None,
2216 )
2217 .await
2218 .unwrap();
2219 host.serve().unwrap();
2220
2221 let remote_proc = Proc::attach_to_host(host.addr().clone()).await.unwrap();
2222
2223 let (undlv_tx, mut undlv_rx) = mpsc::unbounded_channel();
2225 let handle = remote_proc
2226 .spawn("collector", UndeliverableCollector { tx: undlv_tx })
2227 .unwrap();
2228 let collector_ref = handle.bind::<UndeliverableCollector>();
2229
2230 let bogus_actor = host.system_proc().proc_addr().actor_addr("no-such-actor");
2233 let bogus_port = bogus_actor.port_addr(Port::from(0u64));
2234 let bogus_dest = PortRef::<String>::attest(bogus_port);
2235
2236 let (trigger_inst, _h) = remote_proc.client("trigger").unwrap();
2237 collector_ref
2238 .port::<SendTo>()
2239 .post(&trigger_inst, bogus_dest);
2240
2241 let undeliverable = tokio::time::timeout(Duration::from_secs(5), undlv_rx.recv())
2242 .await
2243 .expect("timed out waiting for undeliverable")
2244 .expect("channel closed");
2245
2246 assert_eq!(
2247 undeliverable
2248 .into_message()
2249 .expect("expected returned envelope")
2250 .dest()
2251 .actor_id(),
2252 bogus_actor.id()
2253 );
2254 }
2255
2256 #[tokio::test]
2257 async fn test_duplex_undeliverable_from_host() {
2258 let proc_manager =
2263 LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
2264 let mut host = Host::new_with_default(
2265 proc_manager,
2266 ChannelAddr::any(ChannelTransport::Unix),
2267 None,
2268 None,
2269 )
2270 .await
2271 .unwrap();
2272 host.serve().unwrap();
2273
2274 let remote_proc = Proc::attach_to_host(host.addr().clone()).await.unwrap();
2275
2276 let (undlv_tx, mut undlv_rx) = mpsc::unbounded_channel();
2278 let handle = host
2279 .system_proc()
2280 .spawn("collector", UndeliverableCollector { tx: undlv_tx })
2281 .unwrap();
2282 let collector_ref = handle.bind::<UndeliverableCollector>();
2283
2284 let bogus_actor = remote_proc.proc_addr().actor_addr("ghost-actor");
2287 let bogus_port = bogus_actor.port_addr(Port::from(0u64));
2288 let bogus_dest = PortRef::<String>::attest(bogus_port);
2289
2290 let (trigger_inst, _h) = host.system_proc().client("trigger").unwrap();
2291 collector_ref
2292 .port::<SendTo>()
2293 .post(&trigger_inst, bogus_dest);
2294
2295 let undeliverable = tokio::time::timeout(Duration::from_secs(5), undlv_rx.recv())
2296 .await
2297 .expect("timed out waiting for undeliverable")
2298 .expect("channel closed");
2299
2300 assert_eq!(
2301 undeliverable
2302 .into_message()
2303 .expect("expected returned envelope")
2304 .dest()
2305 .actor_id(),
2306 bogus_actor.id()
2307 );
2308 }
2309
2310 #[tokio::test]
2311 async fn test_duplex_teardown() {
2312 let proc_manager =
2317 LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("host_agent", ()) });
2318 let mut host = Host::new_with_default(
2319 proc_manager,
2320 ChannelAddr::any(ChannelTransport::Unix),
2321 None,
2322 None,
2323 )
2324 .await
2325 .unwrap();
2326 let serve_handle = host.serve().unwrap();
2327
2328 assert!(matches!(host.serve(), Err(HostError::AlreadyServing)));
2330
2331 let remote_proc = Proc::attach_to_host(host.addr().clone()).await.unwrap();
2332
2333 let (system_inst, _h) = host.system_proc().client("teardown-sender").unwrap();
2334 let (remote_inst, _rh) = remote_proc.client("teardown-client").unwrap();
2335
2336 let (remote_port, mut remote_rx) = remote_inst.mailbox().open_port();
2337 let remote_port = remote_port.bind();
2338 remote_port.post(&system_inst, "pre-stop".to_string());
2339 let arrived: String = tokio::time::timeout(Duration::from_secs(5), remote_rx.recv())
2340 .await
2341 .expect("timed out waiting for message on remote rx")
2342 .expect("recv failed");
2343 assert_eq!(arrived, "pre-stop");
2344
2345 serve_handle.stop("teardown");
2346 tokio::time::timeout(Duration::from_secs(5), serve_handle)
2347 .await
2348 .expect("timed out waiting for serve handle to resolve")
2349 .expect("serve task panicked")
2350 .expect("serve task returned error");
2351 }
2352
/// A raw simplex `Tx` dialed to the host's address observes
/// `TxStatus::Closed` after the host's serve handle is stopped.
#[tokio::test]
async fn test_simplex_peer_sees_clean_close_on_host_shutdown() {
    let proc_manager = LocalProcManager::new(|proc: Proc| async move {
        proc.spawn::<EchoActor>("host_agent", EchoActor)
    });
    let mut host = Host::new_with_default(
        proc_manager,
        ChannelAddr::any(ChannelTransport::Unix),
        None,
        None,
    )
    .await
    .unwrap();
    let serve_handle = host.serve().unwrap();

    // Echo actor on the system proc, used for the baseline round trip.
    let echo_handle = host
        .system_proc()
        .spawn::<EchoActor>("echo", EchoActor)
        .unwrap();
    let echo_ref = echo_handle.bind::<EchoActor>();

    // External client proc: its router binds the system proc's address to the
    // host's address, and it serves its own channel to receive replies.
    let dial_router = DialMailboxRouter::new();
    dial_router.bind(
        Addr::from(host.system_proc().proc_addr().clone()),
        host.addr().clone(),
    );
    let client_addr = ChannelAddr::any(ChannelTransport::Unix);
    let (client_listen_addr, client_rx) = channel::serve(client_addr).unwrap();
    let client_proc_id = ResourceId::proc_addr_from_name(client_listen_addr, "client");
    let client_proc = Proc::configured(client_proc_id, dial_router.into_boxed());
    let _client_handle = client_proc.clone().serve(client_rx);

    // Baseline: one request/reply through the host before any shutdown.
    let (client_inst, _h) = client_proc.client("requester").unwrap();
    let (reply_port, reply_handle) = client_inst.mailbox().open_once_port::<ActorAddr>();
    let reply_port = reply_port.bind();
    echo_ref
        .port::<OncePortRef<ActorAddr>>()
        .post(&client_inst, reply_port);
    let _ = tokio::time::timeout(Duration::from_secs(5), reply_handle.recv())
        .await
        .expect("baseline round-trip timed out")
        .expect("baseline recv failed");

    // Dial the host directly and post one envelope over this raw Tx. The
    // destination actor "noop" is never spawned in this test.
    let host_tx = channel::dial::<MessageEnvelope>(host.addr().clone()).unwrap();
    let dummy_dest = host
        .system_proc()
        .proc_addr()
        .actor_addr("noop")
        .port_addr(Port::from(0u64));
    let envelope = MessageEnvelope::serialize(
        client_inst.self_addr().clone(),
        dummy_dest,
        &"warmup".to_string(),
        Default::default(),
    )
    .unwrap();
    host_tx.post(envelope);
    // NOTE(review): fixed 200ms settle delay before asserting Active; polling
    // the status would be less timing-sensitive.
    tokio::time::sleep(Duration::from_millis(200)).await;
    assert!(matches!(*host_tx.status().borrow(), TxStatus::Active));

    serve_handle.stop("test shutdown");
    // Only resolution within 5s is checked; the serve task's own result is
    // discarded by `let _`.
    let _ = tokio::time::timeout(Duration::from_secs(5), serve_handle)
        .await
        .expect("serve handle did not resolve");

    // Wait until the Tx reports Closed, or until its status channel ends.
    let mut status = host_tx.status().clone();
    tokio::time::timeout(Duration::from_secs(10), async {
        loop {
            if let TxStatus::Closed(_) = *status.borrow() {
                return;
            }
            if status.changed().await.is_err() {
                return;
            }
        }
    })
    .await
    .expect("simplex peer did not see Closed within 10s of host shutdown");

    // The loop above also exits when the status channel ends, so re-check
    // that the final status really is Closed.
    match &*host_tx.status().borrow() {
        TxStatus::Closed(_) => {}
        other => panic!("expected TxStatus::Closed, got {:?}", other),
    }
}
2459
/// Several concurrent simplex clients each complete a series of echo round
/// trips through the host. The host is then stopped and its serve task must
/// finish without error.
#[tokio::test]
async fn test_simplex_clients_during_host_shutdown() {
    let proc_manager = LocalProcManager::new(|proc: Proc| async move {
        proc.spawn::<EchoActor>("host_agent", EchoActor)
    });
    let mut host = Host::new_with_default(
        proc_manager,
        ChannelAddr::any(ChannelTransport::Unix),
        None,
        None,
    )
    .await
    .unwrap();
    let serve_handle = host.serve().unwrap();

    let echo_handle = host
        .system_proc()
        .spawn::<EchoActor>("echo", EchoActor)
        .unwrap();
    let echo_ref = echo_handle.bind::<EchoActor>();
    // Owned copies so each spawned client task can take its own clones.
    let host_addr = host.addr().clone();
    let echo_actor_id = echo_ref.actor_addr().clone();
    let system_proc_id = host.system_proc().proc_addr().clone();

    // Number of concurrent client tasks.
    const N_CLIENTS: usize = 4;
    // Sequential echo round trips per client.
    const M_REQUESTS: usize = 5;

    let mut client_tasks = Vec::new();
    for ci in 0..N_CLIENTS {
        let host_addr = host_addr.clone();
        let echo_actor_id = echo_actor_id.clone();
        let system_proc_id = system_proc_id.clone();
        client_tasks.push(tokio::spawn(async move {
            // Each task builds its own client proc: the router binds the
            // system proc's address to the host, and the proc serves its own
            // channel to receive replies.
            let dial_router = DialMailboxRouter::new();
            dial_router.bind(Addr::from(system_proc_id.clone()), host_addr);
            let client_addr = ChannelAddr::any(ChannelTransport::Unix);
            let (client_listen_addr, client_rx) = channel::serve(client_addr).unwrap();
            let client_proc_id =
                ResourceId::proc_addr_from_name(client_listen_addr, format!("client-{}", ci));
            let client_proc = Proc::configured(client_proc_id, dial_router.into_boxed());
            let _client_handle = client_proc.clone().serve(client_rx);

            let echo_ref = ActorRef::<EchoActor>::attest(echo_actor_id);

            // A fresh client instance per request, each expecting the echo
            // actor to reply with its own address.
            for ri in 0..M_REQUESTS {
                let (client_inst, _h) = client_proc.client(&format!("req-{}", ri)).unwrap();
                let (reply_port, reply_handle) =
                    client_inst.mailbox().open_once_port::<ActorAddr>();
                let reply_port = reply_port.bind();
                echo_ref
                    .port::<OncePortRef<ActorAddr>>()
                    .post(&client_inst, reply_port);
                let received =
                    tokio::time::timeout(Duration::from_secs(10), reply_handle.recv())
                        .await
                        .expect("timeout waiting for reply")
                        .expect("recv failed");
                assert_eq!(received, *echo_ref.actor_addr());
            }
        }));
    }

    // Any assertion failure inside a client task surfaces here as a join error.
    // NOTE(review): every client task is joined before `stop()` is called, so
    // no client traffic overlaps the shutdown despite the test's name. Confirm
    // whether overlap was intended.
    for task in client_tasks {
        task.await.unwrap();
    }

    serve_handle.stop("test cleanup");
    tokio::time::timeout(Duration::from_secs(10), serve_handle)
        .await
        .expect("serve handle did not resolve")
        .expect("serve task panicked")
        .expect("serve task error");
}
2539
2540 #[tokio::test]
2544 async fn test_simplex_client_to_duplex_host() {
2545 let proc_manager = LocalProcManager::new(|proc: Proc| async move {
2546 proc.spawn::<EchoActor>("host_agent", EchoActor)
2547 });
2548 let mut host = Host::new_with_default(
2549 proc_manager,
2550 ChannelAddr::any(ChannelTransport::Unix),
2551 None,
2552 None,
2553 )
2554 .await
2555 .unwrap();
2556 let _serve_handle = host.serve().unwrap();
2557
2558 let echo_handle = host
2560 .system_proc()
2561 .spawn::<EchoActor>("echo", EchoActor)
2562 .unwrap();
2563 let echo_ref = echo_handle.bind::<EchoActor>();
2564
2565 let client_addr = ChannelAddr::any(ChannelTransport::Unix);
2571 let dial_router = DialMailboxRouter::new();
2572 dial_router.bind(
2573 Addr::from(host.system_proc().proc_addr().clone()),
2574 host.addr().clone(),
2575 );
2576 let (client_listen_addr, client_rx) = channel::serve(client_addr).unwrap();
2577 let client_proc_id = ResourceId::proc_addr_from_name(client_listen_addr, "external-client");
2578 let client_proc = Proc::configured(client_proc_id, dial_router.into_boxed());
2579 let _client_handle = client_proc.clone().serve(client_rx);
2580
2581 let (client_inst, _client_h) = client_proc.client("requester").unwrap();
2582
2583 let (reply_port, reply_handle) = client_inst.mailbox().open_once_port::<ActorAddr>();
2587 let reply_port = reply_port.bind();
2588 echo_ref
2589 .port::<OncePortRef<ActorAddr>>()
2590 .post(&client_inst, reply_port);
2591
2592 let received = tokio::time::timeout(Duration::from_secs(10), reply_handle.recv())
2593 .await
2594 .expect("timed out waiting for reply")
2595 .expect("recv failed");
2596 assert_eq!(received, *echo_ref.actor_addr());
2597 }
2598}