Skip to main content

hyperactor/
proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! [`Proc`] is an addressable actor-runtime boundary.
10//!
11//! It owns actor lifecycle (spawn, run, terminate), routes messages
12//! to local actors, forwards messages for remote destinations, and
13//! hosts supervision state.
14//!
15//! It also stores bounded snapshots of terminated actors for
16//! post-mortem introspection.
17//!
18//! ## Client instance invariants (CI-*)
19//!
20//! - **CI-1 (client status):** `IntrospectMessage::Query` on an
21//!   introspectable instance returns `status: "client"` and
22//!   `actor_type: "()"` in attrs.
23//! - **CI-2 (snapshot on drop):** Dropping the returned `Instance<()>`
24//!   transitions its status to terminal, causing the introspect task
25//!   to store a terminated snapshot.
26//!
27//! ## Actor identity invariants (AI-*)
28//!
29//! - **AI-1 (named-child uid):** Each child gets a globally unique
30//!   random uid. Named children carry a label for display purposes.
31//! - **AI-3 (controller ActorAddr uniqueness):** Each named child gets
32//!   a unique uid; the label is informational only.
33//!
34//! ## Flight recorder span invariants (FR-*)
35//!
36//! - **FR-1 (recording-span route equivalence):**
37//!   `Instance::recording_span()` returns a span bound to the same
38//!   actor-local `Recording` consumed by handler instrumentation and
39//!   introspection. Events emitted under that span land in the same
40//!   flight-recorder ring buffer returned by `introspect_payload()`.
41//! - **FR-2 (recording-span rootness):** Every span returned by
42//!   `Instance::recording_span()` is a fresh root span (`parent:
43//!   None`). Ambient tracing context does not cause events emitted
44//!   under that span to route into a parent actor's flight recorder.
45//! - **FR-3 (fresh-handle, stable-destination):** Repeated calls to
46//!   `Instance::recording_span()` return distinct span handles, but
47//!   all target the same underlying actor recording.
48//!
49//! ## Queue depth accounting invariants (PD-5*)
50//!
51//! - **PD-5a:** Per-actor queue depth counts work items enqueued for
52//!   handler execution but not yet received from `work_rx`.
53//! - **PD-5b:** Queue depth is incremented exactly once on every
54//!   enqueue into the actor work queue (in `HandlerPorts::get`).
55//! - **PD-5c:** Queue depth is decremented exactly once on every
56//!   dequeue from `work_rx` (in the actor `run` loop).
57//! - **PD-5d:** Queue depth is intended to be non-negative; tests
58//!   must cover ordered/buffered delivery paths to validate the
59//!   accounting.
60//! - **PD-5e:** `queue_depth` and the OTel `ACTOR_MESSAGE_QUEUE_SIZE`
61//!   counter are two consumers of one accounting path. The
62//!   `account_enqueue` / `account_dequeue` helpers update both
63//!   together so they cannot drift.
64//!
65//! ## Retained queue-pressure invariants (PD-6 through PD-9)
66//!
67//! `ProcQueueStats` holds proc-level retained evidence of queue
68//! pressure. These are runtime-driven (not publish-time sampled)
69//! so they capture between-publish bursts.
70//!
71//! - **PD-6:** `high_water_mark >= running_total` eventually.
72//!   Because `running_total` is incremented before `high_water_mark`
73//!   is updated, a concurrent reader may transiently observe
74//!   `total > high_water_mark`. This is a sampling artifact, not
75//!   an accounting error.
76//! - **PD-7:** `last_nonzero_age_ms() == None` iff proc queue
77//!   depth has never been non-zero since startup. The timestamp
78//!   is updated on enqueue and on dequeue when the queue remains
79//!   non-zero, so it reflects the last observed non-zero state.
80//! - **PD-8:** transient bursts that drain before publish still
81//!   update both the high-water mark and the last-nonzero state.
82//! - **PD-9:** `last_nonzero_age_ms()` is expected to be
83//!   non-decreasing during quiet periods, but this is not a hard
84//!   guarantee — the implementation uses `SystemTime` (wall clock),
85//!   which can move backward on NTP adjustments. Callers should
86//!   treat the age as best-effort telemetry, not a monotonic
87//!   invariant.
88
89use std::any::Any;
90use std::any::TypeId;
91use std::collections::BTreeMap;
92use std::collections::HashMap;
93use std::fmt;
94use std::future::Future;
95use std::ops::Deref;
96use std::panic;
97use std::panic::AssertUnwindSafe;
98use std::panic::Location as PanicLocation;
99use std::pin::Pin;
100use std::sync::Arc;
101use std::sync::Condvar;
102use std::sync::Mutex;
103use std::sync::OnceLock;
104use std::sync::RwLock;
105use std::sync::Weak;
106use std::sync::atomic::AtomicBool;
107use std::sync::atomic::AtomicU64;
108use std::sync::atomic::AtomicUsize;
109use std::sync::atomic::Ordering;
110use std::time::Duration;
111use std::time::Instant;
112use std::time::SystemTime;
113
114use async_trait::async_trait;
115use dashmap::DashMap;
116use dashmap::DashSet;
117use dashmap::mapref::entry::Entry;
118use dashmap::mapref::multiple::RefMulti;
119use futures::FutureExt;
120use hyperactor_config::Flattrs;
121use hyperactor_telemetry::ActorStatusEvent;
122use hyperactor_telemetry::generate_actor_status_event_id;
123use hyperactor_telemetry::hash_to_u64;
124use hyperactor_telemetry::notify_actor_status_changed;
125use hyperactor_telemetry::notify_message;
126use hyperactor_telemetry::notify_message_status;
127use hyperactor_telemetry::recorder::Recording;
128use serde::Deserialize;
129use serde::Serialize;
130use tokio::sync::Notify;
131use tokio::sync::mpsc;
132use tokio::sync::watch;
133use tokio::task::JoinHandle;
134use tracing::Instrument;
135use tracing::Span;
136use typeuri::Named;
137use uuid::Uuid;
138use wirevalue::TypeInfo;
139
140use crate as hyperactor;
141use crate::Actor;
142use crate::ActorAddr;
143use crate::ActorRef;
144use crate::Addr;
145use crate::Data;
146use crate::Handler;
147use crate::Location;
148use crate::Message;
149use crate::PortAddr;
150use crate::ProcAddr;
151use crate::ProcId;
152use crate::RemoteMessage;
153use crate::actor::ActorError;
154use crate::actor::ActorErrorKind;
155use crate::actor::ActorHandle;
156use crate::actor::ActorStatus;
157use crate::actor::AnyActorHandle;
158use crate::actor::Binds;
159use crate::actor::HandlerInfo;
160use crate::actor::Referable;
161use crate::actor::RemoteHandles;
162use crate::actor::Signal;
163use crate::actor::StopMode;
164use crate::actor_local::ActorLocalStorage;
165use crate::channel;
166use crate::channel::ChannelAddr;
167use crate::channel::ChannelError;
168use crate::channel::ChannelTransport;
169use crate::config;
170use crate::context;
171use crate::context::Mailbox as _;
172use crate::endpoint::Endpoint as _;
173use crate::gateway::Gateway;
174use crate::id::ActorId;
175use crate::id::Label;
176use crate::id::Uid;
177use crate::introspect::IntrospectMessage;
178use crate::introspect::IntrospectResult;
179use crate::mailbox::BoxedMailboxSender;
180use crate::mailbox::DeliveryError;
181use crate::mailbox::DialMailboxRouter;
182use crate::mailbox::IntoBoxedMailboxSender as _;
183use crate::mailbox::Mailbox;
184use crate::mailbox::MailboxClient;
185use crate::mailbox::MailboxMuxer;
186use crate::mailbox::MailboxSender;
187use crate::mailbox::MessageEnvelope;
188use crate::mailbox::OncePortHandle;
189use crate::mailbox::OncePortReceiver;
190use crate::mailbox::PanickingMailboxSender;
191use crate::mailbox::PortHandle;
192use crate::mailbox::PortReceiver;
193use crate::mailbox::Undeliverable;
194use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
195use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
196use crate::metrics::ACTOR_MESSAGES_RECEIVED;
197use crate::subject::AsSubject as _;
198
/// Legacy singleton proc name used for host-local client actors.
///
/// This is not a true singleton: every host may have its own `local` proc,
/// so the name alone does not identify a proc. Local delivery must compare
/// both the proc id and the location when it sees this id.
pub const LEGACY_LOCAL_PROC_NAME: &str = "local";
204
/// Legacy singleton proc name used for host system actors.
///
/// This is not a true singleton: every host may have its own `service`
/// proc, so the name alone does not identify a proc. Local delivery must
/// compare both the proc id and the location when it sees this id.
pub const LEGACY_SERVICE_PROC_NAME: &str = "service";
210
/// Wall-clock time as milliseconds since the Unix epoch.
///
/// This is the default clock of `ProcQueueStats`; tests substitute a
/// deterministic one through `ProcQueueStats::with_clock`. If the system
/// clock reads earlier than the epoch, the result is `0`.
fn wall_clock_epoch_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        // Clock set before 1970: report zero elapsed time.
        Err(_) => 0,
    }
}
220
221/// Proc-level retained queue-pressure state (PD-6 through PD-9).
222///
223/// Runtime-driven and updated from the enqueue/dequeue accounting
224/// path, not from publish-time sampling. These metrics preserve
225/// between-publish queue-pressure evidence that instantaneous
226/// sampling misses.
227pub(crate) struct ProcQueueStats {
228    /// Proc-wide running total of queued work items. Incremented on
229    /// enqueue, decremented on dequeue. O(1) alternative to iterating
230    /// per-actor depths.
231    running_total: AtomicU64,
232    /// Maximum proc-wide queue depth observed since startup (PD-6).
233    high_water_mark: AtomicU64,
234    /// Epoch-millis of the most recent moment when proc-wide queue
235    /// depth was observed non-zero (PD-7). Sentinel 0 means never.
236    /// Updated on enqueue and on dequeue when the queue remains
237    /// non-zero, so the age reflects the last observed non-zero
238    /// state rather than merely the last enqueue.
239    last_nonzero_epoch_ms: AtomicU64,
240    /// Clock function for timestamps. Defaults to `wall_clock_epoch_ms`.
241    /// Tests can override via `with_clock` for deterministic behavior.
242    clock: fn() -> u64,
243}
244
245impl ProcQueueStats {
246    fn new() -> Self {
247        Self {
248            running_total: AtomicU64::new(0),
249            high_water_mark: AtomicU64::new(0),
250            last_nonzero_epoch_ms: AtomicU64::new(0),
251            clock: wall_clock_epoch_ms,
252        }
253    }
254
255    /// Create with a custom clock for testing.
256    #[cfg(test)]
257    fn with_clock(clock: fn() -> u64) -> Self {
258        Self {
259            running_total: AtomicU64::new(0),
260            high_water_mark: AtomicU64::new(0),
261            last_nonzero_epoch_ms: AtomicU64::new(0),
262            clock,
263        }
264    }
265
266    /// Current epoch-millis from this instance's clock.
267    fn now_ms(&self) -> u64 {
268        (self.clock)()
269    }
270
271    /// Current proc-wide running total.
272    pub(crate) fn running_total(&self) -> u64 {
273        self.running_total.load(Ordering::Relaxed)
274    }
275
276    /// Maximum proc-wide queue depth since startup (PD-6).
277    pub(crate) fn high_water_mark(&self) -> u64 {
278        self.high_water_mark.load(Ordering::Relaxed)
279    }
280
281    /// How long ago proc-wide queue depth was last observed non-zero
282    /// (PD-7). `None` means no counted actor work has traversed the
283    /// queue accounting path since startup. Uses the configured clock
284    /// (wall clock in production, injectable in tests).
285    pub(crate) fn last_nonzero_age_ms(&self) -> Option<u64> {
286        let ts = self.last_nonzero_epoch_ms.load(Ordering::Relaxed);
287        if ts == 0 {
288            return None;
289        }
290        Some(self.now_ms().saturating_sub(ts))
291    }
292}
293
294/// Single accounting path for actor work-queue enqueue.
295///
296/// Updates three consumers together: per-actor `queue_depth`,
297/// proc-level retained queue-pressure state (`ProcQueueStats`),
298/// and OTel `ACTOR_MESSAGE_QUEUE_SIZE`. Unifying the update
299/// here ensures they cannot drift.
300fn account_enqueue(queue_depth: &AtomicU64, proc_stats: &ProcQueueStats, actor_id: &str) {
301    queue_depth.fetch_add(1, Ordering::Relaxed);
302    let new_total = proc_stats.running_total.fetch_add(1, Ordering::Relaxed) + 1;
303    // PD-6: update high-water mark.
304    proc_stats
305        .high_water_mark
306        .fetch_max(new_total, Ordering::Relaxed);
307    // PD-7: record that the proc is non-zero right now.
308    proc_stats
309        .last_nonzero_epoch_ms
310        .store(proc_stats.now_ms(), Ordering::Relaxed);
311    ACTOR_MESSAGE_QUEUE_SIZE.add(
312        1,
313        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.to_owned()),
314    );
315}
316
317/// Single accounting path for actor work-queue dequeue.
318///
319/// Updates per-actor `queue_depth`, proc-level running total,
320/// OTel `ACTOR_MESSAGE_QUEUE_SIZE`, and the last-nonzero
321/// timestamp when the proc-wide queue remains non-zero after
322/// this dequeue.
323fn account_dequeue(queue_depth: &AtomicU64, proc_stats: &ProcQueueStats, actor_id: &str) {
324    queue_depth.fetch_sub(1, Ordering::Relaxed);
325    let prev_total = proc_stats.running_total.fetch_sub(1, Ordering::Relaxed);
326    // PD-7: if the queue is still non-zero after this dequeue,
327    // update the timestamp so last_nonzero_age_ms reflects
328    // "last observed non-zero state," not just "last enqueue."
329    if prev_total > 1 {
330        proc_stats
331            .last_nonzero_epoch_ms
332            .store(proc_stats.now_ms(), Ordering::Relaxed);
333    }
334    ACTOR_MESSAGE_QUEUE_SIZE.add(
335        -1,
336        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.to_owned()),
337    );
338}
339
/// Roll back an accounted enqueue when the underlying send fails.
///
/// Must be paired with a prior `account_enqueue` that has not yet
/// been balanced by `account_dequeue`. Decrements per-actor
/// `queue_depth`, proc-level `running_total`, and OTel
/// `ACTOR_MESSAGE_QUEUE_SIZE` symmetrically. Leaves
/// `high_water_mark` alone (monotonic by design) and does not
/// touch `last_nonzero_epoch_ms` (best-effort observational
/// timestamp; brief overcount on failed sends is acceptable).
fn account_cancel_enqueue(queue_depth: &AtomicU64, proc_stats: &ProcQueueStats, actor_id: &str) {
    // Undo the per-actor and proc-wide increments made by `account_enqueue`.
    queue_depth.fetch_sub(1, Ordering::Relaxed);
    proc_stats.running_total.fetch_sub(1, Ordering::Relaxed);
    // Undo the matching `+1` on the OTel counter for the same actor.
    ACTOR_MESSAGE_QUEUE_SIZE.add(
        -1,
        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.to_owned()),
    );
}
357use crate::ordering::OrderedSender;
358use crate::ordering::OrderedSenderError;
359use crate::ordering::SEQ_INFO;
360use crate::ordering::SeqInfo;
361use crate::ordering::Sequencer;
362use crate::ordering::ordered_channel;
363use crate::panic_handler;
364use crate::supervision::ActorSupervisionEvent;
365
/// Identity assignment sent by a host as the first message on a duplex
/// attach connection. The child reads this to learn its [`ProcAddr`].
///
/// On the wire it travels inside [`Host2Client::Bootstrap`].
#[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
pub struct BootstrapAssignment {
    /// The assigned proc identity. Despite the field name this is a full
    /// [`ProcAddr`], not a bare `ProcId`.
    pub proc_id: ProcAddr,
}
wirevalue::register_type!(BootstrapAssignment);
374
/// Sentinel message sent by an attach client as its first
/// [`MessageEnvelope`]. Hosts use this to distinguish attach requests
/// from regular inbound [`MessageEnvelope`] connections.
///
/// The type carries no data; only its presence as the first message
/// matters.
#[derive(Debug, Clone, Serialize, Deserialize, typeuri::Named)]
pub struct AttachRequest;
wirevalue::register_type!(AttachRequest);
381
/// Wire protocol for the host -> client direction on a duplex attach
/// connection.
///
/// A connection is expected to carry exactly one [`Host2Client::Bootstrap`]
/// first, followed only by [`Host2Client::Envelope`] messages; [`AttachRx`]
/// reports any later `Bootstrap` as an error.
#[derive(Debug, Serialize, Deserialize, typeuri::Named)]
#[expect(
    clippy::large_enum_variant,
    reason = "wire-protocol enum; boxing Envelope ripples through all channel/networking construction and destructure sites and needs a wire-compatibility review — separate diff"
)]
pub enum Host2Client {
    /// First message: identity assignment from the host.
    Bootstrap(BootstrapAssignment),
    /// Subsequent messages: routed envelopes.
    Envelope(MessageEnvelope),
}
wirevalue::register_type!(Host2Client);
396
397/// [`Rx<MessageEnvelope>`](channel::Rx) adapter that unwraps
398/// [`Host2Client::Envelope`] from a duplex receiver.
399pub struct AttachRx(pub channel::duplex::DuplexRx<Host2Client>);
400
401#[async_trait]
402impl channel::Rx<MessageEnvelope> for AttachRx {
403    async fn recv(&mut self) -> Result<MessageEnvelope, ChannelError> {
404        match self.0.recv().await? {
405            Host2Client::Envelope(envelope) => Ok(envelope),
406            Host2Client::Bootstrap(_) => Err(ChannelError::Other(anyhow::anyhow!(
407                "unexpected bootstrap message after handshake"
408            ))),
409        }
410    }
411
412    fn addr(&self) -> ChannelAddr {
413        self.0.addr()
414    }
415
416    async fn join(self) {
417        self.0.join().await
418    }
419}
420
421/// A proc instance is the runtime managing a single proc in Hyperactor.
422/// It is responsible for spawning actors in the proc, multiplexing messages
423/// to/within actors in the proc, and providing fallback routing to external
424/// procs.
425///
426/// Procs are also responsible for maintaining the local supervision hierarchy.
427#[derive(Clone)]
428pub struct Proc {
429    inner: Arc<ProcState>,
430}
431
432impl fmt::Debug for Proc {
433    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434        f.debug_struct("Proc")
435            .field("proc_id", &self.inner.proc_id)
436            .finish()
437    }
438}
439
/// Shared state behind every [`Proc`] handle; one instance per proc, held
/// in an `Arc`.
struct ProcState {
    /// The proc's runtime identity. This should be globally unique,
    /// but is not (yet) for local-only procs.
    proc_id: ProcId,

    /// Shared ingress, egress, and advertised reachability state.
    gateway: Gateway,

    /// A muxer instance that has entries for every actor managed by
    /// the proc.
    proc_muxer: MailboxMuxer,

    /// Reserved root actor uids. Prevents races between concurrent
    /// `allocate_root_id` callers — insert returns false if the uid
    /// was already reserved.
    reserved_roots: DashSet<crate::id::Uid>,

    /// Reserved explicit child actor uids. Prevents races between concurrent
    /// `gspawn_uid` callers with the same uid.
    reserved_child_uids: DashSet<crate::id::Uid>,

    /// All actor instances in this proc, held as weak cells keyed by
    /// actor id.
    instances: DashMap<ActorId, WeakInstanceCell>,

    /// Proc-level queue-pressure accounting (PD-6 through PD-9).
    /// Runtime-driven — updated from `account_enqueue` /
    /// `account_dequeue`, not from publish-time sampling.
    /// `Arc`-wrapped so `HandlerPorts<A>` enqueue closures can share it.
    queue_stats: Arc<ProcQueueStats>,

    /// Snapshots of terminated actors for post-mortem introspection.
    /// Populated by the introspect task just before it exits on
    /// terminal status. Bounded by
    /// [`config::TERMINATED_SNAPSHOT_RETENTION`].
    terminated_snapshots: DashMap<ActorId, TerminatedSnapshot>,

    /// Used by root actors to send events to the actor coordinating
    /// supervision of root actors in this proc. Set at most once.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// The address of the supervision coordinator actor, recorded when the
    /// coordinator port is set.
    /// Used to ensure the coordinator is shut down last during proc teardown.
    supervision_coordinator_actor_id: OnceLock<ActorAddr>,

    /// Handle to the mailbox server task, if this proc was created with
    /// `Proc::direct()` or `Proc::attach_to_host()`, or had `serve()`
    /// called on it. Used to gracefully stop the server and join it
    /// (flushing receive-side acks) during shutdown.
    mailbox_server_handle: std::sync::Mutex<Option<crate::mailbox::MailboxServerHandle>>,
}
490
/// One retained record of a terminated actor, stored in
/// `ProcState::terminated_snapshots` for post-mortem introspection.
struct TerminatedSnapshot {
    /// Address of the actor the snapshot belongs to.
    actor_addr: ActorAddr,
    /// The introspection result captured for that actor.
    payload: crate::introspect::IntrospectResult,
}
495
impl Drop for ProcState {
    /// Logs a `ProcStatus` event with status `Dropped` for this proc.
    fn drop(&mut self) {
        // `ProcStatus::Dropped` is logged when `ProcState` is dropped, not
        // when an individual `Proc` handle is dropped: `Proc` only wraps an
        // `Arc<ProcState>`, so the proc is gone only once the reference
        // count of `Proc::inner` has reached zero.
        let proc_addr = self.proc_addr();
        tracing::info!(
            subject = %proc_addr.subject(),
            name = "ProcStatus",
            status = "Dropped"
        );
    }
}
509
impl ProcState {
    /// The gateway's current default location (delegates to
    /// `Gateway::default_location`).
    fn default_location(&self) -> Location {
        self.gateway.default_location()
    }

    /// Replaces the gateway's default location (delegates to
    /// `Gateway::set_default_location`).
    fn set_default_location(&self, location: Location) {
        self.gateway.set_default_location(location)
    }

    /// The proc's address, as produced by the gateway from this proc's id.
    fn proc_addr(&self) -> ProcAddr {
        self.gateway.proc_addr(&self.proc_id)
    }
}
523
/// Structured return type for [`Proc::actor_instance`].
///
/// Groups the instance, handle, and per-channel receivers that an
/// "inverted" actor caller needs to drive the actor manually. Each of
/// the three receivers is handed over by value, so the caller becomes
/// their sole consumer.
pub struct ActorInstance<A: Actor> {
    /// The actor instance (used for sending/receiving messages, spawning children, etc.).
    pub instance: Instance<A>,
    /// Handle to the actor (used for lifecycle control and port access).
    pub handle: ActorHandle<A>,
    /// Supervision events delivered to this actor.
    pub supervision: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    /// Control signals for the actor.
    pub signal: PortReceiver<Signal>,
    /// Primary work queue for handler dispatch.
    pub work: mpsc::UnboundedReceiver<WorkCell<A>>,
}
540
/// Builder for constructing a [`Proc`] with explicit identity and connectivity.
///
/// The `State` type parameter records which gateway the proc will use and
/// selects the matching `build()` implementation. It defaults to
/// [`GlobalGateway`].
pub struct Builder<State = GlobalGateway> {
    /// Proc id chosen by the caller; when `None`, `build()` falls back to
    /// `ProcId::anonymous()`.
    proc_id: Option<ProcId>,
    /// Gateway selection for this builder.
    state: State,
}

/// Builder state that attaches the proc to the process-wide global gateway.
///
/// This is the state produced by `Builder::new()`.
pub struct GlobalGateway;

/// Builder state that attaches the proc to a shared gateway.
///
/// Entered through `Builder::shared_gateway`.
pub struct SharedGateway {
    /// The gateway the built proc will be attached to.
    gateway: Gateway,
}

/// Builder state that creates a private gateway with a custom forwarder.
///
/// Entered through `Builder::private_gateway`.
pub struct PrivateGateway {
    /// Forwarder handed to the private gateway created at build time.
    forwarder: BoxedMailboxSender,
}
559
560impl Builder<GlobalGateway> {
561    /// Create a new proc builder.
562    pub fn new() -> Self {
563        Self {
564            proc_id: None,
565            state: GlobalGateway,
566        }
567    }
568
569    /// Attach the proc to a shared gateway.
570    pub fn shared_gateway(self, gateway: Gateway) -> Builder<SharedGateway> {
571        Builder {
572            proc_id: self.proc_id,
573            state: SharedGateway { gateway },
574        }
575    }
576
577    /// Use a private gateway with the provided forwarder.
578    pub fn private_gateway(self, forwarder: BoxedMailboxSender) -> Builder<PrivateGateway> {
579        Builder {
580            proc_id: self.proc_id,
581            state: PrivateGateway { forwarder },
582        }
583    }
584
585    /// Build the proc.
586    pub fn build(self) -> Result<Proc, anyhow::Error> {
587        self.build_with_gateway(Gateway::global().clone())
588    }
589}
590
591impl<State> Builder<State> {
592    /// Set the proc identity.
593    pub fn proc_id(mut self, proc_id: ProcId) -> Self {
594        self.proc_id = Some(proc_id);
595        self
596    }
597
598    fn build_with_gateway(self, gateway: Gateway) -> Result<Proc, anyhow::Error> {
599        Self::build_proc(self.proc_id, gateway)
600    }
601
602    fn build_proc(proc_id: Option<ProcId>, gateway: Gateway) -> Result<Proc, anyhow::Error> {
603        let proc_id = proc_id.unwrap_or_else(ProcId::anonymous);
604        if is_legacy_pseudo_singleton_proc_id(&proc_id) {
605            anyhow::bail!(
606                "legacy pseudo-singleton proc id '{}' must be constructed with a dedicated Proc constructor",
607                proc_id
608            );
609        }
610        Ok(Proc::from_parts_unchecked(proc_id, gateway))
611    }
612}
613
614impl Builder<SharedGateway> {
615    /// Build the proc.
616    pub fn build(self) -> Result<Proc, anyhow::Error> {
617        let Builder {
618            proc_id,
619            state: SharedGateway { gateway },
620        } = self;
621        Self::build_proc(proc_id, gateway)
622    }
623}
624
625impl Builder<PrivateGateway> {
626    /// Build the proc.
627    pub fn build(self) -> Result<Proc, anyhow::Error> {
628        let Builder {
629            proc_id,
630            state: PrivateGateway { forwarder },
631        } = self;
632        let gateway = Gateway::configured(channel::reserve_local_addr().into(), forwarder);
633        Self::build_proc(proc_id, gateway)
634    }
635}
636
637impl Proc {
638    fn from_parts_unchecked(proc_id: ProcId, gateway: Gateway) -> Self {
639        let proc_addr = gateway.proc_addr(&proc_id);
640        tracing::info!(
641            subject = %proc_addr.subject(),
642            name = "ProcStatus",
643            status = "Created"
644        );
645
646        let proc = Self {
647            inner: Arc::new(ProcState {
648                proc_id: proc_id.clone(),
649                gateway: gateway.clone(),
650                proc_muxer: MailboxMuxer::new(),
651                reserved_roots: DashSet::new(),
652                reserved_child_uids: DashSet::new(),
653                instances: DashMap::new(),
654                queue_stats: Arc::new(ProcQueueStats::new()),
655                terminated_snapshots: DashMap::new(),
656                supervision_coordinator_port: OnceLock::new(),
657                supervision_coordinator_actor_id: OnceLock::new(),
658                mailbox_server_handle: std::sync::Mutex::new(None),
659            }),
660        };
661        gateway.attach(&proc);
662        proc
663    }
664
    /// Checked counterpart of `from_parts_unchecked`: runs
    /// `assert_not_legacy_pseudo_singleton_proc_id` on `proc_id` first and
    /// then constructs the proc.
    ///
    /// NOTE(review): the assertion helper is defined outside this view; by
    /// its name it presumably panics for legacy pseudo-singleton ids —
    /// confirm.
    fn from_parts(proc_id: ProcId, gateway: Gateway) -> Self {
        assert_not_legacy_pseudo_singleton_proc_id(&proc_id);
        Self::from_parts_unchecked(proc_id, gateway)
    }
669
    /// Create the legacy host-local client proc pseudo-singleton.
    ///
    /// The proc is named [`LEGACY_LOCAL_PROC_NAME`], is located at `addr`,
    /// and gets its own gateway configured with `forwarder`.
    pub fn legacy_local_pseudo_singleton(addr: ChannelAddr, forwarder: BoxedMailboxSender) -> Self {
        Self::legacy_pseudo_singleton(addr, LEGACY_LOCAL_PROC_NAME, forwarder)
    }
674
    /// Create the legacy host system proc pseudo-singleton.
    ///
    /// The proc is named [`LEGACY_SERVICE_PROC_NAME`], is located at `addr`,
    /// and gets its own gateway configured with `forwarder`.
    pub fn legacy_service_pseudo_singleton(
        addr: ChannelAddr,
        forwarder: BoxedMailboxSender,
    ) -> Self {
        Self::legacy_pseudo_singleton(addr, LEGACY_SERVICE_PROC_NAME, forwarder)
    }
682
683    fn legacy_pseudo_singleton(
684        addr: ChannelAddr,
685        name: &'static str,
686        forwarder: BoxedMailboxSender,
687    ) -> Self {
688        let proc_addr = ProcAddr::singleton(addr, name);
689        Self::from_parts_unchecked(
690            proc_addr.id().clone(),
691            Gateway::configured(proc_addr.location().clone(), forwarder),
692        )
693    }
694
    /// Create a proc with an anonymous instance id on the default gateway.
    ///
    /// No proc id is passed to the builder, so the id comes from
    /// `ProcId::anonymous()` and the proc is attached to the global gateway.
    ///
    /// # Panics
    ///
    /// Panics if the builder reports an error.
    pub fn anonymous() -> Self {
        Self::builder()
            .build()
            .expect("anonymous proc builder is valid")
    }
701
702    /// Create a proc with an instance id and display label on the default gateway.
703    pub fn instance(label: impl AsRef<str>) -> Self {
704        Self::builder()
705            .proc_id(ProcId::instance(Label::strip(label.as_ref())))
706            .build()
707            .expect("instance proc builder is valid")
708    }
709
710    /// Create a proc with a singleton id on the default gateway.
711    pub fn singleton(name: impl AsRef<str>) -> Self {
712        Self::builder()
713            .proc_id(ProcId::singleton(Label::strip(name.as_ref())))
714            .build()
715            .expect("singleton proc builder is valid")
716    }
717
718    /// Create a proc with a random id on a fresh local-only gateway.
719    pub fn isolated() -> Self {
720        Self::builder()
721            .shared_gateway(Gateway::isolated())
722            .build()
723            .expect("isolated proc builder is valid")
724    }
725
    /// Create a proc builder.
    ///
    /// The builder starts in the [`GlobalGateway`] state with no proc id
    /// chosen; see [`Builder`] for the available configuration steps.
    pub fn builder() -> Builder {
        Builder::new()
    }
730
731    /// Create a pre-configured proc with the given proc id and forwarder.
732    pub fn configured(proc_id: impl Into<ProcAddr>, forwarder: BoxedMailboxSender) -> Self {
733        let proc_addr = proc_id.into();
734        Self::from_parts(
735            proc_addr.id().clone(),
736            Gateway::configured(proc_addr.location().clone(), forwarder),
737        )
738    }
739
740    /// Create a new direct-addressed proc.
741    ///
742    /// The provided name is a display label. Direct procs are otherwise
743    /// independent instances, so each one receives a unique proc id.
744    pub fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
745        let (addr, rx) = channel::serve(addr)?;
746        let proc_id = ProcAddr::instance(addr, name);
747        let proc = Self::builder()
748            .proc_id(proc_id.id().clone())
749            .shared_gateway(Gateway::configured(
750                proc_id.location().clone(),
751                DialMailboxRouter::new().into_boxed(),
752            ))
753            .build()
754            .expect("direct proc builder is valid");
755        let handle = proc.gateway().serve_rx(rx);
756        *proc.inner.mailbox_server_handle.lock().unwrap() = Some(handle);
757        Ok(proc)
758    }
759
760    /// Connect to a host's duplex server and return a [`Proc`] whose
761    /// identity is assigned by the host. Outbound messages are forwarded
    /// over the duplex channel; inbound messages are served into the
    /// proc's muxer. Mirrors [`Proc::direct`] but the identity and
    /// routing are managed by the remote host.
    ///
    /// # Errors
    ///
    /// Returns an error if dialing `addr` fails, if the attach request
    /// envelope cannot be serialized, if receiving the host's first
    /// reply fails, or if that first reply is an envelope rather than
    /// a bootstrap assignment.
    ///
    /// # Panics
    ///
    /// Panics if the freshly dialed client has no receiver, if the
    /// proc builder rejects the assigned identity, or if the mailbox
    /// server handle mutex is poisoned.
    pub async fn attach_to_host(addr: ChannelAddr) -> Result<Self, anyhow::Error> {
        use crate::channel::Rx;
        use crate::channel::Tx;
        let mut duplex_client = channel::duplex::dial::<MessageEnvelope, Host2Client>(addr)?;
        // Keep a sender for outbound traffic and take sole ownership
        // of the receive half; the client itself is retained so it can
        // be joined during shutdown below.
        let duplex_tx = duplex_client.tx();
        let mut duplex_rx = duplex_client
            .take_rx()
            .expect("dial returns a fresh DuplexClient with rx present");
        // Send an AttachRequest envelope to signal attach intent.
        // The host deserializes the first message and enters the
        // attach protocol when it finds an AttachRequest. The
        // sender/dest ids are placeholders — on the happy path the
        // host consumes the envelope without routing it. Clearing
        // `return_undeliverable` closes the hazard path in case the
        // envelope ever escapes into the forwarder: it should be
        // dropped, not bounced to the fake sender.
        let signal_actor_id = ActorAddr::root(
            ProcAddr::singleton(ChannelAddr::any(channel::ChannelTransport::Local), "attach"),
            crate::id::Label::strip("attach"),
        );
        let signal_port = signal_actor_id.port_addr(crate::port::Port::from(0u64));
        let mut envelope = MessageEnvelope::serialize(
            signal_actor_id,
            signal_port,
            &AttachRequest,
            Default::default(),
        )?;
        envelope.set_return_undeliverable(false);
        duplex_tx.post(envelope);
        // Wait for the host to assign an identity.
        let assignment = match duplex_rx.recv().await? {
            Host2Client::Bootstrap(a) => a,
            Host2Client::Envelope(_) => {
                anyhow::bail!("expected bootstrap assignment as first message")
            }
        };
        // Build the proc from the assigned identity. Outbound messages
        // go through a gateway whose sender is the duplex channel's
        // transmit half.
        let proc = Self::builder()
            .proc_id(assignment.proc_id.id().clone())
            .shared_gateway(Gateway::configured(
                assignment.proc_id.location().clone(),
                MailboxClient::new(duplex_tx).into_boxed(),
            ))
            .build()
            .expect("attached proc builder is valid");
        // Wrap the inner mailbox server handle so that stopping/
        // joining the outer handle also joins the dial-side
        // `DuplexClient`.
        let inner_handle = proc.gateway().serve_rx(AttachRx(duplex_rx));
        // The watch flag is the stop signal for the wrapper task: it
        // parks until the flag becomes `true`.
        // NOTE(review): presumably `MailboxServerHandle::stop` flips
        // this flag through `stopped_tx` — confirm in `mailbox`.
        let (stopped_tx, mut stopped_rx) = tokio::sync::watch::channel(false);
        let wrapped_join = tokio::spawn(async move {
            // A `wait_for` error is ignored on purpose: shutdown
            // proceeds either way.
            let _ = stopped_rx.wait_for(|stopped| *stopped).await;
            inner_handle.stop("proc shutting down");
            let _ = inner_handle.await;
            duplex_client.join().await;
            Ok(())
        });
        let handle = crate::mailbox::MailboxServerHandle::from_parts(wrapped_join, stopped_tx);
        // Store the handle so `join_mailbox_server` can stop and await it.
        *proc.inner.mailbox_server_handle.lock().unwrap() = Some(handle);
        Ok(proc)
    }
825
826    /// Set the supervision coordinator's port for this proc. Return Err if it is
827    /// already set.
828    pub fn set_supervision_coordinator(
829        &self,
830        port: PortHandle<ActorSupervisionEvent>,
831    ) -> Result<(), anyhow::Error> {
832        let actor_ref: ActorAddr = port.location().actor_addr();
833        self.state()
834            .supervision_coordinator_port
835            .set(port)
836            .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))?;
837        let _ = self.state().supervision_coordinator_actor_id.set(actor_ref);
838        Ok(())
839    }
840
841    /// The actor address of the supervision coordinator, if one is set and
842    /// lives on this proc.
843    pub fn supervision_coordinator_actor_addr(&self) -> Option<&ActorAddr> {
844        self.state().supervision_coordinator_actor_id.get()
845    }
846
847    /// Handle a supervision event received by the proc. Attempt to forward it to the
848    /// supervision coordinator port if one is set, otherwise crash the process.
849    pub fn handle_unhandled_supervision_event(
850        &self,
851        cx: &impl context::Actor,
852        event: ActorSupervisionEvent,
853    ) {
854        let result = match self.state().supervision_coordinator_port.get() {
855            Some(port) => {
856                port.post(cx, event.clone());
857                Ok(())
858            }
859            None => {
860                if !event.is_error() {
861                    // Normal lifecycle events (e.g. clean stop) without a coordinator
862                    // are silently dropped.
863                    return;
864                }
865                Err(anyhow::anyhow!(
866                    "coordinator port is not set for proc {}",
867                    self.proc_addr(),
868                ))
869            }
870        };
871        if let Err(err) = result {
872            if !event.is_error() {
873                // Normal lifecycle events that fail to send (e.g. coordinator
874                // mailbox already closed during shutdown) are silently dropped.
875                tracing::debug!(
876                    subject = %self.proc_addr().subject(),
877                    "dropping non-error supervision event {}: {:?}",
878                    event,
879                    err
880                );
881                return;
882            }
883            tracing::error!(
884                subject = %self.proc_addr().subject(),
885                "could not propagate supervision event {} due to error: {:?}: crashing",
886                event,
887                err
888            );
889
890            std::process::exit(1);
891        }
892    }
893
894    /// The proc's runtime identity.
895    pub fn proc_id(&self) -> &ProcId {
896        &self.state().proc_id
897    }
898
899    /// The proc's default advertised location.
900    pub fn default_location(&self) -> Location {
901        self.state().default_location()
902    }
903
904    /// Set the proc's default advertised location.
905    pub fn set_default_location(&self, location: Location) {
906        self.state().set_default_location(location)
907    }
908
909    /// The proc's routeable address using its default advertised location.
910    pub fn proc_addr(&self) -> ProcAddr {
911        self.state().proc_addr()
912    }
913
914    /// The proc's connectivity boundary.
915    pub fn gateway(&self) -> Gateway {
916        self.state().gateway.clone()
917    }
918
919    /// Shared sender used by the proc to forward messages to remote
920    /// destinations.
921    pub fn forwarder(&self) -> &BoxedMailboxSender {
922        self.state().gateway.forwarder()
923    }
924
925    /// The proc's mailbox muxer, which routes messages to actors
926    /// registered on this proc.
927    pub fn muxer(&self) -> &MailboxMuxer {
928        &self.inner.proc_muxer
929    }
930
931    /// Convenience accessor for state.
932    fn state(&self) -> &ProcState {
933        self.inner.as_ref()
934    }
935
936    /// A global runtime proc used by this crate.
937    pub(crate) fn runtime() -> &'static Proc {
938        static RUNTIME_PROC: OnceLock<Proc> = OnceLock::new();
939        RUNTIME_PROC.get_or_init(|| {
940            let addr = ChannelAddr::any(ChannelTransport::Local);
941            let proc_id = ProcAddr::instance(addr, "hyperactor_runtime");
942            Proc::configured(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
943        })
944    }
945
946    /// Attach a mailbox to the proc with the provided root name.
947    pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
948        let actor_id: ActorAddr = self.allocate_root_id(name)?;
949        Ok(self.bind_mailbox(actor_id))
950    }
951
952    /// Attach a mailbox to the proc as a child actor.
953    pub fn attach_child(&self, parent_id: &ActorAddr) -> Result<Mailbox, anyhow::Error> {
954        let actor_id: ActorAddr = self.allocate_child_id(parent_id)?;
955        Ok(self.bind_mailbox(actor_id))
956    }
957
958    /// Bind a mailbox to the proc.
959    fn bind_mailbox(&self, actor_id: ActorAddr) -> Mailbox {
960        let mbox = Mailbox::new(actor_id);
961
962        // TODO: T210748165 tie the muxer entry to the lifecycle of the mailbox held
963        // by the caller. This will likely require a weak reference.
964        self.state().proc_muxer.bind_mailbox(mbox.clone());
965        mbox
966    }
967
968    /// Attach a mailbox to the proc with the provided root name, and bind an [`ActorAddr`].
969    /// This is intended only for testing, and will be replaced by simpled utilities.
970    pub fn attach_actor<R, M>(
971        &self,
972        name: &str,
973    ) -> Result<(Instance<()>, ActorRef<R>, PortReceiver<M>), anyhow::Error>
974    where
975        M: RemoteMessage,
976        R: Referable + RemoteHandles<M>,
977    {
978        let (instance, _handle) = self.client(name)?;
979        let (_handle, rx) = instance.bind_handler_port::<M>();
980        let actor_ref = ActorRef::attest(instance.self_addr().clone());
981        Ok((instance, actor_ref, rx))
982    }
983
984    /// Spawn a named (root) actor on this proc. The name of the actor must be
985    /// unique.
986    pub fn spawn<A: Actor>(&self, name: &str, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
987        let actor_id: ActorAddr = self.allocate_root_id(name)?;
988        self.spawn_inner(actor_id, actor, None)
989    }
990
991    /// Spawn a root actor on this proc using an explicit uid.
992    ///
993    /// The uid must be unique among root actors on this proc. Instance labels,
994    /// if present, are descriptive only and do not affect uniqueness.
995    pub fn spawn_with_uid<A: Actor>(
996        &self,
997        uid: crate::id::Uid,
998        actor: A,
999    ) -> Result<ActorHandle<A>, anyhow::Error> {
1000        let actor_id: ActorAddr = self.allocate_root_uid(uid)?;
1001        self.spawn_inner(actor_id, actor, None)
1002    }
1003
1004    /// Common spawn logic for both root and child actors.
1005    #[hyperactor::instrument(fields(subject = actor_id.subject().to_string()))]
1006    fn spawn_inner<A: Actor>(
1007        &self,
1008        actor_id: ActorAddr,
1009        actor: A,
1010        parent: Option<InstanceCell>,
1011    ) -> Result<ActorHandle<A>, anyhow::Error> {
1012        let (instance, receivers) = Instance::new(self.clone(), actor_id, false, parent);
1013        Ok(instance.start(actor, receivers))
1014    }
1015
1016    /// Create a lightweight client instance (no actor loop, no
1017    /// introspect task).  This is safe to call outside a Tokio
1018    /// runtime — unlike [`actor_instance`], it never calls
1019    /// `tokio::spawn`.
1020    pub fn client(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
1021        let actor_id: ActorAddr = self.allocate_root_id(name)?;
1022        let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, None);
1023        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
1024        instance.change_status(ActorStatus::Client);
1025        Ok((instance, handle))
1026    }
1027
1028    /// Create a lightweight client instance that handles
1029    /// [`IntrospectMessage`].
1030    ///
1031    /// Like [`client`](Self::client), this creates a client-mode
1032    /// instance with no actor message loop. Unlike `client`, it
1033    /// spawns a dedicated introspect task, so the instance responds
1034    /// to `IntrospectMessage::Query` and is visible and navigable in
1035    /// admin tooling such as the mesh TUI.
1036    ///
1037    /// See CI-1, CI-2 in module doc.
1038    ///
1039    /// Requires an active Tokio runtime (calls `tokio::spawn`).
1040    pub fn introspectable_instance(
1041        &self,
1042        name: &str,
1043    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
1044        let actor_id: ActorAddr = self.allocate_root_id(name)?;
1045        let (instance, receivers) = Instance::new(self.clone(), actor_id, false, None);
1046        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
1047        instance.change_status(ActorStatus::Client);
1048        tokio::spawn(crate::introspect::serve_introspect(
1049            instance.inner.cell.clone(),
1050            receivers.introspect,
1051        ));
1052        Ok((instance, handle))
1053    }
1054
1055    /// Create and return an actor instance, its handle, and its
1056    /// receivers. This allows actors to be "inverted": the caller can
1057    /// use the returned [`Instance`] to send and receive messages,
1058    /// launch child actors, etc. The actor itself does not handle any
1059    /// messages unless driven by the caller.
1060    pub fn actor_instance<A: Actor>(&self, name: &str) -> Result<ActorInstance<A>, anyhow::Error> {
1061        let actor_id: ActorAddr = self.allocate_root_id(name)?;
1062        let span = tracing::debug_span!(
1063            "actor_instance",
1064            subject = %actor_id.subject(),
1065        );
1066        let _guard = span.enter();
1067        let (instance, receivers) = Instance::new(self.clone(), actor_id.clone(), false, None);
1068        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
1069        instance.change_status(ActorStatus::Client);
1070
1071        tokio::spawn(crate::introspect::serve_introspect(
1072            instance.inner.cell.clone(),
1073            receivers.introspect,
1074        ));
1075
1076        let (signal_rx, supervision_rx) = receivers.actor_loop.unwrap();
1077        Ok(ActorInstance {
1078            instance,
1079            handle,
1080            supervision: supervision_rx,
1081            signal: signal_rx,
1082            work: receivers.work,
1083        })
1084    }
1085
1086    /// Traverse all actor trees in this proc, starting from root actors.
1087    ///
1088    /// **Caution:** This holds DashMap shard read locks while doing
1089    /// `Weak::upgrade()` and recursively walking the actor tree per
1090    /// entry. Under rapid actor churn, this causes convoy starvation
1091    /// with concurrent `insert`/`remove` operations. Prefer
1092    /// `all_instance_keys()` with point lookups if you only need
1093    /// actor IDs. Currently unused in production code.
1094    pub fn traverse<F>(&self, f: &mut F)
1095    where
1096        F: FnMut(&InstanceCell, usize),
1097    {
1098        for entry in self.state().instances.iter() {
1099            if entry.key().uid().is_singleton()
1100                && let Some(cell) = entry.value().upgrade()
1101            {
1102                cell.traverse(f);
1103            }
1104        }
1105    }
1106
1107    /// Proc-wide running total of queued work items.
1108    pub fn queue_depth_total(&self) -> u64 {
1109        self.state().queue_stats.running_total()
1110    }
1111
1112    /// Maximum proc-wide queue depth observed since startup (PD-6).
1113    pub fn queue_depth_high_water_mark(&self) -> u64 {
1114        self.state().queue_stats.high_water_mark()
1115    }
1116
1117    /// How long ago proc-wide queue depth was last non-zero (PD-7).
1118    pub fn last_nonzero_queue_depth_age_ms(&self) -> Option<u64> {
1119        self.state().queue_stats.last_nonzero_age_ms()
1120    }
1121
1122    /// Look up an instance by ActorAddr.
1123    pub fn get_instance(&self, actor_id: &ActorAddr) -> Option<InstanceCell> {
1124        self.get_instance_by_id(actor_id.id())
1125    }
1126
1127    /// Look up an instance by ActorId.
1128    pub fn get_instance_by_id(&self, actor_id: &ActorId) -> Option<InstanceCell> {
1129        self.state()
1130            .instances
1131            .get(actor_id)
1132            .and_then(|cell| cell.upgrade())
1133    }
1134
1135    /// Returns the ActorAddrs of all root actors in this proc.
1136    ///
1137    /// **Caution:** This iterates the full DashMap under shard read
1138    /// locks. The per-entry work is lightweight (key filter + clone),
1139    /// but under very rapid churn the iteration can still contend
1140    /// with concurrent writes. Prefer `all_instance_keys()` with a
1141    /// post-filter if this becomes a hot path. Currently unused in
1142    /// production code.
1143    pub fn root_actor_ids(&self) -> Vec<ActorAddr> {
1144        self.state()
1145            .instances
1146            .iter()
1147            .filter_map(|entry| {
1148                entry
1149                    .key()
1150                    .uid()
1151                    .is_singleton()
1152                    .then(|| entry.value().upgrade())
1153                    .flatten()
1154                    .map(|cell| cell.actor_addr().clone())
1155            })
1156            .collect()
1157    }
1158
1159    /// Returns the ActorAddrs of all live actors in this proc, including
1160    /// dynamically spawned children.
1161    ///
1162    /// An actor is considered live if its weak reference is
1163    /// upgradeable and its status is not terminal. This excludes
1164    /// actors whose `InstanceCell` has been dropped and actors that
1165    /// have stopped or failed but whose Arc is still held (e.g. by
1166    /// the introspect task during teardown).
1167    pub fn all_actor_ids(&self) -> Vec<ActorAddr> {
1168        self.state()
1169            .instances
1170            .iter()
1171            .filter_map(|entry| {
1172                let cell = entry.value().upgrade()?;
1173                (!cell.status().borrow().is_terminal()).then(|| cell.actor_addr().clone())
1174            })
1175            .collect()
1176    }
1177
1178    /// Snapshot all instance ids from the DashMap without inspecting
1179    /// values. Each shard read lock is held only long enough to clone
1180    /// the id — no `Weak::upgrade()`, no `watch::borrow()`, no
1181    /// `is_terminal()` check. This minimises shard lock hold time to
1182    /// avoid convoy starvation with concurrent `insert`/`remove`
1183    /// operations during rapid actor churn.
1184    ///
1185    /// The returned list may include actors that are terminal or whose
1186    /// `WeakInstanceCell` no longer upgrades. Callers should tolerate stale
1187    /// ids (e.g. by handling "not found" on subsequent per-actor lookups).
1188    pub fn all_instance_keys(&self) -> Vec<ActorId> {
1189        self.state()
1190            .instances
1191            .iter()
1192            .map(|entry| entry.key().clone())
1193            .collect()
1194    }
1195
1196    /// Look up a terminated actor's snapshot by ID.
1197    pub fn terminated_snapshot(
1198        &self,
1199        actor_id: &ActorAddr,
1200    ) -> Option<crate::introspect::IntrospectResult> {
1201        self.state()
1202            .terminated_snapshots
1203            .get(actor_id.id())
1204            .map(|entry| entry.value().payload.clone())
1205    }
1206
1207    /// Return all terminated actor IDs currently retained.
1208    pub fn all_terminated_actor_ids(&self) -> Vec<ActorAddr> {
1209        self.state()
1210            .terminated_snapshots
1211            .iter()
1212            .map(|entry| entry.value().actor_addr.clone())
1213            .collect()
1214    }
1215
1216    /// Create a child instance. Called from `Instance`.
1217    fn child_instance(
1218        &self,
1219        parent: InstanceCell,
1220    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
1221        let actor_id = self.allocate_child_id(parent.actor_addr())?;
1222        let _ = tracing::debug_span!(
1223            "child_actor_instance",
1224            subject = %actor_id.subject(),
1225        );
1226
1227        let (instance, _receivers) = Instance::new(self.clone(), actor_id, false, Some(parent));
1228        // Client-mode instance: no actor loop, no introspect task.
1229        // Receivers are intentionally dropped.
1230        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
1231        instance.change_status(ActorStatus::Client);
1232        Ok((instance, handle))
1233    }
1234
1235    /// Spawn a child actor from the provided parent on this proc. The parent actor
1236    /// must already belong to this proc, a fact which is asserted in code.
1237    ///
1238    /// When spawn_child returns, the child has an associated cell and is linked
1239    /// with its parent.
1240    pub(crate) fn spawn_child<A: Actor>(
1241        &self,
1242        parent: InstanceCell,
1243        actor: A,
1244    ) -> Result<ActorHandle<A>, anyhow::Error> {
1245        let actor_id = self.allocate_child_id(parent.actor_addr())?;
1246        self.spawn_inner(actor_id, actor, Some(parent))
1247    }
1248
1249    /// Spawn a child actor from the provided parent using an explicit uid.
1250    pub(crate) fn spawn_child_with_uid<A: Actor>(
1251        &self,
1252        parent: InstanceCell,
1253        uid: crate::id::Uid,
1254        actor: A,
1255    ) -> Result<ActorHandle<A>, anyhow::Error> {
1256        let actor_id = self.ensure_child_uid(parent.actor_addr(), uid)?;
1257        self.spawn_inner(actor_id, actor, Some(parent))
1258    }
1259
1260    /// Spawn a named child actor. Same as `spawn_child` but the child
1261    /// gets a descriptive name instead of inheriting the parent's.
1262    /// Supervision linkage to parent is preserved.
1263    pub(crate) fn spawn_named_child<A: Actor>(
1264        &self,
1265        parent: InstanceCell,
1266        name: &str,
1267        actor: A,
1268    ) -> Result<ActorHandle<A>, anyhow::Error> {
1269        let actor_id = self.allocate_named_child_id(parent.actor_addr(), name)?;
1270        self.spawn_inner(actor_id, actor, Some(parent))
1271    }
1272
1273    /// Call `abort` on the `JoinHandle` associated with the given
1274    /// root actor. If successful return `Some(root.clone())` else
1275    /// `None`.
1276    pub fn abort_root_actor(&self, root: &ActorId) -> Option<impl Future<Output = ActorAddr>> {
1277        self.state()
1278            .instances
1279            .get(root)
1280            .into_iter()
1281            .flat_map(|entry| entry.value().upgrade())
1282            .map(|cell| {
1283                let actor_addr = cell.actor_addr().clone();
1284                let r1 = actor_addr.clone();
1285                let r2 = actor_addr;
1286                // `Instance::start()` is infallible and should
1287                // complete quickly, so calling `wait()` on `actor_task_handle`
1288                // should be safe (i.e., not hang forever).
1289                async move {
1290                    tokio::task::spawn_blocking(move || {
1291                        let h = cell.inner.actor_task_handle.wait();
1292                        tracing::debug!("{}: aborting {:?}", r1, h);
1293                        h.abort();
1294                    })
1295                    .await
1296                    .unwrap();
1297                    r2
1298                }
1299            })
1300            .next()
1301    }
1302
1303    /// Signals to a root actor to stop,
1304    /// returning a status observer if successful.
1305    pub fn stop_actor(
1306        &self,
1307        actor_id: &ActorId,
1308        reason: String,
1309    ) -> Option<watch::Receiver<ActorStatus>> {
1310        // Upgrade the weak ref and immediately drop the DashMap entry (read
1311        // guard) before doing anything with `cell`. InstanceCellState::drop
1312        // calls instances.remove(), which needs a write lock on the same shard.
1313        // Holding the read guard while cell drops would self-deadlock.
1314        let cell = match self.state().instances.get(actor_id) {
1315            None => {
1316                tracing::error!(subject = %self.proc_addr().subject(), "no actor {} found", actor_id);
1317                return None;
1318            }
1319            Some(entry) => entry.value().upgrade(),
1320        }; // entry (shard read lock) dropped here
1321        match cell {
1322            None => None, // the actor's cell has been dropped
1323            Some(cell) => {
1324                tracing::info!("sending stop signal to {}", cell.actor_addr());
1325                if let Err(err) = cell.signal(Signal::DrainAndStop(reason)) {
1326                    tracing::error!(
1327                        "failed to send stop signal to uid {}: {:?}",
1328                        cell.uid(),
1329                        err
1330                    );
1331                    None
1332                } else {
1333                    Some(cell.status().clone())
1334                }
1335            }
1336        }
1337    }
1338
    /// Stop the proc. Returns a pair of:
    /// - the actors observed to stop;
    /// - the actors not observed to stop when timeout.
    ///
    /// Shutdown runs in two phases. First, every root actor that is
    /// not in `Client` status and is not the supervision coordinator
    /// is signalled to stop, and each is awaited concurrently for up
    /// to `timeout`; those that do not reach a terminal status in time
    /// are aborted. Second, the supervision coordinator (if any) is
    /// stopped the same way, with its own `timeout`. Finally the
    /// gateway is flushed, bounded by the configured
    /// `FORWARDER_FLUSH_TIMEOUT`.
    ///
    /// Actors for which `stop_actor` returns `None` appear in neither
    /// returned list.
    ///
    /// `reason` is passed to every stop signal.
    #[hyperactor::instrument(fields(subject = self.proc_addr().subject().to_string()))]
    pub async fn destroy_and_wait(
        &mut self,
        timeout: Duration,
        reason: &str,
    ) -> Result<(Vec<ActorAddr>, Vec<ActorAddr>), anyhow::Error> {
        tracing::debug!("proc stopping");

        let coordinator_id = self.supervision_coordinator_actor_addr().cloned();

        // Phase 1: stop all root actors except the supervision coordinator
        // (which must stay alive to receive stop events from the others).
        let mut statuses = HashMap::new();
        // The root addresses are collected into a Vec before the loop
        // body runs, so the DashMap iterator is finished before
        // `stop_actor` does its own lookups.
        for actor_id in self
            .state()
            .instances
            .iter()
            .filter(|entry| entry.key().uid().is_singleton())
            .filter_map(|entry| entry.value().upgrade())
            .filter(|cell| !matches!(*cell.status().borrow(), ActorStatus::Client))
            .map(|cell| cell.actor_addr().clone())
            .collect::<Vec<_>>()
        {
            if coordinator_id.as_ref() == Some(&actor_id) {
                continue;
            }
            if let Some(status) = self.stop_actor(actor_id.id(), reason.to_string()) {
                statuses.insert(actor_id, status);
            }
        }
        // At this point the stop signals have only been sent; the
        // waits for terminal status happen below.
        tracing::debug!("non-coordinator actors stopped");

        // One future per signalled actor; each resolves to
        // `Some(actor_id)` if the actor reached a terminal status
        // within `timeout`, and `None` if the wait timed out.
        let waits: Vec<_> = statuses
            .iter_mut()
            .map(|(actor_id, root)| {
                let actor_id = actor_id.clone();
                async move {
                    tokio::time::timeout(
                        timeout,
                        root.wait_for(|state: &ActorStatus| state.is_terminal()),
                    )
                    .await
                    .ok()
                    .map(|_| actor_id)
                }
            })
            .collect();

        let results = futures::future::join_all(waits).await;
        let mut stopped_actors: Vec<_> = results
            .iter()
            .filter_map(|actor_id| actor_id.as_ref())
            .cloned()
            .collect();
        // Every signalled actor that was not observed to stop is
        // aborted and reported in the second list.
        let aborted_actors: Vec<_> = statuses
            .iter()
            .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
            .map(|(actor_id, _)| {
                let f = self.abort_root_actor(actor_id.id());
                async move {
                    let _ = if let Some(f) = f { Some(f.await) } else { None };
                    // If `is_none(&_)` then the associated actor's
                    // instance cell was already dropped when we went
                    // to call `abort()` on the cell's task handle.

                    actor_id.clone()
                }
            })
            .collect();
        let mut aborted_actors = futures::future::join_all(aborted_actors).await;

        // Phase 2: now that all other actors have stopped, request the
        // supervision coordinator to stop. Their terminal supervision
        // events have already been enqueued by this point, and the
        // coordinator's DrainAndStop path drains queued supervision
        // events before exiting.
        if let Some(ref coord_id) = coordinator_id
            && let Some(mut status) = self.stop_actor(coord_id.id(), reason.to_string())
        {
            let stopped =
                tokio::time::timeout(timeout, status.wait_for(|s: &ActorStatus| s.is_terminal()))
                    .await
                    .is_ok();
            if stopped {
                stopped_actors.push(coord_id.clone());
            } else {
                if let Some(f) = self.abort_root_actor(coord_id.id()) {
                    f.await;
                }
                aborted_actors.push(coord_id.clone());
            }
        }

        // Flush the gateway so that any messages posted during
        // teardown (e.g. supervision events) are wire-delivered
        // before we tear down the proc's networking. The flush is
        // best-effort: if the remote side has already torn down its
        // networking, acks may never arrive and flush would hang
        // indefinitely, so we bound it with a configurable timeout.
        let flush_timeout = hyperactor_config::global::get(crate::config::FORWARDER_FLUSH_TIMEOUT);
        let gateway = self.gateway();
        // Flush failures and timeouts are only logged; they do not
        // change the returned result.
        match tokio::time::timeout(flush_timeout, gateway.flush()).await {
            Ok(Err(err)) => {
                tracing::warn!("gateway flush failed during proc exit: {:?}", err);
            }
            Err(_elapsed) => {
                tracing::warn!("gateway flush timed out during proc exit");
            }
            Ok(Ok(())) => {}
        }

        tracing::info!(
            "destroy_and_wait: {} actors stopped, {} actors aborted",
            stopped_actors.len(),
            aborted_actors.len()
        );
        Ok((stopped_actors, aborted_actors))
    }
1460
1461    /// Resolve an actor reference to a **live** actor on this proc.
1462    ///
1463    /// Returns `None` if:
1464    /// - the actor was never spawned here,
1465    /// - the actor's `InstanceCell` has been dropped, or
1466    /// - the actor's status is terminal (stopped or failed).
1467    ///
1468    /// The terminal-status check guards a race window: the introspect
1469    /// task (`serve_introspect`) holds a strong `InstanceCell` Arc
1470    /// and drops it only after observing terminal status. Between the
1471    /// actor reaching terminal and the introspect task reacting,
1472    /// `upgrade()` on the weak ref succeeds even though the actor is
1473    /// dead. The `is_terminal()` check closes that window. Once the
1474    /// introspect task exits, the Arc is dropped and `upgrade()`
1475    /// returns `None` on its own.
1476    ///
1477    /// Bounds:
1478    /// - `R: Actor` — must be a real actor that can live in this
1479    ///   proc.
1480    /// - `R: Referable` — required because the input is an
1481    ///   `ActorRef<R>`.
1482    pub fn resolve_actor_ref<R: Actor + Referable>(
1483        &self,
1484        actor_ref: &ActorRef<R>,
1485    ) -> Option<ActorHandle<R>> {
1486        let cell = self
1487            .inner
1488            .instances
1489            .get(actor_ref.actor_addr().id())?
1490            .upgrade()?;
1491        // An actor whose status is terminal has stopped processing
1492        // messages even if its InstanceCell Arc is still alive (e.g.
1493        // held by the introspect task during teardown).
1494        if cell.status().borrow().is_terminal() {
1495            return None;
1496        }
1497        cell.downcast_handle()
1498    }
1499
1500    /// Create a root allocation in the proc.
1501    ///
1502    /// Uses `reserved_roots` to prevent races between concurrent callers.
1503    fn allocate_root_id(&self, name: &str) -> Result<ActorAddr, anyhow::Error> {
1504        self.reserve_root(Uid::singleton(Label::strip(name)))
1505    }
1506
1507    /// Create a root allocation in the proc from an explicit uid.
1508    fn allocate_root_uid(&self, uid: Uid) -> Result<ActorAddr, anyhow::Error> {
1509        self.reserve_root(uid)
1510    }
1511
1512    fn reserve_root(&self, uid: Uid) -> Result<ActorAddr, anyhow::Error> {
1513        let actor_id = ActorId::new(uid.clone(), self.proc_id().clone(), None);
1514        if !self.state().reserved_roots.insert(uid) {
1515            anyhow::bail!("an actor with id '{}' has already been spawned", actor_id)
1516        }
1517        Ok(ActorAddr::new(actor_id, self.default_location()))
1518    }
1519
1520    /// Create a child allocation in the proc.
1521    #[hyperactor::instrument]
1522    pub(crate) fn allocate_child_id(
1523        &self,
1524        parent_id: &ActorAddr,
1525    ) -> Result<ActorAddr, anyhow::Error> {
1526        assert_eq!(parent_id.proc_id(), self.proc_id());
1527        Ok(parent_id.anonymous_child())
1528    }
1529
1530    /// Ensure that the requested child uid is available in this proc.
1531    fn ensure_child_uid(
1532        &self,
1533        parent_id: &ActorAddr,
1534        uid: crate::id::Uid,
1535    ) -> Result<ActorAddr, anyhow::Error> {
1536        assert_eq!(parent_id.proc_id(), self.proc_id());
1537        let actor_id = ActorId::new(uid.clone(), self.proc_id().clone(), None);
1538        let actor_addr = ActorAddr::new(actor_id, self.default_location());
1539        if !self.state().reserved_child_uids.insert(uid) {
1540            anyhow::bail!("an actor with id {} has already been spawned", actor_addr);
1541        }
1542        Ok(actor_addr)
1543    }
1544
1545    /// Allocate an actor ID with a custom name on this proc.
1546    pub(crate) fn allocate_named_child_id(
1547        &self,
1548        parent_id: &ActorAddr,
1549        name: &str,
1550    ) -> Result<ActorAddr, anyhow::Error> {
1551        assert_eq!(parent_id.proc_id(), self.proc_id());
1552        let proc_id = self.proc_id().clone();
1553        let actor_id = crate::id::ActorId::instance(crate::id::Label::strip(name), proc_id);
1554        Ok(ActorAddr::new(actor_id, self.default_location()))
1555    }
1556
    /// Downgrade to a weak reference that doesn't prevent the proc from being dropped.
    ///
    /// Use [`WeakProc::upgrade`] to get a strong [`Proc`] back while
    /// the proc is still alive.
    pub fn downgrade(&self) -> WeakProc {
        WeakProc::new(self)
    }
1561
    /// Flush the gateway so that any buffered messages are
    /// wire-delivered before the proc's networking is torn down.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the gateway's `flush`.
    pub async fn flush(&self) -> Result<(), anyhow::Error> {
        self.gateway().flush().await
    }
1567
1568    /// Stop and join the mailbox server, flushing receive-side acks.
1569    ///
1570    /// This stops the `MailboxServer::serve` loop and awaits its
1571    /// completion, which runs `Rx::join()` to flush any pending
1572    /// transport-level acks before the channel is torn down.
1573    ///
1574    /// No-op if no mailbox server handle is stored (e.g. for
1575    /// `Proc::configured` or `Proc::isolated` procs that don't serve).
1576    pub async fn join_mailbox_server(&self) {
1577        let handle = self.inner.mailbox_server_handle.lock().unwrap().take();
1578        if let Some(handle) = handle {
1579            handle.stop("proc shutting down");
1580            let _ = handle.await;
1581        }
1582    }
1583
1584    pub(crate) fn is_local_delivery_target(&self, dest_proc: &ProcAddr) -> bool {
1585        let local_proc_id = self.proc_id();
1586        if requires_location_for_local_delivery_identity(dest_proc.id()) {
1587            // TODO: check all bound addresses for this proc, not only
1588            // the current default advertised location.
1589            return dest_proc.id() == local_proc_id
1590                && dest_proc.location() == &self.default_location();
1591        }
1592
1593        dest_proc.id() == local_proc_id
1594    }
1595}
1596
/// Whether local-delivery matching for `proc_id` must compare the
/// location in addition to the proc id.
///
/// Currently true exactly for legacy pseudo-singleton proc ids.
fn requires_location_for_local_delivery_identity(proc_id: &ProcId) -> bool {
    // Temporary hyperactor_mesh compatibility hack: host bootstrap
    // still creates a `service` proc and a `local` proc in every host
    // process, so those proc ids are not globally unique. Until those
    // construction paths are assigned instance ids, local delivery for
    // those two ids must keep the old full-address comparison.
    is_legacy_pseudo_singleton_proc_id(proc_id)
}
1605
1606fn assert_not_legacy_pseudo_singleton_proc_id(proc_id: &ProcId) {
1607    if is_legacy_pseudo_singleton_proc_id(proc_id) {
1608        panic!(
1609            "legacy pseudo-singleton proc id '{}' must be constructed with a dedicated Proc constructor",
1610            proc_id
1611        );
1612    }
1613}
1614
1615fn is_legacy_pseudo_singleton_proc_id(proc_id: &ProcId) -> bool {
1616    matches!(
1617        proc_id.uid(),
1618        Uid::Singleton(label) if is_legacy_pseudo_singleton_label(label)
1619    )
1620}
1621
1622fn is_legacy_pseudo_singleton_label(label: &Label) -> bool {
1623    matches!(
1624        label.as_str(),
1625        LEGACY_SERVICE_PROC_NAME | LEGACY_LOCAL_PROC_NAME
1626    )
1627}
1628
1629#[async_trait]
1630impl MailboxSender for Proc {
1631    fn post_unchecked(
1632        &self,
1633        envelope: MessageEnvelope,
1634        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1635    ) {
1636        let dest_proc = envelope.dest().actor_addr().proc_addr();
1637        if self.is_local_delivery_target(&dest_proc) {
1638            self.state().proc_muxer.post(envelope, return_handle)
1639        } else {
1640            self.state().gateway.post(envelope, return_handle)
1641        }
1642    }
1643
1644    async fn flush(&self) -> Result<(), anyhow::Error> {
1645        self.gateway().flush().await
1646    }
1647}
1648
1649/// A weak reference to a Proc that doesn't prevent it from being dropped.
1650#[derive(Clone, Debug)]
1651pub struct WeakProc(Weak<ProcState>);
1652
1653impl WeakProc {
1654    fn new(proc: &Proc) -> Self {
1655        Self(Arc::downgrade(&proc.inner))
1656    }
1657
1658    /// Upgrade to a strong Proc reference, if the proc is still alive.
1659    pub fn upgrade(&self) -> Option<Proc> {
1660        self.0.upgrade().map(|inner| Proc { inner })
1661    }
1662}
1663
1664#[async_trait]
1665impl MailboxSender for WeakProc {
1666    fn post_unchecked(
1667        &self,
1668        envelope: MessageEnvelope,
1669        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1670    ) {
1671        match self.upgrade() {
1672            Some(proc) => proc.post(envelope, return_handle),
1673            None => envelope.undeliverable(
1674                DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
1675                return_handle,
1676            ),
1677        }
1678    }
1679
1680    async fn flush(&self) -> Result<(), anyhow::Error> {
1681        match self.upgrade() {
1682            Some(proc) => proc.flush().await,
1683            None => Ok(()),
1684        }
1685    }
1686}
1687
/// Represents a single work item used by the instance to dispatch to
/// actor handles. Specifically, this enables handler polymorphism.
///
/// The wrapped value is a boxed, one-shot closure that takes the actor
/// and its instance (both borrowed for the same lifetime) and returns a
/// boxed `Send` future resolving to `Result<(), anyhow::Error>`.
pub struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
1701
1702impl<A: Actor + Send> WorkCell<A> {
1703    /// Create a new WorkCell from a concrete function (closure).
1704    fn new(
1705        f: impl for<'a> FnOnce(
1706            &'a mut A,
1707            &'a Instance<A>,
1708        )
1709            -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
1710        + Send
1711        + Sync
1712        + 'static,
1713    ) -> Self {
1714        Self(Box::new(f))
1715    }
1716
1717    /// Handle the message represented by this work cell.
1718    pub fn handle<'a>(
1719        self,
1720        actor: &'a mut A,
1721        instance: &'a Instance<A>,
1722    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
1723        (self.0)(actor, instance)
1724    }
1725}
1726
/// Context for a message currently being handled by an Instance.
///
/// Dereferences to the underlying [`Instance`], so instance methods can
/// be called directly on a context.
pub struct Context<'a, A: Actor> {
    /// The instance handling the message.
    instance: &'a Instance<A>,
    /// Headers of the message being handled.
    headers: Flattrs,
}

impl<'a, A: Actor> Context<'a, A> {
    /// Construct a new Context.
    pub fn new(instance: &'a Instance<A>, headers: Flattrs) -> Self {
        Self { instance, headers }
    }

    /// Get a reference to the message headers.
    pub fn headers(&self) -> &Flattrs {
        &self.headers
    }
}

/// Lets a `Context` be used wherever an `&Instance<A>` is expected.
impl<A: Actor> Deref for Context<'_, A> {
    type Target = Instance<A>;

    fn deref(&self) -> &Self::Target {
        self.instance
    }
}
1752
1753/// An actor instance. This is responsible for managing a running actor, including
1754/// its full lifecycle, supervision, signal management, etc. Instances can represent
1755/// a managed actor or a "client" actor that has joined the proc.
1756pub struct Instance<A: Actor> {
1757    inner: Arc<InstanceState<A>>,
1758}
1759
1760impl<A: Actor> fmt::Debug for Instance<A> {
1761    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1762        f.debug_struct("Instance").field("inner", &"..").finish()
1763    }
1764}
1765
/// State shared behind an [`Instance`] (held in an `Arc`).
struct InstanceState<A: Actor> {
    /// The proc that owns this instance.
    proc: Proc,

    /// The instance cell that manages instance hierarchy.
    cell: InstanceCell,

    /// The mailbox associated with the actor.
    mailbox: Mailbox,

    /// Handler ports for this actor; the same `Arc` is also handed to
    /// the instance cell when the instance is constructed.
    ports: Arc<HandlerPorts<A>>,

    /// Runtime-owned delayed-post scheduler.
    delayed_posts: DelayedPosts<A>,

    /// A watch for communicating the actor's state.
    status_tx: watch::Sender<ActorStatus>,

    /// This instance's globally unique ID.
    id: Uuid,

    /// Used to assign sequence numbers for messages sent from this actor.
    sequencer: Sequencer,

    /// Per-instance local storage.
    instance_locals: ActorLocalStorage,
}
1793
/// A deferred post: a one-shot, `Send` closure that is run with the
/// instance of actor type `A` that performs the post.
type DelayedPost<A> = Box<dyn FnOnce(&Instance<A>) + Send>;

/// An endpoint that a message of type `M` can be posted to later, from
/// an instance of actor type `A`.
trait PostAfterEndpoint<A: Actor, M: Message>: Send {
    /// Where this endpoint lives.
    fn endpoint_location(&self) -> crate::EndpointLocation;

    /// Consume the endpoint and `message`, producing the closure that
    /// performs the post when it is eventually run.
    fn into_delayed_post(self, message: M) -> DelayedPost<A>;
}
1801
1802impl<A, M> PostAfterEndpoint<A, M> for &Instance<A>
1803where
1804    A: Actor + Handler<M>,
1805    M: Message,
1806{
1807    fn endpoint_location(&self) -> crate::EndpointLocation {
1808        crate::EndpointLocation::Actor(self.self_addr().clone())
1809    }
1810
1811    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
1812        let dest = self.clone_for_py();
1813        Box::new(move |this| crate::Endpoint::post(&dest, this, message))
1814    }
1815}
1816
1817impl<A, M> PostAfterEndpoint<A, M> for &Context<'_, A>
1818where
1819    A: Actor + Handler<M>,
1820    M: Message,
1821{
1822    fn endpoint_location(&self) -> crate::EndpointLocation {
1823        crate::EndpointLocation::Actor(self.self_addr().clone())
1824    }
1825
1826    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
1827        let dest = self.clone_for_py();
1828        Box::new(move |this| crate::Endpoint::post(&dest, this, message))
1829    }
1830}
1831
1832impl<A, M> PostAfterEndpoint<A, M> for Instance<A>
1833where
1834    A: Actor + Handler<M>,
1835    M: Message,
1836{
1837    fn endpoint_location(&self) -> crate::EndpointLocation {
1838        crate::EndpointLocation::Actor(self.self_addr().clone())
1839    }
1840
1841    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
1842        Box::new(move |this| crate::Endpoint::post(&self, this, message))
1843    }
1844}
1845
/// Delayed posts from an actor of type `A` to a local handle of an
/// actor of type `B` that handles `M`.
impl<A, B, M> PostAfterEndpoint<A, M> for ActorHandle<B>
where
    A: Actor,
    B: Actor + Handler<M>,
    M: Message,
{
    /// Delegates to `crate::Endpoint::endpoint_location`.
    // NOTE(review): the extra `&` presumably selects an `Endpoint` impl
    // on a reference to the handle; confirm against the `Endpoint` impls.
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(&self)
    }

    /// Moves the handle and `message` into a closure that posts through
    /// `crate::Endpoint::post` when run.
    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}
1860
/// Delayed posts from an actor of type `A` to a local port handle
/// carrying `M`.
impl<A, M> PostAfterEndpoint<A, M> for PortHandle<M>
where
    A: Actor,
    M: Message,
{
    /// Delegates to `crate::Endpoint::endpoint_location`.
    // NOTE(review): the extra `&` presumably selects an `Endpoint` impl
    // on a reference to the handle; confirm against the `Endpoint` impls.
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(&self)
    }

    /// Moves the handle and `message` into a closure that posts through
    /// `crate::Endpoint::post` when run.
    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}
1874
/// Delayed posts from an actor of type `A` to a local once-port handle
/// carrying `M`.
impl<A, M> PostAfterEndpoint<A, M> for OncePortHandle<M>
where
    A: Actor,
    M: Message,
{
    /// Delegates to `crate::Endpoint::endpoint_location`.
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(self)
    }

    /// Moves the handle and `message` into a closure; running it passes
    /// the handle by value to `crate::Endpoint::post`.
    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(self, this, message))
    }
}
1888
/// Delayed posts from an actor of type `A` to an actor reference whose
/// referent remotely handles `M`.
impl<A, B, M> PostAfterEndpoint<A, M> for ActorRef<B>
where
    A: Actor,
    B: Referable + RemoteHandles<M>,
    M: RemoteMessage,
{
    /// Delegates to `crate::Endpoint::endpoint_location`.
    // NOTE(review): the extra `&` presumably selects an `Endpoint` impl
    // on a reference to the ref; confirm against the `Endpoint` impls.
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(&self)
    }

    /// Moves the reference and `message` into a closure that posts
    /// through `crate::Endpoint::post` when run.
    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}
1903
/// Delayed posts from an actor of type `A` to a port reference
/// carrying the remote message type `M`.
impl<A, M> PostAfterEndpoint<A, M> for crate::PortRef<M>
where
    A: Actor,
    M: RemoteMessage,
{
    /// Delegates to `crate::Endpoint::endpoint_location`.
    // NOTE(review): the extra `&` presumably selects an `Endpoint` impl
    // on a reference to the ref; confirm against the `Endpoint` impls.
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(&self)
    }

    /// Moves the reference and `message` into a closure that posts
    /// through `crate::Endpoint::post` when run.
    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(&self, this, message))
    }
}
1917
/// Delayed posts from an actor of type `A` to a once-port reference
/// carrying the remote message type `M`.
impl<A, M> PostAfterEndpoint<A, M> for crate::OncePortRef<M>
where
    A: Actor,
    M: RemoteMessage,
{
    /// Delegates to `crate::Endpoint::endpoint_location`.
    fn endpoint_location(&self) -> crate::EndpointLocation {
        crate::Endpoint::endpoint_location(self)
    }

    /// Moves the reference and `message` into a closure; running it
    /// passes the reference by value to `crate::Endpoint::post`.
    fn into_delayed_post(self, message: M) -> DelayedPost<A> {
        Box::new(move |this| crate::Endpoint::post(self, this, message))
    }
}
1931
/// Queue of delayed posts for one instance, plus the gate that controls
/// whether new delayed posts may still be admitted.
struct DelayedPosts<A: Actor> {
    /// Ingress gate; `drain` and `is_draining` delegate to it.
    ingress: Arc<DelayedPostIngressGate>,
    /// The pending posts, protected by a mutex.
    state: Mutex<DelayedPostState<A>>,
    /// Notified once for every pushed post.
    notify: Notify,
}

/// Mutable part of [`DelayedPosts`].
struct DelayedPostState<A: Actor> {
    /// Pending posts ordered by deadline, then by insertion order.
    queue: BTreeMap<(tokio::time::Instant, u64), DelayedPost<A>>,
    /// Insertion counter used as the tie-breaker for equal deadlines.
    next_order: u64,
}
1942
1943impl<A: Actor> DelayedPosts<A> {
1944    fn new() -> Self {
1945        Self {
1946            ingress: Arc::new(DelayedPostIngressGate::new()),
1947            state: Mutex::new(DelayedPostState {
1948                queue: BTreeMap::new(),
1949                next_order: 0,
1950            }),
1951            notify: Notify::new(),
1952        }
1953    }
1954
1955    fn push(&self, deadline: tokio::time::Instant, post: DelayedPost<A>) {
1956        let mut state = self.state.lock().unwrap();
1957        let order = state.next_order;
1958        state.next_order = state.next_order.wrapping_add(1);
1959        state.queue.insert((deadline, order), post);
1960        drop(state);
1961        self.notify.notify_one();
1962    }
1963
1964    fn next_deadline(&self) -> Option<tokio::time::Instant> {
1965        self.state
1966            .lock()
1967            .unwrap()
1968            .queue
1969            .keys()
1970            .next()
1971            .map(|(deadline, _)| *deadline)
1972    }
1973
1974    fn pop_due(&self, now: tokio::time::Instant) -> Vec<DelayedPost<A>> {
1975        let mut posts = Vec::new();
1976        let mut state = self.state.lock().unwrap();
1977        while let Some((&(deadline, _), _)) = state.queue.first_key_value() {
1978            if deadline > now {
1979                break;
1980            }
1981            let (_, post) = state.queue.pop_first().expect("delayed post should exist");
1982            posts.push(post);
1983        }
1984        posts
1985    }
1986
1987    fn drain(&self) {
1988        self.ingress.drain();
1989    }
1990
1991    fn is_draining(&self) -> bool {
1992        self.ingress.is_draining()
1993    }
1994}
1995
/// High bit of the gate state word: set once draining has begun.
const DELAYED_POST_INGRESS_DRAINING: usize = !(usize::MAX >> 1);
/// All remaining bits of the gate state word: the count of active guards.
const DELAYED_POST_INGRESS_ACTIVE_MASK: usize = usize::MAX >> 1;

/// Gate that admits delayed-post senders until draining starts, and lets
/// the drainer block until every admitted sender has left.
struct DelayedPostIngressGate {
    /// Draining flag (high bit) packed with the active-guard count.
    state: AtomicUsize,
    /// Mutex paired with `drained`; it protects no data of its own.
    wait_lock: Mutex<()>,
    /// Signalled when the last active guard is dropped while draining.
    drained: Condvar,
}

/// Proof of admission through a [`DelayedPostIngressGate`]; dropping it
/// releases the slot.
struct DelayedPostIngressGuard {
    gate: Arc<DelayedPostIngressGate>,
}

impl DelayedPostIngressGate {
    /// Create an open gate with no active guards.
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(0),
            wait_lock: Mutex::new(()),
            drained: Condvar::new(),
        }
    }

    /// Try to register one more active sender.
    ///
    /// Returns `Err(())` once draining has begun.
    ///
    /// # Panics
    ///
    /// Panics if the active count would overflow into the draining bit.
    fn try_enter(self: &Arc<Self>) -> Result<DelayedPostIngressGuard, ()> {
        let admitted = self
            .state
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                if current & DELAYED_POST_INGRESS_DRAINING != 0 {
                    return None;
                }
                let active = current & DELAYED_POST_INGRESS_ACTIVE_MASK;
                assert!(
                    active < DELAYED_POST_INGRESS_ACTIVE_MASK,
                    "too many active delayed post sends"
                );
                Some(current + 1)
            });
        match admitted {
            Ok(_) => Ok(DelayedPostIngressGuard {
                gate: Arc::clone(self),
            }),
            Err(_) => Err(()),
        }
    }

    /// Close the gate to new entrants, then block the calling thread
    /// until the active count reaches zero.
    fn drain(&self) {
        // Raise the draining flag unless it is already set.
        let _ = self
            .state
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                (current & DELAYED_POST_INGRESS_DRAINING == 0)
                    .then_some(current | DELAYED_POST_INGRESS_DRAINING)
            });

        let idle = self
            .drained
            .wait_while(self.wait_lock.lock().unwrap(), |_| {
                self.state.load(Ordering::Acquire) & DELAYED_POST_INGRESS_ACTIVE_MASK != 0
            })
            .unwrap();
        drop(idle);
    }

    /// Whether the draining flag has been set.
    fn is_draining(&self) -> bool {
        let current = self.state.load(Ordering::Acquire);
        current & DELAYED_POST_INGRESS_DRAINING != 0
    }
}

impl Drop for DelayedPostIngressGuard {
    /// Release this guard's slot; if it was the last one and the gate is
    /// draining, wake the drainer.
    fn drop(&mut self) {
        let before = self.gate.state.fetch_sub(1, Ordering::AcqRel);
        let active_before = before & DELAYED_POST_INGRESS_ACTIVE_MASK;
        assert!(
            active_before != 0,
            "delayed post ingress active count underflow"
        );
        let was_draining = before & DELAYED_POST_INGRESS_DRAINING != 0;
        if was_draining && active_before == 1 {
            // Take the wait lock so the notification cannot slip in between
            // the drainer's check of the count and its wait.
            let _wait_guard = self.gate.wait_lock.lock().unwrap();
            self.gate.drained.notify_all();
        }
    }
}
2090
impl<A: Actor> InstanceState<A> {
    /// The actor address of this instance, as recorded by its mailbox.
    fn self_addr(&self) -> &ActorAddr {
        self.mailbox.actor_addr()
    }
}
2096
2097impl<A: Actor> Drop for InstanceState<A> {
2098    fn drop(&mut self) {
2099        self.status_tx.send_if_modified(|status| {
2100            if status.is_terminal() {
2101                false
2102            } else {
2103                tracing::info!(
2104                    name = "ActorStatus",
2105                    actor_id = %self.self_addr(),
2106                    actor_name = self.self_addr().log_name(),
2107                    status = "Stopped",
2108                    prev_status = status.arm().unwrap_or("unknown"),
2109                    "instance is dropped",
2110                );
2111                *status = ActorStatus::Stopped("instance is dropped".into());
2112                true
2113            }
2114        });
2115    }
2116}
2117
/// Receivers created by [`Instance::new`] that must be threaded to
/// their respective consumers (actor loop, introspect task, etc.).
///
/// These are the receiving halves of channels whose sending halves are
/// kept by the instance (mailbox ports, handler ports, instance cell).
///
/// # Invariant
///
/// See S10 in `introspect` module doc.
pub struct InstanceReceivers<A: Actor> {
    /// Signal and supervision receivers for the actor loop. `None`
    /// for detached/client instances that don't run an actor loop.
    actor_loop: Option<(
        PortReceiver<Signal>,
        mpsc::UnboundedReceiver<ActorSupervisionEvent>,
    )>,
    /// Work queue for dispatching messages to actor handlers.
    work: mpsc::UnboundedReceiver<WorkCell<A>>,
    /// Introspect message receiver for the dedicated introspect task.
    introspect: PortReceiver<IntrospectMessage>,
}
2136
2137impl<A: Actor> Instance<A> {
    /// Create a new actor instance in Created state.
    ///
    /// Wires up the actor's mailbox, work queue, handler ports, status
    /// watch and introspect port, binds the mailbox into the proc's
    /// muxer, and builds the [`InstanceCell`].
    ///
    /// * `proc` - the proc that owns the instance.
    /// * `actor_id` - the address of the new actor.
    /// * `detached` - when `true`, no signal port or supervision channel
    ///   is created, and `InstanceReceivers::actor_loop` is `None`.
    /// * `parent` - the parent cell, if any, passed through to the cell.
    ///
    /// Returns the instance together with the receivers that its
    /// consumers must take over.
    fn new(
        proc: Proc,
        actor_id: ActorAddr,
        detached: bool,
        parent: Option<InstanceCell>,
    ) -> (Self, InstanceReceivers<A>) {
        // Set up messaging
        let mailbox = Mailbox::new(actor_id.clone());
        let (work_tx, work_rx) = ordered_channel(
            actor_id.to_string(),
            hyperactor_config::global::get(config::ENABLE_DEST_ACTOR_REORDERING_BUFFER),
        );
        // Per-actor queue depth counter, shared between the handler ports
        // and the instance cell.
        let queue_depth = Arc::new(AtomicU64::new(0));
        let proc_stats = Arc::clone(&proc.state().queue_stats);
        let ports: Arc<HandlerPorts<A>> = Arc::new(HandlerPorts::new(
            mailbox.clone(),
            work_tx,
            Arc::clone(&queue_depth),
            proc_stats,
        ));
        // Register the mailbox with the proc's muxer.
        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);

        // Prefer registered type info; fall back to the Rust type name.
        let actor_type = match TypeInfo::of::<A>() {
            Some(info) => ActorType::Named(info),
            None => ActorType::Anonymous(std::any::type_name::<A>()),
        };
        // Detached instances get neither a signal port nor a supervision
        // channel.
        let actor_loop_ports = if detached {
            None
        } else {
            let (signal_port, signal_receiver) = mailbox.open_port::<Signal>();
            let (supervision_tx, supervision_receiver) =
                mpsc::unbounded_channel::<ActorSupervisionEvent>();
            Some((
                (signal_port, supervision_tx),
                (signal_receiver, supervision_receiver),
            ))
        };

        // Split into the sending halves (kept by the cell) and the
        // receiving halves (returned to the caller).
        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();

        // Introspect port: a separate channel handled by a dedicated
        // tokio task (not the actor's message loop). bind_handler_port()
        // registers in the mailbox
        // dispatch table at IntrospectMessage::port().
        //
        // Exercises S3, S4, S9 (see introspect module doc).
        let (introspect_port, introspect_receiver) = mailbox.open_port::<IntrospectMessage>();
        introspect_port.bind_handler_port();

        let cell = InstanceCell::new(
            actor_id,
            actor_type,
            proc.clone(),
            actor_loop,
            status_rx,
            parent,
            ports.clone(),
            queue_depth,
        );
        // The same id identifies the instance and seeds its sequencer.
        let instance_id = Uuid::now_v7();
        let inner = Arc::new(InstanceState {
            proc,
            cell,
            mailbox,
            ports,
            delayed_posts: DelayedPosts::new(),
            status_tx,
            sequencer: Sequencer::new(instance_id),
            id: instance_id,
            instance_locals: ActorLocalStorage::new(),
        });
        (
            Self { inner },
            InstanceReceivers {
                actor_loop: actor_loop_receivers,
                work: work_rx,
                introspect: introspect_receiver,
            },
        )
    }
2220
2221    /// Notify subscribers of a change in the actors status and bump counters with the duration which
2222    /// the last status was active for.
2223    #[track_caller]
2224    pub fn change_status(&self, new: ActorStatus) {
2225        let old = self.inner.status_tx.send_replace(new.clone());
2226        // 2 cases are allowed:
2227        // * non-terminal -> non-terminal
2228        // * non-terminal -> terminal
2229        // terminal -> terminal is not allowed unless it is the same status (no-op).
2230        // terminal -> non-terminal is never allowed.
2231        assert!(
2232            !old.is_terminal() && !new.is_terminal()
2233                || !old.is_terminal() && new.is_terminal()
2234                || old == new,
2235            "actor changing status illegally, only allow non-terminal -> non-terminal \
2236            and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
2237            self.self_addr(),
2238            old,
2239            new
2240        );
2241        // Actor status changes between Idle and Processing when handling every
2242        // message. It creates too many logs if we want to log these 2 states.
2243        // Also, sometimes the actor transitions from Processing -> Processing.
2244        // Therefore we skip the status changes between them.
2245        if !((old.is_idle() && new.is_processing())
2246            || (old.is_processing() && new.is_idle())
2247            || old == new)
2248        {
2249            let new_status = new.arm().unwrap_or("unknown");
2250            let change_reason = match &new {
2251                ActorStatus::Failed(reason) => reason.to_string(),
2252                ActorStatus::Stopped(reason) => reason.clone(),
2253                _ => "".to_string(),
2254            };
2255            tracing::info!(
2256                name = "ActorStatus",
2257                actor_id = %self.self_addr(),
2258                actor_name = self.self_addr().log_name(),
2259                status = new_status,
2260                prev_status = old.arm().unwrap_or("unknown"),
2261                caller = %PanicLocation::caller(),
2262                change_reason,
2263            );
2264            let actor_id = hash_to_u64(self.self_addr());
2265            notify_actor_status_changed(ActorStatusEvent {
2266                id: generate_actor_status_event_id(actor_id),
2267                timestamp: std::time::SystemTime::now(),
2268                actor_id,
2269                new_status: new_status.to_string(),
2270                reason: if change_reason.is_empty() {
2271                    None
2272                } else {
2273                    Some(change_reason)
2274                },
2275            });
2276        }
2277    }
2278
    /// Whether the currently published status is terminal.
    fn is_terminal(&self) -> bool {
        self.inner.status_tx.borrow().is_terminal()
    }
2282
    /// Whether the currently published status reports `is_stopping()`.
    fn is_stopping(&self) -> bool {
        self.inner.status_tx.borrow().is_stopping()
    }
2286
    /// This instance's actor address (the address its mailbox was
    /// created with).
    pub fn self_addr(&self) -> &ActorAddr {
        self.inner.self_addr()
    }
2291
2292    /// Report a message that could not be delivered and could not be returned.
2293    pub(crate) fn report_lost_message(&self, lost: crate::mailbox::LostMessage) {
2294        static REPORT_LOST_WARNED_MAILBOXES: OnceLock<DashSet<ActorAddr>> = OnceLock::new();
2295
2296        let mailbox = &self.inner.mailbox;
2297        let return_handle = mailbox.bound_return_handle().unwrap_or_else(|| {
2298            let actor_id = mailbox.actor_addr();
2299            if REPORT_LOST_WARNED_MAILBOXES
2300                .get_or_init(DashSet::new)
2301                .insert(actor_id.clone())
2302            {
2303                let bt = std::backtrace::Backtrace::force_capture();
2304                tracing::warn!(
2305                    actor_id = ?actor_id,
2306                    backtrace = ?bt,
2307                    "actor attempted to report a lost message without binding Undeliverable<MessageEnvelope>"
2308                );
2309            }
2310            crate::mailbox::monitored_return_handle()
2311        });
2312
2313        if let Err(error) =
2314            return_handle.try_send(self, crate::mailbox::Undeliverable::lost(lost.clone()))
2315        {
2316            tracing::error!(
2317                sender = %lost.sender,
2318                dest = %lost.dest,
2319                message_type = lost.message_type.as_deref().unwrap_or("unknown"),
2320                error = %lost.error,
2321                return_error = %error,
2322                "lost message could not be reported"
2323            );
2324        }
2325    }
2326
    /// Snapshot of this actor's introspection payload.
    ///
    /// Returns an [`IntrospectResult`] built from live [`InstanceCell`]
    /// state, without going through the actor message loop. This is
    /// safe to call from within a handler on the same actor (no
    /// self-send deadlock).
    ///
    /// The snapshot is best-effort: it reflects framework-owned state
    /// (status, message count, flight recorder, supervision children)
    /// at the instant of the call. `parent` is left as `None` —
    /// callers are responsible for setting topology context.
    ///
    /// Note: this acquires a write lock on the flight recorder spool
    /// and clones its contents. Suitable for occasional introspection
    /// requests, not for hot paths.
    ///
    /// Implemented by `crate::introspect::live_actor_payload` applied
    /// to this instance's cell.
    pub fn introspect_payload(&self) -> crate::introspect::IntrospectResult {
        crate::introspect::live_actor_payload(&self.inner.cell)
    }
2345
2346    /// Return a fresh tracing span bound to this actor's flight
2347    /// recorder, with this actor as the subject. See FR-1, FR-2, FR-3
2348    /// in module doc.
2349    pub fn recording_span(&self) -> tracing::Span {
2350        use crate::subject::AsSubject;
2351        self.inner
2352            .cell
2353            .recording()
2354            .span(&self.self_addr().subject().to_string())
2355    }
2356
2357    /// Publish domain-specific properties for introspection.
2358    ///
2359    /// Publish a complete Attrs bag for introspection. Replaces any
2360    /// previously published attrs.
2361    ///
2362    /// Debug builds assert that every key in the bag is tagged with
2363    /// the `INTROSPECT` meta-attribute.
2364    pub fn publish_attrs(&self, attrs: hyperactor_config::Attrs) {
2365        #[cfg(debug_assertions)]
2366        {
2367            use std::collections::HashSet;
2368            use std::sync::OnceLock;
2369
2370            use hyperactor_config::attrs::AttrKeyInfo;
2371
2372            static INTROSPECT_KEYS: OnceLock<HashSet<&'static str>> = OnceLock::new();
2373            let allowed = INTROSPECT_KEYS.get_or_init(|| {
2374                inventory::iter::<AttrKeyInfo>()
2375                    .filter(|info| info.meta.get(hyperactor_config::INTROSPECT).is_some())
2376                    .map(|info| info.name)
2377                    .collect()
2378            });
2379            for (name, _) in attrs.iter() {
2380                debug_assert!(
2381                    allowed.contains(name),
2382                    "publish_attrs: key {:?} is not tagged with INTROSPECT",
2383                    name
2384                );
2385            }
2386        }
2387        self.inner.cell.set_published_attrs(attrs);
2388    }
2389
    /// Publish a single attr key-value pair for introspection. Merges
    /// into existing published attrs (insert or overwrite).
    ///
    /// Debug builds assert that the key is tagged with the
    /// `INTROSPECT` meta-attribute; release builds skip the check.
    pub fn publish_attr<T: hyperactor_config::AttrValue>(
        &self,
        key: hyperactor_config::Key<T>,
        value: T,
    ) {
        // The check lives inside `debug_assert!` so that it is not
        // evaluated at all in release builds.
        debug_assert!(
            key.attrs().get(hyperactor_config::INTROSPECT).is_some(),
            "publish_attr called with non-introspection key: {}",
            key.name()
        );
        self.inner.cell.merge_published_attr(key, value);
    }
2407
    /// Mark this actor as system/infrastructure. System actors are
    /// hidden by default in the TUI (toggled via `s`).
    ///
    /// Sets the cell's `is_system` flag to `true` with a relaxed atomic
    /// store; nothing in this method clears it again.
    pub fn set_system(&self) {
        self.inner
            .cell
            .inner
            .is_system
            .store(true, Ordering::Relaxed);
    }
2417
    /// Register a callback for resolving non-addressable children.
    ///
    /// The callback runs on the actor's introspect task (not the
    /// actor loop), so it must be `Send + Sync` and must not access
    /// actor-mutable state. Capture cloned `Proc` references.
    ///
    /// Only `HostAgent` uses this today — for resolving system
    /// procs that have no independent `ProcAgent`.
    ///
    /// The handler is forwarded unchanged to the instance cell.
    pub fn set_query_child_handler(
        &self,
        handler: impl (Fn(&Addr) -> IntrospectResult) + Send + Sync + 'static,
    ) {
        self.inner.cell.set_query_child_handler(handler);
    }
2432
2433    /// Signal the actor to stop.
2434    pub fn stop(&self, reason: &str) -> Result<(), ActorError> {
2435        tracing::info!(
2436            actor_id = %self.inner.cell.actor_addr(),
2437            reason,
2438            "instance stop called",
2439        );
2440        self.inner.cell.signal(Signal::Stop(reason.to_string()))
2441    }
2442
2443    /// Signal the actor to drain current ordinary work and then stop.
2444    pub fn drain_and_stop(&self, reason: &str) -> Result<(), ActorError> {
2445        tracing::info!(
2446            actor_id = %self.inner.cell.actor_addr(),
2447            reason,
2448            "instance drain_and_stop called",
2449        );
2450        self.inner
2451            .cell
2452            .signal(Signal::DrainAndStop(reason.to_string()))
2453    }
2454
2455    /// Signal the actor to terminate immediately with a provided reason.
2456    pub fn kill(&self, reason: &str) -> Result<(), ActorError> {
2457        tracing::info!(
2458            actor_id = %self.inner.cell.actor_addr(),
2459            reason,
2460            "instance kill called",
2461        );
2462        self.inner.cell.signal(Signal::Kill(reason.to_string()))
2463    }
2464
2465    /// Backward-compatible alias for `kill()`.
2466    pub fn abort(&self, reason: &str) -> Result<(), ActorError> {
2467        tracing::info!(
2468            actor_id = %self.inner.cell.actor_addr(),
2469            reason,
2470            "instance abort called",
2471        );
2472        self.kill(reason)
2473    }
2474
2475    /// Close handler ingress for this actor.
2476    pub fn close(&self) {
2477        self.inner.delayed_posts.drain();
2478        self.inner.mailbox.drain();
2479    }
2480
2481    /// Request immediate actor exit with the provided stop reason.
2482    pub fn exit(&self, reason: &str) -> Result<(), ActorError> {
2483        self.inner
2484            .cell
2485            .signal(Signal::ExitRequested(reason.to_string()))
2486    }
2487
2488    /// Queue an internal exit request after already accepted handler work.
2489    ///
2490    /// This is intentionally a small runtime special case for now.
2491    /// The long-term goal is to make "exit after drain" fall out of
2492    /// ordinary self-messaging semantics rather than requiring a
2493    /// dedicated internal path here.
2494    pub fn exit_after_drain(&self, reason: &str) -> Result<(), ActorError> {
2495        let this = self.clone_for_py();
2496        let reason = reason.to_string();
2497        let work = WorkCell::new(move |_actor: &mut A, _instance: &Instance<A>| {
2498            Box::pin(async move {
2499                this.exit(&reason).map_err(anyhow::Error::from)?;
2500                Ok(())
2501            })
2502        });
2503        self.enqueue_runtime_work(work)
2504    }
2505
2506    /// Open a new port that accepts M-typed messages. The returned
2507    /// port may be freely cloned, serialized, and passed around. The
2508    /// returned receiver should only be retained by the actor responsible
2509    /// for processing the delivered messages.
2510    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
2511        self.inner.mailbox.open_port()
2512    }
2513
2514    /// Open a new one-shot port that accepts M-typed messages. The
2515    /// returned port may be used to send a single message; ditto the
2516    /// receiver may receive a single message.
2517    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
2518        self.inner.mailbox.open_once_port()
2519    }
2520
2521    /// Return this actor's runtime signal port.
2522    #[doc(hidden)]
2523    pub fn signal_port(&self) -> PortHandle<Signal> {
2524        self.inner.cell.signal_port()
2525    }
2526
2527    /// Get the per-instance local storage.
2528    pub fn locals(&self) -> &ActorLocalStorage {
2529        &self.inner.instance_locals
2530    }
2531
2532    /// Send a message to the actor running on the proc.
2533    pub fn post(&self, port_id: impl Into<PortAddr>, headers: Flattrs, message: wirevalue::Any) {
2534        let port_id: PortAddr = port_id.into();
2535        <Self as context::MailboxExt>::post(
2536            self,
2537            port_id,
2538            headers,
2539            message,
2540            true,
2541            context::SeqInfoPolicy::AssignNew,
2542        )
2543    }
2544
2545    /// Post a message with pre-set SEQ_INFO. Only for internal use by CommActor.
2546    ///
2547    /// # Warning
2548    /// This method bypasses the SEQ_INFO assertion. Do not use unless you are
2549    /// implementing mesh-level message routing (CommActor).
2550    #[doc(hidden)]
2551    pub fn post_with_external_seq_info(
2552        &self,
2553        port_id: impl Into<PortAddr>,
2554        headers: Flattrs,
2555        message: wirevalue::Any,
2556    ) {
2557        <Self as context::MailboxExt>::post(
2558            self,
2559            port_id.into(),
2560            headers,
2561            message,
2562            true,
2563            context::SeqInfoPolicy::AllowExternal,
2564        )
2565    }
2566
2567    fn enqueue_runtime_work(&self, work: WorkCell<A>) -> Result<(), ActorError> {
2568        let actor_id_str = self.self_addr().to_string();
2569        account_enqueue(
2570            &self.inner.cell.inner.queue_depth,
2571            &self.inner.proc.state().queue_stats,
2572            &actor_id_str,
2573        );
2574        let result = self
2575            .inner
2576            .ports
2577            .workq
2578            .direct_send(work)
2579            .map_err(anyhow::Error::from);
2580        if result.is_err() {
2581            account_cancel_enqueue(
2582                &self.inner.cell.inner.queue_depth,
2583                &self.inner.proc.state().queue_stats,
2584                &actor_id_str,
2585            );
2586        }
2587        result.map_err(|err| ActorError::new(self.self_addr(), ActorErrorKind::processing(err)))
2588    }
2589
2590    /// Return a static client instance that can be used to send
2591    /// messages to port handles from outside an actor context
2592    /// (e.g. from background tokio tasks).
2593    // TODO: replace with a proper mechanism for sending to port
2594    // handles without an actor context.
2595    pub fn self_client() -> &'static Instance<()> {
2596        static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
2597        &CLIENT
2598            .get_or_init(|| Proc::runtime().client("self_message_client").unwrap())
2599            .0
2600    }
2601
2602    /// Post `message` to `dest` after `delay`.
2603    ///
2604    /// Delayed posts are owned by the actor runtime. They are best-effort:
2605    /// messages are posted no earlier than `delay`, and any delayed posts that
2606    /// have not fired when the actor shuts down are discarded.
2607    #[allow(private_bounds)]
2608    pub fn post_after<D, M>(&self, dest: D, message: M, delay: Duration)
2609    where
2610        M: Message,
2611        D: PostAfterEndpoint<A, M>,
2612    {
2613        let dest_location = dest.endpoint_location();
2614        if matches!(*self.inner.status_tx.borrow(), ActorStatus::Client) {
2615            self.report_lost_message(crate::mailbox::LostMessage {
2616                sender: self.mailbox().actor_addr().clone(),
2617                dest: dest_location,
2618                message_type: Some(std::any::type_name::<M>().to_string()),
2619                error: "delayed posts require an actor runtime".to_string(),
2620            });
2621            return;
2622        }
2623        let Ok(_guard) = self.inner.delayed_posts.ingress.try_enter() else {
2624            self.report_lost_message(crate::mailbox::LostMessage {
2625                sender: self.mailbox().actor_addr().clone(),
2626                dest: dest_location,
2627                message_type: Some(std::any::type_name::<M>().to_string()),
2628                error: "actor runtime is stopping".to_string(),
2629            });
2630            return;
2631        };
2632        if self.is_stopping() || self.is_terminal() {
2633            self.report_lost_message(crate::mailbox::LostMessage {
2634                sender: self.mailbox().actor_addr().clone(),
2635                dest: dest_location,
2636                message_type: Some(std::any::type_name::<M>().to_string()),
2637                error: "actor runtime is stopping".to_string(),
2638            });
2639            return;
2640        }
2641
2642        self.inner.delayed_posts.push(
2643            tokio::time::Instant::now() + delay,
2644            dest.into_delayed_post(message),
2645        );
2646    }
2647
2648    /// Start an A-typed actor onto this instance with the provided params. When spawn returns,
2649    /// the actor has been linked with its parent, if it has one.
2650    fn start(self, actor: A, receivers: InstanceReceivers<A>) -> ActorHandle<A> {
2651        let instance_cell = self.inner.cell.clone();
2652        let actor_id = self.inner.cell.actor_addr().clone();
2653        let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());
2654
2655        // Spawn the introspect task — a separate tokio task that
2656        // reads InstanceCell directly and replies through the owning Proc. The
2657        // actor loop never sees IntrospectMessage.
2658        tokio::spawn(crate::introspect::serve_introspect(
2659            self.inner.cell.clone(),
2660            receivers.introspect,
2661        ));
2662
2663        let actor_loop_receivers = receivers
2664            .actor_loop
2665            .expect("non-detached instance must have actor loop receivers");
2666        let actor_task_handle = A::spawn_server_task(
2667            panic_handler::with_backtrace_tracking(self.serve(
2668                actor,
2669                actor_loop_receivers,
2670                receivers.work,
2671            ))
2672            .instrument(Span::current()),
2673        );
2674        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
2675        instance_cell
2676            .inner
2677            .actor_task_handle
2678            .set(actor_task_handle)
2679            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
2680
2681        actor_handle
2682    }
2683
    /// Run the actor (and its supervision tree) to completion, then
    /// publish its terminal state.
    ///
    /// After `run_actor_tree` returns, this:
    /// 1. derives the terminal `ActorStatus` and the supervision event
    ///    from the result,
    /// 2. closes the mailbox with that status,
    /// 3. stores the supervision event on the cell,
    /// 4. delivers the event to the parent (plus a `ChildStopped`
    ///    signal), or to the proc when there is no parent, and
    /// 5. only then changes this instance's status to the terminal
    ///    status.
    ///
    /// # Panics
    ///
    /// Panics if the instance is not in a stopping state once
    /// `run_actor_tree` returns, or if an unhandled supervision event
    /// carries a non-terminal actor status.
    async fn serve(
        mut self,
        mut actor: A,
        actor_loop_receivers: (
            PortReceiver<Signal>,
            mpsc::UnboundedReceiver<ActorSupervisionEvent>,
        ),
        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        let result = self
            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
            .await;

        assert!(self.is_stopping());
        // Compute the terminal status and supervision event, but defer
        // change_status until AFTER the event is delivered. If we flip
        // the status to terminal first, a concurrent destroy_and_wait
        // observer can release Phase 1 and stop the coordinator before
        // the event lands in its mailbox — dropping the event.
        let (terminal_status, event) = match result {
            // Clean stop: the stop reason becomes the `Stopped` status.
            Ok(stop_reason) => {
                let status = ActorStatus::Stopped(stop_reason);
                let event = ActorSupervisionEvent::new(
                    self.inner.cell.actor_addr().clone(),
                    actor.display_name(),
                    status.clone(),
                    None,
                );
                (status, Some(event))
            }
            Err(err) => match *err.kind {
                // An unhandled supervision event is forwarded as-is
                // rather than wrapped in a new event.
                ActorErrorKind::UnhandledSupervisionEvent(box event) => {
                    // We use the event's actor_status as this actor's terminal status.
                    assert!(event.actor_status.is_terminal());
                    let status = event.actor_status.clone();
                    (status, Some(event))
                }
                // Any other failure is reported as a `Generic` error
                // kind built from the original kind's display text.
                _ => {
                    let error_kind = ActorErrorKind::Generic(err.kind.to_string());
                    let status = ActorStatus::Failed(error_kind);
                    let event = ActorSupervisionEvent::new(
                        self.inner.cell.actor_addr().clone(),
                        actor.display_name(),
                        status.clone(),
                        None,
                    );
                    (status, Some(event))
                }
            },
        };

        self.mailbox().close(terminal_status.clone());
        // FI-1: store supervision_event BEFORE change_status.
        if let Some(event) = &event {
            *self.inner.cell.inner.supervision_event.lock().unwrap() = Some(event.clone());
        }

        // Deliver the supervision event to the parent/proc BEFORE
        // change_status so that any observer waiting for this actor's
        // terminal state can only see it once the event has been
        // enqueued at its destination.
        if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
            if let Some(event) = event {
                // Parent exists, failure should be propagated to the parent.
                parent.send_supervision_event_or_crash(event);
            }
            // TODO: we should get rid of this signal, and use *only* supervision events for
            // the purpose of conveying lifecycle changes
            if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.uid().clone())) {
                tracing::error!(
                    "{}: failed to send stop message to parent uid {}: {:?}",
                    self.self_addr(),
                    parent.uid(),
                    err
                );
            }
        } else {
            // Failure happened to the root actor or orphaned child actors.
            // In either case, the failure should be propagated to proc.
            //
            // Note that orphaned actor is unexpected and would only happen if
            // there is a bug.
            if let Some(event) = event {
                self.inner
                    .proc
                    .handle_unhandled_supervision_event(&self, event);
            }
        }

        // Last step: make the terminal status observable.
        self.change_status(terminal_status);
    }
2775
    /// Runs the actor, and manages its supervision tree. When the function returns,
    /// the whole tree rooted at this actor has stopped. On success, returns the reason
    /// why the actor stopped. On failure, returns the error that caused the failure.
    ///
    /// The sequence is:
    /// 1. run the actor loop (`run`), converting a panic into an
    ///    `ActorError`;
    /// 2. move this instance to `ActorStatus::Stopping`;
    /// 3. send `Signal::Stop` to every child, unlinking those the
    ///    signal could not be delivered to;
    /// 4. wait for the remaining children, giving up and unlinking all
    ///    of them if no signal arrives within 500ms;
    /// 5. run `actor.cleanup` under the configured `CLEANUP_TIMEOUT`,
    ///    unless the actor panicked.
    ///
    /// An error from the actor loop takes precedence over a cleanup
    /// error; a cleanup error is only returned when the loop itself
    /// succeeded.
    ///
    /// # Panics
    ///
    /// Panics if the instance is already terminal when the actor loop
    /// returns, or if a `ChildStopped` signal names a child that is
    /// still linked.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (
            PortReceiver<Signal>,
            mpsc::UnboundedReceiver<ActorSupervisionEvent>,
        ),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<String, ActorError> {
        // It is okay to catch all panics here, because we are in a tokio task,
        // and tokio will catch the panic anyway:
        // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic
        // What we do here is just to catch it early so we can handle it.

        let mut did_panic = false;
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                did_panic = true;
                // Use the captured panic info as the error text, or a
                // note explaining why it could not be retrieved.
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_addr(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_addr(), err);
        }

        // After this point, we know we won't spawn any more children,
        // so we can safely read the current child keys.
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child
                .value()
                .signal(Signal::Stop("parent stopping".to_string()))
            {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_addr(),
                    child.key(),
                    err
                );
                // Collected here and unlinked after the iteration ends.
                to_unlink.push(child.value().clone());
            }
        }
        // Manually unlink children that have already been stopped.
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        // Only the signal receiver is needed from here on; the
        // supervision event receiver is dropped.
        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            match tokio::time::timeout(Duration::from_millis(500), signal_receiver.recv()).await {
                Ok(signal) => {
                    // A receive error returns from the function here
                    // via `?`. Signals other than `ChildStopped` are
                    // ignored at this stage.
                    if let Signal::ChildStopped(uid) = signal? {
                        assert!(self.inner.cell.get_child(&uid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_addr()
                    );
                    // No more waiting to receive messages. Unlink all remaining
                    // children.
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        // Run the actor cleanup function before the actor stops to delete
        // resources. If it times out, continue with stopping the actor.
        // Don't call it if there was a panic, because the actor may
        // be in an invalid state and unable to access anything, for example
        // the GIL.
        let cleanup_result = if !did_panic {
            let cleanup_timeout = hyperactor_config::global::get(config::CLEANUP_TIMEOUT);
            match tokio::time::timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                // Cleanup ran and reported an error.
                Ok(Err(e)) => Err(ActorError::new(
                    self.self_addr(),
                    ActorErrorKind::cleanup(e),
                )),
                // Cleanup did not finish within the timeout.
                Err(e) => Err(ActorError::new(
                    self.self_addr(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        if let Err(ref actor_err) = result {
            // The original result error takes precedence over the cleanup error,
            // so make sure the cleanup error is still logged in that case.
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        // If the original exit was not an error, let cleanup errors be
        // surfaced.
        result.and_then(|reason| cleanup_result.map(|_| reason))
    }
2897
2898    /// Initialize and run the actor until it fails or is stopped. On success,
2899    /// returns the reason why the actor stopped. On failure, returns the error
2900    /// that caused the failure.
2901    async fn run(
2902        &mut self,
2903        actor: &mut A,
2904        actor_loop_receivers: &mut (
2905            PortReceiver<Signal>,
2906            mpsc::UnboundedReceiver<ActorSupervisionEvent>,
2907        ),
2908        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
2909    ) -> Result<String, ActorError> {
2910        let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
2911
2912        self.change_status(ActorStatus::Initializing);
2913        actor
2914            .init(self)
2915            .await
2916            .map_err(|err| ActorError::new(self.self_addr(), ActorErrorKind::init(err)))?;
2917        let actor_id_str = self.self_addr().to_string();
2918        let stop_reason = 'messages: loop {
2919            if !self.is_stopping() {
2920                self.change_status(ActorStatus::Idle);
2921            }
2922            let next_delayed_deadline = self.inner.delayed_posts.next_deadline();
2923            let metric_pairs = hyperactor_telemetry::kv_pairs!("actor_id" => actor_id_str.clone());
2924            tokio::select! {
2925                biased;
2926                signal = signal_receiver.recv() => {
2927                    let signal = signal.map_err(ActorError::from);
2928                    tracing::debug!("received signal {signal:?}");
2929                    match signal? {
2930                        Signal::Stop(reason) => {
2931                            self.change_status(ActorStatus::Stopping);
2932                            actor
2933                                .handle_stop(self, StopMode::Stop, &reason)
2934                                .await
2935                                .map_err(|err| ActorError::new(self.self_addr(), ActorErrorKind::processing(err)))?;
2936                        },
2937                        Signal::DrainAndStop(reason) => {
2938                            self.change_status(ActorStatus::Stopping);
2939                            actor
2940                                .handle_stop(self, StopMode::DrainAndStop, &reason)
2941                                .await
2942                                .map_err(|err| ActorError::new(self.self_addr(), ActorErrorKind::processing(err)))?;
2943                        },
2944                        Signal::ChildStopped(uid) => {
2945                            assert!(self.inner.cell.get_child(&uid).is_none());
2946                        },
2947                        Signal::ExitRequested(reason) => {
2948                            break 'messages reason;
2949                        }
2950                        Signal::Kill(reason) => {
2951                            return Err(ActorError { actor_id: Box::new(self.self_addr().clone()), kind: Box::new(ActorErrorKind::Aborted(reason)) });
2952                        }
2953                    }
2954                }
2955                work = work_rx.recv() => {
2956                    ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
2957                    account_dequeue(&self.inner.cell.inner.queue_depth, &self.inner.proc.state().queue_stats, &actor_id_str);
2958                    let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
2959                    let work = work.expect("inconsistent work queue state");
2960                    if let Err(err) = work.handle(actor, self).await {
2961                        while let Ok(supervision_event) = supervision_event_receiver.try_recv() {
2962                            self.handle_supervision_event(actor, supervision_event).await?;
2963                        }
2964                        let kind = ActorErrorKind::processing(err);
2965                        return Err(ActorError {
2966                            actor_id: Box::new(self.self_addr().clone()),
2967                            kind: Box::new(kind),
2968                        });
2969                    }
2970                }
2971                _ = self.inner.delayed_posts.notify.notified(), if !self.is_stopping() && !self.inner.delayed_posts.is_draining() => {
2972                }
2973                _ = async {
2974                    match next_delayed_deadline {
2975                        Some(deadline) => tokio::time::sleep_until(deadline).await,
2976                        None => std::future::pending::<()>().await,
2977                    }
2978                }, if !self.is_stopping() && !self.inner.delayed_posts.is_draining() && next_delayed_deadline.is_some() => {
2979                    let now = tokio::time::Instant::now();
2980                    if let Ok(_guard) = self.inner.delayed_posts.ingress.try_enter() {
2981                        for post in self.inner.delayed_posts.pop_due(now) {
2982                            post(self);
2983                        }
2984                    }
2985                }
2986                Some(supervision_event) = supervision_event_receiver.recv() => {
2987                    self.handle_supervision_event(actor, supervision_event).await?;
2988                }
2989            }
2990            self.inner
2991                .cell
2992                .inner
2993                .num_processed_messages
2994                .fetch_add(1, Ordering::SeqCst);
2995        };
2996        tracing::debug!(
2997            actor_id = %self.self_addr(),
2998            reason = stop_reason,
2999            "exited actor loop",
3000        );
3001        Ok(stop_reason)
3002    }
3003
3004    /// Handle a supervision event using the provided actor.
3005    pub async fn handle_supervision_event(
3006        &self,
3007        actor: &mut A,
3008        supervision_event: ActorSupervisionEvent,
3009    ) -> Result<(), ActorError> {
3010        // Handle the supervision event with the current actor.
3011        match actor
3012            .handle_supervision_event(self, &supervision_event)
3013            .await
3014        {
3015            Ok(true) => {
3016                // The supervision event was handled by this actor, nothing more to do.
3017                Ok(())
3018            }
3019            Ok(false) => {
3020                let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
3021                Err(ActorError::new(self.self_addr(), kind))
3022            }
3023            Err(err) => {
3024                // The actor failed to handle the supervision event, it should die.
3025                // Create a new supervision event for this failure and propagate it.
3026                let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
3027                    err.to_string(),
3028                    Box::new(supervision_event),
3029                );
3030                Err(ActorError::new(self.self_addr(), kind))
3031            }
3032        }
3033    }
3034
3035    async unsafe fn handle_message<M: Message>(
3036        &self,
3037        actor: &mut A,
3038        type_info: Option<&'static TypeInfo>,
3039        headers: Flattrs,
3040        message: M,
3041    ) -> Result<(), anyhow::Error>
3042    where
3043        A: Handler<M>,
3044    {
3045        // Build HandlerInfo from TypeInfo (zero-copy) or fall back to type_name.
3046        let handler_info = match type_info {
3047            Some(info) => {
3048                // SAFETY: The caller promises to pass the correct type info.
3049                let arm = unsafe { info.arm_unchecked(&message as *const M as *const ()) };
3050                HandlerInfo::from_static(info.typename(), arm)
3051            }
3052            None => {
3053                // Fall back to std::any::type_name (also static, zero-copy).
3054                HandlerInfo::from_static(std::any::type_name::<M>(), None)
3055            }
3056        };
3057
3058        let endpoint = type_info.and_then(|info| {
3059            // SAFETY: The caller promises to pass the correct type info.
3060            unsafe { info.endpoint_name(&message as *const M as *const ()) }
3061        });
3062
3063        // Use a helper function for a better instrument log.
3064        self.handle_message_with_handler_info(actor, handler_info, headers, message, endpoint)
3065            .await
3066    }
3067
3068    #[tracing::instrument(level = "debug", name = "handle_message", skip_all, fields(message_type = %handler_info))]
3069    async fn handle_message_with_handler_info<M: Message>(
3070        &self,
3071        actor: &mut A,
3072        handler_info: HandlerInfo,
3073        headers: Flattrs,
3074        message: M,
3075        endpoint: Option<String>,
3076    ) -> Result<(), anyhow::Error>
3077    where
3078        A: Handler<M>,
3079    {
3080        let now = std::time::SystemTime::now();
3081        let handler_info = Some(handler_info);
3082        self.change_status(ActorStatus::Processing(now, handler_info.clone()));
3083        crate::mailbox::headers::log_message_latency_if_sampling(
3084            &headers,
3085            self.self_addr().to_string(),
3086        );
3087
3088        let message_id = headers.get(crate::mailbox::headers::TELEMETRY_MESSAGE_ID);
3089
3090        if let Some(message_id) = message_id {
3091            let from_actor_id = headers
3092                .get(crate::mailbox::headers::SENDER_ACTOR_ID_HASH)
3093                .unwrap_or(0);
3094            let to_actor_id = hash_to_u64(self.self_addr());
3095            let port_id = headers.get(crate::mailbox::headers::TELEMETRY_PORT_ID);
3096
3097            notify_message(hyperactor_telemetry::MessageEvent {
3098                timestamp: now,
3099                id: message_id,
3100                from_actor_id,
3101                to_actor_id,
3102                endpoint,
3103                port_id,
3104            });
3105
3106            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
3107                timestamp: now,
3108                id: hyperactor_telemetry::generate_status_event_id(message_id),
3109                message_id,
3110                status: "active".to_string(),
3111            });
3112        }
3113
3114        // Record the message handler being invoked.
3115        *self.inner.cell.inner.last_message_handler.write().unwrap() = handler_info;
3116
3117        let context = Context::new(self, headers);
3118        // Pass a reference to the context to the handler, so that deref
3119        // coercion allows the `this` argument to be treated exactly like
3120        // &Instance<A>.
3121        let start = Instant::now();
3122        let subject_str = self.self_addr().subject().to_string();
3123        let result = actor
3124            .handle(&context, message)
3125            .instrument(self.inner.cell.inner.recording.span(&subject_str))
3126            .await;
3127        let elapsed_us = start.elapsed().as_micros() as u64;
3128        self.inner
3129            .cell
3130            .inner
3131            .total_processing_time_us
3132            .fetch_add(elapsed_us, Ordering::SeqCst);
3133
3134        if let Some(message_id) = message_id {
3135            notify_message_status(hyperactor_telemetry::MessageStatusEvent {
3136                timestamp: std::time::SystemTime::now(),
3137                id: hyperactor_telemetry::generate_status_event_id(message_id),
3138                message_id,
3139                status: "complete".to_string(),
3140            });
3141        }
3142
3143        result
3144    }
3145
3146    /// Spawn on child on this instance.
3147    pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
3148        self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
3149    }
3150
3151    /// Spawn a named child actor on this instance. The child gets a
3152    /// descriptive name in its ActorId instead of inheriting this
3153    /// instance's name. Supervision linkage is preserved.
3154    pub fn spawn_with_name<C: Actor>(
3155        &self,
3156        name: &str,
3157        actor: C,
3158    ) -> anyhow::Result<ActorHandle<C>> {
3159        self.inner
3160            .proc
3161            .spawn_named_child(self.inner.cell.clone(), name, actor)
3162    }
3163
3164    /// Create a new direct child instance.
3165    pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
3166        self.inner.proc.child_instance(self.inner.cell.clone())
3167    }
3168
3169    /// Spawn a registered actor as this instance's child.
3170    ///
3171    /// The actor type is resolved through the remote spawn registry. The child
3172    /// receives an empty environment.
3173    pub async fn gspawn(&self, actor_type: &str, params: Data) -> anyhow::Result<AnyActorHandle> {
3174        self.gspawn_uid(actor_type, crate::id::Uid::anonymous(), params)
3175            .await
3176    }
3177
3178    /// Spawn a registered actor as this instance's child using an explicit uid.
3179    ///
3180    /// The actor type is resolved through the remote spawn registry. The child
3181    /// receives an empty environment.
3182    pub async fn gspawn_uid(
3183        &self,
3184        actor_type: &str,
3185        uid: crate::id::Uid,
3186        params: Data,
3187    ) -> anyhow::Result<AnyActorHandle> {
3188        crate::actor::remote::Remote::global()
3189            .gspawn_child(
3190                &self.inner.proc,
3191                self.inner.cell.clone(),
3192                actor_type,
3193                uid,
3194                params,
3195                Flattrs::default(),
3196            )
3197            .await
3198    }
3199
3200    /// Return a handler port handle representing the actor's message
3201    /// handler for M-typed messages.
3202    pub fn port<M: Message>(&self) -> PortHandle<M>
3203    where
3204        A: Handler<M>,
3205    {
3206        self.inner.ports.get()
3207    }
3208
3209    /// The [`ActorHandle`] corresponding to this instance.
3210    pub fn handle(&self) -> ActorHandle<A> {
3211        ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
3212    }
3213
3214    /// The owning actor ref.
3215    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
3216        self.inner.cell.bind(self.inner.ports.as_ref())
3217    }
3218
3219    // Temporary in order to support python bindings.
3220    #[doc(hidden)]
3221    pub fn mailbox_for_py(&self) -> &Mailbox {
3222        &self.inner.mailbox
3223    }
3224
3225    /// The owning proc.
3226    pub fn proc(&self) -> &Proc {
3227        &self.inner.proc
3228    }
3229
3230    /// Clone this Instance to get an owned struct that can be
3231    /// plumbed through python. This should really only be called
3232    /// for the explicit purpose of being passed into python
3233    #[doc(hidden)]
3234    pub fn clone_for_py(&self) -> Self {
3235        Self {
3236            inner: Arc::clone(&self.inner),
3237        }
3238    }
3239
3240    /// Get the join handle associated with this actor.
3241    fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
3242        self.inner.cell.inner.actor_task_handle.get()
3243    }
3244
3245    /// Return this instance's sequencer.
3246    pub fn sequencer(&self) -> &Sequencer {
3247        &self.inner.sequencer
3248    }
3249
3250    /// Return this instance's ID.
3251    pub fn instance_id(&self) -> Uuid {
3252        self.inner.id
3253    }
3254
3255    /// Return a handle to this instance's parent actor, if it has one.
3256    pub fn parent_handle<P: Actor>(&self) -> Option<ActorHandle<P>> {
3257        let parent_cell = self.inner.cell.inner.parent.upgrade()?;
3258        let ports = if let Ok(ports) = parent_cell.inner.ports.clone().downcast() {
3259            ports
3260        } else {
3261            return None;
3262        };
3263        Some(ActorHandle::new(parent_cell, ports))
3264    }
3265}
3266
/// An [`Instance`] can serve as a mailbox context: it exposes the
/// mailbox stored in its inner state.
impl<A: Actor> context::Mailbox for Instance<A> {
    /// Borrow the instance's mailbox.
    fn mailbox(&self) -> &Mailbox {
        &self.inner.mailbox
    }
}
3272
/// A [`Context`] exposes the mailbox of the instance it wraps.
impl<A: Actor> context::Mailbox for Context<'_, A> {
    /// Borrow the mailbox of the underlying instance.
    fn mailbox(&self) -> &Mailbox {
        &self.instance.inner.mailbox
    }
}
3278
/// Same as the impl for `Instance<A>`, but for a shared reference to an
/// instance; it returns the same mailbox.
impl<A: Actor> context::Mailbox for &Instance<A> {
    /// Borrow the referenced instance's mailbox.
    fn mailbox(&self) -> &Mailbox {
        &self.inner.mailbox
    }
}
3284
/// Same as the impl for `Context<'_, A>`, but for a shared reference to
/// a context; it returns the mailbox of the underlying instance.
impl<A: Actor> context::Mailbox for &Context<'_, A> {
    /// Borrow the mailbox of the referenced context's instance.
    fn mailbox(&self) -> &Mailbox {
        &self.instance.inner.mailbox
    }
}
3290
/// An [`Instance`] is its own actor context.
impl<A: Actor> context::Actor for Instance<A> {
    type A = A;
    /// Returns `self`.
    fn instance(&self) -> &Instance<A> {
        self
    }
}
3297
/// A [`Context`] acts as an actor context for the instance it wraps.
impl<A: Actor> context::Actor for Context<'_, A> {
    type A = A;
    /// Returns the context viewed as an `&Instance<A>`.
    // NOTE(review): `self` is a `&Context`; this presumably relies on a
    // `Deref<Target = Instance<A>>` impl for `Context` defined elsewhere
    // in the file — confirm.
    fn instance(&self) -> &Instance<A> {
        self
    }
}
3304
/// Same as the impl for `Instance<A>`, but for a shared reference to an
/// instance.
impl<A: Actor> context::Actor for &Instance<A> {
    type A = A;
    /// Returns the referenced instance (`self` is `&&Instance<A>` here
    /// and is dereferenced implicitly).
    fn instance(&self) -> &Instance<A> {
        self
    }
}
3311
/// Same as the impl for `Context<'_, A>`, but for a shared reference to
/// a context.
impl<A: Actor> context::Actor for &Context<'_, A> {
    type A = A;
    /// Returns the referenced context viewed as an `&Instance<A>`.
    // NOTE(review): presumably relies on a `Deref<Target = Instance<A>>`
    // impl for `Context` defined elsewhere in the file — confirm.
    fn instance(&self) -> &Instance<A> {
        self
    }
}
3318
3319impl<A, M> crate::Endpoint<M> for &Instance<A>
3320where
3321    A: Actor + Handler<M>,
3322    M: Message,
3323{
3324    fn endpoint_location(&self) -> crate::EndpointLocation {
3325        crate::EndpointLocation::Actor(self.self_addr().clone())
3326    }
3327
3328    fn post<C>(self, cx: &C, message: M)
3329    where
3330        C: context::Actor,
3331    {
3332        let port = self.port();
3333        crate::Endpoint::post(&port, cx, message)
3334    }
3335}
3336
3337impl<A, M> crate::Endpoint<M> for &Context<'_, A>
3338where
3339    A: Actor + Handler<M>,
3340    M: Message,
3341{
3342    fn endpoint_location(&self) -> crate::EndpointLocation {
3343        crate::EndpointLocation::Actor(self.self_addr().clone())
3344    }
3345
3346    fn post<C>(self, cx: &C, message: M)
3347    where
3348        C: context::Actor,
3349    {
3350        crate::Endpoint::post(self.instance, cx, message)
3351    }
3352}
3353
3354impl<A, M> crate::Endpoint<M> for Instance<A>
3355where
3356    A: Actor + Handler<M>,
3357    M: Message,
3358{
3359    fn endpoint_location(&self) -> crate::EndpointLocation {
3360        crate::EndpointLocation::Actor(self.self_addr().clone())
3361    }
3362
3363    fn post<C>(self, cx: &C, message: M)
3364    where
3365        C: context::Actor,
3366    {
3367        crate::Endpoint::post(&self, cx, message)
3368    }
3369}
3370
3371impl Instance<()> {
3372    /// See [Mailbox::bind_handler_port] for details.
3373    pub fn bind_handler_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
3374        assert!(
3375            self.actor_task_handle().is_none(),
3376            "can only bind handler port on instance with no running actor task"
3377        );
3378        self.inner.mailbox.bind_handler_port()
3379    }
3380}
3381
/// How an actor's type is identified inside an [`InstanceCell`].
#[derive(Debug)]
enum ActorType {
    /// The type is described by a static `TypeInfo`; its name comes from
    /// `TypeInfo::typename()`.
    Named(&'static TypeInfo),
    /// The type is identified only by a static name string.
    Anonymous(&'static str),
}
3387
3388impl ActorType {
3389    fn type_name(&self) -> &str {
3390        match self {
3391            ActorType::Named(info) => info.typename(),
3392            ActorType::Anonymous(name) => name,
3393        }
3394    }
3395}
3396
/// InstanceCell contains all of the type-erased, shareable state of an instance.
/// InstanceCells form a supervision tree, and are used by ActorHandle
/// to access the underlying instance.
///
/// InstanceCell is reference counted and cloneable.
#[derive(Clone)]
pub struct InstanceCell {
    /// The shared state; cloning the cell clones this `Arc`.
    inner: Arc<InstanceCellState>,
}
3406
3407impl fmt::Debug for InstanceCell {
3408    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3409        f.debug_struct("InstanceCell")
3410            .field("actor_id", &self.inner.actor_id)
3411            .field("actor_type", &self.inner.actor_type)
3412            .finish()
3413    }
3414}
3415
/// The state shared by all clones of an [`InstanceCell`].
struct InstanceCellState {
    /// The actor's address.
    actor_id: ActorAddr,

    /// The actor's type information.
    actor_type: ActorType,

    /// The proc in which the actor is running.
    proc: Proc,

    /// Control port handles to the actor loop, if one is running.
    actor_loop: Option<(
        PortHandle<Signal>,
        mpsc::UnboundedSender<ActorSupervisionEvent>,
    )>,

    /// An observer that stores the current status of the actor.
    status: watch::Receiver<ActorStatus>,

    /// A weak reference to this instance's parent.
    parent: WeakInstanceCell,

    /// This instance's children by their uids.
    children: DashMap<crate::id::Uid, InstanceCell>,

    /// Access to the spawned actor's join handle.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// The set of named ports that are exported by this actor.
    exported_named_ports: DashMap<u64, &'static str>,

    /// The number of messages processed by this actor.
    num_processed_messages: AtomicU64,

    /// When this actor was created.
    created_at: SystemTime,

    /// Name of the last message handler invoked.
    last_message_handler: RwLock<Option<HandlerInfo>>,

    /// Total time spent processing messages, in microseconds.
    total_processing_time_us: AtomicU64,

    /// Current actor work-queue depth.
    ///
    /// Two consumers of one accounting path (PD-5e): this field is
    /// the introspection-readable state; the OTel
    /// `ACTOR_MESSAGE_QUEUE_SIZE` counter is the telemetry export.
    /// Both are updated together by `account_enqueue` /
    /// `account_dequeue`.
    ///
    /// Shared with `HandlerPorts<A>`: incremented at enqueue in the send
    /// path, decremented when the actor loop receives from `work_rx`.
    queue_depth: Arc<AtomicU64>,

    /// The log recording associated with this actor. It is used to
    /// store a 'flight record' of events while the actor is running.
    recording: Recording,

    /// Attrs-based introspection data published by the actor. Written
    /// by the actor via `Instance::publish_attrs()` /
    /// `Instance::publish_attr()`, and read by the introspection
    /// runtime handler when building node payloads.
    ///
    /// This bag may contain both mesh-level keys (`node_type`,
    /// `addr`, `num_procs`, ...) and actor-runtime keys (`status`,
    /// `messages_processed`, ...).
    published_attrs: RwLock<Option<hyperactor_config::Attrs>>,

    /// Optional callback for resolving non-addressable children
    /// (e.g., system procs). Registered by infrastructure actors
    /// like `HostAgent` in `Actor::init`. Invoked by the
    /// introspection runtime handler for `QueryChild` messages.
    /// `None` means `QueryChild` returns a "not_found" error.
    ///
    /// See S7 in `introspect` module doc.
    query_child_handler: RwLock<Option<Box<dyn (Fn(&Addr) -> IntrospectResult) + Send + Sync>>>,

    /// The supervision event for this actor's failure, if any.
    /// See FI-1, FI-2 in `introspect` module doc.
    supervision_event: std::sync::Mutex<Option<crate::supervision::ActorSupervisionEvent>>,

    /// Whether this actor is infrastructure/system (hidden by default
    /// in the TUI `s` toggle). Set by spawning code via
    /// `Instance::set_system()`.
    is_system: AtomicBool,

    /// A type-erased reference to HandlerPorts<A>, which allows us to
    /// recover an ActorHandle<A> by downcasting.
    ports: Arc<dyn Any + Send + Sync>,
}
3507
3508impl InstanceCellState {
3509    /// Unlink this instance from its parent, if it has one. If it was unlinked,
3510    /// the parent is returned.
3511    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
3512        self.parent
3513            .upgrade()
3514            .filter(|parent| parent.inner.unlink(self))
3515    }
3516
3517    /// Unlink this instance from a child.
3518    fn unlink(&self, child: &InstanceCellState) -> bool {
3519        assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
3520        self.children.remove(child.actor_id.uid()).is_some()
3521    }
3522}
3523
3524/// Select which terminated snapshots to evict when the retention cap
3525/// is exceeded.
3526///
3527/// Each entry is `(actor_id, Option<occurred_at>)` where `Some` means
3528/// the actor has `failure_info` (i.e. it failed), and `None` means a
3529/// clean stop.
3530///
3531/// Eviction priority:
3532/// 1. Cleanly-stopped actors are evicted first (arbitrary order).
3533/// 2. If more evictions are needed, failed actors are evicted
3534///    newest-first (descending `occurred_at`), preserving the
3535///    earliest failures which are closest to the root cause.
3536fn select_eviction_candidates(
3537    entries: &[(ActorAddr, Option<String>)],
3538    excess: usize,
3539) -> Vec<ActorAddr> {
3540    let mut clean: Vec<&ActorAddr> = Vec::new();
3541    let mut failed: Vec<(&ActorAddr, &str)> = Vec::new();
3542    for (id, occurred_at) in entries {
3543        match occurred_at {
3544            Some(ts) => failed.push((id, ts.as_str())),
3545            None => clean.push(id),
3546        }
3547    }
3548
3549    let mut to_remove: Vec<ActorAddr> = Vec::new();
3550    let mut remaining = excess;
3551
3552    // Evict cleanly-stopped first.
3553    for id in clean {
3554        if remaining == 0 {
3555            break;
3556        }
3557        to_remove.push(id.clone());
3558        remaining -= 1;
3559    }
3560
3561    // If still over cap, evict most-recent failures first.
3562    if remaining > 0 {
3563        failed.sort_by(|a, b| b.1.cmp(a.1));
3564        for (id, _) in failed.into_iter().take(remaining) {
3565            to_remove.push(id.clone());
3566        }
3567    }
3568
3569    to_remove
3570}
3571
3572impl InstanceCell {
3573    /// Creates a new instance cell with the provided internal state. If a parent
3574    /// is provided, it is linked to this cell.
3575    fn new(
3576        actor_id: ActorAddr,
3577        actor_type: ActorType,
3578        proc: Proc,
3579        actor_loop: Option<(
3580            PortHandle<Signal>,
3581            mpsc::UnboundedSender<ActorSupervisionEvent>,
3582        )>,
3583        status: watch::Receiver<ActorStatus>,
3584        parent: Option<InstanceCell>,
3585        ports: Arc<dyn Any + Send + Sync>,
3586        queue_depth: Arc<AtomicU64>,
3587    ) -> Self {
3588        let _ais = actor_id.to_string();
3589        let cell = Self {
3590            inner: Arc::new(InstanceCellState {
3591                actor_id: actor_id.clone(),
3592                actor_type,
3593                proc: proc.clone(),
3594                actor_loop,
3595                status,
3596                parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
3597                children: DashMap::new(),
3598                actor_task_handle: OnceLock::new(),
3599                exported_named_ports: DashMap::new(),
3600                num_processed_messages: AtomicU64::new(0),
3601                created_at: std::time::SystemTime::now(),
3602                last_message_handler: RwLock::new(None),
3603                total_processing_time_us: AtomicU64::new(0),
3604                queue_depth,
3605                recording: hyperactor_telemetry::recorder().record(64),
3606                published_attrs: RwLock::new(None),
3607                query_child_handler: RwLock::new(None),
3608                supervision_event: std::sync::Mutex::new(None),
3609                is_system: AtomicBool::new(false),
3610                ports,
3611            }),
3612        };
3613        cell.maybe_link_parent();
3614        proc.inner
3615            .instances
3616            .insert(actor_id.id().clone(), cell.downgrade());
3617        cell
3618    }
3619
3620    fn wrap(inner: Arc<InstanceCellState>) -> Self {
3621        Self { inner }
3622    }
3623
3624    /// The actor's address.
3625    pub fn actor_addr(&self) -> &ActorAddr {
3626        &self.inner.actor_id
3627    }
3628
3629    /// The proc in which this actor is running.
3630    pub(crate) fn proc(&self) -> &Proc {
3631        &self.inner.proc
3632    }
3633
3634    /// The actor's uid.
3635    pub(crate) fn uid(&self) -> &crate::id::Uid {
3636        self.inner.actor_id.uid()
3637    }
3638
3639    /// The actor's join handle.
3640    #[allow(dead_code)]
3641    pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
3642        self.inner.actor_task_handle.get()
3643    }
3644
3645    /// The instance's status observer.
3646    pub fn status(&self) -> &watch::Receiver<ActorStatus> {
3647        &self.inner.status
3648    }
3649
3650    /// The supervision event stored when this actor failed.
3651    /// `None` for actors that stopped cleanly or are still running.
3652    pub fn supervision_event(&self) -> Option<crate::supervision::ActorSupervisionEvent> {
3653        self.inner.supervision_event.lock().unwrap().clone()
3654    }
3655
3656    fn signal_port(&self) -> PortHandle<Signal> {
3657        self.inner
3658            .actor_loop
3659            .as_ref()
3660            .map(|(signal_port, _)| signal_port.clone())
3661            .unwrap_or_else(|| panic!("{} has no runtime signal port", self.actor_addr()))
3662    }
3663
3664    /// Send a signal to the actor.
3665    pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
3666        if let Some((signal_port, _)) = &self.inner.actor_loop {
3667            // A global signal client is used to send signals to the actor.
3668            static CLIENT: OnceLock<(Instance<()>, ActorHandle<()>)> = OnceLock::new();
3669            let client = &CLIENT
3670                .get_or_init(|| Proc::runtime().client("global_signal_client").unwrap())
3671                .0;
3672            signal_port.post(&client, signal);
3673            Ok(())
3674        } else {
3675            tracing::warn!(
3676                "{}: attempted to send signal {} to detached actor",
3677                self.inner.actor_id,
3678                signal
3679            );
3680            Ok(())
3681        }
3682    }
3683
3684    /// Used by this actor's children to send a supervision event to this actor.
3685    /// When it fails to send, we will crash the process. As part of the crash,
3686    /// all the procs and actors running on this process will be terminated
3687    /// forcefully.
3688    ///
3689    /// Note that "let it crash" is the default behavior when a supervision event
3690    /// cannot be delivered upstream. It is the upstream's responsibility to
3691    /// detect and handle crashes.
3692    pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
3693        match &self.inner.actor_loop {
3694            Some((_, supervision_tx)) => {
3695                if let Err(err) = supervision_tx.send(event.clone()) {
3696                    if !event.is_error() {
3697                        // Normal lifecycle events (e.g. clean stop) that fail to
3698                        // send are silently dropped. This happens when a child
3699                        // stops after the parent's mailbox has been closed or its
3700                        // supervision port receiver has been dropped (e.g. client
3701                        // instances created via Proc::client()).
3702                        tracing::debug!(
3703                            "{}: dropping non-error supervision event {}: {:?}",
3704                            self.actor_addr(),
3705                            event,
3706                            err
3707                        );
3708                        return;
3709                    }
3710                    tracing::error!(
3711                        "{}: failed to send supervision event to actor: {:?}. Crash the process.",
3712                        self.actor_addr(),
3713                        err
3714                    );
3715                    std::process::exit(1);
3716                }
3717            }
3718            None => {
3719                if !event.is_error() {
3720                    tracing::debug!(
3721                        "{}: dropping non-error supervision event {} to detached actor",
3722                        self.actor_addr(),
3723                        event,
3724                    );
3725                    return;
3726                }
3727                tracing::error!(
3728                    "{}: failed: {}: cannot send supervision event to detached actor: crashing",
3729                    self.actor_addr(),
3730                    event,
3731                );
3732                std::process::exit(1);
3733            }
3734        }
3735    }
3736
3737    /// Downgrade this InstanceCell to a weak reference.
3738    pub fn downgrade(&self) -> WeakInstanceCell {
3739        WeakInstanceCell {
3740            inner: Arc::downgrade(&self.inner),
3741        }
3742    }
3743
3744    /// Link this instance to a new child.
3745    fn link(&self, child: InstanceCell) {
3746        assert_eq!(self.actor_addr().proc_id(), child.actor_addr().proc_id());
3747        self.inner.children.insert(child.uid().clone(), child);
3748    }
3749
3750    /// Unlink this instance from a child.
3751    fn unlink(&self, child: &InstanceCell) {
3752        assert_eq!(self.actor_addr().proc_id(), child.actor_addr().proc_id());
3753        self.inner.children.remove(child.uid());
3754    }
3755
3756    /// Unlink this instance from all children.
3757    fn unlink_all(&self) {
3758        self.inner.children.clear();
3759    }
3760
3761    /// Link this instance to its parent, if it has one.
3762    fn maybe_link_parent(&self) {
3763        if let Some(parent) = self.inner.parent.upgrade() {
3764            parent.link(self.clone());
3765        }
3766    }
3767
3768    /// Unlink this instance from its parent, if it has one. If it was unlinked,
3769    /// the parent is returned.
3770    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
3771        self.inner.maybe_unlink_parent()
3772    }
3773
3774    /// Return an iterator over this instance's children. This may deadlock if the
3775    /// caller already holds a reference to any item in map.
3776    fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, crate::id::Uid, InstanceCell>> {
3777        self.inner.children.iter()
3778    }
3779
3780    /// The number of children this instance has.
3781    pub fn child_count(&self) -> usize {
3782        self.inner.children.len()
3783    }
3784
3785    /// Returns the ActorAddrs of this instance's direct children.
3786    pub fn child_actor_ids(&self) -> Vec<ActorAddr> {
3787        self.inner
3788            .children
3789            .iter()
3790            .map(|entry| entry.value().actor_addr().clone())
3791            .collect()
3792    }
3793
3794    /// Get a child by its uid.
3795    fn get_child(&self, uid: &crate::id::Uid) -> Option<InstanceCell> {
3796        self.inner.children.get(uid).map(|child| child.clone())
3797    }
3798
3799    /// Access the flight recorder for this actor.
3800    pub fn recording(&self) -> &Recording {
3801        &self.inner.recording
3802    }
3803
3804    /// When this actor was created.
3805    pub fn created_at(&self) -> SystemTime {
3806        self.inner.created_at
3807    }
3808
3809    /// The number of messages processed by this actor.
3810    pub fn num_processed_messages(&self) -> u64 {
3811        self.inner.num_processed_messages.load(Ordering::SeqCst)
3812    }
3813
3814    /// The last message handler invoked by this actor.
3815    pub fn last_message_handler(&self) -> Option<HandlerInfo> {
3816        self.inner.last_message_handler.read().unwrap().clone()
3817    }
3818
3819    /// Total time spent processing messages, in microseconds.
3820    pub fn total_processing_time_us(&self) -> u64 {
3821        self.inner.total_processing_time_us.load(Ordering::SeqCst)
3822    }
3823
3824    /// Current actor work-queue depth (PD-5).
3825    pub fn queue_depth(&self) -> u64 {
3826        self.inner.queue_depth.load(Ordering::Relaxed)
3827    }
3828
3829    /// Get parent instance cell, if it exists.
3830    pub fn parent(&self) -> Option<InstanceCell> {
3831        self.inner.parent.upgrade()
3832    }
3833
3834    /// The actor's type name.
3835    pub fn actor_type_name(&self) -> &str {
3836        self.inner.actor_type.type_name()
3837    }
3838
3839    /// Replace the published introspection attrs with a new bag.
3840    pub fn set_published_attrs(&self, attrs: hyperactor_config::Attrs) {
3841        *self.inner.published_attrs.write().unwrap() = Some(attrs);
3842    }
3843
3844    /// Set a single introspection attr, merging into the existing bag
3845    /// (or creating one if none exists).
3846    pub fn merge_published_attr<T: hyperactor_config::AttrValue>(
3847        &self,
3848        key: hyperactor_config::Key<T>,
3849        value: T,
3850    ) {
3851        self.inner
3852            .published_attrs
3853            .write()
3854            .unwrap()
3855            .get_or_insert_with(hyperactor_config::Attrs::new)
3856            .set(key, value);
3857    }
3858
3859    /// Read the published introspection attrs, if any.
3860    pub fn published_attrs(&self) -> Option<hyperactor_config::Attrs> {
3861        self.inner.published_attrs.read().unwrap().clone()
3862    }
3863
3864    /// Register a callback for resolving non-addressable children
3865    /// via `IntrospectMessage::QueryChild`.
3866    ///
3867    /// The callback runs on the actor's introspect task (a separate
3868    /// tokio task, not the actor's message loop), so it must be
3869    /// `Send + Sync` and must not access actor-mutable state.
3870    /// Capture cloned `Proc` references, not `&mut self`.
3871    pub fn set_query_child_handler(
3872        &self,
3873        handler: impl (Fn(&Addr) -> IntrospectResult) + Send + Sync + 'static,
3874    ) {
3875        *self.inner.query_child_handler.write().unwrap() = Some(Box::new(handler));
3876    }
3877
3878    /// Invoke the registered QueryChild handler, if any.
3879    pub fn query_child(&self, child_ref: &Addr) -> Option<IntrospectResult> {
3880        let guard = self.inner.query_child_handler.read().unwrap();
3881        guard.as_ref().map(|handler| handler(child_ref))
3882    }
3883
3884    /// Whether this actor is infrastructure/system.
3885    pub fn is_system(&self) -> bool {
3886        self.inner.is_system.load(Ordering::Relaxed)
3887    }
3888
3889    /// Store a post-mortem snapshot for this actor in the proc's
3890    /// `terminated_snapshots` map. Called by the introspect task
3891    /// just before exiting on terminal status.
3892    ///
3893    /// Eviction policy when the retention cap is exceeded:
3894    /// 1. Evict cleanly-stopped actors first (no `failure_info`).
3895    /// 2. When only failed actors remain, evict the most recent
3896    ///    (by `occurred_at`), preserving the earliest failures
3897    ///    which are closest to the root cause.
3898    pub fn store_terminated_snapshot(&self, payload: crate::introspect::IntrospectResult) {
3899        let snapshots = &self.inner.proc.inner.terminated_snapshots;
3900        snapshots.insert(
3901            self.actor_addr().id().clone(),
3902            TerminatedSnapshot {
3903                actor_addr: self.actor_addr().clone(),
3904                payload,
3905            },
3906        );
3907        let max = hyperactor_config::global::get(crate::config::TERMINATED_SNAPSHOT_RETENTION);
3908        let excess = snapshots.len().saturating_sub(max);
3909        if excess > 0 {
3910            // Build entries for the eviction selector.
3911            let entries: Vec<_> = snapshots
3912                .iter()
3913                .map(|entry| {
3914                    let occurred_at = serde_json::from_str::<hyperactor_config::Attrs>(
3915                        &entry.value().payload.attrs,
3916                    )
3917                    .ok()
3918                    .and_then(|attrs| {
3919                        // Presence of FAILURE_ERROR_MESSAGE means the actor failed.
3920                        attrs
3921                            .get(crate::introspect::FAILURE_ERROR_MESSAGE)
3922                            .cloned()?;
3923                        // Extract occurred_at timestamp for sorting.
3924                        attrs
3925                            .get(crate::introspect::FAILURE_OCCURRED_AT)
3926                            .map(|t| humantime::format_rfc3339(*t).to_string())
3927                    });
3928                    (entry.value().actor_addr.clone(), occurred_at)
3929                })
3930                .collect();
3931
3932            for key in select_eviction_candidates(&entries, excess) {
3933                snapshots.remove(key.id());
3934            }
3935        }
3936    }
3937
3938    /// This is temporary so that we can share binding code between handle and instance.
3939    /// We should find some (better) way to consolidate the two.
3940    pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &HandlerPorts<A>) -> ActorRef<R> {
3941        <R as Binds<A>>::bind(ports);
3942        // Signal: registered directly in Instance::new() and handled
3943        // by the actor loop's select!. The port remains unbound
3944        // because runtime signals are sent through InstanceCell's
3945        // stored PortHandle, not as externally addressable actor
3946        // messages.
3947        //
3948        // Undeliverable: dispatched through the work queue to the
3949        // actor's Handler<Undeliverable<MessageEnvelope>>.
3950        //
3951        // IntrospectMessage: registered directly in Instance::new()
3952        // and handled by a dedicated introspect task.
3953        ports.bind::<Undeliverable<MessageEnvelope>>();
3954        // TODO: consider sharing `ports.bound` directly.
3955        for entry in ports.bound.iter() {
3956            self.inner
3957                .exported_named_ports
3958                .insert(*entry.key(), entry.value());
3959        }
3960        ActorRef::attest(ActorAddr::new(
3961            self.actor_addr().id().clone(),
3962            self.inner.proc.default_location(),
3963        ))
3964    }
3965
3966    /// Attempt to downcast this cell to a concrete actor handle.
3967    pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
3968        let ports = Arc::clone(&self.inner.ports)
3969            .downcast::<HandlerPorts<A>>()
3970            .ok()?;
3971        Some(ActorHandle::new(self.clone(), ports))
3972    }
3973
3974    /// Traverse the subtree rooted at this instance in pre-order.
3975    /// The callback receives each InstanceCell and its depth (root = 0).
3976    /// Children are visited in pid order for deterministic traversal.
3977    pub fn traverse<F>(&self, f: &mut F)
3978    where
3979        F: FnMut(&InstanceCell, usize),
3980    {
3981        self.traverse_inner(0, f);
3982    }
3983
3984    fn traverse_inner<F>(&self, depth: usize, f: &mut F)
3985    where
3986        F: FnMut(&InstanceCell, usize),
3987    {
3988        f(self, depth);
3989        // Collect and sort children by uid for deterministic traversal order
3990        let mut children: Vec<_> = self.child_iter().map(|r| r.value().clone()).collect();
3991        children.sort_by_key(|c| c.uid().clone());
3992        for child in children {
3993            child.traverse_inner(depth + 1, f);
3994        }
3995    }
3996}
3997
3998impl Drop for InstanceCellState {
3999    fn drop(&mut self) {
4000        if let Some(parent) = self.maybe_unlink_parent() {
4001            tracing::debug!(
4002                "instance {} was dropped with parent {} still linked",
4003                self.actor_id,
4004                parent.actor_addr()
4005            );
4006        }
4007        if self
4008            .proc
4009            .inner
4010            .instances
4011            .remove(self.actor_id.id())
4012            .is_none()
4013        {
4014            tracing::error!("instance {} was dropped but not in proc", self.actor_id);
4015        }
4016    }
4017}
4018
4019/// A weak version of the InstanceCell. This is used to provide cyclical
4020/// linkage between actors without creating a strong reference cycle.
4021#[derive(Debug, Clone)]
4022pub struct WeakInstanceCell {
4023    inner: Weak<InstanceCellState>,
4024}
4025
4026impl Default for WeakInstanceCell {
4027    fn default() -> Self {
4028        Self::new()
4029    }
4030}
4031
4032impl WeakInstanceCell {
4033    /// Create a new weak instance cell that is never upgradeable.
4034    pub fn new() -> Self {
4035        Self { inner: Weak::new() }
4036    }
4037
4038    /// Upgrade this weak instance cell to a strong reference, if possible.
4039    pub fn upgrade(&self) -> Option<InstanceCell> {
4040        self.inner.upgrade().map(InstanceCell::wrap)
4041    }
4042}
4043
4044/// A polymorphic dictionary that stores runtime-dispatched handler ports.
4045/// The interface memoizes the ports so that they are reused. We do not
4046/// (yet) support stable identifiers across multiple instances of the same
4047/// actor.
4048pub struct HandlerPorts<A: Actor> {
4049    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
4050    bound: DashMap<u64, &'static str>,
4051    mailbox: Mailbox,
4052    workq: OrderedSender<WorkCell<A>>,
4053    /// Per-actor queue depth (PD-5). Shared with `InstanceCellState`.
4054    queue_depth: Arc<AtomicU64>,
4055    /// Proc-level queue-pressure stats (PD-6 through PD-9).
4056    proc_stats: Arc<ProcQueueStats>,
4057}
4058
4059impl<A: Actor> HandlerPorts<A> {
4060    fn new(
4061        mailbox: Mailbox,
4062        workq: OrderedSender<WorkCell<A>>,
4063        queue_depth: Arc<AtomicU64>,
4064        proc_stats: Arc<ProcQueueStats>,
4065    ) -> Self {
4066        Self {
4067            ports: DashMap::new(),
4068            bound: DashMap::new(),
4069            mailbox,
4070            workq,
4071            queue_depth,
4072            proc_stats,
4073        }
4074    }
4075
4076    /// Get a port for the Handler<M> of actor A.
4077    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
4078    where
4079        A: Handler<M>,
4080    {
4081        let key = TypeId::of::<M>();
4082        match self.ports.entry(key) {
4083            Entry::Vacant(entry) => {
4084                // Runtime control-plane ports are provisioned directly, not
4085                // through HandlerPorts, nor wired to the work queue. So they
4086                // should never hit this code path.
4087                assert!(
4088                    !crate::ordering::is_bypass_workq_type_id(key),
4089                    "cannot provision bypass-workq port {} through `Ports::get`; \
4090                     it must be pre-registered via `open_message_port` in `Instance::new`",
4091                    std::any::type_name::<M>()
4092                );
4093
4094                let type_info = TypeInfo::get_by_typeid(key);
4095                let workq = self.workq.clone();
4096                let actor_id = self.mailbox.actor_addr().to_string();
4097                let enqueue_depth = Arc::clone(&self.queue_depth);
4098                let enqueue_proc_stats = Arc::clone(&self.proc_stats);
4099                // Handler-port draining holds an ingress guard while this
4100                // closure runs. Therefore, the drain guarantee depends on this
4101                // closure synchronously finishing all work that it admits into
4102                // the actor work queue before it returns. That includes the
4103                // ordered path: `OrderedSender::send` delivers the current item
4104                // and synchronously flushes any consecutive buffered items that
4105                // the current item unblocks. Messages already held in the
4106                // reorder buffer but still waiting on a future sequence are not
4107                // considered drainable accepted work; after draining begins,
4108                // that missing future sequence is rejected.
4109                let enqueue = move |headers: Flattrs, msg: M| {
4110                    let seq_info = headers.get(SEQ_INFO);
4111
4112                    let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
4113                        Box::pin(async move {
4114                            // SAFETY: we guarantee that the passed type_info is for type M.
4115                            unsafe {
4116                                instance
4117                                    .handle_message(actor, type_info, headers, msg)
4118                                    .await
4119                            }
4120                        })
4121                    });
4122                    // PD-5b: account the enqueue BEFORE handing the work
4123                    // to the queue. Otherwise the consumer can race and
4124                    // call `account_dequeue` before this thread accounts
4125                    // the enqueue, underflowing `running_total`. On send
4126                    // failure, `account_cancel_enqueue` rolls back the
4127                    // counters so `queue_depth` does not drift.
4128                    account_enqueue(&enqueue_depth, &enqueue_proc_stats, &actor_id);
4129                    let result = if workq.enable_buffering {
4130                        match seq_info {
4131                            Some(SeqInfo::Session { session_id, seq }) => {
4132                                // TODO: return the message contained in the error instead of dropping them when converting
4133                                // to anyhow::Error. In that way, the message can be picked up by mailbox and returned to sender.
4134                                workq.send(session_id, seq, work).map_err(|e| match e {
4135                                    OrderedSenderError::InvalidZeroSeq(_) => {
4136                                        let error_msg = format!(
4137                                            "in enqueue func for {}, got seq 0 for message type {}",
4138                                            actor_id,
4139                                            std::any::type_name::<M>(),
4140                                        );
4141                                        tracing::error!(error_msg);
4142                                        anyhow::anyhow!(error_msg)
4143                                    }
4144                                    OrderedSenderError::SendError(e) => anyhow::Error::from(e),
4145                                    OrderedSenderError::FlushError(e) => e,
4146                                })
4147                            }
4148                            Some(SeqInfo::Direct) => {
4149                                workq.direct_send(work).map_err(anyhow::Error::from)
4150                            }
4151                            None => {
4152                                let error_msg = format!(
4153                                    "in enqueue func for {}, buffering is enabled, but SEQ_INFO is not set for message type {}",
4154                                    actor_id,
4155                                    std::any::type_name::<M>(),
4156                                );
4157                                tracing::error!(error_msg);
4158                                Err(anyhow::anyhow!(error_msg))
4159                            }
4160                        }
4161                    } else {
4162                        workq.direct_send(work).map_err(anyhow::Error::from)
4163                    };
4164                    if result.is_err() {
4165                        account_cancel_enqueue(&enqueue_depth, &enqueue_proc_stats, &actor_id);
4166                    }
4167                    result
4168                };
4169                let port = self.mailbox.open_handler_enqueue_port(enqueue);
4170                entry.insert(Box::new(port.clone()));
4171                port
4172            }
4173            Entry::Occupied(entry) => {
4174                let port = entry.get();
4175                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
4176            }
4177        }
4178    }
4179
4180    /// Bind the given message type to its handler port.
4181    pub fn bind<M: RemoteMessage>(&self)
4182    where
4183        A: Handler<M>,
4184    {
4185        let port_index = M::port();
4186        match self.bound.entry(port_index) {
4187            Entry::Vacant(entry) => {
4188                self.get::<M>().bind_handler_port();
4189                entry.insert(M::typename());
4190            }
4191            Entry::Occupied(entry) => {
4192                assert_eq!(
4193                    *entry.get(),
4194                    M::typename(),
4195                    "bind {}: port index {} already bound to type {}",
4196                    M::typename(),
4197                    port_index,
4198                    entry.get(),
4199                );
4200            }
4201        }
4202    }
4203}
4204
4205#[cfg(test)]
4206mod tests {
4207    use std::assert_matches;
4208    use std::sync::atomic::AtomicBool;
4209
4210    use hyperactor_macros::export;
4211    use serde_json::json;
4212    use timed_test::async_timed_test;
4213    use tokio::sync::Barrier;
4214    use tokio::sync::oneshot;
4215    use tracing::Level;
4216    use tracing_subscriber::layer::SubscriberExt;
4217    use tracing_test::internal::logs_with_scope_contain;
4218
4219    use super::*;
4220    // needed for in-crate macro expansion
4221    use crate as hyperactor;
4222    use crate::HandleClient;
4223    use crate::Handler;
4224    use crate::OncePortRef;
4225    use crate::PortRef;
4226    use crate::port::Port;
4227    use crate::testing::proc_supervison::ProcSupervisionCoordinator;
4228    use crate::testing::process_assertion::assert_termination;
4229
4230    #[derive(Debug, Default)]
4231    #[export]
4232    struct TestActor;
4233
4234    impl Actor for TestActor {}
4235
4236    #[derive(Debug)]
4237    struct DelayedSelfActor {
4238        ready: Option<OncePortRef<()>>,
4239        fired: Option<OncePortRef<()>>,
4240        delay: Duration,
4241    }
4242
4243    #[derive(Debug)]
4244    struct DelayedSelfTick;
4245
4246    #[async_trait]
4247    impl Actor for DelayedSelfActor {
4248        async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
4249            if let Some(ready) = self.ready.take() {
4250                ready.post(this, ());
4251            }
4252            this.post_after(this, DelayedSelfTick, self.delay);
4253            Ok(())
4254        }
4255    }
4256
4257    #[async_trait]
4258    impl Handler<DelayedSelfTick> for DelayedSelfActor {
4259        async fn handle(
4260            &mut self,
4261            cx: &crate::Context<Self>,
4262            _message: DelayedSelfTick,
4263        ) -> anyhow::Result<()> {
4264            if let Some(fired) = self.fired.take() {
4265                fired.post(cx, ());
4266            }
4267            Ok(())
4268        }
4269    }
4270
4271    #[derive(Debug)]
4272    struct DelayedPortActor {
4273        reply: Option<PortRef<u64>>,
4274        delay: Duration,
4275    }
4276
4277    #[async_trait]
4278    impl Actor for DelayedPortActor {
4279        async fn init(&mut self, this: &Instance<Self>) -> anyhow::Result<()> {
4280            this.post_after(
4281                self.reply.take().expect("reply port should be present"),
4282                123u64,
4283                self.delay,
4284            );
4285            Ok(())
4286        }
4287    }
4288
4289    #[derive(Handler, HandleClient, Debug)]
4290    enum TestActorMessage {
4291        Reply(oneshot::Sender<()>),
4292        Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
4293        Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
4294        Noop(),
4295        Fail(anyhow::Error),
4296        Panic(String),
4297        Spawn(oneshot::Sender<ActorHandle<TestActor>>),
4298    }
4299
4300    impl TestActor {
4301        async fn spawn_child(
4302            cx: &impl context::Actor,
4303            parent: &ActorHandle<TestActor>,
4304        ) -> ActorHandle<TestActor> {
4305            let (tx, rx) = oneshot::channel();
4306            parent.post(cx, TestActorMessage::Spawn(tx));
4307            rx.await.unwrap()
4308        }
4309    }
4310
4311    #[test]
4312    fn test_proc_identity_constructors() {
4313        let anonymous = Proc::anonymous();
4314        assert!(
4315            matches!(anonymous.proc_id().uid(), crate::id::Uid::Instance(_, None)),
4316            "anonymous proc must have an unlabeled instance id"
4317        );
4318        assert_eq!(anonymous.proc_id().label(), None);
4319
4320        let instance = Proc::instance("worker");
4321        assert!(
4322            matches!(
4323                instance.proc_id().uid(),
4324                crate::id::Uid::Instance(_, Some(label)) if label.as_str() == "worker"
4325            ),
4326            "instance proc must have a labeled instance id"
4327        );
4328        assert_eq!(
4329            instance.proc_id().label().map(|label| label.as_str()),
4330            Some("worker")
4331        );
4332
4333        let singleton = Proc::singleton("controller");
4334        assert!(
4335            matches!(
4336                singleton.proc_id().uid(),
4337                crate::id::Uid::Singleton(label) if label.as_str() == "controller"
4338            ),
4339            "singleton proc must have a singleton id"
4340        );
4341        assert_eq!(
4342            singleton.proc_id().label().map(|label| label.as_str()),
4343            Some("controller")
4344        );
4345    }
4346
4347    #[async_trait]
4348    #[crate::handle(TestActorMessage)]
4349    impl TestActorMessageHandler for TestActor {
4350        async fn reply(
4351            &mut self,
4352            _cx: &crate::Context<Self>,
4353            sender: oneshot::Sender<()>,
4354        ) -> Result<(), anyhow::Error> {
4355            sender.send(()).unwrap();
4356            Ok(())
4357        }
4358
4359        async fn wait(
4360            &mut self,
4361            _cx: &crate::Context<Self>,
4362            sender: oneshot::Sender<()>,
4363            receiver: oneshot::Receiver<()>,
4364        ) -> Result<(), anyhow::Error> {
4365            sender.send(()).unwrap();
4366            receiver.await.unwrap();
4367            Ok(())
4368        }
4369
4370        async fn forward(
4371            &mut self,
4372            cx: &crate::Context<Self>,
4373            destination: ActorHandle<TestActor>,
4374            message: Box<TestActorMessage>,
4375        ) -> Result<(), anyhow::Error> {
4376            // TODO: this needn't be async
4377            destination.post(cx, *message);
4378            Ok(())
4379        }
4380
4381        async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
4382            Ok(())
4383        }
4384
4385        async fn fail(
4386            &mut self,
4387            _cx: &crate::Context<Self>,
4388            err: anyhow::Error,
4389        ) -> Result<(), anyhow::Error> {
4390            Err(err)
4391        }
4392
4393        async fn panic(
4394            &mut self,
4395            _cx: &crate::Context<Self>,
4396            err_msg: String,
4397        ) -> Result<(), anyhow::Error> {
4398            panic!("{}", err_msg);
4399        }
4400
4401        async fn spawn(
4402            &mut self,
4403            cx: &crate::Context<Self>,
4404            reply: oneshot::Sender<ActorHandle<TestActor>>,
4405        ) -> Result<(), anyhow::Error> {
4406            let handle = TestActor.spawn(cx)?;
4407            reply.send(handle).unwrap();
4408            Ok(())
4409        }
4410    }
4411
4412    #[tokio::test]
4413    async fn test_client_instance_can_bind_signal_port() {
4414        let proc = Proc::isolated();
4415        let (client, _) = proc.client("client").unwrap();
4416
4417        let (_signal_port, _signal_rx) = client.bind_handler_port::<Signal>();
4418    }
4419
4420    #[expect(
4421        clippy::await_holding_invalid_type,
4422        reason = "tracing_test::traced_test macro expansion holds tracing::span::Entered across awaits; can't be fixed in our code"
4423    )]
4424    #[tracing_test::traced_test]
4425    #[async_timed_test(timeout_secs = 30)]
4426    async fn test_spawn_actor() {
4427        let proc = Proc::isolated();
4428        let (client, _) = proc.client("client").unwrap();
4429        let handle = proc.spawn("test", TestActor).unwrap();
4430
4431        // Check on the join handle.
4432        assert!(logs_contain(
4433            format!(
4434                "{}: spawned with {:?}",
4435                handle.actor_addr(),
4436                handle.cell().actor_task_handle().unwrap(),
4437            )
4438            .as_str()
4439        ));
4440
4441        let mut state = handle.status().clone();
4442
4443        // Send a ping-pong to the actor. Wait for the actor to become idle.
4444
4445        let (tx, rx) = oneshot::channel::<()>();
4446        handle.post(&client, TestActorMessage::Reply(tx));
4447        rx.await.unwrap();
4448
4449        state
4450            .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
4451            .await
4452            .unwrap();
4453
4454        // Make sure we enter processing state while the actor is handling a message.
4455        let (enter_tx, enter_rx) = oneshot::channel::<()>();
4456        let (exit_tx, exit_rx) = oneshot::channel::<()>();
4457
4458        handle.post(&client, TestActorMessage::Wait(enter_tx, exit_rx));
4459        enter_rx.await.unwrap();
4460        assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= std::time::SystemTime::now());
4461        exit_tx.send(()).unwrap();
4462
4463        state
4464            .wait_for(|state| matches!(*state, ActorStatus::Idle))
4465            .await
4466            .unwrap();
4467
4468        handle.drain_and_stop("test").unwrap();
4469        handle.await;
4470        assert_matches!(&*state.borrow(), ActorStatus::Stopped(reason) if reason == "test");
4471    }
4472
4473    #[async_timed_test(timeout_secs = 30)]
4474    async fn test_proc_actors_messaging() {
4475        let proc = Proc::isolated();
4476        let (client, _) = proc.client("client").unwrap();
4477        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
4478        let second = proc.spawn::<TestActor>("second", TestActor).unwrap();
4479        let (tx, rx) = oneshot::channel::<()>();
4480        let reply_message = TestActorMessage::Reply(tx);
4481        first.post(
4482            &client,
4483            TestActorMessage::Forward(second, Box::new(reply_message)),
4484        );
4485        rx.await.unwrap();
4486    }
4487
4488    /// Proc ownership is based on `ProcId`, not the routeable
4489    /// `ProcAddr`. A proc may be reached through multiple locations,
4490    /// but a different proc id must still forward even when the
4491    /// location matches.
4492    #[tokio::test]
4493    async fn test_post_routes_by_proc_id() {
4494        use crate::mailbox::monitored_return_handle;
4495        use crate::testing::ids::test_actor_id;
4496
4497        #[derive(Clone)]
4498        struct CountingSender(Arc<AtomicUsize>);
4499
4500        #[async_trait]
4501        impl MailboxSender for CountingSender {
4502            fn post_unchecked(
4503                &self,
4504                _envelope: MessageEnvelope,
4505                _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
4506            ) {
4507                self.0.fetch_add(1, Ordering::SeqCst);
4508            }
4509        }
4510
4511        // Distinct in-process local addresses; `ChannelAddr::any` would
4512        // hand out the same `Local(0)` sentinel both times.
4513        let local_addr = ChannelAddr::Local(1);
4514        let remote_addr = ChannelAddr::Local(2);
4515
4516        let proc_local = ProcAddr::instance(local_addr.clone(), "shared");
4517        let proc_same_id_other_location =
4518            ProcAddr::new(proc_local.id().clone(), remote_addr.into());
4519        let proc_other_id_same_location = ProcAddr::instance(local_addr, "other");
4520        assert_eq!(
4521            proc_local.id(),
4522            proc_same_id_other_location.id(),
4523            "test setup: both procs must share a ProcId"
4524        );
4525        assert_ne!(
4526            proc_local.id(),
4527            proc_other_id_same_location.id(),
4528            "test setup: the remote proc must have a distinct ProcId"
4529        );
4530
4531        let forwarded = Arc::new(AtomicUsize::new(0));
4532        let proc = Proc::configured(
4533            proc_local.clone(),
4534            BoxedMailboxSender::new(CountingSender(forwarded.clone())),
4535        );
4536        let sender = test_actor_id("sender", "client");
4537
4538        // Same ProcId, same location: route locally; the forwarder must not see it.
4539        let local_dest = proc_local.actor_addr("worker").port_addr(Port::from(1234));
4540        proc.post(
4541            MessageEnvelope::new(
4542                sender.clone(),
4543                local_dest,
4544                wirevalue::Any::serialize(&1u64).unwrap(),
4545                Flattrs::new(),
4546            ),
4547            monitored_return_handle(),
4548        );
4549        assert_eq!(forwarded.load(Ordering::SeqCst), 0);
4550
4551        // Same instance ProcId, different location: still local ownership.
4552        let same_id_other_location_dest = proc_same_id_other_location
4553            .actor_addr("worker")
4554            .port_addr(Port::from(1234));
4555        proc.post(
4556            MessageEnvelope::new(
4557                sender.clone(),
4558                same_id_other_location_dest,
4559                wirevalue::Any::serialize(&1u64).unwrap(),
4560                Flattrs::new(),
4561            ),
4562            monitored_return_handle(),
4563        );
4564        assert_eq!(forwarded.load(Ordering::SeqCst), 0);
4565
4566        // Different ProcId, same location: forward.
4567        let other_id_same_location_dest = proc_other_id_same_location
4568            .actor_addr("worker")
4569            .port_addr(Port::from(1234));
4570        proc.post(
4571            MessageEnvelope::new(
4572                sender,
4573                other_id_same_location_dest,
4574                wirevalue::Any::serialize(&1u64).unwrap(),
4575                Flattrs::new(),
4576            ),
4577            monitored_return_handle(),
4578        );
4579        assert_eq!(forwarded.load(Ordering::SeqCst), 1);
4580    }
4581
4582    #[test]
4583    fn test_local_delivery_service_and_local_compare_full_proc_addr() {
4584        for name in [LEGACY_SERVICE_PROC_NAME, LEGACY_LOCAL_PROC_NAME] {
4585            let local = ProcAddr::singleton(ChannelAddr::Local(1), name);
4586            let same_id_other_location = ProcAddr::singleton(ChannelAddr::Local(2), name);
4587            let proc = match name {
4588                LEGACY_SERVICE_PROC_NAME => Proc::legacy_service_pseudo_singleton(
4589                    ChannelAddr::Local(1),
4590                    BoxedMailboxSender::new(PanickingMailboxSender),
4591                ),
4592                LEGACY_LOCAL_PROC_NAME => Proc::legacy_local_pseudo_singleton(
4593                    ChannelAddr::Local(1),
4594                    BoxedMailboxSender::new(PanickingMailboxSender),
4595                ),
4596                _ => unreachable!("test only covers legacy pseudo-singletons"),
4597            };
4598
4599            assert_eq!(local.id(), same_id_other_location.id());
4600            assert!(proc.is_local_delivery_target(&local));
4601            assert!(!proc.is_local_delivery_target(&same_id_other_location));
4602        }
4603
4604        let shared = ProcAddr::singleton(ChannelAddr::Local(1), "shared");
4605        let shared_other_location = ProcAddr::singleton(ChannelAddr::Local(2), "shared");
4606        let proc = Proc::configured(
4607            shared.clone(),
4608            BoxedMailboxSender::new(PanickingMailboxSender),
4609        );
4610        assert!(proc.is_local_delivery_target(&shared_other_location));
4611
4612        let service_instance = ProcAddr::instance(ChannelAddr::Local(1), "service");
4613        let service_instance_other_location =
4614            ProcAddr::new(service_instance.id().clone(), ChannelAddr::Local(2).into());
4615        let proc = Proc::configured(
4616            service_instance,
4617            BoxedMailboxSender::new(PanickingMailboxSender),
4618        );
4619        assert!(proc.is_local_delivery_target(&service_instance_other_location));
4620    }
4621
4622    #[test]
4623    fn test_legacy_pseudo_singletons_use_dedicated_constructors() {
4624        for name in [LEGACY_SERVICE_PROC_NAME, LEGACY_LOCAL_PROC_NAME] {
4625            let result = std::panic::catch_unwind(|| {
4626                Proc::configured(
4627                    ProcAddr::singleton(ChannelAddr::Local(1), name),
4628                    BoxedMailboxSender::new(PanickingMailboxSender),
4629                );
4630            });
4631            assert!(result.is_err());
4632        }
4633
4634        let service = Proc::legacy_service_pseudo_singleton(
4635            ChannelAddr::Local(1),
4636            BoxedMailboxSender::new(PanickingMailboxSender),
4637        );
4638        assert_eq!(
4639            service.proc_addr().id().uid().to_string(),
4640            LEGACY_SERVICE_PROC_NAME
4641        );
4642
4643        let local = Proc::legacy_local_pseudo_singleton(
4644            ChannelAddr::Local(2),
4645            BoxedMailboxSender::new(PanickingMailboxSender),
4646        );
4647        assert_eq!(
4648            local.proc_addr().id().uid().to_string(),
4649            LEGACY_LOCAL_PROC_NAME
4650        );
4651    }
4652
4653    #[tokio::test]
4654    async fn test_mailbox_muxer_delivers_by_actor_id() {
4655        use crate::mailbox::PortLocation;
4656        use crate::mailbox::monitored_return_handle;
4657        use crate::testing::ids::test_actor_id;
4658
4659        let proc = Proc::isolated();
4660        let (instance, _) = proc.client("worker").unwrap();
4661        let (port, mut receiver) = instance.bind_handler_port::<u64>();
4662
4663        let PortLocation::Bound(default_dest) = port.location() else {
4664            panic!("actor port must be bound");
4665        };
4666        let alternate_dest =
4667            PortAddr::new(default_dest.id().clone(), ChannelAddr::Local(9876).into());
4668
4669        proc.post(
4670            MessageEnvelope::serialize(
4671                test_actor_id("sender", "client"),
4672                alternate_dest,
4673                &123u64,
4674                Flattrs::new(),
4675            )
4676            .unwrap(),
4677            monitored_return_handle(),
4678        );
4679
4680        assert_eq!(receiver.recv().await.unwrap(), 123);
4681    }
4682
4683    #[test]
4684    fn test_default_location_changes_new_bindings_not_lookup() {
4685        let proc = Proc::isolated();
4686        let gateway = proc.gateway();
4687        let (_instance, handle) = proc.client("worker").unwrap();
4688
4689        let first_ref: ActorRef<()> = handle.bind();
4690        let new_location = ChannelAddr::Local(9876).into();
4691        gateway.set_default_location(new_location);
4692        let second_ref: ActorRef<()> = handle.bind();
4693
4694        assert_eq!(first_ref.actor_addr().id(), second_ref.actor_addr().id());
4695        assert_ne!(
4696            first_ref.actor_addr().location(),
4697            second_ref.actor_addr().location()
4698        );
4699        assert_eq!(second_ref.actor_addr().location(), &proc.default_location());
4700        assert_eq!(proc.default_location(), gateway.default_location());
4701        assert_eq!(proc.proc_addr(), gateway.proc_addr(proc.proc_id()));
4702        assert!(proc.get_instance(second_ref.actor_addr()).is_some());
4703    }
4704
4705    #[test]
4706    fn test_builder_procs_can_share_gateway_with_distinct_ids() {
4707        let gateway = Gateway::new();
4708        let first = Proc::builder()
4709            .proc_id(ProcId::instance(Label::strip("first")))
4710            .shared_gateway(gateway.clone())
4711            .build()
4712            .unwrap();
4713        let second = Proc::builder()
4714            .proc_id(ProcId::instance(Label::strip("second")))
4715            .shared_gateway(gateway.clone())
4716            .build()
4717            .unwrap();
4718
4719        assert_ne!(first.proc_id(), second.proc_id());
4720        assert_eq!(first.default_location(), second.default_location());
4721
4722        let new_location = ChannelAddr::Local(9876).into();
4723        gateway.set_default_location(new_location);
4724
4725        assert_eq!(first.default_location(), gateway.default_location());
4726        assert_eq!(second.default_location(), gateway.default_location());
4727        assert_eq!(first.proc_addr(), gateway.proc_addr(first.proc_id()));
4728        assert_eq!(second.proc_addr(), gateway.proc_addr(second.proc_id()));
4729    }
4730
4731    #[test]
4732    fn test_isolated_procs_use_distinct_gateways() {
4733        let first = Proc::isolated();
4734        let second = Proc::isolated();
4735        let second_location = second.default_location();
4736
4737        first
4738            .gateway()
4739            .set_default_location(ChannelAddr::Local(9876).into());
4740
4741        assert_ne!(first.proc_id(), second.proc_id());
4742        assert_ne!(first.default_location(), second_location);
4743        assert_eq!(second.default_location(), second_location);
4744    }
4745
    /// Serves one gateway on two successive channel addresses and then stops
    /// the servers in reverse order, checking after every step which default
    /// location the proc advertises and which addresses still accept and
    /// deliver messages.
    #[tokio::test]
    async fn test_gateway_serve_updates_location_and_stops() {
        use crate::mailbox::PortLocation;
        use crate::mailbox::monitored_return_handle;
        use crate::testing::ids::test_actor_id;

        let proc = Proc::isolated();
        let gateway = proc.gateway();
        let initial_location = proc.default_location();
        let (client, _) = proc.client("client").unwrap();
        let (port, mut receiver) = client.bind_handler_port::<u64>();
        let PortLocation::Bound(default_dest) = port.location() else {
            panic!("handler port must be bound");
        };

        // Dials `location`, posts `value` to the port identified by
        // `default_dest` (re-addressed to `location`), flushes the sender, and
        // asserts that `receiver` yields the same value within five seconds.
        async fn send_to_location(
            location: Location,
            default_dest: &PortAddr,
            value: u64,
            receiver: &mut PortReceiver<u64>,
        ) {
            let dest = PortAddr::new(default_dest.id().clone(), location.clone());
            let sender = MailboxClient::dial(location.addr().clone()).unwrap();
            sender.post(
                MessageEnvelope::serialize(
                    test_actor_id("sender", "client"),
                    dest,
                    &value,
                    Flattrs::new(),
                )
                .unwrap(),
                monitored_return_handle(),
            );
            sender.flush().await.unwrap();
            let received = tokio::time::timeout(Duration::from_secs(5), receiver.recv())
                .await
                .unwrap()
                .unwrap();
            assert_eq!(received, value);
        }

        let server = Gateway::serve(&gateway, ChannelAddr::any(ChannelTransport::Local)).unwrap();

        // After the first serve the advertised default location is still the
        // initial one, and a message sent to it is delivered.
        assert_eq!(proc.default_location(), initial_location);
        assert_eq!(proc.default_location(), gateway.default_location());
        assert_eq!(proc.proc_addr(), gateway.proc_addr(proc.proc_id()));
        send_to_location(initial_location.clone(), &default_dest, 1, &mut receiver).await;

        let next_server =
            Gateway::serve(&gateway, ChannelAddr::any(ChannelTransport::Local)).unwrap();
        let next_location = proc.default_location();

        // A second serve makes the proc advertise a different default
        // location; both the new and the initial location deliver.
        assert_ne!(proc.default_location(), initial_location);
        assert_eq!(proc.default_location(), gateway.default_location());
        assert_eq!(proc.proc_addr(), gateway.proc_addr(proc.proc_id()));
        send_to_location(next_location.clone(), &default_dest, 2, &mut receiver).await;
        send_to_location(initial_location.clone(), &default_dest, 3, &mut receiver).await;

        next_server.stop("test complete");
        next_server.await.unwrap().unwrap();

        // Stopping the second server restores the initial default location;
        // the second server's address can no longer be dialed, while the
        // initial one still delivers.
        assert_eq!(proc.default_location(), initial_location);
        assert_eq!(proc.default_location(), gateway.default_location());
        assert!(MailboxClient::dial(next_location.addr().clone()).is_err());
        send_to_location(initial_location.clone(), &default_dest, 4, &mut receiver).await;

        server.stop("test complete");
        server.await.unwrap().unwrap();

        // With the first server stopped as well, the default location is
        // unchanged but its address can no longer be dialed.
        assert_eq!(proc.default_location(), initial_location);
        assert_eq!(proc.default_location(), gateway.default_location());
        assert!(MailboxClient::dial(initial_location.addr().clone()).is_err());
    }
4819
4820    #[tokio::test]
4821    async fn test_direct_proc_server_stops_via_join_mailbox_server() {
4822        let proc = Proc::direct(
4823            ChannelAddr::any(ChannelTransport::Local),
4824            "direct".to_string(),
4825        )
4826        .unwrap();
4827
4828        assert_eq!(proc.proc_addr(), proc.gateway().proc_addr(proc.proc_id()));
4829
4830        proc.join_mailbox_server().await;
4831    }
4832
4833    #[tokio::test]
4834    async fn test_local_only_gateway_returns_undeliverable_messages() {
4835        use crate::testing::ids::test_actor_id;
4836
4837        let proc = Proc::isolated();
4838        let (client, _) = proc.client("client").unwrap();
4839        let (return_handle, mut undeliverable_rx) =
4840            client.open_port::<Undeliverable<MessageEnvelope>>();
4841        let remote_proc = ProcAddr::instance(ChannelAddr::Local(1234), "remote");
4842        let remote_dest = remote_proc.actor_addr("worker").port_addr(Port::from(0));
4843
4844        proc.post(
4845            MessageEnvelope::serialize(
4846                test_actor_id("sender", "client"),
4847                remote_dest.clone(),
4848                &123u64,
4849                Flattrs::new(),
4850            )
4851            .unwrap(),
4852            return_handle,
4853        );
4854
4855        let Undeliverable::Message(envelope) = undeliverable_rx.recv().await.unwrap() else {
4856            panic!("expected returned message");
4857        };
4858        assert_eq!(envelope.dest(), &remote_dest);
4859    }
4860
    /// Stateless test actor that answers [`LookupTestMessage`] queries.
    #[derive(Debug, Default)]
    #[export]
    struct LookupTestActor;

    // No lifecycle hooks are overridden; the trait's provided defaults apply.
    impl Actor for LookupTestActor {}
4866
    /// Message protocol handled by [`LookupTestActor`].
    #[derive(Handler, HandleClient, Debug)]
    enum LookupTestMessage {
        // Carries the actor reference to look up plus a once-port, marked
        // `#[reply]`, on which the boolean answer is returned.
        ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
    }
4871
4872    #[async_trait]
4873    #[crate::handle(LookupTestMessage)]
4874    impl LookupTestMessageHandler for LookupTestActor {
4875        async fn actor_exists(
4876            &mut self,
4877            cx: &crate::Context<Self>,
4878            actor_ref: ActorRef<TestActor>,
4879        ) -> Result<bool, anyhow::Error> {
4880            Ok(actor_ref.downcast_handle(cx).is_some())
4881        }
4882    }
4883
4884    #[async_timed_test(timeout_secs = 30)]
4885    async fn test_actor_lookup() {
4886        let proc = Proc::isolated();
4887        let (client, _handle) = proc.client("client").unwrap();
4888
4889        let target_actor = proc.spawn::<TestActor>("target", TestActor).unwrap();
4890        let target_actor_ref = target_actor.bind();
4891        let lookup_actor = proc
4892            .spawn::<LookupTestActor>("lookup", LookupTestActor)
4893            .unwrap();
4894
4895        assert!(
4896            lookup_actor
4897                .actor_exists(&client, target_actor_ref.clone())
4898                .await
4899                .unwrap()
4900        );
4901
4902        // Make up a child actor. It shouldn't exist.
4903        assert!(
4904            !lookup_actor
4905                .actor_exists(
4906                    &client,
4907                    ActorRef::attest(target_actor.actor_addr().anonymous_child())
4908                )
4909                .await
4910                .unwrap()
4911        );
4912        // A wrongly-typed actor ref should also not obtain.
4913        assert!(
4914            !lookup_actor
4915                .actor_exists(&client, ActorRef::attest(lookup_actor.actor_addr().clone()))
4916                .await
4917                .unwrap()
4918        );
4919
4920        target_actor.drain_and_stop("test").unwrap();
4921        target_actor.await;
4922
4923        assert!(
4924            !lookup_actor
4925                .actor_exists(&client, target_actor_ref)
4926                .await
4927                .unwrap()
4928        );
4929
4930        lookup_actor.drain_and_stop("test").unwrap();
4931        lookup_actor.await;
4932    }
4933
4934    fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
4935        assert_eq!(
4936            child.actor_addr().proc_addr(),
4937            parent.actor_addr().proc_addr()
4938        );
4939        assert_eq!(
4940            child.inner.parent.upgrade().unwrap().actor_addr(),
4941            parent.actor_addr()
4942        );
4943        assert_matches!(
4944            parent.inner.children.get(child.uid()),
4945            Some(node) if node.actor_addr() == child.actor_addr()
4946        );
4947    }
4948
4949    #[expect(
4950        clippy::await_holding_invalid_type,
4951        reason = "tracing_test::traced_test macro expansion holds tracing::span::Entered across awaits; can't be fixed in our code"
4952    )]
4953    #[tracing_test::traced_test]
4954    #[async_timed_test(timeout_secs = 30)]
4955    async fn test_spawn_child() {
4956        let proc = Proc::isolated();
4957        let (client, _) = proc.client("client").unwrap();
4958
4959        let first = proc.spawn::<TestActor>("first", TestActor).unwrap();
4960        let second = TestActor::spawn_child(&client, &first).await;
4961        let third = TestActor::spawn_child(&client, &second).await;
4962
4963        // Check we've got the join handles.
4964        assert!(logs_with_scope_contain(
4965            "hyperactor::proc",
4966            format!(
4967                "{}: spawned with {:?}",
4968                first.actor_addr(),
4969                first.cell().actor_task_handle().unwrap()
4970            )
4971            .as_str()
4972        ));
4973        assert!(logs_with_scope_contain(
4974            "hyperactor::proc",
4975            format!(
4976                "{}: spawned with {:?}",
4977                second.actor_addr(),
4978                second.cell().actor_task_handle().unwrap()
4979            )
4980            .as_str()
4981        ));
4982        assert!(logs_with_scope_contain(
4983            "hyperactor::proc",
4984            format!(
4985                "{}: spawned with {:?}",
4986                third.actor_addr(),
4987                third.cell().actor_task_handle().unwrap()
4988            )
4989            .as_str()
4990        ));
4991
4992        // All actors are in the same proc:
4993        assert_eq!(first.actor_addr().proc_addr(), proc.proc_addr());
4994        assert_eq!(second.actor_addr().proc_addr(), proc.proc_addr());
4995        assert_eq!(third.actor_addr().proc_addr(), proc.proc_addr());
4996
4997        // Supervision tree is constructed correctly.
4998        validate_link(third.cell(), second.cell());
4999        validate_link(second.cell(), first.cell());
5000        assert!(first.cell().inner.parent.upgrade().is_none());
5001
5002        // Supervision tree is torn down correctly.
5003        // Once each actor is stopped, it should have no linked children.
5004        let third_cell = third.cell().clone();
5005        third.drain_and_stop("test").unwrap();
5006        third.await;
5007        assert!(third_cell.inner.children.is_empty());
5008        drop(third_cell);
5009        validate_link(second.cell(), first.cell());
5010
5011        let second_cell = second.cell().clone();
5012        second.drain_and_stop("test").unwrap();
5013        second.await;
5014        assert!(second_cell.inner.children.is_empty());
5015        drop(second_cell);
5016
5017        let first_cell = first.cell().clone();
5018        first.drain_and_stop("test").unwrap();
5019        first.await;
5020        assert!(first_cell.inner.children.is_empty());
5021    }
5022
5023    #[async_timed_test(timeout_secs = 30)]
5024    async fn test_child_lifecycle() {
5025        let proc = Proc::isolated();
5026        let (client, _) = proc.client("client").unwrap();
5027
5028        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
5029        let root_1 = TestActor::spawn_child(&client, &root).await;
5030        let root_2 = TestActor::spawn_child(&client, &root).await;
5031        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
5032
5033        root.drain_and_stop("test").unwrap();
5034        root.await;
5035
5036        for actor in [root_1, root_2, root_2_1] {
5037            assert!(
5038                actor
5039                    .port::<TestActorMessage>()
5040                    .try_send(&client, TestActorMessage::Noop())
5041                    .is_err()
5042            );
5043            assert_matches!(actor.await, ActorStatus::Stopped(reason) if reason == "parent stopping");
5044        }
5045    }
5046
    /// Test actor whose `handle_stop` does not exit immediately; tests use
    /// the two notifiers to observe and control the stop sequence.
    #[derive(Debug)]
    struct DeferredStopActor {
        // Signalled by `handle_stop` once it has been entered.
        stop_started: Arc<tokio::sync::Notify>,
        // Awaited by the task spawned from `handle_stop` before it exits.
        release_stop: Arc<tokio::sync::Notify>,
    }
5052
5053    #[async_trait]
5054    impl Actor for DeferredStopActor {
5055        async fn handle_stop(
5056            &mut self,
5057            this: &Instance<Self>,
5058            mode: StopMode,
5059            reason: &str,
5060        ) -> Result<(), anyhow::Error> {
5061            let this = this.clone_for_py();
5062            let release_stop = Arc::clone(&self.release_stop);
5063            let reason = reason.to_string();
5064            this.close();
5065            self.stop_started.notify_one();
5066            tokio::spawn(async move {
5067                release_stop.notified().await;
5068                match mode {
5069                    StopMode::Stop => this.exit(&reason).unwrap(),
5070                    StopMode::DrainAndStop => this.exit_after_drain(&reason).unwrap(),
5071                }
5072            });
5073            Ok(())
5074        }
5075    }
5076
    // Accepts `()` messages and ignores them, which gives the actor a handler
    // port that tests can send to.
    #[async_trait]
    impl Handler<()> for DeferredStopActor {
        async fn handle(&mut self, _cx: &crate::Context<Self>, _message: ()) -> anyhow::Result<()> {
            Ok(())
        }
    }
5083
5084    #[async_timed_test(timeout_secs = 30)]
5085    async fn test_handle_stop_can_defer_exit() {
5086        let proc = Proc::isolated();
5087        let stop_started = Arc::new(tokio::sync::Notify::new());
5088        let release_stop = Arc::new(tokio::sync::Notify::new());
5089        let handle = proc
5090            .spawn(
5091                "deferred_stop",
5092                DeferredStopActor {
5093                    stop_started: Arc::clone(&stop_started),
5094                    release_stop: Arc::clone(&release_stop),
5095                },
5096            )
5097            .unwrap();
5098
5099        let mut status = handle.status();
5100        handle.stop("test").unwrap();
5101        stop_started.notified().await;
5102        status
5103            .wait_for(|state| matches!(state, ActorStatus::Stopping))
5104            .await
5105            .unwrap();
5106
5107        release_stop.notify_one();
5108        assert_matches!(handle.await, ActorStatus::Stopped(reason) if reason == "test");
5109    }
5110
5111    #[async_timed_test(timeout_secs = 30)]
5112    async fn test_drain_and_stop_closes_handler_ingress() {
5113        let proc = Proc::isolated();
5114        let (client, _) = proc.client("client").unwrap();
5115        let stop_started = Arc::new(tokio::sync::Notify::new());
5116        let release_stop = Arc::new(tokio::sync::Notify::new());
5117        let handle = proc
5118            .spawn(
5119                "deferred_drain_stop",
5120                DeferredStopActor {
5121                    stop_started: Arc::clone(&stop_started),
5122                    release_stop: Arc::clone(&release_stop),
5123                },
5124            )
5125            .unwrap();
5126
5127        handle.drain_and_stop("test").unwrap();
5128        stop_started.notified().await;
5129
5130        // Drain closes runtime-dispatched handler ingress, so new
5131        // sends to the actor's handler port are rejected.
5132        let err = handle.port::<()>().try_send(&client, ()).unwrap_err();
5133        assert_matches!(err.kind(), crate::mailbox::MailboxSenderErrorKind::Closed);
5134
5135        release_stop.notify_one();
5136        assert_matches!(handle.await, ActorStatus::Stopped(reason) if reason == "test");
5137    }
5138
5139    #[async_timed_test(timeout_secs = 30)]
5140    async fn test_parent_failure() {
5141        let proc = Proc::isolated();
5142        let (client, _) = proc.client("client").unwrap();
5143        // Need to set a supervison coordinator for this Proc because there will
5144        // be actor failure(s) in this test which trigger supervision.
5145        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
5146
5147        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
5148        let root_1 = TestActor::spawn_child(&client, &root).await;
5149        let root_2 = TestActor::spawn_child(&client, &root).await;
5150        let root_2_1 = TestActor::spawn_child(&client, &root_2).await;
5151
5152        root_2.post(
5153            &client,
5154            TestActorMessage::Fail(anyhow::anyhow!("some random failure")),
5155        );
5156        let _root_2_actor_id = root_2.actor_addr().clone();
5157        assert_matches!(
5158            root_2.await,
5159            ActorStatus::Failed(err) if err.to_string() == "some random failure"
5160        );
5161
5162        // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
5163        // stopped by a parent failure?
5164        // Currently the parent fails with an error related to the child's failure.
5165        assert_matches!(
5166            root.await,
5167            ActorStatus::Failed(err) if err.to_string().contains("some random failure")
5168        );
5169        assert_matches!(root_2_1.await, ActorStatus::Stopped(_));
5170        assert_matches!(root_1.await, ActorStatus::Stopped(_));
5171    }
5172
5173    #[async_timed_test(timeout_secs = 30)]
5174    async fn test_multi_handler() {
5175        // TEMPORARY: This test is currently a bit awkward since we don't yet expose
5176        // public interfaces to multi-handlers. This will be fixed shortly.
5177
5178        #[derive(Debug)]
5179        struct TestActor(Arc<AtomicUsize>);
5180
5181        #[async_trait]
5182        impl Actor for TestActor {}
5183
5184        #[async_trait]
5185        impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
5186            async fn handle(
5187                &mut self,
5188                cx: &crate::Context<Self>,
5189                message: OncePortHandle<PortHandle<usize>>,
5190            ) -> anyhow::Result<()> {
5191                message.post(cx, cx.port());
5192                Ok(())
5193            }
5194        }
5195
5196        #[async_trait]
5197        impl Handler<usize> for TestActor {
5198            async fn handle(
5199                &mut self,
5200                _cx: &crate::Context<Self>,
5201                message: usize,
5202            ) -> anyhow::Result<()> {
5203                self.0.fetch_add(message, Ordering::SeqCst);
5204                Ok(())
5205            }
5206        }
5207
5208        let proc = Proc::isolated();
5209        let state = Arc::new(AtomicUsize::new(0));
5210        let actor = TestActor(state.clone());
5211        let handle = proc.spawn::<TestActor>("test", actor).unwrap();
5212        let (client, _) = proc.client("client").unwrap();
5213        let (tx, rx) = client.open_once_port();
5214        handle.post(&client, tx);
5215        let usize_handle = rx.recv().await.unwrap();
5216        usize_handle.post(&client, 123);
5217
5218        handle.drain_and_stop("test").unwrap();
5219        handle.await;
5220
5221        assert_eq!(state.load(Ordering::SeqCst), 123);
5222    }
5223
5224    #[async_timed_test(timeout_secs = 30)]
5225    async fn test_post_after_self_message() {
5226        let proc = Proc::isolated();
5227        let (client, _) = proc.client("client").unwrap();
5228        let (ready, ready_rx) = client.open_once_port();
5229        let (fired, fired_rx) = client.open_once_port();
5230        let delay = Duration::from_millis(50);
5231        let start = tokio::time::Instant::now();
5232        let handle = proc
5233            .spawn(
5234                "test",
5235                DelayedSelfActor {
5236                    ready: Some(ready.bind()),
5237                    fired: Some(fired.bind()),
5238                    delay,
5239                },
5240            )
5241            .unwrap();
5242
5243        ready_rx.recv().await.unwrap();
5244        fired_rx.recv().await.unwrap();
5245
5246        assert!(start.elapsed() >= delay);
5247        handle.drain_and_stop("test").unwrap();
5248        handle.await;
5249    }
5250
5251    #[async_timed_test(timeout_secs = 30)]
5252    async fn test_post_after_port_ref() {
5253        let proc = Proc::isolated();
5254        let (client, _) = proc.client("client").unwrap();
5255        let (reply, mut reply_rx) = client.open_port();
5256        let delay = Duration::from_millis(50);
5257        let start = tokio::time::Instant::now();
5258        let handle = proc
5259            .spawn(
5260                "test",
5261                DelayedPortActor {
5262                    reply: Some(reply.bind()),
5263                    delay,
5264                },
5265            )
5266            .unwrap();
5267
5268        assert_eq!(reply_rx.recv().await.unwrap(), 123);
5269        assert!(start.elapsed() >= delay);
5270        handle.drain_and_stop("test").unwrap();
5271        handle.await;
5272    }
5273
5274    #[async_timed_test(timeout_secs = 30)]
5275    async fn test_post_after_discards_pending_messages_on_shutdown() {
5276        let proc = Proc::isolated();
5277        let (client, _) = proc.client("client").unwrap();
5278        let (ready, ready_rx) = client.open_once_port();
5279        let (fired, fired_rx) = client.open_once_port();
5280        let handle = proc
5281            .spawn(
5282                "test",
5283                DelayedSelfActor {
5284                    ready: Some(ready.bind()),
5285                    fired: Some(fired.bind()),
5286                    delay: Duration::from_secs(60),
5287                },
5288            )
5289            .unwrap();
5290
5291        ready_rx.recv().await.unwrap();
5292        handle.drain_and_stop("test").unwrap();
5293        assert_matches!(handle.await, ActorStatus::Stopped(reason) if reason == "test");
5294
5295        let result = tokio::time::timeout(Duration::from_millis(100), fired_rx.recv()).await;
5296        assert!(!matches!(result, Ok(Ok(()))));
5297    }
5298
5299    #[async_timed_test(timeout_secs = 30)]
5300    async fn test_actor_panic() {
5301        // Need this custom hook to store panic backtrace in task_local.
5302        panic_handler::set_panic_hook();
5303
5304        let proc = Proc::isolated();
5305        // Need to set a supervison coordinator for this Proc because there will
5306        // be actor failure(s) in this test which trigger supervision.
5307        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
5308
5309        let (client, _handle) = proc.client("client").unwrap();
5310        let actor_handle = proc.spawn("test", TestActor).unwrap();
5311        actor_handle
5312            .panic(&client, "some random failure".to_string())
5313            .await
5314            .unwrap();
5315        let actor_status = actor_handle.await;
5316
5317        // Note: even when the test passes, the panic stacktrace will still be
5318        // printed to stderr because that is the behavior controlled by the panic
5319        // hook.
5320        assert_matches!(actor_status, ActorStatus::Failed(_));
5321        if let ActorStatus::Failed(err) = actor_status {
5322            let error_msg = err.to_string();
5323            // Verify panic message is captured
5324            assert!(error_msg.contains("some random failure"));
5325            // Verify backtrace is captured. Note the backtrace message might
5326            // change in the future. If that happens, we need to update this
5327            // statement with something up-to-date.
5328            assert!(error_msg.contains("library/std/src/panicking.rs"));
5329        }
5330    }
5331
5332    #[cfg_attr(not(target_os = "linux"), ignore = "linux-only")]
5333    #[async_timed_test(timeout_secs = 30)]
5334    async fn test_local_supervision_propagation() {
5335        hyperactor_telemetry::initialize_logging_for_test();
5336
5337        #[derive(Debug)]
5338        struct TestActor {
5339            handled: Arc<AtomicBool>,
5340            notify: Arc<tokio::sync::Notify>,
5341            should_handle: bool,
5342        }
5343
5344        #[async_trait]
5345        impl Actor for TestActor {
5346            async fn handle_supervision_event(
5347                &mut self,
5348                _this: &Instance<Self>,
5349                _event: &ActorSupervisionEvent,
5350            ) -> Result<bool, anyhow::Error> {
5351                if !self.should_handle {
5352                    return Ok(false);
5353                }
5354
5355                tracing::error!(
5356                    "{}: supervision event received: {:?}",
5357                    _this.self_addr(),
5358                    _event
5359                );
5360                self.handled.store(true, Ordering::SeqCst);
5361                self.notify.notify_one();
5362                Ok(true)
5363            }
5364        }
5365
5366        #[async_trait]
5367        impl Handler<String> for TestActor {
5368            async fn handle(
5369                &mut self,
5370                cx: &crate::Context<Self>,
5371                message: String,
5372            ) -> anyhow::Result<()> {
5373                tracing::info!("{} received message: {}", cx.self_addr(), message);
5374                Err(anyhow::anyhow!(message))
5375            }
5376        }
5377
5378        let make_actor = |handled: &Arc<AtomicBool>, should_handle: bool| TestActor {
5379            handled: handled.clone(),
5380            notify: Arc::new(tokio::sync::Notify::new()),
5381            should_handle,
5382        };
5383
5384        let proc = Proc::isolated();
5385        let (client, _) = proc.client("client").unwrap();
5386        let (mut reported_event, _coordinator) =
5387            ProcSupervisionCoordinator::set(&proc).await.unwrap();
5388
5389        let root_state = Arc::new(AtomicBool::new(false));
5390        let root_1_state = Arc::new(AtomicBool::new(false));
5391        let root_1_notify = Arc::new(tokio::sync::Notify::new());
5392        let root_1_1_state = Arc::new(AtomicBool::new(false));
5393        let root_1_1_1_state = Arc::new(AtomicBool::new(false));
5394        let root_2_state = Arc::new(AtomicBool::new(false));
5395        let root_2_1_state = Arc::new(AtomicBool::new(false));
5396
5397        let root = proc
5398            .spawn::<TestActor>("root", make_actor(&root_state, false))
5399            .unwrap();
5400        let root_1 = proc
5401            .spawn_child::<TestActor>(
5402                root.cell().clone(),
5403                TestActor {
5404                    handled: root_1_state.clone(),
5405                    notify: root_1_notify.clone(),
5406                    should_handle: true, // children's event stops here
5407                },
5408            )
5409            .unwrap();
5410        let root_1_1 = proc
5411            .spawn_child::<TestActor>(root_1.cell().clone(), make_actor(&root_1_1_state, false))
5412            .unwrap();
5413        let root_1_1_1 = proc
5414            .spawn_child::<TestActor>(
5415                root_1_1.cell().clone(),
5416                make_actor(&root_1_1_1_state, false),
5417            )
5418            .unwrap();
5419        let root_2 = proc
5420            .spawn_child::<TestActor>(root.cell().clone(), make_actor(&root_2_state, false))
5421            .unwrap();
5422        let root_2_1 = proc
5423            .spawn_child::<TestActor>(root_2.cell().clone(), make_actor(&root_2_1_state, false))
5424            .unwrap();
5425
5426        // fail `root_1_1_1`, the supervision msg should be propagated to
5427        // `root_1` because `root_1` has set `true` to `handle_supervision_event`.
5428        root_1_1_1.post(&client, "some random failure".to_string());
5429
5430        // fail `root_2_1`, the supervision msg should be propagated to
5431        // ProcSupervisionCoordinator.
5432        let root_2_1_id = root_2_1.actor_addr().clone();
5433        root_2_1.post(&client, "some random failure".to_string());
5434
5435        // Wait for root_1 to handle the supervision event from the
5436        // root_1_1_1 -> root_1_1 -> root_1 chain. The Notify provides
5437        // a deterministic signal — no polling or timing needed.
5438        root_1_notify.notified().await;
5439
5440        // Wait for the supervision event from root_2_1's failure to
5441        // reach the ProcSupervisionCoordinator.
5442        let event = reported_event.recv().await;
5443        assert_eq!(event.actor_id, root_2_1_id);
5444
5445        assert!(!root_state.load(Ordering::SeqCst));
5446        assert!(root_1_state.load(Ordering::SeqCst));
5447        assert!(!root_1_1_state.load(Ordering::SeqCst));
5448        assert!(!root_1_1_1_state.load(Ordering::SeqCst));
5449        assert!(!root_2_state.load(Ordering::SeqCst));
5450        assert!(!root_2_1_state.load(Ordering::SeqCst));
5451    }
5452
5453    #[async_timed_test(timeout_secs = 30)]
5454    async fn test_instance() {
5455        #[derive(Debug, Default)]
5456        struct TestActor;
5457
5458        impl Actor for TestActor {}
5459
5460        #[async_trait]
5461        impl Handler<(String, PortRef<String>)> for TestActor {
5462            async fn handle(
5463                &mut self,
5464                cx: &crate::Context<Self>,
5465                (message, port): (String, PortRef<String>),
5466            ) -> anyhow::Result<()> {
5467                port.post(cx, message);
5468                Ok(())
5469            }
5470        }
5471
5472        let proc = Proc::isolated();
5473
5474        let (instance, handle) = proc.client("my_test_actor").unwrap();
5475
5476        let child_actor = TestActor.spawn(&instance).unwrap();
5477
5478        let (port, mut receiver) = instance.open_port();
5479        child_actor.post(&instance, ("hello".to_string(), port.bind()));
5480
5481        let message = receiver.recv().await.unwrap();
5482        assert_eq!(message, "hello");
5483
5484        child_actor.drain_and_stop("test").unwrap();
5485        child_actor.await;
5486
5487        assert_eq!(*handle.status().borrow(), ActorStatus::Client);
5488        drop(instance);
5489        assert_matches!(*handle.status().borrow(), ActorStatus::Stopped(_));
5490        handle.await;
5491    }
5492
    // Tokio's I/O driver is not fork-safe on macOS, and this test intentionally
    // validates process termination by forking without a coordinator.
    //
    // The test fails a root actor on a proc that has no supervision
    // coordinator and expects `assert_termination(.., 1)` to succeed.
    // It returns early, without testing anything, when `CARGO_TEST` is set.
    #[cfg_attr(target_os = "macos", ignore = "tokio runtime fork assertion on macOS")]
    #[tokio::test]
    async fn test_proc_terminate_without_coordinator() {
        if std::env::var("CARGO_TEST").is_ok() {
            eprintln!("test skipped as it hangs when run by cargo in sandcastle");
            return;
        }

        let process = async {
            let proc = Proc::isolated();
            // Intentionally not setting a proc supervision coordinator. This
            // should cause the process to terminate.
            // ProcSupervisionCoordinator::set(&proc).await.unwrap();
            let root = proc.spawn("root", TestActor).unwrap();
            let (client, _handle) = proc.client("client").unwrap();
            root.fail(&client, anyhow::anyhow!("some random failure"))
                .await
                .unwrap();
            // It is okay to sleep a long time here, because we expect this
            // process to be terminated way before the sleep ends due to the
            // missing proc supervision coordinator.
            tokio::time::sleep(Duration::from_secs(30)).await;
        };

        assert_termination(|| process, 1).await.unwrap();
    }
5521
5522    fn trace_and_block(fut: impl Future) {
5523        tracing::subscriber::with_default(
5524            tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
5525            || {
5526                tokio::runtime::Builder::new_current_thread()
5527                    .enable_all()
5528                    .build()
5529                    .unwrap()
5530                    .block_on(fut)
5531            },
5532        );
5533    }
5534
#[ignore = "until trace recording is turned back on"]
#[test]
fn test_handler_logging() {
    // Actor whose handlers emit tracing events and spans, so the test
    // can inspect what lands in the actor's recording.
    #[derive(Debug, Default)]
    struct LoggingActor;

    impl Actor for LoggingActor {}

    impl LoggingActor {
        // Posts a two-party barrier to the actor and waits on it; once
        // this returns, the actor's barrier handler has been entered.
        async fn wait(cx: &impl context::Actor, handle: &ActorHandle<Self>) {
            let rendezvous = Arc::new(Barrier::new(2));
            handle.post(cx, Arc::clone(&rendezvous));
            rendezvous.wait().await;
        }
    }

    // Logs the string as the event message.
    #[async_trait]
    impl Handler<String> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: String,
        ) -> anyhow::Result<()> {
            tracing::info!("{}", message);
            Ok(())
        }
    }

    // Logs the number as a structured `number` field.
    #[async_trait]
    impl Handler<u64> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: u64,
        ) -> anyhow::Result<()> {
            tracing::event!(Level::INFO, number = message);
            Ok(())
        }
    }

    // Waits on the barrier it is handed (see `LoggingActor::wait`).
    #[async_trait]
    impl Handler<Arc<Barrier>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            message: Arc<Barrier>,
        ) -> anyhow::Result<()> {
            message.wait().await;
            Ok(())
        }
    }

    // Enters a `child_span` and parks between two barriers so the test
    // can sample the recording's stacks while the span is entered.
    #[async_trait]
    impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
        #[expect(
            clippy::await_holding_invalid_type,
            reason = "tracing_test::traced_test macro expansion holds tracing::span::Entered across awaits; can't be fixed in our code"
        )]
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            barriers: Arc<(Barrier, Barrier)>,
        ) -> anyhow::Result<()> {
            let inner = tracing::span!(Level::INFO, "child_span");
            let _inner_guard = inner.enter();
            barriers.0.wait().await;
            barriers.1.wait().await;
            Ok(())
        }
    }

    trace_and_block(async {
        let proc = Proc::isolated();
        let (client, _) = proc.client("client").unwrap();
        let handle = LoggingActor.spawn_detached().unwrap();

        // Three loggable messages, then a barrier round-trip so all of
        // them have been handled before the recording is read.
        handle.post(&client, "hello world".to_string());
        handle.post(&client, "hello world again".to_string());
        handle.post(&client, 123u64);
        LoggingActor::wait(&client, &handle).await;

        let events = handle.cell().inner.recording.tail();
        let expected = [
            json!({ "message": "hello world" }),
            json!({ "message": "hello world again" }),
            json!({ "number": 123 }),
        ];
        assert_eq!(events.len(), 3);
        for (idx, want) in expected.into_iter().enumerate() {
            assert_eq!(events[idx].json_value(), want);
        }

        // Sample the stacks while the handler sits between the two
        // barriers, i.e. while `child_span` is entered.
        let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
        handle.post(&client, Arc::clone(&barriers));
        barriers.0.wait().await;
        let stacks = handle.cell().inner.recording.stacks();
        barriers.1.wait().await;

        assert_eq!(stacks.len(), 1);
        assert_eq!(stacks[0].len(), 1);
        assert_eq!(stacks[0][0].name(), "child_span");
    })
}
5638
5639    #[async_timed_test(timeout_secs = 30)]
5640    async fn test_mailbox_closed_with_owner_stopped_reason() {
5641        let proc = Proc::isolated();
5642        let (client, _) = proc.client("client").unwrap();
5643        let actor_handle = proc.spawn("test", TestActor).unwrap();
5644
5645        // Clone the handle before awaiting since await consumes the handle
5646        let handle_for_send = actor_handle.clone();
5647
5648        // Stop the actor gracefully
5649        actor_handle.drain_and_stop("healthy shutdown").unwrap();
5650        actor_handle.await;
5651
5652        // Try to send a message to the stopped actor
5653        let result = handle_for_send
5654            .port::<TestActorMessage>()
5655            .try_send(&client, TestActorMessage::Noop());
5656
5657        assert!(result.is_err(), "send should fail when actor is stopped");
5658        let err = result.unwrap_err();
5659        assert_matches!(
5660            err.kind(),
5661            crate::mailbox::MailboxSenderErrorKind::Mailbox(mailbox_err)
5662                if matches!(
5663                    mailbox_err.kind(),
5664                    crate::mailbox::MailboxErrorKind::OwnerTerminated(ActorStatus::Stopped(reason)) if reason == "healthy shutdown"
5665                )
5666        );
5667    }
5668
5669    #[async_timed_test(timeout_secs = 30)]
5670    async fn test_mailbox_closed_with_owner_failed_reason() {
5671        let proc = Proc::isolated();
5672        let (client, _) = proc.client("client").unwrap();
5673        // Need to set a supervison coordinator for this Proc because there will
5674        // be actor failure(s) in this test which trigger supervision.
5675        let (_reported, _coordinator) = ProcSupervisionCoordinator::set(&proc).await.unwrap();
5676
5677        let actor_handle = proc.spawn("test", TestActor).unwrap();
5678
5679        // Clone the handle before awaiting since await consumes the handle
5680        let handle_for_send = actor_handle.clone();
5681
5682        // Cause the actor to fail
5683        actor_handle.post(
5684            &client,
5685            TestActorMessage::Fail(anyhow::anyhow!("intentional failure")),
5686        );
5687        actor_handle.await;
5688
5689        // Try to send a message to the failed actor
5690        let result = handle_for_send
5691            .port::<TestActorMessage>()
5692            .try_send(&client, TestActorMessage::Noop());
5693
5694        assert!(result.is_err(), "send should fail when actor has failed");
5695        let err = result.unwrap_err();
5696        assert_matches!(
5697            err.kind(),
5698            crate::mailbox::MailboxSenderErrorKind::Mailbox(mailbox_err)
5699                if matches!(
5700                    mailbox_err.kind(),
5701                    crate::mailbox::MailboxErrorKind::OwnerTerminated(ActorStatus::Failed(ActorErrorKind::Generic(msg)))
5702                        if msg.contains("intentional failure")
5703                )
5704        );
5705    }
5706
5707    /// Wait for a terminated snapshot to appear for the given actor.
5708    /// The introspect task runs in a separate tokio task and may not
5709    /// have stored the snapshot by the time `handle.await` returns.
5710    async fn wait_for_terminated_snapshot(
5711        proc: &Proc,
5712        actor_id: &ActorAddr,
5713    ) -> crate::introspect::IntrospectResult {
5714        // Yield to let the introspect task run, then poll. Use a
5715        // combination of yields (for fast paths) and sleeps (to
5716        // avoid busy-spinning if the scheduler is loaded).
5717        for i in 0..1000 {
5718            if let Some(snapshot) = proc.terminated_snapshot(actor_id) {
5719                return snapshot;
5720            }
5721            if i < 50 {
5722                tokio::task::yield_now().await;
5723            } else {
5724                tokio::time::sleep(Duration::from_millis(50)).await;
5725            }
5726        }
5727        panic!("timed out waiting for terminated snapshot for {}", actor_id);
5728    }
5729
5730    // Verifies that when an actor is stopped, the proc eventually
5731    // records a "terminated snapshot" for it (written by the
5732    // introspect task, which runs asynchronously). The test asserts
5733    // the snapshot is absent while the actor is live, then stops the
5734    // actor, waits for the introspect task to observe the terminal
5735    // state, and confirms:
5736    //   - the stored snapshot reports a `stopped:*` actor_status, and
5737    //   - the actor id moves from the live set to the terminated set.
5738    #[async_timed_test(timeout_secs = 60)]
5739    async fn test_terminated_snapshot_stored_on_stop() {
5740        let proc = Proc::isolated();
5741        let (_client, _client_handle) = proc.client("client").unwrap();
5742
5743        let handle = proc.spawn::<TestActor>("actor", TestActor).unwrap();
5744        let actor_id = handle.actor_addr().clone();
5745
5746        // Actor is live — no terminated snapshot yet.
5747        assert!(proc.terminated_snapshot(&actor_id).is_none());
5748        assert!(!proc.all_terminated_actor_ids().contains(&actor_id));
5749
5750        // Stop the actor and wait for it to fully terminate.
5751        handle.drain_and_stop("test").unwrap();
5752        handle.await;
5753
5754        // The introspect task runs in a separate tokio task; wait for
5755        // it to observe the terminal status and store the snapshot.
5756        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
5757        let attrs: hyperactor_config::Attrs =
5758            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
5759        let status = attrs
5760            .get(crate::introspect::STATUS)
5761            .expect("must have status");
5762        assert!(
5763            status.starts_with("stopped"),
5764            "expected stopped status, got: {}",
5765            status
5766        );
5767
5768        // Actor should appear in terminated IDs but not in live IDs.
5769        assert!(proc.all_terminated_actor_ids().contains(&actor_id));
5770        assert!(
5771            !proc.all_actor_ids().contains(&actor_id),
5772            "stopped actor should not appear in live actor IDs"
5773        );
5774    }
5775
5776    // Verifies that an actor failure results in a terminated snapshot
5777    // being stored. The test installs a ProcSupervisionCoordinator
5778    // (required for failure handling), spawns an actor, triggers a
5779    // failure via a message, waits for the actor to terminate, then
5780    // waits for the introspect task to persist the terminal snapshot
5781    // and asserts the snapshot reports a `failed:*` actor_status.
5782    #[async_timed_test(timeout_secs = 60)]
5783    async fn test_terminated_snapshot_stored_on_failure() {
5784        let proc = Proc::isolated();
5785        let (client, _client_handle) = proc.client("client").unwrap();
5786        // Supervision coordinator required for actor failure handling.
5787        ProcSupervisionCoordinator::set(&proc).await.unwrap();
5788
5789        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
5790        let actor_id = handle.actor_addr().clone();
5791
5792        // Trigger a failure.
5793        handle.post(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")));
5794        handle.await;
5795
5796        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
5797        let attrs: hyperactor_config::Attrs =
5798            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
5799        let status = attrs
5800            .get(crate::introspect::STATUS)
5801            .expect("must have status");
5802        assert!(
5803            status.starts_with("failed"),
5804            "expected failed status, got: {}",
5805            status
5806        );
5807    }
5808
5809    // Exercises FI-1/FI-2 (see introspect.rs module-scope comment).
5810    #[async_timed_test(timeout_secs = 30)]
5811    async fn test_supervision_event_stored_on_failure() {
5812        let proc = Proc::isolated();
5813        let (client, _client_handle) = proc.client("client").unwrap();
5814        ProcSupervisionCoordinator::set(&proc).await.unwrap();
5815
5816        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
5817        let actor_id = handle.actor_addr().clone();
5818        let cell = handle.cell().clone();
5819
5820        handle.post(&client, TestActorMessage::Fail(anyhow::anyhow!("boom")));
5821        handle.await;
5822
5823        let event = cell.supervision_event();
5824        assert!(event.is_some(), "failed actor must have supervision_event");
5825        let event = event.unwrap();
5826        assert_eq!(event.actor_id, actor_id);
5827        assert!(event.actor_status.is_failed());
5828        // Originated here, not propagated.
5829        assert_eq!(event.actually_failing_actor().unwrap().actor_id, actor_id);
5830    }
5831
5832    // Exercises FI-2 (see introspect.rs module-scope comment).
5833    #[async_timed_test(timeout_secs = 30)]
5834    async fn test_supervision_event_on_clean_stop() {
5835        let proc = Proc::isolated();
5836        let (_client, _client_handle) = proc.client("client").unwrap();
5837
5838        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
5839        let cell = handle.cell().clone();
5840
5841        handle.drain_and_stop("test").unwrap();
5842        handle.await;
5843
5844        let event = cell
5845            .supervision_event()
5846            .expect("cleanly stopped actor must have a supervision_event");
5847        assert!(
5848            matches!(event.actor_status, ActorStatus::Stopped(_)),
5849            "expected Stopped status, got {:?}",
5850            event.actor_status
5851        );
5852        assert!(!event.is_error());
5853    }
5854
5855    #[async_timed_test(timeout_secs = 30)]
5856    async fn test_supervision_coordinator_receives_clean_stop() {
5857        let proc = Proc::isolated();
5858        let (_client, _client_handle) = proc.client("client").unwrap();
5859        let (mut reported_event, _coordinator_handle) =
5860            ProcSupervisionCoordinator::set(&proc).await.unwrap();
5861
5862        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
5863        let actor_id = handle.actor_addr().clone();
5864
5865        handle.drain_and_stop("test").unwrap();
5866        handle.await;
5867
5868        let event = reported_event.recv().await;
5869        assert_eq!(event.actor_id, actor_id);
5870        assert!(
5871            matches!(event.actor_status, ActorStatus::Stopped(_)),
5872            "expected Stopped status, got {:?}",
5873            event.actor_status
5874        );
5875        assert!(!event.is_error());
5876    }
5877
5878    #[async_timed_test(timeout_secs = 30)]
5879    async fn test_coordinator_shuts_down_last_during_destroy() {
5880        let mut proc = Proc::isolated();
5881        let (_client, _client_handle) = proc.client("client").unwrap();
5882        let (mut reported_event, _coordinator_handle) =
5883            ProcSupervisionCoordinator::set(&proc).await.unwrap();
5884
5885        // Spawn several actors that will all stop during destroy_and_wait.
5886        let mut actor_ids = Vec::new();
5887        for i in 0..3 {
5888            let handle = proc
5889                .spawn::<TestActor>(&format!("actor_{i}"), TestActor)
5890                .unwrap();
5891            actor_ids.push(handle.actor_addr().clone());
5892        }
5893
5894        // destroy_and_wait stops all actors. If the coordinator were stopped
5895        // simultaneously, supervision event delivery would fail and crash
5896        // the process. The fact that this completes without crashing proves
5897        // the coordinator outlived the other actors.
5898        proc.destroy_and_wait(Duration::from_secs(5), "test")
5899            .await
5900            .unwrap();
5901
5902        // Verify the coordinator received stop events from all three actors.
5903        let mut received_ids = Vec::new();
5904        for _ in 0..actor_ids.len() {
5905            let event = reported_event.recv().await;
5906            assert!(
5907                matches!(event.actor_status, ActorStatus::Stopped(_)),
5908                "expected Stopped, got {:?}",
5909                event.actor_status
5910            );
5911            received_ids.push(event.actor_id);
5912        }
5913        received_ids.sort();
5914        actor_ids.sort();
5915        assert_eq!(received_ids, actor_ids);
5916    }
5917
5918    // Exercises FI-4 (see introspect.rs module-scope comment).
5919    #[async_timed_test(timeout_secs = 30)]
5920    async fn test_supervision_event_on_propagated_failure() {
5921        let proc = Proc::isolated();
5922        let (client, _client_handle) = proc.client("client").unwrap();
5923        ProcSupervisionCoordinator::set(&proc).await.unwrap();
5924
5925        let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
5926        let parent_cell = parent.cell().clone();
5927        // Spawn child under parent.
5928        let (tx, rx) = oneshot::channel();
5929        parent.post(&client, TestActorMessage::Spawn(tx));
5930        let child = rx.await.unwrap();
5931        let child_id = child.actor_addr().clone();
5932
5933        // Fail the child — parent doesn't handle supervision, so it
5934        // propagates and terminates too.
5935        child.post(
5936            &client,
5937            TestActorMessage::Fail(anyhow::anyhow!("child boom")),
5938        );
5939        parent.await;
5940
5941        let event = parent_cell.supervision_event();
5942        assert!(
5943            event.is_some(),
5944            "parent must have supervision_event from propagated failure"
5945        );
5946        let event = event.unwrap();
5947        // Root cause is the child, not the parent.
5948        assert_eq!(event.actually_failing_actor().unwrap().actor_id, child_id);
5949    }
5950
5951    // Exercises S11 (see introspect.rs module doc).
5952    //
5953    // A live actor is resolvable. After drain_and_stop + await, the
5954    // actor's status is terminal and resolve_actor_ref must return
5955    // None — even though the introspect task may still hold a strong
5956    // InstanceCell Arc (it drops the Arc only after observing
5957    // terminal status asynchronously). The is_terminal() check in
5958    // resolve_actor_ref closes that race window.
5959    #[async_timed_test(timeout_secs = 30)]
5960    async fn test_resolve_actor_ref_none_for_terminal_actor() {
5961        let proc = Proc::isolated();
5962        let (_client, _client_handle) = proc.client("client").unwrap();
5963
5964        let handle = proc.spawn::<TestActor>("target", TestActor).unwrap();
5965        let actor_ref: ActorRef<TestActor> = handle.bind();
5966
5967        // Actor is live — resolve should succeed.
5968        assert!(
5969            proc.resolve_actor_ref(&actor_ref).is_some(),
5970            "live actor should be resolvable"
5971        );
5972
5973        handle.drain_and_stop("test").unwrap();
5974        handle.await;
5975
5976        // Actor is terminal — resolve must return None regardless of
5977        // whether the introspect task has dropped its Arc yet.
5978        assert!(
5979            proc.resolve_actor_ref(&actor_ref).is_none(),
5980            "terminal actor must not be resolvable"
5981        );
5982    }
5983
5984    // Exercises FI-3 (see introspect module doc).
5985    #[async_timed_test(timeout_secs = 60)]
5986    async fn test_terminated_snapshot_has_failure_info() {
5987        let proc = Proc::isolated();
5988        let (client, _client_handle) = proc.client("client").unwrap();
5989        ProcSupervisionCoordinator::set(&proc).await.unwrap();
5990
5991        let handle = proc.spawn::<TestActor>("fail_actor", TestActor).unwrap();
5992        let actor_id = handle.actor_addr().clone();
5993
5994        handle.post(&client, TestActorMessage::Fail(anyhow::anyhow!("kaboom")));
5995        handle.await;
5996
5997        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
5998        let attrs: hyperactor_config::Attrs =
5999            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
6000        let status = attrs
6001            .get(crate::introspect::STATUS)
6002            .expect("must have status");
6003        assert!(
6004            status.starts_with("failed"),
6005            "expected failed status, got: {}",
6006            status
6007        );
6008        let err_msg = attrs
6009            .get(crate::introspect::FAILURE_ERROR_MESSAGE)
6010            .expect("failed actor must have failure_error_message");
6011        assert!(!err_msg.is_empty());
6012        let root_cause = attrs
6013            .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
6014            .expect("must have root_cause_actor");
6015        assert_eq!(root_cause, &actor_id);
6016        assert_eq!(
6017            attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
6018            Some(&false)
6019        );
6020        assert!(
6021            attrs.get(crate::introspect::FAILURE_OCCURRED_AT).is_some(),
6022            "failed actor must have occurred_at"
6023        );
6024    }
6025
6026    // Exercises FI-4 (see introspect module doc).
6027    #[async_timed_test(timeout_secs = 60)]
6028    async fn test_propagated_failure_info() {
6029        let proc = Proc::isolated();
6030        let (client, _client_handle) = proc.client("client").unwrap();
6031        ProcSupervisionCoordinator::set(&proc).await.unwrap();
6032
6033        let parent = proc.spawn::<TestActor>("parent", TestActor).unwrap();
6034        let parent_id = parent.actor_addr().clone();
6035
6036        let (tx, rx) = oneshot::channel();
6037        parent.post(&client, TestActorMessage::Spawn(tx));
6038        let child = rx.await.unwrap();
6039        let child_id = child.actor_addr().clone();
6040
6041        child.post(
6042            &client,
6043            TestActorMessage::Fail(anyhow::anyhow!("child fail")),
6044        );
6045        parent.await;
6046
6047        let snapshot = wait_for_terminated_snapshot(&proc, &parent_id).await;
6048        let attrs: hyperactor_config::Attrs =
6049            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
6050        let root_cause = attrs
6051            .get(crate::introspect::FAILURE_ROOT_CAUSE_ACTOR)
6052            .expect("propagated failure must have root_cause_actor");
6053        assert_eq!(root_cause, &child_id);
6054        assert_eq!(
6055            attrs.get(crate::introspect::FAILURE_IS_PROPAGATED),
6056            Some(&true)
6057        );
6058    }
6059
6060    /// Exercises AI-1 (see module doc).
6061    #[async_timed_test(timeout_secs = 30)]
6062    async fn test_spawn_with_name_creates_descriptive_name() {
6063        let proc = Proc::isolated();
6064        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6065        let handle = proc
6066            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
6067            .unwrap();
6068        assert_eq!(
6069            handle.actor_addr().label().unwrap().as_str(),
6070            "my_controller"
6071        );
6072        assert!(!handle.actor_addr().is_root());
6073    }
6074
6075    /// Exercises AI-1 (see module doc).
6076    #[async_timed_test(timeout_secs = 30)]
6077    async fn test_spawn_with_name_increments_index() {
6078        let proc = Proc::isolated();
6079        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6080        let first = proc
6081            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
6082            .unwrap();
6083        let second = proc
6084            .spawn_named_child(root.cell().clone(), "my_controller", TestActor)
6085            .unwrap();
6086        assert_ne!(first.actor_addr().uid(), second.actor_addr().uid());
6087    }
6088
6089    /// Exercises AI-1 (see module doc).
6090    /// spawn_named_child passes Some(parent) to spawn_inner.
6091    #[async_timed_test(timeout_secs = 30)]
6092    async fn test_spawn_with_name_preserves_supervision() {
6093        let proc = Proc::isolated();
6094        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6095        let child = proc
6096            .spawn_named_child(root.cell().clone(), "supervised_child", TestActor)
6097            .unwrap();
6098        let child_cell = child.cell();
6099        let parent = child_cell.parent().expect("named child must have a parent");
6100        assert_eq!(parent.actor_addr(), root.actor_addr());
6101    }
6102
6103    /// Exercises AI-1 (see module doc).
6104    #[async_timed_test(timeout_secs = 30)]
6105    async fn test_spawn_unchanged() {
6106        let proc = Proc::isolated();
6107        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6108        let child = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
6109        assert!(!child.actor_addr().is_root());
6110    }
6111
6112    /// Exercises AI-1 (see module doc).
6113    #[async_timed_test(timeout_secs = 30)]
6114    async fn test_spawn_with_name_different_names_different_pids() {
6115        let proc = Proc::isolated();
6116        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6117        let a = proc
6118            .spawn_named_child(root.cell().clone(), "controller_a", TestActor)
6119            .unwrap();
6120        let b = proc
6121            .spawn_named_child(root.cell().clone(), "controller_b", TestActor)
6122            .unwrap();
6123        assert_ne!(a.actor_addr().uid(), b.actor_addr().uid());
6124        assert_eq!(a.actor_addr().label().unwrap().as_str(), "controller_a");
6125        assert_eq!(b.actor_addr().label().unwrap().as_str(), "controller_b");
6126    }
6127
6128    /// Exercises AI-1 (see module doc).
6129    #[async_timed_test(timeout_secs = 30)]
6130    async fn test_spawn_with_name_no_child_overwrite() {
6131        let proc = Proc::isolated();
6132        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6133        let _a = proc
6134            .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
6135            .unwrap();
6136        let _b = proc
6137            .spawn_named_child(root.cell().clone(), "ctrl", TestActor)
6138            .unwrap();
6139        let _c = proc.spawn_child(root.cell().clone(), TestActor).unwrap();
6140        assert_eq!(root.cell().child_count(), 3);
6141    }
6142
6143    /// Exercises AI-1 (see module doc).
6144    #[async_timed_test(timeout_secs = 30)]
6145    async fn test_spawn_with_name_does_not_pollute_roots() {
6146        let proc = Proc::isolated();
6147        let root = proc.spawn::<TestActor>("root", TestActor).unwrap();
6148        let _child = proc
6149            .spawn_named_child(root.cell().clone(), "foo", TestActor)
6150            .unwrap();
6151        // "foo" was used as a named child name but should NOT
6152        // prevent spawning a root actor with that name.
6153        let result = proc.spawn::<TestActor>("foo", TestActor);
6154        assert!(result.is_ok(), "named child should not pollute roots");
6155    }
6156
6157    /// Exercises AI-3 (see module doc).
6158    #[async_timed_test(timeout_secs = 30)]
6159    async fn test_ai3_controller_actor_ids_unique_across_parents_same_proc() {
6160        let proc = Proc::isolated();
6161        let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
6162        let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
6163
6164        // Simulate the correct pattern: include mesh identity in name.
6165        let ctrl_a = proc
6166            .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
6167            .unwrap();
6168        let ctrl_b = proc
6169            .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
6170            .unwrap();
6171
6172        assert_ne!(
6173            ctrl_a.actor_addr(),
6174            ctrl_b.actor_addr(),
6175            "controller ActorAddrs must be unique across parents"
6176        );
6177    }
6178
6179    /// Exercises AI-3 (see module doc).
6180    #[async_timed_test(timeout_secs = 30)]
6181    async fn test_ai3_no_controller_overwrite_in_parent_or_proc_maps() {
6182        let proc = Proc::isolated();
6183        let parent_a = proc.spawn::<TestActor>("parent_a", TestActor).unwrap();
6184        let parent_b = proc.spawn::<TestActor>("parent_b", TestActor).unwrap();
6185
6186        let ctrl_a = proc
6187            .spawn_named_child(parent_a.cell().clone(), "controller_mesh_a", TestActor)
6188            .unwrap();
6189        let ctrl_b = proc
6190            .spawn_named_child(parent_b.cell().clone(), "controller_mesh_b", TestActor)
6191            .unwrap();
6192
6193        // Both must be independently resolvable via the proc's instances.
6194        assert!(
6195            proc.get_instance(ctrl_a.actor_addr()).is_some(),
6196            "ctrl_a must be resolvable"
6197        );
6198        assert!(
6199            proc.get_instance(ctrl_b.actor_addr()).is_some(),
6200            "ctrl_b must be resolvable"
6201        );
6202        // Parents each see exactly one child.
6203        assert_eq!(parent_a.cell().child_count(), 1);
6204        assert_eq!(parent_b.cell().child_count(), 1);
6205    }
6206
6207    // Exercises FI-6 (see introspect module doc).
6208    #[async_timed_test(timeout_secs = 60)]
6209    async fn test_stopped_snapshot_has_no_failure_info() {
6210        let proc = Proc::isolated();
6211        let (_client, _client_handle) = proc.client("client").unwrap();
6212
6213        let handle = proc.spawn::<TestActor>("stop_actor", TestActor).unwrap();
6214        let actor_id = handle.actor_addr().clone();
6215
6216        handle.drain_and_stop("test").unwrap();
6217        handle.await;
6218
6219        let snapshot = wait_for_terminated_snapshot(&proc, &actor_id).await;
6220        let attrs: hyperactor_config::Attrs =
6221            serde_json::from_str(&snapshot.attrs).expect("attrs must be valid JSON");
6222        let status = attrs
6223            .get(crate::introspect::STATUS)
6224            .expect("must have status");
6225        assert!(
6226            status.starts_with("stopped"),
6227            "expected stopped, got: {}",
6228            status
6229        );
6230        assert!(
6231            attrs
6232                .get(crate::introspect::FAILURE_ERROR_MESSAGE)
6233                .is_none(),
6234            "stopped actor must not have failure attrs"
6235        );
6236    }
6237
6238    // ── PD-5: queue depth accounting ────────────────────────────
6239
6240    // PD-5b/PD-5c: queue depth increments on enqueue, decrements on
6241    // dequeue, and returns to zero after the message is handled. This
6242    // tests that the introspection-readable queue_depth is aligned
6243    // with the existing OTel ACTOR_MESSAGE_QUEUE_SIZE accounting.
6244    #[async_timed_test(timeout_secs = 10)]
6245    async fn test_queue_depth_increment_decrement() {
6246        let proc = Proc::isolated();
6247        let (client, _) = proc.client("client").unwrap();
6248        let handle = proc.spawn("qd_test", TestActor).unwrap();
6249        let actor_ref: crate::ActorRef<TestActor> = handle.bind();
6250        let actor_id = actor_ref.actor_addr().clone();
6251
6252        // Before any message: queue depth should be 0.
6253        let cell = proc.get_instance(&actor_id).expect("actor exists");
6254        assert_eq!(cell.queue_depth(), 0, "initial queue depth should be 0");
6255
6256        // Send a message that blocks until we signal it. This lets
6257        // us observe queue depth > 0 while the actor is busy.
6258        let (reply_tx, reply_rx) = oneshot::channel();
6259        let (gate_tx, gate_rx) = oneshot::channel::<()>();
6260        handle.wait(&client, reply_tx, gate_rx).await.unwrap();
6261
6262        // Wait for the actor to start processing (it sends reply_tx).
6263        reply_rx.await.unwrap();
6264
6265        // Now send a second message — it should be queued.
6266        let (reply2_tx, reply2_rx) = oneshot::channel();
6267        handle.reply(&client, reply2_tx).await.unwrap();
6268
6269        // Give the enqueue a moment to propagate.
6270        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
6271
6272        // Queue depth should be >= 1 (the Reply message is queued).
6273        let depth = cell.queue_depth();
6274        assert!(
6275            depth >= 1,
6276            "expected queue depth >= 1 while actor is busy, got {depth}"
6277        );
6278
6279        // Unblock the first message.
6280        let _ = gate_tx.send(());
6281
6282        // Wait for the second message to be handled.
6283        reply2_rx.await.unwrap();
6284
6285        // Give the dequeue a moment to propagate.
6286        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
6287
6288        // Queue depth should return to 0.
6289        let depth = cell.queue_depth();
6290        assert_eq!(
6291            depth, 0,
6292            "queue depth should return to 0 after all messages handled"
6293        );
6294    }
6295
6296    // PD-4/PD-5: proc-level queue pressure aggregation reports
6297    // non-zero under induced load. Queue depth is an instantaneous
6298    // snapshot of currently queued work, not backlog history.
6299    #[async_timed_test(timeout_secs = 10)]
6300    async fn test_proc_queue_depth_aggregation_under_pressure() {
6301        let proc = Proc::isolated();
6302        let (client, _) = proc.client("client").unwrap();
6303
6304        // Spawn two actors.
6305        let h1 = proc.spawn("a1", TestActor).unwrap();
6306        let h2 = proc.spawn("a2", TestActor).unwrap();
6307
6308        // Block both actors with a Wait message.
6309        let (reply1, rx1) = oneshot::channel();
6310        let (gate1, grx1) = oneshot::channel::<()>();
6311        h1.wait(&client, reply1, grx1).await.unwrap();
6312        rx1.await.unwrap();
6313
6314        let (reply2, rx2) = oneshot::channel();
6315        let (gate2, grx2) = oneshot::channel::<()>();
6316        h2.wait(&client, reply2, grx2).await.unwrap();
6317        rx2.await.unwrap();
6318
6319        // Queue additional messages while actors are blocked.
6320        h1.noop(&client).await.unwrap();
6321        h1.noop(&client).await.unwrap();
6322        h2.noop(&client).await.unwrap();
6323
6324        // Poll until aggregated queue depth reaches the expected
6325        // level, with a bounded timeout to avoid flakes.
6326        let aggregate = || -> (u64, u64) {
6327            let mut total: u64 = 0;
6328            let mut max: u64 = 0;
6329            for actor_id in proc.all_instance_keys() {
6330                if let Some(cell) = proc.get_instance_by_id(&actor_id) {
6331                    let depth = cell.queue_depth();
6332                    total = total.saturating_add(depth);
6333                    max = max.max(depth);
6334                }
6335            }
6336            (total, max)
6337        };
6338
6339        // Same aggregation logic used by
6340        // ProcAgent::publish_introspect_properties.
6341        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
6342        loop {
6343            let (total, max) = aggregate();
6344            if total >= 3 {
6345                assert!(max >= 1, "expected max >= 1, got {max}");
6346                assert!(max <= total, "PD-1: max ({max}) <= total ({total})");
6347                break;
6348            }
6349            assert!(
6350                tokio::time::Instant::now() < deadline,
6351                "timed out waiting for queue depth >= 3, got {total}",
6352            );
6353            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6354        }
6355
6356        // Unblock both actors.
6357        let _ = gate1.send(());
6358        let _ = gate2.send(());
6359
6360        // Poll until aggregated depth returns to 0.
6361        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
6362        loop {
6363            let (total, _) = aggregate();
6364            if total == 0 {
6365                break;
6366            }
6367            assert!(
6368                tokio::time::Instant::now() < deadline,
6369                "timed out waiting for queue depth to return to 0, got {total}",
6370            );
6371            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6372        }
6373    }
6374
6375    // ── PD-6 through PD-9: retained queue-pressure evidence ───
6376
6377    // PD-7: cold start — no queue traffic means last-nonzero is None
6378    // and watermark is 0.
6379    #[async_timed_test(timeout_secs = 5)]
6380    async fn test_retained_queue_stats_cold_start() {
6381        let proc = Proc::isolated();
6382        assert_eq!(proc.queue_depth_total(), 0);
6383        assert_eq!(proc.queue_depth_high_water_mark(), 0);
6384        assert_eq!(proc.last_nonzero_queue_depth_age_ms(), None);
6385    }
6386
6387    // PD-6/PD-8: after induced pressure drains, high-water mark
6388    // retains the peak and last-nonzero is Some.
6389    #[async_timed_test(timeout_secs = 10)]
6390    async fn test_retained_queue_stats_burst_then_drain() {
6391        let proc = Proc::isolated();
6392        let (client, _) = proc.client("client").unwrap();
6393        let h = proc.spawn("ret_test", TestActor).unwrap();
6394
6395        // Block the actor.
6396        let (ready_tx, ready_rx) = oneshot::channel();
6397        let (gate_tx, gate_rx) = oneshot::channel::<()>();
6398        h.wait(&client, ready_tx, gate_rx).await.unwrap();
6399        ready_rx.await.unwrap();
6400
6401        // Queue work behind it.
6402        h.noop(&client).await.unwrap();
6403        h.noop(&client).await.unwrap();
6404
6405        // Poll until watermark is updated.
6406        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
6407        loop {
6408            let hwm = proc.queue_depth_high_water_mark();
6409            if hwm >= 2 {
6410                // PD-6: watermark >= current total.
6411                assert!(hwm >= proc.queue_depth_total());
6412                // Active pressure: last-nonzero should be near zero.
6413                let age = proc.last_nonzero_queue_depth_age_ms();
6414                assert!(
6415                    age.is_some(),
6416                    "last-nonzero should be Some while pressure is active"
6417                );
6418                assert!(age.unwrap() < 2000, "last-nonzero age should be near zero");
6419                break;
6420            }
6421            assert!(
6422                tokio::time::Instant::now() < deadline,
6423                "timed out waiting for watermark >= 2",
6424            );
6425            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6426        }
6427
6428        // Unblock and drain.
6429        let _ = gate_tx.send(());
6430        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
6431        loop {
6432            if proc.queue_depth_total() == 0 {
6433                break;
6434            }
6435            assert!(
6436                tokio::time::Instant::now() < deadline,
6437                "timed out waiting for total to drain",
6438            );
6439            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6440        }
6441
6442        // PD-8: watermark retained after drain.
6443        assert!(
6444            proc.queue_depth_high_water_mark() >= 2,
6445            "watermark should retain the peak after drain",
6446        );
6447
6448        // PD-7: last-nonzero is Some (not None) after pressure.
6449        let age = proc.last_nonzero_queue_depth_age_ms();
6450        assert!(age.is_some(), "last-nonzero should be Some after pressure");
6451    }
6452
6453    // PD-7: deterministic test of dequeue-side timestamp refresh
6454    // using a fake clock. Proves "last observed non-zero" semantics
6455    // without timing-dependent sleeps.
6456    #[test]
6457    fn test_last_nonzero_refreshed_on_dequeue_deterministic() {
6458        use std::sync::atomic::AtomicU64;
6459
6460        static FAKE_NOW: AtomicU64 = AtomicU64::new(0);
6461        fn fake_clock() -> u64 {
6462            FAKE_NOW.load(Ordering::Relaxed)
6463        }
6464
6465        let stats = ProcQueueStats::with_clock(fake_clock);
6466        let depth = Arc::new(AtomicU64::new(0));
6467
6468        // Cold start: no activity.
6469        assert_eq!(stats.last_nonzero_age_ms(), None);
6470
6471        // t=1000: enqueue two items.
6472        FAKE_NOW.store(1000, Ordering::Relaxed);
6473        account_enqueue(&depth, &stats, "a");
6474        account_enqueue(&depth, &stats, "a");
6475        assert_eq!(stats.running_total(), 2);
6476        assert_eq!(stats.high_water_mark(), 2);
6477
6478        // t=2000: read age — should be 1000ms since last nonzero.
6479        FAKE_NOW.store(2000, Ordering::Relaxed);
6480        assert_eq!(stats.last_nonzero_age_ms(), Some(1000));
6481
6482        // t=3000: dequeue one item. Queue still non-zero (1 left).
6483        // This should refresh the timestamp to 3000.
6484        FAKE_NOW.store(3000, Ordering::Relaxed);
6485        account_dequeue(&depth, &stats, "a");
6486        assert_eq!(stats.running_total(), 1);
6487
6488        // t=4000: read age — should be 1000ms (4000 - 3000), not
6489        // 3000ms (4000 - 1000). This proves the dequeue refreshed
6490        // the timestamp.
6491        FAKE_NOW.store(4000, Ordering::Relaxed);
6492        assert_eq!(stats.last_nonzero_age_ms(), Some(1000));
6493
6494        // t=5000: dequeue last item. Queue is now zero.
6495        // prev_total was 1, so prev_total > 1 is false — timestamp
6496        // is NOT refreshed. It stays at 3000.
6497        FAKE_NOW.store(5000, Ordering::Relaxed);
6498        account_dequeue(&depth, &stats, "a");
6499        assert_eq!(stats.running_total(), 0);
6500
6501        // t=6000: age should be 3000ms (6000 - 3000).
6502        FAKE_NOW.store(6000, Ordering::Relaxed);
6503        assert_eq!(stats.last_nonzero_age_ms(), Some(3000));
6504
6505        // Watermark retained.
6506        assert_eq!(stats.high_water_mark(), 2);
6507    }
6508
6509    // account_cancel_enqueue must symmetrically reverse
6510    // account_enqueue on queue_depth and running_total so that a
6511    // send failure after accounting cannot leave the proc-wide
6512    // counter at u64::MAX (which would panic the next enqueue via
6513    // the `fetch_add(1) + 1` path).
6514    #[test]
6515    fn test_account_cancel_enqueue_restores_counters() {
6516        let stats = ProcQueueStats::new();
6517        let depth = Arc::new(AtomicU64::new(0));
6518
6519        account_enqueue(&depth, &stats, "a");
6520        assert_eq!(stats.running_total(), 1);
6521        assert_eq!(depth.load(Ordering::Relaxed), 1);
6522
6523        account_cancel_enqueue(&depth, &stats, "a");
6524        assert_eq!(
6525            stats.running_total(),
6526            0,
6527            "cancel must restore running_total"
6528        );
6529        assert_eq!(
6530            depth.load(Ordering::Relaxed),
6531            0,
6532            "cancel must restore queue_depth"
6533        );
6534
6535        // high_water_mark is monotonic by design; cancel does not reset it.
6536        assert_eq!(stats.high_water_mark(), 1);
6537
6538        // A subsequent enqueue must not observe underflow: fetch_add(1) + 1
6539        // would panic in debug builds if running_total had wrapped to u64::MAX.
6540        account_enqueue(&depth, &stats, "a");
6541        assert_eq!(stats.running_total(), 1);
6542    }
6543}