hyperactor/
actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(dead_code)] // Allow until this is used outside of tests.
10
11//! This module contains all the core traits required to define and manage actors.
12
13use std::any::TypeId;
14use std::borrow::Cow;
15use std::fmt;
16use std::fmt::Debug;
17use std::future::Future;
18use std::future::IntoFuture;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::SystemTime;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use hyperactor_config::Flattrs;
28use serde::Deserialize;
29use serde::Serialize;
30use tokio::sync::watch;
31use tokio::task::JoinHandle;
32use typeuri::Named;
33
34use crate as hyperactor; // for macros
35use crate::Data;
36use crate::Message;
37use crate::RemoteMessage;
38use crate::context;
39use crate::mailbox::MailboxError;
40use crate::mailbox::MailboxSenderError;
41use crate::mailbox::MessageEnvelope;
42use crate::mailbox::PortHandle;
43use crate::mailbox::Undeliverable;
44use crate::mailbox::UndeliverableMessageError;
45use crate::message::Castable;
46use crate::message::IndexedErasedUnbound;
47use crate::proc::Context;
48use crate::proc::Instance;
49use crate::proc::InstanceCell;
50use crate::proc::Ports;
51use crate::proc::Proc;
52use crate::reference;
53use crate::supervision::ActorSupervisionEvent;
54
55pub mod remote;
56
57/// An Actor is an independent, asynchronous thread of execution. Each
58/// actor instance has a mailbox, whose messages are delivered through
59/// the method [`Actor::handle`].
60///
61/// Actors communicate with each other by way of message passing.
62/// Actors are assumed to be _deterministic_: that is, the state of an
63/// actor is determined by the set (and order) of messages it receives.
64#[async_trait]
65pub trait Actor: Sized + Send + 'static {
66    /// Initialize the actor, after the runtime has been fully initialized.
67    /// Init thus provides a mechanism by which an actor can reliably and always
68    /// receive some initial event that can be used to kick off further
69    /// (potentially delayed) processing.
70    async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
71        // Default implementation: no init.
72        Ok(())
73    }
74
75    /// Cleanup things used by this actor before shutting down. Notably this function
76    /// is async and allows more complex cleanup. Simpler cleanup can be handled
77    /// by the impl Drop for this Actor.
78    /// If err is not None, it is the error that this actor is failing with. Any
79    /// errors returned by this function will be logged and ignored.
80    /// If err is None, any errors returned by this function will be propagated
81    /// as an ActorError.
82    /// This function is not called if there is a panic in the actor, as the
83    /// actor may be in an indeterminate state. It is also not called if the
84    /// process is killed, there is no atexit handler or signal handler.
85    async fn cleanup(
86        &mut self,
87        _this: &Instance<Self>,
88        _err: Option<&ActorError>,
89    ) -> Result<(), anyhow::Error> {
90        // Default implementation: no cleanup.
91        Ok(())
92    }
93
94    /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
95    /// The spawned actor will be supervised by the parent (spawning) actor.
96    fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
97        cx.instance().spawn(self)
98    }
99
100    /// Spawn a named child actor. Same supervision semantics as
101    /// `spawn`, but the child gets `name` in its ActorId.
102    fn spawn_with_name(
103        self,
104        cx: &impl context::Actor,
105        name: &str,
106    ) -> anyhow::Result<ActorHandle<Self>> {
107        cx.instance().spawn_with_name(name, self)
108    }
109
110    /// Spawns this actor in a detached state, handling its messages
111    /// in a background task. The returned handle is used to control
112    /// the actor's lifecycle and to interact with it.
113    ///
114    /// Actors spawned through `spawn_detached` are not attached to a supervision
115    /// hierarchy, and not managed by a [`Proc`].
116    fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
117        Proc::local().spawn("anon", self)
118    }
119
120    /// This method is used by the runtime to spawn the actor server. It can be
121    /// used by actors that require customized runtime setups
122    /// (e.g., dedicated actor threads), or want to use a custom tokio runtime.
123    #[hyperactor::instrument_infallible]
124    fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
125    where
126        F: Future + Send + 'static,
127        F::Output: Send + 'static,
128    {
129        tokio::spawn(future)
130    }
131
132    /// Handle actor supervision event. Return `Ok(true)`` if the event is handled here.
133    async fn handle_supervision_event(
134        &mut self,
135        _this: &Instance<Self>,
136        event: &ActorSupervisionEvent,
137    ) -> Result<bool, anyhow::Error> {
138        // Error events are not handled by default and bubble up to the parent.
139        // Normal lifecycle events (e.g. clean stop) are absorbed.
140        Ok(!event.is_error())
141    }
142
143    /// Default undeliverable message handling behavior.
144    async fn handle_undeliverable_message(
145        &mut self,
146        cx: &Instance<Self>,
147        envelope: Undeliverable<MessageEnvelope>,
148    ) -> Result<(), anyhow::Error> {
149        handle_undeliverable_message(cx, envelope)
150    }
151
152    /// If overridden, we will use this name in place of the
153    /// ActorId for talking about this actor in supervision error
154    /// messages.
155    fn display_name(&self) -> Option<String> {
156        None
157    }
158}
159
160/// Default implementation of [`Actor::handle_undeliverable_message`]. Defined
161/// as a free function so that `Actor` implementations that override
162/// [`Actor::handle_undeliverable_message`] can fallback to this default.
163pub fn handle_undeliverable_message<A: Actor>(
164    cx: &Instance<A>,
165    Undeliverable(envelope): Undeliverable<MessageEnvelope>,
166) -> Result<(), anyhow::Error> {
167    assert_eq!(envelope.sender(), cx.self_id());
168
169    anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
170}
171
172/// An actor that does nothing. It is used to represent "client only" actors,
173/// returned by [`Proc::instance`].
174#[async_trait]
175impl Actor for () {}
176
177impl Referable for () {}
178
179impl Binds<()> for () {
180    fn bind(_ports: &Ports<Self>) {
181        // Binds no ports.
182    }
183}
184
185/// A Handler allows an actor to handle a specific message type.
186#[async_trait]
187pub trait Handler<M>: Actor {
188    /// Handle the next M-typed message.
189    async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
190}
191
192/// We provide this handler to indicate that actors can handle the [`Signal`] message.
193/// Its actual handler is implemented by the runtime.
194#[async_trait]
195impl<A: Actor> Handler<Signal> for A {
196    async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
197        unimplemented!("signal handler should not be called directly")
198    }
199}
200
201/// This handler provides a default behavior when a message sent by
202/// the actor to another is returned due to delivery failure.
203#[async_trait]
204impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
205    async fn handle(
206        &mut self,
207        cx: &Context<Self>,
208        message: Undeliverable<MessageEnvelope>,
209    ) -> Result<(), anyhow::Error> {
210        let sender = message.0.sender().clone();
211        let dest = message.0.dest().clone();
212        let error = message.0.error_msg().unwrap_or(String::new());
213        match self.handle_undeliverable_message(cx, message).await {
214            Ok(_) => {
215                tracing::debug!(
216                    actor_id = %cx.self_id(),
217                    name = "undeliverable_message_handled",
218                    %sender,
219                    %dest,
220                    error,
221                );
222                Ok(())
223            }
224            Err(e) => {
225                tracing::error!(
226                    actor_id = %cx.self_id(),
227                    name = "undeliverable_message",
228                    %sender,
229                    %dest,
230                    error,
231                    handler_error = %e,
232                );
233                Err(e)
234            }
235        }
236    }
237}
238
239/// This handler enables actors to unbind the [IndexedErasedUnbound]
240/// message, and forward the result to corresponding handler.
241#[async_trait]
242impl<A, M> Handler<IndexedErasedUnbound<M>> for A
243where
244    A: Handler<M>,
245    M: Castable,
246{
247    async fn handle(
248        &mut self,
249        cx: &Context<Self>,
250        msg: IndexedErasedUnbound<M>,
251    ) -> anyhow::Result<()> {
252        let message = msg.downcast()?.bind()?;
253        Handler::handle(self, cx, message).await
254    }
255}
256
257/// An `Actor` that can be spawned remotely.
258///
259/// Bounds explained:
260/// - `Actor`: only actors may be remotely spawned.
261/// - `Referable`: marks the type as eligible for typed remote
262///   references (`ActorRef<A>`); required because remote spawn
263///   ultimately hands back an `ActorId` that higher-level APIs may
264///   re-type as `ActorRef<A>`.
265/// - `Binds<Self>`: lets the runtime wire this actor's message ports
266///   when it is spawned (the blanket impl calls `handle.bind::<Self>()`).
267///
268/// `gspawn` is a type-erased entry point used by the remote
269/// spawn/registry machinery. It takes serialized params and returns
270/// the new actor's `ActorId`; application code shouldn't call it
271/// directly.
272#[async_trait]
273pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
274    /// The type of parameters used to instantiate the actor remotely.
275    type Params: RemoteMessage;
276
277    /// Creates a new actor instance given its instantiation parameters.
278    /// The `environment` allows whoever is responsible for spawning this actor
279    /// to pass in additional context that may be useful.
280    async fn new(params: Self::Params, environment: Flattrs) -> anyhow::Result<Self>;
281
282    /// A type-erased entry point to spawn this actor. This is
283    /// primarily used by hyperactor's remote actor registration
284    /// mechanism.
285    // TODO: consider making this 'private' -- by moving it into a non-public trait as in [`cap`].
286    fn gspawn(
287        proc: &Proc,
288        name: &str,
289        serialized_params: Data,
290        environment: Flattrs,
291    ) -> Pin<Box<dyn Future<Output = Result<reference::ActorId, anyhow::Error>> + Send>> {
292        let proc = proc.clone();
293        let name = name.to_string();
294        Box::pin(async move {
295            let params = bincode::deserialize(&serialized_params)?;
296            let actor = Self::new(params, environment).await?;
297            let handle = proc.spawn(&name, actor)?;
298            // We return only the ActorId, not a typed ActorRef.
299            // Callers that hold this ID can interact with the actor
300            // only via the serialized/opaque messaging path, which
301            // makes it safe to export across process boundaries.
302            //
303            // Note: the actor itself is still `A`-typed here; we
304            // merely restrict the *capability* we hand out to an
305            // untyped identifier.
306            //
307            // This will be replaced by a proper export/registry
308            // mechanism.
309            Ok(handle.bind::<Self>().actor_id)
310        })
311    }
312
313    /// The type ID of this actor.
314    fn get_type_id() -> TypeId {
315        TypeId::of::<Self>()
316    }
317}
318
319/// If an actor implements Default, we use this as the
320/// `RemoteSpawn` implementation, too.
321#[async_trait]
322impl<A: Actor + Referable + Binds<Self> + Default> RemoteSpawn for A {
323    type Params = ();
324
325    async fn new(_params: Self::Params, _environment: Flattrs) -> anyhow::Result<Self> {
326        Ok(Default::default())
327    }
328}
329
/// Errors that occur while serving actors. Each error is associated
/// with the ID of the actor being served.
///
/// Displays as `serving <actor id>: <kind>`.
// NOTE(review): both fields are boxed, presumably to keep `ActorError` (and
// every `Result<_, ActorError>`) small — confirm intent before unboxing.
#[derive(Debug)]
pub struct ActorError {
    /// The ActorId for the actor that generated this error.
    pub actor_id: Box<reference::ActorId>,
    /// The kind of error that occurred.
    pub kind: Box<ActorErrorKind>,
}
339
340/// The kinds of actor serving errors.
341#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
342pub enum ActorErrorKind {
343    /// Generic error with a formatted message.
344    #[error("{0}")]
345    Generic(String),
346
347    /// An error that occurred while trying to handle a supervision event.
348    #[error("{0} while handling {1}")]
349    ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
350
351    /// The actor did not attempt to handle
352    #[error("{0}")]
353    UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
354
355    /// The actor was explicitly aborted with the provided reason.
356    #[error("actor explicitly aborted due to: {0}")]
357    Aborted(String),
358}
359
360impl ActorErrorKind {
361    /// Error while processing actor, i.e., returned by the actor's
362    /// processing method.
363    pub fn processing(err: anyhow::Error) -> Self {
364        // Unbox err from the anyhow err. Check if it is an ActorErrorKind object.
365        // If it is directly use it as the new ActorError's ActorErrorKind.
366        // This lets us directly pass the ActorErrorKind::UnhandledSupervisionEvent
367        // up the handling infrastructure.
368        err.downcast::<ActorErrorKind>()
369            .unwrap_or_else(|err| Self::Generic(err.to_string()))
370    }
371
372    /// Unwound stracktrace of a panic.
373    pub fn panic(err: anyhow::Error) -> Self {
374        Self::Generic(format!("panic: {}", err))
375    }
376
377    /// Error during actor initialization.
378    pub fn init(err: anyhow::Error) -> Self {
379        Self::Generic(format!("initialization error: {}", err))
380    }
381
382    /// Error during actor cleanup.
383    pub fn cleanup(err: anyhow::Error) -> Self {
384        Self::Generic(format!("cleanup error: {}", err))
385    }
386
387    /// An underlying mailbox error.
388    pub fn mailbox(err: MailboxError) -> Self {
389        Self::Generic(err.to_string())
390    }
391
392    /// An underlying mailbox sender error.
393    pub fn mailbox_sender(err: MailboxSenderError) -> Self {
394        Self::Generic(err.to_string())
395    }
396
397    /// The actor's state could not be determined.
398    pub fn indeterminate_state() -> Self {
399        Self::Generic("actor is in an indeterminate state".to_string())
400    }
401}
402
403impl ActorError {
404    /// Create a new actor server error with the provided id and kind.
405    pub(crate) fn new(actor_id: &reference::ActorId, kind: ActorErrorKind) -> Self {
406        Self {
407            actor_id: Box::new(actor_id.clone()),
408            kind: Box::new(kind),
409        }
410    }
411}
412
413impl fmt::Display for ActorError {
414    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415        write!(f, "serving {}: ", self.actor_id)?;
416        fmt::Display::fmt(&self.kind, f)
417    }
418}
419
420impl std::error::Error for ActorError {
421    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
422        self.kind.source()
423    }
424}
425
426impl From<MailboxError> for ActorError {
427    fn from(inner: MailboxError) -> Self {
428        Self {
429            actor_id: Box::new(inner.actor_id().clone()),
430            kind: Box::new(ActorErrorKind::mailbox(inner)),
431        }
432    }
433}
434
435impl From<MailboxSenderError> for ActorError {
436    fn from(inner: MailboxSenderError) -> Self {
437        Self {
438            actor_id: Box::new(inner.location().actor_id().clone()),
439            kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
440        }
441    }
442}
443
/// A collection of signals to control the behavior of the actor.
/// Signals are internal runtime control plane messages and should not be
/// sent outside of the runtime.
///
/// These messages are not handled directly by actors; instead, the runtime
/// handles the various signals.
// NOTE(review): this enum is serde-serialized; index-based formats (e.g.
// bincode) encode the variant position, so avoid reordering variants.
#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
pub enum Signal {
    /// Stop the actor, after draining messages. Carries the stop reason.
    DrainAndStop(String),

    /// Stop the actor immediately. Carries the stop reason.
    Stop(String),

    /// The direct child with the given index was stopped.
    ChildStopped(reference::Index),

    /// Abort the actor with the given reason. This will exit the actor loop
    /// with an error, causing a supervision event to propagate up the
    /// supervision hierarchy.
    Abort(String),
}
// Register `Signal` with the `wirevalue` type registry (presumably so it can be
// carried as a typed wire value — confirm in `wirevalue`).
wirevalue::register_type!(Signal);
467
468impl fmt::Display for Signal {
469    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470        match self {
471            Signal::DrainAndStop(reason) => write!(f, "DrainAndStop({})", reason),
472            Signal::Stop(reason) => write!(f, "Stop({})", reason),
473            Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index),
474            Signal::Abort(reason) => write!(f, "Abort({})", reason),
475        }
476    }
477}
478
/// Information about a message handler being processed.
///
/// Uses `Cow<'static, str>` to avoid string copies on the hot path.
/// The typename and arm are typically static strings from `TypeInfo`.
/// Displays as `typename`, or `typename.arm` when an arm is present.
// NOTE(review): serde-serialized; keep field order stable for index-based formats.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct HandlerInfo {
    /// The type name of the message being handled.
    pub typename: Cow<'static, str>,
    /// The enum arm being handled, if the message is an enum.
    pub arm: Option<Cow<'static, str>>,
}
490
491impl HandlerInfo {
492    /// Create a new `HandlerInfo` from static strings (zero-copy).
493    pub fn from_static(typename: &'static str, arm: Option<&'static str>) -> Self {
494        Self {
495            typename: Cow::Borrowed(typename),
496            arm: arm.map(Cow::Borrowed),
497        }
498    }
499
500    /// Create a new `HandlerInfo` from owned strings.
501    pub fn from_owned(typename: String, arm: Option<String>) -> Self {
502        Self {
503            typename: Cow::Owned(typename),
504            arm: arm.map(Cow::Owned),
505        }
506    }
507}
508
509impl fmt::Display for HandlerInfo {
510    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
511        match &self.arm {
512            Some(arm) => write!(f, "{}.{}", self.typename, arm),
513            None => write!(f, "{}", self.typename),
514        }
515    }
516}
517
/// The runtime status of an actor.
///
/// `Stopped` and `Failed` are the terminal states (see
/// `ActorStatus::is_terminal`).
// NOTE(review): serde-serialized; avoid reordering variants, since index-based
// formats encode the variant position.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Clone,
    typeuri::Named,
    EnumAsInner
)]
pub enum ActorStatus {
    /// The actor status is unknown.
    Unknown,
    /// The actor was created, but not yet started.
    Created,
    /// The actor is initializing. It is not yet ready to receive messages.
    Initializing,
    /// The actor is in "client" state: the user is managing the actor's
    /// mailboxes manually.
    Client,
    /// The actor is ready to receive messages, but is currently idle.
    Idle,
    /// The actor has been processing a message since the given wall-clock
    /// time. The message handler info is included when known.
    Processing(SystemTime, Option<HandlerInfo>),
    /// The actor has been saving its state since the given wall-clock time.
    Saving(SystemTime),
    /// The actor has been loading its state since the given wall-clock time.
    Loading(SystemTime),
    /// The actor is stopping. It is draining messages.
    Stopping,
    /// The actor is stopped with a provided reason.
    /// It is no longer processing messages.
    Stopped(String),
    /// The actor failed with the provided actor error.
    Failed(ActorErrorKind),
}
556
557impl ActorStatus {
558    /// Tells whether the status is a terminal state.
559    pub fn is_terminal(&self) -> bool {
560        self.is_stopped() || self.is_failed()
561    }
562
563    /// Create a generic failure status with the provided error message.
564    pub fn generic_failure(message: impl Into<String>) -> Self {
565        Self::Failed(ActorErrorKind::Generic(message.into()))
566    }
567
568    fn span_string(&self) -> &'static str {
569        self.arm().unwrap_or_default()
570    }
571}
572
573impl fmt::Display for ActorStatus {
574    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575        match self {
576            Self::Unknown => write!(f, "unknown"),
577            Self::Created => write!(f, "created"),
578            Self::Initializing => write!(f, "initializing"),
579            Self::Client => write!(f, "client"),
580            Self::Idle => write!(f, "idle"),
581            Self::Processing(instant, None) => {
582                write!(
583                    f,
584                    "processing for {}ms",
585                    std::time::SystemTime::now()
586                        .duration_since(*instant)
587                        .unwrap_or_default()
588                        .as_millis()
589                )
590            }
591            Self::Processing(instant, Some(handler_info)) => {
592                write!(
593                    f,
594                    "{}: processing for {}ms",
595                    handler_info,
596                    std::time::SystemTime::now()
597                        .duration_since(*instant)
598                        .unwrap_or_default()
599                        .as_millis()
600                )
601            }
602            Self::Saving(instant) => {
603                write!(
604                    f,
605                    "saving for {}ms",
606                    std::time::SystemTime::now()
607                        .duration_since(*instant)
608                        .unwrap_or_default()
609                        .as_millis()
610                )
611            }
612            Self::Loading(instant) => {
613                write!(
614                    f,
615                    "loading for {}ms",
616                    std::time::SystemTime::now()
617                        .duration_since(*instant)
618                        .unwrap_or_default()
619                        .as_millis()
620                )
621            }
622            Self::Stopping => write!(f, "stopping"),
623            Self::Stopped(reason) => write!(f, "stopped: {}", reason),
624            Self::Failed(err) => write!(f, "failed: {}", err),
625        }
626    }
627}
628
/// ActorHandles represent a (local) serving actor. It is used to access
/// its messaging and signal ports, as well as to synchronize with its
/// lifecycle (e.g., providing joins). Once dropped, the handle is
/// detached from the underlying actor instance, and there is no longer
/// any way to join it.
///
/// Correspondingly, [`crate::ActorRef`]s refer to (possibly) remote
/// actors.
pub struct ActorHandle<A: Actor> {
    // Runtime cell of the actor: source of its id, status watch, signal
    // delivery and reference binding.
    cell: InstanceCell,
    // The actor's typed message ports, shared between cloned handles.
    ports: Arc<Ports<A>>,
}
641
642/// A handle to a running (local) actor.
643impl<A: Actor> ActorHandle<A> {
644    pub(crate) fn new(cell: InstanceCell, ports: Arc<Ports<A>>) -> Self {
645        Self { cell, ports }
646    }
647
648    /// The actor's cell. Used primarily for testing.
649    /// TODO: this should not be a public API.
650    pub(crate) fn cell(&self) -> &InstanceCell {
651        &self.cell
652    }
653
654    /// The [`ActorId`] of the actor represented by this handle.
655    pub fn actor_id(&self) -> &reference::ActorId {
656        self.cell.actor_id()
657    }
658
659    /// Signal the actor to drain its current messages and then stop.
660    pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
661        tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id());
662        self.cell.signal(Signal::DrainAndStop(reason.to_string()))
663    }
664
665    /// A watch that observes the lifecycle state of the actor.
666    pub fn status(&self) -> watch::Receiver<ActorStatus> {
667        self.cell.status().clone()
668    }
669
670    /// Send a message to the actor. Messages sent through the handle
671    /// are always queued in process, and do not require serialization.
672    pub fn send<M: Message>(
673        &self,
674        cx: &impl context::Actor,
675        message: M,
676    ) -> Result<(), MailboxSenderError>
677    where
678        A: Handler<M>,
679    {
680        self.ports.get().send(cx, message)
681    }
682
683    /// Return a port for the provided message type handled by the actor.
684    pub fn port<M: Message>(&self) -> PortHandle<M>
685    where
686        A: Handler<M>,
687    {
688        self.ports.get()
689    }
690
691    /// TEMPORARY: bind...
692    /// TODO: we shoudl also have a default binding(?)
693    pub fn bind<R: Binds<A>>(&self) -> reference::ActorRef<R> {
694        self.cell.bind(self.ports.as_ref())
695    }
696}
697
698/// IntoFuture allows users to await the handle to join it. The future
699/// resolves when the actor itself has stopped processing messages.
700/// The future resolves to the actor's final status.
701impl<A: Actor> IntoFuture for ActorHandle<A> {
702    type Output = ActorStatus;
703    type IntoFuture = BoxFuture<'static, Self::Output>;
704
705    fn into_future(self) -> Self::IntoFuture {
706        let future = async move {
707            let mut status_receiver = self.cell.status().clone();
708            let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
709            match result {
710                Err(_) => ActorStatus::Unknown,
711                Ok(status) => status.clone(),
712            }
713        };
714
715        future.boxed()
716    }
717}
718
719impl<A: Actor> Debug for ActorHandle<A> {
720    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
721        f.debug_struct("ActorHandle").field("cell", &"..").finish()
722    }
723}
724
725impl<A: Actor> Clone for ActorHandle<A> {
726    fn clone(&self) -> Self {
727        Self {
728            cell: self.cell.clone(),
729            ports: self.ports.clone(),
730        }
731    }
732}
733
734/// `Referable` is a marker trait for types that can appear as
735/// remote references across process boundaries.
736///
737/// It is not limited to concrete [`Actor`] implementations. For
738/// example, façade types generated by [`behavior!`] implement
739/// `Referable` so that you can hand out restricted or stable APIs
740/// while still using the same remote messaging machinery.
741///
742/// Implementing this trait means the type can be identified (`Named`)
743/// so the runtime knows what it is.
744///
745///  In contrast, [`RemoteSpawn`] is the trait that marks *actors*
746/// that can actually be **spawned remotely**. A behavior may be a
747/// `Referable` but is never a `RemoteSpawn`.
748pub trait Referable: Named {}
749
750/// Binds determines how an actor's ports are bound to a specific
751/// reference type.
752pub trait Binds<A: Actor>: Referable {
753    /// Bind ports in this actor.
754    fn bind(ports: &Ports<A>);
755}
756
757/// Handles is a marker trait specifying that message type [`M`]
758/// is handled by a specific actor type.
759pub trait RemoteHandles<M: RemoteMessage>: Referable {}
760
/// Statically check that an actor behaves as a given behavior (defined by
/// [`behavior!`]), i.e. that the behavior implements [`Binds`] for the actor.
/// The check is purely compile-time: the generated code is never executed.
///
/// ```
/// # use serde::Serialize;
/// # use serde::Deserialize;
/// # use typeuri::Named;
/// # use hyperactor::Actor;
///
/// // First, define a behavior, based on handling a single message type `()`.
/// hyperactor::behavior!(UnitBehavior, ());
///
/// #[derive(Debug, Default)]
/// struct MyActor;
///
/// impl Actor for MyActor {}
///
/// #[async_trait::async_trait]
/// impl hyperactor::Handler<()> for MyActor {
///     async fn handle(
///         &mut self,
///         _cx: &hyperactor::Context<Self>,
///         _message: (),
///     ) -> Result<(), anyhow::Error> {
///         // no-op
///         Ok(())
///     }
/// }
///
/// hyperactor::assert_behaves!(MyActor as UnitBehavior);
/// ```
#[macro_export]
macro_rules! assert_behaves {
    ($ty:ty as $behavior:ty) => {
        // A never-called closure inside an anonymous const: it only has to
        // type-check, which requires `$behavior: Binds<$ty>`. `$crate` keeps
        // the path valid regardless of how the caller names this crate.
        const _: fn() = || {
            fn check<B: $crate::actor::Binds<$ty>>() {}
            check::<$behavior>();
        };
    };
}
800
801#[cfg(test)]
802mod tests {
803    use std::sync::Mutex;
804    use std::time::Duration;
805
806    use rand::seq::SliceRandom;
807    use timed_test::async_timed_test;
808    use tokio::sync::mpsc;
809    use tokio::time::timeout;
810
811    use super::*;
812    use crate as hyperactor;
813    use crate::Actor;
814    use crate::OncePortHandle;
815    use crate::config;
816    use crate::context::Mailbox as _;
817    use crate::introspect::IntrospectMessage;
818    use crate::introspect::IntrospectResult;
819    use crate::introspect::IntrospectView;
820    use crate::mailbox::BoxableMailboxSender as _;
821    use crate::mailbox::MailboxSender;
822    use crate::mailbox::PortLocation;
823    use crate::mailbox::monitored_return_handle;
824    use crate::ordering::SEQ_INFO;
825    use crate::ordering::SeqInfo;
826    use crate::testing::ids::test_proc_id;
827    use crate::testing::pingpong::PingPongActor;
828    use crate::testing::pingpong::PingPongMessage;
829    use crate::testing::proc_supervison::ProcSupervisionCoordinator; // for macros
830
831    #[derive(Debug)]
832    struct EchoActor(reference::PortRef<u64>);
833
834    #[async_trait]
835    impl Actor for EchoActor {}
836
837    #[async_trait]
838    impl Handler<u64> for EchoActor {
839        async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
840            let Self(port) = self;
841            port.send(cx, message)?;
842            Ok(())
843        }
844    }
845
846    #[tokio::test]
847    async fn test_server_basic() {
848        let proc = Proc::local();
849        let (client, _) = proc.instance("client").unwrap();
850        let (tx, mut rx) = client.open_port();
851        let actor = EchoActor(tx.bind());
852        let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
853        handle.send(&client, 123u64).unwrap();
854        handle.drain_and_stop("test").unwrap();
855        handle.await;
856
857        assert_eq!(rx.drain(), vec![123u64]);
858    }
859
860    #[tokio::test]
861    async fn test_ping_pong() {
862        let proc = Proc::local();
863        let (client, _) = proc.instance("client").unwrap();
864        let (undeliverable_msg_tx, _) = client.open_port();
865
866        let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
867        let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
868        let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
869        let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
870
871        let (local_port, local_receiver) = client.open_once_port();
872
873        ping_handle
874            .send(
875                &client,
876                PingPongMessage(10, pong_handle.bind(), local_port.bind()),
877            )
878            .unwrap();
879
880        assert!(local_receiver.recv().await.unwrap());
881    }
882
883    #[tokio::test]
884    async fn test_ping_pong_on_handler_error() {
885        let proc = Proc::local();
886        let (client, _) = proc.instance("client").unwrap();
887        let (undeliverable_msg_tx, _) = client.open_port();
888
889        // Need to set a supervison coordinator for this Proc because there will
890        // be actor failure(s) in this test which trigger supervision.
891        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
892
893        let error_ttl = 66;
894
895        let ping_actor =
896            PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
897        let pong_actor =
898            PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
899        let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
900        let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
901
902        let (local_port, local_receiver) = client.open_once_port();
903
904        ping_handle
905            .send(
906                &client,
907                PingPongMessage(
908                    error_ttl + 1, // will encounter an error at TTL=66
909                    pong_handle.bind(),
910                    local_port.bind(),
911                ),
912            )
913            .unwrap();
914
915        // TODO: Fix this receiver hanging issue in T200423722.
916        let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
917            timeout(Duration::from_secs(5), local_receiver.recv()).await;
918        assert!(res.is_err());
919    }
920
921    #[derive(Debug)]
922    struct InitActor(bool);
923
924    #[async_trait]
925    impl Actor for InitActor {
926        async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
927            self.0 = true;
928            Ok(())
929        }
930    }
931
932    #[async_trait]
933    impl Handler<OncePortHandle<bool>> for InitActor {
934        async fn handle(
935            &mut self,
936            cx: &Context<Self>,
937            port: OncePortHandle<bool>,
938        ) -> Result<(), anyhow::Error> {
939            port.send(cx, self.0)?;
940            Ok(())
941        }
942    }
943
944    #[tokio::test]
945    async fn test_init() {
946        let proc = Proc::local();
947        let actor = InitActor(false);
948        let handle = proc.spawn::<InitActor>("init", actor).unwrap();
949        let (client, _) = proc.instance("client").unwrap();
950
951        let (port, receiver) = client.open_once_port();
952        handle.send(&client, port).unwrap();
953        assert!(receiver.recv().await.unwrap());
954
955        handle.drain_and_stop("test").unwrap();
956        handle.await;
957    }
958
    /// Shared state written by `MultiActor`: `.0` holds the last `u64` and
    /// `.1` the last `String` it handled.
    type MultiValues = Arc<Mutex<(u64, String)>>;

    /// Test fixture: a local proc running a `MultiActor`, the shared values
    /// it writes, and a client instance used to send it messages.
    struct MultiValuesTest {
        proc: Proc,
        values: MultiValues,
        handle: ActorHandle<MultiActor>,
        client: Instance<()>,
        // Stored (but otherwise unused) so the client instance's handle stays
        // alive for as long as the fixture does.
        _client_handle: ActorHandle<()>,
    }
968
969    impl MultiValuesTest {
970        async fn new() -> Self {
971            let proc = Proc::local();
972            let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
973            let actor = MultiActor(values.clone());
974            let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
975            let (client, client_handle) = proc.instance("client").unwrap();
976            Self {
977                proc,
978                values,
979                handle,
980                client,
981                _client_handle: client_handle,
982            }
983        }
984
985        fn send<M>(&self, message: M)
986        where
987            M: RemoteMessage,
988            MultiActor: Handler<M>,
989        {
990            self.handle.send(&self.client, message).unwrap()
991        }
992
993        async fn sync(&self) {
994            let (port, done) = self.client.open_once_port::<bool>();
995            self.handle.send(&self.client, port).unwrap();
996            assert!(done.recv().await.unwrap());
997        }
998
999        fn get_values(&self) -> (u64, String) {
1000            self.values.lock().unwrap().clone()
1001        }
1002    }
1003
1004    #[derive(Debug)]
1005    #[hyperactor::export(handlers = [u64, String])]
1006    struct MultiActor(MultiValues);
1007
1008    #[async_trait]
1009    impl Actor for MultiActor {}
1010
1011    #[async_trait]
1012    impl Handler<u64> for MultiActor {
1013        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
1014            let mut vals = self.0.lock().unwrap();
1015            vals.0 = message;
1016            Ok(())
1017        }
1018    }
1019
1020    #[async_trait]
1021    impl Handler<String> for MultiActor {
1022        async fn handle(
1023            &mut self,
1024            _cx: &Context<Self>,
1025            message: String,
1026        ) -> Result<(), anyhow::Error> {
1027            let mut vals = self.0.lock().unwrap();
1028            vals.1 = message;
1029            Ok(())
1030        }
1031    }
1032
1033    #[async_trait]
1034    impl Handler<OncePortHandle<bool>> for MultiActor {
1035        async fn handle(
1036            &mut self,
1037            cx: &Context<Self>,
1038            message: OncePortHandle<bool>,
1039        ) -> Result<(), anyhow::Error> {
1040            message.send(cx, true).unwrap();
1041            Ok(())
1042        }
1043    }
1044
1045    #[tokio::test]
1046    async fn test_multi_handler_refs() {
1047        let test = MultiValuesTest::new().await;
1048
1049        test.send(123u64);
1050        test.send("foo".to_string());
1051        test.sync().await;
1052        assert_eq!(test.get_values(), (123u64, "foo".to_string()));
1053
1054        let myref: reference::ActorRef<MultiActor> = test.handle.bind();
1055
1056        myref.port().send(&test.client, 321u64).unwrap();
1057        test.sync().await;
1058        assert_eq!(test.get_values(), (321u64, "foo".to_string()));
1059
1060        myref.port().send(&test.client, "bar".to_string()).unwrap();
1061        test.sync().await;
1062        assert_eq!(test.get_values(), (321u64, "bar".to_string()));
1063    }
1064
1065    #[tokio::test]
1066    async fn test_ref_behavior() {
1067        let test = MultiValuesTest::new().await;
1068
1069        test.send(123u64);
1070        test.send("foo".to_string());
1071
1072        hyperactor::behavior!(MyActorBehavior, u64, String);
1073
1074        let myref: reference::ActorRef<MyActorBehavior> = test.handle.bind();
1075        myref.port().send(&test.client, "biz".to_string()).unwrap();
1076        myref.port().send(&test.client, 999u64).unwrap();
1077
1078        test.sync().await;
1079        assert_eq!(test.get_values(), (999u64, "biz".to_string()));
1080    }
1081
1082    #[tokio::test]
1083    async fn test_actor_handle_downcast() {
1084        #[derive(Debug, Default)]
1085        struct NothingActor;
1086
1087        impl Actor for NothingActor {}
1088
1089        // Just test that we can round-trip the handle through a downcast.
1090
1091        let proc = Proc::local();
1092        let handle = proc.spawn("nothing", NothingActor).unwrap();
1093        let cell = handle.cell();
1094
1095        // Invalid actor doesn't succeed.
1096        assert!(cell.downcast_handle::<EchoActor>().is_none());
1097
1098        let handle = cell.downcast_handle::<NothingActor>().unwrap();
1099        handle.drain_and_stop("test").unwrap();
1100        handle.await;
1101    }
1102
1103    // Returning the sequence number assigned to the message.
1104    #[derive(Debug)]
1105    #[hyperactor::export(handlers = [String, Callback])]
1106    struct GetSeqActor(reference::PortRef<(String, SeqInfo)>);
1107
1108    #[async_trait]
1109    impl Actor for GetSeqActor {}
1110
1111    #[async_trait]
1112    impl Handler<String> for GetSeqActor {
1113        async fn handle(
1114            &mut self,
1115            cx: &Context<Self>,
1116            message: String,
1117        ) -> Result<(), anyhow::Error> {
1118            let Self(port) = self;
1119            let seq_info = cx.headers().get(SEQ_INFO).unwrap();
1120            port.send(cx, (message, seq_info.clone()))?;
1121            Ok(())
1122        }
1123    }
1124
1125    // Unlike Handler<String>, where the sender provides the string message
1126    // directly, in Handler<Callback>, sender needs to provide a port, and
1127    // handler will reply that port with its own callback port. Then sender can
1128    // send the string message through this callback port.
1129    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
1130    struct Callback(reference::PortRef<reference::PortRef<String>>);
1131
1132    #[async_trait]
1133    impl Handler<Callback> for GetSeqActor {
1134        async fn handle(
1135            &mut self,
1136            cx: &Context<Self>,
1137            message: Callback,
1138        ) -> Result<(), anyhow::Error> {
1139            let (handle, mut receiver) = cx.open_port::<String>();
1140            let callback_ref = handle.bind();
1141            message.0.send(cx, callback_ref).unwrap();
1142            let msg = receiver.recv().await.unwrap();
1143            self.handle(cx, msg).await
1144        }
1145    }
1146
1147    #[async_timed_test(timeout_secs = 30)]
1148    async fn test_sequencing_actor_handle_basic() {
1149        let proc = Proc::local();
1150        let (client, _) = proc.instance("client").unwrap();
1151        let (tx, mut rx) = client.open_port();
1152
1153        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1154
1155        // Verify that unbound handle can send message.
1156        actor_handle.send(&client, "unbound".to_string()).unwrap();
1157        assert_eq!(
1158            rx.recv().await.unwrap(),
1159            ("unbound".to_string(), SeqInfo::Direct)
1160        );
1161
1162        let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1163
1164        let session_id = client.sequencer().session_id();
1165        let mut expected_seq = 0;
1166        // Interleave messages sent through the handle and the reference.
1167        for m in 0..10 {
1168            actor_handle.send(&client, format!("{m}")).unwrap();
1169            expected_seq += 1;
1170            assert_eq!(
1171                rx.recv().await.unwrap(),
1172                (
1173                    format!("{m}"),
1174                    SeqInfo::Session {
1175                        session_id,
1176                        seq: expected_seq,
1177                    }
1178                )
1179            );
1180
1181            for n in 0..2 {
1182                actor_ref.port().send(&client, format!("{m}-{n}")).unwrap();
1183                expected_seq += 1;
1184                assert_eq!(
1185                    rx.recv().await.unwrap(),
1186                    (
1187                        format!("{m}-{n}"),
1188                        SeqInfo::Session {
1189                            session_id,
1190                            seq: expected_seq,
1191                        }
1192                    )
1193                );
1194            }
1195        }
1196    }
1197
1198    // Test that actor ports share a sequence while non-actor ports get their own.
1199    #[async_timed_test(timeout_secs = 30)]
1200    async fn test_sequencing_mixed_actor_and_non_actor_ports() {
1201        let proc = Proc::local();
1202        let (client, _) = proc.instance("client").unwrap();
1203
1204        // Port for receiving seq info from actor handler
1205        let (actor_tx, mut actor_rx) = client.open_port();
1206
1207        // Channel for receiving seq info from non-actor port
1208        let (non_actor_tx, mut non_actor_rx) = mpsc::unbounded_channel::<Option<SeqInfo>>();
1209
1210        let actor_handle = proc.spawn("get_seq", GetSeqActor(actor_tx.bind())).unwrap();
1211        let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1212
1213        // Create a non-actor port using open_enqueue_port
1214        let non_actor_tx_clone = non_actor_tx.clone();
1215        let non_actor_port_handle = client.mailbox().open_enqueue_port(move |headers, _m: ()| {
1216            let seq_info = headers.get(SEQ_INFO);
1217            non_actor_tx_clone.send(seq_info).unwrap();
1218            Ok(())
1219        });
1220
1221        // Bind the port to get a port ID
1222        non_actor_port_handle.bind();
1223        let non_actor_port_id = match non_actor_port_handle.location() {
1224            PortLocation::Bound(port_id) => port_id,
1225            _ => panic!("port_handle should be bound"),
1226        };
1227        assert!(!non_actor_port_id.is_actor_port());
1228
1229        let session_id = client.sequencer().session_id();
1230
1231        // Send to actor ports via ActorHandle - seq 1
1232        actor_handle.send(&client, "msg1".to_string()).unwrap();
1233        assert_eq!(
1234            actor_rx.recv().await.unwrap().1,
1235            SeqInfo::Session { session_id, seq: 1 }
1236        );
1237
1238        // Send to actor ports via ActorRef - seq 2 (shared with ActorHandle)
1239        actor_ref.port().send(&client, "msg2".to_string()).unwrap();
1240        assert_eq!(
1241            actor_rx.recv().await.unwrap().1,
1242            SeqInfo::Session { session_id, seq: 2 }
1243        );
1244
1245        // Send to non-actor port - has its own sequence starting at 1
1246        non_actor_port_handle.send(&client, ()).unwrap();
1247        assert_eq!(
1248            non_actor_rx.recv().await.unwrap(),
1249            Some(SeqInfo::Session { session_id, seq: 1 })
1250        );
1251
1252        // Send more to actor ports via ActorHandle - seq continues at 3
1253        actor_handle.send(&client, "msg3".to_string()).unwrap();
1254        assert_eq!(
1255            actor_rx.recv().await.unwrap().1,
1256            SeqInfo::Session { session_id, seq: 3 }
1257        );
1258
1259        // Send more to non-actor port - its sequence continues at 2
1260        non_actor_port_handle.send(&client, ()).unwrap();
1261        assert_eq!(
1262            non_actor_rx.recv().await.unwrap(),
1263            Some(SeqInfo::Session { session_id, seq: 2 })
1264        );
1265
1266        // Send via ActorRef again - seq 4
1267        actor_ref.port().send(&client, "msg4".to_string()).unwrap();
1268        assert_eq!(
1269            actor_rx.recv().await.unwrap().1,
1270            SeqInfo::Session { session_id, seq: 4 }
1271        );
1272
1273        actor_handle.drain_and_stop("test cleanup").unwrap();
1274        actor_handle.await;
1275    }
1276
1277    // Test that messages from different clients get independent sequence schemes.
1278    #[async_timed_test(timeout_secs = 30)]
1279    async fn test_sequencing_multiple_clients() {
1280        let proc = Proc::local();
1281        let (client1, _) = proc.instance("client1").unwrap();
1282        let (client2, _) = proc.instance("client2").unwrap();
1283
1284        // Port for receiving seq info from actor handler
1285        let (tx, mut rx) = client1.open_port();
1286
1287        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1288        let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1289
1290        // Each client should have a different session_id
1291        let session_id_1 = client1.sequencer().session_id();
1292        let session_id_2 = client2.sequencer().session_id();
1293        assert_ne!(session_id_1, session_id_2);
1294
1295        // Send from client1 via ActorHandle - seq 1 for session_id_1
1296        actor_handle.send(&client1, "c1_msg1".to_string()).unwrap();
1297        assert_eq!(
1298            rx.recv().await.unwrap().1,
1299            SeqInfo::Session {
1300                session_id: session_id_1,
1301                seq: 1
1302            }
1303        );
1304
1305        // Send from client2 via ActorHandle - seq 1 for session_id_2 (independent)
1306        actor_handle.send(&client2, "c2_msg1".to_string()).unwrap();
1307        assert_eq!(
1308            rx.recv().await.unwrap().1,
1309            SeqInfo::Session {
1310                session_id: session_id_2,
1311                seq: 1
1312            }
1313        );
1314
1315        // Send from client1 via ActorRef - seq 2 for session_id_1
1316        actor_ref
1317            .port()
1318            .send(&client1, "c1_msg2".to_string())
1319            .unwrap();
1320        assert_eq!(
1321            rx.recv().await.unwrap().1,
1322            SeqInfo::Session {
1323                session_id: session_id_1,
1324                seq: 2
1325            }
1326        );
1327
1328        // Send from client2 via ActorRef - seq 2 for session_id_2
1329        actor_ref
1330            .port()
1331            .send(&client2, "c2_msg2".to_string())
1332            .unwrap();
1333        assert_eq!(
1334            rx.recv().await.unwrap().1,
1335            SeqInfo::Session {
1336                session_id: session_id_2,
1337                seq: 2
1338            }
1339        );
1340
1341        // Interleave more messages to further verify independence
1342        actor_handle.send(&client1, "c1_msg3".to_string()).unwrap();
1343        assert_eq!(
1344            rx.recv().await.unwrap().1,
1345            SeqInfo::Session {
1346                session_id: session_id_1,
1347                seq: 3
1348            }
1349        );
1350
1351        actor_ref
1352            .port()
1353            .send(&client2, "c2_msg3".to_string())
1354            .unwrap();
1355        assert_eq!(
1356            rx.recv().await.unwrap().1,
1357            SeqInfo::Session {
1358                session_id: session_id_2,
1359                seq: 3
1360            }
1361        );
1362
1363        actor_handle.drain_and_stop("test cleanup").unwrap();
1364        actor_handle.await;
1365    }
1366
1367    // Verify that ordering is guarranteed based on
1368    //   * (sender actor , client actor, port stream)
1369    // not
1370    //   * (sender actor, client actor)
1371    //
1372    // For "port stream",
1373    //   * actor ports of the same actor belongs to the same stream;
1374    //   * non-actor port has its independent stream.
1375    //
1376    // Specifically, in this test,
1377    //   * client sends a Callback message to dest actor's handler;
1378    //   * while dest actor is still processing that message, client sends
1379    //     another non-handler message to dest actor.
1380    //
1381    // If the ordering is based on (sender actor, client actor), this test would
1382    // hang, since dest actor is deadlock on waiting for the 2nd message while
1383    // still processing the 2nd message.
1384    //
1385    // But since port stream is also part of the ordering guarrantee, such
1386    // deadlock should not happen.
1387    #[async_timed_test(timeout_secs = 30)]
1388    async fn test_sequencing_actor_handle_callback() {
1389        let config = hyperactor_config::global::lock();
1390        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1391
1392        let proc = Proc::local();
1393        let (client, _) = proc.instance("client").unwrap();
1394        let (tx, mut rx) = client.open_port();
1395
1396        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1397        let actor_ref: reference::ActorRef<GetSeqActor> = actor_handle.bind();
1398
1399        let (callback_tx, mut callback_rx) = client.open_port();
1400        // Client sends the 1st message
1401        actor_ref
1402            .send(&client, Callback(callback_tx.bind()))
1403            .unwrap();
1404        let msg_port_ref = callback_rx.recv().await.unwrap();
1405        // client sends the 2nd message. At this time, GetSeqActor is still
1406        // processing the 1st message, and waiting for the 2nd message.
1407        msg_port_ref.send(&client, "finally".to_string()).unwrap();
1408
1409        let session_id = client.sequencer().session_id();
1410        // passing this assert means GetSeqActor processed the 2nd message.
1411        assert_eq!(
1412            rx.recv().await.unwrap(),
1413            (
1414                "finally".to_string(),
1415                SeqInfo::Session { session_id, seq: 1 }
1416            )
1417        );
1418    }
1419
1420    // Adding a delay before sending the destination proc. Useful for tests
1421    // requiring latency injection.
1422    #[derive(Clone, Debug)]
1423    struct DelayedMailboxSender {
1424        relay_tx: mpsc::UnboundedSender<MessageEnvelope>,
1425    }
1426
1427    impl DelayedMailboxSender {
1428        // Use a random latency between 0 and 1 second if the plan is empty.
1429        fn new(
1430            // The proc that hosts the dest actor. By posting envelope to this
1431            // proc, this proc will route that evenlope to the dest actor.
1432            dest_proc: Proc,
1433            // Vec index is the message seq - 1, value is the order this message
1434            // would be relayed to the dest actor. Dest actor is responsible to
1435            // ensure itself processes these messages in order.
1436            relay_orders: Vec<usize>,
1437        ) -> Self {
1438            let (relay_tx, mut relay_rx) = mpsc::unbounded_channel::<MessageEnvelope>();
1439
1440            tokio::spawn(async move {
1441                let mut buffer = Vec::new();
1442
1443                for _ in 0..relay_orders.len() {
1444                    let envelope = relay_rx.recv().await.unwrap();
1445                    buffer.push(envelope);
1446                }
1447
1448                for m in buffer.clone() {
1449                    let seq = match m.headers().get(SEQ_INFO).expect("seq should be set") {
1450                        SeqInfo::Session { seq, .. } => seq as usize,
1451                        SeqInfo::Direct => panic!("expected Session variant"),
1452                    };
1453                    // seq no is one-based.
1454                    let order = relay_orders[seq - 1];
1455                    buffer[order] = m;
1456                }
1457
1458                let dest_proc_clone = dest_proc.clone();
1459                for msg in buffer {
1460                    dest_proc_clone.post(msg, monitored_return_handle());
1461                }
1462            });
1463
1464            Self { relay_tx }
1465        }
1466    }
1467
1468    #[async_trait]
1469    impl MailboxSender for DelayedMailboxSender {
1470        fn post_unchecked(
1471            &self,
1472            envelope: MessageEnvelope,
1473            _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1474        ) {
1475            self.relay_tx.send(envelope).unwrap();
1476        }
1477    }
1478
1479    async fn assert_out_of_order_delivery(expected: Vec<(String, u64)>, relay_orders: Vec<usize>) {
1480        let local_proc: Proc = Proc::local();
1481        let (client, _) = local_proc.instance("local").unwrap();
1482        let (tx, mut rx) = client.open_port();
1483
1484        let handle = local_proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1485        let actor_ref: reference::ActorRef<GetSeqActor> = handle.bind();
1486
1487        let remote_proc = Proc::configured(
1488            test_proc_id("remote_0"),
1489            DelayedMailboxSender::new(local_proc.clone(), relay_orders).boxed(),
1490        );
1491        let (remote_client, _) = remote_proc.instance("remote").unwrap();
1492        // Send the messages out in the order of their expected sequence numbers.
1493        let mut messages = expected.clone();
1494        messages.sort_by_key(|v| v.1);
1495        for (message, _seq) in messages {
1496            actor_ref.send(&remote_client, message).unwrap();
1497        }
1498        let session_id = remote_client.sequencer().session_id();
1499        for expect in expected {
1500            let expected = (
1501                expect.0,
1502                SeqInfo::Session {
1503                    session_id,
1504                    seq: expect.1,
1505                },
1506            );
1507            assert_eq!(rx.recv().await.unwrap(), expected);
1508        }
1509
1510        handle.drain_and_stop("test cleanup").unwrap();
1511        handle.await;
1512    }
1513
1514    // Send several messages, use DelayedMailboxSender and the relay orders to
1515    // ensure these messages will arrive at handler's workq out-of-order.
1516    // Then verify the actor handler will still process these messages based on
1517    // their sending order if reordering buffer is enabled.
1518    #[async_timed_test(timeout_secs = 30)]
1519    async fn test_sequencing_actor_ref_known_delivery_order() {
1520        let config = hyperactor_config::global::lock();
1521
1522        // relay order is second, third, first
1523        let relay_orders = vec![2, 0, 1];
1524
1525        // By disabling the actor side re-ordering buffer, the mssages will
1526        // be processed in the same order as they sent out.
1527        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, false);
1528        assert_out_of_order_delivery(
1529            vec![
1530                ("second".to_string(), 2),
1531                ("third".to_string(), 3),
1532                ("first".to_string(), 1),
1533            ],
1534            relay_orders.clone(),
1535        )
1536        .await;
1537
1538        // By enabling the actor side re-ordering buffer, the mssages will
1539        // be re-ordered before being processed.
1540        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1541        assert_out_of_order_delivery(
1542            vec![
1543                ("first".to_string(), 1),
1544                ("second".to_string(), 2),
1545                ("third".to_string(), 3),
1546            ],
1547            relay_orders.clone(),
1548        )
1549        .await;
1550    }
1551
1552    // Send a large nubmer of messages, use DelayedMailboxSender to ensure these
1553    // messages will arrive at handler's workq in a random order. Then verify the
1554    // actor handler will still process these messages based on their sending
1555    // order with reordering buffer enabled.
1556    #[async_timed_test(timeout_secs = 30)]
1557    async fn test_sequencing_actor_ref_random_delivery_order() {
1558        let config = hyperactor_config::global::lock();
1559
1560        // By enabling the actor side re-ordering buffer, the mssages will
1561        // be re-ordered before being processed.
1562        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1563        let expected = (0..10000)
1564            .map(|i| (format!("msg{i}"), i + 1))
1565            .collect::<Vec<_>>();
1566
1567        let mut relay_orders: Vec<usize> = (0..10000).collect();
1568        relay_orders.shuffle(&mut rand::rng());
1569        assert_out_of_order_delivery(expected, relay_orders).await;
1570    }
1571
1572    /// Verifies the default blanket introspection handler for a plain
1573    /// actor.
1574    ///
1575    /// This test spawns a simple `EchoActor`, sends it
1576    /// `IntrospectMessage::Query`, and checks that the returned
1577    /// `IntrospectResult` matches the framework’s structural default:
1578    ///
1579    /// - `identity` matches the actor id
1580    /// - `attrs` contains actor-runtime keys (status, actor_type, etc.)
1581    /// - no supervision children are reported
1582    /// - `supervisor` is None because this actor is spawned as a
1583    ///   root/top-level actor in the proc (only supervised child actors
1584    ///   report a supervisor id).
1585    ///
1586    /// This exercises the end-to-end introspect task path rather than
1587    /// calling `live_actor_payload` directly, ensuring the runtime
1588    /// wiring behaves as expected.
1589    #[tokio::test]
1590    async fn test_introspect_query_default_payload() {
1591        let proc = Proc::local();
1592        let (client, _) = proc.instance("client").unwrap();
1593        let (tx, _rx) = client.open_port::<u64>();
1594        let actor = EchoActor(tx.bind());
1595        let handle = proc.spawn::<EchoActor>("echo_introspect", actor).unwrap();
1596
1597        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1598        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1599            .send(
1600                &client,
1601                IntrospectMessage::Query {
1602                    view: IntrospectView::Actor,
1603                    reply: reply_port.bind(),
1604                },
1605            )
1606            .unwrap();
1607        let payload = reply_rx.recv().await.unwrap();
1608
1609        assert_eq!(
1610            payload.identity,
1611            crate::introspect::IntrospectRef::Actor(handle.actor_id().clone())
1612        );
1613        assert_valid_attrs(&payload);
1614        assert_has_attr(&payload, "status");
1615        assert_has_attr(&payload, "actor_type");
1616        assert_has_attr(&payload, "created_at");
1617        assert!(payload.children.is_empty());
1618        assert!(payload.parent.is_none());
1619
1620        handle.drain_and_stop("test").unwrap();
1621        handle.await;
1622    }
1623
1624    /// Helper: look up an attr in the attrs JSON by short name.
1625    fn attrs_get(attrs_json: &str, short_name: &str) -> Option<serde_json::Value> {
1626        use hyperactor_config::INTROSPECT;
1627        use hyperactor_config::attrs::AttrKeyInfo;
1628        let fq_name = inventory::iter::<AttrKeyInfo>()
1629            .find(|info| {
1630                info.meta
1631                    .get(INTROSPECT)
1632                    .is_some_and(|ia| ia.name == short_name)
1633            })
1634            .map(|info| info.name)?;
1635        let obj: serde_json::Value = serde_json::from_str(attrs_json).ok()?;
1636        obj.get(fq_name).cloned()
1637    }
1638
1639    /// Assert that an IntrospectResult has valid JSON attrs (IA-1).
1640    fn assert_valid_attrs(result: &IntrospectResult) {
1641        let parsed: serde_json::Value =
1642            serde_json::from_str(&result.attrs).expect("IA-1: attrs must be valid JSON");
1643        assert!(parsed.is_object(), "IA-1: attrs must be a JSON object");
1644    }
1645
1646    /// Assert the actor status attr matches expected value.
1647    fn assert_status(result: &IntrospectResult, expected: &str) {
1648        let status = attrs_get(&result.attrs, "status")
1649            .and_then(|v| v.as_str().map(String::from))
1650            .expect("attrs must contain status");
1651        assert_eq!(status, expected, "unexpected actor status");
1652    }
1653
1654    /// Assert the actor has a specific handler (or None).
1655    fn assert_handler(result: &IntrospectResult, expected: Option<&str>) {
1656        let handler =
1657            attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from));
1658        assert_eq!(handler.as_deref(), expected);
1659    }
1660
1661    /// Assert the error code attr matches expected value.
1662    fn assert_error_code(result: &IntrospectResult, expected: &str) {
1663        let code = attrs_get(&result.attrs, "error_code")
1664            .and_then(|v| v.as_str().map(String::from))
1665            .expect("attrs must contain error_code");
1666        assert_eq!(code, expected);
1667    }
1668
1669    /// Assert handler does NOT contain a substring.
1670    fn assert_handler_not_contains(result: &IntrospectResult, forbidden: &str) {
1671        if let Some(handler) =
1672            attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from))
1673        {
1674            assert!(
1675                !handler.contains(forbidden),
1676                "handler should not contain '{}'; got: {}",
1677                forbidden,
1678                handler
1679            );
1680        }
1681    }
1682
1683    /// Assert an attr is present by short name.
1684    fn assert_has_attr(result: &IntrospectResult, short_name: &str) {
1685        assert!(
1686            attrs_get(&result.attrs, short_name).is_some(),
1687            "attrs must contain '{}'",
1688            short_name
1689        );
1690    }
1691
1692    /// Assert status contains a substring (for non-exact checks
1693    /// like "processing" on wedged actors).
1694    fn assert_status_contains(result: &IntrospectResult, substring: &str) {
1695        let status = attrs_get(&result.attrs, "status")
1696            .and_then(|v| v.as_str().map(String::from))
1697            .expect("attrs must contain status");
1698        assert!(
1699            status.contains(substring),
1700            "status should contain '{}'; got: {}",
1701            substring,
1702            status
1703        );
1704    }
1705
1706    /// Assert no status_reason attr (IA-3: non-terminal status).
1707    fn assert_no_status_reason(result: &IntrospectResult) {
1708        assert!(
1709            attrs_get(&result.attrs, "status_reason").is_none(),
1710            "IA-3: must not have status_reason"
1711        );
1712    }
1713
1714    /// Assert a handler is present (any value).
1715    fn assert_has_handler(result: &IntrospectResult) {
1716        assert!(
1717            attrs_get(&result.attrs, "last_handler").is_some(),
1718            "must have a handler"
1719        );
1720    }
1721
1722    /// Assert no failure attrs are present (IA-4).
1723    fn assert_no_failure_attrs(result: &IntrospectResult) {
1724        assert!(
1725            attrs_get(&result.attrs, "failure_error_message").is_none(),
1726            "IA-4: must not have failure attrs"
1727        );
1728    }
1729
1730    /// Establishes IA-1 (attrs-json), IA-3 (status-shape), and
1731    /// IA-4 (failure-shape) for the running-actor path only.
1732    /// Stopped/failed paths need separate tests (see proc.rs
1733    /// terminated snapshot tests).
1734    #[tokio::test]
1735    async fn test_ia1_ia4_running_actor_attrs() {
1736        let proc = Proc::local();
1737        let (client, _) = proc.instance("client").unwrap();
1738        let (tx, _rx) = client.open_port::<u64>();
1739        let actor = EchoActor(tx.bind());
1740        let handle = proc.spawn::<EchoActor>("ia_test", actor).unwrap();
1741
1742        let payload = crate::introspect::live_actor_payload(handle.cell());
1743
1744        // IA-1: valid JSON.
1745        assert_valid_attrs(&payload);
1746
1747        // IA-3: non-terminal status, no status_reason.
1748        assert_has_attr(&payload, "status");
1749        assert_no_status_reason(&payload);
1750
1751        // IA-4: no failure attrs.
1752        assert_no_failure_attrs(&payload);
1753
1754        handle.drain_and_stop("test").unwrap();
1755        handle.await;
1756    }
1757
1758    // Verifies that QueryChild returns an error for actors without
1759    // a registered query_child_handler callback. The runtime
1760    // introspect task responds with the error sentinel payload
1761    // (`identity == ""`, error attrs with code "not_found",
1762    // .. }`).
1763    #[tokio::test]
1764    async fn test_introspect_query_child_not_found() {
1765        let proc = Proc::local();
1766        let (client, _) = proc.instance("client").unwrap();
1767        let (tx, _rx) = client.open_port::<u64>();
1768        let actor = EchoActor(tx.bind());
1769        let handle = proc.spawn::<EchoActor>("echo_qc", actor).unwrap();
1770
1771        let child_ref =
1772            reference::Reference::Actor(test_proc_id("nonexistent").actor_id("child", 0));
1773        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1774        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1775            .send(
1776                &client,
1777                IntrospectMessage::QueryChild {
1778                    child_ref,
1779                    reply: reply_port.bind(),
1780                },
1781            )
1782            .unwrap();
1783        let payload = reply_rx.recv().await.unwrap();
1784
1785        assert_eq!(
1786            payload.identity,
1787            crate::introspect::IntrospectRef::Actor(
1788                test_proc_id("nonexistent").actor_id("child", 0)
1789            )
1790        );
1791        assert_error_code(&payload, "not_found");
1792
1793        handle.drain_and_stop("test").unwrap();
1794        handle.await;
1795    }
1796
1797    // Verifies that with the runtime introspect task, custom
1798    // `handle_introspect` overrides are not called. The runtime
1799    // task intercepts IntrospectMessage before it reaches the
1800    // actor's work queue. An actor with an override still gets
1801    // standard Actor properties from the runtime task.
1802    #[tokio::test]
1803    async fn test_introspect_override() {
1804        #[derive(Debug, Default)]
1805        #[hyperactor::export(handlers = [])]
1806        struct CustomIntrospectActor;
1807
1808        #[async_trait]
1809        impl Actor for CustomIntrospectActor {}
1810
1811        let proc = Proc::local();
1812        let (client, _) = proc.instance("client").unwrap();
1813        let handle = proc
1814            .spawn("custom_introspect", CustomIntrospectActor)
1815            .unwrap();
1816
1817        handle
1818            .status()
1819            .wait_for(|s| matches!(s, ActorStatus::Idle))
1820            .await
1821            .unwrap();
1822
1823        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1824        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1825            .send(
1826                &client,
1827                IntrospectMessage::Query {
1828                    view: IntrospectView::Actor,
1829                    reply: reply_port.bind(),
1830                },
1831            )
1832            .unwrap();
1833        let payload = reply_rx.recv().await.unwrap();
1834
1835        // The runtime task returns actor attrs (with status), NOT
1836        // the override's Host properties.
1837        assert_has_attr(&payload, "status");
1838
1839        handle.drain_and_stop("test").unwrap();
1840        handle.await;
1841    }
1842
1843    /// Verifies that a child actor spawned via `spawn_child` reports
1844    /// its parent as `supervisor` in the introspection payload, and
1845    /// that the parent's payload lists the child in `children`.
1846    #[tokio::test]
1847    async fn test_introspect_query_supervision_child() {
1848        let proc = Proc::local();
1849        let (client, _) = proc.instance("client").unwrap();
1850
1851        // Spawn parent.
1852        let (tx_parent, _rx_parent) = client.open_port::<u64>();
1853        let parent_handle = proc
1854            .spawn::<EchoActor>("parent", EchoActor(tx_parent.bind()))
1855            .unwrap();
1856
1857        // Spawn child under parent.
1858        let (tx_child, _rx_child) = client.open_port::<u64>();
1859        let child_handle = proc
1860            .spawn_child::<EchoActor>(parent_handle.cell().clone(), EchoActor(tx_child.bind()))
1861            .unwrap();
1862
1863        // Query the child — supervisor should be the parent.
1864        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1865        reference::PortRef::<IntrospectMessage>::attest_message_port(child_handle.actor_id())
1866            .send(
1867                &client,
1868                IntrospectMessage::Query {
1869                    view: IntrospectView::Actor,
1870                    reply: reply_port.bind(),
1871                },
1872            )
1873            .unwrap();
1874        let child_payload = reply_rx.recv().await.unwrap();
1875
1876        assert_eq!(
1877            child_payload.identity,
1878            crate::introspect::IntrospectRef::Actor(child_handle.actor_id().clone()),
1879        );
1880        // Verify it has actor attrs (status present).
1881        assert!(
1882            attrs_get(&child_payload.attrs, "status").is_some(),
1883            "child should have actor attrs"
1884        );
1885        assert_eq!(
1886            child_payload.parent,
1887            Some(crate::introspect::IntrospectRef::Actor(
1888                parent_handle.actor_id().clone()
1889            )),
1890        );
1891
1892        // Query the parent — children should include the child.
1893        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1894        reference::PortRef::<IntrospectMessage>::attest_message_port(parent_handle.actor_id())
1895            .send(
1896                &client,
1897                IntrospectMessage::Query {
1898                    view: IntrospectView::Actor,
1899                    reply: reply_port.bind(),
1900                },
1901            )
1902            .unwrap();
1903        let parent_payload = reply_rx.recv().await.unwrap();
1904
1905        assert!(parent_payload.parent.is_none());
1906        assert!(
1907            parent_payload
1908                .children
1909                .contains(&crate::introspect::IntrospectRef::Actor(
1910                    child_handle.actor_id().clone()
1911                )),
1912        );
1913
1914        child_handle.drain_and_stop("test").unwrap();
1915        child_handle.await;
1916        parent_handle.drain_and_stop("test").unwrap();
1917        parent_handle.await;
1918    }
1919
1920    /// A freshly spawned actor that has received no user messages
1921    /// reports `last_message_handler == None` — the introspect
1922    /// handler does not leak through. Status is `"idle"` once
1923    /// initialization completes.
1924    #[tokio::test]
1925    async fn test_introspect_fresh_actor_status() {
1926        let proc = Proc::local();
1927        let (client, _) = proc.instance("client").unwrap();
1928        let (tx, _rx) = client.open_port::<u64>();
1929        let actor = EchoActor(tx.bind());
1930        let handle = proc.spawn::<EchoActor>("echo_fresh", actor).unwrap();
1931
1932        // Wait for the actor to finish initialization.
1933        handle
1934            .status()
1935            .wait_for(|s| matches!(s, ActorStatus::Idle))
1936            .await
1937            .unwrap();
1938
1939        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1940        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1941            .send(
1942                &client,
1943                IntrospectMessage::Query {
1944                    view: IntrospectView::Actor,
1945                    reply: reply_port.bind(),
1946                },
1947            )
1948            .unwrap();
1949        let payload = reply_rx.recv().await.unwrap();
1950
1951        assert_status(&payload, "idle");
1952        assert_handler(&payload, None);
1953
1954        handle.drain_and_stop("test").unwrap();
1955        handle.await;
1956    }
1957
1958    /// After processing a user message, the introspect payload reports
1959    /// the user message's handler and post-completion status — not
1960    /// the introspect handler itself (one-behind invariant,
1961    /// after-user-traffic case).
1962    #[tokio::test]
1963    async fn test_introspect_after_user_message() {
1964        let proc = Proc::local();
1965        let (client, _) = proc.instance("client").unwrap();
1966        let (tx, mut rx) = client.open_port::<u64>();
1967        let actor = EchoActor(tx.bind());
1968        let handle = proc.spawn::<EchoActor>("echo_after_msg", actor).unwrap();
1969
1970        // Send a user message and wait for it to be processed.
1971        handle.send(&client, 42u64).unwrap();
1972        let _ = rx.recv().await.unwrap();
1973
1974        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1975        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
1976            .send(
1977                &client,
1978                IntrospectMessage::Query {
1979                    view: IntrospectView::Actor,
1980                    reply: reply_port.bind(),
1981                },
1982            )
1983            .unwrap();
1984        let payload = reply_rx.recv().await.unwrap();
1985
1986        assert_status(&payload, "idle");
1987        assert_has_handler(&payload);
1988        assert_handler_not_contains(&payload, "IntrospectMessage");
1989
1990        handle.drain_and_stop("test").unwrap();
1991        handle.await;
1992    }
1993
1994    /// Two consecutive introspect queries: with the runtime
1995    /// introspect task, neither perturbs the actor's state (S2).
1996    /// Both report the same `last_message_handler` for a fresh
1997    /// actor — `None`, not `IntrospectMessage`.
1998    #[tokio::test]
1999    async fn test_introspect_consecutive_queries() {
2000        let proc = Proc::local();
2001        let (client, _) = proc.instance("client").unwrap();
2002        let (tx, _rx) = client.open_port::<u64>();
2003        let actor = EchoActor(tx.bind());
2004        let handle = proc.spawn::<EchoActor>("echo_consec", actor).unwrap();
2005
2006        handle
2007            .status()
2008            .wait_for(|s| matches!(s, ActorStatus::Idle))
2009            .await
2010            .unwrap();
2011
2012        // First introspect query.
2013        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2014        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2015            .send(
2016                &client,
2017                IntrospectMessage::Query {
2018                    view: IntrospectView::Actor,
2019                    reply: reply_port.bind(),
2020                },
2021            )
2022            .unwrap();
2023        let payload1 = reply_rx.recv().await.unwrap();
2024
2025        // Second introspect query.
2026        let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2027        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2028            .send(
2029                &client,
2030                IntrospectMessage::Query {
2031                    view: IntrospectView::Actor,
2032                    reply: reply_port2.bind(),
2033                },
2034            )
2035            .unwrap();
2036        let payload2 = reply_rx2.recv().await.unwrap();
2037
2038        // Neither should show IntrospectMessage as the handler.
2039        assert_handler(&payload1, None);
2040        assert_handler(&payload2, None);
2041
2042        handle.drain_and_stop("test").unwrap();
2043        handle.await;
2044    }
2045
2046    // test_published_properties_round_trip removed — replaced by
2047    // test_publish_attrs_round_trip which tests the Attrs-based API.
2048
2049    /// Verify InstanceCell Attrs storage: `set_published_attrs`
2050    /// replaces the whole bag, `merge_published_attr` merges a single
2051    /// key incrementally. (Instance methods are thin wrappers over
2052    /// these.)
2053    #[tokio::test]
2054    async fn test_publish_attrs_round_trip() {
2055        use hyperactor_config::Attrs;
2056        use hyperactor_config::declare_attrs;
2057
2058        declare_attrs! {
2059            attr TEST_KEY_A: String;
2060            attr TEST_KEY_B: u64;
2061        }
2062
2063        let proc = Proc::local();
2064        let (client, _) = proc.instance("client").unwrap();
2065        let (tx, _rx) = client.open_port::<u64>();
2066        let actor = EchoActor(tx.bind());
2067        let handle = proc.spawn::<EchoActor>("echo_attrs", actor).unwrap();
2068
2069        // Before publishing, attrs are None.
2070        assert!(handle.cell().published_attrs().is_none());
2071
2072        // publish_attrs: replace entire bag.
2073        let mut attrs = Attrs::new();
2074        attrs.set(TEST_KEY_A, "hello".to_string());
2075        handle.cell().set_published_attrs(attrs);
2076        let published = handle.cell().published_attrs().unwrap();
2077        assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2078
2079        // publish_attr: merge single key into existing bag.
2080        handle.cell().merge_published_attr(TEST_KEY_B, 42u64);
2081        let published = handle.cell().published_attrs().unwrap();
2082        assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2083        assert_eq!(published.get(TEST_KEY_B), Some(&42u64));
2084
2085        // publish_attr: overwrite existing key.
2086        handle
2087            .cell()
2088            .merge_published_attr(TEST_KEY_A, "world".to_string());
2089        let published = handle.cell().published_attrs().unwrap();
2090        assert_eq!(published.get(TEST_KEY_A), Some(&"world".to_string()));
2091
2092        handle.drain_and_stop("test").unwrap();
2093        handle.await;
2094    }
2095
2096    /// Verify the query_child_handler callback: register a callback,
2097    /// invoke it via `query_child()`, and confirm the response.
2098    #[tokio::test]
2099    async fn test_query_child_handler_round_trip() {
2100        let proc = Proc::local();
2101        let (client, _) = proc.instance("client").unwrap();
2102        let (tx, _rx) = client.open_port::<u64>();
2103        let actor = EchoActor(tx.bind());
2104        let handle = proc.spawn::<EchoActor>("echo_qch", actor).unwrap();
2105
2106        // Before registering, query_child returns None.
2107        let test_ref = reference::Reference::Actor(test_proc_id("test").actor_id("child", 0));
2108        assert!(handle.cell().query_child(&test_ref).is_none());
2109
2110        // Register a callback.
2111        handle.cell().set_query_child_handler(|child_ref| {
2112            use crate::introspect::IntrospectRef;
2113            let identity = match &child_ref {
2114                reference::Reference::Proc(id) => IntrospectRef::Proc(id.clone()),
2115                reference::Reference::Actor(id) => IntrospectRef::Actor(id.clone()),
2116                reference::Reference::Port(id) => IntrospectRef::Actor(id.actor_id().clone()),
2117            };
2118            IntrospectResult {
2119                identity,
2120                attrs: serde_json::json!({
2121                    "proc_name": "test_proc",
2122                    "num_actors": 42,
2123                })
2124                .to_string(),
2125                children: Vec::new(),
2126                parent: None,
2127                as_of: std::time::SystemTime::now(),
2128            }
2129        });
2130
2131        // Now query_child returns the callback's response.
2132        let payload = handle
2133            .cell()
2134            .query_child(&test_ref)
2135            .expect("callback should produce a payload");
2136        assert_eq!(
2137            payload.identity,
2138            crate::introspect::IntrospectRef::Actor(test_proc_id("test").actor_id("child", 0))
2139        );
2140        let attrs: serde_json::Value =
2141            serde_json::from_str(&payload.attrs).expect("attrs must be valid JSON");
2142        assert_eq!(
2143            attrs.get("proc_name").and_then(|v| v.as_str()),
2144            Some("test_proc")
2145        );
2146        assert_eq!(attrs.get("num_actors").and_then(|v| v.as_u64()), Some(42));
2147
2148        handle.drain_and_stop("test").unwrap();
2149        handle.await;
2150    }
2151
2152    /// Exercises S1 (see `introspect` module doc).
2153    ///
2154    /// Sends a wedging message, then queries introspect while the
2155    /// actor is blocked. The response must arrive and report live
2156    /// processing status.
2157    #[tokio::test]
2158    async fn test_introspect_wedged() {
2159        #[derive(Debug, Default)]
2160        #[hyperactor::export(handlers = [u64])]
2161        struct WedgedActor;
2162
2163        #[async_trait]
2164        impl Actor for WedgedActor {}
2165
2166        #[async_trait]
2167        impl Handler<u64> for WedgedActor {
2168            async fn handle(
2169                &mut self,
2170                _cx: &Context<Self>,
2171                _message: u64,
2172            ) -> Result<(), anyhow::Error> {
2173                // Block forever.
2174                std::future::pending::<()>().await;
2175                Ok(())
2176            }
2177        }
2178
2179        let proc = Proc::local();
2180        let (client, _) = proc.instance("client").unwrap();
2181        let handle = proc.spawn("wedged", WedgedActor).unwrap();
2182
2183        // Wait for idle before sending the wedging message.
2184        handle
2185            .status()
2186            .wait_for(|s| matches!(s, ActorStatus::Idle))
2187            .await
2188            .unwrap();
2189
2190        // Send a u64 to wedge the actor in its handler.
2191        handle.send(&client, 1u64).unwrap();
2192
2193        // Wait for the handler to start blocking.
2194        tokio::time::sleep(Duration::from_millis(50)).await;
2195
2196        // Send introspect query via the dedicated introspect port.
2197        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2198        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2199            .send(
2200                &client,
2201                IntrospectMessage::Query {
2202                    view: IntrospectView::Actor,
2203                    reply: reply_port.bind(),
2204                },
2205            )
2206            .unwrap();
2207
2208        // Must not hang — the introspect task runs independently.
2209        let payload = tokio::time::timeout(Duration::from_secs(5), reply_rx.recv())
2210            .await
2211            .expect("introspect should not hang on a wedged actor")
2212            .unwrap();
2213
2214        assert_status_contains(&payload, "processing");
2215        assert_handler_not_contains(&payload, "IntrospectMessage");
2216    }
2217
2218    /// Exercises S2 (see `introspect` module doc).
2219    ///
2220    /// After a user message, two consecutive introspect queries both
2221    /// report the user message handler.
2222    #[tokio::test]
2223    async fn test_introspect_no_perturbation() {
2224        let proc = Proc::local();
2225        let (client, _) = proc.instance("client").unwrap();
2226        let (tx, mut rx) = client.open_port::<u64>();
2227        let actor = EchoActor(tx.bind());
2228        let handle = proc.spawn::<EchoActor>("echo_no_perturb", actor).unwrap();
2229
2230        // Wait for idle before sending the user message.
2231        handle
2232            .status()
2233            .wait_for(|s| matches!(s, ActorStatus::Idle))
2234            .await
2235            .unwrap();
2236
2237        // Send a user message and wait for it to be processed.
2238        handle.send(&client, 42u64).unwrap();
2239        let _ = rx.recv().await.unwrap();
2240
2241        // First introspect query.
2242        let (reply_port1, reply_rx1) = client.open_once_port::<IntrospectResult>();
2243        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2244            .send(
2245                &client,
2246                IntrospectMessage::Query {
2247                    view: IntrospectView::Actor,
2248                    reply: reply_port1.bind(),
2249                },
2250            )
2251            .unwrap();
2252        let payload1 = reply_rx1.recv().await.unwrap();
2253
2254        // Second introspect query.
2255        let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2256        reference::PortRef::<IntrospectMessage>::attest_message_port(handle.actor_id())
2257            .send(
2258                &client,
2259                IntrospectMessage::Query {
2260                    view: IntrospectView::Actor,
2261                    reply: reply_port2.bind(),
2262                },
2263            )
2264            .unwrap();
2265        let payload2 = reply_rx2.recv().await.unwrap();
2266
2267        // Both should report the user message handler, not IntrospectMessage.
2268        assert_handler_not_contains(&payload1, "IntrospectMessage");
2269        assert_handler_not_contains(&payload2, "IntrospectMessage");
2270        // Consecutive queries must agree (compare parsed, not raw
2271        // strings — HashMap key ordering is non-deterministic).
2272        let attrs1: serde_json::Value = serde_json::from_str(&payload1.attrs).unwrap();
2273        let attrs2: serde_json::Value = serde_json::from_str(&payload2.attrs).unwrap();
2274        assert_eq!(attrs1, attrs2, "consecutive queries should be identical");
2275
2276        handle.drain_and_stop("test").unwrap();
2277        handle.await;
2278    }
2279
2280    /// Exercises CI-1 (see `proc` module doc).
2281    ///
2282    /// Unlike a plain `instance()`, which drops the introspect
2283    /// receiver so queries are silently discarded, an
2284    /// `introspectable_instance` has a live `serve_introspect` task
2285    /// and is fully navigable in admin tooling.
2286    #[tokio::test]
2287    async fn test_introspectable_instance_responds_to_query() {
2288        let proc = Proc::local();
2289        let (bridge, handle) = proc.introspectable_instance("bridge").unwrap();
2290        let actor_id = handle.actor_id().clone();
2291
2292        let (reply_port, reply_rx) = bridge.open_once_port::<IntrospectResult>();
2293        reference::PortRef::<IntrospectMessage>::attest_message_port(&actor_id)
2294            .send(
2295                &bridge,
2296                IntrospectMessage::Query {
2297                    view: IntrospectView::Actor,
2298                    reply: reply_port.bind(),
2299                },
2300            )
2301            .unwrap();
2302        let payload = reply_rx.recv().await.unwrap();
2303
2304        // CI-1: introspectable_instance reports status "client"
2305        // and actor_type "()" (the unit type).
2306        assert_eq!(
2307            payload.identity,
2308            crate::introspect::IntrospectRef::Actor(actor_id.clone())
2309        );
2310        assert_status(&payload, "client");
2311        let actor_type = attrs_get(&payload.attrs, "actor_type")
2312            .and_then(|v| v.as_str().map(String::from))
2313            .expect("must have actor_type");
2314        assert_eq!(actor_type, "()", "CI-1: actor_type must be \"()\"");
2315    }
2316
2317    /// Contrast with CI-1: a plain `instance()` does NOT respond to
2318    /// `IntrospectMessage::Query`. Its introspect receiver is dropped
2319    /// in `Proc::instance()`, so the message is silently discarded
2320    /// and the reply port never receives a value.
2321    ///
2322    /// Callers that need TUI visibility must use
2323    /// `introspectable_instance` instead.
2324    #[tokio::test]
2325    async fn test_instance_does_not_respond_to_query() {
2326        let proc = Proc::local();
2327        let (client, _client_handle) = proc.instance("client").unwrap();
2328        let (_mailbox, mailbox_handle) = proc.instance("mailbox").unwrap();
2329        let mailbox_id = mailbox_handle.actor_id().clone();
2330
2331        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2332        reference::PortRef::<IntrospectMessage>::attest_message_port(&mailbox_id)
2333            .send(
2334                &client,
2335                IntrospectMessage::Query {
2336                    view: IntrospectView::Actor,
2337                    reply: reply_port.bind(),
2338                },
2339            )
2340            .unwrap();
2341
2342        // The introspect receiver was dropped in `instance()`, so the
2343        // message is silently discarded and the reply never arrives.
2344        let result = tokio::time::timeout(Duration::from_millis(100), reply_rx.recv()).await;
2345        assert!(
2346            result.is_err(),
2347            "instance() must not respond to IntrospectMessage (introspect receiver dropped)"
2348        );
2349    }
2350
2351    /// Exercises CI-2 (see `proc` module doc).
2352    ///
2353    /// Dropping the instance transitions status to terminal,
2354    /// causing `serve_introspect` to store a terminated snapshot.
2355    #[tokio::test]
2356    async fn test_introspectable_instance_snapshot_on_drop() {
2357        let proc = Proc::local();
2358        let (instance, handle) = proc.introspectable_instance("bridge").unwrap();
2359        let actor_id = handle.actor_id().clone();
2360
2361        assert!(
2362            proc.all_actor_ids().contains(&actor_id),
2363            "should appear in all_actor_ids while live"
2364        );
2365
2366        // Dropping `instance` transitions status to Stopped, waking
2367        // the serve_introspect task which stores the snapshot.
2368        drop(instance);
2369
2370        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2371        loop {
2372            if proc.terminated_snapshot(&actor_id).is_some() {
2373                break;
2374            }
2375            assert!(
2376                std::time::Instant::now() < deadline,
2377                "timed out waiting for terminated snapshot"
2378            );
2379            tokio::task::yield_now().await;
2380        }
2381
2382        let snapshot = proc.terminated_snapshot(&actor_id).unwrap();
2383        let actor_status = attrs_get(&snapshot.attrs, "status")
2384            .and_then(|v| v.as_str().map(String::from))
2385            .expect("snapshot attrs must contain status");
2386        assert!(
2387            actor_status.starts_with("stopped"),
2388            "CI-2: snapshot actor_status should be stopped, got: {}",
2389            actor_status
2390        );
2391    }
2392}