1#![allow(dead_code)] use std::any::TypeId;
14use std::fmt;
15use std::fmt::Debug;
16use std::future::Future;
17use std::future::IntoFuture;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::SystemTime;
21
22use async_trait::async_trait;
23use futures::FutureExt;
24use futures::future::BoxFuture;
25use serde::Deserialize;
26use serde::Serialize;
27use tokio::sync::watch;
28use tokio::task::JoinHandle;
29
30use crate as hyperactor; use crate::ActorRef;
32use crate::Data;
33use crate::Message;
34use crate::Named;
35use crate::RemoteMessage;
36use crate::checkpoint::CheckpointError;
37use crate::checkpoint::Checkpointable;
38use crate::clock::Clock;
39use crate::clock::RealClock;
40use crate::context;
41use crate::mailbox::MailboxError;
42use crate::mailbox::MailboxSenderError;
43use crate::mailbox::MessageEnvelope;
44use crate::mailbox::PortHandle;
45use crate::mailbox::Undeliverable;
46use crate::mailbox::UndeliverableMessageError;
47use crate::mailbox::log::MessageLogError;
48use crate::message::Castable;
49use crate::message::IndexedErasedUnbound;
50use crate::proc::Context;
51use crate::proc::Instance;
52use crate::proc::InstanceCell;
53use crate::proc::Ports;
54use crate::proc::Proc;
55use crate::reference::ActorId;
56use crate::reference::Index;
57use crate::supervision::ActorSupervisionEvent;
58
59pub mod remote;
60
61#[async_trait]
69pub trait Actor: Sized + Send + Debug + 'static {
70 type Params: Send + 'static;
72
73 async fn new(params: Self::Params) -> Result<Self, anyhow::Error>;
75
76 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
81 Ok(())
83 }
84
85 async fn spawn(
88 cx: &impl context::Actor,
89 params: Self::Params,
90 ) -> anyhow::Result<ActorHandle<Self>> {
91 cx.instance().spawn(params).await
92 }
93
94 async fn spawn_detached(params: Self::Params) -> Result<ActorHandle<Self>, anyhow::Error> {
101 Proc::local().spawn("anon", params).await
102 }
103
104 #[hyperactor::instrument_infallible]
108 fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
109 where
110 F: Future + Send + 'static,
111 F::Output: Send + 'static,
112 {
113 tokio::spawn(future)
114 }
115
116 async fn handle_supervision_event(
118 &mut self,
119 _this: &Instance<Self>,
120 _event: &ActorSupervisionEvent,
121 ) -> Result<bool, anyhow::Error> {
122 Ok(false)
124 }
125
126 async fn handle_undeliverable_message(
128 &mut self,
129 cx: &Instance<Self>,
130 envelope: Undeliverable<MessageEnvelope>,
131 ) -> Result<(), anyhow::Error> {
132 handle_undeliverable_message(cx, envelope)
133 }
134}
135
136pub fn handle_undeliverable_message<A: Actor>(
140 cx: &Instance<A>,
141 Undeliverable(envelope): Undeliverable<MessageEnvelope>,
142) -> Result<(), anyhow::Error> {
143 assert_eq!(envelope.sender(), cx.self_id());
144
145 anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
146}
147
148#[async_trait]
151impl Actor for () {
152 type Params = ();
153
154 async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
155 Ok(params)
156 }
157}
158
159impl Referable for () {}
160
161impl Binds<()> for () {
162 fn bind(_ports: &Ports<Self>) {
163 }
165}
166
167#[async_trait]
169pub trait Handler<M>: Actor {
170 async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
172}
173
174#[async_trait]
177impl<A: Actor> Handler<Signal> for A {
178 async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
179 unimplemented!("signal handler should not be called directly")
180 }
181}
182
183#[async_trait]
186impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
187 async fn handle(
188 &mut self,
189 cx: &Context<Self>,
190 message: Undeliverable<MessageEnvelope>,
191 ) -> Result<(), anyhow::Error> {
192 self.handle_undeliverable_message(cx, message).await
193 }
194}
195
196#[async_trait]
199impl<A, M> Handler<IndexedErasedUnbound<M>> for A
200where
201 A: Handler<M>,
202 M: Castable,
203{
204 async fn handle(
205 &mut self,
206 cx: &Context<Self>,
207 msg: IndexedErasedUnbound<M>,
208 ) -> anyhow::Result<()> {
209 let message = msg.downcast()?.bind()?;
210 Handler::handle(self, cx, message).await
211 }
212}
213
214pub trait RemotableActor: Actor
243where
244 Self::Params: RemoteMessage,
245{
246 fn gspawn(
251 proc: &Proc,
252 name: &str,
253 serialized_params: Data,
254 ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>>;
255
256 fn get_type_id() -> TypeId {
258 TypeId::of::<Self>()
259 }
260}
261
262impl<A> RemotableActor for A
263where
264 A: Actor + Referable + Binds<A>,
265 A::Params: RemoteMessage,
266{
267 fn gspawn(
268 proc: &Proc,
269 name: &str,
270 serialized_params: Data,
271 ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>> {
272 let proc = proc.clone();
273 let name = name.to_string();
274 Box::pin(async move {
275 let handle = proc
276 .spawn::<A>(&name, bincode::deserialize(&serialized_params)?)
277 .await?;
278 Ok(handle.bind::<A>().actor_id)
290 })
291 }
292}
293
294#[async_trait]
295impl<T> Checkpointable for T
296where
297 T: RemoteMessage + Clone,
298{
299 type State = T;
300 async fn save(&self) -> Result<Self::State, CheckpointError> {
301 Ok(self.clone())
302 }
303
304 async fn load(state: Self::State) -> Result<Self, CheckpointError> {
305 Ok(state)
306 }
307}
308
309#[derive(Debug)]
312pub struct ActorError {
313 pub(crate) actor_id: ActorId,
314 pub(crate) kind: ActorErrorKind,
315}
316
317#[derive(thiserror::Error, Debug)]
319pub enum ActorErrorKind {
320 #[error("processing error: {0}")]
323 Processing(#[source] anyhow::Error),
324
325 #[error("panic: {0}")]
327 Panic(#[source] anyhow::Error),
328
329 #[error("initialization error: {0}")]
331 Init(#[source] anyhow::Error),
332
333 #[error(transparent)]
335 Mailbox(#[from] MailboxError),
336
337 #[error(transparent)]
339 MailboxSender(#[from] MailboxSenderError),
340
341 #[error("checkpoint error: {0}")]
343 Checkpoint(#[source] CheckpointError),
344
345 #[error("message log error: {0}")]
347 MessageLog(#[source] MessageLogError),
348
349 #[error("actor is in an indeterminate state")]
351 IndeterminateState,
352
353 #[error("supervision: {0}")]
355 UnhandledSupervisionEvent(#[from] ActorSupervisionEvent),
356
357 #[error("{0}")]
360 Passthrough(#[from] anyhow::Error),
361}
362
363impl ActorError {
364 pub(crate) fn new(actor_id: ActorId, kind: ActorErrorKind) -> Self {
366 Self { actor_id, kind }
367 }
368
369 fn passthrough(&self) -> Self {
371 ActorError::new(
372 self.actor_id.clone(),
373 ActorErrorKind::Passthrough(anyhow::anyhow!("{}", self.kind)),
374 )
375 }
376}
377
378impl fmt::Display for ActorError {
379 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380 write!(f, "serving {}: ", self.actor_id)?;
381 fmt::Display::fmt(&self.kind, f)
382 }
383}
384
385impl std::error::Error for ActorError {
386 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
387 self.kind.source()
388 }
389}
390
391impl From<MailboxError> for ActorError {
392 fn from(inner: MailboxError) -> Self {
393 Self::new(inner.actor_id().clone(), ActorErrorKind::from(inner))
394 }
395}
396
397impl From<MailboxSenderError> for ActorError {
398 fn from(inner: MailboxSenderError) -> Self {
399 Self::new(
400 inner.location().actor_id().clone(),
401 ActorErrorKind::from(inner),
402 )
403 }
404}
405
406impl From<ActorSupervisionEvent> for ActorError {
407 fn from(inner: ActorSupervisionEvent) -> Self {
408 Self::new(
409 inner.actor_id.clone(),
410 ActorErrorKind::UnhandledSupervisionEvent(inner),
411 )
412 }
413}
414
415#[derive(Clone, Debug, Serialize, Deserialize, Named)]
422pub enum Signal {
423 DrainAndStop,
425
426 Stop,
428
429 ChildStopped(Index),
431}
432
433impl fmt::Display for Signal {
434 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
435 match self {
436 Signal::DrainAndStop => write!(f, "DrainAndStop"),
437 Signal::Stop => write!(f, "Stop"),
438 Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
439 }
440 }
441}
442
443#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)]
445pub enum ActorStatus {
446 Unknown,
448 Created,
450 Initializing,
452 Client,
455 Idle,
457 Processing(SystemTime, Option<(String, Option<String>)>),
462 Saving(SystemTime),
464 Loading(SystemTime),
466 Stopping,
468 Stopped,
470 Failed(String),
473}
474
475impl ActorStatus {
476 pub(crate) fn is_terminal(&self) -> bool {
478 matches!(self, Self::Stopped | Self::Failed(_))
479 }
480
481 pub(crate) fn is_failed(&self) -> bool {
483 matches!(self, Self::Failed(_))
484 }
485
486 fn passthrough(&self) -> Self {
489 match self {
490 Self::Unknown => Self::Unknown,
491 Self::Created => Self::Created,
492 Self::Initializing => Self::Initializing,
493 Self::Client => Self::Client,
494 Self::Idle => Self::Idle,
495 Self::Processing(instant, handler) => Self::Processing(*instant, handler.clone()),
496 Self::Saving(instant) => Self::Saving(*instant),
497 Self::Loading(instant) => Self::Loading(*instant),
498 Self::Stopping => Self::Stopping,
499 Self::Stopped => Self::Stopped,
500 Self::Failed(err) => Self::Failed(err.clone()),
501 }
502 }
503 fn span_string(&self) -> &'static str {
504 self.arm().unwrap_or_default()
505 }
506}
507
508impl fmt::Display for ActorStatus {
509 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510 match self {
511 Self::Unknown => write!(f, "unknown"),
512 Self::Created => write!(f, "created"),
513 Self::Initializing => write!(f, "initializing"),
514 Self::Client => write!(f, "client"),
515 Self::Idle => write!(f, "idle"),
516 Self::Processing(instant, None) => {
517 write!(
518 f,
519 "processing for {}ms",
520 RealClock
521 .system_time_now()
522 .duration_since(*instant)
523 .unwrap_or_default()
524 .as_millis()
525 )
526 }
527 Self::Processing(instant, Some((handler, None))) => {
528 write!(
529 f,
530 "{}: processing for {}ms",
531 handler,
532 RealClock
533 .system_time_now()
534 .duration_since(*instant)
535 .unwrap_or_default()
536 .as_millis()
537 )
538 }
539 Self::Processing(instant, Some((handler, Some(arm)))) => {
540 write!(
541 f,
542 "{},{}: processing for {}ms",
543 handler,
544 arm,
545 RealClock
546 .system_time_now()
547 .duration_since(*instant)
548 .unwrap_or_default()
549 .as_millis()
550 )
551 }
552 Self::Saving(instant) => {
553 write!(
554 f,
555 "saving for {}ms",
556 RealClock
557 .system_time_now()
558 .duration_since(*instant)
559 .unwrap_or_default()
560 .as_millis()
561 )
562 }
563 Self::Loading(instant) => {
564 write!(
565 f,
566 "loading for {}ms",
567 RealClock
568 .system_time_now()
569 .duration_since(*instant)
570 .unwrap_or_default()
571 .as_millis()
572 )
573 }
574 Self::Stopping => write!(f, "stopping"),
575 Self::Stopped => write!(f, "stopped"),
576 Self::Failed(err) => write!(f, "failed: {}", err),
577 }
578 }
579}
580
581pub struct ActorHandle<A: Actor> {
590 cell: InstanceCell,
591 ports: Arc<Ports<A>>,
592}
593
594impl<A: Actor> ActorHandle<A> {
596 pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
597 Self { cell, ports }
598 }
599
600 pub(crate) fn cell(&self) -> &InstanceCell {
603 &self.cell
604 }
605
606 pub fn actor_id(&self) -> &ActorId {
608 self.cell.actor_id()
609 }
610
611 #[allow(clippy::result_large_err)] pub fn drain_and_stop(&self) -> Result<(), ActorError> {
614 tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
615 self.cell.signal(Signal::DrainAndStop)
616 }
617
618 pub fn status(&self) -> watch::Receiver<ActorStatus> {
620 self.cell.status().clone()
621 }
622
623 pub fn send<M: Message>(&self, message: M) -> Result<(), MailboxSenderError>
626 where
627 A: Handler<M>,
628 {
629 self.ports.get().send(message)
630 }
631
632 pub fn port<M: Message>(&self) -> PortHandle<M>
634 where
635 A: Handler<M>,
636 {
637 self.ports.get()
638 }
639
640 pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
643 self.cell.bind(self.ports.as_ref())
644 }
645}
646
647impl<A: Actor> IntoFuture for ActorHandle<A> {
651 type Output = ActorStatus;
652 type IntoFuture = BoxFuture<'static, Self::Output>;
653
654 fn into_future(self) -> Self::IntoFuture {
655 let future = async move {
656 let mut status_receiver = self.cell.status().clone();
657 let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
658 match result {
659 Err(_) => ActorStatus::Unknown,
660 Ok(status) => status.passthrough(),
661 }
662 };
663
664 future.boxed()
665 }
666}
667
668impl<A: Actor> Debug for ActorHandle<A> {
669 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
670 f.debug_struct("ActorHandle").field("cell", &"..").finish()
671 }
672}
673
674impl<A: Actor> Clone for ActorHandle<A> {
675 fn clone(&self) -> Self {
676 Self {
677 cell: self.cell.clone(),
678 ports: self.ports.clone(),
679 }
680 }
681}
682
683pub trait Referable: Named + Send + Sync {}
701
702pub trait Binds<A: Actor>: Referable {
705 fn bind(ports: &Ports<A>);
707}
708
709pub trait RemoteHandles<M: RemoteMessage>: Referable {}
712
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::time::Duration;

    use tokio::time::timeout;

    use super::*;
    use crate as hyperactor;
    use crate::Actor;
    use crate::OncePortHandle;
    use crate::PortRef;
    use crate::checkpoint::CheckpointError;
    use crate::checkpoint::Checkpointable;
    use crate::test_utils::pingpong::PingPongActor;
    use crate::test_utils::pingpong::PingPongActorParams;
    use crate::test_utils::pingpong::PingPongMessage;
    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;

    /// Forwards every `u64` it receives to the port it was created with.
    #[derive(Debug)]
    struct EchoActor(PortRef<u64>);

    #[async_trait]
    impl Actor for EchoActor {
        type Params = PortRef<u64>;

        async fn new(params: PortRef<u64>) -> Result<Self, anyhow::Error> {
            Ok(EchoActor(params))
        }
    }

    #[async_trait]
    impl Handler<u64> for EchoActor {
        async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
            self.0.send(cx, message)?;
            Ok(())
        }
    }

    /// A message sent to the echo actor comes back on the client's port.
    #[tokio::test]
    async fn test_server_basic() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        let (echo_tx, mut echo_rx) = client.open_port();
        let echo = proc
            .spawn::<EchoActor>("echo", echo_tx.bind())
            .await
            .unwrap();

        echo.send(123u64).unwrap();
        echo.drain_and_stop().unwrap();
        echo.await;

        assert_eq!(echo_rx.drain(), vec![123u64]);
    }

    /// Two ping-pong actors bounce a message and report success.
    #[tokio::test]
    async fn test_ping_pong() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        let (undeliverable_msg_tx, _) = client.open_port();

        let params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
        let ping = proc
            .spawn::<PingPongActor>("ping", params.clone())
            .await
            .unwrap();
        let pong = proc
            .spawn::<PingPongActor>("pong", params)
            .await
            .unwrap();

        let (done_port, done_rx) = client.open_once_port();

        let kickoff = PingPongMessage(10, pong.bind(), done_port.bind());
        ping.send(kickoff).unwrap();

        assert!(done_rx.recv().await.unwrap());
    }

    /// With an error TTL configured, the exchange never reports completion
    /// within the timeout.
    #[tokio::test]
    async fn test_ping_pong_on_handler_error() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        let (undeliverable_msg_tx, _) = client.open_port();

        ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let error_ttl = 66;
        let params =
            PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl));
        let ping = proc
            .spawn::<PingPongActor>("ping", params.clone())
            .await
            .unwrap();
        let pong = proc
            .spawn::<PingPongActor>("pong", params)
            .await
            .unwrap();

        let (done_port, done_rx) = client.open_once_port();

        // Start above the error TTL.
        let kickoff = PingPongMessage(error_ttl + 1, pong.bind(), done_port.bind());
        ping.send(kickoff).unwrap();

        #[allow(clippy::disallowed_methods)]
        let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
            timeout(Duration::from_secs(5), done_rx.recv()).await;
        assert!(res.is_err());
    }

    /// Records whether `init` has run.
    #[derive(Debug)]
    struct InitActor(bool);

    #[async_trait]
    impl Actor for InitActor {
        type Params = ();

        async fn new(_params: ()) -> Result<Self, anyhow::Error> {
            Ok(InitActor(false))
        }

        async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
            self.0 = true;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<OncePortHandle<bool>> for InitActor {
        /// Replies with the current value of the init flag.
        async fn handle(
            &mut self,
            _cx: &Context<Self>,
            port: OncePortHandle<bool>,
        ) -> Result<(), anyhow::Error> {
            let initialized = self.0;
            port.send(initialized)?;
            Ok(())
        }
    }

    /// `init` has run by the time the actor answers its first message.
    #[tokio::test]
    async fn test_init() {
        let proc = Proc::local();
        let init_actor = proc.spawn::<InitActor>("init", ()).await.unwrap();
        let client = proc.attach("client").unwrap();

        let (reply_port, reply_rx) = client.open_once_port();
        init_actor.send(reply_port).unwrap();
        assert!(reply_rx.recv().await.unwrap());

        init_actor.drain_and_stop().unwrap();
        init_actor.await;
    }

    /// Keeps a running sum and reports it after every update.
    #[derive(Debug)]
    struct CheckpointActor {
        sum: u64,
        port: PortRef<u64>,
    }

    #[async_trait]
    impl Actor for CheckpointActor {
        type Params = PortRef<u64>;

        async fn new(params: PortRef<u64>) -> Result<Self, anyhow::Error> {
            let actor = CheckpointActor {
                sum: 0,
                port: params,
            };
            Ok(actor)
        }
    }

    #[async_trait]
    impl Handler<u64> for CheckpointActor {
        async fn handle(&mut self, cx: &Context<Self>, value: u64) -> Result<(), anyhow::Error> {
            self.sum += value;
            self.port.send(cx, self.sum)?;
            Ok(())
        }
    }

    #[async_trait]
    impl Checkpointable for CheckpointActor {
        type State = (u64, PortRef<u64>);

        async fn save(&self) -> Result<Self::State, CheckpointError> {
            let state = (self.sum, self.port.clone());
            Ok(state)
        }

        async fn load(state: Self::State) -> Result<Self, CheckpointError> {
            let (sum, port) = state;
            Ok(Self { sum, port })
        }
    }

    /// State shared between a `MultiActor` and the test observing it.
    type MultiValues = Arc<Mutex<(u64, String)>>;

    /// Fixture: a `MultiActor`, its shared values and a client instance.
    struct MultiValuesTest {
        proc: Proc,
        values: MultiValues,
        handle: ActorHandle<MultiActor>,
        client: Instance<()>,
        _client_handle: ActorHandle<()>,
    }

    impl MultiValuesTest {
        /// Spawns the actor and a client on a fresh local proc.
        async fn new() -> Self {
            let proc = Proc::local();
            let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
            let handle = proc
                .spawn::<MultiActor>("myactor", values.clone())
                .await
                .unwrap();
            let (client, _client_handle) = proc.instance("client").unwrap();
            Self {
                proc,
                values,
                handle,
                client,
                _client_handle,
            }
        }

        /// Sends `message` to the actor, panicking on failure.
        fn send<M>(&self, message: M)
        where
            M: RemoteMessage,
            MultiActor: Handler<M>,
        {
            self.handle.send(message).unwrap()
        }

        /// Round-trips a once-port through the actor and waits for the reply.
        async fn sync(&self) {
            let (ack_port, ack_rx) = self.client.open_once_port::<bool>();
            self.handle.send(ack_port).unwrap();
            assert!(ack_rx.recv().await.unwrap());
        }

        /// A snapshot of the shared values.
        fn get_values(&self) -> (u64, String) {
            let guard = self.values.lock().unwrap();
            guard.clone()
        }
    }

    /// Stores the latest `u64` and `String` it received in shared state.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [u64, String])]
    struct MultiActor(MultiValues);

    #[async_trait]
    impl Actor for MultiActor {
        type Params = MultiValues;

        async fn new(init: Self::Params) -> Result<Self, anyhow::Error> {
            Ok(MultiActor(init))
        }
    }

    #[async_trait]
    impl Handler<u64> for MultiActor {
        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
            self.0.lock().unwrap().0 = message;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<String> for MultiActor {
        async fn handle(
            &mut self,
            _cx: &Context<Self>,
            message: String,
        ) -> Result<(), anyhow::Error> {
            self.0.lock().unwrap().1 = message;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<OncePortHandle<bool>> for MultiActor {
        /// Acknowledges by sending `true` back on the once-port.
        async fn handle(
            &mut self,
            _cx: &Context<Self>,
            message: OncePortHandle<bool>,
        ) -> Result<(), anyhow::Error> {
            message.send(true).unwrap();
            Ok(())
        }
    }

    /// Messages reach the actor both through the handle and through a bound
    /// reference.
    #[tokio::test]
    async fn test_multi_handler_refs() {
        let fixture = MultiValuesTest::new().await;

        fixture.send(123u64);
        fixture.send("foo".to_string());
        fixture.sync().await;
        assert_eq!(fixture.get_values(), (123u64, "foo".to_string()));

        let actor_ref: ActorRef<MultiActor> = fixture.handle.bind();

        actor_ref.port().send(&fixture.client, 321u64).unwrap();
        fixture.sync().await;
        assert_eq!(fixture.get_values(), (321u64, "foo".to_string()));

        actor_ref
            .port()
            .send(&fixture.client, "bar".to_string())
            .unwrap();
        fixture.sync().await;
        assert_eq!(fixture.get_values(), (321u64, "bar".to_string()));
    }

    /// A reference bound through an alias type reaches the same handlers.
    #[tokio::test]
    async fn test_ref_alias() {
        let fixture = MultiValuesTest::new().await;

        fixture.send(123u64);
        fixture.send("foo".to_string());

        hyperactor::alias!(MyActorAlias, u64, String);

        let alias_ref: ActorRef<MyActorAlias> = fixture.handle.bind();
        alias_ref
            .port()
            .send(&fixture.client, "biz".to_string())
            .unwrap();
        alias_ref.port().send(&fixture.client, 999u64).unwrap();

        fixture.sync().await;
        assert_eq!(fixture.get_values(), (999u64, "biz".to_string()));
    }

    /// A cell downcasts only to a handle of the actor type it was spawned as.
    #[tokio::test]
    async fn test_actor_handle_downcast() {
        #[derive(Debug, Default, Actor)]
        struct NothingActor;

        let proc = Proc::local();
        let spawned = proc.spawn::<NothingActor>("nothing", ()).await.unwrap();
        let cell = spawned.cell();

        // Wrong actor type: no handle.
        assert!(cell.downcast_handle::<EchoActor>().is_none());

        // Right actor type: a usable handle.
        let recovered = cell.downcast_handle::<NothingActor>().unwrap();
        recovered.drain_and_stop().unwrap();
        recovered.await;
    }
}