1#![allow(dead_code)] use std::any::TypeId;
14use std::fmt;
15use std::fmt::Debug;
16use std::future::Future;
17use std::future::IntoFuture;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::SystemTime;
21
22use async_trait::async_trait;
23use enum_as_inner::EnumAsInner;
24use futures::FutureExt;
25use futures::future::BoxFuture;
26use serde::Deserialize;
27use serde::Serialize;
28use tokio::sync::watch;
29use tokio::task::JoinHandle;
30use typeuri::Named;
31
32use crate as hyperactor; use crate::ActorRef;
34use crate::Data;
35use crate::Message;
36use crate::RemoteMessage;
37use crate::checkpoint::CheckpointError;
38use crate::checkpoint::Checkpointable;
39use crate::clock::Clock;
40use crate::clock::RealClock;
41use crate::context;
42use crate::mailbox::MailboxError;
43use crate::mailbox::MailboxSenderError;
44use crate::mailbox::MessageEnvelope;
45use crate::mailbox::PortHandle;
46use crate::mailbox::Undeliverable;
47use crate::mailbox::UndeliverableMessageError;
48use crate::mailbox::log::MessageLogError;
49use crate::message::Castable;
50use crate::message::IndexedErasedUnbound;
51use crate::proc::Context;
52use crate::proc::Instance;
53use crate::proc::InstanceCell;
54use crate::proc::Ports;
55use crate::proc::Proc;
56use crate::reference::ActorId;
57use crate::reference::Index;
58use crate::supervision::ActorSupervisionEvent;
59
60pub mod remote;
61
62#[async_trait]
70pub trait Actor: Sized + Send + 'static {
71 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
76 Ok(())
78 }
79
80 async fn cleanup(
91 &mut self,
92 _this: &Instance<Self>,
93 _err: Option<&ActorError>,
94 ) -> Result<(), anyhow::Error> {
95 Ok(())
97 }
98
99 fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
102 cx.instance().spawn(self)
103 }
104
105 fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
112 Proc::local().spawn("anon", self)
113 }
114
115 #[hyperactor::instrument_infallible]
119 fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
120 where
121 F: Future + Send + 'static,
122 F::Output: Send + 'static,
123 {
124 tokio::spawn(future)
125 }
126
127 async fn handle_supervision_event(
129 &mut self,
130 _this: &Instance<Self>,
131 _event: &ActorSupervisionEvent,
132 ) -> Result<bool, anyhow::Error> {
133 Ok(false)
135 }
136
137 async fn handle_undeliverable_message(
139 &mut self,
140 cx: &Instance<Self>,
141 envelope: Undeliverable<MessageEnvelope>,
142 ) -> Result<(), anyhow::Error> {
143 handle_undeliverable_message(cx, envelope)
144 }
145
146 fn display_name(&self) -> Option<String> {
150 None
151 }
152}
153
154pub fn handle_undeliverable_message<A: Actor>(
158 cx: &Instance<A>,
159 Undeliverable(envelope): Undeliverable<MessageEnvelope>,
160) -> Result<(), anyhow::Error> {
161 assert_eq!(envelope.sender(), cx.self_id());
162
163 anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
164}
165
166#[async_trait]
169impl Actor for () {}
170
171impl Referable for () {}
172
173impl Binds<()> for () {
174 fn bind(_ports: &Ports<Self>) {
175 }
177}
178
179#[async_trait]
181pub trait Handler<M>: Actor {
182 async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
184}
185
186#[async_trait]
189impl<A: Actor> Handler<Signal> for A {
190 async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
191 unimplemented!("signal handler should not be called directly")
192 }
193}
194
195#[async_trait]
198impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
199 async fn handle(
200 &mut self,
201 cx: &Context<Self>,
202 message: Undeliverable<MessageEnvelope>,
203 ) -> Result<(), anyhow::Error> {
204 self.handle_undeliverable_message(cx, message).await
205 }
206}
207
208#[async_trait]
211impl<A, M> Handler<IndexedErasedUnbound<M>> for A
212where
213 A: Handler<M>,
214 M: Castable,
215{
216 async fn handle(
217 &mut self,
218 cx: &Context<Self>,
219 msg: IndexedErasedUnbound<M>,
220 ) -> anyhow::Result<()> {
221 let message = msg.downcast()?.bind()?;
222 Handler::handle(self, cx, message).await
223 }
224}
225
226#[async_trait]
242pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
243 type Params: RemoteMessage;
245
246 async fn new(params: Self::Params) -> anyhow::Result<Self>;
248
249 fn gspawn(
254 proc: &Proc,
255 name: &str,
256 serialized_params: Data,
257 ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>> {
258 let proc = proc.clone();
259 let name = name.to_string();
260 Box::pin(async move {
261 let params = bincode::deserialize(&serialized_params)?;
262 let actor = Self::new(params).await?;
263 let handle = proc.spawn(&name, actor)?;
264 Ok(handle.bind::<Self>().actor_id)
276 })
277 }
278
279 fn get_type_id() -> TypeId {
281 TypeId::of::<Self>()
282 }
283}
284
285#[async_trait]
288impl<A: Actor + Referable + Binds<Self> + Default> RemoteSpawn for A {
289 type Params = ();
290
291 async fn new(_params: Self::Params) -> anyhow::Result<Self> {
292 Ok(Default::default())
293 }
294}
295
296#[async_trait]
297impl<T> Checkpointable for T
298where
299 T: RemoteMessage + Clone,
300{
301 type State = T;
302 async fn save(&self) -> Result<Self::State, CheckpointError> {
303 Ok(self.clone())
304 }
305
306 async fn load(state: Self::State) -> Result<Self, CheckpointError> {
307 Ok(state)
308 }
309}
310
311#[derive(Debug)]
314pub struct ActorError {
315 pub actor_id: Box<ActorId>,
317 pub kind: Box<ActorErrorKind>,
319}
320
321#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
323pub enum ActorErrorKind {
324 #[error("{0}")]
326 Generic(String),
327
328 #[error("{0} while handling {1}")]
330 ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
331 #[error("{0}")]
333 UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
334}
335
336impl ActorErrorKind {
337 pub fn processing(err: anyhow::Error) -> Self {
340 err.downcast::<ActorErrorKind>()
345 .unwrap_or_else(|err| Self::Generic(err.to_string()))
346 }
347
348 pub fn panic(err: anyhow::Error) -> Self {
350 Self::Generic(format!("panic: {}", err))
351 }
352
353 pub fn init(err: anyhow::Error) -> Self {
355 Self::Generic(format!("initialization error: {}", err))
356 }
357
358 pub fn cleanup(err: anyhow::Error) -> Self {
360 Self::Generic(format!("cleanup error: {}", err))
361 }
362
363 pub fn mailbox(err: MailboxError) -> Self {
365 Self::Generic(err.to_string())
366 }
367
368 pub fn mailbox_sender(err: MailboxSenderError) -> Self {
370 Self::Generic(err.to_string())
371 }
372
373 pub fn checkpoint(err: CheckpointError) -> Self {
375 Self::Generic(format!("checkpoint error: {}", err))
376 }
377
378 pub fn message_log(err: MessageLogError) -> Self {
380 Self::Generic(format!("message log error: {}", err))
381 }
382
383 pub fn indeterminate_state() -> Self {
385 Self::Generic("actor is in an indeterminate state".to_string())
386 }
387}
388
389impl ActorError {
390 pub(crate) fn new(actor_id: &ActorId, kind: ActorErrorKind) -> Self {
392 Self {
393 actor_id: Box::new(actor_id.clone()),
394 kind: Box::new(kind),
395 }
396 }
397}
398
399impl fmt::Display for ActorError {
400 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
401 write!(f, "serving {}: ", self.actor_id)?;
402 fmt::Display::fmt(&self.kind, f)
403 }
404}
405
406impl std::error::Error for ActorError {
407 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
408 self.kind.source()
409 }
410}
411
412impl From<MailboxError> for ActorError {
413 fn from(inner: MailboxError) -> Self {
414 Self {
415 actor_id: Box::new(inner.actor_id().clone()),
416 kind: Box::new(ActorErrorKind::mailbox(inner)),
417 }
418 }
419}
420
421impl From<MailboxSenderError> for ActorError {
422 fn from(inner: MailboxSenderError) -> Self {
423 Self {
424 actor_id: Box::new(inner.location().actor_id().clone()),
425 kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
426 }
427 }
428}
429
430#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
437pub enum Signal {
438 DrainAndStop,
440
441 Stop,
443
444 ChildStopped(Index),
446}
447wirevalue::register_type!(Signal);
448
449impl fmt::Display for Signal {
450 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451 match self {
452 Signal::DrainAndStop => write!(f, "DrainAndStop"),
453 Signal::Stop => write!(f, "Stop"),
454 Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
455 }
456 }
457}
458
459#[derive(
461 Debug,
462 Serialize,
463 Deserialize,
464 PartialEq,
465 Eq,
466 Clone,
467 typeuri::Named,
468 EnumAsInner
469)]
470pub enum ActorStatus {
471 Unknown,
473 Created,
475 Initializing,
477 Client,
480 Idle,
482 Processing(SystemTime, Option<(String, Option<String>)>),
487 Saving(SystemTime),
489 Loading(SystemTime),
491 Stopping,
493 Stopped,
495 Failed(ActorErrorKind),
497}
498
499impl ActorStatus {
500 pub fn is_terminal(&self) -> bool {
502 self.is_stopped() || self.is_failed()
503 }
504
505 pub fn generic_failure(message: impl Into<String>) -> Self {
507 Self::Failed(ActorErrorKind::Generic(message.into()))
508 }
509
510 fn span_string(&self) -> &'static str {
511 self.arm().unwrap_or_default()
512 }
513}
514
515impl fmt::Display for ActorStatus {
516 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
517 match self {
518 Self::Unknown => write!(f, "unknown"),
519 Self::Created => write!(f, "created"),
520 Self::Initializing => write!(f, "initializing"),
521 Self::Client => write!(f, "client"),
522 Self::Idle => write!(f, "idle"),
523 Self::Processing(instant, None) => {
524 write!(
525 f,
526 "processing for {}ms",
527 RealClock
528 .system_time_now()
529 .duration_since(*instant)
530 .unwrap_or_default()
531 .as_millis()
532 )
533 }
534 Self::Processing(instant, Some((handler, None))) => {
535 write!(
536 f,
537 "{}: processing for {}ms",
538 handler,
539 RealClock
540 .system_time_now()
541 .duration_since(*instant)
542 .unwrap_or_default()
543 .as_millis()
544 )
545 }
546 Self::Processing(instant, Some((handler, Some(arm)))) => {
547 write!(
548 f,
549 "{},{}: processing for {}ms",
550 handler,
551 arm,
552 RealClock
553 .system_time_now()
554 .duration_since(*instant)
555 .unwrap_or_default()
556 .as_millis()
557 )
558 }
559 Self::Saving(instant) => {
560 write!(
561 f,
562 "saving for {}ms",
563 RealClock
564 .system_time_now()
565 .duration_since(*instant)
566 .unwrap_or_default()
567 .as_millis()
568 )
569 }
570 Self::Loading(instant) => {
571 write!(
572 f,
573 "loading for {}ms",
574 RealClock
575 .system_time_now()
576 .duration_since(*instant)
577 .unwrap_or_default()
578 .as_millis()
579 )
580 }
581 Self::Stopping => write!(f, "stopping"),
582 Self::Stopped => write!(f, "stopped"),
583 Self::Failed(err) => write!(f, "failed: {}", err),
584 }
585 }
586}
587
588pub struct ActorHandle<A: Actor> {
597 cell: InstanceCell,
598 ports: Arc<Ports<A>>,
599}
600
601impl<A: Actor> ActorHandle<A> {
603 pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
604 Self { cell, ports }
605 }
606
607 pub(crate) fn cell(&self) -> &InstanceCell {
610 &self.cell
611 }
612
613 pub fn actor_id(&self) -> &ActorId {
615 self.cell.actor_id()
616 }
617
618 pub fn drain_and_stop(&self) -> Result<(), ActorError> {
620 tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
621 self.cell.signal(Signal::DrainAndStop)
622 }
623
624 pub fn status(&self) -> watch::Receiver<ActorStatus> {
626 self.cell.status().clone()
627 }
628
629 pub fn send<M: Message>(&self, message: M) -> Result<(), MailboxSenderError>
632 where
633 A: Handler<M>,
634 {
635 self.ports.get().send(message)
636 }
637
638 pub fn port<M: Message>(&self) -> PortHandle<M>
640 where
641 A: Handler<M>,
642 {
643 self.ports.get()
644 }
645
646 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
649 self.cell.bind(self.ports.as_ref())
650 }
651}
652
653impl<A: Actor> IntoFuture for ActorHandle<A> {
657 type Output = ActorStatus;
658 type IntoFuture = BoxFuture<'static, Self::Output>;
659
660 fn into_future(self) -> Self::IntoFuture {
661 let future = async move {
662 let mut status_receiver = self.cell.status().clone();
663 let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
664 match result {
665 Err(_) => ActorStatus::Unknown,
666 Ok(status) => status.clone(),
667 }
668 };
669
670 future.boxed()
671 }
672}
673
674impl<A: Actor> Debug for ActorHandle<A> {
675 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
676 f.debug_struct("ActorHandle").field("cell", &"..").finish()
677 }
678}
679
680impl<A: Actor> Clone for ActorHandle<A> {
681 fn clone(&self) -> Self {
682 Self {
683 cell: self.cell.clone(),
684 ports: self.ports.clone(),
685 }
686 }
687}
688
689pub trait Referable: Named {}
704
705pub trait Binds<A: Actor>: Referable {
708 fn bind(ports: &Ports<A>);
710}
711
712pub trait RemoteHandles<M: RemoteMessage>: Referable {}
715
/// Compile-time assertion that `$behavior` implements `Binds<$ty>`.
///
/// Usage: `assert_behaves!(MyActor as MyBehavior);`. The macro expands to
/// an anonymous constant whose body only type-checks when the bound holds;
/// nothing is executed at runtime.
#[macro_export]
macro_rules! assert_behaves {
    ($ty:ty as $behavior:ty) => {
        const _: fn() = || {
            fn assert_binds<B>()
            where
                B: hyperactor::actor::Binds<$ty>,
            {
            }
            assert_binds::<$behavior>();
        };
    };
}
755
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::time::Duration;

    use tokio::time::timeout;

    use super::*;
    use crate as hyperactor;
    use crate::Actor;
    use crate::OncePortHandle;
    use crate::PortRef;
    use crate::checkpoint::CheckpointError;
    use crate::checkpoint::Checkpointable;
    use crate::test_utils::pingpong::PingPongActor;
    use crate::test_utils::pingpong::PingPongMessage;
    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;

    /// Actor that forwards every `u64` it receives to the wrapped port.
    #[derive(Debug)]
    struct EchoActor(PortRef<u64>);

    #[async_trait]
    impl Actor for EchoActor {}

    #[async_trait]
    impl Handler<u64> for EchoActor {
        async fn handle(&mut self, cx: &Context<Self>, message: u64) -> anyhow::Result<()> {
            let Self(reply_port) = self;
            reply_port.send(cx, message)?;
            Ok(())
        }
    }

    /// A message sent before `drain_and_stop` is still echoed back.
    #[tokio::test]
    async fn test_server_basic() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        let (echo_tx, mut echo_rx) = client.open_port();
        let echo = EchoActor(echo_tx.bind());
        let echo_handle = proc.spawn::<EchoActor>("echo", echo).unwrap();
        echo_handle.send(123u64).unwrap();
        echo_handle.drain_and_stop().unwrap();
        echo_handle.await;

        assert_eq!(echo_rx.drain(), vec![123u64]);
    }

    /// Two ping-pong actors exchange a message with TTL 10 and report
    /// success on the once-port.
    #[tokio::test]
    async fn test_ping_pong() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        let (undeliverable_tx, _) = client.open_port();

        let ping = PingPongActor::new(Some(undeliverable_tx.bind()), None, None);
        let pong = PingPongActor::new(Some(undeliverable_tx.bind()), None, None);
        let ping_handle = proc.spawn::<PingPongActor>("ping", ping).unwrap();
        let pong_handle = proc.spawn::<PingPongActor>("pong", pong).unwrap();

        let (done_port, done_rx) = client.open_once_port();

        let kickoff = PingPongMessage(10, pong_handle.bind(), done_port.bind());
        ping_handle.send(kickoff).unwrap();

        assert!(done_rx.recv().await.unwrap());
    }

    /// When the actors are configured with an error TTL, the exchange never
    /// completes and the receive times out.
    #[tokio::test]
    async fn test_ping_pong_on_handler_error() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        let (undeliverable_tx, _) = client.open_port();

        ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let error_ttl = 66;

        let ping = PingPongActor::new(Some(undeliverable_tx.bind()), Some(error_ttl), None);
        let pong = PingPongActor::new(Some(undeliverable_tx.bind()), Some(error_ttl), None);
        let ping_handle = proc.spawn::<PingPongActor>("ping", ping).unwrap();
        let pong_handle = proc.spawn::<PingPongActor>("pong", pong).unwrap();

        let (done_port, done_rx) = client.open_once_port();

        // Start one above the error TTL.
        let kickoff = PingPongMessage(error_ttl + 1, pong_handle.bind(), done_port.bind());
        ping_handle.send(kickoff).unwrap();

        #[allow(clippy::disallowed_methods)]
        let outcome: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
            timeout(Duration::from_secs(5), done_rx.recv()).await;
        assert!(outcome.is_err());
    }

    /// Actor whose flag is flipped to `true` by `init`.
    #[derive(Debug)]
    struct InitActor(bool);

    #[async_trait]
    impl Actor for InitActor {
        async fn init(&mut self, _this: &Instance<Self>) -> anyhow::Result<()> {
            self.0 = true;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<OncePortHandle<bool>> for InitActor {
        /// Replies with the current value of the flag.
        async fn handle(
            &mut self,
            _cx: &Context<Self>,
            port: OncePortHandle<bool>,
        ) -> anyhow::Result<()> {
            let initialized = self.0;
            port.send(initialized)?;
            Ok(())
        }
    }

    /// The flag set in `init` is visible to the first handled message.
    #[tokio::test]
    async fn test_init() {
        let proc = Proc::local();
        let init_actor = InitActor(false);
        let init_handle = proc.spawn::<InitActor>("init", init_actor).unwrap();
        let client = proc.attach("client").unwrap();

        let (query_port, query_rx) = client.open_once_port();
        init_handle.send(query_port).unwrap();
        assert!(query_rx.recv().await.unwrap());

        init_handle.drain_and_stop().unwrap();
        init_handle.await;
    }

    /// Actor that keeps a running sum and reports it on `port`.
    #[derive(Debug)]
    struct CheckpointActor {
        // Running total of all received values.
        sum: u64,
        // Where the updated total is sent after each message.
        port: PortRef<u64>,
    }

    #[async_trait]
    impl Actor for CheckpointActor {}

    #[async_trait]
    impl Handler<u64> for CheckpointActor {
        async fn handle(&mut self, cx: &Context<Self>, value: u64) -> anyhow::Result<()> {
            self.sum += value;
            self.port.send(cx, self.sum)?;
            Ok(())
        }
    }

    #[async_trait]
    impl Checkpointable for CheckpointActor {
        type State = (u64, PortRef<u64>);

        async fn save(&self) -> Result<Self::State, CheckpointError> {
            let state = (self.sum, self.port.clone());
            Ok(state)
        }

        async fn load(state: Self::State) -> Result<Self, CheckpointError> {
            let (sum, port) = state;
            Ok(CheckpointActor { sum, port })
        }
    }

    /// Shared state written by `MultiActor`: the last `u64` and the last
    /// `String` received.
    type MultiValues = Arc<Mutex<(u64, String)>>;

    /// Fixture bundling a `MultiActor`, its shared values and a client
    /// instance.
    struct MultiValuesTest {
        proc: Proc,
        values: MultiValues,
        handle: ActorHandle<MultiActor>,
        client: Instance<()>,
        _client_handle: ActorHandle<()>,
    }

    impl MultiValuesTest {
        /// Spawns a `MultiActor` named "myactor" and a "client" instance on
        /// a fresh local proc.
        async fn new() -> Self {
            let proc = Proc::local();
            let values: MultiValues = Arc::new(Mutex::new((0, String::new())));
            let actor = MultiActor(values.clone());
            let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
            let (client, client_handle) = proc.instance("client").unwrap();
            Self {
                proc,
                values,
                handle,
                client,
                _client_handle: client_handle,
            }
        }

        /// Sends `message` to the actor through its handle.
        fn send<M>(&self, message: M)
        where
            M: RemoteMessage,
            MultiActor: Handler<M>,
        {
            self.handle.send(message).unwrap()
        }

        /// Round-trips a once-port through the actor and waits for its
        /// `true` reply.
        async fn sync(&self) {
            let (ack_port, ack_rx) = self.client.open_once_port::<bool>();
            self.handle.send(ack_port).unwrap();
            assert!(ack_rx.recv().await.unwrap());
        }

        /// Snapshot of the shared values.
        fn get_values(&self) -> (u64, String) {
            let guard = self.values.lock().unwrap();
            guard.clone()
        }
    }

    /// Actor that records the last `u64` and `String` it received.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [u64, String])]
    struct MultiActor(MultiValues);

    #[async_trait]
    impl Actor for MultiActor {}

    #[async_trait]
    impl Handler<u64> for MultiActor {
        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> anyhow::Result<()> {
            let mut shared = self.0.lock().unwrap();
            shared.0 = message;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<String> for MultiActor {
        async fn handle(&mut self, _cx: &Context<Self>, message: String) -> anyhow::Result<()> {
            let mut shared = self.0.lock().unwrap();
            shared.1 = message;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<OncePortHandle<bool>> for MultiActor {
        /// Acknowledges by sending `true` on the received once-port.
        async fn handle(
            &mut self,
            _cx: &Context<Self>,
            message: OncePortHandle<bool>,
        ) -> anyhow::Result<()> {
            message.send(true).unwrap();
            Ok(())
        }
    }

    /// Messages sent via the handle and via a bound `ActorRef<MultiActor>`
    /// both reach the right handler.
    #[tokio::test]
    async fn test_multi_handler_refs() {
        let fixture = MultiValuesTest::new().await;

        fixture.send(123u64);
        fixture.send("foo".to_string());
        fixture.sync().await;
        assert_eq!(fixture.get_values(), (123u64, "foo".to_string()));

        let actor_ref: ActorRef<MultiActor> = fixture.handle.bind();

        actor_ref.port().send(&fixture.client, 321u64).unwrap();
        fixture.sync().await;
        assert_eq!(fixture.get_values(), (321u64, "foo".to_string()));

        actor_ref
            .port()
            .send(&fixture.client, "bar".to_string())
            .unwrap();
        fixture.sync().await;
        assert_eq!(fixture.get_values(), (321u64, "bar".to_string()));
    }

    /// A handle can be bound to a separately declared behavior type that
    /// lists the same message types.
    #[tokio::test]
    async fn test_ref_behavior() {
        let fixture = MultiValuesTest::new().await;

        fixture.send(123u64);
        fixture.send("foo".to_string());

        hyperactor::behavior!(MyActorBehavior, u64, String);

        let behavior_ref: ActorRef<MyActorBehavior> = fixture.handle.bind();
        behavior_ref
            .port()
            .send(&fixture.client, "biz".to_string())
            .unwrap();
        behavior_ref.port().send(&fixture.client, 999u64).unwrap();

        fixture.sync().await;
        assert_eq!(fixture.get_values(), (999u64, "biz".to_string()));
    }

    /// Downcasting a cell yields a handle only for the actor's real type.
    #[tokio::test]
    async fn test_actor_handle_downcast() {
        #[derive(Debug, Default)]
        struct NothingActor;

        impl Actor for NothingActor {}

        let proc = Proc::local();
        let spawned = proc.spawn("nothing", NothingActor).unwrap();
        let cell = spawned.cell();

        // Wrong actor type: no handle.
        assert!(cell.downcast_handle::<EchoActor>().is_none());

        // Correct actor type: a usable handle.
        let typed_handle = cell.downcast_handle::<NothingActor>().unwrap();
        typed_handle.drain_and_stop().unwrap();
        typed_handle.await;
    }
}