// hyperactor_mesh/proc_agent.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! The mesh agent actor manages procs in ProcMeshes.
10
11// EnumAsInner generates code that triggers a false positive
12// unused_assignments lint on struct variant fields. #[allow] on the
13// enum itself doesn't propagate into derive-macro-generated code, so
14// the suppression must be at module scope.
15#![allow(unused_assignments)]
16
17use std::collections::HashMap;
18use std::mem::take;
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::sync::RwLock;
22use std::sync::RwLockReadGuard;
23use std::time::Duration;
24
25use async_trait::async_trait;
26use enum_as_inner::EnumAsInner;
27use hyperactor::Actor;
28use hyperactor::ActorHandle;
29use hyperactor::Bind;
30use hyperactor::Context;
31use hyperactor::Data;
32use hyperactor::HandleClient;
33use hyperactor::Handler;
34use hyperactor::Instance;
35use hyperactor::PortHandle;
36use hyperactor::RefClient;
37use hyperactor::Unbind;
38use hyperactor::actor::ActorStatus;
39use hyperactor::actor::remote::Remote;
40use hyperactor::channel;
41use hyperactor::channel::ChannelAddr;
42use hyperactor::mailbox::BoxedMailboxSender;
43use hyperactor::mailbox::DialMailboxRouter;
44use hyperactor::mailbox::IntoBoxedMailboxSender;
45use hyperactor::mailbox::MailboxClient;
46use hyperactor::mailbox::MailboxSender;
47use hyperactor::mailbox::MessageEnvelope;
48use hyperactor::mailbox::Undeliverable;
49use hyperactor::proc::Proc;
50use hyperactor::reference as hyperactor_reference;
51use hyperactor::supervision::ActorSupervisionEvent;
52use hyperactor_config::CONFIG;
53use hyperactor_config::ConfigAttr;
54use hyperactor_config::attrs::declare_attrs;
55use serde::Deserialize;
56use serde::Serialize;
57use typeuri::Named;
58
59use crate::Name;
60use crate::resource;
61
/// Actor name used when spawning the proc agent on user procs.
pub const PROC_AGENT_ACTOR_NAME: &str = "proc_agent";

declare_attrs! {
    /// Whether to self kill actors, procs, and hosts whose owner is not reachable.
    /// A zero duration disables the orphan check (see `boot_v1`, which maps
    /// zero to `None`; `Option<Duration>` cannot be stored directly here).
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_ORPHAN_TIMEOUT".to_string()),
        Some("mesh_orphan_timeout".to_string()),
    ))
    pub attr MESH_ORPHAN_TIMEOUT: Duration = Duration::from_secs(60);
}
73
/// Outcome of a `Gspawn` request, reported on the caller's status port.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
pub enum GspawnResult {
    /// The actor was spawned successfully.
    Success {
        /// Rank of the proc that performed the spawn.
        rank: usize,
        /// Id of the newly spawned actor.
        actor_id: hyperactor_reference::ActorId,
    },
    /// The spawn failed; carries a human-readable error message.
    Error(String),
}
wirevalue::register_type!(GspawnResult);
83
/// Outcome of a `StopActor` request.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub enum StopActorResult {
    /// The actor reached a terminal status within the timeout.
    Success,
    /// The actor did not reach a terminal status within the timeout.
    Timeout,
    /// No actor with the given id was found on this proc.
    NotFound,
}
wirevalue::register_type!(StopActorResult);
91
/// Deferred republish of introspect properties.
///
/// Sent as a short-delay self-message (currently 100ms; see the
/// `ActorSupervisionEvent` handler) so the supervision event
/// handler returns immediately without blocking the ProcAgent
/// message loop. Multiple rapid supervision events (e.g., 4 actors
/// failing simultaneously via broadcast) coalesce into a single
/// republish via the `introspect_dirty` flag.
///
/// Without this, calling `publish_introspect_properties` inline in
/// the supervision handler starves `GetRankStatus` polls from the
/// `ActorMeshController`, preventing `__supervise__` from firing
/// within the test timeout. See D94960791 for the root cause
/// analysis.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
struct RepublishIntrospect;
wirevalue::register_type!(RepublishIntrospect);
108
109/// Collect live actor children and system actor children from the
110/// proc's instance DashMap using `all_instance_keys()` with point
111/// lookups. This avoids the convoy starvation from `all_actor_ids()`
112/// which holds shard read locks while doing heavy per-entry work.
113/// See S12 in `introspect` module doc.
114fn collect_live_children(proc: &hyperactor::Proc) -> (Vec<String>, Vec<String>) {
115    let all_keys = proc.all_instance_keys();
116    let mut children = Vec::with_capacity(all_keys.len());
117    let mut system_children = Vec::new();
118    for id in all_keys {
119        if let Some(cell) = proc.get_instance(&id) {
120            let ref_str = id.to_string();
121            if cell.is_system() {
122                system_children.push(ref_str.clone());
123            }
124            children.push(ref_str);
125        }
126    }
127    (children, system_children)
128}
129
/// Control-plane messages handled by the proc agent.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    Handler,
    HandleClient,
    RefClient,
    Named
)]
pub(crate) enum MeshAgentMessage {
    /// Configure the proc in the mesh.
    Configure {
        /// The rank of this proc in the mesh.
        rank: usize,
        /// The forwarder to send messages to unknown destinations.
        forwarder: ChannelAddr,
        /// The supervisor port to which the agent should report supervision events.
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
        /// An address book to use for direct dialing.
        address_book: HashMap<hyperactor_reference::ProcId, ChannelAddr>,
        /// The agent should write its rank to this port when it successfully
        /// configured.
        configured: hyperactor_reference::PortRef<usize>,
        /// If true, and supervisor is None, record supervision events to be reported
        /// to owning actors later.
        record_supervision_events: bool,
    },

    /// Query the proc's status.
    Status {
        /// The status of the proc, reported as `(rank, healthy)`; the
        /// handler currently replies `(rank, true)` only once configured.
        /// To be replaced with fine-grained lifecycle status,
        /// and to use aggregation.
        status: hyperactor_reference::PortRef<(usize, bool)>,
    },

    /// Spawn an actor on the proc with the provided name.
    Gspawn {
        /// registered actor type
        actor_type: String,
        /// spawned actor name
        actor_name: String,
        /// serialized parameters
        params_data: Data,
        /// reply port; the proc sends the spawn result here (its rank and
        /// the new actor's id on success, or an error message).
        status_port: hyperactor_reference::PortRef<GspawnResult>,
    },

    /// Stop a specific actor on this proc.
    StopActor {
        /// The actor to stop
        actor_id: hyperactor_reference::ActorId,
        /// The timeout for waiting for the actor to stop
        timeout_ms: u64,
        /// The reason for stopping the actor
        reason: String,
        /// The result when trying to stop the actor
        #[reply]
        stopped: hyperactor_reference::OncePortRef<StopActorResult>,
    },
}
191
/// Internal configuration state of the mesh agent.
#[derive(Debug, EnumAsInner, Default)]
enum State {
    /// v0: booted but not yet configured; holds the reconfigurable sender
    /// that will be wired to a real router when `Configure` arrives.
    UnconfiguredV0 {
        sender: ReconfigurableMailboxSender,
    },

    /// v0: fully configured with a rank and an optional supervisor port.
    ConfiguredV0 {
        sender: ReconfigurableMailboxSender,
        rank: usize,
        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
    },

    /// v1/owned path: no rank or supervisor is tracked in this state.
    V1,

    /// Transient placeholder left behind by `std::mem::take` during
    /// reconfiguration; should not be observed otherwise.
    #[default]
    Invalid,
}
210
211impl State {
212    fn rank(&self) -> Option<usize> {
213        match self {
214            State::ConfiguredV0 { rank, .. } => Some(*rank),
215            _ => None,
216        }
217    }
218
219    fn supervisor(&self) -> Option<hyperactor_reference::PortRef<ActorSupervisionEvent>> {
220        match self {
221            State::ConfiguredV0 { supervisor, .. } => supervisor.clone(),
222            _ => None,
223        }
224    }
225}
226
/// Actor state used for v1 API.
#[derive(Debug)]
struct ActorInstanceState {
    /// The rank of the proc that created the actor (before any slicing).
    create_rank: usize,
    /// The outcome of the spawn attempt: the actor's id on success, or the
    /// error that prevented (or disallowed) the spawn.
    spawn: Result<hyperactor_reference::ActorId, anyhow::Error>,
    /// If true, the actor has been stopped. There is no way to restart it, a new
    /// actor must be spawned.
    stopped: bool,
    /// The time at which the actor should be considered expired if no further
    /// keepalive is received. `None` meaning it will never expire.
    expiry_time: Option<std::time::SystemTime>,
}
239
/// Self-message scheduled in `Actor::init` (with delay
/// `mesh_orphan_timeout`) to trigger a check for actors whose keepalive
/// has lapsed. The handler is defined elsewhere in this file — confirm
/// its rescheduling behavior there.
#[derive(
    Clone,
    Debug,
    Default,
    PartialEq,
    Serialize,
    Deserialize,
    Named,
    Bind,
    Unbind
)]
struct SelfCheck {}
252
/// A mesh agent is responsible for managing procs in a [`ProcMesh`].
///
/// ## Supervision event ingestion (remote)
///
/// `ProcAgent` is the *process/rank-local* sink for
/// `ActorSupervisionEvent`s produced by the runtime (actor failures,
/// routing failures, undeliverables, etc.).
///
/// We **export** `ActorSupervisionEvent` as a handler so that other
/// procs—most importantly the process-global root client created by
/// `context()`—can forward undeliverables as supervision
/// events to the *currently active* mesh.
///
/// Without exporting this handler, `ActorSupervisionEvent` cannot be
/// addressed via `ActorRef`/`PortRef` across processes, and the
/// global-root-client undeliverable → supervision pipeline would
/// degrade to log-only behavior (events become undeliverable again or
/// are dropped).
///
/// See GC-1 in `global_context` module doc.
#[hyperactor::export(
    handlers=[
        MeshAgentMessage,
        ActorSupervisionEvent,
        resource::CreateOrUpdate<ActorSpec> { cast = true },
        resource::Stop { cast = true },
        resource::StopAll { cast = true },
        resource::GetState<ActorState> { cast = true },
        resource::KeepaliveGetState<ActorState> { cast = true },
        resource::GetRankStatus { cast = true },
        RepublishIntrospect { cast = true },
    ]
)]
pub struct ProcAgent {
    /// The proc this agent manages.
    proc: Proc,
    /// Registry of remotely spawnable actor types, used by the gspawn paths.
    remote: Remote,
    /// v0/v1 configuration state of the agent.
    state: State,
    /// Actors created and tracked through the resource behavior.
    actor_states: HashMap<Name, ActorInstanceState>,
    /// If true, and supervisor is None, record supervision events to be reported
    /// to owning actors later.
    record_supervision_events: bool,
    /// If record_supervision_events is true, then this will contain the list
    /// of all events that were received, keyed by the affected actor's id.
    supervision_events: HashMap<hyperactor_reference::ActorId, Vec<ActorSupervisionEvent>>,
    /// True when supervision events have arrived but introspect
    /// properties haven't been republished yet.
    introspect_dirty: bool,
    /// If set, the StopAll handler will send the exit code through this
    /// channel instead of calling process::exit directly, allowing the
    /// caller to perform graceful shutdown (e.g. draining the mailbox server).
    shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    /// If set, check for expired actors whose keepalive has lapsed.
    mesh_orphan_timeout: Option<Duration>,
}
308
impl ProcAgent {
    /// Bootstrap a v0 proc: construct a proc backed by a
    /// `ReconfigurableMailboxSender` (wired to a real router later by
    /// `Configure`) and spawn the mesh agent on it under the name
    /// "mesh". Returns the proc together with the agent's handle.
    #[hyperactor::observe_result("MeshAgent")]
    pub(crate) async fn bootstrap(
        proc_id: hyperactor_reference::ProcId,
    ) -> Result<(Proc, ActorHandle<Self>), anyhow::Error> {
        let sender = ReconfigurableMailboxSender::new();
        let proc = Proc::configured(proc_id.clone(), BoxedMailboxSender::new(sender.clone()));

        let agent = ProcAgent {
            proc: proc.clone(),
            remote: Remote::collect(),
            state: State::UnconfiguredV0 { sender },
            actor_states: HashMap::new(),
            record_supervision_events: false,
            supervision_events: HashMap::new(),
            introspect_dirty: false,
            shutdown_tx: None,
            // v0 procs don't have an owner they can check for, so they should
            // never try to kill the children.
            mesh_orphan_timeout: None,
        };
        let handle = proc.spawn::<Self>("mesh", agent)?;
        Ok((proc, handle))
    }

    /// Boot the agent on an existing (v1/owned) proc under
    /// [`PROC_AGENT_ACTOR_NAME`]. In this mode supervision events are
    /// always recorded, and the orphan timeout is read from config
    /// (zero meaning "no timeout").
    pub(crate) fn boot_v1(
        proc: Proc,
        shutdown_tx: Option<tokio::sync::oneshot::Sender<i32>>,
    ) -> Result<ActorHandle<Self>, anyhow::Error> {
        // We can't use Option<Duration> directly in config attrs because AttrValue
        // is not implemented for Option<Duration>. So we use a zero timeout to
        // indicate no timeout.
        let orphan_timeout = hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT);
        let orphan_timeout = if orphan_timeout.is_zero() {
            None
        } else {
            Some(orphan_timeout)
        };
        let agent = ProcAgent {
            proc: proc.clone(),
            remote: Remote::collect(),
            state: State::V1,
            actor_states: HashMap::new(),
            record_supervision_events: true,
            supervision_events: HashMap::new(),
            introspect_dirty: false,
            shutdown_tx,
            mesh_orphan_timeout: orphan_timeout,
        };
        proc.spawn::<Self>(PROC_AGENT_ACTOR_NAME, agent)
    }

    /// Destroy all actors on this proc except the agent itself,
    /// waiting up to `timeout`. Delegates to the proc; see
    /// `Proc::destroy_and_wait_except_current` for the semantics of
    /// the two returned actor-id lists.
    async fn destroy_and_wait_except_current<'a>(
        &mut self,
        cx: &Context<'a, Self>,
        timeout: tokio::time::Duration,
        reason: &str,
    ) -> Result<
        (
            Vec<hyperactor_reference::ActorId>,
            Vec<hyperactor_reference::ActorId>,
        ),
        anyhow::Error,
    > {
        self.proc
            .destroy_and_wait_except_current::<Self>(timeout, Some(cx), true, reason)
            .await
    }

    /// Publish the current proc properties and children list for
    /// introspection. See S12 in `introspect` module doc.
    fn publish_introspect_properties(&self, cx: &impl hyperactor::context::Actor) {
        let (mut children, mut system_children) = collect_live_children(&self.proc);

        // Terminated actors appear as children but don't inflate
        // the actor count. Track them in stopped_children so the
        // TUI can filter/gray without per-child fetches.
        let mut stopped_children: Vec<String> = Vec::new();
        for id in self.proc.all_terminated_actor_ids() {
            let ref_str = id.to_string();
            stopped_children.push(ref_str.clone());
            // Terminated system actors must also appear in
            // system_children for correct filtering.
            if let Some(snapshot) = self.proc.terminated_snapshot(&id) {
                let snapshot_attrs: hyperactor_config::Attrs =
                    serde_json::from_str(&snapshot.attrs).unwrap_or_default();
                if snapshot_attrs
                    .get(hyperactor::introspect::IS_SYSTEM)
                    .copied()
                    .unwrap_or(false)
                {
                    system_children.push(ref_str.clone());
                }
            }
            if !children.contains(&ref_str) {
                children.push(ref_str);
            }
        }

        let stopped_retention_cap =
            hyperactor_config::global::get(hyperactor::config::TERMINATED_SNAPSHOT_RETENTION);

        // FI-5: is_poisoned iff failed_actor_count > 0.
        // NOTE(review): this counts actors with *any* recorded
        // supervision event; the supervision handler also records
        // non-error events, so this may overcount "failed" actors —
        // confirm against the FI-5 intent.
        let failed_actor_count = self.supervision_events.len();

        // Attrs-based introspection.
        let num_live = children.len();
        let mut attrs = hyperactor_config::Attrs::new();
        attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
        attrs.set(
            crate::introspect::PROC_NAME,
            self.proc.proc_id().to_string(),
        );
        attrs.set(crate::introspect::NUM_ACTORS, num_live);
        attrs.set(hyperactor::introspect::CHILDREN, children);
        attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
        attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
        attrs.set(
            crate::introspect::STOPPED_RETENTION_CAP,
            stopped_retention_cap,
        );
        attrs.set(crate::introspect::IS_POISONED, failed_actor_count > 0);
        attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
        cx.instance().publish_attrs(attrs);
    }
}
435
#[async_trait]
impl Actor for ProcAgent {
    /// Initialization: mark the agent as a system actor, register it as
    /// the proc's supervision coordinator, publish the initial
    /// introspect properties, install the `QueryChild` resolver, and —
    /// when an orphan timeout is configured — schedule the first
    /// `SelfCheck`.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        self.proc.set_supervision_coordinator(this.port())?;
        self.publish_introspect_properties(this);

        // Resolve terminated actor snapshots via QueryChild so that
        // dead actors remain directly queryable by reference.
        let proc = self.proc.clone();
        let self_id = this.self_id().clone();
        this.set_query_child_handler(move |child_ref| {
            use hyperactor::introspect::IntrospectResult;

            // A terminated actor queried by id: serve the retained snapshot.
            if let hyperactor::reference::Reference::Actor(id) = child_ref {
                if let Some(snapshot) = proc.terminated_snapshot(id) {
                    return snapshot;
                }
            }

            // PA-1 (ProcAgent path): proc-node children used by
            // admin/TUI must be computed from live proc state at query
            // time, not solely from cached published_properties.
            // Therefore a direct proc.spawn() actor must appear on the
            // next QueryChild(Reference::Proc) response without an
            // extra publish event. See
            // test_query_child_proc_returns_live_children.
            //
            // NOTE(review): the body below largely mirrors
            // `publish_introspect_properties`; it is duplicated because
            // this closure cannot borrow `self`. Keep the two in sync.
            if let hyperactor::reference::Reference::Proc(proc_id) = child_ref {
                if proc_id == proc.proc_id() {
                    let (mut children, mut system_children) = collect_live_children(&proc);

                    // Fold terminated actors into the children and
                    // system lists, tracking them in stopped_children.
                    let mut stopped_children: Vec<String> = Vec::new();
                    for id in proc.all_terminated_actor_ids() {
                        let ref_str = id.to_string();
                        stopped_children.push(ref_str.clone());
                        if let Some(snapshot) = proc.terminated_snapshot(&id) {
                            let snapshot_attrs: hyperactor_config::Attrs =
                                serde_json::from_str(&snapshot.attrs).unwrap_or_default();
                            if snapshot_attrs
                                .get(hyperactor::introspect::IS_SYSTEM)
                                .copied()
                                .unwrap_or(false)
                            {
                                system_children.push(ref_str.clone());
                            }
                        }
                        if !children.contains(&ref_str) {
                            children.push(ref_str);
                        }
                    }

                    let stopped_retention_cap = hyperactor_config::global::get(
                        hyperactor::config::TERMINATED_SNAPSHOT_RETENTION,
                    );

                    // Poison state is read back from the agent's own
                    // published attrs, since the closure has no access
                    // to `self.supervision_events`.
                    let (is_poisoned, failed_actor_count) = proc
                        .get_instance(&self_id)
                        .and_then(|cell| cell.published_attrs())
                        .map(|attrs| {
                            let is_poisoned = attrs
                                .get(crate::introspect::IS_POISONED)
                                .copied()
                                .unwrap_or(false);
                            let failed_actor_count = attrs
                                .get(crate::introspect::FAILED_ACTOR_COUNT)
                                .copied()
                                .unwrap_or(0);
                            (is_poisoned, failed_actor_count)
                        })
                        .unwrap_or((false, 0));

                    // Build attrs for this proc node.
                    let num_live = children.len();
                    let mut attrs = hyperactor_config::Attrs::new();
                    attrs.set(crate::introspect::NODE_TYPE, "proc".to_string());
                    attrs.set(crate::introspect::PROC_NAME, proc_id.to_string());
                    attrs.set(crate::introspect::NUM_ACTORS, num_live);
                    attrs.set(crate::introspect::SYSTEM_CHILDREN, system_children);
                    attrs.set(crate::introspect::STOPPED_CHILDREN, stopped_children);
                    attrs.set(
                        crate::introspect::STOPPED_RETENTION_CAP,
                        stopped_retention_cap,
                    );
                    attrs.set(crate::introspect::IS_POISONED, is_poisoned);
                    attrs.set(crate::introspect::FAILED_ACTOR_COUNT, failed_actor_count);
                    let attrs_json =
                        serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string());

                    return IntrospectResult {
                        identity: proc_id.to_string(),
                        attrs: attrs_json,
                        children,
                        parent: None,
                        as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
                            .to_string(),
                    };
                }
            }

            // Fallback: neither a known terminated actor nor this proc —
            // report a structured not_found error.
            {
                let mut error_attrs = hyperactor_config::Attrs::new();
                error_attrs.set(hyperactor::introspect::ERROR_CODE, "not_found".to_string());
                error_attrs.set(
                    hyperactor::introspect::ERROR_MESSAGE,
                    format!("child {} not found", child_ref),
                );
                IntrospectResult {
                    identity: String::new(),
                    attrs: serde_json::to_string(&error_attrs).unwrap_or_else(|_| "{}".to_string()),
                    children: Vec::new(),
                    parent: None,
                    as_of: humantime::format_rfc3339_millis(std::time::SystemTime::now())
                        .to_string(),
                }
            }
        });

        if let Some(delay) = &self.mesh_orphan_timeout {
            this.self_message_with_delay(SelfCheck::default(), *delay)?;
        }
        Ok(())
    }
}
559
560#[async_trait]
561#[hyperactor::handle(MeshAgentMessage)]
562impl MeshAgentMessageHandler for ProcAgent {
563    async fn configure(
564        &mut self,
565        cx: &Context<Self>,
566        rank: usize,
567        forwarder: ChannelAddr,
568        supervisor: Option<hyperactor_reference::PortRef<ActorSupervisionEvent>>,
569        address_book: HashMap<hyperactor_reference::ProcId, ChannelAddr>,
570        configured: hyperactor_reference::PortRef<usize>,
571        record_supervision_events: bool,
572    ) -> Result<(), anyhow::Error> {
573        anyhow::ensure!(
574            self.state.is_unconfigured_v0(),
575            "mesh agent cannot be (re-)configured"
576        );
577        self.record_supervision_events = record_supervision_events;
578
579        let client = MailboxClient::new(channel::dial(forwarder)?);
580        let router =
581            DialMailboxRouter::new_with_default_direct_addressed_remote_only(client.into_boxed());
582
583        for (proc_id, addr) in address_book {
584            router.bind(proc_id.into(), addr);
585        }
586
587        let sender = take(&mut self.state).into_unconfigured_v0().unwrap();
588        assert!(sender.configure(router.into_boxed()));
589
590        // This is a bit suboptimal: ideally we'd set the supervisor first, to correctly report
591        // any errors that occur during configuration. However, these should anyway be correctly
592        // caught on process exit.
593        self.state = State::ConfiguredV0 {
594            sender,
595            rank,
596            supervisor,
597        };
598        configured.send(cx, rank)?;
599
600        Ok(())
601    }
602
603    async fn gspawn(
604        &mut self,
605        cx: &Context<Self>,
606        actor_type: String,
607        actor_name: String,
608        params_data: Data,
609        status_port: hyperactor_reference::PortRef<GspawnResult>,
610    ) -> Result<(), anyhow::Error> {
611        anyhow::ensure!(
612            self.state.is_configured_v0(),
613            "mesh agent is not v0 configured"
614        );
615        let actor_id = match self
616            .remote
617            .gspawn(
618                &self.proc,
619                &actor_type,
620                &actor_name,
621                params_data,
622                cx.headers().clone(),
623            )
624            .await
625        {
626            Ok(id) => id,
627            Err(err) => {
628                status_port.send(cx, GspawnResult::Error(format!("gspawn failed: {}", err)))?;
629                return Err(anyhow::anyhow!("gspawn failed"));
630            }
631        };
632        status_port.send(
633            cx,
634            GspawnResult::Success {
635                rank: self.state.rank().unwrap(),
636                actor_id,
637            },
638        )?;
639        self.publish_introspect_properties(cx);
640        Ok(())
641    }
642
643    async fn stop_actor(
644        &mut self,
645        cx: &Context<Self>,
646        actor_id: hyperactor_reference::ActorId,
647        timeout_ms: u64,
648        reason: String,
649    ) -> Result<StopActorResult, anyhow::Error> {
650        tracing::info!(
651            name = "StopActor",
652            %actor_id,
653            actor_name = actor_id.name(),
654            %reason,
655        );
656
657        let result = if let Some(mut status) = self.proc.stop_actor(&actor_id, reason) {
658            match tokio::time::timeout(
659                tokio::time::Duration::from_millis(timeout_ms),
660                status.wait_for(|state: &ActorStatus| state.is_terminal()),
661            )
662            .await
663            {
664                Ok(_) => Ok(StopActorResult::Success),
665                Err(_) => Ok(StopActorResult::Timeout),
666            }
667        } else {
668            Ok(StopActorResult::NotFound)
669        };
670        self.publish_introspect_properties(cx);
671        result
672    }
673
674    async fn status(
675        &mut self,
676        cx: &Context<Self>,
677        status_port: hyperactor_reference::PortRef<(usize, bool)>,
678    ) -> Result<(), anyhow::Error> {
679        match &self.state {
680            State::ConfiguredV0 { rank, .. } => {
681                // v0 path: configured with a concrete rank
682                status_port.send(cx, (*rank, true))?;
683                Ok(())
684            }
685            State::UnconfiguredV0 { .. } => {
686                // v0 path but not configured yet
687                Err(anyhow::anyhow!(
688                    "status unavailable: v0 agent not configured (waiting for Configure)"
689                ))
690            }
691            State::V1 => {
692                // v1/owned path does not support status (no rank semantics)
693                Err(anyhow::anyhow!(
694                    "status unsupported in v1/owned path (no rank)"
695                ))
696            }
697            State::Invalid => Err(anyhow::anyhow!(
698                "status unavailable: agent in invalid state"
699            )),
700        }
701    }
702}
703
#[async_trait]
impl Handler<ActorSupervisionEvent> for ProcAgent {
    /// Rank-local supervision sink: record the event (v1 recording mode),
    /// forward it to the configured supervisor (v0), or — when neither
    /// applies — crash the process so the failure is not silently lost.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        event: ActorSupervisionEvent,
    ) -> anyhow::Result<()> {
        if self.record_supervision_events {
            if event.is_error() {
                tracing::warn!(
                    name = "SupervisionEvent",
                    proc_id = %self.proc.proc_id(),
                    %event,
                    "recording supervision error",
                );
            } else {
                tracing::debug!(
                    name = "SupervisionEvent",
                    proc_id = %self.proc.proc_id(),
                    %event,
                    "recording non-error supervision event",
                );
            }
            // Note: both error and non-error events are recorded here, so
            // `supervision_events` keys are "actors with any event".
            self.supervision_events
                .entry(event.actor_id.clone())
                .or_default()
                .push(event.clone());
            // Defer republish so introspection picks up is_poisoned /
            // failed_actor_count without blocking the message loop.
            // Multiple rapid events coalesce into one republish.
            if !self.introspect_dirty {
                self.introspect_dirty = true;
                let _ = cx.self_message_with_delay(
                    RepublishIntrospect,
                    std::time::Duration::from_millis(100),
                );
            }
        }
        if let Some(supervisor) = self.state.supervisor() {
            supervisor.send(cx, event)?;
        } else if !self.record_supervision_events {
            // If there is no supervisor, and nothing is recording these, crash
            // the whole process.
            tracing::error!(
                name = "supervision_event_transmit_failed",
                proc_id = %cx.self_id().proc_id(),
                %event,
                "could not propagate supervision event, crashing",
            );

            // We should have a custom "crash" function here, so that this works
            // in testing of the LocalAllocator, etc.
            std::process::exit(1);
        }
        Ok(())
    }
}
761
762#[async_trait]
763impl Handler<RepublishIntrospect> for ProcAgent {
764    async fn handle(&mut self, cx: &Context<Self>, _: RepublishIntrospect) -> anyhow::Result<()> {
765        if self.introspect_dirty {
766            self.introspect_dirty = false;
767            self.publish_introspect_properties(cx);
768        }
769        Ok(())
770    }
771}
772
773// Implement the resource behavior for managing actors:
774
/// Actor spec: everything needed to spawn an actor through the resource
/// behavior (`CreateOrUpdate<ActorSpec>`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
pub struct ActorSpec {
    /// registered actor type
    pub actor_type: String,
    /// serialized parameters
    pub params_data: Data,
}
wirevalue::register_type!(ActorSpec);
784
/// Actor state, as reported through the resource behavior's GetState
/// queries.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct ActorState {
    /// The actor's ID.
    pub actor_id: hyperactor_reference::ActorId,
    /// The rank of the proc that created the actor. This is before any slicing.
    pub create_rank: usize,
    // TODO status: ActorStatus,
    /// Supervision events recorded for this actor on its proc.
    pub supervision_events: Vec<ActorSupervisionEvent>,
}
wirevalue::register_type!(ActorState);
796
#[async_trait]
impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcAgent {
    /// Create the named actor from its spec. "Update" is not supported:
    /// if the name is already present in the state map, this is a no-op.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        create_or_update: resource::CreateOrUpdate<ActorSpec>,
    ) -> anyhow::Result<()> {
        if self.actor_states.contains_key(&create_or_update.name) {
            // There is no update.
            return Ok(());
        }
        // NOTE(review): assumes the caller always supplies a rank on
        // create; a missing rank panics here — TODO confirm callers
        // guarantee this.
        let create_rank = create_or_update.rank.unwrap();
        // If there have been supervision events for any actors on this proc,
        // we disallow spawning new actors on it, as this proc may be in an
        // invalid state.
        if !self.supervision_events.is_empty() {
            // Record the refusal as a failed spawn so later status
            // queries report it rather than silently dropping the request.
            self.actor_states.insert(
                create_or_update.name.clone(),
                ActorInstanceState {
                    spawn: Err(anyhow::anyhow!(
                        "Cannot spawn new actors on mesh with supervision events"
                    )),
                    create_rank,
                    stopped: false,
                    expiry_time: None,
                },
            );
            return Ok(());
        }

        let ActorSpec {
            actor_type,
            params_data,
        } = create_or_update.spec;
        // Spawn via the remote-actor registry; the spawn Result (success
        // or error) is stored so GetState/GetRankStatus can report it.
        self.actor_states.insert(
            create_or_update.name.clone(),
            ActorInstanceState {
                create_rank,
                spawn: self
                    .remote
                    .gspawn(
                        &self.proc,
                        &actor_type,
                        &create_or_update.name.to_string(),
                        params_data,
                        cx.headers().clone(),
                    )
                    .await,
                stopped: false,
                expiry_time: None,
            },
        );

        self.publish_introspect_properties(cx);
        Ok(())
    }
}
854
855#[async_trait]
856impl Handler<resource::Stop> for ProcAgent {
857    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
858        // We don't remove the actor from the state map, instead we just store
859        // its state as Stopped.
860        let actor = self.actor_states.get_mut(&message.name);
861        // Have to separate stop_actor from setting "stopped" because it borrows
862        // as mutable and cannot have self borrowed mutably twice.
863        let actor_id = match actor {
864            Some(actor_state) => {
865                match &actor_state.spawn {
866                    Ok(actor_id) => {
867                        if actor_state.stopped {
868                            None
869                        } else {
870                            actor_state.stopped = true;
871                            Some(actor_id.clone())
872                        }
873                    }
874                    // If the original spawn had failed, the actor is still considered
875                    // successfully stopped.
876                    Err(_) => None,
877                }
878            }
879            // TODO: represent unknown rank
880            None => None,
881        };
882        let timeout = hyperactor_config::global::get(hyperactor::config::STOP_ACTOR_TIMEOUT);
883        if let Some(actor_id) = actor_id {
884            // The orphaned actor SelfCheck will consult the stopped field before
885            // trying to stop any actor again.
886            // While this function returns a Result, it never returns an Err
887            // value so we can simply expect without any failure handling.
888            self.stop_actor(cx, actor_id, timeout.as_millis() as u64, message.reason)
889                .await
890                .expect("stop_actor cannot fail");
891        }
892
893        Ok(())
894    }
895}
896
/// Handles `StopAll` by coordinating an orderly stop of child actors and
/// then shutting the process down. If a shutdown channel is installed
/// (`shutdown_tx`), the exit code (0 on success, 1 on failure) is sent on
/// it and the handler returns; otherwise it calls
/// `std::process::exit(0/1)` and never returns to the caller. Either way
/// any sender must *not* expect a reply or send any further message, and
/// should watch `ProcStatus` instead.
#[async_trait]
impl Handler<resource::StopAll> for ProcAgent {
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        message: resource::StopAll,
    ) -> anyhow::Result<()> {
        let timeout = hyperactor_config::global::get(hyperactor::config::STOP_ACTOR_TIMEOUT);
        // By passing in the self context, destroy_and_wait will stop this agent
        // last, after all others are stopped.
        let stop_result = self
            .destroy_and_wait_except_current(cx, timeout, &message.reason)
            .await;
        // Exit here to cleanup all remaining resources held by the process.
        // This means ProcAgent will never run cleanup or any other code
        // from exiting its root actor. Senders of this message should never
        // send any further messages or expect a reply.
        match stop_result {
            Ok((stopped_actors, aborted_actors)) => {
                // No need to clean up any state, the process is exiting.
                tracing::info!(
                    actor = %cx.self_id(),
                    "exiting process after receiving StopAll message on ProcAgent. \
                    stopped actors = {:?}, aborted actors = {:?}",
                    stopped_actors.into_iter().map(|a| a.to_string()).collect::<Vec<_>>(),
                    aborted_actors.into_iter().map(|a| a.to_string()).collect::<Vec<_>>(),
                );
                // If a shutdown channel is installed, signal the exit
                // code instead of exiting the process.
                if let Some(tx) = self.shutdown_tx.take() {
                    let _ = tx.send(0);
                    return Ok(());
                }
                std::process::exit(0);
            }
            Err(e) => {
                tracing::error!(actor = %cx.self_id(), "failed to stop all actors on ProcAgent: {:?}", e);
                if let Some(tx) = self.shutdown_tx.take() {
                    let _ = tx.send(1);
                    return Ok(());
                }
                std::process::exit(1);
            }
        }
    }
}
945
#[async_trait]
impl Handler<resource::GetRankStatus> for ProcAgent {
    /// Reply with this proc's contribution to a mesh-wide status
    /// overlay: a single-rank run carrying the named actor's status, or
    /// an empty overlay when the actor (and hence its rank) is unknown.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        get_rank_status: resource::GetRankStatus,
    ) -> anyhow::Result<()> {
        use crate::StatusOverlay;
        use crate::resource::Status;

        // Derive (rank, status): Stopped takes precedence over
        // supervision failures, which take precedence over Running; a
        // failed spawn reports Failed; an unknown name reports NotExist
        // with usize::MAX as the unknown-rank sentinel (matched below).
        let (rank, status) = match self.actor_states.get(&get_rank_status.name) {
            Some(ActorInstanceState {
                spawn: Ok(actor_id),
                create_rank,
                stopped,
                ..
            }) => {
                if *stopped {
                    (*create_rank, resource::Status::Stopped)
                } else {
                    let supervision_events = self
                        .supervision_events
                        .get(actor_id)
                        .map_or_else(Vec::new, |a| a.clone());
                    (
                        *create_rank,
                        if supervision_events.is_empty() {
                            resource::Status::Running
                        } else {
                            resource::Status::Failed(format!(
                                "because of supervision events: {:?}",
                                supervision_events
                            ))
                        },
                    )
                }
            }
            Some(ActorInstanceState {
                spawn: Err(e),
                create_rank,
                ..
            }) => (*create_rank, Status::Failed(e.to_string())),
            // TODO: represent unknown rank
            None => (usize::MAX, Status::NotExist),
        };

        // Send a sparse overlay update. If rank is unknown, emit an
        // empty overlay.
        let overlay = if rank == usize::MAX {
            StatusOverlay::new()
        } else {
            StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
                .expect("valid single-run overlay")
        };
        let result = get_rank_status.reply.send(cx, overlay);
        // Ignore errors, because returning Err from here would cause the ProcAgent
        // to be stopped, which would prevent querying and spawning other actors.
        // This only means some actor that requested the state of an actor failed to receive it.
        if let Err(e) = result {
            tracing::warn!(
                actor = %cx.self_id(),
                "failed to send GetRankStatus reply to {} due to error: {}",
                get_rank_status.reply.port_id().actor_id(),
                e
            );
        }
        Ok(())
    }
}
1015
#[async_trait]
impl Handler<resource::GetState<ActorState>> for ProcAgent {
    /// Reply with the full recorded state of the named actor — its id,
    /// create rank, and supervision events — along with a derived status
    /// (Stopped takes precedence over supervision failures, which take
    /// precedence over Running; a failed spawn reports Failed; an
    /// unknown name reports NotExist).
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        get_state: resource::GetState<ActorState>,
    ) -> anyhow::Result<()> {
        let state = match self.actor_states.get(&get_state.name) {
            Some(ActorInstanceState {
                create_rank,
                spawn: Ok(actor_id),
                stopped,
                ..
            }) => {
                let supervision_events = self
                    .supervision_events
                    .get(actor_id)
                    .map_or_else(Vec::new, |a| a.clone());
                let status = if *stopped {
                    resource::Status::Stopped
                } else if supervision_events.is_empty() {
                    resource::Status::Running
                } else {
                    resource::Status::Failed(format!(
                        "because of supervision events: {:?}",
                        supervision_events
                    ))
                };
                resource::State {
                    name: get_state.name.clone(),
                    status,
                    state: Some(ActorState {
                        actor_id: actor_id.clone(),
                        create_rank: *create_rank,
                        supervision_events,
                    }),
                }
            }
            // The spawn itself failed: report the spawn error as the
            // status, with no actor state.
            Some(ActorInstanceState { spawn: Err(e), .. }) => resource::State {
                name: get_state.name.clone(),
                status: resource::Status::Failed(e.to_string()),
                state: None,
            },
            None => resource::State {
                name: get_state.name.clone(),
                status: resource::Status::NotExist,
                state: None,
            },
        };

        let result = get_state.reply.send(cx, state);
        // Ignore errors, because returning Err from here would cause the ProcAgent
        // to be stopped, which would prevent querying and spawning other actors.
        // This only means some actor that requested the state of an actor failed to receive it.
        if let Err(e) = result {
            tracing::warn!(
                actor = %cx.self_id(),
                "failed to send GetState reply to {} due to error: {}",
                get_state.reply.port_id().actor_id(),
                e
            );
        }
        Ok(())
    }
}
1081
1082#[async_trait]
1083impl Handler<resource::KeepaliveGetState<ActorState>> for ProcAgent {
1084    async fn handle(
1085        &mut self,
1086        cx: &Context<Self>,
1087        message: resource::KeepaliveGetState<ActorState>,
1088    ) -> anyhow::Result<()> {
1089        // Same impl as GetState, but additionally update the expiry time on the actor.
1090        if let Ok(instance_state) = self
1091            .actor_states
1092            .get_mut(&message.get_state.name)
1093            .ok_or_else(|| {
1094                anyhow::anyhow!(
1095                    "attempting to register a keepalive for an actor that doesn't exist: {}",
1096                    message.get_state.name
1097                )
1098            })
1099        {
1100            instance_state.expiry_time = Some(message.expires_after);
1101        }
1102
1103        // Forward the rest of the impl to GetState.
1104        <Self as Handler<resource::GetState<ActorState>>>::handle(self, cx, message.get_state).await
1105    }
1106}
1107
/// A local handler to get a new client instance on the proc.
/// This is used to create root client instances.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct NewClientInstance {
    /// Reply port on which the freshly created client instance is sent.
    #[reply]
    pub client_instance: PortHandle<Instance<()>>,
}
1115
1116#[async_trait]
1117impl Handler<NewClientInstance> for ProcAgent {
1118    async fn handle(
1119        &mut self,
1120        cx: &Context<Self>,
1121        NewClientInstance { client_instance }: NewClientInstance,
1122    ) -> anyhow::Result<()> {
1123        let (instance, _handle) = self.proc.instance("client")?;
1124        client_instance.send(cx, instance)?;
1125        Ok(())
1126    }
1127}
1128
/// A handler to get a clone of the proc managed by this agent.
/// This is used to obtain the local proc from a host mesh.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetProc {
    /// Reply port on which the proc clone is sent.
    #[reply]
    pub proc: PortHandle<Proc>,
}
1136
1137#[async_trait]
1138impl Handler<GetProc> for ProcAgent {
1139    async fn handle(
1140        &mut self,
1141        cx: &Context<Self>,
1142        GetProc { proc }: GetProc,
1143    ) -> anyhow::Result<()> {
1144        proc.send(cx, self.proc.clone())?;
1145        Ok(())
1146    }
1147}
1148
1149#[async_trait]
1150impl Handler<SelfCheck> for ProcAgent {
1151    async fn handle(&mut self, cx: &Context<Self>, _: SelfCheck) -> anyhow::Result<()> {
1152        // Check each actor's expiry time. If the current time is past the expiry,
1153        // stop the actor. This allows automatic cleanup when a controller disappears
1154        // but owned resources remain. It is important that this check runs on the
1155        // same proc as the child actor itself, since the controller could be dead or
1156        // disconnected.
1157        let Some(duration) = &self.mesh_orphan_timeout else {
1158            return Ok(());
1159        };
1160        let duration = duration.clone();
1161        let now = std::time::SystemTime::now();
1162
1163        // Collect expired actors before mutating, since stop_actor borrows &mut self.
1164        let expired: Vec<(Name, hyperactor_reference::ActorId)> = self
1165            .actor_states
1166            .iter()
1167            .filter_map(|(name, state)| {
1168                let expiry = state.expiry_time?;
1169                // If the actor was already stopped we don't need to stop it again.
1170                if now > expiry && !state.stopped {
1171                    if let Ok(actor_id) = &state.spawn {
1172                        return Some((name.clone(), actor_id.clone()));
1173                    }
1174                }
1175                None
1176            })
1177            .collect();
1178
1179        if !expired.is_empty() {
1180            tracing::info!(
1181                "stopping {} orphaned actors past their keepalive expiry",
1182                expired.len(),
1183            );
1184        }
1185
1186        let timeout = hyperactor_config::global::get(hyperactor::config::STOP_ACTOR_TIMEOUT);
1187        for (name, actor_id) in expired {
1188            if let Some(state) = self.actor_states.get_mut(&name) {
1189                state.stopped = true;
1190            }
1191            self.stop_actor(
1192                cx,
1193                actor_id,
1194                timeout.as_millis() as u64,
1195                "orphaned".to_string(),
1196            )
1197            .await
1198            .expect("stop_actor cannot fail");
1199        }
1200
1201        // Reschedule.
1202        cx.self_message_with_delay(SelfCheck::default(), duration)?;
1203        Ok(())
1204    }
1205}
1206
/// A mailbox sender that initially queues messages, and then relays them to
/// an underlying sender once configured.
#[derive(Clone)]
pub(crate) struct ReconfigurableMailboxSender {
    // Shared so clones observe the same queue / configured sender.
    state: Arc<RwLock<ReconfigurableMailboxSenderState>>,
}
1213
impl std::fmt::Debug for ReconfigurableMailboxSender {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Not super helpful, but we definitely don't want to acquire any
        // locks in a Debug formatter.
        f.debug_struct("ReconfigurableMailboxSender").finish()
    }
}
1221
/// A capability wrapper granting access to the configured mailbox
/// sender.
///
/// This type exists to tie the lifetime of any `&BoxedMailboxSender`
/// reference to a lock guard, so the underlying state cannot be
/// reconfigured while the reference is in use.
///
/// A **read** guard is sufficient because we only need to *observe*
/// and borrow the configured sender, not mutate state. While a
/// `RwLockReadGuard` is held, `configure()` cannot acquire the write
/// lock, so the state cannot transition from `Configured(..)` to any
/// other variant during the guard's lifetime.
pub(crate) struct ReconfigurableMailboxSenderInner<'a> {
    // Read guard over the shared state; see the type-level docs above.
    guard: RwLockReadGuard<'a, ReconfigurableMailboxSenderState>,
}
1237
impl<'a> ReconfigurableMailboxSenderInner<'a> {
    /// Returns the configured sender, or `None` if the underlying state
    /// is still queueing.
    pub(crate) fn as_configured(&self) -> Option<&BoxedMailboxSender> {
        self.guard.as_configured()
    }
}
1243
/// A queued post: the message envelope plus the return handle used for
/// undeliverable messages.
type Post = (MessageEnvelope, PortHandle<Undeliverable<MessageEnvelope>>);

/// State of a [`ReconfigurableMailboxSender`]: posts buffer up while
/// queueing, and are relayed directly once configured.
#[derive(EnumAsInner, Debug)]
enum ReconfigurableMailboxSenderState {
    /// Not yet configured: posts are buffered in arrival order.
    Queueing(Mutex<Vec<Post>>),
    /// Configured: posts are forwarded directly to this sender.
    Configured(BoxedMailboxSender),
}
1251
impl ReconfigurableMailboxSender {
    /// Creates a new sender in the queueing state: posts are buffered
    /// until `configure` installs a real sender.
    pub(crate) fn new() -> Self {
        Self {
            state: Arc::new(RwLock::new(ReconfigurableMailboxSenderState::Queueing(
                Mutex::new(Vec::new()),
            ))),
        }
    }

    /// Configure this mailbox with the provided sender. This will first
    /// enqueue any pending messages onto the sender; future messages are
    /// posted directly to the configured sender.
    ///
    /// Returns `true` on the first successful configuration and `false`
    /// if a sender was already configured (in which case `sender` is
    /// dropped).
    pub(crate) fn configure(&self, sender: BoxedMailboxSender) -> bool {
        // Hold the write lock until all queued messages are flushed.
        let mut state = self.state.write().unwrap();
        if state.is_configured() {
            return false;
        }

        // Install the configured sender exactly once.
        let queued = std::mem::replace(
            &mut *state,
            ReconfigurableMailboxSenderState::Configured(sender),
        );

        // Borrow the configured sender from the state (stable while
        // we hold the lock).
        let configured_sender = state.as_configured().expect("just configured");

        // Flush the old queue while still holding the write lock, so no
        // concurrent post can slip in between flush and configuration.
        for (envelope, return_handle) in queued.into_queueing().unwrap().into_inner().unwrap() {
            configured_sender.post(envelope, return_handle);
        }

        true
    }

    /// Returns a guard granting access to the configured sender, or an
    /// error if this mailbox is still queueing.
    pub(crate) fn as_inner<'a>(
        &'a self,
    ) -> Result<ReconfigurableMailboxSenderInner<'a>, anyhow::Error> {
        let state = self.state.read().unwrap();
        if state.is_configured() {
            Ok(ReconfigurableMailboxSenderInner { guard: state })
        } else {
            Err(anyhow::anyhow!("cannot get inner sender: not configured"))
        }
    }
}
1300
1301impl MailboxSender for ReconfigurableMailboxSender {
1302    fn post(
1303        &self,
1304        envelope: MessageEnvelope,
1305        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1306    ) {
1307        match &*self.state.read().unwrap() {
1308            ReconfigurableMailboxSenderState::Queueing(queue) => {
1309                queue.lock().unwrap().push((envelope, return_handle));
1310            }
1311            ReconfigurableMailboxSenderState::Configured(sender) => {
1312                sender.post(envelope, return_handle);
1313            }
1314        }
1315    }
1316
1317    fn post_unchecked(
1318        &self,
1319        envelope: MessageEnvelope,
1320        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
1321    ) {
1322        match &*self.state.read().unwrap() {
1323            ReconfigurableMailboxSenderState::Queueing(queue) => {
1324                queue.lock().unwrap().push((envelope, return_handle));
1325            }
1326            ReconfigurableMailboxSenderState::Configured(sender) => {
1327                sender.post_unchecked(envelope, return_handle);
1328            }
1329        }
1330    }
1331}
1332
1333#[cfg(test)]
1334mod tests {
1335    use std::sync::Arc;
1336    use std::sync::Mutex;
1337
1338    use hyperactor::mailbox::BoxedMailboxSender;
1339    use hyperactor::mailbox::Mailbox;
1340    use hyperactor::mailbox::MailboxSender;
1341    use hyperactor::mailbox::MessageEnvelope;
1342    use hyperactor::mailbox::PortHandle;
1343    use hyperactor::mailbox::Undeliverable;
1344    use hyperactor::testing::ids::test_actor_id;
1345    use hyperactor::testing::ids::test_port_id;
1346    use hyperactor_config::Flattrs;
1347
1348    use super::*;
1349
    // A test sender that records every posted envelope so assertions can
    // inspect delivery contents and order.
    #[derive(Debug, Clone)]
    struct QueueingMailboxSender {
        messages: Arc<Mutex<Vec<MessageEnvelope>>>,
    }
1354
1355    impl QueueingMailboxSender {
1356        fn new() -> Self {
1357            Self {
1358                messages: Arc::new(Mutex::new(Vec::new())),
1359            }
1360        }
1361
1362        fn get_messages(&self) -> Vec<MessageEnvelope> {
1363            self.messages.lock().unwrap().clone()
1364        }
1365    }
1366
    impl MailboxSender for QueueingMailboxSender {
        // Only `post_unchecked` is overridden here; presumably the
        // trait's `post` routes through it by default — TODO confirm
        // against the `MailboxSender` trait definition.
        fn post_unchecked(
            &self,
            envelope: MessageEnvelope,
            _return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
        ) {
            self.messages.lock().unwrap().push(envelope);
        }
    }
1376
1377    // Helper function to create a test message envelope
1378    fn envelope(data: u64) -> MessageEnvelope {
1379        MessageEnvelope::serialize(
1380            test_actor_id("world_0", "sender"),
1381            test_port_id("world_0", "receiver", 1),
1382            &data,
1383            Flattrs::new(),
1384        )
1385        .unwrap()
1386    }
1387
1388    fn return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
1389        let mbox = Mailbox::new_detached(test_actor_id("0", "test"));
1390        let (port, _receiver) = mbox.open_port::<Undeliverable<MessageEnvelope>>();
1391        port
1392    }
1393
1394    #[test]
1395    fn test_queueing_before_configure() {
1396        let sender = ReconfigurableMailboxSender::new();
1397
1398        let test_sender = QueueingMailboxSender::new();
1399        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1400
1401        let return_handle = return_handle();
1402        sender.post(envelope(1), return_handle.clone());
1403        sender.post(envelope(2), return_handle.clone());
1404
1405        assert_eq!(test_sender.get_messages().len(), 0);
1406
1407        sender.configure(boxed_sender);
1408
1409        let messages = test_sender.get_messages();
1410        assert_eq!(messages.len(), 2);
1411
1412        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 1);
1413        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 2);
1414    }
1415
1416    #[test]
1417    fn test_direct_delivery_after_configure() {
1418        // Create a ReconfigurableMailboxSender
1419        let sender = ReconfigurableMailboxSender::new();
1420
1421        let test_sender = QueueingMailboxSender::new();
1422        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1423        sender.configure(boxed_sender);
1424
1425        let return_handle = return_handle();
1426        sender.post(envelope(3), return_handle.clone());
1427        sender.post(envelope(4), return_handle.clone());
1428
1429        let messages = test_sender.get_messages();
1430        assert_eq!(messages.len(), 2);
1431
1432        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 3);
1433        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 4);
1434    }
1435
1436    #[test]
1437    fn test_multiple_configurations() {
1438        let sender = ReconfigurableMailboxSender::new();
1439        let boxed_sender = BoxedMailboxSender::new(QueueingMailboxSender::new());
1440
1441        assert!(sender.configure(boxed_sender.clone()));
1442        assert!(!sender.configure(boxed_sender));
1443    }
1444
1445    #[test]
1446    fn test_mixed_queueing_and_direct_delivery() {
1447        let sender = ReconfigurableMailboxSender::new();
1448
1449        let test_sender = QueueingMailboxSender::new();
1450        let boxed_sender = BoxedMailboxSender::new(test_sender.clone());
1451
1452        let return_handle = return_handle();
1453        sender.post(envelope(5), return_handle.clone());
1454        sender.post(envelope(6), return_handle.clone());
1455
1456        sender.configure(boxed_sender);
1457
1458        sender.post(envelope(7), return_handle.clone());
1459        sender.post(envelope(8), return_handle.clone());
1460
1461        let messages = test_sender.get_messages();
1462        assert_eq!(messages.len(), 4);
1463
1464        assert_eq!(messages[0].deserialized::<u64>().unwrap(), 5);
1465        assert_eq!(messages[1].deserialized::<u64>().unwrap(), 6);
1466        assert_eq!(messages[2].deserialized::<u64>().unwrap(), 7);
1467        assert_eq!(messages[3].deserialized::<u64>().unwrap(), 8);
1468    }
1469
    // A no-op actor used to test direct proc-level spawning.
    // It exports no handlers; spawning it only registers a new child on
    // the proc.
    #[derive(Debug, Default)]
    #[hyperactor::export(handlers = [])]
    struct ExtraActor;
    impl hyperactor::Actor for ExtraActor {}
1475
    // Verifies that QueryChild(Reference::Proc) on a ProcAgent returns
    // a live IntrospectResult whose children reflect actors spawned
    // directly on the proc — i.e. via proc.spawn(), which bypasses the
    // gspawn message handler and therefore never triggers
    // publish_introspect_properties.
    //
    // Exercises PA-1 (see mesh_admin module doc).
    //
    // Regression guard for the bug introduced in 9a08d559: removing
    // handle_introspect left publish_introspect_properties as the only
    // update path, which missed supervision-spawned actors (e.g. every
    // sieve actor after sieve[0]). See also
    // mesh_admin::tests::test_proc_children_reflect_directly_spawned_actors.
    #[tokio::test]
    async fn test_query_child_proc_returns_live_children() {
        use hyperactor::Proc;
        use hyperactor::actor::ActorStatus;
        use hyperactor::channel::ChannelTransport;
        use hyperactor::introspect::IntrospectMessage;
        use hyperactor::introspect::IntrospectResult;
        use hyperactor::reference as hyperactor_reference;

        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();

        // Wait for ProcAgent to finish init.
        agent_handle
            .status()
            .wait_for(|s| matches!(s, ActorStatus::Idle))
            .await
            .unwrap();

        // Client instance for opening reply ports.
        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
        let (client, _client_handle) = client_proc.instance("client").unwrap();

        let agent_id = proc.proc_id().actor_id(PROC_AGENT_ACTOR_NAME, 0);
        let port =
            hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(&agent_id);

        // Helper: send QueryChild(Proc) and return the payload with a
        // timeout so a misrouted reply fails fast rather than hanging.
        let query = |client: &hyperactor::Instance<()>| {
            let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
            port.send(
                client,
                IntrospectMessage::QueryChild {
                    child_ref: hyperactor_reference::Reference::Proc(proc.proc_id().clone()),
                    reply: reply_port.bind(),
                },
            )
            .unwrap();
            reply_rx
        };
        let recv = |rx: hyperactor::mailbox::OncePortReceiver<IntrospectResult>| async move {
            tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv())
                .await
                .expect("QueryChild(Proc) timed out — reply never delivered")
                .expect("reply channel closed")
        };

        // Initial query: ProcAgent itself should appear in children.
        let payload = recv(query(&client)).await;
        // Verify this is a proc node by checking attrs contain node_type=proc.
        let attrs: hyperactor_config::Attrs =
            serde_json::from_str(&payload.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
            "expected node_type=proc in attrs, got {:?}",
            payload.attrs
        );
        assert!(
            payload
                .children
                .iter()
                .any(|c| c.contains(PROC_AGENT_ACTOR_NAME)),
            "initial children {:?} should contain proc_agent",
            payload.children
        );
        // Baseline child count, used below to detect growth after the
        // direct spawn.
        let initial_count = payload.children.len();

        // Spawn an actor directly on the proc, bypassing ProcAgent's
        // gspawn message handler. This is how supervision-spawned
        // actors (e.g. sieve children) are created.
        proc.spawn("extra_actor", ExtraActor).unwrap();

        // Second query: extra_actor must appear without any republish.
        let payload2 = recv(query(&client)).await;
        let attrs2: hyperactor_config::Attrs =
            serde_json::from_str(&payload2.attrs).expect("valid attrs JSON");
        assert_eq!(
            attrs2.get(crate::introspect::NODE_TYPE).map(String::as_str),
            Some("proc"),
            "expected node_type=proc in attrs, got {:?}",
            payload2.attrs
        );
        assert!(
            payload2.children.iter().any(|c| c.contains("extra_actor")),
            "after direct spawn, children {:?} should contain extra_actor",
            payload2.children
        );
        assert!(
            payload2.children.len() > initial_count,
            "expected at least {} children after direct spawn, got {:?}",
            initial_count + 1,
            payload2.children
        );
    }
1585
1586    // Exercises S12 (see introspect module doc): introspection must
1587    // not impair actor liveness. Rapidly spawns and stops
1588    // actors while concurrently querying QueryChild(Reference::Proc).
1589    // The spawn/stop loop must complete within the timeout and the
1590    // iteration count must match -- if DashMap convoy starvation
1591    // blocks the proc, the timeout fires and the test fails.
1592    #[tokio::test]
1593    async fn test_rapid_spawn_stop_does_not_stall_proc_agent() {
1594        use std::sync::Arc;
1595        use std::sync::atomic::AtomicUsize;
1596        use std::sync::atomic::Ordering;
1597
1598        use hyperactor::Proc;
1599        use hyperactor::actor::ActorStatus;
1600        use hyperactor::channel::ChannelTransport;
1601        use hyperactor::introspect::IntrospectMessage;
1602        use hyperactor::introspect::IntrospectResult;
1603        use hyperactor::reference as hyperactor_reference;
1604
1605        let proc = Proc::direct(ChannelTransport::Unix.any(), "test_proc".to_string()).unwrap();
1606        let agent_handle = ProcAgent::boot_v1(proc.clone(), None).unwrap();
1607
1608        agent_handle
1609            .status()
1610            .wait_for(|s| matches!(s, ActorStatus::Idle))
1611            .await
1612            .unwrap();
1613
1614        let client_proc = Proc::direct(ChannelTransport::Unix.any(), "client".to_string()).unwrap();
1615        let (client, _client_handle) = client_proc.instance("client").unwrap();
1616
1617        let agent_id = proc.proc_id().actor_id(PROC_AGENT_ACTOR_NAME, 0);
1618        let port =
1619            hyperactor_reference::PortRef::<IntrospectMessage>::attest_message_port(&agent_id);
1620
1621        // Concurrent query task: send QueryChild(Proc) every 10ms.
1622        let query_client_proc =
1623            Proc::direct(ChannelTransport::Unix.any(), "query_client".to_string()).unwrap();
1624        let (query_client, _qc_handle) = query_client_proc.instance("qc").unwrap();
1625        let query_port = port.clone();
1626        let query_proc_id = proc.proc_id().clone();
1627        let query_count = Arc::new(AtomicUsize::new(0));
1628        let query_count_clone = query_count.clone();
1629        let query_task = tokio::spawn(async move {
1630            loop {
1631                let (reply_port, reply_rx) = query_client.open_once_port::<IntrospectResult>();
1632                if query_port
1633                    .send(
1634                        &query_client,
1635                        IntrospectMessage::QueryChild {
1636                            child_ref: hyperactor_reference::Reference::Proc(query_proc_id.clone()),
1637                            reply: reply_port.bind(),
1638                        },
1639                    )
1640                    .is_err()
1641                {
1642                    break;
1643                }
1644                match tokio::time::timeout(std::time::Duration::from_secs(2), reply_rx.recv()).await
1645                {
1646                    Ok(Ok(_)) => {
1647                        query_count_clone.fetch_add(1, Ordering::Relaxed);
1648                    }
1649                    _ => {} // Transient failures expected during churn
1650                }
1651                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1652            }
1653        });
1654
1655        // Rapid spawn/stop loop with liveness timeout.
1656        const ITERATIONS: usize = 200;
1657        let mut completed = 0usize;
1658        let result = tokio::time::timeout(std::time::Duration::from_secs(30), async {
1659            for i in 0..ITERATIONS {
1660                let name = format!("churn_{}", i);
1661                let handle = proc.spawn(&name, ExtraActor).unwrap();
1662                let actor_id = handle.actor_id().clone();
1663                if let Some(mut status) = proc.stop_actor(&actor_id, "churn".to_string()) {
1664                    let _ = tokio::time::timeout(
1665                        std::time::Duration::from_secs(5),
1666                        status.wait_for(ActorStatus::is_terminal),
1667                    )
1668                    .await;
1669                }
1670                completed += 1;
1671            }
1672        })
1673        .await;
1674
1675        query_task.abort();
1676        let _ = query_task.await; // Join to suppress noisy panic on drop.
1677
1678        assert!(
1679            result.is_ok(),
1680            "spawn/stop loop stalled after {completed}/{ITERATIONS} iterations — \
1681             DashMap convoy starvation likely"
1682        );
1683        assert_eq!(
1684            completed, ITERATIONS,
1685            "expected {ITERATIONS} completed iterations, got {completed}"
1686        );
1687        assert!(
1688            query_count.load(Ordering::Relaxed) > 0,
1689            "concurrent QueryChild queries never succeeded — query task may not have run"
1690        );
1691
1692        // Final consistency check: QueryChild should still work.
1693        let (reply_port, reply_rx) = client.open_once_port::<IntrospectResult>();
1694        port.send(
1695            &client,
1696            IntrospectMessage::QueryChild {
1697                child_ref: hyperactor_reference::Reference::Proc(proc.proc_id().clone()),
1698                reply: reply_port.bind(),
1699            },
1700        )
1701        .unwrap();
1702        let final_payload =
1703            tokio::time::timeout(std::time::Duration::from_secs(5), reply_rx.recv())
1704                .await
1705                .expect("final QueryChild timed out")
1706                .expect("final QueryChild channel closed");
1707        let attrs: hyperactor_config::Attrs =
1708            serde_json::from_str(&final_payload.attrs).expect("valid attrs JSON");
1709        assert_eq!(
1710            attrs.get(crate::introspect::NODE_TYPE).map(String::as_str),
1711            Some("proc"),
1712        );
1713    }
1714}