hyperactor/
proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module provides [`Proc`], which is the runtime used within a single
10//! proc.
11
12// TODO: define a set of proc errors and plumb these throughout
13
14use std::any::Any;
15use std::any::TypeId;
16use std::collections::HashMap;
17use std::fmt;
18use std::future::Future;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::ops::Deref;
22use std::panic;
23use std::panic::AssertUnwindSafe;
24use std::panic::Location;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::sync::OnceLock;
28use std::sync::Weak;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicUsize;
31use std::sync::atomic::Ordering;
32use std::time::Duration;
33use std::time::SystemTime;
34
35use async_trait::async_trait;
36use dashmap::DashMap;
37use dashmap::mapref::entry::Entry;
38use dashmap::mapref::multiple::RefMulti;
39use futures::FutureExt;
40use hyperactor_macros::AttrValue;
41use hyperactor_macros::Named;
42use hyperactor_telemetry::recorder;
43use hyperactor_telemetry::recorder::Recording;
44use serde::Deserialize;
45use serde::Serialize;
46use tokio::sync::mpsc;
47use tokio::sync::watch;
48use tokio::task::JoinHandle;
49use tracing::Instrument;
50use tracing::Level;
51use tracing::Span;
52use uuid::Uuid;
53
54use crate as hyperactor;
55use crate::Actor;
56use crate::ActorRef;
57use crate::Handler;
58use crate::Message;
59use crate::Named as _;
60use crate::RemoteMessage;
61use crate::actor::ActorError;
62use crate::actor::ActorErrorKind;
63use crate::actor::ActorHandle;
64use crate::actor::ActorStatus;
65use crate::actor::Binds;
66use crate::actor::Referable;
67use crate::actor::RemoteHandles;
68use crate::actor::Signal;
69use crate::attrs::Attrs;
70use crate::channel;
71use crate::channel::ChannelAddr;
72use crate::channel::ChannelError;
73use crate::clock::Clock;
74use crate::clock::ClockKind;
75use crate::clock::RealClock;
76use crate::config;
77use crate::context;
78use crate::data::Serialized;
79use crate::data::TypeInfo;
80use crate::declare_attrs;
81use crate::mailbox::BoxedMailboxSender;
82use crate::mailbox::DeliveryError;
83use crate::mailbox::DialMailboxRouter;
84use crate::mailbox::IntoBoxedMailboxSender as _;
85use crate::mailbox::Mailbox;
86use crate::mailbox::MailboxMuxer;
87use crate::mailbox::MailboxSender;
88use crate::mailbox::MailboxServer as _;
89use crate::mailbox::MessageEnvelope;
90use crate::mailbox::OncePortHandle;
91use crate::mailbox::OncePortReceiver;
92use crate::mailbox::PanickingMailboxSender;
93use crate::mailbox::PortHandle;
94use crate::mailbox::PortReceiver;
95use crate::mailbox::Undeliverable;
96use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
97use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
98use crate::metrics::ACTOR_MESSAGES_RECEIVED;
99use crate::ordering::OrderedSender;
100use crate::ordering::OrderedSenderError;
101use crate::ordering::Sequencer;
102use crate::ordering::ordered_channel;
103use crate::panic_handler;
104use crate::reference::ActorId;
105use crate::reference::Index;
106use crate::reference::PortId;
107use crate::reference::ProcId;
108use crate::reference::id;
109use crate::supervision::ActorSupervisionEvent;
110
/// This is used to mint new local ranks for [`Proc::local`].
///
/// Each call to `Proc::local` takes the current value as the rank of the new
/// proc and increments the counter, so ranks start at 0 and are never reused
/// within a process.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
113
/// A proc instance is the runtime managing a single proc in Hyperactor.
/// It is responsible for spawning actors in the proc, multiplexing messages
/// to/within actors in the proc, and providing fallback routing to external
/// procs.
///
/// Procs are also responsible for maintaining the local supervision hierarchy.
///
/// `Proc` is a handle: cloning it clones the `Arc` around the shared state,
/// so every clone refers to the same underlying proc.
#[derive(Clone, Debug)]
pub struct Proc {
    /// State shared by all clones of this handle.
    inner: Arc<ProcState>,
}
124
/// The state shared by every clone of a [`Proc`].
#[derive(Debug)]
struct ProcState {
    /// The proc's id. This should be globally unique, but is not (yet)
    /// for local-only procs.
    proc_id: ProcId,

    /// A muxer instance that has entries for every actor managed by
    /// the proc.
    proc_muxer: MailboxMuxer,

    /// Sender used to forward messages outside of the proc.
    forwarder: BoxedMailboxSender,

    /// All of the roots (i.e., named actors with pid=0) in the proc.
    /// These are also known as "global actors", since they may be
    /// spawned remotely.
    ///
    /// The map is keyed by root name. The counter is the next pid handed
    /// out to a child allocated under that root; it starts at 1.
    roots: DashMap<String, AtomicUsize>,

    /// Keep track of all of the active actors in the proc.
    ledger: ActorLedger,

    /// Weak references to instance cells, keyed by actor id. Consulted by
    /// `Proc::resolve_actor_ref` to turn an actor reference into a handle.
    // NOTE(review): the code that populates this map is not visible in this
    // part of the file.
    instances: DashMap<ActorId, WeakInstanceCell>,

    /// Used by root actors to send events to the actor coordinating
    /// supervision of root actors in this proc.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// The clock used by this proc.
    clock: ClockKind,
}
154
155impl Drop for ProcState {
156    fn drop(&mut self) {
157        // We only want log ProcStatus::Dropped when ProcState is dropped,
158        // rather than Proc is dropped. This is because we need to wait for
159        // Proc::inner's ref count becomes 0.
160        tracing::info!(
161            proc_id = %self.proc_id,
162            name = "ProcStatus",
163            status = "Dropped"
164        );
165    }
166}
167
/// A snapshot view of the proc's actor ledger.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorLedgerSnapshot {
    /// All the actor trees in the proc, mapping the root id to the root
    /// of each tree. Roots whose instance cell has already been dropped
    /// are not included.
    pub roots: HashMap<ActorId, ActorTreeSnapshot>,
}
175
/// An event representing one row of the log.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Time when the event happened.
    pub time: SystemTime,
    /// The payload of the event.
    pub fields: Vec<(String, recorder::Value)>,
    /// The sequence number of the event.
    pub seq: usize,
}
186
187impl From<recorder::Event> for Event {
188    fn from(event: recorder::Event) -> Event {
189        Event {
190            time: event.time,
191            fields: event.fields(),
192            seq: event.seq,
193        }
194    }
195}
196
/// A snapshot of an actor tree (rooted at a pid=0 actor).
///
/// Snapshots are recursive: each node embeds the snapshots of its children.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorTreeSnapshot {
    /// The PID of this actor.
    pub pid: Index,

    /// The type name of the actor. If the actor is [`crate::Named`], then
    /// this is the registered name; otherwise it is the actor type's
    /// [`std::any::type_name`].
    pub type_name: String,

    /// The actor's current status.
    pub status: ActorStatus,

    /// Various operational stats for the actor.
    pub stats: ActorStats,

    /// This actor's handlers, mapping port numbers to the named type handled.
    pub handlers: HashMap<u64, String>,

    /// This actor's children, keyed by each child's pid.
    pub children: HashMap<Index, ActorTreeSnapshot>,

    /// Recent events emitted by the actor's logging.
    pub events: Vec<Event>,

    /// The current set of spans entered by the actor. These should be active
    /// only while the actor is entered in a handler. Each inner vector is one
    /// stack of span names.
    pub spans: Vec<Vec<String>>,
}
227
228impl Hash for ActorTreeSnapshot {
229    fn hash<H: Hasher>(&self, state: &mut H) {
230        self.pid.hash(state);
231    }
232}
233
/// Operational stats for an actor instance.
///
/// The default value has every counter at zero.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Default)]
pub struct ActorStats {
    /// The number of messages processed by the actor.
    num_processed_messages: u64,
}
241
242impl fmt::Display for ActorStats {
243    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244        write!(f, "num_processed_messages={}", self.num_processed_messages)
245    }
246}
247
/// Tracks the root actors of a proc through weak references to their
/// instance cells.
#[derive(Debug)]
struct ActorLedger {
    // Root actors. Map's value is its key's InstanceCell.
    roots: DashMap<ActorId, WeakInstanceCell>,
}
253
254impl ActorLedger {
255    fn new() -> Self {
256        Self {
257            roots: DashMap::new(),
258        }
259    }
260
261    fn insert(
262        &self,
263        root_actor_id: ActorId,
264        root_actor_cell: WeakInstanceCell,
265    ) -> Result<(), anyhow::Error> {
266        match self.roots.insert(root_actor_id.clone(), root_actor_cell) {
267            None => Ok(()),
268            // This should never happen because we do not recycle root actor's
269            // IDs.
270            Some(current_cell) => {
271                let debugging_msg = match current_cell.upgrade() {
272                    Some(cell) => format!("the stored cell's actor ID is {}", cell.actor_id()),
273                    None => "the stored cell has been dropped".to_string(),
274                };
275
276                Err(anyhow::anyhow!(
277                    "actor '{root_actor_id}' has already been added to ledger: {debugging_msg}"
278                ))
279            }
280        }
281    }
282
283    /// Get a snapshot view of this ledger.
284    fn snapshot(&self) -> ActorLedgerSnapshot {
285        let roots = self
286            .roots
287            .iter()
288            .flat_map(|r| {
289                let (actor_id, weak_cell) = r.pair();
290                // The actor might have been stopped or errored out. Since we do
291                // not remove inactive actors from ledger, the upgrade() call
292                // will return None in that scenario.
293                weak_cell
294                    .upgrade()
295                    .map(|cell| (actor_id.clone(), Self::get_actor_tree_snapshot(&cell)))
296            })
297            .collect();
298
299        ActorLedgerSnapshot { roots }
300    }
301
302    fn get_actor_tree_snapshot(cell: &InstanceCell) -> ActorTreeSnapshot {
303        // Get the edges between this actor and its children.
304        let children = cell
305            .child_iter()
306            .map(|child| (child.pid(), Self::get_actor_tree_snapshot(child.value())))
307            .collect();
308
309        ActorTreeSnapshot {
310            pid: cell.actor_id().pid(),
311            type_name: cell.inner.actor_type.type_name().to_string(),
312            status: cell.status().borrow().clone(),
313            stats: ActorStats {
314                num_processed_messages: cell.inner.num_processed_messages.load(Ordering::SeqCst),
315            },
316            handlers: cell
317                .inner
318                .exported_named_ports
319                .iter()
320                .map(|entry| (*entry.key(), entry.value().to_string()))
321                .collect(),
322            children,
323            events: cell
324                .inner
325                .recording
326                .tail()
327                .into_iter()
328                .map(Event::from)
329                .collect(),
330            spans: cell
331                .inner
332                .recording
333                .stacks()
334                .into_iter()
335                .map(|stack| {
336                    stack
337                        .into_iter()
338                        .map(|meta| meta.name().to_string())
339                        .collect()
340                })
341                .collect(),
342        }
343    }
344}
345
346impl Proc {
347    /// Create a new proc with the given proc id and forwarder.
348    pub fn new(proc_id: ProcId, forwarder: BoxedMailboxSender) -> Self {
349        Self::new_with_clock(proc_id, forwarder, ClockKind::default())
350    }
351
352    /// Create a new direct-addressed proc.
353    pub async fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
354        let (addr, rx) = channel::serve(addr)?;
355        let proc_id = ProcId::Direct(addr, name);
356        let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
357        proc.clone().serve(rx);
358        Ok(proc)
359    }
360
361    /// Create a new direct-addressed proc with a default sender for the forwarder.
362    pub fn direct_with_default(
363        addr: ChannelAddr,
364        name: String,
365        default: BoxedMailboxSender,
366    ) -> Result<Self, ChannelError> {
367        let (addr, rx) = channel::serve(addr)?;
368        let proc_id = ProcId::Direct(addr, name);
369        let proc = Self::new(
370            proc_id,
371            DialMailboxRouter::new_with_default(default).into_boxed(),
372        );
373        proc.clone().serve(rx);
374        Ok(proc)
375    }
376
377    /// Create a new proc with the given proc id, forwarder and clock kind.
378    pub fn new_with_clock(
379        proc_id: ProcId,
380        forwarder: BoxedMailboxSender,
381        clock: ClockKind,
382    ) -> Self {
383        tracing::info!(
384            proc_id = %proc_id,
385            name = "ProcStatus",
386            status = "Created"
387        );
388        Self {
389            inner: Arc::new(ProcState {
390                proc_id,
391                proc_muxer: MailboxMuxer::new(),
392                forwarder,
393                roots: DashMap::new(),
394                ledger: ActorLedger::new(),
395                instances: DashMap::new(),
396                supervision_coordinator_port: OnceLock::new(),
397                clock,
398            }),
399        }
400    }
401
402    /// Set the supervision coordinator's port for this proc. Return Err if it is
403    /// already set.
404    pub fn set_supervision_coordinator(
405        &self,
406        port: PortHandle<ActorSupervisionEvent>,
407    ) -> Result<(), anyhow::Error> {
408        self.state()
409            .supervision_coordinator_port
410            .set(port)
411            .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
412    }
413
414    fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
415        let result = match self.state().supervision_coordinator_port.get() {
416            Some(port) => port.send(event.clone()).map_err(anyhow::Error::from),
417            None => Err(anyhow::anyhow!(
418                "coordinator port is not set for proc {}",
419                self.proc_id(),
420            )),
421        };
422        if let Err(err) = result {
423            tracing::error!(
424                "proc {}: could not propagate supervision event {} due to error: {:?}: crashing",
425                self.proc_id(),
426                event,
427                err
428            );
429
430            std::process::exit(1);
431        }
432    }
433
434    /// Create a new local-only proc. This proc is not allowed to forward messages
435    /// outside of the proc itself.
436    pub fn local() -> Self {
437        // TODO: name these something that is ~ globally unique, e.g., incorporate
438        // the hostname, some GUID, etc.
439        let proc_id = ProcId::Ranked(id!(local), NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed));
440        // TODO: make it so that local procs can talk to each other.
441        Proc::new(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
442    }
443
444    /// The proc's ID.
445    pub fn proc_id(&self) -> &ProcId {
446        &self.state().proc_id
447    }
448
449    /// Shared sender used by the proc to forward messages to remote
450    /// destinations.
451    pub fn forwarder(&self) -> &BoxedMailboxSender {
452        &self.inner.forwarder
453    }
454
455    /// Convenience accessor for state.
456    fn state(&self) -> &ProcState {
457        self.inner.as_ref()
458    }
459
460    /// The proc's clock.
461    pub fn clock(&self) -> &ClockKind {
462        &self.state().clock
463    }
464
465    /// Get the snapshot of the ledger.
466    pub fn ledger_snapshot(&self) -> ActorLedgerSnapshot {
467        self.state().ledger.snapshot()
468    }
469
470    /// Attach a mailbox to the proc with the provided root name.
471    pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
472        let actor_id: ActorId = self.allocate_root_id(name)?;
473        Ok(self.bind_mailbox(actor_id))
474    }
475
476    /// Attach a mailbox to the proc as a child actor.
477    pub fn attach_child(&self, parent_id: &ActorId) -> Result<Mailbox, anyhow::Error> {
478        let actor_id: ActorId = self.allocate_child_id(parent_id)?;
479        Ok(self.bind_mailbox(actor_id))
480    }
481
482    /// Bind a mailbox to the proc.
483    fn bind_mailbox(&self, actor_id: ActorId) -> Mailbox {
484        let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
485
486        // TODO: T210748165 tie the muxer entry to the lifecycle of the mailbox held
487        // by the caller. This will likely require a weak reference.
488        self.state().proc_muxer.bind_mailbox(mbox.clone());
489        mbox
490    }
491
492    /// Attach a mailbox to the proc with the provided root name, and bind an [`ActorRef`].
493    /// This is intended only for testing, and will be replaced by simpled utilities.
494    pub fn attach_actor<R, M>(
495        &self,
496        name: &str,
497    ) -> Result<(Instance<()>, ActorRef<R>, PortReceiver<M>), anyhow::Error>
498    where
499        M: RemoteMessage,
500        R: Referable + RemoteHandles<M>,
501    {
502        let (instance, _handle) = self.instance(name)?;
503        let (_handle, rx) = instance.bind_actor_port::<M>();
504        let actor_ref = ActorRef::attest(instance.self_id().clone());
505        Ok((instance, actor_ref, rx))
506    }
507
508    /// Spawn a named (root) actor on this proc. The name of the actor must be
509    /// unique.
510    #[hyperactor::observe_result("Proc")]
511    pub async fn spawn<A: Actor>(
512        &self,
513        name: &str,
514        params: A::Params,
515    ) -> Result<ActorHandle<A>, anyhow::Error> {
516        let actor_id = self.allocate_root_id(name)?;
517        let span = tracing::span!(
518            Level::INFO,
519            "spawn_actor",
520            actor_name = name,
521            actor_type = std::any::type_name::<A>(),
522            actor_id = actor_id.to_string(),
523        );
524        let (instance, mut actor_loop_receivers, work_rx) = {
525            let _guard = span.clone().entered();
526            Instance::new(self.clone(), actor_id.clone(), false, None)
527        };
528        let actor = A::new(params).instrument(span.clone()).await?;
529        // Add this actor to the proc's actor ledger. We do not actively remove
530        // inactive actors from ledger, because the actor's state can be inferred
531        // from its weak cell.
532        self.state()
533            .ledger
534            .insert(actor_id.clone(), instance.inner.cell.downgrade())?;
535
536        Ok(instance
537            .start(actor, actor_loop_receivers.take().unwrap(), work_rx)
538            .instrument(span)
539            .await)
540    }
541
542    /// Create and return an actor instance and its corresponding handle. This allows actors to be
543    /// "inverted": the caller can use the returned [`Instance`] to send and receive messages,
544    /// launch child actors, etc. The actor itself does not handle any messages, and supervision events
545    /// are always forwarded to the proc. Otherwise the instance acts as a normal actor, and can be
546    /// referenced and stopped.
547    pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
548        let actor_id = self.allocate_root_id(name)?;
549        let _ = tracing::debug_span!(
550            "actor_instance",
551            actor_name = name,
552            actor_type = std::any::type_name::<()>(),
553            actor_id = actor_id.to_string(),
554        );
555
556        let (instance, _, _) = Instance::new(self.clone(), actor_id.clone(), true, None);
557        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
558
559        instance.change_status(ActorStatus::Client);
560
561        Ok((instance, handle))
562    }
563
564    /// Create a child instance. Called from `Instance`.
565    fn child_instance(
566        &self,
567        parent: InstanceCell,
568    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
569        let actor_id = self.allocate_child_id(parent.actor_id())?;
570        let _ = tracing::debug_span!(
571            "child_actor_instance",
572            parent_actor_id = %parent.actor_id(),
573            actor_type = std::any::type_name::<()>(),
574            actor_id = %actor_id,
575        );
576
577        let (instance, _, _) = Instance::new(self.clone(), actor_id, false, Some(parent));
578        let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
579        instance.change_status(ActorStatus::Client);
580        Ok((instance, handle))
581    }
582
583    /// Spawn a child actor from the provided parent on this proc. The parent actor
584    /// must already belong to this proc, a fact which is asserted in code.
585    ///
586    /// When spawn_child returns, the child has an associated cell and is linked
587    /// with its parent.
588    async fn spawn_child<A: Actor>(
589        &self,
590        parent: InstanceCell,
591        params: A::Params,
592    ) -> Result<ActorHandle<A>, anyhow::Error> {
593        let actor_id = self.allocate_child_id(parent.actor_id())?;
594        let (instance, mut actor_loop_receivers, work_rx) =
595            Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
596        let actor = A::new(params).await?;
597        Ok(instance
598            .start(actor, actor_loop_receivers.take().unwrap(), work_rx)
599            .await)
600    }
601
602    /// Call `abort` on the `JoinHandle` associated with the given
603    /// root actor. If successful return `Some(root.clone())` else
604    /// `None`.
605    pub fn abort_root_actor(
606        &self,
607        root: &ActorId,
608        this_handle: Option<&JoinHandle<()>>,
609    ) -> Option<impl Future<Output = ActorId>> {
610        self.state()
611            .ledger
612            .roots
613            .get(root)
614            .into_iter()
615            .flat_map(|e| e.upgrade())
616            .map(|cell| {
617                let r1 = root.clone();
618                let r2 = root.clone();
619                // If abort_root_actor was called from inside an actor task, we don't want to abort that actor's task yet.
620                let skip_abort = this_handle.is_some_and(|this_h| {
621                    cell.inner
622                        .actor_task_handle
623                        .get()
624                        .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
625                });
626                // `start` was called on the actor's instance
627                // immediately following `root`'s insertion into the
628                // ledger. `Instance::start()` is infallible and should
629                // complete quickly, so calling `wait()` on `actor_task_handle`
630                // should be safe (i.e., not hang forever).
631                async move {
632                    tokio::task::spawn_blocking(move || {
633                        if !skip_abort {
634                            let h = cell.inner.actor_task_handle.wait();
635                            tracing::debug!("{}: aborting {:?}", r1, h);
636                            h.abort();
637                        }
638                    })
639                    .await
640                    .unwrap();
641                    r2
642                }
643            })
644            .next()
645    }
646
647    /// Signals to a root actor to stop,
648    /// returning a status observer if successful.
649    pub fn stop_actor(&self, actor_id: &ActorId) -> Option<watch::Receiver<ActorStatus>> {
650        if let Some(entry) = self.state().ledger.roots.get(actor_id) {
651            match entry.value().upgrade() {
652                None => None, // the root's cell has been dropped
653                Some(cell) => {
654                    tracing::info!("sending stop signal to {}", cell.actor_id());
655                    if let Err(err) = cell.signal(Signal::DrainAndStop) {
656                        tracing::error!(
657                            "{}: failed to send stop signal to pid {}: {:?}",
658                            self.proc_id(),
659                            cell.pid(),
660                            err
661                        );
662                        None
663                    } else {
664                        Some(cell.status().clone())
665                    }
666                }
667            }
668        } else {
669            tracing::error!("no actor {} found in {} roots", actor_id, self.proc_id());
670            None
671        }
672    }
673
674    /// Stop the proc. Returns a pair of:
675    /// - the actors observed to stop;
676    /// - the actors not observed to stop when timeout.
677    ///
678    /// If `cx` is specified, it means this method was called from inside an actor
679    /// in which case we shouldn't wait for it to stop and need to delay aborting
680    /// its task.
681    pub async fn destroy_and_wait<A: Actor>(
682        &mut self,
683        timeout: Duration,
684        cx: Option<&Context<'_, A>>,
685    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
686        self.destroy_and_wait_except_current::<A>(timeout, cx, false)
687            .await
688    }
689
    /// Stop the proc. Returns a pair of:
    /// - the actors observed to stop;
    /// - the actors not observed to stop when timeout.
    ///
    /// If `cx` is specified, it means this method was called from inside an actor
    /// in which case we shouldn't wait for it to stop and need to delay aborting
    /// its task.
    /// If except_current is true, don't stop the actor represented by "cx" at
    /// all.
    ///
    /// # Panics
    ///
    /// Panics if `cx` is given but `cx.actor_task_handle()` returns `None`,
    /// i.e. the calling actor has not finished starting.
    // NOTE(review): the stop loop below sends the stop signal to every root
    // in the ledger without consulting `except_current`; the flag only gates
    // the final delayed abort of the caller's task. If the calling actor is a
    // root it is therefore still signalled to drain and stop, which does not
    // match "don't stop ... at all" above. Confirm the intended behavior.
    #[hyperactor::instrument]
    pub async fn destroy_and_wait_except_current<A: Actor>(
        &mut self,
        timeout: Duration,
        cx: Option<&Context<'_, A>>,
        except_current: bool,
    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
        tracing::debug!("{}: proc stopping", self.proc_id());

        // When called from inside an actor, remember that actor's task handle
        // and id so it can be treated specially below.
        let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
            (
                Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
                Some(cx.self_id())
            )
        });

        // Signal every root in the ledger to stop, keeping the status
        // receiver of each actor for which the signal was sent. The ids are
        // collected into a `Vec` first, so the ledger is not being iterated
        // while `stop_actor` looks entries up.
        let mut statuses = HashMap::new();
        for actor_id in self
            .state()
            .ledger
            .roots
            .iter()
            .map(|entry| entry.key().clone())
            .collect::<Vec<_>>()
        {
            if let Some(status) = self.stop_actor(&actor_id) {
                statuses.insert(actor_id, status);
            }
        }
        // At this point the stop signals have been sent; the actors have not
        // yet been observed to stop.
        tracing::debug!("{}: proc stopped", self.proc_id());

        // For every signalled actor other than the caller, wait up to
        // `timeout` for it to reach a terminal status. Each future yields the
        // actor id on success and `None` on timeout or error.
        let waits: Vec<_> = statuses
            .iter_mut()
            .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
            .map(|(actor_id, root)| {
                let actor_id = actor_id.clone();
                async move {
                    RealClock
                        .timeout(
                            timeout,
                            root.wait_for(|state: &ActorStatus| state.is_terminal()),
                        )
                        .await
                        .ok()
                        .map(|_| actor_id)
                }
            })
            .collect();

        let results = futures::future::join_all(waits).await;
        let stopped_actors: Vec<_> = results
            .iter()
            .filter_map(|actor_id| actor_id.as_ref())
            .cloned()
            .collect();
        // Every signalled actor that was not observed to stop is aborted.
        // The caller was excluded from the waits above, so if it was
        // signalled it is part of this list; `abort_root_actor` skips the
        // actual abort for it because its handle matches `this_handle`.
        let aborted_actors: Vec<_> = statuses
            .iter()
            .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
            .map(|(actor_id, _)| {
                let f = self.abort_root_actor(actor_id, this_handle);
                async move {
                    let _ = if let Some(f) = f { Some(f.await) } else { None };
                    // If `is_none(&_)` then the proc's `ledger.roots`
                    // contains an entry that wasn't a root or, the
                    // associated actor's instance cell was already
                    // dropped when we went to call `abort()` on the
                    // cell's task handle.

                    actor_id.clone()
                }
            })
            .collect();
        let aborted_actors = futures::future::join_all(aborted_actors).await;

        // Abort the caller's own task last, unless the caller asked to be
        // left running.
        if let Some(this_handle) = this_handle
            && let Some(this_actor_id) = this_actor_id
            && !except_current
        {
            tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
            this_handle.abort()
        };

        tracing::info!(
            "destroy_and_wait: {} actors stopped, {} actors aborted",
            stopped_actors.len(),
            aborted_actors.len()
        );
        Ok((stopped_actors, aborted_actors))
    }
788
789    /// Resolve an actor reference to an actor residing on this proc.
790    /// Returns None if the actor is not found on this proc.
791    ///
792    /// Bounds:
793    /// - `R: Actor` — must be a real actor that can live in this
794    ///   proc.
795    /// - `R: Referable` — required because the input is an
796    ///   `ActorRef<R>`.
797    pub fn resolve_actor_ref<R: Actor + Referable>(
798        &self,
799        actor_ref: &ActorRef<R>,
800    ) -> Option<ActorHandle<R>> {
801        self.inner
802            .instances
803            .get(actor_ref.actor_id())?
804            .upgrade()?
805            .downcast_handle()
806    }
807
808    /// Create a root allocation in the proc.
809    fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
810        let name = name.to_string();
811        match self.state().roots.entry(name.to_string()) {
812            Entry::Vacant(entry) => {
813                entry.insert(AtomicUsize::new(1));
814            }
815            Entry::Occupied(_) => {
816                anyhow::bail!("an actor with name '{}' has already been spawned", name)
817            }
818        }
819        Ok(ActorId(self.state().proc_id.clone(), name.to_string(), 0))
820    }
821
822    /// Create a child allocation in the proc.
823    #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
824    pub(crate) fn allocate_child_id(&self, parent_id: &ActorId) -> Result<ActorId, anyhow::Error> {
825        assert_eq!(*parent_id.proc_id(), self.state().proc_id);
826        let pid = match self.state().roots.get(parent_id.name()) {
827            None => anyhow::bail!(
828                "no actor named {} in proc {}",
829                parent_id.name(),
830                self.state().proc_id
831            ),
832            Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
833        };
834        Ok(parent_id.child_id(pid))
835    }
836
837    fn downgrade(&self) -> WeakProc {
838        WeakProc::new(self)
839    }
840}
841
842#[async_trait]
843impl MailboxSender for Proc {
844    fn post_unchecked(
845        &self,
846        envelope: MessageEnvelope,
847        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
848    ) {
849        if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
850            self.state().proc_muxer.post(envelope, return_handle)
851        } else {
852            self.state().forwarder.post(envelope, return_handle)
853        }
854    }
855}
856
/// A non-owning handle to a [`Proc`]: it wraps a [`Weak`] pointer to the
/// proc's shared [`ProcState`] and must be upgraded back into a [`Proc`]
/// before it can be used.
#[derive(Debug)]
struct WeakProc(Weak<ProcState>);
859
860impl WeakProc {
861    fn new(proc: &Proc) -> Self {
862        Self(Arc::downgrade(&proc.inner))
863    }
864
865    fn upgrade(&self) -> Option<Proc> {
866        self.0.upgrade().map(|inner| Proc { inner })
867    }
868}
869
870#[async_trait]
871impl MailboxSender for WeakProc {
872    fn post_unchecked(
873        &self,
874        envelope: MessageEnvelope,
875        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
876    ) {
877        match self.upgrade() {
878            Some(proc) => proc.post(envelope, return_handle),
879            None => envelope.undeliverable(
880                DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
881                return_handle,
882            ),
883        }
884    }
885}
886
/// Represents a single work item used by the instance to dispatch to
/// actor handles. Specifically, this enables handler polymorphism.
///
/// The cell wraps a boxed, one-shot closure. Given mutable access to the
/// actor and its [`Instance`], the closure produces a boxed future that
/// resolves to the result of handling the item. The future may borrow both
/// arguments for its whole lifetime (`'a`).
struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a mut Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
900
901impl<A: Actor + Send> WorkCell<A> {
902    /// Create a new WorkCell from a concrete function (closure).
903    fn new(
904        f: impl for<'a> FnOnce(
905            &'a mut A,
906            &'a mut Instance<A>,
907        )
908            -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
909        + Send
910        + Sync
911        + 'static,
912    ) -> Self {
913        Self(Box::new(f))
914    }
915
916    /// Handle the message represented by this work cell.
917    fn handle<'a>(
918        self,
919        actor: &'a mut A,
920        instance: &'a mut Instance<A>,
921    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
922        (self.0)(actor, instance)
923    }
924}
925
/// Context for a message currently being handled by an Instance.
///
/// A context pairs a borrowed [`Instance`] with the headers of the message
/// being handled. It dereferences to the instance, so it can be used
/// wherever an `&Instance<A>` is expected.
pub struct Context<'a, A: Actor> {
    /// The instance handling the message.
    instance: &'a Instance<A>,
    /// The headers that accompanied the message.
    headers: Attrs,
}
931
932impl<'a, A: Actor> Context<'a, A> {
933    /// Construct a new Context.
934    pub fn new(instance: &'a Instance<A>, headers: Attrs) -> Self {
935        Self { instance, headers }
936    }
937
938    /// Get a reference to the message headers.
939    pub fn headers(&self) -> &Attrs {
940        &self.headers
941    }
942}
943
944impl<A: Actor> Deref for Context<'_, A> {
945    type Target = Instance<A>;
946
947    fn deref(&self) -> &Self::Target {
948        self.instance
949    }
950}
951
/// An actor instance. This is responsible for managing a running actor, including
/// its full lifecycle, supervision, signal management, etc. Instances can represent
/// a managed actor or a "client" actor that has joined the proc.
pub struct Instance<A: Actor> {
    /// The instance's state, held behind an `Arc` so that `clone_for_py`
    /// can hand out additional owners of the same state.
    inner: Arc<InstanceState<A>>,
}
958
/// The state backing an [`Instance`]. When it is dropped while the actor's
/// status is still non-terminal, the status is set to `Stopped`.
struct InstanceState<A: Actor> {
    /// The proc that owns this instance.
    proc: Proc,

    /// The instance cell that manages instance hierarchy.
    cell: InstanceCell,

    /// The mailbox associated with the actor.
    mailbox: Mailbox,

    /// The actor's ports, created from the mailbox and the sending side of
    /// the work queue. Handler ports returned by `Instance::port` come from
    /// here.
    ports: Arc<Ports<A>>,

    /// A watch for communicating the actor's state.
    status_tx: watch::Sender<ActorStatus>,

    /// This instance's globally unique ID.
    id: Uuid,

    /// Used to assign sequence numbers for messages sent from this actor.
    sequencer: Sequencer,
}
980
981impl<A: Actor> InstanceState<A> {
982    fn self_id(&self) -> &ActorId {
983        self.mailbox.actor_id()
984    }
985}
986
987impl<A: Actor> Drop for InstanceState<A> {
988    fn drop(&mut self) {
989        self.status_tx.send_if_modified(|status| {
990            if status.is_terminal() {
991                false
992            } else {
993                tracing::info!(
994                    name = "ActorStatus",
995                    actor_id = %self.self_id(),
996                    actor_name = self.self_id().name(),
997                    status = "Stopped",
998                    prev_status = status.arm().unwrap_or("unknown"),
999                    "instance is dropped",
1000                );
1001                *status = ActorStatus::Stopped;
1002                true
1003            }
1004        });
1005    }
1006}
1007
1008impl<A: Actor> Instance<A> {
    /// Create a new actor instance in Created state.
    ///
    /// Sets up the instance's mailbox, work queue, ports and status watch,
    /// binds the mailbox to the proc's muxer, and wires everything into a
    /// fresh [`InstanceCell`] whose parent is `parent`.
    ///
    /// Returns a tuple of:
    /// - the new instance;
    /// - the signal and supervision-event receivers for the actor loop, or
    ///   `None` when `detached` is true (no control ports are opened then);
    /// - the receiving end of the instance's work queue.
    fn new(
        proc: Proc,
        actor_id: ActorId,
        detached: bool,
        parent: Option<InstanceCell>,
    ) -> (
        Self,
        Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
        mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        // Set up messaging
        // The mailbox is constructed with a boxed sender wrapping a weak
        // handle to the proc.
        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
        let (work_tx, work_rx) = ordered_channel(
            actor_id.to_string(),
            config::global::get(config::ENABLE_CLIENT_SEQ_ASSIGNMENT),
        );
        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);

        // Prefer registered type info for `A`; otherwise fall back to the
        // compiler-provided type name.
        let actor_type = match TypeInfo::of::<A>() {
            Some(info) => ActorType::Named(info),
            None => ActorType::Anonymous(std::any::type_name::<A>()),
        };
        // Detached instances get no control ports.
        let actor_loop_ports = if detached {
            None
        } else {
            // NOTE(review): presumably opening the signal port cannot fail
            // on a fresh `Ports` — confirm before relying on this `unwrap`.
            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
            let (supervision_port, supervision_receiver) = mailbox.open_port();
            Some((
                (signal_port, supervision_port),
                (signal_receiver, supervision_receiver),
            ))
        };

        // Split the control ports: the handles are stored in the cell, the
        // receivers are returned to the caller.
        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();

        let cell = InstanceCell::new(
            actor_id,
            actor_type,
            proc.clone(),
            actor_loop,
            status_rx,
            parent,
            ports.clone(),
        );
        // The same freshly generated id is used for the instance and to
        // construct its sequencer.
        let instance_id = Uuid::now_v7();
        let inner = Arc::new(InstanceState {
            proc,
            cell,
            mailbox,
            ports,
            status_tx,
            sequencer: Sequencer::new(instance_id),
            id: instance_id,
        });
        (Self { inner }, actor_loop_receivers, work_rx)
    }
1068
1069    /// Notify subscribers of a change in the actors status and bump counters with the duration which
1070    /// the last status was active for.
1071    #[track_caller]
1072    fn change_status(&self, new: ActorStatus) {
1073        let old = self.inner.status_tx.send_replace(new.clone());
1074        // 2 cases are allowed:
1075        // * non-terminal -> non-terminal
1076        // * non-terminal -> terminal
1077        // terminal -> terminal is not allowed unless it is the same status (no-op).
1078        // terminal -> non-terminal is never allowed.
1079        assert!(
1080            !old.is_terminal() && !new.is_terminal()
1081                || !old.is_terminal() && new.is_terminal()
1082                || old == new,
1083            "actor changing status illegally, only allow non-terminal -> non-terminal \
1084            and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
1085            self.self_id(),
1086            old,
1087            new
1088        );
1089        // Actor status changes between Idle and Processing when handling every
1090        // message. It creates too many logs if we want to log these 2 states.
1091        // Therefore we skip the status changes between them.
1092        if !((old.is_idle() && new.is_processing()) || (old.is_processing() && new.is_idle())) {
1093            let new_status = new.arm().unwrap_or("unknown");
1094            let change_reason = match new {
1095                ActorStatus::Failed(reason) => reason.to_string(),
1096                _ => "".to_string(),
1097            };
1098            tracing::info!(
1099                name = "ActorStatus",
1100                actor_id = %self.self_id(),
1101                actor_name = self.self_id().name(),
1102                status = new_status,
1103                prev_status = old.arm().unwrap_or("unknown"),
1104                caller = %Location::caller(),
1105                change_reason,
1106            );
1107        }
1108    }
1109
1110    fn is_terminal(&self) -> bool {
1111        self.inner.status_tx.borrow().is_terminal()
1112    }
1113
1114    fn is_stopping(&self) -> bool {
1115        self.inner.status_tx.borrow().is_stopping()
1116    }
1117
1118    /// This instance's actor ID.
1119    pub fn self_id(&self) -> &ActorId {
1120        self.inner.self_id()
1121    }
1122
1123    /// Signal the actor to stop.
1124    pub fn stop(&self) -> Result<(), ActorError> {
1125        tracing::info!("Instance::stop called, {}", self.inner.cell.actor_id());
1126        self.inner.cell.signal(Signal::DrainAndStop)
1127    }
1128
1129    /// Open a new port that accepts M-typed messages. The returned
1130    /// port may be freely cloned, serialized, and passed around. The
1131    /// returned receiver should only be retained by the actor responsible
1132    /// for processing the delivered messages.
1133    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1134        self.inner.mailbox.open_port()
1135    }
1136
1137    /// Open a new one-shot port that accepts M-typed messages. The
1138    /// returned port may be used to send a single message; ditto the
1139    /// receiver may receive a single message.
1140    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1141        self.inner.mailbox.open_once_port()
1142    }
1143
    /// Send a message to the actor running on the proc.
    ///
    /// Posts the serialized `message`, with `headers`, to `port_id` by
    /// delegating to `context::MailboxExt::post` with this instance as the
    /// sender.
    pub fn post(&self, port_id: PortId, headers: Attrs, message: Serialized) {
        // NOTE(review): the trailing `true` presumably asks for undeliverable
        // messages to be returned to the sender — confirm against
        // `MailboxExt::post`.
        <Self as context::MailboxExt>::post(self, port_id, headers, message, true)
    }
1148
1149    /// Send a message to the actor itself with a delay usually to trigger some event.
1150    pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1151    where
1152        M: Message,
1153        A: Handler<M>,
1154    {
1155        let port = self.port();
1156        let self_id = self.self_id().clone();
1157        let clock = self.inner.proc.state().clock.clone();
1158        tokio::spawn(async move {
1159            clock.non_advancing_sleep(delay).await;
1160            if let Err(e) = port.send(message) {
1161                // TODO: this is a fire-n-forget thread. We need to
1162                // handle errors in a better way.
1163                tracing::info!("{}: error sending delayed message: {}", self_id, e);
1164            }
1165        });
1166        Ok(())
1167    }
1168
1169    /// Start an A-typed actor onto this instance with the provided params. When spawn returns,
1170    /// the actor has been linked with its parent, if it has one.
1171    #[hyperactor::instrument_infallible(fields(actor_id=self.inner.cell.actor_id().clone().to_string(), actor_name=self.inner.cell.actor_id().name()))]
1172    async fn start(
1173        self,
1174        actor: A,
1175        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1176        work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1177    ) -> ActorHandle<A> {
1178        let instance_cell = self.inner.cell.clone();
1179        let actor_id = self.inner.cell.actor_id().clone();
1180        let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone());
1181        let actor_task_handle = A::spawn_server_task(
1182            panic_handler::with_backtrace_tracking(self.serve(
1183                actor,
1184                actor_loop_receivers,
1185                work_rx,
1186            ))
1187            .instrument(Span::current()),
1188        );
1189        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
1190        instance_cell
1191            .inner
1192            .actor_task_handle
1193            .set(actor_task_handle)
1194            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
1195
1196        actor_handle
1197    }
1198
    /// Drive the actor to completion and report its outcome.
    ///
    /// Runs the actor and its supervision tree via `run_actor_tree`, then
    /// records the terminal status: `Stopped` on success, the status carried
    /// by an unhandled supervision event, or a `Failed` status built from
    /// any other error. A resulting supervision event is sent to the parent
    /// if the instance has one, and handed to the proc otherwise. A parent
    /// is additionally sent a `ChildStopped` signal.
    ///
    /// # Panics
    ///
    /// Panics if the instance is not in a stopping status once
    /// `run_actor_tree` returns, or if an unhandled supervision event
    /// carries a non-terminal status.
    async fn serve(
        mut self,
        mut actor: A,
        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
    ) {
        // `run_actor_tree` borrows `work_rx` instead of taking ownership because
        // `work_rx` needs to remain alive until this function returns. If the owning
        // proc's `supervision_coordinator_port` is a port on this instance, if `work_rx`
        // is dropped before `self.proc.handle_supervision_event` is called, the process
        // will exit due to a "channel closed" failure.
        let result = self
            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
            .await;

        assert!(self.is_stopping());
        // Settle the terminal status, and work out which supervision event
        // (if any) has to be reported upwards.
        let event = match result {
            Ok(_) => {
                // success exit case
                self.change_status(ActorStatus::Stopped);
                None
            }
            Err(err) => {
                match *err.kind {
                    ActorErrorKind::UnhandledSupervisionEvent(box event) => {
                        // Currently only terminated actors are allowed to raise supervision events.
                        // If we want to change that in the future, we need to modify the exit
                        // status here too, because we use event's actor_status as this actor's
                        // terminal status.
                        assert!(event.actor_status.is_terminal());
                        self.change_status(event.actor_status.clone());
                        Some(event)
                    }
                    _ => {
                        // Any other failure becomes a `Failed` status, and a
                        // new supervision event is created for this actor.
                        let error_kind = ActorErrorKind::Generic(err.kind.to_string());
                        let status = ActorStatus::Failed(error_kind);
                        self.change_status(status.clone());
                        Some(ActorSupervisionEvent::new(
                            self.inner.cell.actor_id().clone(),
                            actor.display_name(),
                            status,
                            None,
                        ))
                    }
                }
            }
        };

        if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
            if let Some(event) = event {
                // Parent exists, failure should be propagated to the parent.
                parent.send_supervision_event_or_crash(event);
            }
            // TODO: we should get rid of this signal, and use *only* supervision events for
            // the purpose of conveying lifecycle changes
            if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) {
                tracing::error!(
                    "{}: failed to send stop message to parent pid {}: {:?}",
                    self.self_id(),
                    parent.pid(),
                    err
                );
            }
        } else {
            // Failure happened to the root actor or orphaned child actors.
            // In either case, the failure should be propagated to proc.
            //
            // Note that orphaned actor is unexpected and would only happen if
            // there is a bug.
            if let Some(event) = event {
                self.inner.proc.handle_supervision_event(event);
            }
        }
    }
1273
    /// Runs the actor, and manages its supervision tree. When the function returns,
    /// the whole tree rooted at this actor has stopped.
    ///
    /// The steps are, in order:
    /// 1. run the actor via `run`, turning a panic into an `ActorError`;
    /// 2. move the instance to `Stopping`;
    /// 3. send `Signal::Stop` to every child, then wait for the children to
    ///    go away, unlinking any that remain once a 500ms wait for a signal
    ///    times out;
    /// 4. unless the actor panicked, run `actor.cleanup` bounded by the
    ///    configured cleanup timeout.
    ///
    /// # Errors
    ///
    /// Returns the error from running the actor if there was one; otherwise
    /// returns the cleanup error, if any. An error receiving a signal while
    /// waiting for children is returned immediately.
    ///
    /// # Panics
    ///
    /// Panics if the instance is already in a terminal status after `run`
    /// returns, or if a `ChildStopped` signal names a child that is still
    /// linked.
    async fn run_actor_tree(
        &mut self,
        actor: &mut A,
        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
    ) -> Result<(), ActorError> {
        // It is okay to catch all panics here, because we are in a tokio task,
        // and tokio will catch the panic anyway:
        // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic
        // What we do here is just to catch it early so we can handle it.

        let mut did_panic = false;
        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
            .catch_unwind()
            .await
        {
            Ok(result) => result,
            Err(_) => {
                did_panic = true;
                // Use the recorded panic info as the error payload, or a
                // placeholder explaining why none could be taken.
                let panic_info = panic_handler::take_panic_info()
                    .map(|info| info.to_string())
                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
                Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::panic(anyhow::anyhow!(panic_info)),
                ))
            }
        };

        assert!(!self.is_terminal());
        self.change_status(ActorStatus::Stopping);
        if let Err(err) = &result {
            tracing::error!("{}: actor failure: {}", self.self_id(), err);
        }

        // After this point, we know we won't spawn any more children,
        // so we can safely read the current child keys.
        let mut to_unlink = Vec::new();
        for child in self.inner.cell.child_iter() {
            if let Err(err) = child.value().signal(Signal::Stop) {
                tracing::error!(
                    "{}: failed to send stop signal to child pid {}: {:?}",
                    self.self_id(),
                    child.key(),
                    err
                );
                to_unlink.push(child.value().clone());
            }
        }
        // Manually unlink children that have already been stopped.
        for child in to_unlink {
            self.inner.cell.unlink(&child);
        }

        // Only the signal receiver is needed from here on.
        let (mut signal_receiver, _) = actor_loop_receivers;
        while self.inner.cell.child_count() > 0 {
            match RealClock
                .timeout(Duration::from_millis(500), signal_receiver.recv())
                .await
            {
                Ok(signal) => {
                    if let Signal::ChildStopped(pid) = signal? {
                        assert!(self.inner.cell.get_child(pid).is_none());
                    }
                }
                Err(_) => {
                    tracing::warn!(
                        "timeout waiting for ChildStopped signal from child on actor: {}, ignoring",
                        self.self_id()
                    );
                    // No more waiting to receive messages. Unlink all remaining
                    // children.
                    self.inner.cell.unlink_all();
                    break;
                }
            }
        }
        // Run the actor cleanup function before the actor stops to delete
        // resources. If it times out, continue with stopping the actor.
        // Don't call it if there was a panic, because the actor may
        // be in an invalid state and unable to access anything, for example
        // the GIL.
        let cleanup_result = if !did_panic {
            let cleanup_timeout = config::global::get(config::CLEANUP_TIMEOUT);
            match RealClock
                .timeout(cleanup_timeout, actor.cleanup(self, result.as_ref().err()))
                .await
            {
                Ok(Ok(x)) => Ok(x),
                Ok(Err(e)) => Err(ActorError::new(self.self_id(), ActorErrorKind::cleanup(e))),
                // The timeout elapsed before cleanup finished.
                Err(e) => Err(ActorError::new(
                    self.self_id(),
                    ActorErrorKind::cleanup(e.into()),
                )),
            }
        } else {
            Ok(())
        };
        if let Err(ref actor_err) = result {
            // The original result error takes precedence over the cleanup error,
            // so make sure the cleanup error is still logged in that case.
            if let Err(ref err) = cleanup_result {
                tracing::warn!(
                    cleanup_err = %err,
                    %actor_err,
                    "ignoring cleanup error after actor error",
                );
            }
        }
        // If the original exit was not an error, let cleanup errors be
        // surfaced.
        result.and(cleanup_result)
    }
1389
1390    /// Initialize and run the actor until it fails or is stopped.
1391    async fn run(
1392        &mut self,
1393        actor: &mut A,
1394        actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1395        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1396    ) -> Result<(), ActorError> {
1397        let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1398
1399        self.change_status(ActorStatus::Initializing);
1400        actor
1401            .init(self)
1402            .await
1403            .map_err(|err| ActorError::new(self.self_id(), ActorErrorKind::init(err)))?;
1404        let need_drain;
1405        'messages: loop {
1406            self.change_status(ActorStatus::Idle);
1407            let metric_pairs =
1408                hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1409            tokio::select! {
1410                work = work_rx.recv() => {
1411                    ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1412                    ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1413                    let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1414                    let work = work.expect("inconsistent work queue state");
1415                    if let Err(err) = work.handle(actor, self).await {
1416                        for supervision_event in supervision_event_receiver.drain() {
1417                            self.handle_supervision_event(actor, supervision_event).await?;
1418                        }
1419                        let kind = ActorErrorKind::processing(err);
1420                        return Err(ActorError {
1421                            actor_id: Box::new(self.self_id().clone()),
1422                            kind: Box::new(kind),
1423                        });
1424                    }
1425                }
1426                signal = signal_receiver.recv() => {
1427                    let signal = signal.map_err(ActorError::from);
1428                    tracing::debug!("Received signal {signal:?}");
1429                    match signal? {
1430                        signal@(Signal::Stop | Signal::DrainAndStop) => {
1431                            need_drain = matches!(signal, Signal::DrainAndStop);
1432                            break 'messages;
1433                        },
1434                        Signal::ChildStopped(pid) => {
1435                            assert!(self.inner.cell.get_child(pid).is_none());
1436                        },
1437                    }
1438                }
1439                Ok(supervision_event) = supervision_event_receiver.recv() => {
1440                    self.handle_supervision_event(actor, supervision_event).await?;
1441                }
1442            }
1443            self.inner
1444                .cell
1445                .inner
1446                .num_processed_messages
1447                .fetch_add(1, Ordering::SeqCst);
1448        }
1449
1450        if need_drain {
1451            let mut n = 0;
1452            while let Ok(work) = work_rx.try_recv() {
1453                if let Err(err) = work.handle(actor, self).await {
1454                    return Err(ActorError::new(
1455                        self.self_id(),
1456                        ActorErrorKind::processing(err),
1457                    ));
1458                }
1459                n += 1;
1460            }
1461            tracing::debug!("drained {} messages", n);
1462        }
1463        tracing::debug!("exited actor loop: {}", self.self_id());
1464        Ok(())
1465    }
1466
1467    async fn handle_supervision_event(
1468        &self,
1469        actor: &mut A,
1470        supervision_event: ActorSupervisionEvent,
1471    ) -> Result<(), ActorError> {
1472        // Handle the supervision event with the current actor.
1473        match actor
1474            .handle_supervision_event(self, &supervision_event)
1475            .await
1476        {
1477            Ok(true) => {
1478                // The supervision event was handled by this actor, nothing more to do.
1479                Ok(())
1480            }
1481            Ok(false) => {
1482                let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1483                Err(ActorError::new(self.self_id(), kind))
1484            }
1485            Err(err) => {
1486                // The actor failed to handle the supervision event, it should die.
1487                // Create a new supervision event for this failure and propagate it.
1488                let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1489                    err.to_string(),
1490                    Box::new(supervision_event),
1491                );
1492                Err(ActorError::new(self.self_id(), kind))
1493            }
1494        }
1495    }
1496
    /// Dispatch a single M-typed message to the actor's handler.
    ///
    /// Sets the status to `Processing`, recording the clock's current
    /// system time and, when `type_info` is provided, the message's type
    /// name and arm. It then passes the headers to
    /// `log_message_latency_if_sampling` and invokes `actor.handle` with a
    /// [`Context`] carrying those headers. The handler's result is returned
    /// as is.
    ///
    /// # Safety
    ///
    /// When `type_info` is `Some`, it must be the type info for `M`: the
    /// message is passed to `arm_unchecked` as an untyped pointer.
    #[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))]
    async unsafe fn handle_message<M: Message>(
        &mut self,
        actor: &mut A,
        type_info: Option<&'static TypeInfo>,
        headers: Attrs,
        message: M,
    ) -> Result<(), anyhow::Error>
    where
        A: Handler<M>,
    {
        // A (type name, arm) pair describing the message; used only for the
        // status report below.
        let handler = type_info.map(|info| {
            (
                info.typename().to_string(),
                // SAFETY: The caller promises to pass the correct type info.
                unsafe {
                    info.arm_unchecked(&message as *const M as *const ())
                        .map(str::to_string)
                },
            )
        });

        self.change_status(ActorStatus::Processing(
            self.clock().system_time_now(),
            handler,
        ));
        crate::mailbox::headers::log_message_latency_if_sampling(
            &headers,
            self.self_id().to_string(),
        );

        let context = Context::new(self, headers);
        // Pass a reference to the context to the handler, so that deref
        // coercion allows the `this` argument to be treated exactly like
        // &Instance<A>.
        actor.handle(&context, message).await
    }
1534
1535    /// Spawn on child on this instance. Currently used only by cap::CanSpawn.
1536    pub(crate) async fn spawn<C: Actor>(
1537        &self,
1538        params: C::Params,
1539    ) -> anyhow::Result<ActorHandle<C>> {
1540        self.inner
1541            .proc
1542            .spawn_child(self.inner.cell.clone(), params)
1543            .await
1544    }
1545
1546    /// Create a new direct child instance.
1547    pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
1548        self.inner.proc.child_instance(self.inner.cell.clone())
1549    }
1550
1551    /// Return a handle port handle representing the actor's message
1552    /// handler for M-typed messages.
1553    pub fn port<M: Message>(&self) -> PortHandle<M>
1554    where
1555        A: Handler<M>,
1556    {
1557        self.inner.ports.get()
1558    }
1559
1560    /// The [`ActorHandle`] corresponding to this instance.
1561    pub fn handle(&self) -> ActorHandle<A> {
1562        ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports))
1563    }
1564
1565    /// The owning actor ref.
1566    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
1567        self.inner.cell.bind(self.inner.ports.as_ref())
1568    }
1569
1570    // Temporary in order to support python bindings.
1571    #[doc(hidden)]
1572    pub fn mailbox_for_py(&self) -> &Mailbox {
1573        &self.inner.mailbox
1574    }
1575
1576    /// A reference to the proc's clock
1577    pub fn clock(&self) -> &(impl Clock + use<A>) {
1578        &self.inner.proc.state().clock
1579    }
1580
1581    /// The owning proc.
1582    pub fn proc(&self) -> &Proc {
1583        &self.inner.proc
1584    }
1585
1586    /// Clone this Instance to get an owned struct that can be
1587    /// plumbed through python. This should really only be called
1588    /// for the explicit purpose of being passed into python
1589    #[doc(hidden)]
1590    pub fn clone_for_py(&self) -> Self {
1591        Self {
1592            inner: Arc::clone(&self.inner),
1593        }
1594    }
1595
1596    /// Get the join handle associated with this actor.
1597    fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1598        self.inner.cell.inner.actor_task_handle.get()
1599    }
1600
1601    /// Return this instance's sequencer.
1602    pub fn sequencer(&self) -> &Sequencer {
1603        &self.inner.sequencer
1604    }
1605
1606    /// Return this instance's ID.
1607    pub fn instance_id(&self) -> Uuid {
1608        self.inner.id
1609    }
1610}
1611
1612impl<A: Actor> context::Mailbox for Instance<A> {
1613    fn mailbox(&self) -> &Mailbox {
1614        &self.inner.mailbox
1615    }
1616}
1617
1618impl<A: Actor> context::Mailbox for Context<'_, A> {
1619    fn mailbox(&self) -> &Mailbox {
1620        &self.instance.inner.mailbox
1621    }
1622}
1623
1624impl<A: Actor> context::Mailbox for &Instance<A> {
1625    fn mailbox(&self) -> &Mailbox {
1626        &self.inner.mailbox
1627    }
1628}
1629
1630impl<A: Actor> context::Mailbox for &Context<'_, A> {
1631    fn mailbox(&self) -> &Mailbox {
1632        &self.instance.inner.mailbox
1633    }
1634}
1635
1636impl<A: Actor> context::Actor for Instance<A> {
1637    type A = A;
1638    fn instance(&self) -> &Instance<A> {
1639        self
1640    }
1641}
1642
1643impl<A: Actor> context::Actor for Context<'_, A> {
1644    type A = A;
1645    fn instance(&self) -> &Instance<A> {
1646        self
1647    }
1648}
1649
1650impl<A: Actor> context::Actor for &Instance<A> {
1651    type A = A;
1652    fn instance(&self) -> &Instance<A> {
1653        self
1654    }
1655}
1656
1657impl<A: Actor> context::Actor for &Context<'_, A> {
1658    type A = A;
1659    fn instance(&self) -> &Instance<A> {
1660        self
1661    }
1662}
1663
1664impl Instance<()> {
1665    /// See [Mailbox::bind_actor_port] for details.
1666    pub fn bind_actor_port<M: RemoteMessage>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1667        assert!(
1668            self.actor_task_handle().is_none(),
1669            "can only bind actor port on instance with no running actor task"
1670        );
1671        self.inner.mailbox.bind_actor_port()
1672    }
1673}
1674
/// Describes how an actor's type is identified.
#[derive(Debug)]
enum ActorType {
    /// The type is described by a static [`TypeInfo`].
    Named(&'static TypeInfo),
    /// The type is known only by a static name string.
    Anonymous(&'static str),
}
1680
1681impl ActorType {
1682    /// The actor's type name.
1683    fn type_name(&self) -> &'static str {
1684        match self {
1685            Self::Named(info) => info.typename(),
1686            Self::Anonymous(name) => name,
1687        }
1688    }
1689}
1690
/// InstanceCell contains all of the type-erased, shareable state of an instance.
/// Specifically, InstanceCells form a supervision tree, and they are used by
/// ActorHandle to access the underlying instance.
///
/// InstanceCell is reference counted and cloneable.
#[derive(Clone, Debug)]
pub struct InstanceCell {
    /// The shared state; every clone of the cell points at the same state.
    inner: Arc<InstanceCellState>,
}
1700
/// The state shared by all clones of an [`InstanceCell`]. When the last
/// strong reference goes away, the `Drop` impl unlinks the state from its
/// parent and removes it from the proc's instance table.
#[derive(Debug)]
struct InstanceCellState {
    /// The actor's id.
    actor_id: ActorId,

    /// Actor info contains the actor's type information.
    actor_type: ActorType,

    /// The proc in which the actor is running.
    proc: Proc,

    /// Control port handles to the actor loop, if one is running.
    /// The first handle carries signals, the second supervision events.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// An observer that stores the current status of the actor.
    status: watch::Receiver<ActorStatus>,

    /// A weak reference to this instance's parent.
    parent: WeakInstanceCell,

    /// This instance's children by their PIDs. The map holds strong
    /// references to the child cells.
    children: DashMap<Index, InstanceCell>,

    /// Access to the spawned actor's join handle.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// The set of named ports that are exported by this actor, keyed by
    /// port index and mapped to the message type name.
    exported_named_ports: DashMap<u64, &'static str>,

    /// The number of messages processed by this actor.
    num_processed_messages: AtomicU64,

    /// The log recording associated with this actor. It is used to
    /// store a 'flight record' of events while the actor is running.
    recording: Recording,

    /// A type-erased reference to Ports<A>, which allows us to recover
    /// an ActorHandle<A> by downcasting.
    ports: Arc<dyn Any + Send + Sync>,
}
1741
1742impl InstanceCellState {
1743    /// Unlink this instance from its parent, if it has one. If it was unlinked,
1744    /// the parent is returned.
1745    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1746        self.parent
1747            .upgrade()
1748            .filter(|parent| parent.inner.unlink(self))
1749    }
1750
1751    /// Unlink this instance from a child.
1752    fn unlink(&self, child: &InstanceCellState) -> bool {
1753        assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
1754        self.children.remove(&child.actor_id.pid()).is_some()
1755    }
1756}
1757
1758impl InstanceCell {
1759    /// Creates a new instance cell with the provided internal state. If a parent
1760    /// is provided, it is linked to this cell.
1761    fn new(
1762        actor_id: ActorId,
1763        actor_type: ActorType,
1764        proc: Proc,
1765        actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1766        status: watch::Receiver<ActorStatus>,
1767        parent: Option<InstanceCell>,
1768        ports: Arc<dyn Any + Send + Sync>,
1769    ) -> Self {
1770        let _ais = actor_id.to_string();
1771        let cell = Self {
1772            inner: Arc::new(InstanceCellState {
1773                actor_id: actor_id.clone(),
1774                actor_type,
1775                proc: proc.clone(),
1776                actor_loop,
1777                status,
1778                parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
1779                children: DashMap::new(),
1780                actor_task_handle: OnceLock::new(),
1781                exported_named_ports: DashMap::new(),
1782                num_processed_messages: AtomicU64::new(0),
1783                recording: hyperactor_telemetry::recorder().record(64),
1784                ports,
1785            }),
1786        };
1787        cell.maybe_link_parent();
1788        proc.inner
1789            .instances
1790            .insert(actor_id.clone(), cell.downgrade());
1791        cell
1792    }
1793
1794    fn wrap(inner: Arc<InstanceCellState>) -> Self {
1795        Self { inner }
1796    }
1797
1798    /// The actor's ID.
1799    pub(crate) fn actor_id(&self) -> &ActorId {
1800        &self.inner.actor_id
1801    }
1802
1803    /// The actor's PID.
1804    pub(crate) fn pid(&self) -> Index {
1805        self.inner.actor_id.pid()
1806    }
1807
1808    /// The actor's join handle.
1809    #[allow(dead_code)]
1810    pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1811        self.inner.actor_task_handle.get()
1812    }
1813
1814    /// The instance's status observer.
1815    pub(crate) fn status(&self) -> &watch::Receiver<ActorStatus> {
1816        &self.inner.status
1817    }
1818
1819    /// Send a signal to the actor.
1820    pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
1821        if let Some((signal_port, _)) = &self.inner.actor_loop {
1822            signal_port.send(signal).map_err(ActorError::from)
1823        } else {
1824            tracing::warn!(
1825                "{}: attempted to send signal {} to detached actor",
1826                self.inner.actor_id,
1827                signal
1828            );
1829            Ok(())
1830        }
1831    }
1832
1833    /// Used by this actor's children to send a supervision event to this actor.
1834    /// When it fails to send, we will crash the process. As part of the crash,
1835    /// all the procs and actors running on this process will be terminated
1836    /// forcefully.
1837    ///
1838    /// Note that "let it crash" is the default behavior when a supervision event
1839    /// cannot be delivered upstream. It is the upstream's responsibility to
1840    /// detect and handle crashes.
1841    pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1842        match &self.inner.actor_loop {
1843            Some((_, supervision_port)) => {
1844                if let Err(err) = supervision_port.send(event) {
1845                    tracing::error!(
1846                        "{}: failed to send supervision event to actor: {:?}. Crash the process.",
1847                        self.actor_id(),
1848                        err
1849                    );
1850                    std::process::exit(1);
1851                }
1852            }
1853            None => {
1854                tracing::error!(
1855                    "{}: failed: {}: cannot send supervision event to detached actor: crashing",
1856                    self.actor_id(),
1857                    event,
1858                );
1859                std::process::exit(1);
1860            }
1861        }
1862    }
1863
1864    /// Downgrade this InstanceCell to a weak reference.
1865    pub fn downgrade(&self) -> WeakInstanceCell {
1866        WeakInstanceCell {
1867            inner: Arc::downgrade(&self.inner),
1868        }
1869    }
1870
1871    /// Link this instance to a new child.
1872    fn link(&self, child: InstanceCell) {
1873        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1874        self.inner.children.insert(child.pid(), child);
1875    }
1876
1877    /// Unlink this instance from a child.
1878    fn unlink(&self, child: &InstanceCell) {
1879        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1880        self.inner.children.remove(&child.pid());
1881    }
1882
1883    /// Unlink this instance from all children.
1884    fn unlink_all(&self) {
1885        self.inner.children.clear();
1886    }
1887
1888    /// Link this instance to its parent, if it has one.
1889    fn maybe_link_parent(&self) {
1890        if let Some(parent) = self.inner.parent.upgrade() {
1891            parent.link(self.clone());
1892        }
1893    }
1894
1895    /// Unlink this instance from its parent, if it has one. If it was unlinked,
1896    /// the parent is returned.
1897    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1898        self.inner.maybe_unlink_parent()
1899    }
1900
1901    /// Get parent instance cell, if it exists.
1902    #[allow(dead_code)]
1903    fn get_parent_cell(&self) -> Option<InstanceCell> {
1904        self.inner.parent.upgrade()
1905    }
1906
1907    /// Return an iterator over this instance's children. This may deadlock if the
1908    /// caller already holds a reference to any item in map.
1909    fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, Index, InstanceCell>> {
1910        self.inner.children.iter()
1911    }
1912
1913    /// The number of children this instance has.
1914    fn child_count(&self) -> usize {
1915        self.inner.children.len()
1916    }
1917
1918    /// Get a child by its PID.
1919    fn get_child(&self, pid: Index) -> Option<InstanceCell> {
1920        self.inner.children.get(&pid).map(|child| child.clone())
1921    }
1922
    /// Binds the ports required by `R`, plus the signal and undeliverable
    /// message ports, and returns an attested [`ActorRef`] for this actor.
    /// Every entry in `ports.bound` is also recorded in this cell's set of
    /// exported named ports.
    ///
    /// This is temporary so that we can share binding code between handle and instance.
    /// We should find some (better) way to consolidate the two.
    pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> ActorRef<R> {
        <R as Binds<A>>::bind(ports);
        // All actors handle signals and undeliverable messages.
        ports.bind::<Signal>();
        ports.bind::<Undeliverable<MessageEnvelope>>();
        // Mirror the bound port table (port index -> message type name) into
        // the cell's exported ports.
        // TODO: consider sharing `ports.bound` directly.
        for entry in ports.bound.iter() {
            self.inner
                .exported_named_ports
                .insert(*entry.key(), entry.value());
        }
        ActorRef::attest(self.actor_id().clone())
    }
1938
1939    /// Attempt to downcast this cell to a concrete actor handle.
1940    pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
1941        let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
1942        Some(ActorHandle::new(self.clone(), ports))
1943    }
1944}
1945
1946impl Drop for InstanceCellState {
1947    fn drop(&mut self) {
1948        if let Some(parent) = self.maybe_unlink_parent() {
1949            tracing::debug!(
1950                "instance {} was dropped with parent {} still linked",
1951                self.actor_id,
1952                parent.actor_id()
1953            );
1954        }
1955        if self.proc.inner.instances.remove(&self.actor_id).is_none() {
1956            tracing::error!("instance {} was dropped but not in proc", self.actor_id);
1957        }
1958    }
1959}
1960
/// A weak version of the InstanceCell. This is used to provide cyclical
/// linkage between actors without creating a strong reference cycle.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    /// Weak pointer to the shared cell state; it does not keep the state alive.
    inner: Weak<InstanceCellState>,
}
1967
1968impl Default for WeakInstanceCell {
1969    fn default() -> Self {
1970        Self::new()
1971    }
1972}
1973
1974impl WeakInstanceCell {
1975    /// Create a new weak instance cell that is never upgradeable.
1976    pub fn new() -> Self {
1977        Self { inner: Weak::new() }
1978    }
1979
1980    /// Upgrade this weak instance cell to a strong reference, if possible.
1981    pub fn upgrade(&self) -> Option<InstanceCell> {
1982        self.inner.upgrade().map(InstanceCell::wrap)
1983    }
1984}
1985
/// A polymorphic dictionary that stores ports for an actor's handlers.
/// The interface memoizes the ports so that they are reused. We do not
/// (yet) support stable identifiers across multiple instances of the same
/// actor.
pub struct Ports<A: Actor> {
    /// Memoized port handles, keyed by the `TypeId` of the message type `M`;
    /// each value is a boxed `PortHandle<M>`.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    /// Port indices that have been bound, mapped to the message type name.
    bound: DashMap<u64, &'static str>,
    /// The mailbox on which ports are opened.
    mailbox: Mailbox,
    /// The work queue to which handler invocations are sent.
    workq: OrderedSender<WorkCell<A>>,
}
1996
/// A message's sequence number information.
///
/// Its string form is `<session_id>:<seq>`, as written by the `Display`
/// impl and read by the `FromStr` impl.
#[derive(Serialize, Deserialize, Clone, Named, AttrValue)]
pub struct SeqInfo {
    /// Message's session ID
    pub session_id: Uuid,
    /// Message's sequence number in the given session.
    pub seq: u64,
}
2005
2006impl fmt::Display for SeqInfo {
2007    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2008        write!(f, "{}:{}", self.session_id, self.seq)
2009    }
2010}
2011
2012impl std::str::FromStr for SeqInfo {
2013    type Err = anyhow::Error;
2014
2015    fn from_str(s: &str) -> Result<Self, Self::Err> {
2016        let parts: Vec<_> = s.split(':').collect();
2017        if parts.len() != 2 {
2018            return Err(anyhow::anyhow!("invalid SeqInfo: {}", s));
2019        }
2020        let session_id: Uuid = parts[0].parse()?;
2021        let seq: u64 = parts[1].parse()?;
2022        Ok(SeqInfo { session_id, seq })
2023    }
2024}
2025
// Declares the `SEQ_INFO` attribute key; `Ports::get` reads it from message
// headers to obtain the session ID and sequence number.
declare_attrs! {
    /// The sender of this message, the session ID, and the message's sequence
    /// number assigned by this session.
    pub attr SEQ_INFO: SeqInfo;
}
2031
2032impl<A: Actor> Ports<A> {
2033    fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
2034        Self {
2035            ports: DashMap::new(),
2036            bound: DashMap::new(),
2037            mailbox,
2038            workq,
2039        }
2040    }
2041
    /// Get a port for the Handler<M> of actor A.
    ///
    /// Ports are memoized by message type: the first call for `M` opens an
    /// enqueue port on the mailbox and stores its handle; later calls return
    /// a clone of the stored handle.
    ///
    /// # Panics
    ///
    /// Panics if no port exists yet for `M` and `M` is [`Signal`]. The
    /// enqueue callback also panics if buffering is enabled on the work
    /// queue and a message arrives without `SEQ_INFO` in its headers.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            Entry::Vacant(entry) => {
                // Some special case hackery, but it keeps the rest of the code (relatively) simple.
                assert_ne!(
                    key,
                    TypeId::of::<Signal>(),
                    "cannot provision Signal port through `Ports::get`"
                );

                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                let actor_id = self.mailbox.actor_id().to_string();
                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
                    // Read the sequencing info before `headers` is moved into
                    // the work item below.
                    let seq_info = headers.get(SEQ_INFO).cloned();

                    // Package the handler invocation as a unit of work to be
                    // run against the actor and its instance.
                    let work = WorkCell::new(move |actor: &mut A, instance: &mut Instance<A>| {
                        Box::pin(async move {
                            // SAFETY: we guarantee that the passed type_info is for type M.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    // Count the message as queued before handing it to the work queue.
                    ACTOR_MESSAGE_QUEUE_SIZE.add(
                        1,
                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
                    );
                    if workq.enable_buffering {
                        let SeqInfo { session_id, seq } =
                            seq_info.expect("SEQ_INFO must be set when buffering is enabled");

                        // TODO: return the message contained in the error instead of dropping them when converting
                        // to anyhow::Error. In that way, the message can be picked up by mailbox and returned to sender.
                        workq.send(session_id, seq, work).map_err(|e| match e {
                            OrderedSenderError::InvalidZeroSeq(_) => {
                                anyhow::anyhow!("seq must be greater than 0")
                            }
                            OrderedSenderError::SendError(e) => anyhow::Error::from(e),
                            OrderedSenderError::FlushError(e) => e,
                        })
                    } else {
                        // Without buffering, the work is sent directly and
                        // the sequencing info is not used.
                        workq.direct_send(work).map_err(anyhow::Error::from)
                    }
                });
                // Memoize the handle so later calls for `M` reuse this port.
                entry.insert(Box::new(port.clone()));
                port
            }
            Entry::Occupied(entry) => {
                // The entry is keyed by `TypeId::of::<M>()`, so the stored
                // value is expected to be a `PortHandle<M>`.
                let port = entry.get();
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }
2103
2104    /// Open a (typed) message port as in [`get`], but return a port receiver instead of dispatching
2105    /// the underlying handler.
2106    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
2107        match self.ports.entry(TypeId::of::<M>()) {
2108            Entry::Vacant(entry) => {
2109                let (port, receiver) = self.mailbox.open_port();
2110                entry.insert(Box::new(port.clone()));
2111                Some((port, receiver))
2112            }
2113            Entry::Occupied(_) => None,
2114        }
2115    }
2116
2117    /// Bind the given message type to its actor port.
2118    pub fn bind<M: RemoteMessage>(&self)
2119    where
2120        A: Handler<M>,
2121    {
2122        let port_index = M::port();
2123        match self.bound.entry(port_index) {
2124            Entry::Vacant(entry) => {
2125                self.get::<M>().bind_actor_port();
2126                entry.insert(M::typename());
2127            }
2128            Entry::Occupied(entry) => {
2129                assert_eq!(
2130                    *entry.get(),
2131                    M::typename(),
2132                    "bind {}: port index {} already bound to type {}",
2133                    M::typename(),
2134                    port_index,
2135                    entry.get(),
2136                );
2137            }
2138        }
2139    }
2140}
2141
2142#[cfg(test)]
2143mod tests {
2144    use std::assert_matches::assert_matches;
2145    use std::sync::atomic::AtomicBool;
2146
2147    use hyperactor_macros::export;
2148    use maplit::hashmap;
2149    use serde_json::json;
2150    use timed_test::async_timed_test;
2151    use tokio::sync::Barrier;
2152    use tokio::sync::oneshot;
2153    use tracing::Level;
2154    use tracing_subscriber::layer::SubscriberExt;
2155    use tracing_test::internal::logs_with_scope_contain;
2156
2157    use super::*;
2158    // needed for in-crate macro expansion
2159    use crate as hyperactor;
2160    use crate::HandleClient;
2161    use crate::Handler;
2162    use crate::OncePortRef;
2163    use crate::PortRef;
2164    use crate::clock::RealClock;
2165    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
2166    use crate::test_utils::process_assertion::assert_termination;
2167
2168    impl ActorTreeSnapshot {
2169        #[allow(dead_code)]
2170        fn empty(pid: Index) -> Self {
2171            Self {
2172                pid,
2173                type_name: String::new(),
2174                status: ActorStatus::Idle,
2175                stats: ActorStats::default(),
2176                handlers: HashMap::new(),
2177                children: HashMap::new(),
2178                events: Vec::new(),
2179                spans: Vec::new(),
2180            }
2181        }
2182
2183        fn empty_typed(pid: Index, type_name: String) -> Self {
2184            Self {
2185                pid,
2186                type_name,
2187                status: ActorStatus::Idle,
2188                stats: ActorStats::default(),
2189                handlers: HashMap::new(),
2190                children: HashMap::new(),
2191                events: Vec::new(),
2192                spans: Vec::new(),
2193            }
2194        }
2195    }
2196
    // A stateless actor used by the tests below; it handles `TestActorMessage`.
    #[derive(Debug, Default, Actor)]
    #[export]
    struct TestActor;
2200
    // Messages understood by `TestActor`; see the handler impl for the
    // behavior of each variant.
    #[derive(Handler, HandleClient, Debug)]
    enum TestActorMessage {
        // Signal the sender, then return.
        Reply(oneshot::Sender<()>),
        // Signal the sender, then block until the receiver resolves.
        Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
        // Send the boxed message on to another actor.
        Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
        // Do nothing.
        Noop(),
        // Fail the handler with the given error.
        Fail(anyhow::Error),
        // Panic with the given message.
        Panic(String),
        // Spawn a child `TestActor` and send back its handle.
        Spawn(oneshot::Sender<ActorHandle<TestActor>>),
    }
2211
2212    impl TestActor {
2213        async fn spawn_child(parent: &ActorHandle<TestActor>) -> ActorHandle<TestActor> {
2214            let (tx, rx) = oneshot::channel();
2215            parent.send(TestActorMessage::Spawn(tx)).unwrap();
2216            rx.await.unwrap()
2217        }
2218    }
2219
    // Handler implementations for each `TestActorMessage` variant.
    #[async_trait]
    #[crate::forward(TestActorMessage)]
    impl TestActorMessageHandler for TestActor {
        // Acknowledge by sending `()` on the provided channel.
        async fn reply(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            Ok(())
        }

        // Signal that the handler has started, then stay in the handler
        // until the caller releases it through `receiver`.
        async fn wait(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
            receiver: oneshot::Receiver<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            receiver.await.unwrap();
            Ok(())
        }

        // Relay `message` to `destination`; a send failure fails the handler.
        async fn forward(
            &mut self,
            _cx: &crate::Context<Self>,
            destination: ActorHandle<TestActor>,
            message: Box<TestActorMessage>,
        ) -> Result<(), anyhow::Error> {
            // TODO: this needn't be async
            destination.send(*message)?;
            Ok(())
        }

        // Succeed without doing anything.
        async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
            Ok(())
        }

        // Fail the handler with the caller-supplied error.
        async fn fail(
            &mut self,
            _cx: &crate::Context<Self>,
            err: anyhow::Error,
        ) -> Result<(), anyhow::Error> {
            Err(err)
        }

        // Panic inside the handler with the caller-supplied message.
        async fn panic(
            &mut self,
            _cx: &crate::Context<Self>,
            err_msg: String,
        ) -> Result<(), anyhow::Error> {
            panic!("{}", err_msg);
        }

        // Spawn a child `TestActor` from this handler's context and send its
        // handle back on `reply`.
        async fn spawn(
            &mut self,
            cx: &crate::Context<Self>,
            reply: oneshot::Sender<ActorHandle<TestActor>>,
        ) -> Result<(), anyhow::Error> {
            let handle = <Self as Actor>::spawn(cx, ()).await?;
            reply.send(handle).unwrap();
            Ok(())
        }
    }
2284
    // Spawns a single actor and checks its status transitions:
    // idle -> processing (while a handler is blocked) -> idle -> stopped.
    #[tracing_test::traced_test]
    #[async_timed_test(timeout_secs = 30)]
    async fn test_spawn_actor() {
        let proc = Proc::local();
        let handle = proc.spawn::<TestActor>("test", ()).await.unwrap();

        // Check on the join handle.
        assert!(logs_contain(
            format!(
                "{}: spawned with {:?}",
                handle.actor_id(),
                handle.cell().actor_task_handle().unwrap(),
            )
            .as_str()
        ));

        let mut state = handle.status().clone();

        // Send a ping-pong to the actor. Wait for the actor to become idle.

        let (tx, rx) = oneshot::channel::<()>();
        handle.send(TestActorMessage::Reply(tx)).unwrap();
        rx.await.unwrap();

        state
            .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        // Make sure we enter processing state while the actor is handling a message.
        let (enter_tx, enter_rx) = oneshot::channel::<()>();
        let (exit_tx, exit_rx) = oneshot::channel::<()>();

        handle
            .send(TestActorMessage::Wait(enter_tx, exit_rx))
            .unwrap();
        // The handler signals `enter_tx` and then blocks on `exit_rx`, so the
        // status is observed while the handler is still running.
        enter_rx.await.unwrap();
        assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= RealClock.system_time_now());
        exit_tx.send(()).unwrap();

        state
            .wait_for(|state| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        // Stop the actor and wait for it to finish.
        handle.drain_and_stop().unwrap();
        handle.await;
        assert_matches!(*state.borrow(), ActorStatus::Stopped);
    }
2334
2335    #[async_timed_test(timeout_secs = 30)]
2336    async fn test_proc_actors_messaging() {
2337        let proc = Proc::local();
2338        let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2339        let second = proc.spawn::<TestActor>("second", ()).await.unwrap();
2340        let (tx, rx) = oneshot::channel::<()>();
2341        let reply_message = TestActorMessage::Reply(tx);
2342        first
2343            .send(TestActorMessage::Forward(second, Box::new(reply_message)))
2344            .unwrap();
2345        rx.await.unwrap();
2346    }
2347
    // Actor used to test resolving an `ActorRef` to a local handle from
    // inside a handler.
    #[derive(Debug, Default, Actor)]
    struct LookupTestActor;
2350
    // Messages understood by `LookupTestActor`.
    #[derive(Handler, HandleClient, Debug)]
    enum LookupTestMessage {
        // Replies with whether the given ref can be downcast to a handle.
        ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
    }
2355
    #[async_trait]
    #[crate::forward(LookupTestMessage)]
    impl LookupTestMessageHandler for LookupTestActor {
        // Reports whether `actor_ref` can be downcast to a `TestActor` handle
        // from this handler's context.
        async fn actor_exists(
            &mut self,
            cx: &crate::Context<Self>,
            actor_ref: ActorRef<TestActor>,
        ) -> Result<bool, anyhow::Error> {
            Ok(actor_ref.downcast_handle(cx).is_some())
        }
    }
2367
    // Checks that an actor ref resolves to a handle only for an actor of the
    // right type, and no longer resolves once that actor has stopped.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_actor_lookup() {
        let proc = Proc::local();
        let (client, _handle) = proc.instance("client").unwrap();

        let target_actor = proc.spawn::<TestActor>("target", ()).await.unwrap();
        let target_actor_ref = target_actor.bind();
        let lookup_actor = proc.spawn::<LookupTestActor>("lookup", ()).await.unwrap();

        // The target actor should be found.
        assert!(
            lookup_actor
                .actor_exists(&client, target_actor_ref.clone())
                .await
                .unwrap()
        );

        // Make up a child actor. It shouldn't exist.
        assert!(
            !lookup_actor
                .actor_exists(
                    &client,
                    ActorRef::attest(target_actor.actor_id().child_id(123).clone())
                )
                .await
                .unwrap()
        );
        // A wrongly-typed actor ref should not resolve either.
        assert!(
            !lookup_actor
                .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone()))
                .await
                .unwrap()
        );

        target_actor.drain_and_stop().unwrap();
        target_actor.await;

        // Once the target has stopped, its ref should no longer resolve.
        assert!(
            !lookup_actor
                .actor_exists(&client, target_actor_ref)
                .await
                .unwrap()
        );

        lookup_actor.drain_and_stop().unwrap();
        lookup_actor.await;
    }
2415
2416    fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
2417        assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
2418        assert_eq!(
2419            child.inner.parent.upgrade().unwrap().actor_id(),
2420            parent.actor_id()
2421        );
2422        assert_matches!(
2423            parent.inner.children.get(&child.pid()),
2424            Some(node) if node.actor_id() == child.actor_id()
2425        );
2426    }
2427
    // Builds a three-level chain of actors (first -> second -> third) and
    // checks ID allocation, supervision-tree links, and teardown.
    #[tracing_test::traced_test]
    #[async_timed_test(timeout_secs = 30)]
    async fn test_spawn_child() {
        let proc = Proc::local();

        let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
        let second = TestActor::spawn_child(&first).await;
        let third = TestActor::spawn_child(&second).await;

        // Check we've got the join handles.
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                first.actor_id(),
                first.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                second.actor_id(),
                second.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                third.actor_id(),
                third.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));

        // These are allocated in sequence:
        assert_eq!(first.actor_id().proc_id(), proc.proc_id());
        assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
        assert_eq!(third.actor_id(), &first.actor_id().child_id(2));

        // Supervision tree is constructed correctly.
        validate_link(third.cell(), second.cell());
        validate_link(second.cell(), first.cell());
        assert!(first.cell().inner.parent.upgrade().is_none());

        // Supervision tree is torn down correctly.
        // Once each actor is stopped, it should have no linked children.
        // A clone of each cell is taken first because awaiting the handle
        // consumes it.
        let third_cell = third.cell().clone();
        third.drain_and_stop().unwrap();
        third.await;
        assert!(third_cell.inner.children.is_empty());
        drop(third_cell);
        // Stopping the leaf must not disturb the link above it.
        validate_link(second.cell(), first.cell());

        let second_cell = second.cell().clone();
        second.drain_and_stop().unwrap();
        second.await;
        assert!(second_cell.inner.children.is_empty());
        drop(second_cell);

        let first_cell = first.cell().clone();
        first.drain_and_stop().unwrap();
        first.await;
        assert!(first_cell.inner.children.is_empty());
    }
2496
    // Checks that stopping a root actor also stops all of its descendants.
    #[async_timed_test(timeout_secs = 30)]
    async fn test_child_lifecycle() {
        let proc = Proc::local();

        // Tree under test: root -> {root_1, root_2 -> root_2_1}.
        let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
        let root_1 = TestActor::spawn_child(&root).await;
        let root_2 = TestActor::spawn_child(&root).await;
        let root_2_1 = TestActor::spawn_child(&root_2).await;

        root.drain_and_stop().unwrap();
        root.await;

        // Every descendant should reject new messages and report `Stopped`.
        for actor in [root_1, root_2, root_2_1] {
            assert!(actor.send(TestActorMessage::Noop()).is_err());
            assert_matches!(actor.await, ActorStatus::Stopped);
        }
    }
2514
2515    #[async_timed_test(timeout_secs = 30)]
2516    async fn test_parent_failure() {
2517        let proc = Proc::local();
2518        // Need to set a supervison coordinator for this Proc because there will
2519        // be actor failure(s) in this test which trigger supervision.
2520        ProcSupervisionCoordinator::set(&proc).await.unwrap();
2521
2522        let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2523        let root_1 = TestActor::spawn_child(&root).await;
2524        let root_2 = TestActor::spawn_child(&root).await;
2525        let root_2_1 = TestActor::spawn_child(&root_2).await;
2526
2527        root_2
2528            .send(TestActorMessage::Fail(anyhow::anyhow!(
2529                "some random failure"
2530            )))
2531            .unwrap();
2532        let _root_2_actor_id = root_2.actor_id().clone();
2533        assert_matches!(
2534            root_2.await,
2535            ActorStatus::Failed(err) if err.to_string() == "some random failure"
2536        );
2537
2538        // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
2539        // stopped by a parent failure?
2540        // Currently the parent fails with an error related to the child's failure.
2541        assert_matches!(
2542            root.await,
2543            ActorStatus::Failed(err) if err.to_string().contains("some random failure")
2544        );
2545        assert_eq!(root_2_1.await, ActorStatus::Stopped);
2546        assert_eq!(root_1.await, ActorStatus::Stopped);
2547    }
2548
2549    #[async_timed_test(timeout_secs = 30)]
2550    async fn test_actor_ledger() {
2551        async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
2552            actor_handle
2553                .status()
2554                .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2555                .await
2556                .unwrap();
2557        }
2558
2559        let proc = Proc::local();
2560
2561        // Add the 1st root. This root will remain active until the end of the test.
2562        let root: ActorHandle<TestActor> = proc.spawn::<TestActor>("root", ()).await.unwrap();
2563        wait_until_idle(&root).await;
2564        {
2565            let snapshot = proc.state().ledger.snapshot();
2566            assert_eq!(
2567                snapshot.roots,
2568                hashmap! {
2569                    root.actor_id().clone() =>
2570                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2571                },
2572            );
2573        }
2574
2575        // Add the 2nd root.
2576        let another_root: ActorHandle<TestActor> =
2577            proc.spawn::<TestActor>("another_root", ()).await.unwrap();
2578        wait_until_idle(&another_root).await;
2579        {
2580            let snapshot = proc.state().ledger.snapshot();
2581            assert_eq!(
2582                snapshot.roots,
2583                hashmap! {
2584                    root.actor_id().clone() =>
2585                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2586                    another_root.actor_id().clone() =>
2587                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2588                },
2589            );
2590        }
2591
2592        // Stop the 2nd root. It should be excluded from the snapshot after it
2593        // is stopped.
2594        another_root.drain_and_stop().unwrap();
2595        another_root.await;
2596        {
2597            let snapshot = proc.state().ledger.snapshot();
2598            assert_eq!(
2599                snapshot.roots,
2600                hashmap! { root.actor_id().clone() =>
2601                    ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2602                },
2603            );
2604        }
2605
2606        // Incrementally add the following children tree to root. This tree
2607        // should be captured by snapshot.
2608        //     root -> root_1 -> root_1_1
2609        //         |-> root_2
2610
2611        let root_1 = TestActor::spawn_child(&root).await;
2612        wait_until_idle(&root_1).await;
2613        // Wait until the root actor processes the message and is then idle again.
2614        wait_until_idle(&root).await;
2615        {
2616            let snapshot = proc.state().ledger.snapshot();
2617            assert_eq!(
2618                snapshot.roots,
2619                hashmap! {
2620                    root.actor_id().clone() =>  ActorTreeSnapshot {
2621                        pid: 0,
2622                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2623                        status: ActorStatus::Idle,
2624                        stats: ActorStats { num_processed_messages: 1 },
2625                        handlers: HashMap::new(),
2626                        children: hashmap! {
2627                            root_1.actor_id().pid() =>
2628                                ActorTreeSnapshot::empty_typed(
2629                                    root_1.actor_id().pid(),
2630                                    "hyperactor::proc::tests::TestActor".to_string()
2631                                )
2632                        },
2633                        events: Vec::new(),
2634                        spans: Vec::new(),
2635                    }
2636                },
2637            );
2638        }
2639
2640        let root_1_1 = TestActor::spawn_child(&root_1).await;
2641        wait_until_idle(&root_1_1).await;
2642        wait_until_idle(&root_1).await;
2643        {
2644            let snapshot = proc.state().ledger.snapshot();
2645            assert_eq!(
2646                snapshot.roots,
2647                hashmap! {
2648                    root.actor_id().clone() =>  ActorTreeSnapshot {
2649                        pid: 0,
2650                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2651                        status: ActorStatus::Idle,
2652                        stats: ActorStats { num_processed_messages: 1 },
2653                        handlers: HashMap::new(),
2654                        children: hashmap!{
2655                            root_1.actor_id().pid() =>
2656                                ActorTreeSnapshot {
2657                                    pid: root_1.actor_id().pid(),
2658                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2659                                    status: ActorStatus::Idle,
2660                                    stats: ActorStats { num_processed_messages: 1 },
2661                                    handlers: HashMap::new(),
2662                                    children: hashmap!{
2663                                        root_1_1.actor_id().pid() =>
2664                                            ActorTreeSnapshot::empty_typed(
2665                                                root_1_1.actor_id().pid(),
2666                                                "hyperactor::proc::tests::TestActor".to_string()
2667                                            )
2668                                    },
2669                                    events: Vec::new(),
2670                                    spans: Vec::new(),
2671                                }
2672                        },
2673                        events: Vec::new(),
2674                        spans: Vec::new(),
2675                    },
2676                }
2677            );
2678        }
2679
2680        let root_2 = TestActor::spawn_child(&root).await;
2681        wait_until_idle(&root_2).await;
2682        wait_until_idle(&root).await;
2683        {
2684            let snapshot = proc.state().ledger.snapshot();
2685            assert_eq!(
2686                snapshot.roots,
2687                hashmap! {
2688                    root.actor_id().clone() =>  ActorTreeSnapshot {
2689                        pid: 0,
2690                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2691                        status: ActorStatus::Idle,
2692                        stats: ActorStats { num_processed_messages: 2 },
2693                        handlers: HashMap::new(),
2694                        children: hashmap!{
2695                            root_2.actor_id().pid() =>
2696                                ActorTreeSnapshot{
2697                                    pid: root_2.actor_id().pid(),
2698                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2699                                    status: ActorStatus::Idle,
2700                                    stats: ActorStats::default(),
2701                                    handlers: HashMap::new(),
2702                                    children: HashMap::new(),
2703                                    events: Vec::new(),
2704                                    spans: Vec::new(),
2705                                },
2706                            root_1.actor_id().pid() =>
2707                                ActorTreeSnapshot{
2708                                    pid: root_1.actor_id().pid(),
2709                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2710                                    status: ActorStatus::Idle,
2711                                    stats: ActorStats { num_processed_messages: 1 },
2712                                    handlers: HashMap::new(),
2713                                    children: hashmap!{
2714                                        root_1_1.actor_id().pid() =>
2715                                            ActorTreeSnapshot::empty_typed(
2716                                                root_1_1.actor_id().pid(),
2717                                                "hyperactor::proc::tests::TestActor".to_string()
2718                                            )
2719                                    },
2720                                    events: Vec::new(),
2721                                    spans: Vec::new(),
2722                                },
2723                        },
2724                        events: Vec::new(),
2725                        spans: Vec::new(),
2726                    },
2727                }
2728            );
2729        }
2730
2731        // Stop root_1. This should remove it, and its child, from snapshot.
2732        root_1.drain_and_stop().unwrap();
2733        root_1.await;
2734        // root also needs to stop processing messages to get a reliable number.
2735        wait_until_idle(&root).await;
2736        {
2737            let snapshot = proc.state().ledger.snapshot();
2738            assert_eq!(
2739                snapshot.roots,
2740                hashmap! {
2741                    root.actor_id().clone() =>  ActorTreeSnapshot {
2742                        pid: 0,
2743                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2744                        status: ActorStatus::Idle,
2745                        stats: ActorStats { num_processed_messages: 3 },
2746                        handlers: HashMap::new(),
2747                        children: hashmap!{
2748                            root_2.actor_id().pid() =>
2749                                ActorTreeSnapshot {
2750                                    pid: root_2.actor_id().pid(),
2751                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2752                                    status: ActorStatus::Idle,
2753                                    stats: ActorStats::default(),
2754                                    handlers: HashMap::new(),
2755                                    children: HashMap::new(),
2756                                    events: Vec::new(),
2757                                    spans: Vec::new(),
2758                                }
2759                        },
2760                        events: Vec::new(),
2761                        spans: Vec::new(),
2762                    },
2763                }
2764            );
2765        }
2766
2767        // Finally stop root. No roots should be left in snapshot.
2768        root.drain_and_stop().unwrap();
2769        root.await;
2770        {
2771            let snapshot = proc.state().ledger.snapshot();
2772            assert_eq!(snapshot.roots, hashmap! {});
2773        }
2774    }
2775
2776    #[async_timed_test(timeout_secs = 30)]
2777    async fn test_multi_handler() {
2778        // TEMPORARY: This test is currently a bit awkward since we don't yet expose
2779        // public interfaces to multi-handlers. This will be fixed shortly.
2780
2781        #[derive(Debug)]
2782        struct TestActor(Arc<AtomicUsize>);
2783
2784        #[async_trait]
2785        impl Actor for TestActor {
2786            type Params = Arc<AtomicUsize>;
2787
2788            async fn new(param: Arc<AtomicUsize>) -> Result<Self, anyhow::Error> {
2789                Ok(Self(param))
2790            }
2791        }
2792
2793        #[async_trait]
2794        impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
2795            async fn handle(
2796                &mut self,
2797                cx: &crate::Context<Self>,
2798                message: OncePortHandle<PortHandle<usize>>,
2799            ) -> anyhow::Result<()> {
2800                message.send(cx.port())?;
2801                Ok(())
2802            }
2803        }
2804
2805        #[async_trait]
2806        impl Handler<usize> for TestActor {
2807            async fn handle(
2808                &mut self,
2809                _cx: &crate::Context<Self>,
2810                message: usize,
2811            ) -> anyhow::Result<()> {
2812                self.0.fetch_add(message, Ordering::SeqCst);
2813                Ok(())
2814            }
2815        }
2816
2817        let proc = Proc::local();
2818        let state = Arc::new(AtomicUsize::new(0));
2819        let handle = proc
2820            .spawn::<TestActor>("test", state.clone())
2821            .await
2822            .unwrap();
2823        let client = proc.attach("client").unwrap();
2824        let (tx, rx) = client.open_once_port();
2825        handle.send(tx).unwrap();
2826        let usize_handle = rx.recv().await.unwrap();
2827        usize_handle.send(123).unwrap();
2828
2829        handle.drain_and_stop().unwrap();
2830        handle.await;
2831
2832        assert_eq!(state.load(Ordering::SeqCst), 123);
2833    }
2834
2835    #[async_timed_test(timeout_secs = 30)]
2836    async fn test_actor_panic() {
2837        // Need this custom hook to store panic backtrace in task_local.
2838        panic_handler::set_panic_hook();
2839
2840        let proc = Proc::local();
2841        // Need to set a supervison coordinator for this Proc because there will
2842        // be actor failure(s) in this test which trigger supervision.
2843        ProcSupervisionCoordinator::set(&proc).await.unwrap();
2844
2845        let (client, _handle) = proc.instance("client").unwrap();
2846        let actor_handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
2847        actor_handle
2848            .panic(&client, "some random failure".to_string())
2849            .await
2850            .unwrap();
2851        let actor_status = actor_handle.await;
2852
2853        // Note: even when the test passes, the panic stacktrace will still be
2854        // printed to stderr because that is the behavior controlled by the panic
2855        // hook.
2856        assert_matches!(actor_status, ActorStatus::Failed(_));
2857        if let ActorStatus::Failed(err) = actor_status {
2858            let error_msg = err.to_string();
2859            // Verify panic message is captured
2860            assert!(error_msg.contains("some random failure"));
2861            // Verify backtrace is captured. Note the backtrace message might
2862            // change in the future. If that happens, we need to update this
2863            // statement with something up-to-date.
2864            assert!(error_msg.contains("rust_begin_unwind"));
2865        }
2866    }
2867
2868    #[async_timed_test(timeout_secs = 30)]
2869    async fn test_local_supervision_propagation() {
2870        #[derive(Debug)]
2871        struct TestActor(Arc<AtomicBool>, bool);
2872
2873        #[async_trait]
2874        impl Actor for TestActor {
2875            type Params = (Arc<AtomicBool>, bool);
2876
2877            async fn new(param: (Arc<AtomicBool>, bool)) -> Result<Self, anyhow::Error> {
2878                Ok(Self(param.0, param.1))
2879            }
2880
2881            async fn handle_supervision_event(
2882                &mut self,
2883                _this: &Instance<Self>,
2884                _event: &ActorSupervisionEvent,
2885            ) -> Result<bool, anyhow::Error> {
2886                if !self.1 {
2887                    return Ok(false);
2888                }
2889
2890                tracing::error!(
2891                    "{}: supervision event received: {:?}",
2892                    _this.self_id(),
2893                    _event
2894                );
2895                self.0.store(true, Ordering::SeqCst);
2896                Ok(true)
2897            }
2898        }
2899
2900        #[async_trait]
2901        impl Handler<String> for TestActor {
2902            async fn handle(
2903                &mut self,
2904                cx: &crate::Context<Self>,
2905                message: String,
2906            ) -> anyhow::Result<()> {
2907                tracing::info!("{} received message: {}", cx.self_id(), message);
2908                Err(anyhow::anyhow!(message))
2909            }
2910        }
2911
2912        let proc = Proc::local();
2913        let reported_event = ProcSupervisionCoordinator::set(&proc).await.unwrap();
2914
2915        let root_state = Arc::new(AtomicBool::new(false));
2916        let root_1_state = Arc::new(AtomicBool::new(false));
2917        let root_1_1_state = Arc::new(AtomicBool::new(false));
2918        let root_1_1_1_state = Arc::new(AtomicBool::new(false));
2919        let root_2_state = Arc::new(AtomicBool::new(false));
2920        let root_2_1_state = Arc::new(AtomicBool::new(false));
2921
2922        let root = proc
2923            .spawn::<TestActor>("root", (root_state.clone(), false))
2924            .await
2925            .unwrap();
2926        let root_1 = proc
2927            .spawn_child::<TestActor>(
2928                root.cell().clone(),
2929                (
2930                    root_1_state.clone(),
2931                    true, /* set true so children's event stops here */
2932                ),
2933            )
2934            .await
2935            .unwrap();
2936        let root_1_1 = proc
2937            .spawn_child::<TestActor>(root_1.cell().clone(), (root_1_1_state.clone(), false))
2938            .await
2939            .unwrap();
2940        let root_1_1_1 = proc
2941            .spawn_child::<TestActor>(root_1_1.cell().clone(), (root_1_1_1_state.clone(), false))
2942            .await
2943            .unwrap();
2944        let root_2 = proc
2945            .spawn_child::<TestActor>(root.cell().clone(), (root_2_state.clone(), false))
2946            .await
2947            .unwrap();
2948        let root_2_1 = proc
2949            .spawn_child::<TestActor>(root_2.cell().clone(), (root_2_1_state.clone(), false))
2950            .await
2951            .unwrap();
2952
2953        // fail `root_1_1_1`, the supervision msg should be propagated to
2954        // `root_1` because `root_1` has set `true` to `handle_supervision_event`.
2955        root_1_1_1
2956            .send::<String>("some random failure".into())
2957            .unwrap();
2958
2959        // fail `root_2_1`, the supervision msg should be propagated to
2960        // ProcSupervisionCoordinator.
2961        root_2_1
2962            .send::<String>("some random failure".into())
2963            .unwrap();
2964
2965        RealClock.sleep(Duration::from_secs(1)).await;
2966
2967        assert!(!root_state.load(Ordering::SeqCst));
2968        assert!(root_1_state.load(Ordering::SeqCst));
2969        assert!(!root_1_1_state.load(Ordering::SeqCst));
2970        assert!(!root_1_1_1_state.load(Ordering::SeqCst));
2971        assert!(!root_2_state.load(Ordering::SeqCst));
2972        assert!(!root_2_1_state.load(Ordering::SeqCst));
2973        assert_eq!(
2974            reported_event.event().map(|e| e.actor_id.clone()),
2975            Some(root_2_1.actor_id().clone())
2976        );
2977    }
2978
2979    #[async_timed_test(timeout_secs = 30)]
2980    async fn test_instance() {
2981        #[derive(Debug, Default, Actor)]
2982        struct TestActor;
2983
2984        #[async_trait]
2985        impl Handler<(String, PortRef<String>)> for TestActor {
2986            async fn handle(
2987                &mut self,
2988                cx: &crate::Context<Self>,
2989                (message, port): (String, PortRef<String>),
2990            ) -> anyhow::Result<()> {
2991                port.send(cx, message)?;
2992                Ok(())
2993            }
2994        }
2995
2996        let proc = Proc::local();
2997
2998        let (instance, handle) = proc.instance("my_test_actor").unwrap();
2999
3000        let child_actor = TestActor::spawn(&instance, ()).await.unwrap();
3001
3002        let (port, mut receiver) = instance.open_port();
3003        child_actor
3004            .send(("hello".to_string(), port.bind()))
3005            .unwrap();
3006
3007        let message = receiver.recv().await.unwrap();
3008        assert_eq!(message, "hello");
3009
3010        child_actor.drain_and_stop().unwrap();
3011        child_actor.await;
3012
3013        assert_eq!(*handle.status().borrow(), ActorStatus::Client);
3014        drop(instance);
3015        assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
3016        handle.await;
3017    }
3018
3019    #[tokio::test]
3020    async fn test_proc_terminate_without_coordinator() {
3021        if std::env::var("CARGO_TEST").is_ok() {
3022            eprintln!("test skipped as it hangs when run by cargo in sandcastle");
3023            return;
3024        }
3025
3026        let process = async {
3027            let proc = Proc::local();
3028            // Intentionally not setting a proc supervison coordinator. This
3029            // should cause the process to terminate.
3030            // ProcSupervisionCoordinator::set(&proc).await.unwrap();
3031            let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
3032            let (client, _handle) = proc.instance("client").unwrap();
3033            root.fail(&client, anyhow::anyhow!("some random failure"))
3034                .await
3035                .unwrap();
3036            // It is okay to sleep a long time here, because we expect this
3037            // process to be terminated way before the sleep ends due to the
3038            // missing proc supervison coordinator.
3039            RealClock.sleep(Duration::from_secs(30)).await;
3040        };
3041
3042        assert_termination(|| process, 1).await.unwrap();
3043    }
3044
3045    fn trace_and_block(fut: impl Future) {
3046        tracing::subscriber::with_default(
3047            tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
3048            || {
3049                tokio::runtime::Builder::new_current_thread()
3050                    .enable_all()
3051                    .build()
3052                    .unwrap()
3053                    .block_on(fut)
3054            },
3055        );
3056    }
3057
3058    #[ignore = "until trace recording is turned back on"]
3059    #[test]
3060    fn test_handler_logging() {
3061        #[derive(Debug, Default, Actor)]
3062        struct LoggingActor;
3063
3064        impl LoggingActor {
3065            async fn wait(handle: &ActorHandle<Self>) {
3066                let barrier = Arc::new(Barrier::new(2));
3067                handle.send(barrier.clone()).unwrap();
3068                barrier.wait().await;
3069            }
3070        }
3071
3072        #[async_trait]
3073        impl Handler<String> for LoggingActor {
3074            async fn handle(
3075                &mut self,
3076                _cx: &crate::Context<Self>,
3077                message: String,
3078            ) -> anyhow::Result<()> {
3079                tracing::info!("{}", message);
3080                Ok(())
3081            }
3082        }
3083
3084        #[async_trait]
3085        impl Handler<u64> for LoggingActor {
3086            async fn handle(
3087                &mut self,
3088                _cx: &crate::Context<Self>,
3089                message: u64,
3090            ) -> anyhow::Result<()> {
3091                tracing::event!(Level::INFO, number = message);
3092                Ok(())
3093            }
3094        }
3095
3096        #[async_trait]
3097        impl Handler<Arc<Barrier>> for LoggingActor {
3098            async fn handle(
3099                &mut self,
3100                _cx: &crate::Context<Self>,
3101                message: Arc<Barrier>,
3102            ) -> anyhow::Result<()> {
3103                message.wait().await;
3104                Ok(())
3105            }
3106        }
3107
3108        #[async_trait]
3109        impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
3110            async fn handle(
3111                &mut self,
3112                _cx: &crate::Context<Self>,
3113                barriers: Arc<(Barrier, Barrier)>,
3114            ) -> anyhow::Result<()> {
3115                let inner = tracing::span!(Level::INFO, "child_span");
3116                let _inner_guard = inner.enter();
3117                barriers.0.wait().await;
3118                barriers.1.wait().await;
3119                Ok(())
3120            }
3121        }
3122
3123        trace_and_block(async {
3124            let handle = LoggingActor::spawn_detached(()).await.unwrap();
3125            handle.send("hello world".to_string()).unwrap();
3126            handle.send("hello world again".to_string()).unwrap();
3127            handle.send(123u64).unwrap();
3128
3129            LoggingActor::wait(&handle).await;
3130
3131            let events = handle.cell().inner.recording.tail();
3132            assert_eq!(events.len(), 3);
3133            assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
3134            assert_eq!(
3135                events[1].json_value(),
3136                json!({ "message": "hello world again" })
3137            );
3138            assert_eq!(events[2].json_value(), json!({ "number": 123 }));
3139
3140            let stacks = {
3141                let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
3142                handle.send(Arc::clone(&barriers)).unwrap();
3143                barriers.0.wait().await;
3144                let stacks = handle.cell().inner.recording.stacks();
3145                barriers.1.wait().await;
3146                stacks
3147            };
3148            assert_eq!(stacks.len(), 1);
3149            assert_eq!(stacks[0].len(), 1);
3150            assert_eq!(stacks[0][0].name(), "child_span");
3151        })
3152    }
3153}