1#![allow(dead_code)] use std::any::TypeId;
14use std::borrow::Cow;
15use std::fmt;
16use std::fmt::Debug;
17use std::future::Future;
18use std::future::IntoFuture;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::SystemTime;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use hyperactor_config::Flattrs;
28use serde::Deserialize;
29use serde::Serialize;
30use tokio::sync::watch;
31use tokio::task::JoinHandle;
32use typeuri::Named;
33
34use crate as hyperactor; use crate::Data;
36use crate::Message;
37use crate::RemoteMessage;
38use crate::context;
39use crate::mailbox::MailboxError;
40use crate::mailbox::MailboxSenderError;
41use crate::mailbox::MessageEnvelope;
42use crate::mailbox::PortHandle;
43use crate::mailbox::Undeliverable;
44use crate::mailbox::UndeliverableMessageError;
45use crate::message::Castable;
46use crate::message::IndexedErasedUnbound;
47use crate::proc::Context;
48use crate::proc::Instance;
49use crate::proc::InstanceCell;
50use crate::proc::Ports;
51use crate::proc::Proc;
52use crate::reference;
53use crate::supervision::ActorSupervisionEvent;
54
55pub mod remote;
56
57#[async_trait]
65pub trait Actor: Sized + Send + 'static {
66 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
71 Ok(())
73 }
74
75 async fn cleanup(
86 &mut self,
87 _this: &Instance<Self>,
88 _err: Option<&ActorError>,
89 ) -> Result<(), anyhow::Error> {
90 Ok(())
92 }
93
94 fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
97 cx.instance().spawn(self)
98 }
99
100 fn spawn_with_name(
103 self,
104 cx: &impl context::Actor,
105 name: &str,
106 ) -> anyhow::Result<ActorHandle<Self>> {
107 cx.instance().spawn_with_name(name, self)
108 }
109
110 fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
117 Proc::local().spawn("anon", self)
118 }
119
120 #[hyperactor::instrument_infallible]
124 fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
125 where
126 F: Future + Send + 'static,
127 F::Output: Send + 'static,
128 {
129 tokio::spawn(future)
130 }
131
132 async fn handle_supervision_event(
134 &mut self,
135 _this: &Instance<Self>,
136 event: &ActorSupervisionEvent,
137 ) -> Result<bool, anyhow::Error> {
138 Ok(!event.is_error())
141 }
142
143 async fn handle_undeliverable_message(
145 &mut self,
146 cx: &Instance<Self>,
147 envelope: Undeliverable<MessageEnvelope>,
148 ) -> Result<(), anyhow::Error> {
149 handle_undeliverable_message(cx, envelope)
150 }
151
152 fn display_name(&self) -> Option<String> {
156 None
157 }
158}
159
160pub fn handle_undeliverable_message<A: Actor>(
164 cx: &Instance<A>,
165 Undeliverable(envelope): Undeliverable<MessageEnvelope>,
166) -> Result<(), anyhow::Error> {
167 assert_eq!(envelope.sender(), cx.self_id());
168
169 anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
170}
171
172#[async_trait]
175impl Actor for () {}
176
177impl Referable for () {}
178
179impl Binds<()> for () {
180 fn bind(_ports: &Ports<Self>) {
181 }
183}
184
185#[async_trait]
187pub trait Handler<M>: Actor {
188 async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
190}
191
192#[async_trait]
195impl<A: Actor> Handler<Signal> for A {
196 async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
197 unimplemented!("signal handler should not be called directly")
198 }
199}
200
201#[async_trait]
204impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
205 async fn handle(
206 &mut self,
207 cx: &Context<Self>,
208 message: Undeliverable<MessageEnvelope>,
209 ) -> Result<(), anyhow::Error> {
210 let sender = message.0.sender().clone();
211 let dest = message.0.dest().clone();
212 let error = message.0.error_msg().unwrap_or(String::new());
213 match self.handle_undeliverable_message(cx, message).await {
214 Ok(_) => {
215 tracing::debug!(
216 actor_id = %cx.self_id(),
217 name = "undeliverable_message_handled",
218 %sender,
219 %dest,
220 error,
221 );
222 Ok(())
223 }
224 Err(e) => {
225 tracing::error!(
226 actor_id = %cx.self_id(),
227 name = "undeliverable_message",
228 %sender,
229 %dest,
230 error,
231 handler_error = %e,
232 );
233 Err(e)
234 }
235 }
236 }
237}
238
239#[async_trait]
242impl<A, M> Handler<IndexedErasedUnbound<M>> for A
243where
244 A: Handler<M>,
245 M: Castable,
246{
247 async fn handle(
248 &mut self,
249 cx: &Context<Self>,
250 msg: IndexedErasedUnbound<M>,
251 ) -> anyhow::Result<()> {
252 let message = msg.downcast()?.bind()?;
253 Handler::handle(self, cx, message).await
254 }
255}
256
257#[async_trait]
273pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
274 type Params: RemoteMessage;
276
277 async fn new(params: Self::Params, environment: Flattrs) -> anyhow::Result<Self>;
281
282 fn gspawn(
287 proc: &Proc,
288 name: &str,
289 serialized_params: Data,
290 environment: Flattrs,
291 ) -> Pin<Box<dyn Future<Output = Result<reference::ActorId, anyhow::Error>> + Send>> {
292 let proc = proc.clone();
293 let name = name.to_string();
294 Box::pin(async move {
295 let params = bincode::deserialize(&serialized_params)?;
296 let actor = Self::new(params, environment).await?;
297 let handle = proc.spawn(&name, actor)?;
298 Ok(handle.bind::<Self>().actor_id)
310 })
311 }
312
313 fn get_type_id() -> TypeId {
315 TypeId::of::<Self>()
316 }
317}
318
319#[async_trait]
322impl<A: Actor + Referable + Binds<Self> + Default> RemoteSpawn for A {
323 type Params = ();
324
325 async fn new(_params: Self::Params, _environment: Flattrs) -> anyhow::Result<Self> {
326 Ok(Default::default())
327 }
328}
329
330#[derive(Debug)]
333pub struct ActorError {
334 pub actor_id: Box<reference::ActorId>,
336 pub kind: Box<ActorErrorKind>,
338}
339
340#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
342pub enum ActorErrorKind {
343 #[error("{0}")]
345 Generic(String),
346
347 #[error("{0} while handling {1}")]
349 ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
350
351 #[error("{0}")]
353 UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
354
355 #[error("actor explicitly aborted due to: {0}")]
357 Aborted(String),
358}
359
360impl ActorErrorKind {
361 pub fn processing(err: anyhow::Error) -> Self {
364 err.downcast::<ActorErrorKind>()
369 .unwrap_or_else(|err| Self::Generic(err.to_string()))
370 }
371
372 pub fn panic(err: anyhow::Error) -> Self {
374 Self::Generic(format!("panic: {}", err))
375 }
376
377 pub fn init(err: anyhow::Error) -> Self {
379 Self::Generic(format!("initialization error: {}", err))
380 }
381
382 pub fn cleanup(err: anyhow::Error) -> Self {
384 Self::Generic(format!("cleanup error: {}", err))
385 }
386
387 pub fn mailbox(err: MailboxError) -> Self {
389 Self::Generic(err.to_string())
390 }
391
392 pub fn mailbox_sender(err: MailboxSenderError) -> Self {
394 Self::Generic(err.to_string())
395 }
396
397 pub fn indeterminate_state() -> Self {
399 Self::Generic("actor is in an indeterminate state".to_string())
400 }
401}
402
403impl ActorError {
404 pub(crate) fn new(actor_id: &reference::ActorId, kind: ActorErrorKind) -> Self {
406 Self {
407 actor_id: Box::new(actor_id.clone()),
408 kind: Box::new(kind),
409 }
410 }
411}
412
413impl fmt::Display for ActorError {
414 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415 write!(f, "serving {}: ", self.actor_id)?;
416 fmt::Display::fmt(&self.kind, f)
417 }
418}
419
420impl std::error::Error for ActorError {
421 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
422 self.kind.source()
423 }
424}
425
426impl From<MailboxError> for ActorError {
427 fn from(inner: MailboxError) -> Self {
428 Self {
429 actor_id: Box::new(inner.actor_id().clone()),
430 kind: Box::new(ActorErrorKind::mailbox(inner)),
431 }
432 }
433}
434
435impl From<MailboxSenderError> for ActorError {
436 fn from(inner: MailboxSenderError) -> Self {
437 Self {
438 actor_id: Box::new(inner.location().actor_id().clone()),
439 kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
440 }
441 }
442}
443
444#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
451pub enum Signal {
452 DrainAndStop(String),
454
455 Stop(String),
457
458 ChildStopped(reference::Index),
460
461 Abort(String),
465}
466wirevalue::register_type!(Signal);
467
468impl fmt::Display for Signal {
469 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470 match self {
471 Signal::DrainAndStop(reason) => write!(f, "DrainAndStop({})", reason),
472 Signal::Stop(reason) => write!(f, "Stop({})", reason),
473 Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
474 Signal::Abort(reason) => write!(f, "Abort({})", reason),
475 }
476 }
477}
478
479#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
484pub struct HandlerInfo {
485 pub typename: Cow<'static, str>,
487 pub arm: Option<Cow<'static, str>>,
489}
490
491impl HandlerInfo {
492 pub fn from_static(typename: &'static str, arm: Option<&'static str>) -> Self {
494 Self {
495 typename: Cow::Borrowed(typename),
496 arm: arm.map(Cow::Borrowed),
497 }
498 }
499
500 pub fn from_owned(typename: String, arm: Option<String>) -> Self {
502 Self {
503 typename: Cow::Owned(typename),
504 arm: arm.map(Cow::Owned),
505 }
506 }
507}
508
509impl fmt::Display for HandlerInfo {
510 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
511 match &self.arm {
512 Some(arm) => write!(f, "{}.{}", self.typename, arm),
513 None => write!(f, "{}", self.typename),
514 }
515 }
516}
517
518#[derive(
520 Debug,
521 Serialize,
522 Deserialize,
523 PartialEq,
524 Eq,
525 Clone,
526 typeuri::Named,
527 EnumAsInner
528)]
529pub enum ActorStatus {
530 Unknown,
532 Created,
534 Initializing,
536 Client,
539 Idle,
541 Processing(SystemTime, Option<HandlerInfo>),
544 Saving(SystemTime),
546 Loading(SystemTime),
548 Stopping,
550 Stopped(String),
553 Failed(ActorErrorKind),
555}
556
557impl ActorStatus {
558 pub fn is_terminal(&self) -> bool {
560 self.is_stopped() || self.is_failed()
561 }
562
563 pub fn generic_failure(message: impl Into<String>) -> Self {
565 Self::Failed(ActorErrorKind::Generic(message.into()))
566 }
567
568 fn span_string(&self) -> &'static str {
569 self.arm().unwrap_or_default()
570 }
571}
572
573impl fmt::Display for ActorStatus {
574 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575 match self {
576 Self::Unknown => write!(f, "unknown"),
577 Self::Created => write!(f, "created"),
578 Self::Initializing => write!(f, "initializing"),
579 Self::Client => write!(f, "client"),
580 Self::Idle => write!(f, "idle"),
581 Self::Processing(instant, None) => {
582 write!(
583 f,
584 "processing for {}ms",
585 std::time::SystemTime::now()
586 .duration_since(*instant)
587 .unwrap_or_default()
588 .as_millis()
589 )
590 }
591 Self::Processing(instant, Some(handler_info)) => {
592 write!(
593 f,
594 "{}: processing for {}ms",
595 handler_info,
596 std::time::SystemTime::now()
597 .duration_since(*instant)
598 .unwrap_or_default()
599 .as_millis()
600 )
601 }
602 Self::Saving(instant) => {
603 write!(
604 f,
605 "saving for {}ms",
606 std::time::SystemTime::now()
607 .duration_since(*instant)
608 .unwrap_or_default()
609 .as_millis()
610 )
611 }
612 Self::Loading(instant) => {
613 write!(
614 f,
615 "loading for {}ms",
616 std::time::SystemTime::now()
617 .duration_since(*instant)
618 .unwrap_or_default()
619 .as_millis()
620 )
621 }
622 Self::Stopping => write!(f, "stopping"),
623 Self::Stopped(reason) => write!(f, "stopped: {}", reason),
624 Self::Failed(err) => write!(f, "failed: {}", err),
625 }
626 }
627}
628
629pub struct ActorHandle<A: Actor> {
638 cell: InstanceCell,
639 ports: Arc<Ports<A>>,
640}
641
642impl<A: Actor> ActorHandle<A> {
644 pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
645 Self { cell, ports }
646 }
647
648 pub(crate) fn cell(&self) -> &InstanceCell {
651 &self.cell
652 }
653
654 pub fn actor_id(&self) -> &reference::ActorId {
656 self.cell.actor_id()
657 }
658
659 pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
661 tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
662 self.cell.signal(Signal::DrainAndStop(reason.to_string()))
663 }
664
665 pub fn status(&self) -> watch::Receiver<ActorStatus> {
667 self.cell.status().clone()
668 }
669
670 pub fn send<M: Message>(
673 &self,
674 cx: &impl context::Actor,
675 message: M,
676 ) -> Result<(), MailboxSenderError>
677 where
678 A: Handler<M>,
679 {
680 self.ports.get().send(cx, message)
681 }
682
683 pub fn port<M: Message>(&self) -> PortHandle<M>
685 where
686 A: Handler<M>,
687 {
688 self.ports.get()
689 }
690
691 pub fn bind<R: Binds<A>>(&self) -> reference::ActorRef<R> {
694 self.cell.bind(self.ports.as_ref())
695 }
696}
697
698impl<A: Actor> IntoFuture for ActorHandle<A> {
702 type Output = ActorStatus;
703 type IntoFuture = BoxFuture<'static, Self::Output>;
704
705 fn into_future(self) -> Self::IntoFuture {
706 let future = async move {
707 let mut status_receiver = self.cell.status().clone();
708 let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
709 match result {
710 Err(_) => ActorStatus::Unknown,
711 Ok(status) => status.clone(),
712 }
713 };
714
715 future.boxed()
716 }
717}
718
719impl<A: Actor> Debug for ActorHandle<A> {
720 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
721 f.debug_struct("ActorHandle").field("cell", &"..").finish()
722 }
723}
724
725impl<A: Actor> Clone for ActorHandle<A> {
726 fn clone(&self) -> Self {
727 Self {
728 cell: self.cell.clone(),
729 ports: self.ports.clone(),
730 }
731 }
732}
733
734pub trait Referable: Named {}
749
750pub trait Binds<A: Actor>: Referable {
753 fn bind(ports: &Ports<A>);
755}
756
757pub trait RemoteHandles<M: RemoteMessage>: Referable {}
760
/// Compile-time assertion that `$behavior` implements `Binds<$ty>`.
///
/// Usage: `assert_behaves!(MyActor as MyBehavior);`. The expansion is a
/// never-called function constant, so it has no run-time effect; a missing
/// implementation is reported as a compile error.
#[macro_export]
macro_rules! assert_behaves {
    ($ty:ty as $behavior:ty) => {
        const _: fn() = || {
            fn assert_binds<B>()
            where
                B: hyperactor::actor::Binds<$ty>,
            {
            }
            assert_binds::<$behavior>();
        };
    };
}
800
801#[cfg(test)]
802mod tests {
803 use std::sync::Mutex;
804 use std::time::Duration;
805
806 use rand::seq::SliceRandom;
807 use timed_test::async_timed_test;
808 use tokio::sync::mpsc;
809 use tokio::time::timeout;
810
811 use super::*;
812 use crate as hyperactor;
813 use crate::Actor;
814 use crate::OncePortHandle;
815 use crate::config;
816 use crate::context::Mailbox as _;
817 use crate::introspect::IntrospectMessage;
818 use crate::introspect::IntrospectResult;
819 use crate::introspect::IntrospectView;
820 use crate::mailbox::BoxableMailboxSender as _;
821 use crate::mailbox::MailboxSender;
822 use crate::mailbox::PortLocation;
823 use crate::mailbox::monitored_return_handle;
824 use crate::ordering::SEQ_INFO;
825 use crate::ordering::SeqInfo;
826 use crate::testing::ids::test_proc_id;
827 use crate::testing::pingpong::PingPongActor;
828 use crate::testing::pingpong::PingPongMessage;
829 use crate::testing::proc_supervison::ProcSupervisionCoordinator; #[derive(Debug)]
832 struct EchoActor(reference::PortRef<u64>);
833
834 #[async_trait]
835 impl Actor for EchoActor {}
836
837 #[async_trait]
838 impl Handler<u64> for EchoActor {
839 async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
840 let Self(port) = self;
841 port.send(cx, message)?;
842 Ok(())
843 }
844 }
845
846 #[tokio::test]
847 async fn test_server_basic() {
848 let proc = Proc::local();
849 let (client, _) = proc.instance("client").unwrap();
850 let (tx, mut rx) = client.open_port();
851 let actor = EchoActor(tx.bind());
852 let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
853 handle.send(&client, 123u64).unwrap();
854 handle.drain_and_stop("test").unwrap();
855 handle.await;
856
857 assert_eq!(rx.drain(), vec![123u64]);
858 }
859
860 #[tokio::test]
861 async fn test_ping_pong() {
862 let proc = Proc::local();
863 let (client, _) = proc.instance("client").unwrap();
864 let (undeliverable_msg_tx, _) = client.open_port();
865
866 let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
867 let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
868 let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
869 let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
870
871 let (local_port, local_receiver) = client.open_once_port();
872
873 ping_handle
874 .send(
875 &client,
876 PingPongMessage(10, pong_handle.bind(), local_port.bind()),
877 )
878 .unwrap();
879
880 assert!(local_receiver.recv().await.unwrap());
881 }
882
883 #[tokio::test]
884 async fn test_ping_pong_on_handler_error() {
885 let proc = Proc::local();
886 let (client, _) = proc.instance("client").unwrap();
887 let (undeliverable_msg_tx, _) = client.open_port();
888
889 let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
892
893 let error_ttl = 66;
894
895 let ping_actor =
896 PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
897 let pong_actor =
898 PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
899 let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
900 let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
901
902 let (local_port, local_receiver) = client.open_once_port();
903
904 ping_handle
905 .send(
906 &client,
907 PingPongMessage(
908 error_ttl + 1, pong_handle.bind(),
910 local_port.bind(),
911 ),
912 )
913 .unwrap();
914
915 let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
917 timeout(Duration::from_secs(5), local_receiver.recv()).await;
918 assert!(res.is_err());
919 }
920
921 #[derive(Debug)]
922 struct InitActor(bool);
923
924 #[async_trait]
925 impl Actor for InitActor {
926 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
927 self.0 = true;
928 Ok(())
929 }
930 }
931
932 #[async_trait]
933 impl Handler<OncePortHandle<bool>> for InitActor {
934 async fn handle(
935 &mut self,
936 cx: &Context<Self>,
937 port: OncePortHandle<bool>,
938 ) -> Result<(), anyhow::Error> {
939 port.send(cx, self.0)?;
940 Ok(())
941 }
942 }
943
944 #[tokio::test]
945 async fn test_init() {
946 let proc = Proc::local();
947 let actor = InitActor(false);
948 let handle = proc.spawn::<InitActor>("init", actor).unwrap();
949 let (client, _) = proc.instance("client").unwrap();
950
951 let (port, receiver) = client.open_once_port();
952 handle.send(&client, port).unwrap();
953 assert!(receiver.recv().await.unwrap());
954
955 handle.drain_and_stop("test").unwrap();
956 handle.await;
957 }
958
959 type MultiValues = Arc<Mutex<(u64, String)>>;
960
961 struct MultiValuesTest {
962 proc: Proc,
963 values: MultiValues,
964 handle: ActorHandle<MultiActor>,
965 client: Instance<()>,
966 _client_handle: ActorHandle<()>,
967 }
968
969 impl MultiValuesTest {
970 async fn new() -> Self {
971 let proc = Proc::local();
972 let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
973 let actor = MultiActor(values.clone());
974 let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
975 let (client, client_handle) = proc.instance("client").unwrap();
976 Self {
977 proc,
978 values,
979 handle,
980 client,
981 _client_handle: client_handle,
982 }
983 }
984
985 fn send<M>(&self, message: M)
986 where
987 M: RemoteMessage,
988 MultiActor: Handler<M>,
989 {
990 self.handle.send(&self.client, message).unwrap()
991 }
992
993 async fn sync(&self) {
994 let (port, done) = self.client.open_once_port::<bool>();
995 self.handle.send(&self.client, port).unwrap();
996 assert!(done.recv().await.unwrap());
997 }
998
999 fn get_values(&self) -> (u64, String) {
1000 self.values.lock().unwrap().clone()
1001 }
1002 }
1003
1004 #[derive(Debug)]
1005 #[hyperactor::export(handlers = [u64, String])]
1006 struct MultiActor(MultiValues);
1007
1008 #[async_trait]
1009 impl Actor for MultiActor {}
1010
1011 #[async_trait]
1012 impl Handler<u64> for MultiActor {
1013 async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
1014 let mut vals = self.0.lock().unwrap();
1015 vals.0 = message;
1016 Ok(())
1017 }
1018 }
1019
1020 #[async_trait]
1021 impl Handler<String> for MultiActor {
1022 async fn handle(
1023 &mut self,
1024 _cx: &Context<Self>,
1025 message: String,
1026 ) -> Result<(), anyhow::Error> {
1027 let mut vals = self.0.lock().unwrap();
1028 vals.1 = message;
1029 Ok(())
1030 }
1031 }
1032
1033 #[async_trait]
1034 impl Handler<OncePortHandle<bool>> for MultiActor {
1035 async fn handle(
1036 &mut self,
1037 cx: &Context<Self>,
1038 message: OncePortHandle<bool>,
1039 ) -> Result<(), anyhow::Error> {
1040 message.send(cx, true).unwrap();
1041 Ok(())
1042 }
1043 }
1044
1045 #[tokio::test]
1046 async fn test_multi_handler_refs() {
1047 let test = MultiValuesTest::new().await;
1048
1049 test.send(123u64);
1050 test.send("foo".to_string());
1051 test.sync().await;
1052 assert_eq!(test.get_values(), (123u64, "foo".to_string()));
1053
1054 let myref: reference::ActorRef<MultiActor> = test.handle.bind();
1055
1056 myref.port().send(&test.client, 321u64).unwrap();
1057 test.sync().await;
1058 assert_eq!(test.get_values(), (321u64, "foo".to_string()));
1059
1060 myref.port().send(&test.client, "bar".to_string()).unwrap();
1061 test.sync().await;
1062 assert_eq!(test.get_values(), (321u64, "bar".to_string()));
1063 }
1064
1065 #[tokio::test]
1066 async fn test_ref_behavior() {
1067 let test = MultiValuesTest::new().await;
1068
1069 test.send(123u64);
1070 test.send("foo".to_string());
1071
1072 hyperactor::behavior!(MyActorBehavior, u64, String);
1073
1074 let myref: reference::ActorRef<MyActorBehavior> = test.handle.bind();
1075 myref.port().send(&test.client, "biz".to_string()).unwrap();
1076 myref.port().send(&test.client, 999u64).unwrap();
1077
1078 test.sync().await;
1079 assert_eq!(test.get_values(), (999u64, "biz".to_string()));
1080 }
1081
1082 #[tokio::test]
1083 async fn test_actor_handle_downcast() {
1084 #[derive(Debug, Default)]
1085 struct NothingActor;
1086
1087 impl Actor for NothingActor {}
1088
1089 let proc = Proc::local();
1092 let handle = proc.spawn("nothing", NothingActor).unwrap();
1093 let cell = handle.cell();
1094
1095 assert!(cell.downcast_handle::<EchoActor>().is_none());
1097
1098 let handle = cell.downcast_handle::<NothingActor>().unwrap();
1099 handle.drain_and_stop("test").unwrap();
1100 handle.await;
1101 }
1102
1103 #[derive(Debug)]
1105 #[hyperactor::export(handlers = [String, Callback])]
1106 struct GetSeqActor(reference::PortRef<(String, SeqInfo)>);
1107
1108 #[async_trait]
1109 impl Actor for GetSeqActor {}
1110
1111 #[async_trait]
1112 impl Handler<String> for GetSeqActor {
1113 async fn handle(
1114 &mut self,
1115 cx: &Context<Self>,
1116 message: String,
1117 ) -> Result<(), anyhow::Error> {
1118 let Self(port) = self;
1119 let seq_info = cx.headers().get(SEQ_INFO).unwrap();
1120 port.send(cx, (message, seq_info.clone()))?;
1121 Ok(())
1122 }
1123 }
1124
1125 #[derive(Clone, Debug, Serialize, Deserialize, Named)]
1130 struct Callback(reference::PortRef<reference::PortRef<String>>);
1131
1132 #[async_trait]
1133 impl Handler<Callback> for GetSeqActor {
1134 async fn handle(
1135 &mut self,
1136 cx: &Context<Self>,
1137 message: Callback,
1138 ) -> Result<(), anyhow::Error> {
1139 let (handle, mut receiver) = cx.open_port::<String>();
1140 let callback_ref = handle.bind();
1141 message.0.send(cx, callback_ref).unwrap();
1142 let msg = receiver.recv().await.unwrap();
1143 self.handle(cx, msg).await
1144 }
1145 }
1146
1147 #[async_timed_test(timeout_secs = 30)]
1148 async fn test_sequencing_actor_handle_basic() {
1149 let proc = Proc::local();
1150 let (client, _) = proc.instance("client").unwrap();
1151 let (tx, mut rx) = client.open_port();
1152
1153 let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1154
1155 actor_handle.send(&client, "unbound".to_string()).unwrap();
1157 assert_eq!(
1158 rx.recv().await.unwrap(),
1159 ("unbound".to_string(), SeqInfo::Direct)
1160 );
1161
1162 let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1163
1164 let session_id = client.sequencer().session_id();
1165 let mut expected_seq = 0;
1166 for m in 0..10 {
1168 actor_handle.send(&client, format!("{m}")).unwrap();
1169 expected_seq += 1;
1170 assert_eq!(
1171 rx.recv().await.unwrap(),
1172 (
1173 format!("{m}"),
1174 SeqInfo::Session {
1175 session_id,
1176 seq: expected_seq,
1177 }
1178 )
1179 );
1180
1181 for n in 0..2 {
1182 actor_ref.port().send(&client, format!("{m}-{n}")).unwrap();
1183 expected_seq += 1;
1184 assert_eq!(
1185 rx.recv().await.unwrap(),
1186 (
1187 format!("{m}-{n}"),
1188 SeqInfo::Session {
1189 session_id,
1190 seq: expected_seq,
1191 }
1192 )
1193 );
1194 }
1195 }
1196 }
1197
1198 #[async_timed_test(timeout_secs = 30)]
1200 async fn test_sequencing_mixed_actor_and_non_actor_ports() {
1201 let proc = Proc::local();
1202 let (client, _) = proc.instance("client").unwrap();
1203
1204 let (actor_tx, mut actor_rx) = client.open_port();
1206
1207 let (non_actor_tx, mut non_actor_rx) = mpsc::unbounded_channel::<Option<SeqInfo>>();
1209
1210 let actor_handle = proc.spawn("get_seq", GetSeqActor(actor_tx.bind())).unwrap();
1211 let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1212
1213 let non_actor_tx_clone = non_actor_tx.clone();
1215 let non_actor_port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
1216 let seq_info = headers.get(SEQ_INFO);
1217 non_actor_tx_clone.send(seq_info).unwrap();
1218 Ok(())
1219 });
1220
1221 non_actor_port_handle.bind();
1223 let non_actor_port_id = match non_actor_port_handle.location() {
1224 PortLocation::Bound(port_id) => port_id,
1225 _ => panic!("port_handle should be bound"),
1226 };
1227 assert!(!non_actor_port_id.is_actor_port());
1228
1229 let session_id = client.sequencer().session_id();
1230
1231 actor_handle.send(&client, "msg1".to_string()).unwrap();
1233 assert_eq!(
1234 actor_rx.recv().await.unwrap().1,
1235 SeqInfo::Session { session_id, seq: 1 }
1236 );
1237
1238 actor_ref.port().send(&client, "msg2".to_string()).unwrap();
1240 assert_eq!(
1241 actor_rx.recv().await.unwrap().1,
1242 SeqInfo::Session { session_id, seq: 2 }
1243 );
1244
1245 non_actor_port_handle.send(&client, ()).unwrap();
1247 assert_eq!(
1248 non_actor_rx.recv().await.unwrap(),
1249 Some(SeqInfo::Session { session_id, seq: 1 })
1250 );
1251
1252 actor_handle.send(&client, "msg3".to_string()).unwrap();
1254 assert_eq!(
1255 actor_rx.recv().await.unwrap().1,
1256 SeqInfo::Session { session_id, seq: 3 }
1257 );
1258
1259 non_actor_port_handle.send(&client, ()).unwrap();
1261 assert_eq!(
1262 non_actor_rx.recv().await.unwrap(),
1263 Some(SeqInfo::Session { session_id, seq: 2 })
1264 );
1265
1266 actor_ref.port().send(&client, "msg4".to_string()).unwrap();
1268 assert_eq!(
1269 actor_rx.recv().await.unwrap().1,
1270 SeqInfo::Session { session_id, seq: 4 }
1271 );
1272
1273 actor_handle.drain_and_stop("test cleanup").unwrap();
1274 actor_handle.await;
1275 }
1276
1277 #[async_timed_test(timeout_secs = 30)]
1279 async fn test_sequencing_multiple_clients() {
1280 let proc = Proc::local();
1281 let (client1, _) = proc.instance("client1").unwrap();
1282 let (client2, _) = proc.instance("client2").unwrap();
1283
1284 let (tx, mut rx) = client1.open_port();
1286
1287 let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1288 let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1289
1290 let session_id_1 = client1.sequencer().session_id();
1292 let session_id_2 = client2.sequencer().session_id();
1293 assert_ne!(session_id_1, session_id_2);
1294
1295 actor_handle.send(&client1, "c1_msg1".to_string()).unwrap();
1297 assert_eq!(
1298 rx.recv().await.unwrap().1,
1299 SeqInfo::Session {
1300 session_id: session_id_1,
1301 seq: 1
1302 }
1303 );
1304
1305 actor_handle.send(&client2, "c2_msg1".to_string()).unwrap();
1307 assert_eq!(
1308 rx.recv().await.unwrap().1,
1309 SeqInfo::Session {
1310 session_id: session_id_2,
1311 seq: 1
1312 }
1313 );
1314
1315 actor_ref
1317 .port()
1318 .send(&client1, "c1_msg2".to_string())
1319 .unwrap();
1320 assert_eq!(
1321 rx.recv().await.unwrap().1,
1322 SeqInfo::Session {
1323 session_id: session_id_1,
1324 seq: 2
1325 }
1326 );
1327
1328 actor_ref
1330 .port()
1331 .send(&client2, "c2_msg2".to_string())
1332 .unwrap();
1333 assert_eq!(
1334 rx.recv().await.unwrap().1,
1335 SeqInfo::Session {
1336 session_id: session_id_2,
1337 seq: 2
1338 }
1339 );
1340
1341 actor_handle.send(&client1, "c1_msg3".to_string()).unwrap();
1343 assert_eq!(
1344 rx.recv().await.unwrap().1,
1345 SeqInfo::Session {
1346 session_id: session_id_1,
1347 seq: 3
1348 }
1349 );
1350
1351 actor_ref
1352 .port()
1353 .send(&client2, "c2_msg3".to_string())
1354 .unwrap();
1355 assert_eq!(
1356 rx.recv().await.unwrap().1,
1357 SeqInfo::Session {
1358 session_id: session_id_2,
1359 seq: 3
1360 }
1361 );
1362
1363 actor_handle.drain_and_stop("test cleanup").unwrap();
1364 actor_handle.await;
1365 }
1366
1367 #[async_timed_test(timeout_secs = 30)]
1388 async fn test_sequencing_actor_handle_callback() {
1389 let config = hyperactor_config::global::lock();
1390 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1391
1392 let proc = Proc::local();
1393 let (client, _) = proc.instance("client").unwrap();
1394 let (tx, mut rx) = client.open_port();
1395
1396 let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1397 let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1398
1399 let (callback_tx, mut callback_rx) = client.open_port();
1400 actor_ref
1402 .send(&client, Callback(callback_tx.bind()))
1403 .unwrap();
1404 let msg_port_ref = callback_rx.recv().await.unwrap();
1405 msg_port_ref.send(&client, "finally".to_string()).unwrap();
1408
1409 let session_id = client.sequencer().session_id();
1410 assert_eq!(
1412 rx.recv().await.unwrap(),
1413 (
1414 "finally".to_string(),
1415 SeqInfo::Session { session_id, seq: 1 }
1416 )
1417 );
1418 }
1419
1420 #[derive(Clone, Debug)]
1423 struct DelayedMailboxSender {
1424 relay_tx: mpsc::UnboundedSender<MessageEnvelope>,
1425 }
1426
1427 impl DelayedMailboxSender {
1428 fn new(
1430 dest_proc: Proc,
1433 relay_orders: Vec<usize>,
1437 ) -> Self {
1438 let (relay_tx, mut relay_rx) = mpsc::unbounded_channel::<MessageEnvelope>();
1439
1440 tokio::spawn(async move {
1441 let mut buffer = Vec::new();
1442
1443 for _ in 0..relay_orders.len() {
1444 let envelope = relay_rx.recv().await.unwrap();
1445 buffer.push(envelope);
1446 }
1447
1448 for m in buffer.clone() {
1449 let seq = match m.headers().get(SEQ_INFO).expect("seq should be set") {
1450 SeqInfo::Session { seq, .. } => seq as usize,
1451 SeqInfo::Direct => panic!("expected Session variant"),
1452 };
1453 let order = relay_orders[seq - 1];
1455 buffer[order] = m;
1456 }
1457
1458 let dest_proc_clone = dest_proc.clone();
1459 for msg in buffer {
1460 dest_proc_clone.post(msg, monitored_return_handle());
1461 }
1462 });
1463
1464 Self { relay_tx }
1465 }
1466 }
1467
1468 #[async_trait]
1469 impl MailboxSender for DelayedMailboxSender {
1470 fn post_unchecked(
1471 &self,
1472 envelope: MessageEnvelope,
1473 _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1474 ) {
1475 self.relay_tx.send(envelope).unwrap();
1476 }
1477 }
1478
1479 async fn assert_out_of_order_delivery(expected: Vec<(String, u64)>, relay_orders: Vec<usize>) {
1480 let local_proc: Proc = Proc::local();
1481 let (client, _) = local_proc.instance("local").unwrap();
1482 let (tx, mut rx) = client.open_port();
1483
1484 let handle = local_proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1485 let actor_ref: reference::ActorRef<GetSeqActor> = handle.bind();
1486
1487 let remote_proc = Proc::configured(
1488 test_proc_id("remote_0"),
1489 DelayedMailboxSender::new(local_proc.clone(), relay_orders).boxed(),
1490 );
1491 let (remote_client, _) = remote_proc.instance("remote").unwrap();
1492 let mut messages = expected.clone();
1494 messages.sort_by_key(|v| v.1);
1495 for (message, _seq) in messages {
1496 actor_ref.send(&remote_client, message).unwrap();
1497 }
1498 let session_id = remote_client.sequencer().session_id();
1499 for expect in expected {
1500 let expected = (
1501 expect.0,
1502 SeqInfo::Session {
1503 session_id,
1504 seq: expect.1,
1505 },
1506 );
1507 assert_eq!(rx.recv().await.unwrap(), expected);
1508 }
1509
1510 handle.drain_and_stop("test cleanup").unwrap();
1511 handle.await;
1512 }
1513
1514 #[async_timed_test(timeout_secs = 30)]
1519 async fn test_sequencing_actor_ref_known_delivery_order() {
1520 let config = hyperactor_config::global::lock();
1521
1522 let relay_orders = vec![2, 0, 1];
1524
1525 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, false);
1528 assert_out_of_order_delivery(
1529 vec![
1530 ("second".to_string(), 2),
1531 ("third".to_string(), 3),
1532 ("first".to_string(), 1),
1533 ],
1534 relay_orders.clone(),
1535 )
1536 .await;
1537
1538 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1541 assert_out_of_order_delivery(
1542 vec![
1543 ("first".to_string(), 1),
1544 ("second".to_string(), 2),
1545 ("third".to_string(), 3),
1546 ],
1547 relay_orders.clone(),
1548 )
1549 .await;
1550 }
1551
1552 #[async_timed_test(timeout_secs = 30)]
1557 async fn test_sequencing_actor_ref_random_delivery_order() {
1558 let config = hyperactor_config::global::lock();
1559
1560 let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1563 let expected = (0..10000)
1564 .map(|i| (format!("msg{i}"), i + 1))
1565 .collect::<Vec<_>>();
1566
1567 let mut relay_orders: Vec<usize> = (0..10000).collect();
1568 relay_orders.shuffle(&mut rand::rng());
1569 assert_out_of_order_delivery(expected, relay_orders).await;
1570 }
1571
1572 #[tokio::test]
1590 async fn test_introspect_query_default_payload() {
1591 let proc = Proc::local();
1592 let (client, _) = proc.instance("client").unwrap();
1593 let (tx, _rx) = client.open_port::<u64>();
1594 let actor = EchoActor(tx.bind());
1595 let handle = proc.spawn::<EchoActor>("echo_introspect", actor).unwrap();
1596
1597 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1598 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1599 .send(
1600 &client,
1601 IntrospectMessage::Query {
1602 view: IntrospectView::Actor,
1603 reply: reply_port.bind(),
1604 },
1605 )
1606 .unwrap();
1607 let payload = reply_rx.recv().await.unwrap();
1608
1609 assert_eq!(
1610 payload.identity,
1611 crate::introspect::IntrospectRef::Actor(handle.actor_id().clone())
1612 );
1613 assert_valid_attrs(&payload);
1614 assert_has_attr(&payload, "status");
1615 assert_has_attr(&payload, "actor_type");
1616 assert_has_attr(&payload, "created_at");
1617 assert!(payload.children.is_empty());
1618 assert!(payload.parent.is_none());
1619
1620 handle.drain_and_stop("test").unwrap();
1621 handle.await;
1622 }
1623
1624 fn attrs_get(attrs_json: &str, short_name: &str) -> Option<serde_json::Value> {
1626 use hyperactor_config::INTROSPECT;
1627 use hyperactor_config::attrs::AttrKeyInfo;
1628 let fq_name = inventory::iter::<AttrKeyInfo>()
1629 .find(|info| {
1630 info.meta
1631 .get(INTROSPECT)
1632 .is_some_and(|ia| ia.name == short_name)
1633 })
1634 .map(|info| info.name)?;
1635 let obj: serde_json::Value = serde_json::from_str(attrs_json).ok()?;
1636 obj.get(fq_name).cloned()
1637 }
1638
1639 fn assert_valid_attrs(result: &IntrospectResult) {
1641 let parsed: serde_json::Value =
1642 serde_json::from_str(&result.attrs).expect("IA-1: attrs must be valid JSON");
1643 assert!(parsed.is_object(), "IA-1: attrs must be a JSON object");
1644 }
1645
1646 fn assert_status(result: &IntrospectResult, expected: &str) {
1648 let status = attrs_get(&result.attrs, "status")
1649 .and_then(|v| v.as_str().map(String::from))
1650 .expect("attrs must contain status");
1651 assert_eq!(status, expected, "unexpected actor status");
1652 }
1653
1654 fn assert_handler(result: &IntrospectResult, expected: Option<&str>) {
1656 let handler =
1657 attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from));
1658 assert_eq!(handler.as_deref(), expected);
1659 }
1660
1661 fn assert_error_code(result: &IntrospectResult, expected: &str) {
1663 let code = attrs_get(&result.attrs, "error_code")
1664 .and_then(|v| v.as_str().map(String::from))
1665 .expect("attrs must contain error_code");
1666 assert_eq!(code, expected);
1667 }
1668
1669 fn assert_handler_not_contains(result: &IntrospectResult, forbidden: &str) {
1671 if let Some(handler) =
1672 attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from))
1673 {
1674 assert!(
1675 !handler.contains(forbidden),
1676 "handler should not contain '{}'; got: {}",
1677 forbidden,
1678 handler
1679 );
1680 }
1681 }
1682
1683 fn assert_has_attr(result: &IntrospectResult, short_name: &str) {
1685 assert!(
1686 attrs_get(&result.attrs, short_name).is_some(),
1687 "attrs must contain '{}'",
1688 short_name
1689 );
1690 }
1691
1692 fn assert_status_contains(result: &IntrospectResult, substring: &str) {
1695 let status = attrs_get(&result.attrs, "status")
1696 .and_then(|v| v.as_str().map(String::from))
1697 .expect("attrs must contain status");
1698 assert!(
1699 status.contains(substring),
1700 "status should contain '{}'; got: {}",
1701 substring,
1702 status
1703 );
1704 }
1705
1706 fn assert_no_status_reason(result: &IntrospectResult) {
1708 assert!(
1709 attrs_get(&result.attrs, "status_reason").is_none(),
1710 "IA-3: must not have status_reason"
1711 );
1712 }
1713
1714 fn assert_has_handler(result: &IntrospectResult) {
1716 assert!(
1717 attrs_get(&result.attrs, "last_handler").is_some(),
1718 "must have a handler"
1719 );
1720 }
1721
1722 fn assert_no_failure_attrs(result: &IntrospectResult) {
1724 assert!(
1725 attrs_get(&result.attrs, "failure_error_message").is_none(),
1726 "IA-4: must not have failure attrs"
1727 );
1728 }
1729
1730 #[tokio::test]
1735 async fn test_ia1_ia4_running_actor_attrs() {
1736 let proc = Proc::local();
1737 let (client, _) = proc.instance("client").unwrap();
1738 let (tx, _rx) = client.open_port::<u64>();
1739 let actor = EchoActor(tx.bind());
1740 let handle = proc.spawn::<EchoActor>("ia_test", actor).unwrap();
1741
1742 let payload = crate::introspect::live_actor_payload(handle.cell());
1743
1744 assert_valid_attrs(&payload);
1746
1747 assert_has_attr(&payload, "status");
1749 assert_no_status_reason(&payload);
1750
1751 assert_no_failure_attrs(&payload);
1753
1754 handle.drain_and_stop("test").unwrap();
1755 handle.await;
1756 }
1757
1758 #[tokio::test]
1764 async fn test_introspect_query_child_not_found() {
1765 let proc = Proc::local();
1766 let (client, _) = proc.instance("client").unwrap();
1767 let (tx, _rx) = client.open_port::<u64>();
1768 let actor = EchoActor(tx.bind());
1769 let handle = proc.spawn::<EchoActor>("echo_qc", actor).unwrap();
1770
1771 let child_ref =
1772 reference::Reference::Actor(test_proc_id("nonexistent").actor_id("child", 0));
1773 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1774 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1775 .send(
1776 &client,
1777 IntrospectMessage::QueryChild {
1778 child_ref,
1779 reply: reply_port.bind(),
1780 },
1781 )
1782 .unwrap();
1783 let payload = reply_rx.recv().await.unwrap();
1784
1785 assert_eq!(
1786 payload.identity,
1787 crate::introspect::IntrospectRef::Actor(
1788 test_proc_id("nonexistent").actor_id("child", 0)
1789 )
1790 );
1791 assert_error_code(&payload, "not_found");
1792
1793 handle.drain_and_stop("test").unwrap();
1794 handle.await;
1795 }
1796
1797 #[tokio::test]
1803 async fn test_introspect_override() {
1804 #[derive(Debug, Default)]
1805 #[hyperactor::export(handlers = [])]
1806 struct CustomIntrospectActor;
1807
1808 #[async_trait]
1809 impl Actor for CustomIntrospectActor {}
1810
1811 let proc = Proc::local();
1812 let (client, _) = proc.instance("client").unwrap();
1813 let handle = proc
1814 .spawn("custom_introspect", CustomIntrospectActor)
1815 .unwrap();
1816
1817 handle
1818 .status()
1819 .wait_for(|s| matches!(s, ActorStatus::Idle))
1820 .await
1821 .unwrap();
1822
1823 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1824 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1825 .send(
1826 &client,
1827 IntrospectMessage::Query {
1828 view: IntrospectView::Actor,
1829 reply: reply_port.bind(),
1830 },
1831 )
1832 .unwrap();
1833 let payload = reply_rx.recv().await.unwrap();
1834
1835 assert_has_attr(&payload, "status");
1838
1839 handle.drain_and_stop("test").unwrap();
1840 handle.await;
1841 }
1842
1843 #[tokio::test]
1847 async fn test_introspect_query_supervision_child() {
1848 let proc = Proc::local();
1849 let (client, _) = proc.instance("client").unwrap();
1850
1851 let (tx_parent, _rx_parent) = client.open_port::<u64>();
1853 let parent_handle = proc
1854 .spawn::<EchoActor>("parent", EchoActor(tx_parent.bind()))
1855 .unwrap();
1856
1857 let (tx_child, _rx_child) = client.open_port::<u64>();
1859 let child_handle = proc
1860 .spawn_child::<EchoActor>(parent_handle.cell().clone(), EchoActor(tx_child.bind()))
1861 .unwrap();
1862
1863 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1865 reference::PortRef::<IntrospectMessage>::attest_message_port(child_handle.actor_id())
1866 .send(
1867 &client,
1868 IntrospectMessage::Query {
1869 view: IntrospectView::Actor,
1870 reply: reply_port.bind(),
1871 },
1872 )
1873 .unwrap();
1874 let child_payload = reply_rx.recv().await.unwrap();
1875
1876 assert_eq!(
1877 child_payload.identity,
1878 crate::introspect::IntrospectRef::Actor(child_handle.actor_id().clone()),
1879 );
1880 assert!(
1882 attrs_get(&child_payload.attrs, "status").is_some(),
1883 "child should have actor attrs"
1884 );
1885 assert_eq!(
1886 child_payload.parent,
1887 Some(crate::introspect::IntrospectRef::Actor(
1888 parent_handle.actor_id().clone()
1889 )),
1890 );
1891
1892 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1894 reference::PortRef::<IntrospectMessage>::attest_message_port(parent_handle.actor_id())
1895 .send(
1896 &client,
1897 IntrospectMessage::Query {
1898 view: IntrospectView::Actor,
1899 reply: reply_port.bind(),
1900 },
1901 )
1902 .unwrap();
1903 let parent_payload = reply_rx.recv().await.unwrap();
1904
1905 assert!(parent_payload.parent.is_none());
1906 assert!(
1907 parent_payload
1908 .children
1909 .contains(&crate::introspect::IntrospectRef::Actor(
1910 child_handle.actor_id().clone()
1911 )),
1912 );
1913
1914 child_handle.drain_and_stop("test").unwrap();
1915 child_handle.await;
1916 parent_handle.drain_and_stop("test").unwrap();
1917 parent_handle.await;
1918 }
1919
1920 #[tokio::test]
1925 async fn test_introspect_fresh_actor_status() {
1926 let proc = Proc::local();
1927 let (client, _) = proc.instance("client").unwrap();
1928 let (tx, _rx) = client.open_port::<u64>();
1929 let actor = EchoActor(tx.bind());
1930 let handle = proc.spawn::<EchoActor>("echo_fresh", actor).unwrap();
1931
1932 handle
1934 .status()
1935 .wait_for(|s| matches!(s, ActorStatus::Idle))
1936 .await
1937 .unwrap();
1938
1939 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1940 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1941 .send(
1942 &client,
1943 IntrospectMessage::Query {
1944 view: IntrospectView::Actor,
1945 reply: reply_port.bind(),
1946 },
1947 )
1948 .unwrap();
1949 let payload = reply_rx.recv().await.unwrap();
1950
1951 assert_status(&payload, "idle");
1952 assert_handler(&payload, None);
1953
1954 handle.drain_and_stop("test").unwrap();
1955 handle.await;
1956 }
1957
1958 #[tokio::test]
1963 async fn test_introspect_after_user_message() {
1964 let proc = Proc::local();
1965 let (client, _) = proc.instance("client").unwrap();
1966 let (tx, mut rx) = client.open_port::<u64>();
1967 let actor = EchoActor(tx.bind());
1968 let handle = proc.spawn::<EchoActor>("echo_after_msg", actor).unwrap();
1969
1970 handle.send(&client, 42u64).unwrap();
1972 let _ = rx.recv().await.unwrap();
1973
1974 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1975 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1976 .send(
1977 &client,
1978 IntrospectMessage::Query {
1979 view: IntrospectView::Actor,
1980 reply: reply_port.bind(),
1981 },
1982 )
1983 .unwrap();
1984 let payload = reply_rx.recv().await.unwrap();
1985
1986 assert_status(&payload, "idle");
1987 assert_has_handler(&payload);
1988 assert_handler_not_contains(&payload, "IntrospectMessage");
1989
1990 handle.drain_and_stop("test").unwrap();
1991 handle.await;
1992 }
1993
1994 #[tokio::test]
1999 async fn test_introspect_consecutive_queries() {
2000 let proc = Proc::local();
2001 let (client, _) = proc.instance("client").unwrap();
2002 let (tx, _rx) = client.open_port::<u64>();
2003 let actor = EchoActor(tx.bind());
2004 let handle = proc.spawn::<EchoActor>("echo_consec", actor).unwrap();
2005
2006 handle
2007 .status()
2008 .wait_for(|s| matches!(s, ActorStatus::Idle))
2009 .await
2010 .unwrap();
2011
2012 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2014 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2015 .send(
2016 &client,
2017 IntrospectMessage::Query {
2018 view: IntrospectView::Actor,
2019 reply: reply_port.bind(),
2020 },
2021 )
2022 .unwrap();
2023 let payload1 = reply_rx.recv().await.unwrap();
2024
2025 let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2027 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2028 .send(
2029 &client,
2030 IntrospectMessage::Query {
2031 view: IntrospectView::Actor,
2032 reply: reply_port2.bind(),
2033 },
2034 )
2035 .unwrap();
2036 let payload2 = reply_rx2.recv().await.unwrap();
2037
2038 assert_handler(&payload1, None);
2040 assert_handler(&payload2, None);
2041
2042 handle.drain_and_stop("test").unwrap();
2043 handle.await;
2044 }
2045
2046 #[tokio::test]
2054 async fn test_publish_attrs_round_trip() {
2055 use hyperactor_config::Attrs;
2056 use hyperactor_config::declare_attrs;
2057
2058 declare_attrs! {
2059 attr TEST_KEY_A: String;
2060 attr TEST_KEY_B: u64;
2061 }
2062
2063 let proc = Proc::local();
2064 let (client, _) = proc.instance("client").unwrap();
2065 let (tx, _rx) = client.open_port::<u64>();
2066 let actor = EchoActor(tx.bind());
2067 let handle = proc.spawn::<EchoActor>("echo_attrs", actor).unwrap();
2068
2069 assert!(handle.cell().published_attrs().is_none());
2071
2072 let mut attrs = Attrs::new();
2074 attrs.set(TEST_KEY_A, "hello".to_string());
2075 handle.cell().set_published_attrs(attrs);
2076 let published = handle.cell().published_attrs().unwrap();
2077 assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2078
2079 handle.cell().merge_published_attr(TEST_KEY_B, 42u64);
2081 let published = handle.cell().published_attrs().unwrap();
2082 assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2083 assert_eq!(published.get(TEST_KEY_B), Some(&42u64));
2084
2085 handle
2087 .cell()
2088 .merge_published_attr(TEST_KEY_A, "world".to_string());
2089 let published = handle.cell().published_attrs().unwrap();
2090 assert_eq!(published.get(TEST_KEY_A), Some(&"world".to_string()));
2091
2092 handle.drain_and_stop("test").unwrap();
2093 handle.await;
2094 }
2095
2096 #[tokio::test]
2099 async fn test_query_child_handler_round_trip() {
2100 let proc = Proc::local();
2101 let (client, _) = proc.instance("client").unwrap();
2102 let (tx, _rx) = client.open_port::<u64>();
2103 let actor = EchoActor(tx.bind());
2104 let handle = proc.spawn::<EchoActor>("echo_qch", actor).unwrap();
2105
2106 let test_ref = reference::Reference::Actor(test_proc_id("test").actor_id("child", 0));
2108 assert!(handle.cell().query_child(&test_ref).is_none());
2109
2110 handle.cell().set_query_child_handler(|child_ref| {
2112 use crate::introspect::IntrospectRef;
2113 let identity = match &child_ref {
2114 reference::Reference::Proc(id) => IntrospectRef::Proc(id.clone()),
2115 reference::Reference::Actor(id) => IntrospectRef::Actor(id.clone()),
2116 reference::Reference::Port(id) => IntrospectRef::Actor(id.actor_id().clone()),
2117 };
2118 IntrospectResult {
2119 identity,
2120 attrs: serde_json::json!({
2121 "proc_name": "test_proc",
2122 "num_actors": 42,
2123 })
2124 .to_string(),
2125 children: Vec::new(),
2126 parent: None,
2127 as_of: std::time::SystemTime::now(),
2128 }
2129 });
2130
2131 let payload = handle
2133 .cell()
2134 .query_child(&test_ref)
2135 .expect("callback should produce a payload");
2136 assert_eq!(
2137 payload.identity,
2138 crate::introspect::IntrospectRef::Actor(test_proc_id("test").actor_id("child", 0))
2139 );
2140 let attrs: serde_json::Value =
2141 serde_json::from_str(&payload.attrs).expect("attrs must be valid JSON");
2142 assert_eq!(
2143 attrs.get("proc_name").and_then(|v| v.as_str()),
2144 Some("test_proc")
2145 );
2146 assert_eq!(attrs.get("num_actors").and_then(|v| v.as_u64()), Some(42));
2147
2148 handle.drain_and_stop("test").unwrap();
2149 handle.await;
2150 }
2151
2152 #[tokio::test]
2158 async fn test_introspect_wedged() {
2159 #[derive(Debug, Default)]
2160 #[hyperactor::export(handlers = [u64])]
2161 struct WedgedActor;
2162
2163 #[async_trait]
2164 impl Actor for WedgedActor {}
2165
2166 #[async_trait]
2167 impl Handler<u64> for WedgedActor {
2168 async fn handle(
2169 &mut self,
2170 _cx: &Context<Self>,
2171 _message: u64,
2172 ) -> Result<(), anyhow::Error> {
2173 std::future::pending::<()>().await;
2175 Ok(())
2176 }
2177 }
2178
2179 let proc = Proc::local();
2180 let (client, _) = proc.instance("client").unwrap();
2181 let handle = proc.spawn("wedged", WedgedActor).unwrap();
2182
2183 handle
2185 .status()
2186 .wait_for(|s| matches!(s, ActorStatus::Idle))
2187 .await
2188 .unwrap();
2189
2190 handle.send(&client, 1u64).unwrap();
2192
2193 tokio::time::sleep(Duration::from_millis(50)).await;
2195
2196 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2198 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2199 .send(
2200 &client,
2201 IntrospectMessage::Query {
2202 view: IntrospectView::Actor,
2203 reply: reply_port.bind(),
2204 },
2205 )
2206 .unwrap();
2207
2208 let payload = tokio::time::timeout(Duration::from_secs(5), reply_rx.recv())
2210 .await
2211 .expect("introspect should not hang on a wedged actor")
2212 .unwrap();
2213
2214 assert_status_contains(&payload, "processing");
2215 assert_handler_not_contains(&payload, "IntrospectMessage");
2216 }
2217
2218 #[tokio::test]
2223 async fn test_introspect_no_perturbation() {
2224 let proc = Proc::local();
2225 let (client, _) = proc.instance("client").unwrap();
2226 let (tx, mut rx) = client.open_port::<u64>();
2227 let actor = EchoActor(tx.bind());
2228 let handle = proc.spawn::<EchoActor>("echo_no_perturb", actor).unwrap();
2229
2230 handle
2232 .status()
2233 .wait_for(|s| matches!(s, ActorStatus::Idle))
2234 .await
2235 .unwrap();
2236
2237 handle.send(&client, 42u64).unwrap();
2239 let _ = rx.recv().await.unwrap();
2240
2241 let (reply_port1, reply_rx1) = client.open_once_port::<IntrospectResult>();
2243 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2244 .send(
2245 &client,
2246 IntrospectMessage::Query {
2247 view: IntrospectView::Actor,
2248 reply: reply_port1.bind(),
2249 },
2250 )
2251 .unwrap();
2252 let payload1 = reply_rx1.recv().await.unwrap();
2253
2254 let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2256 reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2257 .send(
2258 &client,
2259 IntrospectMessage::Query {
2260 view: IntrospectView::Actor,
2261 reply: reply_port2.bind(),
2262 },
2263 )
2264 .unwrap();
2265 let payload2 = reply_rx2.recv().await.unwrap();
2266
2267 assert_handler_not_contains(&payload1, "IntrospectMessage");
2269 assert_handler_not_contains(&payload2, "IntrospectMessage");
2270 let attrs1: serde_json::Value = serde_json::from_str(&payload1.attrs).unwrap();
2273 let attrs2: serde_json::Value = serde_json::from_str(&payload2.attrs).unwrap();
2274 assert_eq!(attrs1, attrs2, "consecutive queries should be identical");
2275
2276 handle.drain_and_stop("test").unwrap();
2277 handle.await;
2278 }
2279
2280 #[tokio::test]
2287 async fn test_introspectable_instance_responds_to_query() {
2288 let proc = Proc::local();
2289 let (bridge, handle) = proc.introspectable_instance("bridge").unwrap();
2290 let actor_id = handle.actor_id().clone();
2291
2292 let (reply_port, reply_rx) = bridge.open_once_port::<IntrospectResult>();
2293 reference::PortRef::<IntrospectMessage>::attest_message_port(&actor_id)
2294 .send(
2295 &bridge,
2296 IntrospectMessage::Query {
2297 view: IntrospectView::Actor,
2298 reply: reply_port.bind(),
2299 },
2300 )
2301 .unwrap();
2302 let payload = reply_rx.recv().await.unwrap();
2303
2304 assert_eq!(
2307 payload.identity,
2308 crate::introspect::IntrospectRef::Actor(actor_id.clone())
2309 );
2310 assert_status(&payload, "client");
2311 let actor_type = attrs_get(&payload.attrs, "actor_type")
2312 .and_then(|v| v.as_str().map(String::from))
2313 .expect("must have actor_type");
2314 assert_eq!(actor_type, "()", "CI-1: actor_type must be \"()\"");
2315 }
2316
2317 #[tokio::test]
2325 async fn test_instance_does_not_respond_to_query() {
2326 let proc = Proc::local();
2327 let (client, _client_handle) = proc.instance("client").unwrap();
2328 let (_mailbox, mailbox_handle) = proc.instance("mailbox").unwrap();
2329 let mailbox_id = mailbox_handle.actor_id().clone();
2330
2331 let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2332 reference::PortRef::<IntrospectMessage>::attest_message_port(&mailbox_id)
2333 .send(
2334 &client,
2335 IntrospectMessage::Query {
2336 view: IntrospectView::Actor,
2337 reply: reply_port.bind(),
2338 },
2339 )
2340 .unwrap();
2341
2342 let result = tokio::time::timeout(Duration::from_millis(100), reply_rx.recv()).await;
2345 assert!(
2346 result.is_err(),
2347 "instance() must not respond to IntrospectMessage (introspect receiver dropped)"
2348 );
2349 }
2350
2351 #[tokio::test]
2356 async fn test_introspectable_instance_snapshot_on_drop() {
2357 let proc = Proc::local();
2358 let (instance, handle) = proc.introspectable_instance("bridge").unwrap();
2359 let actor_id = handle.actor_id().clone();
2360
2361 assert!(
2362 proc.all_actor_ids().contains(&actor_id),
2363 "should appear in all_actor_ids while live"
2364 );
2365
2366 drop(instance);
2369
2370 let deadline = std::time::Instant::now() + Duration::from_secs(5);
2371 loop {
2372 if proc.terminated_snapshot(&actor_id).is_some() {
2373 break;
2374 }
2375 assert!(
2376 std::time::Instant::now() < deadline,
2377 "timed out waiting for terminated snapshot"
2378 );
2379 tokio::task::yield_now().await;
2380 }
2381
2382 let snapshot = proc.terminated_snapshot(&actor_id).unwrap();
2383 let actor_status = attrs_get(&snapshot.attrs, "status")
2384 .and_then(|v| v.as_str().map(String::from))
2385 .expect("snapshot attrs must contain status");
2386 assert!(
2387 actor_status.starts_with("stopped"),
2388 "CI-2: snapshot actor_status should be stopped, got: {}",
2389 actor_status
2390 );
2391 }
2392}