// hyperactor/proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! [`Proc`] is an addressable actor-runtime boundary.
10//!
11//! It owns actor lifecycle (spawn, run, terminate), routes messages
12//! to local actors, forwards messages for remote destinations, and
13//! hosts supervision state.
14//!
15//! It also stores bounded snapshots of terminated actors for
16//! post-mortem introspection.
17//!
18//! ## Client instance invariants (CI-*)
19//!
20//! - **CI-1 (client status):** `IntrospectMessage::Query` on an
21//!   introspectable instance returns `status: "client"` and
22//!   `actor_type: "()"` in attrs.
23//! - **CI-2 (snapshot on drop):** Dropping the returned `Instance<()>`
24//!   transitions its status to terminal, causing the introspect task
25//!   to store a terminated snapshot.
26//!
27//! ## Actor identity invariants (AI-*)
28//!
29//! - **AI-1 (named-child pid):** The pid of a named child must
30//!   remain in the parent's sibling pid domain. The name is
31//!   presentation only; the numeric pid is allocated from the
32//!   parent's counter, preserving supervision linkage.
33//! - **AI-3 (controller ActorId uniqueness):** Callers must ensure
34//!   the name is unique proc-wide. Two children with the same name
35//!   under different parents get distinct pids but the same name
36//!   prefix.
37
38use std::any::Any;
39use std::any::TypeId;
40use std::collections::HashMap;
41use std::fmt;
42use std::future::Future;
43use std::ops::Deref;
44use std::panic;
45use std::panic::AssertUnwindSafe;
46use std::panic::Location;
47use std::pin::Pin;
48use std::sync::Arc;
49use std::sync::OnceLock;
50use std::sync::RwLock;
51use std::sync::Weak;
52use std::sync::atomic::AtomicBool;
53use std::sync::atomic::AtomicU64;
54use std::sync::atomic::AtomicUsize;
55use std::sync::atomic::Ordering;
56use std::time::Duration;
57use std::time::Instant;
58use std::time::SystemTime;
59
60use async_trait::async_trait;
61use dashmap::DashMap;
62use dashmap::mapref::entry::Entry;
63use dashmap::mapref::multiple::RefMulti;
64use futures::FutureExt;
65use hyperactor_config::Flattrs;
66use hyperactor_telemetry::ActorStatusEvent;
67use hyperactor_telemetry::generate_actor_status_event_id;
68use hyperactor_telemetry::hash_to_u64;
69use hyperactor_telemetry::notify_actor_status_changed;
70use hyperactor_telemetry::notify_message;
71use hyperactor_telemetry::notify_message_status;
72use hyperactor_telemetry::recorder::Recording;
73use tokio::sync::mpsc;
74use tokio::sync::watch;
75use tokio::task::JoinHandle;
76use tracing::Instrument;
77use tracing::Span;
78use typeuri::Named;
79use uuid::Uuid;
80use wirevalue::TypeInfo;
81
82use crate as hyperactor;
83use crate::Actor;
84use crate::Handler;
85use crate::Message;
86use crate::RemoteMessage;
87use crate::actor::ActorError;
88use crate::actor::ActorErrorKind;
89use crate::actor::ActorHandle;
90use crate::actor::ActorStatus;
91use crate::actor::Binds;
92use crate::actor::HandlerInfo;
93use crate::actor::Referable;
94use crate::actor::RemoteHandles;
95use crate::actor::Signal;
96use crate::actor_local::ActorLocalStorage;
97use crate::channel;
98use crate::channel::ChannelAddr;
99use crate::channel::ChannelError;
100use crate::channel::ChannelTransport;
101use crate::config;
102use crate::context;
103use crate::context::Mailbox as _;
104use crate::introspect::IntrospectMessage;
105use crate::introspect::IntrospectResult;
106use crate::mailbox::BoxedMailboxSender;
107use crate::mailbox::DeliveryError;
108use crate::mailbox::DialMailboxRouter;
109use crate::mailbox::IntoBoxedMailboxSender as _;
110use crate::mailbox::Mailbox;
111use crate::mailbox::MailboxMuxer;
112use crate::mailbox::MailboxSender;
113use crate::mailbox::MailboxServer as _;
114use crate::mailbox::MessageEnvelope;
115use crate::mailbox::OncePortHandle;
116use crate::mailbox::OncePortReceiver;
117use crate::mailbox::PanickingMailboxSender;
118use crate::mailbox::PortHandle;
119use crate::mailbox::PortReceiver;
120use crate::mailbox::Undeliverable;
121use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
122use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
123use crate::metrics::ACTOR_MESSAGES_RECEIVED;
124use crate::ordering::OrderedSender;
125use crate::ordering::OrderedSenderError;
126use crate::ordering::SEQ_INFO;
127use crate::ordering::SeqInfo;
128use crate::ordering::Sequencer;
129use crate::ordering::ordered_channel;
130use crate::panic_handler;
131use crate::reference;
132use crate::supervision::ActorSupervisionEvent;
133
/// Monotonic counter used to mint unique ranks for [`Proc::local`]
/// procs (producing names `local_0`, `local_1`, ...). Relaxed
/// ordering is sufficient: only uniqueness is required.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
136
/// A proc instance is the runtime managing a single proc in Hyperactor.
/// It is responsible for spawning actors in the proc, multiplexing messages
/// to/within actors in the proc, and providing fallback routing to external
/// procs.
///
/// Procs are also responsible for maintaining the local supervision hierarchy.
///
/// `Proc` is a cheap, clonable handle: all clones share the same
/// [`ProcState`].
#[derive(Clone)]
pub struct Proc {
    // Shared state; dropped (and logged) only when the last clone goes away.
    inner: Arc<ProcState>,
}
147
148impl fmt::Debug for Proc {
149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150        f.debug_struct("Proc")
151            .field("proc_id", &self.inner.proc_id)
152            .finish()
153    }
154}
155
/// Shared, reference-counted state behind a [`Proc`] handle. Dropping
/// the last handle drops this state (see the `Drop` impl below).
struct ProcState {
    /// The proc's id. This should be globally unique, but is not (yet)
    /// for local-only procs.
    proc_id: reference::ProcId,

    /// A muxer instance that has entries for every actor managed by
    /// the proc.
    proc_muxer: MailboxMuxer,

    /// Sender used to forward messages outside of the proc.
    forwarder: BoxedMailboxSender,

    /// Per-name atomic index allocator. Used by `allocate_root_id`
    /// (index 0, counter starts at 1) and `allocate_child_id`
    /// (increments the parent's counter). Each root name gets its
    /// own independent counter.
    roots: DashMap<String, AtomicUsize>,

    /// All actor instances in this proc. Values are weak: an entry
    /// may fail to upgrade once its `InstanceCell` is dropped.
    instances: DashMap<reference::ActorId, WeakInstanceCell>,

    /// Snapshots of terminated actors for post-mortem introspection.
    /// Populated by the introspect task just before it exits on
    /// terminal status. Bounded by
    /// [`config::TERMINATED_SNAPSHOT_RETENTION`].
    terminated_snapshots: DashMap<reference::ActorId, crate::introspect::IntrospectResult>,

    /// Used by root actors to send events to the actor coordinating
    /// supervision of root actors in this proc.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// The actor ID of the supervision coordinator, if it lives on this proc.
    /// Used to ensure the coordinator is shut down last during proc teardown.
    supervision_coordinator_actor_id: OnceLock<reference::ActorId>,

    /// Handle to the mailbox server task, if this proc was created with
    /// `Proc::direct()` or had `serve()` called on it. Used to
    /// gracefully stop the server and join it (flushing receive-side
    /// acks) during shutdown.
    mailbox_server_handle: std::sync::Mutex<Option<crate::mailbox::MailboxServerHandle>>,
}
197
impl Drop for ProcState {
    fn drop(&mut self) {
        // We only want to log ProcStatus::Dropped when ProcState is
        // dropped, rather than when a Proc handle is dropped: this
        // fires exactly once, after the ref count of `Proc::inner`
        // reaches 0.
        tracing::info!(
            proc_id = %self.proc_id,
            name = "ProcStatus",
            status = "Dropped"
        );
    }
}
210
/// Structured return type for [`Proc::actor_instance`].
///
/// Groups the instance, handle, and per-channel receivers that an
/// "inverted" actor caller needs to drive the actor manually. The
/// caller is responsible for draining these receivers; the proc runs
/// no message loop for this actor.
pub struct ActorInstance<A: Actor> {
    /// The actor instance (used for sending/receiving messages, spawning children, etc.).
    pub instance: Instance<A>,
    /// Handle to the actor (used for lifecycle control and port access).
    pub handle: ActorHandle<A>,
    /// Supervision events delivered to this actor.
    pub supervision: PortReceiver<ActorSupervisionEvent>,
    /// Control signals for the actor.
    pub signal: PortReceiver<Signal>,
    /// Primary work queue for handler dispatch.
    pub work: mpsc::UnboundedReceiver<WorkCell<A>>,
}
227
228impl Proc {
229    /// Create a pre-configured proc with the given proc id and forwarder.
230    pub fn configured(proc_id: reference::ProcId, forwarder: BoxedMailboxSender) -> Self {
231        tracing::info!(
232            proc_id = %proc_id,
233            name = "ProcStatus",
234            status = "Created"
235        );
236
237        Self {
238            inner: Arc::new(ProcState {
239                proc_id,
240                proc_muxer: MailboxMuxer::new(),
241                forwarder,
242                roots: DashMap::new(),
243                instances: DashMap::new(),
244                terminated_snapshots: DashMap::new(),
245                supervision_coordinator_port: OnceLock::new(),
246                supervision_coordinator_actor_id: OnceLock::new(),
247                mailbox_server_handle: std::sync::Mutex::new(None),
248            }),
249        }
250    }
251
252    /// Create a new direct-addressed proc.
253    pub fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
254        let (addr, rx) = channel::serve(addr)?;
255        let proc_id = reference::ProcId::with_name(addr, name);
256        let proc = Self::configured(proc_id, DialMailboxRouter::new().into_boxed());
257        let handle = proc.clone().serve(rx);
258        *proc.inner.mailbox_server_handle.lock().unwrap() = Some(handle);
259        Ok(proc)
260    }
261
262    /// Set the supervision coordinator's port for this proc. Return Err if it is
263    /// already set.
264    pub fn set_supervision_coordinator(
265        &self,
266        port: PortHandle<ActorSupervisionEvent>,
267    ) -> Result<(), anyhow::Error> {
268        let actor_id = port.location().actor_id().clone();
269        self.state()
270            .supervision_coordinator_port
271            .set(port)
272            .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))?;
273        let _ = self.state().supervision_coordinator_actor_id.set(actor_id);
274        Ok(())
275    }
276
    /// The actor ID of the supervision coordinator, if one is set and
    /// lives on this proc. Used during teardown to shut the
    /// coordinator down last (see `destroy_and_wait_except_current`).
    pub fn supervision_coordinator_actor_id(&self) -> Option<&reference::ActorId> {
        self.state().supervision_coordinator_actor_id.get()
    }
282
    /// Handle a supervision event received by the proc. Attempt to forward it to the
    /// supervision coordinator port if one is set, otherwise crash the process.
    ///
    /// Escalation policy:
    /// - Coordinator set: forward every event (error or not).
    /// - No coordinator: non-error lifecycle events are silently
    ///   dropped; error events crash the process.
    /// - Forwarding failed: non-error events are dropped with a debug
    ///   log; error events crash the process, since an unreported
    ///   actor failure must not go unnoticed.
    pub fn handle_unhandled_supervision_event(
        &self,
        cx: &impl context::Actor,
        event: ActorSupervisionEvent,
    ) {
        let result = match self.state().supervision_coordinator_port.get() {
            Some(port) => port.send(cx, event.clone()).map_err(anyhow::Error::from),
            None => {
                if !event.is_error() {
                    // Normal lifecycle events (e.g. clean stop) without a coordinator
                    // are silently dropped.
                    return;
                }
                Err(anyhow::anyhow!(
                    "coordinator port is not set for proc {}",
                    self.proc_id(),
                ))
            }
        };
        if let Err(err) = result {
            if !event.is_error() {
                // Normal lifecycle events that fail to send (e.g. coordinator
                // mailbox already closed during shutdown) are silently dropped.
                tracing::debug!(
                    "proc {}: dropping non-error supervision event {}: {:?}",
                    self.proc_id(),
                    event,
                    err
                );
                return;
            }
            tracing::error!(
                "proc {}: could not propagate supervision event {} due to error: {:?}: crashing",
                self.proc_id(),
                event,
                err
            );

            // Error event with no way to report it: fail loudly.
            std::process::exit(1);
        }
    }
326
327    /// Create a new local-only proc. This proc is not allowed to forward messages
328    /// outside of the proc itself.
329    pub fn local() -> Self {
330        let rank = NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed);
331        let addr = ChannelAddr::any(ChannelTransport::Local);
332        let proc_id = reference::ProcId::unique(addr, format!("local_{}", rank));
333        Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
334    }
335
    /// The proc's ID. Globally unique for direct-addressed procs;
    /// see `ProcState::proc_id` for the local-only caveat.
    pub fn proc_id(&self) -> &reference::ProcId {
        &self.state().proc_id
    }
340
341    /// Shared sender used by the proc to forward messages to remote
342    /// destinations.
343    pub fn forwarder(&self) -> &BoxedMailboxSender {
344        &self.inner.forwarder
345    }
346
    /// Convenience accessor for the shared [`ProcState`].
    fn state(&self) -> &ProcState {
        self.inner.as_ref()
    }
351
352    /// A global runtime proc used by this crate.
353    pub(crate) fn runtime() -> &'static Proc {
354        static RUNTIME_PROC: OnceLock<Proc> = OnceLock::new();
355        RUNTIME_PROC.get_or_init(|| {
356            let addr = ChannelAddr::any(ChannelTransport::Local);
357            let proc_id = reference::ProcId::unique(addr, "hyperactor_runtime");
358            Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
359        })
360    }
361
362    /// Attach a mailbox to the proc with the provided root name.
363    pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
364        let actor_id: reference::ActorId = self.allocate_root_id(name)?;
365        Ok(self.bind_mailbox(actor_id))
366    }
367
368    /// Attach a mailbox to the proc as a child actor.
369    pub fn attach_child(&self, parent_id: &reference::ActorId) -> Result<Mailbox, anyhow::Error> {
370        let actor_id: reference::ActorId = self.allocate_child_id(parent_id)?;
371        Ok(self.bind_mailbox(actor_id))
372    }
373
374    /// Bind a mailbox to the proc.
375    fn bind_mailbox(&self, actor_id: reference::ActorId) -> Mailbox {
376        let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
377
378        // TODO: T210748165 tie the muxer entry to the lifecycle of the mailbox held
379        // by the caller. This will likely require a weak reference.
380        self.state().proc_muxer.bind_mailbox(mbox.clone());
381        mbox
382    }
383
384    /// Attach a mailbox to the proc with the provided root name, and bind an [`ActorRef`].
385    /// This is intended only for testing, and will be replaced by simpled utilities.
386    pub fn attach_actor<R, M>(
387        &self,
388        name: &str,
389    ) -> Result<(Instance<()>, reference::ActorRef<R>, PortReceiver<M>), anyhow::Error>
390    where
391        M: RemoteMessage,
392        R: Referable + RemoteHandles<M>,
393    {
394        let (instance, _handle) = self.instance(name)?;
395        let (_handle, rx) = instance.bind_actor_port::<M>();
396        let actor_ref = reference::ActorRef::attest(instance.self_id().clone());
397        Ok((instance, actor_ref, rx))
398    }
399
400    /// Spawn a named (root) actor on this proc. The name of the actor must be
401    /// unique.
402    pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
403        let actor_id = self.allocate_root_id(name)?;
404        self.spawn_inner(actor_id, actor, None)
405    }
406
    /// Common spawn logic for both root and child actors.
    /// Creates a tracing span with the correct actor_id before starting the actor.
    ///
    /// `parent` is `Some` for child actors (establishing supervision
    /// linkage) and `None` for roots.
    #[hyperactor::instrument(fields(actor_id = actor_id.to_string(), actor_name = actor_id.name(), actor_type = std::any::type_name::<A>()))]
    fn spawn_inner<A: Actor>(
        &self,
        actor_id: reference::ActorId,
        actor: A,
        parent: Option<InstanceCell>,
    ) -> Result<ActorHandle<A>, anyhow::Error> {
        // `Instance::start()` is infallible (see `abort_root_actor`);
        // the `Result` exists for the id-allocation done by callers.
        // NOTE(review): the `false` argument presumably distinguishes
        // non-client instances — confirm against `Instance::new`.
        let (instance, receivers) = Instance::new(self.clone(), actor_id, false, parent);
        Ok(instance.start(actor, receivers))
    }
419
420    /// Create a lightweight client instance (no actor loop, no
421    /// introspect task).  This is safe to call outside a Tokio
422    /// runtime — unlike [`actor_instance`], it never calls
423    /// `tokio::spawn`.
424    pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
425        let actor_id = self.allocate_root_id(name)?;
426        let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, None);
427        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
428        instance.change_status(ActorStatus::Client);
429        Ok((instance, handle))
430    }
431
432    /// Create a lightweight client instance that handles
433    /// [`IntrospectMessage`].
434    ///
435    /// Like [`instance`](Self::instance), this creates a client-mode
436    /// instance with no actor message loop. Unlike `instance`, it
437    /// spawns a dedicated introspect task, so the instance responds
438    /// to `IntrospectMessage::Query` and is visible and navigable in
439    /// admin tooling such as the mesh TUI.
440    ///
441    /// See CI-1, CI-2 in module doc.
442    ///
443    /// Requires an active Tokio runtime (calls `tokio::spawn`).
444    pub fn introspectable_instance(
445        &self,
446        name: &str,
447    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
448        let actor_id = self.allocate_root_id(name)?;
449        let (instance, receivers) = Instance::new(self.clone(), actor_id, false, None);
450        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
451        instance.change_status(ActorStatus::Client);
452        tokio::spawn(crate::introspect::serve_introspect(
453            instance.inner.cell.clone(),
454            instance.inner.mailbox.clone(),
455            receivers.introspect,
456        ));
457        Ok((instance, handle))
458    }
459
460    /// Create and return an actor instance, its handle, and its
461    /// receivers. This allows actors to be "inverted": the caller can
462    /// use the returned [`Instance`] to send and receive messages,
463    /// launch child actors, etc. The actor itself does not handle any
464    /// messages unless driven by the caller.
465    pub fn actor_instance<A: Actor>(&self, name: &str) -> Result<ActorInstance<A>, anyhow::Error> {
466        let actor_id = self.allocate_root_id(name)?;
467        let span = tracing::debug_span!(
468            "actor_instance",
469            actor_name = name,
470            actor_type = std::any::type_name::<A>(),
471            actor_id = actor_id.to_string(),
472        );
473        let _guard = span.enter();
474        let (instance, receivers) = Instance::new(self.clone(), actor_id.clone(), false, None);
475        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
476        instance.change_status(ActorStatus::Client);
477
478        tokio::spawn(crate::introspect::serve_introspect(
479            instance.inner.cell.clone(),
480            instance.inner.mailbox.clone(),
481            receivers.introspect,
482        ));
483
484        let (signal_rx, supervision_rx) = receivers.actor_loop.unwrap();
485        Ok(ActorInstance {
486            instance,
487            handle,
488            supervision: supervision_rx,
489            signal: signal_rx,
490            work: receivers.work,
491        })
492    }
493
494    /// Traverse all actor trees in this proc, starting from root actors (pid=0).
495    ///
496    /// **Caution:** This holds DashMap shard read locks while doing
497    /// `Weak::upgrade()` and recursively walking the actor tree per
498    /// entry. Under rapid actor churn, this causes convoy starvation
499    /// with concurrent `insert`/`remove` operations. Prefer
500    /// `all_instance_keys()` with point lookups if you only need
501    /// actor IDs. Currently unused in production code.
502    pub fn traverse<F>(&self, f: &mut F)
503    where
504        F: FnMut(&InstanceCell, usize),
505    {
506        for entry in self.state().instances.iter() {
507            if entry.key().pid() == 0 {
508                if let Some(cell) = entry.value().upgrade() {
509                    cell.traverse(f);
510                }
511            }
512        }
513    }
514
515    /// Look up an instance by ActorId.
516    pub fn get_instance(&self, actor_id: &reference::ActorId) -> Option<InstanceCell> {
517        self.state()
518            .instances
519            .get(actor_id)
520            .and_then(|weak| weak.upgrade())
521    }
522
523    /// Returns the ActorIds of all root actors (pid=0) in this proc.
524    ///
525    /// **Caution:** This iterates the full DashMap under shard read
526    /// locks. The per-entry work is lightweight (key filter + clone),
527    /// but under very rapid churn the iteration can still contend
528    /// with concurrent writes. Prefer `all_instance_keys()` with a
529    /// post-filter if this becomes a hot path. Currently unused in
530    /// production code.
531    pub fn root_actor_ids(&self) -> Vec<reference::ActorId> {
532        self.state()
533            .instances
534            .iter()
535            .filter(|entry| entry.key().pid() == 0)
536            .map(|entry| entry.key().clone())
537            .collect()
538    }
539
540    /// Returns the ActorIds of all live actors in this proc, including
541    /// dynamically spawned children.
542    ///
543    /// An actor is considered live if its weak reference is
544    /// upgradeable and its status is not terminal. This excludes
545    /// actors whose `InstanceCell` has been dropped and actors that
546    /// have stopped or failed but whose Arc is still held (e.g. by
547    /// the introspect task during teardown).
548    pub fn all_actor_ids(&self) -> Vec<reference::ActorId> {
549        self.state()
550            .instances
551            .iter()
552            .filter(|entry| {
553                entry
554                    .value()
555                    .upgrade()
556                    .is_some_and(|cell| !cell.status().borrow().is_terminal())
557            })
558            .map(|entry| entry.key().clone())
559            .collect()
560    }
561
562    /// Snapshot all instance keys from the DashMap without inspecting
563    /// values. Each shard read lock is held only long enough to clone
564    /// the key — no `Weak::upgrade()`, no `watch::borrow()`, no
565    /// `is_terminal()` check. This minimises shard lock hold time to
566    /// avoid convoy starvation with concurrent `insert`/`remove`
567    /// operations during rapid actor churn.
568    ///
569    /// The returned list may include actors that are terminal or
570    /// whose `WeakInstanceCell` no longer upgrades. Callers should
571    /// tolerate stale entries (e.g. by handling "not found" on
572    /// subsequent per-actor lookups).
573    pub fn all_instance_keys(&self) -> Vec<reference::ActorId> {
574        self.state()
575            .instances
576            .iter()
577            .map(|entry| entry.key().clone())
578            .collect()
579    }
580
581    /// Look up a terminated actor's snapshot by ID.
582    pub fn terminated_snapshot(
583        &self,
584        actor_id: &reference::ActorId,
585    ) -> Option<crate::introspect::IntrospectResult> {
586        self.state()
587            .terminated_snapshots
588            .get(actor_id)
589            .map(|e| e.value().clone())
590    }
591
592    /// Return all terminated actor IDs currently retained.
593    pub fn all_terminated_actor_ids(&self) -> Vec<reference::ActorId> {
594        self.state()
595            .terminated_snapshots
596            .iter()
597            .map(|e| e.key().clone())
598            .collect()
599    }
600
601    /// Create a child instance. Called from `Instance`.
602    fn child_instance(
603        &self,
604        parent: InstanceCell,
605    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
606        let actor_id = self.allocate_child_id(parent.actor_id())?;
607        let _ = tracing::debug_span!(
608            "child_actor_instance",
609            parent_actor_id = %parent.actor_id(),
610            actor_type = std::any::type_name::<()>(),
611            actor_id = %actor_id,
612        );
613
614        let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, Some(parent));
615        // Client-mode instance: no actor loop, no introspect task.
616        // Receivers are intentionally dropped.
617        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
618        instance.change_status(ActorStatus::Client);
619        Ok((instance, handle))
620    }
621
622    /// Spawn a child actor from the provided parent on this proc. The parent actor
623    /// must already belong to this proc, a fact which is asserted in code.
624    ///
625    /// When spawn_child returns, the child has an associated cell and is linked
626    /// with its parent.
627    pub(crate) fn spawn_child<A: Actor>(
628        &self,
629        parent: InstanceCell,
630        actor: A,
631    ) -> Result<ActorHandle<A>, anyhow::Error> {
632        let actor_id = self.allocate_child_id(parent.actor_id())?;
633        self.spawn_inner(actor_id, actor, Some(parent))
634    }
635
636    /// Spawn a named child actor. Same as `spawn_child` but the child
637    /// gets a descriptive name instead of inheriting the parent's.
638    /// Supervision linkage to parent is preserved.
639    pub(crate) fn spawn_named_child<A: Actor>(
640        &self,
641        parent: InstanceCell,
642        name: &str,
643        actor: A,
644    ) -> Result<ActorHandle<A>, anyhow::Error> {
645        let actor_id = self.allocate_named_child_id(parent.actor_id(), name)?;
646        self.spawn_inner(actor_id, actor, Some(parent))
647    }
648
    /// Call `abort` on the `JoinHandle` associated with the given
    /// root actor. If successful return `Some(future)` that resolves
    /// to `root.clone()`, else `None` (unknown actor, or its cell has
    /// been dropped).
    ///
    /// `this_handle` lets a caller running *inside* an actor task
    /// avoid aborting its own task: if it matches the target's task
    /// handle (compared by pointer identity), the abort is skipped
    /// but the returned future still resolves.
    pub fn abort_root_actor(
        &self,
        root: &reference::ActorId,
        this_handle: Option<&JoinHandle<()>>,
    ) -> Option<impl Future<Output = reference::ActorId>> {
        self.state()
            .instances
            .get(root)
            .into_iter()
            .flat_map(|e| e.upgrade())
            .map(|cell| {
                let r1 = root.clone();
                let r2 = root.clone();
                // If abort_root_actor was called from inside an actor task, we don't want to abort that actor's task yet.
                let skip_abort = this_handle.is_some_and(|this_h| {
                    cell.inner
                        .actor_task_handle
                        .get()
                        .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
                });
                // `Instance::start()` is infallible and should
                // complete quickly, so calling `wait()` on `actor_task_handle`
                // should be safe (i.e., not hang forever).
                async move {
                    // `wait()` can block, so run it on the blocking pool.
                    tokio::task::spawn_blocking(move || {
                        if !skip_abort {
                            let h = cell.inner.actor_task_handle.wait();
                            tracing::debug!("{}: aborting {:?}", r1, h);
                            h.abort();
                        }
                    })
                    .await
                    .unwrap();
                    r2
                }
            })
            .next()
    }
690
    /// Signals to a root actor to stop,
    /// returning a status observer if successful.
    ///
    /// Returns `None` when the actor is unknown to this proc, its
    /// cell has already been dropped, or the stop signal could not be
    /// delivered.
    pub fn stop_actor(
        &self,
        actor_id: &reference::ActorId,
        reason: String,
    ) -> Option<watch::Receiver<ActorStatus>> {
        // Note: the DashMap entry ref is held for the duration of the
        // signal send below (short, bounded work).
        if let Some(entry) = self.state().instances.get(actor_id) {
            match entry.value().upgrade() {
                None => None, // the actor's cell has been dropped
                Some(cell) => {
                    tracing::info!("sending stop signal to {}", cell.actor_id());
                    if let Err(err) = cell.signal(Signal::DrainAndStop(reason)) {
                        tracing::error!(
                            "{}: failed to send stop signal to pid {}: {:?}",
                            self.proc_id(),
                            cell.pid(),
                            err
                        );
                        None
                    } else {
                        // Callers watch this receiver for a terminal status.
                        Some(cell.status().clone())
                    }
                }
            }
        } else {
            tracing::error!("no actor {} found in {}", actor_id, self.proc_id());
            None
        }
    }
721
    /// Stop the proc. Returns a pair of:
    /// - the actors observed to stop;
    /// - the actors not observed to stop when timeout.
    ///
    /// If `cx` is specified, it means this method was called from inside an actor
    /// in which case we shouldn't wait for it to stop and need to delay aborting
    /// its task.
    ///
    /// Equivalent to [`Self::destroy_and_wait_except_current`] with
    /// `except_current = false`.
    pub async fn destroy_and_wait<A: Actor>(
        &mut self,
        timeout: Duration,
        cx: Option<&Context<'_, A>>,
        reason: &str,
    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
        self.destroy_and_wait_except_current::<A>(timeout, cx, false, reason)
            .await
    }
738
    /// Stop the proc. Returns a pair of:
    /// - the actors observed to stop;
    /// - the actors not observed to stop when timeout.
    ///
    /// If `cx` is specified, it means this method was called from inside an actor
    /// in which case we shouldn't wait for it to stop and need to delay aborting
    /// its task.
    /// If except_current is true, don't stop the actor represented by "cx" at
    /// all.
    #[hyperactor::instrument]
    pub async fn destroy_and_wait_except_current<A: Actor>(
        &mut self,
        timeout: Duration,
        cx: Option<&Context<'_, A>>,
        except_current: bool,
        reason: &str,
    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
        tracing::debug!("{}: proc stopping", self.proc_id());

        // When called from inside an actor, capture that actor's task
        // handle and id so we can (a) skip waiting on it below and
        // (b) abort its task last, after teardown has completed.
        let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
            (
                Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
                Some(cx.self_id())
            )
        });

        let coordinator_id = self.supervision_coordinator_actor_id().cloned();

        // Phase 1: stop all root actors except the supervision coordinator
        // (which must stay alive to receive stop events from the others).
        // `pid() == 0` selects root actors; their children are handled
        // through the roots.
        let mut statuses = HashMap::new();
        for actor_id in self
            .state()
            .instances
            .iter()
            .filter(|entry| entry.key().pid() == 0)
            .map(|entry| entry.key().clone())
            .collect::<Vec<_>>()
        {
            if coordinator_id.as_ref() == Some(&actor_id) {
                continue;
            }
            if let Some(status) = self.stop_actor(&actor_id, reason.to_string()) {
                statuses.insert(actor_id, status);
            }
        }
        tracing::debug!("{}: non-coordinator actors stopped", self.proc_id());

        // Wait (bounded by `timeout`) for each signaled actor to reach
        // a terminal status. The calling actor (if any) is excluded:
        // it cannot observe its own termination.
        let waits: Vec<_> = statuses
            .iter_mut()
            .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
            .map(|(actor_id, root)| {
                let actor_id = actor_id.clone();
                async move {
                    tokio::time::timeout(
                        timeout,
                        root.wait_for(|state: &ActorStatus| state.is_terminal()),
                    )
                    .await
                    .ok()
                    .map(|_| actor_id)
                }
            })
            .collect();

        let results = futures::future::join_all(waits).await;
        let mut stopped_actors: Vec<_> = results
            .iter()
            .filter_map(|actor_id| actor_id.as_ref())
            .cloned()
            .collect();
        // Any actor that did not reach terminal status within the
        // timeout has its task aborted outright.
        let aborted_actors: Vec<_> = statuses
            .iter()
            .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
            .map(|(actor_id, _)| {
                let f = self.abort_root_actor(actor_id, this_handle);
                async move {
                    let _ = if let Some(f) = f { Some(f.await) } else { None };
                    // If `is_none(&_)` then the associated actor's
                    // instance cell was already dropped when we went
                    // to call `abort()` on the cell's task handle.

                    actor_id.clone()
                }
            })
            .collect();
        let mut aborted_actors = futures::future::join_all(aborted_actors).await;

        // Phase 2: now that all other actors have stopped, stop the
        // supervision coordinator so it had a chance to receive all
        // supervision events.
        if let Some(ref coord_id) = coordinator_id
            && this_actor_id != Some(coord_id)
        {
            if let Some(mut status) = self.stop_actor(coord_id, reason.to_string()) {
                let stopped = tokio::time::timeout(
                    timeout,
                    status.wait_for(|s: &ActorStatus| s.is_terminal()),
                )
                .await
                .is_ok();
                if stopped {
                    stopped_actors.push(coord_id.clone());
                } else {
                    if let Some(f) = self.abort_root_actor(coord_id, this_handle) {
                        f.await;
                    }
                    aborted_actors.push(coord_id.clone());
                }
            }
        }

        // Flush the forwarder so that any messages posted during
        // teardown (e.g. supervision events) are wire-delivered
        // before we tear down the proc's networking. The flush is
        // best-effort: if the remote side has already torn down its
        // networking, acks may never arrive and flush would hang
        // indefinitely, so we bound it with a configurable timeout.
        let flush_timeout = hyperactor_config::global::get(crate::config::FORWARDER_FLUSH_TIMEOUT);
        match tokio::time::timeout(flush_timeout, self.state().forwarder.flush()).await {
            Ok(Err(err)) => {
                tracing::warn!(
                    "{}: forwarder flush failed during proc exit: {:?}",
                    self.proc_id(),
                    err
                );
            }
            Err(_elapsed) => {
                tracing::warn!(
                    "{}: forwarder flush timed out during proc exit",
                    self.proc_id(),
                );
            }
            Ok(Ok(())) => {}
        }

        // Delayed abort of the calling actor's own task: done last so
        // the caller could drive the teardown above to completion.
        if let Some(this_handle) = this_handle
            && let Some(this_actor_id) = this_actor_id
            && !except_current
        {
            tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
            this_handle.abort()
        };

        tracing::info!(
            "destroy_and_wait: {} actors stopped, {} actors aborted",
            stopped_actors.len(),
            aborted_actors.len()
        );
        Ok((stopped_actors, aborted_actors))
    }
890
891    /// Resolve an actor reference to a **live** actor on this proc.
892    ///
893    /// Returns `None` if:
894    /// - the actor was never spawned here,
895    /// - the actor's `InstanceCell` has been dropped, or
896    /// - the actor's status is terminal (stopped or failed).
897    ///
898    /// The terminal-status check guards a race window: the introspect
899    /// task (`serve_introspect`) holds a strong `InstanceCell` Arc
900    /// and drops it only after observing terminal status. Between the
901    /// actor reaching terminal and the introspect task reacting,
902    /// `upgrade()` on the weak ref succeeds even though the actor is
903    /// dead. The `is_terminal()` check closes that window. Once the
904    /// introspect task exits, the Arc is dropped and `upgrade()`
905    /// returns `None` on its own.
906    ///
907    /// Bounds:
908    /// - `R: Actor` — must be a real actor that can live in this
909    ///   proc.
910    /// - `R: Referable` — required because the input is an
911    ///   `ActorRef<R>`.
912    pub fn resolve_actor_ref<R: Actor + Referable>(
913        &self,
914        actor_ref: &reference::ActorRef<R>,
915    ) -> Option<ActorHandle<R>> {
916        let cell = self.inner.instances.get(actor_ref.actor_id())?.upgrade()?;
917        // An actor whose status is terminal has stopped processing
918        // messages even if its InstanceCell Arc is still alive (e.g.
919        // held by the introspect task during teardown).
920        if cell.status().borrow().is_terminal() {
921            return None;
922        }
923        cell.downcast_handle()
924    }
925
926    /// Create a root allocation in the proc.
927    fn allocate_root_id(&self, name: &str) -> Result<reference::ActorId, anyhow::Error> {
928        let name = name.to_string();
929        match self.state().roots.entry(name.to_string()) {
930            Entry::Vacant(entry) => {
931                entry.insert(AtomicUsize::new(1));
932            }
933            Entry::Occupied(_) => {
934                anyhow::bail!("an actor with name '{}' has already been spawned", name)
935            }
936        }
937        Ok(reference::ActorId::new(
938            self.state().proc_id.clone(),
939            name.to_string(),
940            0,
941        ))
942    }
943
944    /// Create a child allocation in the proc.
945    #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
946    pub(crate) fn allocate_child_id(
947        &self,
948        parent_id: &reference::ActorId,
949    ) -> Result<reference::ActorId, anyhow::Error> {
950        assert_eq!(*parent_id.proc_id(), self.state().proc_id);
951        let pid = match self.state().roots.get(parent_id.name()) {
952            None => anyhow::bail!(
953                "no actor named {} in proc {}",
954                parent_id.name(),
955                self.state().proc_id
956            ),
957            Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
958        };
959        Ok(parent_id.child_id(pid))
960    }
961
962    /// Allocate an actor ID with a custom name on this proc.
963    ///
964    /// See AI-1 (named-child pid) and AI-3 (controller ActorId
965    /// uniqueness) in module doc.
966    pub(crate) fn allocate_named_child_id(
967        &self,
968        parent_id: &reference::ActorId,
969        name: &str,
970    ) -> Result<reference::ActorId, anyhow::Error> {
971        let inherited = self.allocate_child_id(parent_id)?;
972        Ok(reference::ActorId::new(
973            inherited.proc_id().clone(),
974            name,
975            inherited.pid(),
976        ))
977    }
978
    /// Downgrade to a weak reference that doesn't prevent the proc from being dropped.
    ///
    /// Used to break reference cycles: e.g. each actor's mailbox
    /// forwards through a `WeakProc` (see `Instance::new`).
    pub fn downgrade(&self) -> WeakProc {
        WeakProc::new(self)
    }
983
    /// Flush the forwarder so that any buffered outbound messages
    /// (e.g. supervision events posted during teardown) are
    /// wire-delivered before the proc's networking is torn down.
    ///
    /// Note: unlike the `MailboxSender::flush` impl for `Proc`, this
    /// flushes only the forwarder, not the local proc muxer.
    pub async fn flush(&self) -> Result<(), anyhow::Error> {
        self.state().forwarder.flush().await
    }
990
991    /// Stop and join the mailbox server, flushing receive-side acks.
992    ///
993    /// This stops the `MailboxServer::serve` loop and awaits its
994    /// completion, which runs `Rx::join()` to flush any pending
995    /// transport-level acks before the channel is torn down.
996    ///
997    /// No-op if no mailbox server handle is stored (e.g. for
998    /// `Proc::configured` or `Proc::local` procs that don't serve).
999    pub async fn join_mailbox_server(&self) {
1000        let handle = self.inner.mailbox_server_handle.lock().unwrap().take();
1001        if let Some(handle) = handle {
1002            handle.stop("proc shutting down");
1003            let _ = handle.await;
1004        }
1005    }
1006}
1007
1008#[async_trait]
1009impl MailboxSender for Proc {
1010    fn post_unchecked(
1011        &self,
1012        envelope: MessageEnvelope,
1013        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1014    ) {
1015        if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
1016            self.state().proc_muxer.post(envelope, return_handle)
1017        } else {
1018            self.state().forwarder.post(envelope, return_handle)
1019        }
1020    }
1021
1022    async fn flush(&self) -> Result<(), anyhow::Error> {
1023        let (r1, r2) = futures::future::join(
1024            self.state().proc_muxer.flush(),
1025            self.state().forwarder.flush(),
1026        )
1027        .await;
1028        r1?;
1029        r2?;
1030        Ok(())
1031    }
1032}
1033
/// A weak reference to a Proc that doesn't prevent it from being dropped.
///
/// Wraps a `Weak<ProcState>`; use [`WeakProc::upgrade`] to recover a
/// strong [`Proc`] while the proc is still alive.
#[derive(Clone, Debug)]
pub struct WeakProc(Weak<ProcState>);
1037
1038impl WeakProc {
1039    fn new(proc: &Proc) -> Self {
1040        Self(Arc::downgrade(&proc.inner))
1041    }
1042
1043    /// Upgrade to a strong Proc reference, if the proc is still alive.
1044    pub fn upgrade(&self) -> Option<Proc> {
1045        self.0.upgrade().map(|inner| Proc { inner })
1046    }
1047}
1048
1049#[async_trait]
1050impl MailboxSender for WeakProc {
1051    fn post_unchecked(
1052        &self,
1053        envelope: MessageEnvelope,
1054        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1055    ) {
1056        match self.upgrade() {
1057            Some(proc) => proc.post(envelope, return_handle),
1058            None => envelope.undeliverable(
1059                DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
1060                return_handle,
1061            ),
1062        }
1063    }
1064
1065    async fn flush(&self) -> Result<(), anyhow::Error> {
1066        match self.upgrade() {
1067            Some(proc) => proc.flush().await,
1068            None => Ok(()),
1069        }
1070    }
1071}
1072
/// Represents a single work item used by the instance to dispatch to
/// actor handles. Specifically, this enables handler polymorphism.
///
/// The boxed closure receives exclusive access to the actor state and
/// a shared reference to its [`Instance`], returning the handler
/// future; the higher-ranked lifetime `'a` ties the future's borrows
/// to that single invocation.
pub struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
1086
1087impl<A: Actor + Send> WorkCell<A> {
1088    /// Create a new WorkCell from a concrete function (closure).
1089    fn new(
1090        f: impl for<'a> FnOnce(
1091            &'a mut A,
1092            &'a Instance<A>,
1093        )
1094            -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
1095        + Send
1096        + Sync
1097        + 'static,
1098    ) -> Self {
1099        Self(Box::new(f))
1100    }
1101
1102    /// Handle the message represented by this work cell.
1103    pub fn handle<'a>(
1104        self,
1105        actor: &'a mut A,
1106        instance: &'a Instance<A>,
1107    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
1108        (self.0)(actor, instance)
1109    }
1110}
1111
/// Context for a message currently being handled by an Instance.
///
/// Derefs to the underlying [`Instance`], so instance APIs are
/// available directly on the context; the in-flight message's headers
/// are carried alongside.
pub struct Context<'a, A: Actor> {
    /// The instance handling the message.
    instance: &'a Instance<A>,
    /// Headers attached to the message being handled.
    headers: Flattrs,
}
1117
1118impl<'a, A: Actor> Context<'a, A> {
1119    /// Construct a new Context.
1120    pub fn new(instance: &'a Instance<A>, headers: Flattrs) -> Self {
1121        Self { instance, headers }
1122    }
1123
1124    /// Get a reference to the message headers.
1125    pub fn headers(&self) -> &Flattrs {
1126        &self.headers
1127    }
1128}
1129
impl<A: Actor> Deref for Context<'_, A> {
    type Target = Instance<A>;

    // Deref to the instance so handlers can call Instance methods
    // (self_id, open_port, stop, ...) directly on the context.
    fn deref(&self) -> &Self::Target {
        self.instance
    }
}
1137
/// An actor instance. This is responsible for managing a running actor, including
/// its full lifecycle, supervision, signal management, etc. Instances can represent
/// a managed actor or a "client" actor that has joined the proc.
pub struct Instance<A: Actor> {
    /// Shared instance state; see [`InstanceState`] for the fields.
    inner: Arc<InstanceState<A>>,
}
1144
impl<A: Actor> fmt::Debug for Instance<A> {
    // Manual impl: the field contents are elided (printed as "..")
    // rather than formatted, so no Debug bound is imposed on the
    // inner state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Instance").field("inner", &"..").finish()
    }
}
1150
struct InstanceState<A: Actor> {
    /// The proc that owns this instance.
    proc: Proc,

    /// The instance cell that manages instance hierarchy.
    cell: InstanceCell,

    /// The mailbox associated with the actor.
    mailbox: Mailbox,

    /// The actor's typed port table; shared with the actor's
    /// [`ActorHandle`] (see `Instance::start`).
    ports: Arc<Ports<A>>,

    /// A watch for communicating the actor's state.
    status_tx: watch::Sender<ActorStatus>,

    /// This instance's globally unique ID.
    id: Uuid,

    /// Used to assign sequence numbers for messages sent from this actor.
    sequencer: Sequencer,

    /// Per-instance local storage.
    instance_locals: ActorLocalStorage,
}
1175
impl<A: Actor> InstanceState<A> {
    /// This instance's actor ID, as recorded in its mailbox.
    fn self_id(&self) -> &reference::ActorId {
        self.mailbox.actor_id()
    }
}
1181
1182impl<A: Actor> Drop for InstanceState<A> {
1183    fn drop(&mut self) {
1184        self.status_tx.send_if_modified(|status| {
1185            if status.is_terminal() {
1186                false
1187            } else {
1188                tracing::info!(
1189                    name = "ActorStatus",
1190                    actor_id = %self.self_id(),
1191                    actor_name = self.self_id().name(),
1192                    status = "Stopped",
1193                    prev_status = status.arm().unwrap_or("unknown"),
1194                    "instance is dropped",
1195                );
1196                *status = ActorStatus::Stopped("instance is dropped".into());
1197                true
1198            }
1199        });
1200    }
1201}
1202
/// Receivers created by [`Instance::new`] that must be threaded to
/// their respective consumers (actor loop, introspect task, etc.).
///
/// # Invariant
///
/// See S10 in `introspect` module doc.
pub struct InstanceReceivers<A: Actor> {
    /// Signal and supervision receivers for the actor loop. `None`
    /// for detached/client instances that don't run an actor loop.
    actor_loop: Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
    /// Work queue for dispatching messages to actor handlers.
    work: mpsc::UnboundedReceiver<WorkCell<A>>,
    /// Introspect message receiver for the dedicated introspect task
    /// (consumed by `serve_introspect`, spawned in `Instance::start`).
    introspect: PortReceiver<IntrospectMessage>,
}
1218
1219impl<A: Actor> Instance<A> {
    /// Create a new actor instance in Created state.
    ///
    /// Wires up the instance's mailbox (bound into the proc's muxer),
    /// its ordered work queue, its status watch, and — unless
    /// `detached` — the signal/supervision ports consumed by the
    /// actor loop. Returns the instance together with the receivers
    /// the caller must thread to their consumers (see
    /// [`InstanceReceivers`]).
    fn new(
        proc: Proc,
        actor_id: reference::ActorId,
        detached: bool,
        parent: Option<InstanceCell>,
    ) -> (Self, InstanceReceivers<A>) {
        // Set up messaging. The mailbox forwards through a WeakProc,
        // so it does not keep the proc alive.
        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
        let (work_tx, work_rx) = ordered_channel(
            actor_id.to_string(),
            hyperactor_config::global::get(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER),
        );
        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);

        let actor_type = match TypeInfo::of::<A>() {
            Some(info) => ActorType::Named(info),
            None => ActorType::Anonymous(std::any::type_name::<A>()),
        };
        // Detached (client) instances run no actor loop, so they get
        // no signal/supervision ports.
        let actor_loop_ports = if detached {
            None
        } else {
            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
            let (supervision_port, supervision_receiver) = mailbox.open_port();
            Some((
                (signal_port, supervision_port),
                (signal_receiver, supervision_receiver),
            ))
        };

        // Split the ports (kept by the cell) from the receivers
        // (handed to the actor loop via InstanceReceivers).
        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();

        // Introspect port: a separate channel handled by a dedicated
        // tokio task (not the actor's message loop). Pre-registered
        // so Ports::get finds the Occupied entry and skips WorkCell
        // creation. bind_actor_port() registers in the mailbox
        // dispatch table at IntrospectMessage::port().
        //
        // Exercises S3, S4, S9 (see introspect module doc).
        let (introspect_port, introspect_receiver) =
            ports.open_message_port::<IntrospectMessage>().unwrap();
        introspect_port.bind_actor_port();

        let cell = InstanceCell::new(
            actor_id,
            actor_type,
            proc.clone(),
            actor_loop,
            status_rx,
            parent,
            ports.clone(),
        );
        // A single v7 UUID serves as both the instance id and the
        // seed of its message sequencer.
        let instance_id = Uuid::now_v7();
        let inner = Arc::new(InstanceState {
            proc,
            cell,
            mailbox,
            ports,
            status_tx,
            sequencer: Sequencer::new(instance_id),
            id: instance_id,
            instance_locals: ActorLocalStorage::new(),
        });
        (
            Self { inner },
            InstanceReceivers {
                actor_loop: actor_loop_receivers,
                work: work_rx,
                introspect: introspect_receiver,
            },
        )
    }
1294
    /// Notify subscribers of a change in the actors status and bump counters with the duration which
    /// the last status was active for.
    ///
    /// # Panics
    ///
    /// Panics on an illegal transition: terminal -> non-terminal, or
    /// terminal -> a *different* terminal status. Note the new status
    /// is published via `send_replace` *before* the legality check.
    #[track_caller]
    pub fn change_status(&self, new: ActorStatus) {
        let old = self.inner.status_tx.send_replace(new.clone());
        // 2 cases are allowed:
        // * non-terminal -> non-terminal
        // * non-terminal -> terminal
        // terminal -> terminal is not allowed unless it is the same status (no-op).
        // terminal -> non-terminal is never allowed.
        assert!(
            !old.is_terminal() && !new.is_terminal()
                || !old.is_terminal() && new.is_terminal()
                || old == new,
            "actor changing status illegally, only allow non-terminal -> non-terminal \
            and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
            self.self_id(),
            old,
            new
        );
        // Actor status changes between Idle and Processing when handling every
        // message. It creates too many logs if we want to log these 2 states.
        // Also, sometimes the actor transitions from Processing -> Processing.
        // Therefore we skip the status changes between them.
        if !((old.is_idle() && new.is_processing())
            || (old.is_processing() && new.is_idle())
            || old == new)
        {
            let new_status = new.arm().unwrap_or("unknown");
            // Only Failed/Stopped statuses carry a human-readable reason.
            let change_reason = match &new {
                ActorStatus::Failed(reason) => reason.to_string(),
                ActorStatus::Stopped(reason) => reason.clone(),
                _ => "".to_string(),
            };
            tracing::info!(
                name = "ActorStatus",
                actor_id = %self.self_id(),
                actor_name = self.self_id().name(),
                status = new_status,
                prev_status = old.arm().unwrap_or("unknown"),
                caller = %Location::caller(),
                change_reason,
            );
            let actor_id = hash_to_u64(self.self_id());
            notify_actor_status_changed(ActorStatusEvent {
                id: generate_actor_status_event_id(actor_id),
                timestamp: std::time::SystemTime::now(),
                actor_id,
                new_status: new_status.to_string(),
                reason: if change_reason.is_empty() {
                    None
                } else {
                    Some(change_reason)
                },
            });
        }
    }
1352
1353    fn is_terminal(&self) -> bool {
1354        self.inner.status_tx.borrow().is_terminal()
1355    }
1356
1357    fn is_stopping(&self) -> bool {
1358        self.inner.status_tx.borrow().is_stopping()
1359    }
1360
    /// This instance's actor ID.
    ///
    /// Delegates to [`InstanceState::self_id`], which reads the ID
    /// off the instance's mailbox.
    pub fn self_id(&self) -> &reference::ActorId {
        self.inner.self_id()
    }
1365
    /// Snapshot of this actor's introspection payload.
    ///
    /// Returns an [`IntrospectResult`] built from live [`InstanceCell`]
    /// state, without going through the actor message loop. This is
    /// safe to call from within a handler on the same actor (no
    /// self-send deadlock).
    ///
    /// The snapshot is best-effort: it reflects framework-owned state
    /// (status, message count, flight recorder, supervision children)
    /// at the instant of the call. `parent` is left as `None` —
    /// callers are responsible for setting topology context.
    ///
    /// Note: this acquires a write lock on the flight recorder spool
    /// and clones its contents. Suitable for occasional introspection
    /// requests, not for hot paths.
    pub fn introspect_payload(&self) -> crate::introspect::IntrospectResult {
        // Reads directly off the cell; never round-trips through the
        // actor's message loop.
        crate::introspect::live_actor_payload(&self.inner.cell)
    }
1384
1385    /// Publish domain-specific properties for introspection.
1386    ///
1387    /// Publish a complete Attrs bag for introspection. Replaces any
1388    /// previously published attrs.
1389    ///
1390    /// Debug builds assert that every key in the bag is tagged with
1391    /// the `INTROSPECT` meta-attribute.
1392    pub fn publish_attrs(&self, attrs: hyperactor_config::Attrs) {
1393        #[cfg(debug_assertions)]
1394        {
1395            use std::collections::HashSet;
1396            use std::sync::OnceLock;
1397
1398            use hyperactor_config::attrs::AttrKeyInfo;
1399
1400            static INTROSPECT_KEYS: OnceLock<HashSet<&'static str>> = OnceLock::new();
1401            let allowed = INTROSPECT_KEYS.get_or_init(|| {
1402                inventory::iter::<AttrKeyInfo>()
1403                    .filter(|info| info.meta.get(hyperactor_config::INTROSPECT).is_some())
1404                    .map(|info| info.name)
1405                    .collect()
1406            });
1407            for (name, _) in attrs.iter() {
1408                debug_assert!(
1409                    allowed.contains(name),
1410                    "publish_attrs: key {:?} is not tagged with INTROSPECT",
1411                    name
1412                );
1413            }
1414        }
1415        self.inner.cell.set_published_attrs(attrs);
1416    }
1417
1418    /// Publish a single attr key-value pair for introspection. Merges
1419    /// into existing published attrs (insert or overwrite).
1420    ///
1421    /// Debug builds assert that the key is tagged with the
1422    /// `INTROSPECT` meta-attribute.
1423    pub fn publish_attr<T: hyperactor_config::AttrValue>(
1424        &self,
1425        key: hyperactor_config::Key<T>,
1426        value: T,
1427    ) {
1428        debug_assert!(
1429            key.attrs().get(hyperactor_config::INTROSPECT).is_some(),
1430            "publish_attr called with non-introspection key: {}",
1431            key.name()
1432        );
1433        self.inner.cell.merge_published_attr(key, value);
1434    }
1435
1436    /// Mark this actor as system/infrastructure. System actors are
1437    /// hidden by default in the TUI (toggled via `s`).
1438    pub fn set_system(&self) {
1439        self.inner
1440            .cell
1441            .inner
1442            .is_system
1443            .store(true, Ordering::Relaxed);
1444    }
1445
    /// Register a callback for resolving non-addressable children.
    ///
    /// The callback runs on the actor's introspect task (not the
    /// actor loop), so it must be `Send + Sync` and must not access
    /// actor-mutable state. Capture cloned `Proc` references.
    ///
    /// Only `HostAgent` uses this today — for resolving system
    /// procs that have no independent `ProcAgent`.
    pub fn set_query_child_handler(
        &self,
        handler: impl (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync + 'static,
    ) {
        // Stored on the cell so the introspect task can reach it.
        self.inner.cell.set_query_child_handler(handler);
    }
1460
1461    /// Signal the actor to stop.
1462    pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
1463        tracing::info!(
1464            actor_id = %self.inner.cell.actor_id(),
1465            reason,
1466            "Instance::stop called",
1467        );
1468        self.inner
1469            .cell
1470            .signal(Signal::DrainAndStop(reason.to_string()))
1471    }
1472
1473    /// Signal the actor to abort with a provided reason.
1474    pub fn abort(&self, reason: &str) -> Result<(), ActorError> {
1475        tracing::info!(
1476            actor_id = %self.inner.cell.actor_id(),
1477            reason,
1478            "Instance::abort called",
1479        );
1480        self.inner.cell.signal(Signal::Abort(reason.to_string()))
1481    }
1482
    /// Open a new port that accepts M-typed messages. The returned
    /// port may be freely cloned, serialized, and passed around. The
    /// returned receiver should only be retained by the actor responsible
    /// for processing the delivered messages.
    ///
    /// Thin delegation to the instance's mailbox.
    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
        self.inner.mailbox.open_port()
    }
1490
    /// Open a new one-shot port that accepts M-typed messages. The
    /// returned port may be used to send a single message; ditto the
    /// receiver may receive a single message.
    ///
    /// Thin delegation to the instance's mailbox.
    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
        self.inner.mailbox.open_once_port()
    }
1497
    /// Get the per-instance local storage.
    ///
    /// Storage is scoped to this instance; it is initialized empty in
    /// `Instance::new`.
    pub fn locals(&self) -> &ActorLocalStorage {
        &self.inner.instance_locals
    }
1502
    /// Send a message to the actor running on the proc.
    ///
    /// Assigns a fresh sequence number (`SeqInfoPolicy::AssignNew`);
    /// contrast with [`Instance::post_with_external_seq_info`].
    pub fn post(&self, port_id: reference::PortId, headers: Flattrs, message: wirevalue::Any) {
        <Self as context::MailboxExt>::post(
            self,
            port_id,
            headers,
            message,
            true,
            context::SeqInfoPolicy::AssignNew,
        )
    }
1514
    /// Post a message with pre-set SEQ_INFO. Only for internal use by CommActor.
    ///
    /// # Warning
    /// This method bypasses the SEQ_INFO assertion. Do not use unless you are
    /// implementing mesh-level message routing (CommActor).
    ///
    /// Identical to [`Instance::post`] except for the sequencing
    /// policy (`AllowExternal` instead of `AssignNew`).
    #[doc(hidden)]
    pub fn post_with_external_seq_info(
        &self,
        port_id: reference::PortId,
        headers: Flattrs,
        message: wirevalue::Any,
    ) {
        <Self as context::MailboxExt>::post(
            self,
            port_id,
            headers,
            message,
            true,
            context::SeqInfoPolicy::AllowExternal,
        )
    }
1536
1537    /// Return a static client instance that can be used to send
1538    /// messages to port handles from outside an actor context
1539    /// (e.g. from background tokio tasks).
1540    // TODO: replace with a proper mechanism for sending to port
1541    // handles without an actor context.
1542    pub fn self_client() -> &'static Instance<()> {
1543        static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1544        &CLIENT
1545            .get_or_init(|| Proc::runtime().instance("self_message_client").unwrap())
1546            .0
1547    }
1548
1549    /// Send a message to the actor itself with a delay usually to trigger some event.
1550    pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1551    where
1552        M: Message,
1553        A: Handler<M>,
1554    {
1555        let client = Self::self_client();
1556        let port = self.port();
1557        let self_id = self.self_id().clone();
1558        tokio::spawn(async move {
1559            tokio::time::sleep(delay).await;
1560            if let Err(e) = port.send(&client, message) {
1561                // TODO: this is a fire-n-forget thread. We need to
1562                // handle errors in a better way.
1563                tracing::info!("{}: error sending delayed message: {}", self_id, e);
1564            }
1565        });
1566        Ok(())
1567    }
1568
    /// Start an A-typed actor onto this instance with the provided params. When spawn returns,
    /// the actor has been linked with its parent, if it has one.
    fn start(self, actor: A, receivers: InstanceReceivers<A>) -> ActorHandle<A> {
        // `self` moves into serve() below; clone the cell/id first for
        // post-spawn bookkeeping.
        let instance_cell = self.inner.cell.clone();
        let actor_id = self.inner.cell.actor_id().clone();
        let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());

        // Spawn the introspect task — a separate tokio task that
        // reads InstanceCell directly and replies via the actor's
        // Mailbox. The actor loop never sees IntrospectMessage.
        tokio::spawn(crate::introspect::serve_introspect(
            self.inner.cell.clone(),
            self.inner.mailbox.clone(),
            receivers.introspect,
        ));

        // Non-detached by construction: start() is only reached for
        // instances that run an actor loop.
        let actor_loop_receivers = receivers
            .actor_loop
            .expect("non-detached instance must have actor loop receivers");
        let actor_task_handle = A::spawn_server_task(
            panic_handler::with_backtrace_tracking(self.serve(
                actor,
                actor_loop_receivers,
                receivers.work,
            ))
            .instrument(Span::current()),
        );
        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
        // Record the task handle on the cell so it can later be
        // aborted (e.g. by destroy_and_wait).
        instance_cell
            .inner
            .actor_task_handle
            .set(actor_task_handle)
            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));

        actor_handle
    }
1605
    /// Drive the actor to completion, then publish its terminal state.
    ///
    /// Runs the supervision tree via `run_actor_tree`, converts the
    /// result into a terminal [`ActorStatus`], closes the mailbox,
    /// records the supervision event on the cell (FI-1 ordering), and
    /// finally notifies the parent — or, for root/orphaned actors, the
    /// proc.
    async fn serve(
        mut self,
        mut actor: A,
        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        let result = self
            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
            .await;

        // run_actor_tree always moves the actor to Stopping before
        // returning.
        assert!(self.is_stopping());
        let event = match result {
            // Clean stop: terminal status is Stopped(reason).
            Ok(stop_reason) => {
                let status = ActorStatus::Stopped(stop_reason);
                self.mailbox().close(status.clone());
                let event = ActorSupervisionEvent::new(
                    self.inner.cell.actor_id().clone(),
                    actor.display_name(),
                    status.clone(),
                    None,
                );
                // FI-1: store supervision_event BEFORE change_status.
                *self.inner.cell.inner.supervision_event.lock().unwrap() = Some(event.clone());
                self.change_status(status);
                Some(event)
            }
            Err(err) => {
                match *err.kind {
                    // Unhandled supervision event: adopt the event's
                    // (already terminal) status as our own.
                    ActorErrorKind::UnhandledSupervisionEvent(box event) => {
                        // We use the event's actor_status as this actor's terminal status.
                        assert!(event.actor_status.is_terminal());
                        self.mailbox().close(event.actor_status.clone());
                        // FI-1: store supervision_event BEFORE change_status.
                        *self.inner.cell.inner.supervision_event.lock().unwrap() =
                            Some(event.clone());
                        self.change_status(event.actor_status.clone());
                        Some(event)
                    }
                    // Any other failure: terminal status is Failed,
                    // carrying the stringified error kind.
                    _ => {
                        let error_kind = ActorErrorKind::Generic(err.kind.to_string());
                        let status = ActorStatus::Failed(error_kind);
                        self.mailbox().close(status.clone());
                        let event = ActorSupervisionEvent::new(
                            self.inner.cell.actor_id().clone(),
                            actor.display_name(),
                            status.clone(),
                            None,
                        );
                        // FI-1: store supervision_event BEFORE change_status.
                        *self.inner.cell.inner.supervision_event.lock().unwrap() =
                            Some(event.clone());
                        self.change_status(status);
                        Some(event)
                    }
                }
            }
        };

        if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
            if let Some(event) = event {
                // Parent exists, failure should be propagated to the parent.
                parent.send_supervision_event_or_crash(&self, event);
            }
            // TODO: we should get rid of this signal, and use *only* supervision events for
            // the purpose of conveying lifecycle changes
            if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) {
                tracing::error!(
                    "{}: failed to send stop message to parent pid {}: {:?}",
                    self.self_id(),
                    parent.pid(),
                    err
                );
            }
        } else {
            // Failure happened to the root actor or orphaned child actors.
            // In either case, the failure should be propagated to proc.
            //
            // Note that orphaned actor is unexpected and would only happen if
            // there is a bug.
            if let Some(event) = event {
                self.inner
                    .proc
                    .handle_unhandled_supervision_event(&self, event);
            }
        }
    }
1692
    /// Runs the actor, and manages its supervision tree. When the function returns,
    /// the whole tree rooted at this actor has stopped. On success, returns the reason
    /// why the actor stopped. On failure, returns the error that caused the failure.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<String, ActorError> {
        // It is okay to catch all panics here, because we are in a tokio task,
        // and tokio will catch the panic anyway:
        // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic
        // What we do here is just to catch it early so we can handle it.

        let mut did_panic = false;
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                // The actor panicked: convert the recorded panic info
                // (if any) into an ActorError.
                did_panic = true;
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        // The actor loop has exited; enter the Stopping state (we must
        // not already be terminal).
        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_id(), err);
        }

        // After this point, we know we won't spawn any more children,
        // so we can safely read the current child keys.
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child
                .value()
                .signal(Signal::Stop("parent stopping".to_string()))
            {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_id(),
                    child.key(),
                    err
                );
                to_unlink.push(child.value().clone());
            }
        }
        // Manually unlink children that have already been stopped.
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        // Wait for remaining children to report ChildStopped; if a
        // child takes longer than 500ms, give up and unlink everything.
        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            match tokio::time::timeout(Duration::from_millis(500), signal_receiver.recv()).await {
                Ok(signal) => {
                    if let Signal::ChildStopped(pid) = signal? {
                        // A child unlinks itself before signaling, so it
                        // must already be gone from the child map.
                        assert!(self.inner.cell.get_child(pid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_id()
                    );
                    // No more waiting to receive messages. Unlink all remaining
                    // children.
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        // Run the actor cleanup function before the actor stops to delete
        // resources. If it times out, continue with stopping the actor.
        // Don't call it if there was a panic, because the actor may
        // be in an invalid state and unable to access anything, for example
        // the GIL.
        let cleanup_result = if !did_panic {
            let cleanup_timeout = hyperactor_config::global::get(config::CLEANUP_TIMEOUT);
            match tokio::time::timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                Ok(Err(e)) => Err(ActorError::new(self.self_id(), ActorErrorKind::cleanup(e))),
                Err(e) => Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        if let Err(ref actor_err) = result {
            // The original result error takes precedence over the cleanup error,
            // so make sure the cleanup error is still logged in that case.
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        // If the original exit was not an error, let cleanup errors be
        // surfaced.
        result.and_then(|reason| cleanup_result.map(|_| reason))
    }
1808
    /// Initialize and run the actor until it fails or is stopped. On success,
    /// returns the reason why the actor stopped. On failure, returns the error
    /// that caused the failure.
    async fn run(
        &mut self,
        actor: &mut A,
        actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<String, ActorError> {
        let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;

        // Initialize the actor before entering the message loop.
        self.change_status(ActorStatus::Initializing);
        actor
            .init(self)
            .await
            .map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
        let need_drain;
        let stop_reason;
        'messages: loop {
            self.change_status(ActorStatus::Idle);
            let metric_pairs =
                hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
            tokio::select! {
                // The next queued unit of work (message) for the actor.
                work = work_rx.recv() => {
                    ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
                    ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
                    let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
                    let work = work.expect("inconsistent work queue state");
                    if let Err(err) = work.handle(actor, self).await {
                        // Before failing, give the actor a chance to
                        // absorb any already-queued supervision events.
                        for supervision_event in supervision_event_receiver.drain() {
                            self.handle_supervision_event(actor, supervision_event).await?;
                        }
                        let kind = ActorErrorKind::processing(err);
                        return Err(ActorError {
                            actor_id: Box::new(self.self_id().clone()),
                            kind: Box::new(kind),
                        });
                    }
                }
                // Lifecycle control signals.
                signal = signal_receiver.recv() => {
                    let signal = signal.map_err(ActorError::from);
                    tracing::debug!("Received signal {signal:?}");
                    match signal? {
                        Signal::Stop(reason) => {
                            need_drain = false;
                            stop_reason = reason;
                            break 'messages;
                        },
                        Signal::DrainAndStop(reason) => {
                            need_drain = true;
                            stop_reason = reason;
                            break 'messages;
                        },
                        Signal::ChildStopped(pid) => {
                            // The child unlinks itself before signaling.
                            assert!(self.inner.cell.get_child(pid).is_none());
                        },
                        Signal::Abort(reason) => {
                            return Err(ActorError { actor_id: Box::new(self.self_id().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) });
                        }
                    }
                }
                // Supervision events delivered to this actor.
                Ok(supervision_event) = supervision_event_receiver.recv() => {
                    self.handle_supervision_event(actor, supervision_event).await?;
                }
            }
            self.inner
                .cell
                .inner
                .num_processed_messages
                .fetch_add(1, Ordering::SeqCst);
        }

        // DrainAndStop: process whatever is already queued before
        // returning; any handler failure during the drain still fails
        // the actor.
        if need_drain {
            let mut n = 0;
            while let Ok(work) = work_rx.try_recv() {
                if let Err(err) = work.handle(actor, self).await {
                    return Err(ActorError::new(
                        self.self_id(),
                        ActorErrorKind::processing(err),
                    ));
                }
                n += 1;
            }
            tracing::debug!("drained {} messages", n);
        }
        tracing::debug!(
            actor_id = %self.self_id(),
            reason = stop_reason,
            "exited actor loop",
        );
        Ok(stop_reason)
    }
1901
1902    /// Handle a supervision event using the provided actor.
1903    pub async fn handle_supervision_event(
1904        &self,
1905        actor: &mut A,
1906        supervision_event: ActorSupervisionEvent,
1907    ) -> Result<(), ActorError> {
1908        // Handle the supervision event with the current actor.
1909        match actor
1910            .handle_supervision_event(self, &supervision_event)
1911            .await
1912        {
1913            Ok(true) => {
1914                // The supervision event was handled by this actor, nothing more to do.
1915                Ok(())
1916            }
1917            Ok(false) => {
1918                let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1919                Err(ActorError::new(self.self_id(), kind))
1920            }
1921            Err(err) => {
1922                // The actor failed to handle the supervision event, it should die.
1923                // Create a new supervision event for this failure and propagate it.
1924                let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1925                    err.to_string(),
1926                    Box::new(supervision_event),
1927                );
1928                Err(ActorError::new(self.self_id(), kind))
1929            }
1930        }
1931    }
1932
    /// Dispatch a single `M`-typed message to the actor's handler,
    /// deriving handler/endpoint metadata from `type_info` when
    /// available.
    ///
    /// # Safety
    ///
    /// If `type_info` is `Some`, it must be the [`TypeInfo`] matching
    /// `M`: the message is inspected through a raw pointer with no
    /// dynamic type check.
    async unsafe fn handle_message<M: Message>(
        &self,
        actor: &mut A,
        type_info: Option<&'static TypeInfo>,
        headers: Flattrs,
        message: M,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        // Build HandlerInfo from TypeInfo (zero-copy) or fall back to type_name.
        let handler_info = match type_info {
            Some(info) => {
                // SAFETY: The caller promises to pass the correct type info.
                let arm = unsafe { info.arm_unchecked(&message as *const M as *const ()) };
                HandlerInfo::from_static(info.typename(), arm)
            }
            None => {
                // Fall back to std::any::type_name (also static, zero-copy).
                HandlerInfo::from_static(std::any::type_name::<M>(), None)
            }
        };

        let endpoint = type_info.and_then(|info| {
            // SAFETY: The caller promises to pass the correct type info.
            unsafe { info.endpoint_name(&message as *const M as *const ()) }
        });

        // Use a helper function for a better instrument log.
        self.handle_message_with_handler_info(actor, handler_info, headers, message, endpoint)
            .await
    }
1965
    /// Invoke the actor's handler for `message`, recording status,
    /// timing, and (when a telemetry message id is present in the
    /// headers) message lifecycle events around the call.
    // Skip serializing all fields except HandlerInfo which includes the typename.
    #[tracing::instrument(level = "debug", name = "handle_message", skip_all, fields(actor_id = %self.self_id(), message_type = %handler_info))]
    async fn handle_message_with_handler_info<M: Message>(
        &self,
        actor: &mut A,
        handler_info: HandlerInfo,
        headers: Flattrs,
        message: M,
        endpoint: Option<String>,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        let now = std::time::SystemTime::now();
        let handler_info = Some(handler_info);
        self.change_status(ActorStatus::Processing(now, handler_info.clone()));
        crate::mailbox::headers::log_message_latency_if_sampling(
            &headers,
            self.self_id().to_string(),
        );

        let message_id = headers.get(crate::mailbox::headers::TELEMETRY_MESSAGE_ID);

        // Emit "message received" + "active" telemetry events, keyed by
        // the message id carried in the headers (if any).
        if let Some(message_id) = message_id {
            let from_actor_id = headers
                .get(crate::mailbox::headers::SENDER_ACTOR_ID_HASH)
                .unwrap_or(0);
            let to_actor_id = hash_to_u64(self.self_id());
            let port_id = headers.get(crate::mailbox::headers::TELEMETRY_PORT_ID);

            notify_message(hyperactor_telemetry::MessageEvent {
                timestamp: now,
                id: message_id,
                from_actor_id,
                to_actor_id,
                endpoint,
                port_id,
            });

            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
                timestamp: now,
                id: hyperactor_telemetry::generate_status_event_id(message_id),
                message_id,
                status: "active".to_string(),
            });
        }

        // Record the message handler being invoked.
        *self.inner.cell.inner.last_message_handler.write().unwrap() = handler_info;

        let context = Context::new(self, headers);
        // Pass a reference to the context to the handler, so that deref
        // coercion allows the `this` argument to be treated exactly like
        // &Instance<A>.
        let start = Instant::now();
        let result = actor
            .handle(&context, message)
            .instrument(self.inner.cell.inner.recording.span())
            .await;
        // Accumulate wall-clock handler time on the cell for
        // introspection.
        let elapsed_us = start.elapsed().as_micros() as u64;
        self.inner
            .cell
            .inner
            .total_processing_time_us
            .fetch_add(elapsed_us, Ordering::SeqCst);

        // Mark the message complete, whether or not the handler failed.
        if let Some(message_id) = message_id {
            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
                timestamp: std::time::SystemTime::now(),
                id: hyperactor_telemetry::generate_status_event_id(message_id),
                message_id,
                status: "complete".to_string(),
            });
        }

        result
    }
2043
    /// Spawn a child actor on this instance. The child is linked to
    /// this instance's cell as its parent.
    pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
        self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
    }
2048
    /// Spawn a named child actor on this instance. The child gets a
    /// descriptive name in its ActorId instead of inheriting this
    /// instance's name. Supervision linkage is preserved.
    ///
    /// See AI-1 and AI-3 in the module docs for the pid/name
    /// invariants that apply to named children.
    pub fn spawn_with_name<C: Actor>(
        &self,
        name: &str,
        actor: C,
    ) -> anyhow::Result<ActorHandle<C>> {
        self.inner
            .proc
            .spawn_named_child(self.inner.cell.clone(), name, actor)
    }
2061
    /// Create a new direct child instance, parented to this
    /// instance's cell. Returns the child instance and its handle.
    pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
        self.inner.proc.child_instance(self.inner.cell.clone())
    }
2066
    /// Return a port handle representing the actor's message
    /// handler for M-typed messages.
    pub fn port<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        self.inner.ports.get()
    }
2075
    /// The [`ActorHandle`] corresponding to this instance.
    ///
    /// Cheap to call: both the cell and the ports are Arc-backed.
    pub fn handle(&self) -> ActorHandle<A> {
        ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
    }
2080
    /// The owning actor ref: binds this instance's ports through the
    /// cell, producing an `ActorRef<R>`.
    pub fn bind<R: Binds<A>>(&self) -> reference::ActorRef<R> {
        self.inner.cell.bind(self.inner.ports.as_ref())
    }
2085
    /// Direct access to this instance's [`Mailbox`] for the Python
    /// bindings.
    // Temporary in order to support python bindings.
    #[doc(hidden)]
    pub fn mailbox_for_py(&self) -> &Mailbox {
        &self.inner.mailbox
    }
2091
    /// The owning proc, i.e. the [`Proc`] this instance runs in.
    pub fn proc(&self) -> &Proc {
        &self.inner.proc
    }
2096
    /// Clone this Instance to get an owned struct that can be
    /// plumbed through python. This should really only be called
    /// for the explicit purpose of being passed into python
    #[doc(hidden)]
    pub fn clone_for_py(&self) -> Self {
        // Both copies alias the same inner state (Arc clone).
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
2106
    /// Get the join handle associated with this actor.
    ///
    /// `None` until an actor task has been started for this instance
    /// (the handle is stored by `start`).
    fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
        self.inner.cell.inner.actor_task_handle.get()
    }
2111
    /// Return a reference to this instance's [`Sequencer`].
    pub fn sequencer(&self) -> &Sequencer {
        &self.inner.sequencer
    }
2116
    /// Return this instance's ID: a per-instance [`Uuid`], distinct
    /// from the actor's `ActorId`.
    pub fn instance_id(&self) -> Uuid {
        self.inner.id
    }
2121
2122    /// Return a handle to this instance's parent actor, if it has one.
2123    pub fn parent_handle<P: Actor>(&self) -> Option<ActorHandle<P>> {
2124        let parent_cell = self.inner.cell.inner.parent.upgrade()?;
2125        let ports = if let Ok(ports) = parent_cell.inner.ports.clone().downcast() {
2126            ports
2127        } else {
2128            return None;
2129        };
2130        Some(ActorHandle::new(parent_cell, ports))
2131    }
2132}
2133
// An owned instance can be used directly as a mailbox context.
impl<A: Actor> context::Mailbox for Instance<A> {
    fn mailbox(&self) -> &Mailbox {
        &self.inner.mailbox
    }
}
2139
// A context delegates to its instance's mailbox.
impl<A: Actor> context::Mailbox for Context<'_, A> {
    fn mailbox(&self) -> &Mailbox {
        &self.instance.inner.mailbox
    }
}
2145
// Convenience: `&Instance<A>` also satisfies the mailbox context.
impl<A: Actor> context::Mailbox for &Instance<A> {
    fn mailbox(&self) -> &Mailbox {
        &self.inner.mailbox
    }
}
2151
// Convenience: `&Context<'_, A>` also satisfies the mailbox context.
impl<A: Actor> context::Mailbox for &Context<'_, A> {
    fn mailbox(&self) -> &Mailbox {
        &self.instance.inner.mailbox
    }
}
2157
// An instance is its own actor context.
impl<A: Actor> context::Actor for Instance<A> {
    type A = A;
    fn instance(&self) -> &Instance<A> {
        self
    }
}
2164
// A context resolves to its underlying instance (via deref coercion).
impl<A: Actor> context::Actor for Context<'_, A> {
    type A = A;
    fn instance(&self) -> &Instance<A> {
        self
    }
}
2171
// `&Instance<A>` delegates to the instance itself.
impl<A: Actor> context::Actor for &Instance<A> {
    type A = A;
    fn instance(&self) -> &Instance<A> {
        self
    }
}
2178
// `&Context<'_, A>` likewise resolves to the underlying instance.
impl<A: Actor> context::Actor for &Context<'_, A> {
    type A = A;
    fn instance(&self) -> &Instance<A> {
        self
    }
}
2185
impl Instance<()> {
    /// See [Mailbox::bind_actor_port] for details.
    ///
    /// # Panics
    ///
    /// Panics if this instance has a running actor task.
    pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
        assert!(
            self.actor_task_handle().is_none(),
            "can only bind actor port on instance with no running actor task"
        );
        self.inner.mailbox.bind_actor_port()
    }
}
2196
/// How an actor's type is known: via registered static [`TypeInfo`],
/// or only by a type-name string.
#[derive(Debug)]
enum ActorType {
    // Full static type information is available.
    Named(&'static TypeInfo),
    // Only the type's name is known.
    Anonymous(&'static str),
}
2202
2203impl ActorType {
2204    fn type_name(&self) -> &str {
2205        match self {
2206            ActorType::Named(info) => info.typename(),
2207            ActorType::Anonymous(name) => name,
2208        }
2209    }
2210}
2211
/// InstanceCell contains all of the type-erased, shareable state of an instance.
/// Specifically, InstanceCells form a supervision tree, and is used by ActorHandle
/// to access the underlying instance.
///
/// InstanceCell is reference counted and cloneable.
#[derive(Clone)]
pub struct InstanceCell {
    // Shared state; cloning a cell only bumps this Arc's refcount.
    inner: Arc<InstanceCellState>,
}
2221
// Debug output includes only the actor's id and type; the rest of the
// cell state (children, status, ports, ...) is elided.
impl fmt::Debug for InstanceCell {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("InstanceCell")
            .field("actor_id", &self.inner.actor_id)
            .field("actor_type", &self.inner.actor_type)
            .finish()
    }
}
2230
/// The shared state behind an [`InstanceCell`]. There is one per actor
/// instance; it is reachable from every clone of the cell and from the
/// parent's `children` map.
struct InstanceCellState {
    /// The actor's id.
    actor_id: reference::ActorId,

    /// Actor info contains the actor's type information.
    actor_type: ActorType,

    /// The proc in which the actor is running.
    proc: Proc,

    /// Control port handles to the actor loop, if one is running.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// An observer that stores the current status of the actor.
    status: watch::Receiver<ActorStatus>,

    /// A weak reference to this instance's parent.
    parent: WeakInstanceCell,

    /// This instance's children by their PIDs.
    children: DashMap<reference::Index, InstanceCell>,

    /// Access to the spawned actor's join handle.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// The set of named ports that are exported by this actor.
    exported_named_ports: DashMap<u64, &'static str>,

    /// The number of messages processed by this actor.
    num_processed_messages: AtomicU64,

    /// When this actor was created.
    created_at: SystemTime,

    /// Name of the last message handler invoked.
    last_message_handler: RwLock<Option<HandlerInfo>>,

    /// Total time spent processing messages, in microseconds.
    total_processing_time_us: AtomicU64,

    /// The log recording associated with this actor. It is used to
    /// store a 'flight record' of events while the actor is running.
    recording: Recording,

    /// Attrs-based introspection data published by the actor. Written
    /// by the actor via `Instance::publish_attrs()` /
    /// `Instance::publish_attr()`, and read by the introspection
    /// runtime handler when building node payloads.
    ///
    /// This bag may contain both mesh-level keys (`node_type`,
    /// `addr`, `num_procs`, ...) and actor-runtime keys (`status`,
    /// `messages_processed`, ...).
    published_attrs: RwLock<Option<hyperactor_config::Attrs>>,

    /// Optional callback for resolving non-addressable children
    /// (e.g., system procs). Registered by infrastructure actors
    /// like `HostAgent` in `Actor::init`. Invoked by the
    /// introspection runtime handler for `QueryChild` messages.
    /// `None` means `QueryChild` returns a "not_found" error.
    ///
    /// See S7 in `introspect` module doc.
    query_child_handler: RwLock<
        Option<Box<dyn (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync>>,
    >,

    /// The supervision event for this actor's failure, if any.
    /// See FI-1, FI-2 in `introspect` module doc.
    supervision_event: std::sync::Mutex<Option<crate::supervision::ActorSupervisionEvent>>,

    /// Whether this actor is infrastructure/system (hidden by default
    /// in the TUI `s` toggle). Set by spawning code via
    /// `Instance::set_system()`.
    is_system: AtomicBool,

    /// A type-erased reference to Ports<A>, which allows us to recover
    /// an ActorHandle<A> by downcasting.
    ports: Arc<dyn Any + Send + Sync>,
}
2309
2310impl InstanceCellState {
2311    /// Unlink this instance from its parent, if it has one. If it was unlinked,
2312    /// the parent is returned.
2313    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
2314        self.parent
2315            .upgrade()
2316            .filter(|parent| parent.inner.unlink(self))
2317    }
2318
2319    /// Unlink this instance from a child.
2320    fn unlink(&self, child: &InstanceCellState) -> bool {
2321        assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
2322        self.children.remove(&child.actor_id.pid()).is_some()
2323    }
2324}
2325
/// Select which terminated snapshots to evict when the retention cap
/// is exceeded.
///
/// Each entry is `(actor_id, Option<occurred_at>)` where `Some` means
/// the actor has `failure_info` (i.e. it failed), and `None` means a
/// clean stop.
///
/// Eviction priority:
/// 1. Cleanly-stopped actors are evicted first (arbitrary order).
/// 2. If more evictions are needed, failed actors are evicted
///    newest-first (descending `occurred_at`), preserving the
///    earliest failures which are closest to the root cause.
///
/// Generic over the id type (in practice `reference::ActorId`) so the
/// selection policy can be exercised in isolation.
fn select_eviction_candidates<Id: Clone>(
    entries: &[(Id, Option<String>)],
    excess: usize,
) -> Vec<Id> {
    // Partition entries into cleanly-stopped ids and failed ids
    // (with their timestamps).
    let mut clean: Vec<&Id> = Vec::new();
    let mut failed: Vec<(&Id, &str)> = Vec::new();
    for (id, occurred_at) in entries {
        match occurred_at {
            Some(ts) => failed.push((id, ts.as_str())),
            None => clean.push(id),
        }
    }

    // Evict cleanly-stopped first.
    let mut to_remove: Vec<Id> = clean.into_iter().take(excess).cloned().collect();

    // If still over cap, evict most-recent failures first. Timestamps
    // are compared as strings; assumes a lexically-ordered format
    // (e.g. ISO-8601) — TODO confirm against the snapshot writer.
    let remaining = excess - to_remove.len();
    if remaining > 0 {
        failed.sort_by(|a, b| b.1.cmp(a.1));
        to_remove.extend(failed.into_iter().take(remaining).map(|(id, _)| id.clone()));
    }

    to_remove
}
2373
impl InstanceCell {
    /// Creates a new instance cell with the provided internal state. If a parent
    /// is provided, it is linked to this cell. The cell is also registered
    /// (weakly) in the proc's instance table; the matching removal happens
    /// in `InstanceCellState::drop`.
    fn new(
        actor_id: reference::ActorId,
        actor_type: ActorType,
        proc: Proc,
        actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
        status: watch::Receiver<ActorStatus>,
        parent: Option<InstanceCell>,
        ports: Arc<dyn Any + Send + Sync>,
    ) -> Self {
        // NOTE(review): `_ais` is computed but never used — presumably
        // leftover debugging; confirm before removing.
        let _ais = actor_id.to_string();
        let cell = Self {
            inner: Arc::new(InstanceCellState {
                actor_id: actor_id.clone(),
                actor_type,
                proc: proc.clone(),
                actor_loop,
                status,
                // A missing parent is modeled as a never-upgradeable weak cell.
                parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
                children: DashMap::new(),
                actor_task_handle: OnceLock::new(),
                exported_named_ports: DashMap::new(),
                num_processed_messages: AtomicU64::new(0),
                created_at: std::time::SystemTime::now(),
                last_message_handler: RwLock::new(None),
                total_processing_time_us: AtomicU64::new(0),
                recording: hyperactor_telemetry::recorder().record(64),
                published_attrs: RwLock::new(None),
                query_child_handler: RwLock::new(None),
                supervision_event: std::sync::Mutex::new(None),
                is_system: AtomicBool::new(false),
                ports,
            }),
        };
        cell.maybe_link_parent();
        proc.inner
            .instances
            .insert(actor_id.clone(), cell.downgrade());
        cell
    }

    /// Wrap an already-shared state into a (strong) cell handle.
    fn wrap(inner: Arc<InstanceCellState>) -> Self {
        Self { inner }
    }

    /// The actor's ID.
    pub fn actor_id(&self) -> &reference::ActorId {
        &self.inner.actor_id
    }

    /// The actor's PID.
    pub(crate) fn pid(&self) -> reference::Index {
        self.inner.actor_id.pid()
    }

    /// The actor's join handle. `None` until the actor task has been
    /// registered into the `OnceLock`.
    #[allow(dead_code)]
    pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
        self.inner.actor_task_handle.get()
    }

    /// The instance's status observer.
    pub fn status(&self) -> &watch::Receiver<ActorStatus> {
        &self.inner.status
    }

    /// The supervision event stored when this actor failed.
    /// `None` for actors that stopped cleanly or are still running.
    pub fn supervision_event(&self) -> Option<crate::supervision::ActorSupervisionEvent> {
        self.inner.supervision_event.lock().unwrap().clone()
    }

    /// Send a signal to the actor. Detached actors (no actor loop) cannot
    /// receive signals; the signal is dropped after logging a warning.
    pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
        if let Some((signal_port, _)) = &self.inner.actor_loop {
            // A global signal client is used to send signals to the actor.
            // It is created lazily on first use and lives for the rest of
            // the process.
            static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
            let client = &CLIENT
                .get_or_init(|| Proc::runtime().instance("global_signal_client").unwrap())
                .0;
            signal_port.send(&client, signal).map_err(ActorError::from)
        } else {
            tracing::warn!(
                "{}: attempted to send signal {} to detached actor",
                self.inner.actor_id,
                signal
            );
            Ok(())
        }
    }

    /// Used by this actor's children to send a supervision event to this actor.
    /// When it fails to send, we will crash the process. As part of the crash,
    /// all the procs and actors running on this process will be terminated
    /// forcefully.
    ///
    /// Note that "let it crash" is the default behavior when a supervision event
    /// cannot be delivered upstream. It is the upstream's responsibility to
    /// detect and handle crashes.
    pub fn send_supervision_event_or_crash(
        &self,
        child_cx: &impl context::Actor, // context of the child who sends the event.
        event: ActorSupervisionEvent,
    ) {
        match &self.inner.actor_loop {
            Some((_, supervision_port)) => {
                if let Err(err) = supervision_port.send(child_cx, event.clone()) {
                    if !event.is_error() {
                        // Normal lifecycle events (e.g. clean stop) that fail to
                        // send are silently dropped. This happens when a child
                        // stops after the parent's mailbox has been closed or its
                        // supervision port receiver has been dropped (e.g. client
                        // instances created via Proc::instance()).
                        tracing::debug!(
                            "{}: dropping non-error supervision event {}: {:?}",
                            self.actor_id(),
                            event,
                            err
                        );
                        return;
                    }
                    // Undeliverable *error* event: terminate the whole process.
                    tracing::error!(
                        "{}: failed to send supervision event to actor: {:?}. Crash the process.",
                        self.actor_id(),
                        err
                    );
                    std::process::exit(1);
                }
            }
            None => {
                // Detached actor: there is no supervision port at all.
                if !event.is_error() {
                    tracing::debug!(
                        "{}: dropping non-error supervision event {} to detached actor",
                        self.actor_id(),
                        event,
                    );
                    return;
                }
                tracing::error!(
                    "{}: failed: {}: cannot send supervision event to detached actor: crashing",
                    self.actor_id(),
                    event,
                );
                std::process::exit(1);
            }
        }
    }

    /// Downgrade this InstanceCell to a weak reference.
    pub fn downgrade(&self) -> WeakInstanceCell {
        WeakInstanceCell {
            inner: Arc::downgrade(&self.inner),
        }
    }

    /// Link this instance to a new child. Parent and child must belong to
    /// the same proc.
    fn link(&self, child: InstanceCell) {
        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
        self.inner.children.insert(child.pid(), child);
    }

    /// Unlink this instance from a child.
    fn unlink(&self, child: &InstanceCell) {
        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
        self.inner.children.remove(&child.pid());
    }

    /// Unlink this instance from all children.
    fn unlink_all(&self) {
        self.inner.children.clear();
    }

    /// Link this instance to its parent, if it has one.
    fn maybe_link_parent(&self) {
        if let Some(parent) = self.inner.parent.upgrade() {
            parent.link(self.clone());
        }
    }

    /// Unlink this instance from its parent, if it has one. If it was unlinked,
    /// the parent is returned.
    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
        self.inner.maybe_unlink_parent()
    }

    /// Return an iterator over this instance's children. This may deadlock if the
    /// caller already holds a reference to any item in map.
    fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, reference::Index, InstanceCell>> {
        self.inner.children.iter()
    }

    /// The number of children this instance has.
    pub fn child_count(&self) -> usize {
        self.inner.children.len()
    }

    /// Returns the ActorIds of this instance's direct children.
    pub fn child_actor_ids(&self) -> Vec<reference::ActorId> {
        self.inner
            .children
            .iter()
            .map(|entry| entry.value().actor_id().clone())
            .collect()
    }

    /// Get a child by its PID.
    fn get_child(&self, pid: reference::Index) -> Option<InstanceCell> {
        self.inner.children.get(&pid).map(|child| child.clone())
    }

    /// Access the flight recorder for this actor.
    pub fn recording(&self) -> &Recording {
        &self.inner.recording
    }

    /// When this actor was created.
    pub fn created_at(&self) -> SystemTime {
        self.inner.created_at
    }

    /// The number of messages processed by this actor.
    pub fn num_processed_messages(&self) -> u64 {
        self.inner.num_processed_messages.load(Ordering::SeqCst)
    }

    /// The last message handler invoked by this actor.
    pub fn last_message_handler(&self) -> Option<HandlerInfo> {
        self.inner.last_message_handler.read().unwrap().clone()
    }

    /// Total time spent processing messages, in microseconds.
    pub fn total_processing_time_us(&self) -> u64 {
        self.inner.total_processing_time_us.load(Ordering::SeqCst)
    }

    /// Get parent instance cell, if it exists.
    pub fn parent(&self) -> Option<InstanceCell> {
        self.inner.parent.upgrade()
    }

    /// The actor's type name.
    pub fn actor_type_name(&self) -> &str {
        self.inner.actor_type.type_name()
    }

    /// Replace the published introspection attrs with a new bag.
    pub fn set_published_attrs(&self, attrs: hyperactor_config::Attrs) {
        *self.inner.published_attrs.write().unwrap() = Some(attrs);
    }

    /// Set a single introspection attr, merging into the existing bag
    /// (or creating one if none exists).
    pub fn merge_published_attr<T: hyperactor_config::AttrValue>(
        &self,
        key: hyperactor_config::Key<T>,
        value: T,
    ) {
        self.inner
            .published_attrs
            .write()
            .unwrap()
            .get_or_insert_with(hyperactor_config::Attrs::new)
            .set(key, value);
    }

    /// Read the published introspection attrs, if any.
    pub fn published_attrs(&self) -> Option<hyperactor_config::Attrs> {
        self.inner.published_attrs.read().unwrap().clone()
    }

    /// Register a callback for resolving non-addressable children
    /// via `IntrospectMessage::QueryChild`.
    ///
    /// The callback runs on the actor's introspect task (a separate
    /// tokio task, not the actor's message loop), so it must be
    /// `Send + Sync` and must not access actor-mutable state.
    /// Capture cloned `Proc` references, not `&mut self`.
    pub fn set_query_child_handler(
        &self,
        handler: impl (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync + 'static,
    ) {
        *self.inner.query_child_handler.write().unwrap() = Some(Box::new(handler));
    }

    /// Invoke the registered QueryChild handler, if any.
    pub fn query_child(&self, child_ref: &crate::reference::Reference) -> Option<IntrospectResult> {
        let guard = self.inner.query_child_handler.read().unwrap();
        guard.as_ref().map(|handler| handler(child_ref))
    }

    /// Whether this actor is infrastructure/system.
    pub fn is_system(&self) -> bool {
        self.inner.is_system.load(Ordering::Relaxed)
    }

    /// Store a post-mortem snapshot for this actor in the proc's
    /// `terminated_snapshots` map. Called by the introspect task
    /// just before exiting on terminal status.
    ///
    /// Eviction policy when the retention cap is exceeded:
    /// 1. Evict cleanly-stopped actors first (no `failure_info`).
    /// 2. When only failed actors remain, evict the most recent
    ///    (by `occurred_at`), preserving the earliest failures
    ///    which are closest to the root cause.
    pub fn store_terminated_snapshot(&self, payload: crate::introspect::IntrospectResult) {
        let snapshots = &self.inner.proc.inner.terminated_snapshots;
        // NOTE(review): the snapshot inserted here participates in the
        // eviction pass below, so a clean stop stored while at cap may be
        // evicted immediately — confirm this is intended.
        snapshots.insert(self.actor_id().clone(), payload);
        let max = hyperactor_config::global::get(crate::config::TERMINATED_SNAPSHOT_RETENTION);
        let excess = snapshots.len().saturating_sub(max);
        if excess > 0 {
            // Build entries for the eviction selector.
            let entries: Vec<_> = snapshots
                .iter()
                .map(|entry| {
                    let occurred_at =
                        serde_json::from_str::<hyperactor_config::Attrs>(&entry.value().attrs)
                            .ok()
                            .and_then(|attrs| {
                                // Presence of FAILURE_ERROR_MESSAGE means the actor failed.
                                attrs
                                    .get(crate::introspect::FAILURE_ERROR_MESSAGE)
                                    .cloned()?;
                                // Extract occurred_at timestamp for sorting.
                                attrs
                                    .get(crate::introspect::FAILURE_OCCURRED_AT)
                                    .map(|t| humantime::format_rfc3339(*t).to_string())
                            });
                    (entry.key().clone(), occurred_at)
                })
                .collect();

            for key in select_eviction_candidates(&entries, excess) {
                snapshots.remove(&key);
            }
        }
    }

    /// This is temporary so that we can share binding code between handle and instance.
    /// We should find some (better) way to consolidate the two.
    pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> reference::ActorRef<R> {
        <R as Binds<A>>::bind(ports);
        // Signal: pre-registered via open_message_port() in
        // Instance::new(), handled by the actor loop's select!.
        // Ports::bind() here reuses the existing handle.
        //
        // Undeliverable: dispatched through the work queue to the
        // actor's Handler<Undeliverable<MessageEnvelope>>.
        //
        // IntrospectMessage: pre-registered via open_message_port()
        // in Instance::new(), handled by a dedicated introspect task.
        // NOT bound here — its port is registered via
        // bind_actor_port() directly.
        ports.bind::<Signal>();
        ports.bind::<Undeliverable<MessageEnvelope>>();
        // TODO: consider sharing `ports.bound` directly.
        for entry in ports.bound.iter() {
            self.inner
                .exported_named_ports
                .insert(*entry.key(), entry.value());
        }
        reference::ActorRef::attest(self.actor_id().clone())
    }

    /// Attempt to downcast this cell to a concrete actor handle.
    /// Returns `None` when the cell's ports are not `Ports<A>`.
    pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
        let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
        Some(ActorHandle::new(self.clone(), ports))
    }

    /// Traverse the subtree rooted at this instance in pre-order.
    /// The callback receives each InstanceCell and its depth (root = 0).
    /// Children are visited in pid order for deterministic traversal.
    pub fn traverse<F>(&self, f: &mut F)
    where
        F: FnMut(&InstanceCell, usize),
    {
        self.traverse_inner(0, f);
    }

    /// Recursive worker for [`traverse`]; `depth` is this node's distance
    /// from the traversal root.
    fn traverse_inner<F>(&self, depth: usize, f: &mut F)
    where
        F: FnMut(&InstanceCell, usize),
    {
        f(self, depth);
        // Collect and sort children by pid for deterministic traversal order
        let mut children: Vec<_> = self.child_iter().map(|r| r.value().clone()).collect();
        children.sort_by_key(|c| c.pid());
        for child in children {
            child.traverse_inner(depth + 1, f);
        }
    }
}
2768
2769impl Drop for InstanceCellState {
2770    fn drop(&mut self) {
2771        if let Some(parent) = self.maybe_unlink_parent() {
2772            tracing::debug!(
2773                "instance {} was dropped with parent {} still linked",
2774                self.actor_id,
2775                parent.actor_id()
2776            );
2777        }
2778        if self.proc.inner.instances.remove(&self.actor_id).is_none() {
2779            tracing::error!("instance {} was dropped but not in proc", self.actor_id);
2780        }
2781    }
2782}
2783
/// A weak version of the InstanceCell. This is used to provide cyclical
/// linkage between actors without creating a strong reference cycle.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    /// Weak handle to the shared state; `upgrade()` fails once all strong
    /// `InstanceCell`s are gone.
    inner: Weak<InstanceCellState>,
}
2790
2791impl Default for WeakInstanceCell {
2792    fn default() -> Self {
2793        Self::new()
2794    }
2795}
2796
2797impl WeakInstanceCell {
2798    /// Create a new weak instance cell that is never upgradeable.
2799    pub fn new() -> Self {
2800        Self { inner: Weak::new() }
2801    }
2802
2803    /// Upgrade this weak instance cell to a strong reference, if possible.
2804    pub fn upgrade(&self) -> Option<InstanceCell> {
2805        self.inner.upgrade().map(InstanceCell::wrap)
2806    }
2807}
2808
/// A polymorphic dictionary that stores ports for an actor's handlers.
/// The interface memoizes the ports so that they are reused. We do not
/// (yet) support stable identifiers across multiple instances of the same
/// actor.
pub struct Ports<A: Actor> {
    // Memoized per-message-type ports, keyed by `TypeId::of::<M>()` and
    // type-erased; recovered by downcast in `get`.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    // Port indices already bound via `bind`, mapped to the bound type's
    // name for conflict detection.
    bound: DashMap<u64, &'static str>,
    // The actor's mailbox, used to open new ports.
    mailbox: Mailbox,
    // Work queue through which handler invocations are dispatched to the
    // actor loop.
    workq: OrderedSender<WorkCell<A>>,
}
2819
impl<A: Actor> Ports<A> {
    /// Create an empty port dictionary backed by the actor's mailbox and
    /// ordered work queue.
    fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
        Self {
            ports: DashMap::new(),
            bound: DashMap::new(),
            mailbox,
            workq,
        }
    }

    /// Get a port for the Handler<M> of actor A. Ports are memoized: the
    /// first call per message type provisions an enqueue port, subsequent
    /// calls return clones of the same handle.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            Entry::Vacant(entry) => {
                // Some special case hackery, but it keeps the rest of the code (relatively) simple.
                assert_ne!(
                    key,
                    TypeId::of::<Signal>(),
                    "cannot provision Signal port through `Ports::get`"
                );
                assert_ne!(
                    key,
                    TypeId::of::<IntrospectMessage>(),
                    "cannot provision IntrospectMessage port through `Ports::get`"
                );

                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                let actor_id = self.mailbox.actor_id().to_string();
                // The enqueue callback wraps each incoming message in a
                // WorkCell which, when run by the actor loop, dispatches
                // to the actor's Handler<M>.
                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
                    let seq_info = headers.get(SEQ_INFO);

                    let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
                        Box::pin(async move {
                            // SAFETY: we guarantee that the passed type_info is for type M.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    ACTOR_MESSAGE_QUEUE_SIZE.add(
                        1,
                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
                    );
                    if workq.enable_buffering {
                        // Buffering on: route session-sequenced messages
                        // through the ordered sender; SEQ_INFO is mandatory.
                        match seq_info {
                            Some(SeqInfo::Session { session_id, seq }) => {
                                // TODO: return the message contained in the error instead of dropping them when converting
                                // to anyhow::Error. In that way, the message can be picked up by mailbox and returned to sender.
                                workq.send(session_id, seq, work).map_err(|e| match e {
                                    OrderedSenderError::InvalidZeroSeq(_) => {
                                        let error_msg = format!(
                                             "in enqueue func for {}, got seq 0 for message type {}",
                                            actor_id,
                                            std::any::type_name::<M>(),
                                        );
                                        tracing::error!(error_msg);
                                        anyhow::anyhow!(error_msg)
                                    }
                                    OrderedSenderError::SendError(e) => anyhow::Error::from(e),
                                    OrderedSenderError::FlushError(e) => e,
                                })
                            }
                            Some(SeqInfo::Direct) => {
                                workq.direct_send(work).map_err(anyhow::Error::from)
                            }
                            None => {
                                let error_msg = format!(
                                    "in enqueue func for {}, buffering is enabled, but SEQ_INFO is not set for message type {}",
                                    actor_id,
                                    std::any::type_name::<M>(),
                                    );
                                tracing::error!(error_msg);
                                anyhow::bail!(error_msg);
                            }
                        }
                    } else {
                        // Buffering off: bypass sequence ordering entirely.
                        workq.direct_send(work).map_err(anyhow::Error::from)
                    }
                });
                // Memoize so subsequent `get::<M>()` calls reuse this port.
                entry.insert(Box::new(port.clone()));
                port
            }
            Entry::Occupied(entry) => {
                let port = entry.get();
                // Entries are only ever inserted under TypeId::of::<M>() as
                // PortHandle<M>, so this downcast cannot fail.
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }

    /// Open a (typed) message port as in [`get`], but return a port receiver instead of dispatching
    /// the underlying handler. Returns `None` if a port for `M` was already
    /// provisioned (its receiver is owned elsewhere).
    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
        match self.ports.entry(TypeId::of::<M>()) {
            Entry::Vacant(entry) => {
                let (port, receiver) = self.mailbox.open_port();
                entry.insert(Box::new(port.clone()));
                Some((port, receiver))
            }
            Entry::Occupied(_) => None,
        }
    }

    /// Bind the given message type to its actor port.
    pub fn bind<M: RemoteMessage>(&self)
    where
        A: Handler<M>,
    {
        let port_index = M::port();
        match self.bound.entry(port_index) {
            Entry::Vacant(entry) => {
                self.get::<M>().bind_actor_port();
                entry.insert(M::typename());
            }
            Entry::Occupied(entry) => {
                // Re-binding the same type is a no-op; a different type on
                // the same port index is a programming error.
                assert_eq!(
                    *entry.get(),
                    M::typename(),
                    "bind {}: port index {} already bound to type {}",
                    M::typename(),
                    port_index,
                    entry.get(),
                );
            }
        }
    }
}
2953
2954#[cfg(test)]
2955mod tests {
2956    use std::assert_matches::assert_matches;
2957    use std::sync::atomic::AtomicBool;
2958
2959    use hyperactor_macros::export;
2960    use serde_json::json;
2961    use timed_test::async_timed_test;
2962    use tokio::sync::Barrier;
2963    use tokio::sync::oneshot;
2964    use tracing::Level;
2965    use tracing_subscriber::layer::SubscriberExt;
2966    use tracing_test::internal::logs_with_scope_contain;
2967
2968    use super::*;
2969    // needed for in-crate macro expansion
2970    use crate as hyperactor;
2971    use crate::HandleClient;
2972    use crate::Handler;
2973    use crate::testing::proc_supervison::ProcSupervisionCoordinator;
2974    use crate::testing::process_assertion::assert_termination;
2975
    /// Minimal exported actor used as the workhorse for the proc tests below.
    #[derive(Debug, Default)]
    #[export]
    struct TestActor;

    impl Actor for TestActor {}
2981
    /// Messages understood by `TestActor`; each variant exercises a distinct
    /// behavior in the handler impl below.
    #[derive(Handler, HandleClient, Debug)]
    enum TestActorMessage {
        /// Ack by sending `()` on the channel.
        Reply(oneshot::Sender<()>),
        /// Signal entry on the sender, then block until the receiver fires.
        Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
        /// Re-send the boxed message to the given actor.
        Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
        /// Do nothing.
        Noop(),
        /// Fail the handler with the given error.
        Fail(anyhow::Error),
        /// Panic with the given message.
        Panic(String),
        /// Spawn a child TestActor and reply with its handle.
        Spawn(oneshot::Sender<ActorHandle<TestActor>>),
    }
2992
2993    impl TestActor {
2994        async fn spawn_child(
2995            cx: &impl context::Actor,
2996            parent: &ActorHandle<TestActor>,
2997        ) -> ActorHandle<TestActor> {
2998            let (tx, rx) = oneshot::channel();
2999            parent.send(cx, TestActorMessage::Spawn(tx)).unwrap();
3000            rx.await.unwrap()
3001        }
3002    }
3003
    // Handler implementations for TestActorMessage: one method per enum
    // variant, wired up by the `#[crate::handle]` macro.
    #[async_trait]
    #[crate::handle(TestActorMessage)]
    impl TestActorMessageHandler for TestActor {
        async fn reply(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            Ok(())
        }

        async fn wait(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
            receiver: oneshot::Receiver<()>,
        ) -> Result<(), anyhow::Error> {
            // Signal entry, then park until the test releases us; this keeps
            // the actor in its processing state for the duration.
            sender.send(()).unwrap();
            receiver.await.unwrap();
            Ok(())
        }

        async fn forward(
            &mut self,
            cx: &crate::Context<Self>,
            destination: ActorHandle<TestActor>,
            message: Box<TestActorMessage>,
        ) -> Result<(), anyhow::Error> {
            // TODO: this needn't be async
            destination.send(cx, *message)?;
            Ok(())
        }

        async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
            Ok(())
        }

        async fn fail(
            &mut self,
            _cx: &crate::Context<Self>,
            err: anyhow::Error,
        ) -> Result<(), anyhow::Error> {
            Err(err)
        }

        async fn panic(
            &mut self,
            _cx: &crate::Context<Self>,
            err_msg: String,
        ) -> Result<(), anyhow::Error> {
            panic!("{}", err_msg);
        }

        async fn spawn(
            &mut self,
            cx: &crate::Context<Self>,
            reply: oneshot::Sender<ActorHandle<TestActor>>,
        ) -> Result<(), anyhow::Error> {
            // Child is spawned in this actor's context, making it a
            // supervised child of this actor.
            let handle = TestActor.spawn(cx)?;
            reply.send(handle).unwrap();
            Ok(())
        }
    }
3068
    // End-to-end lifecycle test: spawn an actor, observe the Idle and
    // Processing status transitions, then drain and stop.
    #[tracing_test::traced_test]
    #[async_timed_test(timeout_secs = 30)]
    async fn test_spawn_actor() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let handle = proc.spawn("test", TestActor).unwrap();

        // Check on the join handle.
        assert!(logs_contain(
            format!(
                "{}: spawned with {:?}",
                handle.actor_id(),
                handle.cell().actor_task_handle().unwrap(),
            )
            .as_str()
        ));

        let mut state = handle.status().clone();

        // Send a ping-pong to the actor. Wait for the actor to become idle.

        let (tx, rx) = oneshot::channel::<()>();
        handle.send(&client, TestActorMessage::Reply(tx)).unwrap();
        rx.await.unwrap();

        state
            .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        // Make sure we enter processing state while the actor is handling a message.
        let (enter_tx, enter_rx) = oneshot::channel::<()>();
        let (exit_tx, exit_rx) = oneshot::channel::<()>();

        handle
            .send(&client, TestActorMessage::Wait(enter_tx, exit_rx))
            .unwrap();
        enter_rx.await.unwrap();
        assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= std::time::SystemTime::now());
        exit_tx.send(()).unwrap();

        state
            .wait_for(|state| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        // A drained stop surfaces the provided reason in the terminal status.
        handle.drain_and_stop("test").unwrap();
        handle.await;
        assert_matches!(&*state.borrow(), ActorStatus::Stopped(reason) if reason == "test");
    }
3119
3120    #[async_timed_test(timeout_secs = 30)]
3121    async fn test_proc_actors_messaging() {
3122        let proc = Proc::local();
3123        let (client, _) = proc.instance("client").unwrap();
3124        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
3125        let second = proc.spawn::<TestActor>("second", TestActor).unwrap();
3126        let (tx, rx) = oneshot::channel::<()>();
3127        let reply_message = TestActorMessage::Reply(tx);
3128        first
3129            .send(
3130                &client,
3131                TestActorMessage::Forward(second, Box::new(reply_message)),
3132            )
3133            .unwrap();
3134        rx.await.unwrap();
3135    }
3136
    // Minimal exported actor used by `test_actor_lookup` to service
    // `LookupTestMessage` queries. Holds no state.
    #[derive(Debug, Default)]
    #[export]
    struct LookupTestActor;

    impl Actor for LookupTestActor {}
3142
    // Message asking a `LookupTestActor` whether the given actor ref
    // resolves to a live, correctly-typed actor on its proc.
    #[derive(Handler, HandleClient, Debug)]
    enum LookupTestMessage {
        ActorExists(
            reference::ActorRef<TestActor>,
            // Reply port carrying the existence answer.
            #[reply] reference::OncePortRef<bool>,
        ),
    }
3150
    #[async_trait]
    #[crate::handle(LookupTestMessage)]
    impl LookupTestMessageHandler for LookupTestActor {
        // Resolves the ref locally: `downcast_handle` yields `Some`
        // only when the referenced actor exists on this proc and has
        // the expected type, so `is_some()` is the existence answer.
        async fn actor_exists(
            &mut self,
            cx: &crate::Context<Self>,
            actor_ref: reference::ActorRef<TestActor>,
        ) -> Result<bool, anyhow::Error> {
            Ok(actor_ref.downcast_handle(cx).is_some())
        }
    }
3162
    // End-to-end check of `ActorRef` resolution: a live, correctly
    // typed ref resolves; a made-up child id, a wrongly-typed ref, and
    // a ref to a stopped actor all fail to resolve.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_actor_lookup() {
        let proc = Proc::local();
        let (client, _handle) = proc.instance("client").unwrap();

        let target_actor = proc.spawn::<TestActor>("target", TestActor).unwrap();
        let target_actor_ref = target_actor.bind();
        let lookup_actor = proc
            .spawn::<LookupTestActor>("lookup", LookupTestActor)
            .unwrap();

        // A live actor of the attested type resolves.
        assert!(
            lookup_actor
                .actor_exists(&client, target_actor_ref.clone())
                .await
                .unwrap()
        );

        // Make up a child actor. It shouldn't exist.
        assert!(
            !lookup_actor
                .actor_exists(
                    &client,
                    reference::ActorRef::attest(target_actor.actor_id().child_id(123).clone())
                )
                .await
                .unwrap()
        );
        // A wrongly-typed actor ref should also not obtain.
        // (`lookup_actor` is a `LookupTestActor`, but the ref attests
        // `TestActor`, so the typed downcast fails.)
        assert!(
            !lookup_actor
                .actor_exists(
                    &client,
                    reference::ActorRef::attest(lookup_actor.actor_id().clone())
                )
                .await
                .unwrap()
        );

        // Once the target is stopped, its ref no longer resolves.
        target_actor.drain_and_stop("test").unwrap();
        target_actor.await;

        assert!(
            !lookup_actor
                .actor_exists(&client, target_actor_ref)
                .await
                .unwrap()
        );

        lookup_actor.drain_and_stop("test").unwrap();
        lookup_actor.await;
    }
3215
3216    fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
3217        assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
3218        assert_eq!(
3219            child.inner.parent.upgrade().unwrap().actor_id(),
3220            parent.actor_id()
3221        );
3222        assert_matches!(
3223            parent.inner.children.get(&child.pid()),
3224            Some(node) if node.actor_id() == child.actor_id()
3225        );
3226    }
3227
    // Spawns a three-deep chain (first -> second -> third) and checks:
    // spawn logging, sequential pid allocation, supervision-tree links,
    // and that stopping each actor unlinks its children.
    #[tracing_test::traced_test]
    #[async_timed_test(timeout_secs = 30)]
    async fn test_spawn_child() {
        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();

        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
        let second = TestActor::spawn_child(&client, &first).await;
        let third = TestActor::spawn_child(&client, &second).await;

        // Check we've got the join handles.
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                first.actor_id(),
                first.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                second.actor_id(),
                second.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                third.actor_id(),
                third.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));

        // These are allocated in sequence:
        assert_eq!(first.actor_id().proc_id(), proc.proc_id());
        assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
        assert_eq!(third.actor_id(), &first.actor_id().child_id(2));

        // Supervision tree is constructed correctly.
        validate_link(third.cell(), second.cell());
        validate_link(second.cell(), first.cell());
        // The root actor has no parent.
        assert!(first.cell().inner.parent.upgrade().is_none());

        // Supervision tree is torn down correctly.
        // Once each actor is stopped, it should have no linked children.
        let third_cell = third.cell().clone();
        third.drain_and_stop("test").unwrap();
        third.await;
        assert!(third_cell.inner.children.is_empty());
        drop(third_cell);
        // Stopping `third` must not disturb the remaining links.
        validate_link(second.cell(), first.cell());

        let second_cell = second.cell().clone();
        second.drain_and_stop("test").unwrap();
        second.await;
        assert!(second_cell.inner.children.is_empty());
        drop(second_cell);

        let first_cell = first.cell().clone();
        first.drain_and_stop("test").unwrap();
        first.await;
        assert!(first_cell.inner.children.is_empty());
    }
3297
3298    #[async_timed_test(timeout_secs = 30)]
3299    async fn test_child_lifecycle() {
3300        let proc = Proc::local();
3301        let (client, _) = proc.instance("client").unwrap();
3302
3303        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3304        let root_1 = TestActor::spawn_child(&client, &root).await;
3305        let root_2 = TestActor::spawn_child(&client, &root).await;
3306        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
3307
3308        root.drain_and_stop("test").unwrap();
3309        root.await;
3310
3311        for actor in [root_1, root_2, root_2_1] {
3312            assert!(actor.send(&client, TestActorMessage::Noop()).is_err());
3313            assert_matches!(actor.await, ActorStatus::Stopped(reason) if reason == "parent stopping");
3314        }
3315    }
3316
3317    #[async_timed_test(timeout_secs = 30)]
3318    async fn test_parent_failure() {
3319        let proc = Proc::local();
3320        let (client, _) = proc.instance("client").unwrap();
3321        // Need to set a supervison coordinator for this Proc because there will
3322        // be actor failure(s) in this test which trigger supervision.
3323        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3324
3325        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3326        let root_1 = TestActor::spawn_child(&client, &root).await;
3327        let root_2 = TestActor::spawn_child(&client, &root).await;
3328        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
3329
3330        root_2
3331            .send(
3332                &client,
3333                TestActorMessage::Fail(anyhow::anyhow!("some random failure")),
3334            )
3335            .unwrap();
3336        let _root_2_actor_id = root_2.actor_id().clone();
3337        assert_matches!(
3338            root_2.await,
3339            ActorStatus::Failed(err) if err.to_string() == "some random failure"
3340        );
3341
3342        // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
3343        // stopped by a parent failure?
3344        // Currently the parent fails with an error related to the child's failure.
3345        assert_matches!(
3346            root.await,
3347            ActorStatus::Failed(err) if err.to_string().contains("some random failure")
3348        );
3349        assert_matches!(root_2_1.await, ActorStatus::Stopped(_));
3350        assert_matches!(root_1.await, ActorStatus::Stopped(_));
3351    }
3352
    // Exercises an actor that implements two `Handler` impls: one that
    // hands back a typed port over a once-port, and one that accumulates
    // `usize` messages into a shared counter.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_multi_handler() {
        // TEMPORARY: This test is currently a bit awkward since we don't yet expose
        // public interfaces to multi-handlers. This will be fixed shortly.

        // Shared counter incremented by the `usize` handler.
        #[derive(Debug)]
        struct TestActor(Arc<AtomicUsize>);

        #[async_trait]
        impl Actor for TestActor {}

        // Handler 1: replies with this actor's `usize` port.
        #[async_trait]
        impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
            async fn handle(
                &mut self,
                cx: &crate::Context<Self>,
                message: OncePortHandle<PortHandle<usize>>,
            ) -> anyhow::Result<()> {
                message.send(cx, cx.port())?;
                Ok(())
            }
        }

        // Handler 2: adds the message value to the shared counter.
        #[async_trait]
        impl Handler<usize> for TestActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: usize,
            ) -> anyhow::Result<()> {
                self.0.fetch_add(message, Ordering::SeqCst);
                Ok(())
            }
        }

        let proc = Proc::local();
        let state = Arc::new(AtomicUsize::new(0));
        let actor = TestActor(state.clone());
        let handle = proc.spawn::<TestActor>("test", actor).unwrap();
        let (client, _) = proc.instance("client").unwrap();
        // Ask the actor for its usize port, then send a value through it.
        let (tx, rx) = client.open_once_port();
        handle.send(&client, tx).unwrap();
        let usize_handle = rx.recv().await.unwrap();
        usize_handle.send(&client, 123).unwrap();

        // drain_and_stop guarantees the 123 is processed before stop.
        handle.drain_and_stop("test").unwrap();
        handle.await;

        assert_eq!(state.load(Ordering::SeqCst), 123);
    }
3403
3404    #[async_timed_test(timeout_secs = 30)]
3405    async fn test_actor_panic() {
3406        // Need this custom hook to store panic backtrace in task_local.
3407        panic_handler::set_panic_hook();
3408
3409        let proc = Proc::local();
3410        // Need to set a supervison coordinator for this Proc because there will
3411        // be actor failure(s) in this test which trigger supervision.
3412        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3413
3414        let (client, _handle) = proc.instance("client").unwrap();
3415        let actor_handle = proc.spawn("test", TestActor).unwrap();
3416        actor_handle
3417            .panic(&client, "some random failure".to_string())
3418            .await
3419            .unwrap();
3420        let actor_status = actor_handle.await;
3421
3422        // Note: even when the test passes, the panic stacktrace will still be
3423        // printed to stderr because that is the behavior controlled by the panic
3424        // hook.
3425        assert_matches!(actor_status, ActorStatus::Failed(_));
3426        if let ActorStatus::Failed(err) = actor_status {
3427            let error_msg = err.to_string();
3428            // Verify panic message is captured
3429            assert!(error_msg.contains("some random failure"));
3430            // Verify backtrace is captured. Note the backtrace message might
3431            // change in the future. If that happens, we need to update this
3432            // statement with something up-to-date.
3433            assert!(error_msg.contains("library/std/src/panicking.rs"));
3434        }
3435    }
3436
    // Verifies that a supervision event climbs the parent chain until
    // some ancestor's `handle_supervision_event` returns `Ok(true)`
    // (event consumed); otherwise it reaches the proc-level
    // ProcSupervisionCoordinator.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_local_supervision_propagation() {
        hyperactor_telemetry::initialize_logging_for_test();

        // Fields: (flag set when this actor handles a supervision event,
        // whether this actor consumes events — i.e. returns Ok(true)).
        #[derive(Debug)]
        struct TestActor(Arc<AtomicBool>, bool);

        #[async_trait]
        impl Actor for TestActor {
            async fn handle_supervision_event(
                &mut self,
                _this: &Instance<Self>,
                _event: &ActorSupervisionEvent,
            ) -> Result<bool, anyhow::Error> {
                // Ok(false) => not handled here; keep propagating upward.
                if !self.1 {
                    return Ok(false);
                }

                tracing::error!(
                    "{}: supervision event received: {:?}",
                    _this.self_id(),
                    _event
                );
                // Record that this actor consumed the event.
                self.0.store(true, Ordering::SeqCst);
                Ok(true)
            }
        }

        // Any String message makes the actor fail with that message.
        #[async_trait]
        impl Handler<String> for TestActor {
            async fn handle(
                &mut self,
                cx: &crate::Context<Self>,
                message: String,
            ) -> anyhow::Result<()> {
                tracing::info!("{} received message: {}", cx.self_id(), message);
                Err(anyhow::anyhow!(message))
            }
        }

        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let (reported_event, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();

        // One "handled" flag per actor in the tree.
        let root_state = Arc::new(AtomicBool::new(false));
        let root_1_state = Arc::new(AtomicBool::new(false));
        let root_1_1_state = Arc::new(AtomicBool::new(false));
        let root_1_1_1_state = Arc::new(AtomicBool::new(false));
        let root_2_state = Arc::new(AtomicBool::new(false));
        let root_2_1_state = Arc::new(AtomicBool::new(false));

        // Tree: root -> {root_1 -> root_1_1 -> root_1_1_1, root_2 -> root_2_1}.
        // Only root_1 consumes supervision events.
        let root = proc
            .spawn::<TestActor>("root", TestActor(root_state.clone(), false))
            .unwrap();
        let root_1 = proc
            .spawn_child::<TestActor>(
                root.cell().clone(),
                TestActor(
                    root_1_state.clone(),
                    true, /* set true so children's event stops here */
                ),
            )
            .unwrap();
        let root_1_1 = proc
            .spawn_child::<TestActor>(
                root_1.cell().clone(),
                TestActor(root_1_1_state.clone(), false),
            )
            .unwrap();
        let root_1_1_1 = proc
            .spawn_child::<TestActor>(
                root_1_1.cell().clone(),
                TestActor(root_1_1_1_state.clone(), false),
            )
            .unwrap();
        let root_2 = proc
            .spawn_child::<TestActor>(root.cell().clone(), TestActor(root_2_state.clone(), false))
            .unwrap();
        let root_2_1 = proc
            .spawn_child::<TestActor>(
                root_2.cell().clone(),
                TestActor(root_2_1_state.clone(), false),
            )
            .unwrap();

        // fail `root_1_1_1`, the supervision msg should be propagated to
        // `root_1` because `root_1` has set `true` to `handle_supervision_event`.
        root_1_1_1
            .send::<String>(&client, "some random failure".into())
            .unwrap();

        // fail `root_2_1`, the supervision msg should be propagated to
        // ProcSupervisionCoordinator.
        root_2_1
            .send::<String>(&client, "some random failure".into())
            .unwrap();

        // Allow time for both events to propagate asynchronously.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Only root_1 should have consumed an event; every other flag
        // stays false.
        assert!(!root_state.load(Ordering::SeqCst));
        assert!(root_1_state.load(Ordering::SeqCst));
        assert!(!root_1_1_state.load(Ordering::SeqCst));
        assert!(!root_1_1_1_state.load(Ordering::SeqCst));
        assert!(!root_2_state.load(Ordering::SeqCst));
        assert!(!root_2_1_state.load(Ordering::SeqCst));
        // The unconsumed root_2_1 event must reach the coordinator.
        assert_eq!(
            reported_event.event().map(|e| e.actor_id.clone()),
            Some(root_2_1.actor_id().clone())
        );
    }
3547
    // Exercises `Proc::instance`: a client instance can spawn a child
    // actor, exchange messages through a port, reports `Client` status
    // while alive, and transitions to `Stopped` when dropped
    // (cf. CI-2 in the module docs).
    #[async_timed_test(timeout_secs = 30)]
    async fn test_instance() {
        #[derive(Debug, Default)]
        struct TestActor;

        impl Actor for TestActor {}

        // Echo handler: sends the message back through the given port.
        #[async_trait]
        impl Handler<(String, reference::PortRef<String>)> for TestActor {
            async fn handle(
                &mut self,
                cx: &crate::Context<Self>,
                (message, port): (String, reference::PortRef<String>),
            ) -> anyhow::Result<()> {
                port.send(cx, message)?;
                Ok(())
            }
        }

        let proc = Proc::local();

        let (instance, handle) = proc.instance("my_test_actor").unwrap();

        let child_actor = TestActor.spawn(&instance).unwrap();

        // Round-trip a message through the child via a bound port.
        let (port, mut receiver) = instance.open_port();
        child_actor
            .send(&instance, ("hello".to_string(), port.bind()))
            .unwrap();

        let message = receiver.recv().await.unwrap();
        assert_eq!(message, "hello");

        child_actor.drain_and_stop("test").unwrap();
        child_actor.await;

        // The instance reports `Client` while alive...
        assert_eq!(*handle.status().borrow(), ActorStatus::Client);
        // ...and dropping it transitions the status to `Stopped`.
        drop(instance);
        assert_matches!(*handle.status().borrow(), ActorStatus::Stopped(_));
        handle.await;
    }
3589
    // An actor failure on a proc with no supervision coordinator must
    // terminate the whole process (exit code 1), well before the
    // 30-second sleep completes.
    #[tokio::test]
    async fn test_proc_terminate_without_coordinator() {
        if std::env::var("CARGO_TEST").is_ok() {
            eprintln!("test skipped as it hangs when run by cargo in sandcastle");
            return;
        }

        let process = async {
            let proc = Proc::local();
            // Intentionally not setting a proc supervision coordinator. This
            // should cause the process to terminate.
            // ProcSupervisionCoordinator::set(&proc).await.unwrap();
            let root = proc.spawn("root", TestActor).unwrap();
            let (client, _handle) = proc.instance("client").unwrap();
            root.fail(&client, anyhow::anyhow!("some random failure"))
                .await
                .unwrap();
            // It is okay to sleep a long time here, because we expect this
            // process to be terminated way before the sleep ends due to the
            // missing proc supervision coordinator.
            tokio::time::sleep(Duration::from_secs(30)).await;
        };

        // Expect the child process to exit with status 1.
        assert_termination(|| process, 1).await.unwrap();
    }
3615
3616    fn trace_and_block(fut: impl Future) {
3617        tracing::subscriber::with_default(
3618            tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
3619            || {
3620                tokio::runtime::Builder::new_current_thread()
3621                    .enable_all()
3622                    .build()
3623                    .unwrap()
3624                    .block_on(fut)
3625            },
3626        );
3627    }
3628
    // Verifies that tracing events emitted inside handlers are captured
    // by the per-actor recording (tail of events, plus live span stacks
    // while a handler is parked inside a child span).
    #[ignore = "until trace recording is turned back on"]
    #[test]
    fn test_handler_logging() {
        #[derive(Debug, Default)]
        struct LoggingActor;

        impl Actor for LoggingActor {}

        impl LoggingActor {
            // Synchronization helper: sends a barrier message and waits
            // on it, guaranteeing all previously sent messages have
            // been handled before returning.
            async fn wait(cx: &impl context::Actor, handle: &ActorHandle<Self>) {
                let barrier = Arc::new(Barrier::new(2));
                handle.send(cx, barrier.clone()).unwrap();
                barrier.wait().await;
            }
        }

        // Logs the string message at INFO.
        #[async_trait]
        impl Handler<String> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: String,
            ) -> anyhow::Result<()> {
                tracing::info!("{}", message);
                Ok(())
            }
        }

        // Emits a structured INFO event with a `number` field.
        #[async_trait]
        impl Handler<u64> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: u64,
            ) -> anyhow::Result<()> {
                tracing::event!(Level::INFO, number = message);
                Ok(())
            }
        }

        // Rendezvous handler used by `wait` above.
        #[async_trait]
        impl Handler<Arc<Barrier>> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: Arc<Barrier>,
            ) -> anyhow::Result<()> {
                message.wait().await;
                Ok(())
            }
        }

        // Parks inside `child_span` between the two barriers so the
        // test can observe the live span stack mid-handler.
        #[async_trait]
        impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                barriers: Arc<(Barrier, Barrier)>,
            ) -> anyhow::Result<()> {
                let inner = tracing::span!(Level::INFO, "child_span");
                let _inner_guard = inner.enter();
                barriers.0.wait().await;
                barriers.1.wait().await;
                Ok(())
            }
        }

        trace_and_block(async {
            let proc = Proc::local();
            let (client, _) = proc.instance("client").unwrap();
            let handle = LoggingActor.spawn_detached().unwrap();
            handle.send(&client, "hello world".to_string()).unwrap();
            handle
                .send(&client, "hello world again".to_string())
                .unwrap();
            handle.send(&client, 123u64).unwrap();

            // Barrier round-trip: the three sends above are now handled.
            LoggingActor::wait(&client, &handle).await;

            // The recording tail should contain exactly the three events.
            let events = handle.cell().inner.recording.tail();
            assert_eq!(events.len(), 3);
            assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
            assert_eq!(
                events[1].json_value(),
                json!({ "message": "hello world again" })
            );
            assert_eq!(events[2].json_value(), json!({ "number": 123 }));

            // Capture the span stacks while the handler is parked
            // between the two barriers (inside `child_span`).
            let stacks = {
                let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
                handle.send(&client, Arc::clone(&barriers)).unwrap();
                barriers.0.wait().await;
                let stacks = handle.cell().inner.recording.stacks();
                barriers.1.wait().await;
                stacks
            };
            assert_eq!(stacks.len(), 1);
            assert_eq!(stacks[0].len(), 1);
            assert_eq!(stacks[0][0].name(), "child_span");
        })
    }
3730
    // Sending to a cleanly stopped actor fails with an
    // `OwnerTerminated(Stopped(..))` mailbox error carrying the
    // original stop reason.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_mailbox_closed_with_owner_stopped_reason() {
        use crate::actor::ActorStatus;
        use crate::mailbox::MailboxErrorKind;
        use crate::mailbox::MailboxSenderErrorKind;

        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        let actor_handle = proc.spawn("test", TestActor).unwrap();

        // Clone the handle before awaiting since await consumes the handle
        let handle_for_send = actor_handle.clone();

        // Stop the actor gracefully
        actor_handle.drain_and_stop("healthy shutdown").unwrap();
        actor_handle.await;

        // Try to send a message to the stopped actor
        let result = handle_for_send.send(&client, TestActorMessage::Noop());

        assert!(result.is_err(), "send should fail when actor is stopped");
        let err = result.unwrap_err();
        // The error must preserve the original stop reason.
        assert_matches!(
            err.kind(),
            MailboxSenderErrorKind::Mailbox(mailbox_err)
                if matches!(
                    mailbox_err.kind(),
                    MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "healthy shutdown"
                )
        );
    }
3762
    // Sending to a failed actor fails with an
    // `OwnerTerminated(Failed(..))` mailbox error carrying the original
    // failure message.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_mailbox_closed_with_owner_failed_reason() {
        use crate::actor::ActorErrorKind;
        use crate::actor::ActorStatus;
        use crate::mailbox::MailboxErrorKind;
        use crate::mailbox::MailboxSenderErrorKind;

        let proc = Proc::local();
        let (client, _) = proc.instance("client").unwrap();
        // Need to set a supervision coordinator for this Proc because there will
        // be actor failure(s) in this test which trigger supervision.
        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let actor_handle = proc.spawn("test", TestActor).unwrap();

        // Clone the handle before awaiting since await consumes the handle
        let handle_for_send = actor_handle.clone();

        // Cause the actor to fail
        actor_handle
            .send(
                &client,
                TestActorMessage::Fail(anyhow::anyhow!("intentional failure")),
            )
            .unwrap();
        actor_handle.await;

        // Try to send a message to the failed actor
        let result = handle_for_send.send(&client, TestActorMessage::Noop());

        assert!(result.is_err(), "send should fail when actor has failed");
        let err = result.unwrap_err();
        // The error must preserve the original failure message.
        assert_matches!(
            err.kind(),
            MailboxSenderErrorKind::Mailbox(mailbox_err)
                if matches!(
                    mailbox_err.kind(),
                    MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg)))
                        if msg.contains("intentional failure")
                )
        );
    }
3805
3806    /// Wait for a terminated snapshot to appear for the given actor.
3807    /// The introspect task runs in a separate tokio task and may not
3808    /// have stored the snapshot by the time `handle.await` returns.
3809    async fn wait_for_terminated_snapshot(
3810        proc: &Proc,
3811        actor_id: &reference::ActorId,
3812    ) -> crate::introspect::IntrospectResult {
3813        // Yield to let the introspect task run, then poll. Use a
3814        // combination of yields (for fast paths) and sleeps (to
3815        // avoid busy-spinning if the scheduler is loaded).
3816        for i in 0..1000 {
3817            if let Some(snapshot) = proc.terminated_snapshot(actor_id) {
3818                return snapshot;
3819            }
3820            if i < 50 {
3821                tokio::task::yield_now().await;
3822            } else {
3823                tokio::time::sleep(Duration::from_millis(50)).await;
3824            }
3825        }
3826        panic!("timed out waiting for terminated snapshot for {}", actor_id);
3827    }
3828
3829    // Verifies that when an actor is stopped, the proc eventually
3830    // records a "terminated snapshot" for it (written by the
3831    // introspect task, which runs asynchronously). The test asserts
3832    // the snapshot is absent while the actor is live, then stops the
3833    // actor, waits for the introspect task to observe the terminal
3834    // state, and confirms:
3835    //   - the stored snapshot reports a `stopped:*` actor_status, and
3836    //   - the actor id moves from the live set to the terminated set.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_terminated_snapshot_stored_on_stop() {
        let proc = Proc::local();
        let (_client, _client_handle) = proc.instance("client").unwrap();

        let handle = proc.spawn::<TestActor>("actor", TestActor).unwrap();
        let actor_id = handle.actor_id().clone();

        // Actor is live — no terminated snapshot yet.
        assert!(proc.terminated_snapshot(&actor_id).is_none());
        assert!(!proc.all_terminated_actor_ids().contains(&actor_id));

        // Stop the actor and wait for it to fully terminate.
        handle.drain_and_stop("test").unwrap();
        handle.await;

        // The introspect task runs in a separate tokio task; wait for
        // it to observe the terminal status and store the snapshot.
        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
        // Snapshot attrs are serialized as JSON; decode and check the
        // recorded status string.
        let attrs: hyperactor_config::Attrs =
            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
        let status = attrs
            .get(crate::introspect::STATUS)
            .expect("must have status");
        assert!(
            status.starts_with("stopped"),
            "expected stopped status, got: {}",
            status
        );

        // Actor should appear in terminated IDs but not in live IDs.
        assert!(proc.all_terminated_actor_ids().contains(&actor_id));
        assert!(
            !proc.all_actor_ids().contains(&actor_id),
            "stopped actor should not appear in live actor IDs"
        );
    }
3874
3875    // Verifies that an actor failure results in a terminated snapshot
3876    // being stored. The test installs a ProcSupervisionCoordinator
3877    // (required for failure handling), spawns an actor, triggers a
3878    // failure via a message, waits for the actor to terminate, then
3879    // waits for the introspect task to persist the terminal snapshot
3880    // and asserts the snapshot reports a `failed:*` actor_status.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_terminated_snapshot_stored_on_failure() {
        let proc = Proc::local();
        let (client, _client_handle) = proc.instance("client").unwrap();
        // Supervision coordinator required for actor failure handling.
        ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
        let actor_id = handle.actor_id().clone();

        // Trigger a failure.
        handle
            .send(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")))
            .unwrap();
        handle.await;

        // Wait for the introspect task to persist the terminal snapshot,
        // then check that it records a failed status.
        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
        let attrs: hyperactor_config::Attrs =
            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
        let status = attrs
            .get(crate::introspect::STATUS)
            .expect("must have status");
        assert!(
            status.starts_with("failed"),
            "expected failed status, got: {}",
            status
        );
    }
3909
3910    // Exercises FI-1/FI-2 (see introspect.rs module-scope comment).
    // Exercises FI-1/FI-2 (see introspect.rs module-scope comment).
    #[async_timed_test(timeout_secs = 30)]
    async fn test_supervision_event_stored_on_failure() {
        let proc = Proc::local();
        let (client, _client_handle) = proc.instance("client").unwrap();
        ProcSupervisionCoordinator::set(&proc).await.unwrap();

        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
        let actor_id = handle.actor_id().clone();
        // Keep the cell alive past the handle so we can inspect the
        // stored supervision event after termination.
        let cell = handle.cell().clone();

        handle
            .send(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")))
            .unwrap();
        handle.await;

        let event = cell.supervision_event();
        assert!(event.is_some(), "failed actor must have supervision_event");
        let event = event.unwrap();
        assert_eq!(event.actor_id, actor_id);
        assert!(event.actor_status.is_failed());
        // Originated here, not propagated.
        assert_eq!(event.actually_failing_actor().unwrap().actor_id, actor_id);
    }
3934
3935    // Exercises FI-2 (see introspect.rs module-scope comment).
3936    #[async_timed_test(timeout_secs = 30)]
3937    async fn test_supervision_event_on_clean_stop() {
3938        let proc = Proc::local();
3939        let (_client, _client_handle) = proc.instance("client").unwrap();
3940
3941        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
3942        let cell = handle.cell().clone();
3943
3944        handle.drain_and_stop("test").unwrap();
3945        handle.await;
3946
3947        let event = cell
3948            .supervision_event()
3949            .expect("cleanly stopped actor must have a supervision_event");
3950        assert!(
3951            matches!(event.actor_status, ActorStatus::Stopped(_)),
3952            "expected Stopped status, got {:?}",
3953            event.actor_status
3954        );
3955        assert!(!event.is_error());
3956    }
3957
3958    #[async_timed_test(timeout_secs = 30)]
3959    async fn test_supervision_coordinator_receives_clean_stop() {
3960        let proc = Proc::local();
3961        let (_client, _client_handle) = proc.instance("client").unwrap();
3962        let (mut reported_event, _coordinator_handle) =
3963            ProcSupervisionCoordinator::set(&proc).await.unwrap();
3964
3965        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
3966        let actor_id = handle.actor_id().clone();
3967
3968        handle.drain_and_stop("test").unwrap();
3969        handle.await;
3970
3971        let event = reported_event.recv().await;
3972        assert_eq!(event.actor_id, actor_id);
3973        assert!(
3974            matches!(event.actor_status, ActorStatus::Stopped(_)),
3975            "expected Stopped status, got {:?}",
3976            event.actor_status
3977        );
3978        assert!(!event.is_error());
3979    }
3980
3981    #[async_timed_test(timeout_secs = 30)]
3982    async fn test_coordinator_shuts_down_last_during_destroy() {
3983        let mut proc = Proc::local();
3984        let (_client, _client_handle) = proc.instance("client").unwrap();
3985        let (mut reported_event, _coordinator_handle) =
3986            ProcSupervisionCoordinator::set(&proc).await.unwrap();
3987
3988        // Spawn several actors that will all stop during destroy_and_wait.
3989        let mut actor_ids = Vec::new();
3990        for i in 0..3 {
3991            let handle = proc
3992                .spawn::<TestActor>(&format!("actor_{i}"), TestActor)
3993                .unwrap();
3994            actor_ids.push(handle.actor_id().clone());
3995        }
3996
3997        // destroy_and_wait stops all actors. If the coordinator were stopped
3998        // simultaneously, supervision event delivery would fail and crash
3999        // the process. The fact that this completes without crashing proves
4000        // the coordinator outlived the other actors.
4001        proc.destroy_and_wait::<()>(Duration::from_secs(5), None, "test")
4002            .await
4003            .unwrap();
4004
4005        // Verify the coordinator received stop events from all three actors.
4006        let mut received_ids = Vec::new();
4007        for _ in 0..actor_ids.len() {
4008            let event = reported_event.recv().await;
4009            assert!(
4010                matches!(event.actor_status, ActorStatus::Stopped(_)),
4011                "expected Stopped, got {:?}",
4012                event.actor_status
4013            );
4014            received_ids.push(event.actor_id);
4015        }
4016        received_ids.sort();
4017        actor_ids.sort();
4018        assert_eq!(received_ids, actor_ids);
4019    }
4020
4021    // Exercises FI-4 (see introspect.rs module-scope comment).
4022    #[async_timed_test(timeout_secs = 30)]
4023    async fn test_supervision_event_on_propagated_failure() {
4024        let proc = Proc::local();
4025        let (client, _client_handle) = proc.instance("client").unwrap();
4026        ProcSupervisionCoordinator::set(&proc).await.unwrap();
4027
4028        let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
4029        let parent_cell = parent.cell().clone();
4030        // Spawn child under parent.
4031        let (tx, rx) = oneshot::channel();
4032        parent.send(&client, TestActorMessage::Spawn(tx)).unwrap();
4033        let child = rx.await.unwrap();
4034        let child_id = child.actor_id().clone();
4035
4036        // Fail the child — parent doesn't handle supervision, so it
4037        // propagates and terminates too.
4038        child
4039            .send(
4040                &client,
4041                TestActorMessage::Fail(anyhow::anyhow!("child boom")),
4042            )
4043            .unwrap();
4044        parent.await;
4045
4046        let event = parent_cell.supervision_event();
4047        assert!(
4048            event.is_some(),
4049            "parent must have supervision_event from propagated failure"
4050        );
4051        let event = event.unwrap();
4052        // Root cause is the child, not the parent.
4053        assert_eq!(event.actually_failing_actor().unwrap().actor_id, child_id);
4054    }
4055
4056    // Exercises S11 (see introspect.rs module doc).
4057    //
4058    // A live actor is resolvable. After drain_and_stop + await, the
4059    // actor's status is terminal and resolve_actor_ref must return
4060    // None — even though the introspect task may still hold a strong
4061    // InstanceCell Arc (it drops the Arc only after observing
4062    // terminal status asynchronously). The is_terminal() check in
4063    // resolve_actor_ref closes that race window.
4064    #[async_timed_test(timeout_secs = 30)]
4065    async fn test_resolve_actor_ref_none_for_terminal_actor() {
4066        let proc = Proc::local();
4067        let (_client, _client_handle) = proc.instance("client").unwrap();
4068
4069        let handle = proc.spawn::<TestActor>("target", TestActor).unwrap();
4070        let actor_ref: reference::ActorRef<TestActor> = handle.bind();
4071
4072        // Actor is live — resolve should succeed.
4073        assert!(
4074            proc.resolve_actor_ref(&actor_ref).is_some(),
4075            "live actor should be resolvable"
4076        );
4077
4078        handle.drain_and_stop("test").unwrap();
4079        handle.await;
4080
4081        // Actor is terminal — resolve must return None regardless of
4082        // whether the introspect task has dropped its Arc yet.
4083        assert!(
4084            proc.resolve_actor_ref(&actor_ref).is_none(),
4085            "terminal actor must not be resolvable"
4086        );
4087    }
4088
4089    // Exercises FI-3 (see introspect module doc).
4090    #[async_timed_test(timeout_secs = 30)]
4091    async fn test_terminated_snapshot_has_failure_info() {
4092        let proc = Proc::local();
4093        let (client, _client_handle) = proc.instance("client").unwrap();
4094        ProcSupervisionCoordinator::set(&proc).await.unwrap();
4095
4096        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
4097        let actor_id = handle.actor_id().clone();
4098
4099        handle
4100            .send(&client, TestActorMessage::Fail(anyhow::anyhow!("kaboom")))
4101            .unwrap();
4102        handle.await;
4103
4104        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
4105        let attrs: hyperactor_config::Attrs =
4106            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
4107        let status = attrs
4108            .get(crate::introspect::STATUS)
4109            .expect("must have status");
4110        assert!(
4111            status.starts_with("failed"),
4112            "expected failed status, got: {}",
4113            status
4114        );
4115        let err_msg = attrs
4116            .get(crate::introspect::FAILURE_ERROR_MESSAGE)
4117            .expect("failed actor must have failure_error_message");
4118        assert!(!err_msg.is_empty());
4119        let root_cause = attrs
4120            .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
4121            .expect("must have root_cause_actor");
4122        assert_eq!(root_cause, &actor_id);
4123        assert_eq!(
4124            attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
4125            Some(&false)
4126        );
4127        assert!(
4128            attrs.get(crate::introspect::FAILURE_OCCURRED_AT).is_some(),
4129            "failed actor must have occurred_at"
4130        );
4131    }
4132
4133    // Exercises FI-4 (see introspect module doc).
4134    #[async_timed_test(timeout_secs = 30)]
4135    async fn test_propagated_failure_info() {
4136        let proc = Proc::local();
4137        let (client, _client_handle) = proc.instance("client").unwrap();
4138        ProcSupervisionCoordinator::set(&proc).await.unwrap();
4139
4140        let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
4141        let parent_id = parent.actor_id().clone();
4142
4143        let (tx, rx) = oneshot::channel();
4144        parent.send(&client, TestActorMessage::Spawn(tx)).unwrap();
4145        let child = rx.await.unwrap();
4146        let child_id = child.actor_id().clone();
4147
4148        child
4149            .send(
4150                &client,
4151                TestActorMessage::Fail(anyhow::anyhow!("child fail")),
4152            )
4153            .unwrap();
4154        parent.await;
4155
4156        let snapshot = wait_for_terminated_snapshot(&proc, &parent_id).await;
4157        let attrs: hyperactor_config::Attrs =
4158            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
4159        let root_cause = attrs
4160            .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
4161            .expect("propagated failure must have root_cause_actor");
4162        assert_eq!(root_cause, &child_id);
4163        assert_eq!(
4164            attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
4165            Some(&true)
4166        );
4167    }
4168
4169    /// Exercises AI-1 (see module doc).
4170    #[async_timed_test(timeout_secs = 30)]
4171    async fn test_spawn_with_name_creates_descriptive_name() {
4172        let proc = Proc::local();
4173        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4174        let handle = proc
4175            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
4176            .unwrap();
4177        assert_eq!(handle.actor_id().name(), "my_controller");
4178        assert_eq!(handle.actor_id().pid(), 1);
4179    }
4180
4181    /// Exercises AI-1 (see module doc).
4182    #[async_timed_test(timeout_secs = 30)]
4183    async fn test_spawn_with_name_increments_index() {
4184        let proc = Proc::local();
4185        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4186        let first = proc
4187            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
4188            .unwrap();
4189        let second = proc
4190            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
4191            .unwrap();
4192        assert_eq!(first.actor_id().pid(), 1);
4193        assert_eq!(second.actor_id().pid(), 2);
4194    }
4195
4196    /// Exercises AI-1 (see module doc).
4197    /// spawn_named_child passes Some(parent) to spawn_inner.
4198    #[async_timed_test(timeout_secs = 30)]
4199    async fn test_spawn_with_name_preserves_supervision() {
4200        let proc = Proc::local();
4201        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4202        let child = proc
4203            .spawn_named_child(root.cell().clone(), "supervised_child", TestActor)
4204            .unwrap();
4205        let child_cell = child.cell();
4206        let parent = child_cell.parent().expect("named child must have a parent");
4207        assert_eq!(parent.actor_id(), root.actor_id());
4208    }
4209
4210    /// Exercises AI-1 (see module doc).
4211    #[async_timed_test(timeout_secs = 30)]
4212    async fn test_spawn_unchanged() {
4213        let proc = Proc::local();
4214        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4215        let child = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
4216        assert_eq!(child.actor_id().name(), root.actor_id().name());
4217    }
4218
4219    /// Exercises AI-1 (see module doc).
4220    #[async_timed_test(timeout_secs = 30)]
4221    async fn test_spawn_with_name_different_names_different_pids() {
4222        let proc = Proc::local();
4223        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4224        let a = proc
4225            .spawn_named_child(root.cell().clone(), "controller_a", TestActor)
4226            .unwrap();
4227        let b = proc
4228            .spawn_named_child(root.cell().clone(), "controller_b", TestActor)
4229            .unwrap();
4230        assert_ne!(a.actor_id().pid(), b.actor_id().pid());
4231        assert_eq!(a.actor_id().name(), "controller_a");
4232        assert_eq!(b.actor_id().name(), "controller_b");
4233    }
4234
4235    /// Exercises AI-1 (see module doc).
4236    #[async_timed_test(timeout_secs = 30)]
4237    async fn test_spawn_with_name_no_child_overwrite() {
4238        let proc = Proc::local();
4239        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4240        let _a = proc
4241            .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
4242            .unwrap();
4243        let _b = proc
4244            .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
4245            .unwrap();
4246        let _c = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
4247        assert_eq!(root.cell().child_count(), 3);
4248    }
4249
4250    /// Exercises AI-1 (see module doc).
4251    #[async_timed_test(timeout_secs = 30)]
4252    async fn test_spawn_with_name_does_not_pollute_roots() {
4253        let proc = Proc::local();
4254        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4255        let _child = proc
4256            .spawn_named_child(root.cell().clone(), "foo", TestActor)
4257            .unwrap();
4258        // "foo" was used as a named child name but should NOT
4259        // prevent spawning a root actor with that name.
4260        let result = proc.spawn::<TestActor>("foo", TestActor);
4261        assert!(result.is_ok(), "named child should not pollute roots");
4262    }
4263
4264    /// Exercises AI-3 (see module doc).
4265    #[async_timed_test(timeout_secs = 30)]
4266    async fn test_ai3_controller_actor_ids_unique_across_parents_same_proc() {
4267        let proc = Proc::local();
4268        let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
4269        let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
4270
4271        // Simulate the correct pattern: include mesh identity in name.
4272        let ctrl_a = proc
4273            .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
4274            .unwrap();
4275        let ctrl_b = proc
4276            .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
4277            .unwrap();
4278
4279        assert_ne!(
4280            ctrl_a.actor_id(),
4281            ctrl_b.actor_id(),
4282            "controller ActorIds must be unique across parents"
4283        );
4284    }
4285
4286    /// Exercises AI-3 (see module doc).
4287    #[async_timed_test(timeout_secs = 30)]
4288    async fn test_ai3_no_controller_overwrite_in_parent_or_proc_maps() {
4289        let proc = Proc::local();
4290        let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
4291        let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
4292
4293        let ctrl_a = proc
4294            .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
4295            .unwrap();
4296        let ctrl_b = proc
4297            .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
4298            .unwrap();
4299
4300        // Both must be independently resolvable via the proc's instances.
4301        assert!(
4302            proc.get_instance(ctrl_a.actor_id()).is_some(),
4303            "ctrl_a must be resolvable"
4304        );
4305        assert!(
4306            proc.get_instance(ctrl_b.actor_id()).is_some(),
4307            "ctrl_b must be resolvable"
4308        );
4309        // Parents each see exactly one child.
4310        assert_eq!(parent_a.cell().child_count(), 1);
4311        assert_eq!(parent_b.cell().child_count(), 1);
4312    }
4313
4314    // Exercises FI-6 (see introspect module doc).
4315    #[async_timed_test(timeout_secs = 30)]
4316    async fn test_stopped_snapshot_has_no_failure_info() {
4317        let proc = Proc::local();
4318        let (_client, _client_handle) = proc.instance("client").unwrap();
4319
4320        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
4321        let actor_id = handle.actor_id().clone();
4322
4323        handle.drain_and_stop("test").unwrap();
4324        handle.await;
4325
4326        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
4327        let attrs: hyperactor_config::Attrs =
4328            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
4329        let status = attrs
4330            .get(crate::introspect::STATUS)
4331            .expect("must have status");
4332        assert!(
4333            status.starts_with("stopped"),
4334            "expected stopped, got: {}",
4335            status
4336        );
4337        assert!(
4338            attrs
4339                .get(crate::introspect::FAILURE_ERROR_MESSAGE)
4340                .is_none(),
4341            "stopped actor must not have failure attrs"
4342        );
4343    }
4344}