1#![allow(dead_code)] use std::any::TypeId;
14use std::fmt;
15use std::fmt::Debug;
16use std::future::Future;
17use std::future::IntoFuture;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::SystemTime;
21
22use async_trait::async_trait;
23use futures::FutureExt;
24use futures::future::BoxFuture;
25use serde::Deserialize;
26use serde::Serialize;
27use tokio::sync::watch;
28use tokio::task::JoinHandle;
29
30use crate as hyperactor; use crate::ActorRef;
32use crate::Data;
33use crate::Message;
34use crate::Named;
35use crate::RemoteMessage;
36use crate::cap;
37use crate::checkpoint::CheckpointError;
38use crate::checkpoint::Checkpointable;
39use crate::clock::Clock;
40use crate::clock::RealClock;
41use crate::mailbox::MailboxError;
42use crate::mailbox::MailboxSenderError;
43use crate::mailbox::MessageEnvelope;
44use crate::mailbox::PortHandle;
45use crate::mailbox::Undeliverable;
46use crate::mailbox::UndeliverableMessageError;
47use crate::mailbox::log::MessageLogError;
48use crate::message::Castable;
49use crate::message::IndexedErasedUnbound;
50use crate::proc::Context;
51use crate::proc::Instance;
52use crate::proc::InstanceCell;
53use crate::proc::Ports;
54use crate::proc::Proc;
55use crate::reference::ActorId;
56use crate::reference::Index;
57use crate::supervision::ActorSupervisionEvent;
58
59pub mod remote;
60
61#[async_trait]
69pub trait Actor: Sized + Send + Debug + 'static {
70 type Params: Send + 'static;
72
73 async fn new(params: Self::Params) -> Result<Self, anyhow::Error>;
75
76 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
81 Ok(())
83 }
84
85 async fn spawn(
88 cap: &impl cap::CanSpawn,
89 params: Self::Params,
90 ) -> anyhow::Result<ActorHandle<Self>> {
91 cap.spawn(params).await
92 }
93
94 async fn spawn_detached(params: Self::Params) -> Result<ActorHandle<Self>, anyhow::Error> {
101 Proc::local().spawn("anon", params).await
102 }
103
104 #[hyperactor::instrument_infallible]
108 fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
109 where
110 F: Future + Send + 'static,
111 F::Output: Send + 'static,
112 {
113 tokio::spawn(future)
114 }
115
116 async fn handle_supervision_event(
118 &mut self,
119 _this: &Instance<Self>,
120 _event: &ActorSupervisionEvent,
121 ) -> Result<bool, anyhow::Error> {
122 Ok(false)
124 }
125
126 async fn handle_undeliverable_message(
128 &mut self,
129 cx: &Instance<Self>,
130 Undeliverable(envelope): Undeliverable<MessageEnvelope>,
131 ) -> Result<(), anyhow::Error> {
132 assert_eq!(envelope.sender(), cx.self_id());
133
134 anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
135 }
136}
137
138#[async_trait]
141impl Actor for () {
142 type Params = ();
143
144 async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
145 Ok(params)
146 }
147}
148
149#[async_trait]
151pub trait Handler<M>: Actor {
152 async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
154}
155
156#[async_trait]
159impl<A: Actor> Handler<Signal> for A {
160 async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
161 unimplemented!("signal handler should not be called directly")
162 }
163}
164
165#[async_trait]
168impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
169 async fn handle(
170 &mut self,
171 cx: &Context<Self>,
172 message: Undeliverable<MessageEnvelope>,
173 ) -> Result<(), anyhow::Error> {
174 self.handle_undeliverable_message(cx, message).await
175 }
176}
177
178#[async_trait]
181impl<A, M> Handler<IndexedErasedUnbound<M>> for A
182where
183 A: Handler<M>,
184 M: Castable,
185{
186 async fn handle(
187 &mut self,
188 cx: &Context<Self>,
189 msg: IndexedErasedUnbound<M>,
190 ) -> anyhow::Result<()> {
191 let message = msg.downcast()?.bind()?;
192 Handler::handle(self, cx, message).await
193 }
194}
195
196pub trait RemotableActor: Actor
199where
200 Self::Params: RemoteMessage,
201{
202 fn gspawn(
206 proc: &Proc,
207 name: &str,
208 serialized_params: Data,
209 ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>>;
210
211 fn get_type_id() -> TypeId {
213 TypeId::of::<Self>()
214 }
215}
216
217impl<A> RemotableActor for A
218where
219 A: Actor + RemoteActor,
220 A: Binds<A>,
221 A::Params: RemoteMessage,
222{
223 fn gspawn(
224 proc: &Proc,
225 name: &str,
226 serialized_params: Data,
227 ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>> {
228 let proc = proc.clone();
230 let name = name.to_string();
231 Box::pin(async move {
232 let handle = proc
233 .spawn::<A>(&name, bincode::deserialize(&serialized_params)?)
234 .await?;
235 Ok(handle.bind::<A>().actor_id)
240 })
241 }
242}
243
244#[async_trait]
245impl<T> Checkpointable for T
246where
247 T: RemoteMessage + Clone,
248{
249 type State = T;
250 async fn save(&self) -> Result<Self::State, CheckpointError> {
251 Ok(self.clone())
252 }
253
254 async fn load(state: Self::State) -> Result<Self, CheckpointError> {
255 Ok(state)
256 }
257}
258
259#[derive(Debug)]
262pub struct ActorError {
263 pub(crate) actor_id: ActorId,
264 pub(crate) kind: ActorErrorKind,
265}
266
267#[derive(thiserror::Error, Debug)]
269pub enum ActorErrorKind {
270 #[error("processing error: {0}")]
273 Processing(#[source] anyhow::Error),
274
275 #[error("panic: {0}")]
277 Panic(#[source] anyhow::Error),
278
279 #[error("initialization error: {0}")]
281 Init(#[source] anyhow::Error),
282
283 #[error(transparent)]
285 Mailbox(#[from] MailboxError),
286
287 #[error(transparent)]
289 MailboxSender(#[from] MailboxSenderError),
290
291 #[error("checkpoint error: {0}")]
293 Checkpoint(#[source] CheckpointError),
294
295 #[error("message log error: {0}")]
297 MessageLog(#[source] MessageLogError),
298
299 #[error("actor is in an indeterminate state")]
301 IndeterminateState,
302
303 #[error("supervision: {0}")]
305 UnhandledSupervisionEvent(#[from] ActorSupervisionEvent),
306
307 #[error("{0}")]
310 Passthrough(#[from] anyhow::Error),
311}
312
313impl ActorError {
314 pub(crate) fn new(actor_id: ActorId, kind: ActorErrorKind) -> Self {
316 Self { actor_id, kind }
317 }
318
319 fn passthrough(&self) -> Self {
321 ActorError::new(
322 self.actor_id.clone(),
323 ActorErrorKind::Passthrough(anyhow::anyhow!("{}", self.kind)),
324 )
325 }
326}
327
328impl fmt::Display for ActorError {
329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330 write!(f, "serving {}: ", self.actor_id)?;
331 fmt::Display::fmt(&self.kind, f)
332 }
333}
334
335impl std::error::Error for ActorError {
336 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
337 self.kind.source()
338 }
339}
340
341impl From<MailboxError> for ActorError {
342 fn from(inner: MailboxError) -> Self {
343 Self::new(inner.actor_id().clone(), ActorErrorKind::from(inner))
344 }
345}
346
347impl From<MailboxSenderError> for ActorError {
348 fn from(inner: MailboxSenderError) -> Self {
349 Self::new(
350 inner.location().actor_id().clone(),
351 ActorErrorKind::from(inner),
352 )
353 }
354}
355
356impl From<ActorSupervisionEvent> for ActorError {
357 fn from(inner: ActorSupervisionEvent) -> Self {
358 Self::new(
359 inner.actor_id.clone(),
360 ActorErrorKind::UnhandledSupervisionEvent(inner),
361 )
362 }
363}
364
365#[derive(Clone, Debug, Serialize, Deserialize, Named)]
372pub enum Signal {
373 DrainAndStop,
375
376 Stop,
378
379 ChildStopped(Index),
381}
382
383impl fmt::Display for Signal {
384 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385 match self {
386 Signal::DrainAndStop => write!(f, "DrainAndStop"),
387 Signal::Stop => write!(f, "Stop"),
388 Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
389 }
390 }
391}
392
393#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)]
395pub enum ActorStatus {
396 Unknown,
398 Created,
400 Initializing,
402 Client,
405 Idle,
407 Processing(SystemTime, Option<(String, Option<String>)>),
412 Saving(SystemTime),
414 Loading(SystemTime),
416 Stopping,
418 Stopped,
420 Failed(String),
423}
424
425impl ActorStatus {
426 pub(crate) fn is_terminal(&self) -> bool {
428 matches!(self, Self::Stopped | Self::Failed(_))
429 }
430
431 pub(crate) fn is_failed(&self) -> bool {
433 matches!(self, Self::Failed(_))
434 }
435
436 fn passthrough(&self) -> Self {
439 match self {
440 Self::Unknown => Self::Unknown,
441 Self::Created => Self::Created,
442 Self::Initializing => Self::Initializing,
443 Self::Client => Self::Client,
444 Self::Idle => Self::Idle,
445 Self::Processing(instant, handler) => {
446 Self::Processing(instant.clone(), handler.clone())
447 }
448 Self::Saving(instant) => Self::Saving(instant.clone()),
449 Self::Loading(instant) => Self::Loading(instant.clone()),
450 Self::Stopping => Self::Stopping,
451 Self::Stopped => Self::Stopped,
452 Self::Failed(err) => Self::Failed(err.clone()),
453 }
454 }
455 fn span_string(&self) -> &'static str {
456 self.arm().unwrap_or_default()
457 }
458}
459
460impl fmt::Display for ActorStatus {
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 match self {
463 Self::Unknown => write!(f, "unknown"),
464 Self::Created => write!(f, "created"),
465 Self::Initializing => write!(f, "initializing"),
466 Self::Client => write!(f, "client"),
467 Self::Idle => write!(f, "idle"),
468 Self::Processing(instant, None) => {
469 write!(
470 f,
471 "processing for {}ms",
472 RealClock
473 .system_time_now()
474 .duration_since(instant.clone())
475 .unwrap_or_default()
476 .as_millis()
477 )
478 }
479 Self::Processing(instant, Some((handler, None))) => {
480 write!(
481 f,
482 "{}: processing for {}ms",
483 handler,
484 RealClock
485 .system_time_now()
486 .duration_since(instant.clone())
487 .unwrap_or_default()
488 .as_millis()
489 )
490 }
491 Self::Processing(instant, Some((handler, Some(arm)))) => {
492 write!(
493 f,
494 "{},{}: processing for {}ms",
495 handler,
496 arm,
497 RealClock
498 .system_time_now()
499 .duration_since(instant.clone())
500 .unwrap_or_default()
501 .as_millis()
502 )
503 }
504 Self::Saving(instant) => {
505 write!(
506 f,
507 "saving for {}ms",
508 RealClock
509 .system_time_now()
510 .duration_since(instant.clone())
511 .unwrap_or_default()
512 .as_millis()
513 )
514 }
515 Self::Loading(instant) => {
516 write!(
517 f,
518 "loading for {}ms",
519 RealClock
520 .system_time_now()
521 .duration_since(instant.clone())
522 .unwrap_or_default()
523 .as_millis()
524 )
525 }
526 Self::Stopping => write!(f, "stopping"),
527 Self::Stopped => write!(f, "stopped"),
528 Self::Failed(err) => write!(f, "failed: {}", err),
529 }
530 }
531}
532
533pub struct ActorHandle<A: Actor> {
542 cell: InstanceCell,
543 ports: Arc<Ports<A>>,
544}
545
546impl<A: Actor> ActorHandle<A> {
548 pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
549 Self { cell, ports }
550 }
551
552 pub(crate) fn cell(&self) -> &InstanceCell {
555 &self.cell
556 }
557
558 pub fn actor_id(&self) -> &ActorId {
560 self.cell.actor_id()
561 }
562
563 #[allow(clippy::result_large_err)] pub fn drain_and_stop(&self) -> Result<(), ActorError> {
566 tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
567 self.cell.signal(Signal::DrainAndStop)
568 }
569
570 pub fn status(&self) -> watch::Receiver<ActorStatus> {
572 self.cell.status().clone()
573 }
574
575 #[allow(clippy::result_large_err)] pub fn send<M: Message>(&self, message: M) -> Result<(), MailboxSenderError>
579 where
580 A: Handler<M>,
581 {
582 self.ports.get().send(message)
583 }
584
585 pub fn port<M: Message>(&self) -> PortHandle<M>
587 where
588 A: Handler<M>,
589 {
590 self.ports.get()
591 }
592
593 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
596 self.cell.bind(self.ports.as_ref())
597 }
598}
599
600impl<A: Actor> IntoFuture for ActorHandle<A> {
604 type Output = ActorStatus;
605 type IntoFuture = BoxFuture<'static, Self::Output>;
606
607 fn into_future(self) -> Self::IntoFuture {
608 let future = async move {
609 let mut status_receiver = self.cell.status().clone();
610 let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
611 match result {
612 Err(_) => ActorStatus::Unknown,
613 Ok(status) => status.passthrough(),
614 }
615 };
616
617 future.boxed()
618 }
619}
620
621impl<A: Actor> Debug for ActorHandle<A> {
622 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
623 f.debug_struct("ActorHandle").field("cell", &"..").finish()
624 }
625}
626
627impl<A: Actor> Clone for ActorHandle<A> {
628 fn clone(&self) -> Self {
629 Self {
630 cell: self.cell.clone(),
631 ports: self.ports.clone(),
632 }
633 }
634}
635
636pub trait RemoteActor: Named + Send + Sync {}
641
642pub trait Binds<A: Actor>: RemoteActor {
645 fn bind(ports: &Ports<A>);
647}
648
649pub trait RemoteHandles<M: RemoteMessage>: RemoteActor {}
652
653#[cfg(test)]
654mod tests {
655 use std::sync::Mutex;
656 use std::time::Duration;
657
658 use tokio::time::timeout;
659
660 use super::*;
661 use crate as hyperactor;
662 use crate::Actor;
663 use crate::Mailbox;
664 use crate::OncePortHandle;
665 use crate::PortRef;
666 use crate::checkpoint::CheckpointError;
667 use crate::checkpoint::Checkpointable;
668 use crate::test_utils::pingpong::PingPongActor;
669 use crate::test_utils::pingpong::PingPongActorParams;
670 use crate::test_utils::pingpong::PingPongMessage;
671 use crate::test_utils::proc_supervison::ProcSupervisionCoordinator; #[derive(Debug)]
674 struct EchoActor(PortRef<u64>);
675
676 #[async_trait]
677 impl Actor for EchoActor {
678 type Params = PortRef<u64>;
679
680 async fn new(params: PortRef<u64>) -> Result<Self, anyhow::Error> {
681 Ok(Self(params))
682 }
683 }
684
685 #[async_trait]
686 impl Handler<u64> for EchoActor {
687 async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
688 let Self(port) = self;
689 port.send(cx, message)?;
690 Ok(())
691 }
692 }
693
694 #[tokio::test]
695 async fn test_server_basic() {
696 let proc = Proc::local();
697 let client = proc.attach("client").unwrap();
698 let (tx, mut rx) = client.open_port();
699 let handle = proc.spawn::<EchoActor>("echo", tx.bind()).await.unwrap();
700 handle.send(123u64).unwrap();
701 handle.drain_and_stop().unwrap();
702 handle.await;
703
704 assert_eq!(rx.drain(), vec![123u64]);
705 }
706
707 #[tokio::test]
708 async fn test_ping_pong() {
709 let proc = Proc::local();
710 let client = proc.attach("client").unwrap();
711 let (undeliverable_msg_tx, _) = client.open_port();
712
713 let ping_pong_actor_params =
714 PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
715 let ping_handle = proc
716 .spawn::<PingPongActor>("ping", ping_pong_actor_params.clone())
717 .await
718 .unwrap();
719 let pong_handle = proc
720 .spawn::<PingPongActor>("pong", ping_pong_actor_params)
721 .await
722 .unwrap();
723
724 let (local_port, local_receiver) = client.open_once_port();
725
726 ping_handle
727 .send(PingPongMessage(10, pong_handle.bind(), local_port.bind()))
728 .unwrap();
729
730 assert!(local_receiver.recv().await.unwrap());
731 }
732
733 #[tokio::test]
734 async fn test_ping_pong_on_handler_error() {
735 let proc = Proc::local();
736 let client = proc.attach("client").unwrap();
737 let (undeliverable_msg_tx, _) = client.open_port();
738
739 ProcSupervisionCoordinator::set(&proc).await.unwrap();
742
743 let error_ttl = 66;
744 let ping_pong_actor_params =
745 PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl));
746 let ping_handle = proc
747 .spawn::<PingPongActor>("ping", ping_pong_actor_params.clone())
748 .await
749 .unwrap();
750 let pong_handle = proc
751 .spawn::<PingPongActor>("pong", ping_pong_actor_params)
752 .await
753 .unwrap();
754
755 let (local_port, local_receiver) = client.open_once_port();
756
757 ping_handle
758 .send(PingPongMessage(
759 error_ttl + 1, pong_handle.bind(),
761 local_port.bind(),
762 ))
763 .unwrap();
764
765 #[allow(clippy::disallowed_methods)]
767 let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
768 timeout(Duration::from_secs(5), local_receiver.recv()).await;
769 assert!(res.is_err());
770 }
771
772 #[derive(Debug)]
773 struct InitActor(bool);
774
775 #[async_trait]
776 impl Actor for InitActor {
777 type Params = ();
778
779 async fn new(_params: ()) -> Result<Self, anyhow::Error> {
780 Ok(Self(false))
781 }
782
783 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
784 self.0 = true;
785 Ok(())
786 }
787 }
788
789 #[async_trait]
790 impl Handler<OncePortHandle<bool>> for InitActor {
791 async fn handle(
792 &mut self,
793 _cx: &Context<Self>,
794 port: OncePortHandle<bool>,
795 ) -> Result<(), anyhow::Error> {
796 port.send(self.0)?;
797 Ok(())
798 }
799 }
800
801 #[tokio::test]
802 async fn test_init() {
803 let proc = Proc::local();
804 let handle = proc.spawn::<InitActor>("init", ()).await.unwrap();
805 let client = proc.attach("client").unwrap();
806
807 let (port, receiver) = client.open_once_port();
808 handle.send(port).unwrap();
809 assert!(receiver.recv().await.unwrap());
810
811 handle.drain_and_stop().unwrap();
812 handle.await;
813 }
814
815 #[derive(Debug)]
816 struct CheckpointActor {
817 sum: u64,
819 port: PortRef<u64>,
820 }
821
822 #[async_trait]
823 impl Actor for CheckpointActor {
824 type Params = PortRef<u64>;
825
826 async fn new(params: PortRef<u64>) -> Result<Self, anyhow::Error> {
827 Ok(Self {
828 sum: 0,
829 port: params,
830 })
831 }
832 }
833
834 #[async_trait]
835 impl Handler<u64> for CheckpointActor {
836 async fn handle(&mut self, cx: &Context<Self>, value: u64) -> Result<(), anyhow::Error> {
837 self.sum += value;
838 self.port.send(cx, self.sum)?;
839 Ok(())
840 }
841 }
842
843 #[async_trait]
844 impl Checkpointable for CheckpointActor {
845 type State = (u64, PortRef<u64>);
846
847 async fn save(&self) -> Result<Self::State, CheckpointError> {
848 Ok((self.sum, self.port.clone()))
849 }
850
851 async fn load(state: Self::State) -> Result<Self, CheckpointError> {
852 let (sum, port) = state;
853 Ok(CheckpointActor { sum, port })
854 }
855 }
856
857 type MultiValues = Arc<Mutex<(u64, String)>>;
858
859 struct MultiValuesTest {
860 proc: Proc,
861 values: MultiValues,
862 handle: ActorHandle<MultiActor>,
863 client: Mailbox,
864 }
865
866 impl MultiValuesTest {
867 async fn new() -> Self {
868 let proc = Proc::local();
869 let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
870 let handle = proc
871 .spawn::<MultiActor>("myactor", values.clone())
872 .await
873 .unwrap();
874 let client = proc.attach("client").unwrap();
875 Self {
876 proc,
877 values,
878 handle,
879 client,
880 }
881 }
882
883 fn send<M>(&self, message: M)
884 where
885 M: RemoteMessage,
886 MultiActor: Handler<M>,
887 {
888 self.handle.send(message).unwrap()
889 }
890
891 async fn sync(&self) {
892 let (port, done) = self.client.open_once_port::<bool>();
893 self.handle.send(port).unwrap();
894 assert!(done.recv().await.unwrap());
895 }
896
897 fn get_values(&self) -> (u64, String) {
898 self.values.lock().unwrap().clone()
899 }
900 }
901
902 #[derive(Debug)]
903 #[hyperactor::export(handlers = [u64, String])]
904 struct MultiActor(MultiValues);
905
906 #[async_trait]
907 impl Actor for MultiActor {
908 type Params = MultiValues;
909
910 async fn new(init: Self::Params) -> Result<Self, anyhow::Error> {
911 Ok(Self(init))
912 }
913 }
914
915 #[async_trait]
916 impl Handler<u64> for MultiActor {
917 async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
918 let mut vals = self.0.lock().unwrap();
919 vals.0 = message;
920 Ok(())
921 }
922 }
923
924 #[async_trait]
925 impl Handler<String> for MultiActor {
926 async fn handle(
927 &mut self,
928 _cx: &Context<Self>,
929 message: String,
930 ) -> Result<(), anyhow::Error> {
931 let mut vals = self.0.lock().unwrap();
932 vals.1 = message;
933 Ok(())
934 }
935 }
936
937 #[async_trait]
938 impl Handler<OncePortHandle<bool>> for MultiActor {
939 async fn handle(
940 &mut self,
941 _cx: &Context<Self>,
942 message: OncePortHandle<bool>,
943 ) -> Result<(), anyhow::Error> {
944 message.send(true).unwrap();
945 Ok(())
946 }
947 }
948
949 #[tokio::test]
950 async fn test_multi_handler_refs() {
951 let test = MultiValuesTest::new().await;
952
953 test.send(123u64);
954 test.send("foo".to_string());
955 test.sync().await;
956 assert_eq!(test.get_values(), (123u64, "foo".to_string()));
957
958 let myref: ActorRef<MultiActor> = test.handle.bind();
959
960 myref.port().send(&test.client, 321u64).unwrap();
961 test.sync().await;
962 assert_eq!(test.get_values(), (321u64, "foo".to_string()));
963
964 myref.port().send(&test.client, "bar".to_string()).unwrap();
965 test.sync().await;
966 assert_eq!(test.get_values(), (321u64, "bar".to_string()));
967 }
968
969 #[tokio::test]
970 async fn test_ref_alias() {
971 let test = MultiValuesTest::new().await;
972
973 test.send(123u64);
974 test.send("foo".to_string());
975
976 hyperactor::alias!(MyActorAlias, u64, String);
977
978 let myref: ActorRef<MyActorAlias> = test.handle.bind();
979 myref.port().send(&test.client, "biz".to_string()).unwrap();
980 myref.port().send(&test.client, 999u64).unwrap();
981
982 test.sync().await;
983 assert_eq!(test.get_values(), (999u64, "biz".to_string()));
984 }
985
986 #[tokio::test]
987 async fn test_actor_handle_downcast() {
988 #[derive(Debug, Default, Actor)]
989 struct NothingActor;
990
991 let proc = Proc::local();
994 let handle = proc.spawn::<NothingActor>("nothing", ()).await.unwrap();
995 let cell = handle.cell();
996
997 assert!(cell.downcast_handle::<EchoActor>().is_none());
999
1000 let handle = cell.downcast_handle::<NothingActor>().unwrap();
1001 handle.drain_and_stop().unwrap();
1002 handle.await;
1003 }
1004}