hyperactor/
proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module provides [`Proc`], which is the runtime used within a single
10//! proc.
11
12// TODO: define a set of proc errors and plumb these throughout
13
14use std::any::Any;
15use std::any::TypeId;
16use std::collections::HashMap;
17use std::fmt;
18use std::future::Future;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::ops::Deref;
22use std::panic;
23use std::panic::AssertUnwindSafe;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::sync::Mutex;
27use std::sync::OnceLock;
28use std::sync::Weak;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicUsize;
31use std::sync::atomic::Ordering;
32use std::time::Duration;
33use std::time::SystemTime;
34
35use async_trait::async_trait;
36use dashmap::DashMap;
37use dashmap::mapref::entry::Entry;
38use dashmap::mapref::multiple::RefMulti;
39use futures::FutureExt;
40use hyperactor_telemetry::recorder;
41use hyperactor_telemetry::recorder::Recording;
42use serde::Deserialize;
43use serde::Serialize;
44use tokio::sync::mpsc;
45use tokio::sync::watch;
46use tokio::task::JoinHandle;
47use tracing::Instrument;
48
49use crate as hyperactor;
50use crate::Actor;
51use crate::ActorRef;
52use crate::Handler;
53use crate::Message;
54use crate::Named;
55use crate::RemoteMessage;
56use crate::accum::ReducerSpec;
57use crate::actor::ActorError;
58use crate::actor::ActorErrorKind;
59use crate::actor::ActorHandle;
60use crate::actor::ActorStatus;
61use crate::actor::Binds;
62use crate::actor::RemoteActor;
63use crate::actor::RemoteHandles;
64use crate::actor::Signal;
65use crate::attrs::Attrs;
66use crate::cap;
67use crate::clock::Clock;
68use crate::clock::ClockKind;
69use crate::clock::RealClock;
70use crate::data::Serialized;
71use crate::data::TypeInfo;
72use crate::mailbox::BoxedMailboxSender;
73use crate::mailbox::DeliveryError;
74use crate::mailbox::Mailbox;
75use crate::mailbox::MailboxMuxer;
76use crate::mailbox::MailboxSender;
77use crate::mailbox::MessageEnvelope;
78use crate::mailbox::OncePortHandle;
79use crate::mailbox::OncePortReceiver;
80use crate::mailbox::PanickingMailboxSender;
81use crate::mailbox::PortHandle;
82use crate::mailbox::PortReceiver;
83use crate::mailbox::Undeliverable;
84use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
85use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
86use crate::metrics::ACTOR_MESSAGES_RECEIVED;
87use crate::panic_handler;
88use crate::reference::ActorId;
89use crate::reference::Index;
90use crate::reference::PortId;
91use crate::reference::ProcId;
92use crate::reference::id;
93use crate::supervision::ActorSupervisionEvent;
94
95/// This is used to mint new local ranks for [`Proc::local`].
96static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
97
98/// A proc instance is the runtime managing a single proc in Hyperactor.
99/// It is responsible for spawning actors in the proc, multiplexing messages
100/// to/within actors in the proc, and providing fallback routing to external
101/// procs.
102///
103/// Procs are also responsible for maintaining the local supervision hierarchy.
104#[derive(Clone, Debug)]
105pub struct Proc {
106    inner: Arc<ProcState>,
107}
108
109#[derive(Debug)]
110struct ProcState {
111    /// The proc's id. This should be globally unique, but is not (yet)
112    /// for local-only procs.
113    proc_id: ProcId,
114
115    /// A muxer instance that has entries for every actor managed by
116    /// the proc.
117    proc_muxer: MailboxMuxer,
118
119    /// Sender used to forward messages outside of the proc.
120    forwarder: BoxedMailboxSender,
121
122    /// All of the roots (i.e., named actors with pid=0) in the proc.
123    /// These are also known as "global actors", since they may be
124    /// spawned remotely.
125    roots: DashMap<String, AtomicUsize>,
126
127    /// Keep track of all of the active actors in the proc.
128    ledger: ActorLedger,
129
130    instances: DashMap<ActorId, WeakInstanceCell>,
131
132    /// Used by root actors to send events to the actor coordinating
133    /// supervision of root actors in this proc.
134    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,
135
136    clock: ClockKind,
137}
138
139/// A snapshot view of the proc's actor ledger.
140#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
141pub struct ActorLedgerSnapshot {
142    /// All the actor trees in the proc, mapping the root id to the root
143    /// of each tree.
144    pub roots: HashMap<ActorId, ActorTreeSnapshot>,
145}
146
147/// A event for one row of log.
148#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
149pub struct Event {
150    /// Time when the event happend.
151    pub time: SystemTime,
152    /// The payload of the event.
153    pub fields: Vec<(String, recorder::Value)>,
154    /// The sequence number of the event.
155    pub seq: usize,
156}
157
158impl From<recorder::Event> for Event {
159    fn from(event: recorder::Event) -> Event {
160        Event {
161            time: event.time,
162            fields: event.fields(),
163            seq: event.seq,
164        }
165    }
166}
167
168/// A snapshot of an actor tree (rooted at a pid=0 actor).
169#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
170pub struct ActorTreeSnapshot {
171    /// The PID of this actor.
172    pub pid: Index,
173
174    /// The type name of the actor. If the actor is [`crate::Named`], then
175    /// this is the registered name; otherwise it is the actor type's
176    /// [`std::any::type_name`].
177    pub type_name: String,
178
179    /// The actor's current status.
180    pub status: ActorStatus,
181
182    /// Various operational stats for the actor.
183    pub stats: ActorStats,
184
185    /// This actor's handlers, mapping port numbers to the named type handled.
186    pub handlers: HashMap<u64, String>,
187
188    /// This actor's children.
189    pub children: HashMap<Index, ActorTreeSnapshot>,
190
191    /// Recent events emitted by the actor's logging.
192    pub events: Vec<Event>,
193
194    /// The current set of spans entered by the actor. These should be active
195    /// only while the actor is entered in a handler.
196    pub spans: Vec<Vec<String>>,
197}
198
199impl Hash for ActorTreeSnapshot {
200    fn hash<H: Hasher>(&self, state: &mut H) {
201        self.pid.hash(state);
202    }
203}
204
205/// Operational stats for an actor instance.
206#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
207#[derive(Default)]
208pub struct ActorStats {
209    /// The number of messages processed by the actor.
210    num_processed_messages: u64,
211}
212
213impl fmt::Display for ActorStats {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        write!(f, "num_processed_messages={}", self.num_processed_messages)
216    }
217}
218
219#[derive(Debug)]
220struct ActorLedger {
221    // Root actors. Map's value is its key's InstanceCell.
222    roots: DashMap<ActorId, WeakInstanceCell>,
223}
224
225impl ActorLedger {
226    fn new() -> Self {
227        Self {
228            roots: DashMap::new(),
229        }
230    }
231
232    fn insert(
233        &self,
234        root_actor_id: ActorId,
235        root_actor_cell: WeakInstanceCell,
236    ) -> Result<(), anyhow::Error> {
237        match self.roots.insert(root_actor_id.clone(), root_actor_cell) {
238            None => Ok(()),
239            // This should never happen because we do not recycle root actor's
240            // IDs.
241            Some(current_cell) => {
242                let debugging_msg = match current_cell.upgrade() {
243                    Some(cell) => format!("the stored cell's actor ID is {}", cell.actor_id()),
244                    None => "the stored cell has been dropped".to_string(),
245                };
246
247                Err(anyhow::anyhow!(
248                    "actor '{root_actor_id}' has already been added to ledger: {debugging_msg}"
249                ))
250            }
251        }
252    }
253
254    /// Get a snapshot view of this ledger.
255    fn snapshot(&self) -> ActorLedgerSnapshot {
256        let roots = self
257            .roots
258            .iter()
259            .flat_map(|r| {
260                let (actor_id, weak_cell) = r.pair();
261                // The actor might have been stopped or errored out. Since we do
262                // not remove inactive actors from ledger, the upgrade() call
263                // will return None in that scenario.
264                weak_cell
265                    .upgrade()
266                    .map(|cell| (actor_id.clone(), Self::get_actor_tree_snapshot(&cell)))
267            })
268            .collect();
269
270        ActorLedgerSnapshot { roots }
271    }
272
273    fn get_actor_tree_snapshot(cell: &InstanceCell) -> ActorTreeSnapshot {
274        // Get the edges between this actor and its children.
275        let children = cell
276            .child_iter()
277            .map(|child| (child.pid(), Self::get_actor_tree_snapshot(child.value())))
278            .collect();
279
280        ActorTreeSnapshot {
281            pid: cell.actor_id().pid(),
282            type_name: cell.inner.actor_type.type_name().to_string(),
283            status: cell.status().borrow().clone(),
284            stats: ActorStats {
285                num_processed_messages: cell.inner.num_processed_messages.load(Ordering::SeqCst),
286            },
287            handlers: cell
288                .inner
289                .exported_named_ports
290                .iter()
291                .map(|entry| (*entry.key(), entry.value().to_string()))
292                .collect(),
293            children,
294            events: cell
295                .inner
296                .recording
297                .tail()
298                .into_iter()
299                .map(Event::from)
300                .collect(),
301            spans: cell
302                .inner
303                .recording
304                .stacks()
305                .into_iter()
306                .map(|stack| {
307                    stack
308                        .into_iter()
309                        .map(|meta| meta.name().to_string())
310                        .collect()
311                })
312                .collect(),
313        }
314    }
315}
316
317impl Proc {
318    /// Create a new proc with the given proc id and forwarder.
319    pub fn new(proc_id: ProcId, forwarder: BoxedMailboxSender) -> Self {
320        Self::new_with_clock(proc_id, forwarder, ClockKind::default())
321    }
322
323    /// Create a new proc with the given proc id, forwarder and clock kind.
324    pub fn new_with_clock(
325        proc_id: ProcId,
326        forwarder: BoxedMailboxSender,
327        clock: ClockKind,
328    ) -> Self {
329        Self {
330            inner: Arc::new(ProcState {
331                proc_id,
332                proc_muxer: MailboxMuxer::new(),
333                forwarder,
334                roots: DashMap::new(),
335                ledger: ActorLedger::new(),
336                instances: DashMap::new(),
337                supervision_coordinator_port: OnceLock::new(),
338                clock,
339            }),
340        }
341    }
342
343    /// Set the supervision coordinator's port for this proc. Return Err if it is
344    /// already set.
345    pub fn set_supervision_coordinator(
346        &self,
347        port: PortHandle<ActorSupervisionEvent>,
348    ) -> Result<(), anyhow::Error> {
349        self.state()
350            .supervision_coordinator_port
351            .set(port)
352            .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
353    }
354
355    fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
356        let result = match self.state().supervision_coordinator_port.get() {
357            Some(port) => port.send(event).map_err(anyhow::Error::from),
358            None => Err(anyhow::anyhow!(
359                "coordinator port is not set for proc {}",
360                self.proc_id()
361            )),
362        };
363        if let Err(err) = result {
364            tracing::error!(
365                "proc {}: could not propagate supervision event: {:?}: crashing",
366                self.proc_id(),
367                err
368            );
369
370            std::process::exit(1);
371        }
372    }
373
374    /// Create a new local-only proc. This proc is not allowed to forward messages
375    /// outside of the proc itself.
376    pub fn local() -> Self {
377        // TODO: name these something that is ~ globally unique, e.g., incorporate
378        // the hostname, some GUID, etc.
379        let proc_id = ProcId::Ranked(id!(local), NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed));
380        // TODO: make it so that local procs can talk to each other.
381        Proc::new(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
382    }
383
384    /// The proc's ID.
385    pub fn proc_id(&self) -> &ProcId {
386        &self.state().proc_id
387    }
388
389    /// Shared sender used by the proc to forward messages to remote
390    /// destinations.
391    pub fn forwarder(&self) -> &BoxedMailboxSender {
392        &self.inner.forwarder
393    }
394
395    /// Convenience accessor for state.
396    fn state(&self) -> &ProcState {
397        self.inner.as_ref()
398    }
399
400    /// The proc's clock.
401    pub fn clock(&self) -> &ClockKind {
402        &self.state().clock
403    }
404
405    /// Get the snapshot of the ledger.
406    pub fn ledger_snapshot(&self) -> ActorLedgerSnapshot {
407        self.state().ledger.snapshot()
408    }
409
410    /// Attach a mailbox to the proc with the provided root name.
411    pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
412        let actor_id: ActorId = self.allocate_root_id(name)?;
413        Ok(self.bind_mailbox(actor_id))
414    }
415
416    /// Attach a mailbox to the proc as a child actor.
417    pub fn attach_child(&self, parent_id: &ActorId) -> Result<Mailbox, anyhow::Error> {
418        let actor_id: ActorId = self.allocate_child_id(parent_id)?;
419        Ok(self.bind_mailbox(actor_id))
420    }
421
422    /// Bind a mailbox to the proc.
423    fn bind_mailbox(&self, actor_id: ActorId) -> Mailbox {
424        let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
425
426        // TODO: T210748165 tie the muxer entry to the lifecycle of the mailbox held
427        // by the caller. This will likely require a weak reference.
428        self.state().proc_muxer.bind_mailbox(mbox.clone());
429        mbox
430    }
431
432    /// Attach a mailbox to the proc with the provided root name, and bind an [`ActorRef`].
433    /// This is intended only for testing, and will be replaced by simpled utilities.
434    pub fn attach_actor<R, M>(
435        &self,
436        name: &str,
437    ) -> Result<(Mailbox, ActorRef<R>, PortReceiver<M>), anyhow::Error>
438    where
439        M: RemoteMessage,
440        R: RemoteActor + RemoteHandles<M>,
441    {
442        let mbox = self.attach(name)?;
443        let (handle, rx) = mbox.open_port::<M>();
444        handle.bind_to(M::port());
445        let actor_ref = ActorRef::attest(mbox.actor_id().clone());
446        Ok((mbox, actor_ref, rx))
447    }
448
449    /// Spawn a named (root) actor on this proc. The name of the actor must be
450    /// unique.
451    pub async fn spawn<A: Actor>(
452        &self,
453        name: &str,
454        params: A::Params,
455    ) -> Result<ActorHandle<A>, anyhow::Error> {
456        let actor_id = self.allocate_root_id(name)?;
457        let _ = tracing::debug_span!(
458            "spawn_actor",
459            actor_name = name,
460            actor_type = std::any::type_name::<A>(),
461            actor_id = actor_id.to_string(),
462        );
463        let instance = Instance::new(self.clone(), actor_id.clone(), false, None);
464        let actor = A::new(params).await?;
465        // Add this actor to the proc's actor ledger. We do not actively remove
466        // inactive actors from ledger, because the actor's state can be inferred
467        // from its weak cell.
468        self.state()
469            .ledger
470            .insert(actor_id.clone(), instance.cell.downgrade())?;
471
472        instance.start(actor).await
473    }
474
475    /// Create and return an actor instance and its corresponding handle. This allows actors to be
476    /// "inverted": the caller can use the returned [`Instance`] to send and receive messages,
477    /// launch child actors, etc. The actor itself does not handle any messages, and supervision events
478    /// are always forwarded to the proc. Otherwise the instance acts as a normal actor, and can be
479    /// referenced and stopped.
480    pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
481        let actor_id = self.allocate_root_id(name)?;
482        let _ = tracing::debug_span!(
483            "actor_instance",
484            actor_name = name,
485            actor_type = std::any::type_name::<()>(),
486            actor_id = actor_id.to_string(),
487        );
488
489        let instance = Instance::new(self.clone(), actor_id.clone(), true, None);
490        let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone());
491
492        instance.change_status(ActorStatus::Client);
493
494        Ok((instance, handle))
495    }
496
497    /// Spawn a child actor from the provided parent on this proc. The parent actor
498    /// must already belong to this proc, a fact which is asserted in code.
499    ///
500    /// When spawn_child returns, the child has an associated cell and is linked
501    /// with its parent.
502    async fn spawn_child<A: Actor>(
503        &self,
504        parent: InstanceCell,
505        params: A::Params,
506    ) -> Result<ActorHandle<A>, anyhow::Error> {
507        let actor_id = self.allocate_child_id(parent.actor_id())?;
508        let instance = Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
509        let actor = A::new(params).await?;
510        instance.start(actor).await
511    }
512
513    /// Call `abort` on the `JoinHandle` associated with the given
514    /// root actor. If successful return `Some(root.clone())` else
515    /// `None`.
516    pub fn abort_root_actor(&self, root: &ActorId) -> Option<ActorId> {
517        self.state()
518            .ledger
519            .roots
520            .get(root)
521            .into_iter()
522            .flat_map(|e| e.upgrade())
523            .map(|cell| {
524                // `start` was called on the actor's instance
525                // immediately following `root`'s insertion into the
526                // ledger. Since `Instance::start()` is infallible we
527                // know the cell's task handle has been set so it's
528                // safe to `unwrap`.
529                let h = cell.actor_task_handle().unwrap();
530                tracing::debug!("{}: aborting {:?}", root, h);
531                h.abort();
532                root.clone()
533            })
534            .next()
535    }
536
537    /// Signals to a root actor to stop,
538    /// returning a status observer if successful.
539    pub fn stop_actor(&self, actor_id: &ActorId) -> Option<watch::Receiver<ActorStatus>> {
540        if let Some(entry) = self.state().ledger.roots.get(actor_id) {
541            match entry.value().upgrade() {
542                None => None, // the root's cell has been dropped
543                Some(cell) => {
544                    tracing::info!("sending stop signal to {}", cell.actor_id());
545                    if let Err(err) = cell.signal(Signal::DrainAndStop) {
546                        tracing::error!(
547                            "{}: failed to send stop signal to pid {}: {:?}",
548                            self.proc_id(),
549                            cell.pid(),
550                            err
551                        );
552                        None
553                    } else {
554                        Some(cell.status().clone())
555                    }
556                }
557            }
558        } else {
559            tracing::error!("no actor {} found in {} roots", actor_id, self.proc_id());
560            None
561        }
562    }
563
564    /// Stop the proc. Returns a pair of:
565    /// - the actors observed to stop;
566    /// - the actors not observed to stop when timeout.
567    ///
568    /// The "skip_waiting" actor, if it is Some, is always not observed to stop.
569    #[hyperactor::instrument]
570    pub async fn destroy_and_wait(
571        &mut self,
572        timeout: Duration,
573        skip_waiting: Option<&ActorId>,
574    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
575        tracing::debug!("{}: proc stopping", self.proc_id());
576
577        let mut statuses = HashMap::new();
578        for actor_id in self
579            .state()
580            .ledger
581            .roots
582            .iter()
583            .map(|entry| entry.key().clone())
584            .collect::<Vec<_>>()
585        {
586            if let Some(status) = self.stop_actor(&actor_id) {
587                statuses.insert(actor_id, status);
588            }
589        }
590        tracing::debug!("{}: proc stopped", self.proc_id());
591
592        let waits: Vec<_> = statuses
593            .iter_mut()
594            .filter(|(actor_id, _)| Some(*actor_id) != skip_waiting)
595            .map(|(actor_id, root)| {
596                let actor_id = actor_id.clone();
597                async move {
598                    RealClock
599                        .timeout(
600                            timeout,
601                            root.wait_for(|state: &ActorStatus| {
602                                matches!(*state, ActorStatus::Stopped)
603                            }),
604                        )
605                        .await
606                        .ok()
607                        .map(|_| actor_id)
608                }
609            })
610            .collect();
611
612        let results = futures::future::join_all(waits).await;
613        let stopped_actors: Vec<_> = results
614            .iter()
615            .filter_map(|actor_id| actor_id.as_ref())
616            .cloned()
617            .collect();
618        let aborted_actors: Vec<_> = statuses
619            .iter()
620            .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
621            .map(|(actor_id, _)| {
622                let _: Option<ActorId> = self.abort_root_actor(actor_id);
623                // If `is_none(&_)` then the proc's `ledger.roots`
624                // contains an entry that wasn't a root or, the
625                // associated actor's instance cell was already
626                // dropped when we went to call `abort()` on the
627                // cell's task handle.
628
629                actor_id.clone()
630            })
631            .collect();
632
633        tracing::info!(
634            "destroy_and_wait: {} actors stopped, {} actors aborted",
635            stopped_actors.len(),
636            aborted_actors.len()
637        );
638        Ok((stopped_actors, aborted_actors))
639    }
640
641    /// Create a root allocation in the proc.
642    #[hyperactor::instrument]
643    fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
644        let name = name.to_string();
645        match self.state().roots.entry(name.to_string()) {
646            Entry::Vacant(entry) => {
647                entry.insert(AtomicUsize::new(1));
648            }
649            Entry::Occupied(_) => {
650                anyhow::bail!("an actor with name '{}' has already been spawned", name)
651            }
652        }
653        Ok(ActorId(self.state().proc_id.clone(), name.to_string(), 0))
654    }
655
656    /// Create a child allocation in the proc.
657    pub(crate) fn allocate_child_id(&self, parent_id: &ActorId) -> Result<ActorId, anyhow::Error> {
658        assert_eq!(*parent_id.proc_id(), self.state().proc_id);
659        let pid = match self.state().roots.get(parent_id.name()) {
660            None => anyhow::bail!(
661                "no actor named {} in proc {}",
662                parent_id.name(),
663                self.state().proc_id
664            ),
665            Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
666        };
667        Ok(parent_id.child_id(pid))
668    }
669
670    fn downgrade(&self) -> WeakProc {
671        WeakProc::new(self)
672    }
673}
674
675#[async_trait]
676impl MailboxSender for Proc {
677    fn post(
678        &self,
679        envelope: MessageEnvelope,
680        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
681    ) {
682        if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
683            self.state().proc_muxer.post(envelope, return_handle)
684        } else {
685            self.state().forwarder.post(envelope, return_handle)
686        }
687    }
688}
689
690#[derive(Debug)]
691struct WeakProc(Weak<ProcState>);
692
693impl WeakProc {
694    fn new(proc: &Proc) -> Self {
695        Self(Arc::downgrade(&proc.inner))
696    }
697
698    fn upgrade(&self) -> Option<Proc> {
699        self.0.upgrade().map(|inner| Proc { inner })
700    }
701}
702
703#[async_trait]
704impl MailboxSender for WeakProc {
705    fn post(
706        &self,
707        envelope: MessageEnvelope,
708        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
709    ) {
710        match self.upgrade() {
711            Some(proc) => proc.post(envelope, return_handle),
712            None => envelope.undeliverable(
713                DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
714                return_handle,
715            ),
716        }
717    }
718}
719
720/// Represents a single work item used by the instance to dispatch to
721/// actor handles. Specifically, this enables handler polymorphism.
722struct WorkCell<A: Actor + Send>(
723    Box<
724        dyn for<'a> FnOnce(
725                &'a mut A,
726                &'a mut Instance<A>,
727            )
728                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
729            + Send
730            + Sync,
731    >,
732);
733
734impl<A: Actor + Send> WorkCell<A> {
735    /// Create a new WorkCell from a concrete function (closure).
736    fn new(
737        f: impl for<'a> FnOnce(
738            &'a mut A,
739            &'a mut Instance<A>,
740        )
741            -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
742        + Send
743        + Sync
744        + 'static,
745    ) -> Self {
746        Self(Box::new(f))
747    }
748
749    /// Handle the message represented by this work cell.
750    fn handle<'a>(
751        self,
752        actor: &'a mut A,
753        instance: &'a mut Instance<A>,
754    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
755        (self.0)(actor, instance)
756    }
757}
758
759/// Context for a message currently being handled by an Instance.
760pub struct Context<'a, A: Actor> {
761    instance: &'a Instance<A>,
762    headers: Attrs,
763}
764
765impl<'a, A: Actor> Context<'a, A> {
766    /// Construct a new Context.
767    pub fn new(instance: &'a Instance<A>, headers: Attrs) -> Self {
768        Self { instance, headers }
769    }
770
771    /// Get a reference to the message headers.
772    pub fn headers(&self) -> &Attrs {
773        &self.headers
774    }
775}
776
777impl<A: Actor> Deref for Context<'_, A> {
778    type Target = Instance<A>;
779
780    fn deref(&self) -> &Self::Target {
781        self.instance
782    }
783}
784
785/// An actor instance. This is responsible for managing a running actor, including
786/// its full lifecycle, supervision, signal management, etc. Instances can represent
787/// a managed actor or a "client" actor that has joined the proc.
788pub struct Instance<A: Actor> {
789    /// The proc that owns this instance.
790    proc: Proc,
791
792    /// The instance cell that manages instance hierarchy.
793    cell: InstanceCell,
794
795    /// The mailbox associated with the actor.
796    mailbox: Mailbox,
797
798    /// Receivers for the actor loop, if available.
799    actor_loop_receivers: Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
800
801    ports: Arc<Ports<A>>,
802
803    /// Handler work queue.
804    work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
805
806    /// A watch for communicating the actor's state.
807    status_tx: watch::Sender<ActorStatus>,
808
809    status_span: Mutex<tracing::Span>,
810
811    /// The timestamp of when the currently active status was set.
812    _last_status_change: Arc<tokio::time::Instant>,
813}
814
815impl<A: Actor> Instance<A> {
816    /// Create a new actor instance in Created state.
817    pub(crate) fn new(
818        proc: Proc,
819        actor_id: ActorId,
820        detached: bool,
821        parent: Option<InstanceCell>,
822    ) -> Self {
823        // Set up messaging
824        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
825        let (work_tx, work_rx) = mpsc::unbounded_channel();
826        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
827        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
828        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);
829
830        let actor_type = match TypeInfo::of::<A>() {
831            Some(info) => ActorType::Named(info),
832            None => ActorType::Anonymous(std::any::type_name::<A>()),
833        };
834        let ais = actor_id.to_string();
835
836        let actor_loop_ports = if detached {
837            None
838        } else {
839            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
840            let (supervision_port, supervision_receiver) = mailbox.open_port();
841            Some((
842                (signal_port, supervision_port),
843                (signal_receiver, supervision_receiver),
844            ))
845        };
846
847        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();
848
849        let cell = InstanceCell::new(
850            actor_id,
851            actor_type,
852            proc.clone(),
853            actor_loop,
854            status_rx,
855            parent,
856            ports.clone(),
857        );
858        let start = proc.clock().now();
859
860        Self {
861            proc,
862            cell,
863            mailbox,
864            actor_loop_receivers,
865            ports,
866            work_rx,
867            status_tx,
868            status_span: Mutex::new(tracing::debug_span!(
869                "actor_status",
870                actor_id = ais,
871                name = "created"
872            )),
873            _last_status_change: Arc::new(start),
874        }
875    }
876
877    /// Notify subscribers of a change in the actors status and bump counters with the duration which
878    /// the last status was active for.
879    fn change_status(&self, new: ActorStatus) {
880        // let old = self.status_tx.send_replace(new.clone());
881        self.status_tx.send_replace(new.clone());
882        let actor_id_str = self.self_id().to_string();
883        *self.status_span.lock().expect("can't change") = tracing::debug_span!(
884            "actor_status",
885            actor_id = actor_id_str,
886            name = new.arm().unwrap_or_default()
887        );
888    }
889
890    /// This instance's actor ID.
891    pub fn self_id(&self) -> &ActorId {
892        self.mailbox.actor_id()
893    }
894
895    /// Signal the actor to stop.
896    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
897    pub fn stop(&self) -> Result<(), ActorError> {
898        tracing::info!("Instance::stop called, {}", self.cell.actor_id());
899        self.cell.signal(Signal::DrainAndStop)
900    }
901
902    /// Open a new port that accepts M-typed messages. The returned
903    /// port may be freely cloned, serialized, and passed around. The
904    /// returned receiver should only be retained by the actor responsible
905    /// for processing the delivered messages.
906    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
907        self.mailbox.open_port()
908    }
909
910    /// Open a new one-shot port that accepts M-typed messages. The
911    /// returned port may be used to send a single message; ditto the
912    /// receiver may receive a single message.
913    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
914        self.mailbox.open_once_port()
915    }
916
917    /// Send a message to the actor running on the proc.
918    pub fn post(&self, port_id: PortId, headers: Attrs, message: Serialized) {
919        <Self as cap::sealed::CanSend>::post(self, port_id, headers, message)
920    }
921
922    /// Send a message to the actor itself with a delay usually to trigger some event.
923    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
924    pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
925    where
926        M: Message,
927        A: Handler<M>,
928    {
929        let port = self.port();
930        let self_id = self.self_id().clone();
931        let clock = self.proc.state().clock.clone();
932        tokio::spawn(async move {
933            clock.non_advancing_sleep(delay).await;
934            if let Err(e) = port.send(message) {
935                // TODO: this is a fire-n-forget thread. We need to
936                // handle errors in a better way.
937                tracing::info!("{}: error sending delayed message: {}", self_id, e);
938            }
939        });
940        Ok(())
941    }
942
943    /// Start an A-typed actor onto this instance with the provided params. When spawn returns,
944    /// the actor has been linked with its parent, if it has one.
945    #[hyperactor::instrument]
946    async fn start(self, actor: A) -> Result<ActorHandle<A>, anyhow::Error> {
947        let instance_cell = self.cell.clone();
948        let actor_id = self.cell.actor_id().clone();
949        let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone());
950        let actor_task_handle =
951            A::spawn_server_task(panic_handler::with_backtrace_tracking(self.serve(actor)));
952        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
953        instance_cell
954            .inner
955            .actor_task_handle
956            .set(actor_task_handle)
957            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
958
959        Ok(actor_handle)
960    }
961
962    async fn serve(mut self, mut actor: A) {
963        let actor_loop_receivers = self.actor_loop_receivers.take().unwrap();
964
965        let result = self.run_actor_tree(&mut actor, actor_loop_receivers).await;
966
967        let (actor_status, event) = match result {
968            Ok(_) => (ActorStatus::Stopped, None),
969            Err(ActorError {
970                kind: ActorErrorKind::UnhandledSupervisionEvent(event),
971                ..
972            }) => (event.actor_status.clone(), Some(event)),
973            Err(err) => (
974                ActorStatus::Failed(err.to_string()),
975                Some(ActorSupervisionEvent {
976                    actor_id: self.cell.actor_id().clone(),
977                    actor_status: ActorStatus::Failed(err.to_string()),
978                    message_headers: None,
979                    caused_by: None,
980                }),
981            ),
982        };
983
984        if let Some(parent) = self.cell.maybe_unlink_parent() {
985            if let Some(event) = event {
986                // Parent exists, failure should be propagated to the parent.
987                parent.send_supervision_event_or_crash(event);
988            }
989            // TODO: we should get rid of this signal, and use *only* supervision events for
990            // the purpose of conveying lifecycle changes
991            if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) {
992                tracing::error!(
993                    "{}: failed to send stop message to parent pid {}: {:?}",
994                    self.self_id(),
995                    parent.pid(),
996                    err
997                );
998            }
999        } else {
1000            // Failure happened to the root actor or orphaned child actors.
1001            // In either case, the failure should be propagated to proc.
1002            //
1003            // Note that orphaned actor is unexpected and would only happen if
1004            // there is a bug.
1005            if let Some(event) = event {
1006                self.proc.handle_supervision_event(event);
1007            }
1008        }
1009        self.change_status(actor_status);
1010    }
1011
1012    /// Runs the actor, and manages its supervision tree. When the function returns,
1013    /// the whole tree rooted at this actor has stopped.
1014    async fn run_actor_tree(
1015        &mut self,
1016        actor: &mut A,
1017        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1018    ) -> Result<(), ActorError> {
1019        // It is okay to catch all panics here, because we are in a tokio task,
1020        // and tokio will catch the panic anyway:
1021        // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic
1022        // What we do here is just to catch it early so we can handle it.
1023
1024        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers))
1025            .catch_unwind()
1026            .await
1027        {
1028            Ok(result) => result,
1029            Err(err) => {
1030                // This is only the error message. Backtrace is not included.
1031                let err_msg = err
1032                    .downcast_ref::<&str>()
1033                    .copied()
1034                    .or_else(|| err.downcast_ref::<String>().map(|s| s.as_str()))
1035                    .unwrap_or("panic cannot be downcasted");
1036
1037                let backtrace = panic_handler::take_panic_backtrace()
1038                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
1039                Err(ActorError::new(
1040                    self.self_id().clone(),
1041                    ActorErrorKind::Panic(anyhow::anyhow!("{}\n{}", err_msg, backtrace)),
1042                ))
1043            }
1044        };
1045
1046        if let Err(ref err) = result {
1047            tracing::error!("{}: actor failure: {}", self.self_id(), err);
1048        }
1049        self.change_status(ActorStatus::Stopping);
1050
1051        // After this point, we know we won't spawn any more children,
1052        // so we can safely read the current child keys.
1053        let mut to_unlink = Vec::new();
1054        for child in self.cell.child_iter() {
1055            if let Err(err) = child.value().signal(Signal::Stop) {
1056                tracing::error!(
1057                    "{}: failed to send stop signal to child pid {}: {:?}",
1058                    self.self_id(),
1059                    child.key(),
1060                    err
1061                );
1062                to_unlink.push(child.value().clone());
1063            }
1064        }
1065        // Manually unlink children that have already been stopped.
1066        for child in to_unlink {
1067            self.cell.unlink(&child);
1068        }
1069
1070        let (mut signal_receiver, _) = actor_loop_receivers;
1071        while self.cell.child_count() > 0 {
1072            match signal_receiver.recv().await? {
1073                Signal::ChildStopped(pid) => {
1074                    assert!(self.cell.get_child(pid).is_none());
1075                }
1076                _ => (),
1077            }
1078        }
1079
1080        result
1081    }
1082
1083    /// Initialize and run the actor until it fails or is stopped.
1084    async fn run(
1085        &mut self,
1086        actor: &mut A,
1087        actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1088    ) -> Result<(), ActorError> {
1089        tracing::debug!("entering actor loop: {}", self.self_id());
1090
1091        let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1092
1093        self.change_status(ActorStatus::Initializing);
1094        actor
1095            .init(self)
1096            .await
1097            .map_err(|err| ActorError::new(self.self_id().clone(), ActorErrorKind::Init(err)))?;
1098        let need_drain;
1099        'messages: loop {
1100            self.change_status(ActorStatus::Idle);
1101            let metric_pairs =
1102                hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1103            tokio::select! {
1104                work = self.work_rx.recv() => {
1105                    ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1106                    ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1107                    let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1108                    let work = work.expect("inconsistent work queue state");
1109                    if let Err(err) = work.handle(actor, self).await {
1110                        for supervision_event in supervision_event_receiver.drain() {
1111                            self.handle_supervision_event(actor, supervision_event).await?;
1112                        }
1113                        return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err)));
1114                    }
1115                }
1116                signal = signal_receiver.recv() => {
1117                    let signal = signal.map_err(ActorError::from);
1118                    tracing::debug!("Received signal {signal:?}");
1119                    match signal? {
1120                        signal@(Signal::Stop | Signal::DrainAndStop) => {
1121                            need_drain = matches!(signal, Signal::DrainAndStop);
1122                            break 'messages;
1123                        },
1124                        Signal::ChildStopped(pid) => {
1125                            assert!(self.cell.get_child(pid).is_none());
1126                        },
1127                    }
1128                }
1129                Ok(supervision_event) = supervision_event_receiver.recv() => {
1130                    self.handle_supervision_event(actor, supervision_event).await?;
1131                }
1132            }
1133            self.cell
1134                .inner
1135                .num_processed_messages
1136                .fetch_add(1, Ordering::SeqCst);
1137        }
1138
1139        if need_drain {
1140            self.change_status(ActorStatus::Stopping);
1141            let mut n = 0;
1142            while let Ok(work) = self.work_rx.try_recv() {
1143                if let Err(err) = work.handle(actor, self).await {
1144                    return Err(ActorError::new(
1145                        self.self_id().clone(),
1146                        ActorErrorKind::Processing(err),
1147                    ));
1148                }
1149                n += 1;
1150            }
1151            tracing::debug!("drained {} messages", n);
1152        }
1153        tracing::debug!("exited actor loop: {}", self.self_id());
1154        self.change_status(ActorStatus::Stopped);
1155        Ok(())
1156    }
1157
1158    async fn handle_supervision_event(
1159        &self,
1160        actor: &mut A,
1161        supervision_event: ActorSupervisionEvent,
1162    ) -> Result<(), ActorError> {
1163        // Handle the supervision event with the current actor.
1164        match actor
1165            .handle_supervision_event(self, &supervision_event)
1166            .await
1167        {
1168            Ok(true) => {
1169                // The supervision event was handled by this actor, nothing more to do.
1170                Ok(())
1171            }
1172            Ok(false) => {
1173                // The supervision event wasn't handled by this actor, chain it and bubble it up.
1174                let supervision_event = ActorSupervisionEvent {
1175                    actor_id: self.self_id().clone(),
1176                    actor_status: ActorStatus::Failed(
1177                        "did not handle supervision event".to_string(),
1178                    ),
1179                    message_headers: None,
1180                    caused_by: Some(Box::new(supervision_event)),
1181                };
1182                Err(supervision_event.into())
1183            }
1184            Err(err) => {
1185                // The actor failed to handle the supervision event, it should die.
1186                // Create a new supervision event for this failure and propagate it.
1187                let supervision_event = ActorSupervisionEvent {
1188                    actor_id: self.self_id().clone(),
1189                    actor_status: ActorStatus::Failed(format!(
1190                        "failed to handle supervision event: {}",
1191                        err
1192                    )),
1193                    message_headers: None,
1194                    caused_by: Some(Box::new(supervision_event)),
1195                };
1196                Err(supervision_event.into())
1197            }
1198        }
1199    }
1200
1201    async unsafe fn handle_message<M: Message>(
1202        &mut self,
1203        actor: &mut A,
1204        type_info: Option<&'static TypeInfo>,
1205        headers: Attrs,
1206        message: M,
1207    ) -> Result<(), anyhow::Error>
1208    where
1209        A: Handler<M>,
1210    {
1211        let handler = type_info.map(|info| {
1212            (
1213                info.typename().to_string(),
1214                // SAFETY: The caller promises to pass the correct type info.
1215                unsafe {
1216                    info.arm_unchecked(&message as *const M as *const ())
1217                        .map(str::to_string)
1218                },
1219            )
1220        });
1221
1222        let _ = self.change_status(ActorStatus::Processing(
1223            self.clock().system_time_now(),
1224            handler,
1225        ));
1226        let span = self.status_span.lock().unwrap().clone();
1227
1228        let context = Context::new(self, headers);
1229        // Pass a reference to the context to the handler, so that deref
1230        // coercion allows the `this` argument to be treated exactly like
1231        // &Instance<A>.
1232        actor.handle(&context, message).instrument(span).await
1233    }
1234
1235    /// Return a handle port handle representing the actor's message
1236    /// handler for M-typed messages.
1237    pub fn port<M: Message>(&self) -> PortHandle<M>
1238    where
1239        A: Handler<M>,
1240    {
1241        self.ports.get()
1242    }
1243
1244    /// The [`ActorHandle`] corresponding to this instance.
1245    pub fn handle(&self) -> ActorHandle<A> {
1246        ActorHandle::new(self.cell.clone(), Arc::clone(&self.ports))
1247    }
1248
1249    /// The owning actor ref.
1250    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
1251        self.cell.bind(self.ports.as_ref())
1252    }
1253
1254    // Temporary in order to support python bindings.
1255    #[doc(hidden)]
1256    pub fn mailbox_for_py(&self) -> &Mailbox {
1257        &self.mailbox
1258    }
1259
1260    /// A reference to the proc's clock
1261    pub fn clock(&self) -> &(impl Clock + use<A>) {
1262        &self.proc.state().clock
1263    }
1264
1265    /// The owning proc.
1266    pub fn proc(&self) -> &Proc {
1267        &self.proc
1268    }
1269}
1270
1271impl<A: Actor> Drop for Instance<A> {
1272    fn drop(&mut self) {
1273        self.status_tx.send_if_modified(|status| {
1274            if status.is_terminal() {
1275                false
1276            } else {
1277                *status = ActorStatus::Stopped;
1278                true
1279            }
1280        });
1281    }
1282}
1283
1284impl<A: Actor> cap::sealed::CanSend for Instance<A> {
1285    fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1286        let envelope = MessageEnvelope::new(self.self_id().clone(), dest, data, headers);
1287        self.proc.post(envelope, self.ports.get());
1288    }
1289    fn actor_id(&self) -> &ActorId {
1290        self.self_id()
1291    }
1292}
1293
1294impl<A: Actor> cap::sealed::CanSend for &Instance<A> {
1295    fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1296        (*self).post(dest, headers, data)
1297    }
1298    fn actor_id(&self) -> &ActorId {
1299        self.self_id()
1300    }
1301}
1302
1303impl<A: Actor> cap::sealed::CanOpenPort for Instance<A> {
1304    fn mailbox(&self) -> &Mailbox {
1305        &self.mailbox
1306    }
1307}
1308
1309impl<A: Actor> cap::sealed::CanOpenPort for &Instance<A> {
1310    fn mailbox(&self) -> &Mailbox {
1311        &self.mailbox
1312    }
1313}
1314
1315impl<A: Actor> cap::sealed::CanSplitPort for Instance<A> {
1316    fn split(&self, port_id: PortId, reducer_spec: Option<ReducerSpec>) -> anyhow::Result<PortId> {
1317        self.mailbox.split(port_id, reducer_spec)
1318    }
1319}
1320
1321#[async_trait]
1322impl<A: Actor> cap::sealed::CanSpawn for Instance<A> {
1323    async fn spawn<C: Actor>(&self, params: C::Params) -> anyhow::Result<ActorHandle<C>> {
1324        self.proc.spawn_child(self.cell.clone(), params).await
1325    }
1326}
1327
1328impl<A: Actor> cap::sealed::CanResolveActorRef for Instance<A> {
1329    fn resolve_actor_ref<R: RemoteActor + Actor>(
1330        &self,
1331        actor_ref: &ActorRef<R>,
1332    ) -> Option<ActorHandle<R>> {
1333        self.proc
1334            .inner
1335            .instances
1336            .get(actor_ref.actor_id())?
1337            .upgrade()?
1338            .downcast_handle()
1339    }
1340}
1341
1342impl<A: Actor> cap::sealed::CanSend for Context<'_, A> {
1343    fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1344        <Instance<A> as cap::sealed::CanSend>::post(self, dest, headers, data)
1345    }
1346    fn actor_id(&self) -> &ActorId {
1347        self.self_id()
1348    }
1349}
1350
1351impl<A: Actor> cap::sealed::CanSend for &Context<'_, A> {
1352    fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
1353        <Instance<A> as cap::sealed::CanSend>::post(self, dest, headers, data)
1354    }
1355    fn actor_id(&self) -> &ActorId {
1356        self.self_id()
1357    }
1358}
1359
1360impl<A: Actor> cap::sealed::CanOpenPort for Context<'_, A> {
1361    fn mailbox(&self) -> &Mailbox {
1362        <Instance<A> as cap::sealed::CanOpenPort>::mailbox(self)
1363    }
1364}
1365
1366impl<A: Actor> cap::sealed::CanOpenPort for &Context<'_, A> {
1367    fn mailbox(&self) -> &Mailbox {
1368        <Instance<A> as cap::sealed::CanOpenPort>::mailbox(self)
1369    }
1370}
1371
1372impl<A: Actor> cap::sealed::CanSplitPort for Context<'_, A> {
1373    fn split(&self, port_id: PortId, reducer_spec: Option<ReducerSpec>) -> anyhow::Result<PortId> {
1374        <Instance<A> as cap::sealed::CanSplitPort>::split(self, port_id, reducer_spec)
1375    }
1376}
1377
1378#[async_trait]
1379impl<A: Actor> cap::sealed::CanSpawn for Context<'_, A> {
1380    async fn spawn<C: Actor>(&self, params: C::Params) -> anyhow::Result<ActorHandle<C>> {
1381        <Instance<A> as cap::sealed::CanSpawn>::spawn(self, params).await
1382    }
1383}
1384
1385impl<A: Actor> cap::sealed::CanResolveActorRef for Context<'_, A> {
1386    fn resolve_actor_ref<R: RemoteActor + Actor>(
1387        &self,
1388        actor_ref: &ActorRef<R>,
1389    ) -> Option<ActorHandle<R>> {
1390        <Instance<A> as cap::sealed::CanResolveActorRef>::resolve_actor_ref(self, actor_ref)
1391    }
1392}
1393
1394#[derive(Debug)]
1395enum ActorType {
1396    Named(&'static TypeInfo),
1397    Anonymous(&'static str),
1398}
1399
1400impl ActorType {
1401    /// The actor's type name.
1402    fn type_name(&self) -> &'static str {
1403        match self {
1404            Self::Named(info) => info.typename(),
1405            Self::Anonymous(name) => name,
1406        }
1407    }
1408}
1409
1410/// InstanceCell contains all of the type-erased, shareable state of an instance.
1411/// Specifically, InstanceCells form a supervision tree, and is used by ActorHandle
1412/// to access the underlying instance.
1413///
1414/// InstanceCell is reference counted and cloneable.
1415#[derive(Clone, Debug)]
1416pub struct InstanceCell {
1417    inner: Arc<InstanceState>,
1418}
1419
1420#[derive(Debug)]
1421struct InstanceState {
1422    /// The actor's id.
1423    actor_id: ActorId,
1424
1425    /// Actor info contains the actor's type information.
1426    actor_type: ActorType,
1427
1428    /// The proc in which the actor is running.
1429    proc: Proc,
1430
1431    /// Control port handles to the actor loop, if one is running.
1432    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1433
1434    /// An observer that stores the current status of the actor.
1435    status: watch::Receiver<ActorStatus>,
1436
1437    /// A weak reference to this instance's parent.
1438    parent: WeakInstanceCell,
1439
1440    /// This instance's children by their PIDs.
1441    children: DashMap<Index, InstanceCell>,
1442
1443    /// Access to the spawned actor's join handle.
1444    actor_task_handle: OnceLock<JoinHandle<()>>,
1445
1446    /// The set of named ports that are exported by this actor.
1447    exported_named_ports: DashMap<u64, &'static str>,
1448
1449    /// The number of messages processed by this actor.
1450    num_processed_messages: AtomicU64,
1451
1452    /// The log recording associated with this actor. It is used to
1453    /// store a 'flight record' of events while the actor is running.
1454    recording: Recording,
1455
1456    /// A type-erased reference to Ports<A>, which allows us to recover
1457    /// an ActorHandle<A> by downcasting.
1458    ports: Arc<dyn Any + Send + Sync>,
1459}
1460
1461impl InstanceState {
1462    /// Unlink this instance from its parent, if it has one. If it was unlinked,
1463    /// the parent is returned.
1464    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1465        self.parent
1466            .upgrade()
1467            .filter(|parent| parent.inner.unlink(self))
1468    }
1469
1470    /// Unlink this instance from a child.
1471    fn unlink(&self, child: &InstanceState) -> bool {
1472        assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
1473        self.children.remove(&child.actor_id.pid()).is_some()
1474    }
1475}
1476
1477impl InstanceCell {
1478    /// Creates a new instance cell with the provided internal state. If a parent
1479    /// is provided, it is linked to this cell.
1480    fn new(
1481        actor_id: ActorId,
1482        actor_type: ActorType,
1483        proc: Proc,
1484        actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1485        status: watch::Receiver<ActorStatus>,
1486        parent: Option<InstanceCell>,
1487        ports: Arc<dyn Any + Send + Sync>,
1488    ) -> Self {
1489        let _ais = actor_id.to_string();
1490        let cell = Self {
1491            inner: Arc::new(InstanceState {
1492                actor_id: actor_id.clone(),
1493                actor_type,
1494                proc: proc.clone(),
1495                actor_loop,
1496                status,
1497                parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
1498                children: DashMap::new(),
1499                actor_task_handle: OnceLock::new(),
1500                exported_named_ports: DashMap::new(),
1501                num_processed_messages: AtomicU64::new(0),
1502                recording: hyperactor_telemetry::recorder().record(64),
1503                ports,
1504            }),
1505        };
1506        cell.maybe_link_parent();
1507        proc.inner
1508            .instances
1509            .insert(actor_id.clone(), cell.downgrade());
1510        cell
1511    }
1512
1513    fn wrap(inner: Arc<InstanceState>) -> Self {
1514        Self { inner }
1515    }
1516
1517    /// The actor's ID.
1518    pub(crate) fn actor_id(&self) -> &ActorId {
1519        &self.inner.actor_id
1520    }
1521
1522    /// The actor's PID.
1523    pub(crate) fn pid(&self) -> Index {
1524        self.inner.actor_id.pid()
1525    }
1526
1527    /// The actor's join handle.
1528    pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1529        self.inner.actor_task_handle.get()
1530    }
1531
1532    /// The instance's status observer.
1533    pub(crate) fn status(&self) -> &watch::Receiver<ActorStatus> {
1534        &self.inner.status
1535    }
1536
1537    /// Send a signal to the actor.
1538    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
1539    pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
1540        if let Some((signal_port, _)) = &self.inner.actor_loop {
1541            signal_port.send(signal).map_err(ActorError::from)
1542        } else {
1543            tracing::warn!(
1544                "{}: attempted to send signal {} to detached actor",
1545                self.inner.actor_id,
1546                signal
1547            );
1548            Ok(())
1549        }
1550    }
1551
    /// Used by this actor's children to send a supervision event to this actor.
    /// If the event cannot be delivered, we crash the process; as part of the
    /// crash, all of the procs and actors running in this process are
    /// terminated forcefully.
1556    ///
1557    /// Note that "let it crash" is the default behavior when a supervision event
1558    /// cannot be delivered upstream. It is the upstream's responsibility to
1559    /// detect and handle crashes.
1560    pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1561        match &self.inner.actor_loop {
1562            Some((_, supervision_port)) => {
1563                if let Err(err) = supervision_port.send(event) {
1564                    tracing::error!(
1565                        "{}: failed to send supervision event to actor: {:?}. Crash the process.",
1566                        self.actor_id(),
1567                        err
1568                    );
1569                    std::process::exit(1);
1570                }
1571            }
1572            None => {
1573                tracing::error!(
1574                    "{}: failed: {}: cannot send supervision event to detached actor: crashing",
1575                    self.actor_id(),
1576                    event,
1577                );
1578                std::process::exit(1);
1579            }
1580        }
1581    }
1582
1583    /// Downgrade this InstanceCell to a weak reference.
1584    pub fn downgrade(&self) -> WeakInstanceCell {
1585        WeakInstanceCell {
1586            inner: Arc::downgrade(&self.inner),
1587        }
1588    }
1589
1590    /// Link this instance to a new child.
1591    fn link(&self, child: InstanceCell) {
1592        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1593        self.inner.children.insert(child.pid(), child);
1594    }
1595
1596    /// Unlink this instance from a child.
1597    fn unlink(&self, child: &InstanceCell) {
1598        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1599        self.inner.children.remove(&child.pid());
1600    }
1601
1602    /// Link this instance to its parent, if it has one.
1603    fn maybe_link_parent(&self) {
1604        if let Some(parent) = self.inner.parent.upgrade() {
1605            parent.link(self.clone());
1606        }
1607    }
1608
1609    /// Unlink this instance from its parent, if it has one. If it was unlinked,
1610    /// the parent is returned.
1611    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1612        self.inner.maybe_unlink_parent()
1613    }
1614
1615    /// Get parent instance cell, if it exists.
1616    fn get_parent_cell(&self) -> Option<InstanceCell> {
1617        self.inner.parent.upgrade()
1618    }
1619
    /// Return an iterator over this instance's children. This may deadlock if the
    /// caller already holds a reference to any item in the map.
1622    fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, Index, InstanceCell>> {
1623        self.inner.children.iter()
1624    }
1625
1626    /// The number of children this instance has.
1627    fn child_count(&self) -> usize {
1628        self.inner.children.len()
1629    }
1630
1631    /// Get a child by its PID.
1632    fn get_child(&self, pid: Index) -> Option<InstanceCell> {
1633        self.inner.children.get(&pid).map(|child| child.clone())
1634    }
1635
1636    /// This is temporary so that we can share binding code between handle and instance.
1637    /// We should find some (better) way to consolidate the two.
1638    pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> ActorRef<R> {
1639        <R as Binds<A>>::bind(ports);
1640        // All actors handle signals and undeliverable messages.
1641        ports.bind::<Signal>();
1642        ports.bind::<Undeliverable<MessageEnvelope>>();
1643        // TODO: consider sharing `ports.bound` directly.
1644        for entry in ports.bound.iter() {
1645            self.inner
1646                .exported_named_ports
1647                .insert(*entry.key(), entry.value());
1648        }
1649        ActorRef::attest(self.actor_id().clone())
1650    }
1651
1652    /// Attempt to downcast this cell to a concrete actor handle.
1653    pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
1654        let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
1655        Some(ActorHandle::new(self.clone(), ports))
1656    }
1657}
1658
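// Dropping the last reference to an instance unlinks it from its parent (if it
// is still linked) and removes it from its proc's instance map.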
1659impl Drop for InstanceState {
1660    fn drop(&mut self) {
1661        if let Some(parent) = self.maybe_unlink_parent() {
1662            tracing::debug!(
1663                "instance {} was dropped with parent {} still linked",
1664                self.actor_id,
1665                parent.actor_id()
1666            );
1667        }
1668        if self.proc.inner.instances.remove(&self.actor_id).is_none() {
1669            tracing::error!("instance {} was dropped but not in proc", self.actor_id);
1670        }
1671    }
1672}
1673
/// A weak version of the InstanceCell. This is used to link actors (for
/// example, a child to its parent) without creating strong reference cycles.
1676#[derive(Debug, Clone)]
1677pub struct WeakInstanceCell {
1678    inner: Weak<InstanceState>,
1679}
1680
1681impl WeakInstanceCell {
1682    /// Create a new weak instance cell that is never upgradeable.
1683    pub fn new() -> Self {
1684        Self { inner: Weak::new() }
1685    }
1686
1687    /// Upgrade this weak instance cell to a strong reference, if possible.
1688    pub fn upgrade(&self) -> Option<InstanceCell> {
1689        self.inner.upgrade().map(InstanceCell::wrap)
1690    }
1691}
1692
1693/// A polymorphic dictionary that stores ports for an actor's handlers.
1694/// The interface memoizes the ports so that they are reused. We do not
1695/// (yet) support stable identifiers across multiple instances of the same
1696/// actor.
1697pub struct Ports<A: Actor> {
    /// Memoized port handles, keyed by the TypeId of the message type.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    /// Named port bindings exported by the actor, mapping port index to the
    /// bound message's type name.
    bound: DashMap<u64, &'static str>,
    /// The actor's mailbox, used to open the underlying ports.
    mailbox: Mailbox,
    /// Sender half of the actor's work queue, onto which handler invocations
    /// are enqueued.
    workq: mpsc::UnboundedSender<WorkCell<A>>,
1702}
1703
1704impl<A: Actor> Ports<A> {
1705    fn new(mailbox: Mailbox, workq: mpsc::UnboundedSender<WorkCell<A>>) -> Self {
1706        Self {
1707            ports: DashMap::new(),
1708            bound: DashMap::new(),
1709            mailbox,
1710            workq,
1711        }
1712    }
1713
1714    /// Get a port for the Handler<M> of actor A.
1715    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
1716    where
1717        A: Handler<M>,
1718    {
1719        let key = TypeId::of::<M>();
1720        match self.ports.entry(key) {
1721            Entry::Vacant(entry) => {
1722                // Some special case hackery, but it keeps the rest of the code (relatively) simple.
1723                assert_ne!(
1724                    key,
1725                    TypeId::of::<Signal>(),
1726                    "cannot provision Signal port through `Ports::get`"
1727                );
1728
1729                let type_info = TypeInfo::get_by_typeid(key);
1730                let workq = self.workq.clone();
1731                let actor_id = self.mailbox.actor_id().to_string();
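                // Open an enqueue port that wraps each delivered message in a
                // WorkCell and pushes it onto the actor's work queue; the actor's
                // message loop later runs the cell, dispatching the message to the
                // actor's Handler<M> implementation via `handle_message`.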
1732                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
1733                    let work = WorkCell::new(move |actor: &mut A, instance: &mut Instance<A>| {
1734                        Box::pin(async move {
1735                            // SAFETY: we guarantee that the passed type_info is for type M.
1736                            unsafe {
1737                                instance
1738                                    .handle_message(actor, type_info, headers, msg)
1739                                    .await
1740                            }
1741                        })
1742                    });
1743                    ACTOR_MESSAGE_QUEUE_SIZE.add(
1744                        1,
1745                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
1746                    );
1747                    workq.send(work).map_err(anyhow::Error::from)
1748                });
1749                entry.insert(Box::new(port.clone()));
1750                port
1751            }
1752            Entry::Occupied(entry) => {
1753                let port = entry.get();
1754                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
1755            }
1756        }
1757    }
1758
    /// Open a (typed) message port as in [`get`], but return a port receiver
    /// instead of dispatching messages to the actor's handler.
1761    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
1762        match self.ports.entry(TypeId::of::<M>()) {
1763            Entry::Vacant(entry) => {
1764                let (port, receiver) = self.mailbox.open_port();
1765                entry.insert(Box::new(port.clone()));
1766                Some((port, receiver))
1767            }
1768            Entry::Occupied(_) => None,
1769        }
1770    }
1771
1772    /// Bind the given message type to its default port.
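    ///
    /// A minimal sketch of typical use from a [`Binds`] implementation
    /// (`MyActor` and `MyMessage` here are hypothetical):
    ///
    /// ```ignore
    /// impl Binds<MyActor> for MyActor {
    ///     fn bind(ports: &Ports<MyActor>) {
    ///         // Export the handler for MyMessage on its default port.
    ///         ports.bind::<MyMessage>();
    ///     }
    /// }
    /// ```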
1773    pub fn bind<M: RemoteMessage>(&self)
1774    where
1775        A: Handler<M>,
1776    {
1777        self.bind_to::<M>(M::port());
1778    }
1779
1780    /// Bind the given message type to the provided port.
    /// Ports cannot be rebound to different message types;
    /// attempting to do so results in a panic.
1783    pub fn bind_to<M: RemoteMessage>(&self, port_index: u64)
1784    where
1785        A: Handler<M>,
1786    {
1787        match self.bound.entry(port_index) {
1788            Entry::Vacant(entry) => {
1789                self.get::<M>().bind_to(port_index);
1790                entry.insert(M::typename());
1791            }
1792            Entry::Occupied(entry) => {
1793                assert_eq!(
1794                    *entry.get(),
1795                    M::typename(),
1796                    "bind {}: port index {} already bound to type {}",
1797                    M::typename(),
1798                    port_index,
1799                    entry.get(),
1800                );
1801            }
1802        }
1803    }
1804}
1805
1806#[cfg(test)]
1807mod tests {
1808    use std::assert_matches::assert_matches;
1809    use std::sync::atomic::AtomicBool;
1810
1811    use hyperactor_macros::export;
1812    use maplit::hashmap;
1813    use serde_json::json;
1814    use tokio::sync::Barrier;
1815    use tokio::sync::oneshot;
1816    use tracing::Level;
1817    use tracing_subscriber::layer::SubscriberExt;
1818    use tracing_test::internal::logs_with_scope_contain;
1819
1820    use super::*;
1821    // needed for in-crate macro expansion
1822    use crate as hyperactor;
1823    use crate::HandleClient;
1824    use crate::Handler;
1825    use crate::OncePortRef;
1826    use crate::PortRef;
1827    use crate::clock::RealClock;
1828    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
1829    use crate::test_utils::process_assertion::assert_termination;
1830
1831    impl ActorTreeSnapshot {
1832        #[allow(dead_code)]
1833        fn empty(pid: Index) -> Self {
1834            Self {
1835                pid,
1836                type_name: String::new(),
1837                status: ActorStatus::Idle,
1838                stats: ActorStats::default(),
1839                handlers: HashMap::new(),
1840                children: HashMap::new(),
1841                events: Vec::new(),
1842                spans: Vec::new(),
1843            }
1844        }
1845
1846        fn empty_typed(pid: Index, type_name: String) -> Self {
1847            Self {
1848                pid,
1849                type_name,
1850                status: ActorStatus::Idle,
1851                stats: ActorStats::default(),
1852                handlers: HashMap::new(),
1853                children: HashMap::new(),
1854                events: Vec::new(),
1855                spans: Vec::new(),
1856            }
1857        }
1858    }
1859
1860    #[derive(Debug, Default, Actor)]
1861    #[export]
1862    struct TestActor;
1863
1864    #[derive(Handler, HandleClient, Debug)]
1865    enum TestActorMessage {
1866        Reply(oneshot::Sender<()>),
1867        Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
1868        Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
1869        Noop(),
1870        Fail(anyhow::Error),
1871        Panic(String),
1872        Spawn(oneshot::Sender<ActorHandle<TestActor>>),
1873    }
1874
1875    impl TestActor {
1876        async fn spawn_child(parent: &ActorHandle<TestActor>) -> ActorHandle<TestActor> {
1877            let (tx, rx) = oneshot::channel();
1878            parent.send(TestActorMessage::Spawn(tx)).unwrap();
1879            rx.await.unwrap()
1880        }
1881    }
1882
1883    #[async_trait]
1884    #[crate::forward(TestActorMessage)]
1885    impl TestActorMessageHandler for TestActor {
1886        async fn reply(
1887            &mut self,
1888            _cx: &crate::Context<Self>,
1889            sender: oneshot::Sender<()>,
1890        ) -> Result<(), anyhow::Error> {
1891            sender.send(()).unwrap();
1892            Ok(())
1893        }
1894
1895        async fn wait(
1896            &mut self,
1897            _cx: &crate::Context<Self>,
1898            sender: oneshot::Sender<()>,
1899            receiver: oneshot::Receiver<()>,
1900        ) -> Result<(), anyhow::Error> {
1901            sender.send(()).unwrap();
1902            receiver.await.unwrap();
1903            Ok(())
1904        }
1905
1906        async fn forward(
1907            &mut self,
1908            _cx: &crate::Context<Self>,
1909            destination: ActorHandle<TestActor>,
1910            message: Box<TestActorMessage>,
1911        ) -> Result<(), anyhow::Error> {
1912            // TODO: this needn't be async
1913            destination.send(*message)?;
1914            Ok(())
1915        }
1916
1917        async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
1918            Ok(())
1919        }
1920
1921        async fn fail(
1922            &mut self,
1923            _cx: &crate::Context<Self>,
1924            err: anyhow::Error,
1925        ) -> Result<(), anyhow::Error> {
1926            Err(err)
1927        }
1928
1929        async fn panic(
1930            &mut self,
1931            _cx: &crate::Context<Self>,
1932            err_msg: String,
1933        ) -> Result<(), anyhow::Error> {
1934            panic!("{}", err_msg);
1935        }
1936
1937        async fn spawn(
1938            &mut self,
1939            cx: &crate::Context<Self>,
1940            reply: oneshot::Sender<ActorHandle<TestActor>>,
1941        ) -> Result<(), anyhow::Error> {
1942            let handle = <Self as Actor>::spawn(cx, ()).await?;
1943            reply.send(handle).unwrap();
1944            Ok(())
1945        }
1946    }
1947
1948    #[tracing_test::traced_test]
1949    #[tokio::test]
1950    async fn test_spawn_actor() {
1951        let proc = Proc::local();
1952        let handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
1953
1954        // Check on the join handle.
1955        assert!(logs_contain(
1956            format!(
1957                "{}: spawned with {:?}",
1958                handle.actor_id(),
1959                handle.cell().actor_task_handle().unwrap(),
1960            )
1961            .as_str()
1962        ));
1963
1964        let mut state = handle.status().clone();
1965
1966        // Send a ping-pong to the actor. Wait for the actor to become idle.
1967
1968        let (tx, rx) = oneshot::channel::<()>();
1969        handle.send(TestActorMessage::Reply(tx)).unwrap();
1970        rx.await.unwrap();
1971
1972        state
1973            .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
1974            .await
1975            .unwrap();
1976
        // Make sure we enter the processing state while the actor is handling a message.
1978        let (enter_tx, enter_rx) = oneshot::channel::<()>();
1979        let (exit_tx, exit_rx) = oneshot::channel::<()>();
1980
1981        handle
1982            .send(TestActorMessage::Wait(enter_tx, exit_rx))
1983            .unwrap();
1984        enter_rx.await.unwrap();
1985        assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= RealClock.system_time_now());
1986        exit_tx.send(()).unwrap();
1987
1988        state
1989            .wait_for(|state| matches!(*state, ActorStatus::Idle))
1990            .await
1991            .unwrap();
1992
1993        handle.drain_and_stop().unwrap();
1994        handle.await;
1995        assert_matches!(*state.borrow(), ActorStatus::Stopped);
1996    }
1997
1998    #[tokio::test]
1999    async fn test_proc_actors_messaging() {
2000        let proc = Proc::local();
2001        let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2002        let second = proc.spawn::<TestActor>("second", ()).await.unwrap();
2003        let (tx, rx) = oneshot::channel::<()>();
2004        let reply_message = TestActorMessage::Reply(tx);
2005        first
2006            .send(TestActorMessage::Forward(second, Box::new(reply_message)))
2007            .unwrap();
2008        rx.await.unwrap();
2009    }
2010
2011    #[derive(Debug, Default, Actor)]
2012    struct LookupTestActor;
2013
2014    #[derive(Handler, HandleClient, Debug)]
2015    enum LookupTestMessage {
2016        ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
2017    }
2018
2019    #[async_trait]
2020    #[crate::forward(LookupTestMessage)]
2021    impl LookupTestMessageHandler for LookupTestActor {
2022        async fn actor_exists(
2023            &mut self,
2024            cx: &crate::Context<Self>,
2025            actor_ref: ActorRef<TestActor>,
2026        ) -> Result<bool, anyhow::Error> {
2027            Ok(actor_ref.downcast_handle(cx).is_some())
2028        }
2029    }
2030
2031    #[tokio::test]
2032    async fn test_actor_lookup() {
2033        let proc = Proc::local();
2034        let client = proc.attach("client").unwrap();
2035
2036        let target_actor = proc.spawn::<TestActor>("target", ()).await.unwrap();
2037        let target_actor_ref = target_actor.bind();
2038        let lookup_actor = proc.spawn::<LookupTestActor>("lookup", ()).await.unwrap();
2039
2040        assert!(
2041            lookup_actor
2042                .actor_exists(&client, target_actor_ref.clone())
2043                .await
2044                .unwrap()
2045        );
2046
2047        // Make up a child actor. It shouldn't exist.
2048        assert!(
2049            !lookup_actor
2050                .actor_exists(
2051                    &client,
2052                    ActorRef::attest(target_actor.actor_id().child_id(123).clone())
2053                )
2054                .await
2055                .unwrap()
2056        );
        // A wrongly-typed actor ref should not resolve to a handle either.
2058        assert!(
2059            !lookup_actor
2060                .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone()))
2061                .await
2062                .unwrap()
2063        );
2064
2065        target_actor.drain_and_stop().unwrap();
2066        target_actor.await;
2067
2068        assert!(
2069            !lookup_actor
2070                .actor_exists(&client, target_actor_ref)
2071                .await
2072                .unwrap()
2073        );
2074
2075        lookup_actor.drain_and_stop().unwrap();
2076        lookup_actor.await;
2077    }
2078
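    // Asserts that `child` and `parent` are linked in both directions: the
    // child's weak parent reference upgrades to `parent`, and the parent's
    // children map contains `child` under its PID.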
2079    fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
2080        assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
2081        assert_eq!(
2082            child.inner.parent.upgrade().unwrap().actor_id(),
2083            parent.actor_id()
2084        );
2085        assert_matches!(
2086            parent.inner.children.get(&child.pid()),
2087            Some(node) if node.actor_id() == child.actor_id()
2088        );
2089    }
2090
2091    #[tracing_test::traced_test]
2092    #[tokio::test]
2093    async fn test_spawn_child() {
2094        let proc = Proc::local();
2095
2096        let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2097        let second = TestActor::spawn_child(&first).await;
2098        let third = TestActor::spawn_child(&second).await;
2099
2100        // Check we've got the join handles.
2101        assert!(logs_with_scope_contain(
2102            "hyperactor::proc",
2103            format!(
2104                "{}: spawned with {:?}",
2105                first.actor_id(),
2106                first.cell().actor_task_handle().unwrap()
2107            )
2108            .as_str()
2109        ));
2110        assert!(logs_with_scope_contain(
2111            "hyperactor::proc",
2112            format!(
2113                "{}: spawned with {:?}",
2114                second.actor_id(),
2115                second.cell().actor_task_handle().unwrap()
2116            )
2117            .as_str()
2118        ));
2119        assert!(logs_with_scope_contain(
2120            "hyperactor::proc",
2121            format!(
2122                "{}: spawned with {:?}",
2123                third.actor_id(),
2124                third.cell().actor_task_handle().unwrap()
2125            )
2126            .as_str()
2127        ));
2128
2129        // These are allocated in sequence:
2130        assert_eq!(first.actor_id().proc_id(), proc.proc_id());
2131        assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
2132        assert_eq!(third.actor_id(), &first.actor_id().child_id(2));
2133
2134        // Supervision tree is constructed correctly.
2135        validate_link(third.cell(), second.cell());
2136        validate_link(second.cell(), first.cell());
2137        assert!(first.cell().inner.parent.upgrade().is_none());
2138
2139        // Supervision tree is torn down correctly.
2140        third.drain_and_stop().unwrap();
2141        third.await;
2142        assert!(second.cell().inner.children.is_empty());
2143        validate_link(second.cell(), first.cell());
2144
2145        second.drain_and_stop().unwrap();
2146        second.await;
2147        assert!(first.cell().inner.children.is_empty());
2148    }
2149
2150    #[tokio::test]
2151    async fn test_child_lifecycle() {
2152        let proc = Proc::local();
2153
2154        let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2155        let root_1 = TestActor::spawn_child(&root).await;
2156        let root_2 = TestActor::spawn_child(&root).await;
2157        let root_2_1 = TestActor::spawn_child(&root_2).await;
2158
2159        root.drain_and_stop().unwrap();
2160        root.await;
2161
2162        for actor in [root_1, root_2, root_2_1] {
2163            assert!(actor.send(TestActorMessage::Noop()).is_err());
2164            assert_matches!(actor.await, ActorStatus::Stopped);
2165        }
2166    }
2167
2168    #[tokio::test]
2169    async fn test_parent_failure() {
2170        let proc = Proc::local();
        // Need to set a supervision coordinator for this Proc because there will
        // be actor failure(s) in this test which trigger supervision.
2173        ProcSupervisionCoordinator::set(&proc).await.unwrap();
2174
2175        let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2176        let root_1 = TestActor::spawn_child(&root).await;
2177        let root_2 = TestActor::spawn_child(&root).await;
2178        let root_2_1 = TestActor::spawn_child(&root_2).await;
2179
2180        root_2
2181            .send(TestActorMessage::Fail(anyhow::anyhow!(
2182                "some random failure"
2183            )))
2184            .unwrap();
2185        let root_2_actor_id = root_2.actor_id().clone();
2186        assert_matches!(
2187            root_2.await,
2188            ActorStatus::Failed(err) if err == format!("serving {}: processing error: some random failure", root_2_actor_id)
2189        );
2190
2191        // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
2192        // stopped by a parent failure?
2193        assert_eq!(
2194            root.await,
2195            ActorStatus::Failed("did not handle supervision event".to_string())
2196        );
2197        assert_eq!(root_2_1.await, ActorStatus::Stopped);
2198        assert_eq!(root_1.await, ActorStatus::Stopped);
2199    }
2200
2201    #[tokio::test]
2202    async fn test_actor_ledger() {
2203        async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
2204            actor_handle
2205                .status()
2206                .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2207                .await
2208                .unwrap();
2209        }
2210
2211        let proc = Proc::local();
2212
2213        // Add the 1st root. This root will remain active until the end of the test.
2214        let root: ActorHandle<TestActor> = proc.spawn::<TestActor>("root", ()).await.unwrap();
2215        wait_until_idle(&root).await;
2216        {
2217            let snapshot = proc.state().ledger.snapshot();
2218            assert_eq!(
2219                snapshot.roots,
2220                hashmap! {
2221                    root.actor_id().clone() =>
2222                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2223                },
2224            );
2225        }
2226
2227        // Add the 2nd root.
2228        let another_root: ActorHandle<TestActor> =
2229            proc.spawn::<TestActor>("another_root", ()).await.unwrap();
2230        wait_until_idle(&another_root).await;
2231        {
2232            let snapshot = proc.state().ledger.snapshot();
2233            assert_eq!(
2234                snapshot.roots,
2235                hashmap! {
2236                    root.actor_id().clone() =>
2237                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2238                    another_root.actor_id().clone() =>
2239                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2240                },
2241            );
2242        }
2243
2244        // Stop the 2nd root. It should be excluded from the snapshot after it
2245        // is stopped.
2246        another_root.drain_and_stop().unwrap();
2247        another_root.await;
2248        {
2249            let snapshot = proc.state().ledger.snapshot();
2250            assert_eq!(
2251                snapshot.roots,
2252                hashmap! { root.actor_id().clone() =>
2253                    ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2254                },
2255            );
2256        }
2257
        // Incrementally add the following tree of children to root. This tree
        // should be captured by the snapshot.
2260        //     root -> root_1 -> root_1_1
2261        //         |-> root_2
2262
2263        let root_1 = TestActor::spawn_child(&root).await;
2264        wait_until_idle(&root_1).await;
2265        {
2266            let snapshot = proc.state().ledger.snapshot();
2267            assert_eq!(
2268                snapshot.roots,
2269                hashmap! {
2270                    root.actor_id().clone() =>  ActorTreeSnapshot {
2271                        pid: 0,
2272                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2273                        status: ActorStatus::Idle,
2274                        stats: ActorStats { num_processed_messages: 1 },
2275                        handlers: HashMap::new(),
2276                        children: hashmap! {
2277                            root_1.actor_id().pid() =>
2278                                ActorTreeSnapshot::empty_typed(
2279                                    root_1.actor_id().pid(),
2280                                    "hyperactor::proc::tests::TestActor".to_string()
2281                                )
2282                        },
2283                        events: Vec::new(),
2284                        spans: Vec::new(),
2285                    }
2286                },
2287            );
2288        }
2289
2290        let root_1_1 = TestActor::spawn_child(&root_1).await;
2291        wait_until_idle(&root_1_1).await;
2292        {
2293            let snapshot = proc.state().ledger.snapshot();
2294            assert_eq!(
2295                snapshot.roots,
2296                hashmap! {
2297                    root.actor_id().clone() =>  ActorTreeSnapshot {
2298                        pid: 0,
2299                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2300                        status: ActorStatus::Idle,
2301                        stats: ActorStats { num_processed_messages: 1 },
2302                        handlers: HashMap::new(),
2303                        children: hashmap!{
2304                            root_1.actor_id().pid() =>
2305                                ActorTreeSnapshot {
2306                                    pid: root_1.actor_id().pid(),
2307                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2308                                    status: ActorStatus::Idle,
2309                                    stats: ActorStats { num_processed_messages: 1 },
2310                                    handlers: HashMap::new(),
2311                                    children: hashmap!{
2312                                        root_1_1.actor_id().pid() =>
2313                                            ActorTreeSnapshot::empty_typed(
2314                                                root_1_1.actor_id().pid(),
2315                                                "hyperactor::proc::tests::TestActor".to_string()
2316                                            )
2317                                    },
2318                                    events: Vec::new(),
2319                                    spans: Vec::new(),
2320                                }
2321                        },
2322                        events: Vec::new(),
2323                        spans: Vec::new(),
2324                    },
2325                }
2326            );
2327        }
2328
2329        let root_2 = TestActor::spawn_child(&root).await;
2330        wait_until_idle(&root_2).await;
2331        {
2332            let snapshot = proc.state().ledger.snapshot();
2333            assert_eq!(
2334                snapshot.roots,
2335                hashmap! {
2336                    root.actor_id().clone() =>  ActorTreeSnapshot {
2337                        pid: 0,
2338                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2339                        status: ActorStatus::Idle,
2340                        stats: ActorStats { num_processed_messages: 2 },
2341                        handlers: HashMap::new(),
2342                        children: hashmap!{
2343                            root_2.actor_id().pid() =>
2344                                ActorTreeSnapshot{
2345                                    pid: root_2.actor_id().pid(),
2346                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2347                                    status: ActorStatus::Idle,
2348                                    stats: ActorStats::default(),
2349                                    handlers: HashMap::new(),
2350                                    children: HashMap::new(),
2351                                    events: Vec::new(),
2352                                    spans: Vec::new(),
2353                                },
2354                            root_1.actor_id().pid() =>
2355                                ActorTreeSnapshot{
2356                                    pid: root_1.actor_id().pid(),
2357                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2358                                    status: ActorStatus::Idle,
2359                                    stats: ActorStats { num_processed_messages: 1 },
2360                                    handlers: HashMap::new(),
2361                                    children: hashmap!{
2362                                        root_1_1.actor_id().pid() =>
2363                                            ActorTreeSnapshot::empty_typed(
2364                                                root_1_1.actor_id().pid(),
2365                                                "hyperactor::proc::tests::TestActor".to_string()
2366                                            )
2367                                    },
2368                                    events: Vec::new(),
2369                                    spans: Vec::new(),
2370                                },
2371                        },
2372                        events: Vec::new(),
2373                        spans: Vec::new(),
2374                    },
2375                }
2376            );
2377        }
2378
        // Stop root_1. This should remove it, and its child, from the snapshot.
2380        root_1.drain_and_stop().unwrap();
2381        root_1.await;
2382        {
2383            let snapshot = proc.state().ledger.snapshot();
2384            assert_eq!(
2385                snapshot.roots,
2386                hashmap! {
2387                    root.actor_id().clone() =>  ActorTreeSnapshot {
2388                        pid: 0,
2389                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2390                        status: ActorStatus::Idle,
2391                        stats: ActorStats { num_processed_messages: 3 },
2392                        handlers: HashMap::new(),
2393                        children: hashmap!{
2394                            root_2.actor_id().pid() =>
2395                                ActorTreeSnapshot {
2396                                    pid: root_2.actor_id().pid(),
2397                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2398                                    status: ActorStatus::Idle,
2399                                    stats: ActorStats::default(),
2400                                    handlers: HashMap::new(),
2401                                    children: HashMap::new(),
2402                                    events: Vec::new(),
2403                                    spans: Vec::new(),
2404                                }
2405                        },
2406                        events: Vec::new(),
2407                        spans: Vec::new(),
2408                    },
2409                }
2410            );
2411        }
2412
        // Finally stop root. No roots should be left in the snapshot.
2414        root.drain_and_stop().unwrap();
2415        root.await;
2416        {
2417            let snapshot = proc.state().ledger.snapshot();
2418            assert_eq!(snapshot.roots, hashmap! {});
2419        }
2420    }
2421
2422    #[tokio::test]
2423    async fn test_multi_handler() {
2424        // TEMPORARY: This test is currently a bit awkward since we don't yet expose
2425        // public interfaces to multi-handlers. This will be fixed shortly.
2426
2427        #[derive(Debug)]
2428        struct TestActor(Arc<AtomicUsize>);
2429
2430        #[async_trait]
2431        impl Actor for TestActor {
2432            type Params = Arc<AtomicUsize>;
2433
2434            async fn new(param: Arc<AtomicUsize>) -> Result<Self, anyhow::Error> {
2435                Ok(Self(param))
2436            }
2437        }
2438
2439        #[async_trait]
2440        impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
2441            async fn handle(
2442                &mut self,
2443                cx: &crate::Context<Self>,
2444                message: OncePortHandle<PortHandle<usize>>,
2445            ) -> anyhow::Result<()> {
2446                message.send(cx.port())?;
2447                Ok(())
2448            }
2449        }
2450
2451        #[async_trait]
2452        impl Handler<usize> for TestActor {
2453            async fn handle(
2454                &mut self,
2455                _cx: &crate::Context<Self>,
2456                message: usize,
2457            ) -> anyhow::Result<()> {
2458                self.0.fetch_add(message, Ordering::SeqCst);
2459                Ok(())
2460            }
2461        }
2462
2463        let proc = Proc::local();
2464        let state = Arc::new(AtomicUsize::new(0));
2465        let handle = proc
2466            .spawn::<TestActor>("test", state.clone())
2467            .await
2468            .unwrap();
2469        let client = proc.attach("client").unwrap();
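        // Ask the actor for its usize port via a once-port, then send a value
        // through that port.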
2470        let (tx, rx) = client.open_once_port();
2471        handle.send(tx).unwrap();
2472        let usize_handle = rx.recv().await.unwrap();
2473        usize_handle.send(123).unwrap();
2474
2475        handle.drain_and_stop().unwrap();
2476        handle.await;
2477
2478        assert_eq!(state.load(Ordering::SeqCst), 123);
2479    }
2480
2481    #[tokio::test]
2482    async fn test_actor_panic() {
        // Need this custom hook to store the panic backtrace in a task_local.
2484        panic_handler::set_panic_hook();
2485
2486        let proc = Proc::local();
        // Need to set a supervision coordinator for this Proc because there will
        // be actor failure(s) in this test which trigger supervision.
2489        ProcSupervisionCoordinator::set(&proc).await.unwrap();
2490
2491        let client = proc.attach("client").unwrap();
2492        let actor_handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
2493        actor_handle
2494            .panic(&client, "some random failure".to_string())
2495            .await
2496            .unwrap();
2497        let actor_status = actor_handle.await;
2498
        // Note: even when the test passes, the panic stacktrace is still printed
        // to stderr, because that behavior is controlled by the panic hook.
2502        assert_matches!(actor_status, ActorStatus::Failed(_));
2503        if let ActorStatus::Failed(err) = actor_status {
2504            let error_msg = err.to_string();
2505            // Verify panic message is captured
2506            assert!(error_msg.contains("some random failure"));
2507            // Verify backtrace is captured. Note the backtrace message might
2508            // change in the future. If that happens, we need to update this
2509            // statement with something up-to-date.
2510            assert!(error_msg.contains("rust_begin_unwind"));
2511        }
2512    }
2513
2514    #[tokio::test]
2515    async fn test_local_supervision_propagation() {
2516        #[derive(Debug)]
2517        struct TestActor(Arc<AtomicBool>, bool);
2518
2519        #[async_trait]
2520        impl Actor for TestActor {
2521            type Params = (Arc<AtomicBool>, bool);
2522
2523            async fn new(param: (Arc<AtomicBool>, bool)) -> Result<Self, anyhow::Error> {
2524                Ok(Self(param.0, param.1))
2525            }
2526
2527            async fn handle_supervision_event(
2528                &mut self,
2529                _this: &Instance<Self>,
2530                _event: &ActorSupervisionEvent,
2531            ) -> Result<bool, anyhow::Error> {
2532                if !self.1 {
2533                    return Ok(false);
2534                }
2535
2536                tracing::error!(
2537                    "{}: supervision event received: {:?}",
2538                    _this.self_id(),
2539                    _event
2540                );
2541                self.0.store(true, Ordering::SeqCst);
2542                Ok(true)
2543            }
2544        }
2545
2546        #[async_trait]
2547        impl Handler<String> for TestActor {
2548            async fn handle(
2549                &mut self,
2550                cx: &crate::Context<Self>,
2551                message: String,
2552            ) -> anyhow::Result<()> {
2553                tracing::info!("{} received message: {}", cx.self_id(), message);
2554                Err(anyhow::anyhow!(message))
2555            }
2556        }
2557
2558        let proc = Proc::local();
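        // The returned handle lets the test observe the supervision event that
        // ultimately reaches the proc-level coordinator.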
2559        let reported_event = ProcSupervisionCoordinator::set(&proc).await.unwrap();
2560
2561        let root_state = Arc::new(AtomicBool::new(false));
2562        let root_1_state = Arc::new(AtomicBool::new(false));
2563        let root_1_1_state = Arc::new(AtomicBool::new(false));
2564        let root_1_1_1_state = Arc::new(AtomicBool::new(false));
2565        let root_2_state = Arc::new(AtomicBool::new(false));
2566        let root_2_1_state = Arc::new(AtomicBool::new(false));
2567
2568        let root = proc
2569            .spawn::<TestActor>("root", (root_state.clone(), false))
2570            .await
2571            .unwrap();
2572        let root_1 = proc
2573            .spawn_child::<TestActor>(
2574                root.cell().clone(),
2575                (
2576                    root_1_state.clone(),
2577                    true, /* set true so children's event stops here */
2578                ),
2579            )
2580            .await
2581            .unwrap();
2582        let root_1_1 = proc
2583            .spawn_child::<TestActor>(root_1.cell().clone(), (root_1_1_state.clone(), false))
2584            .await
2585            .unwrap();
2586        let root_1_1_1 = proc
2587            .spawn_child::<TestActor>(root_1_1.cell().clone(), (root_1_1_1_state.clone(), false))
2588            .await
2589            .unwrap();
2590        let root_2 = proc
2591            .spawn_child::<TestActor>(root.cell().clone(), (root_2_state.clone(), false))
2592            .await
2593            .unwrap();
2594        let root_2_1 = proc
2595            .spawn_child::<TestActor>(root_2.cell().clone(), (root_2_1_state.clone(), false))
2596            .await
2597            .unwrap();
2598
        // Fail `root_1_1_1`; the supervision event should propagate to `root_1`
        // and stop there, because `root_1`'s `handle_supervision_event` returns `true`.
2601        root_1_1_1
2602            .send::<String>("some random failure".into())
2603            .unwrap();
2604
        // Fail `root_2_1`; no ancestor handles the event, so it should propagate
        // all the way to the ProcSupervisionCoordinator.
2607        root_2_1
2608            .send::<String>("some random failure".into())
2609            .unwrap();
2610
2611        RealClock.sleep(Duration::from_secs(1)).await;
2612
2613        assert!(!root_state.load(Ordering::SeqCst));
2614        assert!(root_1_state.load(Ordering::SeqCst));
2615        assert!(!root_1_1_state.load(Ordering::SeqCst));
2616        assert!(!root_1_1_1_state.load(Ordering::SeqCst));
2617        assert!(!root_2_state.load(Ordering::SeqCst));
2618        assert!(!root_2_1_state.load(Ordering::SeqCst));
2619        assert_eq!(
2620            reported_event.event().map(|e| e.actor_id.clone()),
2621            Some(root.actor_id().clone())
2622        );
2623    }
2624
2625    #[tokio::test]
2626    async fn test_supervision_event_handler_propagates() {
2627        #[derive(Debug)]
2628        struct FailingSupervisionActor;
2629
2630        #[async_trait]
2631        impl Actor for FailingSupervisionActor {
2632            type Params = ();
2633
2634            async fn new(_: ()) -> Result<Self, anyhow::Error> {
2635                Ok(Self)
2636            }
2637
2638            async fn handle_supervision_event(
2639                &mut self,
2640                _this: &Instance<Self>,
2641                _event: &ActorSupervisionEvent,
2642            ) -> Result<bool, anyhow::Error> {
2643                anyhow::bail!("failed to handle supervision event!")
2644            }
2645        }
2646
2647        #[async_trait]
2648        impl Handler<String> for FailingSupervisionActor {
2649            async fn handle(
2650                &mut self,
2651                _cx: &crate::Context<Self>,
2652                message: String,
2653            ) -> anyhow::Result<()> {
2654                Err(anyhow::anyhow!(message))
2655            }
2656        }
2657
2658        #[derive(Debug)]
2659        struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);
2660
2661        #[async_trait]
2662        impl Actor for ParentActor {
2663            type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;
2664
2665            async fn new(
2666                supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
2667            ) -> Result<Self, anyhow::Error> {
2668                Ok(Self(supervision_events))
2669            }
2670
2671            async fn handle_supervision_event(
2672                &mut self,
2673                _this: &Instance<Self>,
2674                event: &ActorSupervisionEvent,
2675            ) -> Result<bool, anyhow::Error> {
2676                self.0.send(event.clone()).unwrap();
2677                Ok(true)
2678            }
2679        }
2680
2681        let proc = Proc::local();
2682
2683        let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
2684
2685        let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
2686        let child = proc
2687            .spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
2688            .await
2689            .unwrap();
2690        let grandchild = proc
2691            .spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
2692            .await
2693            .unwrap();
2694
2695        let child_actor_id = child.actor_id().clone();
2696        let grandchild_actor_id = grandchild.actor_id().clone();
2697
        // The grandchild fails, triggering failures up the tree until the event
        // is finally received at the root.
2700        grandchild.send("trigger failure".to_string()).unwrap();
2701
2702        assert!(grandchild.await.is_failed());
2703        assert!(child.await.is_failed());
2704
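        // The parent observes the child's failure; the grandchild's original
        // failure is attached to the event via `caused_by`.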
2705        assert_eq!(
2706            event_rx.recv().await.unwrap(),
2707            ActorSupervisionEvent {
2708                actor_id: child_actor_id,
2709                actor_status: ActorStatus::Failed(
2710                    "failed to handle supervision event: failed to handle supervision event!"
2711                        .to_string()
2712                ),
2713                message_headers: None,
2714                caused_by: Some(Box::new(ActorSupervisionEvent {
2715                    actor_id: grandchild_actor_id,
2716                    actor_status: ActorStatus::Failed(
2717                        "serving local[0].parent[2]: processing error: trigger failure".to_string()
2718                    ),
2719                    message_headers: None,
2720                    caused_by: None,
2721                })),
2722            }
2723        );
2724
2725        assert!(event_rx.try_recv().is_err());
2726    }
2727
2728    #[tokio::test]
2729    async fn test_instance() {
2730        #[derive(Debug, Default, Actor)]
2731        struct TestActor;
2732
2733        #[async_trait]
2734        impl Handler<(String, PortRef<String>)> for TestActor {
2735            async fn handle(
2736                &mut self,
2737                cx: &crate::Context<Self>,
2738                (message, port): (String, PortRef<String>),
2739            ) -> anyhow::Result<()> {
2740                port.send(cx, message)?;
2741                Ok(())
2742            }
2743        }
2744
2745        let proc = Proc::local();
2746
2747        let (instance, handle) = proc.instance("my_test_actor").unwrap();
2748
2749        let child_actor = TestActor::spawn(&instance, ()).await.unwrap();
2750
2751        let (port, mut receiver) = instance.open_port();
2752        child_actor
2753            .send(("hello".to_string(), port.bind()))
2754            .unwrap();
2755
2756        let message = receiver.recv().await.unwrap();
2757        assert_eq!(message, "hello");
2758
2759        child_actor.drain_and_stop().unwrap();
2760        child_actor.await;
2761
2762        assert_eq!(*handle.status().borrow(), ActorStatus::Client);
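        // Dropping the instance stops the client actor backing it.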
2763        drop(instance);
2764        assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
2765        handle.await;
2766    }
2767
2768    #[tokio::test]
2769    async fn test_proc_terminate_without_coordinator() {
2770        if std::env::var("CARGO_TEST").is_ok() {
2771            eprintln!("test skipped as it hangs when run by cargo in sandcastle");
2772            return;
2773        }
2774
2775        let process = async {
2776            let proc = Proc::local();
            // Intentionally not setting a proc supervision coordinator. This
2778            // should cause the process to terminate.
2779            // ProcSupervisionCoordinator::set(&proc).await.unwrap();
2780            let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2781            let client = proc.attach("client").unwrap();
2782            root.fail(&client, anyhow::anyhow!("some random failure"))
2783                .await
2784                .unwrap();
2785            // It is okay to sleep a long time here, because we expect this
2786            // process to be terminated way before the sleep ends due to the
            // missing proc supervision coordinator.
2788            RealClock.sleep(Duration::from_secs(30)).await;
2789        };
2790
2791        assert_termination(|| process, 1).await.unwrap();
2792    }
2793
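    // Runs `fut` to completion on a fresh current-thread runtime, with the
    // telemetry recorder installed as the default tracing subscriber so that
    // tests can inspect the events and spans captured in an actor's recording.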
2794    fn trace_and_block(fut: impl Future) {
2795        tracing::subscriber::with_default(
2796            tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
2797            || {
2798                tokio::runtime::Builder::new_current_thread()
2799                    .enable_all()
2800                    .build()
2801                    .unwrap()
2802                    .block_on(fut)
2803            },
2804        );
2805    }
2806
2807    #[ignore = "until trace recording is turned back on"]
2808    #[test]
2809    fn test_handler_logging() {
2810        #[derive(Debug, Default, Actor)]
2811        struct LoggingActor;
2812
2813        impl LoggingActor {
2814            async fn wait(handle: &ActorHandle<Self>) {
2815                let barrier = Arc::new(Barrier::new(2));
2816                handle.send(barrier.clone()).unwrap();
2817                barrier.wait().await;
2818            }
2819        }
2820
2821        #[async_trait]
2822        impl Handler<String> for LoggingActor {
2823            async fn handle(
2824                &mut self,
2825                _cx: &crate::Context<Self>,
2826                message: String,
2827            ) -> anyhow::Result<()> {
2828                tracing::info!("{}", message);
2829                Ok(())
2830            }
2831        }
2832
2833        #[async_trait]
2834        impl Handler<u64> for LoggingActor {
2835            async fn handle(
2836                &mut self,
2837                _cx: &crate::Context<Self>,
2838                message: u64,
2839            ) -> anyhow::Result<()> {
2840                tracing::event!(Level::INFO, number = message);
2841                Ok(())
2842            }
2843        }
2844
2845        #[async_trait]
2846        impl Handler<Arc<Barrier>> for LoggingActor {
2847            async fn handle(
2848                &mut self,
2849                _cx: &crate::Context<Self>,
2850                message: Arc<Barrier>,
2851            ) -> anyhow::Result<()> {
2852                message.wait().await;
2853                Ok(())
2854            }
2855        }
2856
2857        #[async_trait]
2858        impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
2859            async fn handle(
2860                &mut self,
2861                _cx: &crate::Context<Self>,
2862                barriers: Arc<(Barrier, Barrier)>,
2863            ) -> anyhow::Result<()> {
2864                let inner = tracing::span!(Level::INFO, "child_span");
2865                let _inner_guard = inner.enter();
2866                barriers.0.wait().await;
2867                barriers.1.wait().await;
2868                Ok(())
2869            }
2870        }
2871
2872        trace_and_block(async {
2873            let handle = LoggingActor::spawn_detached(()).await.unwrap();
2874            handle.send("hello world".to_string()).unwrap();
2875            handle.send("hello world again".to_string()).unwrap();
2876            handle.send(123u64).unwrap();
2877
2878            LoggingActor::wait(&handle).await;
2879
2880            let events = handle.cell().inner.recording.tail();
2881            assert_eq!(events.len(), 3);
2882            assert_eq!(events[0].json_value(), json!({ "message": "hello world" }));
2883            assert_eq!(
2884                events[1].json_value(),
2885                json!({ "message": "hello world again" })
2886            );
2887            assert_eq!(events[2].json_value(), json!({ "number": 123 }));
2888
2889            let stacks = {
2890                let barriers = Arc::new((Barrier::new(2), Barrier::new(2)));
2891                handle.send(Arc::clone(&barriers)).unwrap();
2892                barriers.0.wait().await;
2893                let stacks = handle.cell().inner.recording.stacks();
2894                barriers.1.wait().await;
2895                stacks
2896            };
2897            assert_eq!(stacks.len(), 1);
2898            assert_eq!(stacks[0].len(), 1);
2899            assert_eq!(stacks[0][0].name(), "child_span");
2900        })
2901    }
2902}