Skip to main content

hyperactor/
actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(dead_code)] // Allow until this is used outside of tests.
10
11//! This module contains all the core traits required to define and manage actors.
12
13use std::any::TypeId;
14use std::borrow::Cow;
15use std::fmt;
16use std::fmt::Debug;
17use std::future::Future;
18use std::future::IntoFuture;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::SystemTime;
22
23use async_trait::async_trait;
24use enum_as_inner::EnumAsInner;
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use hyperactor_config::Flattrs;
28use serde::Deserialize;
29use serde::Serialize;
30use tokio::sync::watch;
31use tokio::task::JoinHandle;
32use typeuri::Named;
33
34use crate as hyperactor; // for macros
35use crate::ActorAddr;
36use crate::ActorRef;
37use crate::Data;
38use crate::EndpointLocation;
39use crate::Message;
40use crate::RemoteMessage;
41use crate::context;
42use crate::endpoint::Endpoint;
43use crate::mailbox::MailboxError;
44use crate::mailbox::MailboxSenderError;
45use crate::mailbox::MessageEnvelope;
46use crate::mailbox::PortHandle;
47use crate::mailbox::Undeliverable;
48use crate::mailbox::UndeliverableMessageError;
49use crate::message::Castable;
50use crate::message::IndexedErasedUnbound;
51use crate::proc::Context;
52use crate::proc::HandlerPorts;
53use crate::proc::Instance;
54use crate::proc::InstanceCell;
55use crate::proc::Proc;
56use crate::supervision::ActorSupervisionEvent;
57
58pub mod remote;
59
60/// The shutdown mode requested for an actor.
61#[derive(
62    Clone,
63    Copy,
64    Debug,
65    Serialize,
66    Deserialize,
67    PartialEq,
68    Eq,
69    typeuri::Named
70)]
71pub enum StopMode {
72    /// Stop without draining ordinary queued work first.
73    Stop,
74    /// Stop after draining already accepted ordinary queued work.
75    DrainAndStop,
76}
77wirevalue::register_type!(StopMode);
78
79/// An Actor is an independent, asynchronous thread of execution. Each
80/// actor instance has a mailbox, whose messages are delivered through
81/// the method [`Actor::handle`].
82///
83/// Actors communicate with each other by way of message passing.
84/// Actors are assumed to be _deterministic_: that is, the state of an
85/// actor is determined by the set (and order) of messages it receives.
86#[async_trait]
87pub trait Actor: Sized + Send + 'static {
88    /// Initialize the actor, after the runtime has been fully initialized.
89    /// Init thus provides a mechanism by which an actor can reliably and always
90    /// receive some initial event that can be used to kick off further
91    /// (potentially delayed) processing.
92    async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
93        // Default implementation: no init.
94        Ok(())
95    }
96
97    /// Handle a stop request from the runtime.
98    ///
99    /// The default implementation closes handler ingress and then
100    /// either exits immediately or queues an exit after already
101    /// accepted handler work drains. Actors that need to coordinate
102    /// asynchronous shutdown work can override this method and call
103    /// `Instance::exit()` / `Instance::exit_after_drain()` later,
104    /// once they are ready to terminate.
105    async fn handle_stop(
106        &mut self,
107        this: &Instance<Self>,
108        mode: StopMode,
109        reason: &str,
110    ) -> Result<(), anyhow::Error> {
111        handle_stop(this, mode, reason)
112    }
113
114    /// Cleanup things used by this actor before shutting down. Notably this function
115    /// is async and allows more complex cleanup. Simpler cleanup can be handled
116    /// by the impl Drop for this Actor.
117    /// If err is not None, it is the error that this actor is failing with. Any
118    /// errors returned by this function will be logged and ignored.
119    /// If err is None, any errors returned by this function will be propagated
120    /// as an ActorError.
121    /// This function is not called if there is a panic in the actor, as the
122    /// actor may be in an indeterminate state. It is also not called if the
123    /// process is killed, there is no atexit handler or signal handler.
124    async fn cleanup(
125        &mut self,
126        _this: &Instance<Self>,
127        _err: Option<&ActorError>,
128    ) -> Result<(), anyhow::Error> {
129        // Default implementation: no cleanup.
130        Ok(())
131    }
132
133    /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]).
134    /// The spawned actor will be supervised by the parent (spawning) actor.
135    fn spawn(self, cx: &impl context::Actor) -> anyhow::Result<ActorHandle<Self>> {
136        cx.instance().spawn(self)
137    }
138
139    /// Spawn a named child actor. Same supervision semantics as
140    /// `spawn`, but the child gets `name` in its ActorAddr.
141    fn spawn_with_name(
142        self,
143        cx: &impl context::Actor,
144        name: &str,
145    ) -> anyhow::Result<ActorHandle<Self>> {
146        cx.instance().spawn_with_name(name, self)
147    }
148
149    /// Spawns this actor in a detached state, handling its messages
150    /// in a background task. The returned handle is used to control
151    /// the actor's lifecycle and to interact with it.
152    ///
153    /// Actors spawned through `spawn_detached` are not attached to a supervision
154    /// hierarchy, and not managed by a [`Proc`].
155    fn spawn_detached(self) -> Result<ActorHandle<Self>, anyhow::Error> {
156        Proc::isolated().spawn("anon", self)
157    }
158
159    /// This method is used by the runtime to spawn the actor server. It can be
160    /// used by actors that require customized runtime setups
161    /// (e.g., dedicated actor threads), or want to use a custom tokio runtime.
162    #[hyperactor::instrument_infallible]
163    fn spawn_server_task<F>(future: F) -> JoinHandle<F::Output>
164    where
165        F: Future + Send + 'static,
166        F::Output: Send + 'static,
167    {
168        tokio::spawn(future)
169    }
170
171    /// Handle actor supervision event. Return `Ok(true)`` if the event is handled here.
172    async fn handle_supervision_event(
173        &mut self,
174        _this: &Instance<Self>,
175        event: &ActorSupervisionEvent,
176    ) -> Result<bool, anyhow::Error> {
177        // Error events are not handled by default and bubble up to the parent.
178        // Normal lifecycle events (e.g. clean stop) are absorbed.
179        Ok(!event.is_error())
180    }
181
182    /// Default undeliverable message handling behavior.
183    async fn handle_undeliverable_message(
184        &mut self,
185        cx: &Instance<Self>,
186        envelope: Undeliverable<MessageEnvelope>,
187    ) -> Result<(), anyhow::Error> {
188        handle_undeliverable_message(cx, envelope)
189    }
190
191    /// If overridden, we will use this name in place of the
192    /// ActorAddr for talking about this actor in supervision error
193    /// messages.
194    fn display_name(&self) -> Option<String> {
195        None
196    }
197}
198
199/// Default implementation of [`Actor::handle_undeliverable_message`]. Defined
200/// as a free function so that `Actor` implementations that override
201/// [`Actor::handle_undeliverable_message`] can fallback to this default.
202pub fn handle_undeliverable_message<A: Actor>(
203    cx: &Instance<A>,
204    undeliverable: Undeliverable<MessageEnvelope>,
205) -> Result<(), anyhow::Error> {
206    match undeliverable {
207        Undeliverable::Message(envelope) => {
208            assert_eq!(envelope.sender(), cx.self_addr());
209            anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
210        }
211        Undeliverable::Lost(lost) => {
212            assert_eq!(&lost.sender, cx.self_addr());
213            anyhow::bail!(UndeliverableMessageError::Lost { lost });
214        }
215    }
216}
217
218/// Default implementation of [`Actor::handle_stop`]. Defined as a free
219/// function so that `Actor` implementations that override
220/// [`Actor::handle_stop`] can fall back to this default.
221pub fn handle_stop<A: Actor>(
222    this: &Instance<A>,
223    mode: StopMode,
224    reason: &str,
225) -> Result<(), anyhow::Error> {
226    // After `close`, no more messages may be enqueued.
227    // exit_after_drain will drain any pending messages before exiting.
228    this.close();
229    match mode {
230        StopMode::Stop => this.exit(reason).map_err(anyhow::Error::from),
231        StopMode::DrainAndStop => this.exit_after_drain(reason).map_err(anyhow::Error::from),
232    }
233}
234
235/// An actor that does nothing. It is used to represent "client only" actors,
236/// returned by [`Proc::client`].
237#[async_trait]
238impl Actor for () {}
239
240impl Referable for () {}
241
242impl Binds<()> for () {
243    fn bind(_ports: &HandlerPorts<Self>) {
244        // Binds no ports.
245    }
246}
247
248/// A Handler allows an actor to handle a specific message type.
249#[async_trait]
250pub trait Handler<M>: Actor {
251    /// Handle the next M-typed message.
252    async fn handle(&mut self, cx: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
253}
254
255/// Blanket Handler impls for bypass-workq message types. Since these messages
256/// bypass workq, they will never be sent to actor's handler.
257///
258/// These exist solely to lock the `Handler<M>` coherence slot for each bypass
259/// type, so no specific `impl Handler<BypassType> for SomeActor` can be written.
260/// The actual delivery for these types goes through dedicated channels set up
261/// in `Instance::new`, not through Handler. See the matching sender-side check
262/// in [crate::ordering::Sequencer::assign_seq] and the registry of bypass types
263/// in [crate::ordering::is_bypass_workq_type_id].
264#[async_trait]
265impl<A: Actor> Handler<Signal> for A {
266    async fn handle(&mut self, _cx: &Context<Self>, _message: Signal) -> Result<(), anyhow::Error> {
267        unimplemented!("signal handler should not be called directly")
268    }
269}
270
271#[async_trait]
272impl<A: Actor> Handler<crate::introspect::IntrospectMessage> for A {
273    async fn handle(
274        &mut self,
275        _cx: &Context<Self>,
276        _message: crate::introspect::IntrospectMessage,
277    ) -> Result<(), anyhow::Error> {
278        unimplemented!("introspect message handler should not be called directly")
279    }
280}
281
282/// This handler provides a default behavior when a message sent by
283/// the actor to another is returned due to delivery failure.
284#[async_trait]
285impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
286    async fn handle(
287        &mut self,
288        cx: &Context<Self>,
289        message: Undeliverable<MessageEnvelope>,
290    ) -> Result<(), anyhow::Error> {
291        let (sender, dest, error) = match &message {
292            Undeliverable::Message(envelope) => (
293                envelope.sender().to_string(),
294                envelope.dest().to_string(),
295                envelope.error_msg().unwrap_or_default(),
296            ),
297            Undeliverable::Lost(lost) => (
298                lost.sender.to_string(),
299                lost.dest.to_string(),
300                lost.error.clone(),
301            ),
302        };
303        match self.handle_undeliverable_message(cx, message).await {
304            Ok(_) => {
305                tracing::debug!(
306                    actor_id = %cx.self_addr(),
307                    name = "undeliverable_message_handled",
308                    %sender,
309                    %dest,
310                    error,
311                );
312                Ok(())
313            }
314            Err(e) => {
315                tracing::error!(
316                    actor_id = %cx.self_addr(),
317                    name = "undeliverable_message",
318                    %sender,
319                    %dest,
320                    error,
321                    handler_error = %e,
322                );
323                Err(e)
324            }
325        }
326    }
327}
328
329/// This handler enables actors to unbind the [IndexedErasedUnbound]
330/// message, and forward the result to corresponding handler.
331#[async_trait]
332impl<A, M> Handler<IndexedErasedUnbound<M>> for A
333where
334    A: Handler<M>,
335    M: Castable,
336{
337    async fn handle(
338        &mut self,
339        cx: &Context<Self>,
340        msg: IndexedErasedUnbound<M>,
341    ) -> anyhow::Result<()> {
342        let message = msg.downcast()?.bind()?;
343        Handler::handle(self, cx, message).await
344    }
345}
346
347/// An `Actor` that can be spawned remotely.
348///
349/// Bounds explained:
350/// - `Actor`: only actors may be remotely spawned.
351/// - `Referable`: marks the type as eligible for typed remote
352///   references (`ActorRef<A>`); required because remote spawn
353///   ultimately hands back an `ActorAddr` that higher-level APIs may
354///   re-type as `ActorRef<A>`.
355/// - `Binds<Self>`: lets the runtime wire this actor's handler ports
356///   when it is spawned (the blanket impl calls `handle.bind::<Self>()`).
357///
358/// `gspawn_root_bind` is a type-erased entry point used by the remote
359/// spawn/registry machinery. It takes serialized params and returns
360/// the new actor's `ActorAddr`; application code shouldn't call it
361/// directly.
362#[async_trait]
363pub trait RemoteSpawn: Actor + Referable + Binds<Self> {
364    /// The type of parameters used to instantiate the actor remotely.
365    type Params: RemoteMessage;
366
367    /// Creates a new actor instance given its instantiation parameters.
368    /// The `environment` allows whoever is responsible for spawning this actor
369    /// to pass in additional context that may be useful.
370    async fn new(params: Self::Params, environment: Flattrs) -> anyhow::Result<Self>;
371
372    /// A type-erased entry point to spawn this actor as a root. This is
373    /// primarily used by hyperactor's remote actor registration
374    /// mechanism.
375    // TODO: consider making this 'private' -- by moving it into a non-public trait as in [`cap`].
376    fn gspawn_root_bind(
377        proc: &Proc,
378        uid: crate::id::Uid,
379        serialized_params: Data,
380        environment: Flattrs,
381    ) -> Pin<Box<dyn Future<Output = Result<ActorAddr, anyhow::Error>> + Send>> {
382        let proc = proc.clone();
383        Box::pin(async move {
384            let params =
385                bincode::serde::decode_from_slice(&serialized_params, bincode::config::legacy())
386                    .map(|(v, _)| v)?;
387            let actor = Self::new(params, environment).await?;
388            let handle = proc.spawn_with_uid(uid, actor)?;
389            // We return only the ActorAddr, not a typed ActorRef.
390            // Callers that hold this ID can interact with the actor
391            // only via the serialized/opaque messaging path, which
392            // makes it safe to export across process boundaries.
393            //
394            // Note: the actor itself is still `A`-typed here; we
395            // merely restrict the *capability* we hand out to an
396            // untyped identifier.
397            //
398            // This will be replaced by a proper export/registry
399            // mechanism.
400            Ok(handle.bind::<Self>().into_actor_addr())
401        })
402    }
403
404    /// A type-erased entry point to spawn this actor as a child.
405    ///
406    /// The returned handle is lifecycle-only; callers that know the concrete
407    /// actor type can recover a typed handle with [`AnyActorHandle::downcast`].
408    fn gspawn_child(
409        proc: &Proc,
410        parent: InstanceCell,
411        uid: crate::id::Uid,
412        serialized_params: Data,
413        environment: Flattrs,
414    ) -> Pin<Box<dyn Future<Output = Result<AnyActorHandle, anyhow::Error>> + Send>> {
415        let proc = proc.clone();
416        Box::pin(async move {
417            let params =
418                bincode::serde::decode_from_slice(&serialized_params, bincode::config::legacy())
419                    .map(|(v, _)| v)?;
420            let actor = Self::new(params, environment).await?;
421            let handle = proc.spawn_child_with_uid(parent, uid, actor)?;
422            handle.bind::<Self>();
423            Ok(handle.into_any())
424        })
425    }
426
427    /// The type ID of this actor.
428    fn get_type_id() -> TypeId {
429        TypeId::of::<Self>()
430    }
431}
432
433/// If an actor implements Default, we use this as the
434/// `RemoteSpawn` implementation, too.
435#[async_trait]
436impl<A: Actor + Referable + Binds<Self> + Default> RemoteSpawn for A {
437    type Params = ();
438
439    async fn new(_params: Self::Params, _environment: Flattrs) -> anyhow::Result<Self> {
440        Ok(Default::default())
441    }
442}
443
444/// Errors that occur while serving actors. Each error is associated
445/// with the ID of the actor being served.
446#[derive(Debug)]
447pub struct ActorError {
448    /// The ActorAddr for the actor that generated this error.
449    pub actor_id: Box<ActorAddr>,
450    /// The kind of error that occurred.
451    pub kind: Box<ActorErrorKind>,
452}
453
454/// The kinds of actor serving errors.
455#[derive(thiserror::Error, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
456pub enum ActorErrorKind {
457    /// Generic error with a formatted message.
458    #[error("{0}")]
459    Generic(String),
460
461    /// An error that occurred while trying to handle a supervision event.
462    #[error("{0} while handling {1}")]
463    ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
464
465    /// The actor did not attempt to handle
466    #[error("{0}")]
467    UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
468
469    /// The actor was explicitly aborted with the provided reason.
470    #[error("actor explicitly aborted due to: {0}")]
471    Aborted(String),
472}
473
474impl ActorErrorKind {
475    /// Error while processing actor, i.e., returned by the actor's
476    /// processing method.
477    pub fn processing(err: anyhow::Error) -> Self {
478        // Unbox err from the anyhow err. Check if it is an ActorErrorKind object.
479        // If it is directly use it as the new ActorError's ActorErrorKind.
480        // This lets us directly pass the ActorErrorKind::UnhandledSupervisionEvent
481        // up the handling infrastructure.
482        err.downcast::<ActorErrorKind>()
483            .unwrap_or_else(|err| Self::Generic(err.to_string()))
484    }
485
486    /// Unwound stracktrace of a panic.
487    pub fn panic(err: anyhow::Error) -> Self {
488        Self::Generic(format!("panic: {}", err))
489    }
490
491    /// Error during actor initialization.
492    pub fn init(err: anyhow::Error) -> Self {
493        Self::Generic(format!("initialization error: {}", err))
494    }
495
496    /// Error during actor cleanup.
497    pub fn cleanup(err: anyhow::Error) -> Self {
498        Self::Generic(format!("cleanup error: {}", err))
499    }
500
501    /// An underlying mailbox error.
502    pub fn mailbox(err: MailboxError) -> Self {
503        Self::Generic(err.to_string())
504    }
505
506    /// An underlying mailbox sender error.
507    pub fn mailbox_sender(err: MailboxSenderError) -> Self {
508        Self::Generic(err.to_string())
509    }
510
511    /// The actor's state could not be determined.
512    pub fn indeterminate_state() -> Self {
513        Self::Generic("actor is in an indeterminate state".to_string())
514    }
515}
516
517impl ActorError {
518    /// Create a new actor server error with the provided id and kind.
519    pub(crate) fn new(actor_id: &ActorAddr, kind: ActorErrorKind) -> Self {
520        Self {
521            actor_id: Box::new(actor_id.clone()),
522            kind: Box::new(kind),
523        }
524    }
525}
526
527impl fmt::Display for ActorError {
528    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
529        write!(f, "serving {}: ", self.actor_id)?;
530        fmt::Display::fmt(&self.kind, f)
531    }
532}
533
534impl std::error::Error for ActorError {
535    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
536        self.kind.source()
537    }
538}
539
540impl From<MailboxError> for ActorError {
541    fn from(inner: MailboxError) -> Self {
542        Self {
543            actor_id: Box::new(inner.actor_addr().clone()),
544            kind: Box::new(ActorErrorKind::mailbox(inner)),
545        }
546    }
547}
548
549impl From<MailboxSenderError> for ActorError {
550    fn from(inner: MailboxSenderError) -> Self {
551        Self {
552            actor_id: Box::new(inner.location().actor_addr()),
553            kind: Box::new(ActorErrorKind::mailbox_sender(inner)),
554        }
555    }
556}
557
558/// A collection of signals to control the behavior of the actor.
559/// Signals are internal runtime control plane messages and should not be
560/// sent outside of the runtime.
561///
562/// These messages are not handled directly by actors; instead, the runtime
563/// handles the various signals.
564#[derive(Clone, Debug, Serialize, Deserialize, typeuri::Named)]
565pub enum Signal {
566    /// Stop the actor, after draining messages.
567    DrainAndStop(String),
568
569    /// Stop the actor immediately.
570    Stop(String),
571
572    /// Exit the actor loop with the provided stop reason.
573    ExitRequested(String),
574
575    /// The direct child with the given uid was stopped.
576    ChildStopped(crate::id::Uid),
577
578    /// Kill the actor. This will exit the actor loop with an error,
579    /// causing a supervision event to propagate up the supervision
580    /// hierarchy.
581    Kill(String),
582}
583wirevalue::register_type!(Signal);
584
585impl fmt::Display for Signal {
586    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
587        match self {
588            Signal::DrainAndStop(reason) => write!(f, "DrainAndStop({})", reason),
589            Signal::Stop(reason) => write!(f, "Stop({})", reason),
590            Signal::ExitRequested(reason) => write!(f, "ExitRequested({})", reason),
591            Signal::ChildStopped(uid) => write!(f, "ChildStopped({})", uid),
592            Signal::Kill(reason) => write!(f, "Kill({})", reason),
593        }
594    }
595}
596
597/// Information about a message handler being processed.
598///
599/// Uses `Cow<'static, str>` to avoid string copies on the hot path.
600/// The typename and arm are typically static strings from `TypeInfo`.
601#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
602pub struct HandlerInfo {
603    /// The type name of the message being handled.
604    pub typename: Cow<'static, str>,
605    /// The enum arm being handled, if the message is an enum.
606    pub arm: Option<Cow<'static, str>>,
607}
608
609impl HandlerInfo {
610    /// Create a new `HandlerInfo` from static strings (zero-copy).
611    pub fn from_static(typename: &'static str, arm: Option<&'static str>) -> Self {
612        Self {
613            typename: Cow::Borrowed(typename),
614            arm: arm.map(Cow::Borrowed),
615        }
616    }
617
618    /// Create a new `HandlerInfo` from owned strings.
619    pub fn from_owned(typename: String, arm: Option<String>) -> Self {
620        Self {
621            typename: Cow::Owned(typename),
622            arm: arm.map(Cow::Owned),
623        }
624    }
625}
626
627impl fmt::Display for HandlerInfo {
628    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
629        match &self.arm {
630            Some(arm) => write!(f, "{}.{}", self.typename, arm),
631            None => write!(f, "{}", self.typename),
632        }
633    }
634}
635
636/// The runtime status of an actor.
637#[derive(
638    Debug,
639    Serialize,
640    Deserialize,
641    PartialEq,
642    Eq,
643    Clone,
644    typeuri::Named,
645    EnumAsInner
646)]
647pub enum ActorStatus {
648    /// The actor status is unknown.
649    Unknown,
650    /// The actor was created, but not yet started.
651    Created,
652    /// The actor is initializing. It is not yet ready to receive messages.
653    Initializing,
654    /// The actor is in "client" state: the user is managing the actor's
655    /// mailboxes manually.
656    Client,
657    /// The actor is ready to receive messages, but is currently idle.
658    Idle,
659    /// The actor has been processing a message, beginning at the specified
660    /// instant. The message handler info is included.
661    Processing(SystemTime, Option<HandlerInfo>),
662    /// The actor is stopping. It is draining messages.
663    Stopping,
664    /// The actor is stopped with a provided reason.
665    /// It is no longer processing messages.
666    Stopped(String),
667    /// The actor failed with the provided actor error.
668    Failed(ActorErrorKind),
669}
670
671impl ActorStatus {
672    /// Tells whether the status is a terminal state.
673    pub fn is_terminal(&self) -> bool {
674        self.is_stopped() || self.is_failed()
675    }
676
677    /// Create a generic failure status with the provided error message.
678    pub fn generic_failure(message: impl Into<String>) -> Self {
679        Self::Failed(ActorErrorKind::Generic(message.into()))
680    }
681
682    fn span_string(&self) -> &'static str {
683        self.arm().unwrap_or_default()
684    }
685}
686
687impl fmt::Display for ActorStatus {
688    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
689        match self {
690            Self::Unknown => write!(f, "unknown"),
691            Self::Created => write!(f, "created"),
692            Self::Initializing => write!(f, "initializing"),
693            Self::Client => write!(f, "client"),
694            Self::Idle => write!(f, "idle"),
695            Self::Processing(instant, None) => {
696                write!(
697                    f,
698                    "processing for {}ms",
699                    std::time::SystemTime::now()
700                        .duration_since(*instant)
701                        .unwrap_or_default()
702                        .as_millis()
703                )
704            }
705            Self::Processing(instant, Some(handler_info)) => {
706                write!(
707                    f,
708                    "{}: processing for {}ms",
709                    handler_info,
710                    std::time::SystemTime::now()
711                        .duration_since(*instant)
712                        .unwrap_or_default()
713                        .as_millis()
714                )
715            }
716            Self::Stopping => write!(f, "stopping"),
717            Self::Stopped(reason) => write!(f, "stopped: {}", reason),
718            Self::Failed(err) => write!(f, "failed: {}", err),
719        }
720    }
721}
722
723/// ActorHandles represent a (local) serving actor. It is used to access
724/// its messaging and signal ports, as well as to synchronize with its
725/// lifecycle (e.g., providing joins).  Once dropped, the handle is
726/// detached from the underlying actor instance, and there is no longer
727/// any way to join it.
728///
729/// Correspondingly, [`crate::ActorAddr`]s refer to (possibly) remote
730/// actors.
731pub struct ActorHandle<A: Actor> {
732    cell: InstanceCell,
733    ports: Arc<HandlerPorts<A>>,
734}
735
736/// A handle to a running (local) actor.
737impl<A: Actor> ActorHandle<A> {
738    pub(crate) fn new(cell: InstanceCell, ports: Arc<HandlerPorts<A>>) -> Self {
739        Self { cell, ports }
740    }
741
742    /// The actor's cell. Used primarily for testing.
743    /// TODO: this should not be a public API.
744    pub(crate) fn cell(&self) -> &InstanceCell {
745        &self.cell
746    }
747
748    /// The [`ActorAddr`] of the actor represented by this handle.
749    pub fn actor_addr(&self) -> &ActorAddr {
750        self.cell.actor_addr()
751    }
752
753    /// Signal the actor to drain its current messages and then stop.
754    pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
755        tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_addr());
756        self.cell.signal(Signal::DrainAndStop(reason.to_string()))
757    }
758
759    /// Signal the actor to stop without draining ordinary queued
760    /// work first.
761    pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
762        tracing::info!("actor handle stop called: {}", self.actor_addr());
763        self.cell.signal(Signal::Stop(reason.to_string()))
764    }
765
766    /// Signal the actor to terminate immediately.
767    pub fn kill(&self, reason: &str) -> Result<(), ActorError> {
768        tracing::info!("actor handle kill called: {}", self.actor_addr());
769        self.cell.signal(Signal::Kill(reason.to_string()))
770    }
771
772    /// A watch that observes the lifecycle state of the actor.
773    pub fn status(&self) -> watch::Receiver<ActorStatus> {
774        self.cell.status().clone()
775    }
776
777    /// Return a port for the provided message type handled by the actor.
778    pub fn port<M: Message>(&self) -> PortHandle<M>
779    where
780        A: Handler<M>,
781    {
782        self.ports.get()
783    }
784
785    /// TEMPORARY: bind...
786    /// TODO: we shoudl also have a default binding(?)
787    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
788        self.cell.bind(self.ports.as_ref())
789    }
790
791    /// Erase this handle's actor type, preserving only lifecycle access.
792    pub fn into_any(self) -> AnyActorHandle {
793        AnyActorHandle { cell: self.cell }
794    }
795}
796
797/// A type-erased handle to a running actor whose concrete type is erased.
798///
799/// This handle intentionally does not expose typed messaging or binding APIs.
800/// Use [`AnyActorHandle::downcast`] to recover a typed [`ActorHandle`] when the
801/// concrete actor type is known.
802pub struct AnyActorHandle {
803    cell: InstanceCell,
804}
805
806impl AnyActorHandle {
807    /// The [`ActorAddr`] of the actor represented by this handle.
808    pub fn actor_id(&self) -> &ActorAddr {
809        self.cell.actor_addr()
810    }
811
812    /// Signal the actor to drain its current messages and then stop.
813    pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
814        self.cell.signal(Signal::DrainAndStop(reason.to_string()))
815    }
816
817    /// Signal the actor to stop without draining ordinary queued work first.
818    pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
819        self.cell.signal(Signal::Stop(reason.to_string()))
820    }
821
822    /// Signal the actor to terminate immediately.
823    pub fn kill(&self, reason: &str) -> Result<(), ActorError> {
824        self.cell.signal(Signal::Kill(reason.to_string()))
825    }
826
827    /// A watch that observes the lifecycle state of the actor.
828    pub fn status(&self) -> watch::Receiver<ActorStatus> {
829        self.cell.status().clone()
830    }
831
832    /// Attempt to recover a typed actor handle.
833    pub fn downcast<A: Actor>(&self) -> Option<ActorHandle<A>> {
834        self.cell.downcast_handle()
835    }
836}
837
838/// IntoFuture allows users to await the handle to join it. The future
839/// resolves when the actor itself has stopped processing messages.
840/// The future resolves to the actor's final status.
841impl IntoFuture for AnyActorHandle {
842    type Output = ActorStatus;
843    type IntoFuture = BoxFuture<'static, Self::Output>;
844
845    fn into_future(self) -> Self::IntoFuture {
846        let future = async move {
847            let mut status_receiver = self.cell.status().clone();
848            let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
849            match result {
850                Err(_) => ActorStatus::Unknown,
851                Ok(status) => status.clone(),
852            }
853        };
854
855        future.boxed()
856    }
857}
858
859impl Debug for AnyActorHandle {
860    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
861        f.debug_struct("AnyActorHandle")
862            .field("cell", &"..")
863            .finish()
864    }
865}
866
867impl Clone for AnyActorHandle {
868    fn clone(&self) -> Self {
869        Self {
870            cell: self.cell.clone(),
871        }
872    }
873}
874
875/// IntoFuture allows users to await the handle to join it. The future
876/// resolves when the actor itself has stopped processing messages.
877/// The future resolves to the actor's final status.
878impl<A: Actor> IntoFuture for ActorHandle<A> {
879    type Output = ActorStatus;
880    type IntoFuture = BoxFuture<'static, Self::Output>;
881
882    fn into_future(self) -> Self::IntoFuture {
883        let future = async move {
884            let mut status_receiver = self.cell.status().clone();
885            let result = status_receiver.wait_for(ActorStatus::is_terminal).await;
886            match result {
887                Err(_) => ActorStatus::Unknown,
888                Ok(status) => status.clone(),
889            }
890        };
891
892        future.boxed()
893    }
894}
895
896impl<A: Actor> Debug for ActorHandle<A> {
897    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
898        f.debug_struct("ActorHandle").field("cell", &"..").finish()
899    }
900}
901
902impl<A, M> Endpoint<M> for &ActorHandle<A>
903where
904    A: Actor + Handler<M>,
905    M: Message,
906{
907    fn endpoint_location(&self) -> EndpointLocation {
908        EndpointLocation::Actor(self.actor_addr().clone())
909    }
910
911    fn post<C>(self, cx: &C, message: M)
912    where
913        C: context::Actor,
914    {
915        Endpoint::post(&self.ports.get(), cx, message)
916    }
917}
918
919impl<A: Actor> Clone for ActorHandle<A> {
920    fn clone(&self) -> Self {
921        Self {
922            cell: self.cell.clone(),
923            ports: self.ports.clone(),
924        }
925    }
926}
927
/// `Referable` is a marker trait for types that can appear as
/// remote references across process boundaries.
///
/// It is not limited to concrete [`Actor`] implementations. For
/// example, façade types generated by [`behavior!`] implement
/// `Referable` so that you can hand out restricted or stable APIs
/// while still using the same remote messaging machinery.
///
/// Implementing this trait means the type can be identified (`Named`)
/// so the runtime knows what it is.
///
/// In contrast, [`RemoteSpawn`] is the trait that marks *actors*
/// that can actually be **spawned remotely**. A behavior may be a
/// `Referable` but is never a `RemoteSpawn`.
pub trait Referable: Named {}
943
/// `Binds` determines how an actor's ports are bound to a specific
/// reference type.
///
/// It is implemented by the reference type (itself a [`Referable`]) for
/// each actor type `A` whose ports it can bind.
pub trait Binds<A: Actor>: Referable {
    /// Bind the ports of actor `A`, reached through `ports`, for this
    /// reference type.
    fn bind(ports: &HandlerPorts<A>);
}
950
/// `RemoteHandles` is a marker trait specifying that message type `M`
/// is handled by a specific actor type.
pub trait RemoteHandles<M: RemoteMessage>: Referable {}
954
/// Check if the actor behaves-as a given behavior (defined by [`behavior!`]).
///
/// The check happens entirely at compile time: the macro expands to an
/// anonymous `const` holding a closure that instantiates a function bounded
/// by `Binds<$ty>` with `$behavior`. Compilation fails when `$behavior`
/// does not implement `Binds<$ty>`; nothing is executed at runtime.
///
/// ```
/// # use serde::Serialize;
/// # use serde::Deserialize;
/// # use typeuri::Named;
/// # use hyperactor::Actor;
///
/// // First, define a behavior, based on handling a single message type `()`.
/// hyperactor::behavior!(UnitBehavior, ());
///
/// #[derive(Debug, Default)]
/// struct MyActor;
///
/// impl Actor for MyActor {}
///
/// #[async_trait::async_trait]
/// impl hyperactor::Handler<()> for MyActor {
///     async fn handle(
///         &mut self,
///         _cx: &hyperactor::Context<Self>,
///         _message: (),
///     ) -> Result<(), anyhow::Error> {
///         // no-op
///         Ok(())
///     }
/// }
///
/// hyperactor::assert_behaves!(MyActor as UnitBehavior);
/// ```
#[macro_export]
macro_rules! assert_behaves {
    ($ty:ty as $behavior:ty) => {
        const _: fn() = || {
            // `$crate` resolves to the defining crate no matter what name (if
            // any) the call site has for it, so callers do not need a
            // `hyperactor` path in scope for the expansion to compile.
            fn check<B: $crate::actor::Binds<$ty>>() {}
            check::<$behavior>();
        };
    };
}
994
995#[cfg(test)]
996mod tests {
997    use std::sync::Mutex;
998    use std::time::Duration;
999
1000    use rand::seq::SliceRandom;
1001    use timed_test::async_timed_test;
1002    use tokio::sync::mpsc;
1003    use tokio::time::timeout;
1004
1005    use super::*;
1006    use crate as hyperactor;
1007    use crate::Actor;
1008    use crate::ActorRef;
1009    use crate::Addr;
1010    use crate::OncePortHandle;
1011    use crate::PortRef;
1012    use crate::config;
1013    use crate::context::Mailbox as _;
1014    use crate::introspect::IntrospectMessage;
1015    use crate::introspect::IntrospectResult;
1016    use crate::introspect::IntrospectView;
1017    use crate::mailbox::BoxableMailboxSender as _;
1018    use crate::mailbox::MailboxSender;
1019    use crate::mailbox::PortLocation;
1020    use crate::mailbox::monitored_return_handle;
1021    use crate::ordering::SEQ_INFO;
1022    use crate::ordering::SeqInfo;
1023    use crate::testing::ids::test_proc_id;
1024    use crate::testing::pingpong::PingPongActor;
1025    use crate::testing::pingpong::PingPongMessage;
1026    use crate::testing::proc_supervison::ProcSupervisionCoordinator; // for macros
1027
    /// Test actor wrapping a `PortRef<u64>`; its `Handler<u64>` impl forwards
    /// every received value to that port.
    #[derive(Debug)]
    struct EchoActor(PortRef<u64>);

    // No `Actor` methods are overridden.
    #[async_trait]
    impl Actor for EchoActor {}
1033
1034    #[async_trait]
1035    impl Handler<u64> for EchoActor {
1036        async fn handle(&mut self, cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
1037            let Self(port) = self;
1038            port.post(cx, message);
1039            Ok(())
1040        }
1041    }
1042
1043    #[tokio::test]
1044    async fn test_server_basic() {
1045        let proc = Proc::isolated();
1046        let (client, _) = proc.client("client").unwrap();
1047        let (tx, mut rx) = client.open_port();
1048        let actor = EchoActor(tx.bind());
1049        let handle = proc.spawn::<EchoActor>("echo", actor).unwrap();
1050        handle.post(&client, 123u64);
1051        handle.drain_and_stop("test").unwrap();
1052        handle.await;
1053
1054        assert_eq!(rx.drain(), vec![123u64]);
1055    }
1056
1057    #[tokio::test]
1058    async fn test_ping_pong() {
1059        let proc = Proc::isolated();
1060        let (client, _) = proc.client("client").unwrap();
1061        let (undeliverable_msg_tx, _) = client.open_port();
1062
1063        let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
1064        let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None);
1065        let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
1066        let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();
1067
1068        let (local_port, local_receiver) = client.open_once_port();
1069
1070        ping_handle.post(
1071            &client,
1072            PingPongMessage(10, pong_handle.bind(), local_port.bind()),
1073        );
1074
1075        assert!(local_receiver.recv().await.unwrap());
1076    }
1077
    /// Ping-pong exchange in which both actors are built with
    /// `Some(error_ttl)`; the test passes only if no reply arrives on the
    /// client's once-port within five seconds.
    #[tokio::test]
    async fn test_ping_pong_on_handler_error() {
        let proc = Proc::isolated();
        let (client, _) = proc.client("client").unwrap();
        let (undeliverable_msg_tx, _) = client.open_port();

        // Need to set a supervision coordinator for this Proc because there will
        // be actor failure(s) in this test which trigger supervision.
        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let error_ttl = 66;

        let ping_actor =
            PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
        let pong_actor =
            PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None);
        let ping_handle = proc.spawn::<PingPongActor>("ping", ping_actor).unwrap();
        let pong_handle = proc.spawn::<PingPongActor>("pong", pong_actor).unwrap();

        let (local_port, local_receiver) = client.open_once_port();

        ping_handle.post(
            &client,
            PingPongMessage(
                error_ttl + 1, // will encounter an error at TTL=66
                pong_handle.bind(),
                local_port.bind(),
            ),
        );

        // TODO: Fix this receiver hanging issue in T200423722.
        // Until then the expected outcome is the timeout itself: `res` is
        // `Err(Elapsed)` when `recv()` does not complete within five seconds.
        let res: Result<Result<bool, MailboxError>, tokio::time::error::Elapsed> =
            timeout(Duration::from_secs(5), local_receiver.recv()).await;
        assert!(res.is_err());
    }
1113
1114    #[derive(Debug)]
1115    struct InitActor(bool);
1116
1117    #[async_trait]
1118    impl Actor for InitActor {
1119        async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
1120            self.0 = true;
1121            Ok(())
1122        }
1123    }
1124
1125    #[async_trait]
1126    impl Handler<OncePortHandle<bool>> for InitActor {
1127        async fn handle(
1128            &mut self,
1129            cx: &Context<Self>,
1130            port: OncePortHandle<bool>,
1131        ) -> Result<(), anyhow::Error> {
1132            port.post(cx, self.0);
1133            Ok(())
1134        }
1135    }
1136
1137    #[tokio::test]
1138    async fn test_init() {
1139        let proc = Proc::isolated();
1140        let actor = InitActor(false);
1141        let handle = proc.spawn::<InitActor>("init", actor).unwrap();
1142        let (client, _) = proc.client("client").unwrap();
1143
1144        let (port, receiver) = client.open_once_port();
1145        handle.post(&client, port);
1146        assert!(receiver.recv().await.unwrap());
1147
1148        handle.drain_and_stop("test").unwrap();
1149        handle.await;
1150    }
1151
    /// Shared `(u64, String)` pair that `MultiActor`'s handlers write into and
    /// that the tests read back through `MultiValuesTest::get_values`.
    type MultiValues = Arc<Mutex<(u64, String)>>;
1153
    /// Fixture bundling a `MultiActor`, the values it writes, and a client
    /// instance used to post messages to it.
    struct MultiValuesTest {
        // The proc on which the actor and the client were created.
        proc: Proc,
        // The state shared with the spawned `MultiActor`.
        values: MultiValues,
        // Handle to the spawned `MultiActor`.
        handle: ActorHandle<MultiActor>,
        // Client instance used as the sending context.
        client: Instance<()>,
        // Handle returned alongside `client`; stored but otherwise unused.
        _client_handle: ActorHandle<()>,
    }
1161
1162    impl MultiValuesTest {
1163        async fn new() -> Self {
1164            let proc = Proc::isolated();
1165            let values: MultiValues = Arc::new(Mutex::new((0, "".to_string())));
1166            let actor = MultiActor(values.clone());
1167            let handle = proc.spawn::<MultiActor>("myactor", actor).unwrap();
1168            let (client, client_handle) = proc.client("client").unwrap();
1169            Self {
1170                proc,
1171                values,
1172                handle,
1173                client,
1174                _client_handle: client_handle,
1175            }
1176        }
1177
1178        fn send<M>(&self, message: M)
1179        where
1180            M: RemoteMessage,
1181            MultiActor: Handler<M>,
1182        {
1183            self.handle.post(&self.client, message)
1184        }
1185
1186        async fn sync(&self) {
1187            let (port, done) = self.client.open_once_port::<bool>();
1188            self.handle.post(&self.client, port);
1189            assert!(done.recv().await.unwrap());
1190        }
1191
1192        fn get_values(&self) -> (u64, String) {
1193            self.values.lock().unwrap().clone()
1194        }
1195    }
1196
    /// Test actor with several handlers: its `u64` handler stores into the
    /// first slot of the shared `MultiValues`, its `String` handler into the
    /// second. Exported with `u64` and `String` as handler types.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [u64, String])]
    struct MultiActor(MultiValues);

    // No `Actor` methods are overridden.
    #[async_trait]
    impl Actor for MultiActor {}
1203
1204    #[async_trait]
1205    impl Handler<u64> for MultiActor {
1206        async fn handle(&mut self, _cx: &Context<Self>, message: u64) -> Result<(), anyhow::Error> {
1207            let mut vals = self.0.lock().unwrap();
1208            vals.0 = message;
1209            Ok(())
1210        }
1211    }
1212
1213    #[async_trait]
1214    impl Handler<String> for MultiActor {
1215        async fn handle(
1216            &mut self,
1217            _cx: &Context<Self>,
1218            message: String,
1219        ) -> Result<(), anyhow::Error> {
1220            let mut vals = self.0.lock().unwrap();
1221            vals.1 = message;
1222            Ok(())
1223        }
1224    }
1225
1226    #[async_trait]
1227    impl Handler<OncePortHandle<bool>> for MultiActor {
1228        async fn handle(
1229            &mut self,
1230            cx: &Context<Self>,
1231            message: OncePortHandle<bool>,
1232        ) -> Result<(), anyhow::Error> {
1233            message.post(cx, true);
1234            Ok(())
1235        }
1236    }
1237
1238    #[tokio::test]
1239    async fn test_multi_handler_refs() {
1240        let test = MultiValuesTest::new().await;
1241
1242        test.send(123u64);
1243        test.send("foo".to_string());
1244        test.sync().await;
1245        assert_eq!(test.get_values(), (123u64, "foo".to_string()));
1246
1247        let myref: ActorRef<MultiActor> = test.handle.bind();
1248
1249        myref.port().post(&test.client, 321u64);
1250        test.sync().await;
1251        assert_eq!(test.get_values(), (321u64, "foo".to_string()));
1252
1253        myref.port().post(&test.client, "bar".to_string());
1254        test.sync().await;
1255        assert_eq!(test.get_values(), (321u64, "bar".to_string()));
1256    }
1257
1258    #[tokio::test]
1259    async fn test_ref_behavior() {
1260        let test = MultiValuesTest::new().await;
1261
1262        test.send(123u64);
1263        test.send("foo".to_string());
1264
1265        hyperactor::behavior!(MyActorBehavior, u64, String);
1266
1267        let myref: ActorRef<MyActorBehavior> = test.handle.bind();
1268        myref.port().post(&test.client, "biz".to_string());
1269        myref.port().post(&test.client, 999u64);
1270
1271        test.sync().await;
1272        assert_eq!(test.get_values(), (999u64, "biz".to_string()));
1273    }
1274
1275    #[tokio::test]
1276    async fn test_actor_handle_downcast() {
1277        #[derive(Debug, Default)]
1278        struct NothingActor;
1279
1280        impl Actor for NothingActor {}
1281
1282        // Just test that we can round-trip the handle through a downcast.
1283
1284        let proc = Proc::isolated();
1285        let handle = proc.spawn("nothing", NothingActor).unwrap();
1286        let cell = handle.cell();
1287
1288        // Invalid actor doesn't succeed.
1289        assert!(cell.downcast_handle::<EchoActor>().is_none());
1290
1291        let handle = cell.downcast_handle::<NothingActor>().unwrap();
1292        handle.drain_and_stop("test").unwrap();
1293        handle.await;
1294    }
1295
    // Test actor that reports the sequencing information of the messages it
    // handles: for each `String` it posts the message together with the
    // `SeqInfo` found in the message headers to the wrapped port.
    #[derive(Debug)]
    #[hyperactor::export(handlers = [String, Callback])]
    struct GetSeqActor(PortRef<(String, SeqInfo)>);

    // No `Actor` methods are overridden.
    #[async_trait]
    impl Actor for GetSeqActor {}
1303
1304    #[async_trait]
1305    impl Handler<String> for GetSeqActor {
1306        async fn handle(
1307            &mut self,
1308            cx: &Context<Self>,
1309            message: String,
1310        ) -> Result<(), anyhow::Error> {
1311            let Self(port) = self;
1312            let seq_info = cx.headers().get(SEQ_INFO).unwrap();
1313            port.post(cx, (message, seq_info.clone()));
1314            Ok(())
1315        }
1316    }
1317
    // Unlike Handler<String>, where the sender provides the string message
    // directly, with Handler<Callback> the sender provides a port, and the
    // handler replies on that port with its own callback port. The sender can
    // then send the string message through this callback port.
    #[derive(Clone, Debug, Serialize, Deserialize, Named)]
    struct Callback(PortRef<PortRef<String>>);
1324
1325    #[async_trait]
1326    impl Handler<Callback> for GetSeqActor {
1327        async fn handle(
1328            &mut self,
1329            cx: &Context<Self>,
1330            message: Callback,
1331        ) -> Result<(), anyhow::Error> {
1332            let (handle, mut receiver) = cx.open_port::<String>();
1333            let callback_ref = handle.bind();
1334            message.0.post(cx, callback_ref);
1335            let msg = receiver.recv().await.unwrap();
1336            self.handle(cx, msg).await
1337        }
1338    }
1339
    // Test that messages posted through an `ActorHandle` and through an
    // `ActorRef` to the same actor draw from one sequence: the message sent
    // before the handle is bound carries `SeqInfo::Direct`; afterwards every
    // message carries the next `SeqInfo::Session` number, whichever way it
    // was sent.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_sequencing_actor_handle_basic() {
        let proc = Proc::isolated();
        let (client, _) = proc.client("client").unwrap();
        let (tx, mut rx) = client.open_port();

        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();

        // Verify that unbound handle can send message.
        actor_handle.post(&client, "unbound".to_string());
        assert_eq!(
            rx.recv().await.unwrap(),
            ("unbound".to_string(), SeqInfo::Direct)
        );

        let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();

        let session_id = client.sequencer().session_id();
        // Incremented before each assertion, so the first expected seq is 1.
        let mut expected_seq = 0;
        // Interleave messages sent through the handle and the reference.
        for m in 0..10 {
            actor_handle.post(&client, format!("{m}"));
            expected_seq += 1;
            assert_eq!(
                rx.recv().await.unwrap(),
                (
                    format!("{m}"),
                    SeqInfo::Session {
                        session_id,
                        seq: expected_seq,
                    }
                )
            );

            for n in 0..2 {
                actor_ref.port().post(&client, format!("{m}-{n}"));
                expected_seq += 1;
                assert_eq!(
                    rx.recv().await.unwrap(),
                    (
                        format!("{m}-{n}"),
                        SeqInfo::Session {
                            session_id,
                            seq: expected_seq,
                        }
                    )
                );
            }
        }
    }
1390
    // Test that handler ports share a sequence while non-handler ports get their own.
    //
    // The handler side is observed through `GetSeqActor`; the non-handler side
    // is a port opened on the client's mailbox whose callback forwards the
    // `SEQ_INFO` header it sees over a tokio channel.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_sequencing_mixed_handler_and_non_handler_ports() {
        let proc = Proc::isolated();
        let (client, _) = proc.client("client").unwrap();

        // Port for receiving seq info from actor handler
        let (actor_tx, mut actor_rx) = client.open_port();

        // Channel for receiving seq info from non-handler port
        let (non_handler_tx, mut non_handler_rx) = mpsc::unbounded_channel::<Option<SeqInfo>>();

        let actor_handle = proc.spawn("get_seq", GetSeqActor(actor_tx.bind())).unwrap();
        let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();

        // Create a non-handler port using open_enqueue_port
        let non_handler_tx_clone = non_handler_tx.clone();
        let non_handler_port_handle =
            client
                .mailbox()
                .open_enqueue_port(move |headers: Flattrs, _m: ()| {
                    let seq_info = headers.get(SEQ_INFO);
                    non_handler_tx_clone.send(seq_info).unwrap();
                    Ok(())
                });

        // Bind the port to get a port ID
        non_handler_port_handle.bind();
        let non_handler_port_id = match non_handler_port_handle.location() {
            PortLocation::Bound(port_id) => port_id,
            _ => panic!("port_handle should be bound"),
        };
        assert!(!non_handler_port_id.is_handler_port());

        let session_id = client.sequencer().session_id();

        // Send to handler ports via ActorHandle - seq 1
        actor_handle.post(&client, "msg1".to_string());
        assert_eq!(
            actor_rx.recv().await.unwrap().1,
            SeqInfo::Session { session_id, seq: 1 }
        );

        // Send to handler ports via ActorRef - seq 2 (shared with ActorHandle)
        actor_ref.port().post(&client, "msg2".to_string());
        assert_eq!(
            actor_rx.recv().await.unwrap().1,
            SeqInfo::Session { session_id, seq: 2 }
        );

        // Send to non-handler port - has its own sequence starting at 1
        non_handler_port_handle.post(&client, ());
        assert_eq!(
            non_handler_rx.recv().await.unwrap(),
            Some(SeqInfo::Session { session_id, seq: 1 })
        );

        // Send more to handler ports via ActorHandle - seq continues at 3
        actor_handle.post(&client, "msg3".to_string());
        assert_eq!(
            actor_rx.recv().await.unwrap().1,
            SeqInfo::Session { session_id, seq: 3 }
        );

        // Send more to non-handler port - its sequence continues at 2
        non_handler_port_handle.post(&client, ());
        assert_eq!(
            non_handler_rx.recv().await.unwrap(),
            Some(SeqInfo::Session { session_id, seq: 2 })
        );

        // Send via ActorRef again - seq 4
        actor_ref.port().post(&client, "msg4".to_string());
        assert_eq!(
            actor_rx.recv().await.unwrap().1,
            SeqInfo::Session { session_id, seq: 4 }
        );

        actor_handle.drain_and_stop("test cleanup").unwrap();
        actor_handle.await;
    }
1472
    // Test that messages from different clients get independent sequence schemes.
    //
    // Both clients post to the same actor, alternating between its handle and
    // its reference; each client's messages are expected to be numbered 1, 2, 3
    // under that client's own session id.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_sequencing_multiple_clients() {
        let proc = Proc::isolated();
        let (client1, _) = proc.client("client1").unwrap();
        let (client2, _) = proc.client("client2").unwrap();

        // Port for receiving seq info from actor handler
        let (tx, mut rx) = client1.open_port();

        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
        let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();

        // Each client should have a different session_id
        let session_id_1 = client1.sequencer().session_id();
        let session_id_2 = client2.sequencer().session_id();
        assert_ne!(session_id_1, session_id_2);

        // Send from client1 via ActorHandle - seq 1 for session_id_1
        actor_handle.post(&client1, "c1_msg1".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_1,
                seq: 1
            }
        );

        // Send from client2 via ActorHandle - seq 1 for session_id_2 (independent)
        actor_handle.post(&client2, "c2_msg1".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_2,
                seq: 1
            }
        );

        // Send from client1 via ActorRef - seq 2 for session_id_1
        actor_ref.port().post(&client1, "c1_msg2".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_1,
                seq: 2
            }
        );

        // Send from client2 via ActorRef - seq 2 for session_id_2
        actor_ref.port().post(&client2, "c2_msg2".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_2,
                seq: 2
            }
        );

        // Interleave more messages to further verify independence
        actor_handle.post(&client1, "c1_msg3".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_1,
                seq: 3
            }
        );

        actor_ref.port().post(&client2, "c2_msg3".to_string());
        assert_eq!(
            rx.recv().await.unwrap().1,
            SeqInfo::Session {
                session_id: session_id_2,
                seq: 3
            }
        );

        actor_handle.drain_and_stop("test cleanup").unwrap();
        actor_handle.await;
    }
1553
    // Verify that ordering is guaranteed based on
    //   * (sender actor, client actor, port stream)
    // not
    //   * (sender actor, client actor)
    //
    // For "port stream",
    //   * handler ports of the same actor belong to the same stream;
    //   * a non-handler port has its own independent stream.
    //
    // Specifically, in this test,
    //   * client sends a Callback message to dest actor's handler;
    //   * while dest actor is still processing that message, client sends
    //     another non-handler message to dest actor.
    //
    // If the ordering were based on (sender actor, client actor), this test
    // would hang, since dest actor would be deadlocked waiting for the 2nd
    // message while still processing the 1st message.
    //
    // But since port stream is also part of the ordering guarantee, such
    // a deadlock should not happen.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_sequencing_actor_handle_callback() {
        let config = hyperactor_config::global::lock();
        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);

        let proc = Proc::isolated();
        let (client, _) = proc.client("client").unwrap();
        let (tx, mut rx) = client.open_port();

        let actor_handle = proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
        let actor_ref: ActorRef<GetSeqActor> = actor_handle.bind();

        let (callback_tx, mut callback_rx) = client.open_port();
        // Client sends the 1st message
        actor_ref.post(&client, Callback(callback_tx.bind()));
        let msg_port_ref = callback_rx.recv().await.unwrap();
        // client sends the 2nd message. At this time, GetSeqActor is still
        // processing the 1st message, and waiting for the 2nd message.
        msg_port_ref.post(&client, "finally".to_string());

        let session_id = client.sequencer().session_id();
        // passing this assert means GetSeqActor processed the 2nd message.
        // The expected seq is 1 even though this is the client's second
        // message to the actor.
        assert_eq!(
            rx.recv().await.unwrap(),
            (
                "finally".to_string(),
                SeqInfo::Session { session_id, seq: 1 }
            )
        );
    }
1604
    // A mailbox sender that holds messages back before forwarding them to the
    // destination proc. Useful for tests requiring latency injection or
    // out-of-order delivery.
    #[derive(Clone, Debug)]
    struct DelayedMailboxSender {
        // Envelopes posted to this sender are pushed into this channel; the
        // task spawned in `new` reads them from the other end.
        relay_tx: mpsc::UnboundedSender<MessageEnvelope>,
    }
1611
    impl DelayedMailboxSender {
        // Creates a sender backed by a spawned task that buffers
        // `relay_orders.len()` envelopes, rearranges them according to
        // `relay_orders`, and then posts them to `dest_proc`.
        //
        // NOTE(review): an earlier comment here described a random latency
        // being used when the plan is empty; no such path is visible in this
        // function. With an empty `relay_orders` the task collects and
        // forwards nothing.
        fn new(
            // The proc that hosts the dest actor. By posting envelope to this
            // proc, this proc will route that envelope to the dest actor.
            dest_proc: Proc,
            // Vec index is the message seq - 1, value is the order this message
            // would be relayed to the dest actor. Endpoint actor is responsible to
            // ensure itself processes these messages in order.
            relay_orders: Vec<usize>,
        ) -> Self {
            let (relay_tx, mut relay_rx) = mpsc::unbounded_channel::<MessageEnvelope>();

            tokio::spawn(async move {
                let mut buffer = Vec::new();

                // Hold back delivery until one envelope per plan entry has
                // arrived.
                for _ in 0..relay_orders.len() {
                    let envelope = relay_rx.recv().await.unwrap();
                    buffer.push(envelope);
                }

                // Iterate over a clone so that `buffer` itself can be
                // overwritten slot by slot with the planned arrangement.
                for m in buffer.clone() {
                    let seq = match m.headers().get(SEQ_INFO).expect("seq should be set") {
                        SeqInfo::Session { seq, .. } => seq as usize,
                        SeqInfo::Direct => panic!("expected Session variant"),
                    };
                    // seq no is one-based.
                    let order = relay_orders[seq - 1];
                    buffer[order] = m;
                }

                // Forward the envelopes in their rearranged order.
                let dest_proc_clone = dest_proc.clone();
                for msg in buffer {
                    dest_proc_clone.post(msg, monitored_return_handle());
                }
            });

            Self { relay_tx }
        }
    }
1652
1653    #[async_trait]
1654    impl MailboxSender for DelayedMailboxSender {
1655        fn post_unchecked(
1656            &self,
1657            envelope: MessageEnvelope,
1658            _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1659        ) {
1660            self.relay_tx.send(envelope).unwrap();
1661        }
1662    }
1663
1664    async fn assert_out_of_order_delivery(expected: Vec<(String, u64)>, relay_orders: Vec<usize>) {
1665        let local_proc: Proc = Proc::isolated();
1666        let (client, _) = local_proc.client("local").unwrap();
1667        let (tx, mut rx) = client.open_port();
1668
1669        let handle = local_proc.spawn("get_seq", GetSeqActor(tx.bind())).unwrap();
1670        let actor_ref: ActorRef<GetSeqActor> = handle.bind();
1671
1672        let remote_proc = Proc::configured(
1673            test_proc_id("remote_0"),
1674            DelayedMailboxSender::new(local_proc.clone(), relay_orders).boxed(),
1675        );
1676        let (remote_client, _) = remote_proc.client("remote").unwrap();
1677        // Send the messages out in the order of their expected sequence numbers.
1678        let mut messages = expected.clone();
1679        messages.sort_by_key(|v| v.1);
1680        for (message, _seq) in messages {
1681            actor_ref.post(&remote_client, message);
1682        }
1683        let session_id = remote_client.sequencer().session_id();
1684        for expect in expected {
1685            let expected = (
1686                expect.0,
1687                SeqInfo::Session {
1688                    session_id,
1689                    seq: expect.1,
1690                },
1691            );
1692            assert_eq!(rx.recv().await.unwrap(), expected);
1693        }
1694
1695        handle.drain_and_stop("test cleanup").unwrap();
1696        handle.await;
1697    }
1698
    // Send several messages, use DelayedMailboxSender and the relay orders to
    // ensure these messages will arrive at handler's workq out-of-order.
    // Then verify the actor handler will still process these messages based on
    // their sending order if reordering buffer is enabled.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_sequencing_actor_ref_known_delivery_order() {
        let config = hyperactor_config::global::lock();

        // relay order is second, third, first
        let relay_orders = vec![2, 0, 1];

        // With the actor side re-ordering buffer disabled, the messages are
        // expected to be processed in the order they are relayed (second,
        // third, first) rather than in the order they were sent.
        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, false);
        assert_out_of_order_delivery(
            vec![
                ("second".to_string(), 2),
                ("third".to_string(), 3),
                ("first".to_string(), 1),
            ],
            relay_orders.clone(),
        )
        .await;

        // By enabling the actor side re-ordering buffer, the messages will
        // be re-ordered before being processed.
        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
        assert_out_of_order_delivery(
            vec![
                ("first".to_string(), 1),
                ("second".to_string(), 2),
                ("third".to_string(), 3),
            ],
            relay_orders.clone(),
        )
        .await;
    }
1736
1737    // Send a large nubmer of messages, use DelayedMailboxSender to ensure these
1738    // messages will arrive at handler's workq in a random order. Then verify the
1739    // actor handler will still process these messages based on their sending
1740    // order with reordering buffer enabled.
1741    #[async_timed_test(timeout_secs = 30)]
1742    async fn test_sequencing_actor_ref_random_delivery_order() {
1743        let config = hyperactor_config::global::lock();
1744
1745        // By enabling the actor side re-ordering buffer, the mssages will
1746        // be re-ordered before being processed.
1747        let _guard = config.override_key(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER, true);
1748        let expected = (0..10000)
1749            .map(|i| (format!("msg{i}"), i + 1))
1750            .collect::<Vec<_>>();
1751
1752        let mut relay_orders: Vec<usize> = (0..10000).collect();
1753        relay_orders.shuffle(&mut rand::rng());
1754        assert_out_of_order_delivery(expected, relay_orders).await;
1755    }
1756
1757    /// Verifies the default blanket introspection handler for a plain
1758    /// actor.
1759    ///
1760    /// This test spawns a simple `EchoActor`, sends it
1761    /// `IntrospectMessage::Query`, and checks that the returned
1762    /// `IntrospectResult` matches the framework’s structural default:
1763    ///
1764    /// - `identity` matches the actor id
1765    /// - `attrs` contains actor-runtime keys (status, actor_type, etc.)
1766    /// - no supervision children are reported
1767    /// - `supervisor` is None because this actor is spawned as a
1768    ///   root/top-level actor in the proc (only supervised child actors
1769    ///   report a supervisor id).
1770    ///
1771    /// This exercises the end-to-end introspect task path rather than
1772    /// calling `live_actor_payload` directly, ensuring the runtime
1773    /// wiring behaves as expected.
1774    #[tokio::test]
1775    async fn test_introspect_query_default_payload() {
1776        let proc = Proc::isolated();
1777        let (client, _) = proc.client("client").unwrap();
1778        let (tx, _rx) = client.open_port::<u64>();
1779        let actor = EchoActor(tx.bind());
1780        let handle = proc.spawn::<EchoActor>("echo_introspect", actor).unwrap();
1781
1782        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1783        PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
1784            &client,
1785            IntrospectMessage::Query {
1786                view: IntrospectView::Actor,
1787                reply: reply_port.bind(),
1788            },
1789        );
1790        let payload = reply_rx.recv().await.unwrap();
1791
1792        assert_eq!(
1793            payload.identity,
1794            crate::introspect::IntrospectRef::Actor(handle.actor_addr().clone())
1795        );
1796        assert_valid_attrs(&payload);
1797        assert_has_attr(&payload, "status");
1798        assert_has_attr(&payload, "actor_type");
1799        assert_has_attr(&payload, "created_at");
1800        assert!(payload.children.is_empty());
1801        assert!(payload.parent.is_none());
1802
1803        handle.drain_and_stop("test").unwrap();
1804        handle.await;
1805    }
1806
1807    /// Helper: look up an attr in the attrs JSON by short name.
1808    fn attrs_get(attrs_json: &str, short_name: &str) -> Option<serde_json::Value> {
1809        use hyperactor_config::INTROSPECT;
1810        use hyperactor_config::attrs::AttrKeyInfo;
1811        let fq_name = inventory::iter::<AttrKeyInfo>()
1812            .find(|info| {
1813                info.meta
1814                    .get(INTROSPECT)
1815                    .is_some_and(|ia| ia.name == short_name)
1816            })
1817            .map(|info| info.name)?;
1818        let obj: serde_json::Value = serde_json::from_str(attrs_json).ok()?;
1819        obj.get(fq_name).cloned()
1820    }
1821
1822    /// Assert that an IntrospectResult has valid JSON attrs (IA-1).
1823    fn assert_valid_attrs(result: &IntrospectResult) {
1824        let parsed: serde_json::Value =
1825            serde_json::from_str(&result.attrs).expect("IA-1: attrs must be valid JSON");
1826        assert!(parsed.is_object(), "IA-1: attrs must be a JSON object");
1827    }
1828
1829    /// Assert the actor status attr matches expected value.
1830    fn assert_status(result: &IntrospectResult, expected: &str) {
1831        let status = attrs_get(&result.attrs, "status")
1832            .and_then(|v| v.as_str().map(String::from))
1833            .expect("attrs must contain status");
1834        assert_eq!(status, expected, "unexpected actor status");
1835    }
1836
1837    /// Assert the actor has a specific handler (or None).
1838    fn assert_handler(result: &IntrospectResult, expected: Option<&str>) {
1839        let handler =
1840            attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from));
1841        assert_eq!(handler.as_deref(), expected);
1842    }
1843
1844    /// Assert the error code attr matches expected value.
1845    fn assert_error_code(result: &IntrospectResult, expected: &str) {
1846        let code = attrs_get(&result.attrs, "error_code")
1847            .and_then(|v| v.as_str().map(String::from))
1848            .expect("attrs must contain error_code");
1849        assert_eq!(code, expected);
1850    }
1851
1852    /// Assert handler does NOT contain a substring.
1853    fn assert_handler_not_contains(result: &IntrospectResult, forbidden: &str) {
1854        if let Some(handler) =
1855            attrs_get(&result.attrs, "last_handler").and_then(|v| v.as_str().map(String::from))
1856        {
1857            assert!(
1858                !handler.contains(forbidden),
1859                "handler should not contain '{}'; got: {}",
1860                forbidden,
1861                handler
1862            );
1863        }
1864    }
1865
1866    /// Assert an attr is present by short name.
1867    fn assert_has_attr(result: &IntrospectResult, short_name: &str) {
1868        assert!(
1869            attrs_get(&result.attrs, short_name).is_some(),
1870            "attrs must contain '{}'",
1871            short_name
1872        );
1873    }
1874
1875    /// Assert status contains a substring (for non-exact checks
1876    /// like "processing" on wedged actors).
1877    fn assert_status_contains(result: &IntrospectResult, substring: &str) {
1878        let status = attrs_get(&result.attrs, "status")
1879            .and_then(|v| v.as_str().map(String::from))
1880            .expect("attrs must contain status");
1881        assert!(
1882            status.contains(substring),
1883            "status should contain '{}'; got: {}",
1884            substring,
1885            status
1886        );
1887    }
1888
1889    /// Assert no status_reason attr (IA-3: non-terminal status).
1890    fn assert_no_status_reason(result: &IntrospectResult) {
1891        assert!(
1892            attrs_get(&result.attrs, "status_reason").is_none(),
1893            "IA-3: must not have status_reason"
1894        );
1895    }
1896
1897    /// Assert a handler is present (any value).
1898    fn assert_has_handler(result: &IntrospectResult) {
1899        assert!(
1900            attrs_get(&result.attrs, "last_handler").is_some(),
1901            "must have a handler"
1902        );
1903    }
1904
1905    /// Assert no failure attrs are present (IA-4).
1906    fn assert_no_failure_attrs(result: &IntrospectResult) {
1907        assert!(
1908            attrs_get(&result.attrs, "failure_error_message").is_none(),
1909            "IA-4: must not have failure attrs"
1910        );
1911    }
1912
1913    /// Establishes IA-1 (attrs-json), IA-3 (status-shape), and
1914    /// IA-4 (failure-shape) for the running-actor path only.
1915    /// Stopped/failed paths need separate tests (see proc.rs
1916    /// terminated snapshot tests).
1917    #[tokio::test]
1918    async fn test_ia1_ia4_running_actor_attrs() {
1919        let proc = Proc::isolated();
1920        let (client, _) = proc.client("client").unwrap();
1921        let (tx, _rx) = client.open_port::<u64>();
1922        let actor = EchoActor(tx.bind());
1923        let handle = proc.spawn::<EchoActor>("ia_test", actor).unwrap();
1924
1925        let payload = crate::introspect::live_actor_payload(handle.cell());
1926
1927        // IA-1: valid JSON.
1928        assert_valid_attrs(&payload);
1929
1930        // IA-3: non-terminal status, no status_reason.
1931        assert_has_attr(&payload, "status");
1932        assert_no_status_reason(&payload);
1933
1934        // IA-4: no failure attrs.
1935        assert_no_failure_attrs(&payload);
1936
1937        handle.drain_and_stop("test").unwrap();
1938        handle.await;
1939    }
1940
1941    // Verifies that QueryChild returns an error for actors without
1942    // a registered query_child_handler callback. The runtime
1943    // introspect task responds with the error sentinel payload
1944    // (`identity == ""`, error attrs with code "not_found",
1945    // .. }`).
1946    #[tokio::test]
1947    async fn test_introspect_query_child_not_found() {
1948        let proc = Proc::isolated();
1949        let (client, _) = proc.client("client").unwrap();
1950        let (tx, _rx) = client.open_port::<u64>();
1951        let actor = EchoActor(tx.bind());
1952        let handle = proc.spawn::<EchoActor>("echo_qc", actor).unwrap();
1953
1954        let child_ref = crate::Addr::Actor(test_proc_id("nonexistent").actor_addr("child"));
1955        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1956        PortRef::<IntrospectMessage>::attest_handler_port(handle.actor_addr()).post(
1957            &client,
1958            IntrospectMessage::QueryChild {
1959                child_ref,
1960                reply: reply_port.bind(),
1961            },
1962        );
1963        let payload = reply_rx.recv().await.unwrap();
1964
1965        assert_eq!(
1966            payload.identity,
1967            crate::introspect::IntrospectRef::Actor(
1968                test_proc_id("nonexistent").actor_addr("child")
1969            )
1970        );
1971        assert_error_code(&payload, "not_found");
1972
1973        handle.drain_and_stop("test").unwrap();
1974        handle.await;
1975    }
1976
1977    // Verifies that with the runtime introspect task, custom
1978    // `handle_introspect` overrides are not called. The runtime
1979    // task intercepts IntrospectMessage before it reaches the
1980    // actor's work queue. An actor with an override still gets
1981    // standard Actor properties from the runtime task.
1982    #[tokio::test]
1983    async fn test_introspect_override() {
1984        #[derive(Debug, Default)]
1985        #[hyperactor::export(handlers = [])]
1986        struct CustomIntrospectActor;
1987
1988        #[async_trait]
1989        impl Actor for CustomIntrospectActor {}
1990
1991        let proc = Proc::isolated();
1992        let (client, _) = proc.client("client").unwrap();
1993        let handle = proc
1994            .spawn("custom_introspect", CustomIntrospectActor)
1995            .unwrap();
1996
1997        handle
1998            .status()
1999            .wait_for(|s| matches!(s, ActorStatus::Idle))
2000            .await
2001            .unwrap();
2002
2003        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2004        PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2005            &client,
2006            IntrospectMessage::Query {
2007                view: IntrospectView::Actor,
2008                reply: reply_port.bind(),
2009            },
2010        );
2011        let payload = reply_rx.recv().await.unwrap();
2012
2013        // The runtime task returns actor attrs (with status), NOT
2014        // the override's Host properties.
2015        assert_has_attr(&payload, "status");
2016
2017        handle.drain_and_stop("test").unwrap();
2018        handle.await;
2019    }
2020
2021    /// Verifies that a child actor spawned via `spawn_child` reports
2022    /// its parent as `supervisor` in the introspection payload, and
2023    /// that the parent's payload lists the child in `children`.
2024    #[tokio::test]
2025    async fn test_introspect_query_supervision_child() {
2026        let proc = Proc::isolated();
2027        let (client, _) = proc.client("client").unwrap();
2028
2029        // Spawn parent.
2030        let (tx_parent, _rx_parent) = client.open_port::<u64>();
2031        let parent_handle = proc
2032            .spawn::<EchoActor>("parent", EchoActor(tx_parent.bind()))
2033            .unwrap();
2034
2035        // Spawn child under parent.
2036        let (tx_child, _rx_child) = client.open_port::<u64>();
2037        let child_handle = proc
2038            .spawn_child::<EchoActor>(parent_handle.cell().clone(), EchoActor(tx_child.bind()))
2039            .unwrap();
2040
2041        // Query the child — supervisor should be the parent.
2042        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2043        PortRef::<IntrospectMessage>::attest_handler_port(&child_handle.actor_addr().clone()).post(
2044            &client,
2045            IntrospectMessage::Query {
2046                view: IntrospectView::Actor,
2047                reply: reply_port.bind(),
2048            },
2049        );
2050        let child_payload = reply_rx.recv().await.unwrap();
2051
2052        assert_eq!(
2053            child_payload.identity,
2054            crate::introspect::IntrospectRef::Actor(child_handle.actor_addr().clone()),
2055        );
2056        // Verify it has actor attrs (status present).
2057        assert!(
2058            attrs_get(&child_payload.attrs, "status").is_some(),
2059            "child should have actor attrs"
2060        );
2061        assert_eq!(
2062            child_payload.parent,
2063            Some(crate::introspect::IntrospectRef::Actor(
2064                parent_handle.actor_addr().clone()
2065            )),
2066        );
2067
2068        // Query the parent — children should include the child.
2069        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2070        PortRef::<IntrospectMessage>::attest_handler_port(&parent_handle.actor_addr().clone())
2071            .post(
2072                &client,
2073                IntrospectMessage::Query {
2074                    view: IntrospectView::Actor,
2075                    reply: reply_port.bind(),
2076                },
2077            );
2078        let parent_payload = reply_rx.recv().await.unwrap();
2079
2080        assert!(parent_payload.parent.is_none());
2081        assert!(
2082            parent_payload
2083                .children
2084                .contains(&crate::introspect::IntrospectRef::Actor(
2085                    child_handle.actor_addr().clone()
2086                )),
2087        );
2088
2089        child_handle.drain_and_stop("test").unwrap();
2090        child_handle.await;
2091        parent_handle.drain_and_stop("test").unwrap();
2092        parent_handle.await;
2093    }
2094
2095    /// A freshly spawned actor that has received no user messages
2096    /// reports `last_message_handler == None` — the introspect
2097    /// handler does not leak through. Status is `"idle"` once
2098    /// initialization completes.
2099    #[tokio::test]
2100    async fn test_introspect_fresh_actor_status() {
2101        let proc = Proc::isolated();
2102        let (client, _) = proc.client("client").unwrap();
2103        let (tx, _rx) = client.open_port::<u64>();
2104        let actor = EchoActor(tx.bind());
2105        let handle = proc.spawn::<EchoActor>("echo_fresh", actor).unwrap();
2106
2107        // Wait for the actor to finish initialization.
2108        handle
2109            .status()
2110            .wait_for(|s| matches!(s, ActorStatus::Idle))
2111            .await
2112            .unwrap();
2113
2114        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2115        PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2116            &client,
2117            IntrospectMessage::Query {
2118                view: IntrospectView::Actor,
2119                reply: reply_port.bind(),
2120            },
2121        );
2122        let payload = reply_rx.recv().await.unwrap();
2123
2124        assert_status(&payload, "idle");
2125        assert_handler(&payload, None);
2126
2127        handle.drain_and_stop("test").unwrap();
2128        handle.await;
2129    }
2130
2131    /// After processing a user message, the introspect payload reports
2132    /// the user message's handler and post-completion status — not
2133    /// the introspect handler itself (one-behind invariant,
2134    /// after-user-traffic case).
2135    #[tokio::test]
2136    async fn test_introspect_after_user_message() {
2137        let proc = Proc::isolated();
2138        let (client, _) = proc.client("client").unwrap();
2139        let (tx, mut rx) = client.open_port::<u64>();
2140        let actor = EchoActor(tx.bind());
2141        let handle = proc.spawn::<EchoActor>("echo_after_msg", actor).unwrap();
2142
2143        // Send a user message and wait for it to be processed.
2144        handle.post(&client, 42u64);
2145        let _ = rx.recv().await.unwrap();
2146
2147        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2148        PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2149            &client,
2150            IntrospectMessage::Query {
2151                view: IntrospectView::Actor,
2152                reply: reply_port.bind(),
2153            },
2154        );
2155        let payload = reply_rx.recv().await.unwrap();
2156
2157        assert_status(&payload, "idle");
2158        assert_has_handler(&payload);
2159        assert_handler_not_contains(&payload, "IntrospectMessage");
2160
2161        handle.drain_and_stop("test").unwrap();
2162        handle.await;
2163    }
2164
2165    /// Two consecutive introspect queries: with the runtime
2166    /// introspect task, neither perturbs the actor's state (S2).
2167    /// Both report the same `last_message_handler` for a fresh
2168    /// actor — `None`, not `IntrospectMessage`.
2169    #[tokio::test]
2170    async fn test_introspect_consecutive_queries() {
2171        let proc = Proc::isolated();
2172        let (client, _) = proc.client("client").unwrap();
2173        let (tx, _rx) = client.open_port::<u64>();
2174        let actor = EchoActor(tx.bind());
2175        let handle = proc.spawn::<EchoActor>("echo_consec", actor).unwrap();
2176
2177        handle
2178            .status()
2179            .wait_for(|s| matches!(s, ActorStatus::Idle))
2180            .await
2181            .unwrap();
2182
2183        // First introspect query.
2184        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2185        PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2186            &client,
2187            IntrospectMessage::Query {
2188                view: IntrospectView::Actor,
2189                reply: reply_port.bind(),
2190            },
2191        );
2192        let payload1 = reply_rx.recv().await.unwrap();
2193
2194        // Second introspect query.
2195        let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2196        PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2197            &client,
2198            IntrospectMessage::Query {
2199                view: IntrospectView::Actor,
2200                reply: reply_port2.bind(),
2201            },
2202        );
2203        let payload2 = reply_rx2.recv().await.unwrap();
2204
2205        // Neither should show IntrospectMessage as the handler.
2206        assert_handler(&payload1, None);
2207        assert_handler(&payload2, None);
2208
2209        handle.drain_and_stop("test").unwrap();
2210        handle.await;
2211    }
2212
2213    // test_published_properties_round_trip removed — replaced by
2214    // test_publish_attrs_round_trip which tests the Attrs-based API.
2215
2216    /// Verify InstanceCell Attrs storage: `set_published_attrs`
2217    /// replaces the whole bag, `merge_published_attr` merges a single
2218    /// key incrementally. (Instance methods are thin wrappers over
2219    /// these.)
2220    #[tokio::test]
2221    async fn test_publish_attrs_round_trip() {
2222        use hyperactor_config::Attrs;
2223        use hyperactor_config::declare_attrs;
2224
2225        declare_attrs! {
2226            attr TEST_KEY_A: String;
2227            attr TEST_KEY_B: u64;
2228        }
2229
2230        let proc = Proc::isolated();
2231        let (client, _) = proc.client("client").unwrap();
2232        let (tx, _rx) = client.open_port::<u64>();
2233        let actor = EchoActor(tx.bind());
2234        let handle = proc.spawn::<EchoActor>("echo_attrs", actor).unwrap();
2235
2236        // Before publishing, attrs are None.
2237        assert!(handle.cell().published_attrs().is_none());
2238
2239        // publish_attrs: replace entire bag.
2240        let mut attrs = Attrs::new();
2241        attrs.set(TEST_KEY_A, "hello".to_string());
2242        handle.cell().set_published_attrs(attrs);
2243        let published = handle.cell().published_attrs().unwrap();
2244        assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2245
2246        // publish_attr: merge single key into existing bag.
2247        handle.cell().merge_published_attr(TEST_KEY_B, 42u64);
2248        let published = handle.cell().published_attrs().unwrap();
2249        assert_eq!(published.get(TEST_KEY_A), Some(&"hello".to_string()));
2250        assert_eq!(published.get(TEST_KEY_B), Some(&42u64));
2251
2252        // publish_attr: overwrite existing key.
2253        handle
2254            .cell()
2255            .merge_published_attr(TEST_KEY_A, "world".to_string());
2256        let published = handle.cell().published_attrs().unwrap();
2257        assert_eq!(published.get(TEST_KEY_A), Some(&"world".to_string()));
2258
2259        handle.drain_and_stop("test").unwrap();
2260        handle.await;
2261    }
2262
2263    /// Verify the query_child_handler callback: register a callback,
2264    /// invoke it via `query_child()`, and confirm the response.
2265    #[tokio::test]
2266    async fn test_query_child_handler_round_trip() {
2267        let proc = Proc::isolated();
2268        let (client, _) = proc.client("client").unwrap();
2269        let (tx, _rx) = client.open_port::<u64>();
2270        let actor = EchoActor(tx.bind());
2271        let handle = proc.spawn::<EchoActor>("echo_qch", actor).unwrap();
2272
2273        // Before registering, query_child returns None.
2274        let test_ref = Addr::Actor(test_proc_id("test").actor_addr("child"));
2275        assert!(handle.cell().query_child(&test_ref).is_none());
2276
2277        // Register a callback.
2278        handle.cell().set_query_child_handler(|child_ref| {
2279            use crate::introspect::IntrospectRef;
2280            let identity = match child_ref {
2281                Addr::Proc(p) => IntrospectRef::Proc(p.clone()),
2282                Addr::Actor(a) => IntrospectRef::Actor(a.clone()),
2283                Addr::Port(p) => IntrospectRef::Actor(p.actor_addr()),
2284            };
2285            IntrospectResult {
2286                identity,
2287                attrs: serde_json::json!({
2288                    "proc_name": "test_proc",
2289                    "num_actors": 42,
2290                })
2291                .to_string(),
2292                children: Vec::new(),
2293                parent: None,
2294                as_of: std::time::SystemTime::now(),
2295            }
2296        });
2297
2298        // Now query_child returns the callback's response.
2299        let payload = handle
2300            .cell()
2301            .query_child(&test_ref)
2302            .expect("callback should produce a payload");
2303        assert_eq!(
2304            payload.identity,
2305            crate::introspect::IntrospectRef::Actor(test_proc_id("test").actor_addr("child"))
2306        );
2307        let attrs: serde_json::Value =
2308            serde_json::from_str(&payload.attrs).expect("attrs must be valid JSON");
2309        assert_eq!(
2310            attrs.get("proc_name").and_then(|v| v.as_str()),
2311            Some("test_proc")
2312        );
2313        assert_eq!(attrs.get("num_actors").and_then(|v| v.as_u64()), Some(42));
2314
2315        handle.drain_and_stop("test").unwrap();
2316        handle.await;
2317    }
2318
2319    /// Exercises S1 (see `introspect` module doc).
2320    ///
2321    /// Sends a wedging message, then queries introspect while the
2322    /// actor is blocked. The response must arrive and report live
2323    /// processing status.
2324    #[tokio::test]
2325    async fn test_introspect_wedged() {
2326        #[derive(Debug, Default)]
2327        #[hyperactor::export(handlers = [u64])]
2328        struct WedgedActor;
2329
2330        #[async_trait]
2331        impl Actor for WedgedActor {}
2332
2333        #[async_trait]
2334        impl Handler<u64> for WedgedActor {
2335            async fn handle(
2336                &mut self,
2337                _cx: &Context<Self>,
2338                _message: u64,
2339            ) -> Result<(), anyhow::Error> {
2340                // Block forever.
2341                std::future::pending::<()>().await;
2342                Ok(())
2343            }
2344        }
2345
2346        let proc = Proc::isolated();
2347        let (client, _) = proc.client("client").unwrap();
2348        let handle = proc.spawn("wedged", WedgedActor).unwrap();
2349
2350        // Wait for idle before sending the wedging message.
2351        handle
2352            .status()
2353            .wait_for(|s| matches!(s, ActorStatus::Idle))
2354            .await
2355            .unwrap();
2356
2357        // Send a u64 to wedge the actor in its handler.
2358        handle.post(&client, 1u64);
2359
2360        // Wait for the handler to start blocking.
2361        tokio::time::sleep(Duration::from_millis(50)).await;
2362
2363        // Send introspect query via the dedicated introspect port.
2364        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2365        PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2366            &client,
2367            IntrospectMessage::Query {
2368                view: IntrospectView::Actor,
2369                reply: reply_port.bind(),
2370            },
2371        );
2372
2373        // Must not hang — the introspect task runs independently.
2374        let payload = tokio::time::timeout(Duration::from_secs(5), reply_rx.recv())
2375            .await
2376            .expect("introspect should not hang on a wedged actor")
2377            .unwrap();
2378
2379        assert_status_contains(&payload, "processing");
2380        assert_handler_not_contains(&payload, "IntrospectMessage");
2381    }
2382
2383    /// Exercises S2 (see `introspect` module doc).
2384    ///
2385    /// After a user message, two consecutive introspect queries both
2386    /// report the user message handler.
2387    #[tokio::test]
2388    async fn test_introspect_no_perturbation() {
2389        let proc = Proc::isolated();
2390        let (client, _) = proc.client("client").unwrap();
2391        let (tx, mut rx) = client.open_port::<u64>();
2392        let actor = EchoActor(tx.bind());
2393        let handle = proc.spawn::<EchoActor>("echo_no_perturb", actor).unwrap();
2394
2395        // Wait for idle before sending the user message.
2396        handle
2397            .status()
2398            .wait_for(|s| matches!(s, ActorStatus::Idle))
2399            .await
2400            .unwrap();
2401
2402        // Send a user message and wait for it to be processed.
2403        handle.post(&client, 42u64);
2404        let _ = rx.recv().await.unwrap();
2405
2406        // First introspect query.
2407        let (reply_port1, reply_rx1) = client.open_once_port::<IntrospectResult>();
2408        PortRef::<IntrospectMessage>::attest_handler_port(&handle.actor_addr().clone()).post(
2409            &client,
2410            IntrospectMessage::Query {
2411                view: IntrospectView::Actor,
2412                reply: reply_port1.bind(),
2413            },
2414        );
2415        let payload1 = reply_rx1.recv().await.unwrap();
2416
2417        // Second introspect query.
2418        let (reply_port2, reply_rx2) = client.open_once_port::<IntrospectResult>();
2419        crate::PortRef::<IntrospectMessage>::attest_handler_port(handle.actor_addr()).post(
2420            &client,
2421            IntrospectMessage::Query {
2422                view: IntrospectView::Actor,
2423                reply: reply_port2.bind(),
2424            },
2425        );
2426        let payload2 = reply_rx2.recv().await.unwrap();
2427
2428        // Both should report the user message handler, not IntrospectMessage.
2429        assert_handler_not_contains(&payload1, "IntrospectMessage");
2430        assert_handler_not_contains(&payload2, "IntrospectMessage");
2431        // Consecutive queries must agree (compare parsed, not raw
2432        // strings — HashMap key ordering is non-deterministic).
2433        let attrs1: serde_json::Value = serde_json::from_str(&payload1.attrs).unwrap();
2434        let attrs2: serde_json::Value = serde_json::from_str(&payload2.attrs).unwrap();
2435        assert_eq!(attrs1, attrs2, "consecutive queries should be identical");
2436
2437        handle.drain_and_stop("test").unwrap();
2438        handle.await;
2439    }
2440
2441    /// Exercises CI-1 (see `proc` module doc).
2442    ///
2443    /// Unlike a plain `client()`, which drops the introspect
2444    /// receiver so queries are silently discarded, an
2445    /// `introspectable_instance` has a live `serve_introspect` task
2446    /// and is fully navigable in admin tooling.
2447    #[tokio::test]
2448    async fn test_introspectable_instance_responds_to_query() {
2449        let proc = Proc::isolated();
2450        let (bridge, handle) = proc.introspectable_instance("bridge").unwrap();
2451        let actor_id: crate::ActorAddr = handle.actor_addr().clone();
2452
2453        let (reply_port, reply_rx) = bridge.open_once_port::<IntrospectResult>();
2454        PortRef::<IntrospectMessage>::attest_handler_port(&actor_id).post(
2455            &bridge,
2456            IntrospectMessage::Query {
2457                view: IntrospectView::Actor,
2458                reply: reply_port.bind(),
2459            },
2460        );
2461        let payload = reply_rx.recv().await.unwrap();
2462
2463        // CI-1: introspectable_instance reports status "client"
2464        // and actor_type "()" (the unit type).
2465        assert_eq!(
2466            payload.identity,
2467            crate::introspect::IntrospectRef::Actor(actor_id.clone())
2468        );
2469        assert_status(&payload, "client");
2470        let actor_type = attrs_get(&payload.attrs, "actor_type")
2471            .and_then(|v| v.as_str().map(String::from))
2472            .expect("must have actor_type");
2473        assert_eq!(actor_type, "()", "CI-1: actor_type must be \"()\"");
2474    }
2475
2476    /// Contrast with CI-1: a plain `client()` does NOT respond to
2477    /// `IntrospectMessage::Query`. Its introspect receiver is dropped
2478    /// in `Proc::client()`, so the message is silently discarded
2479    /// and the reply port never receives a value.
2480    ///
2481    /// Callers that need TUI visibility must use
2482    /// `introspectable_instance` instead.
2483    #[tokio::test]
2484    async fn test_instance_does_not_respond_to_query() {
2485        let proc = Proc::isolated();
2486        let (client, _client_handle) = proc.client("client").unwrap();
2487        let (_mailbox, mailbox_handle) = proc.client("mailbox").unwrap();
2488        let mailbox_id: crate::ActorAddr = mailbox_handle.actor_addr().clone();
2489
2490        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
2491        PortRef::<IntrospectMessage>::attest_handler_port(&mailbox_id).post(
2492            &client,
2493            IntrospectMessage::Query {
2494                view: IntrospectView::Actor,
2495                reply: reply_port.bind(),
2496            },
2497        );
2498
2499        // The introspect receiver was dropped in `client()`, so the
2500        // message is silently discarded and the reply never arrives.
2501        let result = tokio::time::timeout(Duration::from_millis(100), reply_rx.recv()).await;
2502        assert!(
2503            result.is_err(),
2504            "client() must not respond to IntrospectMessage (introspect receiver dropped)"
2505        );
2506    }
2507
2508    /// Exercises CI-2 (see `proc` module doc).
2509    ///
2510    /// Dropping the instance transitions status to terminal,
2511    /// causing `serve_introspect` to store a terminated snapshot.
2512    #[tokio::test]
2513    async fn test_introspectable_instance_snapshot_on_drop() {
2514        let proc = Proc::isolated();
2515        let (instance, handle) = proc.introspectable_instance("bridge").unwrap();
2516        let actor_id = handle.actor_addr().clone();
2517
2518        assert!(
2519            proc.all_actor_ids().contains(&actor_id),
2520            "should appear in all_actor_ids while live"
2521        );
2522
2523        // Dropping `instance` transitions status to Stopped, waking
2524        // the serve_introspect task which stores the snapshot.
2525        drop(instance);
2526
2527        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2528        loop {
2529            if proc.terminated_snapshot(&actor_id).is_some() {
2530                break;
2531            }
2532            assert!(
2533                std::time::Instant::now() < deadline,
2534                "timed out waiting for terminated snapshot"
2535            );
2536            tokio::task::yield_now().await;
2537        }
2538
2539        let snapshot = proc.terminated_snapshot(&actor_id).unwrap();
2540        let actor_status = attrs_get(&snapshot.attrs, "status")
2541            .and_then(|v| v.as_str().map(String::from))
2542            .expect("snapshot attrs must contain status");
2543        assert!(
2544            actor_status.starts_with("stopped"),
2545            "CI-2: snapshot actor_status should be stopped, got: {}",
2546            actor_status
2547        );
2548    }
2549}