hyperactor/
proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! [`Proc`] is an addressable actor-runtime boundary.
10//!
11//! It owns actor lifecycle (spawn, run, terminate), routes messages
12//! to local actors, forwards messages for remote destinations, and
13//! hosts supervision state.
14//!
15//! It also stores bounded snapshots of terminated actors for
16//! post-mortem introspection.
17//!
18//! ## Client instance invariants (CI-*)
19//!
20//! - **CI-1 (client status):** `IntrospectMessage::Query` on an
21//!   introspectable instance returns `status: "client"` and
22//!   `actor_type: "()"` in attrs.
23//! - **CI-2 (snapshot on drop):** Dropping the returned `Instance<()>`
24//!   transitions its status to terminal, causing the introspect task
25//!   to store a terminated snapshot.
26//!
27//! ## Actor identity invariants (AI-*)
28//!
29//! - **AI-1 (named-child pid):** The pid of a named child must
30//!   remain in the parent's sibling pid domain. The name is
31//!   presentation only; the numeric pid is allocated from the
32//!   parent's counter, preserving supervision linkage.
33//! - **AI-3 (controller ActorId uniqueness):** Callers must ensure
34//!   the name is unique proc-wide. Two children with the same name
35//!   under different parents get distinct pids but the same name
36//!   prefix.
37
38use std::any::Any;
39use std::any::TypeId;
40use std::collections::HashMap;
41use std::fmt;
42use std::future::Future;
43use std::ops::Deref;
44use std::panic;
45use std::panic::AssertUnwindSafe;
46use std::panic::Location;
47use std::pin::Pin;
48use std::sync::Arc;
49use std::sync::OnceLock;
50use std::sync::RwLock;
51use std::sync::Weak;
52use std::sync::atomic::AtomicBool;
53use std::sync::atomic::AtomicU64;
54use std::sync::atomic::AtomicUsize;
55use std::sync::atomic::Ordering;
56use std::time::Duration;
57use std::time::Instant;
58use std::time::SystemTime;
59
60use async_trait::async_trait;
61use dashmap::DashMap;
62use dashmap::mapref::entry::Entry;
63use dashmap::mapref::multiple::RefMulti;
64use futures::FutureExt;
65use hyperactor_config::Flattrs;
66use hyperactor_telemetry::ActorStatusEvent;
67use hyperactor_telemetry::generate_actor_status_event_id;
68use hyperactor_telemetry::hash_to_u64;
69use hyperactor_telemetry::notify_actor_status_changed;
70use hyperactor_telemetry::notify_message;
71use hyperactor_telemetry::notify_message_status;
72use hyperactor_telemetry::recorder::Recording;
73use tokio::sync::mpsc;
74use tokio::sync::watch;
75use tokio::task::JoinHandle;
76use tracing::Instrument;
77use tracing::Span;
78use typeuri::Named;
79use uuid::Uuid;
80use wirevalue::TypeInfo;
81
82use crate as hyperactor;
83use crate::Actor;
84use crate::Handler;
85use crate::Message;
86use crate::RemoteMessage;
87use crate::actor::ActorError;
88use crate::actor::ActorErrorKind;
89use crate::actor::ActorHandle;
90use crate::actor::ActorStatus;
91use crate::actor::Binds;
92use crate::actor::HandlerInfo;
93use crate::actor::Referable;
94use crate::actor::RemoteHandles;
95use crate::actor::Signal;
96use crate::actor_local::ActorLocalStorage;
97use crate::channel;
98use crate::channel::ChannelAddr;
99use crate::channel::ChannelError;
100use crate::channel::ChannelTransport;
101use crate::config;
102use crate::context;
103use crate::context::Mailbox as _;
104use crate::introspect::IntrospectMessage;
105use crate::introspect::IntrospectResult;
106use crate::mailbox::BoxedMailboxSender;
107use crate::mailbox::DeliveryError;
108use crate::mailbox::DialMailboxRouter;
109use crate::mailbox::IntoBoxedMailboxSender as _;
110use crate::mailbox::Mailbox;
111use crate::mailbox::MailboxMuxer;
112use crate::mailbox::MailboxSender;
113use crate::mailbox::MailboxServer as _;
114use crate::mailbox::MessageEnvelope;
115use crate::mailbox::OncePortHandle;
116use crate::mailbox::OncePortReceiver;
117use crate::mailbox::PanickingMailboxSender;
118use crate::mailbox::PortHandle;
119use crate::mailbox::PortReceiver;
120use crate::mailbox::Undeliverable;
121use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
122use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
123use crate::metrics::ACTOR_MESSAGES_RECEIVED;
124use crate::ordering::OrderedSender;
125use crate::ordering::OrderedSenderError;
126use crate::ordering::SEQ_INFO;
127use crate::ordering::SeqInfo;
128use crate::ordering::Sequencer;
129use crate::ordering::ordered_channel;
130use crate::panic_handler;
131use crate::reference;
132use crate::supervision::ActorSupervisionEvent;
133
/// Process-wide counter used to mint distinct local ranks for [`Proc::local`].
///
/// Each call to [`Proc::local`] takes the next value with `fetch_add`; that
/// value becomes the `local_{rank}` suffix of the new proc's name.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
136
/// A proc instance is the runtime managing a single proc in Hyperactor.
/// It is responsible for spawning actors in the proc, multiplexing messages
/// to/within actors in the proc, and providing fallback routing to external
/// procs.
///
/// Procs are also responsible for maintaining the local supervision hierarchy.
///
/// `Proc` is a cheap handle: cloning it clones an `Arc`, and every clone
/// shares the same underlying `ProcState`.
#[derive(Clone)]
pub struct Proc {
    /// State shared by all clones of this proc.
    inner: Arc<ProcState>,
}
147
148impl fmt::Debug for Proc {
149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150        f.debug_struct("Proc")
151            .field("proc_id", &self.inner.proc_id)
152            .finish()
153    }
154}
155
/// State shared by every clone of a [`Proc`] (held behind `Proc::inner`).
///
/// Dropping the last clone drops this value, which logs the
/// `ProcStatus`/`Dropped` event.
struct ProcState {
    /// The proc's id. This should be globally unique, but is not (yet)
    /// for local-only procs.
    proc_id: reference::ProcId,

    /// A muxer instance that has entries for every actor managed by
    /// the proc. Mailboxes are registered here by `Proc::bind_mailbox`.
    proc_muxer: MailboxMuxer,

    /// Sender used to forward messages outside of the proc.
    forwarder: BoxedMailboxSender,

    /// Per-name atomic index allocator. Used by `allocate_root_id`
    /// (index 0, counter starts at 1) and `allocate_child_id`
    /// (increments the parent's counter). Each root name gets its
    /// own independent counter.
    roots: DashMap<String, AtomicUsize>,

    /// All actor instances in this proc, keyed by actor id.
    ///
    /// Values are weak references, so an entry can outlive its
    /// `InstanceCell`; readers must handle a failed `upgrade()`.
    instances: DashMap<reference::ActorId, WeakInstanceCell>,

    /// Snapshots of terminated actors for post-mortem introspection.
    /// Populated by the introspect task just before it exits on
    /// terminal status. Bounded by
    /// [`config::TERMINATED_SNAPSHOT_RETENTION`].
    terminated_snapshots: DashMap<reference::ActorId, crate::introspect::IntrospectResult>,

    /// Used by root actors to send events to the actor coordinating
    /// supervision of root actors in this proc. Set at most once, via
    /// `Proc::set_supervision_coordinator`.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,
}
187
188impl Drop for ProcState {
189    fn drop(&mut self) {
190        // We only want log ProcStatus::Dropped when ProcState is dropped,
191        // rather than Proc is dropped. This is because we need to wait for
192        // Proc::inner's ref count becomes 0.
193        tracing::info!(
194            proc_id = %self.proc_id,
195            name = "ProcStatus",
196            status = "Dropped"
197        );
198    }
199}
200
/// Structured return type for [`Proc::actor_instance`].
///
/// Groups the instance, handle, and per-channel receivers that an
/// "inverted" actor caller needs to drive the actor manually. No actor
/// loop consumes these receivers; the caller is responsible for polling
/// them.
pub struct ActorInstance<A: Actor> {
    /// The actor instance (used for sending/receiving messages, spawning children, etc.).
    pub instance: Instance<A>,
    /// Handle to the actor (used for lifecycle control and port access).
    pub handle: ActorHandle<A>,
    /// Supervision events delivered to this actor.
    pub supervision: PortReceiver<ActorSupervisionEvent>,
    /// Control signals for the actor.
    pub signal: PortReceiver<Signal>,
    /// Primary work queue for handler dispatch.
    pub work: mpsc::UnboundedReceiver<WorkCell<A>>,
}
217
218impl Proc {
219    /// Create a pre-configured proc with the given proc id and forwarder.
220    pub fn configured(proc_id: reference::ProcId, forwarder: BoxedMailboxSender) -> Self {
221        tracing::info!(
222            proc_id = %proc_id,
223            name = "ProcStatus",
224            status = "Created"
225        );
226
227        Self {
228            inner: Arc::new(ProcState {
229                proc_id,
230                proc_muxer: MailboxMuxer::new(),
231                forwarder,
232                roots: DashMap::new(),
233                instances: DashMap::new(),
234                terminated_snapshots: DashMap::new(),
235                supervision_coordinator_port: OnceLock::new(),
236            }),
237        }
238    }
239
240    /// Create a new direct-addressed proc.
241    pub fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
242        let (addr, rx) = channel::serve(addr)?;
243        let proc_id = reference::ProcId::with_name(addr, name);
244        let proc = Self::configured(proc_id, DialMailboxRouter::new().into_boxed());
245        proc.clone().serve(rx);
246        Ok(proc)
247    }
248
249    /// Set the supervision coordinator's port for this proc. Return Err if it is
250    /// already set.
251    pub fn set_supervision_coordinator(
252        &self,
253        port: PortHandle<ActorSupervisionEvent>,
254    ) -> Result<(), anyhow::Error> {
255        self.state()
256            .supervision_coordinator_port
257            .set(port)
258            .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
259    }
260
261    /// Handle a supervision event received by the proc. Attempt to forward it to the
262    /// supervision coordinator port if one is set, otherwise crash the process.
263    pub fn handle_unhandled_supervision_event(
264        &self,
265        cx: &impl context::Actor,
266        event: ActorSupervisionEvent,
267    ) {
268        let result = match self.state().supervision_coordinator_port.get() {
269            Some(port) => port.send(cx, event.clone()).map_err(anyhow::Error::from),
270            None => Err(anyhow::anyhow!(
271                "coordinator port is not set for proc {}",
272                self.proc_id(),
273            )),
274        };
275        if let Err(err) = result {
276            tracing::error!(
277                "proc {}: could not propagate supervision event {} due to error: {:?}: crashing",
278                self.proc_id(),
279                event,
280                err
281            );
282
283            std::process::exit(1);
284        }
285    }
286
287    /// Create a new local-only proc. This proc is not allowed to forward messages
288    /// outside of the proc itself.
289    pub fn local() -> Self {
290        let rank = NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed);
291        let addr = ChannelAddr::any(ChannelTransport::Local);
292        let proc_id = reference::ProcId::unique(addr, format!("local_{}", rank));
293        Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
294    }
295
296    /// The proc's ID.
297    pub fn proc_id(&self) -> &reference::ProcId {
298        &self.state().proc_id
299    }
300
301    /// Shared sender used by the proc to forward messages to remote
302    /// destinations.
303    pub fn forwarder(&self) -> &BoxedMailboxSender {
304        &self.inner.forwarder
305    }
306
307    /// Convenience accessor for state.
308    fn state(&self) -> &ProcState {
309        self.inner.as_ref()
310    }
311
312    /// A global runtime proc used by this crate.
313    pub(crate) fn runtime() -> &'static Proc {
314        static RUNTIME_PROC: OnceLock<Proc> = OnceLock::new();
315        RUNTIME_PROC.get_or_init(|| {
316            let addr = ChannelAddr::any(ChannelTransport::Local);
317            let proc_id = reference::ProcId::unique(addr, "hyperactor_runtime");
318            Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
319        })
320    }
321
322    /// Attach a mailbox to the proc with the provided root name.
323    pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
324        let actor_id: reference::ActorId = self.allocate_root_id(name)?;
325        Ok(self.bind_mailbox(actor_id))
326    }
327
328    /// Attach a mailbox to the proc as a child actor.
329    pub fn attach_child(&self, parent_id: &reference::ActorId) -> Result<Mailbox, anyhow::Error> {
330        let actor_id: reference::ActorId = self.allocate_child_id(parent_id)?;
331        Ok(self.bind_mailbox(actor_id))
332    }
333
334    /// Bind a mailbox to the proc.
335    fn bind_mailbox(&self, actor_id: reference::ActorId) -> Mailbox {
336        let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
337
338        // TODO: T210748165 tie the muxer entry to the lifecycle of the mailbox held
339        // by the caller. This will likely require a weak reference.
340        self.state().proc_muxer.bind_mailbox(mbox.clone());
341        mbox
342    }
343
344    /// Attach a mailbox to the proc with the provided root name, and bind an [`ActorRef`].
345    /// This is intended only for testing, and will be replaced by simpled utilities.
346    pub fn attach_actor<R, M>(
347        &self,
348        name: &str,
349    ) -> Result<(Instance<()>, reference::ActorRef<R>, PortReceiver<M>), anyhow::Error>
350    where
351        M: RemoteMessage,
352        R: Referable + RemoteHandles<M>,
353    {
354        let (instance, _handle) = self.instance(name)?;
355        let (_handle, rx) = instance.bind_actor_port::<M>();
356        let actor_ref = reference::ActorRef::attest(instance.self_id().clone());
357        Ok((instance, actor_ref, rx))
358    }
359
360    /// Spawn a named (root) actor on this proc. The name of the actor must be
361    /// unique.
362    pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
363        let actor_id = self.allocate_root_id(name)?;
364        self.spawn_inner(actor_id, actor, None)
365    }
366
367    /// Common spawn logic for both root and child actors.
368    /// Creates a tracing span with the correct actor_id before starting the actor.
369    #[hyperactor::instrument(fields(actor_id = actor_id.to_string(), actor_name = actor_id.name(), actor_type = std::any::type_name::<A>()))]
370    fn spawn_inner<A: Actor>(
371        &self,
372        actor_id: reference::ActorId,
373        actor: A,
374        parent: Option<InstanceCell>,
375    ) -> Result<ActorHandle<A>, anyhow::Error> {
376        let (instance, receivers) = Instance::new(self.clone(), actor_id, false, parent);
377        Ok(instance.start(actor, receivers))
378    }
379
380    /// Create a lightweight client instance (no actor loop, no
381    /// introspect task).  This is safe to call outside a Tokio
382    /// runtime — unlike [`actor_instance`], it never calls
383    /// `tokio::spawn`.
384    pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
385        let actor_id = self.allocate_root_id(name)?;
386        let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, None);
387        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
388        instance.change_status(ActorStatus::Client);
389        Ok((instance, handle))
390    }
391
392    /// Create a lightweight client instance that handles
393    /// [`IntrospectMessage`].
394    ///
395    /// Like [`instance`](Self::instance), this creates a client-mode
396    /// instance with no actor message loop. Unlike `instance`, it
397    /// spawns a dedicated introspect task, so the instance responds
398    /// to `IntrospectMessage::Query` and is visible and navigable in
399    /// admin tooling such as the mesh TUI.
400    ///
401    /// See CI-1, CI-2 in module doc.
402    ///
403    /// Requires an active Tokio runtime (calls `tokio::spawn`).
404    pub fn introspectable_instance(
405        &self,
406        name: &str,
407    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
408        let actor_id = self.allocate_root_id(name)?;
409        let (instance, receivers) = Instance::new(self.clone(), actor_id, false, None);
410        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
411        instance.change_status(ActorStatus::Client);
412        tokio::spawn(crate::introspect::serve_introspect(
413            instance.inner.cell.clone(),
414            instance.inner.mailbox.clone(),
415            receivers.introspect,
416        ));
417        Ok((instance, handle))
418    }
419
420    /// Create and return an actor instance, its handle, and its
421    /// receivers. This allows actors to be "inverted": the caller can
422    /// use the returned [`Instance`] to send and receive messages,
423    /// launch child actors, etc. The actor itself does not handle any
424    /// messages unless driven by the caller.
425    pub fn actor_instance<A: Actor>(&self, name: &str) -> Result<ActorInstance<A>, anyhow::Error> {
426        let actor_id = self.allocate_root_id(name)?;
427        let span = tracing::debug_span!(
428            "actor_instance",
429            actor_name = name,
430            actor_type = std::any::type_name::<A>(),
431            actor_id = actor_id.to_string(),
432        );
433        let _guard = span.enter();
434        let (instance, receivers) = Instance::new(self.clone(), actor_id.clone(), false, None);
435        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
436        instance.change_status(ActorStatus::Client);
437
438        let introspect_cell = instance.inner.cell.clone();
439        let introspect_mailbox = instance.inner.mailbox.clone();
440        tokio::spawn(crate::introspect::serve_introspect(
441            introspect_cell,
442            introspect_mailbox,
443            receivers.introspect,
444        ));
445
446        let (signal_rx, supervision_rx) = receivers.actor_loop.unwrap();
447        Ok(ActorInstance {
448            instance,
449            handle,
450            supervision: supervision_rx,
451            signal: signal_rx,
452            work: receivers.work,
453        })
454    }
455
456    /// Traverse all actor trees in this proc, starting from root actors (pid=0).
457    ///
458    /// **Caution:** This holds DashMap shard read locks while doing
459    /// `Weak::upgrade()` and recursively walking the actor tree per
460    /// entry. Under rapid actor churn, this causes convoy starvation
461    /// with concurrent `insert`/`remove` operations. Prefer
462    /// `all_instance_keys()` with point lookups if you only need
463    /// actor IDs. Currently unused in production code.
464    pub fn traverse<F>(&self, f: &mut F)
465    where
466        F: FnMut(&InstanceCell, usize),
467    {
468        for entry in self.state().instances.iter() {
469            if entry.key().pid() == 0 {
470                if let Some(cell) = entry.value().upgrade() {
471                    cell.traverse(f);
472                }
473            }
474        }
475    }
476
477    /// Look up an instance by ActorId.
478    pub fn get_instance(&self, actor_id: &reference::ActorId) -> Option<InstanceCell> {
479        self.state()
480            .instances
481            .get(actor_id)
482            .and_then(|weak| weak.upgrade())
483    }
484
485    /// Returns the ActorIds of all root actors (pid=0) in this proc.
486    ///
487    /// **Caution:** This iterates the full DashMap under shard read
488    /// locks. The per-entry work is lightweight (key filter + clone),
489    /// but under very rapid churn the iteration can still contend
490    /// with concurrent writes. Prefer `all_instance_keys()` with a
491    /// post-filter if this becomes a hot path. Currently unused in
492    /// production code.
493    pub fn root_actor_ids(&self) -> Vec<reference::ActorId> {
494        self.state()
495            .instances
496            .iter()
497            .filter(|entry| entry.key().pid() == 0)
498            .map(|entry| entry.key().clone())
499            .collect()
500    }
501
502    /// Returns the ActorIds of all live actors in this proc, including
503    /// dynamically spawned children.
504    ///
505    /// An actor is considered live if its weak reference is
506    /// upgradeable and its status is not terminal. This excludes
507    /// actors whose `InstanceCell` has been dropped and actors that
508    /// have stopped or failed but whose Arc is still held (e.g. by
509    /// the introspect task during teardown).
510    pub fn all_actor_ids(&self) -> Vec<reference::ActorId> {
511        self.state()
512            .instances
513            .iter()
514            .filter(|entry| {
515                entry
516                    .value()
517                    .upgrade()
518                    .is_some_and(|cell| !cell.status().borrow().is_terminal())
519            })
520            .map(|entry| entry.key().clone())
521            .collect()
522    }
523
524    /// Snapshot all instance keys from the DashMap without inspecting
525    /// values. Each shard read lock is held only long enough to clone
526    /// the key — no `Weak::upgrade()`, no `watch::borrow()`, no
527    /// `is_terminal()` check. This minimises shard lock hold time to
528    /// avoid convoy starvation with concurrent `insert`/`remove`
529    /// operations during rapid actor churn.
530    ///
531    /// The returned list may include actors that are terminal or
532    /// whose `WeakInstanceCell` no longer upgrades. Callers should
533    /// tolerate stale entries (e.g. by handling "not found" on
534    /// subsequent per-actor lookups).
535    pub fn all_instance_keys(&self) -> Vec<reference::ActorId> {
536        self.state()
537            .instances
538            .iter()
539            .map(|entry| entry.key().clone())
540            .collect()
541    }
542
543    /// Look up a terminated actor's snapshot by ID.
544    pub fn terminated_snapshot(
545        &self,
546        actor_id: &reference::ActorId,
547    ) -> Option<crate::introspect::IntrospectResult> {
548        self.state()
549            .terminated_snapshots
550            .get(actor_id)
551            .map(|e| e.value().clone())
552    }
553
554    /// Return all terminated actor IDs currently retained.
555    pub fn all_terminated_actor_ids(&self) -> Vec<reference::ActorId> {
556        self.state()
557            .terminated_snapshots
558            .iter()
559            .map(|e| e.key().clone())
560            .collect()
561    }
562
563    /// Create a child instance. Called from `Instance`.
564    fn child_instance(
565        &self,
566        parent: InstanceCell,
567    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
568        let actor_id = self.allocate_child_id(parent.actor_id())?;
569        let _ = tracing::debug_span!(
570            "child_actor_instance",
571            parent_actor_id = %parent.actor_id(),
572            actor_type = std::any::type_name::<()>(),
573            actor_id = %actor_id,
574        );
575
576        let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, Some(parent));
577        // Client-mode instance: no actor loop, no introspect task.
578        // Receivers are intentionally dropped.
579        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
580        instance.change_status(ActorStatus::Client);
581        Ok((instance, handle))
582    }
583
584    /// Spawn a child actor from the provided parent on this proc. The parent actor
585    /// must already belong to this proc, a fact which is asserted in code.
586    ///
587    /// When spawn_child returns, the child has an associated cell and is linked
588    /// with its parent.
589    pub(crate) fn spawn_child<A: Actor>(
590        &self,
591        parent: InstanceCell,
592        actor: A,
593    ) -> Result<ActorHandle<A>, anyhow::Error> {
594        let actor_id = self.allocate_child_id(parent.actor_id())?;
595        self.spawn_inner(actor_id, actor, Some(parent))
596    }
597
598    /// Spawn a named child actor. Same as `spawn_child` but the child
599    /// gets a descriptive name instead of inheriting the parent's.
600    /// Supervision linkage to parent is preserved.
601    pub(crate) fn spawn_named_child<A: Actor>(
602        &self,
603        parent: InstanceCell,
604        name: &str,
605        actor: A,
606    ) -> Result<ActorHandle<A>, anyhow::Error> {
607        let actor_id = self.allocate_named_child_id(parent.actor_id(), name)?;
608        self.spawn_inner(actor_id, actor, Some(parent))
609    }
610
611    /// Call `abort` on the `JoinHandle` associated with the given
612    /// root actor. If successful return `Some(root.clone())` else
613    /// `None`.
614    pub fn abort_root_actor(
615        &self,
616        root: &reference::ActorId,
617        this_handle: Option<&JoinHandle<()>>,
618    ) -> Option<impl Future<Output = reference::ActorId>> {
619        self.state()
620            .instances
621            .get(root)
622            .into_iter()
623            .flat_map(|e| e.upgrade())
624            .map(|cell| {
625                let r1 = root.clone();
626                let r2 = root.clone();
627                // If abort_root_actor was called from inside an actor task, we don't want to abort that actor's task yet.
628                let skip_abort = this_handle.is_some_and(|this_h| {
629                    cell.inner
630                        .actor_task_handle
631                        .get()
632                        .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
633                });
634                // `Instance::start()` is infallible and should
635                // complete quickly, so calling `wait()` on `actor_task_handle`
636                // should be safe (i.e., not hang forever).
637                async move {
638                    tokio::task::spawn_blocking(move || {
639                        if !skip_abort {
640                            let h = cell.inner.actor_task_handle.wait();
641                            tracing::debug!("{}: aborting {:?}", r1, h);
642                            h.abort();
643                        }
644                    })
645                    .await
646                    .unwrap();
647                    r2
648                }
649            })
650            .next()
651    }
652
653    /// Signals to a root actor to stop,
654    /// returning a status observer if successful.
655    pub fn stop_actor(
656        &self,
657        actor_id: &reference::ActorId,
658        reason: String,
659    ) -> Option<watch::Receiver<ActorStatus>> {
660        if let Some(entry) = self.state().instances.get(actor_id) {
661            match entry.value().upgrade() {
662                None => None, // the actor's cell has been dropped
663                Some(cell) => {
664                    tracing::info!("sending stop signal to {}", cell.actor_id());
665                    if let Err(err) = cell.signal(Signal::DrainAndStop(reason)) {
666                        tracing::error!(
667                            "{}: failed to send stop signal to pid {}: {:?}",
668                            self.proc_id(),
669                            cell.pid(),
670                            err
671                        );
672                        None
673                    } else {
674                        Some(cell.status().clone())
675                    }
676                }
677            }
678        } else {
679            tracing::error!("no actor {} found in {}", actor_id, self.proc_id());
680            None
681        }
682    }
683
684    /// Stop the proc. Returns a pair of:
685    /// - the actors observed to stop;
686    /// - the actors not observed to stop when timeout.
687    ///
688    /// If `cx` is specified, it means this method was called from inside an actor
689    /// in which case we shouldn't wait for it to stop and need to delay aborting
690    /// its task.
691    pub async fn destroy_and_wait<A: Actor>(
692        &mut self,
693        timeout: Duration,
694        cx: Option<&Context<'_, A>>,
695        reason: &str,
696    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
697        self.destroy_and_wait_except_current::<A>(timeout, cx, false, reason)
698            .await
699    }
700
    /// Stop the proc. Returns a pair of:
    /// - the actors observed to stop;
    /// - the actors not observed to stop when timeout.
    ///
    /// Procedure:
    /// 1. Send `Signal::DrainAndStop(reason)` (via `stop_actor`) to every
    ///    root actor (pid 0) currently registered in this proc.
    /// 2. Wait concurrently, up to `timeout` per actor, for each signalled
    ///    actor's status to become terminal. The actor represented by `cx`,
    ///    if any, is not waited on.
    /// 3. Abort the tasks of every signalled actor not observed to stop
    ///    (via `abort_root_actor`). That call skips the task of `cx`'s actor.
    /// 4. If `cx` is given and `except_current` is false, abort `cx`'s own
    ///    task last.
    ///
    /// If `cx` is specified, it means this method was called from inside an actor
    /// in which case we shouldn't wait for it to stop and need to delay aborting
    /// its task.
    /// If except_current is true, don't stop the actor represented by "cx" at
    /// all.
    ///
    /// NOTE(review): step 1 still sends `DrainAndStop` to `cx`'s actor when
    /// it is a root actor, even if `except_current` is true. That actor is
    /// then reported in the "aborted" list although its task is not aborted.
    /// Confirm this matches the intent stated above.
    ///
    /// Only actors whose stop signal was sent successfully appear in either
    /// returned list. As written, this function always returns `Ok`.
    ///
    /// # Panics
    /// Panics if `cx` is given but its actor task handle is not yet set
    /// (the actor has not finished starting).
    #[hyperactor::instrument]
    pub async fn destroy_and_wait_except_current<A: Actor>(
        &mut self,
        timeout: Duration,
        cx: Option<&Context<'_, A>>,
        except_current: bool,
        reason: &str,
    ) -> Result<(Vec<reference::ActorId>, Vec<reference::ActorId>), anyhow::Error> {
        tracing::debug!("{}: proc stopping", self.proc_id());

        // When called from inside an actor, remember its task handle and id
        // so that actor is neither waited on nor aborted until the very end.
        let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
            (
                Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
                Some(cx.self_id())
            )
        });

        // Root ids are collected into a Vec first, so the map iteration has
        // finished before `stop_actor` looks each actor up again.
        let mut statuses = HashMap::new();
        for actor_id in self
            .state()
            .instances
            .iter()
            .filter(|entry| entry.key().pid() == 0)
            .map(|entry| entry.key().clone())
            .collect::<Vec<_>>()
        {
            if let Some(status) = self.stop_actor(&actor_id, reason.to_string()) {
                statuses.insert(actor_id, status);
            }
        }
        // NOTE(review): at this point stop signals have only been sent; the
        // actors may still be running.
        tracing::debug!("{}: proc stopped", self.proc_id());

        // Each wait yields `Some(actor_id)` unless it hits `timeout`. An
        // error from `wait_for` (status channel closed) also counts as
        // stopped, because only the timeout result is checked.
        let waits: Vec<_> = statuses
            .iter_mut()
            .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
            .map(|(actor_id, root)| {
                let actor_id = actor_id.clone();
                async move {
                    tokio::time::timeout(
                        timeout,
                        root.wait_for(|state: &ActorStatus| state.is_terminal()),
                    )
                    .await
                    .ok()
                    .map(|_| actor_id)
                }
            })
            .collect();

        let results = futures::future::join_all(waits).await;
        let stopped_actors: Vec<_> = results
            .iter()
            .filter_map(|actor_id| actor_id.as_ref())
            .cloned()
            .collect();
        // Everything signalled but not observed to stop is aborted. This
        // includes `cx`'s actor, whose abort `abort_root_actor` skips via
        // `this_handle`.
        let aborted_actors: Vec<_> = statuses
            .iter()
            .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
            .map(|(actor_id, _)| {
                let f = self.abort_root_actor(actor_id, this_handle);
                async move {
                    let _ = if let Some(f) = f { Some(f.await) } else { None };
                    // If `is_none(&_)` then the associated actor's
                    // instance cell was already dropped when we went
                    // to call `abort()` on the cell's task handle.

                    actor_id.clone()
                }
            })
            .collect();
        let aborted_actors = futures::future::join_all(aborted_actors).await;

        // Delayed self-abort: done last so the rest of this method can run
        // on the current actor's task first.
        if let Some(this_handle) = this_handle
            && let Some(this_actor_id) = this_actor_id
            && !except_current
        {
            tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
            this_handle.abort()
        };

        tracing::info!(
            "destroy_and_wait: {} actors stopped, {} actors aborted",
            stopped_actors.len(),
            aborted_actors.len()
        );
        Ok((stopped_actors, aborted_actors))
    }
797
798    /// Resolve an actor reference to a **live** actor on this proc.
799    ///
800    /// Returns `None` if:
801    /// - the actor was never spawned here,
802    /// - the actor's `InstanceCell` has been dropped, or
803    /// - the actor's status is terminal (stopped or failed).
804    ///
805    /// The terminal-status check guards a race window: the introspect
806    /// task (`serve_introspect`) holds a strong `InstanceCell` Arc
807    /// and drops it only after observing terminal status. Between the
808    /// actor reaching terminal and the introspect task reacting,
809    /// `upgrade()` on the weak ref succeeds even though the actor is
810    /// dead. The `is_terminal()` check closes that window. Once the
811    /// introspect task exits, the Arc is dropped and `upgrade()`
812    /// returns `None` on its own.
813    ///
814    /// Bounds:
815    /// - `R: Actor` — must be a real actor that can live in this
816    ///   proc.
817    /// - `R: Referable` — required because the input is an
818    ///   `ActorRef<R>`.
819    pub fn resolve_actor_ref<R: Actor + Referable>(
820        &self,
821        actor_ref: &reference::ActorRef<R>,
822    ) -> Option<ActorHandle<R>> {
823        let cell = self.inner.instances.get(actor_ref.actor_id())?.upgrade()?;
824        // An actor whose status is terminal has stopped processing
825        // messages even if its InstanceCell Arc is still alive (e.g.
826        // held by the introspect task during teardown).
827        if cell.status().borrow().is_terminal() {
828            return None;
829        }
830        cell.downcast_handle()
831    }
832
833    /// Create a root allocation in the proc.
834    fn allocate_root_id(&self, name: &str) -> Result<reference::ActorId, anyhow::Error> {
835        let name = name.to_string();
836        match self.state().roots.entry(name.to_string()) {
837            Entry::Vacant(entry) => {
838                entry.insert(AtomicUsize::new(1));
839            }
840            Entry::Occupied(_) => {
841                anyhow::bail!("an actor with name '{}' has already been spawned", name)
842            }
843        }
844        Ok(reference::ActorId::new(
845            self.state().proc_id.clone(),
846            name.to_string(),
847            0,
848        ))
849    }
850
851    /// Create a child allocation in the proc.
852    #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
853    pub(crate) fn allocate_child_id(
854        &self,
855        parent_id: &reference::ActorId,
856    ) -> Result<reference::ActorId, anyhow::Error> {
857        assert_eq!(*parent_id.proc_id(), self.state().proc_id);
858        let pid = match self.state().roots.get(parent_id.name()) {
859            None => anyhow::bail!(
860                "no actor named {} in proc {}",
861                parent_id.name(),
862                self.state().proc_id
863            ),
864            Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
865        };
866        Ok(parent_id.child_id(pid))
867    }
868
869    /// Allocate an actor ID with a custom name on this proc.
870    ///
871    /// See AI-1 (named-child pid) and AI-3 (controller ActorId
872    /// uniqueness) in module doc.
873    pub(crate) fn allocate_named_child_id(
874        &self,
875        parent_id: &reference::ActorId,
876        name: &str,
877    ) -> Result<reference::ActorId, anyhow::Error> {
878        let inherited = self.allocate_child_id(parent_id)?;
879        Ok(reference::ActorId::new(
880            inherited.proc_id().clone(),
881            name,
882            inherited.pid(),
883        ))
884    }
885
886    /// Downgrade to a weak reference that doesn't prevent the proc from being dropped.
887    pub fn downgrade(&self) -> WeakProc {
888        WeakProc::new(self)
889    }
890}
891
892#[async_trait]
893impl MailboxSender for Proc {
894    fn post_unchecked(
895        &self,
896        envelope: MessageEnvelope,
897        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
898    ) {
899        if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
900            self.state().proc_muxer.post(envelope, return_handle)
901        } else {
902            self.state().forwarder.post(envelope, return_handle)
903        }
904    }
905}
906
/// A weak reference to a Proc that doesn't prevent it from being dropped.
///
/// Wraps a `Weak<ProcState>`; call [`WeakProc::upgrade`] to obtain a
/// strong [`Proc`] while the proc is still alive.
#[derive(Clone, Debug)]
pub struct WeakProc(Weak<ProcState>);
910
911impl WeakProc {
912    fn new(proc: &Proc) -> Self {
913        Self(Arc::downgrade(&proc.inner))
914    }
915
916    /// Upgrade to a strong Proc reference, if the proc is still alive.
917    pub fn upgrade(&self) -> Option<Proc> {
918        self.0.upgrade().map(|inner| Proc { inner })
919    }
920}
921
922#[async_trait]
923impl MailboxSender for WeakProc {
924    fn post_unchecked(
925        &self,
926        envelope: MessageEnvelope,
927        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
928    ) {
929        match self.upgrade() {
930            Some(proc) => proc.post(envelope, return_handle),
931            None => envelope.undeliverable(
932                DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
933                return_handle,
934            ),
935        }
936    }
937}
938
/// Represents a single work item used by the instance to dispatch to
/// actor handlers. Specifically, this enables handler polymorphism.
///
/// The wrapped closure is called at most once (`FnOnce`) with the actor
/// and its [`Instance`], and returns a boxed `Send` future that borrows
/// both for the same lifetime `'a`.
pub struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
952
953impl<A: Actor + Send> WorkCell<A> {
954    /// Create a new WorkCell from a concrete function (closure).
955    fn new(
956        f: impl for<'a> FnOnce(
957            &'a mut A,
958            &'a Instance<A>,
959        )
960            -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
961        + Send
962        + Sync
963        + 'static,
964    ) -> Self {
965        Self(Box::new(f))
966    }
967
968    /// Handle the message represented by this work cell.
969    pub fn handle<'a>(
970        self,
971        actor: &'a mut A,
972        instance: &'a Instance<A>,
973    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
974        (self.0)(actor, instance)
975    }
976}
977
/// Context for a message currently being handled by an Instance.
///
/// Bundles a borrow of the handling [`Instance`] with the message's
/// headers. It derefs to `Instance<A>`, so instance methods can be
/// called on it directly.
pub struct Context<'a, A: Actor> {
    instance: &'a Instance<A>,
    headers: Flattrs,
}
983
984impl<'a, A: Actor> Context<'a, A> {
985    /// Construct a new Context.
986    pub fn new(instance: &'a Instance<A>, headers: Flattrs) -> Self {
987        Self { instance, headers }
988    }
989
990    /// Get a reference to the message headers.
991    pub fn headers(&self) -> &Flattrs {
992        &self.headers
993    }
994}
995
996impl<A: Actor> Deref for Context<'_, A> {
997    type Target = Instance<A>;
998
999    fn deref(&self) -> &Self::Target {
1000        self.instance
1001    }
1002}
1003
/// An actor instance. This is responsible for managing a running actor, including
/// its full lifecycle, supervision, signal management, etc. Instances can represent
/// a managed actor or a "client" actor that has joined the proc.
///
/// The handle itself is a thin wrapper around a shared
/// `Arc<InstanceState<A>>`.
pub struct Instance<A: Actor> {
    inner: Arc<InstanceState<A>>,
}
1010
1011impl<A: Actor> fmt::Debug for Instance<A> {
1012    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1013        f.debug_struct("Instance").field("inner", &"..").finish()
1014    }
1015}
1016
/// Shared state behind an [`Instance`], held through the `Arc` in
/// `Instance::inner`.
///
/// NOTE(review): this type implements `Drop`. In Rust, `Drop::drop` runs
/// before the fields are dropped, and the fields are then dropped in
/// declaration order. Reordering the fields below therefore changes
/// their drop order.
struct InstanceState<A: Actor> {
    /// The proc that owns this instance.
    proc: Proc,

    /// The instance cell that manages instance hierarchy.
    cell: InstanceCell,

    /// The mailbox associated with the actor.
    mailbox: Mailbox,

    /// The actor's ports; the same `Arc` is also handed to the
    /// `InstanceCell` when the instance is created.
    ports: Arc<Ports<A>>,

    /// A watch for communicating the actor's state.
    status_tx: watch::Sender<ActorStatus>,

    /// This instance's globally unique ID.
    id: Uuid,

    /// Used to assign sequence numbers for messages sent from this actor.
    sequencer: Sequencer,

    /// Per-instance local storage.
    instance_locals: ActorLocalStorage,
}
1041
1042impl<A: Actor> InstanceState<A> {
1043    fn self_id(&self) -> &reference::ActorId {
1044        self.mailbox.actor_id()
1045    }
1046}
1047
1048impl<A: Actor> Drop for InstanceState<A> {
1049    fn drop(&mut self) {
1050        self.status_tx.send_if_modified(|status| {
1051            if status.is_terminal() {
1052                false
1053            } else {
1054                tracing::info!(
1055                    name = "ActorStatus",
1056                    actor_id = %self.self_id(),
1057                    actor_name = self.self_id().name(),
1058                    status = "Stopped",
1059                    prev_status = status.arm().unwrap_or("unknown"),
1060                    "instance is dropped",
1061                );
1062                *status = ActorStatus::Stopped("instance is dropped".into());
1063                true
1064            }
1065        });
1066    }
1067}
1068
/// Receivers created by [`Instance::new`] that must be threaded to
/// their respective consumers (actor loop, introspect task, etc.).
///
/// `Instance::start` consumes all three fields: it hands `introspect`
/// to the introspect task, and `actor_loop` and `work` to the actor's
/// serve loop.
///
/// # Invariant
///
/// See S10 in `introspect` module doc.
pub struct InstanceReceivers<A: Actor> {
    /// Signal and supervision receivers for the actor loop. `None`
    /// for detached/client instances that don't run an actor loop.
    actor_loop: Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
    /// Work queue for dispatching messages to actor handlers.
    work: mpsc::UnboundedReceiver<WorkCell<A>>,
    /// Introspect message receiver for the dedicated introspect task.
    introspect: PortReceiver<IntrospectMessage>,
}
1084
1085impl<A: Actor> Instance<A> {
    /// Create a new actor instance in Created state.
    ///
    /// Sets up the instance's mailbox, work queue, ports, status watch
    /// and [`InstanceCell`]. Returns the instance together with the
    /// receivers its consumers need (see [`InstanceReceivers`]).
    ///
    /// - `detached`: when `true`, no signal or supervision ports are
    ///   opened, and `InstanceReceivers::actor_loop` is `None`.
    /// - `parent`: the parent cell, if any, which is passed on to
    ///   `InstanceCell::new`.
    ///
    /// # Panics
    ///
    /// Panics if opening the signal or introspect message port fails.
    fn new(
        proc: Proc,
        actor_id: reference::ActorId,
        detached: bool,
        parent: Option<InstanceCell>,
    ) -> (Self, InstanceReceivers<A>) {
        // Set up messaging. The mailbox sends through a `WeakProc`, so it
        // does not by itself keep the proc alive.
        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
        // Work queue feeding the actor loop. Global config decides whether
        // the destination reordering buffer is enabled.
        let (work_tx, work_rx) = ordered_channel(
            actor_id.to_string(),
            hyperactor_config::global::get(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER),
        );
        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
        // Register the mailbox with the proc's local muxer, which
        // `MailboxSender for Proc` uses for same-proc destinations.
        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);

        // Use the registered type info when available; otherwise fall back
        // to the Rust type name.
        let actor_type = match TypeInfo::of::<A>() {
            Some(info) => ActorType::Named(info),
            None => ActorType::Anonymous(std::any::type_name::<A>()),
        };
        // Detached/client instances run no actor loop, so they get no
        // signal or supervision ports.
        // NOTE(review): ports are opened in a fixed order here (signal,
        // supervision, then introspect below). If port ids are assigned
        // sequentially, reordering these calls would change them; confirm
        // before moving any of them.
        let actor_loop_ports = if detached {
            None
        } else {
            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
            let (supervision_port, supervision_receiver) = mailbox.open_port();
            Some((
                (signal_port, supervision_port),
                (signal_receiver, supervision_receiver),
            ))
        };

        // Split into the port handles (for the cell) and the receivers
        // (returned to the caller).
        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();

        // Introspect port: a separate channel handled by a dedicated
        // tokio task (not the actor's message loop). Pre-registered
        // so Ports::get finds the Occupied entry and skips WorkCell
        // creation. bind_actor_port() registers in the mailbox
        // dispatch table at IntrospectMessage::port().
        //
        // Exercises S3, S4, S9 (see introspect module doc).
        let (introspect_port, introspect_receiver) =
            ports.open_message_port::<IntrospectMessage>().unwrap();
        introspect_port.bind_actor_port();

        let cell = InstanceCell::new(
            actor_id,
            actor_type,
            proc.clone(),
            actor_loop,
            status_rx,
            parent,
            ports.clone(),
        );
        let instance_id = Uuid::now_v7();
        let inner = Arc::new(InstanceState {
            proc,
            cell,
            mailbox,
            ports,
            status_tx,
            sequencer: Sequencer::new(instance_id),
            id: instance_id,
            instance_locals: ActorLocalStorage::new(),
        });
        (
            Self { inner },
            InstanceReceivers {
                actor_loop: actor_loop_receivers,
                work: work_rx,
                introspect: introspect_receiver,
            },
        )
    }
1160
1161    /// Notify subscribers of a change in the actors status and bump counters with the duration which
1162    /// the last status was active for.
1163    #[track_caller]
1164    fn change_status(&self, new: ActorStatus) {
1165        let old = self.inner.status_tx.send_replace(new.clone());
1166        // 2 cases are allowed:
1167        // * non-terminal -> non-terminal
1168        // * non-terminal -> terminal
1169        // terminal -> terminal is not allowed unless it is the same status (no-op).
1170        // terminal -> non-terminal is never allowed.
1171        assert!(
1172            !old.is_terminal() && !new.is_terminal()
1173                || !old.is_terminal() && new.is_terminal()
1174                || old == new,
1175            "actor changing status illegally, only allow non-terminal -> non-terminal \
1176            and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
1177            self.self_id(),
1178            old,
1179            new
1180        );
1181        // Actor status changes between Idle and Processing when handling every
1182        // message. It creates too many logs if we want to log these 2 states.
1183        // Also, sometimes the actor transitions from Processing -> Processing.
1184        // Therefore we skip the status changes between them.
1185        if !((old.is_idle() && new.is_processing())
1186            || (old.is_processing() && new.is_idle())
1187            || old == new)
1188        {
1189            let new_status = new.arm().unwrap_or("unknown");
1190            let change_reason = match new {
1191                ActorStatus::Failed(reason) => reason.to_string(),
1192                _ => "".to_string(),
1193            };
1194            tracing::info!(
1195                name = "ActorStatus",
1196                actor_id = %self.self_id(),
1197                actor_name = self.self_id().name(),
1198                status = new_status,
1199                prev_status = old.arm().unwrap_or("unknown"),
1200                caller = %Location::caller(),
1201                change_reason,
1202            );
1203            let actor_id = hash_to_u64(self.self_id());
1204            notify_actor_status_changed(ActorStatusEvent {
1205                id: generate_actor_status_event_id(actor_id),
1206                timestamp: std::time::SystemTime::now(),
1207                actor_id,
1208                new_status: new_status.to_string(),
1209                reason: if change_reason.is_empty() {
1210                    None
1211                } else {
1212                    Some(change_reason)
1213                },
1214            });
1215        }
1216    }
1217
1218    fn is_terminal(&self) -> bool {
1219        self.inner.status_tx.borrow().is_terminal()
1220    }
1221
1222    fn is_stopping(&self) -> bool {
1223        self.inner.status_tx.borrow().is_stopping()
1224    }
1225
1226    /// This instance's actor ID.
1227    pub fn self_id(&self) -> &reference::ActorId {
1228        self.inner.self_id()
1229    }
1230
1231    /// Snapshot of this actor's introspection payload.
1232    ///
1233    /// Returns an [`IntrospectResult`] built from live [`InstanceCell`]
1234    /// state, without going through the actor message loop. This is
1235    /// safe to call from within a handler on the same actor (no
1236    /// self-send deadlock).
1237    ///
1238    /// The snapshot is best-effort: it reflects framework-owned state
1239    /// (status, message count, flight recorder, supervision children)
1240    /// at the instant of the call. `parent` is left as `None` —
1241    /// callers are responsible for setting topology context.
1242    ///
1243    /// Note: this acquires a write lock on the flight recorder spool
1244    /// and clones its contents. Suitable for occasional introspection
1245    /// requests, not for hot paths.
1246    pub fn introspect_payload(&self) -> crate::introspect::IntrospectResult {
1247        crate::introspect::live_actor_payload(&self.inner.cell)
1248    }
1249
1250    /// Publish domain-specific properties for introspection.
1251    ///
1252    /// Publish a complete Attrs bag for introspection. Replaces any
1253    /// previously published attrs.
1254    ///
1255    /// Debug builds assert that every key in the bag is tagged with
1256    /// the `INTROSPECT` meta-attribute.
1257    pub fn publish_attrs(&self, attrs: hyperactor_config::Attrs) {
1258        #[cfg(debug_assertions)]
1259        {
1260            use std::collections::HashSet;
1261            use std::sync::OnceLock;
1262
1263            use hyperactor_config::attrs::AttrKeyInfo;
1264
1265            static INTROSPECT_KEYS: OnceLock<HashSet<&'static str>> = OnceLock::new();
1266            let allowed = INTROSPECT_KEYS.get_or_init(|| {
1267                inventory::iter::<AttrKeyInfo>()
1268                    .filter(|info| info.meta.get(hyperactor_config::INTROSPECT).is_some())
1269                    .map(|info| info.name)
1270                    .collect()
1271            });
1272            for (name, _) in attrs.iter() {
1273                debug_assert!(
1274                    allowed.contains(name),
1275                    "publish_attrs: key {:?} is not tagged with INTROSPECT",
1276                    name
1277                );
1278            }
1279        }
1280        self.inner.cell.set_published_attrs(attrs);
1281    }
1282
1283    /// Publish a single attr key-value pair for introspection. Merges
1284    /// into existing published attrs (insert or overwrite).
1285    ///
1286    /// Debug builds assert that the key is tagged with the
1287    /// `INTROSPECT` meta-attribute.
1288    pub fn publish_attr<T: hyperactor_config::AttrValue>(
1289        &self,
1290        key: hyperactor_config::Key<T>,
1291        value: T,
1292    ) {
1293        debug_assert!(
1294            key.attrs().get(hyperactor_config::INTROSPECT).is_some(),
1295            "publish_attr called with non-introspection key: {}",
1296            key.name()
1297        );
1298        self.inner.cell.merge_published_attr(key, value);
1299    }
1300
1301    /// Mark this actor as system/infrastructure. System actors are
1302    /// hidden by default in the TUI (toggled via `s`).
1303    pub fn set_system(&self) {
1304        self.inner
1305            .cell
1306            .inner
1307            .is_system
1308            .store(true, Ordering::Relaxed);
1309    }
1310
1311    /// Register a callback for resolving non-addressable children.
1312    ///
1313    /// The callback runs on the actor's introspect task (not the
1314    /// actor loop), so it must be `Send + Sync` and must not access
1315    /// actor-mutable state. Capture cloned `Proc` references.
1316    ///
1317    /// Only `HostAgent` uses this today — for resolving system
1318    /// procs that have no independent `ProcAgent`.
1319    pub fn set_query_child_handler(
1320        &self,
1321        handler: impl (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync + 'static,
1322    ) {
1323        self.inner.cell.set_query_child_handler(handler);
1324    }
1325
1326    /// Signal the actor to stop.
1327    pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
1328        tracing::info!(
1329            actor_id = %self.inner.cell.actor_id(),
1330            reason,
1331            "Instance::stop called",
1332        );
1333        self.inner
1334            .cell
1335            .signal(Signal::DrainAndStop(reason.to_string()))
1336    }
1337
1338    /// Signal the actor to abort with a provided reason.
1339    pub fn abort(&self, reason: &str) -> Result<(), ActorError> {
1340        tracing::info!(
1341            actor_id = %self.inner.cell.actor_id(),
1342            reason,
1343            "Instance::abort called",
1344        );
1345        self.inner.cell.signal(Signal::Abort(reason.to_string()))
1346    }
1347
1348    /// Open a new port that accepts M-typed messages. The returned
1349    /// port may be freely cloned, serialized, and passed around. The
1350    /// returned receiver should only be retained by the actor responsible
1351    /// for processing the delivered messages.
1352    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1353        self.inner.mailbox.open_port()
1354    }
1355
1356    /// Open a new one-shot port that accepts M-typed messages. The
1357    /// returned port may be used to send a single message; ditto the
1358    /// receiver may receive a single message.
1359    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1360        self.inner.mailbox.open_once_port()
1361    }
1362
1363    /// Get the per-instance local storage.
1364    pub fn locals(&self) -> &ActorLocalStorage {
1365        &self.inner.instance_locals
1366    }
1367
1368    /// Send a message to the actor running on the proc.
1369    pub fn post(&self, port_id: reference::PortId, headers: Flattrs, message: wirevalue::Any) {
1370        <Self as context::MailboxExt>::post(
1371            self,
1372            port_id,
1373            headers,
1374            message,
1375            true,
1376            context::SeqInfoPolicy::AssignNew,
1377        )
1378    }
1379
1380    /// Post a message with pre-set SEQ_INFO. Only for internal use by CommActor.
1381    ///
1382    /// # Warning
1383    /// This method bypasses the SEQ_INFO assertion. Do not use unless you are
1384    /// implementing mesh-level message routing (CommActor).
1385    #[doc(hidden)]
1386    pub fn post_with_external_seq_info(
1387        &self,
1388        port_id: reference::PortId,
1389        headers: Flattrs,
1390        message: wirevalue::Any,
1391    ) {
1392        <Self as context::MailboxExt>::post(
1393            self,
1394            port_id,
1395            headers,
1396            message,
1397            true,
1398            context::SeqInfoPolicy::AllowExternal,
1399        )
1400    }
1401
1402    /// Send a message to the actor itself with a delay usually to trigger some event.
1403    pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1404    where
1405        M: Message,
1406        A: Handler<M>,
1407    {
1408        // A global client to send self message.
1409        static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
1410        let client = &CLIENT
1411            .get_or_init(|| Proc::runtime().instance("self_message_client").unwrap())
1412            .0;
1413        let port = self.port();
1414        let self_id = self.self_id().clone();
1415        tokio::spawn(async move {
1416            tokio::time::sleep(delay).await;
1417            if let Err(e) = port.send(&client, message) {
1418                // TODO: this is a fire-n-forget thread. We need to
1419                // handle errors in a better way.
1420                tracing::info!("{}: error sending delayed message: {}", self_id, e);
1421            }
1422        });
1423        Ok(())
1424    }
1425
    /// Start an A-typed actor onto this instance with the provided params. When spawn returns,
    /// the actor has been linked with its parent, if it has one.
    ///
    /// Spawns two tasks: the introspect task (`serve_introspect`) and the
    /// actor's server task (through `A::spawn_server_task`), which runs
    /// `serve`. The server task's handle is stored in the instance cell,
    /// and an [`ActorHandle`] for the actor is returned.
    ///
    /// # Panics
    ///
    /// Panics if the instance was created detached (it then has no
    /// actor-loop receivers), or if a task handle was already stored in
    /// the cell.
    fn start(self, actor: A, receivers: InstanceReceivers<A>) -> ActorHandle<A> {
        let instance_cell = self.inner.cell.clone();
        let actor_id = self.inner.cell.actor_id().clone();
        let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());

        // Spawn the introspect task — a separate tokio task that
        // reads InstanceCell directly and replies via the actor's
        // Mailbox. The actor loop never sees IntrospectMessage.
        let introspect_cell = self.inner.cell.clone();
        let introspect_mailbox = self.inner.mailbox.clone();
        tokio::spawn(crate::introspect::serve_introspect(
            introspect_cell,
            introspect_mailbox,
            receivers.introspect,
        ));

        // Detached instances have `actor_loop == None` and must never be
        // started.
        let actor_loop_receivers = receivers
            .actor_loop
            .expect("non-detached instance must have actor loop receivers");
        // `serve` consumes `self`; everything needed from `self.inner` was
        // cloned above.
        let actor_task_handle = A::spawn_server_task(
            panic_handler::with_backtrace_tracking(self.serve(
                actor,
                actor_loop_receivers,
                receivers.work,
            ))
            .instrument(Span::current()),
        );
        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
        // The handle may be stored only once per cell.
        instance_cell
            .inner
            .actor_task_handle
            .set(actor_task_handle)
            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));

        actor_handle
    }
1464
    /// Drive the actor to completion, then publish its terminal outcome.
    ///
    /// Runs the actor's supervision tree via `run_actor_tree`, then
    /// settles the outcome:
    /// - **Success:** closes the mailbox and sets `Stopped(reason)`.
    /// - **Unhandled supervision event:** adopts that event's terminal
    ///   status and records the event on the cell.
    /// - **Any other error:** sets `Failed` and records a newly built
    ///   supervision event.
    ///
    /// Finally, it unlinks from the parent, forwarding any failure event
    /// plus a `ChildStopped` signal. With no parent, it hands the failure
    /// event to the proc instead.
    async fn serve(
        mut self,
        mut actor: A,
        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        let result = self
            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
            .await;

        // `run_actor_tree` is expected to leave the status at Stopping.
        assert!(self.is_stopping());
        // In every arm the mailbox is closed with the same terminal status
        // that is then published.
        let event = match result {
            Ok(stop_reason) => {
                let status = ActorStatus::Stopped(stop_reason);
                self.mailbox().close(status.clone());
                // success exit case
                self.change_status(status);
                None
            }
            Err(err) => {
                match *err.kind {
                    ActorErrorKind::UnhandledSupervisionEvent(box event) => {
                        // Currently only terminated actors are allowed to raise supervision events.
                        // If we want to change that in the future, we need to modify the exit
                        // status here too, because we use event's actor_status as this actor's
                        // terminal status.
                        assert!(event.actor_status.is_terminal());
                        self.mailbox().close(event.actor_status.clone());
                        // FI-1: store supervision_event BEFORE change_status.
                        *self.inner.cell.inner.supervision_event.lock().unwrap() =
                            Some(event.clone());
                        self.change_status(event.actor_status.clone());
                        Some(event)
                    }
                    _ => {
                        let error_kind = ActorErrorKind::Generic(err.kind.to_string());
                        let status = ActorStatus::Failed(error_kind);
                        self.mailbox().close(status.clone());
                        let event = ActorSupervisionEvent::new(
                            self.inner.cell.actor_id().clone(),
                            actor.display_name(),
                            status.clone(),
                            None,
                        );
                        // FI-1: store supervision_event BEFORE change_status.
                        *self.inner.cell.inner.supervision_event.lock().unwrap() =
                            Some(event.clone());
                        self.change_status(status);
                        Some(event)
                    }
                }
            }
        };

        if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
            if let Some(event) = event {
                // Parent exists, failure should be propagated to the parent.
                parent.send_supervision_event_or_crash(&self, event);
            }
            // TODO: we should get rid of this signal, and use *only* supervision events for
            // the purpose of conveying lifecycle changes
            if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) {
                tracing::error!(
                    "{}: failed to send stop message to parent pid {}: {:?}",
                    self.self_id(),
                    parent.pid(),
                    err
                );
            }
        } else {
            // Failure happened to the root actor or orphaned child actors.
            // In either case, the failure should be propagated to proc.
            //
            // Note that orphaned actor is unexpected and would only happen if
            // there is a bug.
            if let Some(event) = event {
                self.inner
                    .proc
                    .handle_unhandled_supervision_event(&self, event);
            }
        }
    }
1547
    /// Runs the actor, and manages its supervision tree. When the function returns,
    /// the whole tree rooted at this actor has stopped. On success, returns the reason
    /// why the actor stopped. On failure, returns the error that caused the failure.
    ///
    /// Shutdown sequence once the actor loop (`run`) exits:
    /// 1. A panic inside the loop is caught and converted into an
    ///    `ActorErrorKind::panic` error.
    /// 2. The status moves to `Stopping` and every child is sent
    ///    `Signal::Stop`; children that cannot be signalled are unlinked.
    /// 3. `ChildStopped` signals are awaited until no children remain. If no
    ///    signal arrives within 500ms, all remaining children are unlinked.
    /// 4. `Actor::cleanup` runs, bounded by `config::CLEANUP_TIMEOUT`, unless
    ///    the actor panicked.
    /// 5. The loop's own error, if any, takes precedence over a cleanup error
    ///    (which is then only logged); otherwise a cleanup error is returned.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<String, ActorError> {
        // It is okay to catch all panics here, because we are in a tokio task,
        // and tokio will catch the panic anyway:
        // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic
        // What we do here is just to catch it early so we can handle it.

        let mut did_panic = false;
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                did_panic = true;
                // Prefer the panic info captured by the panic handler; if it
                // cannot be taken, record why instead.
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_id(), err);
        }

        // After this point, we know we won't spawn any more children,
        // so we can safely read the current child keys.
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child
                .value()
                .signal(Signal::Stop("parent stopping".to_string()))
            {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_id(),
                    child.key(),
                    err
                );
                to_unlink.push(child.value().clone());
            }
        }
        // Manually unlink children that have already been stopped.
        // This happens after the loop above: removing entries while iterating
        // the children map could deadlock (see `child_iter`).
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        // Only the signal receiver is needed from here on; supervision events
        // arriving during shutdown are dropped with the other receiver.
        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            // Each wait is bounded separately: any received signal starts a
            // fresh 500ms window.
            match tokio::time::timeout(Duration::from_millis(500), signal_receiver.recv()).await {
                Ok(signal) => {
                    // NOTE(review): a receive error propagates via `?` here,
                    // returning before `actor.cleanup` below has a chance to run.
                    if let Signal::ChildStopped(pid) = signal? {
                        // A child reporting that it stopped must no longer be
                        // linked under this instance.
                        assert!(self.inner.cell.get_child(pid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_id()
                    );
                    // No more waiting to receive messages. Unlink all remaining
                    // children.
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        // Run the actor cleanup function before the actor stops to delete
        // resources. If it times out, continue with stopping the actor.
        // Don't call it if there was a panic, because the actor may
        // be in an invalid state and unable to access anything, for example
        // the GIL.
        let cleanup_result = if !did_panic {
            let cleanup_timeout = hyperactor_config::global::get(config::CLEANUP_TIMEOUT);
            match tokio::time::timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                Ok(Err(e)) => Err(ActorError::new(self.self_id(), ActorErrorKind::cleanup(e))),
                // The cleanup timed out; the elapsed error becomes the cleanup error.
                Err(e) => Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        if let Err(ref actor_err) = result {
            // The original result error takes precedence over the cleanup error,
            // so make sure the cleanup error is still logged in that case.
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        // If the original exit was not an error, let cleanup errors be
        // surfaced.
        result.and_then(|reason| cleanup_result.map(|_| reason))
    }
1663
1664    /// Initialize and run the actor until it fails or is stopped. On success,
1665    /// returns the reason why the actor stopped. On failure, returns the error
1666    /// that caused the failure.
1667    async fn run(
1668        &mut self,
1669        actor: &mut A,
1670        actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1671        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1672    ) -> Result<String, ActorError> {
1673        let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1674
1675        self.change_status(ActorStatus::Initializing);
1676        actor
1677            .init(self)
1678            .await
1679            .map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
1680        let need_drain;
1681        let stop_reason;
1682        'messages: loop {
1683            self.change_status(ActorStatus::Idle);
1684            let metric_pairs =
1685                hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1686            tokio::select! {
1687                work = work_rx.recv() => {
1688                    ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1689                    ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1690                    let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1691                    let work = work.expect("inconsistent work queue state");
1692                    if let Err(err) = work.handle(actor, self).await {
1693                        for supervision_event in supervision_event_receiver.drain() {
1694                            self.handle_supervision_event(actor, supervision_event).await?;
1695                        }
1696                        let kind = ActorErrorKind::processing(err);
1697                        return Err(ActorError {
1698                            actor_id: Box::new(self.self_id().clone()),
1699                            kind: Box::new(kind),
1700                        });
1701                    }
1702                }
1703                signal = signal_receiver.recv() => {
1704                    let signal = signal.map_err(ActorError::from);
1705                    tracing::debug!("Received signal {signal:?}");
1706                    match signal? {
1707                        Signal::Stop(reason) => {
1708                            need_drain = false;
1709                            stop_reason = reason;
1710                            break 'messages;
1711                        },
1712                        Signal::DrainAndStop(reason) => {
1713                            need_drain = true;
1714                            stop_reason = reason;
1715                            break 'messages;
1716                        },
1717                        Signal::ChildStopped(pid) => {
1718                            assert!(self.inner.cell.get_child(pid).is_none());
1719                        },
1720                        Signal::Abort(reason) => {
1721                            return Err(ActorError { actor_id: Box::new(self.self_id().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) });
1722                        }
1723                    }
1724                }
1725                Ok(supervision_event) = supervision_event_receiver.recv() => {
1726                    self.handle_supervision_event(actor, supervision_event).await?;
1727                }
1728            }
1729            self.inner
1730                .cell
1731                .inner
1732                .num_processed_messages
1733                .fetch_add(1, Ordering::SeqCst);
1734        }
1735
1736        if need_drain {
1737            let mut n = 0;
1738            while let Ok(work) = work_rx.try_recv() {
1739                if let Err(err) = work.handle(actor, self).await {
1740                    return Err(ActorError::new(
1741                        self.self_id(),
1742                        ActorErrorKind::processing(err),
1743                    ));
1744                }
1745                n += 1;
1746            }
1747            tracing::debug!("drained {} messages", n);
1748        }
1749        tracing::debug!(
1750            actor_id = %self.self_id(),
1751            reason = stop_reason,
1752            "exited actor loop",
1753        );
1754        Ok(stop_reason)
1755    }
1756
1757    /// Handle a supervision event using the provided actor.
1758    pub async fn handle_supervision_event(
1759        &self,
1760        actor: &mut A,
1761        supervision_event: ActorSupervisionEvent,
1762    ) -> Result<(), ActorError> {
1763        // Handle the supervision event with the current actor.
1764        match actor
1765            .handle_supervision_event(self, &supervision_event)
1766            .await
1767        {
1768            Ok(true) => {
1769                // The supervision event was handled by this actor, nothing more to do.
1770                Ok(())
1771            }
1772            Ok(false) => {
1773                let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1774                Err(ActorError::new(self.self_id(), kind))
1775            }
1776            Err(err) => {
1777                // The actor failed to handle the supervision event, it should die.
1778                // Create a new supervision event for this failure and propagate it.
1779                let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1780                    err.to_string(),
1781                    Box::new(supervision_event),
1782                );
1783                Err(ActorError::new(self.self_id(), kind))
1784            }
1785        }
1786    }
1787
1788    async unsafe fn handle_message<M: Message>(
1789        &self,
1790        actor: &mut A,
1791        type_info: Option<&'static TypeInfo>,
1792        headers: Flattrs,
1793        message: M,
1794    ) -> Result<(), anyhow::Error>
1795    where
1796        A: Handler<M>,
1797    {
1798        // Build HandlerInfo from TypeInfo (zero-copy) or fall back to type_name.
1799        let handler_info = match type_info {
1800            Some(info) => {
1801                // SAFETY: The caller promises to pass the correct type info.
1802                let arm = unsafe { info.arm_unchecked(&message as *const M as *const ()) };
1803                HandlerInfo::from_static(info.typename(), arm)
1804            }
1805            None => {
1806                // Fall back to std::any::type_name (also static, zero-copy).
1807                HandlerInfo::from_static(std::any::type_name::<M>(), None)
1808            }
1809        };
1810        // Use a helper function for a better instrument log.
1811        self.handle_message_with_handler_info(actor, handler_info, headers, message)
1812            .await
1813    }
1814
1815    // Skip serializing all fields except HandlerInfo which includes the typename.
1816    #[tracing::instrument(level = "debug", name = "handle_message", skip_all, fields(actor_id = %self.self_id(), message_type = %handler_info))]
1817    async fn handle_message_with_handler_info<M: Message>(
1818        &self,
1819        actor: &mut A,
1820        handler_info: HandlerInfo,
1821        headers: Flattrs,
1822        message: M,
1823    ) -> Result<(), anyhow::Error>
1824    where
1825        A: Handler<M>,
1826    {
1827        let now = std::time::SystemTime::now();
1828        let handler_info = Some(handler_info);
1829        self.change_status(ActorStatus::Processing(now, handler_info.clone()));
1830        crate::mailbox::headers::log_message_latency_if_sampling(
1831            &headers,
1832            self.self_id().to_string(),
1833        );
1834
1835        let message_id = headers.get(crate::mailbox::headers::TELEMETRY_MESSAGE_ID);
1836
1837        if let Some(message_id) = message_id {
1838            let from_actor_id = headers
1839                .get(crate::mailbox::headers::SENDER_ACTOR_ID_HASH)
1840                .unwrap_or(0);
1841            let to_actor_id = hash_to_u64(self.self_id());
1842            let port_id = headers.get(crate::mailbox::headers::TELEMETRY_PORT_ID);
1843
1844            notify_message(hyperactor_telemetry::MessageEvent {
1845                timestamp: now,
1846                id: message_id,
1847                from_actor_id,
1848                to_actor_id,
1849                // TODO: populate endpoint
1850                endpoint: None,
1851                port_id,
1852            });
1853
1854            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
1855                timestamp: now,
1856                id: hyperactor_telemetry::generate_status_event_id(message_id),
1857                message_id,
1858                status: "active".to_string(),
1859            });
1860        }
1861
1862        // Record the message handler being invoked.
1863        *self.inner.cell.inner.last_message_handler.write().unwrap() = handler_info;
1864
1865        let context = Context::new(self, headers);
1866        // Pass a reference to the context to the handler, so that deref
1867        // coercion allows the `this` argument to be treated exactly like
1868        // &Instance<A>.
1869        let start = Instant::now();
1870        let result = actor
1871            .handle(&context, message)
1872            .instrument(self.inner.cell.inner.recording.span())
1873            .await;
1874        let elapsed_us = start.elapsed().as_micros() as u64;
1875        self.inner
1876            .cell
1877            .inner
1878            .total_processing_time_us
1879            .fetch_add(elapsed_us, Ordering::SeqCst);
1880
1881        if let Some(message_id) = message_id {
1882            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
1883                timestamp: std::time::SystemTime::now(),
1884                id: hyperactor_telemetry::generate_status_event_id(message_id),
1885                message_id,
1886                status: "complete".to_string(),
1887            });
1888        }
1889
1890        result
1891    }
1892
1893    /// Spawn on child on this instance.
1894    pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1895        self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
1896    }
1897
1898    /// Spawn a named child actor on this instance. The child gets a
1899    /// descriptive name in its ActorId instead of inheriting this
1900    /// instance's name. Supervision linkage is preserved.
1901    pub fn spawn_with_name<C: Actor>(
1902        &self,
1903        name: &str,
1904        actor: C,
1905    ) -> anyhow::Result<ActorHandle<C>> {
1906        self.inner
1907            .proc
1908            .spawn_named_child(self.inner.cell.clone(), name, actor)
1909    }
1910
1911    /// Create a new direct child instance.
1912    pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
1913        self.inner.proc.child_instance(self.inner.cell.clone())
1914    }
1915
1916    /// Return a handle port handle representing the actor's message
1917    /// handler for M-typed messages.
1918    pub fn port<M: Message>(&self) -> PortHandle<M>
1919    where
1920        A: Handler<M>,
1921    {
1922        self.inner.ports.get()
1923    }
1924
1925    /// The [`ActorHandle`] corresponding to this instance.
1926    pub fn handle(&self) -> ActorHandle<A> {
1927        ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
1928    }
1929
1930    /// The owning actor ref.
1931    pub fn bind<R: Binds<A>>(&self) -> reference::ActorRef<R> {
1932        self.inner.cell.bind(self.inner.ports.as_ref())
1933    }
1934
1935    // Temporary in order to support python bindings.
1936    #[doc(hidden)]
1937    pub fn mailbox_for_py(&self) -> &Mailbox {
1938        &self.inner.mailbox
1939    }
1940
1941    /// The owning proc.
1942    pub fn proc(&self) -> &Proc {
1943        &self.inner.proc
1944    }
1945
1946    /// Clone this Instance to get an owned struct that can be
1947    /// plumbed through python. This should really only be called
1948    /// for the explicit purpose of being passed into python
1949    #[doc(hidden)]
1950    pub fn clone_for_py(&self) -> Self {
1951        Self {
1952            inner: Arc::clone(&self.inner),
1953        }
1954    }
1955
1956    /// Get the join handle associated with this actor.
1957    fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1958        self.inner.cell.inner.actor_task_handle.get()
1959    }
1960
1961    /// Return this instance's sequencer.
1962    pub fn sequencer(&self) -> &Sequencer {
1963        &self.inner.sequencer
1964    }
1965
1966    /// Return this instance's ID.
1967    pub fn instance_id(&self) -> Uuid {
1968        self.inner.id
1969    }
1970
1971    /// Return a handle to this instance's parent actor, if it has one.
1972    pub fn parent_handle<P: Actor>(&self) -> Option<ActorHandle<P>> {
1973        let parent_cell = self.inner.cell.inner.parent.upgrade()?;
1974        let ports = if let Ok(ports) = parent_cell.inner.ports.clone().downcast() {
1975            ports
1976        } else {
1977            return None;
1978        };
1979        Some(ActorHandle::new(parent_cell, ports))
1980    }
1981}
1982
1983impl<A: Actor> context::Mailbox for Instance<A> {
1984    fn mailbox(&self) -> &Mailbox {
1985        &self.inner.mailbox
1986    }
1987}
1988
1989impl<A: Actor> context::Mailbox for Context<'_, A> {
1990    fn mailbox(&self) -> &Mailbox {
1991        &self.instance.inner.mailbox
1992    }
1993}
1994
1995impl<A: Actor> context::Mailbox for &Instance<A> {
1996    fn mailbox(&self) -> &Mailbox {
1997        &self.inner.mailbox
1998    }
1999}
2000
2001impl<A: Actor> context::Mailbox for &Context<'_, A> {
2002    fn mailbox(&self) -> &Mailbox {
2003        &self.instance.inner.mailbox
2004    }
2005}
2006
2007impl<A: Actor> context::Actor for Instance<A> {
2008    type A = A;
2009    fn instance(&self) -> &Instance<A> {
2010        self
2011    }
2012}
2013
2014impl<A: Actor> context::Actor for Context<'_, A> {
2015    type A = A;
2016    fn instance(&self) -> &Instance<A> {
2017        self
2018    }
2019}
2020
2021impl<A: Actor> context::Actor for &Instance<A> {
2022    type A = A;
2023    fn instance(&self) -> &Instance<A> {
2024        self
2025    }
2026}
2027
2028impl<A: Actor> context::Actor for &Context<'_, A> {
2029    type A = A;
2030    fn instance(&self) -> &Instance<A> {
2031        self
2032    }
2033}
2034
2035impl Instance<()> {
2036    /// See [Mailbox::bind_actor_port] for details.
2037    pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
2038        assert!(
2039            self.actor_task_handle().is_none(),
2040            "can only bind actor port on instance with no running actor task"
2041        );
2042        self.inner.mailbox.bind_actor_port()
2043    }
2044}
2045
/// How an actor's type is identified; see `ActorType::type_name`.
#[derive(Debug)]
enum ActorType {
    /// A type described by registered [`TypeInfo`].
    Named(&'static TypeInfo),
    /// A type known only by a static name string.
    Anonymous(&'static str),
}
2051
2052impl ActorType {
2053    fn type_name(&self) -> &str {
2054        match self {
2055            ActorType::Named(info) => info.typename(),
2056            ActorType::Anonymous(name) => name,
2057        }
2058    }
2059}
2060
/// InstanceCell contains all of the type-erased, shareable state of an instance.
/// Specifically, InstanceCells form a supervision tree, and are used by ActorHandle
/// to access the underlying instance.
///
/// InstanceCell is reference counted and cloneable: clones share the same
/// underlying `InstanceCellState`.
#[derive(Clone)]
pub struct InstanceCell {
    /// Shared state. Cloning the cell clones this `Arc`, not the state.
    inner: Arc<InstanceCellState>,
}
2070
2071impl fmt::Debug for InstanceCell {
2072    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2073        f.debug_struct("InstanceCell")
2074            .field("actor_id", &self.inner.actor_id)
2075            .field("actor_type", &self.inner.actor_type)
2076            .finish()
2077    }
2078}
2079
/// The shared, type-erased state behind an [`InstanceCell`].
///
/// Each cell is registered (as a weak reference) in its proc's instance table
/// and linked into its parent's `children` map when it is created.
struct InstanceCellState {
    /// The actor's id.
    actor_id: reference::ActorId,

    /// The actor's type information (registered `TypeInfo` or a bare name).
    actor_type: ActorType,

    /// The proc in which the actor is running.
    proc: Proc,

    /// Control port handles to the actor loop (signals, supervision events),
    /// if one is running. `None` for detached instances, to which signals
    /// cannot be delivered.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// An observer that stores the current status of the actor.
    status: watch::Receiver<ActorStatus>,

    /// A weak reference to this instance's parent, so a child does not keep
    /// its parent alive.
    parent: WeakInstanceCell,

    /// This instance's children, keyed by their PIDs.
    children: DashMap<reference::Index, InstanceCell>,

    /// Access to the spawned actor's join handle; set at most once.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// The set of named ports that are exported by this actor.
    // NOTE(review): presumably keyed by port index, mapping to the port's
    // name; confirm at the insertion site.
    exported_named_ports: DashMap<u64, &'static str>,

    /// The number of messages processed by this actor. The actor loop bumps
    /// this once per iteration, so handled signals and supervision events are
    /// counted too; work drained during drain-and-stop is not.
    num_processed_messages: AtomicU64,

    /// When this actor was created.
    created_at: SystemTime,

    /// Handler info (including the message type name) of the last message
    /// handler invoked.
    last_message_handler: RwLock<Option<HandlerInfo>>,

    /// Total time spent processing messages, in microseconds.
    total_processing_time_us: AtomicU64,

    /// The log recording associated with this actor. It is used to
    /// store a 'flight record' of events while the actor is running.
    recording: Recording,

    /// Attrs-based introspection data published by the actor. Written
    /// by the actor via `Instance::publish_attrs()` /
    /// `Instance::publish_attr()`, and read by the introspection
    /// runtime handler when building node payloads.
    ///
    /// This bag may contain both mesh-level keys (`node_type`,
    /// `addr`, `num_procs`, ...) and actor-runtime keys (`status`,
    /// `messages_processed`, ...).
    published_attrs: RwLock<Option<hyperactor_config::Attrs>>,

    /// Optional callback for resolving non-addressable children
    /// (e.g., system procs). Registered by infrastructure actors
    /// like `HostAgent` in `Actor::init`. Invoked by the
    /// introspection runtime handler for `QueryChild` messages.
    /// `None` means `QueryChild` returns a "not_found" error.
    ///
    /// See S7 in `introspect` module doc.
    query_child_handler: RwLock<
        Option<Box<dyn (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync>>,
    >,

    /// The supervision event for this actor's failure, if any.
    /// See FI-1, FI-2 in `introspect` module doc.
    supervision_event: std::sync::Mutex<Option<crate::supervision::ActorSupervisionEvent>>,

    /// Whether this actor is infrastructure/system (hidden by default
    /// in the TUI `s` toggle). Set by spawning code via
    /// `Instance::set_system()`.
    is_system: AtomicBool,

    /// A type-erased reference to Ports<A>, which allows us to recover
    /// an ActorHandle<A> by downcasting.
    ports: Arc<dyn Any + Send + Sync>,
}
2158
2159impl InstanceCellState {
2160    /// Unlink this instance from its parent, if it has one. If it was unlinked,
2161    /// the parent is returned.
2162    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
2163        self.parent
2164            .upgrade()
2165            .filter(|parent| parent.inner.unlink(self))
2166    }
2167
2168    /// Unlink this instance from a child.
2169    fn unlink(&self, child: &InstanceCellState) -> bool {
2170        assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
2171        self.children.remove(&child.actor_id.pid()).is_some()
2172    }
2173}
2174
2175/// Select which terminated snapshots to evict when the retention cap
2176/// is exceeded.
2177///
2178/// Each entry is `(actor_id, Option<occurred_at>)` where `Some` means
2179/// the actor has `failure_info` (i.e. it failed), and `None` means a
2180/// clean stop.
2181///
2182/// Eviction priority:
2183/// 1. Cleanly-stopped actors are evicted first (arbitrary order).
2184/// 2. If more evictions are needed, failed actors are evicted
2185///    newest-first (descending `occurred_at`), preserving the
2186///    earliest failures which are closest to the root cause.
2187fn select_eviction_candidates(
2188    entries: &[(reference::ActorId, Option<String>)],
2189    excess: usize,
2190) -> Vec<reference::ActorId> {
2191    let mut clean: Vec<&reference::ActorId> = Vec::new();
2192    let mut failed: Vec<(&reference::ActorId, &str)> = Vec::new();
2193    for (id, occurred_at) in entries {
2194        match occurred_at {
2195            Some(ts) => failed.push((id, ts.as_str())),
2196            None => clean.push(id),
2197        }
2198    }
2199
2200    let mut to_remove: Vec<reference::ActorId> = Vec::new();
2201    let mut remaining = excess;
2202
2203    // Evict cleanly-stopped first.
2204    for id in clean {
2205        if remaining == 0 {
2206            break;
2207        }
2208        to_remove.push(id.clone());
2209        remaining -= 1;
2210    }
2211
2212    // If still over cap, evict most-recent failures first.
2213    if remaining > 0 {
2214        failed.sort_by(|a, b| b.1.cmp(a.1));
2215        for (id, _) in failed.into_iter().take(remaining) {
2216            to_remove.push(id.clone());
2217        }
2218    }
2219
2220    to_remove
2221}
2222
2223impl InstanceCell {
2224    /// Creates a new instance cell with the provided internal state. If a parent
2225    /// is provided, it is linked to this cell.
2226    fn new(
2227        actor_id: reference::ActorId,
2228        actor_type: ActorType,
2229        proc: Proc,
2230        actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
2231        status: watch::Receiver<ActorStatus>,
2232        parent: Option<InstanceCell>,
2233        ports: Arc<dyn Any + Send + Sync>,
2234    ) -> Self {
2235        let _ais = actor_id.to_string();
2236        let cell = Self {
2237            inner: Arc::new(InstanceCellState {
2238                actor_id: actor_id.clone(),
2239                actor_type,
2240                proc: proc.clone(),
2241                actor_loop,
2242                status,
2243                parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
2244                children: DashMap::new(),
2245                actor_task_handle: OnceLock::new(),
2246                exported_named_ports: DashMap::new(),
2247                num_processed_messages: AtomicU64::new(0),
2248                created_at: std::time::SystemTime::now(),
2249                last_message_handler: RwLock::new(None),
2250                total_processing_time_us: AtomicU64::new(0),
2251                recording: hyperactor_telemetry::recorder().record(64),
2252                published_attrs: RwLock::new(None),
2253                query_child_handler: RwLock::new(None),
2254                supervision_event: std::sync::Mutex::new(None),
2255                is_system: AtomicBool::new(false),
2256                ports,
2257            }),
2258        };
2259        cell.maybe_link_parent();
2260        proc.inner
2261            .instances
2262            .insert(actor_id.clone(), cell.downgrade());
2263        cell
2264    }
2265
2266    fn wrap(inner: Arc<InstanceCellState>) -> Self {
2267        Self { inner }
2268    }
2269
2270    /// The actor's ID.
2271    pub fn actor_id(&self) -> &reference::ActorId {
2272        &self.inner.actor_id
2273    }
2274
2275    /// The actor's PID.
2276    pub(crate) fn pid(&self) -> reference::Index {
2277        self.inner.actor_id.pid()
2278    }
2279
2280    /// The actor's join handle.
2281    #[allow(dead_code)]
2282    pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
2283        self.inner.actor_task_handle.get()
2284    }
2285
2286    /// The instance's status observer.
2287    pub fn status(&self) -> &watch::Receiver<ActorStatus> {
2288        &self.inner.status
2289    }
2290
2291    /// The supervision event stored when this actor failed.
2292    /// `None` for actors that stopped cleanly or are still running.
2293    pub fn supervision_event(&self) -> Option<crate::supervision::ActorSupervisionEvent> {
2294        self.inner.supervision_event.lock().unwrap().clone()
2295    }
2296
2297    /// Send a signal to the actor.
2298    pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
2299        if let Some((signal_port, _)) = &self.inner.actor_loop {
2300            // A global signal client is used to send signals to the actor.
2301            static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
2302            let client = &CLIENT
2303                .get_or_init(|| Proc::runtime().instance("global_signal_client").unwrap())
2304                .0;
2305            signal_port.send(&client, signal).map_err(ActorError::from)
2306        } else {
2307            tracing::warn!(
2308                "{}: attempted to send signal {} to detached actor",
2309                self.inner.actor_id,
2310                signal
2311            );
2312            Ok(())
2313        }
2314    }
2315
2316    /// Used by this actor's children to send a supervision event to this actor.
2317    /// When it fails to send, we will crash the process. As part of the crash,
2318    /// all the procs and actors running on this process will be terminated
2319    /// forcefully.
2320    ///
2321    /// Note that "let it crash" is the default behavior when a supervision event
2322    /// cannot be delivered upstream. It is the upstream's responsibility to
2323    /// detect and handle crashes.
2324    pub fn send_supervision_event_or_crash(
2325        &self,
2326        child_cx: &impl context::Actor, // context of the child who sends the event.
2327        event: ActorSupervisionEvent,
2328    ) {
2329        match &self.inner.actor_loop {
2330            Some((_, supervision_port)) => {
2331                if let Err(err) = supervision_port.send(child_cx, event) {
2332                    tracing::error!(
2333                        "{}: failed to send supervision event to actor: {:?}. Crash the process.",
2334                        self.actor_id(),
2335                        err
2336                    );
2337                    std::process::exit(1);
2338                }
2339            }
2340            None => {
2341                tracing::error!(
2342                    "{}: failed: {}: cannot send supervision event to detached actor: crashing",
2343                    self.actor_id(),
2344                    event,
2345                );
2346                std::process::exit(1);
2347            }
2348        }
2349    }
2350
2351    /// Downgrade this InstanceCell to a weak reference.
2352    pub fn downgrade(&self) -> WeakInstanceCell {
2353        WeakInstanceCell {
2354            inner: Arc::downgrade(&self.inner),
2355        }
2356    }
2357
2358    /// Link this instance to a new child.
2359    fn link(&self, child: InstanceCell) {
2360        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
2361        self.inner.children.insert(child.pid(), child);
2362    }
2363
2364    /// Unlink this instance from a child.
2365    fn unlink(&self, child: &InstanceCell) {
2366        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
2367        self.inner.children.remove(&child.pid());
2368    }
2369
2370    /// Unlink this instance from all children.
2371    fn unlink_all(&self) {
2372        self.inner.children.clear();
2373    }
2374
2375    /// Link this instance to its parent, if it has one.
2376    fn maybe_link_parent(&self) {
2377        if let Some(parent) = self.inner.parent.upgrade() {
2378            parent.link(self.clone());
2379        }
2380    }
2381
2382    /// Unlink this instance from its parent, if it has one. If it was unlinked,
2383    /// the parent is returned.
2384    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
2385        self.inner.maybe_unlink_parent()
2386    }
2387
2388    /// Return an iterator over this instance's children. This may deadlock if the
2389    /// caller already holds a reference to any item in map.
2390    fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, reference::Index, InstanceCell>> {
2391        self.inner.children.iter()
2392    }
2393
2394    /// The number of children this instance has.
2395    pub fn child_count(&self) -> usize {
2396        self.inner.children.len()
2397    }
2398
2399    /// Returns the ActorIds of this instance's direct children.
2400    pub fn child_actor_ids(&self) -> Vec<reference::ActorId> {
2401        self.inner
2402            .children
2403            .iter()
2404            .map(|entry| entry.value().actor_id().clone())
2405            .collect()
2406    }
2407
2408    /// Get a child by its PID.
2409    fn get_child(&self, pid: reference::Index) -> Option<InstanceCell> {
2410        self.inner.children.get(&pid).map(|child| child.clone())
2411    }
2412
2413    /// Access the flight recorder for this actor.
2414    pub fn recording(&self) -> &Recording {
2415        &self.inner.recording
2416    }
2417
2418    /// When this actor was created.
2419    pub fn created_at(&self) -> SystemTime {
2420        self.inner.created_at
2421    }
2422
2423    /// The number of messages processed by this actor.
2424    pub fn num_processed_messages(&self) -> u64 {
2425        self.inner.num_processed_messages.load(Ordering::SeqCst)
2426    }
2427
2428    /// The last message handler invoked by this actor.
2429    pub fn last_message_handler(&self) -> Option<HandlerInfo> {
2430        self.inner.last_message_handler.read().unwrap().clone()
2431    }
2432
2433    /// Total time spent processing messages, in microseconds.
2434    pub fn total_processing_time_us(&self) -> u64 {
2435        self.inner.total_processing_time_us.load(Ordering::SeqCst)
2436    }
2437
2438    /// Get parent instance cell, if it exists.
2439    pub fn parent(&self) -> Option<InstanceCell> {
2440        self.inner.parent.upgrade()
2441    }
2442
2443    /// The actor's type name.
2444    pub fn actor_type_name(&self) -> &str {
2445        self.inner.actor_type.type_name()
2446    }
2447
2448    /// Replace the published introspection attrs with a new bag.
2449    pub fn set_published_attrs(&self, attrs: hyperactor_config::Attrs) {
2450        *self.inner.published_attrs.write().unwrap() = Some(attrs);
2451    }
2452
2453    /// Set a single introspection attr, merging into the existing bag
2454    /// (or creating one if none exists).
2455    pub fn merge_published_attr<T: hyperactor_config::AttrValue>(
2456        &self,
2457        key: hyperactor_config::Key<T>,
2458        value: T,
2459    ) {
2460        self.inner
2461            .published_attrs
2462            .write()
2463            .unwrap()
2464            .get_or_insert_with(hyperactor_config::Attrs::new)
2465            .set(key, value);
2466    }
2467
2468    /// Read the published introspection attrs, if any.
2469    pub fn published_attrs(&self) -> Option<hyperactor_config::Attrs> {
2470        self.inner.published_attrs.read().unwrap().clone()
2471    }
2472
2473    /// Register a callback for resolving non-addressable children
2474    /// via `IntrospectMessage::QueryChild`.
2475    ///
2476    /// The callback runs on the actor's introspect task (a separate
2477    /// tokio task, not the actor's message loop), so it must be
2478    /// `Send + Sync` and must not access actor-mutable state.
2479    /// Capture cloned `Proc` references, not `&mut self`.
2480    pub fn set_query_child_handler(
2481        &self,
2482        handler: impl (Fn(&crate::reference::Reference) -> IntrospectResult) + Send + Sync + 'static,
2483    ) {
2484        *self.inner.query_child_handler.write().unwrap() = Some(Box::new(handler));
2485    }
2486
2487    /// Invoke the registered QueryChild handler, if any.
2488    pub fn query_child(&self, child_ref: &crate::reference::Reference) -> Option<IntrospectResult> {
2489        let guard = self.inner.query_child_handler.read().unwrap();
2490        guard.as_ref().map(|handler| handler(child_ref))
2491    }
2492
2493    /// Whether this actor is infrastructure/system.
2494    pub fn is_system(&self) -> bool {
2495        self.inner.is_system.load(Ordering::Relaxed)
2496    }
2497
2498    /// Store a post-mortem snapshot for this actor in the proc's
2499    /// `terminated_snapshots` map. Called by the introspect task
2500    /// just before exiting on terminal status.
2501    ///
2502    /// Eviction policy when the retention cap is exceeded:
2503    /// 1. Evict cleanly-stopped actors first (no `failure_info`).
2504    /// 2. When only failed actors remain, evict the most recent
2505    ///    (by `occurred_at`), preserving the earliest failures
2506    ///    which are closest to the root cause.
2507    pub fn store_terminated_snapshot(&self, payload: crate::introspect::IntrospectResult) {
2508        let snapshots = &self.inner.proc.inner.terminated_snapshots;
2509        snapshots.insert(self.actor_id().clone(), payload);
2510        let max = hyperactor_config::global::get(crate::config::TERMINATED_SNAPSHOT_RETENTION);
2511        let excess = snapshots.len().saturating_sub(max);
2512        if excess > 0 {
2513            // Build entries for the eviction selector.
2514            let entries: Vec<_> = snapshots
2515                .iter()
2516                .map(|entry| {
2517                    let occurred_at =
2518                        serde_json::from_str::<hyperactor_config::Attrs>(&entry.value().attrs)
2519                            .ok()
2520                            .and_then(|attrs| {
2521                                // Presence of FAILURE_ERROR_MESSAGE means the actor failed.
2522                                attrs
2523                                    .get(crate::introspect::FAILURE_ERROR_MESSAGE)
2524                                    .cloned()?;
2525                                // Extract occurred_at timestamp for sorting.
2526                                attrs
2527                                    .get(crate::introspect::FAILURE_OCCURRED_AT)
2528                                    .map(|t| humantime::format_rfc3339(*t).to_string())
2529                            });
2530                    (entry.key().clone(), occurred_at)
2531                })
2532                .collect();
2533
2534            for key in select_eviction_candidates(&entries, excess) {
2535                snapshots.remove(&key);
2536            }
2537        }
2538    }
2539
2540    /// This is temporary so that we can share binding code between handle and instance.
2541    /// We should find some (better) way to consolidate the two.
2542    pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> reference::ActorRef<R> {
2543        <R as Binds<A>>::bind(ports);
2544        // Signal: pre-registered via open_message_port() in
2545        // Instance::new(), handled by the actor loop's select!.
2546        // Ports::bind() here reuses the existing handle.
2547        //
2548        // Undeliverable: dispatched through the work queue to the
2549        // actor's Handler<Undeliverable<MessageEnvelope>>.
2550        //
2551        // IntrospectMessage: pre-registered via open_message_port()
2552        // in Instance::new(), handled by a dedicated introspect task.
2553        // NOT bound here — its port is registered via
2554        // bind_actor_port() directly.
2555        ports.bind::<Signal>();
2556        ports.bind::<Undeliverable<MessageEnvelope>>();
2557        // TODO: consider sharing `ports.bound` directly.
2558        for entry in ports.bound.iter() {
2559            self.inner
2560                .exported_named_ports
2561                .insert(*entry.key(), entry.value());
2562        }
2563        reference::ActorRef::attest(self.actor_id().clone())
2564    }
2565
2566    /// Attempt to downcast this cell to a concrete actor handle.
2567    pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
2568        let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
2569        Some(ActorHandle::new(self.clone(), ports))
2570    }
2571
2572    /// Traverse the subtree rooted at this instance in pre-order.
2573    /// The callback receives each InstanceCell and its depth (root = 0).
2574    /// Children are visited in pid order for deterministic traversal.
2575    pub fn traverse<F>(&self, f: &mut F)
2576    where
2577        F: FnMut(&InstanceCell, usize),
2578    {
2579        self.traverse_inner(0, f);
2580    }
2581
2582    fn traverse_inner<F>(&self, depth: usize, f: &mut F)
2583    where
2584        F: FnMut(&InstanceCell, usize),
2585    {
2586        f(self, depth);
2587        // Collect and sort children by pid for deterministic traversal order
2588        let mut children: Vec<_> = self.child_iter().map(|r| r.value().clone()).collect();
2589        children.sort_by_key(|c| c.pid());
2590        for child in children {
2591            child.traverse_inner(depth + 1, f);
2592        }
2593    }
2594}
2595
2596impl Drop for InstanceCellState {
2597    fn drop(&mut self) {
2598        if let Some(parent) = self.maybe_unlink_parent() {
2599            tracing::debug!(
2600                "instance {} was dropped with parent {} still linked",
2601                self.actor_id,
2602                parent.actor_id()
2603            );
2604        }
2605        if self.proc.inner.instances.remove(&self.actor_id).is_none() {
2606            tracing::error!("instance {} was dropped but not in proc", self.actor_id);
2607        }
2608    }
2609}
2610
/// A weak version of the InstanceCell. This is used to provide cyclical
/// linkage between actors without creating a strong reference cycle.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    // Non-owning pointer to the shared cell state; `upgrade` turns it back
    // into an `InstanceCell` while the state is still alive.
    inner: Weak<InstanceCellState>,
}
2617
2618impl Default for WeakInstanceCell {
2619    fn default() -> Self {
2620        Self::new()
2621    }
2622}
2623
2624impl WeakInstanceCell {
2625    /// Create a new weak instance cell that is never upgradeable.
2626    pub fn new() -> Self {
2627        Self { inner: Weak::new() }
2628    }
2629
2630    /// Upgrade this weak instance cell to a strong reference, if possible.
2631    pub fn upgrade(&self) -> Option<InstanceCell> {
2632        self.inner.upgrade().map(InstanceCell::wrap)
2633    }
2634}
2635
/// A polymorphic dictionary that stores ports for an actor's handlers.
/// The interface memoizes the ports so that they are reused. We do not
/// (yet) support stable identifiers across multiple instances of the same
/// actor.
pub struct Ports<A: Actor> {
    // Memoized port handles keyed by the message type's `TypeId`; each value
    // is a boxed `PortHandle<M>` for that `M`.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    // Port indices (`M::port()`) that have been bound as actor ports, mapped
    // to the type name (`M::typename()`) bound at that index.
    bound: DashMap<u64, &'static str>,
    // Mailbox used to open the underlying ports.
    mailbox: Mailbox,
    // Queue that handler invocations (`WorkCell`s) are pushed onto.
    workq: OrderedSender<WorkCell<A>>,
}
2646
2647impl<A: Actor> Ports<A> {
2648    fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
2649        Self {
2650            ports: DashMap::new(),
2651            bound: DashMap::new(),
2652            mailbox,
2653            workq,
2654        }
2655    }
2656
2657    /// Get a port for the Handler<M> of actor A.
2658    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
2659    where
2660        A: Handler<M>,
2661    {
2662        let key = TypeId::of::<M>();
2663        match self.ports.entry(key) {
2664            Entry::Vacant(entry) => {
2665                // Some special case hackery, but it keeps the rest of the code (relatively) simple.
2666                assert_ne!(
2667                    key,
2668                    TypeId::of::<Signal>(),
2669                    "cannot provision Signal port through `Ports::get`"
2670                );
2671                assert_ne!(
2672                    key,
2673                    TypeId::of::<IntrospectMessage>(),
2674                    "cannot provision IntrospectMessage port through `Ports::get`"
2675                );
2676
2677                let type_info = TypeInfo::get_by_typeid(key);
2678                let workq = self.workq.clone();
2679                let actor_id = self.mailbox.actor_id().to_string();
2680                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
2681                    let seq_info = headers.get(SEQ_INFO);
2682
2683                    let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
2684                        Box::pin(async move {
2685                            // SAFETY: we guarantee that the passed type_info is for type M.
2686                            unsafe {
2687                                instance
2688                                    .handle_message(actor, type_info, headers, msg)
2689                                    .await
2690                            }
2691                        })
2692                    });
2693                    ACTOR_MESSAGE_QUEUE_SIZE.add(
2694                        1,
2695                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
2696                    );
2697                    if workq.enable_buffering {
2698                        match seq_info {
2699                            Some(SeqInfo::Session { session_id, seq }) => {
2700                                // TODO: return the message contained in the error instead of dropping them when converting
2701                                // to anyhow::Error. In that way, the message can be picked up by mailbox and returned to sender.
2702                                workq.send(session_id, seq, work).map_err(|e| match e {
2703                                    OrderedSenderError::InvalidZeroSeq(_) => {
2704                                        let error_msg = format!(
2705                                             "in enqueue func for {}, got seq 0 for message type {}",
2706                                            actor_id,
2707                                            std::any::type_name::<M>(),
2708                                        );
2709                                        tracing::error!(error_msg);
2710                                        anyhow::anyhow!(error_msg)
2711                                    }
2712                                    OrderedSenderError::SendError(e) => anyhow::Error::from(e),
2713                                    OrderedSenderError::FlushError(e) => e,
2714                                })
2715                            }
2716                            Some(SeqInfo::Direct) => {
2717                                workq.direct_send(work).map_err(anyhow::Error::from)
2718                            }
2719                            None => {
2720                                let error_msg = format!(
2721                                    "in enqueue func for {}, buffering is enabled, but SEQ_INFO is not set for message type {}",
2722                                    actor_id,
2723                                    std::any::type_name::<M>(),
2724                                    );
2725                                tracing::error!(error_msg);
2726                                anyhow::bail!(error_msg);
2727                            }
2728                        }
2729                    } else {
2730                        workq.direct_send(work).map_err(anyhow::Error::from)
2731                    }
2732                });
2733                entry.insert(Box::new(port.clone()));
2734                port
2735            }
2736            Entry::Occupied(entry) => {
2737                let port = entry.get();
2738                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
2739            }
2740        }
2741    }
2742
2743    /// Open a (typed) message port as in [`get`], but return a port receiver instead of dispatching
2744    /// the underlying handler.
2745    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
2746        match self.ports.entry(TypeId::of::<M>()) {
2747            Entry::Vacant(entry) => {
2748                let (port, receiver) = self.mailbox.open_port();
2749                entry.insert(Box::new(port.clone()));
2750                Some((port, receiver))
2751            }
2752            Entry::Occupied(_) => None,
2753        }
2754    }
2755
2756    /// Bind the given message type to its actor port.
2757    pub fn bind<M: RemoteMessage>(&self)
2758    where
2759        A: Handler<M>,
2760    {
2761        let port_index = M::port();
2762        match self.bound.entry(port_index) {
2763            Entry::Vacant(entry) => {
2764                self.get::<M>().bind_actor_port();
2765                entry.insert(M::typename());
2766            }
2767            Entry::Occupied(entry) => {
2768                assert_eq!(
2769                    *entry.get(),
2770                    M::typename(),
2771                    "bind {}: port index {} already bound to type {}",
2772                    M::typename(),
2773                    port_index,
2774                    entry.get(),
2775                );
2776            }
2777        }
2778    }
2779}
2780
2781#[cfg(test)]
2782mod tests {
2783    use std::assert_matches::assert_matches;
2784    use std::sync::atomic::AtomicBool;
2785
2786    use hyperactor_macros::export;
2787    use serde_json::json;
2788    use timed_test::async_timed_test;
2789    use tokio::sync::Barrier;
2790    use tokio::sync::oneshot;
2791    use tracing::Level;
2792    use tracing_subscriber::layer::SubscriberExt;
2793    use tracing_test::internal::logs_with_scope_contain;
2794
2795    use super::*;
2796    // needed for in-crate macro expansion
2797    use crate as hyperactor;
2798    use crate::HandleClient;
2799    use crate::Handler;
2800    use crate::testing::proc_supervison::ProcSupervisionCoordinator;
2801    use crate::testing::process_assertion::assert_termination;
2802
2803    #[derive(Debug, Default)]
2804    #[export]
2805    struct TestActor;
2806
2807    impl Actor for TestActor {}
2808
2809    #[derive(Handler, HandleClient, Debug)]
2810    enum TestActorMessage {
2811        Reply(oneshot::Sender<()>),
2812        Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
2813        Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
2814        Noop(),
2815        Fail(anyhow::Error),
2816        Panic(String),
2817        Spawn(oneshot::Sender<ActorHandle<TestActor>>),
2818    }
2819
2820    impl TestActor {
2821        async fn spawn_child(
2822            cx: &impl context::Actor,
2823            parent: &ActorHandle<TestActor>,
2824        ) -> ActorHandle<TestActor> {
2825            let (tx, rx) = oneshot::channel();
2826            parent.send(cx, TestActorMessage::Spawn(tx)).unwrap();
2827            rx.await.unwrap()
2828        }
2829    }
2830
2831    #[async_trait]
2832    #[crate::handle(TestActorMessage)]
2833    impl TestActorMessageHandler for TestActor {
2834        async fn reply(
2835            &mut self,
2836            _cx: &crate::Context<Self>,
2837            sender: oneshot::Sender<()>,
2838        ) -> Result<(), anyhow::Error> {
2839            sender.send(()).unwrap();
2840            Ok(())
2841        }
2842
2843        async fn wait(
2844            &mut self,
2845            _cx: &crate::Context<Self>,
2846            sender: oneshot::Sender<()>,
2847            receiver: oneshot::Receiver<()>,
2848        ) -> Result<(), anyhow::Error> {
2849            sender.send(()).unwrap();
2850            receiver.await.unwrap();
2851            Ok(())
2852        }
2853
2854        async fn forward(
2855            &mut self,
2856            cx: &crate::Context<Self>,
2857            destination: ActorHandle<TestActor>,
2858            message: Box<TestActorMessage>,
2859        ) -> Result<(), anyhow::Error> {
2860            // TODO: this needn't be async
2861            destination.send(cx, *message)?;
2862            Ok(())
2863        }
2864
2865        async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
2866            Ok(())
2867        }
2868
2869        async fn fail(
2870            &mut self,
2871            _cx: &crate::Context<Self>,
2872            err: anyhow::Error,
2873        ) -> Result<(), anyhow::Error> {
2874            Err(err)
2875        }
2876
2877        async fn panic(
2878            &mut self,
2879            _cx: &crate::Context<Self>,
2880            err_msg: String,
2881        ) -> Result<(), anyhow::Error> {
2882            panic!("{}", err_msg);
2883        }
2884
2885        async fn spawn(
2886            &mut self,
2887            cx: &crate::Context<Self>,
2888            reply: oneshot::Sender<ActorHandle<TestActor>>,
2889        ) -> Result<(), anyhow::Error> {
2890            let handle = TestActor.spawn(cx)?;
2891            reply.send(handle).unwrap();
2892            Ok(())
2893        }
2894    }
2895
2896    #[tracing_test::traced_test]
2897    #[async_timed_test(timeout_secs = 30)]
2898    async fn test_spawn_actor() {
2899        let proc = Proc::local();
2900        let (client, _) = proc.instance("client").unwrap();
2901        let handle = proc.spawn("test", TestActor).unwrap();
2902
2903        // Check on the join handle.
2904        assert!(logs_contain(
2905            format!(
2906                "{}: spawned with {:?}",
2907                handle.actor_id(),
2908                handle.cell().actor_task_handle().unwrap(),
2909            )
2910            .as_str()
2911        ));
2912
2913        let mut state = handle.status().clone();
2914
2915        // Send a ping-pong to the actor. Wait for the actor to become idle.
2916
2917        let (tx, rx) = oneshot::channel::<()>();
2918        handle.send(&client, TestActorMessage::Reply(tx)).unwrap();
2919        rx.await.unwrap();
2920
2921        state
2922            .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2923            .await
2924            .unwrap();
2925
2926        // Make sure we enter processing state while the actor is handling a message.
2927        let (enter_tx, enter_rx) = oneshot::channel::<()>();
2928        let (exit_tx, exit_rx) = oneshot::channel::<()>();
2929
2930        handle
2931            .send(&client, TestActorMessage::Wait(enter_tx, exit_rx))
2932            .unwrap();
2933        enter_rx.await.unwrap();
2934        assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= std::time::SystemTime::now());
2935        exit_tx.send(()).unwrap();
2936
2937        state
2938            .wait_for(|state| matches!(*state, ActorStatus::Idle))
2939            .await
2940            .unwrap();
2941
2942        handle.drain_and_stop("test").unwrap();
2943        handle.await;
2944        assert_matches!(&*state.borrow(), ActorStatus::Stopped(reason) if reason == "test");
2945    }
2946
2947    #[async_timed_test(timeout_secs = 30)]
2948    async fn test_proc_actors_messaging() {
2949        let proc = Proc::local();
2950        let (client, _) = proc.instance("client").unwrap();
2951        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
2952        let second = proc.spawn::<TestActor>("second", TestActor).unwrap();
2953        let (tx, rx) = oneshot::channel::<()>();
2954        let reply_message = TestActorMessage::Reply(tx);
2955        first
2956            .send(
2957                &client,
2958                TestActorMessage::Forward(second, Box::new(reply_message)),
2959            )
2960            .unwrap();
2961        rx.await.unwrap();
2962    }
2963
2964    #[derive(Debug, Default)]
2965    #[export]
2966    struct LookupTestActor;
2967
2968    impl Actor for LookupTestActor {}
2969
2970    #[derive(Handler, HandleClient, Debug)]
2971    enum LookupTestMessage {
2972        ActorExists(
2973            reference::ActorRef<TestActor>,
2974            #[reply] reference::OncePortRef<bool>,
2975        ),
2976    }
2977
2978    #[async_trait]
2979    #[crate::handle(LookupTestMessage)]
2980    impl LookupTestMessageHandler for LookupTestActor {
2981        async fn actor_exists(
2982            &mut self,
2983            cx: &crate::Context<Self>,
2984            actor_ref: reference::ActorRef<TestActor>,
2985        ) -> Result<bool, anyhow::Error> {
2986            Ok(actor_ref.downcast_handle(cx).is_some())
2987        }
2988    }
2989
2990    #[async_timed_test(timeout_secs = 30)]
2991    async fn test_actor_lookup() {
2992        let proc = Proc::local();
2993        let (client, _handle) = proc.instance("client").unwrap();
2994
2995        let target_actor = proc.spawn::<TestActor>("target", TestActor).unwrap();
2996        let target_actor_ref = target_actor.bind();
2997        let lookup_actor = proc
2998            .spawn::<LookupTestActor>("lookup", LookupTestActor)
2999            .unwrap();
3000
3001        assert!(
3002            lookup_actor
3003                .actor_exists(&client, target_actor_ref.clone())
3004                .await
3005                .unwrap()
3006        );
3007
3008        // Make up a child actor. It shouldn't exist.
3009        assert!(
3010            !lookup_actor
3011                .actor_exists(
3012                    &client,
3013                    reference::ActorRef::attest(target_actor.actor_id().child_id(123).clone())
3014                )
3015                .await
3016                .unwrap()
3017        );
3018        // A wrongly-typed actor ref should also not obtain.
3019        assert!(
3020            !lookup_actor
3021                .actor_exists(
3022                    &client,
3023                    reference::ActorRef::attest(lookup_actor.actor_id().clone())
3024                )
3025                .await
3026                .unwrap()
3027        );
3028
3029        target_actor.drain_and_stop("test").unwrap();
3030        target_actor.await;
3031
3032        assert!(
3033            !lookup_actor
3034                .actor_exists(&client, target_actor_ref)
3035                .await
3036                .unwrap()
3037        );
3038
3039        lookup_actor.drain_and_stop("test").unwrap();
3040        lookup_actor.await;
3041    }
3042
3043    fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
3044        assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
3045        assert_eq!(
3046            child.inner.parent.upgrade().unwrap().actor_id(),
3047            parent.actor_id()
3048        );
3049        assert_matches!(
3050            parent.inner.children.get(&child.pid()),
3051            Some(node) if node.actor_id() == child.actor_id()
3052        );
3053    }
3054
3055    #[tracing_test::traced_test]
3056    #[async_timed_test(timeout_secs = 30)]
3057    async fn test_spawn_child() {
3058        let proc = Proc::local();
3059        let (client, _) = proc.instance("client").unwrap();
3060
3061        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
3062        let second = TestActor::spawn_child(&client, &first).await;
3063        let third = TestActor::spawn_child(&client, &second).await;
3064
3065        // Check we've got the join handles.
3066        assert!(logs_with_scope_contain(
3067            "hyperactor::proc",
3068            format!(
3069                "{}: spawned with {:?}",
3070                first.actor_id(),
3071                first.cell().actor_task_handle().unwrap()
3072            )
3073            .as_str()
3074        ));
3075        assert!(logs_with_scope_contain(
3076            "hyperactor::proc",
3077            format!(
3078                "{}: spawned with {:?}",
3079                second.actor_id(),
3080                second.cell().actor_task_handle().unwrap()
3081            )
3082            .as_str()
3083        ));
3084        assert!(logs_with_scope_contain(
3085            "hyperactor::proc",
3086            format!(
3087                "{}: spawned with {:?}",
3088                third.actor_id(),
3089                third.cell().actor_task_handle().unwrap()
3090            )
3091            .as_str()
3092        ));
3093
3094        // These are allocated in sequence:
3095        assert_eq!(first.actor_id().proc_id(), proc.proc_id());
3096        assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
3097        assert_eq!(third.actor_id(), &first.actor_id().child_id(2));
3098
3099        // Supervision tree is constructed correctly.
3100        validate_link(third.cell(), second.cell());
3101        validate_link(second.cell(), first.cell());
3102        assert!(first.cell().inner.parent.upgrade().is_none());
3103
3104        // Supervision tree is torn down correctly.
3105        // Once each actor is stopped, it should have no linked children.
3106        let third_cell = third.cell().clone();
3107        third.drain_and_stop("test").unwrap();
3108        third.await;
3109        assert!(third_cell.inner.children.is_empty());
3110        drop(third_cell);
3111        validate_link(second.cell(), first.cell());
3112
3113        let second_cell = second.cell().clone();
3114        second.drain_and_stop("test").unwrap();
3115        second.await;
3116        assert!(second_cell.inner.children.is_empty());
3117        drop(second_cell);
3118
3119        let first_cell = first.cell().clone();
3120        first.drain_and_stop("test").unwrap();
3121        first.await;
3122        assert!(first_cell.inner.children.is_empty());
3123    }
3124
3125    #[async_timed_test(timeout_secs = 30)]
3126    async fn test_child_lifecycle() {
3127        let proc = Proc::local();
3128        let (client, _) = proc.instance("client").unwrap();
3129
3130        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3131        let root_1 = TestActor::spawn_child(&client, &root).await;
3132        let root_2 = TestActor::spawn_child(&client, &root).await;
3133        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
3134
3135        root.drain_and_stop("test").unwrap();
3136        root.await;
3137
3138        for actor in [root_1, root_2, root_2_1] {
3139            assert!(actor.send(&client, TestActorMessage::Noop()).is_err());
3140            assert_matches!(actor.await, ActorStatus::Stopped(reason) if reason == "parent stopping");
3141        }
3142    }
3143
3144    #[async_timed_test(timeout_secs = 30)]
3145    async fn test_parent_failure() {
3146        let proc = Proc::local();
3147        let (client, _) = proc.instance("client").unwrap();
3148        // Need to set a supervison coordinator for this Proc because there will
3149        // be actor failure(s) in this test which trigger supervision.
3150        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3151
3152        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3153        let root_1 = TestActor::spawn_child(&client, &root).await;
3154        let root_2 = TestActor::spawn_child(&client, &root).await;
3155        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
3156
3157        root_2
3158            .send(
3159                &client,
3160                TestActorMessage::Fail(anyhow::anyhow!("some random failure")),
3161            )
3162            .unwrap();
3163        let _root_2_actor_id = root_2.actor_id().clone();
3164        assert_matches!(
3165            root_2.await,
3166            ActorStatus::Failed(err) if err.to_string() == "some random failure"
3167        );
3168
3169        // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
3170        // stopped by a parent failure?
3171        // Currently the parent fails with an error related to the child's failure.
3172        assert_matches!(
3173            root.await,
3174            ActorStatus::Failed(err) if err.to_string().contains("some random failure")
3175        );
3176        assert_matches!(root_2_1.await, ActorStatus::Stopped(_));
3177        assert_matches!(root_1.await, ActorStatus::Stopped(_));
3178    }
3179
3180    #[async_timed_test(timeout_secs = 30)]
3181    async fn test_multi_handler() {
3182        // TEMPORARY: This test is currently a bit awkward since we don't yet expose
3183        // public interfaces to multi-handlers. This will be fixed shortly.
3184
3185        #[derive(Debug)]
3186        struct TestActor(Arc<AtomicUsize>);
3187
3188        #[async_trait]
3189        impl Actor for TestActor {}
3190
3191        #[async_trait]
3192        impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
3193            async fn handle(
3194                &mut self,
3195                cx: &crate::Context<Self>,
3196                message: OncePortHandle<PortHandle<usize>>,
3197            ) -> anyhow::Result<()> {
3198                message.send(cx, cx.port())?;
3199                Ok(())
3200            }
3201        }
3202
3203        #[async_trait]
3204        impl Handler<usize> for TestActor {
3205            async fn handle(
3206                &mut self,
3207                _cx: &crate::Context<Self>,
3208                message: usize,
3209            ) -> anyhow::Result<()> {
3210                self.0.fetch_add(message, Ordering::SeqCst);
3211                Ok(())
3212            }
3213        }
3214
3215        let proc = Proc::local();
3216        let state = Arc::new(AtomicUsize::new(0));
3217        let actor = TestActor(state.clone());
3218        let handle = proc.spawn::<TestActor>("test", actor).unwrap();
3219        let (client, _) = proc.instance("client").unwrap();
3220        let (tx, rx) = client.open_once_port();
3221        handle.send(&client, tx).unwrap();
3222        let usize_handle = rx.recv().await.unwrap();
3223        usize_handle.send(&client, 123).unwrap();
3224
3225        handle.drain_and_stop("test").unwrap();
3226        handle.await;
3227
3228        assert_eq!(state.load(Ordering::SeqCst), 123);
3229    }
3230
3231    #[async_timed_test(timeout_secs = 30)]
3232    async fn test_actor_panic() {
3233        // Need this custom hook to store panic backtrace in task_local.
3234        panic_handler::set_panic_hook();
3235
3236        let proc = Proc::local();
3237        // Need to set a supervison coordinator for this Proc because there will
3238        // be actor failure(s) in this test which trigger supervision.
3239        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3240
3241        let (client, _handle) = proc.instance("client").unwrap();
3242        let actor_handle = proc.spawn("test", TestActor).unwrap();
3243        actor_handle
3244            .panic(&client, "some random failure".to_string())
3245            .await
3246            .unwrap();
3247        let actor_status = actor_handle.await;
3248
3249        // Note: even when the test passes, the panic stacktrace will still be
3250        // printed to stderr because that is the behavior controlled by the panic
3251        // hook.
3252        assert_matches!(actor_status, ActorStatus::Failed(_));
3253        if let ActorStatus::Failed(err) = actor_status {
3254            let error_msg = err.to_string();
3255            // Verify panic message is captured
3256            assert!(error_msg.contains("some random failure"));
3257            // Verify backtrace is captured. Note the backtrace message might
3258            // change in the future. If that happens, we need to update this
3259            // statement with something up-to-date.
3260            assert!(error_msg.contains("library/std/src/panicking.rs"));
3261        }
3262    }
3263
3264    #[async_timed_test(timeout_secs = 30)]
3265    async fn test_local_supervision_propagation() {
3266        hyperactor_telemetry::initialize_logging_for_test();
3267
3268        #[derive(Debug)]
3269        struct TestActor(Arc<AtomicBool>, bool);
3270
3271        #[async_trait]
3272        impl Actor for TestActor {
3273            async fn handle_supervision_event(
3274                &mut self,
3275                _this: &Instance<Self>,
3276                _event: &ActorSupervisionEvent,
3277            ) -> Result<bool, anyhow::Error> {
3278                if !self.1 {
3279                    return Ok(false);
3280                }
3281
3282                tracing::error!(
3283                    "{}: supervision event received: {:?}",
3284                    _this.self_id(),
3285                    _event
3286                );
3287                self.0.store(true, Ordering::SeqCst);
3288                Ok(true)
3289            }
3290        }
3291
3292        #[async_trait]
3293        impl Handler<String> for TestActor {
3294            async fn handle(
3295                &mut self,
3296                cx: &crate::Context<Self>,
3297                message: String,
3298            ) -> anyhow::Result<()> {
3299                tracing::info!("{} received message: {}", cx.self_id(), message);
3300                Err(anyhow::anyhow!(message))
3301            }
3302        }
3303
3304        let proc = Proc::local();
3305        let (client, _) = proc.instance("client").unwrap();
3306        let (reported_event, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3307
3308        let root_state = Arc::new(AtomicBool::new(false));
3309        let root_1_state = Arc::new(AtomicBool::new(false));
3310        let root_1_1_state = Arc::new(AtomicBool::new(false));
3311        let root_1_1_1_state = Arc::new(AtomicBool::new(false));
3312        let root_2_state = Arc::new(AtomicBool::new(false));
3313        let root_2_1_state = Arc::new(AtomicBool::new(false));
3314
3315        let root = proc
3316            .spawn::<TestActor>("root", TestActor(root_state.clone(), false))
3317            .unwrap();
3318        let root_1 = proc
3319            .spawn_child::<TestActor>(
3320                root.cell().clone(),
3321                TestActor(
3322                    root_1_state.clone(),
3323                    true, /* set true so children's event stops here */
3324                ),
3325            )
3326            .unwrap();
3327        let root_1_1 = proc
3328            .spawn_child::<TestActor>(
3329                root_1.cell().clone(),
3330                TestActor(root_1_1_state.clone(), false),
3331            )
3332            .unwrap();
3333        let root_1_1_1 = proc
3334            .spawn_child::<TestActor>(
3335                root_1_1.cell().clone(),
3336                TestActor(root_1_1_1_state.clone(), false),
3337            )
3338            .unwrap();
3339        let root_2 = proc
3340            .spawn_child::<TestActor>(root.cell().clone(), TestActor(root_2_state.clone(), false))
3341            .unwrap();
3342        let root_2_1 = proc
3343            .spawn_child::<TestActor>(
3344                root_2.cell().clone(),
3345                TestActor(root_2_1_state.clone(), false),
3346            )
3347            .unwrap();
3348
3349        // fail `root_1_1_1`, the supervision msg should be propagated to
3350        // `root_1` because `root_1` has set `true` to `handle_supervision_event`.
3351        root_1_1_1
3352            .send::<String>(&client, "some random failure".into())
3353            .unwrap();
3354
3355        // fail `root_2_1`, the supervision msg should be propagated to
3356        // ProcSupervisionCoordinator.
3357        root_2_1
3358            .send::<String>(&client, "some random failure".into())
3359            .unwrap();
3360
3361        tokio::time::sleep(Duration::from_secs(1)).await;
3362
3363        assert!(!root_state.load(Ordering::SeqCst));
3364        assert!(root_1_state.load(Ordering::SeqCst));
3365        assert!(!root_1_1_state.load(Ordering::SeqCst));
3366        assert!(!root_1_1_1_state.load(Ordering::SeqCst));
3367        assert!(!root_2_state.load(Ordering::SeqCst));
3368        assert!(!root_2_1_state.load(Ordering::SeqCst));
3369        assert_eq!(
3370            reported_event.event().map(|e| e.actor_id.clone()),
3371            Some(root_2_1.actor_id().clone())
3372        );
3373    }
3374
3375    #[async_timed_test(timeout_secs = 30)]
3376    async fn test_instance() {
3377        #[derive(Debug, Default)]
3378        struct TestActor;
3379
3380        impl Actor for TestActor {}
3381
3382        #[async_trait]
3383        impl Handler<(String, reference::PortRef<String>)> for TestActor {
3384            async fn handle(
3385                &mut self,
3386                cx: &crate::Context<Self>,
3387                (message, port): (String, reference::PortRef<String>),
3388            ) -> anyhow::Result<()> {
3389                port.send(cx, message)?;
3390                Ok(())
3391            }
3392        }
3393
3394        let proc = Proc::local();
3395
3396        let (instance, handle) = proc.instance("my_test_actor").unwrap();
3397
3398        let child_actor = TestActor.spawn(&instance).unwrap();
3399
3400        let (port, mut receiver) = instance.open_port();
3401        child_actor
3402            .send(&instance, ("hello".to_string(), port.bind()))
3403            .unwrap();
3404
3405        let message = receiver.recv().await.unwrap();
3406        assert_eq!(message, "hello");
3407
3408        child_actor.drain_and_stop("test").unwrap();
3409        child_actor.await;
3410
3411        assert_eq!(*handle.status().borrow(), ActorStatus::Client);
3412        drop(instance);
3413        assert_matches!(*handle.status().borrow(), ActorStatus::Stopped(_));
3414        handle.await;
3415    }
3416
3417    #[tokio::test]
3418    async fn test_proc_terminate_without_coordinator() {
3419        if std::env::var("CARGO_TEST").is_ok() {
3420            eprintln!("test skipped as it hangs when run by cargo in sandcastle");
3421            return;
3422        }
3423
3424        let process = async {
3425            let proc = Proc::local();
3426            // Intentionally not setting a proc supervison coordinator. This
3427            // should cause the process to terminate.
3428            // ProcSupervisionCoordinator::set(&proc).await.unwrap();
3429            let root = proc.spawn("root", TestActor).unwrap();
3430            let (client, _handle) = proc.instance("client").unwrap();
3431            root.fail(&client, anyhow::anyhow!("some random failure"))
3432                .await
3433                .unwrap();
3434            // It is okay to sleep a long time here, because we expect this
3435            // process to be terminated way before the sleep ends due to the
3436            // missing proc supervison coordinator.
3437            tokio::time::sleep(Duration::from_secs(30)).await;
3438        };
3439
3440        assert_termination(|| process, 1).await.unwrap();
3441    }
3442
3443    fn trace_and_block(fut: impl Future) {
3444        tracing::subscriber::with_default(
3445            tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
3446            || {
3447                tokio::runtime::Builder::new_current_thread()
3448                    .enable_all()
3449                    .build()
3450                    .unwrap()
3451                    .block_on(fut)
3452            },
3453        );
3454    }
3455
    /// Checks that tracing output emitted inside handlers is captured by the
    /// actor cell's recording.
    ///
    /// It checks the event tail (`recording.tail()`) and the span stack
    /// (`recording.stacks()`) sampled while a handler is parked inside a span.
    #[ignore = "until trace recording is turned back on"]
    #[test]
    fn test_handler_logging() {
        #[derive(Debug, Default)]
        struct LoggingActor;

        impl Actor for LoggingActor {}

        impl LoggingActor {
            /// Sends a two-party barrier to the actor and waits on it.
            ///
            /// Returns once the actor's `Arc<Barrier>` handler reaches the
            /// barrier. Because the barrier is sent after earlier messages,
            /// this presumably acts as a "flush". That assumes in-order
            /// mailbox processing (NOTE(review): confirm).
            async fn wait(cx: &impl context::Actor, handle: &ActorHandle<Self>) {
                let barrier = Arc::new(Barrier::new(2));
                handle.send(cx, barrier.clone()).unwrap();
                barrier.wait().await;
            }
        }

        // Logs the string as a `message` event.
        #[async_trait]
        impl Handler<String> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: String,
            ) -> anyhow::Result<()> {
                tracing::info!("{}", message);
                Ok(())
            }
        }

        // Emits an INFO event with a `number` field.
        #[async_trait]
        impl Handler<u64> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: u64,
            ) -> anyhow::Result<()> {
                tracing::event!(Level::INFO, number = message);
                Ok(())
            }
        }

        // Rendezvous used by `LoggingActor::wait`.
        #[async_trait]
        impl Handler<Arc<Barrier>> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                message: Arc<Barrier>,
            ) -> anyhow::Result<()> {
                message.wait().await;
                Ok(())
            }
        }

        // Parks the handler inside `child_span` between two barriers, so the
        // test can sample the span stack while the span is entered.
        #[async_trait]
        impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
            async fn handle(
                &mut self,
                _cx: &crate::Context<Self>,
                barriers: Arc<(Barrier, Barrier)>,
            ) -> anyhow::Result<()> {
                let inner = tracing::span!(Level::INFO, "child_span");
                // NOTE(review): this `Entered` guard is held across the
                // `.await`s below, which the tracing docs discourage for async
                // code. It appears intentional here, to keep the span on the
                // stack while parked. Revisit if this test is re-enabled.
                let _inner_guard = inner.enter();
                barriers.0.wait().await;
                barriers.1.wait().await;
                Ok(())
            }
        }

        trace_and_block(async {
            let proc = Proc::local();
            let (client, _) = proc.instance("client").unwrap();
            let handle = LoggingActor.spawn_detached().unwrap();
            handle.send(&client, "hello world".to_string()).unwrap();
            handle
                .send(&client, "hello world again".to_string())
                .unwrap();
            handle.send(&client, 123u64).unwrap();

            // Wait until the actor reaches the barrier sent after the three
            // logging messages.
            LoggingActor::wait(&client, &handle).await;

            // Exactly the three logging events are expected in the tail.
            let events = handle.cell().inner.recording.tail();
            assert_eq!(events.len(), 3);
            assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
            assert_eq!(
                events[1].json_value(),
                json!({ "message": "hello world again" })
            );
            assert_eq!(events[2].json_value(), json!({ "number": 123 }));

            // Sample the span stacks between the two barriers, i.e. while the
            // handler is inside `child_span`, then release it.
            let stacks = {
                let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
                handle.send(&client, Arc::clone(&barriers)).unwrap();
                barriers.0.wait().await;
                let stacks = handle.cell().inner.recording.stacks();
                barriers.1.wait().await;
                stacks
            };
            assert_eq!(stacks.len(), 1);
            assert_eq!(stacks[0].len(), 1);
            assert_eq!(stacks[0][0].name(), "child_span");
        })
    }
3557
3558    #[async_timed_test(timeout_secs = 30)]
3559    async fn test_mailbox_closed_with_owner_stopped_reason() {
3560        use crate::actor::ActorStatus;
3561        use crate::mailbox::MailboxErrorKind;
3562        use crate::mailbox::MailboxSenderErrorKind;
3563
3564        let proc = Proc::local();
3565        let (client, _) = proc.instance("client").unwrap();
3566        let actor_handle = proc.spawn("test", TestActor).unwrap();
3567
3568        // Clone the handle before awaiting since await consumes the handle
3569        let handle_for_send = actor_handle.clone();
3570
3571        // Stop the actor gracefully
3572        actor_handle.drain_and_stop("healthy shutdown").unwrap();
3573        actor_handle.await;
3574
3575        // Try to send a message to the stopped actor
3576        let result = handle_for_send.send(&client, TestActorMessage::Noop());
3577
3578        assert!(result.is_err(), "send should fail when actor is stopped");
3579        let err = result.unwrap_err();
3580        assert_matches!(
3581            err.kind(),
3582            MailboxSenderErrorKind::Mailbox(mailbox_err)
3583                if matches!(
3584                    mailbox_err.kind(),
3585                    MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "healthy shutdown"
3586                )
3587        );
3588    }
3589
3590    #[async_timed_test(timeout_secs = 30)]
3591    async fn test_mailbox_closed_with_owner_failed_reason() {
3592        use crate::actor::ActorErrorKind;
3593        use crate::actor::ActorStatus;
3594        use crate::mailbox::MailboxErrorKind;
3595        use crate::mailbox::MailboxSenderErrorKind;
3596
3597        let proc = Proc::local();
3598        let (client, _) = proc.instance("client").unwrap();
3599        // Need to set a supervison coordinator for this Proc because there will
3600        // be actor failure(s) in this test which trigger supervision.
3601        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
3602
3603        let actor_handle = proc.spawn("test", TestActor).unwrap();
3604
3605        // Clone the handle before awaiting since await consumes the handle
3606        let handle_for_send = actor_handle.clone();
3607
3608        // Cause the actor to fail
3609        actor_handle
3610            .send(
3611                &client,
3612                TestActorMessage::Fail(anyhow::anyhow!("intentional failure")),
3613            )
3614            .unwrap();
3615        actor_handle.await;
3616
3617        // Try to send a message to the failed actor
3618        let result = handle_for_send.send(&client, TestActorMessage::Noop());
3619
3620        assert!(result.is_err(), "send should fail when actor has failed");
3621        let err = result.unwrap_err();
3622        assert_matches!(
3623            err.kind(),
3624            MailboxSenderErrorKind::Mailbox(mailbox_err)
3625                if matches!(
3626                    mailbox_err.kind(),
3627                    MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg)))
3628                        if msg.contains("intentional failure")
3629                )
3630        );
3631    }
3632
3633    /// Wait for a terminated snapshot to appear for the given actor.
3634    /// The introspect task runs in a separate tokio task and may not
3635    /// have stored the snapshot by the time `handle.await` returns.
3636    async fn wait_for_terminated_snapshot(
3637        proc: &Proc,
3638        actor_id: &reference::ActorId,
3639    ) -> crate::introspect::IntrospectResult {
3640        // Yield to let the introspect task run, then poll. Use a
3641        // combination of yields (for fast paths) and sleeps (to
3642        // avoid busy-spinning if the scheduler is loaded).
3643        for i in 0..1000 {
3644            if let Some(snapshot) = proc.terminated_snapshot(actor_id) {
3645                return snapshot;
3646            }
3647            if i < 50 {
3648                tokio::task::yield_now().await;
3649            } else {
3650                tokio::time::sleep(Duration::from_millis(50)).await;
3651            }
3652        }
3653        panic!("timed out waiting for terminated snapshot for {}", actor_id);
3654    }
3655
3656    // Verifies that when an actor is stopped, the proc eventually
3657    // records a "terminated snapshot" for it (written by the
3658    // introspect task, which runs asynchronously). The test asserts
3659    // the snapshot is absent while the actor is live, then stops the
3660    // actor, waits for the introspect task to observe the terminal
3661    // state, and confirms:
3662    //   - the stored snapshot reports a `stopped:*` actor_status, and
3663    //   - the actor id moves from the live set to the terminated set.
3664    #[async_timed_test(timeout_secs = 30)]
3665    async fn test_terminated_snapshot_stored_on_stop() {
3666        let proc = Proc::local();
3667        let (_client, _client_handle) = proc.instance("client").unwrap();
3668
3669        let handle = proc.spawn::<TestActor>("actor", TestActor).unwrap();
3670        let actor_id = handle.actor_id().clone();
3671
3672        // Actor is live — no terminated snapshot yet.
3673        assert!(proc.terminated_snapshot(&actor_id).is_none());
3674        assert!(!proc.all_terminated_actor_ids().contains(&actor_id));
3675
3676        // Stop the actor and wait for it to fully terminate.
3677        handle.drain_and_stop("test").unwrap();
3678        handle.await;
3679
3680        // The introspect task runs in a separate tokio task; wait for
3681        // it to observe the terminal status and store the snapshot.
3682        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
3683        let attrs: hyperactor_config::Attrs =
3684            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3685        let status = attrs
3686            .get(crate::introspect::STATUS)
3687            .expect("must have status");
3688        assert!(
3689            status.starts_with("stopped"),
3690            "expected stopped status, got: {}",
3691            status
3692        );
3693
3694        // Actor should appear in terminated IDs but not in live IDs.
3695        assert!(proc.all_terminated_actor_ids().contains(&actor_id));
3696        assert!(
3697            !proc.all_actor_ids().contains(&actor_id),
3698            "stopped actor should not appear in live actor IDs"
3699        );
3700    }
3701
3702    // Verifies that an actor failure results in a terminated snapshot
3703    // being stored. The test installs a ProcSupervisionCoordinator
3704    // (required for failure handling), spawns an actor, triggers a
3705    // failure via a message, waits for the actor to terminate, then
3706    // waits for the introspect task to persist the terminal snapshot
3707    // and asserts the snapshot reports a `failed:*` actor_status.
3708    #[async_timed_test(timeout_secs = 30)]
3709    async fn test_terminated_snapshot_stored_on_failure() {
3710        let proc = Proc::local();
3711        let (client, _client_handle) = proc.instance("client").unwrap();
3712        // Supervision coordinator required for actor failure handling.
3713        ProcSupervisionCoordinator::set(&proc).await.unwrap();
3714
3715        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
3716        let actor_id = handle.actor_id().clone();
3717
3718        // Trigger a failure.
3719        handle
3720            .send(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")))
3721            .unwrap();
3722        handle.await;
3723
3724        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
3725        let attrs: hyperactor_config::Attrs =
3726            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3727        let status = attrs
3728            .get(crate::introspect::STATUS)
3729            .expect("must have status");
3730        assert!(
3731            status.starts_with("failed"),
3732            "expected failed status, got: {}",
3733            status
3734        );
3735    }
3736
3737    // Exercises FI-1/FI-2 (see introspect.rs module-scope comment).
3738    #[async_timed_test(timeout_secs = 30)]
3739    async fn test_supervision_event_stored_on_failure() {
3740        let proc = Proc::local();
3741        let (client, _client_handle) = proc.instance("client").unwrap();
3742        ProcSupervisionCoordinator::set(&proc).await.unwrap();
3743
3744        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
3745        let actor_id = handle.actor_id().clone();
3746        let cell = handle.cell().clone();
3747
3748        handle
3749            .send(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")))
3750            .unwrap();
3751        handle.await;
3752
3753        let event = cell.supervision_event();
3754        assert!(event.is_some(), "failed actor must have supervision_event");
3755        let event = event.unwrap();
3756        assert_eq!(event.actor_id, actor_id);
3757        assert!(event.actor_status.is_failed());
3758        // Originated here, not propagated.
3759        assert_eq!(event.actually_failing_actor().actor_id, actor_id);
3760    }
3761
3762    // Exercises FI-2 (see introspect.rs module-scope comment).
3763    #[async_timed_test(timeout_secs = 30)]
3764    async fn test_supervision_event_none_on_clean_stop() {
3765        let proc = Proc::local();
3766        let (_client, _client_handle) = proc.instance("client").unwrap();
3767
3768        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
3769        let cell = handle.cell().clone();
3770
3771        handle.drain_and_stop("test").unwrap();
3772        handle.await;
3773
3774        assert!(
3775            cell.supervision_event().is_none(),
3776            "cleanly stopped actor must have no supervision_event"
3777        );
3778    }
3779
3780    // Exercises FI-4 (see introspect.rs module-scope comment).
3781    #[async_timed_test(timeout_secs = 30)]
3782    async fn test_supervision_event_on_propagated_failure() {
3783        let proc = Proc::local();
3784        let (client, _client_handle) = proc.instance("client").unwrap();
3785        ProcSupervisionCoordinator::set(&proc).await.unwrap();
3786
3787        let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
3788        let parent_cell = parent.cell().clone();
3789        // Spawn child under parent.
3790        let (tx, rx) = oneshot::channel();
3791        parent.send(&client, TestActorMessage::Spawn(tx)).unwrap();
3792        let child = rx.await.unwrap();
3793        let child_id = child.actor_id().clone();
3794
3795        // Fail the child — parent doesn't handle supervision, so it
3796        // propagates and terminates too.
3797        child
3798            .send(
3799                &client,
3800                TestActorMessage::Fail(anyhow::anyhow!("child boom")),
3801            )
3802            .unwrap();
3803        parent.await;
3804
3805        let event = parent_cell.supervision_event();
3806        assert!(
3807            event.is_some(),
3808            "parent must have supervision_event from propagated failure"
3809        );
3810        let event = event.unwrap();
3811        // Root cause is the child, not the parent.
3812        assert_eq!(event.actually_failing_actor().actor_id, child_id);
3813    }
3814
3815    // Exercises S11 (see introspect.rs module doc).
3816    //
3817    // A live actor is resolvable. After drain_and_stop + await, the
3818    // actor's status is terminal and resolve_actor_ref must return
3819    // None — even though the introspect task may still hold a strong
3820    // InstanceCell Arc (it drops the Arc only after observing
3821    // terminal status asynchronously). The is_terminal() check in
3822    // resolve_actor_ref closes that race window.
3823    #[async_timed_test(timeout_secs = 30)]
3824    async fn test_resolve_actor_ref_none_for_terminal_actor() {
3825        let proc = Proc::local();
3826        let (_client, _client_handle) = proc.instance("client").unwrap();
3827
3828        let handle = proc.spawn::<TestActor>("target", TestActor).unwrap();
3829        let actor_ref: reference::ActorRef<TestActor> = handle.bind();
3830
3831        // Actor is live — resolve should succeed.
3832        assert!(
3833            proc.resolve_actor_ref(&actor_ref).is_some(),
3834            "live actor should be resolvable"
3835        );
3836
3837        handle.drain_and_stop("test").unwrap();
3838        handle.await;
3839
3840        // Actor is terminal — resolve must return None regardless of
3841        // whether the introspect task has dropped its Arc yet.
3842        assert!(
3843            proc.resolve_actor_ref(&actor_ref).is_none(),
3844            "terminal actor must not be resolvable"
3845        );
3846    }
3847
3848    // Exercises FI-3 (see introspect module doc).
3849    #[async_timed_test(timeout_secs = 30)]
3850    async fn test_terminated_snapshot_has_failure_info() {
3851        let proc = Proc::local();
3852        let (client, _client_handle) = proc.instance("client").unwrap();
3853        ProcSupervisionCoordinator::set(&proc).await.unwrap();
3854
3855        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
3856        let actor_id = handle.actor_id().clone();
3857
3858        handle
3859            .send(&client, TestActorMessage::Fail(anyhow::anyhow!("kaboom")))
3860            .unwrap();
3861        handle.await;
3862
3863        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
3864        let attrs: hyperactor_config::Attrs =
3865            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3866        let status = attrs
3867            .get(crate::introspect::STATUS)
3868            .expect("must have status");
3869        assert!(
3870            status.starts_with("failed"),
3871            "expected failed status, got: {}",
3872            status
3873        );
3874        let err_msg = attrs
3875            .get(crate::introspect::FAILURE_ERROR_MESSAGE)
3876            .expect("failed actor must have failure_error_message");
3877        assert!(!err_msg.is_empty());
3878        let root_cause = attrs
3879            .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
3880            .expect("must have root_cause_actor");
3881        assert_eq!(root_cause, &actor_id.to_string());
3882        assert_eq!(
3883            attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
3884            Some(&false)
3885        );
3886        assert!(
3887            attrs.get(crate::introspect::FAILURE_OCCURRED_AT).is_some(),
3888            "failed actor must have occurred_at"
3889        );
3890    }
3891
3892    // Exercises FI-4 (see introspect module doc).
3893    #[async_timed_test(timeout_secs = 30)]
3894    async fn test_propagated_failure_info() {
3895        let proc = Proc::local();
3896        let (client, _client_handle) = proc.instance("client").unwrap();
3897        ProcSupervisionCoordinator::set(&proc).await.unwrap();
3898
3899        let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
3900        let parent_id = parent.actor_id().clone();
3901
3902        let (tx, rx) = oneshot::channel();
3903        parent.send(&client, TestActorMessage::Spawn(tx)).unwrap();
3904        let child = rx.await.unwrap();
3905        let child_id = child.actor_id().clone();
3906
3907        child
3908            .send(
3909                &client,
3910                TestActorMessage::Fail(anyhow::anyhow!("child fail")),
3911            )
3912            .unwrap();
3913        parent.await;
3914
3915        let snapshot = wait_for_terminated_snapshot(&proc, &parent_id).await;
3916        let attrs: hyperactor_config::Attrs =
3917            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
3918        let root_cause = attrs
3919            .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
3920            .expect("propagated failure must have root_cause_actor");
3921        assert_eq!(root_cause, &child_id.to_string());
3922        assert_eq!(
3923            attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
3924            Some(&true)
3925        );
3926    }
3927
3928    /// Exercises AI-1 (see module doc).
3929    #[async_timed_test(timeout_secs = 30)]
3930    async fn test_spawn_with_name_creates_descriptive_name() {
3931        let proc = Proc::local();
3932        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3933        let handle = proc
3934            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
3935            .unwrap();
3936        assert_eq!(handle.actor_id().name(), "my_controller");
3937        assert_eq!(handle.actor_id().pid(), 1);
3938    }
3939
3940    /// Exercises AI-1 (see module doc).
3941    #[async_timed_test(timeout_secs = 30)]
3942    async fn test_spawn_with_name_increments_index() {
3943        let proc = Proc::local();
3944        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3945        let first = proc
3946            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
3947            .unwrap();
3948        let second = proc
3949            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
3950            .unwrap();
3951        assert_eq!(first.actor_id().pid(), 1);
3952        assert_eq!(second.actor_id().pid(), 2);
3953    }
3954
3955    /// Exercises AI-1 (see module doc).
3956    /// spawn_named_child passes Some(parent) to spawn_inner.
3957    #[async_timed_test(timeout_secs = 30)]
3958    async fn test_spawn_with_name_preserves_supervision() {
3959        let proc = Proc::local();
3960        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3961        let child = proc
3962            .spawn_named_child(root.cell().clone(), "supervised_child", TestActor)
3963            .unwrap();
3964        let child_cell = child.cell();
3965        let parent = child_cell.parent().expect("named child must have a parent");
3966        assert_eq!(parent.actor_id(), root.actor_id());
3967    }
3968
3969    /// Exercises AI-1 (see module doc).
3970    #[async_timed_test(timeout_secs = 30)]
3971    async fn test_spawn_unchanged() {
3972        let proc = Proc::local();
3973        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3974        let child = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
3975        assert_eq!(child.actor_id().name(), root.actor_id().name());
3976    }
3977
3978    /// Exercises AI-1 (see module doc).
3979    #[async_timed_test(timeout_secs = 30)]
3980    async fn test_spawn_with_name_different_names_different_pids() {
3981        let proc = Proc::local();
3982        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3983        let a = proc
3984            .spawn_named_child(root.cell().clone(), "controller_a", TestActor)
3985            .unwrap();
3986        let b = proc
3987            .spawn_named_child(root.cell().clone(), "controller_b", TestActor)
3988            .unwrap();
3989        assert_ne!(a.actor_id().pid(), b.actor_id().pid());
3990        assert_eq!(a.actor_id().name(), "controller_a");
3991        assert_eq!(b.actor_id().name(), "controller_b");
3992    }
3993
3994    /// Exercises AI-1 (see module doc).
3995    #[async_timed_test(timeout_secs = 30)]
3996    async fn test_spawn_with_name_no_child_overwrite() {
3997        let proc = Proc::local();
3998        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
3999        let _a = proc
4000            .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
4001            .unwrap();
4002        let _b = proc
4003            .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
4004            .unwrap();
4005        let _c = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
4006        assert_eq!(root.cell().child_count(), 3);
4007    }
4008
4009    /// Exercises AI-1 (see module doc).
4010    #[async_timed_test(timeout_secs = 30)]
4011    async fn test_spawn_with_name_does_not_pollute_roots() {
4012        let proc = Proc::local();
4013        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
4014        let _child = proc
4015            .spawn_named_child(root.cell().clone(), "foo", TestActor)
4016            .unwrap();
4017        // "foo" was used as a named child name but should NOT
4018        // prevent spawning a root actor with that name.
4019        let result = proc.spawn::<TestActor>("foo", TestActor);
4020        assert!(result.is_ok(), "named child should not pollute roots");
4021    }
4022
4023    /// Exercises AI-3 (see module doc).
4024    #[async_timed_test(timeout_secs = 30)]
4025    async fn test_ai3_controller_actor_ids_unique_across_parents_same_proc() {
4026        let proc = Proc::local();
4027        let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
4028        let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
4029
4030        // Simulate the correct pattern: include mesh identity in name.
4031        let ctrl_a = proc
4032            .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
4033            .unwrap();
4034        let ctrl_b = proc
4035            .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
4036            .unwrap();
4037
4038        assert_ne!(
4039            ctrl_a.actor_id(),
4040            ctrl_b.actor_id(),
4041            "controller ActorIds must be unique across parents"
4042        );
4043    }
4044
4045    /// Exercises AI-3 (see module doc).
4046    #[async_timed_test(timeout_secs = 30)]
4047    async fn test_ai3_no_controller_overwrite_in_parent_or_proc_maps() {
4048        let proc = Proc::local();
4049        let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
4050        let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
4051
4052        let ctrl_a = proc
4053            .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
4054            .unwrap();
4055        let ctrl_b = proc
4056            .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
4057            .unwrap();
4058
4059        // Both must be independently resolvable via the proc's instances.
4060        assert!(
4061            proc.get_instance(ctrl_a.actor_id()).is_some(),
4062            "ctrl_a must be resolvable"
4063        );
4064        assert!(
4065            proc.get_instance(ctrl_b.actor_id()).is_some(),
4066            "ctrl_b must be resolvable"
4067        );
4068        // Parents each see exactly one child.
4069        assert_eq!(parent_a.cell().child_count(), 1);
4070        assert_eq!(parent_b.cell().child_count(), 1);
4071    }
4072
4073    // Exercises FI-6 (see introspect module doc).
4074    #[async_timed_test(timeout_secs = 30)]
4075    async fn test_stopped_snapshot_has_no_failure_info() {
4076        let proc = Proc::local();
4077        let (_client, _client_handle) = proc.instance("client").unwrap();
4078
4079        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
4080        let actor_id = handle.actor_id().clone();
4081
4082        handle.drain_and_stop("test").unwrap();
4083        handle.await;
4084
4085        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
4086        let attrs: hyperactor_config::Attrs =
4087            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
4088        let status = attrs
4089            .get(crate::introspect::STATUS)
4090            .expect("must have status");
4091        assert!(
4092            status.starts_with("stopped"),
4093            "expected stopped, got: {}",
4094            status
4095        );
4096        assert!(
4097            attrs
4098                .get(crate::introspect::FAILURE_ERROR_MESSAGE)
4099                .is_none(),
4100            "stopped actor must not have failure attrs"
4101        );
4102    }
4103}