hyperactor/
actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(dead_code)] // Allow until this is used outside of tests.
10
11//! This module contains all the core traits required to define and manage actors.
12
13use std::any::TypeId;
14use std::fmt;
15use std::fmt::Debug;
16use std::future::Future;
17use std::future::IntoFuture;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::SystemTime;
21
22use async_trait::async_trait;
23use futures::FutureExt;
24use futures::future::BoxFuture;
25use serde::Deserialize;
26use serde::Serialize;
27use tokio::sync::watch;
28use tokio::task::JoinHandle;
29
30use crate as hyperactor; // for macros
31use crate::ActorRef;
32use crate::Data;
33use crate::Message;
34use crate::Named;
35use crate::RemoteMessage;
36use crate::cap;
37use crate::checkpoint::CheckpointError;
38use crate::checkpoint::Checkpointable;
39use crate::clock::Clock;
40use crate::clock::RealClock;
41use crate::mailbox::MailboxError;
42use crate::mailbox::MailboxSenderError;
43use crate::mailbox::MessageEnvelope;
44use crate::mailbox::PortHandle;
45use crate::mailbox::Undeliverable;
46use crate::mailbox::UndeliverableMessageError;
47use crate::mailbox::log::MessageLogError;
48use crate::message::Castable;
49use crate::message::IndexedErasedUnbound;
50use crate::proc::Context;
51use crate::proc::Instance;
52use crate::proc::InstanceCell;
53use crate::proc::Ports;
54use crate::proc::Proc;
55use crate::reference::ActorId;
56use crate::reference::Index;
57use crate::supervision::ActorSupervisionEvent;
58
59pub mod remote;
60
61/// An Actor is an independent, asynchronous thread of execution. Each
62/// actor instance has a mailbox, whose messages are delivered through
63/// the method [`Actor::handle`].
64///
65/// Actors communicate with each other by way of message passing.
66/// Actors are assumed to be _deterministic_: that is, the state of an
67/// actor is determined by the set (and order) of messages it receives.
68#[async_trait]
69pub trait Actor: Sized + Send + Debug + 'static {
70    /// The type of initialization parameters accepted by this actor.
71    type Params: Send + 'static;
72
73    /// Creates a new actor instance given its instantiation parameters.
74    async fn new(params: Self::Params) -> Result<Self, anyhow::Error>;
75
76    /// Initialize the actor, after the runtime has been fully initialized.
77    /// Init thus provides a mechanism by which an actor can reliably and always
78    /// receive some initial event that can be used to kick off further
79    /// (potentially delayed) processing.
80    async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
81        // Default implementation: no init.
82        Ok(())
83    }
84
85    /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
86    /// The spawned actor will be supervised by the parent (spawning) actor.
87    async fn spawn(
88        cap: &impl cap::CanSpawn,
89        params: Self::Params,
90    ) -> anyhow::Result<ActorHandle<Self>> {
91        cap.spawn(params).await
92    }
93
94    /// Spawns this actor in a detached state, handling its messages
95    /// in a background task. The returned handle is used to control
96    /// the actor's lifecycle and to interact with it.
97    ///
98    /// Actors spawned through `spawn_detached` are not attached to a supervision
99    /// hierarchy, and not managed by a [`Proc`].
100    async fn spawn_detached(params: Self::Params) -> Result<ActorHandle<Self>, anyhow::Error> {
101        Proc::local().spawn("anon", params).await
102    }
103
104    /// This method is used by the runtime to spawn the actor server. It can be
105    /// used by actors that require customized runtime setups
106    /// (e.g., dedicated actor threads), or want to use a custom tokio runtime.
107    #[hyperactor::instrument_infallible]
108    fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
109    where
110        F: Future + Send + 'static,
111        F::Output: Send + 'static,
112    {
113        tokio::spawn(future)
114    }
115
116    /// Handle actor supervision event. Return `Ok(true)`` if the event is handled here.
117    async fn handle_supervision_event(
118        &mut self,
119        _this: &Instance<Self>,
120        _event: &ActorSupervisionEvent,
121    ) -> Result<bool, anyhow::Error> {
122        // By default, the supervision event is not handled, caller is expected to bubble it up.
123        Ok(false)
124    }
125
126    /// Default undeliverable message handling behavior.
127    async fn handle_undeliverable_message(
128        &mut self,
129        cx: &Instance<Self>,
130        Undeliverable(envelope): Undeliverable<MessageEnvelope>,
131    ) -> Result<(), anyhow::Error> {
132        assert_eq!(envelope.sender(), cx.self_id());
133
134        anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
135    }
136}
137
138/// An actor that does nothing. It is used to represent "client only" actors,
139/// returned by [`Proc::instance`].
140#[async_trait]
141impl Actor for () {
142    type Params = ();
143
144    async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
145        Ok(params)
146    }
147}
148
149/// A Handler allows an actor to handle a specific message type.
150#[async_trait]
151pub trait Handler<M>: Actor {
152    /// Handle the next M-typed message.
153    async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
154}
155
156/// We provide this handler to indicate that actors can handle the [`Signal`] message.
157/// Its actual handler is implemented by the runtime.
158#[async_trait]
159impl<A: Actor> Handler<Signal> for A {
160    async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
161        unimplemented!("signal handler should not be called directly")
162    }
163}
164
165/// This handler provides a default behavior when a message sent by
166/// the actor to another is returned due to delivery failure.
167#[async_trait]
168impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
169    async fn handle(
170        &mut self,
171        cx: &Context<Self>,
172        message: Undeliverable<MessageEnvelope>,
173    ) -> Result<(), anyhow::Error> {
174        self.handle_undeliverable_message(cx, message).await
175    }
176}
177
178/// This handler enables actors to unbind the [IndexedErasedUnbound]
179/// message, and forward the result to corresponding handler.
180#[async_trait]
181impl<A, M> Handler<IndexedErasedUnbound<M>> for A
182where
183    A: Handler<M>,
184    M: Castable,
185{
186    async fn handle(
187        &mut self,
188        cx: &Context<Self>,
189        msg: IndexedErasedUnbound<M>,
190    ) -> anyhow::Result<()> {
191        let message = msg.downcast()?.bind()?;
192        Handler::handle(self, cx, message).await
193    }
194}
195
196/// A RemotableActor may be spawned remotely, and receive messages across
197/// process boundaries.
198pub trait RemotableActor: Actor
199where
200    Self::Params: RemoteMessage,
201{
202    /// A type-erased entry point to spawn this actor. This is primarily used by Hyperactor's
203    /// remote actor registration mechanism.
204    // TODO: consider making this 'private' -- by moving it into a non-public trait as in [`cap`].
205    fn gspawn(
206        proc: &Proc,
207        name: &str,
208        serialized_params: Data,
209    ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>>;
210
211    /// The type ID of this actor.
212    fn get_type_id() -> TypeId {
213        TypeId::of::<Self>()
214    }
215}
216
217impl<A> RemotableActor for A
218where
219    A: Actor + RemoteActor,
220    A: Binds<A>,
221    A::Params: RemoteMessage,
222{
223    fn gspawn(
224        proc: &Proc,
225        name: &str,
226        serialized_params: Data,
227    ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>> {
228        // TODO: fix
229        let proc = proc.clone();
230        let name = name.to_string();
231        Box::pin(async move {
232            let handle = proc
233                .spawn::<A>(&name, bincode::deserialize(&serialized_params)?)
234                .await?;
235            // Gspawned actors are always type erased, and thus are only able to receive serialized
236            // messages; they can be safely exported.
237            //
238            // This will soon be replaced by an export mechanism.
239            Ok(handle.bind::<A>().actor_id)
240        })
241    }
242}
243
244#[async_trait]
245impl<T> Checkpointable for T
246where
247    T: RemoteMessage + Clone,
248{
249    type State = T;
250    async fn save(&self) -> Result<Self::State, CheckpointError> {
251        Ok(self.clone())
252    }
253
254    async fn load(state: Self::State) -> Result<Self, CheckpointError> {
255        Ok(state)
256    }
257}
258
259/// Errors that occur while serving actors. Each error is associated
260/// with the ID of the actor being served.
261#[derive(Debug)]
262pub struct ActorError {
263    pub(crate) actor_id: ActorId,
264    pub(crate) kind: ActorErrorKind,
265}
266
267/// The kinds of actor serving errors.
268#[derive(thiserror::Error, Debug)]
269pub enum ActorErrorKind {
270    /// Error while processing actor, i.e., returned by the actor's
271    /// processing method.
272    #[error("processing error: {0}")]
273    Processing(#[source] anyhow::Error),
274
275    /// Unwound stracktrace of a panic.
276    #[error("panic: {0}")]
277    Panic(#[source] anyhow::Error),
278
279    /// Error during actor initialization.
280    #[error("initialization error: {0}")]
281    Init(#[source] anyhow::Error),
282
283    /// An underlying mailbox error.
284    #[error(transparent)]
285    Mailbox(#[from] MailboxError),
286
287    /// An underlying mailbox sender error.
288    #[error(transparent)]
289    MailboxSender(#[from] MailboxSenderError),
290
291    /// An underlying checkpoint error.
292    #[error("checkpoint error: {0}")]
293    Checkpoint(#[source] CheckpointError),
294
295    /// An underlying message log error.
296    #[error("message log error: {0}")]
297    MessageLog(#[source] MessageLogError),
298
299    /// The actor's state could not be determined.
300    #[error("actor is in an indeterminate state")]
301    IndeterminateState,
302
303    /// An actor supervision event was not handled.
304    #[error("supervision: {0}")]
305    UnhandledSupervisionEvent(#[from] ActorSupervisionEvent),
306
307    /// A special kind of error that allows us to clone errors: we can keep the
308    /// error string, but we lose the error structure.
309    #[error("{0}")]
310    Passthrough(#[from] anyhow::Error),
311}
312
313impl ActorError {
314    /// Create a new actor server error with the provided id and kind.
315    pub(crate) fn new(actor_id: ActorId, kind: ActorErrorKind) -> Self {
316        Self { actor_id, kind }
317    }
318
319    /// Passthrough this error.
320    fn passthrough(&self) -> Self {
321        ActorError::new(
322            self.actor_id.clone(),
323            ActorErrorKind::Passthrough(anyhow::anyhow!("{}", self.kind)),
324        )
325    }
326}
327
328impl fmt::Display for ActorError {
329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330        write!(f, "serving {}: ", self.actor_id)?;
331        fmt::Display::fmt(&self.kind, f)
332    }
333}
334
335impl std::error::Error for ActorError {
336    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
337        self.kind.source()
338    }
339}
340
341impl From<MailboxError> for ActorError {
342    fn from(inner: MailboxError) -> Self {
343        Self::new(inner.actor_id().clone(), ActorErrorKind::from(inner))
344    }
345}
346
347impl From<MailboxSenderError> for ActorError {
348    fn from(inner: MailboxSenderError) -> Self {
349        Self::new(
350            inner.location().actor_id().clone(),
351            ActorErrorKind::from(inner),
352        )
353    }
354}
355
356impl From<ActorSupervisionEvent> for ActorError {
357    fn from(inner: ActorSupervisionEvent) -> Self {
358        Self::new(
359            inner.actor_id.clone(),
360            ActorErrorKind::UnhandledSupervisionEvent(inner),
361        )
362    }
363}
364
365/// A collection of signals to control the behavior of the actor.
366/// Signals are internal runtime control plane messages and should not be
367/// sent outside of the runtime.
368///
369/// These messages are not handled directly by actors; instead, the runtime
370/// handles the various signals.
371#[derive(Clone, Debug, Serialize, Deserialize, Named)]
372pub enum Signal {
373    /// Stop the actor, after draining messages.
374    DrainAndStop,
375
376    /// Stop the actor immediately.
377    Stop,
378
379    /// The direct child with the given PID was stopped.
380    ChildStopped(Index),
381}
382
383impl fmt::Display for Signal {
384    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385        match self {
386            Signal::DrainAndStop => write!(f, "DrainAndStop"),
387            Signal::Stop => write!(f, "Stop"),
388            Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
389        }
390    }
391}
392
393/// The runtime status of an actor.
394#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)]
395pub enum ActorStatus {
396    /// The actor status is unknown.
397    Unknown,
398    /// The actor was created, but not yet started.
399    Created,
400    /// The actor is initializing. It is not yet ready to receive messages.
401    Initializing,
402    /// The actor is in "client" state: the user is managing the actor's
403    /// mailboxes manually.
404    Client,
405    /// The actor is ready to receive messages, but is currently idle.
406    Idle,
407    /// The actor has been processing a message, beginning at the specified
408    /// instant. The message handler and arm is included.
409    /// TODO: we shoudl use interned representations here, so we don't copy
410    /// strings willy-nilly.
411    Processing(SystemTime, Option<(String, Option<String>)>),
412    /// The actor has been saving its state.
413    Saving(SystemTime),
414    /// The actor has been loading its state.
415    Loading(SystemTime),
416    /// The actor is stopping. It is draining messages.
417    Stopping,
418    /// The actor is stopped. It is no longer processing messages.
419    Stopped,
420    /// The actor failed with the provided actor error formatted in string
421    /// representation.
422    Failed(String),
423}
424
425impl ActorStatus {
426    /// Tells whether the status is a terminal state.
427    pub(crate) fn is_terminal(&self) -> bool {
428        matches!(self, Self::Stopped | Self::Failed(_))
429    }
430
431    /// Tells whether the status represents a failure.
432    pub(crate) fn is_failed(&self) -> bool {
433        matches!(self, Self::Failed(_))
434    }
435
436    /// Create a passthrough of this status. The returned status is a clone,
437    /// except that [`ActorStatus::Failed`] is replaced with its passthrough.
438    fn passthrough(&self) -> Self {
439        match self {
440            Self::Unknown => Self::Unknown,
441            Self::Created => Self::Created,
442            Self::Initializing => Self::Initializing,
443            Self::Client => Self::Client,
444            Self::Idle => Self::Idle,
445            Self::Processing(instant, handler) => {
446                Self::Processing(instant.clone(), handler.clone())
447            }
448            Self::Saving(instant) => Self::Saving(instant.clone()),
449            Self::Loading(instant) => Self::Loading(instant.clone()),
450            Self::Stopping => Self::Stopping,
451            Self::Stopped => Self::Stopped,
452            Self::Failed(err) => Self::Failed(err.clone()),
453        }
454    }
455    fn span_string(&self) -> &'static str {
456        self.arm().unwrap_or_default()
457    }
458}
459
460impl fmt::Display for ActorStatus {
461    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462        match self {
463            Self::Unknown => write!(f, "unknown"),
464            Self::Created => write!(f, "created"),
465            Self::Initializing => write!(f, "initializing"),
466            Self::Client => write!(f, "client"),
467            Self::Idle => write!(f, "idle"),
468            Self::Processing(instant, None) => {
469                write!(
470                    f,
471                    "processing for {}ms",
472                    RealClock
473                        .system_time_now()
474                        .duration_since(instant.clone())
475                        .unwrap_or_default()
476                        .as_millis()
477                )
478            }
479            Self::Processing(instant, Some((handler, None))) => {
480                write!(
481                    f,
482                    "{}: processing for {}ms",
483                    handler,
484                    RealClock
485                        .system_time_now()
486                        .duration_since(instant.clone())
487                        .unwrap_or_default()
488                        .as_millis()
489                )
490            }
491            Self::Processing(instant, Some((handler, Some(arm)))) => {
492                write!(
493                    f,
494                    "{},{}: processing for {}ms",
495                    handler,
496                    arm,
497                    RealClock
498                        .system_time_now()
499                        .duration_since(instant.clone())
500                        .unwrap_or_default()
501                        .as_millis()
502                )
503            }
504            Self::Saving(instant) => {
505                write!(
506                    f,
507                    "saving for {}ms",
508                    RealClock
509                        .system_time_now()
510                        .duration_since(instant.clone())
511                        .unwrap_or_default()
512                        .as_millis()
513                )
514            }
515            Self::Loading(instant) => {
516                write!(
517                    f,
518                    "loading for {}ms",
519                    RealClock
520                        .system_time_now()
521                        .duration_since(instant.clone())
522                        .unwrap_or_default()
523                        .as_millis()
524                )
525            }
526            Self::Stopping => write!(f, "stopping"),
527            Self::Stopped => write!(f, "stopped"),
528            Self::Failed(err) => write!(f, "failed: {}", err),
529        }
530    }
531}
532
533/// ActorHandles represent a (local) serving actor. It is used to access
534/// its messaging and signal ports, as well as to synchronize with its
535/// lifecycle (e.g., providing joins).  Once dropped, the handle is
536/// detached from the underlying actor instance, and there is no longer
537/// any way to join it.
538///
539/// Correspondingly, [`crate::ActorRef`]s refer to (possibly) remote
540/// actors.
541pub struct ActorHandle<A: Actor> {
542    cell: InstanceCell,
543    ports: Arc<Ports<A>>,
544}
545
546/// A handle to a running (local) actor.
547impl<A: Actor> ActorHandle<A> {
548    pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
549        Self { cell, ports }
550    }
551
552    /// The actor's cell. Used primarily for testing.
553    /// TODO: this should not be a public API.
554    pub(crate) fn cell(&self) -> &InstanceCell {
555        &self.cell
556    }
557
558    /// The [`ActorId`] of the actor represented by this handle.
559    pub fn actor_id(&self) -> &ActorId {
560        self.cell.actor_id()
561    }
562
563    /// Signal the actor to drain its current messages and then stop.
564    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
565    pub fn drain_and_stop(&self) -> Result<(), ActorError> {
566        tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
567        self.cell.signal(Signal::DrainAndStop)
568    }
569
570    /// A watch that observes the lifecycle state of the actor.
571    pub fn status(&self) -> watch::Receiver<ActorStatus> {
572        self.cell.status().clone()
573    }
574
575    /// Send a message to the actor. Messages sent through the handle
576    /// are always queued in process, and do not require serialization.
577    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `MailboxSenderError`.
578    pub fn send<M: Message>(&self, message: M) -> Result<(), MailboxSenderError>
579    where
580        A: Handler<M>,
581    {
582        self.ports.get().send(message)
583    }
584
585    /// Return a port for the provided message type handled by the actor.
586    pub fn port<M: Message>(&self) -> PortHandle<M>
587    where
588        A: Handler<M>,
589    {
590        self.ports.get()
591    }
592
593    /// TEMPORARY: bind...
594    /// TODO: we shoudl also have a default binding(?)
595    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
596        self.cell.bind(self.ports.as_ref())
597    }
598}
599
600/// IntoFuture allows users to await the handle to join it. The future
601/// resolves when the actor itself has stopped processing messages.
602/// The future resolves to the actor's final status.
603impl<A: Actor> IntoFuture for ActorHandle<A> {
604    type Output = ActorStatus;
605    type IntoFuture = BoxFuture<'static, Self::Output>;
606
607    fn into_future(self) -> Self::IntoFuture {
608        let future = async move {
609            let mut status_receiver = self.cell.status().clone();
610            let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
611            match result {
612                Err(_) => ActorStatus::Unknown,
613                Ok(status) => status.passthrough(),
614            }
615        };
616
617        future.boxed()
618    }
619}
620
621impl<A: Actor> Debug for ActorHandle<A> {
622    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
623        f.debug_struct("ActorHandle").field("cell", &"..").finish()
624    }
625}
626
627impl<A: Actor> Clone for ActorHandle<A> {
628    fn clone(&self) -> Self {
629        Self {
630            cell: self.cell.clone(),
631            ports: self.ports.clone(),
632        }
633    }
634}
635
636/// RemoteActor is a marker trait for types that can be used as
637/// remote actor references. All [`Actor`]s are thus referencable;
638/// but other types may also implement this in order to separately
639/// specify actor interfaces.
640pub trait RemoteActor: Named + Send + Sync {}
641
642/// Binds determines how an actor's ports are bound to a specific
643/// reference type.
644pub trait Binds<A: Actor>: RemoteActor {
645    /// Bind ports in this actor.
646    fn bind(ports: &Ports<A>);
647}
648
649/// Handles is a marker trait specifying that message type [`M`]
650/// is handled by a specific actor type.
651pub trait RemoteHandles<M: RemoteMessage>: RemoteActor {}
652
653#[cfg(test)]
654mod tests {
655    use std::sync::Mutex;
656    use std::time::Duration;
657
658    use tokio::time::timeout;
659
660    use super::*;
661    use crate as hyperactor;
662    use crate::Actor;
663    use crate::Mailbox;
664    use crate::OncePortHandle;
665    use crate::PortRef;
666    use crate::checkpoint::CheckpointError;
667    use crate::checkpoint::Checkpointable;
668    use crate::test_utils::pingpong::PingPongActor;
669    use crate::test_utils::pingpong::PingPongActorParams;
670    use crate::test_utils::pingpong::PingPongMessage;
671    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator; // for macros
672
673    #[derive(Debug)]
674    struct EchoActor(PortRef<u64>);
675
676    #[async_trait]
677    impl Actor for EchoActor {
678        type Params = PortRef<u64>;
679
680        async fn new(params: PortRef<u64>) -> Result<Self, anyhow::Error> {
681            Ok(Self(params))
682        }
683    }
684
685    #[async_trait]
686    impl Handler<u64> for EchoActor {
687        async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
688            let Self(port) = self;
689            port.send(cx, message)?;
690            Ok(())
691        }
692    }
693
694    #[tokio::test]
695    async fn test_server_basic() {
696        let proc = Proc::local();
697        let client = proc.attach("client").unwrap();
698        let (tx, mut rx) = client.open_port();
699        let handle = proc.spawn::<EchoActor>("echo", tx.bind()).await.unwrap();
700        handle.send(123u64).unwrap();
701        handle.drain_and_stop().unwrap();
702        handle.await;
703
704        assert_eq!(rx.drain(), vec![123u64]);
705    }
706
707    #[tokio::test]
708    async fn test_ping_pong() {
709        let proc = Proc::local();
710        let client = proc.attach("client").unwrap();
711        let (undeliverable_msg_tx, _) = client.open_port();
712
713        let ping_pong_actor_params =
714            PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
715        let ping_handle = proc
716            .spawn::<PingPongActor>("ping", ping_pong_actor_params.clone())
717            .await
718            .unwrap();
719        let pong_handle = proc
720            .spawn::<PingPongActor>("pong", ping_pong_actor_params)
721            .await
722            .unwrap();
723
724        let (local_port, local_receiver) = client.open_once_port();
725
726        ping_handle
727            .send(PingPongMessage(10, pong_handle.bind(), local_port.bind()))
728            .unwrap();
729
730        assert!(local_receiver.recv().await.unwrap());
731    }
732
733    #[tokio::test]
734    async fn test_ping_pong_on_handler_error() {
735        let proc = Proc::local();
736        let client = proc.attach("client").unwrap();
737        let (undeliverable_msg_tx, _) = client.open_port();
738
739        // Need to set a supervison coordinator for this Proc because there will
740        // be actor failure(s) in this test which trigger supervision.
741        ProcSupervisionCoordinator::set(&proc).await.unwrap();
742
743        let error_ttl = 66;
744        let ping_pong_actor_params =
745            PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl));
746        let ping_handle = proc
747            .spawn::<PingPongActor>("ping", ping_pong_actor_params.clone())
748            .await
749            .unwrap();
750        let pong_handle = proc
751            .spawn::<PingPongActor>("pong", ping_pong_actor_params)
752            .await
753            .unwrap();
754
755        let (local_port, local_receiver) = client.open_once_port();
756
757        ping_handle
758            .send(PingPongMessage(
759                error_ttl + 1, // will encounter an error at TTL=66
760                pong_handle.bind(),
761                local_port.bind(),
762            ))
763            .unwrap();
764
765        // TODO: Fix this receiver hanging issue in T200423722.
766        #[allow(clippy::disallowed_methods)]
767        let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
768            timeout(Duration::from_secs(5), local_receiver.recv()).await;
769        assert!(res.is_err());
770    }
771
772    #[derive(Debug)]
773    struct InitActor(bool);
774
775    #[async_trait]
776    impl Actor for InitActor {
777        type Params = ();
778
779        async fn new(_params: ()) -> Result<Self, anyhow::Error> {
780            Ok(Self(false))
781        }
782
783        async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
784            self.0 = true;
785            Ok(())
786        }
787    }
788
789    #[async_trait]
790    impl Handler<OncePortHandle<bool>> for InitActor {
791        async fn handle(
792            &mut self,
793            _cx: &Context<Self>,
794            port: OncePortHandle<bool>,
795        ) -> Result<(), anyhow::Error> {
796            port.send(self.0)?;
797            Ok(())
798        }
799    }
800
801    #[tokio::test]
802    async fn test_init() {
803        let proc = Proc::local();
804        let handle = proc.spawn::<InitActor>("init", ()).await.unwrap();
805        let client = proc.attach("client").unwrap();
806
807        let (port, receiver) = client.open_once_port();
808        handle.send(port).unwrap();
809        assert!(receiver.recv().await.unwrap());
810
811        handle.drain_and_stop().unwrap();
812        handle.await;
813    }
814
815    #[derive(Debug)]
816    struct CheckpointActor {
817        // The actor does nothing but sum the values of messages.
818        sum: u64,
819        port: PortRef<u64>,
820    }
821
822    #[async_trait]
823    impl Actor for CheckpointActor {
824        type Params = PortRef<u64>;
825
826        async fn new(params: PortRef<u64>) -> Result<Self, anyhow::Error> {
827            Ok(Self {
828                sum: 0,
829                port: params,
830            })
831        }
832    }
833
834    #[async_trait]
835    impl Handler<u64> for CheckpointActor {
836        async fn handle(&mut self, cx: &Context<Self>, value: u64) -> Result<(), anyhow::Error> {
837            self.sum += value;
838            self.port.send(cx, self.sum)?;
839            Ok(())
840        }
841    }
842
843    #[async_trait]
844    impl Checkpointable for CheckpointActor {
845        type State = (u64, PortRef<u64>);
846
847        async fn save(&self) -> Result<Self::State, CheckpointError> {
848            Ok((self.sum, self.port.clone()))
849        }
850
851        async fn load(state: Self::State) -> Result<Self, CheckpointError> {
852            let (sum, port) = state;
853            Ok(CheckpointActor { sum, port })
854        }
855    }
856
857    type MultiValues = Arc<Mutex<(u64, String)>>;
858
859    struct MultiValuesTest {
860        proc: Proc,
861        values: MultiValues,
862        handle: ActorHandle<MultiActor>,
863        client: Mailbox,
864    }
865
866    impl MultiValuesTest {
867        async fn new() -> Self {
868            let proc = Proc::local();
869            let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
870            let handle = proc
871                .spawn::<MultiActor>("myactor", values.clone())
872                .await
873                .unwrap();
874            let client = proc.attach("client").unwrap();
875            Self {
876                proc,
877                values,
878                handle,
879                client,
880            }
881        }
882
883        fn send<M>(&self, message: M)
884        where
885            M: RemoteMessage,
886            MultiActor: Handler<M>,
887        {
888            self.handle.send(message).unwrap()
889        }
890
891        async fn sync(&self) {
892            let (port, done) = self.client.open_once_port::<bool>();
893            self.handle.send(port).unwrap();
894            assert!(done.recv().await.unwrap());
895        }
896
897        fn get_values(&self) -> (u64, String) {
898            self.values.lock().unwrap().clone()
899        }
900    }
901
902    #[derive(Debug)]
903    #[hyperactor::export(handlers = [u64, String])]
904    struct MultiActor(MultiValues);
905
906    #[async_trait]
907    impl Actor for MultiActor {
908        type Params = MultiValues;
909
910        async fn new(init: Self::Params) -> Result<Self, anyhow::Error> {
911            Ok(Self(init))
912        }
913    }
914
915    #[async_trait]
916    impl Handler<u64> for MultiActor {
917        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
918            let mut vals = self.0.lock().unwrap();
919            vals.0 = message;
920            Ok(())
921        }
922    }
923
924    #[async_trait]
925    impl Handler<String> for MultiActor {
926        async fn handle(
927            &mut self,
928            _cx: &Context<Self>,
929            message: String,
930        ) -> Result<(), anyhow::Error> {
931            let mut vals = self.0.lock().unwrap();
932            vals.1 = message;
933            Ok(())
934        }
935    }
936
937    #[async_trait]
938    impl Handler<OncePortHandle<bool>> for MultiActor {
939        async fn handle(
940            &mut self,
941            _cx: &Context<Self>,
942            message: OncePortHandle<bool>,
943        ) -> Result<(), anyhow::Error> {
944            message.send(true).unwrap();
945            Ok(())
946        }
947    }
948
949    #[tokio::test]
950    async fn test_multi_handler_refs() {
951        let test = MultiValuesTest::new().await;
952
953        test.send(123u64);
954        test.send("foo".to_string());
955        test.sync().await;
956        assert_eq!(test.get_values(), (123u64, "foo".to_string()));
957
958        let myref: ActorRef<MultiActor> = test.handle.bind();
959
960        myref.port().send(&test.client, 321u64).unwrap();
961        test.sync().await;
962        assert_eq!(test.get_values(), (321u64, "foo".to_string()));
963
964        myref.port().send(&test.client, "bar".to_string()).unwrap();
965        test.sync().await;
966        assert_eq!(test.get_values(), (321u64, "bar".to_string()));
967    }
968
969    #[tokio::test]
970    async fn test_ref_alias() {
971        let test = MultiValuesTest::new().await;
972
973        test.send(123u64);
974        test.send("foo".to_string());
975
976        hyperactor::alias!(MyActorAlias, u64, String);
977
978        let myref: ActorRef<MyActorAlias> = test.handle.bind();
979        myref.port().send(&test.client, "biz".to_string()).unwrap();
980        myref.port().send(&test.client, 999u64).unwrap();
981
982        test.sync().await;
983        assert_eq!(test.get_values(), (999u64, "biz".to_string()));
984    }
985
986    #[tokio::test]
987    async fn test_actor_handle_downcast() {
988        #[derive(Debug, Default, Actor)]
989        struct NothingActor;
990
991        // Just test that we can round-trip the handle through a downcast.
992
993        let proc = Proc::local();
994        let handle = proc.spawn::<NothingActor>("nothing", ()).await.unwrap();
995        let cell = handle.cell();
996
997        // Invalid actor doesn't succeed.
998        assert!(cell.downcast_handle::<EchoActor>().is_none());
999
1000        let handle = cell.downcast_handle::<NothingActor>().unwrap();
1001        handle.drain_and_stop().unwrap();
1002        handle.await;
1003    }
1004}