hyperactor/
actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(dead_code)] // Allow until this is used outside of tests.
10
11//! This module contains all the core traits required to define and manage actors.
12
13use std::any::TypeId;
14use std::borrow::Cow;
15use std::fmt;
16use std::fmt::Debug;
17use std::future::Future;
18use std::future::IntoFuture;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::SystemTime;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use hyperactor_config::Flattrs;
28use serde::Deserialize;
29use serde::Serialize;
30use tokio::sync::watch;
31use tokio::task::JoinHandle;
32use typeuri::Named;
33
34use crate as hyperactor; // for macros
35use crate::Data;
36use crate::Message;
37use crate::RemoteMessage;
38use crate::checkpoint::CheckpointError;
39use crate::checkpoint::Checkpointable;
40use crate::context;
41use crate::mailbox::MailboxError;
42use crate::mailbox::MailboxSenderError;
43use crate::mailbox::MessageEnvelope;
44use crate::mailbox::PortHandle;
45use crate::mailbox::Undeliverable;
46use crate::mailbox::UndeliverableMessageError;
47use crate::mailbox::log::MessageLogError;
48use crate::message::Castable;
49use crate::message::IndexedErasedUnbound;
50use crate::proc::Context;
51use crate::proc::Instance;
52use crate::proc::InstanceCell;
53use crate::proc::Ports;
54use crate::proc::Proc;
55use crate::reference;
56use crate::supervision::ActorSupervisionEvent;
57
58pub mod remote;
59
60/// An Actor is an independent, asynchronous thread of execution. Each
61/// actor instance has a mailbox, whose messages are delivered through
62/// the method [`Actor::handle`].
63///
64/// Actors communicate with each other by way of message passing.
65/// Actors are assumed to be _deterministic_: that is, the state of an
66/// actor is determined by the set (and order) of messages it receives.
67#[async_trait]
68pub trait Actor: Sized + Send + 'static {
69    /// Initialize the actor, after the runtime has been fully initialized.
70    /// Init thus provides a mechanism by which an actor can reliably and always
71    /// receive some initial event that can be used to kick off further
72    /// (potentially delayed) processing.
73    async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
74        // Default implementation: no init.
75        Ok(())
76    }
77
78    /// Cleanup things used by this actor before shutting down. Notably this function
79    /// is async and allows more complex cleanup. Simpler cleanup can be handled
80    /// by the impl Drop for this Actor.
81    /// If err is not None, it is the error that this actor is failing with. Any
82    /// errors returned by this function will be logged and ignored.
83    /// If err is None, any errors returned by this function will be propagated
84    /// as an ActorError.
85    /// This function is not called if there is a panic in the actor, as the
86    /// actor may be in an indeterminate state. It is also not called if the
87    /// process is killed, there is no atexit handler or signal handler.
88    async fn cleanup(
89        &mut self,
90        _this: &Instance<Self>,
91        _err: Option<&ActorError>,
92    ) -> Result<(), anyhow::Error> {
93        // Default implementation: no cleanup.
94        Ok(())
95    }
96
97    /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
98    /// The spawned actor will be supervised by the parent (spawning) actor.
99    fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
100        cx.instance().spawn(self)
101    }
102
103    /// Spawn a named child actor. Same supervision semantics as
104    /// `spawn`, but the child gets `name` in its ActorId.
105    fn spawn_with_name(
106        self,
107        cx: &impl context::Actor,
108        name: &str,
109    ) -> anyhow::Result<ActorHandle<Self>> {
110        cx.instance().spawn_with_name(name, self)
111    }
112
113    /// Spawns this actor in a detached state, handling its messages
114    /// in a background task. The returned handle is used to control
115    /// the actor's lifecycle and to interact with it.
116    ///
117    /// Actors spawned through `spawn_detached` are not attached to a supervision
118    /// hierarchy, and not managed by a [`Proc`].
119    fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
120        Proc::local().spawn("anon", self)
121    }
122
123    /// This method is used by the runtime to spawn the actor server. It can be
124    /// used by actors that require customized runtime setups
125    /// (e.g., dedicated actor threads), or want to use a custom tokio runtime.
126    #[hyperactor::instrument_infallible]
127    fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
128    where
129        F: Future + Send + 'static,
130        F::Output: Send + 'static,
131    {
132        tokio::spawn(future)
133    }
134
135    /// Handle actor supervision event. Return `Ok(true)`` if the event is handled here.
136    async fn handle_supervision_event(
137        &mut self,
138        _this: &Instance<Self>,
139        _event: &ActorSupervisionEvent,
140    ) -> Result<bool, anyhow::Error> {
141        // By default, the supervision event is not handled, caller is expected to bubble it up.
142        Ok(false)
143    }
144
145    /// Default undeliverable message handling behavior.
146    async fn handle_undeliverable_message(
147        &mut self,
148        cx: &Instance<Self>,
149        envelope: Undeliverable<MessageEnvelope>,
150    ) -> Result<(), anyhow::Error> {
151        handle_undeliverable_message(cx, envelope)
152    }
153
154    /// If overridden, we will use this name in place of the
155    /// ActorId for talking about this actor in supervision error
156    /// messages.
157    fn display_name(&self) -> Option<String> {
158        None
159    }
160}
161
162/// Default implementation of [`Actor::handle_undeliverable_message`]. Defined
163/// as a free function so that `Actor` implementations that override
164/// [`Actor::handle_undeliverable_message`] can fallback to this default.
165pub fn handle_undeliverable_message<A: Actor>(
166    cx: &Instance<A>,
167    Undeliverable(envelope): Undeliverable<MessageEnvelope>,
168) -> Result<(), anyhow::Error> {
169    assert_eq!(envelope.sender(), cx.self_id());
170
171    anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
172}
173
174/// An actor that does nothing. It is used to represent "client only" actors,
175/// returned by [`Proc::instance`].
176#[async_trait]
177impl Actor for () {}
178
179impl Referable for () {}
180
181impl Binds<()> for () {
182    fn bind(_ports: &Ports<Self>) {
183        // Binds no ports.
184    }
185}
186
187/// A Handler allows an actor to handle a specific message type.
188#[async_trait]
189pub trait Handler<M>: Actor {
190    /// Handle the next M-typed message.
191    async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
192}
193
194/// We provide this handler to indicate that actors can handle the [`Signal`] message.
195/// Its actual handler is implemented by the runtime.
196#[async_trait]
197impl<A: Actor> Handler<Signal> for A {
198    async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
199        unimplemented!("signal handler should not be called directly")
200    }
201}
202
203/// This handler provides a default behavior when a message sent by
204/// the actor to another is returned due to delivery failure.
205#[async_trait]
206impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
207    async fn handle(
208        &mut self,
209        cx: &Context<Self>,
210        message: Undeliverable<MessageEnvelope>,
211    ) -> Result<(), anyhow::Error> {
212        let sender = message.0.sender().clone();
213        let dest = message.0.dest().clone();
214        let error = message.0.error_msg().unwrap_or(String::new());
215        match self.handle_undeliverable_message(cx, message).await {
216            Ok(_) => {
217                tracing::debug!(
218                    actor_id = %cx.self_id(),
219                    name = "undeliverable_message_handled",
220                    %sender,
221                    %dest,
222                    error,
223                );
224                Ok(())
225            }
226            Err(e) => {
227                tracing::error!(
228                    actor_id = %cx.self_id(),
229                    name = "undeliverable_message",
230                    %sender,
231                    %dest,
232                    error,
233                    handler_error = %e,
234                );
235                Err(e)
236            }
237        }
238    }
239}
240
241/// This handler enables actors to unbind the [IndexedErasedUnbound]
242/// message, and forward the result to corresponding handler.
243#[async_trait]
244impl<A, M> Handler<IndexedErasedUnbound<M>> for A
245where
246    A: Handler<M>,
247    M: Castable,
248{
249    async fn handle(
250        &mut self,
251        cx: &Context<Self>,
252        msg: IndexedErasedUnbound<M>,
253    ) -> anyhow::Result<()> {
254        let message = msg.downcast()?.bind()?;
255        Handler::handle(self, cx, message).await
256    }
257}
258
259/// An `Actor` that can be spawned remotely.
260///
261/// Bounds explained:
262/// - `Actor`: only actors may be remotely spawned.
263/// - `Referable`: marks the type as eligible for typed remote
264///   references (`ActorRef<A>`); required because remote spawn
265///   ultimately hands back an `ActorId` that higher-level APIs may
266///   re-type as `ActorRef<A>`.
267/// - `Binds<Self>`: lets the runtime wire this actor's message ports
268///   when it is spawned (the blanket impl calls `handle.bind::<Self>()`).
269///
270/// `gspawn` is a type-erased entry point used by the remote
271/// spawn/registry machinery. It takes serialized params and returns
272/// the new actor's `ActorId`; application code shouldn't call it
273/// directly.
274#[async_trait]
275pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
276    /// The type of parameters used to instantiate the actor remotely.
277    type Params: RemoteMessage;
278
279    /// Creates a new actor instance given its instantiation parameters.
280    /// The `environment` allows whoever is responsible for spawning this actor
281    /// to pass in additional context that may be useful.
282    async fn new(params: Self::Params, environment: Flattrs) -> anyhow::Result<Self>;
283
284    /// A type-erased entry point to spawn this actor. This is
285    /// primarily used by hyperactor's remote actor registration
286    /// mechanism.
287    // TODO: consider making this 'private' -- by moving it into a non-public trait as in [`cap`].
288    fn gspawn(
289        proc: &Proc,
290        name: &str,
291        serialized_params: Data,
292        environment: Flattrs,
293    ) -> Pin<Box<dyn Future<Output = Result<reference::ActorId, anyhow::Error>> + Send>> {
294        let proc = proc.clone();
295        let name = name.to_string();
296        Box::pin(async move {
297            let params = bincode::deserialize(&serialized_params)?;
298            let actor = Self::new(params, environment).await?;
299            let handle = proc.spawn(&name, actor)?;
300            // We return only the ActorId, not a typed ActorRef.
301            // Callers that hold this ID can interact with the actor
302            // only via the serialized/opaque messaging path, which
303            // makes it safe to export across process boundaries.
304            //
305            // Note: the actor itself is still `A`-typed here; we
306            // merely restrict the *capability* we hand out to an
307            // untyped identifier.
308            //
309            // This will be replaced by a proper export/registry
310            // mechanism.
311            Ok(handle.bind::<Self>().actor_id)
312        })
313    }
314
315    /// The type ID of this actor.
316    fn get_type_id() -> TypeId {
317        TypeId::of::<Self>()
318    }
319}
320
321/// If an actor implements Default, we use this as the
322/// `RemoteSpawn` implementation, too.
323#[async_trait]
324impl<A: Actor + Referable + Binds<Self> + Default> RemoteSpawn for A {
325    type Params = ();
326
327    async fn new(_params: Self::Params, _environment: Flattrs) -> anyhow::Result<Self> {
328        Ok(Default::default())
329    }
330}
331
332#[async_trait]
333impl<T> Checkpointable for T
334where
335    T: RemoteMessage + Clone,
336{
337    type State = T;
338    async fn save(&self) -> Result<Self::State, CheckpointError> {
339        Ok(self.clone())
340    }
341
342    async fn load(state: Self::State) -> Result<Self, CheckpointError> {
343        Ok(state)
344    }
345}
346
/// Errors that occur while serving actors. Each error is associated
/// with the ID of the actor being served.
///
/// Both fields are boxed, so `ActorError` itself is only two pointers wide.
/// NOTE(review): presumably this keeps `Result<_, ActorError>` small; confirm.
#[derive(Debug)]
pub struct ActorError {
    /// The ActorId for the actor that generated this error.
    pub actor_id: Box<reference::ActorId>,
    /// The kind of error that occurred.
    pub kind: Box<ActorErrorKind>,
}
356
/// The kinds of actor serving errors.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`; index-based
/// formats (e.g. bincode) encode variants by position, so avoid reordering.
#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ActorErrorKind {
    /// Generic error with a formatted message.
    #[error("{0}")]
    Generic(String),

    /// An error (message in `.0`) that occurred while trying to handle the
    /// supervision event in `.1`.
    #[error("{0} while handling {1}")]
    ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),

    /// A supervision event that the actor did not handle. The event is
    /// carried as-is so that it can be passed further up the supervision
    /// handling infrastructure (see `ActorErrorKind::processing`).
    #[error("{0}")]
    UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),

    /// The actor was explicitly aborted with the provided reason.
    #[error("actor explicitly aborted due to: {0}")]
    Aborted(String),
}
376
377impl ActorErrorKind {
378    /// Error while processing actor, i.e., returned by the actor's
379    /// processing method.
380    pub fn processing(err: anyhow::Error) -> Self {
381        // Unbox err from the anyhow err. Check if it is an ActorErrorKind object.
382        // If it is directly use it as the new ActorError's ActorErrorKind.
383        // This lets us directly pass the ActorErrorKind::UnhandledSupervisionEvent
384        // up the handling infrastructure.
385        err.downcast::<ActorErrorKind>()
386            .unwrap_or_else(|err| Self::Generic(err.to_string()))
387    }
388
389    /// Unwound stracktrace of a panic.
390    pub fn panic(err: anyhow::Error) -> Self {
391        Self::Generic(format!("panic: {}", err))
392    }
393
394    /// Error during actor initialization.
395    pub fn init(err: anyhow::Error) -> Self {
396        Self::Generic(format!("initialization error: {}", err))
397    }
398
399    /// Error during actor cleanup.
400    pub fn cleanup(err: anyhow::Error) -> Self {
401        Self::Generic(format!("cleanup error: {}", err))
402    }
403
404    /// An underlying mailbox error.
405    pub fn mailbox(err: MailboxError) -> Self {
406        Self::Generic(err.to_string())
407    }
408
409    /// An underlying mailbox sender error.
410    pub fn mailbox_sender(err: MailboxSenderError) -> Self {
411        Self::Generic(err.to_string())
412    }
413
414    /// An underlying checkpoint error.
415    pub fn checkpoint(err: CheckpointError) -> Self {
416        Self::Generic(format!("checkpoint error: {}", err))
417    }
418
419    /// An underlying message log error.
420    pub fn message_log(err: MessageLogError) -> Self {
421        Self::Generic(format!("message log error: {}", err))
422    }
423
424    /// The actor's state could not be determined.
425    pub fn indeterminate_state() -> Self {
426        Self::Generic("actor is in an indeterminate state".to_string())
427    }
428}
429
430impl ActorError {
431    /// Create a new actor server error with the provided id and kind.
432    pub(crate) fn new(actor_id: &reference::ActorId, kind: ActorErrorKind) -> Self {
433        Self {
434            actor_id: Box::new(actor_id.clone()),
435            kind: Box::new(kind),
436        }
437    }
438}
439
440impl fmt::Display for ActorError {
441    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
442        write!(f, "serving {}: ", self.actor_id)?;
443        fmt::Display::fmt(&self.kind, f)
444    }
445}
446
447impl std::error::Error for ActorError {
448    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
449        self.kind.source()
450    }
451}
452
453impl From<MailboxError> for ActorError {
454    fn from(inner: MailboxError) -> Self {
455        Self {
456            actor_id: Box::new(inner.actor_id().clone()),
457            kind: Box::new(ActorErrorKind::mailbox(inner)),
458        }
459    }
460}
461
462impl From<MailboxSenderError> for ActorError {
463    fn from(inner: MailboxSenderError) -> Self {
464        Self {
465            actor_id: Box::new(inner.location().actor_id().clone()),
466            kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
467        }
468    }
469}
470
/// A collection of signals to control the behavior of the actor.
/// Signals are internal runtime control-plane messages and should not be
/// sent from outside of the runtime.
///
/// These messages are not handled directly by actors; instead, the runtime
/// handles the various signals.
///
/// NOTE(review): this type is serialized; index-based formats encode
/// variants by position, so avoid reordering them.
#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
pub enum Signal {
    /// Stop the actor, after draining messages. Carries the stop reason.
    DrainAndStop(String),

    /// Stop the actor immediately. Carries the stop reason.
    Stop(String),

    /// The direct child identified by the given index was stopped.
    ChildStopped(reference::Index),

    /// Abort the actor. This will exit the actor loop with an error,
    /// causing a supervision event to propagate up the supervision
    /// hierarchy. Carries the abort reason.
    Abort(String),
}
// Register `Signal` with `wirevalue` (see `wirevalue::register_type!`).
wirevalue::register_type!(Signal);
494
495impl fmt::Display for Signal {
496    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
497        match self {
498            Signal::DrainAndStop(reason) => write!(f, "DrainAndStop({})", reason),
499            Signal::Stop(reason) => write!(f, "Stop({})", reason),
500            Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
501            Signal::Abort(reason) => write!(f, "Abort({})", reason),
502        }
503    }
504}
505
/// Information about a message handler being processed.
///
/// Uses `Cow<'static, str>` to avoid string copies on the hot path.
/// The typename and arm are typically static strings from `TypeInfo`.
/// Displayed as `typename`, or `typename.arm` when an arm is present.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct HandlerInfo {
    /// The type name of the message being handled.
    pub typename: Cow<'static, str>,
    /// The enum arm being handled, if the message is an enum.
    pub arm: Option<Cow<'static, str>>,
}
517
518impl HandlerInfo {
519    /// Create a new `HandlerInfo` from static strings (zero-copy).
520    pub fn from_static(typename: &'static str, arm: Option<&'static str>) -> Self {
521        Self {
522            typename: Cow::Borrowed(typename),
523            arm: arm.map(Cow::Borrowed),
524        }
525    }
526
527    /// Create a new `HandlerInfo` from owned strings.
528    pub fn from_owned(typename: String, arm: Option<String>) -> Self {
529        Self {
530            typename: Cow::Owned(typename),
531            arm: arm.map(Cow::Owned),
532        }
533    }
534}
535
536impl fmt::Display for HandlerInfo {
537    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
538        match &self.arm {
539            Some(arm) => write!(f, "{}.{}", self.typename, arm),
540            None => write!(f, "{}", self.typename),
541        }
542    }
543}
544
/// The runtime status of an actor.
///
/// `Stopped` and `Failed` are the terminal states (see `ActorStatus::is_terminal`).
/// NOTE(review): this type is serialized; avoid reordering variants.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Clone,
    typeuri::Named,
    EnumAsInner
)]
pub enum ActorStatus {
    /// The actor status is unknown.
    Unknown,
    /// The actor was created, but not yet started.
    Created,
    /// The actor is initializing. It is not yet ready to receive messages.
    Initializing,
    /// The actor is in "client" state: the user is managing the actor's
    /// mailboxes manually.
    Client,
    /// The actor is ready to receive messages, but is currently idle.
    Idle,
    /// The actor has been processing a message since the given wall-clock
    /// time. The handler info, when present, identifies the message type
    /// (and enum arm) being handled.
    Processing(SystemTime, Option<HandlerInfo>),
    /// The actor has been saving its state since the given wall-clock time.
    Saving(SystemTime),
    /// The actor has been loading its state since the given wall-clock time.
    Loading(SystemTime),
    /// The actor is stopping. It is draining messages.
    Stopping,
    /// The actor is stopped with a provided reason.
    /// It is no longer processing messages.
    Stopped(String),
    /// The actor failed with the provided actor error.
    Failed(ActorErrorKind),
}
583
584impl ActorStatus {
585    /// Tells whether the status is a terminal state.
586    pub fn is_terminal(&self) -> bool {
587        self.is_stopped() || self.is_failed()
588    }
589
590    /// Create a generic failure status with the provided error message.
591    pub fn generic_failure(message: impl Into<String>) -> Self {
592        Self::Failed(ActorErrorKind::Generic(message.into()))
593    }
594
595    fn span_string(&self) -> &'static str {
596        self.arm().unwrap_or_default()
597    }
598}
599
600impl fmt::Display for ActorStatus {
601    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602        match self {
603            Self::Unknown => write!(f, "unknown"),
604            Self::Created => write!(f, "created"),
605            Self::Initializing => write!(f, "initializing"),
606            Self::Client => write!(f, "client"),
607            Self::Idle => write!(f, "idle"),
608            Self::Processing(instant, None) => {
609                write!(
610                    f,
611                    "processing for {}ms",
612                    std::time::SystemTime::now()
613                        .duration_since(*instant)
614                        .unwrap_or_default()
615                        .as_millis()
616                )
617            }
618            Self::Processing(instant, Some(handler_info)) => {
619                write!(
620                    f,
621                    "{}: processing for {}ms",
622                    handler_info,
623                    std::time::SystemTime::now()
624                        .duration_since(*instant)
625                        .unwrap_or_default()
626                        .as_millis()
627                )
628            }
629            Self::Saving(instant) => {
630                write!(
631                    f,
632                    "saving for {}ms",
633                    std::time::SystemTime::now()
634                        .duration_since(*instant)
635                        .unwrap_or_default()
636                        .as_millis()
637                )
638            }
639            Self::Loading(instant) => {
640                write!(
641                    f,
642                    "loading for {}ms",
643                    std::time::SystemTime::now()
644                        .duration_since(*instant)
645                        .unwrap_or_default()
646                        .as_millis()
647                )
648            }
649            Self::Stopping => write!(f, "stopping"),
650            Self::Stopped(reason) => write!(f, "stopped: {}", reason),
651            Self::Failed(err) => write!(f, "failed: {}", err),
652        }
653    }
654}
655
/// ActorHandles represent a (local) serving actor. It is used to access
/// its messaging and signal ports, as well as to synchronize with its
/// lifecycle (e.g., providing joins).  Once dropped, the handle is
/// detached from the underlying actor instance, and there is no longer
/// any way to join it.
///
/// Correspondingly, [`crate::ActorRef`]s refer to (possibly) remote
/// actors.
pub struct ActorHandle<A: Actor> {
    // The actor's instance cell: source of its id and status watch, and the
    // target of signals and port binding.
    cell: InstanceCell,
    // Shared table of the actor's message ports, used by `send`/`port`/`bind`.
    ports: Arc<Ports<A>>,
}
668
669/// A handle to a running (local) actor.
670impl<A: Actor> ActorHandle<A> {
671    pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
672        Self { cell, ports }
673    }
674
675    /// The actor's cell. Used primarily for testing.
676    /// TODO: this should not be a public API.
677    pub(crate) fn cell(&self) -> &InstanceCell {
678        &self.cell
679    }
680
681    /// The [`ActorId`] of the actor represented by this handle.
682    pub fn actor_id(&self) -> &reference::ActorId {
683        self.cell.actor_id()
684    }
685
686    /// Signal the actor to drain its current messages and then stop.
687    pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
688        tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
689        self.cell.signal(Signal::DrainAndStop(reason.to_string()))
690    }
691
692    /// A watch that observes the lifecycle state of the actor.
693    pub fn status(&self) -> watch::Receiver<ActorStatus> {
694        self.cell.status().clone()
695    }
696
697    /// Send a message to the actor. Messages sent through the handle
698    /// are always queued in process, and do not require serialization.
699    pub fn send<M: Message>(
700        &self,
701        cx: &impl context::Actor,
702        message: M,
703    ) -> Result<(), MailboxSenderError>
704    where
705        A: Handler<M>,
706    {
707        self.ports.get().send(cx, message)
708    }
709
710    /// Return a port for the provided message type handled by the actor.
711    pub fn port<M: Message>(&self) -> PortHandle<M>
712    where
713        A: Handler<M>,
714    {
715        self.ports.get()
716    }
717
718    /// TEMPORARY: bind...
719    /// TODO: we shoudl also have a default binding(?)
720    pub fn bind<R: Binds<A>>(&self) -> reference::ActorRef<R> {
721        self.cell.bind(self.ports.as_ref())
722    }
723}
724
725/// IntoFuture allows users to await the handle to join it. The future
726/// resolves when the actor itself has stopped processing messages.
727/// The future resolves to the actor's final status.
728impl<A: Actor> IntoFuture for ActorHandle<A> {
729    type Output = ActorStatus;
730    type IntoFuture = BoxFuture<'static, Self::Output>;
731
732    fn into_future(self) -> Self::IntoFuture {
733        let future = async move {
734            let mut status_receiver = self.cell.status().clone();
735            let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
736            match result {
737                Err(_) => ActorStatus::Unknown,
738                Ok(status) => status.clone(),
739            }
740        };
741
742        future.boxed()
743    }
744}
745
746impl<A: Actor> Debug for ActorHandle<A> {
747    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
748        f.debug_struct("ActorHandle").field("cell", &"..").finish()
749    }
750}
751
752impl<A: Actor> Clone for ActorHandle<A> {
753    fn clone(&self) -> Self {
754        Self {
755            cell: self.cell.clone(),
756            ports: self.ports.clone(),
757        }
758    }
759}
760
761/// `Referable` is a marker trait for types that can appear as
762/// remote references across process boundaries.
763///
764/// It is not limited to concrete [`Actor`] implementations. For
765/// example, façade types generated by [`behavior!`] implement
766/// `Referable` so that you can hand out restricted or stable APIs
767/// while still using the same remote messaging machinery.
768///
769/// Implementing this trait means the type can be identified (`Named`)
770/// so the runtime knows what it is.
771///
772///  In contrast, [`RemoteSpawn`] is the trait that marks *actors*
773/// that can actually be **spawned remotely**. A behavior may be a
774/// `Referable` but is never a `RemoteSpawn`.
775pub trait Referable: Named {}
776
777/// Binds determines how an actor's ports are bound to a specific
778/// reference type.
779pub trait Binds<A: Actor>: Referable {
780    /// Bind ports in this actor.
781    fn bind(ports: &Ports<A>);
782}
783
784/// Handles is a marker trait specifying that message type [`M`]
785/// is handled by a specific actor type.
786pub trait RemoteHandles<M: RemoteMessage>: Referable {}
787
/// Statically assert that an actor type behaves as a given behavior
/// (defined by [`crate::behavior!`]), i.e. that `$behavior: Binds<$ty>`.
/// A mismatch is reported as a compile-time error; the check is purely
/// compile-time.
///
/// ```
/// # use serde::Serialize;
/// # use serde::Deserialize;
/// # use typeuri::Named;
/// # use hyperactor::Actor;
///
/// // First, define a behavior, based on handling a single message type `()`.
/// hyperactor::behavior!(UnitBehavior, ());
///
/// #[derive(Debug, Default)]
/// struct MyActor;
///
/// impl Actor for MyActor {}
///
/// #[async_trait::async_trait]
/// impl hyperactor::Handler<()> for MyActor {
///     async fn handle(
///         &mut self,
///         _cx: &hyperactor::Context<Self>,
///         _message: (),
///     ) -> Result<(), anyhow::Error> {
///         // no-op
///         Ok(())
///     }
/// }
///
/// hyperactor::assert_behaves!(MyActor as UnitBehavior);
/// ```
#[macro_export]
macro_rules! assert_behaves {
    ($ty:ty as $behavior:ty) => {
        // A never-called closure coerced to `fn()`: type-checking it forces
        // the `Binds` bound on `check` to be proven for `$behavior`.
        const _: fn() = || {
            // `$crate` (not a hard-coded `hyperactor::`) resolves to this
            // crate however the caller names or imports it.
            fn check<B: $crate::actor::Binds<$ty>>() {}
            check::<$behavior>();
        };
    };
}
827
828#[cfg(test)]
829mod tests {
830    use std::sync::Mutex;
831    use std::time::Duration;
832
833    use rand::seq::SliceRandom;
834    use timed_test::async_timed_test;
835    use tokio::sync::mpsc;
836    use tokio::time::timeout;
837
838    use super::*;
839    use crate as hyperactor;
840    use crate::Actor;
841    use crate::OncePortHandle;
842    use crate::checkpoint::CheckpointError;
843    use crate::checkpoint::Checkpointable;
844    use crate::config;
845    use crate::context::Mailbox as _;
846    use crate::introspect::IntrospectMessage;
847    use crate::introspect::IntrospectResult;
848    use crate::introspect::IntrospectView;
849    use crate::mailbox::BoxableMailboxSender as _;
850    use crate::mailbox::MailboxSender;
851    use crate::mailbox::PortLocation;
852    use crate::mailbox::monitored_return_handle;
853    use crate::ordering::SEQ_INFO;
854    use crate::ordering::SeqInfo;
855    use crate::testing::ids::test_proc_id;
856    use crate::testing::pingpong::PingPongActor;
857    use crate::testing::pingpong::PingPongMessage;
858    use crate::testing::proc_supervison::ProcSupervisionCoordinator; // for macros
859
860    #[derive(Debug)]
861    struct EchoActor(reference::PortRef<u64>);
862
863    #[async_trait]
864    impl Actor for EchoActor {}
865
866    #[async_trait]
867    impl Handler<u64> for EchoActor {
868        async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
869            let Self(port) = self;
870            port.send(cx, message)?;
871            Ok(())
872        }
873    }
874
875    #[tokio::test]
876    async fn test_server_basic() {
877        let proc = Proc::local();
878        let (client, _) = proc.instance("client").unwrap();
879        let (tx, mut rx) = client.open_port();
880        let actor = EchoActor(tx.bind());
881        let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
882        handle.send(&client, 123u64).unwrap();
883        handle.drain_and_stop("test").unwrap();
884        handle.await;
885
886        assert_eq!(rx.drain(), vec![123u64]);
887    }
888
889    #[tokio::test]
890    async fn test_ping_pong() {
891        let proc = Proc::local();
892        let (client, _) = proc.instance("client").unwrap();
893        let (undeliverable_msg_tx, _) = client.open_port();
894
895        let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
896        let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
897        let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
898        let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
899
900        let (local_port, local_receiver) = client.open_once_port();
901
902        ping_handle
903            .send(
904                &client,
905                PingPongMessage(10, pong_handle.bind(), local_port.bind()),
906            )
907            .unwrap();
908
909        assert!(local_receiver.recv().await.unwrap());
910    }
911
912    #[tokio::test]
913    async fn test_ping_pong_on_handler_error() {
914        let proc = Proc::local();
915        let (client, _) = proc.instance("client").unwrap();
916        let (undeliverable_msg_tx, _) = client.open_port();
917
918        // Need to set a supervison coordinator for this Proc because there will
919        // be actor failure(s) in this test which trigger supervision.
920        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
921
922        let error_ttl = 66;
923
924        let ping_actor =
925            PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
926        let pong_actor =
927            PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
928        let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
929        let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
930
931        let (local_port, local_receiver) = client.open_once_port();
932
933        ping_handle
934            .send(
935                &client,
936                PingPongMessage(
937                    error_ttl + 1, // will encounter an error at TTL=66
938                    pong_handle.bind(),
939                    local_port.bind(),
940                ),
941            )
942            .unwrap();
943
944        // TODO: Fix this receiver hanging issue in T200423722.
945        let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
946            timeout(Duration::from_secs(5), local_receiver.recv()).await;
947        assert!(res.is_err());
948    }
949
950    #[derive(Debug)]
951    struct InitActor(bool);
952
953    #[async_trait]
954    impl Actor for InitActor {
955        async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
956            self.0 = true;
957            Ok(())
958        }
959    }
960
961    #[async_trait]
962    impl Handler<OncePortHandle<bool>> for InitActor {
963        async fn handle(
964            &mut self,
965            cx: &Context<Self>,
966            port: OncePortHandle<bool>,
967        ) -> Result<(), anyhow::Error> {
968            port.send(cx, self.0)?;
969            Ok(())
970        }
971    }
972
973    #[tokio::test]
974    async fn test_init() {
975        let proc = Proc::local();
976        let actor = InitActor(false);
977        let handle = proc.spawn::<InitActor>("init", actor).unwrap();
978        let (client, _) = proc.instance("client").unwrap();
979
980        let (port, receiver) = client.open_once_port();
981        handle.send(&client, port).unwrap();
982        assert!(receiver.recv().await.unwrap());
983
984        handle.drain_and_stop("test").unwrap();
985        handle.await;
986    }
987
988    #[derive(Debug)]
989    struct CheckpointActor {
990        // The actor does nothing but sum the values of messages.
991        sum: u64,
992        port: reference::PortRef<u64>,
993    }
994
995    #[async_trait]
996    impl Actor for CheckpointActor {}
997
998    #[async_trait]
999    impl Handler<u64> for CheckpointActor {
1000        async fn handle(&mut self, cx: &Context<Self>, value: u64) -> Result<(), anyhow::Error> {
1001            self.sum += value;
1002            self.port.send(cx, self.sum)?;
1003            Ok(())
1004        }
1005    }
1006
1007    #[async_trait]
1008    impl Checkpointable for CheckpointActor {
1009        type State = (u64, reference::PortRef<u64>);
1010
1011        async fn save(&self) -> Result<Self::State, CheckpointError> {
1012            Ok((self.sum, self.port.clone()))
1013        }
1014
1015        async fn load(state: Self::State) -> Result<Self, CheckpointError> {
1016            let (sum, port) = state;
1017            Ok(CheckpointActor { sum, port })
1018        }
1019    }
1020
    /// Shared `(u64, String)` slots: `MultiActor`'s `u64` handler writes the
    /// first element, its `String` handler the second; tests read them back.
    type MultiValues = Arc<Mutex<(u64, String)>>;

    /// Fixture bundling a local proc, a spawned `MultiActor`, and a client
    /// instance used to message it.
    struct MultiValuesTest {
        proc: Proc,
        // Clone of the same `Arc` handed to the spawned `MultiActor`.
        values: MultiValues,
        handle: ActorHandle<MultiActor>,
        client: Instance<()>,
        // Held (never read) so the client's handle lives as long as the fixture.
        _client_handle: ActorHandle<()>,
    }
1030
1031    impl MultiValuesTest {
1032        async fn new() -> Self {
1033            let proc = Proc::local();
1034            let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
1035            let actor = MultiActor(values.clone());
1036            let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
1037            let (client, client_handle) = proc.instance("client").unwrap();
1038            Self {
1039                proc,
1040                values,
1041                handle,
1042                client,
1043                _client_handle: client_handle,
1044            }
1045        }
1046
1047        fn send<M>(&self, message: M)
1048        where
1049            M: RemoteMessage,
1050            MultiActor: Handler<M>,
1051        {
1052            self.handle.send(&self.client, message).unwrap()
1053        }
1054
1055        async fn sync(&self) {
1056            let (port, done) = self.client.open_once_port::<bool>();
1057            self.handle.send(&self.client, port).unwrap();
1058            assert!(done.recv().await.unwrap());
1059        }
1060
1061        fn get_values(&self) -> (u64, String) {
1062            self.values.lock().unwrap().clone()
1063        }
1064    }
1065
1066    #[derive(Debug)]
1067    #[hyperactor::export(handlers = [u64, String])]
1068    struct MultiActor(MultiValues);
1069
1070    #[async_trait]
1071    impl Actor for MultiActor {}
1072
1073    #[async_trait]
1074    impl Handler<u64> for MultiActor {
1075        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
1076            let mut vals = self.0.lock().unwrap();
1077            vals.0 = message;
1078            Ok(())
1079        }
1080    }
1081
1082    #[async_trait]
1083    impl Handler<String> for MultiActor {
1084        async fn handle(
1085            &mut self,
1086            _cx: &Context<Self>,
1087            message: String,
1088        ) -> Result<(), anyhow::Error> {
1089            let mut vals = self.0.lock().unwrap();
1090            vals.1 = message;
1091            Ok(())
1092        }
1093    }
1094
1095    #[async_trait]
1096    impl Handler<OncePortHandle<bool>> for MultiActor {
1097        async fn handle(
1098            &mut self,
1099            cx: &Context<Self>,
1100            message: OncePortHandle<bool>,
1101        ) -> Result<(), anyhow::Error> {
1102            message.send(cx, true).unwrap();
1103            Ok(())
1104        }
1105    }
1106
1107    #[tokio::test]
1108    async fn test_multi_handler_refs() {
1109        let test = MultiValuesTest::new().await;
1110
1111        test.send(123u64);
1112        test.send("foo".to_string());
1113        test.sync().await;
1114        assert_eq!(test.get_values(), (123u64, "foo".to_string()));
1115
1116        let myref: reference::ActorRef<MultiActor> = test.handle.bind();
1117
1118        myref.port().send(&test.client, 321u64).unwrap();
1119        test.sync().await;
1120        assert_eq!(test.get_values(), (321u64, "foo".to_string()));
1121
1122        myref.port().send(&test.client, "bar".to_string()).unwrap();
1123        test.sync().await;
1124        assert_eq!(test.get_values(), (321u64, "bar".to_string()));
1125    }
1126
1127    #[tokio::test]
1128    async fn test_ref_behavior() {
1129        let test = MultiValuesTest::new().await;
1130
1131        test.send(123u64);
1132        test.send("foo".to_string());
1133
1134        hyperactor::behavior!(MyActorBehavior, u64, String);
1135
1136        let myref: reference::ActorRef<MyActorBehavior> = test.handle.bind();
1137        myref.port().send(&test.client, "biz".to_string()).unwrap();
1138        myref.port().send(&test.client, 999u64).unwrap();
1139
1140        test.sync().await;
1141        assert_eq!(test.get_values(), (999u64, "biz".to_string()));
1142    }
1143
1144    #[tokio::test]
1145    async fn test_actor_handle_downcast() {
1146        #[derive(Debug, Default)]
1147        struct NothingActor;
1148
1149        impl Actor for NothingActor {}
1150
1151        // Just test that we can round-trip the handle through a downcast.
1152
1153        let proc = Proc::local();
1154        let handle = proc.spawn("nothing", NothingActor).unwrap();
1155        let cell = handle.cell();
1156
1157        // Invalid actor doesn't succeed.
1158        assert!(cell.downcast_handle::<EchoActor>().is_none());
1159
1160        let handle = cell.downcast_handle::<NothingActor>().unwrap();
1161        handle.drain_and_stop("test").unwrap();
1162        handle.await;
1163    }
1164
1165    // Returning the sequence number assigned to the message.
1166    #[derive(Debug)]
1167    #[hyperactor::export(handlers = [String, Callback])]
1168    struct GetSeqActor(reference::PortRef<(String, SeqInfo)>);
1169
1170    #[async_trait]
1171    impl Actor for GetSeqActor {}
1172
1173    #[async_trait]
1174    impl Handler<String> for GetSeqActor {
1175        async fn handle(
1176            &mut self,
1177            cx: &Context<Self>,
1178            message: String,
1179        ) -> Result<(), anyhow::Error> {
1180            let Self(port) = self;
1181            let seq_info = cx.headers().get(SEQ_INFO).unwrap();
1182            port.send(cx, (message, seq_info.clone()))?;
1183            Ok(())
1184        }
1185    }
1186
1187    // Unlike Handler<String>, where the sender provides the string message
1188    // directly, in Handler<Callback>, sender needs to provide a port, and
1189    // handler will reply that port with its own callback port. Then sender can
1190    // send the string message through this callback port.
1191    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
1192    struct Callback(reference::PortRef<reference::PortRef<String>>);
1193
1194    #[async_trait]
1195    impl Handler<Callback> for GetSeqActor {
1196        async fn handle(
1197            &mut self,
1198            cx: &Context<Self>,
1199            message: Callback,
1200        ) -> Result<(), anyhow::Error> {
1201            let (handle, mut receiver) = cx.open_port::<String>();
1202            let callback_ref = handle.bind();
1203            message.0.send(cx, callback_ref).unwrap();
1204            let msg = receiver.recv().await.unwrap();
1205            self.handle(cx, msg).await
1206        }
1207    }
1208
1209    #[async_timed_test(timeout_secs = 30)]
1210    async fn test_sequencing_actor_handle_basic() {
1211        let proc = Proc::local();
1212        let (client, _) = proc.instance("client").unwrap();
1213        let (tx, mut rx) = client.open_port();
1214
1215        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1216
1217        // Verify that unbound handle can send message.
1218        actor_handle.send(&client, "unbound".to_string()).unwrap();
1219        assert_eq!(
1220            rx.recv().await.unwrap(),
1221            ("unbound".to_string(), SeqInfo::Direct)
1222        );
1223
1224        let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1225
1226        let session_id = client.sequencer().session_id();
1227        let mut expected_seq = 0;
1228        // Interleave messages sent through the handle and the reference.
1229        for m in 0..10 {
1230            actor_handle.send(&client, format!("{m}")).unwrap();
1231            expected_seq += 1;
1232            assert_eq!(
1233                rx.recv().await.unwrap(),
1234                (
1235                    format!("{m}"),
1236                    SeqInfo::Session {
1237                        session_id,
1238                        seq: expected_seq,
1239                    }
1240                )
1241            );
1242
1243            for n in 0..2 {
1244                actor_ref.port().send(&client, format!("{m}-{n}")).unwrap();
1245                expected_seq += 1;
1246                assert_eq!(
1247                    rx.recv().await.unwrap(),
1248                    (
1249                        format!("{m}-{n}"),
1250                        SeqInfo::Session {
1251                            session_id,
1252                            seq: expected_seq,
1253                        }
1254                    )
1255                );
1256            }
1257        }
1258    }
1259
1260    // Test that actor ports share a sequence while non-actor ports get their own.
1261    #[async_timed_test(timeout_secs = 30)]
1262    async fn test_sequencing_mixed_actor_and_non_actor_ports() {
1263        let proc = Proc::local();
1264        let (client, _) = proc.instance("client").unwrap();
1265
1266        // Port for receiving seq info from actor handler
1267        let (actor_tx, mut actor_rx) = client.open_port();
1268
1269        // Channel for receiving seq info from non-actor port
1270        let (non_actor_tx, mut non_actor_rx) = mpsc::unbounded_channel::<Option<SeqInfo>>();
1271
1272        let actor_handle = proc.spawn("get_seq", GetSeqActor(actor_tx.bind())).unwrap();
1273        let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1274
1275        // Create a non-actor port using open_enqueue_port
1276        let non_actor_tx_clone = non_actor_tx.clone();
1277        let non_actor_port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
1278            let seq_info = headers.get(SEQ_INFO);
1279            non_actor_tx_clone.send(seq_info).unwrap();
1280            Ok(())
1281        });
1282
1283        // Bind the port to get a port ID
1284        non_actor_port_handle.bind();
1285        let non_actor_port_id = match non_actor_port_handle.location() {
1286            PortLocation::Bound(port_id) => port_id,
1287            _ => panic!("port_handle should be bound"),
1288        };
1289        assert!(!non_actor_port_id.is_actor_port());
1290
1291        let session_id = client.sequencer().session_id();
1292
1293        // Send to actor ports via ActorHandle - seq 1
1294        actor_handle.send(&client, "msg1".to_string()).unwrap();
1295        assert_eq!(
1296            actor_rx.recv().await.unwrap().1,
1297            SeqInfo::Session { session_id, seq: 1 }
1298        );
1299
1300        // Send to actor ports via ActorRef - seq 2 (shared with ActorHandle)
1301        actor_ref.port().send(&client, "msg2".to_string()).unwrap();
1302        assert_eq!(
1303            actor_rx.recv().await.unwrap().1,
1304            SeqInfo::Session { session_id, seq: 2 }
1305        );
1306
1307        // Send to non-actor port - has its own sequence starting at 1
1308        non_actor_port_handle.send(&client, ()).unwrap();
1309        assert_eq!(
1310            non_actor_rx.recv().await.unwrap(),
1311            Some(SeqInfo::Session { session_id, seq: 1 })
1312        );
1313
1314        // Send more to actor ports via ActorHandle - seq continues at 3
1315        actor_handle.send(&client, "msg3".to_string()).unwrap();
1316        assert_eq!(
1317            actor_rx.recv().await.unwrap().1,
1318            SeqInfo::Session { session_id, seq: 3 }
1319        );
1320
1321        // Send more to non-actor port - its sequence continues at 2
1322        non_actor_port_handle.send(&client, ()).unwrap();
1323        assert_eq!(
1324            non_actor_rx.recv().await.unwrap(),
1325            Some(SeqInfo::Session { session_id, seq: 2 })
1326        );
1327
1328        // Send via ActorRef again - seq 4
1329        actor_ref.port().send(&client, "msg4".to_string()).unwrap();
1330        assert_eq!(
1331            actor_rx.recv().await.unwrap().1,
1332            SeqInfo::Session { session_id, seq: 4 }
1333        );
1334
1335        actor_handle.drain_and_stop("test cleanup").unwrap();
1336        actor_handle.await;
1337    }
1338
1339    // Test that messages from different clients get independent sequence schemes.
1340    #[async_timed_test(timeout_secs = 30)]
1341    async fn test_sequencing_multiple_clients() {
1342        let proc = Proc::local();
1343        let (client1, _) = proc.instance("client1").unwrap();
1344        let (client2, _) = proc.instance("client2").unwrap();
1345
1346        // Port for receiving seq info from actor handler
1347        let (tx, mut rx) = client1.open_port();
1348
1349        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1350        let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1351
1352        // Each client should have a different session_id
1353        let session_id_1 = client1.sequencer().session_id();
1354        let session_id_2 = client2.sequencer().session_id();
1355        assert_ne!(session_id_1, session_id_2);
1356
1357        // Send from client1 via ActorHandle - seq 1 for session_id_1
1358        actor_handle.send(&client1, "c1_msg1".to_string()).unwrap();
1359        assert_eq!(
1360            rx.recv().await.unwrap().1,
1361            SeqInfo::Session {
1362                session_id: session_id_1,
1363                seq: 1
1364            }
1365        );
1366
1367        // Send from client2 via ActorHandle - seq 1 for session_id_2 (independent)
1368        actor_handle.send(&client2, "c2_msg1".to_string()).unwrap();
1369        assert_eq!(
1370            rx.recv().await.unwrap().1,
1371            SeqInfo::Session {
1372                session_id: session_id_2,
1373                seq: 1
1374            }
1375        );
1376
1377        // Send from client1 via ActorRef - seq 2 for session_id_1
1378        actor_ref
1379            .port()
1380            .send(&client1, "c1_msg2".to_string())
1381            .unwrap();
1382        assert_eq!(
1383            rx.recv().await.unwrap().1,
1384            SeqInfo::Session {
1385                session_id: session_id_1,
1386                seq: 2
1387            }
1388        );
1389
1390        // Send from client2 via ActorRef - seq 2 for session_id_2
1391        actor_ref
1392            .port()
1393            .send(&client2, "c2_msg2".to_string())
1394            .unwrap();
1395        assert_eq!(
1396            rx.recv().await.unwrap().1,
1397            SeqInfo::Session {
1398                session_id: session_id_2,
1399                seq: 2
1400            }
1401        );
1402
1403        // Interleave more messages to further verify independence
1404        actor_handle.send(&client1, "c1_msg3".to_string()).unwrap();
1405        assert_eq!(
1406            rx.recv().await.unwrap().1,
1407            SeqInfo::Session {
1408                session_id: session_id_1,
1409                seq: 3
1410            }
1411        );
1412
1413        actor_ref
1414            .port()
1415            .send(&client2, "c2_msg3".to_string())
1416            .unwrap();
1417        assert_eq!(
1418            rx.recv().await.unwrap().1,
1419            SeqInfo::Session {
1420                session_id: session_id_2,
1421                seq: 3
1422            }
1423        );
1424
1425        actor_handle.drain_and_stop("test cleanup").unwrap();
1426        actor_handle.await;
1427    }
1428
1429    // Verify that ordering is guarranteed based on
1430    //   * (sender actor , client actor, port stream)
1431    // not
1432    //   * (sender actor, client actor)
1433    //
1434    // For "port stream",
1435    //   * actor ports of the same actor belongs to the same stream;
1436    //   * non-actor port has its independent stream.
1437    //
1438    // Specifically, in this test,
1439    //   * client sends a Callback message to dest actor's handler;
1440    //   * while dest actor is still processing that message, client sends
1441    //     another non-handler message to dest actor.
1442    //
1443    // If the ordering is based on (sender actor, client actor), this test would
1444    // hang, since dest actor is deadlock on waiting for the 2nd message while
1445    // still processing the 2nd message.
1446    //
1447    // But since port stream is also part of the ordering guarrantee, such
1448    // deadlock should not happen.
1449    #[async_timed_test(timeout_secs = 30)]
1450    async fn test_sequencing_actor_handle_callback() {
1451        let config = hyperactor_config::global::lock();
1452        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1453
1454        let proc = Proc::local();
1455        let (client, _) = proc.instance("client").unwrap();
1456        let (tx, mut rx) = client.open_port();
1457
1458        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1459        let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1460
1461        let (callback_tx, mut callback_rx) = client.open_port();
1462        // Client sends the 1st message
1463        actor_ref
1464            .send(&client, Callback(callback_tx.bind()))
1465            .unwrap();
1466        let msg_port_ref = callback_rx.recv().await.unwrap();
1467        // client sends the 2nd message. At this time, GetSeqActor is still
1468        // processing the 1st message, and waiting for the 2nd message.
1469        msg_port_ref.send(&client, "finally".to_string()).unwrap();
1470
1471        let session_id = client.sequencer().session_id();
1472        // passing this assert means GetSeqActor processed the 2nd message.
1473        assert_eq!(
1474            rx.recv().await.unwrap(),
1475            (
1476                "finally".to_string(),
1477                SeqInfo::Session { session_id, seq: 1 }
1478            )
1479        );
1480    }
1481
1482    // Adding a delay before sending the destination proc. Useful for tests
1483    // requiring latency injection.
1484    #[derive(Clone, Debug)]
1485    struct DelayedMailboxSender {
1486        relay_tx: mpsc::UnboundedSender<MessageEnvelope>,
1487    }
1488
1489    impl DelayedMailboxSender {
1490        // Use a random latency between 0 and 1 second if the plan is empty.
1491        fn new(
1492            // The proc that hosts the dest actor. By posting envelope to this
1493            // proc, this proc will route that evenlope to the dest actor.
1494            dest_proc: Proc,
1495            // Vec index is the message seq - 1, value is the order this message
1496            // would be relayed to the dest actor. Dest actor is responsible to
1497            // ensure itself processes these messages in order.
1498            relay_orders: Vec<usize>,
1499        ) -> Self {
1500            let (relay_tx, mut relay_rx) = mpsc::unbounded_channel::<MessageEnvelope>();
1501
1502            tokio::spawn(async move {
1503                let mut buffer = Vec::new();
1504
1505                for _ in 0..relay_orders.len() {
1506                    let envelope = relay_rx.recv().await.unwrap();
1507                    buffer.push(envelope);
1508                }
1509
1510                for m in buffer.clone() {
1511                    let seq = match m.headers().get(SEQ_INFO).expect("seq should be set") {
1512                        SeqInfo::Session { seq, .. } => seq as usize,
1513                        SeqInfo::Direct => panic!("expected Session variant"),
1514                    };
1515                    // seq no is one-based.
1516                    let order = relay_orders[seq - 1];
1517                    buffer[order] = m;
1518                }
1519
1520                let dest_proc_clone = dest_proc.clone();
1521                for msg in buffer {
1522                    dest_proc_clone.post(msg, monitored_return_handle());
1523                }
1524            });
1525
1526            Self { relay_tx }
1527        }
1528    }
1529
1530    impl MailboxSender for DelayedMailboxSender {
1531        fn post_unchecked(
1532            &self,
1533            envelope: MessageEnvelope,
1534            _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1535        ) {
1536            self.relay_tx.send(envelope).unwrap();
1537        }
1538    }
1539
1540    async fn assert_out_of_order_delivery(expected: Vec<(String, u64)>, relay_orders: Vec<usize>) {
1541        let local_proc: Proc = Proc::local();
1542        let (client, _) = local_proc.instance("local").unwrap();
1543        let (tx, mut rx) = client.open_port();
1544
1545        let handle = local_proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1546        let actor_ref: reference::ActorRef<GetSeqActor> = handle.bind();
1547
1548        let remote_proc = Proc::configured(
1549            test_proc_id("remote_0"),
1550            DelayedMailboxSender::new(local_proc.clone(), relay_orders).boxed(),
1551        );
1552        let (remote_client, _) = remote_proc.instance("remote").unwrap();
1553        // Send the messages out in the order of their expected sequence numbers.
1554        let mut messages = expected.clone();
1555        messages.sort_by_key(|v| v.1);
1556        for (message, _seq) in messages {
1557            actor_ref.send(&remote_client, message).unwrap();
1558        }
1559        let session_id = remote_client.sequencer().session_id();
1560        for expect in expected {
1561            let expected = (
1562                expect.0,
1563                SeqInfo::Session {
1564                    session_id,
1565                    seq: expect.1,
1566                },
1567            );
1568            assert_eq!(rx.recv().await.unwrap(), expected);
1569        }
1570
1571        handle.drain_and_stop("test cleanup").unwrap();
1572        handle.await;
1573    }
1574
1575    // Send several messages, use DelayedMailboxSender and the relay orders to
1576    // ensure these messages will arrive at handler's workq out-of-order.
1577    // Then verify the actor handler will still process these messages based on
1578    // their sending order if reordering buffer is enabled.
1579    #[async_timed_test(timeout_secs = 30)]
1580    async fn test_sequencing_actor_ref_known_delivery_order() {
1581        let config = hyperactor_config::global::lock();
1582
1583        // relay order is second, third, first
1584        let relay_orders = vec![2, 0, 1];
1585
1586        // By disabling the actor side re-ordering buffer, the mssages will
1587        // be processed in the same order as they sent out.
1588        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, false);
1589        assert_out_of_order_delivery(
1590            vec![
1591                ("second".to_string(), 2),
1592                ("third".to_string(), 3),
1593                ("first".to_string(), 1),
1594            ],
1595            relay_orders.clone(),
1596        )
1597        .await;
1598
1599        // By enabling the actor side re-ordering buffer, the mssages will
1600        // be re-ordered before being processed.
1601        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1602        assert_out_of_order_delivery(
1603            vec![
1604                ("first".to_string(), 1),
1605                ("second".to_string(), 2),
1606                ("third".to_string(), 3),
1607            ],
1608            relay_orders.clone(),
1609        )
1610        .await;
1611    }
1612
1613    // Send a large nubmer of messages, use DelayedMailboxSender to ensure these
1614    // messages will arrive at handler's workq in a random order. Then verify the
1615    // actor handler will still process these messages based on their sending
1616    // order with reordering buffer enabled.
1617    #[async_timed_test(timeout_secs = 30)]
1618    async fn test_sequencing_actor_ref_random_delivery_order() {
1619        let config = hyperactor_config::global::lock();
1620
1621        // By enabling the actor side re-ordering buffer, the mssages will
1622        // be re-ordered before being processed.
1623        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1624        let expected = (0..10000)
1625            .map(|i| (format!("msg{i}"), i + 1))
1626            .collect::<Vec<_>>();
1627
1628        let mut relay_orders: Vec<usize> = (0..10000).collect();
1629        relay_orders.shuffle(&mut rand::thread_rng());
1630        assert_out_of_order_delivery(expected, relay_orders).await;
1631    }
1632
1633    /// Verifies the default blanket introspection handler for a plain
1634    /// actor.
1635    ///
1636    /// This test spawns a simple `EchoActor`, sends it
1637    /// `IntrospectMessage::Query`, and checks that the returned
1638    /// `IntrospectResult` matches the framework’s structural default:
1639    ///
1640    /// - `identity` matches the actor id
1641    /// - `attrs` contains actor-runtime keys (status, actor_type, etc.)
1642    /// - no supervision children are reported
1643    /// - `supervisor` is None because this actor is spawned as a
1644    ///   root/top-level actor in the proc (only supervised child actors
1645    ///   report a supervisor id).
1646    ///
1647    /// This exercises the end-to-end introspect task path rather than
1648    /// calling `live_actor_payload` directly, ensuring the runtime
1649    /// wiring behaves as expected.
1650    #[tokio::test]
1651    async fn test_introspect_query_default_payload() {
1652        let proc = Proc::local();
1653        let (client, _) = proc.instance("client").unwrap();
1654        let (tx, _rx) = client.open_port::<u64>();
1655        let actor = EchoActor(tx.bind());
1656        let handle = proc.spawn::<EchoActor>("echo_introspect", actor).unwrap();
1657
1658        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1659        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1660            .send(
1661                &client,
1662                IntrospectMessage::Query {
1663                    view: IntrospectView::Actor,
1664                    reply: reply_port.bind(),
1665                },
1666            )
1667            .unwrap();
1668        let payload = reply_rx.recv().await.unwrap();
1669
1670        assert_eq!(payload.identity, handle.actor_id().to_string());
1671        assert_valid_attrs(&payload);
1672        assert_has_attr(&payload, "status");
1673        assert_has_attr(&payload, "actor_type");
1674        assert_has_attr(&payload, "created_at");
1675        assert!(payload.children.is_empty());
1676        assert!(payload.parent.is_none());
1677
1678        handle.drain_and_stop("test").unwrap();
1679        handle.await;
1680    }
1681
1682    /// Helper: look up an attr in the attrs JSON by short name.
1683    fn attrs_get(attrs_json: &str, short_name: &str) -> Option<serde_json::Value> {
1684        use hyperactor_config::INTROSPECT;
1685        use hyperactor_config::attrs::AttrKeyInfo;
1686        let fq_name = inventory::iter::<AttrKeyInfo>()
1687            .find(|info| {
1688                info.meta
1689                    .get(INTROSPECT)
1690                    .is_some_and(|ia| ia.name == short_name)
1691            })
1692            .map(|info| info.name)?;
1693        let obj: serde_json::Value = serde_json::from_str(attrs_json).ok()?;
1694        obj.get(fq_name).cloned()
1695    }
1696
1697    /// Assert that an IntrospectResult has valid JSON attrs (IA-1).
1698    fn assert_valid_attrs(result: &IntrospectResult) {
1699        let parsed: serde_json::Value =
1700            serde_json::from_str(&result.attrs).expect("IA-1: attrs must be valid JSON");
1701        assert!(parsed.is_object(), "IA-1: attrs must be a JSON object");
1702    }
1703
1704    /// Assert the actor status attr matches expected value.
1705    fn assert_status(result: &IntrospectResult, expected: &str) {
1706        let status = attrs_get(&result.attrs, "status")
1707            .and_then(|v| v.as_str().map(String::from))
1708            .expect("attrs must contain status");
1709        assert_eq!(status, expected, "unexpected actor status");
1710    }
1711
1712    /// Assert the actor has a specific handler (or None).
1713    fn assert_handler(result: &IntrospectResult, expected: Option<&str>) {
1714        let handler =
1715            attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from));
1716        assert_eq!(handler.as_deref(), expected);
1717    }
1718
1719    /// Assert the error code attr matches expected value.
1720    fn assert_error_code(result: &IntrospectResult, expected: &str) {
1721        let code = attrs_get(&result.attrs, "error_code")
1722            .and_then(|v| v.as_str().map(String::from))
1723            .expect("attrs must contain error_code");
1724        assert_eq!(code, expected);
1725    }
1726
1727    /// Assert handler does NOT contain a substring.
1728    fn assert_handler_not_contains(result: &IntrospectResult, forbidden: &str) {
1729        if let Some(handler) =
1730            attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from))
1731        {
1732            assert!(
1733                !handler.contains(forbidden),
1734                "handler should not contain '{}'; got: {}",
1735                forbidden,
1736                handler
1737            );
1738        }
1739    }
1740
1741    /// Assert an attr is present by short name.
1742    fn assert_has_attr(result: &IntrospectResult, short_name: &str) {
1743        assert!(
1744            attrs_get(&result.attrs, short_name).is_some(),
1745            "attrs must contain '{}'",
1746            short_name
1747        );
1748    }
1749
1750    /// Assert status contains a substring (for non-exact checks
1751    /// like "processing" on wedged actors).
1752    fn assert_status_contains(result: &IntrospectResult, substring: &str) {
1753        let status = attrs_get(&result.attrs, "status")
1754            .and_then(|v| v.as_str().map(String::from))
1755            .expect("attrs must contain status");
1756        assert!(
1757            status.contains(substring),
1758            "status should contain '{}'; got: {}",
1759            substring,
1760            status
1761        );
1762    }
1763
1764    /// Assert no status_reason attr (IA-3: non-terminal status).
1765    fn assert_no_status_reason(result: &IntrospectResult) {
1766        assert!(
1767            attrs_get(&result.attrs, "status_reason").is_none(),
1768            "IA-3: must not have status_reason"
1769        );
1770    }
1771
1772    /// Assert a handler is present (any value).
1773    fn assert_has_handler(result: &IntrospectResult) {
1774        assert!(
1775            attrs_get(&result.attrs, "last_handler").is_some(),
1776            "must have a handler"
1777        );
1778    }
1779
1780    /// Assert no failure attrs are present (IA-4).
1781    fn assert_no_failure_attrs(result: &IntrospectResult) {
1782        assert!(
1783            attrs_get(&result.attrs, "failure_error_message").is_none(),
1784            "IA-4: must not have failure attrs"
1785        );
1786    }
1787
1788    /// Establishes IA-1 (attrs-json), IA-3 (status-shape), and
1789    /// IA-4 (failure-shape) for the running-actor path only.
1790    /// Stopped/failed paths need separate tests (see proc.rs
1791    /// terminated snapshot tests).
1792    #[tokio::test]
1793    async fn test_ia1_ia4_running_actor_attrs() {
1794        let proc = Proc::local();
1795        let (client, _) = proc.instance("client").unwrap();
1796        let (tx, _rx) = client.open_port::<u64>();
1797        let actor = EchoActor(tx.bind());
1798        let handle = proc.spawn::<EchoActor>("ia_test", actor).unwrap();
1799
1800        let payload = crate::introspect::live_actor_payload(handle.cell());
1801
1802        // IA-1: valid JSON.
1803        assert_valid_attrs(&payload);
1804
1805        // IA-3: non-terminal status, no status_reason.
1806        assert_has_attr(&payload, "status");
1807        assert_no_status_reason(&payload);
1808
1809        // IA-4: no failure attrs.
1810        assert_no_failure_attrs(&payload);
1811
1812        handle.drain_and_stop("test").unwrap();
1813        handle.await;
1814    }
1815
1816    // Verifies that QueryChild returns an error for actors without
1817    // a registered query_child_handler callback. The runtime
1818    // introspect task responds with the error sentinel payload
1819    // (`identity == ""`, error attrs with code "not_found",
1820    // .. }`).
1821    #[tokio::test]
1822    async fn test_introspect_query_child_not_found() {
1823        let proc = Proc::local();
1824        let (client, _) = proc.instance("client").unwrap();
1825        let (tx, _rx) = client.open_port::<u64>();
1826        let actor = EchoActor(tx.bind());
1827        let handle = proc.spawn::<EchoActor>("echo_qc", actor).unwrap();
1828
1829        let child_ref =
1830            reference::Reference::Actor(test_proc_id("nonexistent").actor_id("child", 0));
1831        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1832        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1833            .send(
1834                &client,
1835                IntrospectMessage::QueryChild {
1836                    child_ref,
1837                    reply: reply_port.bind(),
1838                },
1839            )
1840            .unwrap();
1841        let payload = reply_rx.recv().await.unwrap();
1842
1843        assert!(payload.identity.is_empty());
1844        assert_error_code(&payload, "not_found");
1845
1846        handle.drain_and_stop("test").unwrap();
1847        handle.await;
1848    }
1849
1850    // Verifies that with the runtime introspect task, custom
1851    // `handle_introspect` overrides are not called. The runtime
1852    // task intercepts IntrospectMessage before it reaches the
1853    // actor's work queue. An actor with an override still gets
1854    // standard Actor properties from the runtime task.
1855    #[tokio::test]
1856    async fn test_introspect_override() {
1857        #[derive(Debug, Default)]
1858        #[hyperactor::export(handlers = [])]
1859        struct CustomIntrospectActor;
1860
1861        #[async_trait]
1862        impl Actor for CustomIntrospectActor {}
1863
1864        let proc = Proc::local();
1865        let (client, _) = proc.instance("client").unwrap();
1866        let handle = proc
1867            .spawn("custom_introspect", CustomIntrospectActor)
1868            .unwrap();
1869
1870        handle
1871            .status()
1872            .wait_for(|s| matches!(s, ActorStatus::Idle))
1873            .await
1874            .unwrap();
1875
1876        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1877        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1878            .send(
1879                &client,
1880                IntrospectMessage::Query {
1881                    view: IntrospectView::Actor,
1882                    reply: reply_port.bind(),
1883                },
1884            )
1885            .unwrap();
1886        let payload = reply_rx.recv().await.unwrap();
1887
1888        // The runtime task returns actor attrs (with status), NOT
1889        // the override's Host properties.
1890        assert_has_attr(&payload, "status");
1891
1892        handle.drain_and_stop("test").unwrap();
1893        handle.await;
1894    }
1895
1896    /// Verifies that a child actor spawned via `spawn_child` reports
1897    /// its parent as `supervisor` in the introspection payload, and
1898    /// that the parent's payload lists the child in `children`.
1899    #[tokio::test]
1900    async fn test_introspect_query_supervision_child() {
1901        let proc = Proc::local();
1902        let (client, _) = proc.instance("client").unwrap();
1903
1904        // Spawn parent.
1905        let (tx_parent, _rx_parent) = client.open_port::<u64>();
1906        let parent_handle = proc
1907            .spawn::<EchoActor>("parent", EchoActor(tx_parent.bind()))
1908            .unwrap();
1909
1910        // Spawn child under parent.
1911        let (tx_child, _rx_child) = client.open_port::<u64>();
1912        let child_handle = proc
1913            .spawn_child::<EchoActor>(parent_handle.cell().clone(), EchoActor(tx_child.bind()))
1914            .unwrap();
1915
1916        // Query the child — supervisor should be the parent.
1917        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1918        reference::PortRef::<IntrospectMessage>::attest_message_port(child_handle.actor_id())
1919            .send(
1920                &client,
1921                IntrospectMessage::Query {
1922                    view: IntrospectView::Actor,
1923                    reply: reply_port.bind(),
1924                },
1925            )
1926            .unwrap();
1927        let child_payload = reply_rx.recv().await.unwrap();
1928
1929        assert_eq!(child_payload.identity, child_handle.actor_id().to_string(),);
1930        // Verify it has actor attrs (status present).
1931        assert!(
1932            attrs_get(&child_payload.attrs, "status").is_some(),
1933            "child should have actor attrs"
1934        );
1935        assert_eq!(
1936            child_payload.parent,
1937            Some(parent_handle.actor_id().to_string()),
1938        );
1939
1940        // Query the parent — children should include the child.
1941        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1942        reference::PortRef::<IntrospectMessage>::attest_message_port(parent_handle.actor_id())
1943            .send(
1944                &client,
1945                IntrospectMessage::Query {
1946                    view: IntrospectView::Actor,
1947                    reply: reply_port.bind(),
1948                },
1949            )
1950            .unwrap();
1951        let parent_payload = reply_rx.recv().await.unwrap();
1952
1953        assert!(parent_payload.parent.is_none());
1954        assert!(
1955            parent_payload
1956                .children
1957                .contains(&child_handle.actor_id().to_string()),
1958        );
1959
1960        child_handle.drain_and_stop("test").unwrap();
1961        child_handle.await;
1962        parent_handle.drain_and_stop("test").unwrap();
1963        parent_handle.await;
1964    }
1965
1966    /// A freshly spawned actor that has received no user messages
1967    /// reports `last_message_handler == None` — the introspect
1968    /// handler does not leak through. Status is `"idle"` once
1969    /// initialization completes.
1970    #[tokio::test]
1971    async fn test_introspect_fresh_actor_status() {
1972        let proc = Proc::local();
1973        let (client, _) = proc.instance("client").unwrap();
1974        let (tx, _rx) = client.open_port::<u64>();
1975        let actor = EchoActor(tx.bind());
1976        let handle = proc.spawn::<EchoActor>("echo_fresh", actor).unwrap();
1977
1978        // Wait for the actor to finish initialization.
1979        handle
1980            .status()
1981            .wait_for(|s| matches!(s, ActorStatus::Idle))
1982            .await
1983            .unwrap();
1984
1985        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1986        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1987            .send(
1988                &client,
1989                IntrospectMessage::Query {
1990                    view: IntrospectView::Actor,
1991                    reply: reply_port.bind(),
1992                },
1993            )
1994            .unwrap();
1995        let payload = reply_rx.recv().await.unwrap();
1996
1997        assert_status(&payload, "idle");
1998        assert_handler(&payload, None);
1999
2000        handle.drain_and_stop("test").unwrap();
2001        handle.await;
2002    }
2003
2004    /// After processing a user message, the introspect payload reports
2005    /// the user message's handler and post-completion status — not
2006    /// the introspect handler itself (one-behind invariant,
2007    /// after-user-traffic case).
2008    #[tokio::test]
2009    async fn test_introspect_after_user_message() {
2010        let proc = Proc::local();
2011        let (client, _) = proc.instance("client").unwrap();
2012        let (tx, mut rx) = client.open_port::<u64>();
2013        let actor = EchoActor(tx.bind());
2014        let handle = proc.spawn::<EchoActor>("echo_after_msg", actor).unwrap();
2015
2016        // Send a user message and wait for it to be processed.
2017        handle.send(&client, 42u64).unwrap();
2018        let _ = rx.recv().await.unwrap();
2019
2020        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2021        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2022            .send(
2023                &client,
2024                IntrospectMessage::Query {
2025                    view: IntrospectView::Actor,
2026                    reply: reply_port.bind(),
2027                },
2028            )
2029            .unwrap();
2030        let payload = reply_rx.recv().await.unwrap();
2031
2032        assert_status(&payload, "idle");
2033        assert_has_handler(&payload);
2034        assert_handler_not_contains(&payload, "IntrospectMessage");
2035
2036        handle.drain_and_stop("test").unwrap();
2037        handle.await;
2038    }
2039
2040    /// Two consecutive introspect queries: with the runtime
2041    /// introspect task, neither perturbs the actor's state (S2).
2042    /// Both report the same `last_message_handler` for a fresh
2043    /// actor — `None`, not `IntrospectMessage`.
2044    #[tokio::test]
2045    async fn test_introspect_consecutive_queries() {
2046        let proc = Proc::local();
2047        let (client, _) = proc.instance("client").unwrap();
2048        let (tx, _rx) = client.open_port::<u64>();
2049        let actor = EchoActor(tx.bind());
2050        let handle = proc.spawn::<EchoActor>("echo_consec", actor).unwrap();
2051
2052        handle
2053            .status()
2054            .wait_for(|s| matches!(s, ActorStatus::Idle))
2055            .await
2056            .unwrap();
2057
2058        // First introspect query.
2059        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2060        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2061            .send(
2062                &client,
2063                IntrospectMessage::Query {
2064                    view: IntrospectView::Actor,
2065                    reply: reply_port.bind(),
2066                },
2067            )
2068            .unwrap();
2069        let payload1 = reply_rx.recv().await.unwrap();
2070
2071        // Second introspect query.
2072        let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2073        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2074            .send(
2075                &client,
2076                IntrospectMessage::Query {
2077                    view: IntrospectView::Actor,
2078                    reply: reply_port2.bind(),
2079                },
2080            )
2081            .unwrap();
2082        let payload2 = reply_rx2.recv().await.unwrap();
2083
2084        // Neither should show IntrospectMessage as the handler.
2085        assert_handler(&payload1, None);
2086        assert_handler(&payload2, None);
2087
2088        handle.drain_and_stop("test").unwrap();
2089        handle.await;
2090    }
2091
2092    // test_published_properties_round_trip removed — replaced by
2093    // test_publish_attrs_round_trip which tests the Attrs-based API.
2094
2095    /// Verify InstanceCell Attrs storage: `set_published_attrs`
2096    /// replaces the whole bag, `merge_published_attr` merges a single
2097    /// key incrementally. (Instance methods are thin wrappers over
2098    /// these.)
2099    #[tokio::test]
2100    async fn test_publish_attrs_round_trip() {
2101        use hyperactor_config::Attrs;
2102        use hyperactor_config::declare_attrs;
2103
2104        declare_attrs! {
2105            attr TEST_KEY_A: String;
2106            attr TEST_KEY_B: u64;
2107        }
2108
2109        let proc = Proc::local();
2110        let (client, _) = proc.instance("client").unwrap();
2111        let (tx, _rx) = client.open_port::<u64>();
2112        let actor = EchoActor(tx.bind());
2113        let handle = proc.spawn::<EchoActor>("echo_attrs", actor).unwrap();
2114
2115        // Before publishing, attrs are None.
2116        assert!(handle.cell().published_attrs().is_none());
2117
2118        // publish_attrs: replace entire bag.
2119        let mut attrs = Attrs::new();
2120        attrs.set(TEST_KEY_A, "hello".to_string());
2121        handle.cell().set_published_attrs(attrs);
2122        let published = handle.cell().published_attrs().unwrap();
2123        assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2124
2125        // publish_attr: merge single key into existing bag.
2126        handle.cell().merge_published_attr(TEST_KEY_B, 42u64);
2127        let published = handle.cell().published_attrs().unwrap();
2128        assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2129        assert_eq!(published.get(TEST_KEY_B), Some(&42u64));
2130
2131        // publish_attr: overwrite existing key.
2132        handle
2133            .cell()
2134            .merge_published_attr(TEST_KEY_A, "world".to_string());
2135        let published = handle.cell().published_attrs().unwrap();
2136        assert_eq!(published.get(TEST_KEY_A), Some(&"world".to_string()));
2137
2138        handle.drain_and_stop("test").unwrap();
2139        handle.await;
2140    }
2141
2142    /// Verify the query_child_handler callback: register a callback,
2143    /// invoke it via `query_child()`, and confirm the response.
2144    #[tokio::test]
2145    async fn test_query_child_handler_round_trip() {
2146        let proc = Proc::local();
2147        let (client, _) = proc.instance("client").unwrap();
2148        let (tx, _rx) = client.open_port::<u64>();
2149        let actor = EchoActor(tx.bind());
2150        let handle = proc.spawn::<EchoActor>("echo_qch", actor).unwrap();
2151
2152        // Before registering, query_child returns None.
2153        let test_ref = reference::Reference::Actor(test_proc_id("test").actor_id("child", 0));
2154        assert!(handle.cell().query_child(&test_ref).is_none());
2155
2156        // Register a callback.
2157        handle
2158            .cell()
2159            .set_query_child_handler(|child_ref| IntrospectResult {
2160                identity: child_ref.to_string(),
2161                attrs: serde_json::json!({
2162                    "proc_name": "test_proc",
2163                    "num_actors": 42,
2164                })
2165                .to_string(),
2166                children: Vec::new(),
2167                parent: None,
2168                as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now()).to_string(),
2169            });
2170
2171        // Now query_child returns the callback's response.
2172        let payload = handle
2173            .cell()
2174            .query_child(&test_ref)
2175            .expect("callback should produce a payload");
2176        assert_eq!(payload.identity, test_ref.to_string());
2177        let attrs: serde_json::Value =
2178            serde_json::from_str(&payload.attrs).expect("attrs must be valid JSON");
2179        assert_eq!(
2180            attrs.get("proc_name").and_then(|v| v.as_str()),
2181            Some("test_proc")
2182        );
2183        assert_eq!(attrs.get("num_actors").and_then(|v| v.as_u64()), Some(42));
2184
2185        handle.drain_and_stop("test").unwrap();
2186        handle.await;
2187    }
2188
2189    /// Exercises S1 (see `introspect` module doc).
2190    ///
2191    /// Sends a wedging message, then queries introspect while the
2192    /// actor is blocked. The response must arrive and report live
2193    /// processing status.
2194    #[tokio::test]
2195    async fn test_introspect_wedged() {
2196        #[derive(Debug, Default)]
2197        #[hyperactor::export(handlers = [u64])]
2198        struct WedgedActor;
2199
2200        #[async_trait]
2201        impl Actor for WedgedActor {}
2202
2203        #[async_trait]
2204        impl Handler<u64> for WedgedActor {
2205            async fn handle(
2206                &mut self,
2207                _cx: &Context<Self>,
2208                _message: u64,
2209            ) -> Result<(), anyhow::Error> {
2210                // Block forever.
2211                std::future::pending::<()>().await;
2212                Ok(())
2213            }
2214        }
2215
2216        let proc = Proc::local();
2217        let (client, _) = proc.instance("client").unwrap();
2218        let handle = proc.spawn("wedged", WedgedActor).unwrap();
2219
2220        // Wait for idle before sending the wedging message.
2221        handle
2222            .status()
2223            .wait_for(|s| matches!(s, ActorStatus::Idle))
2224            .await
2225            .unwrap();
2226
2227        // Send a u64 to wedge the actor in its handler.
2228        handle.send(&client, 1u64).unwrap();
2229
2230        // Wait for the handler to start blocking.
2231        tokio::time::sleep(Duration::from_millis(50)).await;
2232
2233        // Send introspect query via the dedicated introspect port.
2234        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2235        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2236            .send(
2237                &client,
2238                IntrospectMessage::Query {
2239                    view: IntrospectView::Actor,
2240                    reply: reply_port.bind(),
2241                },
2242            )
2243            .unwrap();
2244
2245        // Must not hang — the introspect task runs independently.
2246        let payload = tokio::time::timeout(Duration::from_secs(5), reply_rx.recv())
2247            .await
2248            .expect("introspect should not hang on a wedged actor")
2249            .unwrap();
2250
2251        assert_status_contains(&payload, "processing");
2252        assert_handler_not_contains(&payload, "IntrospectMessage");
2253    }
2254
2255    /// Exercises S2 (see `introspect` module doc).
2256    ///
2257    /// After a user message, two consecutive introspect queries both
2258    /// report the user message handler.
2259    #[tokio::test]
2260    async fn test_introspect_no_perturbation() {
2261        let proc = Proc::local();
2262        let (client, _) = proc.instance("client").unwrap();
2263        let (tx, mut rx) = client.open_port::<u64>();
2264        let actor = EchoActor(tx.bind());
2265        let handle = proc.spawn::<EchoActor>("echo_no_perturb", actor).unwrap();
2266
2267        // Wait for idle before sending the user message.
2268        handle
2269            .status()
2270            .wait_for(|s| matches!(s, ActorStatus::Idle))
2271            .await
2272            .unwrap();
2273
2274        // Send a user message and wait for it to be processed.
2275        handle.send(&client, 42u64).unwrap();
2276        let _ = rx.recv().await.unwrap();
2277
2278        // First introspect query.
2279        let (reply_port1, reply_rx1) = client.open_once_port::<IntrospectResult>();
2280        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2281            .send(
2282                &client,
2283                IntrospectMessage::Query {
2284                    view: IntrospectView::Actor,
2285                    reply: reply_port1.bind(),
2286                },
2287            )
2288            .unwrap();
2289        let payload1 = reply_rx1.recv().await.unwrap();
2290
2291        // Second introspect query.
2292        let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2293        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2294            .send(
2295                &client,
2296                IntrospectMessage::Query {
2297                    view: IntrospectView::Actor,
2298                    reply: reply_port2.bind(),
2299                },
2300            )
2301            .unwrap();
2302        let payload2 = reply_rx2.recv().await.unwrap();
2303
2304        // Both should report the user message handler, not IntrospectMessage.
2305        assert_handler_not_contains(&payload1, "IntrospectMessage");
2306        assert_handler_not_contains(&payload2, "IntrospectMessage");
2307        // Consecutive queries must agree (compare parsed, not raw
2308        // strings — HashMap key ordering is non-deterministic).
2309        let attrs1: serde_json::Value = serde_json::from_str(&payload1.attrs).unwrap();
2310        let attrs2: serde_json::Value = serde_json::from_str(&payload2.attrs).unwrap();
2311        assert_eq!(attrs1, attrs2, "consecutive queries should be identical");
2312
2313        handle.drain_and_stop("test").unwrap();
2314        handle.await;
2315    }
2316
2317    /// Exercises CI-1 (see `proc` module doc).
2318    ///
2319    /// Unlike a plain `instance()`, which drops the introspect
2320    /// receiver so queries are silently discarded, an
2321    /// `introspectable_instance` has a live `serve_introspect` task
2322    /// and is fully navigable in admin tooling.
2323    #[tokio::test]
2324    async fn test_introspectable_instance_responds_to_query() {
2325        let proc = Proc::local();
2326        let (bridge, handle) = proc.introspectable_instance("bridge").unwrap();
2327        let actor_id = handle.actor_id().clone();
2328
2329        let (reply_port, reply_rx) = bridge.open_once_port::<IntrospectResult>();
2330        reference::PortRef::<IntrospectMessage>::attest_message_port(&actor_id)
2331            .send(
2332                &bridge,
2333                IntrospectMessage::Query {
2334                    view: IntrospectView::Actor,
2335                    reply: reply_port.bind(),
2336                },
2337            )
2338            .unwrap();
2339        let payload = reply_rx.recv().await.unwrap();
2340
2341        // CI-1: introspectable_instance reports status "client"
2342        // and actor_type "()" (the unit type).
2343        assert_eq!(payload.identity, actor_id.to_string());
2344        assert_status(&payload, "client");
2345        let actor_type = attrs_get(&payload.attrs, "actor_type")
2346            .and_then(|v| v.as_str().map(String::from))
2347            .expect("must have actor_type");
2348        assert_eq!(actor_type, "()", "CI-1: actor_type must be \"()\"");
2349    }
2350
2351    /// Contrast with CI-1: a plain `instance()` does NOT respond to
2352    /// `IntrospectMessage::Query`. Its introspect receiver is dropped
2353    /// in `Proc::instance()`, so the message is silently discarded
2354    /// and the reply port never receives a value.
2355    ///
2356    /// Callers that need TUI visibility must use
2357    /// `introspectable_instance` instead.
2358    #[tokio::test]
2359    async fn test_instance_does_not_respond_to_query() {
2360        let proc = Proc::local();
2361        let (client, _client_handle) = proc.instance("client").unwrap();
2362        let (_mailbox, mailbox_handle) = proc.instance("mailbox").unwrap();
2363        let mailbox_id = mailbox_handle.actor_id().clone();
2364
2365        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2366        reference::PortRef::<IntrospectMessage>::attest_message_port(&mailbox_id)
2367            .send(
2368                &client,
2369                IntrospectMessage::Query {
2370                    view: IntrospectView::Actor,
2371                    reply: reply_port.bind(),
2372                },
2373            )
2374            .unwrap();
2375
2376        // The introspect receiver was dropped in `instance()`, so the
2377        // message is silently discarded and the reply never arrives.
2378        let result = tokio::time::timeout(Duration::from_millis(100), reply_rx.recv()).await;
2379        assert!(
2380            result.is_err(),
2381            "instance() must not respond to IntrospectMessage (introspect receiver dropped)"
2382        );
2383    }
2384
2385    /// Exercises CI-2 (see `proc` module doc).
2386    ///
2387    /// Dropping the instance transitions status to terminal,
2388    /// causing `serve_introspect` to store a terminated snapshot.
2389    #[tokio::test]
2390    async fn test_introspectable_instance_snapshot_on_drop() {
2391        let proc = Proc::local();
2392        let (instance, handle) = proc.introspectable_instance("bridge").unwrap();
2393        let actor_id = handle.actor_id().clone();
2394
2395        assert!(
2396            proc.all_actor_ids().contains(&actor_id),
2397            "should appear in all_actor_ids while live"
2398        );
2399
2400        // Dropping `instance` transitions status to Stopped, waking
2401        // the serve_introspect task which stores the snapshot.
2402        drop(instance);
2403
2404        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2405        loop {
2406            if proc.terminated_snapshot(&actor_id).is_some() {
2407                break;
2408            }
2409            assert!(
2410                std::time::Instant::now() < deadline,
2411                "timed out waiting for terminated snapshot"
2412            );
2413            tokio::task::yield_now().await;
2414        }
2415
2416        let snapshot = proc.terminated_snapshot(&actor_id).unwrap();
2417        let actor_status = attrs_get(&snapshot.attrs, "status")
2418            .and_then(|v| v.as_str().map(String::from))
2419            .expect("snapshot attrs must contain status");
2420        assert!(
2421            actor_status.starts_with("stopped"),
2422            "CI-2: snapshot actor_status should be stopped, got: {}",
2423            actor_status
2424        );
2425    }
2426}