/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! The mesh agent actor manages procs in ProcMeshes.

// EnumAsInner generates code that triggers a false positive
// unused_assignments lint on struct variant fields. #[allow] on the
// enum itself doesn't propagate into derive-macro-generated code, so
// the suppression must be at module scope.
#![allow(unused_assignments)]
17use std::collections::HashMap;
18use std::mem::take;
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::sync::RwLock;
22use std::sync::RwLockReadGuard;
23use std::time::Duration;
24
25use async_trait::async_trait;
26use enum_as_inner::EnumAsInner;
27use hyperactor::Actor;
28use hyperactor::ActorHandle;
29use hyperactor::Bind;
30use hyperactor::Context;
31use hyperactor::Data;
32use hyperactor::HandleClient;
33use hyperactor::Handler;
34use hyperactor::Instance;
35use hyperactor::PortHandle;
36use hyperactor::RefClient;
37use hyperactor::Unbind;
38use hyperactor::actor::handle_undeliverable_message;
39use hyperactor::actor::remote::Remote;
40use hyperactor::channel;
41use hyperactor::channel::ChannelAddr;
42use hyperactor::mailbox::BoxedMailboxSender;
43use hyperactor::mailbox::DialMailboxRouter;
44use hyperactor::mailbox::IntoBoxedMailboxSender;
45use hyperactor::mailbox::MailboxClient;
46use hyperactor::mailbox::MailboxSender;
47use hyperactor::mailbox::MessageEnvelope;
48use hyperactor::mailbox::Undeliverable;
49use hyperactor::proc::Proc;
50use hyperactor::reference as hyperactor_reference;
51use hyperactor::supervision::ActorSupervisionEvent;
52use hyperactor_config::CONFIG;
53use hyperactor_config::ConfigAttr;
54use hyperactor_config::Flattrs;
55use hyperactor_config::attrs::declare_attrs;
56use serde::Deserialize;
57use serde::Serialize;
58use typeuri::Named;
59
60use crate::Name;
61use crate::config_dump::ConfigDump;
62use crate::config_dump::ConfigDumpResult;
63use crate::pyspy::PySpyDump;
64use crate::pyspy::PySpyWorker;
65use crate::resource;
66
/// Actor name used when spawning the proc agent on user procs.
/// `boot_v1` spawns the agent under this name; the v0 `bootstrap`
/// path uses the name "mesh" instead.
pub const PROC_AGENT_ACTOR_NAME: &str = "proc_agent";
69
declare_attrs! {
    /// Whether to self kill actors, procs, and hosts whose owner is not reachable.
    /// A zero duration disables the orphan check entirely (see `boot_v1`,
    /// which maps zero to `None`).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ORPHAN_TIMEOUT".to_string()),
        Some("mesh_orphan_timeout".to_string()),
    ))
    pub attr MESH_ORPHAN_TIMEOUT: Duration = Duration::from_secs(60);

    /// Header tag for StreamState subscriber messages. When present on an
    /// undeliverable envelope, ProcAgent removes the dead subscriber instead
    /// of treating it as an error.
    attr STREAM_STATE_SUBSCRIBER: bool;
}
83
/// Result of a v0 `Gspawn` request, reported back on the caller's
/// status port.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
pub enum GspawnResult {
    /// The actor was spawned: carries this proc's rank and the new
    /// actor's id.
    Success {
        rank: usize,
        actor_id: hyperactor_reference::ActorId,
    },
    /// Spawning failed: carries a human-readable error message.
    Error(String),
}
wirevalue::register_type!(GspawnResult);
93
/// Deferred republish of introspect properties.
///
/// Sent as a zero-delay self-message from the supervision event
/// handler so it returns immediately without blocking the ProcAgent
/// message loop. Multiple rapid supervision events (e.g., 4 actors
/// failing simultaneously via broadcast) coalesce into a single
/// republish via the `introspect_dirty` flag.
///
/// Without this, calling `publish_introspect_properties` inline in
/// the supervision handler starves `GetRankStatus` polls from the
/// `ActorMeshController`, preventing `__supervise__` from firing
/// within the test timeout. See D94960791 for the root cause
/// analysis.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
struct RepublishIntrospect;
// Registered so the message can be carried over the wire; exported
// with `cast = true` in the `#[hyperactor::export]` list on ProcAgent.
wirevalue::register_type!(RepublishIntrospect);
110
111/// Collect live actor children and system actor children from the
112/// proc's instance DashMap using `all_instance_keys()` with point
113/// lookups. This avoids the convoy starvation from `all_actor_ids()`
114/// which holds shard read locks while doing heavy per-entry work.
115/// See S12 in `introspect` module doc.
116fn collect_live_children(
117    proc: &hyperactor::Proc,
118) -> (
119    Vec<hyperactor::introspect::IntrospectRef>,
120    Vec<crate::introspect::NodeRef>,
121) {
122    let all_keys = proc.all_instance_keys();
123    let mut children = Vec::with_capacity(all_keys.len());
124    let mut system_children = Vec::new();
125    for id in all_keys {
126        if let Some(cell) = proc.get_instance(&id) {
127            if cell.is_system() {
128                system_children.push(crate::introspect::NodeRef::Actor(id.clone()));
129            }
130            children.push(hyperactor::introspect::IntrospectRef::Actor(id));
131        }
132    }
133    (children, system_children)
134}
135
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub(crate) enum MeshAgentMessage {
    /// Configure the proc in the mesh.
    Configure {
        /// The rank of this proc in the mesh.
        rank: usize,
        /// The forwarder to send messages to unknown destinations.
        forwarder: ChannelAddr,
        /// The supervisor port to which the agent should report supervision events.
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
        /// An address book to use for direct dialing.
        address_book: HashMap<hyperactor_reference::ProcId, ChannelAddr>,
        /// The agent should write its rank to this port when it successfully
        /// configured.
        configured: hyperactor_reference::PortRef<usize>,
        /// If true, and supervisor is None, record supervision events to be reported
        /// to owning actors later.
        record_supervision_events: bool,
    },

    /// Query the proc's rank and liveness; answered as `(rank, true)`
    /// once the proc is v0-configured.
    Status {
        /// The status of the proc.
        /// To be replaced with fine-grained lifecycle status,
        /// and to use aggregation.
        status: hyperactor_reference::PortRef<(usize, bool)>,
    },

    /// Spawn an actor on the proc to the provided name.
    Gspawn {
        /// registered actor type
        actor_type: String,
        /// spawned actor name
        actor_name: String,
        /// serialized parameters
        params_data: Data,
        /// reply port; the proc should send its rank to indicate a spawned actor
        status_port: hyperactor_reference::PortRef<GspawnResult>,
    },
}
184
/// Internal configuration state of the mesh agent.
#[derive(Debug, EnumAsInner, Default)]
enum State {
    /// v0: booted but not yet configured; holds the reconfigurable
    /// sender that will be wired to a router in `configure`.
    UnconfiguredV0 {
        sender: ReconfigurableMailboxSender,
    },

    /// v0: configured with a concrete rank and an optional supervision
    /// event port.
    ConfiguredV0 {
        sender: ReconfigurableMailboxSender,
        rank: usize,
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
    },

    /// v1: the proc is configured externally (see `boot_v1`); the agent
    /// keeps no sender of its own.
    V1,

    /// Transient placeholder left behind by `mem::take` while moving
    /// the state out (see `configure`); never a valid resting state.
    #[default]
    Invalid,
}
203
204impl State {
205    fn rank(&self) -> Option<usize> {
206        match self {
207            State::ConfiguredV0 { rank, .. } => Some(*rank),
208            _ => None,
209        }
210    }
211
212    fn supervisor(&self) -> Option<hyperactor_reference::PortRef<ActorSupervisionEvent>> {
213        match self {
214            State::ConfiguredV0 { supervisor, .. } => supervisor.clone(),
215            _ => None,
216        }
217    }
218}
219
/// Actor state used for v1 API.
#[derive(Debug)]
struct ActorInstanceState {
    /// The rank of this proc recorded when the actor was created; echoed
    /// back in `ActorState` and status overlays.
    create_rank: usize,
    /// Outcome of the spawn attempt: the actor's id on success, or the
    /// error that prevented it from starting.
    spawn: Result<hyperactor_reference::ActorId, anyhow::Error>,
    /// True once a stop signal has been sent. This does *not* mean the actor
    /// has reached a terminal state — that is determined by observing
    /// supervision events.
    stop_initiated: bool,
    /// The supervision event observed for this actor, if it has reached
    /// terminal state.
    supervision_event: Option<ActorSupervisionEvent>,
    /// Streaming subscribers that receive `State<ActorState>` on every
    /// state change. Dead subscribers are removed via undeliverable handling.
    subscribers: Vec<hyperactor_reference::PortRef<resource::State<ActorState>>>,
    /// The time at which the actor should be considered expired if no further
    /// keepalive is received. `None` meaning it will never expire.
    expiry_time: Option<std::time::SystemTime>,
    /// Monotonic generation counter, incremented on every state-mutating
    /// operation (spawn, stop, supervision event). Used for last-writer-wins
    /// ordering in the mesh controller.
    generation: u64,
    /// Pending `WaitRankStatus` callers: each entry is the minimum
    /// status threshold and the reply port to send once the threshold
    /// is met.
    pending_wait_status: Vec<(
        resource::Status,
        hyperactor_reference::PortRef<crate::StatusOverlay>,
    )>,
}
250
251impl ActorInstanceState {
252    /// Derive the resource status from spawn result, stop initiation,
253    /// and the observed supervision event.
254    fn status(&self) -> resource::Status {
255        match &self.spawn {
256            Err(e) => resource::Status::Failed(e.to_string()),
257            Ok(_) => match &self.supervision_event {
258                Some(event) if event.is_error() => resource::Status::Failed(format!("{}", event)),
259                Some(_) => resource::Status::Stopped,
260                None if self.stop_initiated => resource::Status::Stopping,
261                None => resource::Status::Running,
262            },
263        }
264    }
265
266    /// True if the actor has reached a terminal state (stopped or failed),
267    /// or if it never successfully spawned.
268    fn is_terminal(&self) -> bool {
269        match &self.spawn {
270            Err(_) => true,
271            Ok(_) => self.supervision_event.is_some(),
272        }
273    }
274
275    /// True if the supervision event is an error.
276    fn has_errors(&self) -> bool {
277        self.supervision_event
278            .as_ref()
279            .is_some_and(|e| e.is_error())
280    }
281
282    /// Build the `State<ActorState>` for this instance, suitable for
283    /// replies and subscriber notifications.
284    fn to_state(&self, name: &Name) -> resource::State<ActorState> {
285        let status = self.status();
286        let actor_state = self.spawn.as_ref().ok().map(|actor_id| ActorState {
287            actor_id: actor_id.clone(),
288            create_rank: self.create_rank,
289            supervision_events: self.supervision_event.clone().into_iter().collect(),
290        });
291        resource::State {
292            name: name.clone(),
293            status,
294            state: actor_state,
295            generation: self.generation,
296            timestamp: std::time::SystemTime::now(),
297        }
298    }
299
300    /// Notify all observers that this actor's status has changed:
301    /// streaming subscribers get the full state, and one-shot
302    /// `WaitRankStatus` waiters whose threshold is now met get replied
303    /// to and removed.
304    fn notify_status_changed(&mut self, cx: &impl hyperactor::context::Actor, name: &Name) {
305        // Streaming subscribers (persistent).
306        let state = self.to_state(name);
307        for subscriber in &self.subscribers {
308            let mut headers = Flattrs::new();
309            headers.set(STREAM_STATE_SUBSCRIBER, true);
310            if let Err(e) = subscriber.send_with_headers(cx, headers, state.clone()) {
311                tracing::warn!(
312                    "failed to send state update to subscriber {}: {}",
313                    subscriber.port_id(),
314                    e,
315                );
316            }
317        }
318
319        // One-shot waiters (predicated).
320        let status = self.status();
321        self.pending_wait_status.retain(|(min_status, reply)| {
322            if status >= *min_status {
323                let rank = self.create_rank;
324                let overlay =
325                    crate::StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status.clone())])
326                        .expect("valid single-run overlay");
327                let _ = reply.send(cx, overlay);
328                false
329            } else {
330                true
331            }
332        });
333    }
334}
335
/// Zero-payload self-message used to drive the orphan check: scheduled
/// in `init` with the `mesh_orphan_timeout` delay when that timeout is
/// configured. Presumably the handler (not visible in this chunk)
/// re-arms it — confirm against the SelfCheck handler.
#[derive(
    Clone,
    Debug,
    Default,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    Bind,
    Unbind
)]
struct SelfCheck {}
348
/// A mesh agent is responsible for managing procs in a [`ProcMesh`].
///
/// ## Supervision event ingestion (remote)
///
/// `ProcAgent` is the *process/rank-local* sink for
/// `ActorSupervisionEvent`s produced by the runtime (actor failures,
/// routing failures, undeliverables, etc.).
///
/// We **export** `ActorSupervisionEvent` as a handler so that other
/// procs—most importantly the process-global root client created by
/// `context()`—can forward undeliverables as supervision
/// events to the *currently active* mesh.
///
/// Without exporting this handler, `ActorSupervisionEvent` cannot be
/// addressed via `ActorRef`/`PortRef` across processes, and the
/// global-root-client undeliverable → supervision pipeline would
/// degrade to log-only behavior (events become undeliverable again or
/// are dropped).
///
/// See GC-1 in `global_context` module doc.
#[hyperactor::export(
    handlers=[
        MeshAgentMessage,
        ActorSupervisionEvent,
        resource::CreateOrUpdate<ActorSpec> { cast = true },
        resource::Stop { cast = true },
        resource::StopAll { cast = true },
        resource::GetState<ActorState> { cast = true },
        resource::StreamState<ActorState> { cast = true },
        resource::KeepaliveGetState<ActorState> { cast = true },
        resource::GetRankStatus { cast = true },
        resource::WaitRankStatus { cast = true },
        RepublishIntrospect { cast = true },
        PySpyDump,
        ConfigDump,
    ]
)]
pub struct ProcAgent {
    /// The proc this agent manages.
    proc: Proc,
    /// Registry of remotely spawnable actor types (populated via
    /// `Remote::collect()`); used by `gspawn`.
    remote: Remote,
    /// The agent's configuration state machine (v0 unconfigured /
    /// configured, or v1).
    state: State,
    /// Actors created and tracked through the resource behavior.
    actor_states: HashMap<Name, ActorInstanceState>,
    /// If true, and supervisor is None, record supervision events to be reported
    /// to owning actors later.
    record_supervision_events: bool,
    /// True when supervision events have arrived but introspect
    /// properties haven't been republished yet.
    introspect_dirty: bool,
    /// If set, the shutdown handler will send the exit code through this
    /// channel instead of calling process::exit directly, allowing the
    /// caller to perform graceful shutdown (e.g. draining the mailbox server).
    shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    /// True once a StopAll message has been received. When set, the
    /// supervision event handler checks whether all actors have reached
    /// terminal state and, if so, triggers process shutdown.
    stopping_all: bool,
    /// If set, check for expired actors whose keepalive has lapsed.
    mesh_orphan_timeout: Option<Duration>,
}
409
impl ProcAgent {
    /// Boot a v0 proc: construct a `Proc` over a not-yet-configured
    /// `ReconfigurableMailboxSender` and spawn the agent on it under
    /// the name "mesh". The sender is wired to a real router later, in
    /// the `Configure` handler.
    #[hyperactor::observe_result("MeshAgent")]
    pub(crate) async fn bootstrap(
        proc_id: hyperactor_reference::ProcId,
    ) -> Result<(Proc, ActorHandle<Self>), anyhow::Error> {
        let sender = ReconfigurableMailboxSender::new();
        let proc = Proc::configured(proc_id.clone(), BoxedMailboxSender::new(sender.clone()));

        let agent = ProcAgent {
            proc: proc.clone(),
            remote: Remote::collect(),
            state: State::UnconfiguredV0 { sender },
            actor_states: HashMap::new(),
            record_supervision_events: false,
            introspect_dirty: false,
            shutdown_tx: None,
            stopping_all: false,
            // v0 procs don't have an owner they can check for, so they should
            // never try to kill the children.
            mesh_orphan_timeout: None,
        };
        let handle = proc.spawn::<Self>("mesh", agent)?;
        Ok((proc, handle))
    }

    /// Spawn the agent on an already-configured proc (v1 path) under
    /// [`PROC_AGENT_ACTOR_NAME`]. If `shutdown_tx` is provided, the
    /// shutdown path sends the exit code through it instead of calling
    /// `process::exit` directly.
    pub(crate) fn boot_v1(
        proc: Proc,
        shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    ) -> Result<ActorHandle<Self>, anyhow::Error> {
        // We can't use Option<Duration> directly in config attrs because AttrValue
        // is not implemented for Option<Duration>. So we use a zero timeout to
        // indicate no timeout.
        let orphan_timeout = hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT);
        let orphan_timeout = if orphan_timeout.is_zero() {
            None
        } else {
            Some(orphan_timeout)
        };
        let agent = ProcAgent {
            proc: proc.clone(),
            remote: Remote::collect(),
            state: State::V1,
            actor_states: HashMap::new(),
            record_supervision_events: true,
            introspect_dirty: false,
            shutdown_tx,
            stopping_all: false,
            mesh_orphan_timeout: orphan_timeout,
        };
        proc.spawn::<Self>(PROC_AGENT_ACTOR_NAME, agent)
    }

    /// Returns true when every tracked actor has a terminal supervision event
    /// (or failed to spawn). Used to determine when shutdown can proceed
    /// after a StopAll.
    fn all_actors_terminal(&self) -> bool {
        self.actor_states.values().all(|state| state.is_terminal())
    }

    /// Trigger process shutdown. Flushes the forwarder first so that
    /// supervision events reach their destinations, then sends through
    /// `shutdown_tx` if available, otherwise calls `process::exit`.
    async fn shutdown(&mut self) {
        // Exit code 1 iff any tracked actor observed an error-level
        // supervision event.
        let has_errors = self.actor_states.values().any(|state| state.has_errors());
        let exit_code = if has_errors { 1 } else { 0 };

        // Best-effort flush, bounded by the configured timeout; failure
        // or timeout only warns — shutdown proceeds regardless.
        let flush_timeout =
            hyperactor_config::global::get(hyperactor::config::FORWARDER_FLUSH_TIMEOUT);
        match tokio::time::timeout(flush_timeout, self.proc.flush()).await {
            Ok(Err(err)) => {
                tracing::warn!("forwarder flush failed during shutdown: {}", err);
            }
            Err(_elapsed) => {
                tracing::warn!("forwarder flush timed out during shutdown");
            }
            Ok(Ok(())) => {}
        }

        tracing::info!(
            "shutting down process after all actors reached terminal state (exit_code={})",
            exit_code,
        );

        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(exit_code);
            return;
        }
        std::process::exit(exit_code);
    }

    /// Send a stop signal to an actor on this proc. This is fire-and-forget;
    /// it does not wait for the actor to reach terminal status.
    fn stop_actor_by_id(&self, actor_id: &hyperactor_reference::ActorId, reason: &str) {
        tracing::info!(
            name = "StopActor",
            %actor_id,
            actor_name = actor_id.name(),
            %reason,
        );
        self.proc.stop_actor(actor_id, reason.to_string());
    }

    /// Publish the current proc properties and children list for
    /// introspection. See S12 in `introspect` module doc.
    fn publish_introspect_properties(&self, cx: &impl hyperactor::context::Actor) {
        let (mut children, mut system_children) = collect_live_children(&self.proc);

        // Terminated actors appear as children but don't inflate
        // the actor count. Track them in stopped_children so the
        // TUI can filter/gray without per-child fetches.
        let mut stopped_children: Vec<crate::introspect::NodeRef> = Vec::new();
        for id in self.proc.all_terminated_actor_ids() {
            let child_ref = hyperactor::introspect::IntrospectRef::Actor(id.clone());
            let node_ref = crate::introspect::NodeRef::Actor(id.clone());
            stopped_children.push(node_ref.clone());
            // Snapshots that marked the actor as a system actor keep it
            // in the system-children list after termination.
            if let Some(snapshot) = self.proc.terminated_snapshot(&id) {
                let snapshot_attrs: hyperactor_config::Attrs =
                    serde_json::from_str(&snapshot.attrs).unwrap_or_default();
                if snapshot_attrs
                    .get(hyperactor::introspect::IS_SYSTEM)
                    .copied()
                    .unwrap_or(false)
                {
                    system_children.push(node_ref);
                }
            }
            if !children.contains(&child_ref) {
                children.push(child_ref);
            }
        }

        let stopped_retention_cap =
            hyperactor_config::global::get(hyperactor::config::TERMINATED_SNAPSHOT_RETENTION);

        // FI-5: is_poisoned iff failed_actor_count > 0.
        let failed_actor_count = self
            .actor_states
            .values()
            .filter(|s| s.has_errors())
            .count();

        // Attrs-based introspection.
        // NOTE: `num_live` is the count of `children` (live + any
        // terminated entries appended above).
        let num_live = children.len();
        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
        attrs.set(
            crate::introspect::PROC_NAME,
            self.proc.proc_id().to_string(),
        );
        attrs.set(crate::introspect::NUM_ACTORS, num_live);
        attrs.set(hyperactor::introspect::CHILDREN, children);
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
        attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
        attrs.set(
            crate::introspect::STOPPED_RETENTION_CAP,
            stopped_retention_cap,
        );
        attrs.set(crate::introspect::IS_POISONED, failed_actor_count > 0);
        attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
        cx.instance().publish_attrs(attrs);
    }
}
572
#[async_trait]
impl Actor for ProcAgent {
    /// Wire the agent into the proc at spawn time: mark it as a system
    /// actor, install it as the supervision coordinator, publish the
    /// initial introspect snapshot, register the query-child resolver,
    /// and (if configured) arm the orphan self-check timer.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        self.proc.set_supervision_coordinator(this.port())?;
        self.publish_introspect_properties(this);

        // Resolve terminated actor snapshots via QueryChild so that
        // dead actors remain directly queryable by reference.
        let proc = self.proc.clone();
        let self_id = this.self_id().clone();
        this.set_query_child_handler(move |child_ref| {
            use hyperactor::introspect::IntrospectResult;

            // Terminated actor: serve the retained snapshot verbatim.
            if let hyperactor::reference::Reference::Actor(id) = child_ref {
                if let Some(snapshot) = proc.terminated_snapshot(id) {
                    return snapshot;
                }
            }

            // PA-1 (ProcAgent path): proc-node children used by
            // admin/TUI must be computed from live proc state at query
            // time, not solely from cached published_properties.
            // Therefore a direct proc.spawn() actor must appear on the
            // next QueryChild(Reference::Proc) response without an
            // extra publish event. See
            // test_query_child_proc_returns_live_children.
            if let hyperactor::reference::Reference::Proc(proc_id) = child_ref {
                if proc_id == proc.proc_id() {
                    let (mut children, mut system_children) = collect_live_children(&proc);

                    // Fold terminated actors in, mirroring
                    // `publish_introspect_properties`.
                    let mut stopped_children: Vec<crate::introspect::NodeRef> = Vec::new();
                    for id in proc.all_terminated_actor_ids() {
                        let child_ref = hyperactor::introspect::IntrospectRef::Actor(id.clone());
                        let node_ref = crate::introspect::NodeRef::Actor(id.clone());
                        stopped_children.push(node_ref.clone());
                        if let Some(snapshot) = proc.terminated_snapshot(&id) {
                            let snapshot_attrs: hyperactor_config::Attrs =
                                serde_json::from_str(&snapshot.attrs).unwrap_or_default();
                            if snapshot_attrs
                                .get(hyperactor::introspect::IS_SYSTEM)
                                .copied()
                                .unwrap_or(false)
                            {
                                system_children.push(node_ref);
                            }
                        }
                        if !children.contains(&child_ref) {
                            children.push(child_ref);
                        }
                    }

                    let stopped_retention_cap = hyperactor_config::global::get(
                        hyperactor::config::TERMINATED_SNAPSHOT_RETENTION,
                    );

                    // Poison state is read back from the agent's own
                    // published attrs (the closure has no &self), so it
                    // reflects the last publish, not this instant.
                    let (is_poisoned, failed_actor_count) = proc
                        .get_instance(&self_id)
                        .and_then(|cell| cell.published_attrs())
                        .map(|attrs| {
                            let is_poisoned = attrs
                                .get(crate::introspect::IS_POISONED)
                                .copied()
                                .unwrap_or(false);
                            let failed_actor_count = attrs
                                .get(crate::introspect::FAILED_ACTOR_COUNT)
                                .copied()
                                .unwrap_or(0);
                            (is_poisoned, failed_actor_count)
                        })
                        .unwrap_or((false, 0));

                    // Build attrs for this proc node.
                    let num_live = children.len();
                    let mut attrs = hyperactor_config::Attrs::new();
                    attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
                    attrs.set(crate::introspect::PROC_NAME, proc_id.to_string());
                    attrs.set(crate::introspect::NUM_ACTORS, num_live);
                    attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
                    attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
                    attrs.set(
                        crate::introspect::STOPPED_RETENTION_CAP,
                        stopped_retention_cap,
                    );
                    attrs.set(crate::introspect::IS_POISONED, is_poisoned);
                    attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
                    let attrs_json =
                        serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

                    return IntrospectResult {
                        identity: hyperactor::introspect::IntrospectRef::Proc(proc_id.clone()),
                        attrs: attrs_json,
                        children,
                        parent: None,
                        as_of: std::time::SystemTime::now(),
                    };
                }
            }

            // Fallback: unknown reference — answer with a structured
            // not_found error rather than failing the query.
            {
                let mut error_attrs = hyperactor_config::Attrs::new();
                error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
                error_attrs.set(
                    hyperactor::introspect::ERROR_MESSAGE,
                    format!("child {} not found", child_ref),
                );
                let identity = match child_ref {
                    hyperactor::reference::Reference::Proc(id) => {
                        hyperactor::introspect::IntrospectRef::Proc(id.clone())
                    }
                    hyperactor::reference::Reference::Actor(id) => {
                        hyperactor::introspect::IntrospectRef::Actor(id.clone())
                    }
                    hyperactor::reference::Reference::Port(id) => {
                        hyperactor::introspect::IntrospectRef::Actor(id.actor_id().clone())
                    }
                };
                IntrospectResult {
                    identity,
                    attrs: serde_json::to_string(&error_attrs).unwrap_or_else(|_| "{}".to_string()),
                    children: Vec::new(),
                    parent: None,
                    as_of: std::time::SystemTime::now(),
                }
            }
        });

        if let Some(delay) = &self.mesh_orphan_timeout {
            this.self_message_with_delay(SelfCheck::default(), *delay)?;
        }
        Ok(())
    }

    /// Intercept undeliverables tagged with `STREAM_STATE_SUBSCRIBER`:
    /// a dead subscriber port is pruned from every actor instance
    /// instead of being escalated; everything else takes the default
    /// undeliverable path.
    async fn handle_undeliverable_message(
        &mut self,
        cx: &Instance<Self>,
        envelope: Undeliverable<MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        if let Some(true) = envelope.0.headers().get(STREAM_STATE_SUBSCRIBER) {
            let dest_port_id = envelope.0.dest().clone();
            // Reconstruct a typed port ref for the dead destination so
            // it can be compared against the stored subscriber refs.
            let port =
                hyperactor_reference::PortRef::<resource::State<ActorState>>::attest(dest_port_id);
            // Remove this subscriber from whichever actor instance holds it.
            for instance in self.actor_states.values_mut() {
                instance.subscribers.retain(|s| s != &port);
            }
            Ok(())
        } else {
            handle_undeliverable_message(cx, envelope)
        }
    }
}
725
726#[async_trait]
727#[hyperactor::handle(MeshAgentMessage)]
728impl MeshAgentMessageHandler for ProcAgent {
    /// Configure a v0 proc: dial the forwarder, build a dial router
    /// seeded from `address_book`, wire it into the reconfigurable
    /// sender, transition to `ConfiguredV0`, and acknowledge by sending
    /// the rank on `configured`. Fails if the agent is not in the
    /// `UnconfiguredV0` state (re-configuration is not allowed).
    async fn configure(
        &mut self,
        cx: &Context<Self>,
        rank: usize,
        forwarder: ChannelAddr,
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
        address_book: HashMap<hyperactor_reference::ProcId, ChannelAddr>,
        configured: hyperactor_reference::PortRef<usize>,
        record_supervision_events: bool,
    ) -> Result<(), anyhow::Error> {
        anyhow::ensure!(
            self.state.is_unconfigured_v0(),
            "mesh agent cannot be (re-)configured"
        );
        self.record_supervision_events = record_supervision_events;

        // Unknown destinations fall back to the dialed forwarder.
        let client = MailboxClient::new(channel::dial(forwarder)?);
        let router =
            DialMailboxRouter::new_with_default_direct_addressed_remote_only(client.into_boxed());

        for (proc_id, addr) in address_book {
            router.bind(proc_id.into(), addr);
        }

        // `take` leaves `State::Invalid` in place momentarily while the
        // sender is moved out; the state is restored just below.
        let sender = take(&mut self.state).into_unconfigured_v0().unwrap();
        assert!(sender.configure(router.into_boxed()));

        // This is a bit suboptimal: ideally we'd set the supervisor first, to correctly report
        // any errors that occur during configuration. However, these should anyway be correctly
        // caught on process exit.
        self.state = State::ConfiguredV0 {
            sender,
            rank,
            supervisor,
        };
        configured.send(cx, rank)?;

        Ok(())
    }
768
769    async fn gspawn(
770        &mut self,
771        cx: &Context<Self>,
772        actor_type: String,
773        actor_name: String,
774        params_data: Data,
775        status_port: hyperactor_reference::PortRef<GspawnResult>,
776    ) -> Result<(), anyhow::Error> {
777        anyhow::ensure!(
778            self.state.is_configured_v0(),
779            "mesh agent is not v0 configured"
780        );
781        let actor_id = match self
782            .remote
783            .gspawn(
784                &self.proc,
785                &actor_type,
786                &actor_name,
787                params_data,
788                cx.headers().clone(),
789            )
790            .await
791        {
792            Ok(id) => id,
793            Err(err) => {
794                status_port.send(cx, GspawnResult::Error(format!("gspawn failed: {}", err)))?;
795                return Err(anyhow::anyhow!("gspawn failed"));
796            }
797        };
798        status_port.send(
799            cx,
800            GspawnResult::Success {
801                rank: self.state.rank().unwrap(),
802                actor_id,
803            },
804        )?;
805        self.publish_introspect_properties(cx);
806        Ok(())
807    }
808
809    async fn status(
810        &mut self,
811        cx: &Context<Self>,
812        status_port: hyperactor_reference::PortRef<(usize, bool)>,
813    ) -> Result<(), anyhow::Error> {
814        match &self.state {
815            State::ConfiguredV0 { rank, .. } => {
816                // v0 path: configured with a concrete rank
817                status_port.send(cx, (*rank, true))?;
818                Ok(())
819            }
820            State::UnconfiguredV0 { .. } => {
821                // v0 path but not configured yet
822                Err(anyhow::anyhow!(
823                    "status unavailable: v0 agent not configured (waiting for Configure)"
824                ))
825            }
826            State::V1 => {
827                // v1/owned path does not support status (no rank semantics)
828                Err(anyhow::anyhow!(
829                    "status unsupported in v1/owned path (no rank)"
830                ))
831            }
832            State::Invalid => Err(anyhow::anyhow!(
833                "status unavailable: agent in invalid state"
834            )),
835        }
836    }
837}
838
#[async_trait]
impl Handler<ActorSupervisionEvent> for ProcAgent {
    /// Process a supervision event from a child actor: optionally record it
    /// against the actor's instance state (notifying stream subscribers and
    /// scheduling an introspection republish), then either forward it to the
    /// configured supervisor or — if nobody is listening and the event is an
    /// error — crash the whole process.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        event: ActorSupervisionEvent,
    ) -> anyhow::Result<()> {
        if self.record_supervision_events {
            if event.is_error() {
                tracing::warn!(
                    name = "SupervisionEvent",
                    proc_id = %self.proc.proc_id(),
                    %event,
                    "recording supervision error",
                );
            } else {
                tracing::debug!(
                    name = "SupervisionEvent",
                    proc_id = %self.proc.proc_id(),
                    %event,
                    "recording non-error supervision event",
                );
            }
            // Record the event in the actor's instance state and notify subscribers.
            if let Some((name, instance)) = self
                .actor_states
                .iter_mut()
                .find(|(_, s)| s.spawn.as_ref().ok() == Some(&event.actor_id))
            {
                instance.supervision_event = Some(event.clone());
                instance.generation += 1;
                // Clone the key so the map borrow isn't tangled with the
                // mutable borrow of `instance` during notification.
                let name = name.clone();
                instance.notify_status_changed(cx, &name);
            }
            // Defer republish so introspection picks up is_poisoned /
            // failed_actor_count without blocking the message loop.
            // Multiple rapid events coalesce into one republish.
            if !self.introspect_dirty {
                self.introspect_dirty = true;
                let _ = cx.self_message_with_delay(
                    RepublishIntrospect,
                    std::time::Duration::from_millis(100),
                );
            }

            // If StopAll was requested, check whether all actors have now
            // reached terminal state. If so, shut down the process.
            if self.stopping_all && self.all_actors_terminal() {
                self.shutdown().await;
            }
        }
        if let Some(supervisor) = self.state.supervisor() {
            supervisor.send(cx, event)?;
        } else if !self.record_supervision_events && event.is_error() {
            // If there is no supervisor, and nothing is recording these, crash
            // the whole process on error events.
            tracing::error!(
                name = "supervision_event_transmit_failed",
                proc_id = %cx.self_id().proc_id(),
                %event,
                "could not propagate supervision event, crashing",
            );

            // We should have a custom "crash" function here, so that this works
            // in testing of the LocalAllocator, etc.
            std::process::exit(1);
        }
        Ok(())
    }
}
909
910#[async_trait]
911impl Handler<RepublishIntrospect> for ProcAgent {
912    async fn handle(&mut self, cx: &Context<Self>, _: RepublishIntrospect) -> anyhow::Result<()> {
913        if self.introspect_dirty {
914            self.introspect_dirty = false;
915            self.publish_introspect_properties(cx);
916        }
917        Ok(())
918    }
919}
920
921#[async_trait]
922impl Handler<PySpyDump> for ProcAgent {
923    async fn handle(
924        &mut self,
925        cx: &Context<Self>,
926        message: PySpyDump,
927    ) -> Result<(), anyhow::Error> {
928        PySpyWorker::spawn_and_forward(cx, message.opts, message.result)
929    }
930}
931
932#[async_trait]
933impl Handler<ConfigDump> for ProcAgent {
934    async fn handle(
935        &mut self,
936        cx: &Context<Self>,
937        message: ConfigDump,
938    ) -> Result<(), anyhow::Error> {
939        let entries = hyperactor_config::global::config_entries();
940        // Reply is best-effort: the caller may have timed out and dropped
941        // the once-port.  That must not crash this actor.
942        let _ = message.result.send(cx, ConfigDumpResult { entries });
943        Ok(())
944    }
945}
946
947// Implement the resource behavior for managing actors:
948
/// Actor spec: what to spawn — a registered actor type plus its serialized
/// constructor parameters.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ActorSpec {
    /// Registered actor type, resolved through the `Remote` registry.
    pub actor_type: String,
    /// Serialized constructor parameters passed to the spawned actor.
    pub params_data: Data,
}
wirevalue::register_type!(ActorSpec);
958
/// Actor state, as reported through the resource state/stream APIs.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct ActorState {
    /// The actor's ID.
    pub actor_id: hyperactor_reference::ActorId,
    /// The rank of the proc that created the actor. This is before any slicing.
    pub create_rank: usize,
    // TODO status: ActorStatus,
    /// Supervision events recorded for this actor (empty if none).
    pub supervision_events: Vec<ActorSupervisionEvent>,
}
wirevalue::register_type!(ActorState);
970
#[async_trait]
impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcAgent {
    /// Create the named actor from its spec. Updates are not supported: a
    /// second message for an existing name is a no-op.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        create_or_update: resource::CreateOrUpdate<ActorSpec>,
    ) -> anyhow::Result<()> {
        if self.actor_states.contains_key(&create_or_update.name) {
            // There is no update.
            return Ok(());
        }
        // NOTE(review): assumes the caller always populates `rank` on this
        // path — confirm against the sender before relying on it.
        let create_rank = create_or_update.rank.unwrap();
        // If any actor on this proc has error supervision events,
        // we disallow spawning new actors on it, as this proc may be in an
        // invalid state.
        if self.actor_states.values().any(|s| s.has_errors()) {
            // Record the refusal as a failed spawn so later status/state
            // queries for this name report the error.
            self.actor_states.insert(
                create_or_update.name.clone(),
                ActorInstanceState {
                    spawn: Err(anyhow::anyhow!(
                        "Cannot spawn new actors on mesh with supervision events"
                    )),
                    create_rank,
                    stop_initiated: false,
                    supervision_event: None,
                    subscribers: Vec::new(),
                    expiry_time: None,
                    generation: 1,
                    pending_wait_status: Vec::new(),
                },
            );
            return Ok(());
        }

        let ActorSpec {
            actor_type,
            params_data,
        } = create_or_update.spec;
        // Spawn the actor; the spawn Result itself (success or failure) is
        // stored on the instance state rather than propagated.
        self.actor_states.insert(
            create_or_update.name.clone(),
            ActorInstanceState {
                create_rank,
                spawn: self
                    .remote
                    .gspawn(
                        &self.proc,
                        &actor_type,
                        &create_or_update.name.to_string(),
                        params_data,
                        cx.headers().clone(),
                    )
                    .await,
                stop_initiated: false,
                supervision_event: None,
                subscribers: Vec::new(),
                expiry_time: None,
                generation: 1,
                pending_wait_status: Vec::new(),
            },
        );

        self.publish_introspect_properties(cx);
        Ok(())
    }
}
1036
1037#[async_trait]
1038impl Handler<resource::Stop> for ProcAgent {
1039    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
1040        let actor_id = match self.actor_states.get_mut(&message.name) {
1041            Some(actor_state) => {
1042                let id = actor_state.spawn.as_ref().ok().cloned();
1043                if id.is_some() && !actor_state.stop_initiated {
1044                    actor_state.stop_initiated = true;
1045                    actor_state.generation += 1;
1046                    actor_state.notify_status_changed(cx, &message.name);
1047                    id
1048                } else {
1049                    None
1050                }
1051            }
1052            None => None,
1053        };
1054        if let Some(actor_id) = actor_id {
1055            self.stop_actor_by_id(&actor_id, &message.reason);
1056        }
1057
1058        Ok(())
1059    }
1060}
1061
1062/// Handles `StopAll` by sending stop signals to all child actors.
1063/// Process shutdown is deferred until all actors have reached terminal
1064/// state, as observed through supervision events.
1065#[async_trait]
1066impl Handler<resource::StopAll> for ProcAgent {
1067    async fn handle(
1068        &mut self,
1069        _cx: &Context<Self>,
1070        message: resource::StopAll,
1071    ) -> anyhow::Result<()> {
1072        self.stopping_all = true;
1073
1074        // Send stop signals to all actors that haven't been stopped yet.
1075        let to_stop: Vec<hyperactor_reference::ActorId> = self
1076            .actor_states
1077            .values_mut()
1078            .filter_map(|state| {
1079                if state.stop_initiated {
1080                    return None;
1081                }
1082                state.stop_initiated = true;
1083                state.spawn.as_ref().ok().cloned()
1084            })
1085            .collect();
1086
1087        for actor_id in &to_stop {
1088            self.stop_actor_by_id(actor_id, &message.reason);
1089        }
1090
1091        // If there are no actors to stop, shut down immediately.
1092        if self.all_actors_terminal() {
1093            self.shutdown().await;
1094        }
1095
1096        Ok(())
1097    }
1098}
1099
1100#[async_trait]
1101impl Handler<resource::GetRankStatus> for ProcAgent {
1102    async fn handle(
1103        &mut self,
1104        cx: &Context<Self>,
1105        get_rank_status: resource::GetRankStatus,
1106    ) -> anyhow::Result<()> {
1107        use crate::StatusOverlay;
1108        use crate::resource::Status;
1109
1110        let (rank, status) = match self.actor_states.get(&get_rank_status.name) {
1111            Some(state) => (state.create_rank, state.status()),
1112            None => (usize::MAX, Status::NotExist),
1113        };
1114
1115        // Send a sparse overlay update. If rank is unknown, emit an
1116        // empty overlay.
1117        let overlay = if rank == usize::MAX {
1118            StatusOverlay::new()
1119        } else {
1120            StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
1121                .expect("valid single-run overlay")
1122        };
1123        let result = get_rank_status.reply.send(cx, overlay);
1124        // Ignore errors, because returning Err from here would cause the ProcAgent
1125        // to be stopped, which would prevent querying and spawning other actors.
1126        // This only means some actor that requested the state of an actor failed to receive it.
1127        if let Err(e) = result {
1128            tracing::warn!(
1129                actor = %cx.self_id(),
1130                "failed to send GetRankStatus reply to {} due to error: {}",
1131                get_rank_status.reply.port_id().actor_id(),
1132                e
1133            );
1134        }
1135        Ok(())
1136    }
1137}
1138
1139#[async_trait]
1140impl Handler<resource::WaitRankStatus> for ProcAgent {
1141    async fn handle(
1142        &mut self,
1143        cx: &Context<Self>,
1144        msg: resource::WaitRankStatus,
1145    ) -> anyhow::Result<()> {
1146        use crate::StatusOverlay;
1147        use crate::resource::Status;
1148
1149        let (rank, status) = match self.actor_states.get(&msg.name) {
1150            Some(state) => (state.create_rank, state.status()),
1151            None => (usize::MAX, Status::NotExist),
1152        };
1153
1154        // If already at or past the requested threshold, reply immediately.
1155        if status >= msg.min_status || rank == usize::MAX {
1156            let overlay = if rank == usize::MAX {
1157                StatusOverlay::new()
1158            } else {
1159                StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
1160                    .expect("valid single-run overlay")
1161            };
1162            let _ = msg.reply.send(cx, overlay);
1163            return Ok(());
1164        }
1165
1166        // Otherwise, stash the waiter. It will be flushed when the
1167        // status changes (supervision event or stop).
1168        if let Some(state) = self.actor_states.get_mut(&msg.name) {
1169            state.pending_wait_status.push((msg.min_status, msg.reply));
1170        }
1171        Ok(())
1172    }
1173}
1174
1175#[async_trait]
1176impl Handler<resource::GetState<ActorState>> for ProcAgent {
1177    async fn handle(
1178        &mut self,
1179        cx: &Context<Self>,
1180        get_state: resource::GetState<ActorState>,
1181    ) -> anyhow::Result<()> {
1182        let state = match self.actor_states.get(&get_state.name) {
1183            Some(instance) => instance.to_state(&get_state.name),
1184            None => resource::State {
1185                name: get_state.name.clone(),
1186                status: resource::Status::NotExist,
1187                state: None,
1188                generation: 0,
1189                timestamp: std::time::SystemTime::now(),
1190            },
1191        };
1192
1193        let result = get_state.reply.send(cx, state);
1194        if let Err(e) = result {
1195            tracing::warn!(
1196                actor = %cx.self_id(),
1197                "failed to send GetState reply to {} due to error: {}",
1198                get_state.reply.port_id().actor_id(),
1199                e
1200            );
1201        }
1202        Ok(())
1203    }
1204}
1205
1206#[async_trait]
1207impl Handler<resource::StreamState<ActorState>> for ProcAgent {
1208    async fn handle(
1209        &mut self,
1210        cx: &Context<Self>,
1211        stream_state: resource::StreamState<ActorState>,
1212    ) -> anyhow::Result<()> {
1213        let state = match self.actor_states.get_mut(&stream_state.name) {
1214            Some(instance) => {
1215                let state = instance.to_state(&stream_state.name);
1216                instance.subscribers.push(stream_state.subscriber.clone());
1217                state
1218            }
1219            None => resource::State {
1220                name: stream_state.name.clone(),
1221                status: resource::Status::NotExist,
1222                state: None,
1223                generation: 0,
1224                timestamp: std::time::SystemTime::now(),
1225            },
1226        };
1227
1228        // Send the current state immediately.
1229        let mut headers = Flattrs::new();
1230        headers.set(STREAM_STATE_SUBSCRIBER, true);
1231        if let Err(e) = stream_state
1232            .subscriber
1233            .send_with_headers(cx, headers, state)
1234        {
1235            tracing::warn!(
1236                actor = %cx.self_id(),
1237                "failed to send initial StreamState to {}: {}",
1238                stream_state.subscriber.port_id().actor_id(),
1239                e,
1240            );
1241        }
1242        Ok(())
1243    }
1244}
1245
1246#[async_trait]
1247impl Handler<resource::KeepaliveGetState<ActorState>> for ProcAgent {
1248    async fn handle(
1249        &mut self,
1250        cx: &Context<Self>,
1251        message: resource::KeepaliveGetState<ActorState>,
1252    ) -> anyhow::Result<()> {
1253        // Same impl as GetState, but additionally update the expiry time on the actor.
1254        if let Ok(instance_state) = self
1255            .actor_states
1256            .get_mut(&message.get_state.name)
1257            .ok_or_else(|| {
1258                anyhow::anyhow!(
1259                    "attempting to register a keepalive for an actor that doesn't exist: {}",
1260                    message.get_state.name
1261                )
1262            })
1263        {
1264            instance_state.expiry_time = Some(message.expires_after);
1265        }
1266
1267        // Forward the rest of the impl to GetState.
1268        <Self as Handler<resource::GetState<ActorState>>>::handle(self, cx, message.get_state).await
1269    }
1270}
1271
/// A local handler to get a new client instance on the proc.
/// This is used to create root client instances.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct NewClientInstance {
    /// Reply port receiving the freshly created client [`Instance`].
    #[reply]
    pub client_instance: PortHandle<Instance<()>>,
}
1279
1280#[async_trait]
1281impl Handler<NewClientInstance> for ProcAgent {
1282    async fn handle(
1283        &mut self,
1284        cx: &Context<Self>,
1285        NewClientInstance { client_instance }: NewClientInstance,
1286    ) -> anyhow::Result<()> {
1287        let (instance, _handle) = self.proc.instance("client")?;
1288        client_instance.send(cx, instance)?;
1289        Ok(())
1290    }
1291}
1292
/// A handler to get a clone of the proc managed by this agent.
/// This is used to obtain the local proc from a host mesh.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetProc {
    /// Reply port receiving a clone of the managed [`Proc`].
    #[reply]
    pub proc: PortHandle<Proc>,
}
1300
1301#[async_trait]
1302impl Handler<GetProc> for ProcAgent {
1303    async fn handle(
1304        &mut self,
1305        cx: &Context<Self>,
1306        GetProc { proc }: GetProc,
1307    ) -> anyhow::Result<()> {
1308        proc.send(cx, self.proc.clone())?;
1309        Ok(())
1310    }
1311}
1312
1313#[async_trait]
1314impl Handler<SelfCheck> for ProcAgent {
1315    async fn handle(&mut self, cx: &Context<Self>, _: SelfCheck) -> anyhow::Result<()> {
1316        // Check each actor's expiry time. If the current time is past the expiry,
1317        // stop the actor. This allows automatic cleanup when a controller disappears
1318        // but owned resources remain. It is important that this check runs on the
1319        // same proc as the child actor itself, since the controller could be dead or
1320        // disconnected.
1321        let Some(duration) = &self.mesh_orphan_timeout else {
1322            return Ok(());
1323        };
1324        let duration = duration.clone();
1325        let now = std::time::SystemTime::now();
1326
1327        // Collect expired actors before mutating, since stop_actor borrows &mut self.
1328        let expired: Vec<(Name, hyperactor_reference::ActorId)> = self
1329            .actor_states
1330            .iter()
1331            .filter_map(|(name, state)| {
1332                let expiry = state.expiry_time?;
1333                // If a stop was already initiated we don't need to do it again.
1334                if now > expiry && !state.stop_initiated {
1335                    if let Ok(actor_id) = &state.spawn {
1336                        return Some((name.clone(), actor_id.clone()));
1337                    }
1338                }
1339                None
1340            })
1341            .collect();
1342
1343        if !expired.is_empty() {
1344            tracing::info!(
1345                "stopping {} orphaned actors past their keepalive expiry",
1346                expired.len(),
1347            );
1348        }
1349
1350        for (name, actor_id) in expired {
1351            if let Some(state) = self.actor_states.get_mut(&name) {
1352                state.stop_initiated = true;
1353            }
1354            self.stop_actor_by_id(&actor_id, "orphaned");
1355        }
1356
1357        // Reschedule.
1358        cx.self_message_with_delay(SelfCheck::default(), duration)?;
1359        Ok(())
1360    }
1361}
1362
/// A mailbox sender that initially queues messages, and then relays them to
/// an underlying sender once configured.
#[derive(Clone)]
pub(crate) struct ReconfigurableMailboxSender {
    // Shared so all clones observe the same queueing->configured transition.
    state: Arc<RwLock<ReconfigurableMailboxSenderState>>,
}
1369
impl std::fmt::Debug for ReconfigurableMailboxSender {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Not super helpful, but we definitely don't want to acquire any
        // locks in a Debug formatter.
        f.debug_struct("ReconfigurableMailboxSender").finish()
    }
}
1377
/// A capability wrapper granting access to the configured mailbox
/// sender.
///
/// This type exists to tie the lifetime of any `&BoxedMailboxSender`
/// reference to a lock guard, so the underlying state cannot be
/// reconfigured while the reference is in use.
///
/// A **read** guard is sufficient because we only need to *observe*
/// and borrow the configured sender, not mutate state. While a
/// `RwLockReadGuard` is held, `configure()` cannot acquire the write
/// lock, so the state cannot transition from `Configured(..)` to any
/// other variant during the guard’s lifetime.
pub(crate) struct ReconfigurableMailboxSenderInner<'a> {
    // Read guard pinning the state for this wrapper's lifetime.
    guard: RwLockReadGuard<'a, ReconfigurableMailboxSenderState>,
}
1393
impl<'a> ReconfigurableMailboxSenderInner<'a> {
    /// Borrow the configured sender, or `None` if still queueing.
    pub(crate) fn as_configured(&self) -> Option<&BoxedMailboxSender> {
        self.guard.as_configured()
    }
}
1399
/// A queued post: the message plus the return handle for undeliverables.
type Post = (MessageEnvelope, PortHandle<Undeliverable<MessageEnvelope>>);
1401
#[derive(EnumAsInner, Debug)]
enum ReconfigurableMailboxSenderState {
    /// Not yet configured: posts accumulate here in arrival order.
    Queueing(Mutex<Vec<Post>>),
    /// Configured: posts are relayed directly to the underlying sender.
    Configured(BoxedMailboxSender),
}
1407
impl ReconfigurableMailboxSender {
    /// Create a new sender in the queueing state: posts are buffered until
    /// [`Self::configure`] installs a real sender.
    pub(crate) fn new() -> Self {
        Self {
            state: Arc::new(RwLock::new(ReconfigurableMailboxSenderState::Queueing(
                Mutex::new(Vec::new()),
            ))),
        }
    }

    /// Configure this mailbox with the provided sender. This will first
    /// enqueue any pending messages onto the sender; future messages are
    /// posted directly to the configured sender.
    ///
    /// Returns `false` (and does nothing) if already configured.
    pub(crate) fn configure(&self, sender: BoxedMailboxSender) -> bool {
        // Hold the write lock until all queued messages are flushed.
        let mut state = self.state.write().unwrap();
        if state.is_configured() {
            return false;
        }

        // Install the configured sender exactly once.
        let queued = std::mem::replace(
            &mut *state,
            ReconfigurableMailboxSenderState::Configured(sender),
        );

        // Borrow the configured sender from the state (stable while
        // we hold the lock).
        let configured_sender = state.as_configured().expect("just configured");

        // Flush the old queue while still holding the write lock.
        for (envelope, return_handle) in queued.into_queueing().unwrap().into_inner().unwrap() {
            configured_sender.post(envelope, return_handle);
        }

        true
    }

    /// Get a guard-backed handle to the configured sender, or an error if
    /// this mailbox has not been configured yet.
    pub(crate) fn as_inner<'a>(
        &'a self,
    ) -> Result<ReconfigurableMailboxSenderInner<'a>, anyhow::Error> {
        let state = self.state.read().unwrap();
        if state.is_configured() {
            Ok(ReconfigurableMailboxSenderInner { guard: state })
        } else {
            Err(anyhow::anyhow!("cannot get inner sender: not configured"))
        }
    }
}
1456
1457#[async_trait]
1458impl MailboxSender for ReconfigurableMailboxSender {
1459    fn post(
1460        &self,
1461        envelope: MessageEnvelope,
1462        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1463    ) {
1464        match &*self.state.read().unwrap() {
1465            ReconfigurableMailboxSenderState::Queueing(queue) => {
1466                queue.lock().unwrap().push((envelope, return_handle));
1467            }
1468            ReconfigurableMailboxSenderState::Configured(sender) => {
1469                sender.post(envelope, return_handle);
1470            }
1471        }
1472    }
1473
1474    fn post_unchecked(
1475        &self,
1476        envelope: MessageEnvelope,
1477        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1478    ) {
1479        match &*self.state.read().unwrap() {
1480            ReconfigurableMailboxSenderState::Queueing(queue) => {
1481                queue.lock().unwrap().push((envelope, return_handle));
1482            }
1483            ReconfigurableMailboxSenderState::Configured(sender) => {
1484                sender.post_unchecked(envelope, return_handle);
1485            }
1486        }
1487    }
1488
1489    async fn flush(&self) -> Result<(), anyhow::Error> {
1490        let sender = match &*self.state.read().unwrap() {
1491            ReconfigurableMailboxSenderState::Queueing(_) => return Ok(()),
1492            ReconfigurableMailboxSenderState::Configured(sender) => sender.clone(),
1493        };
1494        sender.flush().await
1495    }
1496}
1497
1498#[cfg(test)]
1499mod tests {
1500    use std::sync::Arc;
1501    use std::sync::Mutex;
1502
1503    use hyperactor::mailbox::BoxedMailboxSender;
1504    use hyperactor::mailbox::Mailbox;
1505    use hyperactor::mailbox::MailboxSender;
1506    use hyperactor::mailbox::MessageEnvelope;
1507    use hyperactor::mailbox::PortHandle;
1508    use hyperactor::mailbox::Undeliverable;
1509    use hyperactor::testing::ids::test_actor_id;
1510    use hyperactor::testing::ids::test_port_id;
1511    use hyperactor_config::Flattrs;
1512
1513    use super::*;
1514
    #[derive(Debug, Clone)]
    struct QueueingMailboxSender {
        // Envelopes captured by `post_unchecked`, in arrival order.
        messages: Arc<Mutex<Vec<MessageEnvelope>>>,
    }

    impl QueueingMailboxSender {
        fn new() -> Self {
            Self {
                messages: Arc::new(Mutex::new(Vec::new())),
            }
        }

        // Snapshot of everything delivered so far.
        fn get_messages(&self) -> Vec<MessageEnvelope> {
            self.messages.lock().unwrap().clone()
        }
    }
1531
    #[async_trait]
    impl MailboxSender for QueueingMailboxSender {
        // NOTE(review): only `post_unchecked` is overridden; the tests
        // call `post`, so this presumably relies on the trait's default
        // `post` delegating here — confirm in the MailboxSender trait.
        fn post_unchecked(
            &self,
            envelope: MessageEnvelope,
            _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
        ) {
            self.messages.lock().unwrap().push(envelope);
        }
    }
1542
    // Helper function to create a test message envelope carrying a
    // serialized u64 payload between two fixed test actors.
    fn envelope(data: u64) -> MessageEnvelope {
        MessageEnvelope::serialize(
            test_actor_id("world_0", "sender"),
            test_port_id("world_0", "receiver", 1),
            &data,
            Flattrs::new(),
        )
        .unwrap()
    }
1553
    // A throwaway undeliverable-return port backed by a detached mailbox;
    // the receiver is dropped, so returns are never observed by the tests.
    fn return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
        let (port, _receiver) = mbox.open_port::<Undeliverable<MessageEnvelope>>();
        port
    }
1559
1560    #[test]
1561    fn test_queueing_before_configure() {
1562        let sender = ReconfigurableMailboxSender::new();
1563
1564        let test_sender = QueueingMailboxSender::new();
1565        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1566
1567        let return_handle = return_handle();
1568        sender.post(envelope(1), return_handle.clone());
1569        sender.post(envelope(2), return_handle.clone());
1570
1571        assert_eq!(test_sender.get_messages().len(), 0);
1572
1573        sender.configure(boxed_sender);
1574
1575        let messages = test_sender.get_messages();
1576        assert_eq!(messages.len(), 2);
1577
1578        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 1);
1579        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 2);
1580    }
1581
1582    #[test]
1583    fn test_direct_delivery_after_configure() {
1584        // Create a ReconfigurableMailboxSender
1585        let sender = ReconfigurableMailboxSender::new();
1586
1587        let test_sender = QueueingMailboxSender::new();
1588        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1589        sender.configure(boxed_sender);
1590
1591        let return_handle = return_handle();
1592        sender.post(envelope(3), return_handle.clone());
1593        sender.post(envelope(4), return_handle.clone());
1594
1595        let messages = test_sender.get_messages();
1596        assert_eq!(messages.len(), 2);
1597
1598        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 3);
1599        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 4);
1600    }
1601
1602    #[test]
1603    fn test_multiple_configurations() {
1604        let sender = ReconfigurableMailboxSender::new();
1605        let boxed_sender = BoxedMailboxSender::new(QueueingMailboxSender::new());
1606
1607        assert!(sender.configure(boxed_sender.clone()));
1608        assert!(!sender.configure(boxed_sender));
1609    }
1610
1611    #[test]
1612    fn test_mixed_queueing_and_direct_delivery() {
1613        let sender = ReconfigurableMailboxSender::new();
1614
1615        let test_sender = QueueingMailboxSender::new();
1616        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1617
1618        let return_handle = return_handle();
1619        sender.post(envelope(5), return_handle.clone());
1620        sender.post(envelope(6), return_handle.clone());
1621
1622        sender.configure(boxed_sender);
1623
1624        sender.post(envelope(7), return_handle.clone());
1625        sender.post(envelope(8), return_handle.clone());
1626
1627        let messages = test_sender.get_messages();
1628        assert_eq!(messages.len(), 4);
1629
1630        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 5);
1631        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 6);
1632        assert_eq!(messages[2].deserialized::<u64>().unwrap(), 7);
1633        assert_eq!(messages[3].deserialized::<u64>().unwrap(), 8);
1634    }
1635
    // A no-op actor used to test direct proc-level spawning.
    // Serialize/Deserialize + Default let it serve as its own spawn
    // params; it handles no messages.
    #[derive(Debug, Default, Serialize, Deserialize)]
    #[hyperactor::export(handlers = [])]
    struct ExtraActor;
    impl hyperactor::Actor for ExtraActor {}
    // Register the actor for remote (by-type-name) spawning, so tests
    // can refer to it via Remote::collect().name_of::<ExtraActor>().
    hyperactor::remote!(ExtraActor);
    // Verifies that QueryChild(Reference::Proc) on a ProcAgent returns
    // a live IntrospectResult whose children reflect actors spawned
    // directly on the proc — i.e. via proc.spawn(), which bypasses the
    // gspawn message handler and therefore never triggers
    // publish_introspect_properties.
    //
    // Exercises PA-1 (see mesh_admin module doc).
    //
    // Regression guard for the bug introduced in 9a08d559: removing
    // handle_introspect left publish_introspect_properties as the only
    // update path, which missed supervision-spawned actors (e.g. every
    // sieve actor after sieve[0]). See also
    // mesh_admin::tests::test_proc_children_reflect_directly_spawned_actors.
    #[tokio::test]
    async fn test_query_child_proc_returns_live_children() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::introspect::IntrospectMessage;
        use hyperactor::introspect::IntrospectResult;
        use hyperactor::reference as hyperactor_reference;

        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();

        // Wait for ProcAgent to finish init.
        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        // Client instance for opening reply ports.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        // Address the agent by its well-known name/pid and attest the
        // IntrospectMessage port so we can message it without a typed ref.
        let agent_id = proc.proc_id().actor_id(PROC_AGENT_ACTOR_NAME, 0);
        let port =
            hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(&agent_id);

        // Helper: send QueryChild(Proc) and return the payload with a
        // timeout so a misrouted reply fails fast rather than hanging.
        let query = |client: &hyperactor::Instance<()>| {
            let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
            port.send(
                client,
                IntrospectMessage::QueryChild {
                    child_ref: hyperactor_reference::Reference::Proc(proc.proc_id().clone()),
                    reply: reply_port.bind(),
                },
            )
            .unwrap();
            reply_rx
        };
        let recv = |rx: hyperactor::mailbox::OncePortReceiver<IntrospectResult>| async move {
            tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv())
                .await
                .expect("QueryChild(Proc) timed out — reply never delivered")
                .expect("reply channel closed")
        };

        // Initial query: ProcAgent itself should appear in children.
        let payload = recv(query(&client)).await;
        // Verify this is a proc node by checking attrs contain node_type=proc.
        let attrs: hyperactor_config::Attrs =
            serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
            "expected node_type=proc in attrs, got {:?}",
            payload.attrs
        );
        assert!(
            payload
                .children
                .iter()
                .any(|c| c.to_string().contains(PROC_AGENT_ACTOR_NAME)),
            "initial children {:?} should contain proc_agent",
            payload.children
        );
        // Baseline child count, used to assert growth after the direct spawn.
        let initial_count = payload.children.len();

        // Spawn an actor directly on the proc, bypassing ProcAgent's
        // gspawn message handler. This is how supervision-spawned
        // actors (e.g. sieve children) are created.
        proc.spawn("extra_actor", ExtraActor).unwrap();

        // Second query: extra_actor must appear without any republish.
        let payload2 = recv(query(&client)).await;
        let attrs2: hyperactor_config::Attrs =
            serde_json::from_str(&payload2.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs2.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
            "expected node_type=proc in attrs, got {:?}",
            payload2.attrs
        );
        assert!(
            payload2
                .children
                .iter()
                .any(|c| c.to_string().contains("extra_actor")),
            "after direct spawn, children {:?} should contain extra_actor",
            payload2.children
        );
        assert!(
            payload2.children.len() > initial_count,
            "expected at least {} children after direct spawn, got {:?}",
            initial_count + 1,
            payload2.children
        );
    }
1754
1755    // Exercises S12 (see introspect module doc): introspection must
1756    // not impair actor liveness. Rapidly spawns and stops
1757    // actors while concurrently querying QueryChild(Reference::Proc).
1758    // The spawn/stop loop must complete within the timeout and the
1759    // iteration count must match -- if DashMap convoy starvation
1760    // blocks the proc, the timeout fires and the test fails.
1761    #[tokio::test]
1762    async fn test_rapid_spawn_stop_does_not_stall_proc_agent() {
1763        use std::sync::Arc;
1764        use std::sync::atomic::AtomicUsize;
1765        use std::sync::atomic::Ordering;
1766
1767        use hyperactor::Proc;
1768        use hyperactor::actor::ActorStatus;
1769        use hyperactor::channel::ChannelTransport;
1770        use hyperactor::introspect::IntrospectMessage;
1771        use hyperactor::introspect::IntrospectResult;
1772        use hyperactor::reference as hyperactor_reference;
1773
1774        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1775        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1776
1777        agent_handle
1778            .status()
1779            .wait_for(|s| matches!(s, ActorStatus::Idle))
1780            .await
1781            .unwrap();
1782
1783        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1784        let (client, _client_handle) = client_proc.instance("client").unwrap();
1785
1786        let agent_id = proc.proc_id().actor_id(PROC_AGENT_ACTOR_NAME, 0);
1787        let port =
1788            hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(&agent_id);
1789
1790        // Concurrent query task: send QueryChild(Proc) every 10ms.
1791        let query_client_proc =
1792            Proc::direct(ChannelTransport::Unix.any(), "query_client".to_string()).unwrap();
1793        let (query_client, _qc_handle) = query_client_proc.instance("qc").unwrap();
1794        let query_port = port.clone();
1795        let query_proc_id = proc.proc_id().clone();
1796        let query_count = Arc::new(AtomicUsize::new(0));
1797        let query_count_clone = query_count.clone();
1798        let query_task = tokio::spawn(async move {
1799            loop {
1800                let (reply_port, reply_rx) = query_client.open_once_port::<IntrospectResult>();
1801                if query_port
1802                    .send(
1803                        &query_client,
1804                        IntrospectMessage::QueryChild {
1805                            child_ref: hyperactor_reference::Reference::Proc(query_proc_id.clone()),
1806                            reply: reply_port.bind(),
1807                        },
1808                    )
1809                    .is_err()
1810                {
1811                    break;
1812                }
1813                match tokio::time::timeout(std::time::Duration::from_secs(2), reply_rx.recv()).await
1814                {
1815                    Ok(Ok(_)) => {
1816                        query_count_clone.fetch_add(1, Ordering::Relaxed);
1817                    }
1818                    _ => {} // Transient failures expected during churn
1819                }
1820                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1821            }
1822        });
1823
1824        // Rapid spawn/stop loop with liveness timeout.
1825        const ITERATIONS: usize = 200;
1826        let mut completed = 0usize;
1827        let result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1828            for i in 0..ITERATIONS {
1829                let name = format!("churn_{}", i);
1830                let handle = proc.spawn(&name, ExtraActor).unwrap();
1831                let actor_id = handle.actor_id().clone();
1832                if let Some(mut status) = proc.stop_actor(&actor_id, "churn".to_string()) {
1833                    let _ = tokio::time::timeout(
1834                        std::time::Duration::from_secs(5),
1835                        status.wait_for(ActorStatus::is_terminal),
1836                    )
1837                    .await;
1838                }
1839                completed += 1;
1840            }
1841        })
1842        .await;
1843
1844        query_task.abort();
1845        let _ = query_task.await; // Join to suppress noisy panic on drop.
1846
1847        assert!(
1848            result.is_ok(),
1849            "spawn/stop loop stalled after {completed}/{ITERATIONS} iterations — \
1850             DashMap convoy starvation likely"
1851        );
1852        assert_eq!(
1853            completed, ITERATIONS,
1854            "expected {ITERATIONS} completed iterations, got {completed}"
1855        );
1856        assert!(
1857            query_count.load(Ordering::Relaxed) > 0,
1858            "concurrent QueryChild queries never succeeded — query task may not have run"
1859        );
1860
1861        // Final consistency check: QueryChild should still work.
1862        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1863        port.send(
1864            &client,
1865            IntrospectMessage::QueryChild {
1866                child_ref: hyperactor_reference::Reference::Proc(proc.proc_id().clone()),
1867                reply: reply_port.bind(),
1868            },
1869        )
1870        .unwrap();
1871        let final_payload =
1872            tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx.recv())
1873                .await
1874                .expect("final QueryChild timed out")
1875                .expect("final QueryChild channel closed");
1876        let attrs: hyperactor_config::Attrs =
1877            serde_json::from_str(&final_payload.attrs).expect("valid attrs JSON");
1878        assert_eq!(
1879            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
1880            Some("proc"),
1881        );
1882    }
1883
    // Exercises the resource StreamState API on ProcAgent: a subscriber
    // receives the initial state plus subsequent status transitions
    // (Running -> Stopping -> Stopped), and a subscriber whose port has
    // been dropped is removed via the undeliverable path without
    // crashing the agent.
    #[tokio::test]
    async fn test_stream_state_and_unsubscribe() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;

        use crate::resource::CreateOrUpdateClient;
        use crate::resource::GetStateClient;
        use crate::resource::StopClient;
        use crate::resource::StreamStateClient;

        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
        // Wait for ProcAgent to finish init.
        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        let (client, _client_handle) = proc.instance("client").unwrap();
        let agent_ref: hyperactor_reference::ActorRef<ProcAgent> = agent_handle.bind();

        // Resolve ExtraActor's registered remote type name and serialized
        // params for spawning through the resource API.
        let actor_type = hyperactor::actor::remote::Remote::collect()
            .name_of::<ExtraActor>()
            .unwrap()
            .to_string();
        let actor_params = bincode::serialize(&ExtraActor).unwrap();
        let actor_name = Name::Reserved("test_actor".to_string());

        // 1. Spawn an actor via CreateOrUpdate.
        agent_ref
            .create_or_update(
                &client,
                actor_name.clone(),
                resource::Rank::new(0),
                ActorSpec {
                    actor_type: actor_type.clone(),
                    params_data: actor_params.clone(),
                },
            )
            .await
            .unwrap();

        // 2. Subscribe to state updates.
        let (sub_port, mut sub_rx) = client.open_port::<resource::State<ActorState>>();
        agent_ref
            .stream_state(&client, actor_name.clone(), sub_port.bind())
            .await
            .unwrap();

        // 3. Should receive the initial state (Running).
        let initial = sub_rx.recv().await.expect("subscriber channel error");
        assert_eq!(initial.status, resource::Status::Running);
        assert!(initial.state.is_some());

        // 4. Send Stop — should receive Stopping.
        agent_ref
            .stop(&client, actor_name.clone(), "test".to_string())
            .await
            .unwrap();

        let stopping = sub_rx.recv().await.expect("subscriber channel error");
        assert_eq!(stopping.status, resource::Status::Stopping);

        // 5. Wait for the Stopped supervision event update.
        let stopped = sub_rx.recv().await.expect("subscriber channel error");
        assert_eq!(stopped.status, resource::Status::Stopped);

        // 6. Test implicit unsubscription via undeliverable.
        let actor_name_2 = Name::Reserved("test_actor_2".to_string());
        agent_ref
            .create_or_update(
                &client,
                actor_name_2.clone(),
                resource::Rank::new(1),
                ActorSpec {
                    actor_type: actor_type.clone(),
                    params_data: actor_params.clone(),
                },
            )
            .await
            .unwrap();

        let (sub_port_2, mut sub_rx_2) = client.open_port::<resource::State<ActorState>>();
        agent_ref
            .stream_state(&client, actor_name_2.clone(), sub_port_2.bind())
            .await
            .unwrap();

        let initial_2 = sub_rx_2.recv().await.expect("subscriber 2 channel error");
        assert_eq!(initial_2.status, resource::Status::Running);

        // Drop the receiver so the next send bounces as undeliverable.
        drop(sub_rx_2);

        // Stop the second actor — triggers notify_status_changed to the
        // dead subscriber. ProcAgent should handle the undeliverable
        // gracefully.
        agent_ref
            .stop(
                &client,
                actor_name_2.clone(),
                "test unsubscribe".to_string(),
            )
            .await
            .unwrap();

        // Wait for actor_2 to reach terminal state via a new stream subscription.
        // Loop because intermediate (non-terminating) updates may arrive first.
        let (sub_port_3, mut sub_rx_3) = client.open_port::<resource::State<ActorState>>();
        agent_ref
            .stream_state(&client, actor_name_2.clone(), sub_port_3.bind())
            .await
            .unwrap();
        loop {
            let state = sub_rx_3.recv().await.expect("subscriber 3 channel error");
            if state.status.is_terminating() {
                break;
            }
        }

        // Verify ProcAgent is still alive after the undeliverable was handled.
        let state = agent_ref
            .get_state(&client, actor_name_2.clone())
            .await
            .unwrap();
        assert!(
            state.status.is_terminating(),
            "expected terminating status, got {:?}",
            state.status,
        );
    }
2015}