hyperactor/
proc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! This module provides [`Proc`], which is the runtime used within a single
10//! proc.
11
12// TODO: define a set of proc errors and plumb these throughout
13
14use std::any::Any;
15use std::any::TypeId;
16use std::collections::HashMap;
17use std::fmt;
18use std::future::Future;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::ops::Deref;
22use std::panic;
23use std::panic::AssertUnwindSafe;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::sync::OnceLock;
27use std::sync::Weak;
28use std::sync::atomic::AtomicU64;
29use std::sync::atomic::AtomicUsize;
30use std::sync::atomic::Ordering;
31use std::time::Duration;
32use std::time::SystemTime;
33
34use async_trait::async_trait;
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use dashmap::mapref::multiple::RefMulti;
38use futures::FutureExt;
39use hyperactor_macros::AttrValue;
40use hyperactor_macros::Named;
41use hyperactor_telemetry::recorder;
42use hyperactor_telemetry::recorder::Recording;
43use serde::Deserialize;
44use serde::Serialize;
45use tokio::sync::mpsc;
46use tokio::sync::watch;
47use tokio::task::JoinHandle;
48use tracing::Instrument;
49use uuid::Uuid;
50
51use crate as hyperactor;
52use crate::Actor;
53use crate::ActorRef;
54use crate::Handler;
55use crate::Message;
56use crate::RemoteMessage;
57use crate::actor::ActorError;
58use crate::actor::ActorErrorKind;
59use crate::actor::ActorHandle;
60use crate::actor::ActorStatus;
61use crate::actor::Binds;
62use crate::actor::Referable;
63use crate::actor::RemoteHandles;
64use crate::actor::Signal;
65use crate::attrs::Attrs;
66use crate::channel;
67use crate::channel::ChannelAddr;
68use crate::channel::ChannelError;
69use crate::clock::Clock;
70use crate::clock::ClockKind;
71use crate::clock::RealClock;
72use crate::config;
73use crate::context;
74use crate::data::Serialized;
75use crate::data::TypeInfo;
76use crate::declare_attrs;
77use crate::mailbox::BoxedMailboxSender;
78use crate::mailbox::DeliveryError;
79use crate::mailbox::DialMailboxRouter;
80use crate::mailbox::IntoBoxedMailboxSender as _;
81use crate::mailbox::Mailbox;
82use crate::mailbox::MailboxMuxer;
83use crate::mailbox::MailboxSender;
84use crate::mailbox::MailboxServer as _;
85use crate::mailbox::MessageEnvelope;
86use crate::mailbox::OncePortHandle;
87use crate::mailbox::OncePortReceiver;
88use crate::mailbox::PanickingMailboxSender;
89use crate::mailbox::PortHandle;
90use crate::mailbox::PortReceiver;
91use crate::mailbox::Undeliverable;
92use crate::metrics::ACTOR_MESSAGE_HANDLER_DURATION;
93use crate::metrics::ACTOR_MESSAGE_QUEUE_SIZE;
94use crate::metrics::ACTOR_MESSAGES_RECEIVED;
95use crate::ordering::OrderedSender;
96use crate::ordering::OrderedSenderError;
97use crate::ordering::Sequencer;
98use crate::ordering::ordered_channel;
99use crate::panic_handler;
100use crate::reference::ActorId;
101use crate::reference::Index;
102use crate::reference::PortId;
103use crate::reference::ProcId;
104use crate::reference::id;
105use crate::supervision::ActorSupervisionEvent;
106
/// Process-wide counter used to mint new local ranks for [`Proc::local`].
/// Each call to [`Proc::local`] takes the current value and increments it.
static NEXT_LOCAL_RANK: AtomicUsize = AtomicUsize::new(0);
109
/// A proc instance is the runtime managing a single proc in Hyperactor.
/// It is responsible for spawning actors in the proc, multiplexing messages
/// to/within actors in the proc, and providing fallback routing to external
/// procs.
///
/// Procs are also responsible for maintaining the local supervision hierarchy.
///
/// `Proc` is a cheap handle: cloning it clones the inner [`Arc`], so all
/// clones share the same underlying state.
#[derive(Clone, Debug)]
pub struct Proc {
    /// Shared, reference-counted proc state.
    inner: Arc<ProcState>,
}
120
/// The state shared by every clone of a [`Proc`] handle.
#[derive(Debug)]
struct ProcState {
    /// The proc's id. This should be globally unique, but is not (yet)
    /// for local-only procs.
    proc_id: ProcId,

    /// A muxer instance that has entries for every actor managed by
    /// the proc.
    proc_muxer: MailboxMuxer,

    /// Sender used to forward messages outside of the proc.
    forwarder: BoxedMailboxSender,

    /// All of the roots (i.e., named actors with pid=0) in the proc.
    /// These are also known as "global actors", since they may be
    /// spawned remotely.
    ///
    /// The value is the next child pid to hand out under that root: it
    /// starts at 1 when the root is allocated and is incremented for every
    /// child id allocated beneath it.
    roots: DashMap<String, AtomicUsize>,

    /// Keep track of all of the active actors in the proc.
    ledger: ActorLedger,

    /// Weak references to instance cells, keyed by actor id. This is the
    /// table consulted by [`Proc::resolve_actor_ref`].
    instances: DashMap<ActorId, WeakInstanceCell>,

    /// Used by root actors to send events to the actor coordinating
    /// supervision of root actors in this proc.
    supervision_coordinator_port: OnceLock<PortHandle<ActorSupervisionEvent>>,

    /// The clock exposed through [`Proc::clock`].
    clock: ClockKind,
}
150
/// A snapshot view of the proc's actor ledger.
///
/// Roots whose instance cell can no longer be upgraded at snapshot time are
/// not included.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorLedgerSnapshot {
    /// All the actor trees in the proc, mapping the root id to the root
    /// of each tree.
    pub roots: HashMap<ActorId, ActorTreeSnapshot>,
}
158
/// An event representing one row of an actor's log.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Time when the event happened.
    pub time: SystemTime,
    /// The payload of the event.
    pub fields: Vec<(String, recorder::Value)>,
    /// The sequence number of the event.
    pub seq: usize,
}
169
170impl From<recorder::Event> for Event {
171    fn from(event: recorder::Event) -> Event {
172        Event {
173            time: event.time,
174            fields: event.fields(),
175            seq: event.seq,
176        }
177    }
178}
179
/// A snapshot of an actor tree (rooted at a pid=0 actor).
///
/// The structure is recursive: each node describes one actor, and its
/// `children` hold the snapshots of that actor's child actors.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActorTreeSnapshot {
    /// The PID of this actor.
    pub pid: Index,

    /// The type name of the actor. If the actor is [`crate::Named`], then
    /// this is the registered name; otherwise it is the actor type's
    /// [`std::any::type_name`].
    pub type_name: String,

    /// The actor's current status.
    pub status: ActorStatus,

    /// Various operational stats for the actor.
    pub stats: ActorStats,

    /// This actor's handlers, mapping port numbers to the named type handled.
    pub handlers: HashMap<u64, String>,

    /// This actor's children, keyed by child PID.
    pub children: HashMap<Index, ActorTreeSnapshot>,

    /// Recent events emitted by the actor's logging.
    pub events: Vec<Event>,

    /// The current set of spans entered by the actor. These should be active
    /// only while the actor is entered in a handler.
    pub spans: Vec<Vec<String>>,
}
210
211impl Hash for ActorTreeSnapshot {
212    fn hash<H: Hasher>(&self, state: &mut H) {
213        self.pid.hash(state);
214    }
215}
216
217/// Operational stats for an actor instance.
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
219#[derive(Default)]
220pub struct ActorStats {
221    /// The number of messages processed by the actor.
222    num_processed_messages: u64,
223}
224
225impl fmt::Display for ActorStats {
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        write!(f, "num_processed_messages={}", self.num_processed_messages)
228    }
229}
230
/// Registry of the root actors spawned on a proc. Cells are held weakly, so
/// the ledger does not keep an actor's instance cell alive.
#[derive(Debug)]
struct ActorLedger {
    // Root actors. Map's value is its key's InstanceCell.
    roots: DashMap<ActorId, WeakInstanceCell>,
}
236
237impl ActorLedger {
238    fn new() -> Self {
239        Self {
240            roots: DashMap::new(),
241        }
242    }
243
244    fn insert(
245        &self,
246        root_actor_id: ActorId,
247        root_actor_cell: WeakInstanceCell,
248    ) -> Result<(), anyhow::Error> {
249        match self.roots.insert(root_actor_id.clone(), root_actor_cell) {
250            None => Ok(()),
251            // This should never happen because we do not recycle root actor's
252            // IDs.
253            Some(current_cell) => {
254                let debugging_msg = match current_cell.upgrade() {
255                    Some(cell) => format!("the stored cell's actor ID is {}", cell.actor_id()),
256                    None => "the stored cell has been dropped".to_string(),
257                };
258
259                Err(anyhow::anyhow!(
260                    "actor '{root_actor_id}' has already been added to ledger: {debugging_msg}"
261                ))
262            }
263        }
264    }
265
266    /// Get a snapshot view of this ledger.
267    fn snapshot(&self) -> ActorLedgerSnapshot {
268        let roots = self
269            .roots
270            .iter()
271            .flat_map(|r| {
272                let (actor_id, weak_cell) = r.pair();
273                // The actor might have been stopped or errored out. Since we do
274                // not remove inactive actors from ledger, the upgrade() call
275                // will return None in that scenario.
276                weak_cell
277                    .upgrade()
278                    .map(|cell| (actor_id.clone(), Self::get_actor_tree_snapshot(&cell)))
279            })
280            .collect();
281
282        ActorLedgerSnapshot { roots }
283    }
284
285    fn get_actor_tree_snapshot(cell: &InstanceCell) -> ActorTreeSnapshot {
286        // Get the edges between this actor and its children.
287        let children = cell
288            .child_iter()
289            .map(|child| (child.pid(), Self::get_actor_tree_snapshot(child.value())))
290            .collect();
291
292        ActorTreeSnapshot {
293            pid: cell.actor_id().pid(),
294            type_name: cell.inner.actor_type.type_name().to_string(),
295            status: cell.status().borrow().clone(),
296            stats: ActorStats {
297                num_processed_messages: cell.inner.num_processed_messages.load(Ordering::SeqCst),
298            },
299            handlers: cell
300                .inner
301                .exported_named_ports
302                .iter()
303                .map(|entry| (*entry.key(), entry.value().to_string()))
304                .collect(),
305            children,
306            events: cell
307                .inner
308                .recording
309                .tail()
310                .into_iter()
311                .map(Event::from)
312                .collect(),
313            spans: cell
314                .inner
315                .recording
316                .stacks()
317                .into_iter()
318                .map(|stack| {
319                    stack
320                        .into_iter()
321                        .map(|meta| meta.name().to_string())
322                        .collect()
323                })
324                .collect(),
325        }
326    }
327}
328
329impl Proc {
330    /// Create a new proc with the given proc id and forwarder.
331    pub fn new(proc_id: ProcId, forwarder: BoxedMailboxSender) -> Self {
332        Self::new_with_clock(proc_id, forwarder, ClockKind::default())
333    }
334
335    /// Create a new direct-addressed proc.
336    pub async fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
337        let (addr, rx) = channel::serve(addr)?;
338        let proc_id = ProcId::Direct(addr, name);
339        let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
340        proc.clone().serve(rx);
341        Ok(proc)
342    }
343
344    /// Create a new direct-addressed proc with a default sender for the forwarder.
345    pub fn direct_with_default(
346        addr: ChannelAddr,
347        name: String,
348        default: BoxedMailboxSender,
349    ) -> Result<Self, ChannelError> {
350        let (addr, rx) = channel::serve(addr)?;
351        let proc_id = ProcId::Direct(addr, name);
352        let proc = Self::new(
353            proc_id,
354            DialMailboxRouter::new_with_default(default).into_boxed(),
355        );
356        proc.clone().serve(rx);
357        Ok(proc)
358    }
359
360    /// Create a new proc with the given proc id, forwarder and clock kind.
361    pub fn new_with_clock(
362        proc_id: ProcId,
363        forwarder: BoxedMailboxSender,
364        clock: ClockKind,
365    ) -> Self {
366        Self {
367            inner: Arc::new(ProcState {
368                proc_id,
369                proc_muxer: MailboxMuxer::new(),
370                forwarder,
371                roots: DashMap::new(),
372                ledger: ActorLedger::new(),
373                instances: DashMap::new(),
374                supervision_coordinator_port: OnceLock::new(),
375                clock,
376            }),
377        }
378    }
379
380    /// Set the supervision coordinator's port for this proc. Return Err if it is
381    /// already set.
382    pub fn set_supervision_coordinator(
383        &self,
384        port: PortHandle<ActorSupervisionEvent>,
385    ) -> Result<(), anyhow::Error> {
386        self.state()
387            .supervision_coordinator_port
388            .set(port)
389            .map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
390    }
391
392    fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
393        let result = match self.state().supervision_coordinator_port.get() {
394            Some(port) => port.send(event).map_err(anyhow::Error::from),
395            None => Err(anyhow::anyhow!(
396                "coordinator port is not set for proc {}",
397                self.proc_id()
398            )),
399        };
400        if let Err(err) = result {
401            tracing::error!(
402                "proc {}: could not propagate supervision event: {:?}: crashing",
403                self.proc_id(),
404                err
405            );
406
407            std::process::exit(1);
408        }
409    }
410
411    /// Create a new local-only proc. This proc is not allowed to forward messages
412    /// outside of the proc itself.
413    pub fn local() -> Self {
414        // TODO: name these something that is ~ globally unique, e.g., incorporate
415        // the hostname, some GUID, etc.
416        let proc_id = ProcId::Ranked(id!(local), NEXT_LOCAL_RANK.fetch_add(1, Ordering::Relaxed));
417        // TODO: make it so that local procs can talk to each other.
418        Proc::new(proc_id, BoxedMailboxSender::new(PanickingMailboxSender))
419    }
420
421    /// The proc's ID.
422    pub fn proc_id(&self) -> &ProcId {
423        &self.state().proc_id
424    }
425
426    /// Shared sender used by the proc to forward messages to remote
427    /// destinations.
428    pub fn forwarder(&self) -> &BoxedMailboxSender {
429        &self.inner.forwarder
430    }
431
432    /// Convenience accessor for state.
433    fn state(&self) -> &ProcState {
434        self.inner.as_ref()
435    }
436
437    /// The proc's clock.
438    pub fn clock(&self) -> &ClockKind {
439        &self.state().clock
440    }
441
442    /// Get the snapshot of the ledger.
443    pub fn ledger_snapshot(&self) -> ActorLedgerSnapshot {
444        self.state().ledger.snapshot()
445    }
446
447    /// Attach a mailbox to the proc with the provided root name.
448    pub fn attach(&self, name: &str) -> Result<Mailbox, anyhow::Error> {
449        let actor_id: ActorId = self.allocate_root_id(name)?;
450        Ok(self.bind_mailbox(actor_id))
451    }
452
453    /// Attach a mailbox to the proc as a child actor.
454    pub fn attach_child(&self, parent_id: &ActorId) -> Result<Mailbox, anyhow::Error> {
455        let actor_id: ActorId = self.allocate_child_id(parent_id)?;
456        Ok(self.bind_mailbox(actor_id))
457    }
458
459    /// Bind a mailbox to the proc.
460    fn bind_mailbox(&self, actor_id: ActorId) -> Mailbox {
461        let mbox = Mailbox::new(actor_id, BoxedMailboxSender::new(self.downgrade()));
462
463        // TODO: T210748165 tie the muxer entry to the lifecycle of the mailbox held
464        // by the caller. This will likely require a weak reference.
465        self.state().proc_muxer.bind_mailbox(mbox.clone());
466        mbox
467    }
468
469    /// Attach a mailbox to the proc with the provided root name, and bind an [`ActorRef`].
470    /// This is intended only for testing, and will be replaced by simpled utilities.
471    pub fn attach_actor<R, M>(
472        &self,
473        name: &str,
474    ) -> Result<(Instance<()>, ActorRef<R>, PortReceiver<M>), anyhow::Error>
475    where
476        M: RemoteMessage,
477        R: Referable + RemoteHandles<M>,
478    {
479        let (instance, _handle) = self.instance(name)?;
480        let (handle, rx) = instance.open_port::<M>();
481        handle.bind_to(M::port());
482        let actor_ref = ActorRef::attest(instance.self_id().clone());
483        Ok((instance, actor_ref, rx))
484    }
485
486    /// Spawn a named (root) actor on this proc. The name of the actor must be
487    /// unique.
488    #[hyperactor::observe_result("Proc")]
489    pub async fn spawn<A: Actor>(
490        &self,
491        name: &str,
492        params: A::Params,
493    ) -> Result<ActorHandle<A>, anyhow::Error> {
494        let actor_id = self.allocate_root_id(name)?;
495        let _ = tracing::debug_span!(
496            "spawn_actor",
497            actor_name = name,
498            actor_type = std::any::type_name::<A>(),
499            actor_id = actor_id.to_string(),
500        );
501        let (instance, mut actor_loop_receivers, work_rx) =
502            Instance::new(self.clone(), actor_id.clone(), false, None);
503        let actor = A::new(params).await?;
504        // Add this actor to the proc's actor ledger. We do not actively remove
505        // inactive actors from ledger, because the actor's state can be inferred
506        // from its weak cell.
507        self.state()
508            .ledger
509            .insert(actor_id.clone(), instance.cell.downgrade())?;
510
511        instance
512            .start(actor, actor_loop_receivers.take().unwrap(), work_rx)
513            .await
514    }
515
516    /// Create and return an actor instance and its corresponding handle. This allows actors to be
517    /// "inverted": the caller can use the returned [`Instance`] to send and receive messages,
518    /// launch child actors, etc. The actor itself does not handle any messages, and supervision events
519    /// are always forwarded to the proc. Otherwise the instance acts as a normal actor, and can be
520    /// referenced and stopped.
521    pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
522        let actor_id = self.allocate_root_id(name)?;
523        let _ = tracing::debug_span!(
524            "actor_instance",
525            actor_name = name,
526            actor_type = std::any::type_name::<()>(),
527            actor_id = actor_id.to_string(),
528        );
529
530        let (instance, _, _) = Instance::new(self.clone(), actor_id.clone(), true, None);
531        let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone());
532
533        instance.change_status(ActorStatus::Client);
534
535        Ok((instance, handle))
536    }
537
538    /// Create a child instance. Called from `Instance`.
539    fn child_instance(
540        &self,
541        parent: InstanceCell,
542    ) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
543        let actor_id = self.allocate_child_id(parent.actor_id())?;
544        let _ = tracing::debug_span!(
545            "child_actor_instance",
546            parent_actor_id = %parent.actor_id(),
547            actor_type = std::any::type_name::<()>(),
548            actor_id = %actor_id,
549        );
550
551        let (instance, _, _) = Instance::new(self.clone(), actor_id, false, Some(parent));
552        let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone());
553        instance.change_status(ActorStatus::Client);
554        Ok((instance, handle))
555    }
556
557    /// Spawn a child actor from the provided parent on this proc. The parent actor
558    /// must already belong to this proc, a fact which is asserted in code.
559    ///
560    /// When spawn_child returns, the child has an associated cell and is linked
561    /// with its parent.
562    async fn spawn_child<A: Actor>(
563        &self,
564        parent: InstanceCell,
565        params: A::Params,
566    ) -> Result<ActorHandle<A>, anyhow::Error> {
567        let actor_id = self.allocate_child_id(parent.actor_id())?;
568        let (instance, mut actor_loop_receivers, work_rx) =
569            Instance::new(self.clone(), actor_id, false, Some(parent.clone()));
570        let actor = A::new(params).await?;
571        instance
572            .start(actor, actor_loop_receivers.take().unwrap(), work_rx)
573            .await
574    }
575
576    /// Call `abort` on the `JoinHandle` associated with the given
577    /// root actor. If successful return `Some(root.clone())` else
578    /// `None`.
579    pub fn abort_root_actor(
580        &self,
581        root: &ActorId,
582        this_handle: Option<&JoinHandle<()>>,
583    ) -> Option<impl Future<Output = ActorId>> {
584        self.state()
585            .ledger
586            .roots
587            .get(root)
588            .into_iter()
589            .flat_map(|e| e.upgrade())
590            .map(|cell| {
591                let r1 = root.clone();
592                let r2 = root.clone();
593                // If abort_root_actor was called from inside an actor task, we don't want to abort that actor's task yet.
594                let skip_abort = this_handle.is_some_and(|this_h| {
595                    cell.inner
596                        .actor_task_handle
597                        .get()
598                        .is_some_and(|other_h| std::ptr::eq(this_h, other_h))
599                });
600                // `start` was called on the actor's instance
601                // immediately following `root`'s insertion into the
602                // ledger. `Instance::start()` is infallible and should
603                // complete quickly, so calling `wait()` on `actor_task_handle`
604                // should be safe (i.e., not hang forever).
605                async move {
606                    tokio::task::spawn_blocking(move || {
607                        if !skip_abort {
608                            let h = cell.inner.actor_task_handle.wait();
609                            tracing::debug!("{}: aborting {:?}", r1, h);
610                            h.abort();
611                        }
612                    })
613                    .await
614                    .unwrap();
615                    r2
616                }
617            })
618            .next()
619    }
620
621    /// Signals to a root actor to stop,
622    /// returning a status observer if successful.
623    pub fn stop_actor(&self, actor_id: &ActorId) -> Option<watch::Receiver<ActorStatus>> {
624        if let Some(entry) = self.state().ledger.roots.get(actor_id) {
625            match entry.value().upgrade() {
626                None => None, // the root's cell has been dropped
627                Some(cell) => {
628                    tracing::info!("sending stop signal to {}", cell.actor_id());
629                    if let Err(err) = cell.signal(Signal::DrainAndStop) {
630                        tracing::error!(
631                            "{}: failed to send stop signal to pid {}: {:?}",
632                            self.proc_id(),
633                            cell.pid(),
634                            err
635                        );
636                        None
637                    } else {
638                        Some(cell.status().clone())
639                    }
640                }
641            }
642        } else {
643            tracing::error!("no actor {} found in {} roots", actor_id, self.proc_id());
644            None
645        }
646    }
647
    /// Stop the proc. Returns a pair of:
    /// - the actors observed to stop;
    /// - the actors not observed to stop before the timeout, which were
    ///   then aborted.
    ///
    /// If `cx` is specified, it means this method was called from inside an actor
    /// in which case we shouldn't wait for it to stop and need to delay aborting
    /// its task.
    ///
    /// The sequence is: signal every root in the ledger to stop, wait up to
    /// `timeout` for each (except the caller) to reach
    /// [`ActorStatus::Stopped`], abort those not observed to stop, and finally
    /// abort the caller's own task if `cx` was given.
    ///
    /// # Panics
    ///
    /// Panics if `cx` is given but its actor task handle is not yet available.
    #[hyperactor::instrument]
    pub async fn destroy_and_wait<A: Actor>(
        &mut self,
        timeout: Duration,
        cx: Option<&Context<'_, A>>,
    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
        tracing::debug!("{}: proc stopping", self.proc_id());

        // Identify the calling actor (if any) by its task handle and id.
        let (this_handle, this_actor_id) = cx.map_or((None, None), |cx| {
            (
                Some(cx.actor_task_handle().expect("cannot call destroy_and_wait from inside an actor unless actor has finished starting")),
                Some(cx.self_id())
            )
        });

        // Collect the root ids up front so the ledger map is not being
        // iterated while `stop_actor` looks entries up in it. Only roots that
        // accepted the stop signal get a status receiver recorded.
        let mut statuses = HashMap::new();
        for actor_id in self
            .state()
            .ledger
            .roots
            .iter()
            .map(|entry| entry.key().clone())
            .collect::<Vec<_>>()
        {
            if let Some(status) = self.stop_actor(&actor_id) {
                statuses.insert(actor_id, status);
            }
        }
        // NOTE(review): at this point stop signals have been sent, but no
        // actor has been awaited yet; the waiting happens below.
        tracing::debug!("{}: proc stopped", self.proc_id());

        // One future per signalled root, excluding the calling actor. Each
        // resolves to `Some(actor_id)` if the wait finished within `timeout`,
        // and `None` otherwise. Note that the timeout uses `RealClock` rather
        // than the proc's own clock.
        let waits: Vec<_> = statuses
            .iter_mut()
            .filter(|(actor_id, _)| Some(*actor_id) != this_actor_id)
            .map(|(actor_id, root)| {
                let actor_id = actor_id.clone();
                async move {
                    RealClock
                        .timeout(
                            timeout,
                            root.wait_for(|state: &ActorStatus| {
                                matches!(*state, ActorStatus::Stopped)
                            }),
                        )
                        .await
                        .ok()
                        .map(|_| actor_id)
                }
            })
            .collect();

        let results = futures::future::join_all(waits).await;
        let stopped_actors: Vec<_> = results
            .iter()
            .filter_map(|actor_id| actor_id.as_ref())
            .cloned()
            .collect();
        // Everything that was signalled but not observed to stop is aborted.
        // This includes the calling actor, which was excluded from the waits;
        // its abort is skipped inside `abort_root_actor` via `this_handle`
        // and performed at the end of this function instead.
        let aborted_actors: Vec<_> = statuses
            .iter()
            .filter(|(actor_id, _)| !stopped_actors.contains(actor_id))
            .map(|(actor_id, _)| {
                let f = self.abort_root_actor(actor_id, this_handle);
                async move {
                    let _ = if let Some(f) = f { Some(f.await) } else { None };
                    // If `is_none(&_)` then the proc's `ledger.roots`
                    // contains an entry that wasn't a root or, the
                    // associated actor's instance cell was already
                    // dropped when we went to call `abort()` on the
                    // cell's task handle.

                    actor_id.clone()
                }
            })
            .collect();
        let aborted_actors = futures::future::join_all(aborted_actors).await;

        // Abort the caller's own task last, once all other roots are handled.
        if let Some(this_handle) = this_handle
            && let Some(this_actor_id) = this_actor_id
        {
            tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
            this_handle.abort()
        };

        tracing::info!(
            "destroy_and_wait: {} actors stopped, {} actors aborted",
            stopped_actors.len(),
            aborted_actors.len()
        );
        Ok((stopped_actors, aborted_actors))
    }
744
745    /// Resolve an actor reference to an actor residing on this proc.
746    /// Returns None if the actor is not found on this proc.
747    ///
748    /// Bounds:
749    /// - `R: Actor` — must be a real actor that can live in this
750    ///   proc.
751    /// - `R: Referable` — required because the input is an
752    ///   `ActorRef<R>`.
753    pub fn resolve_actor_ref<R: Actor + Referable>(
754        &self,
755        actor_ref: &ActorRef<R>,
756    ) -> Option<ActorHandle<R>> {
757        self.inner
758            .instances
759            .get(actor_ref.actor_id())?
760            .upgrade()?
761            .downcast_handle()
762    }
763
764    /// Create a root allocation in the proc.
765    #[hyperactor::instrument(fields(actor_name=name))]
766    fn allocate_root_id(&self, name: &str) -> Result<ActorId, anyhow::Error> {
767        let name = name.to_string();
768        match self.state().roots.entry(name.to_string()) {
769            Entry::Vacant(entry) => {
770                entry.insert(AtomicUsize::new(1));
771            }
772            Entry::Occupied(_) => {
773                anyhow::bail!("an actor with name '{}' has already been spawned", name)
774            }
775        }
776        Ok(ActorId(self.state().proc_id.clone(), name.to_string(), 0))
777    }
778
779    /// Create a child allocation in the proc.
780    #[hyperactor::instrument(fields(actor_name=parent_id.name()))]
781    pub(crate) fn allocate_child_id(&self, parent_id: &ActorId) -> Result<ActorId, anyhow::Error> {
782        assert_eq!(*parent_id.proc_id(), self.state().proc_id);
783        let pid = match self.state().roots.get(parent_id.name()) {
784            None => anyhow::bail!(
785                "no actor named {} in proc {}",
786                parent_id.name(),
787                self.state().proc_id
788            ),
789            Some(next_pid) => next_pid.fetch_add(1, Ordering::Relaxed),
790        };
791        Ok(parent_id.child_id(pid))
792    }
793
794    fn downgrade(&self) -> WeakProc {
795        WeakProc::new(self)
796    }
797}
798
799#[async_trait]
800impl MailboxSender for Proc {
801    fn post_unchecked(
802        &self,
803        envelope: MessageEnvelope,
804        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
805    ) {
806        if envelope.dest().actor_id().proc_id() == &self.state().proc_id {
807            self.state().proc_muxer.post(envelope, return_handle)
808        } else {
809            self.state().forwarder.post(envelope, return_handle)
810        }
811    }
812}
813
/// A weak handle to a [`Proc`]. It does not keep the proc's state alive and
/// must be upgraded before use.
#[derive(Debug)]
struct WeakProc(Weak<ProcState>);
816
817impl WeakProc {
818    fn new(proc: &Proc) -> Self {
819        Self(Arc::downgrade(&proc.inner))
820    }
821
822    fn upgrade(&self) -> Option<Proc> {
823        self.0.upgrade().map(|inner| Proc { inner })
824    }
825}
826
827#[async_trait]
828impl MailboxSender for WeakProc {
829    fn post_unchecked(
830        &self,
831        envelope: MessageEnvelope,
832        return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
833    ) {
834        match self.upgrade() {
835            Some(proc) => proc.post(envelope, return_handle),
836            None => envelope.undeliverable(
837                DeliveryError::BrokenLink("fail to upgrade WeakProc".to_string()),
838                return_handle,
839            ),
840        }
841    }
842}
843
/// Represents a single work item used by the instance to dispatch to
/// actor handles. Specifically, this enables handler polymorphism.
///
/// The wrapped value is a boxed one-shot closure that borrows the actor and
/// its instance mutably and returns a boxed future resolving to the result
/// of handling the work item.
struct WorkCell<A: Actor + Send>(
    Box<
        dyn for<'a> FnOnce(
                &'a mut A,
                &'a mut Instance<A>,
            )
                -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
            + Send
            + Sync,
    >,
);
857
858impl<A: Actor + Send> WorkCell<A> {
859    /// Create a new WorkCell from a concrete function (closure).
860    fn new(
861        f: impl for<'a> FnOnce(
862            &'a mut A,
863            &'a mut Instance<A>,
864        )
865            -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
866        + Send
867        + Sync
868        + 'static,
869    ) -> Self {
870        Self(Box::new(f))
871    }
872
873    /// Handle the message represented by this work cell.
874    fn handle<'a>(
875        self,
876        actor: &'a mut A,
877        instance: &'a mut Instance<A>,
878    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
879        (self.0)(actor, instance)
880    }
881}
882
883/// Context for a message currently being handled by an Instance.
884pub struct Context<'a, A: Actor> {
885    instance: &'a Instance<A>,
886    headers: Attrs,
887}
888
889impl<'a, A: Actor> Context<'a, A> {
890    /// Construct a new Context.
891    pub fn new(instance: &'a Instance<A>, headers: Attrs) -> Self {
892        Self { instance, headers }
893    }
894
895    /// Get a reference to the message headers.
896    pub fn headers(&self) -> &Attrs {
897        &self.headers
898    }
899}
900
901impl<A: Actor> Deref for Context<'_, A> {
902    type Target = Instance<A>;
903
904    fn deref(&self) -> &Self::Target {
905        self.instance
906    }
907}
908
/// An actor instance. This is responsible for managing a running actor, including
/// its full lifecycle, supervision, signal management, etc. Instances can represent
/// a managed actor or a "client" actor that has joined the proc.
pub struct Instance<A: Actor> {
    /// The proc that owns this instance.
    proc: Proc,

    /// The instance cell that manages instance hierarchy.
    cell: InstanceCell,

    /// The mailbox associated with the actor.
    mailbox: Mailbox,

    /// The actor's handler ports. `port()` obtains typed port handles from
    /// here, and the same `Arc` is handed to the `ActorHandle`s built by
    /// `handle()` and `start()`.
    ports: Arc<Ports<A>>,

    /// A watch for communicating the actor's state.
    status_tx: watch::Sender<ActorStatus>,

    /// This instance's globally unique ID.
    id: Uuid,

    /// Used to assign sequence numbers for messages sent from this actor.
    sequencer: Sequencer,
}
933
934impl<A: Actor> Instance<A> {
935    /// Create a new actor instance in Created state.
936    fn new(
937        proc: Proc,
938        actor_id: ActorId,
939        detached: bool,
940        parent: Option<InstanceCell>,
941    ) -> (
942        Self,
943        Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
944        mpsc::UnboundedReceiver<WorkCell<A>>,
945    ) {
946        // Set up messaging
947        let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade()));
948        let (work_tx, work_rx) = ordered_channel(
949            actor_id.to_string(),
950            config::global::get(config::ENABLE_CLIENT_SEQ_ASSIGNMENT),
951        );
952        let ports: Arc<Ports<A>> = Arc::new(Ports::new(mailbox.clone(), work_tx));
953        proc.state().proc_muxer.bind_mailbox(mailbox.clone());
954        let (status_tx, status_rx) = watch::channel(ActorStatus::Created);
955
956        let actor_type = match TypeInfo::of::<A>() {
957            Some(info) => ActorType::Named(info),
958            None => ActorType::Anonymous(std::any::type_name::<A>()),
959        };
960        let actor_loop_ports = if detached {
961            None
962        } else {
963            let (signal_port, signal_receiver) = ports.open_message_port().unwrap();
964            let (supervision_port, supervision_receiver) = mailbox.open_port();
965            Some((
966                (signal_port, supervision_port),
967                (signal_receiver, supervision_receiver),
968            ))
969        };
970
971        let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip();
972
973        let cell = InstanceCell::new(
974            actor_id,
975            actor_type,
976            proc.clone(),
977            actor_loop,
978            status_rx,
979            parent,
980            ports.clone(),
981        );
982        let instance_id = Uuid::now_v7();
983        (
984            Self {
985                proc,
986                cell,
987                mailbox,
988                ports,
989                status_tx,
990                sequencer: Sequencer::new(instance_id),
991                id: instance_id,
992            },
993            actor_loop_receivers,
994            work_rx,
995        )
996    }
997
998    /// Notify subscribers of a change in the actors status and bump counters with the duration which
999    /// the last status was active for.
1000    fn change_status(&self, new: ActorStatus) {
1001        self.status_tx.send_replace(new.clone());
1002    }
1003
1004    /// This instance's actor ID.
1005    pub fn self_id(&self) -> &ActorId {
1006        self.mailbox.actor_id()
1007    }
1008
1009    /// Signal the actor to stop.
1010    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
1011    pub fn stop(&self) -> Result<(), ActorError> {
1012        tracing::info!("Instance::stop called, {}", self.cell.actor_id());
1013        self.cell.signal(Signal::DrainAndStop)
1014    }
1015
1016    /// Open a new port that accepts M-typed messages. The returned
1017    /// port may be freely cloned, serialized, and passed around. The
1018    /// returned receiver should only be retained by the actor responsible
1019    /// for processing the delivered messages.
1020    pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
1021        self.mailbox.open_port()
1022    }
1023
1024    /// Open a new one-shot port that accepts M-typed messages. The
1025    /// returned port may be used to send a single message; ditto the
1026    /// receiver may receive a single message.
1027    pub fn open_once_port<M: Message>(&self) -> (OncePortHandle<M>, OncePortReceiver<M>) {
1028        self.mailbox.open_once_port()
1029    }
1030
1031    /// Send a message to the actor running on the proc.
1032    pub fn post(&self, port_id: PortId, headers: Attrs, message: Serialized) {
1033        <Self as context::MailboxExt>::post(self, port_id, headers, message)
1034    }
1035
1036    /// Send a message to the actor itself with a delay usually to trigger some event.
1037    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
1038    pub fn self_message_with_delay<M>(&self, message: M, delay: Duration) -> Result<(), ActorError>
1039    where
1040        M: Message,
1041        A: Handler<M>,
1042    {
1043        let port = self.port();
1044        let self_id = self.self_id().clone();
1045        let clock = self.proc.state().clock.clone();
1046        tokio::spawn(async move {
1047            clock.non_advancing_sleep(delay).await;
1048            if let Err(e) = port.send(message) {
1049                // TODO: this is a fire-n-forget thread. We need to
1050                // handle errors in a better way.
1051                tracing::info!("{}: error sending delayed message: {}", self_id, e);
1052            }
1053        });
1054        Ok(())
1055    }
1056
1057    /// Start an A-typed actor onto this instance with the provided params. When spawn returns,
1058    /// the actor has been linked with its parent, if it has one.
1059    #[hyperactor::instrument(fields(actor_id=self.cell.actor_id().clone().to_string(), actor_name=self.cell.actor_id().name()))]
1060    async fn start(
1061        self,
1062        actor: A,
1063        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1064        work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1065    ) -> Result<ActorHandle<A>, anyhow::Error> {
1066        let instance_cell = self.cell.clone();
1067        let actor_id = self.cell.actor_id().clone();
1068        let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone());
1069        let actor_task_handle = A::spawn_server_task(panic_handler::with_backtrace_tracking(
1070            self.serve(actor, actor_loop_receivers, work_rx),
1071        ));
1072        tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle);
1073        instance_cell
1074            .inner
1075            .actor_task_handle
1076            .set(actor_task_handle)
1077            .unwrap_or_else(|_| panic!("{}: task handle store failed", actor_id));
1078
1079        Ok(actor_handle)
1080    }
1081
1082    async fn serve(
1083        mut self,
1084        mut actor: A,
1085        actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1086        mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
1087    ) {
1088        // `run_actor_tree` borrows `work_rx` instead of taking ownership because
1089        // `work_rx` needs to remain alive until this function returns. If the owning
1090        // proc's `supervision_coordinator_port` is a port on this instance, if `work_rx`
1091        // is dropped before `self.proc.handle_supervision_event` is called, the process
1092        // will exit due to a "channel closed" failure.
1093        let result = self
1094            .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
1095            .await;
1096
1097        let (actor_status, event) = match result {
1098            Ok(_) => (ActorStatus::Stopped, None),
1099            Err(ActorError {
1100                kind: ActorErrorKind::UnhandledSupervisionEvent(event),
1101                ..
1102            }) => (event.actor_status.clone(), Some(event)),
1103            Err(err) => (
1104                ActorStatus::Failed(err.to_string()),
1105                Some(ActorSupervisionEvent::new(
1106                    self.cell.actor_id().clone(),
1107                    ActorStatus::Failed(err.to_string()),
1108                    None,
1109                    None,
1110                )),
1111            ),
1112        };
1113
1114        if let Some(parent) = self.cell.maybe_unlink_parent() {
1115            if let Some(event) = event {
1116                // Parent exists, failure should be propagated to the parent.
1117                parent.send_supervision_event_or_crash(event);
1118            }
1119            // TODO: we should get rid of this signal, and use *only* supervision events for
1120            // the purpose of conveying lifecycle changes
1121            if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) {
1122                tracing::error!(
1123                    "{}: failed to send stop message to parent pid {}: {:?}",
1124                    self.self_id(),
1125                    parent.pid(),
1126                    err
1127                );
1128            }
1129        } else {
1130            // Failure happened to the root actor or orphaned child actors.
1131            // In either case, the failure should be propagated to proc.
1132            //
1133            // Note that orphaned actor is unexpected and would only happen if
1134            // there is a bug.
1135            if let Some(event) = event {
1136                self.proc.handle_supervision_event(event);
1137            }
1138        }
1139        self.change_status(actor_status);
1140    }
1141
1142    /// Runs the actor, and manages its supervision tree. When the function returns,
1143    /// the whole tree rooted at this actor has stopped.
1144    async fn run_actor_tree(
1145        &mut self,
1146        actor: &mut A,
1147        mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1148        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1149    ) -> Result<(), ActorError> {
1150        // It is okay to catch all panics here, because we are in a tokio task,
1151        // and tokio will catch the panic anyway:
1152        // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic
1153        // What we do here is just to catch it early so we can handle it.
1154
1155        let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers, work_rx))
1156            .catch_unwind()
1157            .await
1158        {
1159            Ok(result) => result,
1160            Err(err) => {
1161                // This is only the error message. Backtrace is not included.
1162                let err_msg = err
1163                    .downcast_ref::<&str>()
1164                    .copied()
1165                    .or_else(|| err.downcast_ref::<String>().map(|s| s.as_str()))
1166                    .unwrap_or("panic cannot be downcasted");
1167
1168                let backtrace = panic_handler::take_panic_backtrace()
1169                    .unwrap_or_else(|e| format!("Cannot take backtrace due to: {:?}", e));
1170                Err(ActorError::new(
1171                    self.self_id().clone(),
1172                    ActorErrorKind::Panic(anyhow::anyhow!("{}\n{}", err_msg, backtrace)),
1173                ))
1174            }
1175        };
1176
1177        if let Err(ref err) = result {
1178            tracing::error!("{}: actor failure: {}", self.self_id(), err);
1179        }
1180        self.change_status(ActorStatus::Stopping);
1181
1182        // After this point, we know we won't spawn any more children,
1183        // so we can safely read the current child keys.
1184        let mut to_unlink = Vec::new();
1185        for child in self.cell.child_iter() {
1186            if let Err(err) = child.value().signal(Signal::Stop) {
1187                tracing::error!(
1188                    "{}: failed to send stop signal to child pid {}: {:?}",
1189                    self.self_id(),
1190                    child.key(),
1191                    err
1192                );
1193                to_unlink.push(child.value().clone());
1194            }
1195        }
1196        // Manually unlink children that have already been stopped.
1197        for child in to_unlink {
1198            self.cell.unlink(&child);
1199        }
1200
1201        let (mut signal_receiver, _) = actor_loop_receivers;
1202        while self.cell.child_count() > 0 {
1203            if let Signal::ChildStopped(pid) = signal_receiver.recv().await? {
1204                assert!(self.cell.get_child(pid).is_none());
1205            }
1206        }
1207
1208        result
1209    }
1210
1211    /// Initialize and run the actor until it fails or is stopped.
1212    #[tracing::instrument(level = "info", skip_all, fields(actor_id = %self.self_id()))]
1213    async fn run(
1214        &mut self,
1215        actor: &mut A,
1216        actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
1217        work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
1218    ) -> Result<(), ActorError> {
1219        let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
1220
1221        self.change_status(ActorStatus::Initializing);
1222        actor
1223            .init(self)
1224            .await
1225            .map_err(|err| ActorError::new(self.self_id().clone(), ActorErrorKind::Init(err)))?;
1226        let need_drain;
1227        'messages: loop {
1228            self.change_status(ActorStatus::Idle);
1229            let metric_pairs =
1230                hyperactor_telemetry::kv_pairs!("actor_id" => self.self_id().to_string());
1231            tokio::select! {
1232                work = work_rx.recv() => {
1233                    ACTOR_MESSAGES_RECEIVED.add(1, metric_pairs);
1234                    ACTOR_MESSAGE_QUEUE_SIZE.add(-1, metric_pairs);
1235                    let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
1236                    let work = work.expect("inconsistent work queue state");
1237                    if let Err(err) = work.handle(actor, self).await {
1238                        for supervision_event in supervision_event_receiver.drain() {
1239                            self.handle_supervision_event(actor, supervision_event).await?;
1240                        }
1241                        return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err)));
1242                    }
1243                }
1244                signal = signal_receiver.recv() => {
1245                    let signal = signal.map_err(ActorError::from);
1246                    tracing::debug!("Received signal {signal:?}");
1247                    match signal? {
1248                        signal@(Signal::Stop | Signal::DrainAndStop) => {
1249                            need_drain = matches!(signal, Signal::DrainAndStop);
1250                            break 'messages;
1251                        },
1252                        Signal::ChildStopped(pid) => {
1253                            assert!(self.cell.get_child(pid).is_none());
1254                        },
1255                    }
1256                }
1257                Ok(supervision_event) = supervision_event_receiver.recv() => {
1258                    self.handle_supervision_event(actor, supervision_event).await?;
1259                }
1260            }
1261            self.cell
1262                .inner
1263                .num_processed_messages
1264                .fetch_add(1, Ordering::SeqCst);
1265        }
1266
1267        if need_drain {
1268            self.change_status(ActorStatus::Stopping);
1269            let mut n = 0;
1270            while let Ok(work) = work_rx.try_recv() {
1271                if let Err(err) = work.handle(actor, self).await {
1272                    return Err(ActorError::new(
1273                        self.self_id().clone(),
1274                        ActorErrorKind::Processing(err),
1275                    ));
1276                }
1277                n += 1;
1278            }
1279            tracing::debug!("drained {} messages", n);
1280        }
1281        tracing::debug!("exited actor loop: {}", self.self_id());
1282        self.change_status(ActorStatus::Stopped);
1283        Ok(())
1284    }
1285
1286    async fn handle_supervision_event(
1287        &self,
1288        actor: &mut A,
1289        supervision_event: ActorSupervisionEvent,
1290    ) -> Result<(), ActorError> {
1291        // Handle the supervision event with the current actor.
1292        match actor
1293            .handle_supervision_event(self, &supervision_event)
1294            .await
1295        {
1296            Ok(true) => {
1297                // The supervision event was handled by this actor, nothing more to do.
1298                Ok(())
1299            }
1300            Ok(false) => {
1301                // The supervision event wasn't handled by this actor, chain it and bubble it up.
1302                let supervision_event = ActorSupervisionEvent::new(
1303                    self.self_id().clone(),
1304                    ActorStatus::Failed("did not handle supervision event".to_string()),
1305                    None,
1306                    Some(Box::new(supervision_event)),
1307                );
1308                Err(supervision_event.into())
1309            }
1310            Err(err) => {
1311                // The actor failed to handle the supervision event, it should die.
1312                // Create a new supervision event for this failure and propagate it.
1313                let supervision_event = ActorSupervisionEvent::new(
1314                    self.self_id().clone(),
1315                    ActorStatus::Failed(format!("failed to handle supervision event: {}", err)),
1316                    None,
1317                    Some(Box::new(supervision_event)),
1318                );
1319                Err(supervision_event.into())
1320            }
1321        }
1322    }
1323
1324    async unsafe fn handle_message<M: Message>(
1325        &mut self,
1326        actor: &mut A,
1327        type_info: Option<&'static TypeInfo>,
1328        headers: Attrs,
1329        message: M,
1330    ) -> Result<(), anyhow::Error>
1331    where
1332        A: Handler<M>,
1333    {
1334        let handler = type_info.map(|info| {
1335            (
1336                info.typename().to_string(),
1337                // SAFETY: The caller promises to pass the correct type info.
1338                unsafe {
1339                    info.arm_unchecked(&message as *const M as *const ())
1340                        .map(str::to_string)
1341                },
1342            )
1343        });
1344
1345        self.change_status(ActorStatus::Processing(
1346            self.clock().system_time_now(),
1347            handler,
1348        ));
1349        crate::mailbox::headers::log_message_latency_if_sampling(
1350            &headers,
1351            self.self_id().to_string(),
1352        );
1353        let span = tracing::debug_span!(
1354            "actor_status",
1355            actor_id = self.self_id().to_string(),
1356            actor_name = self.self_id().name(),
1357            name = self.cell.status().borrow().to_string(),
1358        );
1359
1360        let context = Context::new(self, headers);
1361        // Pass a reference to the context to the handler, so that deref
1362        // coercion allows the `this` argument to be treated exactly like
1363        // &Instance<A>.
1364        actor.handle(&context, message).instrument(span).await
1365    }
1366
1367    // Spawn on child on this instance. Currently used only by cap::CanSpawn.
1368    pub(crate) async fn spawn<C: Actor>(
1369        &self,
1370        params: C::Params,
1371    ) -> anyhow::Result<ActorHandle<C>> {
1372        self.proc.spawn_child(self.cell.clone(), params).await
1373    }
1374
1375    /// Create a new direct child instance.
1376    pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> {
1377        self.proc.child_instance(self.cell.clone())
1378    }
1379
1380    /// Return a handle port handle representing the actor's message
1381    /// handler for M-typed messages.
1382    pub fn port<M: Message>(&self) -> PortHandle<M>
1383    where
1384        A: Handler<M>,
1385    {
1386        self.ports.get()
1387    }
1388
1389    /// The [`ActorHandle`] corresponding to this instance.
1390    pub fn handle(&self) -> ActorHandle<A> {
1391        ActorHandle::new(self.cell.clone(), Arc::clone(&self.ports))
1392    }
1393
1394    /// The owning actor ref.
1395    pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
1396        self.cell.bind(self.ports.as_ref())
1397    }
1398
1399    // Temporary in order to support python bindings.
1400    #[doc(hidden)]
1401    pub fn mailbox_for_py(&self) -> &Mailbox {
1402        &self.mailbox
1403    }
1404
1405    /// A reference to the proc's clock
1406    pub fn clock(&self) -> &(impl Clock + use<A>) {
1407        &self.proc.state().clock
1408    }
1409
1410    /// The owning proc.
1411    pub fn proc(&self) -> &Proc {
1412        &self.proc
1413    }
1414
1415    /// Clone this Instance to get an owned struct that can be
1416    /// plumbed through python. This should really only be called
1417    /// for the explicit purpose of being passed into python
1418    #[doc(hidden)]
1419    pub fn clone_for_py(&self) -> Self {
1420        Self {
1421            proc: self.proc.clone(),
1422            cell: self.cell.clone(),
1423            mailbox: self.mailbox.clone(),
1424            ports: self.ports.clone(),
1425            status_tx: self.status_tx.clone(),
1426            sequencer: self.sequencer.clone(),
1427            id: self.id,
1428        }
1429    }
1430
1431    /// Get the join handle associated with this actor.
1432    fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1433        self.cell.inner.actor_task_handle.get()
1434    }
1435
1436    /// Return this instance's sequencer.
1437    pub fn sequencer(&self) -> &Sequencer {
1438        &self.sequencer
1439    }
1440
1441    /// Return this instance's ID.
1442    pub fn instance_id(&self) -> Uuid {
1443        self.id
1444    }
1445}
1446
1447impl<A: Actor> Drop for Instance<A> {
1448    fn drop(&mut self) {
1449        self.status_tx.send_if_modified(|status| {
1450            if status.is_terminal() {
1451                false
1452            } else {
1453                *status = ActorStatus::Stopped;
1454                true
1455            }
1456        });
1457    }
1458}
1459
1460impl<A: Actor> context::Mailbox for Instance<A> {
1461    fn mailbox(&self) -> &Mailbox {
1462        &self.mailbox
1463    }
1464}
1465
1466impl<A: Actor> context::Mailbox for Context<'_, A> {
1467    fn mailbox(&self) -> &Mailbox {
1468        &self.mailbox
1469    }
1470}
1471
1472impl<A: Actor> context::Mailbox for &Instance<A> {
1473    fn mailbox(&self) -> &Mailbox {
1474        &self.mailbox
1475    }
1476}
1477
1478impl<A: Actor> context::Mailbox for &Context<'_, A> {
1479    fn mailbox(&self) -> &Mailbox {
1480        &self.mailbox
1481    }
1482}
1483
1484impl<A: Actor> context::Actor for Instance<A> {
1485    type A = A;
1486    fn instance(&self) -> &Instance<A> {
1487        self
1488    }
1489}
1490
1491impl<A: Actor> context::Actor for Context<'_, A> {
1492    type A = A;
1493    fn instance(&self) -> &Instance<A> {
1494        self
1495    }
1496}
1497
1498impl<A: Actor> context::Actor for &Instance<A> {
1499    type A = A;
1500    fn instance(&self) -> &Instance<A> {
1501        self
1502    }
1503}
1504
1505impl<A: Actor> context::Actor for &Context<'_, A> {
1506    type A = A;
1507    fn instance(&self) -> &Instance<A> {
1508        self
1509    }
1510}
1511
1512#[derive(Debug)]
1513enum ActorType {
1514    Named(&'static TypeInfo),
1515    Anonymous(&'static str),
1516}
1517
1518impl ActorType {
1519    /// The actor's type name.
1520    fn type_name(&self) -> &'static str {
1521        match self {
1522            Self::Named(info) => info.typename(),
1523            Self::Anonymous(name) => name,
1524        }
1525    }
1526}
1527
/// InstanceCell contains all of the type-erased, shareable state of an instance.
/// Specifically, InstanceCells form a supervision tree, and is used by ActorHandle
/// to access the underlying instance.
///
/// InstanceCell is reference counted and cloneable.
#[derive(Clone, Debug)]
pub struct InstanceCell {
    /// The shared state; cloning the cell clones this `Arc`.
    inner: Arc<InstanceState>,
}
1537
/// The state shared by all clones of an [`InstanceCell`].
#[derive(Debug)]
struct InstanceState {
    /// The actor's id.
    actor_id: ActorId,

    /// Actor info contains the actor's type information.
    actor_type: ActorType,

    /// The proc in which the actor is running.
    proc: Proc,

    /// Control port handles to the actor loop, if one is running.
    /// The first handle carries signals, the second supervision events.
    actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,

    /// An observer that stores the current status of the actor.
    status: watch::Receiver<ActorStatus>,

    /// A weak reference to this instance's parent.
    parent: WeakInstanceCell,

    /// This instance's children by their PIDs.
    children: DashMap<Index, InstanceCell>,

    /// Access to the spawned actor's join handle.
    actor_task_handle: OnceLock<JoinHandle<()>>,

    /// The set of named ports that are exported by this actor.
    exported_named_ports: DashMap<u64, &'static str>,

    /// The number of messages processed by this actor.
    num_processed_messages: AtomicU64,

    /// The log recording associated with this actor. It is used to
    /// store a 'flight record' of events while the actor is running.
    recording: Recording,

    /// A type-erased reference to Ports<A>, which allows us to recover
    /// an ActorHandle<A> by downcasting.
    ports: Arc<dyn Any + Send + Sync>,
}
1578
1579impl InstanceState {
1580    /// Unlink this instance from its parent, if it has one. If it was unlinked,
1581    /// the parent is returned.
1582    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1583        self.parent
1584            .upgrade()
1585            .filter(|parent| parent.inner.unlink(self))
1586    }
1587
1588    /// Unlink this instance from a child.
1589    fn unlink(&self, child: &InstanceState) -> bool {
1590        assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id());
1591        self.children.remove(&child.actor_id.pid()).is_some()
1592    }
1593}
1594
1595impl InstanceCell {
1596    /// Creates a new instance cell with the provided internal state. If a parent
1597    /// is provided, it is linked to this cell.
1598    fn new(
1599        actor_id: ActorId,
1600        actor_type: ActorType,
1601        proc: Proc,
1602        actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
1603        status: watch::Receiver<ActorStatus>,
1604        parent: Option<InstanceCell>,
1605        ports: Arc<dyn Any + Send + Sync>,
1606    ) -> Self {
1607        let _ais = actor_id.to_string();
1608        let cell = Self {
1609            inner: Arc::new(InstanceState {
1610                actor_id: actor_id.clone(),
1611                actor_type,
1612                proc: proc.clone(),
1613                actor_loop,
1614                status,
1615                parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()),
1616                children: DashMap::new(),
1617                actor_task_handle: OnceLock::new(),
1618                exported_named_ports: DashMap::new(),
1619                num_processed_messages: AtomicU64::new(0),
1620                recording: hyperactor_telemetry::recorder().record(64),
1621                ports,
1622            }),
1623        };
1624        cell.maybe_link_parent();
1625        proc.inner
1626            .instances
1627            .insert(actor_id.clone(), cell.downgrade());
1628        cell
1629    }
1630
1631    fn wrap(inner: Arc<InstanceState>) -> Self {
1632        Self { inner }
1633    }
1634
1635    /// The actor's ID.
1636    pub(crate) fn actor_id(&self) -> &ActorId {
1637        &self.inner.actor_id
1638    }
1639
1640    /// The actor's PID.
1641    pub(crate) fn pid(&self) -> Index {
1642        self.inner.actor_id.pid()
1643    }
1644
1645    /// The actor's join handle.
1646    #[allow(dead_code)]
1647    pub(crate) fn actor_task_handle(&self) -> Option<&JoinHandle<()>> {
1648        self.inner.actor_task_handle.get()
1649    }
1650
1651    /// The instance's status observer.
1652    pub(crate) fn status(&self) -> &watch::Receiver<ActorStatus> {
1653        &self.inner.status
1654    }
1655
1656    /// Send a signal to the actor.
1657    #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
1658    pub fn signal(&self, signal: Signal) -> Result<(), ActorError> {
1659        if let Some((signal_port, _)) = &self.inner.actor_loop {
1660            signal_port.send(signal).map_err(ActorError::from)
1661        } else {
1662            tracing::warn!(
1663                "{}: attempted to send signal {} to detached actor",
1664                self.inner.actor_id,
1665                signal
1666            );
1667            Ok(())
1668        }
1669    }
1670
1671    /// Used by this actor's children to send a supervision event to this actor.
1672    /// When it fails to send, we will crash the process. As part of the crash,
1673    /// all the procs and actors running on this process will be terminated
1674    /// forcefully.
1675    ///
1676    /// Note that "let it crash" is the default behavior when a supervision event
1677    /// cannot be delivered upstream. It is the upstream's responsibility to
1678    /// detect and handle crashes.
1679    pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
1680        match &self.inner.actor_loop {
1681            Some((_, supervision_port)) => {
1682                if let Err(err) = supervision_port.send(event) {
1683                    tracing::error!(
1684                        "{}: failed to send supervision event to actor: {:?}. Crash the process.",
1685                        self.actor_id(),
1686                        err
1687                    );
1688                    std::process::exit(1);
1689                }
1690            }
1691            None => {
1692                tracing::error!(
1693                    "{}: failed: {}: cannot send supervision event to detached actor: crashing",
1694                    self.actor_id(),
1695                    event,
1696                );
1697                std::process::exit(1);
1698            }
1699        }
1700    }
1701
1702    /// Downgrade this InstanceCell to a weak reference.
1703    pub fn downgrade(&self) -> WeakInstanceCell {
1704        WeakInstanceCell {
1705            inner: Arc::downgrade(&self.inner),
1706        }
1707    }
1708
1709    /// Link this instance to a new child.
1710    fn link(&self, child: InstanceCell) {
1711        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1712        self.inner.children.insert(child.pid(), child);
1713    }
1714
1715    /// Unlink this instance from a child.
1716    fn unlink(&self, child: &InstanceCell) {
1717        assert_eq!(self.actor_id().proc_id(), child.actor_id().proc_id());
1718        self.inner.children.remove(&child.pid());
1719    }
1720
1721    /// Link this instance to its parent, if it has one.
1722    fn maybe_link_parent(&self) {
1723        if let Some(parent) = self.inner.parent.upgrade() {
1724            parent.link(self.clone());
1725        }
1726    }
1727
1728    /// Unlink this instance from its parent, if it has one. If it was unlinked,
1729    /// the parent is returned.
1730    fn maybe_unlink_parent(&self) -> Option<InstanceCell> {
1731        self.inner.maybe_unlink_parent()
1732    }
1733
1734    /// Get parent instance cell, if it exists.
1735    #[allow(dead_code)]
1736    fn get_parent_cell(&self) -> Option<InstanceCell> {
1737        self.inner.parent.upgrade()
1738    }
1739
1740    /// Return an iterator over this instance's children. This may deadlock if the
1741    /// caller already holds a reference to any item in map.
1742    fn child_iter(&self) -> impl Iterator<Item = RefMulti<'_, Index, InstanceCell>> {
1743        self.inner.children.iter()
1744    }
1745
1746    /// The number of children this instance has.
1747    fn child_count(&self) -> usize {
1748        self.inner.children.len()
1749    }
1750
1751    /// Get a child by its PID.
1752    fn get_child(&self, pid: Index) -> Option<InstanceCell> {
1753        self.inner.children.get(&pid).map(|child| child.clone())
1754    }
1755
    /// This is temporary so that we can share binding code between handle and instance.
    /// We should find some (better) way to consolidate the two.
    ///
    /// Binds the handler ports selected by `R` on `ports`, additionally binds
    /// the `Signal` and `Undeliverable<MessageEnvelope>` ports, copies every
    /// entry of `ports.bound` into this cell's `exported_named_ports`, and
    /// returns an attested [`ActorRef`] for this cell's actor id.
    pub(crate) fn bind<A: Actor, R: Binds<A>>(&self, ports: &Ports<A>) -> ActorRef<R> {
        <R as Binds<A>>::bind(ports);
        // All actors handle signals and undeliverable messages.
        ports.bind::<Signal>();
        ports.bind::<Undeliverable<MessageEnvelope>>();
        // TODO: consider sharing `ports.bound` directly.
        // Mirror each (port index, message type name) pair recorded by the
        // binds above into the cell's exported port table.
        for entry in ports.bound.iter() {
            self.inner
                .exported_named_ports
                .insert(*entry.key(), entry.value());
        }
        ActorRef::attest(self.actor_id().clone())
    }
1771
1772    /// Attempt to downcast this cell to a concrete actor handle.
1773    pub(crate) fn downcast_handle<A: Actor>(&self) -> Option<ActorHandle<A>> {
1774        let ports = Arc::clone(&self.inner.ports).downcast::<Ports<A>>().ok()?;
1775        Some(ActorHandle::new(self.clone(), ports))
1776    }
1777}
1778
1779impl Drop for InstanceState {
1780    fn drop(&mut self) {
1781        if let Some(parent) = self.maybe_unlink_parent() {
1782            tracing::debug!(
1783                "instance {} was dropped with parent {} still linked",
1784                self.actor_id,
1785                parent.actor_id()
1786            );
1787        }
1788        if self.proc.inner.instances.remove(&self.actor_id).is_none() {
1789            tracing::error!("instance {} was dropped but not in proc", self.actor_id);
1790        }
1791    }
1792}
1793
/// A weak version of the InstanceCell. This is used to provide cyclical
/// linkage between actors without creating a strong reference cycle.
#[derive(Debug, Clone)]
pub struct WeakInstanceCell {
    // Weak pointer to the shared instance state; it does not keep the
    // instance alive.
    inner: Weak<InstanceState>,
}
1800
1801impl Default for WeakInstanceCell {
1802    fn default() -> Self {
1803        Self::new()
1804    }
1805}
1806
1807impl WeakInstanceCell {
1808    /// Create a new weak instance cell that is never upgradeable.
1809    pub fn new() -> Self {
1810        Self { inner: Weak::new() }
1811    }
1812
1813    /// Upgrade this weak instance cell to a strong reference, if possible.
1814    pub fn upgrade(&self) -> Option<InstanceCell> {
1815        self.inner.upgrade().map(InstanceCell::wrap)
1816    }
1817}
1818
/// A polymorphic dictionary that stores ports for an actor's handlers.
/// The interface memoizes the ports so that they are reused. We do not
/// (yet) support stable identifiers across multiple instances of the same
/// actor.
pub struct Ports<A: Actor> {
    // Memoized ports keyed by the `TypeId` of the message type; each value is
    // a boxed `PortHandle<M>` for that message type.
    ports: DashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
    // Port indices that have been bound, mapped to the type name of the
    // message bound at that index.
    bound: DashMap<u64, &'static str>,
    // Mailbox on which the ports are opened.
    mailbox: Mailbox,
    // Queue that receives the work cells produced by handler ports.
    workq: OrderedSender<WorkCell<A>>,
}
1829
/// A message's sequence number information.
///
/// Rendered by `Display` as `<session_id>:<seq>`, and parsed back from that
/// form by `FromStr`.
#[derive(Serialize, Deserialize, Clone, Named, AttrValue)]
pub struct SeqInfo {
    /// Message's session ID
    pub session_id: Uuid,
    /// Message's sequence number in the given session.
    pub seq: u64,
}
1838
1839impl fmt::Display for SeqInfo {
1840    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1841        write!(f, "{}:{}", self.session_id, self.seq)
1842    }
1843}
1844
1845impl std::str::FromStr for SeqInfo {
1846    type Err = anyhow::Error;
1847
1848    fn from_str(s: &str) -> Result<Self, Self::Err> {
1849        let parts: Vec<_> = s.split(':').collect();
1850        if parts.len() != 2 {
1851            return Err(anyhow::anyhow!("invalid SeqInfo: {}", s));
1852        }
1853        let session_id: Uuid = parts[0].parse()?;
1854        let seq: u64 = parts[1].parse()?;
1855        Ok(SeqInfo { session_id, seq })
1856    }
1857}
1858
// NOTE(review): the attribute doc below mentions "the sender of this message",
// but `SeqInfo` as declared in this file only carries `session_id` and `seq`.
// Confirm whether the wording is stale.
declare_attrs! {
    /// The sender of this message, the session ID, and the message's sequence
    /// number assigned by this session.
    pub attr SEQ_INFO: SeqInfo;
}
1864
impl<A: Actor> Ports<A> {
    /// Create an empty port dictionary that opens its ports on `mailbox` and
    /// enqueues handler work onto `workq`.
    fn new(mailbox: Mailbox, workq: OrderedSender<WorkCell<A>>) -> Self {
        Self {
            ports: DashMap::new(),
            bound: DashMap::new(),
            mailbox,
            workq,
        }
    }

    /// Get a port for the Handler<M> of actor A.
    ///
    /// The port is created on first use and memoized by the `TypeId` of `M`;
    /// later calls return a clone of the stored handle.
    ///
    /// # Panics
    ///
    /// Panics if `M` is `Signal` and no port for it has been stored yet, or
    /// if the stored entry for `M` is not a `PortHandle<M>`.
    pub(crate) fn get<M: Message>(&self) -> PortHandle<M>
    where
        A: Handler<M>,
    {
        let key = TypeId::of::<M>();
        match self.ports.entry(key) {
            // First request for this message type: build the enqueue port.
            Entry::Vacant(entry) => {
                // Some special case hackery, but it keeps the rest of the code (relatively) simple.
                assert_ne!(
                    key,
                    TypeId::of::<Signal>(),
                    "cannot provision Signal port through `Ports::get`"
                );

                let type_info = TypeInfo::get_by_typeid(key);
                let workq = self.workq.clone();
                // Stringified once here; cloned per message as the metric label.
                let actor_id = self.mailbox.actor_id().to_string();
                let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
                    // Read the sequencing header before `headers` is moved
                    // into the work cell below.
                    let seq_info = headers.get(SEQ_INFO).cloned();

                    // The work cell defers the actual handler invocation until
                    // it is run with the actor and its instance.
                    let work = WorkCell::new(move |actor: &mut A, instance: &mut Instance<A>| {
                        Box::pin(async move {
                            // SAFETY: we guarantee that the passed type_info is for type M.
                            unsafe {
                                instance
                                    .handle_message(actor, type_info, headers, msg)
                                    .await
                            }
                        })
                    });
                    // The queue-size metric is bumped before the send is
                    // attempted, regardless of whether the send succeeds.
                    ACTOR_MESSAGE_QUEUE_SIZE.add(
                        1,
                        hyperactor_telemetry::kv_pairs!("actor_id" => actor_id.clone()),
                    );
                    if workq.enable_buffering {
                        // Buffered sends are keyed by (session, seq), so the
                        // header is mandatory on this path.
                        let SeqInfo { session_id, seq } =
                            seq_info.expect("SEQ_INFO must be set when buffering is enabled");

                        // TODO: return the message contained in the error instead of dropping them when converting
                        // to anyhow::Error. In that way, the message can be picked up by mailbox and returned to sender.
                        workq.send(session_id, seq, work).map_err(|e| match e {
                            OrderedSenderError::InvalidZeroSeq(_) => {
                                anyhow::anyhow!("seq must be greater than 0")
                            }
                            OrderedSenderError::SendError(e) => anyhow::Error::from(e),
                            OrderedSenderError::FlushError(e) => e,
                        })
                    } else {
                        // Without buffering, the work goes straight to the queue.
                        workq.direct_send(work).map_err(anyhow::Error::from)
                    }
                });
                // Memoize a clone so later calls reuse the same port.
                entry.insert(Box::new(port.clone()));
                port
            }
            // Already provisioned: recover the typed handle from the boxed entry.
            Entry::Occupied(entry) => {
                let port = entry.get();
                port.downcast_ref::<PortHandle<M>>().unwrap().clone()
            }
        }
    }

    /// Open a (typed) message port as in [`get`], but return a port receiver instead of dispatching
    /// the underlying handler.
    ///
    /// Returns `None` if a port for `M` has already been stored in this
    /// dictionary.
    pub(crate) fn open_message_port<M: Message>(&self) -> Option<(PortHandle<M>, PortReceiver<M>)> {
        match self.ports.entry(TypeId::of::<M>()) {
            Entry::Vacant(entry) => {
                let (port, receiver) = self.mailbox.open_port();
                // Memoize the handle so a later `get::<M>()` returns this port.
                entry.insert(Box::new(port.clone()));
                Some((port, receiver))
            }
            Entry::Occupied(_) => None,
        }
    }

    /// Bind the given message type to its default port.
    ///
    /// The default port index is the one reported by `M::port()`.
    pub fn bind<M: RemoteMessage>(&self)
    where
        A: Handler<M>,
    {
        self.bind_to::<M>(M::port());
    }

    /// Bind the given message type to the provided port.
    /// Ports cannot be rebound to different message types;
    /// and attempting to do so will result in a panic.
    ///
    /// Binding the same message type to the same port index again passes the
    /// type-name check and performs no further binding.
    pub fn bind_to<M: RemoteMessage>(&self, port_index: u64)
    where
        A: Handler<M>,
    {
        match self.bound.entry(port_index) {
            Entry::Vacant(entry) => {
                self.get::<M>().bind_to(port_index);
                // Remember which message type owns this index.
                entry.insert(M::typename());
            }
            Entry::Occupied(entry) => {
                assert_eq!(
                    *entry.get(),
                    M::typename(),
                    "bind {}: port index {} already bound to type {}",
                    M::typename(),
                    port_index,
                    entry.get(),
                );
            }
        }
    }
}
1983
1984#[cfg(test)]
1985mod tests {
1986    use std::assert_matches::assert_matches;
1987    use std::sync::atomic::AtomicBool;
1988
1989    use hyperactor_macros::export;
1990    use maplit::hashmap;
1991    use serde_json::json;
1992    use tokio::sync::Barrier;
1993    use tokio::sync::oneshot;
1994    use tracing::Level;
1995    use tracing_subscriber::layer::SubscriberExt;
1996    use tracing_test::internal::logs_with_scope_contain;
1997
1998    use super::*;
1999    // needed for in-crate macro expansion
2000    use crate as hyperactor;
2001    use crate::HandleClient;
2002    use crate::Handler;
2003    use crate::OncePortRef;
2004    use crate::PortRef;
2005    use crate::clock::RealClock;
2006    use crate::test_utils::proc_supervison::ProcSupervisionCoordinator;
2007    use crate::test_utils::process_assertion::assert_termination;
2008
2009    impl ActorTreeSnapshot {
2010        #[allow(dead_code)]
2011        fn empty(pid: Index) -> Self {
2012            Self {
2013                pid,
2014                type_name: String::new(),
2015                status: ActorStatus::Idle,
2016                stats: ActorStats::default(),
2017                handlers: HashMap::new(),
2018                children: HashMap::new(),
2019                events: Vec::new(),
2020                spans: Vec::new(),
2021            }
2022        }
2023
2024        fn empty_typed(pid: Index, type_name: String) -> Self {
2025            Self {
2026                pid,
2027                type_name,
2028                status: ActorStatus::Idle,
2029                stats: ActorStats::default(),
2030                handlers: HashMap::new(),
2031                children: HashMap::new(),
2032                events: Vec::new(),
2033                spans: Vec::new(),
2034            }
2035        }
2036    }
2037
    // Stateless actor used throughout these tests; it handles
    // `TestActorMessage`.
    #[derive(Debug, Default, Actor)]
    #[export]
    struct TestActor;
2041
    // Messages understood by `TestActor`. The per-variant notes describe what
    // the handler implementation in this module does with each one.
    #[derive(Handler, HandleClient, Debug)]
    enum TestActorMessage {
        // Sends `()` on the given channel.
        Reply(oneshot::Sender<()>),
        // Sends `()` on the sender, then waits for the receiver to complete.
        Wait(oneshot::Sender<()>, oneshot::Receiver<()>),
        // Sends the boxed message on to the given actor handle.
        Forward(ActorHandle<TestActor>, Box<TestActorMessage>),
        // Does nothing and succeeds.
        Noop(),
        // Returns the given error from the handler.
        Fail(anyhow::Error),
        // Panics with the given message.
        Panic(String),
        // Spawns a child `TestActor` and sends its handle back.
        Spawn(oneshot::Sender<ActorHandle<TestActor>>),
    }
2052
2053    impl TestActor {
2054        async fn spawn_child(parent: &ActorHandle<TestActor>) -> ActorHandle<TestActor> {
2055            let (tx, rx) = oneshot::channel();
2056            parent.send(TestActorMessage::Spawn(tx)).unwrap();
2057            rx.await.unwrap()
2058        }
2059    }
2060
    // Handler implementation for `TestActorMessage`; one method per variant.
    #[async_trait]
    #[crate::forward(TestActorMessage)]
    impl TestActorMessageHandler for TestActor {
        // Acknowledge by sending `()` back on `sender`.
        async fn reply(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            Ok(())
        }

        // Signal entry on `sender`, then stay inside the handler until
        // `receiver` completes.
        async fn wait(
            &mut self,
            _cx: &crate::Context<Self>,
            sender: oneshot::Sender<()>,
            receiver: oneshot::Receiver<()>,
        ) -> Result<(), anyhow::Error> {
            sender.send(()).unwrap();
            receiver.await.unwrap();
            Ok(())
        }

        // Pass `message` on to `destination`; a send failure is returned as
        // the handler's error.
        async fn forward(
            &mut self,
            _cx: &crate::Context<Self>,
            destination: ActorHandle<TestActor>,
            message: Box<TestActorMessage>,
        ) -> Result<(), anyhow::Error> {
            // TODO: this needn't be async
            destination.send(*message)?;
            Ok(())
        }

        // Succeed without doing anything.
        async fn noop(&mut self, _cx: &crate::Context<Self>) -> Result<(), anyhow::Error> {
            Ok(())
        }

        // Fail the handler with the supplied error.
        async fn fail(
            &mut self,
            _cx: &crate::Context<Self>,
            err: anyhow::Error,
        ) -> Result<(), anyhow::Error> {
            Err(err)
        }

        // Panic inside the handler with the supplied message.
        async fn panic(
            &mut self,
            _cx: &crate::Context<Self>,
            err_msg: String,
        ) -> Result<(), anyhow::Error> {
            panic!("{}", err_msg);
        }

        // Spawn a `TestActor` using this handler's context and send its
        // handle back on `reply`.
        async fn spawn(
            &mut self,
            cx: &crate::Context<Self>,
            reply: oneshot::Sender<ActorHandle<TestActor>>,
        ) -> Result<(), anyhow::Error> {
            let handle = <Self as Actor>::spawn(cx, ()).await?;
            reply.send(handle).unwrap();
            Ok(())
        }
    }
2125
    // Spawns one actor and follows its status through idle, processing, idle
    // again, and finally stopped.
    #[tracing_test::traced_test]
    #[tokio::test]
    async fn test_spawn_actor() {
        let proc = Proc::local();
        let handle = proc.spawn::<TestActor>("test", ()).await.unwrap();

        // Check on the join handle.
        assert!(logs_contain(
            format!(
                "{}: spawned with {:?}",
                handle.actor_id(),
                handle.cell().actor_task_handle().unwrap(),
            )
            .as_str()
        ));

        // Own copy of the status receiver, so it stays usable after `handle`
        // is consumed by the final await.
        let mut state = handle.status().clone();

        // Send a ping-pong to the actor. Wait for the actor to become idle.

        let (tx, rx) = oneshot::channel::<()>();
        handle.send(TestActorMessage::Reply(tx)).unwrap();
        rx.await.unwrap();

        state
            .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        // Make sure we enter processing state while the actor is handling a message.
        let (enter_tx, enter_rx) = oneshot::channel::<()>();
        let (exit_tx, exit_rx) = oneshot::channel::<()>();

        handle
            .send(TestActorMessage::Wait(enter_tx, exit_rx))
            .unwrap();
        // The handler has started once `enter_rx` resolves, and it cannot
        // finish until `exit_tx` fires.
        enter_rx.await.unwrap();
        assert_matches!(*state.borrow(), ActorStatus::Processing(instant, _) if instant <= RealClock.system_time_now());
        exit_tx.send(()).unwrap();

        state
            .wait_for(|state| matches!(*state, ActorStatus::Idle))
            .await
            .unwrap();

        handle.drain_and_stop().unwrap();
        handle.await;
        assert_matches!(*state.borrow(), ActorStatus::Stopped);
    }
2175
2176    #[tokio::test]
2177    async fn test_proc_actors_messaging() {
2178        let proc = Proc::local();
2179        let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
2180        let second = proc.spawn::<TestActor>("second", ()).await.unwrap();
2181        let (tx, rx) = oneshot::channel::<()>();
2182        let reply_message = TestActorMessage::Reply(tx);
2183        first
2184            .send(TestActorMessage::Forward(second, Box::new(reply_message)))
2185            .unwrap();
2186        rx.await.unwrap();
2187    }
2188
    // Actor used by `test_actor_lookup` to resolve actor refs from inside a
    // handler context.
    #[derive(Debug, Default, Actor)]
    struct LookupTestActor;
2191
    // Messages understood by `LookupTestActor`.
    #[derive(Handler, HandleClient, Debug)]
    enum LookupTestMessage {
        // Replies with whether the given actor ref could be downcast to a
        // local `TestActor` handle.
        ActorExists(ActorRef<TestActor>, #[reply] OncePortRef<bool>),
    }
2196
2197    #[async_trait]
2198    #[crate::forward(LookupTestMessage)]
2199    impl LookupTestMessageHandler for LookupTestActor {
2200        async fn actor_exists(
2201            &mut self,
2202            cx: &crate::Context<Self>,
2203            actor_ref: ActorRef<TestActor>,
2204        ) -> Result<bool, anyhow::Error> {
2205            Ok(actor_ref.downcast_handle(cx).is_some())
2206        }
2207    }
2208
    // Checks actor-ref lookup: it succeeds for a live actor of the right
    // type, and fails for an unknown child id, a wrongly-typed ref, and a
    // stopped actor.
    #[tokio::test]
    async fn test_actor_lookup() {
        let proc = Proc::local();
        let (client, _handle) = proc.instance("client").unwrap();

        let target_actor = proc.spawn::<TestActor>("target", ()).await.unwrap();
        let target_actor_ref = target_actor.bind();
        let lookup_actor = proc.spawn::<LookupTestActor>("lookup", ()).await.unwrap();

        // A live, correctly-typed actor resolves.
        assert!(
            lookup_actor
                .actor_exists(&client, target_actor_ref.clone())
                .await
                .unwrap()
        );

        // Make up a child actor. It shouldn't exist.
        assert!(
            !lookup_actor
                .actor_exists(
                    &client,
                    ActorRef::attest(target_actor.actor_id().child_id(123).clone())
                )
                .await
                .unwrap()
        );
        // A wrongly-typed actor ref should not resolve either.
        assert!(
            !lookup_actor
                .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone()))
                .await
                .unwrap()
        );

        target_actor.drain_and_stop().unwrap();
        target_actor.await;

        // Once the target has stopped, its ref no longer resolves.
        assert!(
            !lookup_actor
                .actor_exists(&client, target_actor_ref)
                .await
                .unwrap()
        );

        lookup_actor.drain_and_stop().unwrap();
        lookup_actor.await;
    }
2256
2257    fn validate_link(child: &InstanceCell, parent: &InstanceCell) {
2258        assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id());
2259        assert_eq!(
2260            child.inner.parent.upgrade().unwrap().actor_id(),
2261            parent.actor_id()
2262        );
2263        assert_matches!(
2264            parent.inner.children.get(&child.pid()),
2265            Some(node) if node.actor_id() == child.actor_id()
2266        );
2267    }
2268
    // Builds a three-level chain (first -> second -> third) and checks the
    // spawn logs, id allocation, parent/child links, and that links are
    // removed as actors stop.
    #[tracing_test::traced_test]
    #[tokio::test]
    async fn test_spawn_child() {
        let proc = Proc::local();

        let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
        let second = TestActor::spawn_child(&first).await;
        let third = TestActor::spawn_child(&second).await;

        // Check we've got the join handles.
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                first.actor_id(),
                first.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                second.actor_id(),
                second.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));
        assert!(logs_with_scope_contain(
            "hyperactor::proc",
            format!(
                "{}: spawned with {:?}",
                third.actor_id(),
                third.cell().actor_task_handle().unwrap()
            )
            .as_str()
        ));

        // These are allocated in sequence:
        assert_eq!(first.actor_id().proc_id(), proc.proc_id());
        assert_eq!(second.actor_id(), &first.actor_id().child_id(1));
        assert_eq!(third.actor_id(), &first.actor_id().child_id(2));

        // Supervision tree is constructed correctly.
        validate_link(third.cell(), second.cell());
        validate_link(second.cell(), first.cell());
        assert!(first.cell().inner.parent.upgrade().is_none());

        // Supervision tree is torn down correctly.
        third.drain_and_stop().unwrap();
        third.await;
        // Stopping the leaf removes it from its parent and leaves the rest
        // of the chain linked.
        assert!(second.cell().inner.children.is_empty());
        validate_link(second.cell(), first.cell());

        second.drain_and_stop().unwrap();
        second.await;
        assert!(first.cell().inner.children.is_empty());
    }
2327
    // Stopping a root actor must also stop all of its descendants.
    #[tokio::test]
    async fn test_child_lifecycle() {
        let proc = Proc::local();

        // Tree under test: root -> {root_1, root_2 -> root_2_1}.
        let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
        let root_1 = TestActor::spawn_child(&root).await;
        let root_2 = TestActor::spawn_child(&root).await;
        let root_2_1 = TestActor::spawn_child(&root_2).await;

        root.drain_and_stop().unwrap();
        root.await;

        // After the root has finished, every descendant rejects new messages
        // and reports `Stopped`.
        for actor in [root_1, root_2, root_2_1] {
            assert!(actor.send(TestActorMessage::Noop()).is_err());
            assert_matches!(actor.await, ActorStatus::Stopped);
        }
    }
2345
    // A failing child (root_2) takes down its parent, which does not handle
    // the supervision event; the remaining actors in the tree end up stopped.
    #[tokio::test]
    async fn test_parent_failure() {
        let proc = Proc::local();
        // Need to set a supervision coordinator for this Proc because there will
        // be actor failure(s) in this test which trigger supervision.
        ProcSupervisionCoordinator::set(&proc).await.unwrap();

        // Tree under test: root -> {root_1, root_2 -> root_2_1}.
        let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
        let root_1 = TestActor::spawn_child(&root).await;
        let root_2 = TestActor::spawn_child(&root).await;
        let root_2_1 = TestActor::spawn_child(&root_2).await;

        root_2
            .send(TestActorMessage::Fail(anyhow::anyhow!(
                "some random failure"
            )))
            .unwrap();
        // Capture the id first: awaiting `root_2` consumes the handle.
        let root_2_actor_id = root_2.actor_id().clone();
        assert_matches!(
            root_2.await,
            ActorStatus::Failed(err) if err == format!("serving {}: processing error: some random failure", root_2_actor_id)
        );

        // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
        // stopped by a parent failure?
        assert_eq!(
            root.await,
            ActorStatus::Failed("did not handle supervision event".to_string())
        );
        assert_eq!(root_2_1.await, ActorStatus::Stopped);
        assert_eq!(root_1.await, ActorStatus::Stopped);
    }
2378
2379    #[tokio::test]
2380    async fn test_actor_ledger() {
2381        async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
2382            actor_handle
2383                .status()
2384                .wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Idle))
2385                .await
2386                .unwrap();
2387        }
2388
2389        let proc = Proc::local();
2390
2391        // Add the 1st root. This root will remain active until the end of the test.
2392        let root: ActorHandle<TestActor> = proc.spawn::<TestActor>("root", ()).await.unwrap();
2393        wait_until_idle(&root).await;
2394        {
2395            let snapshot = proc.state().ledger.snapshot();
2396            assert_eq!(
2397                snapshot.roots,
2398                hashmap! {
2399                    root.actor_id().clone() =>
2400                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2401                },
2402            );
2403        }
2404
2405        // Add the 2nd root.
2406        let another_root: ActorHandle<TestActor> =
2407            proc.spawn::<TestActor>("another_root", ()).await.unwrap();
2408        wait_until_idle(&another_root).await;
2409        {
2410            let snapshot = proc.state().ledger.snapshot();
2411            assert_eq!(
2412                snapshot.roots,
2413                hashmap! {
2414                    root.actor_id().clone() =>
2415                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2416                    another_root.actor_id().clone() =>
2417                        ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string()),
2418                },
2419            );
2420        }
2421
2422        // Stop the 2nd root. It should be excluded from the snapshot after it
2423        // is stopped.
2424        another_root.drain_and_stop().unwrap();
2425        another_root.await;
2426        {
2427            let snapshot = proc.state().ledger.snapshot();
2428            assert_eq!(
2429                snapshot.roots,
2430                hashmap! { root.actor_id().clone() =>
2431                    ActorTreeSnapshot::empty_typed(0, "hyperactor::proc::tests::TestActor".to_string())
2432                },
2433            );
2434        }
2435
2436        // Incrementally add the following children tree to root. This tree
2437        // should be captured by snapshot.
2438        //     root -> root_1 -> root_1_1
2439        //         |-> root_2
2440
2441        let root_1 = TestActor::spawn_child(&root).await;
2442        wait_until_idle(&root_1).await;
2443        {
2444            let snapshot = proc.state().ledger.snapshot();
2445            assert_eq!(
2446                snapshot.roots,
2447                hashmap! {
2448                    root.actor_id().clone() =>  ActorTreeSnapshot {
2449                        pid: 0,
2450                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2451                        status: ActorStatus::Idle,
2452                        stats: ActorStats { num_processed_messages: 1 },
2453                        handlers: HashMap::new(),
2454                        children: hashmap! {
2455                            root_1.actor_id().pid() =>
2456                                ActorTreeSnapshot::empty_typed(
2457                                    root_1.actor_id().pid(),
2458                                    "hyperactor::proc::tests::TestActor".to_string()
2459                                )
2460                        },
2461                        events: Vec::new(),
2462                        spans: Vec::new(),
2463                    }
2464                },
2465            );
2466        }
2467
2468        let root_1_1 = TestActor::spawn_child(&root_1).await;
2469        wait_until_idle(&root_1_1).await;
2470        {
2471            let snapshot = proc.state().ledger.snapshot();
2472            assert_eq!(
2473                snapshot.roots,
2474                hashmap! {
2475                    root.actor_id().clone() =>  ActorTreeSnapshot {
2476                        pid: 0,
2477                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2478                        status: ActorStatus::Idle,
2479                        stats: ActorStats { num_processed_messages: 1 },
2480                        handlers: HashMap::new(),
2481                        children: hashmap!{
2482                            root_1.actor_id().pid() =>
2483                                ActorTreeSnapshot {
2484                                    pid: root_1.actor_id().pid(),
2485                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2486                                    status: ActorStatus::Idle,
2487                                    stats: ActorStats { num_processed_messages: 1 },
2488                                    handlers: HashMap::new(),
2489                                    children: hashmap!{
2490                                        root_1_1.actor_id().pid() =>
2491                                            ActorTreeSnapshot::empty_typed(
2492                                                root_1_1.actor_id().pid(),
2493                                                "hyperactor::proc::tests::TestActor".to_string()
2494                                            )
2495                                    },
2496                                    events: Vec::new(),
2497                                    spans: Vec::new(),
2498                                }
2499                        },
2500                        events: Vec::new(),
2501                        spans: Vec::new(),
2502                    },
2503                }
2504            );
2505        }
2506
2507        let root_2 = TestActor::spawn_child(&root).await;
2508        wait_until_idle(&root_2).await;
2509        {
2510            let snapshot = proc.state().ledger.snapshot();
2511            assert_eq!(
2512                snapshot.roots,
2513                hashmap! {
2514                    root.actor_id().clone() =>  ActorTreeSnapshot {
2515                        pid: 0,
2516                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2517                        status: ActorStatus::Idle,
2518                        stats: ActorStats { num_processed_messages: 2 },
2519                        handlers: HashMap::new(),
2520                        children: hashmap!{
2521                            root_2.actor_id().pid() =>
2522                                ActorTreeSnapshot{
2523                                    pid: root_2.actor_id().pid(),
2524                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2525                                    status: ActorStatus::Idle,
2526                                    stats: ActorStats::default(),
2527                                    handlers: HashMap::new(),
2528                                    children: HashMap::new(),
2529                                    events: Vec::new(),
2530                                    spans: Vec::new(),
2531                                },
2532                            root_1.actor_id().pid() =>
2533                                ActorTreeSnapshot{
2534                                    pid: root_1.actor_id().pid(),
2535                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2536                                    status: ActorStatus::Idle,
2537                                    stats: ActorStats { num_processed_messages: 1 },
2538                                    handlers: HashMap::new(),
2539                                    children: hashmap!{
2540                                        root_1_1.actor_id().pid() =>
2541                                            ActorTreeSnapshot::empty_typed(
2542                                                root_1_1.actor_id().pid(),
2543                                                "hyperactor::proc::tests::TestActor".to_string()
2544                                            )
2545                                    },
2546                                    events: Vec::new(),
2547                                    spans: Vec::new(),
2548                                },
2549                        },
2550                        events: Vec::new(),
2551                        spans: Vec::new(),
2552                    },
2553                }
2554            );
2555        }
2556
2557        // Stop root_1. This should remove it, and its child, from snapshot.
2558        root_1.drain_and_stop().unwrap();
2559        root_1.await;
2560        {
2561            let snapshot = proc.state().ledger.snapshot();
2562            assert_eq!(
2563                snapshot.roots,
2564                hashmap! {
2565                    root.actor_id().clone() =>  ActorTreeSnapshot {
2566                        pid: 0,
2567                        type_name: "hyperactor::proc::tests::TestActor".to_string(),
2568                        status: ActorStatus::Idle,
2569                        stats: ActorStats { num_processed_messages: 3 },
2570                        handlers: HashMap::new(),
2571                        children: hashmap!{
2572                            root_2.actor_id().pid() =>
2573                                ActorTreeSnapshot {
2574                                    pid: root_2.actor_id().pid(),
2575                                    type_name: "hyperactor::proc::tests::TestActor".to_string(),
2576                                    status: ActorStatus::Idle,
2577                                    stats: ActorStats::default(),
2578                                    handlers: HashMap::new(),
2579                                    children: HashMap::new(),
2580                                    events: Vec::new(),
2581                                    spans: Vec::new(),
2582                                }
2583                        },
2584                        events: Vec::new(),
2585                        spans: Vec::new(),
2586                    },
2587                }
2588            );
2589        }
2590
2591        // Finally stop root. No roots should be left in snapshot.
2592        root.drain_and_stop().unwrap();
2593        root.await;
2594        {
2595            let snapshot = proc.state().ledger.snapshot();
2596            assert_eq!(snapshot.roots, hashmap! {});
2597        }
2598    }
2599
2600    #[tokio::test]
2601    async fn test_multi_handler() {
2602        // TEMPORARY: This test is currently a bit awkward since we don't yet expose
2603        // public interfaces to multi-handlers. This will be fixed shortly.
2604
2605        #[derive(Debug)]
2606        struct TestActor(Arc<AtomicUsize>);
2607
2608        #[async_trait]
2609        impl Actor for TestActor {
2610            type Params = Arc<AtomicUsize>;
2611
2612            async fn new(param: Arc<AtomicUsize>) -> Result<Self, anyhow::Error> {
2613                Ok(Self(param))
2614            }
2615        }
2616
2617        #[async_trait]
2618        impl Handler<OncePortHandle<PortHandle<usize>>> for TestActor {
2619            async fn handle(
2620                &mut self,
2621                cx: &crate::Context<Self>,
2622                message: OncePortHandle<PortHandle<usize>>,
2623            ) -> anyhow::Result<()> {
2624                message.send(cx.port())?;
2625                Ok(())
2626            }
2627        }
2628
2629        #[async_trait]
2630        impl Handler<usize> for TestActor {
2631            async fn handle(
2632                &mut self,
2633                _cx: &crate::Context<Self>,
2634                message: usize,
2635            ) -> anyhow::Result<()> {
2636                self.0.fetch_add(message, Ordering::SeqCst);
2637                Ok(())
2638            }
2639        }
2640
2641        let proc = Proc::local();
2642        let state = Arc::new(AtomicUsize::new(0));
2643        let handle = proc
2644            .spawn::<TestActor>("test", state.clone())
2645            .await
2646            .unwrap();
2647        let client = proc.attach("client").unwrap();
2648        let (tx, rx) = client.open_once_port();
2649        handle.send(tx).unwrap();
2650        let usize_handle = rx.recv().await.unwrap();
2651        usize_handle.send(123).unwrap();
2652
2653        handle.drain_and_stop().unwrap();
2654        handle.await;
2655
2656        assert_eq!(state.load(Ordering::SeqCst), 123);
2657    }
2658
2659    #[tokio::test]
2660    async fn test_actor_panic() {
2661        // Need this custom hook to store panic backtrace in task_local.
2662        panic_handler::set_panic_hook();
2663
2664        let proc = Proc::local();
2665        // Need to set a supervison coordinator for this Proc because there will
2666        // be actor failure(s) in this test which trigger supervision.
2667        ProcSupervisionCoordinator::set(&proc).await.unwrap();
2668
2669        let (client, _handle) = proc.instance("client").unwrap();
2670        let actor_handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
2671        actor_handle
2672            .panic(&client, "some random failure".to_string())
2673            .await
2674            .unwrap();
2675        let actor_status = actor_handle.await;
2676
2677        // Note: even when the test passes, the panic stacktrace will still be
2678        // printed to stderr because that is the behavior controlled by the panic
2679        // hook.
2680        assert_matches!(actor_status, ActorStatus::Failed(_));
2681        if let ActorStatus::Failed(err) = actor_status {
2682            let error_msg = err.to_string();
2683            // Verify panic message is captured
2684            assert!(error_msg.contains("some random failure"));
2685            // Verify backtrace is captured. Note the backtrace message might
2686            // change in the future. If that happens, we need to update this
2687            // statement with something up-to-date.
2688            assert!(error_msg.contains("rust_begin_unwind"));
2689        }
2690    }
2691
2692    #[tokio::test]
2693    async fn test_local_supervision_propagation() {
2694        #[derive(Debug)]
2695        struct TestActor(Arc<AtomicBool>, bool);
2696
2697        #[async_trait]
2698        impl Actor for TestActor {
2699            type Params = (Arc<AtomicBool>, bool);
2700
2701            async fn new(param: (Arc<AtomicBool>, bool)) -> Result<Self, anyhow::Error> {
2702                Ok(Self(param.0, param.1))
2703            }
2704
2705            async fn handle_supervision_event(
2706                &mut self,
2707                _this: &Instance<Self>,
2708                _event: &ActorSupervisionEvent,
2709            ) -> Result<bool, anyhow::Error> {
2710                if !self.1 {
2711                    return Ok(false);
2712                }
2713
2714                tracing::error!(
2715                    "{}: supervision event received: {:?}",
2716                    _this.self_id(),
2717                    _event
2718                );
2719                self.0.store(true, Ordering::SeqCst);
2720                Ok(true)
2721            }
2722        }
2723
2724        #[async_trait]
2725        impl Handler<String> for TestActor {
2726            async fn handle(
2727                &mut self,
2728                cx: &crate::Context<Self>,
2729                message: String,
2730            ) -> anyhow::Result<()> {
2731                tracing::info!("{} received message: {}", cx.self_id(), message);
2732                Err(anyhow::anyhow!(message))
2733            }
2734        }
2735
2736        let proc = Proc::local();
2737        let reported_event = ProcSupervisionCoordinator::set(&proc).await.unwrap();
2738
2739        let root_state = Arc::new(AtomicBool::new(false));
2740        let root_1_state = Arc::new(AtomicBool::new(false));
2741        let root_1_1_state = Arc::new(AtomicBool::new(false));
2742        let root_1_1_1_state = Arc::new(AtomicBool::new(false));
2743        let root_2_state = Arc::new(AtomicBool::new(false));
2744        let root_2_1_state = Arc::new(AtomicBool::new(false));
2745
2746        let root = proc
2747            .spawn::<TestActor>("root", (root_state.clone(), false))
2748            .await
2749            .unwrap();
2750        let root_1 = proc
2751            .spawn_child::<TestActor>(
2752                root.cell().clone(),
2753                (
2754                    root_1_state.clone(),
2755                    true, /* set true so children's event stops here */
2756                ),
2757            )
2758            .await
2759            .unwrap();
2760        let root_1_1 = proc
2761            .spawn_child::<TestActor>(root_1.cell().clone(), (root_1_1_state.clone(), false))
2762            .await
2763            .unwrap();
2764        let root_1_1_1 = proc
2765            .spawn_child::<TestActor>(root_1_1.cell().clone(), (root_1_1_1_state.clone(), false))
2766            .await
2767            .unwrap();
2768        let root_2 = proc
2769            .spawn_child::<TestActor>(root.cell().clone(), (root_2_state.clone(), false))
2770            .await
2771            .unwrap();
2772        let root_2_1 = proc
2773            .spawn_child::<TestActor>(root_2.cell().clone(), (root_2_1_state.clone(), false))
2774            .await
2775            .unwrap();
2776
2777        // fail `root_1_1_1`, the supervision msg should be propagated to
2778        // `root_1` because `root_1` has set `true` to `handle_supervision_event`.
2779        root_1_1_1
2780            .send::<String>("some random failure".into())
2781            .unwrap();
2782
2783        // fail `root_2_1`, the supervision msg should be propagated to
2784        // ProcSupervisionCoordinator.
2785        root_2_1
2786            .send::<String>("some random failure".into())
2787            .unwrap();
2788
2789        RealClock.sleep(Duration::from_secs(1)).await;
2790
2791        assert!(!root_state.load(Ordering::SeqCst));
2792        assert!(root_1_state.load(Ordering::SeqCst));
2793        assert!(!root_1_1_state.load(Ordering::SeqCst));
2794        assert!(!root_1_1_1_state.load(Ordering::SeqCst));
2795        assert!(!root_2_state.load(Ordering::SeqCst));
2796        assert!(!root_2_1_state.load(Ordering::SeqCst));
2797        assert_eq!(
2798            reported_event.event().map(|e| e.actor_id.clone()),
2799            Some(root.actor_id().clone())
2800        );
2801    }
2802
2803    #[tokio::test]
2804    async fn test_supervision_event_handler_propagates() {
2805        #[derive(Debug)]
2806        struct FailingSupervisionActor;
2807
2808        #[async_trait]
2809        impl Actor for FailingSupervisionActor {
2810            type Params = ();
2811
2812            async fn new(_: ()) -> Result<Self, anyhow::Error> {
2813                Ok(Self)
2814            }
2815
2816            async fn handle_supervision_event(
2817                &mut self,
2818                _this: &Instance<Self>,
2819                _event: &ActorSupervisionEvent,
2820            ) -> Result<bool, anyhow::Error> {
2821                anyhow::bail!("failed to handle supervision event!")
2822            }
2823        }
2824
2825        #[async_trait]
2826        impl Handler<String> for FailingSupervisionActor {
2827            async fn handle(
2828                &mut self,
2829                _cx: &crate::Context<Self>,
2830                message: String,
2831            ) -> anyhow::Result<()> {
2832                Err(anyhow::anyhow!(message))
2833            }
2834        }
2835
2836        #[derive(Debug)]
2837        struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);
2838
2839        #[async_trait]
2840        impl Actor for ParentActor {
2841            type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;
2842
2843            async fn new(
2844                supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
2845            ) -> Result<Self, anyhow::Error> {
2846                Ok(Self(supervision_events))
2847            }
2848
2849            async fn handle_supervision_event(
2850                &mut self,
2851                _this: &Instance<Self>,
2852                event: &ActorSupervisionEvent,
2853            ) -> Result<bool, anyhow::Error> {
2854                self.0.send(event.clone()).unwrap();
2855                Ok(true)
2856            }
2857        }
2858
2859        let proc = Proc::local();
2860
2861        let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
2862
2863        let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
2864        let child = proc
2865            .spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
2866            .await
2867            .unwrap();
2868        let grandchild = proc
2869            .spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
2870            .await
2871            .unwrap();
2872
2873        let child_actor_id = child.actor_id().clone();
2874        let grandchild_actor_id = grandchild.actor_id().clone();
2875
2876        // Grandchild fails, triggering failure up the tree, finally receiving
2877        // the event at the root.
2878        grandchild.send("trigger failure".to_string()).unwrap();
2879
2880        assert!(grandchild.await.is_failed());
2881        assert!(child.await.is_failed());
2882
2883        assert_eq!(
2884            event_rx.recv().await.unwrap(),
2885            // The time field is ignored for Eq and PartialEq.
2886            ActorSupervisionEvent::new(
2887                child_actor_id,
2888                ActorStatus::Failed(
2889                    "failed to handle supervision event: failed to handle supervision event!"
2890                        .to_string(),
2891                ),
2892                None,
2893                Some(Box::new(ActorSupervisionEvent::new(
2894                    grandchild_actor_id,
2895                    ActorStatus::Failed(
2896                        "serving local[0].parent[2]: processing error: trigger failure".to_string()
2897                    ),
2898                    None,
2899                    None,
2900                ))),
2901            )
2902        );
2903
2904        assert!(event_rx.try_recv().is_err());
2905    }
2906
2907    #[tokio::test]
2908    async fn test_instance() {
2909        #[derive(Debug, Default, Actor)]
2910        struct TestActor;
2911
2912        #[async_trait]
2913        impl Handler<(String, PortRef<String>)> for TestActor {
2914            async fn handle(
2915                &mut self,
2916                cx: &crate::Context<Self>,
2917                (message, port): (String, PortRef<String>),
2918            ) -> anyhow::Result<()> {
2919                port.send(cx, message)?;
2920                Ok(())
2921            }
2922        }
2923
2924        let proc = Proc::local();
2925
2926        let (instance, handle) = proc.instance("my_test_actor").unwrap();
2927
2928        let child_actor = TestActor::spawn(&instance, ()).await.unwrap();
2929
2930        let (port, mut receiver) = instance.open_port();
2931        child_actor
2932            .send(("hello".to_string(), port.bind()))
2933            .unwrap();
2934
2935        let message = receiver.recv().await.unwrap();
2936        assert_eq!(message, "hello");
2937
2938        child_actor.drain_and_stop().unwrap();
2939        child_actor.await;
2940
2941        assert_eq!(*handle.status().borrow(), ActorStatus::Client);
2942        drop(instance);
2943        assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
2944        handle.await;
2945    }
2946
2947    #[tokio::test]
2948    async fn test_proc_terminate_without_coordinator() {
2949        if std::env::var("CARGO_TEST").is_ok() {
2950            eprintln!("test skipped as it hangs when run by cargo in sandcastle");
2951            return;
2952        }
2953
2954        let process = async {
2955            let proc = Proc::local();
2956            // Intentionally not setting a proc supervison coordinator. This
2957            // should cause the process to terminate.
2958            // ProcSupervisionCoordinator::set(&proc).await.unwrap();
2959            let root = proc.spawn::<TestActor>("root", ()).await.unwrap();
2960            let (client, _handle) = proc.instance("client").unwrap();
2961            root.fail(&client, anyhow::anyhow!("some random failure"))
2962                .await
2963                .unwrap();
2964            // It is okay to sleep a long time here, because we expect this
2965            // process to be terminated way before the sleep ends due to the
2966            // missing proc supervison coordinator.
2967            RealClock.sleep(Duration::from_secs(30)).await;
2968        };
2969
2970        assert_termination(|| process, 1).await.unwrap();
2971    }
2972
2973    fn trace_and_block(fut: impl Future) {
2974        tracing::subscriber::with_default(
2975            tracing_subscriber::Registry::default().with(hyperactor_telemetry::recorder().layer()),
2976            || {
2977                tokio::runtime::Builder::new_current_thread()
2978                    .enable_all()
2979                    .build()
2980                    .unwrap()
2981                    .block_on(fut)
2982            },
2983        );
2984    }
2985
#[ignore = "until trace recording is turned back on"]
#[test]
fn test_handler_logging() {
    // Actor whose handlers emit tracing events/spans for the test to inspect.
    #[derive(Debug, Default, Actor)]
    struct LoggingActor;

    impl LoggingActor {
        // Sends a two-party barrier to the actor and waits on it, returning
        // once the actor's barrier handler has reached the same barrier.
        async fn wait(handle: &ActorHandle<Self>) {
            let rendezvous = Arc::new(Barrier::new(2));
            handle.send(Arc::clone(&rendezvous)).unwrap();
            rendezvous.wait().await;
        }
    }

    // Logs the string as an info-level message.
    #[async_trait]
    impl Handler<String> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            text: String,
        ) -> anyhow::Result<()> {
            tracing::info!("{}", text);
            Ok(())
        }
    }

    // Emits an info-level event with a `number` field.
    #[async_trait]
    impl Handler<u64> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            value: u64,
        ) -> anyhow::Result<()> {
            tracing::event!(Level::INFO, number = value);
            Ok(())
        }
    }

    // Meets the sender at the barrier.
    #[async_trait]
    impl Handler<Arc<Barrier>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            rendezvous: Arc<Barrier>,
        ) -> anyhow::Result<()> {
            rendezvous.wait().await;
            Ok(())
        }
    }

    // Enters `child_span`, then waits on both barriers in order. The span
    // guard stays alive across both waits, which is the window in which the
    // test body samples the recorded stacks.
    #[async_trait]
    impl Handler<Arc<(Barrier, Barrier)>> for LoggingActor {
        async fn handle(
            &mut self,
            _cx: &crate::Context<Self>,
            gates: Arc<(Barrier, Barrier)>,
        ) -> anyhow::Result<()> {
            let span = tracing::span!(Level::INFO, "child_span");
            let _entered = span.enter();
            gates.0.wait().await;
            gates.1.wait().await;
            Ok(())
        }
    }

    trace_and_block(async {
        let actor = LoggingActor::spawn_detached(()).await.unwrap();
        actor.send("hello world".to_string()).unwrap();
        actor.send("hello world again".to_string()).unwrap();
        actor.send(123u64).unwrap();

        // Wait until the actor has worked through the messages sent above.
        LoggingActor::wait(&actor).await;

        let recorded = actor.cell().inner.recording.tail();
        assert_eq!(recorded.len(), 3);
        assert_eq!(recorded[0].json_value(), json!({ "message": "hello world" }));
        assert_eq!(
            recorded[1].json_value(),
            json!({ "message": "hello world again" })
        );
        assert_eq!(recorded[2].json_value(), json!({ "number": 123 }));

        // Sample the stacks while the handler is parked between its two
        // barriers.
        let gates = Arc::new((Barrier::new(2), Barrier::new(2)));
        actor.send(Arc::clone(&gates)).unwrap();
        gates.0.wait().await;
        let stacks = actor.cell().inner.recording.stacks();
        gates.1.wait().await;

        assert_eq!(stacks.len(), 1);
        assert_eq!(stacks[0].len(), 1);
        assert_eq!(stacks[0][0].name(), "child_span");
    });
}
3081}