1#![allow(dead_code)] use std::any::TypeId;
14use std::borrow::Cow;
15use std::fmt;
16use std::fmt::Debug;
17use std::future::Future;
18use std::future::IntoFuture;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::SystemTime;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use hyperactor_config::Flattrs;
28use serde::Deserialize;
29use serde::Serialize;
30use tokio::sync::watch;
31use tokio::task::JoinHandle;
32use typeuri::Named;
33
34use crate as hyperactor; use crate::ActorAddr;
36use crate::ActorRef;
37use crate::Data;
38use crate::EndpointLocation;
39use crate::Message;
40use crate::RemoteMessage;
41use crate::context;
42use crate::endpoint::Endpoint;
43use crate::mailbox::MailboxError;
44use crate::mailbox::MailboxSenderError;
45use crate::mailbox::MessageEnvelope;
46use crate::mailbox::PortHandle;
47use crate::mailbox::Undeliverable;
48use crate::mailbox::UndeliverableMessageError;
49use crate::message::Castable;
50use crate::message::IndexedErasedUnbound;
51use crate::proc::Context;
52use crate::proc::HandlerPorts;
53use crate::proc::Instance;
54use crate::proc::InstanceCell;
55use crate::proc::Proc;
56use crate::supervision::ActorSupervisionEvent;
57
58pub mod remote;
59
/// How an actor should shut down when asked to stop.
///
/// Passed to [`Actor::handle_stop`]. The default handler (the free function
/// [`handle_stop`]) maps `Stop` to `Instance::exit` and `DrainAndStop` to
/// `Instance::exit_after_drain`.
#[derive(
    Clone,
    Copy,
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    typeuri::Named
)]
pub enum StopMode {
    /// Exit via `Instance::exit`.
    Stop,
    /// Exit via `Instance::exit_after_drain` (presumably after processing
    /// already-queued messages — see that method).
    DrainAndStop,
}
wirevalue::register_type!(StopMode);
78
79#[async_trait]
87pub trait Actor: Sized + Send + 'static {
88 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
93 Ok(())
95 }
96
97 async fn handle_stop(
106 &mut self,
107 this: &Instance<Self>,
108 mode: StopMode,
109 reason: &str,
110 ) -> Result<(), anyhow::Error> {
111 handle_stop(this, mode, reason)
112 }
113
114 async fn cleanup(
125 &mut self,
126 _this: &Instance<Self>,
127 _err: Option<&ActorError>,
128 ) -> Result<(), anyhow::Error> {
129 Ok(())
131 }
132
133 fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
136 cx.instance().spawn(self)
137 }
138
139 fn spawn_with_name(
142 self,
143 cx: &impl context::Actor,
144 name: &str,
145 ) -> anyhow::Result<ActorHandle<Self>> {
146 cx.instance().spawn_with_name(name, self)
147 }
148
149 fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
156 Proc::isolated().spawn("anon", self)
157 }
158
159 #[hyperactor::instrument_infallible]
163 fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
164 where
165 F: Future + Send + 'static,
166 F::Output: Send + 'static,
167 {
168 tokio::spawn(future)
169 }
170
171 async fn handle_supervision_event(
173 &mut self,
174 _this: &Instance<Self>,
175 event: &ActorSupervisionEvent,
176 ) -> Result<bool, anyhow::Error> {
177 Ok(!event.is_error())
180 }
181
182 async fn handle_undeliverable_message(
184 &mut self,
185 cx: &Instance<Self>,
186 envelope: Undeliverable<MessageEnvelope>,
187 ) -> Result<(), anyhow::Error> {
188 handle_undeliverable_message(cx, envelope)
189 }
190
191 fn display_name(&self) -> Option<String> {
195 None
196 }
197}
198
199pub fn handle_undeliverable_message<A: Actor>(
203 cx: &Instance<A>,
204 undeliverable: Undeliverable<MessageEnvelope>,
205) -> Result<(), anyhow::Error> {
206 match undeliverable {
207 Undeliverable::Message(envelope) => {
208 assert_eq!(envelope.sender(), cx.self_addr());
209 anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
210 }
211 Undeliverable::Lost(lost) => {
212 assert_eq!(&lost.sender, cx.self_addr());
213 anyhow::bail!(UndeliverableMessageError::Lost { lost });
214 }
215 }
216}
217
218pub fn handle_stop<A: Actor>(
222 this: &Instance<A>,
223 mode: StopMode,
224 reason: &str,
225) -> Result<(), anyhow::Error> {
226 this.close();
229 match mode {
230 StopMode::Stop => this.exit(reason).map_err(anyhow::Error::from),
231 StopMode::DrainAndStop => this.exit_after_drain(reason).map_err(anyhow::Error::from),
232 }
233}
234
235#[async_trait]
238impl Actor for () {}
239
240impl Referable for () {}
241
242impl Binds<()> for () {
243 fn bind(_ports: &HandlerPorts<Self>) {
244 }
246}
247
/// Handles messages of type `M` for an actor.
///
/// An actor implements this once for each message type it accepts.
#[async_trait]
pub trait Handler<M>: Actor {
    /// Process one `message` in the actor's context `cx`.
    async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
}
254
/// Every actor nominally handles [`Signal`]s.
///
/// This impl exists to satisfy the `Handler<Signal>` bound. Its body panics:
/// per its message, signals are not meant to be dispatched through
/// `Handler::handle` (presumably the runtime processes them itself).
#[async_trait]
impl<A: Actor> Handler<Signal> for A {
    async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
        unimplemented!("signal handler should not be called directly")
    }
}
270
/// Every actor nominally handles introspection messages.
///
/// Like the `Handler<Signal>` impl, this satisfies the trait bound only. Its
/// body panics if it is invoked through `Handler::handle` (presumably the
/// runtime answers introspection itself — confirm).
#[async_trait]
impl<A: Actor> Handler<crate::introspect::IntrospectMessage> for A {
    async fn handle(
        &mut self,
        _cx: &Context<Self>,
        _message: crate::introspect::IntrospectMessage,
    ) -> Result<(), anyhow::Error> {
        unimplemented!("introspect message handler should not be called directly")
    }
}
281
282#[async_trait]
285impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
286 async fn handle(
287 &mut self,
288 cx: &Context<Self>,
289 message: Undeliverable<MessageEnvelope>,
290 ) -> Result<(), anyhow::Error> {
291 let (sender, dest, error) = match &message {
292 Undeliverable::Message(envelope) => (
293 envelope.sender().to_string(),
294 envelope.dest().to_string(),
295 envelope.error_msg().unwrap_or_default(),
296 ),
297 Undeliverable::Lost(lost) => (
298 lost.sender.to_string(),
299 lost.dest.to_string(),
300 lost.error.clone(),
301 ),
302 };
303 match self.handle_undeliverable_message(cx, message).await {
304 Ok(_) => {
305 tracing::debug!(
306 actor_id = %cx.self_addr(),
307 name = "undeliverable_message_handled",
308 %sender,
309 %dest,
310 error,
311 );
312 Ok(())
313 }
314 Err(e) => {
315 tracing::error!(
316 actor_id = %cx.self_addr(),
317 name = "undeliverable_message",
318 %sender,
319 %dest,
320 error,
321 handler_error = %e,
322 );
323 Err(e)
324 }
325 }
326 }
327}
328
329#[async_trait]
332impl<A, M> Handler<IndexedErasedUnbound<M>> for A
333where
334 A: Handler<M>,
335 M: Castable,
336{
337 async fn handle(
338 &mut self,
339 cx: &Context<Self>,
340 msg: IndexedErasedUnbound<M>,
341 ) -> anyhow::Result<()> {
342 let message = msg.downcast()?.bind()?;
343 Handler::handle(self, cx, message).await
344 }
345}
346
347#[async_trait]
363pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
364 type Params: RemoteMessage;
366
367 async fn new(params: Self::Params, environment: Flattrs) -> anyhow::Result<Self>;
371
372 fn gspawn_root_bind(
377 proc: &Proc,
378 uid: crate::id::Uid,
379 serialized_params: Data,
380 environment: Flattrs,
381 ) -> Pin<Box<dyn Future<Output = Result<ActorAddr, anyhow::Error>> + Send>> {
382 let proc = proc.clone();
383 Box::pin(async move {
384 let params =
385 bincode::serde::decode_from_slice(&serialized_params, bincode::config::legacy())
386 .map(|(v, _)| v)?;
387 let actor = Self::new(params, environment).await?;
388 let handle = proc.spawn_with_uid(uid, actor)?;
389 Ok(handle.bind::<Self>().into_actor_addr())
401 })
402 }
403
404 fn gspawn_child(
409 proc: &Proc,
410 parent: InstanceCell,
411 uid: crate::id::Uid,
412 serialized_params: Data,
413 environment: Flattrs,
414 ) -> Pin<Box<dyn Future<Output = Result<AnyActorHandle, anyhow::Error>> + Send>> {
415 let proc = proc.clone();
416 Box::pin(async move {
417 let params =
418 bincode::serde::decode_from_slice(&serialized_params, bincode::config::legacy())
419 .map(|(v, _)| v)?;
420 let actor = Self::new(params, environment).await?;
421 let handle = proc.spawn_child_with_uid(parent, uid, actor)?;
422 handle.bind::<Self>();
423 Ok(handle.into_any())
424 })
425 }
426
427 fn get_type_id() -> TypeId {
429 TypeId::of::<Self>()
430 }
431}
432
433#[async_trait]
436impl<A: Actor + Referable + Binds<Self> + Default> RemoteSpawn for A {
437 type Params = ();
438
439 async fn new(_params: Self::Params, _environment: Flattrs) -> anyhow::Result<Self> {
440 Ok(Default::default())
441 }
442}
443
/// An error raised while serving an actor: the failing actor's address plus
/// what went wrong.
///
/// Both fields are boxed (presumably to keep `ActorError`, and results that
/// carry it, small).
#[derive(Debug)]
pub struct ActorError {
    /// Address of the actor that failed.
    pub actor_id: Box<ActorAddr>,
    /// The kind of failure.
    pub kind: Box<ActorErrorKind>,
}
453
/// Why an actor failed. Serializable, so it can be carried in
/// `ActorStatus::Failed`.
#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ActorErrorKind {
    /// A free-form error message.
    #[error("{0}")]
    Generic(String),

    /// An error (the first field) raised while handling the supervision
    /// event in the second field.
    #[error("{0} while handling {1}")]
    ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),

    /// A supervision event that was not handled; displayed as the event
    /// itself.
    #[error("{0}")]
    UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),

    /// The actor explicitly aborted, with the given reason.
    #[error("actor explicitly aborted due to: {0}")]
    Aborted(String),
}
473
474impl ActorErrorKind {
475 pub fn processing(err: anyhow::Error) -> Self {
478 err.downcast::<ActorErrorKind>()
483 .unwrap_or_else(|err| Self::Generic(err.to_string()))
484 }
485
486 pub fn panic(err: anyhow::Error) -> Self {
488 Self::Generic(format!("panic: {}", err))
489 }
490
491 pub fn init(err: anyhow::Error) -> Self {
493 Self::Generic(format!("initialization error: {}", err))
494 }
495
496 pub fn cleanup(err: anyhow::Error) -> Self {
498 Self::Generic(format!("cleanup error: {}", err))
499 }
500
501 pub fn mailbox(err: MailboxError) -> Self {
503 Self::Generic(err.to_string())
504 }
505
506 pub fn mailbox_sender(err: MailboxSenderError) -> Self {
508 Self::Generic(err.to_string())
509 }
510
511 pub fn indeterminate_state() -> Self {
513 Self::Generic("actor is in an indeterminate state".to_string())
514 }
515}
516
517impl ActorError {
518 pub(crate) fn new(actor_id: &ActorAddr, kind: ActorErrorKind) -> Self {
520 Self {
521 actor_id: Box::new(actor_id.clone()),
522 kind: Box::new(kind),
523 }
524 }
525}
526
527impl fmt::Display for ActorError {
528 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
529 write!(f, "serving {}: ", self.actor_id)?;
530 fmt::Display::fmt(&self.kind, f)
531 }
532}
533
534impl std::error::Error for ActorError {
535 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
536 self.kind.source()
537 }
538}
539
540impl From<MailboxError> for ActorError {
541 fn from(inner: MailboxError) -> Self {
542 Self {
543 actor_id: Box::new(inner.actor_addr().clone()),
544 kind: Box::new(ActorErrorKind::mailbox(inner)),
545 }
546 }
547}
548
549impl From<MailboxSenderError> for ActorError {
550 fn from(inner: MailboxSenderError) -> Self {
551 Self {
552 actor_id: Box::new(inner.location().actor_addr()),
553 kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
554 }
555 }
556}
557
/// Lifecycle signals delivered to an actor's instance cell.
///
/// `ActorHandle` / `AnyActorHandle` send `DrainAndStop`, `Stop` and `Kill`.
/// The string payloads carry the caller-supplied reason.
#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
pub enum Signal {
    /// Sent by `drain_and_stop`; presumably drains queued messages before
    /// stopping.
    DrainAndStop(String),

    /// Sent by `stop`.
    Stop(String),

    /// An exit request with a reason (origin not visible here — confirm).
    ExitRequested(String),

    /// Presumably reports that the child with this uid has stopped — confirm.
    ChildStopped(crate::id::Uid),

    /// Sent by `kill`.
    Kill(String),
}
wirevalue::register_type!(Signal);
584
585impl fmt::Display for Signal {
586 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
587 match self {
588 Signal::DrainAndStop(reason) => write!(f, "DrainAndStop({})", reason),
589 Signal::Stop(reason) => write!(f, "Stop({})", reason),
590 Signal::ExitRequested(reason) => write!(f, "ExitRequested({})", reason),
591 Signal::ChildStopped(uid) => write!(f, "ChildStopped({})", uid),
592 Signal::Kill(reason) => write!(f, "Kill({})", reason),
593 }
594 }
595}
596
/// Identifies the handler an actor is running; carried by
/// `ActorStatus::Processing`.
///
/// Displays as `typename` or `typename.arm`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct HandlerInfo {
    /// Type name of the message being handled.
    pub typename: Cow<'static, str>,
    /// Optional arm (e.g. an enum variant) within that type.
    pub arm: Option<Cow<'static, str>>,
}
608
609impl HandlerInfo {
610 pub fn from_static(typename: &'static str, arm: Option<&'static str>) -> Self {
612 Self {
613 typename: Cow::Borrowed(typename),
614 arm: arm.map(Cow::Borrowed),
615 }
616 }
617
618 pub fn from_owned(typename: String, arm: Option<String>) -> Self {
620 Self {
621 typename: Cow::Owned(typename),
622 arm: arm.map(Cow::Owned),
623 }
624 }
625}
626
627impl fmt::Display for HandlerInfo {
628 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
629 match &self.arm {
630 Some(arm) => write!(f, "{}.{}", self.typename, arm),
631 None => write!(f, "{}", self.typename),
632 }
633 }
634}
635
/// Lifecycle status of an actor, as observed through the watch channel
/// returned by `ActorHandle::status`.
///
/// `Stopped` and `Failed` are terminal (see `ActorStatus::is_terminal`).
#[derive(
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Clone,
    typeuri::Named,
    EnumAsInner
)]
pub enum ActorStatus {
    /// Status not known. Awaiting a handle also yields this if the status
    /// channel closes before a terminal status is seen.
    Unknown,
    /// Presumably: created, not yet initializing — confirm.
    Created,
    /// Presumably: running its `init` hook — confirm.
    Initializing,
    /// Presumably the status of a client instance rather than a regular
    /// actor — confirm.
    Client,
    /// Not currently processing a message.
    Idle,
    /// Processing a message since the given time, optionally identifying the
    /// handler.
    Processing(SystemTime, Option<HandlerInfo>),
    /// Shutting down.
    Stopping,
    /// Stopped, with a reason. Terminal.
    Stopped(String),
    /// Failed with the given error. Terminal.
    Failed(ActorErrorKind),
}
670
671impl ActorStatus {
672 pub fn is_terminal(&self) -> bool {
674 self.is_stopped() || self.is_failed()
675 }
676
677 pub fn generic_failure(message: impl Into<String>) -> Self {
679 Self::Failed(ActorErrorKind::Generic(message.into()))
680 }
681
682 fn span_string(&self) -> &'static str {
683 self.arm().unwrap_or_default()
684 }
685}
686
687impl fmt::Display for ActorStatus {
688 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
689 match self {
690 Self::Unknown => write!(f, "unknown"),
691 Self::Created => write!(f, "created"),
692 Self::Initializing => write!(f, "initializing"),
693 Self::Client => write!(f, "client"),
694 Self::Idle => write!(f, "idle"),
695 Self::Processing(instant, None) => {
696 write!(
697 f,
698 "processing for {}ms",
699 std::time::SystemTime::now()
700 .duration_since(*instant)
701 .unwrap_or_default()
702 .as_millis()
703 )
704 }
705 Self::Processing(instant, Some(handler_info)) => {
706 write!(
707 f,
708 "{}: processing for {}ms",
709 handler_info,
710 std::time::SystemTime::now()
711 .duration_since(*instant)
712 .unwrap_or_default()
713 .as_millis()
714 )
715 }
716 Self::Stopping => write!(f, "stopping"),
717 Self::Stopped(reason) => write!(f, "stopped: {}", reason),
718 Self::Failed(err) => write!(f, "failed: {}", err),
719 }
720 }
721}
722
/// A typed, in-process handle to a spawned actor of type `A`.
///
/// Holds the actor's [`InstanceCell`] and its handler ports. This lets it:
/// - post messages directly,
/// - bind a typed [`ActorRef`],
/// - send lifecycle signals, and
/// - be awaited until the actor reaches a terminal [`ActorStatus`].
pub struct ActorHandle<A: Actor> {
    /// Runtime cell of the actor instance.
    cell: InstanceCell,
    /// The actor's handler ports, shared (via `Arc`) among clones of this
    /// handle.
    ports: Arc<HandlerPorts<A>>,
}
735
736impl<A: Actor> ActorHandle<A> {
738 pub(crate) fn new(cell: InstanceCell, ports: Arc<HandlerPorts<A>>) -> Self {
739 Self { cell, ports }
740 }
741
742 pub(crate) fn cell(&self) -> &InstanceCell {
745 &self.cell
746 }
747
748 pub fn actor_addr(&self) -> &ActorAddr {
750 self.cell.actor_addr()
751 }
752
753 pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
755 tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_addr());
756 self.cell.signal(Signal::DrainAndStop(reason.to_string()))
757 }
758
759 pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
762 tracing::info!("actor handle stop called: {}", self.actor_addr());
763 self.cell.signal(Signal::Stop(reason.to_string()))
764 }
765
766 pub fn kill(&self, reason: &str) -> Result<(), ActorError> {
768 tracing::info!("actor handle kill called: {}", self.actor_addr());
769 self.cell.signal(Signal::Kill(reason.to_string()))
770 }
771
772 pub fn status(&self) -> watch::Receiver<ActorStatus> {
774 self.cell.status().clone()
775 }
776
777 pub fn port<M: Message>(&self) -> PortHandle<M>
779 where
780 A: Handler<M>,
781 {
782 self.ports.get()
783 }
784
785 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
788 self.cell.bind(self.ports.as_ref())
789 }
790
791 pub fn into_any(self) -> AnyActorHandle {
793 AnyActorHandle { cell: self.cell }
794 }
795}
796
/// A type-erased handle to a spawned actor.
///
/// It supports lifecycle signals and status, can be awaited, and can be
/// downcast back to a typed [`ActorHandle`].
pub struct AnyActorHandle {
    /// Runtime cell of the actor instance.
    cell: InstanceCell,
}
805
806impl AnyActorHandle {
807 pub fn actor_id(&self) -> &ActorAddr {
809 self.cell.actor_addr()
810 }
811
812 pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
814 self.cell.signal(Signal::DrainAndStop(reason.to_string()))
815 }
816
817 pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
819 self.cell.signal(Signal::Stop(reason.to_string()))
820 }
821
822 pub fn kill(&self, reason: &str) -> Result<(), ActorError> {
824 self.cell.signal(Signal::Kill(reason.to_string()))
825 }
826
827 pub fn status(&self) -> watch::Receiver<ActorStatus> {
829 self.cell.status().clone()
830 }
831
832 pub fn downcast<A: Actor>(&self) -> Option<ActorHandle<A>> {
834 self.cell.downcast_handle()
835 }
836}
837
838impl IntoFuture for AnyActorHandle {
842 type Output = ActorStatus;
843 type IntoFuture = BoxFuture<'static, Self::Output>;
844
845 fn into_future(self) -> Self::IntoFuture {
846 let future = async move {
847 let mut status_receiver = self.cell.status().clone();
848 let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
849 match result {
850 Err(_) => ActorStatus::Unknown,
851 Ok(status) => status.clone(),
852 }
853 };
854
855 future.boxed()
856 }
857}
858
859impl Debug for AnyActorHandle {
860 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
861 f.debug_struct("AnyActorHandle")
862 .field("cell", &"..")
863 .finish()
864 }
865}
866
867impl Clone for AnyActorHandle {
868 fn clone(&self) -> Self {
869 Self {
870 cell: self.cell.clone(),
871 }
872 }
873}
874
875impl<A: Actor> IntoFuture for ActorHandle<A> {
879 type Output = ActorStatus;
880 type IntoFuture = BoxFuture<'static, Self::Output>;
881
882 fn into_future(self) -> Self::IntoFuture {
883 let future = async move {
884 let mut status_receiver = self.cell.status().clone();
885 let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
886 match result {
887 Err(_) => ActorStatus::Unknown,
888 Ok(status) => status.clone(),
889 }
890 };
891
892 future.boxed()
893 }
894}
895
896impl<A: Actor> Debug for ActorHandle<A> {
897 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
898 f.debug_struct("ActorHandle").field("cell", &"..").finish()
899 }
900}
901
902impl<A, M> Endpoint<M> for &ActorHandle<A>
903where
904 A: Actor + Handler<M>,
905 M: Message,
906{
907 fn endpoint_location(&self) -> EndpointLocation {
908 EndpointLocation::Actor(self.actor_addr().clone())
909 }
910
911 fn post<C>(self, cx: &C, message: M)
912 where
913 C: context::Actor,
914 {
915 Endpoint::post(&self.ports.get(), cx, message)
916 }
917}
918
919impl<A: Actor> Clone for ActorHandle<A> {
920 fn clone(&self) -> Self {
921 Self {
922 cell: self.cell.clone(),
923 ports: self.ports.clone(),
924 }
925 }
926}
927
/// Marker for types that may be used as the type parameter of an
/// [`ActorRef`] (an actor type or a behavior).
pub trait Referable: Named {}

/// Behavior `Self` can be bound onto actor `A`'s handler ports.
///
/// Required by `ActorHandle::bind` to produce an `ActorRef<Self>`.
pub trait Binds<A: Actor>: Referable {
    /// Bind the ports of `A` that this behavior exposes (presumably one per
    /// handled message type — see implementations).
    fn bind(ports: &HandlerPorts<A>);
}

/// Marker: the referable type handles remote messages of type `M`.
pub trait RemoteHandles<M: RemoteMessage>: Referable {}
954
/// Compile-time assertion that behavior `$behavior` can be bound onto actor
/// `$ty`, i.e. that `$behavior: Binds<$ty>`.
///
/// Usage: `assert_behaves!(MyActor as MyBehavior);`
#[macro_export]
macro_rules! assert_behaves {
    ($ty:ty as $behavior:ty) => {
        const _: fn() = || {
            // Naming this never-executed generic function with `$behavior`
            // forces the compiler to check the `Binds` bound.
            fn assert_binds<B: hyperactor::actor::Binds<$ty>>() {}
            assert_binds::<$behavior>();
        };
    };
}
994
995#[cfg(test)]
996mod tests {
997 use std::sync::Mutex;
998 use std::time::Duration;
999
1000 use rand::seq::SliceRandom;
1001 use timed_test::async_timed_test;
1002 use tokio::sync::mpsc;
1003 use tokio::time::timeout;
1004
1005 use super::*;
1006 use crate as hyperactor;
1007 use crate::Actor;
1008 use crate::ActorRef;
1009 use crate::Addr;
1010 use crate::OncePortHandle;
1011 use crate::PortRef;
1012 use crate::config;
1013 use crate::context::Mailbox as _;
1014 use crate::introspect::IntrospectMessage;
1015 use crate::introspect::IntrospectResult;
1016 use crate::introspect::IntrospectView;
1017 use crate::mailbox::BoxableMailboxSender as _;
1018 use crate::mailbox::MailboxSender;
1019 use crate::mailbox::PortLocation;
1020 use crate::mailbox::monitored_return_handle;
1021 use crate::ordering::SEQ_INFO;
1022 use crate::ordering::SeqInfo;
1023 use crate::testing::ids::test_proc_id;
1024 use crate::testing::pingpong::PingPongActor;
1025 use crate::testing::pingpong::PingPongMessage;
1026 use crate::testing::proc_supervison::ProcSupervisionCoordinator; #[derive(Debug)]
1029 struct EchoActor(PortRef<u64>);
1030
1031 #[async_trait]
1032 impl Actor for EchoActor {}
1033
1034 #[async_trait]
1035 impl Handler<u64> for EchoActor {
1036 async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
1037 let Self(port) = self;
1038 port.post(cx, message);
1039 Ok(())
1040 }
1041 }
1042
1043 #[tokio::test]
1044 async fn test_server_basic() {
1045 let proc = Proc::isolated();
1046 let (client, _) = proc.client("client").unwrap();
1047 let (tx, mut rx) = client.open_port();
1048 let actor = EchoActor(tx.bind());
1049 let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
1050 handle.post(&client, 123u64);
1051 handle.drain_and_stop("test").unwrap();
1052 handle.await;
1053
1054 assert_eq!(rx.drain(), vec![123u64]);
1055 }
1056
1057 #[tokio::test]
1058 async fn test_ping_pong() {
1059 let proc = Proc::isolated();
1060 let (client, _) = proc.client("client").unwrap();
1061 let (undeliverable_msg_tx, _) = client.open_port();
1062
1063 let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
1064 let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
1065 let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
1066 let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
1067
1068 let (local_port, local_receiver) = client.open_once_port();
1069
1070 ping_handle.post(
1071 &client,
1072 PingPongMessage(10, pong_handle.bind(), local_port.bind()),
1073 );
1074
1075 assert!(local_receiver.recv().await.unwrap());
1076 }
1077
1078 #[tokio::test]
1079 async fn test_ping_pong_on_handler_error() {
1080 let proc = Proc::isolated();
1081 let (client, _) = proc.client("client").unwrap();
1082 let (undeliverable_msg_tx, _) = client.open_port();
1083
1084 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
1087
1088 let error_ttl = 66;
1089
1090 let ping_actor =
1091 PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
1092 let pong_actor =
1093 PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
1094 let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
1095 let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
1096
1097 let (local_port, local_receiver) = client.open_once_port();
1098
1099 ping_handle.post(
1100 &client,
1101 PingPongMessage(
1102 error_ttl + 1, pong_handle.bind(),
1104 local_port.bind(),
1105 ),
1106 );
1107
1108 let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
1110 timeout(Duration::from_secs(5), local_receiver.recv()).await;
1111 assert!(res.is_err());
1112 }
1113
1114 #[derive(Debug)]
1115 struct InitActor(bool);
1116
1117 #[async_trait]
1118 impl Actor for InitActor {
1119 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
1120 self.0 = true;
1121 Ok(())
1122 }
1123 }
1124
1125 #[async_trait]
1126 impl Handler<OncePortHandle<bool>> for InitActor {
1127 async fn handle(
1128 &mut self,
1129 cx: &Context<Self>,
1130 port: OncePortHandle<bool>,
1131 ) -> Result<(), anyhow::Error> {
1132 port.post(cx, self.0);
1133 Ok(())
1134 }
1135 }
1136
1137 #[tokio::test]
1138 async fn test_init() {
1139 let proc = Proc::isolated();
1140 let actor = InitActor(false);
1141 let handle = proc.spawn::<InitActor>("init", actor).unwrap();
1142 let (client, _) = proc.client("client").unwrap();
1143
1144 let (port, receiver) = client.open_once_port();
1145 handle.post(&client, port);
1146 assert!(receiver.recv().await.unwrap());
1147
1148 handle.drain_and_stop("test").unwrap();
1149 handle.await;
1150 }
1151
1152 type MultiValues = Arc<Mutex<(u64, String)>>;
1153
1154 struct MultiValuesTest {
1155 proc: Proc,
1156 values: MultiValues,
1157 handle: ActorHandle<MultiActor>,
1158 client: Instance<()>,
1159 _client_handle: ActorHandle<()>,
1160 }
1161
1162 impl MultiValuesTest {
1163 async fn new() -> Self {
1164 let proc = Proc::isolated();
1165 let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
1166 let actor = MultiActor(values.clone());
1167 let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
1168 let (client, client_handle) = proc.client("client").unwrap();
1169 Self {
1170 proc,
1171 values,
1172 handle,
1173 client,
1174 _client_handle: client_handle,
1175 }
1176 }
1177
1178 fn send<M>(&self, message: M)
1179 where
1180 M: RemoteMessage,
1181 MultiActor: Handler<M>,
1182 {
1183 self.handle.post(&self.client, message)
1184 }
1185
1186 async fn sync(&self) {
1187 let (port, done) = self.client.open_once_port::<bool>();
1188 self.handle.post(&self.client, port);
1189 assert!(done.recv().await.unwrap());
1190 }
1191
1192 fn get_values(&self) -> (u64, String) {
1193 self.values.lock().unwrap().clone()
1194 }
1195 }
1196
1197 #[derive(Debug)]
1198 #[hyperactor::export(handlers = [u64, String])]
1199 struct MultiActor(MultiValues);
1200
1201 #[async_trait]
1202 impl Actor for MultiActor {}
1203
1204 #[async_trait]
1205 impl Handler<u64> for MultiActor {
1206 async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
1207 let mut vals = self.0.lock().unwrap();
1208 vals.0 = message;
1209 Ok(())
1210 }
1211 }
1212
1213 #[async_trait]
1214 impl Handler<String> for MultiActor {
1215 async fn handle(
1216 &mut self,
1217 _cx: &Context<Self>,
1218 message: String,
1219 ) -> Result<(), anyhow::Error> {
1220 let mut vals = self.0.lock().unwrap();
1221 vals.1 = message;
1222 Ok(())
1223 }
1224 }
1225
1226 #[async_trait]
1227 impl Handler<OncePortHandle<bool>> for MultiActor {
1228 async fn handle(
1229 &mut self,
1230 cx: &Context<Self>,
1231 message: OncePortHandle<bool>,
1232 ) -> Result<(), anyhow::Error> {
1233 message.post(cx, true);
1234 Ok(())
1235 }
1236 }
1237
1238 #[tokio::test]
1239 async fn test_multi_handler_refs() {
1240 let test = MultiValuesTest::new().await;
1241
1242 test.send(123u64);
1243 test.send("foo".to_string());
1244 test.sync().await;
1245 assert_eq!(test.get_values(), (123u64, "foo".to_string()));
1246
1247 let myref: ActorRef<MultiActor> = test.handle.bind();
1248
1249 myref.port().post(&test.client, 321u64);
1250 test.sync().await;
1251 assert_eq!(test.get_values(), (321u64, "foo".to_string()));
1252
1253 myref.port().post(&test.client, "bar".to_string());
1254 test.sync().await;
1255 assert_eq!(test.get_values(), (321u64, "bar".to_string()));
1256 }
1257
1258 #[tokio::test]
1259 async fn test_ref_behavior() {
1260 let test = MultiValuesTest::new().await;
1261
1262 test.send(123u64);
1263 test.send("foo".to_string());
1264
1265 hyperactor::behavior!(MyActorBehavior, u64, String);
1266
1267 let myref: ActorRef<MyActorBehavior> = test.handle.bind();
1268 myref.port().post(&test.client, "biz".to_string());
1269 myref.port().post(&test.client, 999u64);
1270
1271 test.sync().await;
1272 assert_eq!(test.get_values(), (999u64, "biz".to_string()));
1273 }
1274
1275 #[tokio::test]
1276 async fn test_actor_handle_downcast() {
1277 #[derive(Debug, Default)]
1278 struct NothingActor;
1279
1280 impl Actor for NothingActor {}
1281
1282 let proc = Proc::isolated();
1285 let handle = proc.spawn("nothing", NothingActor).unwrap();
1286 let cell = handle.cell();
1287
1288 assert!(cell.downcast_handle::<EchoActor>().is_none());
1290
1291 let handle = cell.downcast_handle::<NothingActor>().unwrap();
1292 handle.drain_and_stop("test").unwrap();
1293 handle.await;
1294 }
1295
1296 #[derive(Debug)]
1298 #[hyperactor::export(handlers = [String, Callback])]
1299 struct GetSeqActor(PortRef<(String, SeqInfo)>);
1300
1301 #[async_trait]
1302 impl Actor for GetSeqActor {}
1303
1304 #[async_trait]
1305 impl Handler<String> for GetSeqActor {
1306 async fn handle(
1307 &mut self,
1308 cx: &Context<Self>,
1309 message: String,
1310 ) -> Result<(), anyhow::Error> {
1311 let Self(port) = self;
1312 let seq_info = cx.headers().get(SEQ_INFO).unwrap();
1313 port.post(cx, (message, seq_info.clone()));
1314 Ok(())
1315 }
1316 }
1317
1318 #[derive(Clone, Debug, Serialize, Deserialize, Named)]
1323 struct Callback(PortRef<PortRef<String>>);
1324
1325 #[async_trait]
1326 impl Handler<Callback> for GetSeqActor {
1327 async fn handle(
1328 &mut self,
1329 cx: &Context<Self>,
1330 message: Callback,
1331 ) -> Result<(), anyhow::Error> {
1332 let (handle, mut receiver) = cx.open_port::<String>();
1333 let callback_ref = handle.bind();
1334 message.0.post(cx, callback_ref);
1335 let msg = receiver.recv().await.unwrap();
1336 self.handle(cx, msg).await
1337 }
1338 }
1339
    /// Checks per-client sequencing for an actor addressed both through its
    /// local handle and through a bound `ActorRef`.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_sequencing_actor_handle_basic() {
        let proc = Proc::isolated();
        let (client, _) = proc.client("client").unwrap();
        let (tx, mut rx) = client.open_port();

        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();

        // This post happens before `bind()`; the test expects it to carry
        // `SeqInfo::Direct` rather than a session sequence number.
        actor_handle.post(&client, "unbound".to_string());
        assert_eq!(
            rx.recv().await.unwrap(),
            ("unbound".to_string(), SeqInfo::Direct)
        );

        let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();

        // From here on, posts through the handle and through the ref share a
        // single sequence in the client's session, starting at 1.
        let session_id = client.sequencer().session_id();
        let mut expected_seq = 0;
        for m in 0..10 {
            actor_handle.post(&client, format!("{m}"));
            expected_seq += 1;
            assert_eq!(
                rx.recv().await.unwrap(),
                (
                    format!("{m}"),
                    SeqInfo::Session {
                        session_id,
                        seq: expected_seq,
                    }
                )
            );

            for n in 0..2 {
                actor_ref.port().post(&client, format!("{m}-{n}"));
                expected_seq += 1;
                assert_eq!(
                    rx.recv().await.unwrap(),
                    (
                        format!("{m}-{n}"),
                        SeqInfo::Session {
                            session_id,
                            seq: expected_seq,
                        }
                    )
                );
            }
        }
    }
1390
    /// Checks that handler ports (reached via handle or ref) and a
    /// non-handler port on the same destination keep independent sequences
    /// within one client session.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_sequencing_mixed_handler_and_non_handler_ports() {
        let proc = Proc::isolated();
        let (client, _) = proc.client("client").unwrap();

        let (actor_tx, mut actor_rx) = client.open_port();

        // Receives the SEQ_INFO header observed by the non-handler port.
        let (non_handler_tx, mut non_handler_rx) = mpsc::unbounded_channel::<Option<SeqInfo>>();

        let actor_handle = proc.spawn("get_seq", GetSeqActor(actor_tx.bind())).unwrap();
        let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();

        // Only a clone is moved into the enqueue closure; the original sender
        // stays in scope.
        let non_handler_tx_clone = non_handler_tx.clone();
        let non_handler_port_handle =
            client
                .mailbox()
                .open_enqueue_port(move |headers: Flattrs, _m: ()| {
                    let seq_info = headers.get(SEQ_INFO);
                    non_handler_tx_clone.send(seq_info).unwrap();
                    Ok(())
                });

        non_handler_port_handle.bind();
        let non_handler_port_id = match non_handler_port_handle.location() {
            PortLocation::Bound(port_id) => port_id,
            _ => panic!("port_handle should be bound"),
        };
        assert!(!non_handler_port_id.is_handler_port());

        let session_id = client.sequencer().session_id();

        // Handler sequence: 1 (handle), 2 (ref).
        actor_handle.post(&client, "msg1".to_string());
        assert_eq!(
            actor_rx.recv().await.unwrap().1,
            SeqInfo::Session { session_id, seq: 1 }
        );

        actor_ref.port().post(&client, "msg2".to_string());
        assert_eq!(
            actor_rx.recv().await.unwrap().1,
            SeqInfo::Session { session_id, seq: 2 }
        );

        // The non-handler port starts its own sequence at 1.
        non_handler_port_handle.post(&client, ());
        assert_eq!(
            non_handler_rx.recv().await.unwrap(),
            Some(SeqInfo::Session { session_id, seq: 1 })
        );

        // The handler sequence continues at 3, unaffected by the
        // non-handler post.
        actor_handle.post(&client, "msg3".to_string());
        assert_eq!(
            actor_rx.recv().await.unwrap().1,
            SeqInfo::Session { session_id, seq: 3 }
        );

        non_handler_port_handle.post(&client, ());
        assert_eq!(
            non_handler_rx.recv().await.unwrap(),
            Some(SeqInfo::Session { session_id, seq: 2 })
        );

        actor_ref.port().post(&client, "msg4".to_string());
        assert_eq!(
            actor_rx.recv().await.unwrap().1,
            SeqInfo::Session { session_id, seq: 4 }
        );

        actor_handle.drain_and_stop("test cleanup").unwrap();
        actor_handle.await;
    }
1472
    /// Checks that each client has its own session with an independent
    /// sequence, whether it posts through the handle or the ref.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_sequencing_multiple_clients() {
        let proc = Proc::isolated();
        let (client1, _) = proc.client("client1").unwrap();
        let (client2, _) = proc.client("client2").unwrap();

        let (tx, mut rx) = client1.open_port();

        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
        let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();

        // Distinct clients get distinct sessions.
        let session_id_1 = client1.sequencer().session_id();
        let session_id_2 = client2.sequencer().session_id();
        assert_ne!(session_id_1, session_id_2);

        // Interleave posts from the two clients; each session counts 1, 2, 3
        // on its own.
        actor_handle.post(&client1, "c1_msg1".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_1,
                seq: 1
            }
        );

        actor_handle.post(&client2, "c2_msg1".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_2,
                seq: 1
            }
        );

        actor_ref.port().post(&client1, "c1_msg2".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_1,
                seq: 2
            }
        );

        actor_ref.port().post(&client2, "c2_msg2".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_2,
                seq: 2
            }
        );

        actor_handle.post(&client1, "c1_msg3".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_1,
                seq: 3
            }
        );

        actor_ref.port().post(&client2, "c2_msg3".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_2,
                seq: 3
            }
        );

        actor_handle.drain_and_stop("test cleanup").unwrap();
        actor_handle.await;
    }
1553
1554 #[async_timed_test(timeout_secs = 30)]
1575 async fn test_sequencing_actor_handle_callback() {
1576 let config = hyperactor_config::global::lock();
1577 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1578
1579 let proc = Proc::isolated();
1580 let (client, _) = proc.client("client").unwrap();
1581 let (tx, mut rx) = client.open_port();
1582
1583 let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1584 let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();
1585
1586 let (callback_tx, mut callback_rx) = client.open_port();
1587 actor_ref.post(&client, Callback(callback_tx.bind()));
1589 let msg_port_ref = callback_rx.recv().await.unwrap();
1590 msg_port_ref.post(&client, "finally".to_string());
1593
1594 let session_id = client.sequencer().session_id();
1595 assert_eq!(
1597 rx.recv().await.unwrap(),
1598 (
1599 "finally".to_string(),
1600 SeqInfo::Session { session_id, seq: 1 }
1601 )
1602 );
1603 }
1604
1605 #[derive(Clone, Debug)]
1608 struct DelayedMailboxSender {
1609 relay_tx: mpsc::UnboundedSender<MessageEnvelope>,
1610 }
1611
1612 impl DelayedMailboxSender {
1613 fn new(
1615 dest_proc: Proc,
1618 relay_orders: Vec<usize>,
1622 ) -> Self {
1623 let (relay_tx, mut relay_rx) = mpsc::unbounded_channel::<MessageEnvelope>();
1624
1625 tokio::spawn(async move {
1626 let mut buffer = Vec::new();
1627
1628 for _ in 0..relay_orders.len() {
1629 let envelope = relay_rx.recv().await.unwrap();
1630 buffer.push(envelope);
1631 }
1632
1633 for m in buffer.clone() {
1634 let seq = match m.headers().get(SEQ_INFO).expect("seq should be set") {
1635 SeqInfo::Session { seq, .. } => seq as usize,
1636 SeqInfo::Direct => panic!("expected Session variant"),
1637 };
1638 let order = relay_orders[seq - 1];
1640 buffer[order] = m;
1641 }
1642
1643 let dest_proc_clone = dest_proc.clone();
1644 for msg in buffer {
1645 dest_proc_clone.post(msg, monitored_return_handle());
1646 }
1647 });
1648
1649 Self { relay_tx }
1650 }
1651 }
1652
1653 #[async_trait]
1654 impl MailboxSender for DelayedMailboxSender {
1655 fn post_unchecked(
1656 &self,
1657 envelope: MessageEnvelope,
1658 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1659 ) {
1660 self.relay_tx.send(envelope).unwrap();
1661 }
1662 }
1663
1664 async fn assert_out_of_order_delivery(expected: Vec<(String, u64)>, relay_orders: Vec<usize>) {
1665 let local_proc: Proc = Proc::isolated();
1666 let (client, _) = local_proc.client("local").unwrap();
1667 let (tx, mut rx) = client.open_port();
1668
1669 let handle = local_proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1670 let actor_ref: ActorRef<GetSeqActor> = handle.bind();
1671
1672 let remote_proc = Proc::configured(
1673 test_proc_id("remote_0"),
1674 DelayedMailboxSender::new(local_proc.clone(), relay_orders).boxed(),
1675 );
1676 let (remote_client, _) = remote_proc.client("remote").unwrap();
1677 let mut messages = expected.clone();
1679 messages.sort_by_key(|v| v.1);
1680 for (message, _seq) in messages {
1681 actor_ref.post(&remote_client, message);
1682 }
1683 let session_id = remote_client.sequencer().session_id();
1684 for expect in expected {
1685 let expected = (
1686 expect.0,
1687 SeqInfo::Session {
1688 session_id,
1689 seq: expect.1,
1690 },
1691 );
1692 assert_eq!(rx.recv().await.unwrap(), expected);
1693 }
1694
1695 handle.drain_and_stop("test cleanup").unwrap();
1696 handle.await;
1697 }
1698
1699 #[async_timed_test(timeout_secs = 30)]
1704 async fn test_sequencing_actor_ref_known_delivery_order() {
1705 let config = hyperactor_config::global::lock();
1706
1707 let relay_orders = vec![2, 0, 1];
1709
1710 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, false);
1713 assert_out_of_order_delivery(
1714 vec![
1715 ("second".to_string(), 2),
1716 ("third".to_string(), 3),
1717 ("first".to_string(), 1),
1718 ],
1719 relay_orders.clone(),
1720 )
1721 .await;
1722
1723 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1726 assert_out_of_order_delivery(
1727 vec![
1728 ("first".to_string(), 1),
1729 ("second".to_string(), 2),
1730 ("third".to_string(), 3),
1731 ],
1732 relay_orders.clone(),
1733 )
1734 .await;
1735 }
1736
1737 #[async_timed_test(timeout_secs = 30)]
1742 async fn test_sequencing_actor_ref_random_delivery_order() {
1743 let config = hyperactor_config::global::lock();
1744
1745 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1748 let expected = (0..10000)
1749 .map(|i| (format!("msg{i}"), i + 1))
1750 .collect::<Vec<_>>();
1751
1752 let mut relay_orders: Vec<usize> = (0..10000).collect();
1753 relay_orders.shuffle(&mut rand::rng());
1754 assert_out_of_order_delivery(expected, relay_orders).await;
1755 }
1756
1757 #[tokio::test]
1775 async fn test_introspect_query_default_payload() {
1776 let proc = Proc::isolated();
1777 let (client, _) = proc.client("client").unwrap();
1778 let (tx, _rx) = client.open_port::<u64>();
1779 let actor = EchoActor(tx.bind());
1780 let handle = proc.spawn::<EchoActor>("echo_introspect", actor).unwrap();
1781
1782 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1783 PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
1784 &client,
1785 IntrospectMessage::Query {
1786 view: IntrospectView::Actor,
1787 reply: reply_port.bind(),
1788 },
1789 );
1790 let payload = reply_rx.recv().await.unwrap();
1791
1792 assert_eq!(
1793 payload.identity,
1794 crate::introspect::IntrospectRef::Actor(handle.actor_addr().clone())
1795 );
1796 assert_valid_attrs(&payload);
1797 assert_has_attr(&payload, "status");
1798 assert_has_attr(&payload, "actor_type");
1799 assert_has_attr(&payload, "created_at");
1800 assert!(payload.children.is_empty());
1801 assert!(payload.parent.is_none());
1802
1803 handle.drain_and_stop("test").unwrap();
1804 handle.await;
1805 }
1806
1807 fn attrs_get(attrs_json: &str, short_name: &str) -> Option<serde_json::Value> {
1809 use hyperactor_config::INTROSPECT;
1810 use hyperactor_config::attrs::AttrKeyInfo;
1811 let fq_name = inventory::iter::<AttrKeyInfo>()
1812 .find(|info| {
1813 info.meta
1814 .get(INTROSPECT)
1815 .is_some_and(|ia| ia.name == short_name)
1816 })
1817 .map(|info| info.name)?;
1818 let obj: serde_json::Value = serde_json::from_str(attrs_json).ok()?;
1819 obj.get(fq_name).cloned()
1820 }
1821
1822 fn assert_valid_attrs(result: &IntrospectResult) {
1824 let parsed: serde_json::Value =
1825 serde_json::from_str(&result.attrs).expect("IA-1: attrs must be valid JSON");
1826 assert!(parsed.is_object(), "IA-1: attrs must be a JSON object");
1827 }
1828
1829 fn assert_status(result: &IntrospectResult, expected: &str) {
1831 let status = attrs_get(&result.attrs, "status")
1832 .and_then(|v| v.as_str().map(String::from))
1833 .expect("attrs must contain status");
1834 assert_eq!(status, expected, "unexpected actor status");
1835 }
1836
1837 fn assert_handler(result: &IntrospectResult, expected: Option<&str>) {
1839 let handler =
1840 attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from));
1841 assert_eq!(handler.as_deref(), expected);
1842 }
1843
1844 fn assert_error_code(result: &IntrospectResult, expected: &str) {
1846 let code = attrs_get(&result.attrs, "error_code")
1847 .and_then(|v| v.as_str().map(String::from))
1848 .expect("attrs must contain error_code");
1849 assert_eq!(code, expected);
1850 }
1851
1852 fn assert_handler_not_contains(result: &IntrospectResult, forbidden: &str) {
1854 if let Some(handler) =
1855 attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from))
1856 {
1857 assert!(
1858 !handler.contains(forbidden),
1859 "handler should not contain '{}'; got: {}",
1860 forbidden,
1861 handler
1862 );
1863 }
1864 }
1865
1866 fn assert_has_attr(result: &IntrospectResult, short_name: &str) {
1868 assert!(
1869 attrs_get(&result.attrs, short_name).is_some(),
1870 "attrs must contain '{}'",
1871 short_name
1872 );
1873 }
1874
1875 fn assert_status_contains(result: &IntrospectResult, substring: &str) {
1878 let status = attrs_get(&result.attrs, "status")
1879 .and_then(|v| v.as_str().map(String::from))
1880 .expect("attrs must contain status");
1881 assert!(
1882 status.contains(substring),
1883 "status should contain '{}'; got: {}",
1884 substring,
1885 status
1886 );
1887 }
1888
1889 fn assert_no_status_reason(result: &IntrospectResult) {
1891 assert!(
1892 attrs_get(&result.attrs, "status_reason").is_none(),
1893 "IA-3: must not have status_reason"
1894 );
1895 }
1896
1897 fn assert_has_handler(result: &IntrospectResult) {
1899 assert!(
1900 attrs_get(&result.attrs, "last_handler").is_some(),
1901 "must have a handler"
1902 );
1903 }
1904
1905 fn assert_no_failure_attrs(result: &IntrospectResult) {
1907 assert!(
1908 attrs_get(&result.attrs, "failure_error_message").is_none(),
1909 "IA-4: must not have failure attrs"
1910 );
1911 }
1912
1913 #[tokio::test]
1918 async fn test_ia1_ia4_running_actor_attrs() {
1919 let proc = Proc::isolated();
1920 let (client, _) = proc.client("client").unwrap();
1921 let (tx, _rx) = client.open_port::<u64>();
1922 let actor = EchoActor(tx.bind());
1923 let handle = proc.spawn::<EchoActor>("ia_test", actor).unwrap();
1924
1925 let payload = crate::introspect::live_actor_payload(handle.cell());
1926
1927 assert_valid_attrs(&payload);
1929
1930 assert_has_attr(&payload, "status");
1932 assert_no_status_reason(&payload);
1933
1934 assert_no_failure_attrs(&payload);
1936
1937 handle.drain_and_stop("test").unwrap();
1938 handle.await;
1939 }
1940
1941 #[tokio::test]
1947 async fn test_introspect_query_child_not_found() {
1948 let proc = Proc::isolated();
1949 let (client, _) = proc.client("client").unwrap();
1950 let (tx, _rx) = client.open_port::<u64>();
1951 let actor = EchoActor(tx.bind());
1952 let handle = proc.spawn::<EchoActor>("echo_qc", actor).unwrap();
1953
1954 let child_ref = crate::Addr::Actor(test_proc_id("nonexistent").actor_addr("child"));
1955 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1956 PortRef::<IntrospectMessage>::attest_handler_port(handle.actor_addr()).post(
1957 &client,
1958 IntrospectMessage::QueryChild {
1959 child_ref,
1960 reply: reply_port.bind(),
1961 },
1962 );
1963 let payload = reply_rx.recv().await.unwrap();
1964
1965 assert_eq!(
1966 payload.identity,
1967 crate::introspect::IntrospectRef::Actor(
1968 test_proc_id("nonexistent").actor_addr("child")
1969 )
1970 );
1971 assert_error_code(&payload, "not_found");
1972
1973 handle.drain_and_stop("test").unwrap();
1974 handle.await;
1975 }
1976
1977 #[tokio::test]
1983 async fn test_introspect_override() {
1984 #[derive(Debug, Default)]
1985 #[hyperactor::export(handlers = [])]
1986 struct CustomIntrospectActor;
1987
1988 #[async_trait]
1989 impl Actor for CustomIntrospectActor {}
1990
1991 let proc = Proc::isolated();
1992 let (client, _) = proc.client("client").unwrap();
1993 let handle = proc
1994 .spawn("custom_introspect", CustomIntrospectActor)
1995 .unwrap();
1996
1997 handle
1998 .status()
1999 .wait_for(|s| matches!(s, ActorStatus::Idle))
2000 .await
2001 .unwrap();
2002
2003 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2004 PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2005 &client,
2006 IntrospectMessage::Query {
2007 view: IntrospectView::Actor,
2008 reply: reply_port.bind(),
2009 },
2010 );
2011 let payload = reply_rx.recv().await.unwrap();
2012
2013 assert_has_attr(&payload, "status");
2016
2017 handle.drain_and_stop("test").unwrap();
2018 handle.await;
2019 }
2020
2021 #[tokio::test]
2025 async fn test_introspect_query_supervision_child() {
2026 let proc = Proc::isolated();
2027 let (client, _) = proc.client("client").unwrap();
2028
2029 let (tx_parent, _rx_parent) = client.open_port::<u64>();
2031 let parent_handle = proc
2032 .spawn::<EchoActor>("parent", EchoActor(tx_parent.bind()))
2033 .unwrap();
2034
2035 let (tx_child, _rx_child) = client.open_port::<u64>();
2037 let child_handle = proc
2038 .spawn_child::<EchoActor>(parent_handle.cell().clone(), EchoActor(tx_child.bind()))
2039 .unwrap();
2040
2041 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2043 PortRef::<IntrospectMessage>::attest_handler_port(&child_handle.actor_addr().clone()).post(
2044 &client,
2045 IntrospectMessage::Query {
2046 view: IntrospectView::Actor,
2047 reply: reply_port.bind(),
2048 },
2049 );
2050 let child_payload = reply_rx.recv().await.unwrap();
2051
2052 assert_eq!(
2053 child_payload.identity,
2054 crate::introspect::IntrospectRef::Actor(child_handle.actor_addr().clone()),
2055 );
2056 assert!(
2058 attrs_get(&child_payload.attrs, "status").is_some(),
2059 "child should have actor attrs"
2060 );
2061 assert_eq!(
2062 child_payload.parent,
2063 Some(crate::introspect::IntrospectRef::Actor(
2064 parent_handle.actor_addr().clone()
2065 )),
2066 );
2067
2068 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2070 PortRef::<IntrospectMessage>::attest_handler_port(&parent_handle.actor_addr().clone())
2071 .post(
2072 &client,
2073 IntrospectMessage::Query {
2074 view: IntrospectView::Actor,
2075 reply: reply_port.bind(),
2076 },
2077 );
2078 let parent_payload = reply_rx.recv().await.unwrap();
2079
2080 assert!(parent_payload.parent.is_none());
2081 assert!(
2082 parent_payload
2083 .children
2084 .contains(&crate::introspect::IntrospectRef::Actor(
2085 child_handle.actor_addr().clone()
2086 )),
2087 );
2088
2089 child_handle.drain_and_stop("test").unwrap();
2090 child_handle.await;
2091 parent_handle.drain_and_stop("test").unwrap();
2092 parent_handle.await;
2093 }
2094
2095 #[tokio::test]
2100 async fn test_introspect_fresh_actor_status() {
2101 let proc = Proc::isolated();
2102 let (client, _) = proc.client("client").unwrap();
2103 let (tx, _rx) = client.open_port::<u64>();
2104 let actor = EchoActor(tx.bind());
2105 let handle = proc.spawn::<EchoActor>("echo_fresh", actor).unwrap();
2106
2107 handle
2109 .status()
2110 .wait_for(|s| matches!(s, ActorStatus::Idle))
2111 .await
2112 .unwrap();
2113
2114 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2115 PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2116 &client,
2117 IntrospectMessage::Query {
2118 view: IntrospectView::Actor,
2119 reply: reply_port.bind(),
2120 },
2121 );
2122 let payload = reply_rx.recv().await.unwrap();
2123
2124 assert_status(&payload, "idle");
2125 assert_handler(&payload, None);
2126
2127 handle.drain_and_stop("test").unwrap();
2128 handle.await;
2129 }
2130
2131 #[tokio::test]
2136 async fn test_introspect_after_user_message() {
2137 let proc = Proc::isolated();
2138 let (client, _) = proc.client("client").unwrap();
2139 let (tx, mut rx) = client.open_port::<u64>();
2140 let actor = EchoActor(tx.bind());
2141 let handle = proc.spawn::<EchoActor>("echo_after_msg", actor).unwrap();
2142
2143 handle.post(&client, 42u64);
2145 let _ = rx.recv().await.unwrap();
2146
2147 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2148 PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2149 &client,
2150 IntrospectMessage::Query {
2151 view: IntrospectView::Actor,
2152 reply: reply_port.bind(),
2153 },
2154 );
2155 let payload = reply_rx.recv().await.unwrap();
2156
2157 assert_status(&payload, "idle");
2158 assert_has_handler(&payload);
2159 assert_handler_not_contains(&payload, "IntrospectMessage");
2160
2161 handle.drain_and_stop("test").unwrap();
2162 handle.await;
2163 }
2164
2165 #[tokio::test]
2170 async fn test_introspect_consecutive_queries() {
2171 let proc = Proc::isolated();
2172 let (client, _) = proc.client("client").unwrap();
2173 let (tx, _rx) = client.open_port::<u64>();
2174 let actor = EchoActor(tx.bind());
2175 let handle = proc.spawn::<EchoActor>("echo_consec", actor).unwrap();
2176
2177 handle
2178 .status()
2179 .wait_for(|s| matches!(s, ActorStatus::Idle))
2180 .await
2181 .unwrap();
2182
2183 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2185 PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2186 &client,
2187 IntrospectMessage::Query {
2188 view: IntrospectView::Actor,
2189 reply: reply_port.bind(),
2190 },
2191 );
2192 let payload1 = reply_rx.recv().await.unwrap();
2193
2194 let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2196 PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2197 &client,
2198 IntrospectMessage::Query {
2199 view: IntrospectView::Actor,
2200 reply: reply_port2.bind(),
2201 },
2202 );
2203 let payload2 = reply_rx2.recv().await.unwrap();
2204
2205 assert_handler(&payload1, None);
2207 assert_handler(&payload2, None);
2208
2209 handle.drain_and_stop("test").unwrap();
2210 handle.await;
2211 }
2212
2213 #[tokio::test]
2221 async fn test_publish_attrs_round_trip() {
2222 use hyperactor_config::Attrs;
2223 use hyperactor_config::declare_attrs;
2224
2225 declare_attrs! {
2226 attr TEST_KEY_A: String;
2227 attr TEST_KEY_B: u64;
2228 }
2229
2230 let proc = Proc::isolated();
2231 let (client, _) = proc.client("client").unwrap();
2232 let (tx, _rx) = client.open_port::<u64>();
2233 let actor = EchoActor(tx.bind());
2234 let handle = proc.spawn::<EchoActor>("echo_attrs", actor).unwrap();
2235
2236 assert!(handle.cell().published_attrs().is_none());
2238
2239 let mut attrs = Attrs::new();
2241 attrs.set(TEST_KEY_A, "hello".to_string());
2242 handle.cell().set_published_attrs(attrs);
2243 let published = handle.cell().published_attrs().unwrap();
2244 assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2245
2246 handle.cell().merge_published_attr(TEST_KEY_B, 42u64);
2248 let published = handle.cell().published_attrs().unwrap();
2249 assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2250 assert_eq!(published.get(TEST_KEY_B), Some(&42u64));
2251
2252 handle
2254 .cell()
2255 .merge_published_attr(TEST_KEY_A, "world".to_string());
2256 let published = handle.cell().published_attrs().unwrap();
2257 assert_eq!(published.get(TEST_KEY_A), Some(&"world".to_string()));
2258
2259 handle.drain_and_stop("test").unwrap();
2260 handle.await;
2261 }
2262
2263 #[tokio::test]
2266 async fn test_query_child_handler_round_trip() {
2267 let proc = Proc::isolated();
2268 let (client, _) = proc.client("client").unwrap();
2269 let (tx, _rx) = client.open_port::<u64>();
2270 let actor = EchoActor(tx.bind());
2271 let handle = proc.spawn::<EchoActor>("echo_qch", actor).unwrap();
2272
2273 let test_ref = Addr::Actor(test_proc_id("test").actor_addr("child"));
2275 assert!(handle.cell().query_child(&test_ref).is_none());
2276
2277 handle.cell().set_query_child_handler(|child_ref| {
2279 use crate::introspect::IntrospectRef;
2280 let identity = match child_ref {
2281 Addr::Proc(p) => IntrospectRef::Proc(p.clone()),
2282 Addr::Actor(a) => IntrospectRef::Actor(a.clone()),
2283 Addr::Port(p) => IntrospectRef::Actor(p.actor_addr()),
2284 };
2285 IntrospectResult {
2286 identity,
2287 attrs: serde_json::json!({
2288 "proc_name": "test_proc",
2289 "num_actors": 42,
2290 })
2291 .to_string(),
2292 children: Vec::new(),
2293 parent: None,
2294 as_of: std::time::SystemTime::now(),
2295 }
2296 });
2297
2298 let payload = handle
2300 .cell()
2301 .query_child(&test_ref)
2302 .expect("callback should produce a payload");
2303 assert_eq!(
2304 payload.identity,
2305 crate::introspect::IntrospectRef::Actor(test_proc_id("test").actor_addr("child"))
2306 );
2307 let attrs: serde_json::Value =
2308 serde_json::from_str(&payload.attrs).expect("attrs must be valid JSON");
2309 assert_eq!(
2310 attrs.get("proc_name").and_then(|v| v.as_str()),
2311 Some("test_proc")
2312 );
2313 assert_eq!(attrs.get("num_actors").and_then(|v| v.as_u64()), Some(42));
2314
2315 handle.drain_and_stop("test").unwrap();
2316 handle.await;
2317 }
2318
2319 #[tokio::test]
2325 async fn test_introspect_wedged() {
2326 #[derive(Debug, Default)]
2327 #[hyperactor::export(handlers = [u64])]
2328 struct WedgedActor;
2329
2330 #[async_trait]
2331 impl Actor for WedgedActor {}
2332
2333 #[async_trait]
2334 impl Handler<u64> for WedgedActor {
2335 async fn handle(
2336 &mut self,
2337 _cx: &Context<Self>,
2338 _message: u64,
2339 ) -> Result<(), anyhow::Error> {
2340 std::future::pending::<()>().await;
2342 Ok(())
2343 }
2344 }
2345
2346 let proc = Proc::isolated();
2347 let (client, _) = proc.client("client").unwrap();
2348 let handle = proc.spawn("wedged", WedgedActor).unwrap();
2349
2350 handle
2352 .status()
2353 .wait_for(|s| matches!(s, ActorStatus::Idle))
2354 .await
2355 .unwrap();
2356
2357 handle.post(&client, 1u64);
2359
2360 tokio::time::sleep(Duration::from_millis(50)).await;
2362
2363 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2365 PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2366 &client,
2367 IntrospectMessage::Query {
2368 view: IntrospectView::Actor,
2369 reply: reply_port.bind(),
2370 },
2371 );
2372
2373 let payload = tokio::time::timeout(Duration::from_secs(5), reply_rx.recv())
2375 .await
2376 .expect("introspect should not hang on a wedged actor")
2377 .unwrap();
2378
2379 assert_status_contains(&payload, "processing");
2380 assert_handler_not_contains(&payload, "IntrospectMessage");
2381 }
2382
2383 #[tokio::test]
2388 async fn test_introspect_no_perturbation() {
2389 let proc = Proc::isolated();
2390 let (client, _) = proc.client("client").unwrap();
2391 let (tx, mut rx) = client.open_port::<u64>();
2392 let actor = EchoActor(tx.bind());
2393 let handle = proc.spawn::<EchoActor>("echo_no_perturb", actor).unwrap();
2394
2395 handle
2397 .status()
2398 .wait_for(|s| matches!(s, ActorStatus::Idle))
2399 .await
2400 .unwrap();
2401
2402 handle.post(&client, 42u64);
2404 let _ = rx.recv().await.unwrap();
2405
2406 let (reply_port1, reply_rx1) = client.open_once_port::<IntrospectResult>();
2408 PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2409 &client,
2410 IntrospectMessage::Query {
2411 view: IntrospectView::Actor,
2412 reply: reply_port1.bind(),
2413 },
2414 );
2415 let payload1 = reply_rx1.recv().await.unwrap();
2416
2417 let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2419 crate::PortRef::<IntrospectMessage>::attest_handler_port(handle.actor_addr()).post(
2420 &client,
2421 IntrospectMessage::Query {
2422 view: IntrospectView::Actor,
2423 reply: reply_port2.bind(),
2424 },
2425 );
2426 let payload2 = reply_rx2.recv().await.unwrap();
2427
2428 assert_handler_not_contains(&payload1, "IntrospectMessage");
2430 assert_handler_not_contains(&payload2, "IntrospectMessage");
2431 let attrs1: serde_json::Value = serde_json::from_str(&payload1.attrs).unwrap();
2434 let attrs2: serde_json::Value = serde_json::from_str(&payload2.attrs).unwrap();
2435 assert_eq!(attrs1, attrs2, "consecutive queries should be identical");
2436
2437 handle.drain_and_stop("test").unwrap();
2438 handle.await;
2439 }
2440
2441 #[tokio::test]
2448 async fn test_introspectable_instance_responds_to_query() {
2449 let proc = Proc::isolated();
2450 let (bridge, handle) = proc.introspectable_instance("bridge").unwrap();
2451 let actor_id: crate::ActorAddr = handle.actor_addr().clone();
2452
2453 let (reply_port, reply_rx) = bridge.open_once_port::<IntrospectResult>();
2454 PortRef::<IntrospectMessage>::attest_handler_port(&actor_id).post(
2455 &bridge,
2456 IntrospectMessage::Query {
2457 view: IntrospectView::Actor,
2458 reply: reply_port.bind(),
2459 },
2460 );
2461 let payload = reply_rx.recv().await.unwrap();
2462
2463 assert_eq!(
2466 payload.identity,
2467 crate::introspect::IntrospectRef::Actor(actor_id.clone())
2468 );
2469 assert_status(&payload, "client");
2470 let actor_type = attrs_get(&payload.attrs, "actor_type")
2471 .and_then(|v| v.as_str().map(String::from))
2472 .expect("must have actor_type");
2473 assert_eq!(actor_type, "()", "CI-1: actor_type must be \"()\"");
2474 }
2475
2476 #[tokio::test]
2484 async fn test_instance_does_not_respond_to_query() {
2485 let proc = Proc::isolated();
2486 let (client, _client_handle) = proc.client("client").unwrap();
2487 let (_mailbox, mailbox_handle) = proc.client("mailbox").unwrap();
2488 let mailbox_id: crate::ActorAddr = mailbox_handle.actor_addr().clone();
2489
2490 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2491 PortRef::<IntrospectMessage>::attest_handler_port(&mailbox_id).post(
2492 &client,
2493 IntrospectMessage::Query {
2494 view: IntrospectView::Actor,
2495 reply: reply_port.bind(),
2496 },
2497 );
2498
2499 let result = tokio::time::timeout(Duration::from_millis(100), reply_rx.recv()).await;
2502 assert!(
2503 result.is_err(),
2504 "client() must not respond to IntrospectMessage (introspect receiver dropped)"
2505 );
2506 }
2507
2508 #[tokio::test]
2513 async fn test_introspectable_instance_snapshot_on_drop() {
2514 let proc = Proc::isolated();
2515 let (instance, handle) = proc.introspectable_instance("bridge").unwrap();
2516 let actor_id = handle.actor_addr().clone();
2517
2518 assert!(
2519 proc.all_actor_ids().contains(&actor_id),
2520 "should appear in all_actor_ids while live"
2521 );
2522
2523 drop(instance);
2526
2527 let deadline = std::time::Instant::now() + Duration::from_secs(5);
2528 loop {
2529 if proc.terminated_snapshot(&actor_id).is_some() {
2530 break;
2531 }
2532 assert!(
2533 std::time::Instant::now() < deadline,
2534 "timed out waiting for terminated snapshot"
2535 );
2536 tokio::task::yield_now().await;
2537 }
2538
2539 let snapshot = proc.terminated_snapshot(&actor_id).unwrap();
2540 let actor_status = attrs_get(&snapshot.attrs, "status")
2541 .and_then(|v| v.as_str().map(String::from))
2542 .expect("snapshot attrs must contain status");
2543 assert!(
2544 actor_status.starts_with("stopped"),
2545 "CI-2: snapshot actor_status should be stopped, got: {}",
2546 actor_status
2547 );
2548 }
2549}