Skip to main content

hyperactor_mesh/
proc_agent.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! The mesh agent actor manages procs in ProcMeshes.

// EnumAsInner generates code that triggers a false positive
// unused_assignments lint on struct variant fields. #[allow] on the
// enum itself doesn't propagate into derive-macro-generated code, so
// the suppression must be at module scope.
15#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use hyperactor::Actor;
22use hyperactor::ActorAddr;
23use hyperactor::ActorHandle;
24use hyperactor::Addr;
25use hyperactor::Bind;
26use hyperactor::Context;
27use hyperactor::Data;
28use hyperactor::Endpoint as _;
29use hyperactor::Handler;
30use hyperactor::Instance;
31use hyperactor::PortAddr;
32use hyperactor::PortHandle;
33use hyperactor::PortRef;
34use hyperactor::RemoteEndpoint as _;
35use hyperactor::Unbind;
36use hyperactor::actor::handle_undeliverable_message;
37use hyperactor::actor::remote::Remote;
38use hyperactor::mailbox::MessageEnvelope;
39use hyperactor::mailbox::Undeliverable;
40use hyperactor::proc::Proc;
41use hyperactor::supervision::ActorSupervisionEvent;
42use hyperactor_config::CONFIG;
43use hyperactor_config::ConfigAttr;
44use hyperactor_config::Flattrs;
45use hyperactor_config::attrs::declare_attrs;
46use serde::Deserialize;
47use serde::Serialize;
48use typeuri::Named;
49
50use crate::config_dump::ConfigDump;
51use crate::config_dump::ConfigDumpResult;
52use crate::introspect::ProcessMemoryStats;
53use crate::mesh_id::ResourceId;
54use crate::pyspy::PySpyDump;
55use crate::pyspy::PySpyProfile;
56use crate::pyspy::PySpyProfileWorker;
57use crate::pyspy::PySpyWorker;
58use crate::resource;
59
60/// Actor name used when spawning the proc agent on user procs.
61pub const PROC_AGENT_ACTOR_NAME: &str = "proc_agent";
62
declare_attrs! {
    /// Whether to self kill actors, procs, and hosts whose owner is not
    /// reachable. `None` disables orphan cleanup entirely; `Some(d)` sets the
    /// keepalive expiry to `d`.
    ///
    /// Defaults to `Some(60s)`. `ProcAgent::boot_v1` reads this value once
    /// and stores it in the agent's `mesh_orphan_timeout` field.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ORPHAN_TIMEOUT".to_string()),
        Some("mesh_orphan_timeout".to_string()),
    ))
    pub attr MESH_ORPHAN_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));

    /// Interval at which each ProcAgent republishes introspection
    /// on a periodic timer and emits the
    /// `process.memory.rss_bytes` / `process.memory.vm_bytes`
    /// Scuba/OTLP gauges. Linux only — has no effect on other
    /// platforms. `Duration::ZERO` disables the periodic timer
    /// (gauges then never fire); non-periodic republishes (boot,
    /// post-spawn, supervision-event coalesce) still publish
    /// introspect attrs but do not emit gauges.
    ///
    /// Defaults to 300 seconds.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_PROCESS_MEMORY_METRIC_INTERVAL".to_string()),
        Some("process_memory_metric_interval".to_string()),
    ))
    pub attr PROCESS_MEMORY_METRIC_INTERVAL: Duration = Duration::from_secs(300);

    /// Header tag for StreamState subscriber messages. When present on an
    /// undeliverable envelope, ProcAgent removes the dead subscriber instead
    /// of treating it as an error.
    ///
    /// Set to `true` on every state notification posted to a streaming
    /// subscriber; it has no `CONFIG` meta and no default value.
    pub(crate) attr STREAM_STATE_SUBSCRIBER: bool;
}
92
/// Deferred republish of introspect properties.
///
/// Carries an `emit_memory_metrics` flag distinguishing two senders:
///
/// - `emit_memory_metrics: false` — sent from the supervision event
///   handler with a delay so the supervision handler returns
///   immediately without blocking the ProcAgent message loop.
///   Multiple rapid supervision events (e.g., 4 actors failing
///   simultaneously via broadcast) coalesce into a single republish
///   via the `introspect_dirty` flag. Without this, calling
///   `publish_introspect_properties` inline in the supervision
///   handler starves `GetRankStatus` polls from the
///   `ActorMeshController`, preventing `__supervise__` from firing
///   within the test timeout. See D94960791 for the root cause
///   analysis.
///
/// - `emit_memory_metrics: true` — sent from `Actor::init` and
///   re-armed by the handler at the `PROCESS_MEMORY_METRIC_INTERVAL`
///   cadence.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
struct RepublishIntrospect {
    /// When `true`, the handler records process memory gauges from the
    /// memory stats returned by the republish, in addition to
    /// republishing the introspect attrs.
    emit_memory_metrics: bool,
}
// Register the message type with `wirevalue`.
wirevalue::register_type!(RepublishIntrospect);
117
118/// Collect live actor children and system actor children from the
119/// proc's instance DashMap using `all_instance_keys()` with point
120/// lookups. This avoids the convoy starvation from `all_actor_ids()`
121/// which holds shard read locks while doing heavy per-entry work.
122/// See S12 in `introspect` module doc.
123fn collect_live_children(
124    proc: &hyperactor::Proc,
125) -> (
126    Vec<hyperactor::introspect::IntrospectRef>,
127    Vec<crate::introspect::NodeRef>,
128) {
129    let all_keys = proc.all_instance_keys();
130    let mut children = Vec::with_capacity(all_keys.len());
131    let mut system_children = Vec::new();
132    for id in all_keys {
133        if let Some(cell) = proc.get_instance_by_id(&id) {
134            let actor_addr = cell.actor_addr().clone();
135            if cell.is_system() {
136                system_children.push(crate::introspect::NodeRef::Actor(actor_addr.clone()));
137            }
138            children.push(hyperactor::introspect::IntrospectRef::Actor(actor_addr));
139        }
140    }
141    (children, system_children)
142}
143
/// Actor state used for v1 API.
///
/// One entry is kept per tracked resource in `ProcAgent::actor_states`.
/// The resource status is derived from `spawn`, `stop_initiated` and
/// `supervision_event`; it is never stored directly.
#[derive(Debug)]
struct ActorInstanceState {
    /// Rank recorded for this actor. It is echoed in `ActorState` and used
    /// as the single-rank index of the overlay sent to `WaitRankStatus`
    /// waiters.
    create_rank: usize,
    /// Outcome of the spawn attempt: the actor's address on success, or the
    /// spawn error. An `Err` is reported as a failed, terminal instance.
    spawn: Result<ActorAddr, anyhow::Error>,
    /// True once a stop signal has been sent. This does *not* mean the actor
    /// has reached a terminal state — that is determined by observing
    /// supervision events.
    stop_initiated: bool,
    /// The supervision event observed for this actor, if it has reached
    /// terminal state.
    supervision_event: Option<ActorSupervisionEvent>,
    /// Streaming subscribers that receive `State<ActorState>` on every
    /// state change. Dead subscribers are removed via undeliverable handling.
    subscribers: Vec<PortRef<resource::State<ActorState>>>,
    /// The time at which the actor should be considered expired if no further
    /// keepalive is received. `None` meaning it will never expire.
    expiry_time: Option<std::time::SystemTime>,
    /// Monotonic generation counter, incremented on every state-mutating
    /// operation (spawn, stop, supervision event). Used for last-writer-wins
    /// ordering in the mesh controller.
    generation: u64,
    /// Pending `WaitRankStatus` callers: each entry is the minimum
    /// status threshold and the reply port to send once the threshold
    /// is met.
    pending_wait_status: Vec<(resource::Status, PortRef<crate::StatusOverlay>)>,
}
171
172impl ActorInstanceState {
173    /// Derive the resource status from spawn result, stop initiation,
174    /// and the observed supervision event.
175    fn status(&self) -> resource::Status {
176        match &self.spawn {
177            Err(e) => resource::Status::Failed(e.to_string()),
178            Ok(_) => match &self.supervision_event {
179                Some(event) if event.is_error() => resource::Status::Failed(format!("{}", event)),
180                Some(_) => resource::Status::Stopped,
181                None if self.stop_initiated => resource::Status::Stopping,
182                None => resource::Status::Running,
183            },
184        }
185    }
186
187    /// True if the actor has reached a terminal state (stopped or failed),
188    /// or if it never successfully spawned.
189    fn is_terminal(&self) -> bool {
190        match &self.spawn {
191            Err(_) => true,
192            Ok(_) => self.supervision_event.is_some(),
193        }
194    }
195
196    /// True if the supervision event is an error.
197    fn has_errors(&self) -> bool {
198        self.supervision_event
199            .as_ref()
200            .is_some_and(|e| e.is_error())
201    }
202
203    /// Build the `State<ActorState>` for this instance, suitable for
204    /// replies and subscriber notifications.
205    fn to_state(&self, id: &ResourceId) -> resource::State<ActorState> {
206        let status = self.status();
207        let actor_state = self.spawn.as_ref().ok().map(|actor_id| ActorState {
208            actor_id: actor_id.clone(),
209            create_rank: self.create_rank,
210            supervision_events: self.supervision_event.clone().into_iter().collect(),
211        });
212        resource::State {
213            id: id.clone(),
214            status,
215            state: actor_state,
216            generation: self.generation,
217            timestamp: std::time::SystemTime::now(),
218        }
219    }
220
221    /// Notify all observers that this actor's status has changed:
222    /// streaming subscribers get the full state, and one-shot
223    /// `WaitRankStatus` waiters whose threshold is now met get replied
224    /// to and removed.
225    fn notify_status_changed(&mut self, cx: &impl hyperactor::context::Actor, id: &ResourceId) {
226        // Streaming subscribers (persistent).
227        let state = self.to_state(id);
228        for subscriber in &self.subscribers {
229            let mut headers = Flattrs::new();
230            headers.set(STREAM_STATE_SUBSCRIBER, true);
231            subscriber.post_with_headers(cx, headers, state.clone());
232        }
233
234        // One-shot waiters (predicated).
235        let status = self.status();
236        self.pending_wait_status.retain(|(min_status, reply)| {
237            if status >= *min_status {
238                let rank = self.create_rank;
239                let overlay =
240                    crate::StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status.clone())])
241                        .expect("valid single-run overlay");
242                let _ = reply.post(cx, overlay);
243                false
244            } else {
245                true
246            }
247        });
248    }
249}
250
/// Self-addressed, payload-free message. `ProcAgent::init` schedules one
/// to itself after the orphan timeout when `mesh_orphan_timeout` is set.
// NOTE(review): the handler is outside this view; presumably it checks for
// actors whose keepalive has lapsed and re-arms itself — confirm.
#[derive(
    Clone,
    Debug,
    Default,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    Bind,
    Unbind
)]
pub(crate) struct SelfCheck {}
263
/// A mesh agent is responsible for managing procs in a [`ProcMesh`].
///
/// ## Supervision event ingestion (remote)
///
/// `ProcAgent` is the *process/rank-local* sink for
/// `ActorSupervisionEvent`s produced by the runtime (actor failures,
/// routing failures, undeliverables, etc.).
///
/// We **export** `ActorSupervisionEvent` as a handler so that other
/// procs—most importantly the process-global root client created by
/// `context()`—can forward undeliverables as supervision
/// events to the *currently active* mesh.
///
/// Without exporting this handler, `ActorSupervisionEvent` cannot be
/// addressed via `ActorAddr`/`PortAddr` across processes, and the
/// global-root-client undeliverable → supervision pipeline would
/// degrade to log-only behavior (events become undeliverable again or
/// are dropped).
///
/// See GC-1 in `global_context` module doc.
#[hyperactor::export(
    handlers=[
        ActorSupervisionEvent,
        resource::CreateOrUpdate<ActorSpec> { cast = true },
        resource::Stop { cast = true },
        resource::StopAll { cast = true },
        resource::GetState<ActorState> { cast = true },
        resource::StreamState<ActorState> { cast = true },
        resource::KeepaliveGetState<ActorState> { cast = true },
        resource::GetRankStatus { cast = true },
        resource::WaitRankStatus { cast = true },
        RepublishIntrospect { cast = true },
        PySpyDump,
        PySpyProfile,
        ConfigDump,
    ]
)]
pub struct ProcAgent {
    /// The proc this agent is spawned on and manages; it is the source of
    /// instance lookups, terminated-actor snapshots and queue statistics.
    proc: Proc,
    /// Populated with `Remote::collect()` at boot.
    // NOTE(review): presumably the registry used to resolve actor types when
    // spawning — confirm against the `CreateOrUpdate` handler.
    remote: Remote,
    /// Actors created and tracked through the resource behavior.
    actor_states: HashMap<ResourceId, ActorInstanceState>,
    /// If true, and supervisor is None, record supervision events to be reported
    /// to owning actors later.
    record_supervision_events: bool,
    /// True when supervision events have arrived but introspect
    /// properties haven't been republished yet.
    introspect_dirty: bool,
    /// If set, the shutdown handler will send the exit code through this
    /// channel instead of calling process::exit directly, allowing the
    /// caller to perform graceful shutdown (e.g. draining the mailbox server).
    shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    /// True once a StopAll message has been received. When set, the
    /// supervision event handler checks whether all actors have reached
    /// terminal state and, if so, triggers process shutdown.
    stopping_all: bool,
    /// If set, check for expired actors whose keepalive has lapsed.
    mesh_orphan_timeout: Option<Duration>,
}
323
324impl ProcAgent {
325    pub(crate) fn boot_v1(
326        proc: Proc,
327        shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
328    ) -> Result<ActorHandle<Self>, anyhow::Error> {
329        let orphan_timeout = hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT);
330        let agent = ProcAgent {
331            proc: proc.clone(),
332            remote: Remote::collect(),
333            actor_states: HashMap::new(),
334            record_supervision_events: true,
335            introspect_dirty: false,
336            shutdown_tx,
337            stopping_all: false,
338            mesh_orphan_timeout: orphan_timeout,
339        };
340        proc.spawn::<Self>(PROC_AGENT_ACTOR_NAME, agent)
341    }
342
343    /// Returns true when every tracked actor has a terminal supervision event
344    /// (or failed to spawn). Used to determine when shutdown can proceed
345    /// after a StopAll.
346    fn all_actors_terminal(&self) -> bool {
347        self.actor_states.values().all(|state| state.is_terminal())
348    }
349
350    /// Trigger process shutdown. Flushes the forwarder first so that
351    /// supervision events reach their destinations, then sends through
352    /// `shutdown_tx` if available, otherwise calls `process::exit`.
353    async fn shutdown(&mut self) {
354        let has_errors = self.actor_states.values().any(|state| state.has_errors());
355        let exit_code = if has_errors { 1 } else { 0 };
356
357        let flush_timeout =
358            hyperactor_config::global::get(hyperactor::config::FORWARDER_FLUSH_TIMEOUT);
359        match tokio::time::timeout(flush_timeout, self.proc.flush()).await {
360            Ok(Err(err)) => {
361                tracing::warn!("forwarder flush failed during shutdown: {}", err);
362            }
363            Err(_elapsed) => {
364                tracing::warn!("forwarder flush timed out during shutdown");
365            }
366            Ok(Ok(())) => {}
367        }
368
369        // Stop and join the mailbox server (no-op if this proc was
370        // not created with one). Pending receive-side acks are
371        // flushed before the underlying channel server is torn down.
372        self.proc.join_mailbox_server().await;
373
374        tracing::info!(
375            "shutting down process after all actors reached terminal state (exit_code={})",
376            exit_code,
377        );
378
379        if let Some(tx) = self.shutdown_tx.take() {
380            let _ = tx.send(exit_code);
381            return;
382        }
383        std::process::exit(exit_code);
384    }
385
386    /// Send a stop signal to an actor on this proc. This is fire-and-forget;
387    /// it does not wait for the actor to reach terminal status.
388    fn stop_actor_by_id(&self, actor_id: &ActorAddr, reason: &str) {
389        tracing::info!(
390            name = "StopActor",
391            %actor_id,
392            actor_name = actor_id.log_name(),
393            %reason,
394        );
395        self.proc.stop_actor(actor_id.id(), reason.to_string());
396    }
397
398    /// Publish the current proc properties and children list for
399    /// introspection. See S12 in `introspect` module doc.
400    fn publish_introspect_properties(
401        &self,
402        cx: &impl hyperactor::context::Actor,
403    ) -> ProcessMemoryStats {
404        let (mut children, mut system_children) = collect_live_children(&self.proc);
405
406        // Terminated actors appear as children but don't inflate
407        // the actor count. Track them in stopped_children so the
408        // TUI can filter/gray without per-child fetches.
409        let mut stopped_children: Vec<crate::introspect::NodeRef> = Vec::new();
410        for id in self.proc.all_terminated_actor_ids() {
411            let child_ref = hyperactor::introspect::IntrospectRef::Actor(id.clone());
412            let node_ref = crate::introspect::NodeRef::Actor(id.clone());
413            stopped_children.push(node_ref.clone());
414            if let Some(snapshot) = self.proc.terminated_snapshot(&id) {
415                let snapshot_attrs: hyperactor_config::Attrs =
416                    serde_json::from_str(&snapshot.attrs).unwrap_or_default();
417                if snapshot_attrs
418                    .get(hyperactor::introspect::IS_SYSTEM)
419                    .copied()
420                    .unwrap_or(false)
421                {
422                    system_children.push(node_ref);
423                }
424            }
425            if !children.contains(&child_ref) {
426                children.push(child_ref);
427            }
428        }
429
430        let stopped_retention_cap =
431            hyperactor_config::global::get(hyperactor::config::TERMINATED_SNAPSHOT_RETENTION);
432
433        // FI-5: is_poisoned iff failed_actor_count > 0.
434        let failed_actor_count = self
435            .actor_states
436            .values()
437            .filter(|s| s.has_errors())
438            .count();
439
440        // Attrs-based introspection.
441        let num_live = children.len();
442        let mut attrs = hyperactor_config::Attrs::new();
443        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
444        attrs.set(
445            crate::introspect::PROC_NAME,
446            self.proc
447                .proc_addr()
448                .label()
449                .map(|l| l.as_str().to_string())
450                .unwrap_or_else(|| self.proc.proc_addr().id().to_string()),
451        );
452        attrs.set(crate::introspect::NUM_ACTORS, num_live);
453        attrs.set(hyperactor::introspect::CHILDREN, children);
454        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
455        attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
456        attrs.set(
457            crate::introspect::STOPPED_RETENTION_CAP,
458            stopped_retention_cap,
459        );
460        attrs.set(crate::introspect::IS_POISONED, failed_actor_count > 0);
461        attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
462
463        // PD-* proc debug stats intentionally join two signal classes:
464        // hosting-process memory for the OS process that owns this
465        // proc, and proc-local queue pressure aggregated over live
466        // actors only.
467        let memory = crate::introspect::ProcessMemoryStats::read_from_procfs();
468        memory.to_attrs(&mut attrs);
469
470        // Proc-wide total from runtime accounting path (O(1)).
471        let queue_total = self.proc.queue_depth_total();
472        attrs.set(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_TOTAL, queue_total);
473
474        // Per-actor max still needs the per-actor scan (PD-4: live actors only).
475        let mut queue_max: u64 = 0;
476        for actor_id in self.proc.all_instance_keys() {
477            if let Some(cell) = self.proc.get_instance_by_id(&actor_id) {
478                queue_max = queue_max.max(cell.queue_depth());
479            }
480        }
481        attrs.set(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_MAX, queue_max);
482
483        // Retained queue-pressure evidence (PD-6, PD-7).
484        attrs.set(
485            crate::introspect::ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK,
486            self.proc.queue_depth_high_water_mark(),
487        );
488        attrs.set(
489            crate::introspect::LAST_NONZERO_QUEUE_DEPTH_AGE_MS,
490            self.proc.last_nonzero_queue_depth_age_ms(),
491        );
492
493        cx.instance().publish_attrs(attrs);
494
495        memory
496    }
497}
498
499#[async_trait]
500impl Actor for ProcAgent {
501    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
502        this.set_system();
503        self.proc.set_supervision_coordinator(this.port())?;
504        let _ = self.publish_introspect_properties(this);
505
506        // Resolve terminated actor snapshots via QueryChild so that
507        // dead actors remain directly queryable by reference.
508        let proc = self.proc.clone();
509        let self_id = this.self_addr().clone();
510        this.set_query_child_handler(move |child_ref| {
511            use hyperactor::introspect::IntrospectResult;
512
513            if let Addr::Actor(actor_ref) = child_ref
514                && let Some(snapshot) = proc.terminated_snapshot(actor_ref)
515            {
516                return snapshot;
517            }
518
519            // PA-1 (ProcAgent path): proc-node children used by
520            // admin/TUI must be computed from live proc state at query
521            // time, not solely from cached published_properties.
522            // Therefore a direct proc.spawn() actor must appear on the
523            // next QueryChild(Addr::Proc) response without an
524            // extra publish event. See
525            // test_query_child_proc_returns_live_children.
526            if let Addr::Proc(proc_ref) = child_ref
527                && *proc_ref == proc.proc_addr()
528            {
529                let (mut children, mut system_children) = collect_live_children(&proc);
530
531                let mut stopped_children: Vec<crate::introspect::NodeRef> = Vec::new();
532                for id in proc.all_terminated_actor_ids() {
533                    let child_ref = hyperactor::introspect::IntrospectRef::Actor(id.clone());
534                    let node_ref = crate::introspect::NodeRef::Actor(id.clone());
535                    stopped_children.push(node_ref.clone());
536                    if let Some(snapshot) = proc.terminated_snapshot(&id) {
537                        let snapshot_attrs: hyperactor_config::Attrs =
538                            serde_json::from_str(&snapshot.attrs).unwrap_or_default();
539                        if snapshot_attrs
540                            .get(hyperactor::introspect::IS_SYSTEM)
541                            .copied()
542                            .unwrap_or(false)
543                        {
544                            system_children.push(node_ref);
545                        }
546                    }
547                    if !children.contains(&child_ref) {
548                        children.push(child_ref);
549                    }
550                }
551
552                let stopped_retention_cap = hyperactor_config::global::get(
553                    hyperactor::config::TERMINATED_SNAPSHOT_RETENTION,
554                );
555
556                let (is_poisoned, failed_actor_count) = proc
557                    .get_instance(&self_id)
558                    .and_then(|cell| cell.published_attrs())
559                    .map(|attrs| {
560                        let is_poisoned = attrs
561                            .get(crate::introspect::IS_POISONED)
562                            .copied()
563                            .unwrap_or(false);
564                        let failed_actor_count = attrs
565                            .get(crate::introspect::FAILED_ACTOR_COUNT)
566                            .copied()
567                            .unwrap_or(0);
568                        (is_poisoned, failed_actor_count)
569                    })
570                    .unwrap_or((false, 0));
571
572                // Build attrs for this proc node.
573                let num_live = children.len();
574                let mut attrs = hyperactor_config::Attrs::new();
575                attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
576                attrs.set(
577                    crate::introspect::PROC_NAME,
578                    proc_ref
579                        .label()
580                        .map(|l| l.as_str().to_string())
581                        .unwrap_or_else(|| proc_ref.id().to_string()),
582                );
583                attrs.set(crate::introspect::NUM_ACTORS, num_live);
584                attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
585                attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
586                attrs.set(
587                    crate::introspect::STOPPED_RETENTION_CAP,
588                    stopped_retention_cap,
589                );
590                attrs.set(crate::introspect::IS_POISONED, is_poisoned);
591                attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
592
593                // PD-*: include proc debug stats in QueryChild
594                // to prevent resolution drift from the publish path.
595                let memory = crate::introspect::ProcessMemoryStats::read_from_procfs();
596                memory.to_attrs(&mut attrs);
597                attrs.set(
598                    crate::introspect::ACTOR_WORK_QUEUE_DEPTH_TOTAL,
599                    proc.queue_depth_total(),
600                );
601                let mut queue_max: u64 = 0;
602                for aid in proc.all_instance_keys() {
603                    if let Some(cell) = proc.get_instance_by_id(&aid) {
604                        queue_max = queue_max.max(cell.queue_depth());
605                    }
606                }
607                attrs.set(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_MAX, queue_max);
608                attrs.set(
609                    crate::introspect::ACTOR_WORK_QUEUE_DEPTH_HIGH_WATER_MARK,
610                    proc.queue_depth_high_water_mark(),
611                );
612                attrs.set(
613                    crate::introspect::LAST_NONZERO_QUEUE_DEPTH_AGE_MS,
614                    proc.last_nonzero_queue_depth_age_ms(),
615                );
616
617                let attrs_json = serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());
618
619                return IntrospectResult {
620                    identity: hyperactor::introspect::IntrospectRef::Proc(proc_ref.clone()),
621                    attrs: attrs_json,
622                    children,
623                    parent: None,
624                    as_of: std::time::SystemTime::now(),
625                };
626            }
627
628            {
629                let mut error_attrs = hyperactor_config::Attrs::new();
630                error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
631                error_attrs.set(
632                    hyperactor::introspect::ERROR_MESSAGE,
633                    format!("child {} not found", child_ref),
634                );
635                let identity = match child_ref {
636                    Addr::Proc(p) => hyperactor::introspect::IntrospectRef::Proc(p.clone()),
637                    Addr::Actor(a) => hyperactor::introspect::IntrospectRef::Actor(a.clone()),
638                    Addr::Port(p) => hyperactor::introspect::IntrospectRef::Actor(p.actor_addr()),
639                };
640                IntrospectResult {
641                    identity,
642                    attrs: serde_json::to_string(&error_attrs).unwrap_or_else(|_| "{}".to_string()),
643                    children: Vec::new(),
644                    parent: None,
645                    as_of: std::time::SystemTime::now(),
646                }
647            }
648        });
649
650        if let Some(delay) = &self.mesh_orphan_timeout {
651            this.post_after(this, SelfCheck::default(), *delay);
652        }
653        if cfg!(target_os = "linux") {
654            let interval = hyperactor_config::global::get(PROCESS_MEMORY_METRIC_INTERVAL);
655            if !interval.is_zero() {
656                this.post_after(
657                    this,
658                    RepublishIntrospect {
659                        emit_memory_metrics: true,
660                    },
661                    interval,
662                );
663            }
664        }
665        Ok(())
666    }
667
668    async fn handle_undeliverable_message(
669        &mut self,
670        cx: &Instance<Self>,
671        envelope: Undeliverable<MessageEnvelope>,
672    ) -> Result<(), anyhow::Error> {
673        let Some(returned) = envelope.as_message() else {
674            return handle_undeliverable_message(cx, envelope);
675        };
676        if let Some(true) = returned.headers().get(STREAM_STATE_SUBSCRIBER) {
677            let dest_port_id: PortAddr = returned.dest().clone();
678            let port = PortRef::<resource::State<ActorState>>::attest(dest_port_id);
679            // Remove this subscriber from whichever actor instance holds it.
680            for instance in self.actor_states.values_mut() {
681                instance.subscribers.retain(|s| s != &port);
682            }
683            Ok(())
684        } else {
685            handle_undeliverable_message(cx, envelope)
686        }
687    }
688}
689
690#[async_trait]
691impl Handler<ActorSupervisionEvent> for ProcAgent {
692    async fn handle(
693        &mut self,
694        cx: &Context<Self>,
695        event: ActorSupervisionEvent,
696    ) -> anyhow::Result<()> {
697        if self.record_supervision_events {
698            if event.is_error() {
699                tracing::warn!(
700                    name = "SupervisionEvent",
701                    proc_id = %self.proc.proc_addr(),
702                    %event,
703                    "recording supervision error",
704                );
705            } else {
706                tracing::debug!(
707                    name = "SupervisionEvent",
708                    proc_id = %self.proc.proc_addr(),
709                    %event,
710                    "recording non-error supervision event",
711                );
712            }
713            // Record the event in the actor's instance state and notify subscribers.
714            if let Some((id, instance)) = self.actor_states.iter_mut().find(|(_, s)| {
715                s.spawn
716                    .as_ref()
717                    .ok()
718                    .is_some_and(|actor_id| actor_id.id() == event.actor_id.id())
719            }) {
720                instance.supervision_event = Some(event.clone());
721                instance.generation += 1;
722                let id = id.clone();
723                instance.notify_status_changed(cx, &id);
724            }
725            // Defer republish so introspection picks up is_poisoned /
726            // failed_actor_count without blocking the message loop.
727            // Multiple rapid events coalesce into one republish.
728            if !self.introspect_dirty {
729                self.introspect_dirty = true;
730                cx.post_after(
731                    cx,
732                    RepublishIntrospect {
733                        emit_memory_metrics: false,
734                    },
735                    std::time::Duration::from_millis(100),
736                );
737            }
738
739            // If StopAll was requested, check whether all actors have now
740            // reached terminal state. If so, shut down the process.
741            if self.stopping_all && self.all_actors_terminal() {
742                self.shutdown().await;
743            }
744        }
745        if !self.record_supervision_events && event.is_error() {
746            // If there is no supervisor, and nothing is recording these, crash
747            // the whole process on error events.
748            tracing::error!(
749                name = "supervision_event_transmit_failed",
750                proc_id = %cx.self_addr().proc_addr(),
751                %event,
752                "could not propagate supervision event, crashing",
753            );
754
755            // We should have a custom "crash" function here, so that this works
756            // in testing of the LocalAllocator, etc.
757            std::process::exit(1);
758        }
759        Ok(())
760    }
761}
762
763#[async_trait]
764impl Handler<RepublishIntrospect> for ProcAgent {
765    async fn handle(&mut self, cx: &Context<Self>, msg: RepublishIntrospect) -> anyhow::Result<()> {
766        self.introspect_dirty = false;
767        let memory = self.publish_introspect_properties(cx);
768        if msg.emit_memory_metrics {
769            let proc_id = self.proc.proc_addr().to_string();
770            let pid = std::process::id() as i64;
771            if let Some(rss) = memory.process_rss_bytes {
772                crate::metrics::PROCESS_RSS_BYTES.record(
773                    rss as f64,
774                    hyperactor_telemetry::kv_pairs!(
775                        "proc_id" => proc_id.clone(),
776                        "pid" => pid,
777                    ),
778                );
779            }
780            if let Some(vm) = memory.process_vm_size_bytes {
781                crate::metrics::PROCESS_VM_SIZE_BYTES.record(
782                    vm as f64,
783                    hyperactor_telemetry::kv_pairs!(
784                        "proc_id" => proc_id,
785                        "pid" => pid,
786                    ),
787                );
788            }
789            let interval = hyperactor_config::global::get(PROCESS_MEMORY_METRIC_INTERVAL);
790            if !interval.is_zero() {
791                cx.post_after(
792                    cx,
793                    RepublishIntrospect {
794                        emit_memory_metrics: true,
795                    },
796                    interval,
797                );
798            }
799        }
800        Ok(())
801    }
802}
803
804#[async_trait]
805impl Handler<PySpyDump> for ProcAgent {
806    async fn handle(
807        &mut self,
808        cx: &Context<Self>,
809        message: PySpyDump,
810    ) -> Result<(), anyhow::Error> {
811        PySpyWorker::spawn_and_forward(cx, message.opts, message.result)
812    }
813}
814
815#[async_trait]
816impl Handler<PySpyProfile> for ProcAgent {
817    async fn handle(
818        &mut self,
819        cx: &Context<Self>,
820        message: PySpyProfile,
821    ) -> Result<(), anyhow::Error> {
822        PySpyProfileWorker::spawn_and_forward(cx, message.request, message.result)
823    }
824}
825
826#[async_trait]
827impl Handler<ConfigDump> for ProcAgent {
828    async fn handle(
829        &mut self,
830        cx: &Context<Self>,
831        message: ConfigDump,
832    ) -> Result<(), anyhow::Error> {
833        let entries = hyperactor_config::global::config_entries();
834        // Reply is best-effort: the caller may have timed out and dropped
835        // the once-port.  That must not crash this actor.
836        let _ = message.result.post(cx, ConfigDumpResult { entries });
837        Ok(())
838    }
839}
840
841// Implement the resource behavior for managing actors:
842
/// Actor spec: describes an actor to be spawned on this proc. It is the
/// payload of `resource::CreateOrUpdate<ActorSpec>` messages.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ActorSpec {
    /// Registered actor type; passed to `gspawn` as the type to spawn.
    pub actor_type: String,
    /// Serialized parameters; handed to `gspawn` as-is.
    pub params_data: Data,
}
// Register `ActorSpec` with `wirevalue`.
wirevalue::register_type!(ActorSpec);
852
/// Actor state, as carried by `resource::GetState<ActorState>` and
/// `resource::StreamState<ActorState>` replies.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct ActorState {
    /// The actor's ID.
    pub actor_id: ActorAddr,
    /// The rank of the proc that created the actor. This is before any slicing.
    pub create_rank: usize,
    // TODO status: ActorStatus,
    /// Supervision events associated with the actor.
    /// NOTE(review): how this list is populated is not visible in this
    /// section — confirm against the code that builds `ActorState`.
    pub supervision_events: Vec<ActorSupervisionEvent>,
}
// Register `ActorState` with `wirevalue`.
wirevalue::register_type!(ActorState);
864
865#[async_trait]
866impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcAgent {
867    async fn handle(
868        &mut self,
869        cx: &Context<Self>,
870        create_or_update: resource::CreateOrUpdate<ActorSpec>,
871    ) -> anyhow::Result<()> {
872        if self.actor_states.contains_key(&create_or_update.id) {
873            // There is no update.
874            return Ok(());
875        }
876        let create_rank = create_or_update.rank.unwrap();
877        // If any actor on this proc has error supervision events,
878        // we disallow spawning new actors on it, as this proc may be in an
879        // invalid state.
880        if self.actor_states.values().any(|s| s.has_errors()) {
881            self.actor_states.insert(
882                create_or_update.id.clone(),
883                ActorInstanceState {
884                    spawn: Err(anyhow::anyhow!(
885                        "Cannot spawn new actors on mesh with supervision events"
886                    )),
887                    create_rank,
888                    stop_initiated: false,
889                    supervision_event: None,
890                    subscribers: Vec::new(),
891                    expiry_time: None,
892                    generation: 1,
893                    pending_wait_status: Vec::new(),
894                },
895            );
896            return Ok(());
897        }
898
899        let ActorSpec {
900            actor_type,
901            params_data,
902        } = create_or_update.spec;
903        self.actor_states.insert(
904            create_or_update.id.clone(),
905            ActorInstanceState {
906                create_rank,
907                spawn: self
908                    .remote
909                    .gspawn(
910                        &self.proc,
911                        &actor_type,
912                        create_or_update.id.uid().clone(),
913                        params_data,
914                        cx.headers().clone(),
915                    )
916                    .await,
917                stop_initiated: false,
918                supervision_event: None,
919                subscribers: Vec::new(),
920                expiry_time: None,
921                generation: 1,
922                pending_wait_status: Vec::new(),
923            },
924        );
925
926        let _ = self.publish_introspect_properties(cx);
927        Ok(())
928    }
929}
930
931#[async_trait]
932impl Handler<resource::Stop> for ProcAgent {
933    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
934        let actor_id = match self.actor_states.get_mut(&message.id) {
935            Some(actor_state) => {
936                let id = actor_state.spawn.as_ref().ok().cloned();
937                if id.is_some() && !actor_state.stop_initiated {
938                    actor_state.stop_initiated = true;
939                    actor_state.generation += 1;
940                    actor_state.notify_status_changed(cx, &message.id);
941                    id
942                } else {
943                    None
944                }
945            }
946            None => None,
947        };
948        if let Some(actor_id) = actor_id {
949            self.stop_actor_by_id(&actor_id, &message.reason);
950        }
951
952        Ok(())
953    }
954}
955
956/// Handles `StopAll` by sending stop signals to all child actors.
957/// Process shutdown is deferred until all actors have reached terminal
958/// state, as observed through supervision events.
959#[async_trait]
960impl Handler<resource::StopAll> for ProcAgent {
961    async fn handle(
962        &mut self,
963        _cx: &Context<Self>,
964        message: resource::StopAll,
965    ) -> anyhow::Result<()> {
966        self.stopping_all = true;
967
968        // Send stop signals to all actors that haven't been stopped yet.
969        let to_stop: Vec<ActorAddr> = self
970            .actor_states
971            .values_mut()
972            .filter_map(|state| {
973                if state.stop_initiated {
974                    return None;
975                }
976                state.stop_initiated = true;
977                state.spawn.as_ref().ok().cloned()
978            })
979            .collect();
980
981        for actor_id in &to_stop {
982            self.stop_actor_by_id(actor_id, &message.reason);
983        }
984
985        // If there are no actors to stop, shut down immediately.
986        if self.all_actors_terminal() {
987            self.shutdown().await;
988        }
989
990        Ok(())
991    }
992}
993
994#[async_trait]
995impl Handler<resource::GetRankStatus> for ProcAgent {
996    async fn handle(
997        &mut self,
998        cx: &Context<Self>,
999        get_rank_status: resource::GetRankStatus,
1000    ) -> anyhow::Result<()> {
1001        use crate::StatusOverlay;
1002        use crate::resource::Status;
1003
1004        let (rank, status) = match self.actor_states.get(&get_rank_status.id) {
1005            Some(state) => (state.create_rank, state.status()),
1006            None => (usize::MAX, Status::NotExist),
1007        };
1008
1009        // Send a sparse overlay update. If rank is unknown, emit an
1010        // empty overlay.
1011        let overlay = if rank == usize::MAX {
1012            StatusOverlay::new()
1013        } else {
1014            StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
1015                .expect("valid single-run overlay")
1016        };
1017        get_rank_status.reply.post(cx, overlay);
1018        Ok(())
1019    }
1020}
1021
1022#[async_trait]
1023impl Handler<resource::WaitRankStatus> for ProcAgent {
1024    async fn handle(
1025        &mut self,
1026        cx: &Context<Self>,
1027        msg: resource::WaitRankStatus,
1028    ) -> anyhow::Result<()> {
1029        use crate::StatusOverlay;
1030        use crate::resource::Status;
1031
1032        let (rank, status) = match self.actor_states.get(&msg.id) {
1033            Some(state) => (state.create_rank, state.status()),
1034            None => (usize::MAX, Status::NotExist),
1035        };
1036
1037        // If already at or past the requested threshold, reply immediately.
1038        if status >= msg.min_status || rank == usize::MAX {
1039            let overlay = if rank == usize::MAX {
1040                StatusOverlay::new()
1041            } else {
1042                StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
1043                    .expect("valid single-run overlay")
1044            };
1045            let _ = msg.reply.post(cx, overlay);
1046            return Ok(());
1047        }
1048
1049        // Otherwise, stash the waiter. It will be flushed when the
1050        // status changes (supervision event or stop).
1051        if let Some(state) = self.actor_states.get_mut(&msg.id) {
1052            state.pending_wait_status.push((msg.min_status, msg.reply));
1053        }
1054        Ok(())
1055    }
1056}
1057
1058#[async_trait]
1059impl Handler<resource::GetState<ActorState>> for ProcAgent {
1060    async fn handle(
1061        &mut self,
1062        cx: &Context<Self>,
1063        get_state: resource::GetState<ActorState>,
1064    ) -> anyhow::Result<()> {
1065        let state = match self.actor_states.get(&get_state.id) {
1066            Some(instance) => instance.to_state(&get_state.id),
1067            None => resource::State {
1068                id: get_state.id.clone(),
1069                status: resource::Status::NotExist,
1070                state: None,
1071                generation: 0,
1072                timestamp: std::time::SystemTime::now(),
1073            },
1074        };
1075
1076        get_state.reply.post(cx, state);
1077        Ok(())
1078    }
1079}
1080
1081#[async_trait]
1082impl Handler<resource::StreamState<ActorState>> for ProcAgent {
1083    async fn handle(
1084        &mut self,
1085        cx: &Context<Self>,
1086        stream_state: resource::StreamState<ActorState>,
1087    ) -> anyhow::Result<()> {
1088        let state = match self.actor_states.get_mut(&stream_state.id) {
1089            Some(instance) => {
1090                let state = instance.to_state(&stream_state.id);
1091                instance.subscribers.push(stream_state.subscriber.clone());
1092                state
1093            }
1094            None => resource::State {
1095                id: stream_state.id.clone(),
1096                status: resource::Status::NotExist,
1097                state: None,
1098                generation: 0,
1099                timestamp: std::time::SystemTime::now(),
1100            },
1101        };
1102
1103        // Send the current state immediately.
1104        let mut headers = Flattrs::new();
1105        headers.set(STREAM_STATE_SUBSCRIBER, true);
1106        stream_state
1107            .subscriber
1108            .post_with_headers(cx, headers, state);
1109        Ok(())
1110    }
1111}
1112
1113#[async_trait]
1114impl Handler<resource::KeepaliveGetState<ActorState>> for ProcAgent {
1115    async fn handle(
1116        &mut self,
1117        cx: &Context<Self>,
1118        message: resource::KeepaliveGetState<ActorState>,
1119    ) -> anyhow::Result<()> {
1120        // Same impl as GetState, but additionally update the expiry time on the actor.
1121        if let Ok(instance_state) =
1122            self.actor_states
1123                .get_mut(&message.get_state.id)
1124                .ok_or_else(|| {
1125                    anyhow::anyhow!(
1126                        "attempting to register a keepalive for an actor that doesn't exist: {}",
1127                        message.get_state.id
1128                    )
1129                })
1130        {
1131            instance_state.expiry_time = Some(message.expires_after);
1132        }
1133
1134        // Forward the rest of the impl to GetState.
1135        <Self as Handler<resource::GetState<ActorState>>>::handle(self, cx, message.get_state).await
1136    }
1137}
1138
/// A local handler to get a new client instance on the proc.
/// This is used to create root client instances.
///
/// The reply travels over a `PortHandle`, and the message does not derive
/// `Serialize`/`Deserialize`.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct NewClientInstance {
    /// Reply port on which the newly created `Instance<()>` is posted.
    #[reply]
    pub client_instance: PortHandle<Instance<()>>,
}
1146
1147#[async_trait]
1148impl Handler<NewClientInstance> for ProcAgent {
1149    async fn handle(
1150        &mut self,
1151        cx: &Context<Self>,
1152        NewClientInstance { client_instance }: NewClientInstance,
1153    ) -> anyhow::Result<()> {
1154        let (instance, _handle) = self.proc.client("client")?;
1155        client_instance.post(cx, instance);
1156        Ok(())
1157    }
1158}
1159
/// A handler to get a clone of the proc managed by this agent.
/// This is used to obtain the local proc from a host mesh.
///
/// The reply travels over a `PortHandle`, and the message does not derive
/// `Serialize`/`Deserialize`.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetProc {
    /// Reply port on which the cloned `Proc` is posted.
    #[reply]
    pub proc: PortHandle<Proc>,
}
1167
1168#[async_trait]
1169impl Handler<GetProc> for ProcAgent {
1170    async fn handle(
1171        &mut self,
1172        cx: &Context<Self>,
1173        GetProc { proc }: GetProc,
1174    ) -> anyhow::Result<()> {
1175        proc.post(cx, self.proc.clone());
1176        Ok(())
1177    }
1178}
1179
1180#[async_trait]
1181impl Handler<SelfCheck> for ProcAgent {
1182    async fn handle(&mut self, cx: &Context<Self>, _: SelfCheck) -> anyhow::Result<()> {
1183        // Check each actor's expiry time. If the current time is past the expiry,
1184        // stop the actor. This allows automatic cleanup when a controller disappears
1185        // but owned resources remain. It is important that this check runs on the
1186        // same proc as the child actor itself, since the controller could be dead or
1187        // disconnected.
1188        let Some(duration) = &self.mesh_orphan_timeout else {
1189            return Ok(());
1190        };
1191        let duration = *duration;
1192        let now = std::time::SystemTime::now();
1193
1194        // Collect expired actors before mutating, since stop_actor borrows &mut self.
1195        let expired: Vec<(ResourceId, ActorAddr)> = self
1196            .actor_states
1197            .iter()
1198            .filter_map(|(id, state)| {
1199                let expiry = state.expiry_time?;
1200                // If a stop was already initiated we don't need to do it again.
1201                if now > expiry
1202                    && !state.stop_initiated
1203                    && let Ok(actor_id) = &state.spawn
1204                {
1205                    return Some((id.clone(), actor_id.clone()));
1206                }
1207                None
1208            })
1209            .collect();
1210
1211        if !expired.is_empty() {
1212            tracing::info!(
1213                "stopping {} orphaned actors past their keepalive expiry",
1214                expired.len(),
1215            );
1216        }
1217
1218        for (id, actor_id) in expired {
1219            if let Some(state) = self.actor_states.get_mut(&id) {
1220                state.stop_initiated = true;
1221            }
1222            self.stop_actor_by_id(&actor_id, "orphaned");
1223        }
1224
1225        // Reschedule.
1226        cx.post_after(cx, SelfCheck::default(), duration);
1227        Ok(())
1228    }
1229}
1230
1231#[cfg(test)]
1232mod tests {
1233    use std::sync::Arc;
1234
1235    use hyperactor::ActorRef;
1236
1237    use super::*;
1238
    // A no-op actor used to test direct proc-level spawning.
    // It exports no message handlers (`handlers = []`) and keeps the
    // `Actor` trait's default methods.
    #[derive(Debug, Default, Serialize, Deserialize)]
    #[hyperactor::export(handlers = [])]
    struct ExtraActor;
    impl hyperactor::Actor for ExtraActor {}
    // Registers the actor type as spawnable.
    // NOTE(review): presumably this is what lets the tests resolve it via
    // `Remote::collect().name_of::<ExtraActor>()` — confirm.
    hyperactor::register_spawnable!(ExtraActor);
1245    // Verifies that QueryChild(Addr::Proc) on a ProcAgent returns
1246    // a live IntrospectResult whose children reflect actors spawned
1247    // directly on the proc — i.e. via proc.spawn(), which bypasses the
1248    // gspawn message handler and therefore never triggers
1249    // publish_introspect_properties.
1250    //
1251    // Exercises PA-1 (see mesh_admin module doc).
1252    //
1253    // Regression guard for the bug introduced in 9a08d559: removing
1254    // handle_introspect left publish_introspect_properties as the only
1255    // update path, which missed supervision-spawned actors (e.g. every
1256    // sieve actor after sieve[0]). See also
1257    // mesh_admin::tests::test_proc_children_reflect_directly_spawned_actors.
1258    #[tokio::test]
1259    async fn test_query_child_proc_returns_live_children() {
1260        use hyperactor::Proc;
1261        use hyperactor::actor::ActorStatus;
1262        use hyperactor::channel::ChannelTransport;
1263        use hyperactor::introspect::IntrospectMessage;
1264        use hyperactor::introspect::IntrospectResult;
1265
1266        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1267        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1268
1269        // Wait for ProcAgent to finish init.
1270        agent_handle
1271            .status()
1272            .wait_for(|s| matches!(s, ActorStatus::Idle))
1273            .await
1274            .unwrap();
1275
1276        // Client instance for opening reply ports.
1277        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1278        let (client, _client_handle) = client_proc.client("client").unwrap();
1279
1280        let agent_id: ActorAddr = proc.proc_addr().actor_addr(PROC_AGENT_ACTOR_NAME);
1281        let port = PortRef::<IntrospectMessage>::attest_handler_port(&agent_id);
1282
1283        // Helper: send QueryChild(Proc) and return the payload with a
1284        // timeout so a misrouted reply fails fast rather than hanging.
1285        let query = |client: &hyperactor::Instance<()>| {
1286            let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1287            port.post(
1288                client,
1289                IntrospectMessage::QueryChild {
1290                    child_ref: Addr::Proc(proc.proc_addr().clone()),
1291                    reply: reply_port.bind(),
1292                },
1293            );
1294            reply_rx
1295        };
1296        let recv = |rx: hyperactor::mailbox::OncePortReceiver<IntrospectResult>| async move {
1297            tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv())
1298                .await
1299                .expect("QueryChild(Proc) timed out — reply never delivered")
1300                .expect("reply channel closed")
1301        };
1302
1303        // Initial query: ProcAgent itself should appear in children.
1304        let payload = recv(query(&client)).await;
1305        // Verify this is a proc node by checking attrs contain node_type=proc.
1306        let attrs: hyperactor_config::Attrs =
1307            serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
1308        assert_eq!(
1309            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
1310            Some("proc"),
1311            "expected node_type=proc in attrs, got {:?}",
1312            payload.attrs
1313        );
1314        assert!(
1315            payload
1316                .children
1317                .iter()
1318                .any(|c| c.to_string().contains(PROC_AGENT_ACTOR_NAME)),
1319            "initial children {:?} should contain proc_agent",
1320            payload.children
1321        );
1322        let initial_count = payload.children.len();
1323
1324        // Spawn an actor directly on the proc, bypassing ProcAgent's
1325        // gspawn message handler. This is how supervision-spawned
1326        // actors (e.g. sieve children) are created.
1327        proc.spawn("extra_actor", ExtraActor).unwrap();
1328
1329        // Second query: extra_actor must appear without any republish.
1330        let payload2 = recv(query(&client)).await;
1331        let attrs2: hyperactor_config::Attrs =
1332            serde_json::from_str(&payload2.attrs).expect("valid attrs JSON");
1333        assert_eq!(
1334            attrs2.get(crate::introspect::NODE_TYPE).map(String::as_str),
1335            Some("proc"),
1336            "expected node_type=proc in attrs, got {:?}",
1337            payload2.attrs
1338        );
1339        assert!(
1340            payload2
1341                .children
1342                .iter()
1343                .any(|c| c.to_string().contains("extra_actor")),
1344            "after direct spawn, children {:?} should contain extra_actor",
1345            payload2.children
1346        );
1347        assert!(
1348            payload2.children.len() > initial_count,
1349            "expected at least {} children after direct spawn, got {:?}",
1350            initial_count + 1,
1351            payload2.children
1352        );
1353    }
1354
1355    // Exercises S12 (see introspect module doc): introspection must
1356    // not impair actor liveness. Rapidly spawns and stops
1357    // actors while concurrently querying QueryChild(Addr::Proc).
1358    // The spawn/stop loop must complete within the timeout and the
1359    // iteration count must match -- if DashMap convoy starvation
1360    // blocks the proc, the timeout fires and the test fails.
1361    #[tokio::test]
1362    async fn test_rapid_spawn_stop_does_not_stall_proc_agent() {
1363        use std::sync::Arc;
1364        use std::sync::atomic::AtomicUsize;
1365        use std::sync::atomic::Ordering;
1366
1367        use hyperactor::Proc;
1368        use hyperactor::actor::ActorStatus;
1369        use hyperactor::channel::ChannelTransport;
1370        use hyperactor::introspect::IntrospectMessage;
1371        use hyperactor::introspect::IntrospectResult;
1372
1373        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1374        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1375
1376        agent_handle
1377            .status()
1378            .wait_for(|s| matches!(s, ActorStatus::Idle))
1379            .await
1380            .unwrap();
1381
1382        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1383        let (client, _client_handle) = client_proc.client("client").unwrap();
1384
1385        let agent_id: ActorAddr = proc.proc_addr().actor_addr(PROC_AGENT_ACTOR_NAME);
1386        let port = PortRef::<IntrospectMessage>::attest_handler_port(&agent_id);
1387
1388        // Concurrent query task: send QueryChild(Proc) every 10ms.
1389        let query_client_proc =
1390            Proc::direct(ChannelTransport::Unix.any(), "query_client".to_string()).unwrap();
1391        let (query_client, _qc_handle) = query_client_proc.client("qc").unwrap();
1392        let query_port = port.clone();
1393        let query_proc_id = proc.proc_addr().clone();
1394        let query_count = Arc::new(AtomicUsize::new(0));
1395        let query_count_clone = query_count.clone();
1396        let query_task = tokio::spawn(async move {
1397            loop {
1398                let (reply_port, reply_rx) = query_client.open_once_port::<IntrospectResult>();
1399                query_port.post(
1400                    &query_client,
1401                    IntrospectMessage::QueryChild {
1402                        child_ref: Addr::Proc(query_proc_id.clone()),
1403                        reply: reply_port.bind(),
1404                    },
1405                );
1406                match tokio::time::timeout(std::time::Duration::from_secs(2), reply_rx.recv()).await
1407                {
1408                    Ok(Ok(_)) => {
1409                        query_count_clone.fetch_add(1, Ordering::Relaxed);
1410                    }
1411                    _ => {} // Transient failures expected during churn
1412                }
1413                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1414            }
1415        });
1416
1417        // Rapid spawn/stop loop with liveness timeout.
1418        const ITERATIONS: usize = 200;
1419        let mut completed = 0usize;
1420        let result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1421            for i in 0..ITERATIONS {
1422                let name = format!("churn_{}", i);
1423                let handle = proc.spawn(&name, ExtraActor).unwrap();
1424                let actor_id = handle.actor_addr().clone();
1425                if let Some(mut status) = proc.stop_actor(actor_id.id(), "churn".to_string()) {
1426                    let _ = tokio::time::timeout(
1427                        std::time::Duration::from_secs(5),
1428                        status.wait_for(ActorStatus::is_terminal),
1429                    )
1430                    .await;
1431                }
1432                completed += 1;
1433            }
1434        })
1435        .await;
1436
1437        query_task.abort();
1438        let _ = query_task.await; // Join to suppress noisy panic on drop.
1439
1440        assert!(
1441            result.is_ok(),
1442            "spawn/stop loop stalled after {completed}/{ITERATIONS} iterations — \
1443             DashMap convoy starvation likely"
1444        );
1445        assert_eq!(
1446            completed, ITERATIONS,
1447            "expected {ITERATIONS} completed iterations, got {completed}"
1448        );
1449        assert!(
1450            query_count.load(Ordering::Relaxed) > 0,
1451            "concurrent QueryChild queries never succeeded — query task may not have run"
1452        );
1453
1454        // Final consistency check: QueryChild should still work.
1455        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1456        port.post(
1457            &client,
1458            IntrospectMessage::QueryChild {
1459                child_ref: Addr::Proc(proc.proc_addr().clone()),
1460                reply: reply_port.bind(),
1461            },
1462        );
1463        let final_payload =
1464            tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx.recv())
1465                .await
1466                .expect("final QueryChild timed out")
1467                .expect("final QueryChild channel closed");
1468        let attrs: hyperactor_config::Attrs =
1469            serde_json::from_str(&final_payload.attrs).expect("valid attrs JSON");
1470        assert_eq!(
1471            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
1472            Some("proc"),
1473        );
1474    }
1475
1476    #[tokio::test]
1477    async fn test_stream_state_and_unsubscribe() {
1478        use hyperactor::Proc;
1479        use hyperactor::actor::ActorStatus;
1480        use hyperactor::channel::ChannelTransport;
1481
1482        use crate::resource::CreateOrUpdateClient;
1483        use crate::resource::GetStateClient;
1484        use crate::resource::StopClient;
1485        use crate::resource::StreamStateClient;
1486
1487        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1488        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1489        agent_handle
1490            .status()
1491            .wait_for(|s| matches!(s, ActorStatus::Idle))
1492            .await
1493            .unwrap();
1494
1495        let (client, _client_handle) = proc.client("client").unwrap();
1496        let agent_ref: ActorRef<ProcAgent> = agent_handle.bind();
1497
1498        let actor_type = hyperactor::actor::remote::Remote::collect()
1499            .name_of::<ExtraActor>()
1500            .unwrap()
1501            .to_string();
1502        let actor_params =
1503            bincode::serde::encode_to_vec(&ExtraActor, bincode::config::legacy()).unwrap();
1504        let actor_name = ResourceId::singleton(hyperactor::id::Label::new("test-actor").unwrap());
1505
1506        // 1. Spawn an actor via CreateOrUpdate.
1507        agent_ref
1508            .create_or_update(
1509                &client,
1510                actor_name.clone(),
1511                resource::Rank::new(0),
1512                ActorSpec {
1513                    actor_type: actor_type.clone(),
1514                    params_data: actor_params.clone(),
1515                },
1516            )
1517            .await
1518            .unwrap();
1519
1520        // 2. Subscribe to state updates.
1521        let (sub_port, mut sub_rx) = client.open_port::<resource::State<ActorState>>();
1522        agent_ref
1523            .stream_state(&client, actor_name.clone(), sub_port.bind())
1524            .await
1525            .unwrap();
1526
1527        // 3. Should receive the initial state (Running).
1528        let initial = sub_rx.recv().await.expect("subscriber channel error");
1529        assert_eq!(initial.status, resource::Status::Running);
1530        assert!(initial.state.is_some());
1531
1532        // 4. Send Stop — should receive Stopping.
1533        agent_ref
1534            .stop(&client, actor_name.clone(), "test".to_string())
1535            .await
1536            .unwrap();
1537
1538        let stopping = sub_rx.recv().await.expect("subscriber channel error");
1539        assert_eq!(stopping.status, resource::Status::Stopping);
1540
1541        // 5. Wait for the Stopped supervision event update.
1542        let stopped = sub_rx.recv().await.expect("subscriber channel error");
1543        assert_eq!(stopped.status, resource::Status::Stopped);
1544
1545        // 6. Test implicit unsubscription via undeliverable.
1546        let actor_name_2 =
1547            ResourceId::singleton(hyperactor::id::Label::new("test-actor-2").unwrap());
1548        agent_ref
1549            .create_or_update(
1550                &client,
1551                actor_name_2.clone(),
1552                resource::Rank::new(1),
1553                ActorSpec {
1554                    actor_type: actor_type.clone(),
1555                    params_data: actor_params.clone(),
1556                },
1557            )
1558            .await
1559            .unwrap();
1560
1561        let (sub_port_2, mut sub_rx_2) = client.open_port::<resource::State<ActorState>>();
1562        agent_ref
1563            .stream_state(&client, actor_name_2.clone(), sub_port_2.bind())
1564            .await
1565            .unwrap();
1566
1567        let initial_2 = sub_rx_2.recv().await.expect("subscriber 2 channel error");
1568        assert_eq!(initial_2.status, resource::Status::Running);
1569
1570        // Drop the receiver so the next send bounces as undeliverable.
1571        drop(sub_rx_2);
1572
1573        // Stop the second actor — triggers notify_status_changed to the
1574        // dead subscriber. ProcAgent should handle the undeliverable
1575        // gracefully.
1576        agent_ref
1577            .stop(
1578                &client,
1579                actor_name_2.clone(),
1580                "test unsubscribe".to_string(),
1581            )
1582            .await
1583            .unwrap();
1584
1585        // Wait for actor_2 to reach terminal state via a new stream subscription.
1586        let (sub_port_3, mut sub_rx_3) = client.open_port::<resource::State<ActorState>>();
1587        agent_ref
1588            .stream_state(&client, actor_name_2.clone(), sub_port_3.bind())
1589            .await
1590            .unwrap();
1591        loop {
1592            let state = sub_rx_3.recv().await.expect("subscriber 3 channel error");
1593            if state.status.is_terminating() {
1594                break;
1595            }
1596        }
1597
1598        // Verify ProcAgent is still alive after the undeliverable was handled.
1599        let state = agent_ref
1600            .get_state(&client, actor_name_2.clone())
1601            .await
1602            .unwrap();
1603        assert!(
1604            state.status.is_terminating(),
1605            "expected terminating status, got {:?}",
1606            state.status,
1607        );
1608    }
1609
1610    // ── PD-4/PD-5: live proc-agent queue pressure test ────────
1611
1612    // A blocking actor for inducing queue pressure. Uses a shared
1613    // Notify for the block/unblock protocol since actor messages
1614    // must be Serialize + Clone.
1615    #[derive(Debug, Default, Serialize, Deserialize)]
1616    #[hyperactor::export(handlers = [BlockMsg])]
1617    struct BlockActor {
1618        #[serde(skip)]
1619        gate: Option<Arc<tokio::sync::Notify>>,
1620    }
1621    impl hyperactor::Actor for BlockActor {}
1622
1623    #[derive(
1624        Debug,
1625        Clone,
1626        Serialize,
1627        Deserialize,
1628        Named,
1629        hyperactor::Handler,
1630        hyperactor::HandleClient
1631    )]
1632    enum BlockMsg {
1633        /// Block until the shared Notify fires.
1634        Block(),
1635        /// No-op message to queue behind a blocked Block.
1636        Noop(),
1637    }
1638    wirevalue::register_type!(BlockMsg);
1639
1640    #[async_trait::async_trait]
1641    #[hyperactor::handle(BlockMsg)]
1642    impl BlockMsgHandler for BlockActor {
1643        async fn block(&mut self, _cx: &hyperactor::Context<Self>) -> Result<(), anyhow::Error> {
1644            if let Some(gate) = &self.gate {
1645                gate.notified().await;
1646            }
1647            Ok(())
1648        }
1649        async fn noop(&mut self, _cx: &hyperactor::Context<Self>) -> Result<(), anyhow::Error> {
1650            Ok(())
1651        }
1652    }
1653
1654    // PD-4/PD-5: QueryChild(Proc) returns non-zero queue stats
1655    // while actors are under induced pressure. This proves the
1656    // live proc-agent introspection path carries the queue depth
1657    // signal that the TUI depends on.
1658    //
1659    // Queue depth is an instantaneous snapshot at query time,
1660    // not backlog history.
1661    #[tokio::test]
1662    async fn test_query_child_proc_queue_depth_under_pressure() {
1663        use hyperactor::Proc;
1664        use hyperactor::actor::ActorStatus;
1665        use hyperactor::channel::ChannelTransport;
1666        use hyperactor::introspect::IntrospectMessage;
1667        use hyperactor::introspect::IntrospectResult;
1668
1669        let proc = Proc::direct(ChannelTransport::Unix.any(), "qd_proc".to_string()).unwrap();
1670        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1671
1672        agent_handle
1673            .status()
1674            .wait_for(|s| matches!(s, ActorStatus::Idle))
1675            .await
1676            .unwrap();
1677
1678        let client_proc =
1679            Proc::direct(ChannelTransport::Unix.any(), "qd_client".to_string()).unwrap();
1680        let (client, _client_handle) = client_proc.client("client").unwrap();
1681
1682        // Spawn a blocking actor with a shared gate.
1683        let gate = Arc::new(tokio::sync::Notify::new());
1684        let blocker = proc
1685            .spawn(
1686                "blocker",
1687                BlockActor {
1688                    gate: Some(Arc::clone(&gate)),
1689                },
1690            )
1691            .unwrap();
1692
1693        // Block the actor and queue additional work behind it.
1694        blocker.block(&client).await.unwrap();
1695        // Give the actor time to enter the handler.
1696        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1697        blocker.noop(&client).await.unwrap();
1698        blocker.noop(&client).await.unwrap();
1699
1700        // QueryChild(Proc) — same aggregation logic as mesh-admin
1701        // resolution.
1702        let agent_id: ActorAddr = proc.proc_addr().actor_addr(PROC_AGENT_ACTOR_NAME);
1703        let port = PortRef::<IntrospectMessage>::attest_handler_port(&agent_id);
1704
1705        // Poll until queue stats are non-zero.
1706        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1707        loop {
1708            let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1709            port.post(
1710                &client,
1711                IntrospectMessage::QueryChild {
1712                    child_ref: Addr::Proc(proc.proc_addr().clone()),
1713                    reply: reply_port.bind(),
1714                },
1715            );
1716            let payload = tokio::time::timeout(std::time::Duration::from_secs(3), reply_rx.recv())
1717                .await
1718                .expect("QueryChild timed out")
1719                .expect("reply channel closed");
1720
1721            let attrs: hyperactor_config::Attrs =
1722                serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
1723
1724            let total = attrs
1725                .get(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_TOTAL)
1726                .copied()
1727                .unwrap_or(0);
1728            let max = attrs
1729                .get(crate::introspect::ACTOR_WORK_QUEUE_DEPTH_MAX)
1730                .copied()
1731                .unwrap_or(0);
1732
1733            if total > 0 {
1734                assert!(max > 0, "max should be > 0 when total is {total}");
1735                assert!(max <= total, "PD-1: max ({max}) <= total ({total})");
1736                break;
1737            }
1738
1739            assert!(
1740                tokio::time::Instant::now() < deadline,
1741                "timed out waiting for non-zero queue depth in QueryChild(Proc)",
1742            );
1743            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1744        }
1745
1746        // Unblock the actor.
1747        gate.notify_one();
1748    }
1749}