hyperactor/
actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(dead_code)] // Allow until this is used outside of tests.
10
11//! This module contains all the core traits required to define and manage actors.
12
13use std::any::TypeId;
14use std::fmt;
15use std::fmt::Debug;
16use std::future::Future;
17use std::future::IntoFuture;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::SystemTime;
21
22use async_trait::async_trait;
23use futures::FutureExt;
24use futures::future::BoxFuture;
25use serde::Deserialize;
26use serde::Serialize;
27use tokio::sync::watch;
28use tokio::task::JoinHandle;
29
30use crate as hyperactor; // for macros
31use crate::ActorRef;
32use crate::Data;
33use crate::Message;
34use crate::Named;
35use crate::RemoteMessage;
36use crate::checkpoint::CheckpointError;
37use crate::checkpoint::Checkpointable;
38use crate::clock::Clock;
39use crate::clock::RealClock;
40use crate::context;
41use crate::mailbox::MailboxError;
42use crate::mailbox::MailboxSenderError;
43use crate::mailbox::MessageEnvelope;
44use crate::mailbox::PortHandle;
45use crate::mailbox::Undeliverable;
46use crate::mailbox::UndeliverableMessageError;
47use crate::mailbox::log::MessageLogError;
48use crate::message::Castable;
49use crate::message::IndexedErasedUnbound;
50use crate::proc::Context;
51use crate::proc::Instance;
52use crate::proc::InstanceCell;
53use crate::proc::Ports;
54use crate::proc::Proc;
55use crate::reference::ActorId;
56use crate::reference::Index;
57use crate::supervision::ActorSupervisionEvent;
58
59pub mod remote;
60
/// An Actor is an independent, asynchronous thread of execution. Each
/// actor instance has a mailbox, whose messages are delivered through
/// the method [`Handler::handle`].
///
/// Actors communicate with each other by way of message passing.
/// Actors are assumed to be _deterministic_: that is, the state of an
/// actor is determined by the set (and order) of messages it receives.
#[async_trait]
pub trait Actor: Sized + Send + Debug + 'static {
    /// The type of initialization parameters accepted by this actor.
    type Params: Send + 'static;

    /// Creates a new actor instance given its instantiation parameters.
    ///
    /// This is the only required method of the trait; every other method
    /// has a default implementation.
    async fn new(params: Self::Params) -> Result<Self, anyhow::Error>;

    /// Initialize the actor, after the runtime has been fully initialized.
    /// Init thus provides a mechanism by which an actor can reliably and always
    /// receive some initial event that can be used to kick off further
    /// (potentially delayed) processing.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Default implementation: no init.
        Ok(())
    }

    /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
    /// The spawned actor will be supervised by the parent (spawning) actor.
    ///
    /// The default implementation delegates to `cx.instance().spawn(params)`.
    async fn spawn(
        cx: &impl context::Actor,
        params: Self::Params,
    ) -> anyhow::Result<ActorHandle<Self>> {
        cx.instance().spawn(params).await
    }

    /// Spawns this actor in a detached state, handling its messages
    /// in a background task. The returned handle is used to control
    /// the actor's lifecycle and to interact with it.
    ///
    /// Actors spawned through `spawn_detached` are not attached to a supervision
    /// hierarchy. The default implementation spawns the actor under the name
    /// `"anon"` on a fresh [`Proc::local`] rather than on a caller-supplied
    /// [`Proc`].
    async fn spawn_detached(params: Self::Params) -> Result<ActorHandle<Self>, anyhow::Error> {
        Proc::local().spawn("anon", params).await
    }

    /// This method is used by the runtime to spawn the actor server. It can be
    /// used by actors that require customized runtime setups
    /// (e.g., dedicated actor threads), or want to use a custom tokio runtime.
    ///
    /// The default implementation hands the future to [`tokio::spawn`].
    #[hyperactor::instrument_infallible]
    fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        tokio::spawn(future)
    }

    /// Handle an actor supervision event. Return `Ok(true)` if the event is
    /// handled here.
    ///
    /// The default implementation handles nothing and returns `Ok(false)`.
    async fn handle_supervision_event(
        &mut self,
        _this: &Instance<Self>,
        _event: &ActorSupervisionEvent,
    ) -> Result<bool, anyhow::Error> {
        // By default, the supervision event is not handled, caller is expected to bubble it up.
        Ok(false)
    }

    /// Handle a message that was returned to this actor as undeliverable.
    ///
    /// The default implementation delegates to the free function
    /// [`handle_undeliverable_message`], which always returns an error.
    /// Implementations that override this method can still call that
    /// function to fall back to the default behavior.
    async fn handle_undeliverable_message(
        &mut self,
        cx: &Instance<Self>,
        envelope: Undeliverable<MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        handle_undeliverable_message(cx, envelope)
    }
}
135
136/// Default implementation of [`Actor::handle_undeliverable_message`]. Defined
137/// as a free function so that `Actor` implementations that override
138/// [`Actor::handle_undeliverable_message`] can fallback to this default.
139pub fn handle_undeliverable_message<A: Actor>(
140    cx: &Instance<A>,
141    Undeliverable(envelope): Undeliverable<MessageEnvelope>,
142) -> Result<(), anyhow::Error> {
143    assert_eq!(envelope.sender(), cx.self_id());
144
145    anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
146}
147
148/// An actor that does nothing. It is used to represent "client only" actors,
149/// returned by [`Proc::instance`].
150#[async_trait]
151impl Actor for () {
152    type Params = ();
153
154    async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
155        Ok(params)
156    }
157}
158
/// The unit actor is marked as [`Referable`].
impl Referable for () {}
160
/// Binding the unit actor to itself is a no-op: `bind` leaves `_ports`
/// untouched.
impl Binds<()> for () {
    fn bind(_ports: &Ports<Self>) {
        // Binds no ports.
    }
}
166
/// A Handler allows an actor to handle a specific message type.
///
/// An actor implements `Handler<M>` once per message type `M` it accepts.
#[async_trait]
pub trait Handler<M>: Actor {
    /// Handle the next M-typed message.
    ///
    /// `cx` is the [`Context`] for this invocation and `message` is the
    /// message itself, passed by value. Returning an error reports the
    /// failure to the caller of the handler.
    async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
}
173
/// We provide this handler to indicate that actors can handle the [`Signal`] message.
/// Its actual handler is implemented by the runtime.
///
/// # Panics
///
/// Calling `handle` directly always panics via `unimplemented!`.
#[async_trait]
impl<A: Actor> Handler<Signal> for A {
    async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
        unimplemented!("signal handler should not be called directly")
    }
}
182
183/// This handler provides a default behavior when a message sent by
184/// the actor to another is returned due to delivery failure.
185#[async_trait]
186impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
187    async fn handle(
188        &mut self,
189        cx: &Context<Self>,
190        message: Undeliverable<MessageEnvelope>,
191    ) -> Result<(), anyhow::Error> {
192        self.handle_undeliverable_message(cx, message).await
193    }
194}
195
196/// This handler enables actors to unbind the [IndexedErasedUnbound]
197/// message, and forward the result to corresponding handler.
198#[async_trait]
199impl<A, M> Handler<IndexedErasedUnbound<M>> for A
200where
201    A: Handler<M>,
202    M: Castable,
203{
204    async fn handle(
205        &mut self,
206        cx: &Context<Self>,
207        msg: IndexedErasedUnbound<M>,
208    ) -> anyhow::Result<()> {
209        let message = msg.downcast()?.bind()?;
210        Handler::handle(self, cx, message).await
211    }
212}
213
/// An `Actor` that can be spawned remotely.
///
/// Blanket-implemented for actors that opt in to remote spawn by also
/// implementing `Referable` and `Binds<A>`, with serializable
/// params:
///
/// ```rust,ignore
/// impl<A> RemotableActor for A
/// where
///     A: Actor + Referable + Binds<A>,
///     A::Params: RemoteMessage,
/// {}
/// ```
///
/// Bounds explained:
/// - `Referable`: marks the type as eligible for typed remote
///   references (`ActorRef<A>`); required because remote spawn
///   ultimately hands back an `ActorId` that higher-level APIs may
///   re-type as `ActorRef<A>`.
/// - `Binds<A>`: lets the runtime wire this actor's message ports
///   when it is spawned (the blanket impl calls `handle.bind::<A>()`).
/// - `A::Params: RemoteMessage`: constructor params must be
///   (de)serializable to cross a process boundary.
///
/// `gspawn` is a type-erased entry point used by the remote
/// spawn/registry machinery. It takes serialized params and returns
/// the new actor's `ActorId`; application code shouldn't call it
/// directly.
pub trait RemotableActor: Actor
where
    Self::Params: RemoteMessage,
{
    /// A type-erased entry point to spawn this actor. This is
    /// primarily used by hyperactor's remote actor registration
    /// mechanism.
    ///
    /// `proc` is the proc to spawn on, `name` the name to spawn under,
    /// and `serialized_params` the serialized form of `Self::Params`.
    /// The returned future resolves to the spawned actor's [`ActorId`],
    /// or to an error.
    // TODO: consider making this 'private' -- by moving it into a non-public trait as in [`cap`].
    fn gspawn(
        proc: &Proc,
        name: &str,
        serialized_params: Data,
    ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>>;

    /// The type ID of this actor, i.e. `TypeId::of::<Self>()`.
    fn get_type_id() -> TypeId {
        TypeId::of::<Self>()
    }
}
261
262impl<A> RemotableActor for A
263where
264    A: Actor + Referable + Binds<A>,
265    A::Params: RemoteMessage,
266{
267    fn gspawn(
268        proc: &Proc,
269        name: &str,
270        serialized_params: Data,
271    ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>> {
272        let proc = proc.clone();
273        let name = name.to_string();
274        Box::pin(async move {
275            let handle = proc
276                .spawn::<A>(&name, bincode::deserialize(&serialized_params)?)
277                .await?;
278            // We return only the ActorId, not a typed ActorRef.
279            // Callers that hold this ID can interact with the actor
280            // only via the serialized/opaque messaging path, which
281            // makes it safe to export across process boundaries.
282            //
283            // Note: the actor itself is still `A`-typed here; we
284            // merely restrict the *capability* we hand out to an
285            // untyped identifier.
286            //
287            // This will be replaced by a proper export/registry
288            // mechanism.
289            Ok(handle.bind::<A>().actor_id)
290        })
291    }
292}
293
/// Blanket [`Checkpointable`] implementation for every cloneable
/// [`RemoteMessage`]: the value is its own checkpoint state.
#[async_trait]
impl<T> Checkpointable for T
where
    T: RemoteMessage + Clone,
{
    /// The saved state is simply a copy of the value.
    type State = T;

    /// Saves by cloning `self`; never fails.
    async fn save(&self) -> Result<Self::State, CheckpointError> {
        Ok(self.clone())
    }

    /// Loads by returning the saved state unchanged; never fails.
    async fn load(state: Self::State) -> Result<Self, CheckpointError> {
        Ok(state)
    }
}
308
/// Errors that occur while serving actors. Each error is associated
/// with the ID of the actor being served.
#[derive(Debug)]
pub struct ActorError {
    /// The ID of the actor that was being served when the error occurred.
    pub(crate) actor_id: ActorId,
    /// What went wrong.
    pub(crate) kind: ActorErrorKind,
}
316
/// The kinds of actor serving errors.
#[derive(thiserror::Error, Debug)]
pub enum ActorErrorKind {
    /// Error while processing actor, i.e., returned by the actor's
    /// processing method.
    #[error("processing error: {0}")]
    Processing(#[source] anyhow::Error),

    /// Unwound stacktrace of a panic.
    #[error("panic: {0}")]
    Panic(#[source] anyhow::Error),

    /// Error during actor initialization.
    #[error("initialization error: {0}")]
    Init(#[source] anyhow::Error),

    /// An underlying mailbox error.
    #[error(transparent)]
    Mailbox(#[from] MailboxError),

    /// An underlying mailbox sender error.
    #[error(transparent)]
    MailboxSender(#[from] MailboxSenderError),

    /// An underlying checkpoint error.
    #[error("checkpoint error: {0}")]
    Checkpoint(#[source] CheckpointError),

    /// An underlying message log error.
    #[error("message log error: {0}")]
    MessageLog(#[source] MessageLogError),

    /// The actor's state could not be determined.
    #[error("actor is in an indeterminate state")]
    IndeterminateState,

    /// An actor supervision event was not handled.
    #[error("supervision: {0}")]
    UnhandledSupervisionEvent(#[from] ActorSupervisionEvent),

    /// A special kind of error that allows us to clone errors: we can keep the
    /// error string, but we lose the error structure.
    #[error("{0}")]
    Passthrough(#[from] anyhow::Error),
}
362
363impl ActorError {
364    /// Create a new actor server error with the provided id and kind.
365    pub(crate) fn new(actor_id: ActorId, kind: ActorErrorKind) -> Self {
366        Self { actor_id, kind }
367    }
368
369    /// Passthrough this error.
370    fn passthrough(&self) -> Self {
371        ActorError::new(
372            self.actor_id.clone(),
373            ActorErrorKind::Passthrough(anyhow::anyhow!("{}", self.kind)),
374        )
375    }
376}
377
378impl fmt::Display for ActorError {
379    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380        write!(f, "serving {}: ", self.actor_id)?;
381        fmt::Display::fmt(&self.kind, f)
382    }
383}
384
385impl std::error::Error for ActorError {
386    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
387        self.kind.source()
388    }
389}
390
391impl From<MailboxError> for ActorError {
392    fn from(inner: MailboxError) -> Self {
393        Self::new(inner.actor_id().clone(), ActorErrorKind::from(inner))
394    }
395}
396
397impl From<MailboxSenderError> for ActorError {
398    fn from(inner: MailboxSenderError) -> Self {
399        Self::new(
400            inner.location().actor_id().clone(),
401            ActorErrorKind::from(inner),
402        )
403    }
404}
405
406impl From<ActorSupervisionEvent> for ActorError {
407    fn from(inner: ActorSupervisionEvent) -> Self {
408        Self::new(
409            inner.actor_id.clone(),
410            ActorErrorKind::UnhandledSupervisionEvent(inner),
411        )
412    }
413}
414
/// A collection of signals to control the behavior of the actor.
/// Signals are internal runtime control plane messages and should not be
/// sent outside of the runtime.
///
/// These messages are not handled directly by actors; instead, the runtime
/// handles the various signals.
#[derive(Clone, Debug, Serialize, Deserialize, Named)]
pub enum Signal {
    /// Stop the actor, after draining messages.
    DrainAndStop,

    /// Stop the actor immediately.
    Stop,

    /// The direct child with the given index was stopped.
    ChildStopped(Index),
}
432
433impl fmt::Display for Signal {
434    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
435        match self {
436            Signal::DrainAndStop => write!(f, "DrainAndStop"),
437            Signal::Stop => write!(f, "Stop"),
438            Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
439        }
440    }
441}
442
/// The runtime status of an actor.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)]
pub enum ActorStatus {
    /// The actor status is unknown.
    Unknown,
    /// The actor was created, but not yet started.
    Created,
    /// The actor is initializing. It is not yet ready to receive messages.
    Initializing,
    /// The actor is in "client" state: the user is managing the actor's
    /// mailboxes manually.
    Client,
    /// The actor is ready to receive messages, but is currently idle.
    Idle,
    /// The actor has been processing a message, beginning at the specified
    /// instant. The message handler and arm are included, when known.
    /// TODO: we should use interned representations here, so we don't copy
    /// strings willy-nilly.
    Processing(SystemTime, Option<(String, Option<String>)>),
    /// The actor has been saving its state, beginning at the specified instant.
    Saving(SystemTime),
    /// The actor has been loading its state, beginning at the specified instant.
    Loading(SystemTime),
    /// The actor is stopping. It is draining messages.
    Stopping,
    /// The actor is stopped. It is no longer processing messages.
    Stopped,
    /// The actor failed with the provided actor error formatted in string
    /// representation.
    Failed(String),
}
474
475impl ActorStatus {
476    /// Tells whether the status is a terminal state.
477    pub(crate) fn is_terminal(&self) -> bool {
478        matches!(self, Self::Stopped | Self::Failed(_))
479    }
480
481    /// Tells whether the status represents a failure.
482    pub(crate) fn is_failed(&self) -> bool {
483        matches!(self, Self::Failed(_))
484    }
485
486    /// Create a passthrough of this status. The returned status is a clone,
487    /// except that [`ActorStatus::Failed`] is replaced with its passthrough.
488    fn passthrough(&self) -> Self {
489        match self {
490            Self::Unknown => Self::Unknown,
491            Self::Created => Self::Created,
492            Self::Initializing => Self::Initializing,
493            Self::Client => Self::Client,
494            Self::Idle => Self::Idle,
495            Self::Processing(instant, handler) => Self::Processing(*instant, handler.clone()),
496            Self::Saving(instant) => Self::Saving(*instant),
497            Self::Loading(instant) => Self::Loading(*instant),
498            Self::Stopping => Self::Stopping,
499            Self::Stopped => Self::Stopped,
500            Self::Failed(err) => Self::Failed(err.clone()),
501        }
502    }
503    fn span_string(&self) -> &'static str {
504        self.arm().unwrap_or_default()
505    }
506}
507
508impl fmt::Display for ActorStatus {
509    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510        match self {
511            Self::Unknown => write!(f, "unknown"),
512            Self::Created => write!(f, "created"),
513            Self::Initializing => write!(f, "initializing"),
514            Self::Client => write!(f, "client"),
515            Self::Idle => write!(f, "idle"),
516            Self::Processing(instant, None) => {
517                write!(
518                    f,
519                    "processing for {}ms",
520                    RealClock
521                        .system_time_now()
522                        .duration_since(*instant)
523                        .unwrap_or_default()
524                        .as_millis()
525                )
526            }
527            Self::Processing(instant, Some((handler, None))) => {
528                write!(
529                    f,
530                    "{}: processing for {}ms",
531                    handler,
532                    RealClock
533                        .system_time_now()
534                        .duration_since(*instant)
535                        .unwrap_or_default()
536                        .as_millis()
537                )
538            }
539            Self::Processing(instant, Some((handler, Some(arm)))) => {
540                write!(
541                    f,
542                    "{},{}: processing for {}ms",
543                    handler,
544                    arm,
545                    RealClock
546                        .system_time_now()
547                        .duration_since(*instant)
548                        .unwrap_or_default()
549                        .as_millis()
550                )
551            }
552            Self::Saving(instant) => {
553                write!(
554                    f,
555                    "saving for {}ms",
556                    RealClock
557                        .system_time_now()
558                        .duration_since(*instant)
559                        .unwrap_or_default()
560                        .as_millis()
561                )
562            }
563            Self::Loading(instant) => {
564                write!(
565                    f,
566                    "loading for {}ms",
567                    RealClock
568                        .system_time_now()
569                        .duration_since(*instant)
570                        .unwrap_or_default()
571                        .as_millis()
572                )
573            }
574            Self::Stopping => write!(f, "stopping"),
575            Self::Stopped => write!(f, "stopped"),
576            Self::Failed(err) => write!(f, "failed: {}", err),
577        }
578    }
579}
580
/// ActorHandles represent a (local) serving actor. It is used to access
/// its messaging and signal ports, as well as to synchronize with its
/// lifecycle (e.g., providing joins).  Once dropped, the handle is
/// detached from the underlying actor instance, and there is no longer
/// any way to join it.
///
/// Correspondingly, [`crate::ActorRef`]s refer to (possibly) remote
/// actors.
pub struct ActorHandle<A: Actor> {
    /// The actor's instance cell; the handle reads the actor ID and status
    /// from it, and sends signals and binds references through it.
    cell: InstanceCell,
    /// The actor's ports, shared with every clone of this handle.
    ports: Arc<Ports<A>>,
}
593
594/// A handle to a running (local) actor.
595impl<A: Actor> ActorHandle<A> {
596    pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
597        Self { cell, ports }
598    }
599
600    /// The actor's cell. Used primarily for testing.
601    /// TODO: this should not be a public API.
602    pub(crate) fn cell(&self) -> &InstanceCell {
603        &self.cell
604    }
605
606    /// The [`ActorId`] of the actor represented by this handle.
607    pub fn actor_id(&self) -> &ActorId {
608        self.cell.actor_id()
609    }
610
611    /// Signal the actor to drain its current messages and then stop.
612    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
613    pub fn drain_and_stop(&self) -> Result<(), ActorError> {
614        tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
615        self.cell.signal(Signal::DrainAndStop)
616    }
617
618    /// A watch that observes the lifecycle state of the actor.
619    pub fn status(&self) -> watch::Receiver<ActorStatus> {
620        self.cell.status().clone()
621    }
622
623    /// Send a message to the actor. Messages sent through the handle
624    /// are always queued in process, and do not require serialization.
625    pub fn send<M: Message>(&self, message: M) -> Result<(), MailboxSenderError>
626    where
627        A: Handler<M>,
628    {
629        self.ports.get().send(message)
630    }
631
632    /// Return a port for the provided message type handled by the actor.
633    pub fn port<M: Message>(&self) -> PortHandle<M>
634    where
635        A: Handler<M>,
636    {
637        self.ports.get()
638    }
639
640    /// TEMPORARY: bind...
641    /// TODO: we shoudl also have a default binding(?)
642    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
643        self.cell.bind(self.ports.as_ref())
644    }
645}
646
647/// IntoFuture allows users to await the handle to join it. The future
648/// resolves when the actor itself has stopped processing messages.
649/// The future resolves to the actor's final status.
650impl<A: Actor> IntoFuture for ActorHandle<A> {
651    type Output = ActorStatus;
652    type IntoFuture = BoxFuture<'static, Self::Output>;
653
654    fn into_future(self) -> Self::IntoFuture {
655        let future = async move {
656            let mut status_receiver = self.cell.status().clone();
657            let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
658            match result {
659                Err(_) => ActorStatus::Unknown,
660                Ok(status) => status.passthrough(),
661            }
662        };
663
664        future.boxed()
665    }
666}
667
668impl<A: Actor> Debug for ActorHandle<A> {
669    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
670        f.debug_struct("ActorHandle").field("cell", &"..").finish()
671    }
672}
673
674impl<A: Actor> Clone for ActorHandle<A> {
675    fn clone(&self) -> Self {
676        Self {
677            cell: self.cell.clone(),
678            ports: self.ports.clone(),
679        }
680    }
681}
682
/// `Referable` is a marker trait for types that can appear as
/// remote references across process boundaries.
///
/// It is not limited to concrete [`Actor`] implementations. For
/// example, façade types generated by [`alias!`] implement
/// `Referable` so that you can hand out restricted or stable APIs
/// while still using the same remote messaging machinery.
///
/// Implementing this trait means the type:
/// - can be identified (`Named`) so the runtime knows what it is,
/// - is safe to pass across threads (`Send + Sync`),
/// - and can be carried in [`ActorRef<T>`] values across process
///   boundaries.
///
/// In contrast, [`RemotableActor`] is the trait that marks *actors*
/// that can actually be **spawned remotely**. An alias type may be a
/// `Referable` but is never a `RemotableActor`.
///
/// The trait has no methods of its own.
pub trait Referable: Named + Send + Sync {}
701
/// Binds determines how an actor's ports are bound to a specific
/// reference type.
///
/// The implementing type is the reference type; `A` is the actor whose
/// ports are being bound.
pub trait Binds<A: Actor>: Referable {
    /// Bind ports in this actor.
    fn bind(ports: &Ports<A>);
}
708
/// `RemoteHandles` is a marker trait specifying that message type `M`
/// is handled by a specific actor type.
pub trait RemoteHandles<M: RemoteMessage>: Referable {}
712
713#[cfg(test)]
714mod tests {
715    use std::sync::Mutex;
716    use std::time::Duration;
717
718    use tokio::time::timeout;
719
720    use super::*;
721    use crate as hyperactor;
722    use crate::Actor;
723    use crate::OncePortHandle;
724    use crate::PortRef;
725    use crate::checkpoint::CheckpointError;
726    use crate::checkpoint::Checkpointable;
727    use crate::test_utils::pingpong::PingPongActor;
728    use crate::test_utils::pingpong::PingPongActorParams;
729    use crate::test_utils::pingpong::PingPongMessage;
730    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator; // for macros
731
    /// Test actor that forwards every `u64` it receives to the wrapped port.
    #[derive(Debug)]
    struct EchoActor(PortRef<u64>);
734
735    #[async_trait]
736    impl Actor for EchoActor {
737        type Params = PortRef<u64>;
738
739        async fn new(params: PortRef<u64>) -> Result<Self, anyhow::Error> {
740            Ok(Self(params))
741        }
742    }
743
744    #[async_trait]
745    impl Handler<u64> for EchoActor {
746        async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
747            let Self(port) = self;
748            port.send(cx, message)?;
749            Ok(())
750        }
751    }
752
753    #[tokio::test]
754    async fn test_server_basic() {
755        let proc = Proc::local();
756        let client = proc.attach("client").unwrap();
757        let (tx, mut rx) = client.open_port();
758        let handle = proc.spawn::<EchoActor>("echo", tx.bind()).await.unwrap();
759        handle.send(123u64).unwrap();
760        handle.drain_and_stop().unwrap();
761        handle.await;
762
763        assert_eq!(rx.drain(), vec![123u64]);
764    }
765
766    #[tokio::test]
767    async fn test_ping_pong() {
768        let proc = Proc::local();
769        let client = proc.attach("client").unwrap();
770        let (undeliverable_msg_tx, _) = client.open_port();
771
772        let ping_pong_actor_params =
773            PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None);
774        let ping_handle = proc
775            .spawn::<PingPongActor>("ping", ping_pong_actor_params.clone())
776            .await
777            .unwrap();
778        let pong_handle = proc
779            .spawn::<PingPongActor>("pong", ping_pong_actor_params)
780            .await
781            .unwrap();
782
783        let (local_port, local_receiver) = client.open_once_port();
784
785        ping_handle
786            .send(PingPongMessage(10, pong_handle.bind(), local_port.bind()))
787            .unwrap();
788
789        assert!(local_receiver.recv().await.unwrap());
790    }
791
    /// Like `test_ping_pong`, but the actors are configured with an error
    /// TTL; the test asserts that no reply arrives within five seconds.
    #[tokio::test]
    async fn test_ping_pong_on_handler_error() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        let (undeliverable_msg_tx, _) = client.open_port();

        // Need to set a supervision coordinator for this Proc because there will
        // be actor failure(s) in this test which trigger supervision.
        ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let error_ttl = 66;
        let ping_pong_actor_params =
            PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl));
        let ping_handle = proc
            .spawn::<PingPongActor>("ping", ping_pong_actor_params.clone())
            .await
            .unwrap();
        let pong_handle = proc
            .spawn::<PingPongActor>("pong", ping_pong_actor_params)
            .await
            .unwrap();

        let (local_port, local_receiver) = client.open_once_port();

        ping_handle
            .send(PingPongMessage(
                error_ttl + 1, // will encounter an error at TTL=66
                pong_handle.bind(),
                local_port.bind(),
            ))
            .unwrap();

        // The outer `Result` is the timeout's; `is_err()` below means the
        // five-second timeout elapsed before `recv()` completed.
        // TODO: Fix this receiver hanging issue in T200423722.
        #[allow(clippy::disallowed_methods)]
        let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
            timeout(Duration::from_secs(5), local_receiver.recv()).await;
        assert!(res.is_err());
    }
830
    /// Test actor whose flag starts out `false` and is set to `true` by `init`.
    #[derive(Debug)]
    struct InitActor(bool);
833
834    #[async_trait]
835    impl Actor for InitActor {
836        type Params = ();
837
838        async fn new(_params: ()) -> Result<Self, anyhow::Error> {
839            Ok(Self(false))
840        }
841
842        async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
843            self.0 = true;
844            Ok(())
845        }
846    }
847
848    #[async_trait]
849    impl Handler<OncePortHandle<bool>> for InitActor {
850        async fn handle(
851            &mut self,
852            _cx: &Context<Self>,
853            port: OncePortHandle<bool>,
854        ) -> Result<(), anyhow::Error> {
855            port.send(self.0)?;
856            Ok(())
857        }
858    }
859
860    #[tokio::test]
861    async fn test_init() {
862        let proc = Proc::local();
863        let handle = proc.spawn::<InitActor>("init", ()).await.unwrap();
864        let client = proc.attach("client").unwrap();
865
866        let (port, receiver) = client.open_once_port();
867        handle.send(port).unwrap();
868        assert!(receiver.recv().await.unwrap());
869
870        handle.drain_and_stop().unwrap();
871        handle.await;
872    }
873
    /// Test actor that keeps a running sum of the `u64` messages it receives
    /// and reports the sum on `port` after each message.
    #[derive(Debug)]
    struct CheckpointActor {
        // The actor does nothing but sum the values of messages.
        sum: u64,
        // Where the running sum is sent after every message.
        port: PortRef<u64>,
    }
880
881    #[async_trait]
882    impl Actor for CheckpointActor {
883        type Params = PortRef<u64>;
884
885        async fn new(params: PortRef<u64>) -> Result<Self, anyhow::Error> {
886            Ok(Self {
887                sum: 0,
888                port: params,
889            })
890        }
891    }
892
893    #[async_trait]
894    impl Handler<u64> for CheckpointActor {
895        async fn handle(&mut self, cx: &Context<Self>, value: u64) -> Result<(), anyhow::Error> {
896            self.sum += value;
897            self.port.send(cx, self.sum)?;
898            Ok(())
899        }
900    }
901
902    #[async_trait]
903    impl Checkpointable for CheckpointActor {
904        type State = (u64, PortRef<u64>);
905
906        async fn save(&self) -> Result<Self::State, CheckpointError> {
907            Ok((self.sum, self.port.clone()))
908        }
909
910        async fn load(state: Self::State) -> Result<Self, CheckpointError> {
911            let (sum, port) = state;
912            Ok(CheckpointActor { sum, port })
913        }
914    }
915
    /// Shared, mutex-guarded `(u64, String)` pair. In this module it is
    /// handed to `MultiActor` as its params and read back by
    /// `MultiValuesTest::get_values`.
    type MultiValues = Arc<Mutex<(u64, String)>>;
917
    /// Test fixture bundling a local proc, a spawned `MultiActor`, the values
    /// shared with that actor, and a client instance used to open ports.
    struct MultiValuesTest {
        /// The proc the actor and client were created on.
        proc: Proc,
        /// The `(u64, String)` pair shared with the spawned `MultiActor`.
        values: MultiValues,
        /// Handle to the spawned `MultiActor`.
        handle: ActorHandle<MultiActor>,
        /// Client instance used to open reply ports and as the send context.
        client: Instance<()>,
        /// Handle returned alongside `client`; never read here.
        // NOTE(review): presumably held only to keep the client alive — confirm.
        _client_handle: ActorHandle<()>,
    }
925
926    impl MultiValuesTest {
927        async fn new() -> Self {
928            let proc = Proc::local();
929            let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
930            let handle = proc
931                .spawn::<MultiActor>("myactor", values.clone())
932                .await
933                .unwrap();
934            let (client, client_handle) = proc.instance("client").unwrap();
935            Self {
936                proc,
937                values,
938                handle,
939                client,
940                _client_handle: client_handle,
941            }
942        }
943
944        fn send<M>(&self, message: M)
945        where
946            M: RemoteMessage,
947            MultiActor: Handler<M>,
948        {
949            self.handle.send(message).unwrap()
950        }
951
952        async fn sync(&self) {
953            let (port, done) = self.client.open_once_port::<bool>();
954            self.handle.send(port).unwrap();
955            assert!(done.recv().await.unwrap());
956        }
957
958        fn get_values(&self) -> (u64, String) {
959            self.values.lock().unwrap().clone()
960        }
961    }
962
    /// Test actor wrapping the shared `MultiValues` pair. Its handlers in
    /// this module store an incoming `u64` in the first slot and an incoming
    /// `String` in the second; `u64` and `String` are the handler types
    /// listed in the `export` attribute.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [u64, String])]
    struct MultiActor(MultiValues);
966
967    #[async_trait]
968    impl Actor for MultiActor {
969        type Params = MultiValues;
970
971        async fn new(init: Self::Params) -> Result<Self, anyhow::Error> {
972            Ok(Self(init))
973        }
974    }
975
976    #[async_trait]
977    impl Handler<u64> for MultiActor {
978        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
979            let mut vals = self.0.lock().unwrap();
980            vals.0 = message;
981            Ok(())
982        }
983    }
984
985    #[async_trait]
986    impl Handler<String> for MultiActor {
987        async fn handle(
988            &mut self,
989            _cx: &Context<Self>,
990            message: String,
991        ) -> Result<(), anyhow::Error> {
992            let mut vals = self.0.lock().unwrap();
993            vals.1 = message;
994            Ok(())
995        }
996    }
997
998    #[async_trait]
999    impl Handler<OncePortHandle<bool>> for MultiActor {
1000        async fn handle(
1001            &mut self,
1002            _cx: &Context<Self>,
1003            message: OncePortHandle<bool>,
1004        ) -> Result<(), anyhow::Error> {
1005            message.send(true).unwrap();
1006            Ok(())
1007        }
1008    }
1009
1010    #[tokio::test]
1011    async fn test_multi_handler_refs() {
1012        let test = MultiValuesTest::new().await;
1013
1014        test.send(123u64);
1015        test.send("foo".to_string());
1016        test.sync().await;
1017        assert_eq!(test.get_values(), (123u64, "foo".to_string()));
1018
1019        let myref: ActorRef<MultiActor> = test.handle.bind();
1020
1021        myref.port().send(&test.client, 321u64).unwrap();
1022        test.sync().await;
1023        assert_eq!(test.get_values(), (321u64, "foo".to_string()));
1024
1025        myref.port().send(&test.client, "bar".to_string()).unwrap();
1026        test.sync().await;
1027        assert_eq!(test.get_values(), (321u64, "bar".to_string()));
1028    }
1029
1030    #[tokio::test]
1031    async fn test_ref_alias() {
1032        let test = MultiValuesTest::new().await;
1033
1034        test.send(123u64);
1035        test.send("foo".to_string());
1036
1037        hyperactor::alias!(MyActorAlias, u64, String);
1038
1039        let myref: ActorRef<MyActorAlias> = test.handle.bind();
1040        myref.port().send(&test.client, "biz".to_string()).unwrap();
1041        myref.port().send(&test.client, 999u64).unwrap();
1042
1043        test.sync().await;
1044        assert_eq!(test.get_values(), (999u64, "biz".to_string()));
1045    }
1046
1047    #[tokio::test]
1048    async fn test_actor_handle_downcast() {
1049        #[derive(Debug, Default, Actor)]
1050        struct NothingActor;
1051
1052        // Just test that we can round-trip the handle through a downcast.
1053
1054        let proc = Proc::local();
1055        let handle = proc.spawn::<NothingActor>("nothing", ()).await.unwrap();
1056        let cell = handle.cell();
1057
1058        // Invalid actor doesn't succeed.
1059        assert!(cell.downcast_handle::<EchoActor>().is_none());
1060
1061        let handle = cell.downcast_handle::<NothingActor>().unwrap();
1062        handle.drain_and_stop().unwrap();
1063        handle.await;
1064    }
1065}