hyperactor/
actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(dead_code)] // Allow until this is used outside of tests.
10
11//! This module contains all the core traits required to define and manage actors.
12
13use std::any::TypeId;
14use std::fmt;
15use std::fmt::Debug;
16use std::future::Future;
17use std::future::IntoFuture;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::SystemTime;
21
22use async_trait::async_trait;
23use enum_as_inner::EnumAsInner;
24use futures::FutureExt;
25use futures::future::BoxFuture;
26use serde::Deserialize;
27use serde::Serialize;
28use tokio::sync::watch;
29use tokio::task::JoinHandle;
30use typeuri::Named;
31
32use crate as hyperactor; // for macros
33use crate::ActorRef;
34use crate::Data;
35use crate::Message;
36use crate::RemoteMessage;
37use crate::checkpoint::CheckpointError;
38use crate::checkpoint::Checkpointable;
39use crate::clock::Clock;
40use crate::clock::RealClock;
41use crate::context;
42use crate::mailbox::MailboxError;
43use crate::mailbox::MailboxSenderError;
44use crate::mailbox::MessageEnvelope;
45use crate::mailbox::PortHandle;
46use crate::mailbox::Undeliverable;
47use crate::mailbox::UndeliverableMessageError;
48use crate::mailbox::log::MessageLogError;
49use crate::message::Castable;
50use crate::message::IndexedErasedUnbound;
51use crate::proc::Context;
52use crate::proc::Instance;
53use crate::proc::InstanceCell;
54use crate::proc::Ports;
55use crate::proc::Proc;
56use crate::reference::ActorId;
57use crate::reference::Index;
58use crate::supervision::ActorSupervisionEvent;
59
60pub mod remote;
61
/// An Actor is an independent, asynchronous thread of execution. Each
/// actor instance has a mailbox, whose messages are delivered through
/// the method [`Handler::handle`].
///
/// Actors communicate with each other by way of message passing.
/// Actors are assumed to be _deterministic_: that is, the state of an
/// actor is determined by the set (and order) of messages it receives.
///
/// Every method of this trait has a default implementation, so an
/// implementation may be an empty `impl` block.
#[async_trait]
pub trait Actor: Sized + Send + 'static {
    /// Initialize the actor, after the runtime has been fully initialized.
    /// Init thus provides a mechanism by which an actor can reliably and always
    /// receive some initial event that can be used to kick off further
    /// (potentially delayed) processing.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
        // Default implementation: no init.
        Ok(())
    }

    /// Cleanup things used by this actor before shutting down. Notably this function
    /// is async and allows more complex cleanup. Simpler cleanup can be handled
    /// by the impl Drop for this Actor.
    ///
    /// If `_err` is not `None`, it is the error that this actor is failing with. Any
    /// errors returned by this function will be logged and ignored.
    /// If `_err` is `None`, any errors returned by this function will be propagated
    /// as an [`ActorError`].
    ///
    /// This function is not called if there is a panic in the actor, as the
    /// actor may be in an indeterminate state. It is also not called if the
    /// process is killed: there is no atexit handler or signal handler.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn cleanup(
        &mut self,
        _this: &Instance<Self>,
        _err: Option<&ActorError>,
    ) -> Result<(), anyhow::Error> {
        // Default implementation: no cleanup.
        Ok(())
    }

    /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
    /// The spawned actor will be supervised by the parent (spawning) actor.
    ///
    /// The default implementation delegates to `cx.instance().spawn(self)`.
    fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
        cx.instance().spawn(self)
    }

    /// Spawns this actor in a detached state, handling its messages
    /// in a background task. The returned handle is used to control
    /// the actor's lifecycle and to interact with it.
    ///
    /// Actors spawned through `spawn_detached` are not attached to a supervision
    /// hierarchy, and not managed by a [`Proc`].
    ///
    /// The default implementation spawns the actor under the name `"anon"`
    /// on the value returned by `Proc::local()`.
    fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
        Proc::local().spawn("anon", self)
    }

    /// This method is used by the runtime to spawn the actor server. It can be
    /// used by actors that require customized runtime setups
    /// (e.g., dedicated actor threads), or want to use a custom tokio runtime.
    ///
    /// The default implementation hands the future to `tokio::spawn`.
    #[hyperactor::instrument_infallible]
    fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        tokio::spawn(future)
    }

    /// Handle an actor supervision event. Return `Ok(true)` if the event is
    /// handled here.
    ///
    /// The default implementation returns `Ok(false)`.
    async fn handle_supervision_event(
        &mut self,
        _this: &Instance<Self>,
        _event: &ActorSupervisionEvent,
    ) -> Result<bool, anyhow::Error> {
        // By default, the supervision event is not handled, caller is expected to bubble it up.
        Ok(false)
    }

    /// Default undeliverable message handling behavior.
    ///
    /// The default implementation forwards to the free function
    /// [`handle_undeliverable_message`], which overriding implementations
    /// can also call as a fallback.
    async fn handle_undeliverable_message(
        &mut self,
        cx: &Instance<Self>,
        envelope: Undeliverable<MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        handle_undeliverable_message(cx, envelope)
    }

    /// If overridden, we will use this name in place of the
    /// ActorId for talking about this actor in supervision error
    /// messages.
    ///
    /// The default implementation returns `None`.
    fn display_name(&self) -> Option<String> {
        None
    }
}
153
154/// Default implementation of [`Actor::handle_undeliverable_message`]. Defined
155/// as a free function so that `Actor` implementations that override
156/// [`Actor::handle_undeliverable_message`] can fallback to this default.
157pub fn handle_undeliverable_message<A: Actor>(
158    cx: &Instance<A>,
159    Undeliverable(envelope): Undeliverable<MessageEnvelope>,
160) -> Result<(), anyhow::Error> {
161    assert_eq!(envelope.sender(), cx.self_id());
162
163    anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
164}
165
/// An actor that does nothing. It is used to represent "client only" actors,
/// returned by [`Proc::instance`].
///
/// The impl is empty, so every [`Actor`] method uses its default.
#[async_trait]
impl Actor for () {}

// The unit actor can be named by remote references.
impl Referable for () {}

// The unit actor binds to itself as its own reference type.
impl Binds<()> for () {
    /// Intentionally a no-op.
    fn bind(_ports: &Ports<Self>) {
        // Binds no ports.
    }
}
178
/// A Handler allows an actor to handle a specific message type.
///
/// An actor implements `Handler<M>` once for each message type `M` it
/// handles.
#[async_trait]
pub trait Handler<M>: Actor {
    /// Handle the next M-typed message.
    ///
    /// `cx` is the context for this delivery and `message` is the message
    /// being delivered. The result is an `anyhow` result with no success
    /// value.
    async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
}
185
/// We provide this handler to indicate that actors can handle the [`Signal`] message.
/// Its actual handler is implemented by the runtime.
///
/// This blanket impl exists only so that `A: Handler<Signal>` holds for every
/// actor type; its body must never run.
#[async_trait]
impl<A: Actor> Handler<Signal> for A {
    /// Always panics via `unimplemented!`: signals are processed by the
    /// runtime, not dispatched through this method.
    async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
        unimplemented!("signal handler should not be called directly")
    }
}
194
195/// This handler provides a default behavior when a message sent by
196/// the actor to another is returned due to delivery failure.
197#[async_trait]
198impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
199    async fn handle(
200        &mut self,
201        cx: &Context<Self>,
202        message: Undeliverable<MessageEnvelope>,
203    ) -> Result<(), anyhow::Error> {
204        self.handle_undeliverable_message(cx, message).await
205    }
206}
207
208/// This handler enables actors to unbind the [IndexedErasedUnbound]
209/// message, and forward the result to corresponding handler.
210#[async_trait]
211impl<A, M> Handler<IndexedErasedUnbound<M>> for A
212where
213    A: Handler<M>,
214    M: Castable,
215{
216    async fn handle(
217        &mut self,
218        cx: &Context<Self>,
219        msg: IndexedErasedUnbound<M>,
220    ) -> anyhow::Result<()> {
221        let message = msg.downcast()?.bind()?;
222        Handler::handle(self, cx, message).await
223    }
224}
225
226/// An `Actor` that can be spawned remotely.
227///
228/// Bounds explained:
229/// - `Actor`: only actors may be remotely spawned.
230/// - `Referable`: marks the type as eligible for typed remote
231///   references (`ActorRef<A>`); required because remote spawn
232///   ultimately hands back an `ActorId` that higher-level APIs may
233///   re-type as `ActorRef<A>`.
234/// - `Binds<Self>`: lets the runtime wire this actor's message ports
235///   when it is spawned (the blanket impl calls `handle.bind::<Self>()`).
236///
237/// `gspawn` is a type-erased entry point used by the remote
238/// spawn/registry machinery. It takes serialized params and returns
239/// the new actor's `ActorId`; application code shouldn't call it
240/// directly.
241#[async_trait]
242pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
243    /// The type of parameters used to instantiate the actor remotely.
244    type Params: RemoteMessage;
245
246    /// Creates a new actor instance given its instantiation parameters.
247    async fn new(params: Self::Params) -> anyhow::Result<Self>;
248
249    /// A type-erased entry point to spawn this actor. This is
250    /// primarily used by hyperactor's remote actor registration
251    /// mechanism.
252    // TODO: consider making this 'private' -- by moving it into a non-public trait as in [`cap`].
253    fn gspawn(
254        proc: &Proc,
255        name: &str,
256        serialized_params: Data,
257    ) -> Pin<Box<dyn Future<Output = Result<ActorId, anyhow::Error>> + Send>> {
258        let proc = proc.clone();
259        let name = name.to_string();
260        Box::pin(async move {
261            let params = bincode::deserialize(&serialized_params)?;
262            let actor = Self::new(params).await?;
263            let handle = proc.spawn(&name, actor)?;
264            // We return only the ActorId, not a typed ActorRef.
265            // Callers that hold this ID can interact with the actor
266            // only via the serialized/opaque messaging path, which
267            // makes it safe to export across process boundaries.
268            //
269            // Note: the actor itself is still `A`-typed here; we
270            // merely restrict the *capability* we hand out to an
271            // untyped identifier.
272            //
273            // This will be replaced by a proper export/registry
274            // mechanism.
275            Ok(handle.bind::<Self>().actor_id)
276        })
277    }
278
279    /// The type ID of this actor.
280    fn get_type_id() -> TypeId {
281        TypeId::of::<Self>()
282    }
283}
284
285/// If an actor implements Default, we use this as the
286/// `RemoteSpawn` implementation, too.
287#[async_trait]
288impl<A: Actor + Referable + Binds<Self> + Default> RemoteSpawn for A {
289    type Params = ();
290
291    async fn new(_params: Self::Params) -> anyhow::Result<Self> {
292        Ok(Default::default())
293    }
294}
295
296#[async_trait]
297impl<T> Checkpointable for T
298where
299    T: RemoteMessage + Clone,
300{
301    type State = T;
302    async fn save(&self) -> Result<Self::State, CheckpointError> {
303        Ok(self.clone())
304    }
305
306    async fn load(state: Self::State) -> Result<Self, CheckpointError> {
307        Ok(state)
308    }
309}
310
/// Errors that occur while serving actors. Each error is associated
/// with the ID of the actor being served.
///
/// Both fields are stored behind a `Box`.
#[derive(Debug)]
pub struct ActorError {
    /// The ActorId for the actor that generated this error.
    pub actor_id: Box<ActorId>,
    /// The kind of error that occurred.
    pub kind: Box<ActorErrorKind>,
}
320
/// The kinds of actor serving errors.
///
/// The `Display` text of each variant is given by its `#[error(...)]`
/// attribute.
#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ActorErrorKind {
    /// Generic error with a formatted message.
    #[error("{0}")]
    Generic(String),

    /// An error that occurred while trying to handle a supervision event.
    /// Carries the error text and the event that was being handled.
    #[error("{0} while handling {1}")]
    ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
    /// A supervision event that the actor did not handle. Carries the
    /// unhandled event.
    #[error("{0}")]
    UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
}
335
336impl ActorErrorKind {
337    /// Error while processing actor, i.e., returned by the actor's
338    /// processing method.
339    pub fn processing(err: anyhow::Error) -> Self {
340        // Unbox err from the anyhow err. Check if it is an ActorErrorKind object.
341        // If it is directly use it as the new ActorError's ActorErrorKind.
342        // This lets us directly pass the ActorErrorKind::UnhandledSupervisionEvent
343        // up the handling infrastructure.
344        err.downcast::<ActorErrorKind>()
345            .unwrap_or_else(|err| Self::Generic(err.to_string()))
346    }
347
348    /// Unwound stracktrace of a panic.
349    pub fn panic(err: anyhow::Error) -> Self {
350        Self::Generic(format!("panic: {}", err))
351    }
352
353    /// Error during actor initialization.
354    pub fn init(err: anyhow::Error) -> Self {
355        Self::Generic(format!("initialization error: {}", err))
356    }
357
358    /// Error during actor cleanup.
359    pub fn cleanup(err: anyhow::Error) -> Self {
360        Self::Generic(format!("cleanup error: {}", err))
361    }
362
363    /// An underlying mailbox error.
364    pub fn mailbox(err: MailboxError) -> Self {
365        Self::Generic(err.to_string())
366    }
367
368    /// An underlying mailbox sender error.
369    pub fn mailbox_sender(err: MailboxSenderError) -> Self {
370        Self::Generic(err.to_string())
371    }
372
373    /// An underlying checkpoint error.
374    pub fn checkpoint(err: CheckpointError) -> Self {
375        Self::Generic(format!("checkpoint error: {}", err))
376    }
377
378    /// An underlying message log error.
379    pub fn message_log(err: MessageLogError) -> Self {
380        Self::Generic(format!("message log error: {}", err))
381    }
382
383    /// The actor's state could not be determined.
384    pub fn indeterminate_state() -> Self {
385        Self::Generic("actor is in an indeterminate state".to_string())
386    }
387}
388
389impl ActorError {
390    /// Create a new actor server error with the provided id and kind.
391    pub(crate) fn new(actor_id: &ActorId, kind: ActorErrorKind) -> Self {
392        Self {
393            actor_id: Box::new(actor_id.clone()),
394            kind: Box::new(kind),
395        }
396    }
397}
398
399impl fmt::Display for ActorError {
400    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
401        write!(f, "serving {}: ", self.actor_id)?;
402        fmt::Display::fmt(&self.kind, f)
403    }
404}
405
406impl std::error::Error for ActorError {
407    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
408        self.kind.source()
409    }
410}
411
412impl From<MailboxError> for ActorError {
413    fn from(inner: MailboxError) -> Self {
414        Self {
415            actor_id: Box::new(inner.actor_id().clone()),
416            kind: Box::new(ActorErrorKind::mailbox(inner)),
417        }
418    }
419}
420
421impl From<MailboxSenderError> for ActorError {
422    fn from(inner: MailboxSenderError) -> Self {
423        Self {
424            actor_id: Box::new(inner.location().actor_id().clone()),
425            kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
426        }
427    }
428}
429
/// A collection of signals to control the behavior of the actor.
/// Signals are internal runtime control plane messages and should not be
/// sent outside of the runtime.
///
/// These messages are not handled directly by actors; instead, the runtime
/// handles the various signals.
#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
pub enum Signal {
    /// Stop the actor, after draining messages.
    DrainAndStop,

    /// Stop the actor immediately.
    Stop,

    /// The direct child with the given PID was stopped. The PID is carried
    /// as an [`Index`].
    ChildStopped(Index),
}
// Register `Signal` with `wirevalue`.
wirevalue::register_type!(Signal);
448
449impl fmt::Display for Signal {
450    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451        match self {
452            Signal::DrainAndStop => write!(f, "DrainAndStop"),
453            Signal::Stop => write!(f, "Stop"),
454            Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
455        }
456    }
457}
458
/// The runtime status of an actor.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Clone,
    typeuri::Named,
    EnumAsInner
)]
pub enum ActorStatus {
    /// The actor status is unknown.
    Unknown,
    /// The actor was created, but not yet started.
    Created,
    /// The actor is initializing. It is not yet ready to receive messages.
    Initializing,
    /// The actor is in "client" state: the user is managing the actor's
    /// mailboxes manually.
    Client,
    /// The actor is ready to receive messages, but is currently idle.
    Idle,
    /// The actor has been processing a message, beginning at the specified
    /// instant. The message handler and, optionally, its arm are included
    /// when present.
    /// TODO: we should use interned representations here, so we don't copy
    /// strings willy-nilly.
    Processing(SystemTime, Option<(String, Option<String>)>),
    /// The actor has been saving its state, beginning at the specified instant.
    Saving(SystemTime),
    /// The actor has been loading its state, beginning at the specified instant.
    Loading(SystemTime),
    /// The actor is stopping. It is draining messages.
    Stopping,
    /// The actor is stopped. It is no longer processing messages.
    Stopped,
    /// The actor failed with the provided actor error.
    Failed(ActorErrorKind),
}
498
499impl ActorStatus {
500    /// Tells whether the status is a terminal state.
501    pub fn is_terminal(&self) -> bool {
502        self.is_stopped() || self.is_failed()
503    }
504
505    /// Create a generic failure status with the provided error message.
506    pub fn generic_failure(message: impl Into<String>) -> Self {
507        Self::Failed(ActorErrorKind::Generic(message.into()))
508    }
509
510    fn span_string(&self) -> &'static str {
511        self.arm().unwrap_or_default()
512    }
513}
514
515impl fmt::Display for ActorStatus {
516    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
517        match self {
518            Self::Unknown => write!(f, "unknown"),
519            Self::Created => write!(f, "created"),
520            Self::Initializing => write!(f, "initializing"),
521            Self::Client => write!(f, "client"),
522            Self::Idle => write!(f, "idle"),
523            Self::Processing(instant, None) => {
524                write!(
525                    f,
526                    "processing for {}ms",
527                    RealClock
528                        .system_time_now()
529                        .duration_since(*instant)
530                        .unwrap_or_default()
531                        .as_millis()
532                )
533            }
534            Self::Processing(instant, Some((handler, None))) => {
535                write!(
536                    f,
537                    "{}: processing for {}ms",
538                    handler,
539                    RealClock
540                        .system_time_now()
541                        .duration_since(*instant)
542                        .unwrap_or_default()
543                        .as_millis()
544                )
545            }
546            Self::Processing(instant, Some((handler, Some(arm)))) => {
547                write!(
548                    f,
549                    "{},{}: processing for {}ms",
550                    handler,
551                    arm,
552                    RealClock
553                        .system_time_now()
554                        .duration_since(*instant)
555                        .unwrap_or_default()
556                        .as_millis()
557                )
558            }
559            Self::Saving(instant) => {
560                write!(
561                    f,
562                    "saving for {}ms",
563                    RealClock
564                        .system_time_now()
565                        .duration_since(*instant)
566                        .unwrap_or_default()
567                        .as_millis()
568                )
569            }
570            Self::Loading(instant) => {
571                write!(
572                    f,
573                    "loading for {}ms",
574                    RealClock
575                        .system_time_now()
576                        .duration_since(*instant)
577                        .unwrap_or_default()
578                        .as_millis()
579                )
580            }
581            Self::Stopping => write!(f, "stopping"),
582            Self::Stopped => write!(f, "stopped"),
583            Self::Failed(err) => write!(f, "failed: {}", err),
584        }
585    }
586}
587
/// ActorHandles represent a (local) serving actor. It is used to access
/// its messaging and signal ports, as well as to synchronize with its
/// lifecycle (e.g., providing joins).  Once dropped, the handle is
/// detached from the underlying actor instance, and there is no longer
/// any way to join it.
///
/// Correspondingly, [`crate::ActorRef`]s refer to (possibly) remote
/// actors.
pub struct ActorHandle<A: Actor> {
    // The actor's instance cell: the source of its id, status watch and
    // signalling in the methods of this handle.
    cell: InstanceCell,
    // The actor's typed ports, shared via `Arc` between handle clones.
    ports: Arc<Ports<A>>,
}
600
/// A handle to a running (local) actor.
impl<A: Actor> ActorHandle<A> {
    /// Create a handle from the actor's cell and its (shared) ports.
    pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
        Self { cell, ports }
    }

    /// The actor's cell. Used primarily for testing.
    /// TODO: this should not be a public API.
    pub(crate) fn cell(&self) -> &InstanceCell {
        &self.cell
    }

    /// The [`ActorId`] of the actor represented by this handle.
    pub fn actor_id(&self) -> &ActorId {
        self.cell.actor_id()
    }

    /// Signal the actor to drain its current messages and then stop.
    ///
    /// Logs the call at `info` level, then sends [`Signal::DrainAndStop`]
    /// through the actor's cell and returns the result of that send.
    pub fn drain_and_stop(&self) -> Result<(), ActorError> {
        tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
        self.cell.signal(Signal::DrainAndStop)
    }

    /// A watch that observes the lifecycle state of the actor. Each call
    /// returns a fresh clone of the cell's status receiver.
    pub fn status(&self) -> watch::Receiver<ActorStatus> {
        self.cell.status().clone()
    }

    /// Send a message to the actor. Messages sent through the handle
    /// are always queued in process, and do not require serialization.
    pub fn send<M: Message>(&self, message: M) -> Result<(), MailboxSenderError>
    where
        A: Handler<M>,
    {
        self.ports.get().send(message)
    }

    /// Return a port for the provided message type handled by the actor.
    pub fn port<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        self.ports.get()
    }

    /// TEMPORARY: bind this actor's ports for the reference type `R` and
    /// return the resulting [`ActorRef`]. Delegates to the cell's `bind`,
    /// passing this handle's ports.
    /// TODO: we should also have a default binding(?)
    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
        self.cell.bind(self.ports.as_ref())
    }
}
652
653/// IntoFuture allows users to await the handle to join it. The future
654/// resolves when the actor itself has stopped processing messages.
655/// The future resolves to the actor's final status.
656impl<A: Actor> IntoFuture for ActorHandle<A> {
657    type Output = ActorStatus;
658    type IntoFuture = BoxFuture<'static, Self::Output>;
659
660    fn into_future(self) -> Self::IntoFuture {
661        let future = async move {
662            let mut status_receiver = self.cell.status().clone();
663            let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
664            match result {
665                Err(_) => ActorStatus::Unknown,
666                Ok(status) => status.clone(),
667            }
668        };
669
670        future.boxed()
671    }
672}
673
674impl<A: Actor> Debug for ActorHandle<A> {
675    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
676        f.debug_struct("ActorHandle").field("cell", &"..").finish()
677    }
678}
679
680impl<A: Actor> Clone for ActorHandle<A> {
681    fn clone(&self) -> Self {
682        Self {
683            cell: self.cell.clone(),
684            ports: self.ports.clone(),
685        }
686    }
687}
688
/// `Referable` is a marker trait for types that can appear as
/// remote references across process boundaries.
///
/// It is not limited to concrete [`Actor`] implementations. For
/// example, façade types generated by [`behavior!`] implement
/// `Referable` so that you can hand out restricted or stable APIs
/// while still using the same remote messaging machinery.
///
/// Implementing this trait means the type can be identified (`Named`)
/// so the runtime knows what it is.
///
/// In contrast, [`RemoteSpawn`] is the trait that marks *actors*
/// that can actually be **spawned remotely**. A behavior may be a
/// `Referable` but is never a `RemoteSpawn`.
pub trait Referable: Named {}
704
/// Binds determines how an actor's ports are bound to a specific
/// reference type.
///
/// The implementing type is the reference type (it must be [`Referable`]);
/// `A` is the actor whose ports get bound.
pub trait Binds<A: Actor>: Referable {
    /// Bind ports in this actor.
    fn bind(ports: &Ports<A>);
}
711
/// `RemoteHandles` is a marker trait specifying that the remote message
/// type `M` is handled by a specific (referable) actor type.
pub trait RemoteHandles<M: RemoteMessage>: Referable {}
715
/// Check that an actor behaves as a given behavior (defined by [`behavior!`]).
///
/// The check happens entirely at compile time: the macro expands to an
/// unnamed constant whose closure only type-checks if the behavior type
/// implements `Binds` for the actor type. The closure is never called.
///
/// ```
/// # use serde::Serialize;
/// # use serde::Deserialize;
/// # use typeuri::Named;
/// # use hyperactor::Actor;
///
/// // First, define a behavior, based on handling a single message type `()`.
/// hyperactor::behavior!(UnitBehavior, ());
///
/// #[derive(Debug, Default)]
/// struct MyActor;
///
/// impl Actor for MyActor {}
///
/// #[async_trait::async_trait]
/// impl hyperactor::Handler<()> for MyActor {
///     async fn handle(
///         &mut self,
///         _cx: &hyperactor::Context<Self>,
///         _message: (),
///     ) -> Result<(), anyhow::Error> {
///         // no-op
///         Ok(())
///     }
/// }
///
/// hyperactor::assert_behaves!(MyActor as UnitBehavior);
/// ```
#[macro_export]
macro_rules! assert_behaves {
    ($ty:ty as $behavior:ty) => {
        // An unnamed const holding a never-called closure: type-checked only.
        const _: fn() = || {
            // Compiles only when the behavior `B` binds to the actor `$ty`.
            fn check<B: hyperactor::actor::Binds<$ty>>() {}
            check::<$behavior>();
        };
    };
}
755
756#[cfg(test)]
757mod tests {
758    use std::sync::Mutex;
759    use std::time::Duration;
760
761    use tokio::time::timeout;
762
763    use super::*;
764    use crate as hyperactor;
765    use crate::Actor;
766    use crate::OncePortHandle;
767    use crate::PortRef;
768    use crate::checkpoint::CheckpointError;
769    use crate::checkpoint::Checkpointable;
770    use crate::test_utils::pingpong::PingPongActor;
771    use crate::test_utils::pingpong::PingPongMessage;
772    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator; // for macros
773
774    #[derive(Debug)]
775    struct EchoActor(PortRef<u64>);
776
777    #[async_trait]
778    impl Actor for EchoActor {}
779
780    #[async_trait]
781    impl Handler<u64> for EchoActor {
782        async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
783            let Self(port) = self;
784            port.send(cx, message)?;
785            Ok(())
786        }
787    }
788
    /// Spawns an `EchoActor`, sends it a single value, stops it, and checks
    /// that exactly that value arrived on the client's port.
    #[tokio::test]
    async fn test_server_basic() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        // `tx` is handed to the actor as its reply port; `rx` is read below.
        let (tx, mut rx) = client.open_port();
        let actor = EchoActor(tx.bind());
        let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
        handle.send(123u64).unwrap();
        // Drain-and-stop, then join the actor before inspecting the port.
        handle.drain_and_stop().unwrap();
        handle.await;

        assert_eq!(rx.drain(), vec![123u64]);
    }
802
    /// Runs a ping-pong exchange between two `PingPongActor`s and expects the
    /// client's once-port to receive `true`.
    #[tokio::test]
    async fn test_ping_pong() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        // The receiving half is discarded; this test only needs the sender.
        let (undeliverable_msg_tx, _) = client.open_port();

        let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
        let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
        let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
        let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();

        let (local_port, local_receiver) = client.open_once_port();

        // NOTE(review): the first field is presumably the TTL (the sibling
        // error test treats it as such) -- confirm against `PingPongMessage`.
        ping_handle
            .send(PingPongMessage(10, pong_handle.bind(), local_port.bind()))
            .unwrap();

        assert!(local_receiver.recv().await.unwrap());
    }
822
    /// Runs a ping-pong exchange in which both actors are configured with an
    /// error TTL, and expects the client's receive to time out instead of
    /// completing.
    #[tokio::test]
    async fn test_ping_pong_on_handler_error() {
        let proc = Proc::local();
        let client = proc.attach("client").unwrap();
        // The receiving half is discarded; this test only needs the sender.
        let (undeliverable_msg_tx, _) = client.open_port();

        // Need to set a supervision coordinator for this Proc because there will
        // be actor failure(s) in this test which trigger supervision.
        ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let error_ttl = 66;

        let ping_actor =
            PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
        let pong_actor =
            PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
        let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
        let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();

        let (local_port, local_receiver) = client.open_once_port();

        ping_handle
            .send(PingPongMessage(
                error_ttl + 1, // will encounter an error at TTL=66
                pong_handle.bind(),
                local_port.bind(),
            ))
            .unwrap();

        // TODO: Fix this receiver hanging issue in T200423722.
        // The outer `Result` is the timeout's; `is_err` below therefore means
        // the 5 second timeout elapsed before `recv` completed.
        #[allow(clippy::disallowed_methods)]
        let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
            timeout(Duration::from_secs(5), local_receiver.recv()).await;
        assert!(res.is_err());
    }
858
    /// Test actor whose flag records whether `init` has run.
    #[derive(Debug)]
    struct InitActor(bool);

    #[async_trait]
    impl Actor for InitActor {
        /// Set the flag to `true`.
        async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
            self.0 = true;
            Ok(())
        }
    }

    #[async_trait]
    impl Handler<OncePortHandle<bool>> for InitActor {
        /// Reply on the received once-port with the current flag value.
        async fn handle(
            &mut self,
            _cx: &Context<Self>,
            port: OncePortHandle<bool>,
        ) -> Result<(), anyhow::Error> {
            port.send(self.0)?;
            Ok(())
        }
    }
881
    /// Checks that `init` has run by the time the actor handles a message:
    /// the actor starts with `false` and must reply `true`.
    #[tokio::test]
    async fn test_init() {
        let proc = Proc::local();
        let actor = InitActor(false);
        let handle = proc.spawn::<InitActor>("init", actor).unwrap();
        let client = proc.attach("client").unwrap();

        // Send the actor a once-port and wait for it to report its flag.
        let (port, receiver) = client.open_once_port();
        handle.send(port).unwrap();
        assert!(receiver.recv().await.unwrap());

        // Stop and join the actor.
        handle.drain_and_stop().unwrap();
        handle.await;
    }
896
897    #[derive(Debug)]
898    struct CheckpointActor {
899        // The actor does nothing but sum the values of messages.
900        sum: u64,
901        port: PortRef<u64>,
902    }
903
904    #[async_trait]
905    impl Actor for CheckpointActor {}
906
907    #[async_trait]
908    impl Handler<u64> for CheckpointActor {
909        async fn handle(&mut self, cx: &Context<Self>, value: u64) -> Result<(), anyhow::Error> {
910            self.sum += value;
911            self.port.send(cx, self.sum)?;
912            Ok(())
913        }
914    }
915
916    #[async_trait]
917    impl Checkpointable for CheckpointActor {
918        type State = (u64, PortRef<u64>);
919
920        async fn save(&self) -> Result<Self::State, CheckpointError> {
921            Ok((self.sum, self.port.clone()))
922        }
923
924        async fn load(state: Self::State) -> Result<Self, CheckpointError> {
925            let (sum, port) = state;
926            Ok(CheckpointActor { sum, port })
927        }
928    }
929
930    type MultiValues = Arc<Mutex<(u64, String)>>;
931
932    struct MultiValuesTest {
933        proc: Proc,
934        values: MultiValues,
935        handle: ActorHandle<MultiActor>,
936        client: Instance<()>,
937        _client_handle: ActorHandle<()>,
938    }
939
940    impl MultiValuesTest {
941        async fn new() -> Self {
942            let proc = Proc::local();
943            let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
944            let actor = MultiActor(values.clone());
945            let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
946            let (client, client_handle) = proc.instance("client").unwrap();
947            Self {
948                proc,
949                values,
950                handle,
951                client,
952                _client_handle: client_handle,
953            }
954        }
955
956        fn send<M>(&self, message: M)
957        where
958            M: RemoteMessage,
959            MultiActor: Handler<M>,
960        {
961            self.handle.send(message).unwrap()
962        }
963
964        async fn sync(&self) {
965            let (port, done) = self.client.open_once_port::<bool>();
966            self.handle.send(port).unwrap();
967            assert!(done.recv().await.unwrap());
968        }
969
970        fn get_values(&self) -> (u64, String) {
971            self.values.lock().unwrap().clone()
972        }
973    }
974
975    #[derive(Debug)]
976    #[hyperactor::export(handlers = [u64, String])]
977    struct MultiActor(MultiValues);
978
979    #[async_trait]
980    impl Actor for MultiActor {}
981
982    #[async_trait]
983    impl Handler<u64> for MultiActor {
984        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
985            let mut vals = self.0.lock().unwrap();
986            vals.0 = message;
987            Ok(())
988        }
989    }
990
991    #[async_trait]
992    impl Handler<String> for MultiActor {
993        async fn handle(
994            &mut self,
995            _cx: &Context<Self>,
996            message: String,
997        ) -> Result<(), anyhow::Error> {
998            let mut vals = self.0.lock().unwrap();
999            vals.1 = message;
1000            Ok(())
1001        }
1002    }
1003
1004    #[async_trait]
1005    impl Handler<OncePortHandle<bool>> for MultiActor {
1006        async fn handle(
1007            &mut self,
1008            _cx: &Context<Self>,
1009            message: OncePortHandle<bool>,
1010        ) -> Result<(), anyhow::Error> {
1011            message.send(true).unwrap();
1012            Ok(())
1013        }
1014    }
1015
1016    #[tokio::test]
1017    async fn test_multi_handler_refs() {
1018        let test = MultiValuesTest::new().await;
1019
1020        test.send(123u64);
1021        test.send("foo".to_string());
1022        test.sync().await;
1023        assert_eq!(test.get_values(), (123u64, "foo".to_string()));
1024
1025        let myref: ActorRef<MultiActor> = test.handle.bind();
1026
1027        myref.port().send(&test.client, 321u64).unwrap();
1028        test.sync().await;
1029        assert_eq!(test.get_values(), (321u64, "foo".to_string()));
1030
1031        myref.port().send(&test.client, "bar".to_string()).unwrap();
1032        test.sync().await;
1033        assert_eq!(test.get_values(), (321u64, "bar".to_string()));
1034    }
1035
1036    #[tokio::test]
1037    async fn test_ref_behavior() {
1038        let test = MultiValuesTest::new().await;
1039
1040        test.send(123u64);
1041        test.send("foo".to_string());
1042
1043        hyperactor::behavior!(MyActorBehavior, u64, String);
1044
1045        let myref: ActorRef<MyActorBehavior> = test.handle.bind();
1046        myref.port().send(&test.client, "biz".to_string()).unwrap();
1047        myref.port().send(&test.client, 999u64).unwrap();
1048
1049        test.sync().await;
1050        assert_eq!(test.get_values(), (999u64, "biz".to_string()));
1051    }
1052
1053    #[tokio::test]
1054    async fn test_actor_handle_downcast() {
1055        #[derive(Debug, Default)]
1056        struct NothingActor;
1057
1058        impl Actor for NothingActor {}
1059
1060        // Just test that we can round-trip the handle through a downcast.
1061
1062        let proc = Proc::local();
1063        let handle = proc.spawn("nothing", NothingActor).unwrap();
1064        let cell = handle.cell();
1065
1066        // Invalid actor doesn't succeed.
1067        assert!(cell.downcast_handle::<EchoActor>().is_none());
1068
1069        let handle = cell.downcast_handle::<NothingActor>().unwrap();
1070        handle.drain_and_stop().unwrap();
1071        handle.await;
1072    }
1073}