1#![allow(dead_code)] use std::any::TypeId;
14use std::borrow::Cow;
15use std::fmt;
16use std::fmt::Debug;
17use std::future::Future;
18use std::future::IntoFuture;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::SystemTime;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use hyperactor_config::Flattrs;
28use serde::Deserialize;
29use serde::Serialize;
30use tokio::sync::watch;
31use tokio::task::JoinHandle;
32use typeuri::Named;
33
34use crate as hyperactor; use crate::Data;
36use crate::Message;
37use crate::RemoteMessage;
38use crate::checkpoint::CheckpointError;
39use crate::checkpoint::Checkpointable;
40use crate::context;
41use crate::mailbox::MailboxError;
42use crate::mailbox::MailboxSenderError;
43use crate::mailbox::MessageEnvelope;
44use crate::mailbox::PortHandle;
45use crate::mailbox::Undeliverable;
46use crate::mailbox::UndeliverableMessageError;
47use crate::mailbox::log::MessageLogError;
48use crate::message::Castable;
49use crate::message::IndexedErasedUnbound;
50use crate::proc::Context;
51use crate::proc::Instance;
52use crate::proc::InstanceCell;
53use crate::proc::Ports;
54use crate::proc::Proc;
55use crate::reference;
56use crate::supervision::ActorSupervisionEvent;
57
58pub mod remote;
59
60#[async_trait]
68pub trait Actor: Sized + Send + 'static {
69 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
74 Ok(())
76 }
77
78 async fn cleanup(
89 &mut self,
90 _this: &Instance<Self>,
91 _err: Option<&ActorError>,
92 ) -> Result<(), anyhow::Error> {
93 Ok(())
95 }
96
97 fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
100 cx.instance().spawn(self)
101 }
102
103 fn spawn_with_name(
106 self,
107 cx: &impl context::Actor,
108 name: &str,
109 ) -> anyhow::Result<ActorHandle<Self>> {
110 cx.instance().spawn_with_name(name, self)
111 }
112
113 fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
120 Proc::local().spawn("anon", self)
121 }
122
123 #[hyperactor::instrument_infallible]
127 fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
128 where
129 F: Future + Send + 'static,
130 F::Output: Send + 'static,
131 {
132 tokio::spawn(future)
133 }
134
135 async fn handle_supervision_event(
137 &mut self,
138 _this: &Instance<Self>,
139 _event: &ActorSupervisionEvent,
140 ) -> Result<bool, anyhow::Error> {
141 Ok(false)
143 }
144
145 async fn handle_undeliverable_message(
147 &mut self,
148 cx: &Instance<Self>,
149 envelope: Undeliverable<MessageEnvelope>,
150 ) -> Result<(), anyhow::Error> {
151 handle_undeliverable_message(cx, envelope)
152 }
153
154 fn display_name(&self) -> Option<String> {
158 None
159 }
160}
161
162pub fn handle_undeliverable_message<A: Actor>(
166 cx: &Instance<A>,
167 Undeliverable(envelope): Undeliverable<MessageEnvelope>,
168) -> Result<(), anyhow::Error> {
169 assert_eq!(envelope.sender(), cx.self_id());
170
171 anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
172}
173
174#[async_trait]
177impl Actor for () {}
178
179impl Referable for () {}
180
181impl Binds<()> for () {
182 fn bind(_ports: &Ports<Self>) {
183 }
185}
186
187#[async_trait]
189pub trait Handler<M>: Actor {
190 async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
192}
193
194#[async_trait]
197impl<A: Actor> Handler<Signal> for A {
198 async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
199 unimplemented!("signal handler should not be called directly")
200 }
201}
202
203#[async_trait]
206impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
207 async fn handle(
208 &mut self,
209 cx: &Context<Self>,
210 message: Undeliverable<MessageEnvelope>,
211 ) -> Result<(), anyhow::Error> {
212 let sender = message.0.sender().clone();
213 let dest = message.0.dest().clone();
214 let error = message.0.error_msg().unwrap_or(String::new());
215 match self.handle_undeliverable_message(cx, message).await {
216 Ok(_) => {
217 tracing::debug!(
218 actor_id = %cx.self_id(),
219 name = "undeliverable_message_handled",
220 %sender,
221 %dest,
222 error,
223 );
224 Ok(())
225 }
226 Err(e) => {
227 tracing::error!(
228 actor_id = %cx.self_id(),
229 name = "undeliverable_message",
230 %sender,
231 %dest,
232 error,
233 handler_error = %e,
234 );
235 Err(e)
236 }
237 }
238 }
239}
240
241#[async_trait]
244impl<A, M> Handler<IndexedErasedUnbound<M>> for A
245where
246 A: Handler<M>,
247 M: Castable,
248{
249 async fn handle(
250 &mut self,
251 cx: &Context<Self>,
252 msg: IndexedErasedUnbound<M>,
253 ) -> anyhow::Result<()> {
254 let message = msg.downcast()?.bind()?;
255 Handler::handle(self, cx, message).await
256 }
257}
258
259#[async_trait]
275pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
276 type Params: RemoteMessage;
278
279 async fn new(params: Self::Params, environment: Flattrs) -> anyhow::Result<Self>;
283
284 fn gspawn(
289 proc: &Proc,
290 name: &str,
291 serialized_params: Data,
292 environment: Flattrs,
293 ) -> Pin<Box<dyn Future<Output = Result<reference::ActorId, anyhow::Error>> + Send>> {
294 let proc = proc.clone();
295 let name = name.to_string();
296 Box::pin(async move {
297 let params = bincode::deserialize(&serialized_params)?;
298 let actor = Self::new(params, environment).await?;
299 let handle = proc.spawn(&name, actor)?;
300 Ok(handle.bind::<Self>().actor_id)
312 })
313 }
314
315 fn get_type_id() -> TypeId {
317 TypeId::of::<Self>()
318 }
319}
320
321#[async_trait]
324impl<A: Actor + Referable + Binds<Self> + Default> RemoteSpawn for A {
325 type Params = ();
326
327 async fn new(_params: Self::Params, _environment: Flattrs) -> anyhow::Result<Self> {
328 Ok(Default::default())
329 }
330}
331
332#[async_trait]
333impl<T> Checkpointable for T
334where
335 T: RemoteMessage + Clone,
336{
337 type State = T;
338 async fn save(&self) -> Result<Self::State, CheckpointError> {
339 Ok(self.clone())
340 }
341
342 async fn load(state: Self::State) -> Result<Self, CheckpointError> {
343 Ok(state)
344 }
345}
346
347#[derive(Debug)]
350pub struct ActorError {
351 pub actor_id: Box<reference::ActorId>,
353 pub kind: Box<ActorErrorKind>,
355}
356
357#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
359pub enum ActorErrorKind {
360 #[error("{0}")]
362 Generic(String),
363
364 #[error("{0} while handling {1}")]
366 ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
367
368 #[error("{0}")]
370 UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
371
372 #[error("actor explicitly aborted due to: {0}")]
374 Aborted(String),
375}
376
377impl ActorErrorKind {
378 pub fn processing(err: anyhow::Error) -> Self {
381 err.downcast::<ActorErrorKind>()
386 .unwrap_or_else(|err| Self::Generic(err.to_string()))
387 }
388
389 pub fn panic(err: anyhow::Error) -> Self {
391 Self::Generic(format!("panic: {}", err))
392 }
393
394 pub fn init(err: anyhow::Error) -> Self {
396 Self::Generic(format!("initialization error: {}", err))
397 }
398
399 pub fn cleanup(err: anyhow::Error) -> Self {
401 Self::Generic(format!("cleanup error: {}", err))
402 }
403
404 pub fn mailbox(err: MailboxError) -> Self {
406 Self::Generic(err.to_string())
407 }
408
409 pub fn mailbox_sender(err: MailboxSenderError) -> Self {
411 Self::Generic(err.to_string())
412 }
413
414 pub fn checkpoint(err: CheckpointError) -> Self {
416 Self::Generic(format!("checkpoint error: {}", err))
417 }
418
419 pub fn message_log(err: MessageLogError) -> Self {
421 Self::Generic(format!("message log error: {}", err))
422 }
423
424 pub fn indeterminate_state() -> Self {
426 Self::Generic("actor is in an indeterminate state".to_string())
427 }
428}
429
430impl ActorError {
431 pub(crate) fn new(actor_id: &reference::ActorId, kind: ActorErrorKind) -> Self {
433 Self {
434 actor_id: Box::new(actor_id.clone()),
435 kind: Box::new(kind),
436 }
437 }
438}
439
440impl fmt::Display for ActorError {
441 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
442 write!(f, "serving {}: ", self.actor_id)?;
443 fmt::Display::fmt(&self.kind, f)
444 }
445}
446
447impl std::error::Error for ActorError {
448 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
449 self.kind.source()
450 }
451}
452
453impl From<MailboxError> for ActorError {
454 fn from(inner: MailboxError) -> Self {
455 Self {
456 actor_id: Box::new(inner.actor_id().clone()),
457 kind: Box::new(ActorErrorKind::mailbox(inner)),
458 }
459 }
460}
461
462impl From<MailboxSenderError> for ActorError {
463 fn from(inner: MailboxSenderError) -> Self {
464 Self {
465 actor_id: Box::new(inner.location().actor_id().clone()),
466 kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
467 }
468 }
469}
470
471#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
478pub enum Signal {
479 DrainAndStop(String),
481
482 Stop(String),
484
485 ChildStopped(reference::Index),
487
488 Abort(String),
492}
493wirevalue::register_type!(Signal);
494
495impl fmt::Display for Signal {
496 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
497 match self {
498 Signal::DrainAndStop(reason) => write!(f, "DrainAndStop({})", reason),
499 Signal::Stop(reason) => write!(f, "Stop({})", reason),
500 Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
501 Signal::Abort(reason) => write!(f, "Abort({})", reason),
502 }
503 }
504}
505
506#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
511pub struct HandlerInfo {
512 pub typename: Cow<'static, str>,
514 pub arm: Option<Cow<'static, str>>,
516}
517
518impl HandlerInfo {
519 pub fn from_static(typename: &'static str, arm: Option<&'static str>) -> Self {
521 Self {
522 typename: Cow::Borrowed(typename),
523 arm: arm.map(Cow::Borrowed),
524 }
525 }
526
527 pub fn from_owned(typename: String, arm: Option<String>) -> Self {
529 Self {
530 typename: Cow::Owned(typename),
531 arm: arm.map(Cow::Owned),
532 }
533 }
534}
535
536impl fmt::Display for HandlerInfo {
537 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
538 match &self.arm {
539 Some(arm) => write!(f, "{}.{}", self.typename, arm),
540 None => write!(f, "{}", self.typename),
541 }
542 }
543}
544
545#[derive(
547 Debug,
548 Serialize,
549 Deserialize,
550 PartialEq,
551 Eq,
552 Clone,
553 typeuri::Named,
554 EnumAsInner
555)]
556pub enum ActorStatus {
557 Unknown,
559 Created,
561 Initializing,
563 Client,
566 Idle,
568 Processing(SystemTime, Option<HandlerInfo>),
571 Saving(SystemTime),
573 Loading(SystemTime),
575 Stopping,
577 Stopped(String),
580 Failed(ActorErrorKind),
582}
583
584impl ActorStatus {
585 pub fn is_terminal(&self) -> bool {
587 self.is_stopped() || self.is_failed()
588 }
589
590 pub fn generic_failure(message: impl Into<String>) -> Self {
592 Self::Failed(ActorErrorKind::Generic(message.into()))
593 }
594
595 fn span_string(&self) -> &'static str {
596 self.arm().unwrap_or_default()
597 }
598}
599
600impl fmt::Display for ActorStatus {
601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602 match self {
603 Self::Unknown => write!(f, "unknown"),
604 Self::Created => write!(f, "created"),
605 Self::Initializing => write!(f, "initializing"),
606 Self::Client => write!(f, "client"),
607 Self::Idle => write!(f, "idle"),
608 Self::Processing(instant, None) => {
609 write!(
610 f,
611 "processing for {}ms",
612 std::time::SystemTime::now()
613 .duration_since(*instant)
614 .unwrap_or_default()
615 .as_millis()
616 )
617 }
618 Self::Processing(instant, Some(handler_info)) => {
619 write!(
620 f,
621 "{}: processing for {}ms",
622 handler_info,
623 std::time::SystemTime::now()
624 .duration_since(*instant)
625 .unwrap_or_default()
626 .as_millis()
627 )
628 }
629 Self::Saving(instant) => {
630 write!(
631 f,
632 "saving for {}ms",
633 std::time::SystemTime::now()
634 .duration_since(*instant)
635 .unwrap_or_default()
636 .as_millis()
637 )
638 }
639 Self::Loading(instant) => {
640 write!(
641 f,
642 "loading for {}ms",
643 std::time::SystemTime::now()
644 .duration_since(*instant)
645 .unwrap_or_default()
646 .as_millis()
647 )
648 }
649 Self::Stopping => write!(f, "stopping"),
650 Self::Stopped(reason) => write!(f, "stopped: {}", reason),
651 Self::Failed(err) => write!(f, "failed: {}", err),
652 }
653 }
654}
655
656pub struct ActorHandle<A: Actor> {
665 cell: InstanceCell,
666 ports: Arc<Ports<A>>,
667}
668
669impl<A: Actor> ActorHandle<A> {
671 pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
672 Self { cell, ports }
673 }
674
675 pub(crate) fn cell(&self) -> &InstanceCell {
678 &self.cell
679 }
680
681 pub fn actor_id(&self) -> &reference::ActorId {
683 self.cell.actor_id()
684 }
685
686 pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
688 tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
689 self.cell.signal(Signal::DrainAndStop(reason.to_string()))
690 }
691
692 pub fn status(&self) -> watch::Receiver<ActorStatus> {
694 self.cell.status().clone()
695 }
696
697 pub fn send<M: Message>(
700 &self,
701 cx: &impl context::Actor,
702 message: M,
703 ) -> Result<(), MailboxSenderError>
704 where
705 A: Handler<M>,
706 {
707 self.ports.get().send(cx, message)
708 }
709
710 pub fn port<M: Message>(&self) -> PortHandle<M>
712 where
713 A: Handler<M>,
714 {
715 self.ports.get()
716 }
717
718 pub fn bind<R: Binds<A>>(&self) -> reference::ActorRef<R> {
721 self.cell.bind(self.ports.as_ref())
722 }
723}
724
725impl<A: Actor> IntoFuture for ActorHandle<A> {
729 type Output = ActorStatus;
730 type IntoFuture = BoxFuture<'static, Self::Output>;
731
732 fn into_future(self) -> Self::IntoFuture {
733 let future = async move {
734 let mut status_receiver = self.cell.status().clone();
735 let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
736 match result {
737 Err(_) => ActorStatus::Unknown,
738 Ok(status) => status.clone(),
739 }
740 };
741
742 future.boxed()
743 }
744}
745
746impl<A: Actor> Debug for ActorHandle<A> {
747 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
748 f.debug_struct("ActorHandle").field("cell", &"..").finish()
749 }
750}
751
752impl<A: Actor> Clone for ActorHandle<A> {
753 fn clone(&self) -> Self {
754 Self {
755 cell: self.cell.clone(),
756 ports: self.ports.clone(),
757 }
758 }
759}
760
761pub trait Referable: Named {}
776
777pub trait Binds<A: Actor>: Referable {
780 fn bind(ports: &Ports<A>);
782}
783
784pub trait RemoteHandles<M: RemoteMessage>: Referable {}
787
788#[macro_export]
819macro_rules! assert_behaves {
820 ($ty:ty as $behavior:ty) => {
821 const _: fn() = || {
822 fn check<B: hyperactor::actor::Binds<$ty>>() {}
823 check::<$behavior>();
824 };
825 };
826}
827
828#[cfg(test)]
829mod tests {
830 use std::sync::Mutex;
831 use std::time::Duration;
832
833 use rand::seq::SliceRandom;
834 use timed_test::async_timed_test;
835 use tokio::sync::mpsc;
836 use tokio::time::timeout;
837
838 use super::*;
839 use crate as hyperactor;
840 use crate::Actor;
841 use crate::OncePortHandle;
842 use crate::checkpoint::CheckpointError;
843 use crate::checkpoint::Checkpointable;
844 use crate::config;
845 use crate::context::Mailbox as _;
846 use crate::introspect::IntrospectMessage;
847 use crate::introspect::IntrospectResult;
848 use crate::introspect::IntrospectView;
849 use crate::mailbox::BoxableMailboxSender as _;
850 use crate::mailbox::MailboxSender;
851 use crate::mailbox::PortLocation;
852 use crate::mailbox::monitored_return_handle;
853 use crate::ordering::SEQ_INFO;
854 use crate::ordering::SeqInfo;
855 use crate::testing::ids::test_proc_id;
856 use crate::testing::pingpong::PingPongActor;
857 use crate::testing::pingpong::PingPongMessage;
858 use crate::testing::proc_supervison::ProcSupervisionCoordinator; #[derive(Debug)]
861 struct EchoActor(reference::PortRef<u64>);
862
863 #[async_trait]
864 impl Actor for EchoActor {}
865
866 #[async_trait]
867 impl Handler<u64> for EchoActor {
868 async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
869 let Self(port) = self;
870 port.send(cx, message)?;
871 Ok(())
872 }
873 }
874
875 #[tokio::test]
876 async fn test_server_basic() {
877 let proc = Proc::local();
878 let (client, _) = proc.instance("client").unwrap();
879 let (tx, mut rx) = client.open_port();
880 let actor = EchoActor(tx.bind());
881 let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
882 handle.send(&client, 123u64).unwrap();
883 handle.drain_and_stop("test").unwrap();
884 handle.await;
885
886 assert_eq!(rx.drain(), vec![123u64]);
887 }
888
889 #[tokio::test]
890 async fn test_ping_pong() {
891 let proc = Proc::local();
892 let (client, _) = proc.instance("client").unwrap();
893 let (undeliverable_msg_tx, _) = client.open_port();
894
895 let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
896 let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
897 let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
898 let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
899
900 let (local_port, local_receiver) = client.open_once_port();
901
902 ping_handle
903 .send(
904 &client,
905 PingPongMessage(10, pong_handle.bind(), local_port.bind()),
906 )
907 .unwrap();
908
909 assert!(local_receiver.recv().await.unwrap());
910 }
911
912 #[tokio::test]
913 async fn test_ping_pong_on_handler_error() {
914 let proc = Proc::local();
915 let (client, _) = proc.instance("client").unwrap();
916 let (undeliverable_msg_tx, _) = client.open_port();
917
918 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
921
922 let error_ttl = 66;
923
924 let ping_actor =
925 PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
926 let pong_actor =
927 PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
928 let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
929 let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
930
931 let (local_port, local_receiver) = client.open_once_port();
932
933 ping_handle
934 .send(
935 &client,
936 PingPongMessage(
937 error_ttl + 1, pong_handle.bind(),
939 local_port.bind(),
940 ),
941 )
942 .unwrap();
943
944 let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
946 timeout(Duration::from_secs(5), local_receiver.recv()).await;
947 assert!(res.is_err());
948 }
949
950 #[derive(Debug)]
951 struct InitActor(bool);
952
953 #[async_trait]
954 impl Actor for InitActor {
955 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
956 self.0 = true;
957 Ok(())
958 }
959 }
960
961 #[async_trait]
962 impl Handler<OncePortHandle<bool>> for InitActor {
963 async fn handle(
964 &mut self,
965 cx: &Context<Self>,
966 port: OncePortHandle<bool>,
967 ) -> Result<(), anyhow::Error> {
968 port.send(cx, self.0)?;
969 Ok(())
970 }
971 }
972
973 #[tokio::test]
974 async fn test_init() {
975 let proc = Proc::local();
976 let actor = InitActor(false);
977 let handle = proc.spawn::<InitActor>("init", actor).unwrap();
978 let (client, _) = proc.instance("client").unwrap();
979
980 let (port, receiver) = client.open_once_port();
981 handle.send(&client, port).unwrap();
982 assert!(receiver.recv().await.unwrap());
983
984 handle.drain_and_stop("test").unwrap();
985 handle.await;
986 }
987
988 #[derive(Debug)]
989 struct CheckpointActor {
990 sum: u64,
992 port: reference::PortRef<u64>,
993 }
994
995 #[async_trait]
996 impl Actor for CheckpointActor {}
997
998 #[async_trait]
999 impl Handler<u64> for CheckpointActor {
1000 async fn handle(&mut self, cx: &Context<Self>, value: u64) -> Result<(), anyhow::Error> {
1001 self.sum += value;
1002 self.port.send(cx, self.sum)?;
1003 Ok(())
1004 }
1005 }
1006
1007 #[async_trait]
1008 impl Checkpointable for CheckpointActor {
1009 type State = (u64, reference::PortRef<u64>);
1010
1011 async fn save(&self) -> Result<Self::State, CheckpointError> {
1012 Ok((self.sum, self.port.clone()))
1013 }
1014
1015 async fn load(state: Self::State) -> Result<Self, CheckpointError> {
1016 let (sum, port) = state;
1017 Ok(CheckpointActor { sum, port })
1018 }
1019 }
1020
1021 type MultiValues = Arc<Mutex<(u64, String)>>;
1022
1023 struct MultiValuesTest {
1024 proc: Proc,
1025 values: MultiValues,
1026 handle: ActorHandle<MultiActor>,
1027 client: Instance<()>,
1028 _client_handle: ActorHandle<()>,
1029 }
1030
1031 impl MultiValuesTest {
1032 async fn new() -> Self {
1033 let proc = Proc::local();
1034 let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
1035 let actor = MultiActor(values.clone());
1036 let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
1037 let (client, client_handle) = proc.instance("client").unwrap();
1038 Self {
1039 proc,
1040 values,
1041 handle,
1042 client,
1043 _client_handle: client_handle,
1044 }
1045 }
1046
1047 fn send<M>(&self, message: M)
1048 where
1049 M: RemoteMessage,
1050 MultiActor: Handler<M>,
1051 {
1052 self.handle.send(&self.client, message).unwrap()
1053 }
1054
1055 async fn sync(&self) {
1056 let (port, done) = self.client.open_once_port::<bool>();
1057 self.handle.send(&self.client, port).unwrap();
1058 assert!(done.recv().await.unwrap());
1059 }
1060
1061 fn get_values(&self) -> (u64, String) {
1062 self.values.lock().unwrap().clone()
1063 }
1064 }
1065
1066 #[derive(Debug)]
1067 #[hyperactor::export(handlers = [u64, String])]
1068 struct MultiActor(MultiValues);
1069
1070 #[async_trait]
1071 impl Actor for MultiActor {}
1072
1073 #[async_trait]
1074 impl Handler<u64> for MultiActor {
1075 async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
1076 let mut vals = self.0.lock().unwrap();
1077 vals.0 = message;
1078 Ok(())
1079 }
1080 }
1081
1082 #[async_trait]
1083 impl Handler<String> for MultiActor {
1084 async fn handle(
1085 &mut self,
1086 _cx: &Context<Self>,
1087 message: String,
1088 ) -> Result<(), anyhow::Error> {
1089 let mut vals = self.0.lock().unwrap();
1090 vals.1 = message;
1091 Ok(())
1092 }
1093 }
1094
1095 #[async_trait]
1096 impl Handler<OncePortHandle<bool>> for MultiActor {
1097 async fn handle(
1098 &mut self,
1099 cx: &Context<Self>,
1100 message: OncePortHandle<bool>,
1101 ) -> Result<(), anyhow::Error> {
1102 message.send(cx, true).unwrap();
1103 Ok(())
1104 }
1105 }
1106
1107 #[tokio::test]
1108 async fn test_multi_handler_refs() {
1109 let test = MultiValuesTest::new().await;
1110
1111 test.send(123u64);
1112 test.send("foo".to_string());
1113 test.sync().await;
1114 assert_eq!(test.get_values(), (123u64, "foo".to_string()));
1115
1116 let myref: reference::ActorRef<MultiActor> = test.handle.bind();
1117
1118 myref.port().send(&test.client, 321u64).unwrap();
1119 test.sync().await;
1120 assert_eq!(test.get_values(), (321u64, "foo".to_string()));
1121
1122 myref.port().send(&test.client, "bar".to_string()).unwrap();
1123 test.sync().await;
1124 assert_eq!(test.get_values(), (321u64, "bar".to_string()));
1125 }
1126
1127 #[tokio::test]
1128 async fn test_ref_behavior() {
1129 let test = MultiValuesTest::new().await;
1130
1131 test.send(123u64);
1132 test.send("foo".to_string());
1133
1134 hyperactor::behavior!(MyActorBehavior, u64, String);
1135
1136 let myref: reference::ActorRef<MyActorBehavior> = test.handle.bind();
1137 myref.port().send(&test.client, "biz".to_string()).unwrap();
1138 myref.port().send(&test.client, 999u64).unwrap();
1139
1140 test.sync().await;
1141 assert_eq!(test.get_values(), (999u64, "biz".to_string()));
1142 }
1143
1144 #[tokio::test]
1145 async fn test_actor_handle_downcast() {
1146 #[derive(Debug, Default)]
1147 struct NothingActor;
1148
1149 impl Actor for NothingActor {}
1150
1151 let proc = Proc::local();
1154 let handle = proc.spawn("nothing", NothingActor).unwrap();
1155 let cell = handle.cell();
1156
1157 assert!(cell.downcast_handle::<EchoActor>().is_none());
1159
1160 let handle = cell.downcast_handle::<NothingActor>().unwrap();
1161 handle.drain_and_stop("test").unwrap();
1162 handle.await;
1163 }
1164
1165 #[derive(Debug)]
1167 #[hyperactor::export(handlers = [String, Callback])]
1168 struct GetSeqActor(reference::PortRef<(String, SeqInfo)>);
1169
1170 #[async_trait]
1171 impl Actor for GetSeqActor {}
1172
1173 #[async_trait]
1174 impl Handler<String> for GetSeqActor {
1175 async fn handle(
1176 &mut self,
1177 cx: &Context<Self>,
1178 message: String,
1179 ) -> Result<(), anyhow::Error> {
1180 let Self(port) = self;
1181 let seq_info = cx.headers().get(SEQ_INFO).unwrap();
1182 port.send(cx, (message, seq_info.clone()))?;
1183 Ok(())
1184 }
1185 }
1186
1187 #[derive(Clone, Debug, Serialize, Deserialize, Named)]
1192 struct Callback(reference::PortRef<reference::PortRef<String>>);
1193
1194 #[async_trait]
1195 impl Handler<Callback> for GetSeqActor {
1196 async fn handle(
1197 &mut self,
1198 cx: &Context<Self>,
1199 message: Callback,
1200 ) -> Result<(), anyhow::Error> {
1201 let (handle, mut receiver) = cx.open_port::<String>();
1202 let callback_ref = handle.bind();
1203 message.0.send(cx, callback_ref).unwrap();
1204 let msg = receiver.recv().await.unwrap();
1205 self.handle(cx, msg).await
1206 }
1207 }
1208
1209 #[async_timed_test(timeout_secs = 30)]
1210 async fn test_sequencing_actor_handle_basic() {
1211 let proc = Proc::local();
1212 let (client, _) = proc.instance("client").unwrap();
1213 let (tx, mut rx) = client.open_port();
1214
1215 let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1216
1217 actor_handle.send(&client, "unbound".to_string()).unwrap();
1219 assert_eq!(
1220 rx.recv().await.unwrap(),
1221 ("unbound".to_string(), SeqInfo::Direct)
1222 );
1223
1224 let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1225
1226 let session_id = client.sequencer().session_id();
1227 let mut expected_seq = 0;
1228 for m in 0..10 {
1230 actor_handle.send(&client, format!("{m}")).unwrap();
1231 expected_seq += 1;
1232 assert_eq!(
1233 rx.recv().await.unwrap(),
1234 (
1235 format!("{m}"),
1236 SeqInfo::Session {
1237 session_id,
1238 seq: expected_seq,
1239 }
1240 )
1241 );
1242
1243 for n in 0..2 {
1244 actor_ref.port().send(&client, format!("{m}-{n}")).unwrap();
1245 expected_seq += 1;
1246 assert_eq!(
1247 rx.recv().await.unwrap(),
1248 (
1249 format!("{m}-{n}"),
1250 SeqInfo::Session {
1251 session_id,
1252 seq: expected_seq,
1253 }
1254 )
1255 );
1256 }
1257 }
1258 }
1259
1260 #[async_timed_test(timeout_secs = 30)]
1262 async fn test_sequencing_mixed_actor_and_non_actor_ports() {
1263 let proc = Proc::local();
1264 let (client, _) = proc.instance("client").unwrap();
1265
1266 let (actor_tx, mut actor_rx) = client.open_port();
1268
1269 let (non_actor_tx, mut non_actor_rx) = mpsc::unbounded_channel::<Option<SeqInfo>>();
1271
1272 let actor_handle = proc.spawn("get_seq", GetSeqActor(actor_tx.bind())).unwrap();
1273 let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1274
1275 let non_actor_tx_clone = non_actor_tx.clone();
1277 let non_actor_port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
1278 let seq_info = headers.get(SEQ_INFO);
1279 non_actor_tx_clone.send(seq_info).unwrap();
1280 Ok(())
1281 });
1282
1283 non_actor_port_handle.bind();
1285 let non_actor_port_id = match non_actor_port_handle.location() {
1286 PortLocation::Bound(port_id) => port_id,
1287 _ => panic!("port_handle should be bound"),
1288 };
1289 assert!(!non_actor_port_id.is_actor_port());
1290
1291 let session_id = client.sequencer().session_id();
1292
1293 actor_handle.send(&client, "msg1".to_string()).unwrap();
1295 assert_eq!(
1296 actor_rx.recv().await.unwrap().1,
1297 SeqInfo::Session { session_id, seq: 1 }
1298 );
1299
1300 actor_ref.port().send(&client, "msg2".to_string()).unwrap();
1302 assert_eq!(
1303 actor_rx.recv().await.unwrap().1,
1304 SeqInfo::Session { session_id, seq: 2 }
1305 );
1306
1307 non_actor_port_handle.send(&client, ()).unwrap();
1309 assert_eq!(
1310 non_actor_rx.recv().await.unwrap(),
1311 Some(SeqInfo::Session { session_id, seq: 1 })
1312 );
1313
1314 actor_handle.send(&client, "msg3".to_string()).unwrap();
1316 assert_eq!(
1317 actor_rx.recv().await.unwrap().1,
1318 SeqInfo::Session { session_id, seq: 3 }
1319 );
1320
1321 non_actor_port_handle.send(&client, ()).unwrap();
1323 assert_eq!(
1324 non_actor_rx.recv().await.unwrap(),
1325 Some(SeqInfo::Session { session_id, seq: 2 })
1326 );
1327
1328 actor_ref.port().send(&client, "msg4".to_string()).unwrap();
1330 assert_eq!(
1331 actor_rx.recv().await.unwrap().1,
1332 SeqInfo::Session { session_id, seq: 4 }
1333 );
1334
1335 actor_handle.drain_and_stop("test cleanup").unwrap();
1336 actor_handle.await;
1337 }
1338
1339 #[async_timed_test(timeout_secs = 30)]
1341 async fn test_sequencing_multiple_clients() {
1342 let proc = Proc::local();
1343 let (client1, _) = proc.instance("client1").unwrap();
1344 let (client2, _) = proc.instance("client2").unwrap();
1345
1346 let (tx, mut rx) = client1.open_port();
1348
1349 let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1350 let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1351
1352 let session_id_1 = client1.sequencer().session_id();
1354 let session_id_2 = client2.sequencer().session_id();
1355 assert_ne!(session_id_1, session_id_2);
1356
1357 actor_handle.send(&client1, "c1_msg1".to_string()).unwrap();
1359 assert_eq!(
1360 rx.recv().await.unwrap().1,
1361 SeqInfo::Session {
1362 session_id: session_id_1,
1363 seq: 1
1364 }
1365 );
1366
1367 actor_handle.send(&client2, "c2_msg1".to_string()).unwrap();
1369 assert_eq!(
1370 rx.recv().await.unwrap().1,
1371 SeqInfo::Session {
1372 session_id: session_id_2,
1373 seq: 1
1374 }
1375 );
1376
1377 actor_ref
1379 .port()
1380 .send(&client1, "c1_msg2".to_string())
1381 .unwrap();
1382 assert_eq!(
1383 rx.recv().await.unwrap().1,
1384 SeqInfo::Session {
1385 session_id: session_id_1,
1386 seq: 2
1387 }
1388 );
1389
1390 actor_ref
1392 .port()
1393 .send(&client2, "c2_msg2".to_string())
1394 .unwrap();
1395 assert_eq!(
1396 rx.recv().await.unwrap().1,
1397 SeqInfo::Session {
1398 session_id: session_id_2,
1399 seq: 2
1400 }
1401 );
1402
1403 actor_handle.send(&client1, "c1_msg3".to_string()).unwrap();
1405 assert_eq!(
1406 rx.recv().await.unwrap().1,
1407 SeqInfo::Session {
1408 session_id: session_id_1,
1409 seq: 3
1410 }
1411 );
1412
1413 actor_ref
1414 .port()
1415 .send(&client2, "c2_msg3".to_string())
1416 .unwrap();
1417 assert_eq!(
1418 rx.recv().await.unwrap().1,
1419 SeqInfo::Session {
1420 session_id: session_id_2,
1421 seq: 3
1422 }
1423 );
1424
1425 actor_handle.drain_and_stop("test cleanup").unwrap();
1426 actor_handle.await;
1427 }
1428
1429 #[async_timed_test(timeout_secs = 30)]
1450 async fn test_sequencing_actor_handle_callback() {
1451 let config = hyperactor_config::global::lock();
1452 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1453
1454 let proc = Proc::local();
1455 let (client, _) = proc.instance("client").unwrap();
1456 let (tx, mut rx) = client.open_port();
1457
1458 let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1459 let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1460
1461 let (callback_tx, mut callback_rx) = client.open_port();
1462 actor_ref
1464 .send(&client, Callback(callback_tx.bind()))
1465 .unwrap();
1466 let msg_port_ref = callback_rx.recv().await.unwrap();
1467 msg_port_ref.send(&client, "finally".to_string()).unwrap();
1470
1471 let session_id = client.sequencer().session_id();
1472 assert_eq!(
1474 rx.recv().await.unwrap(),
1475 (
1476 "finally".to_string(),
1477 SeqInfo::Session { session_id, seq: 1 }
1478 )
1479 );
1480 }
1481
1482 #[derive(Clone, Debug)]
1485 struct DelayedMailboxSender {
1486 relay_tx: mpsc::UnboundedSender<MessageEnvelope>,
1487 }
1488
1489 impl DelayedMailboxSender {
1490 fn new(
1492 dest_proc: Proc,
1495 relay_orders: Vec<usize>,
1499 ) -> Self {
1500 let (relay_tx, mut relay_rx) = mpsc::unbounded_channel::<MessageEnvelope>();
1501
1502 tokio::spawn(async move {
1503 let mut buffer = Vec::new();
1504
1505 for _ in 0..relay_orders.len() {
1506 let envelope = relay_rx.recv().await.unwrap();
1507 buffer.push(envelope);
1508 }
1509
1510 for m in buffer.clone() {
1511 let seq = match m.headers().get(SEQ_INFO).expect("seq should be set") {
1512 SeqInfo::Session { seq, .. } => seq as usize,
1513 SeqInfo::Direct => panic!("expected Session variant"),
1514 };
1515 let order = relay_orders[seq - 1];
1517 buffer[order] = m;
1518 }
1519
1520 let dest_proc_clone = dest_proc.clone();
1521 for msg in buffer {
1522 dest_proc_clone.post(msg, monitored_return_handle());
1523 }
1524 });
1525
1526 Self { relay_tx }
1527 }
1528 }
1529
1530 impl MailboxSender for DelayedMailboxSender {
1531 fn post_unchecked(
1532 &self,
1533 envelope: MessageEnvelope,
1534 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1535 ) {
1536 self.relay_tx.send(envelope).unwrap();
1537 }
1538 }
1539
1540 async fn assert_out_of_order_delivery(expected: Vec<(String, u64)>, relay_orders: Vec<usize>) {
1541 let local_proc: Proc = Proc::local();
1542 let (client, _) = local_proc.instance("local").unwrap();
1543 let (tx, mut rx) = client.open_port();
1544
1545 let handle = local_proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1546 let actor_ref: reference::ActorRef<GetSeqActor> = handle.bind();
1547
1548 let remote_proc = Proc::configured(
1549 test_proc_id("remote_0"),
1550 DelayedMailboxSender::new(local_proc.clone(), relay_orders).boxed(),
1551 );
1552 let (remote_client, _) = remote_proc.instance("remote").unwrap();
1553 let mut messages = expected.clone();
1555 messages.sort_by_key(|v| v.1);
1556 for (message, _seq) in messages {
1557 actor_ref.send(&remote_client, message).unwrap();
1558 }
1559 let session_id = remote_client.sequencer().session_id();
1560 for expect in expected {
1561 let expected = (
1562 expect.0,
1563 SeqInfo::Session {
1564 session_id,
1565 seq: expect.1,
1566 },
1567 );
1568 assert_eq!(rx.recv().await.unwrap(), expected);
1569 }
1570
1571 handle.drain_and_stop("test cleanup").unwrap();
1572 handle.await;
1573 }
1574
1575 #[async_timed_test(timeout_secs = 30)]
1580 async fn test_sequencing_actor_ref_known_delivery_order() {
1581 let config = hyperactor_config::global::lock();
1582
1583 let relay_orders = vec![2, 0, 1];
1585
1586 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, false);
1589 assert_out_of_order_delivery(
1590 vec![
1591 ("second".to_string(), 2),
1592 ("third".to_string(), 3),
1593 ("first".to_string(), 1),
1594 ],
1595 relay_orders.clone(),
1596 )
1597 .await;
1598
1599 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1602 assert_out_of_order_delivery(
1603 vec![
1604 ("first".to_string(), 1),
1605 ("second".to_string(), 2),
1606 ("third".to_string(), 3),
1607 ],
1608 relay_orders.clone(),
1609 )
1610 .await;
1611 }
1612
1613 #[async_timed_test(timeout_secs = 30)]
1618 async fn test_sequencing_actor_ref_random_delivery_order() {
1619 let config = hyperactor_config::global::lock();
1620
1621 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1624 let expected = (0..10000)
1625 .map(|i| (format!("msg{i}"), i + 1))
1626 .collect::<Vec<_>>();
1627
1628 let mut relay_orders: Vec<usize> = (0..10000).collect();
1629 relay_orders.shuffle(&mut rand::thread_rng());
1630 assert_out_of_order_delivery(expected, relay_orders).await;
1631 }
1632
1633 #[tokio::test]
1651 async fn test_introspect_query_default_payload() {
1652 let proc = Proc::local();
1653 let (client, _) = proc.instance("client").unwrap();
1654 let (tx, _rx) = client.open_port::<u64>();
1655 let actor = EchoActor(tx.bind());
1656 let handle = proc.spawn::<EchoActor>("echo_introspect", actor).unwrap();
1657
1658 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1659 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1660 .send(
1661 &client,
1662 IntrospectMessage::Query {
1663 view: IntrospectView::Actor,
1664 reply: reply_port.bind(),
1665 },
1666 )
1667 .unwrap();
1668 let payload = reply_rx.recv().await.unwrap();
1669
1670 assert_eq!(payload.identity, handle.actor_id().to_string());
1671 assert_valid_attrs(&payload);
1672 assert_has_attr(&payload, "status");
1673 assert_has_attr(&payload, "actor_type");
1674 assert_has_attr(&payload, "created_at");
1675 assert!(payload.children.is_empty());
1676 assert!(payload.parent.is_none());
1677
1678 handle.drain_and_stop("test").unwrap();
1679 handle.await;
1680 }
1681
1682 fn attrs_get(attrs_json: &str, short_name: &str) -> Option<serde_json::Value> {
1684 use hyperactor_config::INTROSPECT;
1685 use hyperactor_config::attrs::AttrKeyInfo;
1686 let fq_name = inventory::iter::<AttrKeyInfo>()
1687 .find(|info| {
1688 info.meta
1689 .get(INTROSPECT)
1690 .is_some_and(|ia| ia.name == short_name)
1691 })
1692 .map(|info| info.name)?;
1693 let obj: serde_json::Value = serde_json::from_str(attrs_json).ok()?;
1694 obj.get(fq_name).cloned()
1695 }
1696
1697 fn assert_valid_attrs(result: &IntrospectResult) {
1699 let parsed: serde_json::Value =
1700 serde_json::from_str(&result.attrs).expect("IA-1: attrs must be valid JSON");
1701 assert!(parsed.is_object(), "IA-1: attrs must be a JSON object");
1702 }
1703
1704 fn assert_status(result: &IntrospectResult, expected: &str) {
1706 let status = attrs_get(&result.attrs, "status")
1707 .and_then(|v| v.as_str().map(String::from))
1708 .expect("attrs must contain status");
1709 assert_eq!(status, expected, "unexpected actor status");
1710 }
1711
1712 fn assert_handler(result: &IntrospectResult, expected: Option<&str>) {
1714 let handler =
1715 attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from));
1716 assert_eq!(handler.as_deref(), expected);
1717 }
1718
1719 fn assert_error_code(result: &IntrospectResult, expected: &str) {
1721 let code = attrs_get(&result.attrs, "error_code")
1722 .and_then(|v| v.as_str().map(String::from))
1723 .expect("attrs must contain error_code");
1724 assert_eq!(code, expected);
1725 }
1726
1727 fn assert_handler_not_contains(result: &IntrospectResult, forbidden: &str) {
1729 if let Some(handler) =
1730 attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from))
1731 {
1732 assert!(
1733 !handler.contains(forbidden),
1734 "handler should not contain '{}'; got: {}",
1735 forbidden,
1736 handler
1737 );
1738 }
1739 }
1740
1741 fn assert_has_attr(result: &IntrospectResult, short_name: &str) {
1743 assert!(
1744 attrs_get(&result.attrs, short_name).is_some(),
1745 "attrs must contain '{}'",
1746 short_name
1747 );
1748 }
1749
1750 fn assert_status_contains(result: &IntrospectResult, substring: &str) {
1753 let status = attrs_get(&result.attrs, "status")
1754 .and_then(|v| v.as_str().map(String::from))
1755 .expect("attrs must contain status");
1756 assert!(
1757 status.contains(substring),
1758 "status should contain '{}'; got: {}",
1759 substring,
1760 status
1761 );
1762 }
1763
1764 fn assert_no_status_reason(result: &IntrospectResult) {
1766 assert!(
1767 attrs_get(&result.attrs, "status_reason").is_none(),
1768 "IA-3: must not have status_reason"
1769 );
1770 }
1771
1772 fn assert_has_handler(result: &IntrospectResult) {
1774 assert!(
1775 attrs_get(&result.attrs, "last_handler").is_some(),
1776 "must have a handler"
1777 );
1778 }
1779
1780 fn assert_no_failure_attrs(result: &IntrospectResult) {
1782 assert!(
1783 attrs_get(&result.attrs, "failure_error_message").is_none(),
1784 "IA-4: must not have failure attrs"
1785 );
1786 }
1787
1788 #[tokio::test]
1793 async fn test_ia1_ia4_running_actor_attrs() {
1794 let proc = Proc::local();
1795 let (client, _) = proc.instance("client").unwrap();
1796 let (tx, _rx) = client.open_port::<u64>();
1797 let actor = EchoActor(tx.bind());
1798 let handle = proc.spawn::<EchoActor>("ia_test", actor).unwrap();
1799
1800 let payload = crate::introspect::live_actor_payload(handle.cell());
1801
1802 assert_valid_attrs(&payload);
1804
1805 assert_has_attr(&payload, "status");
1807 assert_no_status_reason(&payload);
1808
1809 assert_no_failure_attrs(&payload);
1811
1812 handle.drain_and_stop("test").unwrap();
1813 handle.await;
1814 }
1815
1816 #[tokio::test]
1822 async fn test_introspect_query_child_not_found() {
1823 let proc = Proc::local();
1824 let (client, _) = proc.instance("client").unwrap();
1825 let (tx, _rx) = client.open_port::<u64>();
1826 let actor = EchoActor(tx.bind());
1827 let handle = proc.spawn::<EchoActor>("echo_qc", actor).unwrap();
1828
1829 let child_ref =
1830 reference::Reference::Actor(test_proc_id("nonexistent").actor_id("child", 0));
1831 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1832 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1833 .send(
1834 &client,
1835 IntrospectMessage::QueryChild {
1836 child_ref,
1837 reply: reply_port.bind(),
1838 },
1839 )
1840 .unwrap();
1841 let payload = reply_rx.recv().await.unwrap();
1842
1843 assert!(payload.identity.is_empty());
1844 assert_error_code(&payload, "not_found");
1845
1846 handle.drain_and_stop("test").unwrap();
1847 handle.await;
1848 }
1849
1850 #[tokio::test]
1856 async fn test_introspect_override() {
1857 #[derive(Debug, Default)]
1858 #[hyperactor::export(handlers = [])]
1859 struct CustomIntrospectActor;
1860
1861 #[async_trait]
1862 impl Actor for CustomIntrospectActor {}
1863
1864 let proc = Proc::local();
1865 let (client, _) = proc.instance("client").unwrap();
1866 let handle = proc
1867 .spawn("custom_introspect", CustomIntrospectActor)
1868 .unwrap();
1869
1870 handle
1871 .status()
1872 .wait_for(|s| matches!(s, ActorStatus::Idle))
1873 .await
1874 .unwrap();
1875
1876 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1877 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1878 .send(
1879 &client,
1880 IntrospectMessage::Query {
1881 view: IntrospectView::Actor,
1882 reply: reply_port.bind(),
1883 },
1884 )
1885 .unwrap();
1886 let payload = reply_rx.recv().await.unwrap();
1887
1888 assert_has_attr(&payload, "status");
1891
1892 handle.drain_and_stop("test").unwrap();
1893 handle.await;
1894 }
1895
1896 #[tokio::test]
1900 async fn test_introspect_query_supervision_child() {
1901 let proc = Proc::local();
1902 let (client, _) = proc.instance("client").unwrap();
1903
1904 let (tx_parent, _rx_parent) = client.open_port::<u64>();
1906 let parent_handle = proc
1907 .spawn::<EchoActor>("parent", EchoActor(tx_parent.bind()))
1908 .unwrap();
1909
1910 let (tx_child, _rx_child) = client.open_port::<u64>();
1912 let child_handle = proc
1913 .spawn_child::<EchoActor>(parent_handle.cell().clone(), EchoActor(tx_child.bind()))
1914 .unwrap();
1915
1916 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1918 reference::PortRef::<IntrospectMessage>::attest_message_port(child_handle.actor_id())
1919 .send(
1920 &client,
1921 IntrospectMessage::Query {
1922 view: IntrospectView::Actor,
1923 reply: reply_port.bind(),
1924 },
1925 )
1926 .unwrap();
1927 let child_payload = reply_rx.recv().await.unwrap();
1928
1929 assert_eq!(child_payload.identity, child_handle.actor_id().to_string(),);
1930 assert!(
1932 attrs_get(&child_payload.attrs, "status").is_some(),
1933 "child should have actor attrs"
1934 );
1935 assert_eq!(
1936 child_payload.parent,
1937 Some(parent_handle.actor_id().to_string()),
1938 );
1939
1940 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1942 reference::PortRef::<IntrospectMessage>::attest_message_port(parent_handle.actor_id())
1943 .send(
1944 &client,
1945 IntrospectMessage::Query {
1946 view: IntrospectView::Actor,
1947 reply: reply_port.bind(),
1948 },
1949 )
1950 .unwrap();
1951 let parent_payload = reply_rx.recv().await.unwrap();
1952
1953 assert!(parent_payload.parent.is_none());
1954 assert!(
1955 parent_payload
1956 .children
1957 .contains(&child_handle.actor_id().to_string()),
1958 );
1959
1960 child_handle.drain_and_stop("test").unwrap();
1961 child_handle.await;
1962 parent_handle.drain_and_stop("test").unwrap();
1963 parent_handle.await;
1964 }
1965
1966 #[tokio::test]
1971 async fn test_introspect_fresh_actor_status() {
1972 let proc = Proc::local();
1973 let (client, _) = proc.instance("client").unwrap();
1974 let (tx, _rx) = client.open_port::<u64>();
1975 let actor = EchoActor(tx.bind());
1976 let handle = proc.spawn::<EchoActor>("echo_fresh", actor).unwrap();
1977
1978 handle
1980 .status()
1981 .wait_for(|s| matches!(s, ActorStatus::Idle))
1982 .await
1983 .unwrap();
1984
1985 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1986 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1987 .send(
1988 &client,
1989 IntrospectMessage::Query {
1990 view: IntrospectView::Actor,
1991 reply: reply_port.bind(),
1992 },
1993 )
1994 .unwrap();
1995 let payload = reply_rx.recv().await.unwrap();
1996
1997 assert_status(&payload, "idle");
1998 assert_handler(&payload, None);
1999
2000 handle.drain_and_stop("test").unwrap();
2001 handle.await;
2002 }
2003
2004 #[tokio::test]
2009 async fn test_introspect_after_user_message() {
2010 let proc = Proc::local();
2011 let (client, _) = proc.instance("client").unwrap();
2012 let (tx, mut rx) = client.open_port::<u64>();
2013 let actor = EchoActor(tx.bind());
2014 let handle = proc.spawn::<EchoActor>("echo_after_msg", actor).unwrap();
2015
2016 handle.send(&client, 42u64).unwrap();
2018 let _ = rx.recv().await.unwrap();
2019
2020 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2021 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2022 .send(
2023 &client,
2024 IntrospectMessage::Query {
2025 view: IntrospectView::Actor,
2026 reply: reply_port.bind(),
2027 },
2028 )
2029 .unwrap();
2030 let payload = reply_rx.recv().await.unwrap();
2031
2032 assert_status(&payload, "idle");
2033 assert_has_handler(&payload);
2034 assert_handler_not_contains(&payload, "IntrospectMessage");
2035
2036 handle.drain_and_stop("test").unwrap();
2037 handle.await;
2038 }
2039
2040 #[tokio::test]
2045 async fn test_introspect_consecutive_queries() {
2046 let proc = Proc::local();
2047 let (client, _) = proc.instance("client").unwrap();
2048 let (tx, _rx) = client.open_port::<u64>();
2049 let actor = EchoActor(tx.bind());
2050 let handle = proc.spawn::<EchoActor>("echo_consec", actor).unwrap();
2051
2052 handle
2053 .status()
2054 .wait_for(|s| matches!(s, ActorStatus::Idle))
2055 .await
2056 .unwrap();
2057
2058 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2060 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2061 .send(
2062 &client,
2063 IntrospectMessage::Query {
2064 view: IntrospectView::Actor,
2065 reply: reply_port.bind(),
2066 },
2067 )
2068 .unwrap();
2069 let payload1 = reply_rx.recv().await.unwrap();
2070
2071 let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2073 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2074 .send(
2075 &client,
2076 IntrospectMessage::Query {
2077 view: IntrospectView::Actor,
2078 reply: reply_port2.bind(),
2079 },
2080 )
2081 .unwrap();
2082 let payload2 = reply_rx2.recv().await.unwrap();
2083
2084 assert_handler(&payload1, None);
2086 assert_handler(&payload2, None);
2087
2088 handle.drain_and_stop("test").unwrap();
2089 handle.await;
2090 }
2091
2092 #[tokio::test]
2100 async fn test_publish_attrs_round_trip() {
2101 use hyperactor_config::Attrs;
2102 use hyperactor_config::declare_attrs;
2103
2104 declare_attrs! {
2105 attr TEST_KEY_A: String;
2106 attr TEST_KEY_B: u64;
2107 }
2108
2109 let proc = Proc::local();
2110 let (client, _) = proc.instance("client").unwrap();
2111 let (tx, _rx) = client.open_port::<u64>();
2112 let actor = EchoActor(tx.bind());
2113 let handle = proc.spawn::<EchoActor>("echo_attrs", actor).unwrap();
2114
2115 assert!(handle.cell().published_attrs().is_none());
2117
2118 let mut attrs = Attrs::new();
2120 attrs.set(TEST_KEY_A, "hello".to_string());
2121 handle.cell().set_published_attrs(attrs);
2122 let published = handle.cell().published_attrs().unwrap();
2123 assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2124
2125 handle.cell().merge_published_attr(TEST_KEY_B, 42u64);
2127 let published = handle.cell().published_attrs().unwrap();
2128 assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2129 assert_eq!(published.get(TEST_KEY_B), Some(&42u64));
2130
2131 handle
2133 .cell()
2134 .merge_published_attr(TEST_KEY_A, "world".to_string());
2135 let published = handle.cell().published_attrs().unwrap();
2136 assert_eq!(published.get(TEST_KEY_A), Some(&"world".to_string()));
2137
2138 handle.drain_and_stop("test").unwrap();
2139 handle.await;
2140 }
2141
2142 #[tokio::test]
2145 async fn test_query_child_handler_round_trip() {
2146 let proc = Proc::local();
2147 let (client, _) = proc.instance("client").unwrap();
2148 let (tx, _rx) = client.open_port::<u64>();
2149 let actor = EchoActor(tx.bind());
2150 let handle = proc.spawn::<EchoActor>("echo_qch", actor).unwrap();
2151
2152 let test_ref = reference::Reference::Actor(test_proc_id("test").actor_id("child", 0));
2154 assert!(handle.cell().query_child(&test_ref).is_none());
2155
2156 handle
2158 .cell()
2159 .set_query_child_handler(|child_ref| IntrospectResult {
2160 identity: child_ref.to_string(),
2161 attrs: serde_json::json!({
2162 "proc_name": "test_proc",
2163 "num_actors": 42,
2164 })
2165 .to_string(),
2166 children: Vec::new(),
2167 parent: None,
2168 as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now()).to_string(),
2169 });
2170
2171 let payload = handle
2173 .cell()
2174 .query_child(&test_ref)
2175 .expect("callback should produce a payload");
2176 assert_eq!(payload.identity, test_ref.to_string());
2177 let attrs: serde_json::Value =
2178 serde_json::from_str(&payload.attrs).expect("attrs must be valid JSON");
2179 assert_eq!(
2180 attrs.get("proc_name").and_then(|v| v.as_str()),
2181 Some("test_proc")
2182 );
2183 assert_eq!(attrs.get("num_actors").and_then(|v| v.as_u64()), Some(42));
2184
2185 handle.drain_and_stop("test").unwrap();
2186 handle.await;
2187 }
2188
2189 #[tokio::test]
2195 async fn test_introspect_wedged() {
2196 #[derive(Debug, Default)]
2197 #[hyperactor::export(handlers = [u64])]
2198 struct WedgedActor;
2199
2200 #[async_trait]
2201 impl Actor for WedgedActor {}
2202
2203 #[async_trait]
2204 impl Handler<u64> for WedgedActor {
2205 async fn handle(
2206 &mut self,
2207 _cx: &Context<Self>,
2208 _message: u64,
2209 ) -> Result<(), anyhow::Error> {
2210 std::future::pending::<()>().await;
2212 Ok(())
2213 }
2214 }
2215
2216 let proc = Proc::local();
2217 let (client, _) = proc.instance("client").unwrap();
2218 let handle = proc.spawn("wedged", WedgedActor).unwrap();
2219
2220 handle
2222 .status()
2223 .wait_for(|s| matches!(s, ActorStatus::Idle))
2224 .await
2225 .unwrap();
2226
2227 handle.send(&client, 1u64).unwrap();
2229
2230 tokio::time::sleep(Duration::from_millis(50)).await;
2232
2233 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2235 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2236 .send(
2237 &client,
2238 IntrospectMessage::Query {
2239 view: IntrospectView::Actor,
2240 reply: reply_port.bind(),
2241 },
2242 )
2243 .unwrap();
2244
2245 let payload = tokio::time::timeout(Duration::from_secs(5), reply_rx.recv())
2247 .await
2248 .expect("introspect should not hang on a wedged actor")
2249 .unwrap();
2250
2251 assert_status_contains(&payload, "processing");
2252 assert_handler_not_contains(&payload, "IntrospectMessage");
2253 }
2254
2255 #[tokio::test]
2260 async fn test_introspect_no_perturbation() {
2261 let proc = Proc::local();
2262 let (client, _) = proc.instance("client").unwrap();
2263 let (tx, mut rx) = client.open_port::<u64>();
2264 let actor = EchoActor(tx.bind());
2265 let handle = proc.spawn::<EchoActor>("echo_no_perturb", actor).unwrap();
2266
2267 handle
2269 .status()
2270 .wait_for(|s| matches!(s, ActorStatus::Idle))
2271 .await
2272 .unwrap();
2273
2274 handle.send(&client, 42u64).unwrap();
2276 let _ = rx.recv().await.unwrap();
2277
2278 let (reply_port1, reply_rx1) = client.open_once_port::<IntrospectResult>();
2280 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2281 .send(
2282 &client,
2283 IntrospectMessage::Query {
2284 view: IntrospectView::Actor,
2285 reply: reply_port1.bind(),
2286 },
2287 )
2288 .unwrap();
2289 let payload1 = reply_rx1.recv().await.unwrap();
2290
2291 let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2293 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2294 .send(
2295 &client,
2296 IntrospectMessage::Query {
2297 view: IntrospectView::Actor,
2298 reply: reply_port2.bind(),
2299 },
2300 )
2301 .unwrap();
2302 let payload2 = reply_rx2.recv().await.unwrap();
2303
2304 assert_handler_not_contains(&payload1, "IntrospectMessage");
2306 assert_handler_not_contains(&payload2, "IntrospectMessage");
2307 let attrs1: serde_json::Value = serde_json::from_str(&payload1.attrs).unwrap();
2310 let attrs2: serde_json::Value = serde_json::from_str(&payload2.attrs).unwrap();
2311 assert_eq!(attrs1, attrs2, "consecutive queries should be identical");
2312
2313 handle.drain_and_stop("test").unwrap();
2314 handle.await;
2315 }
2316
2317 #[tokio::test]
2324 async fn test_introspectable_instance_responds_to_query() {
2325 let proc = Proc::local();
2326 let (bridge, handle) = proc.introspectable_instance("bridge").unwrap();
2327 let actor_id = handle.actor_id().clone();
2328
2329 let (reply_port, reply_rx) = bridge.open_once_port::<IntrospectResult>();
2330 reference::PortRef::<IntrospectMessage>::attest_message_port(&actor_id)
2331 .send(
2332 &bridge,
2333 IntrospectMessage::Query {
2334 view: IntrospectView::Actor,
2335 reply: reply_port.bind(),
2336 },
2337 )
2338 .unwrap();
2339 let payload = reply_rx.recv().await.unwrap();
2340
2341 assert_eq!(payload.identity, actor_id.to_string());
2344 assert_status(&payload, "client");
2345 let actor_type = attrs_get(&payload.attrs, "actor_type")
2346 .and_then(|v| v.as_str().map(String::from))
2347 .expect("must have actor_type");
2348 assert_eq!(actor_type, "()", "CI-1: actor_type must be \"()\"");
2349 }
2350
2351 #[tokio::test]
2359 async fn test_instance_does_not_respond_to_query() {
2360 let proc = Proc::local();
2361 let (client, _client_handle) = proc.instance("client").unwrap();
2362 let (_mailbox, mailbox_handle) = proc.instance("mailbox").unwrap();
2363 let mailbox_id = mailbox_handle.actor_id().clone();
2364
2365 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2366 reference::PortRef::<IntrospectMessage>::attest_message_port(&mailbox_id)
2367 .send(
2368 &client,
2369 IntrospectMessage::Query {
2370 view: IntrospectView::Actor,
2371 reply: reply_port.bind(),
2372 },
2373 )
2374 .unwrap();
2375
2376 let result = tokio::time::timeout(Duration::from_millis(100), reply_rx.recv()).await;
2379 assert!(
2380 result.is_err(),
2381 "instance() must not respond to IntrospectMessage (introspect receiver dropped)"
2382 );
2383 }
2384
2385 #[tokio::test]
2390 async fn test_introspectable_instance_snapshot_on_drop() {
2391 let proc = Proc::local();
2392 let (instance, handle) = proc.introspectable_instance("bridge").unwrap();
2393 let actor_id = handle.actor_id().clone();
2394
2395 assert!(
2396 proc.all_actor_ids().contains(&actor_id),
2397 "should appear in all_actor_ids while live"
2398 );
2399
2400 drop(instance);
2403
2404 let deadline = std::time::Instant::now() + Duration::from_secs(5);
2405 loop {
2406 if proc.terminated_snapshot(&actor_id).is_some() {
2407 break;
2408 }
2409 assert!(
2410 std::time::Instant::now() < deadline,
2411 "timed out waiting for terminated snapshot"
2412 );
2413 tokio::task::yield_now().await;
2414 }
2415
2416 let snapshot = proc.terminated_snapshot(&actor_id).unwrap();
2417 let actor_status = attrs_get(&snapshot.attrs, "status")
2418 .and_then(|v| v.as_str().map(String::from))
2419 .expect("snapshot attrs must contain status");
2420 assert!(
2421 actor_status.starts_with("stopped"),
2422 "CI-2: snapshot actor_status should be stopped, got: {}",
2423 actor_status
2424 );
2425 }
2426}